Flink定时器
flink的定时器都是基于事件时间(event time)或事件处理时间(processing time)的变化来触发响应的。对一部分新手玩家来说,可能不清楚事件时间和事件处理时间的区别。我这里先说一下我的理解,防止下面懵逼。简单来说事件时间就相当于人出生的时间,一般数据生成的时候也会有创建时间。而事件处理时间则相当于人具体做某件事的时间,一条数据可能是2023年生成的,但是到2024年才被处理,这个2024年便被称为这个事件的处理时间。
一、事件时间定时器(event time),这是基于事件时间来触发的,这里面有一个小坑。当第一个事件到的时候创建一个定时器10秒后触发。对我们大部分人来说我既然已经创建了这个定时器,那么10秒后,他就会自动触发。但事实上他10秒后如果没有事件到来他并不会触发。大概意思就是前一个事件创建的定时器需要后一个事件的时间来触发。下面是事件时间定时器的一种实现方式。
import com.xcj.flink.bean.Temperature;
import com.xcj.util.DateFormat;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;public class EventTime {public static void main(String[] args) throws Exception {SourceTemperature mySourceTemperature = new SourceTemperature();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Temperature> tDSSource = env.addSource(mySourceTemperature);WatermarkStrategy<Temperature> twsDS= WatermarkStrategy.<Temperature>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner((temperature, recordTimestamp) -> temperature.getTimestamp());SingleOutputStreamOperator<Temperature> tSSODS = tDSSource.assignTimestampsAndWatermarks(twsDS);KeyedStream<Temperature, String> keyByDS = tSSODS.keyBy(temperature -> temperature.getDay());SingleOutputStreamOperator<Temperature> process = keyByDS.process(new ProcessFunction<Temperature, Temperature>() {ListState<Temperature> temperatureListState;ValueState<Temperature> temperatureState;ValueState<Integer> size;ValueState<Long> temperature;@Overridepublic void open(OpenContext openContext) throws Exception {ListStateDescriptor<Temperature> listStateDescriptor = new ListStateDescriptor<>("listState", Temperature.class);temperatureListState = getRuntimeContext().getListState(listStateDescriptor);temperatureState = getRuntimeContext().getState(new ValueStateDescriptor<>("temperatureState", Temperature.class));size = getRuntimeContext().getState(new ValueStateDescriptor<>("sizeState", Integer.class));temperature = getRuntimeContext().getState(new ValueStateDescriptor<>("temperature", Long.class));}@Overridepublic void processElement(Temperature value, ProcessFunction<Temperature, Temperature>.Context ctx, Collector<Temperature> out) throws Exception {Temperature value1 = temperatureState.value();//System.out.println(ctx.timestamp());if(value1 == null){temperatureState.update(value);temperatureListState.add(value);size.update(1);//System.out.printf("当前事件处理:"+DateFormat.getDateTime(ctx.timestamp()));//System.out.println("当前水位线:"+DateFormat.getDateTime(ctx.timerService().currentWatermark()));temperature.update(value.getTimestamp());ctx.timerService().registerEventTimeTimer(temperature.value()+1000*10);}else{if(value1.getTemperature() < value.getTemperature()){temperatureState.update(value);temperatureListState.add(value);size.update(size.value()+1);//System.out.println(size.value());if(size.value()>= 3){System.out.printf("警告警告:");Iterator<Temperature> iterator = temperatureListState.get().iterator();while(iterator.hasNext()){out.collect(iterator.next());}temperatureListState.clear();temperatureState.clear();size.clear();ctx.timerService().deleteEventTimeTimer(temperature.value()+1000*10);}}else{System.out.println("温度降低了");temperatureState.update(value);temperatureListState.clear();temperatureListState.add(value);size.update(1);ctx.timerService().deleteEventTimeTimer(temperature.value()+1000*10);temperature.update(value.getTimestamp());ctx.timerService().registerEventTimeTimer(temperature.value()+1000*10);}}}@Overridepublic void onTimer(long timestamp, ProcessFunction<Temperature, Temperature>.OnTimerContext ctx, Collector<Temperature> out) throws Exception {System.out.printf("时间到了:清空温度"+DateFormat.getDateTime(ctx.timestamp()));temperatureListState.clear();temperatureState.clear();size.clear();if(temperature.value() != null)ctx.timerService().deleteEventTimeTimer(temperature.value() + 10*1000);}});process.print("当前警告温度为:");env.execute();}
}//自己定义数据源class SourceTemperature extends RichSourceFunction<Temperature> {@Overridepublic void run(SourceContext<Temperature> ctx) throws Exception {Scanner scanner = new Scanner(System.in);while (true) {Temperature temperature = new Temperature();System.out.print("请输入温度: ");//double temp = Math.random()*40;double temp = scanner.nextDouble();//System.out.println(temp);temperature.setTemperature(temp);temperature.setTimestamp(new Date().getTime());ctx.collect(temperature);//Thread.sleep(1000);}}@Overridepublic void cancel() {}
}//自定义实体类
class Temperature1 {public Temperature1(double temperature, long timestamp) {this.temperature = temperature;this.timestamp = timestamp;}public Temperature1(){};//温度private double temperature;//时间private long timestamp;//idprivate String day = "2024-12-24";public double getTemperature() {return temperature;}public void setTemperature(double temperature) {this.temperature = temperature;}public long getTimestamp() {return timestamp;}public void setTimestamp(long timestamp) {this.timestamp = timestamp;}public String getDay() {return day;}public void setDay(String day) {this.day = day;}@Overridepublic String toString() {return "Temperature1{" +"temperature=" + temperature +", timestamp=" + timestamp +", day='" + day + '\'' +'}';}
}
下面我们做一个测试,来验证一下这个解释:前一个事件创建的定时器需要后一个事件的时间来触发。他们的时间间隔超过了10秒钟,但是时间并没有触发,而是下一个事件到的时候才触发的。

二、事件处理时间,事件处理时间触发有系统时间有关
package com.xcj;
import com.xcj.flink.bean.Temperature;
import com.xcj.util.DateFormat;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;import java.time.Duration;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;public class ProcessTime {public static void main(String[] args) throws Exception {SourceTemperature mySourceTemperature = new SourceTemperature();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Temperature> tDSSource = env.addSource(mySourceTemperature);
// WatermarkStrategy<Temperature> twsDS
// = WatermarkStrategy.<Temperature>forBoundedOutOfOrderness(Duration.ofSeconds(0))
// .withTimestampAssigner((temperature, recordTimestamp) -> temperature.getTimestamp());
//
// SingleOutputStreamOperator<Temperature> tSSODS = tDSSource.assignTimestampsAndWatermarks(twsDS);KeyedStream<Temperature, String> keyByDS = tDSSource.keyBy(temperature -> temperature.getDay());SingleOutputStreamOperator<Temperature> process = keyByDS.process(new ProcessFunction<Temperature, Temperature>() {ListState<Temperature> temperatureListState;ValueState<Temperature> temperatureState;ValueState<Integer> size;ValueState<Long> temperature;@Overridepublic void open(OpenContext openContext) throws Exception {ListStateDescriptor<Temperature> listStateDescriptor = new ListStateDescriptor<>("listState", Temperature.class);temperatureListState = getRuntimeContext().getListState(listStateDescriptor);temperatureState = getRuntimeContext().getState(new ValueStateDescriptor<>("temperatureState", Temperature.class));size = getRuntimeContext().getState(new ValueStateDescriptor<>("sizeState", Integer.class));temperature = getRuntimeContext().getState(new ValueStateDescriptor<>("temperature", Long.class));}@Overridepublic void processElement(Temperature value, ProcessFunction<Temperature, Temperature>.Context ctx, Collector<Temperature> out) throws Exception {Temperature value1 = temperatureState.value();//System.out.println(ctx.timestamp());System.out.printf("当前事件时间:"+DateFormat.getDateTime(value.getTimestamp()));System.out.println("当前水位线:"+DateFormat.getDateTime(ctx.timerService().currentWatermark()));if(value1 == null){temperatureState.update(value);temperatureListState.add(value);size.update(1);temperature.update(ctx.timerService().currentProcessingTime());ctx.timerService().registerProcessingTimeTimer(temperature.value()+1000*10);}else{if(value1.getTemperature() < value.getTemperature()){temperatureState.update(value);temperatureListState.add(value);size.update(size.value()+1);//System.out.println(size.value());if(size.value()>= 3){System.out.printf("警告警告:");Iterator<Temperature> iterator = temperatureListState.get().iterator();while(iterator.hasNext()){out.collect(iterator.next());}temperatureListState.clear();temperatureState.clear();size.clear();ctx.timerService().deleteProcessingTimeTimer(temperature.value()+1000*10);}}else{System.out.println("温度降低了");temperatureState.update(value);temperatureListState.clear();temperatureListState.add(value);size.update(1);ctx.timerService().deleteProcessingTimeTimer(temperature.value()+1000*10);temperature.update(value.getTimestamp());ctx.timerService().registerProcessingTimeTimer(temperature.value()+1000*10);}}}@Overridepublic void onTimer(long timestamp, ProcessFunction<Temperature, Temperature>.OnTimerContext ctx, Collector<Temperature> out) throws Exception {System.out.printf("时间到了:清空温度"+DateFormat.getDateTime(ctx.timestamp()));temperatureListState.clear();temperatureState.clear();size.clear();if(temperature.value() != null)ctx.timerService().deleteProcessingTimeTimer(temperature.value() + 10*1000);}});process.print("当前警告温度为:");env.execute();}
}//自己定义数据源
class SourceTemperature extends RichSourceFunction<Temperature> {@Overridepublic void run(SourceContext<Temperature> ctx) throws Exception {Scanner scanner = new Scanner(System.in);while (true) {Temperature temperature = new Temperature();System.out.print("请输入温度: ");//double temp = Math.random()*40;double temp = scanner.nextDouble();//System.out.println(temp);temperature.setTemperature(temp);temperature.setTimestamp(new Date().getTime());ctx.collect(temperature);//Thread.sleep(1000);}}@Overridepublic void cancel() {}
}//自定义实体类
class Temperature1 {public Temperature1(double temperature, long timestamp) {this.temperature = temperature;this.timestamp = timestamp;}public Temperature1(){};//温度private double temperature;//时间private long timestamp;//idprivate String day = "2024-12-24";public double getTemperature() {return temperature;}public void setTemperature(double temperature) {this.temperature = temperature;}public long getTimestamp() {return timestamp;}public void setTimestamp(long timestamp) {this.timestamp = timestamp;}public String getDay() {return day;}public void setDay(String day) {this.day = day;}@Overridepublic String toString() {return "Temperature1{" +"temperature=" + temperature +", timestamp=" + timestamp +", day='" + day + '\'' +'}';}
}
事件处理时间是不需要下一个事件触发的

三、总结
事件时间(event time) 与事件处理时间(process time)定时器整体代码其实差不多,主要是在注册定时器的时候选择的方法
//事件时间
ctx.timerService().registerEventTimeTimer(value.getTimestamp());
//事件处理事件
ctx.timerService().registerProcessingTimeTimer(temperature.value()+1000*10);
和不同定时器的逻辑。注意:事件时间定时器是需要下一个事件来触发上一个事件的定时任务,但是事件处理时间定时器是不需要下一个事件来触发的,他是根据注册时间和系统时间的差值来触发的。

上面我把注册时间改为了过去很久的时间,来一个就触发一次定时任务,因为注册时间与当前系统时间相差>10秒,所以会直接触发。
相关文章:
Flink定时器
flink的定时器都是基于事件时间(event time)或事件处理时间(processing time)的变化来触发响应的。对一部分新手玩家来说,可能不清楚事件时间和事件处理时间的区别。我这里先说一下我的理解,防止下面懵逼。…...
《算力互联互通标准体系1.0》发布,为算力互联成网发展提供指导框架
2024年政府工作报告提出:“适度超前建设数字基础设施,加快形成全国一体化算力体系,培育算力产业生态”。因此提供普惠化算力服务、培育算力大市场的算力互联网体系是响应国家布局的重要路径。 我国算力产业发展已取得突破性进展,…...
视频监控平台:Liveweb视频汇聚融合平台智慧安防视频监控应用方案
Liveweb是一款功能强大、灵活部署的安防视频监控平台,支持多种主流标准协议,包括GB28181、RTSP/Onvif、RTMP等,同时兼容海康Ehome、海大宇等厂家的私有协议和SDK接入。该平台不仅提供传统安防监控功能,还支持接入AI智能分析&#…...
STM32串口第一次接收数据时第一个字节丢失的问题
解决方法:开启中断之前,先清除标志位【1】。 串口清除标志位: __HAL_UART_CLEAR_PEFLAG(&huart1); HAL_UART_Receive_IT(&huart1,&RxUart, 1); 定时器清除标志位: __HAL_TIM_CLEAR_FLAG(&htim3,TIM_FLAG_UPDATE);…...
Zookeeper基本命令解析
ZooKeeper -server host:port -client-configuration properties-file cmd args addWatch [-m mode] path # optional mode is one of [PERSISTENT, PERSISTENT_RECURSIVE] - default is PERSISTENT_RECURSIVE addauth scheme auth 一、整体命令格式 ZooKeeper -serve…...
RustDesk远程及自建服务器搭建教程
要开始使用RustDesk远程和自建服务器,你需要遵循以下步骤: 下载和安装RustDesk:RustDesk是一款开源的远程支持应用程序。你可以在其官方网站(https://rustdesk.com/)上下载适用于你的操作系统的安装程序。安装过程非常…...
广州大彩串口屏安卓/linux触摸屏四路CVBS输入实现同时显示!
一、适用范围 适合广州大彩A40系列产品 产品型号: 二、概述 CVBS只需要一条线缆即可完成视频信号的传输,具有兼容性强、使用简单、成本低廉等优点。典型分辨率为720x480(NTSC制)或720x576(PAL制)。 三、…...
Python:模拟(包含例题)
模拟题:直接按照题目含义模拟即可,一般不涉及算法 注意: 1.读懂题:理清楚题目流程 2.代码和步骤一一对应:变量名,函数名,函数功能 3.提取重复的部分,写成对应的函数(…...
Python OCR 文字识别
一.引言 文字识别,也称为光学字符识别(Optical Character Recognition, OCR),是一种将不同形式的文档(如扫描的纸质文档、PDF文件或数字相机拍摄的图片)中的文字转换成可编辑和可搜索的数据的技术。随着技…...
阿里巴巴2017实习生笔试题(二)
阿里巴巴2017实习生笔试题(二) 2024/12/25 1.下面哪一个不是动态链接库的优点? B A.共享 B.装载速度快 C.开发模式好 D.减少页面交换 解析 1 静态链接库的优点 (1) 代码装载速度快,执行速度略比动态链接库快;…...
Docker安装与使用
文章目录 0.关键词1.安装docker2.镜像和容器3.Docker基础1.常见命令2.数据卷(volume)要解决的问题:什么是数据卷:解决的办法: 3.数据卷的使用基本命令挂载数据卷(nginx)基于本地目录数据挂载&am…...
通过nginx设置一个图片服务器,并使用 Nginx 作为反向代理
通过nginx设置一个图片服务器,并使用 Nginx 作为反向代理 安装nginx 首先需要去官网下载一个nginx,我这里下载了最新的稳定版本:nginx-1.26.2,下载下来是一个压缩包,解压之后就可以直接用了。 修改nginx的配置文件 …...
MacOS M3源代码编译Qt6.8.1
编译时间过长,如果不想自己编译,可以通过如果网盘进行下载: 链接: https://pan.baidu.com/s/17lvF5jQ-vR6vE-KEchzrVA?pwdts26 提取码: ts26 在macOS上编译Qt 6需要一些前置步骤和工具。以下是编译Qt 6的基本步骤: 安装Xcode和…...
【Pytorch实用教程】PyTorch 自带的数据集全面解读
下面这篇博客文章将带你快速了解 PyTorch 自带(或官方维护)的各类常用数据集,并介绍它们的使用方法,包括图像、文本和音频数据集。希望能帮助你在项目中快速上手并提高效率。 一、为什么要使用 PyTorch 自带的数据集? 1. 方便、快捷 官方维护的数据集通常已经帮助我们做好…...
Flask使用的正例和反例
Flask使用的正例和反例 文章目录 Flask使用的正例和反例一 , 使用注册异常二 , 新增数据成功后要返回新增数据的id三, 模型查询语句抽取成函数四, 业务逻辑函数传递的参数不应该用字典类型,要传不同字段的参数…...
2024年河北省职业院校技能大赛云计算应用赛项赛题第2套(私有云)
#需要资源(软件包及镜像)或有问题的,可私聊博主!!! #需要资源(软件包及镜像)或有问题的,可私聊博主!!! #需要资源(软件包…...
我的软件架构师——Java 职位面试经历。
最近,我参加了一家领先的服务型公司的软件架构师(Java)职位的面试。我在这里分享了一些面试官问我的问题。我只列出了与 Java 相关的问题,因为本文主要关注 Java。面试官问我有关 AWS、Docker、Kubernetes、Kafka、Elastic Search、SQL/NoSQL 和设计模式的问题。 ClassNotF…...
npm error code ETIMEDOUT
参考:https://blog.csdn.net/qq_38572963/article/details/142052986 二、解决办法 1、清空缓存 npm cache clean --force 2、查看当前的npm镜像设置 npm config get registry 3、切换新镜像源 npm config set registry https://registry.npmmirror.com 4、查看新源是否设置成功…...
(11)(3.2) ESC信号问题
文章目录 前言 1 信号电平不足 2 感应噪声 3 ESC过电压尖峰 4 ESC固件错误 前言 本页讨论了 ESC 信号的几个潜在问题,这些问题可能导致不可靠的操作,甚至在起飞时坠毁。这些主要发生在较大的四轮飞机上,这些飞机从动力和自动驾驶仪到 E…...
Postman最新接口自动化持续集成
学习地址:https://www.bilibili.com/video/BV1VDC5Y7EJA?spm_id_from333.788.videopod.episodes&vd_source336a0b0a2ff09832b3a55c3599ffb193&p9 1、旧版:PostmanNewmanAllureJenkins 缺点:需要安装较多软件,脚步需要手…...
java_网络服务相关_gateway_nacos_feign区别联系
1. spring-cloud-starter-gateway 作用:作为微服务架构的网关,统一入口,处理所有外部请求。 核心能力: 路由转发(基于路径、服务名等)过滤器(鉴权、限流、日志、Header 处理)支持负…...
Xshell远程连接Kali(默认 | 私钥)Note版
前言:xshell远程连接,私钥连接和常规默认连接 任务一 开启ssh服务 service ssh status //查看ssh服务状态 service ssh start //开启ssh服务 update-rc.d ssh enable //开启自启动ssh服务 任务二 修改配置文件 vi /etc/ssh/ssh_config //第一…...
Spring Boot 实现流式响应(兼容 2.7.x)
在实际开发中,我们可能会遇到一些流式数据处理的场景,比如接收来自上游接口的 Server-Sent Events(SSE) 或 流式 JSON 内容,并将其原样中转给前端页面或客户端。这种情况下,传统的 RestTemplate 缓存机制会…...
Python爬虫实战:研究feedparser库相关技术
1. 引言 1.1 研究背景与意义 在当今信息爆炸的时代,互联网上存在着海量的信息资源。RSS(Really Simple Syndication)作为一种标准化的信息聚合技术,被广泛用于网站内容的发布和订阅。通过 RSS,用户可以方便地获取网站更新的内容,而无需频繁访问各个网站。 然而,互联网…...
聊聊 Pulsar:Producer 源码解析
一、前言 Apache Pulsar 是一个企业级的开源分布式消息传递平台,以其高性能、可扩展性和存储计算分离架构在消息队列和流处理领域独树一帜。在 Pulsar 的核心架构中,Producer(生产者) 是连接客户端应用与消息队列的第一步。生产者…...
CentOS下的分布式内存计算Spark环境部署
一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架,相比 MapReduce 具有以下核心优势: 内存计算:数据可常驻内存,迭代计算性能提升 10-100 倍(文档段落:3-79…...
cf2117E
原题链接:https://codeforces.com/contest/2117/problem/E 题目背景: 给定两个数组a,b,可以执行多次以下操作:选择 i (1 < i < n - 1),并设置 或,也可以在执行上述操作前执行一次删除任意 和 。求…...
Xen Server服务器释放磁盘空间
disk.sh #!/bin/bashcd /run/sr-mount/e54f0646-ae11-0457-b64f-eba4673b824c # 全部虚拟机物理磁盘文件存储 a$(ls -l | awk {print $NF} | cut -d. -f1) # 使用中的虚拟机物理磁盘文件 b$(xe vm-disk-list --multiple | grep uuid | awk {print $NF})printf "%s\n"…...
Spring是如何解决Bean的循环依赖:三级缓存机制
1、什么是 Bean 的循环依赖 在 Spring框架中,Bean 的循环依赖是指多个 Bean 之间互相持有对方引用,形成闭环依赖关系的现象。 多个 Bean 的依赖关系构成环形链路,例如: 双向依赖:Bean A 依赖 Bean B,同时 Bean B 也依赖 Bean A(A↔B)。链条循环: Bean A → Bean…...
毫米波雷达基础理论(3D+4D)
3D、4D毫米波雷达基础知识及厂商选型 PreView : https://mp.weixin.qq.com/s/bQkju4r6med7I3TBGJI_bQ 1. FMCW毫米波雷达基础知识 主要参考博文: 一文入门汽车毫米波雷达基本原理 :https://mp.weixin.qq.com/s/_EN7A5lKcz2Eh8dLnjE19w 毫米波雷达基础…...
