[尚硅谷 flink] 基于时间的合流——双流联结(Join)
文章目录
- 8.1 窗口联结(Window Join)
- 8.2 **间隔联结(Interval Join)**
8.1 窗口联结(Window Join)
Flink为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。
package org.example.watermark;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class WindowJoinDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env.fromElements(Tuple2.of("a", 1),Tuple2.of("a", 2),Tuple2.of("b", 3),Tuple2.of("c", 4)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env.fromElements(Tuple3.of("a", 1, 1),Tuple3.of("a", 11, 1),Tuple3.of("b", 2, 1),Tuple3.of("b", 12, 1),Tuple3.of("c", 14, 1),Tuple3.of("d", 15, 1)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));// TODO window join// 1. 落在同一个时间窗口范围内才能匹配// 2. 根据keyby的key,来进行匹配关联// 3. 只能拿到匹配上的数据,类似有固定时间范围的inner joinDataStream<String> join = ds1.join(ds2).where(r1 -> r1.f0) // ds1的keyby.equalTo(r2 -> r2.f0) // ds2的keyby.window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {/*** 关联上的数据,调用join方法* @param first ds1的数据* @param second ds2的数据* @return* @throws Exception*/@Overridepublic String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {return first + "<----->" + second;}});join.print();env.execute();}
}
其实仔细观察可以发现,窗口join的调用语法和我们熟悉的SQL中表的join非常相似:
SELECT * FROM table1 t1, table2 t2 WHERE t1.id = t2.id;
这句SQL中where子句的表达,等价于inner join … on,所以本身表示的是两张表基于id的“内连接”(inner join)。而Flink中的window join,同样类似于inner join。也就是说,最后处理输出的,只有两条流中数据按key配对成功的那些;如果某个窗口中一条流的数据没有任何另一条流的数据匹配,那么就不会调用JoinFunction的.join()方法,也就没有任何输出了。
8.2 间隔联结(Interval Join)
- 间隔联结的原理
间隔联结具体的定义方式是,我们给定两个时间点,分别叫作间隔的“上界”(upperBound)和“下界”(lowerBound);于是对于一条流(不妨叫作A)中的任意一个数据元素a,就可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp + upperBound],即以a的时间戳为中心,下至下界点、上至上界点的一个闭区间:我们就把这段时间作为可以匹配另一条流数据的“窗口”范围。所以对于另一条流(不妨叫B)中的数据元素b,如果它的时间戳落在了这个区间范围内,a和b就可以成功配对,进而进行计算输出结果。所以匹配的条件为:
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
这里需要注意,做间隔联结的两条流A和B,也必须基于相同的key;下界lowerBound应该小于等于上界upperBound,两者都可正可负;间隔联结目前只支持事件时间语义。
如下图所示,我们可以清楚地看到间隔联结的方式:
下方的流A去间隔联结上方的流B,所以基于A的每个数据元素,都可以开辟一个间隔区间。我们这里设置下界为-2毫秒,上界为1毫秒。于是对于时间戳为2的A中元素,它的可匹配区间就是[0, 3],流B中有时间戳为0、1的两个元素落在这个范围内,所以就可以得到匹配数据对(2, 0)和(2, 1)。同样地,A中时间戳为3的元素,可匹配区间为[1, 4],B中只有时间戳为1的一个数据可以匹配,于是得到匹配数据对(3, 1)。
所以我们可以看到,间隔联结同样是一种内连接(inner join)。与窗口联结不同的是,interval join做匹配的时间段是基于流中数据的,所以并不确定;而且流B中的数据可以不只在一个区间内被匹配。
stream1.keyBy(<KeySelector>).intervalJoin(stream2.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String(){@Overridepublic void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {out.collect(left + "," + right);}});
- 处理迟到数据
//2. 调用 interval join
OutputTag<Tuple2<String, Integer>> ks1LateTag = new OutputTag<>("ks1-late", Types.TUPLE(Types.STRING, Types.INT));
OutputTag<Tuple3<String, Integer, Integer>> ks2LateTag = new OutputTag<>("ks2-late", Types.TUPLE(Types.STRING, Types.INT, Types.INT));SingleOutputStreamOperator<String> process = ks1.intervalJoin(ks2).between(Time.seconds(-2), Time.seconds(2)).sideOutputLeftLateData(ks1LateTag) // 将 ks1的迟到数据,放入侧输出流.sideOutputRightLateData(ks2LateTag) // 将 ks2的迟到数据,放入侧输出流.process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {/*** 两条流的数据匹配上,才会调用这个方法* @param left ks1的数据* @param right ks2的数据* @param ctx 上下文* @param out 采集器* @throws Exception*/@Overridepublic void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception {// 进入这个方法,是关联上的数据out.collect(left + "<------>" + right);}});process.print("主流");
process.getSideOutput(ks1LateTag).printToErr("ks1迟到数据");
process.getSideOutput(ks2LateTag).printToErr("ks2迟到数据");env.execute();
相关文章:

[尚硅谷 flink] 基于时间的合流——双流联结(Join)
文章目录 8.1 窗口联结(Window Join)8.2 **间隔联结(Interval Join)** 8.1 窗口联结(Window Join) Flink为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并…...

怎样恢复已删除的照片?教你3个方法,一键恢复!
很多人喜欢以拍照的形式记录生活,手机里的照片就很容易堆积成山,但当内存不够用时就不得不选择删除。可是这些美好的照片始终是很多人心中抹不去的记忆,那么该怎样恢复已删除的照片呢?下面几招,教你一键恢复࿰…...

植物糖基转移酶数据库-23年-地表最强系列-文献精读-6
pUGTdb: A comprehensive database of plant UDP-dependent glycosyltransferases pUGTdb:植物UDP依赖糖基转移酶的全面数据库 一篇关于植物糖基转移数据库的综述,地表最强,总结的最全面的版本之一,各位看官有推荐请留言评论区~…...

虚拟机打不开
问题 另一个程序已锁定文件的一部分,进程无法访问 打不开磁盘“G:\centeros\hadoop104kl\hadoop100-cl2.vmdk”或它所依赖的某个快照磁盘。 模块“Disk”启动失败。 未能启动虚拟机。 原因 前一次非正常关闭虚拟机导致.lck 文件是VMWare软件的一种磁盘锁文件&…...

MySQL数据库版本为5.5.62,时间戳超出2038年1月19日的解决方案
MySQL数据库版本是 5.5.62,已设置字段的类型为BIGINT,使用FROM_UNIXTIME()函数来转换时间戳,返回NULL。 SELECT FROM_UNIXTIME(1617970800)SELECT FROM_UNIXTIME(2185743121)MySQL数据库版本为5.5.62,已设置字段的类型为BIGINT&a…...
C++20 semaphore(信号量) 详解
头文件在C20中是并发库技术规范(Technical Specification, TS)的一部分。信号量是同步原语,帮助控制多线程程序中对共享资源的访问。头文件提供了标准C方式来使用信号量。 使用环境 Windows:VS中打开项目属性,修改C语…...

【简单讲解下Lisp的学习历程】
🎥博主:程序员不想YY啊 💫CSDN优质创作者,CSDN实力新星,CSDN博客专家 🤗点赞🎈收藏⭐再看💫养成习惯 ✨希望本文对您有所裨益,如有不足之处,欢迎在评论区提出…...

构建高效网络:深入理解正向与反向代理的作用与配置
正向代理 如果把局域网外的互联网环境想象成一个巨大的资源库,则局域网中的客户端要访问互联网则需要通过代理服务器来访问,这种代理成为正向代理。 示例: 用户想要访问 https://chensir.ink (目标服务器)࿰…...

Linux:make/makefile的使用
一、什么是makefile/make 会不会写makefile,从一个侧面说明了一个人是否具备完成大型工程的能力 一个工程中的源文件不计数,其按类型、功能、模块分别放在若干个目录中,makefile定义了一系列的 规则来指定,哪些文件需要先编译&am…...

Java设计模式—策略模式(商场打折)
策略这个词应该怎么理解,打个比方说,我们出门的时候会选择不同的出行方式,比如骑自行车、坐公交、坐火车、坐飞机、坐火箭等等,这些出行方式,每一种都是一个策略。 再比如我们去逛商场,商场现在正在搞活动&…...

FOR循环
oracle从入门到总裁:https://blog.csdn.net/weixin_67859959/article/details/135209645 前面两种循环都要根据条件是否成立而确定循环体的执行,具体循环体执行多少次事先并不知道。 FOR 循环可以控制循环执行的次数,由循环变量控制循环体的…...
C++: 命名空间/C++输入输出/缺省参数/函数重载/引用/内联函数
进入C以后,就翻开了新的篇章。C支持C语言的使用。事实上,C是创建者在发现C语言中有很多不好用的地方(在后续学习中会明显看到)后,在C语言基础上又加入了许多语法,于是就成了C。 1.命名空间 来源ÿ…...

Java | Leetcode Java题解之第13题罗马数字转整数
题目: 题解: class Solution {Map<Character, Integer> symbolValues new HashMap<Character, Integer>() {{put(I, 1);put(V, 5);put(X, 10);put(L, 50);put(C, 100);put(D, 500);put(M, 1000);}};public int romanToInt(String s) {int …...
题目:学习使用register定义变量的方法。
题目:学习使用register定义变量的方法。 There is no nutrition in the blog content. After reading it, you will not only suffer from malnutrition, but also impotence. The blog content is all parallel goods. Those who are worried about being cheated …...

IO_DAY7
1:实现2个终端之间的互相聊天 要求:千万不要做出来2个终端之间的消息发送是读一写的,一定要能够做到,一个终端发送n条消息,另一个终端一条消息都不回复都是没有问题的 终端A: #include<myhead.h> int main(int argc, char…...

大模型学习笔记八:手撕AutoGPT
文章目录 一、功能需求二、演示用例三、核心模块流程图四、代码分析1)Agent类目录创建智能体对象2)开始主流程3)在prompt的main目录输入主prompt和最后prompt4)增加实际的工具集tools(也就是函数)5…...

Java常用API_System——常用方法及代码演示
1.System.exit(int status) 方法的形参int status为状态码,如果是0,说明虚拟机正常停止,如果非0,说明虚拟机非正常停止。需要将程序结束时可以调用这个方法 代码演示: public class Test {public static void main(S…...

neo4j图数据库下载安装配置
neo4j下载地址Index of /doc/neo4j/3.5.8/ 1.说明:jdk 1.8 版本对应的 neo4j 数据库版本 推荐安装3.X版本 2.配置系统环境变量 3.启动 neo4j.bat console 4.访问...
结构化面试-有矛盾的人际沟通题
例题一: 你和小张一起值班,但是小张没来,刚好领导检查发现后批评了他,事后小张埋怨你, 认为你在领导面前表现,并在同事中传播,同事也觉得你不通人情,你怎么处理? 回答&a…...
AI技术创业机会之金融科技
金融科技服务(FinTech)领域正经历着一场由人工智能(AI)技术引领的深刻变革,为创业者提供了无数创新与颠覆传统金融服务模式的机会。以下详述了金融科技服务中AI技术的具体创业机会及其细节与内容,以期为有志于涉足此领域的创业者提供全面的洞察与参考。 一、智能投顾与财…...

TDengine 快速体验(Docker 镜像方式)
简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能,本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用 安装包的方式快…...

K8S认证|CKS题库+答案| 11. AppArmor
目录 11. AppArmor 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作: 1)、切换集群 2)、切换节点 3)、切换到 apparmor 的目录 4)、执行 apparmor 策略模块 5)、修改 pod 文件 6)、…...
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...

Debian系统简介
目录 Debian系统介绍 Debian版本介绍 Debian软件源介绍 软件包管理工具dpkg dpkg核心指令详解 安装软件包 卸载软件包 查询软件包状态 验证软件包完整性 手动处理依赖关系 dpkg vs apt Debian系统介绍 Debian 和 Ubuntu 都是基于 Debian内核 的 Linux 发行版ÿ…...
ssc377d修改flash分区大小
1、flash的分区默认分配16M、 / # df -h Filesystem Size Used Available Use% Mounted on /dev/root 1.9M 1.9M 0 100% / /dev/mtdblock4 3.0M...
可靠性+灵活性:电力载波技术在楼宇自控中的核心价值
可靠性灵活性:电力载波技术在楼宇自控中的核心价值 在智能楼宇的自动化控制中,电力载波技术(PLC)凭借其独特的优势,正成为构建高效、稳定、灵活系统的核心解决方案。它利用现有电力线路传输数据,无需额外布…...

Vue2 第一节_Vue2上手_插值表达式{{}}_访问数据和修改数据_Vue开发者工具
文章目录 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染2. 插值表达式{{}}3. 访问数据和修改数据4. vue响应式5. Vue开发者工具--方便调试 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染 准备容器引包创建Vue实例 new Vue()指定配置项 ->渲染数据 准备一个容器,例如: …...
基础测试工具使用经验
背景 vtune,perf, nsight system等基础测试工具,都是用过的,但是没有记录,都逐渐忘了。所以写这篇博客总结记录一下,只要以后发现新的用法,就记得来编辑补充一下 perf 比较基础的用法: 先改这…...

《通信之道——从微积分到 5G》读书总结
第1章 绪 论 1.1 这是一本什么样的书 通信技术,说到底就是数学。 那些最基础、最本质的部分。 1.2 什么是通信 通信 发送方 接收方 承载信息的信号 解调出其中承载的信息 信息在发送方那里被加工成信号(调制) 把信息从信号中抽取出来&am…...
Axios请求超时重发机制
Axios 超时重新请求实现方案 在 Axios 中实现超时重新请求可以通过以下几种方式: 1. 使用拦截器实现自动重试 import axios from axios;// 创建axios实例 const instance axios.create();// 设置超时时间 instance.defaults.timeout 5000;// 最大重试次数 cons…...