Flink自定义Source模拟数据流
maven依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.zxl</groupId><artifactId>FlinkJoin</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.22</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.27</version></dependency><!--com.mysql.cj.jdbc.Driver--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!--这个一定要加,否则会报错(5001好像是,记不清了)--><dependency><groupId>org.glassfish.jersey.inject</groupId><artifactId>jersey-hk2</artifactId><version>2.34</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.0</version><scope>compile</scope></dependency></dependencies>
</project>
实体类
订单类
package com.zxl.bean;// TODO: 2024/1/6 订单类 public class Orders {//订单IDprivate Long order_id;//用户IDprivate Long user_id;//订单日期private Long order_date;//订单金额private Integer order_amount;//商品IDprivate Integer product_id;//订单数量private Long order_num;public Long getOrder_id() {return order_id;}public void setOrder_id(Long order_id) {this.order_id = order_id;}public Long getUser_id() {return user_id;}public void setUser_id(Long user_id) {this.user_id = user_id;}public Long getOrder_date() {return order_date;}public void setOrder_date(Long order_date) {this.order_date = order_date;}public Integer getOrder_amount() {return order_amount;}public void setOrder_amount(Integer order_amount) {this.order_amount = order_amount;}public Integer getProduct_id() {return product_id;}public void setProduct_id(Integer product_id) {this.product_id = product_id;}public Long getOrder_num() {return order_num;}public void setOrder_num(Long order_num) {this.order_num = order_num;}public Orders() {}public Orders(Long order_id, Long user_id, Long order_date, Integer order_amount, Integer product_id, Long order_num) {this.order_id = order_id;this.user_id = user_id;this.order_date = order_date;this.order_amount = order_amount;this.product_id = product_id;this.order_num = order_num;}@Overridepublic String toString() {return "Orders{" +"order_id=" + order_id +", user_id=" + user_id +", order_date=" + order_date +", order_amount=" + order_amount +", product_id=" + product_id +", order_num=" + order_num +'}';}
}
支付类
package com.zxl.bean;// TODO: 2024/1/6 支付类 public class Payments {//支付IDprivate Long payment_id;//订单号private Long order_id;//支付金额private Integer payment_amount;//支付类型private String payment_type;//支付日期private Long payment_date;public Long getPayment_id() {return payment_id;}public void setPayment_id(Long payment_id) {this.payment_id = payment_id;}public Long getOrder_id() {return order_id;}public void setOrder_id(Long order_id) {this.order_id = order_id;}public Integer getPayment_amount() {return payment_amount;}public void setPayment_amount(Integer payment_amount) {this.payment_amount = payment_amount;}public String getPayment_type() {return payment_type;}public void setPayment_type(String payment_type) {this.payment_type = payment_type;}public Long getPayment_date() {return payment_date;}public void setPayment_date(Long payment_date) {this.payment_date = payment_date;}public Payments() {}public Payments(Long payment_id, Long order_id, Integer payment_amount, String payment_type, Long payment_date) {this.payment_id = payment_id;this.order_id = order_id;this.payment_amount = payment_amount;this.payment_type = payment_type;this.payment_date = payment_date;}@Overridepublic String toString() {return "payments{" +"payment_id=" + payment_id +", order_id=" + order_id +", payment_amount=" + payment_amount +", payment_type='" + payment_type + '\'' +", payment_date=" + payment_date +'}';}
}
商品类
用作维表测试
package com.zxl.bean;// TODO: 2024/1/6 商品类public class Products {//商品IDprivate Integer product_id;//商品名称private String product_name;//商品价格private Integer product_price;//商品库存private Long product_num;//商品分类private String product_type;public Integer getProduct_id() {return product_id;}public void setProduct_id(Integer product_id) {this.product_id = product_id;}public String getProduct_name() {return product_name;}public void setProduct_name(String product_name) {this.product_name = product_name;}public Integer getProduct_price() {return product_price;}public void setProduct_price(Integer product_price) {this.product_price = product_price;}public Long getProduct_num() {return product_num;}public void setProduct_num(Long product_num) {this.product_num = product_num;}public String getProduct_type() {return product_type;}public void setProduct_type(String product_type) {this.product_type = product_type;}public Products() {}public Products(Integer product_id, String product_name, Integer product_price, Long product_num, String product_type) {this.product_id = product_id;this.product_name = product_name;this.product_price = product_price;this.product_num = product_num;this.product_type = product_type;}@Overridepublic String toString() {return "products{" +"product_id=" + product_id +", product_name='" + product_name + '\'' +", product_price=" + product_price +", product_num=" + product_num +", product_type='" + product_type + '\'' +'}';}
}
数据生成
订单数据生成
package com.zxl.datas;import com.zxl.bean.Orders;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Random;public class OrdersData implements SourceFunction<Orders> {private static Random random = new Random();private static boolean isRunning = true;private static Integer num = 0;//订单IDprivate static Long getOrder_id() {num++;long aLong = Long.parseLong(num.toString());return aLong;}//订单日期private static Long getOrder_date() {//为了模拟数据延迟所里利用随机数进行模拟时间int i = random.nextInt(15);return Long.valueOf(i);}//用户IDprivate static Long getUser_id() {return random.nextLong();}//订单金额private static Integer getOrder_amount() {return random.nextInt(100);}//商品IDprivate static Integer getProduct_id() {return random.nextInt(100);}//订单数量private static Long getOrder_num() {return random.nextLong();}//订单类private static Orders getOrders() {Orders orders = new Orders(getOrder_id(), getUser_id(), getOrder_date(), getOrder_amount(), getProduct_id(), getOrder_num());return orders;}@Overridepublic void run(SourceContext<Orders> sourceContext) throws Exception {while (isRunning) {sourceContext.collect(getOrders());Thread.sleep(1000);}}@Overridepublic void cancel() {isRunning = false;}
}
支付数据生成
package com.zxl.datas;import com.zxl.bean.Payments;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Date;
import java.util.Random;public class PaymentData implements SourceFunction<Payments> {private static Random random = new Random();private static boolean isRunning = true;private static Integer num = 0;//支付IDprivate static Long getPayment_id(){return random.nextLong();}//订单IDprivate static Long getOrder_id() {num++;long aLong = Long.parseLong(num.toString());return aLong;}//支付金额private static Integer getPayment_amount(){return random.nextInt(1000);}//支付类型private static String getPayment_type(){String[] type = {"银行卡", "支付宝", "微信", "美团", "抖音", "现金"};int are= random.nextInt(6);String area=type[are];return area;}//支付日期private static Long getPayment_date(){//为了模拟数据延迟所里利用随机数进行模拟时间int i = random.nextInt(15);return Long.valueOf(i);}//支付类private static Payments getPayments(){Payments payments = new Payments(getPayment_id(),getOrder_id(),getPayment_amount(),getPayment_type(),getPayment_date());return payments;}@Overridepublic void run(SourceContext<Payments> sourceContext) throws Exception {while (isRunning) {sourceContext.collect(getPayments());Thread.sleep(1000);}}@Overridepublic void cancel() {isRunning = false;}
}
测试数据打印
package com.zxl.flink;import com.zxl.bean.Orders;
import com.zxl.bean.Payments;
import com.zxl.datas.OrdersData;
import com.zxl.datas.PaymentData;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class flinkWorks {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据 DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/6 支付数据 DataStreamSource<Payments> paymentsDataStreamSource = environment.addSource(new PaymentData());//打印数据paymentsDataStreamSource.print();ordersDataStreamSource.print();//启动程序environment.execute();}
}

相关文章:
Flink自定义Source模拟数据流
maven依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.0.…...
[易语言]使用易语言部署工业级人脸检测模型
【框架地址】 https://github.com/ShiqiYu/libfacedetection 【算法介绍】 Libfacedetection是一个开源的计算机视觉库,主要用于实时的人脸检测。它利用深度学习技术,特别是卷积神经网络(CNN),实现了高精度的脸部定位…...
2024年海外推广怎么做?
要说如何做海外推广,绝大多数的外贸小伙伴都能说上一些,但是大部分人对于推广系统知识略知一二,没有构建一个系统化的知识框架。 海外推广的几大步骤 1.定策略 做海外推广前,我们需要制定一套营销策略,需要去定义一…...
Redis分布式锁--java实现
文章目录 Redis分布式锁方案:SETNX EXPIRE基本原理比较好的实现会产生四个问题 几种解决原子性的方案方案:SETNX value值是(系统时间过期时间)方案:使用Lua脚本(包含SETNX EXPIRE两条指令)方案:SET的扩展…...
好消息,Linux Kernel 6.7正式发布!
据有关资料显示,该版本是有史以来合并数最多的版本之一,包含 17k 个非合并 commit,实际合并的超过1K个。 那么该版本主要有哪边变化呢?下面我来一一列举一下: Bcachefs文件系统已被合并到主线内核,这是一款…...
【k8s】Kubernetes 声明式 API、命令式
1. 资源管理方式: 1>. 命令式对象管理∶直接使用命令去操作kubernetes资源 kubectl run nginx-pod --imagenginx:1.17.1 --port802>. 命令式对象配置∶通过命令配置和配置文件去操作kubernetes资源 kubectl create/patch -f nginx-pod.yaml3>. 声明式对…...
解锁营销新高度:幽灵鲨CRM推广平台线索对接功能详解
数字营销时代,线索对接是推动业务增长的关键。你是否为线索分布在不同的平台而来回切换?你是否为无法及时联系客户而错失商机?幽灵鲨CRM系统作为一款领先的客户关系管理解决方案,不仅实现了对主流推广平台的全面对接,更…...
uniapp 创建组件
组件:用于将某个功能的 HTML、CSS、JS 封装到一个文件中,提高代码的复用性和可维护性。 创建组件 一、在根目录中创建 components 文件夹,右键点击新建组件。 二、输入组件名称、选择默认模板、点击创建组件。 三、在组件中正常编写内容即可…...
Linux--部署 Tomcat 及其负载均衡
1.案例前置知识点 1)Tomcat简介 名称由来:Tomcat最初是由 Sun的软件构架师詹姆斯邓肯戴维森开发的。后来他帮助将其变 为开源项目,并由Sun贡献给Apache软件基金会。由于大部分开源项目OReilly都会出一本相关的 书,并且将其封面设…...
影像组学介绍
影像组学介绍 1 影像组学介绍2 具体提取影像组学方法流程及工具代码:2.1 影像数据获取2.2 感兴趣区域分割2.3 特征提取与降维选择2.3.1 特征提取:2.3.2 特征降维(特征选择) 2.4 建模分析:2.5 结果分析 参考: 1 影像组学介绍 其实…...
什么是云服务器?云服务器的工作原理是介绍
阿里云服务器ECS英文全程Elastic Compute Service,云服务器ECS是一种安全可靠、弹性可伸缩的云计算服务,阿里云提供多种云服务器ECS实例规格,如经济型e实例、通用算力型u1、ECS计算型c7、通用型g7、GPU实例等,阿里云百科aliyunbai…...
【前后端的那些事】前后端环境搭建+树形结构表格实现
文章目录 1. 前后端项目环境搭建2. table-tree2.1 后端准备2.2 前端准备 前言:最近写项目,发现了一些很有意思的功能,想写文章,录视频把这些内容记录下。但这些功能太零碎,如果为每个功能都单独搭建一个项目࿰…...
PHP版学校教务管理系统源码带文字安装教程
PHP版学校教务管理系统源码带文字安装教程 运行环境 服务器宝塔面板 PHP 7.0 Mysql 5.5及以上版本 Linux Centos7以上 系统介绍: 后台权限控制:支持多个管理员,学生管理,学生成绩,教师管理,文章管理&#x…...
前端背景收集之烟花背景
文章目录 🐒个人主页🏅Vue项目常用组件模板仓库📖前言:🎀源码如下: 🐒个人主页 🏅Vue项目常用组件模板仓库 📖前言: 本篇博客主要提供前端背景收集之烟花背景…...
PCL 格网法计算点云的占地面积
目录 一、算法原理二、代码实现三、结果展示四、测试数据本文由CSDN点云侠原创,原文链接。如果你不是在点云侠的博客中看到该文章,那么此处便是不要脸的爬虫与GPT生成的文章。 一、算法原理 该方法主要用于粗略统计机载点云的占地面积。方法原理是将点云沿 X O Y XOY...
《设计模式的艺术》笔记 - 面向对象设计原则
1、单一职责原则 一个类只负责单一功能领域中的相应职责。 2、开闭原则 一个软件实体应当对扩展开放,对修改关闭。即软件实体应当尽量在不修改原有代码的情况下进行扩展。 3、里氏代换原则 所有引用基类的地方必须能透明地使用其子类的对象。即在软件中将一个基类…...
《Linux C编程实战》笔记:线程同步
这一节主要是解决共享资源的处理。操作系统里也讲过互斥、锁之类的概念。 互斥锁 互斥锁通过锁机制来实现线程同步,同一时刻只允许一个线程执行一个关键部分的代码 一下是操作互斥锁的函数,均声明在pthread.h中。 pthread_mutex_init(初始…...
leetcode141.环形链表
题目 给你一个链表的头节点 head ,判断链表中是否有环。 如果链表中有某个节点,可以通过连续跟踪 next 指针再次到达,则链表中存在环。 为了表示给定链表中的环,评测系统内部使用整数 pos 来表示链表尾连接到链表中的位置&#…...
景联文科技:以高质量数据赋能文生图大模型
1月5日,在智求共赢・中国AIGC产业应用峰会暨无界AI生态合作伙伴大会上,中国AIGC产业联盟联合无界AI发布了《中国AIGC文生图产业白皮书2023》,从AIGC文生图发展历程、主流工具、产业实践以及规模预测等多个维度,全面揭示了中国AIGC…...
[论文笔记] PAI-Megatron中qwen和mistral合并到Megtron-LM
一、千问 关于tokenizer的改动: 1.1、更改build_tokenizer中tokenizer类的加载。 /mnt/nas/pretrain/code/Megatron-LM/megatron/tokenizer/__init__.py 或者 tokenizer.py 在build_tokenizer.py函数中: elif args.tokenizer_type == "QwenTokenizer":assert a…...
如何快速掌握QRemeshify:面向初学者的Blender四边形网格重构完整指南
如何快速掌握QRemeshify:面向初学者的Blender四边形网格重构完整指南 【免费下载链接】QRemeshify A Blender extension for an easy-to-use remesher that outputs good-quality quad topology 项目地址: https://gitcode.com/gh_mirrors/qr/QRemeshify QRe…...
用ESP32和VS1053模块DIY网络收音机:从硬件接线到Arduino代码调试全流程
用ESP32和VS1053打造智能网络收音机:从元器件选型到音频流调试实战 在物联网和智能硬件蓬勃发展的今天,ESP32凭借其出色的无线连接能力和丰富的外设接口,成为DIY音频项目的理想选择。本文将手把手带你完成一个功能完整的网络收音机项目&#…...
高精度运放在电流传感器中的设计与应用
高精度运算放大器在电流传感器中的应用设计1. 电流传感器概述1.1 电流传感器类型与特性电流传感器是用于测量电路电流的关键元件,根据测量原理主要分为以下几种类型:传感器类型测量范围典型应用场景分流电阻式μA~100A电池监测、电机控制磁感应式10mA~1k…...
SPI通信协议与菊花链模式应用解析
四线SPI通信协议与菊花链模式应用详解1. SPI接口基础1.1 四线SPI接口定义串行外设接口(SPI)是微控制器与外围IC之间最广泛使用的通信接口之一,具有同步、全双工、主从式架构特点。标准四线SPI接口包含以下信号线:SCLK(Serial Clock):时钟信号…...
OpenPLC Editor:重塑工业自动化编程的开源方案
OpenPLC Editor:重塑工业自动化编程的开源方案 【免费下载链接】OpenPLC_Editor 项目地址: https://gitcode.com/gh_mirrors/ope/OpenPLC_Editor 在工业自动化领域,PLC(可编程逻辑控制器)编程长期被商业软件垄断ÿ…...
java新手福音:用快马ai生成渐进式八股文学习项目,轻松入门核心知识
作为一个Java新手,刚开始接触"八股文"这个概念时,我完全摸不着头脑。直到在InsCode(快马)平台上尝试了他们的Java学习项目生成功能,才发现原来枯燥的理论知识可以变得这么生动有趣。 渐进式学习路径设计 这个项目最让我惊喜的是它的…...
无网环境下的containerd部署实战:从静态二进制到服务就绪
1. 为什么需要离线部署containerd? 在工业控制、军工系统、金融核心业务等特殊场景中,服务器往往运行在物理隔离的网络环境中。我曾经参与过一个智能制造项目,生产线的控制服务器连内网都不允许接入,更别说访问互联网了。这种环境…...
基于Session管理的在线视频学习平台防作弊策略
1. Session管理在在线学习平台中的核心作用 在线视频学习平台最头疼的问题之一,就是如何防止用户通过多设备同时登录来刷学习进度。想象一下,如果用户同时在手机、平板和电脑上登录同一个账号,三倍速刷完课程,这对其他认真学习的用…...
2021 年 3 月青少年软编等考 C 语言四级真题解析
目录 T1. 酒鬼 思路分析 T2. 重启系统 思路分析 T3. 鸣人的影分身 思路分析 T4. 宠物小精灵之收服 思路分析 T1. 酒鬼 题目链接:SOJ D1053 Santo 刚刚与房东打赌赢得了一间在 New Clondike 的大客厅。今天,他来到这个大客厅欣赏他的奖品。房东摆出了一行瓶子在酒吧上。瓶子…...
Qwen3-4B写作大师优化技巧:3个提示词方法让AI输出质量翻倍
Qwen3-4B写作大师优化技巧:3个提示词方法让AI输出质量翻倍 1. 为什么提示词对Qwen3-4B如此重要 Qwen3-4B-Instruct不是普通的文本生成模型,而是一个具备深度推理能力的AI写作伙伴。与基础模型不同,它经过专门的指令微调(Instruc…...
