计算机毕设 flink大数据淘宝用户行为数据实时分析与可视化
文章目录
- 0 前言
- 1、环境准备
- 1.1 flink 下载相关 jar 包
- 1.2 生成 kafka 数据
- 1.3 开发前的三个小 tip
- 2、flink-sql 客户端编写运行 sql
- 2.1 创建 kafka 数据源表
- 2.2 指标统计:每小时成交量
- 2.2.1 创建 es 结果表, 存放每小时的成交量
- 2.2.2 执行 sql ,统计每小时的成交量
- 2.3 指标统计:每10分钟累计独立用户数
- 2.3.1 创建 es 结果表,存放每10分钟累计独立用户数
- 2.3.2 创建视图
- 2.3.3 执行 sql ,统计每10分钟的累计独立用户数
- 2.4 指标统计:商品类目销量排行
- 2.4.1 创建商品类目维表
- 2.4.1 创建 es 结果表,存放商品类目排行表
- 2.4.2 创建视图
- 2.4.3 执行 sql , 统计商品类目销量排行
- 3、最终效果与体验心得
- 3.1 最终效果
- 3.2 体验心得
- 3.2.1 执行
- 3.2.2 存储
- 4 最后
0 前言
🔥 这两年开始毕业设计和毕业答辩的要求和难度不断提升,传统的毕设题目缺少创新和亮点,往往达不到毕业答辩的要求,这两年不断有学弟学妹告诉学长自己做的项目系统达不到老师的要求。
为了大家能够顺利以及最少的精力通过毕设,学长分享优质毕业设计项目,今天要分享的是
🚩 flink大数据淘宝用户行为数据实时分析与可视化
🥇学长这里给一个题目综合评分(每项满分5分)
- 难度系数:3分
- 工作量:3分
- 创新点:4分
1、环境准备
1.1 flink 下载相关 jar 包
flink-sql 连接外部系统时,需要依赖特定的 jar 包,所以需要事先把这些 jar 包准备好。说明与下载入口
本项目使用到了以下的 jar 包 ,下载后直接放在了 flink/lib 里面。
需要注意的是 flink-sql 执行时,是转化为 flink-job 提交到集群执行的,所以 flink 集群的每一台机器都要添加以下的 jar 包。
外部 | 版本 | jar |
---|---|---|
kafka | 4.1 | flink-sql-connector-kafka_2.11-1.10.2.jar flink-json-1.10.2-sql-jar.jar |
elasticsearch | 7.6 | flink-sql-connector-elasticsearch7_2.11-1.10.2.jar |
mysql | 5.7 | flink-jdbc_2.11-1.10.2.jar mysql-connector-java-8.0.11.jar |
1.2 生成 kafka 数据
用户行为数据来源: 阿里云天池公开数据集
网盘:https://pan.baidu.com/s/1wDVQpRV7giIlLJJgRZAInQ 提取码:gja5
商品类目纬度数据来源: category.sql
数据生成器:datagen.py
有了数据文件之后,使用 python 读取文件数据,然后并发写入到 kafka。
修改生成器中的 kafka 地址配置,然后运行 以下命令,开始不断往 kafka 写数据
# 5000 并发
nohup python3 datagen.py 5000 &
1.3 开发前的三个小 tip
-
生成器往 kafka 写数据,会自动创建主题,无需事先创建
-
flink 往 elasticsearch 写数据,会自动创建索引,无需事先创建
-
Kibana 使用索引模式从 Elasticsearch 索引中检索数据,以实现诸如可视化等功能。
使用的逻辑为:创建索引模式 》Discover (发现) 查看索引数据 》visualize(可视化)创建可视化图表》dashboards(仪表板)创建大屏,即汇总多个可视化的图表
2、flink-sql 客户端编写运行 sql
# 进入 flink-sql 客户端, 需要指定刚刚下载的 jar 包目录
./bin/sql-client.sh embedded -l lib
2.1 创建 kafka 数据源表
-- 创建 kafka 表, 读取 kafka 数据
CREATE TABLE user_behavior (user_id BIGINT,item_id BIGINT,category_id BIGINT,behavior STRING,ts TIMESTAMP(3),proctime as PROCTIME(),WATERMARK FOR ts as ts - INTERVAL '5' SECOND
) WITH ('connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'user_behavior', 'connector.startup-mode' = 'earliest-offset', 'connector.properties.zookeeper.connect' = '172.16.122.24:2181', 'connector.properties.bootstrap.servers' = '172.16.122.17:9092', 'format.type' = 'json'
);
SELECT * FROM user_behavior;
2.2 指标统计:每小时成交量
2.2.1 创建 es 结果表, 存放每小时的成交量
CREATE TABLE buy_cnt_per_hour (hour_of_day BIGINT,buy_cnt BIGINT
) WITH ('connector.type' = 'elasticsearch', 'connector.version' = '7', 'connector.hosts' = 'http://172.16.122.13:9200', 'connector.index' = 'buy_cnt_per_hour','connector.document-type' = 'user_behavior','connector.bulk-flush.max-actions' = '1','update-mode' = 'append','format.type' = 'json'
);
2.2.2 执行 sql ,统计每小时的成交量
INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);
2.3 指标统计:每10分钟累计独立用户数
2.3.1 创建 es 结果表,存放每10分钟累计独立用户数
CREATE TABLE cumulative_uv (time_str STRING,uv BIGINT
) WITH ('connector.type' = 'elasticsearch', 'connector.version' = '7', 'connector.hosts' = 'http://172.16.122.13:9200', 'connector.index' = 'cumulative_uv','connector.document-type' = 'user_behavior', 'update-mode' = 'upsert','format.type' = 'json'
);
2.3.2 创建视图
CREATE VIEW uv_per_10min AS
SELECTMAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str,COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
2.3.3 执行 sql ,统计每10分钟的累计独立用户数
INSERT INTO cumulative_uv
SELECT time_str, MAX(uv)
FROM uv_per_10min
GROUP BY time_str;
2.4 指标统计:商品类目销量排行
2.4.1 创建商品类目维表
先在 mysql 创建一张商品类目的维表,然后配置 flink 读取 mysql。
CREATE TABLE category_dim (sub_category_id BIGINT,parent_category_name STRING
) WITH ('connector.type' = 'jdbc','connector.url' = 'jdbc:mysql://172.16.122.25:3306/flink','connector.table' = 'category','connector.driver' = 'com.mysql.jdbc.Driver','connector.username' = 'root','connector.password' = 'root','connector.lookup.cache.max-rows' = '5000','connector.lookup.cache.ttl' = '10min'
);
2.4.1 创建 es 结果表,存放商品类目排行表
CREATE TABLE top_category (category_name STRING,buy_cnt BIGINT
) WITH ('connector.type' = 'elasticsearch', 'connector.version' = '7', 'connector.hosts' = 'http://172.16.122.13:9200', 'connector.index' = 'top_category','connector.document-type' = 'user_behavior','update-mode' = 'upsert','format.type' = 'json'
);
2.4.2 创建视图
CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior, C.parent_category_name as category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id;
2.4.3 执行 sql , 统计商品类目销量排行
INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;
3、最终效果与体验心得
3.1 最终效果
整个开发过程,只用到了 flink-sql ,无需写 java 或者其它代码,就完成了这样一个实时报表。
3.2 体验心得
3.2.1 执行
-
flink-sql 的 ddl 语句不会触发 flink-job , 同时创建的表、视图仅在会话级别有效。
-
对于连接表的 insert、select 等操作,则会触发相应的流 job, 并自动提交到 flink 集群,无限地运行下去,直到主动取消或者 job 报错。
-
flink-sql 客户端关闭后,对于已经提交到 flink 集群的 job 不会有任何影响。
本次开发,执行了 3 个 insert , 因此打开 flink 集群面板,可以看到有 3 个无限的流 job 。即使 kafka 数据全部写入完毕,关闭 flink-sql 客户端,这个 3 个 job 都不会停止。
3.2.2 存储
-
flnik 本身不存储业务数据,只作为流批一体的引擎存在,所以主要的用法为读取外部系统的数据,处理后,再写到外部系统。
-
flink 本身的元数据,包括表、函数等,默认情况下只是存放在内存里面,所以仅会话级别有效。但是,似乎可以存储到 Hive Metastore 中,关于这一点就留到以后再实践。
4 最后
相关文章:

计算机毕设 flink大数据淘宝用户行为数据实时分析与可视化
文章目录 0 前言1、环境准备1.1 flink 下载相关 jar 包1.2 生成 kafka 数据1.3 开发前的三个小 tip 2、flink-sql 客户端编写运行 sql2.1 创建 kafka 数据源表2.2 指标统计:每小时成交量2.2.1 创建 es 结果表, 存放每小时的成交量2.2.2 执行 sql &#x…...

8.2 矢量图层点要素单一符号使用一
文章目录 前言单一符号(Single symbol)渲染简单标记(Simple Marker)QGis代码实现 SVG标记(SVG marker)QGis代码实现 总结 前言 上一篇教程对矢量图层符号化做了一个整体介绍,并以点图层为例介绍了可以使用的渲染器&am…...
SQL企业微信群机器人消息推送
--参考资料地址 --微软官方地址: https://learn.microsoft.com/zh-cn/sql/relational-databases/system-stored-procedures/ole-automation-stored-procedures-transact-sql?view=sql-server-ver16 --腾讯官方地址:https://developer.work.weixin.qq.com/ --使…...

vscode远程连接ubuntu
修改环境变量,改使用git自带的ssh工具 openssh: C:\Windows\System32\OpenSSH\ssh.exeGit ssh: C:\Program Files\Git\usr\bin\ssh.exe vscode安装插件remote-ssh 重开软件,在左侧拓展入口下方,进入远程资源管理器 点击设置,进…...
Positive Technologies 在迪拜宣布与地区网络安全解决方案提供商开展合作
在中东最大的信息技术展 GITEX GLOBAL 2023 的间隙,Positive Technologies 同意与八家组织(网络安全服务和解决方案提供商)合作,在该地区开展合作,推广最先进的产品,并分享信息安全领域的经验。该公司强调了…...

Pyside6 QTextEdit
Pyside6 QTextEdit QTextEdit使用QTextEdit常用函数文本编辑类函数文本框格式设置函数设置文字颜色设置文字背景颜色设置文字格式设置文本框样式程序设置界面设置 QTextEdit信号textChanged信号 完整程序界面程序主程序 QTextEdit类提供了一个用于编辑和显示纯文本和富文本的组…...
Hadoop核心机制详细解析
Hadoop核心机制详细解析 Hadoop的核心机制是通过HDFS文件系统和MapReduce算法进行存储资源、内存和程序的有效利用与管理。在现实的实例中,通过Hadoop,可以轻易的将多台普通的或低性能的服务器组合成分布式的运算-存储集群,提供大数据量的存…...

Chromium源码由浅入深(一)
工作中需要对Chromium源码、尤其是源码中图形部分进行深入研究,所以借此机会边学习边写文章,分享一下我的实时学习研究Chromium源码的由浅入深的过程。 闲言少叙,书归正传。 通过命令行启动Chrome浏览器,命令及结果如下…...

Spring Authorization Server 1.1 扩展 OAuth2 密码模式与 Spring Cloud Gateway 整合实战
目录 前言无图无真相创建数据库授权服务器maven 依赖application.yml授权服务器配置AuthorizationServierConfigDefaultSecutiryConfig 密码模式扩展PasswordAuthenticationTokenPasswordAuthenticationConverterPasswordAuthenticationProvider JWT 自定义字段自定义认证响应认…...

UE4 UltraDynamicSky 天气与水体交互
最上面的Lerp的A通道为之前的水面效果,B是做的冰面效果 用Dynamic_Landscape_Weather_Effects的BaseColor的R通道四舍五入作为Lerp的Alpha值 使用一张贴图,乘以RadialGradientExponential对材质边缘做弱化,RadialGradientExponential的Raid…...
Liunx 实时调度策略 SCHED_RR SCHED_FIFO 区别 适用情况
SCHED_RR SCHED_FIFO 适用情况 SCHED_FIFO 先进先出调度。只能在静态优先级高于0的情况下使用,这意味着当 SCHED_FIFO 线程变得可运行时,它总是立即抢占当前正在运行的任何 SCHED_OTHER、SCHED_BATCH 或 SCHED_IDLE 线程。SCHED_FIFO 线程一直运行到被…...
mac上使用虚拟机vm, 里面的镜像挂起会占用电脑的内存吗, 挂起和关机的区别是什么, 会影响正常电脑的内存和硬盘使用吗
解释 在Mac(或任何其他操作系统)上使用虚拟机(如VMware Fusion、Parallels Desktop、VirtualBox等)时,“挂起”(Suspend)和“关机”(Power Off或Shut Down)是两种不同的虚…...

AIGC时代 浪潮信息积极推动存储产品创新
近几年,AIGC的兴起,进一步驱动了全闪、混闪等存储产品的创新,也为市场带来了新的机遇,对于厂家而言,也需要升级存储产品的容量、性能及功能,方能满足场景诉求。对此,浪潮信息面向AIGC应用场景打…...
【PG】PostgreSQL字符集
目录 设置字符集 1 设置集群默认的字符集编码 2 设置数据库的字符集编码 查看字符集 1 查看数据字符集编码 2 查看服务端字符集 3 查看客户端字符集 4 查看默认的排序规则和字符分类 被支持的字符集 PostgreSQL里面的字符集支持你能够以各种字符集存储文本,…...
力扣:137. 只出现一次的数字 II(Python3)
题目: 给你一个整数数组 nums ,除某个元素仅出现 一次 外,其余每个元素都恰出现 三次 。请你找出并返回那个只出现了一次的元素。 你必须设计并实现线性时间复杂度的算法且使用常数级空间来解决此问题。 来源:力扣(Lee…...

orb-slam3编译手册(Ubuntu20.04)
orb-slam3编译手册(Ubuntu20.04) 一、环境要求1.安装git2.安装g3.安装CMake4.安装vi编辑器 二、源代码下载三、依赖库下载1.Eigen安装2.Pangolin安装3.opencv安装4.安装Python & libssl-dev5.安装boost库 三、安装orb-slam3四、数据集下载及测试 写在…...

升级 Xcode 15模拟器 iOS 17.0 Simulator(21A328) 下载失败
升级 IDE Xcode 15 后本地模拟器 Simulator 全被清空,反复重新尝试 Get 下载频频因网络异常断开而导致失败 ... 注:通过 Get 方式下载一定要保证当前网络环境足够平稳,网络环境不好的情况下该方法几乎成不了 解决办法 Get 方式行不通可以尝试通过 官网 途径先下载 模拟器安装包…...

PHP 函数、PHP 简单后门
函数 基本结构 语法结构 function 函数名(形式参数1,形式参数2...){//函数体return 返回值 }定义并执行一个简单函数 // funtion.phpfunction test(){echo "This is function ".__FUNCTION__; }test();函数传参 // function.phpfunction add($x, $y){$sum $x …...

前端实现菜单按钮级权限
核心思想就是通过登录请求此用户对应的权限菜单,然后跳转首页,触发全局前置导航守卫,在全局导航守卫中通过 addRoute 添加动态路由进去。addRoute有一个需要注意的地方,就是我们添加完动态路由后,地址栏上立即访问添加…...

STM32:TTL串口调试
一.TTL串口概要 TTL只需要两个线就可以完成两个设备之间的双向通信,一个发送电平的I/O称之为TX,与另一个设备的接收I/O口RX相互连接。两设备之间还需要连接地线(GND),这样两设备就有相同的0V参考电势。 二.TTL串口调试 实现电脑通过STM32发送…...

网络六边形受到攻击
大家读完觉得有帮助记得关注和点赞!!! 抽象 现代智能交通系统 (ITS) 的一个关键要求是能够以安全、可靠和匿名的方式从互联车辆和移动设备收集地理参考数据。Nexagon 协议建立在 IETF 定位器/ID 分离协议 (…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(二)
HoST框架核心实现方法详解 - 论文深度解读(第二部分) 《Learning Humanoid Standing-up Control across Diverse Postures》 系列文章: 论文深度解读 + 算法与代码分析(二) 作者机构: 上海AI Lab, 上海交通大学, 香港大学, 浙江大学, 香港中文大学 论文主题: 人形机器人…...

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility
Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...
基于数字孪生的水厂可视化平台建设:架构与实践
分享大纲: 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年,数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段,基于数字孪生的水厂可视化平台的…...
解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错
出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上,所以报错,到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本,cu、torch、cp 的版本一定要对…...

12.找到字符串中所有字母异位词
🧠 题目解析 题目描述: 给定两个字符串 s 和 p,找出 s 中所有 p 的字母异位词的起始索引。 返回的答案以数组形式表示。 字母异位词定义: 若两个字符串包含的字符种类和出现次数完全相同,顺序无所谓,则互为…...

CMake 从 GitHub 下载第三方库并使用
有时我们希望直接使用 GitHub 上的开源库,而不想手动下载、编译和安装。 可以利用 CMake 提供的 FetchContent 模块来实现自动下载、构建和链接第三方库。 FetchContent 命令官方文档✅ 示例代码 我们将以 fmt 这个流行的格式化库为例,演示如何: 使用 FetchContent 从 GitH…...

自然语言处理——循环神经网络
自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元(GRU)长短期记忆神经网络(LSTM)…...
Element Plus 表单(el-form)中关于正整数输入的校验规则
目录 1 单个正整数输入1.1 模板1.2 校验规则 2 两个正整数输入(联动)2.1 模板2.2 校验规则2.3 CSS 1 单个正整数输入 1.1 模板 <el-formref"formRef":model"formData":rules"formRules"label-width"150px"…...

保姆级教程:在无网络无显卡的Windows电脑的vscode本地部署deepseek
文章目录 1 前言2 部署流程2.1 准备工作2.2 Ollama2.2.1 使用有网络的电脑下载Ollama2.2.2 安装Ollama(有网络的电脑)2.2.3 安装Ollama(无网络的电脑)2.2.4 安装验证2.2.5 修改大模型安装位置2.2.6 下载Deepseek模型 2.3 将deepse…...