Flink的多流转换(分流-侧输出流、合流-union、connect、join)
在实际应用中,我们可能要将多个不同来源的数据连接合并在一起进行处理,也有可能要将一条流拆分成多条流进行处理,这就涉及到了Flink的多流转换问题。简单来说,就是分流和合流两大操作,分流主要通过侧输出流实现,合流的算子就比较丰富了,有union、connect、join等。
一、分流
所谓分流,就是通过定义一些筛选条件,将一个dataStream拆分成多个子dataStream的过程,每条子数据流之间完全独立。Flink中的分流主要通过侧输出流来实现。
通过调用底层的处理函数,可以获取到上下文信息,调用上下文的.output方法就可以实施分流操作了。.output方法需要传入一个“输出标签"(OutputTag),用来标记侧输出流(相当于给侧输出流盖了个戳,指明他的名称和类型),之后也可以通过.getSideOutput()方法传入OutputTag获取到相应的侧输出流。
二、合流
对多个来源的多条流进行联合处理时,需要用到合流操作,具体有如下几种合流算子:
1. union
union操作要求不同流中的数据类型必须一致, 类似sql语言中的union,是纵向的合并。对datastream调用.union方法即可实现多流合并,合并后的流类型仍然是datastream。这里要注意,多条流合并后的水位线应以最小的那个为准(类似多个并行子任务向下游传递)。
stream1.union(stream2, stream3, ...)
2. connect
union操作简单,但要求流的数据类型一致,实际应用中实用性不高。针对两条数据类型不一样的流,Flink还提供了connect合流操作,connect操作只能连接两条流。
(1) 两个dataStream进行connect -> 连接流(ConnectedStreams)
对于两条数据类型不一致的dataStream进行连接,调用.connect()方法,所得到的是一个连接流ConnectedStreams,然后再调用同处理方法分别对两条流进行处理,得到一个统一类型的dataStream。这里的同处理方法可以是map、flatmap也可以是底层的处理函数process,只是在传入参数时跟以往的单流不同,如map方法传入的不再是MapFunction而是CoMapFunction,可以实现对两条流分别做map操作。
对ConnectedStreams也可以先调用keyBy进行按键分区操作后,再调用同处理方法。这里调用KeyBy后得到的仍然是ConnectedStreams,keyBy要传入两个参数keySelector1和keySelector2类似于sql中两表之间的 join操作的关联字段。
connectedStreams.keyBy(keySelector1, keySelector2);
(2) dataStream与广播流(broadcastStream)进行connect -> 广播连接流
当需要动态定义某些规则或配置时,如维度表配置信息是动态变化的,存储在MySQL数据库中,我们用maxwell实时对它进行了监控,当发生变化时,这个配置信息是要完整的告知原始数据流的(从业务数据库中抽取的原始数据),即若原始数据流分为了多个并行子任务,则每个并行子任务上都应该知道配置信息的变化,因此需要对配置信息进行广播连接。
对dataStream调用.broadcast()方法就可以得到广播流,将要处理的数据流与这条广播流进行connect,得到的就是广播连接流,可以调用.process方法进行动态处理,同样要实现的是一个类似CoProcessFunction的抽象类,对两条流分别进行处理。
3. join
connect方法已经能够实现各种需求了,但是其支持的处理函数太过于底层,在很多场景下太过于抽象了,flink还为datastream提供了内置的join算子和coGroup算子来简化一些特定场景下的合流操作。
(1) 窗口联结(window join)
当我们不仅需要对两条流进行连接,还需要对连接后的流进行窗口操作,Flink为这种场景专门提供了一个窗口联结算子。如下操作可将两条流基于联结字段进行配对,并将key相同的放入一个窗口进行窗口计算。
stream1.join(stream2).where(<KeySelector>) // stream1的联结字段.equalTo(<KeySelector>) // stream2的联结字段.window(<WindowAssigner>).apply(<JoinFunction>)
注意 这里调用窗口函数只能通过.apply()方法。
窗口join的具体流程如下:两条流根据key进行分组,分别进入对应的窗口存储;到达窗口时间时,会先统计窗口内两条流的笛卡尔积,然后进行遍历,遍历到一对匹配的数据就调用一次窗口函数并输出结果。
(2) 间隔联结(interval join)
间隔联结为数据流中的每一条数据单独开辟属于自己的时间窗口。试想这样一个场景,对于一条流A中的一条数据a,它只想和自己时间戳的前后一段时间间隔的B数据流进行连接,这样窗口联结就无法做到,需要间隔联结。
间隔联结的两条流必须基于相同的key,且需要给定间隔上界和间隔下界,则数据a的窗口大小就是[a.timestamp+lowbound, a.timestamp+upperbound],其中lowbound<upperbound,两者都可正可负。
stream1.keyBy(<KeySelector>).intervalJoin(stream2.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process(new ProcessJoinFunction(){})
4. coGroup
coGroup 与窗口联结类似,也是将两条流合并后开窗处理匹配元素,调用时只需将.join()方法换成.coGroup()方法即可。
stream1.coGroup(stream2).where(<KeySelector>) // stream1的联结字段.equalTo(<KeySelector>) // stream2的联结字段.window(<WindowAssigner>).apply(<CoGroupFunction>)
在window join中,数据在窗口中是先做笛卡尔积,再遍历是否匹配, 只有匹配的数据才会去调用apply方法,因此,window join实现的是类似sql中的inner join功能。而在coGroup函数中,数据不会做笛卡尔积,而是将所有搜集到的数据都传入到apply方法中,用户可以自定义匹配逻辑,因此可以实现任意外连接或是其他用户想要的连接方式。
相关文章:
Flink的多流转换(分流-侧输出流、合流-union、connect、join)
在实际应用中,我们可能要将多个不同来源的数据连接合并在一起进行处理,也有可能要将一条流拆分成多条流进行处理,这就涉及到了Flink的多流转换问题。简单来说,就是分流和合流两大操作,分流主要通过侧输出流实现&#x…...
DirectUI属性表
<?xml version"1.0" encoding"UTF-8"?> <Controls><Window parent""><Attribute name"size" default"0,0" type"SIZE" comment"窗口的初始化大小,如(800,600)"/><Attribu…...
RBAC权限控制
1、Spring Security 是一个功能强大的Java安全框架,它提供了全面的安全认证和授权的支持。 2 SpringSecurity配置类(源码逐行解析) Spring Security的配置类是实现安全控制的核心部分 开启Spring Security各种功能,以确保Web应…...
STM32高级物联网通信之以太网通讯
目录 以太网通讯基础知识 什么是以太网 互联网和以太网的区别 1)概念与范围 (1)互联网 (2)以太网 2)技术特点 (1)互联网 (2)以太网 3)应用场景 (1)互联网 (2)以太网 以太网的层次 1)物理层 2)数据链路层 OSI 7层模型 TCPIP 4层模型 一些常见…...
【小程序】全局配置window和tabBar
目录 全局配置 1. 全局配置文件及常用的配置项 全局配置 - window 1. 小程序窗口的组成部分 2. 了解 window 节点常用的配置项 编辑 3. 设置导航栏的标题 4. 设置导航栏的背景色 5. 设置导航栏的标题颜色 6. 全局开启下拉刷新功能 7. 设置下拉刷新时窗口的背景色 …...
详解VHDL如何编写Testbench
1.概述 仿真测试平台文件(Testbench)是可以用来验证所设计的硬件模型正确性的 VHDL模型,它为所测试的元件提供了激励信号,可以以波形的方式显示仿真结果或把测试结果存储到文件中。这里所说的激励信号可以直接集成在测试平台文件中,也可以从…...
冥想的实践
这是我某一天的正念和冥想实践,我对正念练习、冥想练习进行了分别的统计。 正念练习:1分钟**5次 冥想:15分钟10分钟 正念练习,基本在工作休息时间练习。当然,工作过程中,也有一部分时间会有正念的状态&am…...
STM32F103RCT6学习之四:定时器
1.基础 定时器可以对输入的时钟进行计数,并在计数值达到设定值时触发中断 16位计数器、预分频器、自动重装寄存器的时基单元,在72MHz计数时钟下可以实现最大59.65s的定时 不仅具备基本的定时中断功能,而且还包含内外时钟源选择、输入捕获、…...
如何在网页端使用 IDE 高效地阅读 GitHub 源码?
如何在网页端使用 IDE 高效地阅读 GitHub 源码? 前言什么是 GitHub1s?使用 GitHub1s 阅读 browser-use 项目源码步骤 1: 打开 GitHub 项目页面步骤 2: 修改 URL 使用 GitHub1s步骤 3: 浏览文件结构步骤 4: 使用代码高亮和智能补全功能步骤 5: 快速跳转和…...
易基因: BS+ChIP-seq揭示DNA甲基化调控非编码RNA(VIM-AS1)抑制肿瘤侵袭性|Exp Mol Med
大家好,这里是专注表观组学十余年,领跑多组学科研服务的易基因。 肝细胞癌(hepatocellular carcinoma,HCC)早期复发仍然是一个具有挑战性的领域,其中涉及的机制尚未完全被理解。尽管微血管侵犯(…...
欢迪迈手机商城设计与实现基于(代码+数据库+LW)
摘 要 现代经济快节奏发展以及不断完善升级的信息化技术,让传统数据信息的管理升级为软件存储,归纳,集中处理数据信息的管理方式。本欢迪迈手机商城就是在这样的大环境下诞生,其可以帮助管理者在短时间内处理完毕庞大的数据信息…...
数据库基础与应用:从概念到实践
数据库是信息技术中的核心组件之一,是现代计算机系统中不可或缺的部分。无论是日常应用的社交网络、电子商务网站,还是企业级的大型系统,几乎所有的信息管理都离不开数据库。那么,数据库究竟是什么?它是如何工作的&…...
jenkins集成工具(一)部署php项目
目录 什么是CI 、CD Jenkins集成工具 一、Jenkins介绍 二、jenkins的安装和部署 环境部署 安装jenkins 安装gitlab 配置镜像源进行安装 修改密码 安装git工具 上传测试代码 Jenkins部署php项目wordpress 发布php代码 安装插件 测试代码发布 实现发布成功发送邮件…...
17_HTML5 Web 存储 --[HTML5 API 学习之旅]
HTML5 Web 存储(Web Storage)是 HTML5 引入的一种在用户浏览器中存储数据的机制。它提供了比传统的 cookies 更加方便和强大的功能,包括更大的存储空间、更好的性能以及更简单的 API。Web 存储主要分为两种类型:localStorage 和 s…...
GCP Cloud Architect exam - PASS
备考指南 推荐视频课程 https://www.udemy.com/course/google-cloud-architect-certifications/?couponCodeKEEPLEARNING 推荐题库 https://www.udemy.com/course/gcp-professional-cloud-architect-exam-practice-tests-2024/?couponCodeKEEPLEARNING 错题集 http…...
【Sentinel】初识Sentinel
目录 1.1.雪崩问题及解决方案 1.1.1.雪崩问题 1.1.2.超时处理 1.1.3.仓壁模式 1.1.4.断路器 1.1.5.限流 1.1.6.总结 1.2.服务保护技术对比 1.3.Sentinel介绍和安装 1.3.1.初识Sentinel 1.3.2.安装Sentinel 1.4.微服务整合Sentinel 1.1.雪崩问题及解决方案 1.1.1.…...
java常见类库
StringBuffer类 String和StringBuffer的区别 String 不可变性:String 类是不可变的,这意味着一旦创建了一个 String 对象,其值就不能改变。每次对 String 进行修改(如连接、替换等操作)都会产生新的 String 对象&…...
Wordly Wise 3000 国际背单词01 介绍 + 测词汇量
📚 Wordly Wise 3000 国际背单词01 介绍 测词汇量 🌟 大家好!我们正式启动背Wordly Wise 3000单词,旨在利用国际资源和科学的学练方法,帮助大家更得效地坚持学练单词。我们将通过图文和Video等多种形式与大家分享经验…...
Unity Dots理论学习-2.ECS有关的模块(1)
Unity的实体组件系统(ECS)是支撑DOTS模块和技术的面向数据架构。ECS为Unity中的内存数据和runtime进程调度提供了高度的控制和确定性。 ECS for Unity 2022 LTS 配备了两个兼容的物理引擎,一个高级的Netcode package,以及一个用来…...
2021.12.28基于UDP同信的相关流程
作业 1、将TCP的CS模型再敲一遍 服务器 #include <myhead.h> #define PORT 8888 #define IP "192.168.124.123" int main(int argc, const char *argv[]) {//创建套接字//绑定本机IP和端口号//监听客户端请求//接收客户端连接请求//收发消息//创建套接字int…...
手游刚开服就被攻击怎么办?如何防御DDoS?
开服初期是手游最脆弱的阶段,极易成为DDoS攻击的目标。一旦遭遇攻击,可能导致服务器瘫痪、玩家流失,甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案,帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...
【JVM】- 内存结构
引言 JVM:Java Virtual Machine 定义:Java虚拟机,Java二进制字节码的运行环境好处: 一次编写,到处运行自动内存管理,垃圾回收的功能数组下标越界检查(会抛异常,不会覆盖到其他代码…...
Opencv中的addweighted函数
一.addweighted函数作用 addweighted()是OpenCV库中用于图像处理的函数,主要功能是将两个输入图像(尺寸和类型相同)按照指定的权重进行加权叠加(图像融合),并添加一个标量值&#x…...
Java 加密常用的各种算法及其选择
在数字化时代,数据安全至关重要,Java 作为广泛应用的编程语言,提供了丰富的加密算法来保障数据的保密性、完整性和真实性。了解这些常用加密算法及其适用场景,有助于开发者在不同的业务需求中做出正确的选择。 一、对称加密算法…...
select、poll、epoll 与 Reactor 模式
在高并发网络编程领域,高效处理大量连接和 I/O 事件是系统性能的关键。select、poll、epoll 作为 I/O 多路复用技术的代表,以及基于它们实现的 Reactor 模式,为开发者提供了强大的工具。本文将深入探讨这些技术的底层原理、优缺点。 一、I…...
IP如何挑?2025年海外专线IP如何购买?
你花了时间和预算买了IP,结果IP质量不佳,项目效率低下不说,还可能带来莫名的网络问题,是不是太闹心了?尤其是在面对海外专线IP时,到底怎么才能买到适合自己的呢?所以,挑IP绝对是个技…...
排序算法总结(C++)
目录 一、稳定性二、排序算法选择、冒泡、插入排序归并排序随机快速排序堆排序基数排序计数排序 三、总结 一、稳定性 排序算法的稳定性是指:同样大小的样本 **(同样大小的数据)**在排序之后不会改变原始的相对次序。 稳定性对基础类型对象…...
深度学习之模型压缩三驾马车:模型剪枝、模型量化、知识蒸馏
一、引言 在深度学习中,我们训练出的神经网络往往非常庞大(比如像 ResNet、YOLOv8、Vision Transformer),虽然精度很高,但“太重”了,运行起来很慢,占用内存大,不适合部署到手机、摄…...
SpringAI实战:ChatModel智能对话全解
一、引言:Spring AI 与 Chat Model 的核心价值 🚀 在 Java 生态中集成大模型能力,Spring AI 提供了高效的解决方案 🤖。其中 Chat Model 作为核心交互组件,通过标准化接口简化了与大语言模型(LLM࿰…...
es6+和css3新增的特性有哪些
一:ECMAScript 新特性(ES6) ES6 (2015) - 革命性更新 1,记住的方法,从一个方法里面用到了哪些技术 1,let /const块级作用域声明2,**默认参数**:函数参数可以设置默认值。3&#x…...
