当前位置: 首页 > news >正文

java Flink(四十三)Flink Interval Join源码解析以及简单实例

背景

之前我们在一片文章里简单介绍过Flink的多流合并算子

java Flink(三十六)Flink多流合并算子UNION、CONNECT、CoGroup、Join

今天我们通过Flink 1.14的源码对Flink的Interval Join进行深入的理解。

Interval Join不是两个窗口做关联,更适用于处理乱序数据流之间的关联。它的作用更类似于从左流中a元素本身出发,对右流中一段时间内的数据进行关联(Inner Join:只关联相同Key的数据)。

如图所示:

 下边这条流中的2关联到上范围内的0/1

源码解析

Flink版本1.14.4

按住Ctrl+鼠标左键,点击process进入源码

 这里process方法是在KeydStream.java下IntervalJoined类下的方法

 包装返回类型的TypeInfomation(TypeInfo的介绍可以看上一篇)

 返回的outputType

SingleOutputStreamOperator使用给定的用户函数完成联接操作,该函数针对每个联接的元素对执行。这种方法允许传递输出类型的显式类型信息。

 IntervalJoinOperator初始化

左边界<=右边界检查

获取左流还有右流数据对应的序列化(从TypeInfo获取的)

继续看IntervalJoinOperator中的其余关键实现

open方法用来注册定时器

 初始化两个流的map状态

处理左侧流中的数据。每当数据到达左流时,它就会被添加到左缓冲区。将从右侧缓冲区中查找该元素可能的候选联接,如果该对位于用户定义的边界内,则将其传递给 ProcessJoinFunction

​ 同理处理右流

进入数据处理函数

获取数据,取出事件时间

 超过当前watermark的数据进行过滤

 

数据没问题的话,将数据添加到状态

 ​​

遍历另一条流的状态,遍历其中的数据,把满足时间要求的数据进行collect

​注册一个当前事件时间戳+右边界的定时器

定时器触发后,清空map状态中时间戳-左边界的那条数据

简单实例 

pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>FlinkCode</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><jdk.version>1.8</jdk.version><jar.name>ubs-data-converter</jar.name><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--Flink 版本--><flink.version>1.14.4</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.11</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.5.10</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.8</version></dependency><dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId><version>1.9.2</version></dependency><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpcore</artifactId><version>4.4.1</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.16</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.16</version><scope>compile</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>1.9.2</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals><configuration><sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${jdk.version}</source><target>${jdk.version}</target><encoding>${project.build.sourceEncoding}</encoding></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><finalName>${jar.name}</finalName><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude><exclude>org.glassfish.jersey.core:jersey-common</exclude></excludes></artifactSet><relocations><relocation><pattern>com.google.common</pattern><shadedPattern>com.shade.google.common</shadedPattern></relocation><relocation><pattern>org.apache.kafka</pattern><shadedPattern>org.shade.apache.kafka</shadedPattern></relocation></relocations><filters><filter><artifact>*</artifact><includes><include>org/apache/htrace/**</include><include>org/apache/avro/**</include><include>org/apache/flink/streaming/**</include><include>org/apache/flink/connector/**</include><include>org/apache/kafka/**</include><include>org/apache/hive/**</include><include>org/apache/hadoop/hive/**</include><include>org/apache/curator/**</include><include>org/apache/zookeeper/**</include><include>org/apache/jute/**</include><include>org/apache/thrift/**</include><include>org/apache/http/**</include><include>org/I0Itec/**</include><include>jline/**</include><include>com/yammer/**</include><include>kafka/**</include><include>org/apache/hadoop/hbase/**</include><include>com/alibaba/fastjson/**</include><include>org/elasticsearch/action/**</include><include>io/confluent/**</include><include>com/fasterxml/**</include><include>org/elasticsearch/**</include><include>hbase-default.xml</include><include>hbase-site.xml</include></includes></filter><filter><artifact>org.apache.hadoop.hive.*:*</artifact><excludes><exclude></exclude><exclude></exclude><exclude></exclude></excludes></filter></filters></configuration></execution></executions></plugin></plugins></build>
</project>

user bean

package ubs.app.intervaljoin.bean;import lombok.*;@Data
@AllArgsConstructor
@Setter
@Getter
@NoArgsConstructor
public class User{Integer id;Long t;}

 order bean 

package ubs.app.intervaljoin.bean;import lombok.*;@Data
@AllArgsConstructor
@Setter
@Getter
@NoArgsConstructor
public class Order {Integer id;Long price;Long time;}

main

package ubs.app.intervaljoin;import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import ubs.app.intervaljoin.bean.Order;
import ubs.app.intervaljoin.bean.User;
import ubs.app.intervaljoin.source.OrderSource;
import ubs.app.intervaljoin.source.UserSource;import java.time.Duration;public class IntervalJoinApp  {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置watermarkWatermarkStrategy<User> userWatermarkStrategy = WatermarkStrategy.<User>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner<User>() {@Overridepublic long extractTimestamp(User element, long recordTimestamp) {return element.getT();}});DataStream<User> userDataStreamSource = env.addSource(new UserSource()).assignTimestampsAndWatermarks(userWatermarkStrategy);//设置watermarkWatermarkStrategy<Order> orderWatermarkStrategy = WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner<Order>() {@Overridepublic long extractTimestamp(Order element, long recordTimestamp) {return element.getTime();}});DataStream<Order> orderDataStreamSource = env.addSource(new OrderSource()).assignTimestampsAndWatermarks(orderWatermarkStrategy);env.setParallelism(1);SingleOutputStreamOperator<String> process = userDataStreamSource.keyBy(o -> o.getId()).intervalJoin(orderDataStreamSource.keyBy(o -> o.getId())).between(Time.seconds(-5), Time.seconds(0)).process(new ProcessJoinFunction<User, Order, String>() {@Overridepublic void processElement(User left, Order right, ProcessJoinFunction<User, Order, String>.Context ctx, Collector<String> out) throws Exception {Integer lid = left.getId();Long lt = left.getT();Integer rid = right.getId();long rt = right.getTime();out.collect(String.format("左%s 左时间%s 右%s 右时间%s 关联到了 %s", lid, lt/1000, rid, rt/1000, rt/1000-lt/1000));}});process.print();env.execute();}
}

相关文章:

java Flink(四十三)Flink Interval Join源码解析以及简单实例

背景 之前我们在一片文章里简单介绍过Flink的多流合并算子 java Flink&#xff08;三十六&#xff09;Flink多流合并算子UNION、CONNECT、CoGroup、Join 今天我们通过Flink 1.14的源码对Flink的Interval Join进行深入的理解。 Interval Join不是两个窗口做关联&#xff0c;…...

JsonUtility.ToJson 和UnityWebRequest 踩过的坑记录

项目场景&#xff1a; 需求&#xff1a;我在做网络接口链接&#xff0c;使用的unity自带的 UnityWebRequest &#xff0c;数据传输使用的json&#xff0c;json和自定义数据转化使用的也是unity自带的JsonUtility。使用过程中发现两个bug。 1.安全验证失败。 报错为&#xff1a…...

面试算法-69-三角形最小路径和

题目 给定一个三角形 triangle &#xff0c;找出自顶向下的最小路径和。 每一步只能移动到下一行中相邻的结点上。相邻的结点 在这里指的是 下标 与 上一层结点下标 相同或者等于 上一层结点下标 1 的两个结点。也就是说&#xff0c;如果正位于当前行的下标 i &#xff0c;那…...

流畅的 Python 第二版(GPT 重译)(九)

第四部分&#xff1a;控制流 第十七章&#xff1a;迭代器、生成器和经典协程 当我在我的程序中看到模式时&#xff0c;我认为这是一个麻烦的迹象。程序的形状应该只反映它需要解决的问题。代码中的任何其他规律性对我来说都是一个迹象&#xff0c;至少对我来说&#xff0c;这表…...

单片机学到什么程度才可以去工作?

单片机学到什么程度才可以去工作? 如果没有名校或学位的加持&#xff0c;你还得再努力一把&#xff0c;才能从激烈的竞争中胜出。以下这些技能可以给你加分&#xff0c;你看情况学&#xff0c;不同行业对这些组件会有取舍: . Cortex-M内核:理解MCU内核各部件的工作机制&#…...

内网穿透方案

内网穿透 有几种流行的内网穿透软件可供选择&#xff0c;它们都能帮助你在内网环境中建立与外部网络的连接。以下是其中一些常用的内网穿透软件&#xff1a; Ngrok&#xff1a;Ngrok 是一个简单易用的内网穿透工具&#xff0c;可以快速创建安全的公共 URL&#xff0c;让你可以…...

WordPress菜单函数wp_nav_menu各参数

wordpress主题制作时&#xff0c;常常会在不同的位置调用不同的菜单&#xff0c;使用下面的这个代码&#xff0c;再加上CSS给菜单做新的样式&#xff0c;可满足wordpress模板制作时对菜单调用的所有需求。 wp_nav_menu( array( theme_location > ,//导航别名 menu > , /…...

类于对象(上)--- 类的定义、访问限定符、计算类和对象的大小、this指针

在本篇中将会介绍一个很重要和很基础的Cpp知识——类和对象。对于类和对象的篇目将会有三篇&#xff0c;本篇是基础篇&#xff0c;将会介绍类的定义、类的访问限定符符和封装、计算类和对象的大小、以及类的 this 指针。目录如下&#xff1a; 目录 1. 关于类 1.1 类的定义 2 类…...

提升交付效率:Booking.com 金融技术团队的成功实践

Booking.com 金融技术业务部门的团队对其平台的后端和前端实施了一系列改进措施&#xff0c;并通过 DORA 指标将交付性能提高了一倍。此外&#xff0c;还使用了微前端 (MFE) 模式&#xff0c;将单体 FE 应用程序分解为多个可单独部署的分解应用程序。 2022 年年中&#xff0c;B…...

【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑

文章目录 &#x1f343;前言&#x1f334;扫描线程的实现&#x1f332;实现消费消息&#x1f333;实现addConsumer()方法&#x1f38b;VirtualHost类订阅消息的完善⭕总结 &#x1f343;前言 本次开发目标 实现消费消息的核心逻辑 &#x1f334;扫描线程的实现 我们先给Cons…...

【Three.js】使用精灵图Sprite创建面朝相机的文本标注

目录 &#x1f41d;前言 &#x1f41d;canvas创建文字 &#x1f41d;将canvas作为纹理贴图加载到sprite中 &#x1f41d;封装方法 &#x1f41d;前言 在Three.js中精灵Sprite是一个总是面朝摄像机的平面&#xff0c;它通常和纹理贴图结合使用&#xff0c;贴图可以是一张图…...

C++中的类模板

C中的类模板 类模板 类模板在C中是一种非常强大的工具&#xff0c;它允许程序员编写与数据类型无关的代码。简单来说&#xff0c;类模板允许你定义一个蓝图&#xff0c;这个蓝图可以用来生成具体类型的类。使用类模板可以提高代码的复用性&#xff0c;减少重复代码&#xff0…...

【每日一题】好子数组的最大分数

Tag 【单调栈】【暴力枚举】【数组】【2024-03-19】 题目来源 1793. 好子数组的最大分数 解题思路 本题和 84. 柱状图中最大的矩形 一样&#xff0c;计算的都是最大矩形的面积。只不过多了一个约束&#xff1a;矩形必须包含下标 k。 以下的方法一和方法二是 84. 柱状图中最…...

Vue2(七):超详细vue开发环境搭建(win7),nodejs下载与安装,安装淘宝镜像(报错已解决),配置脚手架

一、安装node.js 本来想粗略写一下的&#xff0c;但是搭建脚手架的时候&#xff0c;遇到了很多问题&#xff0c;浪费快两天时间&#xff0c;记录一下自己的解决办法希望对你们有帮助&#xff01; 1.下载nodejs 安装包下载链接【CNPM Binaries Mirror】 下载我划线的这个&am…...

【Web】记录CISCN 2021 总决赛 ezj4va题目复现——AspectJWeaver

目录 前言 原理分析 step 0 step 1 EXP 前文&#xff1a;【Web】浅聊Java反序列化之AspectJWeaver——任意文件写入-CSDN博客 前言 这就是当年传说中的零解题嘛&#x1f62d;&#xff0c;快做&#x1f92e;了 有了之前的经验&#xff0c;思路顺挺快的&#xff0c;中间不…...

视频技术1:使用ABLMediaServer推流rtsp

ABLMediaServer定位是高性能、高稳定、开箱即用、商用级别的流媒体服务器 下边展示了如何把1个mp3作为输入源&#xff0c;转换为rtsp流的过程。 作用&#xff1a;用rtsp模拟摄像头的视频流 1、启动ABLMediaServer ABLMediaServer-2024-03-13\WinX64\ABLMediaServer.exe 配…...

HTML5+CSS3+JS小实例:创意罗盘时钟

实例:创意罗盘时钟 技术栈:HTML+CSS+JS 效果: 源码: 【HTML】 <!DOCTYPE html> <html lang="zh-CN"><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=…...

设计数据库之内部模式:SQL基本操作

Chapter4&#xff1a;设计数据库之内部模式&#xff1a;SQL基本操作 笔记来源&#xff1a; 1.《漫画数据库》—科学出版社 2.SQL | DDL, DQL, DML, DCL and TCL Commands 设计数据库的步骤&#xff1a; 概念模式 概念模式(conceptual schema)是指将现实世界模型化的阶段进而&…...

Git浅谈配置文件和免密登录

一、文章内容 简述git三种配置ssh免密登录以及遇见的问题git可忽略文件git remote 相关操作 二、Git三种配置 项目配置文件(局部)&#xff1a;项目路径/.git/config 文件 git config --local user.name name git config --local user.email 123qq.cc全局配置文(所有用户): …...

【好玩的经典游戏】Docker环境下部署RPG网页小游戏

【好玩的经典游戏】Docker环境下部署RPG网页小游戏 一、react-tetris小游戏介绍1.1 react-tetris小游戏简介1.2 项目预览二、本次实践介绍2.1 本地环境规划2.2 本次实践介绍三、本地环境检查3.1 安装Docker环境3.2 检查Docker服务状态3.3 检查Docker版本3.4 检查docker compose…...

深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录

ASP.NET Core 是一个跨平台的开源框架&#xff0c;用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录&#xff0c;以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...

Linux链表操作全解析

Linux C语言链表深度解析与实战技巧 一、链表基础概念与内核链表优势1.1 为什么使用链表&#xff1f;1.2 Linux 内核链表与用户态链表的区别 二、内核链表结构与宏解析常用宏/函数 三、内核链表的优点四、用户态链表示例五、双向循环链表在内核中的实现优势5.1 插入效率5.2 安全…...

大话软工笔记—需求分析概述

需求分析&#xff0c;就是要对需求调研收集到的资料信息逐个地进行拆分、研究&#xff0c;从大量的不确定“需求”中确定出哪些需求最终要转换为确定的“功能需求”。 需求分析的作用非常重要&#xff0c;后续设计的依据主要来自于需求分析的成果&#xff0c;包括: 项目的目的…...

React Native 开发环境搭建(全平台详解)

React Native 开发环境搭建&#xff08;全平台详解&#xff09; 在开始使用 React Native 开发移动应用之前&#xff0c;正确设置开发环境是至关重要的一步。本文将为你提供一份全面的指南&#xff0c;涵盖 macOS 和 Windows 平台的配置步骤&#xff0c;如何在 Android 和 iOS…...

《Playwright:微软的自动化测试工具详解》

Playwright 简介:声明内容来自网络&#xff0c;将内容拼接整理出来的文档 Playwright 是微软开发的自动化测试工具&#xff0c;支持 Chrome、Firefox、Safari 等主流浏览器&#xff0c;提供多语言 API&#xff08;Python、JavaScript、Java、.NET&#xff09;。它的特点包括&a…...

DBAPI如何优雅的获取单条数据

API如何优雅的获取单条数据 案例一 对于查询类API&#xff0c;查询的是单条数据&#xff0c;比如根据主键ID查询用户信息&#xff0c;sql如下&#xff1a; select id, name, age from user where id #{id}API默认返回的数据格式是多条的&#xff0c;如下&#xff1a; {&qu…...

【Redis】笔记|第8节|大厂高并发缓存架构实战与优化

缓存架构 代码结构 代码详情 功能点&#xff1a; 多级缓存&#xff0c;先查本地缓存&#xff0c;再查Redis&#xff0c;最后才查数据库热点数据重建逻辑使用分布式锁&#xff0c;二次查询更新缓存采用读写锁提升性能采用Redis的发布订阅机制通知所有实例更新本地缓存适用读多…...

并发编程 - go版

1.并发编程基础概念 进程和线程 A. 进程是程序在操作系统中的一次执行过程&#xff0c;系统进行资源分配和调度的一个独立单位。B. 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。C.一个进程可以创建和撤销多个线程;同一个进程中…...

Vite中定义@软链接

在webpack中可以直接通过符号表示src路径&#xff0c;但是vite中默认不可以。 如何实现&#xff1a; vite中提供了resolve.alias&#xff1a;通过别名在指向一个具体的路径 在vite.config.js中 import { join } from pathexport default defineConfig({plugins: [vue()],//…...

【前端异常】JavaScript错误处理:分析 Uncaught (in promise) error

在前端开发中&#xff0c;JavaScript 异常是不可避免的。随着现代前端应用越来越多地使用异步操作&#xff08;如 Promise、async/await 等&#xff09;&#xff0c;开发者常常会遇到 Uncaught (in promise) error 错误。这个错误是由于未正确处理 Promise 的拒绝&#xff08;r…...