当前位置: 首页 > news >正文

Flink实时电商数仓(八)

用户域登录各窗口汇总表

  • 主要任务:从kafka页面日志主题读取数据,统计
    • 七日回流用户:之前活跃的用户,有一段时间不活跃了,之后又开始活跃,称为回流用户
    • 当日独立用户数:同一个用户当天重复登录,只算作一个独立用户。

思路分析

  1. 读取kafka页面主题数据
  2. 转换数据结构:String -> JSONObject
  3. 过滤数据,uid不为null
    • 登录的两种情况
      • 用户打开应用后自动登录
      • 用户打印应用后没有登录,浏览后跳转到登录页面
    • 过滤条件:
      • uid不为null且last_page_id is null
      • last_page_id = login
  4. 设置水位线
  5. 按照uid分组
  6. 统计回流用户数和独立用户数
  7. 开窗聚合
  8. 写入doris

具体实现

  1. 设置端口、并行度、消费者组、kafka主题
  2. 读取dwd页面主题数据
    - stream.print()
  3. 对数据进行清洗过滤:uid不为空
    • stream.flatMap()使用flatMap过滤
    • new FlatMapFunction<>(){}在该方法内部转换为JSONObject, 并且获取uid和lastPageId, try-catch这段代码
    • 判断是否满足思路分析中的条件,如果中途发生异常,直接catch后打印到控制台清理掉即可。
  4. 先注册水位线
    • jsonObjStream.assignTimestampAndWatermark
    • new SerializableTimestampAssigner<>, 提取数据中的ts
  5. 按照uid分组
    • stream.keyby()按照uid进行分组
  6. 判断独立用户和回流用户
    • 创建UserLoginBean, 使用状态保存用户的登录信息
    • 在open方法中,getRuntimeContext().getState(new ValueStateDescriptor<>("last_login_dt",String.class))创建状态记录用户上一次的登录时间
    • processElement方法中比较当前登录的日期和状态存储的日期
      • 如果lastLoginDt==null是新用户
      • 如果不为空,判断上次登录时间和当前时间的差值是否大于7天;如果大于7天,说明是回流用户。
      • 如果小于7天,还需要判断上次登录时间是否是今天,如果不是今天,则说明该用户本次是独立用户。
  7. 开窗聚合
    • 使用滚动窗口开窗聚合
    • reduce算子中写聚合逻辑
    • process算子中获取窗口信息
  8. 写入doris
    • 创建doris sink,写出到doris

核心代码

public static void main(String[] args) {new DwsUserUserLoginWindow().start(10024,4,"dws_user_user_login_window", Constant.TOPIC_DWD_TRAFFIC_PAGE);}@Overridepublic void handle(StreamExecutionEnvironment env, DataStreamSource<String> stream) {//1.读取dwd页面数据//stream.print();//2. 对数据进行清洗过滤SingleOutputStreamOperator<JSONObject> jsonObjStream = etl(stream);//3. 注册水位线SingleOutputStreamOperator<JSONObject> withWatermarkStream = addWatermark(jsonObjStream);//4. 按照uid分组KeyedStream<JSONObject, String> keyedStream = getKeyedStream(withWatermarkStream);//5. 判断独立用户和回流用户SingleOutputStreamOperator<UserLoginBean> processedStream = getUserLoginBeanStream(keyedStream);//processedStream.print();//开窗聚合SingleOutputStreamOperator<UserLoginBean> reducedStream = getReducedStream(processedStream);//reducedStream.print();//写入DorisreducedStream.map(new DorisMapFunction<>()).sinkTo(FlinkSinkUtil.getDorisSink(Constant.DWS_USER_USER_LOGIN_WINDOW));}

[gitee仓库地址:(https://gitee.com/langpaian/gmall2023-realtime)

相关文章:

Flink实时电商数仓(八)

用户域登录各窗口汇总表 主要任务&#xff1a;从kafka页面日志主题读取数据&#xff0c;统计 七日回流用户&#xff1a;之前活跃的用户&#xff0c;有一段时间不活跃了&#xff0c;之后又开始活跃&#xff0c;称为回流用户当日独立用户数&#xff1a;同一个用户当天重复登录&a…...

Python Pymysql实现数据存储

什么是 PyMySQL&#xff1f; PyMySQL 是在 Python3.x 版本中用于连接 MySQL 服务器的一个库&#xff0c;Python2 中则使用mysqldb。 PyMySQL 遵循 Python 数据库 API v2.0 规范&#xff0c;并包含了 pure-Python MySQL 客户端库。 PyMySQL 安装 在使用 PyMySQL 之前&#xf…...

软件测试/测试开发丨Python 常用第三方库 pymysql

pymysql 概述 Python 的数据库接口标准是 Python DB-APIPyMySQL 是从 Python 连接到 MySQL 数据库服务器的接口PyMySQL 的目标是成为 MySQLdb 的替代品官方文档&#xff1a;pymysql.readthedocs.io/ pymysql 安装 使用 pip 安装使用 Pycharm 界面安装 pip install pymysqlp…...

第二节 linux操作系统安装与配置

一&#xff1a;Vmware虚拟机安装与使用   ①VMware是一个虚拟PC的软件&#xff0c;可以在现有的操作系统上虚拟出一个新的硬件环境&#xff0c;相当于模拟出一台新的PC &#xff0c;以此来实现在一台机器上真正同时运行多个独立的操作系统。   ②VMware主要特点&#xff1a…...

ChatGPT 对SEO的影响

ChatGPT 的兴起是否预示着 SEO 的终结&#xff1f; 一点也不。事实上&#xff0c;如果使用得当&#xff0c;它可以让你的 SEO 工作变得更加容易。 强调“正确使用时”。 你可以使用ChatGPT来帮助进行关键字研究的头脑风暴部分、重新措辞你的内容、生成架构标记等等。 但你不…...

光伏逆变器MPPT的作用、原理及算法

MPPT是逆变器非常核心的技术&#xff0c;MPPT电压在进行光伏电站设计时一项非常关键的参数。 一、什么是MPPT&#xff1f; &#xff08;单块光伏组件的I-V、P-V曲线&#xff09; 上图中&#xff0c;光伏组件的输出电压和电流遵循I-V曲线(绿色)、P-V曲线(蓝色)&#xff0c;如果…...

一文详解pyspark常用算子与API

rdd.glom() 对rdd的数据进行嵌套&#xff0c;嵌套按照分区来进行 rdd sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 2)print(rdd.glom().collect()) 输出&#xff1a;[[1,2,3,4],[5,6,7,8,9]] 参考 PySpark基础入门&#xff08;2&#xff09;&#xff1a;RDD及其常用算子…...

使用Rollup 搭建开发环境

1 什么是Rollup Rollup 是一个用于 JavaScript 的模块打包工具&#xff0c;它将小的代码片段编译成更大、更复杂的代码&#xff0c;例如库或应用程序。它使用 JavaScript 的 ES6 版本中包含的新标准化代码模块格式&#xff0c;而不是以前的 CommonJS 和 AMD 等特殊解决方案。(开…...

ubuntu:beyond compare 4 This license key has been revoked 解决办法

https://www.cnblogs.com/zhibei/p/12095431.html 错误如图所示&#xff1a; 解决办法&#xff1a; &#xff08;1&#xff09;先用find命令找到bcompare所在位置&#xff1a;sudo find /home/ -name *bcompare &#xff08;2&#xff09;进入 /home/whf/.config,删除/bco…...

华为交换机生成树STP配置案例

企业内部网络怎么防止网络出现环路&#xff1f;学会STP生成树技术就可以解决啦。 STP简介 在二层交换网络中&#xff0c;一旦存在环路就会造成报文在环路内不断循环和增生&#xff0c;产生广播风暴&#xff0c;从而占用所有的有效带宽&#xff0c;使网络变得无法正常通信。 在…...

Avalonia框架下实现热更新

在Avalonia框架下实现热更新&#xff08;也称为动态加载或模块化更新&#xff09;&#xff0c;通常涉及程序集的动态加载与卸载&#xff0c;以及UI元素、视图模型或其他应用程序逻辑部分的实时替换。由于Avalonia本身是一个跨平台的GUI框架&#xff0c;并没有直接内置热更新机制…...

适用于各种危险区域的火焰识别摄像机,实时监测、火灾预防、安全监控,为安全保驾护航

火灾是一种极具破坏力的灾难&#xff0c;对人们的生命和财产造成了严重的威胁。为了更好地预防和防范火灾&#xff0c;火焰识别摄像机作为一种先进的监控设备&#xff0c;正逐渐受到人们的重视和应用。本文将介绍火焰识别摄像机在安全监控和火灾预防方面的全面应用方案。 一、火…...

react-router-dom5升级到6

前言 升级前版本为5.1.2 下载与运行 下载 npm install react-router-dom6运行 运行发现报错: 将node_modules删除&#xff0c;重新执行npm i即可 运行发现如下报错 这是因为之前有引用react-router-dom.min&#xff0c;v6中取消了该文件&#xff0c;所以未找到文件导致报错。…...

Linux调试工具—gdb

&#x1f3ac;慕斯主页&#xff1a;修仙—别有洞天 ♈️今日夜电波&#xff1a;HEART BEAT—YOASOBI 2:20━━━━━━️&#x1f49f;──────── 5:35 &#x1f504; ◀️ ⏸ ▶️ ☰ …...

SpringCloud(H版alibaba)框架开发教程之nacos做配置中心——附源码(2)

上篇主要讲了使用eureka&#xff0c;zk&#xff0c;nacos当注册中心 这篇内容是nacos配置中心 代码改动部分mysql驱动更新到8.0&#xff0c;数据库版本升级到了8.0&#xff0c;nacos版本更新到了2.x nacos2.x链接 链接&#xff1a;https://pan.baidu.com/s/11nObzgTjWisAfOp…...

网络摄像头爆破实战

*** 重要说明&#xff1a;仅用于交流网络安全测试技术&#xff0c;并唤起大家对网络安全的重视&#xff0c;如用本文的技术干违法的事情&#xff0c;博主概不负责。*** 文章目录 前言1. 发现摄像头2. 发现端口3. 确定品牌信息4. 确定RTSP地址5. 获取视频流6. 获取密码7. 再次获…...

亚信安慧AntDB数据并行加载工具的实现(二)

3.功能性说明 本节对并行加载工具的部分支持的功能进行简要说明。 1) 支持表类型 并行加载工具支持普通表、分区表。 2) 支持指定导入字段 文件中并不是必须包含表中所有的字段&#xff0c;用户可以指定导入某些字段&#xff0c;但是指定的字段数要和文件中的字段数保持一…...

【Java进阶篇】JDK新版本中的新特性都有哪些

JDK新版本中的新特性都有哪些 ✔️经典解析✔️拓展知识仓✔️本地变量类型推断✔️Switch 表达式✔️Text Blocks✔️Records✔️封装类✔️instanceof 模式匹配✔️switch 模式匹配 ✅✔️虚拟线程 ✔️经典解析 JDK 8中推出了Lambda表达式、Stream、Optional、新的日期API等…...

力扣labuladong一刷day49天迪杰斯特拉

力扣labuladong一刷day49天迪杰斯特拉 文章目录 力扣labuladong一刷day49天迪杰斯特拉一、743. 网络延迟时间二、1631. 最小体力消耗路径三、1514. 概率最大的路径 一、743. 网络延迟时间 题目链接&#xff1a;https://leetcode.cn/problems/network-delay-time/ 使用迪杰斯特…...

MCS接口技术----定时/计数,中断

目录 一.中断系统相关寄存器 1.51单片机中断系统的总体结构&#xff1a; 2.中断源的中断级别&#xff08;由高到低&#xff09;&#xff1a; 3.与中断有关的四个寄存器&#xff1a; &#xff08;1&#xff09;TCON---定时控制寄存器 &#xff08;2&#xff09;IE---中断允…...

基于算法竞赛的c++编程(28)结构体的进阶应用

结构体的嵌套与复杂数据组织 在C中&#xff0c;结构体可以嵌套使用&#xff0c;形成更复杂的数据结构。例如&#xff0c;可以通过嵌套结构体描述多层级数据关系&#xff1a; struct Address {string city;string street;int zipCode; };struct Employee {string name;int id;…...

23-Oracle 23 ai 区块链表(Blockchain Table)

小伙伴有没有在金融强合规的领域中遇见&#xff0c;必须要保持数据不可变&#xff0c;管理员都无法修改和留痕的要求。比如医疗的电子病历中&#xff0c;影像检查检验结果不可篡改行的&#xff0c;药品追溯过程中数据只可插入无法删除的特性需求&#xff1b;登录日志、修改日志…...

多场景 OkHttpClient 管理器 - Android 网络通信解决方案

下面是一个完整的 Android 实现&#xff0c;展示如何创建和管理多个 OkHttpClient 实例&#xff0c;分别用于长连接、普通 HTTP 请求和文件下载场景。 <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas…...

渲染学进阶内容——模型

最近在写模组的时候发现渲染器里面离不开模型的定义,在渲染的第二篇文章中简单的讲解了一下关于模型部分的内容,其实不管是方块还是方块实体,都离不开模型的内容 🧱 一、CubeListBuilder 功能解析 CubeListBuilder 是 Minecraft Java 版模型系统的核心构建器,用于动态创…...

页面渲染流程与性能优化

页面渲染流程与性能优化详解&#xff08;完整版&#xff09; 一、现代浏览器渲染流程&#xff08;详细说明&#xff09; 1. 构建DOM树 浏览器接收到HTML文档后&#xff0c;会逐步解析并构建DOM&#xff08;Document Object Model&#xff09;树。具体过程如下&#xff1a; (…...

DIY|Mac 搭建 ESP-IDF 开发环境及编译小智 AI

前一阵子在百度 AI 开发者大会上&#xff0c;看到基于小智 AI DIY 玩具的演示&#xff0c;感觉有点意思&#xff0c;想着自己也来试试。 如果只是想烧录现成的固件&#xff0c;乐鑫官方除了提供了 Windows 版本的 Flash 下载工具 之外&#xff0c;还提供了基于网页版的 ESP LA…...

【HTML-16】深入理解HTML中的块元素与行内元素

HTML元素根据其显示特性可以分为两大类&#xff1a;块元素(Block-level Elements)和行内元素(Inline Elements)。理解这两者的区别对于构建良好的网页布局至关重要。本文将全面解析这两种元素的特性、区别以及实际应用场景。 1. 块元素(Block-level Elements) 1.1 基本特性 …...

《基于Apache Flink的流处理》笔记

思维导图 1-3 章 4-7章 8-11 章 参考资料 源码&#xff1a; https://github.com/streaming-with-flink 博客 https://flink.apache.org/bloghttps://www.ververica.com/blog 聚会及会议 https://flink-forward.orghttps://www.meetup.com/topics/apache-flink https://n…...

前端开发面试题总结-JavaScript篇(一)

文章目录 JavaScript高频问答一、作用域与闭包1.什么是闭包&#xff08;Closure&#xff09;&#xff1f;闭包有什么应用场景和潜在问题&#xff1f;2.解释 JavaScript 的作用域链&#xff08;Scope Chain&#xff09; 二、原型与继承3.原型链是什么&#xff1f;如何实现继承&a…...

CRMEB 框架中 PHP 上传扩展开发:涵盖本地上传及阿里云 OSS、腾讯云 COS、七牛云

目前已有本地上传、阿里云OSS上传、腾讯云COS上传、七牛云上传扩展 扩展入口文件 文件目录 crmeb\services\upload\Upload.php namespace crmeb\services\upload;use crmeb\basic\BaseManager; use think\facade\Config;/*** Class Upload* package crmeb\services\upload* …...