当前位置: 首页 > news >正文

Flink - souce算子

水善利万物而不争,处众人之所恶,故几于道💦

目录

  1. 从Java的集合中读取数据
  2. 从本地文件中读取数据
  3. 从HDFS中读取数据
  4. 从Socket中读取数据
  5. 从Kafka中读取数据
  6. 自定义Source

官方文档 - Flink1.13

在这里插入图片描述


1. 从Java的集合中读取数据

fromCollection(waterSensors)

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);List<WaterSensor> waterSensors = Arrays.asList(new WaterSensor("ws_001", 1577844001L, 45),new WaterSensor("ws_002", 1577844015L, 43),new WaterSensor("ws_003", 1577844020L, 42));env.fromCollection(waterSensors).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

2. 从本地文件中读取数据

readTextFile(“input/words.txt”),支持相对路径和绝对路径

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.readTextFile("input/words.txt").print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}

运行结果:
在这里插入图片描述

3. 从HDFS中读取数据

readTextFile(“hdfs://hadoop101:8020/flink/data/words.txt”)

要先在pom文件中添加hadoop-client依赖:

<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version>
</dependency>
public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.readTextFile("hdfs://hadoop101:8020/flink/data/words.txt").print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

4. 从Socket中读取数据

socketTextStream(“hadoop101”,9999),这个输入源不支持多个并行度。

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);//从端口中读数据,  windows中 nc -lp 9999     Linux nc -lk 9999env.socketTextStream("hadoop101",9999).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

5. 从Kafka中读取数据

addSource(new FlinkKafkaConsumer<>(“flink_source_kafka”,new SimpleStringSchema(),properties))

第一个参数是topic,

第二个参数是序列化器,序列化器就是在Kafka和flink之间转换数据 - 官方注释:The de-/serializer used to convert between Kafka’s byte messages and Flink’s objects.(反-序列化程序用于在Kafka的字节消息和Flink的对象之间进行转换。)

第三个参数是Kafka的配置。

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);Properties properties = new Properties();// 设置集群地址properties.setProperty("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");// 设置所属消费者组properties.setProperty("group.id", "flink_consumer_group");env.addSource(new FlinkKafkaConsumer<>("flink_source_kafka",new SimpleStringSchema(),properties)).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

6. 自定义Source

addSource(new XXXX())

  大多数情况下,前面的数据源已经能够满足需要,但是难免会存在特殊情况的场合,所以flink也提供了能自定义数据源的方式.

public class Flink06_myDefDataSource {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.addSource(new RandomWatersensor()).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}

  自定义数据源需要定义一个类,然后实现SourceFunction接口,然后实现其中的两个方法,runcancel,run方法包含具体读数据的逻辑,当调用cancel方法的时候应该可以让run方法中的读数据逻辑停止

public class RandomWatersensor implements SourceFunction<WaterSensor> {private Boolean running = true;@Overridepublic void run(SourceContext<WaterSensor> sourceContext) throws Exception {Random random = new Random();while (running){sourceContext.collect(new WaterSensor("sensor" + random.nextInt(50),Calendar.getInstance().getTimeInMillis(),random.nextInt(100)));Thread.sleep(1000);}}/*** 大多数的source在run方法内部都会有一个while循环,* 当调用这个方法的时候, 应该可以让run方法中的while循环结束*/@Overridepublic void cancel() {running = false;}}

运行结果:
在这里插入图片描述


demo2 - 自定义从socket中读取数据
public class Flink04_Source_Custom {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new MySource("hadoop102", 9999)).print();env.execute();}public static class MySource implements SourceFunction<WaterSensor> {private String host;private int port;private volatile boolean isRunning = true;private Socket socket;public MySource(String host, int port) {this.host = host;this.port = port;}@Overridepublic void run(SourceContext<WaterSensor> ctx) throws Exception {// 实现一个从socket读取数据的sourcesocket = new Socket(host, port);BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));String line = null;while (isRunning && (line = reader.readLine()) != null) {String[] split = line.split(",");ctx.collect(new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])));}}/*** 大多数的source在run方法内部都会有一个while循环,* 当调用这个方法的时候, 应该可以让run方法中的while循环结束*/@Overridepublic void cancel() {isRunning = false;try {socket.close();} catch (IOException e) {e.printStackTrace();}}}
}
/*
sensor_1,1607527992000,20
sensor_1,1607527993000,40
sensor_1,1607527994000,50*/

相关文章:

Flink - souce算子

水善利万物而不争&#xff0c;处众人之所恶&#xff0c;故几于道&#x1f4a6; 目录 1. 从Java的集合中读取数据 2. 从本地文件中读取数据 3. 从HDFS中读取数据 4. 从Socket中读取数据 5. 从Kafka中读取数据 6. 自定义Source 官方文档 - Flink1.13 1. 从Java的集合中读取数据 …...

使用vue creat搭建项目

一、查看是否安装node和npm&#xff08;显示版本号说明安装成功&#xff09; node -v npm -v 显示版本号说明安装成功&#xff0c;如果没有安装&#xff0c;则需要先安装。 二、安装vue-cli脚手架 查看安装的版本&#xff08;显示版本号说明安装成功&#xff09; vue -V 三…...

面试题 -- 基础知识

文章目录 1. 深拷贝 和 浅拷贝的区别2. 懒加载模式3. frame和bounds有什么不同&#xff1f;4. What is push notification&#xff1f;推送实现 5. 什么是序列化&#xff1f;6. 什么是安全释放7. 响应者链8. 简述沙盒机制 1. 深拷贝 和 浅拷贝的区别 浅拷贝是指针拷贝&#xf…...

Zabbix分布式监控快速入门

目录 1 Zabbix简介1.1 软件架构1.2 版本选择1.3 功能特性 2 安装与部署2.1 时间同步需求2.2 下载仓库官方源2.3 Zabbix-Server服务端的安装2.3.1 安装MySQL2.3.1.1 创建Zabbix数据库2.3.1.2 导入Zabbix库的数据文件 2.3.2 配置zabbix_server.conf2.3.3 开启Zabbix-Server服务2.…...

基于Spring包扫描工具和MybatisPlus逆向工程组件的数据表自动同步机制

公司产品产出的项目较多。同步数据库表结构工作很麻烦。一个alter语句要跑到N个客户机上执行脚本。超级费时麻烦。介于此&#xff0c;原有方案是把增量脚本放到一resource包下&#xff0c;项目启动时执行逐行执行一次。但由于模块开发人员较多&#xff0c;总有那么一两个机灵鬼…...

leetcode 面试题 0106.字符串压缩

⭐️ 题目描述 &#x1f31f; leetcode链接&#xff1a;面试题 0106.字符串压缩 思路&#xff1a; 开辟一个新的空间&#xff08;空间要大一点&#xff0c;因为可能压缩后的字符串比原字符串大&#xff09;&#xff0c;然后遍历原字符串统计当前字符的个数&#xff0c;再写入到…...

三、Spring源码-实例化

Spring源码-Bean的实例化 接下来我们看看Bean的实例化处理 一、BeanDefinition 首先我们来看看BeanDefinition的存放位置。因为Bean对象的实例化肯定是BeanFactory基于对应的BeanDefinition的定义来实现的&#xff0c;所以在这个过程中BeanDefinition是非常重要的&#xff0c;…...

算法的法律框架:预测未来的关键趋势

随着科技的飞速发展&#xff0c;算法和人工智能&#xff08;AI&#xff09;已成为我们社会生活的重要组成部分。然而&#xff0c;它们也带来了许多新的法律和道德挑战&#xff0c;这使得算法的法律框架变得日益重要。在这个背景下&#xff0c;预测未来算法法律框架的关键趋势成…...

Ubuntu Server版 之 共享文件 samba和NFS 两种方法

NFS 和 Samba NFS &#xff1a; linux之间资源共享 Samba&#xff1a; 是windows系统与Linux系统之间资源共享的 samba 安装samba 工具 sudo apt install samba 创建共享目录 sudo mkdir /home/shared sudo chmod 777 /home/shared 配置sambd sudo vim /etc/samba/smb.con…...

实时协作:团队效率倍增的关键

实时协作是指团队在当前时刻共同完成项目的能力。无论是否使用技术&#xff0c;都能实现这一点。然而&#xff0c;随着远程工作的盛行&#xff0c;安全的协作工具被用来让团队成员在项目和一般业务之间保持联系和同步。 传统协作与实时协作的区别 两种类型的协作最明显的区别…...

电脑选睡眠、休眠还是关机?

关机 这是大家最熟悉的。关机时&#xff0c;系统首先关闭所有运行中的程序&#xff0c;然后关闭系统后台服务。随后&#xff0c;系统向主板请求关机&#xff0c;主板断开电源的供电使能&#xff0c;让电源切断对绝大多数设备的供电&#xff08;只剩一些内部零件仍会维持电源供应…...

算法通关村第三关——不简单的数组增删改查

线性表基础 线性表概念 线性表就是具有相同特征数据元素的一个有限序列&#xff0c;其中包含元素的个数称为线性表的长度 线性表类型 从不同的角度看&#xff0c;线性表有不同的分类 语言实现角度 顺序表有两种实现方式 一体式 分离式 一体式结构 一体式&#xff1a;存储信息…...

【Linux】动静态库

目录 写在前面的话 如何编写静态库库 编写静态库 ar命令 Makefile自动化形成静态库 如何使用编写的静态库 1.拷贝到系统路径中 2.指定路径搜索 如何编写动态库 编写动态库 完善Makefile 如何使用编写的动态库 指定路径搜索(不可行及原因) 环境变量LD_LIBRARY_PAT…...

《kubernetes权威指南》-第一章学习笔记

1.什么是kubernetes&#xff1f; kubernetes是一个全新的基于容器技术的分布式架构领先方案。 2.为什么要用kubernetes&#xff1f; 使用kubernetes提供的解决方案能够减少30%的开发成本&#xff0c;并且能够将开发人员的精力更加集中于业务本身&#xff0c;同时可以降低系统…...

ubuntu 18.04 磁盘太满无法进入系统

安装了一个压缩包&#xff0c;装了一半提示磁盘空间少导致安装失败。我也没在意&#xff0c;退出虚拟机打算扩展硬盘。等我在虚拟机设置中完成扩展操作&#xff0c;准备进入虚拟机内部进行操作时&#xff0c;发现登录不进去了 shift 登入GUN GRUB设置项的问题 网上都是在开机…...

基于LNMP配置WordPress建站时出现的问题汇总

目录 wordpress上传文件报错问题描述原因分析&#xff1a;解决方案&#xff1a; wordpress裁剪图片报错问题描述原因分析&#xff1a;解决方案&#xff1a; 配置固定链接和伪链接 wordpress上传文件报错 WP内部错误&#xff0c;在上传文件时发生了错误&#xff0c;显示权限不足…...

【Spring Cloud】Gateway的配置与使用

文章目录 前言第一步&#xff0c;创建一个springboot工程第二步&#xff0c;添加依赖第三步&#xff0c;编写yml文件第四步&#xff0c;启动主启动类总结 前言 Gateway其实是springcloud 原生的东西&#xff0c;但是我还是想放在这里讲&#xff0c;因为我们使用nacos时&#x…...

概念、框架简介--ruoyi学习(一)

开始进行ruoyi框架的学习&#xff0c;比起其他的前后端不分离的&#xff0c;这个起码看的清晰一些吧。 这一节主要是看了ruoyi的官方文档后&#xff0c;记录了以下不懂的概念&#xff0c;并且整理了ruoyi框架中的相关内容。 一些概念 前端 store store是状态管理库&#x…...

IDEA的基础使用——【初识IDEA】

IDEA的基础使用——【初识IDEA】 文章目录 IDEA简介前言官网 IDEA的下载与安装选择下载路径勾选自己需要的其余按默认选项进行即可 目录简介安装目录简介 运行Hello WorldIDEA快捷键常用模板模板一&#xff1a;psvm&#xff08;main&#xff09;模板二&#xff1a;模板三&#…...

LeetCode刷题总结-动态规划篇

LeetCode刷题总结-动态规划篇 本文总结LeetCode上有动态规划的算法题&#xff0c;推荐刷题总数为54道。具体考点分析如下图&#xff1a; 1.中心扩展法 题号&#xff1a;132. 分割回文串 II&#xff0c;难度困难 2.背包问题 题号&#xff1a;140. 单词拆分 II&#xff0c;难…...

RestClient

什么是RestClient RestClient 是 Elasticsearch 官方提供的 Java 低级 REST 客户端&#xff0c;它允许HTTP与Elasticsearch 集群通信&#xff0c;而无需处理 JSON 序列化/反序列化等底层细节。它是 Elasticsearch Java API 客户端的基础。 RestClient 主要特点 轻量级&#xff…...

深度学习在微纳光子学中的应用

深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向&#xff1a; 逆向设计 通过神经网络快速预测微纳结构的光学响应&#xff0c;替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…...

深入剖析AI大模型:大模型时代的 Prompt 工程全解析

今天聊的内容&#xff0c;我认为是AI开发里面非常重要的内容。它在AI开发里无处不在&#xff0c;当你对 AI 助手说 "用李白的风格写一首关于人工智能的诗"&#xff0c;或者让翻译模型 "将这段合同翻译成商务日语" 时&#xff0c;输入的这句话就是 Prompt。…...

React第五十七节 Router中RouterProvider使用详解及注意事项

前言 在 React Router v6.4 中&#xff0c;RouterProvider 是一个核心组件&#xff0c;用于提供基于数据路由&#xff08;data routers&#xff09;的新型路由方案。 它替代了传统的 <BrowserRouter>&#xff0c;支持更强大的数据加载和操作功能&#xff08;如 loader 和…...

基于ASP.NET+ SQL Server实现(Web)医院信息管理系统

医院信息管理系统 1. 课程设计内容 在 visual studio 2017 平台上&#xff0c;开发一个“医院信息管理系统”Web 程序。 2. 课程设计目的 综合运用 c#.net 知识&#xff0c;在 vs 2017 平台上&#xff0c;进行 ASP.NET 应用程序和简易网站的开发&#xff1b;初步熟悉开发一…...

【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器

——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的​​一体化测试平台​​&#xff0c;覆盖应用全生命周期测试需求&#xff0c;主要提供五大核心能力&#xff1a; ​​测试类型​​​​检测目标​​​​关键指标​​功能体验基…...

什么是库存周转?如何用进销存系统提高库存周转率?

你可能听说过这样一句话&#xff1a; “利润不是赚出来的&#xff0c;是管出来的。” 尤其是在制造业、批发零售、电商这类“货堆成山”的行业&#xff0c;很多企业看着销售不错&#xff0c;账上却没钱、利润也不见了&#xff0c;一翻库存才发现&#xff1a; 一堆卖不动的旧货…...

12.找到字符串中所有字母异位词

&#x1f9e0; 题目解析 题目描述&#xff1a; 给定两个字符串 s 和 p&#xff0c;找出 s 中所有 p 的字母异位词的起始索引。 返回的答案以数组形式表示。 字母异位词定义&#xff1a; 若两个字符串包含的字符种类和出现次数完全相同&#xff0c;顺序无所谓&#xff0c;则互为…...

JVM暂停(Stop-The-World,STW)的原因分类及对应排查方案

JVM暂停(Stop-The-World,STW)的完整原因分类及对应排查方案,结合JVM运行机制和常见故障场景整理而成: 一、GC相关暂停​​ 1. ​​安全点(Safepoint)阻塞​​ ​​现象​​:JVM暂停但无GC日志,日志显示No GCs detected。​​原因​​:JVM等待所有线程进入安全点(如…...

云原生玩法三问:构建自定义开发环境

云原生玩法三问&#xff1a;构建自定义开发环境 引言 临时运维一个古董项目&#xff0c;无文档&#xff0c;无环境&#xff0c;无交接人&#xff0c;俗称三无。 运行设备的环境老&#xff0c;本地环境版本高&#xff0c;ssh不过去。正好最近对 腾讯出品的云原生 cnb 感兴趣&…...