Flink - souce算子
水善利万物而不争,处众人之所恶,故几于道💦
目录
1. 从Java的集合中读取数据
2. 从本地文件中读取数据
3. 从HDFS中读取数据
4. 从Socket中读取数据
5. 从Kafka中读取数据
6. 自定义Source
官方文档 - Flink1.13

1. 从Java的集合中读取数据
fromCollection(waterSensors)
public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);List<WaterSensor> waterSensors = Arrays.asList(new WaterSensor("ws_001", 1577844001L, 45),new WaterSensor("ws_002", 1577844015L, 43),new WaterSensor("ws_003", 1577844020L, 42));env.fromCollection(waterSensors).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}
运行结果:

2. 从本地文件中读取数据
readTextFile(“input/words.txt”),支持相对路径和绝对路径
public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.readTextFile("input/words.txt").print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
运行结果:

3. 从HDFS中读取数据
readTextFile(“hdfs://hadoop101:8020/flink/data/words.txt”)
要先在pom文件中添加hadoop-client依赖:
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version>
</dependency>
public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.readTextFile("hdfs://hadoop101:8020/flink/data/words.txt").print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}
运行结果:

4. 从Socket中读取数据
socketTextStream(“hadoop101”,9999),这个输入源不支持多个并行度。
public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);//从端口中读数据, windows中 nc -lp 9999 Linux nc -lk 9999env.socketTextStream("hadoop101",9999).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}
运行结果:

5. 从Kafka中读取数据
addSource(new FlinkKafkaConsumer<>(“flink_source_kafka”,new SimpleStringSchema(),properties))
第一个参数是topic,
第二个参数是序列化器,序列化器就是在Kafka和flink之间转换数据 - 官方注释:The de-/serializer used to convert between Kafka’s byte messages and Flink’s objects.(反-序列化程序用于在Kafka的字节消息和Flink的对象之间进行转换。)
第三个参数是Kafka的配置。
public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);Properties properties = new Properties();// 设置集群地址properties.setProperty("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");// 设置所属消费者组properties.setProperty("group.id", "flink_consumer_group");env.addSource(new FlinkKafkaConsumer<>("flink_source_kafka",new SimpleStringSchema(),properties)).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}
运行结果:

6. 自定义Source
addSource(new XXXX())
大多数情况下,前面的数据源已经能够满足需要,但是难免会存在特殊情况的场合,所以flink也提供了能自定义数据源的方式.
public class Flink06_myDefDataSource {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.addSource(new RandomWatersensor()).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}
自定义数据源需要定义一个类,然后实现SourceFunction接口,然后实现其中的两个方法,run和cancel,run方法包含具体读数据的逻辑,当调用cancel方法的时候应该可以让run方法中的读数据逻辑停止
public class RandomWatersensor implements SourceFunction<WaterSensor> {private Boolean running = true;@Overridepublic void run(SourceContext<WaterSensor> sourceContext) throws Exception {Random random = new Random();while (running){sourceContext.collect(new WaterSensor("sensor" + random.nextInt(50),Calendar.getInstance().getTimeInMillis(),random.nextInt(100)));Thread.sleep(1000);}}/*** 大多数的source在run方法内部都会有一个while循环,* 当调用这个方法的时候, 应该可以让run方法中的while循环结束*/@Overridepublic void cancel() {running = false;}}
运行结果:

demo2 - 自定义从socket中读取数据
public class Flink04_Source_Custom {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new MySource("hadoop102", 9999)).print();env.execute();}public static class MySource implements SourceFunction<WaterSensor> {private String host;private int port;private volatile boolean isRunning = true;private Socket socket;public MySource(String host, int port) {this.host = host;this.port = port;}@Overridepublic void run(SourceContext<WaterSensor> ctx) throws Exception {// 实现一个从socket读取数据的sourcesocket = new Socket(host, port);BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));String line = null;while (isRunning && (line = reader.readLine()) != null) {String[] split = line.split(",");ctx.collect(new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])));}}/*** 大多数的source在run方法内部都会有一个while循环,* 当调用这个方法的时候, 应该可以让run方法中的while循环结束*/@Overridepublic void cancel() {isRunning = false;try {socket.close();} catch (IOException e) {e.printStackTrace();}}}
}
/*
sensor_1,1607527992000,20
sensor_1,1607527993000,40
sensor_1,1607527994000,50*/
相关文章:
Flink - souce算子
水善利万物而不争,处众人之所恶,故几于道💦 目录 1. 从Java的集合中读取数据 2. 从本地文件中读取数据 3. 从HDFS中读取数据 4. 从Socket中读取数据 5. 从Kafka中读取数据 6. 自定义Source 官方文档 - Flink1.13 1. 从Java的集合中读取数据 …...
使用vue creat搭建项目
一、查看是否安装node和npm(显示版本号说明安装成功) node -v npm -v 显示版本号说明安装成功,如果没有安装,则需要先安装。 二、安装vue-cli脚手架 查看安装的版本(显示版本号说明安装成功) vue -V 三…...
面试题 -- 基础知识
文章目录 1. 深拷贝 和 浅拷贝的区别2. 懒加载模式3. frame和bounds有什么不同?4. What is push notification?推送实现 5. 什么是序列化?6. 什么是安全释放7. 响应者链8. 简述沙盒机制 1. 深拷贝 和 浅拷贝的区别 浅拷贝是指针拷贝…...
Zabbix分布式监控快速入门
目录 1 Zabbix简介1.1 软件架构1.2 版本选择1.3 功能特性 2 安装与部署2.1 时间同步需求2.2 下载仓库官方源2.3 Zabbix-Server服务端的安装2.3.1 安装MySQL2.3.1.1 创建Zabbix数据库2.3.1.2 导入Zabbix库的数据文件 2.3.2 配置zabbix_server.conf2.3.3 开启Zabbix-Server服务2.…...
基于Spring包扫描工具和MybatisPlus逆向工程组件的数据表自动同步机制
公司产品产出的项目较多。同步数据库表结构工作很麻烦。一个alter语句要跑到N个客户机上执行脚本。超级费时麻烦。介于此,原有方案是把增量脚本放到一resource包下,项目启动时执行逐行执行一次。但由于模块开发人员较多,总有那么一两个机灵鬼…...
leetcode 面试题 0106.字符串压缩
⭐️ 题目描述 🌟 leetcode链接:面试题 0106.字符串压缩 思路: 开辟一个新的空间(空间要大一点,因为可能压缩后的字符串比原字符串大),然后遍历原字符串统计当前字符的个数,再写入到…...
三、Spring源码-实例化
Spring源码-Bean的实例化 接下来我们看看Bean的实例化处理 一、BeanDefinition 首先我们来看看BeanDefinition的存放位置。因为Bean对象的实例化肯定是BeanFactory基于对应的BeanDefinition的定义来实现的,所以在这个过程中BeanDefinition是非常重要的,…...
算法的法律框架:预测未来的关键趋势
随着科技的飞速发展,算法和人工智能(AI)已成为我们社会生活的重要组成部分。然而,它们也带来了许多新的法律和道德挑战,这使得算法的法律框架变得日益重要。在这个背景下,预测未来算法法律框架的关键趋势成…...
Ubuntu Server版 之 共享文件 samba和NFS 两种方法
NFS 和 Samba NFS : linux之间资源共享 Samba: 是windows系统与Linux系统之间资源共享的 samba 安装samba 工具 sudo apt install samba 创建共享目录 sudo mkdir /home/shared sudo chmod 777 /home/shared 配置sambd sudo vim /etc/samba/smb.con…...
实时协作:团队效率倍增的关键
实时协作是指团队在当前时刻共同完成项目的能力。无论是否使用技术,都能实现这一点。然而,随着远程工作的盛行,安全的协作工具被用来让团队成员在项目和一般业务之间保持联系和同步。 传统协作与实时协作的区别 两种类型的协作最明显的区别…...
电脑选睡眠、休眠还是关机?
关机 这是大家最熟悉的。关机时,系统首先关闭所有运行中的程序,然后关闭系统后台服务。随后,系统向主板请求关机,主板断开电源的供电使能,让电源切断对绝大多数设备的供电(只剩一些内部零件仍会维持电源供应…...
算法通关村第三关——不简单的数组增删改查
线性表基础 线性表概念 线性表就是具有相同特征数据元素的一个有限序列,其中包含元素的个数称为线性表的长度 线性表类型 从不同的角度看,线性表有不同的分类 语言实现角度 顺序表有两种实现方式 一体式 分离式 一体式结构 一体式:存储信息…...
【Linux】动静态库
目录 写在前面的话 如何编写静态库库 编写静态库 ar命令 Makefile自动化形成静态库 如何使用编写的静态库 1.拷贝到系统路径中 2.指定路径搜索 如何编写动态库 编写动态库 完善Makefile 如何使用编写的动态库 指定路径搜索(不可行及原因) 环境变量LD_LIBRARY_PAT…...
《kubernetes权威指南》-第一章学习笔记
1.什么是kubernetes? kubernetes是一个全新的基于容器技术的分布式架构领先方案。 2.为什么要用kubernetes? 使用kubernetes提供的解决方案能够减少30%的开发成本,并且能够将开发人员的精力更加集中于业务本身,同时可以降低系统…...
ubuntu 18.04 磁盘太满无法进入系统
安装了一个压缩包,装了一半提示磁盘空间少导致安装失败。我也没在意,退出虚拟机打算扩展硬盘。等我在虚拟机设置中完成扩展操作,准备进入虚拟机内部进行操作时,发现登录不进去了 shift 登入GUN GRUB设置项的问题 网上都是在开机…...
基于LNMP配置WordPress建站时出现的问题汇总
目录 wordpress上传文件报错问题描述原因分析:解决方案: wordpress裁剪图片报错问题描述原因分析:解决方案: 配置固定链接和伪链接 wordpress上传文件报错 WP内部错误,在上传文件时发生了错误,显示权限不足…...
【Spring Cloud】Gateway的配置与使用
文章目录 前言第一步,创建一个springboot工程第二步,添加依赖第三步,编写yml文件第四步,启动主启动类总结 前言 Gateway其实是springcloud 原生的东西,但是我还是想放在这里讲,因为我们使用nacos时&#x…...
概念、框架简介--ruoyi学习(一)
开始进行ruoyi框架的学习,比起其他的前后端不分离的,这个起码看的清晰一些吧。 这一节主要是看了ruoyi的官方文档后,记录了以下不懂的概念,并且整理了ruoyi框架中的相关内容。 一些概念 前端 store store是状态管理库&#x…...
IDEA的基础使用——【初识IDEA】
IDEA的基础使用——【初识IDEA】 文章目录 IDEA简介前言官网 IDEA的下载与安装选择下载路径勾选自己需要的其余按默认选项进行即可 目录简介安装目录简介 运行Hello WorldIDEA快捷键常用模板模板一:psvm(main)模板二:模板三&#…...
LeetCode刷题总结-动态规划篇
LeetCode刷题总结-动态规划篇 本文总结LeetCode上有动态规划的算法题,推荐刷题总数为54道。具体考点分析如下图: 1.中心扩展法 题号:132. 分割回文串 II,难度困难 2.背包问题 题号:140. 单词拆分 II,难…...
19c补丁后oracle属主变化,导致不能识别磁盘组
补丁后服务器重启,数据库再次无法启动 ORA01017: invalid username/password; logon denied Oracle 19c 在打上 19.23 或以上补丁版本后,存在与用户组权限相关的问题。具体表现为,Oracle 实例的运行用户(oracle)和集…...
【网络】每天掌握一个Linux命令 - iftop
在Linux系统中,iftop是网络管理的得力助手,能实时监控网络流量、连接情况等,帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...
Vue记事本应用实现教程
文章目录 1. 项目介绍2. 开发环境准备3. 设计应用界面4. 创建Vue实例和数据模型5. 实现记事本功能5.1 添加新记事项5.2 删除记事项5.3 清空所有记事 6. 添加样式7. 功能扩展:显示创建时间8. 功能扩展:记事项搜索9. 完整代码10. Vue知识点解析10.1 数据绑…...
安宝特方案丨XRSOP人员作业标准化管理平台:AR智慧点检验收套件
在选煤厂、化工厂、钢铁厂等过程生产型企业,其生产设备的运行效率和非计划停机对工业制造效益有较大影响。 随着企业自动化和智能化建设的推进,需提前预防假检、错检、漏检,推动智慧生产运维系统数据的流动和现场赋能应用。同时,…...
基础测试工具使用经验
背景 vtune,perf, nsight system等基础测试工具,都是用过的,但是没有记录,都逐渐忘了。所以写这篇博客总结记录一下,只要以后发现新的用法,就记得来编辑补充一下 perf 比较基础的用法: 先改这…...
高危文件识别的常用算法:原理、应用与企业场景
高危文件识别的常用算法:原理、应用与企业场景 高危文件识别旨在检测可能导致安全威胁的文件,如包含恶意代码、敏感数据或欺诈内容的文档,在企业协同办公环境中(如Teams、Google Workspace)尤为重要。结合大模型技术&…...
使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台
🎯 使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台 📌 项目背景 随着大语言模型(LLM)的广泛应用,开发者常面临多个挑战: 各大模型(OpenAI、Claude、Gemini、Ollama)接口风格不统一;缺乏一个统一平台进行模型调用与测试;本地模型 Ollama 的集成与前…...
AI,如何重构理解、匹配与决策?
AI 时代,我们如何理解消费? 作者|王彬 封面|Unplash 人们通过信息理解世界。 曾几何时,PC 与移动互联网重塑了人们的购物路径:信息变得唾手可得,商品决策变得高度依赖内容。 但 AI 时代的来…...
Unsafe Fileupload篇补充-木马的详细教程与木马分享(中国蚁剑方式)
在之前的皮卡丘靶场第九期Unsafe Fileupload篇中我们学习了木马的原理并且学了一个简单的木马文件 本期内容是为了更好的为大家解释木马(服务器方面的)的原理,连接,以及各种木马及连接工具的分享 文件木马:https://w…...
Go语言多线程问题
打印零与奇偶数(leetcode 1116) 方法1:使用互斥锁和条件变量 package mainimport ("fmt""sync" )type ZeroEvenOdd struct {n intzeroMutex sync.MutexevenMutex sync.MutexoddMutex sync.Mutexcurrent int…...
