当前位置: 首页 > news >正文

Kafka 幂等性与事务

文章目录

  • 幂等性
    • 实现机制
    • 配置使用
    • 局限性
  • 事务
    • 使用场景
    • 配置使用
    • 实现机制
    • 事务过程
      • 事务初始化
      • 事务开始
      • 事务提交
      • 事务取消
      • 事务消费

幂等性

Producer 无论向 Broker 发送多少次重复的数据,Broker 端只会持久化一条,保证数据不丢失且不重复。

实现机制

通过引入ProducerID和SequenceNumber来实现Broker对于每条接收的消息都会验证PID,同时会检查SeqNumber是否比Broker维护的SeqNumber值严格+1,只有符合要求的才是合法的,其他情况都会丢弃。

  • ProducerID:Producer初始化时由Broker分配,作为每个Producer会话的唯一标识
  • SequenceNumber:Producer发送的每条消息的标识(更准确地说是每一个消息批次,即ProducerBatch),从0开始单调递增。Broker根据它来判断写入的消息是否可接受。

配置使用

Producer设置

  • enable.idempotence=true:表示使用幂等性生产者。当enable.idempotence配置为true时,acks必须配置为all。并且建议max.in.flight.requests.per.connection的值小于5。
  • acks=all

局限性

  • 只能保证 Producer 在单个会话内不丟不重 ,如果 Producer 出现意外挂掉再重启是无法保证的(幂等性情况下,是无法获取之前的状态信息,因此是无法做到跨会话级别的不丢不重);
  • 幂等性不能跨多个 Topic-Partition,只能保证单个 partition 内的幂等性 ,当涉及多个 Topic-Partition 时,这中间的状态无法同步。

事务

Kafka 事务基于幂等性实现,通过事务机制,Kafka 可以实现对多个 Topic 、多个 Partition 的原子性的写入,即处于同一个事务内的所有消息,最终结果是要么全部写成功,要么全部写失败。

使用场景

  • 对多个 Topic 、多个 Partition 的原子性的写入
  • Consumer-Transform-Producer模式下,将消费者提交偏移量操作和生成者一系列生成消息的操作封装成一个原子操作。避免重复消费

配置使用

Producer设置

  • transactional.id:事务id,类型为string,客户端自定义

Consumer设置

  • isolation.level:read_committed。事务隔离级别,默认为空。

实现机制

引入以下组件:

  • Transactional Coordinator‌:负责管理和协调事务。每个Kafka broker上都会运行一个Transactional Coordinator实例。
  • Transaction Log‌:这是一个内部Topic(__transaction_state),用于存储事务的元数据信息,包括事务的状态、参与的分区等。
  • Control Messages:由Transactional Coordinator‌写入topic的一种特殊消息,但对于Consumer来说不可见。是用来让Broker告知consumer拉取的消息是否已被原子性提交。
  • TransactionId:事务ID,类型为String字符串,由Producer客户端自定义。提供稳定不变的ID意义在于可以在异常后重启从断点进行恢复。
  • Epoch:单调递增的事务Id标识,可以保证具有相同TransactionId的Producer,旧的无法写入。
  • ProducerID、SequenceNumber:标记生产者、消息的唯一标识

事务过程

事务初始化

所有的事务操作都需要Transactional Coordinator‌管理和协调
1.获取Transactional Coordinator‌地址
Producer发送携带Transactionid的请求到任意一个Broker,Broker对获取到Transactionid做hashcode后对topic(__transaction_state)默认分区(50)取模,所得分区主副本所在的Broker作为TransactionalCoordinator‌
2.获取ProducerID和Epoch
Producer对TransactionalCoordinator‌发送请求,此时会分配ProducerId及Epoch,并将信息持久化。最后向Producer返回ProducerId+Epoch。之后的每次请求都会携带ProducerId和Epoch。
(__transaction_state中信息格式为key-value,key为Transactionid,value包含ProducerID、Epoch、事务和分区信息等)

事务开始

3.消息写入
Producer开始事务写入,先将本地事务状态更改为IN_TRANSACTION,然后发送消息之前,Producer会将topic-partition相关的信息发送给TransactionalCoordinator‌,由它完成持久化(更新__transaction_state)。之后Producer开始对相关topic-partition发送消息

事务提交

4.Producer触发事务提交
Producer首先发送请求给TransactionalCoordinator‌,由它更新__transaction_state将事务状态更改为PrepareCommit,之后返回成功响应给Producer。TransactionalCoordinator‌发送Control Messages(会持续重试,直到成功)给涉及此次事务的topic-partition,写入成功之后,再次更新__transaction_state,将事务状态更新为CompleteCommit。

事务取消

5.Producer或Coordinator触发事务取消
事物取消可以由Producer发起取消或者TransactionalCoordinator‌检测到事务超时而取消,此时均会更新__transaction_state更改为PrepareAbort,之后返回成功响应给Producer。TransactionalCoordinator‌发送Control Messages给涉及此次事务的topic-partition,写入成功之后,再次更新__transaction_state,将事务状态更新为CompleteAbort。

取消的事务会记录在.txnindex文件中,主要包含以下信息:currentVersion、producerId、firstOffset(当前事务的开始offset)、lastOffset(当前事务的结束offset)、lastStableOffset(存储时的LSO)

事务消费

正常消费时
读隔离级别为 read-committed, 在内部会使用存储在topic-partition中的Control messgae,来过滤掉没有提交的消息。(回滚的消息也没有删除,只是在读数据时过滤该数据)

对于Consumer-Transform-Producer下,会通过groupId算出__consumer_offsets topic中对应的partition,然后加该partition的信息也加入到Transaction Log‌中,最终在统一取消或提交。同样也会将Control message写入__consumer_offsets对应的分区。

  • 需要将enable.auto.commit设置为false
  • 使用producer.sendOffsetsToTransaction()来提交offset

在这里插入图片描述

参考
https://z.itpub.net/article/detail/F86DD78AECAC4DEC92468DEFFEB4ED0D
https://www.cnblogs.com/hongdada/p/16945086.html
学习笔记之Kafka幂等和事务_transaction.state.log.replication.factor-CSDN博客

相关文章:

Kafka 幂等性与事务

文章目录 幂等性实现机制配置使用局限性 事务使用场景配置使用实现机制事务过程事务初始化事务开始事务提交事务取消事务消费 幂等性 Producer 无论向 Broker 发送多少次重复的数据,Broker 端只会持久化一条,保证数据不丢失且不重复。 实现机制 通过引…...

day2 Linux操作系统指令

思维导图 在家目录下创建目录文件,dir 1、dir下创建dir1和dir2 2、把当前目录下的所有文件拷贝到dir1中, 3、把当前目录下的所有脚本文件拷贝到dir2中 4、把dir2打包并压缩为dir2.tar.xz 5、再把dir2.tar.xz移动到dir1中 6、解压dir1中的压缩包 7、使用…...

AI一周重要会议和活动概览

一、小模型的曙光和机会之思辨高峰论坛 会议介绍:小模型的曙光和机会之思辨”高峰论坛暨第32期CSIG图像图形学科前沿讲习班于2025年1月3—4日在杭州举办,会议由中国图象图形学学会主办,中国图象图形学学会前沿科技论坛委员会承办。本次论坛设…...

重启ubuntu服务器,如何让springboot服务自动运行

文章目录 1. 使用 systemd 服务步骤: 2. 使用 cron 的 reboot 任务步骤: 3. 使用 init.d 脚本(适用于较旧版本)步骤: 推荐方案 为了确保在重启Ubuntu服务器后,让springboot的服务test.jar象 nohup java -ja…...

python系列教程237——启动扩展功能

朋友们,如需转载请标明出处:https://blog.csdn.net/jiangjunshow 声明:在人工智能技术教学期间,不少学生向我提一些python相关的问题,所以为了让同学们掌握更多扩展知识更好地理解AI技术,我让助理负责分享…...

U盘格式化工具合集:6个免费的U盘格式化工具

在日常使用中,U盘可能会因为文件系统不兼容、数据损坏或使用需求发生改变而需要进行格式化。一个合适的格式化工具不仅可以清理存储空间,还能解决部分存储问题。本文为大家精选了6款免费的U盘格式化工具,并详细介绍它们的功能、使用方法、优缺…...

循环神经网络(RNN)入门指南:从原理到实践

目录 1. 循环神经网络的基本概念 2. 简单循环网络及其应用 3. 参数学习与优化 4. 基于门控的循环神经网络 4.1 长短期记忆网络(LSTM) 4.1.1 LSTM的核心组件: 4.2 门控循环单元(GRU) 5 实际应用中的优化技巧 5…...

马原复习笔记

文章目录 前言导论物质实践人类社会资本主义社会主义共产主义后记 前言 一月二号下午四点多考试,很友好,不是早八,哈哈哈。之前豪言壮语和朋友说这次马原要全对,多做了几次测试之后,发现总有一些知识点是自己不知道的…...

Android Room 框架的初步使用

一、简介 Room 是一个强大的对象关系映射库,它允许你将 SQLite 数据库中的表映射到 Java 或 Kotlin 的对象(称为实体)上。你可以使用简单的注解(如 Entity、Dao 和 Database)来定义数据库表、数据访问对象&#xff08…...

什么是过度拟合和欠拟合?

在机器学习中,当一个算法的预测非常接近或者直接等于它的训练数据,导致不能够准确预测除了训练数据以外的数据,我们把这种情况称为过度拟合。算法能够非常接近甚至就是训练的数据,是个非常好的事,但是它不能准确预测除…...

DotnetSpider实现网络爬虫

1. 使用DotnetSpider框架 DotnetSpider是一个开源的、轻量、灵活、高性能、跨平台的分布式网络爬虫框架,适用于.NET平台。它可以帮助开发者快速实现网页数据的抓取功能。 1.1 安装DotnetSpider NuGet包 首先,你需要在你的.NET项目中安装DotnetSpider NuGet包。你可以通过…...

锐捷WLAN产品出货量排名第一!

摘要:2024年Q3锐捷WLAN产品出货量排名第一!锐捷多形态Wi-Fi 7产品重磅出击! 近日, IT市场研究和咨询公司IDC发布《IDC中国企业级WLAN市场跟踪报告,2024年Q3》。报告显示,锐捷WLAN产品在2024年Q3出货量位居行业首位。至此,锐捷WLAN产品在2024年的Q1、Q2、Q3均实现了市场出货量的…...

win32汇编环境下,对话框程序中生成listview列表控件,点击标题栏自动排序的示例

;把代码抄进radasm里面,可以直接编译运行。重要的地方加了备注。 ;这个有点复杂,重要的地方加了备注 ;以下是ASM文件 ;>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>…...

自动化文档处理:Azure AI Document Intelligence

Azure AI Document Intelligence支持多种文件格式,包括PDF、JPEG、PNG等。其核心功能是将这些文档按页进行内容提取,并转化为LangChain文档。其默认输出格式是Markdown,这使得文档可以通过MarkdownHeaderTextSplitter进行语义分片。您也可以使…...

【Maven】Maven打包机制详解

Maven打包的类型? 以下是几种常见的打包形式: 1、jar (Java Archive) 用途:用于包含 Java 类文件和其他资源(如属性文件、配置文件等)的库项目。特点: 可以被其他项目作为依赖引用。适合创建独立的应用程…...

Python 向量检索库Faiss使用

Faiss(Facebook AI Similarity Search)是一个由 Facebook AI Research 开发的库,它专门用于高效地搜索和聚类大量向量。Faiss 能够在几毫秒内搜索数亿个向量,这使得它非常适合于实现近似最近邻(ANN)搜索&am…...

pd.Timestamp接收的参数类型

pd.Timestamp() 是 Pandas 中用于表示单个日期时间的函数,它可以接受多种类型的参数。以下是 pd.Timestamp() 可以接受的主要参数类型,并举例说明: 1. 日期时间字符串(Date/Time String) pd.Timestamp() 可以接收标准…...

FOC控制原理-ADC采样时机

0、文章推荐 SimpleFOC移植STM32(五)—— 电流采样及其变换_极对数对电流采样的影响-CSDN博客 FOC 电流采样方案对比(单电阻/双电阻/三电阻) - 知乎 (zhihu.com) FOC中的三种电流采样方式,你真的会选择吗?…...

运行python程序报错 undefined symbol: ffi_type_uint32 的参考解决方法

文章目录 写在前面一、问题描述二、解决方法参考链接 写在前面 自己的测试环境&#xff1a; Ubuntu20.04 ROS-Noetic 一、问题描述 运行 python 程序出现如下问题&#xff1a; Traceback (most recent call last):File "<string>", line 1, in <module&…...

怎么使用阿里的docker国产镜像源

要使用 阿里云 Docker 镜像加速器&#xff0c;你需要先注册并获取加速器的 URL&#xff0c;然后将其配置到 Docker 的配置文件中。下面是具体的使用步骤&#xff1a; 步骤 1&#xff1a;登录阿里云控制台并获取镜像加速器 URL 登录阿里云控制台 打开 阿里云官网&#xff0c;并…...

uniapp 对接腾讯云IM群组成员管理(增删改查)

UniApp 实战&#xff1a;腾讯云IM群组成员管理&#xff08;增删改查&#xff09; 一、前言 在社交类App开发中&#xff0c;群组成员管理是核心功能之一。本文将基于UniApp框架&#xff0c;结合腾讯云IM SDK&#xff0c;详细讲解如何实现群组成员的增删改查全流程。 权限校验…...

华为云AI开发平台ModelArts

华为云ModelArts&#xff1a;重塑AI开发流程的“智能引擎”与“创新加速器”&#xff01; 在人工智能浪潮席卷全球的2025年&#xff0c;企业拥抱AI的意愿空前高涨&#xff0c;但技术门槛高、流程复杂、资源投入巨大的现实&#xff0c;却让许多创新构想止步于实验室。数据科学家…...

MPNet:旋转机械轻量化故障诊断模型详解python代码复现

目录 一、问题背景与挑战 二、MPNet核心架构 2.1 多分支特征融合模块(MBFM) 2.2 残差注意力金字塔模块(RAPM) 2.2.1 空间金字塔注意力(SPA) 2.2.2 金字塔残差块(PRBlock) 2.3 分类器设计 三、关键技术突破 3.1 多尺度特征融合 3.2 轻量化设计策略 3.3 抗噪声…...

Appium+python自动化(十六)- ADB命令

简介 Android 调试桥(adb)是多种用途的工具&#xff0c;该工具可以帮助你你管理设备或模拟器 的状态。 adb ( Android Debug Bridge)是一个通用命令行工具&#xff0c;其允许您与模拟器实例或连接的 Android 设备进行通信。它可为各种设备操作提供便利&#xff0c;如安装和调试…...

阿里云ACP云计算备考笔记 (5)——弹性伸缩

目录 第一章 概述 第二章 弹性伸缩简介 1、弹性伸缩 2、垂直伸缩 3、优势 4、应用场景 ① 无规律的业务量波动 ② 有规律的业务量波动 ③ 无明显业务量波动 ④ 混合型业务 ⑤ 消息通知 ⑥ 生命周期挂钩 ⑦ 自定义方式 ⑧ 滚的升级 5、使用限制 第三章 主要定义 …...

安宝特方案丨XRSOP人员作业标准化管理平台:AR智慧点检验收套件

在选煤厂、化工厂、钢铁厂等过程生产型企业&#xff0c;其生产设备的运行效率和非计划停机对工业制造效益有较大影响。 随着企业自动化和智能化建设的推进&#xff0c;需提前预防假检、错检、漏检&#xff0c;推动智慧生产运维系统数据的流动和现场赋能应用。同时&#xff0c;…...

ESP32 I2S音频总线学习笔记(四): INMP441采集音频并实时播放

简介 前面两期文章我们介绍了I2S的读取和写入&#xff0c;一个是通过INMP441麦克风模块采集音频&#xff0c;一个是通过PCM5102A模块播放音频&#xff0c;那如果我们将两者结合起来&#xff0c;将麦克风采集到的音频通过PCM5102A播放&#xff0c;是不是就可以做一个扩音器了呢…...

论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一)

宇树机器人多姿态起立控制强化学习框架论文解析 论文解读&#xff1a;交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架&#xff08;一&#xff09; 论文解读&#xff1a;交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化…...

Python如何给视频添加音频和字幕

在Python中&#xff0c;给视频添加音频和字幕可以使用电影文件处理库MoviePy和字幕处理库Subtitles。下面将详细介绍如何使用这些库来实现视频的音频和字幕添加&#xff0c;包括必要的代码示例和详细解释。 环境准备 在开始之前&#xff0c;需要安装以下Python库&#xff1a;…...

Java线上CPU飙高问题排查全指南

一、引言 在Java应用的线上运行环境中&#xff0c;CPU飙高是一个常见且棘手的性能问题。当系统出现CPU飙高时&#xff0c;通常会导致应用响应缓慢&#xff0c;甚至服务不可用&#xff0c;严重影响用户体验和业务运行。因此&#xff0c;掌握一套科学有效的CPU飙高问题排查方法&…...