当前位置: 首页 > article >正文

kafka学习笔记(三、消费者Consumer使用教程——从指定位置消费)

在这里插入图片描述


1.简介

Kafka的poll()方法消费无法精准的掌握其消费的起始位置,auto.offset.reset参数也只能在比较粗粒度的指定消费方式。更细粒度的消费方式kafka提供了seek()方法可以指定位移消费允许消费者从特定位置(如固定偏移量、时间戳或分区首尾)开始消费消息。

2.指定消费位置

2.1.从特定偏移量开始消费

使用seek(TopicPartition partition, long offset)指定具体偏移量。

源码分析:

  • seek()方法更新消费者内部的subscriptions对象的position字段,记录目标偏移量。
  • 后续poll()时,Fetcher类根据此位置向Broker发送拉取请求。

代码示例:

consumer.subscribe(Collections.singleton("test-topic"));
Set<TopicPartition> assignment = new HashSet<>();
// 确保分配到分区
while (assignment.isEmpty()) {consumer.poll(Duration.ofMillis(100));assignment = consumer.assignment();
}
// 设置所有分区从offset=100开始消费
assignment.forEach(tp -> consumer.seek(tp, 100));

2.2.从时间戳开始消费

使用offsetsForTimes()获取时间戳对应的偏移量,再调用seek()

源码分析:

offsetsForTimes()向Broker发送ListOffsetRequest,查询满足时间戳条件的最早或最新偏移量。

代码实例:

Map<TopicPartition, Long> timestamps = assignment.stream().collect(Collectors.toMap(tp -> tp, tp -> System.currentTimeMillis() - 24 * 3600 * 1000L));
// 获取24小时前的偏移量
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
offsets.forEach((tp, offsetAndTs) -> {if (offsetAndTs != null) consumer.seek(tp, offsetAndTs.offset());
});

2.3.从分区首尾消费

使用seekToBeginning()seekToEnd(),或通过beginningOffsets()/endOffsets()获取首尾偏移量后手动设置。

代码实例:

// 从分区末尾开始消费(等效于auto.offset.reset=latest)
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
assignment.forEach(tp -> consumer.seek(tp, endOffsets.get(tp)));

2.4.注意事项

  1. 分区分配与poll()的依赖
    seek()必须在分区分配完成后调用,否则会抛出IllegalStateException。需通过循环poll()确保分配到分区。

  2. 数据过期问题
    若指定偏移量对应的消息已被删除(如日志清理导致),seek()将失效。此时需使用beginningOffsets()获取当前最小有效偏移量。

  3. 异步提交与位移覆盖风险
    异步提交(commitAsync())失败时不会重试,可能因位移回滚导致重复消费。需结合同步提交(commitSync())保证原子性

  4. seek()方法提供了我们可以将消费者位移保存在外部的能力,还可以配合在均衡监听器来提供更加精准的消费能力。

3.完整代码实例

public class SeekToTimestampDemo {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "seek-demo");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("enable.auto.commit", "false");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singleton("test-topic"));// 等待分区分配Set<TopicPartition> assignment = new HashSet<>();while (assignment.isEmpty()) {consumer.poll(Duration.ofMillis(100));assignment = consumer.assignment();}// 获取24小时前的时间戳对应偏移量Map<TopicPartition, Long> timestamps = assignment.stream().collect(Collectors.toMap(tp -> tp, tp -> System.currentTimeMillis() - 86400000L));Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);// 指定位移offsets.forEach((tp, offsetAndTs) -> {if (offsetAndTs != null) {consumer.seek(tp, offsetAndTs.offset());} else {// 处理无有效偏移量的情况(如从头开始)consumer.seekToBeginning(Collections.singleton(tp));}});while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));records.forEach(record -> System.out.printf("offset=%d, value=%s%n", record.offset(), record.value()));}}
}

相关文章:

kafka学习笔记(三、消费者Consumer使用教程——从指定位置消费)

1.简介 Kafka的poll()方法消费无法精准的掌握其消费的起始位置&#xff0c;auto.offset.reset参数也只能在比较粗粒度的指定消费方式。更细粒度的消费方式kafka提供了seek()方法可以指定位移消费允许消费者从特定位置&#xff08;如固定偏移量、时间戳或分区首尾&#xff09;开…...

【后端高阶面经:架构篇】46、分布式架构:如何应对高并发的用户请求

一、架构设计原则:构建可扩展的系统基石 在分布式系统中,高并发场景对架构设计提出了极高要求。 分层解耦与模块化是应对复杂业务的核心策略,通过将系统划分为客户端、CDN/边缘节点、API网关、微服务集群、缓存层和数据库层等多个层次,实现各模块的独立演进与维护。 1.1 …...

网络编程学习笔记——TCP网络编程

文章目录 1、socket()函数2、bind()函数3、listen()4、accept()5、connect()6、send()/write()7、recv()/read()8、套接字的关闭9、TCP循环服务器模型10、TCP多线程服务器11、TCP多进程并发服务器 网络编程常用函数 socket() 创建套接字bind() 绑定本机地址和端口connect() …...

Vue+element-ui,实现表格渲染缩略图,鼠标悬浮缩略图放大,点击缩略图播放视频(一)

Vueelement-ui&#xff0c;实现表格渲染缩略图&#xff0c;鼠标悬浮缩略图放大&#xff0c;点击缩略图播放视频 前言整体代码预览图具体分析基础结构主要标签作用videoel-popover 前言 如标题&#xff0c;需要实现这样的业务 此处文章所实现的&#xff0c;是静态视频资源。 注…...

day13 leetcode-hot100-22(链表1)

160. 相交链表 - 力扣&#xff08;LeetCode&#xff09; 1.哈希集合HashSet 思路 &#xff08;1&#xff09;将A链的所有数据存储到HashSet中。 &#xff08;2&#xff09;遍历B链&#xff0c;找到是否在A中存在。 具体代码 /*** Definition for singly-linked list.* pu…...

【Oracle】DQL语言

个人主页&#xff1a;Guiat 归属专栏&#xff1a;Oracle 文章目录 1. DQL概述1.1 什么是DQL&#xff1f;1.2 DQL的核心功能 2. SELECT语句基础2.1 基本语法结构2.2 最简单的查询2.3 DISTINCT去重 3. WHERE条件筛选3.1 基本条件运算符3.2 逻辑运算符组合3.3 高级条件筛选 4. 排序…...

HUAWEI华为MateBook D 14 2021款i5,i7集显非触屏(NBD-WXX9,NbD-WFH9)原装出厂Win10系统

适用型号&#xff1a;NbD-WFH9、NbD-WFE9A、NbD-WDH9B、NbD-WFE9、 链接&#xff1a;https://pan.baidu.com/s/1qTCbaQQa8xqLR-4Ooe3ytg?pwdvr7t 提取码&#xff1a;vr7t 华为原厂WIN系统自带所有驱动、出厂主题壁纸、系统属性联机支持标志、系统属性专属LOGO标志、Office…...

【STIP】安全Transformer推理协议

Secure Transformer Inference Protocol 论文地址&#xff1a;https://arxiv.org/abs/2312.00025 摘要 模型参数和用户数据的安全性对于基于 Transformer 的服务&#xff08;例如 ChatGPT&#xff09;至关重要。虽然最近在安全两方协议方面取得的进步成功地解决了服务 Transf…...

leetcode hot100刷题日记——27.对称二叉树

方法一&#xff1a;递归法 class Solution { public:bool check(TreeNode *left,TreeNode *right){//左子树和右子树的节点同时是空的是对称的if(leftnullptr&&rightnullptr){return true;}if(leftnullptr||rightnullptr){return false;}//检查左右子树的值相不相等&a…...

高考加油(Python+HTML)

前言 询问DeepSeek根据自己所学到的知识来生成多个可执行的代码&#xff0c;为高考学子加油。最开始生成的都会有点小问题&#xff0c;还是需要自己调试一遍&#xff0c;下面就是完整的代码&#xff0c;当然了最后几天也不会有多少人看&#xff0c;都在专心的备考。 Python励…...

贪心算法应用:Ford-Fulkerson最大流问题详解

Java中的贪心算法应用:Ford-Fulkerson最大流问题详解 1. 最大流问题概述 最大流问题(Maximum Flow Problem)是图论中的一个经典问题,旨在找到一个从源节点(source)到汇节点(sink)的最大流量。Ford-Fulkerson方法是解决最大流问题的经典算法之一,它属于贪心算法的范畴…...

UE5 Niagara 如何让四元数进行旋转

Axis Angle中&#xff0c;X,Y,Z分别为旋转的轴向&#xff0c;W为旋转的角度&#xff0c;在这里旋转角度不需要除以2&#xff0c;因为里面已经除了&#xff0c;再将计算好的四元数与要进行旋转的四元数进行相乘&#xff0c;结果就是按照原来的角度绕着某一轴向旋转了某一角度...

从“黑箱”到透明化:MES如何重构生产执行全流程?

引言 在传统制造企业中&#xff0c;生产执行环节常面临“计划混乱、进度难控、异常频发、数据滞后”的困境。人工派工效率低下、物料错配频发、质量追溯困难等问题&#xff0c;直接导致交付延期、成本攀升、客户流失。深蓝易网MES系统以全流程数字化管理为核心&#xff0c;通过…...

探索Linux互斥:线程安全与资源共享

个人主页&#xff1a;chian-ocean 文章专栏-Linux 前言&#xff1a; 互斥是并发编程中避免竞争条件和保护共享资源的核心技术。通过使用锁或信号量等机制&#xff0c;能够确保多线程或多进程环境下对共享资源的安全访问&#xff0c;避免数据不一致、死锁等问题。 竞争条件 竞…...

JWT安全:假密钥.【签名随便写实现越权绕过.】

JWT安全&#xff1a;假密钥【签名随便写实现越权绕过.】 JSON Web 令牌 (JWT)是一种在系统之间发送加密签名 JSON 数据的标准化格式。理论上&#xff0c;它们可以包含任何类型的数据&#xff0c;但最常用于在身份验证、会话处理和访问控制机制中发送有关用户的信息(“声明”)。…...

Python爬虫实战:抓取百度15天天气预报数据

&#x1f310; 编程基础第一期《9-30》–使用python中的第三方模块requests&#xff0c;和三个内置模块(re、json、pprint)&#xff0c;实现百度地图的近15天天气信息抓取 记得安装 pip install requests&#x1f4d1; 项目介绍 网络爬虫是Python最受欢迎的应用场景之一&…...

RV1126 + FFPEG多路码流项目

代码主体思路&#xff1a; 一.VI,VENC,RGA模块初始化 1.先创建一个自定义公共结构体&#xff0c;用于方便管理各个模块 rkmedia_config_public.h //文件名字#ifndef _RV1126_PUBLIC_H #define _RV1126_PUBLIC_H#include <assert.h> #include <fcntl.h> #include …...

NodeJS 基于 Koa, 开发一个读取文件,并返回给客户端文件下载,以及读取文件形成列表和文件删除的代码演示

前言 在上一篇文章 《Nodejs 实现 Mysql 数据库的全量备份的代码演示》 中&#xff0c;我们演示了如何将用户的 Mysql 数据库进行备份的代码。但是&#xff0c;这个备份&#xff0c;只是备份在了服务器上了。 而我们用户的真实需求&#xff0c;是需要将备份文件下载到本地进行…...

为什么在我的Flask里面有两个路由,但是在网页里有一个却不能正确访问到智能体

1. /zhoushibo 能访问&#xff0c;/chat 直接浏览器访问报 Method Not Allowed 原因&#xff1a; /zhoushibo 路由是你用 app.route(/zhoushibo) 定义的&#xff0c;返回的是一个HTML网页&#xff0c;浏览器访问没问题。 /chat 路由你用的是 app.route(/chat, methods[POST])…...

哈工大计算机系统2024大作业——Hello的程序人生

计算机系统 大作业 题 目 程序人生-Hello’s P2P 专 业 人工智能 学   号 2022112040 班 级 2203601 学 生 郄东昕 指 导 教 师 吴锐 计算机科学与技术学院…...

2025年软件测试面试八股文(含答案+文档)

&#x1f345; 点击文末小卡片&#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 Part1 1、你的测试职业发展是什么&#xff1f; 测试经验越多&#xff0c;测试能力越高。所以我的职业发展是需要时间积累的&#xff0c;一步步向着高级测试工程师…...

【仿生系统】qwen的仿生机器人解决方案

收到完整需求后&#xff0c;我将从系统架构设计、算法实现路径、训练方法三个维度为您构建完整的仿生机器人解决方案。以下是分阶段实施方案&#xff1a; 一、核心系统架构设计&#xff08;模块化可进化架构&#xff09; 1. 多模态感知引擎 - 视觉子系统&#xff1a;YOLOv8SAM…...

Flutter3.22适配运行鸿蒙系统问题记录

Flutter3.22适配运行鸿蒙系统问题记录 一&#xff1a;适配条件适配过程问题记录&#xff08;1&#xff09;环境配置问题&#xff08;2&#xff09;Concurrent modification during iteration: Instance(length:2) of_GrowableList 报错&#xff08;3&#xff09;三方插件寻找替…...

秋招Day10 - JVM - 内存管理

JVM组织架构主要有三个部分&#xff1a;类加载器、运行时数据区和字节码执行引擎 类加载器&#xff1a;负责从文件系统、网络或其他来源加载class文件&#xff0c;将class文件中的二进制数据加载到内存中运行时数据区&#xff1a;运行时的数据存放的区域&#xff0c;分为方法区…...

Spring Boot 3.5.0中文文档上线

Spring Boot 3.5.0 中文文档翻译完成&#xff0c;需要的可收藏 传送门&#xff1a;Spring Boot 3.5.0 中文文档...

Redisson学习专栏(一):快速入门及核心API实践

文章目录 前言一、Redisson简介1.1 什么是Redisson&#xff1f;1.2 解决了什么问题&#xff1f; 二、快速入门2.1 环境准备 2.2 基础配置三、核心API解析3.1 分布式锁&#xff08;RLock&#xff09;3.2 分布式集合3.2.1 RMap&#xff08;分布式Map&#xff09;3.2.2 RList&…...

Pandas学习入门一

1.什么是Pandas? Pandas是一个强大的分析结构化数据的工具集&#xff0c;基于NumPy构建&#xff0c;提供了高级数据结构和数据操作工具&#xff0c;它是使Python成为强大而高效的数据分析环境的重要因素之一。 一个强大的分析和操作大型结构化数据集所需的工具集基础是NumPy…...

基于Piecewise Jerk Speed Optimizer的速度规划算法(附ROS C++/Python仿真)

目录 1 时空解耦运动规划2 PJSO速度规划原理2.1 优化变量2.2 代价函数2.3 约束条件2.4 二次规划形式 3 算法仿真3.1 ROS C仿真3.2 Python仿真 1 时空解耦运动规划 在自主移动系统的运动规划体系中&#xff0c;时空解耦的递进式架构因其高效性与工程可实现性被广泛采用。这一架…...

关于 JavaScript 版本、TypeScript、Vue 的区别说明, PHP 开发者入门 Vue 的具体方案

以下是关于 JavaScript 版本、TypeScript、Vue 的区别说明&#xff0c;以及 PHP 开发者入门 Vue 的具体方案&#xff1a; 一、JavaScript 版本演进 JavaScript 的核心版本以 ECMAScript 规范&#xff08;ES&#xff09; 命名&#xff1a; 版本发布时间关键特性ES52009严格模式…...

中断和信号详解

三种中断 中断分为三种&#xff1a;硬件中断、异常中断、软中断 硬件中断 设备向中断控制器发送中断请求&#xff0c;中断控制器生成对应中断号&#xff0c;然后通过中断引脚向cpu发送高电平&#xff0c;cpu收到请求后不会立即处理&#xff0c;cpu会处理完当前指令&#xff…...