当前位置: 首页 > news >正文

Flink移除器Evictor

前言

在 Flink 窗口计算模型中,数据被 WindowAssigner 划分到对应的窗口后,再经过触发器 Trigger 判断窗口是否要 fire 计算,如果窗口要计算,会把数据丢给移除器 Evictor,Evictor 可以先移除部分元素再交给 ProcessFunction 处理,也可以等 ProcessFunction 处理完成后再移除数据。

认识Evictor

Flink中所有的移除器都是org.apache.flink.streaming.api.windowing.evictors.Evictor的子类

public interface Evictor<T, W extends Window> extends Serializable {void evictBefore(Iterable<TimestampedValue<T>> var1, int var2, W var3, EvictorContext var4);void evictAfter(Iterable<TimestampedValue<T>> var1, int var2, W var3, EvictorContext var4);public interface EvictorContext {long getCurrentProcessingTime();MetricGroup getMetricGroup();long getCurrentWatermark();}
}

Evictor定义了两个方法:

  • evictBefore ProcessFunction处理前调用,用于移除无须计算的元素
  • evictAfter ProcessFunction处理后调用

内置的Evictor

Flink内置了三个Evictor,当这些Evictor不满足业务场景时,也可以自定义Evictor。

1、TimeEvictor

给定一个时间窗口大小,仅保留该时间窗口范围内的元素,对于超过了窗口时间范围的元素,会一律移除。

public class TimeEvictor<W extends Window> implements Evictor<Object, W> {private static final long serialVersionUID = 1L;private final long windowSize;private final boolean doEvictAfter;public TimeEvictor(long windowSize) {this.windowSize = windowSize;this.doEvictAfter = false;}public TimeEvictor(long windowSize, boolean doEvictAfter) {this.windowSize = windowSize;this.doEvictAfter = doEvictAfter;}public void evictBefore(Iterable<TimestampedValue<Object>> elements, int size, W window, Evictor.EvictorContext ctx) {if (!this.doEvictAfter) {this.evict(elements, size, ctx);}}public void evictAfter(Iterable<TimestampedValue<Object>> elements, int size, W window, Evictor.EvictorContext ctx) {if (this.doEvictAfter) {this.evict(elements, size, ctx);}}private void evict(Iterable<TimestampedValue<Object>> elements, int size, Evictor.EvictorContext ctx) {if (this.hasTimestamp(elements)) {long currentTime = this.getMaxTimestamp(elements);long evictCutoff = currentTime - this.windowSize;Iterator<TimestampedValue<Object>> iterator = elements.iterator();while(iterator.hasNext()) {TimestampedValue<Object> record = (TimestampedValue)iterator.next();if (record.getTimestamp() <= evictCutoff) {iterator.remove();}}}}
}

2、DeltaEvictor

给定一个 double 阈值和一个差值计算函数 DeltaFunction,依次计算窗口内元素和最后一个元素的差值 delta,所有 delta 超过阈值的元素都会被移除。

public class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {private static final long serialVersionUID = 1L;DeltaFunction<T> deltaFunction;private double threshold;private final boolean doEvictAfter;private DeltaEvictor(double threshold, DeltaFunction<T> deltaFunction) {this.deltaFunction = deltaFunction;this.threshold = threshold;this.doEvictAfter = false;}private DeltaEvictor(double threshold, DeltaFunction<T> deltaFunction, boolean doEvictAfter) {this.deltaFunction = deltaFunction;this.threshold = threshold;this.doEvictAfter = doEvictAfter;}public void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, Evictor.EvictorContext ctx) {if (!this.doEvictAfter) {this.evict(elements, size, ctx);}}public void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, Evictor.EvictorContext ctx) {if (this.doEvictAfter) {this.evict(elements, size, ctx);}}private void evict(Iterable<TimestampedValue<T>> elements, int size, Evictor.EvictorContext ctx) {TimestampedValue<T> lastElement = (TimestampedValue)Iterables.getLast(elements);Iterator<TimestampedValue<T>> iterator = elements.iterator();while(iterator.hasNext()) {TimestampedValue<T> element = (TimestampedValue)iterator.next();if (this.deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) {iterator.remove();}}}
}

3、CountEvictor

给定一个 maxCount,依次遍历窗口内的元素,数量超过 maxCount 后的所有元素全部移除。

public class CountEvictor<W extends Window> implements Evictor<Object, W> {private static final long serialVersionUID = 1L;private final long maxCount;private final boolean doEvictAfter;private CountEvictor(long count, boolean doEvictAfter) {this.maxCount = count;this.doEvictAfter = doEvictAfter;}private CountEvictor(long count) {this.maxCount = count;this.doEvictAfter = false;}public void evictBefore(Iterable<TimestampedValue<Object>> elements, int size, W window, Evictor.EvictorContext ctx) {if (!this.doEvictAfter) {this.evict(elements, size, ctx);}}public void evictAfter(Iterable<TimestampedValue<Object>> elements, int size, W window, Evictor.EvictorContext ctx) {if (this.doEvictAfter) {this.evict(elements, size, ctx);}}private void evict(Iterable<TimestampedValue<Object>> elements, int size, Evictor.EvictorContext ctx) {if ((long)size > this.maxCount) {int evictedCount = 0;Iterator<TimestampedValue<Object>> iterator = elements.iterator();while(iterator.hasNext()) {iterator.next();++evictedCount;if ((long)evictedCount > (long)size - this.maxCount) {break;}iterator.remove();}}}
}

自定义Evictor

实现org.apache.flink.streaming.api.windowing.evictors.Evictor接口即可自定义 Evictor,泛型要注意,第一个是元素类型,第二个是窗口类型。

举个例子,我们定义一个 Evictor,它在 ProcessFunction 计算前把窗口内所有的奇数全部移除掉,只保留偶数。

public static class MyEvictor implements Evictor<Integer, GlobalWindow> {@Overridepublic void evictBefore(Iterable<TimestampedValue<Integer>> iterable, int i, GlobalWindow globalWindow, EvictorContext evictorContext) {Iterator<TimestampedValue<Integer>> iterator = iterable.iterator();while (iterator.hasNext()) {TimestampedValue<Integer> value = iterator.next();if (value.getValue() % 2 != 0) {iterator.remove();}}}@Overridepublic void evictAfter(Iterable<TimestampedValue<Integer>> iterable, int i, GlobalWindow globalWindow, EvictorContext evictorContext) {}
}

编写一个简单的 Flink 作业验证一下我们自定义的 Evictor,数据源手动指定为数字1到6,统一分配到 GlobalWindow 窗口,Trigger 元素等于6个就出发计算,最终输出窗口内的元素

public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();environment.fromElements(1, 2, 3, 4, 5, 6).windowAll(GlobalWindows.create()).trigger(CountTrigger.of(6)).evictor(new MyEvictor()).process(new ProcessAllWindowFunction<Integer, Object, GlobalWindow>() {@Overridepublic void process(ProcessAllWindowFunction<Integer, Object, GlobalWindow>.Context context, Iterable<Integer> iterable, Collector<Object> collector) throws Exception {String elements = StringUtils.joinWith(",", iterable);System.err.println(elements);}});environment.execute();
}

运行Flink作业,控制台输出[2, 4, 6],奇数在计算前就被移除掉了。

尾巴

Flink 的 Evictor 主要用于在窗口计算过程中,对窗口中的元素进行筛选和剔除。通过定义特定的 Evictor 策略,可以有效地控制窗口内数据的留存和输出。 Evictor 有助于提高数据处理的准确性和效率。它能够根据业务需求,如时间、数据特征等,去除不符合条件的数据,从而使窗口的计算结果更具针对性和可靠性。

相关文章:

Flink移除器Evictor

前言 在 Flink 窗口计算模型中&#xff0c;数据被 WindowAssigner 划分到对应的窗口后&#xff0c;再经过触发器 Trigger 判断窗口是否要 fire 计算&#xff0c;如果窗口要计算&#xff0c;会把数据丢给移除器 Evictor&#xff0c;Evictor 可以先移除部分元素再交给 ProcessFu…...

R语言实现多元线性回归高杠杠点,离群点分析

14a set.seed(1) x1 = runif(100) x2 = 0.5 * x1 + rnorm(100)/...

overfrp内网穿透:使用域名将内网http/https服务暴露到公网

项目地址&#xff1a;https://github.com/sometiny/overfrp 使用overfrp部署穿透服务器&#xff0c;绑定域名后&#xff0c;可使用域名访问内网的http/https服务。 用例中穿透服务器和内网机器之间的访问全链路加密&#xff0c;具有ssh2相当的安全级别。&#xff01;&#xf…...

springboot034在线商城系统设计与开发-代码(论文+源码)_kaic

毕 业 设 计&#xff08;论 文&#xff09; 题目&#xff1a;ONLY在线商城系统设计与实现 摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本ONLY在线商城系统…...

什么是第三范式(3NF)?为什么要遵守第三范式?

第三范式&#xff08;Third Normal Form, 3NF&#xff09;是数据库设计中的一个重要概念&#xff0c;它是对关系型数据库规范化的一种标准。 在数据库设计中&#xff0c;通过将数据表按照一定的规则进行分解&#xff0c;可以减少数据冗余和提高数据的一致性。 3NF 是建立在第…...

大数据比对,shell脚本与hive技术结合

需求描述 从主机中获取加密数据内容&#xff0c;解密数据内容&#xff08;可能会存在json解析&#xff09;插入到另一个库中&#xff0c;比对原始库和新库的相同表数据的数据一致性内容。 数据一致性比对实现 上亿条数据&#xff0c;如何比对并发现两个表数据差异 相关流程…...

【Linux安全基线】- CentOS 7/8安全配置指南

在企业业务的生产环境中&#xff0c;Linux服务器的安全性至关重要&#xff0c;尤其是对于具有超级用户权限的root账号。滥用或被入侵后&#xff0c;可能会造成数据泄露、系统损坏等严重安全问题。为了减少这种风险&#xff0c;本文将详细介绍如何通过一系列安全措施来增强CentO…...

PDF.js的使用及其跨域问题解决

目录 一、PDF.js 简介 二、使用配置和步骤 1.引入PDF.js 2.加载PDF文件 3.渲染PDF页面 三、在Vue中使用PDF.js示例 1.安装PDF.js 2.在Vue组件中使用 四、在原生js中使用PDF.js示例 1.加载PDF文件并渲染页面 五、解决跨域问题 1.服务器配置 2.使用代理服务器 下面介…...

Linux Redis查询key与移除日常操作

维护老项目Express node 编写的后端程序、有这么一个方法、没有设置redis过期时间&#xff08;建议设置过期时间&#xff0c;毕竟登录生产服务器并不是每个人都有权限登录的&#xff01;&#xff01;&#xff01;&#xff09;。如果变动只能通过登录生产服务器、手动修改… 于…...

开源两个月,antflow后端项目全网获近100星

从六月初开源,转眼间AntFlow已经开源将近四个月了(前端比后端早了大约2个月,后端于8.18开源).(其实准备是重构以前开源版本.前年的时候我们已经将Vue2版的流程设计器开源了.后来由于疫情原因,没有再继续持续开发.)后来有一天再打开仓库的时候,发现虽然很久没有更新了,但是不断有…...

设计模式——工厂方法模式(2)抽象工厂模式(3)

一、写在前面 创建型模式 单例模式工厂方法模式抽象工厂模式原型模式建造者模式 结构型模式行为型模式工厂方法模式和抽象工厂模式都属于工厂模式&#xff0c;所以放在一起介绍了 二、介绍 为什么要工厂模式&#xff1f;工厂就像一个黑盒一样&#xff0c;所以用工厂模式来创…...

简单聊聊System V下的IPC + 内核是如何管理该IPC

文章目录 前言&#xff1a;&#x1f383;消息队列&#xff1a;1. **消息队列的基本概念**2. **消息队列的特点**3. **常见的消息队列操作&#xff08;Linux IPC&#xff09;****1) msgget&#xff1a;创建或获取消息队列****2) msgsnd&#xff1a;发送消息****3) msgrcv&#x…...

【WRF工具】服务器上安装convert_geotiff

【WRF工具】服务器上安装convert_geotiff convert_geotiff简介方法1&#xff1a;下载安装包后下载convert_geotiff依赖库安装库1&#xff1a;libtiff库2&#xff1a;sqlite库3&#xff1a;curl库4&#xff1a;projcmake更新&#xff08;可选&#xff09;库5&#xff1a;geotiff…...

RPC通讯基础原理

1.RPC&#xff08;Remote Procedure Call&#xff09;概述 RPC是一种通过网络从远程计算机上调用程序的技术&#xff0c;使得构建分布式计算更加容易&#xff0c;在提供强大的远程调用能力时不损失本地调用的语义简洁性&#xff0c;提供一种透明调用机制&#xff0c;让使用者不…...

JavaScript 第18章:安全性

在JavaScript开发中&#xff0c;确保应用的安全性是非常重要的。下面我将根据你提到的几个方面来讲解如何增强Web应用程序的安全性。 XSS&#xff08;跨站脚本&#xff09;攻击防御 示例代码&#xff1a; function escapeHTML(unsafe) {return unsafe.replace(/&/g, &qu…...

基于workbox实现PWA预缓存能力

引言 Service Worker 是一项流行的技术&#xff0c;尽管在许多项目中尚未得到充分利用。基于本次项目首页加载优化的机会&#xff0c;决定尝试使用 Google 出品的 Workbox&#xff0c;以观察其优化效果。 开始 安装 项目使用 Webpack 打包&#xff0c;而 Workbox 提供了 We…...

探索Web3生态系统:社区、协议与参与者的角色

Web3代表着互联网的下一个演变阶段&#xff0c;旨在通过去中心化技术赋予用户更大的控制权和参与感。在这个新兴生态系统中&#xff0c;社区、协议和参与者扮演着不可或缺的角色&#xff0c;共同推动着Web3的建设与发展。 社区的核心作用 在Web3中&#xff0c;社区通过提供反馈…...

无人机电机故障率骤降:创新设计与六西格玛方法论双赢

项目背景 TBR-100是消费级无人机头部企业推出的主打消费级无人机&#xff0c;凭借其出色的续航能力和卓越的操控性&#xff0c;在市场上获得了广泛认可。在产品运行过程&#xff0c;用户反馈电机故障率偏高&#xff0c;尤其是在飞行一段时间后出现电机过热、损坏以及运行不稳定…...

samba禁用时拷贝服务器文件到本地的脚本

Android系统开发一般在ubuntu服务器上&#xff0c;我们办公电脑一般是windows。在将编译出来的模块push到板子上时&#xff0c;一般采用adb push 方式。 有时由于种种原因会出现服务器禁用了samba&#xff0c;导致无法直接用adb push 的情况。 下面介绍用winscp 走ssh 拷贝服…...

C#代码 串口通信晋中A2板,控制直流电机

1&#xff0c;在电脑中给晋中板中下载编译好的程序。 0x39 &#xff1a;开启电机的标识 代码&#xff1a; /********************************************************************************** **** 实验名称&#xff1a;串口通信实验 接线说明&#xff1a; 实验现象&…...

uniapp 对接腾讯云IM群组成员管理(增删改查)

UniApp 实战&#xff1a;腾讯云IM群组成员管理&#xff08;增删改查&#xff09; 一、前言 在社交类App开发中&#xff0c;群组成员管理是核心功能之一。本文将基于UniApp框架&#xff0c;结合腾讯云IM SDK&#xff0c;详细讲解如何实现群组成员的增删改查全流程。 权限校验…...

Python|GIF 解析与构建(5):手搓截屏和帧率控制

目录 Python&#xff5c;GIF 解析与构建&#xff08;5&#xff09;&#xff1a;手搓截屏和帧率控制 一、引言 二、技术实现&#xff1a;手搓截屏模块 2.1 核心原理 2.2 代码解析&#xff1a;ScreenshotData类 2.2.1 截图函数&#xff1a;capture_screen 三、技术实现&…...

【kafka】Golang实现分布式Masscan任务调度系统

要求&#xff1a; 输出两个程序&#xff0c;一个命令行程序&#xff08;命令行参数用flag&#xff09;和一个服务端程序。 命令行程序支持通过命令行参数配置下发IP或IP段、端口、扫描带宽&#xff0c;然后将消息推送到kafka里面。 服务端程序&#xff1a; 从kafka消费者接收…...

椭圆曲线密码学(ECC)

一、ECC算法概述 椭圆曲线密码学&#xff08;Elliptic Curve Cryptography&#xff09;是基于椭圆曲线数学理论的公钥密码系统&#xff0c;由Neal Koblitz和Victor Miller在1985年独立提出。相比RSA&#xff0c;ECC在相同安全强度下密钥更短&#xff08;256位ECC ≈ 3072位RSA…...

以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:

一、属性动画概述NETX 作用&#xff1a;实现组件通用属性的渐变过渡效果&#xff0c;提升用户体验。支持属性&#xff1a;width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项&#xff1a; 布局类属性&#xff08;如宽高&#xff09;变化时&#…...

基于uniapp+WebSocket实现聊天对话、消息监听、消息推送、聊天室等功能,多端兼容

基于 ​UniApp + WebSocket​实现多端兼容的实时通讯系统,涵盖WebSocket连接建立、消息收发机制、多端兼容性配置、消息实时监听等功能,适配​微信小程序、H5、Android、iOS等终端 目录 技术选型分析WebSocket协议优势UniApp跨平台特性WebSocket 基础实现连接管理消息收发连接…...

生成 Git SSH 证书

&#x1f511; 1. ​​生成 SSH 密钥对​​ 在终端&#xff08;Windows 使用 Git Bash&#xff0c;Mac/Linux 使用 Terminal&#xff09;执行命令&#xff1a; ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" ​​参数说明​​&#xff1a; -t rsa&#x…...

使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台

🎯 使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台 📌 项目背景 随着大语言模型(LLM)的广泛应用,开发者常面临多个挑战: 各大模型(OpenAI、Claude、Gemini、Ollama)接口风格不统一;缺乏一个统一平台进行模型调用与测试;本地模型 Ollama 的集成与前…...

Linux --进程控制

本文从以下五个方面来初步认识进程控制&#xff1a; 目录 进程创建 进程终止 进程等待 进程替换 模拟实现一个微型shell 进程创建 在Linux系统中我们可以在一个进程使用系统调用fork()来创建子进程&#xff0c;创建出来的进程就是子进程&#xff0c;原来的进程为父进程。…...

C# 求圆面积的程序(Program to find area of a circle)

给定半径r&#xff0c;求圆的面积。圆的面积应精确到小数点后5位。 例子&#xff1a; 输入&#xff1a;r 5 输出&#xff1a;78.53982 解释&#xff1a;由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982&#xff0c;因为我们只保留小数点后 5 位数字。 输…...