当前位置: 首页 > news >正文

基于Canal的数据同步

基于Canal的数据同步

一、 系统结构

该数据同步系统由Spring Boot和Canal共同组成。
Spring Boot 是一个流行的 Java Web 框架,而 Canal 则是阿里巴巴开源的 MySQL 数据库的数据变更监听框架。结合 Spring Boot 和 Canal,可以实现 MySQL 数据库的实时数据同步到其他系统中。

  1. canal.deployer-1.1.7-SNAPSHOT.tar.gz为Canal软件压缩包,需要安装在服务器上,并根据下文进行配置文件的修改。
  2. CanalClient.rar为用Spring Boot框架编写的数据库监听同步项目

二、. Canal配置

在解压Canal文件夹后,需要配置两个文件。
在配置Canal前,需要确保Mysql的Binlog已经开启,并且模式为ROW,找到当前binlog的文件名和position。

1) 配置文件路径:canal/conf/canal.properties

在这里插入图片描述

2) 配置文件路径:

canal/conf/example/instance.properties

在这里插入图片描述
在这里插入图片描述

三、 Spring Boot配置

1. 项目结构

在这里插入图片描述

2. Canal账号密码配置

进入到Config下的CanalClient类文件。

注意密码是通过MD5加密的,图中这这段字符应替换为canal。

在这里插入图片描述

3. 目标数据库配置

在yml中配置数据同步目标数据库即可
在这里插入图片描述

源代码:

package com.canal.canalclient.config;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.springframework.jdbc.core.JdbcTemplate;import java.net.InetSocketAddress;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/*** @Auther: fzl* @Date: 2020/4/20 01:21* @Description:*/
public class CanalClient {private static Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();public static void startCanal() {//获取canalServer连接:本机地址,端口号CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("IP地址", "端口号"), "example", "canal", "canal");int batchSize = 1000;try {//连接canalServerconnector.connect();//订阅Desctinstionconnector.subscribe();connector.rollback();try {while (true) {//尝试从master那边拉去数据batchSize条记录,有多少取多少//轮询拉取数据   上面的whereMessage message = connector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {//睡眠Thread.sleep(1000);} else {dataHandle(message.getEntries());}connector.ack(batchId);System.out.println("aa"+size);//当队列里面堆积的sql大于一定数值的时候就模拟执行if (SQL_QUEUE.size() >= 10) {executeQueueSql();}}} catch (InterruptedException e) {e.printStackTrace();} catch (InvalidProtocolBufferException e) {e.printStackTrace();}} finally {connector.disconnect();}}public static JdbcTemplate jdbcTemplate;/*** 模拟执行队列里面的sql语句*/public static void executeQueueSql() {int size = SQL_QUEUE.size();for (int i = 0; i < size; i++) {String sql = SQL_QUEUE.poll();jdbcTemplate.execute(sql);System.out.println("[sql]----> " + sql);}}/*** 数据处理** @param entrys*/private static void dataHandle(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {for (CanalEntry.Entry entry : entrys) {if (EntryType.ROWDATA == entry.getEntryType()) {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());CanalEntry.EventType eventType = rowChange.getEventType();if (eventType == EventType.DELETE) {saveDeleteSql(entry);} else if (eventType == EventType.UPDATE) {saveUpdateSql(entry);} else if (eventType == CanalEntry.EventType.INSERT) {saveInsertSql(entry);}}}}/*** 保存更新语句** @param entry*/private static void saveUpdateSql(CanalEntry.Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();for (CanalEntry.RowData rowData : rowDatasList) {List<Column> newColumnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("update " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " set ");for (int i = 0; i < newColumnList.size(); i++) {sql.append(" " + newColumnList.get(i).getName()+ " = '" + newColumnList.get(i).getValue() + "'");if (i != newColumnList.size() - 1) {sql.append(",");}}sql.append(" where ");List<Column> oldColumnList = rowData.getBeforeColumnsList();for (Column column : oldColumnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 保存删除语句** @param entry*/private static void saveDeleteSql(CanalEntry.Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();for (CanalEntry.RowData rowData : rowDatasList) {List<Column> columnList = rowData.getBeforeColumnsList();StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " where ");for (Column column : columnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 保存插入语句** @param entry*/private static void saveInsertSql(CanalEntry.Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();for (CanalEntry.RowData rowData : rowDatasList) {List<Column> columnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " (");for (int i = 0; i < columnList.size(); i++) {sql.append(columnList.get(i).getName());if (i != columnList.size() - 1) {sql.append(",");}}sql.append(") VALUES (");for (int i = 0; i < columnList.size(); i++) {sql.append("'" + columnList.get(i).getValue() + "'");if (i != columnList.size() - 1) {sql.append(",");}}sql.append(")");SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}
}
package com.canal.canalclient;import com.canal.canalclient.config.CanalClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class StartedFunction implements ApplicationRunner {@Autowired@Qualifier("test_master_energy") //有多个数据源的,需要名称区分private static JdbcTemplate jdbcTemplate;@Overridepublic void run(ApplicationArguments args)  throws Exception{log.info("开始监听同步数据库");CanalClient.jdbcTemplate = jdbcTemplate;CanalClient.startCanal();}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.3</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.canal</groupId><artifactId>CanalClient</artifactId><version>0.0.1-SNAPSHOT</version><name>CanalClient</name><description>CanalClient</description><properties><java.version>19</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.2.9</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.32</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><build><plugins><!-- 打包时跳过测试 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.12.4</version><configuration><skipTests>true</skipTests></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins><resources><resource><directory>src/main/resources</directory></resource><resource><directory>src/main/java</directory><includes><include>**/*.xml</include></includes></resource></resources></build></project>

相关文章:

基于Canal的数据同步

基于Canal的数据同步 一、 系统结构 该数据同步系统由Spring Boot和Canal共同组成。 Spring Boot 是一个流行的 Java Web 框架&#xff0c;而 Canal 则是阿里巴巴开源的 MySQL 数据库的数据变更监听框架。结合 Spring Boot 和 Canal&#xff0c;可以实现 MySQL 数据库的实时数…...

vuetify设置页面默认主题色

前言 最近工作中接到一个任务&#xff1a; 项目中分light和dark两种主题色a、b页面默认为dark其他页面默认为light 项目前端环境&#xff1a; vue2jsyarnvuexvuetifyelement ui 解决思路 routerjs中配置路径时进行默认主题设置 在左侧aside点击菜单时&#xff0c;进行主题切…...

【Python入门第二十三天】Python 继承

Python 继承 继承允许我们定义继承另一个类的所有方法和属性的类。 父类是继承的类&#xff0c;也称为基类。 子类是从另一个类继承的类&#xff0c;也称为派生类。 创建父类 任何类都可以是父类&#xff0c;因此语法与创建任何其他类相同&#xff1a; 实例 创建一个名为…...

C#中,读取一个或多个文件内容的方法

读取一个或多个文件内容的方法 在C#中&#xff0c;可以使用File.ReadAllLines方法一次读取多个文件中的所有行内容。例如&#xff0c;以下代码读取了两个文件中的所有行内容&#xff0c;然后将它们合并在一起&#xff1a; string[] file1Lines File.ReadAllLines("file1…...

1 基于神经辐射场(neural Radiance Fileds, Nerf)的三维重建- 简介

Nerf简介 Nerf&#xff08;neural Radiance Fileds&#xff09; 为2020年ICCV上提出的一个基于隐式表达的三维重建方法&#xff0c;使用2D的 Posed Imageds 来生成&#xff08;表达&#xff09;复杂的三维场景。现在越来越多的研究人员开始关注这个潜力巨大的领域&#xff0c;也…...

水果FLStudio21.0.0中文版全能数字音乐工作站DAW

FL Studio 21.0.0官方中文版重磅发布纯正简体中文支持&#xff0c;更快捷的音频剪辑及素材管理器&#xff0c;多样主题随心换&#xff01;Mac版新增对苹果M2/1家族芯片原生支持。编曲、剪辑、录音、混音&#xff0c;20余年的技术积淀和实力研发&#xff0c;FL Studio 已经从电音…...

【GlobalMapper精品教程】055:GM坐标转换器的巧妙使用

GM软件提供了一个简单实用的坐标转换工具,可以实现地理坐标和投影坐标之间的高斯正反算及多种转换计算。 文章目录 一、坐标转换器认识二、坐标转换案例1. 地理坐标←→地理坐标2. 地理坐标←→投影坐标三、在输出坐标上创建新的点四、其他转换工具的使用一、坐标转换器认识 …...

C语言之中rand()函数是如何实现的

rand()函数是一个C标准库中的随机数生成函数&#xff0c;用于生成一个范围在0到RAND_MAX之间的伪随机数。RAND_MAX是一个常量&#xff0c;它是随机数的最大值&#xff0c;通常被定义为32767。 rand()函数的实现原理可以概括为以下几个步骤&#xff1a; 初始化随机数生成器 在…...

winform控件PropertyGrid的应用(使运行中的程序能像vistual studio那样设置控件属性)

上周在看别人写的上位机demo代码时&#xff0c;发现创建的项目模板是"Windows 窗体控件库"(如下图) 生成的项目结构像自定义控件库&#xff0c;没有程序入口方法Main&#xff0c;但却很神奇能调试&#xff0c;最后发现原来Vistual Studio启动了一个外挂程序UserContr…...

SBUS的协议详解

SBUS 1.串口配置&#xff1a; 100k波特率&#xff0c; 8位数据位&#xff08;在stm32中要选择9位&#xff09;&#xff0c; 偶校验&#xff08;EVEN), 2位停止位&#xff0c; 无控流&#xff0c;25个字节&#xff0c; 2.协议格式&#xff1a; [startbyte] [data1][data2]……...

【PyTorch】教程:torch.nn.Hardshrink

torch.nn.Hardshrink CLASS torch.nn.Hardshrink(lambd0.5) 参数 lambd ([float]) – the λ\lambdaλ 默认为 0.5 定义 HardShrink(x){x,if x>λx,if x<−λ0,otherwise \text{HardShrink}(x) \begin{cases} x, & \text{ if } x > \lambda \\ x, & \text{…...

JavaScript 函数参数

JavaScript 函数对参数的值(arguments)没有进行任何的检查。JavaScript 函数参数与大多数其他语言的函数参数的区别在于&#xff1a;它不会关注有多少个参数被传递&#xff0c;不关注传递的参数的数据类型。函数显式参数与隐藏参数(arguments)在先前的教程中&#xff0c;我们已…...

【C】标准IO库函数

fopen/fclose #include <stdio.h>FILE *fopen(const char *path, const char *mode); 返回值&#xff1a;成功返回文件指针&#xff0c;出错返回NULL并设置errnoint fclose(FILE *fp); 返回值&#xff1a;成功返回0&#xff0c;出错返回EOF并设置errnomode参数是一个字符…...

http客户端Feign

Feign替代RestTemplate RestTemplate方式调用存在的缺陷 String url"http://userservice/user/"order.getUserId();User user restTemplate.getForObject(url, User.class); 代码可读性差&#xff0c;变成体验不统一&#xff1b; 参数复杂的时候URL难以维护。 &l…...

如何在Java中使用枚举类:从入门到进阶

枚举类是Java中一种特殊的数据类型&#xff0c;它允许我们将一组有限的值作为一组常量来使用&#xff0c;这些常量在代码中具有固定的名称和类型。在Java中&#xff0c;枚举类通常用于代表状态、选项和类别等具有离散值的变量。本篇博客将深入探讨Java中的枚举类&#xff0c;包…...

操作系统(1.2)--引论

目录 一、操作系统的基本特性 1.并发性 1.1 并行与并发 1.2 引入进程 2.共享性 2.1 互斥共享方式 2.3 同时访问方式 3.虚拟 3.1 时分复用技术 4. 异 步 二、操作系统的主要功能 1.处理机管理功能 1.1 进程控制 1.2 进程同步 1.3 进程通信 1.4 调度 2. 内…...

【Linux】 shell if的[]和[[]]区别

文章目录[]和test[]和[[]]区别总结参考[]和test Shell中的 test 命令用于检查某个条件是否成立&#xff0c;它可以进行数值、字符和文件三个方面的测试 test常用于 if &#xff0c;作为判断条件&#xff0c;if test等价于 if [ ]&#xff0c;因此&#xff0c;test和[] 内的内…...

利用flask解析海康摄像头视频

利用flask解析海康摄像头视频利用flask解析海康摄像头和大华摄像头的视频一、安装依赖包二、获取海康摄像头视频流三、将视频流输出到Web页面四、 创建HTML模板文件利用flask解析海康摄像头和大华摄像头的视频 作为AI智能的一种应用场景&#xff0c;视频监控系统已经在各个行业…...

./docker-compose.yml‘ is invalid

文章目录前言提示原因版本太低解决方法更新删除原来不能执行的/usr/local/bin/docker-compose下载安装docker-compose添加权限前言 安装ctfd过程中的一些报错 rootubuntu:/CTFd# docker-compose up -d ERROR: The Compose file ./docker-compose.yml is invalid because: net…...

Java 流程控制

条件/选择结构 if if(条件表达式){// 表达式为 true 时&#xff0c;执行该代码块 }if(true) {System.out.println("hello"); }if else if(条件表达式){// 表达式为 true 时&#xff0c;执行该代码块 } else {// 表达式为 false 时&#xff0c;执行该代码块 }if(1 …...

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…...

Linux 文件类型,目录与路径,文件与目录管理

文件类型 后面的字符表示文件类型标志 普通文件&#xff1a;-&#xff08;纯文本文件&#xff0c;二进制文件&#xff0c;数据格式文件&#xff09; 如文本文件、图片、程序文件等。 目录文件&#xff1a;d&#xff08;directory&#xff09; 用来存放其他文件或子目录。 设备…...

docker详细操作--未完待续

docker介绍 docker官网: Docker&#xff1a;加速容器应用程序开发 harbor官网&#xff1a;Harbor - Harbor 中文 使用docker加速器: Docker镜像极速下载服务 - 毫秒镜像 是什么 Docker 是一种开源的容器化平台&#xff0c;用于将应用程序及其依赖项&#xff08;如库、运行时环…...

Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动

一、前言说明 在2011版本的gb28181协议中&#xff0c;拉取视频流只要求udp方式&#xff0c;从2016开始要求新增支持tcp被动和tcp主动两种方式&#xff0c;udp理论上会丢包的&#xff0c;所以实际使用过程可能会出现画面花屏的情况&#xff0c;而tcp肯定不丢包&#xff0c;起码…...

SCAU期末笔记 - 数据分析与数据挖掘题库解析

这门怎么题库答案不全啊日 来简单学一下子来 一、选择题&#xff08;可多选&#xff09; 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘&#xff1a;专注于发现数据中…...

【RockeMQ】第2节|RocketMQ快速实战以及核⼼概念详解(二)

升级Dledger高可用集群 一、主从架构的不足与Dledger的定位 主从架构缺陷 数据备份依赖Slave节点&#xff0c;但无自动故障转移能力&#xff0c;Master宕机后需人工切换&#xff0c;期间消息可能无法读取。Slave仅存储数据&#xff0c;无法主动升级为Master响应请求&#xff…...

【Go语言基础【13】】函数、闭包、方法

文章目录 零、概述一、函数基础1、函数基础概念2、参数传递机制3、返回值特性3.1. 多返回值3.2. 命名返回值3.3. 错误处理 二、函数类型与高阶函数1. 函数类型定义2. 高阶函数&#xff08;函数作为参数、返回值&#xff09; 三、匿名函数与闭包1. 匿名函数&#xff08;Lambda函…...

Leetcode33( 搜索旋转排序数组)

题目表述 整数数组 nums 按升序排列&#xff0c;数组中的值 互不相同 。 在传递给函数之前&#xff0c;nums 在预先未知的某个下标 k&#xff08;0 < k < nums.length&#xff09;上进行了 旋转&#xff0c;使数组变为 [nums[k], nums[k1], …, nums[n-1], nums[0], nu…...

ZYNQ学习记录FPGA(一)ZYNQ简介

一、知识准备 1.一些术语,缩写和概念&#xff1a; 1&#xff09;ZYNQ全称&#xff1a;ZYNQ7000 All Pgrammable SoC 2&#xff09;SoC:system on chips(片上系统)&#xff0c;对比集成电路的SoB&#xff08;system on board&#xff09; 3&#xff09;ARM&#xff1a;处理器…...

论文阅读:Matting by Generation

今天介绍一篇关于 matting 抠图的文章&#xff0c;抠图也算是计算机视觉里面非常经典的一个任务了。从早期的经典算法到如今的深度学习算法&#xff0c;已经有很多的工作和这个任务相关。这两年 diffusion 模型很火&#xff0c;大家又开始用 diffusion 模型做各种 CV 任务了&am…...