当前位置: 首页 > news >正文

【API篇】五、Flink分流合流API

文章目录

  • 1、filter算子实现分流
  • 2、分流:使用侧输出流
  • 3、合流:union
  • 4、合流:connect
  • 5、connect案例

分流,很形象的一个词,就像一条大河,遇到岸边有分叉的,而形成了主流和测流。对于数据流也一样,不过是一个个水滴替换成了一条条数据。

在这里插入图片描述

将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。

在这里插入图片描述

1、filter算子实现分流

Demo案例:读取一个整数数字流,将数据流划分为奇数流和偶数流。

实现思路:针对同一个流,多次条用filter算子来拆分

public class SplitStreamByFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();SingleOutputStreamOperator<Integer> ds = env.socketTextStream("node01", 9527).map(Integer::valueOf);//将ds 分为两个流 ,一个是奇数流,一个是偶数流//使用filter 过滤两次SingleOutputStreamOperator<Integer> ds1 = ds.filter(x -> x % 2 == 0);SingleOutputStreamOperator<Integer> ds2 = ds.filter(x -> x % 2 == 1);ds1.print("偶数");ds2.print("奇数");env.execute();}
}

以上实现的明显缺陷是,同一条数据,被多次处理。以上其实是将原始数据流stream复制两份,然后对每一份分别做筛选,冗余且低效。

2、分流:使用侧输出流

基本步骤为:

  • 使用process算子(Flink分层API中的最底层的处理函数)
  • 定义OutputTag对象,即输出标签对象,用于后面标记和提取侧流
  • 调用上下文ctx的.output()方法
  • 通过主流获取侧流
案例:实现将WaterSensor按照Id类型进行分流

先定义下MapFunction的转换规则,用来将输入的数据转为自定义的WaterSensor对象:

public class WaterSensorMapFunction implements MapFunction<StringWaterSensor>{@Overridepublic WaterSensor map(String value) throws Exception {String[] strArr = value.split( regex: ",");//String组装对象return new WaterSensor(strArr[0],Long.value0f(strArr[1]),Integer.value0f(strArr[2]));}
}

使用侧流:

public class SplitStreamByOutputTag {    public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();SingleOutputStreamOperator<WaterSensor> ds = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction());//定义两个输出标签对象,用于后面标记和提取侧流OutputTag<WaterSensor> s1 = new OutputTag<>("s1", Types.POJO(WaterSensor.class));OutputTag<WaterSensor> s2 = new OutputTag<>("s2", Types.POJO(WaterSensor.class));//返回的都是主流SingleOutputStreamOperator<WaterSensor> ds1 = ds.process(new ProcessFunction<WaterSensor, WaterSensor>(){@Override//形参为别为:流中的一条数据、上下文对象、收集器public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {if ("s1".equals(value.getId())) {ctx.output(s1, value);} else if ("s2".equals(value.getId())) {ctx.output(s2, value);} else {//主流out.collect(value);}}});ds1.print("主流");SideOutputDataStream<WaterSensor> s1DS = ds1.getSideOutput(s1);SideOutputDataStream<WaterSensor> s2DS = ds1.getSideOutput(s2);s1DS.printToErr("侧流s1");  //区别主流,让控制台输出标红s2DS.printToErr("侧流s2");env.execute();}
}

相关传参说明,首先是创建OutputTag对象时的传参:

  • 第一个参数为标签名,用于区分是哪一个侧流
  • 第二个是放入侧流中的数据的类型,且必须是Flink的类型(TypeInfomation,借助Types类)
  • OutputTag的泛型,是流到对应的侧流的数据类型

ProcessFunction接口的泛型中:

  • 第一个是输入的数据类型
  • 第二个是输出到主流上的数据类型

ctx.output方法的形参:

  • 第一个为outputTag对象
  • 第二个为数据,上面代码中传value即直接输出数据本身,也可输出处理后的数据,主流侧流数据类型不用一致

看下运行效果:

在这里插入图片描述

3、合流:union

将来源不同的多条流,合并成一条来联合处理,即合流。最简单的合流操作,就是直接将多条流合在一起,叫作流的联合(union)

在这里插入图片描述

union的条件是:

  • 每条流中要合并的数据类型必须相同(原始不同,可先借助map,在union)
  • 合并之后的新流会包括所有流中的元素,数据类型不变
stream1.union(stream2, stream3, ...)  //可变长参数
public class UnionExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> ds1 = env.fromElements(1, 2, 3);DataStreamSource<Integer> ds2 = env.fromElements(2, 2, 3);DataStreamSource<String> ds3 = env.fromElements("2", "2", "3");ds1.union(ds2,ds3.map(Integer::valueOf)).print();env.execute();}
}
//输出:
1
2
3
2
2
3
2
2
3

4、合流:connect

union合并流受限于数据类型,因此还有另一种合流操作:connect

在这里插入图片描述

public class ConnectDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//Integer流SingleOutputStreamOperator<Integer> source1 = env.socketTextStream("node01", 9527).map(i -> Integer.parseInt(i));//String流DataStreamSource<String> source2 = env.socketTextStream("node01", 2795);/*** 总结: 使用 connect 合流* 1、一次只能连接 2条流* 2、流的数据类型可以不一样* 3、 连接后可以调用 map、flatmap、process来处理,但是各处理各的*/ConnectedStreams<Integer, String> connect = source1.connect(source2);SingleOutputStreamOperator<String> result = connect.map(new CoMapFunction<Integer, String, String>() {@Overridepublic String map1(Integer value) throws Exception {return "来源于原source1流:" + value.toString();}@Overridepublic String map2(String value) throws Exception {return "来源于原source2流:" + value;}});result.print();env.execute();    }
}

使用 connect 合流的总结:

  • 一次只能连接 2条流,因为connect返回的是一个ConnectedStreams对象,不再是DataStreamSource或其子类了
  • 两条流中的数据类型可以不一样
  • 连接后可以调用 map、flatmap、process来处理,但是各处理各的

以map为例,其形参是一个CoMapFuntion接口类型,泛型则分别是流1的数据类型、流2的数据类型、合并及处理后输出的数据类型。两个map方法可以看出,虽然两个流合并成一个了,但处理数据时还是各玩各的。

  • .map1()就是对第一条流中数据的map操作
  • .map2()则是针对第二条流

在这里插入图片描述

connect 就类比被逼相亲后结婚,两个人看似成一家了,但实际上各自玩各自的。往大了举例就相当于一国两制。

5、connect案例

和connect以后的map传CoMapFunction一样,process算子也不再传ProcessFunction,而是CoProcessFunction,实现两个方法:

  • processElement1():针对第一条流
  • processElement2():针对第二条流

connect合并后得到的ConnectedStreams也可以直接调用.keyBy()进行按键分区,分区后返回的还是一个ConnectedStreams

connectedStreams.keyBy(keySelector1, keySelector2);
//keySelector1和keySelector2,是两条流中各自的键选择器

ConnectedStreams进行keyBy操作,其实就是把两条流中key相同的数据放到了一起,然后针对来源的流再做各自处理

案例需求:连接两条流,输出能根据id匹配上的数据,即两个流里元组f0相同的数据(类似inner join效果)
public class ConnectKeybyDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);//二元组流DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(Tuple2.of(1, "a1"),Tuple2.of(1, "a2"),Tuple2.of(2, "b"),Tuple2.of(3, "c"));//三元组流DataStreamSource<Tuple3<Integer, String, Integer>> source2 = env.fromElements(Tuple3.of(1, "aa1", 1),Tuple3.of(1, "aa2", 2),Tuple3.of(2, "bb", 1),Tuple3.of(3, "cc", 1));ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connect = source1.connect(source2);// 多并行度下,需要根据 关联条件 进行keyby,才能保证key相同的数据到一起去,才能匹配上ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connectKey = connect.keyBy(s1 -> s1.f0, s2 -> s2.f0);SingleOutputStreamOperator<String> result = connectKey.process(new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {// 定义 HashMap,缓存来过的数据,key=id,value=list<数据>Map<Integer, List<Tuple2<Integer, String>>> s1Cache = new HashMap<>();Map<Integer, List<Tuple3<Integer, String, Integer>>> s2Cache = new HashMap<>();@Overridepublic void processElement1(Tuple2<Integer, String> value, Context ctx, Collector<String> out) throws Exception {Integer id = value.f0;// TODO 1.来过的s1数据,都存起来if (!s1Cache.containsKey(id)) {// 1.1 第一条数据,初始化 value的list,放入 hashmapList<Tuple2<Integer, String>> s1Values = new ArrayList<>();s1Values.add(value);s1Cache.put(id, s1Values);} else {// 1.2 不是第一条,直接添加到 list中s1Cache.get(id).add(value);}//TODO 2.根据id,查找s2的数据,只输出 匹配上 的数据if (s2Cache.containsKey(id)) {for (Tuple3<Integer, String, Integer> s2Element : s2Cache.get(id)) {out.collect("s1:" + value + "<--------->s2:" + s2Element);}}}@Overridepublic void processElement2(Tuple3<Integer, String, Integer> value, Context ctx, Collector<String> out) throws Exception {Integer id = value.f0;// TODO 1.来过的s2数据,都存起来if (!s2Cache.containsKey(id)) {// 1.1 第一条数据,初始化 value的list,放入 hashmapList<Tuple3<Integer, String, Integer>> s2Values = new ArrayList<>();s2Values.add(value);s2Cache.put(id, s2Values);} else {// 1.2 不是第一条,直接添加到 list中s2Cache.get(id).add(value);}//TODO 2.根据id,查找s1的数据,只输出 匹配上 的数据if (s1Cache.containsKey(id)) {for (Tuple2<Integer, String> s1Element : s1Cache.get(id)) {out.collect("s1:" + s1Element + "<--------->s2:" + value);}}}});result.print();env.execute();}
}

运行效果:

在这里插入图片描述

相关文章:

【API篇】五、Flink分流合流API

文章目录 1、filter算子实现分流2、分流&#xff1a;使用侧输出流3、合流&#xff1a;union4、合流&#xff1a;connect5、connect案例 分流&#xff0c;很形象的一个词&#xff0c;就像一条大河&#xff0c;遇到岸边有分叉的&#xff0c;而形成了主流和测流。对于数据流也一样…...

flutter开发的一个小小小问题,内网依赖下不来

问题 由于众所周知的原因&#xff0c;flutter编译时&#xff0c;经常出现Could not get resource https://storage.googleapis.com/download.flutter.io…的问题&#xff0c;如下&#xff1a; * What went wrong: Could not determine the dependencies of task :app:lintVit…...

RabbitMQ队列及交换机的使用

目录 一、简单模型 1、首先控制台创建一个队列 2、父工程导入依赖 3、生产者配置文件 4、写测试类 5、消费者配置文件 6、消费者接收消息 二、WorkQueues模型 1、在控制台创建一个新的队列 2、生产者生产消息 3、创建两个消费者接收消息 4、能者多劳充分利用每一个消…...

分布式唯一Id,它比GUID好

分布式唯一Id&#xff0c;它比GUID好 一、前言 分布式唯一Id&#xff0c;顾名思义&#xff0c;是指在全世界任何一台计算机上都不会重复的唯一Id。 在单机/单服务器/单数据库的小型应用中&#xff0c;不需要用到这类东西。但在高并发、海量数据、大型分布式应用中&#xff0c…...

计算机服务器中了勒索病毒怎么解决,勒索病毒解密流程,数据恢复

计算机服务器中了勒索病毒是一件非常令人头疼的事情&#xff0c;勒索病毒不仅会加密企业服务器中的数据&#xff0c;还会对企业计算机系统带来损害&#xff0c;严重地影响了企业的正常运转。最近&#xff0c;云天数据恢复中心工程师总结了&#xff0c;今年以来网络上流行的勒索…...

【NPM】vuex 数据持久化库 vuex-persistedstate

在 GitHub 上找到&#xff1a;vuex-persistedstate。 安装 npm install --save vuex-persistedstate使用 import { createStore } from "vuex"; import createPersistedState from "vuex-persistedstate";const store createStore({// ...plugins: [cr…...

英语——分享篇——每日200词——2601-2800

2601——resistant——[rɪzɪstənt]——adj.抵抗的——resistant——resi热死(拼音)st石头(拼音)ant蚂蚁(熟词)——热死了石头上的蚂蚁还在抵抗——The body may be less resistant if it is cold. ——天冷时&#xff0c;身体的抵抗力会下降。 2602——prospect——[prɒspe…...

SpringCloud-Sentinel

一、介绍 &#xff08;1&#xff09;提供界面配置配置服务限流、服务降级、服务熔断 &#xff08;2&#xff09;SentinelResource的blockHandler只处理后台配置的异常&#xff0c;运行时异常fallBack处理&#xff0c;且资源名为value时才生效&#xff0c;走兜底方法 二、安装…...

为什么索引要用B+树来实现呢,而不是B树

首先&#xff0c;常规的数据库存储引擎&#xff0c;一般都是采用 B 树或者 B树来实现索引的存储。 B树 因为 B 树是一种多路平衡树&#xff0c;用这种存储结构来存储大量数据&#xff0c;它的整个高度会相比二叉树来说&#xff0c;会矮很多。 而对于数据库来说&#xff0c;所有…...

使用vue3前端开发的一些知识点

Vue 3 是一种流行的 JavaScript 框架&#xff0c;用于构建用户界面。它是 Vue.js 框架的第三个主要版本&#xff0c;具有许多新特性和性能改进。下面是 Vue 3 的一些常用语法和概念的详细介绍&#xff1a; 创建 Vue 实例&#xff1a; 在 Vue 3 中&#xff0c;你可以通过创建一个…...

零基础Linux_20(进程信号)内核态和用户态+处理信号+不可重入函数+volatile

目录 1. 内核态和用户态 1.1 内核态和用户态概念 1.2 内核态和用户态转化 2. 处理信号 2.2 捕捉信号 2.2 系统调用sigaction 3. 不可重入函数 4. volatile关键字 5. SIGCHLD信号&#xff08;了解&#xff09; 6. 笔试选择题 答案及解析 本篇完。 1. 内核态和用户态…...

vite+vue3+elementPlus+less+router+pinia+axios

1.创建项目2.按需引入elementplus3.引入less安装vue-router安装 axios安装 piniapinia的持久化配置(用于把数据放在localStorage中)---另外增加的配置 1.创建项目 npm init vitelatest2.按需引入elementplus npm install element-plus --save//按需引入 npm install -D unpl…...

VMwarePlayer安装Ubuntu,切换中文并安装中文输入法

1.下载和安装 虚拟机使用的免费版官网链接&#xff1a;VMwarePlayer Ubuntu镜像下载官网链接&#xff1a;Ubuntu桌面版 自己学习使用&#xff0c;不需要考虑迁移之类的。选择单个磁盘IO性能会更高 安装过程中如果出现如下报错&#xff0c;则用系统管理员身份运行 右击VMwa…...

C# JSON转为实体类和List,以及结合使用

引用 using Newtonsoft.Json;using Newtonsoft.Json.Linq;JSON转实体类 public class Person {public string Name { get; set; }public int Age { get; set; }public string Gender { get; set; } }string jsonStr "{\"name\": \"Tom\", \"a…...

使用TensorRT-LLM进行高性能推理

LLM的火爆之后&#xff0c;英伟达(NVIDIA)也发布了其相关的推理加速引擎TensorRT-LLM。TensorRT是nvidia家的一款高性能深度学习推理SDK。此SDK包含深度学习推理优化器和运行环境,可为深度学习推理应用提供低延迟和高吞吐量。而TensorRT-LLM是在TensorRT基础上针对大模型进一步…...

怎么去别人的github工程下载

1、网络 确保网络能够顺利访问github&#xff0c;有的地方的公共网络不能访问github&#xff0c;我之前开过科学上网的会员&#xff0c;发现没必要特意开去访问它。可以直接开手机热点&#xff0c;一般是可以顺利访问的。 2、下载 以我的github开源笔记qq-hh/C_review (gith…...

【java基础-实战3】list遍历时删除元素的方法

插&#xff1a; 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。 坚持不懈&#xff0c;越努力越幸运&#xff0c;大家一起学习鸭~~~ 在实际的业务开发中&#xff0c;容器的遍历可以说是非…...

云计算与云服务

云计算与云服务 1、云计算与云服务概述2、云服务模式(IaaS、PaaS、SaaS、DaaS)3、公有云、私有云和混合云1、云计算与云服务概述 什么是云计算? “云”实质上就是一个网络,狭义上讲,云计算就是一种提供资源的网络,使用者可以随时获取“云”上的资源,按需求量使用,并且…...

Ubuntu20.4 设置代理

主要是涉及2个代理 涉及apt 可以在、/etc/apt/apt.conf 中进行修改 在系统全局可以在/etc/profile中进行修改...

RustDay06------Exercise[71-80]

71.box的使用 说实话这题没太看懂.敲了个模板跟着提示就过了 // box1.rs // // At compile time, Rust needs to know how much space a type takes up. This // becomes problematic for recursive types, where a value can have as part of // itself another value of th…...

变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析

一、变量声明设计&#xff1a;let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性&#xff0c;这种设计体现了语言的核心哲学。以下是深度解析&#xff1a; 1.1 设计理念剖析 安全优先原则&#xff1a;默认不可变强制开发者明确声明意图 let x 5; …...

应用升级/灾备测试时使用guarantee 闪回点迅速回退

1.场景 应用要升级,当升级失败时,数据库回退到升级前. 要测试系统,测试完成后,数据库要回退到测试前。 相对于RMAN恢复需要很长时间&#xff0c; 数据库闪回只需要几分钟。 2.技术实现 数据库设置 2个db_recovery参数 创建guarantee闪回点&#xff0c;不需要开启数据库闪回。…...

AI Agent与Agentic AI:原理、应用、挑战与未来展望

文章目录 一、引言二、AI Agent与Agentic AI的兴起2.1 技术契机与生态成熟2.2 Agent的定义与特征2.3 Agent的发展历程 三、AI Agent的核心技术栈解密3.1 感知模块代码示例&#xff1a;使用Python和OpenCV进行图像识别 3.2 认知与决策模块代码示例&#xff1a;使用OpenAI GPT-3进…...

centos 7 部署awstats 网站访问检测

一、基础环境准备&#xff08;两种安装方式都要做&#xff09; bash # 安装必要依赖 yum install -y httpd perl mod_perl perl-Time-HiRes perl-DateTime systemctl enable httpd # 设置 Apache 开机自启 systemctl start httpd # 启动 Apache二、安装 AWStats&#xff0…...

蓝牙 BLE 扫描面试题大全(2):进阶面试题与实战演练

前文覆盖了 BLE 扫描的基础概念与经典问题蓝牙 BLE 扫描面试题大全(1)&#xff1a;从基础到实战的深度解析-CSDN博客&#xff0c;但实际面试中&#xff0c;企业更关注候选人对复杂场景的应对能力&#xff08;如多设备并发扫描、低功耗与高发现率的平衡&#xff09;和前沿技术的…...

在Ubuntu中设置开机自动运行(sudo)指令的指南

在Ubuntu系统中&#xff0c;有时需要在系统启动时自动执行某些命令&#xff0c;特别是需要 sudo权限的指令。为了实现这一功能&#xff0c;可以使用多种方法&#xff0c;包括编写Systemd服务、配置 rc.local文件或使用 cron任务计划。本文将详细介绍这些方法&#xff0c;并提供…...

Neo4j 集群管理:原理、技术与最佳实践深度解析

Neo4j 的集群技术是其企业级高可用性、可扩展性和容错能力的核心。通过深入分析官方文档,本文将系统阐述其集群管理的核心原理、关键技术、实用技巧和行业最佳实践。 Neo4j 的 Causal Clustering 架构提供了一个强大而灵活的基石,用于构建高可用、可扩展且一致的图数据库服务…...

分布式增量爬虫实现方案

之前我们在讨论的是分布式爬虫如何实现增量爬取。增量爬虫的目标是只爬取新产生或发生变化的页面&#xff0c;避免重复抓取&#xff0c;以节省资源和时间。 在分布式环境下&#xff0c;增量爬虫的实现需要考虑多个爬虫节点之间的协调和去重。 另一种思路&#xff1a;将增量判…...

Maven 概述、安装、配置、仓库、私服详解

目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...

DeepSeek 技术赋能无人农场协同作业:用 AI 重构农田管理 “神经网”

目录 一、引言二、DeepSeek 技术大揭秘2.1 核心架构解析2.2 关键技术剖析 三、智能农业无人农场协同作业现状3.1 发展现状概述3.2 协同作业模式介绍 四、DeepSeek 的 “农场奇妙游”4.1 数据处理与分析4.2 作物生长监测与预测4.3 病虫害防治4.4 农机协同作业调度 五、实际案例大…...