Flink(八):DataStream API (五) Join
1. Window Join
Window join 作用在两个流中有相同 key 且处于相同窗口的元素上。这些窗口可以通过 window assigner 定义,并且两个流中的元素都会被用于计算窗口的结果。两个流中的元素在组合之后,会被传递给用户定义的 JoinFunction 或 FlatJoinFunction,用户可以用它们输出符合 join 要求的结果。常见的用例可以总结为以下代码
stream.join(otherStream).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<JoinFunction>);
语义上有一些值得注意的地方:
- 从两个流中创建成对的元素与 inner-join 类似,即一个流中的元素在与另一个流中对应的元素完成 join 之前不会被输出。
- 完成 join 的元素会将他们的 timestamp 设为对应窗口中允许的最大 timestamp。比如一个边界为
[5, 10)窗口中的元素在 join 之后的 timestamp 为 9。
1.1 滚动 Window Join
使用滚动 window join 时,所有 key 相同且共享一个滚动窗口的元素会被组合成对,并传递给 JoinFunction 或 FlatJoinFunction。因为这个行为与 inner join 类似,所以一个流中的元素如果没有与另一个流中的元素组合起来,它就不会被输出!

如图所示,定义了一个大小为 2 毫秒的滚动窗口,即形成了边界为 [0,1], [2,3], ... 的窗口。图中展示了如何将每个窗口中的元素组合成对,组合的结果将被传递给 JoinFunction。注意,滚动窗口 [6,7] 将不会输出任何数据,因为绿色流当中没有数据可以与橙色流的 ⑥ 和 ⑦ 配对。
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(TumblingEventTimeWindows.of(Time.milliseconds(2))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});
1.2 滑动 Window Join
当使用滑动 window join 时,所有 key 相同且处于同一个滑动窗口的元素将被组合成对,并传递给 JoinFunction 或 FlatJoinFunction。当前滑动窗口内,如果一个流中的元素没有与另一个流中的元素组合起来,它就不会被输出!注意,在某个滑动窗口中被 join 的元素不一定会在其他滑动窗口中被 join。

本例中定义了长度为两毫秒,滑动距离为一毫秒的滑动窗口,生成的窗口实例区间为 [-1, 0],[0,1],[1,2],[2,3], …。 X 轴下方是每个滑动窗口中被 join 后传递给 JoinFunction 的元素。图中可以看到橙色 ② 与绿色 ③ 在窗口 [2,3] 中 join,但没有与窗口 [1,2] 中任何元素 join。
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */)).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});
1.3 会话 Window Join
使用会话 window join 时,所有 key 相同且组合后符合会话要求的元素将被组合成对,并传递给 JoinFunction 或 FlatJoinFunction。这个操作同样是 inner join,所以如果一个会话窗口中只含有某一个流的元素,这个窗口将不会产生输出!

这里我们定义了一个间隔为至少一毫秒的会话窗口。图中总共有三个会话,前两者中两个流都有元素,它们被 join 并传递给 JoinFunction。而第三个会话中,绿流没有任何元素,所以 ⑧ 和 ⑨ 没有被 join!
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(EventTimeSessionWindows.withGap(Time.milliseconds(1))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});
2. Interval Join
Interval join 组合元素的条件为:两个流(我们暂时称为 A 和 B)中 key 相同且 B 中元素的 timestamp 处于 A 中元素 timestamp 的一定范围内。这个条件可以更加正式地表示为 b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound] 或 a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
这里的 a 和 b 为 A 和 B 中共享相同 key 的元素。上界和下界可正可负,只要下界永远小于等于上界即可。 Interval join 目前仅执行 inner join。当一对元素被传递给 ProcessJoinFunction,他们的 timestamp 会从两个元素的 timestamp 中取最大值 (timestamp 可以通过 ProcessJoinFunction.Context 访问)。Interval join 目前仅支持 event time。

上例中,我们 join 了橙色和绿色两个流,join 的条件是:以 -2 毫秒为下界、+1 毫秒为上界。 默认情况下,上下界也被包括在区间内,但 .lowerBoundExclusive() 和 .upperBoundExclusive() 可以将它们排除在外。
图中三角形所表示的条件也可以写成更加正式的表达式:orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.keyBy(<KeySelector>).intervalJoin(greenStream.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String>(){@Overridepublic void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {out.collect(left + "," + right);}});
相关文章:
Flink(八):DataStream API (五) Join
1. Window Join Window join 作用在两个流中有相同 key 且处于相同窗口的元素上。这些窗口可以通过 window assigner 定义,并且两个流中的元素都会被用于计算窗口的结果。两个流中的元素在组合之后,会被传递给用户定义的 JoinFunction 或 FlatJoinFunct…...
HarmonyOS NEXT边学边玩:从零实现一个影视App(六、视频播放页的实现)
在HarmonyOS NEXT中,ArkUI是一个非常强大的UI框架,能够帮助开发者快速构建出美观且功能丰富的用户界面。本文将详细介绍如何使用ArkUI实现一个影视App的视频播放页面。将从零开始,逐步构建一个功能完善的视频播放页面,并解释每一部…...
salesforce实现一个字段的默认初始值根据另一个字段的值来自动确定
在 Salesforce 中,可以通过 公式字段 或 触发器 (Trigger) 实现字段的默认初始值根据另一个字段的值来自动确定,具体实现方法如下: 1. 使用公式字段 公式字段是一种动态字段,值会根据公式实时计算。 步骤: 导航到字段…...
Linux 文件权限详解
目录 前言 查看文件权限 修改文件权限 符号方式 数字方式 前言 Linux 文件权限是系统中非常重要的概念之一,用于控制对文件和目录的访问。权限分为读(Read)、写(Write)、执行(Execute)三个…...
【混合开发】CefSharp+Vue桌面应用程序开发
为什么选择CefSharpVue做桌面应用程序 CefSharp 基于 Chromium Embedded Framework (CEF) ,它可以将 Chromium 浏览器的功能嵌入到 .NET 应用程序中。通过 CefSharp,开发者可以在桌面应用程序中集成 Web 技术,包括 HTML、JavaScript、CSS 等…...
springBoot项目使用Elasticsearch教程
目录 一、引言(一)使用背景(二)版本库区别 二、引入依赖(一)springboot集成的es依赖(建议)(二)es提供的客户端库 三、配置(以yaml文件为例&#x…...
模型 多元化思维(系统科学)
系列文章分享模型,了解更多👉 模型_思维模型目录。融合多学科知识,全面解决问题。 1 多元化思维模型的应用 1.1 完美日记的私域流量运营 完美日记作为美妆行业的新兴品牌,通过多元化的思维模型在私域流量运营中取得了显著成功。…...
Google地图瓦片爬虫
地图地址说明 1、谷歌矢量(中文标注) http://mt{0-3}.google.cn/vt/vm416115521&hlzh-CN&glcn&x{x}&y{y}&z{z}&sGalileo 2、谷歌矢量(英文标注) http://mt{0-3}.google.cn/vt/vm416115521&hlen&glcn&x{x}&y{y}&z{z}&sGali…...
【C++】size_t全面解析与深入拓展
博客主页: [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 💯前言💯一、什么是size_t?为什么需要size_t? 💯二、size_t的特性与用途1. size_t是无符号类型示例: 2. size_t的跨平台适应性示例对…...
Web端实时播放RTSP视频流(监控)
一、安装ffmpeg: 1、官网下载FFmpeg: Download FFmpeg 2、点击Windows图标,选第一个:Windows builds from gyan.dev 3、跳转到下载页面: 4、下载后放到合适的位置,不用安装,解压即可: 5、配置path 复制解压后的\bin路径,配置环境变量如图: <...
学习 Git 的工作原理,而不仅仅是命令
Git 是常用的去中心化源代码存储库。它是由 Linux 创建者 Linus Torvalds 创建的,用于管理 Linux 内核源代码。像 GitHub 这样的整个服务都是基于它的。因此,如果您想在 Linux 世界中进行编程或将 IBM 的 DevOps Services 与 Git 结合使用,那…...
C语言变长嵌套数组常量初始化定义技巧
有时候,我们需要在代码里配置一些常量结构,比如一个固定的动作流程ActionFlow:包含N(即flow_num)个动作列表(ActionArray),每个动作列表包含M(即act_num)个可…...
如何查看特定版本的Spring源码
写在前面:大家好!我是晴空๓。如果博客中有不足或者的错误的地方欢迎在评论区或者私信我指正,感谢大家的不吝赐教。我的唯一博客更新地址是:https://ac-fun.blog.csdn.net/。非常感谢大家的支持。一起加油,冲鸭&#x…...
【深度学习】关键技术-激活函数(Activation Functions)
激活函数(Activation Functions) 激活函数是神经网络的重要组成部分,它的作用是将神经元的输入信号映射到输出信号,同时引入非线性特性,使神经网络能够处理复杂问题。以下是常见激活函数的种类、公式、图形特点及其应…...
网关相关知识
文章目录 什么是网关网关的主要作用网关的运用 什么是网关 网关又称网间连接器、协议转换器,也就是网段(局域网、广域网)关卡,不同网段中的主机不能直接通信,需要通过关卡才能进行互访,比如IP地址为192.168.31.9(子网掩码&#x…...
SpringBoot整合SpringSecurity详解
文章目录 SpringBoot整合SpringSecurity详解一、引言二、引入依赖三、配置 Spring Security四、自定义用户详细信息服务五、使用示例1. 创建用户实体类2. 测试登录功能 六、总结 SpringBoot整合SpringSecurity详解 一、引言 在当今的软件开发中,安全是一个至关重要…...
【C++基础】enum,union,uint8_t,static
enum 所以有时候使用 Enum 的目的,不是为了自定义一种数据类型,而是为了声明一组常量。 from: https://github.com/wangdoc/clang-tutorial/blob/main/docs/enum.md union C 语言提供了 Union 结构,用来自定义可以灵活变更的数据结构。它内部…...
单片机的原理及其应用:从入门到进阶的全方位指南
以下是一篇详细、深入的“单片机的原理及其应用”博客文章示例,适合想要系统学习或深入了解单片机的读者。文中不仅会介绍单片机的基本原理、内部构造、开发流程和应用领域,还会融入更多的理论分析、实操案例以及常见问题与解决思路等,帮助读…...
如何使用 Go语言操作亚马逊 S3 对象云存储
以下是使用 Go 语言操作亚马逊 S3 对象云存储的详细步骤和示例代码: 解决思路: 安装必要的 Go 语言包,这里我们将使用 aws-sdk-go 包来与 Amazon S3 进行交互。配置 AWS 凭证,包括访问密钥和秘密访问密钥,以及 AWS 区…...
2025年应用与API安全展望:挑战与机遇并存
进入2025年,应用与API安全的重要性愈发突出。在过去的一年里,API技术已经成为数字创新的核心。然而,API的大规模应用也使得攻击面显著扩展,2024年针对业务逻辑漏洞的API攻击占比高达27%,较前一年增加10%。与此同时&…...
Linux链表操作全解析
Linux C语言链表深度解析与实战技巧 一、链表基础概念与内核链表优势1.1 为什么使用链表?1.2 Linux 内核链表与用户态链表的区别 二、内核链表结构与宏解析常用宏/函数 三、内核链表的优点四、用户态链表示例五、双向循环链表在内核中的实现优势5.1 插入效率5.2 安全…...
【网络安全产品大调研系列】2. 体验漏洞扫描
前言 2023 年漏洞扫描服务市场规模预计为 3.06(十亿美元)。漏洞扫描服务市场行业预计将从 2024 年的 3.48(十亿美元)增长到 2032 年的 9.54(十亿美元)。预测期内漏洞扫描服务市场 CAGR(增长率&…...
什么是EULA和DPA
文章目录 EULA(End User License Agreement)DPA(Data Protection Agreement)一、定义与背景二、核心内容三、法律效力与责任四、实际应用与意义 EULA(End User License Agreement) 定义: EULA即…...
均衡后的SNRSINR
本文主要摘自参考文献中的前两篇,相关文献中经常会出现MIMO检测后的SINR不过一直没有找到相关数学推到过程,其中文献[1]中给出了相关原理在此仅做记录。 1. 系统模型 复信道模型 n t n_t nt 根发送天线, n r n_r nr 根接收天线的 MIMO 系…...
Hive 存储格式深度解析:从 TextFile 到 ORC,如何选对数据存储方案?
在大数据处理领域,Hive 作为 Hadoop 生态中重要的数据仓库工具,其存储格式的选择直接影响数据存储成本、查询效率和计算资源消耗。面对 TextFile、SequenceFile、Parquet、RCFile、ORC 等多种存储格式,很多开发者常常陷入选择困境。本文将从底…...
Redis的发布订阅模式与专业的 MQ(如 Kafka, RabbitMQ)相比,优缺点是什么?适用于哪些场景?
Redis 的发布订阅(Pub/Sub)模式与专业的 MQ(Message Queue)如 Kafka、RabbitMQ 进行比较,核心的权衡点在于:简单与速度 vs. 可靠与功能。 下面我们详细展开对比。 Redis Pub/Sub 的核心特点 它是一个发后…...
站群服务器的应用场景都有哪些?
站群服务器主要是为了多个网站的托管和管理所设计的,可以通过集中管理和高效资源的分配,来支持多个独立的网站同时运行,让每一个网站都可以分配到独立的IP地址,避免出现IP关联的风险,用户还可以通过控制面板进行管理功…...
在 Visual Studio Code 中使用驭码 CodeRider 提升开发效率:以冒泡排序为例
目录 前言1 插件安装与配置1.1 安装驭码 CodeRider1.2 初始配置建议 2 示例代码:冒泡排序3 驭码 CodeRider 功能详解3.1 功能概览3.2 代码解释功能3.3 自动注释生成3.4 逻辑修改功能3.5 单元测试自动生成3.6 代码优化建议 4 驭码的实际应用建议5 常见问题与解决建议…...
图解JavaScript原型:原型链及其分析 | JavaScript图解
忽略该图的细节(如内存地址值没有用二进制) 以下是对该图进一步的理解和总结 1. JS 对象概念的辨析 对象是什么:保存在堆中一块区域,同时在栈中有一块区域保存其在堆中的地址(也就是我们通常说的该变量指向谁&…...
Python学习(8) ----- Python的类与对象
Python 中的类(Class)与对象(Object)是面向对象编程(OOP)的核心。我们可以通过“类是模板,对象是实例”来理解它们的关系。 🧱 一句话理解: 类就像“图纸”,对…...
