Flink(八):DataStream API (五) Join
1. Window Join
Window join 作用在两个流中有相同 key 且处于相同窗口的元素上。这些窗口可以通过 window assigner 定义,并且两个流中的元素都会被用于计算窗口的结果。两个流中的元素在组合之后,会被传递给用户定义的 JoinFunction
或 FlatJoinFunction
,用户可以用它们输出符合 join 要求的结果。常见的用例可以总结为以下代码
stream.join(otherStream).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<JoinFunction>);
语义上有一些值得注意的地方:
- 从两个流中创建成对的元素与 inner-join 类似,即一个流中的元素在与另一个流中对应的元素完成 join 之前不会被输出。
- 完成 join 的元素会将他们的 timestamp 设为对应窗口中允许的最大 timestamp。比如一个边界为
[5, 10)
窗口中的元素在 join 之后的 timestamp 为 9。
1.1 滚动 Window Join
使用滚动 window join 时,所有 key 相同且共享一个滚动窗口的元素会被组合成对,并传递给 JoinFunction
或 FlatJoinFunction
。因为这个行为与 inner join 类似,所以一个流中的元素如果没有与另一个流中的元素组合起来,它就不会被输出!
如图所示,定义了一个大小为 2 毫秒的滚动窗口,即形成了边界为 [0,1], [2,3], ...
的窗口。图中展示了如何将每个窗口中的元素组合成对,组合的结果将被传递给 JoinFunction
。注意,滚动窗口 [6,7]
将不会输出任何数据,因为绿色流当中没有数据可以与橙色流的 ⑥ 和 ⑦ 配对。
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(TumblingEventTimeWindows.of(Time.milliseconds(2))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});
1.2 滑动 Window Join
当使用滑动 window join 时,所有 key 相同且处于同一个滑动窗口的元素将被组合成对,并传递给 JoinFunction
或 FlatJoinFunction
。当前滑动窗口内,如果一个流中的元素没有与另一个流中的元素组合起来,它就不会被输出!注意,在某个滑动窗口中被 join 的元素不一定会在其他滑动窗口中被 join。
本例中定义了长度为两毫秒,滑动距离为一毫秒的滑动窗口,生成的窗口实例区间为 [-1, 0],[0,1],[1,2],[2,3], …
。 X 轴下方是每个滑动窗口中被 join 后传递给 JoinFunction
的元素。图中可以看到橙色 ② 与绿色 ③ 在窗口 [2,3]
中 join,但没有与窗口 [1,2]
中任何元素 join。
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */)).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});
1.3 会话 Window Join
使用会话 window join 时,所有 key 相同且组合后符合会话要求的元素将被组合成对,并传递给 JoinFunction
或 FlatJoinFunction
。这个操作同样是 inner join,所以如果一个会话窗口中只含有某一个流的元素,这个窗口将不会产生输出!
这里我们定义了一个间隔为至少一毫秒的会话窗口。图中总共有三个会话,前两者中两个流都有元素,它们被 join 并传递给 JoinFunction
。而第三个会话中,绿流没有任何元素,所以 ⑧ 和 ⑨ 没有被 join!
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(EventTimeSessionWindows.withGap(Time.milliseconds(1))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});
2. Interval Join
Interval join 组合元素的条件为:两个流(我们暂时称为 A 和 B)中 key 相同且 B 中元素的 timestamp 处于 A 中元素 timestamp 的一定范围内。这个条件可以更加正式地表示为 b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]
或 a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
这里的 a 和 b 为 A 和 B 中共享相同 key 的元素。上界和下界可正可负,只要下界永远小于等于上界即可。 Interval join 目前仅执行 inner join。当一对元素被传递给 ProcessJoinFunction
,他们的 timestamp 会从两个元素的 timestamp 中取最大值 (timestamp 可以通过 ProcessJoinFunction.Context
访问)。Interval join 目前仅支持 event time。
上例中,我们 join 了橙色和绿色两个流,join 的条件是:以 -2 毫秒为下界、+1 毫秒为上界。 默认情况下,上下界也被包括在区间内,但 .lowerBoundExclusive()
和 .upperBoundExclusive()
可以将它们排除在外。
图中三角形所表示的条件也可以写成更加正式的表达式:orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.keyBy(<KeySelector>).intervalJoin(greenStream.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String>(){@Overridepublic void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {out.collect(left + "," + right);}});
相关文章:

Flink(八):DataStream API (五) Join
1. Window Join Window join 作用在两个流中有相同 key 且处于相同窗口的元素上。这些窗口可以通过 window assigner 定义,并且两个流中的元素都会被用于计算窗口的结果。两个流中的元素在组合之后,会被传递给用户定义的 JoinFunction 或 FlatJoinFunct…...

HarmonyOS NEXT边学边玩:从零实现一个影视App(六、视频播放页的实现)
在HarmonyOS NEXT中,ArkUI是一个非常强大的UI框架,能够帮助开发者快速构建出美观且功能丰富的用户界面。本文将详细介绍如何使用ArkUI实现一个影视App的视频播放页面。将从零开始,逐步构建一个功能完善的视频播放页面,并解释每一部…...

salesforce实现一个字段的默认初始值根据另一个字段的值来自动确定
在 Salesforce 中,可以通过 公式字段 或 触发器 (Trigger) 实现字段的默认初始值根据另一个字段的值来自动确定,具体实现方法如下: 1. 使用公式字段 公式字段是一种动态字段,值会根据公式实时计算。 步骤: 导航到字段…...

Linux 文件权限详解
目录 前言 查看文件权限 修改文件权限 符号方式 数字方式 前言 Linux 文件权限是系统中非常重要的概念之一,用于控制对文件和目录的访问。权限分为读(Read)、写(Write)、执行(Execute)三个…...

【混合开发】CefSharp+Vue桌面应用程序开发
为什么选择CefSharpVue做桌面应用程序 CefSharp 基于 Chromium Embedded Framework (CEF) ,它可以将 Chromium 浏览器的功能嵌入到 .NET 应用程序中。通过 CefSharp,开发者可以在桌面应用程序中集成 Web 技术,包括 HTML、JavaScript、CSS 等…...

springBoot项目使用Elasticsearch教程
目录 一、引言(一)使用背景(二)版本库区别 二、引入依赖(一)springboot集成的es依赖(建议)(二)es提供的客户端库 三、配置(以yaml文件为例&#x…...

模型 多元化思维(系统科学)
系列文章分享模型,了解更多👉 模型_思维模型目录。融合多学科知识,全面解决问题。 1 多元化思维模型的应用 1.1 完美日记的私域流量运营 完美日记作为美妆行业的新兴品牌,通过多元化的思维模型在私域流量运营中取得了显著成功。…...

Google地图瓦片爬虫
地图地址说明 1、谷歌矢量(中文标注) http://mt{0-3}.google.cn/vt/vm416115521&hlzh-CN&glcn&x{x}&y{y}&z{z}&sGalileo 2、谷歌矢量(英文标注) http://mt{0-3}.google.cn/vt/vm416115521&hlen&glcn&x{x}&y{y}&z{z}&sGali…...

【C++】size_t全面解析与深入拓展
博客主页: [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 💯前言💯一、什么是size_t?为什么需要size_t? 💯二、size_t的特性与用途1. size_t是无符号类型示例: 2. size_t的跨平台适应性示例对…...

Web端实时播放RTSP视频流(监控)
一、安装ffmpeg: 1、官网下载FFmpeg: Download FFmpeg 2、点击Windows图标,选第一个:Windows builds from gyan.dev 3、跳转到下载页面: 4、下载后放到合适的位置,不用安装,解压即可: 5、配置path 复制解压后的\bin路径,配置环境变量如图: <...

学习 Git 的工作原理,而不仅仅是命令
Git 是常用的去中心化源代码存储库。它是由 Linux 创建者 Linus Torvalds 创建的,用于管理 Linux 内核源代码。像 GitHub 这样的整个服务都是基于它的。因此,如果您想在 Linux 世界中进行编程或将 IBM 的 DevOps Services 与 Git 结合使用,那…...

C语言变长嵌套数组常量初始化定义技巧
有时候,我们需要在代码里配置一些常量结构,比如一个固定的动作流程ActionFlow:包含N(即flow_num)个动作列表(ActionArray),每个动作列表包含M(即act_num)个可…...

如何查看特定版本的Spring源码
写在前面:大家好!我是晴空๓。如果博客中有不足或者的错误的地方欢迎在评论区或者私信我指正,感谢大家的不吝赐教。我的唯一博客更新地址是:https://ac-fun.blog.csdn.net/。非常感谢大家的支持。一起加油,冲鸭&#x…...

【深度学习】关键技术-激活函数(Activation Functions)
激活函数(Activation Functions) 激活函数是神经网络的重要组成部分,它的作用是将神经元的输入信号映射到输出信号,同时引入非线性特性,使神经网络能够处理复杂问题。以下是常见激活函数的种类、公式、图形特点及其应…...

网关相关知识
文章目录 什么是网关网关的主要作用网关的运用 什么是网关 网关又称网间连接器、协议转换器,也就是网段(局域网、广域网)关卡,不同网段中的主机不能直接通信,需要通过关卡才能进行互访,比如IP地址为192.168.31.9(子网掩码&#x…...

SpringBoot整合SpringSecurity详解
文章目录 SpringBoot整合SpringSecurity详解一、引言二、引入依赖三、配置 Spring Security四、自定义用户详细信息服务五、使用示例1. 创建用户实体类2. 测试登录功能 六、总结 SpringBoot整合SpringSecurity详解 一、引言 在当今的软件开发中,安全是一个至关重要…...

【C++基础】enum,union,uint8_t,static
enum 所以有时候使用 Enum 的目的,不是为了自定义一种数据类型,而是为了声明一组常量。 from: https://github.com/wangdoc/clang-tutorial/blob/main/docs/enum.md union C 语言提供了 Union 结构,用来自定义可以灵活变更的数据结构。它内部…...

单片机的原理及其应用:从入门到进阶的全方位指南
以下是一篇详细、深入的“单片机的原理及其应用”博客文章示例,适合想要系统学习或深入了解单片机的读者。文中不仅会介绍单片机的基本原理、内部构造、开发流程和应用领域,还会融入更多的理论分析、实操案例以及常见问题与解决思路等,帮助读…...

如何使用 Go语言操作亚马逊 S3 对象云存储
以下是使用 Go 语言操作亚马逊 S3 对象云存储的详细步骤和示例代码: 解决思路: 安装必要的 Go 语言包,这里我们将使用 aws-sdk-go 包来与 Amazon S3 进行交互。配置 AWS 凭证,包括访问密钥和秘密访问密钥,以及 AWS 区…...

2025年应用与API安全展望:挑战与机遇并存
进入2025年,应用与API安全的重要性愈发突出。在过去的一年里,API技术已经成为数字创新的核心。然而,API的大规模应用也使得攻击面显著扩展,2024年针对业务逻辑漏洞的API攻击占比高达27%,较前一年增加10%。与此同时&…...

Linux安装docker,安装配置xrdp远程桌面
Linux安装docker,安装配置xrdp远程桌面。 1、卸载旧版本docker 卸载旧版本docker命令 yum remove docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-engine现在就是没有旧版本的d…...

VD:生成a2l文件
目录 前言Simulink合并地址 ASAP2 editor 前言 我之前的方法都是通过Simulink模型生成代码的过程中顺便就把a2l文件生成出来了,这时的a2l文件还没有地址,所以紧接着会去通过elf文件更新地址,一直以为这是固定的流程和方法,今天无…...

【SpringBoot应用篇】SpringBoot+MDC+自定义Filter操作traceId实现日志链路追踪
【SpringBoot应用篇】SpringBootMDC自定义Filter操作traceId实现日志链路追踪 解决的问题解决方案MDC具体逻辑ymllogback-spring.xmlTraceIdUtil操作工具类TraceIdFilter自定义过滤器GlobalExceptionHandler全局异常处理类TraceIdAspect切面UserController测试验证 多线程处理M…...

unity2022以上导出到AndroidStudio后更新步骤
1、unity里面Export出unityLibrary 2、导出apk,里面才包含libil2cpp(新版unity无法直接导出libil2cpp 3、注释AS项目app下的build.gradle里面包含unityLibrary的代码 4、注释AS项目settings.gradle包含unityLibrary的代码 5、删除AS项目里面的unityLibrary文件夹 6、…...

【ArcGIS初学】产生随机点计算混淆矩阵
混淆矩阵:用于比较分类结果和地表真实信息 总体精度(overall accuracy) :指对角线上所有样本的像元数(正确分类的像元数)除以所有像元数。 生产者精度(producers accuracy) :某类中正确分类的像元数除以参考数据中该类的像元数(列方向),又称…...

Harmony面试模版
1. 自我介绍 看表达能力、沟通能力 面试记录: 2. 进一步挖掘 2.1. 现状 目前是在职还是离职,如果离职,从上一家公司离职的原因 2.2. 项目经验 如果自我介绍工作项目经验讲的不够清楚,可以根据简历上的信息再进一步了解 面试记…...

PCM5142集成32位384kHz PCM音频立体声114dB差分输出DAC编解码芯片
目录 PCM5142 简介PCM5142功能框图PCM5142特性 参考原理图 PCM5142 简介 PCM514x 属于单片 CMOS 集成电路系列,由立体声数模转换器 (DAC) 和采用薄型小外形尺寸 (TSSOP) 封装的附加支持电路组成。PCM514x 使用 TI 最新一代高级分段 DAC 架构产品,可实现…...

浪潮云财务系统xtdysrv.asmx存在命令执行漏洞
一、漏洞简介 浪潮云财务系统xtdysrv.asmx存在命令执行漏洞,未经身份验证的远程攻击者可通过该漏洞在服务器端任意执行代码。 二、漏洞影响 浪潮云财务系统三、网络测绘: fofa: title"TSCEV4.0"四、复现过程 前置条件 步骤 POC 1 数据包…...

【网络编程】基础知识
目录 网络发展史 局域网和广域网 局域网(LAN) 广域网(Wan) 光猫 路由器 网线 设备通信的要素 IP地址 基本概念 地址划分 特殊地址(后续编程使用) IP地址转换 字节序 网络模型 网络的体系结…...

ResNet (Residual Network) - 残差网络:深度卷积神经网络的突破
一、引言 在计算机视觉领域,图像识别一直是一个核心且具有挑战性的任务。随着深度学习的发展,卷积神经网络(CNN)在图像识别方面取得了显著的成果。然而,随着网络深度的增加,出现了梯度消失或梯度爆炸等问题…...