Flink自定义Source模拟数据流
maven依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.zxl</groupId><artifactId>FlinkJoin</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.22</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.27</version></dependency><!--com.mysql.cj.jdbc.Driver--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!--这个一定要加,否则会报错(5001好像是,记不清了)--><dependency><groupId>org.glassfish.jersey.inject</groupId><artifactId>jersey-hk2</artifactId><version>2.34</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.0</version><scope>compile</scope></dependency></dependencies>
</project>
实体类
订单类
package com.zxl.bean;// TODO: 2024/1/6 订单类 public class Orders {//订单IDprivate Long order_id;//用户IDprivate Long user_id;//订单日期private Long order_date;//订单金额private Integer order_amount;//商品IDprivate Integer product_id;//订单数量private Long order_num;public Long getOrder_id() {return order_id;}public void setOrder_id(Long order_id) {this.order_id = order_id;}public Long getUser_id() {return user_id;}public void setUser_id(Long user_id) {this.user_id = user_id;}public Long getOrder_date() {return order_date;}public void setOrder_date(Long order_date) {this.order_date = order_date;}public Integer getOrder_amount() {return order_amount;}public void setOrder_amount(Integer order_amount) {this.order_amount = order_amount;}public Integer getProduct_id() {return product_id;}public void setProduct_id(Integer product_id) {this.product_id = product_id;}public Long getOrder_num() {return order_num;}public void setOrder_num(Long order_num) {this.order_num = order_num;}public Orders() {}public Orders(Long order_id, Long user_id, Long order_date, Integer order_amount, Integer product_id, Long order_num) {this.order_id = order_id;this.user_id = user_id;this.order_date = order_date;this.order_amount = order_amount;this.product_id = product_id;this.order_num = order_num;}@Overridepublic String toString() {return "Orders{" +"order_id=" + order_id +", user_id=" + user_id +", order_date=" + order_date +", order_amount=" + order_amount +", product_id=" + product_id +", order_num=" + order_num +'}';}
}
支付类
package com.zxl.bean;// TODO: 2024/1/6 支付类 public class Payments {//支付IDprivate Long payment_id;//订单号private Long order_id;//支付金额private Integer payment_amount;//支付类型private String payment_type;//支付日期private Long payment_date;public Long getPayment_id() {return payment_id;}public void setPayment_id(Long payment_id) {this.payment_id = payment_id;}public Long getOrder_id() {return order_id;}public void setOrder_id(Long order_id) {this.order_id = order_id;}public Integer getPayment_amount() {return payment_amount;}public void setPayment_amount(Integer payment_amount) {this.payment_amount = payment_amount;}public String getPayment_type() {return payment_type;}public void setPayment_type(String payment_type) {this.payment_type = payment_type;}public Long getPayment_date() {return payment_date;}public void setPayment_date(Long payment_date) {this.payment_date = payment_date;}public Payments() {}public Payments(Long payment_id, Long order_id, Integer payment_amount, String payment_type, Long payment_date) {this.payment_id = payment_id;this.order_id = order_id;this.payment_amount = payment_amount;this.payment_type = payment_type;this.payment_date = payment_date;}@Overridepublic String toString() {return "payments{" +"payment_id=" + payment_id +", order_id=" + order_id +", payment_amount=" + payment_amount +", payment_type='" + payment_type + '\'' +", payment_date=" + payment_date +'}';}
}
商品类
用作维表测试
package com.zxl.bean;// TODO: 2024/1/6 商品类public class Products {//商品IDprivate Integer product_id;//商品名称private String product_name;//商品价格private Integer product_price;//商品库存private Long product_num;//商品分类private String product_type;public Integer getProduct_id() {return product_id;}public void setProduct_id(Integer product_id) {this.product_id = product_id;}public String getProduct_name() {return product_name;}public void setProduct_name(String product_name) {this.product_name = product_name;}public Integer getProduct_price() {return product_price;}public void setProduct_price(Integer product_price) {this.product_price = product_price;}public Long getProduct_num() {return product_num;}public void setProduct_num(Long product_num) {this.product_num = product_num;}public String getProduct_type() {return product_type;}public void setProduct_type(String product_type) {this.product_type = product_type;}public Products() {}public Products(Integer product_id, String product_name, Integer product_price, Long product_num, String product_type) {this.product_id = product_id;this.product_name = product_name;this.product_price = product_price;this.product_num = product_num;this.product_type = product_type;}@Overridepublic String toString() {return "products{" +"product_id=" + product_id +", product_name='" + product_name + '\'' +", product_price=" + product_price +", product_num=" + product_num +", product_type='" + product_type + '\'' +'}';}
}
数据生成
订单数据生成
package com.zxl.datas;import com.zxl.bean.Orders;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Random;public class OrdersData implements SourceFunction<Orders> {private static Random random = new Random();private static boolean isRunning = true;private static Integer num = 0;//订单IDprivate static Long getOrder_id() {num++;long aLong = Long.parseLong(num.toString());return aLong;}//订单日期private static Long getOrder_date() {//为了模拟数据延迟所里利用随机数进行模拟时间int i = random.nextInt(15);return Long.valueOf(i);}//用户IDprivate static Long getUser_id() {return random.nextLong();}//订单金额private static Integer getOrder_amount() {return random.nextInt(100);}//商品IDprivate static Integer getProduct_id() {return random.nextInt(100);}//订单数量private static Long getOrder_num() {return random.nextLong();}//订单类private static Orders getOrders() {Orders orders = new Orders(getOrder_id(), getUser_id(), getOrder_date(), getOrder_amount(), getProduct_id(), getOrder_num());return orders;}@Overridepublic void run(SourceContext<Orders> sourceContext) throws Exception {while (isRunning) {sourceContext.collect(getOrders());Thread.sleep(1000);}}@Overridepublic void cancel() {isRunning = false;}
}
支付数据生成
package com.zxl.datas;import com.zxl.bean.Payments;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Date;
import java.util.Random;public class PaymentData implements SourceFunction<Payments> {private static Random random = new Random();private static boolean isRunning = true;private static Integer num = 0;//支付IDprivate static Long getPayment_id(){return random.nextLong();}//订单IDprivate static Long getOrder_id() {num++;long aLong = Long.parseLong(num.toString());return aLong;}//支付金额private static Integer getPayment_amount(){return random.nextInt(1000);}//支付类型private static String getPayment_type(){String[] type = {"银行卡", "支付宝", "微信", "美团", "抖音", "现金"};int are= random.nextInt(6);String area=type[are];return area;}//支付日期private static Long getPayment_date(){//为了模拟数据延迟所里利用随机数进行模拟时间int i = random.nextInt(15);return Long.valueOf(i);}//支付类private static Payments getPayments(){Payments payments = new Payments(getPayment_id(),getOrder_id(),getPayment_amount(),getPayment_type(),getPayment_date());return payments;}@Overridepublic void run(SourceContext<Payments> sourceContext) throws Exception {while (isRunning) {sourceContext.collect(getPayments());Thread.sleep(1000);}}@Overridepublic void cancel() {isRunning = false;}
}
测试数据打印
package com.zxl.flink;import com.zxl.bean.Orders;
import com.zxl.bean.Payments;
import com.zxl.datas.OrdersData;
import com.zxl.datas.PaymentData;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class flinkWorks {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据 DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/6 支付数据 DataStreamSource<Payments> paymentsDataStreamSource = environment.addSource(new PaymentData());//打印数据paymentsDataStreamSource.print();ordersDataStreamSource.print();//启动程序environment.execute();}
}

相关文章:
Flink自定义Source模拟数据流
maven依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.0.…...
[易语言]使用易语言部署工业级人脸检测模型
【框架地址】 https://github.com/ShiqiYu/libfacedetection 【算法介绍】 Libfacedetection是一个开源的计算机视觉库,主要用于实时的人脸检测。它利用深度学习技术,特别是卷积神经网络(CNN),实现了高精度的脸部定位…...
2024年海外推广怎么做?
要说如何做海外推广,绝大多数的外贸小伙伴都能说上一些,但是大部分人对于推广系统知识略知一二,没有构建一个系统化的知识框架。 海外推广的几大步骤 1.定策略 做海外推广前,我们需要制定一套营销策略,需要去定义一…...
Redis分布式锁--java实现
文章目录 Redis分布式锁方案:SETNX EXPIRE基本原理比较好的实现会产生四个问题 几种解决原子性的方案方案:SETNX value值是(系统时间过期时间)方案:使用Lua脚本(包含SETNX EXPIRE两条指令)方案:SET的扩展…...
好消息,Linux Kernel 6.7正式发布!
据有关资料显示,该版本是有史以来合并数最多的版本之一,包含 17k 个非合并 commit,实际合并的超过1K个。 那么该版本主要有哪边变化呢?下面我来一一列举一下: Bcachefs文件系统已被合并到主线内核,这是一款…...
【k8s】Kubernetes 声明式 API、命令式
1. 资源管理方式: 1>. 命令式对象管理∶直接使用命令去操作kubernetes资源 kubectl run nginx-pod --imagenginx:1.17.1 --port802>. 命令式对象配置∶通过命令配置和配置文件去操作kubernetes资源 kubectl create/patch -f nginx-pod.yaml3>. 声明式对…...
解锁营销新高度:幽灵鲨CRM推广平台线索对接功能详解
数字营销时代,线索对接是推动业务增长的关键。你是否为线索分布在不同的平台而来回切换?你是否为无法及时联系客户而错失商机?幽灵鲨CRM系统作为一款领先的客户关系管理解决方案,不仅实现了对主流推广平台的全面对接,更…...
uniapp 创建组件
组件:用于将某个功能的 HTML、CSS、JS 封装到一个文件中,提高代码的复用性和可维护性。 创建组件 一、在根目录中创建 components 文件夹,右键点击新建组件。 二、输入组件名称、选择默认模板、点击创建组件。 三、在组件中正常编写内容即可…...
Linux--部署 Tomcat 及其负载均衡
1.案例前置知识点 1)Tomcat简介 名称由来:Tomcat最初是由 Sun的软件构架师詹姆斯邓肯戴维森开发的。后来他帮助将其变 为开源项目,并由Sun贡献给Apache软件基金会。由于大部分开源项目OReilly都会出一本相关的 书,并且将其封面设…...
影像组学介绍
影像组学介绍 1 影像组学介绍2 具体提取影像组学方法流程及工具代码:2.1 影像数据获取2.2 感兴趣区域分割2.3 特征提取与降维选择2.3.1 特征提取:2.3.2 特征降维(特征选择) 2.4 建模分析:2.5 结果分析 参考: 1 影像组学介绍 其实…...
什么是云服务器?云服务器的工作原理是介绍
阿里云服务器ECS英文全程Elastic Compute Service,云服务器ECS是一种安全可靠、弹性可伸缩的云计算服务,阿里云提供多种云服务器ECS实例规格,如经济型e实例、通用算力型u1、ECS计算型c7、通用型g7、GPU实例等,阿里云百科aliyunbai…...
【前后端的那些事】前后端环境搭建+树形结构表格实现
文章目录 1. 前后端项目环境搭建2. table-tree2.1 后端准备2.2 前端准备 前言:最近写项目,发现了一些很有意思的功能,想写文章,录视频把这些内容记录下。但这些功能太零碎,如果为每个功能都单独搭建一个项目࿰…...
PHP版学校教务管理系统源码带文字安装教程
PHP版学校教务管理系统源码带文字安装教程 运行环境 服务器宝塔面板 PHP 7.0 Mysql 5.5及以上版本 Linux Centos7以上 系统介绍: 后台权限控制:支持多个管理员,学生管理,学生成绩,教师管理,文章管理&#x…...
前端背景收集之烟花背景
文章目录 🐒个人主页🏅Vue项目常用组件模板仓库📖前言:🎀源码如下: 🐒个人主页 🏅Vue项目常用组件模板仓库 📖前言: 本篇博客主要提供前端背景收集之烟花背景…...
PCL 格网法计算点云的占地面积
目录 一、算法原理二、代码实现三、结果展示四、测试数据本文由CSDN点云侠原创,原文链接。如果你不是在点云侠的博客中看到该文章,那么此处便是不要脸的爬虫与GPT生成的文章。 一、算法原理 该方法主要用于粗略统计机载点云的占地面积。方法原理是将点云沿 X O Y XOY...
《设计模式的艺术》笔记 - 面向对象设计原则
1、单一职责原则 一个类只负责单一功能领域中的相应职责。 2、开闭原则 一个软件实体应当对扩展开放,对修改关闭。即软件实体应当尽量在不修改原有代码的情况下进行扩展。 3、里氏代换原则 所有引用基类的地方必须能透明地使用其子类的对象。即在软件中将一个基类…...
《Linux C编程实战》笔记:线程同步
这一节主要是解决共享资源的处理。操作系统里也讲过互斥、锁之类的概念。 互斥锁 互斥锁通过锁机制来实现线程同步,同一时刻只允许一个线程执行一个关键部分的代码 一下是操作互斥锁的函数,均声明在pthread.h中。 pthread_mutex_init(初始…...
leetcode141.环形链表
题目 给你一个链表的头节点 head ,判断链表中是否有环。 如果链表中有某个节点,可以通过连续跟踪 next 指针再次到达,则链表中存在环。 为了表示给定链表中的环,评测系统内部使用整数 pos 来表示链表尾连接到链表中的位置&#…...
景联文科技:以高质量数据赋能文生图大模型
1月5日,在智求共赢・中国AIGC产业应用峰会暨无界AI生态合作伙伴大会上,中国AIGC产业联盟联合无界AI发布了《中国AIGC文生图产业白皮书2023》,从AIGC文生图发展历程、主流工具、产业实践以及规模预测等多个维度,全面揭示了中国AIGC…...
[论文笔记] PAI-Megatron中qwen和mistral合并到Megtron-LM
一、千问 关于tokenizer的改动: 1.1、更改build_tokenizer中tokenizer类的加载。 /mnt/nas/pretrain/code/Megatron-LM/megatron/tokenizer/__init__.py 或者 tokenizer.py 在build_tokenizer.py函数中: elif args.tokenizer_type == "QwenTokenizer":assert a…...
Admin.Net中的消息通信SignalR解释
定义集线器接口 IOnlineUserHub public interface IOnlineUserHub {/// 在线用户列表Task OnlineUserList(OnlineUserList context);/// 强制下线Task ForceOffline(object context);/// 发布站内消息Task PublicNotice(SysNotice context);/// 接收消息Task ReceiveMessage(…...
pam_env.so模块配置解析
在PAM(Pluggable Authentication Modules)配置中, /etc/pam.d/su 文件相关配置含义如下: 配置解析 auth required pam_env.so1. 字段分解 字段值说明模块类型auth认证类模块,负责验证用户身份&am…...
Opencv中的addweighted函数
一.addweighted函数作用 addweighted()是OpenCV库中用于图像处理的函数,主要功能是将两个输入图像(尺寸和类型相同)按照指定的权重进行加权叠加(图像融合),并添加一个标量值&#x…...
【第二十一章 SDIO接口(SDIO)】
第二十一章 SDIO接口 目录 第二十一章 SDIO接口(SDIO) 1 SDIO 主要功能 2 SDIO 总线拓扑 3 SDIO 功能描述 3.1 SDIO 适配器 3.2 SDIOAHB 接口 4 卡功能描述 4.1 卡识别模式 4.2 卡复位 4.3 操作电压范围确认 4.4 卡识别过程 4.5 写数据块 4.6 读数据块 4.7 数据流…...
OkHttp 中实现断点续传 demo
在 OkHttp 中实现断点续传主要通过以下步骤完成,核心是利用 HTTP 协议的 Range 请求头指定下载范围: 实现原理 Range 请求头:向服务器请求文件的特定字节范围(如 Range: bytes1024-) 本地文件记录:保存已…...
GitHub 趋势日报 (2025年06月08日)
📊 由 TrendForge 系统生成 | 🌐 https://trendforge.devlive.org/ 🌐 本日报中的项目描述已自动翻译为中文 📈 今日获星趋势图 今日获星趋势图 884 cognee 566 dify 414 HumanSystemOptimization 414 omni-tools 321 note-gen …...
【从零学习JVM|第三篇】类的生命周期(高频面试题)
前言: 在Java编程中,类的生命周期是指类从被加载到内存中开始,到被卸载出内存为止的整个过程。了解类的生命周期对于理解Java程序的运行机制以及性能优化非常重要。本文会深入探寻类的生命周期,让读者对此有深刻印象。 目录 …...
【C++特殊工具与技术】优化内存分配(一):C++中的内存分配
目录 一、C 内存的基本概念 1.1 内存的物理与逻辑结构 1.2 C 程序的内存区域划分 二、栈内存分配 2.1 栈内存的特点 2.2 栈内存分配示例 三、堆内存分配 3.1 new和delete操作符 4.2 内存泄漏与悬空指针问题 4.3 new和delete的重载 四、智能指针…...
快刀集(1): 一刀斩断视频片头广告
一刀流:用一个简单脚本,秒杀视频片头广告,还你清爽观影体验。 1. 引子 作为一个爱生活、爱学习、爱收藏高清资源的老码农,平时写代码之余看看电影、补补片,是再正常不过的事。 电影嘛,要沉浸,…...
解决:Android studio 编译后报错\app\src\main\cpp\CMakeLists.txt‘ to exist
现象: android studio报错: [CXX1409] D:\GitLab\xxxxx\app.cxx\Debug\3f3w4y1i\arm64-v8a\android_gradle_build.json : expected buildFiles file ‘D:\GitLab\xxxxx\app\src\main\cpp\CMakeLists.txt’ to exist 解决: 不要动CMakeLists.…...
