Flink实时电商数仓(八)
用户域登录各窗口汇总表
- 主要任务:从kafka页面日志主题读取数据,统计
- 七日回流用户:之前活跃的用户,有一段时间不活跃了,之后又开始活跃,称为回流用户
- 当日独立用户数:同一个用户当天重复登录,只算作一个独立用户。
思路分析
- 读取kafka页面主题数据
- 转换数据结构:
String -> JSONObject - 过滤数据,uid不为null
- 登录的两种情况
- 用户打开应用后自动登录
- 用户打印应用后没有登录,浏览后跳转到登录页面
- 过滤条件:
- uid不为null且last_page_id is null
- last_page_id = login
- 登录的两种情况
- 设置水位线
- 按照uid分组
- 统计回流用户数和独立用户数
- 开窗聚合
- 写入doris
具体实现
- 设置端口、并行度、消费者组、kafka主题
- 读取dwd页面主题数据
-stream.print() - 对数据进行清洗过滤:uid不为空
stream.flatMap()使用flatMap过滤new FlatMapFunction<>(){}在该方法内部转换为JSONObject, 并且获取uid和lastPageId, try-catch这段代码- 判断是否满足思路分析中的条件,如果中途发生异常,直接catch后打印到控制台清理掉即可。
- 先注册水位线
jsonObjStream.assignTimestampAndWatermarknew SerializableTimestampAssigner<>, 提取数据中的ts
- 按照uid分组
stream.keyby()按照uid进行分组
- 判断独立用户和回流用户
- 创建
UserLoginBean, 使用状态保存用户的登录信息 - 在open方法中,
getRuntimeContext().getState(new ValueStateDescriptor<>("last_login_dt",String.class))创建状态记录用户上一次的登录时间 - 在
processElement方法中比较当前登录的日期和状态存储的日期- 如果
lastLoginDt==null是新用户 - 如果不为空,判断上次登录时间和当前时间的差值是否大于7天;如果大于7天,说明是回流用户。
- 如果小于7天,还需要判断上次登录时间是否是今天,如果不是今天,则说明该用户本次是独立用户。
- 如果
- 创建
- 开窗聚合
- 使用滚动窗口开窗聚合
- 在
reduce算子中写聚合逻辑 - 在
process算子中获取窗口信息
- 写入doris
- 创建
doris sink,写出到doris
- 创建
核心代码
public static void main(String[] args) {new DwsUserUserLoginWindow().start(10024,4,"dws_user_user_login_window", Constant.TOPIC_DWD_TRAFFIC_PAGE);}@Overridepublic void handle(StreamExecutionEnvironment env, DataStreamSource<String> stream) {//1.读取dwd页面数据//stream.print();//2. 对数据进行清洗过滤SingleOutputStreamOperator<JSONObject> jsonObjStream = etl(stream);//3. 注册水位线SingleOutputStreamOperator<JSONObject> withWatermarkStream = addWatermark(jsonObjStream);//4. 按照uid分组KeyedStream<JSONObject, String> keyedStream = getKeyedStream(withWatermarkStream);//5. 判断独立用户和回流用户SingleOutputStreamOperator<UserLoginBean> processedStream = getUserLoginBeanStream(keyedStream);//processedStream.print();//开窗聚合SingleOutputStreamOperator<UserLoginBean> reducedStream = getReducedStream(processedStream);//reducedStream.print();//写入DorisreducedStream.map(new DorisMapFunction<>()).sinkTo(FlinkSinkUtil.getDorisSink(Constant.DWS_USER_USER_LOGIN_WINDOW));}
[gitee仓库地址:(https://gitee.com/langpaian/gmall2023-realtime)
相关文章:
Flink实时电商数仓(八)
用户域登录各窗口汇总表 主要任务:从kafka页面日志主题读取数据,统计 七日回流用户:之前活跃的用户,有一段时间不活跃了,之后又开始活跃,称为回流用户当日独立用户数:同一个用户当天重复登录&a…...
Python Pymysql实现数据存储
什么是 PyMySQL? PyMySQL 是在 Python3.x 版本中用于连接 MySQL 服务器的一个库,Python2 中则使用mysqldb。 PyMySQL 遵循 Python 数据库 API v2.0 规范,并包含了 pure-Python MySQL 客户端库。 PyMySQL 安装 在使用 PyMySQL 之前…...
软件测试/测试开发丨Python 常用第三方库 pymysql
pymysql 概述 Python 的数据库接口标准是 Python DB-APIPyMySQL 是从 Python 连接到 MySQL 数据库服务器的接口PyMySQL 的目标是成为 MySQLdb 的替代品官方文档:pymysql.readthedocs.io/ pymysql 安装 使用 pip 安装使用 Pycharm 界面安装 pip install pymysqlp…...
第二节 linux操作系统安装与配置
一:Vmware虚拟机安装与使用 ①VMware是一个虚拟PC的软件,可以在现有的操作系统上虚拟出一个新的硬件环境,相当于模拟出一台新的PC ,以此来实现在一台机器上真正同时运行多个独立的操作系统。 ②VMware主要特点:…...
ChatGPT 对SEO的影响
ChatGPT 的兴起是否预示着 SEO 的终结? 一点也不。事实上,如果使用得当,它可以让你的 SEO 工作变得更加容易。 强调“正确使用时”。 你可以使用ChatGPT来帮助进行关键字研究的头脑风暴部分、重新措辞你的内容、生成架构标记等等。 但你不…...
光伏逆变器MPPT的作用、原理及算法
MPPT是逆变器非常核心的技术,MPPT电压在进行光伏电站设计时一项非常关键的参数。 一、什么是MPPT? (单块光伏组件的I-V、P-V曲线) 上图中,光伏组件的输出电压和电流遵循I-V曲线(绿色)、P-V曲线(蓝色),如果…...
一文详解pyspark常用算子与API
rdd.glom() 对rdd的数据进行嵌套,嵌套按照分区来进行 rdd sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 2)print(rdd.glom().collect()) 输出:[[1,2,3,4],[5,6,7,8,9]] 参考 PySpark基础入门(2):RDD及其常用算子…...
使用Rollup 搭建开发环境
1 什么是Rollup Rollup 是一个用于 JavaScript 的模块打包工具,它将小的代码片段编译成更大、更复杂的代码,例如库或应用程序。它使用 JavaScript 的 ES6 版本中包含的新标准化代码模块格式,而不是以前的 CommonJS 和 AMD 等特殊解决方案。(开…...
ubuntu:beyond compare 4 This license key has been revoked 解决办法
https://www.cnblogs.com/zhibei/p/12095431.html 错误如图所示: 解决办法: (1)先用find命令找到bcompare所在位置:sudo find /home/ -name *bcompare (2)进入 /home/whf/.config,删除/bco…...
华为交换机生成树STP配置案例
企业内部网络怎么防止网络出现环路?学会STP生成树技术就可以解决啦。 STP简介 在二层交换网络中,一旦存在环路就会造成报文在环路内不断循环和增生,产生广播风暴,从而占用所有的有效带宽,使网络变得无法正常通信。 在…...
Avalonia框架下实现热更新
在Avalonia框架下实现热更新(也称为动态加载或模块化更新),通常涉及程序集的动态加载与卸载,以及UI元素、视图模型或其他应用程序逻辑部分的实时替换。由于Avalonia本身是一个跨平台的GUI框架,并没有直接内置热更新机制…...
适用于各种危险区域的火焰识别摄像机,实时监测、火灾预防、安全监控,为安全保驾护航
火灾是一种极具破坏力的灾难,对人们的生命和财产造成了严重的威胁。为了更好地预防和防范火灾,火焰识别摄像机作为一种先进的监控设备,正逐渐受到人们的重视和应用。本文将介绍火焰识别摄像机在安全监控和火灾预防方面的全面应用方案。 一、火…...
react-router-dom5升级到6
前言 升级前版本为5.1.2 下载与运行 下载 npm install react-router-dom6运行 运行发现报错: 将node_modules删除,重新执行npm i即可 运行发现如下报错 这是因为之前有引用react-router-dom.min,v6中取消了该文件,所以未找到文件导致报错。…...
Linux调试工具—gdb
🎬慕斯主页:修仙—别有洞天 ♈️今日夜电波:HEART BEAT—YOASOBI 2:20━━━━━━️💟──────── 5:35 🔄 ◀️ ⏸ ▶️ ☰ …...
SpringCloud(H版alibaba)框架开发教程之nacos做配置中心——附源码(2)
上篇主要讲了使用eureka,zk,nacos当注册中心 这篇内容是nacos配置中心 代码改动部分mysql驱动更新到8.0,数据库版本升级到了8.0,nacos版本更新到了2.x nacos2.x链接 链接:https://pan.baidu.com/s/11nObzgTjWisAfOp…...
网络摄像头爆破实战
*** 重要说明:仅用于交流网络安全测试技术,并唤起大家对网络安全的重视,如用本文的技术干违法的事情,博主概不负责。*** 文章目录 前言1. 发现摄像头2. 发现端口3. 确定品牌信息4. 确定RTSP地址5. 获取视频流6. 获取密码7. 再次获…...
亚信安慧AntDB数据并行加载工具的实现(二)
3.功能性说明 本节对并行加载工具的部分支持的功能进行简要说明。 1) 支持表类型 并行加载工具支持普通表、分区表。 2) 支持指定导入字段 文件中并不是必须包含表中所有的字段,用户可以指定导入某些字段,但是指定的字段数要和文件中的字段数保持一…...
【Java进阶篇】JDK新版本中的新特性都有哪些
JDK新版本中的新特性都有哪些 ✔️经典解析✔️拓展知识仓✔️本地变量类型推断✔️Switch 表达式✔️Text Blocks✔️Records✔️封装类✔️instanceof 模式匹配✔️switch 模式匹配 ✅✔️虚拟线程 ✔️经典解析 JDK 8中推出了Lambda表达式、Stream、Optional、新的日期API等…...
力扣labuladong一刷day49天迪杰斯特拉
力扣labuladong一刷day49天迪杰斯特拉 文章目录 力扣labuladong一刷day49天迪杰斯特拉一、743. 网络延迟时间二、1631. 最小体力消耗路径三、1514. 概率最大的路径 一、743. 网络延迟时间 题目链接:https://leetcode.cn/problems/network-delay-time/ 使用迪杰斯特…...
MCS接口技术----定时/计数,中断
目录 一.中断系统相关寄存器 1.51单片机中断系统的总体结构: 2.中断源的中断级别(由高到低): 3.与中断有关的四个寄存器: (1)TCON---定时控制寄存器 (2)IE---中断允…...
Java JFreeChart 折线图X轴标签优化:5分钟搞定密集数据展示问题
Java JFreeChart折线图X轴标签优化实战:解决密集数据展示难题 在数据可视化领域,折线图是最常用的图表类型之一。但当数据量激增时,X轴标签往往会因为空间不足而显示为省略号,严重影响图表可读性。本文将深入探讨如何通过定制化方…...
获取应用内部JMX统计信息的编程方法
本文介绍了如何在Java应用程序中编程JMX(Java Management Extensions)统计信息,无需建立远程连接或使用外部JMX客户端。通过直接访问MBeanServer,您可以查询和获取应用程序中的各种性能指标和管理信息,如Kafka消费者组…...
C# 工业级温度监控软件:支持多PLC通信与实时曲线绘制
前言工业自动化领域,温度监控是保障生产安全与产品质量的核心环节。面对多台设备分散、数据孤岛严重的现状,开发一套高效、可视化的上位机系统显得尤为重要。本文将详细介绍一款基于 WinForms 与 S7.Net 开发的温度监控系统。该系统不仅实现了对多台西门…...
从‘飞到红色建筑左边’说起:拆解无人机视觉语言导航(VLN)背后的三大工程难题
从"飞到红色建筑左边"说起:拆解无人机视觉语言导航的工程化困局 当你在测试场地对无人机说出"飞到红色建筑左边"时,这个看似简单的指令背后,是一场跨越模态鸿沟的复杂解码过程。不同于实验室里的完美演示,真实…...
3步掌握Umi-OCR批量处理:从海量图片中高效提取文字
3步掌握Umi-OCR批量处理:从海量图片中高效提取文字 【免费下载链接】Umi-OCR Umi-OCR: 这是一个免费、开源、可批量处理的离线OCR软件,适用于Windows系统,支持截图OCR、批量OCR、二维码识别等功能。 项目地址: https://gitcode.com/GitHub_…...
西北工业大学GeekOS实验踩坑记:从分段到分页,手把手教你搞定Project4的虚拟内存
西北工业大学GeekOS实验深度解析:虚拟内存实现与优化实战 实验背景与核心挑战 操作系统课程中的GeekOS项目一直是计算机专业学生深入理解系统底层原理的重要实践环节。Project4作为其中的关键里程碑,要求学生从分段存储管理过渡到分页虚拟内存系统的实…...
从报错到解决:Pycharm中Tensorflow2.x与1.x代码兼容性问题全解析
从报错到解决:Pycharm中Tensorflow2.x与1.x代码兼容性问题全解析 在深度学习领域,TensorFlow作为最受欢迎的框架之一,其版本迭代带来的变化常常让开发者感到头疼。特别是从TensorFlow 1.x升级到2.x版本后,许多核心API发生了重大改…...
嘎嘎降AI退款申请完整流程:不达标怎么拿回费用的具体步骤
嘎嘎降AI退款申请完整流程:不达标怎么拿回费用的具体步骤 这篇教程来自实操经验。帮三个同学处理过论文AI率,加上自己的,前后操作了十几次。把流程总结成教程,尽量详细。 核心工具推荐嘎嘎降AI(www.aigcleaner.com&a…...
ESP32智能语音助手创新实践:从技术原理到跨设备协同
ESP32智能语音助手创新实践:从技术原理到跨设备协同 【免费下载链接】xiaozhi-esp32 Build your own AI friend 项目地址: https://gitcode.com/GitHub_Trending/xia/xiaozhi-esp32 随着物联网技术的飞速发展,智能语音交互已成为连接物理世界与数…...
STVD与STVP实战指南:从环境搭建到串口烧录全流程解析
1. STVD与STVP开发环境全解析 第一次接触STM8开发的朋友,往往会被STVD和STVP这两个工具搞得一头雾水。我刚开始用的时候也踩过不少坑,比如明明安装了STVD却编译不了C程序,烧录时总是提示设备保护。后来才发现,STM8开发需要工具链的…...
