Flink触发器Trigger
前言
在 Flink 窗口计算模型中,数据先经过 WindowAssigner 分配窗口,然后再经过触发器 Trigger,Trigger 决定了一个窗口何时被 ProcessFunction 处理。每个 WindowAssigner 都有一个默认的 Trigger,如果默认的不满足需求,可以通过 WindowedStream.trigger() 指定自定义的 Trigger。
认识Trigger
所有触发器都是org.apache.flink.streaming.api.windowing.triggers.Trigger的子类,父类定义了一个触发器应该具备的能力:
@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {private static final long serialVersionUID = -4104633972991191369L;public Trigger() {}public abstract TriggerResult onElement(T var1, long var2, W var4, TriggerContext var5) throws Exception;public abstract TriggerResult onProcessingTime(long var1, W var3, TriggerContext var4) throws Exception;public abstract TriggerResult onEventTime(long var1, W var3, TriggerContext var4) throws Exception;public boolean canMerge() {return false;}public void onMerge(W window, OnMergeContext ctx) throws Exception {throw new UnsupportedOperationException("This trigger does not support merging.");}public abstract void clear(W var1, TriggerContext var2) throws Exception;
}
Trigger 抽象类提供了六个方法:
- onElement 元素被加入到窗口时触发,返回值决定窗口是否计算
- onProcessingTime 注册的ProcessingTime任务时间到达时触发
- onEventTime 注册的EventTime任务时间到达时触发
- canMerge 是否可以合并
- onMerge Trigger合并
- clear 窗口被移除时触发
如果数据本身携带窗口是否触发计算的标记,那么重写 onElement() 方法即可;但是在时间窗口计算模型下,并不是通过元素来判断窗口是否需要计算的,而是窗口的结束时间到达时才出发计算,这个时候就需要用到定时器 Timer。Timer 就像一个闹钟,我们可以在它上面注册一个未来的时间戳,当这个时间到达时,对应的事件就会被触发,就像闹钟喊醒沉睡的你一样。
Trigger 会有自己的 Timer,TriggerContext 提供了注册时间事件的方法,你可以根据自己采用的时间语义,调用对应的注册方法来注册事件。
triggerContext.registerProcessingTimeTimer();
triggerContext.registerEventTimeTimer();
在重写这三个方法时,要重点关注方法的返回值。方法的返回值有两个作用:1、窗口内的数据是否可以计算,2、窗口内的数据是否需要清理。
public enum TriggerResult {CONTINUE(false, false),FIRE_AND_PURGE(true, true),FIRE(true, false),PURGE(false, true);private final boolean fire;private final boolean purge;
}
TriggerResult 枚举有四个值,含义分别是:
- CONTINUE 不做任何操作
- FIRE 触发窗口计算,但是数据仍然保留
- PURGE 不触发计算,只是清理窗口内数据
- FIRE_AND_PURGE 触发窗口计算,同时清理数据
内置的Trigger
Flink 内置了许多常用的 Trigger,大多数情况下它们足以支撑我们的业务场景,只有当内置的Trigger不符合要求时,才需要开发自定义的 Trigger。
1、EventTimeTrigger
和事件时间窗口搭配使用的 Trigger,直到 Watermark 时间戳等于窗口的结束时间才会触发计算。
onElement 的逻辑是:当有元素加入到窗口时,先判断当前 Watermark 时间戳是否到达窗口的结束时间,如果到了就直接触发计算,否则注册一个时间事件,等待 Timer 触发窗口计算。
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {private static final long serialVersionUID = 1L;private EventTimeTrigger() {}public TriggerResult onElement(Object element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {return TriggerResult.FIRE;} else {ctx.registerEventTimeTimer(window.maxTimestamp());return TriggerResult.CONTINUE;}}public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) {return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;}。。。。。。
}
2、ProcessingTimeTrigger
和 EventTimeTrigger 差不多,区别是采用的处理时间语义,没有 Watermark 相关的判断,直接注册ProcessingTime 事件等待窗口触发计算。
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {private static final long serialVersionUID = 1L;private ProcessingTimeTrigger() {}public TriggerResult onElement(Object element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) {ctx.registerProcessingTimeTimer(window.maxTimestamp());return TriggerResult.CONTINUE;}public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) {return TriggerResult.FIRE;}
}
3、DeltaTrigger
DeltaTrigger 会计算新加入窗口的元素和上一个元素的差值,当这个差值超过给定的阈值时,窗口就会触发计算,元素之间的差值是通过指定的 DeltaFunction 计算出来的。
public class DeltaTrigger<T, W extends Window> extends Trigger<T, W> {private static final long serialVersionUID = 1L;private final DeltaFunction<T> deltaFunction;private final double threshold;private final ValueStateDescriptor<T> stateDesc;private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) {this.deltaFunction = deltaFunction;this.threshold = threshold;this.stateDesc = new ValueStateDescriptor("last-element", stateSerializer);}public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {ValueState<T> lastElementState = (ValueState)ctx.getPartitionedState(this.stateDesc);if (lastElementState.value() == null) {lastElementState.update(element);return TriggerResult.CONTINUE;} else if (this.deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {lastElementState.update(element);return TriggerResult.FIRE;} else {return TriggerResult.CONTINUE;}}
}
自定义Trigger
通过子类继承 Trigger 重写相应的方法,即可自定义我们自己的触发器。
举个例子,我们自定义一个和时间不相关的 Trigger,我们等窗口积攒到一定数量的元素再出发计算。如下示例程序:
public static class CounterTrigger extends Trigger<Long, GlobalWindow> {private final int count;public CounterTrigger(int count) {this.count = count;}@Overridepublic TriggerResult onElement(Long element, long timestamp, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {// 通过Flink state来保存窗口内积攒的元素数量ValueState<Integer> countState = triggerContext.getPartitionedState(new ValueStateDescriptor<Integer>("count", Integer.class));int elementCount = Optional.ofNullable(countState.value()).orElse(0) + 1;if (elementCount >= this.count) {countState.update(0);return TriggerResult.FIRE_AND_PURGE;}countState.update(elementCount);return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long timestamp, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic TriggerResult onEventTime(long timestamp, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic void clear(GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {}}
接下来验证一下我们的Trigger是否生效。
我们编写一个Flink作业,数据源每秒生成10个随机数,数据会被统一划分到 GlobalWindow 窗口,然后指定我们自定义的 CounterTrigger,等窗口内积攒了十条数据就出发求和计算。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.addSource(new SourceFunction<Long>() {@Overridepublic void run(SourceContext<Long> sourceContext) throws Exception {while (true) {Threads.sleep(100);sourceContext.collect(ThreadLocalRandom.current().nextLong(100));}}@Overridepublic void cancel() {}}).windowAll(GlobalWindows.create()).trigger(new CounterTrigger(10)).process(new ProcessAllWindowFunction<Long, Object, GlobalWindow>() {@Overridepublic void process(ProcessAllWindowFunction<Long, Object, GlobalWindow>.Context context, Iterable<Long> iterable, Collector<Object> collector) throws Exception {Iterator<Long> iterator = iterable.iterator();long sum = 0L;while (iterator.hasNext()) {sum += iterator.next();}System.err.println(sum);}});environment.execute();
}
运行Flink作业,控制台每隔一段时间就会输出随机数之和。
尾巴
在 Flink 中,Trigger 决定了何时触发窗口的计算和输出结果。通过灵活配置 Trigger 规则,能够精确控制数据处理的时机,适应不同的业务需求和数据特点。
Trigger 能够处理各种复杂的情况,例如在特定条件满足时触发,或者基于时间间隔、数据量等因素进行触发。它为开发者提供了精细的控制手段,确保数据处理的准确性和及时性。 合理运用 Trigger 可以优化 Flink 作业的性能,避免不必要的计算和资源消耗。
相关文章:
Flink触发器Trigger
前言 在 Flink 窗口计算模型中,数据先经过 WindowAssigner 分配窗口,然后再经过触发器 Trigger,Trigger 决定了一个窗口何时被 ProcessFunction 处理。每个 WindowAssigner 都有一个默认的 Trigger,如果默认的不满足需求…...
【操作系统的使用】Linux 系统环境变量与服务管理:设置与控制的艺术
文章目录 系统环境变量与服务管理:设置与控制的艺术一、系统环境变量的设置1.1 临时设置环境变量1.2 永久设置环境变量 二、服务启动类型的设置2.1 查看服务状态2.2 启动和停止服务2.3 设置服务的启动类型2.3.1 设置服务在启动时运行2.3.2 禁用服务在启动时运行2.3.…...
速盾:高防cdn配置中性能优化是什么?
高防CDN配置中的性能优化是指通过调整CDN配置以提升网站的加载速度、响应时间和用户体验。在进行性能优化时,需要考虑多个因素,包括CDN节点的选择和布置、缓存策略、缓存过期时间、预取和预加载、并发连接数和网络延迟等。 首先,CDN节点的选…...
Qt_软件添加版本信息
文章内容: 给生成的软件添加软件的版权等信息 #include <windows.h> //中文的话增加下面这一行 #pragma code_page(65001)VS_VERSION_INFO VERSIONINFO...
mallocfree和newdelete的区别
malloc\free和new\delete的区别 malloc/free new/delete 身份: 函数 运算符\关键字 返回值: void* 带类型的指针 参数: 字节个数(手动计算) 类型 自动计算字节数 处理数组: 手动计算数组总字节数 new 类型[数量] 扩容࿱…...
无锁队列实现(Michael Scott),伪代码与c++实现
一、Michael & Scoot 原版伪代码实现 structure pointer_t {ptr: pointer to node_t, count: unsigned integer}structure node_t {value: data type, next: pointer_t}structure queue_t {Head: pointer_t, Tail: pointer_t}initialize(Q: pointer to queue_t)node new_…...
猜数字小游戏
前言 猜数字游戏是一款经典且简单的互动游戏,常常用于提高逻辑思维能力和锻炼数学技巧。本文将深入探讨一段用 JavaScript 编写的猜数字游戏代码,帮助读者理解游戏的基本逻辑和实现方法。这段代码不仅适合初学者练习编程技巧,也是了解用户交…...
在Windows上搭建ChatTTS:从本地部署到远程AI音频生成全攻略
文章目录 前言1. 下载运行ChatTTS模型2. 安装Cpolar工具3. 实现公网访问4. 配置ChatTTS固定公网地址 前言 本篇文章主要介绍如何快速地在Windows系统电脑中本地部署ChatTTS开源文本转语音项目,并且我们还可以结合Cpolar内网穿透工具创建公网地址,随时随…...
如何用好 CloudFlare 的速率限制防御攻击
最近也不知道咋回事儿,群里好多站长都反映被CC 攻击了。有人说依旧是 PCDN 干的,但明月感觉不像,因为有几个站长被 CC 攻击都是各种动态请求(这里的动态请求指的是.php 文件的请求)。经常被攻击的站长们都知道,WordPress /Typecho 这类动态博客系统最怕的就是这种动态请求…...
Unity3D 立方体纹理与自制天空盒详解
立方体纹理和自制天空盒是游戏开发中常用的技术之一,可以为游戏增添更加丰富的视觉效果。在本文中,我们将详细介绍Unity3D中立方体纹理和自制天空盒的使用方法,并给出相应的代码实现。 对惹,这里有一个游戏开发交流小组ÿ…...
【工具】VSCODE下载,配置初次设置
打开 settings.json 文件,包含了 Visual Studio Code (VSCode) 中的各种用户配置。 {"files.associations": {"*.vue": "vue","*.wpy": "vue","*.wxml": "html","*.wxss": "…...
vue使用jquery的ajax,页面跳转
一、引入jquery依赖 打开终端更新npm npm install -g npm 更新完后引入输入npm install jquery 加载完后 在最外层的package.json文件中加入以下代码 配置好后导入jquery 设置变量用于接收服务器传输的数据 定义ajax申请数据 服务器的Controller层传输数据 (…...
基于微信小程序的社区二手交易系统的详细设计和实现(源码+lw+部署文档+讲解等)
项目运行截图 技术框架 后端采用SpringBoot框架 Spring Boot 是一个用于快速开发基于 Spring 框架的应用程序的开源框架。它采用约定大于配置的理念,提供了一套默认的配置,让开发者可以更专注于业务逻辑而不是配置文件。Spring Boot 通过自动化配置和约…...
D34【python 接口自动化学习】- python基础之输入输出与文件操作
day34 文件关闭 学习日期:20241011 学习目标:输入输出与文件操作﹣-46 常见常新:文件的关闭 学习笔记: 文件关闭的内部工作过程 close()函数 with语句 常用的打开关闭文件 # 文件关闭 # 方式…...
【Linux系列】set -euo pipefail 命令详解
💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…...
【Python爬虫实战】正则:中文匹配与贪婪非贪婪模式详解
🌈个人主页:https://blog.csdn.net/2401_86688088?typeblog 🔥 系列专栏:https://blog.csdn.net/2401_86688088/category_12797772.html 目录 前言 一、匹配中文 (一)匹配单个中文字符 (二…...
保护数据安全:JS前端加密与PHP后端解密实战教程,让敏感信息更安全
保护数据安全:JS前端加密与PHP后端解密实战教程,让敏感信息更安全 在Web开发中,确保用户提交的敏感信息(如密码、手机号码等)的安全性是非常重要的。一种常见的做法是使用加密技术来保护这些数据,在传输过…...
72 分布式锁
72 分布式锁 什么是分布式锁 分布式锁 分布式 锁。那么分布式是指的什么呢?锁又是锁的谁呢?在业务开发中我们经常会听到分布式分布式的概念,分布式也很简单,通俗的来说就是你具有多个服务器,每个服务器上运行的程序…...
使用Windbg分析dump文件排查C++软件异常的一般步骤与要点分享
目录 1、概述 2、打开dump文件,查看发生异常的异常类型码 3、查看发生异常的那条汇编指令 3.1、汇编代码能最直接、最本真的反映出崩溃的原因 3.2、汇编指令中访问64KB小地址内存区,可能是访问了空指针 3.3、汇编指令中访问了很大的内核态的内存地…...
30 天 Python 3 学习计划
30 天 Python 3 学习计划 https://www.runoob.com/python3/python3-tutorial.html 1. Python3 基础语法 2. Python3 基本数据类型 3. Python3 数据类型转换 4. Python3 解释器 5. Python3 注释 6. Python3 运算符 7. Python3 数字(Number) 8. Python3 字符串 …...
React hook之useRef
React useRef 详解 useRef 是 React 提供的一个 Hook,用于在函数组件中创建可变的引用对象。它在 React 开发中有多种重要用途,下面我将全面详细地介绍它的特性和用法。 基本概念 1. 创建 ref const refContainer useRef(initialValue);initialValu…...
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以?
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以? 在 Golang 的面试中,map 类型的使用是一个常见的考点,其中对 key 类型的合法性 是一道常被提及的基础却很容易被忽视的问题。本文将带你深入理解 Golang 中…...
基于Docker Compose部署Java微服务项目
一. 创建根项目 根项目(父项目)主要用于依赖管理 一些需要注意的点: 打包方式需要为 pom<modules>里需要注册子模块不要引入maven的打包插件,否则打包时会出问题 <?xml version"1.0" encoding"UTF-8…...
智能仓储的未来:自动化、AI与数据分析如何重塑物流中心
当仓库学会“思考”,物流的终极形态正在诞生 想象这样的场景: 凌晨3点,某物流中心灯火通明却空无一人。AGV机器人集群根据实时订单动态规划路径;AI视觉系统在0.1秒内扫描包裹信息;数字孪生平台正模拟次日峰值流量压力…...
均衡后的SNRSINR
本文主要摘自参考文献中的前两篇,相关文献中经常会出现MIMO检测后的SINR不过一直没有找到相关数学推到过程,其中文献[1]中给出了相关原理在此仅做记录。 1. 系统模型 复信道模型 n t n_t nt 根发送天线, n r n_r nr 根接收天线的 MIMO 系…...
关键领域软件测试的突围之路:如何破解安全与效率的平衡难题
在数字化浪潮席卷全球的今天,软件系统已成为国家关键领域的核心战斗力。不同于普通商业软件,这些承载着国家安全使命的软件系统面临着前所未有的质量挑战——如何在确保绝对安全的前提下,实现高效测试与快速迭代?这一命题正考验着…...
回溯算法学习
一、电话号码的字母组合 import java.util.ArrayList; import java.util.List;import javax.management.loading.PrivateClassLoader;public class letterCombinations {private static final String[] KEYPAD {"", //0"", //1"abc", //2"…...
vulnyx Blogger writeup
信息收集 arp-scan nmap 获取userFlag 上web看看 一个默认的页面,gobuster扫一下目录 可以看到扫出的目录中得到了一个有价值的目录/wordpress,说明目标所使用的cms是wordpress,访问http://192.168.43.213/wordpress/然后查看源码能看到 这…...
基于Springboot+Vue的办公管理系统
角色: 管理员、员工 技术: 后端: SpringBoot, Vue2, MySQL, Mybatis-Plus 前端: Vue2, Element-UI, Axios, Echarts, Vue-Router 核心功能: 该办公管理系统是一个综合性的企业内部管理平台,旨在提升企业运营效率和员工管理水…...
在 Spring Boot 项目里,MYSQL中json类型字段使用
前言: 因为程序特殊需求导致,需要mysql数据库存储json类型数据,因此记录一下使用流程 1.java实体中新增字段 private List<User> users 2.增加mybatis-plus注解 TableField(typeHandler FastjsonTypeHandler.class) private Lis…...
