Flink触发器Trigger
前言
在 Flink 窗口计算模型中,数据先经过 WindowAssigner 分配窗口,然后再经过触发器 Trigger,Trigger 决定了一个窗口何时被 ProcessFunction 处理。每个 WindowAssigner 都有一个默认的 Trigger,如果默认的不满足需求,可以通过 WindowedStream.trigger() 指定自定义的 Trigger。
认识Trigger
所有触发器都是org.apache.flink.streaming.api.windowing.triggers.Trigger的子类,父类定义了一个触发器应该具备的能力:
@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {private static final long serialVersionUID = -4104633972991191369L;public Trigger() {}public abstract TriggerResult onElement(T var1, long var2, W var4, TriggerContext var5) throws Exception;public abstract TriggerResult onProcessingTime(long var1, W var3, TriggerContext var4) throws Exception;public abstract TriggerResult onEventTime(long var1, W var3, TriggerContext var4) throws Exception;public boolean canMerge() {return false;}public void onMerge(W window, OnMergeContext ctx) throws Exception {throw new UnsupportedOperationException("This trigger does not support merging.");}public abstract void clear(W var1, TriggerContext var2) throws Exception;
}
Trigger 抽象类提供了六个方法:
- onElement 元素被加入到窗口时触发,返回值决定窗口是否计算
- onProcessingTime 注册的ProcessingTime任务时间到达时触发
- onEventTime 注册的EventTime任务时间到达时触发
- canMerge 是否可以合并
- onMerge Trigger合并
- clear 窗口被移除时触发
如果数据本身携带窗口是否触发计算的标记,那么重写 onElement() 方法即可;但是在时间窗口计算模型下,并不是通过元素来判断窗口是否需要计算的,而是窗口的结束时间到达时才出发计算,这个时候就需要用到定时器 Timer。Timer 就像一个闹钟,我们可以在它上面注册一个未来的时间戳,当这个时间到达时,对应的事件就会被触发,就像闹钟喊醒沉睡的你一样。
Trigger 会有自己的 Timer,TriggerContext 提供了注册时间事件的方法,你可以根据自己采用的时间语义,调用对应的注册方法来注册事件。
triggerContext.registerProcessingTimeTimer();
triggerContext.registerEventTimeTimer();
在重写这三个方法时,要重点关注方法的返回值。方法的返回值有两个作用:1、窗口内的数据是否可以计算,2、窗口内的数据是否需要清理。
public enum TriggerResult {CONTINUE(false, false),FIRE_AND_PURGE(true, true),FIRE(true, false),PURGE(false, true);private final boolean fire;private final boolean purge;
}
TriggerResult 枚举有四个值,含义分别是:
- CONTINUE 不做任何操作
- FIRE 触发窗口计算,但是数据仍然保留
- PURGE 不触发计算,只是清理窗口内数据
- FIRE_AND_PURGE 触发窗口计算,同时清理数据
内置的Trigger
Flink 内置了许多常用的 Trigger,大多数情况下它们足以支撑我们的业务场景,只有当内置的Trigger不符合要求时,才需要开发自定义的 Trigger。
1、EventTimeTrigger
和事件时间窗口搭配使用的 Trigger,直到 Watermark 时间戳等于窗口的结束时间才会触发计算。
onElement 的逻辑是:当有元素加入到窗口时,先判断当前 Watermark 时间戳是否到达窗口的结束时间,如果到了就直接触发计算,否则注册一个时间事件,等待 Timer 触发窗口计算。
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {private static final long serialVersionUID = 1L;private EventTimeTrigger() {}public TriggerResult onElement(Object element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {return TriggerResult.FIRE;} else {ctx.registerEventTimeTimer(window.maxTimestamp());return TriggerResult.CONTINUE;}}public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) {return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;}。。。。。。
}
2、ProcessingTimeTrigger
和 EventTimeTrigger 差不多,区别是采用的处理时间语义,没有 Watermark 相关的判断,直接注册ProcessingTime 事件等待窗口触发计算。
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {private static final long serialVersionUID = 1L;private ProcessingTimeTrigger() {}public TriggerResult onElement(Object element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) {ctx.registerProcessingTimeTimer(window.maxTimestamp());return TriggerResult.CONTINUE;}public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) {return TriggerResult.FIRE;}
}
3、DeltaTrigger
DeltaTrigger 会计算新加入窗口的元素和上一个元素的差值,当这个差值超过给定的阈值时,窗口就会触发计算,元素之间的差值是通过指定的 DeltaFunction 计算出来的。
public class DeltaTrigger<T, W extends Window> extends Trigger<T, W> {private static final long serialVersionUID = 1L;private final DeltaFunction<T> deltaFunction;private final double threshold;private final ValueStateDescriptor<T> stateDesc;private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) {this.deltaFunction = deltaFunction;this.threshold = threshold;this.stateDesc = new ValueStateDescriptor("last-element", stateSerializer);}public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {ValueState<T> lastElementState = (ValueState)ctx.getPartitionedState(this.stateDesc);if (lastElementState.value() == null) {lastElementState.update(element);return TriggerResult.CONTINUE;} else if (this.deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {lastElementState.update(element);return TriggerResult.FIRE;} else {return TriggerResult.CONTINUE;}}
}
自定义Trigger
通过子类继承 Trigger 重写相应的方法,即可自定义我们自己的触发器。
举个例子,我们自定义一个和时间不相关的 Trigger,我们等窗口积攒到一定数量的元素再出发计算。如下示例程序:
public static class CounterTrigger extends Trigger<Long, GlobalWindow> {private final int count;public CounterTrigger(int count) {this.count = count;}@Overridepublic TriggerResult onElement(Long element, long timestamp, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {// 通过Flink state来保存窗口内积攒的元素数量ValueState<Integer> countState = triggerContext.getPartitionedState(new ValueStateDescriptor<Integer>("count", Integer.class));int elementCount = Optional.ofNullable(countState.value()).orElse(0) + 1;if (elementCount >= this.count) {countState.update(0);return TriggerResult.FIRE_AND_PURGE;}countState.update(elementCount);return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long timestamp, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic TriggerResult onEventTime(long timestamp, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic void clear(GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {}}
接下来验证一下我们的Trigger是否生效。
我们编写一个Flink作业,数据源每秒生成10个随机数,数据会被统一划分到 GlobalWindow 窗口,然后指定我们自定义的 CounterTrigger,等窗口内积攒了十条数据就出发求和计算。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.addSource(new SourceFunction<Long>() {@Overridepublic void run(SourceContext<Long> sourceContext) throws Exception {while (true) {Threads.sleep(100);sourceContext.collect(ThreadLocalRandom.current().nextLong(100));}}@Overridepublic void cancel() {}}).windowAll(GlobalWindows.create()).trigger(new CounterTrigger(10)).process(new ProcessAllWindowFunction<Long, Object, GlobalWindow>() {@Overridepublic void process(ProcessAllWindowFunction<Long, Object, GlobalWindow>.Context context, Iterable<Long> iterable, Collector<Object> collector) throws Exception {Iterator<Long> iterator = iterable.iterator();long sum = 0L;while (iterator.hasNext()) {sum += iterator.next();}System.err.println(sum);}});environment.execute();
}
运行Flink作业,控制台每隔一段时间就会输出随机数之和。
尾巴
在 Flink 中,Trigger 决定了何时触发窗口的计算和输出结果。通过灵活配置 Trigger 规则,能够精确控制数据处理的时机,适应不同的业务需求和数据特点。
Trigger 能够处理各种复杂的情况,例如在特定条件满足时触发,或者基于时间间隔、数据量等因素进行触发。它为开发者提供了精细的控制手段,确保数据处理的准确性和及时性。 合理运用 Trigger 可以优化 Flink 作业的性能,避免不必要的计算和资源消耗。
相关文章:
Flink触发器Trigger
前言 在 Flink 窗口计算模型中,数据先经过 WindowAssigner 分配窗口,然后再经过触发器 Trigger,Trigger 决定了一个窗口何时被 ProcessFunction 处理。每个 WindowAssigner 都有一个默认的 Trigger,如果默认的不满足需求…...
【操作系统的使用】Linux 系统环境变量与服务管理:设置与控制的艺术
文章目录 系统环境变量与服务管理:设置与控制的艺术一、系统环境变量的设置1.1 临时设置环境变量1.2 永久设置环境变量 二、服务启动类型的设置2.1 查看服务状态2.2 启动和停止服务2.3 设置服务的启动类型2.3.1 设置服务在启动时运行2.3.2 禁用服务在启动时运行2.3.…...
速盾:高防cdn配置中性能优化是什么?
高防CDN配置中的性能优化是指通过调整CDN配置以提升网站的加载速度、响应时间和用户体验。在进行性能优化时,需要考虑多个因素,包括CDN节点的选择和布置、缓存策略、缓存过期时间、预取和预加载、并发连接数和网络延迟等。 首先,CDN节点的选…...
Qt_软件添加版本信息
文章内容: 给生成的软件添加软件的版权等信息 #include <windows.h> //中文的话增加下面这一行 #pragma code_page(65001)VS_VERSION_INFO VERSIONINFO...
mallocfree和newdelete的区别
malloc\free和new\delete的区别 malloc/free new/delete 身份: 函数 运算符\关键字 返回值: void* 带类型的指针 参数: 字节个数(手动计算) 类型 自动计算字节数 处理数组: 手动计算数组总字节数 new 类型[数量] 扩容࿱…...
无锁队列实现(Michael Scott),伪代码与c++实现
一、Michael & Scoot 原版伪代码实现 structure pointer_t {ptr: pointer to node_t, count: unsigned integer}structure node_t {value: data type, next: pointer_t}structure queue_t {Head: pointer_t, Tail: pointer_t}initialize(Q: pointer to queue_t)node new_…...
猜数字小游戏
前言 猜数字游戏是一款经典且简单的互动游戏,常常用于提高逻辑思维能力和锻炼数学技巧。本文将深入探讨一段用 JavaScript 编写的猜数字游戏代码,帮助读者理解游戏的基本逻辑和实现方法。这段代码不仅适合初学者练习编程技巧,也是了解用户交…...
在Windows上搭建ChatTTS:从本地部署到远程AI音频生成全攻略
文章目录 前言1. 下载运行ChatTTS模型2. 安装Cpolar工具3. 实现公网访问4. 配置ChatTTS固定公网地址 前言 本篇文章主要介绍如何快速地在Windows系统电脑中本地部署ChatTTS开源文本转语音项目,并且我们还可以结合Cpolar内网穿透工具创建公网地址,随时随…...
如何用好 CloudFlare 的速率限制防御攻击
最近也不知道咋回事儿,群里好多站长都反映被CC 攻击了。有人说依旧是 PCDN 干的,但明月感觉不像,因为有几个站长被 CC 攻击都是各种动态请求(这里的动态请求指的是.php 文件的请求)。经常被攻击的站长们都知道,WordPress /Typecho 这类动态博客系统最怕的就是这种动态请求…...
Unity3D 立方体纹理与自制天空盒详解
立方体纹理和自制天空盒是游戏开发中常用的技术之一,可以为游戏增添更加丰富的视觉效果。在本文中,我们将详细介绍Unity3D中立方体纹理和自制天空盒的使用方法,并给出相应的代码实现。 对惹,这里有一个游戏开发交流小组ÿ…...
【工具】VSCODE下载,配置初次设置
打开 settings.json 文件,包含了 Visual Studio Code (VSCode) 中的各种用户配置。 {"files.associations": {"*.vue": "vue","*.wpy": "vue","*.wxml": "html","*.wxss": "…...
vue使用jquery的ajax,页面跳转
一、引入jquery依赖 打开终端更新npm npm install -g npm 更新完后引入输入npm install jquery 加载完后 在最外层的package.json文件中加入以下代码 配置好后导入jquery 设置变量用于接收服务器传输的数据 定义ajax申请数据 服务器的Controller层传输数据 (…...
基于微信小程序的社区二手交易系统的详细设计和实现(源码+lw+部署文档+讲解等)
项目运行截图 技术框架 后端采用SpringBoot框架 Spring Boot 是一个用于快速开发基于 Spring 框架的应用程序的开源框架。它采用约定大于配置的理念,提供了一套默认的配置,让开发者可以更专注于业务逻辑而不是配置文件。Spring Boot 通过自动化配置和约…...
D34【python 接口自动化学习】- python基础之输入输出与文件操作
day34 文件关闭 学习日期:20241011 学习目标:输入输出与文件操作﹣-46 常见常新:文件的关闭 学习笔记: 文件关闭的内部工作过程 close()函数 with语句 常用的打开关闭文件 # 文件关闭 # 方式…...
【Linux系列】set -euo pipefail 命令详解
💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…...
【Python爬虫实战】正则:中文匹配与贪婪非贪婪模式详解
🌈个人主页:https://blog.csdn.net/2401_86688088?typeblog 🔥 系列专栏:https://blog.csdn.net/2401_86688088/category_12797772.html 目录 前言 一、匹配中文 (一)匹配单个中文字符 (二…...
保护数据安全:JS前端加密与PHP后端解密实战教程,让敏感信息更安全
保护数据安全:JS前端加密与PHP后端解密实战教程,让敏感信息更安全 在Web开发中,确保用户提交的敏感信息(如密码、手机号码等)的安全性是非常重要的。一种常见的做法是使用加密技术来保护这些数据,在传输过…...
72 分布式锁
72 分布式锁 什么是分布式锁 分布式锁 分布式 锁。那么分布式是指的什么呢?锁又是锁的谁呢?在业务开发中我们经常会听到分布式分布式的概念,分布式也很简单,通俗的来说就是你具有多个服务器,每个服务器上运行的程序…...
使用Windbg分析dump文件排查C++软件异常的一般步骤与要点分享
目录 1、概述 2、打开dump文件,查看发生异常的异常类型码 3、查看发生异常的那条汇编指令 3.1、汇编代码能最直接、最本真的反映出崩溃的原因 3.2、汇编指令中访问64KB小地址内存区,可能是访问了空指针 3.3、汇编指令中访问了很大的内核态的内存地…...
30 天 Python 3 学习计划
30 天 Python 3 学习计划 https://www.runoob.com/python3/python3-tutorial.html 1. Python3 基础语法 2. Python3 基本数据类型 3. Python3 数据类型转换 4. Python3 解释器 5. Python3 注释 6. Python3 运算符 7. Python3 数字(Number) 8. Python3 字符串 …...
SciencePlots——绘制论文中的图片
文章目录 安装一、风格二、1 资源 安装 # 安装最新版 pip install githttps://github.com/garrettj403/SciencePlots.git# 安装稳定版 pip install SciencePlots一、风格 简单好用的深度学习论文绘图专用工具包–Science Plot 二、 1 资源 论文绘图神器来了:一行…...
8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂
蛋白质结合剂(如抗体、抑制肽)在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上,高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术,但这类方法普遍面临资源消耗巨大、研发周期冗长…...
Python爬虫(一):爬虫伪装
一、网站防爬机制概述 在当今互联网环境中,具有一定规模或盈利性质的网站几乎都实施了各种防爬措施。这些措施主要分为两大类: 身份验证机制:直接将未经授权的爬虫阻挡在外反爬技术体系:通过各种技术手段增加爬虫获取数据的难度…...
linux 下常用变更-8
1、删除普通用户 查询用户初始UID和GIDls -l /home/ ###家目录中查看UID cat /etc/group ###此文件查看GID删除用户1.编辑文件 /etc/passwd 找到对应的行,YW343:x:0:0::/home/YW343:/bin/bash 2.将标红的位置修改为用户对应初始UID和GID: YW3…...
unix/linux,sudo,其发展历程详细时间线、由来、历史背景
sudo 的诞生和演化,本身就是一部 Unix/Linux 系统管理哲学变迁的微缩史。来,让我们拨开时间的迷雾,一同探寻 sudo 那波澜壮阔(也颇为实用主义)的发展历程。 历史背景:su的时代与困境 ( 20 世纪 70 年代 - 80 年代初) 在 sudo 出现之前,Unix 系统管理员和需要特权操作的…...
自然语言处理——Transformer
自然语言处理——Transformer 自注意力机制多头注意力机制Transformer 虽然循环神经网络可以对具有序列特性的数据非常有效,它能挖掘数据中的时序信息以及语义信息,但是它有一个很大的缺陷——很难并行化。 我们可以考虑用CNN来替代RNN,但是…...
AI,如何重构理解、匹配与决策?
AI 时代,我们如何理解消费? 作者|王彬 封面|Unplash 人们通过信息理解世界。 曾几何时,PC 与移动互联网重塑了人们的购物路径:信息变得唾手可得,商品决策变得高度依赖内容。 但 AI 时代的来…...
高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数
高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数 在软件开发中,单例模式(Singleton Pattern)是一种常见的设计模式,确保一个类仅有一个实例,并提供一个全局访问点。在多线程环境下,实现单例模式时需要注意线程安全问题,以防止多个线程同时创建实例,导致…...
Java毕业设计:WML信息查询与后端信息发布系统开发
JAVAWML信息查询与后端信息发布系统实现 一、系统概述 本系统基于Java和WML(无线标记语言)技术开发,实现了移动设备上的信息查询与后端信息发布功能。系统采用B/S架构,服务器端使用Java Servlet处理请求,数据库采用MySQL存储信息࿰…...
MFC 抛体运动模拟:常见问题解决与界面美化
在 MFC 中开发抛体运动模拟程序时,我们常遇到 轨迹残留、无效刷新、视觉单调、物理逻辑瑕疵 等问题。本文将针对这些痛点,详细解析原因并提供解决方案,同时兼顾界面美化,让模拟效果更专业、更高效。 问题一:历史轨迹与小球残影残留 现象 小球运动后,历史位置的 “残影”…...
