Debezium日常分享系列之:定制Debezium 信号发送和通知
Debezium日常分享系列之:定制Debezium 信号发送和通知
- 一、自定义信号和通知通道
- 二、结论
Debezium 2.3 在信号和通知功能方面引入了新的改进。除了 Debezium 提供的预定义信号和通知通道之外,您还可以设置新的信号和通知通道。此功能使用户能够自定义系统以满足他们的独特需求,并将其与现有基础设施或第三方解决方案相结合。它通过精确捕获和传达信号事件并通过首选渠道触发通知,实现对数据变化的有效监控和主动响应。
Debezium日常分享系列之:Debezium 信号发送和通知 - 第 1 部分
一、自定义信号和通知通道
在 Debezium 中,可以自定义信号和通知通道以满足特定要求。例如,我们可以通过为信号和通知创建 HTTP 通道来实现自定义。此 HTTP 通道从 http 端点接收信号,并且可以在信号传送后将通知发送回端点。
让我们探索一个示例,演示如何使用 Debezium Postgres 连接器、发送信号的模拟服务器以及通过 http 端点接收通知的 Postbin 来创建和利用 HTTP 信号和通知通道。
设置 HTTP 信号通道:
- 将 Debezium Postgres 连接器配置为在发生相关数据库更改时接收信号。
- 设置服务以使用 HTTP 通道向 Debezium 发送信号。该服务可以是数据库、第三方应用程序或任何其他可以发送 http 请求的系统。在此示例中,我们将使用模拟服务器向 Debezium 发送信号。 Mock Server 是一个可用于模拟 http 请求和响应的服务。
- 配置模拟服务器以使用适当的 HTTP 方法(例如 POST)通过 http 端点发送信号。
- 根据需要自定义 HTTP 通道设置以定义 http 端点 URL、身份验证、标头和任何其他参数。
设置 HTTP 通知通道:
- 一旦 Debezium 接收并处理信号,它就可以触发向 http 端点发布通知。在此示例中,我们将使用 HTTP 通道将通知发送到 Postbin bin。 Postbin是一个可以用来接收http请求并查看请求详细信息的服务。
- 自定义通知的 HTTP 通道设置,在 Postbin 中创建 bin,并根据需要定义 http 端点 URL、身份验证、标头和任何其他参数。
- 使用适当的 HTTP 方法(例如 POST)将通知事件转发到 http 端点,即 Postbin bin。可以根据需要自定义通知负载。
博客文章中此示例的完整源代码在 Debezium 示例存储库的 http-signal-notification 目录下提供。
创建一个 java 项目来构建 HTTP 信号和通知通道。运行以下命令使用 Maven 创建一个新的 java 项目:
mvn archetype:generate-DgroupId=io.debezium.examples-DartifactId=http-signaling-notification
将以下依赖项添加到 Debezium 版本(2.3 及更高版本)的 pom.xml 文件中:
<dependency><groupId>io.debezium</groupId><artifactId>debezium-core</artifactId><version>2.3.0.Final</version>
</dependency>
要使用模拟服务器接收信号,请创建定义模拟服务器服务的 Docker Compose 文件。模拟服务器服务的配置如下:
services:mockServer:image: mockserver/mockserver:latestports:- 1080:1080environment:- MOCKSERVER_WATCH_INITIALIZATION_JSON=true- MOCKSERVER_INITIALIZATION_JSON_PATH=/config/initializerJson.jsonvolumes:- ./initializerJson.json:/config/initializerJson.json
设置环境变量 MOCKSERVER_WATCH_INITIALIZATION_JSON 和 MOCKSERVER_INITIALIZATION_JSON_PATH 以使模拟服务器能够监视初始化 JSON 文件中的更改并指定其路径。包含信号的http请求和响应信息的initializerJson.json文件被安装到模拟服务器容器中。
initializerJson.json 文件定义了一个对路径 /api/signal 的模拟 http 请求,其中查询字符串参数 code=10969。当模拟服务器收到此请求时,它将使用包含 id、类型和数据的 JSON 正文进行响应。响应的状态码为200,表示响应成功。 initializerJson.json文件的定义如下:
[{"httpRequest" : {"method" : "GET","path" : "/api/signal","queryStringParameters" : {"code" : ["10969"]}},"httpResponse" : {"body": "{\"id\":\"924e3ff8-2245-43ca-ba77-2af9af02fa07\",\"type\":\"log\",\"data\":{\"message\": \"Signal message received from http endpoint.\"}}","statusCode": 200}}
]
- id :标识信号实例的任意唯一字符串。
- type :要发送的信号类型。在此示例中,类型为日志,它请求连接器将条目添加到连接器的日志文件中。处理信号后,连接器会在日志中打印指定的消息。
- data :传递给信号事件的 JSON 格式的参数。在此示例中,消息参数被传递给信号事件。
通过实现SignalChannelReader接口创建HTTP信号通道,如下所示:
public class HttpSignalChannel implements SignalChannelReader {private static final Logger LOGGER = LoggerFactory.getLogger(HttpSignalChannel.class);public static final String CHANNEL_NAME = "http";private static final List<SignalRecord> SIGNALS = new ArrayList<>();public CommonConnectorConfig connectorConfig;@Overridepublic String name() { (1)return CHANNEL_NAME;}@Overridepublic void init(CommonConnectorConfig connectorConfig) { (2)this.connectorConfig = connectorConfig;}@Overridepublic List<SignalRecord> read() { (3)try {String requestUrl = "http://mockServer:1080/api/signal?code=10969";// send http request to the mock serverHttpClient httpClient = HttpClient.newHttpClient();HttpRequest request = HttpRequest.newBuilder().uri(URI.create(requestUrl)).GET().header("Content-Type", "application/json").build();// read the responseHttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());if (response.statusCode() == 200) {ObjectMapper mapper = new ObjectMapper();String responseBody = response.body();// parse the response bodyJsonNode signalJson = mapper.readTree(responseBody);Map<String, Object> additionalData = signalJson.has("additionalData") ? mapper.convertValue(signalJson.get("additionalData"), new TypeReference<>() {}) : new HashMap<>();String id = signalJson.get("id").asText();String type = signalJson.get("type").asText();String data = signalJson.get("data").toString();SignalRecord signal = new SignalRecord(id, type, data, additionalData);LOGGER.info("Recorded signal event '{}' ", signal);// process the signalSIGNALS.add(signal);} else {LOGGER.warn("Error while reading signaling events from endpoint: {}", response.statusCode());}} catch (IOException | InterruptedException e) {LOGGER.warn("Exception while preparing to process the signal '{}' from the endpoint", e.getMessage());e.printStackTrace();}return SIGNALS;}@Overridepublic void close() { (4)SIGNALS.clear();}
}
- name() 方法返回信号通道的名称。要使 Debezium 能够使用通道,请在连接器的 signal.enabled.channels 属性中指定名称 http。
- init() 方法可用于初始化 http 通道所需的特定配置、变量或连接。
- read() 方法从 http 端点读取信号并返回将由 Debezium 连接器处理的 SignalRecord 对象列表。
- close() 方法关闭所有分配的资源。
通过实现NotificationChannel接口创建通知通道,如下所示:
public class HttpNotificationChannel implements NotificationChannel {private static final Logger LOGGER = LoggerFactory.getLogger(HttpNotificationChannel.class);public static final String CHANNEL_NAME = "http";private static final String NOTIFICATION_PREFIX = "[HTTP NOTIFICATION SERVICE]";@Overridepublic String name() { (1)return CHANNEL_NAME;}@Overridepublic void init(CommonConnectorConfig config) { (2)// custom configuration}@Overridepublic void send(Notification notification) { (3)LOGGER.info(String.format("%s Sending notification to http channel", NOTIFICATION_PREFIX));String binId = createBin();sendNotification(binId, notification);}private static String createBin() {// Create a bin on the servertry {HttpRequest request = HttpRequest.newBuilder().uri(new URI("https://www.toptal.com/developers/postbin/api/bin")).POST(HttpRequest.BodyPublishers.ofString(" ")).build();HttpClient httpClient = HttpClient.newHttpClient();HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());if (response.statusCode() == HTTP_CREATED) {String binId = response.body().replaceAll(".*\"binId\":\"([^\"]+)\".*", "$1");LOGGER.info("Bin created: " + response.body());return binId;}} catch (URISyntaxException | InterruptedException | IOException e) {throw new RuntimeException(e);}return null;}private static void sendNotification (String binId, Notification notification) {// Get notification from the bintry {ObjectMapper mapper = new ObjectMapper();String notificationString = mapper.writeValueAsString(notification);HttpRequest request = HttpRequest.newBuilder().uri(new URI("https://www.toptal.com/developers/postbin/" + binId)).header("Content-Type", "application/json").POST(HttpRequest.BodyPublishers.ofString(notificationString)).build();HttpClient httpClient = HttpClient.newHttpClient();HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());if (response.statusCode() == HTTP_OK) {LOGGER.info("Notification received : " + response.body());}} catch (URISyntaxException | InterruptedException | IOException e) {throw new RuntimeException(e);}}@Overridepublic void close() { (4)}
}
- name() 方法返回通知通道的名称。要使 Debezium 能够使用通道,请在连接器的 notification.enabled.channels 属性中指定 http。
- init() 方法可用于初始化通道所需的特定配置、变量或连接。
- send() 方法将通知发送到通道。该通知包含由 Debezium 连接器处理的 SignalRecord 对象。
- close() 方法关闭所有分配的资源。
分别在 META-INF/services 目录下的 io.debezium.pipeline.signal.SignalChannelReader 和 io.debezium.pipeline.notification.channels.NotificationChannel 文件下声明 HTTP 信号和通知通道。
编译 Java 项目并将其导出为 JAR 文件。这可以使用 Maven 或您喜欢的构建工具来完成。将 JAR 文件复制到包含要使用的 Debezium 连接器的 JAR 文件的目录。例如,如果您想要将自定义信号和通知通道与 Debezium Postgres 连接器一起使用,请将 JAR 文件复制到 /kafka/connect/debezium-connector-postgres 目录。
此示例提供了一个 Docker Compose 文件,其中定义了必要的服务,包括 Mock Server、Zookeeper、Kafka Connect 和 Postgres 数据库。
要启动服务,请运行以下命令:
export DEBEZIUM_VERSION=2.3
docker-compose up -d
确保服务已启动并正在运行,并且 Postgres 数据库已准备好接受连接后,下一步是注册连接器。这涉及创建连接器配置文件。让我们创建一个名为 register-postgres.json 的文件,其中包含以下属性:
{"name": "inventory-connector","config": {"connector.class": "io.debezium.connector.postgresql.PostgresConnector","tasks.max": 1,"database.hostname": "postgres","database.port": 5432,"database.user": "postgres","database.password": "postgres","database.dbname" : "postgres","topic.prefix": "dbserver1","schema.include.list": "inventory","signal.enabled.channels": "http", 1"notification.enabled.channels": "http" 2}
}
- signal.enabled.channels 属性指定连接器要使用的信号通道。在这种情况下,连接器使用 http 信号通道。
- notification.enabled.channels 属性指定连接器要使用的通知通道。在这种情况下,连接器使用 http 通知通道。
现在我们已经准备好了连接器配置文件,我们可以通过执行以下命令来向 Kafka Connect 注册连接器:
curl -i -X POST -H "Accept:application/json" \-H "Content-Type:application/json" http://localhost:8083/connectors/ \-d @register-postgres.json
连接器成功注册后,您可以查看连接器日志以观察信号事件。这些日志提供了有关连接器的处理和进度的见解,包括任何与信号相关的信息。您将遇到类似于以下内容的日志消息:
Recorded signal event 'SignalRecord{id='924e3ff8-2245-43ca-ba77-2af9af02fa07', type='log', data='{"message":"Signal message received from http endpoint."}', additionalData={}}' [io.debezium.examples.signal.HttpSignalChannel]
此外,您可能会注意到与发送到邮筒的通知事件相关的日志消息。例如:
[HTTP NOTIFICATION SERVICE] Sending notification to http channel [io.debezium.examples.notification.HttpNotificationChannel]
Bin created: {"binId":"1688742588469-1816775151528","now":1688742588470,"expires":1688744388470} [io.debezium.examples.notification.HttpNotificationChannel]
它提供有关通知事件的信息,例如创建具有唯一标识符 (binId) 的 bin 以及其他相关详细信息。要从 Postbin 检索通知事件,请从日志消息中获取 binId 并使用它从 Postbin 请求相应的通知事件。要查看通知事件,您可以使用以下 URL 访问 Postbin:https://www.toptal.com/developers/postbin/b/:binId。将 URL 中的 :binId 替换为从连接器日志中获取的实际 binId。
发送到 Postbin 的通知事件如下所示:
二、结论
在本教程中,我们探讨了如何为 Debezium 连接器创建自定义信号和通知通道。我们创建了一个自定义信号通道,用于从 HTTP 端点接收信号事件。我们还创建了一个自定义通知通道,用于将通知事件发送到 HTTP 端点。
Debezium 的综合信号和通知系统可与第三方解决方案无缝集成,使用户能够随时了解 Debezium 连接器的状态和进度。该系统的可扩展性使用户能够自定义信号和通知渠道,以满足他们的定制需求。
相关文章:

Debezium日常分享系列之:定制Debezium 信号发送和通知
Debezium日常分享系列之:定制Debezium 信号发送和通知 一、自定义信号和通知通道二、结论 Debezium 2.3 在信号和通知功能方面引入了新的改进。除了 Debezium 提供的预定义信号和通知通道之外,您还可以设置新的信号和通知通道。此功能使用户能够自定义系…...

RpcProvider(rpc服务提供者)实现思路
RpcProvider(服务提供者)实现思路 上一节说到,如何将一个本地服务发布成远程服务,但没有说明一个rpc框架怎么进行调用的,看看上节代码 #include <iostream> #include <string> #include "user.pb.h…...

GNSS技术知识你知道多少?这些你或许还未掌握
GNSS信号频段 GNSS频谱图展示了不同的GNSS信号及其星座、载波频率、调制方案,以及所有这些信号在同一L波段频段内如何相互关联,是GNSS专业人员的必备工具,包括设计和开发GNSS系统的工程师,以及测试GNSS系统的工程师。 GNSS术语 …...

YOLOv8教程系列:三、使用YOLOv8模型进行自定义数据集半自动标注
YOLOv8半自动标注 目标检测半自动标注的优点包括: 1.提高标注效率:算法能够自动标注部分数据,减少了人工标注的工作量,节省时间和资源。 2.降低成本:自动标注可以减少人工标注的成本,特别是对于大规模数据…...

AI聊天GPT三步上篮!
1、是什么? CHATGPT是OpenAI开发的基于GPT(Generative Pre-trained Transformer)架构的聊天型人工智能模型。也就是你问它答,根据网络抓去训练 2、怎么用? 清晰表达自己诉求,因为它就是一个AI助手&#…...

如何彻底卸载VMware
目录 第一章、停止并卸载VMware程序1.1)停止VMware有关的服务1.2)打开任务管理器停止进程1.3)卸载VMware程序 第二章、残留文件删除2.1)打开注册表2.2)删除注册表残留文件2.3)C盘文件删除 友情提醒…...

[个人笔记] Windows配置NTP时间同步
Windows - 运维篇 第六章 Windows配置NTP时间同步 Windows - 运维篇系列文章回顾Windows配置NTP时间同步域控环境的NTP配置工作组环境的NTP配置Windows的CMD部分命令集 参考来源 系列文章回顾 第一章 迁移WinSrv系统到虚拟机 第二章 本地安全策略xcopy实现实时备份文件夹内容 …...

Jetson Docker 编译 FFmpeg 支持硬解nvmpi和cuvid
0 设备和docker信息 设备为NVIDIA Jetson Xavier NX,jetpack版本为 5.1.1 [L4T 35.3.1] 使用的docker镜像为nvcr.io/nvidia/l4t-ml:r35.2.1-py3,详见https://catalog.ngc.nvidia.com/orgs/nvidia/containers/l4t-ml 使用下列命令拉取镜像: sudo docker pull nvcr…...

某某某小说app接口抓包分析
详细说明查看原文 https://sdk.qzbonline.com/ver9/shuhuajs/sdk/ioszh_shuhuajs_conf.htmlhttps://sdk.qzbonline.com/prov8/ymqxs/sdk/ios_ymqxs_conf.htmlhttps://sdk.qzbonline.com/prov8/ymqxs/sdk/ios_ymqxs_conf2.htmlhttps://sdk.qzbonline.com/prov8/fqhyxs/sdk/iosz…...

开发一个RISC-V上的操作系统(四)—— 内存管理
目录 往期文章传送门 一、内存管理简介 二、Linker Script 链接脚本 三、动态分配内存 四、测试 往期文章传送门 开发一个RISC-V上的操作系统(一)—— 环境搭建_riscv开发环境_Patarw_Li的博客-CSDN博客 开发一个RISC-V上的操作系统(二…...

区块链:可验证随机函数
本篇主要介绍可验证随机函数的定义及其在区块链上的作用。 1 可验证随机函数 1.1 定义 可验证随机函数(Verifiable Random Function,VRF)本质上还是一类具有验证功能的伪随机函数。对于一个特定的输入 m m m以及输入者的私钥 S K SK SK,VRF会输出一个随…...

Flask中flask-session
Flask中flask-session Flask-Session是一个为Flask应用程序开发的工具,允许您轻松处理服务器端会话。会话是存储和追踪用户特定数据的方式。例如,当用户登录到应用程序时,他们的状态(即登录状态)可以保存在会话中&…...

react-Native init初始化项目报错”TypeError: cli.init is not a function“
文章目录 一、问题:二、解决: 一、问题: 在react-native init appDemo 创建项目时,报错TypeError: cli.init is not a function。 二、解决: 产生这个问题的原因是:使用这种方式创建工程,rea…...

【gitlib】linux系统rpm安装gitlib最新版本
目录 下载gitlib安装包 安装需要的依赖 设置开机启动 安装邮件服务器并设置开机启动 rpm执行安装gitlib 修改gitlib.rb文件的属性 修改完毕后执行更新配置 查看gitlib运行 查看gitlib初始化root密码 gitlib入口访问地址 下载gitlib安装包 Index of /gitlab-ce/yum/el7/…...

iOS开发-检查版本更新与强制更新控制
iOS开发-检查版本更新与强制更新控制。 在开发中经常遇到需要检查版本,检查版本及请求appstoreLookUrl查看版本号与当前的版本号进行比对,看是否需要更新。强制更新控制,是将获取到当前版本号传给服务端,服务端判断当前的版本是否…...

自动化运维工具——Ansible
自动化运维工具——Ansible 一、Ansible概述二、ansible 环境安装部署1.管理端安装 ansible2.ansible 目录结构3.配置主机清单4.配置密钥对验证 三、ansible 命令行模块1.command 模块2.shell 模块3.cron 模块4.user 模块5.group 模块6.copy 模块7.file 模块8.hostname 模块9&a…...

W2NER详解
论文:https://arxiv.org/pdf/2112.10070.pdf 代码:https://github.com/ljynlp/W2NER 文章目录 W2NER介绍模型架构解码 源码介绍数据输入格式模型代码 参考资料 W2NER 介绍 W2NER模型,将NER任务转化预测word-word(备注ÿ…...

ElementUI tabs标签页样式改造美化
今天针对ElementUI的Tabs标签页进行了样式修改,更改为如下图所属的样子。 在线运行地址:JSRUN项目-ElementUI tabs标签页样式改造 大家如果有需要可以拿来修改使用,下面我也简单的贴上代码,代码没有注释,很抱歉&#x…...
出海周报|Temu在美状告shein、ChatGPT安卓版上线、小红书回应闪退
工程机械产业“出海”成绩喜人,山东相关企业全国最多Temu在美状告shein,跨境电商战事升级TikTok将在美国推出电子商务计划,售卖中国商品高德即将上线国际图服务,初期即可覆盖全球超200个国家和地区ChatGPT安卓版正式上线ÿ…...

2023年7月26日 单例模式
单例模式 饿汉模式 package com.wz.cinema.platform.server.util;public class DataManager {/*** 单例模式:整个类在运行中只会有一个实例* 既然是在运行中只有一个实例,那么就必须* 考虑多线程环境** 单例模式分为懒汉模式和饿汉模式* 饿汉模式本身就是…...

[ 容器 ] Docker 安全及日志管理
目录 Docker 容器与虚拟机的区别Docker 存在的安全问题Docker 架构缺陷与安全机制Docker 安全基线标准容器相关的常用安全配置方法限制流量流向镜像安全避免Docker 容器中信息泄露DockerClient 端与 DockerDaemon 的通信安全 容器的安全性问题的根源在于容器和宿主机共享内核。…...

游游的排列构造
示例1 输入 5 2 输出 3 1 5 2 4 示例2 输入 5 3 输出 2 1 4 3 5 #include<bits/stdc.h> using namespace std; typedef long long ll; const int N1e55; int n,k; int main(){scanf("%d%d",&n,&k);int xn-k1;int yn-k;int f1;for(int i1;i&l…...

拯救者Y9000K无线Wi-Fi有时不稳定?该如何解决?
由于不同品牌路由器的性能差异,无法完美兼容最新的无线网卡技术,在连接网络时(特别是网络负载较大的情况下),可能会出现Wi-Fi信号断开、无法网络无法访问、延迟突然变大的情况;可尝试下面方法进行调整。 1…...

【业务功能篇59】Springboot + Spring Security 权限管理 【下篇】
UserDetails接口定义了以下方法: getAuthorities(): 返回用户被授予的权限集合。这个方法返回的是一个集合类型,其中每个元素都是一个GrantedAuthority对象,表示用户被授予的权限。getPassword(): 返回用户的密码。这个方法返回的是一个字符…...

性能优化 - 前端性能监控和性能指标计算方式
性能优化 - 前端性能监控和性能指标计算方式 前言一. 性能指标介绍1.1 单一指标介绍1.2 指标计算① Redirect(重定向耗时)② AppCache(应用程序缓存的DNS解析)③ DNS(DNS解析耗时)④ TCP(TCP连接耗时)⑤ TTFB(请求响应耗时)⑥ Trans(内容传输耗时)⑦ DOM(DOM解析耗时) 1.3 FP(f…...
git stash clear清空本地暂存代码
git stash clear清空本地暂存代码 git stash 或者 git stash list 查看本地暂存的代码。 清除本地暂存的代码修改: git stash clear git回退代码仓库版本_git回退到之前的版本会影响本地代码嘛_zhangphil的博客-CSDN博客git回退代码版本_git回退到之前的版本会影…...

消防应急照明设置要求在炼钢车间电气室的应用
摘 要:文章以GB51309—2018《消防应急照明和疏散指示系统技术标准》为设计依据,结合某炼钢车间转炉项目的设计过程,在炼钢车间电气室的疏散照明和备用照明的设计思路、原则和方法等方面进行阐述。通过选择合理的消防应急疏散照明控制系统及灯具供配电方案…...

element 表单验证 深层验证绑定
直接上代码 :prop 和prop 都可以,vue2和vue3或者是element、elementplus都可以用 <template><div class"page page-table"><section class"page-query-form"><breadcrumb :hasLine"false" /></section&g…...

brew 换镜像网站
在国内,使用brew极慢. 因为它需要访问国外的一些服务器. 解决方法是使用国内的镜像站. 如果是首次安装: curl https://raw.githubusercontent.com/Homebrew/install/master/install.sh > install-brew.sh 然后,在下载的文件中, 修改BREW_REPO为: BREW_REPO"https…...

WIZnet W5500-EVB-Pico 静态IP配置教程(二)
W5500是一款高性价比的 以太网芯片,其全球独一无二的全硬件TCP、IP协议栈专利技术,解决了嵌入式以太网的接入问题,简单易用,安全稳定,是物联网设备的首选解决方案。WIZnet提供完善的配套资料以及实时周到的技术支持服务…...