当前位置: 首页 > news >正文

Flink之窗口触发机制及自定义Trigger的使用

1 窗口触发机制

窗口计算的触发机制都是由Trigger类决定的,Flink中为各类内置的WindowsAssigner都设计了对应的默认Trigger. 层次结构如下:
  • Trigger
  • ProcessingTimeoutTrigger
  • EventTimeTrigger
  • CountTrigger
  • DeltaTrigger
  • NeverTrigger in GlobalWindows
  • ContinuousEventTimeTrigger
  • PurgingTrigger
  • ContinuousProcessingTimeTrigger
  • ProcessingTimeTrigger
通常情况下是不需要自己重写Trigger的,使用Flink内置的就可以,除非特殊业务特殊需求.
1.1 源码解析

EventTimeTrigger源码说明如何触发窗口计算,在EventTimeTrigger源码中只需要关注onElementonEventTime两个方法即可,源码内容如下:

@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {private static final long serialVersionUID = 1L;private EventTimeTrigger() {}// 基于数据驱动的方法@Overridepublic TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx)throws Exception {// 判断当前watermark是否大于等于窗口的最大时间if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {// if the watermark is already past the window fire immediately// 如果大于等于窗口的最大时间触发计算return TriggerResult.FIRE;} else {// 小于窗口的最大时间首先注册定时器ctx.registerEventTimeTimer(window.maxTimestamp());// 然后等待数据继续输入,不触发计算return TriggerResult.CONTINUE;}}// 基于事件时间定时器驱动的方法@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {// 根据不断发送来的watermark判断是否触发计算return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;}// ...
}

源码中将不需要的关注的代码都已省略

  • onElement

    注释中写明这个方法是基于数据进行驱动的,也就是说只有数据到达时才会执行这个方法,每一个窗口都有自己的startTimeendTime,也就是窗口的范围,判断条件中window.maxTimestamp()获取的就是当前窗口的endTime,如果当前watermark超出当前窗口的endTime就会触发这个窗口计算,TriggerResult.FIRE表示的就是窗口开始计算,如果当前watermark小于endTime就不会触发窗口计算这个窗口会继续等待数据输入,也就是TriggerResult.CONTINUE方法.

  • onEventTime

    onElement是由数据驱动的,但是Flink的实际数据处理过程是存在没有数据发送到当前窗口,但是会有watermark源源不断的发送到当前窗口的情况,在多并行度的执行条件下就会发生这种情况.在onEventTime方法中如果上游发送过来的watermark等于当前窗口的endTime就会执行TriggerResult.FIRE否则还是执行TriggerResult.CONTINUE.

Trigger的触发机制就是这样,其他的CountTrigger等大致逻辑基本是一样的,了解清楚源码中这两个方法的作用很容易理解.

1.2 代码实现

通常Flink内置的Trigger都可以满足数据处理需求,往往在实际开发中可能会存在特殊的业务需求,这时用户可以自定义Trigger,以达到控制窗口触发计算的规则. 可以仿照EventTimeTrigger来构建一个自定义Trigger,只需要将其中的部分代码简单进行修改,并在onElement方法中添加自定的触发逻辑即可.
  • 自定义Trigger

    /*** 这里首先需要继承Trigger类,并将<Object, TimeWindow>中的Object修改成自己需要的数据类型,这段代码中需要根据UserEvent2中的数据* 来控制触发窗口计算的条件,所以将Object修改成UserEvent2**/ 
    public class CustomTrigger extends Trigger<UserEvent2, TimeWindow> {public CustomTrigger() {}// 通过修改onElement方法中窗口计算的触发逻辑实现自定义方式@Overridepublic TriggerResult onElement(// 这里也要将原有的Object类型修改成上面的UserEvent2UserEvent2 element, long timestamp, TimeWindow window, TriggerContext ctx)throws Exception {// 原有的判断逻辑不动,这个是为了便捷,判断逻辑可以根据实际需求进行修改,或者如同下面中添加一个新的触发逻辑if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {return TriggerResult.FIRE;// 这里增加一个判断逻辑,当用户行为时间为2700的时候也触发计算} else if (element.getTime().equals("2700")) {return TriggerResult.FIRE;// 原有的判断逻辑不动} else {ctx.registerEventTimeTimer(window.maxTimestamp());return TriggerResult.CONTINUE;}}@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)throws Exception {return TriggerResult.CONTINUE;}@Overridepublic void clear(TimeWindow window, TriggerContext ctx) throws Exception {ctx.deleteEventTimeTimer(window.maxTimestamp());}@Overridepublic boolean canMerge() {return true;}@Overridepublic void onMerge(TimeWindow window, OnMergeContext ctx) {long windowMaxTimestamp = window.maxTimestamp();if (windowMaxTimestamp > ctx.getCurrentWatermark()) {ctx.registerEventTimeTimer(windowMaxTimestamp);}}// 将toString中俄返回值根据用户的需要进行修改@Overridepublic String toString() {return "CustomTrigger()";}// 将返回值更改成创建的自定义Trigger类public static CustomTrigger create() {return new CustomTrigger();}
    }
    
  • 业务代码

    // ...
    SingleOutputStreamOperator<UserEvent2> windowedStream = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(10))) // 设置滚动窗口大小为10s.trigger(new CustomTrigger()) // 传入自定义的Trigger类.allowedLateness(Time.seconds(2)) // 允许迟到数据迟到时间2s,同watermark中的forBoundedOutOfOrderness功能类似.sideOutputLateData(lateData) // 将迟到数据进行测流输出.max("time");// 获取用户行为发生事件最大的这条数据
    // ...
    

    上面这段业务代码中设置的滚动窗口的大小为10s,正常来说只有满足end - start = 10000的时候才会触发窗口计算,但是在自定义Trigger中指定了当数据中时间为2700的时候也触发窗口计算,在时间为2700的数据没到达时候还会按照原有的逻辑触发窗口计算,但是只要2700的数据到达,不管时候达到TumblingEventTimeWindows.of(Time.seconds(10))这个条件,都会触发窗口计算.

相关文章:

Flink之窗口触发机制及自定义Trigger的使用

1 窗口触发机制 窗口计算的触发机制都是由Trigger类决定的,Flink中为各类内置的WindowsAssigner都设计了对应的默认Trigger. 层次结构如下: Trigger ProcessingTimeoutTriggerEventTimeTriggerCountTriggerDeltaTriggerNeverTrigger in GlobalWindowsContinuousEventTimeTrigge…...

蓝牙资讯|2024年智能家居新趋势,蓝牙助力智能家居发展

2024年将迎来变革&#xff0c;智能家居趋势不仅会影响我们的生活空间&#xff0c;还会提高我们的生活质量&#xff0c;让我们有更多时间享受属于自己的时光。 2024年智能家居新趋势 趋势一&#xff1a;多功能科技 2024年预示着多功能技术的趋势&#xff0c;创新将成为焦点。混…...

机器学习 | Python实现GA-XGBoost遗传算法优化极限梯度提升树特征分类模型调参

机器学习 | Python实现GA-XGBoost遗传算法优化极限梯度提升树特征分类 目录 机器学习 | Python实现GA-XGBoost遗传算法优化极限梯度提升树特征分类基本介绍模型描述程序设计参考资料基本介绍 XGBoost的核心算法思想基本就是: 不断地添加树,不断地进行特征分裂来生长一棵树,每…...

手部关键点检测3:Pytorch实现手部关键点检测(手部姿势估计)含训练代码和数据集

手部关键点检测3&#xff1a;Pytorch实现手部关键点检测(手部姿势估计)含训练代码和数据集 目录 手部关键点检测3&#xff1a;Pytorch实现手部关键点检测(手部姿势估计)含训练代码和数据集 1. 前言 2.手部关键点检测(手部姿势估计)方法 (1)Top-Down(自上而下)方法 (2)Bot…...

服务日志性能调优,由log引出的巨坑

只有被线上服务问题毒打过的人才明白日志有多重要&#xff01; 谁赞成&#xff0c;谁反对&#xff1f;如果你深有同感&#xff0c;那恭喜你是个社会人了&#xff1a;&#xff09; 日志对程序的重要性不言而喻&#xff0c;轻巧、简单、无需费脑&#xff0c;程序代码中随处可见…...

【VR】【Unity】如何调整Quest2的隐藏系统时间日期

【背景】 网络虽然OK&#xff0c;但是Oculus Quest要连上商店还必须调整好系统时间&#xff0c;不过在Quest系统中&#xff0c;时间对用户是不可见的&#xff0c;本篇介绍调整的方法。 【方法】 打开SideQuest&#xff0c;没有的话先去下载一个。打开后先登录&#xff0c;如…...

C++之设计模式

C23种设计模式 https://blog.csdn.net/qq_40309341/article/details/120318957 设计模式可以同时使用多个。在软件开发中&#xff0c;通常会根据需求和问题的复杂性&#xff0c;结合多种设计模式来构建应用程序&#xff0c;以提高代码的可维护性、可扩展性和重用性。不同的设计…...

Django ORM查询

文章目录 1 增 -- 向表内插入一条数据2 删 -- 删除表内数据&#xff08;物理删除&#xff09;3 改 -- update操作更新某条数据4 查 -- 基本的表查询&#xff08;包括多表、跨表、子查询、联表查询&#xff09;4.1 基本查询4.2 双下划线查询条件4.3 逻辑查询&#xff1a;or、and…...

如何在CentOS 7中卸载Python 2.7,并安装3.X

Python是一种常用的编程语言&#xff0c;但是如果您不需要在服务器上使用Python 2.7&#xff0c;那么本文将详细介绍如何在CentOS 7上卸载Python 2.7。 一、检查Python版本 在卸载Python 2.7之前&#xff0c;必须检查系统上的Python版本。 在终端中执行以下命令&#xff1a;…...

10.17七段数码管单个多个(部分)

单个数码管的实现 第一种方式 一端并接称为位码&#xff1b;一端分别接收电平信号以控制灯的亮灭&#xff0c;称为段码 8421BCD码转七段数码管段码是将BCD码表示的十进制数转换成七段LED数码管的7个驱动段码&#xff0c; 段码就是LED灯的信号 a为1表示没用到a&#xff0c;a为…...

linux静态库与动态库

库是一种可执行的二进制文件&#xff0c;是编译好的代码。使用库可以提高开发效率。在Linux 下有静态库和动态库。   静态库在程序编译的时候会被链接到目标代码里面。所以程序在运行的时候不再需要静态库了。因此编译出来的体积就比较大。以 lib 开头&#xff0c;以.a 结尾。…...

LeetCode 面试题 10.03. 搜索旋转数组

文章目录 一、题目二、C# 题解 一、题目 搜索旋转数组。给定一个排序后的数组&#xff0c;包含n个整数&#xff0c;但这个数组已被旋转过很多次了&#xff0c;次数不详。请编写代码找出数组中的某个元素&#xff0c;假设数组元素原先是按升序排列的。若有多个相同元素&#xff…...

SpringCloudSleuth异步线程支持和传递

场景 在使用Sleuth做链路跟踪时&#xff0c;默认情况下异步线程会断链&#xff0c;需要进行代码调整支持。 调整内容 方式一 使用Async实现异步线程 开启异步线程池 EnableAsync SpringBootApplication public class LizzApplication {public static void main(String[] a…...

如何使用 Disco 将黑白照片彩色化

Disco 是一个基于视觉语言模型&#xff08;LLM&#xff09;的图像彩色化工具。它使用 LLM 来生成彩色图像&#xff0c;这些图像与原始黑白图像相似。 本文将介绍如何使用 Disco 将黑白照片彩色化。 使用 Disco 提供了一个简单的在线演示&#xff0c;可以用于测试模型。 访问…...

ChatGPT AIGC 制作大屏可视化分析案例

第一部分提示词prompt: 商品 价格 p1 13 p2 41 p3 42 p4 53 p5 19 p6 28 p7 92 p8 62 城市 销量 北京 69 上海 13 南京 18 武汉 66 成都 70 你现在是一名非常专业的数据分析师,请结合上述数据完成下列几件事情 1:第一部分数…...

2023年9款好用的在线流程图软件推荐!

随着互联网技术和基础设施的发展&#xff0c;人们能用上比过去更加稳定的网络&#xff0c;因此在使用各类工具软件时&#xff0c;越来越倾向于选择在线工具&#xff0c;或是推出了网页版的应用。 就流程图软件而言&#xff0c;过去想要绘制流程图&#xff0c;我们得在电脑上安…...

剑指Offer || 044.在每个树行中找最大值

题目 给定一棵二叉树的根节点 root &#xff0c;请找出该二叉树中每一层的最大值。 示例1&#xff1a; 输入: root [1,3,2,5,3,null,9] 输出: [1,3,9] 解释:1/ \3 2/ \ \ 5 3 9 示例2&#xff1a; 输入: root [1,2,3] 输出: [1,3] 解释:1/ \2 3示例3&#xff…...

ESP32网络开发实例-UDP数据发送与接收

UDP数据发送与接收 文章目录 UDP数据发送与接收1、UDP简单介绍2、软件准备3、硬件准备4、代码实现本文将详细介绍在Arduino开发环境中,如何实现ESP32通过UDP协议进行数据发送与接收。 1、UDP简单介绍 用户数据报协议 (UDP) 是一种跨互联网使用的通信协议,用于对时间敏感的传…...

液压自动化成套设备比例阀放大器

液压电气成套设备的比例阀放大器是一种电子控制设备&#xff0c;用于控制液压动力系统中的液压比例阀1。 比例阀放大器通常采用电子信号进行控制&#xff0c;以控制比例阀的开度和流量&#xff0c;以实现液压系统的可靠控制。比例阀放大器主要由以下组成部分&#xff1a; 驱动…...

专业144,总分440+,上岸西北工业大学827西工大信号与系统考研经验分享

我的初试备考从4月末&#xff0c;持续到初试前&#xff0c;这中间没有中断。 总的时间分配上&#xff0c;是数学>专业课>英语>政治&#xff0c;虽然大家可支配时间和基础千差万别&#xff0c;但是这么分配是没错的。 数学 时间安排&#xff1a;3月-7月&#xff1a;…...

Waymo Open Dataset Docker部署:环境配置与容器化最佳实践

Waymo Open Dataset Docker部署&#xff1a;环境配置与容器化最佳实践 【免费下载链接】waymo-open-dataset Waymo Open Dataset 项目地址: https://gitcode.com/gh_mirrors/wa/waymo-open-dataset Waymo Open Dataset是自动驾驶领域的重要开源项目&#xff0c;提供了丰…...

如何用ViGEmBus实现Windows内核级游戏手柄模拟:架构解析与实践指南

如何用ViGEmBus实现Windows内核级游戏手柄模拟&#xff1a;架构解析与实践指南 【免费下载链接】ViGEmBus Windows kernel-mode driver emulating well-known USB game controllers. 项目地址: https://gitcode.com/gh_mirrors/vi/ViGEmBus ViGEmBus是一款Windows内核模…...

YOLO-v5小目标检测:微小物体识别效果惊艳展示

YOLO-v5小目标检测&#xff1a;微小物体识别效果惊艳展示 1. 小目标检测的技术挑战 在计算机视觉领域&#xff0c;小目标检测一直是个棘手的问题。当目标在图像中占据的像素面积小于3232时&#xff0c;传统检测算法往往会遇到以下困难&#xff1a; 特征信息不足&#xff1a;…...

从仿真到现实:聊聊PIN二极管模型在有源衰减器设计中的那些“坑”与优化思路

从仿真到现实&#xff1a;PIN二极管模型在有源衰减器设计中的关键挑战与工程优化 在射频电路设计中&#xff0c;有源衰减器的性能直接影响着系统的动态范围和信号质量。当我们从仿真环境转向实际电路实现时&#xff0c;PIN二极管模型的准确性往往成为决定成败的关键因素。许多工…...

荣耀XD21路由器IPTV设置指南:不用VLAN交换机实现单线复用

荣耀XD21路由器单线复用实战&#xff1a;无需VLAN交换机实现IPTV与网络并行传输 客厅弱电箱仅预留单根网线却需要同时承载IPTV和无线网络信号——这是许多家庭网络改造中遇到的典型难题。传统方案往往依赖价格不菲的VLAN交换机实现单线复用&#xff0c;但通过荣耀XD21路由器的隐…...

汽车UDS刷写避坑指南:从S32K144 Bootloader的链接文件到安全访问,这些细节你注意了吗?

汽车UDS刷写实战避坑手册&#xff1a;S32K144 Bootloader开发中的七个致命细节 当你在凌晨三点的实验室里盯着CANoe窗口不断跳出的NRC 31&#xff08;requestOutOfRange&#xff09;错误码时&#xff0c;会不会突然怀念用J-Link直接烧录的简单日子&#xff1f;UDS刷写就像汽车电…...

导师推荐 2026 最新!降AI率软件测评与好用工具推荐

2026年真正好用的AI论文降重与改写工具&#xff0c;核心看降重效果、去AI味、格式保留、学术适配四大指标。综合实测&#xff0c;千笔AI、ThouPen、豆包、DeepSeek、Grammarly 是当前最值得推荐的梯队&#xff0c;覆盖从免费到付费、从中文到英文、从文科到理工的全场景需求。 …...

OpenClaw数据可视化:GLM-4.7-Flash分析结果自动图表生成

OpenClaw数据可视化&#xff1a;GLM-4.7-Flash分析结果自动图表生成 1. 为什么需要自动化数据可视化 作为一名经常需要处理数据的开发者&#xff0c;我发现自己80%的时间都花在了数据清洗和图表调整上。每次分析新数据集时&#xff0c;都要重复这些步骤&#xff1a;写Python脚…...

高效文件同步:SyncTrayzor在Windows上的完整解决方案

高效文件同步&#xff1a;SyncTrayzor在Windows上的完整解决方案 【免费下载链接】SyncTrayzor Windows tray utility / filesystem watcher / launcher for Syncthing 项目地址: https://gitcode.com/gh_mirrors/sy/SyncTrayzor SyncTrayzor是Windows平台上最实用的Syn…...

Linux下Conda+R+RStudio环境配置全攻略:从零搭建高效数据分析平台

1. 为什么选择Conda管理R环境&#xff1f; 很多数据分析师习惯直接在系统里安装R和R包&#xff0c;但很快就会遇到版本冲突的麻烦。比如你需要安装一个要求R 4.3.0的包&#xff0c;但系统里装的是R 4.2.0&#xff0c;更糟的是其他所有包都是基于4.2.0编译的。这时候conda的价值…...