大数据-玩转数据-双流JOIN
一、双流JOIN
在Flink中, 支持两种方式的流的Join: Window Join和Interval Join
二、Window Join
窗口join会join具有相同的key并且处于同一个窗口中的两个流的元素.
注意:
1.所有的窗口join都是 inner join, 意味着a流中的元素如果在b流中没有对应的, 则a流中这个元素就不会处理(就是忽略掉了)
2.join成功后的元素的会以所在窗口的最大时间作为其时间戳. 例如窗口[5,10), 则元素会以9作为自己的时间戳。
Window join 仍然可分为 滚动窗口、滑动窗口Join、会话窗口Join
滚动窗口Join代码段示例

package com.lyh.flink12;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;/*** @Author lizhenchao@atguigu.cn* @Date 2021/1/24 22:09*/
public class Flink01_Join_Window_Tumbling {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> s1 = env.socketTextStream("hadoop100", 8888) // 在socket终端只输入毫秒级别的时间戳.map(value -> {String[] datas = value.split(",");return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {return element.getTs() * 1000;}}));SingleOutputStreamOperator<WaterSensor> s2 = env.socketTextStream("hadoop100", 9999) // 在socket终端只输入毫秒级别的时间戳.map(value -> {String[] datas = value.split(",");return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {return element.getTs() * 1000;}}));s1.join(s2).where(WaterSensor::getId).equalTo(WaterSensor::getId).window(TumblingEventTimeWindows.of(Time.seconds(5))) // 必须使用窗口.apply(new JoinFunction<WaterSensor, WaterSensor, String>() {@Overridepublic String join(WaterSensor first, WaterSensor second) throws Exception {return "first: " + first + ", second: " + second;}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}
运行结果:

三、Interval Join
间隔流join(Interval Join), 是指使用一个流的数据按照key去join另外一条流的指定范围的数据.
如下图: 橙色的流去join绿色的流.范围是由橙色流的event-time + lower bound和event-time + upper bound来决定的.
orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound

Interval Join只支持event-time
必须是keyBy之后的流才可以interval join
package com.lyh.flink12;
import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.planner.expressions.In;
import org.apache.flink.util.Collector;
import java.time.Duration;public class Sql_Join_Windows_Interval{public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(2);SingleOutputStreamOperator<WaterSensor> s1 = env.socketTextStream("hadoop100", 8888).map(value -> {String[] data = value.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long timestamp) {return element.getTs();}}));SingleOutputStreamOperator<WaterSensor> s2 = env.socketTextStream("hadoop100", 9999).map(value -> {String[] data = value.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long timestamp) {return element.getTs();}}));s1.keyBy(WaterSensor::getId).intervalJoin(s2.keyBy(WaterSensor::getId)).between(Time.seconds(-2),Time.seconds(3)).process(new ProcessJoinFunction<WaterSensor, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor left,WaterSensor right,Context ctx,Collector<String> out) throws Exception {out.collect(left + "," + right);}}).print();try{env.execute();} catch (Exception e){e.printStackTrace();}}}
运行结果:

相关文章:
大数据-玩转数据-双流JOIN
一、双流JOIN 在Flink中, 支持两种方式的流的Join: Window Join和Interval Join 二、Window Join 窗口join会join具有相同的key并且处于同一个窗口中的两个流的元素. 注意: 1.所有的窗口join都是 inner join, 意味着a流中的元素如果在b流中没有对应的, 则a流中这个元素就不会…...
from PIL import Image,文字成图,ImageFont import jieba分词,input优雅python绘制图片
开始的代码 import os from PIL import Image, ImageDraw, ImageFont import jiebadef generate_image_with_white_bg(text, font_path, output_path):# 设置图片大小和背景颜色image_width 800image_height 600bg_color (255, 255, 255) # 白色# 创建图片对象image Imag…...
渗透测试信息收集方法笔记
一、指纹识别 1、钟馗之眼https://www.zoomeye.org/ 2、天眼查https://www.tianyancha.com/ 3、工具:御剑WEB指纹识别系统正式版,可以查网站用了哪些框架,什么版本,有哪些漏洞 4、kali whatweb 二、信息泄露 1、csdn https://www.…...
协议栈——连接服务器
如对方的ip和port配置信息,这里的连接是指通信前的准备工作 上一篇介绍查看套接字的命令时,可以看到很多信息,但是刚刚创建出来的套接字是什么信息都没有的,协议栈也因此不知道和谁通信; 客户端填补信息 这一步中调…...
数据结构--队列与循环队列的实现
数据结构–队列的实现 1.队列的定义 比如有一个人叫做张三,这天他要去医院看病,看病时就需要先挂号,由于他来的比较晚,所以他的号码就比较大,来的比较早的号码就比较小,需要到就诊窗口从小号到大依次排队,前面的小号就诊结束之后,才会轮到大号来,小号每就诊完毕就销毁,每新来…...
数据结构—栈、队列、链表
一、栈 Stack(存取O(1)) 先进后出,进去123,出来321。 基于数组:最后一位为栈尾,用于取操作。 基于链表:第一位为栈尾,用于取操作。 1.1、数组栈 /*** 基于数组实现的顺序栈&#…...
2023年4月到7月工作经历
2023年4 有同事说程序崩溃一起分析得结果 unsigned uNum 2; std::string str "abc" uNum; std::cout << str; 结果是c 。如果uNum 很大的话,就可能崩溃。 unsigned uNum 2; //std::string str "abc" uN…...
嵌入式Linux应用开发-驱动大全-同步与互斥③
嵌入式Linux应用开发-驱动大全-同步与互斥③ 第一章 同步与互斥③1.4 Linux锁的介绍与使用1.4.1 锁的类型1.4.1.1 自旋锁1.4.1.2 睡眠锁 1.4.2 锁的内核函数1.4.2.1 自旋锁1.4.2.2 信号量1.4.2.3 互斥量1.4.2.4 semaphore和 mutex的区别 1.4.3 何时用何种锁1.4.4 内核抢占(pree…...
力扣-383.赎金信
Idea 使用一个hashmap 或者一个int数组存储第二次字符串中每一个字符及其出现的次数 遍历第一个字符串,讲出现的重复字符减1,若该字符次数已经为0,则返回false AC Code class Solution { public:bool canConstruct(string ransomNote, strin…...
计算机网络 第二章物理层
计算机网络第二章知识点速刷 其中重要的是信源和信宿,以及调制解调器在通信模型当中起到的作用。...
uniapp:动态修改页面标题
我们经常遇到这种情况,点击新增按钮,进入一个空白表单页面,点击修改按钮,其实也是进入这个表单页面,只是表单内容已经被数据库的记录反显了,为了区别页面,我们还需要动态设置页面的标题…...
java学生管理系统
一、项目概述 本学生管理系统旨在提供一个方便的界面,用于学校或机构管理学生信息,包括学生基本信息、课程成绩等。 二、系统架构 系统采用经典的三层架构,包括前端使用JavaSwing,后端采用Java Servlet,数据库使用M…...
Docker和容器化:简介和使用案例
Docker和容器化:简介和使用案例 引言 容器化技术在近年来变得越来越流行,为开发人员和运维团队提供了更加灵活、高效的软件部署和管理方式。其中,Docker是最为知名和广泛使用的容器化平台之一。本篇博客文章将介绍Docker和容器化的基本概念…...
(高阶) Redis 7 第18讲 RedLock 分布式锁
🌹 以下分享 RedLock 分布式锁,如有问题请指教。🌹🌹 如你对技术也感兴趣,欢迎交流。🌹🌹🌹 如有对阁下帮助,请👍点赞💖收藏🐱🏍分享😀 问题 分布式锁问题从(高阶) Redis 7 第17讲 分布式锁 实战篇_PJ码匠人的博客-CSDN博客 这篇文章来看,…...
嵌入式软件架构基础设施设计方法
大家好,今天分享一篇嵌入式软件架构设计相关的文章。 软件架构这东西,众说纷纭,各有观点。在我看来,软件架构是软件系统的基本结构,包含其组件、组件之间的关系、组件设计与演进的规则,以及体现这些规则的基…...
MySQL进阶_3.性能分析工具的使用
文章目录 第一节、数据库服务器的优化步骤第二节、查看系统性能参数第三节、 慢查询日志第四节、 查看 SQL 执行成本第五节、 分析查询语句:EXPLAIN5.1 基本语法5.2 EXPLAIN各列作用 第一节、数据库服务器的优化步骤 当我们遇到数据库调优问题的时候,可…...
Scala第十三章节
Scala第十三章节 1. 高阶函数介绍 2. 作为值的函数 3. 匿名函数 4. 柯里化 5. 闭包 6. 控制抽象 7. 案例: 计算器 scala总目录 文档资料下载...
Nginx高级 第一部分:扩容
Nginx高级 第一部分:扩容 通过扩容提升整体吞吐量 1.单机垂直扩容:硬件资源增加 云服务资源增加 整机:IBM、浪潮、DELL、HP等 CPU/主板:更新到主流 网卡:10G/40G网卡 磁盘:SAS(SCSI) HDD(机械…...
vue项目上线后去除控制台所有console.log打印-配置说明
方式一 npm i babel-plugin-transform-remove-console --save-dev babel.config.js文件中添加 // 然后在babel.config.js中添加判断 const prodPlugin []if (process.env.NODE_ENV production) { // 如果是生产环境,则自动清理掉打印的日志,但保留…...
《XSS-Labs》02. Level 11~20
XSS-Labs 索引Level-11题解 Level-12题解 Level-13题解 Level-14题解 Level-15题解 Level-16题解 Level-17题解 Level-18~20题解 靶场部署在 VMware - Win7。 靶场地址:https://github.com/do0dl3/xss-labs 只要手动注入恶意 JavaScript 脚本成功,就可以…...
【网络】每天掌握一个Linux命令 - iftop
在Linux系统中,iftop是网络管理的得力助手,能实时监控网络流量、连接情况等,帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...
超短脉冲激光自聚焦效应
前言与目录 强激光引起自聚焦效应机理 超短脉冲激光在脆性材料内部加工时引起的自聚焦效应,这是一种非线性光学现象,主要涉及光学克尔效应和材料的非线性光学特性。 自聚焦效应可以产生局部的强光场,对材料产生非线性响应,可能…...
Lombok 的 @Data 注解失效,未生成 getter/setter 方法引发的HTTP 406 错误
HTTP 状态码 406 (Not Acceptable) 和 500 (Internal Server Error) 是两类完全不同的错误,它们的含义、原因和解决方法都有显著区别。以下是详细对比: 1. HTTP 406 (Not Acceptable) 含义: 客户端请求的内容类型与服务器支持的内容类型不匹…...
React hook之useRef
React useRef 详解 useRef 是 React 提供的一个 Hook,用于在函数组件中创建可变的引用对象。它在 React 开发中有多种重要用途,下面我将全面详细地介绍它的特性和用法。 基本概念 1. 创建 ref const refContainer useRef(initialValue);initialValu…...
2025盘古石杯决赛【手机取证】
前言 第三届盘古石杯国际电子数据取证大赛决赛 最后一题没有解出来,实在找不到,希望有大佬教一下我。 还有就会议时间,我感觉不是图片时间,因为在电脑看到是其他时间用老会议系统开的会。 手机取证 1、分析鸿蒙手机检材&#x…...
什么是EULA和DPA
文章目录 EULA(End User License Agreement)DPA(Data Protection Agreement)一、定义与背景二、核心内容三、法律效力与责任四、实际应用与意义 EULA(End User License Agreement) 定义: EULA即…...
PL0语法,分析器实现!
简介 PL/0 是一种简单的编程语言,通常用于教学编译原理。它的语法结构清晰,功能包括常量定义、变量声明、过程(子程序)定义以及基本的控制结构(如条件语句和循环语句)。 PL/0 语法规范 PL/0 是一种教学用的小型编程语言,由 Niklaus Wirth 设计,用于展示编译原理的核…...
学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2
每日一言 今天的每一份坚持,都是在为未来积攒底气。 案例:OLED显示一个A 这边观察到一个点,怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 : 如果代码里信号切换太快(比如 SDA 刚变,SCL 立刻变&#…...
QT3D学习笔记——圆台、圆锥
类名作用Qt3DWindow3D渲染窗口容器QEntity场景中的实体(对象或容器)QCamera控制观察视角QPointLight点光源QConeMesh圆锥几何网格QTransform控制实体的位置/旋转/缩放QPhongMaterialPhong光照材质(定义颜色、反光等)QFirstPersonC…...
C#中的CLR属性、依赖属性与附加属性
CLR属性的主要特征 封装性: 隐藏字段的实现细节 提供对字段的受控访问 访问控制: 可单独设置get/set访问器的可见性 可创建只读或只写属性 计算属性: 可以在getter中执行计算逻辑 不需要直接对应一个字段 验证逻辑: 可以…...
