当前位置: 首页 > news >正文

【开发篇】一、处理函数:定时器与定时服务

文章目录

  • 1、基本处理函数
  • 2、定时器和定时服务
  • 3、KeyedProcessFunction下演示定时器
  • 4、process重获取当前watermark

前面API篇完结,对数据的转换、聚合、窗口等,都是基于DataStream的,称DataStreamAPI,如图:
在这里插入图片描述

在Flink底层,可以不定义具体是什么算子,而只是一个统一的处理(process)操作,里面可以自定义逻辑。即图中底层的处理函数层。从下到上,封装越来越重,使用越来越简单。前面用的map等都是Flink封装好的,底层则是process。当现有的算子无法实现需求时,直接用process就行,最底层,最灵活,逻辑你自己开发就行,自定义处理逻辑!!!

1、基本处理函数

处理函数的使用和前面的转换算子一样,基于DataStream对象调用即可:

stream.process(new MyProcessFunction())
  • ProcessFunction不是接口,而是一个抽象类,继承了AbstractRichFunction

  • ProcessFunction的两个泛型:I表示Input,是输入的数据类型;O即Output,是处理完成之后输出的数据类型

  • ProcessFunction抽象类有抽象方法processElement须重写,以及非抽象方法onTimer

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {...public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}...}

抽象方法 processElement:

  • 定义处理元素的逻辑
  • 流中的每个元素都会调用一次这个房啊
  • 三个形参分别为:流中数据value自身、上下文对象ctx获取相关信息、收集器out往下游发处理完的数据

非抽象方法onTimer:

  • 定时器触发时调用这个方法
  • 注册定时器即设一个闹钟,onTimer则是闹钟响了以后要做的事
  • onTimer是基于时间线的一个回调方法
  • onTimer的三个形参分别为:时间戳(timestamp),上下文(ctx),以及收集器(out)

最初的DataStream流在经过不同的操作后会得到不同类型的流,比如keyBy后的KeyedStream,window后的WindowedStream。对于这些不同类型的流,其实都可以直接调用.process()方法进行自定义处理,不过process重载,传参是不同类型的ProcessFunction

关于处理函数的分类:

  • 在什么情况下调用process方法,就传入一个什么类型的ProcessFunction
  • 具体类型,在process下Ctrl+P查看传参提示就行,比如DataStream下传ProcessFunction,按键分区后得到KeyedStream传KeyedProcessFunction

2、定时器和定时服务

ProcessFunction的上下文对象Context有timerService()方法,可返回一个TimerService对象。TimerService是Flink实现定时功能的关键。其常用方法:

  • 获取当前的处理时间
long currentProcessingTime();
  • 获取当前的水位线(事件时间)
long currentWatermark();
  • 注册处理时间定时器,当处理时间超过time时触发
void registerProcessingTimeTimer(long time);
  • 注册事件时间定时器,当水位线超过time时触发
void registerEventTimeTimer(long time);
  • 删除触发时间为time的处理时间定时器
void deleteProcessingTimeTimer(long time);
  • 删除触发时间为time的处理时间定时器
void deleteEventTimeTimer(long time);

注意:

  • 只有在KeyedStream中才支持使用TimerService设置定时器的操作
  • TimerService会以键(key)和时间戳为标准,对定时器进行去重,即同样的key和时间戳下,定时器只会留一个,触发时onTimer只被调用一次

3、KeyedProcessFunction下演示定时器

事件时间下的定时器演示:定义一个5s的定时器,在水位线时间到达5s时触发

public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))  //乱序默认的水位生成器.withTimestampAssigner((element, ts) -> element.getTs() * 1000L)  //时间戳提取);KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// TODO Process:keyedSingleOutputStreamOperator<String> process = sensorKS.process(new KeyedProcessFunction<String, WaterSensor, String>() {/*** 来一条数据调用一次* @param value  每条数据* @param ctx 上下文对象* @param out 采集器* @throws Exception*/@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {//获取当前数据的keyString currentKey = ctx.getCurrentKey();// 获取定时器服务对象TimerService timerService = ctx.timerService();// 数据中提取出来的事件时间Long currentEventTime = ctx.timestamp(); //注册定时任务,水位线被推到5s时触发timerService.registerEventTimeTimer(5000L);System.out.println("当前key=" + currentKey + ",当前时间=" + currentEventTime + ",注册了一个5s的定时器");/*** 时间进展到定时器注册的时间,调用该方法* @param timestamp 当前时间进展,就是定时器被触发时的时间* @param ctx       上下文* @param out       采集器* @throws Exception*/@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);String currentKey = ctx.getCurrentKey();System.out.println("key=" + currentKey + "现在时间是" + timestamp + "定时器触发");}});process.print();env.execute();}
}

运行:注意时间戳8s时水位线为8s-3s-1ms < 5s,即当前最大事件时间 - 等待延迟时间 - 1ms,因此未触发,且同一个key,同一个定时时间,只有一个定时器生效:

在这里插入图片描述

看下不同key的效果,注意,水位线是多少和key没关系,s1,9,9进去,直接水位线变成9-3-1ms > 5s,三个定时器都触发

在这里插入图片描述

再用处理时间下的定时器:

public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {//...重复代码略,同上KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// TODO Process:keyedSingleOutputStreamOperator<String> process = sensorKS.process(new KeyedProcessFunction<String, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {//获取当前数据的keyString currentKey = ctx.getCurrentKey();TimerService timerService = ctx.timerService();//当前数据的处理时间long currentTs = timerService.currentProcessingTime();//定时器不用水位线为标杆,直接处理时间加5stimerService.registerProcessingTimeTimer(currentTs + 5000L);System.out.println("当前key=" + currentKey + ",当前时间=" + currentTs + ",注册了一个5s后的定时器");}//...重复代码略,同上
}

运行:

在这里插入图片描述

4、process重获取当前watermark

还是用上面的socket流,但process逻辑不玩定时器,验证下watermark:

//...重复代码略,同上@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {     // 获取 process的 当前watermarklong currentWatermark = timerService.currentWatermark();System.out.println("当前数据=" + value + ",当前watermark=" + currentWatermark);}

此时可以看到,s1,1,1进去,水位线本应为1000ms-3000ms-1ms = -2001,但通过timerService获取到的却是起始值,就那个Long.MIN,直到s1,5进去,才获取到-2001,依次往下,都差一个

在这里插入图片描述

在process重获取当前的watermark,显示的是上一次的watermark,因为process还没接收到这条数据对应的生成的新的watermark。关键点:watermark也是一个数据,要跟着流中对应的那个数据往下游流。

在这里插入图片描述
在这里插入图片描述

上图示意了为什么s5,5获取到的水位线为-2001,因为此时process还没接收到这条数据对应的生成的新的watermark(1999还在process框外,框内只有一个-2001)

相关文章:

【开发篇】一、处理函数:定时器与定时服务

文章目录 1、基本处理函数2、定时器和定时服务3、KeyedProcessFunction下演示定时器4、process重获取当前watermark 前面API篇完结&#xff0c;对数据的转换、聚合、窗口等&#xff0c;都是基于DataStream的&#xff0c;称DataStreamAPI&#xff0c;如图&#xff1a; 在Flink…...

重入漏洞EtherStore

重入漏洞 // SPDX-License-Identifier: MIT pragma solidity ^0.8.13;contract EtherStore {mapping(address > uint) public balances;function deposit() public payable {balances[msg.sender] msg.value;}function withdraw() public {uint bal balances[msg.sender]…...

账号运营的底层逻辑---获客思维

什么是运营&#xff1f; 运营是做什么的&#xff1f; 什么是内容运营&#xff1f; 什么是活动运营&#xff1f; 一篇带你搞清楚所有的底层逻辑&#xff01;...

Pinia中如何实现数据持久化操作

使用vue3中的pinia&#xff0c;我们可以在多个页面间共享数据&#xff0c;但是一旦我们关闭或刷新页面&#xff0c;这些数据就会丢失&#xff0c;因此&#xff0c;我们需要有一种数据持久化的解决方案。在记录vue3 使用vue3中的pinia&#xff0c;我们可以在多个页面间共享数据&…...

【owt-server】RTC视频接收调用流程学习笔记1: Call::CreateVideoReceiveStream 前后

WebRTC源码分析——Call模块 大神提到,call模块是在worker线程创建的。主要创建接收、发送流Call模块是WebRTC会话中不可缺少的一个模块,一个Call对象可以包含多个发送/接收流,且这些流对应同一个远端端点,并共享码率估计。 call中通过webrtc::VideoReceiveStream::Config …...

淘宝商品链接获取淘宝商品评论数据(用 Python实现淘宝商品评论信息抓取)

在网页抓取方面&#xff0c;可以使用 Python、Java 等编程语言编写程序&#xff0c;通过模拟 HTTP 请求&#xff0c;获取淘宝多网站上的商品详情页面评论内容。在数据提取方面&#xff0c;可以使用正则表达式、XPath 等方式从 HTML 代码中提取出有用的信息。值得注意的是&#…...

十九、类型信息(1)

本章概要 为什么需要 RTTI RTTI&#xff08;RunTime Type Information&#xff0c;运行时类型信息&#xff09;能够在程序运行时发现和使用类型信息 RTTI 把我们从只能在编译期进行面向类型操作的禁锢中解脱了出来&#xff0c;并且让我们可以使用某些非常强大的程序。对 RTTI …...

十八、字符串(3)

本章概要 正则表达式 基础创建正则表达式量词CharSequencePattern 和 Matcherfinde()组&#xff08;Groups&#xff09;start() 和 end()Pattern 标记split()替换操作reset()正则表达式与 Java I/0 正则表达式 很久之前&#xff0c;_正则表达式_就已经整合到标准 Unix 工具…...

基于SSM的酒店预约及管理系统设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;Vue 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是 目录…...

MIxformerV2的onnx和tensorrt加速

MIxformerV2的onnx和tensorrt加速 注意事项 地址&#xff1a;github地址 注意事项 转换成onnx模型之前&#xff0c;最好现简化算法的源代码&#xff0c;使其结构干净。因为在进行onnx转换后&#xff0c;可能在进行onnx→trt时算子不匹配&#xff0c;这时就需要去查看模型的源…...

Kotlin 中let 、run 、with、apply、also的用法与区别

实例代码 User(val userName:String,val age:Int){fun printName(){println(userName)}fun getUserName():String{return userName}} let 函数 let 函数常用来与对象的空判断一起用&#xff0c;起到作用于的限定效果。let 函数最后一行返回值。&#xff08;比如实例需要let函…...

PHP函数的定义与最简单后门原理

PHP函数的定义与最简单后门原理 文章目录 PHP函数的定义与最简单后门原理函数的定义函数调用的过程变量的范围局部变量全局变量 可变函数动态函数 PHP 最简单后门原理分析 函数的定义 使用function关键字来定义一个函数定义函数的函数名避开关键字形式参数是传递映射的实际参数…...

PlantSimulation访问本地Excel文件的方法

PlantSimulation访问本地Excel文件的方法 PlantSimulation访问本地Excel文件的方法PlantSimulation访问本地Excel文件的方法 //Param StatusTable,T_DataTable:object var T_DataTable:object:=DataTable IF NOT isComputerAccessPermittedMESSageBox("计算机访问被阻止,…...

使用微PE工具箱制作winU盘启动盘~重装系统

1.准备一个大于8G的U盘&#xff0c;为了保证传输和安装速度请确保U盘的质量。 2.鼠标右键点击U盘&#xff0c;进行格式化&#xff1a; 3.下载微PE工具箱&#xff1a; 微PE工具箱 - 下载 4.安装微PE工具箱&#xff1a;选择安装到U盘 5.选择U盘后&#xff0c;开始安装&#xf…...

漏洞复现-jquery-picture-cut 任意文件上传_(CVE-2018-9208)

jquery-picture-cut 任意文件上传_&#xff08;CVE-2018-9208&#xff09; 漏洞信息 jQuery Picture Cut v1.1以下版本中存在安全漏洞CVE-2018-9208文件上传漏洞 描述 ​ picture cut是一个jquery插件&#xff0c;以友好和简单的方式处理图像&#xff0c;具有基于bootstrap…...

Golang Websocket框架:实时通信的新选择

前言 在现代应用程序中&#xff0c;实时通信已经成为了一种必需的特性。而Websocket是一种在客户端和服务器之间建立持久连接的协议&#xff0c;可以实现实时的双向通信。Golang作为一门高效且简洁的语言&#xff0c;也提供了一些优秀的Websocket框架&#xff0c;方便开发者构…...

ExoPlayer架构详解与源码分析(7)——SampleQueue

系列文章目录 ExoPlayer架构详解与源码分析&#xff08;1&#xff09;——前言 ExoPlayer架构详解与源码分析&#xff08;2&#xff09;——Player ExoPlayer架构详解与源码分析&#xff08;3&#xff09;——Timeline ExoPlayer架构详解与源码分析&#xff08;4&#xff09;—…...

第二证券:基本面改善预期强化 机构聚焦科技成长

沪指日前迎来“三连涨”。10月26日&#xff0c;上证指数、深证成指和创业板指全部收红&#xff0c;分别收涨0.48%、0.40%、0.65%。此前的两个交易日&#xff0c;上证指数、深证成指也均收涨&#xff0c;创业板指24日涨幅也达到了0.85%。 从近期密布发布的策略报告来看&#xf…...

大语言模型在天猫AI导购助理项目的实践!

本文主要介绍了Prompt设计、大语言模型SFT和LLM在手机天猫AI导购助理项目应用。 ChatGPT基本原理 “会说话的AI”&#xff0c;“智能体” 简单概括成以下几个步骤&#xff1a; 预处理文本&#xff1a;ChatGPT的输入文本需要进行预处理。 输入编码&#xff1a;ChatGPT将经过预…...

【STM32】GPIO控制LED(HAL库版)

STM32最新固件库v3.5/Libraries/CMSIS/CM3/DeviceSupport/ST/STM32F10x/system_stm32f10x.c 林何/STM32F103C8 - 码云 - 开源中国 (gitee.com) STM32最新固件库v3.5/Libraries/STM32F10x_StdPeriph_Driver/src/stm32f10x_gpio.c 林何/STM32F103C8 - 码云 - 开源中国 (gitee.…...

终极指南:让macOS Finder视频预览功能焕发新生的QLVideo插件

终极指南&#xff1a;让macOS Finder视频预览功能焕发新生的QLVideo插件 【免费下载链接】QuickLookVideo This package allows macOS Finder to display thumbnails, static QuickLook previews, cover art and metadata for most types of video files. 项目地址: https://…...

快速验证dify部署方案:用快马生成环境检查与部署脚本原型

最近在折腾dify的本地部署&#xff0c;发现环境配置这块特别容易踩坑。作为一个开源AI应用开发平台&#xff0c;dify的部署涉及Python版本、Docker环境、端口占用等一系列依赖项检查&#xff0c;手动操作既繁琐又容易遗漏步骤。正好发现InsCode(快马)平台能快速生成这类工具的原…...

Windows 11 + RTX4060Ti 实战:用PyTorch复现Kaggle冠军的U-Net,搞定Kvasir息肉分割

Windows 11 RTX4060Ti 实战&#xff1a;用PyTorch复现Kaggle冠军的U-Net&#xff0c;搞定Kvasir息肉分割 在消费级硬件上实现专业级医学图像分割并非遥不可及。当RTX 40系列显卡遇上PyTorch框架&#xff0c;配合Kaggle冠军团队的U-Net架构&#xff0c;我们完全可以在Windows 1…...

Midscene.js:重塑UI自动化的革命性AI视觉驱动方案

Midscene.js&#xff1a;重塑UI自动化的革命性AI视觉驱动方案 【免费下载链接】midscene AI-powered, vision-driven UI automation for every platform. 项目地址: https://gitcode.com/GitHub_Trending/mid/midscene 你是否曾为编写复杂的UI自动化脚本而头疼&#xff…...

多任务学习调参新思路:如何让模型自己决定分类和回归任务谁更重要?

多任务学习中的自适应权重分配&#xff1a;让模型学会动态平衡分类与回归任务 想象一下&#xff0c;你正在训练一个自动驾驶系统&#xff0c;它需要同时完成车辆检测&#xff08;分类任务&#xff09;和深度估计&#xff08;回归任务&#xff09;。传统方法中&#xff0c;你需要…...

3个高效步骤掌握Godot PCK解析与资源提取技术

3个高效步骤掌握Godot PCK解析与资源提取技术 【免费下载链接】godot-unpacker godot .pck unpacker 项目地址: https://gitcode.com/gh_mirrors/go/godot-unpacker Godot引擎作为开源游戏开发框架的代表&#xff0c;其特有的PCK资源打包格式为游戏分发提供了便利&#…...

TOAST UI Chart缩放功能完全指南:如何快速实现数据深入探索

TOAST UI Chart缩放功能完全指南&#xff1a;如何快速实现数据深入探索 【免费下载链接】tui.chart &#x1f35e;&#x1f4ca; Beautiful chart for data visualization. 项目地址: https://gitcode.com/gh_mirrors/tu/tui.chart TOAST UI Chart是一款功能强大的数据可…...

用STM32和GP2Y1014AU0F做个空气质量检测仪(附完整代码和接线图)

基于STM32的空气质量检测仪实战开发指南 最近几年&#xff0c;随着人们对健康生活环境的关注度不断提升&#xff0c;空气质量监测设备正从专业领域走向大众消费市场。作为一名嵌入式开发爱好者&#xff0c;我发现市面上的商用检测仪要么价格昂贵&#xff0c;要么功能单一&#…...

HGTector2:微生物基因组水平基因转移检测的完整免费指南

HGTector2&#xff1a;微生物基因组水平基因转移检测的完整免费指南 【免费下载链接】HGTector HGTector2: Genome-wide prediction of horizontal gene transfer based on distribution of sequence homology patterns. 项目地址: https://gitcode.com/gh_mirrors/hg/HGTect…...

5分钟打造现代化Windows提示界面:ModernFlyouts彻底改变你的系统体验

5分钟打造现代化Windows提示界面&#xff1a;ModernFlyouts彻底改变你的系统体验 【免费下载链接】ModernFlyouts A modern Fluent Design replacement for the old Metro themed flyouts present in Windows. 项目地址: https://gitcode.com/gh_mirrors/mo/ModernFlyouts …...