canal实现mysql数据同步
目录
1、canal下载
2、mysql同步用户创建和授权
3、canal admin安装和启动
4、canal server安装和启动
5、java 端集成监听canal 同步的mysql数据
6、java tcp同步只是其中一种方式,还可以通过kafka、rabbitmq等方式进行数据同步
1、canal下载
canal实现mysql数据同步可以直接安装canal server就可以了,但是为了方便管理(instance配置,canal server状态管理,集群等),需要安装canal admin,应用下载地址:Releases · alibaba/canal · GitHub
进入页面可以选择需要安装的版本
下载canal.deployer-1.1.8.tar.gz和canal.admin-1.1.8.tar.gz
2、mysql同步用户创建和授权
登录mysql
mysql -h 127.0.0.1 -P 3306 -u root -p创建同步用户 repl 密码设为123456
CREATE USER 'repl'@'%' IDENTIFIED BY '123456';给予同步权限
GRANT REPLICATION SLAVE ON *.* to 'repl'@'%' identified by '123456';给予repl只读test库的权限,test库是用来同步数据的
GRANT SELECT ON test.* to 'repl'@'%' identified by '123456';canal_manager是canal admin需要的,给予repl对该库的读写权限
GRANT ALL PRIVILEGES ON canal_manager.* to 'repl'@'%' identified by '123456';mysql my.cnf配置文件增加主从配置master数据库的配置信息
#主数据主从配置 唯一id
server_id=1
#开启logbin
log-bin=mysql-bin
#写入模式 row
binlog-format=ROW
#需要同步的库
binlog-do-db=test
#忽略的数据库
replicate-ignore-db=mysql
replicate-ignore-db=sys
replicate-ignore-db=information_schema
replicate-ignore-db=performance_schema
在canal-admin解压文件的conf中有一个canal_manager.sql,导入到master数据库
3、canal admin安装和启动
把canal.admin-1.1.8.tar.gz上传到linux
解压 tar -zvxf canal.admin-1.1.8.tar.gz
进入conf目录下,编辑application.yml配置文件。
server:port: 8089
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8spring.datasource:address: 127.0.0.1:3306database: canal_managerusername: replpassword: 123456driver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=truehikari:maximum-pool-size: 30minimum-idle: 1canal:adminUser: adminadminPasswd: 123456
重点介绍以下几个参数:
address:我们需要订阅(也就是mysql master服务器)mysql所在的服务器IP和数据库端口。
database:canal.admin web系统必须的几张表,需要在mysql master服务器上初始化conf/canal_manager.sql文件。
sername和password就是mysql master服务器创建的用于复制的用户和密码,也就是我们在canal server中配置的repl 和 123456。
driver-class-name:mysql的驱动,默认是MYSQL5的驱动,如果你的MYSQL是8的(我的就是),要将驱动改为com.mysql.cj.jdbc.Driver。
另外,还需要在mysql连接后面加上allowPublicKeyRetrieval=true,不然启动时,有可能报错。
启动canal.admin
进入bin目录,执行如下命令,启动canal.admin:
./startup.sh
查看 admin 日志
2022-12-10 03:13:58.995 [main] INFO o.s.jmx.export.annotation.AnnotationMBeanExporter -
Located MBean 'dataSource': registering with JMX server as MBean [com.zaxxer.hikari:name=dataSource,type=HikariDataSource]
2022-12-10 03:13:59.015 [main] INFO org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8089"]
2022-12-10 03:13:59.038 [main] INFO org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2022-12-10 03:13:59.214 [main] INFO o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8089 (http) with context path ''
2022-12-10 03:13:59.221 [main] INFO com.alibaba.otter.canal.admin.CanalAdminApplication - Started CanalAdminApplication in 14.281 seconds (JVM running for 15.894)
如果出现上述日志,说明启动成功!
登录admin
通过http://127.0.0.1:8089/访问,默认密码:admin/123456。
注意,IP和密码需要改成你自己配置的。如果是在服务器上配置的,别忘记放开8089端口。
输入用户名和密码之后,出现上述页面说明配置成功!
如果需要修改密码,直接通过执行 select upper(sha1(unhex(sha1('1234567')))) 这个sql得到结果,然后复制到canal_manager库的canal_user表的password字段中就可以了,其中1234567是明文密码,执行上述sql会得到一个密码。
4、canal server安装和启动
把canal.deployer-1.1.8.tar.gz上传到linux
解压 tar -zvxf ccanal.deployer-1.1.8.tar.gz
进入conf目录下,编辑canal.properties配置文件。
注意,如果直接编辑canal.properties,可能无法启动,报如下错误:
可以通过如下方式修改
mv canal.properties canal.properties_bak
cp canal_local.properties canal.properties
vim canal.properties
# register ip
canal.register.ip =# canal admin config canalAdmin 的链接、端口、用户名和MD5密码
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd =6F32482BAFC60F23B7736044CEFC1799166E5CDB
# admin auto register canal server启动后自动注入到canal admin管理模块
canal.admin.register.auto = true
canal.admin.register.cluster =
canal.admin.register.name =
一般只需要修改下面这3个
canal.admin.manager = 127.0.0.1:8089
canal.admin.user = admin
canal.admin.passwd =6F32482BAFC60F23B7736044CEFC1799166E5CDB
启动canal.server
进入bin目录,执行如下命令,启动canal.server:
./startup.sh
查看canal日志
启动后,canalAdmin的server管理模块,对应创建的canal server会动态识别到,状态变为启动
5、java 端集成监听canal 同步的mysql数据
1、引入依赖
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version>
</dependency>
2、编写测试代码
package com.hy.das.config;import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;import java.net.InetSocketAddress;
import java.util.List;@Component
public class CanalClient implements InitializingBean{private final static int BATCH_SIZE = 1000;@Overridepublic void afterPropertiesSet() throws Exception {// 创建链接 此处的11111为tcp端口 在canal admin Server管理模块可以查看CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),"test", "", "");try {//打开连接connector.connect();//订阅数据库表,全部表connector.subscribe(".*\\..*");//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿connector.rollback();while (true) {// 获取指定数量的数据Message message = connector.getWithoutAck(BATCH_SIZE);System.out.println(message.getEntries().size());//获取批量IDlong batchId = message.getId();//获取批量的数量int size = message.getEntries().size();//如果没有数据if (batchId == -1 || size == 0) {try {//线程休眠2秒Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}} else {System.out.println("----------------");//如果有数据,处理数据//遍历entries,单条解析for (CanalEntry.Entry entry : message.getEntries()) {//获取表名String tableName = entry.getHeader().getTableName();//获取类型CanalEntry.EntryType entryType = entry.getEntryType();//获取序列化后的数据ByteString storeValue = entry.getStoreValue();//判断entry类型是否为ROWDATA类型if (CanalEntry.EntryType.ROWDATA.equals(entryType)){//反序列化CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);//获取当前事件操作类型CanalEntry.EventType eventType = rowChange.getEventType();//获取数据集List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();//遍历for (CanalEntry.RowData rowData : rowDatasList) {//改变前数据JSONObject jsonObjectBefore = new JSONObject();List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();for (CanalEntry.Column column : beforeColumnsList) {jsonObjectBefore.put(column.getName(),column.getValue());}//改变后数据JSONObject jsonObjectAfter = new JSONObject();List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();for (CanalEntry.Column column : afterColumnsList) {jsonObjectAfter.put(column.getName(),column.getValue());}System.out.println("Table:"+tableName+",EventTpye:"+eventType+",Before:"+jsonObjectBefore+",After:"+jsonObjectAfter);}}else {System.out.println("当前操作类型为:"+entryType);}}}//进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。connector.ack(batchId);}} catch (Exception e) {e.printStackTrace();} finally {connector.disconnect();}}
}
newSingleConnector方法里面的test是一个instance实列,定义了需要同步的master库的信息(ip、端口、用户名、密码、binlog文件名称、同步位置、需要同步的库、不需要同步的库等)
在canal admin web管理界面的Instance 管理模块,点击新建Instance进行创建,新建页面的Instance名称就是test,这个可以随便填写,代码对应修改就行,所属集群/主机,因为我这里是单机部署,直接选择自动注入的canal server就行,点击载入模板,获取配置初始信息,下图中标出的信息按照实际的修改填入就行,点击保存后,启动这个Instance。
3、启动服务,对test库的sys_user表进行数据更新,可以看到后台已经收到变更数据
6、java tcp同步只是其中一种方式,还可以通过kafka、rabbitmq等方式进行数据同步
注意上面需要提供对外访问的端口需要开通安全组,比如8089、11111等端口。
参考文章:
【CanalAdmin部署文档】_canal-admin-CSDN博客
https://zhuanlan.zhihu.com/p/590705531
相关文章:

canal实现mysql数据同步
目录 1、canal下载 2、mysql同步用户创建和授权 3、canal admin安装和启动 4、canal server安装和启动 5、java 端集成监听canal 同步的mysql数据 6、java tcp同步只是其中一种方式,还可以通过kafka、rabbitmq等方式进行数据同步 1、canal下载 canal实现mysq…...
解决 MySQL 表结构修改中锁定异常的全链路实战指南:从表结构设计到版本调优
引言 在 MySQL 中执行ALTER TABLE修改表结构(如新增字段、调整字段类型)时,锁定异常是最常见的阻碍。无论是 5.7 的 “锁等待超时”、8.0 的 “MDL 锁阻塞”,还是高并发下的 “长事务死锁”,本质都是表结构修改需要获…...
动态规划应用场景 + 代表题目清单(模板加上套路加上题单)
1. 序列型DP(Sequence DP) ✅ 应用场景 单个或多个序列(数组/字符串),求最优子结构。 常见问题:最长递增子序列、最长公共子序列、回文子序列。 🧠 套路总结 单序列:dp[i] max(…...

易境通专线散拼系统:全方位支持多种专线物流业务!
在全球化电商快速发展的今天,跨境电商物流已成为电商运营中极为重要的环节。为了确保物流效率、降低运输成本,越来越多的电商卖家选择专线物流服务。专线物流作为五大主要跨境电商物流模式之一,通过固定的运输路线和流程,极大提高…...
nvm版本管理下pnpm 安装失败问题解决
检查当前使用的 Node.js 是否由 nvm 管理 nvm current 应显示类似 18.16.0 这样的版本号,而不是 system。如果是 system,说明你正在使用系统中其他位置的 Node.js 而不是 nvm 管理的版本。 切换回 nvm 管理的版本 nvm use 18.16.0清除 npm 缓存和全局安装…...
C++高频面试考点 -- 智能指针
C高频面试考点 – 智能指针 C11中引入智能指针的概念,方便堆内存管理。这是因为使用普通指针,容易造成堆内存泄漏,二次释放,程序发生异常时内存泄漏等问题。 智能指针在C11版本之后提供,包含在头文件<memory>中…...

06 如何定义方法,掌握有参无参,有无返回值,调用数组作为参数的方法,方法的重载
1.调用方法 2.掌握有参函数 3.调用数组作为参数 一个例题:数组参数,返回值 方法的重载 两个例题:冒泡排序和九九乘法表的格式学习...

使用vscode MSVC CMake进行C++开发和Debug
使用vscode MSVC CMake进行C开发和Debug 前言软件安装安装插件构建debuug方案一debug方案二其他 前言 一般情况下我都是使用visual studio来进行c开发的,但是由于python用的是vscode,所以二者如果统一的话能稍微提高一点效率。 软件安装 需要安装的软…...
C# AutoMapper对象映射详解
引言 在现代软件开发中,特别是采用分层架构的应用程序,我们经常需要在不同的对象类型之间进行转换。例如,从数据库实体(Entity)转换为数据传输对象(DTO),或者从视图模型(…...
Keil5 MDK LPC1768 RT-Thread KSZ8041NL uIP1.3.1实现UDP网络通讯(服务端接收并发数据)
作为服务端,嵌入式软件实现流程: [上位机A/B/C/...] ↓ UDP [uIP 协议栈接收] ↓ [udp_appcall()] |-> 复制数据 |-> 保存源IP/端口 |-> 推送到接收队列 …...

提升开发运维效率:原力棱镜游戏公司的 Amazon Q Developer CLI 实践
引言 在当今快速发展的云计算环境中,游戏开发者面临着新的挑战和机遇。为了提升开发效率,需要更智能的工具来辅助工作流程。Amazon Q Developer CLI 作为亚马逊云科技推出的生成式 AI 助手,为开发者提供了一种新的方式来与云服务交互。 Ama…...
20250523-BUG-E1696:无法打开元数据文件“platform.winmd(已解决)
BUG:E1696:无法打开元数据文件“platform.winmd(已解决) 最近在用VisualStudio2022打开一个VisualStudio2017的C老项目后报了这个错,几经周折终于解决了,以下是我用的解决方法: 将Debug从Win32改…...
职业规划:动态迭代的系统化路径
1. 底层逻辑:构建职业规划的3大支柱 1.1 价值观锚定 1.1.1 生涯幻游法 通过想象理想生活的场景,包括工作环境、时间分配、人际关系、经济状态等,明确自己内心真正渴望的生活和工作状态,为职业规划提供方向指引。 1.1.2 价值观筛选 使用「价值观筛选卡」从30个常见职业价值…...
redisson-spring-boot-starter 版本选择
以下是更详细的 Spring Boot 与 redisson-spring-boot-starter 版本对应关系,按照 Spring Boot 主版本和子版本细分: 1. Spring Boot 3.x 系列 3.2.x 推荐 Redisson 版本:3.23.1(最新稳定版,兼容 Redis 7.x…...
Docker run -v 的 rw 和 ro 模式_docker ro
一、前言 在使用 Docker 启动容器时,通常需要将宿主机的文件或目录挂载到容器中,以便于管理配置、持久化数据和调试日志。本篇博客将重点介绍 -v/--volume 参数的使用方式、挂载权限(rw 与 ro)的区别,以及如何通过 do…...
CentOS相关操作hub(更新中)
CentOS介绍: CentOS(Community Enterprise Operating System)是基于 Red Hat Enterprise Linux(RHEL)源代码编译的开源企业级操作系统,提供与 RHEL 二进制兼容的功能 完全兼容 RHEL,可直接使用…...

@Column 注解属性详解
提示:文章旨在说明 Column 注解属性如何在日常开发中使用,数据库类型为 MySql,其他类型数据库可能存在偏差,需要注意。 文章目录 一、name 方法二、unique 方法三、nullable 方法四、insertable 方法五、updatable 方法六、column…...

基于 ESP32 与 AWS 全托管服务的 IoT 架构:MQTT + WebSocket 实现设备-云-APP 高效互联
目录 一、总体架构图 二、设备端(ESP32)低功耗设计(适配 AWS IoT) 1.MQTT 设置(ESP32 连接 AWS IoT Core) 2.低功耗策略总结(ESP32) 三、云端架构(基于 AWS Serverless + IoT Core) 1.AWS IoT Core 接入 2.云端 → APP:WebSocket 推送方案 流程: 3.数据存…...

unity在urp管线中插入事件
由于在urp下,打包后传统的相机事件有些无法正确执行,这时候我们需要在urp管线中的特定时机进行处理一些事件,需要创建继承ScriptableRenderPass和ScriptableRendererFeature的脚本,示例如下: PluginEventPass…...
前后端的双精度浮点数精度不一致问题解决方案,自定义Spring的消息转换器处理JSON转换
在 Java 中,Long 是一个 64 位的长整型,通常用于表示很大的整数。在后端,Long 类型的数据没有问题,因为 Java 本身使用的是 64 位的整数,可以表示的范围非常大。 但是,在前端 JavaScript 中,Lo…...

docker安装es连接kibana并安装分词器
使用Docker部署Elasticsearch、Kibana并安装分词器有以下主要优点: 1. 快速部署与一致性 一键式部署:通过Docker Compose可以快速搭建完整的ELK栈环境 环境一致性:确保开发、测试和生产环境完全一致,避免"在我机器上能运行…...

线性回归中涉及的数学基础
线性回归中涉及的数学基础 本文详细地说明了线性回归中涉及到的主要的数学基础。 如果数学基础很扎实可以直接空降博文: 线性回归(一)-CSDN博客 一、概率、似然与概率密度函数 1. 概率(Probability) 定义:概率是描述…...

如何计算VLLM本地部署Qwen3-4B的GPU最小配置应该是多少?多人并发访问本地大模型的GPU配置应该怎么分配?
本文一定要阅读我上篇文章!!! 超详细VLLM框架部署qwen3-4B加混合推理探索!!!-CSDN博客 本文是基于上篇文章遗留下的问题进行说明的。 一、本文解决的问题 问题1:我明明只部署了qwen3-4B的模型…...
PostgreSQL日常维护
目录 一:基本使用 1.登录数据库 2.数据库操作 2.1列出库 2.2创建库 2.3删除库 2.4切换库 2.5查看库大小 3.数据表操作 3.1 列出表 3.2创建表 3.3复制表 3.4删除表 4.模式操作命令 4.1创建模式 4.2默认模式 4.3删除模式 4.4查看所有模式 4.5 在指定…...

Attu下载 Mac版与Win版
通过Git地址下载 Mac 版选择对于的架构进行安装 其中遇到了安装不成功,文件损坏等问题 一般是两种情况导致 1.安装版本不对 2.系统权限限制 https://www.cnblogs.com/similar/p/11280162.html打开terminal执行以下命令 sudo spctl --master-disable安装包Git下载地…...

V2X协议|如何做到“车联万物”?【无线通信小百科】
1、什么是V2X V2X(Vehicle-to-Everything)即“车联万物”,是一项使车辆能够与周围环境实现实时通信的前沿技术。它允许车辆与其他交通参与者和基础设施进行信息交互。通过V2X,车辆不仅具备“远程感知”能力,还能在更大…...
【zookeeper】--部署3.6.3
文章目录 下载解压创建data和logs配置文件1)创建目录并且编辑 zoo.cfg2)接下来将 node01 的 ZooKeeper 所有文件拷贝至 node02 和 node03。推荐从 node02 和 node03 拷贝4)最后 vim /etc/profile 配置环境变量,环境搭建结束。配完环境变量后 source /etc…...

[测试_3] 生命周期 | Bug级别 | 测试流程 | 思考
目录 一、软件测试的生命周期(重点) 1、软件测试 & 软件开发生命周期 (1)需求分析 (2)测试计划 (3)测试设计与开发 (4)测试执行 (5&am…...
物联网(IoT)智能项目全景指南:技术构架、实现细节与应用实践
目录 一、物联网项目的核心组成和发展方向 1. 核心组成 2. 发展趋势 二、系统设计的详细流程 1. 需求分析与方案规划 2. 硬件方案深度设计 3. 软件架构设计 4. 方案示意图(架构图) 三、关键技术深度剖析 1. 传感器及其接口技术 2. 嵌入式MCU选…...
【Go】1、Go语言基础
前言 本系列文章参考自稀土掘金上的 【字节内部课】公开课,做自我学习总结整理。 Go语言的特点 Go语言由Google团队设计,以简洁、高效、并发友好为核心目标。 具有以下优点: 语法简单、学习曲线平缓:语法关键字很少,且…...