当前位置: 首页 > news >正文

大数据Flink(八十八):Interval Join(时间区间 Join)

文章目录

Interval Join(时间区间 Join)


Interval Join(时间区间 Join)

Interval Join 定义(支持 Batch\Streaming):Interval Join 在离线的概念中是没有的。Interval Join 可以让一条流去 Join 另一条流中前后一段时间内的数据。

应用场景:为什么有 Regular Join 还要 Interval Join 呢?刚刚的案例也讲了,Regular Join 会产生回撤流,但是在实时数仓中一般写入的 sink 都是类似于 Kafka 这样的消息队列,然后后面接 clickhouse 等引擎,这些引擎又不具备处理回撤流的能力。所以可以理解 Interval Join 就是用于消灭回撤流的。

Interval Join 包含以下几种(以 L 作为左流中的数据标识,R 作为右流中的数据标识):

  • Inner Interval Join:流任务中,只有两条流 Join 到(满足 Join on 中的条件:两条流的数据在时间区间 + 满足其他等值条件)才输出,输出 +[L, R]
  • Left Interval Join:流任务中,左流数据到达之后,如果没有 Join 到右流的数据,就会等待(放在 State 中等),如果之后右流之后数据到达之后,发现能和刚刚那条左流数据 Join 到,则会输出 +[L, R]。事件时间中随着 Watermark 的推进(也支持处理时间)。如果发现发现左流 State 中的数据过期了,就把左流中过期的数据从 State 中删除,然后输出 +[L, null],如果右流 State 中的数据过期了,就直接从 State 中删除。
  • Right Interval Join:和 Left Interval Join 执行逻辑一样,只不过左表和右表的执行逻辑完全相反
  • Full Interval Join:流任务中,左流或者右流的数据到达之后,如果没有 Join 到另外一条流的数据,就会等待(左流放在左流对应的 State 中等,右流放在右流对应的 State 中等),如果之后另一条流数据到达之后,发现能和刚刚那条数据 Join 到,则会输出 +[L, R]。事件时间中随着 Watermark 的推进(也支持处理时间),发现 State 中的数据能够过期了,就将这些数据从 State 中删除并且输出(左流过期输出 +[L, null],右流过期输出 -[null, R])

可以发现 Inner Interval Join 和其他三种 Outer Interval Join 的区别在于,Outer 在随着时间推移的过程中,如果有数据过期了之后,会根据是否是 Outer 将没有 Join 到的数据也给输出。

使用示例:

间隔联接需要至少一个等联接谓词和在两侧限制时间的联接条件。可以通过比较两个输入表中相同类型的时间属性(即处理时间或事件时间)的两个适当的范围谓词(<, <=, >=, >)BETWEEN谓词或单个相等谓词来定义这种条件。

SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId AND
o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime

如果订单在收到订单后四个小时内发货,则上面的示例会将所有订单与其相应的发货合并在一起。

实际案例:还是刚刚的案例,曝光日志关联点击日志筛选既有曝光又有点击的数据,条件是曝光关联之后发生 4 小时之内的点击,并且补充点击的扩展参数(show inner interval click):

下面为 Inner Interval Join:

 

Flink SQL> CREATE TABLE show_log_table (log_id BIGINT,show_params STRING,`timestamp` bigint,row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),watermark for row_time as row_time 
) WITH ('connector' = 'socket','hostname' = 'node1',        'port' = '8888','format' = 'csv'
);Flink SQL> CREATE TABLE click_log_table (log_id BIGINT,click_params     STRING,`timestamp` bigint,row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),watermark for row_time as row_time 
)
WITH ('connector' = 'socket','hostname' = 'node1',        'port' = '9999','format' = 'csv'
);Flink SQL> SELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_params
FROM show_log_table 
INNER JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id
AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '10' SECOND AND click_log_table.row_time;

开启netcat的8888端口,输入数据:

1,hadoop,1658300800 ->2022-07-20 15:06:40

开启netcat的9999端口,输入数据:

1,zhangsan,1658300805 ->2022-07-20 15:06:45

输出结果如下:

s_id                       s_params                 c_id                       c_params1                         hadoop                    1                         zhangsan

9999端口,继续输入数据:

1,zhangsan,1658300811 -> 2022-07-20 15:06:51

输出结果没有反应,因为这个时间:2022-07-20 15:06:51超过了时间区间下限。

如果是 Left Interval Join:

Flink SQL> SELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_params
FROM show_log_table 
LEFT JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id
AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '5' SECOND AND click_log_table.row_time + INTERVAL '5' SECOND;

开启netcat的8888端口,输入数据:

1,hadoop,1658300800		->2022-07-20 15:06:40

开启netcat的9999端口,输入数据:

1,zhangsan,1658300805	->2022-07-20 15:06:45

输出结果如下:

s_id                       s_params                 c_id                       c_params1                         hadoop                    1                       zhangsan

8888端口,继续输入数据:

1,hadoop,1658300801		->2022-07-20 15:06:41
1,hadoop,1658300811		->2022-07-20 15:06:51

输出结果如下:

s_id                       s_params                 c_id                       c_params1                         hadoop                    1                       zhangsan1                         hadoop                    1                       zhangsan

2022-07-20 15:06:51这条数据不会有任何的输出,因为已经超过了右表的边界。

如果是 Full Interval Join:

Flink SQL> SELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_params
FROM show_log_table 
FULL JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id
AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '5' SECOND AND click_log_table.row_time + INTERVAL '5' SECOND;

开启netcat的8888端口,输入数据:

1,hadoop,1658300800		->2022-07-20 15:06:40
1,hadoop,1658300801		->2022-07-20 15:06:41

 开启netcat的9999端口,输入数据:

1,zhangsan,1658300805	->2022-07-20 15:06:45
1,zhangsan,1658300811	->2022-07-20 15:06:51

输出结果如下:

 s_id                       s_params                 c_id                       c_params1                         hadoop                    1                       zhangs1                         hadoop                    1                       zhangsan
  • 关于 Interval Join 的注意事项:

实时 Interval Join 可以不是 等值 join。等值 join 和 非等值 join 区别在于,等值 join 数据 shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游;非等值 join 数据 shuffle 策略是 Global,所有数据发往一个并发,然后将满足条件的数据进行关联输出 


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

相关文章:

大数据Flink(八十八):Interval Join(时间区间 Join)

文章目录 Interval Join&#xff08;时间区间 Join&#xff09; Interval Join&#xff08;时间区间 Join&#xff09; Interval Join 定义&#xff08;支持 Batch\Streaming&#xff09;&#xff1a;Interval Join 在离线的概念中是没有的。Interval Join 可以让一条流去 Jo…...

数字IC笔试千题解--判断题篇(五)

前言 出笔试题汇总&#xff0c;是为了总结秋招可能遇到的问题&#xff0c;做题不是目的&#xff0c;在做题的过程中发现自己的漏洞&#xff0c;巩固基础才是目的。 所有题目结果和解释由笔者给出&#xff0c;答案主观性较强&#xff0c;若有错误欢迎评论区指出&#xff0c;资料…...

Kubernetes(k8s)上搭建一主两从的mysql8集群

Kubernetes上搭建一主两从的mysql8集群 环境准备搭建nfs服务器安装NFS暴露nfs目录开启nfs服务器 安装MySQL集群创建命名空间创建MySQL密码的Secret安装MySQL主节点创建pv和pvc主节点的配置文件部署mysql主节点 安装第一个MySQL Slave节点创建pv和pvc第一个从节点配置文件部署my…...

MySQL备份与恢复

MySQL备份与恢复一、备份1、数据备份的重要性2、数据备份分类2.1 物理备份2.2 逻辑备份 3、数据库备份策略4、常用的备份方法和工具5、数据库上云迁移 二、数据库完全备份1、简介2、物理冷备份与恢复2.1 物理冷备份2.2 备份恢复2.3 补充知识date 3、mysqldump备份与恢复3.1 完全…...

【RTOS学习】单片机中的C语言

&#x1f431;作者&#xff1a;一只大喵咪1201 &#x1f431;专栏&#xff1a;《RTOS学习》 &#x1f525;格言&#xff1a;你只管努力&#xff0c;剩下的交给时间&#xff01; 本喵默认各位小伙伴都会C语言&#xff0c;我们平时学习C语言都是在Windows环境下学习的&#xff0…...

确知波束形成matlab仿真

阵列信号处理中的导向矢量 假设一均匀线性阵列&#xff0c;有N个阵元组成&#xff0c;满足&#xff1a;远场、窄带假设。 图1. 均匀线性阵模型 假设信源发射信号&#xff0c;来波方向为 θ \theta θ&#xff0c;第一个阵元接收到的信号为 x ( t ) x(t) x(t)&#xff0c;则第…...

并发编程相关面试题

线程基础 线程和进程的区别&#xff1a; ----------------------------------------------------------------------- 创建线程的方式&#xff1a; 1 继承Thread类 2 实现runnable接口 3 实现callable 接口&#xff08;有返回值的&#xff09; 4 线程池创建线程 ------…...

Cpp/Qt-day050921Qt

目录 实现使用数据库的登录注册功能 头文件&#xff1a; registrwidget.h: widget.h: 源文件&#xff1a; registrwidget.c: widget.h: 效果图&#xff1a; 思维导图 实现使用数据库的登录注册功能 头文件&#xff1a; registrwidget.h: #ifndef REGISTRWIDGET_H #de…...

视频汇聚/视频云存储/视频监控管理平台EasyCVR分发rtsp流起播慢优化步骤详解

安防视频监控/视频集中存储/云存储/磁盘阵列EasyCVR平台可拓展性强、视频能力灵活、部署轻快&#xff0c;可支持的主流标准协议有国标GB28181、RTSP/Onvif、RTMP等&#xff0c;以及支持厂家私有协议与SDK接入&#xff0c;包括海康Ehome、海大宇等设备的SDK等。平台既具备传统安…...

ElementUI之登陆+注册->饿了吗完成用户登录界面搭建,axios之get请求,axios之post请求,跨域,注册界面

饿了吗完成用户注册登录界面搭建axios之get请求axios之post请求跨域 1.饿了吗完成用户注册登录界面搭建 将端口号8080改为8081 导入依赖&#xff0c;在项目根目录使用命令npm install element-ui -S&#xff0c;添加Element-UI模块 -g&#xff1a;将依赖下载node_glodal全局依…...

2023华为杯研究生数学建模研赛E题出血脑卒中完整论文(含28个详细预处理数据及结果表格)

大家好呀&#xff0c;从发布赛题一直到现在&#xff0c;总算完成了全国研究生数学建模竞赛&#xff08;数模研赛&#xff09;E题完整的成品论文。 本论文可以保证原创&#xff0c;保证高质量。绝不是随便引用一大堆模型和代码复制粘贴进来完全没有应用糊弄人的垃圾半成品论文。…...

Java中的继承是什么?

在Java中&#xff0c;继承是一种面向对象编程的概念&#xff0c;它允许一个类&#xff08;称为子类或派生类&#xff09;继承另一个类&#xff08;称为父类或基类&#xff09;的属性和方法。通过继承&#xff0c;子类可以获得父类的属性和方法&#xff0c;并且可以添加自己的特…...

Python - flask后端开发笔记

​ Flask入门 有一篇很全面的博客可以参考&#xff1a;Python Flask Web 框架入门 跨域问题处理 from flask_cors import CORS CORS(app,supports_credentialsTrue,origins[url], # 前端url列表 ) 文件发送 from flask import send_from_directory app.route(/download) …...

Flutter实现PS钢笔工具,实现高精度抠图的效果。

演示&#xff1a; 代码&#xff1a; import dart:ui;import package:flutter/material.dart hide Image; import package:flutter/services.dart; import package:flutter_screenutil/flutter_screenutil.dart; import package:kq_flutter_widgets/widgets/animate/stack.dart…...

苏宁滑块验证

网址&#xff1a;https://passport.suning.com/ids/login总结一下&#xff0c;别被他的表面现象给骗了&#xff0c;这玩意儿&#xff0c;个人认为&#xff0c;腾讯的都没法跟他比&#xff01;&#xff01;&#xff01; 难点&#xff1a;动态混淆&#xff0c;vmp&#xff0c;图片…...

c语言。。。

gcc thread.c -lpthread -o app -fexec-charsetgbkthread.c为当前目录下编写的c代码 代码中引入了<pthread.h>线程库&#xff0c;所以要加上-lpthread -o app 输出.exe的c可执行文件&#xff0c;文件名为app -fexec-charsetgbk 设置编码方式&#xff0c;防止控制台输出中…...

vue-cli创建项目、vue项目目录结(运行vue项目)、ES6导入导出语法、vue项目编写规范

vue-cli创建项目、vue项目目录结构、 ES6导入导出语法、vue项目编写规范 1 vue-cli创建项目 1.1 vue-cli 命令行创建项目 1.2 使用vue-cli-ui创建 2 vue项目目录结构 2.1 运行vue项目 2.2 vue项目的目录结构 3 es6导入导出语法 4 vue项目编写规范 4.1 修改项目 4.2 以后…...

QT读取DLL加载算法

有这样一个场景&#xff0c;我有一个GUI软件&#xff0c;把他想象成PS软件&#xff0c;集成了很多工具。现在我要添加新算法(PS工具)&#xff0c;该怎么办&#xff1f; 有三种办法&#xff1a; 第一种我把新算法代码加到项目中&#xff0c;编译整个项目。 第二种&#xff0c;新…...

HTTPX-用于Python的下一代HTTP客户端

1、前言 在使用 Python 进行接口自动化时&#xff0c;大多数都会使用 requests 模块&#xff0c;requests 是一个常用的 HTTP 请求库&#xff0c;可以方便地向网站发送 HTTP 请求&#xff0c;并获取响应结果。 本篇将介绍 Python 的下一代 HTTP 客户端 - HTTPX 2、简介 HTT…...

[LLM+AIGC] 01.应用篇之中文ChatGPT初探及利用ChatGPT润色论文对比浅析(文心一言 | 讯飞星火)

近年来&#xff0c;人工智能技术火热发展&#xff0c;尤其是OpenAI在2022年11月30日发布ChatGPT聊天机器人程序&#xff0c;其使用了Transformer神经网络架构&#xff08;GPT-3.5&#xff09;&#xff0c;能够基于在预训练阶段所见的模式、统计规律和知识来生成回答&#xff0c…...

智慧教育——解读AI一体化智慧校园解决方案【附全文阅读】

适应人群为学校管理人员、教师、学生、技术运维人员及教育信息化建设相关从业者。主要内容围绕 AI 一体化智慧校园建设,阐述总体规划及革命性意义(提升教学管理水平、降低成本等);介绍八大应用中心(教学管理、物联网管控、校园安全等),涵盖智能选课排课、校园安防监控等…...

台达 PLC ES 与 3 台欧姆龙 E5CC 温控器通讯程序分享

台达PLC ES与3台欧姆龙E5CC温控器通讯程序 程序带注释&#xff0c;并附送昆仑通态和威纶通触摸屏有接线方式&#xff0c;设置 程序温度可靠 器件&#xff1a;台达DVP ES系列的PLC&#xff0c;3台欧姆龙E5CC系列温控器&#xff0c;昆仑通态&#xff0c;威纶通触摸屏 功能&#x…...

应急响应靶机练习-Web2

一、靶机介绍这个靶机主要是通过暴力破解ftp&#xff0c;获取ftp账号后上传了php shell&#xff0c;获取shell后创建后门用户&#xff0c;以及做了一些端口转发操作。靶机采用phpstudy&#xff0c;开启了ftp和web服务&#xff0c;但是要注意的是&#xff0c;一旦ftp开启&#x…...

ILI9163C SPI TFT驱动库深度解析与嵌入式优化实践

1. TFT_ILI9163C 驱动库深度解析&#xff1a;面向嵌入式系统的高性能SPI TFT显示方案1.1 项目定位与工程价值TFT_ILI9163C 是一款专为嵌入式平台优化的高性能 SPI TFT 显示驱动库&#xff0c;核心目标是在资源受限的微控制器上实现接近硬件极限的图形刷新吞吐量。其设计哲学并非…...

特征精炼残差改进YOLOv26多层卷积与恒等映射协同优化突破

特征精炼残差改进YOLOv26多层卷积与恒等映射协同优化突破 引言 在目标检测领域&#xff0c;特征表示的质量直接决定了模型的检测性能。传统的卷积神经网络在特征提取过程中往往面临着特征退化、信息丢失等问题。为了解决这些挑战&#xff0c;本文提出了一种基于特征精炼残差模…...

**发散创新:基于 Rust的微服务生态构建与性能优化实战**在现代云原生架构中,**Rust语言正迅速成为构建高并发、低延迟微服

发散创新&#xff1a;基于 Rust 的微服务生态构建与性能优化实战 在现代云原生架构中&#xff0c;Rust 语言正迅速成为构建高并发、低延迟微服务的首选工具之一。它不仅提供了媲美 C/C 的性能&#xff0c;还通过所有权机制彻底避免了内存安全问题。本文将围绕 Rust 在微服务生态…...

C语言三大控制结构:零基础学循环与选择

C语言编程里&#xff0c;控制结构用以构架程序逻辑&#xff0c;是新手入门的关键要点&#xff0c;掌握顺序、选择、循环这三大基本控制结构&#xff0c;可使你脱离单纯顺序代码编写&#xff0c;达成更复杂、更灵活的程序逻辑&#xff0c;本文会将C语言控制结构的核心知识点讲解…...

人流后多久干净才算正常?行业洞察与科学修护指南

人工流产后&#xff0c;出血排净时间是判断身体恢复状态的核心指标&#xff0c;也是女性关注的首要问题。结合行业研究与临床实践&#xff0c;本文将深入解析人流后出血的正常范围、异常信号&#xff0c;同时结合行业修护标准&#xff0c;为女性提供科学、实用的恢复指引&#…...

高速移动场景下无线信道的延迟-多普勒域建模与优化

1. 高速移动场景下的无线信道挑战 想象一下你正坐在时速120公里的高铁上刷视频&#xff0c;突然画面开始卡顿——这就是典型的高速移动场景通信问题。当收发端相对速度超过100km/h时&#xff0c;传统无线信道模型就会像老式收音机遇到隧道一样"失灵"。我在参与某车企…...

AGX Orin 部署PyTorch生态:从JetPack适配到torchvision编译避坑指南

1. AGX Orin开发环境初始化 刚拿到AGX Orin开发套件时&#xff0c;很多开发者会直接开始安装PyTorch&#xff0c;但往往忽略了基础环境配置的重要性。我去年在部署一个工业质检项目时就踩过这个坑——当时为了赶进度跳过了JetPack版本检查&#xff0c;结果导致后续torchvision编…...