Flink—— Data Source 介绍
Data Source 简介
Flink 做为一款流式计算框架,它可用来做批处理,即处理静态的数据集、历史的数据集;也可以用来做流处理,即实时的处理些实时数据流,实时的产生数据流结果,只要数据源源不断的过来,Flink 就能够一直计算下去,这个 Data Sources 就是数据的来源地。
Flink 中你可以使用 StreamExecutionEnvironment.addSource(sourceFunction) 来为你的程序添加数据来源。
Flink 已经提供了若干实现好了的 source functions,当然你也可以通过实现 SourceFunction 来自定义非并行的 source 或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source。

Flink Data Source分类
Flink的数据源可以根据数据的来源和特性进行分类。以下是常见的Flink数据源分类:
集合数据源
集合数据源(Collection Data Source):集合数据源指的是将本地的集合或数组作为输入数据的数据源。在Flink中,可以使用fromCollection、fromElements等方法将Java或Scala中的集合数据转化为数据流进行处理。
1、fromCollection(Collection) - 从 Java 的 Java.util.Collection 创建数据流。集合中的所有元素类型必须相同。
2、fromCollection(Iterator, Class) - 从一个迭代器中创建数据流。Class 指定了该迭代器返回元素的类型。
3、fromElements(T …) - 从给定的对象序列中创建数据流。所有对象类型必须相同。
4、fromParallelCollection(SplittableIterator, Class) - 从一个迭代器中创建并行数据流。Class 指定了该迭代器返回元素的类型。
5、generateSequence(from, to) - 创建一个生成指定区间范围内的数字序列的并行数据流。
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;
import java.util.Arrays;
import java.util.List;public class CollectionDataSourceExample {public static void main(String[] args) throws Exception {final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 创建一个包含整数的集合List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);// 将集合转化为Flink的DataSetDataSet<Integer> dataset = env.fromCollection(data);// 打印数据集中的元素dataset.print();}
}
关于使用集合数据源的注意事项:
-
数据规模:集合数据源适用于小规模数据集。确保你的数据集在内存中能够合理存放,不至于导致内存溢出。
-
内存消耗:集合数据源会将所有数据存储在内存中,因此需要谨慎处理大型数据集,避免对内存资源造成过大压力。
-
并行度设置:在集群环境下,可以通过设置并行度来充分利用集群资源,提高作业的执行效率。
-
调试和测试:集合数据源非常适合用于本地调试和测试,可以快速验证处理逻辑并观察输出结果。
使用集合数据源时需要注意这些方面,以确保作业能够稳定运行并获得良好的性能表现。
文件数据源
文件数据源(File Data Source):文件数据源用于从文件系统中读取数据,可以是本地文件系统或分布式文件系统(如HDFS)。Flink提供了readTextFile、readCsvFile等方法来支持常见文件格式的数据读取。
1、readTextFile(path) - 读取文本文件,即符合 TextInputFormat 规范的文件,并将其作为字符串返回。
2、readFile(fileInputFormat, path) - 根据指定的文件输入格式读取文件(一次)。
3、readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - 这是上面两个方法内部调用的方法。它根据给定的 fileInputFormat 和读取路径读取文件。根据提供的 watchType,这个 source 可以定期(每隔 interval 毫秒)监测给定路径的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次路径对应文件的数据并退出(FileProcessingMode.PROCESS_ONCE)。你可以通过 pathFilter 进一步排除掉需要处理的文件。
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.DataSet;public class FileDataSourceExample {public static void main(String[] args) throws Exception {final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 从文件创建数据集String filePath = "path/to/your/file.txt";DataSet<String> text = env.readTextFile(filePath);// 打印文件中的内容text.print();}
}
关于使用文件数据源的注意事项:
-
文件路径:确保提供的文件路径是正确的,可以是本地文件系统路径,也可以是HDFS路径或其他支持的文件系统路径。
-
文件格式:Flink支持多种文件格式,包括文本文件、CSV文件、Parquet文件等。根据实际情况选择合适的文件格式进行读取。
-
并行度设置:在集群环境下,可以通过设置并行度来充分利用集群资源,提高文件读取的并行处理能力。
-
文件分区:对于大型文件,可以考虑文件分区和并行读取,以加速数据的加载和处理过程。
-
文件读取性能:尽量避免频繁的小文件读取操作,因为这会增加文件系统的负担并降低整体性能。
使用文件数据源时需要注意以上方面,以确保能够有效地读取文件数据,并且提高作业的执行效率。
Socket数据源
Socket数据源(Socket Data Source):Socket数据源允许通过网络套接字接收数据,通常用于测试和演示目的。Flink可以使用socketTextStream方法从TCP socket接收数据流。
socketTextStream(String hostname, int port) - 从 socket 读取。元素可以用分隔符切分。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SocketDataSourceExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从socket创建数据流String hostname = "localhost";int port = 9999;env.socketTextStream(hostname, port).print();// 执行作业env.execute("Socket Data Source Example");}
}
关于使用Socket数据源的注意事项:
-
主机和端口:确保指定的主机和端口是正确的,并且能够与数据源通信。
-
网络延迟:由于Socket数据源涉及网络通信,因此可能受到网络延迟的影响。需要考虑网络性能对作业整体性能的影响。
-
并行度设置:可以通过设置并行度来充分利用集群资源,提高数据流处理的并行能力。
-
数据格式:需要确保从Socket接收到的数据能够被正确解析和处理,例如按行读取文本数据等。
-
容错机制:在使用Socket数据源时,需要考虑作业的容错机制,以确保在发生故障或数据丢失时能够正确处理和恢复。
使用Socket数据源时需要注意以上方面,以确保能够有效地接收数据并提高作业的执行效率。
自定义数据源
自定义数据源(Custom Data Source):除了上述内置的数据源外,Flink还支持自定义数据源。用户可以实现自己的SourceFunction接口来定义特定的数据生成逻辑,例如从消息队列、数据库、传感器等实时数据源中读取数据。
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;public class CustomDataSource extends RichParallelSourceFunction<String> {private boolean running = true;@Overridepublic void run(SourceContext<String> ctx) throws Exception {while (running) {// 生成数据String data = generateData();// 发射数据ctx.collect(data);// 控制数据生成频率Thread.sleep(1000);}}@Overridepublic void cancel() {running = false;}private String generateData() {// 实现自定义的数据生成逻辑return "some data";}
}
在这个示例中,我们创建了一个名为CustomDataSource的类,它继承自RichParallelSourceFunction并指定了数据类型为String。在run方法中,我们使用一个循环来生成数据并通过collect方法将数据发射出去。在cancel方法中,我们设置了一个标志位来控制数据源的运行状态。
关于使用自定义数据源的注意事项:
-
并行度设置:根据数据源的性质和数据量合理地设置并行度,以充分利用集群资源。
-
数据生成频率:确保数据生成的频率和速度能够适应作业的处理能力,避免数据源产生过快导致作业无法及时处理。
-
容错机制:在自定义数据源中,需要考虑作业的容错机制,例如在发生故障时如何正确处理和恢复。
-
数据格式:确保从自定义数据源产生的数据能够被正确解析和处理,符合作业的输入要求。
-
资源管理:需要确保自定义数据源的资源占用和生命周期管理,避免资源泄露或过度占用资源。
使用自定义数据源时需要考虑以上方面,并确保能够有效地产生数据并提高作业的执行效率。
Apache Kafka数据源
Apache Kafka数据源(Kafka Data Source):作为流数据处理框架,Flink对Kafka提供了良好的集成支持。可以使用addSource方法结合Flink的Kafka Connector来从Kafka主题中读取数据。
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;public class KafkaDataSourceExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// Kafka配置Properties properties = new Properties();properties.setProperty("bootstrap.servers", "localhost:9092");properties.setProperty("group.id", "flink-consumer-group");// 创建Kafka数据流FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), properties);DataStream<String> kafkaDataStream = env.addSource(kafkaConsumer);kafkaDataStream.print();// 执行作业env.execute("Kafka Data Source Example");}
}
在这个示例中,我们首先创建了一个StreamExecutionEnvironment对象,然后设置Kafka的连接配置,包括bootstrap servers和consumer group id等。接下来,我们创建了一个FlinkKafkaConsumer对象,指定了要消费的topic以及数据的序列化方式,并将其添加到流处理环境中。最后,我们通过调用print方法来打印数据流中的内容,并通过execute方法启动作业并执行。
关于使用Kafka数据源的注意事项:
-
Kafka配置:确保指定的Kafka配置正确,并能够与Kafka集群进行通信。
-
序列化方式:根据实际情况选择合适的数据序列化方式,例如SimpleStringSchema、JSON、Avro等。
-
并行度设置:可以通过设置并行度来充分利用集群资源,提高数据流处理的并行能力。
-
数据消费策略:需要考虑消费数据的策略,如是否从最新/最旧的数据开始消费,以及如何处理消费过程中的偏移量。
-
容错机制:在使用Kafka数据源时,需要考虑作业的容错机制,以确保在发生故障或数据丢失时能够正确处理和恢复。
使用Kafka数据源时需要注意以上方面,以确保能够有效地消费Kafka中的数据并提高作业的执行效率。
Apache Pulsar数据源
Apache Pulsar数据源(Pulsar Data Source):类似于Kafka,Flink也集成了对Pulsar的支持,可以直接从Pulsar主题中读取数据。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.PulsarClientException;public class PulsarDataSourceExample {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String serviceUrl = "pulsar://localhost:6650";String topic = "my-topic";FlinkPulsarSource<String> pulsarSource = new FlinkPulsarSource<>(serviceUrl,topic,Schema.STRING);DataStream<String> pulsarDataStream = env.addSource(pulsarSource);pulsarDataStream.print();env.execute("Pulsar Data Source Example");}
}
在这个示例中,我们首先创建了一个StreamExecutionEnvironment对象,然后指定了Pulsar的连接信息和要消费的topic。接下来,我们创建了一个FlinkPulsarSource对象,并指定了Pulsar的serviceUrl、topic以及数据的Schema,并将其添加到流处理环境中。最后,我们通过调用print方法来打印数据流中的内容,并通过execute方法启动作业并执行。
关于使用Pulsar数据源的注意事项:
-
Pulsar连接配置:确保指定的Pulsar连接信息正确,并能够与Pulsar集群进行通信。
-
Schema设置:根据实际情况选择合适的数据Schema,例如STRING、JSON、AVRO等。
-
并行度设置:可以通过设置并行度来充分利用集群资源,提高数据流处理的并行能力。
-
数据消费策略:需要考虑消费数据的策略,如是否从最新/最旧的数据开始消费,以及如何处理消费过程中的偏移量。
-
容错机制:在使用Pulsar数据源时,需要考虑作业的容错机制,以确保在发生故障或数据丢失时能够正确处理和恢复。
使用Pulsar数据源时需要注意以上方面,以确保能够有效地消费Pulsar中的数据并提高作业的执行效率。
这些不同类型的数据源为Flink应用程序提供了灵活的数据接入方式,使得Flink可以轻松地处理不同来源和格式的数据。根据具体的业务需求和场景特点,可以选择合适的数据源类型来构建流处理和批处理应用程序。
更多消息资讯,请访问昂焱数据(https://www.ayshuju.com)
相关文章:
Flink—— Data Source 介绍
Data Source 简介 Flink 做为一款流式计算框架,它可用来做批处理,即处理静态的数据集、历史的数据集;也可以用来做流处理,即实时的处理些实时数据流,实时的产生数据流结果,只要数据源源不断的过来ÿ…...
树之二叉排序树(二叉搜索树)
什么是排序树 说一下普通二叉树可不是左小右大的 插入的新节点是以叶子形式进行插入的 二叉排序树的中序遍历结果是一个升序的序列 下面是两个典型的二叉排序树 二叉排序树的操作 构造树的过程即是对无序序列进行排序的过程。 存储结构 通常采用二叉链表作为存储结构 不能 …...
管易云与电商平台的无代码集成:实现API连接与用户运营
管易云简介及其与电商平台的合作 金蝶管易云是金蝶集团旗下以电商为核心业务的子公司,是国内最早的电商ERP服务商之一,总部在上海,与淘宝、天猫、 京东、拼多多、抖音等300多家主流电商平台建立合作关系,同时管易云是互联网平台首…...
ElementUI的el-upload上传组件与表单一起提交遇到的各种问题以及解决办法(超详细,每个步骤都有详细解读)
背景: 使用ruoyi-vue进行2次开发,需要实现表单与文件上传一起提交,并且文件上传有4个,且文件校验很复杂,因此ruoyi-vue集成的上传组件FileUpload调试几天后发现真不太适用,最终选择element UI原生组件el-upload(FileUpload也是基于el-upload实现的),要实现表单与文件同…...
python flask_restful “message“: “Failed to decode JSON object: None“
1、问题表现 "message": "Failed to decode JSON object: None"2、出现的原因 Werkzeug 版本过高 3、解决方案 pip install Werkzeug2.0解决效果 可以正常显示json数据了 {"message": {"rate": "参数错误"} }...
Linux内核有什么之内存管理子系统有什么第六回 —— 小内存分配(4)
接前一篇文章:Linux内核有什么之内存管理子系统有什么第五回 —— 小内存分配(3) 本文内容参考: linux进程虚拟地址空间 《趣谈Linux操作系统 核心原理篇:第四部分 内存管理—— 刘超》 特此致谢! 二、小…...
【OpenHarmony内核】Harmony内核之线程操作函数(二)
文章目录 前言一、获取线程优先级二、转交控制运行权三、挂起线程3.1 线程的挂起是什么意思?3.2 函数介绍四、恢复线程五、分离指定的线程5.1 分离线程是什么意思5.2 函数介绍六、等待线程终止运行七、终止当前线程的运行八、终止指定线程的运行九、获取活跃线程数总结前言 O…...
二十五、W5100S/W5500+RP2040树莓派Pico<Modebus TCP Server示例>
文章目录 1 前言2 简介2 .1 什么是Modbus TCP?2.2 Modbus TCP指令介绍2.3 请求数据过程2.4 Modbus TCP协议优点2.5 Modbus TCP应用场景 3 WIZnet以太网芯片4 Modbus TCP示例概述以及使用4.1 流程图4.2 准备工作核心4.3 连接方式4.4 主要代码概述4.5 结果演示 5 注意…...
Android画个圆点状态灯
1、创建一个 XML 文件在 res/drawable 目录下(默认为黑色) <?xml version"1.0" encoding"utf-8"?> <shape xmlns:android"http://schemas.android.com/apk/res/android"android:shape"oval"><…...
高性能网络编程 - 解读3种线程模型
文章目录 Pre线程模型1:传统阻塞 I/O 服务模型线程模型2:Reactor 模式Reactor 模式的基本设计思想Reactor 模式中的关键组成3种典型实现单 Reactor 单线程单 Reactor 多线程主从 Reactor 多线程 小结 线程模型3:Proactor 模型 Pre 高性能网络…...
MATLAB中deconvwnr函数用法
目录 语法 说明 示例 使用 Wiener 滤波对图像进行去模糊处理 deconvwnr函数的功能是使用 Wiener 滤波对图像进行去模糊处理。 语法 J deconvwnr(I,psf,nsr) J deconvwnr(I,psf,ncorr,icorr) J deconvwnr(I,psf) 说明 J deconvwnr(I,psf,nsr) 使用 Wiener 滤波算法对…...
赛宁网安入选国家工业信息安全漏洞库(CICSVD)2023年度技术组成员单
近日,由国家工业信息安全发展研究中心、工业信息安全产业发展联盟主办的“2023工业信息安全大会”在北京成功举行。 会上,国家工业信息安全发展研究中心对为国家工业信息安全漏洞库(CICSVD)提供技术支持的单位授牌表彰。北京赛宁…...
Git系列之Git集成开发工具及git扩展使用
🎉🎉欢迎来到我的CSDN主页!🎉🎉 🏅我是君易--鑨,一个在CSDN分享笔记的博主。📚📚 🌟推荐给大家我的博客专栏《Git实战开发》。🎯🎯 &a…...
selenium headless 无头模式慢
selenium设置headlessTrue发现非常慢,headlessFalse要快很多。 最后测试发现升级到selenium最新版本,selenium4.15.2。设置--headlessnew,解决了,速度正常了。 新版selenium有了两种headless模式,参见:He…...
快速修复因相机断电导致视频文件打不开的问题
3-5 本文主要解决因相机突然断电导致拍摄的视频文件打不开的问题。 在日常工作中,有时候需要使用相机拍摄视频,比如现在有不少短视频拍摄的需求,如果因电池突然断电的原因,导致拍出来的视频播放不了,这时候就容易出大…...
Ceph 笔记, ssh写入缓存
硬件建议 — Ceph 文档 写入缓存 企业级 SSD 和 HDD 通常包括断电保护功能,包括 在运行时断电时确保数据耐久性,以及 使用多级缓存来加快直接或同步写入速度。这些设备 可以在两种缓存模式之间切换 -- 刷新到的易失性缓存 具有 fsync 的持久性媒体&a…...
WebSocket魔法师:打造实时应用的无限可能
1、背景 在开发一些前端页面的时候,总是能接收到这样的需求:如何保持页面并实现自动更新数据呢?以往的常规做法,是前端使用定时轮询后端接口,获取响应后重新渲染前端页面,这种做法虽然能达到类似的效果&…...
网络运维Day06-补充
文章目录 RAID磁盘阵列RAID0条带模式RAID1镜像模式RAID5高性价比模式RAID01RAID10 逻辑卷一块磁盘的使用流程逻辑卷的使用流程 制作逻辑卷步骤一:添加硬盘步骤二:分区规划步骤三:制作物理卷步骤四:制作卷组步骤五:制作…...
openssl+SM2开发实例一(含源码)
一、SM2算法介绍 SM2(国密算法2) 是中国国家密码管理局(CNCA)颁布的椭圆曲线密码算法标准,属于非对称加密算法。它基于椭圆曲线离散对数问题,提供了安全可靠的数字签名、密钥交换和公钥加密等功能。SM2被设…...
操作系统 | 编写内核
🌈个人主页:Sarapines Programmer🔥 系列专栏:《操作系统实验室》🔖少年有梦不应止于心动,更要付诸行动。 目录结构 1. 操作系统实验之编写内核 1.1 实验目的 1.2 实验内容 1.3 实验步骤 1.4 实验过程 …...
SCAU期末笔记 - 数据分析与数据挖掘题库解析
这门怎么题库答案不全啊日 来简单学一下子来 一、选择题(可多选) 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘:专注于发现数据中…...
理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端
🌟 什么是 MCP? 模型控制协议 (MCP) 是一种创新的协议,旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议,它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...
【2025年】解决Burpsuite抓不到https包的问题
环境:windows11 burpsuite:2025.5 在抓取https网站时,burpsuite抓取不到https数据包,只显示: 解决该问题只需如下三个步骤: 1、浏览器中访问 http://burp 2、下载 CA certificate 证书 3、在设置--隐私与安全--…...
微服务商城-商品微服务
数据表 CREATE TABLE product (id bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 商品id,cateid smallint(6) UNSIGNED NOT NULL DEFAULT 0 COMMENT 类别Id,name varchar(100) NOT NULL DEFAULT COMMENT 商品名称,subtitle varchar(200) NOT NULL DEFAULT COMMENT 商…...
三体问题详解
从物理学角度,三体问题之所以不稳定,是因为三个天体在万有引力作用下相互作用,形成一个非线性耦合系统。我们可以从牛顿经典力学出发,列出具体的运动方程,并说明为何这个系统本质上是混沌的,无法得到一般解…...
Hive 存储格式深度解析:从 TextFile 到 ORC,如何选对数据存储方案?
在大数据处理领域,Hive 作为 Hadoop 生态中重要的数据仓库工具,其存储格式的选择直接影响数据存储成本、查询效率和计算资源消耗。面对 TextFile、SequenceFile、Parquet、RCFile、ORC 等多种存储格式,很多开发者常常陷入选择困境。本文将从底…...
JVM虚拟机:内存结构、垃圾回收、性能优化
1、JVM虚拟机的简介 Java 虚拟机(Java Virtual Machine 简称:JVM)是运行所有 Java 程序的抽象计算机,是 Java 语言的运行环境,实现了 Java 程序的跨平台特性。JVM 屏蔽了与具体操作系统平台相关的信息,使得 Java 程序只需生成在 JVM 上运行的目标代码(字节码),就可以…...
安宝特案例丨Vuzix AR智能眼镜集成专业软件,助力卢森堡医院药房转型,赢得辉瑞创新奖
在Vuzix M400 AR智能眼镜的助力下,卢森堡罗伯特舒曼医院(the Robert Schuman Hospitals, HRS)凭借在无菌制剂生产流程中引入增强现实技术(AR)创新项目,荣获了2024年6月7日由卢森堡医院药剂师协会࿰…...
NPOI Excel用OLE对象的形式插入文件附件以及插入图片
static void Main(string[] args) {XlsWithObjData();Console.WriteLine("输出完成"); }static void XlsWithObjData() {// 创建工作簿和单元格,只有HSSFWorkbook,XSSFWorkbook不可以HSSFWorkbook workbook new HSSFWorkbook();HSSFSheet sheet (HSSFSheet)workboo…...
DeepSeek源码深度解析 × 华为仓颉语言编程精粹——从MoE架构到全场景开发生态
前言 在人工智能技术飞速发展的今天,深度学习与大模型技术已成为推动行业变革的核心驱动力,而高效、灵活的开发工具与编程语言则为技术创新提供了重要支撑。本书以两大前沿技术领域为核心,系统性地呈现了两部深度技术著作的精华:…...
