当前位置: 首页 > news >正文

java Flink(四十三)Flink Interval Join源码解析以及简单实例

背景

之前我们在一片文章里简单介绍过Flink的多流合并算子

java Flink(三十六)Flink多流合并算子UNION、CONNECT、CoGroup、Join

今天我们通过Flink 1.14的源码对Flink的Interval Join进行深入的理解。

Interval Join不是两个窗口做关联,更适用于处理乱序数据流之间的关联。它的作用更类似于从左流中a元素本身出发,对右流中一段时间内的数据进行关联(Inner Join:只关联相同Key的数据)。

如图所示:

 下边这条流中的2关联到上范围内的0/1

源码解析

Flink版本1.14.4

按住Ctrl+鼠标左键,点击process进入源码

 这里process方法是在KeydStream.java下IntervalJoined类下的方法

 包装返回类型的TypeInfomation(TypeInfo的介绍可以看上一篇)

 返回的outputType

SingleOutputStreamOperator使用给定的用户函数完成联接操作,该函数针对每个联接的元素对执行。这种方法允许传递输出类型的显式类型信息。

 IntervalJoinOperator初始化

左边界<=右边界检查

获取左流还有右流数据对应的序列化(从TypeInfo获取的)

继续看IntervalJoinOperator中的其余关键实现

open方法用来注册定时器

 初始化两个流的map状态

处理左侧流中的数据。每当数据到达左流时,它就会被添加到左缓冲区。将从右侧缓冲区中查找该元素可能的候选联接,如果该对位于用户定义的边界内,则将其传递给 ProcessJoinFunction

​ 同理处理右流

进入数据处理函数

获取数据,取出事件时间

 超过当前watermark的数据进行过滤

 

数据没问题的话,将数据添加到状态

 ​​

遍历另一条流的状态,遍历其中的数据,把满足时间要求的数据进行collect

​注册一个当前事件时间戳+右边界的定时器

定时器触发后,清空map状态中时间戳-左边界的那条数据

简单实例 

pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>FlinkCode</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><jdk.version>1.8</jdk.version><jar.name>ubs-data-converter</jar.name><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--Flink 版本--><flink.version>1.14.4</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.11</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.10</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.8</version></dependency><dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId><version>1.9.2</version></dependency><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpcore</artifactId><version>4.4.1</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.16</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.16</version><scope>compile</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>1.9.2</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals><configuration><sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${jdk.version}</source><target>${jdk.version}</target><encoding>${project.build.sourceEncoding}</encoding></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><finalName>${jar.name}</finalName><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude><exclude>org.glassfish.jersey.core:jersey-common</exclude></excludes></artifactSet><relocations><relocation><pattern>com.google.common</pattern><shadedPattern>com.shade.google.common</shadedPattern></relocation><relocation><pattern>org.apache.kafka</pattern><shadedPattern>org.shade.apache.kafka</shadedPattern></relocation></relocations><filters><filter><artifact>*</artifact><includes><include>org/apache/htrace/**</include><include>org/apache/avro/**</include><include>org/apache/flink/streaming/**</include><include>org/apache/flink/connector/**</include><include>org/apache/kafka/**</include><include>org/apache/hive/**</include><include>org/apache/hadoop/hive/**</include><include>org/apache/curator/**</include><include>org/apache/zookeeper/**</include><include>org/apache/jute/**</include><include>org/apache/thrift/**</include><include>org/apache/http/**</include><include>org/I0Itec/**</include><include>jline/**</include><include>com/yammer/**</include><include>kafka/**</include><include>org/apache/hadoop/hbase/**</include><include>com/alibaba/fastjson/**</include><include>org/elasticsearch/action/**</include><include>io/confluent/**</include><include>com/fasterxml/**</include><include>org/elasticsearch/**</include><include>hbase-default.xml</include><include>hbase-site.xml</include></includes></filter><filter><artifact>org.apache.hadoop.hive.*:*</artifact><excludes><exclude></exclude><exclude></exclude><exclude></exclude></excludes></filter></filters></configuration></execution></executions></plugin></plugins></build>
</project>

user bean

package ubs.app.intervaljoin.bean;import lombok.*;@Data
@AllArgsConstructor
@Setter
@Getter
@NoArgsConstructor
public class User{Integer id;Long t;}

 order bean 

package ubs.app.intervaljoin.bean;import lombok.*;@Data
@AllArgsConstructor
@Setter
@Getter
@NoArgsConstructor
public class Order {Integer id;Long price;Long time;}

main

package ubs.app.intervaljoin;import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import ubs.app.intervaljoin.bean.Order;
import ubs.app.intervaljoin.bean.User;
import ubs.app.intervaljoin.source.OrderSource;
import ubs.app.intervaljoin.source.UserSource;import java.time.Duration;public class IntervalJoinApp  {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置watermarkWatermarkStrategy<User> userWatermarkStrategy = WatermarkStrategy.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner<User>() {@Overridepublic long extractTimestamp(User element, long recordTimestamp) {return element.getT();}});DataStream<User> userDataStreamSource = env.addSource(new UserSource()).assignTimestampsAndWatermarks(userWatermarkStrategy);//设置watermarkWatermarkStrategy<Order> orderWatermarkStrategy = WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner<Order>() {@Overridepublic long extractTimestamp(Order element, long recordTimestamp) {return element.getTime();}});DataStream<Order> orderDataStreamSource = env.addSource(new OrderSource()).assignTimestampsAndWatermarks(orderWatermarkStrategy);env.setParallelism(1);SingleOutputStreamOperator<String> process = userDataStreamSource.keyBy(o -> o.getId()).intervalJoin(orderDataStreamSource.keyBy(o -> o.getId())).between(Time.seconds(-5), Time.seconds(0)).process(new ProcessJoinFunction<User, Order, String>() {@Overridepublic void processElement(User left, Order right, ProcessJoinFunction<User, Order, String>.Context ctx, Collector<String> out) throws Exception {Integer lid = left.getId();Long lt = left.getT();Integer rid = right.getId();long rt = right.getTime();out.collect(String.format("左%s 左时间%s 右%s 右时间%s 关联到了 %s", lid, lt/1000, rid, rt/1000, rt/1000-lt/1000));}});process.print();env.execute();}
}

相关文章:

java Flink(四十三)Flink Interval Join源码解析以及简单实例

背景 之前我们在一片文章里简单介绍过Flink的多流合并算子 java Flink&#xff08;三十六&#xff09;Flink多流合并算子UNION、CONNECT、CoGroup、Join 今天我们通过Flink 1.14的源码对Flink的Interval Join进行深入的理解。 Interval Join不是两个窗口做关联&#xff0c;…...

JsonUtility.ToJson 和UnityWebRequest 踩过的坑记录

项目场景&#xff1a; 需求&#xff1a;我在做网络接口链接&#xff0c;使用的unity自带的 UnityWebRequest &#xff0c;数据传输使用的json&#xff0c;json和自定义数据转化使用的也是unity自带的JsonUtility。使用过程中发现两个bug。 1.安全验证失败。 报错为&#xff1a…...

面试算法-69-三角形最小路径和

题目 给定一个三角形 triangle &#xff0c;找出自顶向下的最小路径和。 每一步只能移动到下一行中相邻的结点上。相邻的结点 在这里指的是 下标 与 上一层结点下标 相同或者等于 上一层结点下标 1 的两个结点。也就是说&#xff0c;如果正位于当前行的下标 i &#xff0c;那…...

流畅的 Python 第二版(GPT 重译)(九)

第四部分&#xff1a;控制流 第十七章&#xff1a;迭代器、生成器和经典协程 当我在我的程序中看到模式时&#xff0c;我认为这是一个麻烦的迹象。程序的形状应该只反映它需要解决的问题。代码中的任何其他规律性对我来说都是一个迹象&#xff0c;至少对我来说&#xff0c;这表…...

单片机学到什么程度才可以去工作?

单片机学到什么程度才可以去工作? 如果没有名校或学位的加持&#xff0c;你还得再努力一把&#xff0c;才能从激烈的竞争中胜出。以下这些技能可以给你加分&#xff0c;你看情况学&#xff0c;不同行业对这些组件会有取舍: . Cortex-M内核:理解MCU内核各部件的工作机制&#…...

内网穿透方案

内网穿透 有几种流行的内网穿透软件可供选择&#xff0c;它们都能帮助你在内网环境中建立与外部网络的连接。以下是其中一些常用的内网穿透软件&#xff1a; Ngrok&#xff1a;Ngrok 是一个简单易用的内网穿透工具&#xff0c;可以快速创建安全的公共 URL&#xff0c;让你可以…...

WordPress菜单函数wp_nav_menu各参数

wordpress主题制作时&#xff0c;常常会在不同的位置调用不同的菜单&#xff0c;使用下面的这个代码&#xff0c;再加上CSS给菜单做新的样式&#xff0c;可满足wordpress模板制作时对菜单调用的所有需求。 wp_nav_menu( array( theme_location > ,//导航别名 menu > , /…...

类于对象(上)--- 类的定义、访问限定符、计算类和对象的大小、this指针

在本篇中将会介绍一个很重要和很基础的Cpp知识——类和对象。对于类和对象的篇目将会有三篇&#xff0c;本篇是基础篇&#xff0c;将会介绍类的定义、类的访问限定符符和封装、计算类和对象的大小、以及类的 this 指针。目录如下&#xff1a; 目录 1. 关于类 1.1 类的定义 2 类…...

提升交付效率:Booking.com 金融技术团队的成功实践

Booking.com 金融技术业务部门的团队对其平台的后端和前端实施了一系列改进措施&#xff0c;并通过 DORA 指标将交付性能提高了一倍。此外&#xff0c;还使用了微前端 (MFE) 模式&#xff0c;将单体 FE 应用程序分解为多个可单独部署的分解应用程序。 2022 年年中&#xff0c;B…...

【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑

文章目录 &#x1f343;前言&#x1f334;扫描线程的实现&#x1f332;实现消费消息&#x1f333;实现addConsumer()方法&#x1f38b;VirtualHost类订阅消息的完善⭕总结 &#x1f343;前言 本次开发目标 实现消费消息的核心逻辑 &#x1f334;扫描线程的实现 我们先给Cons…...

【Three.js】使用精灵图Sprite创建面朝相机的文本标注

目录 &#x1f41d;前言 &#x1f41d;canvas创建文字 &#x1f41d;将canvas作为纹理贴图加载到sprite中 &#x1f41d;封装方法 &#x1f41d;前言 在Three.js中精灵Sprite是一个总是面朝摄像机的平面&#xff0c;它通常和纹理贴图结合使用&#xff0c;贴图可以是一张图…...

C++中的类模板

C中的类模板 类模板 类模板在C中是一种非常强大的工具&#xff0c;它允许程序员编写与数据类型无关的代码。简单来说&#xff0c;类模板允许你定义一个蓝图&#xff0c;这个蓝图可以用来生成具体类型的类。使用类模板可以提高代码的复用性&#xff0c;减少重复代码&#xff0…...

【每日一题】好子数组的最大分数

Tag 【单调栈】【暴力枚举】【数组】【2024-03-19】 题目来源 1793. 好子数组的最大分数 解题思路 本题和 84. 柱状图中最大的矩形 一样&#xff0c;计算的都是最大矩形的面积。只不过多了一个约束&#xff1a;矩形必须包含下标 k。 以下的方法一和方法二是 84. 柱状图中最…...

Vue2(七):超详细vue开发环境搭建(win7),nodejs下载与安装,安装淘宝镜像(报错已解决),配置脚手架

一、安装node.js 本来想粗略写一下的&#xff0c;但是搭建脚手架的时候&#xff0c;遇到了很多问题&#xff0c;浪费快两天时间&#xff0c;记录一下自己的解决办法希望对你们有帮助&#xff01; 1.下载nodejs 安装包下载链接【CNPM Binaries Mirror】 下载我划线的这个&am…...

【Web】记录CISCN 2021 总决赛 ezj4va题目复现——AspectJWeaver

目录 前言 原理分析 step 0 step 1 EXP 前文&#xff1a;【Web】浅聊Java反序列化之AspectJWeaver——任意文件写入-CSDN博客 前言 这就是当年传说中的零解题嘛&#x1f62d;&#xff0c;快做&#x1f92e;了 有了之前的经验&#xff0c;思路顺挺快的&#xff0c;中间不…...

视频技术1:使用ABLMediaServer推流rtsp

ABLMediaServer定位是高性能、高稳定、开箱即用、商用级别的流媒体服务器 下边展示了如何把1个mp3作为输入源&#xff0c;转换为rtsp流的过程。 作用&#xff1a;用rtsp模拟摄像头的视频流 1、启动ABLMediaServer ABLMediaServer-2024-03-13\WinX64\ABLMediaServer.exe 配…...

HTML5+CSS3+JS小实例:创意罗盘时钟

实例:创意罗盘时钟 技术栈:HTML+CSS+JS 效果: 源码: 【HTML】 <!DOCTYPE html> <html lang="zh-CN"><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=…...

设计数据库之内部模式:SQL基本操作

Chapter4&#xff1a;设计数据库之内部模式&#xff1a;SQL基本操作 笔记来源&#xff1a; 1.《漫画数据库》—科学出版社 2.SQL | DDL, DQL, DML, DCL and TCL Commands 设计数据库的步骤&#xff1a; 概念模式 概念模式(conceptual schema)是指将现实世界模型化的阶段进而&…...

Git浅谈配置文件和免密登录

一、文章内容 简述git三种配置ssh免密登录以及遇见的问题git可忽略文件git remote 相关操作 二、Git三种配置 项目配置文件(局部)&#xff1a;项目路径/.git/config 文件 git config --local user.name name git config --local user.email 123qq.cc全局配置文(所有用户): …...

【好玩的经典游戏】Docker环境下部署RPG网页小游戏

【好玩的经典游戏】Docker环境下部署RPG网页小游戏 一、react-tetris小游戏介绍1.1 react-tetris小游戏简介1.2 项目预览二、本次实践介绍2.1 本地环境规划2.2 本次实践介绍三、本地环境检查3.1 安装Docker环境3.2 检查Docker服务状态3.3 检查Docker版本3.4 检查docker compose…...

基于uniapp+WebSocket实现聊天对话、消息监听、消息推送、聊天室等功能,多端兼容

基于 ​UniApp + WebSocket​实现多端兼容的实时通讯系统,涵盖WebSocket连接建立、消息收发机制、多端兼容性配置、消息实时监听等功能,适配​微信小程序、H5、Android、iOS等终端 目录 技术选型分析WebSocket协议优势UniApp跨平台特性WebSocket 基础实现连接管理消息收发连接…...

MVC 数据库

MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...

Python实现prophet 理论及参数优化

文章目录 Prophet理论及模型参数介绍Python代码完整实现prophet 添加外部数据进行模型优化 之前初步学习prophet的时候&#xff0c;写过一篇简单实现&#xff0c;后期随着对该模型的深入研究&#xff0c;本次记录涉及到prophet 的公式以及参数调优&#xff0c;从公式可以更直观…...

将对透视变换后的图像使用Otsu进行阈值化,来分离黑色和白色像素。这句话中的Otsu是什么意思?

Otsu 是一种自动阈值化方法&#xff0c;用于将图像分割为前景和背景。它通过最小化图像的类内方差或等价地最大化类间方差来选择最佳阈值。这种方法特别适用于图像的二值化处理&#xff0c;能够自动确定一个阈值&#xff0c;将图像中的像素分为黑色和白色两类。 Otsu 方法的原…...

ios苹果系统,js 滑动屏幕、锚定无效

现象&#xff1a;window.addEventListener监听touch无效&#xff0c;划不动屏幕&#xff0c;但是代码逻辑都有执行到。 scrollIntoView也无效。 原因&#xff1a;这是因为 iOS 的触摸事件处理机制和 touch-action: none 的设置有关。ios有太多得交互动作&#xff0c;从而会影响…...

【电力电子】基于STM32F103C8T6单片机双极性SPWM逆变(硬件篇)

本项目是基于 STM32F103C8T6 微控制器的 SPWM(正弦脉宽调制)电源模块,能够生成可调频率和幅值的正弦波交流电源输出。该项目适用于逆变器、UPS电源、变频器等应用场景。 供电电源 输入电压采集 上图为本设计的电源电路,图中 D1 为二极管, 其目的是防止正负极电源反接, …...

深入浅出深度学习基础:从感知机到全连接神经网络的核心原理与应用

文章目录 前言一、感知机 (Perceptron)1.1 基础介绍1.1.1 感知机是什么&#xff1f;1.1.2 感知机的工作原理 1.2 感知机的简单应用&#xff1a;基本逻辑门1.2.1 逻辑与 (Logic AND)1.2.2 逻辑或 (Logic OR)1.2.3 逻辑与非 (Logic NAND) 1.3 感知机的实现1.3.1 简单实现 (基于阈…...

Redis:现代应用开发的高效内存数据存储利器

一、Redis的起源与发展 Redis最初由意大利程序员Salvatore Sanfilippo在2009年开发&#xff0c;其初衷是为了满足他自己的一个项目需求&#xff0c;即需要一个高性能的键值存储系统来解决传统数据库在高并发场景下的性能瓶颈。随着项目的开源&#xff0c;Redis凭借其简单易用、…...

打手机检测算法AI智能分析网关V4守护公共/工业/医疗等多场景安全应用

一、方案背景​ 在现代生产与生活场景中&#xff0c;如工厂高危作业区、医院手术室、公共场景等&#xff0c;人员违规打手机的行为潜藏着巨大风险。传统依靠人工巡查的监管方式&#xff0c;存在效率低、覆盖面不足、判断主观性强等问题&#xff0c;难以满足对人员打手机行为精…...

libfmt: 现代C++的格式化工具库介绍与酷炫功能

libfmt: 现代C的格式化工具库介绍与酷炫功能 libfmt 是一个开源的C格式化库&#xff0c;提供了高效、安全的文本格式化功能&#xff0c;是C20中引入的std::format的基础实现。它比传统的printf和iostream更安全、更灵活、性能更好。 基本介绍 主要特点 类型安全&#xff1a…...