Flink的多流转换(分流-侧输出流、合流-union、connect、join)
在实际应用中,我们可能要将多个不同来源的数据连接合并在一起进行处理,也有可能要将一条流拆分成多条流进行处理,这就涉及到了Flink的多流转换问题。简单来说,就是分流和合流两大操作,分流主要通过侧输出流实现,合流的算子就比较丰富了,有union、connect、join等。
一、分流
所谓分流,就是通过定义一些筛选条件,将一个dataStream拆分成多个子dataStream的过程,每条子数据流之间完全独立。Flink中的分流主要通过侧输出流来实现。
通过调用底层的处理函数,可以获取到上下文信息,调用上下文的.output方法就可以实施分流操作了。.output方法需要传入一个“输出标签"(OutputTag),用来标记侧输出流(相当于给侧输出流盖了个戳,指明他的名称和类型),之后也可以通过.getSideOutput()方法传入OutputTag获取到相应的侧输出流。
二、合流
对多个来源的多条流进行联合处理时,需要用到合流操作,具体有如下几种合流算子:
1. union
union操作要求不同流中的数据类型必须一致, 类似sql语言中的union,是纵向的合并。对datastream调用.union方法即可实现多流合并,合并后的流类型仍然是datastream。这里要注意,多条流合并后的水位线应以最小的那个为准(类似多个并行子任务向下游传递)。
stream1.union(stream2, stream3, ...)
2. connect
union操作简单,但要求流的数据类型一致,实际应用中实用性不高。针对两条数据类型不一样的流,Flink还提供了connect合流操作,connect操作只能连接两条流。
(1) 两个dataStream进行connect -> 连接流(ConnectedStreams)
对于两条数据类型不一致的dataStream进行连接,调用.connect()方法,所得到的是一个连接流ConnectedStreams,然后再调用同处理方法分别对两条流进行处理,得到一个统一类型的dataStream。这里的同处理方法可以是map、flatmap也可以是底层的处理函数process,只是在传入参数时跟以往的单流不同,如map方法传入的不再是MapFunction而是CoMapFunction,可以实现对两条流分别做map操作。
对ConnectedStreams也可以先调用keyBy进行按键分区操作后,再调用同处理方法。这里调用KeyBy后得到的仍然是ConnectedStreams,keyBy要传入两个参数keySelector1和keySelector2类似于sql中两表之间的 join操作的关联字段。
connectedStreams.keyBy(keySelector1, keySelector2);
(2) dataStream与广播流(broadcastStream)进行connect -> 广播连接流
当需要动态定义某些规则或配置时,如维度表配置信息是动态变化的,存储在MySQL数据库中,我们用maxwell实时对它进行了监控,当发生变化时,这个配置信息是要完整的告知原始数据流的(从业务数据库中抽取的原始数据),即若原始数据流分为了多个并行子任务,则每个并行子任务上都应该知道配置信息的变化,因此需要对配置信息进行广播连接。
对dataStream调用.broadcast()方法就可以得到广播流,将要处理的数据流与这条广播流进行connect,得到的就是广播连接流,可以调用.process方法进行动态处理,同样要实现的是一个类似CoProcessFunction的抽象类,对两条流分别进行处理。
3. join
connect方法已经能够实现各种需求了,但是其支持的处理函数太过于底层,在很多场景下太过于抽象了,flink还为datastream提供了内置的join算子和coGroup算子来简化一些特定场景下的合流操作。
(1) 窗口联结(window join)
当我们不仅需要对两条流进行连接,还需要对连接后的流进行窗口操作,Flink为这种场景专门提供了一个窗口联结算子。如下操作可将两条流基于联结字段进行配对,并将key相同的放入一个窗口进行窗口计算。
stream1.join(stream2).where(<KeySelector>) // stream1的联结字段.equalTo(<KeySelector>) // stream2的联结字段.window(<WindowAssigner>).apply(<JoinFunction>)
注意 这里调用窗口函数只能通过.apply()方法。
窗口join的具体流程如下:两条流根据key进行分组,分别进入对应的窗口存储;到达窗口时间时,会先统计窗口内两条流的笛卡尔积,然后进行遍历,遍历到一对匹配的数据就调用一次窗口函数并输出结果。
(2) 间隔联结(interval join)
间隔联结为数据流中的每一条数据单独开辟属于自己的时间窗口。试想这样一个场景,对于一条流A中的一条数据a,它只想和自己时间戳的前后一段时间间隔的B数据流进行连接,这样窗口联结就无法做到,需要间隔联结。
间隔联结的两条流必须基于相同的key,且需要给定间隔上界和间隔下界,则数据a的窗口大小就是[a.timestamp+lowbound, a.timestamp+upperbound],其中lowbound<upperbound,两者都可正可负。
stream1.keyBy(<KeySelector>).intervalJoin(stream2.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process(new ProcessJoinFunction(){})
4. coGroup
coGroup 与窗口联结类似,也是将两条流合并后开窗处理匹配元素,调用时只需将.join()方法换成.coGroup()方法即可。
stream1.coGroup(stream2).where(<KeySelector>) // stream1的联结字段.equalTo(<KeySelector>) // stream2的联结字段.window(<WindowAssigner>).apply(<CoGroupFunction>)
在window join中,数据在窗口中是先做笛卡尔积,再遍历是否匹配, 只有匹配的数据才会去调用apply方法,因此,window join实现的是类似sql中的inner join功能。而在coGroup函数中,数据不会做笛卡尔积,而是将所有搜集到的数据都传入到apply方法中,用户可以自定义匹配逻辑,因此可以实现任意外连接或是其他用户想要的连接方式。
相关文章:
Flink的多流转换(分流-侧输出流、合流-union、connect、join)
在实际应用中,我们可能要将多个不同来源的数据连接合并在一起进行处理,也有可能要将一条流拆分成多条流进行处理,这就涉及到了Flink的多流转换问题。简单来说,就是分流和合流两大操作,分流主要通过侧输出流实现&#x…...
DirectUI属性表
<?xml version"1.0" encoding"UTF-8"?> <Controls><Window parent""><Attribute name"size" default"0,0" type"SIZE" comment"窗口的初始化大小,如(800,600)"/><Attribu…...

RBAC权限控制
1、Spring Security 是一个功能强大的Java安全框架,它提供了全面的安全认证和授权的支持。 2 SpringSecurity配置类(源码逐行解析) Spring Security的配置类是实现安全控制的核心部分 开启Spring Security各种功能,以确保Web应…...
STM32高级物联网通信之以太网通讯
目录 以太网通讯基础知识 什么是以太网 互联网和以太网的区别 1)概念与范围 (1)互联网 (2)以太网 2)技术特点 (1)互联网 (2)以太网 3)应用场景 (1)互联网 (2)以太网 以太网的层次 1)物理层 2)数据链路层 OSI 7层模型 TCPIP 4层模型 一些常见…...

【小程序】全局配置window和tabBar
目录 全局配置 1. 全局配置文件及常用的配置项 全局配置 - window 1. 小程序窗口的组成部分 2. 了解 window 节点常用的配置项 编辑 3. 设置导航栏的标题 4. 设置导航栏的背景色 5. 设置导航栏的标题颜色 6. 全局开启下拉刷新功能 7. 设置下拉刷新时窗口的背景色 …...

详解VHDL如何编写Testbench
1.概述 仿真测试平台文件(Testbench)是可以用来验证所设计的硬件模型正确性的 VHDL模型,它为所测试的元件提供了激励信号,可以以波形的方式显示仿真结果或把测试结果存储到文件中。这里所说的激励信号可以直接集成在测试平台文件中,也可以从…...

冥想的实践
这是我某一天的正念和冥想实践,我对正念练习、冥想练习进行了分别的统计。 正念练习:1分钟**5次 冥想:15分钟10分钟 正念练习,基本在工作休息时间练习。当然,工作过程中,也有一部分时间会有正念的状态&am…...

STM32F103RCT6学习之四:定时器
1.基础 定时器可以对输入的时钟进行计数,并在计数值达到设定值时触发中断 16位计数器、预分频器、自动重装寄存器的时基单元,在72MHz计数时钟下可以实现最大59.65s的定时 不仅具备基本的定时中断功能,而且还包含内外时钟源选择、输入捕获、…...

如何在网页端使用 IDE 高效地阅读 GitHub 源码?
如何在网页端使用 IDE 高效地阅读 GitHub 源码? 前言什么是 GitHub1s?使用 GitHub1s 阅读 browser-use 项目源码步骤 1: 打开 GitHub 项目页面步骤 2: 修改 URL 使用 GitHub1s步骤 3: 浏览文件结构步骤 4: 使用代码高亮和智能补全功能步骤 5: 快速跳转和…...

易基因: BS+ChIP-seq揭示DNA甲基化调控非编码RNA(VIM-AS1)抑制肿瘤侵袭性|Exp Mol Med
大家好,这里是专注表观组学十余年,领跑多组学科研服务的易基因。 肝细胞癌(hepatocellular carcinoma,HCC)早期复发仍然是一个具有挑战性的领域,其中涉及的机制尚未完全被理解。尽管微血管侵犯(…...

欢迪迈手机商城设计与实现基于(代码+数据库+LW)
摘 要 现代经济快节奏发展以及不断完善升级的信息化技术,让传统数据信息的管理升级为软件存储,归纳,集中处理数据信息的管理方式。本欢迪迈手机商城就是在这样的大环境下诞生,其可以帮助管理者在短时间内处理完毕庞大的数据信息…...
数据库基础与应用:从概念到实践
数据库是信息技术中的核心组件之一,是现代计算机系统中不可或缺的部分。无论是日常应用的社交网络、电子商务网站,还是企业级的大型系统,几乎所有的信息管理都离不开数据库。那么,数据库究竟是什么?它是如何工作的&…...

jenkins集成工具(一)部署php项目
目录 什么是CI 、CD Jenkins集成工具 一、Jenkins介绍 二、jenkins的安装和部署 环境部署 安装jenkins 安装gitlab 配置镜像源进行安装 修改密码 安装git工具 上传测试代码 Jenkins部署php项目wordpress 发布php代码 安装插件 测试代码发布 实现发布成功发送邮件…...

17_HTML5 Web 存储 --[HTML5 API 学习之旅]
HTML5 Web 存储(Web Storage)是 HTML5 引入的一种在用户浏览器中存储数据的机制。它提供了比传统的 cookies 更加方便和强大的功能,包括更大的存储空间、更好的性能以及更简单的 API。Web 存储主要分为两种类型:localStorage 和 s…...

GCP Cloud Architect exam - PASS
备考指南 推荐视频课程 https://www.udemy.com/course/google-cloud-architect-certifications/?couponCodeKEEPLEARNING 推荐题库 https://www.udemy.com/course/gcp-professional-cloud-architect-exam-practice-tests-2024/?couponCodeKEEPLEARNING 错题集 http…...

【Sentinel】初识Sentinel
目录 1.1.雪崩问题及解决方案 1.1.1.雪崩问题 1.1.2.超时处理 1.1.3.仓壁模式 1.1.4.断路器 1.1.5.限流 1.1.6.总结 1.2.服务保护技术对比 1.3.Sentinel介绍和安装 1.3.1.初识Sentinel 1.3.2.安装Sentinel 1.4.微服务整合Sentinel 1.1.雪崩问题及解决方案 1.1.1.…...

java常见类库
StringBuffer类 String和StringBuffer的区别 String 不可变性:String 类是不可变的,这意味着一旦创建了一个 String 对象,其值就不能改变。每次对 String 进行修改(如连接、替换等操作)都会产生新的 String 对象&…...

Wordly Wise 3000 国际背单词01 介绍 + 测词汇量
📚 Wordly Wise 3000 国际背单词01 介绍 测词汇量 🌟 大家好!我们正式启动背Wordly Wise 3000单词,旨在利用国际资源和科学的学练方法,帮助大家更得效地坚持学练单词。我们将通过图文和Video等多种形式与大家分享经验…...
Unity Dots理论学习-2.ECS有关的模块(1)
Unity的实体组件系统(ECS)是支撑DOTS模块和技术的面向数据架构。ECS为Unity中的内存数据和runtime进程调度提供了高度的控制和确定性。 ECS for Unity 2022 LTS 配备了两个兼容的物理引擎,一个高级的Netcode package,以及一个用来…...

2021.12.28基于UDP同信的相关流程
作业 1、将TCP的CS模型再敲一遍 服务器 #include <myhead.h> #define PORT 8888 #define IP "192.168.124.123" int main(int argc, const char *argv[]) {//创建套接字//绑定本机IP和端口号//监听客户端请求//接收客户端连接请求//收发消息//创建套接字int…...
后进先出(LIFO)详解
LIFO 是 Last In, First Out 的缩写,中文译为后进先出。这是一种数据结构的工作原则,类似于一摞盘子或一叠书本: 最后放进去的元素最先出来 -想象往筒状容器里放盘子: (1)你放进的最后一个盘子(…...
【杂谈】-递归进化:人工智能的自我改进与监管挑战
递归进化:人工智能的自我改进与监管挑战 文章目录 递归进化:人工智能的自我改进与监管挑战1、自我改进型人工智能的崛起2、人工智能如何挑战人类监管?3、确保人工智能受控的策略4、人类在人工智能发展中的角色5、平衡自主性与控制力6、总结与…...

label-studio的使用教程(导入本地路径)
文章目录 1. 准备环境2. 脚本启动2.1 Windows2.2 Linux 3. 安装label-studio机器学习后端3.1 pip安装(推荐)3.2 GitHub仓库安装 4. 后端配置4.1 yolo环境4.2 引入后端模型4.3 修改脚本4.4 启动后端 5. 标注工程5.1 创建工程5.2 配置图片路径5.3 配置工程类型标签5.4 配置模型5.…...
QMC5883L的驱动
简介 本篇文章的代码已经上传到了github上面,开源代码 作为一个电子罗盘模块,我们可以通过I2C从中获取偏航角yaw,相对于六轴陀螺仪的yaw,qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...
IGP(Interior Gateway Protocol,内部网关协议)
IGP(Interior Gateway Protocol,内部网关协议) 是一种用于在一个自治系统(AS)内部传递路由信息的路由协议,主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...
Caliper 配置文件解析:config.yaml
Caliper 是一个区块链性能基准测试工具,用于评估不同区块链平台的性能。下面我将详细解释你提供的 fisco-bcos.json 文件结构,并说明它与 config.yaml 文件的关系。 fisco-bcos.json 文件解析 这个文件是针对 FISCO-BCOS 区块链网络的 Caliper 配置文件,主要包含以下几个部…...

排序算法总结(C++)
目录 一、稳定性二、排序算法选择、冒泡、插入排序归并排序随机快速排序堆排序基数排序计数排序 三、总结 一、稳定性 排序算法的稳定性是指:同样大小的样本 **(同样大小的数据)**在排序之后不会改变原始的相对次序。 稳定性对基础类型对象…...

Linux 中如何提取压缩文件 ?
Linux 是一种流行的开源操作系统,它提供了许多工具来管理、压缩和解压缩文件。压缩文件有助于节省存储空间,使数据传输更快。本指南将向您展示如何在 Linux 中提取不同类型的压缩文件。 1. Unpacking ZIP Files ZIP 文件是非常常见的,要在 …...

【JVM面试篇】高频八股汇总——类加载和类加载器
目录 1. 讲一下类加载过程? 2. Java创建对象的过程? 3. 对象的生命周期? 4. 类加载器有哪些? 5. 双亲委派模型的作用(好处)? 6. 讲一下类的加载和双亲委派原则? 7. 双亲委派模…...

华为OD机考-机房布局
import java.util.*;public class DemoTest5 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseSystem.out.println(solve(in.nextLine()));}}priv…...