Flink定时器
flink的定时器都是基于事件时间(event time)或事件处理时间(processing time)的变化来触发响应的。对一部分新手玩家来说,可能不清楚事件时间和事件处理时间的区别。我这里先说一下我的理解,防止下面懵逼。简单来说事件时间就相当于人出生的时间,一般数据生成的时候也会有创建时间。而事件处理时间则相当于人具体做某件事的时间,一条数据可能是2023年生成的,但是到2024年才被处理,这个2024年便被称为这个事件的处理时间。
一、事件时间定时器(event time),这是基于事件时间来触发的,这里面有一个小坑。当第一个事件到的时候创建一个定时器10秒后触发。对我们大部分人来说我既然已经创建了这个定时器,那么10秒后,他就会自动触发。但事实上他10秒后如果没有事件到来他并不会触发。大概意思就是前一个事件创建的定时器需要后一个事件的时间来触发。下面是事件时间定时器的一种实现方式。
import com.xcj.flink.bean.Temperature;
import com.xcj.util.DateFormat;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;public class EventTime {public static void main(String[] args) throws Exception {SourceTemperature mySourceTemperature = new SourceTemperature();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Temperature> tDSSource = env.addSource(mySourceTemperature);WatermarkStrategy<Temperature> twsDS= WatermarkStrategy.<Temperature>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner((temperature, recordTimestamp) -> temperature.getTimestamp());SingleOutputStreamOperator<Temperature> tSSODS = tDSSource.assignTimestampsAndWatermarks(twsDS);KeyedStream<Temperature, String> keyByDS = tSSODS.keyBy(temperature -> temperature.getDay());SingleOutputStreamOperator<Temperature> process = keyByDS.process(new ProcessFunction<Temperature, Temperature>() {ListState<Temperature> temperatureListState;ValueState<Temperature> temperatureState;ValueState<Integer> size;ValueState<Long> temperature;@Overridepublic void open(OpenContext openContext) throws Exception {ListStateDescriptor<Temperature> listStateDescriptor = new ListStateDescriptor<>("listState", Temperature.class);temperatureListState = getRuntimeContext().getListState(listStateDescriptor);temperatureState = getRuntimeContext().getState(new ValueStateDescriptor<>("temperatureState", Temperature.class));size = getRuntimeContext().getState(new ValueStateDescriptor<>("sizeState", Integer.class));temperature = getRuntimeContext().getState(new ValueStateDescriptor<>("temperature", Long.class));}@Overridepublic void processElement(Temperature value, ProcessFunction<Temperature, Temperature>.Context ctx, Collector<Temperature> out) throws Exception {Temperature value1 = temperatureState.value();//System.out.println(ctx.timestamp());if(value1 == null){temperatureState.update(value);temperatureListState.add(value);size.update(1);//System.out.printf("当前事件处理:"+DateFormat.getDateTime(ctx.timestamp()));//System.out.println("当前水位线:"+DateFormat.getDateTime(ctx.timerService().currentWatermark()));temperature.update(value.getTimestamp());ctx.timerService().registerEventTimeTimer(temperature.value()+1000*10);}else{if(value1.getTemperature() < value.getTemperature()){temperatureState.update(value);temperatureListState.add(value);size.update(size.value()+1);//System.out.println(size.value());if(size.value()>= 3){System.out.printf("警告警告:");Iterator<Temperature> iterator = temperatureListState.get().iterator();while(iterator.hasNext()){out.collect(iterator.next());}temperatureListState.clear();temperatureState.clear();size.clear();ctx.timerService().deleteEventTimeTimer(temperature.value()+1000*10);}}else{System.out.println("温度降低了");temperatureState.update(value);temperatureListState.clear();temperatureListState.add(value);size.update(1);ctx.timerService().deleteEventTimeTimer(temperature.value()+1000*10);temperature.update(value.getTimestamp());ctx.timerService().registerEventTimeTimer(temperature.value()+1000*10);}}}@Overridepublic void onTimer(long timestamp, ProcessFunction<Temperature, Temperature>.OnTimerContext ctx, Collector<Temperature> out) throws Exception {System.out.printf("时间到了:清空温度"+DateFormat.getDateTime(ctx.timestamp()));temperatureListState.clear();temperatureState.clear();size.clear();if(temperature.value() != null)ctx.timerService().deleteEventTimeTimer(temperature.value() + 10*1000);}});process.print("当前警告温度为:");env.execute();}
}//自己定义数据源class SourceTemperature extends RichSourceFunction<Temperature> {@Overridepublic void run(SourceContext<Temperature> ctx) throws Exception {Scanner scanner = new Scanner(System.in);while (true) {Temperature temperature = new Temperature();System.out.print("请输入温度: ");//double temp = Math.random()*40;double temp = scanner.nextDouble();//System.out.println(temp);temperature.setTemperature(temp);temperature.setTimestamp(new Date().getTime());ctx.collect(temperature);//Thread.sleep(1000);}}@Overridepublic void cancel() {}
}//自定义实体类
class Temperature1 {public Temperature1(double temperature, long timestamp) {this.temperature = temperature;this.timestamp = timestamp;}public Temperature1(){};//温度private double temperature;//时间private long timestamp;//idprivate String day = "2024-12-24";public double getTemperature() {return temperature;}public void setTemperature(double temperature) {this.temperature = temperature;}public long getTimestamp() {return timestamp;}public void setTimestamp(long timestamp) {this.timestamp = timestamp;}public String getDay() {return day;}public void setDay(String day) {this.day = day;}@Overridepublic String toString() {return "Temperature1{" +"temperature=" + temperature +", timestamp=" + timestamp +", day='" + day + '\'' +'}';}
}
下面我们做一个测试,来验证一下这个解释:前一个事件创建的定时器需要后一个事件的时间来触发。他们的时间间隔超过了10秒钟,但是时间并没有触发,而是下一个事件到的时候才触发的。

二、事件处理时间,事件处理时间触发有系统时间有关
package com.xcj;
import com.xcj.flink.bean.Temperature;
import com.xcj.util.DateFormat;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;public class ProcessTime {public static void main(String[] args) throws Exception {SourceTemperature mySourceTemperature = new SourceTemperature();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Temperature> tDSSource = env.addSource(mySourceTemperature);
// WatermarkStrategy<Temperature> twsDS
// = WatermarkStrategy.<Temperature>forBoundedOutOfOrderness(Duration.ofSeconds(0))
// .withTimestampAssigner((temperature, recordTimestamp) -> temperature.getTimestamp());
//
// SingleOutputStreamOperator<Temperature> tSSODS = tDSSource.assignTimestampsAndWatermarks(twsDS);KeyedStream<Temperature, String> keyByDS = tDSSource.keyBy(temperature -> temperature.getDay());SingleOutputStreamOperator<Temperature> process = keyByDS.process(new ProcessFunction<Temperature, Temperature>() {ListState<Temperature> temperatureListState;ValueState<Temperature> temperatureState;ValueState<Integer> size;ValueState<Long> temperature;@Overridepublic void open(OpenContext openContext) throws Exception {ListStateDescriptor<Temperature> listStateDescriptor = new ListStateDescriptor<>("listState", Temperature.class);temperatureListState = getRuntimeContext().getListState(listStateDescriptor);temperatureState = getRuntimeContext().getState(new ValueStateDescriptor<>("temperatureState", Temperature.class));size = getRuntimeContext().getState(new ValueStateDescriptor<>("sizeState", Integer.class));temperature = getRuntimeContext().getState(new ValueStateDescriptor<>("temperature", Long.class));}@Overridepublic void processElement(Temperature value, ProcessFunction<Temperature, Temperature>.Context ctx, Collector<Temperature> out) throws Exception {Temperature value1 = temperatureState.value();//System.out.println(ctx.timestamp());System.out.printf("当前事件时间:"+DateFormat.getDateTime(value.getTimestamp()));System.out.println("当前水位线:"+DateFormat.getDateTime(ctx.timerService().currentWatermark()));if(value1 == null){temperatureState.update(value);temperatureListState.add(value);size.update(1);temperature.update(ctx.timerService().currentProcessingTime());ctx.timerService().registerProcessingTimeTimer(temperature.value()+1000*10);}else{if(value1.getTemperature() < value.getTemperature()){temperatureState.update(value);temperatureListState.add(value);size.update(size.value()+1);//System.out.println(size.value());if(size.value()>= 3){System.out.printf("警告警告:");Iterator<Temperature> iterator = temperatureListState.get().iterator();while(iterator.hasNext()){out.collect(iterator.next());}temperatureListState.clear();temperatureState.clear();size.clear();ctx.timerService().deleteProcessingTimeTimer(temperature.value()+1000*10);}}else{System.out.println("温度降低了");temperatureState.update(value);temperatureListState.clear();temperatureListState.add(value);size.update(1);ctx.timerService().deleteProcessingTimeTimer(temperature.value()+1000*10);temperature.update(value.getTimestamp());ctx.timerService().registerProcessingTimeTimer(temperature.value()+1000*10);}}}@Overridepublic void onTimer(long timestamp, ProcessFunction<Temperature, Temperature>.OnTimerContext ctx, Collector<Temperature> out) throws Exception {System.out.printf("时间到了:清空温度"+DateFormat.getDateTime(ctx.timestamp()));temperatureListState.clear();temperatureState.clear();size.clear();if(temperature.value() != null)ctx.timerService().deleteProcessingTimeTimer(temperature.value() + 10*1000);}});process.print("当前警告温度为:");env.execute();}
}//自己定义数据源
class SourceTemperature extends RichSourceFunction<Temperature> {@Overridepublic void run(SourceContext<Temperature> ctx) throws Exception {Scanner scanner = new Scanner(System.in);while (true) {Temperature temperature = new Temperature();System.out.print("请输入温度: ");//double temp = Math.random()*40;double temp = scanner.nextDouble();//System.out.println(temp);temperature.setTemperature(temp);temperature.setTimestamp(new Date().getTime());ctx.collect(temperature);//Thread.sleep(1000);}}@Overridepublic void cancel() {}
}//自定义实体类
class Temperature1 {public Temperature1(double temperature, long timestamp) {this.temperature = temperature;this.timestamp = timestamp;}public Temperature1(){};//温度private double temperature;//时间private long timestamp;//idprivate String day = "2024-12-24";public double getTemperature() {return temperature;}public void setTemperature(double temperature) {this.temperature = temperature;}public long getTimestamp() {return timestamp;}public void setTimestamp(long timestamp) {this.timestamp = timestamp;}public String getDay() {return day;}public void setDay(String day) {this.day = day;}@Overridepublic String toString() {return "Temperature1{" +"temperature=" + temperature +", timestamp=" + timestamp +", day='" + day + '\'' +'}';}
}
事件处理时间是不需要下一个事件触发的

三、总结
事件时间(event time) 与事件处理时间(process time)定时器整体代码其实差不多,主要是在注册定时器的时候选择的方法
//事件时间
ctx.timerService().registerEventTimeTimer(value.getTimestamp());
//事件处理事件
ctx.timerService().registerProcessingTimeTimer(temperature.value()+1000*10);
和不同定时器的逻辑。注意:事件时间定时器是需要下一个事件来触发上一个事件的定时任务,但是事件处理时间定时器是不需要下一个事件来触发的,他是根据注册时间和系统时间的差值来触发的。

上面我把注册时间改为了过去很久的时间,来一个就触发一次定时任务,因为注册时间与当前系统时间相差>10秒,所以会直接触发。
相关文章:
Flink定时器
flink的定时器都是基于事件时间(event time)或事件处理时间(processing time)的变化来触发响应的。对一部分新手玩家来说,可能不清楚事件时间和事件处理时间的区别。我这里先说一下我的理解,防止下面懵逼。…...
《算力互联互通标准体系1.0》发布,为算力互联成网发展提供指导框架
2024年政府工作报告提出:“适度超前建设数字基础设施,加快形成全国一体化算力体系,培育算力产业生态”。因此提供普惠化算力服务、培育算力大市场的算力互联网体系是响应国家布局的重要路径。 我国算力产业发展已取得突破性进展,…...
视频监控平台:Liveweb视频汇聚融合平台智慧安防视频监控应用方案
Liveweb是一款功能强大、灵活部署的安防视频监控平台,支持多种主流标准协议,包括GB28181、RTSP/Onvif、RTMP等,同时兼容海康Ehome、海大宇等厂家的私有协议和SDK接入。该平台不仅提供传统安防监控功能,还支持接入AI智能分析&#…...
STM32串口第一次接收数据时第一个字节丢失的问题
解决方法:开启中断之前,先清除标志位【1】。 串口清除标志位: __HAL_UART_CLEAR_PEFLAG(&huart1); HAL_UART_Receive_IT(&huart1,&RxUart, 1); 定时器清除标志位: __HAL_TIM_CLEAR_FLAG(&htim3,TIM_FLAG_UPDATE);…...
Zookeeper基本命令解析
ZooKeeper -server host:port -client-configuration properties-file cmd args addWatch [-m mode] path # optional mode is one of [PERSISTENT, PERSISTENT_RECURSIVE] - default is PERSISTENT_RECURSIVE addauth scheme auth 一、整体命令格式 ZooKeeper -serve…...
RustDesk远程及自建服务器搭建教程
要开始使用RustDesk远程和自建服务器,你需要遵循以下步骤: 下载和安装RustDesk:RustDesk是一款开源的远程支持应用程序。你可以在其官方网站(https://rustdesk.com/)上下载适用于你的操作系统的安装程序。安装过程非常…...
广州大彩串口屏安卓/linux触摸屏四路CVBS输入实现同时显示!
一、适用范围 适合广州大彩A40系列产品 产品型号: 二、概述 CVBS只需要一条线缆即可完成视频信号的传输,具有兼容性强、使用简单、成本低廉等优点。典型分辨率为720x480(NTSC制)或720x576(PAL制)。 三、…...
Python:模拟(包含例题)
模拟题:直接按照题目含义模拟即可,一般不涉及算法 注意: 1.读懂题:理清楚题目流程 2.代码和步骤一一对应:变量名,函数名,函数功能 3.提取重复的部分,写成对应的函数(…...
Python OCR 文字识别
一.引言 文字识别,也称为光学字符识别(Optical Character Recognition, OCR),是一种将不同形式的文档(如扫描的纸质文档、PDF文件或数字相机拍摄的图片)中的文字转换成可编辑和可搜索的数据的技术。随着技…...
阿里巴巴2017实习生笔试题(二)
阿里巴巴2017实习生笔试题(二) 2024/12/25 1.下面哪一个不是动态链接库的优点? B A.共享 B.装载速度快 C.开发模式好 D.减少页面交换 解析 1 静态链接库的优点 (1) 代码装载速度快,执行速度略比动态链接库快;…...
Docker安装与使用
文章目录 0.关键词1.安装docker2.镜像和容器3.Docker基础1.常见命令2.数据卷(volume)要解决的问题:什么是数据卷:解决的办法: 3.数据卷的使用基本命令挂载数据卷(nginx)基于本地目录数据挂载&am…...
通过nginx设置一个图片服务器,并使用 Nginx 作为反向代理
通过nginx设置一个图片服务器,并使用 Nginx 作为反向代理 安装nginx 首先需要去官网下载一个nginx,我这里下载了最新的稳定版本:nginx-1.26.2,下载下来是一个压缩包,解压之后就可以直接用了。 修改nginx的配置文件 …...
MacOS M3源代码编译Qt6.8.1
编译时间过长,如果不想自己编译,可以通过如果网盘进行下载: 链接: https://pan.baidu.com/s/17lvF5jQ-vR6vE-KEchzrVA?pwdts26 提取码: ts26 在macOS上编译Qt 6需要一些前置步骤和工具。以下是编译Qt 6的基本步骤: 安装Xcode和…...
【Pytorch实用教程】PyTorch 自带的数据集全面解读
下面这篇博客文章将带你快速了解 PyTorch 自带(或官方维护)的各类常用数据集,并介绍它们的使用方法,包括图像、文本和音频数据集。希望能帮助你在项目中快速上手并提高效率。 一、为什么要使用 PyTorch 自带的数据集? 1. 方便、快捷 官方维护的数据集通常已经帮助我们做好…...
Flask使用的正例和反例
Flask使用的正例和反例 文章目录 Flask使用的正例和反例一 , 使用注册异常二 , 新增数据成功后要返回新增数据的id三, 模型查询语句抽取成函数四, 业务逻辑函数传递的参数不应该用字典类型,要传不同字段的参数…...
2024年河北省职业院校技能大赛云计算应用赛项赛题第2套(私有云)
#需要资源(软件包及镜像)或有问题的,可私聊博主!!! #需要资源(软件包及镜像)或有问题的,可私聊博主!!! #需要资源(软件包…...
我的软件架构师——Java 职位面试经历。
最近,我参加了一家领先的服务型公司的软件架构师(Java)职位的面试。我在这里分享了一些面试官问我的问题。我只列出了与 Java 相关的问题,因为本文主要关注 Java。面试官问我有关 AWS、Docker、Kubernetes、Kafka、Elastic Search、SQL/NoSQL 和设计模式的问题。 ClassNotF…...
npm error code ETIMEDOUT
参考:https://blog.csdn.net/qq_38572963/article/details/142052986 二、解决办法 1、清空缓存 npm cache clean --force 2、查看当前的npm镜像设置 npm config get registry 3、切换新镜像源 npm config set registry https://registry.npmmirror.com 4、查看新源是否设置成功…...
(11)(3.2) ESC信号问题
文章目录 前言 1 信号电平不足 2 感应噪声 3 ESC过电压尖峰 4 ESC固件错误 前言 本页讨论了 ESC 信号的几个潜在问题,这些问题可能导致不可靠的操作,甚至在起飞时坠毁。这些主要发生在较大的四轮飞机上,这些飞机从动力和自动驾驶仪到 E…...
Postman最新接口自动化持续集成
学习地址:https://www.bilibili.com/video/BV1VDC5Y7EJA?spm_id_from333.788.videopod.episodes&vd_source336a0b0a2ff09832b3a55c3599ffb193&p9 1、旧版:PostmanNewmanAllureJenkins 缺点:需要安装较多软件,脚步需要手…...
大数据学习栈记——Neo4j的安装与使用
本文介绍图数据库Neofj的安装与使用,操作系统:Ubuntu24.04,Neofj版本:2025.04.0。 Apt安装 Neofj可以进行官网安装:Neo4j Deployment Center - Graph Database & Analytics 我这里安装是添加软件源的方法 最新版…...
pam_env.so模块配置解析
在PAM(Pluggable Authentication Modules)配置中, /etc/pam.d/su 文件相关配置含义如下: 配置解析 auth required pam_env.so1. 字段分解 字段值说明模块类型auth认证类模块,负责验证用户身份&am…...
React19源码系列之 事件插件系统
事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...
【Oracle】分区表
个人主页:Guiat 归属专栏:Oracle 文章目录 1. 分区表基础概述1.1 分区表的概念与优势1.2 分区类型概览1.3 分区表的工作原理 2. 范围分区 (RANGE Partitioning)2.1 基础范围分区2.1.1 按日期范围分区2.1.2 按数值范围分区 2.2 间隔分区 (INTERVAL Partit…...
【JavaSE】多线程基础学习笔记
多线程基础 -线程相关概念 程序(Program) 是为完成特定任务、用某种语言编写的一组指令的集合简单的说:就是我们写的代码 进程 进程是指运行中的程序,比如我们使用QQ,就启动了一个进程,操作系统就会为该进程分配内存…...
面试高频问题
文章目录 🚀 消息队列核心技术揭秘:从入门到秒杀面试官1️⃣ Kafka为何能"吞云吐雾"?性能背后的秘密1.1 顺序写入与零拷贝:性能的双引擎1.2 分区并行:数据的"八车道高速公路"1.3 页缓存与批量处理…...
快速排序算法改进:随机快排-荷兰国旗划分详解
随机快速排序-荷兰国旗划分算法详解 一、基础知识回顾1.1 快速排序简介1.2 荷兰国旗问题 二、随机快排 - 荷兰国旗划分原理2.1 随机化枢轴选择2.2 荷兰国旗划分过程2.3 结合随机快排与荷兰国旗划分 三、代码实现3.1 Python实现3.2 Java实现3.3 C实现 四、性能分析4.1 时间复杂度…...
【Java多线程从青铜到王者】单例设计模式(八)
wait和sleep的区别 我们的wait也是提供了一个还有超时时间的版本,sleep也是可以指定时间的,也就是说时间一到就会解除阻塞,继续执行 wait和sleep都能被提前唤醒(虽然时间还没有到也可以提前唤醒),wait能被notify提前唤醒…...
用 FFmpeg 实现 RTMP 推流直播
RTMP(Real-Time Messaging Protocol) 是直播行业中常用的传输协议。 一般来说,直播服务商会给你: ✅ 一个 RTMP 推流地址(你推视频上去) ✅ 一个 HLS 或 FLV 拉流地址(观众观看用)…...
STL 2迭代器
文章目录 1.迭代器2.输入迭代器3.输出迭代器1.插入迭代器 4.前向迭代器5.双向迭代器6.随机访问迭代器7.不同容器返回的迭代器类型1.输入 / 输出迭代器2.前向迭代器3.双向迭代器4.随机访问迭代器5.特殊迭代器适配器6.为什么 unordered_set 只提供前向迭代器? 1.迭代器…...
