flink对状态ttl进行单元测试
背景
在处理键值分区状态时,使用ttl设置过期时间是我们经常使用的,但是任何代码的修改都需要首先进行单元测试,本文就使用单元测试来验证一下状态ttl的设置是否正确
测试状态ttl超时的单元测试
首先看一下处理函数:
// 处理函数
public class MyStateProcessFunction extends KeyedProcessFunction<String, String, String> {// 键值分区状态ValueState<String> previousInput;@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<String>("previousInput", Types.STRING);// 状态ttl超时时间设置StateTtlConfig ttlConfig =StateTtlConfig.newBuilder(Time.minutes(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)// check 10 keys for every state access.cleanupIncrementally(10, false).build();stateDescriptor.enableTimeToLive(ttlConfig);previousInput = getRuntimeContext().getState(stateDescriptor);}@Overridepublic void processElement(String in, Context context, Collector<String> collector) throws Exception {context.timerService().registerProcessingTimeTimer(100);String out = (Objects.nonNull(previousInput.value()) ? previousInput.value() : "") + in;collector.collect(out);if (!in.contains("NotUpdate")) {// 为了模仿有访问状态,但是不更新状态,正常情况下业务逻辑是访问其他key组的其它state,而一直没有访问的key的状态会在超时时间到之后被清理掉previousInput.update(in);}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {if (Objects.nonNull(previousInput.value())) {out.collect(String.format("timer trigger %s", previousInput.value()));} else {out.collect(String.format("timer trigger state clear", previousInput.value()));}}}
单元测试代码:
/*** 测试状态处理函数,包含状态的ttl配置,以及ontimer方法**/
@Test
public void testKeyedStateProcessFunction() throws Exception {MyStateProcessFunction myStateProcessFunction = new MyStateProcessFunction();OneInputStreamOperatorTestHarness<String, String> testHarness =ProcessFunctionTestHarnesses.forKeyedProcessFunction(myStateProcessFunction, x -> "1", Types.STRING);testHarness.open();testHarness.processElement("hello", 10);// 注册了一个定时器,定时器100后过期Assert.assertEquals(1, testHarness.numProcessingTimeTimers());// 测试输出Assert.assertEquals(Lists.newArrayList("hello"), testHarness.extractOutputValues());ValueState<String> previousInput = myStateProcessFunction.getRuntimeContext().getState(new ValueStateDescriptor<>("previousInput", Types.STRING));// 查看下状态应该已经被设置Assert.assertEquals("hello", previousInput.value());testHarness.processElement("world", 10);// 再次测试输出Assert.assertEquals(Lists.newArrayList("hello", "helloworld"), testHarness.extractOutputValues());// 再次查看下状态应该已经被设置Assert.assertEquals("world", previousInput.value());// 设置时间为1分钟,让状态超时testHarness.setStateTtlProcessingTime(Time.minutes(1).toMilliseconds());// 触发下状态访问,这样flink就会清理,正常生产中不需要这一步,访问状态本来就一直在进行中,只是可能是其他key分组的状态testHarness.processElement("NotUpdate1", System.currentTimeMillis());// 查看下状态应该已经被清理Assert.assertNull(previousInput.value());// 设置让定时器过期,顺带确认下状态已经被清理testHarness.setProcessingTime(100);// 测试输出(包含两个输入+一个定时器的输出)Assert.assertEquals(Lists.newArrayList("hello", "helloworld", "NotUpdate1", "timer trigger state clear"),testHarness.extractOutputValues());testHarness.close();
}
测试代码中已经包含了详细的注解,我们实现自己的ttl单元测试时可以参考下
相关文章:
flink对状态ttl进行单元测试
背景 在处理键值分区状态时,使用ttl设置过期时间是我们经常使用的,但是任何代码的修改都需要首先进行单元测试,本文就使用单元测试来验证一下状态ttl的设置是否正确 测试状态ttl超时的单元测试 首先看一下处理函数: // 处理函…...

Mac电脑安装打印机驱动
1.在打印机背面找到型号,当想要安装的驱动在官网找不到时可直接搜索该系列:比如MF系列 2.安装完成后需要添加打印机 当打印机和电脑在同一个WiFi下的时候查找打印机IP,输入IP后可以查到对应的打印机,添加后即可使用...

C语言 每日一题 牛客网 11.13 Day17
找零 Z国的货币系统包含面值1元、4元、16元、64元共计4种硬币,以及面值1024元的纸币。 现在小Y使用1024元的纸币购买了一件价值为N(0 < N≤1024)的商品,请问最少他会收到多少硬币? 思路 运用if语句进行判断分类 代码实现 int main() {…...
python读取npy和dat文件信息
前言 python读取.dat 和 .npy 数据 Code import numpy as np def read_dat():print("read data .dat \n")path "./c1_input.dat"data np.fromfile(path, np.float16).reshape(4,38,800)print(fdata :{data}, data shape:{data.shape}, data dtype:{d…...

【Git】第四篇:基本操作(理解工作区、暂存区、版本库)
Git 工作区、暂存区和版本库 工作区:就是我们创建的本地仓库所在的目录暂存区: stage或index,一般放在.git(可隐藏文件)目录下的index文件(.git/index)中,所以我们把暂存区有时候也叫做索引(in…...

Word转PDF简单示例,分别在windows和centos中完成转换
概述 本篇博客以简单的示例代码分别在Windows和Linux环境下完成Word转PDF的文档转换。 文章提供SpringBoot Vue3的示例代码。 文章为什么要分为Windows和Linux环境? 因为在如下提供的Windows后端示例代码中使用documents4j库做转换,此库需要调用命令行…...
推荐收藏!大模型算法工程师面试题来了(附答案)
自 ChatGPT 在去年 11 月底横空出世,大模型的风刮了整一年。 历经了百模大战、Llama 2 开源、GPTs 发布等一系列里程碑事件,将大模型技术推至无可争议的 C 位。基于大模型的研究与讨论,也让我们愈发接近这波技术浪潮的核心。 最近大模型相关…...
线程与进程
文章目录 什么是进程?什么是线程?线程、进程的区别多线程编程 什么是进程? 进程(Process)是计算机中的程序关于数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位。简单来说,进程就…...

SparkSQL之Analyzed LogicalPlan生成过程
经过AstBuilder的处理,得到了Unresolved LogicalPlan。该逻辑算子树中未被解析的有UnresolvedRelation和UnresolvedAttribute两种对象。Analyzer所起到的主要作用就是将这两种节点或表达式解析成有类型的(Typed)对象。在此过程中,…...
Vue的状态管理有哪些?
在Vue中,有多种方式可以进行状态管理,以下是一些常见的Vue状态管理解决方案: 1:Vuex: Vuex是Vue官方提供的状态管理库,用于管理Vue应用程序中的状态。Vuex使用一个单一的全局状态树(state tre…...

1000道精心打磨的计算机考研题,408小伙伴不可错过
难度高! 知识点多! 复习时间短! 不要怕,计算机考研1000题来了! 不是数学考研1000题! 也不是政治考研1000题! 而是专属计算机考研小伙伴的超精选1000题! 计算机考研专业课需要大…...

Flink SQL 表值聚合函数(Table Aggregate Function)详解
使用场景: 表值聚合函数即 UDTAF,这个函数⽬前只能在 Table API 中使⽤,不能在 SQL API 中使⽤。 函数功能: 在 SQL 表达式中,如果想对数据先分组再进⾏聚合取值: select max(xxx) from source_table gr…...
pgsql_全文检索_使用空间换时间的方法支持中文搜索
pgsql_全文检索_使用空间换时间的方法支持中文搜索 一、环境 PostgreSQL 14.2, compiled by Visual C build 1914, 64-bit 二、引言 提到全文检索首先想到的就是ES(ElasticSearch)和Lucene,专业且强大。对于一些小众场景对于搜索要求不高,数据量也不…...

OpenGL_Learn10(颜色)
1. 颜色 我们在现实生活中看到某一物体的颜色并不是这个物体真正拥有的颜色,而是它所反射的(Reflected)颜色。换句话说,那些不能被物体所吸收(Absorb)的颜色(被拒绝的颜色)就是我们能够感知到的物体的颜色。例如,太阳光…...

使用Go语言抓取酒店价格数据的技术实现
目录 一、引言 二、准备工作 三、抓取数据 四、数据处理与存储 五、数据分析与可视化 六、结论与展望 一、引言 随着互联网的快速发展,酒店预订已经成为人们出行的重要环节。在选择酒店时,价格是消费者考虑的重要因素之一。因此,抓取酒…...

设计模式1
一、设计模式分类: 1、创建型模式:创建与使用分离,单例、原型、工厂、抽象、建造者。 2、结构型模式:用于描述如何将对象按某种更大的…...
数字人部署之VITS+Wav2lip数据流转处理问题
一、模型 VITS模型训练教程VITS-从零开始微调(finetune)训练并部署指南-支持本地云端 Wav2lip是2D数字人,可参考训练嘴型同步模型Wav2Lip PS:以上模型都是开源可用。 二. VITS数据处理问题 VITS模型的输出为一维的numpy类型数据ÿ…...

RK3568笔记五:基于Yolov5的训练及部署
若该文为原创文章,转载请注明原文出处。 一. 部署概述 环境:Ubuntu20.04、python3.8 芯片:RK3568 芯片系统:buildroot 开发板:ATK-DLRK3568 开发主要参考文档:《Rockchip_Quick_Start_RKNN_Toolkit2_C…...

VR虚拟现实:VR技术如何进行原型制作
VR虚拟现实原型制作 利用VR虚拟现实软件进行原型制作可以用于增强原型测试期间的沉浸感,减少产品设计迭代次数,并将与产品原型制作相关的成本降低40-65%。 VR虚拟现实原型制作市场规模 用于原型制作的虚拟现实 (VR) 市场在 2017 年估计为 2.104 亿美元…...

51单片机入门
一、单片机以及开发板介绍 写在前面:本文为作者自学笔记,课程为哔哩哔哩江协科技51单片机入门教程,感兴趣可以看看,适合普中A2开发板或者HC6800-ESV2.0江协科技课程所用开发板。 工具安装请另行搜索,这里不做介绍&…...
多场景 OkHttpClient 管理器 - Android 网络通信解决方案
下面是一个完整的 Android 实现,展示如何创建和管理多个 OkHttpClient 实例,分别用于长连接、普通 HTTP 请求和文件下载场景。 <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas…...

【快手拥抱开源】通过快手团队开源的 KwaiCoder-AutoThink-preview 解锁大语言模型的潜力
引言: 在人工智能快速发展的浪潮中,快手Kwaipilot团队推出的 KwaiCoder-AutoThink-preview 具有里程碑意义——这是首个公开的AutoThink大语言模型(LLM)。该模型代表着该领域的重大突破,通过独特方式融合思考与非思考…...

Cloudflare 从 Nginx 到 Pingora:性能、效率与安全的全面升级
在互联网的快速发展中,高性能、高效率和高安全性的网络服务成为了各大互联网基础设施提供商的核心追求。Cloudflare 作为全球领先的互联网安全和基础设施公司,近期做出了一个重大技术决策:弃用长期使用的 Nginx,转而采用其内部开发…...
C++.OpenGL (10/64)基础光照(Basic Lighting)
基础光照(Basic Lighting) 冯氏光照模型(Phong Lighting Model) #mermaid-svg-GLdskXwWINxNGHso {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-GLdskXwWINxNGHso .error-icon{fill:#552222;}#mermaid-svg-GLd…...

学校时钟系统,标准考场时钟系统,AI亮相2025高考,赛思时钟系统为教育公平筑起“精准防线”
2025年#高考 将在近日拉开帷幕,#AI 监考一度冲上热搜。当AI深度融入高考,#时间同步 不再是辅助功能,而是决定AI监考系统成败的“生命线”。 AI亮相2025高考,40种异常行为0.5秒精准识别 2025年高考即将拉开帷幕,江西、…...
vue3 daterange正则踩坑
<el-form-item label"空置时间" prop"vacantTime"> <el-date-picker v-model"form.vacantTime" type"daterange" start-placeholder"开始日期" end-placeholder"结束日期" clearable :editable"fal…...
es6+和css3新增的特性有哪些
一:ECMAScript 新特性(ES6) ES6 (2015) - 革命性更新 1,记住的方法,从一个方法里面用到了哪些技术 1,let /const块级作用域声明2,**默认参数**:函数参数可以设置默认值。3&#x…...

论文阅读:Matting by Generation
今天介绍一篇关于 matting 抠图的文章,抠图也算是计算机视觉里面非常经典的一个任务了。从早期的经典算法到如今的深度学习算法,已经有很多的工作和这个任务相关。这两年 diffusion 模型很火,大家又开始用 diffusion 模型做各种 CV 任务了&am…...
WEB3全栈开发——面试专业技能点P7前端与链上集成
一、Next.js技术栈 ✅ 概念介绍 Next.js 是一个基于 React 的 服务端渲染(SSR)与静态网站生成(SSG) 框架,由 Vercel 开发。它简化了构建生产级 React 应用的过程,并内置了很多特性: ✅ 文件系…...
前端调试HTTP状态码
1xx(信息类状态码) 这类状态码表示临时响应,需要客户端继续处理请求。 100 Continue 服务器已收到请求的初始部分,客户端应继续发送剩余部分。 2xx(成功类状态码) 表示请求已成功被服务器接收、理解并处…...