当前位置: 首页 > news >正文

基于Canal的数据同步

基于Canal的数据同步

一、 系统结构

该数据同步系统由Spring Boot和Canal共同组成。
Spring Boot 是一个流行的 Java Web 框架,而 Canal 则是阿里巴巴开源的 MySQL 数据库的数据变更监听框架。结合 Spring Boot 和 Canal,可以实现 MySQL 数据库的实时数据同步到其他系统中。

  1. canal.deployer-1.1.7-SNAPSHOT.tar.gz为Canal软件压缩包,需要安装在服务器上,并根据下文进行配置文件的修改。
  2. CanalClient.rar为用Spring Boot框架编写的数据库监听同步项目

二、. Canal配置

在解压Canal文件夹后,需要配置两个文件。
在配置Canal前,需要确保Mysql的Binlog已经开启,并且模式为ROW,找到当前binlog的文件名和position。

1) 配置文件路径:canal/conf/canal.properties

在这里插入图片描述

2) 配置文件路径:

canal/conf/example/instance.properties

在这里插入图片描述
在这里插入图片描述

三、 Spring Boot配置

1. 项目结构

在这里插入图片描述

2. Canal账号密码配置

进入到Config下的CanalClient类文件。

注意密码是通过MD5加密的,图中这这段字符应替换为canal。

在这里插入图片描述

3. 目标数据库配置

在yml中配置数据同步目标数据库即可
在这里插入图片描述

源代码:

package com.canal.canalclient.config;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.springframework.jdbc.core.JdbcTemplate;import java.net.InetSocketAddress;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/*** @Auther: fzl* @Date: 2020/4/20 01:21* @Description:*/
public class CanalClient {private static Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();public static void startCanal() {//获取canalServer连接:本机地址,端口号CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("IP地址", "端口号"), "example", "canal", "canal");int batchSize = 1000;try {//连接canalServerconnector.connect();//订阅Desctinstionconnector.subscribe();connector.rollback();try {while (true) {//尝试从master那边拉去数据batchSize条记录,有多少取多少//轮询拉取数据   上面的whereMessage message = connector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {//睡眠Thread.sleep(1000);} else {dataHandle(message.getEntries());}connector.ack(batchId);System.out.println("aa"+size);//当队列里面堆积的sql大于一定数值的时候就模拟执行if (SQL_QUEUE.size() >= 10) {executeQueueSql();}}} catch (InterruptedException e) {e.printStackTrace();} catch (InvalidProtocolBufferException e) {e.printStackTrace();}} finally {connector.disconnect();}}public static JdbcTemplate jdbcTemplate;/*** 模拟执行队列里面的sql语句*/public static void executeQueueSql() {int size = SQL_QUEUE.size();for (int i = 0; i < size; i++) {String sql = SQL_QUEUE.poll();jdbcTemplate.execute(sql);System.out.println("[sql]----> " + sql);}}/*** 数据处理** @param entrys*/private static void dataHandle(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {for (CanalEntry.Entry entry : entrys) {if (EntryType.ROWDATA == entry.getEntryType()) {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());CanalEntry.EventType eventType = rowChange.getEventType();if (eventType == EventType.DELETE) {saveDeleteSql(entry);} else if (eventType == EventType.UPDATE) {saveUpdateSql(entry);} else if (eventType == CanalEntry.EventType.INSERT) {saveInsertSql(entry);}}}}/*** 保存更新语句** @param entry*/private static void saveUpdateSql(CanalEntry.Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();for (CanalEntry.RowData rowData : rowDatasList) {List<Column> newColumnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("update " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " set ");for (int i = 0; i < newColumnList.size(); i++) {sql.append(" " + newColumnList.get(i).getName()+ " = '" + newColumnList.get(i).getValue() + "'");if (i != newColumnList.size() - 1) {sql.append(",");}}sql.append(" where ");List<Column> oldColumnList = rowData.getBeforeColumnsList();for (Column column : oldColumnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 保存删除语句** @param entry*/private static void saveDeleteSql(CanalEntry.Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();for (CanalEntry.RowData rowData : rowDatasList) {List<Column> columnList = rowData.getBeforeColumnsList();StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " where ");for (Column column : columnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 保存插入语句** @param entry*/private static void saveInsertSql(CanalEntry.Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();for (CanalEntry.RowData rowData : rowDatasList) {List<Column> columnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " (");for (int i = 0; i < columnList.size(); i++) {sql.append(columnList.get(i).getName());if (i != columnList.size() - 1) {sql.append(",");}}sql.append(") VALUES (");for (int i = 0; i < columnList.size(); i++) {sql.append("'" + columnList.get(i).getValue() + "'");if (i != columnList.size() - 1) {sql.append(",");}}sql.append(")");SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}
}
package com.canal.canalclient;import com.canal.canalclient.config.CanalClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class StartedFunction implements ApplicationRunner {@Autowired@Qualifier("test_master_energy") //有多个数据源的,需要名称区分private static JdbcTemplate jdbcTemplate;@Overridepublic void run(ApplicationArguments args)  throws Exception{log.info("开始监听同步数据库");CanalClient.jdbcTemplate = jdbcTemplate;CanalClient.startCanal();}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.3</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.canal</groupId><artifactId>CanalClient</artifactId><version>0.0.1-SNAPSHOT</version><name>CanalClient</name><description>CanalClient</description><properties><java.version>19</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.2.9</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.32</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><build><plugins><!-- 打包时跳过测试 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.12.4</version><configuration><skipTests>true</skipTests></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins><resources><resource><directory>src/main/resources</directory></resource><resource><directory>src/main/java</directory><includes><include>**/*.xml</include></includes></resource></resources></build></project>

相关文章:

基于Canal的数据同步

基于Canal的数据同步 一、 系统结构 该数据同步系统由Spring Boot和Canal共同组成。 Spring Boot 是一个流行的 Java Web 框架&#xff0c;而 Canal 则是阿里巴巴开源的 MySQL 数据库的数据变更监听框架。结合 Spring Boot 和 Canal&#xff0c;可以实现 MySQL 数据库的实时数…...

vuetify设置页面默认主题色

前言 最近工作中接到一个任务&#xff1a; 项目中分light和dark两种主题色a、b页面默认为dark其他页面默认为light 项目前端环境&#xff1a; vue2jsyarnvuexvuetifyelement ui 解决思路 routerjs中配置路径时进行默认主题设置 在左侧aside点击菜单时&#xff0c;进行主题切…...

【Python入门第二十三天】Python 继承

Python 继承 继承允许我们定义继承另一个类的所有方法和属性的类。 父类是继承的类&#xff0c;也称为基类。 子类是从另一个类继承的类&#xff0c;也称为派生类。 创建父类 任何类都可以是父类&#xff0c;因此语法与创建任何其他类相同&#xff1a; 实例 创建一个名为…...

C#中,读取一个或多个文件内容的方法

读取一个或多个文件内容的方法 在C#中&#xff0c;可以使用File.ReadAllLines方法一次读取多个文件中的所有行内容。例如&#xff0c;以下代码读取了两个文件中的所有行内容&#xff0c;然后将它们合并在一起&#xff1a; string[] file1Lines File.ReadAllLines("file1…...

1 基于神经辐射场(neural Radiance Fileds, Nerf)的三维重建- 简介

Nerf简介 Nerf&#xff08;neural Radiance Fileds&#xff09; 为2020年ICCV上提出的一个基于隐式表达的三维重建方法&#xff0c;使用2D的 Posed Imageds 来生成&#xff08;表达&#xff09;复杂的三维场景。现在越来越多的研究人员开始关注这个潜力巨大的领域&#xff0c;也…...

水果FLStudio21.0.0中文版全能数字音乐工作站DAW

FL Studio 21.0.0官方中文版重磅发布纯正简体中文支持&#xff0c;更快捷的音频剪辑及素材管理器&#xff0c;多样主题随心换&#xff01;Mac版新增对苹果M2/1家族芯片原生支持。编曲、剪辑、录音、混音&#xff0c;20余年的技术积淀和实力研发&#xff0c;FL Studio 已经从电音…...

【GlobalMapper精品教程】055:GM坐标转换器的巧妙使用

GM软件提供了一个简单实用的坐标转换工具,可以实现地理坐标和投影坐标之间的高斯正反算及多种转换计算。 文章目录 一、坐标转换器认识二、坐标转换案例1. 地理坐标←→地理坐标2. 地理坐标←→投影坐标三、在输出坐标上创建新的点四、其他转换工具的使用一、坐标转换器认识 …...

C语言之中rand()函数是如何实现的

rand()函数是一个C标准库中的随机数生成函数&#xff0c;用于生成一个范围在0到RAND_MAX之间的伪随机数。RAND_MAX是一个常量&#xff0c;它是随机数的最大值&#xff0c;通常被定义为32767。 rand()函数的实现原理可以概括为以下几个步骤&#xff1a; 初始化随机数生成器 在…...

winform控件PropertyGrid的应用(使运行中的程序能像vistual studio那样设置控件属性)

上周在看别人写的上位机demo代码时&#xff0c;发现创建的项目模板是"Windows 窗体控件库"(如下图) 生成的项目结构像自定义控件库&#xff0c;没有程序入口方法Main&#xff0c;但却很神奇能调试&#xff0c;最后发现原来Vistual Studio启动了一个外挂程序UserContr…...

SBUS的协议详解

SBUS 1.串口配置&#xff1a; 100k波特率&#xff0c; 8位数据位&#xff08;在stm32中要选择9位&#xff09;&#xff0c; 偶校验&#xff08;EVEN), 2位停止位&#xff0c; 无控流&#xff0c;25个字节&#xff0c; 2.协议格式&#xff1a; [startbyte] [data1][data2]……...

【PyTorch】教程:torch.nn.Hardshrink

torch.nn.Hardshrink CLASS torch.nn.Hardshrink(lambd0.5) 参数 lambd ([float]) – the λ\lambdaλ 默认为 0.5 定义 HardShrink(x){x,if x>λx,if x<−λ0,otherwise \text{HardShrink}(x) \begin{cases} x, & \text{ if } x > \lambda \\ x, & \text{…...

JavaScript 函数参数

JavaScript 函数对参数的值(arguments)没有进行任何的检查。JavaScript 函数参数与大多数其他语言的函数参数的区别在于&#xff1a;它不会关注有多少个参数被传递&#xff0c;不关注传递的参数的数据类型。函数显式参数与隐藏参数(arguments)在先前的教程中&#xff0c;我们已…...

【C】标准IO库函数

fopen/fclose #include <stdio.h>FILE *fopen(const char *path, const char *mode); 返回值&#xff1a;成功返回文件指针&#xff0c;出错返回NULL并设置errnoint fclose(FILE *fp); 返回值&#xff1a;成功返回0&#xff0c;出错返回EOF并设置errnomode参数是一个字符…...

http客户端Feign

Feign替代RestTemplate RestTemplate方式调用存在的缺陷 String url"http://userservice/user/"order.getUserId();User user restTemplate.getForObject(url, User.class); 代码可读性差&#xff0c;变成体验不统一&#xff1b; 参数复杂的时候URL难以维护。 &l…...

如何在Java中使用枚举类:从入门到进阶

枚举类是Java中一种特殊的数据类型&#xff0c;它允许我们将一组有限的值作为一组常量来使用&#xff0c;这些常量在代码中具有固定的名称和类型。在Java中&#xff0c;枚举类通常用于代表状态、选项和类别等具有离散值的变量。本篇博客将深入探讨Java中的枚举类&#xff0c;包…...

操作系统(1.2)--引论

目录 一、操作系统的基本特性 1.并发性 1.1 并行与并发 1.2 引入进程 2.共享性 2.1 互斥共享方式 2.3 同时访问方式 3.虚拟 3.1 时分复用技术 4. 异 步 二、操作系统的主要功能 1.处理机管理功能 1.1 进程控制 1.2 进程同步 1.3 进程通信 1.4 调度 2. 内…...

【Linux】 shell if的[]和[[]]区别

文章目录[]和test[]和[[]]区别总结参考[]和test Shell中的 test 命令用于检查某个条件是否成立&#xff0c;它可以进行数值、字符和文件三个方面的测试 test常用于 if &#xff0c;作为判断条件&#xff0c;if test等价于 if [ ]&#xff0c;因此&#xff0c;test和[] 内的内…...

利用flask解析海康摄像头视频

利用flask解析海康摄像头视频利用flask解析海康摄像头和大华摄像头的视频一、安装依赖包二、获取海康摄像头视频流三、将视频流输出到Web页面四、 创建HTML模板文件利用flask解析海康摄像头和大华摄像头的视频 作为AI智能的一种应用场景&#xff0c;视频监控系统已经在各个行业…...

./docker-compose.yml‘ is invalid

文章目录前言提示原因版本太低解决方法更新删除原来不能执行的/usr/local/bin/docker-compose下载安装docker-compose添加权限前言 安装ctfd过程中的一些报错 rootubuntu:/CTFd# docker-compose up -d ERROR: The Compose file ./docker-compose.yml is invalid because: net…...

Java 流程控制

条件/选择结构 if if(条件表达式){// 表达式为 true 时&#xff0c;执行该代码块 }if(true) {System.out.println("hello"); }if else if(条件表达式){// 表达式为 true 时&#xff0c;执行该代码块 } else {// 表达式为 false 时&#xff0c;执行该代码块 }if(1 …...

23-Oracle 23 ai 区块链表(Blockchain Table)

小伙伴有没有在金融强合规的领域中遇见&#xff0c;必须要保持数据不可变&#xff0c;管理员都无法修改和留痕的要求。比如医疗的电子病历中&#xff0c;影像检查检验结果不可篡改行的&#xff0c;药品追溯过程中数据只可插入无法删除的特性需求&#xff1b;登录日志、修改日志…...

python/java环境配置

环境变量放一起 python&#xff1a; 1.首先下载Python Python下载地址&#xff1a;Download Python | Python.org downloads ---windows -- 64 2.安装Python 下面两个&#xff0c;然后自定义&#xff0c;全选 可以把前4个选上 3.环境配置 1&#xff09;搜高级系统设置 2…...

oracle与MySQL数据库之间数据同步的技术要点

Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异&#xff0c;它们的数据同步要求既要保持数据的准确性和一致性&#xff0c;又要处理好性能问题。以下是一些主要的技术要点&#xff1a; 数据结构差异 数据类型差异&#xff…...

python爬虫:Newspaper3k 的详细使用(好用的新闻网站文章抓取和解析的Python库)

更多内容请见: 爬虫和逆向教程-专栏介绍和目录 文章目录 一、Newspaper3k 概述1.1 Newspaper3k 介绍1.2 主要功能1.3 典型应用场景1.4 安装二、基本用法2.2 提取单篇文章的内容2.2 处理多篇文档三、高级选项3.1 自定义配置3.2 分析文章情感四、实战案例4.1 构建新闻摘要聚合器…...

令牌桶 滑动窗口->限流 分布式信号量->限并发的原理 lua脚本分析介绍

文章目录 前言限流限制并发的实际理解限流令牌桶代码实现结果分析令牌桶lua的模拟实现原理总结&#xff1a; 滑动窗口代码实现结果分析lua脚本原理解析 限并发分布式信号量代码实现结果分析lua脚本实现原理 双注解去实现限流 并发结果分析&#xff1a; 实际业务去理解体会统一注…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...

Python Ovito统计金刚石结构数量

大家好,我是小马老师。 本文介绍python ovito方法统计金刚石结构的方法。 Ovito Identify diamond structure命令可以识别和统计金刚石结构,但是无法直接输出结构的变化情况。 本文使用python调用ovito包的方法,可以持续统计各步的金刚石结构,具体代码如下: from ovito…...

vulnyx Blogger writeup

信息收集 arp-scan nmap 获取userFlag 上web看看 一个默认的页面&#xff0c;gobuster扫一下目录 可以看到扫出的目录中得到了一个有价值的目录/wordpress&#xff0c;说明目标所使用的cms是wordpress&#xff0c;访问http://192.168.43.213/wordpress/然后查看源码能看到 这…...

比较数据迁移后MySQL数据库和OceanBase数据仓库中的表

设计一个MySQL数据库和OceanBase数据仓库的表数据比较的详细程序流程,两张表是相同的结构,都有整型主键id字段,需要每次从数据库分批取得2000条数据,用于比较,比较操作的同时可以再取2000条数据,等上一次比较完成之后,开始比较,直到比较完所有的数据。比较操作需要比较…...

TSN交换机正在重构工业网络,PROFINET和EtherCAT会被取代吗?

在工业自动化持续演进的今天&#xff0c;通信网络的角色正变得愈发关键。 2025年6月6日&#xff0c;为期三天的华南国际工业博览会在深圳国际会展中心&#xff08;宝安&#xff09;圆满落幕。作为国内工业通信领域的技术型企业&#xff0c;光路科技&#xff08;Fiberroad&…...