大数据-玩转数据-双流JOIN
一、双流JOIN
在Flink中, 支持两种方式的流的Join: Window Join和Interval Join
二、Window Join
窗口join会join具有相同的key并且处于同一个窗口中的两个流的元素.
 注意:
 1.所有的窗口join都是 inner join, 意味着a流中的元素如果在b流中没有对应的, 则a流中这个元素就不会处理(就是忽略掉了)
 2.join成功后的元素的会以所在窗口的最大时间作为其时间戳. 例如窗口[5,10), 则元素会以9作为自己的时间戳。
 Window join 仍然可分为 滚动窗口、滑动窗口Join、会话窗口Join
滚动窗口Join代码段示例
 
package com.lyh.flink12;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;/*** @Author lizhenchao@atguigu.cn* @Date 2021/1/24 22:09*/
public class Flink01_Join_Window_Tumbling {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> s1 = env.socketTextStream("hadoop100", 8888)  // 在socket终端只输入毫秒级别的时间戳.map(value -> {String[] datas = value.split(",");return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {return element.getTs() * 1000;}}));SingleOutputStreamOperator<WaterSensor> s2 = env.socketTextStream("hadoop100", 9999)  // 在socket终端只输入毫秒级别的时间戳.map(value -> {String[] datas = value.split(",");return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {return element.getTs() * 1000;}}));s1.join(s2).where(WaterSensor::getId).equalTo(WaterSensor::getId).window(TumblingEventTimeWindows.of(Time.seconds(5))) // 必须使用窗口.apply(new JoinFunction<WaterSensor, WaterSensor, String>() {@Overridepublic String join(WaterSensor first, WaterSensor second) throws Exception {return "first: " + first + ", second: " + second;}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}
运行结果:
 
三、Interval Join
间隔流join(Interval Join), 是指使用一个流的数据按照key去join另外一条流的指定范围的数据.
 如下图: 橙色的流去join绿色的流.范围是由橙色流的event-time + lower bound和event-time + upper bound来决定的.
 orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound

 Interval Join只支持event-time
 必须是keyBy之后的流才可以interval join
package com.lyh.flink12;
import com.lyh.bean.WaterSensor;
 import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.table.planner.expressions.In;
 import org.apache.flink.util.Collector;
import java.time.Duration;public class  Sql_Join_Windows_Interval{public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(2);SingleOutputStreamOperator<WaterSensor> s1 = env.socketTextStream("hadoop100", 8888).map(value -> {String[] data = value.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long timestamp) {return element.getTs();}}));SingleOutputStreamOperator<WaterSensor> s2 = env.socketTextStream("hadoop100", 9999).map(value -> {String[] data = value.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long timestamp) {return element.getTs();}}));s1.keyBy(WaterSensor::getId).intervalJoin(s2.keyBy(WaterSensor::getId)).between(Time.seconds(-2),Time.seconds(3)).process(new ProcessJoinFunction<WaterSensor, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor left,WaterSensor right,Context ctx,Collector<String> out) throws Exception {out.collect(left + "," + right);}}).print();try{env.execute();} catch (Exception e){e.printStackTrace();}}}
运行结果:
 
相关文章:
 
大数据-玩转数据-双流JOIN
一、双流JOIN 在Flink中, 支持两种方式的流的Join: Window Join和Interval Join 二、Window Join 窗口join会join具有相同的key并且处于同一个窗口中的两个流的元素. 注意: 1.所有的窗口join都是 inner join, 意味着a流中的元素如果在b流中没有对应的, 则a流中这个元素就不会…...
 
from PIL import Image,文字成图,ImageFont import jieba分词,input优雅python绘制图片
开始的代码 import os from PIL import Image, ImageDraw, ImageFont import jiebadef generate_image_with_white_bg(text, font_path, output_path):# 设置图片大小和背景颜色image_width 800image_height 600bg_color (255, 255, 255) # 白色# 创建图片对象image Imag…...
渗透测试信息收集方法笔记
一、指纹识别 1、钟馗之眼https://www.zoomeye.org/ 2、天眼查https://www.tianyancha.com/ 3、工具:御剑WEB指纹识别系统正式版,可以查网站用了哪些框架,什么版本,有哪些漏洞 4、kali whatweb 二、信息泄露 1、csdn https://www.…...
 
协议栈——连接服务器
如对方的ip和port配置信息,这里的连接是指通信前的准备工作 上一篇介绍查看套接字的命令时,可以看到很多信息,但是刚刚创建出来的套接字是什么信息都没有的,协议栈也因此不知道和谁通信; 客户端填补信息 这一步中调…...
 
数据结构--队列与循环队列的实现
数据结构–队列的实现 1.队列的定义 比如有一个人叫做张三,这天他要去医院看病,看病时就需要先挂号,由于他来的比较晚,所以他的号码就比较大,来的比较早的号码就比较小,需要到就诊窗口从小号到大依次排队,前面的小号就诊结束之后,才会轮到大号来,小号每就诊完毕就销毁,每新来…...
 
数据结构—栈、队列、链表
一、栈 Stack(存取O(1)) 先进后出,进去123,出来321。 基于数组:最后一位为栈尾,用于取操作。 基于链表:第一位为栈尾,用于取操作。 1.1、数组栈 /*** 基于数组实现的顺序栈&#…...
2023年4月到7月工作经历
2023年4 有同事说程序崩溃一起分析得结果 unsigned uNum 2; std::string str "abc" uNum; std::cout << str; 结果是c 。如果uNum 很大的话,就可能崩溃。 unsigned uNum 2; //std::string str "abc" uN…...
 
嵌入式Linux应用开发-驱动大全-同步与互斥③
嵌入式Linux应用开发-驱动大全-同步与互斥③ 第一章 同步与互斥③1.4 Linux锁的介绍与使用1.4.1 锁的类型1.4.1.1 自旋锁1.4.1.2 睡眠锁 1.4.2 锁的内核函数1.4.2.1 自旋锁1.4.2.2 信号量1.4.2.3 互斥量1.4.2.4 semaphore和 mutex的区别 1.4.3 何时用何种锁1.4.4 内核抢占(pree…...
 
力扣-383.赎金信
Idea 使用一个hashmap 或者一个int数组存储第二次字符串中每一个字符及其出现的次数 遍历第一个字符串,讲出现的重复字符减1,若该字符次数已经为0,则返回false AC Code class Solution { public:bool canConstruct(string ransomNote, strin…...
 
计算机网络 第二章物理层
计算机网络第二章知识点速刷 其中重要的是信源和信宿,以及调制解调器在通信模型当中起到的作用。...
uniapp:动态修改页面标题
我们经常遇到这种情况,点击新增按钮,进入一个空白表单页面,点击修改按钮,其实也是进入这个表单页面,只是表单内容已经被数据库的记录反显了,为了区别页面,我们还需要动态设置页面的标题…...
 
java学生管理系统
一、项目概述 本学生管理系统旨在提供一个方便的界面,用于学校或机构管理学生信息,包括学生基本信息、课程成绩等。 二、系统架构 系统采用经典的三层架构,包括前端使用JavaSwing,后端采用Java Servlet,数据库使用M…...
Docker和容器化:简介和使用案例
Docker和容器化:简介和使用案例 引言 容器化技术在近年来变得越来越流行,为开发人员和运维团队提供了更加灵活、高效的软件部署和管理方式。其中,Docker是最为知名和广泛使用的容器化平台之一。本篇博客文章将介绍Docker和容器化的基本概念…...
 
(高阶) Redis 7 第18讲 RedLock 分布式锁
🌹 以下分享 RedLock 分布式锁,如有问题请指教。🌹🌹 如你对技术也感兴趣,欢迎交流。🌹🌹🌹 如有对阁下帮助,请👍点赞💖收藏🐱🏍分享😀 问题 分布式锁问题从(高阶) Redis 7 第17讲 分布式锁 实战篇_PJ码匠人的博客-CSDN博客 这篇文章来看,…...
 
嵌入式软件架构基础设施设计方法
大家好,今天分享一篇嵌入式软件架构设计相关的文章。 软件架构这东西,众说纷纭,各有观点。在我看来,软件架构是软件系统的基本结构,包含其组件、组件之间的关系、组件设计与演进的规则,以及体现这些规则的基…...
 
MySQL进阶_3.性能分析工具的使用
文章目录 第一节、数据库服务器的优化步骤第二节、查看系统性能参数第三节、 慢查询日志第四节、 查看 SQL 执行成本第五节、 分析查询语句:EXPLAIN5.1 基本语法5.2 EXPLAIN各列作用 第一节、数据库服务器的优化步骤 当我们遇到数据库调优问题的时候,可…...
 
Scala第十三章节
Scala第十三章节 1. 高阶函数介绍 2. 作为值的函数 3. 匿名函数 4. 柯里化 5. 闭包 6. 控制抽象 7. 案例: 计算器 scala总目录 文档资料下载...
 
Nginx高级 第一部分:扩容
Nginx高级 第一部分:扩容 通过扩容提升整体吞吐量 1.单机垂直扩容:硬件资源增加 云服务资源增加 整机:IBM、浪潮、DELL、HP等 CPU/主板:更新到主流 网卡:10G/40G网卡 磁盘:SAS(SCSI) HDD(机械…...
vue项目上线后去除控制台所有console.log打印-配置说明
方式一 npm i babel-plugin-transform-remove-console --save-dev babel.config.js文件中添加 // 然后在babel.config.js中添加判断 const prodPlugin []if (process.env.NODE_ENV production) { // 如果是生产环境,则自动清理掉打印的日志,但保留…...
 
《XSS-Labs》02. Level 11~20
XSS-Labs 索引Level-11题解 Level-12题解 Level-13题解 Level-14题解 Level-15题解 Level-16题解 Level-17题解 Level-18~20题解 靶场部署在 VMware - Win7。 靶场地址:https://github.com/do0dl3/xss-labs 只要手动注入恶意 JavaScript 脚本成功,就可以…...
web vue 项目 Docker化部署
Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段: 构建阶段(Build Stage):…...
 
Day131 | 灵神 | 回溯算法 | 子集型 子集
Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣(LeetCode) 思路: 笔者写过很多次这道题了,不想写题解了,大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...
mongodb源码分析session执行handleRequest命令find过程
mongo/transport/service_state_machine.cpp已经分析startSession创建ASIOSession过程,并且验证connection是否超过限制ASIOSession和connection是循环接受客户端命令,把数据流转换成Message,状态转变流程是:State::Created 》 St…...
 
CentOS下的分布式内存计算Spark环境部署
一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架,相比 MapReduce 具有以下核心优势: 内存计算:数据可常驻内存,迭代计算性能提升 10-100 倍(文档段落:3-79…...
 
P3 QT项目----记事本(3.8)
3.8 记事本项目总结 项目源码 1.main.cpp #include "widget.h" #include <QApplication> int main(int argc, char *argv[]) {QApplication a(argc, argv);Widget w;w.show();return a.exec(); } 2.widget.cpp #include "widget.h" #include &q…...
【C语言练习】080. 使用C语言实现简单的数据库操作
080. 使用C语言实现简单的数据库操作 080. 使用C语言实现简单的数据库操作使用原生APIODBC接口第三方库ORM框架文件模拟1. 安装SQLite2. 示例代码:使用SQLite创建数据库、表和插入数据3. 编译和运行4. 示例运行输出:5. 注意事项6. 总结080. 使用C语言实现简单的数据库操作 在…...
现有的 Redis 分布式锁库(如 Redisson)提供了哪些便利?
现有的 Redis 分布式锁库(如 Redisson)相比于开发者自己基于 Redis 命令(如 SETNX, EXPIRE, DEL)手动实现分布式锁,提供了巨大的便利性和健壮性。主要体现在以下几个方面: 原子性保证 (Atomicity)ÿ…...
 
Unity UGUI Button事件流程
场景结构 测试代码 public class TestBtn : MonoBehaviour {void Start(){var btn GetComponent<Button>();btn.onClick.AddListener(OnClick);}private void OnClick(){Debug.Log("666");}}当添加事件时 // 实例化一个ButtonClickedEvent的事件 [Formerl…...
 
Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement
Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement 1. LAB环境2. L2公告策略2.1 部署Death Star2.2 访问服务2.3 部署L2公告策略2.4 服务宣告 3. 可视化 ARP 流量3.1 部署新服务3.2 准备可视化3.3 再次请求 4. 自动IPAM4.1 IPAM Pool4.2 …...
【安全篇】金刚不坏之身:整合 Spring Security + JWT 实现无状态认证与授权
摘要 本文是《Spring Boot 实战派》系列的第四篇。我们将直面所有 Web 应用都无法回避的核心问题:安全。文章将详细阐述认证(Authentication) 与授权(Authorization的核心概念,对比传统 Session-Cookie 与现代 JWT(JS…...
