Flink自定义Source模拟数据流
maven依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.zxl</groupId><artifactId>FlinkJoin</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.22</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.27</version></dependency><!--com.mysql.cj.jdbc.Driver--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!--这个一定要加,否则会报错(5001好像是,记不清了)--><dependency><groupId>org.glassfish.jersey.inject</groupId><artifactId>jersey-hk2</artifactId><version>2.34</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.0</version><scope>compile</scope></dependency></dependencies>
</project>
实体类
订单类
package com.zxl.bean;// TODO: 2024/1/6 订单类 public class Orders {//订单IDprivate Long order_id;//用户IDprivate Long user_id;//订单日期private Long order_date;//订单金额private Integer order_amount;//商品IDprivate Integer product_id;//订单数量private Long order_num;public Long getOrder_id() {return order_id;}public void setOrder_id(Long order_id) {this.order_id = order_id;}public Long getUser_id() {return user_id;}public void setUser_id(Long user_id) {this.user_id = user_id;}public Long getOrder_date() {return order_date;}public void setOrder_date(Long order_date) {this.order_date = order_date;}public Integer getOrder_amount() {return order_amount;}public void setOrder_amount(Integer order_amount) {this.order_amount = order_amount;}public Integer getProduct_id() {return product_id;}public void setProduct_id(Integer product_id) {this.product_id = product_id;}public Long getOrder_num() {return order_num;}public void setOrder_num(Long order_num) {this.order_num = order_num;}public Orders() {}public Orders(Long order_id, Long user_id, Long order_date, Integer order_amount, Integer product_id, Long order_num) {this.order_id = order_id;this.user_id = user_id;this.order_date = order_date;this.order_amount = order_amount;this.product_id = product_id;this.order_num = order_num;}@Overridepublic String toString() {return "Orders{" +"order_id=" + order_id +", user_id=" + user_id +", order_date=" + order_date +", order_amount=" + order_amount +", product_id=" + product_id +", order_num=" + order_num +'}';}
}
支付类
package com.zxl.bean;// TODO: 2024/1/6 支付类 public class Payments {//支付IDprivate Long payment_id;//订单号private Long order_id;//支付金额private Integer payment_amount;//支付类型private String payment_type;//支付日期private Long payment_date;public Long getPayment_id() {return payment_id;}public void setPayment_id(Long payment_id) {this.payment_id = payment_id;}public Long getOrder_id() {return order_id;}public void setOrder_id(Long order_id) {this.order_id = order_id;}public Integer getPayment_amount() {return payment_amount;}public void setPayment_amount(Integer payment_amount) {this.payment_amount = payment_amount;}public String getPayment_type() {return payment_type;}public void setPayment_type(String payment_type) {this.payment_type = payment_type;}public Long getPayment_date() {return payment_date;}public void setPayment_date(Long payment_date) {this.payment_date = payment_date;}public Payments() {}public Payments(Long payment_id, Long order_id, Integer payment_amount, String payment_type, Long payment_date) {this.payment_id = payment_id;this.order_id = order_id;this.payment_amount = payment_amount;this.payment_type = payment_type;this.payment_date = payment_date;}@Overridepublic String toString() {return "payments{" +"payment_id=" + payment_id +", order_id=" + order_id +", payment_amount=" + payment_amount +", payment_type='" + payment_type + '\'' +", payment_date=" + payment_date +'}';}
}
商品类
用作维表测试
package com.zxl.bean;// TODO: 2024/1/6 商品类public class Products {//商品IDprivate Integer product_id;//商品名称private String product_name;//商品价格private Integer product_price;//商品库存private Long product_num;//商品分类private String product_type;public Integer getProduct_id() {return product_id;}public void setProduct_id(Integer product_id) {this.product_id = product_id;}public String getProduct_name() {return product_name;}public void setProduct_name(String product_name) {this.product_name = product_name;}public Integer getProduct_price() {return product_price;}public void setProduct_price(Integer product_price) {this.product_price = product_price;}public Long getProduct_num() {return product_num;}public void setProduct_num(Long product_num) {this.product_num = product_num;}public String getProduct_type() {return product_type;}public void setProduct_type(String product_type) {this.product_type = product_type;}public Products() {}public Products(Integer product_id, String product_name, Integer product_price, Long product_num, String product_type) {this.product_id = product_id;this.product_name = product_name;this.product_price = product_price;this.product_num = product_num;this.product_type = product_type;}@Overridepublic String toString() {return "products{" +"product_id=" + product_id +", product_name='" + product_name + '\'' +", product_price=" + product_price +", product_num=" + product_num +", product_type='" + product_type + '\'' +'}';}
}
数据生成
订单数据生成
package com.zxl.datas;import com.zxl.bean.Orders;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Random;public class OrdersData implements SourceFunction<Orders> {private static Random random = new Random();private static boolean isRunning = true;private static Integer num = 0;//订单IDprivate static Long getOrder_id() {num++;long aLong = Long.parseLong(num.toString());return aLong;}//订单日期private static Long getOrder_date() {//为了模拟数据延迟所里利用随机数进行模拟时间int i = random.nextInt(15);return Long.valueOf(i);}//用户IDprivate static Long getUser_id() {return random.nextLong();}//订单金额private static Integer getOrder_amount() {return random.nextInt(100);}//商品IDprivate static Integer getProduct_id() {return random.nextInt(100);}//订单数量private static Long getOrder_num() {return random.nextLong();}//订单类private static Orders getOrders() {Orders orders = new Orders(getOrder_id(), getUser_id(), getOrder_date(), getOrder_amount(), getProduct_id(), getOrder_num());return orders;}@Overridepublic void run(SourceContext<Orders> sourceContext) throws Exception {while (isRunning) {sourceContext.collect(getOrders());Thread.sleep(1000);}}@Overridepublic void cancel() {isRunning = false;}
}
支付数据生成
package com.zxl.datas;import com.zxl.bean.Payments;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Date;
import java.util.Random;public class PaymentData implements SourceFunction<Payments> {private static Random random = new Random();private static boolean isRunning = true;private static Integer num = 0;//支付IDprivate static Long getPayment_id(){return random.nextLong();}//订单IDprivate static Long getOrder_id() {num++;long aLong = Long.parseLong(num.toString());return aLong;}//支付金额private static Integer getPayment_amount(){return random.nextInt(1000);}//支付类型private static String getPayment_type(){String[] type = {"银行卡", "支付宝", "微信", "美团", "抖音", "现金"};int are= random.nextInt(6);String area=type[are];return area;}//支付日期private static Long getPayment_date(){//为了模拟数据延迟所里利用随机数进行模拟时间int i = random.nextInt(15);return Long.valueOf(i);}//支付类private static Payments getPayments(){Payments payments = new Payments(getPayment_id(),getOrder_id(),getPayment_amount(),getPayment_type(),getPayment_date());return payments;}@Overridepublic void run(SourceContext<Payments> sourceContext) throws Exception {while (isRunning) {sourceContext.collect(getPayments());Thread.sleep(1000);}}@Overridepublic void cancel() {isRunning = false;}
}
测试数据打印
package com.zxl.flink;import com.zxl.bean.Orders;
import com.zxl.bean.Payments;
import com.zxl.datas.OrdersData;
import com.zxl.datas.PaymentData;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class flinkWorks {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据 DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/6 支付数据 DataStreamSource<Payments> paymentsDataStreamSource = environment.addSource(new PaymentData());//打印数据paymentsDataStreamSource.print();ordersDataStreamSource.print();//启动程序environment.execute();}
}

相关文章:
Flink自定义Source模拟数据流
maven依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.0.…...
[易语言]使用易语言部署工业级人脸检测模型
【框架地址】 https://github.com/ShiqiYu/libfacedetection 【算法介绍】 Libfacedetection是一个开源的计算机视觉库,主要用于实时的人脸检测。它利用深度学习技术,特别是卷积神经网络(CNN),实现了高精度的脸部定位…...
2024年海外推广怎么做?
要说如何做海外推广,绝大多数的外贸小伙伴都能说上一些,但是大部分人对于推广系统知识略知一二,没有构建一个系统化的知识框架。 海外推广的几大步骤 1.定策略 做海外推广前,我们需要制定一套营销策略,需要去定义一…...
Redis分布式锁--java实现
文章目录 Redis分布式锁方案:SETNX EXPIRE基本原理比较好的实现会产生四个问题 几种解决原子性的方案方案:SETNX value值是(系统时间过期时间)方案:使用Lua脚本(包含SETNX EXPIRE两条指令)方案:SET的扩展…...
好消息,Linux Kernel 6.7正式发布!
据有关资料显示,该版本是有史以来合并数最多的版本之一,包含 17k 个非合并 commit,实际合并的超过1K个。 那么该版本主要有哪边变化呢?下面我来一一列举一下: Bcachefs文件系统已被合并到主线内核,这是一款…...
【k8s】Kubernetes 声明式 API、命令式
1. 资源管理方式: 1>. 命令式对象管理∶直接使用命令去操作kubernetes资源 kubectl run nginx-pod --imagenginx:1.17.1 --port802>. 命令式对象配置∶通过命令配置和配置文件去操作kubernetes资源 kubectl create/patch -f nginx-pod.yaml3>. 声明式对…...
解锁营销新高度:幽灵鲨CRM推广平台线索对接功能详解
数字营销时代,线索对接是推动业务增长的关键。你是否为线索分布在不同的平台而来回切换?你是否为无法及时联系客户而错失商机?幽灵鲨CRM系统作为一款领先的客户关系管理解决方案,不仅实现了对主流推广平台的全面对接,更…...
uniapp 创建组件
组件:用于将某个功能的 HTML、CSS、JS 封装到一个文件中,提高代码的复用性和可维护性。 创建组件 一、在根目录中创建 components 文件夹,右键点击新建组件。 二、输入组件名称、选择默认模板、点击创建组件。 三、在组件中正常编写内容即可…...
Linux--部署 Tomcat 及其负载均衡
1.案例前置知识点 1)Tomcat简介 名称由来:Tomcat最初是由 Sun的软件构架师詹姆斯邓肯戴维森开发的。后来他帮助将其变 为开源项目,并由Sun贡献给Apache软件基金会。由于大部分开源项目OReilly都会出一本相关的 书,并且将其封面设…...
影像组学介绍
影像组学介绍 1 影像组学介绍2 具体提取影像组学方法流程及工具代码:2.1 影像数据获取2.2 感兴趣区域分割2.3 特征提取与降维选择2.3.1 特征提取:2.3.2 特征降维(特征选择) 2.4 建模分析:2.5 结果分析 参考: 1 影像组学介绍 其实…...
什么是云服务器?云服务器的工作原理是介绍
阿里云服务器ECS英文全程Elastic Compute Service,云服务器ECS是一种安全可靠、弹性可伸缩的云计算服务,阿里云提供多种云服务器ECS实例规格,如经济型e实例、通用算力型u1、ECS计算型c7、通用型g7、GPU实例等,阿里云百科aliyunbai…...
【前后端的那些事】前后端环境搭建+树形结构表格实现
文章目录 1. 前后端项目环境搭建2. table-tree2.1 后端准备2.2 前端准备 前言:最近写项目,发现了一些很有意思的功能,想写文章,录视频把这些内容记录下。但这些功能太零碎,如果为每个功能都单独搭建一个项目࿰…...
PHP版学校教务管理系统源码带文字安装教程
PHP版学校教务管理系统源码带文字安装教程 运行环境 服务器宝塔面板 PHP 7.0 Mysql 5.5及以上版本 Linux Centos7以上 系统介绍: 后台权限控制:支持多个管理员,学生管理,学生成绩,教师管理,文章管理&#x…...
前端背景收集之烟花背景
文章目录 🐒个人主页🏅Vue项目常用组件模板仓库📖前言:🎀源码如下: 🐒个人主页 🏅Vue项目常用组件模板仓库 📖前言: 本篇博客主要提供前端背景收集之烟花背景…...
PCL 格网法计算点云的占地面积
目录 一、算法原理二、代码实现三、结果展示四、测试数据本文由CSDN点云侠原创,原文链接。如果你不是在点云侠的博客中看到该文章,那么此处便是不要脸的爬虫与GPT生成的文章。 一、算法原理 该方法主要用于粗略统计机载点云的占地面积。方法原理是将点云沿 X O Y XOY...
《设计模式的艺术》笔记 - 面向对象设计原则
1、单一职责原则 一个类只负责单一功能领域中的相应职责。 2、开闭原则 一个软件实体应当对扩展开放,对修改关闭。即软件实体应当尽量在不修改原有代码的情况下进行扩展。 3、里氏代换原则 所有引用基类的地方必须能透明地使用其子类的对象。即在软件中将一个基类…...
《Linux C编程实战》笔记:线程同步
这一节主要是解决共享资源的处理。操作系统里也讲过互斥、锁之类的概念。 互斥锁 互斥锁通过锁机制来实现线程同步,同一时刻只允许一个线程执行一个关键部分的代码 一下是操作互斥锁的函数,均声明在pthread.h中。 pthread_mutex_init(初始…...
leetcode141.环形链表
题目 给你一个链表的头节点 head ,判断链表中是否有环。 如果链表中有某个节点,可以通过连续跟踪 next 指针再次到达,则链表中存在环。 为了表示给定链表中的环,评测系统内部使用整数 pos 来表示链表尾连接到链表中的位置&#…...
景联文科技:以高质量数据赋能文生图大模型
1月5日,在智求共赢・中国AIGC产业应用峰会暨无界AI生态合作伙伴大会上,中国AIGC产业联盟联合无界AI发布了《中国AIGC文生图产业白皮书2023》,从AIGC文生图发展历程、主流工具、产业实践以及规模预测等多个维度,全面揭示了中国AIGC…...
[论文笔记] PAI-Megatron中qwen和mistral合并到Megtron-LM
一、千问 关于tokenizer的改动: 1.1、更改build_tokenizer中tokenizer类的加载。 /mnt/nas/pretrain/code/Megatron-LM/megatron/tokenizer/__init__.py 或者 tokenizer.py 在build_tokenizer.py函数中: elif args.tokenizer_type == "QwenTokenizer":assert a…...
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以?
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以? 在 Golang 的面试中,map 类型的使用是一个常见的考点,其中对 key 类型的合法性 是一道常被提及的基础却很容易被忽视的问题。本文将带你深入理解 Golang 中…...
Cinnamon修改面板小工具图标
Cinnamon开始菜单-CSDN博客 设置模块都是做好的,比GNOME简单得多! 在 applet.js 里增加 const Settings imports.ui.settings;this.settings new Settings.AppletSettings(this, HTYMenusonichy, instance_id); this.settings.bind(menu-icon, menu…...
Python爬虫(一):爬虫伪装
一、网站防爬机制概述 在当今互联网环境中,具有一定规模或盈利性质的网站几乎都实施了各种防爬措施。这些措施主要分为两大类: 身份验证机制:直接将未经授权的爬虫阻挡在外反爬技术体系:通过各种技术手段增加爬虫获取数据的难度…...
数据库分批入库
今天在工作中,遇到一个问题,就是分批查询的时候,由于批次过大导致出现了一些问题,一下是问题描述和解决方案: 示例: // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...
IoT/HCIP实验-3/LiteOS操作系统内核实验(任务、内存、信号量、CMSIS..)
文章目录 概述HelloWorld 工程C/C配置编译器主配置Makefile脚本烧录器主配置运行结果程序调用栈 任务管理实验实验结果osal 系统适配层osal_task_create 其他实验实验源码内存管理实验互斥锁实验信号量实验 CMISIS接口实验还是得JlINKCMSIS 简介LiteOS->CMSIS任务间消息交互…...
Android 之 kotlin 语言学习笔记三(Kotlin-Java 互操作)
参考官方文档:https://developer.android.google.cn/kotlin/interop?hlzh-cn 一、Java(供 Kotlin 使用) 1、不得使用硬关键字 不要使用 Kotlin 的任何硬关键字作为方法的名称 或字段。允许使用 Kotlin 的软关键字、修饰符关键字和特殊标识…...
七、数据库的完整性
七、数据库的完整性 主要内容 7.1 数据库的完整性概述 7.2 实体完整性 7.3 参照完整性 7.4 用户定义的完整性 7.5 触发器 7.6 SQL Server中数据库完整性的实现 7.7 小结 7.1 数据库的完整性概述 数据库完整性的含义 正确性 指数据的合法性 有效性 指数据是否属于所定…...
Python Ovito统计金刚石结构数量
大家好,我是小马老师。 本文介绍python ovito方法统计金刚石结构的方法。 Ovito Identify diamond structure命令可以识别和统计金刚石结构,但是无法直接输出结构的变化情况。 本文使用python调用ovito包的方法,可以持续统计各步的金刚石结构,具体代码如下: from ovito…...
Kafka入门-生产者
生产者 生产者发送流程: 延迟时间为0ms时,也就意味着每当有数据就会直接发送 异步发送API 异步发送和同步发送的不同在于:异步发送不需要等待结果,同步发送必须等待结果才能进行下一步发送。 普通异步发送 首先导入所需的k…...
智能职业发展系统:AI驱动的职业规划平台技术解析
智能职业发展系统:AI驱动的职业规划平台技术解析 引言:数字时代的职业革命 在当今瞬息万变的就业市场中,传统的职业规划方法已无法满足个人和企业的需求。据统计,全球每年有超过2亿人面临职业转型困境,而企业也因此遭…...
