大数据-玩转数据-Flink SQL编程实战 (热门商品TOP N)
一、需求描述
每隔30min 统计最近 1hour的热门商品 top3, 并把统计的结果写入到mysql中。
二、需求分析
- 1.统计每个商品的点击量, 开窗
- 2.分组窗口分组
- 3.over窗口
三、需求实现
3.1、创建数据源示例
input/UserBehavior.csv
543462,1715,1464116,pv,1511658000
662867,2244074,1575622,pv,1511658000
561558,3611281,965809,pv,1511658000
894923,3076029,1879194,pv,1511658000
834377,4541270,3738615,pv,1511658000
315321,942195,4339722,pv,1511658000
625915,1162383,570735,pv,1511658000
578814,176722,982926,pv,1511658000
873335,1256540,1451783,pv,1511658000
429984,4625350,2355072,pv,1511658000
866796,534083,4203730,pv,1511658000
937166,321683,2355072,pv,1511658000
156905,2901727,3001296,pv,1511658000
758810,5109495,1575622,pv,1511658000
107304,111477,4173315,pv,1511658000
452437,3255022,5099474,pv,1511658000
813974,1332724,2520771,buy,1511658000
524395,3887779,2366905,pv,1511658000
3.2、创建目标表
CREATE DATABASE flink_sql; //创建flink_sql库
USE flink_sql;
DROP TABLE IF EXISTS `hot_item`;
CREATE TABLE `hot_item` (`w_end` timestamp NOT NULL,`item_id` bigint(20) NOT NULL,`item_count` bigint(20) NOT NULL,`rk` bigint(20) NOT NULL,PRIMARY KEY (`w_end`,`rk`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
3.3、导入JDBC Connector依赖
<!-- 导入JDBC Connector依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
3.4、代码实现
package com.atguigu.flink.java.chapter_12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/*** @Author lizhenchao@atguigu.cn* @Date 2021/1/31 9:11*/
public class Flink01_HotItem_TopN {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);StreamTableEnvironment tenv = StreamTableEnvironment.create(env);// 使用sql从文件读取数据tenv.executeSql("create table user_behavior(" +" user_id bigint, " +" item_id bigint, " +" category_id int, " +" behavior string, " +" ts bigint, " +" event_time as to_timestamp(from_unixtime(ts, 'yyyy-MM-dd HH:mm:ss')), " +" watermark for event_time as event_time - interval '5' second " +")with(" +" 'connector'='filesystem', " +" 'path'='input/UserBehavior.csv', " +" 'format'='csv')");// 每隔 10m 统计一次最近 1h 的热门商品 top// 1. 计算每每个窗口内每个商品的点击量Table t1 = tenv.sqlQuery("select " +" item_id, " +" hop_end(event_time, interval '10' minute, interval '1' hour) w_end," +" count(*) item_count " +"from user_behavior " +"where behavior='pv' " +"group by hop(event_time, interval '10' minute, interval '1' hour), item_id");tenv.createTemporaryView("t1", t1);// 2. 按照窗口开窗, 对商品点击量进行排名Table t2 = tenv.sqlQuery("select " +" *," +" row_number() over(partition by w_end order by item_count desc) rk " +"from t1");tenv.createTemporaryView("t2", t2);// 3. 取 top3Table t3 = tenv.sqlQuery("select " +" item_id, w_end, item_count, rk " +"from t2 " +"where rk<=3");// 4. 数据写入到mysql// 4.1 创建输出表tenv.executeSql("create table hot_item(" +" item_id bigint, " +" w_end timestamp(3), " +" item_count bigint, " +" rk bigint, " +" PRIMARY KEY (w_end, rk) NOT ENFORCED)" +"with(" +" 'connector' = 'jdbc', " +" 'url' = 'jdbc:mysql://hadoop162:3306/flink_sql?useSSL=false', " +" 'table-name' = 'hot_item', " +" 'username' = 'root', " +" 'password' = 'aaaaaa' " +")");// 4.2 写入到输出表t3.executeInsert("hot_item");}
}
执行结果:

四、总结
Flink 使用 OVER 窗口条件和过滤条件相结合以进行 Top-N 查询。利用 OVER 窗口的 PARTITION BY 子句的功能,Flink 还支持逐组 Top-N 。 例如,每个类别中实时销量最高的前五种产品。批处理表和流处理表都支持基于SQL的 Top-N 查询。
流处理模式需注意: TopN 查询的结果会带有更新。 Flink SQL 会根据排序键对输入的流进行排序;若 top N 的记录发生了变化,变化的部分会以撤销、更新记录的形式发送到下游。 推荐使用一个支持更新的存储作为 Top-N 查询的 sink 。另外,若 top N 记录需要存储到外部存储,则结果表需要拥有与 Top-N 查询相同的唯一键。
相关文章:
大数据-玩转数据-Flink SQL编程实战 (热门商品TOP N)
一、需求描述 每隔30min 统计最近 1hour的热门商品 top3, 并把统计的结果写入到mysql中。 二、需求分析 1.统计每个商品的点击量, 开窗2.分组窗口分组3.over窗口 三、需求实现 3.1、创建数据源示例 input/UserBehavior.csv 543462,1715,1464116,pv,1511658000 662867,22…...
python中实现定时任务的几种方案
目录 while True: sleep()Timeloop库threading.Timersched模块schedule模块APScheduler框架Celery框架数据流工具Apache Airflow概述Airflow 核心概念Airflow 的架构 总结以下几种方案实现定时任务,可根据不同需求去使用不同方案。 while True: sleep() 利用whil…...
AcWing算法提高课-5.6.1同余方程
宣传一下 算法提高课整理 CSDN个人主页:更好的阅读体验 原题链接 题目描述 求关于 x x x 的同余方程 a x ≡ 1 ( m o d b ) ax ≡ 1 \pmod b ax≡1(modb) 的最小正整数解。 输入格式 输入只有一行,包含两个正整数 a , b a,b a,b,用一…...
Docker Tutorial
什么是Docker 为每个应用提供完全隔离的运行环境 Dockerfile, Image,Container Image: 相当于虚拟机的快照(snapshot)里面包含了我们需要部署的应用程序以及替它所关联的所有库。通过image,我们可以创建很…...
平面图—简单应用
平面图:若一个图𝐺能画在平面𝑆上,且使𝐺的边仅在端点处相交,则称图𝐺为可嵌入平面𝑆,𝐺称为可平面图,简称为平面图。 欧拉公式:设有…...
安装JDK(Java SE Development Kit)超详细教程
文章时间 : 2023-10-04 1. 下载地址 直接去下载地址:https://www.oracle.com/java/technologies/downloads/ (需要翻墙,不想翻墙或者不想注册oracel账号的,直接去我的阿里云盘) 阿里云盘:http…...
KUKA机器人通过3点法设置工作台基坐标系的具体方法
KUKA机器人通过3点法设置工作台基坐标系的具体方法 具体方法和步骤可参考以下内容: 进入主菜单界面,依次选择“投入运行”—“测量”—基坐标,选择“3点法”, 在系统弹出的基坐标编辑界面,给基座标编号为3,命名为table1,然后单击“继续”按钮,进行下一步操作, 在弹出的…...
以太网的MAC层
以太网的MAC层 一、硬件地址 局域网中,硬件地址又称物理地址或MAC地址(因为用在MAC帧),它是局域网上每一台计算机中固化在适配器的ROM中的地址。 关于地址问题,有这样的定义:“名字指出我们所要寻…...
Hadoop启动后jps发现没有DateNode解决办法
多次使用 Hadoop namenode -format 格式化节点后DateNode丢失 找到hadoop配置文件core-site.xml查找tmp路径 进入该路径,使用rm -rf data删除data文件 再次使用Hadoop namenode -format 格式化后jps后出现DateNode节点...
VUE3照本宣科——应用实例API与setup
VUE3照本宣科——应用实例API与setup 前言一、应用实例API1.createApp()2.app.use()3.app.mount() 二、setup 前言 👨💻👨🌾📝记录学习成果,以便温故而知新 “VUE3照本宣科”是指照着中文官网和菜鸟教…...
json/js对象的key有什么区别?
1.对于JS对象来说 一个js对象如果是这样的 obj {"0": "小明","0name": "小明明", "": 18,"¥": "哈哈"," ": "爱好广泛" }对于js对象来说,有时候key是不…...
极大似然估计概念的理解——统计学习方法
目录 1.最大似然估计的概念的理解1 2.最大似然估计的概念的理解2 3.最大似然估计的概念的理解3 4.例子 1.最大似然估计的概念的理解1 最大似然估计是一种概率论在统计学上的概念,是参数估计的一种方法。给定观测数据来评估模型参数。也就是模型已知,参…...
python模拟表格任意输入位置
在表格里输入数值,要任意位置,我找到了好方法: input输入 1. 行 2. 列输入:1 excel每行输入文字input输入位置 3.2 表示输入位置在:3行个列是要实现一个类似于 Excel 表格的输入功能,并且希望能够指定输入…...
如何限制文件只能通过USB打印机打印,限制打印次数和时限并且无法在打印前查看或编辑内容
在今天这个高度信息化的时代,文档打印已经成为日常工作中不可或缺的一部分。然而,这也带来了诸多安全风险,如文档被篡改、知识产权被侵犯以及信息泄露等。为了解决这些问题,只印应运而生。作为一款独特的软件工具,只印…...
车牌文本检测与识别:License Plate Recognition Based On Multi-Angle View Model
论文作者:Dat Tran-Anh,Khanh Linh Tran,Hoai-Nam Vu 作者单位:Thuyloi University;Posts and Telecommunications Institute of Technology 论文链接:http://arxiv.org/abs/2309.12972v1 内容简介: 1)方向&#x…...
Blender中的4种视图着色模式
Blender中有四种主要的视图着色模式:线框、实体、Look Dev和渲染。它们的主要区别如下: - 线框模式只显示物体的边缘(线框),可以让您看到场景中的所有物体,也可以调整线框的颜色和背景的颜色。 - 实…...
Flutter项目安装到Android手机一直显示在assembledebug
问题 Flutter项目安装到Android手机一直显示在assembledebug 原因 网络不好,gradle依赖下载不下来 解决方案 修改如下的文件 gradle-wrapper.properties 使用腾讯提供的gradle镜像下载 distributionUrlhttps://mirrors.cloud.tencent.com/gradle/gradle-7.5…...
数据挖掘实验(二)数据预处理【等深分箱与等宽分箱】
一、分箱平滑的原理 (1)分箱方法 在分箱前,一定要先排序数据,再将它们分到等深(等宽)的箱中。 常见的有两种分箱方法:等深分箱和等宽分箱。 等深分箱:按记录数进行分箱࿰…...
Vue2 第一次学习
本章为超级浓缩版,文章过于短,方便复习使用哦~ 文章目录 1. 简单引入 vue.js2. 指令2.1 事件绑定指令 v-on (简写 )2.2 内容渲染指令2.3 双向绑定指令 v-model2.4 属性绑定指令 v-bind (简写 : )2.5 条件渲染指令2.6 循环指令 v-for 3. vue 其他知识3.1 侦听器 watch3.2 计算属…...
tiny模式基本原理整合
【Tiny模式】的基本构成 M【首头在首位】 U【/】 V【HTTP/】 Host H【真实ip】 XH \r回车 \n换行 \t制表 \ 空格 一个基本的模式构成 [method] [uri] [version]\r\nHost: [host]\r\n[method] [uri] [version]\r\nHost: [host]\r\n 检测顺序 http M H XH 有些地区 XH H M 我这边…...
DRAM计算内存的电源传输网络优化策略
1. DRAM计算内存中的电源传输网络挑战与优化在数据密集型应用爆炸式增长的今天,传统冯诺依曼架构面临严峻的"内存墙"挑战。计算内存(Compute-in-Memory, CIM)技术通过在内存内部执行计算任务,从根本上改变了数据处理范式…...
边缘计算与AI驱动:2019年技术底层逻辑重塑与产业变革
1. 从数据洪流到智能边缘:2019年的技术底层逻辑重塑 每天产生2.5万亿亿字节的数据,这个数字听起来像是天方夜谭,但这就是我们正在面对的现实。更关键的是,其中90%的数据是在过去两年里生成的。作为一名在半导体和系统设计领域摸爬…...
AI工具导航与实战指南:从分类体系到选型策略
1. 项目概述:AI-Infinity,一个前沿AI工具的探索者指南如果你和我一样,对AI领域层出不穷的新工具感到既兴奋又头疼,那么这个项目绝对值得你花时间深入了解。AI-Infinity,这个由开发者meetpateltech维护的GitHub仓库&…...
41《CAN总线报文周期、抖动与实时性分析》
CAN总线基础:从物理层到数据链路层的核心概念 一、一个让我熬夜的CAN问题 去年调试某款车载ECU时遇到个诡异现象:同一批次的控制器,有的在-20℃低温下CAN通信完全正常,有的却频繁丢帧。示波器挂上去一看,显性电平的下降沿斜率明显变缓,从正常的15ns拖到了40ns。查了三天…...
3分钟掌握Windows与Office智能激活:KMS_VL_ALL_AIO终极解决方案
3分钟掌握Windows与Office智能激活:KMS_VL_ALL_AIO终极解决方案 【免费下载链接】KMS_VL_ALL_AIO Smart Activation Script 项目地址: https://gitcode.com/gh_mirrors/km/KMS_VL_ALL_AIO 还在为Windows和Office的激活问题烦恼吗?KMS_VL_ALL_AIO作…...
告别卡顿!用UltraISO给旧笔记本装Win10和Ubuntu双系统,从制作启动盘到分区配置完整流程
旧笔记本焕新指南:用UltraISO打造Win10与Ubuntu双系统全流程 每次打开那台陪伴多年的旧笔记本,风扇的轰鸣声和系统卡顿的转圈图标都在提醒你——是时候给它一次重生了。不同于直接更换硬件的高成本方案,通过双系统安装让老旧设备重获新生&…...
Linux桌面便签终极方案:Sticky让你的灵感永不丢失
Linux桌面便签终极方案:Sticky让你的灵感永不丢失 【免费下载链接】sticky A sticky notes app for the linux desktop 项目地址: https://gitcode.com/gh_mirrors/stic/sticky 在Linux桌面上高效管理零散信息一直是许多用户的痛点。Sticky作为一款专为Linux…...
从零构建现代化个人作品集网站:技术选型、架构设计与性能优化实战
1. 项目概述与核心价值 最近在GitHub上看到一个挺有意思的项目,叫“YasirAwan4831/arch-technologies-internship-task-1-portfolio-website”。光看这个仓库名,信息量其实不小。这明显是一个实习生的任务项目,来自一家叫“Arch Technologies…...
从理论到实践:径向基函数(RBF)插值在数据拟合中的应用
1. 径向基函数插值:给离散数据穿上连续外衣 第一次接触RBF插值时,我正在处理一组气象站采集的温度数据。这些站点像随意撒在地图上的芝麻,有的区域密集,有的区域稀疏。当我试图绘制全国温度分布图时,传统线性插值产生的…...
基于Python的Discord机器人开发:从自动化管理到插件化架构实战
1. 项目概述:一个为Discord社区量身打造的智能助手 如果你在运营一个Discord服务器,无论是游戏公会、技术社区还是兴趣小组,肯定遇到过这样的场景:新成员加入后,需要手动发送欢迎消息、引导他们阅读规则;成…...
