springboot集成canal
目录
- 一、打开mysql的binlog
- 1.1 打开 MySQL 配置文件 `my.cnf`(通常位于 `/etc/mysql/my.cnf` 或 `/etc/my.cnf`)并添加或修改以下设置:
- 1.2 重启mysql服务
- 1.3 验证是否生效
- 二、 部署canal 服务端(docker)
- 2.1 下载启动脚本(可能需要梯子)
- 2.2 启动服务
- 2.3 验证服务启动成功
- 三、springboot端集成canal客户端
- 3.1 添加依赖 /配置
- 3.2 客户端代码
- 3.3 数据同步效果
项目上需要一个app,但是他们没有公网服务器,所以就在自家公网服务器开了一个mysql,项目上的服务器是能访问外网的,所以canal完美适配了这个需求
原理简介:canal服务端模拟mysql主从协议伪装成从数据库,从而读取主库的binlog,我们使用canal客户端自定义数据同步规则。
具体步骤
一、打开mysql的binlog
1.1 打开 MySQL 配置文件 my.cnf
(通常位于 /etc/mysql/my.cnf
或 /etc/my.cnf
)并添加或修改以下设置:
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=row
注意 :确保binlog-format是 row模式
1.2 重启mysql服务
具体命令根据你的服务器类型决定
1.3 验证是否生效
SHOW MASTER STATUS;
二、 部署canal 服务端(docker)
2.1 下载启动脚本(可能需要梯子)
# 下载脚本
wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh
2.2 启动服务
# 构建一个destination name为test的队列
sh run.sh -e canal.auto.scan=false \-e canal.destinations=test \-e canal.instance.master.address=127.0.0.1:3306 \-e canal.instance.dbUsername=canal \-e canal.instance.dbPassword=canal \-e canal.instance.connectionCharset=UTF-8 \-e canal.instance.tsdb.enable=true \-e canal.instance.gtidon=false \-e canal.instance.filter.regex=.*\\..*
参数解释:
-e canal.auto.scan=false:关闭自动扫描数据库实例。即 Canal 不会自动检测数据库的变更实例,而是使用手动指定的配置。
-e canal.destinations=test:设置 Canal 的目标队列名称为 test。destination 是 Canal 中用来标识不同数据源的名称。
-e canal.instance.master.address=127.0.0.1:3306:指定主数据库的地址和端口。这里是本地 MySQL 实例,监听在 3306 端口。
-e canal.instance.dbUsername=canal:设置连接到主数据库的用户名为 canal。这个用户名需要有足够的权限以读取 MySQL 的 binlog。
-e canal.instance.dbPassword=canal:设置连接到主数据库的密码为 canal。这个密码需要与 dbUsername 配对,以验证用户身份。
-e canal.instance.connectionCharset=UTF-8:设置数据库连接的字符集为 UTF-8。确保字符集正确可以避免中文字符等数据的乱码问题。
-e canal.instance.tsdb.enable=true:启用 Canal 的时间序列数据库(TSDB)。TSDB 用于存储时间戳和位置信息,这有助于在重启时恢复复制状态。
-e canal.instance.gtidon=false:关闭 GTID(全局事务标识符)。如果 GTID 处于关闭状态,Canal 将基于 binlog 文件和位置进行复制,而不是 GTID。
-e canal.instance.filter.regex=.*\\..*:设置 binlog 过滤规则。这条规则表示 Canal 将监听所有数据库和所有表的变更。正则表达式 .*\\..* 匹配所有数据库(.)和表(.*)。
2.3 验证服务启动成功
docker logs <containerids>
可以看到这样的打印:
三、springboot端集成canal客户端
3.1 添加依赖 /配置
<!-- canal begin-->
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version>
</dependency>
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.0</version>
</dependency>
<!-- canal end-->
canal:host: 127.0.0.1 #自己的canal服务器ipport: 11111 #canal默认端口destination: test #配置文件配置的名称username: rootpassword: 214365batch:size: 100
3.2 客户端代码
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.eco.db.entity.Record;
import com.eco.fishway.service.RecordService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;@Slf4j
@Component
public class CanalClient implements InitializingBean, DisposableBean {@Value("${canal.host}")private String canalHost;@Value("${canal.port}")private int canalPort;@Value("${canal.destination}")private String canalDestination;@Value("${canal.username}")private String canalUsername;@Value("${canal.password}")private String canalPassword;@Value("${canal.batch.size}")private int batchSize;private final RecordService recordService;private CanalConnector canalConnector;private ExecutorService executorService;public CanalClient(RecordService recordService) {this.recordService = recordService;}@Overridepublic void afterPropertiesSet() throws Exception {this.canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalHost, canalPort),canalDestination,canalUsername,canalPassword);this.executorService = Executors.newSingleThreadExecutor();this.executorService.execute(new Task());}@Overridepublic void destroy() throws Exception {if (executorService != null) {executorService.shutdown();}}private class Task implements Runnable {@Overridepublic void run() {while (true) {try {//连接canalConnector.connect();//订阅canalConnector.subscribe();while (true) {Message message = canalConnector.getWithoutAck(batchSize); // batchSize为每次获取的batchSize大小long batchId = message.getId();//获取批量的数量int size = message.getEntries().size();try {//如果没有数据if (batchId == -1 || size == 0) {// log.info("无数据");// 线程休眠2秒Thread.sleep(2000);} else {// 如果有数据,处理数据printEntry(message.getEntries());// 确认处理完成canalConnector.ack(batchId);}} catch (Exception e) {log.error(e.getMessage());// 程序错误,也直接确认,跳过这次偏移canalConnector.ack(batchId);}} catch (Exception e) {log.error("Error occurred when running Canal Client", e);} finally {canalConnector.disconnect();}}}}private void printEntry(List<CanalEntry.Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (isTransactionEntry(entry)){//开启/关闭事务的实体类型,跳过continue;}//RowChange对象,包含了一行数据变化的所有特征//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等CanalEntry.RowChange rowChange;try {rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);}//获取操作类型:insert/update/delete类型CanalEntry.EventType eventType = rowChange.getEventType();//打印Header信息log.info("================》; binlog[{} : {}] , name[{}, {}] , eventType : {}",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType);//判断是否是DDL语句if (rowChange.getIsDdl()) {log.info("================》;isDdl: true,sql:{}", rowChange.getSql());}log.info(rowChange.getSql());//获取RowChange对象里的每一行数据,打印出来for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {//如果是删除语句if (eventType == CanalEntry.EventType.DELETE) {log.info(">>>>>>>>>> 删除 >>>>>>>>>>");printColumnAndExecute(rowData.getBeforeColumnsList(), "DELETE");//如果是新增语句} else if (eventType == CanalEntry.EventType.INSERT) {log.info(">>>>>>>>>> 新增 >>>>>>>>>>");printColumnAndExecute(rowData.getAfterColumnsList(), "INSERT");//如果是更新的语句} else {log.info(">>>>>>>>>> 更新 >>>>>>>>>>");//变更前的数据log.info("------->; before");printColumnAndExecute(rowData.getBeforeColumnsList(), null);//变更后的数据log.info("------->; after");printColumnAndExecute(rowData.getAfterColumnsList(), "UPDATE");}}}}/*** 执行数据同步* @param columns* @param type*/private void printColumnAndExecute(List<CanalEntry.Column> columns, String type) {if(type == null){return;}JSONObject jsonObject = new JSONObject();for (CanalEntry.Column column : columns) {jsonObject.put(column.getName(), column.getValue());}// 此处使用json转对象的方式进行转换Record bean = jsonObject.toBean(Record.class);if(type.equals("INSERT")){// 执行新增recordService.save(bean);log.info("新增成功->{}", jsonObject.toJSONString(0));}else if (type.equals("UPDATE")){// 执行编辑recordService.updateById(bean);log.info("编辑成功->{}", jsonObject.toJSONString(0));}else if (type.equals("DELETE")){// 执行删除recordService.removeById(bean.getRecordId());log.info("删除成功->{}", jsonObject.toJSONString(0));}}/*** 判断当前entry是否为事务日志*/private boolean isTransactionEntry(CanalEntry.Entry entry){if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN){log.info("********* 日志文件为:{}, 事务开始偏移量为:{}, 事件类型为type={}",entry.getHeader().getLogfileName(),entry.getHeader().getLogfileOffset(),entry.getEntryType());return true;}else if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){log.info("********* 日志文件为:{}, 事务结束偏移量为:{}, 事件类型为type={}",entry.getHeader().getLogfileName(),entry.getHeader().getLogfileOffset(),entry.getEntryType());return true;}else {return false;}}}
3.3 数据同步效果
有点感叹需求就是最好的老师,但是完不成需求就不好玩了
相关文章:

springboot集成canal
目录 一、打开mysql的binlog1.1 打开 MySQL 配置文件 my.cnf(通常位于 /etc/mysql/my.cnf 或 /etc/my.cnf)并添加或修改以下设置:1.2 重启mysql服务1.3 验证是否生效 二、 部署canal 服务端(docker)2.1 下载启动脚本(可…...
leetcode数论(2447. 最大公因数等于 K 的子数组数目)
前言 经过前期的数据结构和算法学习,开始以OD机考题作为练习题,继续加强下熟练程度。 描述 给你一个整数数组 nums 和一个整数 k ,请你统计并返回 nums 的子数组中元素的最大公因数等于 k 的子数组数目。 子数组 是数组中一个连续的非空序列…...
实现数组扁平化的几种方式
目标: 实现数组扁平化[1,[2,[3,4,5]]] > [1,2,3,4,5] 我们有几种方法可以实现,分别为: 1、递归 function flatten(list){return list.reduce((tar, cur) > {if(Array.isArray(cur)){tar tar.concat(flatten(cur));} else {tar.push(cur);}return tar;}, []); } flatt…...

3D打印技术正悄然重塑模具工业格局
虽被誉为“工业之母”的模具在批量生产中仍占据核心地位,但3D打印以其“无模”直接成型的特性,在小批量、非标准化及复杂结构件制造领域展现出独特优势,随着技术和装备的不断发展,目前3D打印正逐渐向批量生产渗透,某品…...

深入解析 KMZ 文件的处理与可视化:从数据提取到地图展示项目实战
文章目录 1. KMZ 文件与 KML 文件简介1.1 KMZ 文件1.2 KML 文件 2. Python 环境配置与依赖安装3. 代码实现详解3.1 查找 KMZ 文件3.2 解压 KMZ 文件3.3 解析 KML 文件3.4 可视化 KMZ 数据 4. 项目实战4.1. 数据采集4.2. 项目完整代码 5. 项目运行与结果展示6. 总结与展望 在处理…...

YOLOv5轻量化改进 | backbone | 结合MobileNetV4(包含多个结构和使用方式)
YOLOv5轻量化改进 | backbone | 结合MobileNetV4(包含多个结构) 本文介绍论文原理介绍网络代码多种yaml设置网络测试及实验结果<!-- 这里放入论文图片 -->  ;本文介绍 本文给大家带来的改进机制是结合MobileNetV4骨干网络,其中来自2024.5月发布的MobileNetV4…...

学习安卓开发遇到的问题
问题1:学习禁用与恢复按钮中: java代码报错:报错代码是 R.id.btn_enable;case R.id.btn_disable;case R.id.btn_test: 代码如下:(实现功能在代码后面) package com.example.apptest;import static java.…...

数学建模--禁忌搜索
目录 算法基本原理 关键要素 应用实例 实现细节 python代码示例 总结 禁忌搜索算法在解决哪些具体类型的组合优化问题中最有效? 禁忌搜索算法的邻域结构设计有哪些最佳实践或案例研究? 如何动态更新禁忌表以提高禁忌搜索算法的效率和性能&#…...
LeetCode 第136场双周赛个人题解
Q1. 求出胜利玩家的数目 原题链接 Q1. 求出胜利玩家的数目 思路分析 直接模拟 时间复杂度:O(N) AC代码 class Solution { public:int winningPlayerCount(int n, vector<vector<int>>& pick) {unordered_map<int, unordered_map<int, …...

The operation was rejected by your operating system. code CERT_HAS_EXPIRED报错解决
各种报错,试了清缓存,使用管理员权限打开命令行工具,更新npm,都不好使 最终解决:删除 c:/user/admin/ .npmrc...

[Git][基本操作]详细讲解
目录 1.创建本地仓库2.配置 Git3.添加文件1.添加文件2.提交文件3.其他 && 说明 4.删除文件5.跟踪修改文件6.版本回退7.撤销修改0.前言1.未add2.已add,未commit3.已add,已commit 1.创建本地仓库 创建⼀个Git本地仓库:git init运行该命…...

springMVC中从Excel文件中导入导出数据
目录 1. 数据库展示2. 导入依赖3. 写方法3.1 导入数据3.2 导出数据 4. 效果5. 不足6. 参考链接 1. 数据库展示 2. 导入依赖 pom.xml <!--文件上传处理--><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId>&…...

C++的STL简介(三)
目录 1.vector的模拟实现 1.1begin() 1.2end() 1.3打印信息 1.4 reserve() 1.5 size() 1.6 capacity() 1.7 push_back() 1.8[ ] 1.9 pop_back() 1.10 insert&…...

BERT模型
BERT模型是由谷歌团队于2019年提出的 Encoder-only 的 语言模型,发表于NLP顶会ACL上。原文题目为:《BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding》链接 在前大模型时代,BERT模型可以算是一个参数量比…...
举例说明计算机视觉(CV)技术的优势和挑战
计算机视觉(CV)技术是通过计算机模拟和处理图像与视频数据来模拟人类视觉的能力。它可以带来许多优势,也面临一些挑战。 优势: 自动化:CV技术可以自动处理大量的图像和视频数据,从而提高工作效率和准确性。…...

Animate软件基础:关于补间动画中的图层
Animate 文档中的每一个场景都可以包含任意数量的时间轴图层。使用图层和图层文件夹可组织动画序列的内容和分隔动画对象。在图层和文件夹中组织它们可防止它们在重叠时相互擦除、连接或分段。若要创建一次包含多个元件或文本字段的补间移动的动画,请将每个对象放置…...

mac|安装hashcat(压缩包密码p解)
一、安装Macports(如果有brew就不用这一步) 根据官网文档:The MacPorts Project -- Download & Installation,安装步骤如下 1、下载MacPorts,这里我用的是tar.gz ,可以通过keka(keka安装在…...

【保姆级系列:锐捷模拟器的下载安装使用全套教程】
保姆级系列:锐捷模拟器的下载安装使用全套教程 1.介绍2.下载3.安装4.实践教程5.验证 1.介绍 锐捷目前可以通过EVE-NG来模拟自己家的路由器,交换机,防火墙。实现方式是把自己家的镜像导入到EVE-ng里面来运行。下面主要就是介绍如何下载镜像和…...

virtualbox7安装centos7.9配置静态ip
1.背景 我大概在一年之前安装virtualbox7centos7.9的环境,但看视频说用vagrant启动的窗口可以不用第三方工具(比如xshell、secure等)连接centos7.9,于是尝鲜试了下还可以,导致系统文件格式是vmdk了(网上有vmdk转vdi的方法…...

结构型设计模式:桥接/组合/装饰/外观/享元
结构型设计模式:适配器/代理 (qq.com)...
SciencePlots——绘制论文中的图片
文章目录 安装一、风格二、1 资源 安装 # 安装最新版 pip install githttps://github.com/garrettj403/SciencePlots.git# 安装稳定版 pip install SciencePlots一、风格 简单好用的深度学习论文绘图专用工具包–Science Plot 二、 1 资源 论文绘图神器来了:一行…...

.Net框架,除了EF还有很多很多......
文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...
【python异步多线程】异步多线程爬虫代码示例
claude生成的python多线程、异步代码示例,模拟20个网页的爬取,每个网页假设要0.5-2秒完成。 代码 Python多线程爬虫教程 核心概念 多线程:允许程序同时执行多个任务,提高IO密集型任务(如网络请求)的效率…...

水泥厂自动化升级利器:Devicenet转Modbus rtu协议转换网关
在水泥厂的生产流程中,工业自动化网关起着至关重要的作用,尤其是JH-DVN-RTU疆鸿智能Devicenet转Modbus rtu协议转换网关,为水泥厂实现高效生产与精准控制提供了有力支持。 水泥厂设备众多,其中不少设备采用Devicenet协议。Devicen…...

【无标题】湖北理元理律师事务所:债务优化中的生活保障与法律平衡之道
文/法律实务观察组 在债务重组领域,专业机构的核心价值不仅在于减轻债务数字,更在于帮助债务人在履行义务的同时维持基本生活尊严。湖北理元理律师事务所的服务实践表明,合法债务优化需同步实现三重平衡: 法律刚性(债…...
ubuntu22.04 安装docker 和docker-compose
首先你要确保没有docker环境或者使用命令删掉docker sudo apt-get remove docker docker-engine docker.io containerd runc安装docker 更新软件环境 sudo apt update sudo apt upgrade下载docker依赖和GPG 密钥 # 依赖 apt-get install ca-certificates curl gnupg lsb-rel…...

企业大模型服务合规指南:深度解析备案与登记制度
伴随AI技术的爆炸式发展,尤其是大模型(LLM)在各行各业的深度应用和整合,企业利用AI技术提升效率、创新服务的步伐不断加快。无论是像DeepSeek这样的前沿技术提供者,还是积极拥抱AI转型的传统企业,在面向公众…...
Python常用模块:time、os、shutil与flask初探
一、Flask初探 & PyCharm终端配置 目的: 快速搭建小型Web服务器以提供数据。 工具: 第三方Web框架 Flask (需 pip install flask 安装)。 安装 Flask: 建议: 使用 PyCharm 内置的 Terminal (模拟命令行) 进行安装,避免频繁切换。 PyCharm Terminal 配置建议: 打开 Py…...
STL 2迭代器
文章目录 1.迭代器2.输入迭代器3.输出迭代器1.插入迭代器 4.前向迭代器5.双向迭代器6.随机访问迭代器7.不同容器返回的迭代器类型1.输入 / 输出迭代器2.前向迭代器3.双向迭代器4.随机访问迭代器5.特殊迭代器适配器6.为什么 unordered_set 只提供前向迭代器? 1.迭代器…...
linux设备重启后时间与网络时间不同步怎么解决?
linux设备重启后时间与网络时间不同步怎么解决? 设备只要一重启,时间又错了/偏了,明明刚刚对时还是对的! 这在物联网、嵌入式开发环境特别常见,尤其是开发板、树莓派、rk3588 这类设备。 解决方法: 加硬件…...