当前位置: 首页 > news >正文

Flink(八):DataStream API (五) Join

1. Window Join

Window join 作用在两个流中有相同 key 且处于相同窗口的元素上。这些窗口可以通过 window assigner 定义,并且两个流中的元素都会被用于计算窗口的结果。两个流中的元素在组合之后,会被传递给用户定义的 JoinFunction 或 FlatJoinFunction,用户可以用它们输出符合 join 要求的结果。常见的用例可以总结为以下代码

stream.join(otherStream).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<JoinFunction>);

语义上有一些值得注意的地方:

  • 从两个流中创建成对的元素与 inner-join 类似,即一个流中的元素在与另一个流中对应的元素完成 join 之前不会被输出。
  • 完成 join 的元素会将他们的 timestamp 设为对应窗口中允许的最大 timestamp。比如一个边界为 [5, 10) 窗口中的元素在 join 之后的 timestamp 为 9。

1.1 滚动 Window Join

使用滚动 window join 时,所有 key 相同且共享一个滚动窗口的元素会被组合成对,并传递给 JoinFunction 或 FlatJoinFunction。因为这个行为与 inner join 类似,所以一个流中的元素如果没有与另一个流中的元素组合起来,它就不会被输出!

如图所示,定义了一个大小为 2 毫秒的滚动窗口,即形成了边界为 [0,1], [2,3], ... 的窗口。图中展示了如何将每个窗口中的元素组合成对,组合的结果将被传递给 JoinFunction。注意,滚动窗口 [6,7] 将不会输出任何数据,因为绿色流当中没有数据可以与橙色流的 ⑥ 和 ⑦ 配对。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(TumblingEventTimeWindows.of(Time.milliseconds(2))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});

1.2 滑动 Window Join

当使用滑动 window join 时,所有 key 相同且处于同一个滑动窗口的元素将被组合成对,并传递给 JoinFunction 或 FlatJoinFunction。当前滑动窗口内,如果一个流中的元素没有与另一个流中的元素组合起来,它就不会被输出!注意,在某个滑动窗口中被 join 的元素不一定会在其他滑动窗口中被 join。

本例中定义了长度为两毫秒,滑动距离为一毫秒的滑动窗口,生成的窗口实例区间为 [-1, 0],[0,1],[1,2],[2,3], …。 X 轴下方是每个滑动窗口中被 join 后传递给 JoinFunction 的元素。图中可以看到橙色 ② 与绿色 ③ 在窗口 [2,3] 中 join,但没有与窗口 [1,2] 中任何元素 join。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */)).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});

1.3 会话 Window Join

使用会话 window join 时,所有 key 相同且组合后符合会话要求的元素将被组合成对,并传递给 JoinFunction 或 FlatJoinFunction。这个操作同样是 inner join,所以如果一个会话窗口中只含有某一个流的元素,这个窗口将不会产生输出!

这里我们定义了一个间隔为至少一毫秒的会话窗口。图中总共有三个会话,前两者中两个流都有元素,它们被 join 并传递给 JoinFunction。而第三个会话中,绿流没有任何元素,所以 ⑧ 和 ⑨ 没有被 join!

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.join(greenStream).where(<KeySelector>).equalTo(<KeySelector>).window(EventTimeSessionWindows.withGap(Time.milliseconds(1))).apply (new JoinFunction<Integer, Integer, String> (){@Overridepublic String join(Integer first, Integer second) {return first + "," + second;}});

2. Interval Join

Interval join 组合元素的条件为:两个流(我们暂时称为 A 和 B)中 key 相同且 B 中元素的 timestamp 处于 A 中元素 timestamp 的一定范围内。这个条件可以更加正式地表示为 b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound] 或 a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

这里的 a 和 b 为 A 和 B 中共享相同 key 的元素。上界和下界可正可负,只要下界永远小于等于上界即可。 Interval join 目前仅执行 inner join。当一对元素被传递给 ProcessJoinFunction,他们的 timestamp 会从两个元素的 timestamp 中取最大值 (timestamp 可以通过 ProcessJoinFunction.Context 访问)。Interval join 目前仅支持 event time。

上例中,我们 join 了橙色和绿色两个流,join 的条件是:以 -2 毫秒为下界、+1 毫秒为上界。 默认情况下,上下界也被包括在区间内,但 .lowerBoundExclusive() 和 .upperBoundExclusive() 可以将它们排除在外。

图中三角形所表示的条件也可以写成更加正式的表达式:orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;...DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;orangeStream.keyBy(<KeySelector>).intervalJoin(greenStream.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String>(){@Overridepublic void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {out.collect(left + "," + right);}});

相关文章:

Flink(八):DataStream API (五) Join

1. Window Join Window join 作用在两个流中有相同 key 且处于相同窗口的元素上。这些窗口可以通过 window assigner 定义&#xff0c;并且两个流中的元素都会被用于计算窗口的结果。两个流中的元素在组合之后&#xff0c;会被传递给用户定义的 JoinFunction 或 FlatJoinFunct…...

HarmonyOS NEXT边学边玩:从零实现一个影视App(六、视频播放页的实现)

在HarmonyOS NEXT中&#xff0c;ArkUI是一个非常强大的UI框架&#xff0c;能够帮助开发者快速构建出美观且功能丰富的用户界面。本文将详细介绍如何使用ArkUI实现一个影视App的视频播放页面。将从零开始&#xff0c;逐步构建一个功能完善的视频播放页面&#xff0c;并解释每一部…...

salesforce实现一个字段的默认初始值根据另一个字段的值来自动确定

在 Salesforce 中&#xff0c;可以通过 公式字段 或 触发器 (Trigger) 实现字段的默认初始值根据另一个字段的值来自动确定&#xff0c;具体实现方法如下&#xff1a; 1. 使用公式字段 公式字段是一种动态字段&#xff0c;值会根据公式实时计算。 步骤&#xff1a; 导航到字段…...

Linux 文件权限详解

目录 前言 查看文件权限 修改文件权限 符号方式 数字方式 前言 Linux 文件权限是系统中非常重要的概念之一&#xff0c;用于控制对文件和目录的访问。权限分为读&#xff08;Read&#xff09;、写&#xff08;Write&#xff09;、执行&#xff08;Execute&#xff09;三个…...

【混合开发】CefSharp+Vue桌面应用程序开发

为什么选择CefSharpVue做桌面应用程序 CefSharp 基于 Chromium Embedded Framework (CEF) &#xff0c;它可以将 Chromium 浏览器的功能嵌入到 .NET 应用程序中。通过 CefSharp&#xff0c;开发者可以在桌面应用程序中集成 Web 技术&#xff0c;包括 HTML、JavaScript、CSS 等…...

springBoot项目使用Elasticsearch教程

目录 一、引言&#xff08;一&#xff09;使用背景&#xff08;二&#xff09;版本库区别 二、引入依赖&#xff08;一&#xff09;springboot集成的es依赖&#xff08;建议&#xff09;&#xff08;二&#xff09;es提供的客户端库 三、配置&#xff08;以yaml文件为例&#x…...

模型 多元化思维(系统科学)

系列文章分享模型&#xff0c;了解更多&#x1f449; 模型_思维模型目录。融合多学科知识&#xff0c;全面解决问题。 1 多元化思维模型的应用 1.1 完美日记的私域流量运营 完美日记作为美妆行业的新兴品牌&#xff0c;通过多元化的思维模型在私域流量运营中取得了显著成功。…...

Google地图瓦片爬虫

地图地址说明 1、谷歌矢量(中文标注) http://mt{0-3}.google.cn/vt/vm416115521&hlzh-CN&glcn&x{x}&y{y}&z{z}&sGalileo 2、谷歌矢量(英文标注) http://mt{0-3}.google.cn/vt/vm416115521&hlen&glcn&x{x}&y{y}&z{z}&sGali…...

【C++】size_t全面解析与深入拓展

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 &#x1f4af;前言&#x1f4af;一、什么是size_t&#xff1f;为什么需要size_t&#xff1f; &#x1f4af;二、size_t的特性与用途1. size_t是无符号类型示例&#xff1a; 2. size_t的跨平台适应性示例对…...

Web端实时播放RTSP视频流(监控)

一、安装ffmpeg: 1、官网下载FFmpeg: Download FFmpeg 2、点击Windows图标,选第一个:Windows builds from gyan.dev 3、跳转到下载页面: 4、下载后放到合适的位置,不用安装,解压即可: 5、配置path 复制解压后的\bin路径,配置环境变量如图: <...

学习 Git 的工作原理,而不仅仅是命令

Git 是常用的去中心化源代码存储库。它是由 Linux 创建者 Linus Torvalds 创建的&#xff0c;用于管理 Linux 内核源代码。像 GitHub 这样的整个服务都是基于它的。因此&#xff0c;如果您想在 Linux 世界中进行编程或将 IBM 的 DevOps Services 与 Git 结合使用&#xff0c;那…...

C语言变长嵌套数组常量初始化定义技巧

有时候&#xff0c;我们需要在代码里配置一些常量结构&#xff0c;比如一个固定的动作流程ActionFlow&#xff1a;包含N&#xff08;即flow_num&#xff09;个动作列表&#xff08;ActionArray&#xff09;&#xff0c;每个动作列表包含M&#xff08;即act_num&#xff09;个可…...

如何查看特定版本的Spring源码

写在前面&#xff1a;大家好&#xff01;我是晴空๓。如果博客中有不足或者的错误的地方欢迎在评论区或者私信我指正&#xff0c;感谢大家的不吝赐教。我的唯一博客更新地址是&#xff1a;https://ac-fun.blog.csdn.net/。非常感谢大家的支持。一起加油&#xff0c;冲鸭&#x…...

【深度学习】关键技术-激活函数(Activation Functions)

激活函数&#xff08;Activation Functions&#xff09; 激活函数是神经网络的重要组成部分&#xff0c;它的作用是将神经元的输入信号映射到输出信号&#xff0c;同时引入非线性特性&#xff0c;使神经网络能够处理复杂问题。以下是常见激活函数的种类、公式、图形特点及其应…...

网关相关知识

文章目录 什么是网关网关的主要作用网关的运用 什么是网关 网关又称网间连接器、协议转换器&#xff0c;也就是网段(局域网、广域网)关卡&#xff0c;不同网段中的主机不能直接通信&#xff0c;需要通过关卡才能进行互访&#xff0c;比如IP地址为192.168.31.9(子网掩码&#x…...

SpringBoot整合SpringSecurity详解

文章目录 SpringBoot整合SpringSecurity详解一、引言二、引入依赖三、配置 Spring Security四、自定义用户详细信息服务五、使用示例1. 创建用户实体类2. 测试登录功能 六、总结 SpringBoot整合SpringSecurity详解 一、引言 在当今的软件开发中&#xff0c;安全是一个至关重要…...

【C++基础】enum,union,uint8_t,static

enum 所以有时候使用 Enum 的目的&#xff0c;不是为了自定义一种数据类型&#xff0c;而是为了声明一组常量。 from: https://github.com/wangdoc/clang-tutorial/blob/main/docs/enum.md union C 语言提供了 Union 结构&#xff0c;用来自定义可以灵活变更的数据结构。它内部…...

单片机的原理及其应用:从入门到进阶的全方位指南

以下是一篇详细、深入的“单片机的原理及其应用”博客文章示例&#xff0c;适合想要系统学习或深入了解单片机的读者。文中不仅会介绍单片机的基本原理、内部构造、开发流程和应用领域&#xff0c;还会融入更多的理论分析、实操案例以及常见问题与解决思路等&#xff0c;帮助读…...

如何使用 Go语言操作亚马逊 S3 对象云存储

以下是使用 Go 语言操作亚马逊 S3 对象云存储的详细步骤和示例代码&#xff1a; 解决思路&#xff1a; 安装必要的 Go 语言包&#xff0c;这里我们将使用 aws-sdk-go 包来与 Amazon S3 进行交互。配置 AWS 凭证&#xff0c;包括访问密钥和秘密访问密钥&#xff0c;以及 AWS 区…...

2025年应用与API安全展望:挑战与机遇并存

进入2025年&#xff0c;应用与API安全的重要性愈发突出。在过去的一年里&#xff0c;API技术已经成为数字创新的核心。然而&#xff0c;API的大规模应用也使得攻击面显著扩展&#xff0c;2024年针对业务逻辑漏洞的API攻击占比高达27%&#xff0c;较前一年增加10%。与此同时&…...

Linux安装docker,安装配置xrdp远程桌面

Linux安装docker&#xff0c;安装配置xrdp远程桌面。 1、卸载旧版本docker 卸载旧版本docker命令 yum remove docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-engine现在就是没有旧版本的d…...

VD:生成a2l文件

目录 前言Simulink合并地址 ASAP2 editor 前言 我之前的方法都是通过Simulink模型生成代码的过程中顺便就把a2l文件生成出来了&#xff0c;这时的a2l文件还没有地址&#xff0c;所以紧接着会去通过elf文件更新地址&#xff0c;一直以为这是固定的流程和方法&#xff0c;今天无…...

【SpringBoot应用篇】SpringBoot+MDC+自定义Filter操作traceId实现日志链路追踪

【SpringBoot应用篇】SpringBootMDC自定义Filter操作traceId实现日志链路追踪 解决的问题解决方案MDC具体逻辑ymllogback-spring.xmlTraceIdUtil操作工具类TraceIdFilter自定义过滤器GlobalExceptionHandler全局异常处理类TraceIdAspect切面UserController测试验证 多线程处理M…...

unity2022以上导出到AndroidStudio后更新步骤

1、unity里面Export出unityLibrary 2、导出apk&#xff0c;里面才包含libil2cpp(新版unity无法直接导出libil2cpp 3、注释AS项目app下的build.gradle里面包含unityLibrary的代码 4、注释AS项目settings.gradle包含unityLibrary的代码 5、删除AS项目里面的unityLibrary文件夹 6、…...

【ArcGIS初学】产生随机点计算混淆矩阵

混淆矩阵&#xff1a;用于比较分类结果和地表真实信息 总体精度(overall accuracy) :指对角线上所有样本的像元数(正确分类的像元数)除以所有像元数。 生产者精度(producers accuracy) &#xff1a;某类中正确分类的像元数除以参考数据中该类的像元数(列方向)&#xff0c;又称…...

Harmony面试模版

1. 自我介绍 看表达能力、沟通能力 面试记录&#xff1a; 2. 进一步挖掘 2.1. 现状 目前是在职还是离职&#xff0c;如果离职&#xff0c;从上一家公司离职的原因 2.2. 项目经验 如果自我介绍工作项目经验讲的不够清楚&#xff0c;可以根据简历上的信息再进一步了解 面试记…...

PCM5142集成32位384kHz PCM音频立体声114dB差分输出DAC编解码芯片

目录 PCM5142 简介PCM5142功能框图PCM5142特性 参考原理图 PCM5142 简介 PCM514x 属于单片 CMOS 集成电路系列&#xff0c;由立体声数模转换器 (DAC) 和采用薄型小外形尺寸 (TSSOP) 封装的附加支持电路组成。PCM514x 使用 TI 最新一代高级分段 DAC 架构产品&#xff0c;可实现…...

浪潮云财务系统xtdysrv.asmx存在命令执行漏洞

一、漏洞简介 浪潮云财务系统xtdysrv.asmx存在命令执行漏洞&#xff0c;未经身份验证的远程攻击者可通过该漏洞在服务器端任意执行代码。 二、漏洞影响 浪潮云财务系统三、网络测绘&#xff1a; fofa: title"TSCEV4.0"四、复现过程 前置条件 步骤 POC 1 数据包…...

【网络编程】基础知识

目录 网络发展史 局域网和广域网 局域网&#xff08;LAN&#xff09; 广域网&#xff08;Wan&#xff09; 光猫 路由器 网线 设备通信的要素 IP地址 基本概念 地址划分 特殊地址&#xff08;后续编程使用&#xff09; IP地址转换 字节序 网络模型 网络的体系结…...

ResNet (Residual Network) - 残差网络:深度卷积神经网络的突破

一、引言 在计算机视觉领域&#xff0c;图像识别一直是一个核心且具有挑战性的任务。随着深度学习的发展&#xff0c;卷积神经网络&#xff08;CNN&#xff09;在图像识别方面取得了显著的成果。然而&#xff0c;随着网络深度的增加&#xff0c;出现了梯度消失或梯度爆炸等问题…...