当前位置: 首页 > news >正文

使用 Spring Boot 和 Canal 实现 MySQL 数据库同步

文章目录

  • 前言
  • 一、背景
  • 二、Canal 简介
  • 三、主库数据库配置
      • 1.主库配置
      • 2.创建 Canal 用户并授予权限
  • 四.配置 Canal Server
      • 1.Canal Server 配置文件
      • 2.启动 Canal Server
  • 五.开发 Spring Boot 客户端
      • 1. 引入依赖
      • 2. 配置 Canal 客户端
      • 3. 实现数据同步逻辑
  • 六.启动并测试
  • 七.注意事项
  • 八.总结


前言

在分布式系统中,数据同步是一个常见的需求。例如,我们可能需要将主库的数据实时同步到多个从库,或者将数据从一个数据库集群同步到另一个集群。本篇内容通过一个实际案例,介绍如何使用 Spring Boot 和 Canal 实现 MySQL 数据库之间的数据同步。


一、背景

假设我们有以下数据库架构:

  • 两个主库:db_1 和 db_2。
    每个主库对应两个从库:db_1_bk_1、db_1_bk_2 和 db_2_bk_1、db_2_bk_2。
  • 我们的目标是:
    将 db_1 的数据同步到 db_1_bk_1 和 db_1_bk_2。
    将 db_2 的数据同步到 db_2_bk_1 和 db_2_bk_2。

二、Canal 简介

Canal 是阿里巴巴开源的一款基于 MySQL Binlog 的增量数据订阅与分发工具。它通过模拟 MySQL 的从节点,实时捕获主库的 Binlog 日志,并将数据变更事件推送给下游消费者。Canal 支持多种下游适配器,如 Kafka、RabbitMQ 和直接消费。

三、主库数据库配置

1.主库配置

为了使 Canal 能够正常解析 Binlog 日志,主库需要进行以下配置:

  • 开启 Binlog 日志:确保主库开启了 Binlog 日志,并且设置为 ROW 模式。
  • 配置 server-id:为每个主库设置唯一的 server-id。
  • 创建 Canal 用户并授予权限:创建一个用户供 Canal 使用,并授予必要的权限。

编辑主库的配置文件(my.cnf 或 my.ini),添加以下内容:

[mysqld]
# 开启 Binlog 日志
log-bin=mysql-bin
# 设置 Binlog 格式为 ROW 模式
binlog-format=ROW
# 设置唯一的 server-id
server-id=1

注意:

  • 如果你有多个主库,每个主库的 server-id 必须是唯一的。
  • 修改配置后,需要重启 MySQL 服务以使配置生效。

2.创建 Canal 用户并授予权限

Canal 需要一个具有读取 Binlog 权限的 MySQL 用户。以下是创建用户并授予权限的步骤:

# 登录 MySQL
mysql -u root -p
# 创建用户
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
# 授予权限
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
# 刷新权限
FLUSH PRIVILEGES;

说明:

  • canal 用户需要足够的权限来读取 Binlog 数据,但不需要对数据库进行写操作。
  • 如果你的 MySQL 版本较新(8.x),可能需要使用 ALTER USER 命令来设置密码:
ALTER USER 'canal'@'%' IDENTIFIED BY 'canal';

四.配置 Canal Server

Canal Server 是 Canal 的核心组件,负责连接主库并解析 Binlog 数据。我们需要为每个主库配置一个 Canal 实例。

1.Canal Server 配置文件

在 Canal Server 的配置目录下,创建两个实例配置文件:conf/db_1/instance.properties 和 conf/db_2/instance.properties。
conf/db_1/instance.properties:

# 主库的地址和端口
canal.instance.master.address=db_1_ip:3306
# Canal 连接主库的用户名和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 需要同步的表正则表达式,这里表示同步 db_1 数据库的所有表
canal.instance.filter.regex=db_1\\..*

conf/db_2/instance.properties:

# 主库的地址和端口
canal.instance.master.address=db_2_ip:3306
# Canal 连接主库的用户名和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 需要同步的表正则表达式,这里表示同步 db_2 数据库的所有表
canal.instance.filter.regex=db_2\\..*

2.启动 Canal Server

使用以下命令启动 Canal Server:

nohup sh bin/canal.sh start &

注意:

  • 确保主库的 Binlog 位置和文件名正确。如果不确定,可以通过 SHOW MASTER STATUS; 命令查看。
  • 如果主库已经运行了一段时间,需要指定 Binlog 的起始位置,避免重复同步旧数据。

五.开发 Spring Boot 客户端

Spring Boot 客户端作为 Canal 的消息消费者,负责接收数据变更事件并同步到目标从库。

1. 引入依赖

在 Spring Boot 项目的 pom.xml文件中,引入 Canal 客户端依赖:

<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.8</version>
</dependency>

2. 配置 Canal 客户端

application.yml 文件中,配置 Canal Server 的地址:

canal:server.ip: canal_server_ipserver.port: 11111

3. 实现数据同步逻辑

创建一个 Canal 客户端服务类,用于接收和处理数据变更事件。
CanalClientService.java:

@Service
public class CanalClientService {private final CanalConnector canalConnector;public CanalClientService(@Value("${canal.server.ip}") String canalServerIp, @Value("${canal.server.port}") int canalServerPort) {this.canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerIp, canalServerPort), "example", "", "");}@PostConstructpublic void start() {canalConnector.connect();canalConnector.subscribe("db_1..*, db_2..*"); // 订阅 db_1 和 db_2 的所有表new Thread(this::process).start();}private void process() {while (true) {Message message = canalConnector.getWithoutAck(100);long batchId = message.getId();if (batchId == -1 || message.getEntries().isEmpty()) {continue;}for (Entry entry : message.getEntries()) {handleData(entry);}canalConnector.ack(batchId);}}private void handleData(Entry entry) {String schemaName = entry.getHeader().getSchemaName(); // 数据库名String tableName = entry.getHeader().getTableName();  // 表名EventType eventType = entry.getHeader().getEventType(); // 数据变更类型System.out.println("Schema: " + schemaName + ", Table: " + tableName + ", Type: " + eventType);// 根据来源数据库同步到对应的从库if ("db_1".equals(schemaName)) {syncToBackupDbs(entry, "db_1_bk_1", "db_1_bk_2");} else if ("db_2".equals(schemaName)) {syncToBackupDbs(entry, "db_2_bk_1", "db_2_bk_2");}}private void syncToBackupDbs(Entry entry, String... backupDbs) {// 根据事件类型同步到从库if (entry.getHeader().getEventType() == EventType.INSERT) {for (String db : backupDbs) {syncInsert(entry, db);}} else if (entry.getHeader().getEventType() == EventType.UPDATE) {for (String db : backupDbs) {syncUpdate(entry, db);}} else if (entry.getHeader().getEventType() == EventType.DELETE) {for (String db : backupDbs) {syncDelete(entry, db);}}}private void syncInsert(Entry entry, String backupDb) {// 使用 MyBatis 将数据插入到对应的从库System.out.println("INSERT into " + backupDb);}private void syncUpdate(Entry entry, String backupDb) {// 使用 MyBatis 将数据更新到对应的从库System.out.println("UPDATE into " + backupDb);}private void syncDelete(Entry entry, String backupDb) {// 使用 MyBatis 将数据从对应的从库删除System.out.println("DELETE from " + backupDb);}
}

六.启动并测试

  • 启动 Canal Server。
  • 启动 Spring Boot 应用。
  • 在主库 db_1 或 db_2 中插入、更新或删除数据。
  • 观察从库 db_1_bk_1、db_1_bk_2、db_2_bk_1 和 db_2_bk_2 是否同步成功。

七.注意事项

  • 数据一致性:确保从库的数据与主库保持一致。可以通过事务或锁机制来避免冲突。
  • 性能优化:如果数据量较大,建议结合中间件(如 Kafka)进行缓冲和负载均衡。
  • 错误处理:在同步过程中,需要处理网络异常、数据库连接异常等情况。
  • Canal Server 高可用:在生产环境中,建议部署 Canal Server 的集群,以提高系统的可用性。

八.总结

通过 Spring Boot 和 Canal,我们可以实现 MySQL 数据库之间的高效数据同步。Canal 提供了强大的 Binlog 解析能力,而 Spring Boot 则提供了灵活的开发框架,两者结合可以轻松应对复杂的分布式数据同步需求。希望本文对你有所帮助,如果有任何问题,欢迎在评论区留言。

相关文章:

使用 Spring Boot 和 Canal 实现 MySQL 数据库同步

文章目录 前言一、背景二、Canal 简介三、主库数据库配置1.主库配置2.创建 Canal 用户并授予权限 四.配置 Canal Server1.Canal Server 配置文件2.启动 Canal Server 五.开发 Spring Boot 客户端1. 引入依赖2. 配置 Canal 客户端3. 实现数据同步逻辑 六.启动并测试七.注意事项八…...

vue3 在element-plus表格使用render-header

在vue2中 element表格render-header 源码是有返回h()函数的 在vue3 element-plus 表格源码 render-header函数没有返回h函数了 所以需要用render-header方法中创建虚拟DOM节点的话需要引用h方法 <el-table-column header-align"right" align"right" …...

算法——结合实例了解Minimax算法(极小化极大算法)

计算机科学中最有趣的事情之一就是编写一个人机博弈的程序。有大量的例子&#xff0c;最出名的是编写一个国际象棋的博弈机器。但不管是什么游戏&#xff0c;程序趋向于遵循一个被称为Minimax算法&#xff0c;伴随着各种各样的子算法在一块。本篇将简要介绍 minimax 算法&#…...

使用 DeepSeek 生成商城流程图

步骤 1.下载 mermaid 2.使用 DeepSeek 生成 mermaid 格式 3.复制内容到 4.保存备用。 结束。...

什么是GraphQL?

如果你在寻找漏洞利用方式,请参考下面的文章 GraphQL API 漏洞 |网络安全学院 GitHub - swisskyrepo/PayloadsAllTheThings: A list of useful payloads and bypass for Web Application Security and Pentest/CTF GraphQL 查询&#xff08;Query&#xff09; GraphQL 既不是…...

Spring Boot 的约定优于配置,你的理解是什么?

Spring Boot 的“约定优于配置”&#xff1a;开发效率的革命性提升 在软件开发中&#xff0c;开发者常常需要花费大量时间编写繁琐的配置文件&#xff0c;尤其是在传统的 Java EE 或 Spring 框架中。而 Spring Boot 通过“约定优于配置”&#xff08;Convention Over Configur…...

C#开源大型商城系统之B2B2C+O2O一体化_OctShop

一、应用背景与引言 在当今数字化商业的浪潮中&#xff0c;电子商务平台的构建成为众多企业拓展业务、提升竞争力的关键举措。C# 语言以其强大的功能、高效的性能以及良好的开发框架支持&#xff0c;在商城系统开发领域占据着重要地位。独立开源的大型 C# 商城系统&#xff0c…...

gitte远程仓库修改后,本地没有更新,本地与远程仓库不一致

问题 &#xff1a;gitte远程仓库修改后&#xff0c;本地没有更新&#xff0c;本地与远程仓库不一致 现象&#xff1a; [cxqiZwz9fjj2ssnshikw14avaZ rpc]$ git push Username for https://gitee.com: beihangya Password for https://beihangyagitee.com: To https://gitee.c…...

【对比】Pandas 和 Polars 的区别

Pandas vs Polars 对比表 特性PandasPolars开发语言Python&#xff08;Cython 实现核心部分&#xff09;Rust&#xff08;高性能系统编程语言&#xff09;性能较慢&#xff0c;尤其在大数据集上&#xff08;内存占用高&#xff0c;计算效率低&#xff09;极快&#xff0c;利用…...

el-input无法输入0.0001的小数,自动转换为0在vue3中的bug

今天遇到个bug&#xff0c;el-input中只能输入0.1或者输入0.1再加上00成为0.001&#xff0c;不能直接输入0.001&#xff0c;否则自动转换为0。需要去掉 v-model.number后面的 .number 源代码&#xff1a; <el-table-column label"实发数量" width"120"…...

Ubuntu 下 systemd 介绍

系列文章目录 Linux内核学习 Linux 知识&#xff08;1&#xff09; Linux 知识&#xff08;2&#xff09; WSL Ubuntu QEMU 虚拟机 Linux 调试视频 PCIe 与 USB 的补充知识 vscode 使用说明 树莓派 4B 指南 设备驱动畅想 Linux内核子系统 Linux 文件系统挂载 QEMU 通过网络实现…...

BERT文本分类(PyTorch和Transformers)畅用七个模型架构

&#xff08;PyTorch&#xff09;BERT文本分类&#xff1a;七种模型架构 &#x1f31f; 1. 介绍 使用BERT完成文本分类任务&#xff08;如情感分析&#xff0c;新闻文本分类等等&#xff09;对于NLPer已经是很基础的工作了&#xff01;虽说已迈入LLM时代&#xff0c;但是BERT…...

两步在 Vite 中配置 Tailwindcss

第一步&#xff1a;安装依赖 npm i -D tailwindcss tailwindcss/vite第二步&#xff1a;引入 tailwindcss 更改配置 // src/main.js import tailwindcss/index// vite.config.js import vue from vitejs/plugin-vue import tailwindcss from tailwindcss/viteexport default …...

【vmware虚拟机安装教程】

以下是在VMware Workstation Pro上安装虚拟机的详细教程&#xff1a; 准备工作 下载VMware Workstation Pro 访问VMware官网下载并安装VMware Workstation Pro&#xff08;支持Windows和Linux系统&#xff09;。安装完成后&#xff0c;确保已激活软件&#xff08;试用版或正式…...

文字转语音(三)FreeTTS实现

项目中有相关的功能&#xff0c;就简单研究了一下。 说明 FreeTTS 是一个基于 Java 的开源文本转语音&#xff08;TTS&#xff09;引擎&#xff0c;旨在将文字内容转换为自然语音输出。 FreeTTS 适合对 英文语音质量要求低、预算有限且需要离线运行 的场景&#xff0c;但若需…...

string类详解(上)

文章目录 目录1. STL简介1.1 什么是STL1.2 STL的版本1.3 STL的六大组件 2. 为什么学习string类3. 标准库中的string类3.1 string类3.2 string类的常用接口说明 目录 STL简介为什么学习string类标准库中的string类string类的模拟实现现代版写法的String类写时拷贝 1. STL简介 …...

Visual Studio Code使用ai大模型编成

1、在Visual Studio Code搜索安装roo code 2、去https://openrouter.ai/settings/keys官网申请个免费的配置使用...

外贸跨境订货系统流程设计、功能列表及源码输出

在全球化的商业环境下&#xff0c;外贸跨境订货系统对于企业拓展国际市场、提升运营效率至关重要。该系统旨在为外贸企业提供一个便捷、高效、安全的订货平台&#xff0c;实现商品展示、订单管理、物流跟踪等功能&#xff0c;满足跨境业务的多样化需求。以下将详细阐述外贸订货…...

TraeAi上手体验

一、Trae介绍 由于MarsCode 在国内由于规定限制&#xff0c;无法使用 Claude 3.5 Sonnet 模型&#xff0c;字节跳动选择在海外推出 Trae&#xff0c;官网&#xff1a;https://www.trae.ai/。 二、安装 1.下载安装Trae-Setup-x64.exe 2.注册登录 安装完成后&#xff0c;点击登…...

深入解析 vLLM:高性能 LLM 服务框架的架构之美(一)原理与解析

修改内容时间2.4.1处理请求的流程&#xff0c;引用更好的流程图2025.02.11首发2025.02.08 深入解析 vLLM&#xff1a;高性能 LLM 服务框架的架构之美&#xff08;一&#xff09;原理与解析 深入解析 vLLM&#xff1a;高性能 LLM 服务框架的架构之美&#xff08;二&#xff09;…...

智慧医疗能源事业线深度画像分析(上)

引言 医疗行业作为现代社会的关键基础设施,其能源消耗与环境影响正日益受到关注。随着全球"双碳"目标的推进和可持续发展理念的深入,智慧医疗能源事业线应运而生,致力于通过创新技术与管理方案,重构医疗领域的能源使用模式。这一事业线融合了能源管理、可持续发…...

如何在看板中体现优先级变化

在看板中有效体现优先级变化的关键措施包括&#xff1a;采用颜色或标签标识优先级、设置任务排序规则、使用独立的优先级列或泳道、结合自动化规则同步优先级变化、建立定期的优先级审查流程。其中&#xff0c;设置任务排序规则尤其重要&#xff0c;因为它让看板视觉上直观地体…...

IGP(Interior Gateway Protocol,内部网关协议)

IGP&#xff08;Interior Gateway Protocol&#xff0c;内部网关协议&#xff09; 是一种用于在一个自治系统&#xff08;AS&#xff09;内部传递路由信息的路由协议&#xff0c;主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...

在四层代理中还原真实客户端ngx_stream_realip_module

一、模块原理与价值 PROXY Protocol 回溯 第三方负载均衡&#xff08;如 HAProxy、AWS NLB、阿里 SLB&#xff09;发起上游连接时&#xff0c;将真实客户端 IP/Port 写入 PROXY Protocol v1/v2 头。Stream 层接收到头部后&#xff0c;ngx_stream_realip_module 从中提取原始信息…...

【SQL学习笔记1】增删改查+多表连接全解析(内附SQL免费在线练习工具)

可以使用Sqliteviz这个网站免费编写sql语句&#xff0c;它能够让用户直接在浏览器内练习SQL的语法&#xff0c;不需要安装任何软件。 链接如下&#xff1a; sqliteviz 注意&#xff1a; 在转写SQL语法时&#xff0c;关键字之间有一个特定的顺序&#xff0c;这个顺序会影响到…...

多种风格导航菜单 HTML 实现(附源码)

下面我将为您展示 6 种不同风格的导航菜单实现&#xff0c;每种都包含完整 HTML、CSS 和 JavaScript 代码。 1. 简约水平导航栏 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport&qu…...

人工智能(大型语言模型 LLMs)对不同学科的影响以及由此产生的新学习方式

今天是关于AI如何在教学中增强学生的学习体验&#xff0c;我把重要信息标红了。人文学科的价值被低估了 ⬇️ 转型与必要性 人工智能正在深刻地改变教育&#xff0c;这并非炒作&#xff0c;而是已经发生的巨大变革。教育机构和教育者不能忽视它&#xff0c;试图简单地禁止学生使…...

[论文阅读]TrustRAG: Enhancing Robustness and Trustworthiness in RAG

TrustRAG: Enhancing Robustness and Trustworthiness in RAG [2501.00879] TrustRAG: Enhancing Robustness and Trustworthiness in Retrieval-Augmented Generation 代码&#xff1a;HuichiZhou/TrustRAG: Code for "TrustRAG: Enhancing Robustness and Trustworthin…...

阿里云Ubuntu 22.04 64位搭建Flask流程(亲测)

cd /home 进入home盘 安装虚拟环境&#xff1a; 1、安装virtualenv pip install virtualenv 2.创建新的虚拟环境&#xff1a; virtualenv myenv 3、激活虚拟环境&#xff08;激活环境可以在当前环境下安装包&#xff09; source myenv/bin/activate 此时&#xff0c;终端…...

文件上传漏洞防御全攻略

要全面防范文件上传漏洞&#xff0c;需构建多层防御体系&#xff0c;结合技术验证、存储隔离与权限控制&#xff1a; &#x1f512; 一、基础防护层 前端校验&#xff08;仅辅助&#xff09; 通过JavaScript限制文件后缀名&#xff08;白名单&#xff09;和大小&#xff0c;提…...