当前位置: 首页 > news >正文

Flink的多流转换(分流-侧输出流、合流-union、connect、join)

        在实际应用中,我们可能要将多个不同来源的数据连接合并在一起进行处理,也有可能要将一条流拆分成多条流进行处理,这就涉及到了Flink的多流转换问题。简单来说,就是分流和合流两大操作,分流主要通过侧输出流实现,合流的算子就比较丰富了,有union、connect、join等。

一、分流

        所谓分流,就是通过定义一些筛选条件,将一个dataStream拆分成多个子dataStream的过程,每条子数据流之间完全独立。Flink中的分流主要通过侧输出流来实现。

        通过调用底层的处理函数,可以获取到上下文信息,调用上下文的.output方法就可以实施分流操作了。.output方法需要传入一个“输出标签"(OutputTag),用来标记侧输出流(相当于给侧输出流盖了个戳,指明他的名称和类型),之后也可以通过.getSideOutput()方法传入OutputTag获取到相应的侧输出流。

二、合流

        对多个来源的多条流进行联合处理时,需要用到合流操作,具体有如下几种合流算子:

1. union

        union操作要求不同流中的数据类型必须一致, 类似sql语言中的union,是纵向的合并。对datastream调用.union方法即可实现多流合并,合并后的流类型仍然是datastream。这里要注意,多条流合并后的水位线应以最小的那个为准(类似多个并行子任务向下游传递)。

stream1.union(stream2, stream3, ...)

2. connect

        union操作简单,但要求流的数据类型一致,实际应用中实用性不高。针对两条数据类型不一样的流,Flink还提供了connect合流操作,connect操作只能连接两条流。

(1) 两个dataStream进行connect -> 连接流(ConnectedStreams)

        对于两条数据类型不一致的dataStream进行连接,调用.connect()方法,所得到的是一个连接流ConnectedStreams,然后再调用同处理方法分别对两条流进行处理,得到一个统一类型的dataStream。这里的同处理方法可以是map、flatmap也可以是底层的处理函数process,只是在传入参数时跟以往的单流不同,如map方法传入的不再是MapFunction而是CoMapFunction,可以实现对两条流分别做map操作。

        对ConnectedStreams也可以先调用keyBy进行按键分区操作后,再调用同处理方法。这里调用KeyBy后得到的仍然是ConnectedStreams,keyBy要传入两个参数keySelector1和keySelector2类似于sql中两表之间的 join操作的关联字段。

connectedStreams.keyBy(keySelector1, keySelector2);

(2) dataStream与广播流(broadcastStream)进行connect -> 广播连接流

       当需要动态定义某些规则或配置时,如维度表配置信息是动态变化的,存储在MySQL数据库中,我们用maxwell实时对它进行了监控,当发生变化时,这个配置信息是要完整的告知原始数据流的(从业务数据库中抽取的原始数据),即若原始数据流分为了多个并行子任务,则每个并行子任务上都应该知道配置信息的变化,因此需要对配置信息进行广播连接。

        对dataStream调用.broadcast()方法就可以得到广播流,将要处理的数据流与这条广播流进行connect,得到的就是广播连接流,可以调用.process方法进行动态处理,同样要实现的是一个类似CoProcessFunction的抽象类,对两条流分别进行处理。

3. join

        connect方法已经能够实现各种需求了,但是其支持的处理函数太过于底层,在很多场景下太过于抽象了,flink还为datastream提供了内置的join算子和coGroup算子来简化一些特定场景下的合流操作。

(1) 窗口联结(window join)

        当我们不仅需要对两条流进行连接,还需要对连接后的流进行窗口操作,Flink为这种场景专门提供了一个窗口联结算子。如下操作可将两条流基于联结字段进行配对,并将key相同的放入一个窗口进行窗口计算。

stream1.join(stream2).where(<KeySelector>)    // stream1的联结字段.equalTo(<KeySelector>)    // stream2的联结字段.window(<WindowAssigner>).apply(<JoinFunction>)

        注意 这里调用窗口函数只能通过.apply()方法。

        窗口join的具体流程如下:两条流根据key进行分组,分别进入对应的窗口存储;到达窗口时间时,会先统计窗口内两条流的笛卡尔积,然后进行遍历,遍历到一对匹配的数据就调用一次窗口函数并输出结果。

(2) 间隔联结(interval join)

        间隔联结为数据流中的每一条数据单独开辟属于自己的时间窗口。试想这样一个场景,对于一条流A中的一条数据a,它只想和自己时间戳的前后一段时间间隔的B数据流进行连接,这样窗口联结就无法做到,需要间隔联结。

        间隔联结的两条流必须基于相同的key,且需要给定间隔上界和间隔下界,则数据a的窗口大小就是[a.timestamp+lowbound, a.timestamp+upperbound],其中lowbound<upperbound,两者都可正可负。

stream1.keyBy(<KeySelector>).intervalJoin(stream2.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process(new ProcessJoinFunction(){})

4. coGroup

        coGroup 与窗口联结类似,也是将两条流合并后开窗处理匹配元素,调用时只需将.join()方法换成.coGroup()方法即可。

stream1.coGroup(stream2).where(<KeySelector>)    // stream1的联结字段.equalTo(<KeySelector>)    // stream2的联结字段.window(<WindowAssigner>).apply(<CoGroupFunction>)

        在window join中,数据在窗口中是先做笛卡尔积,再遍历是否匹配, 只有匹配的数据才会去调用apply方法,因此,window join实现的是类似sql中的inner join功能。而在coGroup函数中,数据不会做笛卡尔积,而是将所有搜集到的数据都传入到apply方法中,用户可以自定义匹配逻辑,因此可以实现任意外连接或是其他用户想要的连接方式。

相关文章:

Flink的多流转换(分流-侧输出流、合流-union、connect、join)

在实际应用中&#xff0c;我们可能要将多个不同来源的数据连接合并在一起进行处理&#xff0c;也有可能要将一条流拆分成多条流进行处理&#xff0c;这就涉及到了Flink的多流转换问题。简单来说&#xff0c;就是分流和合流两大操作&#xff0c;分流主要通过侧输出流实现&#x…...

DirectUI属性表

<?xml version"1.0" encoding"UTF-8"?> <Controls><Window parent""><Attribute name"size" default"0,0" type"SIZE" comment"窗口的初始化大小,如(800,600)"/><Attribu…...

RBAC权限控制

1、Spring Security 是一个功能强大的Java安全框架&#xff0c;它提供了全面的安全认证和授权的支持。 2 SpringSecurity配置类&#xff08;源码逐行解析&#xff09; Spring Security的配置类是实现安全控制的核心部分 开启Spring Security各种功能&#xff0c;以确保Web应…...

STM32高级物联网通信之以太网通讯

目录 以太网通讯基础知识 什么是以太网 互联网和以太网的区别 1)概念与范围 (1)互联网 (2)以太网 2)技术特点 (1)互联网 (2)以太网 3)应用场景 (1)互联网 (2)以太网 以太网的层次 1)物理层 2)数据链路层 OSI 7层模型 TCPIP 4层模型 一些常见…...

【小程序】全局配置window和tabBar

目录 全局配置 1. 全局配置文件及常用的配置项 全局配置 - window 1. 小程序窗口的组成部分 2. 了解 window 节点常用的配置项 ​编辑 3. 设置导航栏的标题 4. 设置导航栏的背景色 5. 设置导航栏的标题颜色 6. 全局开启下拉刷新功能 7. 设置下拉刷新时窗口的背景色 …...

详解VHDL如何编写Testbench

1.概述 仿真测试平台文件(Testbench)是可以用来验证所设计的硬件模型正确性的 VHDL模型&#xff0c;它为所测试的元件提供了激励信号&#xff0c;可以以波形的方式显示仿真结果或把测试结果存储到文件中。这里所说的激励信号可以直接集成在测试平台文件中&#xff0c;也可以从…...

冥想的实践

这是我某一天的正念和冥想实践&#xff0c;我对正念练习、冥想练习进行了分别的统计。 正念练习&#xff1a;1分钟**5次 冥想&#xff1a;15分钟10分钟 正念练习&#xff0c;基本在工作休息时间练习。当然&#xff0c;工作过程中&#xff0c;也有一部分时间会有正念的状态&am…...

STM32F103RCT6学习之四:定时器

1.基础 定时器可以对输入的时钟进行计数&#xff0c;并在计数值达到设定值时触发中断 16位计数器、预分频器、自动重装寄存器的时基单元&#xff0c;在72MHz计数时钟下可以实现最大59.65s的定时 不仅具备基本的定时中断功能&#xff0c;而且还包含内外时钟源选择、输入捕获、…...

如何在网页端使用 IDE 高效地阅读 GitHub 源码?

如何在网页端使用 IDE 高效地阅读 GitHub 源码&#xff1f; 前言什么是 GitHub1s&#xff1f;使用 GitHub1s 阅读 browser-use 项目源码步骤 1: 打开 GitHub 项目页面步骤 2: 修改 URL 使用 GitHub1s步骤 3: 浏览文件结构步骤 4: 使用代码高亮和智能补全功能步骤 5: 快速跳转和…...

易基因: BS+ChIP-seq揭示DNA甲基化调控非编码RNA(VIM-AS1)抑制肿瘤侵袭性|Exp Mol Med

大家好&#xff0c;这里是专注表观组学十余年&#xff0c;领跑多组学科研服务的易基因。 肝细胞癌&#xff08;hepatocellular carcinoma&#xff0c;HCC&#xff09;早期复发仍然是一个具有挑战性的领域&#xff0c;其中涉及的机制尚未完全被理解。尽管微血管侵犯&#xff08…...

欢迪迈手机商城设计与实现基于(代码+数据库+LW)

摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本欢迪迈手机商城就是在这样的大环境下诞生&#xff0c;其可以帮助管理者在短时间内处理完毕庞大的数据信息…...

数据库基础与应用:从概念到实践

数据库是信息技术中的核心组件之一&#xff0c;是现代计算机系统中不可或缺的部分。无论是日常应用的社交网络、电子商务网站&#xff0c;还是企业级的大型系统&#xff0c;几乎所有的信息管理都离不开数据库。那么&#xff0c;数据库究竟是什么&#xff1f;它是如何工作的&…...

jenkins集成工具(一)部署php项目

目录 什么是CI 、CD Jenkins集成工具 一、Jenkins介绍 二、jenkins的安装和部署 环境部署 安装jenkins 安装gitlab 配置镜像源进行安装 修改密码 安装git工具 上传测试代码 Jenkins部署php项目wordpress 发布php代码 安装插件 测试代码发布 实现发布成功发送邮件…...

17_HTML5 Web 存储 --[HTML5 API 学习之旅]

HTML5 Web 存储&#xff08;Web Storage&#xff09;是 HTML5 引入的一种在用户浏览器中存储数据的机制。它提供了比传统的 cookies 更加方便和强大的功能&#xff0c;包括更大的存储空间、更好的性能以及更简单的 API。Web 存储主要分为两种类型&#xff1a;localStorage 和 s…...

GCP Cloud Architect exam - PASS

备考指南 推荐视频课程 https://www.udemy.com/course/google-cloud-architect-certifications/?couponCodeKEEPLEARNING 推荐题库 https://www.udemy.com/course/gcp-professional-cloud-architect-exam-practice-tests-2024​/?couponCodeKEEPLEARNING 错题集 http…...

【Sentinel】初识Sentinel

目录 1.1.雪崩问题及解决方案 1.1.1.雪崩问题 1.1.2.超时处理 1.1.3.仓壁模式 1.1.4.断路器 1.1.5.限流 1.1.6.总结 1.2.服务保护技术对比 1.3.Sentinel介绍和安装 1.3.1.初识Sentinel 1.3.2.安装Sentinel 1.4.微服务整合Sentinel 1.1.雪崩问题及解决方案 1.1.1.…...

java常见类库

StringBuffer类 String和StringBuffer的区别 String 不可变性&#xff1a;String 类是不可变的&#xff0c;这意味着一旦创建了一个 String 对象&#xff0c;其值就不能改变。每次对 String 进行修改&#xff08;如连接、替换等操作&#xff09;都会产生新的 String 对象&…...

Wordly Wise 3000 国际背单词01 介绍 + 测词汇量

&#x1f4da; Wordly Wise 3000 国际背单词01 介绍 测词汇量 &#x1f31f; 大家好&#xff01;我们正式启动背Wordly Wise 3000单词&#xff0c;旨在利用国际资源和科学的学练方法&#xff0c;帮助大家更得效地坚持学练单词。我们将通过图文和Video等多种形式与大家分享经验…...

Unity Dots理论学习-2.ECS有关的模块(1)

Unity的实体组件系统&#xff08;ECS&#xff09;是支撑DOTS模块和技术的面向数据架构。ECS为Unity中的内存数据和runtime进程调度提供了高度的控制和确定性。 ECS for Unity 2022 LTS 配备了两个兼容的物理引擎&#xff0c;一个高级的Netcode package&#xff0c;以及一个用来…...

2021.12.28基于UDP同信的相关流程

作业 1、将TCP的CS模型再敲一遍 服务器 #include <myhead.h> #define PORT 8888 #define IP "192.168.124.123" int main(int argc, const char *argv[]) {//创建套接字//绑定本机IP和端口号//监听客户端请求//接收客户端连接请求//收发消息//创建套接字int…...

路沿模板,乐山水泥路面模板,40公分路面钢模哪里有名

打路面模板&#xff1a;乐山水泥路面的优质之选在道路建设中&#xff0c;打路面模板起着至关重要的作用。它不仅关系到路面的成型质量&#xff0c;还影响着整个工程的效率和成本。乐山地区对于道路建设的需求不断增加&#xff0c;尤其是在水泥路面的铺设方面&#xff0c;40公分…...

DanKoe 视频笔记:人生经验课:给18岁自己的信

在本节课中&#xff0c;我们将学习一位28岁人士回顾过去&#xff0c;总结出的核心人生经验。这些经验旨在帮助年轻人&#xff0c;特别是那些感到迷茫、渴望超越平凡生活的人&#xff0c;建立自主性、明确目标并采取有效行动。我们将把这些经验整理成一套清晰的教程&#xff0c;…...

别再折腾了!保姆级AirSim+UE5.3安装配置指南(附常见编译错误解决)

AirSim与虚幻引擎5.3深度整合&#xff1a;从零搭建自动驾驶仿真环境的完整实践 在自动驾驶技术快速发展的今天&#xff0c;仿真环境已成为算法开发与测试不可或缺的一环。微软开源的AirSim作为一个高度逼真的仿真平台&#xff0c;与虚幻引擎5.3的结合为开发者提供了前所未有的视…...

Galaxy UI组件库深度解析:3000+开源UI元素的完整实践手册

Galaxy UI组件库深度解析&#xff1a;3000开源UI元素的完整实践手册 【免费下载链接】galaxy The largest Open-Source UI Library! Community-made and free to use. Made with either CSS or Tailwind. 项目地址: https://gitcode.com/gh_mirrors/gal/galaxy 在当今快…...

在树莓派4B上编译运行Speedtest-CLI:手把手解决curl和expat库的交叉编译难题

树莓派4B实战&#xff1a;从零构建Speedtest-CLI测速工具全流程指南 1. 环境准备与工具链配置 在树莓派4B上构建Speedtest-CLI测速工具&#xff0c;首先需要搭建完整的交叉编译环境。不同于x86平台的直接编译&#xff0c;ARM架构下的开发需要特别注意工具链的选择和配置。 必备…...

ESP32S3-Cam + MPU6050 DMP移植避坑实录:从编译报错到姿态数据稳定输出的完整流程

ESP32S3-Cam与MPU6050 DMP移植实战&#xff1a;从编译报错到稳定姿态解算的全流程解析 当ESP32S3-Cam遇上MPU6050的DMP&#xff08;数字运动处理器&#xff09;功能&#xff0c;本应是物联网项目中实现低成本姿态检测的完美组合。但实际移植过程中&#xff0c;开发者往往会遭遇…...

乙巳马年·皇城大门春联生成终端W安全部署实践:网络配置与访问控制

乙巳马年皇城大门春联生成终端W安全部署实践&#xff1a;网络配置与访问控制 最近在星图GPU平台上部署了一个挺有意思的AI应用&#xff0c;叫“皇城大门春联生成终端W”。说白了&#xff0c;就是一个能根据你的要求&#xff0c;自动生成各种风格春联的AI模型。部署过程本身不难…...

选择性记忆提取,把人类遗忘机制用在了RAG上,这架构真有点东西

当前大模型处理长文本面临三大瓶颈&#xff1a;算力爆炸&#xff1a;传统注意力机制随文本长度呈二次方增长&#xff08;O(N)&#xff09;&#xff0c;百万级token直接OOMRAG碎片化&#xff1a;检索增强生成将文档切成独立片段&#xff0c;破坏多跳推理的逻辑链条记忆遗忘&…...

古基因组学:降解DNA的损伤模式、污染评估与群体历史推断

点击 “AladdinEdu&#xff0c;你的AI学习实践工作坊”&#xff0c;注册即送-H卡级别算力&#xff0c;沉浸式云原生集成开发环境&#xff0c;80G大显存多卡并行&#xff0c;按量弹性计费&#xff0c;教育用户更享超低价。 摘要&#xff1a;古基因组学通过对古代生物遗骸中高度降…...

掌握TegraRcmGUI:从入门到精通的Switch注入实践指南

掌握TegraRcmGUI&#xff1a;从入门到精通的Switch注入实践指南 【免费下载链接】TegraRcmGUI C GUI for TegraRcmSmash (Fuse Gele exploit for Nintendo Switch) 项目地址: https://gitcode.com/gh_mirrors/te/TegraRcmGUI TegraRcmGUI是一款基于C开发的图形化界面工具…...