【kafka系列】Kafka事务的实现原理
目录
1. 事务核心组件
1.1 幂等性生产者(Idempotent Producer)
1.2 事务协调器(TransactionCoordinator)
1.3 事务日志(Transaction Log)
2. 事务执行流程
2.1 事务初始化
2.2 发送消息
2.3 事务提交(两阶段提交)
2.4 事务完成
3. 消费者事务隔离
3.1 隔离级别
3.2 实现机制
4. 异常处理与容错
4.1 生产者宕机
4.2 协调器宕机
4.3 Broker宕机
5. 关键源码解析
5.1 事务协调器核心逻辑
5.2 两阶段提交实现
5.3 消费者过滤未提交消息
6. 事务配置与使用
6.1 生产者配置
6.2 消费者配置
7. 事务性能与限制
总结
- 幂等生产者:通过
ProducerID和SequenceNumber去重,避免消息重复(源码见ProducerStateManager)。- 事务协调器(TransactionCoordinator):
- 每个事务绑定一个Coordinator,处理
BEGIN_TRANSACTION、COMMIT/ABORT请求。- 事务状态存储在内部Topic
__transaction_state中(通过TransactionStateManager管理)。
- 两阶段提交:
- 阶段1:标记事务为“预提交”,写入所有参与分区的数据。
- 阶段2:写入
COMMIT标记到事务日志,消费者仅可见已提交的事务消息。
Kafka事务机制通过幂等性生产者、事务协调器(TransactionCoordinator) 和 两阶段提交(2PC) 实现跨分区的原子性写入,确保消息要么全部提交,要么全部丢弃。以下是核心实现机制:
1. 事务核心组件
1.1 幂等性生产者(Idempotent Producer)
- 作用:确保单分区内消息不重复。
- 实现机制:
-
- PID(Producer ID):每个生产者实例唯一,由Broker分配。
- Sequence Number:每个消息的单调递增序列号,Broker校验序列号连续性。
- 源码类:
ProducerStateManager(管理PID与序列号)。
1.2 事务协调器(TransactionCoordinator)
- 作用:管理事务生命周期,协调事务提交或中止。
- 实现机制:
-
- 每个事务绑定一个协调器(通过事务ID哈希选择Broker)。
- 维护事务状态机(
TransactionState),存储在内部Topic__transaction_state。 - 源码类:
TransactionCoordinator、TransactionStateManager。
1.3 事务日志(Transaction Log)
- 作用:持久化事务状态,防止协调器宕机后数据丢失。
- 存储位置:内部Topic
__transaction_state,每个分区对应一个协调器。 - 数据格式:事务ID、PID、状态(
PrepareCommit、Completed等)、超时时间。
2. 事务执行流程
2.1 事务初始化
- 生产者初始化事务:
-
- 调用
initTransactions(),向协调器注册事务ID,获取PID。 - 协调器在
__transaction_state中记录事务元数据。
- 调用
2.2 发送消息
- 发送事务消息:
-
- 生产者发送消息时携带PID、序列号、事务ID。
- Broker将消息写入日志,但标记为未提交(对消费者不可见)。
2.3 事务提交(两阶段提交)
- 阶段1:Prepare Commit
生产者向协调器发送EndTxnRequest,协调器将事务状态置为PrepareCommit,并持久化到事务日志。 - 阶段2:Commit Markers写入
协调器向所有涉及的分区Leader发送WriteTxnMarkers请求,Leader在日志中写入事务提交标记(Control Batch)。
2.4 事务完成
- Broker将事务消息标记为已提交,消费者可读取(需配置
isolation.level=read_committed)。
3. 消费者事务隔离
3.1 隔离级别
read_committed:仅消费已提交的事务消息(跳过未提交的Control Batch)。read_uncommitted:消费所有消息(默认模式,不保证事务原子性)。
3.2 实现机制
- 消费者在拉取消息时,Broker根据隔离级别过滤未提交的事务消息。
- 源码逻辑:
KafkaConsumer的fetcher模块解析Control Batch,决定是否跳过消息。
4. 异常处理与容错
4.1 生产者宕机
- 事务超时(
transaction.timeout.ms):协调器自动中止未完成的事务。 - 新生产者实例需重新初始化事务,旧事务状态由协调器清理。
4.2 协调器宕机
- 事务日志持久化在
__transaction_state,新协调器加载日志恢复状态。
4.3 Broker宕机
- 副本机制保证事务日志和消息日志的高可用,Leader切换后继续处理事务。
5. 关键源码解析
5.1 事务协调器核心逻辑
//事务状态管理(TransactionStateManager)
public class TransactionStateManager {// 持久化事务状态到__transaction_statedef appendTransactionToLog(transactionState: TransactionState): Unit = {val records = List(new SimpleRecord(transactionState.toBytes))transactionLog.append(records)}
}
5.2 两阶段提交实现
// 协调器发送提交标记(TransactionCoordinator)
private def sendTxnMarkers(transactionState: TransactionState): Unit = {
// 向所有分区Leader发送WriteTxnMarkersRequest
transactionState.partitions.foreach { partition =>val request = new WriteTxnMarkersRequest.Builder(partition, Commit)sendRequestToLeader(request)}
}
5.3 消费者过滤未提交消息
// 消费者拉取消息过滤(ConsumerFetcherThread)
private def filterCommittedMessages(records: ConsumerRecords): ConsumerRecords = {
if (isolationLevel == IsolationLevel.READ_COMMITTED) {records.filter(_.controlBatchType != ControlBatchType.ABORT)
} else {records
}
}
6. 事务配置与使用
6.1 生产者配置
props.put("enable.idempotence", "true"); // 开启幂等性
props.put("transactional.id", "my-tx-id"); // 必须设置事务ID
props.put("transaction.timeout.ms", "60000"); // 事务超时时间
6.2 消费者配置
props.put("isolation.level", "read_committed"); // 仅消费已提交消息
7. 事务性能与限制
- 性能开销:事务引入两阶段提交和日志持久化,吞吐量下降约20%-30%。
- 限制:
-
- 事务仅支持单会话(一个生产者实例)。
- 事务消息的消费者必须使用Kafka Consumer API(不支持旧版基于ZooKeeper的消费者)。
总结
Kafka事务通过以下机制实现跨分区的原子性:
- 幂等性生产者:避免单分区消息重复。
- 事务协调器与两阶段提交:确保所有分区要么全部提交,要么全部回滚。
- 事务日志持久化:保障协调器故障恢复后状态一致。
- 消费者隔离级别:控制事务消息的可见性。
正确配置后,Kafka事务可支持金融级场景的精确一次(Exactly-Once)语义
相关文章:
【kafka系列】Kafka事务的实现原理
目录 1. 事务核心组件 1.1 幂等性生产者(Idempotent Producer) 1.2 事务协调器(TransactionCoordinator) 1.3 事务日志(Transaction Log) 2. 事务执行流程 2.1 事务初始化 2.2 发送消息 2.3 事务提…...
网络将内网服务转换到公网上
当然,以下是根据您提供的描述,对内网端口在公网上转换过程的详细步骤,并附上具体例子进行说明: 内网端口在公网上的转换过程详细步骤 1. 内网服务配置 步骤说明: 在内网中的某台计算机(我们称之为“内网…...
c#自动更新-源码
软件维护与升级 修复漏洞和缺陷:软件在使用过程中可能会发现各种漏洞和缺陷,自动更新可以及时推送修复程序,增强软件的稳定性和安全性,避免因漏洞被利用而导致数据泄露、系统崩溃等问题。提升性能:通过自动更新&#x…...
爬虫实战:利用代理ip爬取推特网站数据
引言 亮数据-网络IP代理及全网数据一站式服务商屡获殊荣的代理网络、强大的数据挖掘工具和现成可用的数据集。亮数据:网络数据平台领航者https://www.bright.cn/?promoRESIYEAR50/?utm_sourcebrand&utm_campaignbrnd-mkt_cn_csdn_yingjie202502 在跨境电商、社…...
git使用,注意空格
第一节 安装完成后,找个目录用于存储,打开目录右击选择git bash here 命令1 姓名 回车 git config --global user.name "li" 命令2 邮箱 回车 git config --global user.email "888163.com" 命令3 初始化新仓库,下载克隆 回…...
138,【5】buuctf web [RootersCTF2019]I_<3_Flask
进入靶场 这段代码是利用 Python 的类继承和反射机制来尝试执行系统命令读取flag.txt文件内容 .__class__:空字符串对象调用__class__属性,得到str类,即字符串的类型。__class__.__base__:str类的__base__属性指向其基类…...
docker 运行 芋道微服务
创建文件夹 docker-ai 文件夹下放入需要jar包的文件夹及 docker-compose.yml 文件 docker-compose.yml 内容:我这里的是ai服务,所以将原先的文件内容做了变更,你们需要用到什么服务就在下面文件中进行更改即可 version: 3 services:yudao-g…...
C++ Primer 函数重载
欢迎阅读我的 【CPrimer】专栏 专栏简介:本专栏主要面向C初学者,解释C的一些基本概念和基础语言特性,涉及C标准库的用法,面向对象特性,泛型特性高级用法。通过使用标准库中定义的抽象设施,使你更加适应高级…...
【Rust中级教程】1.6. 内存 Pt.4:静态(static)内存与‘static生命周期标注
喜欢的话别忘了点赞、收藏加关注哦(加关注即可阅读全文),对接下来的教程有兴趣的可以关注专栏。谢谢喵!(・ω・) 1.6.1. 静态(static)内存 static内存实际上是一个统称,它指的是程序编译后的文…...
【设计模式】【行为型模式】解释器模式(Interpreter)
👋hi,我不是一名外包公司的员工,也不会偷吃茶水间的零食,我的梦想是能写高端CRUD 🔥 2025本人正在沉淀中… 博客更新速度 👍 欢迎点赞、收藏、关注,跟上我的更新节奏 🎵 当你的天空突…...
修改OnlyOffice编辑器默认字体
通过Docker修改OnlyOffice编辑器默认字体 问题描述详细方案1. 删除原生字体文件2. 创建字体目录3. 复制字体文件到容器中4. 执行字体更新脚本5. 重新启动容器 注意事项 问题描述 在OnlyOffice中,编辑器的默认字体可能不符合公司或个人的需求,通常会使用…...
React echarts柱状图点击某个柱子跳转页面
绘制echarts柱状图 在 ECharts 中,如果你想要在点击柱状图的某个柱子时进行页面跳转,你可以通过设置 series 中的 data 属性中的 itemStyle 或者使用 series 的 label 属性中的 emphasis 属性来实现。但是,直接在柱状图中实现点击跳转通常涉…...
wordpress主题插件开发中高频使用的38个函数
核心模板函数 get_header()/get_footer()/get_sidebar() – 加载模板部件 the_title()/the_content()/the_excerpt() – 显示文章标题、内容、摘要 the_post() – 循环中获取文章数据 bloginfo(‘url’) – 获取站点URL wp_head()/wp_footer() – 输出头部/尾部代码 wp_n…...
ElasticSearch基础和使用
ElasticSearch基础 1 初识ES相关组件 (1)Elasticsearch是一款非常强大的开源搜索引擎,可以帮助我们从海量数据中快速找到需要的内容。Elasticsearch结合kibana、Logstash、Beats组件 也就是elastic stack(ELK) 广泛应…...
qt-C++笔记之QGraphicsScene和 QGraphicsView中setScene、通过scene得到view、通过view得scene
qt-C++笔记之QGraphicsScene和 QGraphicsView中setScene、通过scene得到view、通过view得scene code review! 文章目录 qt-C++笔记之QGraphicsScene和 QGraphicsView中setScene、通过scene得到view、通过view得scene1.`setScene` 方法2.通过 `scene` 获取它的视图 (`views()`)…...
小白win10安装并配置yt-dlp
需要yt-dlp和ffmpeg 注意存放路径最好都是全英文 win10安装并配置yt-dlp 一、下载1.下载yt-dlp2. fffmpeg下载 二、配置环境三、cmd操作四、yt-dlp下视频操作 一、下载 1.下载yt-dlp yt-dlp地址 找到win的压缩包点下载,并解压 2. fffmpeg下载 ffmpeg官方下载 …...
【kafka系列】broker
目录 Broker 接收生产者消息和返回消息给消费者的流程逻辑分析 Broker 处理生产者消息的核心流程 Broker 处理消费者消息的核心流程 关键点总结 Broker 接收生产者消息和返回消息给消费者的流程逻辑分析 Broker 处理生产者消息的核心流程 接收请求 Broker 的 SocketServer …...
用大模型学大模型05-线性回归
deepseek.com:多元线性回归的目标函数,损失函数,梯度下降 标量和矩阵形式的数学推导,pytorch真实能跑的代码案例以及模型,数据,预测结果的可视化展示, 模型应用场景和优缺点,及如何改进解决及改进方法数据推…...
Python实现AWS Fargate自动化部署系统
一、背景介绍 在现代云原生应用开发中,自动化部署是提高开发效率和保证部署质量的关键。AWS Fargate作为一项无服务器计算引擎,可以让我们专注于应用程序开发而无需管理底层基础设施。本文将详细介绍如何使用Python实现AWS Fargate的完整自动化部署流程。 © ivwdcwso (ID…...
国产编辑器EverEdit - 上下翻滚不迷路(历史编辑位置、历史光标位置回溯功能)
1 光标位置跳转 1.1 应用场景 某些场景下,用户从当前编辑位置跳转到别的位置查阅信息,如果要快速跳转回之前编辑位置,则可以使用光标跳转相关功能。 1.2 使用方法 1.2.1 上一个编辑位置 跳转到上一个编辑位置,即文本修改过的位…...
Chapter03-Authentication vulnerabilities
文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...
【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器
——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的一体化测试平台,覆盖应用全生命周期测试需求,主要提供五大核心能力: 测试类型检测目标关键指标功能体验基…...
如何为服务器生成TLS证书
TLS(Transport Layer Security)证书是确保网络通信安全的重要手段,它通过加密技术保护传输的数据不被窃听和篡改。在服务器上配置TLS证书,可以使用户通过HTTPS协议安全地访问您的网站。本文将详细介绍如何在服务器上生成一个TLS证…...
python如何将word的doc另存为docx
将 DOCX 文件另存为 DOCX 格式(Python 实现) 在 Python 中,你可以使用 python-docx 库来操作 Word 文档。不过需要注意的是,.doc 是旧的 Word 格式,而 .docx 是新的基于 XML 的格式。python-docx 只能处理 .docx 格式…...
智能分布式爬虫的数据处理流水线优化:基于深度强化学习的数据质量控制
在数字化浪潮席卷全球的今天,数据已成为企业和研究机构的核心资产。智能分布式爬虫作为高效的数据采集工具,在大规模数据获取中发挥着关键作用。然而,传统的数据处理流水线在面对复杂多变的网络环境和海量异构数据时,常出现数据质…...
HashMap中的put方法执行流程(流程图)
1 put操作整体流程 HashMap 的 put 操作是其最核心的功能之一。在 JDK 1.8 及以后版本中,其主要逻辑封装在 putVal 这个内部方法中。整个过程大致如下: 初始判断与哈希计算: 首先,putVal 方法会检查当前的 table(也就…...
免费PDF转图片工具
免费PDF转图片工具 一款简单易用的PDF转图片工具,可以将PDF文件快速转换为高质量PNG图片。无需安装复杂的软件,也不需要在线上传文件,保护您的隐私。 工具截图 主要特点 🚀 快速转换:本地转换,无需等待上…...
RabbitMQ入门4.1.0版本(基于java、SpringBoot操作)
RabbitMQ 一、RabbitMQ概述 RabbitMQ RabbitMQ最初由LShift和CohesiveFT于2007年开发,后来由Pivotal Software Inc.(现为VMware子公司)接管。RabbitMQ 是一个开源的消息代理和队列服务器,用 Erlang 语言编写。广泛应用于各种分布…...
JavaScript 数据类型详解
JavaScript 数据类型详解 JavaScript 数据类型分为 原始类型(Primitive) 和 对象类型(Object) 两大类,共 8 种(ES11): 一、原始类型(7种) 1. undefined 定…...
C# 表达式和运算符(求值顺序)
求值顺序 表达式可以由许多嵌套的子表达式构成。子表达式的求值顺序可以使表达式的最终值发生 变化。 例如,已知表达式3*52,依照子表达式的求值顺序,有两种可能的结果,如图9-3所示。 如果乘法先执行,结果是17。如果5…...
