当前位置: 首页 > article >正文

使用 Spring Boot 和 Canal 实现 MySQL 数据库同步

文章目录

  • 前言
  • 一、背景
  • 二、Canal 简介
  • 三、主库数据库配置
      • 1.主库配置
      • 2.创建 Canal 用户并授予权限
  • 四.配置 Canal Server
      • 1.Canal Server 配置文件
      • 2.启动 Canal Server
  • 五.开发 Spring Boot 客户端
      • 1. 引入依赖
      • 2. 配置 Canal 客户端
      • 3. 实现数据同步逻辑
  • 六.启动并测试
  • 七.注意事项
  • 八.总结


前言

在分布式系统中,数据同步是一个常见的需求。例如,我们可能需要将主库的数据实时同步到多个从库,或者将数据从一个数据库集群同步到另一个集群。本篇内容通过一个实际案例,介绍如何使用 Spring Boot 和 Canal 实现 MySQL 数据库之间的数据同步。


一、背景

假设我们有以下数据库架构:

  • 两个主库:db_1 和 db_2。
    每个主库对应两个从库:db_1_bk_1、db_1_bk_2 和 db_2_bk_1、db_2_bk_2。
  • 我们的目标是:
    将 db_1 的数据同步到 db_1_bk_1 和 db_1_bk_2。
    将 db_2 的数据同步到 db_2_bk_1 和 db_2_bk_2。

二、Canal 简介

Canal 是阿里巴巴开源的一款基于 MySQL Binlog 的增量数据订阅与分发工具。它通过模拟 MySQL 的从节点,实时捕获主库的 Binlog 日志,并将数据变更事件推送给下游消费者。Canal 支持多种下游适配器,如 Kafka、RabbitMQ 和直接消费。

三、主库数据库配置

1.主库配置

为了使 Canal 能够正常解析 Binlog 日志,主库需要进行以下配置:

  • 开启 Binlog 日志:确保主库开启了 Binlog 日志,并且设置为 ROW 模式。
  • 配置 server-id:为每个主库设置唯一的 server-id。
  • 创建 Canal 用户并授予权限:创建一个用户供 Canal 使用,并授予必要的权限。

编辑主库的配置文件(my.cnf 或 my.ini),添加以下内容:

[mysqld]
# 开启 Binlog 日志
log-bin=mysql-bin
# 设置 Binlog 格式为 ROW 模式
binlog-format=ROW
# 设置唯一的 server-id
server-id=1

注意:

  • 如果你有多个主库,每个主库的 server-id 必须是唯一的。
  • 修改配置后,需要重启 MySQL 服务以使配置生效。

2.创建 Canal 用户并授予权限

Canal 需要一个具有读取 Binlog 权限的 MySQL 用户。以下是创建用户并授予权限的步骤:

# 登录 MySQL
mysql -u root -p
# 创建用户
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
# 授予权限
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
# 刷新权限
FLUSH PRIVILEGES;

说明:

  • canal 用户需要足够的权限来读取 Binlog 数据,但不需要对数据库进行写操作。
  • 如果你的 MySQL 版本较新(8.x),可能需要使用 ALTER USER 命令来设置密码:
ALTER USER 'canal'@'%' IDENTIFIED BY 'canal';

四.配置 Canal Server

Canal Server 是 Canal 的核心组件,负责连接主库并解析 Binlog 数据。我们需要为每个主库配置一个 Canal 实例。

1.Canal Server 配置文件

在 Canal Server 的配置目录下,创建两个实例配置文件:conf/db_1/instance.properties 和 conf/db_2/instance.properties。
conf/db_1/instance.properties:

# 主库的地址和端口
canal.instance.master.address=db_1_ip:3306
# Canal 连接主库的用户名和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 需要同步的表正则表达式,这里表示同步 db_1 数据库的所有表
canal.instance.filter.regex=db_1\\..*

conf/db_2/instance.properties:

# 主库的地址和端口
canal.instance.master.address=db_2_ip:3306
# Canal 连接主库的用户名和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 需要同步的表正则表达式,这里表示同步 db_2 数据库的所有表
canal.instance.filter.regex=db_2\\..*

2.启动 Canal Server

使用以下命令启动 Canal Server:

nohup sh bin/canal.sh start &

注意:

  • 确保主库的 Binlog 位置和文件名正确。如果不确定,可以通过 SHOW MASTER STATUS; 命令查看。
  • 如果主库已经运行了一段时间,需要指定 Binlog 的起始位置,避免重复同步旧数据。

五.开发 Spring Boot 客户端

Spring Boot 客户端作为 Canal 的消息消费者,负责接收数据变更事件并同步到目标从库。

1. 引入依赖

在 Spring Boot 项目的 pom.xml文件中,引入 Canal 客户端依赖:

<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.8</version>
</dependency>

2. 配置 Canal 客户端

application.yml 文件中,配置 Canal Server 的地址:

canal:server.ip: canal_server_ipserver.port: 11111

3. 实现数据同步逻辑

创建一个 Canal 客户端服务类,用于接收和处理数据变更事件。
CanalClientService.java:

@Service
public class CanalClientService {private final CanalConnector canalConnector;public CanalClientService(@Value("${canal.server.ip}") String canalServerIp, @Value("${canal.server.port}") int canalServerPort) {this.canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerIp, canalServerPort), "example", "", "");}@PostConstructpublic void start() {canalConnector.connect();canalConnector.subscribe("db_1..*, db_2..*"); // 订阅 db_1 和 db_2 的所有表new Thread(this::process).start();}private void process() {while (true) {Message message = canalConnector.getWithoutAck(100);long batchId = message.getId();if (batchId == -1 || message.getEntries().isEmpty()) {continue;}for (Entry entry : message.getEntries()) {handleData(entry);}canalConnector.ack(batchId);}}private void handleData(Entry entry) {String schemaName = entry.getHeader().getSchemaName(); // 数据库名String tableName = entry.getHeader().getTableName();  // 表名EventType eventType = entry.getHeader().getEventType(); // 数据变更类型System.out.println("Schema: " + schemaName + ", Table: " + tableName + ", Type: " + eventType);// 根据来源数据库同步到对应的从库if ("db_1".equals(schemaName)) {syncToBackupDbs(entry, "db_1_bk_1", "db_1_bk_2");} else if ("db_2".equals(schemaName)) {syncToBackupDbs(entry, "db_2_bk_1", "db_2_bk_2");}}private void syncToBackupDbs(Entry entry, String... backupDbs) {// 根据事件类型同步到从库if (entry.getHeader().getEventType() == EventType.INSERT) {for (String db : backupDbs) {syncInsert(entry, db);}} else if (entry.getHeader().getEventType() == EventType.UPDATE) {for (String db : backupDbs) {syncUpdate(entry, db);}} else if (entry.getHeader().getEventType() == EventType.DELETE) {for (String db : backupDbs) {syncDelete(entry, db);}}}private void syncInsert(Entry entry, String backupDb) {// 使用 MyBatis 将数据插入到对应的从库System.out.println("INSERT into " + backupDb);}private void syncUpdate(Entry entry, String backupDb) {// 使用 MyBatis 将数据更新到对应的从库System.out.println("UPDATE into " + backupDb);}private void syncDelete(Entry entry, String backupDb) {// 使用 MyBatis 将数据从对应的从库删除System.out.println("DELETE from " + backupDb);}
}

六.启动并测试

  • 启动 Canal Server。
  • 启动 Spring Boot 应用。
  • 在主库 db_1 或 db_2 中插入、更新或删除数据。
  • 观察从库 db_1_bk_1、db_1_bk_2、db_2_bk_1 和 db_2_bk_2 是否同步成功。

七.注意事项

  • 数据一致性:确保从库的数据与主库保持一致。可以通过事务或锁机制来避免冲突。
  • 性能优化:如果数据量较大,建议结合中间件(如 Kafka)进行缓冲和负载均衡。
  • 错误处理:在同步过程中,需要处理网络异常、数据库连接异常等情况。
  • Canal Server 高可用:在生产环境中,建议部署 Canal Server 的集群,以提高系统的可用性。

八.总结

通过 Spring Boot 和 Canal,我们可以实现 MySQL 数据库之间的高效数据同步。Canal 提供了强大的 Binlog 解析能力,而 Spring Boot 则提供了灵活的开发框架,两者结合可以轻松应对复杂的分布式数据同步需求。希望本文对你有所帮助,如果有任何问题,欢迎在评论区留言。

相关文章:

使用 Spring Boot 和 Canal 实现 MySQL 数据库同步

文章目录 前言一、背景二、Canal 简介三、主库数据库配置1.主库配置2.创建 Canal 用户并授予权限 四.配置 Canal Server1.Canal Server 配置文件2.启动 Canal Server 五.开发 Spring Boot 客户端1. 引入依赖2. 配置 Canal 客户端3. 实现数据同步逻辑 六.启动并测试七.注意事项八…...

中上211硕对嵌入式AI感兴趣,如何有效规划学习路径?

今天给大家分享的是一位粉丝的提问&#xff0c;中上211硕对嵌入式AI感兴趣&#xff0c;如何有效规划学习路径&#xff1f; 接下来把粉丝的具体提问和我的回复分享给大家&#xff0c;希望也能给一些类似情况的小伙伴一些启发和帮助。 同学提问&#xff1a; 中上211&#xff0c;…...

OpenCV中的边缘检测

边缘检测是图像处理和计算机视觉中的关键技术之一&#xff0c;旨在识别图像中像素强度发生显著变化的区域&#xff0c;这些区域通常对应于物体的边界或轮廓。边缘检测在机器视觉中具有重要的需求背景&#xff0c;主要体现在以下几个方面&#xff1a; 图像分割&#xff1a;边缘…...

Python爬虫-猫眼电影的影院数据

前言 本文是该专栏的第46篇,后面会持续分享python爬虫干货知识,记得关注。 本文笔者以猫眼电影为例子,获取猫眼的影院相关数据。 废话不多说,具体实现思路和详细逻辑,笔者将在正文结合完整代码进行详细介绍。接下来,跟着笔者直接往下看正文详细内容。(附带完整代码) …...

家里WiFi信号穿墙后信号太差怎么处理?

一、首先在调制解调器&#xff08;俗称&#xff1a;猫&#xff09;测试网速&#xff0c;网速达不到联系运营商&#xff1b; 二、网线影响不大&#xff0c;5类网线跑500M完全没问题&#xff1b; 三、可以在卧室增加辅助路由器&#xff08;例如小米AX系列&#xff09;90~200元区…...

【前端学习笔记】Webpack

1.介绍 Webpack 是一个现代 JavaScript 应用程序的静态模块打包工具&#xff0c;它将 JavaScript、CSS、图片、字体等资源文件打包成一个或多个静态文件&#xff0c;以供浏览器使用。当 webpack 处理应用程序时&#xff0c;它会在内部从一个或多个入口点构建一个 依赖图(depend…...

数据结构(陈越,何钦铭)第三讲 树(上)

3.1 树与数的表示 3.1.1 顺序查找 int SequentialSearch(List Tbl,ElementType K){int i;Tbl->Element[0]K;for(iTbl->Length;Tbl->Element[i]!K;i--);return i; } typedef struct LNode *List; struct LNode{ElementType Element[MAXSIZE];int Length; };3.1.2 二分…...

【深度解析】图解Deepseek-V3模型架构-混合专家模型(MoE)

一、引言 最近非常火爆的DeepSeek-V3模型&#xff0c;是一个包含6710亿总参数的强大混合专家模型&#xff08;MoE&#xff09;&#xff0c;其中每个token激活370亿参数。该模型在DeepSeek-V2验证有效的核心架构基础上&#xff0c;采用多头潜在注意力&#xff08;MLA&#xff0…...

c#判断exe文件是不是7z或者rar的自解压文件

亲测可以实现检测7z的自解压&#xff0c;但是对于rar的自解压格式&#xff0c;最新版不支持&#xff0c;尝试修改回发现几乎检测成了exe文件&#xff0c;这显然是不正确的&#xff0c;其他版本未测试。 如下图所示&#xff0c;可以检测出自解压格式的7z文件&#xff0c;黑色显…...

富士SC2022,C325,C328打印机扫描到网络详细教程

前言&#xff1a; 在开始教程之前,我先声明目前该教程适用于FujiXerox apeos C325Z和FujiXerox DocuCentre SC2022打印机。这次教程以FujiXerox DocuCentre SC2022为例&#xff0c;该打印机IP地址为10.40.11.240。 前提条件 &#xff1a; 1. 安装打印机所需打印机和扫…...

涌现之谜:神经网络中的意识幻象与信息熵变

导言&#xff1a;黑箱中的幽灵剧场 当AlphaGo在棋盘第37手落下超越人类棋谱的"神之一着"时&#xff0c;观者感受到的震颤不亚于目睹意识的曙光。这种认知幻觉暴露了智能研究的基本困境&#xff1a;在权重矩阵的混沌涨落中&#xff0c;究竟诞生的是真正的认知主体&am…...

WEB安全--SQL注入--常见的注入手段

一、联表查询&#xff1a; 1.1原理&#xff1a; 当payload参数被后端查询语句接收到时&#xff0c;其中的非法语句通过union关联显示出其他的数据 1.2示例&#xff1a; #payload: -1 and union select 1,2,database()--#query: $sqlselect * from users where id-1 and union …...

wordpress get_footer();与wp_footer();的区别的关系

在WordPress中&#xff0c;get_footer() 和 wp_footer() 是两个不同的函数&#xff0c;它们在主题开发中扮演着不同的角色&#xff0c;但都与页面的“页脚”部分有关。以下是它们的区别和关系&#xff1a; 1. get_footer() get_footer() 是一个用于加载页脚模板的函数。它的主…...

人工智能3d点云之Pointnet++项目实战源码解读(点云分类与分割)

一.项目文件概述 二.数据读取模块配置 实际代码运行时是先定义与加载好模型&#xff0c;然后再去读取数据进来传入到模型网络中去训练。但现在反过来先读取数据开始。 进入ModelNetDataLoader类的_getitem方法, 做标准化的目的是处理异常大的数值 上面返回的cls是类别,相当于…...

IP 路由基础 | 路由条目生成 / 路由表内信息获取

注&#xff1a;本文为 “IP 路由” 相关文章合辑。 未整理去重。 IP 路由基础 秦同学学学已于 2022-04-09 18:44:20 修改 一. IP 路由产生背景 我们都知道 IP 地址可以标识网络中的一个节点&#xff0c;并且每个 IP 地址都有自己的网段&#xff0c;各个网段并不相同&#xf…...

Redis 启用自动内存碎片清理异常

Redis 启用自动内存碎片清理异常 127.0.0.1:6379> config set activedefrag yes (error) DISABLED Active defragmentation cannot be enabled: it requires a Redis server compiled with a modified Jemalloc like the one shipped by default with the Redis source dis…...

java后端开发day16--字符串(二)

&#xff08;以下内容全部来自上述课程&#xff09; 1.StringBuilder 因为StringBuilder是Java已经写好的类。 java在底层对他进行了一些特殊处理。 打印对象不是地址值而是属性值。 1.概述 StringBuilder可以看成是一个容器&#xff0c;创建之后里面的内容是可变的。 作用…...

LabVIEW危化品仓库的安全监测系统

本案例展示了基于LabVIEW平台设计的危化品仓库安全监测系统&#xff0c;结合ZigBee无线通信技术、485串口通讯技术和传感器技术&#xff0c;实现了对危化品仓库的实时无线监测。该系统不仅能提高安全性&#xff0c;还能大幅提升工作效率&#xff0c;确保危化品仓库的安全运营。…...

深度学习框架探秘|Keras 应用案例解析以及 Keras vs TensorFlow vs PyTorch

引言 上一篇文章《深度学习框架探秘&#xff5c;Keras&#xff1a;深度学习的魔法钥匙》 我们初步学习了 Keras&#xff0c;包括它是什么、具备哪些优势&#xff08;简洁易用的 API、强大的兼容性、广泛的应用领域&#xff09;&#xff0c;以及基本使用方法。本文&#xff0c;…...

VINS-mono代码笔记

feature_tracker_node.cpp: 一、通过roslaunch文件的参数服务器获得配置参数 二、获得相机的内参 三、订阅图像&#xff0c;img_callback&#xff1a; 1、第一帧图像只记录时间戳 2、与之前时间戳比较一下&#xff0c;判断是否要发布当前帧&#xff0c;避免高频率发送&#xff…...

Maven下载安装IDEA使用MavenJava在pom.xml配置教程

一、Maven 简介 Maven 是一个强大的项目管理和构建工具&#xff0c;主要用于 Java 项目的构建、依赖管理和文档生成等。它通过一个统一的 XML 文件&#xff08;pom.xml&#xff09;来管理项目的整个生命周期&#xff0c;包括编译、测试、打包、发布等环节。 二、Maven 下载与…...

NAT(网络地址转换)技术详解:网络安全渗透测试中的关键应用与防御策略

目录 NAT的作用 NAT类型 NAT工作流程示例 NAT 转换技术的原理 源地址转换&#xff08;SNAT&#xff0c;Source NAT&#xff09;&#xff1a; 目标地址转换&#xff08;DNAT&#xff0c;Destination NAT&#xff09;&#xff1a; 端口地址转换&#xff08;PAT&#xff0c…...

newgrp docker需要每次刷新问题

每次都需要运行 newgrp docker 的原因: 当用户被添加到 docker 组后&#xff0c;当前会话并不会立即更新组信息&#xff0c;因此需要通过 newgrp docker 切换到新的用户组以使权限生效 如果不想每次都手动运行 newgrp docker&#xff0c;可以在终端中配置一个自动刷新的脚本。…...

【FastAPI 使用FastAPI和uvicorn来同时运行HTTP和HTTPS的Python应用程序】

在本文中&#xff0c;我们将介绍如何使用 FastAPI和uvicorn来同时运行HTTP和HTTPS的 Python应用程序。 简介 FastAPI是一个高性能的Web框架&#xff0c;可以用于构建快速、可靠的API。它基于Python的类型提示和异步支持&#xff0c;使得开发者可以轻松地编写出安全且高效的代…...

容器化部署Kafka的最佳实践:基于KRaft模式的无ZooKeeper方案

一、docker 部署kafka单节点 1.1安装docker 可以参考这篇CentOS 7安装docker并配置镜像加速 1.3 运行kafka&#xff08;注意修改zookeeper&#xff0c;kafka地址&#xff09; docker run -d --name kafka -e KAFKA_ADVERTISED_LISTENERSPLAINTEXT://172.16.10.180:9092 -p …...

20250214 随笔 线程安全 线程不安全

1. 什么是线程安全 & 线程不安全&#xff1f; 线程安全&#xff08;Thread-Safe&#xff09;&#xff1a;在多线程环境下访问同一个对象时&#xff0c;不会产生数据竞争、不会出现数据不一致的问题。线程不安全&#xff08;Not Thread-Safe&#xff09;&#xff1a;在多线…...

rem、em、vw区别

在前端开发里&#xff0c;rem、em、vw都是用来设置元素大小的单位&#xff0c;下面就用大白话讲讲它们的区别。 参考标准不一样 rem&#xff1a;就像大家都用同一把“大尺子”来量东西&#xff0c;这把“大尺子”就是网页里根元素&#xff08;也就是 <html> 标签&#…...

【PHP】php+mysql 活动信息管理系统(源码+论文+数据库+数据库文件)【独一无二】

&#x1f449;博__主&#x1f448;&#xff1a;米码收割机 &#x1f449;技__能&#x1f448;&#xff1a;C/Python语言 &#x1f449;专__注&#x1f448;&#xff1a;专注主流机器人、人工智能等相关领域的开发、测试技术。 【PHP】php 活动信息管理系统&#xff08;源码论文…...

cURL请求与Javascript请求转换工具

cURL请求与Javascript请求在线转换工具(如 curlconverter) 首先,看看各个证据中关于curl的定义。提到cURL是“Client for URLs”的缩写,最初全大写是为了方便记忆,社区也将其解释为“Client URL Request Library”或递归的“Curl URL Request Library”。同时,还指出cURL…...

rv1103b编译opencv

opencv-3.4.16&#xff0c;png的neon会报错&#xff0c;如果想开可以参考 https://blog.csdn.net/m0_60827485/article/details/137561429 rm -rf build mkdir build cd build cmake -DCMAKE_BUILD_TYPERELEASE \ -DCMAKE_C_COMPILERxxx/arm-rockchip831-linux-uclibcgnueabih…...