从ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger代码看如何实现一个自定义的触发器
背景
当我们想要实现提前触发计算的触发器时,我们可以使用ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger作为触发器达到比如几分钟触发一次计算并发送计算结果的类,我们本文就从代码角度解析下实现自定义触发器的一些注意事项
ContinuousEventTimeTrigger源码解析
@PublicEvolving
public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object, W> {private static final long serialVersionUID = 1L;private final long interval;/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */private final ReducingStateDescriptor<Long> stateDesc =new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);private ContinuousEventTimeTrigger(long interval) {this.interval = interval;}@Overridepublic TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)throws Exception {if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {// if the watermark is already past the window fire immediately// 这里只需要fire触发计算,为什么不是FIRE_AND_PURGE?return TriggerResult.FIRE;} else {// 这里注册一个结束窗口的计时器是否必要?ctx.registerEventTimeTimer(window.maxTimestamp());}ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);if (fireTimestamp.get() == null) {long start = timestamp - (timestamp % interval);long nextFireTimestamp = start + interval;ctx.registerEventTimeTimer(nextFireTimestamp);fireTimestamp.add(nextFireTimestamp);}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {//为什么是FIRE而不是FIRE_AND_PURGEif (time == window.maxTimestamp()) {return TriggerResult.FIRE;}ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);Long fireTimestamp = fireTimestampState.get();if (fireTimestamp != null && fireTimestamp == time) {fireTimestampState.clear();fireTimestampState.add(time + interval);ctx.registerEventTimeTimer(time + interval);return TriggerResult.FIRE;}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)throws Exception {return TriggerResult.CONTINUE;}@Overridepublic void clear(W window, TriggerContext ctx) throws Exception {// 清除计时器ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);Long timestamp = fireTimestamp.get();if (timestamp != null) {ctx.deleteEventTimeTimer(timestamp);fireTimestamp.clear();}}@Overridepublic boolean canMerge() {return true;}@Overridepublic void onMerge(W window, OnMergeContext ctx) throws Exception {ctx.mergePartitionedState(stateDesc);Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get();if (nextFireTimestamp != null) {ctx.registerEventTimeTimer(nextFireTimestamp);}}@Overridepublic String toString() {return "ContinuousEventTimeTrigger(" + interval + ")";}@VisibleForTestingpublic long getInterval() {return interval;}/*** Creates a trigger that continuously fires based on the given interval.** @param interval The time interval at which to fire.* @param <W> The type of {@link Window Windows} on which this trigger can operate.*/public static <W extends Window> ContinuousEventTimeTrigger<W> of(Time interval) {return new ContinuousEventTimeTrigger<>(interval.toMilliseconds());}private static class Min implements ReduceFunction<Long> {private static final long serialVersionUID = 1L;@Overridepublic Long reduce(Long value1, Long value2) throws Exception {return Math.min(value1, value2);}}
}
ContinuousProcessingTimeTrigger源码
@PublicEvolving
public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<Object, W> {private static final long serialVersionUID = 1L;private final long interval;/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */private final ReducingStateDescriptor<Long> stateDesc =new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);private ContinuousProcessingTimeTrigger(long interval) {this.interval = interval;}@Overridepublic TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)throws Exception {ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);timestamp = ctx.getCurrentProcessingTime();// 注册计时器,为什么这里不需要类似ContinuousEventTimeTrigger一样注册一个窗口结束时间的计时器?if (fireTimestamp.get() == null) {long start = timestamp - (timestamp % interval);long nextFireTimestamp = start + interval;ctx.registerProcessingTimeTimer(nextFireTimestamp);fireTimestamp.add(nextFireTimestamp);return TriggerResult.CONTINUE;}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)throws Exception {ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);if (fireTimestamp.get().equals(time)) {fireTimestamp.clear();fireTimestamp.add(time + interval);ctx.registerProcessingTimeTimer(time + interval);return TriggerResult.FIRE;}//为什么这里没有FIRE_AND_PURGE?状态是何时清理的?return TriggerResult.CONTINUE;}@Overridepublic void clear(W window, TriggerContext ctx) throws Exception {//清除计时器// State could be merged into new window.ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);Long timestamp = fireTimestamp.get();if (timestamp != null) {ctx.deleteProcessingTimeTimer(timestamp);fireTimestamp.clear();}}@Overridepublic boolean canMerge() {return true;}@Overridepublic void onMerge(W window, OnMergeContext ctx) throws Exception {// States for old windows will lose after the call.ctx.mergePartitionedState(stateDesc);// Register timer for this new window.Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get();if (nextFireTimestamp != null) {ctx.registerProcessingTimeTimer(nextFireTimestamp);}}@VisibleForTestingpublic long getInterval() {return interval;}@Overridepublic String toString() {return "ContinuousProcessingTimeTrigger(" + interval + ")";}/*** Creates a trigger that continuously fires based on the given interval.** @param interval The time interval at which to fire.* @param <W> The type of {@link Window Windows} on which this trigger can operate.*/public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Time interval) {return new ContinuousProcessingTimeTrigger<>(interval.toMilliseconds());}private static class Min implements ReduceFunction<Long> {private static final long serialVersionUID = 1L;@Overridepublic Long reduce(Long value1, Long value2) throws Exception {return Math.min(value1, value2);}}
}
疑问:
1.为什么ContinuousEventTimeTrigger需要注册一个窗口结束时间的计时器而ContinuousProcessingTimeTrigger不注册?
答案: 其实我们需要看下它注册后的目的作用是什么,ContinuousEventTimeTrigger的ontimer在处理窗口结束的触发器时会返回FIRE触发计算,那问题就来了,如果只是触发计算,那么如果没有,那么仅仅只是窗口结束的时候没有触发一次计算而已。所以这里不是必须的
2.为什么ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger返回的结果是FIRE而不是FIRE_AND_PURGE?状态是什么时候清理的?
答案: 首先要明确状态的清理这个逻辑,状态其实包括窗口的状态,触发器的状态等,返回FIRE仅仅是触发计算,而不会清理任何状态,而假设返回FIRE_AND_PURGE的作用是触发计算,并进行窗口状态的清理(注意这里是不包括触发器的状态的清理的),其实状态的清理是由WindowOperator在清理时间到时进行的(对于事件时间是窗口结束时间+迟到容忍间隔时间,对于处理时间是窗口结束时间),所以不必要在窗口结束时间到的时候返回FIRE_AND_PURGE,可以统一由WindowOperator在清理时间到之后统一清理状态
相关文章:
从ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger代码看如何实现一个自定义的触发器
背景 当我们想要实现提前触发计算的触发器时,我们可以使用ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger作为触发器达到比如几分钟触发一次计算并发送计算结果的类,我们本文就从代码角度解析下实现自定义触发器的一些注意事项 Continuo…...
Linux 5种网络模型
[参考]:《黑马程序员Redis》https://www.bilibili.com/video/BV1cr4y1671t/?p166&share_sourcecopy_web&vd_source9e65300ccca322aeb367bb1eb677b0fc [参考]:《操作系统》 [参考]:《UNIX网络编程》 为了避免用户应用导致冲突甚至内…...
10.1 调试事件读取寄存器
当读者需要获取到特定进程内的寄存器信息时,则需要在上述代码中进行完善,首先需要编写CREATE_PROCESS_DEBUG_EVENT事件,程序被首次加载进入内存时会被触发此事件,在该事件内首先我们通过lpStartAddress属性获取到当前程序的入口地…...
Linux系统常用指令篇---(一)
Linux系统常用指令篇—(一) 1.cd指令 Linux系统中,磁盘上的文件和目录被组成一棵目录树,每个节点都是目录或文件。 语法:cd 目录名 功能:改变工作目录。将当前工作目录改变到指定的目录下。 (简单理解为进入指定目录下) 举例: cd .. : 返…...
【初识Linux】:常见指令(1)
朋友们、伙计们,我们又见面了,本期来给大家解读一下有关Linux的基础知识点,如果看完之后对你有一定的启发,那么请留下你的三连,祝大家心想事成! C 语 言 专 栏:C语言:从入门到精通 数…...
STM32复习笔记(四):看门狗
目录 (一)简介 (二)IWDG IWDG的CUBEMX工程配置 IWDG相关函数(非常少,所以直接贴上来): (三)WWDG (一)简介 看门狗分为独立看门…...
【C++进阶(七)】仿函数深度剖析模板进阶讲解
💓博主CSDN主页:杭电码农-NEO💓 ⏩专栏分类:C从入门到精通⏪ 🚚代码仓库:NEO的学习日记🚚 🌹关注我🫵带你学习C 🔝🔝 模板进阶 1. 前言2. 仿函数的概念3. 仿函数的实…...
基于SSM的电动车上牌管理系统(有报告)。Javaee项目。
演示视频: 基于SSM的电动车上牌管理系统(有报告)。Javaee项目。 项目介绍: 采用M(model)V(view)C(controller)三层体系结构,通过Spring SpringM…...
mstsc无法保存RDP凭据, 100%生效
问题 即使如下两项都打勾,其还是无法保存凭据,特别是连接Ubuntu (freerdp server): 解决方法 网上多种复杂方法,不生效,其思路是修改后台配置,以使mstsc跟平常一样自动记住凭据。最后,如下的…...
OpenGLES:绘制一个混色旋转的3D球体
效果展示 本篇博文会实现一个混色旋转的3D球体 一.球体解析 前面几篇博文讲解了如何使用OpenGLES实现不同的3D图形 本篇博文讲解怎样实现3D世界的代表图形:一个混色旋转的3D球体 1.1 极限正多面体 如果有学习过我前几篇3D图形绘制的博文,就知道要想…...
Spring AOP 基于注解源码整理
导入配置类 EnableAspectJAutoProxy 注解导入 AspectJAutoProxyRegistrarImportBeanDefinitionRegistrar#registerBeanDefinitions向容器中加入AnnotationAwareAspectJAutoProxyCreatorAnnotationAwareAspectJAutoProxyCreator#initBeanFactory初始化ReflectiveAspectJAdvisor…...
C语言 —— 函数栈帧的创建和销毁
在我们之前学习函数的时候,我们可能有很多困惑? 比如: 局部变量是怎么创建的?为什么局部变量的值是随机值?函数是怎么传参的?传参的顺序是怎样的?形参和实参是什么关系?函数调用是怎么做的?函数调用是结束后怎么返回的? 那么要解决这些问题, 我们就需要知道…...
Appleid苹果账号自动解锁改密(自动解锁二验改密码)
目前该项目能实现以下功能: 多用户使用,权限控制多账号管理账号分享页,支持设置密码、有效期、自定义HTML内容自动解锁与关闭二步验证自动/定时修改密码自动删除Apple ID中的设备代理池与Selenium集群,提高解锁成功率允许手动触发…...
Conflicting peer dependency: eslint@8.50.0
npm install 输出 npm ERR! code ERESOLVE npm ERR! ERESOLVE could not resolve npm ERR! npm ERR! While resolving: vue/eslint-config-standard6.1.0 npm ERR! Found: eslint-plugin-vue8.7.1 npm ERR! node_modules/eslint-plugin-vue npm ERR! dev eslint-plugin-vue…...
Vue3 defineProps使用
MyTag.vue <script setup> import { ref, nextTick, defineProps, defineEmits } from "vue"; const props defineProps({flag: Boolean,title: String, }); // 写成这样也可以 // const props defineProps(["flag", "title"]);const e…...
机器学习7:逻辑回归
一、说明 逻辑回归模型是处理分类问题的最常见机器学习模型之一。二项式逻辑回归只是逻辑回归模型的一种类型。它指的是两个变量的分类,其中概率用于确定二元结果,因此“二项式”中的“bi”。结果为真或假 — 0 或 1。 二项式逻辑回归的一个例子是预测人…...
生活小记-纸张尺寸
A系列纸张: A0:841 x 1189 毫米A1:594 x 841 毫米A2:420 x 594 毫米A3:297 x 420 毫米A4:210 x 297 毫米A5:148 x 210 毫米A6:105 x 148 毫米A7:74 x 105 毫米A8…...
【MATLAB源码-第41期】基于压缩感知算法的OFDM系统信道估计和LS算法对比仿真。
操作环境: MATLAB 2013b 1、算法描述 压缩感知(Compressed Sensing, CS)是一种从稀疏或可压缩信号中重构完整信号的数学理论和技术。下面详细介绍压缩感知和它在OFDM信道估计中的应用。 1. 压缩感知基本概念 在传统采样理论中࿰…...
优思学院|六西格玛将烹饪和美味提升至极致
最近,我们曾提到一个美国男子如何利用六西格玛来控制糖尿病。这表明六西格玛逐渐被认为是一个不仅可以在工作场所之外使用,尤其不仅限于制造业的系统。 六西格玛的核心理念是改进过程的质量,从而改善最终结果。如果你做了晚餐或尝试了一道新…...
git stash
git stash 是 Git 中一个非常有用的命令,用于临时保存当前工作目录中的修改,以便你可以切换到其他分支或处理其他任务而不丢失你的修改。它的主要用途是: 保存未提交的修改:你可以使用 git stash 命令将未提交的修改(包…...
C++_核心编程_多态案例二-制作饮品
#include <iostream> #include <string> using namespace std;/*制作饮品的大致流程为:煮水 - 冲泡 - 倒入杯中 - 加入辅料 利用多态技术实现本案例,提供抽象制作饮品基类,提供子类制作咖啡和茶叶*//*基类*/ class AbstractDr…...
Java 语言特性(面试系列1)
一、面向对象编程 1. 封装(Encapsulation) 定义:将数据(属性)和操作数据的方法绑定在一起,通过访问控制符(private、protected、public)隐藏内部实现细节。示例: public …...
R语言AI模型部署方案:精准离线运行详解
R语言AI模型部署方案:精准离线运行详解 一、项目概述 本文将构建一个完整的R语言AI部署解决方案,实现鸢尾花分类模型的训练、保存、离线部署和预测功能。核心特点: 100%离线运行能力自包含环境依赖生产级错误处理跨平台兼容性模型版本管理# 文件结构说明 Iris_AI_Deployme…...
生成 Git SSH 证书
🔑 1. 生成 SSH 密钥对 在终端(Windows 使用 Git Bash,Mac/Linux 使用 Terminal)执行命令: ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" 参数说明: -t rsa&#x…...
Spring AI 入门:Java 开发者的生成式 AI 实践之路
一、Spring AI 简介 在人工智能技术快速迭代的今天,Spring AI 作为 Spring 生态系统的新生力量,正在成为 Java 开发者拥抱生成式 AI 的最佳选择。该框架通过模块化设计实现了与主流 AI 服务(如 OpenAI、Anthropic)的无缝对接&…...
关于 WASM:1. WASM 基础原理
一、WASM 简介 1.1 WebAssembly 是什么? WebAssembly(WASM) 是一种能在现代浏览器中高效运行的二进制指令格式,它不是传统的编程语言,而是一种 低级字节码格式,可由高级语言(如 C、C、Rust&am…...
【开发技术】.Net使用FFmpeg视频特定帧上绘制内容
目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法,当前调用一个医疗行业的AI识别算法后返回…...
AI,如何重构理解、匹配与决策?
AI 时代,我们如何理解消费? 作者|王彬 封面|Unplash 人们通过信息理解世界。 曾几何时,PC 与移动互联网重塑了人们的购物路径:信息变得唾手可得,商品决策变得高度依赖内容。 但 AI 时代的来…...
云原生玩法三问:构建自定义开发环境
云原生玩法三问:构建自定义开发环境 引言 临时运维一个古董项目,无文档,无环境,无交接人,俗称三无。 运行设备的环境老,本地环境版本高,ssh不过去。正好最近对 腾讯出品的云原生 cnb 感兴趣&…...
LangChain知识库管理后端接口:数据库操作详解—— 构建本地知识库系统的基础《二》
这段 Python 代码是一个完整的 知识库数据库操作模块,用于对本地知识库系统中的知识库进行增删改查(CRUD)操作。它基于 SQLAlchemy ORM 框架 和一个自定义的装饰器 with_session 实现数据库会话管理。 📘 一、整体功能概述 该模块…...
