当前位置: 首页 > news >正文

【kafka系列】Kafka事务的实现原理

目录

1. 事务核心组件

1.1 幂等性生产者(Idempotent Producer)

1.2 事务协调器(TransactionCoordinator)

1.3 事务日志(Transaction Log)

2. 事务执行流程

2.1 事务初始化

2.2 发送消息

2.3 事务提交(两阶段提交)

2.4 事务完成

3. 消费者事务隔离

3.1 隔离级别

3.2 实现机制

4. 异常处理与容错

4.1 生产者宕机

4.2 协调器宕机

4.3 Broker宕机

5. 关键源码解析

5.1 事务协调器核心逻辑

5.2 两阶段提交实现

5.3 消费者过滤未提交消息

6. 事务配置与使用

6.1 生产者配置

6.2 消费者配置

7. 事务性能与限制

总结


  1. 幂等生产者:通过ProducerIDSequenceNumber去重,避免消息重复(源码见ProducerStateManager)。
  2. 事务协调器(TransactionCoordinator)
    • 每个事务绑定一个Coordinator,处理BEGIN_TRANSACTIONCOMMIT/ABORT请求。
    • 事务状态存储在内部Topic __transaction_state中(通过TransactionStateManager管理)。
  1. 两阶段提交
    • 阶段1:标记事务为“预提交”,写入所有参与分区的数据。
    • 阶段2:写入COMMIT标记到事务日志,消费者仅可见已提交的事务消息。

Kafka事务机制通过幂等性生产者事务协调器(TransactionCoordinator)两阶段提交(2PC) 实现跨分区的原子性写入,确保消息要么全部提交,要么全部丢弃。以下是核心实现机制:


1. 事务核心组件

1.1 幂等性生产者(Idempotent Producer)
  • 作用:确保单分区内消息不重复。
  • 实现机制
    • PID(Producer ID):每个生产者实例唯一,由Broker分配。
    • Sequence Number:每个消息的单调递增序列号,Broker校验序列号连续性。
    • 源码类ProducerStateManager(管理PID与序列号)。
1.2 事务协调器(TransactionCoordinator)
  • 作用:管理事务生命周期,协调事务提交或中止。
  • 实现机制
    • 每个事务绑定一个协调器(通过事务ID哈希选择Broker)。
    • 维护事务状态机(TransactionState),存储在内部Topic __transaction_state
    • 源码类TransactionCoordinatorTransactionStateManager
1.3 事务日志(Transaction Log)
  • 作用:持久化事务状态,防止协调器宕机后数据丢失。
  • 存储位置:内部Topic __transaction_state,每个分区对应一个协调器。
  • 数据格式:事务ID、PID、状态(PrepareCommitCompleted等)、超时时间。

2. 事务执行流程

2.1 事务初始化
  1. 生产者初始化事务
    • 调用initTransactions(),向协调器注册事务ID,获取PID。
    • 协调器在__transaction_state中记录事务元数据。
2.2 发送消息
  1. 发送事务消息
    • 生产者发送消息时携带PID、序列号、事务ID。
    • Broker将消息写入日志,但标记为未提交(对消费者不可见)。
2.3 事务提交(两阶段提交)
  • 阶段1:Prepare Commit
    生产者向协调器发送EndTxnRequest,协调器将事务状态置为PrepareCommit,并持久化到事务日志。
  • 阶段2:Commit Markers写入
    协调器向所有涉及的分区Leader发送WriteTxnMarkers请求,Leader在日志中写入事务提交标记(Control Batch)。
2.4 事务完成
  • Broker将事务消息标记为已提交,消费者可读取(需配置isolation.level=read_committed)。

3. 消费者事务隔离

3.1 隔离级别
  • read_committed:仅消费已提交的事务消息(跳过未提交的Control Batch)。
  • read_uncommitted:消费所有消息(默认模式,不保证事务原子性)。
3.2 实现机制
  • 消费者在拉取消息时,Broker根据隔离级别过滤未提交的事务消息。
  • 源码逻辑KafkaConsumerfetcher模块解析Control Batch,决定是否跳过消息。

4. 异常处理与容错

4.1 生产者宕机
  • 事务超时(transaction.timeout.ms):协调器自动中止未完成的事务。
  • 新生产者实例需重新初始化事务,旧事务状态由协调器清理。
4.2 协调器宕机
  • 事务日志持久化在__transaction_state,新协调器加载日志恢复状态。
4.3 Broker宕机
  • 副本机制保证事务日志和消息日志的高可用,Leader切换后继续处理事务。

5. 关键源码解析

5.1 事务协调器核心逻辑
//事务状态管理(TransactionStateManager)
public class TransactionStateManager {// 持久化事务状态到__transaction_statedef appendTransactionToLog(transactionState: TransactionState): Unit = {val records = List(new SimpleRecord(transactionState.toBytes))transactionLog.append(records)}
}
5.2 两阶段提交实现
// 协调器发送提交标记(TransactionCoordinator)
private def sendTxnMarkers(transactionState: TransactionState): Unit = {
// 向所有分区Leader发送WriteTxnMarkersRequest
transactionState.partitions.foreach { partition =>val request = new WriteTxnMarkersRequest.Builder(partition, Commit)sendRequestToLeader(request)}
}
5.3 消费者过滤未提交消息
// 消费者拉取消息过滤(ConsumerFetcherThread)
private def filterCommittedMessages(records: ConsumerRecords): ConsumerRecords = {
if (isolationLevel == IsolationLevel.READ_COMMITTED) {records.filter(_.controlBatchType != ControlBatchType.ABORT)
} else {records
}
}

6. 事务配置与使用

6.1 生产者配置
props.put("enable.idempotence", "true");  // 开启幂等性
props.put("transactional.id", "my-tx-id"); // 必须设置事务ID
props.put("transaction.timeout.ms", "60000"); // 事务超时时间
6.2 消费者配置
props.put("isolation.level", "read_committed"); // 仅消费已提交消息

7. 事务性能与限制

  • 性能开销:事务引入两阶段提交和日志持久化,吞吐量下降约20%-30%。
  • 限制
    • 事务仅支持单会话(一个生产者实例)。
    • 事务消息的消费者必须使用Kafka Consumer API(不支持旧版基于ZooKeeper的消费者)。

总结

Kafka事务通过以下机制实现跨分区的原子性:

  1. 幂等性生产者:避免单分区消息重复。
  2. 事务协调器与两阶段提交:确保所有分区要么全部提交,要么全部回滚。
  3. 事务日志持久化:保障协调器故障恢复后状态一致。
  4. 消费者隔离级别:控制事务消息的可见性。

正确配置后,Kafka事务可支持金融级场景的精确一次(Exactly-Once)语义

相关文章:

【kafka系列】Kafka事务的实现原理

目录 1. 事务核心组件 1.1 幂等性生产者(Idempotent Producer) 1.2 事务协调器(TransactionCoordinator) 1.3 事务日志(Transaction Log) 2. 事务执行流程 2.1 事务初始化 2.2 发送消息 2.3 事务提…...

网络将内网服务转换到公网上

当然,以下是根据您提供的描述,对内网端口在公网上转换过程的详细步骤,并附上具体例子进行说明: 内网端口在公网上的转换过程详细步骤 1. 内网服务配置 步骤说明: 在内网中的某台计算机(我们称之为“内网…...

c#自动更新-源码

软件维护与升级 修复漏洞和缺陷:软件在使用过程中可能会发现各种漏洞和缺陷,自动更新可以及时推送修复程序,增强软件的稳定性和安全性,避免因漏洞被利用而导致数据泄露、系统崩溃等问题。提升性能:通过自动更新&#x…...

爬虫实战:利用代理ip爬取推特网站数据

引言 亮数据-网络IP代理及全网数据一站式服务商屡获殊荣的代理网络、强大的数据挖掘工具和现成可用的数据集。亮数据:网络数据平台领航者https://www.bright.cn/?promoRESIYEAR50/?utm_sourcebrand&utm_campaignbrnd-mkt_cn_csdn_yingjie202502 在跨境电商、社…...

git使用,注意空格

第一节 安装完成后,找个目录用于存储,打开目录右击选择git bash here 命令1 姓名 回车 git config --global user.name "li" 命令2 邮箱 回车 git config --global user.email "888163.com" 命令3 初始化新仓库,下载克隆 回…...

138,【5】buuctf web [RootersCTF2019]I_<3_Flask

进入靶场 这段代码是利用 Python 的类继承和反射机制来尝试执行系统命令读取flag.txt文件内容 .__class__:空字符串对象调用__class__属性,得到str类,即字符串的类型。__class__.__base__:str类的__base__属性指向其基类&#xf…...

docker 运行 芋道微服务

创建文件夹 docker-ai 文件夹下放入需要jar包的文件夹及 docker-compose.yml 文件 docker-compose.yml 内容:我这里的是ai服务,所以将原先的文件内容做了变更,你们需要用到什么服务就在下面文件中进行更改即可 version: 3 services:yudao-g…...

C++ Primer 函数重载

欢迎阅读我的 【CPrimer】专栏 专栏简介:本专栏主要面向C初学者,解释C的一些基本概念和基础语言特性,涉及C标准库的用法,面向对象特性,泛型特性高级用法。通过使用标准库中定义的抽象设施,使你更加适应高级…...

【Rust中级教程】1.6. 内存 Pt.4:静态(static)内存与‘static生命周期标注

喜欢的话别忘了点赞、收藏加关注哦(加关注即可阅读全文),对接下来的教程有兴趣的可以关注专栏。谢谢喵!(・ω・) 1.6.1. 静态(static)内存 static内存实际上是一个统称,它指的是程序编译后的文…...

【设计模式】【行为型模式】解释器模式(Interpreter)

👋hi,我不是一名外包公司的员工,也不会偷吃茶水间的零食,我的梦想是能写高端CRUD 🔥 2025本人正在沉淀中… 博客更新速度 👍 欢迎点赞、收藏、关注,跟上我的更新节奏 🎵 当你的天空突…...

修改OnlyOffice编辑器默认字体

通过Docker修改OnlyOffice编辑器默认字体 问题描述详细方案1. 删除原生字体文件2. 创建字体目录3. 复制字体文件到容器中4. 执行字体更新脚本5. 重新启动容器 注意事项 问题描述 在OnlyOffice中,编辑器的默认字体可能不符合公司或个人的需求,通常会使用…...

React echarts柱状图点击某个柱子跳转页面

绘制echarts柱状图 在 ECharts 中,如果你想要在点击柱状图的某个柱子时进行页面跳转,你可以通过设置 series 中的 data 属性中的 itemStyle 或者使用 series 的 label 属性中的 emphasis 属性来实现。但是,直接在柱状图中实现点击跳转通常涉…...

wordpress主题插件开发中高频使用的38个函数

核心模板函数 get_header()/get_footer()/get_sidebar() – 加载模板部件 the_title()/the_content()/the_excerpt() – 显示文章标题、内容、摘要 the_post() – 循环中获取文章数据 bloginfo(‘url’) – 获取站点URL wp_head()/wp_footer() – 输出头部/尾部代码 wp_n…...

ElasticSearch基础和使用

ElasticSearch基础 1 初识ES相关组件 (1)Elasticsearch是一款非常强大的开源搜索引擎,可以帮助我们从海量数据中快速找到需要的内容。Elasticsearch结合kibana、Logstash、Beats组件 也就是elastic stack(ELK) 广泛应…...

qt-C++笔记之QGraphicsScene和 QGraphicsView中setScene、通过scene得到view、通过view得scene

qt-C++笔记之QGraphicsScene和 QGraphicsView中setScene、通过scene得到view、通过view得scene code review! 文章目录 qt-C++笔记之QGraphicsScene和 QGraphicsView中setScene、通过scene得到view、通过view得scene1.`setScene` 方法2.通过 `scene` 获取它的视图 (`views()`)…...

小白win10安装并配置yt-dlp

需要yt-dlp和ffmpeg 注意存放路径最好都是全英文 win10安装并配置yt-dlp 一、下载1.下载yt-dlp2. fffmpeg下载 二、配置环境三、cmd操作四、yt-dlp下视频操作 一、下载 1.下载yt-dlp yt-dlp地址 找到win的压缩包点下载,并解压 2. fffmpeg下载 ffmpeg官方下载 …...

【kafka系列】broker

目录 Broker 接收生产者消息和返回消息给消费者的流程逻辑分析 Broker 处理生产者消息的核心流程 Broker 处理消费者消息的核心流程 关键点总结 Broker 接收生产者消息和返回消息给消费者的流程逻辑分析 Broker 处理生产者消息的核心流程 接收请求 Broker 的 SocketServer …...

用大模型学大模型05-线性回归

deepseek.com:多元线性回归的目标函数,损失函数,梯度下降 标量和矩阵形式的数学推导,pytorch真实能跑的代码案例以及模型,数据,预测结果的可视化展示, 模型应用场景和优缺点,及如何改进解决及改进方法数据推…...

Python实现AWS Fargate自动化部署系统

一、背景介绍 在现代云原生应用开发中,自动化部署是提高开发效率和保证部署质量的关键。AWS Fargate作为一项无服务器计算引擎,可以让我们专注于应用程序开发而无需管理底层基础设施。本文将详细介绍如何使用Python实现AWS Fargate的完整自动化部署流程。 © ivwdcwso (ID…...

国产编辑器EverEdit - 上下翻滚不迷路(历史编辑位置、历史光标位置回溯功能)

1 光标位置跳转 1.1 应用场景 某些场景下,用户从当前编辑位置跳转到别的位置查阅信息,如果要快速跳转回之前编辑位置,则可以使用光标跳转相关功能。 1.2 使用方法 1.2.1 上一个编辑位置 跳转到上一个编辑位置,即文本修改过的位…...

DockerHub与私有镜像仓库在容器化中的应用与管理

哈喽,大家好,我是左手python! Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库,用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...

【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)

服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...

生成 Git SSH 证书

🔑 1. ​​生成 SSH 密钥对​​ 在终端(Windows 使用 Git Bash,Mac/Linux 使用 Terminal)执行命令: ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" ​​参数说明​​: -t rsa&#x…...

spring:实例工厂方法获取bean

spring处理使用静态工厂方法获取bean实例,也可以通过实例工厂方法获取bean实例。 实例工厂方法步骤如下: 定义实例工厂类(Java代码),定义实例工厂(xml),定义调用实例工厂&#xff…...

BCS 2025|百度副总裁陈洋:智能体在安全领域的应用实践

6月5日,2025全球数字经济大会数字安全主论坛暨北京网络安全大会在国家会议中心隆重开幕。百度副总裁陈洋受邀出席,并作《智能体在安全领域的应用实践》主题演讲,分享了在智能体在安全领域的突破性实践。他指出,百度通过将安全能力…...

[Java恶补day16] 238.除自身以外数组的乘积

给你一个整数数组 nums,返回 数组 answer ,其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法,且在 O(n) 时间复杂度…...

tree 树组件大数据卡顿问题优化

问题背景 项目中有用到树组件用来做文件目录,但是由于这个树组件的节点越来越多,导致页面在滚动这个树组件的时候浏览器就很容易卡死。这种问题基本上都是因为dom节点太多,导致的浏览器卡顿,这里很明显就需要用到虚拟列表的技术&…...

Java多线程实现之Thread类深度解析

Java多线程实现之Thread类深度解析 一、多线程基础概念1.1 什么是线程1.2 多线程的优势1.3 Java多线程模型 二、Thread类的基本结构与构造函数2.1 Thread类的继承关系2.2 构造函数 三、创建和启动线程3.1 继承Thread类创建线程3.2 实现Runnable接口创建线程 四、Thread类的核心…...

AI,如何重构理解、匹配与决策?

AI 时代,我们如何理解消费? 作者|王彬 封面|Unplash 人们通过信息理解世界。 曾几何时,PC 与移动互联网重塑了人们的购物路径:信息变得唾手可得,商品决策变得高度依赖内容。 但 AI 时代的来…...

JS设计模式(4):观察者模式

JS设计模式(4):观察者模式 一、引入 在开发中,我们经常会遇到这样的场景:一个对象的状态变化需要自动通知其他对象,比如: 电商平台中,商品库存变化时需要通知所有订阅该商品的用户;新闻网站中&#xff0…...