使用 Spring Boot 和 Canal 实现 MySQL 数据库同步
文章目录
- 前言
- 一、背景
- 二、Canal 简介
- 三、主库数据库配置
- 1.主库配置
- 2.创建 Canal 用户并授予权限
- 四.配置 Canal Server
- 1.Canal Server 配置文件
- 2.启动 Canal Server
- 五.开发 Spring Boot 客户端
- 1. 引入依赖
- 2. 配置 Canal 客户端
- 3. 实现数据同步逻辑
- 六.启动并测试
- 七.注意事项
- 八.总结
前言
在分布式系统中,数据同步是一个常见的需求。例如,我们可能需要将主库的数据实时同步到多个从库,或者将数据从一个数据库集群同步到另一个集群。本篇内容通过一个实际案例,介绍如何使用 Spring Boot 和 Canal 实现 MySQL 数据库之间的数据同步。
一、背景
假设我们有以下数据库架构:
- 两个主库:db_1 和 db_2。
每个主库对应两个从库:db_1_bk_1、db_1_bk_2 和 db_2_bk_1、db_2_bk_2。 - 我们的目标是:
将 db_1 的数据同步到 db_1_bk_1 和 db_1_bk_2。
将 db_2 的数据同步到 db_2_bk_1 和 db_2_bk_2。
二、Canal 简介
Canal 是阿里巴巴开源的一款基于 MySQL Binlog 的增量数据订阅与分发工具。它通过模拟 MySQL 的从节点,实时捕获主库的 Binlog 日志,并将数据变更事件推送给下游消费者。Canal 支持多种下游适配器,如 Kafka、RabbitMQ 和直接消费。
三、主库数据库配置
1.主库配置
为了使 Canal 能够正常解析 Binlog 日志,主库需要进行以下配置:
- 开启 Binlog 日志:确保主库开启了 Binlog 日志,并且设置为 ROW 模式。
- 配置 server-id:为每个主库设置唯一的 server-id。
- 创建 Canal 用户并授予权限:创建一个用户供 Canal 使用,并授予必要的权限。
编辑主库的配置文件(my.cnf 或 my.ini),添加以下内容:
[mysqld]
# 开启 Binlog 日志
log-bin=mysql-bin
# 设置 Binlog 格式为 ROW 模式
binlog-format=ROW
# 设置唯一的 server-id
server-id=1
注意:
- 如果你有多个主库,每个主库的 server-id 必须是唯一的。
- 修改配置后,需要重启 MySQL 服务以使配置生效。
2.创建 Canal 用户并授予权限
Canal 需要一个具有读取 Binlog 权限的 MySQL 用户。以下是创建用户并授予权限的步骤:
# 登录 MySQL
mysql -u root -p
# 创建用户
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
# 授予权限
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
# 刷新权限
FLUSH PRIVILEGES;
说明:
- canal 用户需要足够的权限来读取 Binlog 数据,但不需要对数据库进行写操作。
- 如果你的 MySQL 版本较新(8.x),可能需要使用 ALTER USER 命令来设置密码:
ALTER USER 'canal'@'%' IDENTIFIED BY 'canal';
四.配置 Canal Server
Canal Server 是 Canal 的核心组件,负责连接主库并解析 Binlog 数据。我们需要为每个主库配置一个 Canal 实例。
1.Canal Server 配置文件
在 Canal Server 的配置目录下,创建两个实例配置文件:conf/db_1/instance.properties 和 conf/db_2/instance.properties。
conf/db_1/instance.properties:
# 主库的地址和端口
canal.instance.master.address=db_1_ip:3306
# Canal 连接主库的用户名和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 需要同步的表正则表达式,这里表示同步 db_1 数据库的所有表
canal.instance.filter.regex=db_1\\..*
conf/db_2/instance.properties:
# 主库的地址和端口
canal.instance.master.address=db_2_ip:3306
# Canal 连接主库的用户名和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 需要同步的表正则表达式,这里表示同步 db_2 数据库的所有表
canal.instance.filter.regex=db_2\\..*
2.启动 Canal Server
使用以下命令启动 Canal Server:
nohup sh bin/canal.sh start &
注意:
- 确保主库的 Binlog 位置和文件名正确。如果不确定,可以通过 SHOW MASTER STATUS; 命令查看。
- 如果主库已经运行了一段时间,需要指定 Binlog 的起始位置,避免重复同步旧数据。
五.开发 Spring Boot 客户端
Spring Boot 客户端作为 Canal 的消息消费者,负责接收数据变更事件并同步到目标从库。
1. 引入依赖
在 Spring Boot 项目的 pom.xml文件中,引入 Canal 客户端依赖:
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.8</version>
</dependency>
2. 配置 Canal 客户端
在 application.yml 文件中,配置 Canal Server 的地址:
canal:server.ip: canal_server_ipserver.port: 11111
3. 实现数据同步逻辑
创建一个 Canal 客户端服务类,用于接收和处理数据变更事件。
CanalClientService.java:
@Service
public class CanalClientService {private final CanalConnector canalConnector;public CanalClientService(@Value("${canal.server.ip}") String canalServerIp, @Value("${canal.server.port}") int canalServerPort) {this.canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerIp, canalServerPort), "example", "", "");}@PostConstructpublic void start() {canalConnector.connect();canalConnector.subscribe("db_1..*, db_2..*"); // 订阅 db_1 和 db_2 的所有表new Thread(this::process).start();}private void process() {while (true) {Message message = canalConnector.getWithoutAck(100);long batchId = message.getId();if (batchId == -1 || message.getEntries().isEmpty()) {continue;}for (Entry entry : message.getEntries()) {handleData(entry);}canalConnector.ack(batchId);}}private void handleData(Entry entry) {String schemaName = entry.getHeader().getSchemaName(); // 数据库名String tableName = entry.getHeader().getTableName(); // 表名EventType eventType = entry.getHeader().getEventType(); // 数据变更类型System.out.println("Schema: " + schemaName + ", Table: " + tableName + ", Type: " + eventType);// 根据来源数据库同步到对应的从库if ("db_1".equals(schemaName)) {syncToBackupDbs(entry, "db_1_bk_1", "db_1_bk_2");} else if ("db_2".equals(schemaName)) {syncToBackupDbs(entry, "db_2_bk_1", "db_2_bk_2");}}private void syncToBackupDbs(Entry entry, String... backupDbs) {// 根据事件类型同步到从库if (entry.getHeader().getEventType() == EventType.INSERT) {for (String db : backupDbs) {syncInsert(entry, db);}} else if (entry.getHeader().getEventType() == EventType.UPDATE) {for (String db : backupDbs) {syncUpdate(entry, db);}} else if (entry.getHeader().getEventType() == EventType.DELETE) {for (String db : backupDbs) {syncDelete(entry, db);}}}private void syncInsert(Entry entry, String backupDb) {// 使用 MyBatis 将数据插入到对应的从库System.out.println("INSERT into " + backupDb);}private void syncUpdate(Entry entry, String backupDb) {// 使用 MyBatis 将数据更新到对应的从库System.out.println("UPDATE into " + backupDb);}private void syncDelete(Entry entry, String backupDb) {// 使用 MyBatis 将数据从对应的从库删除System.out.println("DELETE from " + backupDb);}
}
六.启动并测试
- 启动 Canal Server。
- 启动 Spring Boot 应用。
- 在主库 db_1 或 db_2 中插入、更新或删除数据。
- 观察从库 db_1_bk_1、db_1_bk_2、db_2_bk_1 和 db_2_bk_2 是否同步成功。
七.注意事项
- 数据一致性:确保从库的数据与主库保持一致。可以通过事务或锁机制来避免冲突。
- 性能优化:如果数据量较大,建议结合中间件(如 Kafka)进行缓冲和负载均衡。
- 错误处理:在同步过程中,需要处理网络异常、数据库连接异常等情况。
- Canal Server 高可用:在生产环境中,建议部署 Canal Server 的集群,以提高系统的可用性。
八.总结
通过 Spring Boot 和 Canal,我们可以实现 MySQL 数据库之间的高效数据同步。Canal 提供了强大的 Binlog 解析能力,而 Spring Boot 则提供了灵活的开发框架,两者结合可以轻松应对复杂的分布式数据同步需求。希望本文对你有所帮助,如果有任何问题,欢迎在评论区留言。
相关文章:
使用 Spring Boot 和 Canal 实现 MySQL 数据库同步
文章目录 前言一、背景二、Canal 简介三、主库数据库配置1.主库配置2.创建 Canal 用户并授予权限 四.配置 Canal Server1.Canal Server 配置文件2.启动 Canal Server 五.开发 Spring Boot 客户端1. 引入依赖2. 配置 Canal 客户端3. 实现数据同步逻辑 六.启动并测试七.注意事项八…...
vue3 在element-plus表格使用render-header
在vue2中 element表格render-header 源码是有返回h()函数的 在vue3 element-plus 表格源码 render-header函数没有返回h函数了 所以需要用render-header方法中创建虚拟DOM节点的话需要引用h方法 <el-table-column header-align"right" align"right" …...
算法——结合实例了解Minimax算法(极小化极大算法)
计算机科学中最有趣的事情之一就是编写一个人机博弈的程序。有大量的例子,最出名的是编写一个国际象棋的博弈机器。但不管是什么游戏,程序趋向于遵循一个被称为Minimax算法,伴随着各种各样的子算法在一块。本篇将简要介绍 minimax 算法&#…...
使用 DeepSeek 生成商城流程图
步骤 1.下载 mermaid 2.使用 DeepSeek 生成 mermaid 格式 3.复制内容到 4.保存备用。 结束。...
什么是GraphQL?
如果你在寻找漏洞利用方式,请参考下面的文章 GraphQL API 漏洞 |网络安全学院 GitHub - swisskyrepo/PayloadsAllTheThings: A list of useful payloads and bypass for Web Application Security and Pentest/CTF GraphQL 查询(Query) GraphQL 既不是…...
Spring Boot 的约定优于配置,你的理解是什么?
Spring Boot 的“约定优于配置”:开发效率的革命性提升 在软件开发中,开发者常常需要花费大量时间编写繁琐的配置文件,尤其是在传统的 Java EE 或 Spring 框架中。而 Spring Boot 通过“约定优于配置”(Convention Over Configur…...
C#开源大型商城系统之B2B2C+O2O一体化_OctShop
一、应用背景与引言 在当今数字化商业的浪潮中,电子商务平台的构建成为众多企业拓展业务、提升竞争力的关键举措。C# 语言以其强大的功能、高效的性能以及良好的开发框架支持,在商城系统开发领域占据着重要地位。独立开源的大型 C# 商城系统,…...
gitte远程仓库修改后,本地没有更新,本地与远程仓库不一致
问题 :gitte远程仓库修改后,本地没有更新,本地与远程仓库不一致 现象: [cxqiZwz9fjj2ssnshikw14avaZ rpc]$ git push Username for https://gitee.com: beihangya Password for https://beihangyagitee.com: To https://gitee.c…...
【对比】Pandas 和 Polars 的区别
Pandas vs Polars 对比表 特性PandasPolars开发语言Python(Cython 实现核心部分)Rust(高性能系统编程语言)性能较慢,尤其在大数据集上(内存占用高,计算效率低)极快,利用…...
el-input无法输入0.0001的小数,自动转换为0在vue3中的bug
今天遇到个bug,el-input中只能输入0.1或者输入0.1再加上00成为0.001,不能直接输入0.001,否则自动转换为0。需要去掉 v-model.number后面的 .number 源代码: <el-table-column label"实发数量" width"120"…...
Ubuntu 下 systemd 介绍
系列文章目录 Linux内核学习 Linux 知识(1) Linux 知识(2) WSL Ubuntu QEMU 虚拟机 Linux 调试视频 PCIe 与 USB 的补充知识 vscode 使用说明 树莓派 4B 指南 设备驱动畅想 Linux内核子系统 Linux 文件系统挂载 QEMU 通过网络实现…...
BERT文本分类(PyTorch和Transformers)畅用七个模型架构
(PyTorch)BERT文本分类:七种模型架构 🌟 1. 介绍 使用BERT完成文本分类任务(如情感分析,新闻文本分类等等)对于NLPer已经是很基础的工作了!虽说已迈入LLM时代,但是BERT…...
两步在 Vite 中配置 Tailwindcss
第一步:安装依赖 npm i -D tailwindcss tailwindcss/vite第二步:引入 tailwindcss 更改配置 // src/main.js import tailwindcss/index// vite.config.js import vue from vitejs/plugin-vue import tailwindcss from tailwindcss/viteexport default …...
【vmware虚拟机安装教程】
以下是在VMware Workstation Pro上安装虚拟机的详细教程: 准备工作 下载VMware Workstation Pro 访问VMware官网下载并安装VMware Workstation Pro(支持Windows和Linux系统)。安装完成后,确保已激活软件(试用版或正式…...
文字转语音(三)FreeTTS实现
项目中有相关的功能,就简单研究了一下。 说明 FreeTTS 是一个基于 Java 的开源文本转语音(TTS)引擎,旨在将文字内容转换为自然语音输出。 FreeTTS 适合对 英文语音质量要求低、预算有限且需要离线运行 的场景,但若需…...
string类详解(上)
文章目录 目录1. STL简介1.1 什么是STL1.2 STL的版本1.3 STL的六大组件 2. 为什么学习string类3. 标准库中的string类3.1 string类3.2 string类的常用接口说明 目录 STL简介为什么学习string类标准库中的string类string类的模拟实现现代版写法的String类写时拷贝 1. STL简介 …...
Visual Studio Code使用ai大模型编成
1、在Visual Studio Code搜索安装roo code 2、去https://openrouter.ai/settings/keys官网申请个免费的配置使用...
外贸跨境订货系统流程设计、功能列表及源码输出
在全球化的商业环境下,外贸跨境订货系统对于企业拓展国际市场、提升运营效率至关重要。该系统旨在为外贸企业提供一个便捷、高效、安全的订货平台,实现商品展示、订单管理、物流跟踪等功能,满足跨境业务的多样化需求。以下将详细阐述外贸订货…...
TraeAi上手体验
一、Trae介绍 由于MarsCode 在国内由于规定限制,无法使用 Claude 3.5 Sonnet 模型,字节跳动选择在海外推出 Trae,官网:https://www.trae.ai/。 二、安装 1.下载安装Trae-Setup-x64.exe 2.注册登录 安装完成后,点击登…...
深入解析 vLLM:高性能 LLM 服务框架的架构之美(一)原理与解析
修改内容时间2.4.1处理请求的流程,引用更好的流程图2025.02.11首发2025.02.08 深入解析 vLLM:高性能 LLM 服务框架的架构之美(一)原理与解析 深入解析 vLLM:高性能 LLM 服务框架的架构之美(二)…...
stm32G473的flash模式是单bank还是双bank?
今天突然有人stm32G473的flash模式是单bank还是双bank?由于时间太久,我真忘记了。搜搜发现,还真有人和我一样。见下面的链接:https://shequ.stmicroelectronics.cn/forum.php?modviewthread&tid644563 根据STM32G4系列参考手…...
在鸿蒙HarmonyOS 5中实现抖音风格的点赞功能
下面我将详细介绍如何使用HarmonyOS SDK在HarmonyOS 5中实现类似抖音的点赞功能,包括动画效果、数据同步和交互优化。 1. 基础点赞功能实现 1.1 创建数据模型 // VideoModel.ets export class VideoModel {id: string "";title: string ""…...
iPhone密码忘记了办?iPhoneUnlocker,iPhone解锁工具Aiseesoft iPhone Unlocker 高级注册版分享
平时用 iPhone 的时候,难免会碰到解锁的麻烦事。比如密码忘了、人脸识别 / 指纹识别突然不灵,或者买了二手 iPhone 却被原来的 iCloud 账号锁住,这时候就需要靠谱的解锁工具来帮忙了。Aiseesoft iPhone Unlocker 就是专门解决这些问题的软件&…...
解锁数据库简洁之道:FastAPI与SQLModel实战指南
在构建现代Web应用程序时,与数据库的交互无疑是核心环节。虽然传统的数据库操作方式(如直接编写SQL语句与psycopg2交互)赋予了我们精细的控制权,但在面对日益复杂的业务逻辑和快速迭代的需求时,这种方式的开发效率和可…...
大学生职业发展与就业创业指导教学评价
这里是引用 作为软工2203/2204班的学生,我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要,而您认真负责的教学态度,让课程的每一部分都充满了实用价值。 尤其让我…...
如何在网页里填写 PDF 表格?
有时候,你可能希望用户能在你的网站上填写 PDF 表单。然而,这件事并不简单,因为 PDF 并不是一种原生的网页格式。虽然浏览器可以显示 PDF 文件,但原生并不支持编辑或填写它们。更糟的是,如果你想收集表单数据ÿ…...
iOS性能调优实战:借助克魔(KeyMob)与常用工具深度洞察App瓶颈
在日常iOS开发过程中,性能问题往往是最令人头疼的一类Bug。尤其是在App上线前的压测阶段或是处理用户反馈的高发期,开发者往往需要面对卡顿、崩溃、能耗异常、日志混乱等一系列问题。这些问题表面上看似偶发,但背后往往隐藏着系统资源调度不当…...
搭建DNS域名解析服务器(正向解析资源文件)
正向解析资源文件 1)准备工作 服务端及客户端都关闭安全软件 [rootlocalhost ~]# systemctl stop firewalld [rootlocalhost ~]# setenforce 0 2)服务端安装软件:bind 1.配置yum源 [rootlocalhost ~]# cat /etc/yum.repos.d/base.repo [Base…...
NPOI Excel用OLE对象的形式插入文件附件以及插入图片
static void Main(string[] args) {XlsWithObjData();Console.WriteLine("输出完成"); }static void XlsWithObjData() {// 创建工作簿和单元格,只有HSSFWorkbook,XSSFWorkbook不可以HSSFWorkbook workbook new HSSFWorkbook();HSSFSheet sheet (HSSFSheet)workboo…...
tomcat入门
1 tomcat 是什么 apache开发的web服务器可以为java web程序提供运行环境tomcat是一款高效,稳定,易于使用的web服务器tomcathttp服务器Servlet服务器 2 tomcat 目录介绍 -bin #存放tomcat的脚本 -conf #存放tomcat的配置文件 ---catalina.policy #to…...
