当前位置: 首页 > news >正文

flink测试map转换函数和process函数

背景

在flink中,我们需要对我们写的map转换函数,process处理函数进行单元测试,测试的内容包括查看函数的输出结果是否符合以及函数内的状态是否正确更新,本文就记录几个测试过程中的要点

flink中测试函数

首先我们根据我们要测试的是数据流的类型选择不同的测试套件,如下所示:

  1. OneInputStreamOperatorTestHarness:适用于 DataStreams 数据流
  2. KeyedOneInputStreamOperatorTestHarness:适用于 KeyedStreams 分组后的数据流
  3. TwoInputStreamOperatorTestHarness:适用于两个数据流DataStream的 ConnectedStream
  4. KeyedTwoInputStreamOperatorTestHarness:适用于两个 KeyedStream 的 ConnectedStream

其次,根据是测试map函数还是process函数,我们选择不同的操作符,如果是map函数我们选择StreamFlatMap算子(可同时处理FlatMap和带状态的RichFlatmap函数)还是ProcessFunctionTestHarnesses.forXX算子

map函数测试代码:

@Testpublic void testStateFlatMap() throws Exception {StatefulFlatMap statefulFlatMap = new StatefulFlatMap();// OneInputStreamOperatorTestHarness takes the input and output types as type parametersOneInputStreamOperatorTestHarness<String, String> testHarness =// KeyedOneInputStreamOperatorTestHarness takes three arguments:// Flink operator object, key selector and key typenew KeyedOneInputStreamOperatorTestHarness<String, String, String>(new StreamFlatMap<>(statefulFlatMap),x -> "1", Types.STRING);testHarness.open();// test first recordtestHarness.processElement("world", 10);ValueState<String> previousInput =statefulFlatMap.getRuntimeContext().getState(new ValueStateDescriptor<>("previousInput", Types.STRING));String stateValue = previousInput.value();Assert.assertEquals(Lists.newArrayList(new StreamRecord<>("hello world", 10)),testHarness.extractOutputStreamRecords());Assert.assertEquals("world", stateValue);// test second recordtestHarness.processElement("parallel", 20);Assert.assertEquals(Lists.newArrayList(new StreamRecord<>("hello world", 10),new StreamRecord<>("hello parallel world", 20)), testHarness.extractOutputStreamRecords());Assert.assertEquals("parallel", previousInput.value());}public class StatefulFlatMap extends RichFlatMapFunction<String, String> {ValueState<String> previousInput;@Overridepublic void open(Configuration parameters) throws Exception {previousInput = getRuntimeContext().getState(new ValueStateDescriptor<String>("previousInput", Types.STRING));}@Overridepublic void flatMap(String in, Collector<String> collector) throws Exception {String out = "hello " + in;if(previousInput.value() != null){out = out + " " + previousInput.value();}previousInput.update(in);collector.collect(out);}
}

process处理函数代码:

@Testpublic void testProcessElement() throws Exception {MyProcessFunction myProcessFunction = new MyProcessFunction();OneInputStreamOperatorTestHarness<String, String> testHarness =ProcessFunctionTestHarnesses.forKeyedProcessFunction(myProcessFunction, x -> "1", Types.STRING);// Function time is initialized to 0testHarness.open();testHarness.processElement("world", 10);Assert.assertEquals(Lists.newArrayList(new StreamRecord<>("hello world", 10)),testHarness.extractOutputStreamRecords());}@Testpublic void testOnTimer() throws Exception {MyProcessFunction myProcessFunction = new MyProcessFunction();OneInputStreamOperatorTestHarness<String, String> testHarness =ProcessFunctionTestHarnesses.forKeyedProcessFunction(myProcessFunction, x -> "1", Types.STRING);testHarness.open();testHarness.processElement("world", 10);Assert.assertEquals(1, testHarness.numProcessingTimeTimers());// Function time is set to 50testHarness.setProcessingTime(50);Assert.assertEquals(Lists.newArrayList(new StreamRecord<>("hello world", 10),new StreamRecord<>("Timer triggered at timestamp 50")),testHarness.extractOutputStreamRecords());}public class MyProcessFunction extends KeyedProcessFunction<String, String, String> {@Overridepublic void processElement(String in, Context context, Collector<String> collector) throws Exception {context.timerService().registerProcessingTimeTimer(50);String out = "hello " + in;collector.collect(out);}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {out.collect(String.format("Timer triggered at timestamp %d", timestamp));}}

此外附加官方的map函数的测试代码:

/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements.  See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License.  You may obtain a copy of the License at**    http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.flink.streaming.api.operators;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;import org.junit.Assert;
import org.junit.Test;import java.util.concurrent.ConcurrentLinkedQueue;/*** Tests for {@link StreamMap}. These test that:** <ul>*   <li>RichFunction methods are called correctly*   <li>Timestamps of processed elements match the input timestamp*   <li>Watermarks are correctly forwarded* </ul>*/
public class StreamFlatMapTest {private static final class MyFlatMap implements FlatMapFunction<Integer, Integer> {private static final long serialVersionUID = 1L;@Overridepublic void flatMap(Integer value, Collector<Integer> out) throws Exception {if (value % 2 == 0) {out.collect(value);out.collect(value * value);}}}@Testpublic void testFlatMap() throws Exception {StreamFlatMap<Integer, Integer> operator =new StreamFlatMap<Integer, Integer>(new MyFlatMap());OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);long initialTime = 0L;ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();testHarness.open();testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 2));testHarness.processWatermark(new Watermark(initialTime + 2));testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 3));testHarness.processElement(new StreamRecord<Integer>(4, initialTime + 4));testHarness.processElement(new StreamRecord<Integer>(5, initialTime + 5));testHarness.processElement(new StreamRecord<Integer>(6, initialTime + 6));testHarness.processElement(new StreamRecord<Integer>(7, initialTime + 7));testHarness.processElement(new StreamRecord<Integer>(8, initialTime + 8));expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 2));expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 2));expectedOutput.add(new Watermark(initialTime + 2));expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 4));expectedOutput.add(new StreamRecord<Integer>(16, initialTime + 4));expectedOutput.add(new StreamRecord<Integer>(6, initialTime + 6));expectedOutput.add(new StreamRecord<Integer>(36, initialTime + 6));expectedOutput.add(new StreamRecord<Integer>(8, initialTime + 8));expectedOutput.add(new StreamRecord<Integer>(64, initialTime + 8));TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());}@Testpublic void testOpenClose() throws Exception {StreamFlatMap<String, String> operator =new StreamFlatMap<String, String>(new TestOpenCloseFlatMapFunction());OneInputStreamOperatorTestHarness<String, String> testHarness =new OneInputStreamOperatorTestHarness<String, String>(operator);long initialTime = 0L;testHarness.open();testHarness.processElement(new StreamRecord<String>("Hello", initialTime));testHarness.close();Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseFlatMapFunction.closeCalled);Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);}// This must only be used in one test, otherwise the static fields will be changed// by several tests concurrentlyprivate static class TestOpenCloseFlatMapFunction extends RichFlatMapFunction<String, String> {private static final long serialVersionUID = 1L;public static boolean openCalled = false;public static boolean closeCalled = false;@Overridepublic void open(OpenContext openContext) throws Exception {super.open(openContext);if (closeCalled) {Assert.fail("Close called before open.");}openCalled = true;}@Overridepublic void close() throws Exception {super.close();if (!openCalled) {Assert.fail("Open was not called before close.");}closeCalled = true;}@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {if (!openCalled) {Assert.fail("Open was not called before run.");}out.collect(value);}}
}

包含同时测试FlatMap和RichFlatMap函数,但是其中没有操作状态,我前面的例子包含了RichFlatMap状态的测试

参考文献:
https://flink.apache.org/2020/02/03/a-guide-for-unit-testing-in-apache-flink/

相关文章:

flink测试map转换函数和process函数

背景 在flink中&#xff0c;我们需要对我们写的map转换函数&#xff0c;process处理函数进行单元测试&#xff0c;测试的内容包括查看函数的输出结果是否符合以及函数内的状态是否正确更新&#xff0c;本文就记录几个测试过程中的要点 flink中测试函数 首先我们根据我们要测…...

【跟小嘉学习JavaWeb开发】第一章 开发环境搭建

系列文章目录 【跟小嘉学习JavaWeb开发】第一章 开发环境搭建 文章目录 系列文章目录[TOC](文章目录) 前言一、JDK的下载与安装1.1、关于JDK的版本问题 二、环境变量配置2.1、配置 JAVA_HOME、CLASSPATH2.2、配置path2.3、启动 cmd 三、编写代码、编译并执行3.1、编写代码&…...

CSS语法、选择器、属性

1.css语法 * 格式&#xff1a;选择器 {属性名1:属性值1;属性名2:属性值2;...}* 选择器:筛选具有相似特征的元素* 注意&#xff1a;* 每一对属性需要使用&#xff1b;隔开&#xff0c;最后一对属性可以不加 2.选择器&#xff1a;筛选具有相似特征的元素 * 分类&#xff1a;1. 基…...

深度学习读取txt训练数据绘制参数曲线图的方法

有一些深度学习模型是并不像yolo系列那样最终输出相应的参数图&#xff0c;有很多训练形成了一个训练log文件&#xff0c;于是需要读取log文件中的内容并绘制成曲线图。 如下实例&#xff0c;有一个log文件的部分截图&#xff0c;需要将其读取出来并绘制曲线图 废话不多说&…...

VB.NET—DataGridView控件教程详解

目录 前言: 过程: 第一步: 第二步: 第三步: 第四步: 第五步&#xff1a; 番外篇: 总结: 前言: DataGridView是.NET FormK中的一个Windows窗体控件&#xff0c;它提供了一个可视化的表格控件&#xff0c;允许用户以表格形式显示和编辑数据。它通常用于显示和编辑数据库…...

MCU测试科普|如何进行MCU芯片测试,具体流程是什么?

MCU芯片测试系统是一种专门用于检测MCU芯片性能和质量的综合性设备。它通常由硬件和软件两部分组成&#xff0c;硬件包括测试仪器、适配器、测试夹具等&#xff0c;用于连接被测MCU芯片和测试机&#xff0c;实现高效高精度的测试。软件部分通常包括测试程序、测试管理软件等&am…...

单向循环代码实现cpp

// 单向循环链表 class CircleLink { public:CircleLink(){head_ new Node();tail_ head_;head_->next_ head_;}~CircleLink(){Node* p head_->next_;while (p ! head_){head_->next_ p->next_;delete p;p head_->next_;}delete head_;}public:// 尾插法 …...

【原创】java+jsp+servlet简单图书管理系统设计与实现

摘要&#xff1a; 图书管理系统是一个专门针对图书馆管理而设计的系统&#xff0c;它可以帮助图书管理员有效的对图书进行管理&#xff0c;在图书管理系统的设计中&#xff0c;首先要考虑的是系统的需求分析&#xff0c;该系统的设计与实现涉及多个方面&#xff0c;包括数据库…...

JVM之jinfo虚拟机配置信息工具

jinfo虚拟机配置信息工具 1、jinfo jinfo&#xff08;Configuration Info for Java&#xff09;的作用是实时地查看和调整虚拟机的各项参数。 使用jps -v 可以查看虚拟机启动时显示指定的参数列表&#xff0c;但是如果想知道未被显示指定的参数的系统默认值&#xff0c;除 …...

软件测试|PO设计模式在 UI 自动化中的实践

PO的思想最早是2013年由IT大佬Martin Flower提出的&#xff1a;https://martinfowler.com/bliki/PageObject.html 没错&#xff0c;就是他 — 没错&#xff0c;就是他 — 在他的文章里有这样一张经典样图,图片中展示了测试代码中直接操作HTML元素和使用PO模式将page对象封装成…...

如何上传自己的Jar到Maven中央仓库

在项目开发过程中&#xff0c;我们常常会使用 Maven 从仓库拉取开源的第三方 Jar 包。本文将带领大家将自己写好的代码或开源项目发布到 Maven中央仓库中&#xff0c;让其他人可以直接依赖你的 Jar 包&#xff0c;而不需要先下载你的代码后 install 到本地。 注册帐号 点击以…...

智能井盖传感器功能,万宾科技产品介绍

在国家治理方面&#xff0c;对社会的治理是一个重要的领域&#xff0c;一定要在推进社会治理现代化过程中提高市政府的管理和工作能力&#xff0c;推动社会拥有稳定有序的发展。在管理过程中对全市井盖进行统一化管理&#xff0c;可能是市政府比较头疼的难题&#xff0c;如果想…...

洛谷P4185 离线+并查集

好题&#xff0c;发现没有强制在线&#xff0c;可以离线操作 排序之后带集合点数的并查集就好了 #include<bits/stdc.h> using namespace std; const int N 1e510; int n,m; int p[N],sz[N];int find(int x){if(x!p[x])p[x] find(p[x]);return p[x]; } struct Node{in…...

遇到java.security.AccessControlException:access denied怎么办?

今天工作中遇到了如下报错&#xff0c;记录一下解决方案。 目录 问题 分析 结论 问题 这个问题出现在openjdk8启动网页端Java应用。 Java Exception:java.security.AccessControlException:access denied("java.net.SocketPermission""22.188.130.11:9000…...

c++对接CAT1400

最近工作中遇到需要对接1400协议,网上搜索不到c/c++的实现,所以记录一下自己的实现。 第一步注册: 1400是在http摘要认证的基础上做的,所以要去了解http摘要认证的流程 说明: 1.视图库通过用户分配,手动分配username,password给三方对接程序 2.三方对接程序第一次请求由…...

Linux基础【Linux知识贩卖机】

偶尔的停顿和修整&#xff0c;对于人生是非常必要的。 --随记 文章目录 Linux目录目录结构磁盘分区相关命令 相对路径和绝对路径 文件权限用户分类umask创建文件权限计算方法粘滞位 总结 Linux目录 目录结构 Linux 操作系统采用了一种层次化的目录结构&#xff0c;常被称为标…...

CSS 边框、轮廓线

一、CSS边框&#xff1a; CSS边框属性允许指定一个元素边框的样式和颜色。 1&#xff09;、边框样式&#xff1a;border-style属性用来定义边框的样式&#xff0c;border-style值&#xff1a; 2&#xff09;、边框宽度&#xff1a;border-width属性用于指定边框宽度。指定变宽…...

Transformer架构 完整的处理流程

Transformer 是由多层的 Encoder 和 Decoder 构成的。每一层的 Encoder 和 Decoder 都包含了多头自注意力机制&#xff08;Multi-head Self Attention&#xff09;、前馈神经网络&#xff08;Feed Forward&#xff09;和添加及归一化&#xff08;Add & Norm&#xff09;。特…...

git and svn 行尾风格配置强制为lf

git CLI配置&#xff1a; // 提交时转换为LF&#xff0c;检出时转换为CRLF git config --global core.autocrlf true // 提交时转换为LF&#xff0c;检出时不转换 git config --global core.autocrlf input // 提交检出均不转换 git config --global core.autocrlf f…...

达梦数据库答案

1、 创建数据库实例&#xff0c;到/dm8/data下&#xff0c;数据库名&#xff1a;DEMO&#xff0c;实例名DEMOSERVER&#xff08;10分&#xff09; [dmdbadmServer ~]$ cd /dm8/tool [dmdbadmServer tool]$ ./dbca.sh1、 簇大小32&#xff0c;页大小16&#xff0c;登录密码&…...

grdpwasm:基于Go WebAssembly的Web RDP客户端,无需插件直连Windows远程桌面!

导航菜单可进行切换导航、登录、外观设置等操作。平台方面&#xff0c;有AI代码创作&#xff0c;如GitHub Copilot可借助AI编写更优质代码&#xff0c;GitHub Spark能构建并部署智能应用等&#xff1b;开发者工作流包括Actions自动化工作流、Codespaces即时开发环境等&#xff…...

保姆级教程:用GD32F103的DAC+TIMER+DMA生成正弦波,示波器实测波形稳如老狗

GD32F103实战&#xff1a;DACTIMERDMA正弦波生成全解析 最近在调试一个音频信号发生器项目时&#xff0c;发现不少初学者在使用GD32的DAC功能时都会遇到波形不稳定、配置复杂的问题。今天我就以GD32103C-START开发板为例&#xff0c;手把手带大家实现一个零CPU占用的正弦波发生…...

哔咔漫画下载器完整指南:3倍速打造个人离线漫画库

哔咔漫画下载器完整指南&#xff1a;3倍速打造个人离线漫画库 【免费下载链接】picacomic-downloader 哔咔漫画 picacomic pica漫画 bika漫画 PicACG 多线程下载器&#xff0c;带图形界面 带收藏夹&#xff0c;已打包exe 下载速度飞快 项目地址: https://gitcode.com/gh_mirr…...

【2024最硬核VS Code自动化教程】:覆盖GitHub Actions+Dev Containers+Task Runner的Copilot Next三重协同配置

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;VS Code Copilot Next 自动化工作流配置全景概览 VS Code Copilot Next 并非独立插件&#xff0c;而是基于 GitHub Copilot Chat 的深度集成增强形态&#xff0c;依托 VS Code 1.86 的新扩展主机 API …...

专业音频解密方案:ncmdump全面解析与高效NCM格式转换指南

专业音频解密方案&#xff1a;ncmdump全面解析与高效NCM格式转换指南 【免费下载链接】ncmdump 项目地址: https://gitcode.com/gh_mirrors/ncmd/ncmdump 在数字音乐管理领域&#xff0c;格式兼容性问题一直是困扰用户的痛点&#xff0c;特别是网易云音乐采用的NCM加密…...

YesPlayMusic深度解析:网易云音乐纯净播放的终极解决方案

YesPlayMusic深度解析&#xff1a;网易云音乐纯净播放的终极解决方案 【免费下载链接】YesPlayMusic 高颜值的第三方网易云播放器&#xff0c;支持 Windows / macOS / Linux :electron: 项目地址: https://gitcode.com/gh_mirrors/ye/YesPlayMusic 厌倦了官方客户端繁杂…...

Visual C++运行库修复工具终极指南:从故障诊断到批量管理

Visual C运行库修复工具终极指南&#xff1a;从故障诊断到批量管理 【免费下载链接】vcredist AIO Repack for latest Microsoft Visual C Redistributable Runtimes 项目地址: https://gitcode.com/gh_mirrors/vc/vcredist 你是否曾经遇到过这样的场景&#xff1a;刚下…...

RAG评估框架解析:提升检索增强生成系统质量

1. RAG评估框架概述&#xff1a;为什么我们需要专门工具&#xff1f;在构建基于检索增强生成&#xff08;RAG&#xff09;的系统时&#xff0c;开发者常陷入一个误区&#xff1a;认为只要拼接好检索模块和生成模块就能自动获得优质输出。但实际工程实践中&#xff0c;我们至少面…...

数据科学思维导图:从工具链到实战心法

1. 数据科学的达芬奇密码&#xff1a;掌握数据科学思维导图数据科学就像一场精心编排的交响乐&#xff0c;需要统计学家的严谨、工程师的务实和艺术家的创造力。作为一名从业十余年的数据科学家&#xff0c;我发现真正优秀的数据分析项目往往遵循着相似的底层逻辑 - 就像达芬奇…...

从PolarCTF靶场四道Web题,聊聊那些容易被忽略的‘非主流’漏洞利用技巧

从PolarCTF靶场四道Web题&#xff0c;聊聊那些容易被忽略的‘非主流’漏洞利用技巧 在Web安全领域&#xff0c;常规漏洞利用方法往往被广泛讨论&#xff0c;但真正的高手对决往往取决于对"非主流"技巧的掌握程度。就像武术中的奇招异式&#xff0c;这些不太常见的利…...