【开发篇】一、处理函数:定时器与定时服务
文章目录
- 1、基本处理函数
- 2、定时器和定时服务
- 3、KeyedProcessFunction下演示定时器
- 4、process重获取当前watermark
前面API篇完结,对数据的转换、聚合、窗口等,都是基于DataStream的,称DataStreamAPI,如图:
在Flink底层,可以不定义具体是什么算子,而只是一个统一的处理(process)操作
,里面可以自定义逻辑。即图中底层的处理函数层。从下到上,封装越来越重,使用越来越简单。前面用的map等都是Flink封装好的,底层则是process。当现有的算子无法实现需求时,直接用process就行,最底层,最灵活,逻辑你自己开发就行,自定义处理逻辑
!!!
1、基本处理函数
处理函数的使用和前面的转换算子一样,基于DataStream对象调用即可:
stream.process(new MyProcessFunction())
-
ProcessFunction不是接口,而是一个抽象类,继承了AbstractRichFunction
-
ProcessFunction的两个泛型:I表示Input,是输入的数据类型;O即Output,是处理完成之后输出的数据类型
-
ProcessFunction抽象类有抽象方法processElement须重写,以及非抽象方法onTimer
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {...public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}...}
抽象方法 processElement:
- 定义处理元素的逻辑
- 流中的每个元素都会调用一次这个房啊
- 三个形参分别为:流中数据value自身、上下文对象ctx获取相关信息、收集器out往下游发处理完的数据
非抽象方法onTimer:
- 定时器触发时调用这个方法
- 注册定时器即设一个闹钟,
onTimer则是闹钟响了以后要做的事
- onTimer是基于时间线的一个回调方法
- onTimer的三个形参分别为:时间戳(timestamp),上下文(ctx),以及收集器(out)
最初的DataStream流在经过不同的操作后会得到不同类型的流,比如keyBy后的KeyedStream,window后的WindowedStream。对于这些不同类型的流,其实都可以直接调用.process()方法进行自定义处理,不过process重载,传参是不同类型的ProcessFunction
关于处理函数的分类:
- 在什么情况下调用process方法,就传入一个什么类型的ProcessFunction
- 具体类型,在process下Ctrl+P查看传参提示就行,比如DataStream下传ProcessFunction,按键分区后得到KeyedStream传KeyedProcessFunction
2、定时器和定时服务
ProcessFunction的上下文对象Context有timerService()方法,可返回一个TimerService对象。TimerService是Flink实现定时功能的关键。其常用方法:
- 获取当前的处理时间
long currentProcessingTime();
- 获取当前的水位线(事件时间)
long currentWatermark();
- 注册处理时间定时器,当处理时间超过time时触发
void registerProcessingTimeTimer(long time);
- 注册事件时间定时器,当水位线超过time时触发
void registerEventTimeTimer(long time);
- 删除触发时间为time的处理时间定时器
void deleteProcessingTimeTimer(long time);
- 删除触发时间为time的处理时间定时器
void deleteEventTimeTimer(long time);
注意:
- 只有在KeyedStream中才支持使用TimerService设置定时器的操作
- TimerService会以键(key)和时间戳为标准,对定时器进行去重,即同样的key和时间戳下,定时器只会留一个,触发时onTimer只被调用一次
3、KeyedProcessFunction下演示定时器
事件时间下的定时器演示:定义一个5s的定时器,在水位线时间到达5s时触发
public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) //乱序默认的水位生成器.withTimestampAssigner((element, ts) -> element.getTs() * 1000L) //时间戳提取);KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// TODO Process:keyedSingleOutputStreamOperator<String> process = sensorKS.process(new KeyedProcessFunction<String, WaterSensor, String>() {/*** 来一条数据调用一次* @param value 每条数据* @param ctx 上下文对象* @param out 采集器* @throws Exception*/@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {//获取当前数据的keyString currentKey = ctx.getCurrentKey();// 获取定时器服务对象TimerService timerService = ctx.timerService();// 数据中提取出来的事件时间Long currentEventTime = ctx.timestamp(); //注册定时任务,水位线被推到5s时触发timerService.registerEventTimeTimer(5000L);System.out.println("当前key=" + currentKey + ",当前时间=" + currentEventTime + ",注册了一个5s的定时器");/*** 时间进展到定时器注册的时间,调用该方法* @param timestamp 当前时间进展,就是定时器被触发时的时间* @param ctx 上下文* @param out 采集器* @throws Exception*/@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);String currentKey = ctx.getCurrentKey();System.out.println("key=" + currentKey + "现在时间是" + timestamp + "定时器触发");}});process.print();env.execute();}
}
运行:注意时间戳8s时水位线为8s-3s-1ms < 5s,即当前最大事件时间 - 等待延迟时间 - 1ms
,因此未触发,且同一个key,同一个定时时间,只有一个定时器生效:
看下不同key的效果,注意,水位线是多少和key没关系,s1,9,9进去,直接水位线变成9-3-1ms > 5s,三个定时器都触发
再用处理时间下的定时器:
public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {//...重复代码略,同上KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// TODO Process:keyedSingleOutputStreamOperator<String> process = sensorKS.process(new KeyedProcessFunction<String, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {//获取当前数据的keyString currentKey = ctx.getCurrentKey();TimerService timerService = ctx.timerService();//当前数据的处理时间long currentTs = timerService.currentProcessingTime();//定时器不用水位线为标杆,直接处理时间加5stimerService.registerProcessingTimeTimer(currentTs + 5000L);System.out.println("当前key=" + currentKey + ",当前时间=" + currentTs + ",注册了一个5s后的定时器");}//...重复代码略,同上
}
运行:
4、process重获取当前watermark
还是用上面的socket流,但process逻辑不玩定时器,验证下watermark:
//...重复代码略,同上@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception { // 获取 process的 当前watermarklong currentWatermark = timerService.currentWatermark();System.out.println("当前数据=" + value + ",当前watermark=" + currentWatermark);}
此时可以看到,s1,1,1进去,水位线本应为1000ms-3000ms-1ms = -2001,但通过timerService获取到的却是起始值,就那个Long.MIN,直到s1,5进去,才获取到-2001,依次往下,都差一个
在process重获取当前的watermark,显示的是上一次的watermark,因为process还没接收到这条数据对应的生成的新的watermark。关键点:watermark也是一个数据,要跟着流中对应的那个数据往下游流。
上图示意了为什么s5,5获取到的水位线为-2001,因为此时process还没接收到这条数据对应的生成的新的watermark(1999还在process框外,框内只有一个-2001)
相关文章:

【开发篇】一、处理函数:定时器与定时服务
文章目录 1、基本处理函数2、定时器和定时服务3、KeyedProcessFunction下演示定时器4、process重获取当前watermark 前面API篇完结,对数据的转换、聚合、窗口等,都是基于DataStream的,称DataStreamAPI,如图: 在Flink…...

重入漏洞EtherStore
重入漏洞 // SPDX-License-Identifier: MIT pragma solidity ^0.8.13;contract EtherStore {mapping(address > uint) public balances;function deposit() public payable {balances[msg.sender] msg.value;}function withdraw() public {uint bal balances[msg.sender]…...

账号运营的底层逻辑---获客思维
什么是运营? 运营是做什么的? 什么是内容运营? 什么是活动运营? 一篇带你搞清楚所有的底层逻辑!...

Pinia中如何实现数据持久化操作
使用vue3中的pinia,我们可以在多个页面间共享数据,但是一旦我们关闭或刷新页面,这些数据就会丢失,因此,我们需要有一种数据持久化的解决方案。在记录vue3 使用vue3中的pinia,我们可以在多个页面间共享数据&…...
【owt-server】RTC视频接收调用流程学习笔记1: Call::CreateVideoReceiveStream 前后
WebRTC源码分析——Call模块 大神提到,call模块是在worker线程创建的。主要创建接收、发送流Call模块是WebRTC会话中不可缺少的一个模块,一个Call对象可以包含多个发送/接收流,且这些流对应同一个远端端点,并共享码率估计。 call中通过webrtc::VideoReceiveStream::Config …...

淘宝商品链接获取淘宝商品评论数据(用 Python实现淘宝商品评论信息抓取)
在网页抓取方面,可以使用 Python、Java 等编程语言编写程序,通过模拟 HTTP 请求,获取淘宝多网站上的商品详情页面评论内容。在数据提取方面,可以使用正则表达式、XPath 等方式从 HTML 代码中提取出有用的信息。值得注意的是&#…...

十九、类型信息(1)
本章概要 为什么需要 RTTI RTTI(RunTime Type Information,运行时类型信息)能够在程序运行时发现和使用类型信息 RTTI 把我们从只能在编译期进行面向类型操作的禁锢中解脱了出来,并且让我们可以使用某些非常强大的程序。对 RTTI …...

十八、字符串(3)
本章概要 正则表达式 基础创建正则表达式量词CharSequencePattern 和 Matcherfinde()组(Groups)start() 和 end()Pattern 标记split()替换操作reset()正则表达式与 Java I/0 正则表达式 很久之前,_正则表达式_就已经整合到标准 Unix 工具…...

基于SSM的酒店预约及管理系统设计与实现
末尾获取源码 开发语言:Java Java开发工具:JDK1.8 后端框架:SSM 前端:Vue 数据库:MySQL5.7和Navicat管理工具结合 服务器:Tomcat8.5 开发软件:IDEA / Eclipse 是否Maven项目:是 目录…...
MIxformerV2的onnx和tensorrt加速
MIxformerV2的onnx和tensorrt加速 注意事项 地址:github地址 注意事项 转换成onnx模型之前,最好现简化算法的源代码,使其结构干净。因为在进行onnx转换后,可能在进行onnx→trt时算子不匹配,这时就需要去查看模型的源…...
Kotlin 中let 、run 、with、apply、also的用法与区别
实例代码 User(val userName:String,val age:Int){fun printName(){println(userName)}fun getUserName():String{return userName}} let 函数 let 函数常用来与对象的空判断一起用,起到作用于的限定效果。let 函数最后一行返回值。(比如实例需要let函…...

PHP函数的定义与最简单后门原理
PHP函数的定义与最简单后门原理 文章目录 PHP函数的定义与最简单后门原理函数的定义函数调用的过程变量的范围局部变量全局变量 可变函数动态函数 PHP 最简单后门原理分析 函数的定义 使用function关键字来定义一个函数定义函数的函数名避开关键字形式参数是传递映射的实际参数…...

PlantSimulation访问本地Excel文件的方法
PlantSimulation访问本地Excel文件的方法 PlantSimulation访问本地Excel文件的方法PlantSimulation访问本地Excel文件的方法 //Param StatusTable,T_DataTable:object var T_DataTable:object:=DataTable IF NOT isComputerAccessPermittedMESSageBox("计算机访问被阻止,…...

使用微PE工具箱制作winU盘启动盘~重装系统
1.准备一个大于8G的U盘,为了保证传输和安装速度请确保U盘的质量。 2.鼠标右键点击U盘,进行格式化: 3.下载微PE工具箱: 微PE工具箱 - 下载 4.安装微PE工具箱:选择安装到U盘 5.选择U盘后,开始安装…...

漏洞复现-jquery-picture-cut 任意文件上传_(CVE-2018-9208)
jquery-picture-cut 任意文件上传_(CVE-2018-9208) 漏洞信息 jQuery Picture Cut v1.1以下版本中存在安全漏洞CVE-2018-9208文件上传漏洞 描述 picture cut是一个jquery插件,以友好和简单的方式处理图像,具有基于bootstrap…...
Golang Websocket框架:实时通信的新选择
前言 在现代应用程序中,实时通信已经成为了一种必需的特性。而Websocket是一种在客户端和服务器之间建立持久连接的协议,可以实现实时的双向通信。Golang作为一门高效且简洁的语言,也提供了一些优秀的Websocket框架,方便开发者构…...

ExoPlayer架构详解与源码分析(7)——SampleQueue
系列文章目录 ExoPlayer架构详解与源码分析(1)——前言 ExoPlayer架构详解与源码分析(2)——Player ExoPlayer架构详解与源码分析(3)——Timeline ExoPlayer架构详解与源码分析(4)—…...

第二证券:基本面改善预期强化 机构聚焦科技成长
沪指日前迎来“三连涨”。10月26日,上证指数、深证成指和创业板指全部收红,分别收涨0.48%、0.40%、0.65%。此前的两个交易日,上证指数、深证成指也均收涨,创业板指24日涨幅也达到了0.85%。 从近期密布发布的策略报告来看…...

大语言模型在天猫AI导购助理项目的实践!
本文主要介绍了Prompt设计、大语言模型SFT和LLM在手机天猫AI导购助理项目应用。 ChatGPT基本原理 “会说话的AI”,“智能体” 简单概括成以下几个步骤: 预处理文本:ChatGPT的输入文本需要进行预处理。 输入编码:ChatGPT将经过预…...

【STM32】GPIO控制LED(HAL库版)
STM32最新固件库v3.5/Libraries/CMSIS/CM3/DeviceSupport/ST/STM32F10x/system_stm32f10x.c 林何/STM32F103C8 - 码云 - 开源中国 (gitee.com) STM32最新固件库v3.5/Libraries/STM32F10x_StdPeriph_Driver/src/stm32f10x_gpio.c 林何/STM32F103C8 - 码云 - 开源中国 (gitee.…...
树莓派超全系列教程文档--(62)使用rpicam-app通过网络流式传输视频
使用rpicam-app通过网络流式传输视频 使用 rpicam-app 通过网络流式传输视频UDPTCPRTSPlibavGStreamerRTPlibcamerasrc GStreamer 元素 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 使用 rpicam-app 通过网络流式传输视频 本节介绍来自 rpica…...

【WiFi帧结构】
文章目录 帧结构MAC头部管理帧 帧结构 Wi-Fi的帧分为三部分组成:MAC头部frame bodyFCS,其中MAC是固定格式的,frame body是可变长度。 MAC头部有frame control,duration,address1,address2,addre…...
PHP和Node.js哪个更爽?
先说结论,rust完胜。 php:laravel,swoole,webman,最开始在苏宁的时候写了几年php,当时觉得php真的是世界上最好的语言,因为当初活在舒适圈里,不愿意跳出来,就好比当初活在…...
Go 语言接口详解
Go 语言接口详解 核心概念 接口定义 在 Go 语言中,接口是一种抽象类型,它定义了一组方法的集合: // 定义接口 type Shape interface {Area() float64Perimeter() float64 } 接口实现 Go 接口的实现是隐式的: // 矩形结构体…...

学习STC51单片机31(芯片为STC89C52RCRC)OLED显示屏1
每日一言 生活的美好,总是藏在那些你咬牙坚持的日子里。 硬件:OLED 以后要用到OLED的时候找到这个文件 OLED的设备地址 SSD1306"SSD" 是品牌缩写,"1306" 是产品编号。 驱动 OLED 屏幕的 IIC 总线数据传输格式 示意图 …...
linux 下常用变更-8
1、删除普通用户 查询用户初始UID和GIDls -l /home/ ###家目录中查看UID cat /etc/group ###此文件查看GID删除用户1.编辑文件 /etc/passwd 找到对应的行,YW343:x:0:0::/home/YW343:/bin/bash 2.将标红的位置修改为用户对应初始UID和GID: YW3…...

使用 SymPy 进行向量和矩阵的高级操作
在科学计算和工程领域,向量和矩阵操作是解决问题的核心技能之一。Python 的 SymPy 库提供了强大的符号计算功能,能够高效地处理向量和矩阵的各种操作。本文将深入探讨如何使用 SymPy 进行向量和矩阵的创建、合并以及维度拓展等操作,并通过具体…...
Xen Server服务器释放磁盘空间
disk.sh #!/bin/bashcd /run/sr-mount/e54f0646-ae11-0457-b64f-eba4673b824c # 全部虚拟机物理磁盘文件存储 a$(ls -l | awk {print $NF} | cut -d. -f1) # 使用中的虚拟机物理磁盘文件 b$(xe vm-disk-list --multiple | grep uuid | awk {print $NF})printf "%s\n"…...

R语言速释制剂QBD解决方案之三
本文是《Quality by Design for ANDAs: An Example for Immediate-Release Dosage Forms》第一个处方的R语言解决方案。 第一个处方研究评估原料药粒径分布、MCC/Lactose比例、崩解剂用量对制剂CQAs的影响。 第二处方研究用于理解颗粒外加硬脂酸镁和滑石粉对片剂质量和可生产…...

Git 3天2K星标:Datawhale 的 Happy-LLM 项目介绍(附教程)
引言 在人工智能飞速发展的今天,大语言模型(Large Language Models, LLMs)已成为技术领域的焦点。从智能写作到代码生成,LLM 的应用场景不断扩展,深刻改变了我们的工作和生活方式。然而,理解这些模型的内部…...