Flink自定义Source模拟数据流
maven依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.zxl</groupId><artifactId>FlinkJoin</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.22</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.27</version></dependency><!--com.mysql.cj.jdbc.Driver--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!--这个一定要加,否则会报错(5001好像是,记不清了)--><dependency><groupId>org.glassfish.jersey.inject</groupId><artifactId>jersey-hk2</artifactId><version>2.34</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.0</version><scope>compile</scope></dependency></dependencies>
</project>
实体类
订单类
package com.zxl.bean;// TODO: 2024/1/6 订单类 public class Orders {//订单IDprivate Long order_id;//用户IDprivate Long user_id;//订单日期private Long order_date;//订单金额private Integer order_amount;//商品IDprivate Integer product_id;//订单数量private Long order_num;public Long getOrder_id() {return order_id;}public void setOrder_id(Long order_id) {this.order_id = order_id;}public Long getUser_id() {return user_id;}public void setUser_id(Long user_id) {this.user_id = user_id;}public Long getOrder_date() {return order_date;}public void setOrder_date(Long order_date) {this.order_date = order_date;}public Integer getOrder_amount() {return order_amount;}public void setOrder_amount(Integer order_amount) {this.order_amount = order_amount;}public Integer getProduct_id() {return product_id;}public void setProduct_id(Integer product_id) {this.product_id = product_id;}public Long getOrder_num() {return order_num;}public void setOrder_num(Long order_num) {this.order_num = order_num;}public Orders() {}public Orders(Long order_id, Long user_id, Long order_date, Integer order_amount, Integer product_id, Long order_num) {this.order_id = order_id;this.user_id = user_id;this.order_date = order_date;this.order_amount = order_amount;this.product_id = product_id;this.order_num = order_num;}@Overridepublic String toString() {return "Orders{" +"order_id=" + order_id +", user_id=" + user_id +", order_date=" + order_date +", order_amount=" + order_amount +", product_id=" + product_id +", order_num=" + order_num +'}';}
}
支付类
package com.zxl.bean;// TODO: 2024/1/6 支付类 public class Payments {//支付IDprivate Long payment_id;//订单号private Long order_id;//支付金额private Integer payment_amount;//支付类型private String payment_type;//支付日期private Long payment_date;public Long getPayment_id() {return payment_id;}public void setPayment_id(Long payment_id) {this.payment_id = payment_id;}public Long getOrder_id() {return order_id;}public void setOrder_id(Long order_id) {this.order_id = order_id;}public Integer getPayment_amount() {return payment_amount;}public void setPayment_amount(Integer payment_amount) {this.payment_amount = payment_amount;}public String getPayment_type() {return payment_type;}public void setPayment_type(String payment_type) {this.payment_type = payment_type;}public Long getPayment_date() {return payment_date;}public void setPayment_date(Long payment_date) {this.payment_date = payment_date;}public Payments() {}public Payments(Long payment_id, Long order_id, Integer payment_amount, String payment_type, Long payment_date) {this.payment_id = payment_id;this.order_id = order_id;this.payment_amount = payment_amount;this.payment_type = payment_type;this.payment_date = payment_date;}@Overridepublic String toString() {return "payments{" +"payment_id=" + payment_id +", order_id=" + order_id +", payment_amount=" + payment_amount +", payment_type='" + payment_type + '\'' +", payment_date=" + payment_date +'}';}
}
商品类
用作维表测试
package com.zxl.bean;// TODO: 2024/1/6 商品类public class Products {//商品IDprivate Integer product_id;//商品名称private String product_name;//商品价格private Integer product_price;//商品库存private Long product_num;//商品分类private String product_type;public Integer getProduct_id() {return product_id;}public void setProduct_id(Integer product_id) {this.product_id = product_id;}public String getProduct_name() {return product_name;}public void setProduct_name(String product_name) {this.product_name = product_name;}public Integer getProduct_price() {return product_price;}public void setProduct_price(Integer product_price) {this.product_price = product_price;}public Long getProduct_num() {return product_num;}public void setProduct_num(Long product_num) {this.product_num = product_num;}public String getProduct_type() {return product_type;}public void setProduct_type(String product_type) {this.product_type = product_type;}public Products() {}public Products(Integer product_id, String product_name, Integer product_price, Long product_num, String product_type) {this.product_id = product_id;this.product_name = product_name;this.product_price = product_price;this.product_num = product_num;this.product_type = product_type;}@Overridepublic String toString() {return "products{" +"product_id=" + product_id +", product_name='" + product_name + '\'' +", product_price=" + product_price +", product_num=" + product_num +", product_type='" + product_type + '\'' +'}';}
}
数据生成
订单数据生成
package com.zxl.datas;import com.zxl.bean.Orders;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Random;public class OrdersData implements SourceFunction<Orders> {private static Random random = new Random();private static boolean isRunning = true;private static Integer num = 0;//订单IDprivate static Long getOrder_id() {num++;long aLong = Long.parseLong(num.toString());return aLong;}//订单日期private static Long getOrder_date() {//为了模拟数据延迟所里利用随机数进行模拟时间int i = random.nextInt(15);return Long.valueOf(i);}//用户IDprivate static Long getUser_id() {return random.nextLong();}//订单金额private static Integer getOrder_amount() {return random.nextInt(100);}//商品IDprivate static Integer getProduct_id() {return random.nextInt(100);}//订单数量private static Long getOrder_num() {return random.nextLong();}//订单类private static Orders getOrders() {Orders orders = new Orders(getOrder_id(), getUser_id(), getOrder_date(), getOrder_amount(), getProduct_id(), getOrder_num());return orders;}@Overridepublic void run(SourceContext<Orders> sourceContext) throws Exception {while (isRunning) {sourceContext.collect(getOrders());Thread.sleep(1000);}}@Overridepublic void cancel() {isRunning = false;}
}
支付数据生成
package com.zxl.datas;import com.zxl.bean.Payments;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Date;
import java.util.Random;public class PaymentData implements SourceFunction<Payments> {private static Random random = new Random();private static boolean isRunning = true;private static Integer num = 0;//支付IDprivate static Long getPayment_id(){return random.nextLong();}//订单IDprivate static Long getOrder_id() {num++;long aLong = Long.parseLong(num.toString());return aLong;}//支付金额private static Integer getPayment_amount(){return random.nextInt(1000);}//支付类型private static String getPayment_type(){String[] type = {"银行卡", "支付宝", "微信", "美团", "抖音", "现金"};int are= random.nextInt(6);String area=type[are];return area;}//支付日期private static Long getPayment_date(){//为了模拟数据延迟所里利用随机数进行模拟时间int i = random.nextInt(15);return Long.valueOf(i);}//支付类private static Payments getPayments(){Payments payments = new Payments(getPayment_id(),getOrder_id(),getPayment_amount(),getPayment_type(),getPayment_date());return payments;}@Overridepublic void run(SourceContext<Payments> sourceContext) throws Exception {while (isRunning) {sourceContext.collect(getPayments());Thread.sleep(1000);}}@Overridepublic void cancel() {isRunning = false;}
}
测试数据打印
package com.zxl.flink;import com.zxl.bean.Orders;
import com.zxl.bean.Payments;
import com.zxl.datas.OrdersData;
import com.zxl.datas.PaymentData;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class flinkWorks {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据 DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/6 支付数据 DataStreamSource<Payments> paymentsDataStreamSource = environment.addSource(new PaymentData());//打印数据paymentsDataStreamSource.print();ordersDataStreamSource.print();//启动程序environment.execute();}
}
相关文章:

Flink自定义Source模拟数据流
maven依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.0.…...

[易语言]使用易语言部署工业级人脸检测模型
【框架地址】 https://github.com/ShiqiYu/libfacedetection 【算法介绍】 Libfacedetection是一个开源的计算机视觉库,主要用于实时的人脸检测。它利用深度学习技术,特别是卷积神经网络(CNN),实现了高精度的脸部定位…...

2024年海外推广怎么做?
要说如何做海外推广,绝大多数的外贸小伙伴都能说上一些,但是大部分人对于推广系统知识略知一二,没有构建一个系统化的知识框架。 海外推广的几大步骤 1.定策略 做海外推广前,我们需要制定一套营销策略,需要去定义一…...

Redis分布式锁--java实现
文章目录 Redis分布式锁方案:SETNX EXPIRE基本原理比较好的实现会产生四个问题 几种解决原子性的方案方案:SETNX value值是(系统时间过期时间)方案:使用Lua脚本(包含SETNX EXPIRE两条指令)方案:SET的扩展…...

好消息,Linux Kernel 6.7正式发布!
据有关资料显示,该版本是有史以来合并数最多的版本之一,包含 17k 个非合并 commit,实际合并的超过1K个。 那么该版本主要有哪边变化呢?下面我来一一列举一下: Bcachefs文件系统已被合并到主线内核,这是一款…...

【k8s】Kubernetes 声明式 API、命令式
1. 资源管理方式: 1>. 命令式对象管理∶直接使用命令去操作kubernetes资源 kubectl run nginx-pod --imagenginx:1.17.1 --port802>. 命令式对象配置∶通过命令配置和配置文件去操作kubernetes资源 kubectl create/patch -f nginx-pod.yaml3>. 声明式对…...

解锁营销新高度:幽灵鲨CRM推广平台线索对接功能详解
数字营销时代,线索对接是推动业务增长的关键。你是否为线索分布在不同的平台而来回切换?你是否为无法及时联系客户而错失商机?幽灵鲨CRM系统作为一款领先的客户关系管理解决方案,不仅实现了对主流推广平台的全面对接,更…...

uniapp 创建组件
组件:用于将某个功能的 HTML、CSS、JS 封装到一个文件中,提高代码的复用性和可维护性。 创建组件 一、在根目录中创建 components 文件夹,右键点击新建组件。 二、输入组件名称、选择默认模板、点击创建组件。 三、在组件中正常编写内容即可…...

Linux--部署 Tomcat 及其负载均衡
1.案例前置知识点 1)Tomcat简介 名称由来:Tomcat最初是由 Sun的软件构架师詹姆斯邓肯戴维森开发的。后来他帮助将其变 为开源项目,并由Sun贡献给Apache软件基金会。由于大部分开源项目OReilly都会出一本相关的 书,并且将其封面设…...

影像组学介绍
影像组学介绍 1 影像组学介绍2 具体提取影像组学方法流程及工具代码:2.1 影像数据获取2.2 感兴趣区域分割2.3 特征提取与降维选择2.3.1 特征提取:2.3.2 特征降维(特征选择) 2.4 建模分析:2.5 结果分析 参考: 1 影像组学介绍 其实…...

什么是云服务器?云服务器的工作原理是介绍
阿里云服务器ECS英文全程Elastic Compute Service,云服务器ECS是一种安全可靠、弹性可伸缩的云计算服务,阿里云提供多种云服务器ECS实例规格,如经济型e实例、通用算力型u1、ECS计算型c7、通用型g7、GPU实例等,阿里云百科aliyunbai…...

【前后端的那些事】前后端环境搭建+树形结构表格实现
文章目录 1. 前后端项目环境搭建2. table-tree2.1 后端准备2.2 前端准备 前言:最近写项目,发现了一些很有意思的功能,想写文章,录视频把这些内容记录下。但这些功能太零碎,如果为每个功能都单独搭建一个项目࿰…...

PHP版学校教务管理系统源码带文字安装教程
PHP版学校教务管理系统源码带文字安装教程 运行环境 服务器宝塔面板 PHP 7.0 Mysql 5.5及以上版本 Linux Centos7以上 系统介绍: 后台权限控制:支持多个管理员,学生管理,学生成绩,教师管理,文章管理&#x…...

前端背景收集之烟花背景
文章目录 🐒个人主页🏅Vue项目常用组件模板仓库📖前言:🎀源码如下: 🐒个人主页 🏅Vue项目常用组件模板仓库 📖前言: 本篇博客主要提供前端背景收集之烟花背景…...

PCL 格网法计算点云的占地面积
目录 一、算法原理二、代码实现三、结果展示四、测试数据本文由CSDN点云侠原创,原文链接。如果你不是在点云侠的博客中看到该文章,那么此处便是不要脸的爬虫与GPT生成的文章。 一、算法原理 该方法主要用于粗略统计机载点云的占地面积。方法原理是将点云沿 X O Y XOY...

《设计模式的艺术》笔记 - 面向对象设计原则
1、单一职责原则 一个类只负责单一功能领域中的相应职责。 2、开闭原则 一个软件实体应当对扩展开放,对修改关闭。即软件实体应当尽量在不修改原有代码的情况下进行扩展。 3、里氏代换原则 所有引用基类的地方必须能透明地使用其子类的对象。即在软件中将一个基类…...

《Linux C编程实战》笔记:线程同步
这一节主要是解决共享资源的处理。操作系统里也讲过互斥、锁之类的概念。 互斥锁 互斥锁通过锁机制来实现线程同步,同一时刻只允许一个线程执行一个关键部分的代码 一下是操作互斥锁的函数,均声明在pthread.h中。 pthread_mutex_init(初始…...

leetcode141.环形链表
题目 给你一个链表的头节点 head ,判断链表中是否有环。 如果链表中有某个节点,可以通过连续跟踪 next 指针再次到达,则链表中存在环。 为了表示给定链表中的环,评测系统内部使用整数 pos 来表示链表尾连接到链表中的位置&#…...

景联文科技:以高质量数据赋能文生图大模型
1月5日,在智求共赢・中国AIGC产业应用峰会暨无界AI生态合作伙伴大会上,中国AIGC产业联盟联合无界AI发布了《中国AIGC文生图产业白皮书2023》,从AIGC文生图发展历程、主流工具、产业实践以及规模预测等多个维度,全面揭示了中国AIGC…...

[论文笔记] PAI-Megatron中qwen和mistral合并到Megtron-LM
一、千问 关于tokenizer的改动: 1.1、更改build_tokenizer中tokenizer类的加载。 /mnt/nas/pretrain/code/Megatron-LM/megatron/tokenizer/__init__.py 或者 tokenizer.py 在build_tokenizer.py函数中: elif args.tokenizer_type == "QwenTokenizer":assert a…...

python设计模式有哪几种
Python 中常见的设计模式有以下几种 一 单例模式(Singleton Pattern):确保一个类只有一个实例,并提供全局访问点。 二 工厂模式(Factory Pattern):使用工厂方法来创建对象,而不是直…...

C语言从入门到实战——数据在内存中的存储方式
数据在内存中的存储方式 前言1. 整数在内存中的存储2. 大小端字节序和字节序判断2.1 什么是大小端2.2 为什么有大小端2.3 练习2.3.1 练习12.3.2 练习22.3.3 练习32.3.4 练习42.3.5 练习52.3.6 练习6 3. 浮点数在内存中的存储3.1 练习3.2 浮点数的存储3.2.1 浮点数存的过程3.2.2…...

高效便捷的远程管理利器——Royal TSX for Mac软件介绍
Royal TSX for Mac是一款功能强大、操作便捷的远程管理软件。无论是远程桌面、SSH、VNC、Telnet还是FTP,用户都可以通过Royal TSX轻松地远程连接和管理各种服务器、计算机和网络设备。 Royal TSX for Mac提供了直观的界面和丰富的功能,让用户能够快速便…...

Docker 部署后端项目自动化脚本
文章目录 开机自启动docker打包后端项目Dockerfile文件脚本文件使用 开机自启动docker systemctl enable dockersystemctl is-enabled docker打包后端项目 这里的项目位置是target同级目录 1.在项目下面新建一个bin目录 新建一个package.txt 写入下方代码后 后缀改为.bat ec…...

MySQL从0到1全教程【2】SQL语言的通用语法及分类
1 SQL语言的通用语法格式 无论是那种数据库的产品,SQL语法都是通用的。 SQL语句可以单行编写也可以多行编写,以分号结尾。SQL语句可以使用空格或者缩进的方式来增强语句的可读性,空格和缩进的数量没有限制。MySQL数据库的SQL语句是不区分大…...

【npm link】Node命令中的npm link命令的使用,还有CLI全局命令的使用,开发命令行工具必不可少的部分
😁 作者简介:一名大四的学生,致力学习前端开发技术 ⭐️个人主页:夜宵饽饽的主页 ❔ 系列专栏:NodeJs 👐学习格言:成功不是终点,失败也并非末日,最重要的是继续前进的勇气…...

Unity组件开发--相机跟随角色和旋转
1.相机跟随组件,节点: 2.相机跟随组件脚本: using System; using System.Collections; using System.Collections.Generic; using Unity.Burst.Intrinsics; using UnityEngine; using UnityEngine.UI;public class CameraFollow : Singleton&…...

JavaScript系列——Proxy(代理)
文章目录 概要Proxy 语法handler 对象的方法Proxy 示例常用handler 对象的方法的参数handler.get()语法示例 handler.set()语法示例 使用场景验证值修正及附加属性 小结 概要 Proxy 用于创建一个对象的代理,将对原对象上的操作(属性获取、赋值、函数调用…...

QT第三天
使用QT完成水果计价界面和功能,如下图: 运行结果: 代码: widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QListWidgetItem>QT_BEGIN_NAMESPACE namespace Ui { class Widget; } QT_END_N…...

Jetpack Compose -> 声明式UI Modifier
前言 本章主要介绍下 Compose 的声明式 UI 以及初级写法; 什么是声明式UI 传统UI 传统 UI 方式来声明UI <androidx.appcompat.widget.LinearLayoutCompat android:layout_width"match_parent" android:layout_height"match_parent&quo…...