Flink - souce算子
水善利万物而不争,处众人之所恶,故几于道💦
目录
1. 从Java的集合中读取数据
2. 从本地文件中读取数据
3. 从HDFS中读取数据
4. 从Socket中读取数据
5. 从Kafka中读取数据
6. 自定义Source
官方文档 - Flink1.13
1. 从Java的集合中读取数据
fromCollection(waterSensors)
public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);List<WaterSensor> waterSensors = Arrays.asList(new WaterSensor("ws_001", 1577844001L, 45),new WaterSensor("ws_002", 1577844015L, 43),new WaterSensor("ws_003", 1577844020L, 42));env.fromCollection(waterSensors).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}
运行结果:
2. 从本地文件中读取数据
readTextFile(“input/words.txt”)
,支持相对路径和绝对路径
public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.readTextFile("input/words.txt").print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
运行结果:
3. 从HDFS中读取数据
readTextFile(“hdfs://hadoop101:8020/flink/data/words.txt”)
要先在pom文件中添加hadoop-client依赖:
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version>
</dependency>
public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.readTextFile("hdfs://hadoop101:8020/flink/data/words.txt").print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}
运行结果:
4. 从Socket中读取数据
socketTextStream(“hadoop101”,9999)
,这个输入源不支持多个并行度。
public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);//从端口中读数据, windows中 nc -lp 9999 Linux nc -lk 9999env.socketTextStream("hadoop101",9999).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}
运行结果:
5. 从Kafka中读取数据
addSource(new FlinkKafkaConsumer<>(“flink_source_kafka”,new SimpleStringSchema(),properties))
第一个参数是topic,
第二个参数是序列化器,序列化器就是在Kafka和flink之间转换数据 - 官方注释:The de-/serializer used to convert between Kafka’s byte messages and Flink’s objects.(反-序列化程序用于在Kafka的字节消息和Flink的对象之间进行转换。)
第三个参数是Kafka的配置。
public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);Properties properties = new Properties();// 设置集群地址properties.setProperty("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");// 设置所属消费者组properties.setProperty("group.id", "flink_consumer_group");env.addSource(new FlinkKafkaConsumer<>("flink_source_kafka",new SimpleStringSchema(),properties)).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}
运行结果:
6. 自定义Source
addSource(new XXXX())
大多数情况下,前面的数据源已经能够满足需要,但是难免会存在特殊情况的场合,所以flink也提供了能自定义数据源的方式.
public class Flink06_myDefDataSource {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.addSource(new RandomWatersensor()).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}
自定义数据源需要定义一个类,然后实现SourceFunction
接口,然后实现其中的两个方法,run
和cancel
,run方法包含具体读数据的逻辑,当调用cancel方法的时候应该可以让run方法中的读数据逻辑停止
public class RandomWatersensor implements SourceFunction<WaterSensor> {private Boolean running = true;@Overridepublic void run(SourceContext<WaterSensor> sourceContext) throws Exception {Random random = new Random();while (running){sourceContext.collect(new WaterSensor("sensor" + random.nextInt(50),Calendar.getInstance().getTimeInMillis(),random.nextInt(100)));Thread.sleep(1000);}}/*** 大多数的source在run方法内部都会有一个while循环,* 当调用这个方法的时候, 应该可以让run方法中的while循环结束*/@Overridepublic void cancel() {running = false;}}
运行结果:
demo2 - 自定义从socket中读取数据
public class Flink04_Source_Custom {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new MySource("hadoop102", 9999)).print();env.execute();}public static class MySource implements SourceFunction<WaterSensor> {private String host;private int port;private volatile boolean isRunning = true;private Socket socket;public MySource(String host, int port) {this.host = host;this.port = port;}@Overridepublic void run(SourceContext<WaterSensor> ctx) throws Exception {// 实现一个从socket读取数据的sourcesocket = new Socket(host, port);BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));String line = null;while (isRunning && (line = reader.readLine()) != null) {String[] split = line.split(",");ctx.collect(new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])));}}/*** 大多数的source在run方法内部都会有一个while循环,* 当调用这个方法的时候, 应该可以让run方法中的while循环结束*/@Overridepublic void cancel() {isRunning = false;try {socket.close();} catch (IOException e) {e.printStackTrace();}}}
}
/*
sensor_1,1607527992000,20
sensor_1,1607527993000,40
sensor_1,1607527994000,50*/
相关文章:

Flink - souce算子
水善利万物而不争,处众人之所恶,故几于道💦 目录 1. 从Java的集合中读取数据 2. 从本地文件中读取数据 3. 从HDFS中读取数据 4. 从Socket中读取数据 5. 从Kafka中读取数据 6. 自定义Source 官方文档 - Flink1.13 1. 从Java的集合中读取数据 …...

使用vue creat搭建项目
一、查看是否安装node和npm(显示版本号说明安装成功) node -v npm -v 显示版本号说明安装成功,如果没有安装,则需要先安装。 二、安装vue-cli脚手架 查看安装的版本(显示版本号说明安装成功) vue -V 三…...
面试题 -- 基础知识
文章目录 1. 深拷贝 和 浅拷贝的区别2. 懒加载模式3. frame和bounds有什么不同?4. What is push notification?推送实现 5. 什么是序列化?6. 什么是安全释放7. 响应者链8. 简述沙盒机制 1. 深拷贝 和 浅拷贝的区别 浅拷贝是指针拷贝…...

Zabbix分布式监控快速入门
目录 1 Zabbix简介1.1 软件架构1.2 版本选择1.3 功能特性 2 安装与部署2.1 时间同步需求2.2 下载仓库官方源2.3 Zabbix-Server服务端的安装2.3.1 安装MySQL2.3.1.1 创建Zabbix数据库2.3.1.2 导入Zabbix库的数据文件 2.3.2 配置zabbix_server.conf2.3.3 开启Zabbix-Server服务2.…...

基于Spring包扫描工具和MybatisPlus逆向工程组件的数据表自动同步机制
公司产品产出的项目较多。同步数据库表结构工作很麻烦。一个alter语句要跑到N个客户机上执行脚本。超级费时麻烦。介于此,原有方案是把增量脚本放到一resource包下,项目启动时执行逐行执行一次。但由于模块开发人员较多,总有那么一两个机灵鬼…...

leetcode 面试题 0106.字符串压缩
⭐️ 题目描述 🌟 leetcode链接:面试题 0106.字符串压缩 思路: 开辟一个新的空间(空间要大一点,因为可能压缩后的字符串比原字符串大),然后遍历原字符串统计当前字符的个数,再写入到…...

三、Spring源码-实例化
Spring源码-Bean的实例化 接下来我们看看Bean的实例化处理 一、BeanDefinition 首先我们来看看BeanDefinition的存放位置。因为Bean对象的实例化肯定是BeanFactory基于对应的BeanDefinition的定义来实现的,所以在这个过程中BeanDefinition是非常重要的,…...
算法的法律框架:预测未来的关键趋势
随着科技的飞速发展,算法和人工智能(AI)已成为我们社会生活的重要组成部分。然而,它们也带来了许多新的法律和道德挑战,这使得算法的法律框架变得日益重要。在这个背景下,预测未来算法法律框架的关键趋势成…...

Ubuntu Server版 之 共享文件 samba和NFS 两种方法
NFS 和 Samba NFS : linux之间资源共享 Samba: 是windows系统与Linux系统之间资源共享的 samba 安装samba 工具 sudo apt install samba 创建共享目录 sudo mkdir /home/shared sudo chmod 777 /home/shared 配置sambd sudo vim /etc/samba/smb.con…...

实时协作:团队效率倍增的关键
实时协作是指团队在当前时刻共同完成项目的能力。无论是否使用技术,都能实现这一点。然而,随着远程工作的盛行,安全的协作工具被用来让团队成员在项目和一般业务之间保持联系和同步。 传统协作与实时协作的区别 两种类型的协作最明显的区别…...

电脑选睡眠、休眠还是关机?
关机 这是大家最熟悉的。关机时,系统首先关闭所有运行中的程序,然后关闭系统后台服务。随后,系统向主板请求关机,主板断开电源的供电使能,让电源切断对绝大多数设备的供电(只剩一些内部零件仍会维持电源供应…...

算法通关村第三关——不简单的数组增删改查
线性表基础 线性表概念 线性表就是具有相同特征数据元素的一个有限序列,其中包含元素的个数称为线性表的长度 线性表类型 从不同的角度看,线性表有不同的分类 语言实现角度 顺序表有两种实现方式 一体式 分离式 一体式结构 一体式:存储信息…...

【Linux】动静态库
目录 写在前面的话 如何编写静态库库 编写静态库 ar命令 Makefile自动化形成静态库 如何使用编写的静态库 1.拷贝到系统路径中 2.指定路径搜索 如何编写动态库 编写动态库 完善Makefile 如何使用编写的动态库 指定路径搜索(不可行及原因) 环境变量LD_LIBRARY_PAT…...
《kubernetes权威指南》-第一章学习笔记
1.什么是kubernetes? kubernetes是一个全新的基于容器技术的分布式架构领先方案。 2.为什么要用kubernetes? 使用kubernetes提供的解决方案能够减少30%的开发成本,并且能够将开发人员的精力更加集中于业务本身,同时可以降低系统…...

ubuntu 18.04 磁盘太满无法进入系统
安装了一个压缩包,装了一半提示磁盘空间少导致安装失败。我也没在意,退出虚拟机打算扩展硬盘。等我在虚拟机设置中完成扩展操作,准备进入虚拟机内部进行操作时,发现登录不进去了 shift 登入GUN GRUB设置项的问题 网上都是在开机…...
基于LNMP配置WordPress建站时出现的问题汇总
目录 wordpress上传文件报错问题描述原因分析:解决方案: wordpress裁剪图片报错问题描述原因分析:解决方案: 配置固定链接和伪链接 wordpress上传文件报错 WP内部错误,在上传文件时发生了错误,显示权限不足…...

【Spring Cloud】Gateway的配置与使用
文章目录 前言第一步,创建一个springboot工程第二步,添加依赖第三步,编写yml文件第四步,启动主启动类总结 前言 Gateway其实是springcloud 原生的东西,但是我还是想放在这里讲,因为我们使用nacos时&#x…...

概念、框架简介--ruoyi学习(一)
开始进行ruoyi框架的学习,比起其他的前后端不分离的,这个起码看的清晰一些吧。 这一节主要是看了ruoyi的官方文档后,记录了以下不懂的概念,并且整理了ruoyi框架中的相关内容。 一些概念 前端 store store是状态管理库&#x…...

IDEA的基础使用——【初识IDEA】
IDEA的基础使用——【初识IDEA】 文章目录 IDEA简介前言官网 IDEA的下载与安装选择下载路径勾选自己需要的其余按默认选项进行即可 目录简介安装目录简介 运行Hello WorldIDEA快捷键常用模板模板一:psvm(main)模板二:模板三&#…...

LeetCode刷题总结-动态规划篇
LeetCode刷题总结-动态规划篇 本文总结LeetCode上有动态规划的算法题,推荐刷题总数为54道。具体考点分析如下图: 1.中心扩展法 题号:132. 分割回文串 II,难度困难 2.背包问题 题号:140. 单词拆分 II,难…...

UDP(Echoserver)
网络命令 Ping 命令 检测网络是否连通 使用方法: ping -c 次数 网址ping -c 3 www.baidu.comnetstat 命令 netstat 是一个用来查看网络状态的重要工具. 语法:netstat [选项] 功能:查看网络状态 常用选项: n 拒绝显示别名&#…...

剑指offer20_链表中环的入口节点
链表中环的入口节点 给定一个链表,若其中包含环,则输出环的入口节点。 若其中不包含环,则输出null。 数据范围 节点 val 值取值范围 [ 1 , 1000 ] [1,1000] [1,1000]。 节点 val 值各不相同。 链表长度 [ 0 , 500 ] [0,500] [0,500]。 …...

ESP32 I2S音频总线学习笔记(四): INMP441采集音频并实时播放
简介 前面两期文章我们介绍了I2S的读取和写入,一个是通过INMP441麦克风模块采集音频,一个是通过PCM5102A模块播放音频,那如果我们将两者结合起来,将麦克风采集到的音频通过PCM5102A播放,是不是就可以做一个扩音器了呢…...
python如何将word的doc另存为docx
将 DOCX 文件另存为 DOCX 格式(Python 实现) 在 Python 中,你可以使用 python-docx 库来操作 Word 文档。不过需要注意的是,.doc 是旧的 Word 格式,而 .docx 是新的基于 XML 的格式。python-docx 只能处理 .docx 格式…...

新能源汽车智慧充电桩管理方案:新能源充电桩散热问题及消防安全监管方案
随着新能源汽车的快速普及,充电桩作为核心配套设施,其安全性与可靠性备受关注。然而,在高温、高负荷运行环境下,充电桩的散热问题与消防安全隐患日益凸显,成为制约行业发展的关键瓶颈。 如何通过智慧化管理手段优化散…...
Spring Boot面试题精选汇总
🤟致敬读者 🟩感谢阅读🟦笑口常开🟪生日快乐⬛早点睡觉 📘博主相关 🟧博主信息🟨博客首页🟫专栏推荐🟥活动信息 文章目录 Spring Boot面试题精选汇总⚙️ **一、核心概…...
Spring AI与Spring Modulith核心技术解析
Spring AI核心架构解析 Spring AI(https://spring.io/projects/spring-ai)作为Spring生态中的AI集成框架,其核心设计理念是通过模块化架构降低AI应用的开发复杂度。与Python生态中的LangChain/LlamaIndex等工具类似,但特别为多语…...

回溯算法学习
一、电话号码的字母组合 import java.util.ArrayList; import java.util.List;import javax.management.loading.PrivateClassLoader;public class letterCombinations {private static final String[] KEYPAD {"", //0"", //1"abc", //2"…...
【无标题】路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论
路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论 一、传统路径模型的根本缺陷 在经典正方形路径问题中(图1): mermaid graph LR A((A)) --- B((B)) B --- C((C)) C --- D((D)) D --- A A -.- C[无直接路径] B -…...
MFE(微前端) Module Federation:Webpack.config.js文件中每个属性的含义解释
以Module Federation 插件详为例,Webpack.config.js它可能的配置和含义如下: 前言 Module Federation 的Webpack.config.js核心配置包括: name filename(定义应用标识) remotes(引用远程模块࿰…...