当前位置: 首页 > news >正文

Flink实时电商数仓(八)

用户域登录各窗口汇总表

  • 主要任务:从kafka页面日志主题读取数据,统计
    • 七日回流用户:之前活跃的用户,有一段时间不活跃了,之后又开始活跃,称为回流用户
    • 当日独立用户数:同一个用户当天重复登录,只算作一个独立用户。

思路分析

  1. 读取kafka页面主题数据
  2. 转换数据结构:String -> JSONObject
  3. 过滤数据,uid不为null
    • 登录的两种情况
      • 用户打开应用后自动登录
      • 用户打印应用后没有登录,浏览后跳转到登录页面
    • 过滤条件:
      • uid不为null且last_page_id is null
      • last_page_id = login
  4. 设置水位线
  5. 按照uid分组
  6. 统计回流用户数和独立用户数
  7. 开窗聚合
  8. 写入doris

具体实现

  1. 设置端口、并行度、消费者组、kafka主题
  2. 读取dwd页面主题数据
    - stream.print()
  3. 对数据进行清洗过滤:uid不为空
    • stream.flatMap()使用flatMap过滤
    • new FlatMapFunction<>(){}在该方法内部转换为JSONObject, 并且获取uid和lastPageId, try-catch这段代码
    • 判断是否满足思路分析中的条件,如果中途发生异常,直接catch后打印到控制台清理掉即可。
  4. 先注册水位线
    • jsonObjStream.assignTimestampAndWatermark
    • new SerializableTimestampAssigner<>, 提取数据中的ts
  5. 按照uid分组
    • stream.keyby()按照uid进行分组
  6. 判断独立用户和回流用户
    • 创建UserLoginBean, 使用状态保存用户的登录信息
    • 在open方法中,getRuntimeContext().getState(new ValueStateDescriptor<>("last_login_dt",String.class))创建状态记录用户上一次的登录时间
    • processElement方法中比较当前登录的日期和状态存储的日期
      • 如果lastLoginDt==null是新用户
      • 如果不为空,判断上次登录时间和当前时间的差值是否大于7天;如果大于7天,说明是回流用户。
      • 如果小于7天,还需要判断上次登录时间是否是今天,如果不是今天,则说明该用户本次是独立用户。
  7. 开窗聚合
    • 使用滚动窗口开窗聚合
    • reduce算子中写聚合逻辑
    • process算子中获取窗口信息
  8. 写入doris
    • 创建doris sink,写出到doris

核心代码

public static void main(String[] args) {new DwsUserUserLoginWindow().start(10024,4,"dws_user_user_login_window", Constant.TOPIC_DWD_TRAFFIC_PAGE);}@Overridepublic void handle(StreamExecutionEnvironment env, DataStreamSource<String> stream) {//1.读取dwd页面数据//stream.print();//2. 对数据进行清洗过滤SingleOutputStreamOperator<JSONObject> jsonObjStream = etl(stream);//3. 注册水位线SingleOutputStreamOperator<JSONObject> withWatermarkStream = addWatermark(jsonObjStream);//4. 按照uid分组KeyedStream<JSONObject, String> keyedStream = getKeyedStream(withWatermarkStream);//5. 判断独立用户和回流用户SingleOutputStreamOperator<UserLoginBean> processedStream = getUserLoginBeanStream(keyedStream);//processedStream.print();//开窗聚合SingleOutputStreamOperator<UserLoginBean> reducedStream = getReducedStream(processedStream);//reducedStream.print();//写入DorisreducedStream.map(new DorisMapFunction<>()).sinkTo(FlinkSinkUtil.getDorisSink(Constant.DWS_USER_USER_LOGIN_WINDOW));}

[gitee仓库地址:(https://gitee.com/langpaian/gmall2023-realtime)

相关文章:

Flink实时电商数仓(八)

用户域登录各窗口汇总表 主要任务&#xff1a;从kafka页面日志主题读取数据&#xff0c;统计 七日回流用户&#xff1a;之前活跃的用户&#xff0c;有一段时间不活跃了&#xff0c;之后又开始活跃&#xff0c;称为回流用户当日独立用户数&#xff1a;同一个用户当天重复登录&a…...

Python Pymysql实现数据存储

什么是 PyMySQL&#xff1f; PyMySQL 是在 Python3.x 版本中用于连接 MySQL 服务器的一个库&#xff0c;Python2 中则使用mysqldb。 PyMySQL 遵循 Python 数据库 API v2.0 规范&#xff0c;并包含了 pure-Python MySQL 客户端库。 PyMySQL 安装 在使用 PyMySQL 之前&#xf…...

软件测试/测试开发丨Python 常用第三方库 pymysql

pymysql 概述 Python 的数据库接口标准是 Python DB-APIPyMySQL 是从 Python 连接到 MySQL 数据库服务器的接口PyMySQL 的目标是成为 MySQLdb 的替代品官方文档&#xff1a;pymysql.readthedocs.io/ pymysql 安装 使用 pip 安装使用 Pycharm 界面安装 pip install pymysqlp…...

第二节 linux操作系统安装与配置

一&#xff1a;Vmware虚拟机安装与使用   ①VMware是一个虚拟PC的软件&#xff0c;可以在现有的操作系统上虚拟出一个新的硬件环境&#xff0c;相当于模拟出一台新的PC &#xff0c;以此来实现在一台机器上真正同时运行多个独立的操作系统。   ②VMware主要特点&#xff1a…...

ChatGPT 对SEO的影响

ChatGPT 的兴起是否预示着 SEO 的终结&#xff1f; 一点也不。事实上&#xff0c;如果使用得当&#xff0c;它可以让你的 SEO 工作变得更加容易。 强调“正确使用时”。 你可以使用ChatGPT来帮助进行关键字研究的头脑风暴部分、重新措辞你的内容、生成架构标记等等。 但你不…...

光伏逆变器MPPT的作用、原理及算法

MPPT是逆变器非常核心的技术&#xff0c;MPPT电压在进行光伏电站设计时一项非常关键的参数。 一、什么是MPPT&#xff1f; &#xff08;单块光伏组件的I-V、P-V曲线&#xff09; 上图中&#xff0c;光伏组件的输出电压和电流遵循I-V曲线(绿色)、P-V曲线(蓝色)&#xff0c;如果…...

一文详解pyspark常用算子与API

rdd.glom() 对rdd的数据进行嵌套&#xff0c;嵌套按照分区来进行 rdd sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 2)print(rdd.glom().collect()) 输出&#xff1a;[[1,2,3,4],[5,6,7,8,9]] 参考 PySpark基础入门&#xff08;2&#xff09;&#xff1a;RDD及其常用算子…...

使用Rollup 搭建开发环境

1 什么是Rollup Rollup 是一个用于 JavaScript 的模块打包工具&#xff0c;它将小的代码片段编译成更大、更复杂的代码&#xff0c;例如库或应用程序。它使用 JavaScript 的 ES6 版本中包含的新标准化代码模块格式&#xff0c;而不是以前的 CommonJS 和 AMD 等特殊解决方案。(开…...

ubuntu:beyond compare 4 This license key has been revoked 解决办法

https://www.cnblogs.com/zhibei/p/12095431.html 错误如图所示&#xff1a; 解决办法&#xff1a; &#xff08;1&#xff09;先用find命令找到bcompare所在位置&#xff1a;sudo find /home/ -name *bcompare &#xff08;2&#xff09;进入 /home/whf/.config,删除/bco…...

华为交换机生成树STP配置案例

企业内部网络怎么防止网络出现环路&#xff1f;学会STP生成树技术就可以解决啦。 STP简介 在二层交换网络中&#xff0c;一旦存在环路就会造成报文在环路内不断循环和增生&#xff0c;产生广播风暴&#xff0c;从而占用所有的有效带宽&#xff0c;使网络变得无法正常通信。 在…...

Avalonia框架下实现热更新

在Avalonia框架下实现热更新&#xff08;也称为动态加载或模块化更新&#xff09;&#xff0c;通常涉及程序集的动态加载与卸载&#xff0c;以及UI元素、视图模型或其他应用程序逻辑部分的实时替换。由于Avalonia本身是一个跨平台的GUI框架&#xff0c;并没有直接内置热更新机制…...

适用于各种危险区域的火焰识别摄像机,实时监测、火灾预防、安全监控,为安全保驾护航

火灾是一种极具破坏力的灾难&#xff0c;对人们的生命和财产造成了严重的威胁。为了更好地预防和防范火灾&#xff0c;火焰识别摄像机作为一种先进的监控设备&#xff0c;正逐渐受到人们的重视和应用。本文将介绍火焰识别摄像机在安全监控和火灾预防方面的全面应用方案。 一、火…...

react-router-dom5升级到6

前言 升级前版本为5.1.2 下载与运行 下载 npm install react-router-dom6运行 运行发现报错: 将node_modules删除&#xff0c;重新执行npm i即可 运行发现如下报错 这是因为之前有引用react-router-dom.min&#xff0c;v6中取消了该文件&#xff0c;所以未找到文件导致报错。…...

Linux调试工具—gdb

&#x1f3ac;慕斯主页&#xff1a;修仙—别有洞天 ♈️今日夜电波&#xff1a;HEART BEAT—YOASOBI 2:20━━━━━━️&#x1f49f;──────── 5:35 &#x1f504; ◀️ ⏸ ▶️ ☰ …...

SpringCloud(H版alibaba)框架开发教程之nacos做配置中心——附源码(2)

上篇主要讲了使用eureka&#xff0c;zk&#xff0c;nacos当注册中心 这篇内容是nacos配置中心 代码改动部分mysql驱动更新到8.0&#xff0c;数据库版本升级到了8.0&#xff0c;nacos版本更新到了2.x nacos2.x链接 链接&#xff1a;https://pan.baidu.com/s/11nObzgTjWisAfOp…...

网络摄像头爆破实战

*** 重要说明&#xff1a;仅用于交流网络安全测试技术&#xff0c;并唤起大家对网络安全的重视&#xff0c;如用本文的技术干违法的事情&#xff0c;博主概不负责。*** 文章目录 前言1. 发现摄像头2. 发现端口3. 确定品牌信息4. 确定RTSP地址5. 获取视频流6. 获取密码7. 再次获…...

亚信安慧AntDB数据并行加载工具的实现(二)

3.功能性说明 本节对并行加载工具的部分支持的功能进行简要说明。 1) 支持表类型 并行加载工具支持普通表、分区表。 2) 支持指定导入字段 文件中并不是必须包含表中所有的字段&#xff0c;用户可以指定导入某些字段&#xff0c;但是指定的字段数要和文件中的字段数保持一…...

【Java进阶篇】JDK新版本中的新特性都有哪些

JDK新版本中的新特性都有哪些 ✔️经典解析✔️拓展知识仓✔️本地变量类型推断✔️Switch 表达式✔️Text Blocks✔️Records✔️封装类✔️instanceof 模式匹配✔️switch 模式匹配 ✅✔️虚拟线程 ✔️经典解析 JDK 8中推出了Lambda表达式、Stream、Optional、新的日期API等…...

力扣labuladong一刷day49天迪杰斯特拉

力扣labuladong一刷day49天迪杰斯特拉 文章目录 力扣labuladong一刷day49天迪杰斯特拉一、743. 网络延迟时间二、1631. 最小体力消耗路径三、1514. 概率最大的路径 一、743. 网络延迟时间 题目链接&#xff1a;https://leetcode.cn/problems/network-delay-time/ 使用迪杰斯特…...

MCS接口技术----定时/计数,中断

目录 一.中断系统相关寄存器 1.51单片机中断系统的总体结构&#xff1a; 2.中断源的中断级别&#xff08;由高到低&#xff09;&#xff1a; 3.与中断有关的四个寄存器&#xff1a; &#xff08;1&#xff09;TCON---定时控制寄存器 &#xff08;2&#xff09;IE---中断允…...

浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)

✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义&#xff08;Task Definition&…...

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明&#xff1a;假设每台服务器已…...

unix/linux,sudo,其发展历程详细时间线、由来、历史背景

sudo 的诞生和演化,本身就是一部 Unix/Linux 系统管理哲学变迁的微缩史。来,让我们拨开时间的迷雾,一同探寻 sudo 那波澜壮阔(也颇为实用主义)的发展历程。 历史背景:su的时代与困境 ( 20 世纪 70 年代 - 80 年代初) 在 sudo 出现之前,Unix 系统管理员和需要特权操作的…...

土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等

&#x1f50d; 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术&#xff0c;可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势&#xff0c;还能有效评价重大生态工程…...

什么是Ansible Jinja2

理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具&#xff0c;可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板&#xff0c;允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板&#xff0c;并通…...

C++课设:简易日历程序(支持传统节假日 + 二十四节气 + 个人纪念日管理)

名人说:路漫漫其修远兮,吾将上下而求索。—— 屈原《离骚》 创作者:Code_流苏(CSDN)(一个喜欢古诗词和编程的Coder😊) 专栏介绍:《编程项目实战》 目录 一、为什么要开发一个日历程序?1. 深入理解时间算法2. 练习面向对象设计3. 学习数据结构应用二、核心算法深度解析…...

热门Chrome扩展程序存在明文传输风险,用户隐私安全受威胁

赛门铁克威胁猎手团队最新报告披露&#xff0c;数款拥有数百万活跃用户的Chrome扩展程序正在通过未加密的HTTP连接静默泄露用户敏感数据&#xff0c;严重威胁用户隐私安全。 知名扩展程序存在明文传输风险 尽管宣称提供安全浏览、数据分析或便捷界面等功能&#xff0c;但SEMR…...

Windows电脑能装鸿蒙吗_Windows电脑体验鸿蒙电脑操作系统教程

鸿蒙电脑版操作系统来了&#xff0c;很多小伙伴想体验鸿蒙电脑版操作系统&#xff0c;可惜&#xff0c;鸿蒙系统并不支持你正在使用的传统的电脑来安装。不过可以通过可以使用华为官方提供的虚拟机&#xff0c;来体验大家心心念念的鸿蒙系统啦&#xff01;注意&#xff1a;虚拟…...

Java数组Arrays操作全攻略

Arrays类的概述 Java中的Arrays类位于java.util包中&#xff0c;提供了一系列静态方法用于操作数组&#xff08;如排序、搜索、填充、比较等&#xff09;。这些方法适用于基本类型数组和对象数组。 常用成员方法及代码示例 排序&#xff08;sort&#xff09; 对数组进行升序…...

动态规划-1035.不相交的线-力扣(LeetCode)

一、题目解析 光看题目要求和例图&#xff0c;感觉这题好麻烦&#xff0c;直线不能相交啊&#xff0c;每个数字只属于一条连线啊等等&#xff0c;但我们结合题目所给的信息和例图的内容&#xff0c;这不就是最长公共子序列吗&#xff1f;&#xff0c;我们把最长公共子序列连线起…...