当前位置: 首页 > news >正文

Spring Boot 整合kafka:生产者ack机制和消费者AckMode消费模式、手动提交ACK

目录

    • 生产者ack机制
    • 消费者ack模式
    • 手动提交ACK

生产者ack机制

Kafka 生产者的 ACK 机制指的是生产者在发送消息后,对消息副本的确认机制。ACK 机制可以帮助生产者确保消息被成功写入 Kafka 集群中的多个副本,并在需要时获取确认信息。

Kafka 提供了三种 ACK 机制的配置选项,分别是:

  1. acks=0:生产者在成功将消息发送到网络缓冲区后即视为消息已被提交,不等待任何服务器响应。这种配置下,可能会出现消息丢失的情况。

  2. acks=1:生产者在成功将消息发送到主题的分区 leader 后即视为消息已被提交。这种配置下,生产者会收到分区 leader
    的确认,但仍有可能出现消息丢失的情况,例如当 leader 出现故障,而消息尚未复制到其他副本时。

  3. acks=all 或acks=-1:生产者需要等待所有分区副本都成功写入消息后才视为消息已被提交。这种配置下,生产者会等待所有分区副本的确认,确保消息被复制到足够数量的副本后才返回提交确认。这是最安全的确认方式,但也会导致较长的等待时间。

在实际使用中,根据对消息可靠性和延迟的要求,可以选择不同的 ACKs 级别。一般来说,如果对消息的可靠性要求较高,可以选择较高的 ACKs 级别,但需要考虑相应的延迟成本。

我们可以通过spring.kafka.producer.acks来配置ack机制

spring.kafka.producer.acks=1

消费者ack模式

kafka支持的消费模式,在AbstractMessageListenerContainer.AckMode的枚举中,下面就介绍下各个模式的区别

public enum AckMode {/*** Commit after each record is processed by the listener.*/RECORD,/*** Commit whatever has already been processed before the next poll.*/BATCH,/*** Commit pending updates after* {@link ContainerProperties#setAckTime(long) ackTime} has elapsed.*/TIME,/*** Commit pending updates after* {@link ContainerProperties#setAckCount(int) ackCount} has been* exceeded.*/COUNT,/*** Commit pending updates after* {@link ContainerProperties#setAckCount(int) ackCount} has been* exceeded or after {@link ContainerProperties#setAckTime(long)* ackTime} has elapsed.*/COUNT_TIME,/*** User takes responsibility for acks using an* {@link AcknowledgingMessageListener}.*/MANUAL,/*** User takes responsibility for acks using an* {@link AcknowledgingMessageListener}. The consumer* immediately processes the commit.*/MANUAL_IMMEDIATE,}

AckMode模式

RECORD:当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
当使用 RECORD 确认模式时,消息监听容器会在每个消息被单独处理后进行确认。这意味着,如果一条消息被成功处理,它将作为单独的记录进行确认;如果处理失败,也会针对该消息进行错误记录。这种确认模式适用于需要精确处理每个消息的应用场景,例如确保每个消息都被正确处理。

BATCH:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
当使用 BATCH 确认模式时,消息监听容器会在批量处理一组消息后进行确认。这意味着,消息监听容器会将多个消息合并为批次,并将它们作为一组进行处理。只有在整个批次都被成功处理后,该批次的所有消息才会被确认。这种确认模式适用于需要提高处理效率的场景,例如批量处理大量消息以减少网络传输和系统调用的开销。

TIME:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交

COUNT:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交

COUNT_TIME:TIME或COUNT 有一个条件满足时提交

MANUAL:这是手动确认模式,消费者需要显式地调用 Acknowledgment.acknowledge() 方法来确认消息。只有当消费者调用 acknowledge() 方法后,才会向 Kafka 服务器发送确认消息。这种模式可以保证消息的可靠性和顺序性,但需要消费者显式地处理确认逻辑。

MANUAL_IMMEDIATE:这是立即手动确认模式,与 MANUAL 模式类似,但消费者在调用 acknowledge() 方法时,会立即向 Kafka 服务器发送确认消息。这种模式可以提高消息处理的速度,但可能会增加重复消费的风险。

MANUAL和MANUAL_IMMEDIATE的区别

MANUAL 和 MANUAL_IMMEDIATE 都是 Kafka 消费者的手动确认模式,它们的区别在于确认的时机不同。

MANUAL 模式下,消费者需要显式地调用 Acknowledgment.acknowledge() 方法来确认消息,在调用该方法之后,消息才会被标记为已消费,并且确认消息会在下次 poll() 时发送到 Kafka 服务器。这种模式的优点是可以保证消息的可靠性和顺序性,但需要消费者显式地处理确认逻辑。

相比之下,MANUAL_IMMEDIATE 模式下,在消费者调用 Acknowledgment.acknowledge() 方法时,会立即向 Kafka 服务器发送确认消息。这种模式可以提高消息处理的速度,但可能会增加重复消费的风险,因为如果消息处理失败,Kafka 不会再次发送该消息,而是认为该消息已经被成功消费了。

在实际使用中,应根据业务需求和性能要求来选择合适的确认模式。如果要求消息的可靠性和顺序性比较高,可以选择 MANUAL 模式;如果要求处理速度比较高,可以选择 MANUAL_IMMEDIATE 模式。

AckMode 可以通过配置文件或代码进行设置。例如,在 Spring Boot 应用中,可以使用以下配置方式指定确认模式:

spring.kafka.listener.ack-mode=manual_immediate

手动提交ACK

kafka默认是自动提交ack的,很多时候,我们都需要手动提交,这就要进行以下配置

1、设置enable-auto-commit=false,禁止自动提交
2、设置ack-mode为manual_immediate

在配置文件进行如下配置

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual_immediate

3、监听方法的入参加入Acknowledgment ack 参数,并在消费完成之后调用acknowledge方法,如下所示

	@KafkaListener(topics = "my-topic2",groupId = "myGroup")public void  receiveMessage2(String message, Acknowledgment ack){log.info("消费消息:"+message);//ack确认ack.acknowledge();}

相关文章:

Spring Boot 整合kafka:生产者ack机制和消费者AckMode消费模式、手动提交ACK

目录 生产者ack机制消费者ack模式手动提交ACK 生产者ack机制 Kafka 生产者的 ACK 机制指的是生产者在发送消息后,对消息副本的确认机制。ACK 机制可以帮助生产者确保消息被成功写入 Kafka 集群中的多个副本,并在需要时获取确认信息。 Kafka 提供了三种…...

Java+Swing: 主界面组件布局 整理9

说明:这篇博客是在上一篇的基础上的,因为上一篇已经将界面的框架搭好了,这篇主要是将里面的组件完善。 分为三个部分,北边的组件、中间的组件、南边的组件 // 放置北边的组件layoutNorth(contentPane);// 放置中间的 Jtablelayou…...

pytorch:YOLOV1的pytorch实现

pytorch:YOLOV1的pytorch实现 注:本篇仅为学习记录、学习笔记,请谨慎参考,如果有错误请评论指出。 参考: 动手学习深度学习pytorch版——从零开始实现YOLOv1 目标检测模型YOLO-V1损失函数详解 3.1 YOLO系列理论合集(Y…...

YOLOv8配置文件yolov8.yaml解读

🍨 本文为🔗365天深度学习训练营 中的学习记录博客🍖 原作者:K同学啊 | 接辅导、项目定制 位置 该文件的位置位于 ./ultralytics/cfg/models/v8/yolov8.yaml 模型参数配置 # Parameters nc: 80 # number of classes scales: #…...

4-Tornado高并发原理

核心原理就是协程epoll事件循环,再使用协程之后,开销是特别的小,那具体如何提供高并发的呢? 异步非阻塞IO 这意味我们整套开发的模式不在与原来一样,正因为不再一样,所以有时我们在理解代码时就有可能会比…...

基于以太坊的智能合约开发Solidity(事件日志篇)

//声明版本号(程序中的版本号要和编译器版本号一致) pragma solidity ^0.5.17; //合约 contract EventTest {//状态变量uint public Variable;//构造函数constructor() public{Variable 100;}event ValueChanged(uint newValue); //事件声明event Log(…...

【BME2112】w11 notes

下周做老鼠实验 group analysis SPM group analysis 数据地址resting state 可以分析:correlation 计算两个脑区的相关性 静息态实验简单functional 成功的实验能看到激活区不成功的实验:比如被试头动太大,不是健康的被试 Spontaneous brain…...

Flutter笔记:滑块及其实现分析1

Flutter笔记 滑块分析1 作者:李俊才 (jcLee95):https://blog.csdn.net/qq_28550263 邮箱 :291148484163.com 本文地址:https://blog.csdn.net/qq_28550263/article/details/134900784 本文从设计角度&#…...

【React Hooks】useReducer()

useReducer 的三个参数是可选的,默认就是initialState,如果在调用的时候传递第三个参数那么他就会改变为你传递的参数,实际开发不建议这样写。会增加代码的不可读性。 使用方法: 必须将 useReducer 的第一个参数(函数…...

如何把kubernetes pod中的文件拷贝到宿主机上或者把宿主机上文件拷贝到kubernetes pod中

1. 创建一个 Kubernetes Pod 首先&#xff0c;下面是一个示例Pod的定义文件&#xff08;pod.yaml&#xff09;&#xff1a; cat > nginx.yaml << EOF apiVersion: v1 kind: Pod metadata:name: my-nginx spec:containers:- name: nginximage: nginx EOF kubectl app…...

Android 13 - Media框架(20)- ACodec(二)

这一节开始我们就来学习 ACodec 的实现 1、创建 ACodec ACodec 是在 MediaCodec 中创建的&#xff0c;这里先贴出创建部分的代码&#xff1a; mCodec mGetCodecBase(name, owner);if (mCodec NULL) {ALOGE("Getting codec base with name %s (owner%s) failed", n…...

TCP单聊和UDP群聊

TCP协议单聊 服务端&#xff1a; import java.awt.BorderLayout; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; import java.util.V…...

智能优化算法应用:基于鲸鱼算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于鲸鱼算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于鲸鱼算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.鲸鱼算法4.实验参数设定5.算法结果6.参考文献7.MA…...

TortoiseGit 小乌龟svn客户端软件查看仓库地址

进入代码路径...

uniapp微信小程序分包,小程序分包

前言&#xff0c;都知道我是一个后端开发、所以今天来写一下uniapp。 起因是美工给我的切图太大&#xff0c;微信小程序不让了&#xff0c;在网上找了一大堆分包的文章&#xff0c;我心思我照着写的啊&#xff0c;怎么就一直报错呢&#xff1f; 错误原因 tabBar的页面被我放在分…...

『Linux升级路』进度条小程序

一、预备知识 在编写『Linux升级路』进度条小程序之前&#xff0c;我们需要了解一些预备知识。本文将详细介绍缓冲区和回车换行的概念。 1.1 缓冲区 缓冲区是计算机内存中的一块区域&#xff0c;用于临时存储数据。在编程中&#xff0c;我们经常使用缓冲区来临时保存数据&am…...

使用rust slint开发桌面应用

安装QT5&#xff0c;过程省略 安装rust&#xff0c;过程省略 创建工程 cargo new slint_demo 在cargo.toml添加依赖 [dependencies] slint "1.1.1" [build-dependencies] slint-build "1.1.1" 创建build.rs fn main() {slint_build::compile(&quo…...

Flutter桌面应用程序定义系统托盘Tray

文章目录 概念实现方案1. tray_manager依赖库支持平台实现步骤 2. system_tray依赖库支持平台实现步骤 3. 两种方案对比4. 注意事项5. 话题拓展 概念 系统托盘&#xff1a;系统托盘是一种用户界面元素&#xff0c;通常出现在操作系统的任务栏或桌面顶部。它是一个水平的狭长区…...

docker:安装mysql以及最佳实践

文章目录 1、拉取镜像2、运行容器3、进入容器方式一方式二方式三容器进入后连接mysql和在宿主机连接mysql的区别 持久化数据持久化数据最佳实践 1、拉取镜像 docker pull mysql2、运行容器 docker run -d -p 3307:3306 --name mysql-container -e MYSQL_ROOT_PASSWORD123456 …...

uniapp实战 —— 自定义顶部导航栏

效果预览 下图中的红框区域 范例代码 src\pages.json 配置隐藏默认顶部导航栏 "navigationStyle": "custom", // 隐藏默认顶部导航src\pages\index\components\CustomNavbar.vue 封装自定义顶部导航栏的组件&#xff08;要点在于&#xff1a;获取屏幕边界…...

YOLO26涨点改进| TPAMI 2026 |独家创新首发、Conv改进篇| 引入LPM 局部先验特征增强模块,更加聚焦于目标区域并抑制背景干扰,助力目标检测、图像分割、图像恢复、图像增强有效涨点

一、本文介绍 🔥本文给大家介绍使用 LPM 局部先验特征增强模块 改进YOLO26网络模型,通过构建重要性图对特征提取过程进行引导,使模型能够更加聚焦于目标区域并抑制背景干扰,从而提升特征表达质量和目标区分能力。其优势体现在能够有效增强关键区域信息、提升小目标和复杂…...

Elsevier Tracker:学术审稿状态自动化追踪解决方案

Elsevier Tracker&#xff1a;学术审稿状态自动化追踪解决方案 【免费下载链接】Elsevier-Tracker 项目地址: https://gitcode.com/gh_mirrors/el/Elsevier-Tracker Elsevier Tracker是一款开源Chrome插件&#xff0c;专为学术研究者设计&#xff0c;提供Elsevier期刊审…...

高并发接口总被打崩?我用 ArrayBlockingQueue + 底层源码深度剖析搞定流控

一、实现原理⚠️注意 ✔️有界阻塞队列&#xff1a;容量固定&#xff0c;必须在初始化时指定长度&#xff0c;无自动扩容机制。 ✔️先进先出&#xff08;FIFO&#xff09;&#xff1a;入队元素从队尾添加&#xff0c;出队元素从队首取出。 ✔️存取互斥&#xff1a;所有读写操…...

Pandas读写Parquet文件避坑指南:pyarrow和fastparquet引擎怎么选?columns参数真能省内存吗?

Pandas读写Parquet文件避坑指南&#xff1a;引擎选择与内存优化实战解析 当你第一次听说Parquet格式能比CSV节省80%存储空间时&#xff0c;可能和我一样兴奋地立刻把项目里的数据全转成了.parquet后缀。但真正在生产环境部署时&#xff0c;却发现pd.read_parquet()在不同机器上…...

AI Agent在数据分析领域应用研究

我个人是从技术做到管理&#xff0c;从实施做到咨询&#xff0c;从售前做到销售&#xff0c;在技术领域来说我最擅长的就是数据技术。在大学时我学过Oracle 6.0&#xff0c;参加工作后又到清华大学参加过Oracle 8i培训&#xff0c;接着又做过Oracle DBA&#xff0c;后来又做数据…...

宝塔Linux面板Bug修复:添加反向代理出错

起因 由于工作需要&#xff0c;在宝塔面板中创建一个反向代理的站点&#xff0c;结果每次都报错: 向宝塔论天提交了Bug&#xff0c;结果两天了还在审核中。 由于急用&#xff0c;因此不等官方修复了&#xff0c;自己动手修复&#xff01; 故障原因 从报错信息可以看到&…...

如何告别投稿焦虑:Elsevier Tracker智能监控插件的完整指南

如何告别投稿焦虑&#xff1a;Elsevier Tracker智能监控插件的完整指南 【免费下载链接】Elsevier-Tracker 项目地址: https://gitcode.com/gh_mirrors/el/Elsevier-Tracker 还在为Elsevier投稿系统的繁琐查询而烦恼吗&#xff1f;每次登录系统查看审稿进度都需要重复点…...

开发提效新思路:用快马平台打造你的个性化qoderwork代码片段工厂

今天想和大家分享一个提升前端开发效率的实用思路 - 用InsCode(快马)平台打造自己的代码片段工厂。作为一个经常需要重复编写UI组件的前端开发者&#xff0c;我发现这个方案能显著减少重复劳动。 痛点分析 每次新项目都要从零开始写导航栏、页脚这些基础组件特别浪费时间。虽然…...

3大优势让学术翻译更安全:Zotero PDF翻译插件离线方案全解析

3大优势让学术翻译更安全&#xff1a;Zotero PDF翻译插件离线方案全解析 【免费下载链接】zotero-pdf-translate Translate PDF, EPub, webpage, metadata, annotations, notes to the target language. Support 20 translate services. 项目地址: https://gitcode.com/gh_mi…...

Awesome Rust核心库精选:异步编程与网络开发

Awesome Rust核心库精选&#xff1a;异步编程与网络开发 本文深入探讨了Rust生态系统中的核心库&#xff0c;重点分析了异步运行时&#xff08;Tokio与async-std&#xff09;、网络编程库、HTTP客户端/服务器框架、数据序列化工具链以及密码学与安全相关库。通过对比分析各库的…...