当前位置: 首页 > news >正文

flink对状态ttl进行单元测试

背景

在处理键值分区状态时,使用ttl设置过期时间是我们经常使用的,但是任何代码的修改都需要首先进行单元测试,本文就使用单元测试来验证一下状态ttl的设置是否正确

测试状态ttl超时的单元测试

首先看一下处理函数:

// 处理函数
public class MyStateProcessFunction extends KeyedProcessFunction<String, String, String> {// 键值分区状态ValueState<String> previousInput;@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<String>("previousInput", Types.STRING);// 状态ttl超时时间设置StateTtlConfig ttlConfig =StateTtlConfig.newBuilder(Time.minutes(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)// check 10 keys for every state access.cleanupIncrementally(10, false).build();stateDescriptor.enableTimeToLive(ttlConfig);previousInput = getRuntimeContext().getState(stateDescriptor);}@Overridepublic void processElement(String in, Context context, Collector<String> collector) throws Exception {context.timerService().registerProcessingTimeTimer(100);String out = (Objects.nonNull(previousInput.value()) ? previousInput.value() : "") + in;collector.collect(out);if (!in.contains("NotUpdate")) {// 为了模仿有访问状态,但是不更新状态,正常情况下业务逻辑是访问其他key组的其它state,而一直没有访问的key的状态会在超时时间到之后被清理掉previousInput.update(in);}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {if (Objects.nonNull(previousInput.value())) {out.collect(String.format("timer trigger %s", previousInput.value()));} else {out.collect(String.format("timer trigger state clear", previousInput.value()));}}}

单元测试代码:

/*** 测试状态处理函数,包含状态的ttl配置,以及ontimer方法**/
@Test
public void testKeyedStateProcessFunction() throws Exception {MyStateProcessFunction myStateProcessFunction = new MyStateProcessFunction();OneInputStreamOperatorTestHarness<String, String> testHarness =ProcessFunctionTestHarnesses.forKeyedProcessFunction(myStateProcessFunction, x -> "1", Types.STRING);testHarness.open();testHarness.processElement("hello", 10);// 注册了一个定时器,定时器100后过期Assert.assertEquals(1, testHarness.numProcessingTimeTimers());// 测试输出Assert.assertEquals(Lists.newArrayList("hello"), testHarness.extractOutputValues());ValueState<String> previousInput = myStateProcessFunction.getRuntimeContext().getState(new ValueStateDescriptor<>("previousInput", Types.STRING));// 查看下状态应该已经被设置Assert.assertEquals("hello", previousInput.value());testHarness.processElement("world", 10);// 再次测试输出Assert.assertEquals(Lists.newArrayList("hello", "helloworld"), testHarness.extractOutputValues());// 再次查看下状态应该已经被设置Assert.assertEquals("world", previousInput.value());// 设置时间为1分钟,让状态超时testHarness.setStateTtlProcessingTime(Time.minutes(1).toMilliseconds());// 触发下状态访问,这样flink就会清理,正常生产中不需要这一步,访问状态本来就一直在进行中,只是可能是其他key分组的状态testHarness.processElement("NotUpdate1", System.currentTimeMillis());// 查看下状态应该已经被清理Assert.assertNull(previousInput.value());// 设置让定时器过期,顺带确认下状态已经被清理testHarness.setProcessingTime(100);// 测试输出(包含两个输入+一个定时器的输出)Assert.assertEquals(Lists.newArrayList("hello", "helloworld", "NotUpdate1", "timer trigger state clear"),testHarness.extractOutputValues());testHarness.close();
}

测试代码中已经包含了详细的注解,我们实现自己的ttl单元测试时可以参考下

相关文章:

flink对状态ttl进行单元测试

背景 在处理键值分区状态时&#xff0c;使用ttl设置过期时间是我们经常使用的&#xff0c;但是任何代码的修改都需要首先进行单元测试&#xff0c;本文就使用单元测试来验证一下状态ttl的设置是否正确 测试状态ttl超时的单元测试 首先看一下处理函数&#xff1a; // 处理函…...

Mac电脑安装打印机驱动

1.在打印机背面找到型号&#xff0c;当想要安装的驱动在官网找不到时可直接搜索该系列&#xff1a;比如MF系列 2.安装完成后需要添加打印机 当打印机和电脑在同一个WiFi下的时候查找打印机IP&#xff0c;输入IP后可以查到对应的打印机&#xff0c;添加后即可使用...

C语言 每日一题 牛客网 11.13 Day17

找零 Z国的货币系统包含面值1元、4元、16元、64元共计4种硬币&#xff0c;以及面值1024元的纸币。 现在小Y使用1024元的纸币购买了一件价值为N(0 < N≤1024)的商品&#xff0c;请问最少他会收到多少硬币&#xff1f; 思路 运用if语句进行判断分类 代码实现 int main() {…...

python读取npy和dat文件信息

前言 python读取.dat 和 .npy 数据 Code import numpy as np def read_dat():print("read data .dat \n")path "./c1_input.dat"data np.fromfile(path, np.float16).reshape(4,38,800)print(fdata :{data}, data shape:{data.shape}, data dtype:{d…...

【Git】第四篇:基本操作(理解工作区、暂存区、版本库)

Git 工作区、暂存区和版本库 工作区&#xff1a;就是我们创建的本地仓库所在的目录暂存区&#xff1a; stage或index&#xff0c;一般放在.git(可隐藏文件)目录下的index文件&#xff08;.git/index&#xff09;中&#xff0c;所以我们把暂存区有时候也叫做索引&#xff08;in…...

Word转PDF简单示例,分别在windows和centos中完成转换

概述 本篇博客以简单的示例代码分别在Windows和Linux环境下完成Word转PDF的文档转换。 文章提供SpringBoot Vue3的示例代码。 文章为什么要分为Windows和Linux环境&#xff1f; 因为在如下提供的Windows后端示例代码中使用documents4j库做转换&#xff0c;此库需要调用命令行…...

推荐收藏!大模型算法工程师面试题来了(附答案)

自 ChatGPT 在去年 11 月底横空出世&#xff0c;大模型的风刮了整一年。 历经了百模大战、Llama 2 开源、GPTs 发布等一系列里程碑事件&#xff0c;将大模型技术推至无可争议的 C 位。基于大模型的研究与讨论&#xff0c;也让我们愈发接近这波技术浪潮的核心。 最近大模型相关…...

线程与进程

文章目录 什么是进程&#xff1f;什么是线程&#xff1f;线程、进程的区别多线程编程 什么是进程&#xff1f; 进程&#xff08;Process&#xff09;是计算机中的程序关于数据集合上的一次运行活动&#xff0c;是系统进行资源分配和调度的基本单位。简单来说&#xff0c;进程就…...

SparkSQL之Analyzed LogicalPlan生成过程

经过AstBuilder的处理&#xff0c;得到了Unresolved LogicalPlan。该逻辑算子树中未被解析的有UnresolvedRelation和UnresolvedAttribute两种对象。Analyzer所起到的主要作用就是将这两种节点或表达式解析成有类型的&#xff08;Typed&#xff09;对象。在此过程中&#xff0c;…...

Vue的状态管理有哪些?

在Vue中&#xff0c;有多种方式可以进行状态管理&#xff0c;以下是一些常见的Vue状态管理解决方案&#xff1a; 1&#xff1a;Vuex&#xff1a; Vuex是Vue官方提供的状态管理库&#xff0c;用于管理Vue应用程序中的状态。Vuex使用一个单一的全局状态树&#xff08;state tre…...

1000道精心打磨的计算机考研题,408小伙伴不可错过

难度高&#xff01; 知识点多&#xff01; 复习时间短&#xff01; 不要怕&#xff0c;计算机考研1000题来了&#xff01; 不是数学考研1000题&#xff01; 也不是政治考研1000题&#xff01; 而是专属计算机考研小伙伴的超精选1000题&#xff01; 计算机考研专业课需要大…...

Flink SQL 表值聚合函数(Table Aggregate Function)详解

使用场景&#xff1a; 表值聚合函数即 UDTAF&#xff0c;这个函数⽬前只能在 Table API 中使⽤&#xff0c;不能在 SQL API 中使⽤。 函数功能&#xff1a; 在 SQL 表达式中&#xff0c;如果想对数据先分组再进⾏聚合取值&#xff1a; select max(xxx) from source_table gr…...

pgsql_全文检索_使用空间换时间的方法支持中文搜索

pgsql_全文检索_使用空间换时间的方法支持中文搜索 一、环境 PostgreSQL 14.2, compiled by Visual C build 1914, 64-bit 二、引言 提到全文检索首先想到的就是ES(ElasticSearch)和Lucene&#xff0c;专业且强大。对于一些小众场景对于搜索要求不高&#xff0c;数据量也不…...

OpenGL_Learn10(颜色)

1. 颜色 我们在现实生活中看到某一物体的颜色并不是这个物体真正拥有的颜色&#xff0c;而是它所反射的(Reflected)颜色。换句话说&#xff0c;那些不能被物体所吸收(Absorb)的颜色&#xff08;被拒绝的颜色&#xff09;就是我们能够感知到的物体的颜色。例如&#xff0c;太阳光…...

使用Go语言抓取酒店价格数据的技术实现

目录 一、引言 二、准备工作 三、抓取数据 四、数据处理与存储 五、数据分析与可视化 六、结论与展望 一、引言 随着互联网的快速发展&#xff0c;酒店预订已经成为人们出行的重要环节。在选择酒店时&#xff0c;价格是消费者考虑的重要因素之一。因此&#xff0c;抓取酒…...

设计模式1

![在这里插入图片描述](https://img-blog.csdnimg.cn/c9fbecf1ae89436095885722380ea460.png)一、设计模式分类&#xff1a; 1、创建型模式&#xff1a;创建与使用分离&#xff0c;单例、原型、工厂、抽象、建造者。 2、结构型模式&#xff1a;用于描述如何将对象按某种更大的…...

数字人部署之VITS+Wav2lip数据流转处理问题

一、模型 VITS模型训练教程VITS-从零开始微调&#xff08;finetune&#xff09;训练并部署指南-支持本地云端 Wav2lip是2D数字人&#xff0c;可参考训练嘴型同步模型Wav2Lip PS:以上模型都是开源可用。 二. VITS数据处理问题 VITS模型的输出为一维的numpy类型数据&#xff…...

RK3568笔记五:基于Yolov5的训练及部署

若该文为原创文章&#xff0c;转载请注明原文出处。 一. 部署概述 环境&#xff1a;Ubuntu20.04、python3.8 芯片&#xff1a;RK3568 芯片系统&#xff1a;buildroot 开发板&#xff1a;ATK-DLRK3568 开发主要参考文档&#xff1a;《Rockchip_Quick_Start_RKNN_Toolkit2_C…...

VR虚拟现实:VR技术如何进行原型制作

VR虚拟现实原型制作 利用VR虚拟现实软件进行原型制作可以用于增强原型测试期间的沉浸感&#xff0c;减少产品设计迭代次数&#xff0c;并将与产品原型制作相关的成本降低40-65%。 VR虚拟现实原型制作市场规模 用于原型制作的虚拟现实 (VR) 市场在 2017 年估计为 2.104 亿美元…...

51单片机入门

一、单片机以及开发板介绍 写在前面&#xff1a;本文为作者自学笔记&#xff0c;课程为哔哩哔哩江协科技51单片机入门教程&#xff0c;感兴趣可以看看&#xff0c;适合普中A2开发板或者HC6800-ESV2.0江协科技课程所用开发板。 工具安装请另行搜索&#xff0c;这里不做介绍&…...

新手福音:在快马平台用clawhub编写你的第一个爬虫程序

作为一个刚接触爬虫开发的新手&#xff0c;最近在尝试用clawhub框架写第一个爬虫程序时&#xff0c;发现这个框架对初学者特别友好。特别是在InsCode(快马)平台上&#xff0c;通过简单的描述就能生成结构清晰的示例代码&#xff0c;大大降低了学习门槛。下面分享下我的学习过程…...

微信好友关系智能管理:告别单向社交,重建健康社交网络

微信好友关系智能管理&#xff1a;告别单向社交&#xff0c;重建健康社交网络 【免费下载链接】WechatRealFriends 微信好友关系一键检测&#xff0c;基于微信ipad协议&#xff0c;看看有没有朋友偷偷删掉或者拉黑你 项目地址: https://gitcode.com/gh_mirrors/we/WechatReal…...

AI辅助开发:让快马AI生成能自适应Instagram页面改版的下载脚本

最近在做一个Instagram媒体下载工具时&#xff0c;遇到了一个很头疼的问题&#xff1a;每次Instagram更新页面结构&#xff0c;我的脚本就会失效。后来尝试用InsCode(快马)平台的AI辅助功能&#xff0c;发现可以很好地解决这个问题。今天就来分享一下如何利用AI生成一个能自适应…...

AI赋能开发:让快马平台智能优化与扩展你的openclaw101.dev工具库

AI赋能开发&#xff1a;让快马平台智能优化与扩展你的openclaw101.dev工具库 最近在维护openclaw101.dev项目时&#xff0c;我发现工具函数库需要一次全面的优化升级。传统手动修改不仅耗时&#xff0c;还容易遗漏潜在优化点。这次尝试用InsCode(快马)平台的AI辅助开发功能&am…...

WindowResizer完整指南:如何突破Windows窗口限制自由调整大小

WindowResizer完整指南&#xff1a;如何突破Windows窗口限制自由调整大小 【免费下载链接】WindowResizer 一个可以强制调整应用程序窗口大小的工具 项目地址: https://gitcode.com/gh_mirrors/wi/WindowResizer 你是否曾经遇到过某些应用程序窗口无法调整大小的困扰&am…...

通义千问1.8B-Chat应用案例:智能客服问答系统搭建实战

通义千问1.8B-Chat应用案例&#xff1a;智能客服问答系统搭建实战 1. 引言&#xff1a;用轻量模型解决真实业务问题 如果你正在为搭建一个智能客服系统而烦恼&#xff0c;觉得大模型成本太高、部署太复杂&#xff0c;那么这篇文章就是为你准备的。今天&#xff0c;我要分享一…...

3步彻底解决魔兽争霸3帧率限制:开源插件实战指南

3步彻底解决魔兽争霸3帧率限制&#xff1a;开源插件实战指南 【免费下载链接】WarcraftHelper Warcraft III Helper , support 1.20e, 1.24e, 1.26a, 1.27a, 1.27b 项目地址: https://gitcode.com/gh_mirrors/wa/WarcraftHelper 还在为魔兽争霸3的60FPS帧率限制而烦恼吗…...

【源码深度】Android View绘制流程全解析|吃透measure、layout、draw三大流程与UI卡顿优化|Android全栈体系150讲-10

...

基于视觉AI的智能游戏助手:鸣潮自动化工具全攻略

基于视觉AI的智能游戏助手&#xff1a;鸣潮自动化工具全攻略 【免费下载链接】ok-wuthering-waves 鸣潮 后台自动战斗 自动刷声骸 一键日常 Automation for Wuthering Waves 项目地址: https://gitcode.com/GitHub_Trending/ok/ok-wuthering-waves ok-wuthering-waves是…...

019、无监督学习:聚类分析与降维技术(K-Means, PCA)

上周排查一个嵌入式设备的内存泄漏问题&#xff0c;dump出来的堆内存数据有十几万条记录&#xff0c;肉眼根本看不出规律。后来把每条内存分配记录抽象成&#xff08;分配大小、存活时间、调用栈哈希&#xff09;三个特征&#xff0c;扔进K-Means里跑了三分钟&#xff0c;五个聚…...