当前位置: 首页 > article >正文

【kafka系列】Kafka事务的实现原理

目录

1. 事务核心组件

1.1 幂等性生产者(Idempotent Producer)

1.2 事务协调器(TransactionCoordinator)

1.3 事务日志(Transaction Log)

2. 事务执行流程

2.1 事务初始化

2.2 发送消息

2.3 事务提交(两阶段提交)

2.4 事务完成

3. 消费者事务隔离

3.1 隔离级别

3.2 实现机制

4. 异常处理与容错

4.1 生产者宕机

4.2 协调器宕机

4.3 Broker宕机

5. 关键源码解析

5.1 事务协调器核心逻辑

5.2 两阶段提交实现

5.3 消费者过滤未提交消息

6. 事务配置与使用

6.1 生产者配置

6.2 消费者配置

7. 事务性能与限制

总结


  1. 幂等生产者:通过ProducerIDSequenceNumber去重,避免消息重复(源码见ProducerStateManager)。
  2. 事务协调器(TransactionCoordinator)
    • 每个事务绑定一个Coordinator,处理BEGIN_TRANSACTIONCOMMIT/ABORT请求。
    • 事务状态存储在内部Topic __transaction_state中(通过TransactionStateManager管理)。
  1. 两阶段提交
    • 阶段1:标记事务为“预提交”,写入所有参与分区的数据。
    • 阶段2:写入COMMIT标记到事务日志,消费者仅可见已提交的事务消息。

Kafka事务机制通过幂等性生产者事务协调器(TransactionCoordinator)两阶段提交(2PC) 实现跨分区的原子性写入,确保消息要么全部提交,要么全部丢弃。以下是核心实现机制:


1. 事务核心组件

1.1 幂等性生产者(Idempotent Producer)
  • 作用:确保单分区内消息不重复。
  • 实现机制
    • PID(Producer ID):每个生产者实例唯一,由Broker分配。
    • Sequence Number:每个消息的单调递增序列号,Broker校验序列号连续性。
    • 源码类ProducerStateManager(管理PID与序列号)。
1.2 事务协调器(TransactionCoordinator)
  • 作用:管理事务生命周期,协调事务提交或中止。
  • 实现机制
    • 每个事务绑定一个协调器(通过事务ID哈希选择Broker)。
    • 维护事务状态机(TransactionState),存储在内部Topic __transaction_state
    • 源码类TransactionCoordinatorTransactionStateManager
1.3 事务日志(Transaction Log)
  • 作用:持久化事务状态,防止协调器宕机后数据丢失。
  • 存储位置:内部Topic __transaction_state,每个分区对应一个协调器。
  • 数据格式:事务ID、PID、状态(PrepareCommitCompleted等)、超时时间。

2. 事务执行流程

2.1 事务初始化
  1. 生产者初始化事务
    • 调用initTransactions(),向协调器注册事务ID,获取PID。
    • 协调器在__transaction_state中记录事务元数据。
2.2 发送消息
  1. 发送事务消息
    • 生产者发送消息时携带PID、序列号、事务ID。
    • Broker将消息写入日志,但标记为未提交(对消费者不可见)。
2.3 事务提交(两阶段提交)
  • 阶段1:Prepare Commit
    生产者向协调器发送EndTxnRequest,协调器将事务状态置为PrepareCommit,并持久化到事务日志。
  • 阶段2:Commit Markers写入
    协调器向所有涉及的分区Leader发送WriteTxnMarkers请求,Leader在日志中写入事务提交标记(Control Batch)。
2.4 事务完成
  • Broker将事务消息标记为已提交,消费者可读取(需配置isolation.level=read_committed)。

3. 消费者事务隔离

3.1 隔离级别
  • read_committed:仅消费已提交的事务消息(跳过未提交的Control Batch)。
  • read_uncommitted:消费所有消息(默认模式,不保证事务原子性)。
3.2 实现机制
  • 消费者在拉取消息时,Broker根据隔离级别过滤未提交的事务消息。
  • 源码逻辑KafkaConsumerfetcher模块解析Control Batch,决定是否跳过消息。

4. 异常处理与容错

4.1 生产者宕机
  • 事务超时(transaction.timeout.ms):协调器自动中止未完成的事务。
  • 新生产者实例需重新初始化事务,旧事务状态由协调器清理。
4.2 协调器宕机
  • 事务日志持久化在__transaction_state,新协调器加载日志恢复状态。
4.3 Broker宕机
  • 副本机制保证事务日志和消息日志的高可用,Leader切换后继续处理事务。

5. 关键源码解析

5.1 事务协调器核心逻辑
//事务状态管理(TransactionStateManager)
public class TransactionStateManager {// 持久化事务状态到__transaction_statedef appendTransactionToLog(transactionState: TransactionState): Unit = {val records = List(new SimpleRecord(transactionState.toBytes))transactionLog.append(records)}
}
5.2 两阶段提交实现
// 协调器发送提交标记(TransactionCoordinator)
private def sendTxnMarkers(transactionState: TransactionState): Unit = {
// 向所有分区Leader发送WriteTxnMarkersRequest
transactionState.partitions.foreach { partition =>val request = new WriteTxnMarkersRequest.Builder(partition, Commit)sendRequestToLeader(request)}
}
5.3 消费者过滤未提交消息
// 消费者拉取消息过滤(ConsumerFetcherThread)
private def filterCommittedMessages(records: ConsumerRecords): ConsumerRecords = {
if (isolationLevel == IsolationLevel.READ_COMMITTED) {records.filter(_.controlBatchType != ControlBatchType.ABORT)
} else {records
}
}

6. 事务配置与使用

6.1 生产者配置
props.put("enable.idempotence", "true");  // 开启幂等性
props.put("transactional.id", "my-tx-id"); // 必须设置事务ID
props.put("transaction.timeout.ms", "60000"); // 事务超时时间
6.2 消费者配置
props.put("isolation.level", "read_committed"); // 仅消费已提交消息

7. 事务性能与限制

  • 性能开销:事务引入两阶段提交和日志持久化,吞吐量下降约20%-30%。
  • 限制
    • 事务仅支持单会话(一个生产者实例)。
    • 事务消息的消费者必须使用Kafka Consumer API(不支持旧版基于ZooKeeper的消费者)。

总结

Kafka事务通过以下机制实现跨分区的原子性:

  1. 幂等性生产者:避免单分区消息重复。
  2. 事务协调器与两阶段提交:确保所有分区要么全部提交,要么全部回滚。
  3. 事务日志持久化:保障协调器故障恢复后状态一致。
  4. 消费者隔离级别:控制事务消息的可见性。

正确配置后,Kafka事务可支持金融级场景的精确一次(Exactly-Once)语义

相关文章:

【kafka系列】Kafka事务的实现原理

目录 1. 事务核心组件 1.1 幂等性生产者(Idempotent Producer) 1.2 事务协调器(TransactionCoordinator) 1.3 事务日志(Transaction Log) 2. 事务执行流程 2.1 事务初始化 2.2 发送消息 2.3 事务提…...

HarmonyOS NEXT网络状态监听HTTP和RCP请求网络

当我们在HarmonyOS NEXT中开发的应用,基本上都会使用网络请求,从服务端获取数据在客户端显示或者供用户交互,有时候网络发生变化时,我们需要做一些相应的操作,接下来我们一起来了解下在HarmonyOS NEXT下如何监听网络状…...

2025.2.16

Web [GDOUCTF 2023]泄露的伪装: 点进去看就是装神弄鬼,那就直接扫描 果然有东西 第一个是php代码 第二个是个文件 访问发现是一样的 分析一下:使用 file_get_contents($cxk) 函数读取 $cxk 变量中指定的 URL 或文件的内容。 如果读取的内…...

使用Java爬虫获取京东JD.item_sku API接口数据

在电商领域,商品的SKU(Stock Keeping Unit)信息是运营和管理的关键数据。SKU信息包括商品的规格、价格、库存等,对于商家的库存管理、定价策略和市场分析至关重要。京东作为国内领先的电商平台,提供了丰富的API接口&am…...

【ISO 14229-1:2023 UDS诊断(会话控制0x10服务)测试用例CAPL代码全解析③】

ISO 14229-1:2023 UDS诊断【会话控制0x10服务】_TestCase03 作者:车端域控测试工程师 更新日期:2025年02月15日 关键词:UDS诊断、0x10服务、诊断会话控制、ECU测试、ISO 14229-1:2023 TC10-003测试用例 用例ID测试场景验证要点参考条款预期…...

MySQL安装MySQL服务时提示Install-Remove of the Service Denied

文章目录 问题描述排查1.字面意思2.搜索引擎3.官方文档4.源码 处理方法相关扩展 问题描述 MySQL安装MySQL服务时提示Install-Remove of the Service Denied! 详细报错如下: C:\Users\荷塘月色>net start mysql 服务名无效。请键入 NET HELPMSG 2185 以获得更多…...

python学opencv|读取图像(六十五)使用cv2.boundingRect()函数实现图像轮廓矩形标注

【1】引言 前序学习进程中,已经使用cv2.findContours()函数cv2.drawContours()函数实现图像轮廓识别和标注,这种标注沿着图像的轮廓进行,比较细致。相关文章链接为: python学opencv|读取图像(六十四)使用…...

haproxy实现MySQL服务器负载均衡

1.环境准备 准备好下面四台台服务器: 主机名IP角色open-Euler1192.168.121.150mysql-server1openEuler-2192.168.121.151mysql-server2openEuler-3192.168.121.152clientRocky8-1192.168.121.160haproxy 2.mysql服务器配置 1.下载mariadb #下载mariadb [rootop…...

open3d绘制平面

在Open3D中绘制平面通常涉及到创建一个平面模型并将其可视化。Open3D是一个开源库,主要用于3D数据的处理和可视化,但它主要用于3D数据的处理,并不直接支持绘制2D平面。如果你想在Open3D中“绘制”一个平面,你可以通过以下几种方法来实现类似的效果: 方法1:使用o3d.geome…...

【ISO 14229-1:2023 UDS诊断全量测试用例清单系列:第十四节】

ISO 14229-1:2023 UDS诊断服务测试用例全解析(CommunicationControl_0x28服务) 作者:车端域控测试工程师 更新日期:2025年02月14日 关键词:UDS协议、0x28服务、通信控制、ISO 14229-1:2023、ECU测试 一、服务功能概述…...

C语言简单练习题

文章目录 练习题一、计算n的阶乘bool类型 二、计算1!2!3!...10!三、计算数组arr中的元素个数二分法查找 四、动态打印字符Sleep()ms延时函数system("cls")清屏函数 五、模拟用户登录strcmp()函数 六、猜数字小游戏产生一个随机数randsrandRAND_MAX时间戳time() 示例 …...

每日一题——把数字翻译成字符串

把数字翻译成字符串 题目描述示例示例1示例2 题解动态规划代码实现复杂度分析 总结 题目描述 有一种将字母编码成数字的方式:‘a’->1, ‘b’->2, … , ‘z’->26。 现在给一串数字,返回有多少种可能的译码结果。 数据范围:字符串…...

C语言之easyX

目录 概要 easyX整体架构 图形绘制 画布宽高 圆形 图片的贴图 加载图像 游戏框架 概要 easyX是一个轻量级的图形库,用于在Windows平台上进行简单的2D图形绘制。它提供了一组简单易用的函数,可以方便地绘制基本的图形元素,如线条、矩形、圆形…...

安卓基础(持续更新的笔记)

为什么要这样: // 创建请求体RequestBody body RequestBody.create(MediaType.parse("application/json; charsetutf-8"),jsonObject.toString()); jsonObject 就包含了一个 JSON 数据,它其实就是: {"name": "张…...

10. Hbase Compaction命令

一. 什么是Compaction 在 HBase 中,频繁进行数据插入、更新和删除操作会生成许多小的 HFile,当 HFile 数量增多时,会影响HBase的读写性能。此外,垃圾数据的存在也会增加存储需求。因此,定期进行 Compact操作&#xff…...

在 UniApp 项目中设置多语言

在 UniApp 项目中设置多语言支持可以通过以下步骤实现: 1. 安装依赖 首先,你需要安装 vue-i18n 插件来处理多语言支持。 npm install vue-i18n --save2. 创建语言文件 在项目中创建一个 lang 文件夹,用于存放不同语言的翻译文件。例如&am…...

告别卡关!XSS挑战之旅全关卡通关思路详解

XSS挑战之旅 XSS测试思路Level1Level2Level3Level4Level5Level6Level7Level8Level9Level10Level11Level12Level13Level14Level15Level16Level17Level18Level19Level20免责声明: XSS测试思路 确定输入输出点: 寻找URL参数、表单输入、HTTP头(R…...

SpringCloud框架下的注册中心比较:Eureka与Consul的实战解析

摘要 在探讨SpringCloud框架中的两种注册中心之前,有必要回顾单体架构与分布式架构的特点。单体架构将所有业务功能集成在一个项目中,优点是架构简单、部署成本低,但耦合度高。分布式架构则根据业务功能对系统进行拆分,每个模块作…...

【Java】分布式锁Redis和Redisson

https://blog.csdn.net/weixin_44606481/article/details/134373900 https://www.bilibili.com/video/BV1nW421R7qJ Redis锁机制一般是由 setnx 命令实现,set if not exists,语法setnx key value,将key设置值为value,如果key不存在…...

Python的imutils库详细介绍

imutils 是一个专为简化OpenCV(计算机视觉库)常见操作而设计的Python工具库,提供了一系列便捷函数,使图像和视频处理更加高效和简洁。以下是对其功能、安装及用法的详细介绍: 1. 安装方法 通过pip安装: p…...

蓝桥杯 Java B 组之简单动态规划(爬楼梯、斐波那契数列)

Day 6:简单动态规划(爬楼梯、斐波那契数列) 动态规划(Dynamic Programming,简称 DP)是计算机科学中的一种算法设计思想,用来解决最优解问题,它的核心思想是将大问题分解为小问题&am…...

Hive增量迁移方案与实操PB级

客户一共1PB数据,每天新增10T,有些表只保留3天。 需要客户提供: a.tbl_size(大小GB) a.last_mtime(最新更新时间) a.tbl_ttl(保留时间) b.last_part_dt(分区值) b.last_part_size(最新分区大小) t_day(表更新规律,t几) 因为目前…...

【练习】【双指针】力扣热题100 283. 移动零

题目 给定一个数组 nums,编写一个函数将所有 0 移动到数组的末尾,同时保持非零元素的相对顺序。 请注意 ,必须在不复制数组的情况下原地对数组进行操作。 示例 1: 输入: nums [0,1,0,3,12] 输出: [1,3,12,0,0] 示例 2: 输入: nums [0] 输出…...

Python 依赖管理的革新——Poetry 深度解析

引言 在 Python 生态中,依赖管理一直是开发者关注的重要话题。从最初的 pip 和 virtualenv,到后来的 pipenv,Python 依赖管理工具不断进化。而近年来,Poetry 作为一款集成包管理和虚拟环境管理的新兴工具,逐渐获得了广…...

从零开始学Python爬虫:(二)使用基本库urllib(下)

一、异常处理 关于某些情况下,可能会出现异常,如果不处理它们,会发生很多错误。 而urllib库提供了error模块来处理这些异常,该模块包括以下功能: (1)URLError 该类含有一个属性reason&#x…...

电商分布式场景中如何保证数据库与缓存的一致性?实战方案与Java代码详解

文章目录 一、缓存一致性问题的本质写后读不一致:更新数据库后,缓存未及时失效并发读写竞争:多个线程同时修改同一数据缓存与数据库事务不同步:部分成功导致数据错乱 二、5大核心解决方案与代码实现方案1:延迟双删策略…...

kamailio中Core Cookbook 核心配置手册

Core Cookbook 核心配置手册 版本: Kamailio SIP 服务器 v6.0.x (稳定版) 概述 本教程收集了 Kamailio 核心导出到配置文件的功能和参数。 注意: 本页参数未按字母顺序排列。 结构 kamailio.cfg 的结构可分为三部分: 全局参数模块设置路由块 建议按此顺序排列以保持清晰…...

【嵌入式Linux应用开发基础】read函数与write函数

目录 一、read 函数 1.1. 函数原型 1.2. 参数说明 1.3. 返回值 1.4. 示例代码 二、write 函数 2.1. 函数原型 2.2. 参数说明 2.3. 返回值 2.4. 示例代码 三、关键注意事项 3.1 部分读写 3.2 错误处理 3.3 阻塞与非阻塞模式 3.4 数据持久化 3.5 线程安全 四、嵌…...

一、OpenSM 架构部署及原理详解

目录 一、OpenSM 架构与核心功能 1. InfiniBand 子网管理器(SM)的作用 2. OpenSM 的架构 二、OpenSM 部署步骤(以 Linux 为例) 1. 安装依赖与软件包 2. 配置文件 3. 启动 OpenSM 服务 4. 验证部署 5. 高可用性配置(可选) 三、OpenSM 工作原理详解 1. 拓扑发现(…...

2526考研资料分享 百度网盘

通过网盘分享的文件:01、2026【考研数学】 链接:https://pan.baidu.com/s/1PwMzp_yCYqjBqa7492mP3w?pwd98wg 提取码:98wg--来自百度网盘超级会员v3的分享 通过网盘分享的文件:01、2026【考研政治】 链接:https://pan.baidu.com/s/1PwMzp_yCYqjBqa7492…...