当前位置: 首页 > news >正文

Flink双流Join

在离线 Hive 中,我们经常会使用 Join 进行多表关联。那么在实时中我们应该如何实现两条流的 Join 呢?Flink DataStream API 为我们提供了3个算子来实现双流 join,分别是:

  • join

  • coGroup

  • intervalJoin

下面我们分别详细看一下这3个算子是如何实现双流 Join 的。

1. Join

Joining | Apache Flink

Join 算子提供的语义为 “Window join”,即按照指定字段和(滚动/滑动/会话)窗口进行内连接(InnerJoin)。Join 将有相同 Key 并且位于同一窗口中的两条流的元素进行关联。

Join 可以支持处理时间和事件时间两种时间特征。

Join 通用用法如下:

stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)

Join 语义类似与离线 Hive 的 InnnerJoin (内连接),这意味着如果一个流中的元素在另一个流中没有相对应的元素,则不会输出该元素。

下面我们看一下 Join 算子在不同类型窗口上的具体表现。

1.1 滚动窗口Join

当在滚动窗口上进行 Join 时,所有有相同 Key 并且位于同一滚动窗口中的两条流的元素两两组合进行关联,并最终传递到 JoinFunction 或 FlatJoinFunction 进行处理。

如上图所示,我们定义了一个大小为 2 秒的滚动窗口,最终产生 [0,1],[2,3],… 这种形式的数据。上图显示了每个窗口中橘色流和绿色流的所有元素成对组合。需要注意的是,在滚动窗口 [6,7] 中,由于绿色流中不存在要与橘色流中元素 6、7 相关联的元素,因此该窗口不会输出任何内容。

下面我们一起看一下如何实现上图所示的滚动窗口 Join:

:::color3 可以通过两个socket流,将数据合并为一个三元组,key,value1,value2

代码演示:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
public class _ShuangLiuJoinDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 并行度不为1 ,效果很难出来,因为本地的并行度是16,只有16个并行度都触发才能看到效果env.setParallelism(1);//2. source-加载数据   key,0,2021-03-26 12:09:00DataStream<Tuple3<String, Integer, String>> greenStream = env.socketTextStream("localhost", 8888).map(new MapFunction<String, Tuple3<String, Integer, String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] arr = line.split(",");System.out.println("绿色:"+ Arrays.toString(arr));return Tuple3.of(arr[0], Integer.valueOf(arr[1]), arr[2]);}})// 因为用到了EventTime 所以势必用到水印,否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {Long timeStamp = 0L;SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date = null;try {date = simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp = date.getTime();System.out.println("绿色的时间:"+timeStamp);System.out.println(element.f0);return timeStamp;}}));;// 以后这个9999少用,因为kafka占用这个端口  key,0,2021-03-26 12:09:00DataStream<Tuple3<String, Integer, String>> orangeStream = env.socketTextStream("localhost", 7777).map(new MapFunction<String, Tuple3<String,Integer,String>>() {@Overridepublic Tuple3<String, Integer, String> map(String line) throws Exception {String[] arr = line.split(",");System.out.println("橘色:"+ Arrays.toString(arr));return Tuple3.of(arr[0],Integer.valueOf(arr[1]),arr[2]);}})// 因为用到了EventTime 所以势必用到水印,否则报错.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {@Overridepublic long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {Long timeStamp = 0L;SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date date = null;try {date = simpleDateFormat.parse(element.f2);} catch (ParseException e) {throw new RuntimeException(e);}timeStamp = date.getTime();System.out.println("橘色的时间:"+timeStamp);return timeStamp;}}));//3. transformation-数据处理转换DataStream resultStream = greenStream.join(orangeStream).where(tup3 -> tup3.f0).equalTo(tup3 -> tup3.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new JoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, Integer, Integer>>() {@Overridepublic Tuple3<String, Integer, Integer> join(Tuple3<String, Integer, String> t1, Tuple3<String, Integer, String> t2) throws Exception {System.out.println(t1.f2);System.out.println(t2.f2);return Tuple3.of(t1.f0, t1.f1, t2.f1);}});//4. sink-数据输出resultStream.print();//5. execute-执行env.execute();}
}

总结非常重要:

1) 要想测试这个效果,需要将并行度设置为1

2)窗口中数据的打印是需要触发的,没有触发的数据,窗口内是不会进行计算的,所以记得输入触发的数据。

假如使用了EventTime 作为时间语义,不管是窗口开始和结束时间还是触发的条件,都跟系统时间没有关系,而跟输入的数据有关系,举例:

假如你的第一条数据是:key,0,2021-03-26 12:09:01 窗口的大小是5s,水印是3秒 ,窗口的开始时间为:

2021-03-26 12:09:00 结束时间是 2021-03-26 12:09:05 ,触发时间是2021-03-26 12:09:08

为什么呢? 水印时间 >= 结束时间

水印时间是:2021-03-26 12:09:08 - 3 = 2021-03-26 12:09:05 >=2021-03-26 12:09:05

:::

如上代码所示为绿色流和橘色流指定 BoundedOutOfOrdernessWatermarks Watermark 策略,设置100毫秒的最大可容忍的延迟时间,同时也会为流分配事件时间戳。假设输入流为 格式,两条流输入元素如下所示:

绿色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,4,2021-03-26 12:09:04
key,5,2021-03-26 12:09:05
key,8,2021-03-26 12:09:08
key,9,2021-03-26 12:09:09
key,11,2021-03-26 12:09:11
​
橘色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,3,2021-03-26 12:09:03
key,4,2021-03-26 12:09:04
key,6,2021-03-26 12:09:06
key,7,2021-03-26 12:09:07
key,11,2021-03-26 12:09:11
1.2 滑动窗口Join [解释一下即 ]

当在滑动窗口上进行 Join 时,所有有相同 Key 并且位于同一滑动窗口中的两条流的元素两两组合进行关联,并最终传递到 JoinFunction 进行处理。

如上图所示,我们定义了一个窗口大小为 2 秒、滑动步长为 1 秒的滑动窗口。需要注意的是,一个元素可能会落在不同的窗口中,因此会在不同窗口中发生关联,例如,绿色流中的0元素。当滑动窗口中一个流的元素在另一个流中没有相对应的元素,则不会输出该元素。

相关文章:

Flink双流Join

在离线 Hive 中&#xff0c;我们经常会使用 Join 进行多表关联。那么在实时中我们应该如何实现两条流的 Join 呢&#xff1f;Flink DataStream API 为我们提供了3个算子来实现双流 join&#xff0c;分别是&#xff1a; join coGroup intervalJoin 下面我们分别详细看一下这…...

【数据结构实战篇】用C语言实现你的私有队列

&#x1f3dd;️专栏&#xff1a;【数据结构实战篇】 &#x1f305;主页&#xff1a;f狐o狸x 在前面的文章中我们用C语言实现了栈的数据结构&#xff0c;本期内容我们将实现队列的数据结构 一、队列的概念 队列&#xff1a;只允许在一端进行插入数据操作&#xff0c;在另一端…...

基于web的海贼王动漫介绍 html+css静态网页设计6页+设计文档

&#x1f4c2;文章目录 一、&#x1f4d4;网站题目 二、✍️网站描述 三、&#x1f4da;网站介绍 四、&#x1f310;网站演示 五、⚙️网站代码 &#x1f9f1;HTML结构代码 &#x1f492;CSS样式代码 六、&#x1f527;完整源码下载 七、&#x1f4e3;更多 一、&#…...

2022 年 9 月青少年软编等考 C 语言三级真题解析

目录 T1. 课程冲突T2. 42 点思路分析T3. 最长下坡思路分析T4. 吃糖果思路分析T5. 放苹果思路分析T1. 课程冲突 此题为 2021 年 9 月三级第一题原题,见 2021 年 9 月青少年软编等考 C 语言三级真题解析中的 T1。 T2. 42 点 42 42 42 是: 组合数学上的第 5 5 5 个卡特兰数字…...

机器学习算法(六)---逻辑回归

常见的十大机器学习算法&#xff1a; 机器学习算法&#xff08;一&#xff09;—决策树 机器学习算法&#xff08;二&#xff09;—支持向量机SVM 机器学习算法&#xff08;三&#xff09;—K近邻 机器学习算法&#xff08;四&#xff09;—集成算法 机器学习算法&#xff08;五…...

计算机科学中的主要协议

1、主要应用层协议&#xff1a; HTTP、FTP、SMTP、POP、IMAP、DNS、TELNET和SSH等 应用层协议的主要功能是支持网络应用&#xff0c;定义了不同应用程序之间的通信规则。它们负责将用户操作转换为网络可以理解的数据格式&#xff0c;并通过传输层进行传输。应用层协议直接与用…...

下载maven 3.6.3并校验文件做md5或SHA512校验

一、下载Apache Maven 3.6.3 Apache Maven 3.6.3 官方下载链接&#xff1a; 二进制压缩包&#xff08;推荐&#xff09;: ZIP格式: https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.zipTAR.GZ格式: https://archive.apache.org/dist/…...

【Android】View工作原理

View 是Android在视觉上的呈现在界面上Android提供了一套GUI库&#xff0c;里面有很多控件&#xff0c;但是很多时候我们并不满足于系统提供的控件&#xff0c;因为这样就意味这应用界面的同类化比较严重。那么怎么才能做出与众不同的效果呢&#xff1f;答案是自定义View&#…...

TIE算法具体求解-为什么是泊松方程和傅里叶变换

二维泊松方程的通俗理解 二维泊松方程 是偏微分方程的一种形式&#xff0c;通常用于描述空间中某个标量场&#xff08;如位相场、电势场&#xff09;的分布规律。其一般形式为&#xff1a; ∇ 2 ϕ ( x , y ) f ( x , y ) \nabla^2 \phi(x, y) f(x, y) ∇2ϕ(x,y)f(x,y) 其…...

postman中获取随机数、唯一ID、时间日期(包括当前日期增减)截取指定位数的字符等

在Postman中&#xff0c;您可以使用内置的动态变量和编写脚本的方式来获取随机数、唯一ID、时间日期以及截取指定位数的字符。以下是具体的操作方法&#xff1a; 一、postman中获取随机数、唯一ID、时间日期&#xff08;包括当前日期增减&#xff09;截取指定位数的字符等 获取…...

【计算机网络】实验3:集线器和交换器的区别及交换器的自学习算法

实验 3&#xff1a;集线器和交换器的区别及交换器的自学习算法 一、 实验目的 加深对集线器和交换器的区别的理解。 了解交换器的自学习算法。 二、 实验环境 • Cisco Packet Tracer 模拟器 三、 实验内容 1、熟悉集线器和交换器的区别 (1) 第一步&#xff1a;构建网络…...

flink学习(14)—— 双流join

概述 Join:内连接 CoGroup&#xff1a;内连接&#xff0c;左连接&#xff0c;右连接 Interval Join&#xff1a;点对面 Join 1、Join 将有相同 Key 并且位于同一窗口中的两条流的元素进行关联。 2、Join 可以支持处理时间&#xff08;processing time&#xff09;和事件时…...

HTTP协议详解:从HTTP/1.0到HTTP/3的演变与优化

深入浅出&#xff1a;从头到尾全面解析HTTP协议 一、HTTP协议概述 1.1 HTTP协议简介 HTTP&#xff08;HyperText Transfer Protocol&#xff0c;超文本传输协议&#xff09;是互联网上应用最广泛的通信协议之一。它用于客户端与服务器之间的数据传输&#xff0c;尤其是在Web…...

张量并行和流水线并行在Transformer中的具体部位

目录 张量并行和流水线并行在Transformer中的具体部位 一、张量并行 二、流水线并行 张量并行和流水线并行在Transformer中的具体部位 张量并行和流水线并行是Transformer模型中用于提高训练效率的两种并行策略。它们分别作用于模型的不同部位,以下是对这两种并行的具体说…...

WEB开发: 丢掉包袱,拥抱ASP.NET CORE!

今天的 Web 开发可以说进入了一个全新的时代&#xff0c;前后端分离、云原生、微服务等等一系列现代技术架构应运而生。在这个背景下&#xff0c;作为开发者&#xff0c;你一定希望找到一个高效、灵活、易于扩展且具有良好性能的框架。那么&#xff0c;ASP.NET Core 显然是一个…...

【论文阅读】Federated learning backdoor attack detection with persistence diagram

目的&#xff1a;检测联邦学习环境下&#xff0c;上传上来的模型是不是恶意的。 1、将一个模型转换为|L|个PD,&#xff08;其中|L|为层数&#xff09; 如何将每一层转换成一个PD&#xff1f; 为了评估第&#x1d457;层的激活值&#xff0c;我们需要&#x1d450;个输入来获…...

Gooxi Eagle Stream 2U双路通用服务器:性能强劲 灵活扩展 稳定易用

人工智能的高速发展开启了飞轮效应&#xff0c;实施数字化变革成为了企业的一道“抢答题”和“必答题”&#xff0c;而数据已成为现代企业的命脉。以HPC和AI为代表的新业务就像节节攀高的树梢&#xff0c;象征着业务创新和企业成长。但在树梢之下&#xff0c;真正让企业保持成长…...

【计算机网络】实验2:总线型以太网的特性

实验 2&#xff1a;总线型以太网的特性 一、 实验目的 加深对MAC地址&#xff0c;IP地址&#xff0c;ARP协议的理解。 了解总线型以太网的特性&#xff08;广播&#xff0c;竞争总线&#xff0c;冲突&#xff09;。 二、 实验环境 • Cisco Packet Tracer 模拟器 三、 实…...

如何在Spark中使用gbdt模型分布式预测

这目录 1 训练gbdt模型2 第三方包python环境打包3 Spark中使用gbdt模型3.1 spark配置文件3.2 主函数main.py 4 spark任务提交 1 训练gbdt模型 我们可以基于lightgbm快速的训练一个gbdt模型&#xff0c;训练相对比较简单&#xff0c;只要把训练样本处理好&#xff0c;几行代码可…...

Qt-5.14.2 example

官方历程很丰富&#xff0c;modbus、串口、chart图表、3D、视频 共享方便使用 Building and Running an Example You can test that your Qt installation is successful by opening an existing example application project. To run an example application on an Android …...

CentOS 7 Minimal安装后,别急着装图形界面!先试试这个命令搞定粘贴和联网

CentOS 7 Minimal安装后的高效运维起点&#xff1a;命令行解决粘贴与联网难题当你第一次启动刚安装好的CentOS 7 Minimal系统&#xff0c;面对漆黑终端闪烁的光标&#xff0c;是否感到一丝不安&#xff1f;许多新手在遇到无法从宿主机粘贴命令或无法联网时&#xff0c;第一反应…...

利用Taotoken模型广场为你的智能客服场景选择合适的大模型

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 利用Taotoken模型广场为你的智能客服场景选择合适的大模型 构建智能客服系统时&#xff0c;一个核心决策是选择合适的大语言模型。…...

终极暗黑破坏神2存档编辑器:轻松修改单机角色的完整指南

终极暗黑破坏神2存档编辑器&#xff1a;轻松修改单机角色的完整指南 【免费下载链接】d2s-editor 项目地址: https://gitcode.com/gh_mirrors/d2/d2s-editor 还在为暗黑破坏神2单机存档的管理而烦恼吗&#xff1f;d2s-editor是一款功能强大的暗黑破坏神2存档编辑器&…...

MySQL 空间数据类型 GIS:地图功能的数据库实现

开场白 做 LBS&#xff08;基于位置的服务&#xff09;的时候&#xff0c;很多人直接用经纬度两个字段存&#xff0c;然后算距离用公式在应用层算。数据量小的时候没问题&#xff0c;数据一大&#xff0c;每次查附近的人都要全表扫描算一遍距离&#xff0c;性能根本扛不住。我…...

Playwright安装失败排障指南:五种生产级部署方式

1. 为什么“mcp-playwright”安装总卡在第一步&#xff1f;——先破除三个普遍误解你是不是也遇到过这样的情况&#xff1a;在终端里敲下pip install mcp-playwright&#xff0c;回车后等了三分钟&#xff0c;结果弹出一长串红色报错&#xff0c;最后一行赫然写着ERROR: No mat…...

从激活困境到一键解放:KMS_VL_ALL_AIO如何重塑你的Windows与Office体验

从激活困境到一键解放&#xff1a;KMS_VL_ALL_AIO如何重塑你的Windows与Office体验 【免费下载链接】KMS_VL_ALL_AIO Smart Activation Script 项目地址: https://gitcode.com/gh_mirrors/km/KMS_VL_ALL_AIO 你是否曾为Windows激活问题而烦恼&#xff1f;每次重装系统后…...

Applera1n终极指南:如何在iOS 15-16设备上完整绕过iCloud激活锁

Applera1n终极指南&#xff1a;如何在iOS 15-16设备上完整绕过iCloud激活锁 【免费下载链接】applera1n icloud bypass for ios 15-16 项目地址: https://gitcode.com/gh_mirrors/ap/applera1n Applera1n是一款专门针对iOS 15-16系统的iCloud激活锁绕过工具&#xff0c;…...

对比直接使用厂商API体验Taotoken聚合服务的稳定性

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 对比直接使用厂商API体验Taotoken聚合服务的稳定性 在集成大模型能力到实际业务的过程中&#xff0c;开发者除了关注模型效果和成本…...

5分钟解锁Switch终极性能:Atmosphere大气层系统完全指南

5分钟解锁Switch终极性能&#xff1a;Atmosphere大气层系统完全指南 【免费下载链接】Atmosphere-stable 大气层整合包系统稳定版 项目地址: https://gitcode.com/gh_mirrors/at/Atmosphere-stable 想让你的Nintendo Switch游戏体验彻底升级吗&#xff1f;Atmosphere-st…...

Cursor Pro破解终极指南:5步实现永久免费使用的完整解决方案

Cursor Pro破解终极指南&#xff1a;5步实现永久免费使用的完整解决方案 【免费下载链接】cursor-free-vip [Support 0.45]&#xff08;Multi Language 多语言&#xff09;自动注册 Cursor Ai &#xff0c;自动重置机器ID &#xff0c; 免费升级使用Pro 功能: Youve reached yo…...