当前位置: 首页 > news >正文

SpringBoot 集成 Canal 基于 MySQL 做数据同步

一、canal 组件关系

下载地址:https://github.com/alibaba/canal/releases/download/canal-1.1.7/

这里面主要的有两个 canal.deployer-1.1.7.tar.gz 和 canal.adapter-1.1.7.tar.gz,canal.admin-1.1.7.tar.gz 是一个监控服务,可选;

这里说一下 deployer 和 adapter 的关系,deployer 主要是监控源数据的数据变更,也是就所有的 insert、update、delete,

只要数据有变化就通知 adapter ,所以真正负责往目标库写数据的是 adapter 。

二、canal-deployer 配置说明

建议先新建一个文件夹 deployer ,然后把上面下载的压缩包拷进去在解压;

修改 /conf/example/instance.properties,这里只贴出了要修改的地方

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=123456

这个服务个人理解是一个服务端,程序的客户端会连接他,他监听到数据变化再转发给 adapter

三、canal-adapter 配置说明

建议先新建一个文件夹 adapter ,然后把上面下载的压缩包拷进去在解压;

这里有两个地方要修改,以 mysql 数据同步为例

/conf/application.yml

server:port: 8081
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8default-property-inclusion: non_null
​
canal.conf:mode: tcp #tcp kafka rocketMQ rabbitMQflatMessage: truezookeeperHosts:syncBatchSize: 1000retries: -1timeout:accessKey:secretKey:consumerProperties:# canal tcp consumercanal.tcp.server.host: 127.0.0.1:11111canal.tcp.zookeeper.hosts:canal.tcp.batch.size: 500canal.tcp.username:canal.tcp.password:# kafka consumerkafka.bootstrap.servers: 127.0.0.1:9092kafka.enable.auto.commit: falsekafka.auto.commit.interval.ms: 1000kafka.auto.offset.reset: latestkafka.request.timeout.ms: 40000kafka.session.timeout.ms: 30000kafka.isolation.level: read_committedkafka.max.poll.records: 1000# rocketMQ consumerrocketmq.namespace:rocketmq.namesrv.addr: 127.0.0.1:9876rocketmq.batch.size: 1000rocketmq.enable.message.trace: falserocketmq.customized.trace.topic:rocketmq.access.channel:rocketmq.subscribe.filter:# rabbitMQ consumerrabbitmq.host:rabbitmq.virtual.host:rabbitmq.username:rabbitmq.password:rabbitmq.resource.ownerId:
​srcDataSources:defaultDS:url: jdbc:mysql://localhost:3306/hebeiqx?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&verifyServerCertificate=false&useSSL=falseusername: rootpassword: 123456canalAdapters:- instance: example # canal instance Name or mq topic namegroups:- groupId: g1outerAdapters:- name: logger- name: rdbkey: mysql1properties:jdbc.driverClassName: com.mysql.jdbc.Driverjdbc.url: jdbc:mysql://localhost:3306/weather?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true&verifyServerCertificate=false&useSSL=falsejdbc.username: rootjdbc.password: 123456

其实这里也没什么改的,srcDataSources 源数据库连接信息,canalAdapters 下面的目标数据库的连接信息,canalAdapters 下面一个实例就是一个 topic

/conf/rdb/mytest_user.yml 这个文件的配置比较奇葩,大概有三种场景

1、单表同步,targetTable 后面直接写目标库表名,这个版本不需要写目标库的名称
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:database: testtable: table1targetTable: table1targetPk:id: idmapAll: truecommitBatch: 7000
2、整个数据库同步,但是有个要求是两个数据库的名字要一致,而且是必须(有疑问?看看3就解决了)
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:mirrorDb: truedatabase: mytest
3、多表同步,网上的案例都是单表的demo,目前还没有看到我这种方式

上面的1里面同步了 table1 这张表,那现在还要同步 table2 这种表怎么办,你是不是以为是这样:

dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:- database: testtable: table1targetTable: table1targetPk:id: idmapAll: truecommitBatch: 7000- database: testtable: table2targetTable: table2targetPk:id: idmapAll: truecommitBatch: 7000

上面这种方式启动就直接报错了,网上找了一天也没看到相关资料......

重点:把 mytest_user.yml 复制一份,再里面再配置另一张表就可以了,很脑残但是真管用;

注意这里所有文件的名字都是 xxx_user.yml 这种格式,内容就跟 1 里面的一样,把表名改一下就行;

四、SpringBoot 集成

添加 maven 依赖

<!--canal-->
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.5</version><!-- 去掉否则启动报错 --><exclusions><exclusion><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId></exclusion></exclusions>
</dependency>
<!-- Message、CanalEntry.Entry等来自此安装包 -->
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.5</version>
</dependency>
<dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>3.17.3</version>
</dependency>

客户端连接代码,都是模板代码之间用就行,printEnity 和 printColumn 这俩方法没有也行

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
​
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Component;
​
import javax.annotation.PostConstruct;
​
@Component
public class CanalClient {
​private Logger logger = LoggerFactory.getLogger(CanalClient.class);
​private static final ThreadFactory springThreadFactory = new CustomizableThreadFactory("canal-pool-");
​private static final ExecutorService executors = Executors.newFixedThreadPool(1, springThreadFactory);
​@Autowiredprivate CanalInstanceProperties canalInstanceProperties;
​@PostConstructprivate void startListening() {canalInstanceProperties.getInstance().forEach(instanceName -> {executors.submit(() -> {connector(instanceName);});});}
​/*** 消费canal的线程池*/public void connector(String instanceName) {CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), instanceName, "", "");
​try {// 打开连接connector.connect();// 订阅所有消息//connector.subscribe(".*\\..*");// 只订阅test1数据库下的所有表connector.subscribe("hebeiqx.*");// 恢复到之前同步的那个位置connector.rollback();
​int batchSize = 1000;for (; ; ) {// 获取指定数量的数据,但是不做确认标记,下一次取还会取到这些信息。 注:不会阻塞,若不够100,则有多少返回多少Message message = connector.getWithoutAck(batchSize);// 获取消息idlong batchId = message.getId();// 获取批量的数量int size = message.getEntries().size();if (size == 0 || batchId == -1) {//logger.info("暂无数据......");try {// 没有数据就等待1秒Thread.sleep(1000);} catch (InterruptedException ignored) {}}if (batchId != -1) {logger.info("数据同步监听中......");logger.info("instance -> {}, msgId -> {}", instanceName, batchId);// 数据处理//printEnity(message.getEntries());// 提交确认connector.ack(batchId);// 处理失败,回滚数据connector.rollback(batchId);}}} catch (Exception e) {e.printStackTrace();} finally {connector.disconnect();}}
​private void printEnity(List<CanalEntry.Entry> entries) {for (CanalEntry.Entry entry : entries) {// 开启/关闭事务的实体类型,跳过if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}
​// RowChange对象,包含了一行数据变化的所有特征// 比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等CanalEntry.RowChange rowChange = null;
​try {// 序列化数据rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {e.printStackTrace();}
​assert rowChange != null;
​// 获取操作类型:insert/update/delete类型CanalEntry.EventType eventType = rowChange.getEventType();
​// 打印Header信息logger.info(String.format("================>; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));
​// 判断是否是DDL语句if (rowChange.getEventType() == CanalEntry.EventType.QUERY || rowChange.getIsDdl()) {logger.info("sql ------------>{}", rowChange.getSql());}
​// 获取RowChange对象里的每一行数据,打印出来for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {switch (rowChange.getEventType()) {// 如果希望监听多种事件,可以手动增加casecase UPDATE:printColumn(rowData.getAfterColumnsList());printColumn(rowData.getBeforeColumnsList());break;case INSERT:printColumn(rowData.getAfterColumnsList());break;case DELETE:printColumn(rowData.getBeforeColumnsList());break;default:}}}}
​private void printColumn(List<CanalEntry.Column> columns) {StringBuilder sb = new StringBuilder();for (CanalEntry.Column column : columns) {sb.append("[");sb.append(column.getName()).append(" : ").append(column.getValue()).append("update=").append(column.getUpdated());sb.append("]");sb.append("    ");}logger.info(sb.toString());}
}

相关文章:

SpringBoot 集成 Canal 基于 MySQL 做数据同步

一、canal 组件关系 下载地址&#xff1a;https://github.com/alibaba/canal/releases/download/canal-1.1.7/ 这里面主要的有两个 canal.deployer-1.1.7.tar.gz 和 canal.adapter-1.1.7.tar.gz&#xff0c;canal.admin-1.1.7.tar.gz 是一个监控服务&#xff0c;可选&#xf…...

【CVE-2022-22733漏洞复现】

Apache ShardingSphere ElasticJob-UI漏洞 漏洞编号:CVE-2022-22733 文档说明 本文作者:SwBack 创作时间:2024/1/21 19:19:19 知乎:https://www.zhihu.com/people/back-88-87 CSDN:https://blog.csdn.net/qq_30817059 百度搜索: SwBack漏洞描述 Apache ShardingSphere Elast…...

Python爬虫---scrapy框架---当当网管道封装

项目结构&#xff1a; dang.py文件&#xff1a;自己创建&#xff0c;实现爬虫核心功能的文件 import scrapy from scrapy_dangdang_20240113.items import ScrapyDangdang20240113Itemclass DangSpider(scrapy.Spider):name "dang" # 名字# 如果是多页下载的话, …...

【机器学习】机器学习四大类第01课

一、机器学习四大类 有监督学习 (Supervised Learning) 有监督学习是通过已知的输入-输出对&#xff08;即标记过的训练数据&#xff09;来学习函数关系的过程。在训练阶段&#xff0c;模型会根据这些示例调整参数以尽可能准确地预测新的、未见过的数据点的输出。 实例&#x…...

下述默认构造函数有什么问题?

12.4 // points to string allocated by new // holds length of string 独立的、相同的数据,而不会重叠。由于同样的原因,必须定义赋值操作符。对于每一种情况,最终目的 都是执行深度复制,也就是说,复制实际的数据,而不仅仅是复制指向数据的指针。 对象的存储持续性为自动或…...

vite和mockjs配合使用

vite mockjs 当后端还没准备完成之前&#xff0c;前端可以使用 mock 模拟后端响应&#xff0c;提高开发效率 1、安装插件 使用 vite-plugin-mock 插件&#xff0c;配合mockjs完成项目的 mock 配置 npm install mockjs vite-plugin-mock2、vite配置插件 在 vite.config.js…...

【数据结构】常见八大排序算法总结

目录 前言 1.直接插入排序 2.希尔排序 3.选择排序 4.堆排序 5.冒泡排序 6.快速排序 6.1Hoare版本 6.2挖坑法 6.3前后指针法 6.4快速排序的递归实现 6.5快速排序的非递归实现 7.归并排序 8.计数排序&#xff08;非比较排序&#xff09; 9.补充:基数排序 10.总结…...

系统学英语 — 句法 — 常规句型

目录 文章目录 目录5 大基本句型复合句型主语从句宾语从句表语从句定语从句状语从句同位语从句补语从句 谓语句型 5 大基本句型 主谓&#xff1a;主语发出一个动作&#xff0c;例如&#xff1a;He cried.主谓宾&#xff1a;we study English.主系表&#xff1a;主语具有某些特…...

Github操作网络异常笔记

Github操作网络异常笔记 1. 源由2. 解决2.1 方案一2.2 方案二 3. 总结 1. 源由 开源技术在国内永远是“蛋疼”&#xff0c;这些"政治"问题对于追求技术的我们&#xff0c;形成无法回避的障碍。 $ git pull ssh: connect to host github.com port 22: Connection ti…...

Vue3新特性defineModel()便捷的双向绑定数据

官网介绍 传送门 配置 要求&#xff1a; 版本&#xff1a; vue > 3.4(必须&#xff01;&#xff01;&#xff01;)配置&#xff1a;vite.config.js 使用场景和案例 使用场景&#xff1a;父子组件的数据双向绑定&#xff0c;不用emit和props的繁重代码 具体案例 代码实…...

vue列表飞入效果

效果 实现代码 <template><div><button click"add">添加</button><TransitionGroup name"list" tag"ul"><div class"list-item" v-for"item in items" :key"item.id">{{ i…...

C语言·预处理详解

1. 预定义符号 C语言设置了一些预定义符号&#xff0c;可以直接使用&#xff0c;预定义符号也是在预处理期间处理的 __FILE__ 进行编译的源文件 __LINE__ 文件当前的行号 __DATE__ 文件被编译的日期 __TIME__ 文件被编译的时间 __STDC__ 如果编译器遵循ANSI C&#xff0c;…...

服务器与普通电脑的区别,普通电脑可以当作服务器用吗?

服务器在我们日常应用中非常常见&#xff0c;手机APP、手机游戏、PC游戏、小程序、网站等等都需要部署在服务器上&#xff0c;为我们提供各种计算、应用服务。服务器也是计算机的一种&#xff0c;虽然内部结构相差不大&#xff0c;但是服务器的运行速度更快、负载更高、成本更高…...

数字身份所有权:Web3时代用户数据的掌控权

随着Web3时代的来临&#xff0c;数字身份的概念正焕发出崭新的光芒。在这个数字化的时代&#xff0c;用户的个人数据变得愈加珍贵&#xff0c;而Web3则为用户带来了数字身份所有权的概念&#xff0c;重新定义了用户与个人数据之间的关系。本文将深入探讨Web3时代用户数据的掌控…...

python爬虫如何写,有哪些成功爬取的案例

编写Python爬虫时&#xff0c;常用的库包括Requests、Beautiful Soup和Scrapy。以下是三个简单的Python爬虫案例&#xff0c;分别使用Requests和Beautiful Soup&#xff0c;以及Scrapy。 1. 使用Requests和Beautiful Soup爬取网页内容&#xff1a; import requests from bs4 …...

PLC物联网网关BL104实现PLC协议转MQTT、OPC UA、Modbus TCP

随着物联网技术的迅猛发展&#xff0c;人们深刻认识到在智能化生产和生活中&#xff0c;实时、可靠、安全的数据传输至关重要。在此背景下&#xff0c;高性能的物联网数据传输解决方案——协议转换网关应运而生&#xff0c;广泛应用于工业自动化和数字化工厂应用环境中。 无缝衔…...

explain工具优化mysql需要达到什么级别?

explain工具优化mysql需要达到什么级别&#xff1f; 一、explain工具是什么&#xff1f;二、explain查询后各字段的含义三、explain查询后type字段有哪些类型&#xff1f;四、type类型需要优化到哪个阶段&#xff1f; 一、explain工具是什么&#xff1f; explain是什么&#x…...

RHCE作业

架设一台NFS服务器&#xff0c;并按照以下要求配置 1、开放/nfs/shared目录&#xff0c;供所有用户查询资料 2、开放/nfs/upload目录&#xff0c;为192.168.xxx.0/24网段主机可以上传目录&#xff0c;并将所有用户及所属的组映射为nfs-upload,其UID和GID均为210 3、将/home/to…...

在Java中调企微机器人发送消息到群里

目录 如何使用群机器人 消息类型及数据格式 文本类型 markdown类型 图片类型 图文类型 文件类型 模版卡片类型 文本通知模版卡片 图文展示模版卡片 消息发送频率限制 文件上传接口 Java 执行语句 String url "webhook的Url"; String result HttpReque…...

鸿蒙开发(四)UIAbility和Page交互

通过上一篇的学习&#xff0c;相信大家对UIAbility已经有了初步的认知。在上篇中&#xff0c;我们最后实现了一个小demo&#xff0c;从一个UIAbility调起了另外一个UIAbility。当时我提到过&#xff0c;暂不实现比如点击EntryAbility中的控件去触发跳转&#xff0c;而是在Entry…...

深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录

ASP.NET Core 是一个跨平台的开源框架&#xff0c;用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录&#xff0c;以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...

1688商品列表API与其他数据源的对接思路

将1688商品列表API与其他数据源对接时&#xff0c;需结合业务场景设计数据流转链路&#xff0c;重点关注数据格式兼容性、接口调用频率控制及数据一致性维护。以下是具体对接思路及关键技术点&#xff1a; 一、核心对接场景与目标 商品数据同步 场景&#xff1a;将1688商品信息…...

【论文阅读28】-CNN-BiLSTM-Attention-(2024)

本文把滑坡位移序列拆开、筛优质因子&#xff0c;再用 CNN-BiLSTM-Attention 来动态预测每个子序列&#xff0c;最后重构出总位移&#xff0c;预测效果超越传统模型。 文章目录 1 引言2 方法2.1 位移时间序列加性模型2.2 变分模态分解 (VMD) 具体步骤2.3.1 样本熵&#xff08;S…...

Element Plus 表单(el-form)中关于正整数输入的校验规则

目录 1 单个正整数输入1.1 模板1.2 校验规则 2 两个正整数输入&#xff08;联动&#xff09;2.1 模板2.2 校验规则2.3 CSS 1 单个正整数输入 1.1 模板 <el-formref"formRef":model"formData":rules"formRules"label-width"150px"…...

【生成模型】视频生成论文调研

工作清单 上游应用方向&#xff1a;控制、速度、时长、高动态、多主体驱动 类型工作基础模型WAN / WAN-VACE / HunyuanVideo控制条件轨迹控制ATI~镜头控制ReCamMaster~多主体驱动Phantom~音频驱动Let Them Talk: Audio-Driven Multi-Person Conversational Video Generation速…...

莫兰迪高级灰总结计划简约商务通用PPT模版

莫兰迪高级灰总结计划简约商务通用PPT模版&#xff0c;莫兰迪调色板清新简约工作汇报PPT模版&#xff0c;莫兰迪时尚风极简设计PPT模版&#xff0c;大学生毕业论文答辩PPT模版&#xff0c;莫兰迪配色总结计划简约商务通用PPT模版&#xff0c;莫兰迪商务汇报PPT模版&#xff0c;…...

Git常用命令完全指南:从入门到精通

Git常用命令完全指南&#xff1a;从入门到精通 一、基础配置命令 1. 用户信息配置 # 设置全局用户名 git config --global user.name "你的名字"# 设置全局邮箱 git config --global user.email "你的邮箱example.com"# 查看所有配置 git config --list…...

TSN交换机正在重构工业网络,PROFINET和EtherCAT会被取代吗?

在工业自动化持续演进的今天&#xff0c;通信网络的角色正变得愈发关键。 2025年6月6日&#xff0c;为期三天的华南国际工业博览会在深圳国际会展中心&#xff08;宝安&#xff09;圆满落幕。作为国内工业通信领域的技术型企业&#xff0c;光路科技&#xff08;Fiberroad&…...

elementUI点击浏览table所选行数据查看文档

项目场景&#xff1a; table按照要求特定的数据变成按钮可以点击 解决方案&#xff1a; <el-table-columnprop"mlname"label"名称"align"center"width"180"><template slot-scope"scope"><el-buttonv-if&qu…...

Oracle11g安装包

Oracle 11g安装包 适用于windows系统&#xff0c;64位 下载路径 oracle 11g 安装包...