当前位置: 首页 > news >正文

SpringBoot 集成 Canal 基于 MySQL 做数据同步

一、canal 组件关系

下载地址:https://github.com/alibaba/canal/releases/download/canal-1.1.7/

这里面主要的有两个 canal.deployer-1.1.7.tar.gz 和 canal.adapter-1.1.7.tar.gz,canal.admin-1.1.7.tar.gz 是一个监控服务,可选;

这里说一下 deployer 和 adapter 的关系,deployer 主要是监控源数据的数据变更,也是就所有的 insert、update、delete,

只要数据有变化就通知 adapter ,所以真正负责往目标库写数据的是 adapter 。

二、canal-deployer 配置说明

建议先新建一个文件夹 deployer ,然后把上面下载的压缩包拷进去在解压;

修改 /conf/example/instance.properties,这里只贴出了要修改的地方

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=123456

这个服务个人理解是一个服务端,程序的客户端会连接他,他监听到数据变化再转发给 adapter

三、canal-adapter 配置说明

建议先新建一个文件夹 adapter ,然后把上面下载的压缩包拷进去在解压;

这里有两个地方要修改,以 mysql 数据同步为例

/conf/application.yml

server:port: 8081
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8default-property-inclusion: non_null
​
canal.conf:mode: tcp #tcp kafka rocketMQ rabbitMQflatMessage: truezookeeperHosts:syncBatchSize: 1000retries: -1timeout:accessKey:secretKey:consumerProperties:# canal tcp consumercanal.tcp.server.host: 127.0.0.1:11111canal.tcp.zookeeper.hosts:canal.tcp.batch.size: 500canal.tcp.username:canal.tcp.password:# kafka consumerkafka.bootstrap.servers: 127.0.0.1:9092kafka.enable.auto.commit: falsekafka.auto.commit.interval.ms: 1000kafka.auto.offset.reset: latestkafka.request.timeout.ms: 40000kafka.session.timeout.ms: 30000kafka.isolation.level: read_committedkafka.max.poll.records: 1000# rocketMQ consumerrocketmq.namespace:rocketmq.namesrv.addr: 127.0.0.1:9876rocketmq.batch.size: 1000rocketmq.enable.message.trace: falserocketmq.customized.trace.topic:rocketmq.access.channel:rocketmq.subscribe.filter:# rabbitMQ consumerrabbitmq.host:rabbitmq.virtual.host:rabbitmq.username:rabbitmq.password:rabbitmq.resource.ownerId:
​srcDataSources:defaultDS:url: jdbc:mysql://localhost:3306/hebeiqx?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&verifyServerCertificate=false&useSSL=falseusername: rootpassword: 123456canalAdapters:- instance: example # canal instance Name or mq topic namegroups:- groupId: g1outerAdapters:- name: logger- name: rdbkey: mysql1properties:jdbc.driverClassName: com.mysql.jdbc.Driverjdbc.url: jdbc:mysql://localhost:3306/weather?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&verifyServerCertificate=false&useSSL=falsejdbc.username: rootjdbc.password: 123456

其实这里也没什么改的,srcDataSources 源数据库连接信息,canalAdapters 下面的目标数据库的连接信息,canalAdapters 下面一个实例就是一个 topic

/conf/rdb/mytest_user.yml 这个文件的配置比较奇葩,大概有三种场景

1、单表同步,targetTable 后面直接写目标库表名,这个版本不需要写目标库的名称
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:database: testtable: table1targetTable: table1targetPk:id: idmapAll: truecommitBatch: 7000
2、整个数据库同步,但是有个要求是两个数据库的名字要一致,而且是必须(有疑问?看看3就解决了)
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:mirrorDb: truedatabase: mytest
3、多表同步,网上的案例都是单表的demo,目前还没有看到我这种方式

上面的1里面同步了 table1 这张表,那现在还要同步 table2 这种表怎么办,你是不是以为是这样:

dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:- database: testtable: table1targetTable: table1targetPk:id: idmapAll: truecommitBatch: 7000- database: testtable: table2targetTable: table2targetPk:id: idmapAll: truecommitBatch: 7000

上面这种方式启动就直接报错了,网上找了一天也没看到相关资料......

重点:把 mytest_user.yml 复制一份,再里面再配置另一张表就可以了,很脑残但是真管用;

注意这里所有文件的名字都是 xxx_user.yml 这种格式,内容就跟 1 里面的一样,把表名改一下就行;

四、SpringBoot 集成

添加 maven 依赖

<!--canal-->
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.5</version><!-- 去掉否则启动报错 --><exclusions><exclusion><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId></exclusion></exclusions>
</dependency>
<!-- Message、CanalEntry.Entry等来自此安装包 -->
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.5</version>
</dependency>
<dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>3.17.3</version>
</dependency>

客户端连接代码,都是模板代码之间用就行,printEnity 和 printColumn 这俩方法没有也行

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
​
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Component;
​
import javax.annotation.PostConstruct;
​
@Component
public class CanalClient {
​private Logger logger = LoggerFactory.getLogger(CanalClient.class);
​private static final ThreadFactory springThreadFactory = new CustomizableThreadFactory("canal-pool-");
​private static final ExecutorService executors = Executors.newFixedThreadPool(1, springThreadFactory);
​@Autowiredprivate CanalInstanceProperties canalInstanceProperties;
​@PostConstructprivate void startListening() {canalInstanceProperties.getInstance().forEach(instanceName -> {executors.submit(() -> {connector(instanceName);});});}
​/*** 消费canal的线程池*/public void connector(String instanceName) {CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), instanceName, "", "");
​try {// 打开连接connector.connect();// 订阅所有消息//connector.subscribe(".*\\..*");// 只订阅test1数据库下的所有表connector.subscribe("hebeiqx.*");// 恢复到之前同步的那个位置connector.rollback();
​int batchSize = 1000;for (; ; ) {// 获取指定数量的数据,但是不做确认标记,下一次取还会取到这些信息。 注:不会阻塞,若不够100,则有多少返回多少Message message = connector.getWithoutAck(batchSize);// 获取消息idlong batchId = message.getId();// 获取批量的数量int size = message.getEntries().size();if (size == 0 || batchId == -1) {//logger.info("暂无数据......");try {// 没有数据就等待1秒Thread.sleep(1000);} catch (InterruptedException ignored) {}}if (batchId != -1) {logger.info("数据同步监听中......");logger.info("instance -> {}, msgId -> {}", instanceName, batchId);// 数据处理//printEnity(message.getEntries());// 提交确认connector.ack(batchId);// 处理失败,回滚数据connector.rollback(batchId);}}} catch (Exception e) {e.printStackTrace();} finally {connector.disconnect();}}
​private void printEnity(List<CanalEntry.Entry> entries) {for (CanalEntry.Entry entry : entries) {// 开启/关闭事务的实体类型,跳过if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}
​// RowChange对象,包含了一行数据变化的所有特征// 比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等CanalEntry.RowChange rowChange = null;
​try {// 序列化数据rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {e.printStackTrace();}
​assert rowChange != null;
​// 获取操作类型:insert/update/delete类型CanalEntry.EventType eventType = rowChange.getEventType();
​// 打印Header信息logger.info(String.format("================>; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));
​// 判断是否是DDL语句if (rowChange.getEventType() == CanalEntry.EventType.QUERY || rowChange.getIsDdl()) {logger.info("sql ------------>{}", rowChange.getSql());}
​// 获取RowChange对象里的每一行数据,打印出来for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {switch (rowChange.getEventType()) {// 如果希望监听多种事件,可以手动增加casecase UPDATE:printColumn(rowData.getAfterColumnsList());printColumn(rowData.getBeforeColumnsList());break;case INSERT:printColumn(rowData.getAfterColumnsList());break;case DELETE:printColumn(rowData.getBeforeColumnsList());break;default:}}}}
​private void printColumn(List<CanalEntry.Column> columns) {StringBuilder sb = new StringBuilder();for (CanalEntry.Column column : columns) {sb.append("[");sb.append(column.getName()).append(" : ").append(column.getValue()).append("update=").append(column.getUpdated());sb.append("]");sb.append("    ");}logger.info(sb.toString());}
}

相关文章:

SpringBoot 集成 Canal 基于 MySQL 做数据同步

一、canal 组件关系 下载地址&#xff1a;https://github.com/alibaba/canal/releases/download/canal-1.1.7/ 这里面主要的有两个 canal.deployer-1.1.7.tar.gz 和 canal.adapter-1.1.7.tar.gz&#xff0c;canal.admin-1.1.7.tar.gz 是一个监控服务&#xff0c;可选&#xf…...

【CVE-2022-22733漏洞复现】

Apache ShardingSphere ElasticJob-UI漏洞 漏洞编号:CVE-2022-22733 文档说明 本文作者:SwBack 创作时间:2024/1/21 19:19:19 知乎:https://www.zhihu.com/people/back-88-87 CSDN:https://blog.csdn.net/qq_30817059 百度搜索: SwBack漏洞描述 Apache ShardingSphere Elast…...

Python爬虫---scrapy框架---当当网管道封装

项目结构&#xff1a; dang.py文件&#xff1a;自己创建&#xff0c;实现爬虫核心功能的文件 import scrapy from scrapy_dangdang_20240113.items import ScrapyDangdang20240113Itemclass DangSpider(scrapy.Spider):name "dang" # 名字# 如果是多页下载的话, …...

【机器学习】机器学习四大类第01课

一、机器学习四大类 有监督学习 (Supervised Learning) 有监督学习是通过已知的输入-输出对&#xff08;即标记过的训练数据&#xff09;来学习函数关系的过程。在训练阶段&#xff0c;模型会根据这些示例调整参数以尽可能准确地预测新的、未见过的数据点的输出。 实例&#x…...

下述默认构造函数有什么问题?

12.4 // points to string allocated by new // holds length of string 独立的、相同的数据,而不会重叠。由于同样的原因,必须定义赋值操作符。对于每一种情况,最终目的 都是执行深度复制,也就是说,复制实际的数据,而不仅仅是复制指向数据的指针。 对象的存储持续性为自动或…...

vite和mockjs配合使用

vite mockjs 当后端还没准备完成之前&#xff0c;前端可以使用 mock 模拟后端响应&#xff0c;提高开发效率 1、安装插件 使用 vite-plugin-mock 插件&#xff0c;配合mockjs完成项目的 mock 配置 npm install mockjs vite-plugin-mock2、vite配置插件 在 vite.config.js…...

【数据结构】常见八大排序算法总结

目录 前言 1.直接插入排序 2.希尔排序 3.选择排序 4.堆排序 5.冒泡排序 6.快速排序 6.1Hoare版本 6.2挖坑法 6.3前后指针法 6.4快速排序的递归实现 6.5快速排序的非递归实现 7.归并排序 8.计数排序&#xff08;非比较排序&#xff09; 9.补充:基数排序 10.总结…...

系统学英语 — 句法 — 常规句型

目录 文章目录 目录5 大基本句型复合句型主语从句宾语从句表语从句定语从句状语从句同位语从句补语从句 谓语句型 5 大基本句型 主谓&#xff1a;主语发出一个动作&#xff0c;例如&#xff1a;He cried.主谓宾&#xff1a;we study English.主系表&#xff1a;主语具有某些特…...

Github操作网络异常笔记

Github操作网络异常笔记 1. 源由2. 解决2.1 方案一2.2 方案二 3. 总结 1. 源由 开源技术在国内永远是“蛋疼”&#xff0c;这些"政治"问题对于追求技术的我们&#xff0c;形成无法回避的障碍。 $ git pull ssh: connect to host github.com port 22: Connection ti…...

Vue3新特性defineModel()便捷的双向绑定数据

官网介绍 传送门 配置 要求&#xff1a; 版本&#xff1a; vue > 3.4(必须&#xff01;&#xff01;&#xff01;)配置&#xff1a;vite.config.js 使用场景和案例 使用场景&#xff1a;父子组件的数据双向绑定&#xff0c;不用emit和props的繁重代码 具体案例 代码实…...

vue列表飞入效果

效果 实现代码 <template><div><button click"add">添加</button><TransitionGroup name"list" tag"ul"><div class"list-item" v-for"item in items" :key"item.id">{{ i…...

C语言·预处理详解

1. 预定义符号 C语言设置了一些预定义符号&#xff0c;可以直接使用&#xff0c;预定义符号也是在预处理期间处理的 __FILE__ 进行编译的源文件 __LINE__ 文件当前的行号 __DATE__ 文件被编译的日期 __TIME__ 文件被编译的时间 __STDC__ 如果编译器遵循ANSI C&#xff0c;…...

服务器与普通电脑的区别,普通电脑可以当作服务器用吗?

服务器在我们日常应用中非常常见&#xff0c;手机APP、手机游戏、PC游戏、小程序、网站等等都需要部署在服务器上&#xff0c;为我们提供各种计算、应用服务。服务器也是计算机的一种&#xff0c;虽然内部结构相差不大&#xff0c;但是服务器的运行速度更快、负载更高、成本更高…...

数字身份所有权:Web3时代用户数据的掌控权

随着Web3时代的来临&#xff0c;数字身份的概念正焕发出崭新的光芒。在这个数字化的时代&#xff0c;用户的个人数据变得愈加珍贵&#xff0c;而Web3则为用户带来了数字身份所有权的概念&#xff0c;重新定义了用户与个人数据之间的关系。本文将深入探讨Web3时代用户数据的掌控…...

python爬虫如何写,有哪些成功爬取的案例

编写Python爬虫时&#xff0c;常用的库包括Requests、Beautiful Soup和Scrapy。以下是三个简单的Python爬虫案例&#xff0c;分别使用Requests和Beautiful Soup&#xff0c;以及Scrapy。 1. 使用Requests和Beautiful Soup爬取网页内容&#xff1a; import requests from bs4 …...

PLC物联网网关BL104实现PLC协议转MQTT、OPC UA、Modbus TCP

随着物联网技术的迅猛发展&#xff0c;人们深刻认识到在智能化生产和生活中&#xff0c;实时、可靠、安全的数据传输至关重要。在此背景下&#xff0c;高性能的物联网数据传输解决方案——协议转换网关应运而生&#xff0c;广泛应用于工业自动化和数字化工厂应用环境中。 无缝衔…...

explain工具优化mysql需要达到什么级别?

explain工具优化mysql需要达到什么级别&#xff1f; 一、explain工具是什么&#xff1f;二、explain查询后各字段的含义三、explain查询后type字段有哪些类型&#xff1f;四、type类型需要优化到哪个阶段&#xff1f; 一、explain工具是什么&#xff1f; explain是什么&#x…...

RHCE作业

架设一台NFS服务器&#xff0c;并按照以下要求配置 1、开放/nfs/shared目录&#xff0c;供所有用户查询资料 2、开放/nfs/upload目录&#xff0c;为192.168.xxx.0/24网段主机可以上传目录&#xff0c;并将所有用户及所属的组映射为nfs-upload,其UID和GID均为210 3、将/home/to…...

在Java中调企微机器人发送消息到群里

目录 如何使用群机器人 消息类型及数据格式 文本类型 markdown类型 图片类型 图文类型 文件类型 模版卡片类型 文本通知模版卡片 图文展示模版卡片 消息发送频率限制 文件上传接口 Java 执行语句 String url "webhook的Url"; String result HttpReque…...

鸿蒙开发(四)UIAbility和Page交互

通过上一篇的学习&#xff0c;相信大家对UIAbility已经有了初步的认知。在上篇中&#xff0c;我们最后实现了一个小demo&#xff0c;从一个UIAbility调起了另外一个UIAbility。当时我提到过&#xff0c;暂不实现比如点击EntryAbility中的控件去触发跳转&#xff0c;而是在Entry…...

装饰模式(Decorator Pattern)重构java邮件发奖系统实战

前言 现在我们有个如下的需求&#xff0c;设计一个邮件发奖的小系统&#xff0c; 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式&#xff08;Decorator Pattern&#xff09;允许向一个现有的对象添加新的功能&#xff0c;同时又不改变其…...

进程地址空间(比特课总结)

一、进程地址空间 1. 环境变量 1 &#xff09;⽤户级环境变量与系统级环境变量 全局属性&#xff1a;环境变量具有全局属性&#xff0c;会被⼦进程继承。例如当bash启动⼦进程时&#xff0c;环 境变量会⾃动传递给⼦进程。 本地变量限制&#xff1a;本地变量只在当前进程(ba…...

css实现圆环展示百分比,根据值动态展示所占比例

代码如下 <view class""><view class"circle-chart"><view v-if"!!num" class"pie-item" :style"{background: conic-gradient(var(--one-color) 0%,#E9E6F1 ${num}%),}"></view><view v-else …...

el-switch文字内置

el-switch文字内置 效果 vue <div style"color:#ffffff;font-size:14px;float:left;margin-bottom:5px;margin-right:5px;">自动加载</div> <el-switch v-model"value" active-color"#3E99FB" inactive-color"#DCDFE6"…...

OkHttp 中实现断点续传 demo

在 OkHttp 中实现断点续传主要通过以下步骤完成&#xff0c;核心是利用 HTTP 协议的 Range 请求头指定下载范围&#xff1a; 实现原理 Range 请求头&#xff1a;向服务器请求文件的特定字节范围&#xff08;如 Range: bytes1024-&#xff09; 本地文件记录&#xff1a;保存已…...

ESP32 I2S音频总线学习笔记(四): INMP441采集音频并实时播放

简介 前面两期文章我们介绍了I2S的读取和写入&#xff0c;一个是通过INMP441麦克风模块采集音频&#xff0c;一个是通过PCM5102A模块播放音频&#xff0c;那如果我们将两者结合起来&#xff0c;将麦克风采集到的音频通过PCM5102A播放&#xff0c;是不是就可以做一个扩音器了呢…...

Nginx server_name 配置说明

Nginx 是一个高性能的反向代理和负载均衡服务器&#xff0c;其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机&#xff08;Virtual Host&#xff09;。 1. 简介 Nginx 使用 server_name 指令来确定…...

从零实现STL哈希容器:unordered_map/unordered_set封装详解

本篇文章是对C学习的STL哈希容器自主实现部分的学习分享 希望也能为你带来些帮助~ 那咱们废话不多说&#xff0c;直接开始吧&#xff01; 一、源码结构分析 1. SGISTL30实现剖析 // hash_set核心结构 template <class Value, class HashFcn, ...> class hash_set {ty…...

微信小程序云开发平台MySQL的连接方式

注&#xff1a;微信小程序云开发平台指的是腾讯云开发 先给结论&#xff1a;微信小程序云开发平台的MySQL&#xff0c;无法通过获取数据库连接信息的方式进行连接&#xff0c;连接只能通过云开发的SDK连接&#xff0c;具体要参考官方文档&#xff1a; 为什么&#xff1f; 因为…...

成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战

在现代战争中&#xff0c;电磁频谱已成为继陆、海、空、天之后的 “第五维战场”&#xff0c;雷达作为电磁频谱领域的关键装备&#xff0c;其干扰与抗干扰能力的较量&#xff0c;直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器&#xff0c;凭借数字射…...