1. Flink自定义Source
一. Source 简介
DataStream是Flink的低级API,用于进行数据的实时处理,Flink编程模型分为Source、Transformation、Sink三个部分,如下图所示。

默认Flink提供了大量的内置Source,常见的Source如下:
- 基于文件的Source
- 基于Socket的Source
- 基于集合的Source
- 基于Kafka消息队列的Source
当以上内置Source不能满足业务需要时,可以实现自定义Source。
Flink中有关Source的接口类的继承关系如下:

- SourceFunction:单并行度Source的基类
- RichSourceFunction:单并行度增强型Source的基类
- ParallelSourceFunction:多并行度Source的基类
- RichParallelSourceFunction:多并行度增强型Source的基类
二. 自定义单并行度Source
自定义单并行度的source需要实现SourceFunction接口。
代码实现:
MySource.java
package flink.basic.source;import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;public class MySource implements SourceFunction<String> {boolean running = true;@Overridepublic void run(SourceContext<String> ctx) throws Exception {Random random = new Random();while (running) {// "Num"加上0~100的随机数生成一个字符串ctx.collect("Num: " + random.nextInt(100));Thread.sleep(1000);}}@Overridepublic void cancel() {running = false;}
}
SourceDemo.java
package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.addSource(new MySource());source.print();env.execute("source_demo");}
}
运行结果:
5> Num: 62
6> Num: 91
7> Num: 13
8> Num: 53
三. 自定义多并行度Source
自定义多并行度的source需要实现ParallelSourceFunction接口。
代码实现:
MyParallelSource.java
package flink.basic.source;import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import java.util.Random;public class MyParallelSource implements ParallelSourceFunction<String> {boolean running = true;@Overridepublic void run(SourceContext<String> ctx) throws Exception {Random random = new Random();while (running) {ctx.collect("Num: " + random.nextInt(100));Thread.sleep(1000);}}@Overridepublic void cancel() {running = false;}
}
SourceDemo.java
package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.addSource(new MyParallelSource());source.print();env.execute("source_demo");}
}
运行结果:
7> Num: 43
8> Num: 30
1> Num: 92
2> Num: 50
5> Num: 39
6> Num: 6
4> Num: 20
3> Num: 2
四. 自定义单并行度增强型Source
增强型Source额外提供了open和close方法,可以用于自定义Source的初始化和清理工作。单并行度增强型Source需要实现RichSourceFunction接口。下面演示实现读取mysql表的单并行度Source。
在mysql中创建student表,并插入三条数据。
create table student (id int primary key,name varchar(50),age int
);insert into student values(1, "name1", 20),(2, "name2", 30), (3, "name3", 15);
实现代码
Student.java
package flink.basic.source;public class Student {private int id;private String name;private int age;public Student(int id, String name, int age) {this.id = id;this.name = name;this.age = age;}public Student() {}public int getId() {return id;}public void setId(int id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}@Overridepublic String toString() {return "Student{" +"id=" + id +", name='" + name + '\'' +", age=" + age +'}';}
}
MysqlSource.java
package flink.basic.source;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;public class MysqlSource extends RichSourceFunction<Student> {Connection conn;Statement stmt;@Overridepublic void open(Configuration parameters) throws Exception {Class.forName("com.mysql.cj.jdbc.Driver");String url = "jdbc:mysql://192.168.47.130:3306/test";String user = "root";String password = "root";conn = DriverManager.getConnection(url,user,password);stmt = conn.createStatement();}@Overridepublic void run(SourceContext<Student> ctx) throws Exception {ResultSet rs = stmt.executeQuery("select * from student");while (rs.next()) {int id = rs.getInt("id");String name = rs.getString("name");int age = rs.getInt("age");ctx.collect(new Student(id, name, age));}rs.close();}@Overridepublic void cancel() {}@Overridepublic void close() throws Exception {stmt.close();conn.close();}
}
SourceDemo.java
package flink.basic.source;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SourceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();// 添加mysql SourceDataStreamSource<Student> source = env.addSource(new MysqlSource());source.print();env.execute("source_demo");}
}
运行结果:
1> Student{id=3, name='name3', age=15}
8> Student{id=2, name='name2', age=30}
7> Student{id=1, name='name1', age=20}
相关文章:
1. Flink自定义Source
一. Source 简介 DataStream是Flink的低级API,用于进行数据的实时处理,Flink编程模型分为Source、Transformation、Sink三个部分,如下图所示。 默认Flink提供了大量的内置Source,常见的Source如下: 基于文件的Sour…...
关于LinuxWindows双系统在八月更新后出现的问题
问题描述类似于:Verifying shim SBAT data failed: If you are, this is caused by a reported problem in the August update if you can get into Windows, either uninstall the August update, or open Command Prompt as administrator and run this command,…...
VMware:如何在CentOS7上开启22端口
打开虚拟机:【编辑】【虚拟机网络设置】 其中填入的虚拟机IP地址是虚拟机中centos的IP地址,虚拟机端口为需要映射的centos端口 配置好之后保存,打开宿主机 win cmd telnet 192.168.1.26 22 如果出现上述窗口,则说明已经成功开放…...
ubuntu远程桌面开启opengl渲染权限
背景 最近用windows的【远程桌面连接】登录ubuntu后(xrdp协议),发现gl环境是集显的,但是本地登录ubuntu桌面后是独显(英伟达),想要在远程桌面上也用独显渲染环境。 一、查看是独显还是集显环境…...
从小学题到技术选型哲学:以智能客服系统为例,解读相关AI技术栈20241211
🧠💡从小学题到技术选型哲学:以智能客服系统为例,解读相关AI技术栈 引言:从小学数学题到技术智慧 📚✨ 在小学数学题中,有这样一道问题: “一个长方形变成平行四边形后,…...
【C语言练习(5)—回文数判断】
C语言练习(5) 文章目录 C语言练习(5)前言问题问题解析结果总结 前言 通过回文数练习,巩固数字取余和取商如何写代码 问题 输入一个五位数判断是否为回文数? 问题解析 回文数是指正读反读都一样的整数。…...
【Rust 学习笔记】Rust 基础数据类型介绍——数组、向量和切片
博主未授权任何人或组织机构转载博主任何原创文章,感谢各位对原创的支持! 博主链接 博客内容主要围绕: 5G/6G协议讲解 高级C语言讲解 Rust语言讲解 文章目录 Rust 基础数据类型介绍——数组、向量和切片一、数组、向量和…...
2024年特别报告,「十大生活方式」研究数据报告
“一朵花成轻奢品、一只玩偶掀抢购狂潮、一片荒地变文旅圣地…” 近年爆火的野兽派、Jellycat、阿那亚等诸多品牌,与消费者选择的生活方式息息相关。 今年小红书的内容种草、直播电商,也都依循着“生活方式”的轨迹。生活方式的价值所向,可…...
R中单细胞RNA-seq分析教程 (5)
引言 本系列开启R中单细胞RNA-seq数据分析教程[1],持续更新,欢迎关注,转发! 10. 伪时间细胞排序 如前所述,在 UMAP 嵌入中看到的背侧端脑细胞形成的类似轨迹的结构,很可能代表了背侧端脑兴奋性神经元的分化…...
openpnp - Too many misdetects - retry and verify fiducial/nozzle tip detection
文章目录 openpnp - Too many misdetects - retry and verify fiducial/nozzle tip detection概述笔记环境光最好弱一些在设备标定时,吸嘴上不要装绿色屏蔽片如果吸嘴不在底部相机中间,先检查设置底部相机坐标调整底部相机坐标 吸嘴校验的细节底部相机坐…...
不与最大数相同的数字之和
不与最大数相同的数字之和 C语言代码C 语言代码Java语言代码Python语言代码 💐The Begin💐点点关注,收藏不迷路💐 输出一个整数数列中不与最大数相同的数字之和。 输入 输入分为两行: 第一行为N(N为接下来数的个数&…...
CSS学习记录11
CSS布局 - display属性 display属性是用于控制布局的最终要的CSS属性。display 属性规定是否/如何显示元素。每个HTML元素都有一个默认的display值,具体取决于它的元素类型。大多数元素的默认display值为block 或 inline。 块级元素(block element&…...
D95【python 接口自动化学习】- pytest进阶之fixture用法
day95 pytest的fixture详解(二) 学习日期:20241210 学习目标:pytest基础用法 -- pytest的fixture详解(二) 学习笔记: fixture(autouseTrue) func的autouse是TRUE时,所有函数方法…...
Abaqus断层扫描三维重建插件CT2Model 3D V1.1版本更新
更新说明 Abaqus AbyssFish CT2Model3D V1.1版本更新新增对TIF、TIFF图像文件格式的支持。本插件用户可免费获取升级服务。 插件介绍 插件说明: Abaqus基于CT断层扫描的三维重建插件CT2Model 3D 应用案例: ABAQUS基于CT断层扫描的细观混凝土三维重建…...
隐式对象和泛型
implicit object 作用: case class DatabaseConfig(driver:String,url:String)//作为函数的隐士参数的默认值implicit object MySqlDefault extends DatabaseConfig("mysql","localhost:443")def getConn(implicit config: DatabaseConfig):Uni…...
CSS的颜色表示方式
以下介绍几种常见的CSS颜色表示方式: 颜色名称 html和css规范中定义了147种可用的颜色名用的相对较少 16进制表示 css三原色:红、绿、蓝16进制的颜色值: #rrggbb16进制整数规定颜色成分,所有的值均介于 00 - ff 之间ÿ…...
单链表常见面试题 —— LeetCode
一.删除链表中与val相等的所有节点 1.题目描述 ----- 203. 移除链表元素 - 力扣(LeetCode) 给你一个链表的头节点 head 和一个整数 val ,请你删除链表中所有满足 Node.val val 的节点,并返回 新的头节点 。 列表中的节点数目在范…...
Pydantic中的discriminator:优雅地处理联合类型详解
Pydantic中的discriminator:优雅地处理联合类型详解 引言1. 什么是discriminator?2. 基本使用示例3. discriminator的工作原理4. 更复杂的实际应用场景5. 使用建议6. 潜在陷阱和注意事项结论最佳实践 引言 在Python的类型系统中,有时我们需要…...
pgloader SQLSERVER -> PostgreSQL 配置文件样例
pgloader 是什么?安装和基本用户法可以去其他同道的blog上去看,这里不占用网络空间了。刚开始用官方的文档读起还是很费劲的,所以把常用的配置例子放在这里。 官方文档:https://pgloader.readthedocs.io/en/latest/index.html 迁…...
APP、小程序对接聚合广告平台,有哪些广告变现策略?
开发者对接聚合广告平台,可以让自身流量价值最大化,获得更多的广告曝光机会,对接单一的广告联盟容易造成广告填充不足,收益不稳定的问题。#APP广告变现# APP开发者根据应用的生命周期、用户特征和产品定位,选择最适合…...
Python|GIF 解析与构建(5):手搓截屏和帧率控制
目录 Python|GIF 解析与构建(5):手搓截屏和帧率控制 一、引言 二、技术实现:手搓截屏模块 2.1 核心原理 2.2 代码解析:ScreenshotData类 2.2.1 截图函数:capture_screen 三、技术实现&…...
Objective-C常用命名规范总结
【OC】常用命名规范总结 文章目录 【OC】常用命名规范总结1.类名(Class Name)2.协议名(Protocol Name)3.方法名(Method Name)4.属性名(Property Name)5.局部变量/实例变量(Local / Instance Variables&…...
React Native在HarmonyOS 5.0阅读类应用开发中的实践
一、技术选型背景 随着HarmonyOS 5.0对Web兼容层的增强,React Native作为跨平台框架可通过重新编译ArkTS组件实现85%以上的代码复用率。阅读类应用具有UI复杂度低、数据流清晰的特点。 二、核心实现方案 1. 环境配置 (1)使用React Native…...
Keil 中设置 STM32 Flash 和 RAM 地址详解
文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...
python爬虫:Newspaper3k 的详细使用(好用的新闻网站文章抓取和解析的Python库)
更多内容请见: 爬虫和逆向教程-专栏介绍和目录 文章目录 一、Newspaper3k 概述1.1 Newspaper3k 介绍1.2 主要功能1.3 典型应用场景1.4 安装二、基本用法2.2 提取单篇文章的内容2.2 处理多篇文档三、高级选项3.1 自定义配置3.2 分析文章情感四、实战案例4.1 构建新闻摘要聚合器…...
第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词
Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵,其中每行,每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid,其中有多少个 3 3 的 “幻方” 子矩阵&am…...
Redis的发布订阅模式与专业的 MQ(如 Kafka, RabbitMQ)相比,优缺点是什么?适用于哪些场景?
Redis 的发布订阅(Pub/Sub)模式与专业的 MQ(Message Queue)如 Kafka、RabbitMQ 进行比较,核心的权衡点在于:简单与速度 vs. 可靠与功能。 下面我们详细展开对比。 Redis Pub/Sub 的核心特点 它是一个发后…...
JAVA后端开发——多租户
数据隔离是多租户系统中的核心概念,确保一个租户(在这个系统中可能是一个公司或一个独立的客户)的数据对其他租户是不可见的。在 RuoYi 框架(您当前项目所使用的基础框架)中,这通常是通过在数据表中增加一个…...
Webpack性能优化:构建速度与体积优化策略
一、构建速度优化 1、升级Webpack和Node.js 优化效果:Webpack 4比Webpack 3构建时间降低60%-98%。原因: V8引擎优化(for of替代forEach、Map/Set替代Object)。默认使用更快的md4哈希算法。AST直接从Loa…...
Proxmox Mail Gateway安装指南:从零开始配置高效邮件过滤系统
💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:「storms…...
