当前位置: 首页 > news >正文

基于Canal的数据同步

基于Canal的数据同步

一、 系统结构

该数据同步系统由Spring Boot和Canal共同组成。
Spring Boot 是一个流行的 Java Web 框架,而 Canal 则是阿里巴巴开源的 MySQL 数据库的数据变更监听框架。结合 Spring Boot 和 Canal,可以实现 MySQL 数据库的实时数据同步到其他系统中。

  1. canal.deployer-1.1.7-SNAPSHOT.tar.gz为Canal软件压缩包,需要安装在服务器上,并根据下文进行配置文件的修改。
  2. CanalClient.rar为用Spring Boot框架编写的数据库监听同步项目

二、. Canal配置

在解压Canal文件夹后,需要配置两个文件。
在配置Canal前,需要确保Mysql的Binlog已经开启,并且模式为ROW,找到当前binlog的文件名和position。

1) 配置文件路径:canal/conf/canal.properties

在这里插入图片描述

2) 配置文件路径:

canal/conf/example/instance.properties

在这里插入图片描述
在这里插入图片描述

三、 Spring Boot配置

1. 项目结构

在这里插入图片描述

2. Canal账号密码配置

进入到Config下的CanalClient类文件。

注意密码是通过MD5加密的,图中这这段字符应替换为canal。

在这里插入图片描述

3. 目标数据库配置

在yml中配置数据同步目标数据库即可
在这里插入图片描述

源代码:

package com.canal.canalclient.config;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.springframework.jdbc.core.JdbcTemplate;import java.net.InetSocketAddress;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/*** @Auther: fzl* @Date: 2020/4/20 01:21* @Description:*/
public class CanalClient {private static Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();public static void startCanal() {//获取canalServer连接:本机地址,端口号CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("IP地址", "端口号"), "example", "canal", "canal");int batchSize = 1000;try {//连接canalServerconnector.connect();//订阅Desctinstionconnector.subscribe();connector.rollback();try {while (true) {//尝试从master那边拉去数据batchSize条记录,有多少取多少//轮询拉取数据   上面的whereMessage message = connector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {//睡眠Thread.sleep(1000);} else {dataHandle(message.getEntries());}connector.ack(batchId);System.out.println("aa"+size);//当队列里面堆积的sql大于一定数值的时候就模拟执行if (SQL_QUEUE.size() >= 10) {executeQueueSql();}}} catch (InterruptedException e) {e.printStackTrace();} catch (InvalidProtocolBufferException e) {e.printStackTrace();}} finally {connector.disconnect();}}public static JdbcTemplate jdbcTemplate;/*** 模拟执行队列里面的sql语句*/public static void executeQueueSql() {int size = SQL_QUEUE.size();for (int i = 0; i < size; i++) {String sql = SQL_QUEUE.poll();jdbcTemplate.execute(sql);System.out.println("[sql]----> " + sql);}}/*** 数据处理** @param entrys*/private static void dataHandle(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {for (CanalEntry.Entry entry : entrys) {if (EntryType.ROWDATA == entry.getEntryType()) {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());CanalEntry.EventType eventType = rowChange.getEventType();if (eventType == EventType.DELETE) {saveDeleteSql(entry);} else if (eventType == EventType.UPDATE) {saveUpdateSql(entry);} else if (eventType == CanalEntry.EventType.INSERT) {saveInsertSql(entry);}}}}/*** 保存更新语句** @param entry*/private static void saveUpdateSql(CanalEntry.Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();for (CanalEntry.RowData rowData : rowDatasList) {List<Column> newColumnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("update " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " set ");for (int i = 0; i < newColumnList.size(); i++) {sql.append(" " + newColumnList.get(i).getName()+ " = '" + newColumnList.get(i).getValue() + "'");if (i != newColumnList.size() - 1) {sql.append(",");}}sql.append(" where ");List<Column> oldColumnList = rowData.getBeforeColumnsList();for (Column column : oldColumnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 保存删除语句** @param entry*/private static void saveDeleteSql(CanalEntry.Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();for (CanalEntry.RowData rowData : rowDatasList) {List<Column> columnList = rowData.getBeforeColumnsList();StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " where ");for (Column column : columnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 保存插入语句** @param entry*/private static void saveInsertSql(CanalEntry.Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();for (CanalEntry.RowData rowData : rowDatasList) {List<Column> columnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " (");for (int i = 0; i < columnList.size(); i++) {sql.append(columnList.get(i).getName());if (i != columnList.size() - 1) {sql.append(",");}}sql.append(") VALUES (");for (int i = 0; i < columnList.size(); i++) {sql.append("'" + columnList.get(i).getValue() + "'");if (i != columnList.size() - 1) {sql.append(",");}}sql.append(")");SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}
}
package com.canal.canalclient;import com.canal.canalclient.config.CanalClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class StartedFunction implements ApplicationRunner {@Autowired@Qualifier("test_master_energy") //有多个数据源的,需要名称区分private static JdbcTemplate jdbcTemplate;@Overridepublic void run(ApplicationArguments args)  throws Exception{log.info("开始监听同步数据库");CanalClient.jdbcTemplate = jdbcTemplate;CanalClient.startCanal();}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.3</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.canal</groupId><artifactId>CanalClient</artifactId><version>0.0.1-SNAPSHOT</version><name>CanalClient</name><description>CanalClient</description><properties><java.version>19</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.2.9</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.32</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><build><plugins><!-- 打包时跳过测试 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.12.4</version><configuration><skipTests>true</skipTests></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins><resources><resource><directory>src/main/resources</directory></resource><resource><directory>src/main/java</directory><includes><include>**/*.xml</include></includes></resource></resources></build></project>

相关文章:

基于Canal的数据同步

基于Canal的数据同步 一、 系统结构 该数据同步系统由Spring Boot和Canal共同组成。 Spring Boot 是一个流行的 Java Web 框架&#xff0c;而 Canal 则是阿里巴巴开源的 MySQL 数据库的数据变更监听框架。结合 Spring Boot 和 Canal&#xff0c;可以实现 MySQL 数据库的实时数…...

vuetify设置页面默认主题色

前言 最近工作中接到一个任务&#xff1a; 项目中分light和dark两种主题色a、b页面默认为dark其他页面默认为light 项目前端环境&#xff1a; vue2jsyarnvuexvuetifyelement ui 解决思路 routerjs中配置路径时进行默认主题设置 在左侧aside点击菜单时&#xff0c;进行主题切…...

【Python入门第二十三天】Python 继承

Python 继承 继承允许我们定义继承另一个类的所有方法和属性的类。 父类是继承的类&#xff0c;也称为基类。 子类是从另一个类继承的类&#xff0c;也称为派生类。 创建父类 任何类都可以是父类&#xff0c;因此语法与创建任何其他类相同&#xff1a; 实例 创建一个名为…...

C#中,读取一个或多个文件内容的方法

读取一个或多个文件内容的方法 在C#中&#xff0c;可以使用File.ReadAllLines方法一次读取多个文件中的所有行内容。例如&#xff0c;以下代码读取了两个文件中的所有行内容&#xff0c;然后将它们合并在一起&#xff1a; string[] file1Lines File.ReadAllLines("file1…...

1 基于神经辐射场(neural Radiance Fileds, Nerf)的三维重建- 简介

Nerf简介 Nerf&#xff08;neural Radiance Fileds&#xff09; 为2020年ICCV上提出的一个基于隐式表达的三维重建方法&#xff0c;使用2D的 Posed Imageds 来生成&#xff08;表达&#xff09;复杂的三维场景。现在越来越多的研究人员开始关注这个潜力巨大的领域&#xff0c;也…...

水果FLStudio21.0.0中文版全能数字音乐工作站DAW

FL Studio 21.0.0官方中文版重磅发布纯正简体中文支持&#xff0c;更快捷的音频剪辑及素材管理器&#xff0c;多样主题随心换&#xff01;Mac版新增对苹果M2/1家族芯片原生支持。编曲、剪辑、录音、混音&#xff0c;20余年的技术积淀和实力研发&#xff0c;FL Studio 已经从电音…...

【GlobalMapper精品教程】055:GM坐标转换器的巧妙使用

GM软件提供了一个简单实用的坐标转换工具,可以实现地理坐标和投影坐标之间的高斯正反算及多种转换计算。 文章目录 一、坐标转换器认识二、坐标转换案例1. 地理坐标←→地理坐标2. 地理坐标←→投影坐标三、在输出坐标上创建新的点四、其他转换工具的使用一、坐标转换器认识 …...

C语言之中rand()函数是如何实现的

rand()函数是一个C标准库中的随机数生成函数&#xff0c;用于生成一个范围在0到RAND_MAX之间的伪随机数。RAND_MAX是一个常量&#xff0c;它是随机数的最大值&#xff0c;通常被定义为32767。 rand()函数的实现原理可以概括为以下几个步骤&#xff1a; 初始化随机数生成器 在…...

winform控件PropertyGrid的应用(使运行中的程序能像vistual studio那样设置控件属性)

上周在看别人写的上位机demo代码时&#xff0c;发现创建的项目模板是"Windows 窗体控件库"(如下图) 生成的项目结构像自定义控件库&#xff0c;没有程序入口方法Main&#xff0c;但却很神奇能调试&#xff0c;最后发现原来Vistual Studio启动了一个外挂程序UserContr…...

SBUS的协议详解

SBUS 1.串口配置&#xff1a; 100k波特率&#xff0c; 8位数据位&#xff08;在stm32中要选择9位&#xff09;&#xff0c; 偶校验&#xff08;EVEN), 2位停止位&#xff0c; 无控流&#xff0c;25个字节&#xff0c; 2.协议格式&#xff1a; [startbyte] [data1][data2]……...

【PyTorch】教程:torch.nn.Hardshrink

torch.nn.Hardshrink CLASS torch.nn.Hardshrink(lambd0.5) 参数 lambd ([float]) – the λ\lambdaλ 默认为 0.5 定义 HardShrink(x){x,if x>λx,if x<−λ0,otherwise \text{HardShrink}(x) \begin{cases} x, & \text{ if } x > \lambda \\ x, & \text{…...

JavaScript 函数参数

JavaScript 函数对参数的值(arguments)没有进行任何的检查。JavaScript 函数参数与大多数其他语言的函数参数的区别在于&#xff1a;它不会关注有多少个参数被传递&#xff0c;不关注传递的参数的数据类型。函数显式参数与隐藏参数(arguments)在先前的教程中&#xff0c;我们已…...

【C】标准IO库函数

fopen/fclose #include <stdio.h>FILE *fopen(const char *path, const char *mode); 返回值&#xff1a;成功返回文件指针&#xff0c;出错返回NULL并设置errnoint fclose(FILE *fp); 返回值&#xff1a;成功返回0&#xff0c;出错返回EOF并设置errnomode参数是一个字符…...

http客户端Feign

Feign替代RestTemplate RestTemplate方式调用存在的缺陷 String url"http://userservice/user/"order.getUserId();User user restTemplate.getForObject(url, User.class); 代码可读性差&#xff0c;变成体验不统一&#xff1b; 参数复杂的时候URL难以维护。 &l…...

如何在Java中使用枚举类:从入门到进阶

枚举类是Java中一种特殊的数据类型&#xff0c;它允许我们将一组有限的值作为一组常量来使用&#xff0c;这些常量在代码中具有固定的名称和类型。在Java中&#xff0c;枚举类通常用于代表状态、选项和类别等具有离散值的变量。本篇博客将深入探讨Java中的枚举类&#xff0c;包…...

操作系统(1.2)--引论

目录 一、操作系统的基本特性 1.并发性 1.1 并行与并发 1.2 引入进程 2.共享性 2.1 互斥共享方式 2.3 同时访问方式 3.虚拟 3.1 时分复用技术 4. 异 步 二、操作系统的主要功能 1.处理机管理功能 1.1 进程控制 1.2 进程同步 1.3 进程通信 1.4 调度 2. 内…...

【Linux】 shell if的[]和[[]]区别

文章目录[]和test[]和[[]]区别总结参考[]和test Shell中的 test 命令用于检查某个条件是否成立&#xff0c;它可以进行数值、字符和文件三个方面的测试 test常用于 if &#xff0c;作为判断条件&#xff0c;if test等价于 if [ ]&#xff0c;因此&#xff0c;test和[] 内的内…...

利用flask解析海康摄像头视频

利用flask解析海康摄像头视频利用flask解析海康摄像头和大华摄像头的视频一、安装依赖包二、获取海康摄像头视频流三、将视频流输出到Web页面四、 创建HTML模板文件利用flask解析海康摄像头和大华摄像头的视频 作为AI智能的一种应用场景&#xff0c;视频监控系统已经在各个行业…...

./docker-compose.yml‘ is invalid

文章目录前言提示原因版本太低解决方法更新删除原来不能执行的/usr/local/bin/docker-compose下载安装docker-compose添加权限前言 安装ctfd过程中的一些报错 rootubuntu:/CTFd# docker-compose up -d ERROR: The Compose file ./docker-compose.yml is invalid because: net…...

Java 流程控制

条件/选择结构 if if(条件表达式){// 表达式为 true 时&#xff0c;执行该代码块 }if(true) {System.out.println("hello"); }if else if(条件表达式){// 表达式为 true 时&#xff0c;执行该代码块 } else {// 表达式为 false 时&#xff0c;执行该代码块 }if(1 …...

ubuntu搭建nfs服务centos挂载访问

在Ubuntu上设置NFS服务器 在Ubuntu上&#xff0c;你可以使用apt包管理器来安装NFS服务器。打开终端并运行&#xff1a; sudo apt update sudo apt install nfs-kernel-server创建共享目录 创建一个目录用于共享&#xff0c;例如/shared&#xff1a; sudo mkdir /shared sud…...

【Linux】C语言执行shell指令

在C语言中执行Shell指令 在C语言中&#xff0c;有几种方法可以执行Shell指令&#xff1a; 1. 使用system()函数 这是最简单的方法&#xff0c;包含在stdlib.h头文件中&#xff1a; #include <stdlib.h>int main() {system("ls -l"); // 执行ls -l命令retu…...

unix/linux,sudo,其发展历程详细时间线、由来、历史背景

sudo 的诞生和演化,本身就是一部 Unix/Linux 系统管理哲学变迁的微缩史。来,让我们拨开时间的迷雾,一同探寻 sudo 那波澜壮阔(也颇为实用主义)的发展历程。 历史背景:su的时代与困境 ( 20 世纪 70 年代 - 80 年代初) 在 sudo 出现之前,Unix 系统管理员和需要特权操作的…...

鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个生活电费的缴纳和查询小程序

一、项目初始化与配置 1. 创建项目 ohpm init harmony/utility-payment-app 2. 配置权限 // module.json5 {"requestPermissions": [{"name": "ohos.permission.INTERNET"},{"name": "ohos.permission.GET_NETWORK_INFO"…...

mysql已经安装,但是通过rpm -q 没有找mysql相关的已安装包

文章目录 现象&#xff1a;mysql已经安装&#xff0c;但是通过rpm -q 没有找mysql相关的已安装包遇到 rpm 命令找不到已经安装的 MySQL 包时&#xff0c;可能是因为以下几个原因&#xff1a;1.MySQL 不是通过 RPM 包安装的2.RPM 数据库损坏3.使用了不同的包名或路径4.使用其他包…...

MySQL账号权限管理指南:安全创建账户与精细授权技巧

在MySQL数据库管理中&#xff0c;合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号&#xff1f; 最小权限原则&#xf…...

Reasoning over Uncertain Text by Generative Large Language Models

https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829 1. 概述 文本中的不确定性在许多语境中传达,从日常对话到特定领域的文档(例如医学文档)(Heritage 2013;Landmark、Gulbrandsen 和 Svenevei…...

6.9-QT模拟计算器

源码: 头文件: widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QMouseEvent>QT_BEGIN_NAMESPACE namespace Ui { class Widget; } QT_END_NAMESPACEclass Widget : public QWidget {Q_OBJECTpublic:Widget(QWidget *parent nullptr);…...

从实验室到产业:IndexTTS 在六大核心场景的落地实践

一、内容创作&#xff1a;重构数字内容生产范式 在短视频创作领域&#xff0c;IndexTTS 的语音克隆技术彻底改变了配音流程。B 站 UP 主通过 5 秒参考音频即可克隆出郭老师音色&#xff0c;生成的 “各位吴彦祖们大家好” 语音相似度达 97%&#xff0c;单条视频播放量突破百万…...

【题解-洛谷】P10480 可达性统计

题目&#xff1a;P10480 可达性统计 题目描述 给定一张 N N N 个点 M M M 条边的有向无环图&#xff0c;分别统计从每个点出发能够到达的点的数量。 输入格式 第一行两个整数 N , M N,M N,M&#xff0c;接下来 M M M 行每行两个整数 x , y x,y x,y&#xff0c;表示从 …...