基于Canal的数据同步
基于Canal的数据同步
一、 系统结构
该数据同步系统由Spring Boot和Canal共同组成。
Spring Boot 是一个流行的 Java Web 框架,而 Canal 则是阿里巴巴开源的 MySQL 数据库的数据变更监听框架。结合 Spring Boot 和 Canal,可以实现 MySQL 数据库的实时数据同步到其他系统中。
- canal.deployer-1.1.7-SNAPSHOT.tar.gz为Canal软件压缩包,需要安装在服务器上,并根据下文进行配置文件的修改。
- CanalClient.rar为用Spring Boot框架编写的数据库监听同步项目
二、. Canal配置
在解压Canal文件夹后,需要配置两个文件。
在配置Canal前,需要确保Mysql的Binlog已经开启,并且模式为ROW,找到当前binlog的文件名和position。
1) 配置文件路径:canal/conf/canal.properties
2) 配置文件路径:
canal/conf/example/instance.properties
三、 Spring Boot配置
1. 项目结构
2. Canal账号密码配置
进入到Config下的CanalClient类文件。
注意密码是通过MD5加密的,图中这这段字符应替换为canal。
3. 目标数据库配置
在yml中配置数据同步目标数据库即可
源代码:
package com.canal.canalclient.config;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.springframework.jdbc.core.JdbcTemplate;import java.net.InetSocketAddress;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/*** @Auther: fzl* @Date: 2020/4/20 01:21* @Description:*/
public class CanalClient {private static Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();public static void startCanal() {//获取canalServer连接:本机地址,端口号CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("IP地址", "端口号"), "example", "canal", "canal");int batchSize = 1000;try {//连接canalServerconnector.connect();//订阅Desctinstionconnector.subscribe();connector.rollback();try {while (true) {//尝试从master那边拉去数据batchSize条记录,有多少取多少//轮询拉取数据 上面的whereMessage message = connector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {//睡眠Thread.sleep(1000);} else {dataHandle(message.getEntries());}connector.ack(batchId);System.out.println("aa"+size);//当队列里面堆积的sql大于一定数值的时候就模拟执行if (SQL_QUEUE.size() >= 10) {executeQueueSql();}}} catch (InterruptedException e) {e.printStackTrace();} catch (InvalidProtocolBufferException e) {e.printStackTrace();}} finally {connector.disconnect();}}public static JdbcTemplate jdbcTemplate;/*** 模拟执行队列里面的sql语句*/public static void executeQueueSql() {int size = SQL_QUEUE.size();for (int i = 0; i < size; i++) {String sql = SQL_QUEUE.poll();jdbcTemplate.execute(sql);System.out.println("[sql]----> " + sql);}}/*** 数据处理** @param entrys*/private static void dataHandle(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {for (CanalEntry.Entry entry : entrys) {if (EntryType.ROWDATA == entry.getEntryType()) {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());CanalEntry.EventType eventType = rowChange.getEventType();if (eventType == EventType.DELETE) {saveDeleteSql(entry);} else if (eventType == EventType.UPDATE) {saveUpdateSql(entry);} else if (eventType == CanalEntry.EventType.INSERT) {saveInsertSql(entry);}}}}/*** 保存更新语句** @param entry*/private static void saveUpdateSql(CanalEntry.Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();for (CanalEntry.RowData rowData : rowDatasList) {List<Column> newColumnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("update " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " set ");for (int i = 0; i < newColumnList.size(); i++) {sql.append(" " + newColumnList.get(i).getName()+ " = '" + newColumnList.get(i).getValue() + "'");if (i != newColumnList.size() - 1) {sql.append(",");}}sql.append(" where ");List<Column> oldColumnList = rowData.getBeforeColumnsList();for (Column column : oldColumnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 保存删除语句** @param entry*/private static void saveDeleteSql(CanalEntry.Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();for (CanalEntry.RowData rowData : rowDatasList) {List<Column> columnList = rowData.getBeforeColumnsList();StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " where ");for (Column column : columnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 保存插入语句** @param entry*/private static void saveInsertSql(CanalEntry.Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();for (CanalEntry.RowData rowData : rowDatasList) {List<Column> columnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " (");for (int i = 0; i < columnList.size(); i++) {sql.append(columnList.get(i).getName());if (i != columnList.size() - 1) {sql.append(",");}}sql.append(") VALUES (");for (int i = 0; i < columnList.size(); i++) {sql.append("'" + columnList.get(i).getValue() + "'");if (i != columnList.size() - 1) {sql.append(",");}}sql.append(")");SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}
}
package com.canal.canalclient;import com.canal.canalclient.config.CanalClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class StartedFunction implements ApplicationRunner {@Autowired@Qualifier("test_master_energy") //有多个数据源的,需要名称区分private static JdbcTemplate jdbcTemplate;@Overridepublic void run(ApplicationArguments args) throws Exception{log.info("开始监听同步数据库");CanalClient.jdbcTemplate = jdbcTemplate;CanalClient.startCanal();}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.3</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.canal</groupId><artifactId>CanalClient</artifactId><version>0.0.1-SNAPSHOT</version><name>CanalClient</name><description>CanalClient</description><properties><java.version>19</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.2.9</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.32</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><build><plugins><!-- 打包时跳过测试 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.12.4</version><configuration><skipTests>true</skipTests></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins><resources><resource><directory>src/main/resources</directory></resource><resource><directory>src/main/java</directory><includes><include>**/*.xml</include></includes></resource></resources></build></project>
相关文章:

基于Canal的数据同步
基于Canal的数据同步 一、 系统结构 该数据同步系统由Spring Boot和Canal共同组成。 Spring Boot 是一个流行的 Java Web 框架,而 Canal 则是阿里巴巴开源的 MySQL 数据库的数据变更监听框架。结合 Spring Boot 和 Canal,可以实现 MySQL 数据库的实时数…...

vuetify设置页面默认主题色
前言 最近工作中接到一个任务: 项目中分light和dark两种主题色a、b页面默认为dark其他页面默认为light 项目前端环境: vue2jsyarnvuexvuetifyelement ui 解决思路 routerjs中配置路径时进行默认主题设置 在左侧aside点击菜单时,进行主题切…...
【Python入门第二十三天】Python 继承
Python 继承 继承允许我们定义继承另一个类的所有方法和属性的类。 父类是继承的类,也称为基类。 子类是从另一个类继承的类,也称为派生类。 创建父类 任何类都可以是父类,因此语法与创建任何其他类相同: 实例 创建一个名为…...
C#中,读取一个或多个文件内容的方法
读取一个或多个文件内容的方法 在C#中,可以使用File.ReadAllLines方法一次读取多个文件中的所有行内容。例如,以下代码读取了两个文件中的所有行内容,然后将它们合并在一起: string[] file1Lines File.ReadAllLines("file1…...

1 基于神经辐射场(neural Radiance Fileds, Nerf)的三维重建- 简介
Nerf简介 Nerf(neural Radiance Fileds) 为2020年ICCV上提出的一个基于隐式表达的三维重建方法,使用2D的 Posed Imageds 来生成(表达)复杂的三维场景。现在越来越多的研究人员开始关注这个潜力巨大的领域,也…...

水果FLStudio21.0.0中文版全能数字音乐工作站DAW
FL Studio 21.0.0官方中文版重磅发布纯正简体中文支持,更快捷的音频剪辑及素材管理器,多样主题随心换!Mac版新增对苹果M2/1家族芯片原生支持。编曲、剪辑、录音、混音,20余年的技术积淀和实力研发,FL Studio 已经从电音…...

【GlobalMapper精品教程】055:GM坐标转换器的巧妙使用
GM软件提供了一个简单实用的坐标转换工具,可以实现地理坐标和投影坐标之间的高斯正反算及多种转换计算。 文章目录 一、坐标转换器认识二、坐标转换案例1. 地理坐标←→地理坐标2. 地理坐标←→投影坐标三、在输出坐标上创建新的点四、其他转换工具的使用一、坐标转换器认识 …...
C语言之中rand()函数是如何实现的
rand()函数是一个C标准库中的随机数生成函数,用于生成一个范围在0到RAND_MAX之间的伪随机数。RAND_MAX是一个常量,它是随机数的最大值,通常被定义为32767。 rand()函数的实现原理可以概括为以下几个步骤: 初始化随机数生成器 在…...

winform控件PropertyGrid的应用(使运行中的程序能像vistual studio那样设置控件属性)
上周在看别人写的上位机demo代码时,发现创建的项目模板是"Windows 窗体控件库"(如下图) 生成的项目结构像自定义控件库,没有程序入口方法Main,但却很神奇能调试,最后发现原来Vistual Studio启动了一个外挂程序UserContr…...

SBUS的协议详解
SBUS 1.串口配置: 100k波特率, 8位数据位(在stm32中要选择9位), 偶校验(EVEN), 2位停止位, 无控流,25个字节, 2.协议格式: [startbyte] [data1][data2]……...

【PyTorch】教程:torch.nn.Hardshrink
torch.nn.Hardshrink CLASS torch.nn.Hardshrink(lambd0.5) 参数 lambd ([float]) – the λ\lambdaλ 默认为 0.5 定义 HardShrink(x){x,if x>λx,if x<−λ0,otherwise \text{HardShrink}(x) \begin{cases} x, & \text{ if } x > \lambda \\ x, & \text{…...

JavaScript 函数参数
JavaScript 函数对参数的值(arguments)没有进行任何的检查。JavaScript 函数参数与大多数其他语言的函数参数的区别在于:它不会关注有多少个参数被传递,不关注传递的参数的数据类型。函数显式参数与隐藏参数(arguments)在先前的教程中,我们已…...
【C】标准IO库函数
fopen/fclose #include <stdio.h>FILE *fopen(const char *path, const char *mode); 返回值:成功返回文件指针,出错返回NULL并设置errnoint fclose(FILE *fp); 返回值:成功返回0,出错返回EOF并设置errnomode参数是一个字符…...

http客户端Feign
Feign替代RestTemplate RestTemplate方式调用存在的缺陷 String url"http://userservice/user/"order.getUserId();User user restTemplate.getForObject(url, User.class); 代码可读性差,变成体验不统一; 参数复杂的时候URL难以维护。 &l…...
如何在Java中使用枚举类:从入门到进阶
枚举类是Java中一种特殊的数据类型,它允许我们将一组有限的值作为一组常量来使用,这些常量在代码中具有固定的名称和类型。在Java中,枚举类通常用于代表状态、选项和类别等具有离散值的变量。本篇博客将深入探讨Java中的枚举类,包…...
操作系统(1.2)--引论
目录 一、操作系统的基本特性 1.并发性 1.1 并行与并发 1.2 引入进程 2.共享性 2.1 互斥共享方式 2.3 同时访问方式 3.虚拟 3.1 时分复用技术 4. 异 步 二、操作系统的主要功能 1.处理机管理功能 1.1 进程控制 1.2 进程同步 1.3 进程通信 1.4 调度 2. 内…...
【Linux】 shell if的[]和[[]]区别
文章目录[]和test[]和[[]]区别总结参考[]和test Shell中的 test 命令用于检查某个条件是否成立,它可以进行数值、字符和文件三个方面的测试 test常用于 if ,作为判断条件,if test等价于 if [ ],因此,test和[] 内的内…...
利用flask解析海康摄像头视频
利用flask解析海康摄像头视频利用flask解析海康摄像头和大华摄像头的视频一、安装依赖包二、获取海康摄像头视频流三、将视频流输出到Web页面四、 创建HTML模板文件利用flask解析海康摄像头和大华摄像头的视频 作为AI智能的一种应用场景,视频监控系统已经在各个行业…...
./docker-compose.yml‘ is invalid
文章目录前言提示原因版本太低解决方法更新删除原来不能执行的/usr/local/bin/docker-compose下载安装docker-compose添加权限前言 安装ctfd过程中的一些报错 rootubuntu:/CTFd# docker-compose up -d ERROR: The Compose file ./docker-compose.yml is invalid because: net…...
Java 流程控制
条件/选择结构 if if(条件表达式){// 表达式为 true 时,执行该代码块 }if(true) {System.out.println("hello"); }if else if(条件表达式){// 表达式为 true 时,执行该代码块 } else {// 表达式为 false 时,执行该代码块 }if(1 …...
云计算——弹性云计算器(ECS)
弹性云服务器:ECS 概述 云计算重构了ICT系统,云计算平台厂商推出使得厂家能够主要关注应用管理而非平台管理的云平台,包含如下主要概念。 ECS(Elastic Cloud Server):即弹性云服务器,是云计算…...

MongoDB学习和应用(高效的非关系型数据库)
一丶 MongoDB简介 对于社交类软件的功能,我们需要对它的功能特点进行分析: 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具: mysql:关系型数据库&am…...

渗透实战PortSwigger靶场-XSS Lab 14:大多数标签和属性被阻止
<script>标签被拦截 我们需要把全部可用的 tag 和 event 进行暴力破解 XSS cheat sheet: https://portswigger.net/web-security/cross-site-scripting/cheat-sheet 通过爆破发现body可以用 再把全部 events 放进去爆破 这些 event 全部可用 <body onres…...

对WWDC 2025 Keynote 内容的预测
借助我们以往对苹果公司发展路径的深入研究经验,以及大语言模型的分析能力,我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际,我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测,聊作存档。等到明…...
Spring Boot+Neo4j知识图谱实战:3步搭建智能关系网络!
一、引言 在数据驱动的背景下,知识图谱凭借其高效的信息组织能力,正逐步成为各行业应用的关键技术。本文聚焦 Spring Boot与Neo4j图数据库的技术结合,探讨知识图谱开发的实现细节,帮助读者掌握该技术栈在实际项目中的落地方法。 …...
【RockeMQ】第2节|RocketMQ快速实战以及核⼼概念详解(二)
升级Dledger高可用集群 一、主从架构的不足与Dledger的定位 主从架构缺陷 数据备份依赖Slave节点,但无自动故障转移能力,Master宕机后需人工切换,期间消息可能无法读取。Slave仅存储数据,无法主动升级为Master响应请求ÿ…...
MySQL中【正则表达式】用法
MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现(两者等价),用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例: 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...

ArcGIS Pro制作水平横向图例+多级标注
今天介绍下载ArcGIS Pro中如何设置水平横向图例。 之前我们介绍了ArcGIS的横向图例制作:ArcGIS横向、多列图例、顺序重排、符号居中、批量更改图例符号等等(ArcGIS出图图例8大技巧),那这次我们看看ArcGIS Pro如何更加快捷的操作。…...

C++:多态机制详解
目录 一. 多态的概念 1.静态多态(编译时多态) 二.动态多态的定义及实现 1.多态的构成条件 2.虚函数 3.虚函数的重写/覆盖 4.虚函数重写的一些其他问题 1).协变 2).析构函数的重写 5.override 和 final关键字 1&#…...

基于SpringBoot在线拍卖系统的设计和实现
摘 要 随着社会的发展,社会的各行各业都在利用信息化时代的优势。计算机的优势和普及使得各种信息系统的开发成为必需。 在线拍卖系统,主要的模块包括管理员;首页、个人中心、用户管理、商品类型管理、拍卖商品管理、历史竞拍管理、竞拍订单…...