当前位置: 首页 > news >正文

flink的KeyedBroadcastProcessFunction测试

背景

我们经常需要对KeyedBroadcastProcessFunction函数进行单元测试,以确保上线之前这个函数的功能是正常的,包括里面的广播状态和键值分区状态

测试KeyedBroadcastProcessFunction类

    @Testpublic void testHarnessForKeyedBroadcastProcessFunction() throws Exception {KeyedBroadcastProcessFunction<String, String, String, String> function = new MyKeyedBroadcastProcessFunction();// 键值分区状态final ValueStateDescriptor<String> valueStateDescriptor =new ValueStateDescriptor<>("item", BasicTypeInfo.STRING_TYPE_INFO);// 广播状态final MapStateDescriptor<String, String> ruleStateDescriptor = new MapStateDescriptor<>("RulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);KeyedBroadcastOperatorTestHarness<String, String, String, String> harness =ProcessFunctionTestHarnesses.forKeyedBroadcastProcessFunction(function, x -> x,TypeInformation.of(String.class), ruleStateDescriptor);harness.processBroadcastElement("0", 1);harness.processBroadcastElement("000", 2);harness.processElement("1", 10);// 判断键值分区状态(注意这里最好就只是某个key下面,也就是分组key直接设置为x->"固定常数值"即可)ValueState<String> valueState = function.getRuntimeContext().getState(valueStateDescriptor);Assert.assertEquals(valueState.value(), "1");// 判断广播状态BroadcastState<String, String> broadcastState = harness.getBroadcastState(ruleStateDescriptor);Assert.assertTrue(broadcastState.contains("0"));Assert.assertTrue(broadcastState.contains("000"));// 判断输出的列表Assert.assertEquals(harness.extractOutputValues(), Arrays.asList("0", "000", "1"));}

关键代码:
1.获取键值分区状态

ValueState<String> valueState = function.getRuntimeContext().getState(valueStateDescriptor);

2.获取广播状态:

BroadcastState<String, String> broadcastState = harness.getBroadcastState(ruleStateDescriptor);

3.工具类

public class ProcessFunctionTestHarnesses {public ProcessFunctionTestHarnesses() {}public static <IN, OUT> OneInputStreamOperatorTestHarness<IN, OUT> forProcessFunction(ProcessFunction<IN, OUT> function) throws Exception {OneInputStreamOperatorTestHarness<IN, OUT> testHarness = new OneInputStreamOperatorTestHarness(new ProcessOperator((ProcessFunction)Preconditions.checkNotNull(function)), 1, 1, 0);testHarness.setup();testHarness.open();return testHarness;}public static <K, IN, OUT> KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> forKeyedProcessFunction(KeyedProcessFunction<K, IN, OUT> function, KeySelector<IN, K> keySelector, TypeInformation<K> keyType) throws Exception {KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> testHarness = new KeyedOneInputStreamOperatorTestHarness(new KeyedProcessOperator((KeyedProcessFunction)Preconditions.checkNotNull(function)), keySelector, keyType, 1, 1, 0);testHarness.open();return testHarness;}public static <IN1, IN2, OUT> TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> forCoProcessFunction(CoProcessFunction<IN1, IN2, OUT> function) throws Exception {TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> testHarness = new TwoInputStreamOperatorTestHarness(new CoProcessOperator((CoProcessFunction)Preconditions.checkNotNull(function)), 1, 1, 0);testHarness.open();return testHarness;}public static <K, IN1, IN2, OUT> KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> forKeyedCoProcessFunction(KeyedCoProcessFunction<K, IN1, IN2, OUT> function, KeySelector<IN1, K> keySelector1, KeySelector<IN2, K> keySelector2, TypeInformation<K> keyType) throws Exception {KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> testHarness = new KeyedTwoInputStreamOperatorTestHarness(new KeyedCoProcessOperator((KeyedCoProcessFunction)Preconditions.checkNotNull(function)), keySelector1, keySelector2, keyType, 1, 1, 0);testHarness.open();return testHarness;}public static <IN1, IN2, OUT> BroadcastOperatorTestHarness<IN1, IN2, OUT> forBroadcastProcessFunction(BroadcastProcessFunction<IN1, IN2, OUT> function, MapStateDescriptor<?, ?>... descriptors) throws Exception {BroadcastOperatorTestHarness<IN1, IN2, OUT> testHarness = new BroadcastOperatorTestHarness(new CoBroadcastWithNonKeyedOperator((BroadcastProcessFunction)Preconditions.checkNotNull(function), Arrays.asList(descriptors)), 1, 1, 0);testHarness.open();return testHarness;}public static <K, IN1, IN2, OUT> KeyedBroadcastOperatorTestHarness<K, IN1, IN2, OUT> forKeyedBroadcastProcessFunction(KeyedBroadcastProcessFunction<K, IN1, IN2, OUT> function, KeySelector<IN1, K> keySelector, TypeInformation<K> keyType, MapStateDescriptor<?, ?>... descriptors) throws Exception {KeyedBroadcastOperatorTestHarness<K, IN1, IN2, OUT> testHarness = new KeyedBroadcastOperatorTestHarness(new CoBroadcastWithKeyedOperator((KeyedBroadcastProcessFunction)Preconditions.checkNotNull(function), Arrays.asList(descriptors)), keySelector, keyType, 1, 1, 0);testHarness.open();return testHarness;}
}

相关文章:

flink的KeyedBroadcastProcessFunction测试

背景 我们经常需要对KeyedBroadcastProcessFunction函数进行单元测试&#xff0c;以确保上线之前这个函数的功能是正常的&#xff0c;包括里面的广播状态和键值分区状态 测试KeyedBroadcastProcessFunction类 Testpublic void testHarnessForKeyedBroadcastProcessFunction()…...

【pytorch深度学习】torch-张量Tensor

torch-张量Tensor 文章目录 torch-张量Tensor1. 张量Tensor 1. 张量Tensor torch.tensor() # 创建一个标量&#xff08;0维张量&#xff09; scalar_tensor torch.tensor(3.14) # 创建一个向量&#xff08;1维张量&#xff09; vector_tensor torch.tensor([1, 2, 3]) # 创…...

odoo16前端框架源码阅读——rpc_service.js

odoo16前端框架源码阅读——rpc_service.js 先介绍点背景知识&#xff0c;这样方便阅读代码。 一、 JSONRPC的规范 https://www.jsonrpc.org/specification 中文翻译版本&#xff1a;https://wiki.geekdream.com/Specification/json-rpc_2.0.html JSON-RPC是一个无状态且轻…...

Nat. Med. | 成年人的城市生活环境对心理健康的影响

今天为大家介绍的是来自Jiayuan Xu和Gunter Schumann团队的一篇论文。城市居民暴露于许多可能相互结合和相互作用的环境因素&#xff0c;这些因素可能影响心理健康。目前尚未有工作尝试建模城市生活的复杂实际暴露与大脑和心理健康之间的关系&#xff0c;以及这如何受遗传因素调…...

stm32 WIFI模块_8266使用

使用以上配置可以正常回应&#xff0c;其中无论勾选或者不勾选DTR/RTS都可以得到正常回应 ATCWMODE?表示查询当前WiFi状态是处于热点模式&#xff08;AP模式&#xff09;或者是连接其他WiFi的那个模式。通过图片看出这个符号不能省略。 设置AP热点命令格式&#xff1a;ATCWSAP…...

【C/C++】malloc 或者 new 动态分配内存

1. malloc 是一个在 C 语言中用于动态分配内存的函数。 通过 malloc 函数&#xff0c;我们可以在程序运行时请求一定大小的内存块&#xff0c;然后将该内存块用于存储数据。 malloc 函数的声明如下&#xff1a; void* malloc(size_t size);它接受一个参数 size&#xff0c;表…...

如果让你重新开始学 C/C++,你的学习路线会是怎么选择?

1. 第一阶段 学好 C 语言和 Linux 1.1 学好 C 语言 无论你是科班还是非科班&#xff0c;建议你一定要学好 C 语言&#xff0c;它应该作为你必须掌握好的语言。你要熟悉 C 语言的基本语法&#xff0c;包括&#xff1a; 顺序、条件、循环三大控制语句 C 中几大基元数据类型的用…...

PCL安装与使用

1 apt安装 ubuntu20.04及以上版本下可以直接通过apt方式安装pcl编译好的二进制文件,二进制安装的版本为1.10。 sudo apt update sudo apt install libpcl-dev 2 源码安装 在pcl的github上下载对应的版本进行安装&#xff1a; https://github.com/PointCloudLibrary/pcl/rel…...

力扣刷题-二叉树-对称二叉树

101 对称二叉树 给你一个二叉树的根节点 root &#xff0c; 检查它是否轴对称。 示例 1&#xff1a; 输入&#xff1a;root [1,2,2,3,4,4,3] 输出&#xff1a;true 示例 2&#xff1a; 输入&#xff1a;root [1,2,2,null,3,null,3] 输出&#xff1a;false 思路 我的思路…...

常见面试题-计算机网络相关

1.OSI 七层模型&#xff1f; OSI 七层模型&#xff1a;应用层、表示层、会话层、传输层、网络层、数据链路层、物理层 TCP/IP 五层模型&#xff1a;应用层、传输层、网络层、链路层、物理层 应用层 应用层是由网络应用程序使用的&#xff0c;是离用户最近的一层 应用层通过…...

leetcode做题笔记231. 2 的幂

给你一个整数 n&#xff0c;请你判断该整数是否是 2 的幂次方。如果是&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 如果存在一个整数 x 使得 n 2x &#xff0c;则认为 n 是 2 的幂次方。 示例 1&#xff1a; 输入&#xff1a;n 1 输出&#xff1a;tr…...

AI主播“败走”双11,想用AI省成本的商家醒醒吧,程序员不必担心失业,发展空间依旧很大

目录 1 2 3 “AI人”并不算是新鲜事&#xff0c;随着AI的发展&#xff0c;AI主播也开始悄悄进入到直播间中。 持续无间断的直播、比人工费便宜等优势&#xff0c;让很多商家选择了AI主播。 AI主播到底好不好用&#xff1f;终于在今年“双11”现出了原形。 1 AI主播没火过半年…...

◢Django 自写分页与使用

目录 1、设置分页样式,并展示到浏览器 2、模拟页码 3、生成分页 4、数据显示 5、上一页下一页 6、数据库的数据分页 7、封装分页 8、使用封装好的分页 建立好app后&#xff0c;设置路径path(in2/,views.in2)&#xff0c;视图def in2(request): &#xff0c;HTML: in2.html…...

某城高速综合管控大数据大屏可视化【可视化项目案例-04】

🎉🎊🎉 你的技术旅程将在这里启航! 🚀🚀 本文选自专栏:可视化技术专栏100例 可视化技术专栏100例,包括但不限于大屏可视化、图表可视化等等。订阅专栏用户在文章底部可下载对应案例源码以供大家深入的学习研究。 🎓 每一个案例都会提供完整代码和详细的讲解,不…...

如何在Linux下进行文件查看

cat 文本内容显示到终端 head 查看文件开头 tail 查看文件结尾 常用参数 -f 文件内容更新后&#xff0c;显示信息同步更新 wc 统计文件内容信息...

OSG练习:模仿Ventsim制作三维矿井智能通风系统

1、效果 2、计划内容 1) 三维场景的加载显示;已实现 2)矿井巷道建模及纹理;已实现 3)矿井基础数据采集及修正;已实现 4)通风网络解算算法;已实现 5)通风设备及设施模型制作;未实现 6)风流模拟效果 ;进行中 7)火灾模拟效果;未实现 8)巷道属性查看栏;未实现 9)…...

【数据结构】非递归实现二叉树的前 + 中 + 后 + 层序遍历(听说面试会考?)

&#x1f466;个人主页&#xff1a;Weraphael ✍&#x1f3fb;作者简介&#xff1a;目前学习C和算法 ✈️专栏&#xff1a;数据结构 &#x1f40b; 希望大家多多支持&#xff0c;咱一起进步&#xff01;&#x1f601; 如果文章对你有帮助的话 欢迎 评论&#x1f4ac; 点赞&…...

32 Feign性能优化

2.3.Feign使用优化 Feign底层发起http请求&#xff0c;依赖于其它的框架。其底层客户端实现包括&#xff1a; •URLConnection&#xff1a;默认实现&#xff0c;不支持连接池 •Apache HttpClient &#xff1a;支持连接池 •OKHttp&#xff1a;支持连接池 因此提高Feign的…...

星岛专栏|从Web3发展看金融与科技的融合之道

11月起&#xff0c;欧科云链与香港主流媒体星岛集团开设Web3.0安全技术专栏&#xff0c;该专栏主要面向香港从业者、交易机构、监管机构输出专业性的安全合规建议&#xff0c;旨在促进香港Web3.0行业向安全与合规发展。 出品&#xff5c;欧科云链研究院 自2016年首届香港金融…...

什么是网络爬虫?

网络爬虫是一种自动化程序&#xff0c;可以自动地浏览网站并从网站上抽取数据。APP数据抓取实际上也是运用了网络爬虫的技术&#xff0c;只不过抓取的对象不是网站上的信息&#xff0c;而是手机APP上的数据。下面详细介绍APP数据抓取的过程。 1、确定数据需求 首先需要明确要抓…...

uniapp 对接腾讯云IM群组成员管理(增删改查)

UniApp 实战&#xff1a;腾讯云IM群组成员管理&#xff08;增删改查&#xff09; 一、前言 在社交类App开发中&#xff0c;群组成员管理是核心功能之一。本文将基于UniApp框架&#xff0c;结合腾讯云IM SDK&#xff0c;详细讲解如何实现群组成员的增删改查全流程。 权限校验…...

微信小程序之bind和catch

这两个呢&#xff0c;都是绑定事件用的&#xff0c;具体使用有些小区别。 官方文档&#xff1a; 事件冒泡处理不同 bind&#xff1a;绑定的事件会向上冒泡&#xff0c;即触发当前组件的事件后&#xff0c;还会继续触发父组件的相同事件。例如&#xff0c;有一个子视图绑定了b…...

<6>-MySQL表的增删查改

目录 一&#xff0c;create&#xff08;创建表&#xff09; 二&#xff0c;retrieve&#xff08;查询表&#xff09; 1&#xff0c;select列 2&#xff0c;where条件 三&#xff0c;update&#xff08;更新表&#xff09; 四&#xff0c;delete&#xff08;删除表&#xf…...

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility

Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...

最新SpringBoot+SpringCloud+Nacos微服务框架分享

文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的&#xff0c;根据Excel列的需求预估的工时直接打骨折&#xff0c;不要问我为什么&#xff0c;主要…...

【C语言练习】080. 使用C语言实现简单的数据库操作

080. 使用C语言实现简单的数据库操作 080. 使用C语言实现简单的数据库操作使用原生APIODBC接口第三方库ORM框架文件模拟1. 安装SQLite2. 示例代码:使用SQLite创建数据库、表和插入数据3. 编译和运行4. 示例运行输出:5. 注意事项6. 总结080. 使用C语言实现简单的数据库操作 在…...

【数据分析】R版IntelliGenes用于生物标志物发现的可解释机器学习

禁止商业或二改转载&#xff0c;仅供自学使用&#xff0c;侵权必究&#xff0c;如需截取部分内容请后台联系作者! 文章目录 介绍流程步骤1. 输入数据2. 特征选择3. 模型训练4. I-Genes 评分计算5. 输出结果 IntelliGenesR 安装包1. 特征选择2. 模型训练和评估3. I-Genes 评分计…...

中医有效性探讨

文章目录 西医是如何发展到以生物化学为药理基础的现代医学&#xff1f;传统医学奠基期&#xff08;远古 - 17 世纪&#xff09;近代医学转型期&#xff08;17 世纪 - 19 世纪末&#xff09;​现代医学成熟期&#xff08;20世纪至今&#xff09; 中医的源远流长和一脉相承远古至…...

动态 Web 开发技术入门篇

一、HTTP 协议核心 1.1 HTTP 基础 协议全称 &#xff1a;HyperText Transfer Protocol&#xff08;超文本传输协议&#xff09; 默认端口 &#xff1a;HTTP 使用 80 端口&#xff0c;HTTPS 使用 443 端口。 请求方法 &#xff1a; GET &#xff1a;用于获取资源&#xff0c;…...

华为OD机考-机房布局

import java.util.*;public class DemoTest5 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseSystem.out.println(solve(in.nextLine()));}}priv…...