【kafka系列】Kafka事务的实现原理
目录
1. 事务核心组件
1.1 幂等性生产者(Idempotent Producer)
1.2 事务协调器(TransactionCoordinator)
1.3 事务日志(Transaction Log)
2. 事务执行流程
2.1 事务初始化
2.2 发送消息
2.3 事务提交(两阶段提交)
2.4 事务完成
3. 消费者事务隔离
3.1 隔离级别
3.2 实现机制
4. 异常处理与容错
4.1 生产者宕机
4.2 协调器宕机
4.3 Broker宕机
5. 关键源码解析
5.1 事务协调器核心逻辑
5.2 两阶段提交实现
5.3 消费者过滤未提交消息
6. 事务配置与使用
6.1 生产者配置
6.2 消费者配置
7. 事务性能与限制
总结
- 幂等生产者:通过
ProducerID和SequenceNumber去重,避免消息重复(源码见ProducerStateManager)。- 事务协调器(TransactionCoordinator):
- 每个事务绑定一个Coordinator,处理
BEGIN_TRANSACTION、COMMIT/ABORT请求。- 事务状态存储在内部Topic
__transaction_state中(通过TransactionStateManager管理)。
- 两阶段提交:
- 阶段1:标记事务为“预提交”,写入所有参与分区的数据。
- 阶段2:写入
COMMIT标记到事务日志,消费者仅可见已提交的事务消息。
Kafka事务机制通过幂等性生产者、事务协调器(TransactionCoordinator) 和 两阶段提交(2PC) 实现跨分区的原子性写入,确保消息要么全部提交,要么全部丢弃。以下是核心实现机制:
1. 事务核心组件
1.1 幂等性生产者(Idempotent Producer)
- 作用:确保单分区内消息不重复。
- 实现机制:
-
- PID(Producer ID):每个生产者实例唯一,由Broker分配。
- Sequence Number:每个消息的单调递增序列号,Broker校验序列号连续性。
- 源码类:
ProducerStateManager(管理PID与序列号)。
1.2 事务协调器(TransactionCoordinator)
- 作用:管理事务生命周期,协调事务提交或中止。
- 实现机制:
-
- 每个事务绑定一个协调器(通过事务ID哈希选择Broker)。
- 维护事务状态机(
TransactionState),存储在内部Topic__transaction_state。 - 源码类:
TransactionCoordinator、TransactionStateManager。
1.3 事务日志(Transaction Log)
- 作用:持久化事务状态,防止协调器宕机后数据丢失。
- 存储位置:内部Topic
__transaction_state,每个分区对应一个协调器。 - 数据格式:事务ID、PID、状态(
PrepareCommit、Completed等)、超时时间。
2. 事务执行流程
2.1 事务初始化
- 生产者初始化事务:
-
- 调用
initTransactions(),向协调器注册事务ID,获取PID。 - 协调器在
__transaction_state中记录事务元数据。
- 调用
2.2 发送消息
- 发送事务消息:
-
- 生产者发送消息时携带PID、序列号、事务ID。
- Broker将消息写入日志,但标记为未提交(对消费者不可见)。
2.3 事务提交(两阶段提交)
- 阶段1:Prepare Commit
生产者向协调器发送EndTxnRequest,协调器将事务状态置为PrepareCommit,并持久化到事务日志。 - 阶段2:Commit Markers写入
协调器向所有涉及的分区Leader发送WriteTxnMarkers请求,Leader在日志中写入事务提交标记(Control Batch)。
2.4 事务完成
- Broker将事务消息标记为已提交,消费者可读取(需配置
isolation.level=read_committed)。
3. 消费者事务隔离
3.1 隔离级别
read_committed:仅消费已提交的事务消息(跳过未提交的Control Batch)。read_uncommitted:消费所有消息(默认模式,不保证事务原子性)。
3.2 实现机制
- 消费者在拉取消息时,Broker根据隔离级别过滤未提交的事务消息。
- 源码逻辑:
KafkaConsumer的fetcher模块解析Control Batch,决定是否跳过消息。
4. 异常处理与容错
4.1 生产者宕机
- 事务超时(
transaction.timeout.ms):协调器自动中止未完成的事务。 - 新生产者实例需重新初始化事务,旧事务状态由协调器清理。
4.2 协调器宕机
- 事务日志持久化在
__transaction_state,新协调器加载日志恢复状态。
4.3 Broker宕机
- 副本机制保证事务日志和消息日志的高可用,Leader切换后继续处理事务。
5. 关键源码解析
5.1 事务协调器核心逻辑
//事务状态管理(TransactionStateManager)
public class TransactionStateManager {// 持久化事务状态到__transaction_statedef appendTransactionToLog(transactionState: TransactionState): Unit = {val records = List(new SimpleRecord(transactionState.toBytes))transactionLog.append(records)}
}
5.2 两阶段提交实现
// 协调器发送提交标记(TransactionCoordinator)
private def sendTxnMarkers(transactionState: TransactionState): Unit = {
// 向所有分区Leader发送WriteTxnMarkersRequest
transactionState.partitions.foreach { partition =>val request = new WriteTxnMarkersRequest.Builder(partition, Commit)sendRequestToLeader(request)}
}
5.3 消费者过滤未提交消息
// 消费者拉取消息过滤(ConsumerFetcherThread)
private def filterCommittedMessages(records: ConsumerRecords): ConsumerRecords = {
if (isolationLevel == IsolationLevel.READ_COMMITTED) {records.filter(_.controlBatchType != ControlBatchType.ABORT)
} else {records
}
}
6. 事务配置与使用
6.1 生产者配置
props.put("enable.idempotence", "true"); // 开启幂等性
props.put("transactional.id", "my-tx-id"); // 必须设置事务ID
props.put("transaction.timeout.ms", "60000"); // 事务超时时间
6.2 消费者配置
props.put("isolation.level", "read_committed"); // 仅消费已提交消息
7. 事务性能与限制
- 性能开销:事务引入两阶段提交和日志持久化,吞吐量下降约20%-30%。
- 限制:
-
- 事务仅支持单会话(一个生产者实例)。
- 事务消息的消费者必须使用Kafka Consumer API(不支持旧版基于ZooKeeper的消费者)。
总结
Kafka事务通过以下机制实现跨分区的原子性:
- 幂等性生产者:避免单分区消息重复。
- 事务协调器与两阶段提交:确保所有分区要么全部提交,要么全部回滚。
- 事务日志持久化:保障协调器故障恢复后状态一致。
- 消费者隔离级别:控制事务消息的可见性。
正确配置后,Kafka事务可支持金融级场景的精确一次(Exactly-Once)语义
相关文章:
【kafka系列】Kafka事务的实现原理
目录 1. 事务核心组件 1.1 幂等性生产者(Idempotent Producer) 1.2 事务协调器(TransactionCoordinator) 1.3 事务日志(Transaction Log) 2. 事务执行流程 2.1 事务初始化 2.2 发送消息 2.3 事务提…...
网络将内网服务转换到公网上
当然,以下是根据您提供的描述,对内网端口在公网上转换过程的详细步骤,并附上具体例子进行说明: 内网端口在公网上的转换过程详细步骤 1. 内网服务配置 步骤说明: 在内网中的某台计算机(我们称之为“内网…...
c#自动更新-源码
软件维护与升级 修复漏洞和缺陷:软件在使用过程中可能会发现各种漏洞和缺陷,自动更新可以及时推送修复程序,增强软件的稳定性和安全性,避免因漏洞被利用而导致数据泄露、系统崩溃等问题。提升性能:通过自动更新&#x…...
爬虫实战:利用代理ip爬取推特网站数据
引言 亮数据-网络IP代理及全网数据一站式服务商屡获殊荣的代理网络、强大的数据挖掘工具和现成可用的数据集。亮数据:网络数据平台领航者https://www.bright.cn/?promoRESIYEAR50/?utm_sourcebrand&utm_campaignbrnd-mkt_cn_csdn_yingjie202502 在跨境电商、社…...
git使用,注意空格
第一节 安装完成后,找个目录用于存储,打开目录右击选择git bash here 命令1 姓名 回车 git config --global user.name "li" 命令2 邮箱 回车 git config --global user.email "888163.com" 命令3 初始化新仓库,下载克隆 回…...
138,【5】buuctf web [RootersCTF2019]I_<3_Flask
进入靶场 这段代码是利用 Python 的类继承和反射机制来尝试执行系统命令读取flag.txt文件内容 .__class__:空字符串对象调用__class__属性,得到str类,即字符串的类型。__class__.__base__:str类的__base__属性指向其基类…...
docker 运行 芋道微服务
创建文件夹 docker-ai 文件夹下放入需要jar包的文件夹及 docker-compose.yml 文件 docker-compose.yml 内容:我这里的是ai服务,所以将原先的文件内容做了变更,你们需要用到什么服务就在下面文件中进行更改即可 version: 3 services:yudao-g…...
C++ Primer 函数重载
欢迎阅读我的 【CPrimer】专栏 专栏简介:本专栏主要面向C初学者,解释C的一些基本概念和基础语言特性,涉及C标准库的用法,面向对象特性,泛型特性高级用法。通过使用标准库中定义的抽象设施,使你更加适应高级…...
【Rust中级教程】1.6. 内存 Pt.4:静态(static)内存与‘static生命周期标注
喜欢的话别忘了点赞、收藏加关注哦(加关注即可阅读全文),对接下来的教程有兴趣的可以关注专栏。谢谢喵!(・ω・) 1.6.1. 静态(static)内存 static内存实际上是一个统称,它指的是程序编译后的文…...
【设计模式】【行为型模式】解释器模式(Interpreter)
👋hi,我不是一名外包公司的员工,也不会偷吃茶水间的零食,我的梦想是能写高端CRUD 🔥 2025本人正在沉淀中… 博客更新速度 👍 欢迎点赞、收藏、关注,跟上我的更新节奏 🎵 当你的天空突…...
修改OnlyOffice编辑器默认字体
通过Docker修改OnlyOffice编辑器默认字体 问题描述详细方案1. 删除原生字体文件2. 创建字体目录3. 复制字体文件到容器中4. 执行字体更新脚本5. 重新启动容器 注意事项 问题描述 在OnlyOffice中,编辑器的默认字体可能不符合公司或个人的需求,通常会使用…...
React echarts柱状图点击某个柱子跳转页面
绘制echarts柱状图 在 ECharts 中,如果你想要在点击柱状图的某个柱子时进行页面跳转,你可以通过设置 series 中的 data 属性中的 itemStyle 或者使用 series 的 label 属性中的 emphasis 属性来实现。但是,直接在柱状图中实现点击跳转通常涉…...
wordpress主题插件开发中高频使用的38个函数
核心模板函数 get_header()/get_footer()/get_sidebar() – 加载模板部件 the_title()/the_content()/the_excerpt() – 显示文章标题、内容、摘要 the_post() – 循环中获取文章数据 bloginfo(‘url’) – 获取站点URL wp_head()/wp_footer() – 输出头部/尾部代码 wp_n…...
ElasticSearch基础和使用
ElasticSearch基础 1 初识ES相关组件 (1)Elasticsearch是一款非常强大的开源搜索引擎,可以帮助我们从海量数据中快速找到需要的内容。Elasticsearch结合kibana、Logstash、Beats组件 也就是elastic stack(ELK) 广泛应…...
qt-C++笔记之QGraphicsScene和 QGraphicsView中setScene、通过scene得到view、通过view得scene
qt-C++笔记之QGraphicsScene和 QGraphicsView中setScene、通过scene得到view、通过view得scene code review! 文章目录 qt-C++笔记之QGraphicsScene和 QGraphicsView中setScene、通过scene得到view、通过view得scene1.`setScene` 方法2.通过 `scene` 获取它的视图 (`views()`)…...
小白win10安装并配置yt-dlp
需要yt-dlp和ffmpeg 注意存放路径最好都是全英文 win10安装并配置yt-dlp 一、下载1.下载yt-dlp2. fffmpeg下载 二、配置环境三、cmd操作四、yt-dlp下视频操作 一、下载 1.下载yt-dlp yt-dlp地址 找到win的压缩包点下载,并解压 2. fffmpeg下载 ffmpeg官方下载 …...
【kafka系列】broker
目录 Broker 接收生产者消息和返回消息给消费者的流程逻辑分析 Broker 处理生产者消息的核心流程 Broker 处理消费者消息的核心流程 关键点总结 Broker 接收生产者消息和返回消息给消费者的流程逻辑分析 Broker 处理生产者消息的核心流程 接收请求 Broker 的 SocketServer …...
用大模型学大模型05-线性回归
deepseek.com:多元线性回归的目标函数,损失函数,梯度下降 标量和矩阵形式的数学推导,pytorch真实能跑的代码案例以及模型,数据,预测结果的可视化展示, 模型应用场景和优缺点,及如何改进解决及改进方法数据推…...
Python实现AWS Fargate自动化部署系统
一、背景介绍 在现代云原生应用开发中,自动化部署是提高开发效率和保证部署质量的关键。AWS Fargate作为一项无服务器计算引擎,可以让我们专注于应用程序开发而无需管理底层基础设施。本文将详细介绍如何使用Python实现AWS Fargate的完整自动化部署流程。 © ivwdcwso (ID…...
国产编辑器EverEdit - 上下翻滚不迷路(历史编辑位置、历史光标位置回溯功能)
1 光标位置跳转 1.1 应用场景 某些场景下,用户从当前编辑位置跳转到别的位置查阅信息,如果要快速跳转回之前编辑位置,则可以使用光标跳转相关功能。 1.2 使用方法 1.2.1 上一个编辑位置 跳转到上一个编辑位置,即文本修改过的位…...
铭豹扩展坞 USB转网口 突然无法识别解决方法
当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…...
C++初阶-list的底层
目录 1.std::list实现的所有代码 2.list的简单介绍 2.1实现list的类 2.2_list_iterator的实现 2.2.1_list_iterator实现的原因和好处 2.2.2_list_iterator实现 2.3_list_node的实现 2.3.1. 避免递归的模板依赖 2.3.2. 内存布局一致性 2.3.3. 类型安全的替代方案 2.3.…...
Python:操作 Excel 折叠
💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...
2024年赣州旅游投资集团社会招聘笔试真
2024年赣州旅游投资集团社会招聘笔试真 题 ( 满 分 1 0 0 分 时 间 1 2 0 分 钟 ) 一、单选题(每题只有一个正确答案,答错、不答或多答均不得分) 1.纪要的特点不包括()。 A.概括重点 B.指导传达 C. 客观纪实 D.有言必录 【答案】: D 2.1864年,()预言了电磁波的存在,并指出…...
HBuilderX安装(uni-app和小程序开发)
下载HBuilderX 访问官方网站:https://www.dcloud.io/hbuilderx.html 根据您的操作系统选择合适版本: Windows版(推荐下载标准版) Windows系统安装步骤 运行安装程序: 双击下载的.exe安装文件 如果出现安全提示&…...
基于Docker Compose部署Java微服务项目
一. 创建根项目 根项目(父项目)主要用于依赖管理 一些需要注意的点: 打包方式需要为 pom<modules>里需要注册子模块不要引入maven的打包插件,否则打包时会出问题 <?xml version"1.0" encoding"UTF-8…...
vue3 定时器-定义全局方法 vue+ts
1.创建ts文件 路径:src/utils/timer.ts 完整代码: import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...
C++八股 —— 单例模式
文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全(Thread Safety) 线程安全是指在多线程环境下,某个函数、类或代码片段能够被多个线程同时调用时,仍能保证数据的一致性和逻辑的正确性…...
视频行为标注工具BehaviLabel(源码+使用介绍+Windows.Exe版本)
前言: 最近在做行为检测相关的模型,用的是时空图卷积网络(STGCN),但原有kinetic-400数据集数据质量较低,需要进行细粒度的标注,同时粗略搜了下已有开源工具基本都集中于图像分割这块,…...
回溯算法学习
一、电话号码的字母组合 import java.util.ArrayList; import java.util.List;import javax.management.loading.PrivateClassLoader;public class letterCombinations {private static final String[] KEYPAD {"", //0"", //1"abc", //2"…...
