【API篇】五、Flink分流合流API
文章目录
- 1、filter算子实现分流
- 2、分流:使用侧输出流
- 3、合流:union
- 4、合流:connect
- 5、connect案例
分流,很形象的一个词,就像一条大河,遇到岸边有分叉的,而形成了主流和测流。对于数据流也一样,不过是一个个水滴替换成了一条条数据。
将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。
1、filter算子实现分流
Demo案例:读取一个整数数字流,将数据流划分为奇数流和偶数流。
实现思路:针对同一个流,多次条用filter算子来拆分
public class SplitStreamByFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();SingleOutputStreamOperator<Integer> ds = env.socketTextStream("node01", 9527).map(Integer::valueOf);//将ds 分为两个流 ,一个是奇数流,一个是偶数流//使用filter 过滤两次SingleOutputStreamOperator<Integer> ds1 = ds.filter(x -> x % 2 == 0);SingleOutputStreamOperator<Integer> ds2 = ds.filter(x -> x % 2 == 1);ds1.print("偶数");ds2.print("奇数");env.execute();}
}
以上实现的明显缺陷是,同一条数据,被多次处理。以上其实是将原始数据流stream复制两份,然后对每一份分别做筛选,冗余且低效。
2、分流:使用侧输出流
基本步骤为:
- 使用process算子(Flink分层API中的最底层的处理函数)
- 定义OutputTag对象,即输出标签对象,用于后面标记和提取侧流
- 调用上下文ctx的.output()方法
- 通过主流获取侧流
案例:实现将WaterSensor按照Id类型进行分流
先定义下MapFunction的转换规则,用来将输入的数据转为自定义的WaterSensor对象:
public class WaterSensorMapFunction implements MapFunction<String,WaterSensor>{@Overridepublic WaterSensor map(String value) throws Exception {String[] strArr = value.split( regex: ",");//String组装对象return new WaterSensor(strArr[0],Long.value0f(strArr[1]),Integer.value0f(strArr[2]));}
}
使用侧流:
public class SplitStreamByOutputTag { public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();SingleOutputStreamOperator<WaterSensor> ds = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());//定义两个输出标签对象,用于后面标记和提取侧流OutputTag<WaterSensor> s1 = new OutputTag<>("s1", Types.POJO(WaterSensor.class));OutputTag<WaterSensor> s2 = new OutputTag<>("s2", Types.POJO(WaterSensor.class));//返回的都是主流SingleOutputStreamOperator<WaterSensor> ds1 = ds.process(new ProcessFunction<WaterSensor, WaterSensor>(){@Override//形参为别为:流中的一条数据、上下文对象、收集器public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {if ("s1".equals(value.getId())) {ctx.output(s1, value);} else if ("s2".equals(value.getId())) {ctx.output(s2, value);} else {//主流out.collect(value);}}});ds1.print("主流");SideOutputDataStream<WaterSensor> s1DS = ds1.getSideOutput(s1);SideOutputDataStream<WaterSensor> s2DS = ds1.getSideOutput(s2);s1DS.printToErr("侧流s1"); //区别主流,让控制台输出标红s2DS.printToErr("侧流s2");env.execute();}
}
相关传参说明,首先是创建OutputTag对象时的传参:
- 第一个参数为标签名,用于区分是哪一个侧流
- 第二个是放入侧流中的数据的类型,且必须是Flink的类型(TypeInfomation,借助Types类)
- OutputTag的泛型,是流到对应的侧流的数据类型
ProcessFunction接口的泛型中:
- 第一个是输入的数据类型
- 第二个是输出到主流上的数据类型
ctx.output方法的形参:
- 第一个为outputTag对象
- 第二个为数据,上面代码中传value即直接输出数据本身,也可输出处理后的数据,主流侧流数据类型不用一致
看下运行效果:
3、合流:union
将来源不同的多条流,合并成一条来联合处理,即合流。最简单的合流操作,就是直接将多条流合在一起,叫作流的联合(union)
union的条件是:
- 每条流中要合并的数据类型必须相同(原始不同,可先借助map,在union)
- 合并之后的新流会包括所有流中的元素,数据类型不变
stream1.union(stream2, stream3, ...) //可变长参数
public class UnionExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> ds1 = env.fromElements(1, 2, 3);DataStreamSource<Integer> ds2 = env.fromElements(2, 2, 3);DataStreamSource<String> ds3 = env.fromElements("2", "2", "3");ds1.union(ds2,ds3.map(Integer::valueOf)).print();env.execute();}
}
//输出:
1
2
3
2
2
3
2
2
3
4、合流:connect
union合并流受限于数据类型,因此还有另一种合流操作:connect
public class ConnectDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//Integer流SingleOutputStreamOperator<Integer> source1 = env.socketTextStream("node01", 9527).map(i -> Integer.parseInt(i));//String流DataStreamSource<String> source2 = env.socketTextStream("node01", 2795);/*** 总结: 使用 connect 合流* 1、一次只能连接 2条流* 2、流的数据类型可以不一样* 3、 连接后可以调用 map、flatmap、process来处理,但是各处理各的*/ConnectedStreams<Integer, String> connect = source1.connect(source2);SingleOutputStreamOperator<String> result = connect.map(new CoMapFunction<Integer, String, String>() {@Overridepublic String map1(Integer value) throws Exception {return "来源于原source1流:" + value.toString();}@Overridepublic String map2(String value) throws Exception {return "来源于原source2流:" + value;}});result.print();env.execute(); }
}
使用 connect 合流的总结:
- 一次只能连接 2条流,因为connect返回的是一个ConnectedStreams对象,不再是DataStreamSource或其子类了
- 两条流中的数据类型可以不一样
- 连接后可以调用 map、flatmap、process来处理,但是各处理各的
以map为例,其形参是一个CoMapFuntion接口类型,泛型则分别是流1的数据类型、流2的数据类型、合并及处理后输出的数据类型。两个map方法可以看出,虽然两个流合并成一个了,但处理数据时还是各玩各的。
- .map1()就是对第一条流中数据的map操作
- .map2()则是针对第二条流
connect 就类比被逼相亲后结婚,两个人看似成一家了,但实际上各自玩各自的。往大了举例就相当于一国两制。
5、connect案例
和connect以后的map传CoMapFunction一样,process算子也不再传ProcessFunction,而是CoProcessFunction,实现两个方法:
- processElement1():针对第一条流
- processElement2():针对第二条流
connect合并后得到的ConnectedStreams也可以直接调用.keyBy()进行按键分区,分区后返回的还是一个ConnectedStreams
connectedStreams.keyBy(keySelector1, keySelector2);
//keySelector1和keySelector2,是两条流中各自的键选择器
ConnectedStreams进行keyBy操作,其实就是把两条流中key相同的数据放到了一起,然后针对来源的流再做各自处理
案例需求:连接两条流,输出能根据id匹配上的数据,即两个流里元组f0相同的数据(类似inner join效果)
public class ConnectKeybyDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);//二元组流DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(Tuple2.of(1, "a1"),Tuple2.of(1, "a2"),Tuple2.of(2, "b"),Tuple2.of(3, "c"));//三元组流DataStreamSource<Tuple3<Integer, String, Integer>> source2 = env.fromElements(Tuple3.of(1, "aa1", 1),Tuple3.of(1, "aa2", 2),Tuple3.of(2, "bb", 1),Tuple3.of(3, "cc", 1));ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connect = source1.connect(source2);// 多并行度下,需要根据 关联条件 进行keyby,才能保证key相同的数据到一起去,才能匹配上ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connectKey = connect.keyBy(s1 -> s1.f0, s2 -> s2.f0);SingleOutputStreamOperator<String> result = connectKey.process(new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {// 定义 HashMap,缓存来过的数据,key=id,value=list<数据>Map<Integer, List<Tuple2<Integer, String>>> s1Cache = new HashMap<>();Map<Integer, List<Tuple3<Integer, String, Integer>>> s2Cache = new HashMap<>();@Overridepublic void processElement1(Tuple2<Integer, String> value, Context ctx, Collector<String> out) throws Exception {Integer id = value.f0;// TODO 1.来过的s1数据,都存起来if (!s1Cache.containsKey(id)) {// 1.1 第一条数据,初始化 value的list,放入 hashmapList<Tuple2<Integer, String>> s1Values = new ArrayList<>();s1Values.add(value);s1Cache.put(id, s1Values);} else {// 1.2 不是第一条,直接添加到 list中s1Cache.get(id).add(value);}//TODO 2.根据id,查找s2的数据,只输出 匹配上 的数据if (s2Cache.containsKey(id)) {for (Tuple3<Integer, String, Integer> s2Element : s2Cache.get(id)) {out.collect("s1:" + value + "<--------->s2:" + s2Element);}}}@Overridepublic void processElement2(Tuple3<Integer, String, Integer> value, Context ctx, Collector<String> out) throws Exception {Integer id = value.f0;// TODO 1.来过的s2数据,都存起来if (!s2Cache.containsKey(id)) {// 1.1 第一条数据,初始化 value的list,放入 hashmapList<Tuple3<Integer, String, Integer>> s2Values = new ArrayList<>();s2Values.add(value);s2Cache.put(id, s2Values);} else {// 1.2 不是第一条,直接添加到 list中s2Cache.get(id).add(value);}//TODO 2.根据id,查找s1的数据,只输出 匹配上 的数据if (s1Cache.containsKey(id)) {for (Tuple2<Integer, String> s1Element : s1Cache.get(id)) {out.collect("s1:" + s1Element + "<--------->s2:" + value);}}}});result.print();env.execute();}
}
运行效果:
相关文章:

【API篇】五、Flink分流合流API
文章目录 1、filter算子实现分流2、分流:使用侧输出流3、合流:union4、合流:connect5、connect案例 分流,很形象的一个词,就像一条大河,遇到岸边有分叉的,而形成了主流和测流。对于数据流也一样…...

flutter开发的一个小小小问题,内网依赖下不来
问题 由于众所周知的原因,flutter编译时,经常出现Could not get resource https://storage.googleapis.com/download.flutter.io…的问题,如下: * What went wrong: Could not determine the dependencies of task :app:lintVit…...

RabbitMQ队列及交换机的使用
目录 一、简单模型 1、首先控制台创建一个队列 2、父工程导入依赖 3、生产者配置文件 4、写测试类 5、消费者配置文件 6、消费者接收消息 二、WorkQueues模型 1、在控制台创建一个新的队列 2、生产者生产消息 3、创建两个消费者接收消息 4、能者多劳充分利用每一个消…...
分布式唯一Id,它比GUID好
分布式唯一Id,它比GUID好 一、前言 分布式唯一Id,顾名思义,是指在全世界任何一台计算机上都不会重复的唯一Id。 在单机/单服务器/单数据库的小型应用中,不需要用到这类东西。但在高并发、海量数据、大型分布式应用中,…...

计算机服务器中了勒索病毒怎么解决,勒索病毒解密流程,数据恢复
计算机服务器中了勒索病毒是一件非常令人头疼的事情,勒索病毒不仅会加密企业服务器中的数据,还会对企业计算机系统带来损害,严重地影响了企业的正常运转。最近,云天数据恢复中心工程师总结了,今年以来网络上流行的勒索…...

【NPM】vuex 数据持久化库 vuex-persistedstate
在 GitHub 上找到:vuex-persistedstate。 安装 npm install --save vuex-persistedstate使用 import { createStore } from "vuex"; import createPersistedState from "vuex-persistedstate";const store createStore({// ...plugins: [cr…...

英语——分享篇——每日200词——2601-2800
2601——resistant——[rɪzɪstənt]——adj.抵抗的——resistant——resi热死(拼音)st石头(拼音)ant蚂蚁(熟词)——热死了石头上的蚂蚁还在抵抗——The body may be less resistant if it is cold. ——天冷时,身体的抵抗力会下降。 2602——prospect——[prɒspe…...

SpringCloud-Sentinel
一、介绍 (1)提供界面配置配置服务限流、服务降级、服务熔断 (2)SentinelResource的blockHandler只处理后台配置的异常,运行时异常fallBack处理,且资源名为value时才生效,走兜底方法 二、安装…...

为什么索引要用B+树来实现呢,而不是B树
首先,常规的数据库存储引擎,一般都是采用 B 树或者 B树来实现索引的存储。 B树 因为 B 树是一种多路平衡树,用这种存储结构来存储大量数据,它的整个高度会相比二叉树来说,会矮很多。 而对于数据库来说,所有…...
使用vue3前端开发的一些知识点
Vue 3 是一种流行的 JavaScript 框架,用于构建用户界面。它是 Vue.js 框架的第三个主要版本,具有许多新特性和性能改进。下面是 Vue 3 的一些常用语法和概念的详细介绍: 创建 Vue 实例: 在 Vue 3 中,你可以通过创建一个…...

零基础Linux_20(进程信号)内核态和用户态+处理信号+不可重入函数+volatile
目录 1. 内核态和用户态 1.1 内核态和用户态概念 1.2 内核态和用户态转化 2. 处理信号 2.2 捕捉信号 2.2 系统调用sigaction 3. 不可重入函数 4. volatile关键字 5. SIGCHLD信号(了解) 6. 笔试选择题 答案及解析 本篇完。 1. 内核态和用户态…...
vite+vue3+elementPlus+less+router+pinia+axios
1.创建项目2.按需引入elementplus3.引入less安装vue-router安装 axios安装 piniapinia的持久化配置(用于把数据放在localStorage中)---另外增加的配置 1.创建项目 npm init vitelatest2.按需引入elementplus npm install element-plus --save//按需引入 npm install -D unpl…...

VMwarePlayer安装Ubuntu,切换中文并安装中文输入法
1.下载和安装 虚拟机使用的免费版官网链接:VMwarePlayer Ubuntu镜像下载官网链接:Ubuntu桌面版 自己学习使用,不需要考虑迁移之类的。选择单个磁盘IO性能会更高 安装过程中如果出现如下报错,则用系统管理员身份运行 右击VMwa…...
C# JSON转为实体类和List,以及结合使用
引用 using Newtonsoft.Json;using Newtonsoft.Json.Linq;JSON转实体类 public class Person {public string Name { get; set; }public int Age { get; set; }public string Gender { get; set; } }string jsonStr "{\"name\": \"Tom\", \"a…...

使用TensorRT-LLM进行高性能推理
LLM的火爆之后,英伟达(NVIDIA)也发布了其相关的推理加速引擎TensorRT-LLM。TensorRT是nvidia家的一款高性能深度学习推理SDK。此SDK包含深度学习推理优化器和运行环境,可为深度学习推理应用提供低延迟和高吞吐量。而TensorRT-LLM是在TensorRT基础上针对大模型进一步…...

怎么去别人的github工程下载
1、网络 确保网络能够顺利访问github,有的地方的公共网络不能访问github,我之前开过科学上网的会员,发现没必要特意开去访问它。可以直接开手机热点,一般是可以顺利访问的。 2、下载 以我的github开源笔记qq-hh/C_review (gith…...

【java基础-实战3】list遍历时删除元素的方法
插: 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。 坚持不懈,越努力越幸运,大家一起学习鸭~~~ 在实际的业务开发中,容器的遍历可以说是非…...
云计算与云服务
云计算与云服务 1、云计算与云服务概述2、云服务模式(IaaS、PaaS、SaaS、DaaS)3、公有云、私有云和混合云1、云计算与云服务概述 什么是云计算? “云”实质上就是一个网络,狭义上讲,云计算就是一种提供资源的网络,使用者可以随时获取“云”上的资源,按需求量使用,并且…...

Ubuntu20.4 设置代理
主要是涉及2个代理 涉及apt 可以在、/etc/apt/apt.conf 中进行修改 在系统全局可以在/etc/profile中进行修改...
RustDay06------Exercise[71-80]
71.box的使用 说实话这题没太看懂.敲了个模板跟着提示就过了 // box1.rs // // At compile time, Rust needs to know how much space a type takes up. This // becomes problematic for recursive types, where a value can have as part of // itself another value of th…...

【力扣数据库知识手册笔记】索引
索引 索引的优缺点 优点1. 通过创建唯一性索引,可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度(创建索引的主要原因)。3. 可以加速表和表之间的连接,实现数据的参考完整性。4. 可以在查询过程中,…...
linux 错误码总结
1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...
CSS设置元素的宽度根据其内容自动调整
width: fit-content 是 CSS 中的一个属性值,用于设置元素的宽度根据其内容自动调整,确保宽度刚好容纳内容而不会超出。 效果对比 默认情况(width: auto): 块级元素(如 <div>)会占满父容器…...
A2A JS SDK 完整教程:快速入门指南
目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库ÿ…...

使用LangGraph和LangSmith构建多智能体人工智能系统
现在,通过组合几个较小的子智能体来创建一个强大的人工智能智能体正成为一种趋势。但这也带来了一些挑战,比如减少幻觉、管理对话流程、在测试期间留意智能体的工作方式、允许人工介入以及评估其性能。你需要进行大量的反复试验。 在这篇博客〔原作者&a…...

深度学习水论文:mamba+图像增强
🧀当前视觉领域对高效长序列建模需求激增,对Mamba图像增强这方向的研究自然也逐渐火热。原因在于其高效长程建模,以及动态计算优势,在图像质量提升和细节恢复方面有难以替代的作用。 🧀因此短时间内,就有不…...
【FTP】ftp文件传输会丢包吗?批量几百个文件传输,有一些文件没有传输完整,如何解决?
FTP(File Transfer Protocol)本身是一个基于 TCP 的协议,理论上不会丢包。但 FTP 文件传输过程中仍可能出现文件不完整、丢失或损坏的情况,主要原因包括: ✅ 一、FTP传输可能“丢包”或文件不完整的原因 原因描述网络…...

云安全与网络安全:核心区别与协同作用解析
在数字化转型的浪潮中,云安全与网络安全作为信息安全的两大支柱,常被混淆但本质不同。本文将从概念、责任分工、技术手段、威胁类型等维度深入解析两者的差异,并探讨它们的协同作用。 一、核心区别 定义与范围 网络安全:聚焦于保…...
Python第七周作业
Python第七周作业 文章目录 Python第七周作业 1.使用open以只读模式打开文件data.txt,并逐行打印内容 2.使用pathlib模块获取当前脚本的绝对路径,并创建logs目录(若不存在) 3.递归遍历目录data,输出所有.csv文件的路径…...

RKNN开发环境搭建2-RKNN Model Zoo 环境搭建
目录 1.简介2.环境搭建2.1 启动 docker 环境2.2 安装依赖工具2.3 下载 RKNN Model Zoo2.4 RKNN模型转化2.5编译C++1.简介 RKNN Model Zoo基于 RKNPU SDK 工具链开发, 提供了目前主流算法的部署例程. 例程包含导出RKNN模型, 使用 Python API, CAPI 推理 RKNN 模型的流程. 本…...