当前位置: 首页 > news >正文

2024.07纪念一 debezium : spring-boot结合debezium

使用前提:

一、mysql开启了logibin

在mysql的安装路径下的my.ini中 

【mysqlid】下

添加
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

参考gitee的项目,即拉即用。参考地址

zhanghl/spring-boot-debeziumicon-default.png?t=N7T8https://gitee.com/zhl001/spring-boot-debezium项目中一个三个文件,pom和两个类需要参考。

pom.xml

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.13.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>cn.felord</groupId><artifactId>spring-boot-debezium</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-boot-debezium</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version><debezium.version>1.5.2.Final</debezium.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-api</artifactId><version>${debezium.version}</version></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><version>${debezium.version}</version></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-mysql</artifactId><version>${debezium.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--springboot与mybatis的整合包--><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>1.3.0</version></dependency><!--mysql驱动包--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><!--springboot与JDBC整合包--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><!--sqlserver数据源--><dependency><groupId>com.microsoft.sqlserver</groupId><artifactId>sqljdbc4</artifactId><version>4.0</version></dependency></dependencies>

两个java类

DebeziumConfiguration.java
package cn.felord.debezium.debezium;import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.data.Envelope;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.List;
import java.util.Map;import static io.debezium.data.Envelope.FieldName.*;
import static java.util.stream.Collectors.toMap;/*** The type Debezium configuration.** @author n1* @since 2021 /6/1 17:01*/
@Configuration
public class DebeziumConfiguration {/*** Debezium 配置.** @return configuration*/@Beanio.debezium.config.Configuration debeziumConfig() {return io.debezium.config.Configuration.create()
//            连接器的Java类名称.with("connector.class", MySqlConnector.class.getName())
//            偏移量持久化,用来容错 默认值.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
//                偏移量持久化文件路径 默认/tmp/offsets.dat  如果路径配置不正确可能导致无法存储偏移量 可能会导致重复消费变更
//                如果连接器重新启动,它将使用最后记录的偏移量来知道它应该恢复读取源信息中的哪个位置。.with("offset.storage.file.filename", "/tmp/offsets.dat")
//                捕获偏移量的周期.with("offset.flush.interval.ms", "1")
//               连接器的唯一名称.with("name", "mysql-connector")
//                数据库的hostname.with("database.hostname", "10.1.1.1")
//                端口.with("database.port", "3306")
//                用户名.with("database.user", "canal")
//                密码.with("database.password", "canal")
//                 包含的数据库列表,你的数据库.with("database.include.list", "md_test")
//                是否包含数据库表结构层面的变更,建议使用默认值true.with("include.schema.changes", "false")
//                mysql.cnf 配置的 server-id.with("database.server.id", 1)
//                	MySQL 服务器或集群的逻辑名称.with("database.server.name", "customer-mysql-db-server")
//                历史变更记录.with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
//                历史变更记录存储位置,存储DDL.with("database.history.file.filename", "/tmp/dbhistory.dat").build();}/*** Debezium server bootstrap debezium server bootstrap.** @param configuration the configuration* @return the debezium server bootstrap*/@BeanDebeziumServerBootstrap debeziumServerBootstrap(io.debezium.config.Configuration configuration) {DebeziumServerBootstrap debeziumServerBootstrap = new DebeziumServerBootstrap();DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class)).using(configuration.asProperties()).notifying(this::handlePayload).build();debeziumServerBootstrap.setDebeziumEngine(debeziumEngine);return debeziumServerBootstrap;}private void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter) {recordChangeEvents.forEach(r -> {SourceRecord sourceRecord = r.record();String topic = sourceRecord.topic();Struct sourceRecordChangeValue = (Struct) sourceRecord.value();if (sourceRecordChangeValue != null) {// 判断操作的类型 过滤掉读 只处理增删改   这个其实可以在配置中设置Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));if (operation != Envelope.Operation.READ) {String record = operation == Envelope.Operation.DELETE ? BEFORE : AFTER;// 获取增删改对应的结构体数据Struct struct = (Struct) sourceRecordChangeValue.get(record);// 将变更的行封装为MapMap<String, Object> payload = struct.schema().fields().stream().map(Field::name).filter(fieldName -> struct.get(fieldName) != null).map(fieldName -> Pair.of(fieldName, struct.get(fieldName))).collect(toMap(Pair::getKey, Pair::getValue));// 这里简单打印一下System.out.println("operation = " + operation);System.out.println("data = " + payload);if(operation.toString().equals("CREATE")){System.out.println("新增记录一条");}//tabelNameif(topic.split("\\.").length > 2){String tableName = topic.split("\\.")[2];System.out.println("tabelName" + tableName);}}}});}}
DebeziumServerBootstrap.java
package cn.felord.debezium.debezium;import io.debezium.engine.DebeziumEngine;
import lombok.Data;
import lombok.SneakyThrows;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.util.Assert;import java.util.concurrent.Executor;
import java.util.concurrent.Executors;/*** @author n1* @since 2021/6/2 10:45*/
@Data
public class DebeziumServerBootstrap implements InitializingBean, SmartLifecycle {private final Executor executor = Executors.newSingleThreadExecutor();private DebeziumEngine<?> debeziumEngine;@Overridepublic void start() {executor.execute(debeziumEngine);}@SneakyThrows@Overridepublic void stop() {debeziumEngine.close();}@Overridepublic boolean isRunning() {return false;}@Overridepublic void afterPropertiesSet() throws Exception {Assert.notNull(debeziumEngine, "debeziumEngine must not be null");}
}

 结语:

太强了,比canal强10倍,非侵入性。对比可参考:

不想引入MQ?不妨试试 Debezium-CSDN博客

为什么是debezium

这么多技术框架,为什么选debezium?

看起来很多。但一一排除下来就debezium和canal。

sqoop,kettle,datax之类的工具,属于前大数据时代的产物,地位类似于web领域的structs2。而且,它们基于查询而非binlog日志,其实不属于CDC。首先排除。

flink cdc是大数据领域的框架,一般web项目的数据量属于大材小用了。

同时databus,maxwell相对比较冷门,用得比较少。

「最后不用canal的原因有以下几点:」

  • canal需要安装,这违背了“如非必要,勿增实体”的原则。

  • canal只能对MYSQL进行CDC监控。有很大的局限性。

  • 大数据领域非常流行的flink cdc(阿里团队主导)底层使用的也是debezium,而非同是阿里出品的canal。

  • debezium可借助kafka组件,将变动的数据发到kafka topic,后续的读取操作只需读取kafka,可有效减少数据库的读取压力。可保证一次语义,至少一次语义。

  • 同时,也可基于内嵌部署模式,无需我们手动部署kafka集群,可满足”如非必要,勿增实体“的原则。

  • 而且canal只支持源端MySQL版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

实时监视同步数据库变更,这个框架真是神器_Mysql

相关文章:

2024.07纪念一 debezium : spring-boot结合debezium

使用前提&#xff1a; 一、mysql开启了logibin 在mysql的安装路径下的my.ini中 【mysqlid】下 添加 log-binmysql-bin # 开启 binlog binlog-formatROW # 选择 ROW 模式 server_id1 # 配置 MySQL replaction 需要定义&#xff0c;不要和 canal 的 slaveId 重复 参考gitee的项目…...

mysql怎么查询json里面的字段

mysql怎么查询json里面的字段&#xff1a; 要在 MySQL 数据库中查询 JSON 字段中的 city 值&#xff0c;你可以使用 MySQL 提供的 JSON 函数。假设表名是 your_table&#xff0c;包含一个名为 json_column 的 JSON 字段。 以下是一个查询示例&#xff0c;展示如何从 json_colu…...

C++ 右值 左值引用

一.什么是左值引用 右值引用 1.左值引用 左值是一个表示数据的表达式(如变量名或解引用的指针)&#xff0c;我们可以获取它的地址可以对它赋值。定义时const修饰符后的左值&#xff0c;不能给他赋值&#xff0c;但是可以取它的地址。左值引用就是给左值的引用&#xff0c;给左…...

「JavaEE」Spring IoC 1:Bean 的存储

&#x1f387;个人主页 &#x1f387;所属专栏&#xff1a;Spring &#x1f387;欢迎点赞收藏加关注哦&#xff01; IoC 简介 IoC 全称 Inversion of Control&#xff0c;即控制反转 控制反转是指控制权反转&#xff1a;获得依赖对象的过程被反转了 传统开发模式中&…...

springBoot快速搭建WebSocket

添加依赖 在pom.xml中加入WebSocket相关依赖&#xff1a; <dependencies><!-- websocket --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>…...

掌控授权的艺术:Laravel自定义策略模式深度解析

掌控授权的艺术&#xff1a;Laravel自定义策略模式深度解析 在现代Web应用开发中&#xff0c;权限管理是核心功能之一。Laravel框架通过其策略模式提供了一种优雅的方式来处理授权问题。然而&#xff0c;随着应用的复杂性增加&#xff0c;内置的策略可能不足以满足所有需求。这…...

Git操作指令(随时更新)

Git操作指令 一、安装git 1、设置配置信息&#xff1a; # global全局配置 git config --global user.name "Your username" git config --global user.email "Your email"# 显示颜色 git config --global color.ui true# 配置别名&#xff0c;各种指令都…...

SpringSecurity自定义登录方式

自定义登录&#xff1a; 定义Token定义Filter定义Provider配置类中定义登录的接口 自定义AuthenticationToken public class EmailAuthenticationToken extends UsernamePasswordAuthenticationToken{public EmailAuthenticationToken(Object principal, Object credentials) …...

黑神话悟空是什么游戏 黑神话悟空配置要求 黑神话悟空好玩吗值得买吗 黑神话悟空苹果电脑可以玩吗

《黑神话&#xff1a;悟空》的类型定义是一款单机动作角色扮演游戏&#xff0c;但实际体验后会发现&#xff0c;游戏在很多设计上采用了「魂like」作品的常见元素。根据个人上手试玩&#xff0c;《黑神话&#xff1a;悟空》的推进节奏比较接近魂类游戏&#xff0c;Boss战也更像…...

深入浅出消息队列----【延迟消息的实现原理】

深入浅出消息队列----【延迟消息的实现原理】 粗说 RocketMQ 的设计细说 RocketMQ 的设计这样实现是否有什么问题&#xff1f; 本文仅是文章笔记&#xff0c;整理了原文章中重要的知识点、记录了个人的看法 文章来源&#xff1a;编程导航-鱼皮【yes哥深入浅出消息队列专栏】 粗…...

npm提示 certificate has expired 证书已过期 已解决

在用npm新建项目时&#xff0c;突然发现报错提示 : certificate has expired 证书已过期 了解一下&#xff0c;在网络通信中&#xff0c;HTTPS 是一种通过 SSL/TLS 加密的安全 HTTP 通信协议。证书在 HTTPS 中扮演着至关重要的角色&#xff0c;用于验证服务器身份并加密数据传输…...

KEIL如何封装文件成lib

一、为什么要封装文件成LIB 提高编译效率 如果一份文件已经在整个工程中发挥出了我们期待的作用&#xff0c;现在想要将其封装成库&#xff0c;则可以在已经成型的工程文件中将不需要编译的文件从工程全部移出掉&#xff0c;只留下我们需要封装的文件&#xff0c;这样就可以提…...

【python】OpenCV—Faster Video File FPS

文章目录 1、需求描述2、正常方法 cv2.read3、加速方法 imutils.video.FileVideoStream4、涉及到的核心库函数4.1、imutils.video.FPS4.2、imutils.video.FileVideoStream 5、参考 1、需求描述 使用线程和队列数据结构将视频文件的 FPS 速率提高 &#xff01; 我们的目标是将…...

JavaScript变量的类型转换

类型转换分为两种:显示类型转换、隐式类型转换 1.显示类型转换 String()Number()Boolean()toString()parseInt(string)parseFloat(string)2.隐式类型转换 (1)isNaN () 判断指定的参数是否为 NaN(非数字类型),返回结果为 Boolean 类型。也就是说:任何不能被转换为数值的…...

如何申请免费SSL证书以消除访问网站显示连接不安全提醒

在当今互联网时代&#xff0c;网络安全已成为一个不可忽视的问题。当用户浏览一些网站时&#xff0c;有时会看到浏览器地址栏出现“不安全”的提示&#xff0c;这意味着该网站没有安装SSL证书&#xff0c;数据传输可能存在风险。那么&#xff0c;如何消除这种不安全提醒&#x…...

关于P2P(点对点)

P2P 是一种客户端与客户端之间&#xff0c;点对点连接的技术&#xff0c;在早前的客户端都是公网IP&#xff0c;没有NAT的情况下&#xff0c;P2P是较为容易实现的。 但现在的P2P&#xff0c;实现上面会略微有一些复杂&#xff1a;需要采取UDP打洞的技术&#xff0c;但UDP打出来…...

前端怎么本地起一个服务查看本地文件

1.安装拓展 安装 Live Server拓展 2.创建一个html文件 3.在html文件中右键选择 Open with Live Server 4.浏览器打开运行的地址&#xff0c;并去除路径&#xff0c;例如:http://127.0.0.1:5500/...

建造者模式(Builder Pattern)

建造者模式&#xff08;Builder Pattern&#xff09;是一种创建型设计模式&#xff0c;它主要用于将一个复杂对象的构建与它的表示分离&#xff0c;使得同样的构建过程可以创建不同的表示。这种设计模式的核心思想是将一个复杂对象的构建分解成多个相对简单的步骤&#xff0c;并…...

【MySQL】索引 【下】{聚簇索引VS非聚簇索引/创建主键索引/全文索引的创建/索引创建原则}

文章目录 1.聚簇索引 VS 非聚簇索引经典问题 2.索引操作创建主键索引唯一索引的创建普通索引的创建全文索引的创建查询索引删除索引索引创建原则 1.聚簇索引 VS 非聚簇索引 之前介绍的将所有的数据都放在叶子节点的这种存储引擎对应的就是 InnoDB 默认存储表数据的存储结构。 …...

论文快过(图像配准|Coarse_LoFTR_TRT)|适用于移动端的LoFTR算法的改进分析 1060显卡上45fps

项目地址&#xff1a;https://github.com/Kolkir/Coarse_LoFTR_TRT 创建时间&#xff1a;2022年 相关训练数据&#xff1a;BlendedMVS LoFTR [19]是一种有效的深度学习方法&#xff0c;可以在图像对上寻找合适的局部特征匹配。本文报道了该方法在低计算性能和有限内存条件下的…...

7.4.分块查找

一.分块查找的算法思想&#xff1a; 1.实例&#xff1a; 以上述图片的顺序表为例&#xff0c; 该顺序表的数据元素从整体来看是乱序的&#xff0c;但如果把这些数据元素分成一块一块的小区间&#xff0c; 第一个区间[0,1]索引上的数据元素都是小于等于10的&#xff0c; 第二…...

大话软工笔记—需求分析概述

需求分析&#xff0c;就是要对需求调研收集到的资料信息逐个地进行拆分、研究&#xff0c;从大量的不确定“需求”中确定出哪些需求最终要转换为确定的“功能需求”。 需求分析的作用非常重要&#xff0c;后续设计的依据主要来自于需求分析的成果&#xff0c;包括: 项目的目的…...

蓝桥杯 2024 15届国赛 A组 儿童节快乐

P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡&#xff0c;轻快的音乐在耳边持续回荡&#xff0c;小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下&#xff0c;六一来了。 今天是六一儿童节&#xff0c;小蓝老师为了让大家在节…...

Python实现prophet 理论及参数优化

文章目录 Prophet理论及模型参数介绍Python代码完整实现prophet 添加外部数据进行模型优化 之前初步学习prophet的时候&#xff0c;写过一篇简单实现&#xff0c;后期随着对该模型的深入研究&#xff0c;本次记录涉及到prophet 的公式以及参数调优&#xff0c;从公式可以更直观…...

相机从app启动流程

一、流程框架图 二、具体流程分析 1、得到cameralist和对应的静态信息 目录如下: 重点代码分析: 启动相机前,先要通过getCameraIdList获取camera的个数以及id,然后可以通过getCameraCharacteristics获取对应id camera的capabilities(静态信息)进行一些openCamera前的…...

JVM虚拟机:内存结构、垃圾回收、性能优化

1、JVM虚拟机的简介 Java 虚拟机(Java Virtual Machine 简称:JVM)是运行所有 Java 程序的抽象计算机,是 Java 语言的运行环境,实现了 Java 程序的跨平台特性。JVM 屏蔽了与具体操作系统平台相关的信息,使得 Java 程序只需生成在 JVM 上运行的目标代码(字节码),就可以…...

RabbitMQ入门4.1.0版本(基于java、SpringBoot操作)

RabbitMQ 一、RabbitMQ概述 RabbitMQ RabbitMQ最初由LShift和CohesiveFT于2007年开发&#xff0c;后来由Pivotal Software Inc.&#xff08;现为VMware子公司&#xff09;接管。RabbitMQ 是一个开源的消息代理和队列服务器&#xff0c;用 Erlang 语言编写。广泛应用于各种分布…...

08. C#入门系列【类的基本概念】:开启编程世界的奇妙冒险

C#入门系列【类的基本概念】&#xff1a;开启编程世界的奇妙冒险 嘿&#xff0c;各位编程小白探险家&#xff01;欢迎来到 C# 的奇幻大陆&#xff01;今天咱们要深入探索这片大陆上至关重要的 “建筑”—— 类&#xff01;别害怕&#xff0c;跟着我&#xff0c;保准让你轻松搞…...

站群服务器的应用场景都有哪些?

站群服务器主要是为了多个网站的托管和管理所设计的&#xff0c;可以通过集中管理和高效资源的分配&#xff0c;来支持多个独立的网站同时运行&#xff0c;让每一个网站都可以分配到独立的IP地址&#xff0c;避免出现IP关联的风险&#xff0c;用户还可以通过控制面板进行管理功…...

TSN交换机正在重构工业网络,PROFINET和EtherCAT会被取代吗?

在工业自动化持续演进的今天&#xff0c;通信网络的角色正变得愈发关键。 2025年6月6日&#xff0c;为期三天的华南国际工业博览会在深圳国际会展中心&#xff08;宝安&#xff09;圆满落幕。作为国内工业通信领域的技术型企业&#xff0c;光路科技&#xff08;Fiberroad&…...