当前位置: 首页 > article >正文

Flink源码阅读:双流操作

Window Join我们先回顾一下 window join 的使用方法。DataStreamTuple2String, Double result source1.join(source2) .where(record - record.f0) .equalTo(record - record.f0) .window(TumblingEventTimeWindows.of(Time.seconds(2L))) .apply(new JoinFunctionTuple2String, Double, Tuple2String, Double, Tuple2String, Double() { Override public Tuple2String, Double join(Tuple2String, Double record1, Tuple2String, Double record2) throws Exception { return Tuple2.of(record1.f0, record1.f1); } });上述调用链路类的流转如下在 WithWindow 的 apply 方法中是构建了一个 coGroupedWindowedStream然后调用它的 apply 方法。public T SingleOutputStreamOperatorT apply( JoinFunctionT1, T2, T function, TypeInformationT resultType) { // clean the closure function input1.getExecutionEnvironment().clean(function); coGroupedWindowedStream input1.coGroup(input2) .where(keySelector1) .equalTo(keySelector2) .window(windowAssigner) .trigger(trigger) .evictor(evictor) .allowedLateness(allowedLateness); return coGroupedWindowedStream.apply(new JoinCoGroupFunction(function), resultType); }这里可以看出Window Join 的底层是转换成 coGroup 进行处理的。在 JoinCoGroupFunction 中coGroup 方法就是对两个流进行两层遍历然后将其应用到我们自定义的 JoinFunction 上。private static class JoinCoGroupFunctionT1, T2, T extends WrappingFunctionJoinFunctionT1, T2, T implements CoGroupFunctionT1, T2, T { private static final long serialVersionUID 1L; public JoinCoGroupFunction(JoinFunctionT1, T2, T wrappedFunction) { super(wrappedFunction); } Override public void coGroup(IterableT1 first, IterableT2 second, CollectorT out) throws Exception { for (T1 val1 : first) { for (T2 val2 : second) { out.collect(wrappedFunction.join(val1, val2)); } } } }CoGroupCoGroup 的整体用法和流程与 Join 都类似我们就不逐个介绍了。我们直接来看 apply 方法。public T SingleOutputStreamOperatorT apply( CoGroupFunctionT1, T2, T function, TypeInformationT resultType) { // clean the closure function input1.getExecutionEnvironment().clean(function); UnionTypeInfoT1, T2 unionType new UnionTypeInfo(input1.getType(), input2.getType()); UnionKeySelectorT1, T2, KEY unionKeySelector new UnionKeySelector(keySelector1, keySelector2); SingleOutputStreamOperatorTaggedUnionT1, T2 taggedInput1 input1.map(new Input1TaggerT1, T2()); taggedInput1.getTransformation().setParallelism(input1.getParallelism(), false); taggedInput1.returns(unionType); SingleOutputStreamOperatorTaggedUnionT1, T2 taggedInput2 input2.map(new Input2TaggerT1, T2()); taggedInput2.getTransformation().setParallelism(input2.getParallelism(), false); taggedInput2.returns(unionType); DataStreamTaggedUnionT1, T2 unionStream taggedInput1.union(taggedInput2); // we explicitly create the keyed stream to manually pass the key type information in windowedStream new KeyedStreamTaggedUnionT1, T2, KEY( unionStream, unionKeySelector, keyType) .window(windowAssigner); if (trigger ! null) { windowedStream.trigger(trigger); } if (evictor ! null) { windowedStream.evictor(evictor); } if (allowedLateness ! null) { windowedStream.allowedLateness(allowedLateness); } return windowedStream.apply( new CoGroupWindowFunctionT1, T2, T, KEY, W(function), resultType); }在 apply 方法中先把两个流进行合并然后创建了 windowedStream并把窗口相关的属性设置好最后是调用 windowedStream 的 apply 方法。在调用windowedStream.apply方法时又将 function 包装成了 CoGroupWindowFunction。private static class CoGroupWindowFunctionT1, T2, T, KEY, W extends Window extends WrappingFunctionCoGroupFunctionT1, T2, T implements WindowFunctionTaggedUnionT1, T2, T, KEY, W { private static final long serialVersionUID 1L; public CoGroupWindowFunction(CoGroupFunctionT1, T2, T userFunction) { super(userFunction); } Override public void apply(KEY key, W window, IterableTaggedUnionT1, T2 values, CollectorT out) throws Exception { ListT1 oneValues new ArrayList(); ListT2 twoValues new ArrayList(); for (TaggedUnionT1, T2 val : values) { if (val.isOne()) { oneValues.add(val.getOne()); } else { twoValues.add(val.getTwo()); } } wrappedFunction.coGroup(oneValues, twoValues, out); } }在 CoGroupWindowFunction 的 apply 方法中是将主键为 key 的流分开两个流再去调用 JoinCoGroupFunction 的 coGroup 方法。这里的 values 都是相同的 key原因是在 window 中维护的 windowState它内部是一个 stateTable窗口的 namespace 和 key 共同维护一个 state当窗口触发时就会对相同 key 的数据调用 apply 方法。Interval Join梳理完了 Window Join 和 CoGroup 之后我们再接着看 Interval Join。还是先来回顾一下用法。DataStreamTuple2String, Double intervalJoinResult source1.keyBy(record - record.f0) .intervalJoin(source2.keyBy(record - record.f0)) .between(Time.seconds(-2), Time.seconds(2)) .process(new ProcessJoinFunctionTuple2String, Double, Tuple2String, Double, Tuple2String, Double() { Override public void processElement(Tuple2String, Double record1, Tuple2String, Double record2, ProcessJoinFunctionTuple2String, Double, Tuple2String, Double, Tuple2String, Double.Context context, CollectorTuple2String, Double out) throws Exception { out.collect(Tuple2.of(record1.f0, record1.f1 record2.f1)); } });通过用法可以看出interval join 传入的对象是两个 KeyedStream接着使用 between 方法定义 interval join 的上下边界最后调用 process 方法执行计算逻辑。在调用过程中类型的转换如下图。我们主要关注 process 的逻辑。public OUT SingleOutputStreamOperatorOUT process( ProcessJoinFunctionIN1, IN2, OUT processJoinFunction, TypeInformationOUT outputType) { Preconditions.checkNotNull(processJoinFunction); Preconditions.checkNotNull(outputType); final ProcessJoinFunctionIN1, IN2, OUT cleanedUdf left.getExecutionEnvironment().clean(processJoinFunction); if (isEnableAsyncState) { final AsyncIntervalJoinOperatorKEY, IN1, IN2, OUT operator new AsyncIntervalJoinOperator( lowerBound, upperBound, lowerBoundInclusive, upperBoundInclusive, leftLateDataOutputTag, rightLateDataOutputTag, left.getType() .createSerializer( left.getExecutionConfig().getSerializerConfig()), right.getType() .createSerializer( right.getExecutionConfig().getSerializerConfig()), cleanedUdf); return left.connect(right) .keyBy(keySelector1, keySelector2) .transform(Interval Join [Async], outputType, operator); } else { final IntervalJoinOperatorKEY, IN1, IN2, OUT operator new IntervalJoinOperator( lowerBound, upperBound, lowerBoundInclusive, upperBoundInclusive, leftLateDataOutputTag, rightLateDataOutputTag, left.getType() .createSerializer( left.getExecutionConfig().getSerializerConfig()), right.getType() .createSerializer( right.getExecutionConfig().getSerializerConfig()), cleanedUdf); return left.connect(right) .keyBy(keySelector1, keySelector2) .transform(Interval Join, outputType, operator); } }Interval join 是基于 ConnectedStream 实现的ConnectedStream 提供了更加通用的双流操作它将两个流组合成一个 TwoInputTransformation然后加入执行图中。具体的 Operator 是 IntervalJoinOperator 或 AsyncIntervalJoinOperator它们都是 TwoInputStreamOperator 的实现类提供processElement1和processElement2两个方法分别处理两个输入源的数据最终都调用的是 processElement。private THIS, OTHER void processElement( final StreamRecordTHIS record, final MapStateLong, ListIntervalJoinOperator.BufferEntryTHIS ourBuffer, final MapStateLong, ListIntervalJoinOperator.BufferEntryOTHER otherBuffer, final long relativeLowerBound, final long relativeUpperBound, final boolean isLeft) throws Exception { final THIS ourValue record.getValue(); final long ourTimestamp record.getTimestamp(); if (ourTimestamp Long.MIN_VALUE) { throw new FlinkException( Long.MIN_VALUE timestamp: Elements used in interval stream joins need to have timestamps meaningful timestamps.); } if (isLate(ourTimestamp)) { sideOutput(ourValue, ourTimestamp, isLeft); return; } addToBuffer(ourBuffer, ourValue, ourTimestamp); for (Map.EntryLong, ListBufferEntryOTHER bucket : otherBuffer.entries()) { final long timestamp bucket.getKey(); if (timestamp ourTimestamp relativeLowerBound || timestamp ourTimestamp relativeUpperBound) { continue; } for (BufferEntryOTHER entry : bucket.getValue()) { if (isLeft) { collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp); } else { collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp); } } } long cleanupTime (relativeUpperBound 0L) ? ourTimestamp relativeUpperBound : ourTimestamp; if (isLeft) { internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime); } else { internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime); } }在 IntervalJoinOperator 中维护了两个 MapState每个消息进来的时候都会加入到 MapState 中key 是 timestampvalue 是一个元素的列表。然后遍历另一个 MapState得到符合条件的数据。最后是为每条数据注册一个定时器当时间超过有效范围后会从 MapState 中清除这个时间戳的数据。总结本文我们梳理了 Flink 的三种双流操作的源码我们了解到 Window Join 底层是通过 CoGroup 实现的。CoGroup 本身是将两个流合并成 WindowedStream 并依赖于 WindowState 进行数据 join。最后 Interval Join 是通过 ConnectedStreams 实现的内部的 IntervalJoinOperator 会维护两个 MapState通过 MapState 进行数据关联。

相关文章:

Flink源码阅读:双流操作

Window Join我们先回顾一下 window join 的使用方法。DataStream<Tuple2<String, Double>> result source1.join(source2).where(record -> record.f0).equalTo(record -> record.f0).window(TumblingEventTimeWindows.of(Time.seconds(2L))).apply(new Joi…...

微信QQ防撤回神器:RevokeMsgPatcher 2.1 终极使用教程

微信QQ防撤回神器&#xff1a;RevokeMsgPatcher 2.1 终极使用教程 【免费下载链接】RevokeMsgPatcher :trollface: A hex editor for WeChat/QQ/TIM - PC版微信/QQ/TIM防撤回补丁&#xff08;我已经看到了&#xff0c;撤回也没用了&#xff09; 项目地址: https://gitcode.co…...

3步搭建高效NTQQ机器人:LuckyLilliaBot全功能配置指南

3步搭建高效NTQQ机器人&#xff1a;LuckyLilliaBot全功能配置指南 【免费下载链接】LuckyLilliaBot NTQQ的OneBot API插件 项目地址: https://gitcode.com/gh_mirrors/li/LuckyLilliaBot LuckyLilliaBot是一款基于OneBot11协议的NTQQ机器人框架&#xff0c;它能帮助开发…...

L1-064 估值一亿的ai核心代码 (分数20)字符串处理

•无论用户说什么&#xff0c;首先把对方说的话在一行中原样打印出来&#xff1b;•消除原文中多余空格&#xff1a;把相邻单词间的多个空格换成 1 个空格&#xff0c;把行首尾的空格全部删掉&#xff0c;把标点符号前面的空格删掉&#xff1b; •把原文中所有大写英文字母变成…...

Monaco-Editor插件使用小坑

无法通过鼠标进行选中文本<div id"monacoEditor" class"monacoEditor"></div>外层添加了splinter拖拽组件&#xff0c;导致mousemove事件被拦截&#xff0c;给monaco-editor添加css&#xff1a;pointer-events&#xff1a;auto.monacoEditor .…...

硬件解放:开源工具突破设备限制的深度探索指南

硬件解放&#xff1a;开源工具突破设备限制的深度探索指南 【免费下载链接】OpenCore-Legacy-Patcher Experience macOS just like before 项目地址: https://gitcode.com/GitHub_Trending/op/OpenCore-Legacy-Patcher 当你的设备被厂商贴上"过时"标签&#x…...

实战应用:基于快马平台从零到一构建功能完备的openclaw101风格项目平台

今天想和大家分享一个实战经验&#xff1a;如何从零开始构建一个功能完备的开源项目托管平台。类似openclaw101这样的网站&#xff0c;其实用现代开发工具和云平台可以快速实现。下面我就把整个搭建过程拆解成几个关键环节&#xff0c;希望能给想做类似项目的朋友一些参考。 项…...

效率提升:基于快马AI生成vmware虚拟机自动化部署脚本,告别手动配置

在开发过程中&#xff0c;虚拟机环境的搭建往往是耗时又容易出错的环节。特别是当需要频繁创建不同配置的虚拟机时&#xff0c;手动操作不仅效率低下&#xff0c;还容易遗漏关键步骤。最近尝试用自动化脚本解决这个问题&#xff0c;效果出乎意料地好&#xff0c;分享下具体实现…...

Qwen3-14B日志分析教程:ELK栈收集推理请求、响应、错误全链路追踪

Qwen3-14B日志分析教程&#xff1a;ELK栈收集推理请求、响应、错误全链路追踪 1. 为什么需要日志分析 当你在私有化部署Qwen3-14B模型时&#xff0c;可能会遇到各种问题&#xff1a;为什么推理速度突然变慢了&#xff1f;为什么API返回了错误响应&#xff1f;哪些请求消耗了最…...

BG3 Mod Manager:智能模组管理工具让博德之门3模组体验升级

BG3 Mod Manager&#xff1a;智能模组管理工具让博德之门3模组体验升级 【免费下载链接】BG3ModManager A mod manager for Baldurs Gate 3. This is the only official source! 项目地址: https://gitcode.com/gh_mirrors/bg/BG3ModManager 博德之门3作为一款备受欢迎的…...

Fiji图像处理软件更新故障排查指南:当科学工具遇到“升级烦恼“

Fiji图像处理软件更新故障排查指南&#xff1a;当科学工具遇到"升级烦恼" 【免费下载链接】fiji A "batteries-included" distribution of ImageJ :battery: 项目地址: https://gitcode.com/gh_mirrors/fi/fiji Fiji作为生物图像分析领域的瑞士军刀…...

宁德时代2026春招开启:6000+offer,这一轮机会在扩大

很多人现在还在犹豫一个问题&#xff1a;新能源是不是已经开始降温了&#xff1f;现在再投&#xff0c;还能不能拿到好的岗位&#xff1f;但从今年的招聘情况来看&#xff0c;趋势其实很清晰&#xff1a;岗位没有减少&#xff0c;而是在结构性增加。尤其是动力电池、储能、电池…...

Phi-3-mini-4k-instruct新手入门:Ollama部署详解,从安装到第一个对话

Phi-3-mini-4k-instruct新手入门&#xff1a;Ollama部署详解&#xff0c;从安装到第一个对话 1. 认识Phi-3-mini-4k-instruct&#xff1a;轻量级AI助手 Phi-3-mini-4k-instruct是一个仅有38亿参数的轻量级语言模型&#xff0c;由微软团队开发。虽然体积小巧&#xff0c;但它在…...

如何彻底解决消息撤回难题?RevokeMsgPatcher带来的革新方案

如何彻底解决消息撤回难题&#xff1f;RevokeMsgPatcher带来的革新方案 【免费下载链接】RevokeMsgPatcher :trollface: A hex editor for WeChat/QQ/TIM - PC版微信/QQ/TIM防撤回补丁&#xff08;我已经看到了&#xff0c;撤回也没用了&#xff09; 项目地址: https://gitco…...

S2-Pro模型推理服务高可用部署:基于Docker与Kubernetes的架构

S2-Pro模型推理服务高可用部署&#xff1a;基于Docker与Kubernetes的架构 1. 为什么需要高可用部署 在实际生产环境中&#xff0c;AI模型推理服务的稳定性直接影响业务连续性。想象一下&#xff0c;当你的电商平台正在举行大促活动&#xff0c;AI推荐系统突然宕机&#xff0c…...

小白也能玩转AI翻译:translategemma图文翻译快速入门指南

小白也能玩转AI翻译&#xff1a;translategemma图文翻译快速入门指南 1. 认识translategemma&#xff1a;你的私人翻译助手 translategemma-12b-it是Google基于Gemma 3模型开发的开源翻译模型&#xff0c;它能同时处理文本和图片中的文字翻译。想象一下&#xff0c;你正在国外…...

Hunyuan-MT-7B多语种能力:Pixel Language Portal在联合国六种官方语言互译中的表现

Hunyuan-MT-7B多语种能力&#xff1a;Pixel Language Portal在联合国六种官方语言互译中的表现 1. 引言&#xff1a;当像素冒险遇见多语言翻译 在全球化交流日益频繁的今天&#xff0c;语言障碍仍然是横亘在不同文化之间的无形壁垒。传统翻译工具往往给人冰冷、机械的使用体验…...

OmenSuperHub终极指南:简单三步掌控暗影精灵硬件性能

OmenSuperHub终极指南&#xff1a;简单三步掌控暗影精灵硬件性能 【免费下载链接】OmenSuperHub 项目地址: https://gitcode.com/gh_mirrors/om/OmenSuperHub 你是否厌倦了官方Omen Gaming Hub的臃肿体积和烦人广告&#xff1f;是否希望获得纯净的硬件控制体验&#xf…...

5步搞定Qwen3-Embedding-4B向量服务:SGlang部署亲测有效

5步搞定Qwen3-Embedding-4B向量服务&#xff1a;SGlang部署亲测有效 1. Qwen3-Embedding-4B模型简介 1.1 模型核心能力 Qwen3-Embedding-4B是通义实验室推出的新一代文本嵌入模型&#xff0c;专为高效语义编码设计。作为Qwen3系列的一员&#xff0c;它在保持中等参数规模&am…...

屏幕取色与设计辅助工具 ColorWanted:提升设计师与开发者工作效率的专业解决方案

屏幕取色与设计辅助工具 ColorWanted&#xff1a;提升设计师与开发者工作效率的专业解决方案 【免费下载链接】ColorWanted Screen color picker for Windows (Windows 上的屏幕取色器) 项目地址: https://gitcode.com/gh_mirrors/co/ColorWanted 你是否曾遇到这样的工作…...

马年市场快报分析:欧美组合式一氧化碳及可燃气体报警器指南

马年市场快报分析&#xff1a;欧美组合式一氧化碳及可燃气体报警器指南根据您提供的快报内容&#xff0c;我将从专业角度逐步分析欧美组合式一氧化碳&#xff08;CO&#xff09;及可燃气体报警器的关键信息&#xff0c;包括安全标准、风险因素、探测器区别、安装建议以及相关产…...

云容笔谈效果对比评测: vs Stable Diffusion 3.5东方人像生成质量深度分析

云容笔谈效果对比评测&#xff1a; vs Stable Diffusion 3.5东方人像生成质量深度分析 1. 评测背景与目的 东方人像生成一直是AI图像生成领域的特殊挑战。西方模型在生成东方人脸时常常出现面部结构不自然、表情僵硬、缺乏东方神韵等问题。本次评测将深入对比「云容笔谈」和S…...

解锁3大智能功能:League-Toolkit让普通玩家也能玩转专业级游戏分析

解锁3大智能功能&#xff1a;League-Toolkit让普通玩家也能玩转专业级游戏分析 【免费下载链接】League-Toolkit An all-in-one toolkit for LeagueClient. Gathering power &#x1f680;. 项目地址: https://gitcode.com/gh_mirrors/le/League-Toolkit 在英雄联盟的召…...

ubuntu秘钥生成PKCS1 格式秘钥

openssl genrsa -out key 2048 openssl rsa -in key -out key2 -traditional...

Odoo 19成本核算避坑指南:标准成本法下差异分析、委外加工汇率风险与WIP分录丢失问题

Odoo 19成本核算实战避坑指南&#xff1a;标准成本差异、委外加工与WIP分录的深度解决方案 在制造业数字化转型浪潮中&#xff0c;Odoo 19作为开源ERP的领军者&#xff0c;其制造与会计模块的深度集成能力备受企业青睐。然而&#xff0c;当我们真正将系统投入生产环境时&#x…...

AI Token Platform - AI Token 中转计费平台

AI Token Platform - AI Token 中转计费平台 AI Token Platform 是一款企业级 AI Token 中转与计费平台&#xff0c;深度融合 多模型 AI 网关、Kill Bill 计费引擎 与 企业级会员管理 三大核心能力。平台以"统一 API 接入 灵活计费策略 企业级会员体系"为核心理念…...

PyTorch 2.8镜像实战落地:教育机构AI教学平台(图文+视频+LLM)集成方案

PyTorch 2.8镜像实战落地&#xff1a;教育机构AI教学平台&#xff08;图文视频LLM&#xff09;集成方案 1. 教育AI平台的技术挑战与解决方案 现代教育机构在构建AI教学平台时面临三大技术难题&#xff1a;多模态内容生成、算力资源管理和教学场景适配。PyTorch 2.8深度学习镜…...

从模电理论到商用落地,应届生必做的无线充项目,H 桥 / LC 谐振 + QI 协议全栈详解

很多初学嵌入式的同学、正在准备秋招的电子信息类应届生&#xff0c;都会遇到两个核心困境&#xff1a;一是模电学了 H 桥、LC 谐振&#xff0c;只会背公式做题&#xff0c;根本不知道怎么在真实产品里落地&#xff1b;二是学完单片机只会点灯&#xff0c;写的都是流水账代码&a…...

【米家IoT开发】巧用Charles抓包,高效定位与调试网络接口

1. 为什么Charles是米家IoT开发的调试神器 当你开发米家扩展程序时&#xff0c;最头疼的莫过于接口返回异常数据&#xff0c;或者请求莫名其妙失败。这时候如果只能靠猜问题出在哪里&#xff0c;那简直就是在黑暗中摸索。我刚开始做米家IoT开发时&#xff0c;就经常被这种问题困…...

猫抓插件:浏览器资源嗅探的革命性解决方案

猫抓插件&#xff1a;浏览器资源嗅探的革命性解决方案 【免费下载链接】cat-catch 猫抓 浏览器资源嗅探扩展 / cat-catch Browser Resource Sniffing Extension 项目地址: https://gitcode.com/GitHub_Trending/ca/cat-catch 你是否曾在浏览网页时&#xff0c;看到心仪的…...