flink的KeyedBroadcastProcessFunction测试
背景
我们经常需要对KeyedBroadcastProcessFunction函数进行单元测试,以确保上线之前这个函数的功能是正常的,包括里面的广播状态和键值分区状态
测试KeyedBroadcastProcessFunction类
@Testpublic void testHarnessForKeyedBroadcastProcessFunction() throws Exception {KeyedBroadcastProcessFunction<String, String, String, String> function = new MyKeyedBroadcastProcessFunction();// 键值分区状态final ValueStateDescriptor<String> valueStateDescriptor =new ValueStateDescriptor<>("item", BasicTypeInfo.STRING_TYPE_INFO);// 广播状态final MapStateDescriptor<String, String> ruleStateDescriptor = new MapStateDescriptor<>("RulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);KeyedBroadcastOperatorTestHarness<String, String, String, String> harness =ProcessFunctionTestHarnesses.forKeyedBroadcastProcessFunction(function, x -> x,TypeInformation.of(String.class), ruleStateDescriptor);harness.processBroadcastElement("0", 1);harness.processBroadcastElement("000", 2);harness.processElement("1", 10);// 判断键值分区状态(注意这里最好就只是某个key下面,也就是分组key直接设置为x->"固定常数值"即可)ValueState<String> valueState = function.getRuntimeContext().getState(valueStateDescriptor);Assert.assertEquals(valueState.value(), "1");// 判断广播状态BroadcastState<String, String> broadcastState = harness.getBroadcastState(ruleStateDescriptor);Assert.assertTrue(broadcastState.contains("0"));Assert.assertTrue(broadcastState.contains("000"));// 判断输出的列表Assert.assertEquals(harness.extractOutputValues(), Arrays.asList("0", "000", "1"));}
关键代码:
1.获取键值分区状态
ValueState<String> valueState = function.getRuntimeContext().getState(valueStateDescriptor);
2.获取广播状态:
BroadcastState<String, String> broadcastState = harness.getBroadcastState(ruleStateDescriptor);
3.工具类
public class ProcessFunctionTestHarnesses {public ProcessFunctionTestHarnesses() {}public static <IN, OUT> OneInputStreamOperatorTestHarness<IN, OUT> forProcessFunction(ProcessFunction<IN, OUT> function) throws Exception {OneInputStreamOperatorTestHarness<IN, OUT> testHarness = new OneInputStreamOperatorTestHarness(new ProcessOperator((ProcessFunction)Preconditions.checkNotNull(function)), 1, 1, 0);testHarness.setup();testHarness.open();return testHarness;}public static <K, IN, OUT> KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> forKeyedProcessFunction(KeyedProcessFunction<K, IN, OUT> function, KeySelector<IN, K> keySelector, TypeInformation<K> keyType) throws Exception {KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> testHarness = new KeyedOneInputStreamOperatorTestHarness(new KeyedProcessOperator((KeyedProcessFunction)Preconditions.checkNotNull(function)), keySelector, keyType, 1, 1, 0);testHarness.open();return testHarness;}public static <IN1, IN2, OUT> TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> forCoProcessFunction(CoProcessFunction<IN1, IN2, OUT> function) throws Exception {TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> testHarness = new TwoInputStreamOperatorTestHarness(new CoProcessOperator((CoProcessFunction)Preconditions.checkNotNull(function)), 1, 1, 0);testHarness.open();return testHarness;}public static <K, IN1, IN2, OUT> KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> forKeyedCoProcessFunction(KeyedCoProcessFunction<K, IN1, IN2, OUT> function, KeySelector<IN1, K> keySelector1, KeySelector<IN2, K> keySelector2, TypeInformation<K> keyType) throws Exception {KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> testHarness = new KeyedTwoInputStreamOperatorTestHarness(new KeyedCoProcessOperator((KeyedCoProcessFunction)Preconditions.checkNotNull(function)), keySelector1, keySelector2, keyType, 1, 1, 0);testHarness.open();return testHarness;}public static <IN1, IN2, OUT> BroadcastOperatorTestHarness<IN1, IN2, OUT> forBroadcastProcessFunction(BroadcastProcessFunction<IN1, IN2, OUT> function, MapStateDescriptor<?, ?>... descriptors) throws Exception {BroadcastOperatorTestHarness<IN1, IN2, OUT> testHarness = new BroadcastOperatorTestHarness(new CoBroadcastWithNonKeyedOperator((BroadcastProcessFunction)Preconditions.checkNotNull(function), Arrays.asList(descriptors)), 1, 1, 0);testHarness.open();return testHarness;}public static <K, IN1, IN2, OUT> KeyedBroadcastOperatorTestHarness<K, IN1, IN2, OUT> forKeyedBroadcastProcessFunction(KeyedBroadcastProcessFunction<K, IN1, IN2, OUT> function, KeySelector<IN1, K> keySelector, TypeInformation<K> keyType, MapStateDescriptor<?, ?>... descriptors) throws Exception {KeyedBroadcastOperatorTestHarness<K, IN1, IN2, OUT> testHarness = new KeyedBroadcastOperatorTestHarness(new CoBroadcastWithKeyedOperator((KeyedBroadcastProcessFunction)Preconditions.checkNotNull(function), Arrays.asList(descriptors)), keySelector, keyType, 1, 1, 0);testHarness.open();return testHarness;}
}
相关文章:
flink的KeyedBroadcastProcessFunction测试
背景 我们经常需要对KeyedBroadcastProcessFunction函数进行单元测试,以确保上线之前这个函数的功能是正常的,包括里面的广播状态和键值分区状态 测试KeyedBroadcastProcessFunction类 Testpublic void testHarnessForKeyedBroadcastProcessFunction()…...
【pytorch深度学习】torch-张量Tensor
torch-张量Tensor 文章目录 torch-张量Tensor1. 张量Tensor 1. 张量Tensor torch.tensor() # 创建一个标量(0维张量) scalar_tensor torch.tensor(3.14) # 创建一个向量(1维张量) vector_tensor torch.tensor([1, 2, 3]) # 创…...
odoo16前端框架源码阅读——rpc_service.js
odoo16前端框架源码阅读——rpc_service.js 先介绍点背景知识,这样方便阅读代码。 一、 JSONRPC的规范 https://www.jsonrpc.org/specification 中文翻译版本:https://wiki.geekdream.com/Specification/json-rpc_2.0.html JSON-RPC是一个无状态且轻…...
Nat. Med. | 成年人的城市生活环境对心理健康的影响
今天为大家介绍的是来自Jiayuan Xu和Gunter Schumann团队的一篇论文。城市居民暴露于许多可能相互结合和相互作用的环境因素,这些因素可能影响心理健康。目前尚未有工作尝试建模城市生活的复杂实际暴露与大脑和心理健康之间的关系,以及这如何受遗传因素调…...
stm32 WIFI模块_8266使用
使用以上配置可以正常回应,其中无论勾选或者不勾选DTR/RTS都可以得到正常回应 ATCWMODE?表示查询当前WiFi状态是处于热点模式(AP模式)或者是连接其他WiFi的那个模式。通过图片看出这个符号不能省略。 设置AP热点命令格式:ATCWSAP…...
【C/C++】malloc 或者 new 动态分配内存
1. malloc 是一个在 C 语言中用于动态分配内存的函数。 通过 malloc 函数,我们可以在程序运行时请求一定大小的内存块,然后将该内存块用于存储数据。 malloc 函数的声明如下: void* malloc(size_t size);它接受一个参数 size,表…...
如果让你重新开始学 C/C++,你的学习路线会是怎么选择?
1. 第一阶段 学好 C 语言和 Linux 1.1 学好 C 语言 无论你是科班还是非科班,建议你一定要学好 C 语言,它应该作为你必须掌握好的语言。你要熟悉 C 语言的基本语法,包括: 顺序、条件、循环三大控制语句 C 中几大基元数据类型的用…...
PCL安装与使用
1 apt安装 ubuntu20.04及以上版本下可以直接通过apt方式安装pcl编译好的二进制文件,二进制安装的版本为1.10。 sudo apt update sudo apt install libpcl-dev 2 源码安装 在pcl的github上下载对应的版本进行安装: https://github.com/PointCloudLibrary/pcl/rel…...
力扣刷题-二叉树-对称二叉树
101 对称二叉树 给你一个二叉树的根节点 root , 检查它是否轴对称。 示例 1: 输入:root [1,2,2,3,4,4,3] 输出:true 示例 2: 输入:root [1,2,2,null,3,null,3] 输出:false 思路 我的思路…...
常见面试题-计算机网络相关
1.OSI 七层模型? OSI 七层模型:应用层、表示层、会话层、传输层、网络层、数据链路层、物理层 TCP/IP 五层模型:应用层、传输层、网络层、链路层、物理层 应用层 应用层是由网络应用程序使用的,是离用户最近的一层 应用层通过…...
leetcode做题笔记231. 2 的幂
给你一个整数 n,请你判断该整数是否是 2 的幂次方。如果是,返回 true ;否则,返回 false 。 如果存在一个整数 x 使得 n 2x ,则认为 n 是 2 的幂次方。 示例 1: 输入:n 1 输出:tr…...
AI主播“败走”双11,想用AI省成本的商家醒醒吧,程序员不必担心失业,发展空间依旧很大
目录 1 2 3 “AI人”并不算是新鲜事,随着AI的发展,AI主播也开始悄悄进入到直播间中。 持续无间断的直播、比人工费便宜等优势,让很多商家选择了AI主播。 AI主播到底好不好用?终于在今年“双11”现出了原形。 1 AI主播没火过半年…...
◢Django 自写分页与使用
目录 1、设置分页样式,并展示到浏览器 2、模拟页码 3、生成分页 4、数据显示 5、上一页下一页 6、数据库的数据分页 7、封装分页 8、使用封装好的分页 建立好app后,设置路径path(in2/,views.in2),视图def in2(request): ,HTML: in2.html…...
某城高速综合管控大数据大屏可视化【可视化项目案例-04】
🎉🎊🎉 你的技术旅程将在这里启航! 🚀🚀 本文选自专栏:可视化技术专栏100例 可视化技术专栏100例,包括但不限于大屏可视化、图表可视化等等。订阅专栏用户在文章底部可下载对应案例源码以供大家深入的学习研究。 🎓 每一个案例都会提供完整代码和详细的讲解,不…...
如何在Linux下进行文件查看
cat 文本内容显示到终端 head 查看文件开头 tail 查看文件结尾 常用参数 -f 文件内容更新后,显示信息同步更新 wc 统计文件内容信息...
OSG练习:模仿Ventsim制作三维矿井智能通风系统
1、效果 2、计划内容 1) 三维场景的加载显示;已实现 2)矿井巷道建模及纹理;已实现 3)矿井基础数据采集及修正;已实现 4)通风网络解算算法;已实现 5)通风设备及设施模型制作;未实现 6)风流模拟效果 ;进行中 7)火灾模拟效果;未实现 8)巷道属性查看栏;未实现 9)…...
【数据结构】非递归实现二叉树的前 + 中 + 后 + 层序遍历(听说面试会考?)
👦个人主页:Weraphael ✍🏻作者简介:目前学习C和算法 ✈️专栏:数据结构 🐋 希望大家多多支持,咱一起进步!😁 如果文章对你有帮助的话 欢迎 评论💬 点赞&…...
32 Feign性能优化
2.3.Feign使用优化 Feign底层发起http请求,依赖于其它的框架。其底层客户端实现包括: •URLConnection:默认实现,不支持连接池 •Apache HttpClient :支持连接池 •OKHttp:支持连接池 因此提高Feign的…...
星岛专栏|从Web3发展看金融与科技的融合之道
11月起,欧科云链与香港主流媒体星岛集团开设Web3.0安全技术专栏,该专栏主要面向香港从业者、交易机构、监管机构输出专业性的安全合规建议,旨在促进香港Web3.0行业向安全与合规发展。 出品|欧科云链研究院 自2016年首届香港金融…...
什么是网络爬虫?
网络爬虫是一种自动化程序,可以自动地浏览网站并从网站上抽取数据。APP数据抓取实际上也是运用了网络爬虫的技术,只不过抓取的对象不是网站上的信息,而是手机APP上的数据。下面详细介绍APP数据抓取的过程。 1、确定数据需求 首先需要明确要抓…...
后进先出(LIFO)详解
LIFO 是 Last In, First Out 的缩写,中文译为后进先出。这是一种数据结构的工作原则,类似于一摞盘子或一叠书本: 最后放进去的元素最先出来 -想象往筒状容器里放盘子: (1)你放进的最后一个盘子(…...
Zustand 状态管理库:极简而强大的解决方案
Zustand 是一个轻量级、快速和可扩展的状态管理库,特别适合 React 应用。它以简洁的 API 和高效的性能解决了 Redux 等状态管理方案中的繁琐问题。 核心优势对比 基本使用指南 1. 创建 Store // store.js import create from zustandconst useStore create((set)…...
Linux简单的操作
ls ls 查看当前目录 ll 查看详细内容 ls -a 查看所有的内容 ls --help 查看方法文档 pwd pwd 查看当前路径 cd cd 转路径 cd .. 转上一级路径 cd 名 转换路径 …...
dedecms 织梦自定义表单留言增加ajax验证码功能
增加ajax功能模块,用户不点击提交按钮,只要输入框失去焦点,就会提前提示验证码是否正确。 一,模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...
什么是EULA和DPA
文章目录 EULA(End User License Agreement)DPA(Data Protection Agreement)一、定义与背景二、核心内容三、法律效力与责任四、实际应用与意义 EULA(End User License Agreement) 定义: EULA即…...
SpringTask-03.入门案例
一.入门案例 启动类: package com.sky;import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cache.annotation.EnableCach…...
初学 pytest 记录
安装 pip install pytest用例可以是函数也可以是类中的方法 def test_func():print()class TestAdd: # def __init__(self): 在 pytest 中不可以使用__init__方法 # self.cc 12345 pytest.mark.api def test_str(self):res add(1, 2)assert res 12def test_int(self):r…...
【7色560页】职场可视化逻辑图高级数据分析PPT模版
7种色调职场工作汇报PPT,橙蓝、黑红、红蓝、蓝橙灰、浅蓝、浅绿、深蓝七种色调模版 【7色560页】职场可视化逻辑图高级数据分析PPT模版:职场可视化逻辑图分析PPT模版https://pan.quark.cn/s/78aeabbd92d1...
C/C++ 中附加包含目录、附加库目录与附加依赖项详解
在 C/C 编程的编译和链接过程中,附加包含目录、附加库目录和附加依赖项是三个至关重要的设置,它们相互配合,确保程序能够正确引用外部资源并顺利构建。虽然在学习过程中,这些概念容易让人混淆,但深入理解它们的作用和联…...
08. C#入门系列【类的基本概念】:开启编程世界的奇妙冒险
C#入门系列【类的基本概念】:开启编程世界的奇妙冒险 嘿,各位编程小白探险家!欢迎来到 C# 的奇幻大陆!今天咱们要深入探索这片大陆上至关重要的 “建筑”—— 类!别害怕,跟着我,保准让你轻松搞…...
