flink测试map转换函数和process函数
背景
在flink中,我们需要对我们写的map转换函数,process处理函数进行单元测试,测试的内容包括查看函数的输出结果是否符合以及函数内的状态是否正确更新,本文就记录几个测试过程中的要点
flink中测试函数
首先我们根据我们要测试的是数据流的类型选择不同的测试套件,如下所示:
- OneInputStreamOperatorTestHarness:适用于 DataStreams 数据流
- KeyedOneInputStreamOperatorTestHarness:适用于 KeyedStreams 分组后的数据流
- TwoInputStreamOperatorTestHarness:适用于两个数据流DataStream的 ConnectedStream
- KeyedTwoInputStreamOperatorTestHarness:适用于两个 KeyedStream 的 ConnectedStream
其次,根据是测试map函数还是process函数,我们选择不同的操作符,如果是map函数我们选择StreamFlatMap算子(可同时处理FlatMap和带状态的RichFlatmap函数)还是ProcessFunctionTestHarnesses.forXX算子
map函数测试代码:
@Testpublic void testStateFlatMap() throws Exception {StatefulFlatMap statefulFlatMap = new StatefulFlatMap();// OneInputStreamOperatorTestHarness takes the input and output types as type parametersOneInputStreamOperatorTestHarness<String, String> testHarness =// KeyedOneInputStreamOperatorTestHarness takes three arguments:// Flink operator object, key selector and key typenew KeyedOneInputStreamOperatorTestHarness<String, String, String>(new StreamFlatMap<>(statefulFlatMap),x -> "1", Types.STRING);testHarness.open();// test first recordtestHarness.processElement("world", 10);ValueState<String> previousInput =statefulFlatMap.getRuntimeContext().getState(new ValueStateDescriptor<>("previousInput", Types.STRING));String stateValue = previousInput.value();Assert.assertEquals(Lists.newArrayList(new StreamRecord<>("hello world", 10)),testHarness.extractOutputStreamRecords());Assert.assertEquals("world", stateValue);// test second recordtestHarness.processElement("parallel", 20);Assert.assertEquals(Lists.newArrayList(new StreamRecord<>("hello world", 10),new StreamRecord<>("hello parallel world", 20)), testHarness.extractOutputStreamRecords());Assert.assertEquals("parallel", previousInput.value());}public class StatefulFlatMap extends RichFlatMapFunction<String, String> {ValueState<String> previousInput;@Overridepublic void open(Configuration parameters) throws Exception {previousInput = getRuntimeContext().getState(new ValueStateDescriptor<String>("previousInput", Types.STRING));}@Overridepublic void flatMap(String in, Collector<String> collector) throws Exception {String out = "hello " + in;if(previousInput.value() != null){out = out + " " + previousInput.value();}previousInput.update(in);collector.collect(out);}
}
process处理函数代码:
@Testpublic void testProcessElement() throws Exception {MyProcessFunction myProcessFunction = new MyProcessFunction();OneInputStreamOperatorTestHarness<String, String> testHarness =ProcessFunctionTestHarnesses.forKeyedProcessFunction(myProcessFunction, x -> "1", Types.STRING);// Function time is initialized to 0testHarness.open();testHarness.processElement("world", 10);Assert.assertEquals(Lists.newArrayList(new StreamRecord<>("hello world", 10)),testHarness.extractOutputStreamRecords());}@Testpublic void testOnTimer() throws Exception {MyProcessFunction myProcessFunction = new MyProcessFunction();OneInputStreamOperatorTestHarness<String, String> testHarness =ProcessFunctionTestHarnesses.forKeyedProcessFunction(myProcessFunction, x -> "1", Types.STRING);testHarness.open();testHarness.processElement("world", 10);Assert.assertEquals(1, testHarness.numProcessingTimeTimers());// Function time is set to 50testHarness.setProcessingTime(50);Assert.assertEquals(Lists.newArrayList(new StreamRecord<>("hello world", 10),new StreamRecord<>("Timer triggered at timestamp 50")),testHarness.extractOutputStreamRecords());}public class MyProcessFunction extends KeyedProcessFunction<String, String, String> {@Overridepublic void processElement(String in, Context context, Collector<String> collector) throws Exception {context.timerService().registerProcessingTimeTimer(50);String out = "hello " + in;collector.collect(out);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {out.collect(String.format("Timer triggered at timestamp %d", timestamp));}}
此外附加官方的map函数的测试代码:
/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.flink.streaming.api.operators;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;import org.junit.Assert;
import org.junit.Test;import java.util.concurrent.ConcurrentLinkedQueue;/*** Tests for {@link StreamMap}. These test that:** <ul>* <li>RichFunction methods are called correctly* <li>Timestamps of processed elements match the input timestamp* <li>Watermarks are correctly forwarded* </ul>*/
public class StreamFlatMapTest {private static final class MyFlatMap implements FlatMapFunction<Integer, Integer> {private static final long serialVersionUID = 1L;@Overridepublic void flatMap(Integer value, Collector<Integer> out) throws Exception {if (value % 2 == 0) {out.collect(value);out.collect(value * value);}}}@Testpublic void testFlatMap() throws Exception {StreamFlatMap<Integer, Integer> operator =new StreamFlatMap<Integer, Integer>(new MyFlatMap());OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);long initialTime = 0L;ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();testHarness.open();testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 2));testHarness.processWatermark(new Watermark(initialTime + 2));testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 3));testHarness.processElement(new StreamRecord<Integer>(4, initialTime + 4));testHarness.processElement(new StreamRecord<Integer>(5, initialTime + 5));testHarness.processElement(new StreamRecord<Integer>(6, initialTime + 6));testHarness.processElement(new StreamRecord<Integer>(7, initialTime + 7));testHarness.processElement(new StreamRecord<Integer>(8, initialTime + 8));expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 2));expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 2));expectedOutput.add(new Watermark(initialTime + 2));expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 4));expectedOutput.add(new StreamRecord<Integer>(16, initialTime + 4));expectedOutput.add(new StreamRecord<Integer>(6, initialTime + 6));expectedOutput.add(new StreamRecord<Integer>(36, initialTime + 6));expectedOutput.add(new StreamRecord<Integer>(8, initialTime + 8));expectedOutput.add(new StreamRecord<Integer>(64, initialTime + 8));TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());}@Testpublic void testOpenClose() throws Exception {StreamFlatMap<String, String> operator =new StreamFlatMap<String, String>(new TestOpenCloseFlatMapFunction());OneInputStreamOperatorTestHarness<String, String> testHarness =new OneInputStreamOperatorTestHarness<String, String>(operator);long initialTime = 0L;testHarness.open();testHarness.processElement(new StreamRecord<String>("Hello", initialTime));testHarness.close();Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseFlatMapFunction.closeCalled);Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);}// This must only be used in one test, otherwise the static fields will be changed// by several tests concurrentlyprivate static class TestOpenCloseFlatMapFunction extends RichFlatMapFunction<String, String> {private static final long serialVersionUID = 1L;public static boolean openCalled = false;public static boolean closeCalled = false;@Overridepublic void open(OpenContext openContext) throws Exception {super.open(openContext);if (closeCalled) {Assert.fail("Close called before open.");}openCalled = true;}@Overridepublic void close() throws Exception {super.close();if (!openCalled) {Assert.fail("Open was not called before close.");}closeCalled = true;}@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {if (!openCalled) {Assert.fail("Open was not called before run.");}out.collect(value);}}
}
包含同时测试FlatMap和RichFlatMap函数,但是其中没有操作状态,我前面的例子包含了RichFlatMap状态的测试
参考文献:
https://flink.apache.org/2020/02/03/a-guide-for-unit-testing-in-apache-flink/
相关文章:
flink测试map转换函数和process函数
背景 在flink中,我们需要对我们写的map转换函数,process处理函数进行单元测试,测试的内容包括查看函数的输出结果是否符合以及函数内的状态是否正确更新,本文就记录几个测试过程中的要点 flink中测试函数 首先我们根据我们要测…...
【跟小嘉学习JavaWeb开发】第一章 开发环境搭建
系列文章目录 【跟小嘉学习JavaWeb开发】第一章 开发环境搭建 文章目录 系列文章目录[TOC](文章目录) 前言一、JDK的下载与安装1.1、关于JDK的版本问题 二、环境变量配置2.1、配置 JAVA_HOME、CLASSPATH2.2、配置path2.3、启动 cmd 三、编写代码、编译并执行3.1、编写代码&…...
CSS语法、选择器、属性
1.css语法 * 格式:选择器 {属性名1:属性值1;属性名2:属性值2;...}* 选择器:筛选具有相似特征的元素* 注意:* 每一对属性需要使用;隔开,最后一对属性可以不加 2.选择器:筛选具有相似特征的元素 * 分类:1. 基…...
深度学习读取txt训练数据绘制参数曲线图的方法
有一些深度学习模型是并不像yolo系列那样最终输出相应的参数图,有很多训练形成了一个训练log文件,于是需要读取log文件中的内容并绘制成曲线图。 如下实例,有一个log文件的部分截图,需要将其读取出来并绘制曲线图 废话不多说&…...
VB.NET—DataGridView控件教程详解
目录 前言: 过程: 第一步: 第二步: 第三步: 第四步: 第五步: 番外篇: 总结: 前言: DataGridView是.NET FormK中的一个Windows窗体控件,它提供了一个可视化的表格控件,允许用户以表格形式显示和编辑数据。它通常用于显示和编辑数据库…...
MCU测试科普|如何进行MCU芯片测试,具体流程是什么?
MCU芯片测试系统是一种专门用于检测MCU芯片性能和质量的综合性设备。它通常由硬件和软件两部分组成,硬件包括测试仪器、适配器、测试夹具等,用于连接被测MCU芯片和测试机,实现高效高精度的测试。软件部分通常包括测试程序、测试管理软件等&am…...
单向循环代码实现cpp
// 单向循环链表 class CircleLink { public:CircleLink(){head_ new Node();tail_ head_;head_->next_ head_;}~CircleLink(){Node* p head_->next_;while (p ! head_){head_->next_ p->next_;delete p;p head_->next_;}delete head_;}public:// 尾插法 …...
【原创】java+jsp+servlet简单图书管理系统设计与实现
摘要: 图书管理系统是一个专门针对图书馆管理而设计的系统,它可以帮助图书管理员有效的对图书进行管理,在图书管理系统的设计中,首先要考虑的是系统的需求分析,该系统的设计与实现涉及多个方面,包括数据库…...
JVM之jinfo虚拟机配置信息工具
jinfo虚拟机配置信息工具 1、jinfo jinfo(Configuration Info for Java)的作用是实时地查看和调整虚拟机的各项参数。 使用jps -v 可以查看虚拟机启动时显示指定的参数列表,但是如果想知道未被显示指定的参数的系统默认值,除 …...
软件测试|PO设计模式在 UI 自动化中的实践
PO的思想最早是2013年由IT大佬Martin Flower提出的:https://martinfowler.com/bliki/PageObject.html 没错,就是他 — 没错,就是他 — 在他的文章里有这样一张经典样图,图片中展示了测试代码中直接操作HTML元素和使用PO模式将page对象封装成…...
如何上传自己的Jar到Maven中央仓库
在项目开发过程中,我们常常会使用 Maven 从仓库拉取开源的第三方 Jar 包。本文将带领大家将自己写好的代码或开源项目发布到 Maven中央仓库中,让其他人可以直接依赖你的 Jar 包,而不需要先下载你的代码后 install 到本地。 注册帐号 点击以…...
智能井盖传感器功能,万宾科技产品介绍
在国家治理方面,对社会的治理是一个重要的领域,一定要在推进社会治理现代化过程中提高市政府的管理和工作能力,推动社会拥有稳定有序的发展。在管理过程中对全市井盖进行统一化管理,可能是市政府比较头疼的难题,如果想…...
洛谷P4185 离线+并查集
好题,发现没有强制在线,可以离线操作 排序之后带集合点数的并查集就好了 #include<bits/stdc.h> using namespace std; const int N 1e510; int n,m; int p[N],sz[N];int find(int x){if(x!p[x])p[x] find(p[x]);return p[x]; } struct Node{in…...
遇到java.security.AccessControlException:access denied怎么办?
今天工作中遇到了如下报错,记录一下解决方案。 目录 问题 分析 结论 问题 这个问题出现在openjdk8启动网页端Java应用。 Java Exception:java.security.AccessControlException:access denied("java.net.SocketPermission""22.188.130.11:9000…...
c++对接CAT1400
最近工作中遇到需要对接1400协议,网上搜索不到c/c++的实现,所以记录一下自己的实现。 第一步注册: 1400是在http摘要认证的基础上做的,所以要去了解http摘要认证的流程 说明: 1.视图库通过用户分配,手动分配username,password给三方对接程序 2.三方对接程序第一次请求由…...
Linux基础【Linux知识贩卖机】
偶尔的停顿和修整,对于人生是非常必要的。 --随记 文章目录 Linux目录目录结构磁盘分区相关命令 相对路径和绝对路径 文件权限用户分类umask创建文件权限计算方法粘滞位 总结 Linux目录 目录结构 Linux 操作系统采用了一种层次化的目录结构,常被称为标…...
CSS 边框、轮廓线
一、CSS边框: CSS边框属性允许指定一个元素边框的样式和颜色。 1)、边框样式:border-style属性用来定义边框的样式,border-style值: 2)、边框宽度:border-width属性用于指定边框宽度。指定变宽…...
Transformer架构 完整的处理流程
Transformer 是由多层的 Encoder 和 Decoder 构成的。每一层的 Encoder 和 Decoder 都包含了多头自注意力机制(Multi-head Self Attention)、前馈神经网络(Feed Forward)和添加及归一化(Add & Norm)。特…...
git and svn 行尾风格配置强制为lf
git CLI配置: // 提交时转换为LF,检出时转换为CRLF git config --global core.autocrlf true // 提交时转换为LF,检出时不转换 git config --global core.autocrlf input // 提交检出均不转换 git config --global core.autocrlf f…...
达梦数据库答案
1、 创建数据库实例,到/dm8/data下,数据库名:DEMO,实例名DEMOSERVER(10分) [dmdbadmServer ~]$ cd /dm8/tool [dmdbadmServer tool]$ ./dbca.sh1、 簇大小32,页大小16,登录密码&…...
条件运算符
C中的三目运算符(也称条件运算符,英文:ternary operator)是一种简洁的条件选择语句,语法如下: 条件表达式 ? 表达式1 : 表达式2• 如果“条件表达式”为true,则整个表达式的结果为“表达式1”…...
在四层代理中还原真实客户端ngx_stream_realip_module
一、模块原理与价值 PROXY Protocol 回溯 第三方负载均衡(如 HAProxy、AWS NLB、阿里 SLB)发起上游连接时,将真实客户端 IP/Port 写入 PROXY Protocol v1/v2 头。Stream 层接收到头部后,ngx_stream_realip_module 从中提取原始信息…...
微信小程序 - 手机震动
一、界面 <button type"primary" bindtap"shortVibrate">短震动</button> <button type"primary" bindtap"longVibrate">长震动</button> 二、js逻辑代码 注:文档 https://developers.weixin.qq…...
【项目实战】通过多模态+LangGraph实现PPT生成助手
PPT自动生成系统 基于LangGraph的PPT自动生成系统,可以将Markdown文档自动转换为PPT演示文稿。 功能特点 Markdown解析:自动解析Markdown文档结构PPT模板分析:分析PPT模板的布局和风格智能布局决策:匹配内容与合适的PPT布局自动…...
Nginx server_name 配置说明
Nginx 是一个高性能的反向代理和负载均衡服务器,其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机(Virtual Host)。 1. 简介 Nginx 使用 server_name 指令来确定…...
HBuilderX安装(uni-app和小程序开发)
下载HBuilderX 访问官方网站:https://www.dcloud.io/hbuilderx.html 根据您的操作系统选择合适版本: Windows版(推荐下载标准版) Windows系统安装步骤 运行安装程序: 双击下载的.exe安装文件 如果出现安全提示&…...
Unit 1 深度强化学习简介
Deep RL Course ——Unit 1 Introduction 从理论和实践层面深入学习深度强化学习。学会使用知名的深度强化学习库,例如 Stable Baselines3、RL Baselines3 Zoo、Sample Factory 和 CleanRL。在独特的环境中训练智能体,比如 SnowballFight、Huggy the Do…...
【7色560页】职场可视化逻辑图高级数据分析PPT模版
7种色调职场工作汇报PPT,橙蓝、黑红、红蓝、蓝橙灰、浅蓝、浅绿、深蓝七种色调模版 【7色560页】职场可视化逻辑图高级数据分析PPT模版:职场可视化逻辑图分析PPT模版https://pan.quark.cn/s/78aeabbd92d1...
GitFlow 工作模式(详解)
今天再学项目的过程中遇到使用gitflow模式管理代码,因此进行学习并且发布关于gitflow的一些思考 Git与GitFlow模式 我们在写代码的时候通常会进行网上保存,无论是github还是gittee,都是一种基于git去保存代码的形式,这样保存代码…...
DingDing机器人群消息推送
文章目录 1 新建机器人2 API文档说明3 代码编写 1 新建机器人 点击群设置 下滑到群管理的机器人,点击进入 添加机器人 选择自定义Webhook服务 点击添加 设置安全设置,详见说明文档 成功后,记录Webhook 2 API文档说明 点击设置说明 查看自…...
