flink的AggregateFunction,merge方法作用范围
背景
AggregateFunction接口是我们经常用的窗口聚合函数,其中有一个merge方法,我们一般情况下也是实现了的,但是你知道吗,其实这个方法只有在你使用会话窗口需要进行窗口合并的时候才需要实现
AggregateFunction.merge方法调用时机
AggregateFunction.merge方法其实只有在使用会话窗口进行窗口合并的时候才会用到,如下所示
对应的源码首先查看WindowOperator.processElement方法对要合并的窗口的状态进行合并
public void processElement(StreamRecord<IN> element) throws Exception {final Collection<W> elementWindows =windowAssigner.assignWindows(element.getValue(), element.getTimestamp(), windowAssignerContext);// if element is handled by none of assigned elementWindowsboolean isSkippedElement = true;final K key = this.<K>getKeyedStateBackend().getCurrentKey();if (windowAssigner instanceof MergingWindowAssigner) {MergingWindowSet<W> mergingWindows = getMergingWindowSet();for (W window : elementWindows) {// adding the new window might result in a merge, in that case the actualWindow// is the merged window and we work with that. If we don't merge then// actualWindow == windowW actualWindow =mergingWindows.addWindow(window,new MergingWindowSet.MergeFunction<W>() {@Overridepublic void merge(W mergeResult,Collection<W> mergedWindows,W stateWindowResult,Collection<W> mergedStateWindows)throws Exception {triggerContext.key = key;triggerContext.window = mergeResult;triggerContext.onMerge(mergedWindows);for (W m : mergedWindows) {triggerContext.window = m;triggerContext.clear();deleteCleanupTimer(m);}// 合并窗口的状态windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);}});
继续查看AbstractHeapMergingState.mergeNamespaces方法,
public void mergeNamespaces(N target, Collection<N> sources) throws Exception {if (sources == null || sources.isEmpty()) {return; // nothing to do}final StateTable<K, N, SV> map = stateTable;SV merged = null;// merge the sourcesfor (N source : sources) {// get and remove the next source per namespace/keySV sourceState = map.removeAndGetOld(source);if (merged != null && sourceState != null) {//此处合并状态并调用AggregateFunction.merge方法merged = mergeState(merged, sourceState);} else if (merged == null) {merged = sourceState;}}// merge into the target, if neededif (merged != null) {map.transform(target, merged, mergeTransformation);}
}//真正调用AggregateFunction.merge方法合并自定义的状态
@Override
protected ACC mergeState(ACC a, ACC b) {return aggregateTransformation.aggFunction.merge(a, b);
}
这样AggregateFunction.merge的调用过程就清楚了,实际应用中,我们只需要在使用会话窗口时才需要实现这个方法,其他的基于时间窗口的方式不需要实现这个方法,当然实现了也不会有错
相关文章:

flink的AggregateFunction,merge方法作用范围
背景 AggregateFunction接口是我们经常用的窗口聚合函数,其中有一个merge方法,我们一般情况下也是实现了的,但是你知道吗,其实这个方法只有在你使用会话窗口需要进行窗口合并的时候才需要实现 AggregateFunction.merge方法调用时…...

Day25力扣打卡
打卡记录 寻找旋转排序数组中的最小值(二分) 链接 由于是旋转排序数组,所以整个数组有两部分是递增的,选取右侧最后元素,即可将整个数组分为大于该元素和小于该元素,碰头地段即为最小值。 class Solutio…...

SpringCloud - OpenFeign 参数传递和响应处理(全网最详细)
目录 一、OpenFeign 参数传递和响应处理 1.1、feign 客户端参数传递 1.1.1、零散类型参数传递 1. 例如 querystring 方式传参 2. 例如路径方式传参 1.1.2、对象参数传递 1. 对象参数传递案例 1.1.3、数组参数传递 1. 数组传参案例 1.1.4、集合类型的参数传递…...

Postgresql数据类型-布尔类型
前面介绍了PostgreSQL支持的数字类型、字符类型、时间日期类型,这些数据类型是关系型数据库的常规数据类型,此外PostgreSQL还支持很多非常规数据类型,比如布尔类型、网络地址类型、数组类型、范围类型、json/jsonb类型等,从这一节…...

SPASS-交叉表分析
导入数据 修改变量测量类型 分析->描述统计->交叉表 表中显示行、列变量通过卡方检验给出的独立性检验结果。共使用了三种检验方法。上表各种检验方法显著水平sig.都远远小于0.05,所以有理由拒绝实验准备与评价结果是独立的假设,即认为实验准备这个评价指标是…...

用Python的requests库来模拟爬取地图商铺信息
由于谷歌地图抓取商铺信息涉及到API使用和反爬虫策略,直接爬取可能会遇到限制。但是,我们可以使用Python的requests库来模拟爬取某个网页,然后通过正则表达式或其他文本处理方法来提取商铺信息。以下是一个简单的示例: # 导入requ…...

使用EvoMap/Three.js模拟无人机灯光秀
一、创建地图对象 首先我们需要创建一个EM.Map对象,该对象代表了一个地图实例,并设置id为"map"的文档元素作为地图的容器。 let map new EM.Map("map",{zoom:22.14,center:[8.02528, -29.27638, 0],pitch:71.507,roll:2.01,maxPit…...

11.9存储器实验总结(单ram,双ram,FIFO)
实验设计 单端口RAM实现 双端口RAM实现 FIFO实现 文件结构为...
linux(ubuntu)安装并使用scrcpy
scrcpy 是一款开源的在计算机上显示和控制 Android 设备的工具。要在 Ubuntu 上使用 scrcpy,你可以按照以下步骤进行安装: 通过命令行安装 scrcpy: 安装 scrcpy: 打开终端(Terminal)并执行以下命令来安装…...
linux rsyslog安装配置
syslog是Linux系统默认的日志守护进程。默认的syslog配置文件是/etc/rsyslog.conf文件。syslog守护进程是可配置的,它允许人们为每一种类型的系统信息精确地指定一个存放地点。syslog使用UDP 514/TCP 514端口。 1.环境信息 环境信息 HostnameIpAddressOS versionModuleNoterh…...

美国Embarcadero公司正式发布2023 RAD Studio Delphi C++ Builder 12 Athens
Embarcadero 非常高兴地宣布发布 RAD Studio 12 Athens 以及 Delphi 12 和 CBuilder 12。RAD Studio 12 Athens 版本包含令人兴奋的新功能,为该产品的未来奠定了基础。 目录 主要新功能 C 的奇妙之处Delphi 的一些不错的补充FireMonkey 和 Skia 作为新基金会采用 MD…...

树莓派4B的测试记录(CPU、FFMPEG)
本文是用来记录树莓派 4B 的一些测试记录。 温度 下面记录中的风扇和大风扇是这样的: 为什么要用大风扇呢?因为小风扇在外壳上,气流通过外壳的珊格会有啸叫,声音不大但是很烦人,大风扇没这个问题,并且同样…...

物联网AI MicroPython学习之语法 二进制与ASCII转换
学物联网,来万物简单IoT物联网!! ubinascii 介绍 ubinascii模块实现了二进制数据与各种ASCII编码之间的转换。 接口说明 a2b_base64 - 解码base64编码的数据 函数原型:ubinascii.a2b_base64(data)注意事项: 在解码…...

学之思项目的搭建部署 打jar包失败的解决方法
学之思系统介绍部署java环境安装maven安装node.js前端打包工具命令npmGit命令获取源代码安装配置mysql前端打包打包jar包服务上线!!!打jar包失败的解决方法 学之思系统介绍 学之思开源考试系统是一款 java vue 的前后端不分离的考试系统。主要优点是开发、部署简单快捷、界面…...

[100天算法】-定长子串中元音的最大数目(day 67)
题目描述 给你字符串 s 和整数 k 。请返回字符串 s 中长度为 k 的单个子字符串中可能包含的最大元音字母数。英文中的 元音字母 为(a, e, i, o, u)。示例 1:输入:s "abciiidef", k 3 输出:3 解释…...

Elastic Observability 8.11:ES|QL、APM 中的通用分析和增强的 SLOs
作者:Tom Grabowski, Katrin Freihofner, Israel Ogbole Elastic Observability 8.11 引入了 ES|QL for Observability(技术预览版)、Universal ProfilingTM 和 Elastic APM 集成,以及针对 Elastic Observability 的新 SLO &#…...

TexGen简单模型对应inp文件简单梳理-2
模型 默认最简单的编织复材,编辑材料属性时发现基体属性设置正常,各向同性材料,但是纱线的材料属性却没有弹性性能的设置。 导出inp文件后,导入ABAQUS中其实可以看到有两种材料,纱线也是有属性的。 ABAQUS中修改属性的…...

VUE获取当前日期的周日和周六
<template><div><div click"handleLast()">上一周</div><div click"handleNext()">下一周</div><el-calendarref"monChild"v-model"value":first-day-of-week"7":range"[sta…...
K8S篇之k8s containerd模式fail to pull image certificate signed by unknown authority
"k8s containerd模式fail to pull image certificate signed by unknown authority"的问题 解决方案:您有两个选择:配置证书或禁用证书验证。 配置证书:您可以为 containerd 配置证书,使其信任由未知机构签名的证书。 具…...

算法进阶指南图论 最优贸易
最优贸易 题目描述 C C C 国有 n n n 个大城市和 m m m 条道路,每条道路连接这 n n n 个城市中的某两个城市。任意两个城市之间最多只有一条道路直接相连。这 m m m 条道路中有一部分为单向通行的道路,一部分为双向通行的道路,双向通行的…...
浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)
✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义(Task Definition&…...

AI Agent与Agentic AI:原理、应用、挑战与未来展望
文章目录 一、引言二、AI Agent与Agentic AI的兴起2.1 技术契机与生态成熟2.2 Agent的定义与特征2.3 Agent的发展历程 三、AI Agent的核心技术栈解密3.1 感知模块代码示例:使用Python和OpenCV进行图像识别 3.2 认知与决策模块代码示例:使用OpenAI GPT-3进…...
DockerHub与私有镜像仓库在容器化中的应用与管理
哈喽,大家好,我是左手python! Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库,用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...

《从零掌握MIPI CSI-2: 协议精解与FPGA摄像头开发实战》-- CSI-2 协议详细解析 (一)
CSI-2 协议详细解析 (一) 1. CSI-2层定义(CSI-2 Layer Definitions) 分层结构 :CSI-2协议分为6层: 物理层(PHY Layer) : 定义电气特性、时钟机制和传输介质(导线&#…...

高频面试之3Zookeeper
高频面试之3Zookeeper 文章目录 高频面试之3Zookeeper3.1 常用命令3.2 选举机制3.3 Zookeeper符合法则中哪两个?3.4 Zookeeper脑裂3.5 Zookeeper用来干嘛了 3.1 常用命令 ls、get、create、delete、deleteall3.2 选举机制 半数机制(过半机制࿰…...
在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module
1、为什么要修改 CONNECT 报文? 多租户隔离:自动为接入设备追加租户前缀,后端按 ClientID 拆分队列。零代码鉴权:将入站用户名替换为 OAuth Access-Token,后端 Broker 统一校验。灰度发布:根据 IP/地理位写…...

SpringBoot+uniapp 的 Champion 俱乐部微信小程序设计与实现,论文初版实现
摘要 本论文旨在设计并实现基于 SpringBoot 和 uniapp 的 Champion 俱乐部微信小程序,以满足俱乐部线上活动推广、会员管理、社交互动等需求。通过 SpringBoot 搭建后端服务,提供稳定高效的数据处理与业务逻辑支持;利用 uniapp 实现跨平台前…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...

什么是Ansible Jinja2
理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具,可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板,允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板,并通…...

DingDing机器人群消息推送
文章目录 1 新建机器人2 API文档说明3 代码编写 1 新建机器人 点击群设置 下滑到群管理的机器人,点击进入 添加机器人 选择自定义Webhook服务 点击添加 设置安全设置,详见说明文档 成功后,记录Webhook 2 API文档说明 点击设置说明 查看自…...