基于Canal的数据同步
基于Canal的数据同步
一、 系统结构
该数据同步系统由Spring Boot和Canal共同组成。
Spring Boot 是一个流行的 Java Web 框架,而 Canal 则是阿里巴巴开源的 MySQL 数据库的数据变更监听框架。结合 Spring Boot 和 Canal,可以实现 MySQL 数据库的实时数据同步到其他系统中。
- canal.deployer-1.1.7-SNAPSHOT.tar.gz为Canal软件压缩包,需要安装在服务器上,并根据下文进行配置文件的修改。
- CanalClient.rar为用Spring Boot框架编写的数据库监听同步项目
二、. Canal配置
在解压Canal文件夹后,需要配置两个文件。
在配置Canal前,需要确保Mysql的Binlog已经开启,并且模式为ROW,找到当前binlog的文件名和position。
1) 配置文件路径:canal/conf/canal.properties

2) 配置文件路径:
canal/conf/example/instance.properties


三、 Spring Boot配置
1. 项目结构

2. Canal账号密码配置
进入到Config下的CanalClient类文件。
注意密码是通过MD5加密的,图中这这段字符应替换为canal。

3. 目标数据库配置
在yml中配置数据同步目标数据库即可

源代码:
package com.canal.canalclient.config;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.springframework.jdbc.core.JdbcTemplate;import java.net.InetSocketAddress;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/*** @Auther: fzl* @Date: 2020/4/20 01:21* @Description:*/
public class CanalClient {private static Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();public static void startCanal() {//获取canalServer连接:本机地址,端口号CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("IP地址", "端口号"), "example", "canal", "canal");int batchSize = 1000;try {//连接canalServerconnector.connect();//订阅Desctinstionconnector.subscribe();connector.rollback();try {while (true) {//尝试从master那边拉去数据batchSize条记录,有多少取多少//轮询拉取数据 上面的whereMessage message = connector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {//睡眠Thread.sleep(1000);} else {dataHandle(message.getEntries());}connector.ack(batchId);System.out.println("aa"+size);//当队列里面堆积的sql大于一定数值的时候就模拟执行if (SQL_QUEUE.size() >= 10) {executeQueueSql();}}} catch (InterruptedException e) {e.printStackTrace();} catch (InvalidProtocolBufferException e) {e.printStackTrace();}} finally {connector.disconnect();}}public static JdbcTemplate jdbcTemplate;/*** 模拟执行队列里面的sql语句*/public static void executeQueueSql() {int size = SQL_QUEUE.size();for (int i = 0; i < size; i++) {String sql = SQL_QUEUE.poll();jdbcTemplate.execute(sql);System.out.println("[sql]----> " + sql);}}/*** 数据处理** @param entrys*/private static void dataHandle(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {for (CanalEntry.Entry entry : entrys) {if (EntryType.ROWDATA == entry.getEntryType()) {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());CanalEntry.EventType eventType = rowChange.getEventType();if (eventType == EventType.DELETE) {saveDeleteSql(entry);} else if (eventType == EventType.UPDATE) {saveUpdateSql(entry);} else if (eventType == CanalEntry.EventType.INSERT) {saveInsertSql(entry);}}}}/*** 保存更新语句** @param entry*/private static void saveUpdateSql(CanalEntry.Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();for (CanalEntry.RowData rowData : rowDatasList) {List<Column> newColumnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("update " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " set ");for (int i = 0; i < newColumnList.size(); i++) {sql.append(" " + newColumnList.get(i).getName()+ " = '" + newColumnList.get(i).getValue() + "'");if (i != newColumnList.size() - 1) {sql.append(",");}}sql.append(" where ");List<Column> oldColumnList = rowData.getBeforeColumnsList();for (Column column : oldColumnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 保存删除语句** @param entry*/private static void saveDeleteSql(CanalEntry.Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();for (CanalEntry.RowData rowData : rowDatasList) {List<Column> columnList = rowData.getBeforeColumnsList();StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " where ");for (Column column : columnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 保存插入语句** @param entry*/private static void saveInsertSql(CanalEntry.Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();for (CanalEntry.RowData rowData : rowDatasList) {List<Column> columnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " (");for (int i = 0; i < columnList.size(); i++) {sql.append(columnList.get(i).getName());if (i != columnList.size() - 1) {sql.append(",");}}sql.append(") VALUES (");for (int i = 0; i < columnList.size(); i++) {sql.append("'" + columnList.get(i).getValue() + "'");if (i != columnList.size() - 1) {sql.append(",");}}sql.append(")");SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}
}
package com.canal.canalclient;import com.canal.canalclient.config.CanalClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class StartedFunction implements ApplicationRunner {@Autowired@Qualifier("test_master_energy") //有多个数据源的,需要名称区分private static JdbcTemplate jdbcTemplate;@Overridepublic void run(ApplicationArguments args) throws Exception{log.info("开始监听同步数据库");CanalClient.jdbcTemplate = jdbcTemplate;CanalClient.startCanal();}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.3</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.canal</groupId><artifactId>CanalClient</artifactId><version>0.0.1-SNAPSHOT</version><name>CanalClient</name><description>CanalClient</description><properties><java.version>19</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.2.9</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.32</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><build><plugins><!-- 打包时跳过测试 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.12.4</version><configuration><skipTests>true</skipTests></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins><resources><resource><directory>src/main/resources</directory></resource><resource><directory>src/main/java</directory><includes><include>**/*.xml</include></includes></resource></resources></build></project>相关文章:
基于Canal的数据同步
基于Canal的数据同步 一、 系统结构 该数据同步系统由Spring Boot和Canal共同组成。 Spring Boot 是一个流行的 Java Web 框架,而 Canal 则是阿里巴巴开源的 MySQL 数据库的数据变更监听框架。结合 Spring Boot 和 Canal,可以实现 MySQL 数据库的实时数…...
vuetify设置页面默认主题色
前言 最近工作中接到一个任务: 项目中分light和dark两种主题色a、b页面默认为dark其他页面默认为light 项目前端环境: vue2jsyarnvuexvuetifyelement ui 解决思路 routerjs中配置路径时进行默认主题设置 在左侧aside点击菜单时,进行主题切…...
【Python入门第二十三天】Python 继承
Python 继承 继承允许我们定义继承另一个类的所有方法和属性的类。 父类是继承的类,也称为基类。 子类是从另一个类继承的类,也称为派生类。 创建父类 任何类都可以是父类,因此语法与创建任何其他类相同: 实例 创建一个名为…...
C#中,读取一个或多个文件内容的方法
读取一个或多个文件内容的方法 在C#中,可以使用File.ReadAllLines方法一次读取多个文件中的所有行内容。例如,以下代码读取了两个文件中的所有行内容,然后将它们合并在一起: string[] file1Lines File.ReadAllLines("file1…...
1 基于神经辐射场(neural Radiance Fileds, Nerf)的三维重建- 简介
Nerf简介 Nerf(neural Radiance Fileds) 为2020年ICCV上提出的一个基于隐式表达的三维重建方法,使用2D的 Posed Imageds 来生成(表达)复杂的三维场景。现在越来越多的研究人员开始关注这个潜力巨大的领域,也…...
水果FLStudio21.0.0中文版全能数字音乐工作站DAW
FL Studio 21.0.0官方中文版重磅发布纯正简体中文支持,更快捷的音频剪辑及素材管理器,多样主题随心换!Mac版新增对苹果M2/1家族芯片原生支持。编曲、剪辑、录音、混音,20余年的技术积淀和实力研发,FL Studio 已经从电音…...
【GlobalMapper精品教程】055:GM坐标转换器的巧妙使用
GM软件提供了一个简单实用的坐标转换工具,可以实现地理坐标和投影坐标之间的高斯正反算及多种转换计算。 文章目录 一、坐标转换器认识二、坐标转换案例1. 地理坐标←→地理坐标2. 地理坐标←→投影坐标三、在输出坐标上创建新的点四、其他转换工具的使用一、坐标转换器认识 …...
C语言之中rand()函数是如何实现的
rand()函数是一个C标准库中的随机数生成函数,用于生成一个范围在0到RAND_MAX之间的伪随机数。RAND_MAX是一个常量,它是随机数的最大值,通常被定义为32767。 rand()函数的实现原理可以概括为以下几个步骤: 初始化随机数生成器 在…...
winform控件PropertyGrid的应用(使运行中的程序能像vistual studio那样设置控件属性)
上周在看别人写的上位机demo代码时,发现创建的项目模板是"Windows 窗体控件库"(如下图) 生成的项目结构像自定义控件库,没有程序入口方法Main,但却很神奇能调试,最后发现原来Vistual Studio启动了一个外挂程序UserContr…...
SBUS的协议详解
SBUS 1.串口配置: 100k波特率, 8位数据位(在stm32中要选择9位), 偶校验(EVEN), 2位停止位, 无控流,25个字节, 2.协议格式: [startbyte] [data1][data2]……...
【PyTorch】教程:torch.nn.Hardshrink
torch.nn.Hardshrink CLASS torch.nn.Hardshrink(lambd0.5) 参数 lambd ([float]) – the λ\lambdaλ 默认为 0.5 定义 HardShrink(x){x,if x>λx,if x<−λ0,otherwise \text{HardShrink}(x) \begin{cases} x, & \text{ if } x > \lambda \\ x, & \text{…...
JavaScript 函数参数
JavaScript 函数对参数的值(arguments)没有进行任何的检查。JavaScript 函数参数与大多数其他语言的函数参数的区别在于:它不会关注有多少个参数被传递,不关注传递的参数的数据类型。函数显式参数与隐藏参数(arguments)在先前的教程中,我们已…...
【C】标准IO库函数
fopen/fclose #include <stdio.h>FILE *fopen(const char *path, const char *mode); 返回值:成功返回文件指针,出错返回NULL并设置errnoint fclose(FILE *fp); 返回值:成功返回0,出错返回EOF并设置errnomode参数是一个字符…...
http客户端Feign
Feign替代RestTemplate RestTemplate方式调用存在的缺陷 String url"http://userservice/user/"order.getUserId();User user restTemplate.getForObject(url, User.class); 代码可读性差,变成体验不统一; 参数复杂的时候URL难以维护。 &l…...
如何在Java中使用枚举类:从入门到进阶
枚举类是Java中一种特殊的数据类型,它允许我们将一组有限的值作为一组常量来使用,这些常量在代码中具有固定的名称和类型。在Java中,枚举类通常用于代表状态、选项和类别等具有离散值的变量。本篇博客将深入探讨Java中的枚举类,包…...
操作系统(1.2)--引论
目录 一、操作系统的基本特性 1.并发性 1.1 并行与并发 1.2 引入进程 2.共享性 2.1 互斥共享方式 2.3 同时访问方式 3.虚拟 3.1 时分复用技术 4. 异 步 二、操作系统的主要功能 1.处理机管理功能 1.1 进程控制 1.2 进程同步 1.3 进程通信 1.4 调度 2. 内…...
【Linux】 shell if的[]和[[]]区别
文章目录[]和test[]和[[]]区别总结参考[]和test Shell中的 test 命令用于检查某个条件是否成立,它可以进行数值、字符和文件三个方面的测试 test常用于 if ,作为判断条件,if test等价于 if [ ],因此,test和[] 内的内…...
利用flask解析海康摄像头视频
利用flask解析海康摄像头视频利用flask解析海康摄像头和大华摄像头的视频一、安装依赖包二、获取海康摄像头视频流三、将视频流输出到Web页面四、 创建HTML模板文件利用flask解析海康摄像头和大华摄像头的视频 作为AI智能的一种应用场景,视频监控系统已经在各个行业…...
./docker-compose.yml‘ is invalid
文章目录前言提示原因版本太低解决方法更新删除原来不能执行的/usr/local/bin/docker-compose下载安装docker-compose添加权限前言 安装ctfd过程中的一些报错 rootubuntu:/CTFd# docker-compose up -d ERROR: The Compose file ./docker-compose.yml is invalid because: net…...
Java 流程控制
条件/选择结构 if if(条件表达式){// 表达式为 true 时,执行该代码块 }if(true) {System.out.println("hello"); }if else if(条件表达式){// 表达式为 true 时,执行该代码块 } else {// 表达式为 false 时,执行该代码块 }if(1 …...
进程地址空间(比特课总结)
一、进程地址空间 1. 环境变量 1 )⽤户级环境变量与系统级环境变量 全局属性:环境变量具有全局属性,会被⼦进程继承。例如当bash启动⼦进程时,环 境变量会⾃动传递给⼦进程。 本地变量限制:本地变量只在当前进程(ba…...
MongoDB学习和应用(高效的非关系型数据库)
一丶 MongoDB简介 对于社交类软件的功能,我们需要对它的功能特点进行分析: 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具: mysql:关系型数据库&am…...
安宝特方案丨XRSOP人员作业标准化管理平台:AR智慧点检验收套件
在选煤厂、化工厂、钢铁厂等过程生产型企业,其生产设备的运行效率和非计划停机对工业制造效益有较大影响。 随着企业自动化和智能化建设的推进,需提前预防假检、错检、漏检,推动智慧生产运维系统数据的流动和现场赋能应用。同时,…...
uni-app学习笔记二十二---使用vite.config.js全局导入常用依赖
在前面的练习中,每个页面需要使用ref,onShow等生命周期钩子函数时都需要像下面这样导入 import {onMounted, ref} from "vue" 如果不想每个页面都导入,需要使用node.js命令npm安装unplugin-auto-import npm install unplugin-au…...
STM32F4基本定时器使用和原理详解
STM32F4基本定时器使用和原理详解 前言如何确定定时器挂载在哪条时钟线上配置及使用方法参数配置PrescalerCounter ModeCounter Periodauto-reload preloadTrigger Event Selection 中断配置生成的代码及使用方法初始化代码基本定时器触发DCA或者ADC的代码讲解中断代码定时启动…...
Psychopy音频的使用
Psychopy音频的使用 本文主要解决以下问题: 指定音频引擎与设备;播放音频文件 本文所使用的环境: Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...
分布式增量爬虫实现方案
之前我们在讨论的是分布式爬虫如何实现增量爬取。增量爬虫的目标是只爬取新产生或发生变化的页面,避免重复抓取,以节省资源和时间。 在分布式环境下,增量爬虫的实现需要考虑多个爬虫节点之间的协调和去重。 另一种思路:将增量判…...
【SSH疑难排查】轻松解决新版OpenSSH连接旧服务器的“no matching...“系列算法协商失败问题
【SSH疑难排查】轻松解决新版OpenSSH连接旧服务器的"no matching..."系列算法协商失败问题 摘要: 近期,在使用较新版本的OpenSSH客户端连接老旧SSH服务器时,会遇到 "no matching key exchange method found", "n…...
关于easyexcel动态下拉选问题处理
前些日子突然碰到一个问题,说是客户的导入文件模版想支持部分导入内容的下拉选,于是我就找了easyexcel官网寻找解决方案,并没有找到合适的方案,没办法只能自己动手并分享出来,针对Java生成Excel下拉菜单时因选项过多导…...
Sklearn 机器学习 缺失值处理 获取填充失值的统计值
💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 使用 Scikit-learn 处理缺失值并提取填充统计信息的完整指南 在机器学习项目中,数据清…...
