Debezium发布历史87
原文地址: https://debezium.io/blog/2020/03/19/integration-testing-for-change-data-capture-with-testcontainers/
欢迎关注留言,我是收集整理小能手,工具翻译,仅供参考,笔芯笔芯.
使用 Testcontainer 进行变更数据捕获的集成测试
三月 19, 2020 作者: Gunnar Morling
讨论 testcontainers postgres
使用 Debezium 设置变更数据捕获 (CDC) 管道通常只是配置问题,无需涉及任何编程。对 CDC 设置进行自动化测试仍然是一个非常好的主意,确保一切配置正确并且 Debezium 连接器按预期设置。
涉及两个主要组件,其配置需要考虑:
源数据库:必须对其进行设置,以便 Debezium 可以连接到它并检索更改事件;详细信息取决于具体的数据库,例如对于 MySQL,binlog 必须为“行”模式,对于 Postgres,必须安装支持的逻辑解码插件之一等。
Debezium 连接器:必须使用正确的数据库主机和凭据进行配置,可能使用 SSL、应用表和列过滤器、可能一个或多个单一消息转换 (SMT) 等。
这就是新添加的 Debezium对与Testcontainers的集成测试的支持的用武之地。它允许使用 Linux 容器映像设置所有必需的组件(Apache Kafka、Kafka Connect 等)、配置和部署 Debezium 连接器并对生成的运行断言更改数据事件。
让我们看看它是如何完成的。
项目设立
假设您正在使用 Apache Maven 进行依赖项管理,请将以下依赖项添加到您的pom.xml中,引入 Debezium Testcontainers 集成和Apache Kafka 的 Testcontainers 模块:
初始化测试容器
声明了所有必需的依赖项后,就可以编写 CDC 集成测试了。通过 Testcontainers,集成测试是使用 Linux 容器和 Docker 实现的。它提供了一个 Java API,用于启动和管理测试所需的资源。我们可以用它来启动 Apache Kafka、Kafka Connect 和 Postgres 数据库:
public class CdcTest {
private static Network network = Network.newNetwork();
private static KafkaContainer kafkaContainer = new KafkaContainer()
.withNetwork(network);
public static PostgreSQLContainer<?> postgresContainer =
new PostgreSQLContainer<>(“debezium/postgres:11”)
.withNetwork(network)
.withNetworkAliases(“postgres”);
public static DebeziumContainer debeziumContainer =
new DebeziumContainer(“1.1.0.CR1”)
.withNetwork(network)
.withKafka(kafkaContainer)
.dependsOn(kafkaContainer);
@BeforeClass
public static void startContainers() {
Startables.deepStart(Stream.of(
kafkaContainer, postgresContainer, debeziumContainer))
.join();
}
}
定义供所有服务使用的 Docker 网络
为 Apache Kafka 设置容器
为 Postgres 11 设置容器(使用 Debezium 的 Postgres 容器映像)
使用 Debezium 为 Kafka Connect 设置容器
@BeforeClass在一个方法中启动所有三个容器
请注意,您需要安装 Docker 才能使用 Testcontainers。
测试实施
所需的基础设施到位后,我们可以为 CDC 设置编写测试。测试的总体流程是这样的:
为 Postgres 数据库配置 Debezium 连接器
执行几条SQL语句来改变一些数据
使用 Kafka 消费者从相应的 Kafka 主题检索生成的更改数据事件
针对这些事件运行一些断言
这是测试的 shell:
@Test
public void canObtainChangeEventsFromPostgres() throws Exception {
try (Connection connection = getConnection(postgresContainer);
Statement statement = connection.createStatement();
KafkaConsumer<String, String> consumer =
getConsumer(kafkaContainer)) {
// TODO ...
}
}
数据库连接的凭据可以从通过 Testcontainers 启动的 Postgres 容器中获取,很好地避免了任何冗余:
private Connection getConnection(PostgreSQLContainer<?> postgresContainer)
throws SQLException {
return DriverManager.getConnection(postgresContainer.getJdbcUrl(),
postgresContainer.getUsername(),
postgresContainer.getPassword());
}
Kafka 消费者也是如此:
private KafkaConsumer<String, String> getConsumer(
KafkaContainer kafkaContainer) {
return new KafkaConsumer<>(
ImmutableMap.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaContainer.getBootstrapServers(),
ConsumerConfig.GROUP_ID_CONFIG,"tc-" + UUID.randomUUID(),ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"),new StringDeserializer(),new StringDeserializer());
}
现在我们来实现实际的测试逻辑:
statement.execute(“create schema todo”);
statement.execute(“create table todo.Todo (” +
"id int8 not null, " +
"title varchar(255), " +
“primary key (id))”);
statement.execute(“alter table todo.Todo replica identity full”);
statement.execute(“insert into todo.Todo values (1, ‘Learn CDC’)”);
statement.execute(“insert into todo.Todo values (2, ‘Learn Debezium’)”);
ConnectorConfiguration connector = ConnectorConfiguration
.forJdbcContainer(postgresContainer)
.with(“database.server.name”, “dbserver1”);
debeziumContainer.registerConnector(“my-connector”,
connector);
consumer.subscribe(Arrays.asList(“dbserver1.todo.todo”));
List<ConsumerRecord<String, String>> changeEvents =
drain(consumer, 2);
ConsumerRecord<String, String> changeEvent = changeEvents.get(0);
assertThat(JsonPath. read(changeEvent.key(), “ . i d " ) ) . i s E q u a l T o ( 1 ) ; a s s e r t T h a t ( J s o n P a t h . < S t r i n g > r e a d ( c h a n g e E v e n t . v a l u e ( ) , " .id")) .isEqualTo(1); assertThat(JsonPath.<String> read(changeEvent.value(), " .id")).isEqualTo(1);assertThat(JsonPath.<String>read(changeEvent.value(),".op”))
.isEqualTo(“r”);
assertThat(JsonPath. read(changeEvent.value(), “$.after.title”))
.isEqualTo(“Learn CDC”);
changeEvent = changeEvents.get(1);
assertThat(JsonPath. read(changeEvent.key(), “ . i d " ) ) . i s E q u a l T o ( 2 ) ; a s s e r t T h a t ( J s o n P a t h . < S t r i n g > r e a d ( c h a n g e E v e n t . v a l u e ( ) , " .id")) .isEqualTo(2); assertThat(JsonPath.<String> read(changeEvent.value(), " .id")).isEqualTo(2);assertThat(JsonPath.<String>read(changeEvent.value(),".op”))
.isEqualTo(“r”);
assertThat(JsonPath. read(changeEvent.value(), “$.after.title”))
.isEqualTo(“Learn Debezium”);
consumer.unsubscribe();
在Postgres数据库中创建一个表并插入两条记录
注册 Debezium Postgres 连接器的实例
从 Kafka 中的更改事件主题读取两条记录并断言它们的属性
请注意 Debezium 的 Testcontainers 支持如何允许从数据库容器播种连接器配置,从而避免显式给出数据库连接属性的需要。仅必须给出唯一的database.server.name,当然您可以应用其他配置选项,例如表或列过滤器、SMT 等。
drain()为了简洁起见,省略了从 Kafka 主题读取给定数量记录的方法的源代码。您可以在 GitHub 上的完整示例中找到它。
基于 JsonPath 的断言可以方便地断言预期数据更改事件的属性,但当然您也可以使用任何其他 JSON API 来完成这项工作。当使用 Apache Avro 而不是 JSON 作为序列化格式时,您必须改用 Avro API。
包起来
Testcontainers 和 Debezium 对它的支持使得为 CDC 设置编写自动化集成测试变得相当容易。
本文讨论的测试方法可以通过多种方式进行扩展。例如,可能需要将连接器配置置于修订控制之下(以便您可以管理和跟踪任何配置更改)并使用这些配置文件驱动测试。您还可以更进一步并测试整个数据流管道。为此,您不仅必须部署 Debezium 连接器,还必须部署接收器连接器,例如用于数据仓库或搜索服务器的连接器。然后,您可以对这些接收器系统中的数据运行断言,确保数据管道端到端的正确性。
您对测试 CDC 设置和管道有何看法?请在下面的评论中告诉我们!
相关文章:
Debezium发布历史87
原文地址: https://debezium.io/blog/2020/03/19/integration-testing-for-change-data-capture-with-testcontainers/ 欢迎关注留言,我是收集整理小能手,工具翻译,仅供参考,笔芯笔芯. 使用 Testcontainer 进行变更数…...
Leetcode131.分割回文串-Palindrome Patitioning-Python-回溯法
解题思路: 1.切割回文串,可以用解决找组合问题的思路解决,而解决组合问题,可以用回溯法,故本题选择回溯法。 2.理解两个事情:1.递归函数里的for循环是横向遍历给定字符串s的每一个字母。2.针对s的每一个字…...
Java面试——基础篇
目录 1、java语言有哪些优点和缺点? 2、JVM 、 JDK 和 JRE的关系 3、为什么说 Java 语言“编译与解释并存”? 4、Java和c的区别 5、基本数据类型 5.1、java的8种基本数据类型: 5.2、基本类型和包装类型的区别: 5.3、包装类型的缓存机…...
C++——结构体
1,结构体基本概念 结构体属于用户自定义的数据类型,允许用户存储不同的数据类型。像int(整型),浮点型,bool型,字符串型等都是属于系统内置的数据类型。而今天要学习的结构体则是属于我们自定义…...
C++技术要点总结, 面试必备, 收藏起来慢慢看
目录 1. 语言对比 1.1 C 11 新特性 2.2 C 和 C 的区别 2.3 Python 和 C 的区别 2. 编译内存相关 2.1. C 程序编译过程 2.2. C 内存管理 2.3. 栈和堆的区别 2.4. 变量的区别 2.5. 全局变量定义在头文件中有什么问题? 2.6. 内存对齐 2.7. 什么是内存泄露 …...
VR数字展厅,平面静态跨越到3D立体化时代
近些年,VR的概念被越来越多的人提起,较为常见的形式就是VR数字展厅。VR数字展厅的出现,让各地以及各行业的展厅展馆的呈现和宣传都发生了很大的改变和革新,同时也意味着展览传播的方式不再局限于原来的图文、视频,而是…...
Linux中LVM实验
LVM实验: 1、分区 -L是大小的意思-n名称的意思 从vg0(卷组)分出来 2、格式化LV逻辑卷 LVM扩容 如果icdir空间不够了, 扩展空间lvextend -L 5G /dev/vg0/lv1 /dev/vg0/lv1(pp,vg,lv) 刷新文件系统xfs_growfs /lvdir VG扩容 …...
外包干了一个月,技术退步明显。。。。。
先说一下自己的情况,本科生,19年通过校招进入南京某软件公司,干了接近4年的功能测试,今年年初,感觉自己不能够在这样下去了,长时间呆在一个舒适的环境会让一个人堕落!而我已经在一个企业干了四年的功能测试…...
gitlab.rb主要配置
根据是否docker安装,进入挂载目录或安装目录 修改此文件,我一般是在可视化窗口中修改,有时候也在命令行手敲 将下面的配置复制到该文件中 external_url http://192.168.100.50 # nginx[listen_port] = 8000 (docker安装的这一行不需要,因为端口映射导致此处修改会导致访问…...
网络协议基础
tcp/ip协议簇 TCP/IP协议族 网络接口层(没有特定的协议) 物理层 数据链路层 网络层: IP (v4/v6) ARP(地址解析协议) RARP . ICMP (Internet控制报文协议) IGMP 传输层: TCP (传输控制协议) UDP (用户数据报协议) 应用层: 都是基于传输层协议的端口,总共端口0~65535 …...
Mac使用adb调试安卓手机
0x00 背景 最近windows电脑休息,用mac办公比较多,手机用时间长了,不太灵光,准备修理一番。于是要用mac调试下android手机。配置略显麻烦,网上的步骤多参差不齐。估计是入门步骤,大佬们也懒得写的太细。于是…...
Web 开发 1: Flask 框架介绍和使用
在 Web 开发中,Flask 是一个流行且灵活的 Python Web 框架,用于构建 Web 应用程序。它简洁而易于上手,适用于小型到中型的项目。在本篇博客中,我将为你介绍 Flask 框架的基础知识和常用技巧,帮助你更好地掌握 Web 开发…...
Centos7.6之禅道开源版17.6.1安装记录
Centos7.6之禅道开源版17.6.1安装记录 文章目录 Centos7.6之禅道开源版17.6.1安装记录1. 下载2. 安装3. 登录4. 连接数据库1. 本地连接2. 远程连接1. 开启远程访问用户2. 更改mysql绑定的主机3. 重启Apache与MySQL服务 4. 常用命令1. Apache和Mysql常用命令2. 其他 1. 下载 官网…...
有趣的代码(简单)
1.代码1 #include<stdio.h> #include<string.h> #include<windows.h> #define _CRT_SECURE_NO_WARNINGS 1 void love() {system("color 4");printf(" **** ***************** ** …...
Java和Redis实现一个简单的热搜功能
1. 前言 我们有一个简单的需求: 搜索栏展示当前登陆的个人用户的搜索历史记录,删除个人历史记录。用户在搜索栏输入某字符,则将该字符记录下来 以zset格式存储的redis中,记录该字符被搜索的个数以及当前的时间戳 (用…...
超越传统,想修哪里就修哪里,SUPIR如何通过文本提示实现智能图像修复
项目简介 通过参数增加使得模型不仅能够修复图像中的错误或损坏,还能根据文本提示进行智能修复。例如根据描述来改变图像中的特定细节。这样的处理方式提升了图像修复的质量和智能度,使得模型能够更准确、更灵活地恢复和改进图像。 SUPIR的主要功能图像…...
《如何画好架构图》学习笔记
看了一堂《如何画好架构图》的公开课,结合网上的资料与经验做一些思考总结。文中的例子和图片大多是从课程中摘录的。 1. 4R架构定义 4R架构定义其实是软件架构定义经过归纳提炼后的简称。 软件架构定义:软件架构是指软件系统的顶层(Rank&am…...
redis整合
一.redis的发布订阅 什么 是发布和订阅 Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。 Redis 客户端可以订阅任意数量的频道。 1、Redis的发布和订阅 客户端订阅频道发布的消息 频道发布消息 订阅者就可以…...
开循环低温样品架节约液氦操作技巧
开循环低温样品架以降温快、无轰动源、重量轻、装置便利等特色遭到大多数客户的喜爱。但是制冷剂消耗量引起的运用本钱是客户在运用过程中zhong点重视的问题,特别是随着全球液氦价格继续飙升,开循环样品架的运用本钱也在逐渐添加,如何节约液氦…...
年薪30W+,待遇翻倍,我的经历值得每个测试人借鉴
从自考大专到出走公司,从半年无业露宿深圳北站,从8k…到11.5k…再到20k,我的经历值得每个测试人借鉴 或许学历并没有那么重要 12年高考之后,在朋友的介绍下(骗了过去),没有好好的读大学&#x…...
centos 7 部署awstats 网站访问检测
一、基础环境准备(两种安装方式都要做) bash # 安装必要依赖 yum install -y httpd perl mod_perl perl-Time-HiRes perl-DateTime systemctl enable httpd # 设置 Apache 开机自启 systemctl start httpd # 启动 Apache二、安装 AWStats࿰…...
【SQL学习笔记1】增删改查+多表连接全解析(内附SQL免费在线练习工具)
可以使用Sqliteviz这个网站免费编写sql语句,它能够让用户直接在浏览器内练习SQL的语法,不需要安装任何软件。 链接如下: sqliteviz 注意: 在转写SQL语法时,关键字之间有一个特定的顺序,这个顺序会影响到…...
Linux云原生安全:零信任架构与机密计算
Linux云原生安全:零信任架构与机密计算 构建坚不可摧的云原生防御体系 引言:云原生安全的范式革命 随着云原生技术的普及,安全边界正在从传统的网络边界向工作负载内部转移。Gartner预测,到2025年,零信任架构将成为超…...
2025盘古石杯决赛【手机取证】
前言 第三届盘古石杯国际电子数据取证大赛决赛 最后一题没有解出来,实在找不到,希望有大佬教一下我。 还有就会议时间,我感觉不是图片时间,因为在电脑看到是其他时间用老会议系统开的会。 手机取证 1、分析鸿蒙手机检材&#x…...
Java面试专项一-准备篇
一、企业简历筛选规则 一般企业的简历筛选流程:首先由HR先筛选一部分简历后,在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如:Boss直聘(招聘方平台) 直接按照条件进行筛选 例如:…...
【JavaSE】多线程基础学习笔记
多线程基础 -线程相关概念 程序(Program) 是为完成特定任务、用某种语言编写的一组指令的集合简单的说:就是我们写的代码 进程 进程是指运行中的程序,比如我们使用QQ,就启动了一个进程,操作系统就会为该进程分配内存…...
Python 训练营打卡 Day 47
注意力热力图可视化 在day 46代码的基础上,对比不同卷积层热力图可视化的结果 import torch import torch.nn as nn import torch.optim as optim from torchvision import datasets, transforms from torch.utils.data import DataLoader import matplotlib.pypl…...
全面解析数据库:从基础概念到前沿应用
在数字化时代,数据已成为企业和社会发展的核心资产,而数据库作为存储、管理和处理数据的关键工具,在各个领域发挥着举足轻重的作用。从电商平台的商品信息管理,到社交网络的用户数据存储,再到金融行业的交易记录处理&a…...
【实施指南】Android客户端HTTPS双向认证实施指南
🔐 一、所需准备材料 证书文件(6类核心文件) 类型 格式 作用 Android端要求 CA根证书 .crt/.pem 验证服务器/客户端证书合法性 需预置到Android信任库 服务器证书 .crt 服务器身份证明 客户端需持有以验证服务器 客户端证书 .crt 客户端身份…...
GeoServer发布PostgreSQL图层后WFS查询无主键字段
在使用 GeoServer(版本 2.22.2) 发布 PostgreSQL(PostGIS)中的表为地图服务时,常常会遇到一个小问题: WFS 查询中,主键字段(如 id)莫名其妙地消失了! 即使你在…...
