Debezium发布历史87
原文地址: https://debezium.io/blog/2020/03/19/integration-testing-for-change-data-capture-with-testcontainers/
欢迎关注留言,我是收集整理小能手,工具翻译,仅供参考,笔芯笔芯.
使用 Testcontainer 进行变更数据捕获的集成测试
三月 19, 2020 作者: Gunnar Morling
讨论 testcontainers postgres
使用 Debezium 设置变更数据捕获 (CDC) 管道通常只是配置问题,无需涉及任何编程。对 CDC 设置进行自动化测试仍然是一个非常好的主意,确保一切配置正确并且 Debezium 连接器按预期设置。
涉及两个主要组件,其配置需要考虑:
源数据库:必须对其进行设置,以便 Debezium 可以连接到它并检索更改事件;详细信息取决于具体的数据库,例如对于 MySQL,binlog 必须为“行”模式,对于 Postgres,必须安装支持的逻辑解码插件之一等。
Debezium 连接器:必须使用正确的数据库主机和凭据进行配置,可能使用 SSL、应用表和列过滤器、可能一个或多个单一消息转换 (SMT) 等。
这就是新添加的 Debezium对与Testcontainers的集成测试的支持的用武之地。它允许使用 Linux 容器映像设置所有必需的组件(Apache Kafka、Kafka Connect 等)、配置和部署 Debezium 连接器并对生成的运行断言更改数据事件。
让我们看看它是如何完成的。
项目设立
假设您正在使用 Apache Maven 进行依赖项管理,请将以下依赖项添加到您的pom.xml中,引入 Debezium Testcontainers 集成和Apache Kafka 的 Testcontainers 模块:
初始化测试容器
声明了所有必需的依赖项后,就可以编写 CDC 集成测试了。通过 Testcontainers,集成测试是使用 Linux 容器和 Docker 实现的。它提供了一个 Java API,用于启动和管理测试所需的资源。我们可以用它来启动 Apache Kafka、Kafka Connect 和 Postgres 数据库:
public class CdcTest {
private static Network network = Network.newNetwork();
private static KafkaContainer kafkaContainer = new KafkaContainer()
.withNetwork(network);
public static PostgreSQLContainer<?> postgresContainer =
new PostgreSQLContainer<>(“debezium/postgres:11”)
.withNetwork(network)
.withNetworkAliases(“postgres”);
public static DebeziumContainer debeziumContainer =
new DebeziumContainer(“1.1.0.CR1”)
.withNetwork(network)
.withKafka(kafkaContainer)
.dependsOn(kafkaContainer);
@BeforeClass
public static void startContainers() {
Startables.deepStart(Stream.of(
kafkaContainer, postgresContainer, debeziumContainer))
.join();
}
}
定义供所有服务使用的 Docker 网络
为 Apache Kafka 设置容器
为 Postgres 11 设置容器(使用 Debezium 的 Postgres 容器映像)
使用 Debezium 为 Kafka Connect 设置容器
@BeforeClass在一个方法中启动所有三个容器
请注意,您需要安装 Docker 才能使用 Testcontainers。
测试实施
所需的基础设施到位后,我们可以为 CDC 设置编写测试。测试的总体流程是这样的:
为 Postgres 数据库配置 Debezium 连接器
执行几条SQL语句来改变一些数据
使用 Kafka 消费者从相应的 Kafka 主题检索生成的更改数据事件
针对这些事件运行一些断言
这是测试的 shell:
@Test
public void canObtainChangeEventsFromPostgres() throws Exception {
try (Connection connection = getConnection(postgresContainer);
Statement statement = connection.createStatement();
KafkaConsumer<String, String> consumer =
getConsumer(kafkaContainer)) {
// TODO ...
}
}
数据库连接的凭据可以从通过 Testcontainers 启动的 Postgres 容器中获取,很好地避免了任何冗余:
private Connection getConnection(PostgreSQLContainer<?> postgresContainer)
throws SQLException {
return DriverManager.getConnection(postgresContainer.getJdbcUrl(),
postgresContainer.getUsername(),
postgresContainer.getPassword());
}
Kafka 消费者也是如此:
private KafkaConsumer<String, String> getConsumer(
KafkaContainer kafkaContainer) {
return new KafkaConsumer<>(
ImmutableMap.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaContainer.getBootstrapServers(),
ConsumerConfig.GROUP_ID_CONFIG,"tc-" + UUID.randomUUID(),ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"),new StringDeserializer(),new StringDeserializer());
}
现在我们来实现实际的测试逻辑:
statement.execute(“create schema todo”);
statement.execute(“create table todo.Todo (” +
"id int8 not null, " +
"title varchar(255), " +
“primary key (id))”);
statement.execute(“alter table todo.Todo replica identity full”);
statement.execute(“insert into todo.Todo values (1, ‘Learn CDC’)”);
statement.execute(“insert into todo.Todo values (2, ‘Learn Debezium’)”);
ConnectorConfiguration connector = ConnectorConfiguration
.forJdbcContainer(postgresContainer)
.with(“database.server.name”, “dbserver1”);
debeziumContainer.registerConnector(“my-connector”,
connector);
consumer.subscribe(Arrays.asList(“dbserver1.todo.todo”));
List<ConsumerRecord<String, String>> changeEvents =
drain(consumer, 2);
ConsumerRecord<String, String> changeEvent = changeEvents.get(0);
assertThat(JsonPath. read(changeEvent.key(), “ . i d " ) ) . i s E q u a l T o ( 1 ) ; a s s e r t T h a t ( J s o n P a t h . < S t r i n g > r e a d ( c h a n g e E v e n t . v a l u e ( ) , " .id")) .isEqualTo(1); assertThat(JsonPath.<String> read(changeEvent.value(), " .id")).isEqualTo(1);assertThat(JsonPath.<String>read(changeEvent.value(),".op”))
.isEqualTo(“r”);
assertThat(JsonPath. read(changeEvent.value(), “$.after.title”))
.isEqualTo(“Learn CDC”);
changeEvent = changeEvents.get(1);
assertThat(JsonPath. read(changeEvent.key(), “ . i d " ) ) . i s E q u a l T o ( 2 ) ; a s s e r t T h a t ( J s o n P a t h . < S t r i n g > r e a d ( c h a n g e E v e n t . v a l u e ( ) , " .id")) .isEqualTo(2); assertThat(JsonPath.<String> read(changeEvent.value(), " .id")).isEqualTo(2);assertThat(JsonPath.<String>read(changeEvent.value(),".op”))
.isEqualTo(“r”);
assertThat(JsonPath. read(changeEvent.value(), “$.after.title”))
.isEqualTo(“Learn Debezium”);
consumer.unsubscribe();
在Postgres数据库中创建一个表并插入两条记录
注册 Debezium Postgres 连接器的实例
从 Kafka 中的更改事件主题读取两条记录并断言它们的属性
请注意 Debezium 的 Testcontainers 支持如何允许从数据库容器播种连接器配置,从而避免显式给出数据库连接属性的需要。仅必须给出唯一的database.server.name,当然您可以应用其他配置选项,例如表或列过滤器、SMT 等。
drain()为了简洁起见,省略了从 Kafka 主题读取给定数量记录的方法的源代码。您可以在 GitHub 上的完整示例中找到它。
基于 JsonPath 的断言可以方便地断言预期数据更改事件的属性,但当然您也可以使用任何其他 JSON API 来完成这项工作。当使用 Apache Avro 而不是 JSON 作为序列化格式时,您必须改用 Avro API。
包起来
Testcontainers 和 Debezium 对它的支持使得为 CDC 设置编写自动化集成测试变得相当容易。
本文讨论的测试方法可以通过多种方式进行扩展。例如,可能需要将连接器配置置于修订控制之下(以便您可以管理和跟踪任何配置更改)并使用这些配置文件驱动测试。您还可以更进一步并测试整个数据流管道。为此,您不仅必须部署 Debezium 连接器,还必须部署接收器连接器,例如用于数据仓库或搜索服务器的连接器。然后,您可以对这些接收器系统中的数据运行断言,确保数据管道端到端的正确性。
您对测试 CDC 设置和管道有何看法?请在下面的评论中告诉我们!
相关文章:
Debezium发布历史87
原文地址: https://debezium.io/blog/2020/03/19/integration-testing-for-change-data-capture-with-testcontainers/ 欢迎关注留言,我是收集整理小能手,工具翻译,仅供参考,笔芯笔芯. 使用 Testcontainer 进行变更数…...
Leetcode131.分割回文串-Palindrome Patitioning-Python-回溯法
解题思路: 1.切割回文串,可以用解决找组合问题的思路解决,而解决组合问题,可以用回溯法,故本题选择回溯法。 2.理解两个事情:1.递归函数里的for循环是横向遍历给定字符串s的每一个字母。2.针对s的每一个字…...

Java面试——基础篇
目录 1、java语言有哪些优点和缺点? 2、JVM 、 JDK 和 JRE的关系 3、为什么说 Java 语言“编译与解释并存”? 4、Java和c的区别 5、基本数据类型 5.1、java的8种基本数据类型: 5.2、基本类型和包装类型的区别: 5.3、包装类型的缓存机…...

C++——结构体
1,结构体基本概念 结构体属于用户自定义的数据类型,允许用户存储不同的数据类型。像int(整型),浮点型,bool型,字符串型等都是属于系统内置的数据类型。而今天要学习的结构体则是属于我们自定义…...

C++技术要点总结, 面试必备, 收藏起来慢慢看
目录 1. 语言对比 1.1 C 11 新特性 2.2 C 和 C 的区别 2.3 Python 和 C 的区别 2. 编译内存相关 2.1. C 程序编译过程 2.2. C 内存管理 2.3. 栈和堆的区别 2.4. 变量的区别 2.5. 全局变量定义在头文件中有什么问题? 2.6. 内存对齐 2.7. 什么是内存泄露 …...

VR数字展厅,平面静态跨越到3D立体化时代
近些年,VR的概念被越来越多的人提起,较为常见的形式就是VR数字展厅。VR数字展厅的出现,让各地以及各行业的展厅展馆的呈现和宣传都发生了很大的改变和革新,同时也意味着展览传播的方式不再局限于原来的图文、视频,而是…...

Linux中LVM实验
LVM实验: 1、分区 -L是大小的意思-n名称的意思 从vg0(卷组)分出来 2、格式化LV逻辑卷 LVM扩容 如果icdir空间不够了, 扩展空间lvextend -L 5G /dev/vg0/lv1 /dev/vg0/lv1(pp,vg,lv) 刷新文件系统xfs_growfs /lvdir VG扩容 …...

外包干了一个月,技术退步明显。。。。。
先说一下自己的情况,本科生,19年通过校招进入南京某软件公司,干了接近4年的功能测试,今年年初,感觉自己不能够在这样下去了,长时间呆在一个舒适的环境会让一个人堕落!而我已经在一个企业干了四年的功能测试…...

gitlab.rb主要配置
根据是否docker安装,进入挂载目录或安装目录 修改此文件,我一般是在可视化窗口中修改,有时候也在命令行手敲 将下面的配置复制到该文件中 external_url http://192.168.100.50 # nginx[listen_port] = 8000 (docker安装的这一行不需要,因为端口映射导致此处修改会导致访问…...
网络协议基础
tcp/ip协议簇 TCP/IP协议族 网络接口层(没有特定的协议) 物理层 数据链路层 网络层: IP (v4/v6) ARP(地址解析协议) RARP . ICMP (Internet控制报文协议) IGMP 传输层: TCP (传输控制协议) UDP (用户数据报协议) 应用层: 都是基于传输层协议的端口,总共端口0~65535 …...

Mac使用adb调试安卓手机
0x00 背景 最近windows电脑休息,用mac办公比较多,手机用时间长了,不太灵光,准备修理一番。于是要用mac调试下android手机。配置略显麻烦,网上的步骤多参差不齐。估计是入门步骤,大佬们也懒得写的太细。于是…...

Web 开发 1: Flask 框架介绍和使用
在 Web 开发中,Flask 是一个流行且灵活的 Python Web 框架,用于构建 Web 应用程序。它简洁而易于上手,适用于小型到中型的项目。在本篇博客中,我将为你介绍 Flask 框架的基础知识和常用技巧,帮助你更好地掌握 Web 开发…...
Centos7.6之禅道开源版17.6.1安装记录
Centos7.6之禅道开源版17.6.1安装记录 文章目录 Centos7.6之禅道开源版17.6.1安装记录1. 下载2. 安装3. 登录4. 连接数据库1. 本地连接2. 远程连接1. 开启远程访问用户2. 更改mysql绑定的主机3. 重启Apache与MySQL服务 4. 常用命令1. Apache和Mysql常用命令2. 其他 1. 下载 官网…...
有趣的代码(简单)
1.代码1 #include<stdio.h> #include<string.h> #include<windows.h> #define _CRT_SECURE_NO_WARNINGS 1 void love() {system("color 4");printf(" **** ***************** ** …...

Java和Redis实现一个简单的热搜功能
1. 前言 我们有一个简单的需求: 搜索栏展示当前登陆的个人用户的搜索历史记录,删除个人历史记录。用户在搜索栏输入某字符,则将该字符记录下来 以zset格式存储的redis中,记录该字符被搜索的个数以及当前的时间戳 (用…...

超越传统,想修哪里就修哪里,SUPIR如何通过文本提示实现智能图像修复
项目简介 通过参数增加使得模型不仅能够修复图像中的错误或损坏,还能根据文本提示进行智能修复。例如根据描述来改变图像中的特定细节。这样的处理方式提升了图像修复的质量和智能度,使得模型能够更准确、更灵活地恢复和改进图像。 SUPIR的主要功能图像…...

《如何画好架构图》学习笔记
看了一堂《如何画好架构图》的公开课,结合网上的资料与经验做一些思考总结。文中的例子和图片大多是从课程中摘录的。 1. 4R架构定义 4R架构定义其实是软件架构定义经过归纳提炼后的简称。 软件架构定义:软件架构是指软件系统的顶层(Rank&am…...

redis整合
一.redis的发布订阅 什么 是发布和订阅 Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。 Redis 客户端可以订阅任意数量的频道。 1、Redis的发布和订阅 客户端订阅频道发布的消息 频道发布消息 订阅者就可以…...
开循环低温样品架节约液氦操作技巧
开循环低温样品架以降温快、无轰动源、重量轻、装置便利等特色遭到大多数客户的喜爱。但是制冷剂消耗量引起的运用本钱是客户在运用过程中zhong点重视的问题,特别是随着全球液氦价格继续飙升,开循环样品架的运用本钱也在逐渐添加,如何节约液氦…...

年薪30W+,待遇翻倍,我的经历值得每个测试人借鉴
从自考大专到出走公司,从半年无业露宿深圳北站,从8k…到11.5k…再到20k,我的经历值得每个测试人借鉴 或许学历并没有那么重要 12年高考之后,在朋友的介绍下(骗了过去),没有好好的读大学&#x…...
浅谈 React Hooks
React Hooks 是 React 16.8 引入的一组 API,用于在函数组件中使用 state 和其他 React 特性(例如生命周期方法、context 等)。Hooks 通过简洁的函数接口,解决了状态与 UI 的高度解耦,通过函数式编程范式实现更灵活 Rea…...

测试微信模版消息推送
进入“开发接口管理”--“公众平台测试账号”,无需申请公众账号、可在测试账号中体验并测试微信公众平台所有高级接口。 获取access_token: 自定义模版消息: 关注测试号:扫二维码关注测试号。 发送模版消息: import requests da…...

手游刚开服就被攻击怎么办?如何防御DDoS?
开服初期是手游最脆弱的阶段,极易成为DDoS攻击的目标。一旦遭遇攻击,可能导致服务器瘫痪、玩家流失,甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案,帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...

ElasticSearch搜索引擎之倒排索引及其底层算法
文章目录 一、搜索引擎1、什么是搜索引擎?2、搜索引擎的分类3、常用的搜索引擎4、搜索引擎的特点二、倒排索引1、简介2、为什么倒排索引不用B+树1.创建时间长,文件大。2.其次,树深,IO次数可怕。3.索引可能会失效。4.精准度差。三. 倒排索引四、算法1、Term Index的算法2、 …...

什么是Ansible Jinja2
理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具,可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板,允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板,并通…...
iOS性能调优实战:借助克魔(KeyMob)与常用工具深度洞察App瓶颈
在日常iOS开发过程中,性能问题往往是最令人头疼的一类Bug。尤其是在App上线前的压测阶段或是处理用户反馈的高发期,开发者往往需要面对卡顿、崩溃、能耗异常、日志混乱等一系列问题。这些问题表面上看似偶发,但背后往往隐藏着系统资源调度不当…...
【Go语言基础【13】】函数、闭包、方法
文章目录 零、概述一、函数基础1、函数基础概念2、参数传递机制3、返回值特性3.1. 多返回值3.2. 命名返回值3.3. 错误处理 二、函数类型与高阶函数1. 函数类型定义2. 高阶函数(函数作为参数、返回值) 三、匿名函数与闭包1. 匿名函数(Lambda函…...
C++课设:简易日历程序(支持传统节假日 + 二十四节气 + 个人纪念日管理)
名人说:路漫漫其修远兮,吾将上下而求索。—— 屈原《离骚》 创作者:Code_流苏(CSDN)(一个喜欢古诗词和编程的Coder😊) 专栏介绍:《编程项目实战》 目录 一、为什么要开发一个日历程序?1. 深入理解时间算法2. 练习面向对象设计3. 学习数据结构应用二、核心算法深度解析…...

【Linux手册】探秘系统世界:从用户交互到硬件底层的全链路工作之旅
目录 前言 操作系统与驱动程序 是什么,为什么 怎么做 system call 用户操作接口 总结 前言 日常生活中,我们在使用电子设备时,我们所输入执行的每一条指令最终大多都会作用到硬件上,比如下载一款软件最终会下载到硬盘上&am…...

C++实现分布式网络通信框架RPC(2)——rpc发布端
有了上篇文章的项目的基本知识的了解,现在我们就开始构建项目。 目录 一、构建工程目录 二、本地服务发布成RPC服务 2.1理解RPC发布 2.2实现 三、Mprpc框架的基础类设计 3.1框架的初始化类 MprpcApplication 代码实现 3.2读取配置文件类 MprpcConfig 代码实现…...