当前位置: 首页 > news >正文

flink-触发器Trigger和移除器Evictor

窗口原理与机制

图片链接:https://blog.csdn.net/qq_35590459/article/details/132177154

  1. 数据流进入算子前,被提交给WindowAssigner,决定元素被放到哪个或哪些窗口,同时可能会创建新窗口或者合并旧的窗口。
  2. 每一个窗口都拥有一个属于自己的触发器Trigger,每当有元素被分配到该窗口,或者之前注册的定时器超时时,Trigger都会被调用。
  3. Trigger被触发后,窗口中的元素集合就会交给Evictor(如果指定了),遍历窗口中的元素列表,并决定最先进入窗口的多少个元素需要被移除。
  4. 窗口函数计算结果值,发送给下游;

Trigger 触发器

触发器作用:控制窗口什么时候除法计算。即执行窗口函数;基于WindowStream调用trigger()方法,传入自定义触发器(trigger);

每一个窗口分配器(windowAssigner) 都会对应一个默认的触发器;

 源码样例

  @PublicEvolvingpublic <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {return new WindowedStream<>(this, assigner);}@PublicEvolvingpublic WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {this.input = input;this.builder =new WindowOperatorBuilder<>(windowAssigner,windowAssigner.getDefaultTrigger(input.getExecutionEnvironment()),input.getExecutionConfig(),input.getType(),input.getKeySelector(),input.getKeyType());}==============默认触发器===
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {return EventTimeTrigger.create();}

Triger类有4个方法

  1.  onElement:窗口中每来一个元素调用该方法。
    onProcessingTime:当注册的处理时间定时器触发时,将调用这个方法。onEventTime:当注时的事件时间定时器触发时,将调用这个方法。clear:窗口关闭冰销毁时调用这个方法,一般用来清除自定义状态。onElement() ,onProcessingTime(),onEventTime()方法的返回类型都是 TriggerResult;TriggerResult中包含四个枚举值:
    CONTINUE:表示对窗口不执行任何操作。
    FIRE:触发计算并输出结果。注意计算完成后,窗口中的数据并不会被清除,将会被保留。
    PURGE:表示将窗口中的数据和窗口清除。
    FIRE_AND_PURGE:表示先将数据进行计算,输出结果,然后将窗口中的数据和窗口进行清除。
    

源码

/** No action is taken on the window. */
CONTINUE(false, false),
/** {@code FIRE_AND_PURGE} evaluates the window function and emits the window result. */
FIRE_AND_PURGE(true, true),
/*** On {@code FIRE}, the window is evaluated and results are emitted. The window is not purged,* though, all elements are retained.*/
FIRE(true, false),
/*** All elements in the window are cleared and the window is discarded, without evaluating the* window function or emitting any elements.*/
PURGE(false, true);

flink提供的触发器 

flink提供触发器

  • EventTimeTrigger通过对比EventTime和窗口的Endtime确定是否触发窗口计算,如果EventTime大于Window EndTime则触发,否则不触发,窗口将继续等待。
  • ProcessingTimeoutTrigger:当内置触发器满足设置的超时时间时,触发窗口的计算。
  • ProcessTimeTrigger:通过对比ProcessTime和窗口EndTme确定是否触发窗口,如果ProcessTime大于EndTime则触发计算,否则窗口继续等待。
  • ContinuousEventTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前EndTime触发窗口计算。
  • ContinuousProcessingTimeTrigger:根据间隔时间周期性触发窗口或者Window的结束时间小于当前ProcessTime触发窗口计算。
  • CountTrigger:根据接入数据量是否超过设定的阙值判断是否触发窗口计算。
  • DeltaTrigger:根据接入数据计算出来的Delta指标是否超过指定的Threshold去判断是否触发窗口计算。
  • PurgingTrigger:可以将任意触发器作为参数转换为Purge类型的触发器,计算完成后数据将被清理。
  • NeverTrigger:任何时候都不触发窗口计算,全局窗口触发器,

原文链接:https://blog.csdn.net/qq_37555071/article/details/122514061

水印触发一般是窗口关闭时间

flink提供的触发器是与窗口对应,当有水印时,如果水印时间大于等于窗口结束时间会触发计算;window.maxTimestamp()获取的是窗口end-1; EventTimeTrigger 的源码可以很明确可以看到注册时注册了触发时间为window.maxTimestamp(),这也是窗口关闭的触发机制。

如果在窗口关闭前触发计算设置多个触发计算时间,这样实现一些特定的需求。例如,每10s输出一次当天的累计数据;

public class EventTimeTrigger extends Trigger<Object, TimeWindow> {private static final long serialVersionUID = 1L;private EventTimeTrigger() {}@Overridepublic TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx)throws Exception {if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {// if the watermark is already past the window fire immediatelyreturn TriggerResult.FIRE;} else {ctx.registerEventTimeTimer(window.maxTimestamp());return TriggerResult.CONTINUE;}}@Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {// 限定触发条件为窗口关闭时间,否则就继续窗口 return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)throws Exception {return TriggerResult.CONTINUE;}
.....
 

自定义触发器

继承Triger,重写抽象方法,案例

.window(TumblingEventTimeWindows.of(Time.hours(24))).trigger(new MyTrigger()).process(new WindowResult()).print();窗口长24小时,每十秒触发一次计算
===================public static class MyTrigger extends Trigger<Event, TimeWindow> {@Overridepublic TriggerResult onElement(Event event, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {//定义状态,记录该状态 触发器第一个元素进来时注册全部的触发器ValueState<Boolean> isFirstEvent = triggerContext.getPartitionedState(new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN));//第一次注册,右面全部跳过if (isFirstEvent.value() == null) {for (long i = timeWindow.getStart(); i < timeWindow.getEnd(); i = i + 10000L) {//注册触发器  间隔10striggerContext.registerEventTimeTimer(i);}isFirstEvent.update(true);}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onEventTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {//使用的事件时间,因此触发窗口的计算return TriggerResult.FIRE;}@Overridepublic TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {return TriggerResult.CONTINUE;}@Overridepublic void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {ValueState<Boolean> isFirstEvent = triggerContext.getPartitionedState(new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN));isFirstEvent.clear();}}

移除器Evictor

作用:主要用来定义移除某些数据的逻辑。基于windowedStream调用evictor()方法,就可以传入一个自定义得移除器(Evictor)。不同窗口类型都有各自预测实现的移除器。

stream.keyby().window().evictor(new MyEvictor)

evictBefore():定义窗口执行函数之前移除的数据操作,移除后的数据不参与窗口计算;

evictAfter():定义执行窗口函数后移除数据的操作;

默认情况下预实现的移出弃都是在执行窗口函数之前移除数据

flink 提供的移除器

CountEvictor: 仅记录用户指定数量的元素,一旦窗口中的元素超过这个数量,多余的元素会从窗口缓存的开头移除; CountEvictor在countwindow中有明确定义引用。
DeltaEvictor: 接收 DeltaFunction 和 threshold 参数,计算最后一个元素与窗口缓存中所有元素的差值, 并移除差值大于或等于 threshold 的元素。(暂时不清楚作用)
TimeEvictor:  接受窗口inteval时间,它会找到窗口中元素的最大 timestamp max_ts 并移除比 max_ts - inteval小的所有元素。TimeEvictor.of() 方法来构建; inteval 不是窗口时间,如果为0,窗口没有数据输出

//TimeEvictor  部分源码 @Overridepublic void evictBefore(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {if (!doEvictAfter) {evict(elements, size, ctx);}}@Overridepublic void evictAfter(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {if (doEvictAfter) {evict(elements, size, ctx);}}private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {if (!hasTimestamp(elements)) {return;}long currentTime = getMaxTimestamp(elements);long evictCutoff = currentTime - windowSize;//移除时间窗口时间之前的数据,注意:获取的并不是窗口end时间for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();iterator.hasNext(); ) {TimestampedValue<Object> record = iterator.next();if (record.getTimestamp() <= evictCutoff) {iterator.remove();}}}
// 获取当前元素中最大的时间private long getMaxTimestamp(Iterable<TimestampedValue<Object>> elements) {long currentTime = Long.MIN_VALUE;for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();iterator.hasNext(); ) {TimestampedValue<Object> record = iterator.next();currentTime = Math.max(currentTime, record.getTimestamp());}return currentTime;}// 保留多长时间的数据public static <W extends Window> TimeEvictor<W> of(Time windowSize) {return new TimeEvictor<>(windowSize.toMilliseconds());}/*** Creates a {@code TimeEvictor} that keeps the given number of elements. Eviction is done* before/after the window function based on the value of doEvictAfter.** @param windowSize The amount of time for which to keep elements.* @param doEvictAfter Whether eviction is done after window function.*/public static <W extends Window> TimeEvictor<W> of(Time windowSize, boolean doEvictAfter) {return new TimeEvictor<>(windowSize.toMilliseconds(), doEvictAfter);}
例如
stream.keyBy(r -> r.user)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.evictor(TimeEvictor.of(Time.seconds(3)))  // 只输出窗口关闭前3s的数据
.process( new WindowResult())
.print();

注意:如果在evict中使用了iterable.iterator(),后面再次使用时不能直接使用

 .keyBy(r -> r.user).window(TumblingEventTimeWindows.of(Time.seconds(10)));window.evictor(new Evictor<Event, TimeWindow>() {@Overridepublic void evictBefore(Iterable<TimestampedValue<Event>> elements, int size, TimeWindow window, EvictorContext evictorContext) {Iterator<TimestampedValue<Event>> iterator = elements.iterator();while (iterator.hasNext()){TimestampedValue<Event> next = iterator.next();if(next.getValue().url.equals("./prod?id=1")){iterator.remove();}}}@Overridepublic void evictAfter(Iterable<TimestampedValue<Event>> elements, int size, TimeWindow window, EvictorContext evictorContext) {return;}}).process(new ProcessWindowFunction<Event, UrlViewCount, String, TimeWindow>() {@Overridepublic void process(String s, ProcessWindowFunction<Event, UrlViewCount, String, TimeWindow>.Context context, Iterable<Event> elements, Collector<UrlViewCount> out) throws Exception {AtomicInteger i= new AtomicInteger();elements.forEach(v-> i.getAndIncrement());out.collect(new UrlViewCount(s+"====",// 获取迭代器中的元素个数  不能再使用iterable.spliterator().getExactSizeIfKnown(),否侧获取数据一一直为-1i.longValue(),context.window().getStart(),context.window().getEnd()));} }).print();

相关文章:

flink-触发器Trigger和移除器Evictor

窗口原理与机制 图片链接&#xff1a;https://blog.csdn.net/qq_35590459/article/details/132177154 数据流进入算子前&#xff0c;被提交给WindowAssigner&#xff0c;决定元素被放到哪个或哪些窗口&#xff0c;同时可能会创建新窗口或者合并旧的窗口。每一个窗口都拥有一个…...

【力扣 28】找出字符串中第一个匹配项的下标 C++题解(字符串匹配)

给你两个字符串 haystack 和 needle &#xff0c;请你在 haystack 字符串中找出 needle 字符串的第一个匹配项的下标&#xff08;下标从 0 开始&#xff09;。如果 needle 不是 haystack 的一部分&#xff0c;则返回 -1 。 示例 1&#xff1a; 输入&#xff1a;haystack “s…...

软件构造 | Design Patterns for Reuse and Maintainability

Design Patterns for Reuse and Maintainability &#xff08;面向可复用性和可维护性的设计模式&#xff09; Open-Closed Principle (OCP) ——对扩展的开放&#xff0c;对修改已有代码的封 Why reusable design patterns A design… …enables flexibility to change …...

Python数据分析-股票分析和可视化(深证指数)

一、内容简介 股市指数作为衡量股市整体表现的重要工具&#xff0c;不仅反映了市场的即时状态&#xff0c;也提供了经济健康状况的关键信号。在全球经济体系中&#xff0c;股市指数被广泛用于预测经济活动&#xff0c;评估投资环境&#xff0c;以及制定财政和货币政策。在中国…...

Linux如何安装openjdk1.8

文章目录 Centosyum安装jdk和JRE配置全局环境变量验证ubuntu使用APT(适用于Ubuntu 16.04及以上版本)使用PPA(可选,适用于需要特定版本或旧版Ubuntu)Centos yum安装jdk和JRE yum install java-1.8.0-openjdk-devel.x86_64 安装后的目录 配置全局环境变量 vim /etc/pr…...

【LLVM】LTO学习

看这篇文章&#xff0c;文中的代码都是错的&#xff0c;给出的命令行也是错的。 真不如参考文献中也是华为的外国员工写的PPT。 但是&#xff0c;上述的文件中的指令也存在报错&#xff0c;还是官方文档看着舒服。...

事务的特性-原子性(Atomicity)、一致性(Consistency)、隔离性(Asolation)、持久性(Durability)

一、引言 1、数据库管理系统DBMS为保证定义的事务是一个逻辑工作单元&#xff0c;达到引入事务的目的&#xff0c;实现的事务机制要保证事务具有原子性、一致性、隔离性和持久性&#xff0c;事务的这四个特性也统称为事务的ACID特性 2、当事务保持了ACID特性&#xff0c;才能…...

redis哨兵模式(Redis Sentinel)

哨兵模式的背景 当主服务器宕机后&#xff0c;需要手动把一台从服务器切换为主服务器&#xff0c;这就需要人工干预&#xff0c;费事费力&#xff0c;还会造成一段时间内服务不可用。这不是一种推荐的方式。 为了解决单点故障和提高系统的可用性&#xff0c;需要一种自动化的监…...

【牛客】牛客小白月赛97 题解 A - E

文章目录 A - 三角形B - 好数组C - 前缀平方和序列D - 走一个大整数迷宫E - 前缀和前缀最大值 A - 三角形 map存一下每个数出现了多少次&#xff0c;再遍历map #include <bits/stdc.h>using namespace std;#define int long long using i64 long long;typedef pair<…...

Spring Boot中泛型参数的灵活运用:最佳实践与性能优化

泛型是Java中一种强大的特性&#xff0c;它提供了编写通用代码的能力&#xff0c;使得代码更加灵活和可复用。在Spring Boot应用程序中&#xff0c;泛型参数的灵活运用可以带来诸多好处&#xff0c;包括增强代码的可读性、提高系统的健壮性以及优化系统的性能。本文将深入探讨在…...

MySQL建表时的注意事项

以下是我对MySQL建表时的注意事项。其实&#xff0c;建表事项有很多&#xff0c;我的总结如下&#xff1a; 1 存储引擎的选择&#xff0c;一般做开发&#xff0c;都是要支持事务的&#xff0c;所以选择InnoDB 2 对字段类型的选择&#xff1a; ​ 对于日期类型如果要记录时分…...

Advanced RAG 09:『提示词压缩』技术综述

编者按&#xff1a; 如何最大限度地发挥 LLMs 的强大能力&#xff0c;同时还能控制其推理成本&#xff1f;这是当前业界研究的一个热点课题。 针对这一问题&#xff0c;本期精心选取了一篇关于"提示词压缩"(Prompt Compression)技术的综述文章。正如作者所说&#xf…...

(13)DroneCAN 适配器节点(二)

文章目录 前言 2 固件 2.1 基于F103 2.2 基于F303 2.3 基于F431 3 ArduPilot固件DroneCAN设置 3.1 f303-通用设置示例 4 DroneCAN适配器节点 前言 这些节点允许现有的 ArduPilot 支持的外围设备作为 DroneCAN 或 MSP 设备适应 CAN 总线。这也允许扩展自动驾驶仪硬件的…...

摸鱼大数据——Spark基础——Spark环境安装——Spark Local[*]搭建

一、虚拟机配置 查看每一台的虚拟机的IP地址和网关地址 查看路径: cat /etc/sysconfig/network-scripts/ifcfg-ens33 2.修改 VMware的网络地址: 使用VMnet8 3.修改windows的对应VMware的网卡地址 4.通过finalshell 或者其他的shell连接工具即可连接使用即可, 连接后, 测试一…...

函数内部结构分层浅析(从MVC分层架构联想)

函数内部结构分层浅析&#xff08;从MVC分层架构联想&#xff09; 分层架构:一种将软件代码按不同功能进行划分的架构模式。 优点包括&#xff1a; 可维护性&#xff1a;各层职责明确&#xff0c;易于单独修改维护。 可扩展性&#xff1a;方便添加或修改某一层&#xff0c;不…...

【three.js案例二】时空隧道

import * as THREE from ./build/three.module.js // 引入轨道控制器扩展库OrbitControls.js import { OrbitControls } from three/addons/controls/OrbitControls.js; // 引入dat.gui.js的一个类GUI import { GUI } from three/addons/libs/lil-gui.module.min.js;// 场景 co…...

动手学深度学习(Pytorch版)代码实践 -计算机视觉-48全连接卷积神经网络(FCN)

48全连接卷积神经网络&#xff08;FCN&#xff09; 1.构造函数 import torch import torchvision from torch import nn from torch.nn import functional as F import matplotlib.pyplot as plt import liliPytorch as lp from d2l import torch as d2l# 构造模型 pretrained…...

【Python游戏】猫和老鼠

本文收录于 《一起学Python趣味编程》专栏,从零基础开始,分享一些Python编程知识,欢迎关注,谢谢! 文章目录 一、前言二、代码示例三、知识点梳理四、总结一、前言 本文介绍如何使用Python的海龟画图工具turtle,开发猫和老鼠游戏。 什么是Python? Python是由荷兰人吉多范…...

【无标题】c# WEBAPI 读写表到Redis

//c# WEBAPI 读写表到Redis using System; using System.Collections.Generic; using System.Linq; using System.Net; using System.Net.Http; using System.Web.Http; using Newtonsoft.Json; using StackExchange.Redis; using System.Data; using System.Web; namespace …...

【剑指Offer系列】53-0到n中缺失的数字(index)

给定一个包含 [0, n] 中 n 个数的数组 nums &#xff0c;找出 [0, n] 这个范围内没有出现在数组中的那个数。 示例 1&#xff1a; 输入&#xff1a;nums [3,0,1] 输出&#xff1a;2 解释&#xff1a;n 3&#xff0c;因为有 3 个数字&#xff0c;所以所有的数字都在范围 [0,3]…...

Python爬虫实战:研究MechanicalSoup库相关技术

一、MechanicalSoup 库概述 1.1 库简介 MechanicalSoup 是一个 Python 库,专为自动化交互网站而设计。它结合了 requests 的 HTTP 请求能力和 BeautifulSoup 的 HTML 解析能力,提供了直观的 API,让我们可以像人类用户一样浏览网页、填写表单和提交请求。 1.2 主要功能特点…...

Oracle查询表空间大小

1 查询数据库中所有的表空间以及表空间所占空间的大小 SELECTtablespace_name,sum( bytes ) / 1024 / 1024 FROMdba_data_files GROUP BYtablespace_name; 2 Oracle查询表空间大小及每个表所占空间的大小 SELECTtablespace_name,file_id,file_name,round( bytes / ( 1024 …...

376. Wiggle Subsequence

376. Wiggle Subsequence 代码 class Solution { public:int wiggleMaxLength(vector<int>& nums) {int n nums.size();int res 1;int prediff 0;int curdiff 0;for(int i 0;i < n-1;i){curdiff nums[i1] - nums[i];if( (prediff > 0 && curdif…...

基于IDIG-GAN的小样本电机轴承故障诊断

目录 🔍 核心问题 一、IDIG-GAN模型原理 1. 整体架构 2. 核心创新点 (1) ​梯度归一化(Gradient Normalization)​​ (2) ​判别器梯度间隙正则化(Discriminator Gradient Gap Regularization)​​ (3) ​自注意力机制(Self-Attention)​​ 3. 完整损失函数 二…...

Chromium 136 编译指南 Windows篇:depot_tools 配置与源码获取(二)

引言 工欲善其事&#xff0c;必先利其器。在完成了 Visual Studio 2022 和 Windows SDK 的安装后&#xff0c;我们即将接触到 Chromium 开发生态中最核心的工具——depot_tools。这个由 Google 精心打造的工具集&#xff0c;就像是连接开发者与 Chromium 庞大代码库的智能桥梁…...

Leetcode33( 搜索旋转排序数组)

题目表述 整数数组 nums 按升序排列&#xff0c;数组中的值 互不相同 。 在传递给函数之前&#xff0c;nums 在预先未知的某个下标 k&#xff08;0 < k < nums.length&#xff09;上进行了 旋转&#xff0c;使数组变为 [nums[k], nums[k1], …, nums[n-1], nums[0], nu…...

从物理机到云原生:全面解析计算虚拟化技术的演进与应用

前言&#xff1a;我的虚拟化技术探索之旅 我最早接触"虚拟机"的概念是从Java开始的——JVM&#xff08;Java Virtual Machine&#xff09;让"一次编写&#xff0c;到处运行"成为可能。这个软件层面的虚拟化让我着迷&#xff0c;但直到后来接触VMware和Doc…...

在Zenodo下载文件 用到googlecolab googledrive

方法&#xff1a;Figshare/Zenodo上的数据/文件下载不下来&#xff1f;尝试利用Google Colab &#xff1a;https://zhuanlan.zhihu.com/p/1898503078782674027 参考&#xff1a; 通过Colab&谷歌云下载Figshare数据&#xff0c;超级实用&#xff01;&#xff01;&#xff0…...

JUC并发编程(二)Monitor/自旋/轻量级/锁膨胀/wait/notify/锁消除

目录 一 基础 1 概念 2 卖票问题 3 转账问题 二 锁机制与优化策略 0 Monitor 1 轻量级锁 2 锁膨胀 3 自旋 4 偏向锁 5 锁消除 6 wait /notify 7 sleep与wait的对比 8 join原理 一 基础 1 概念 临界区 一段代码块内如果存在对共享资源的多线程读写操作&#xf…...

边缘计算设备全解析:边缘盒子在各大行业的落地应用场景

随着工业物联网、AI、5G的发展&#xff0c;数据量呈爆炸式增长。但你有没有想过&#xff0c;我们生成的数据&#xff0c;真的都要发回云端处理吗&#xff1f;其实不一定。特别是在一些对响应时间、网络带宽、数据隐私要求高的行业里&#xff0c;边缘计算开始“火”了起来&#…...