flink的AggregateFunction,merge方法作用范围
背景
AggregateFunction接口是我们经常用的窗口聚合函数,其中有一个merge方法,我们一般情况下也是实现了的,但是你知道吗,其实这个方法只有在你使用会话窗口需要进行窗口合并的时候才需要实现
AggregateFunction.merge方法调用时机
AggregateFunction.merge方法其实只有在使用会话窗口进行窗口合并的时候才会用到,如下所示

对应的源码首先查看WindowOperator.processElement方法对要合并的窗口的状态进行合并
public void processElement(StreamRecord<IN> element) throws Exception {final Collection<W> elementWindows =windowAssigner.assignWindows(element.getValue(), element.getTimestamp(), windowAssignerContext);// if element is handled by none of assigned elementWindowsboolean isSkippedElement = true;final K key = this.<K>getKeyedStateBackend().getCurrentKey();if (windowAssigner instanceof MergingWindowAssigner) {MergingWindowSet<W> mergingWindows = getMergingWindowSet();for (W window : elementWindows) {// adding the new window might result in a merge, in that case the actualWindow// is the merged window and we work with that. If we don't merge then// actualWindow == windowW actualWindow =mergingWindows.addWindow(window,new MergingWindowSet.MergeFunction<W>() {@Overridepublic void merge(W mergeResult,Collection<W> mergedWindows,W stateWindowResult,Collection<W> mergedStateWindows)throws Exception {triggerContext.key = key;triggerContext.window = mergeResult;triggerContext.onMerge(mergedWindows);for (W m : mergedWindows) {triggerContext.window = m;triggerContext.clear();deleteCleanupTimer(m);}// 合并窗口的状态windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);}});
继续查看AbstractHeapMergingState.mergeNamespaces方法,
public void mergeNamespaces(N target, Collection<N> sources) throws Exception {if (sources == null || sources.isEmpty()) {return; // nothing to do}final StateTable<K, N, SV> map = stateTable;SV merged = null;// merge the sourcesfor (N source : sources) {// get and remove the next source per namespace/keySV sourceState = map.removeAndGetOld(source);if (merged != null && sourceState != null) {//此处合并状态并调用AggregateFunction.merge方法merged = mergeState(merged, sourceState);} else if (merged == null) {merged = sourceState;}}// merge into the target, if neededif (merged != null) {map.transform(target, merged, mergeTransformation);}
}//真正调用AggregateFunction.merge方法合并自定义的状态
@Override
protected ACC mergeState(ACC a, ACC b) {return aggregateTransformation.aggFunction.merge(a, b);
}
这样AggregateFunction.merge的调用过程就清楚了,实际应用中,我们只需要在使用会话窗口时才需要实现这个方法,其他的基于时间窗口的方式不需要实现这个方法,当然实现了也不会有错
相关文章:
flink的AggregateFunction,merge方法作用范围
背景 AggregateFunction接口是我们经常用的窗口聚合函数,其中有一个merge方法,我们一般情况下也是实现了的,但是你知道吗,其实这个方法只有在你使用会话窗口需要进行窗口合并的时候才需要实现 AggregateFunction.merge方法调用时…...
Day25力扣打卡
打卡记录 寻找旋转排序数组中的最小值(二分) 链接 由于是旋转排序数组,所以整个数组有两部分是递增的,选取右侧最后元素,即可将整个数组分为大于该元素和小于该元素,碰头地段即为最小值。 class Solutio…...
SpringCloud - OpenFeign 参数传递和响应处理(全网最详细)
目录 一、OpenFeign 参数传递和响应处理 1.1、feign 客户端参数传递 1.1.1、零散类型参数传递 1. 例如 querystring 方式传参 2. 例如路径方式传参 1.1.2、对象参数传递 1. 对象参数传递案例 1.1.3、数组参数传递 1. 数组传参案例 1.1.4、集合类型的参数传递…...
Postgresql数据类型-布尔类型
前面介绍了PostgreSQL支持的数字类型、字符类型、时间日期类型,这些数据类型是关系型数据库的常规数据类型,此外PostgreSQL还支持很多非常规数据类型,比如布尔类型、网络地址类型、数组类型、范围类型、json/jsonb类型等,从这一节…...
SPASS-交叉表分析
导入数据 修改变量测量类型 分析->描述统计->交叉表 表中显示行、列变量通过卡方检验给出的独立性检验结果。共使用了三种检验方法。上表各种检验方法显著水平sig.都远远小于0.05,所以有理由拒绝实验准备与评价结果是独立的假设,即认为实验准备这个评价指标是…...
用Python的requests库来模拟爬取地图商铺信息
由于谷歌地图抓取商铺信息涉及到API使用和反爬虫策略,直接爬取可能会遇到限制。但是,我们可以使用Python的requests库来模拟爬取某个网页,然后通过正则表达式或其他文本处理方法来提取商铺信息。以下是一个简单的示例: # 导入requ…...
使用EvoMap/Three.js模拟无人机灯光秀
一、创建地图对象 首先我们需要创建一个EM.Map对象,该对象代表了一个地图实例,并设置id为"map"的文档元素作为地图的容器。 let map new EM.Map("map",{zoom:22.14,center:[8.02528, -29.27638, 0],pitch:71.507,roll:2.01,maxPit…...
11.9存储器实验总结(单ram,双ram,FIFO)
实验设计 单端口RAM实现 双端口RAM实现 FIFO实现 文件结构为...
linux(ubuntu)安装并使用scrcpy
scrcpy 是一款开源的在计算机上显示和控制 Android 设备的工具。要在 Ubuntu 上使用 scrcpy,你可以按照以下步骤进行安装: 通过命令行安装 scrcpy: 安装 scrcpy: 打开终端(Terminal)并执行以下命令来安装…...
linux rsyslog安装配置
syslog是Linux系统默认的日志守护进程。默认的syslog配置文件是/etc/rsyslog.conf文件。syslog守护进程是可配置的,它允许人们为每一种类型的系统信息精确地指定一个存放地点。syslog使用UDP 514/TCP 514端口。 1.环境信息 环境信息 HostnameIpAddressOS versionModuleNoterh…...
美国Embarcadero公司正式发布2023 RAD Studio Delphi C++ Builder 12 Athens
Embarcadero 非常高兴地宣布发布 RAD Studio 12 Athens 以及 Delphi 12 和 CBuilder 12。RAD Studio 12 Athens 版本包含令人兴奋的新功能,为该产品的未来奠定了基础。 目录 主要新功能 C 的奇妙之处Delphi 的一些不错的补充FireMonkey 和 Skia 作为新基金会采用 MD…...
树莓派4B的测试记录(CPU、FFMPEG)
本文是用来记录树莓派 4B 的一些测试记录。 温度 下面记录中的风扇和大风扇是这样的: 为什么要用大风扇呢?因为小风扇在外壳上,气流通过外壳的珊格会有啸叫,声音不大但是很烦人,大风扇没这个问题,并且同样…...
物联网AI MicroPython学习之语法 二进制与ASCII转换
学物联网,来万物简单IoT物联网!! ubinascii 介绍 ubinascii模块实现了二进制数据与各种ASCII编码之间的转换。 接口说明 a2b_base64 - 解码base64编码的数据 函数原型:ubinascii.a2b_base64(data)注意事项: 在解码…...
学之思项目的搭建部署 打jar包失败的解决方法
学之思系统介绍部署java环境安装maven安装node.js前端打包工具命令npmGit命令获取源代码安装配置mysql前端打包打包jar包服务上线!!!打jar包失败的解决方法 学之思系统介绍 学之思开源考试系统是一款 java vue 的前后端不分离的考试系统。主要优点是开发、部署简单快捷、界面…...
[100天算法】-定长子串中元音的最大数目(day 67)
题目描述 给你字符串 s 和整数 k 。请返回字符串 s 中长度为 k 的单个子字符串中可能包含的最大元音字母数。英文中的 元音字母 为(a, e, i, o, u)。示例 1:输入:s "abciiidef", k 3 输出:3 解释…...
Elastic Observability 8.11:ES|QL、APM 中的通用分析和增强的 SLOs
作者:Tom Grabowski, Katrin Freihofner, Israel Ogbole Elastic Observability 8.11 引入了 ES|QL for Observability(技术预览版)、Universal ProfilingTM 和 Elastic APM 集成,以及针对 Elastic Observability 的新 SLO &#…...
TexGen简单模型对应inp文件简单梳理-2
模型 默认最简单的编织复材,编辑材料属性时发现基体属性设置正常,各向同性材料,但是纱线的材料属性却没有弹性性能的设置。 导出inp文件后,导入ABAQUS中其实可以看到有两种材料,纱线也是有属性的。 ABAQUS中修改属性的…...
VUE获取当前日期的周日和周六
<template><div><div click"handleLast()">上一周</div><div click"handleNext()">下一周</div><el-calendarref"monChild"v-model"value":first-day-of-week"7":range"[sta…...
K8S篇之k8s containerd模式fail to pull image certificate signed by unknown authority
"k8s containerd模式fail to pull image certificate signed by unknown authority"的问题 解决方案:您有两个选择:配置证书或禁用证书验证。 配置证书:您可以为 containerd 配置证书,使其信任由未知机构签名的证书。 具…...
算法进阶指南图论 最优贸易
最优贸易 题目描述 C C C 国有 n n n 个大城市和 m m m 条道路,每条道路连接这 n n n 个城市中的某两个城市。任意两个城市之间最多只有一条道路直接相连。这 m m m 条道路中有一部分为单向通行的道路,一部分为双向通行的道路,双向通行的…...
计算机网络复习(第三章):数据链路层
数据链路层:成帧、差错控制、可靠传输与介质访问控制 引言:数据链路层在网络中的位置 数据链路层位于物理层之上、网络层之下。物理层负责把比特转换成电信号、光信号或无线电波并在传输介质上传播,而数据链路层要解决的问题更进一步…...
工作绩效数据—>工作绩效信息—>工作绩效报告
在软考10大管理知识域, 经常会看到这3个词。 所有10大管理都有工作绩效数据、工作绩效信息,在整合管理、沟通管理中还包括工作绩效报告 这3个词是有先后顺序的, 第一 数据, 第二 信息, 第三 报告。 从5大过程组的角度看…...
AI短剧制作工具源码部署教程,从环境搭建到SAAS多开
温馨提示:文末有资源获取方式随着AI生成技术的快速迭代,短剧创作的门槛正在急剧下降。最近有不少朋友咨询如何搭建一套属于自己的AI短剧创作平台,今天就简单记录一下从环境准备到SAAS多开的完整过程。源码获取方式在源码闪购网。一、环境准备…...
超越向量搜索:三层图结构RAG系统实现多跳推理
1. 项目概述:当传统向量检索遇到瓶颈时在信息检索领域,基于向量相似度的搜索(Vector Search)早已成为处理非结构化数据的标配方案。但从业者们都清楚一个事实:当查询复杂度超过某个阈值时,单纯依赖向量距离…...
FPGA断电程序就丢?手把手教你用Vivado把程序‘焊死’进Flash(以S25FL128为例)
FPGA断电程序丢失?Vivado固化Flash全流程实战(S25FL128为例) 刚接触FPGA开发的工程师常会遇到这样的困惑:明明通过JTAG成功下载了程序,设备运行一切正常,但一旦断电重启,所有配置都消失了。这种…...
SpringMVC5.0
Spring留言板实现预期结果可以发布并显示点击提交后,显示并清除输入框并且再次刷新后,不会清除下面的缓存约定前后端交互接口Ⅰ 发布留言 url : /message/publish . param(参数) : from,to,say . return : true / false .Ⅱ 查询留言 url : /message/get…...
认识 DeerFlow:一个跑在 LangGraph 上的 Super Agent Harness
DeerFlow 给自己的定位不是"又一个 Agent 框架",而是 Super Agent Harness。这个词不是随便用的——它意味着 DeerFlow 要解决的不是"Agent 能不能跑",而是"Agent 能不能跑得住"。它和 Harness Engineering、Agent Team、…...
不会 PS、AI 也能画顶刊插图
做科研的朋友大概都遇见过这种尴尬:实验做了大半年,数据整理得清晰合理,论文逻辑也打磨通顺,偏偏就卡在一张论文插图上。零设计基础不会用专业绘图软件,PS的图层逻辑理不清,通用AI绘图生成的图到处都是专业…...
如何5分钟搞定多游戏模组管理:XXMI启动器的完整解决方案
如何5分钟搞定多游戏模组管理:XXMI启动器的完整解决方案 【免费下载链接】XXMI-Launcher Modding platform for GI, HSR, WW and ZZZ 项目地址: https://gitcode.com/gh_mirrors/xx/XXMI-Launcher 还在为《原神》、《崩坏:星穹铁道》、《绝区零》…...
【VSCode日志调试终极指南】:20年DevOps专家亲授5大高阶技巧,90%开发者从未用过的隐藏功能
更多请点击: https://intelliparadigm.com 第一章:VSCode日志调试的核心价值与演进脉络 在现代前端与全栈开发中,日志调试已从辅助手段跃升为关键诊断范式。VSCode 通过集成终端、调试器与扩展生态,将传统 console.log 的原始输出…...
