Flink(八):DataStream API (五) Join
1. Window Join
Window join 作用在两个流中有相同 key 且处于相同窗口的元素上。这些窗口可以通过 window assigner 定义,并且两个流中的元素都会被用于计算窗口的结果。两个流中的元素在组合之后,会被传递给用户定义的 JoinFunction
或 FlatJoinFunction
,用户可以用它们输出符合 join 要求的结果。常见的用例可以总结为以下代码
stream.join(otherStream).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<JoinFunction>);
语义上有一些值得注意的地方:
- 从两个流中创建成对的元素与 inner-join 类似,即一个流中的元素在与另一个流中对应的元素完成 join 之前不会被输出。
- 完成 join 的元素会将他们的 timestamp 设为对应窗口中允许的最大 timestamp。比如一个边界为
[5, 10)
窗口中的元素在 join 之后的 timestamp 为 9。
1.1 滚动 Window Join
使用滚动 window join 时,所有 key 相同且共享一个滚动窗口的元素会被组合成对,并传递给 JoinFunction
或 FlatJoinFunction
。因为这个行为与 inner join 类似,所以一个流中的元素如果没有与另一个流中的元素组合起来,它就不会被输出!
如图所示,定义了一个大小为 2 毫秒的滚动窗口,即形成了边界为 [0,1], [2,3], ...
的窗口。图中展示了如何将每个窗口中的元素组合成对,组合的结果将被传递给 JoinFunction
。注意,滚动窗口 [6,7]
将不会输出任何数据,因为绿色流当中没有数据可以与橙色流的 ⑥ 和 ⑦ 配对。
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(TumblingEventTimeWindows.of(Time.milliseconds(2))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});
1.2 滑动 Window Join
当使用滑动 window join 时,所有 key 相同且处于同一个滑动窗口的元素将被组合成对,并传递给 JoinFunction
或 FlatJoinFunction
。当前滑动窗口内,如果一个流中的元素没有与另一个流中的元素组合起来,它就不会被输出!注意,在某个滑动窗口中被 join 的元素不一定会在其他滑动窗口中被 join。
本例中定义了长度为两毫秒,滑动距离为一毫秒的滑动窗口,生成的窗口实例区间为 [-1, 0],[0,1],[1,2],[2,3], …
。 X 轴下方是每个滑动窗口中被 join 后传递给 JoinFunction
的元素。图中可以看到橙色 ② 与绿色 ③ 在窗口 [2,3]
中 join,但没有与窗口 [1,2]
中任何元素 join。
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */)).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});
1.3 会话 Window Join
使用会话 window join 时,所有 key 相同且组合后符合会话要求的元素将被组合成对,并传递给 JoinFunction
或 FlatJoinFunction
。这个操作同样是 inner join,所以如果一个会话窗口中只含有某一个流的元素,这个窗口将不会产生输出!
这里我们定义了一个间隔为至少一毫秒的会话窗口。图中总共有三个会话,前两者中两个流都有元素,它们被 join 并传递给 JoinFunction
。而第三个会话中,绿流没有任何元素,所以 ⑧ 和 ⑨ 没有被 join!
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(EventTimeSessionWindows.withGap(Time.milliseconds(1))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});
2. Interval Join
Interval join 组合元素的条件为:两个流(我们暂时称为 A 和 B)中 key 相同且 B 中元素的 timestamp 处于 A 中元素 timestamp 的一定范围内。这个条件可以更加正式地表示为 b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]
或 a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
这里的 a 和 b 为 A 和 B 中共享相同 key 的元素。上界和下界可正可负,只要下界永远小于等于上界即可。 Interval join 目前仅执行 inner join。当一对元素被传递给 ProcessJoinFunction
,他们的 timestamp 会从两个元素的 timestamp 中取最大值 (timestamp 可以通过 ProcessJoinFunction.Context
访问)。Interval join 目前仅支持 event time。
上例中,我们 join 了橙色和绿色两个流,join 的条件是:以 -2 毫秒为下界、+1 毫秒为上界。 默认情况下,上下界也被包括在区间内,但 .lowerBoundExclusive()
和 .upperBoundExclusive()
可以将它们排除在外。
图中三角形所表示的条件也可以写成更加正式的表达式:orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.keyBy(<KeySelector>).intervalJoin(greenStream.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String>(){@Overridepublic void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {out.collect(left + "," + right);}});
相关文章:

Flink(八):DataStream API (五) Join
1. Window Join Window join 作用在两个流中有相同 key 且处于相同窗口的元素上。这些窗口可以通过 window assigner 定义,并且两个流中的元素都会被用于计算窗口的结果。两个流中的元素在组合之后,会被传递给用户定义的 JoinFunction 或 FlatJoinFunct…...

HarmonyOS NEXT边学边玩:从零实现一个影视App(六、视频播放页的实现)
在HarmonyOS NEXT中,ArkUI是一个非常强大的UI框架,能够帮助开发者快速构建出美观且功能丰富的用户界面。本文将详细介绍如何使用ArkUI实现一个影视App的视频播放页面。将从零开始,逐步构建一个功能完善的视频播放页面,并解释每一部…...
salesforce实现一个字段的默认初始值根据另一个字段的值来自动确定
在 Salesforce 中,可以通过 公式字段 或 触发器 (Trigger) 实现字段的默认初始值根据另一个字段的值来自动确定,具体实现方法如下: 1. 使用公式字段 公式字段是一种动态字段,值会根据公式实时计算。 步骤: 导航到字段…...
Linux 文件权限详解
目录 前言 查看文件权限 修改文件权限 符号方式 数字方式 前言 Linux 文件权限是系统中非常重要的概念之一,用于控制对文件和目录的访问。权限分为读(Read)、写(Write)、执行(Execute)三个…...

【混合开发】CefSharp+Vue桌面应用程序开发
为什么选择CefSharpVue做桌面应用程序 CefSharp 基于 Chromium Embedded Framework (CEF) ,它可以将 Chromium 浏览器的功能嵌入到 .NET 应用程序中。通过 CefSharp,开发者可以在桌面应用程序中集成 Web 技术,包括 HTML、JavaScript、CSS 等…...
springBoot项目使用Elasticsearch教程
目录 一、引言(一)使用背景(二)版本库区别 二、引入依赖(一)springboot集成的es依赖(建议)(二)es提供的客户端库 三、配置(以yaml文件为例&#x…...

模型 多元化思维(系统科学)
系列文章分享模型,了解更多👉 模型_思维模型目录。融合多学科知识,全面解决问题。 1 多元化思维模型的应用 1.1 完美日记的私域流量运营 完美日记作为美妆行业的新兴品牌,通过多元化的思维模型在私域流量运营中取得了显著成功。…...
Google地图瓦片爬虫
地图地址说明 1、谷歌矢量(中文标注) http://mt{0-3}.google.cn/vt/vm416115521&hlzh-CN&glcn&x{x}&y{y}&z{z}&sGalileo 2、谷歌矢量(英文标注) http://mt{0-3}.google.cn/vt/vm416115521&hlen&glcn&x{x}&y{y}&z{z}&sGali…...

【C++】size_t全面解析与深入拓展
博客主页: [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 💯前言💯一、什么是size_t?为什么需要size_t? 💯二、size_t的特性与用途1. size_t是无符号类型示例: 2. size_t的跨平台适应性示例对…...

Web端实时播放RTSP视频流(监控)
一、安装ffmpeg: 1、官网下载FFmpeg: Download FFmpeg 2、点击Windows图标,选第一个:Windows builds from gyan.dev 3、跳转到下载页面: 4、下载后放到合适的位置,不用安装,解压即可: 5、配置path 复制解压后的\bin路径,配置环境变量如图: <...

学习 Git 的工作原理,而不仅仅是命令
Git 是常用的去中心化源代码存储库。它是由 Linux 创建者 Linus Torvalds 创建的,用于管理 Linux 内核源代码。像 GitHub 这样的整个服务都是基于它的。因此,如果您想在 Linux 世界中进行编程或将 IBM 的 DevOps Services 与 Git 结合使用,那…...
C语言变长嵌套数组常量初始化定义技巧
有时候,我们需要在代码里配置一些常量结构,比如一个固定的动作流程ActionFlow:包含N(即flow_num)个动作列表(ActionArray),每个动作列表包含M(即act_num)个可…...

如何查看特定版本的Spring源码
写在前面:大家好!我是晴空๓。如果博客中有不足或者的错误的地方欢迎在评论区或者私信我指正,感谢大家的不吝赐教。我的唯一博客更新地址是:https://ac-fun.blog.csdn.net/。非常感谢大家的支持。一起加油,冲鸭&#x…...

【深度学习】关键技术-激活函数(Activation Functions)
激活函数(Activation Functions) 激活函数是神经网络的重要组成部分,它的作用是将神经元的输入信号映射到输出信号,同时引入非线性特性,使神经网络能够处理复杂问题。以下是常见激活函数的种类、公式、图形特点及其应…...
网关相关知识
文章目录 什么是网关网关的主要作用网关的运用 什么是网关 网关又称网间连接器、协议转换器,也就是网段(局域网、广域网)关卡,不同网段中的主机不能直接通信,需要通过关卡才能进行互访,比如IP地址为192.168.31.9(子网掩码&#x…...

SpringBoot整合SpringSecurity详解
文章目录 SpringBoot整合SpringSecurity详解一、引言二、引入依赖三、配置 Spring Security四、自定义用户详细信息服务五、使用示例1. 创建用户实体类2. 测试登录功能 六、总结 SpringBoot整合SpringSecurity详解 一、引言 在当今的软件开发中,安全是一个至关重要…...
【C++基础】enum,union,uint8_t,static
enum 所以有时候使用 Enum 的目的,不是为了自定义一种数据类型,而是为了声明一组常量。 from: https://github.com/wangdoc/clang-tutorial/blob/main/docs/enum.md union C 语言提供了 Union 结构,用来自定义可以灵活变更的数据结构。它内部…...
单片机的原理及其应用:从入门到进阶的全方位指南
以下是一篇详细、深入的“单片机的原理及其应用”博客文章示例,适合想要系统学习或深入了解单片机的读者。文中不仅会介绍单片机的基本原理、内部构造、开发流程和应用领域,还会融入更多的理论分析、实操案例以及常见问题与解决思路等,帮助读…...
如何使用 Go语言操作亚马逊 S3 对象云存储
以下是使用 Go 语言操作亚马逊 S3 对象云存储的详细步骤和示例代码: 解决思路: 安装必要的 Go 语言包,这里我们将使用 aws-sdk-go 包来与 Amazon S3 进行交互。配置 AWS 凭证,包括访问密钥和秘密访问密钥,以及 AWS 区…...

2025年应用与API安全展望:挑战与机遇并存
进入2025年,应用与API安全的重要性愈发突出。在过去的一年里,API技术已经成为数字创新的核心。然而,API的大规模应用也使得攻击面显著扩展,2024年针对业务逻辑漏洞的API攻击占比高达27%,较前一年增加10%。与此同时&…...

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明
LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造,完美适配AGV和无人叉车。同时,集成以太网与语音合成技术,为各类高级系统(如MES、调度系统、库位管理、立库等)提供高效便捷的语音交互体验。 L…...

微信小程序之bind和catch
这两个呢,都是绑定事件用的,具体使用有些小区别。 官方文档: 事件冒泡处理不同 bind:绑定的事件会向上冒泡,即触发当前组件的事件后,还会继续触发父组件的相同事件。例如,有一个子视图绑定了b…...

前端导出带有合并单元格的列表
// 导出async function exportExcel(fileName "共识调整.xlsx") {// 所有数据const exportData await getAllMainData();// 表头内容let fitstTitleList [];const secondTitleList [];allColumns.value.forEach(column > {if (!column.children) {fitstTitleL…...

dedecms 织梦自定义表单留言增加ajax验证码功能
增加ajax功能模块,用户不点击提交按钮,只要输入框失去焦点,就会提前提示验证码是否正确。 一,模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...

2.Vue编写一个app
1.src中重要的组成 1.1main.ts // 引入createApp用于创建应用 import { createApp } from "vue"; // 引用App根组件 import App from ./App.vue;createApp(App).mount(#app)1.2 App.vue 其中要写三种标签 <template> <!--html--> </template>…...

江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命
在华东塑料包装行业面临限塑令深度调整的背景下,江苏艾立泰以一场跨国资源接力的创新实践,重新定义了绿色供应链的边界。 跨国回收网络:废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点,将海外废弃包装箱通过标准…...
OkHttp 中实现断点续传 demo
在 OkHttp 中实现断点续传主要通过以下步骤完成,核心是利用 HTTP 协议的 Range 请求头指定下载范围: 实现原理 Range 请求头:向服务器请求文件的特定字节范围(如 Range: bytes1024-) 本地文件记录:保存已…...

Spring数据访问模块设计
前面我们已经完成了IoC和web模块的设计,聪明的码友立马就知道了,该到数据访问模块了,要不就这俩玩个6啊,查库势在必行,至此,它来了。 一、核心设计理念 1、痛点在哪 应用离不开数据(数据库、No…...

Springboot社区养老保险系统小程序
一、前言 随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,社区养老保险系统小程序被用户普遍使用,为方…...

sipsak:SIP瑞士军刀!全参数详细教程!Kali Linux教程!
简介 sipsak 是一个面向会话初始协议 (SIP) 应用程序开发人员和管理员的小型命令行工具。它可以用于对 SIP 应用程序和设备进行一些简单的测试。 sipsak 是一款 SIP 压力和诊断实用程序。它通过 sip-uri 向服务器发送 SIP 请求,并检查收到的响应。它以以下模式之一…...