当前位置: 首页 > news >正文

从ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger代码看如何实现一个自定义的触发器

背景

当我们想要实现提前触发计算的触发器时,我们可以使用ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger作为触发器达到比如几分钟触发一次计算并发送计算结果的类,我们本文就从代码角度解析下实现自定义触发器的一些注意事项

ContinuousEventTimeTrigger源码解析

@PublicEvolving
public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object, W> {private static final long serialVersionUID = 1L;private final long interval;/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */private final ReducingStateDescriptor<Long> stateDesc =new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);private ContinuousEventTimeTrigger(long interval) {this.interval = interval;}@Overridepublic TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)throws Exception {if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {// if the watermark is already past the window fire immediately// 这里只需要fire触发计算,为什么不是FIRE_AND_PURGE?return TriggerResult.FIRE;} else {// 这里注册一个结束窗口的计时器是否必要?ctx.registerEventTimeTimer(window.maxTimestamp());}ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);if (fireTimestamp.get() == null) {long start = timestamp - (timestamp % interval);long nextFireTimestamp = start + interval;ctx.registerEventTimeTimer(nextFireTimestamp);fireTimestamp.add(nextFireTimestamp);}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {//为什么是FIRE而不是FIRE_AND_PURGEif (time == window.maxTimestamp()) {return TriggerResult.FIRE;}ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);Long fireTimestamp = fireTimestampState.get();if (fireTimestamp != null && fireTimestamp == time) {fireTimestampState.clear();fireTimestampState.add(time + interval);ctx.registerEventTimeTimer(time + interval);return TriggerResult.FIRE;}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)throws Exception {return TriggerResult.CONTINUE;}@Overridepublic void clear(W window, TriggerContext ctx) throws Exception {// 清除计时器ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);Long timestamp = fireTimestamp.get();if (timestamp != null) {ctx.deleteEventTimeTimer(timestamp);fireTimestamp.clear();}}@Overridepublic boolean canMerge() {return true;}@Overridepublic void onMerge(W window, OnMergeContext ctx) throws Exception {ctx.mergePartitionedState(stateDesc);Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get();if (nextFireTimestamp != null) {ctx.registerEventTimeTimer(nextFireTimestamp);}}@Overridepublic String toString() {return "ContinuousEventTimeTrigger(" + interval + ")";}@VisibleForTestingpublic long getInterval() {return interval;}/*** Creates a trigger that continuously fires based on the given interval.** @param interval The time interval at which to fire.* @param <W> The type of {@link Window Windows} on which this trigger can operate.*/public static <W extends Window> ContinuousEventTimeTrigger<W> of(Time interval) {return new ContinuousEventTimeTrigger<>(interval.toMilliseconds());}private static class Min implements ReduceFunction<Long> {private static final long serialVersionUID = 1L;@Overridepublic Long reduce(Long value1, Long value2) throws Exception {return Math.min(value1, value2);}}
}

ContinuousProcessingTimeTrigger源码

@PublicEvolving
public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<Object, W> {private static final long serialVersionUID = 1L;private final long interval;/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */private final ReducingStateDescriptor<Long> stateDesc =new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);private ContinuousProcessingTimeTrigger(long interval) {this.interval = interval;}@Overridepublic TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)throws Exception {ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);timestamp = ctx.getCurrentProcessingTime();// 注册计时器,为什么这里不需要类似ContinuousEventTimeTrigger一样注册一个窗口结束时间的计时器?if (fireTimestamp.get() == null) {long start = timestamp - (timestamp % interval);long nextFireTimestamp = start + interval;ctx.registerProcessingTimeTimer(nextFireTimestamp);fireTimestamp.add(nextFireTimestamp);return TriggerResult.CONTINUE;}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)throws Exception {ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);if (fireTimestamp.get().equals(time)) {fireTimestamp.clear();fireTimestamp.add(time + interval);ctx.registerProcessingTimeTimer(time + interval);return TriggerResult.FIRE;}//为什么这里没有FIRE_AND_PURGE?状态是何时清理的?return TriggerResult.CONTINUE;}@Overridepublic void clear(W window, TriggerContext ctx) throws Exception {//清除计时器// State could be merged into new window.ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);Long timestamp = fireTimestamp.get();if (timestamp != null) {ctx.deleteProcessingTimeTimer(timestamp);fireTimestamp.clear();}}@Overridepublic boolean canMerge() {return true;}@Overridepublic void onMerge(W window, OnMergeContext ctx) throws Exception {// States for old windows will lose after the call.ctx.mergePartitionedState(stateDesc);// Register timer for this new window.Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get();if (nextFireTimestamp != null) {ctx.registerProcessingTimeTimer(nextFireTimestamp);}}@VisibleForTestingpublic long getInterval() {return interval;}@Overridepublic String toString() {return "ContinuousProcessingTimeTrigger(" + interval + ")";}/*** Creates a trigger that continuously fires based on the given interval.** @param interval The time interval at which to fire.* @param <W> The type of {@link Window Windows} on which this trigger can operate.*/public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Time interval) {return new ContinuousProcessingTimeTrigger<>(interval.toMilliseconds());}private static class Min implements ReduceFunction<Long> {private static final long serialVersionUID = 1L;@Overridepublic Long reduce(Long value1, Long value2) throws Exception {return Math.min(value1, value2);}}
}

疑问:

1.为什么ContinuousEventTimeTrigger需要注册一个窗口结束时间的计时器而ContinuousProcessingTimeTrigger不注册?
答案: 其实我们需要看下它注册后的目的作用是什么,ContinuousEventTimeTrigger的ontimer在处理窗口结束的触发器时会返回FIRE触发计算,那问题就来了,如果只是触发计算,那么如果没有,那么仅仅只是窗口结束的时候没有触发一次计算而已。所以这里不是必须的
2.为什么ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger返回的结果是FIRE而不是FIRE_AND_PURGE?状态是什么时候清理的?
答案: 首先要明确状态的清理这个逻辑,状态其实包括窗口的状态,触发器的状态等,返回FIRE仅仅是触发计算,而不会清理任何状态,而假设返回FIRE_AND_PURGE的作用是触发计算,并进行窗口状态的清理(注意这里是不包括触发器的状态的清理的),其实状态的清理是由WindowOperator在清理时间到时进行的(对于事件时间是窗口结束时间+迟到容忍间隔时间,对于处理时间是窗口结束时间),所以不必要在窗口结束时间到的时候返回FIRE_AND_PURGE,可以统一由WindowOperator在清理时间到之后统一清理状态

相关文章:

从ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger代码看如何实现一个自定义的触发器

背景 当我们想要实现提前触发计算的触发器时&#xff0c;我们可以使用ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger作为触发器达到比如几分钟触发一次计算并发送计算结果的类&#xff0c;我们本文就从代码角度解析下实现自定义触发器的一些注意事项 Continuo…...

Linux 5种网络模型

[参考]&#xff1a;《黑马程序员Redis》https://www.bilibili.com/video/BV1cr4y1671t/?p166&share_sourcecopy_web&vd_source9e65300ccca322aeb367bb1eb677b0fc [参考]&#xff1a;《操作系统》 [参考]&#xff1a;《UNIX网络编程》 为了避免用户应用导致冲突甚至内…...

10.1 调试事件读取寄存器

当读者需要获取到特定进程内的寄存器信息时&#xff0c;则需要在上述代码中进行完善&#xff0c;首先需要编写CREATE_PROCESS_DEBUG_EVENT事件&#xff0c;程序被首次加载进入内存时会被触发此事件&#xff0c;在该事件内首先我们通过lpStartAddress属性获取到当前程序的入口地…...

Linux系统常用指令篇---(一)

Linux系统常用指令篇—(一) 1.cd指令 Linux系统中&#xff0c;磁盘上的文件和目录被组成一棵目录树&#xff0c;每个节点都是目录或文件。 语法:cd 目录名 功能&#xff1a;改变工作目录。将当前工作目录改变到指定的目录下。 (简单理解为进入指定目录下) 举例: cd .. : 返…...

【初识Linux】:常见指令(1)

朋友们、伙计们&#xff0c;我们又见面了&#xff0c;本期来给大家解读一下有关Linux的基础知识点&#xff0c;如果看完之后对你有一定的启发&#xff0c;那么请留下你的三连&#xff0c;祝大家心想事成&#xff01; C 语 言 专 栏&#xff1a;C语言&#xff1a;从入门到精通 数…...

STM32复习笔记(四):看门狗

目录 &#xff08;一&#xff09;简介 &#xff08;二&#xff09;IWDG IWDG的CUBEMX工程配置 IWDG相关函数&#xff08;非常少&#xff0c;所以直接贴上来&#xff09;&#xff1a; &#xff08;三&#xff09;WWDG &#xff08;一&#xff09;简介 看门狗分为独立看门…...

【C++进阶(七)】仿函数深度剖析模板进阶讲解

&#x1f493;博主CSDN主页:杭电码农-NEO&#x1f493;   ⏩专栏分类:C从入门到精通⏪   &#x1f69a;代码仓库:NEO的学习日记&#x1f69a;   &#x1f339;关注我&#x1faf5;带你学习C   &#x1f51d;&#x1f51d; 模板进阶 1. 前言2. 仿函数的概念3. 仿函数的实…...

基于SSM的电动车上牌管理系统(有报告)。Javaee项目。

演示视频&#xff1a; 基于SSM的电动车上牌管理系统&#xff08;有报告&#xff09;。Javaee项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构&#xff0c;通过Spring SpringM…...

mstsc无法保存RDP凭据, 100%生效

问题 即使如下两项都打勾&#xff0c;其还是无法保存凭据&#xff0c;特别是连接Ubuntu (freerdp server)&#xff1a; 解决方法 网上多种复杂方法&#xff0c;不生效&#xff0c;其思路是修改后台配置&#xff0c;以使mstsc跟平常一样自动记住凭据。最后&#xff0c;如下的…...

OpenGLES:绘制一个混色旋转的3D球体

效果展示 本篇博文会实现一个混色旋转的3D球体 一.球体解析 前面几篇博文讲解了如何使用OpenGLES实现不同的3D图形 本篇博文讲解怎样实现3D世界的代表图形&#xff1a;一个混色旋转的3D球体 1.1 极限正多面体 如果有学习过我前几篇3D图形绘制的博文&#xff0c;就知道要想…...

Spring AOP 基于注解源码整理

导入配置类 EnableAspectJAutoProxy 注解导入 AspectJAutoProxyRegistrarImportBeanDefinitionRegistrar#registerBeanDefinitions向容器中加入AnnotationAwareAspectJAutoProxyCreatorAnnotationAwareAspectJAutoProxyCreator#initBeanFactory初始化ReflectiveAspectJAdvisor…...

C语言 —— 函数栈帧的创建和销毁

在我们之前学习函数的时候&#xff0c;我们可能有很多困惑? 比如: 局部变量是怎么创建的?为什么局部变量的值是随机值?函数是怎么传参的?传参的顺序是怎样的?形参和实参是什么关系?函数调用是怎么做的?函数调用是结束后怎么返回的? 那么要解决这些问题, 我们就需要知道…...

Appleid苹果账号自动解锁改密(自动解锁二验改密码)

目前该项目能实现以下功能&#xff1a; 多用户使用&#xff0c;权限控制多账号管理账号分享页&#xff0c;支持设置密码、有效期、自定义HTML内容自动解锁与关闭二步验证自动/定时修改密码自动删除Apple ID中的设备代理池与Selenium集群&#xff0c;提高解锁成功率允许手动触发…...

Conflicting peer dependency: eslint@8.50.0

npm install 输出 npm ERR! code ERESOLVE npm ERR! ERESOLVE could not resolve npm ERR! npm ERR! While resolving: vue/eslint-config-standard6.1.0 npm ERR! Found: eslint-plugin-vue8.7.1 npm ERR! node_modules/eslint-plugin-vue npm ERR! dev eslint-plugin-vue…...

Vue3 defineProps使用

MyTag.vue <script setup> import { ref, nextTick, defineProps, defineEmits } from "vue"; const props defineProps({flag: Boolean,title: String, }); // 写成这样也可以 // const props defineProps(["flag", "title"]);const e…...

机器学习7:逻辑回归

一、说明 逻辑回归模型是处理分类问题的最常见机器学习模型之一。二项式逻辑回归只是逻辑回归模型的一种类型。它指的是两个变量的分类&#xff0c;其中概率用于确定二元结果&#xff0c;因此“二项式”中的“bi”。结果为真或假 — 0 或 1。 二项式逻辑回归的一个例子是预测人…...

生活小记-纸张尺寸

A系列纸张&#xff1a; A0&#xff1a;841 x 1189 毫米A1&#xff1a;594 x 841 毫米A2&#xff1a;420 x 594 毫米A3&#xff1a;297 x 420 毫米A4&#xff1a;210 x 297 毫米A5&#xff1a;148 x 210 毫米A6&#xff1a;105 x 148 毫米A7&#xff1a;74 x 105 毫米A8&#xf…...

【MATLAB源码-第41期】基于压缩感知算法的OFDM系统信道估计和LS算法对比仿真。

操作环境&#xff1a; MATLAB 2013b 1、算法描述 压缩感知&#xff08;Compressed Sensing, CS&#xff09;是一种从稀疏或可压缩信号中重构完整信号的数学理论和技术。下面详细介绍压缩感知和它在OFDM信道估计中的应用。 1. 压缩感知基本概念 在传统采样理论中&#xff0…...

优思学院|六西格玛将烹饪和美味提升至极致

最近&#xff0c;我们曾提到一个美国男子如何利用六西格玛来控制糖尿病。这表明六西格玛逐渐被认为是一个不仅可以在工作场所之外使用&#xff0c;尤其不仅限于制造业的系统。 六西格玛的核心理念是改进过程的质量&#xff0c;从而改善最终结果。如果你做了晚餐或尝试了一道新…...

git stash

git stash 是 Git 中一个非常有用的命令&#xff0c;用于临时保存当前工作目录中的修改&#xff0c;以便你可以切换到其他分支或处理其他任务而不丢失你的修改。它的主要用途是&#xff1a; 保存未提交的修改&#xff1a;你可以使用 git stash 命令将未提交的修改&#xff08;包…...

FFmpeg 低延迟同屏方案

引言 在实时互动需求激增的当下&#xff0c;无论是在线教育中的师生同屏演示、远程办公的屏幕共享协作&#xff0c;还是游戏直播的画面实时传输&#xff0c;低延迟同屏已成为保障用户体验的核心指标。FFmpeg 作为一款功能强大的多媒体框架&#xff0c;凭借其灵活的编解码、数据…...

12.找到字符串中所有字母异位词

&#x1f9e0; 题目解析 题目描述&#xff1a; 给定两个字符串 s 和 p&#xff0c;找出 s 中所有 p 的字母异位词的起始索引。 返回的答案以数组形式表示。 字母异位词定义&#xff1a; 若两个字符串包含的字符种类和出现次数完全相同&#xff0c;顺序无所谓&#xff0c;则互为…...

【python异步多线程】异步多线程爬虫代码示例

claude生成的python多线程、异步代码示例&#xff0c;模拟20个网页的爬取&#xff0c;每个网页假设要0.5-2秒完成。 代码 Python多线程爬虫教程 核心概念 多线程&#xff1a;允许程序同时执行多个任务&#xff0c;提高IO密集型任务&#xff08;如网络请求&#xff09;的效率…...

【HTTP三个基础问题】

面试官您好&#xff01;HTTP是超文本传输协议&#xff0c;是互联网上客户端和服务器之间传输超文本数据&#xff08;比如文字、图片、音频、视频等&#xff09;的核心协议&#xff0c;当前互联网应用最广泛的版本是HTTP1.1&#xff0c;它基于经典的C/S模型&#xff0c;也就是客…...

JavaScript 数据类型详解

JavaScript 数据类型详解 JavaScript 数据类型分为 原始类型&#xff08;Primitive&#xff09; 和 对象类型&#xff08;Object&#xff09; 两大类&#xff0c;共 8 种&#xff08;ES11&#xff09;&#xff1a; 一、原始类型&#xff08;7种&#xff09; 1. undefined 定…...

Qemu arm操作系统开发环境

使用qemu虚拟arm硬件比较合适。 步骤如下&#xff1a; 安装qemu apt install qemu-system安装aarch64-none-elf-gcc 需要手动下载&#xff0c;下载地址&#xff1a;https://developer.arm.com/-/media/Files/downloads/gnu/13.2.rel1/binrel/arm-gnu-toolchain-13.2.rel1-x…...

OD 算法题 B卷【正整数到Excel编号之间的转换】

文章目录 正整数到Excel编号之间的转换 正整数到Excel编号之间的转换 excel的列编号是这样的&#xff1a;a b c … z aa ab ac… az ba bb bc…yz za zb zc …zz aaa aab aac…; 分别代表以下的编号1 2 3 … 26 27 28 29… 52 53 54 55… 676 677 678 679 … 702 703 704 705;…...

android RelativeLayout布局

<?xml version"1.0" encoding"utf-8"?> <RelativeLayout xmlns:android"http://schemas.android.com/apk/res/android"android:layout_width"match_parent"android:layout_height"match_parent"android:gravity&…...

零知开源——STM32F103RBT6驱动 ICM20948 九轴传感器及 vofa + 上位机可视化教程

STM32F1 本教程使用零知标准板&#xff08;STM32F103RBT6&#xff09;通过I2C驱动ICM20948九轴传感器&#xff0c;实现姿态解算&#xff0c;并通过串口将数据实时发送至VOFA上位机进行3D可视化。代码基于开源库修改优化&#xff0c;适合嵌入式及物联网开发者。在基础驱动上新增…...

DeepSeek源码深度解析 × 华为仓颉语言编程精粹——从MoE架构到全场景开发生态

前言 在人工智能技术飞速发展的今天&#xff0c;深度学习与大模型技术已成为推动行业变革的核心驱动力&#xff0c;而高效、灵活的开发工具与编程语言则为技术创新提供了重要支撑。本书以两大前沿技术领域为核心&#xff0c;系统性地呈现了两部深度技术著作的精华&#xff1a;…...