Debezium嵌入式连接postgresql封装服务
文章目录
- 1.项目结构:
- 2.依赖:
- 3.application.properties
- 4.DebeziumConnectorConfig类
- 5.TableEnum类
- 6.TableHandler接口(表处理抽象)
- 7.DefaultTableHandler默认实现类
- 8.UserTableHandler处理类
- 9.TableHandlerFactory工厂
- 10.DebeziumListener 监听事件
- 11.测试
环境:JDK8,Debezium1.94,postgresql12
1.项目结构:

2.依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><artifactId>spring-boot-starter-parent</artifactId><groupId>org.springframework.boot</groupId><version>2.7.18</version></parent><groupId>com.linging</groupId><artifactId>springboot-debezium-server</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><debezium.version>1.9.4.Final</debezium.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><version>${debezium.version}</version><exclusions><exclusion><artifactId>slf4j-reload4j</artifactId><groupId>org.slf4j</groupId></exclusion></exclusions></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-postgres</artifactId><version>${debezium.version}</version></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-api</artifactId><version>${debezium.version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.43</version></dependency></dependencies></project>
3.application.properties
# Debezium Configuration
#连接器基本信息
#指定 Debezium 连接器的名称,唯一标识,用于在 Kafka Connect 中区分不同的连接器。
debezium.name=my-postgres-connector
#指定连接器的类名,这里是连接postgresql
debezium.connector.class=io.debezium.connector.postgresql.PostgresConnector#偏移量相关
#指定偏移量存储的实现类,这里使用的是文件存储,将偏移量存储在本地文件中。
debezium.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
#处理数据的偏移量存储路径
debezium.offset.storage.file.filename=C:\\Users\\Linging\\Desktop\\debezinum\\offsets_0.dat
#处理数据的偏移量提交时间间隔,单位毫秒,设置为 0 表示每次处理完一批记录后立即提交偏移量,这可以减少数据丢失的风险,但会增加系统开销
#以上一次提交时间开始计算
debezium.offset.flush.interval.ms=10000#数据库连接信息
debezium.database.hostname=192.168.159.103
debezium.database.port=15432
debezium.database.user=postgres
debezium.database.password=123456
#捕获变更的数据库名称
debezium.database.dbname=db_test
#指定逻辑服务器的唯一标识,用于区分不同的数据库实例
debezium.database.server.id=postgresql_0
#指定逻辑服务器的名称,用于在 Kafka 主题中区分不同的数据库实例
debezium.database.server.name=customer_postgres_db_server#数据库历史记录
#指定数据库模式记录的实现类,这里使用的是文件存储,将数据库历史记录存储在本地文件中
debezium.database.history=io.debezium.relational.history.FileDatabaseHistory
#指定数据库模式记录文件的路径
debezium.database.history.file.filename=C:\\Users\\Linging\\Desktop\\debezinum\\history_0.dat#表和字段过滤
#debezium.table.include.list=public.user
#debezium.column.include.list=public.user.id,public.user.name#其他配置
#指定是否自动创建 PostgreSQL 的逻辑复制槽,设置为 filtered 表示只捕获配置中指定的表和字段的变更
debezium.publication.autocreate.mode=filtered
#指定 PostgreSQL 的逻辑复制插件名称,pgoutput是 PostgreSQL 的逻辑复制插件名称,用于捕获变更
debezium.plugin.name=pgoutput
#指定逻辑复制槽的名称
debezium.slot.name=dbz_customerdb_listener
#不执行初始快照,直接捕获变更数据,取值:never、initial、when_needed
debezium.snapshot.mode=never
#批量提交条数
debezium.max.batch.size=100logging.level.root=INFO
logging.level.io.debezium.postgres.BinlogReader=INFO
logging.level.io.davidarhcanjo=DEBUG
logging.level.io.debezium=INFO
4.DebeziumConnectorConfig类
@Configuration
public class DebeziumConnectorConfig {@Beanpublic Properties customerConnector(Environment env) {Properties props = new Properties();props.setProperty("name", env.getProperty("debezium.name"));props.setProperty("connector.class", env.getProperty("debezium.connector.class"));props.setProperty("offset.storage", env.getProperty("debezium.offset.storage"));props.setProperty("offset.storage.file.filename", env.getProperty("debezium.offset.storage.file.filename"));props.setProperty("offset.flush.interval.ms", env.getProperty("debezium.offset.flush.interval.ms"));props.setProperty("database.hostname", env.getProperty("debezium.database.hostname"));props.setProperty("database.port", env.getProperty("debezium.database.port"));props.setProperty("database.user", env.getProperty("debezium.database.user"));props.setProperty("database.password", env.getProperty("debezium.database.password"));props.setProperty("database.dbname", env.getProperty("debezium.database.dbname"));props.setProperty("database.server.id", env.getProperty("debezium.database.server.id"));props.setProperty("database.server.name", env.getProperty("debezium.database.server.name"));props.setProperty("database.history", env.getProperty("debezium.database.history"));props.setProperty("database.history.file.filename", env.getProperty("debezium.database.history.file.filename"));props.setProperty("table.include.list", TableEnum.getTableNames()); //表名props.setProperty("column.include.list", TableEnum.getColumns()); // 表中得哪些字段props.setProperty("publication.autocreate.mode", env.getProperty("debezium.publication.autocreate.mode"));props.setProperty("plugin.name", env.getProperty("debezium.plugin.name"));props.setProperty("slot.name", env.getProperty("debezium.slot.name"));props.setProperty("snapshot.mode", env.getProperty("debezium.snapshot.mode"));props.setProperty("max.batch.size", env.getProperty("debezium.max.batch.size"));return props;}
}
5.TableEnum类
package com.linging.enums;import java.util.Arrays;
import java.util.stream.Collectors;/*** 监听的表及字段配置* @author Linging* @version 1.0.0* @since 1.0*/
public enum TableEnum {DEFAULT("default", "defaultTableHandler", null),USER("public.user", "userTableHandler", "public.user.id,public.user.name"),;// 表名称private final String tableName;// 表处理类的名称private final String handlerName;// 表的字段名称,多个用逗号隔开public final String columnName;TableEnum(String tableName, String handlerName, String columnName) {this.tableName = tableName;this.handlerName = handlerName;this.columnName = columnName;}public String getTableName() {return tableName;}public String getHandlerName() {return handlerName;}public String getColumnName() {return columnName;}public static String getTableNames(){return Arrays.stream(TableEnum.values()).map(TableEnum::getTableName).filter(name -> !"default".equals(name)).distinct().collect(Collectors.joining(","));}public static String getColumns(){return Arrays.stream(TableEnum.values()).filter(e -> !"default".equals(e.getTableName()) && e.getColumnName() != null).map(TableEnum::getColumnName).distinct().collect(Collectors.joining(","));}
}
6.TableHandler接口(表处理抽象)
public interface TableHandler {void handle(SourceRecord sourceRecord);void handleBatch(List<RecordChangeEvent<SourceRecord>> recordChangeEvents, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer);
}
7.DefaultTableHandler默认实现类
/*** 默认处理类*/
@Component("defaultTableHandler")
public class DefaultTableHandler implements TableHandler {private static final Logger log = LoggerFactory.getLogger(DefaultTableHandler.class);@Overridepublic void handle(SourceRecord sourceRecord) {log.info("Handling default table: {}", sourceRecord.topic());log.info("Key = {}, Value = {}", sourceRecord.key(), sourceRecord.value());}@Overridepublic void handleBatch(List<RecordChangeEvent<SourceRecord>> recordChangeEvents,DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer) {log.info("Handling batch default table: {}", recordChangeEvents.size());}
}
8.UserTableHandler处理类
/*** user表变更处理类*/
@Component("userTableHandler")
public class UserTableHandler implements TableHandler {private static final Logger log = LoggerFactory.getLogger(UserTableHandler.class);@Overridepublic void handle(SourceRecord sourceRecord) {log.info("Handling user table: {}", sourceRecord.topic());log.info("Key = {}, Value = {}", sourceRecord.key(), sourceRecord.value());// 添加具体的处理逻辑Struct sourceRecordChangeValue= (Struct) sourceRecord.value();if (sourceRecordChangeValue != null) {Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));// 处理非读操作if(operation != Envelope.Operation.READ) {String record = operation == Envelope.Operation.DELETE ? BEFORE : AFTER;Struct struct = (Struct) sourceRecordChangeValue.get(record);Map<String, Object> payload = struct.schema().fields().stream().map(Field::name).filter(fieldName -> struct.get(fieldName) != null).map(fieldName -> Pair.of(fieldName, struct.get(fieldName))).collect(toMap(Pair::getKey, Pair::getValue));// TODO 处理逻辑(保存数据库,发送MQ等操作,需要保证幂等)log.info("Updated Data: {} with Operation: {}", payload, operation.name());}}}@Overridepublic void handleBatch(List<RecordChangeEvent<SourceRecord>> recordChangeEvents,DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer) {for (RecordChangeEvent<SourceRecord> recordChangeEvent : recordChangeEvents) {try {SourceRecord sourceRecord = recordChangeEvent.record();// TODO 处理逻辑(保存数据库,发送MQ等操作,需要保证幂等)this.handle(sourceRecord);// 标记已处理committer.markProcessed(recordChangeEvent);} catch (InterruptedException e) {log.error("处理异常:", e);}}}
}
9.TableHandlerFactory工厂
@Component
public class TableHandlerFactory implements ApplicationContextAware {@Value("${debezium.database.server.name}")private String prefixServerName;private ApplicationContext context;private final Map<String, TableHandler> handlers = new HashMap<>();@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.context = applicationContext;}@PostConstructpublic void init(){String name = null;for (TableEnum tableEnum : TableEnum.values()) {if(TableEnum.DEFAULT.equals(tableEnum)){name = tableEnum.getTableName();}else{name = getTableName(tableEnum.getTableName());}handlers.putIfAbsent(name,context.getBean(tableEnum.getHandlerName(), TableHandler.class));}}public TableHandler getHandler(String tableName) {return handlers.getOrDefault(tableName, handlers.get(TableEnum.DEFAULT.getTableName()));}public String getTableName(String name){return prefixServerName + "." + name;}}
10.DebeziumListener 监听事件
@Component
public class DebeziumListener {private static final Logger log = LoggerFactory.getLogger(DebeziumListener.class);private final ExecutorService executor = Executors.newSingleThreadExecutor();private final DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;private final TableHandlerFactory tableHandlerFactory;@Autowiredpublic DebeziumListener(Properties customerConnector, TableHandlerFactory tableHandlerFactory) {this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class)).using(customerConnector).using(OffsetCommitPolicy.periodic(customerConnector)).notifying(this::handleChangeEventBatch).build();this.tableHandlerFactory = tableHandlerFactory;}/*** 批量记录处理* @param recordChangeEvents* @param committer*/private void handleChangeEventBatch(List<RecordChangeEvent<SourceRecord>> recordChangeEvents,DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer){// 根据表分组Map<String, List<RecordChangeEvent<SourceRecord>>> tableName2List = recordChangeEvents.stream().collect(Collectors.groupingBy(event -> event.record().topic()));tableName2List.forEach((tableName, recordChangeEventList) -> {TableHandler handler = tableHandlerFactory.getHandler(tableName);handler.handleBatch(recordChangeEventList, committer);});try {// 触发提交策略committer.markBatchFinished();} catch (InterruptedException e) {log.error("提交异常:", e);}}/*** 单条记录处理* @param sourceRecordChangeEvent*/private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordChangeEvent) {SourceRecord sourceRecord = sourceRecordChangeEvent.record();log.info("Key = {}, Value = {}", sourceRecord.key(), sourceRecord.value());String tableName = sourceRecord.topic();// 获取对应的表处理类TableHandler handler = tableHandlerFactory.getHandler(tableName);handler.handle(sourceRecord);}@PostConstructprivate void start() {this.executor.execute(debeziumEngine);}@PreDestroyprivate void stop() {if (this.debeziumEngine != null) {try {this.debeziumEngine.close();} catch (IOException e) {log.error("关闭debeziumEngine异常:", e);}}this.executor.shutdown();}
}
11.测试
启动服务,修改数据库user表数据:


相关文章:
Debezium嵌入式连接postgresql封装服务
文章目录 1.项目结构:2.依赖:3.application.properties4.DebeziumConnectorConfig类5.TableEnum类6.TableHandler接口(表处理抽象)7.DefaultTableHandler默认实现类8.UserTableHandler处理类9.TableHandlerFactory工厂10.Debezium…...
Mixed Content: The page at https://xxx was loaded over HTTPS
一、核心原因分析 Mixed Content 警告是由于 HTTPS 页面中引用了 HTTP 协议的资源(如脚本、图片、iframe 等),导致浏览器因安全策略阻止加载这些非加密内容。HTTP 资源可能被中间人攻击篡改,破坏 HTTPS 页面的整体安全性。 二、推荐解决方案 1. 强制资源升级为 HTTPS •…...
深度学习、图像算法学习记录
深度学习加速 综述文档: https://chenzomi12.github.io/02Hardware01Foundation/02ArchSlim.html winograd: https://zhuanlan.zhihu.com/p/260109670 ncnn 1.修改模型结构,优化模型内存访问次数,加速。 VGG 和 InceptionNet : …...
对象的创建方式有哪些?在虚拟机中具体的创建过程是怎样的?
在Java中,对象的创建方式及其在虚拟机中的具体过程如下: 一、对象的创建方式 使用 new 关键字 最常见的对象创建方式,直接调用类的构造方法。 MyClass obj new MyClass();反射(Reflection) 通过 Class 或 Constructor…...
Python 爬取 1688.item_get_factory 接口:获取工厂档案信息实战指南
在电商采购和供应链管理中,了解供应商的工厂信息是至关重要的一步。1688 作为国内领先的 B2B 平台,提供了丰富的供应商和工厂档案信息。通过 item_get_factory API 接口,开发者可以获取工厂的详细信息,包括工厂名称、地址、联系方…...
15. git push
基本概述 git push 的作用是:把本地分支的提交推送到远程仓库。推送分支需要满足快进规则(Fast-Forward),即远程分支的最新提交必须是本地分支的直接祖先,这个是通过哈希值值进行判断的。 基本用法 1.完整格式 git…...
Perl 发送邮件
Perl 发送邮件 概述 Perl 是一种强大的编程语言,广泛应用于系统管理、网络编程和数据分析等领域。其中,使用 Perl 发送邮件是一项非常实用的技能。本文将详细介绍使用 Perl 发送邮件的方法,包括必要的配置、代码示例以及注意事项。 准备工…...
Rust所有权详解
文章目录 Rust所有权所有权规则作用域 内存和分配移动与克隆栈空间堆空间 关于函数的所有权机制作为参数作为返回值 引用与租借垂悬引用 Rust所有权 C/C中我们对于堆内存通常需要自己手动管理,手动申请和释放,即便有了智能指针,对于效率的影…...
大模型推理--Qwen2.5-Omni在A100上的初体验
过去的一周Qwen2.5-Omni产生了很高的热度,吸引了很多人的目光。它的多模态确实很吸引人,放出来的demo体验还算尚可(语音对话的延迟还是太大),所以就在A100 PCIe上实地部署了一下,初步对其速度进行了测试&am…...
CExercise_07_1指针和数组_2数组元素的逆序数组逆序(指针版 reverse_by_ptr 和下标版 reverse_arr)
题目: 数组元素的逆序。要求使用[]运算符以及纯粹指针操作两种方式来完成。 关键点 arr[i] arr[len - 1 - i]; arr[0]arr[len-1]; 如果数组序列是偶数,则调换最中间一对为止;若为奇数,则单出一个不用反转. 思想就是长度取一半 eg:8/2, 9/24.5,反转一半,到5时固定…...
框架PasteForm实际开发案例,换个口味显示数据,支持echarts,只需要标记几个特性即可在管理端显示(2)
PasteForm框架的主要思想就是对Dto进行标记特性,然后管理端的页面就会以不一样的UI呈现 使用PasteForm框架开发,让你免去开发管理端的烦恼,你只需要专注于业务端和用户端! 在管理端中,如果说表格是基本的显示方式,那么图表chart就是一个锦上添花的体现! 如果一个项目拥…...
Starrocks的Bitmap索引和Bloom filter索引以及全局字典
写这个的主要作用是梳理一下Starrocks的索引效率以及使用场景。 Starrocks Bitmap索引 原理: Bitmap 索引是一种使用 bitmap 的特殊数据库索引。bitmap 即为一个 bit 数组,一个 bit 的取值有两种:0 或 1。 每一个 bit 对应数据表中的一行&…...
Explain的使用
1.使用explain语句去查看分析结果 如explain select * from test1 where id=1;会出现:id selecttype table type possible_keys key key_len ref rows extra各列。 其中, type=const表示通过索引一次就找到了; key=primary的话,表示使用了主键; type=all,表示为全表…...
QML面试笔记--UI设计篇05容器控件
1. QML中容器控件全解:构建灵活界面的基石 1.1. Item(万物容器)1.2. Rectangle(视觉容器)1.3. ListView(动态列表容器)1.4. Frame(表单容器)1.5. SwipeView(页…...
Windows操作系统安全配置(一)
1.操作系统和数据库系统管理用户身份标识应具有不易被冒用的特点,口令应有复杂度要求并定期更换 配置方法:运行“gpedit.msc”计算机配置->Windows设置->安全设置>帐户策略->密码策略: 密码必须符合复杂性要求->启用 密码长度最小值->…...
LibreOffice 自动化操作目录
一、应用场景 批量更新 Word/ODT 文档目录自动化生成报告模板与 Python 结合实现文档处理流水线 二、环境准备 1. 安装 LibreOffice 下载地址: LibreOffice 官网版本要求: 7.2(确保支持最新 UNO API)安装注意: 勾选“创建快速…...
基于大模型应用技能的学习路径
总览与优先级 基础知识巩固与扩展(2-4周)数据处理与机器学习基础(4-6周)深度学习基础与PyTorch框架(6-8周)自然语言处理(NLP)基础与Transformer架构(6-8周)F…...
VSCode运行,各类操作缓慢,如何清理
VSCode写代码,随着项目逐步进展,代码量在增加,依赖的第三方头文件也在增加, 先是发现代码提示的速度变慢, 后来格式化代码速度太慢 然后c/c代码的语法检查有时候压根就失败,来个错误提示 还有source contro…...
2024年的核心技术与最佳实践
前端开发领域近年来经历了翻天覆地的变化,从简单的HTML/CSS页面到如今复杂的单页应用(SPA)和渐进式Web应用(PWA)。本文将探讨2024年前端开发的核心技术栈、工具链和最佳实践。 一、前端三大基石的最新进展 1. HTML5的增强特性 Web Components标准化 原生对话框(&…...
redis(2)-mysql-锁
1.数据倾斜: 解决:虚拟节点 2.缓存穿透:缓存雪崩、击穿 3.分布式锁 多把锁控制不同节点上的一致性问题。 锁是有失效时间的。 强制回收。 4.redis 和zookeeper的区别 redis 数据支持有效期 4.1 zookeeper 分布式一致性服务框架&am…...
LeetCode 热题 100 题解记录
LeetCode 热题 100 题解记录 哈希 1. 两数之和 利用Map判断是否包含需要的值来求解 49. 字母异位词分组 初始化哈希表: 创建一个哈希表 map,用于存储分组结果。键为排序后的字符串,值为原字符串列表。 遍历输入字符串数组: 对于…...
OpenLayers:海量图形渲染之矢量切片
最近由于在工作中涉及到了海量图形渲染的问题,因此我开始研究相关的解决方案。在咨询了许多朋友之后发现矢量切片似乎是行业内最常用的一种解决方案,于是我便开始研究它该如何使用。 一、什么是矢量切片 矢量切片按照我的理解就是用栅格切片的方式把矢…...
AI智算-K8s+vLLM Ray:DeepSeek-r1 671B 满血版分布式推理部署实践
K8s + vLLM & Ray:DeepSeek-r1 671B 满血版分布式推理部署实践 前言环境准备1. 模型下载2. 软硬件环境介绍正式部署1. 模型切分2. 整体部署架构3. 安装 LeaderWorkerSet4. 通过 LWS 部署DeepSeek-r1模型5. 查看显存使用率6. 服务对外暴露7. 测试调用API7.1 通过 curl7.2 通…...
tcp/ip攻击及防范
作为高防工程师,我每天拦截数以万计的恶意流量,其中TCP/IP协议层攻击是最隐蔽、最具破坏性的威胁之一。常见的攻击手法包括: 1. SYN Flood攻击:攻击者发送大量伪造的SYN包,耗尽服务器连接资源,导致正常用…...
深入浅出SPI通信协议与STM32实战应用(W25Q128驱动)(实战部分)
1. W25Q128简介 W25Q128 是Winbond推出的128M-bit(16MB)SPI接口Flash存储器,支持标准SPI、Dual-SPI和Quad-SPI模式。关键特性: 工作电压:2.7V~3.6V分页结构:256页/块,每块16KB,共1…...
前端知识点---闭包(javascript)
文章目录 1.怎么理解闭包?2.闭包的特点3.闭包的作用?4 闭包注意事项:5 形象理解6 闭包的应用 1.怎么理解闭包? 函数里面包着另一个函数,并且内部函数可以访问外部函数的变量。 <script> function box() {//周围状态(外部函数中定义的…...
Java 泛型的逆变与协变:深入理解类型安全与灵活性
泛型是 Java 中强大的特性之一,它提供了类型安全的集合操作。然而,泛型的类型关系(如逆变与协变)常常让人感到困惑。 本文将深入探讨 Java 泛型中的逆变与协变,帮助你更好地理解其原理和应用场景。 一、什么是协变与…...
Web框架 --- Web服务器和Web应用服务器
Web框架 --- Web服务器和Web应用服务器 什么是HTTP Web服务器Web框架与Web服务器的关系 --- 以SpringBoot 和 Tomcat为例Simple Web Server Example 在日常开发的时候不管是用什么样的Web框架,比如Srpingboot或者ASP.Net, 我们只要在IDE里点击Run,项目就…...
【SpringBoot】98、SpringBoot中整合springdoc-openapi-ui接口文档
1、引入依赖 引入依赖<dependency><groupId>org.springdoc</groupId><artifactId>springdoc-openapi-ui...
多线程(进阶)(内涵面试题)
目录 一、常见的锁策略 1. 悲观锁 vs 乐观锁 2. 重量级锁 vs 轻量级锁 3. 挂起等待锁 vs 自旋锁 4. 普通互斥锁 vs 读写锁 5. 可重入锁 vs 不可重入锁 6. 公平锁 vs 非公平锁 7. 面试题 二、synchronized的原理 1. 基本特点 2. 加锁工作过程 1)偏向锁&am…...
