flink的集成测试
背景
日常测试中我们使用flink的TestHarness只能测试单个算子,很多情况下我们需要集成测试来测试真正的问题,所以在flink中进行集成测试还是非常有必要的,本文就来记录下如何在flink中进行集成测试
flink中进行集成测试
flink中进行集成测试的关键类MiniClusterWithClientResource,这是一个启动本地flink集群的关键类,先看一下集成测试的关键代码:
/*** FLINK集成测试* https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/datastream/testing/**/
public class FlinkIntegrationTest {public static final Configuration config = Configuration.fromMap(new HashMap<String, String>() {{put("heartbeat.timeout", "300000");}});@ClassRulepublic static MiniClusterWithClientResource flinkCluster =new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config).setNumberSlotsPerTaskManager(1).setNumberTaskManagers(3).build());@Testpublic void testStateFlatMap() throws Exception {StatefulFlatMap statefulFlatMap = new StatefulFlatMap();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// configure your test environmentenv.setParallelism(2);// values are collected in a static variableCollectSink.values.clear();// create a stream of custom elements and apply transformationsenv.fromElements("world", "hi").keyBy(e -> "1").flatMap(statefulFlatMap).addSink(new CollectSink());// executeenv.execute();// verify your resultsassertTrue(CollectSink.values.containsAll(Lists.newArrayList("hello world", "hello hi world")));}@Testpublic void testStateFlatMap1() throws Exception {StatefulFlatMap statefulFlatMap = new StatefulFlatMap();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// configure your test environmentenv.setParallelism(2);// values are collected in a static variableCollectSink.values.clear();// create a stream of custom elements and apply transformationsenv.fromElements("world", "hi", "world").keyBy(e -> e).flatMap(statefulFlatMap).addSink(new CollectSink());// executeenv.execute();// verify your resultsassertTrue(CollectSink.values.containsAll(Lists.newArrayList("hello world", "hello hi", "hello world world")));}// create a testing sinkprivate static class CollectSink implements SinkFunction<String> {// must be staticpublic static final List<String> values = Collections.synchronizedList(new ArrayList<>());@Overridepublic void invoke(String value, Context context) throws Exception {values.add(value);}}}public class StatefulFlatMap extends RichFlatMapFunction<String, String> {ValueState<String> previousInput;@Overridepublic void open(Configuration parameters) throws Exception {previousInput = getRuntimeContext().getState(new ValueStateDescriptor<String>("previousInput", Types.STRING));}@Overridepublic void flatMap(String in, Collector<String> collector) throws Exception {String out = "hello " + in;if(previousInput.value() != null){out = out + " " + previousInput.value();}previousInput.update(in);collector.collect(out);}
由于我们是集成测试,我们一般输入source和输出sink是自己构造的,比如这里的CollectSink,这里就可以正常测试包括状态在内的pineline集成测试了
相关文章:
flink的集成测试
背景 日常测试中我们使用flink的TestHarness只能测试单个算子,很多情况下我们需要集成测试来测试真正的问题,所以在flink中进行集成测试还是非常有必要的,本文就来记录下如何在flink中进行集成测试 flink中进行集成测试 flink中进行集成测…...
gitee推荐-1Panel
以下内容来源于gitee。 gitee地址:1Panel: 🔥 🔥 🔥 现代化、开源的 Linux 服务器运维管理面板。 大概和宝塔类似,但支持docker。在线体验:https://demo.1panel.cn/ 稍微试了下,没找到apache,…...
GEE 22:基于GEE实现物种分布模型(更新中。。。。。。)
物种分布模型 1. 数据点准备1.1 数据加载1.2 去除指定距离内的重复点1.3 定义研究区范围1.4 选择预测因子 1. 数据点准备 1.1 数据加载 首先需要将CSV文件导入到GEE平台中,同样也可以导入shp格式文件。 // 1.Loading and cleaning your species data *************…...
阿里云windwos 安装oracle数据库,外部用工具连接不上,只能在服务器本机通过127.0.0.1 连接
1. 首先检查阿里云服务器安全组端口是否开放 oracle 数据库端口 2. 其次找到oracle 安装的目录,打开这俩个文件,将localhost 修改为 服务器本机名称 3.重启oracle 监听服务,就可以连接了...
UniApp 中的 image 属性讲解
在 UniApp 中,image 是用于显示图片的组件,它具有多种属性,可以控制图片的展示方式和行为。下面我将为您讲解一些常用的 image 属性。 基本属性 src:指定要显示的图片资源路径,可以是本地路径或远程 URL。mode&#…...
卷积神经网络(CNN)车牌识别
文章目录 一、前言二、前期工作1. 设置GPU(如果使用的是CPU可以忽略这步)2. 导入数据3. 查看数据3.数据可视化4.标签数字化 二、构建一个tf.data.Dataset1.预处理函数2.加载数据3.配置数据 三、搭建网络模型四、设置动态学习率五、编译六、训练八、保存和…...
弹窗concrt140.dll丢失的解决方法,深度解析concrt140.dll丢失的原因
在计算机使用过程中,我们经常会遇到一些错误提示或者系统崩溃的情况。其中,concrt140.dll是一个常见的错误提示,这个错误通常会导致某些应用程序无法正常运行。为了解决这个问题,本文将介绍5种详细的解决方法,帮助您恢…...
CANdelaStudio 使用教程4 编辑State
文章目录 简述1、State Groups2、Dependencies3、 Defaults State1、 会话状态2、 新增会话状态3、 编辑 服务对 State 的依赖关系 State Diagram 简述 1、State Groups 2、Dependencies 在这里,可以编辑现有服务在不同会话状态或安全访问状态的支持情况和状态转换…...
FANUC机器人到达某个点位时,为什么不显示@符号?
FANUC机器人到达某个点位时,为什么不显示@符号? 该功能由变量$MNDSP_POSCF = 0(不显示)/1(显示)/2(光标移动该行显示) 控制,该变量设置为不同的值,则启用对应的功能。 如下图所示,为该变量设置不同的值时的对比, 其他常用的系统变量可参考以下内容: 在R寄存器指定速度…...
JVM运行参数介绍 -Xms -Xmx -Xmn -Xss
文章目录 CharGPT问答Java运行参数“-Xmx2048m -Xms1024m -Xmn512m -Xss256k”如何调优jvm的运行参数 JVM相关介绍Java 虚拟机底层原理知识总结 CharGPT问答 Java运行参数“-Xmx2048m -Xms1024m -Xmn512m -Xss256k” 2023/11/26 20:30:27 这些参数是用于配置 Java 虚拟机&am…...
Hive删除符合条件的记录
Hive在使用中不支持update和delete操作,那么如果想删除部分条件的记录需要怎么操作?本文记录下解决方法。 思路:使用selectwhere选出想要保留的数据,使用insert overwrite向原表覆盖插入数据. insert overwrite table dbname.tab…...
Linux加强篇006-存储结构与管理硬盘
目录 前言 1. 从“/”开始 2. 物理设备命名规则 3. 文件系统与数据资料 4. 挂载硬件设备 5. 添加硬盘设备 6. 添加交换分区 7. 磁盘容量配额 8. VDO虚拟数据优化 9. 软硬方式链接 前言 悟已往之不谏,知来者之可追。实迷途其未远,觉今是而昨非…...
GIT版本控制和常用命令使用介绍
GIT版本控制和常用命令使用介绍 1. 版本控制1.1 历史背景1.2 什么是版本控制1.3 常见版本控制工具1.4 版本控制的分类 2 Git介绍2.1 Git 工作流程2.2 基本概念2.3 文件的四种状态2.4 忽略文件2.5 Git命令2.5.1 查看本地git配置命令2.5.2 远程库信息查看命令2.5.3 分支交互命令2…...
微服务学习|初识Docker、使用Docker、自定义镜像、DockerCompose、Docker镜像仓库
初识Docker 项目部署的问题 大型项目组件较多,运行环境也较为复杂,部署时会碰到一些问题 依赖关系复杂,容易出现兼容性问题 开发、测试、生产环境有差异 Docker如何解决依赖的兼容问题的? 将应用的Libs (函数库)、Deps (依赖)配置与应用…...
蓝桥杯每日一题2023.11.24
题目描述 #include <stdio.h> #define N 100int connected(int* m, int p, int q) {return m[p]m[q]? 1 : 0; }void link(int* m, int p, int q) {int i;if(connected(m,p,q)) return;int pID m[p];int qID m[q];for(i0; i<N; i) ________________________________…...
内网穿透的应用-如何在本地安装Flask,以及将其web界面发布到公网上并进行远程访问
轻量级web开发框架:Flask本地部署及实现公网访问界面 文章目录 轻量级web开发框架:Flask本地部署及实现公网访问界面前言1. 安装部署Flask2. 安装Cpolar内网穿透3. 配置Flask的web界面公网访问地址4. 公网远程访问Flask的web界面 前言 本篇文章讲解如何…...
【重要】Splunk 的 Lookup Table能否被覆盖呢?
1: 背景: 用户自己的lookup table 可能需要被覆盖,因为用户自己会自动更新,但是如果不是用户自己更新,Deployer 上面发布的时候,用于没有用户的table ,那么默认是不能把客户的table overwite 的。如果用户要覆盖的话,如果客户有权限的话,客户可以自己更换lookup table…...
SELinux零知识学习三十、SELinux策略语言之角色和用户(1)
接前一篇文章:SELinux零知识学习二十九、SELinux策略语言之类型强制(14) 三、SELinux策略语言之类型强制 SELinux提供了一种依赖于类型强制(类型增强,TE)的基于角色的访问控制(Role-Based Access Control),角色用于组域类型和限制域类型与用户之间的关系,SELinux中的…...
MyBatis Generator使用总结
MyBatis Generator使用总结 介绍具体使用数据准备插件引入配置条件构建讲解demo地址 介绍 MyBatis Generator (MBG) 是 MyBatis 的代码生成器。它能够根据数据库表,自动生成 java 实体类、dao 层接口(mapper 接口)及m…...
编程语言发展史:Ruby语言的发展和应用
介绍 Ruby是一种高级编程语言,最初由日本的松本行弘开发。它在20世纪90年代初首次发布,并在2000年代初开始变得流行。 Ruby是一种动态、面向对象的语言,具有简单、易于学习和使用的语法,因此被广泛应用于Web开发、数据分析、游戏…...
Oracle查询表空间大小
1 查询数据库中所有的表空间以及表空间所占空间的大小 SELECTtablespace_name,sum( bytes ) / 1024 / 1024 FROMdba_data_files GROUP BYtablespace_name; 2 Oracle查询表空间大小及每个表所占空间的大小 SELECTtablespace_name,file_id,file_name,round( bytes / ( 1024 …...
STM32标准库-DMA直接存储器存取
文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA(Direct Memory Access)直接存储器存取 DMA可以提供外设…...
Python实现prophet 理论及参数优化
文章目录 Prophet理论及模型参数介绍Python代码完整实现prophet 添加外部数据进行模型优化 之前初步学习prophet的时候,写过一篇简单实现,后期随着对该模型的深入研究,本次记录涉及到prophet 的公式以及参数调优,从公式可以更直观…...
新能源汽车智慧充电桩管理方案:新能源充电桩散热问题及消防安全监管方案
随着新能源汽车的快速普及,充电桩作为核心配套设施,其安全性与可靠性备受关注。然而,在高温、高负荷运行环境下,充电桩的散热问题与消防安全隐患日益凸显,成为制约行业发展的关键瓶颈。 如何通过智慧化管理手段优化散…...
Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信
文章目录 Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信前言一、网络通信基础概念二、服务端与客户端的完整流程图解三、每一步的详细讲解和代码示例1. 创建Socket(服务端和客户端都要)2. 绑定本地地址和端口&#x…...
AGain DB和倍数增益的关系
我在设置一款索尼CMOS芯片时,Again增益0db变化为6DB,画面的变化只有2倍DN的增益,比如10变为20。 这与dB和线性增益的关系以及传感器处理流程有关。以下是具体原因分析: 1. dB与线性增益的换算关系 6dB对应的理论线性增益应为&…...
Kafka入门-生产者
生产者 生产者发送流程: 延迟时间为0ms时,也就意味着每当有数据就会直接发送 异步发送API 异步发送和同步发送的不同在于:异步发送不需要等待结果,同步发送必须等待结果才能进行下一步发送。 普通异步发送 首先导入所需的k…...
Redis:现代应用开发的高效内存数据存储利器
一、Redis的起源与发展 Redis最初由意大利程序员Salvatore Sanfilippo在2009年开发,其初衷是为了满足他自己的一个项目需求,即需要一个高性能的键值存储系统来解决传统数据库在高并发场景下的性能瓶颈。随着项目的开源,Redis凭借其简单易用、…...
C# 表达式和运算符(求值顺序)
求值顺序 表达式可以由许多嵌套的子表达式构成。子表达式的求值顺序可以使表达式的最终值发生 变化。 例如,已知表达式3*52,依照子表达式的求值顺序,有两种可能的结果,如图9-3所示。 如果乘法先执行,结果是17。如果5…...
Elastic 获得 AWS 教育 ISV 合作伙伴资质,进一步增强教育解决方案产品组合
作者:来自 Elastic Udayasimha Theepireddy (Uday), Brian Bergholm, Marianna Jonsdottir 通过搜索 AI 和云创新推动教育领域的数字化转型。 我们非常高兴地宣布,Elastic 已获得 AWS 教育 ISV 合作伙伴资质。这一重要认证表明,Elastic 作为 …...
