当前位置: 首页 > news >正文

1. Flink自定义Source

一. Source 简介

DataStream是Flink的低级API,用于进行数据的实时处理,Flink编程模型分为Source、Transformation、Sink三个部分,如下图所示。
在这里插入图片描述

默认Flink提供了大量的内置Source,常见的Source如下:

  • 基于文件的Source
  • 基于Socket的Source
  • 基于集合的Source
  • 基于Kafka消息队列的Source

当以上内置Source不能满足业务需要时,可以实现自定义Source。

Flink中有关Source的接口类的继承关系如下:
在这里插入图片描述

  • SourceFunction:单并行度Source的基类
  • RichSourceFunction:单并行度增强型Source的基类
  • ParallelSourceFunction:多并行度Source的基类
  • RichParallelSourceFunction:多并行度增强型Source的基类
二. 自定义单并行度Source

自定义单并行度的source需要实现SourceFunction接口。

代码实现

MySource.java

package flink.basic.source;import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;public class MySource implements SourceFunction<String> {boolean running = true;@Overridepublic void run(SourceContext<String> ctx) throws Exception {Random random = new Random();while (running) {// "Num"加上0~100的随机数生成一个字符串ctx.collect("Num: " + random.nextInt(100));Thread.sleep(1000);}}@Overridepublic void cancel() {running = false;}
}

SourceDemo.java

package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.addSource(new MySource());source.print();env.execute("source_demo");}
}

运行结果

5> Num: 62
6> Num: 91
7> Num: 13
8> Num: 53
三. 自定义多并行度Source

自定义多并行度的source需要实现ParallelSourceFunction接口。

代码实现

MyParallelSource.java

package flink.basic.source;import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import java.util.Random;public class MyParallelSource implements ParallelSourceFunction<String> {boolean running = true;@Overridepublic void run(SourceContext<String> ctx) throws Exception {Random random = new Random();while (running) {ctx.collect("Num: " + random.nextInt(100));Thread.sleep(1000);}}@Overridepublic void cancel() {running = false;}
}

SourceDemo.java

package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.addSource(new MyParallelSource());source.print();env.execute("source_demo");}
}

运行结果

7> Num: 43
8> Num: 30
1> Num: 92
2> Num: 50
5> Num: 39
6> Num: 6
4> Num: 20
3> Num: 2
四. 自定义单并行度增强型Source

增强型Source额外提供了open和close方法,可以用于自定义Source的初始化和清理工作。单并行度增强型Source需要实现RichSourceFunction接口。下面演示实现读取mysql表的单并行度Source。

在mysql中创建student表,并插入三条数据。

create table student (id int primary key,name varchar(50),age int
);insert into student values(1, "name1", 20),(2, "name2", 30), (3, "name3", 15);

实现代码

Student.java

package flink.basic.source;public class Student {private int id;private String name;private int age;public Student(int id, String name, int age) {this.id = id;this.name = name;this.age = age;}public Student() {}public int getId() {return id;}public void setId(int id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}@Overridepublic String toString() {return "Student{" +"id=" + id +", name='" + name + '\'' +", age=" + age +'}';}
}

MysqlSource.java

package flink.basic.source;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;public class MysqlSource extends RichSourceFunction<Student> {Connection conn;Statement stmt;@Overridepublic void open(Configuration parameters) throws Exception {Class.forName("com.mysql.cj.jdbc.Driver");String url = "jdbc:mysql://192.168.47.130:3306/test";String user = "root";String password = "root";conn = DriverManager.getConnection(url,user,password);stmt = conn.createStatement();}@Overridepublic void run(SourceContext<Student> ctx) throws Exception {ResultSet rs = stmt.executeQuery("select * from student");while (rs.next()) {int id = rs.getInt("id");String name = rs.getString("name");int age = rs.getInt("age");ctx.collect(new Student(id, name, age));}rs.close();}@Overridepublic void cancel() {}@Overridepublic void close() throws Exception {stmt.close();conn.close();}
}

SourceDemo.java

package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();// 添加mysql SourceDataStreamSource<Student> source = env.addSource(new MysqlSource());source.print();env.execute("source_demo");}
}

运行结果

1> Student{id=3, name='name3', age=15}
8> Student{id=2, name='name2', age=30}
7> Student{id=1, name='name1', age=20}

相关文章:

1. Flink自定义Source

一. Source 简介 DataStream是Flink的低级API&#xff0c;用于进行数据的实时处理&#xff0c;Flink编程模型分为Source、Transformation、Sink三个部分&#xff0c;如下图所示。 默认Flink提供了大量的内置Source&#xff0c;常见的Source如下&#xff1a; 基于文件的Sour…...

关于LinuxWindows双系统在八月更新后出现的问题

问题描述类似于&#xff1a;Verifying shim SBAT data failed: If you are, this is caused by a reported problem in the August update if you can get into Windows, either uninstall the August update, or open Command Prompt as administrator and run this command,…...

VMware:如何在CentOS7上开启22端口

打开虚拟机&#xff1a;【编辑】【虚拟机网络设置】 其中填入的虚拟机IP地址是虚拟机中centos的IP地址&#xff0c;虚拟机端口为需要映射的centos端口 配置好之后保存&#xff0c;打开宿主机 win cmd telnet 192.168.1.26 22 如果出现上述窗口&#xff0c;则说明已经成功开放…...

ubuntu远程桌面开启opengl渲染权限

背景 最近用windows的【远程桌面连接】登录ubuntu后&#xff08;xrdp协议&#xff09;&#xff0c;发现gl环境是集显的&#xff0c;但是本地登录ubuntu桌面后是独显&#xff08;英伟达&#xff09;&#xff0c;想要在远程桌面上也用独显渲染环境。 一、查看是独显还是集显环境…...

从小学题到技术选型哲学:以智能客服系统为例,解读相关AI技术栈20241211

&#x1f9e0;&#x1f4a1;从小学题到技术选型哲学&#xff1a;以智能客服系统为例&#xff0c;解读相关AI技术栈 引言&#xff1a;从小学数学题到技术智慧 &#x1f4da;✨ 在小学数学题中&#xff0c;有这样一道问题&#xff1a; “一个长方形变成平行四边形后&#xff0c…...

【C语言练习(5)—回文数判断】

C语言练习&#xff08;5&#xff09; 文章目录 C语言练习&#xff08;5&#xff09;前言问题问题解析结果总结 前言 通过回文数练习&#xff0c;巩固数字取余和取商如何写代码 问题 输入一个五位数判断是否为回文数&#xff1f; 问题解析 回文数是指正读反读都一样的整数。…...

【Rust 学习笔记】Rust 基础数据类型介绍——数组、向量和切片

博主未授权任何人或组织机构转载博主任何原创文章&#xff0c;感谢各位对原创的支持&#xff01; 博主链接 博客内容主要围绕&#xff1a; 5G/6G协议讲解 高级C语言讲解 Rust语言讲解 文章目录 Rust 基础数据类型介绍——数组、向量和切片一、数组、向量和…...

2024年特别报告,「十大生活方式」研究数据报告

“一朵花成轻奢品、一只玩偶掀抢购狂潮、一片荒地变文旅圣地…” 近年爆火的野兽派、Jellycat、阿那亚等诸多品牌&#xff0c;与消费者选择的生活方式息息相关。 今年小红书的内容种草、直播电商&#xff0c;也都依循着“生活方式”的轨迹。生活方式的价值所向&#xff0c;可…...

R中单细胞RNA-seq分析教程 (5)

引言 本系列开启R中单细胞RNA-seq数据分析教程[1]&#xff0c;持续更新&#xff0c;欢迎关注&#xff0c;转发&#xff01; 10. 伪时间细胞排序 如前所述&#xff0c;在 UMAP 嵌入中看到的背侧端脑细胞形成的类似轨迹的结构&#xff0c;很可能代表了背侧端脑兴奋性神经元的分化…...

openpnp - Too many misdetects - retry and verify fiducial/nozzle tip detection

文章目录 openpnp - Too many misdetects - retry and verify fiducial/nozzle tip detection概述笔记环境光最好弱一些在设备标定时&#xff0c;吸嘴上不要装绿色屏蔽片如果吸嘴不在底部相机中间&#xff0c;先检查设置底部相机坐标调整底部相机坐标 吸嘴校验的细节底部相机坐…...

不与最大数相同的数字之和

不与最大数相同的数字之和 C语言代码C 语言代码Java语言代码Python语言代码 &#x1f490;The Begin&#x1f490;点点关注&#xff0c;收藏不迷路&#x1f490; 输出一个整数数列中不与最大数相同的数字之和。 输入 输入分为两行&#xff1a; 第一行为N(N为接下来数的个数&…...

CSS学习记录11

CSS布局 - display属性 display属性是用于控制布局的最终要的CSS属性。display 属性规定是否/如何显示元素。每个HTML元素都有一个默认的display值&#xff0c;具体取决于它的元素类型。大多数元素的默认display值为block 或 inline。 块级元素&#xff08;block element&…...

D95【python 接口自动化学习】- pytest进阶之fixture用法

day95 pytest的fixture详解&#xff08;二&#xff09; 学习日期&#xff1a;20241210 学习目标&#xff1a;pytest基础用法 -- pytest的fixture详解&#xff08;二&#xff09; 学习笔记&#xff1a; fixture(autouseTrue) func的autouse是TRUE时&#xff0c;所有函数方法…...

Abaqus断层扫描三维重建插件CT2Model 3D V1.1版本更新

更新说明 Abaqus AbyssFish CT2Model3D V1.1版本更新新增对TIF、TIFF图像文件格式的支持。本插件用户可免费获取升级服务。 插件介绍 插件说明&#xff1a; Abaqus基于CT断层扫描的三维重建插件CT2Model 3D 应用案例&#xff1a; ABAQUS基于CT断层扫描的细观混凝土三维重建…...

隐式对象和泛型

implicit object 作用&#xff1a; case class DatabaseConfig(driver:String,url:String)//作为函数的隐士参数的默认值implicit object MySqlDefault extends DatabaseConfig("mysql","localhost:443")def getConn(implicit config: DatabaseConfig):Uni…...

CSS的颜色表示方式

以下介绍几种常见的CSS颜色表示方式&#xff1a; 颜色名称 html和css规范中定义了147种可用的颜色名用的相对较少 16进制表示 css三原色&#xff1a;红、绿、蓝16进制的颜色值&#xff1a; #rrggbb16进制整数规定颜色成分&#xff0c;所有的值均介于 00 - ff 之间&#xff…...

单链表常见面试题 —— LeetCode

一.删除链表中与val相等的所有节点 1.题目描述 ----- 203. 移除链表元素 - 力扣&#xff08;LeetCode&#xff09; 给你一个链表的头节点 head 和一个整数 val &#xff0c;请你删除链表中所有满足 Node.val val 的节点&#xff0c;并返回 新的头节点 。 列表中的节点数目在范…...

Pydantic中的discriminator:优雅地处理联合类型详解

Pydantic中的discriminator&#xff1a;优雅地处理联合类型详解 引言1. 什么是discriminator&#xff1f;2. 基本使用示例3. discriminator的工作原理4. 更复杂的实际应用场景5. 使用建议6. 潜在陷阱和注意事项结论最佳实践 引言 在Python的类型系统中&#xff0c;有时我们需要…...

pgloader SQLSERVER -> PostgreSQL 配置文件样例

pgloader 是什么&#xff1f;安装和基本用户法可以去其他同道的blog上去看&#xff0c;这里不占用网络空间了。刚开始用官方的文档读起还是很费劲的&#xff0c;所以把常用的配置例子放在这里。 官方文档&#xff1a;https://pgloader.readthedocs.io/en/latest/index.html 迁…...

APP、小程序对接聚合广告平台,有哪些广告变现策略?

开发者对接聚合广告平台&#xff0c;可以让自身流量价值最大化&#xff0c;获得更多的广告曝光机会&#xff0c;对接单一的广告联盟容易造成广告填充不足&#xff0c;收益不稳定的问题。#APP广告变现# APP开发者根据应用的生命周期、用户特征和产品定位&#xff0c;选择最适合…...

[具身智能-659]:ROS2 与人类大脑神经系统 完整类比 + 异同对比总结

一、整体核心类比ROS2 就是人工机器人版的「中枢神经系统」机器人的硬件架构、节点分工、消息通信、协同逻辑&#xff0c;完全复刻人脑神经工作模式&#xff1a;CPU/GPU计算单元为算法节点 大脑皮层&#xff08;认知、推理、决策、多模态理解&#xff09;MCU 传感器 / 运动节点…...

本地大模型推理引擎:高性能、可编程的部署与优化实战

1. 项目概述&#xff1a;一个为本地大模型打造的“瑞士军刀”式推理引擎如果你最近在折腾本地部署的大语言模型&#xff0c;比如Llama、Qwen或者DeepSeek&#xff0c;那你大概率遇到过这样的场景&#xff1a;模型文件下载好了&#xff0c;推理框架也装上了&#xff0c;但实际跑…...

crawdad-openclaw:构建高韧性智能爬虫的模块化框架实战

1. 项目概述&#xff1a;一个为数据抓取而生的开源“机械爪”如果你和我一样&#xff0c;在数据工程或网络爬虫领域摸爬滚打过几年&#xff0c;那你一定经历过这样的时刻&#xff1a;面对一个结构复杂、反爬机制严密的网站&#xff0c;你精心编写的爬虫脚本在运行了几个小时后&…...

Godot任务系统设计:数据驱动与事件驱动的游戏任务框架

1. 项目概述&#xff1a;为Godot游戏注入灵魂的“任务系统”如果你用Godot引擎做过游戏&#xff0c;尤其是RPG、冒险或者任何需要引导玩家推进流程的类型&#xff0c;你肯定琢磨过一件事&#xff1a;怎么搞一个靠谱的任务系统&#xff1f;是硬编码一堆if-else判断任务状态&…...

【审计专栏-监督监管领域】【信息科学与工程学】【社会科学】第十篇 社会底层核心规则(核心权力、核心利益、核心资源绑定、私下运作、关键价值交换、上下博弈)04

模型046:企业复杂利益链与多方利益博弈模型 1. 模型概述 项目 内容 模型名称​ 企业复杂利益链与多方利益博弈模型 核心场景​ 一家大型建筑企业“宏建集团”中标某市的地铁延长线建设项目。项目涉及总包方(宏建)、多个分包商(土建、机电、装修等)、材料供应商、监理…...

时序逻辑与值函数分解在强化学习中的应用

1. 时序逻辑与值函数分解的核心原理 时序逻辑&#xff08;Temporal Logic, TL&#xff09;作为形式化方法的重要分支&#xff0c;其本质是通过数学语言描述系统在时间维度上的行为约束。在控制理论与强化学习领域&#xff0c;TL的价值在于将复杂的任务需求转化为可计算的优化目…...

Vivado HLS高效IP开发与优化实战指南

1. Vivado HLS高效IP开发实战解析在FPGA设计领域&#xff0c;高层次综合&#xff08;HLS&#xff09;技术正在彻底改变传统RTL设计流程。作为Xilinx设计套件的核心组件&#xff0c;Vivado HLS允许开发者直接使用C/C等高级语言描述硬件功能&#xff0c;通过自动化转换生成优化的…...

一文分清Agent与Skill

在AI应用开发或学习过程中&#xff0c;很多人都会陷入一个困惑&#xff1a;Agent和Skill到底有什么区别&#xff1f;其实只要抓住“定位”和“能力”两个核心&#xff0c;就能轻松拨开迷雾&#xff0c;把这两个概念彻底分清。 先懂Skill 先从我们最熟悉的Skill说起。Skill是封装…...

如何通过Elden Ring FPS Unlock And More解锁《艾尔登法环》全部性能:新手完整指南

如何通过Elden Ring FPS Unlock And More解锁《艾尔登法环》全部性能&#xff1a;新手完整指南 【免费下载链接】EldenRingFpsUnlockAndMore A small utility to remove frame rate limit, change FOV, add widescreen support and more for Elden Ring 项目地址: https://gi…...

AMD Ryzen处理器深度调试:5个关键功能助你完全掌控硬件性能

AMD Ryzen处理器深度调试&#xff1a;5个关键功能助你完全掌控硬件性能 【免费下载链接】SMUDebugTool A dedicated tool to help write/read various parameters of Ryzen-based systems, such as manual overclock, SMU, PCI, CPUID, MSR and Power Table. 项目地址: https…...