flink对状态ttl进行单元测试
背景
在处理键值分区状态时,使用ttl设置过期时间是我们经常使用的,但是任何代码的修改都需要首先进行单元测试,本文就使用单元测试来验证一下状态ttl的设置是否正确
测试状态ttl超时的单元测试
首先看一下处理函数:
// 处理函数
public class MyStateProcessFunction extends KeyedProcessFunction<String, String, String> {// 键值分区状态ValueState<String> previousInput;@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<String>("previousInput", Types.STRING);// 状态ttl超时时间设置StateTtlConfig ttlConfig =StateTtlConfig.newBuilder(Time.minutes(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)// check 10 keys for every state access.cleanupIncrementally(10, false).build();stateDescriptor.enableTimeToLive(ttlConfig);previousInput = getRuntimeContext().getState(stateDescriptor);}@Overridepublic void processElement(String in, Context context, Collector<String> collector) throws Exception {context.timerService().registerProcessingTimeTimer(100);String out = (Objects.nonNull(previousInput.value()) ? previousInput.value() : "") + in;collector.collect(out);if (!in.contains("NotUpdate")) {// 为了模仿有访问状态,但是不更新状态,正常情况下业务逻辑是访问其他key组的其它state,而一直没有访问的key的状态会在超时时间到之后被清理掉previousInput.update(in);}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {if (Objects.nonNull(previousInput.value())) {out.collect(String.format("timer trigger %s", previousInput.value()));} else {out.collect(String.format("timer trigger state clear", previousInput.value()));}}}
单元测试代码:
/*** 测试状态处理函数,包含状态的ttl配置,以及ontimer方法**/
@Test
public void testKeyedStateProcessFunction() throws Exception {MyStateProcessFunction myStateProcessFunction = new MyStateProcessFunction();OneInputStreamOperatorTestHarness<String, String> testHarness =ProcessFunctionTestHarnesses.forKeyedProcessFunction(myStateProcessFunction, x -> "1", Types.STRING);testHarness.open();testHarness.processElement("hello", 10);// 注册了一个定时器,定时器100后过期Assert.assertEquals(1, testHarness.numProcessingTimeTimers());// 测试输出Assert.assertEquals(Lists.newArrayList("hello"), testHarness.extractOutputValues());ValueState<String> previousInput = myStateProcessFunction.getRuntimeContext().getState(new ValueStateDescriptor<>("previousInput", Types.STRING));// 查看下状态应该已经被设置Assert.assertEquals("hello", previousInput.value());testHarness.processElement("world", 10);// 再次测试输出Assert.assertEquals(Lists.newArrayList("hello", "helloworld"), testHarness.extractOutputValues());// 再次查看下状态应该已经被设置Assert.assertEquals("world", previousInput.value());// 设置时间为1分钟,让状态超时testHarness.setStateTtlProcessingTime(Time.minutes(1).toMilliseconds());// 触发下状态访问,这样flink就会清理,正常生产中不需要这一步,访问状态本来就一直在进行中,只是可能是其他key分组的状态testHarness.processElement("NotUpdate1", System.currentTimeMillis());// 查看下状态应该已经被清理Assert.assertNull(previousInput.value());// 设置让定时器过期,顺带确认下状态已经被清理testHarness.setProcessingTime(100);// 测试输出(包含两个输入+一个定时器的输出)Assert.assertEquals(Lists.newArrayList("hello", "helloworld", "NotUpdate1", "timer trigger state clear"),testHarness.extractOutputValues());testHarness.close();
}
测试代码中已经包含了详细的注解,我们实现自己的ttl单元测试时可以参考下
相关文章:
flink对状态ttl进行单元测试
背景 在处理键值分区状态时,使用ttl设置过期时间是我们经常使用的,但是任何代码的修改都需要首先进行单元测试,本文就使用单元测试来验证一下状态ttl的设置是否正确 测试状态ttl超时的单元测试 首先看一下处理函数: // 处理函…...

Mac电脑安装打印机驱动
1.在打印机背面找到型号,当想要安装的驱动在官网找不到时可直接搜索该系列:比如MF系列 2.安装完成后需要添加打印机 当打印机和电脑在同一个WiFi下的时候查找打印机IP,输入IP后可以查到对应的打印机,添加后即可使用...

C语言 每日一题 牛客网 11.13 Day17
找零 Z国的货币系统包含面值1元、4元、16元、64元共计4种硬币,以及面值1024元的纸币。 现在小Y使用1024元的纸币购买了一件价值为N(0 < N≤1024)的商品,请问最少他会收到多少硬币? 思路 运用if语句进行判断分类 代码实现 int main() {…...
python读取npy和dat文件信息
前言 python读取.dat 和 .npy 数据 Code import numpy as np def read_dat():print("read data .dat \n")path "./c1_input.dat"data np.fromfile(path, np.float16).reshape(4,38,800)print(fdata :{data}, data shape:{data.shape}, data dtype:{d…...

【Git】第四篇:基本操作(理解工作区、暂存区、版本库)
Git 工作区、暂存区和版本库 工作区:就是我们创建的本地仓库所在的目录暂存区: stage或index,一般放在.git(可隐藏文件)目录下的index文件(.git/index)中,所以我们把暂存区有时候也叫做索引(in…...

Word转PDF简单示例,分别在windows和centos中完成转换
概述 本篇博客以简单的示例代码分别在Windows和Linux环境下完成Word转PDF的文档转换。 文章提供SpringBoot Vue3的示例代码。 文章为什么要分为Windows和Linux环境? 因为在如下提供的Windows后端示例代码中使用documents4j库做转换,此库需要调用命令行…...
推荐收藏!大模型算法工程师面试题来了(附答案)
自 ChatGPT 在去年 11 月底横空出世,大模型的风刮了整一年。 历经了百模大战、Llama 2 开源、GPTs 发布等一系列里程碑事件,将大模型技术推至无可争议的 C 位。基于大模型的研究与讨论,也让我们愈发接近这波技术浪潮的核心。 最近大模型相关…...
线程与进程
文章目录 什么是进程?什么是线程?线程、进程的区别多线程编程 什么是进程? 进程(Process)是计算机中的程序关于数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位。简单来说,进程就…...

SparkSQL之Analyzed LogicalPlan生成过程
经过AstBuilder的处理,得到了Unresolved LogicalPlan。该逻辑算子树中未被解析的有UnresolvedRelation和UnresolvedAttribute两种对象。Analyzer所起到的主要作用就是将这两种节点或表达式解析成有类型的(Typed)对象。在此过程中,…...
Vue的状态管理有哪些?
在Vue中,有多种方式可以进行状态管理,以下是一些常见的Vue状态管理解决方案: 1:Vuex: Vuex是Vue官方提供的状态管理库,用于管理Vue应用程序中的状态。Vuex使用一个单一的全局状态树(state tre…...

1000道精心打磨的计算机考研题,408小伙伴不可错过
难度高! 知识点多! 复习时间短! 不要怕,计算机考研1000题来了! 不是数学考研1000题! 也不是政治考研1000题! 而是专属计算机考研小伙伴的超精选1000题! 计算机考研专业课需要大…...

Flink SQL 表值聚合函数(Table Aggregate Function)详解
使用场景: 表值聚合函数即 UDTAF,这个函数⽬前只能在 Table API 中使⽤,不能在 SQL API 中使⽤。 函数功能: 在 SQL 表达式中,如果想对数据先分组再进⾏聚合取值: select max(xxx) from source_table gr…...
pgsql_全文检索_使用空间换时间的方法支持中文搜索
pgsql_全文检索_使用空间换时间的方法支持中文搜索 一、环境 PostgreSQL 14.2, compiled by Visual C build 1914, 64-bit 二、引言 提到全文检索首先想到的就是ES(ElasticSearch)和Lucene,专业且强大。对于一些小众场景对于搜索要求不高,数据量也不…...

OpenGL_Learn10(颜色)
1. 颜色 我们在现实生活中看到某一物体的颜色并不是这个物体真正拥有的颜色,而是它所反射的(Reflected)颜色。换句话说,那些不能被物体所吸收(Absorb)的颜色(被拒绝的颜色)就是我们能够感知到的物体的颜色。例如,太阳光…...

使用Go语言抓取酒店价格数据的技术实现
目录 一、引言 二、准备工作 三、抓取数据 四、数据处理与存储 五、数据分析与可视化 六、结论与展望 一、引言 随着互联网的快速发展,酒店预订已经成为人们出行的重要环节。在选择酒店时,价格是消费者考虑的重要因素之一。因此,抓取酒…...

设计模式1
一、设计模式分类: 1、创建型模式:创建与使用分离,单例、原型、工厂、抽象、建造者。 2、结构型模式:用于描述如何将对象按某种更大的…...
数字人部署之VITS+Wav2lip数据流转处理问题
一、模型 VITS模型训练教程VITS-从零开始微调(finetune)训练并部署指南-支持本地云端 Wav2lip是2D数字人,可参考训练嘴型同步模型Wav2Lip PS:以上模型都是开源可用。 二. VITS数据处理问题 VITS模型的输出为一维的numpy类型数据ÿ…...

RK3568笔记五:基于Yolov5的训练及部署
若该文为原创文章,转载请注明原文出处。 一. 部署概述 环境:Ubuntu20.04、python3.8 芯片:RK3568 芯片系统:buildroot 开发板:ATK-DLRK3568 开发主要参考文档:《Rockchip_Quick_Start_RKNN_Toolkit2_C…...

VR虚拟现实:VR技术如何进行原型制作
VR虚拟现实原型制作 利用VR虚拟现实软件进行原型制作可以用于增强原型测试期间的沉浸感,减少产品设计迭代次数,并将与产品原型制作相关的成本降低40-65%。 VR虚拟现实原型制作市场规模 用于原型制作的虚拟现实 (VR) 市场在 2017 年估计为 2.104 亿美元…...

51单片机入门
一、单片机以及开发板介绍 写在前面:本文为作者自学笔记,课程为哔哩哔哩江协科技51单片机入门教程,感兴趣可以看看,适合普中A2开发板或者HC6800-ESV2.0江协科技课程所用开发板。 工具安装请另行搜索,这里不做介绍&…...

第19节 Node.js Express 框架
Express 是一个为Node.js设计的web开发框架,它基于nodejs平台。 Express 简介 Express是一个简洁而灵活的node.js Web应用框架, 提供了一系列强大特性帮助你创建各种Web应用,和丰富的HTTP工具。 使用Express可以快速地搭建一个完整功能的网站。 Expre…...

练习(含atoi的模拟实现,自定义类型等练习)
一、结构体大小的计算及位段 (结构体大小计算及位段 详解请看:自定义类型:结构体进阶-CSDN博客) 1.在32位系统环境,编译选项为4字节对齐,那么sizeof(A)和sizeof(B)是多少? #pragma pack(4)st…...

深入理解JavaScript设计模式之单例模式
目录 什么是单例模式为什么需要单例模式常见应用场景包括 单例模式实现透明单例模式实现不透明单例模式用代理实现单例模式javaScript中的单例模式使用命名空间使用闭包封装私有变量 惰性单例通用的惰性单例 结语 什么是单例模式 单例模式(Singleton Pattern&#…...
数据库分批入库
今天在工作中,遇到一个问题,就是分批查询的时候,由于批次过大导致出现了一些问题,一下是问题描述和解决方案: 示例: // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...
DeepSeek 技术赋能无人农场协同作业:用 AI 重构农田管理 “神经网”
目录 一、引言二、DeepSeek 技术大揭秘2.1 核心架构解析2.2 关键技术剖析 三、智能农业无人农场协同作业现状3.1 发展现状概述3.2 协同作业模式介绍 四、DeepSeek 的 “农场奇妙游”4.1 数据处理与分析4.2 作物生长监测与预测4.3 病虫害防治4.4 农机协同作业调度 五、实际案例大…...

GC1808高性能24位立体声音频ADC芯片解析
1. 芯片概述 GC1808是一款24位立体声音频模数转换器(ADC),支持8kHz~96kHz采样率,集成Δ-Σ调制器、数字抗混叠滤波器和高通滤波器,适用于高保真音频采集场景。 2. 核心特性 高精度:24位分辨率,…...
Fabric V2.5 通用溯源系统——增加图片上传与下载功能
fabric-trace项目在发布一年后,部署量已突破1000次,为支持更多场景,现新增支持图片信息上链,本文对图片上传、下载功能代码进行梳理,包含智能合约、后端、前端部分。 一、智能合约修改 为了增加图片信息上链溯源,需要对底层数据结构进行修改,在此对智能合约中的农产品数…...
Go 语言并发编程基础:无缓冲与有缓冲通道
在上一章节中,我们了解了 Channel 的基本用法。本章将重点分析 Go 中通道的两种类型 —— 无缓冲通道与有缓冲通道,它们在并发编程中各具特点和应用场景。 一、通道的基本分类 类型定义形式特点无缓冲通道make(chan T)发送和接收都必须准备好࿰…...

GitFlow 工作模式(详解)
今天再学项目的过程中遇到使用gitflow模式管理代码,因此进行学习并且发布关于gitflow的一些思考 Git与GitFlow模式 我们在写代码的时候通常会进行网上保存,无论是github还是gittee,都是一种基于git去保存代码的形式,这样保存代码…...

uniapp 开发ios, xcode 提交app store connect 和 testflight内测
uniapp 中配置 配置manifest 文档:manifest.json 应用配置 | uni-app官网 hbuilderx中本地打包 下载IOS最新SDK 开发环境 | uni小程序SDK hbulderx 版本号:4.66 对应的sdk版本 4.66 两者必须一致 本地打包的资源导入到SDK 导入资源 | uni小程序SDK …...