当前位置: 首页 > news >正文

实战:MySQL数据同步神器之Canal

1.概叙

场景一:数据增量实时同步

项目中业务数据量比较大,每类业务表都达到千万级别,虽然做了分库分表,每张表数据控制在300W以下,但是效率还是达不到要求,为了提高查询效率,打算使用ES进行数据查询。

那么这个时候问题来了,怎么把MySQL中增量数据同步到ES?

场景二:缓存一致性问题

Java web应用性能分析之【高并发之缓存-多级缓存】_java多级缓存-CSDN博客

在前面文章中有提到这个场景,如何保证redis、EhCache、MySQL数据一致?

我们都知道作为数据库写操作,是不通过缓存的。假设商品服务实例 1 将 1 号商品价格调整为 80 元,这会衍生一个新问题:如何主动向应用程序推送数据变更的消息来保证它们也能同步更新缓存呢?

针对上面两种场景,可以看看canal的解决方案。

什么是Canal?

canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。项目起源于阿里巴巴内部对于跨机房数据同步的需求,通过解析MySQL的二进制日志(Binary Log),Canal能够捕获并推送数据库的变更事件,满足了诸如数据库镜像、实时备份、索引实时维护等多种业务场景的需求。

GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件

Home · alibaba/canal Wiki · GitHub

支持范围

Canal当前支持MySQL数据库的多个版本,包括但不限于5.1.x、5.5.x、5.6.x、5.7.x及8.0.x,同时也兼容阿里云RDS等云数据库服务,为用户提供了广泛的数据库兼容性保障。

部分支持MySQL体系数据库:Mariadb 10.x、PolarDB-X

主要特性

高性能与低延迟:Canal 1.1.x版本进行了深度优化,性能提升高达150%。
Prometheus监控:原生集成Prometheus监控,便于系统健康状况的跟踪。
消息系统集成:直接支持Kafka、RocketMQ消息投递,便于与大数据平台对接。
云数据库支持:无缝对接阿里云RDS,解决了自动主备切换及离线Binlog解析问题。
Docker部署:提供Docker镜像,简化部署流程。
WebUI管理:Canal-Admin工程引入WebUI,实现动态配置、任务管理与日志查看等功能。

2.Canal原理

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志(binary log), 日志中的记录叫做二进制日志事件(binary log events,可以通过 show binlog events 进行查看)

  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)

  • MySQL slave 重放 relay log 中事件,将数据变更反映到它自己的数据

Canal工作原理

Canal巧妙地模拟了MySQL主从复制的机制。具体而言:

  1. 伪装为MySQL Slave:canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
  2. 获取Binary Log:MySQL Master接收到请求后,开始推送Binary Log给Canal。
  3. 解析日志事件:Canal解析接收到的Binary Log,将数据变更信息转换为易于处理的结构化数据。canal 解析 binary log 对象(原始为 byte 流),转换为json格式。
  4. 数据同步:Canal 客户端通过 TCP 协议或 MQ 形式监听 Canal 服务端,同步数据到 ES、KFK、HBase、RocketMQ、Pulsar等。

优点: 可以完全和业务代码解耦,增量日志订阅。

缺点:实时性不高,订阅mysql日志,DB中数据事务成功后,开始同步至canal。

Canal总体架构

对应这些软件包:

  1. deployer包:即canal server,负责从master库同步binlog,将数据通过tcp的方式同步给Adapter适配器,经过Adapter适配同步给目标库;通过MQ将数据同步给canal client或目标库,canal client也可以再同步给目标库。
    1. server包官方介绍:https://github.com/alibaba/canal/wiki/DevGuide
  2. adapter包:给目标库同步数据。目前支持:Hbase、RDB、ES。
    1. adapter包官方介绍:https://github.com/alibaba/canal/wiki/ClientAdapter
  3. admin包:canal 1.1.4版本,迎来最重要的WebUI能力,引入canal-admin工程,支持面向WebUI的canal动态管理能力,支持配置、任务、日志等在线白屏运维能力,具体文档:Canal Admin Guide
    1. admin包官方介绍:https://github.com/alibaba/canal/wiki/Canal-Admin-Guide
  4. canal.client:消费server数据
    1. Canal设计了client-server架构,支持多种语言客户端通过protobuf 3.0协议与之交互,官方及社区提供了以下客户端:
    2. Java客户端:ClientExample
    3. C#客户端:CanalSharp
    4. Go客户端:canal-go
    5. Python客户端:canal-python
    6. PHP客户端:canal-php
    7. Rust客户端:canal-rs
    8. Node.js客户端:canal-nodejs
  5. 除了基础功能,Canal还支持丰富的进阶特性和周边生态工具,如:

    Canal-Admin:提供Web界面管理Canal实例,实现配置、监控和运维的可视化操作。
    canal2sql:一个工具项目,能根据Binlog生成SQL,便于数据迁移或备份。
    Otter:Canal的消费端开源项目,用于数据同步与数据集成。

Canal高可用架构

整个 HA 机制的控制主要是依赖了zookeeper的两个特性:watcher、EPHEMERAL节点。canal的 HA 机制实现分为两部分,canal server 和 canal client分别有对应的实现。

canal server实现流程如下:

  • 1、canal server 要启动某个 canal instance 时都先向 zookeeper 进行一次尝试启动判断 (实现:创建 EPHEMERAL 节点,谁创建成功就允许谁启动)。
  • 2、创建 zookeeper 节点成功后,对应的 canal server 就启动对应的 canal instance,没有创建成功的 canal instance 就会处于 standby 状态。
  • 3、一旦 zookeeper 发现 canal server A 创建的节点消失后,立即通知其他的 canal server 再次进行步骤1的操作,重新选出一个 canal server 启动instance。
  • 4、canal client 每次进行connect时,会首先向 zookeeper 询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect。

为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态。

依赖:

  • JDK1.8
  • MySQL:用于canal-admin存储配置和节点等相关数据
  • Zookeeper

3.Canal实战

准备环境

名称版本
MySQL5.7
elasticsearch7.17.9
canal1.1.7
jdk1.8
  • 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
    
    • 注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
  • 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;

    下载 canal

本次操作只需要下载admin、deployer两个包。

下载后解压即可用,当然,要运行起来的话,需要配置jdk1.8,这里就不再多说。

创建admin的数据库:conf/canal_manager.sql

启动admin前,先完成建库和见表(包括admin的默认登录账号密码  admin/123456),否则启动报错。主要是如下六张表。

修改配置文件

修改server的配置文件:conf/canal.properties 和conf/example/instance.properties

切记数据库和配置文件中的密码要一致。

修改admin的配置文件:conf/application.yml

分别启动admin和server

启动server:bin/startup.bat

启动admin:bin/startup.bat

登录admin:http://192.168.1.4:8089/#/login?redirect=%2Fdashboard

默认账号密码:admin/123456

配置java客户端

引入依赖

        <dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency>

客户端代码

package com.zxx.study.base.canal;import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.SneakyThrows;import java.net.InetSocketAddress;
import java.util.List;/*** @author zhouxx* @create 2024-08-01 21:59*/
public class CanalClient  {@SneakyThrowspublic static void main(String[] args) {try {// 创建canal客户端,单链接模式CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.1.4",11111), "example", "", "");// 创建连接canalConnector.connect();while (true) {// 订阅数据库// canalConnector.subscribe("mall");// 获取数据Message message = canalConnector.get(100);// 获取Entry集合List<CanalEntry.Entry> entries = message.getEntries();// 判断集合是否为空,如果为空,则等待一会继续拉取数据if (entries.size() <= 0) {
//                System.out.println("当次抓取没有数据,休息一会。。。。。。");Thread.sleep(1000);} else {// 遍历entries,单条解析for (CanalEntry.Entry entry : entries) {//1.获取表名String tableName = entry.getHeader().getTableName();//2.获取类型CanalEntry.EntryType entryType = entry.getEntryType();//3.获取序列化后的数据ByteString storeValue = entry.getStoreValue();//4.判断当前entryType类型是否为ROWDATAif (CanalEntry.EntryType.ROWDATA.equals(entryType)) {//5.反序列化数据CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);//6.获取当前事件的操作类型CanalEntry.EventType eventType = rowChange.getEventType();//7.获取数据集List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();//8.遍历rowDataList,并打印数据集for (CanalEntry.RowData rowData : rowDataList) {JSONObject beforeData = new JSONObject();List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();for (CanalEntry.Column column : beforeColumnsList) {beforeData.put(column.getName(), column.getValue());}JSONObject afterData = new JSONObject();List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();for (CanalEntry.Column column : afterColumnsList) {afterData.put(column.getName(), column.getValue());}//数据打印System.out.println("Table:" + tableName +",EventType:" + eventType +",Before:" + beforeData +",After:" + afterData);/*** Table:test_user,EventType:UPDATE,Before:{"name":"1111221","id":"1"},After:{"name":"11tom","id":"1"}* Table:test_user,EventType:INSERT,Before:{},After:{"name":"zhouxx","id":"17"}* Table:test_user,EventType:INSERT,Before:{},After:{"name":"zhouxx1","id":"18"}* */}}}}}}catch (Exception e){e.printStackTrace();}}}

运行效果:在数据库里面忝删改查,均可以在客户端中打印出来

cancel框架同步mysql数据到kafka

参考:cancel框架同步mysql数据到kafka_mysql cancel-CSDN博客

利用canal进行MySQL到ES的数据实时同步

参考:利用canal进行MySQL到ES的数据实时同步_canal es-CSDN博客

结合上图,至此,场景一和场景二中的问题,均可以解决。

相关文章:

实战:MySQL数据同步神器之Canal

1.概叙 场景一&#xff1a;数据增量实时同步 项目中业务数据量比较大&#xff0c;每类业务表都达到千万级别&#xff0c;虽然做了分库分表&#xff0c;每张表数据控制在300W以下&#xff0c;但是效率还是达不到要求&#xff0c;为了提高查询效率&#xff0c;打算使用ES进行数…...

5.6软件工程-运维

运维 系统转换系统维护系统评价练习题 系统转换 新老系统的转换 系统转换是指&#xff1a;新系统开发完毕&#xff0c;投入运行&#xff0c;取代现有系统的过程&#xff0c;需要考虑多方面的问题&#xff0c;以实现与老系统的交接&#xff0c;有一下三种转换计划&#xff1a; …...

在JavaScript中如何确保构造函数只被new调用

构造函数是一个特殊的函数&#xff0c;用于初始化一个新创建的对象。它是在创建对象时自动调用的。构造函数通常用于为对象的属性赋值&#xff0c;或者执行其他必要的设置。 使用函数名大写字母开头&#xff0c;这是一种命名约定&#xff0c;用于区分构造函数和普通函数。如何…...

【数据结构算法经典题目刨析(c语言)】反转链表(图文详解)

&#x1f493; 博客主页&#xff1a;C-SDN花园GGbond ⏩ 文章专栏&#xff1a;数据结构经典题目刨析(c语言) 目录 一、题目描述 二、思路分析 三、代码实现 一、题目描述&#xff1a; 二、思路分析 &#xff1a; 通过三个指针n1,n2,n3来实现链表的反转 1.首先初始化 n1为…...

机器学习之争:Python vs R,谁更胜一筹?

一、引言 随着人工智能和大数据的迅速发展&#xff0c;机器学习已成为现代科技的重要组成部分。在医疗、金融、零售、制造等多个领域&#xff0c;机器学习技术的应用无处不在。从数据分析到预测建模&#xff0c;再到深度学习&#xff0c;机器学习正在改变我们的工作和生活方式…...

Vulnhub靶机:JANGOW_ 1.0.1

目录 前言&#xff1a; 一、安装虚拟机Jangow&#xff1a;1.0.1靶机 二、Web部分 前言&#xff1a; 难度&#xff1a;简单&#xff0c;本文使用VirtualBox打开&#xff0c;下载地址&#xff1a; https://download.vulnhub.com/jangow/jangow-01-1.0.1.ova 一、安装虚拟机J…...

Python脚本实现USB自动复制文件

USB驱动器作为常见的数据存储设备&#xff0c;经常用于数据传输和备份。 然而&#xff0c;我们在手动处理文件复制可能效率低下且容易出错。 因此&#xff0c;我们可以利用Python编写脚本来自动化这一过程&#xff0c;提高效率和数据安全性。 准备工作 首先&#xff0c;我们需…...

【C++学习第19天】最小生成树(对应无向图)

一、最小生成树 二、代码 1、Prim算法 #include <cstring> #include <iostream> #include <algorithm>using namespace std;const int N 510, INF 0x3f3f3f3f;int n, m; int g[N][N]; int dist[N]; bool st[N];int prim() {memset(dist, 0x3f, sizeof di…...

第一个 Flask 项目

第一个 Flask 项目 安装环境创建项目启动程序访问项目参数说明Flask对象的初始化参数app.run()参数 应用程序配置参数使用 Flask 的 config.from_object() 方法使用 Flask 的 config.from_pyfile() 方法使用 Flask 的 config.from_envvar() 方法步骤 1: 设置环境变量步骤 2: 编…...

利用 Angular 发挥环境的力量

一.介绍 您是否曾想过如何在不同的环境中为同一应用设置不同的颜色、标题或 API 调用&#xff1f;可以肯定的是&#xff0c;生产 API 和测试 API 是不同的&#xff0c;应谨慎使用。部署时&#xff0c;我们不会在项目的所有地方手动更改所有 API 调用。不应这样做&#xff0c;因…...

Vue3+TypeScript+printjs 实现标签批量打印功能

前言&#xff1a;临时性需求没怎么接触过前端&#xff0c;代码实现有问题及优化点希望大佬可以留言告知一下 开发工具&#xff1a;VS CODE 界面开发&#xff1a;Vue3TypeScriptElementPlus 打印组件&#xff1a;Print-JS 前端打印入口图&#xff1a; 标签页面&#xff1a; …...

微信文件如何直接打印及打印功能在哪里设置?

在数字化时代&#xff0c;打印需求依旧不可或缺&#xff0c;但传统打印店的高昂价格和不便操作常常让人头疼。幸运的是&#xff0c;琢贝打印作为一款集便捷、经济、高效于一体的网上打印平台&#xff0c;正逐渐成为众多用户的首选。特别是通过微信小程序下单&#xff0c;更是让…...

dataX -20240804-master分支

1、相关报错 Error: java.io.IOException: java.lang.RuntimeException: ORC split generation failed with exception: org.apache.orc.impl.SchemaEvolution$IllegalEvolutionException: ORC does not support type conversion from file type struct<nanos:int> (10)…...

【网络】传输层

传输层 一、预备知识1、端口号1、端口号范围划分2、知名端口号3、两个问题4、netstat && iostate5、pidof6、谈下面协议始终铭记两个问题 二、UDP协议&#xff08;简单&#xff09;1、UDP协议端格式2、UDP的特点3、面向数据报4、UDP缓冲区 三、TCP协议&#xff08;重点…...

学生管理系统之更新和删除、筛选

学生管理系统之更新和删除 建立新的窗口 添加组件 进行布局 使用Widget把二个放在一块,作为一列,然后全选进行栅格布局,最后添加弹簧进行微调。 编写增加的槽函数 在主函数中调用对话框...

教您一键批量下载拼多多批发图片信息,节省时间

图片是电商的核心展示手段&#xff0c;高质量、吸引人的图片能显著提升商品吸引力&#xff0c;增强用户体验&#xff0c;促进购买决策。良好的视觉呈现有助于品牌形象的塑造&#xff0c;提高转化率和客户满意度&#xff0c;对电商平台的流量和销售业绩具有直接影响。 使用图快…...

基于微信小程序的微课堂笔记的设计与实现(源码+论文+部署讲解等)

博主介绍&#xff1a;✌全网粉丝10W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和学生毕业项目实战,高校老师/讲师/同行前辈交流✌ 技术栈介绍&#xff1a;我是程序员阿龙&#xff…...

去噪扩散恢复模型

去噪扩散恢复模型 Bahjat Kawar 计算机科学系 以色列海法理工学院 bahjat.kawarcs.technion.ac.il Michael Elad 计算机科学系 以色列海法理工学院 eladcs.technion.ac.il Stefano Ermon 计算机科学系 美国加利福尼亚州斯坦福大学 ermoncs.stanford.edu …...

Stable Diffusion 官方模型V1.5版本下载

模型描述 Stable Diffusion的官方模型更适合绘制偏写实的风格&#xff0c;如果您想绘制二次元之类的风格&#xff0c;可以考虑下载本站的其它模型。 安装方法 将模型下载后&#xff0c;将会得到一个名为****.ckpt格式的文件&#xff0c;将该文件剪切至你的Stable Diffusion本…...

【算法】双指针-OJ题详解1

双指针-OJ题 移动零&#xff08;点击跳转&#xff09;原理讲解代码实现 复写零&#xff08;点击跳转&#xff09;原理讲解代码实现 快乐数&#xff08;点击跳转&#xff09;原理讲解代码实现 盛最多水的容器&#xff08;点击跳转&#xff09;原理讲解代码实现 有效三角形的个数…...

7.4.分块查找

一.分块查找的算法思想&#xff1a; 1.实例&#xff1a; 以上述图片的顺序表为例&#xff0c; 该顺序表的数据元素从整体来看是乱序的&#xff0c;但如果把这些数据元素分成一块一块的小区间&#xff0c; 第一个区间[0,1]索引上的数据元素都是小于等于10的&#xff0c; 第二…...

java_网络服务相关_gateway_nacos_feign区别联系

1. spring-cloud-starter-gateway 作用&#xff1a;作为微服务架构的网关&#xff0c;统一入口&#xff0c;处理所有外部请求。 核心能力&#xff1a; 路由转发&#xff08;基于路径、服务名等&#xff09;过滤器&#xff08;鉴权、限流、日志、Header 处理&#xff09;支持负…...

Prompt Tuning、P-Tuning、Prefix Tuning的区别

一、Prompt Tuning、P-Tuning、Prefix Tuning的区别 1. Prompt Tuning(提示调优) 核心思想:固定预训练模型参数,仅学习额外的连续提示向量(通常是嵌入层的一部分)。实现方式:在输入文本前添加可训练的连续向量(软提示),模型只更新这些提示参数。优势:参数量少(仅提…...

【JavaEE】-- HTTP

1. HTTP是什么&#xff1f; HTTP&#xff08;全称为"超文本传输协议"&#xff09;是一种应用非常广泛的应用层协议&#xff0c;HTTP是基于TCP协议的一种应用层协议。 应用层协议&#xff1a;是计算机网络协议栈中最高层的协议&#xff0c;它定义了运行在不同主机上…...

解锁数据库简洁之道:FastAPI与SQLModel实战指南

在构建现代Web应用程序时&#xff0c;与数据库的交互无疑是核心环节。虽然传统的数据库操作方式&#xff08;如直接编写SQL语句与psycopg2交互&#xff09;赋予了我们精细的控制权&#xff0c;但在面对日益复杂的业务逻辑和快速迭代的需求时&#xff0c;这种方式的开发效率和可…...

04-初识css

一、css样式引入 1.1.内部样式 <div style"width: 100px;"></div>1.2.外部样式 1.2.1.外部样式1 <style>.aa {width: 100px;} </style> <div class"aa"></div>1.2.2.外部样式2 <!-- rel内表面引入的是style样…...

Spring Boot+Neo4j知识图谱实战:3步搭建智能关系网络!

一、引言 在数据驱动的背景下&#xff0c;知识图谱凭借其高效的信息组织能力&#xff0c;正逐步成为各行业应用的关键技术。本文聚焦 Spring Boot与Neo4j图数据库的技术结合&#xff0c;探讨知识图谱开发的实现细节&#xff0c;帮助读者掌握该技术栈在实际项目中的落地方法。 …...

【Android】Android 开发 ADB 常用指令

查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...

毫米波雷达基础理论(3D+4D)

3D、4D毫米波雷达基础知识及厂商选型 PreView : https://mp.weixin.qq.com/s/bQkju4r6med7I3TBGJI_bQ 1. FMCW毫米波雷达基础知识 主要参考博文&#xff1a; 一文入门汽车毫米波雷达基本原理 &#xff1a;https://mp.weixin.qq.com/s/_EN7A5lKcz2Eh8dLnjE19w 毫米波雷达基础…...

高考志愿填报管理系统---开发介绍

高考志愿填报管理系统是一款专为教育机构、学校和教师设计的学生信息管理和志愿填报辅助平台。系统基于Django框架开发&#xff0c;采用现代化的Web技术&#xff0c;为教育工作者提供高效、安全、便捷的学生管理解决方案。 ## &#x1f4cb; 系统概述 ### &#x1f3af; 系统定…...