【开发篇】一、处理函数:定时器与定时服务
文章目录
- 1、基本处理函数
- 2、定时器和定时服务
- 3、KeyedProcessFunction下演示定时器
- 4、process重获取当前watermark
前面API篇完结,对数据的转换、聚合、窗口等,都是基于DataStream的,称DataStreamAPI,如图:

在Flink底层,可以不定义具体是什么算子,而只是一个统一的处理(process)操作,里面可以自定义逻辑。即图中底层的处理函数层。从下到上,封装越来越重,使用越来越简单。前面用的map等都是Flink封装好的,底层则是process。当现有的算子无法实现需求时,直接用process就行,最底层,最灵活,逻辑你自己开发就行,自定义处理逻辑!!!
1、基本处理函数
处理函数的使用和前面的转换算子一样,基于DataStream对象调用即可:
stream.process(new MyProcessFunction())
-
ProcessFunction不是接口,而是一个抽象类,继承了AbstractRichFunction
-
ProcessFunction的两个泛型:I表示Input,是输入的数据类型;O即Output,是处理完成之后输出的数据类型
-
ProcessFunction抽象类有抽象方法processElement须重写,以及非抽象方法onTimer
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {...public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}...}
抽象方法 processElement:
- 定义处理元素的逻辑
- 流中的每个元素都会调用一次这个房啊
- 三个形参分别为:流中数据value自身、上下文对象ctx获取相关信息、收集器out往下游发处理完的数据
非抽象方法onTimer:
- 定时器触发时调用这个方法
- 注册定时器即设一个闹钟,
onTimer则是闹钟响了以后要做的事 - onTimer是基于时间线的一个回调方法
- onTimer的三个形参分别为:时间戳(timestamp),上下文(ctx),以及收集器(out)
最初的DataStream流在经过不同的操作后会得到不同类型的流,比如keyBy后的KeyedStream,window后的WindowedStream。对于这些不同类型的流,其实都可以直接调用.process()方法进行自定义处理,不过process重载,传参是不同类型的ProcessFunction
关于处理函数的分类:
- 在什么情况下调用process方法,就传入一个什么类型的ProcessFunction
- 具体类型,在process下Ctrl+P查看传参提示就行,比如DataStream下传ProcessFunction,按键分区后得到KeyedStream传KeyedProcessFunction
2、定时器和定时服务
ProcessFunction的上下文对象Context有timerService()方法,可返回一个TimerService对象。TimerService是Flink实现定时功能的关键。其常用方法:
- 获取当前的处理时间
long currentProcessingTime();
- 获取当前的水位线(事件时间)
long currentWatermark();
- 注册处理时间定时器,当处理时间超过time时触发
void registerProcessingTimeTimer(long time);
- 注册事件时间定时器,当水位线超过time时触发
void registerEventTimeTimer(long time);
- 删除触发时间为time的处理时间定时器
void deleteProcessingTimeTimer(long time);
- 删除触发时间为time的处理时间定时器
void deleteEventTimeTimer(long time);
注意:
- 只有在KeyedStream中才支持使用TimerService设置定时器的操作
- TimerService会以键(key)和时间戳为标准,对定时器进行去重,即同样的key和时间戳下,定时器只会留一个,触发时onTimer只被调用一次
3、KeyedProcessFunction下演示定时器
事件时间下的定时器演示:定义一个5s的定时器,在水位线时间到达5s时触发
public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) //乱序默认的水位生成器.withTimestampAssigner((element, ts) -> element.getTs() * 1000L) //时间戳提取);KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// TODO Process:keyedSingleOutputStreamOperator<String> process = sensorKS.process(new KeyedProcessFunction<String, WaterSensor, String>() {/*** 来一条数据调用一次* @param value 每条数据* @param ctx 上下文对象* @param out 采集器* @throws Exception*/@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {//获取当前数据的keyString currentKey = ctx.getCurrentKey();// 获取定时器服务对象TimerService timerService = ctx.timerService();// 数据中提取出来的事件时间Long currentEventTime = ctx.timestamp(); //注册定时任务,水位线被推到5s时触发timerService.registerEventTimeTimer(5000L);System.out.println("当前key=" + currentKey + ",当前时间=" + currentEventTime + ",注册了一个5s的定时器");/*** 时间进展到定时器注册的时间,调用该方法* @param timestamp 当前时间进展,就是定时器被触发时的时间* @param ctx 上下文* @param out 采集器* @throws Exception*/@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);String currentKey = ctx.getCurrentKey();System.out.println("key=" + currentKey + "现在时间是" + timestamp + "定时器触发");}});process.print();env.execute();}
}
运行:注意时间戳8s时水位线为8s-3s-1ms < 5s,即当前最大事件时间 - 等待延迟时间 - 1ms,因此未触发,且同一个key,同一个定时时间,只有一个定时器生效:

看下不同key的效果,注意,水位线是多少和key没关系,s1,9,9进去,直接水位线变成9-3-1ms > 5s,三个定时器都触发

再用处理时间下的定时器:
public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {//...重复代码略,同上KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// TODO Process:keyedSingleOutputStreamOperator<String> process = sensorKS.process(new KeyedProcessFunction<String, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {//获取当前数据的keyString currentKey = ctx.getCurrentKey();TimerService timerService = ctx.timerService();//当前数据的处理时间long currentTs = timerService.currentProcessingTime();//定时器不用水位线为标杆,直接处理时间加5stimerService.registerProcessingTimeTimer(currentTs + 5000L);System.out.println("当前key=" + currentKey + ",当前时间=" + currentTs + ",注册了一个5s后的定时器");}//...重复代码略,同上
}
运行:

4、process重获取当前watermark
还是用上面的socket流,但process逻辑不玩定时器,验证下watermark:
//...重复代码略,同上@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception { // 获取 process的 当前watermarklong currentWatermark = timerService.currentWatermark();System.out.println("当前数据=" + value + ",当前watermark=" + currentWatermark);}
此时可以看到,s1,1,1进去,水位线本应为1000ms-3000ms-1ms = -2001,但通过timerService获取到的却是起始值,就那个Long.MIN,直到s1,5进去,才获取到-2001,依次往下,都差一个

在process重获取当前的watermark,显示的是上一次的watermark,因为process还没接收到这条数据对应的生成的新的watermark。关键点:watermark也是一个数据,要跟着流中对应的那个数据往下游流。


上图示意了为什么s5,5获取到的水位线为-2001,因为此时process还没接收到这条数据对应的生成的新的watermark(1999还在process框外,框内只有一个-2001)
相关文章:
【开发篇】一、处理函数:定时器与定时服务
文章目录 1、基本处理函数2、定时器和定时服务3、KeyedProcessFunction下演示定时器4、process重获取当前watermark 前面API篇完结,对数据的转换、聚合、窗口等,都是基于DataStream的,称DataStreamAPI,如图: 在Flink…...
重入漏洞EtherStore
重入漏洞 // SPDX-License-Identifier: MIT pragma solidity ^0.8.13;contract EtherStore {mapping(address > uint) public balances;function deposit() public payable {balances[msg.sender] msg.value;}function withdraw() public {uint bal balances[msg.sender]…...
账号运营的底层逻辑---获客思维
什么是运营? 运营是做什么的? 什么是内容运营? 什么是活动运营? 一篇带你搞清楚所有的底层逻辑!...
Pinia中如何实现数据持久化操作
使用vue3中的pinia,我们可以在多个页面间共享数据,但是一旦我们关闭或刷新页面,这些数据就会丢失,因此,我们需要有一种数据持久化的解决方案。在记录vue3 使用vue3中的pinia,我们可以在多个页面间共享数据&…...
【owt-server】RTC视频接收调用流程学习笔记1: Call::CreateVideoReceiveStream 前后
WebRTC源码分析——Call模块 大神提到,call模块是在worker线程创建的。主要创建接收、发送流Call模块是WebRTC会话中不可缺少的一个模块,一个Call对象可以包含多个发送/接收流,且这些流对应同一个远端端点,并共享码率估计。 call中通过webrtc::VideoReceiveStream::Config …...
淘宝商品链接获取淘宝商品评论数据(用 Python实现淘宝商品评论信息抓取)
在网页抓取方面,可以使用 Python、Java 等编程语言编写程序,通过模拟 HTTP 请求,获取淘宝多网站上的商品详情页面评论内容。在数据提取方面,可以使用正则表达式、XPath 等方式从 HTML 代码中提取出有用的信息。值得注意的是&#…...
十九、类型信息(1)
本章概要 为什么需要 RTTI RTTI(RunTime Type Information,运行时类型信息)能够在程序运行时发现和使用类型信息 RTTI 把我们从只能在编译期进行面向类型操作的禁锢中解脱了出来,并且让我们可以使用某些非常强大的程序。对 RTTI …...
十八、字符串(3)
本章概要 正则表达式 基础创建正则表达式量词CharSequencePattern 和 Matcherfinde()组(Groups)start() 和 end()Pattern 标记split()替换操作reset()正则表达式与 Java I/0 正则表达式 很久之前,_正则表达式_就已经整合到标准 Unix 工具…...
基于SSM的酒店预约及管理系统设计与实现
末尾获取源码 开发语言:Java Java开发工具:JDK1.8 后端框架:SSM 前端:Vue 数据库:MySQL5.7和Navicat管理工具结合 服务器:Tomcat8.5 开发软件:IDEA / Eclipse 是否Maven项目:是 目录…...
MIxformerV2的onnx和tensorrt加速
MIxformerV2的onnx和tensorrt加速 注意事项 地址:github地址 注意事项 转换成onnx模型之前,最好现简化算法的源代码,使其结构干净。因为在进行onnx转换后,可能在进行onnx→trt时算子不匹配,这时就需要去查看模型的源…...
Kotlin 中let 、run 、with、apply、also的用法与区别
实例代码 User(val userName:String,val age:Int){fun printName(){println(userName)}fun getUserName():String{return userName}} let 函数 let 函数常用来与对象的空判断一起用,起到作用于的限定效果。let 函数最后一行返回值。(比如实例需要let函…...
PHP函数的定义与最简单后门原理
PHP函数的定义与最简单后门原理 文章目录 PHP函数的定义与最简单后门原理函数的定义函数调用的过程变量的范围局部变量全局变量 可变函数动态函数 PHP 最简单后门原理分析 函数的定义 使用function关键字来定义一个函数定义函数的函数名避开关键字形式参数是传递映射的实际参数…...
PlantSimulation访问本地Excel文件的方法
PlantSimulation访问本地Excel文件的方法 PlantSimulation访问本地Excel文件的方法PlantSimulation访问本地Excel文件的方法 //Param StatusTable,T_DataTable:object var T_DataTable:object:=DataTable IF NOT isComputerAccessPermittedMESSageBox("计算机访问被阻止,…...
使用微PE工具箱制作winU盘启动盘~重装系统
1.准备一个大于8G的U盘,为了保证传输和安装速度请确保U盘的质量。 2.鼠标右键点击U盘,进行格式化: 3.下载微PE工具箱: 微PE工具箱 - 下载 4.安装微PE工具箱:选择安装到U盘 5.选择U盘后,开始安装…...
漏洞复现-jquery-picture-cut 任意文件上传_(CVE-2018-9208)
jquery-picture-cut 任意文件上传_(CVE-2018-9208) 漏洞信息 jQuery Picture Cut v1.1以下版本中存在安全漏洞CVE-2018-9208文件上传漏洞 描述 picture cut是一个jquery插件,以友好和简单的方式处理图像,具有基于bootstrap…...
Golang Websocket框架:实时通信的新选择
前言 在现代应用程序中,实时通信已经成为了一种必需的特性。而Websocket是一种在客户端和服务器之间建立持久连接的协议,可以实现实时的双向通信。Golang作为一门高效且简洁的语言,也提供了一些优秀的Websocket框架,方便开发者构…...
ExoPlayer架构详解与源码分析(7)——SampleQueue
系列文章目录 ExoPlayer架构详解与源码分析(1)——前言 ExoPlayer架构详解与源码分析(2)——Player ExoPlayer架构详解与源码分析(3)——Timeline ExoPlayer架构详解与源码分析(4)—…...
第二证券:基本面改善预期强化 机构聚焦科技成长
沪指日前迎来“三连涨”。10月26日,上证指数、深证成指和创业板指全部收红,分别收涨0.48%、0.40%、0.65%。此前的两个交易日,上证指数、深证成指也均收涨,创业板指24日涨幅也达到了0.85%。 从近期密布发布的策略报告来看…...
大语言模型在天猫AI导购助理项目的实践!
本文主要介绍了Prompt设计、大语言模型SFT和LLM在手机天猫AI导购助理项目应用。 ChatGPT基本原理 “会说话的AI”,“智能体” 简单概括成以下几个步骤: 预处理文本:ChatGPT的输入文本需要进行预处理。 输入编码:ChatGPT将经过预…...
【STM32】GPIO控制LED(HAL库版)
STM32最新固件库v3.5/Libraries/CMSIS/CM3/DeviceSupport/ST/STM32F10x/system_stm32f10x.c 林何/STM32F103C8 - 码云 - 开源中国 (gitee.com) STM32最新固件库v3.5/Libraries/STM32F10x_StdPeriph_Driver/src/stm32f10x_gpio.c 林何/STM32F103C8 - 码云 - 开源中国 (gitee.…...
ES6从入门到精通:前言
ES6简介 ES6(ECMAScript 2015)是JavaScript语言的重大更新,引入了许多新特性,包括语法糖、新数据类型、模块化支持等,显著提升了开发效率和代码可维护性。 核心知识点概览 变量声明 let 和 const 取代 var…...
Linux简单的操作
ls ls 查看当前目录 ll 查看详细内容 ls -a 查看所有的内容 ls --help 查看方法文档 pwd pwd 查看当前路径 cd cd 转路径 cd .. 转上一级路径 cd 名 转换路径 …...
视频字幕质量评估的大规模细粒度基准
大家读完觉得有帮助记得关注和点赞!!! 摘要 视频字幕在文本到视频生成任务中起着至关重要的作用,因为它们的质量直接影响所生成视频的语义连贯性和视觉保真度。尽管大型视觉-语言模型(VLMs)在字幕生成方面…...
基于Docker Compose部署Java微服务项目
一. 创建根项目 根项目(父项目)主要用于依赖管理 一些需要注意的点: 打包方式需要为 pom<modules>里需要注册子模块不要引入maven的打包插件,否则打包时会出问题 <?xml version"1.0" encoding"UTF-8…...
CRMEB 框架中 PHP 上传扩展开发:涵盖本地上传及阿里云 OSS、腾讯云 COS、七牛云
目前已有本地上传、阿里云OSS上传、腾讯云COS上传、七牛云上传扩展 扩展入口文件 文件目录 crmeb\services\upload\Upload.php namespace crmeb\services\upload;use crmeb\basic\BaseManager; use think\facade\Config;/*** Class Upload* package crmeb\services\upload* …...
华为云Flexus+DeepSeek征文|DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建
华为云FlexusDeepSeek征文|DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建 前言 如今大模型其性能出色,华为云 ModelArts Studio_MaaS大模型即服务平台华为云内置了大模型,能助力我们轻松驾驭 DeepSeek-V3/R1,本文中将分享如何…...
vue3+vite项目中使用.env文件环境变量方法
vue3vite项目中使用.env文件环境变量方法 .env文件作用命名规则常用的配置项示例使用方法注意事项在vite.config.js文件中读取环境变量方法 .env文件作用 .env 文件用于定义环境变量,这些变量可以在项目中通过 import.meta.env 进行访问。Vite 会自动加载这些环境变…...
音视频——I2S 协议详解
I2S 协议详解 I2S (Inter-IC Sound) 协议是一种串行总线协议,专门用于在数字音频设备之间传输数字音频数据。它由飞利浦(Philips)公司开发,以其简单、高效和广泛的兼容性而闻名。 1. 信号线 I2S 协议通常使用三根或四根信号线&a…...
C/C++ 中附加包含目录、附加库目录与附加依赖项详解
在 C/C 编程的编译和链接过程中,附加包含目录、附加库目录和附加依赖项是三个至关重要的设置,它们相互配合,确保程序能够正确引用外部资源并顺利构建。虽然在学习过程中,这些概念容易让人混淆,但深入理解它们的作用和联…...
云原生安全实战:API网关Kong的鉴权与限流详解
🔥「炎码工坊」技术弹药已装填! 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、基础概念 1. API网关(API Gateway) API网关是微服务架构中的核心组件,负责统一管理所有API的流量入口。它像一座…...
