flink的KeyedBroadcastProcessFunction测试
背景
我们经常需要对KeyedBroadcastProcessFunction函数进行单元测试,以确保上线之前这个函数的功能是正常的,包括里面的广播状态和键值分区状态
测试KeyedBroadcastProcessFunction类
@Testpublic void testHarnessForKeyedBroadcastProcessFunction() throws Exception {KeyedBroadcastProcessFunction<String, String, String, String> function = new MyKeyedBroadcastProcessFunction();// 键值分区状态final ValueStateDescriptor<String> valueStateDescriptor =new ValueStateDescriptor<>("item", BasicTypeInfo.STRING_TYPE_INFO);// 广播状态final MapStateDescriptor<String, String> ruleStateDescriptor = new MapStateDescriptor<>("RulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);KeyedBroadcastOperatorTestHarness<String, String, String, String> harness =ProcessFunctionTestHarnesses.forKeyedBroadcastProcessFunction(function, x -> x,TypeInformation.of(String.class), ruleStateDescriptor);harness.processBroadcastElement("0", 1);harness.processBroadcastElement("000", 2);harness.processElement("1", 10);// 判断键值分区状态(注意这里最好就只是某个key下面,也就是分组key直接设置为x->"固定常数值"即可)ValueState<String> valueState = function.getRuntimeContext().getState(valueStateDescriptor);Assert.assertEquals(valueState.value(), "1");// 判断广播状态BroadcastState<String, String> broadcastState = harness.getBroadcastState(ruleStateDescriptor);Assert.assertTrue(broadcastState.contains("0"));Assert.assertTrue(broadcastState.contains("000"));// 判断输出的列表Assert.assertEquals(harness.extractOutputValues(), Arrays.asList("0", "000", "1"));}
关键代码:
1.获取键值分区状态
ValueState<String> valueState = function.getRuntimeContext().getState(valueStateDescriptor);
2.获取广播状态:
BroadcastState<String, String> broadcastState = harness.getBroadcastState(ruleStateDescriptor);
3.工具类
public class ProcessFunctionTestHarnesses {public ProcessFunctionTestHarnesses() {}public static <IN, OUT> OneInputStreamOperatorTestHarness<IN, OUT> forProcessFunction(ProcessFunction<IN, OUT> function) throws Exception {OneInputStreamOperatorTestHarness<IN, OUT> testHarness = new OneInputStreamOperatorTestHarness(new ProcessOperator((ProcessFunction)Preconditions.checkNotNull(function)), 1, 1, 0);testHarness.setup();testHarness.open();return testHarness;}public static <K, IN, OUT> KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> forKeyedProcessFunction(KeyedProcessFunction<K, IN, OUT> function, KeySelector<IN, K> keySelector, TypeInformation<K> keyType) throws Exception {KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> testHarness = new KeyedOneInputStreamOperatorTestHarness(new KeyedProcessOperator((KeyedProcessFunction)Preconditions.checkNotNull(function)), keySelector, keyType, 1, 1, 0);testHarness.open();return testHarness;}public static <IN1, IN2, OUT> TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> forCoProcessFunction(CoProcessFunction<IN1, IN2, OUT> function) throws Exception {TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> testHarness = new TwoInputStreamOperatorTestHarness(new CoProcessOperator((CoProcessFunction)Preconditions.checkNotNull(function)), 1, 1, 0);testHarness.open();return testHarness;}public static <K, IN1, IN2, OUT> KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> forKeyedCoProcessFunction(KeyedCoProcessFunction<K, IN1, IN2, OUT> function, KeySelector<IN1, K> keySelector1, KeySelector<IN2, K> keySelector2, TypeInformation<K> keyType) throws Exception {KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> testHarness = new KeyedTwoInputStreamOperatorTestHarness(new KeyedCoProcessOperator((KeyedCoProcessFunction)Preconditions.checkNotNull(function)), keySelector1, keySelector2, keyType, 1, 1, 0);testHarness.open();return testHarness;}public static <IN1, IN2, OUT> BroadcastOperatorTestHarness<IN1, IN2, OUT> forBroadcastProcessFunction(BroadcastProcessFunction<IN1, IN2, OUT> function, MapStateDescriptor<?, ?>... descriptors) throws Exception {BroadcastOperatorTestHarness<IN1, IN2, OUT> testHarness = new BroadcastOperatorTestHarness(new CoBroadcastWithNonKeyedOperator((BroadcastProcessFunction)Preconditions.checkNotNull(function), Arrays.asList(descriptors)), 1, 1, 0);testHarness.open();return testHarness;}public static <K, IN1, IN2, OUT> KeyedBroadcastOperatorTestHarness<K, IN1, IN2, OUT> forKeyedBroadcastProcessFunction(KeyedBroadcastProcessFunction<K, IN1, IN2, OUT> function, KeySelector<IN1, K> keySelector, TypeInformation<K> keyType, MapStateDescriptor<?, ?>... descriptors) throws Exception {KeyedBroadcastOperatorTestHarness<K, IN1, IN2, OUT> testHarness = new KeyedBroadcastOperatorTestHarness(new CoBroadcastWithKeyedOperator((KeyedBroadcastProcessFunction)Preconditions.checkNotNull(function), Arrays.asList(descriptors)), keySelector, keyType, 1, 1, 0);testHarness.open();return testHarness;}
}
相关文章:
flink的KeyedBroadcastProcessFunction测试
背景 我们经常需要对KeyedBroadcastProcessFunction函数进行单元测试,以确保上线之前这个函数的功能是正常的,包括里面的广播状态和键值分区状态 测试KeyedBroadcastProcessFunction类 Testpublic void testHarnessForKeyedBroadcastProcessFunction()…...
【pytorch深度学习】torch-张量Tensor
torch-张量Tensor 文章目录 torch-张量Tensor1. 张量Tensor 1. 张量Tensor torch.tensor() # 创建一个标量(0维张量) scalar_tensor torch.tensor(3.14) # 创建一个向量(1维张量) vector_tensor torch.tensor([1, 2, 3]) # 创…...
odoo16前端框架源码阅读——rpc_service.js
odoo16前端框架源码阅读——rpc_service.js 先介绍点背景知识,这样方便阅读代码。 一、 JSONRPC的规范 https://www.jsonrpc.org/specification 中文翻译版本:https://wiki.geekdream.com/Specification/json-rpc_2.0.html JSON-RPC是一个无状态且轻…...

Nat. Med. | 成年人的城市生活环境对心理健康的影响
今天为大家介绍的是来自Jiayuan Xu和Gunter Schumann团队的一篇论文。城市居民暴露于许多可能相互结合和相互作用的环境因素,这些因素可能影响心理健康。目前尚未有工作尝试建模城市生活的复杂实际暴露与大脑和心理健康之间的关系,以及这如何受遗传因素调…...

stm32 WIFI模块_8266使用
使用以上配置可以正常回应,其中无论勾选或者不勾选DTR/RTS都可以得到正常回应 ATCWMODE?表示查询当前WiFi状态是处于热点模式(AP模式)或者是连接其他WiFi的那个模式。通过图片看出这个符号不能省略。 设置AP热点命令格式:ATCWSAP…...
【C/C++】malloc 或者 new 动态分配内存
1. malloc 是一个在 C 语言中用于动态分配内存的函数。 通过 malloc 函数,我们可以在程序运行时请求一定大小的内存块,然后将该内存块用于存储数据。 malloc 函数的声明如下: void* malloc(size_t size);它接受一个参数 size,表…...

如果让你重新开始学 C/C++,你的学习路线会是怎么选择?
1. 第一阶段 学好 C 语言和 Linux 1.1 学好 C 语言 无论你是科班还是非科班,建议你一定要学好 C 语言,它应该作为你必须掌握好的语言。你要熟悉 C 语言的基本语法,包括: 顺序、条件、循环三大控制语句 C 中几大基元数据类型的用…...

PCL安装与使用
1 apt安装 ubuntu20.04及以上版本下可以直接通过apt方式安装pcl编译好的二进制文件,二进制安装的版本为1.10。 sudo apt update sudo apt install libpcl-dev 2 源码安装 在pcl的github上下载对应的版本进行安装: https://github.com/PointCloudLibrary/pcl/rel…...

力扣刷题-二叉树-对称二叉树
101 对称二叉树 给你一个二叉树的根节点 root , 检查它是否轴对称。 示例 1: 输入:root [1,2,2,3,4,4,3] 输出:true 示例 2: 输入:root [1,2,2,null,3,null,3] 输出:false 思路 我的思路…...
常见面试题-计算机网络相关
1.OSI 七层模型? OSI 七层模型:应用层、表示层、会话层、传输层、网络层、数据链路层、物理层 TCP/IP 五层模型:应用层、传输层、网络层、链路层、物理层 应用层 应用层是由网络应用程序使用的,是离用户最近的一层 应用层通过…...
leetcode做题笔记231. 2 的幂
给你一个整数 n,请你判断该整数是否是 2 的幂次方。如果是,返回 true ;否则,返回 false 。 如果存在一个整数 x 使得 n 2x ,则认为 n 是 2 的幂次方。 示例 1: 输入:n 1 输出:tr…...

AI主播“败走”双11,想用AI省成本的商家醒醒吧,程序员不必担心失业,发展空间依旧很大
目录 1 2 3 “AI人”并不算是新鲜事,随着AI的发展,AI主播也开始悄悄进入到直播间中。 持续无间断的直播、比人工费便宜等优势,让很多商家选择了AI主播。 AI主播到底好不好用?终于在今年“双11”现出了原形。 1 AI主播没火过半年…...

◢Django 自写分页与使用
目录 1、设置分页样式,并展示到浏览器 2、模拟页码 3、生成分页 4、数据显示 5、上一页下一页 6、数据库的数据分页 7、封装分页 8、使用封装好的分页 建立好app后,设置路径path(in2/,views.in2),视图def in2(request): ,HTML: in2.html…...

某城高速综合管控大数据大屏可视化【可视化项目案例-04】
🎉🎊🎉 你的技术旅程将在这里启航! 🚀🚀 本文选自专栏:可视化技术专栏100例 可视化技术专栏100例,包括但不限于大屏可视化、图表可视化等等。订阅专栏用户在文章底部可下载对应案例源码以供大家深入的学习研究。 🎓 每一个案例都会提供完整代码和详细的讲解,不…...
如何在Linux下进行文件查看
cat 文本内容显示到终端 head 查看文件开头 tail 查看文件结尾 常用参数 -f 文件内容更新后,显示信息同步更新 wc 统计文件内容信息...

OSG练习:模仿Ventsim制作三维矿井智能通风系统
1、效果 2、计划内容 1) 三维场景的加载显示;已实现 2)矿井巷道建模及纹理;已实现 3)矿井基础数据采集及修正;已实现 4)通风网络解算算法;已实现 5)通风设备及设施模型制作;未实现 6)风流模拟效果 ;进行中 7)火灾模拟效果;未实现 8)巷道属性查看栏;未实现 9)…...

【数据结构】非递归实现二叉树的前 + 中 + 后 + 层序遍历(听说面试会考?)
👦个人主页:Weraphael ✍🏻作者简介:目前学习C和算法 ✈️专栏:数据结构 🐋 希望大家多多支持,咱一起进步!😁 如果文章对你有帮助的话 欢迎 评论💬 点赞&…...
32 Feign性能优化
2.3.Feign使用优化 Feign底层发起http请求,依赖于其它的框架。其底层客户端实现包括: •URLConnection:默认实现,不支持连接池 •Apache HttpClient :支持连接池 •OKHttp:支持连接池 因此提高Feign的…...

星岛专栏|从Web3发展看金融与科技的融合之道
11月起,欧科云链与香港主流媒体星岛集团开设Web3.0安全技术专栏,该专栏主要面向香港从业者、交易机构、监管机构输出专业性的安全合规建议,旨在促进香港Web3.0行业向安全与合规发展。 出品|欧科云链研究院 自2016年首届香港金融…...
什么是网络爬虫?
网络爬虫是一种自动化程序,可以自动地浏览网站并从网站上抽取数据。APP数据抓取实际上也是运用了网络爬虫的技术,只不过抓取的对象不是网站上的信息,而是手机APP上的数据。下面详细介绍APP数据抓取的过程。 1、确定数据需求 首先需要明确要抓…...
Python|GIF 解析与构建(5):手搓截屏和帧率控制
目录 Python|GIF 解析与构建(5):手搓截屏和帧率控制 一、引言 二、技术实现:手搓截屏模块 2.1 核心原理 2.2 代码解析:ScreenshotData类 2.2.1 截图函数:capture_screen 三、技术实现&…...

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明
LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造,完美适配AGV和无人叉车。同时,集成以太网与语音合成技术,为各类高级系统(如MES、调度系统、库位管理、立库等)提供高效便捷的语音交互体验。 L…...

深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录
ASP.NET Core 是一个跨平台的开源框架,用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录,以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...
React 第五十五节 Router 中 useAsyncError的使用详解
前言 useAsyncError 是 React Router v6.4 引入的一个钩子,用于处理异步操作(如数据加载)中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误:捕获在 loader 或 action 中发生的异步错误替…...
FFmpeg 低延迟同屏方案
引言 在实时互动需求激增的当下,无论是在线教育中的师生同屏演示、远程办公的屏幕共享协作,还是游戏直播的画面实时传输,低延迟同屏已成为保障用户体验的核心指标。FFmpeg 作为一款功能强大的多媒体框架,凭借其灵活的编解码、数据…...
在四层代理中还原真实客户端ngx_stream_realip_module
一、模块原理与价值 PROXY Protocol 回溯 第三方负载均衡(如 HAProxy、AWS NLB、阿里 SLB)发起上游连接时,将真实客户端 IP/Port 写入 PROXY Protocol v1/v2 头。Stream 层接收到头部后,ngx_stream_realip_module 从中提取原始信息…...
Spring Boot+Neo4j知识图谱实战:3步搭建智能关系网络!
一、引言 在数据驱动的背景下,知识图谱凭借其高效的信息组织能力,正逐步成为各行业应用的关键技术。本文聚焦 Spring Boot与Neo4j图数据库的技术结合,探讨知识图谱开发的实现细节,帮助读者掌握该技术栈在实际项目中的落地方法。 …...
JS手写代码篇----使用Promise封装AJAX请求
15、使用Promise封装AJAX请求 promise就有reject和resolve了,就不必写成功和失败的回调函数了 const BASEURL ./手写ajax/test.jsonfunction promiseAjax() {return new Promise((resolve, reject) > {const xhr new XMLHttpRequest();xhr.open("get&quo…...

Linux nano命令的基本使用
参考资料 GNU nanoを使いこなすnano基础 目录 一. 简介二. 文件打开2.1 普通方式打开文件2.2 只读方式打开文件 三. 文件查看3.1 打开文件时,显示行号3.2 翻页查看 四. 文件编辑4.1 Ctrl K 复制 和 Ctrl U 粘贴4.2 Alt/Esc U 撤回 五. 文件保存与退出5.1 Ctrl …...

在 Spring Boot 中使用 JSP
jsp? 好多年没用了。重新整一下 还费了点时间,记录一下。 项目结构: pom: <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://ww…...