当前位置: 首页 > news >正文

大数据-玩转数据-双流JOIN

一、双流JOIN

在Flink中, 支持两种方式的流的Join: Window Join和Interval Join

二、Window Join

窗口join会join具有相同的key并且处于同一个窗口中的两个流的元素.
注意:
1.所有的窗口join都是 inner join, 意味着a流中的元素如果在b流中没有对应的, 则a流中这个元素就不会处理(就是忽略掉了)
2.join成功后的元素的会以所在窗口的最大时间作为其时间戳. 例如窗口[5,10), 则元素会以9作为自己的时间戳。
Window join 仍然可分为 滚动窗口、滑动窗口Join、会话窗口Join

滚动窗口Join代码段示例
在这里插入图片描述

package com.lyh.flink12;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;/*** @Author lizhenchao@atguigu.cn* @Date 2021/1/24 22:09*/
public class Flink01_Join_Window_Tumbling {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> s1 = env.socketTextStream("hadoop100", 8888)  // 在socket终端只输入毫秒级别的时间戳.map(value -> {String[] datas = value.split(",");return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {return element.getTs() * 1000;}}));SingleOutputStreamOperator<WaterSensor> s2 = env.socketTextStream("hadoop100", 9999)  // 在socket终端只输入毫秒级别的时间戳.map(value -> {String[] datas = value.split(",");return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {return element.getTs() * 1000;}}));s1.join(s2).where(WaterSensor::getId).equalTo(WaterSensor::getId).window(TumblingEventTimeWindows.of(Time.seconds(5))) // 必须使用窗口.apply(new JoinFunction<WaterSensor, WaterSensor, String>() {@Overridepublic String join(WaterSensor first, WaterSensor second) throws Exception {return "first: " + first + ", second: " + second;}}).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}

运行结果:
在这里插入图片描述

三、Interval Join

间隔流join(Interval Join), 是指使用一个流的数据按照key去join另外一条流的指定范围的数据.
如下图: 橙色的流去join绿色的流.范围是由橙色流的event-time + lower bound和event-time + upper bound来决定的.
orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound

在这里插入图片描述
Interval Join只支持event-time
必须是keyBy之后的流才可以interval join

package com.lyh.flink12;

import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.planner.expressions.In;
import org.apache.flink.util.Collector;

import java.time.Duration;public class  Sql_Join_Windows_Interval{public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(2);SingleOutputStreamOperator<WaterSensor> s1 = env.socketTextStream("hadoop100", 8888).map(value -> {String[] data = value.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long timestamp) {return element.getTs();}}));SingleOutputStreamOperator<WaterSensor> s2 = env.socketTextStream("hadoop100", 9999).map(value -> {String[] data = value.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {@Overridepublic long extractTimestamp(WaterSensor element, long timestamp) {return element.getTs();}}));s1.keyBy(WaterSensor::getId).intervalJoin(s2.keyBy(WaterSensor::getId)).between(Time.seconds(-2),Time.seconds(3)).process(new ProcessJoinFunction<WaterSensor, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor left,WaterSensor right,Context ctx,Collector<String> out) throws Exception {out.collect(left + "," + right);}}).print();try{env.execute();} catch (Exception e){e.printStackTrace();}}}

运行结果:
在这里插入图片描述

相关文章:

大数据-玩转数据-双流JOIN

一、双流JOIN 在Flink中, 支持两种方式的流的Join: Window Join和Interval Join 二、Window Join 窗口join会join具有相同的key并且处于同一个窗口中的两个流的元素. 注意: 1.所有的窗口join都是 inner join, 意味着a流中的元素如果在b流中没有对应的, 则a流中这个元素就不会…...

from PIL import Image,文字成图,ImageFont import jieba分词,input优雅python绘制图片

开始的代码 import os from PIL import Image, ImageDraw, ImageFont import jiebadef generate_image_with_white_bg(text, font_path, output_path):# 设置图片大小和背景颜色image_width 800image_height 600bg_color (255, 255, 255) # 白色# 创建图片对象image Imag…...

渗透测试信息收集方法笔记

一、指纹识别 1、钟馗之眼https://www.zoomeye.org/ 2、天眼查https://www.tianyancha.com/ 3、工具&#xff1a;御剑WEB指纹识别系统正式版&#xff0c;可以查网站用了哪些框架&#xff0c;什么版本&#xff0c;有哪些漏洞 4、kali whatweb 二、信息泄露 1、csdn https://www.…...

协议栈——连接服务器

如对方的ip和port配置信息&#xff0c;这里的连接是指通信前的准备工作 上一篇介绍查看套接字的命令时&#xff0c;可以看到很多信息&#xff0c;但是刚刚创建出来的套接字是什么信息都没有的&#xff0c;协议栈也因此不知道和谁通信&#xff1b; 客户端填补信息 这一步中调…...

数据结构--队列与循环队列的实现

数据结构–队列的实现 1.队列的定义 比如有一个人叫做张三,这天他要去医院看病,看病时就需要先挂号,由于他来的比较晚,所以他的号码就比较大,来的比较早的号码就比较小,需要到就诊窗口从小号到大依次排队,前面的小号就诊结束之后,才会轮到大号来,小号每就诊完毕就销毁,每新来…...

数据结构—栈、队列、链表

一、栈 Stack&#xff08;存取O(1)&#xff09; 先进后出&#xff0c;进去123&#xff0c;出来321。 基于数组&#xff1a;最后一位为栈尾&#xff0c;用于取操作。 基于链表&#xff1a;第一位为栈尾&#xff0c;用于取操作。 1.1、数组栈 /*** 基于数组实现的顺序栈&#…...

2023年4月到7月工作经历

2023年4 有同事说程序崩溃一起分析得结果 unsigned uNum 2; std::string str "abc" uNum; std::cout << str; 结果是c 。如果uNum 很大的话&#xff0c;就可能崩溃。 unsigned uNum 2; //std::string str "abc" uN…...

嵌入式Linux应用开发-驱动大全-同步与互斥③

嵌入式Linux应用开发-驱动大全-同步与互斥③ 第一章 同步与互斥③1.4 Linux锁的介绍与使用1.4.1 锁的类型1.4.1.1 自旋锁1.4.1.2 睡眠锁 1.4.2 锁的内核函数1.4.2.1 自旋锁1.4.2.2 信号量1.4.2.3 互斥量1.4.2.4 semaphore和 mutex的区别 1.4.3 何时用何种锁1.4.4 内核抢占(pree…...

力扣-383.赎金信

Idea 使用一个hashmap 或者一个int数组存储第二次字符串中每一个字符及其出现的次数 遍历第一个字符串&#xff0c;讲出现的重复字符减1&#xff0c;若该字符次数已经为0&#xff0c;则返回false AC Code class Solution { public:bool canConstruct(string ransomNote, strin…...

计算机网络 第二章物理层

计算机网络第二章知识点速刷 其中重要的是信源和信宿&#xff0c;以及调制解调器在通信模型当中起到的作用。...

uniapp:动态修改页面标题

我们经常遇到这种情况&#xff0c;点击新增按钮&#xff0c;进入一个空白表单页面&#xff0c;点击修改按钮&#xff0c;其实也是进入这个表单页面&#xff0c;只是表单内容已经被数据库的记录反显了&#xff0c;为了区别页面&#xff0c;我们还需要动态设置页面的标题&#xf…...

java学生管理系统

一、项目概述 本学生管理系统旨在提供一个方便的界面&#xff0c;用于学校或机构管理学生信息&#xff0c;包括学生基本信息、课程成绩等。 二、系统架构 系统采用经典的三层架构&#xff0c;包括前端使用JavaSwing&#xff0c;后端采用Java Servlet&#xff0c;数据库使用M…...

Docker和容器化:简介和使用案例

Docker和容器化&#xff1a;简介和使用案例 引言 容器化技术在近年来变得越来越流行&#xff0c;为开发人员和运维团队提供了更加灵活、高效的软件部署和管理方式。其中&#xff0c;Docker是最为知名和广泛使用的容器化平台之一。本篇博客文章将介绍Docker和容器化的基本概念…...

(高阶) Redis 7 第18讲 RedLock 分布式锁

🌹 以下分享 RedLock 分布式锁,如有问题请指教。🌹🌹 如你对技术也感兴趣,欢迎交流。🌹🌹🌹 如有对阁下帮助,请👍点赞💖收藏🐱‍🏍分享😀 问题 分布式锁问题从(高阶) Redis 7 第17讲 分布式锁 实战篇_PJ码匠人的博客-CSDN博客 这篇文章来看,…...

嵌入式软件架构基础设施设计方法

大家好&#xff0c;今天分享一篇嵌入式软件架构设计相关的文章。 软件架构这东西&#xff0c;众说纷纭&#xff0c;各有观点。在我看来&#xff0c;软件架构是软件系统的基本结构&#xff0c;包含其组件、组件之间的关系、组件设计与演进的规则&#xff0c;以及体现这些规则的基…...

MySQL进阶_3.性能分析工具的使用

文章目录 第一节、数据库服务器的优化步骤第二节、查看系统性能参数第三节、 慢查询日志第四节、 查看 SQL 执行成本第五节、 分析查询语句&#xff1a;EXPLAIN5.1 基本语法5.2 EXPLAIN各列作用 第一节、数据库服务器的优化步骤 当我们遇到数据库调优问题的时候&#xff0c;可…...

Scala第十三章节

Scala第十三章节 1. 高阶函数介绍 2. 作为值的函数 3. 匿名函数 4. 柯里化 5. 闭包 6. 控制抽象 7. 案例: 计算器 scala总目录 文档资料下载...

Nginx高级 第一部分:扩容

Nginx高级 第一部分&#xff1a;扩容 通过扩容提升整体吞吐量 1.单机垂直扩容&#xff1a;硬件资源增加 云服务资源增加 整机&#xff1a;IBM、浪潮、DELL、HP等 CPU/主板&#xff1a;更新到主流 网卡&#xff1a;10G/40G网卡 磁盘&#xff1a;SAS(SCSI) HDD&#xff08;机械…...

vue项目上线后去除控制台所有console.log打印-配置说明

方式一 npm i babel-plugin-transform-remove-console --save-dev babel.config.js文件中添加 // 然后在babel.config.js中添加判断 const prodPlugin []if (process.env.NODE_ENV production) { // 如果是生产环境&#xff0c;则自动清理掉打印的日志&#xff0c;但保留…...

《XSS-Labs》02. Level 11~20

XSS-Labs 索引Level-11题解 Level-12题解 Level-13题解 Level-14题解 Level-15题解 Level-16题解 Level-17题解 Level-18~20题解 靶场部署在 VMware - Win7。 靶场地址&#xff1a;https://github.com/do0dl3/xss-labs 只要手动注入恶意 JavaScript 脚本成功&#xff0c;就可以…...

web vue 项目 Docker化部署

Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段&#xff1a; 构建阶段&#xff08;Build Stage&#xff09;&#xff1a…...

Day131 | 灵神 | 回溯算法 | 子集型 子集

Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣&#xff08;LeetCode&#xff09; 思路&#xff1a; 笔者写过很多次这道题了&#xff0c;不想写题解了&#xff0c;大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...

mongodb源码分析session执行handleRequest命令find过程

mongo/transport/service_state_machine.cpp已经分析startSession创建ASIOSession过程&#xff0c;并且验证connection是否超过限制ASIOSession和connection是循环接受客户端命令&#xff0c;把数据流转换成Message&#xff0c;状态转变流程是&#xff1a;State::Created 》 St…...

CentOS下的分布式内存计算Spark环境部署

一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架&#xff0c;相比 MapReduce 具有以下核心优势&#xff1a; 内存计算&#xff1a;数据可常驻内存&#xff0c;迭代计算性能提升 10-100 倍&#xff08;文档段落&#xff1a;3-79…...

P3 QT项目----记事本(3.8)

3.8 记事本项目总结 项目源码 1.main.cpp #include "widget.h" #include <QApplication> int main(int argc, char *argv[]) {QApplication a(argc, argv);Widget w;w.show();return a.exec(); } 2.widget.cpp #include "widget.h" #include &q…...

【C语言练习】080. 使用C语言实现简单的数据库操作

080. 使用C语言实现简单的数据库操作 080. 使用C语言实现简单的数据库操作使用原生APIODBC接口第三方库ORM框架文件模拟1. 安装SQLite2. 示例代码:使用SQLite创建数据库、表和插入数据3. 编译和运行4. 示例运行输出:5. 注意事项6. 总结080. 使用C语言实现简单的数据库操作 在…...

现有的 Redis 分布式锁库(如 Redisson)提供了哪些便利?

现有的 Redis 分布式锁库&#xff08;如 Redisson&#xff09;相比于开发者自己基于 Redis 命令&#xff08;如 SETNX, EXPIRE, DEL&#xff09;手动实现分布式锁&#xff0c;提供了巨大的便利性和健壮性。主要体现在以下几个方面&#xff1a; 原子性保证 (Atomicity)&#xff…...

Unity UGUI Button事件流程

场景结构 测试代码 public class TestBtn : MonoBehaviour {void Start(){var btn GetComponent<Button>();btn.onClick.AddListener(OnClick);}private void OnClick(){Debug.Log("666");}}当添加事件时 // 实例化一个ButtonClickedEvent的事件 [Formerl…...

Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement

Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement 1. LAB环境2. L2公告策略2.1 部署Death Star2.2 访问服务2.3 部署L2公告策略2.4 服务宣告 3. 可视化 ARP 流量3.1 部署新服务3.2 准备可视化3.3 再次请求 4. 自动IPAM4.1 IPAM Pool4.2 …...

【安全篇】金刚不坏之身:整合 Spring Security + JWT 实现无状态认证与授权

摘要 本文是《Spring Boot 实战派》系列的第四篇。我们将直面所有 Web 应用都无法回避的核心问题&#xff1a;安全。文章将详细阐述认证&#xff08;Authentication) 与授权&#xff08;Authorization的核心概念&#xff0c;对比传统 Session-Cookie 与现代 JWT&#xff08;JS…...