当前位置: 首页 > news >正文

【入门Flink】- 10基于时间的双流联合(join)

统计固定时间内两条流数据的匹配情况,需要自定义来实现——可以用窗口(window)来表示。为了更方便地实现基于时间的合流操作,Flink 的 DataStrema API 提供了内置的 join 算子。

窗口联结(Window Join)

一段时间的双流合并

定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。

stream1.join(stream2).where(<KeySelector>) // stream1 的 keyBy.equalTo(<KeySelector>) // stream2 的 keyBy.window(<WindowAssigner>).apply(<JoinFunction>)
public class WindowJoinDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env.fromElements(Tuple2.of("a", 1),Tuple2.of("a", 2),Tuple2.of("b", 3),Tuple2.of("c", 4)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env.fromElements(Tuple3.of("a", 1, 1),Tuple3.of("a", 11, 1),Tuple3.of("b", 2, 1),Tuple3.of("b", 12, 1),Tuple3.of("c", 14, 1),Tuple3.of("d", 15, 1)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String,Integer, Integer>>forMonotonousTimestamps().withTimestampAssigner((value, ts) -> value.f1 * 1000L));DataStream<String> join = ds1.join(ds2).where(r1 -> r1.f0) // ds1 的keyby.equalTo(r2 -> r2.f0) // ds2 的keyby.window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {/*** 关联上的数据,调用 join 方法* @param first ds1 的数据* @param second ds2 的数据*/@Overridepublic String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {return first + "<----->" + second;}});join.print();env.execute();}
}

输出:

image-20231112153403293

window join:

  1. 两条流落在同一个时间窗口范围内才能匹配
  2. 根据 keyBy 的 key,来进行匹配关联
  3. 只能拿到匹配上的数据,类似有固定时间范围的inner join

间隔联结(Interval Join)

存在如下场景:两条流匹配的两个数据有可能刚好“卡在”窗口边缘两侧,窗口内就都没有匹配了,可以使用“间隔联结”(interval join)来解决。

原理

给定两个时间点,分别叫作间隔的“上界”(upperBound)“下界”(lowerBound);可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp +upperBound], 即以 a 的时间戳为中心,下至下界点、上至上界点的一个闭区间:这段时间作为可以匹配另一条流数据的“窗口”范围。

匹配的条件为:

a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

image-20231112154002415

stream1
.keyBy(<KeySelector>)// KeyedStream 调用   
.intervalJoin(stream2.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String(){@Overridepublic void processElement(Integer left, Integer right,Context ctx, Collector<String> out){out.collect(left + "," + right);}
});

处理迟到数据,可以使用左右侧输出流

完整代码:

public class IntervalJoinWithLateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env.socketTextStream("hadoop102", 7777).map((MapFunction<String, Tuple2<String, Integer>>) value -> {String[] datas = value.split(",");return Tuple2.of(datas[0], Integer.valueOf(datas[1]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) -> value.f1 * 1000L));SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env.socketTextStream("hadoop102", 8888).map((MapFunction<String, Tuple3<String, Integer, Integer>>) value -> {String[] datas = value.split(",");return Tuple3.of(datas[0], Integer.valueOf(datas[1]), Integer.valueOf(datas[2]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, Integer, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) -> value.f1 * 1000L));/*** 【Interval join】* 1、只支持事件时间* 2、指定上界、下界的偏移,负号代表时间往前,正号代表时间往后* 3、process 中,只能处理 join 上的数据* 4、两条流关联后的 watermark,以两条流中最小的为准* 5、如果 当前数据的事件时间 < 当前的 watermark,就是迟到数据,主流的 process 不处理* => between 后,可以指定将 左流 或 右流的迟到数据放入侧输出流* *///1. 分别做 keyby,key 其实就是关联条件KeyedStream<Tuple2<String, Integer>, String> ks1 = ds1.keyBy(r1 -> r1.f0);KeyedStream<Tuple3<String, Integer, Integer>, String> ks2 = ds2.keyBy(r2 -> r2.f0);//2. 调用 interval join// 左右测输出流迟到标签OutputTag<Tuple2<String, Integer>> ks1LateTag = new OutputTag<>("ks1-late", Types.TUPLE(Types.STRING, Types.INT));OutputTag<Tuple3<String, Integer, Integer>> ks2LateTag = new OutputTag<>("ks2-late", Types.TUPLE(Types.STRING, Types.INT, Types.INT));SingleOutputStreamOperator<String> process = ks1.intervalJoin(ks2).between(Time.seconds(-2), Time.seconds(2)) // 指定上下界.sideOutputLeftLateData(ks1LateTag) // 将ks1的迟到数据,放入侧输出流.sideOutputRightLateData(ks2LateTag) // 将ks2的迟到数据,放入侧输出流.process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {/*** 两条流的数据匹配上,才会调用这个方法* @param left ks1 的数据* @param right ks2 的数据* @param ctx 上下文* @param out 采集器*/@Overridepublic void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception {// 进入这个方法,是关联上的数据out.collect(left + "<------>" + right);}});process.print("主流");process.getSideOutput(ks1LateTag).printToErr("ks1迟到数据");process.getSideOutput(ks2LateTag).printToErr("ks2迟到数据");env.execute();}
}

相关文章:

【入门Flink】- 10基于时间的双流联合(join)

统计固定时间内两条流数据的匹配情况&#xff0c;需要自定义来实现——可以用窗口&#xff08;window&#xff09;来表示。为了更方便地实现基于时间的合流操作&#xff0c;Flink 的 DataStrema API 提供了内置的 join 算子。 窗口联结&#xff08;Window Join&#xff09; 一…...

【Python Opencv】图片与视频的操作

文章目录 前言一、opencv图片1.1 读取图像1.2 显示图像1.3 写入图像1.4 示例代码 二、Opencv视频2.1 从相机捕获视频获取摄像头一帧一帧读取显示图片VideoCapture 中的get和set函数示例代码 2.2 从文件播放视频示例代码 2.3 保存视频示例代码 总结 前言 在计算机视觉和图像处理…...

【从入门到起飞】JavaAPI—System,Runtime,Object,Objects类

&#x1f38a;专栏【JavaSE】 &#x1f354;喜欢的诗句&#xff1a;更喜岷山千里雪 三军过后尽开颜。 &#x1f386;音乐分享【如愿】 &#x1f384;欢迎并且感谢大家指出小吉的问题&#x1f970; 文章目录 &#x1f354;System类⭐exit()⭐currentTimeMillis()&#x1f384;用…...

【Git】的分支和标签的讲解及实际应用场景

目录 讲解 环境讲述 分支标签的区别 分支 命令 场景应用 标签 命令 标签规范 讲解 环境讲述 当软件从开发到正式环境部署的过程中&#xff0c;不同环境的作用 开发环境&#xff1a;用于开发人员进行软件开发、测试和调试。在这个环境中&#xff0c;开发人员可以快速地…...

修改django开发环境runserver命令默认的端口

runserver默认8000端口 虽然python manage.py runserver 8080 可以指定端口&#xff0c;但不想每次runserver都添加8080这个参数 可以通过修改manage.py进行修改&#xff0c;只需要加三行&#xff1a; from django.core.management.commands.runserver import Command as Ru…...

kubeadm安装k8s高可用集群

目录 一、环境规划 二、注意事项&#xff1a; 三、环境准备&#xff1a; 1. 关闭防火墙规则&#xff0c;关闭selinux&#xff0c;关闭swap交换&#xff1a; 2. 修改主机名 3. 所有节点修改hosts文件&#xff1a; 4. 所有节点时间同步&#xff1a; 5. 所有节点实现Linux的资…...

来看看电脑上有哪些不为人知的小众软件?

​ 电脑上的各类软件有很多&#xff0c;除了那些常见的大众化软件&#xff0c;还有很多不为人知的小众软件&#xff0c;专注于实用功能&#xff0c;简洁干净、功能强悍。 1.桌面停靠栏工具——BitDock ​ BitDock是一款运行在Windows系统中的桌面停靠栏工具&#xff0c;功能实…...

一个进程最多可以创建多少个线程?

前言 话不多说&#xff0c;先来张脑图~ linux 虚拟内存知识回顾 虚拟内存空间长啥样 在 Linux 操作系统中&#xff0c;虚拟地址空间的内部又被分为内核空间和用户空间两部分&#xff0c;不同位数的系统&#xff0c;地址空间的范围也不同。比如最常见的 32 位和 64 位系统&am…...

ElasticSearch文档分析

ElasticSearch文档分析 包含下面的过程&#xff1a; 将一块文本分成适合于倒排索引的独立的 词条将这些词条统一化为标准格式以提高它们的“可搜索性”&#xff0c;或者 recall 分析器执行上面的工作。分析器实际上是将三个功能封装到了一个包里&#xff1a; 字符过滤器 首先&a…...

Xilinx FPGA平台DDR3设计详解(一):DDR SDRAM系统框架

DDR SDRAM&#xff08;双倍速率同步动态随机存储器&#xff09;是一种内存技术&#xff0c;它可以在时钟信号的上升沿和下降沿都传输数据&#xff0c;从而提高数据传输的速率。DDR SDRAM已经发展了多代&#xff0c;包括DDR、DDR2、DDR3、DDR4和DDR5&#xff0c;每一代都有不同的…...

Spring Data JPA方法名命名规则

最近巩固一下JPA&#xff0c;网上看到这些资料&#xff0c;这里记录巩固一下。 一、Spring Data Jpa方法定义的规则 简单条件查询 简单条件查询&#xff1a;查询某一个实体类或者集合。 按照Spring Data的规范的规定&#xff0c;查询方法以find | read | get开头&…...

【Leetcode Sheet】Weekly Practice 15

Leetcode Test 2586 统计范围内的元音字符串数(11.7) 给你一个下标从 0 开始的字符串数组 words 和两个整数&#xff1a;left 和 right 。 如果字符串以元音字母开头并以元音字母结尾&#xff0c;那么该字符串就是一个 元音字符串 &#xff0c;其中元音字母是 a、e、i、o、u…...

人力资源社会保障部办公厅关于推行专业技术人员职业资格电子证书的通知

&#xff08;人社厅发〔2021〕97号&#xff09; 各省、自治区、直辖市及新疆生产建设兵团人力资源社会保障厅&#xff08;局&#xff09;&#xff0c;中共海南省委人才发展局&#xff0c;国务院有关部门、直属机构人事部门&#xff0c;有关协会、学会&#xff1a; 为贯彻落实…...

什么是光电耦合器?如何选择型号及种类

光电耦合器(英文缩写为OC)亦称光电隔离器&#xff0c;简称光耦&#xff1b;以光为媒介传输电信号&#xff1b;它对输入、输出电信号有良好的隔离作用&#xff0c;是目前种类最多、用途最广的光电器件之一&#xff1b;所以&#xff0c;它在各种电路中得到广泛的应用。 光耦合器…...

hive里因为列名用了关键字导致建表失败

代码 现象 ParseException line 6:4 cannot recognize input near percent String COMMENT in column name or primary key or foreign key 23/11/13 11:52:57 ERROR org.apache.hadoop.hive.ql.Driver: FAILED: ParseException line 6:4 cannot recognize input near percent …...

MySQL 报错 incorrect datetime value ‘0000-00-00 00:00:00‘ for column

使用navicat导入数据时报错&#xff1a; MySQL 报错 incorrect datetime value ‘0000-00-00 00:00:00’ for column 这是因为当前的MySQL不支持datetime为0的情况。 MySQL报incorrect datetime value ‘0000-00-00 00:00:00’ for column错误原因&#xff0c;是由于在MySQL5.7…...

Jira Data Center(非集群)升级操作

一、升级准备 Jira 管理界面执行升级检查下载升级包&#xff0c;使用原操作方式相同的方式安装。我这里原来的版本是通过./atlassian-jira-software-9.11.2-x64.bin安装的&#xff0c;接下来下载atlassian-jira-software-9.11.3-x64.bin的安装文件停止 Jira&#xff0c;bin/st…...

Spring IOC - BeanDefinition解析

1. BeanDefinition的属性 BeanDefinition作为接口定义了属性的get、set方法。这些属性基本定义在其直接实现类AbstractBeanDefinition中&#xff0c;各属性的含义如下表所示&#xff1a; 类型 名称 含义 常量 SCOPE_DEFAULT 默认作用域&#xff1a;单例模式 AUT…...

ds前后台博客系统

源码私信或者公众号java大师获取 博客简介&#xff1a;本博客采用Spring Boot LayUI做为基础&#xff0c;进行的博客系统开发&#xff0c;与bootvue相比&#xff0c;更为适合开发简单的系统&#xff0c;并且更容易上手&#xff0c;简单&#xff01;高效&#xff01;更易上手&a…...

算法leetcode|88. 合并两个有序数组(rust重拳出击)

文章目录 88. 合并两个有序数组&#xff1a;样例 1&#xff1a;样例 2&#xff1a;样例 3&#xff1a;提示&#xff1a; 分析&#xff1a;题解&#xff1a;rust&#xff1a;go&#xff1a;c&#xff1a;python&#xff1a;java&#xff1a; 88. 合并两个有序数组&#xff1a; …...

浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)

✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义&#xff08;Task Definition&…...

基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销&#xff0c;平衡网络负载&#xff0c;延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...

云启出海,智联未来|阿里云网络「企业出海」系列客户沙龙上海站圆满落地

借阿里云中企出海大会的东风&#xff0c;以**「云启出海&#xff0c;智联未来&#xff5c;打造安全可靠的出海云网络引擎」为主题的阿里云企业出海客户沙龙云网络&安全专场于5.28日下午在上海顺利举办&#xff0c;现场吸引了来自携程、小红书、米哈游、哔哩哔哩、波克城市、…...

Java - Mysql数据类型对应

Mysql数据类型java数据类型备注整型INT/INTEGERint / java.lang.Integer–BIGINTlong/java.lang.Long–––浮点型FLOATfloat/java.lang.FloatDOUBLEdouble/java.lang.Double–DECIMAL/NUMERICjava.math.BigDecimal字符串型CHARjava.lang.String固定长度字符串VARCHARjava.lang…...

零基础设计模式——行为型模式 - 责任链模式

第四部分&#xff1a;行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习&#xff01;行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想&#xff1a;使多个对象都有机会处…...

【JavaSE】绘图与事件入门学习笔记

-Java绘图坐标体系 坐标体系-介绍 坐标原点位于左上角&#xff0c;以像素为单位。 在Java坐标系中,第一个是x坐标,表示当前位置为水平方向&#xff0c;距离坐标原点x个像素;第二个是y坐标&#xff0c;表示当前位置为垂直方向&#xff0c;距离坐标原点y个像素。 坐标体系-像素 …...

【C++特殊工具与技术】优化内存分配(一):C++中的内存分配

目录 一、C 内存的基本概念​ 1.1 内存的物理与逻辑结构​ 1.2 C 程序的内存区域划分​ 二、栈内存分配​ 2.1 栈内存的特点​ 2.2 栈内存分配示例​ 三、堆内存分配​ 3.1 new和delete操作符​ 4.2 内存泄漏与悬空指针问题​ 4.3 new和delete的重载​ 四、智能指针…...

【Linux】Linux 系统默认的目录及作用说明

博主介绍&#xff1a;✌全网粉丝23W&#xff0c;CSDN博客专家、Java领域优质创作者&#xff0c;掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域✌ 技术范围&#xff1a;SpringBoot、SpringCloud、Vue、SSM、HTML、Nodejs、Python、MySQL、PostgreSQL、大数据、物…...

【LeetCode】3309. 连接二进制表示可形成的最大数值(递归|回溯|位运算)

LeetCode 3309. 连接二进制表示可形成的最大数值&#xff08;中等&#xff09; 题目描述解题思路Java代码 题目描述 题目链接&#xff1a;LeetCode 3309. 连接二进制表示可形成的最大数值&#xff08;中等&#xff09; 给你一个长度为 3 的整数数组 nums。 现以某种顺序 连接…...

在 Spring Boot 项目里,MYSQL中json类型字段使用

前言&#xff1a; 因为程序特殊需求导致&#xff0c;需要mysql数据库存储json类型数据&#xff0c;因此记录一下使用流程 1.java实体中新增字段 private List<User> users 2.增加mybatis-plus注解 TableField(typeHandler FastjsonTypeHandler.class) private Lis…...