当前位置: 首页 > news >正文

springboot集成canal

目录

      • 一、打开mysql的binlog
        • 1.1 打开 MySQL 配置文件 `my.cnf`(通常位于 `/etc/mysql/my.cnf` 或 `/etc/my.cnf`)并添加或修改以下设置:
        • 1.2 重启mysql服务
        • 1.3 验证是否生效
      • 二、 部署canal 服务端(docker)
        • 2.1 下载启动脚本(可能需要梯子)
        • 2.2 启动服务
        • 2.3 验证服务启动成功
      • 三、springboot端集成canal客户端
        • 3.1 添加依赖 /配置
        • 3.2 客户端代码
        • 3.3 数据同步效果

项目上需要一个app,但是他们没有公网服务器,所以就在自家公网服务器开了一个mysql,项目上的服务器是能访问外网的,所以canal完美适配了这个需求

原理简介:canal服务端模拟mysql主从协议伪装成从数据库,从而读取主库的binlog,我们使用canal客户端自定义数据同步规则。

具体步骤

一、打开mysql的binlog

1.1 打开 MySQL 配置文件 my.cnf(通常位于 /etc/mysql/my.cnf/etc/my.cnf)并添加或修改以下设置:
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=row

注意 :确保binlog-format是 row模式

1.2 重启mysql服务

具体命令根据你的服务器类型决定

1.3 验证是否生效
SHOW MASTER STATUS;

二、 部署canal 服务端(docker)

2.1 下载启动脚本(可能需要梯子)
# 下载脚本
wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh 
2.2 启动服务
# 构建一个destination name为test的队列
sh run.sh -e canal.auto.scan=false \-e canal.destinations=test \-e canal.instance.master.address=127.0.0.1:3306  \-e canal.instance.dbUsername=canal  \-e canal.instance.dbPassword=canal  \-e canal.instance.connectionCharset=UTF-8 \-e canal.instance.tsdb.enable=true \-e canal.instance.gtidon=false  \-e canal.instance.filter.regex=.*\\..* 

参数解释:

-e canal.auto.scan=false:关闭自动扫描数据库实例。即 Canal 不会自动检测数据库的变更实例,而是使用手动指定的配置。
-e canal.destinations=test:设置 Canal 的目标队列名称为 test。destination 是 Canal 中用来标识不同数据源的名称。
-e canal.instance.master.address=127.0.0.1:3306:指定主数据库的地址和端口。这里是本地 MySQL 实例,监听在 3306 端口。
-e canal.instance.dbUsername=canal:设置连接到主数据库的用户名为 canal。这个用户名需要有足够的权限以读取 MySQL 的 binlog。
-e canal.instance.dbPassword=canal:设置连接到主数据库的密码为 canal。这个密码需要与 dbUsername 配对,以验证用户身份。
-e canal.instance.connectionCharset=UTF-8:设置数据库连接的字符集为 UTF-8。确保字符集正确可以避免中文字符等数据的乱码问题。
-e canal.instance.tsdb.enable=true:启用 Canal 的时间序列数据库(TSDB)。TSDB 用于存储时间戳和位置信息,这有助于在重启时恢复复制状态。
-e canal.instance.gtidon=false:关闭 GTID(全局事务标识符)。如果 GTID 处于关闭状态,Canal 将基于 binlog 文件和位置进行复制,而不是 GTID。
-e canal.instance.filter.regex=.*\\..*:设置 binlog 过滤规则。这条规则表示 Canal 将监听所有数据库和所有表的变更。正则表达式 .*\\..* 匹配所有数据库(.)和表(.*)。
2.3 验证服务启动成功
docker logs <containerids>

可以看到这样的打印:
image.png

三、springboot端集成canal客户端

3.1 添加依赖 /配置
<!--  canal begin-->
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version>
</dependency>
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.0</version>
</dependency>
<!--  canal end-->
canal:host: 127.0.0.1 #自己的canal服务器ipport: 11111  #canal默认端口destination: test #配置文件配置的名称username: rootpassword: 214365batch:size: 100
3.2 客户端代码
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.eco.db.entity.Record;
import com.eco.fishway.service.RecordService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;@Slf4j
@Component
public class CanalClient implements InitializingBean, DisposableBean {@Value("${canal.host}")private String canalHost;@Value("${canal.port}")private int canalPort;@Value("${canal.destination}")private String canalDestination;@Value("${canal.username}")private String canalUsername;@Value("${canal.password}")private String canalPassword;@Value("${canal.batch.size}")private int batchSize;private final RecordService recordService;private CanalConnector canalConnector;private ExecutorService executorService;public CanalClient(RecordService recordService) {this.recordService = recordService;}@Overridepublic void afterPropertiesSet() throws Exception {this.canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalHost, canalPort),canalDestination,canalUsername,canalPassword);this.executorService = Executors.newSingleThreadExecutor();this.executorService.execute(new Task());}@Overridepublic void destroy() throws Exception {if (executorService != null) {executorService.shutdown();}}private class Task implements Runnable {@Overridepublic void run() {while (true) {try {//连接canalConnector.connect();//订阅canalConnector.subscribe();while (true) {Message message = canalConnector.getWithoutAck(batchSize); // batchSize为每次获取的batchSize大小long batchId = message.getId();//获取批量的数量int size = message.getEntries().size();try {//如果没有数据if (batchId == -1 || size == 0) {// log.info("无数据");// 线程休眠2秒Thread.sleep(2000);} else {// 如果有数据,处理数据printEntry(message.getEntries());// 确认处理完成canalConnector.ack(batchId);}} catch (Exception e) {log.error(e.getMessage());// 程序错误,也直接确认,跳过这次偏移canalConnector.ack(batchId);}} catch (Exception e) {log.error("Error occurred when running Canal Client", e);} finally {canalConnector.disconnect();}}}}private void printEntry(List<CanalEntry.Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (isTransactionEntry(entry)){//开启/关闭事务的实体类型,跳过continue;}//RowChange对象,包含了一行数据变化的所有特征//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等CanalEntry.RowChange rowChange;try {rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);}//获取操作类型:insert/update/delete类型CanalEntry.EventType eventType = rowChange.getEventType();//打印Header信息log.info("================》; binlog[{} : {}] , name[{}, {}] , eventType : {}",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType);//判断是否是DDL语句if (rowChange.getIsDdl()) {log.info("================》;isDdl: true,sql:{}", rowChange.getSql());}log.info(rowChange.getSql());//获取RowChange对象里的每一行数据,打印出来for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {//如果是删除语句if (eventType == CanalEntry.EventType.DELETE) {log.info(">>>>>>>>>> 删除 >>>>>>>>>>");printColumnAndExecute(rowData.getBeforeColumnsList(), "DELETE");//如果是新增语句} else if (eventType == CanalEntry.EventType.INSERT) {log.info(">>>>>>>>>> 新增 >>>>>>>>>>");printColumnAndExecute(rowData.getAfterColumnsList(), "INSERT");//如果是更新的语句} else {log.info(">>>>>>>>>> 更新 >>>>>>>>>>");//变更前的数据log.info("------->; before");printColumnAndExecute(rowData.getBeforeColumnsList(), null);//变更后的数据log.info("------->; after");printColumnAndExecute(rowData.getAfterColumnsList(), "UPDATE");}}}}/*** 执行数据同步* @param columns* @param type*/private void printColumnAndExecute(List<CanalEntry.Column> columns, String type) {if(type == null){return;}JSONObject jsonObject = new JSONObject();for (CanalEntry.Column column : columns) {jsonObject.put(column.getName(), column.getValue());}// 此处使用json转对象的方式进行转换Record bean = jsonObject.toBean(Record.class);if(type.equals("INSERT")){// 执行新增recordService.save(bean);log.info("新增成功->{}", jsonObject.toJSONString(0));}else if (type.equals("UPDATE")){// 执行编辑recordService.updateById(bean);log.info("编辑成功->{}", jsonObject.toJSONString(0));}else if (type.equals("DELETE")){// 执行删除recordService.removeById(bean.getRecordId());log.info("删除成功->{}", jsonObject.toJSONString(0));}}/*** 判断当前entry是否为事务日志*/private boolean isTransactionEntry(CanalEntry.Entry entry){if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN){log.info("********* 日志文件为:{}, 事务开始偏移量为:{}, 事件类型为type={}",entry.getHeader().getLogfileName(),entry.getHeader().getLogfileOffset(),entry.getEntryType());return true;}else if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){log.info("********* 日志文件为:{}, 事务结束偏移量为:{}, 事件类型为type={}",entry.getHeader().getLogfileName(),entry.getHeader().getLogfileOffset(),entry.getEntryType());return true;}else {return false;}}}
3.3 数据同步效果

image.png
有点感叹需求就是最好的老师,但是完不成需求就不好玩了

相关文章:

springboot集成canal

目录 一、打开mysql的binlog1.1 打开 MySQL 配置文件 my.cnf&#xff08;通常位于 /etc/mysql/my.cnf 或 /etc/my.cnf&#xff09;并添加或修改以下设置&#xff1a;1.2 重启mysql服务1.3 验证是否生效 二、 部署canal 服务端&#xff08;docker&#xff09;2.1 下载启动脚本(可…...

leetcode数论(2447. 最大公因数等于 K 的子数组数目)

前言 经过前期的数据结构和算法学习&#xff0c;开始以OD机考题作为练习题&#xff0c;继续加强下熟练程度。 描述 给你一个整数数组 nums 和一个整数 k &#xff0c;请你统计并返回 nums 的子数组中元素的最大公因数等于 k 的子数组数目。 子数组 是数组中一个连续的非空序列…...

实现数组扁平化的几种方式

目标: 实现数组扁平化[1,[2,[3,4,5]]] > [1,2,3,4,5] 我们有几种方法可以实现,分别为: 1、递归 function flatten(list){return list.reduce((tar, cur) > {if(Array.isArray(cur)){tar tar.concat(flatten(cur));} else {tar.push(cur);}return tar;}, []); } flatt…...

3D打印技术正悄然重塑模具工业格局

虽被誉为“工业之母”的模具在批量生产中仍占据核心地位&#xff0c;但3D打印以其“无模”直接成型的特性&#xff0c;在小批量、非标准化及复杂结构件制造领域展现出独特优势&#xff0c;随着技术和装备的不断发展&#xff0c;目前3D打印正逐渐向批量生产渗透&#xff0c;某品…...

深入解析 KMZ 文件的处理与可视化:从数据提取到地图展示项目实战

文章目录 1. KMZ 文件与 KML 文件简介1.1 KMZ 文件1.2 KML 文件 2. Python 环境配置与依赖安装3. 代码实现详解3.1 查找 KMZ 文件3.2 解压 KMZ 文件3.3 解析 KML 文件3.4 可视化 KMZ 数据 4. 项目实战4.1. 数据采集4.2. 项目完整代码 5. 项目运行与结果展示6. 总结与展望 在处理…...

YOLOv5轻量化改进 | backbone | 结合MobileNetV4(包含多个结构和使用方式)

YOLOv5轻量化改进 | backbone | 结合MobileNetV4(包含多个结构) 本文介绍论文原理介绍网络代码多种yaml设置网络测试及实验结果<!-- 这里放入论文图片 --> &emsp;;本文介绍 本文给大家带来的改进机制是结合MobileNetV4骨干网络,其中来自2024.5月发布的MobileNetV4…...

学习安卓开发遇到的问题

问题1&#xff1a;学习禁用与恢复按钮中&#xff1a; java代码报错&#xff1a;报错代码是 R.id.btn_enable;case R.id.btn_disable;case R.id.btn_test: 代码如下&#xff1a;&#xff08;实现功能在代码后面&#xff09; package com.example.apptest;import static java.…...

数学建模--禁忌搜索

目录 算法基本原理 关键要素 应用实例 实现细节 python代码示例 总结 禁忌搜索算法在解决哪些具体类型的组合优化问题中最有效&#xff1f; 禁忌搜索算法的邻域结构设计有哪些最佳实践或案例研究&#xff1f; 如何动态更新禁忌表以提高禁忌搜索算法的效率和性能&#…...

LeetCode 第136场双周赛个人题解

Q1. 求出胜利玩家的数目 原题链接 Q1. 求出胜利玩家的数目 思路分析 直接模拟 时间复杂度&#xff1a;O(N) AC代码 class Solution { public:int winningPlayerCount(int n, vector<vector<int>>& pick) {unordered_map<int, unordered_map<int, …...

The operation was rejected by your operating system. code CERT_HAS_EXPIRED报错解决

各种报错&#xff0c;试了清缓存&#xff0c;使用管理员权限打开命令行工具&#xff0c;更新npm&#xff0c;都不好使 最终解决&#xff1a;删除 c:/user/admin/ .npmrc...

[Git][基本操作]详细讲解

目录 1.创建本地仓库2.配置 Git3.添加文件1.添加文件2.提交文件3.其他 && 说明 4.删除文件5.跟踪修改文件6.版本回退7.撤销修改0.前言1.未add2.已add&#xff0c;未commit3.已add&#xff0c;已commit 1.创建本地仓库 创建⼀个Git本地仓库&#xff1a;git init运行该命…...

springMVC中从Excel文件中导入导出数据

目录 1. 数据库展示2. 导入依赖3. 写方法3.1 导入数据3.2 导出数据 4. 效果5. 不足6. 参考链接 1. 数据库展示 2. 导入依赖 pom.xml <!--文件上传处理--><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId>&…...

C++的STL简介(三)

目录 1.vector的模拟实现 1.1begin&#xff08;&#xff09; 1.2end&#xff08;&#xff09; 1.3打印信息 1.4 reserve&#xff08;&#xff09; 1.5 size&#xff08;&#xff09; 1.6 capacity&#xff08;&#xff09; 1.7 push_back() 1.8[ ] 1.9 pop_back() 1.10 insert&…...

BERT模型

BERT模型是由谷歌团队于2019年提出的 Encoder-only 的 语言模型&#xff0c;发表于NLP顶会ACL上。原文题目为&#xff1a;《BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding》链接 在前大模型时代&#xff0c;BERT模型可以算是一个参数量比…...

举例说明计算机视觉(CV)技术的优势和挑战

计算机视觉&#xff08;CV&#xff09;技术是通过计算机模拟和处理图像与视频数据来模拟人类视觉的能力。它可以带来许多优势&#xff0c;也面临一些挑战。 优势&#xff1a; 自动化&#xff1a;CV技术可以自动处理大量的图像和视频数据&#xff0c;从而提高工作效率和准确性。…...

Animate软件基础:关于补间动画中的图层

Animate 文档中的每一个场景都可以包含任意数量的时间轴图层。使用图层和图层文件夹可组织动画序列的内容和分隔动画对象。在图层和文件夹中组织它们可防止它们在重叠时相互擦除、连接或分段。若要创建一次包含多个元件或文本字段的补间移动的动画&#xff0c;请将每个对象放置…...

mac|安装hashcat(压缩包密码p解)

一、安装Macports&#xff08;如果有brew就不用这一步&#xff09; 根据官网文档&#xff1a;The MacPorts Project -- Download & Installation&#xff0c;安装步骤如下 1、下载MacPorts&#xff0c;这里我用的是tar.gz &#xff0c;可以通过keka&#xff08;keka安装在…...

【保姆级系列:锐捷模拟器的下载安装使用全套教程】

保姆级系列&#xff1a;锐捷模拟器的下载安装使用全套教程 1.介绍2.下载3.安装4.实践教程5.验证 1.介绍 锐捷目前可以通过EVE-NG来模拟自己家的路由器&#xff0c;交换机&#xff0c;防火墙。实现方式是把自己家的镜像导入到EVE-ng里面来运行。下面主要就是介绍如何下载镜像和…...

virtualbox7安装centos7.9配置静态ip

1.背景 我大概在一年之前安装virtualbox7centos7.9的环境&#xff0c;但看视频说用vagrant启动的窗口可以不用第三方工具(比如xshell、secure等)连接centos7.9&#xff0c;于是尝鲜试了下还可以&#xff0c;导致系统文件格式是vmdk了&#xff08;网上有vmdk转vdi的方法&#xf…...

结构型设计模式:桥接/组合/装饰/外观/享元

结构型设计模式&#xff1a;适配器/代理 (qq.com)...

AI-调查研究-01-正念冥想有用吗?对健康的影响及科学指南

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持续更新中&#xff01;&#xff08;长期更新&#xff09; 目前2025年06月05日更新到&#xff1a; AI炼丹日志-28 - Aud…...

Linux链表操作全解析

Linux C语言链表深度解析与实战技巧 一、链表基础概念与内核链表优势1.1 为什么使用链表&#xff1f;1.2 Linux 内核链表与用户态链表的区别 二、内核链表结构与宏解析常用宏/函数 三、内核链表的优点四、用户态链表示例五、双向循环链表在内核中的实现优势5.1 插入效率5.2 安全…...

C++八股 —— 单例模式

文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全&#xff08;Thread Safety&#xff09; 线程安全是指在多线程环境下&#xff0c;某个函数、类或代码片段能够被多个线程同时调用时&#xff0c;仍能保证数据的一致性和逻辑的正确性&#xf…...

rnn判断string中第一次出现a的下标

# coding:utf8 import torch import torch.nn as nn import numpy as np import random import json""" 基于pytorch的网络编写 实现一个RNN网络完成多分类任务 判断字符 a 第一次出现在字符串中的位置 """class TorchModel(nn.Module):def __in…...

用机器学习破解新能源领域的“弃风”难题

音乐发烧友深有体会&#xff0c;玩音乐的本质就是玩电网。火电声音偏暖&#xff0c;水电偏冷&#xff0c;风电偏空旷。至于太阳能发的电&#xff0c;则略显朦胧和单薄。 不知你是否有感觉&#xff0c;近两年家里的音响声音越来越冷&#xff0c;听起来越来越单薄&#xff1f; —…...

让回归模型不再被异常值“带跑偏“,MSE和Cauchy损失函数在噪声数据环境下的实战对比

在机器学习的回归分析中&#xff0c;损失函数的选择对模型性能具有决定性影响。均方误差&#xff08;MSE&#xff09;作为经典的损失函数&#xff0c;在处理干净数据时表现优异&#xff0c;但在面对包含异常值的噪声数据时&#xff0c;其对大误差的二次惩罚机制往往导致模型参数…...

处理vxe-table 表尾数据是单独一个接口,表格tableData数据更新后,需要点击两下,表尾才是正确的

修改bug思路&#xff1a; 分别把 tabledata 和 表尾相关数据 console.log() 发现 更新数据先后顺序不对 settimeout延迟查询表格接口 ——测试可行 升级↑&#xff1a;async await 等接口返回后再开始下一个接口查询 ________________________________________________________…...

【从零学习JVM|第三篇】类的生命周期(高频面试题)

前言&#xff1a; 在Java编程中&#xff0c;类的生命周期是指类从被加载到内存中开始&#xff0c;到被卸载出内存为止的整个过程。了解类的生命周期对于理解Java程序的运行机制以及性能优化非常重要。本文会深入探寻类的生命周期&#xff0c;让读者对此有深刻印象。 目录 ​…...

Java求职者面试指南:计算机基础与源码原理深度解析

Java求职者面试指南&#xff1a;计算机基础与源码原理深度解析 第一轮提问&#xff1a;基础概念问题 1. 请解释什么是进程和线程的区别&#xff1f; 面试官&#xff1a;进程是程序的一次执行过程&#xff0c;是系统进行资源分配和调度的基本单位&#xff1b;而线程是进程中的…...

MySQL JOIN 表过多的优化思路

当 MySQL 查询涉及大量表 JOIN 时&#xff0c;性能会显著下降。以下是优化思路和简易实现方法&#xff1a; 一、核心优化思路 减少 JOIN 数量 数据冗余&#xff1a;添加必要的冗余字段&#xff08;如订单表直接存储用户名&#xff09;合并表&#xff1a;将频繁关联的小表合并成…...