当前位置: 首页 > news >正文

SpringBoot 集成 Canal 基于 MySQL 做数据同步

一、canal 组件关系

下载地址:https://github.com/alibaba/canal/releases/download/canal-1.1.7/

这里面主要的有两个 canal.deployer-1.1.7.tar.gz 和 canal.adapter-1.1.7.tar.gz,canal.admin-1.1.7.tar.gz 是一个监控服务,可选;

这里说一下 deployer 和 adapter 的关系,deployer 主要是监控源数据的数据变更,也是就所有的 insert、update、delete,

只要数据有变化就通知 adapter ,所以真正负责往目标库写数据的是 adapter 。

二、canal-deployer 配置说明

建议先新建一个文件夹 deployer ,然后把上面下载的压缩包拷进去在解压;

修改 /conf/example/instance.properties,这里只贴出了要修改的地方

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=123456

这个服务个人理解是一个服务端,程序的客户端会连接他,他监听到数据变化再转发给 adapter

三、canal-adapter 配置说明

建议先新建一个文件夹 adapter ,然后把上面下载的压缩包拷进去在解压;

这里有两个地方要修改,以 mysql 数据同步为例

/conf/application.yml

server:port: 8081
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8default-property-inclusion: non_null
​
canal.conf:mode: tcp #tcp kafka rocketMQ rabbitMQflatMessage: truezookeeperHosts:syncBatchSize: 1000retries: -1timeout:accessKey:secretKey:consumerProperties:# canal tcp consumercanal.tcp.server.host: 127.0.0.1:11111canal.tcp.zookeeper.hosts:canal.tcp.batch.size: 500canal.tcp.username:canal.tcp.password:# kafka consumerkafka.bootstrap.servers: 127.0.0.1:9092kafka.enable.auto.commit: falsekafka.auto.commit.interval.ms: 1000kafka.auto.offset.reset: latestkafka.request.timeout.ms: 40000kafka.session.timeout.ms: 30000kafka.isolation.level: read_committedkafka.max.poll.records: 1000# rocketMQ consumerrocketmq.namespace:rocketmq.namesrv.addr: 127.0.0.1:9876rocketmq.batch.size: 1000rocketmq.enable.message.trace: falserocketmq.customized.trace.topic:rocketmq.access.channel:rocketmq.subscribe.filter:# rabbitMQ consumerrabbitmq.host:rabbitmq.virtual.host:rabbitmq.username:rabbitmq.password:rabbitmq.resource.ownerId:
​srcDataSources:defaultDS:url: jdbc:mysql://localhost:3306/hebeiqx?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&verifyServerCertificate=false&useSSL=falseusername: rootpassword: 123456canalAdapters:- instance: example # canal instance Name or mq topic namegroups:- groupId: g1outerAdapters:- name: logger- name: rdbkey: mysql1properties:jdbc.driverClassName: com.mysql.jdbc.Driverjdbc.url: jdbc:mysql://localhost:3306/weather?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&verifyServerCertificate=false&useSSL=falsejdbc.username: rootjdbc.password: 123456

其实这里也没什么改的,srcDataSources 源数据库连接信息,canalAdapters 下面的目标数据库的连接信息,canalAdapters 下面一个实例就是一个 topic

/conf/rdb/mytest_user.yml 这个文件的配置比较奇葩,大概有三种场景

1、单表同步,targetTable 后面直接写目标库表名,这个版本不需要写目标库的名称
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:database: testtable: table1targetTable: table1targetPk:id: idmapAll: truecommitBatch: 7000
2、整个数据库同步,但是有个要求是两个数据库的名字要一致,而且是必须(有疑问?看看3就解决了)
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:mirrorDb: truedatabase: mytest
3、多表同步,网上的案例都是单表的demo,目前还没有看到我这种方式

上面的1里面同步了 table1 这张表,那现在还要同步 table2 这种表怎么办,你是不是以为是这样:

dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:- database: testtable: table1targetTable: table1targetPk:id: idmapAll: truecommitBatch: 7000- database: testtable: table2targetTable: table2targetPk:id: idmapAll: truecommitBatch: 7000

上面这种方式启动就直接报错了,网上找了一天也没看到相关资料......

重点:把 mytest_user.yml 复制一份,再里面再配置另一张表就可以了,很脑残但是真管用;

注意这里所有文件的名字都是 xxx_user.yml 这种格式,内容就跟 1 里面的一样,把表名改一下就行;

四、SpringBoot 集成

添加 maven 依赖

<!--canal-->
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.5</version><!-- 去掉否则启动报错 --><exclusions><exclusion><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId></exclusion></exclusions>
</dependency>
<!-- Message、CanalEntry.Entry等来自此安装包 -->
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.5</version>
</dependency>
<dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>3.17.3</version>
</dependency>

客户端连接代码,都是模板代码之间用就行,printEnity 和 printColumn 这俩方法没有也行

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
​
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Component;
​
import javax.annotation.PostConstruct;
​
@Component
public class CanalClient {
​private Logger logger = LoggerFactory.getLogger(CanalClient.class);
​private static final ThreadFactory springThreadFactory = new CustomizableThreadFactory("canal-pool-");
​private static final ExecutorService executors = Executors.newFixedThreadPool(1, springThreadFactory);
​@Autowiredprivate CanalInstanceProperties canalInstanceProperties;
​@PostConstructprivate void startListening() {canalInstanceProperties.getInstance().forEach(instanceName -> {executors.submit(() -> {connector(instanceName);});});}
​/*** 消费canal的线程池*/public void connector(String instanceName) {CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), instanceName, "", "");
​try {// 打开连接connector.connect();// 订阅所有消息//connector.subscribe(".*\\..*");// 只订阅test1数据库下的所有表connector.subscribe("hebeiqx.*");// 恢复到之前同步的那个位置connector.rollback();
​int batchSize = 1000;for (; ; ) {// 获取指定数量的数据,但是不做确认标记,下一次取还会取到这些信息。 注:不会阻塞,若不够100,则有多少返回多少Message message = connector.getWithoutAck(batchSize);// 获取消息idlong batchId = message.getId();// 获取批量的数量int size = message.getEntries().size();if (size == 0 || batchId == -1) {//logger.info("暂无数据......");try {// 没有数据就等待1秒Thread.sleep(1000);} catch (InterruptedException ignored) {}}if (batchId != -1) {logger.info("数据同步监听中......");logger.info("instance -> {}, msgId -> {}", instanceName, batchId);// 数据处理//printEnity(message.getEntries());// 提交确认connector.ack(batchId);// 处理失败,回滚数据connector.rollback(batchId);}}} catch (Exception e) {e.printStackTrace();} finally {connector.disconnect();}}
​private void printEnity(List<CanalEntry.Entry> entries) {for (CanalEntry.Entry entry : entries) {// 开启/关闭事务的实体类型,跳过if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}
​// RowChange对象,包含了一行数据变化的所有特征// 比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等CanalEntry.RowChange rowChange = null;
​try {// 序列化数据rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {e.printStackTrace();}
​assert rowChange != null;
​// 获取操作类型:insert/update/delete类型CanalEntry.EventType eventType = rowChange.getEventType();
​// 打印Header信息logger.info(String.format("================>; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));
​// 判断是否是DDL语句if (rowChange.getEventType() == CanalEntry.EventType.QUERY || rowChange.getIsDdl()) {logger.info("sql ------------>{}", rowChange.getSql());}
​// 获取RowChange对象里的每一行数据,打印出来for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {switch (rowChange.getEventType()) {// 如果希望监听多种事件,可以手动增加casecase UPDATE:printColumn(rowData.getAfterColumnsList());printColumn(rowData.getBeforeColumnsList());break;case INSERT:printColumn(rowData.getAfterColumnsList());break;case DELETE:printColumn(rowData.getBeforeColumnsList());break;default:}}}}
​private void printColumn(List<CanalEntry.Column> columns) {StringBuilder sb = new StringBuilder();for (CanalEntry.Column column : columns) {sb.append("[");sb.append(column.getName()).append(" : ").append(column.getValue()).append("update=").append(column.getUpdated());sb.append("]");sb.append("    ");}logger.info(sb.toString());}
}

相关文章:

SpringBoot 集成 Canal 基于 MySQL 做数据同步

一、canal 组件关系 下载地址&#xff1a;https://github.com/alibaba/canal/releases/download/canal-1.1.7/ 这里面主要的有两个 canal.deployer-1.1.7.tar.gz 和 canal.adapter-1.1.7.tar.gz&#xff0c;canal.admin-1.1.7.tar.gz 是一个监控服务&#xff0c;可选&#xf…...

【CVE-2022-22733漏洞复现】

Apache ShardingSphere ElasticJob-UI漏洞 漏洞编号:CVE-2022-22733 文档说明 本文作者:SwBack 创作时间:2024/1/21 19:19:19 知乎:https://www.zhihu.com/people/back-88-87 CSDN:https://blog.csdn.net/qq_30817059 百度搜索: SwBack漏洞描述 Apache ShardingSphere Elast…...

Python爬虫---scrapy框架---当当网管道封装

项目结构&#xff1a; dang.py文件&#xff1a;自己创建&#xff0c;实现爬虫核心功能的文件 import scrapy from scrapy_dangdang_20240113.items import ScrapyDangdang20240113Itemclass DangSpider(scrapy.Spider):name "dang" # 名字# 如果是多页下载的话, …...

【机器学习】机器学习四大类第01课

一、机器学习四大类 有监督学习 (Supervised Learning) 有监督学习是通过已知的输入-输出对&#xff08;即标记过的训练数据&#xff09;来学习函数关系的过程。在训练阶段&#xff0c;模型会根据这些示例调整参数以尽可能准确地预测新的、未见过的数据点的输出。 实例&#x…...

下述默认构造函数有什么问题?

12.4 // points to string allocated by new // holds length of string 独立的、相同的数据,而不会重叠。由于同样的原因,必须定义赋值操作符。对于每一种情况,最终目的 都是执行深度复制,也就是说,复制实际的数据,而不仅仅是复制指向数据的指针。 对象的存储持续性为自动或…...

vite和mockjs配合使用

vite mockjs 当后端还没准备完成之前&#xff0c;前端可以使用 mock 模拟后端响应&#xff0c;提高开发效率 1、安装插件 使用 vite-plugin-mock 插件&#xff0c;配合mockjs完成项目的 mock 配置 npm install mockjs vite-plugin-mock2、vite配置插件 在 vite.config.js…...

【数据结构】常见八大排序算法总结

目录 前言 1.直接插入排序 2.希尔排序 3.选择排序 4.堆排序 5.冒泡排序 6.快速排序 6.1Hoare版本 6.2挖坑法 6.3前后指针法 6.4快速排序的递归实现 6.5快速排序的非递归实现 7.归并排序 8.计数排序&#xff08;非比较排序&#xff09; 9.补充:基数排序 10.总结…...

系统学英语 — 句法 — 常规句型

目录 文章目录 目录5 大基本句型复合句型主语从句宾语从句表语从句定语从句状语从句同位语从句补语从句 谓语句型 5 大基本句型 主谓&#xff1a;主语发出一个动作&#xff0c;例如&#xff1a;He cried.主谓宾&#xff1a;we study English.主系表&#xff1a;主语具有某些特…...

Github操作网络异常笔记

Github操作网络异常笔记 1. 源由2. 解决2.1 方案一2.2 方案二 3. 总结 1. 源由 开源技术在国内永远是“蛋疼”&#xff0c;这些"政治"问题对于追求技术的我们&#xff0c;形成无法回避的障碍。 $ git pull ssh: connect to host github.com port 22: Connection ti…...

Vue3新特性defineModel()便捷的双向绑定数据

官网介绍 传送门 配置 要求&#xff1a; 版本&#xff1a; vue > 3.4(必须&#xff01;&#xff01;&#xff01;)配置&#xff1a;vite.config.js 使用场景和案例 使用场景&#xff1a;父子组件的数据双向绑定&#xff0c;不用emit和props的繁重代码 具体案例 代码实…...

vue列表飞入效果

效果 实现代码 <template><div><button click"add">添加</button><TransitionGroup name"list" tag"ul"><div class"list-item" v-for"item in items" :key"item.id">{{ i…...

C语言·预处理详解

1. 预定义符号 C语言设置了一些预定义符号&#xff0c;可以直接使用&#xff0c;预定义符号也是在预处理期间处理的 __FILE__ 进行编译的源文件 __LINE__ 文件当前的行号 __DATE__ 文件被编译的日期 __TIME__ 文件被编译的时间 __STDC__ 如果编译器遵循ANSI C&#xff0c;…...

服务器与普通电脑的区别,普通电脑可以当作服务器用吗?

服务器在我们日常应用中非常常见&#xff0c;手机APP、手机游戏、PC游戏、小程序、网站等等都需要部署在服务器上&#xff0c;为我们提供各种计算、应用服务。服务器也是计算机的一种&#xff0c;虽然内部结构相差不大&#xff0c;但是服务器的运行速度更快、负载更高、成本更高…...

数字身份所有权:Web3时代用户数据的掌控权

随着Web3时代的来临&#xff0c;数字身份的概念正焕发出崭新的光芒。在这个数字化的时代&#xff0c;用户的个人数据变得愈加珍贵&#xff0c;而Web3则为用户带来了数字身份所有权的概念&#xff0c;重新定义了用户与个人数据之间的关系。本文将深入探讨Web3时代用户数据的掌控…...

python爬虫如何写,有哪些成功爬取的案例

编写Python爬虫时&#xff0c;常用的库包括Requests、Beautiful Soup和Scrapy。以下是三个简单的Python爬虫案例&#xff0c;分别使用Requests和Beautiful Soup&#xff0c;以及Scrapy。 1. 使用Requests和Beautiful Soup爬取网页内容&#xff1a; import requests from bs4 …...

PLC物联网网关BL104实现PLC协议转MQTT、OPC UA、Modbus TCP

随着物联网技术的迅猛发展&#xff0c;人们深刻认识到在智能化生产和生活中&#xff0c;实时、可靠、安全的数据传输至关重要。在此背景下&#xff0c;高性能的物联网数据传输解决方案——协议转换网关应运而生&#xff0c;广泛应用于工业自动化和数字化工厂应用环境中。 无缝衔…...

explain工具优化mysql需要达到什么级别?

explain工具优化mysql需要达到什么级别&#xff1f; 一、explain工具是什么&#xff1f;二、explain查询后各字段的含义三、explain查询后type字段有哪些类型&#xff1f;四、type类型需要优化到哪个阶段&#xff1f; 一、explain工具是什么&#xff1f; explain是什么&#x…...

RHCE作业

架设一台NFS服务器&#xff0c;并按照以下要求配置 1、开放/nfs/shared目录&#xff0c;供所有用户查询资料 2、开放/nfs/upload目录&#xff0c;为192.168.xxx.0/24网段主机可以上传目录&#xff0c;并将所有用户及所属的组映射为nfs-upload,其UID和GID均为210 3、将/home/to…...

在Java中调企微机器人发送消息到群里

目录 如何使用群机器人 消息类型及数据格式 文本类型 markdown类型 图片类型 图文类型 文件类型 模版卡片类型 文本通知模版卡片 图文展示模版卡片 消息发送频率限制 文件上传接口 Java 执行语句 String url "webhook的Url"; String result HttpReque…...

鸿蒙开发(四)UIAbility和Page交互

通过上一篇的学习&#xff0c;相信大家对UIAbility已经有了初步的认知。在上篇中&#xff0c;我们最后实现了一个小demo&#xff0c;从一个UIAbility调起了另外一个UIAbility。当时我提到过&#xff0c;暂不实现比如点击EntryAbility中的控件去触发跳转&#xff0c;而是在Entry…...

Qwen3-ForcedAligner-0.6B在语音克隆中的应用:精准音素对齐技术

Qwen3-ForcedAligner-0.6B在语音克隆中的应用&#xff1a;精准音素对齐技术 1. 引言 你有没有遇到过这样的情况&#xff1a;用语音克隆技术生成的声音&#xff0c;听起来总感觉哪里不对劲&#xff1f;可能是某个字的发音时长不对&#xff0c;或者是词语之间的停顿不自然。这些…...

零基础实战:揭秘Python漫画下载器高效收藏完整指南

零基础实战&#xff1a;揭秘Python漫画下载器高效收藏完整指南 【免费下载链接】copymanga-downloader 使用python编译exe/bash/命令行参数来下载copymanga(拷贝漫画)中的漫画&#xff0c;支持批量选话下载和获取您收藏的漫画并下载&#xff01;(windows&linux支持&#xf…...

大数据-253 离线数仓 - Airflow 入门与任务调度实战:DAG、Operator、Executor 部署排错指南

TL;DR 场景&#xff1a;面向离线数仓与定时任务场景&#xff0c;快速理解 Airflow 的核心概念、DAG 编排方式与基础命令。结论&#xff1a;本文内容适合作为 Airflow 入门示例&#xff0c;但代码与命令明显偏旧&#xff0c;需区分 Airflow 1.x 与 2.x 版本差异。产出&#xff…...

Godep依赖自动发现机制:Go项目依赖管理的终极指南

Godep依赖自动发现机制&#xff1a;Go项目依赖管理的终极指南 【免费下载链接】godep dependency tool for go 项目地址: https://gitcode.com/gh_mirrors/go/godep Godep作为Go语言早期经典的依赖管理工具&#xff0c;通过自动发现与追踪项目依赖&#xff0c;为Go开发者…...

XUnity.AutoTranslator:如何为Unity游戏构建高效的多语言本地化系统

XUnity.AutoTranslator&#xff1a;如何为Unity游戏构建高效的多语言本地化系统 【免费下载链接】XUnity.AutoTranslator 项目地址: https://gitcode.com/gh_mirrors/xu/XUnity.AutoTranslator XUnity.AutoTranslator是一个专为Unity游戏设计的自动翻译插件&#xff0c…...

基于Flowable全局监听器实现智能节点跳过:告别重复审批

1. 为什么需要智能跳过重复审批节点&#xff1f; 想象一下这样的场景&#xff1a;你设计了一个采购审批流程&#xff0c;部门经理需要先后审批"采购申请"和"采购确认"两个节点。但当这两个节点都分配给同一位经理时&#xff0c;他会在系统里看到两个完全相…...

一天一个开源项目(第59篇):Dream Recorder - 用 AI 把梦境变成视频的物理设备

引言 “Record your dreams. Wake up. Speak. Watch them come to life.” 这是「一天一个开源项目」系列的第 59 篇文章。今天介绍的项目是 Dream Recorder&#xff08;GitHub&#xff09;。 想把梦境变成可回放的视频&#xff1f;Dream Recorder 是 Modem 开源的物理梦境记录…...

RRFLibraries:Duet 3D打印机固件的硬实时C++驱动库

1. RRFLibraries 项目概述RRFLibraries 是 RepRapFirmware 生态系统中高度工程化的底层软件基础设施&#xff0c;其定位并非通用型嵌入式库&#xff0c;而是专为 3D 打印固件——特别是 Duet 系列控制器&#xff08;Duet 2 WiFi、Duet 3 Mainboard、Duet 3 Mini&#xff09;——…...

ILI9341 TFT驱动库:裸机SPI显示驱动设计与优化

1. SPI_TFT_ILI9341 库概述SPI_TFT_ILI9341 是一个面向嵌入式平台的轻量级图形驱动库&#xff0c;专为基于 ILI9341 显示控制器的 2.4 英寸、240320 分辨率 SPI 接口 TFT-LCD 模块设计。该库不依赖操作系统&#xff0c;可直接运行于裸机环境&#xff08;Bare Metal&#xff09;…...

高效图像浏览:解锁90+格式的轻量级解决方案

高效图像浏览&#xff1a;解锁90格式的轻量级解决方案 【免费下载链接】ImageGlass &#x1f3de; A lightweight, versatile image viewer 项目地址: https://gitcode.com/gh_mirrors/im/ImageGlass 在数字时代&#xff0c;我们每天都要与各种图像格式打交道&#xff0…...