实战:MySQL数据同步神器之Canal
1.概叙
场景一:数据增量实时同步
项目中业务数据量比较大,每类业务表都达到千万级别,虽然做了分库分表,每张表数据控制在300W以下,但是效率还是达不到要求,为了提高查询效率,打算使用ES进行数据查询。
那么这个时候问题来了,怎么把MySQL中增量数据同步到ES?
场景二:缓存一致性问题
Java web应用性能分析之【高并发之缓存-多级缓存】_java多级缓存-CSDN博客

在前面文章中有提到这个场景,如何保证redis、EhCache、MySQL数据一致?
我们都知道作为数据库写操作,是不通过缓存的。假设商品服务实例 1 将 1 号商品价格调整为 80 元,这会衍生一个新问题:如何主动向应用程序推送数据变更的消息来保证它们也能同步更新缓存呢?
针对上面两种场景,可以看看canal的解决方案。
什么是Canal?
canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。项目起源于阿里巴巴内部对于跨机房数据同步的需求,通过解析MySQL的二进制日志(Binary Log),Canal能够捕获并推送数据库的变更事件,满足了诸如数据库镜像、实时备份、索引实时维护等多种业务场景的需求。
GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件
Home · alibaba/canal Wiki · GitHub
支持范围
Canal当前支持MySQL数据库的多个版本,包括但不限于5.1.x、5.5.x、5.6.x、5.7.x及8.0.x,同时也兼容阿里云RDS等云数据库服务,为用户提供了广泛的数据库兼容性保障。
部分支持MySQL体系数据库:Mariadb 10.x、PolarDB-X

主要特性
高性能与低延迟:Canal 1.1.x版本进行了深度优化,性能提升高达150%。
Prometheus监控:原生集成Prometheus监控,便于系统健康状况的跟踪。
消息系统集成:直接支持Kafka、RocketMQ消息投递,便于与大数据平台对接。
云数据库支持:无缝对接阿里云RDS,解决了自动主备切换及离线Binlog解析问题。
Docker部署:提供Docker镜像,简化部署流程。
WebUI管理:Canal-Admin工程引入WebUI,实现动态配置、任务管理与日志查看等功能。
2.Canal原理

MySQL主备复制原理
-
MySQL master 将数据变更写入二进制日志(binary log), 日志中的记录叫做二进制日志事件(binary log events,可以通过 show binlog events 进行查看)
-
MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
-
MySQL slave 重放 relay log 中事件,将数据变更反映到它自己的数据

Canal工作原理
Canal巧妙地模拟了MySQL主从复制的机制。具体而言:
- 伪装为MySQL Slave:canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
- 获取Binary Log:MySQL Master接收到请求后,开始推送Binary Log给Canal。
- 解析日志事件:Canal解析接收到的Binary Log,将数据变更信息转换为易于处理的结构化数据。canal 解析 binary log 对象(原始为 byte 流),转换为json格式。
- 数据同步:Canal 客户端通过 TCP 协议或 MQ 形式监听 Canal 服务端,同步数据到 ES、KFK、HBase、RocketMQ、Pulsar等。
优点: 可以完全和业务代码解耦,增量日志订阅。
缺点:实时性不高,订阅mysql日志,DB中数据事务成功后,开始同步至canal。
Canal总体架构


对应这些软件包:
- deployer包:即canal server,负责从master库同步binlog,将数据通过tcp的方式同步给Adapter适配器,经过Adapter适配同步给目标库;通过MQ将数据同步给canal client或目标库,canal client也可以再同步给目标库。
- server包官方介绍:https://github.com/alibaba/canal/wiki/DevGuide
- adapter包:给目标库同步数据。目前支持:Hbase、RDB、ES。
- adapter包官方介绍:https://github.com/alibaba/canal/wiki/ClientAdapter
- admin包:canal 1.1.4版本,迎来最重要的WebUI能力,引入canal-admin工程,支持面向WebUI的canal动态管理能力,支持配置、任务、日志等在线白屏运维能力,具体文档:Canal Admin Guide
- admin包官方介绍:https://github.com/alibaba/canal/wiki/Canal-Admin-Guide
- canal.client:消费server数据
- Canal设计了client-server架构,支持多种语言客户端通过protobuf 3.0协议与之交互,官方及社区提供了以下客户端:
- Java客户端:ClientExample
- C#客户端:CanalSharp
- Go客户端:canal-go
- Python客户端:canal-python
- PHP客户端:canal-php
- Rust客户端:canal-rs
- Node.js客户端:canal-nodejs
-
除了基础功能,Canal还支持丰富的进阶特性和周边生态工具,如:
Canal-Admin:提供Web界面管理Canal实例,实现配置、监控和运维的可视化操作。
canal2sql:一个工具项目,能根据Binlog生成SQL,便于数据迁移或备份。
Otter:Canal的消费端开源项目,用于数据同步与数据集成。
Canal高可用架构

整个 HA 机制的控制主要是依赖了zookeeper的两个特性:watcher、EPHEMERAL节点。canal的 HA 机制实现分为两部分,canal server 和 canal client分别有对应的实现。
canal server实现流程如下:
- 1、canal server 要启动某个 canal instance 时都先向 zookeeper 进行一次尝试启动判断 (实现:创建 EPHEMERAL 节点,谁创建成功就允许谁启动)。
- 2、创建 zookeeper 节点成功后,对应的 canal server 就启动对应的 canal instance,没有创建成功的 canal instance 就会处于 standby 状态。
- 3、一旦 zookeeper 发现 canal server A 创建的节点消失后,立即通知其他的 canal server 再次进行步骤1的操作,重新选出一个 canal server 启动instance。
- 4、canal client 每次进行connect时,会首先向 zookeeper 询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect。
为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态。
依赖:
- JDK1.8
- MySQL:用于canal-admin存储配置和节点等相关数据
- Zookeeper
3.Canal实战
准备环境
| 名称 | 版本 |
|---|---|
| MySQL | 5.7 |
| elasticsearch | 7.17.9 |
| canal | 1.1.7 |
| jdk | 1.8 |
-
对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复- 注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
-
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;下载 canal

本次操作只需要下载admin、deployer两个包。
下载后解压即可用,当然,要运行起来的话,需要配置jdk1.8,这里就不再多说。

创建admin的数据库:conf/canal_manager.sql

启动admin前,先完成建库和见表(包括admin的默认登录账号密码 admin/123456),否则启动报错。主要是如下六张表。

修改配置文件
修改server的配置文件:conf/canal.properties 和conf/example/instance.properties


切记数据库和配置文件中的密码要一致。

修改admin的配置文件:conf/application.yml


分别启动admin和server
启动server:bin/startup.bat

启动admin:bin/startup.bat

登录admin:http://192.168.1.4:8089/#/login?redirect=%2Fdashboard
默认账号密码:admin/123456

配置java客户端
引入依赖
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency>
客户端代码
package com.zxx.study.base.canal;import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.SneakyThrows;import java.net.InetSocketAddress;
import java.util.List;/*** @author zhouxx* @create 2024-08-01 21:59*/
public class CanalClient {@SneakyThrowspublic static void main(String[] args) {try {// 创建canal客户端,单链接模式CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.1.4",11111), "example", "", "");// 创建连接canalConnector.connect();while (true) {// 订阅数据库// canalConnector.subscribe("mall");// 获取数据Message message = canalConnector.get(100);// 获取Entry集合List<CanalEntry.Entry> entries = message.getEntries();// 判断集合是否为空,如果为空,则等待一会继续拉取数据if (entries.size() <= 0) {
// System.out.println("当次抓取没有数据,休息一会。。。。。。");Thread.sleep(1000);} else {// 遍历entries,单条解析for (CanalEntry.Entry entry : entries) {//1.获取表名String tableName = entry.getHeader().getTableName();//2.获取类型CanalEntry.EntryType entryType = entry.getEntryType();//3.获取序列化后的数据ByteString storeValue = entry.getStoreValue();//4.判断当前entryType类型是否为ROWDATAif (CanalEntry.EntryType.ROWDATA.equals(entryType)) {//5.反序列化数据CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);//6.获取当前事件的操作类型CanalEntry.EventType eventType = rowChange.getEventType();//7.获取数据集List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();//8.遍历rowDataList,并打印数据集for (CanalEntry.RowData rowData : rowDataList) {JSONObject beforeData = new JSONObject();List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();for (CanalEntry.Column column : beforeColumnsList) {beforeData.put(column.getName(), column.getValue());}JSONObject afterData = new JSONObject();List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();for (CanalEntry.Column column : afterColumnsList) {afterData.put(column.getName(), column.getValue());}//数据打印System.out.println("Table:" + tableName +",EventType:" + eventType +",Before:" + beforeData +",After:" + afterData);/*** Table:test_user,EventType:UPDATE,Before:{"name":"1111221","id":"1"},After:{"name":"11tom","id":"1"}* Table:test_user,EventType:INSERT,Before:{},After:{"name":"zhouxx","id":"17"}* Table:test_user,EventType:INSERT,Before:{},After:{"name":"zhouxx1","id":"18"}* */}}}}}}catch (Exception e){e.printStackTrace();}}}
运行效果:在数据库里面忝删改查,均可以在客户端中打印出来

cancel框架同步mysql数据到kafka
参考:cancel框架同步mysql数据到kafka_mysql cancel-CSDN博客
利用canal进行MySQL到ES的数据实时同步
参考:利用canal进行MySQL到ES的数据实时同步_canal es-CSDN博客

结合上图,至此,场景一和场景二中的问题,均可以解决。
相关文章:
实战:MySQL数据同步神器之Canal
1.概叙 场景一:数据增量实时同步 项目中业务数据量比较大,每类业务表都达到千万级别,虽然做了分库分表,每张表数据控制在300W以下,但是效率还是达不到要求,为了提高查询效率,打算使用ES进行数…...
5.6软件工程-运维
运维 系统转换系统维护系统评价练习题 系统转换 新老系统的转换 系统转换是指:新系统开发完毕,投入运行,取代现有系统的过程,需要考虑多方面的问题,以实现与老系统的交接,有一下三种转换计划: …...
在JavaScript中如何确保构造函数只被new调用
构造函数是一个特殊的函数,用于初始化一个新创建的对象。它是在创建对象时自动调用的。构造函数通常用于为对象的属性赋值,或者执行其他必要的设置。 使用函数名大写字母开头,这是一种命名约定,用于区分构造函数和普通函数。如何…...
【数据结构算法经典题目刨析(c语言)】反转链表(图文详解)
💓 博客主页:C-SDN花园GGbond ⏩ 文章专栏:数据结构经典题目刨析(c语言) 目录 一、题目描述 二、思路分析 三、代码实现 一、题目描述: 二、思路分析 : 通过三个指针n1,n2,n3来实现链表的反转 1.首先初始化 n1为…...
机器学习之争:Python vs R,谁更胜一筹?
一、引言 随着人工智能和大数据的迅速发展,机器学习已成为现代科技的重要组成部分。在医疗、金融、零售、制造等多个领域,机器学习技术的应用无处不在。从数据分析到预测建模,再到深度学习,机器学习正在改变我们的工作和生活方式…...
Vulnhub靶机:JANGOW_ 1.0.1
目录 前言: 一、安装虚拟机Jangow:1.0.1靶机 二、Web部分 前言: 难度:简单,本文使用VirtualBox打开,下载地址: https://download.vulnhub.com/jangow/jangow-01-1.0.1.ova 一、安装虚拟机J…...
Python脚本实现USB自动复制文件
USB驱动器作为常见的数据存储设备,经常用于数据传输和备份。 然而,我们在手动处理文件复制可能效率低下且容易出错。 因此,我们可以利用Python编写脚本来自动化这一过程,提高效率和数据安全性。 准备工作 首先,我们需…...
【C++学习第19天】最小生成树(对应无向图)
一、最小生成树 二、代码 1、Prim算法 #include <cstring> #include <iostream> #include <algorithm>using namespace std;const int N 510, INF 0x3f3f3f3f;int n, m; int g[N][N]; int dist[N]; bool st[N];int prim() {memset(dist, 0x3f, sizeof di…...
第一个 Flask 项目
第一个 Flask 项目 安装环境创建项目启动程序访问项目参数说明Flask对象的初始化参数app.run()参数 应用程序配置参数使用 Flask 的 config.from_object() 方法使用 Flask 的 config.from_pyfile() 方法使用 Flask 的 config.from_envvar() 方法步骤 1: 设置环境变量步骤 2: 编…...
利用 Angular 发挥环境的力量
一.介绍 您是否曾想过如何在不同的环境中为同一应用设置不同的颜色、标题或 API 调用?可以肯定的是,生产 API 和测试 API 是不同的,应谨慎使用。部署时,我们不会在项目的所有地方手动更改所有 API 调用。不应这样做,因…...
Vue3+TypeScript+printjs 实现标签批量打印功能
前言:临时性需求没怎么接触过前端,代码实现有问题及优化点希望大佬可以留言告知一下 开发工具:VS CODE 界面开发:Vue3TypeScriptElementPlus 打印组件:Print-JS 前端打印入口图: 标签页面: …...
微信文件如何直接打印及打印功能在哪里设置?
在数字化时代,打印需求依旧不可或缺,但传统打印店的高昂价格和不便操作常常让人头疼。幸运的是,琢贝打印作为一款集便捷、经济、高效于一体的网上打印平台,正逐渐成为众多用户的首选。特别是通过微信小程序下单,更是让…...
dataX -20240804-master分支
1、相关报错 Error: java.io.IOException: java.lang.RuntimeException: ORC split generation failed with exception: org.apache.orc.impl.SchemaEvolution$IllegalEvolutionException: ORC does not support type conversion from file type struct<nanos:int> (10)…...
【网络】传输层
传输层 一、预备知识1、端口号1、端口号范围划分2、知名端口号3、两个问题4、netstat && iostate5、pidof6、谈下面协议始终铭记两个问题 二、UDP协议(简单)1、UDP协议端格式2、UDP的特点3、面向数据报4、UDP缓冲区 三、TCP协议(重点…...
学生管理系统之更新和删除、筛选
学生管理系统之更新和删除 建立新的窗口 添加组件 进行布局 使用Widget把二个放在一块,作为一列,然后全选进行栅格布局,最后添加弹簧进行微调。 编写增加的槽函数 在主函数中调用对话框...
教您一键批量下载拼多多批发图片信息,节省时间
图片是电商的核心展示手段,高质量、吸引人的图片能显著提升商品吸引力,增强用户体验,促进购买决策。良好的视觉呈现有助于品牌形象的塑造,提高转化率和客户满意度,对电商平台的流量和销售业绩具有直接影响。 使用图快…...
基于微信小程序的微课堂笔记的设计与实现(源码+论文+部署讲解等)
博主介绍:✌全网粉丝10W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和学生毕业项目实战,高校老师/讲师/同行前辈交流✌ 技术栈介绍:我是程序员阿龙ÿ…...
去噪扩散恢复模型
去噪扩散恢复模型 Bahjat Kawar 计算机科学系 以色列海法理工学院 bahjat.kawarcs.technion.ac.il Michael Elad 计算机科学系 以色列海法理工学院 eladcs.technion.ac.il Stefano Ermon 计算机科学系 美国加利福尼亚州斯坦福大学 ermoncs.stanford.edu …...
Stable Diffusion 官方模型V1.5版本下载
模型描述 Stable Diffusion的官方模型更适合绘制偏写实的风格,如果您想绘制二次元之类的风格,可以考虑下载本站的其它模型。 安装方法 将模型下载后,将会得到一个名为****.ckpt格式的文件,将该文件剪切至你的Stable Diffusion本…...
【算法】双指针-OJ题详解1
双指针-OJ题 移动零(点击跳转)原理讲解代码实现 复写零(点击跳转)原理讲解代码实现 快乐数(点击跳转)原理讲解代码实现 盛最多水的容器(点击跳转)原理讲解代码实现 有效三角形的个数…...
多云管理“拦路虎”:深入解析网络互联、身份同步与成本可视化的技术复杂度
一、引言:多云环境的技术复杂性本质 企业采用多云策略已从技术选型升维至生存刚需。当业务系统分散部署在多个云平台时,基础设施的技术债呈现指数级积累。网络连接、身份认证、成本管理这三大核心挑战相互嵌套:跨云网络构建数据…...
React Native 导航系统实战(React Navigation)
导航系统实战(React Navigation) React Navigation 是 React Native 应用中最常用的导航库之一,它提供了多种导航模式,如堆栈导航(Stack Navigator)、标签导航(Tab Navigator)和抽屉…...
Python爬虫实战:研究feedparser库相关技术
1. 引言 1.1 研究背景与意义 在当今信息爆炸的时代,互联网上存在着海量的信息资源。RSS(Really Simple Syndication)作为一种标准化的信息聚合技术,被广泛用于网站内容的发布和订阅。通过 RSS,用户可以方便地获取网站更新的内容,而无需频繁访问各个网站。 然而,互联网…...
dedecms 织梦自定义表单留言增加ajax验证码功能
增加ajax功能模块,用户不点击提交按钮,只要输入框失去焦点,就会提前提示验证码是否正确。 一,模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...
数据链路层的主要功能是什么
数据链路层(OSI模型第2层)的核心功能是在相邻网络节点(如交换机、主机)间提供可靠的数据帧传输服务,主要职责包括: 🔑 核心功能详解: 帧封装与解封装 封装: 将网络层下发…...
Mac软件卸载指南,简单易懂!
刚和Adobe分手,它却总在Library里给你写"回忆录"?卸载的Final Cut Pro像电子幽灵般阴魂不散?总是会有残留文件,别慌!这份Mac软件卸载指南,将用最硬核的方式教你"数字分手术"࿰…...
Matlab | matlab常用命令总结
常用命令 一、 基础操作与环境二、 矩阵与数组操作(核心)三、 绘图与可视化四、 编程与控制流五、 符号计算 (Symbolic Math Toolbox)六、 文件与数据 I/O七、 常用函数类别重要提示这是一份 MATLAB 常用命令和功能的总结,涵盖了基础操作、矩阵运算、绘图、编程和文件处理等…...
Java多线程实现之Thread类深度解析
Java多线程实现之Thread类深度解析 一、多线程基础概念1.1 什么是线程1.2 多线程的优势1.3 Java多线程模型 二、Thread类的基本结构与构造函数2.1 Thread类的继承关系2.2 构造函数 三、创建和启动线程3.1 继承Thread类创建线程3.2 实现Runnable接口创建线程 四、Thread类的核心…...
C++.OpenGL (14/64)多光源(Multiple Lights)
多光源(Multiple Lights) 多光源渲染技术概览 #mermaid-svg-3L5e5gGn76TNh7Lq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-3L5e5gGn76TNh7Lq .error-icon{fill:#552222;}#mermaid-svg-3L5e5gGn76TNh7Lq .erro…...
深入浅出深度学习基础:从感知机到全连接神经网络的核心原理与应用
文章目录 前言一、感知机 (Perceptron)1.1 基础介绍1.1.1 感知机是什么?1.1.2 感知机的工作原理 1.2 感知机的简单应用:基本逻辑门1.2.1 逻辑与 (Logic AND)1.2.2 逻辑或 (Logic OR)1.2.3 逻辑与非 (Logic NAND) 1.3 感知机的实现1.3.1 简单实现 (基于阈…...
