Kafka 幂等性与事务
文章目录
- 幂等性
- 实现机制
- 配置使用
- 局限性
- 事务
- 使用场景
- 配置使用
- 实现机制
- 事务过程
- 事务初始化
- 事务开始
- 事务提交
- 事务取消
- 事务消费
幂等性
Producer 无论向 Broker 发送多少次重复的数据,Broker 端只会持久化一条,保证数据不丢失且不重复。
实现机制
通过引入ProducerID和SequenceNumber来实现Broker对于每条接收的消息都会验证PID,同时会检查SeqNumber是否比Broker维护的SeqNumber值严格+1,只有符合要求的才是合法的,其他情况都会丢弃。
- ProducerID:Producer初始化时由Broker分配,作为每个Producer会话的唯一标识
- SequenceNumber:Producer发送的每条消息的标识(更准确地说是每一个消息批次,即ProducerBatch),从0开始单调递增。Broker根据它来判断写入的消息是否可接受。
配置使用
Producer设置
- enable.idempotence=true:表示使用幂等性生产者。当enable.idempotence配置为true时,acks必须配置为all。并且建议max.in.flight.requests.per.connection的值小于5。
- acks=all
局限性
- 只能保证 Producer 在单个会话内不丟不重 ,如果 Producer 出现意外挂掉再重启是无法保证的(幂等性情况下,是无法获取之前的状态信息,因此是无法做到跨会话级别的不丢不重);
- 幂等性不能跨多个 Topic-Partition,只能保证单个 partition 内的幂等性 ,当涉及多个 Topic-Partition 时,这中间的状态无法同步。
事务
Kafka 事务基于幂等性实现,通过事务机制,Kafka 可以实现对多个 Topic 、多个 Partition 的原子性的写入,即处于同一个事务内的所有消息,最终结果是要么全部写成功,要么全部写失败。
使用场景
- 对多个 Topic 、多个 Partition 的原子性的写入
- Consumer-Transform-Producer模式下,将消费者提交偏移量操作和生成者一系列生成消息的操作封装成一个原子操作。避免重复消费
配置使用
Producer设置
- transactional.id:事务id,类型为string,客户端自定义
Consumer设置
- isolation.level:read_committed。事务隔离级别,默认为空。
实现机制
引入以下组件:
- Transactional Coordinator:负责管理和协调事务。每个Kafka broker上都会运行一个Transactional Coordinator实例。
- Transaction Log:这是一个内部Topic(__transaction_state),用于存储事务的元数据信息,包括事务的状态、参与的分区等。
- Control Messages:由Transactional Coordinator写入topic的一种特殊消息,但对于Consumer来说不可见。是用来让Broker告知consumer拉取的消息是否已被原子性提交。
- TransactionId:事务ID,类型为String字符串,由Producer客户端自定义。提供稳定不变的ID意义在于可以在异常后重启从断点进行恢复。
- Epoch:单调递增的事务Id标识,可以保证具有相同TransactionId的Producer,旧的无法写入。
- ProducerID、SequenceNumber:标记生产者、消息的唯一标识
事务过程
事务初始化
所有的事务操作都需要Transactional Coordinator管理和协调
1.获取Transactional Coordinator地址
Producer发送携带Transactionid的请求到任意一个Broker,Broker对获取到Transactionid做hashcode后对topic(__transaction_state)默认分区(50)取模,所得分区主副本所在的Broker作为TransactionalCoordinator
2.获取ProducerID和Epoch
Producer对TransactionalCoordinator发送请求,此时会分配ProducerId及Epoch,并将信息持久化。最后向Producer返回ProducerId+Epoch。之后的每次请求都会携带ProducerId和Epoch。
(__transaction_state中信息格式为key-value,key为Transactionid,value包含ProducerID、Epoch、事务和分区信息等)
事务开始
3.消息写入
Producer开始事务写入,先将本地事务状态更改为IN_TRANSACTION,然后发送消息之前,Producer会将topic-partition相关的信息发送给TransactionalCoordinator,由它完成持久化(更新__transaction_state)。之后Producer开始对相关topic-partition发送消息
事务提交
4.Producer触发事务提交
Producer首先发送请求给TransactionalCoordinator,由它更新__transaction_state将事务状态更改为PrepareCommit,之后返回成功响应给Producer。TransactionalCoordinator发送Control Messages(会持续重试,直到成功)给涉及此次事务的topic-partition,写入成功之后,再次更新__transaction_state,将事务状态更新为CompleteCommit。
事务取消
5.Producer或Coordinator触发事务取消
事物取消可以由Producer发起取消或者TransactionalCoordinator检测到事务超时而取消,此时均会更新__transaction_state更改为PrepareAbort,之后返回成功响应给Producer。TransactionalCoordinator发送Control Messages给涉及此次事务的topic-partition,写入成功之后,再次更新__transaction_state,将事务状态更新为CompleteAbort。
取消的事务会记录在.txnindex文件中,主要包含以下信息:currentVersion、producerId、firstOffset(当前事务的开始offset)、lastOffset(当前事务的结束offset)、lastStableOffset(存储时的LSO)
事务消费
正常消费时
读隔离级别为 read-committed, 在内部会使用存储在topic-partition中的Control messgae,来过滤掉没有提交的消息。(回滚的消息也没有删除,只是在读数据时过滤该数据)
对于Consumer-Transform-Producer下,会通过groupId算出__consumer_offsets topic中对应的partition,然后加该partition的信息也加入到Transaction Log中,最终在统一取消或提交。同样也会将Control message写入__consumer_offsets对应的分区。
- 需要将enable.auto.commit设置为false
- 使用producer.sendOffsetsToTransaction()来提交offset

参考
https://z.itpub.net/article/detail/F86DD78AECAC4DEC92468DEFFEB4ED0D
https://www.cnblogs.com/hongdada/p/16945086.html
学习笔记之Kafka幂等和事务_transaction.state.log.replication.factor-CSDN博客
相关文章:
Kafka 幂等性与事务
文章目录 幂等性实现机制配置使用局限性 事务使用场景配置使用实现机制事务过程事务初始化事务开始事务提交事务取消事务消费 幂等性 Producer 无论向 Broker 发送多少次重复的数据,Broker 端只会持久化一条,保证数据不丢失且不重复。 实现机制 通过引…...
day2 Linux操作系统指令
思维导图 在家目录下创建目录文件,dir 1、dir下创建dir1和dir2 2、把当前目录下的所有文件拷贝到dir1中, 3、把当前目录下的所有脚本文件拷贝到dir2中 4、把dir2打包并压缩为dir2.tar.xz 5、再把dir2.tar.xz移动到dir1中 6、解压dir1中的压缩包 7、使用…...
AI一周重要会议和活动概览
一、小模型的曙光和机会之思辨高峰论坛 会议介绍:小模型的曙光和机会之思辨”高峰论坛暨第32期CSIG图像图形学科前沿讲习班于2025年1月3—4日在杭州举办,会议由中国图象图形学学会主办,中国图象图形学学会前沿科技论坛委员会承办。本次论坛设…...
重启ubuntu服务器,如何让springboot服务自动运行
文章目录 1. 使用 systemd 服务步骤: 2. 使用 cron 的 reboot 任务步骤: 3. 使用 init.d 脚本(适用于较旧版本)步骤: 推荐方案 为了确保在重启Ubuntu服务器后,让springboot的服务test.jar象 nohup java -ja…...
python系列教程237——启动扩展功能
朋友们,如需转载请标明出处:https://blog.csdn.net/jiangjunshow 声明:在人工智能技术教学期间,不少学生向我提一些python相关的问题,所以为了让同学们掌握更多扩展知识更好地理解AI技术,我让助理负责分享…...
U盘格式化工具合集:6个免费的U盘格式化工具
在日常使用中,U盘可能会因为文件系统不兼容、数据损坏或使用需求发生改变而需要进行格式化。一个合适的格式化工具不仅可以清理存储空间,还能解决部分存储问题。本文为大家精选了6款免费的U盘格式化工具,并详细介绍它们的功能、使用方法、优缺…...
循环神经网络(RNN)入门指南:从原理到实践
目录 1. 循环神经网络的基本概念 2. 简单循环网络及其应用 3. 参数学习与优化 4. 基于门控的循环神经网络 4.1 长短期记忆网络(LSTM) 4.1.1 LSTM的核心组件: 4.2 门控循环单元(GRU) 5 实际应用中的优化技巧 5…...
马原复习笔记
文章目录 前言导论物质实践人类社会资本主义社会主义共产主义后记 前言 一月二号下午四点多考试,很友好,不是早八,哈哈哈。之前豪言壮语和朋友说这次马原要全对,多做了几次测试之后,发现总有一些知识点是自己不知道的…...
Android Room 框架的初步使用
一、简介 Room 是一个强大的对象关系映射库,它允许你将 SQLite 数据库中的表映射到 Java 或 Kotlin 的对象(称为实体)上。你可以使用简单的注解(如 Entity、Dao 和 Database)来定义数据库表、数据访问对象(…...
什么是过度拟合和欠拟合?
在机器学习中,当一个算法的预测非常接近或者直接等于它的训练数据,导致不能够准确预测除了训练数据以外的数据,我们把这种情况称为过度拟合。算法能够非常接近甚至就是训练的数据,是个非常好的事,但是它不能准确预测除…...
DotnetSpider实现网络爬虫
1. 使用DotnetSpider框架 DotnetSpider是一个开源的、轻量、灵活、高性能、跨平台的分布式网络爬虫框架,适用于.NET平台。它可以帮助开发者快速实现网页数据的抓取功能。 1.1 安装DotnetSpider NuGet包 首先,你需要在你的.NET项目中安装DotnetSpider NuGet包。你可以通过…...
锐捷WLAN产品出货量排名第一!
摘要:2024年Q3锐捷WLAN产品出货量排名第一!锐捷多形态Wi-Fi 7产品重磅出击! 近日, IT市场研究和咨询公司IDC发布《IDC中国企业级WLAN市场跟踪报告,2024年Q3》。报告显示,锐捷WLAN产品在2024年Q3出货量位居行业首位。至此,锐捷WLAN产品在2024年的Q1、Q2、Q3均实现了市场出货量的…...
win32汇编环境下,对话框程序中生成listview列表控件,点击标题栏自动排序的示例
;把代码抄进radasm里面,可以直接编译运行。重要的地方加了备注。 ;这个有点复杂,重要的地方加了备注 ;以下是ASM文件 ;>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>…...
自动化文档处理:Azure AI Document Intelligence
Azure AI Document Intelligence支持多种文件格式,包括PDF、JPEG、PNG等。其核心功能是将这些文档按页进行内容提取,并转化为LangChain文档。其默认输出格式是Markdown,这使得文档可以通过MarkdownHeaderTextSplitter进行语义分片。您也可以使…...
【Maven】Maven打包机制详解
Maven打包的类型? 以下是几种常见的打包形式: 1、jar (Java Archive) 用途:用于包含 Java 类文件和其他资源(如属性文件、配置文件等)的库项目。特点: 可以被其他项目作为依赖引用。适合创建独立的应用程…...
Python 向量检索库Faiss使用
Faiss(Facebook AI Similarity Search)是一个由 Facebook AI Research 开发的库,它专门用于高效地搜索和聚类大量向量。Faiss 能够在几毫秒内搜索数亿个向量,这使得它非常适合于实现近似最近邻(ANN)搜索&am…...
pd.Timestamp接收的参数类型
pd.Timestamp() 是 Pandas 中用于表示单个日期时间的函数,它可以接受多种类型的参数。以下是 pd.Timestamp() 可以接受的主要参数类型,并举例说明: 1. 日期时间字符串(Date/Time String) pd.Timestamp() 可以接收标准…...
FOC控制原理-ADC采样时机
0、文章推荐 SimpleFOC移植STM32(五)—— 电流采样及其变换_极对数对电流采样的影响-CSDN博客 FOC 电流采样方案对比(单电阻/双电阻/三电阻) - 知乎 (zhihu.com) FOC中的三种电流采样方式,你真的会选择吗?…...
运行python程序报错 undefined symbol: ffi_type_uint32 的参考解决方法
文章目录 写在前面一、问题描述二、解决方法参考链接 写在前面 自己的测试环境: Ubuntu20.04 ROS-Noetic 一、问题描述 运行 python 程序出现如下问题: Traceback (most recent call last):File "<string>", line 1, in <module&…...
怎么使用阿里的docker国产镜像源
要使用 阿里云 Docker 镜像加速器,你需要先注册并获取加速器的 URL,然后将其配置到 Docker 的配置文件中。下面是具体的使用步骤: 步骤 1:登录阿里云控制台并获取镜像加速器 URL 登录阿里云控制台 打开 阿里云官网,并…...
React hook之useRef
React useRef 详解 useRef 是 React 提供的一个 Hook,用于在函数组件中创建可变的引用对象。它在 React 开发中有多种重要用途,下面我将全面详细地介绍它的特性和用法。 基本概念 1. 创建 ref const refContainer useRef(initialValue);initialValu…...
1688商品列表API与其他数据源的对接思路
将1688商品列表API与其他数据源对接时,需结合业务场景设计数据流转链路,重点关注数据格式兼容性、接口调用频率控制及数据一致性维护。以下是具体对接思路及关键技术点: 一、核心对接场景与目标 商品数据同步 场景:将1688商品信息…...
2021-03-15 iview一些问题
1.iview 在使用tree组件时,发现没有set类的方法,只有get,那么要改变tree值,只能遍历treeData,递归修改treeData的checked,发现无法更改,原因在于check模式下,子元素的勾选状态跟父节…...
镜像里切换为普通用户
如果你登录远程虚拟机默认就是 root 用户,但你不希望用 root 权限运行 ns-3(这是对的,ns3 工具会拒绝 root),你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案:创建非 roo…...
【Zephyr 系列 10】实战项目:打造一个蓝牙传感器终端 + 网关系统(完整架构与全栈实现)
🧠关键词:Zephyr、BLE、终端、网关、广播、连接、传感器、数据采集、低功耗、系统集成 📌目标读者:希望基于 Zephyr 构建 BLE 系统架构、实现终端与网关协作、具备产品交付能力的开发者 📊篇幅字数:约 5200 字 ✨ 项目总览 在物联网实际项目中,**“终端 + 网关”**是…...
Java面试专项一-准备篇
一、企业简历筛选规则 一般企业的简历筛选流程:首先由HR先筛选一部分简历后,在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如:Boss直聘(招聘方平台) 直接按照条件进行筛选 例如:…...
企业如何增强终端安全?
在数字化转型加速的今天,企业的业务运行越来越依赖于终端设备。从员工的笔记本电脑、智能手机,到工厂里的物联网设备、智能传感器,这些终端构成了企业与外部世界连接的 “神经末梢”。然而,随着远程办公的常态化和设备接入的爆炸式…...
JVM虚拟机:内存结构、垃圾回收、性能优化
1、JVM虚拟机的简介 Java 虚拟机(Java Virtual Machine 简称:JVM)是运行所有 Java 程序的抽象计算机,是 Java 语言的运行环境,实现了 Java 程序的跨平台特性。JVM 屏蔽了与具体操作系统平台相关的信息,使得 Java 程序只需生成在 JVM 上运行的目标代码(字节码),就可以…...
20个超级好用的 CSS 动画库
分享 20 个最佳 CSS 动画库。 它们中的大多数将生成纯 CSS 代码,而不需要任何外部库。 1.Animate.css 一个开箱即用型的跨浏览器动画库,可供你在项目中使用。 2.Magic Animations CSS3 一组简单的动画,可以包含在你的网页或应用项目中。 3.An…...
腾讯云V3签名
想要接入腾讯云的Api,必然先按其文档计算出所要求的签名。 之前也调用过腾讯云的接口,但总是卡在签名这一步,最后放弃选择SDK,这次终于自己代码实现。 可能腾讯云翻新了接口文档,现在阅读起来,清晰了很多&…...
