flink的集成测试
背景
日常测试中我们使用flink的TestHarness只能测试单个算子,很多情况下我们需要集成测试来测试真正的问题,所以在flink中进行集成测试还是非常有必要的,本文就来记录下如何在flink中进行集成测试
flink中进行集成测试
flink中进行集成测试的关键类MiniClusterWithClientResource,这是一个启动本地flink集群的关键类,先看一下集成测试的关键代码:
/*** FLINK集成测试* https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/datastream/testing/**/
public class FlinkIntegrationTest {public static final Configuration config = Configuration.fromMap(new HashMap<String, String>() {{put("heartbeat.timeout", "300000");}});@ClassRulepublic static MiniClusterWithClientResource flinkCluster =new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config).setNumberSlotsPerTaskManager(1).setNumberTaskManagers(3).build());@Testpublic void testStateFlatMap() throws Exception {StatefulFlatMap statefulFlatMap = new StatefulFlatMap();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// configure your test environmentenv.setParallelism(2);// values are collected in a static variableCollectSink.values.clear();// create a stream of custom elements and apply transformationsenv.fromElements("world", "hi").keyBy(e -> "1").flatMap(statefulFlatMap).addSink(new CollectSink());// executeenv.execute();// verify your resultsassertTrue(CollectSink.values.containsAll(Lists.newArrayList("hello world", "hello hi world")));}@Testpublic void testStateFlatMap1() throws Exception {StatefulFlatMap statefulFlatMap = new StatefulFlatMap();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// configure your test environmentenv.setParallelism(2);// values are collected in a static variableCollectSink.values.clear();// create a stream of custom elements and apply transformationsenv.fromElements("world", "hi", "world").keyBy(e -> e).flatMap(statefulFlatMap).addSink(new CollectSink());// executeenv.execute();// verify your resultsassertTrue(CollectSink.values.containsAll(Lists.newArrayList("hello world", "hello hi", "hello world world")));}// create a testing sinkprivate static class CollectSink implements SinkFunction<String> {// must be staticpublic static final List<String> values = Collections.synchronizedList(new ArrayList<>());@Overridepublic void invoke(String value, Context context) throws Exception {values.add(value);}}}public class StatefulFlatMap extends RichFlatMapFunction<String, String> {ValueState<String> previousInput;@Overridepublic void open(Configuration parameters) throws Exception {previousInput = getRuntimeContext().getState(new ValueStateDescriptor<String>("previousInput", Types.STRING));}@Overridepublic void flatMap(String in, Collector<String> collector) throws Exception {String out = "hello " + in;if(previousInput.value() != null){out = out + " " + previousInput.value();}previousInput.update(in);collector.collect(out);}
由于我们是集成测试,我们一般输入source和输出sink是自己构造的,比如这里的CollectSink,这里就可以正常测试包括状态在内的pineline集成测试了
相关文章:
flink的集成测试
背景 日常测试中我们使用flink的TestHarness只能测试单个算子,很多情况下我们需要集成测试来测试真正的问题,所以在flink中进行集成测试还是非常有必要的,本文就来记录下如何在flink中进行集成测试 flink中进行集成测试 flink中进行集成测…...
gitee推荐-1Panel
以下内容来源于gitee。 gitee地址:1Panel: 🔥 🔥 🔥 现代化、开源的 Linux 服务器运维管理面板。 大概和宝塔类似,但支持docker。在线体验:https://demo.1panel.cn/ 稍微试了下,没找到apache,…...

GEE 22:基于GEE实现物种分布模型(更新中。。。。。。)
物种分布模型 1. 数据点准备1.1 数据加载1.2 去除指定距离内的重复点1.3 定义研究区范围1.4 选择预测因子 1. 数据点准备 1.1 数据加载 首先需要将CSV文件导入到GEE平台中,同样也可以导入shp格式文件。 // 1.Loading and cleaning your species data *************…...

阿里云windwos 安装oracle数据库,外部用工具连接不上,只能在服务器本机通过127.0.0.1 连接
1. 首先检查阿里云服务器安全组端口是否开放 oracle 数据库端口 2. 其次找到oracle 安装的目录,打开这俩个文件,将localhost 修改为 服务器本机名称 3.重启oracle 监听服务,就可以连接了...
UniApp 中的 image 属性讲解
在 UniApp 中,image 是用于显示图片的组件,它具有多种属性,可以控制图片的展示方式和行为。下面我将为您讲解一些常用的 image 属性。 基本属性 src:指定要显示的图片资源路径,可以是本地路径或远程 URL。mode&#…...

卷积神经网络(CNN)车牌识别
文章目录 一、前言二、前期工作1. 设置GPU(如果使用的是CPU可以忽略这步)2. 导入数据3. 查看数据3.数据可视化4.标签数字化 二、构建一个tf.data.Dataset1.预处理函数2.加载数据3.配置数据 三、搭建网络模型四、设置动态学习率五、编译六、训练八、保存和…...

弹窗concrt140.dll丢失的解决方法,深度解析concrt140.dll丢失的原因
在计算机使用过程中,我们经常会遇到一些错误提示或者系统崩溃的情况。其中,concrt140.dll是一个常见的错误提示,这个错误通常会导致某些应用程序无法正常运行。为了解决这个问题,本文将介绍5种详细的解决方法,帮助您恢…...

CANdelaStudio 使用教程4 编辑State
文章目录 简述1、State Groups2、Dependencies3、 Defaults State1、 会话状态2、 新增会话状态3、 编辑 服务对 State 的依赖关系 State Diagram 简述 1、State Groups 2、Dependencies 在这里,可以编辑现有服务在不同会话状态或安全访问状态的支持情况和状态转换…...

FANUC机器人到达某个点位时,为什么不显示@符号?
FANUC机器人到达某个点位时,为什么不显示@符号? 该功能由变量$MNDSP_POSCF = 0(不显示)/1(显示)/2(光标移动该行显示) 控制,该变量设置为不同的值,则启用对应的功能。 如下图所示,为该变量设置不同的值时的对比, 其他常用的系统变量可参考以下内容: 在R寄存器指定速度…...
JVM运行参数介绍 -Xms -Xmx -Xmn -Xss
文章目录 CharGPT问答Java运行参数“-Xmx2048m -Xms1024m -Xmn512m -Xss256k”如何调优jvm的运行参数 JVM相关介绍Java 虚拟机底层原理知识总结 CharGPT问答 Java运行参数“-Xmx2048m -Xms1024m -Xmn512m -Xss256k” 2023/11/26 20:30:27 这些参数是用于配置 Java 虚拟机&am…...
Hive删除符合条件的记录
Hive在使用中不支持update和delete操作,那么如果想删除部分条件的记录需要怎么操作?本文记录下解决方法。 思路:使用selectwhere选出想要保留的数据,使用insert overwrite向原表覆盖插入数据. insert overwrite table dbname.tab…...

Linux加强篇006-存储结构与管理硬盘
目录 前言 1. 从“/”开始 2. 物理设备命名规则 3. 文件系统与数据资料 4. 挂载硬件设备 5. 添加硬盘设备 6. 添加交换分区 7. 磁盘容量配额 8. VDO虚拟数据优化 9. 软硬方式链接 前言 悟已往之不谏,知来者之可追。实迷途其未远,觉今是而昨非…...

GIT版本控制和常用命令使用介绍
GIT版本控制和常用命令使用介绍 1. 版本控制1.1 历史背景1.2 什么是版本控制1.3 常见版本控制工具1.4 版本控制的分类 2 Git介绍2.1 Git 工作流程2.2 基本概念2.3 文件的四种状态2.4 忽略文件2.5 Git命令2.5.1 查看本地git配置命令2.5.2 远程库信息查看命令2.5.3 分支交互命令2…...

微服务学习|初识Docker、使用Docker、自定义镜像、DockerCompose、Docker镜像仓库
初识Docker 项目部署的问题 大型项目组件较多,运行环境也较为复杂,部署时会碰到一些问题 依赖关系复杂,容易出现兼容性问题 开发、测试、生产环境有差异 Docker如何解决依赖的兼容问题的? 将应用的Libs (函数库)、Deps (依赖)配置与应用…...

蓝桥杯每日一题2023.11.24
题目描述 #include <stdio.h> #define N 100int connected(int* m, int p, int q) {return m[p]m[q]? 1 : 0; }void link(int* m, int p, int q) {int i;if(connected(m,p,q)) return;int pID m[p];int qID m[q];for(i0; i<N; i) ________________________________…...

内网穿透的应用-如何在本地安装Flask,以及将其web界面发布到公网上并进行远程访问
轻量级web开发框架:Flask本地部署及实现公网访问界面 文章目录 轻量级web开发框架:Flask本地部署及实现公网访问界面前言1. 安装部署Flask2. 安装Cpolar内网穿透3. 配置Flask的web界面公网访问地址4. 公网远程访问Flask的web界面 前言 本篇文章讲解如何…...
【重要】Splunk 的 Lookup Table能否被覆盖呢?
1: 背景: 用户自己的lookup table 可能需要被覆盖,因为用户自己会自动更新,但是如果不是用户自己更新,Deployer 上面发布的时候,用于没有用户的table ,那么默认是不能把客户的table overwite 的。如果用户要覆盖的话,如果客户有权限的话,客户可以自己更换lookup table…...
SELinux零知识学习三十、SELinux策略语言之角色和用户(1)
接前一篇文章:SELinux零知识学习二十九、SELinux策略语言之类型强制(14) 三、SELinux策略语言之类型强制 SELinux提供了一种依赖于类型强制(类型增强,TE)的基于角色的访问控制(Role-Based Access Control),角色用于组域类型和限制域类型与用户之间的关系,SELinux中的…...

MyBatis Generator使用总结
MyBatis Generator使用总结 介绍具体使用数据准备插件引入配置条件构建讲解demo地址 介绍 MyBatis Generator (MBG) 是 MyBatis 的代码生成器。它能够根据数据库表,自动生成 java 实体类、dao 层接口(mapper 接口)及m…...
编程语言发展史:Ruby语言的发展和应用
介绍 Ruby是一种高级编程语言,最初由日本的松本行弘开发。它在20世纪90年代初首次发布,并在2000年代初开始变得流行。 Ruby是一种动态、面向对象的语言,具有简单、易于学习和使用的语法,因此被广泛应用于Web开发、数据分析、游戏…...
[特殊字符] 智能合约中的数据是如何在区块链中保持一致的?
🧠 智能合约中的数据是如何在区块链中保持一致的? 为什么所有区块链节点都能得出相同结果?合约调用这么复杂,状态真能保持一致吗?本篇带你从底层视角理解“状态一致性”的真相。 一、智能合约的数据存储在哪里…...

树莓派超全系列教程文档--(61)树莓派摄像头高级使用方法
树莓派摄像头高级使用方法 配置通过调谐文件来调整相机行为 使用多个摄像头安装 libcam 和 rpicam-apps依赖关系开发包 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 配置 大多数用例自动工作,无需更改相机配置。但是,一…...

从WWDC看苹果产品发展的规律
WWDC 是苹果公司一年一度面向全球开发者的盛会,其主题演讲展现了苹果在产品设计、技术路线、用户体验和生态系统构建上的核心理念与演进脉络。我们借助 ChatGPT Deep Research 工具,对过去十年 WWDC 主题演讲内容进行了系统化分析,形成了这份…...

聊聊 Pulsar:Producer 源码解析
一、前言 Apache Pulsar 是一个企业级的开源分布式消息传递平台,以其高性能、可扩展性和存储计算分离架构在消息队列和流处理领域独树一帜。在 Pulsar 的核心架构中,Producer(生产者) 是连接客户端应用与消息队列的第一步。生产者…...

dedecms 织梦自定义表单留言增加ajax验证码功能
增加ajax功能模块,用户不点击提交按钮,只要输入框失去焦点,就会提前提示验证码是否正确。 一,模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...
【C++从零实现Json-Rpc框架】第六弹 —— 服务端模块划分
一、项目背景回顾 前五弹完成了Json-Rpc协议解析、请求处理、客户端调用等基础模块搭建。 本弹重点聚焦于服务端的模块划分与架构设计,提升代码结构的可维护性与扩展性。 二、服务端模块设计目标 高内聚低耦合:各模块职责清晰,便于独立开发…...
在QWebEngineView上实现鼠标、触摸等事件捕获的解决方案
这个问题我看其他博主也写了,要么要会员、要么写的乱七八糟。这里我整理一下,把问题说清楚并且给出代码,拿去用就行,照着葫芦画瓢。 问题 在继承QWebEngineView后,重写mousePressEvent或event函数无法捕获鼠标按下事…...
LangChain知识库管理后端接口:数据库操作详解—— 构建本地知识库系统的基础《二》
这段 Python 代码是一个完整的 知识库数据库操作模块,用于对本地知识库系统中的知识库进行增删改查(CRUD)操作。它基于 SQLAlchemy ORM 框架 和一个自定义的装饰器 with_session 实现数据库会话管理。 📘 一、整体功能概述 该模块…...
Go 并发编程基础:通道(Channel)的使用
在 Go 中,Channel 是 Goroutine 之间通信的核心机制。它提供了一个线程安全的通信方式,用于在多个 Goroutine 之间传递数据,从而实现高效的并发编程。 本章将介绍 Channel 的基本概念、用法、缓冲、关闭机制以及 select 的使用。 一、Channel…...
现有的 Redis 分布式锁库(如 Redisson)提供了哪些便利?
现有的 Redis 分布式锁库(如 Redisson)相比于开发者自己基于 Redis 命令(如 SETNX, EXPIRE, DEL)手动实现分布式锁,提供了巨大的便利性和健壮性。主要体现在以下几个方面: 原子性保证 (Atomicity)ÿ…...