当前位置: 首页 > news >正文

Maxwell 底层原理 详解

        Maxwell 是一个 MySQL 数据库的增量数据捕获(CDC, Change Data Capture)工具,它通过读取 MySQL 的 binlog(Binary Log)来捕获数据变化,并将这些变化实时地发送到如 Kafka、Kinesis、RabbitMQ 或其他输出端。Maxwell 允许用户捕捉到 INSERT、UPDATE、DELETE 等操作的记录,并将其以 JSON 格式发送到下游系统,用于数据同步、分析、实时监控等应用场景。

        要详细解释 Maxwell 的底层原理及源代码,我们需要从 MySQL binlog 的工作机制、Maxwell 如何解析 binlog、内部架构的各个核心组件、事件处理机制等多方面进行深入解析。

1. MySQL binlog 工作原理

        MySQL 的 binlog 是记录数据库事务性和非事务性数据变化的二进制日志文件,所有的 INSERT、UPDATE、DELETE 以及对表结构的更改操作(如 ALTER TABLE)都会写入 binlog 中。这使得 binlog 成为数据库增量数据捕获的重要来源。

binlog 具有两种格式:

  • ROW 格式:记录每一行的数据变化,捕捉到行级别的增删改操作。
  • STATEMENT 格式:记录 SQL 语句本身的执行。
  • MIXED 格式:结合了 ROW 和 STATEMENT 两种格式。

➢ 三种格式的区别:

◼ statement

语句级,binlog 会记录每次一执行写操作的语句。相对 row 模式节省空间,但是可能产生不一致性,比如update test set create_date=now();如果用 binlog 日志进行恢复,由于执行时间不同可能产生的数据就不同。优点:  节省空间   缺点:  有可能造成数据不一致。

◼ row

行级,  binlog 会记录每次操作后每行记录的变化。优点:保持数据的绝对一致性。因为不管 sql 是什么,引用了什么函数,他只记录执行后的效果。缺点:占用较大空间。

◼ mixed

混合级别,statement  的升级版,一定程度上解决了 statement 模式因为一些情况而造成的数据不一致问题。默认还是 statement,在某些情况下,譬如:当函数中包含  UUID()  时;包含  AUTO_INCREMENT  字段的表被更新时;执行  INSERT DELAYED  语句时;用  UDF  时;会按照  ROW 的方式进行处理  优点:节省空间,同时兼顾了一定的一致性。缺点:还有些极个别情况依旧会造成不一致,另外 statement 和 mixed 对于需要对binlog 监控的情况都不方便。

        Maxwell 依赖的是 ROW 格式,因为 ROW 格式可以直接获取到数据变化的细节,如具体哪一行数据发生了修改,这对于实时的数据同步和分析非常关键。

2. Maxwell 架构与工作流程

Maxwell 的架构可以概括为以下几个部分:

  1. Binlog Position 监控:Maxwell 会从 MySQL 的 binlog 文件中读取增量变化事件,且会记录当前读取到的 binlog 文件的位置(position),以保证在 Maxwell 重启后能够继续从上次的位置读取。
  2. Binlog 解析:Maxwell 通过解析 MySQL 的 binlog 文件来获取数据的变化详情(包括表名、列值、操作类型等)。
  3. 事件处理器(Event Processor):解析后的 binlog 数据会通过 Maxwell 的事件处理器进行处理,并转换为 JSON 格式。
  4. 输出适配器(Producer Adapter):Maxwell 支持将处理后的数据发送到多个目标输出(如 Kafka、Kinesis 等)。
2.1 核心组件

Maxwell 的底层工作机制由以下几个核心组件协同实现:

  1. BinlogConnectorReplicator

    • 负责与 MySQL 进行通信并获取 binlog 数据。
    • 使用 MySQL Binary Log Client Library 实现 binlog 的读取和消费。Maxwell 通过 BinlogConnectorReplicator 连接 MySQL,获取实时的 binlog 数据。
  2. BinlogParser

    • 负责将二进制格式的 binlog 转换为可理解的事件对象。
    • 它解析 ROW 格式的 binlog 并将其转换为 Maxwell 可以处理的内部事件对象(如 Insert、Update、Delete 事件)。
  3. MaxwellContext

    • 管理 Maxwell 的运行状态,包括当前的 binlog position、错误处理、断点续传等。
    • MaxwellContext 还负责维护 Maxwell 的元数据(如表结构缓存、上次处理的 binlog 位置等),以保证数据的一致性和容错性。
  4. MaxwellReplicator

    MaxwellReplicator 是系统的核心执行器,它从 BinlogConnectorReplicator 获取 binlog 数据,并通过 BinlogParser 解析这些数据,生成 RowMap 对象(用于描述数据变化)。
  5. RowMap

    RowMap 是 Maxwell 对数据变更的内部抽象,它将 binlog 中的行变化转化为键值对的形式,包含了表名、数据库名、操作类型(insert、update、delete)以及具体的行数据。
  6. Producer

    • Producer 是事件发布器,它负责将处理过的事件推送到外部系统(如 Kafka、Kinesis、文件等)。
    • Producer 将 RowMap 转换为 JSON 格式并将其发送至指定的输出端。
2.2 事件流处理流程

Maxwell 的数据流处理可以分为以下几个步骤:

  1. 读取 binlog:Maxwell 通过 BinlogConnectorReplicator 从 MySQL binlog 中读取最新的事件。
  2. 解析 binlogBinlogParser 将 binlog 的二进制数据解析为内部事件对象(如 InsertUpdateDelete 事件)。
  3. 生成事件对象:解析后的 binlog 事件会被封装为 RowMap 对象,RowMap 中包含了数据库名、表名、操作类型、变更的数据行内容。
  4. 事件发布:通过 Producer,Maxwell 将 RowMap 转换为 JSON 格式,并发送到外部系统,如 Kafka、Kinesis 等。

    格式数据举例


    json 字段的说明:

    字段

    解释

    database

    变更数据所属的数据库

    table

    表更数据所属的表

    type

    数据变更类型

    ts

    数据变更发生的时间

    xid

    事务id

    commit

    事务提交标志,可用于重新组装事务

    data

    对于insert类型,表示插入的数据;对于update类型,标识修改之后的数据;对于delete类型,表示删除的数据

    old

    对于update类型,表示修改之前的数据,只包含变更字段

3. 源代码分析

为了更详细地解释 Maxwell 的工作原理,接下来分析其核心类的部分源代码。

3.1 BinlogConnectorReplicator(binlog 读取器)

        BinlogConnectorReplicator 是 Maxwell 通过 binlog client 读取 MySQL binlog 数据的核心组件。它负责通过 MySQL Replication 协议从 MySQL 实例拉取 binlog 事件。

public class BinlogConnectorReplicator extends AbstractReplicator {private BinaryLogClient client;private MaxwellFilter filter;public BinlogConnectorReplicator(MaxwellContext context, Position startPosition) throws Exception {super(context);this.client = new BinaryLogClient(context.getConfig().mysqlHost,context.getConfig().mysqlPort,context.getConfig().mysqlUser,context.getConfig().mysqlPassword);// 设置监听器处理 binlog 事件client.registerEventListener(this::handleEvent);}public void start() throws IOException {// 启动客户端开始从 binlog 中获取数据client.connect();}private void handleEvent(Event event) {// 处理 binlog 事件// 将 event 转换为 Maxwell 的 RowMap 对象}
}

在上面的代码中:

  • BinaryLogClient 是用来与 MySQL binlog 进行通信的核心类,它会与 MySQL 建立连接并监听 binlog 的变化。
  • handleEvent 方法会被 MySQL binlog 的事件触发,当 binlog 中有新事件时,该方法会被调用,将事件处理并转换为 Maxwell 的内部对象。
3.2 BinlogParser(binlog 解析器)

        BinlogParser 负责将从 binlog 中获取的二进制事件解析为 Maxwell 可以理解的对象。对于每个 binlog 事件,都会转换为相应的 RowMap 对象。

public class BinlogParser {public RowMap parse(Event event) {EventType type = event.getHeader().getEventType();// 根据 binlog 事件类型处理不同的操作switch (type) {case WRITE_ROWS:return handleInsertEvent(event);case UPDATE_ROWS:return handleUpdateEvent(event);case DELETE_ROWS:return handleDeleteEvent(event);default:return null;}}private RowMap handleInsertEvent(Event event) {// 解析 insert 事件,将其封装为 RowMap}private RowMap handleUpdateEvent(Event event) {// 解析 update 事件,将其封装为 RowMap}private RowMap handleDeleteEvent(Event event) {// 解析 delete 事件,将其封装为 RowMap}
}

在 BinlogParser 中:

  • parse 方法会根据事件类型(如 WRITE_ROWSUPDATE_ROWSDELETE_ROWS)调用对应的处理方法,将事件转换为 RowMap
  • RowMap 是用于描述数据变化的核心对象,包含了具体的数据变化信息。
3.3 RowMap(事件描述对象)

        RowMap 是 Maxwell 中的核心数据结构,负责存储解析后的 binlog 数据。它包含了数据库名、表名、操作类型(如 insert、update、delete)以及具体的列值数据。

public class RowMap {private String database;private String table;private String type; // insert, update, deleteprivate Map<String, Object> data;public RowMap(String database, String table, String type) {this.database = database;this.table = table;this.type = type;this.data = new HashMap<>();}public void putData(String column, Object value) {data.put(column, value);}public String toJSON() {// 将 RowMap 转换为 JSON 字符串}
}

在 RowMap 中:

  • database 和 table 表示数据变更的数据库和表。
  • type 表示操作类型(INSERT、UPDATE、DELETE)。
  • data 是存储行变化数据的键值对映射(列名 -> 值)。
3.4 Producer(事件发布器)

        Producer 负责将处理好的事件(即 RowMap)发送到外部系统,如 Kafka 或 Kinesis。Maxwell 提供了多种 Producer 实现,用户可以选择适合自己需求的 Producer。

public class KafkaProducer extends AbstractProducer {private org.apache.kafka.clients.producer.KafkaProducer<String, String> kafkaProducer;public KafkaProducer(MaxwellContext context) {Properties props = new Properties();props.put("bootstrap.servers", context.getConfig().kafkaBootstrapServers);this.kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);}@Overridepublic void push(RowMap r) {String topic = getKafkaTopic(r);String key = r.getPrimaryKey();String value = r.toJSON();kafkaProducer.send(new ProducerRecord<>(topic, key, value));}
}

在 KafkaProducer 中:

  • push 方法将 RowMap 对象转换为 JSON 格式,并发送到指定的 Kafka topic。

4. Maxwell 高级特性

  1. Schema 变更捕获:Maxwell 也能够捕捉 MySQL 表结构的变化(如 ALTER TABLE),它维护了一份 schema 的缓存,以便解析 binlog 事件时能够正确映射列与值。

  2. 断点续传:Maxwell 记录并维护 binlog 的位置,当服务重启或崩溃时,能够从上次停止的位置继续读取,不会丢失任何数据。

  3. 过滤:Maxwell 支持基于数据库和表的过滤,用户可以通过配置文件或命令行参数来指定需要捕获或忽略的数据库和表。

  4. 事务处理:Maxwell 通过 binlog 的事务边界来确保事件的顺序性和一致性,保证在输出端(如 Kafka)消费时,数据的顺序与数据库中的顺序一致。

总结

        Maxwell 是一个轻量级的 MySQL binlog 解析工具,它通过 BinlogConnectorReplicator 连接 MySQL 并获取 binlog 数据,利用 BinlogParser 解析这些二进制日志,将其转化为易于处理的 RowMap 对象,并通过 Producer 发送到外部系统。Maxwell 提供了灵活的输出方式和良好的容错机制,适用于实时数据同步和流式数据处理场景。

相关文章:

Maxwell 底层原理 详解

Maxwell 是一个 MySQL 数据库的增量数据捕获&#xff08;CDC, Change Data Capture&#xff09;工具&#xff0c;它通过读取 MySQL 的 binlog&#xff08;Binary Log&#xff09;来捕获数据变化&#xff0c;并将这些变化实时地发送到如 Kafka、Kinesis、RabbitMQ 或其他输出端。…...

使用短效IP池的优势是什么?

短效IP池作为代理IP服务中一种独特的资源管理方式&#xff0c;其应用已经在数据采集、市场分析和网络安全等多个领域中展示出强大的功能。尽管“短效”听起来似乎意味着某种限制&#xff0c;然而在某些特定的应用场景下&#xff0c;短效IP池却提供了无可比拟的优势。本文将详细…...

zynq烧写程序到flash后不运行

&#x1f3c6;本文收录于《全栈Bug调优(实战版)》专栏&#xff0c;主要记录项目实战过程中所遇到的Bug或因后果及提供真实有效的解决方案&#xff0c;希望能够助你一臂之力&#xff0c;帮你早日登顶实现财富自由&#x1f680;&#xff1b;同时&#xff0c;欢迎大家关注&&am…...

JMeter如何设置HTTP代理服务器?

1、 2、添加线程组 3、设置HTTP代理服务器&#xff0c;目标控制器选择“测试计划>线程组” 过滤掉不需要的信息 4、设置电脑手动代理 5、点击启动&#xff0c;在浏览器操作就可以了...

React面试题笔记(一)

一、react基础面试题 1.react中keys的作用是什么? key是是用于追踪哪些列表被修改&#xff0c;被添加或者被移除的辅助标识。 在开发过程中&#xff0c;我们需要保证某个元素的 key 在其同级元素中具有唯一性。在 React Diff 算法中 React 会借助元素的 Key 值来判断该元素是…...

3.Java入门笔记--基础语法

1.字面量 概念&#xff1a;计算机用来处理数据的&#xff0c;字面量就是告诉程序员数据在程序中的书写格式 常用数据&#xff1a;整数&#xff0c;小数直接写&#xff1b;字符单引号&#xff08;A&#xff09;且只能放一个字符&#xff1b;字符串双引号&#xff08;"Hel…...

关于SOCKS协议的常见误区有哪些?

代理协议在设备与代理服务器之间的数据交换中起到了关键作用。在这方面&#xff0c;SOCKS代理协议是常见的选择之一&#xff0c;被广泛应用于下载、传输和上传网络数据的场景。然而&#xff0c;关于SOCKS代理协议存在一些常见的误解&#xff0c;让我们来逐一了解。 一、使用SO…...

无极低码课程【redis windows下服务注册密码修改】

下载Windows版本的Redis linux环境 (自行下载) 1.打开官网https://redis.io/downloads/ windows环境 1.打开github https://github.com/microsoftarchive/redis/releases 然后选择你喜欢的版本zip或msi下载 2.这里下载zip版,解压后后,打开安装目录 3.双击redis-server…...

多ip访问多网站

1,关闭防火墙和安全软件 [rootlocalhost ~]# systemctl stop firewalld.service [rootlocalhost ~]# setenforce 02,挂载点&#xff0c;下载nginx [rootlocalhost ~]# mount /dev/sr0 /mnt [rootlocalhost ~]# dnf install nginx -y 3,一个虚拟机增加多个ip地址 [rootloc…...

Pytest参数详解 — 基于命令行模式!

1、--collect-only 查看在给定的配置下哪些测试用例会被执行 2、-k 使用表达式来指定希望运行的测试用例。如果测试名是唯一的或者多个测试名的前缀或者后缀相同&#xff0c;可以使用表达式来快速定位&#xff0c;例如&#xff1a; 命令行-k参数.png 3、-m 标记&#xff08;…...

指针——函数指针数组

&#xff08;一&#xff09;前文回顾 1、前篇代码分析 void(*signal(int , void(*)(int)))(int) ; 那么这串代码究竟是什么呢&#xff1f; 别慌&#xff0c;让我们来一步一步拆解&#xff0c;首先我们通过之前的学习&#xff0c;已经明白了什么是函数指针&#xff08;如果有…...

MySQL中的增查操作:探索数据的奥秘,开启数据之门

本节&#xff0c;我们继续深入了解MySQL&#xff0c;本章所讲的基础操作&#xff0c;针对的是表的增删查改&#xff01; 一、Create 新增 1.1、语法 INSERT [INTO] table_name[(column [, column] ...)] VALUES(value_list) [, (value_list)] ... value_list: value, [, va…...

oracle_查询建表语句

查询建表语句 SELECTdbms_metadata.get_ddl ( TABLE, <table_name> ) FROMdualdbms_metadata.get_ddl&#xff1a;是Oracle提供的一个函数&#xff0c;用于获取数据库对象的DDL语句&#xff0c;它允许你查看或导出数据库对象的创建脚本‘TABLE’&#xff1a; 是这个函数…...

004-按照指定功能模块名称分组

按照指定功能模块名称分组 一、说明1.现在有一个需求&#xff1a;2.具体做法 二、代码案例三、效果展示 一、说明 1.现在有一个需求&#xff1a; 需要把一个功能模块的几个功能点放在同一个文档目录下&#xff0c;这几个功能点分布在不同的 Controller 2.具体做法 需要把他…...

ChatGPT写作助手:论文写作必备提示词一览

学境思源&#xff0c;一键生成论文初稿&#xff1a; AcademicIdeas - 学境思源AI论文写作 随着人工智能技术的发展&#xff0c;ChatGPT在学术写作领域的应用越来越广泛。它不仅能够帮助撰写论文&#xff0c;还可以通过不同的提示词完成构思、文献综述、数据分析、润色等任务&a…...

大数据开发电脑千元配置清单

大数据开发电脑配置清单 电脑型号HUANANZHI 台式电脑操作系统Windows 11 专业版 64位&#xff08;Version 23H2 / DirectX 12&#xff09;处理器英特尔 Xeon(至强) E5-2673 v3 2.40GHz主板HUANANZHI X99-P4T&#xff08;P55 芯片组&#xff09;显卡NVIDIA GeForce GT 610 ( 2…...

VP9官方手册-帧内预测

8.5.1 intra prediction process...

windows 自定义scheme协议。

浏览器打开自定义scheme参考上一篇&#xff1a;Chromium 自定义scheme协议启动过程分析c 1、注册表里面按照如下格式填写自定义scheme协议导入&#xff1a; Windows Registry Editor Version 5.00[HKEY_CLASSES_ROOT\jdtest] "URL:jdtest Protocol" "URL Proto…...

什么是SQLite?

一、什么是SQLite? SQLite是一个进程内的软件库&#xff0c;实现了自给自足的、无服务器的、零配置的、事务性的SQL数据库引擎。它是一个零配置的数据库&#xff0c;这意味着与其他数据库不一样&#xff0c;您不需要在系统中配置。 就像其它数据库&#xff0c;SQLite引擎不是…...

域1:安全与风险管理 第2章-人员安全与风险管理

第二章的内容确实较为丰富&#xff0c;需要细心且耐心地逐步消化和理解。不妨放慢阅读速度&#xff0c;对每个要点都进行深入思考&#xff0c;确保自己真正掌握了其核心意义。这样&#xff0c;虽然可能花费更多时间&#xff0c;但能够更扎实地掌握第二章的知识&#xff0c;为后…...

云计算——弹性云计算器(ECS)

弹性云服务器&#xff1a;ECS 概述 云计算重构了ICT系统&#xff0c;云计算平台厂商推出使得厂家能够主要关注应用管理而非平台管理的云平台&#xff0c;包含如下主要概念。 ECS&#xff08;Elastic Cloud Server&#xff09;&#xff1a;即弹性云服务器&#xff0c;是云计算…...

微信小程序 - 手机震动

一、界面 <button type"primary" bindtap"shortVibrate">短震动</button> <button type"primary" bindtap"longVibrate">长震动</button> 二、js逻辑代码 注&#xff1a;文档 https://developers.weixin.qq…...

镜像里切换为普通用户

如果你登录远程虚拟机默认就是 root 用户&#xff0c;但你不希望用 root 权限运行 ns-3&#xff08;这是对的&#xff0c;ns3 工具会拒绝 root&#xff09;&#xff0c;你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案&#xff1a;创建非 roo…...

sqlserver 根据指定字符 解析拼接字符串

DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...

论文浅尝 | 基于判别指令微调生成式大语言模型的知识图谱补全方法(ISWC2024)

笔记整理&#xff1a;刘治强&#xff0c;浙江大学硕士生&#xff0c;研究方向为知识图谱表示学习&#xff0c;大语言模型 论文链接&#xff1a;http://arxiv.org/abs/2407.16127 发表会议&#xff1a;ISWC 2024 1. 动机 传统的知识图谱补全&#xff08;KGC&#xff09;模型通过…...

C++.OpenGL (10/64)基础光照(Basic Lighting)

基础光照(Basic Lighting) 冯氏光照模型(Phong Lighting Model) #mermaid-svg-GLdskXwWINxNGHso {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-GLdskXwWINxNGHso .error-icon{fill:#552222;}#mermaid-svg-GLd…...

Python如何给视频添加音频和字幕

在Python中&#xff0c;给视频添加音频和字幕可以使用电影文件处理库MoviePy和字幕处理库Subtitles。下面将详细介绍如何使用这些库来实现视频的音频和字幕添加&#xff0c;包括必要的代码示例和详细解释。 环境准备 在开始之前&#xff0c;需要安装以下Python库&#xff1a;…...

莫兰迪高级灰总结计划简约商务通用PPT模版

莫兰迪高级灰总结计划简约商务通用PPT模版&#xff0c;莫兰迪调色板清新简约工作汇报PPT模版&#xff0c;莫兰迪时尚风极简设计PPT模版&#xff0c;大学生毕业论文答辩PPT模版&#xff0c;莫兰迪配色总结计划简约商务通用PPT模版&#xff0c;莫兰迪商务汇报PPT模版&#xff0c;…...

在鸿蒙HarmonyOS 5中使用DevEco Studio实现企业微信功能

1. 开发环境准备 ​​安装DevEco Studio 3.1​​&#xff1a; 从华为开发者官网下载最新版DevEco Studio安装HarmonyOS 5.0 SDK ​​项目配置​​&#xff1a; // module.json5 {"module": {"requestPermissions": [{"name": "ohos.permis…...

在树莓派上添加音频输入设备的几种方法

在树莓派上添加音频输入设备可以通过以下步骤完成&#xff0c;具体方法取决于设备类型&#xff08;如USB麦克风、3.5mm接口麦克风或HDMI音频输入&#xff09;。以下是详细指南&#xff1a; 1. 连接音频输入设备 USB麦克风/声卡&#xff1a;直接插入树莓派的USB接口。3.5mm麦克…...