当前位置: 首页 > article >正文

Canal - 数据同步

一、简介1、介绍Canal 是用 Java 开发的基于数据库增量日志解析提供增量数据订阅消费的中间件。 目前Canal 主要支持了MySQL的Binlog解析解析完成后利用Canal Client来处理获得相关数据。数据库同步需要阿里的Otter中间件基于Canal。GitHub地址https://github.com/alibaba/canal2、MySQL的binlog1) 什么是binlogMySQL 的二进制日志可以说MySQL最重要的日志了它记录了所有的DDL和DML(除了数据查询语句)语句以事件形式记录还包含语句所执行的消耗的时间MySQL的二进制日志是事务安全型的。一般来说开启二进制日志大概会有1%的性能损耗。二进制有两个最重要的使用场景:其一MySQL Replication在Master端开启binlogMaster 把它的二进制日志传递给slaves 来达到master-slave 数据一致的目的。其二数据恢复通过使用mysqlbinlog工具来使恢复数据。二进制日志包括两类文件二进制日志索引文件文件名后缀为.index用于记录所有的二进制文件二进制日志文件文件名后缀为.00000*记录数据库所有的DDL和DML(除了数据查询语句)语句事件。2) binlog的开启MySQL配置文件的位置Linux: /etc/my.cnf 如果/etc目录下没有可以通过locate my.cnf查找位置Windows: \my.ini在mysql的配置文件下,修改配置 在[mysqld]区块添加log-binmysql-bin这个表示binlog日志的前缀是mysql-bin以后生成的日志文件就是 mysql-bin.000001 的文件后面的数字按顺序生成每次mysql重启或者到达单个文件大小的阈值时新生一个 文件按顺序编号。3) binlog的分类设置mysql的binlog格式有三种分别是STATEMENT,MIXED,ROW。 在配置文件中可以选择配置binlog_format statement|mixed|row三种格式的区别a、statement 语句级binlog会记录每次一执行写操作的语句。 相对row模式节省空间但是可能产生不一致性比如 update test set create_datenow(); 如果用binlog日志进行恢复由于执行时间不同可能产生的数据就不同。优点 节省空间 缺点 有可能造成数据不一致。b、row 行级binlog会记录每次操作后每行记录的变化。优点保持数据的绝对一致性。因为不管sql是什么引用了什么函数他只记录执行后的效果。 缺点占用较大空间。c、mixed 混合级别statement的升级版一定程度上解决了statement模式因为一些情况而造成的数据不一致问题。默认还是statement在某些情况下譬如当函数中包含 UUID() 时包含 AUTO_INCREMENT 字段的表被更新时执行 INSERT DELAYED 语句时用 UDF 时会按照 ROW的方式进行处理优点节省空间同时兼顾了一定的一致性。 缺点还有些极个别情况依旧会造成不一致另外statement和mixed对于需要对 binlog监控的情况都不方便。3、工作原理就是把自己伪装成Slave从Master复制数据。二、安装1、MySQL环境准备1Binlog设置修改mysql的配置文件开启MySQL Binlog设置sudo vim /etc/my.cnf在[mysqld]模块下添加一下内容[mysqld]server_id1log-binmysql-binbinlog_formatrow# 需要监控的库binlog-do-dbtest_maxwell并重启Mysql服务sudo systemctl restart mysqld登录mysql并查看是否修改完成mysql -uroot -p123456mysql show variables like %binlog%;查看下列属性binlog_format | ROWwin2查看binlog文件进入/var/lib/mysql目录查看MySQL生成的binlog文件注MySQL生成的binlog文件初始大小一定是154字节前缀是log-bin参数配置的后缀是默认从.000001然后依次递增。除了binlog文件文件以外MySQL还会额外生产一个.index索引文件用来记录当前使用的binlog文件。3创建账号分配一个账号可以操作该数据库GRANT ALL ON *.* TO canal% IDENTIFIED BY 123456;GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO canal%;刷新mysql表权限flush privileges;2、安装Canal1上传并解压注意canal解压后是分散的我们在指定解压目录的时候需要将canal指定上mkdir /opt/module/canaltar -zxvf canal.deployer-1.1.2.tar.gz -C /opt/module/canal/2修改配置文件1修改canal.properties的配置文件......canal.port 11111......# tcp, kafka, RocketMQcanal.serverMode tcp......########################################################## destinations ##############################################################canal.destinations example说明canal端口号默认就是11111修改canal的输出model默认tcp改为输出到kafka多实例配置如果创建多个实例一个canal服务中可以有多个instanceconf/下的每一个example即是一个实例每个实例下面都有独立的配置文件。默认只有一个实例example如果需要多个实例处理不同的MySQL数据的话直接拷贝出多个example并对其重新命名命名和配置文件中指定的名称一致修改 canal.properties中的canal.destinations实例1实例2实例3。2修改instance.properties配置文件修改conf/example目录下的配置文件如果是多个实例可以配置多个配置文件。## mysql serverId , v1.0.26 will autoGencanal.instance.mysql.slaveId10canal.instance.master.address192.168.10.139:3306......# username/passwordcanal.instance.dbUsernamerootcanal.instance.dbPasswordrootcanal.instance.connectionCharset UTF-8canal.instance.defaultDatabaseName test# enable druid Decrypt database passwordcanal.instance.enableDruidfalse3启动./bin/startup.sh三、实时监控1、TCP监控1数据结构2创建maven项目3添加依赖?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instance xsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd modelVersion4.0.0/modelVersion parent groupIdcom.hk/groupId artifactIdhadoopDemo/artifactId version1.0-SNAPSHOT/version /parent artifactIdcanalDemo/artifactId properties maven.compiler.source8/maven.compiler.source maven.compiler.target8/maven.compiler.target project.build.sourceEncodingUTF-8/project.build.sourceEncoding /properties dependencies dependency groupIdcom.alibaba.otter/groupId artifactIdcanal.client/artifactId version1.1.2/version /dependency dependency groupIdorg.apache.kafka/groupId artifactIdkafka-clients/artifactId version2.4.1/version /dependency /dependencies /project4编写客户端代码package com.hk; import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import java.net.InetSocketAddress; import java.util.List; public class Main { public static void main(String[] args) throws InvalidProtocolBufferException { // 1.获取canal连接对象 CanalConnector canalConnector CanalConnectors.newSingleConnector(new InetSocketAddress(hd01, 11111), example, , ); // 循环监听 while (true) { // 获取连接 canalConnector.connect(); // 要监听的数据库和表 canalConnector.subscribe(test_maxwell.*); // 获取message一次获取10条修改 Message message canalConnector.get(10); // 获取entry ListCanalEntry.Entry entries message.getEntries(); // 遍历entry if(entries.size() 0) { try { System.out.println(暂无数据修改......); Thread.sleep(3000); } catch (InterruptedException e) { throw new RuntimeException(e); } } else { for (CanalEntry.Entry entry : entries) { // 获取表名 String tableName entry.getHeader().getTableName(); // 获取entry类型 CanalEntry.EntryType entryType entry.getEntryType(); // 判断entryType是否为ROWDATA if(entryType CanalEntry.EntryType.ROWDATA) { // 序列化数据 ByteString storeValue entry.getStoreValue(); // 反序列化 CanalEntry.RowChange rowChange CanalEntry.RowChange.parseFrom(storeValue); // 获取事件类型 CanalEntry.EventType eventType rowChange.getEventType(); // 获取具体的数据 ListCanalEntry.RowData rowDatasList rowChange.getRowDatasList(); // 打印数据 for (CanalEntry.RowData rowData : rowDatasList) { // 获取修改前的数据 ListCanalEntry.Column beforeColumnsList rowData.getBeforeColumnsList(); JSONObject beforeData new JSONObject(); for (CanalEntry.Column column : beforeColumnsList) { beforeData.put(column.getName(), column.getValue()); } // 获取修改后的数据 ListCanalEntry.Column afterColumnsList rowData.getAfterColumnsList(); JSONObject afterData new JSONObject(); for (CanalEntry.Column column : afterColumnsList) { afterData.put(column.getName(), column.getValue()); } System.out.println(TableName: tableName ,EventType: eventType ,Before: beforeData ,After: afterData); } } } } } } }2、发送到Kafka1启动Kafka和zookeeper2修改配置文件修改canal.properties中canal的输出model默认tcp改为输出到kafka# tcp, kafka, RocketMQcanal.serverMode kafka......########################################################### MQ ###############################################################canal.mq.servers hd01:6667,hd02:6667,hd03:6667修改instance.properties输出到Kafka的主题以及分区数# mq configcanal.mq.topiccanal_testcanal.mq.partitionsNum1# hash partition config#canal.mq.partition0#canal.mq.partitionHashmytest.person:id,mytest.role:id注意默认还是输出到指定Kafka主题的一个kafka分区因为多个分区并行可能会打 乱binlog的顺序如果要提高并行度首先设置kafka的分区数1,然后设置 canal.mq.partitionHash属性3启动Canalbin/startup.sh4测试向MySQL中插入数据后查看消费者控制台插入数据INSERT INTO test VALUES(1001,zhangsan),(1002,lisi);Kafka 消费者控制台{data:[{id:1001,name:zhangsan},{id:1002,name:lisi}],database:test-maxwwell,es:1639360729000,id:1,isDdl:false,mysqlType:{id: varchar(255),name:varchar(255)},old:null,sql:,sqlType:{id:12,name:12,table:test,ts:1639361038454,type:INSERT}

相关文章:

Canal - 数据同步

一、简介 1、介绍 Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。 目前Canal 主要支持了MySQL的Binlog解析,解析完成后利用Canal Client来处理获得相关数据。(数据库同步需要阿里的Otter中间件&#xf…...

基于 Qt C++ 开发一套集成阿里通义千问大模型的多模态智能应用终端

你想要基于 Qt C++ 开发一套**集成阿里通义千问大模型的多模态智能应用终端**,支持**图文音视频理解**,适配电商客服、工业质检、智能创作等阿里生态全场景,并具备高并发、高稳定性(日均调用超10亿次级别的架构设计)。 下面我给你一套**可直接落地的 Qt + 通义千问多模态…...

C#事务处理最佳实践:别再让“主表存了、明细丢了”的破事发生

大家好,我是刚子。做业务开发的时候,经常遇到一个操作要同时更新好几张表的情况。比如保存一张单据,既要写主表,又要写明细,还得写关联条件。这种场景下,要么全部成功,要么全部失败,…...

YOLO26 改进、魔改| 通道-空间注意力与密集多尺度特征融合模块CSDF,通过融合通道注意力、空间注意力和多尺度空洞卷积,增强特征表示能力,提升模型对复杂场景下多尺度目标的识别与分割性能。

遥感图像语义分割任务中面临的三大核心挑战:尺度变化剧烈、类间光谱相似性高、以及空间上下文复杂。传统的卷积神经网络虽能提取局部特征,但其感受野有限,难以建模长距离依赖与多尺度目标;而基于Transformer的方法虽能捕获全局信息…...

Nano-Banana Studio实战案例:输入‘Backpack‘生成极简纯白风平铺拆解图

Nano-Banana Studio实战案例:输入Backpack生成极简纯白风平铺拆解图 1. 案例背景与工具介绍 今天我要分享一个特别实用的AI设计工具实战案例——使用Nano-Banana Studio一键生成背包的极简纯白风格平铺拆解图。 Nano-Banana Studio是一个基于Stable Diffusion XL…...

鱼音频生成 API 集成指南

在这篇文章中,我们将介绍如何集成鱼音频生成 API,该 API 能够通过输入提示词来克隆您的声音。这项技术的应用场景包括语音合成、自动化语音助手、以及任何需要个性化语音输出的应用。 环境准备 在使用鱼音频生成 API 之前,您需要先申请相应…...

EcomGPT-7B多语言模型实战:用同一模型服务中国工厂(中文)与海外买家(英文)

EcomGPT-7B多语言模型实战:用同一模型服务中国工厂(中文)与海外买家(英文) 如果你在做跨境电商,一定遇到过这样的麻烦:工厂给的商品信息是中文的,一堆参数混在一起,而你…...

Java抽象类深度解析(面试必备)

抽象类是Java面试中高频考点,理解它的本质与使用场景,能让你在面试中脱颖而出。本篇文章将从概念、原理、示例到面试高频问题,全方位解析抽象类。 ⏱ 30秒快速回答 抽象类是使用 abstract 修饰的类,不能被实例化,可以…...

测试功能指南 富文本

你好!看起来你输入了“test”,是在测试功能吗?😊 如果有什么具体问题、需要帮助的地方,或者想了解某方面的信息(比如学习、生活、科技、健康等),欢迎随时告诉我,我很乐意…...

Docling Studio 开发札记

当我开始构建 Docling Studio 时,目标很简单:为开发者提供一种可视化方式来检查 Docling 从文档中提取的内容。边界框、分块、元数据——你需要看到才能信任流水线的那些东西。 但任何构建过 RAG 系统的人都知道,真正的问题不在于提取。而在…...

软件可用性管理中的MTTR优化

软件可用性管理中的MTTR优化:提升系统可靠性的关键策略 在数字化时代,软件系统的可用性直接影响用户体验和业务连续性。平均修复时间(MTTR)是衡量系统可靠性的核心指标之一,它反映了从故障发生到问题解决所需的平均时…...

曦智科技开启招股:最高估值160亿港元 4月28日上市 阿里高瓴淡马锡加持

雷递网 雷建平 4月20日上海曦智科技股份有限公司(简称:“曦智科技”,股票代码:“01879”)今日开启招股,准备2026年4月28日在港交所上市。曦智科技发行区间为166.60港元至183.2港元,计划发售约13…...

从‘欠拟合’到‘过拟合’:手把手用AdaBoostRegressor可视化理解集成学习的拟合过程

从‘欠拟合’到‘过拟合’:用AdaBoostRegressor可视化集成学习的拟合演变 当第一次接触机器学习中的集成学习概念时,很多人会被"弱学习器组合成强学习器"的说法所困惑。究竟这些弱学习器是如何协同工作的?为什么增加学习器数量有时…...

PyQt5安装及学习

学习目标 因为毕设需要,所以今天网上学习一下。做一个建议界面,或者后续可以借鉴ai做一下。 pyqt5安装 (yolov8) PS E:\pycharm\2024.11.28open3d> pip install pyqt5-tools -i https://pypi.tuna.tsinghua.edu.cn/simple Looking in indexes: http…...

告别花屏!用Arduino TFT_eSPI库驱动SPI LCD显示中文的保姆级避坑指南

告别花屏!用Arduino TFT_eSPI库驱动SPI LCD显示中文的保姆级避坑指南 第一次点亮SPI接口的LCD屏幕时,那种兴奋感就像打开了新世界的大门。但随之而来的花屏、乱码、内存溢出等问题,又让人瞬间跌入谷底。作为过来人,我完全理解这种…...

计算机网络习题及答案

仅供参考第一章 概述1、计算机网络可以向用户提供哪些服务?答:基于互联网的连通性和共享,计算机网络可以向用户提供:①信息交换服务,如电子邮件(收发信息)、文件传输(上传和下载大文…...

保姆级教程:用Python串口和GBK编码玩转SYN6288 TTS模块(附完整代码)

Python全平台串口控制SYN6288语音合成模块实战指南 第一次听到SYN6288发出清晰的中文语音时,那种"机器开口说话"的奇妙感至今难忘。作为一款性价比极高的中文TTS模块,SYN6288通过简单的串口指令就能实现高质量的语音合成,特别适合智…...

深信服AC1000-B1200到手第一步:从开箱到激活上网的保姆级图文指南

深信服AC1000-B1200设备开箱配置全流程实战手册 当你第一次拿到深信服AC1000-B1200这台企业级网络设备时,可能会被它专业的接口阵列和指示灯搞得有些不知所措。作为一款广泛应用于企业网络边界的安全网关设备,它的初始配置确实需要一些专业指导。本文将带…...

C# 14 AOT编译Dify客户端:从.NET 8到.NET 9 Preview 5,实测启动速度提升92%的5步极简流程

第一章:C# 14 AOT编译Dify客户端:技术演进与价值定位C# 14 引入的原生AOT(Ahead-of-Time)编译能力,标志着.NET平台在云原生与边缘计算场景中迈出了关键一步。当这一能力被应用于构建Dify服务的轻量级客户端时&#xff…...

终极指南:如何用thermalmonitordDisabler解锁iPhone性能限制

终极指南:如何用thermalmonitordDisabler解锁iPhone性能限制 【免费下载链接】thermalmonitordDisabler A tool used to disable iOS daemons. 项目地址: https://gitcode.com/gh_mirrors/th/thermalmonitordDisabler 你是否曾在玩游戏时突然卡顿&#xff1f…...

理解「响应式编程」在Spring WebFlux中的应用

响应式编程在现代高并发系统中扮演着重要角色,而Spring WebFlux作为Spring生态中的响应式框架,为开发者提供了处理异步非阻塞请求的强大工具。理解响应式编程在WebFlux中的应用,不仅能提升系统性能,还能优化资源利用率。本文将围绕…...

如何免费解锁八大网盘全速下载:2025年终极直链下载助手完整指南

如何免费解锁八大网盘全速下载:2025年终极直链下载助手完整指南 【免费下载链接】Online-disk-direct-link-download-assistant 一个基于 JavaScript 的网盘文件下载地址获取工具。基于【网盘直链下载助手】修改 ,支持 百度网盘 / 阿里云盘 / 中国移动云…...

解决Leaflet加载天地图的最大痛点:突破17级缩放限制的两种实战方案

突破Leaflet中天地图17级缩放限制的工程实践 第一次在项目中集成天地图时,那种流畅的加载体验让人印象深刻——直到用户突然问:"为什么这个区域无法继续放大了?"这才发现Leaflet默认的17级缩放限制成了项目交付的绊脚石。作为国内主…...

别再纠结无损格式了!手把手教你用Foobar2000搭配ASIO/WASAPI,榨干Windows电脑的HiFi潜力

解锁PC音质天花板:Foobar2000ASIO/WASAPI实战指南 当大多数人还在纠结是否要花大价钱升级音响设备时,Windows电脑其实隐藏着未被发掘的音频潜力。通过正确的软件配置,你的普通PC也能释放出接近专业级音频设备的音质表现。本文将带你深入探索如…...

别只盯着性能!从RapidJSON和cJSON的源码设计,聊聊C/C++ JSON库的‘优雅’与‘实用’

从RapidJSON与cJSON的源码哲学,解码C/C JSON库的设计艺术 在技术选型时,我们常常被性能指标和功能列表所吸引,却忽略了背后更为重要的设计哲学。RapidJSON和cJSON作为C/C领域最具代表性的两个JSON库,它们的差异远不止于性能数据表…...

ODM(原始设计制造商)模式,本质上是“赚辛苦钱

结合你掌握的信息(ODM模式、大小周、整机等),以下是从职业发展、行业环境、公司治理三个维度的批判性分析与建议: 1. 业务模式的“护城河”与“天花板” (ODM vs. 自有品牌) 批判性分析:ODM(原始设计制造商)模式,本质上是“赚辛苦钱”。虽然公司想做“整机”,但如果没…...

2026跨行业通吃的经管类证书。

先说明一下,这篇文章是我自己这几年在经管专业学习和求职过程中接触到的一些信息整理,每个人情况不同,我说的不一定对,仅供你参考。经管类专业有个特点——看起来就业面很广,银行、互联网、快消、咨询、公务员都能试试…...

Linux环境搭建及基础指令

Xshell 登录主机打开Xshell后, 输入指令 ssh root[自己云服务器的公网地址]输入登录名(一般就是root)及密码后, 看到以上提示, 就说明登陆成功啦!Xshell下的复制粘贴复制: Ctrll Fn insert粘贴: shift Fn insertLinux下的基本指令在学习具体指令前, 得先创建一个框架, 才能…...

杰理SDK开发-杰理之家-实现清除手机APP用户配置功能、重置参数

前言现在为止也开发了许多杰理TWS蓝牙耳机、音响项目SDK的案子,在调试案子时不断的向前辈们学习到了很多关于蓝牙音响、蓝牙TWS耳机专业的知识。想在这里做一个学习汇总,方便各位同行和对杰理芯片SDK感兴趣的小伙伴们学习;本章详细讲解杰理SD…...

Dify 2026缓存机制升级全解析,为什么你的Agent响应慢了3.8倍?(附12个真实压测对比数据)

第一章:Dify 2026缓存机制升级的核心动因与架构演进 Dify 2026 的缓存机制重构并非简单性能调优,而是面向多租户大模型应用平台在高并发、低延迟、强一致性场景下的系统性演进。随着用户侧 RAG 流程平均响应时间突破 850ms,以及 LLM 编排链路…...