大数据毕设分享 flink大数据淘宝用户行为数据实时分析与可视化
文章目录
- 0 前言
- 1、环境准备
- 1.1 flink 下载相关 jar 包
- 1.2 生成 kafka 数据
- 1.3 开发前的三个小 tip
- 2、flink-sql 客户端编写运行 sql
- 2.1 创建 kafka 数据源表
- 2.2 指标统计:每小时成交量
- 2.2.1 创建 es 结果表, 存放每小时的成交量
- 2.2.2 执行 sql ,统计每小时的成交量
- 2.3 指标统计:每10分钟累计独立用户数
- 2.3.1 创建 es 结果表,存放每10分钟累计独立用户数
- 2.3.2 创建视图
- 2.3.3 执行 sql ,统计每10分钟的累计独立用户数
- 2.4 指标统计:商品类目销量排行
- 2.4.1 创建商品类目维表
- 2.4.1 创建 es 结果表,存放商品类目排行表
- 2.4.2 创建视图
- 2.4.3 执行 sql , 统计商品类目销量排行
- 3、最终效果与体验心得
- 3.1 最终效果
- 3.2 体验心得
- 3.2.1 执行
- 3.2.2 存储
- 4 最后
0 前言
🔥 这两年开始毕业设计和毕业答辩的要求和难度不断提升,传统的毕设题目缺少创新和亮点,往往达不到毕业答辩的要求,这两年不断有学弟学妹告诉学长自己做的项目系统达不到老师的要求。
为了大家能够顺利以及最少的精力通过毕设,学长分享优质毕业设计项目,今天要分享的是
🚩 flink大数据淘宝用户行为数据实时分析与可视化
🥇学长这里给一个题目综合评分(每项满分5分)
- 难度系数:3分
- 工作量:3分
- 创新点:4分
1、环境准备
1.1 flink 下载相关 jar 包
flink-sql 连接外部系统时,需要依赖特定的 jar 包,所以需要事先把这些 jar 包准备好。说明与下载入口
本项目使用到了以下的 jar 包 ,下载后直接放在了 flink/lib 里面。
需要注意的是 flink-sql 执行时,是转化为 flink-job 提交到集群执行的,所以 flink 集群的每一台机器都要添加以下的 jar 包。
| 外部 | 版本 | jar |
|---|---|---|
| kafka | 4.1 | flink-sql-connector-kafka_2.11-1.10.2.jar flink-json-1.10.2-sql-jar.jar |
| elasticsearch | 7.6 | flink-sql-connector-elasticsearch7_2.11-1.10.2.jar |
| mysql | 5.7 | flink-jdbc_2.11-1.10.2.jar mysql-connector-java-8.0.11.jar |
1.2 生成 kafka 数据
用户行为数据来源: 阿里云天池公开数据集
网盘:https://pan.baidu.com/s/1wDVQpRV7giIlLJJgRZAInQ 提取码:gja5
商品类目纬度数据来源: category.sql
数据生成器:datagen.py
有了数据文件之后,使用 python 读取文件数据,然后并发写入到 kafka。
修改生成器中的 kafka 地址配置,然后运行 以下命令,开始不断往 kafka 写数据
# 5000 并发
nohup python3 datagen.py 5000 &
1.3 开发前的三个小 tip
-
生成器往 kafka 写数据,会自动创建主题,无需事先创建
-
flink 往 elasticsearch 写数据,会自动创建索引,无需事先创建
-
Kibana 使用索引模式从 Elasticsearch 索引中检索数据,以实现诸如可视化等功能。
使用的逻辑为:创建索引模式 》Discover (发现) 查看索引数据 》visualize(可视化)创建可视化图表》dashboards(仪表板)创建大屏,即汇总多个可视化的图表
2、flink-sql 客户端编写运行 sql
# 进入 flink-sql 客户端, 需要指定刚刚下载的 jar 包目录
./bin/sql-client.sh embedded -l lib
2.1 创建 kafka 数据源表
-- 创建 kafka 表, 读取 kafka 数据
CREATE TABLE user_behavior (user_id BIGINT,item_id BIGINT,category_id BIGINT,behavior STRING,ts TIMESTAMP(3),proctime as PROCTIME(),WATERMARK FOR ts as ts - INTERVAL '5' SECOND
) WITH ('connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'user_behavior', 'connector.startup-mode' = 'earliest-offset', 'connector.properties.zookeeper.connect' = '172.16.122.24:2181', 'connector.properties.bootstrap.servers' = '172.16.122.17:9092', 'format.type' = 'json'
);
SELECT * FROM user_behavior;
2.2 指标统计:每小时成交量
2.2.1 创建 es 结果表, 存放每小时的成交量
CREATE TABLE buy_cnt_per_hour (hour_of_day BIGINT,buy_cnt BIGINT
) WITH ('connector.type' = 'elasticsearch', 'connector.version' = '7', 'connector.hosts' = 'http://172.16.122.13:9200', 'connector.index' = 'buy_cnt_per_hour','connector.document-type' = 'user_behavior','connector.bulk-flush.max-actions' = '1','update-mode' = 'append','format.type' = 'json'
);
2.2.2 执行 sql ,统计每小时的成交量
INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);
2.3 指标统计:每10分钟累计独立用户数
2.3.1 创建 es 结果表,存放每10分钟累计独立用户数
CREATE TABLE cumulative_uv (time_str STRING,uv BIGINT
) WITH ('connector.type' = 'elasticsearch', 'connector.version' = '7', 'connector.hosts' = 'http://172.16.122.13:9200', 'connector.index' = 'cumulative_uv','connector.document-type' = 'user_behavior', 'update-mode' = 'upsert','format.type' = 'json'
);
2.3.2 创建视图
CREATE VIEW uv_per_10min AS
SELECTMAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str,COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
2.3.3 执行 sql ,统计每10分钟的累计独立用户数
INSERT INTO cumulative_uv
SELECT time_str, MAX(uv)
FROM uv_per_10min
GROUP BY time_str;
2.4 指标统计:商品类目销量排行
2.4.1 创建商品类目维表
先在 mysql 创建一张商品类目的维表,然后配置 flink 读取 mysql。
CREATE TABLE category_dim (sub_category_id BIGINT,parent_category_name STRING
) WITH ('connector.type' = 'jdbc','connector.url' = 'jdbc:mysql://172.16.122.25:3306/flink','connector.table' = 'category','connector.driver' = 'com.mysql.jdbc.Driver','connector.username' = 'root','connector.password' = 'root','connector.lookup.cache.max-rows' = '5000','connector.lookup.cache.ttl' = '10min'
);
2.4.1 创建 es 结果表,存放商品类目排行表
CREATE TABLE top_category (category_name STRING,buy_cnt BIGINT
) WITH ('connector.type' = 'elasticsearch', 'connector.version' = '7', 'connector.hosts' = 'http://172.16.122.13:9200', 'connector.index' = 'top_category','connector.document-type' = 'user_behavior','update-mode' = 'upsert','format.type' = 'json'
);
2.4.2 创建视图
CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior, C.parent_category_name as category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id;
2.4.3 执行 sql , 统计商品类目销量排行
INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;
3、最终效果与体验心得
3.1 最终效果
整个开发过程,只用到了 flink-sql ,无需写 java 或者其它代码,就完成了这样一个实时报表。

3.2 体验心得
3.2.1 执行
-
flink-sql 的 ddl 语句不会触发 flink-job , 同时创建的表、视图仅在会话级别有效。
-
对于连接表的 insert、select 等操作,则会触发相应的流 job, 并自动提交到 flink 集群,无限地运行下去,直到主动取消或者 job 报错。
-
flink-sql 客户端关闭后,对于已经提交到 flink 集群的 job 不会有任何影响。
本次开发,执行了 3 个 insert , 因此打开 flink 集群面板,可以看到有 3 个无限的流 job 。即使 kafka 数据全部写入完毕,关闭 flink-sql 客户端,这个 3 个 job 都不会停止。
3.2.2 存储
-
flnik 本身不存储业务数据,只作为流批一体的引擎存在,所以主要的用法为读取外部系统的数据,处理后,再写到外部系统。
-
flink 本身的元数据,包括表、函数等,默认情况下只是存放在内存里面,所以仅会话级别有效。但是,似乎可以存储到 Hive Metastore 中,关于这一点就留到以后再实践。
4 最后
相关文章:
大数据毕设分享 flink大数据淘宝用户行为数据实时分析与可视化
文章目录 0 前言1、环境准备1.1 flink 下载相关 jar 包1.2 生成 kafka 数据1.3 开发前的三个小 tip 2、flink-sql 客户端编写运行 sql2.1 创建 kafka 数据源表2.2 指标统计:每小时成交量2.2.1 创建 es 结果表, 存放每小时的成交量2.2.2 执行 sql &#x…...
大语言模型训练数据集
大语言模型的数据集有很多,以下是一些常用的: - 中文维基百科:这是一个包含大量中文文本的数据集,可用于训练中文语言模型。 - 英文维基百科:这是一个包含大量英文文本的数据集,可用于训练英文语言模型。 …...
python的课后练习总结4(while循环)
for循环用于针对序列中的每个元素的一个代码块。 while循环是不断的运行,直到指定的条件不满足为止。 while 条件: 条件成立重复执行的代码1 条件成立重复执行的代码2 …….. i 1while i < 5:print(i)i i 11、使用wh…...
Flink Connector 开发
Flink Streaming Connector Flink是新一代流批统一的计算引擎,它需要从不同的第三方存储引擎中把数据读过来,进行处理,然后再写出到另外的存储引擎中。Connector的作用就相当于一个连接器,连接Flink计算引擎跟外界存储系统。Flin…...
Golang leetcode707 设计链表 (链表大成)
文章目录 设计链表 Leetcode707不使用头节点使用头节点 推荐** 设计链表 Leetcode707 题目要求我们通过实现几个方法来完成对链表的各个操作 由于在go语言中都为值传递,(注意这里与值类型、引用类型的而区别),所以即使我们直接在…...
Django和Vue项目运行过程中遇到的问题及解决办法
这是我从CSDN上边买来的一个系统的资源,准备在此基础上改成自己的系统,但是在运行项目这一步上都把自己难为了好几天,经过不断的摸索,终于完成了第一步!!! 如果大家也遇到同样的问题࿰…...
Single-Image Crowd Counting via Multi-Column Convolutional Neural Network
Single-Image Crowd Counting via Multi-Column Convolutional Neural Network 论文背景人群密度方法过去的发展历史早期方法基于轨迹聚类的方法基于特征回归的方法基于图像的方法 Multi-column CNN用于人群计数基于密度图的人群计数通过几何自适应核生成密度图密度图估计的多列…...
el-cascader隐藏某一级的勾选框及vue报错Error in callback for watcher “options“的解决办法
今天用到饿了么的级联选择器时出现了这个报错Error in callback for watcher “options“: “TypeError: Cannot read propertie ‘level‘ of null,因为需求是在不同类型 el-cascader多选的时候默认是可以勾选所有级的选项的,如下图: 包含级联cascader的options、select的…...
2024美赛数学建模思路A题B题C题D题E题F题思路汇总 选题分析
文章目录 1 赛题思路2 美赛比赛日期和时间3 赛题类型4 美赛常见数模问题5 建模资料 1 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 2 美赛比赛日期和时间 比赛开始时间:北京时间2024年2月2日(周五ÿ…...
C++ 常用设计模式
一、工厂模式 from:C开发常用的设计模式及其实现详解 - 知乎 摘抄: 简单工厂、工厂、抽象工厂: 简单工厂需要工厂内部判断,而工厂模式不需要修改工厂类: 抽象工厂: 接上图: 未完待续.........
高性价比的高速吹风机/高速风筒解决方案,基于普冉单片机开发
高速吹风机是近些年非常火的一款产品,快速崛起并颠覆了传统吹风机,高速吹风机也成为了传统吹风机替代的一个大趋势。高速吹风机是利用高转速产生的大风量来快速吹干头发,由于其精巧的外观设计、超低的噪声、出色的干发效果,高速吹…...
toRefs的用法
文章目录 toRefs是什么toRefs的作用以及为什么要用它? toRefs是什么 toRefs 是 Vue 3 Composition API 中的一个函数,它用于将响应式对象转换为普通对象,其中对象的每个属性都是 ref 对象。这是因为在 Vue 3 中,reactive 创建的对…...
MySQL基础篇(三)约束
一、概述 概念:约束是作用于表中字段上的规则,用于限制存储在表中的数据。 目的:保证数据库中数据的正确、有效性和完整性。 分类: 注意:约束是作用于表中字段上的,可以在创建表/修改表的时候添加约束。 二…...
Java进阶 1-2 枚举
目录 常量特定方法 职责链模式的枚举实现 状态机模式的枚举实现 多路分发 1、使用枚举类型实现分发 2、使用常量特定方法实现分发 3、使用EnumMap实现分发 4、使用二维数组实现分发 本笔记参考自: 《On Java 中文版》 常量特定方法 在Java中,我们…...
一个人最大的内驱力是什么?
1、不因为孤独或外界压力而降低「生活标准“」的能力。 ”因为寂寞去约炮“、“因为家里催婚匆忙结婚“、”因为没谈过恋爱随便找个人交往。 “你的每一次选择都是在为自己想要的世界而投的票,往后余生是幸福还是悲剧,就是在这一次次 的将就与坚持死磕中…...
解决方法:公众号的API上传素材报错40005
公众号的API上传素材报错40005 Error uploading file : {"errcode":40005,"errmsg":"invalid file type hint: [YOkxGA0122w487] rid: 223442-323247e7bd5-5d75322d88"}上传错误原因分析: 之前成功的示例,文件名为"…...
音量控制软件sound control mac功能亮点
sound control mac可以帮助用户控制某个独立应用程序的音量,通过每应用音量,均衡器,平衡和音频路由独立控制每个应用的音频,还有整个系统的音量。 sound control mac功能亮点 每个应用程序的音量控制 独立控制应用的数量。 键盘音…...
Spring Boot 生产就绪中文文档-下
本文为官方文档直译版本。原文链接 由于篇幅较长,遂分两篇。上半部分中文文档 Spring Boot 生产就绪中文文档-下 度量标准入门受支持的监控系统AppOpticsAtlasDatadogDynatracev2 API自动配置手动配置 v1 API (旧版)与版本无关的设置 ElasticGangliaGraphiteHumioIn…...
DS|树结构及应用
题目一:DS树 -- 树的先根遍历(双亲转先序) 题目描述: 给出一棵树的双亲表示法结果,用一个二维数组表示,位置下标从0开始,如果双亲位置为-1则表示该结点为根结点 编写程序,输出该树…...
Java 读取超大excel文件
注意:此参考解决方案只是针对xlsx格式的excel文件! Maven <dependency><groupId>com.monitorjbl</groupId><artifactId>xlsx-streamer</artifactId><version>2.2.0</version> </dependency>读取方式1…...
eNSP-Cloud(实现本地电脑与eNSP内设备之间通信)
说明: 想象一下,你正在用eNSP搭建一个虚拟的网络世界,里面有虚拟的路由器、交换机、电脑(PC)等等。这些设备都在你的电脑里面“运行”,它们之间可以互相通信,就像一个封闭的小王国。 但是&#…...
设计模式和设计原则回顾
设计模式和设计原则回顾 23种设计模式是设计原则的完美体现,设计原则设计原则是设计模式的理论基石, 设计模式 在经典的设计模式分类中(如《设计模式:可复用面向对象软件的基础》一书中),总共有23种设计模式,分为三大类: 一、创建型模式(5种) 1. 单例模式(Sing…...
React Native 开发环境搭建(全平台详解)
React Native 开发环境搭建(全平台详解) 在开始使用 React Native 开发移动应用之前,正确设置开发环境是至关重要的一步。本文将为你提供一份全面的指南,涵盖 macOS 和 Windows 平台的配置步骤,如何在 Android 和 iOS…...
黑马Mybatis
Mybatis 表现层:页面展示 业务层:逻辑处理 持久层:持久数据化保存 在这里插入图片描述 Mybatis快速入门  class ChannelAttention(nn.Module):"""通道注意力模块(Squeeze-and-Excitation)"""def __init__(self, in_channels, reduction_rat…...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现录音机应用
1. 项目配置与权限设置 1.1 配置module.json5 {"module": {"requestPermissions": [{"name": "ohos.permission.MICROPHONE","reason": "录音需要麦克风权限"},{"name": "ohos.permission.WRITE…...
无人机侦测与反制技术的进展与应用
国家电网无人机侦测与反制技术的进展与应用 引言 随着无人机(无人驾驶飞行器,UAV)技术的快速发展,其在商业、娱乐和军事领域的广泛应用带来了新的安全挑战。特别是对于关键基础设施如电力系统,无人机的“黑飞”&…...
Elastic 获得 AWS 教育 ISV 合作伙伴资质,进一步增强教育解决方案产品组合
作者:来自 Elastic Udayasimha Theepireddy (Uday), Brian Bergholm, Marianna Jonsdottir 通过搜索 AI 和云创新推动教育领域的数字化转型。 我们非常高兴地宣布,Elastic 已获得 AWS 教育 ISV 合作伙伴资质。这一重要认证表明,Elastic 作为 …...
基于鸿蒙(HarmonyOS5)的打车小程序
1. 开发环境准备 安装DevEco Studio (鸿蒙官方IDE)配置HarmonyOS SDK申请开发者账号和必要的API密钥 2. 项目结构设计 ├── entry │ ├── src │ │ ├── main │ │ │ ├── ets │ │ │ │ ├── pages │ │ │ │ │ ├── H…...
MeanFlow:何凯明新作,单步去噪图像生成新SOTA
1.简介 这篇文章介绍了一种名为MeanFlow的新型生成模型框架,旨在通过单步生成过程高效地将先验分布转换为数据分布。文章的核心创新在于引入了平均速度的概念,这一概念的引入使得模型能够通过单次函数评估完成从先验分布到数据分布的转换,显…...
