flink测试map转换函数和process函数
背景
在flink中,我们需要对我们写的map转换函数,process处理函数进行单元测试,测试的内容包括查看函数的输出结果是否符合以及函数内的状态是否正确更新,本文就记录几个测试过程中的要点
flink中测试函数
首先我们根据我们要测试的是数据流的类型选择不同的测试套件,如下所示:
- OneInputStreamOperatorTestHarness:适用于 DataStreams 数据流
- KeyedOneInputStreamOperatorTestHarness:适用于 KeyedStreams 分组后的数据流
- TwoInputStreamOperatorTestHarness:适用于两个数据流DataStream的 ConnectedStream
- KeyedTwoInputStreamOperatorTestHarness:适用于两个 KeyedStream 的 ConnectedStream
其次,根据是测试map函数还是process函数,我们选择不同的操作符,如果是map函数我们选择StreamFlatMap算子(可同时处理FlatMap和带状态的RichFlatmap函数)还是ProcessFunctionTestHarnesses.forXX算子
map函数测试代码:
@Testpublic void testStateFlatMap() throws Exception {StatefulFlatMap statefulFlatMap = new StatefulFlatMap();// OneInputStreamOperatorTestHarness takes the input and output types as type parametersOneInputStreamOperatorTestHarness<String, String> testHarness =// KeyedOneInputStreamOperatorTestHarness takes three arguments:// Flink operator object, key selector and key typenew KeyedOneInputStreamOperatorTestHarness<String, String, String>(new StreamFlatMap<>(statefulFlatMap),x -> "1", Types.STRING);testHarness.open();// test first recordtestHarness.processElement("world", 10);ValueState<String> previousInput =statefulFlatMap.getRuntimeContext().getState(new ValueStateDescriptor<>("previousInput", Types.STRING));String stateValue = previousInput.value();Assert.assertEquals(Lists.newArrayList(new StreamRecord<>("hello world", 10)),testHarness.extractOutputStreamRecords());Assert.assertEquals("world", stateValue);// test second recordtestHarness.processElement("parallel", 20);Assert.assertEquals(Lists.newArrayList(new StreamRecord<>("hello world", 10),new StreamRecord<>("hello parallel world", 20)), testHarness.extractOutputStreamRecords());Assert.assertEquals("parallel", previousInput.value());}public class StatefulFlatMap extends RichFlatMapFunction<String, String> {ValueState<String> previousInput;@Overridepublic void open(Configuration parameters) throws Exception {previousInput = getRuntimeContext().getState(new ValueStateDescriptor<String>("previousInput", Types.STRING));}@Overridepublic void flatMap(String in, Collector<String> collector) throws Exception {String out = "hello " + in;if(previousInput.value() != null){out = out + " " + previousInput.value();}previousInput.update(in);collector.collect(out);}
}
process处理函数代码:
@Testpublic void testProcessElement() throws Exception {MyProcessFunction myProcessFunction = new MyProcessFunction();OneInputStreamOperatorTestHarness<String, String> testHarness =ProcessFunctionTestHarnesses.forKeyedProcessFunction(myProcessFunction, x -> "1", Types.STRING);// Function time is initialized to 0testHarness.open();testHarness.processElement("world", 10);Assert.assertEquals(Lists.newArrayList(new StreamRecord<>("hello world", 10)),testHarness.extractOutputStreamRecords());}@Testpublic void testOnTimer() throws Exception {MyProcessFunction myProcessFunction = new MyProcessFunction();OneInputStreamOperatorTestHarness<String, String> testHarness =ProcessFunctionTestHarnesses.forKeyedProcessFunction(myProcessFunction, x -> "1", Types.STRING);testHarness.open();testHarness.processElement("world", 10);Assert.assertEquals(1, testHarness.numProcessingTimeTimers());// Function time is set to 50testHarness.setProcessingTime(50);Assert.assertEquals(Lists.newArrayList(new StreamRecord<>("hello world", 10),new StreamRecord<>("Timer triggered at timestamp 50")),testHarness.extractOutputStreamRecords());}public class MyProcessFunction extends KeyedProcessFunction<String, String, String> {@Overridepublic void processElement(String in, Context context, Collector<String> collector) throws Exception {context.timerService().registerProcessingTimeTimer(50);String out = "hello " + in;collector.collect(out);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {out.collect(String.format("Timer triggered at timestamp %d", timestamp));}}
此外附加官方的map函数的测试代码:
/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.flink.streaming.api.operators;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;import org.junit.Assert;
import org.junit.Test;import java.util.concurrent.ConcurrentLinkedQueue;/*** Tests for {@link StreamMap}. These test that:** <ul>* <li>RichFunction methods are called correctly* <li>Timestamps of processed elements match the input timestamp* <li>Watermarks are correctly forwarded* </ul>*/
public class StreamFlatMapTest {private static final class MyFlatMap implements FlatMapFunction<Integer, Integer> {private static final long serialVersionUID = 1L;@Overridepublic void flatMap(Integer value, Collector<Integer> out) throws Exception {if (value % 2 == 0) {out.collect(value);out.collect(value * value);}}}@Testpublic void testFlatMap() throws Exception {StreamFlatMap<Integer, Integer> operator =new StreamFlatMap<Integer, Integer>(new MyFlatMap());OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);long initialTime = 0L;ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();testHarness.open();testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 2));testHarness.processWatermark(new Watermark(initialTime + 2));testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 3));testHarness.processElement(new StreamRecord<Integer>(4, initialTime + 4));testHarness.processElement(new StreamRecord<Integer>(5, initialTime + 5));testHarness.processElement(new StreamRecord<Integer>(6, initialTime + 6));testHarness.processElement(new StreamRecord<Integer>(7, initialTime + 7));testHarness.processElement(new StreamRecord<Integer>(8, initialTime + 8));expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 2));expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 2));expectedOutput.add(new Watermark(initialTime + 2));expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 4));expectedOutput.add(new StreamRecord<Integer>(16, initialTime + 4));expectedOutput.add(new StreamRecord<Integer>(6, initialTime + 6));expectedOutput.add(new StreamRecord<Integer>(36, initialTime + 6));expectedOutput.add(new StreamRecord<Integer>(8, initialTime + 8));expectedOutput.add(new StreamRecord<Integer>(64, initialTime + 8));TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());}@Testpublic void testOpenClose() throws Exception {StreamFlatMap<String, String> operator =new StreamFlatMap<String, String>(new TestOpenCloseFlatMapFunction());OneInputStreamOperatorTestHarness<String, String> testHarness =new OneInputStreamOperatorTestHarness<String, String>(operator);long initialTime = 0L;testHarness.open();testHarness.processElement(new StreamRecord<String>("Hello", initialTime));testHarness.close();Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseFlatMapFunction.closeCalled);Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);}// This must only be used in one test, otherwise the static fields will be changed// by several tests concurrentlyprivate static class TestOpenCloseFlatMapFunction extends RichFlatMapFunction<String, String> {private static final long serialVersionUID = 1L;public static boolean openCalled = false;public static boolean closeCalled = false;@Overridepublic void open(OpenContext openContext) throws Exception {super.open(openContext);if (closeCalled) {Assert.fail("Close called before open.");}openCalled = true;}@Overridepublic void close() throws Exception {super.close();if (!openCalled) {Assert.fail("Open was not called before close.");}closeCalled = true;}@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {if (!openCalled) {Assert.fail("Open was not called before run.");}out.collect(value);}}
}
包含同时测试FlatMap和RichFlatMap函数,但是其中没有操作状态,我前面的例子包含了RichFlatMap状态的测试
参考文献:
https://flink.apache.org/2020/02/03/a-guide-for-unit-testing-in-apache-flink/
相关文章:
flink测试map转换函数和process函数
背景 在flink中,我们需要对我们写的map转换函数,process处理函数进行单元测试,测试的内容包括查看函数的输出结果是否符合以及函数内的状态是否正确更新,本文就记录几个测试过程中的要点 flink中测试函数 首先我们根据我们要测…...

【跟小嘉学习JavaWeb开发】第一章 开发环境搭建
系列文章目录 【跟小嘉学习JavaWeb开发】第一章 开发环境搭建 文章目录 系列文章目录[TOC](文章目录) 前言一、JDK的下载与安装1.1、关于JDK的版本问题 二、环境变量配置2.1、配置 JAVA_HOME、CLASSPATH2.2、配置path2.3、启动 cmd 三、编写代码、编译并执行3.1、编写代码&…...
CSS语法、选择器、属性
1.css语法 * 格式:选择器 {属性名1:属性值1;属性名2:属性值2;...}* 选择器:筛选具有相似特征的元素* 注意:* 每一对属性需要使用;隔开,最后一对属性可以不加 2.选择器:筛选具有相似特征的元素 * 分类:1. 基…...

深度学习读取txt训练数据绘制参数曲线图的方法
有一些深度学习模型是并不像yolo系列那样最终输出相应的参数图,有很多训练形成了一个训练log文件,于是需要读取log文件中的内容并绘制成曲线图。 如下实例,有一个log文件的部分截图,需要将其读取出来并绘制曲线图 废话不多说&…...

VB.NET—DataGridView控件教程详解
目录 前言: 过程: 第一步: 第二步: 第三步: 第四步: 第五步: 番外篇: 总结: 前言: DataGridView是.NET FormK中的一个Windows窗体控件,它提供了一个可视化的表格控件,允许用户以表格形式显示和编辑数据。它通常用于显示和编辑数据库…...

MCU测试科普|如何进行MCU芯片测试,具体流程是什么?
MCU芯片测试系统是一种专门用于检测MCU芯片性能和质量的综合性设备。它通常由硬件和软件两部分组成,硬件包括测试仪器、适配器、测试夹具等,用于连接被测MCU芯片和测试机,实现高效高精度的测试。软件部分通常包括测试程序、测试管理软件等&am…...
单向循环代码实现cpp
// 单向循环链表 class CircleLink { public:CircleLink(){head_ new Node();tail_ head_;head_->next_ head_;}~CircleLink(){Node* p head_->next_;while (p ! head_){head_->next_ p->next_;delete p;p head_->next_;}delete head_;}public:// 尾插法 …...

【原创】java+jsp+servlet简单图书管理系统设计与实现
摘要: 图书管理系统是一个专门针对图书馆管理而设计的系统,它可以帮助图书管理员有效的对图书进行管理,在图书管理系统的设计中,首先要考虑的是系统的需求分析,该系统的设计与实现涉及多个方面,包括数据库…...

JVM之jinfo虚拟机配置信息工具
jinfo虚拟机配置信息工具 1、jinfo jinfo(Configuration Info for Java)的作用是实时地查看和调整虚拟机的各项参数。 使用jps -v 可以查看虚拟机启动时显示指定的参数列表,但是如果想知道未被显示指定的参数的系统默认值,除 …...

软件测试|PO设计模式在 UI 自动化中的实践
PO的思想最早是2013年由IT大佬Martin Flower提出的:https://martinfowler.com/bliki/PageObject.html 没错,就是他 — 没错,就是他 — 在他的文章里有这样一张经典样图,图片中展示了测试代码中直接操作HTML元素和使用PO模式将page对象封装成…...

如何上传自己的Jar到Maven中央仓库
在项目开发过程中,我们常常会使用 Maven 从仓库拉取开源的第三方 Jar 包。本文将带领大家将自己写好的代码或开源项目发布到 Maven中央仓库中,让其他人可以直接依赖你的 Jar 包,而不需要先下载你的代码后 install 到本地。 注册帐号 点击以…...

智能井盖传感器功能,万宾科技产品介绍
在国家治理方面,对社会的治理是一个重要的领域,一定要在推进社会治理现代化过程中提高市政府的管理和工作能力,推动社会拥有稳定有序的发展。在管理过程中对全市井盖进行统一化管理,可能是市政府比较头疼的难题,如果想…...

洛谷P4185 离线+并查集
好题,发现没有强制在线,可以离线操作 排序之后带集合点数的并查集就好了 #include<bits/stdc.h> using namespace std; const int N 1e510; int n,m; int p[N],sz[N];int find(int x){if(x!p[x])p[x] find(p[x]);return p[x]; } struct Node{in…...
遇到java.security.AccessControlException:access denied怎么办?
今天工作中遇到了如下报错,记录一下解决方案。 目录 问题 分析 结论 问题 这个问题出现在openjdk8启动网页端Java应用。 Java Exception:java.security.AccessControlException:access denied("java.net.SocketPermission""22.188.130.11:9000…...

c++对接CAT1400
最近工作中遇到需要对接1400协议,网上搜索不到c/c++的实现,所以记录一下自己的实现。 第一步注册: 1400是在http摘要认证的基础上做的,所以要去了解http摘要认证的流程 说明: 1.视图库通过用户分配,手动分配username,password给三方对接程序 2.三方对接程序第一次请求由…...

Linux基础【Linux知识贩卖机】
偶尔的停顿和修整,对于人生是非常必要的。 --随记 文章目录 Linux目录目录结构磁盘分区相关命令 相对路径和绝对路径 文件权限用户分类umask创建文件权限计算方法粘滞位 总结 Linux目录 目录结构 Linux 操作系统采用了一种层次化的目录结构,常被称为标…...

CSS 边框、轮廓线
一、CSS边框: CSS边框属性允许指定一个元素边框的样式和颜色。 1)、边框样式:border-style属性用来定义边框的样式,border-style值: 2)、边框宽度:border-width属性用于指定边框宽度。指定变宽…...
Transformer架构 完整的处理流程
Transformer 是由多层的 Encoder 和 Decoder 构成的。每一层的 Encoder 和 Decoder 都包含了多头自注意力机制(Multi-head Self Attention)、前馈神经网络(Feed Forward)和添加及归一化(Add & Norm)。特…...

git and svn 行尾风格配置强制为lf
git CLI配置: // 提交时转换为LF,检出时转换为CRLF git config --global core.autocrlf true // 提交时转换为LF,检出时不转换 git config --global core.autocrlf input // 提交检出均不转换 git config --global core.autocrlf f…...

达梦数据库答案
1、 创建数据库实例,到/dm8/data下,数据库名:DEMO,实例名DEMOSERVER(10分) [dmdbadmServer ~]$ cd /dm8/tool [dmdbadmServer tool]$ ./dbca.sh1、 簇大小32,页大小16,登录密码&…...

安宝特案例丨寻医不再长途跋涉?Vuzix再次以AR技术智能驱动远程医疗
加拿大领先科技公司TeleVU基于Vuzix智能眼镜打造远程医疗生态系统,彻底革新患者护理模式。 安宝特合作伙伴TeleVU成立30余年,沉淀医疗技术、计算机科学与人工智能经验,聚焦医疗保健领域,提供AR、AI、IoT解决方案。 该方案使医疗…...
scan_mode设计原则
scan_mode设计原则 在进行mtp controller设计时,基本功能设计完成后,需要设计scan_mode设计。 1、在进行scan_mode设计时,需要保证mtp处于standby模式,不会有擦写、编程动作。 2、只需要固定mtp datasheet说明的接口即可…...
2025年全国I卷数学压轴题解答
第19题第3问: b b b 使得存在 t t t, 对于任意的 x x x, 5 cos x − cos ( 5 x t ) < b 5\cos x-\cos(5xt)<b 5cosx−cos(5xt)<b, 求 b b b 的最小值. 解: b b b 的最小值 b m i n min t max x g ( x , t ) b_{min}\min_{t} \max_{x} g(x,t) bmi…...
Java严格模式withResolverStyle解析日期错误及解决方案
在Java中使用DateTimeFormatter并启用严格模式(ResolverStyle.STRICT)时,解析日期字符串"2025-06-01"报错的根本原因是:模式字符串中的年份格式yyyy被解释为YearOfEra(纪元年份),而非…...

Modbus转ETHERNET IP网关:快速冷却系统的智能化升级密钥
现代工业自动化系统中,无锡耐特森Modbus转Ethernet IP网关MCN-EN3001扮演着至关重要的角色。通过这一技术,传统的串行通讯协议Modbus得以在更高速、更稳定的以太网环境中运行,为快速冷却系统等关键设施的自动化控制提供了强有力的支撑。快速冷…...
python打卡第48天
知识点回顾: 随机张量的生成:torch.randn函数卷积和池化的计算公式(可以不掌握,会自动计算的)pytorch的广播机制:加法和乘法的广播机制 ps:numpy运算也有类似的广播机制,基本一致 **…...
PHP 表单 - 验证邮件和URL
PHP 表单 - 验证邮件和URL 引言 在Web开发中,表单是用户与网站交互的重要途径。一个功能完善的表单不仅可以收集用户数据,还能提高用户体验。在表单设计中,验证邮件地址和URL是常见的需求。本文将详细介绍如何在PHP中实现邮件和URL的验证&a…...

wsl开启即闪退
[ 问题 ]: 在一次电脑卡住,强制关机重启后,遇到打开WSL就闪退的问题在CMD中打开WSL,出现如上图的描述: C:\Users\admin>wsl wsl: 检测到 localhost 代理配置,但未镜像到 WSL。NAT 模式下的 WSL 不支持…...
数据库(sqlite)基本操作
数据库(sqlite) 一:简介: 为什么需要单独的数据库来进行管理数据? 数据的各种查询功能数据的备份和恢复花大量时间在文件数据的结构设计和维护上要考虑多线程对数据的操作会涉及到同步问题,会增加很多额…...

【websocket】安装与使用
websocket安装与使用 1. 介绍2. 安装3. websocketpp常用接口4. Websocketpp使用4.1 服务端4.2 客户端 1. 介绍 WebSocket 是从 HTML5 开始支持的一种网页端和服务端保持长连接的 消息推送机制。 传统的 web 程序都是属于 “一问一答” 的形式,即客户端给服务器发送…...