Kafka 深入客户端 — 事务
Kafka 事务确保了数据在写入Kafka时的原子性和一致性。
1 幂等
幂等就是对接口的多次调用所产生的结果和调用一次是一致的。
Kafka 生产者在进行重试的时候可能会写入重复的消息,开启幂等性功能后就可以避免这种情况。将生产者客户端参数enable.idempotence设置为true即可。
1.1 实现原理
Kafka 引入了producer id(简称PID)和序列号(sequence number)这两个概念。分别对应v2版的日志格式中RecordBatch的producer id 和first sequence这两个字段。
每个新的生产者实例在初始化时都会由broker分配一个PID(对于用户不可见)。
对于每个PID,消息发送到每个分区都有对应的序列号,序列号从0开始单调递增。生产者每发送一条消息,就会将<PID,分区>对应的序列号的值加1。
1.1.1 序列号值比对
broker端会在内存中为每一对<PID,分区>维护一个序列号(SN_old),对于收到的每一条消息,将比对它的序列号值(SN_new)。以下有三种情况:
SN_new < SN_old+1:消息被重复写入,broker可以直接将其丢弃。
SN_new = SN_old+1:是新消息,且消息没有丢失。
SN_new > SN_old+1:消息可能丢失,生产者会抛出OutOfOlderSequenceException异常。
1.1.2 局限性
Kafka 的幂等只能保证单个生产者会话(session)中单分区的幂等。
2 事务
事务可以保证跨生产者会话的消息幂等发送及新生产者实例及跨生产者会话的事务恢复。
前者指具有相同事务id的新生产者示例被创建且工作的时候,旧的且拥有相同事务id的生产者实例将不再工作。
后者指某个生产者实例宕机后,新的生产者实例可以保证任何未完成的旧事务要么被提交,要么被终止。使新的生产者实例从一个正常的状态开始工作。
2.1 实现原理

图 生产者及事务协调器初始化事务到提交事务的流程
开启事务,生产者客户端需要提供唯一的transactionalId(通过客户端参数transactional.id来设置)。并且需要开启幂等特性。
2.1.1 查找事务协调器
KafkaProducer的initTransactions()方法初始化事务。
TransactionCoordinator事务协调器负责分配PID和管理事务。
生产者首先要找到对应的事务协调器所在broker节点。发送FindCoorinatorRequest请求,Kafka根据请求体中的coordinator_key(事务id)来查找节点(具体方式是根据事务id的哈希值计算其在主题__transaction_state中的分区编号),算法如下:
分区编号 = 事务id的哈希值 % transaction_state的分区数
根据分区编号,寻找此分区leader副本所在的broker节点,并将节点信息返回给生产者。
2.1.2 获取PID
生产者在找到事务协调器的节点后,发送InitProducerIdRequest请求(如果未开启事务特性而只开启幂等性,这个请求可以方式给任意broker,否则只会发给事务协调器所在的broker)来获取PID。
事务协调器生成PID后,会把事务id和对应的PID以消息的形式保存到主题__transaction_state中。
该请求还会触发协调器执行以下任务:
1)增加该PID对应的producer_epoch(单调递增)。具有相同PID但producer_epoch小于该值的其他生产者新开启的事务会被拒绝。
producer_epoch 确保了相同事务id任何时刻只有一个生产者实例。
具有相同事务id的新生产者实例被创建且工作的时,旧的且拥有相同事务id的生产者实例将不再工作。
2)恢复(Commit)或终止(Abort)之前生产者未完成的事务。
2.1.3 开启事务
KafkaProducer的beginTransaction()方法开启一个事务。调用该方法后,生产者本地会标记已经开启了一个新的事务。只有在生产者发送第一条消息之后事务协调器才会认为该事务已开启。
2.1.4 发送消息
在生产者给一个新的分区发送数据库之前,它需要先向事务协调器发送AddPartitionsToTxnRequest请求,让事务协调器将<transactionId,TopicPartition>的对应关系存储在主题__transaction_state中。
如果该分区是对应事务中的第一个分区,那么事务协调器还会启动会该事务的计时。
随后生产者通过ProduceRequest请求发送消息(ProducerBatch)到用户自定义主题中。和普通消息不同的是,ProducerBatch会包含实质的PID、producer_epoch和sequence_number。
2.1.5 提交或终止事务
调用KafkaProducer的commitTransaction()或abortTransaction()方法来结束当前的事务。
生产者会向事务协调器发送EndTxnRequest请求。协调器收到请求后会执行如下操作:
- 事务协调器将PREPARE_COMMIT或PREPARE_ABORT消息写入主题__transaction_state。
- 协调器向事务中各个分区的leader节点发送WriteTxnMarkersRequest请求,当leader节点收到请求后,会在相应分区中写入控制消息来标识事务的终结。它和普通消息一样存储在日志文件中。
- 事务协调器将最终的COMPLETE_COMMIT或COMPLETE_ABORT信息写入主题__transaction_state以表明当前事务已结束。
此时可以删除主题__transaction_state中所有关于该事务的消息(将相应的消息设置为墓碑消息即可)。
2.1.6 控制消息ControlBatch
各个分区的leader收到事务协调器的WriteTxnMarkersRequest请求后,会在相应的分区写入控制消息(ControlBatch)。来标识事务的终结。它和普通消息一样存储在日志文件中。不同点在于RecordBatch中某些字段的值。

图 控制消息日志格式
key 中的type表示控制类型:0 ABORT,1 COMMIT。
value中的coordinator_epoch 表示协调器的纪元(版本)。
2.2 消费者与事务
Kafka并不能保证已提交的事务中的所有消息都能被消费:
- 事务中的某些消息可能被清理(压缩或删除)。
- 消费者通过seek()方法访问任意offset的消息,从而可能遗漏事务中的部分消息。
- 消费者在消费时可能没有分配到事务内的所有分区。
2.2.1 消费者的隔离级别
消费端的参数isolation.level,默认值为“read_uncommitted”表示可以消费未提交的事务,而“read_committed”表示只能消费已提交的事务。
如果隔离级别为“read_committed”,生产者开启事务,并发送3条消息,在生产者执行commitTransaction()或abortTransaction()方法前,KafkaConsumer看不到这些消息,但是其内部会缓存消息,直到生产者执行commitTransaction(),它才会将消息推送给消费端应用;如果生产者执行abortTransaction(),那它就会丢弃这些消息。
相关文章:
Kafka 深入客户端 — 事务
Kafka 事务确保了数据在写入Kafka时的原子性和一致性。 1 幂等 幂等就是对接口的多次调用所产生的结果和调用一次是一致的。 Kafka 生产者在进行重试的时候可能会写入重复的消息,开启幂等性功能后就可以避免这种情况。将生产者客户端参数enable.idempotence设置为…...
TensorFlow 2基本功能和示例代码
TensorFlow 2.x 是 Google 开源的一个深度学习框架,广泛用于构建和训练机器学习模型。 一、核心特点 1. Keras API 集成 TensorFlow 2.x 将 Keras 作为其核心 API,简化了模型的构建和训练流程。Keras 提供了高层次的 API,易于使用和理解。…...
ZZNUOJ(C/C++)基础练习1011——1020(详解版)
1011 : 圆柱体表面积 题目描述 输入圆柱体的底面半径r和高h,计算圆柱体的表面积并输出到屏幕上。要求定义圆周率为如下宏常量 #define PI 3.14159 输入 输入两个实数,表示圆柱体的底面半径r和高h。 输出 输出一个实数,即圆柱体的表面积&…...
Python 字典:快速掌握高效的数据存储方式
文章目录 一、什么是字典?字典的定义二、字典的基本操作1. 访问字典的值2. 修改字典中的值3. 添加新的键值对4. 删除键值对5. 获取字典长度三、字典的遍历1. 遍历键2. 遍历值3. 遍历键值对四、字典的常用方法1. `keys()`:获取所有键2. `values()`:获取所有值3. `items()`:获…...
Baklib探索内容中台的核心价值与实施策略
内容概要 在数字化转型的背景下,内容中台逐渐成为企业数字化策略中的关键组成部分。内容中台是一个集成的内容管理体系,旨在打破信息孤岛,使内容能够在各个业务部门和平台之间高效流通。这种管理体系不仅能够提升内容的生产效率,…...
网络安全攻防实战:从基础防护到高级对抗
📝个人主页🌹:一ge科研小菜鸡-CSDN博客 🌹🌹期待您的关注 🌹🌹 引言 在信息化时代,网络安全已经成为企业、政府和个人必须重视的问题。从数据泄露到勒索软件攻击,每一次…...
论文阅读(十三):复杂表型关联的贝叶斯、基于系统的多层次分析:从解释到决策
1.论文链接:Bayesian, Systems-based, Multilevel Analysis of Associations for Complex Phenotypes: from Interpretation to Decision 摘要: 遗传关联研究(GAS)报告的结果相对稀缺,促使许多研究方向。尽管关联概念…...
13.zookeeper开机自启动配置
要在Linux(RHEL7.7)系统中设置zookeeper开机自启动,可以创建一个系统服务单元文件。以下是为详细配置部署,假设你已经安装了zookeeper并且可以通过zkServer.sh命令启动它。 1.进入/lib/systemd/system目录 命令: cd /lib/systemd/system [root@rhel77 system]# cd /lib/…...
“““【运用 R 语言里的“predict”函数针对 Cox 模型展开新数据的预测以及推理。】“““
主题与背景 本文主要介绍了如何在R语言中使用predict函数对已拟合的Cox比例风险模型进行新数据的预测和推理。Cox模型是一种常用的生存分析方法,用于评估多个因素对事件发生时间的影响。文章通过具体的代码示例展示了如何使用predict函数的不同参数来获取生存概率和…...
Oracle Primavera P6 最新版 v24.12 更新 1/2
目录 引言 P6 PPM 更新内容 1. 在提交更新基线前预览调整 2. 快速轻松地取消链接活动 3. 选择是否从 XER 文件导入责任经理 4. 提高全局变更报告的清晰度 5. 将整个分层代码值路径导出到 CPP 6. 里程碑活动支持所有关系类型 6. 时间表批准 7. 性能改进 8. 安装改进 …...
AI大模型开发原理篇-2:语言模型雏形之词袋模型
基本概念 词袋模型(Bag of Words,简称 BOW)是自然语言处理和信息检索等领域中一种简单而常用的文本表示方法,它将文本看作是一组单词的集合,并忽略文本中的语法、词序等信息,仅关注每个词的出现频率。 文本…...
JavaWeb学习-SpringBotWeb开发入门(HTTP协议)
(一)SpringBotWeb开发步骤 (1)创建springboot工程,并勾选开发相关依赖 (2)定义HelloController类,添加方法hello,并添加注解 (3)运行测试 (二)HTTP入门概述 创建请求页面 package com.itheima.demo3; /*请求处理类,加上注解标识为请求处理类*/import org.spr…...
网站结构优化:加速搜索引擎收录的关键
本文来自:百万收录网 原文链接:https://www.baiwanshoulu.com/9.html 网站结构优化对于加速搜索引擎收录至关重要。以下是一些关键策略,旨在通过优化网站结构来提高搜索引擎的抓取效率和收录速度: 一、合理规划网站架构 采用扁…...
本地部署deepseek模型步骤
文章目录 0.deepseek简介1.安装ollama软件2.配置合适的deepseek模型3.安装chatbox可视化 0.deepseek简介 DeepSeek 是一家专注于人工智能技术研发的公司,致力于打造高性能、低成本的 AI 模型,其目标是让 AI 技术更加普惠,让更多人能够用上强…...
【deepseek】deepseek-r1本地部署-第二步:huggingface.co替换为hf-mirror.com国内镜像
一、背景 由于国际镜像国内无法直接访问,会导致搜索模型时加载失败,如下: 因此需将国际地址替换为国内镜像地址。 二、操作 1、使用vscode打开下载路径 2、全局地址替换 关键字 huggingface.co 替换为 hf-mirror.com 注意:务…...
sunrays-framework配置重构
文章目录 1.common-log4j2-starter1.目录结构2.Log4j2Properties.java 新增两个属性3.Log4j2AutoConfiguration.java 条件注入LogAspect4.ApplicationEnvironmentPreparedListener.java 从Log4j2Properties.java中定义的配置读取信息 2.common-minio-starter1.MinioProperties.…...
Spark Streaming的背压机制的原理与实现代码及分析
Spark Streaming的背压机制是一种根据JobScheduler反馈的作业执行信息来动态调整Receiver数据接收率的机制。 在Spark 1.5.0及以上版本中,可以通过设置spark.streaming.backpressure.enabled为true来启用背压机制。当启用背压机制时,Spark Streaming会自…...
刷题记录 贪心算法-2:455. 分发饼干
题目:455. 分发饼干 难度:简单 假设你是一位很棒的家长,想要给你的孩子们一些小饼干。但是,每个孩子最多只能给一块饼干。 对每个孩子 i,都有一个胃口值 g[i],这是能让孩子们满足胃口的饼干的最小尺寸&a…...
360大数据面试题及参考答案
数据清理有哪些方法? 数据清理是指发现并纠正数据文件中可识别的错误,包括检查数据一致性,处理无效值和缺失值等。常见的数据清理方法有以下几种: 去重处理:数据中可能存在重复的记录,这不仅会占用存储空间,还可能影响分析结果。通过对比每条记录的关键属性,若所有关键…...
【大模型】Ollama+AnythingLLM搭建RAG大模型私有知识库
文章目录 一、AnythingLLM简介二、搭建本地智能知识库2.1 安装Ollama2.2 安装AnythingLLM 参考资料 一、AnythingLLM简介 AnythingLLM是由Mintplex Labs Inc.开发的一个全栈应用程序,是一款高效、可定制、开源的企业级文档聊天机器人解决方案。AnythingLLM能够将任…...
深入MapReduce——从MRv1到Yarn
引入 我们前面篇章有提到,和MapReduce的论文不太一样。在Hadoop1.0实现里,每一个MapReduce的任务并没有一个独立的master进程,而是直接让调度系统承担了所有的worker 的master 的角色,这就是Hadoop1.0里的 JobTracker。在Hadoop1…...
arkui-x 前端布局编码模板
build() {Column() {Row() {// 上侧页面布局实现}// 下侧页面布局实现}.width(Const.THOUSANDTH_1000).height(Const.THOUSANDTH_1000).justifyContent(FlexAlign.SpaceBetween).backgroundImage($r(app.media.background_xxx)).backgroundImageSize(ImageSize.Cover).backgrou…...
代理模式 -- 学习笔记
代理模式学习笔记 什么是代理? 代理是一种设计模式,用户可以通过代理操作,而真正去进行处理的是我们的目标对象,代理可以在方法增强(如:记录日志,添加事务,监控等) 拿一…...
sem_init的概念和使用案例
sem_init 是 POSIX 线程库中用于初始化未命名信号量(unnamed semaphore)的函数,常用于多线程或多进程间的同步。以下是其概念和使用案例的详细说明: 概念 函数原型: #include <semaphore.h>int sem_init(sem_t …...
JVM_类的加载、链接、初始化、卸载、主动使用、被动使用
①. 说说类加载分几步? ①. 按照Java虚拟机规范,从class文件到加载到内存中的类,到类卸载出内存为止,它的整个生命周期包括如下7个阶段: 第一过程的加载(loading)也称为装载验证、准备、解析3个部分统称为链接(Linking)在Java中数据类型分为基本数据类型和引用数据…...
ProfibusDP主机与从机交互
ProfibusDP 主机SD2索要数据下发:68 08 F7 68 01 02 03 21 05 06 07 08 1C 1668:SD2 08:LE F7:LEr 68:SD2 01:目的地址 02:源地址 03:FC_CYCLIC_DATA_EXCHANGE功能码 21:数据地址 05,06,07,08&a…...
Java设计模式:结构型模式→组合模式
Java 组合模式详解 1. 定义 组合模式(Composite Pattern)是一种结构型设计模式,它允许将对象组合成树形结构以表示“部分-整体”的层次。组合模式使得客户端能够以统一的方式对待单个对象和对象集合的一致性,有助于处理树形结构…...
【福州市AOI小区面】shp数据学校大厦商场等占地范围面数据内容测评
AOI城区小区面样图和数据范围查看: — 字段里面有name字段。分类比较多tpye:每个值代表一个类型。比如字段type中1549代表小区住宅,1563代表学校。小区、学校等占地面积范围数据 —— 小区范围占地面积面数据shp格式 无偏移坐标,只…...
【Python实现机器遗忘算法】复现2023年TNNLS期刊算法UNSIR
【Python实现机器遗忘算法】复现2023年TNNLS期刊算法UNSIR 1 算法原理 Tarun A K, Chundawat V S, Mandal M, et al. Fast yet effective machine unlearning[J]. IEEE Transactions on Neural Networks and Learning Systems, 2023. 本文提出了一种名为 UNSIR(Un…...
基于SpringBoot的阳光幼儿园管理系统
作者:计算机学姐 开发技术:SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等,“文末源码”。 专栏推荐:前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏:…...
