当前位置: 首页 > news >正文

Flink - souce算子

水善利万物而不争,处众人之所恶,故几于道💦

目录

  1. 从Java的集合中读取数据
  2. 从本地文件中读取数据
  3. 从HDFS中读取数据
  4. 从Socket中读取数据
  5. 从Kafka中读取数据
  6. 自定义Source

官方文档 - Flink1.13

在这里插入图片描述


1. 从Java的集合中读取数据

fromCollection(waterSensors)

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);List<WaterSensor> waterSensors = Arrays.asList(new WaterSensor("ws_001", 1577844001L, 45),new WaterSensor("ws_002", 1577844015L, 43),new WaterSensor("ws_003", 1577844020L, 42));env.fromCollection(waterSensors).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

2. 从本地文件中读取数据

readTextFile(“input/words.txt”),支持相对路径和绝对路径

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.readTextFile("input/words.txt").print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}

运行结果:
在这里插入图片描述

3. 从HDFS中读取数据

readTextFile(“hdfs://hadoop101:8020/flink/data/words.txt”)

要先在pom文件中添加hadoop-client依赖:

<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version>
</dependency>
public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.readTextFile("hdfs://hadoop101:8020/flink/data/words.txt").print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

4. 从Socket中读取数据

socketTextStream(“hadoop101”,9999),这个输入源不支持多个并行度。

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);//从端口中读数据,  windows中 nc -lp 9999     Linux nc -lk 9999env.socketTextStream("hadoop101",9999).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

5. 从Kafka中读取数据

addSource(new FlinkKafkaConsumer<>(“flink_source_kafka”,new SimpleStringSchema(),properties))

第一个参数是topic,

第二个参数是序列化器,序列化器就是在Kafka和flink之间转换数据 - 官方注释:The de-/serializer used to convert between Kafka’s byte messages and Flink’s objects.(反-序列化程序用于在Kafka的字节消息和Flink的对象之间进行转换。)

第三个参数是Kafka的配置。

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);Properties properties = new Properties();// 设置集群地址properties.setProperty("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");// 设置所属消费者组properties.setProperty("group.id", "flink_consumer_group");env.addSource(new FlinkKafkaConsumer<>("flink_source_kafka",new SimpleStringSchema(),properties)).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

6. 自定义Source

addSource(new XXXX())

  大多数情况下,前面的数据源已经能够满足需要,但是难免会存在特殊情况的场合,所以flink也提供了能自定义数据源的方式.

public class Flink06_myDefDataSource {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.addSource(new RandomWatersensor()).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}

  自定义数据源需要定义一个类,然后实现SourceFunction接口,然后实现其中的两个方法,runcancel,run方法包含具体读数据的逻辑,当调用cancel方法的时候应该可以让run方法中的读数据逻辑停止

public class RandomWatersensor implements SourceFunction<WaterSensor> {private Boolean running = true;@Overridepublic void run(SourceContext<WaterSensor> sourceContext) throws Exception {Random random = new Random();while (running){sourceContext.collect(new WaterSensor("sensor" + random.nextInt(50),Calendar.getInstance().getTimeInMillis(),random.nextInt(100)));Thread.sleep(1000);}}/*** 大多数的source在run方法内部都会有一个while循环,* 当调用这个方法的时候, 应该可以让run方法中的while循环结束*/@Overridepublic void cancel() {running = false;}}

运行结果:
在这里插入图片描述


demo2 - 自定义从socket中读取数据
public class Flink04_Source_Custom {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new MySource("hadoop102", 9999)).print();env.execute();}public static class MySource implements SourceFunction<WaterSensor> {private String host;private int port;private volatile boolean isRunning = true;private Socket socket;public MySource(String host, int port) {this.host = host;this.port = port;}@Overridepublic void run(SourceContext<WaterSensor> ctx) throws Exception {// 实现一个从socket读取数据的sourcesocket = new Socket(host, port);BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));String line = null;while (isRunning && (line = reader.readLine()) != null) {String[] split = line.split(",");ctx.collect(new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])));}}/*** 大多数的source在run方法内部都会有一个while循环,* 当调用这个方法的时候, 应该可以让run方法中的while循环结束*/@Overridepublic void cancel() {isRunning = false;try {socket.close();} catch (IOException e) {e.printStackTrace();}}}
}
/*
sensor_1,1607527992000,20
sensor_1,1607527993000,40
sensor_1,1607527994000,50*/

相关文章:

Flink - souce算子

水善利万物而不争&#xff0c;处众人之所恶&#xff0c;故几于道&#x1f4a6; 目录 1. 从Java的集合中读取数据 2. 从本地文件中读取数据 3. 从HDFS中读取数据 4. 从Socket中读取数据 5. 从Kafka中读取数据 6. 自定义Source 官方文档 - Flink1.13 1. 从Java的集合中读取数据 …...

使用vue creat搭建项目

一、查看是否安装node和npm&#xff08;显示版本号说明安装成功&#xff09; node -v npm -v 显示版本号说明安装成功&#xff0c;如果没有安装&#xff0c;则需要先安装。 二、安装vue-cli脚手架 查看安装的版本&#xff08;显示版本号说明安装成功&#xff09; vue -V 三…...

面试题 -- 基础知识

文章目录 1. 深拷贝 和 浅拷贝的区别2. 懒加载模式3. frame和bounds有什么不同&#xff1f;4. What is push notification&#xff1f;推送实现 5. 什么是序列化&#xff1f;6. 什么是安全释放7. 响应者链8. 简述沙盒机制 1. 深拷贝 和 浅拷贝的区别 浅拷贝是指针拷贝&#xf…...

Zabbix分布式监控快速入门

目录 1 Zabbix简介1.1 软件架构1.2 版本选择1.3 功能特性 2 安装与部署2.1 时间同步需求2.2 下载仓库官方源2.3 Zabbix-Server服务端的安装2.3.1 安装MySQL2.3.1.1 创建Zabbix数据库2.3.1.2 导入Zabbix库的数据文件 2.3.2 配置zabbix_server.conf2.3.3 开启Zabbix-Server服务2.…...

基于Spring包扫描工具和MybatisPlus逆向工程组件的数据表自动同步机制

公司产品产出的项目较多。同步数据库表结构工作很麻烦。一个alter语句要跑到N个客户机上执行脚本。超级费时麻烦。介于此&#xff0c;原有方案是把增量脚本放到一resource包下&#xff0c;项目启动时执行逐行执行一次。但由于模块开发人员较多&#xff0c;总有那么一两个机灵鬼…...

leetcode 面试题 0106.字符串压缩

⭐️ 题目描述 &#x1f31f; leetcode链接&#xff1a;面试题 0106.字符串压缩 思路&#xff1a; 开辟一个新的空间&#xff08;空间要大一点&#xff0c;因为可能压缩后的字符串比原字符串大&#xff09;&#xff0c;然后遍历原字符串统计当前字符的个数&#xff0c;再写入到…...

三、Spring源码-实例化

Spring源码-Bean的实例化 接下来我们看看Bean的实例化处理 一、BeanDefinition 首先我们来看看BeanDefinition的存放位置。因为Bean对象的实例化肯定是BeanFactory基于对应的BeanDefinition的定义来实现的&#xff0c;所以在这个过程中BeanDefinition是非常重要的&#xff0c;…...

算法的法律框架:预测未来的关键趋势

随着科技的飞速发展&#xff0c;算法和人工智能&#xff08;AI&#xff09;已成为我们社会生活的重要组成部分。然而&#xff0c;它们也带来了许多新的法律和道德挑战&#xff0c;这使得算法的法律框架变得日益重要。在这个背景下&#xff0c;预测未来算法法律框架的关键趋势成…...

Ubuntu Server版 之 共享文件 samba和NFS 两种方法

NFS 和 Samba NFS &#xff1a; linux之间资源共享 Samba&#xff1a; 是windows系统与Linux系统之间资源共享的 samba 安装samba 工具 sudo apt install samba 创建共享目录 sudo mkdir /home/shared sudo chmod 777 /home/shared 配置sambd sudo vim /etc/samba/smb.con…...

实时协作:团队效率倍增的关键

实时协作是指团队在当前时刻共同完成项目的能力。无论是否使用技术&#xff0c;都能实现这一点。然而&#xff0c;随着远程工作的盛行&#xff0c;安全的协作工具被用来让团队成员在项目和一般业务之间保持联系和同步。 传统协作与实时协作的区别 两种类型的协作最明显的区别…...

电脑选睡眠、休眠还是关机?

关机 这是大家最熟悉的。关机时&#xff0c;系统首先关闭所有运行中的程序&#xff0c;然后关闭系统后台服务。随后&#xff0c;系统向主板请求关机&#xff0c;主板断开电源的供电使能&#xff0c;让电源切断对绝大多数设备的供电&#xff08;只剩一些内部零件仍会维持电源供应…...

算法通关村第三关——不简单的数组增删改查

线性表基础 线性表概念 线性表就是具有相同特征数据元素的一个有限序列&#xff0c;其中包含元素的个数称为线性表的长度 线性表类型 从不同的角度看&#xff0c;线性表有不同的分类 语言实现角度 顺序表有两种实现方式 一体式 分离式 一体式结构 一体式&#xff1a;存储信息…...

【Linux】动静态库

目录 写在前面的话 如何编写静态库库 编写静态库 ar命令 Makefile自动化形成静态库 如何使用编写的静态库 1.拷贝到系统路径中 2.指定路径搜索 如何编写动态库 编写动态库 完善Makefile 如何使用编写的动态库 指定路径搜索(不可行及原因) 环境变量LD_LIBRARY_PAT…...

《kubernetes权威指南》-第一章学习笔记

1.什么是kubernetes&#xff1f; kubernetes是一个全新的基于容器技术的分布式架构领先方案。 2.为什么要用kubernetes&#xff1f; 使用kubernetes提供的解决方案能够减少30%的开发成本&#xff0c;并且能够将开发人员的精力更加集中于业务本身&#xff0c;同时可以降低系统…...

ubuntu 18.04 磁盘太满无法进入系统

安装了一个压缩包&#xff0c;装了一半提示磁盘空间少导致安装失败。我也没在意&#xff0c;退出虚拟机打算扩展硬盘。等我在虚拟机设置中完成扩展操作&#xff0c;准备进入虚拟机内部进行操作时&#xff0c;发现登录不进去了 shift 登入GUN GRUB设置项的问题 网上都是在开机…...

基于LNMP配置WordPress建站时出现的问题汇总

目录 wordpress上传文件报错问题描述原因分析&#xff1a;解决方案&#xff1a; wordpress裁剪图片报错问题描述原因分析&#xff1a;解决方案&#xff1a; 配置固定链接和伪链接 wordpress上传文件报错 WP内部错误&#xff0c;在上传文件时发生了错误&#xff0c;显示权限不足…...

【Spring Cloud】Gateway的配置与使用

文章目录 前言第一步&#xff0c;创建一个springboot工程第二步&#xff0c;添加依赖第三步&#xff0c;编写yml文件第四步&#xff0c;启动主启动类总结 前言 Gateway其实是springcloud 原生的东西&#xff0c;但是我还是想放在这里讲&#xff0c;因为我们使用nacos时&#x…...

概念、框架简介--ruoyi学习(一)

开始进行ruoyi框架的学习&#xff0c;比起其他的前后端不分离的&#xff0c;这个起码看的清晰一些吧。 这一节主要是看了ruoyi的官方文档后&#xff0c;记录了以下不懂的概念&#xff0c;并且整理了ruoyi框架中的相关内容。 一些概念 前端 store store是状态管理库&#x…...

IDEA的基础使用——【初识IDEA】

IDEA的基础使用——【初识IDEA】 文章目录 IDEA简介前言官网 IDEA的下载与安装选择下载路径勾选自己需要的其余按默认选项进行即可 目录简介安装目录简介 运行Hello WorldIDEA快捷键常用模板模板一&#xff1a;psvm&#xff08;main&#xff09;模板二&#xff1a;模板三&#…...

LeetCode刷题总结-动态规划篇

LeetCode刷题总结-动态规划篇 本文总结LeetCode上有动态规划的算法题&#xff0c;推荐刷题总数为54道。具体考点分析如下图&#xff1a; 1.中心扩展法 题号&#xff1a;132. 分割回文串 II&#xff0c;难度困难 2.背包问题 题号&#xff1a;140. 单词拆分 II&#xff0c;难…...

跨链模式:多链互操作架构与性能扩展方案

跨链模式&#xff1a;多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈&#xff1a;模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展&#xff08;H2Cross架构&#xff09;&#xff1a; 适配层&#xf…...

第一篇:Agent2Agent (A2A) 协议——协作式人工智能的黎明

AI 领域的快速发展正在催生一个新时代&#xff0c;智能代理&#xff08;agents&#xff09;不再是孤立的个体&#xff0c;而是能够像一个数字团队一样协作。然而&#xff0c;当前 AI 生态系统的碎片化阻碍了这一愿景的实现&#xff0c;导致了“AI 巴别塔问题”——不同代理之间…...

【决胜公务员考试】求职OMG——见面课测验1

2025最新版&#xff01;&#xff01;&#xff01;6.8截至答题&#xff0c;大家注意呀&#xff01; 博主码字不易点个关注吧,祝期末顺利~~ 1.单选题(2分) 下列说法错误的是:&#xff08; B &#xff09; A.选调生属于公务员系统 B.公务员属于事业编 C.选调生有基层锻炼的要求 D…...

MySQL 8.0 OCP 英文题库解析(十三)

Oracle 为庆祝 MySQL 30 周年&#xff0c;截止到 2025.07.31 之前。所有人均可以免费考取原价245美元的MySQL OCP 认证。 从今天开始&#xff0c;将英文题库免费公布出来&#xff0c;并进行解析&#xff0c;帮助大家在一个月之内轻松通过OCP认证。 本期公布试题111~120 试题1…...

Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决

Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决 问题背景 在一个基于 Spring Cloud Gateway WebFlux 构建的微服务项目中&#xff0c;新增了一个本地验证码接口 /code&#xff0c;使用函数式路由&#xff08;RouterFunction&#xff09;和 Hutool 的 Circle…...

Pinocchio 库详解及其在足式机器人上的应用

Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库&#xff0c;专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性&#xff0c;并提供了一个通用的框架&…...

安宝特方案丨船舶智造的“AR+AI+作业标准化管理解决方案”(装配)

船舶制造装配管理现状&#xff1a;装配工作依赖人工经验&#xff0c;装配工人凭借长期实践积累的操作技巧完成零部件组装。企业通常制定了装配作业指导书&#xff0c;但在实际执行中&#xff0c;工人对指导书的理解和遵循程度参差不齐。 船舶装配过程中的挑战与需求 挑战 (1…...

RSS 2025|从说明书学习复杂机器人操作任务:NUS邵林团队提出全新机器人装配技能学习框架Manual2Skill

视觉语言模型&#xff08;Vision-Language Models, VLMs&#xff09;&#xff0c;为真实环境中的机器人操作任务提供了极具潜力的解决方案。 尽管 VLMs 取得了显著进展&#xff0c;机器人仍难以胜任复杂的长时程任务&#xff08;如家具装配&#xff09;&#xff0c;主要受限于人…...

在鸿蒙HarmonyOS 5中使用DevEco Studio实现企业微信功能

1. 开发环境准备 ​​安装DevEco Studio 3.1​​&#xff1a; 从华为开发者官网下载最新版DevEco Studio安装HarmonyOS 5.0 SDK ​​项目配置​​&#xff1a; // module.json5 {"module": {"requestPermissions": [{"name": "ohos.permis…...

高考志愿填报管理系统---开发介绍

高考志愿填报管理系统是一款专为教育机构、学校和教师设计的学生信息管理和志愿填报辅助平台。系统基于Django框架开发&#xff0c;采用现代化的Web技术&#xff0c;为教育工作者提供高效、安全、便捷的学生管理解决方案。 ## &#x1f4cb; 系统概述 ### &#x1f3af; 系统定…...