大数据毕设分享 flink大数据淘宝用户行为数据实时分析与可视化
文章目录
- 0 前言
- 1、环境准备
- 1.1 flink 下载相关 jar 包
- 1.2 生成 kafka 数据
- 1.3 开发前的三个小 tip
- 2、flink-sql 客户端编写运行 sql
- 2.1 创建 kafka 数据源表
- 2.2 指标统计:每小时成交量
- 2.2.1 创建 es 结果表, 存放每小时的成交量
- 2.2.2 执行 sql ,统计每小时的成交量
- 2.3 指标统计:每10分钟累计独立用户数
- 2.3.1 创建 es 结果表,存放每10分钟累计独立用户数
- 2.3.2 创建视图
- 2.3.3 执行 sql ,统计每10分钟的累计独立用户数
- 2.4 指标统计:商品类目销量排行
- 2.4.1 创建商品类目维表
- 2.4.1 创建 es 结果表,存放商品类目排行表
- 2.4.2 创建视图
- 2.4.3 执行 sql , 统计商品类目销量排行
- 3、最终效果与体验心得
- 3.1 最终效果
- 3.2 体验心得
- 3.2.1 执行
- 3.2.2 存储
- 4 最后
0 前言
🔥 这两年开始毕业设计和毕业答辩的要求和难度不断提升,传统的毕设题目缺少创新和亮点,往往达不到毕业答辩的要求,这两年不断有学弟学妹告诉学长自己做的项目系统达不到老师的要求。
为了大家能够顺利以及最少的精力通过毕设,学长分享优质毕业设计项目,今天要分享的是
🚩 flink大数据淘宝用户行为数据实时分析与可视化
🥇学长这里给一个题目综合评分(每项满分5分)
- 难度系数:3分
- 工作量:3分
- 创新点:4分
1、环境准备
1.1 flink 下载相关 jar 包
flink-sql 连接外部系统时,需要依赖特定的 jar 包,所以需要事先把这些 jar 包准备好。说明与下载入口
本项目使用到了以下的 jar 包 ,下载后直接放在了 flink/lib 里面。
需要注意的是 flink-sql 执行时,是转化为 flink-job 提交到集群执行的,所以 flink 集群的每一台机器都要添加以下的 jar 包。
外部 | 版本 | jar |
---|---|---|
kafka | 4.1 | flink-sql-connector-kafka_2.11-1.10.2.jar flink-json-1.10.2-sql-jar.jar |
elasticsearch | 7.6 | flink-sql-connector-elasticsearch7_2.11-1.10.2.jar |
mysql | 5.7 | flink-jdbc_2.11-1.10.2.jar mysql-connector-java-8.0.11.jar |
1.2 生成 kafka 数据
用户行为数据来源: 阿里云天池公开数据集
网盘:https://pan.baidu.com/s/1wDVQpRV7giIlLJJgRZAInQ 提取码:gja5
商品类目纬度数据来源: category.sql
数据生成器:datagen.py
有了数据文件之后,使用 python 读取文件数据,然后并发写入到 kafka。
修改生成器中的 kafka 地址配置,然后运行 以下命令,开始不断往 kafka 写数据
# 5000 并发
nohup python3 datagen.py 5000 &
1.3 开发前的三个小 tip
-
生成器往 kafka 写数据,会自动创建主题,无需事先创建
-
flink 往 elasticsearch 写数据,会自动创建索引,无需事先创建
-
Kibana 使用索引模式从 Elasticsearch 索引中检索数据,以实现诸如可视化等功能。
使用的逻辑为:创建索引模式 》Discover (发现) 查看索引数据 》visualize(可视化)创建可视化图表》dashboards(仪表板)创建大屏,即汇总多个可视化的图表
2、flink-sql 客户端编写运行 sql
# 进入 flink-sql 客户端, 需要指定刚刚下载的 jar 包目录
./bin/sql-client.sh embedded -l lib
2.1 创建 kafka 数据源表
-- 创建 kafka 表, 读取 kafka 数据
CREATE TABLE user_behavior (user_id BIGINT,item_id BIGINT,category_id BIGINT,behavior STRING,ts TIMESTAMP(3),proctime as PROCTIME(),WATERMARK FOR ts as ts - INTERVAL '5' SECOND
) WITH ('connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'user_behavior', 'connector.startup-mode' = 'earliest-offset', 'connector.properties.zookeeper.connect' = '172.16.122.24:2181', 'connector.properties.bootstrap.servers' = '172.16.122.17:9092', 'format.type' = 'json'
);
SELECT * FROM user_behavior;
2.2 指标统计:每小时成交量
2.2.1 创建 es 结果表, 存放每小时的成交量
CREATE TABLE buy_cnt_per_hour (hour_of_day BIGINT,buy_cnt BIGINT
) WITH ('connector.type' = 'elasticsearch', 'connector.version' = '7', 'connector.hosts' = 'http://172.16.122.13:9200', 'connector.index' = 'buy_cnt_per_hour','connector.document-type' = 'user_behavior','connector.bulk-flush.max-actions' = '1','update-mode' = 'append','format.type' = 'json'
);
2.2.2 执行 sql ,统计每小时的成交量
INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);
2.3 指标统计:每10分钟累计独立用户数
2.3.1 创建 es 结果表,存放每10分钟累计独立用户数
CREATE TABLE cumulative_uv (time_str STRING,uv BIGINT
) WITH ('connector.type' = 'elasticsearch', 'connector.version' = '7', 'connector.hosts' = 'http://172.16.122.13:9200', 'connector.index' = 'cumulative_uv','connector.document-type' = 'user_behavior', 'update-mode' = 'upsert','format.type' = 'json'
);
2.3.2 创建视图
CREATE VIEW uv_per_10min AS
SELECTMAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str,COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
2.3.3 执行 sql ,统计每10分钟的累计独立用户数
INSERT INTO cumulative_uv
SELECT time_str, MAX(uv)
FROM uv_per_10min
GROUP BY time_str;
2.4 指标统计:商品类目销量排行
2.4.1 创建商品类目维表
先在 mysql 创建一张商品类目的维表,然后配置 flink 读取 mysql。
CREATE TABLE category_dim (sub_category_id BIGINT,parent_category_name STRING
) WITH ('connector.type' = 'jdbc','connector.url' = 'jdbc:mysql://172.16.122.25:3306/flink','connector.table' = 'category','connector.driver' = 'com.mysql.jdbc.Driver','connector.username' = 'root','connector.password' = 'root','connector.lookup.cache.max-rows' = '5000','connector.lookup.cache.ttl' = '10min'
);
2.4.1 创建 es 结果表,存放商品类目排行表
CREATE TABLE top_category (category_name STRING,buy_cnt BIGINT
) WITH ('connector.type' = 'elasticsearch', 'connector.version' = '7', 'connector.hosts' = 'http://172.16.122.13:9200', 'connector.index' = 'top_category','connector.document-type' = 'user_behavior','update-mode' = 'upsert','format.type' = 'json'
);
2.4.2 创建视图
CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior, C.parent_category_name as category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id;
2.4.3 执行 sql , 统计商品类目销量排行
INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;
3、最终效果与体验心得
3.1 最终效果
整个开发过程,只用到了 flink-sql ,无需写 java 或者其它代码,就完成了这样一个实时报表。
3.2 体验心得
3.2.1 执行
-
flink-sql 的 ddl 语句不会触发 flink-job , 同时创建的表、视图仅在会话级别有效。
-
对于连接表的 insert、select 等操作,则会触发相应的流 job, 并自动提交到 flink 集群,无限地运行下去,直到主动取消或者 job 报错。
-
flink-sql 客户端关闭后,对于已经提交到 flink 集群的 job 不会有任何影响。
本次开发,执行了 3 个 insert , 因此打开 flink 集群面板,可以看到有 3 个无限的流 job 。即使 kafka 数据全部写入完毕,关闭 flink-sql 客户端,这个 3 个 job 都不会停止。
3.2.2 存储
-
flnik 本身不存储业务数据,只作为流批一体的引擎存在,所以主要的用法为读取外部系统的数据,处理后,再写到外部系统。
-
flink 本身的元数据,包括表、函数等,默认情况下只是存放在内存里面,所以仅会话级别有效。但是,似乎可以存储到 Hive Metastore 中,关于这一点就留到以后再实践。
4 最后
相关文章:

大数据毕设分享 flink大数据淘宝用户行为数据实时分析与可视化
文章目录 0 前言1、环境准备1.1 flink 下载相关 jar 包1.2 生成 kafka 数据1.3 开发前的三个小 tip 2、flink-sql 客户端编写运行 sql2.1 创建 kafka 数据源表2.2 指标统计:每小时成交量2.2.1 创建 es 结果表, 存放每小时的成交量2.2.2 执行 sql &#x…...
大语言模型训练数据集
大语言模型的数据集有很多,以下是一些常用的: - 中文维基百科:这是一个包含大量中文文本的数据集,可用于训练中文语言模型。 - 英文维基百科:这是一个包含大量英文文本的数据集,可用于训练英文语言模型。 …...

python的课后练习总结4(while循环)
for循环用于针对序列中的每个元素的一个代码块。 while循环是不断的运行,直到指定的条件不满足为止。 while 条件: 条件成立重复执行的代码1 条件成立重复执行的代码2 …….. i 1while i < 5:print(i)i i 11、使用wh…...

Flink Connector 开发
Flink Streaming Connector Flink是新一代流批统一的计算引擎,它需要从不同的第三方存储引擎中把数据读过来,进行处理,然后再写出到另外的存储引擎中。Connector的作用就相当于一个连接器,连接Flink计算引擎跟外界存储系统。Flin…...

Golang leetcode707 设计链表 (链表大成)
文章目录 设计链表 Leetcode707不使用头节点使用头节点 推荐** 设计链表 Leetcode707 题目要求我们通过实现几个方法来完成对链表的各个操作 由于在go语言中都为值传递,(注意这里与值类型、引用类型的而区别),所以即使我们直接在…...

Django和Vue项目运行过程中遇到的问题及解决办法
这是我从CSDN上边买来的一个系统的资源,准备在此基础上改成自己的系统,但是在运行项目这一步上都把自己难为了好几天,经过不断的摸索,终于完成了第一步!!! 如果大家也遇到同样的问题࿰…...

Single-Image Crowd Counting via Multi-Column Convolutional Neural Network
Single-Image Crowd Counting via Multi-Column Convolutional Neural Network 论文背景人群密度方法过去的发展历史早期方法基于轨迹聚类的方法基于特征回归的方法基于图像的方法 Multi-column CNN用于人群计数基于密度图的人群计数通过几何自适应核生成密度图密度图估计的多列…...

el-cascader隐藏某一级的勾选框及vue报错Error in callback for watcher “options“的解决办法
今天用到饿了么的级联选择器时出现了这个报错Error in callback for watcher “options“: “TypeError: Cannot read propertie ‘level‘ of null,因为需求是在不同类型 el-cascader多选的时候默认是可以勾选所有级的选项的,如下图: 包含级联cascader的options、select的…...

2024美赛数学建模思路A题B题C题D题E题F题思路汇总 选题分析
文章目录 1 赛题思路2 美赛比赛日期和时间3 赛题类型4 美赛常见数模问题5 建模资料 1 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 2 美赛比赛日期和时间 比赛开始时间:北京时间2024年2月2日(周五ÿ…...

C++ 常用设计模式
一、工厂模式 from:C开发常用的设计模式及其实现详解 - 知乎 摘抄: 简单工厂、工厂、抽象工厂: 简单工厂需要工厂内部判断,而工厂模式不需要修改工厂类: 抽象工厂: 接上图: 未完待续.........

高性价比的高速吹风机/高速风筒解决方案,基于普冉单片机开发
高速吹风机是近些年非常火的一款产品,快速崛起并颠覆了传统吹风机,高速吹风机也成为了传统吹风机替代的一个大趋势。高速吹风机是利用高转速产生的大风量来快速吹干头发,由于其精巧的外观设计、超低的噪声、出色的干发效果,高速吹…...
toRefs的用法
文章目录 toRefs是什么toRefs的作用以及为什么要用它? toRefs是什么 toRefs 是 Vue 3 Composition API 中的一个函数,它用于将响应式对象转换为普通对象,其中对象的每个属性都是 ref 对象。这是因为在 Vue 3 中,reactive 创建的对…...

MySQL基础篇(三)约束
一、概述 概念:约束是作用于表中字段上的规则,用于限制存储在表中的数据。 目的:保证数据库中数据的正确、有效性和完整性。 分类: 注意:约束是作用于表中字段上的,可以在创建表/修改表的时候添加约束。 二…...

Java进阶 1-2 枚举
目录 常量特定方法 职责链模式的枚举实现 状态机模式的枚举实现 多路分发 1、使用枚举类型实现分发 2、使用常量特定方法实现分发 3、使用EnumMap实现分发 4、使用二维数组实现分发 本笔记参考自: 《On Java 中文版》 常量特定方法 在Java中,我们…...
一个人最大的内驱力是什么?
1、不因为孤独或外界压力而降低「生活标准“」的能力。 ”因为寂寞去约炮“、“因为家里催婚匆忙结婚“、”因为没谈过恋爱随便找个人交往。 “你的每一次选择都是在为自己想要的世界而投的票,往后余生是幸福还是悲剧,就是在这一次次 的将就与坚持死磕中…...
解决方法:公众号的API上传素材报错40005
公众号的API上传素材报错40005 Error uploading file : {"errcode":40005,"errmsg":"invalid file type hint: [YOkxGA0122w487] rid: 223442-323247e7bd5-5d75322d88"}上传错误原因分析: 之前成功的示例,文件名为"…...

音量控制软件sound control mac功能亮点
sound control mac可以帮助用户控制某个独立应用程序的音量,通过每应用音量,均衡器,平衡和音频路由独立控制每个应用的音频,还有整个系统的音量。 sound control mac功能亮点 每个应用程序的音量控制 独立控制应用的数量。 键盘音…...
Spring Boot 生产就绪中文文档-下
本文为官方文档直译版本。原文链接 由于篇幅较长,遂分两篇。上半部分中文文档 Spring Boot 生产就绪中文文档-下 度量标准入门受支持的监控系统AppOpticsAtlasDatadogDynatracev2 API自动配置手动配置 v1 API (旧版)与版本无关的设置 ElasticGangliaGraphiteHumioIn…...
DS|树结构及应用
题目一:DS树 -- 树的先根遍历(双亲转先序) 题目描述: 给出一棵树的双亲表示法结果,用一个二维数组表示,位置下标从0开始,如果双亲位置为-1则表示该结点为根结点 编写程序,输出该树…...

Java 读取超大excel文件
注意:此参考解决方案只是针对xlsx格式的excel文件! Maven <dependency><groupId>com.monitorjbl</groupId><artifactId>xlsx-streamer</artifactId><version>2.2.0</version> </dependency>读取方式1…...

理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端
🌟 什么是 MCP? 模型控制协议 (MCP) 是一种创新的协议,旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议,它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...

相机从app启动流程
一、流程框架图 二、具体流程分析 1、得到cameralist和对应的静态信息 目录如下: 重点代码分析: 启动相机前,先要通过getCameraIdList获取camera的个数以及id,然后可以通过getCameraCharacteristics获取对应id camera的capabilities(静态信息)进行一些openCamera前的…...
【AI学习】三、AI算法中的向量
在人工智能(AI)算法中,向量(Vector)是一种将现实世界中的数据(如图像、文本、音频等)转化为计算机可处理的数值型特征表示的工具。它是连接人类认知(如语义、视觉特征)与…...
Python如何给视频添加音频和字幕
在Python中,给视频添加音频和字幕可以使用电影文件处理库MoviePy和字幕处理库Subtitles。下面将详细介绍如何使用这些库来实现视频的音频和字幕添加,包括必要的代码示例和详细解释。 环境准备 在开始之前,需要安装以下Python库:…...
【C语言练习】080. 使用C语言实现简单的数据库操作
080. 使用C语言实现简单的数据库操作 080. 使用C语言实现简单的数据库操作使用原生APIODBC接口第三方库ORM框架文件模拟1. 安装SQLite2. 示例代码:使用SQLite创建数据库、表和插入数据3. 编译和运行4. 示例运行输出:5. 注意事项6. 总结080. 使用C语言实现简单的数据库操作 在…...
DeepSeek 技术赋能无人农场协同作业:用 AI 重构农田管理 “神经网”
目录 一、引言二、DeepSeek 技术大揭秘2.1 核心架构解析2.2 关键技术剖析 三、智能农业无人农场协同作业现状3.1 发展现状概述3.2 协同作业模式介绍 四、DeepSeek 的 “农场奇妙游”4.1 数据处理与分析4.2 作物生长监测与预测4.3 病虫害防治4.4 农机协同作业调度 五、实际案例大…...

AD学习(3)
1 PCB封装元素组成及简单的PCB封装创建 封装的组成部分: (1)PCB焊盘:表层的铜 ,top层的铜 (2)管脚序号:用来关联原理图中的管脚的序号,原理图的序号需要和PCB封装一一…...

Linux基础开发工具——vim工具
文章目录 vim工具什么是vimvim的多模式和使用vim的基础模式vim的三种基础模式三种模式的初步了解 常用模式的详细讲解插入模式命令模式模式转化光标的移动文本的编辑 底行模式替换模式视图模式总结 使用vim的小技巧vim的配置(了解) vim工具 本文章仍然是继续讲解Linux系统下的…...

智警杯备赛--excel模块
数据透视与图表制作 创建步骤 创建 1.在Excel的插入或者数据标签页下找到数据透视表的按钮 2.将数据放进“请选择单元格区域“中,点击确定 这是最终结果,但是由于环境启不了,这里用的是自己的excel,真实的环境中的excel根据实训…...

第2课 SiC MOSFET与 Si IGBT 静态特性对比
2.1 输出特性对比 2.2 转移特性对比 2.1 输出特性对比 器件的输出特性描述了当温度和栅源电压(栅射电压)为某一具体数值时,漏极电流(集电极电流...