当前位置: 首页 > news >正文

Flink自定义Source模拟数据流

maven依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.zxl</groupId><artifactId>FlinkJoin</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.22</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.27</version></dependency><!--com.mysql.cj.jdbc.Driver--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!--这个一定要加,否则会报错(5001好像是,记不清了)--><dependency><groupId>org.glassfish.jersey.inject</groupId><artifactId>jersey-hk2</artifactId><version>2.34</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.0</version><scope>compile</scope></dependency></dependencies>
</project>

实体类

订单类

package com.zxl.bean;// TODO: 2024/1/6 订单类 public class Orders {//订单IDprivate Long order_id;//用户IDprivate Long user_id;//订单日期private Long order_date;//订单金额private Integer order_amount;//商品IDprivate Integer product_id;//订单数量private Long order_num;public Long getOrder_id() {return order_id;}public void setOrder_id(Long order_id) {this.order_id = order_id;}public Long getUser_id() {return user_id;}public void setUser_id(Long user_id) {this.user_id = user_id;}public Long getOrder_date() {return order_date;}public void setOrder_date(Long order_date) {this.order_date = order_date;}public Integer getOrder_amount() {return order_amount;}public void setOrder_amount(Integer order_amount) {this.order_amount = order_amount;}public Integer getProduct_id() {return product_id;}public void setProduct_id(Integer product_id) {this.product_id = product_id;}public Long getOrder_num() {return order_num;}public void setOrder_num(Long order_num) {this.order_num = order_num;}public Orders() {}public Orders(Long order_id, Long user_id, Long order_date, Integer order_amount, Integer product_id, Long order_num) {this.order_id = order_id;this.user_id = user_id;this.order_date = order_date;this.order_amount = order_amount;this.product_id = product_id;this.order_num = order_num;}@Overridepublic String toString() {return "Orders{" +"order_id=" + order_id +", user_id=" + user_id +", order_date=" + order_date +", order_amount=" + order_amount +", product_id=" + product_id +", order_num=" + order_num +'}';}
}

支付类

package com.zxl.bean;// TODO: 2024/1/6 支付类 public class Payments {//支付IDprivate Long payment_id;//订单号private Long order_id;//支付金额private Integer payment_amount;//支付类型private String payment_type;//支付日期private Long payment_date;public Long getPayment_id() {return payment_id;}public void setPayment_id(Long payment_id) {this.payment_id = payment_id;}public Long getOrder_id() {return order_id;}public void setOrder_id(Long order_id) {this.order_id = order_id;}public Integer getPayment_amount() {return payment_amount;}public void setPayment_amount(Integer payment_amount) {this.payment_amount = payment_amount;}public String getPayment_type() {return payment_type;}public void setPayment_type(String payment_type) {this.payment_type = payment_type;}public Long getPayment_date() {return payment_date;}public void setPayment_date(Long payment_date) {this.payment_date = payment_date;}public Payments() {}public Payments(Long payment_id, Long order_id, Integer payment_amount, String payment_type, Long payment_date) {this.payment_id = payment_id;this.order_id = order_id;this.payment_amount = payment_amount;this.payment_type = payment_type;this.payment_date = payment_date;}@Overridepublic String toString() {return "payments{" +"payment_id=" + payment_id +", order_id=" + order_id +", payment_amount=" + payment_amount +", payment_type='" + payment_type + '\'' +", payment_date=" + payment_date +'}';}
}

商品类

用作维表测试

package com.zxl.bean;// TODO: 2024/1/6 商品类public class Products {//商品IDprivate Integer product_id;//商品名称private String product_name;//商品价格private Integer product_price;//商品库存private Long product_num;//商品分类private String product_type;public Integer getProduct_id() {return product_id;}public void setProduct_id(Integer product_id) {this.product_id = product_id;}public String getProduct_name() {return product_name;}public void setProduct_name(String product_name) {this.product_name = product_name;}public Integer getProduct_price() {return product_price;}public void setProduct_price(Integer product_price) {this.product_price = product_price;}public Long getProduct_num() {return product_num;}public void setProduct_num(Long product_num) {this.product_num = product_num;}public String getProduct_type() {return product_type;}public void setProduct_type(String product_type) {this.product_type = product_type;}public Products() {}public Products(Integer product_id, String product_name, Integer product_price, Long product_num, String product_type) {this.product_id = product_id;this.product_name = product_name;this.product_price = product_price;this.product_num = product_num;this.product_type = product_type;}@Overridepublic String toString() {return "products{" +"product_id=" + product_id +", product_name='" + product_name + '\'' +", product_price=" + product_price +", product_num=" + product_num +", product_type='" + product_type + '\'' +'}';}
}

数据生成

订单数据生成

package com.zxl.datas;import com.zxl.bean.Orders;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Random;public class OrdersData implements SourceFunction<Orders> {private static Random random = new Random();private static boolean isRunning = true;private static Integer num = 0;//订单IDprivate static Long getOrder_id() {num++;long aLong = Long.parseLong(num.toString());return aLong;}//订单日期private static Long getOrder_date() {//为了模拟数据延迟所里利用随机数进行模拟时间int i = random.nextInt(15);return Long.valueOf(i);}//用户IDprivate static Long getUser_id() {return random.nextLong();}//订单金额private static Integer getOrder_amount() {return random.nextInt(100);}//商品IDprivate static Integer getProduct_id() {return random.nextInt(100);}//订单数量private static Long getOrder_num() {return random.nextLong();}//订单类private static Orders getOrders() {Orders orders = new Orders(getOrder_id(), getUser_id(), getOrder_date(), getOrder_amount(), getProduct_id(), getOrder_num());return orders;}@Overridepublic void run(SourceContext<Orders> sourceContext) throws Exception {while (isRunning) {sourceContext.collect(getOrders());Thread.sleep(1000);}}@Overridepublic void cancel() {isRunning = false;}
}

支付数据生成

package com.zxl.datas;import com.zxl.bean.Payments;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Date;
import java.util.Random;public class PaymentData implements SourceFunction<Payments> {private static Random random = new Random();private static boolean isRunning = true;private static Integer num = 0;//支付IDprivate static Long getPayment_id(){return random.nextLong();}//订单IDprivate static Long getOrder_id() {num++;long aLong = Long.parseLong(num.toString());return aLong;}//支付金额private static Integer getPayment_amount(){return random.nextInt(1000);}//支付类型private static String getPayment_type(){String[] type = {"银行卡", "支付宝", "微信", "美团", "抖音", "现金"};int are= random.nextInt(6);String area=type[are];return area;}//支付日期private static Long getPayment_date(){//为了模拟数据延迟所里利用随机数进行模拟时间int i = random.nextInt(15);return Long.valueOf(i);}//支付类private static Payments getPayments(){Payments payments = new Payments(getPayment_id(),getOrder_id(),getPayment_amount(),getPayment_type(),getPayment_date());return payments;}@Overridepublic void run(SourceContext<Payments> sourceContext) throws Exception {while (isRunning) {sourceContext.collect(getPayments());Thread.sleep(1000);}}@Overridepublic void cancel() {isRunning = false;}
}

测试数据打印

package com.zxl.flink;import com.zxl.bean.Orders;
import com.zxl.bean.Payments;
import com.zxl.datas.OrdersData;
import com.zxl.datas.PaymentData;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class flinkWorks {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据 DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/6 支付数据 DataStreamSource<Payments> paymentsDataStreamSource = environment.addSource(new PaymentData());//打印数据paymentsDataStreamSource.print();ordersDataStreamSource.print();//启动程序environment.execute();}
}

在这里插入图片描述

相关文章:

Flink自定义Source模拟数据流

maven依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.0.…...

[易语言]使用易语言部署工业级人脸检测模型

【框架地址】 https://github.com/ShiqiYu/libfacedetection 【算法介绍】 Libfacedetection是一个开源的计算机视觉库&#xff0c;主要用于实时的人脸检测。它利用深度学习技术&#xff0c;特别是卷积神经网络&#xff08;CNN&#xff09;&#xff0c;实现了高精度的脸部定位…...

2024年海外推广怎么做?

要说如何做海外推广&#xff0c;绝大多数的外贸小伙伴都能说上一些&#xff0c;但是大部分人对于推广系统知识略知一二&#xff0c;没有构建一个系统化的知识框架。 海外推广的几大步骤 1.定策略 做海外推广前&#xff0c;我们需要制定一套营销策略&#xff0c;需要去定义一…...

Redis分布式锁--java实现

文章目录 Redis分布式锁方案&#xff1a;SETNX EXPIRE基本原理比较好的实现会产生四个问题 几种解决原子性的方案方案&#xff1a;SETNX value值是&#xff08;系统时间过期时间&#xff09;方案&#xff1a;使用Lua脚本(包含SETNX EXPIRE两条指令)方案&#xff1a;SET的扩展…...

好消息,Linux Kernel 6.7正式发布!

据有关资料显示&#xff0c;该版本是有史以来合并数最多的版本之一&#xff0c;包含 17k 个非合并 commit&#xff0c;实际合并的超过1K个。 那么该版本主要有哪边变化呢&#xff1f;下面我来一一列举一下&#xff1a; Bcachefs文件系统已被合并到主线内核&#xff0c;这是一款…...

【k8s】Kubernetes 声明式 API、命令式

1. 资源管理方式&#xff1a; 1>. 命令式对象管理∶直接使用命令去操作kubernetes资源 kubectl run nginx-pod --imagenginx:1.17.1 --port802>. 命令式对象配置∶通过命令配置和配置文件去操作kubernetes资源 kubectl create/patch -f nginx-pod.yaml3>. 声明式对…...

解锁营销新高度:幽灵鲨CRM推广平台线索对接功能详解

数字营销时代&#xff0c;线索对接是推动业务增长的关键。你是否为线索分布在不同的平台而来回切换&#xff1f;你是否为无法及时联系客户而错失商机&#xff1f;幽灵鲨CRM系统作为一款领先的客户关系管理解决方案&#xff0c;不仅实现了对主流推广平台的全面对接&#xff0c;更…...

uniapp 创建组件

组件&#xff1a;用于将某个功能的 HTML、CSS、JS 封装到一个文件中&#xff0c;提高代码的复用性和可维护性。 创建组件 一、在根目录中创建 components 文件夹&#xff0c;右键点击新建组件。 二、输入组件名称、选择默认模板、点击创建组件。 三、在组件中正常编写内容即可…...

Linux--部署 Tomcat 及其负载均衡

1.案例前置知识点 1&#xff09;Tomcat简介 名称由来&#xff1a;Tomcat最初是由 Sun的软件构架师詹姆斯邓肯戴维森开发的。后来他帮助将其变 为开源项目&#xff0c;并由Sun贡献给Apache软件基金会。由于大部分开源项目OReilly都会出一本相关的 书&#xff0c;并且将其封面设…...

影像组学介绍

影像组学介绍 1 影像组学介绍2 具体提取影像组学方法流程及工具代码&#xff1a;2.1 影像数据获取2.2 感兴趣区域分割2.3 特征提取与降维选择2.3.1 特征提取&#xff1a;2.3.2 特征降维(特征选择) 2.4 建模分析&#xff1a;2.5 结果分析 参考&#xff1a; 1 影像组学介绍 其实…...

什么是云服务器?云服务器的工作原理是介绍

阿里云服务器ECS英文全程Elastic Compute Service&#xff0c;云服务器ECS是一种安全可靠、弹性可伸缩的云计算服务&#xff0c;阿里云提供多种云服务器ECS实例规格&#xff0c;如经济型e实例、通用算力型u1、ECS计算型c7、通用型g7、GPU实例等&#xff0c;阿里云百科aliyunbai…...

【前后端的那些事】前后端环境搭建+树形结构表格实现

文章目录 1. 前后端项目环境搭建2. table-tree2.1 后端准备2.2 前端准备 前言&#xff1a;最近写项目&#xff0c;发现了一些很有意思的功能&#xff0c;想写文章&#xff0c;录视频把这些内容记录下。但这些功能太零碎&#xff0c;如果为每个功能都单独搭建一个项目&#xff0…...

PHP版学校教务管理系统源码带文字安装教程

PHP版学校教务管理系统源码带文字安装教程 运行环境 服务器宝塔面板 PHP 7.0 Mysql 5.5及以上版本 Linux Centos7以上 系统介绍&#xff1a; 后台权限控制&#xff1a;支持多个管理员&#xff0c;学生管理&#xff0c;学生成绩&#xff0c;教师管理&#xff0c;文章管理&#x…...

前端背景收集之烟花背景

文章目录 &#x1f412;个人主页&#x1f3c5;Vue项目常用组件模板仓库&#x1f4d6;前言&#xff1a;&#x1f380;源码如下&#xff1a; &#x1f412;个人主页 &#x1f3c5;Vue项目常用组件模板仓库 &#x1f4d6;前言&#xff1a; 本篇博客主要提供前端背景收集之烟花背景…...

PCL 格网法计算点云的占地面积

目录 一、算法原理二、代码实现三、结果展示四、测试数据本文由CSDN点云侠原创,原文链接。如果你不是在点云侠的博客中看到该文章,那么此处便是不要脸的爬虫与GPT生成的文章。 一、算法原理 该方法主要用于粗略统计机载点云的占地面积。方法原理是将点云沿 X O Y XOY...

《设计模式的艺术》笔记 - 面向对象设计原则

1、单一职责原则 一个类只负责单一功能领域中的相应职责。 2、开闭原则 一个软件实体应当对扩展开放&#xff0c;对修改关闭。即软件实体应当尽量在不修改原有代码的情况下进行扩展。 3、里氏代换原则 所有引用基类的地方必须能透明地使用其子类的对象。即在软件中将一个基类…...

《Linux C编程实战》笔记:线程同步

这一节主要是解决共享资源的处理。操作系统里也讲过互斥、锁之类的概念。 互斥锁 互斥锁通过锁机制来实现线程同步&#xff0c;同一时刻只允许一个线程执行一个关键部分的代码 一下是操作互斥锁的函数&#xff0c;均声明在pthread.h中。 pthread_mutex_init&#xff08;初始…...

leetcode141.环形链表

题目 给你一个链表的头节点 head &#xff0c;判断链表中是否有环。 如果链表中有某个节点&#xff0c;可以通过连续跟踪 next 指针再次到达&#xff0c;则链表中存在环。 为了表示给定链表中的环&#xff0c;评测系统内部使用整数 pos 来表示链表尾连接到链表中的位置&#…...

景联文科技:以高质量数据赋能文生图大模型

1月5日&#xff0c;在智求共赢・中国AIGC产业应用峰会暨无界AI生态合作伙伴大会上&#xff0c;中国AIGC产业联盟联合无界AI发布了《中国AIGC文生图产业白皮书2023》&#xff0c;从AIGC文生图发展历程、主流工具、产业实践以及规模预测等多个维度&#xff0c;全面揭示了中国AIGC…...

[论文笔记] PAI-Megatron中qwen和mistral合并到Megtron-LM

一、千问 关于tokenizer的改动: 1.1、更改build_tokenizer中tokenizer类的加载。 /mnt/nas/pretrain/code/Megatron-LM/megatron/tokenizer/__init__.py 或者 tokenizer.py 在build_tokenizer.py函数中: ​elif args.tokenizer_type == "QwenTokenizer":assert a…...

wordpress后台更新后 前端没变化的解决方法

使用siteground主机的wordpress网站&#xff0c;会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后&#xff0c;网站没有变化的情况。 不熟悉siteground主机的新手&#xff0c;遇到这个问题&#xff0c;就很抓狂&#xff0c;明明是哪都没操作错误&#x…...

[2025CVPR]DeepVideo-R1:基于难度感知回归GRPO的视频强化微调框架详解

突破视频大语言模型推理瓶颈,在多个视频基准上实现SOTA性能 一、核心问题与创新亮点 1.1 GRPO在视频任务中的两大挑战 ​安全措施依赖问题​ GRPO使用min和clip函数限制策略更新幅度,导致: 梯度抑制:当新旧策略差异过大时梯度消失收敛困难:策略无法充分优化# 传统GRPO的梯…...

【网络安全产品大调研系列】2. 体验漏洞扫描

前言 2023 年漏洞扫描服务市场规模预计为 3.06&#xff08;十亿美元&#xff09;。漏洞扫描服务市场行业预计将从 2024 年的 3.48&#xff08;十亿美元&#xff09;增长到 2032 年的 9.54&#xff08;十亿美元&#xff09;。预测期内漏洞扫描服务市场 CAGR&#xff08;增长率&…...

UDP(Echoserver)

网络命令 Ping 命令 检测网络是否连通 使用方法: ping -c 次数 网址ping -c 3 www.baidu.comnetstat 命令 netstat 是一个用来查看网络状态的重要工具. 语法&#xff1a;netstat [选项] 功能&#xff1a;查看网络状态 常用选项&#xff1a; n 拒绝显示别名&#…...

Linux云原生安全:零信任架构与机密计算

Linux云原生安全&#xff1a;零信任架构与机密计算 构建坚不可摧的云原生防御体系 引言&#xff1a;云原生安全的范式革命 随着云原生技术的普及&#xff0c;安全边界正在从传统的网络边界向工作负载内部转移。Gartner预测&#xff0c;到2025年&#xff0c;零信任架构将成为超…...

Linux-07 ubuntu 的 chrome 启动不了

文章目录 问题原因解决步骤一、卸载旧版chrome二、重新安装chorme三、启动不了&#xff0c;报错如下四、启动不了&#xff0c;解决如下 总结 问题原因 在应用中可以看到chrome&#xff0c;但是打不开(说明&#xff1a;原来的ubuntu系统出问题了&#xff0c;这个是备用的硬盘&a…...

Rust 异步编程

Rust 异步编程 引言 Rust 是一种系统编程语言,以其高性能、安全性以及零成本抽象而著称。在多核处理器成为主流的今天,异步编程成为了一种提高应用性能、优化资源利用的有效手段。本文将深入探讨 Rust 异步编程的核心概念、常用库以及最佳实践。 异步编程基础 什么是异步…...

图表类系列各种样式PPT模版分享

图标图表系列PPT模版&#xff0c;柱状图PPT模版&#xff0c;线状图PPT模版&#xff0c;折线图PPT模版&#xff0c;饼状图PPT模版&#xff0c;雷达图PPT模版&#xff0c;树状图PPT模版 图表类系列各种样式PPT模版分享&#xff1a;图表系列PPT模板https://pan.quark.cn/s/20d40aa…...

回溯算法学习

一、电话号码的字母组合 import java.util.ArrayList; import java.util.List;import javax.management.loading.PrivateClassLoader;public class letterCombinations {private static final String[] KEYPAD {"", //0"", //1"abc", //2"…...

保姆级教程:在无网络无显卡的Windows电脑的vscode本地部署deepseek

文章目录 1 前言2 部署流程2.1 准备工作2.2 Ollama2.2.1 使用有网络的电脑下载Ollama2.2.2 安装Ollama&#xff08;有网络的电脑&#xff09;2.2.3 安装Ollama&#xff08;无网络的电脑&#xff09;2.2.4 安装验证2.2.5 修改大模型安装位置2.2.6 下载Deepseek模型 2.3 将deepse…...