当前位置: 首页 > news >正文

springboot集成canal

目录

      • 一、打开mysql的binlog
        • 1.1 打开 MySQL 配置文件 `my.cnf`(通常位于 `/etc/mysql/my.cnf` 或 `/etc/my.cnf`)并添加或修改以下设置:
        • 1.2 重启mysql服务
        • 1.3 验证是否生效
      • 二、 部署canal 服务端(docker)
        • 2.1 下载启动脚本(可能需要梯子)
        • 2.2 启动服务
        • 2.3 验证服务启动成功
      • 三、springboot端集成canal客户端
        • 3.1 添加依赖 /配置
        • 3.2 客户端代码
        • 3.3 数据同步效果

项目上需要一个app,但是他们没有公网服务器,所以就在自家公网服务器开了一个mysql,项目上的服务器是能访问外网的,所以canal完美适配了这个需求

原理简介:canal服务端模拟mysql主从协议伪装成从数据库,从而读取主库的binlog,我们使用canal客户端自定义数据同步规则。

具体步骤

一、打开mysql的binlog

1.1 打开 MySQL 配置文件 my.cnf(通常位于 /etc/mysql/my.cnf/etc/my.cnf)并添加或修改以下设置:
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=row

注意 :确保binlog-format是 row模式

1.2 重启mysql服务

具体命令根据你的服务器类型决定

1.3 验证是否生效
SHOW MASTER STATUS;

二、 部署canal 服务端(docker)

2.1 下载启动脚本(可能需要梯子)
# 下载脚本
wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh 
2.2 启动服务
# 构建一个destination name为test的队列
sh run.sh -e canal.auto.scan=false \-e canal.destinations=test \-e canal.instance.master.address=127.0.0.1:3306  \-e canal.instance.dbUsername=canal  \-e canal.instance.dbPassword=canal  \-e canal.instance.connectionCharset=UTF-8 \-e canal.instance.tsdb.enable=true \-e canal.instance.gtidon=false  \-e canal.instance.filter.regex=.*\\..* 

参数解释:

-e canal.auto.scan=false:关闭自动扫描数据库实例。即 Canal 不会自动检测数据库的变更实例,而是使用手动指定的配置。
-e canal.destinations=test:设置 Canal 的目标队列名称为 test。destination 是 Canal 中用来标识不同数据源的名称。
-e canal.instance.master.address=127.0.0.1:3306:指定主数据库的地址和端口。这里是本地 MySQL 实例,监听在 3306 端口。
-e canal.instance.dbUsername=canal:设置连接到主数据库的用户名为 canal。这个用户名需要有足够的权限以读取 MySQL 的 binlog。
-e canal.instance.dbPassword=canal:设置连接到主数据库的密码为 canal。这个密码需要与 dbUsername 配对,以验证用户身份。
-e canal.instance.connectionCharset=UTF-8:设置数据库连接的字符集为 UTF-8。确保字符集正确可以避免中文字符等数据的乱码问题。
-e canal.instance.tsdb.enable=true:启用 Canal 的时间序列数据库(TSDB)。TSDB 用于存储时间戳和位置信息,这有助于在重启时恢复复制状态。
-e canal.instance.gtidon=false:关闭 GTID(全局事务标识符)。如果 GTID 处于关闭状态,Canal 将基于 binlog 文件和位置进行复制,而不是 GTID。
-e canal.instance.filter.regex=.*\\..*:设置 binlog 过滤规则。这条规则表示 Canal 将监听所有数据库和所有表的变更。正则表达式 .*\\..* 匹配所有数据库(.)和表(.*)。
2.3 验证服务启动成功
docker logs <containerids>

可以看到这样的打印:
image.png

三、springboot端集成canal客户端

3.1 添加依赖 /配置
<!--  canal begin-->
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version>
</dependency>
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.0</version>
</dependency>
<!--  canal end-->
canal:host: 127.0.0.1 #自己的canal服务器ipport: 11111  #canal默认端口destination: test #配置文件配置的名称username: rootpassword: 214365batch:size: 100
3.2 客户端代码
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.eco.db.entity.Record;
import com.eco.fishway.service.RecordService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;@Slf4j
@Component
public class CanalClient implements InitializingBean, DisposableBean {@Value("${canal.host}")private String canalHost;@Value("${canal.port}")private int canalPort;@Value("${canal.destination}")private String canalDestination;@Value("${canal.username}")private String canalUsername;@Value("${canal.password}")private String canalPassword;@Value("${canal.batch.size}")private int batchSize;private final RecordService recordService;private CanalConnector canalConnector;private ExecutorService executorService;public CanalClient(RecordService recordService) {this.recordService = recordService;}@Overridepublic void afterPropertiesSet() throws Exception {this.canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalHost, canalPort),canalDestination,canalUsername,canalPassword);this.executorService = Executors.newSingleThreadExecutor();this.executorService.execute(new Task());}@Overridepublic void destroy() throws Exception {if (executorService != null) {executorService.shutdown();}}private class Task implements Runnable {@Overridepublic void run() {while (true) {try {//连接canalConnector.connect();//订阅canalConnector.subscribe();while (true) {Message message = canalConnector.getWithoutAck(batchSize); // batchSize为每次获取的batchSize大小long batchId = message.getId();//获取批量的数量int size = message.getEntries().size();try {//如果没有数据if (batchId == -1 || size == 0) {// log.info("无数据");// 线程休眠2秒Thread.sleep(2000);} else {// 如果有数据,处理数据printEntry(message.getEntries());// 确认处理完成canalConnector.ack(batchId);}} catch (Exception e) {log.error(e.getMessage());// 程序错误,也直接确认,跳过这次偏移canalConnector.ack(batchId);}} catch (Exception e) {log.error("Error occurred when running Canal Client", e);} finally {canalConnector.disconnect();}}}}private void printEntry(List<CanalEntry.Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (isTransactionEntry(entry)){//开启/关闭事务的实体类型,跳过continue;}//RowChange对象,包含了一行数据变化的所有特征//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等CanalEntry.RowChange rowChange;try {rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);}//获取操作类型:insert/update/delete类型CanalEntry.EventType eventType = rowChange.getEventType();//打印Header信息log.info("================》; binlog[{} : {}] , name[{}, {}] , eventType : {}",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType);//判断是否是DDL语句if (rowChange.getIsDdl()) {log.info("================》;isDdl: true,sql:{}", rowChange.getSql());}log.info(rowChange.getSql());//获取RowChange对象里的每一行数据,打印出来for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {//如果是删除语句if (eventType == CanalEntry.EventType.DELETE) {log.info(">>>>>>>>>> 删除 >>>>>>>>>>");printColumnAndExecute(rowData.getBeforeColumnsList(), "DELETE");//如果是新增语句} else if (eventType == CanalEntry.EventType.INSERT) {log.info(">>>>>>>>>> 新增 >>>>>>>>>>");printColumnAndExecute(rowData.getAfterColumnsList(), "INSERT");//如果是更新的语句} else {log.info(">>>>>>>>>> 更新 >>>>>>>>>>");//变更前的数据log.info("------->; before");printColumnAndExecute(rowData.getBeforeColumnsList(), null);//变更后的数据log.info("------->; after");printColumnAndExecute(rowData.getAfterColumnsList(), "UPDATE");}}}}/*** 执行数据同步* @param columns* @param type*/private void printColumnAndExecute(List<CanalEntry.Column> columns, String type) {if(type == null){return;}JSONObject jsonObject = new JSONObject();for (CanalEntry.Column column : columns) {jsonObject.put(column.getName(), column.getValue());}// 此处使用json转对象的方式进行转换Record bean = jsonObject.toBean(Record.class);if(type.equals("INSERT")){// 执行新增recordService.save(bean);log.info("新增成功->{}", jsonObject.toJSONString(0));}else if (type.equals("UPDATE")){// 执行编辑recordService.updateById(bean);log.info("编辑成功->{}", jsonObject.toJSONString(0));}else if (type.equals("DELETE")){// 执行删除recordService.removeById(bean.getRecordId());log.info("删除成功->{}", jsonObject.toJSONString(0));}}/*** 判断当前entry是否为事务日志*/private boolean isTransactionEntry(CanalEntry.Entry entry){if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN){log.info("********* 日志文件为:{}, 事务开始偏移量为:{}, 事件类型为type={}",entry.getHeader().getLogfileName(),entry.getHeader().getLogfileOffset(),entry.getEntryType());return true;}else if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){log.info("********* 日志文件为:{}, 事务结束偏移量为:{}, 事件类型为type={}",entry.getHeader().getLogfileName(),entry.getHeader().getLogfileOffset(),entry.getEntryType());return true;}else {return false;}}}
3.3 数据同步效果

image.png
有点感叹需求就是最好的老师,但是完不成需求就不好玩了

相关文章:

springboot集成canal

目录 一、打开mysql的binlog1.1 打开 MySQL 配置文件 my.cnf&#xff08;通常位于 /etc/mysql/my.cnf 或 /etc/my.cnf&#xff09;并添加或修改以下设置&#xff1a;1.2 重启mysql服务1.3 验证是否生效 二、 部署canal 服务端&#xff08;docker&#xff09;2.1 下载启动脚本(可…...

leetcode数论(2447. 最大公因数等于 K 的子数组数目)

前言 经过前期的数据结构和算法学习&#xff0c;开始以OD机考题作为练习题&#xff0c;继续加强下熟练程度。 描述 给你一个整数数组 nums 和一个整数 k &#xff0c;请你统计并返回 nums 的子数组中元素的最大公因数等于 k 的子数组数目。 子数组 是数组中一个连续的非空序列…...

实现数组扁平化的几种方式

目标: 实现数组扁平化[1,[2,[3,4,5]]] > [1,2,3,4,5] 我们有几种方法可以实现,分别为: 1、递归 function flatten(list){return list.reduce((tar, cur) > {if(Array.isArray(cur)){tar tar.concat(flatten(cur));} else {tar.push(cur);}return tar;}, []); } flatt…...

3D打印技术正悄然重塑模具工业格局

虽被誉为“工业之母”的模具在批量生产中仍占据核心地位&#xff0c;但3D打印以其“无模”直接成型的特性&#xff0c;在小批量、非标准化及复杂结构件制造领域展现出独特优势&#xff0c;随着技术和装备的不断发展&#xff0c;目前3D打印正逐渐向批量生产渗透&#xff0c;某品…...

深入解析 KMZ 文件的处理与可视化:从数据提取到地图展示项目实战

文章目录 1. KMZ 文件与 KML 文件简介1.1 KMZ 文件1.2 KML 文件 2. Python 环境配置与依赖安装3. 代码实现详解3.1 查找 KMZ 文件3.2 解压 KMZ 文件3.3 解析 KML 文件3.4 可视化 KMZ 数据 4. 项目实战4.1. 数据采集4.2. 项目完整代码 5. 项目运行与结果展示6. 总结与展望 在处理…...

YOLOv5轻量化改进 | backbone | 结合MobileNetV4(包含多个结构和使用方式)

YOLOv5轻量化改进 | backbone | 结合MobileNetV4(包含多个结构) 本文介绍论文原理介绍网络代码多种yaml设置网络测试及实验结果<!-- 这里放入论文图片 --> &emsp;;本文介绍 本文给大家带来的改进机制是结合MobileNetV4骨干网络,其中来自2024.5月发布的MobileNetV4…...

学习安卓开发遇到的问题

问题1&#xff1a;学习禁用与恢复按钮中&#xff1a; java代码报错&#xff1a;报错代码是 R.id.btn_enable;case R.id.btn_disable;case R.id.btn_test: 代码如下&#xff1a;&#xff08;实现功能在代码后面&#xff09; package com.example.apptest;import static java.…...

数学建模--禁忌搜索

目录 算法基本原理 关键要素 应用实例 实现细节 python代码示例 总结 禁忌搜索算法在解决哪些具体类型的组合优化问题中最有效&#xff1f; 禁忌搜索算法的邻域结构设计有哪些最佳实践或案例研究&#xff1f; 如何动态更新禁忌表以提高禁忌搜索算法的效率和性能&#…...

LeetCode 第136场双周赛个人题解

Q1. 求出胜利玩家的数目 原题链接 Q1. 求出胜利玩家的数目 思路分析 直接模拟 时间复杂度&#xff1a;O(N) AC代码 class Solution { public:int winningPlayerCount(int n, vector<vector<int>>& pick) {unordered_map<int, unordered_map<int, …...

The operation was rejected by your operating system. code CERT_HAS_EXPIRED报错解决

各种报错&#xff0c;试了清缓存&#xff0c;使用管理员权限打开命令行工具&#xff0c;更新npm&#xff0c;都不好使 最终解决&#xff1a;删除 c:/user/admin/ .npmrc...

[Git][基本操作]详细讲解

目录 1.创建本地仓库2.配置 Git3.添加文件1.添加文件2.提交文件3.其他 && 说明 4.删除文件5.跟踪修改文件6.版本回退7.撤销修改0.前言1.未add2.已add&#xff0c;未commit3.已add&#xff0c;已commit 1.创建本地仓库 创建⼀个Git本地仓库&#xff1a;git init运行该命…...

springMVC中从Excel文件中导入导出数据

目录 1. 数据库展示2. 导入依赖3. 写方法3.1 导入数据3.2 导出数据 4. 效果5. 不足6. 参考链接 1. 数据库展示 2. 导入依赖 pom.xml <!--文件上传处理--><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId>&…...

C++的STL简介(三)

目录 1.vector的模拟实现 1.1begin&#xff08;&#xff09; 1.2end&#xff08;&#xff09; 1.3打印信息 1.4 reserve&#xff08;&#xff09; 1.5 size&#xff08;&#xff09; 1.6 capacity&#xff08;&#xff09; 1.7 push_back() 1.8[ ] 1.9 pop_back() 1.10 insert&…...

BERT模型

BERT模型是由谷歌团队于2019年提出的 Encoder-only 的 语言模型&#xff0c;发表于NLP顶会ACL上。原文题目为&#xff1a;《BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding》链接 在前大模型时代&#xff0c;BERT模型可以算是一个参数量比…...

举例说明计算机视觉(CV)技术的优势和挑战

计算机视觉&#xff08;CV&#xff09;技术是通过计算机模拟和处理图像与视频数据来模拟人类视觉的能力。它可以带来许多优势&#xff0c;也面临一些挑战。 优势&#xff1a; 自动化&#xff1a;CV技术可以自动处理大量的图像和视频数据&#xff0c;从而提高工作效率和准确性。…...

Animate软件基础:关于补间动画中的图层

Animate 文档中的每一个场景都可以包含任意数量的时间轴图层。使用图层和图层文件夹可组织动画序列的内容和分隔动画对象。在图层和文件夹中组织它们可防止它们在重叠时相互擦除、连接或分段。若要创建一次包含多个元件或文本字段的补间移动的动画&#xff0c;请将每个对象放置…...

mac|安装hashcat(压缩包密码p解)

一、安装Macports&#xff08;如果有brew就不用这一步&#xff09; 根据官网文档&#xff1a;The MacPorts Project -- Download & Installation&#xff0c;安装步骤如下 1、下载MacPorts&#xff0c;这里我用的是tar.gz &#xff0c;可以通过keka&#xff08;keka安装在…...

【保姆级系列:锐捷模拟器的下载安装使用全套教程】

保姆级系列&#xff1a;锐捷模拟器的下载安装使用全套教程 1.介绍2.下载3.安装4.实践教程5.验证 1.介绍 锐捷目前可以通过EVE-NG来模拟自己家的路由器&#xff0c;交换机&#xff0c;防火墙。实现方式是把自己家的镜像导入到EVE-ng里面来运行。下面主要就是介绍如何下载镜像和…...

virtualbox7安装centos7.9配置静态ip

1.背景 我大概在一年之前安装virtualbox7centos7.9的环境&#xff0c;但看视频说用vagrant启动的窗口可以不用第三方工具(比如xshell、secure等)连接centos7.9&#xff0c;于是尝鲜试了下还可以&#xff0c;导致系统文件格式是vmdk了&#xff08;网上有vmdk转vdi的方法&#xf…...

结构型设计模式:桥接/组合/装饰/外观/享元

结构型设计模式&#xff1a;适配器/代理 (qq.com)...

设计模式和设计原则回顾

设计模式和设计原则回顾 23种设计模式是设计原则的完美体现,设计原则设计原则是设计模式的理论基石, 设计模式 在经典的设计模式分类中(如《设计模式:可复用面向对象软件的基础》一书中),总共有23种设计模式,分为三大类: 一、创建型模式(5种) 1. 单例模式(Sing…...

使用VSCode开发Django指南

使用VSCode开发Django指南 一、概述 Django 是一个高级 Python 框架&#xff0c;专为快速、安全和可扩展的 Web 开发而设计。Django 包含对 URL 路由、页面模板和数据处理的丰富支持。 本文将创建一个简单的 Django 应用&#xff0c;其中包含三个使用通用基本模板的页面。在此…...

MFC内存泄露

1、泄露代码示例 void X::SetApplicationBtn() {CMFCRibbonApplicationButton* pBtn GetApplicationButton();// 获取 Ribbon Bar 指针// 创建自定义按钮CCustomRibbonAppButton* pCustomButton new CCustomRibbonAppButton();pCustomButton->SetImage(IDB_BITMAP_Jdp26)…...

【android bluetooth 框架分析 04】【bt-framework 层详解 1】【BluetoothProperties介绍】

1. BluetoothProperties介绍 libsysprop/srcs/android/sysprop/BluetoothProperties.sysprop BluetoothProperties.sysprop 是 Android AOSP 中的一种 系统属性定义文件&#xff08;System Property Definition File&#xff09;&#xff0c;用于声明和管理 Bluetooth 模块相…...

如何理解 IP 数据报中的 TTL?

目录 前言理解 前言 面试灵魂一问&#xff1a;说说对 IP 数据报中 TTL 的理解&#xff1f;我们都知道&#xff0c;IP 数据报由首部和数据两部分组成&#xff0c;首部又分为两部分&#xff1a;固定部分和可变部分&#xff0c;共占 20 字节&#xff0c;而即将讨论的 TTL 就位于首…...

精益数据分析(97/126):邮件营销与用户参与度的关键指标优化指南

精益数据分析&#xff08;97/126&#xff09;&#xff1a;邮件营销与用户参与度的关键指标优化指南 在数字化营销时代&#xff0c;邮件列表效度、用户参与度和网站性能等指标往往决定着创业公司的增长成败。今天&#xff0c;我们将深入解析邮件打开率、网站可用性、页面参与时…...

OPenCV CUDA模块图像处理-----对图像执行 均值漂移滤波(Mean Shift Filtering)函数meanShiftFiltering()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 在 GPU 上对图像执行 均值漂移滤波&#xff08;Mean Shift Filtering&#xff09;&#xff0c;用于图像分割或平滑处理。 该函数将输入图像中的…...

C++.OpenGL (14/64)多光源(Multiple Lights)

多光源(Multiple Lights) 多光源渲染技术概览 #mermaid-svg-3L5e5gGn76TNh7Lq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-3L5e5gGn76TNh7Lq .error-icon{fill:#552222;}#mermaid-svg-3L5e5gGn76TNh7Lq .erro…...

Java + Spring Boot + Mybatis 实现批量插入

在 Java 中使用 Spring Boot 和 MyBatis 实现批量插入可以通过以下步骤完成。这里提供两种常用方法&#xff1a;使用 MyBatis 的 <foreach> 标签和批处理模式&#xff08;ExecutorType.BATCH&#xff09;。 方法一&#xff1a;使用 XML 的 <foreach> 标签&#xff…...

初探Service服务发现机制

1.Service简介 Service是将运行在一组Pod上的应用程序发布为网络服务的抽象方法。 主要功能&#xff1a;服务发现和负载均衡。 Service类型的包括ClusterIP类型、NodePort类型、LoadBalancer类型、ExternalName类型 2.Endpoints简介 Endpoints是一种Kubernetes资源&#xf…...