当前位置: 首页 > article >正文

canal实现mysql数据同步

目录

1、canal下载

2、mysql同步用户创建和授权

3、canal admin安装和启动

4、canal server安装和启动

5、java 端集成监听canal 同步的mysql数据

6、java tcp同步只是其中一种方式,还可以通过kafka、rabbitmq等方式进行数据同步


1、canal下载

canal实现mysql数据同步可以直接安装canal server就可以了,但是为了方便管理(instance配置,canal server状态管理,集群等),需要安装canal admin,应用下载地址:Releases · alibaba/canal · GitHub

进入页面可以选择需要安装的版本

下载canal.deployer-1.1.8.tar.gz和canal.admin-1.1.8.tar.gz

2、mysql同步用户创建和授权

登录mysql
mysql -h 127.0.0.1 -P 3306 -u root -p创建同步用户 repl 密码设为123456
CREATE USER 'repl'@'%' IDENTIFIED BY '123456';给予同步权限
GRANT REPLICATION SLAVE ON *.* to 'repl'@'%' identified by '123456';给予repl只读test库的权限,test库是用来同步数据的
GRANT SELECT ON test.* to 'repl'@'%' identified by '123456';canal_manager是canal admin需要的,给予repl对该库的读写权限
GRANT ALL PRIVILEGES ON canal_manager.* to 'repl'@'%' identified by '123456';mysql my.cnf配置文件增加主从配置master数据库的配置信息
#主数据主从配置 唯一id
server_id=1
#开启logbin
log-bin=mysql-bin
#写入模式 row
binlog-format=ROW
#需要同步的库
binlog-do-db=test
#忽略的数据库
replicate-ignore-db=mysql
replicate-ignore-db=sys
replicate-ignore-db=information_schema
replicate-ignore-db=performance_schema

在canal-admin解压文件的conf中有一个canal_manager.sql,导入到master数据库

3、canal admin安装和启动

把canal.admin-1.1.8.tar.gz上传到linux

解压 tar -zvxf canal.admin-1.1.8.tar.gz 

进入conf目录下,编辑application.yml配置文件。

server:port: 8089
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8spring.datasource:address: 127.0.0.1:3306database: canal_managerusername: replpassword: 123456driver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=truehikari:maximum-pool-size: 30minimum-idle: 1canal:adminUser: adminadminPasswd: 123456

重点介绍以下几个参数:

address:我们需要订阅(也就是mysql master服务器)mysql所在的服务器IP和数据库端口。

database:canal.admin web系统必须的几张表,需要在mysql master服务器上初始化conf/canal_manager.sql文件。

sername和password就是mysql master服务器创建的用于复制的用户和密码,也就是我们在canal server中配置的repl 和 123456。

driver-class-name:mysql的驱动,默认是MYSQL5的驱动,如果你的MYSQL是8的(我的就是),要将驱动改为com.mysql.cj.jdbc.Driver。

另外,还需要在mysql连接后面加上allowPublicKeyRetrieval=true,不然启动时,有可能报错。

启动canal.admin

进入bin目录,执行如下命令,启动canal.admin:

./startup.sh

查看 admin 日志

2022-12-10 03:13:58.995 [main] INFO  o.s.jmx.export.annotation.AnnotationMBeanExporter - 
Located MBean 'dataSource': registering with JMX server as MBean [com.zaxxer.hikari:name=dataSource,type=HikariDataSource]
2022-12-10 03:13:59.015 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8089"]
2022-12-10 03:13:59.038 [main] INFO  org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2022-12-10 03:13:59.214 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8089 (http) with context path ''
2022-12-10 03:13:59.221 [main] INFO  com.alibaba.otter.canal.admin.CanalAdminApplication - Started CanalAdminApplication in 14.281 seconds (JVM running for 15.894)

如果出现上述日志,说明启动成功!

登录admin

通过http://127.0.0.1:8089/访问,默认密码:admin/123456。

注意,IP和密码需要改成你自己配置的。如果是在服务器上配置的,别忘记放开8089端口。

输入用户名和密码之后,出现上述页面说明配置成功!

如果需要修改密码,直接通过执行 select upper(sha1(unhex(sha1('1234567')))) 这个sql得到结果,然后复制到canal_manager库的canal_user表的password字段中就可以了,其中1234567是明文密码,执行上述sql会得到一个密码。

4、canal server安装和启动

把canal.deployer-1.1.8.tar.gz上传到linux

解压 tar -zvxf ccanal.deployer-1.1.8.tar.gz

进入conf目录下,编辑canal.properties配置文件。

注意,如果直接编辑canal.properties,可能无法启动,报如下错误:

可以通过如下方式修改

mv canal.properties canal.properties_bak
cp canal_local.properties canal.properties
vim canal.properties
# register ip
canal.register.ip =# canal admin config  canalAdmin 的链接、端口、用户名和MD5密码
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd =6F32482BAFC60F23B7736044CEFC1799166E5CDB
# admin auto register canal server启动后自动注入到canal admin管理模块
canal.admin.register.auto = true
canal.admin.register.cluster =
canal.admin.register.name =

一般只需要修改下面这3个

canal.admin.manager = 127.0.0.1:8089
canal.admin.user = admin
canal.admin.passwd =6F32482BAFC60F23B7736044CEFC1799166E5CDB

启动canal.server

进入bin目录,执行如下命令,启动canal.server:

./startup.sh

查看canal日志

启动后,canalAdmin的server管理模块,对应创建的canal server会动态识别到,状态变为启动

5、java 端集成监听canal 同步的mysql数据

1、引入依赖

<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version>
</dependency>

2、编写测试代码

package com.hy.das.config;import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;import java.net.InetSocketAddress;
import java.util.List;@Component
public class CanalClient implements InitializingBean{private final static int BATCH_SIZE = 1000;@Overridepublic void afterPropertiesSet() throws Exception {// 创建链接 此处的11111为tcp端口 在canal admin Server管理模块可以查看CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),"test", "", "");try {//打开连接connector.connect();//订阅数据库表,全部表connector.subscribe(".*\\..*");//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿connector.rollback();while (true) {// 获取指定数量的数据Message message = connector.getWithoutAck(BATCH_SIZE);System.out.println(message.getEntries().size());//获取批量IDlong batchId = message.getId();//获取批量的数量int size = message.getEntries().size();//如果没有数据if (batchId == -1 || size == 0) {try {//线程休眠2秒Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}} else {System.out.println("----------------");//如果有数据,处理数据//遍历entries,单条解析for (CanalEntry.Entry entry : message.getEntries()) {//获取表名String tableName = entry.getHeader().getTableName();//获取类型CanalEntry.EntryType entryType = entry.getEntryType();//获取序列化后的数据ByteString storeValue = entry.getStoreValue();//判断entry类型是否为ROWDATA类型if (CanalEntry.EntryType.ROWDATA.equals(entryType)){//反序列化CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);//获取当前事件操作类型CanalEntry.EventType eventType = rowChange.getEventType();//获取数据集List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();//遍历for (CanalEntry.RowData rowData : rowDatasList) {//改变前数据JSONObject jsonObjectBefore = new JSONObject();List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();for (CanalEntry.Column column : beforeColumnsList) {jsonObjectBefore.put(column.getName(),column.getValue());}//改变后数据JSONObject jsonObjectAfter = new JSONObject();List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();for (CanalEntry.Column column : afterColumnsList) {jsonObjectAfter.put(column.getName(),column.getValue());}System.out.println("Table:"+tableName+",EventTpye:"+eventType+",Before:"+jsonObjectBefore+",After:"+jsonObjectAfter);}}else {System.out.println("当前操作类型为:"+entryType);}}}//进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。connector.ack(batchId);}} catch (Exception e) {e.printStackTrace();} finally {connector.disconnect();}}
}

newSingleConnector方法里面的test是一个instance实列,定义了需要同步的master库的信息(ip、端口、用户名、密码、binlog文件名称、同步位置、需要同步的库、不需要同步的库等)

在canal admin web管理界面的Instance 管理模块,点击新建Instance进行创建,新建页面的Instance名称就是test,这个可以随便填写,代码对应修改就行,所属集群/主机,因为我这里是单机部署,直接选择自动注入的canal server就行,点击载入模板,获取配置初始信息,下图中标出的信息按照实际的修改填入就行,点击保存后,启动这个Instance。

3、启动服务,对test库的sys_user表进行数据更新,可以看到后台已经收到变更数据

6、java tcp同步只是其中一种方式,还可以通过kafka、rabbitmq等方式进行数据同步

注意上面需要提供对外访问的端口需要开通安全组,比如8089、11111等端口。

参考文章:

【CanalAdmin部署文档】_canal-admin-CSDN博客

https://zhuanlan.zhihu.com/p/590705531

相关文章:

canal实现mysql数据同步

目录 1、canal下载 2、mysql同步用户创建和授权 3、canal admin安装和启动 4、canal server安装和启动 5、java 端集成监听canal 同步的mysql数据 6、java tcp同步只是其中一种方式&#xff0c;还可以通过kafka、rabbitmq等方式进行数据同步 1、canal下载 canal实现mysq…...

解决 MySQL 表结构修改中锁定异常的全链路实战指南:从表结构设计到版本调优

引言 在 MySQL 中执行ALTER TABLE修改表结构&#xff08;如新增字段、调整字段类型&#xff09;时&#xff0c;锁定异常是最常见的阻碍。无论是 5.7 的 “锁等待超时”、8.0 的 “MDL 锁阻塞”&#xff0c;还是高并发下的 “长事务死锁”&#xff0c;本质都是表结构修改需要获…...

动态规划应用场景 + 代表题目清单(模板加上套路加上题单)

1. 序列型DP&#xff08;Sequence DP&#xff09; ✅ 应用场景 单个或多个序列&#xff08;数组/字符串&#xff09;&#xff0c;求最优子结构。 常见问题&#xff1a;最长递增子序列、最长公共子序列、回文子序列。 &#x1f9e0; 套路总结 单序列&#xff1a;dp[i] max(…...

易境通专线散拼系统:全方位支持多种专线物流业务!

在全球化电商快速发展的今天&#xff0c;跨境电商物流已成为电商运营中极为重要的环节。为了确保物流效率、降低运输成本&#xff0c;越来越多的电商卖家选择专线物流服务。专线物流作为五大主要跨境电商物流模式之一&#xff0c;通过固定的运输路线和流程&#xff0c;极大提高…...

nvm版本管理下pnpm 安装失败问题解决

检查当前使用的 Node.js 是否由 nvm 管理 nvm current 应显示类似 18.16.0 这样的版本号&#xff0c;而不是 system。如果是 system&#xff0c;说明你正在使用系统中其他位置的 Node.js 而不是 nvm 管理的版本。 切换回 nvm 管理的版本 nvm use 18.16.0清除 npm 缓存和全局安装…...

C++高频面试考点 -- 智能指针

C高频面试考点 – 智能指针 C11中引入智能指针的概念&#xff0c;方便堆内存管理。这是因为使用普通指针&#xff0c;容易造成堆内存泄漏&#xff0c;二次释放&#xff0c;程序发生异常时内存泄漏等问题。 智能指针在C11版本之后提供&#xff0c;包含在头文件<memory>中…...

06 如何定义方法,掌握有参无参,有无返回值,调用数组作为参数的方法,方法的重载

1.调用方法 2.掌握有参函数 3.调用数组作为参数 一个例题&#xff1a;数组参数&#xff0c;返回值 方法的重载 两个例题&#xff1a;冒泡排序和九九乘法表的格式学习...

使用vscode MSVC CMake进行C++开发和Debug

使用vscode MSVC CMake进行C开发和Debug 前言软件安装安装插件构建debuug方案一debug方案二其他 前言 一般情况下我都是使用visual studio来进行c开发的&#xff0c;但是由于python用的是vscode&#xff0c;所以二者如果统一的话能稍微提高一点效率。 软件安装 需要安装的软…...

C# AutoMapper对象映射详解

引言 在现代软件开发中&#xff0c;特别是采用分层架构的应用程序&#xff0c;我们经常需要在不同的对象类型之间进行转换。例如&#xff0c;从数据库实体&#xff08;Entity&#xff09;转换为数据传输对象&#xff08;DTO&#xff09;&#xff0c;或者从视图模型&#xff08…...

Keil5 MDK LPC1768 RT-Thread KSZ8041NL uIP1.3.1实现UDP网络通讯(服务端接收并发数据)

作为服务端&#xff0c;嵌入式软件实现流程&#xff1a; [上位机A/B/C/...] ↓ UDP [uIP 协议栈接收] ↓ [udp_appcall()] |-> 复制数据 |-> 保存源IP/端口 |-> 推送到接收队列 …...

提升开发运维效率:原力棱镜游戏公司的 Amazon Q Developer CLI 实践

引言 在当今快速发展的云计算环境中&#xff0c;游戏开发者面临着新的挑战和机遇。为了提升开发效率&#xff0c;需要更智能的工具来辅助工作流程。Amazon Q Developer CLI 作为亚马逊云科技推出的生成式 AI 助手&#xff0c;为开发者提供了一种新的方式来与云服务交互。 Ama…...

20250523-BUG-E1696:无法打开元数据文件“platform.winmd(已解决)

BUG&#xff1a;E1696&#xff1a;无法打开元数据文件“platform.winmd&#xff08;已解决&#xff09; 最近在用VisualStudio2022打开一个VisualStudio2017的C老项目后报了这个错&#xff0c;几经周折终于解决了&#xff0c;以下是我用的解决方法&#xff1a; 将Debug从Win32改…...

职业规划:动态迭代的系统化路径

1. 底层逻辑:构建职业规划的3大支柱 1.1 价值观锚定 1.1.1 生涯幻游法 通过想象理想生活的场景,包括工作环境、时间分配、人际关系、经济状态等,明确自己内心真正渴望的生活和工作状态,为职业规划提供方向指引。 1.1.2 价值观筛选 使用「价值观筛选卡」从30个常见职业价值…...

redisson-spring-boot-starter 版本选择

以下是更详细的 Spring Boot 与 redisson-spring-boot-starter 版本对应关系&#xff0c;按照 Spring Boot 主版本和子版本细分&#xff1a; 1. Spring Boot 3.x 系列 3.2.x 推荐 Redisson 版本&#xff1a;3.23.1&#xff08;最新稳定版&#xff0c;兼容 Redis 7.x&#xf…...

Docker run -v 的 rw 和 ro 模式_docker ro

一、前言 在使用 Docker 启动容器时&#xff0c;通常需要将宿主机的文件或目录挂载到容器中&#xff0c;以便于管理配置、持久化数据和调试日志。本篇博客将重点介绍 -v/--volume 参数的使用方式、挂载权限&#xff08;rw 与 ro&#xff09;的区别&#xff0c;以及如何通过 do…...

CentOS相关操作hub(更新中)

CentOS介绍&#xff1a; CentOS&#xff08;Community Enterprise Operating System&#xff09;是基于 Red Hat Enterprise Linux&#xff08;RHEL&#xff09;源代码编译的开源企业级操作系统&#xff0c;提供与 RHEL 二进制兼容的功能 完全兼容 RHEL&#xff0c;可直接使用…...

@Column 注解属性详解

提示&#xff1a;文章旨在说明 Column 注解属性如何在日常开发中使用&#xff0c;数据库类型为 MySql&#xff0c;其他类型数据库可能存在偏差&#xff0c;需要注意。 文章目录 一、name 方法二、unique 方法三、nullable 方法四、insertable 方法五、updatable 方法六、column…...

基于 ESP32 与 AWS 全托管服务的 IoT 架构:MQTT + WebSocket 实现设备-云-APP 高效互联

目录 一、总体架构图 二、设备端(ESP32)低功耗设计(适配 AWS IoT) 1.MQTT 设置(ESP32 连接 AWS IoT Core) 2.低功耗策略总结(ESP32) 三、云端架构(基于 AWS Serverless + IoT Core) 1.AWS IoT Core 接入 2.云端 → APP:WebSocket 推送方案 流程: 3.数据存…...

unity在urp管线中插入事件

由于在urp下&#xff0c;打包后传统的相机事件有些无法正确执行&#xff0c;这时候我们需要在urp管线中的特定时机进行处理一些事件&#xff0c;需要创建继承ScriptableRenderPass和ScriptableRendererFeature的脚本&#xff0c;示例如下&#xff1a; PluginEventPass&#xf…...

前后端的双精度浮点数精度不一致问题解决方案,自定义Spring的消息转换器处理JSON转换

在 Java 中&#xff0c;Long 是一个 64 位的长整型&#xff0c;通常用于表示很大的整数。在后端&#xff0c;Long 类型的数据没有问题&#xff0c;因为 Java 本身使用的是 64 位的整数&#xff0c;可以表示的范围非常大。 但是&#xff0c;在前端 JavaScript 中&#xff0c;Lo…...

docker安装es连接kibana并安装分词器

使用Docker部署Elasticsearch、Kibana并安装分词器有以下主要优点&#xff1a; 1. 快速部署与一致性 一键式部署&#xff1a;通过Docker Compose可以快速搭建完整的ELK栈环境 环境一致性&#xff1a;确保开发、测试和生产环境完全一致&#xff0c;避免"在我机器上能运行…...

线性回归中涉及的数学基础

线性回归中涉及的数学基础 本文详细地说明了线性回归中涉及到的主要的数学基础。 如果数学基础很扎实可以直接空降博文: 线性回归&#xff08;一&#xff09;-CSDN博客 一、概率、似然与概率密度函数 1. 概率&#xff08;Probability&#xff09; 定义&#xff1a;概率是描述…...

如何计算VLLM本地部署Qwen3-4B的GPU最小配置应该是多少?多人并发访问本地大模型的GPU配置应该怎么分配?

本文一定要阅读我上篇文章&#xff01;&#xff01;&#xff01; 超详细VLLM框架部署qwen3-4B加混合推理探索&#xff01;&#xff01;&#xff01;-CSDN博客 本文是基于上篇文章遗留下的问题进行说明的。 一、本文解决的问题 问题1&#xff1a;我明明只部署了qwen3-4B的模型…...

PostgreSQL日常维护

目录 一&#xff1a;基本使用 1.登录数据库 2.数据库操作 2.1列出库 2.2创建库 2.3删除库 2.4切换库 2.5查看库大小 3.数据表操作 3.1 列出表 3.2创建表 3.3复制表 3.4删除表 4.模式操作命令 4.1创建模式 4.2默认模式 4.3删除模式 4.4查看所有模式 4.5 在指定…...

Attu下载 Mac版与Win版

通过Git地址下载 Mac 版选择对于的架构进行安装 其中遇到了安装不成功&#xff0c;文件损坏等问题 一般是两种情况导致 1.安装版本不对 2.系统权限限制 https://www.cnblogs.com/similar/p/11280162.html打开terminal执行以下命令 sudo spctl --master-disable安装包Git下载地…...

V2X协议|如何做到“车联万物”?【无线通信小百科】

1、什么是V2X V2X&#xff08;Vehicle-to-Everything&#xff09;即“车联万物”&#xff0c;是一项使车辆能够与周围环境实现实时通信的前沿技术。它允许车辆与其他交通参与者和基础设施进行信息交互。通过V2X&#xff0c;车辆不仅具备“远程感知”能力&#xff0c;还能在更大…...

【zookeeper】--部署3.6.3

文章目录 下载解压创建data和logs配置文件1)创建目录并且编辑 zoo.cfg2)接下来将 node01 的 ZooKeeper 所有文件拷贝至 node02 和 node03。推荐从 node02 和 node03 拷贝4&#xff09;最后 vim /etc/profile 配置环境变量&#xff0c;环境搭建结束。配完环境变量后 source /etc…...

[测试_3] 生命周期 | Bug级别 | 测试流程 | 思考

目录 一、软件测试的生命周期&#xff08;重点&#xff09; 1、软件测试 & 软件开发生命周期 &#xff08;1&#xff09;需求分析 &#xff08;2&#xff09;测试计划 &#xff08;3&#xff09;测试设计与开发 &#xff08;4&#xff09;测试执行 &#xff08;5&am…...

物联网(IoT)智能项目全景指南:技术构架、实现细节与应用实践

目录 一、物联网项目的核心组成和发展方向 1. 核心组成 2. 发展趋势 二、系统设计的详细流程 1. 需求分析与方案规划 2. 硬件方案深度设计 3. 软件架构设计 4. 方案示意图&#xff08;架构图&#xff09; 三、关键技术深度剖析 1. 传感器及其接口技术 2. 嵌入式MCU选…...

【Go】1、Go语言基础

前言 本系列文章参考自稀土掘金上的 【字节内部课】公开课&#xff0c;做自我学习总结整理。 Go语言的特点 Go语言由Google团队设计&#xff0c;以简洁、高效、并发友好为核心目标。 具有以下优点&#xff1a; 语法简单、学习曲线平缓&#xff1a;语法关键字很少&#xff0c;且…...