当前位置: 首页 > article >正文

kafka consumer 手动 ack

在消费 Kafka 消息时,手动确认(acknowledge)消息的消费,可以通过使用 KafkaConsumer 类中的 commitSync()commitAsync() 方法来实现。这些方法将提交当前偏移量,确保在消费者崩溃时不会重新消费已处理的消息。

以下是一个简单的手动 ack 的示例代码:

1. 配置 KafkaConsumer 和手动确认消费

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Collections;
import java.util.Properties;public class KafkaManualAckConsumer {public static void main(String[] args) {// 配置消费者的基本属性Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka 服务器地址properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); // 消费者组IDproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息key反序列化properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息value反序列化properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 关闭自动提交,启用手动提交// 创建 KafkaConsumerKafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);// 订阅主题consumer.subscribe(Collections.singletonList("my-topic"));try {while (true) {// 拉取消息var records = consumer.poll(1000); // 拉取数据,等待最多1000ms// 处理每一条消息records.forEach(record -> {System.out.println("Consumed message: " + record.value());// 处理完消息后手动提交偏移量// commitSync: 确保消息成功提交consumer.commitSync();});}} catch (Exception e) {e.printStackTrace();} finally {// 关闭消费者consumer.close();}}
}

2. 代码解析

  1. 配置消费者:
    • ENABLE_AUTO_COMMIT_CONFIG 设置为 false,禁用自动提交偏移量。这样就可以在处理完每条消息后手动提交。
  2. 消息消费与手动 ack:
    • poll(1000) 方法拉取最多 1000 毫秒内的消息。
    • commitSync() 方法用于同步提交当前的偏移量,即消费到的消息的位移,这样可以确保 Kafka 消费者确认该消息已处理。
  3. 异常处理:
    • 异常捕获块 catch 用于处理消费过程中可能出现的任何错误,确保程序不会崩溃。
  4. 关闭消费者:
    • finally 块中调用 consumer.close() 来关闭消费者连接。

3. 使用 commitAsync 提高性能(可选)

如果对性能要求更高,可以考虑使用 commitAsync() 方法,它不会阻塞当前线程,提交操作将在后台异步完成:

consumer.commitAsync((offsets, exception) -> {if (exception != null) {System.out.println("Error committing offset: " + exception.getMessage());} else {System.out.println("Successfully committed offsets: " + offsets);}
});

这样你可以不阻塞线程,提高消费性能,尤其是在高吞吐量的环境中。
如果你有其他的需求或者想更细致地控制消费的细节,随时告诉我!

相关文章:

kafka consumer 手动 ack

在消费 Kafka 消息时&#xff0c;手动确认&#xff08;acknowledge&#xff09;消息的消费&#xff0c;可以通过使用 KafkaConsumer 类中的 commitSync() 或 commitAsync() 方法来实现。这些方法将提交当前偏移量&#xff0c;确保在消费者崩溃时不会重新消费已处理的消息。 以…...

final 关键字在不同上下文中的用法及其名称

1. final 变量 名称&#xff1a;final 变量&#xff08;常量&#xff09;。 作用&#xff1a;一旦赋值后&#xff0c;值不能被修改。 分类&#xff1a; final 实例变量&#xff1a;必须在声明时或构造函数中初始化。 final 静态变量&#xff1a;必须在声明时或静态代码块中初…...

PHP面试题--后端部分

本文章持续更新内容 之前没来得及整理时间问题导致每次都得找和重新背 这次整理下也方便各位小伙伴一起更轻松的一起踏入编程之路 欢迎各位关注博主不定期更新各种高质量内容适合小白及其初级水平同学一起学习 一起成为大佬 数组函数有那些 ps&#xff1a;本题挑难的背因为…...

Python 高精度计算利器:decimal 模块详解

Python 高精度计算利器&#xff1a;decimal 模块详解 在 Python 编程中&#xff0c;处理浮点数时&#xff0c;标准的 float 类型往往会因二进制表示的特性而产生精度问题。decimal 模块应运而生&#xff0c;它提供了十进制浮点运算功能&#xff0c;能让开发者在需要高精度计算…...

hbase相关问题处理

1.如果遇到ZK宕机,通过HTable和Connection两种连接方式获取数据,在实现原理和故障恢复上有何异同? 通过new HTable方式,则每次方法调用都会建立新的连接,而且会从zk获取表的元数据,会导致将业务的并发传导到zookeeper服务,会对全局所有依赖zookeeper服务的节点存在一定…...

Linux下的网络通信编程

在不同主机之间&#xff0c;进行进程间的通信。 1解决主机之间硬件的互通 2.解决主机之间软件的互通. 3.IP地址&#xff1a;来区分不同的主机&#xff08;软件地址&#xff09; 4.MAC地址&#xff1a;硬件地址 5.端口号&#xff1a;区分同一主机上的不同应用进程 网络协议…...

什么是“零日漏洞”(Zero-Day Vulnerability)?为何这类攻击被视为高风险威胁?

正文 零日漏洞&#xff08;Zero-Day Vulnerability&#xff09; 是指软件、硬件或系统中存在的、尚未被开发者发现或修复的安全漏洞。攻击者在开发者意识到漏洞存在之前&#xff08;即“零日”内&#xff09;利用该漏洞发起攻击&#xff0c;因此得名。这类漏洞的“零日”特性使…...

AI数据分析:用DeepSeek做数据清洗

在当今数据驱动的时代&#xff0c;数据分析已成为企业和个人决策的重要工具。随着人工智能技术的快速发展&#xff0c;AI 驱动的数据分析工具正在改变我们处理和分析数据的方式。本文将着重介绍如何使用 DeepSeek 进行数据清洗。 数据清洗是数据分析的基础&#xff0c;其目的是…...

把GB型材库放入solidwork中点击库无法应

1、文件夹的位置要选择对&#xff0c;如下图&#xff1a; 2、文件夹一定要嵌套三层&#xff0c;如下图...

【前端】XML,XPATH,与HTML的关系

XML与HTML关系 XML&#xff08;可扩展标记语言&#xff09;和 HTML&#xff08;超文本标记语言&#xff09;是两种常见的标记语言&#xff0c;但它们有不同的目的和用途。它们都使用类似的标记结构&#xff08;标签&#xff09;&#xff0c;但在设计上存在一些关键的差异。 XML…...

IP-----动态路由OSPF(2)

这只是IP的其中一块内容&#xff0c;IP还有更多内容可以查看IP专栏&#xff0c;前一章内容为动态路由OSPF &#xff0c;可通过以下路径查看IP-----动态路由OSPF-CSDN博客,欢迎指正 注意&#xff01;&#xff01;&#xff01;本部分内容较多所以分成了两部分在上一章 5.动态路…...

《HelloGitHub》第 107 期

兴趣是最好的老师&#xff0c;HelloGitHub 让你对编程感兴趣&#xff01; 简介 HelloGitHub 分享 GitHub 上有趣、入门级的开源项目。 github.com/521xueweihan/HelloGitHub 这里有实战项目、入门教程、黑科技、开源书籍、大厂开源项目等&#xff0c;涵盖多种编程语言 Python、…...

leetcode_字典树 139. 单词拆分

139. 单词拆分 给你一个字符串 s 和一个字符串列表 wordDict 作为字典。如果可以利用字典中出现的一个或多个单词拼接出 s 则返回 true。 注意&#xff1a;不要求字典中出现的单词全部都使用&#xff0c;并且字典中的单词可以重复使用。 思路: 定义状态&#xff1a; 设dp[i]表…...

计算机毕业设计Python+DeepSeek-R1大模型游戏推荐系统 Steam游戏推荐系统 游戏可视化 游戏数据分析(源码+文档+PPT+讲解)

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…...

网络流算法: Dinic算法

图论相关帖子 基本概念图的表示: 邻接矩阵和邻接表图的遍历: 深度优先与广度优先拓扑排序图的最短路径:Dijkstra算法和Bellman-Ford算法最小生成树二分图多源最短路径强连通分量欧拉回路和汉密尔顿回路网络流算法: Edmonds-Karp算法网络流算法: Dinic算法 环境要求 本文所用…...

【Springboot】解决问题 o.s.web.servlet.PageNotFound : No mapping for *

使用 cursor 进行老项目更新为 springboot 的 web 项目&#xff0c;发生了奇怪的问题&#xff0c;就是 html 文件访问正常&#xff0c;但是静态文件就是 404 检查了各种配置&#xff0c;各种比较&#xff0c;各种调试&#xff0c;最后放弃时候&#xff0c;清理没用的配置文件&…...

Spring Boot 3.x 基于 Redis 实现邮箱验证码认证

文章目录 依赖配置开启 QQ 邮箱 SMTP 服务配置文件代码实现验证码服务邮件服务接口实现执行流程 依赖配置 <dependencies> <!-- Spring Boot Starter Web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spr…...

PostgreSQL10 物理流复制实战:构建高可用数据库架构!

背景 PostgreSQL 10 在高可用架构中提供了物理复制&#xff0c;也称为流复制&#xff08;Streaming Replication&#xff09;&#xff0c;用于实现实例级别的数据同步。PostgreSQL 复制机制主要包括物理复制和逻辑复制&#xff1a;物理复制依赖 WAL 日志进行物理块级别的同步&…...

从零开始开发纯血鸿蒙应用之语音朗读

从零开始开发纯血鸿蒙应用 〇、前言一、API 选型1、基本情况2、认识TextToSpeechEngine 二、功能集成实践1、改造右上角菜单2、实现语音播报功能2.1、语音引擎的获取和关闭2.2、设置待播报文本2.3、speak 目标文本2.4、设置语音回调 三、总结 〇、前言 中华汉字洋洋洒洒何其多…...

RabbitMQ系列(五)基本概念之Queue

在 RabbitMQ 中&#xff0c;Queue&#xff08;队列&#xff09; 是存储消息的容器&#xff0c;也是消息传递的核心载体。以下是其核心特性与作用的全方位解析&#xff1a; 一、Queue 的定义与核心作用 消息存储容器 Queue 是 RabbitMQ 中实际存储消息的实体&#xff0c;生产者…...

奔图Pantum M7165DN黑白激光打印一体机报数据清除中…维修

故障描述: 一台奔图Pantum M7165DN黑白激光打印一体机开机自检正常,自检过后就不能工作了,按键面板无任何反应一直提示数据清除中…,如果快速操作的话也能按出菜单、功能啥的,不过一会又死机了,故障请看下图: 故障检修: 经分析可能是主板数据出现了问题,看看能不能快速…...

TP-LINK路由器如何设置网段、网关和DHCP服务

目标 ①将路由器的网段由192.168.1.XXX改为192.168.5.XXX ②确认DHCP是启用的&#xff0c;并将DHCP的IP池的范围设置为排除自己要手动指定的IP地址&#xff0c;避免IP冲突。 01-复位路由器 路由器按住复位键10秒以上进行重置操作 02-进入路由器管理界面 电脑连接到路由器&…...

神经网络代码入门解析

神经网络代码入门解析 import torch import matplotlib.pyplot as pltimport randomdef create_data(w, b, data_num): # 数据生成x torch.normal(0, 1, (data_num, len(w)))y torch.matmul(x, w) b # 矩阵相乘再加bnoise torch.normal(0, 0.01, y.shape) # 为y添加噪声…...

设计一个“车速计算”SWC,通过Sender-Receiver端口输出车速信号。

1. 需求分析 功能目标:根据车轮脉冲信号(轮速传感器输入)计算当前车速,并将结果通过Sender端口发送给其他SWC。 输入:轮速脉冲数(如WheelPulse,类型uint32)。 输出:车速(如VehicleSpeed,类型float32,单位km/h)。 触发方式:周期性计算(例如每10ms执行一次)。 2.…...

TCP/IP 5层协议簇:网络层(IP数据包的格式、路由器原理)

目录 1. TCP/IP 5层协议簇 2. IP 三层包头协议 3. 路由器原理 4. 交换机和路由的对比 1. TCP/IP 5层协议簇 如下&#xff1a; 2. IP 三层包头协议 数据包如下&#xff1a;IP包头不是固定的&#xff0c;每一个数字是一个bit 其中数据部分是上层的内容&#xff0c;IP包头最…...

1JVM概念

JVM&#xff08;Java虚拟机&#xff09;详解 1. ​基本概念与作用​ JVM&#xff08;Java Virtual Machine&#xff09;是Java程序的运行环境&#xff0c;负责将编译后的字节码&#xff08;.class文件&#xff09;解释或编译为机器指令执行&#xff0c;并管理内存、线程、安全…...

echarts柱状图不是完全铺满容器,左右两边有空白

目录 处理前&#xff1a;echarts柱状图不是完全铺满容器&#xff0c;左右两边有空白处理前&#xff1a;通过调整 grid 组件配置处理后效果修改代码&#xff1a;1. 调整 grid 组件配置原理解决办法 2. 处理 xAxis 的 boundaryGap 属性原理解决办法 3. 调整 barMaxWidth 和 barMi…...

ArcGIS Pro技巧实战:高效矢量化天地图地表覆盖图

在地理信息系统&#xff08;GIS&#xff09;领域&#xff0c;地表覆盖图的矢量化是一项至关重要的任务。天地图作为中国国家级的地理信息服务平台&#xff0c;提供了丰富且详尽的地表覆盖数据。然而&#xff0c;这些数据通常以栅格格式存在&#xff0c;不利于进行空间分析和数据…...

西门子S7-1200比较指令

西门子S7-1200 PLC比较指令学习笔记 一、比较指令的作用 核心功能&#xff1a;用于比较两个数值的大小或相等性&#xff0c;结果为布尔值&#xff08;True/False&#xff09;。典型应用&#xff1a; 触发条件控制&#xff08;如温度超过阈值启动报警&#xff09;数据筛选&…...

【AD】3-6 层次原理图

自上而下 1.放置-页面符号&#xff0c;并设置属性 2.放置-端口 可通过如下设置将自动生成关掉 3.放置-添加图纸入口&#xff0c;并创建图纸 自下而上 1.子图的原理图页设计 设计资原理图&#xff0c;复制网络标签&#xff0c;智能粘贴未PORT 2.新建主图原理图 创建框…...