[尚硅谷 flink] 基于时间的合流——双流联结(Join)
文章目录
- 8.1 窗口联结(Window Join)
- 8.2 **间隔联结(Interval Join)**
8.1 窗口联结(Window Join)
Flink为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。
package org.example.watermark;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class WindowJoinDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env.fromElements(Tuple2.of("a", 1),Tuple2.of("a", 2),Tuple2.of("b", 3),Tuple2.of("c", 4)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env.fromElements(Tuple3.of("a", 1, 1),Tuple3.of("a", 11, 1),Tuple3.of("b", 2, 1),Tuple3.of("b", 12, 1),Tuple3.of("c", 14, 1),Tuple3.of("d", 15, 1)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));// TODO window join// 1. 落在同一个时间窗口范围内才能匹配// 2. 根据keyby的key,来进行匹配关联// 3. 只能拿到匹配上的数据,类似有固定时间范围的inner joinDataStream<String> join = ds1.join(ds2).where(r1 -> r1.f0) // ds1的keyby.equalTo(r2 -> r2.f0) // ds2的keyby.window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {/*** 关联上的数据,调用join方法* @param first ds1的数据* @param second ds2的数据* @return* @throws Exception*/@Overridepublic String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {return first + "<----->" + second;}});join.print();env.execute();}
}
其实仔细观察可以发现,窗口join的调用语法和我们熟悉的SQL中表的join非常相似:
SELECT * FROM table1 t1, table2 t2 WHERE t1.id = t2.id;
这句SQL中where子句的表达,等价于inner join … on,所以本身表示的是两张表基于id的“内连接”(inner join)。而Flink中的window join,同样类似于inner join。也就是说,最后处理输出的,只有两条流中数据按key配对成功的那些;如果某个窗口中一条流的数据没有任何另一条流的数据匹配,那么就不会调用JoinFunction的.join()方法,也就没有任何输出了。
8.2 间隔联结(Interval Join)
- 间隔联结的原理
间隔联结具体的定义方式是,我们给定两个时间点,分别叫作间隔的“上界”(upperBound)和“下界”(lowerBound);于是对于一条流(不妨叫作A)中的任意一个数据元素a,就可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp + upperBound],即以a的时间戳为中心,下至下界点、上至上界点的一个闭区间:我们就把这段时间作为可以匹配另一条流数据的“窗口”范围。所以对于另一条流(不妨叫B)中的数据元素b,如果它的时间戳落在了这个区间范围内,a和b就可以成功配对,进而进行计算输出结果。所以匹配的条件为:
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
这里需要注意,做间隔联结的两条流A和B,也必须基于相同的key;下界lowerBound应该小于等于上界upperBound,两者都可正可负;间隔联结目前只支持事件时间语义。
如下图所示,我们可以清楚地看到间隔联结的方式:

下方的流A去间隔联结上方的流B,所以基于A的每个数据元素,都可以开辟一个间隔区间。我们这里设置下界为-2毫秒,上界为1毫秒。于是对于时间戳为2的A中元素,它的可匹配区间就是[0, 3],流B中有时间戳为0、1的两个元素落在这个范围内,所以就可以得到匹配数据对(2, 0)和(2, 1)。同样地,A中时间戳为3的元素,可匹配区间为[1, 4],B中只有时间戳为1的一个数据可以匹配,于是得到匹配数据对(3, 1)。
所以我们可以看到,间隔联结同样是一种内连接(inner join)。与窗口联结不同的是,interval join做匹配的时间段是基于流中数据的,所以并不确定;而且流B中的数据可以不只在一个区间内被匹配。
stream1.keyBy(<KeySelector>).intervalJoin(stream2.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String(){@Overridepublic void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {out.collect(left + "," + right);}});
- 处理迟到数据
//2. 调用 interval join
OutputTag<Tuple2<String, Integer>> ks1LateTag = new OutputTag<>("ks1-late", Types.TUPLE(Types.STRING, Types.INT));
OutputTag<Tuple3<String, Integer, Integer>> ks2LateTag = new OutputTag<>("ks2-late", Types.TUPLE(Types.STRING, Types.INT, Types.INT));SingleOutputStreamOperator<String> process = ks1.intervalJoin(ks2).between(Time.seconds(-2), Time.seconds(2)).sideOutputLeftLateData(ks1LateTag) // 将 ks1的迟到数据,放入侧输出流.sideOutputRightLateData(ks2LateTag) // 将 ks2的迟到数据,放入侧输出流.process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {/*** 两条流的数据匹配上,才会调用这个方法* @param left ks1的数据* @param right ks2的数据* @param ctx 上下文* @param out 采集器* @throws Exception*/@Overridepublic void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception {// 进入这个方法,是关联上的数据out.collect(left + "<------>" + right);}});process.print("主流");
process.getSideOutput(ks1LateTag).printToErr("ks1迟到数据");
process.getSideOutput(ks2LateTag).printToErr("ks2迟到数据");env.execute();
相关文章:
[尚硅谷 flink] 基于时间的合流——双流联结(Join)
文章目录 8.1 窗口联结(Window Join)8.2 **间隔联结(Interval Join)** 8.1 窗口联结(Window Join) Flink为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并…...
怎样恢复已删除的照片?教你3个方法,一键恢复!
很多人喜欢以拍照的形式记录生活,手机里的照片就很容易堆积成山,但当内存不够用时就不得不选择删除。可是这些美好的照片始终是很多人心中抹不去的记忆,那么该怎样恢复已删除的照片呢?下面几招,教你一键恢复࿰…...
植物糖基转移酶数据库-23年-地表最强系列-文献精读-6
pUGTdb: A comprehensive database of plant UDP-dependent glycosyltransferases pUGTdb:植物UDP依赖糖基转移酶的全面数据库 一篇关于植物糖基转移数据库的综述,地表最强,总结的最全面的版本之一,各位看官有推荐请留言评论区~…...
虚拟机打不开
问题 另一个程序已锁定文件的一部分,进程无法访问 打不开磁盘“G:\centeros\hadoop104kl\hadoop100-cl2.vmdk”或它所依赖的某个快照磁盘。 模块“Disk”启动失败。 未能启动虚拟机。 原因 前一次非正常关闭虚拟机导致.lck 文件是VMWare软件的一种磁盘锁文件&…...
MySQL数据库版本为5.5.62,时间戳超出2038年1月19日的解决方案
MySQL数据库版本是 5.5.62,已设置字段的类型为BIGINT,使用FROM_UNIXTIME()函数来转换时间戳,返回NULL。 SELECT FROM_UNIXTIME(1617970800)SELECT FROM_UNIXTIME(2185743121)MySQL数据库版本为5.5.62,已设置字段的类型为BIGINT&a…...
C++20 semaphore(信号量) 详解
头文件在C20中是并发库技术规范(Technical Specification, TS)的一部分。信号量是同步原语,帮助控制多线程程序中对共享资源的访问。头文件提供了标准C方式来使用信号量。 使用环境 Windows:VS中打开项目属性,修改C语…...
【简单讲解下Lisp的学习历程】
🎥博主:程序员不想YY啊 💫CSDN优质创作者,CSDN实力新星,CSDN博客专家 🤗点赞🎈收藏⭐再看💫养成习惯 ✨希望本文对您有所裨益,如有不足之处,欢迎在评论区提出…...
构建高效网络:深入理解正向与反向代理的作用与配置
正向代理 如果把局域网外的互联网环境想象成一个巨大的资源库,则局域网中的客户端要访问互联网则需要通过代理服务器来访问,这种代理成为正向代理。 示例: 用户想要访问 https://chensir.ink (目标服务器)࿰…...
Linux:make/makefile的使用
一、什么是makefile/make 会不会写makefile,从一个侧面说明了一个人是否具备完成大型工程的能力 一个工程中的源文件不计数,其按类型、功能、模块分别放在若干个目录中,makefile定义了一系列的 规则来指定,哪些文件需要先编译&am…...
Java设计模式—策略模式(商场打折)
策略这个词应该怎么理解,打个比方说,我们出门的时候会选择不同的出行方式,比如骑自行车、坐公交、坐火车、坐飞机、坐火箭等等,这些出行方式,每一种都是一个策略。 再比如我们去逛商场,商场现在正在搞活动&…...
FOR循环
oracle从入门到总裁:https://blog.csdn.net/weixin_67859959/article/details/135209645 前面两种循环都要根据条件是否成立而确定循环体的执行,具体循环体执行多少次事先并不知道。 FOR 循环可以控制循环执行的次数,由循环变量控制循环体的…...
C++: 命名空间/C++输入输出/缺省参数/函数重载/引用/内联函数
进入C以后,就翻开了新的篇章。C支持C语言的使用。事实上,C是创建者在发现C语言中有很多不好用的地方(在后续学习中会明显看到)后,在C语言基础上又加入了许多语法,于是就成了C。 1.命名空间 来源ÿ…...
Java | Leetcode Java题解之第13题罗马数字转整数
题目: 题解: class Solution {Map<Character, Integer> symbolValues new HashMap<Character, Integer>() {{put(I, 1);put(V, 5);put(X, 10);put(L, 50);put(C, 100);put(D, 500);put(M, 1000);}};public int romanToInt(String s) {int …...
题目:学习使用register定义变量的方法。
题目:学习使用register定义变量的方法。 There is no nutrition in the blog content. After reading it, you will not only suffer from malnutrition, but also impotence. The blog content is all parallel goods. Those who are worried about being cheated …...
IO_DAY7
1:实现2个终端之间的互相聊天 要求:千万不要做出来2个终端之间的消息发送是读一写的,一定要能够做到,一个终端发送n条消息,另一个终端一条消息都不回复都是没有问题的 终端A: #include<myhead.h> int main(int argc, char…...
大模型学习笔记八:手撕AutoGPT
文章目录 一、功能需求二、演示用例三、核心模块流程图四、代码分析1)Agent类目录创建智能体对象2)开始主流程3)在prompt的main目录输入主prompt和最后prompt4)增加实际的工具集tools(也就是函数)5…...
Java常用API_System——常用方法及代码演示
1.System.exit(int status) 方法的形参int status为状态码,如果是0,说明虚拟机正常停止,如果非0,说明虚拟机非正常停止。需要将程序结束时可以调用这个方法 代码演示: public class Test {public static void main(S…...
neo4j图数据库下载安装配置
neo4j下载地址Index of /doc/neo4j/3.5.8/ 1.说明:jdk 1.8 版本对应的 neo4j 数据库版本 推荐安装3.X版本 2.配置系统环境变量 3.启动 neo4j.bat console 4.访问...
结构化面试-有矛盾的人际沟通题
例题一: 你和小张一起值班,但是小张没来,刚好领导检查发现后批评了他,事后小张埋怨你, 认为你在领导面前表现,并在同事中传播,同事也觉得你不通人情,你怎么处理? 回答&a…...
AI技术创业机会之金融科技
金融科技服务(FinTech)领域正经历着一场由人工智能(AI)技术引领的深刻变革,为创业者提供了无数创新与颠覆传统金融服务模式的机会。以下详述了金融科技服务中AI技术的具体创业机会及其细节与内容,以期为有志于涉足此领域的创业者提供全面的洞察与参考。 一、智能投顾与财…...
基于uniapp+WebSocket实现聊天对话、消息监听、消息推送、聊天室等功能,多端兼容
基于 UniApp + WebSocket实现多端兼容的实时通讯系统,涵盖WebSocket连接建立、消息收发机制、多端兼容性配置、消息实时监听等功能,适配微信小程序、H5、Android、iOS等终端 目录 技术选型分析WebSocket协议优势UniApp跨平台特性WebSocket 基础实现连接管理消息收发连接…...
Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器
第一章 引言:语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域,文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量,支撑着搜索引擎、推荐系统、…...
DBAPI如何优雅的获取单条数据
API如何优雅的获取单条数据 案例一 对于查询类API,查询的是单条数据,比如根据主键ID查询用户信息,sql如下: select id, name, age from user where id #{id}API默认返回的数据格式是多条的,如下: {&qu…...
BCS 2025|百度副总裁陈洋:智能体在安全领域的应用实践
6月5日,2025全球数字经济大会数字安全主论坛暨北京网络安全大会在国家会议中心隆重开幕。百度副总裁陈洋受邀出席,并作《智能体在安全领域的应用实践》主题演讲,分享了在智能体在安全领域的突破性实践。他指出,百度通过将安全能力…...
拉力测试cuda pytorch 把 4070显卡拉满
import torch import timedef stress_test_gpu(matrix_size16384, duration300):"""对GPU进行压力测试,通过持续的矩阵乘法来最大化GPU利用率参数:matrix_size: 矩阵维度大小,增大可提高计算复杂度duration: 测试持续时间(秒&…...
零基础设计模式——行为型模式 - 责任链模式
第四部分:行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习!行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想:使多个对象都有机会处…...
【Oracle】分区表
个人主页:Guiat 归属专栏:Oracle 文章目录 1. 分区表基础概述1.1 分区表的概念与优势1.2 分区类型概览1.3 分区表的工作原理 2. 范围分区 (RANGE Partitioning)2.1 基础范围分区2.1.1 按日期范围分区2.1.2 按数值范围分区 2.2 间隔分区 (INTERVAL Partit…...
Device Mapper 机制
Device Mapper 机制详解 Device Mapper(简称 DM)是 Linux 内核中的一套通用块设备映射框架,为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程,并配以详细的…...
华硕a豆14 Air香氛版,美学与科技的馨香融合
在快节奏的现代生活中,我们渴望一个能激发创想、愉悦感官的工作与生活伙伴,它不仅是冰冷的科技工具,更能触动我们内心深处的细腻情感。正是在这样的期许下,华硕a豆14 Air香氛版翩然而至,它以一种前所未有的方式&#x…...
【无标题】路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论
路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论 一、传统路径模型的根本缺陷 在经典正方形路径问题中(图1): mermaid graph LR A((A)) --- B((B)) B --- C((C)) C --- D((D)) D --- A A -.- C[无直接路径] B -…...
