kafka学习笔记(三、消费者Consumer使用教程——从指定位置消费)
1.简介
Kafka的poll()方法消费无法精准的掌握其消费的起始位置,auto.offset.reset
参数也只能在比较粗粒度的指定消费方式。更细粒度的消费方式kafka提供了seek()
方法可以指定位移消费允许消费者从特定位置(如固定偏移量、时间戳或分区首尾)开始消费消息。
2.指定消费位置
2.1.从特定偏移量开始消费
使用seek(TopicPartition partition, long offset)
指定具体偏移量。
源码分析:
seek()
方法更新消费者内部的subscriptions
对象的position
字段,记录目标偏移量。- 后续
poll()
时,Fetcher
类根据此位置向Broker发送拉取请求。
代码示例:
consumer.subscribe(Collections.singleton("test-topic"));
Set<TopicPartition> assignment = new HashSet<>();
// 确保分配到分区
while (assignment.isEmpty()) {consumer.poll(Duration.ofMillis(100));assignment = consumer.assignment();
}
// 设置所有分区从offset=100开始消费
assignment.forEach(tp -> consumer.seek(tp, 100));
2.2.从时间戳开始消费
使用offsetsForTimes()
获取时间戳对应的偏移量,再调用seek()
。
源码分析:
offsetsForTimes()
向Broker发送ListOffsetRequest
,查询满足时间戳条件的最早或最新偏移量。
代码实例:
Map<TopicPartition, Long> timestamps = assignment.stream().collect(Collectors.toMap(tp -> tp, tp -> System.currentTimeMillis() - 24 * 3600 * 1000L));
// 获取24小时前的偏移量
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
offsets.forEach((tp, offsetAndTs) -> {if (offsetAndTs != null) consumer.seek(tp, offsetAndTs.offset());
});
2.3.从分区首尾消费
使用seekToBeginning()
或seekToEnd()
,或通过beginningOffsets()
/endOffsets()
获取首尾偏移量后手动设置。
代码实例:
// 从分区末尾开始消费(等效于auto.offset.reset=latest)
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
assignment.forEach(tp -> consumer.seek(tp, endOffsets.get(tp)));
2.4.注意事项
-
分区分配与poll()的依赖
seek()
必须在分区分配完成后调用,否则会抛出IllegalStateException
。需通过循环poll()
确保分配到分区。 -
数据过期问题
若指定偏移量对应的消息已被删除(如日志清理导致),seek()
将失效。此时需使用beginningOffsets()
获取当前最小有效偏移量。 -
异步提交与位移覆盖风险
异步提交(commitAsync()
)失败时不会重试,可能因位移回滚导致重复消费。需结合同步提交(commitSync()
)保证原子性。 -
seek()
方法提供了我们可以将消费者位移保存在外部的能力,还可以配合在均衡监听器来提供更加精准的消费能力。
3.完整代码实例
public class SeekToTimestampDemo {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "seek-demo");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("enable.auto.commit", "false");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singleton("test-topic"));// 等待分区分配Set<TopicPartition> assignment = new HashSet<>();while (assignment.isEmpty()) {consumer.poll(Duration.ofMillis(100));assignment = consumer.assignment();}// 获取24小时前的时间戳对应偏移量Map<TopicPartition, Long> timestamps = assignment.stream().collect(Collectors.toMap(tp -> tp, tp -> System.currentTimeMillis() - 86400000L));Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);// 指定位移offsets.forEach((tp, offsetAndTs) -> {if (offsetAndTs != null) {consumer.seek(tp, offsetAndTs.offset());} else {// 处理无有效偏移量的情况(如从头开始)consumer.seekToBeginning(Collections.singleton(tp));}});while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));records.forEach(record -> System.out.printf("offset=%d, value=%s%n", record.offset(), record.value()));}}
}
相关文章:

kafka学习笔记(三、消费者Consumer使用教程——从指定位置消费)
1.简介 Kafka的poll()方法消费无法精准的掌握其消费的起始位置,auto.offset.reset参数也只能在比较粗粒度的指定消费方式。更细粒度的消费方式kafka提供了seek()方法可以指定位移消费允许消费者从特定位置(如固定偏移量、时间戳或分区首尾)开…...

【后端高阶面经:架构篇】46、分布式架构:如何应对高并发的用户请求
一、架构设计原则:构建可扩展的系统基石 在分布式系统中,高并发场景对架构设计提出了极高要求。 分层解耦与模块化是应对复杂业务的核心策略,通过将系统划分为客户端、CDN/边缘节点、API网关、微服务集群、缓存层和数据库层等多个层次,实现各模块的独立演进与维护。 1.1 …...

网络编程学习笔记——TCP网络编程
文章目录 1、socket()函数2、bind()函数3、listen()4、accept()5、connect()6、send()/write()7、recv()/read()8、套接字的关闭9、TCP循环服务器模型10、TCP多线程服务器11、TCP多进程并发服务器 网络编程常用函数 socket() 创建套接字bind() 绑定本机地址和端口connect() …...

Vue+element-ui,实现表格渲染缩略图,鼠标悬浮缩略图放大,点击缩略图播放视频(一)
Vueelement-ui,实现表格渲染缩略图,鼠标悬浮缩略图放大,点击缩略图播放视频 前言整体代码预览图具体分析基础结构主要标签作用videoel-popover 前言 如标题,需要实现这样的业务 此处文章所实现的,是静态视频资源。 注…...

day13 leetcode-hot100-22(链表1)
160. 相交链表 - 力扣(LeetCode) 1.哈希集合HashSet 思路 (1)将A链的所有数据存储到HashSet中。 (2)遍历B链,找到是否在A中存在。 具体代码 /*** Definition for singly-linked list.* pu…...

【Oracle】DQL语言
个人主页:Guiat 归属专栏:Oracle 文章目录 1. DQL概述1.1 什么是DQL?1.2 DQL的核心功能 2. SELECT语句基础2.1 基本语法结构2.2 最简单的查询2.3 DISTINCT去重 3. WHERE条件筛选3.1 基本条件运算符3.2 逻辑运算符组合3.3 高级条件筛选 4. 排序…...

HUAWEI华为MateBook D 14 2021款i5,i7集显非触屏(NBD-WXX9,NbD-WFH9)原装出厂Win10系统
适用型号:NbD-WFH9、NbD-WFE9A、NbD-WDH9B、NbD-WFE9、 链接:https://pan.baidu.com/s/1qTCbaQQa8xqLR-4Ooe3ytg?pwdvr7t 提取码:vr7t 华为原厂WIN系统自带所有驱动、出厂主题壁纸、系统属性联机支持标志、系统属性专属LOGO标志、Office…...

【STIP】安全Transformer推理协议
Secure Transformer Inference Protocol 论文地址:https://arxiv.org/abs/2312.00025 摘要 模型参数和用户数据的安全性对于基于 Transformer 的服务(例如 ChatGPT)至关重要。虽然最近在安全两方协议方面取得的进步成功地解决了服务 Transf…...

leetcode hot100刷题日记——27.对称二叉树
方法一:递归法 class Solution { public:bool check(TreeNode *left,TreeNode *right){//左子树和右子树的节点同时是空的是对称的if(leftnullptr&&rightnullptr){return true;}if(leftnullptr||rightnullptr){return false;}//检查左右子树的值相不相等&a…...

高考加油(Python+HTML)
前言 询问DeepSeek根据自己所学到的知识来生成多个可执行的代码,为高考学子加油。最开始生成的都会有点小问题,还是需要自己调试一遍,下面就是完整的代码,当然了最后几天也不会有多少人看,都在专心的备考。 Python励…...

贪心算法应用:Ford-Fulkerson最大流问题详解
Java中的贪心算法应用:Ford-Fulkerson最大流问题详解 1. 最大流问题概述 最大流问题(Maximum Flow Problem)是图论中的一个经典问题,旨在找到一个从源节点(source)到汇节点(sink)的最大流量。Ford-Fulkerson方法是解决最大流问题的经典算法之一,它属于贪心算法的范畴…...

UE5 Niagara 如何让四元数进行旋转
Axis Angle中,X,Y,Z分别为旋转的轴向,W为旋转的角度,在这里旋转角度不需要除以2,因为里面已经除了,再将计算好的四元数与要进行旋转的四元数进行相乘,结果就是按照原来的角度绕着某一轴向旋转了某一角度...

从“黑箱”到透明化:MES如何重构生产执行全流程?
引言 在传统制造企业中,生产执行环节常面临“计划混乱、进度难控、异常频发、数据滞后”的困境。人工派工效率低下、物料错配频发、质量追溯困难等问题,直接导致交付延期、成本攀升、客户流失。深蓝易网MES系统以全流程数字化管理为核心,通过…...

探索Linux互斥:线程安全与资源共享
个人主页:chian-ocean 文章专栏-Linux 前言: 互斥是并发编程中避免竞争条件和保护共享资源的核心技术。通过使用锁或信号量等机制,能够确保多线程或多进程环境下对共享资源的安全访问,避免数据不一致、死锁等问题。 竞争条件 竞…...

JWT安全:假密钥.【签名随便写实现越权绕过.】
JWT安全:假密钥【签名随便写实现越权绕过.】 JSON Web 令牌 (JWT)是一种在系统之间发送加密签名 JSON 数据的标准化格式。理论上,它们可以包含任何类型的数据,但最常用于在身份验证、会话处理和访问控制机制中发送有关用户的信息(“声明”)。…...

Python爬虫实战:抓取百度15天天气预报数据
🌐 编程基础第一期《9-30》–使用python中的第三方模块requests,和三个内置模块(re、json、pprint),实现百度地图的近15天天气信息抓取 记得安装 pip install requests📑 项目介绍 网络爬虫是Python最受欢迎的应用场景之一&…...

RV1126 + FFPEG多路码流项目
代码主体思路: 一.VI,VENC,RGA模块初始化 1.先创建一个自定义公共结构体,用于方便管理各个模块 rkmedia_config_public.h //文件名字#ifndef _RV1126_PUBLIC_H #define _RV1126_PUBLIC_H#include <assert.h> #include <fcntl.h> #include …...

NodeJS 基于 Koa, 开发一个读取文件,并返回给客户端文件下载,以及读取文件形成列表和文件删除的代码演示
前言 在上一篇文章 《Nodejs 实现 Mysql 数据库的全量备份的代码演示》 中,我们演示了如何将用户的 Mysql 数据库进行备份的代码。但是,这个备份,只是备份在了服务器上了。 而我们用户的真实需求,是需要将备份文件下载到本地进行…...

为什么在我的Flask里面有两个路由,但是在网页里有一个却不能正确访问到智能体
1. /zhoushibo 能访问,/chat 直接浏览器访问报 Method Not Allowed 原因: /zhoushibo 路由是你用 app.route(/zhoushibo) 定义的,返回的是一个HTML网页,浏览器访问没问题。 /chat 路由你用的是 app.route(/chat, methods[POST])…...

哈工大计算机系统2024大作业——Hello的程序人生
计算机系统 大作业 题 目 程序人生-Hello’s P2P 专 业 人工智能 学 号 2022112040 班 级 2203601 学 生 郄东昕 指 导 教 师 吴锐 计算机科学与技术学院…...

2025年软件测试面试八股文(含答案+文档)
🍅 点击文末小卡片,免费获取软件测试全套资料,资料在手,涨薪更快 Part1 1、你的测试职业发展是什么? 测试经验越多,测试能力越高。所以我的职业发展是需要时间积累的,一步步向着高级测试工程师…...
【仿生系统】qwen的仿生机器人解决方案
收到完整需求后,我将从系统架构设计、算法实现路径、训练方法三个维度为您构建完整的仿生机器人解决方案。以下是分阶段实施方案: 一、核心系统架构设计(模块化可进化架构) 1. 多模态感知引擎 - 视觉子系统:YOLOv8SAM…...

Flutter3.22适配运行鸿蒙系统问题记录
Flutter3.22适配运行鸿蒙系统问题记录 一:适配条件适配过程问题记录(1)环境配置问题(2)Concurrent modification during iteration: Instance(length:2) of_GrowableList 报错(3)三方插件寻找替…...

秋招Day10 - JVM - 内存管理
JVM组织架构主要有三个部分:类加载器、运行时数据区和字节码执行引擎 类加载器:负责从文件系统、网络或其他来源加载class文件,将class文件中的二进制数据加载到内存中运行时数据区:运行时的数据存放的区域,分为方法区…...

Spring Boot 3.5.0中文文档上线
Spring Boot 3.5.0 中文文档翻译完成,需要的可收藏 传送门:Spring Boot 3.5.0 中文文档...

Redisson学习专栏(一):快速入门及核心API实践
文章目录 前言一、Redisson简介1.1 什么是Redisson?1.2 解决了什么问题? 二、快速入门2.1 环境准备 2.2 基础配置三、核心API解析3.1 分布式锁(RLock)3.2 分布式集合3.2.1 RMap(分布式Map)3.2.2 RList&…...

Pandas学习入门一
1.什么是Pandas? Pandas是一个强大的分析结构化数据的工具集,基于NumPy构建,提供了高级数据结构和数据操作工具,它是使Python成为强大而高效的数据分析环境的重要因素之一。 一个强大的分析和操作大型结构化数据集所需的工具集基础是NumPy…...

基于Piecewise Jerk Speed Optimizer的速度规划算法(附ROS C++/Python仿真)
目录 1 时空解耦运动规划2 PJSO速度规划原理2.1 优化变量2.2 代价函数2.3 约束条件2.4 二次规划形式 3 算法仿真3.1 ROS C仿真3.2 Python仿真 1 时空解耦运动规划 在自主移动系统的运动规划体系中,时空解耦的递进式架构因其高效性与工程可实现性被广泛采用。这一架…...
关于 JavaScript 版本、TypeScript、Vue 的区别说明, PHP 开发者入门 Vue 的具体方案
以下是关于 JavaScript 版本、TypeScript、Vue 的区别说明,以及 PHP 开发者入门 Vue 的具体方案: 一、JavaScript 版本演进 JavaScript 的核心版本以 ECMAScript 规范(ES) 命名: 版本发布时间关键特性ES52009严格模式…...
中断和信号详解
三种中断 中断分为三种:硬件中断、异常中断、软中断 硬件中断 设备向中断控制器发送中断请求,中断控制器生成对应中断号,然后通过中断引脚向cpu发送高电平,cpu收到请求后不会立即处理,cpu会处理完当前指令ÿ…...