【开发篇】一、处理函数:定时器与定时服务
文章目录
- 1、基本处理函数
- 2、定时器和定时服务
- 3、KeyedProcessFunction下演示定时器
- 4、process重获取当前watermark
前面API篇完结,对数据的转换、聚合、窗口等,都是基于DataStream的,称DataStreamAPI,如图:
在Flink底层,可以不定义具体是什么算子,而只是一个统一的处理(process)操作
,里面可以自定义逻辑。即图中底层的处理函数层。从下到上,封装越来越重,使用越来越简单。前面用的map等都是Flink封装好的,底层则是process。当现有的算子无法实现需求时,直接用process就行,最底层,最灵活,逻辑你自己开发就行,自定义处理逻辑
!!!
1、基本处理函数
处理函数的使用和前面的转换算子一样,基于DataStream对象调用即可:
stream.process(new MyProcessFunction())
-
ProcessFunction不是接口,而是一个抽象类,继承了AbstractRichFunction
-
ProcessFunction的两个泛型:I表示Input,是输入的数据类型;O即Output,是处理完成之后输出的数据类型
-
ProcessFunction抽象类有抽象方法processElement须重写,以及非抽象方法onTimer
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {...public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}...}
抽象方法 processElement:
- 定义处理元素的逻辑
- 流中的每个元素都会调用一次这个房啊
- 三个形参分别为:流中数据value自身、上下文对象ctx获取相关信息、收集器out往下游发处理完的数据
非抽象方法onTimer:
- 定时器触发时调用这个方法
- 注册定时器即设一个闹钟,
onTimer则是闹钟响了以后要做的事
- onTimer是基于时间线的一个回调方法
- onTimer的三个形参分别为:时间戳(timestamp),上下文(ctx),以及收集器(out)
最初的DataStream流在经过不同的操作后会得到不同类型的流,比如keyBy后的KeyedStream,window后的WindowedStream。对于这些不同类型的流,其实都可以直接调用.process()方法进行自定义处理,不过process重载,传参是不同类型的ProcessFunction
关于处理函数的分类:
- 在什么情况下调用process方法,就传入一个什么类型的ProcessFunction
- 具体类型,在process下Ctrl+P查看传参提示就行,比如DataStream下传ProcessFunction,按键分区后得到KeyedStream传KeyedProcessFunction
2、定时器和定时服务
ProcessFunction的上下文对象Context有timerService()方法,可返回一个TimerService对象。TimerService是Flink实现定时功能的关键。其常用方法:
- 获取当前的处理时间
long currentProcessingTime();
- 获取当前的水位线(事件时间)
long currentWatermark();
- 注册处理时间定时器,当处理时间超过time时触发
void registerProcessingTimeTimer(long time);
- 注册事件时间定时器,当水位线超过time时触发
void registerEventTimeTimer(long time);
- 删除触发时间为time的处理时间定时器
void deleteProcessingTimeTimer(long time);
- 删除触发时间为time的处理时间定时器
void deleteEventTimeTimer(long time);
注意:
- 只有在KeyedStream中才支持使用TimerService设置定时器的操作
- TimerService会以键(key)和时间戳为标准,对定时器进行去重,即同样的key和时间戳下,定时器只会留一个,触发时onTimer只被调用一次
3、KeyedProcessFunction下演示定时器
事件时间下的定时器演示:定义一个5s的定时器,在水位线时间到达5s时触发
public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) //乱序默认的水位生成器.withTimestampAssigner((element, ts) -> element.getTs() * 1000L) //时间戳提取);KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// TODO Process:keyedSingleOutputStreamOperator<String> process = sensorKS.process(new KeyedProcessFunction<String, WaterSensor, String>() {/*** 来一条数据调用一次* @param value 每条数据* @param ctx 上下文对象* @param out 采集器* @throws Exception*/@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {//获取当前数据的keyString currentKey = ctx.getCurrentKey();// 获取定时器服务对象TimerService timerService = ctx.timerService();// 数据中提取出来的事件时间Long currentEventTime = ctx.timestamp(); //注册定时任务,水位线被推到5s时触发timerService.registerEventTimeTimer(5000L);System.out.println("当前key=" + currentKey + ",当前时间=" + currentEventTime + ",注册了一个5s的定时器");/*** 时间进展到定时器注册的时间,调用该方法* @param timestamp 当前时间进展,就是定时器被触发时的时间* @param ctx 上下文* @param out 采集器* @throws Exception*/@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);String currentKey = ctx.getCurrentKey();System.out.println("key=" + currentKey + "现在时间是" + timestamp + "定时器触发");}});process.print();env.execute();}
}
运行:注意时间戳8s时水位线为8s-3s-1ms < 5s,即当前最大事件时间 - 等待延迟时间 - 1ms
,因此未触发,且同一个key,同一个定时时间,只有一个定时器生效:
看下不同key的效果,注意,水位线是多少和key没关系,s1,9,9进去,直接水位线变成9-3-1ms > 5s,三个定时器都触发
再用处理时间下的定时器:
public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {//...重复代码略,同上KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// TODO Process:keyedSingleOutputStreamOperator<String> process = sensorKS.process(new KeyedProcessFunction<String, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {//获取当前数据的keyString currentKey = ctx.getCurrentKey();TimerService timerService = ctx.timerService();//当前数据的处理时间long currentTs = timerService.currentProcessingTime();//定时器不用水位线为标杆,直接处理时间加5stimerService.registerProcessingTimeTimer(currentTs + 5000L);System.out.println("当前key=" + currentKey + ",当前时间=" + currentTs + ",注册了一个5s后的定时器");}//...重复代码略,同上
}
运行:
4、process重获取当前watermark
还是用上面的socket流,但process逻辑不玩定时器,验证下watermark:
//...重复代码略,同上@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception { // 获取 process的 当前watermarklong currentWatermark = timerService.currentWatermark();System.out.println("当前数据=" + value + ",当前watermark=" + currentWatermark);}
此时可以看到,s1,1,1进去,水位线本应为1000ms-3000ms-1ms = -2001,但通过timerService获取到的却是起始值,就那个Long.MIN,直到s1,5进去,才获取到-2001,依次往下,都差一个
在process重获取当前的watermark,显示的是上一次的watermark,因为process还没接收到这条数据对应的生成的新的watermark。关键点:watermark也是一个数据,要跟着流中对应的那个数据往下游流。
上图示意了为什么s5,5获取到的水位线为-2001,因为此时process还没接收到这条数据对应的生成的新的watermark(1999还在process框外,框内只有一个-2001)
相关文章:

【开发篇】一、处理函数:定时器与定时服务
文章目录 1、基本处理函数2、定时器和定时服务3、KeyedProcessFunction下演示定时器4、process重获取当前watermark 前面API篇完结,对数据的转换、聚合、窗口等,都是基于DataStream的,称DataStreamAPI,如图: 在Flink…...

重入漏洞EtherStore
重入漏洞 // SPDX-License-Identifier: MIT pragma solidity ^0.8.13;contract EtherStore {mapping(address > uint) public balances;function deposit() public payable {balances[msg.sender] msg.value;}function withdraw() public {uint bal balances[msg.sender]…...

账号运营的底层逻辑---获客思维
什么是运营? 运营是做什么的? 什么是内容运营? 什么是活动运营? 一篇带你搞清楚所有的底层逻辑!...

Pinia中如何实现数据持久化操作
使用vue3中的pinia,我们可以在多个页面间共享数据,但是一旦我们关闭或刷新页面,这些数据就会丢失,因此,我们需要有一种数据持久化的解决方案。在记录vue3 使用vue3中的pinia,我们可以在多个页面间共享数据&…...
【owt-server】RTC视频接收调用流程学习笔记1: Call::CreateVideoReceiveStream 前后
WebRTC源码分析——Call模块 大神提到,call模块是在worker线程创建的。主要创建接收、发送流Call模块是WebRTC会话中不可缺少的一个模块,一个Call对象可以包含多个发送/接收流,且这些流对应同一个远端端点,并共享码率估计。 call中通过webrtc::VideoReceiveStream::Config …...

淘宝商品链接获取淘宝商品评论数据(用 Python实现淘宝商品评论信息抓取)
在网页抓取方面,可以使用 Python、Java 等编程语言编写程序,通过模拟 HTTP 请求,获取淘宝多网站上的商品详情页面评论内容。在数据提取方面,可以使用正则表达式、XPath 等方式从 HTML 代码中提取出有用的信息。值得注意的是&#…...

十九、类型信息(1)
本章概要 为什么需要 RTTI RTTI(RunTime Type Information,运行时类型信息)能够在程序运行时发现和使用类型信息 RTTI 把我们从只能在编译期进行面向类型操作的禁锢中解脱了出来,并且让我们可以使用某些非常强大的程序。对 RTTI …...

十八、字符串(3)
本章概要 正则表达式 基础创建正则表达式量词CharSequencePattern 和 Matcherfinde()组(Groups)start() 和 end()Pattern 标记split()替换操作reset()正则表达式与 Java I/0 正则表达式 很久之前,_正则表达式_就已经整合到标准 Unix 工具…...

基于SSM的酒店预约及管理系统设计与实现
末尾获取源码 开发语言:Java Java开发工具:JDK1.8 后端框架:SSM 前端:Vue 数据库:MySQL5.7和Navicat管理工具结合 服务器:Tomcat8.5 开发软件:IDEA / Eclipse 是否Maven项目:是 目录…...
MIxformerV2的onnx和tensorrt加速
MIxformerV2的onnx和tensorrt加速 注意事项 地址:github地址 注意事项 转换成onnx模型之前,最好现简化算法的源代码,使其结构干净。因为在进行onnx转换后,可能在进行onnx→trt时算子不匹配,这时就需要去查看模型的源…...
Kotlin 中let 、run 、with、apply、also的用法与区别
实例代码 User(val userName:String,val age:Int){fun printName(){println(userName)}fun getUserName():String{return userName}} let 函数 let 函数常用来与对象的空判断一起用,起到作用于的限定效果。let 函数最后一行返回值。(比如实例需要let函…...

PHP函数的定义与最简单后门原理
PHP函数的定义与最简单后门原理 文章目录 PHP函数的定义与最简单后门原理函数的定义函数调用的过程变量的范围局部变量全局变量 可变函数动态函数 PHP 最简单后门原理分析 函数的定义 使用function关键字来定义一个函数定义函数的函数名避开关键字形式参数是传递映射的实际参数…...

PlantSimulation访问本地Excel文件的方法
PlantSimulation访问本地Excel文件的方法 PlantSimulation访问本地Excel文件的方法PlantSimulation访问本地Excel文件的方法 //Param StatusTable,T_DataTable:object var T_DataTable:object:=DataTable IF NOT isComputerAccessPermittedMESSageBox("计算机访问被阻止,…...

使用微PE工具箱制作winU盘启动盘~重装系统
1.准备一个大于8G的U盘,为了保证传输和安装速度请确保U盘的质量。 2.鼠标右键点击U盘,进行格式化: 3.下载微PE工具箱: 微PE工具箱 - 下载 4.安装微PE工具箱:选择安装到U盘 5.选择U盘后,开始安装…...

漏洞复现-jquery-picture-cut 任意文件上传_(CVE-2018-9208)
jquery-picture-cut 任意文件上传_(CVE-2018-9208) 漏洞信息 jQuery Picture Cut v1.1以下版本中存在安全漏洞CVE-2018-9208文件上传漏洞 描述 picture cut是一个jquery插件,以友好和简单的方式处理图像,具有基于bootstrap…...
Golang Websocket框架:实时通信的新选择
前言 在现代应用程序中,实时通信已经成为了一种必需的特性。而Websocket是一种在客户端和服务器之间建立持久连接的协议,可以实现实时的双向通信。Golang作为一门高效且简洁的语言,也提供了一些优秀的Websocket框架,方便开发者构…...

ExoPlayer架构详解与源码分析(7)——SampleQueue
系列文章目录 ExoPlayer架构详解与源码分析(1)——前言 ExoPlayer架构详解与源码分析(2)——Player ExoPlayer架构详解与源码分析(3)——Timeline ExoPlayer架构详解与源码分析(4)—…...

第二证券:基本面改善预期强化 机构聚焦科技成长
沪指日前迎来“三连涨”。10月26日,上证指数、深证成指和创业板指全部收红,分别收涨0.48%、0.40%、0.65%。此前的两个交易日,上证指数、深证成指也均收涨,创业板指24日涨幅也达到了0.85%。 从近期密布发布的策略报告来看…...

大语言模型在天猫AI导购助理项目的实践!
本文主要介绍了Prompt设计、大语言模型SFT和LLM在手机天猫AI导购助理项目应用。 ChatGPT基本原理 “会说话的AI”,“智能体” 简单概括成以下几个步骤: 预处理文本:ChatGPT的输入文本需要进行预处理。 输入编码:ChatGPT将经过预…...

【STM32】GPIO控制LED(HAL库版)
STM32最新固件库v3.5/Libraries/CMSIS/CM3/DeviceSupport/ST/STM32F10x/system_stm32f10x.c 林何/STM32F103C8 - 码云 - 开源中国 (gitee.com) STM32最新固件库v3.5/Libraries/STM32F10x_StdPeriph_Driver/src/stm32f10x_gpio.c 林何/STM32F103C8 - 码云 - 开源中国 (gitee.…...
在软件开发中正确使用MySQL日期时间类型的深度解析
在日常软件开发场景中,时间信息的存储是底层且核心的需求。从金融交易的精确记账时间、用户操作的行为日志,到供应链系统的物流节点时间戳,时间数据的准确性直接决定业务逻辑的可靠性。MySQL作为主流关系型数据库,其日期时间类型的…...

Redis数据倾斜问题解决
Redis 数据倾斜问题解析与解决方案 什么是 Redis 数据倾斜 Redis 数据倾斜指的是在 Redis 集群中,部分节点存储的数据量或访问量远高于其他节点,导致这些节点负载过高,影响整体性能。 数据倾斜的主要表现 部分节点内存使用率远高于其他节…...
CMake控制VS2022项目文件分组
我们可以通过 CMake 控制源文件的组织结构,使它们在 VS 解决方案资源管理器中以“组”(Filter)的形式进行分类展示。 🎯 目标 通过 CMake 脚本将 .cpp、.h 等源文件分组显示在 Visual Studio 2022 的解决方案资源管理器中。 ✅ 支持的方法汇总(共4种) 方法描述是否推荐…...

AI病理诊断七剑下天山,医疗未来触手可及
一、病理诊断困局:刀尖上的医学艺术 1.1 金标准背后的隐痛 病理诊断被誉为"诊断的诊断",医生需通过显微镜观察组织切片,在细胞迷宫中捕捉癌变信号。某省病理质控报告显示,基层医院误诊率达12%-15%,专家会诊…...
在QWebEngineView上实现鼠标、触摸等事件捕获的解决方案
这个问题我看其他博主也写了,要么要会员、要么写的乱七八糟。这里我整理一下,把问题说清楚并且给出代码,拿去用就行,照着葫芦画瓢。 问题 在继承QWebEngineView后,重写mousePressEvent或event函数无法捕获鼠标按下事…...
Python Einops库:深度学习中的张量操作革命
Einops(爱因斯坦操作库)就像给张量操作戴上了一副"语义眼镜"——让你用人类能理解的方式告诉计算机如何操作多维数组。这个基于爱因斯坦求和约定的库,用类似自然语言的表达式替代了晦涩的API调用,彻底改变了深度学习工程…...
Vite中定义@软链接
在webpack中可以直接通过符号表示src路径,但是vite中默认不可以。 如何实现: vite中提供了resolve.alias:通过别名在指向一个具体的路径 在vite.config.js中 import { join } from pathexport default defineConfig({plugins: [vue()],//…...
「全栈技术解析」推客小程序系统开发:从架构设计到裂变增长的完整解决方案
在移动互联网营销竞争白热化的当下,推客小程序系统凭借其裂变传播、精准营销等特性,成为企业抢占市场的利器。本文将深度解析推客小程序系统开发的核心技术与实现路径,助力开发者打造具有市场竞争力的营销工具。 一、系统核心功能架构&…...

Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement
Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement 1. LAB环境2. L2公告策略2.1 部署Death Star2.2 访问服务2.3 部署L2公告策略2.4 服务宣告 3. 可视化 ARP 流量3.1 部署新服务3.2 准备可视化3.3 再次请求 4. 自动IPAM4.1 IPAM Pool4.2 …...

渗透实战PortSwigger靶场:lab13存储型DOM XSS详解
进来是需要留言的,先用做简单的 html 标签测试 发现面的</h1>不见了 数据包中找到了一个loadCommentsWithVulnerableEscapeHtml.js 他是把用户输入的<>进行 html 编码,输入的<>当成字符串处理回显到页面中,看来只是把用户输…...