当前位置: 首页 > news >正文

Kafka 消费组位移

Kafka 消费组位移

  • 消费者 API
  • 命令行

Kafka : 基于日志结构(log-based)的消息引擎

  • 消费消息时,只是从磁盘文件上读取数据,不会删除消息数据
  • 位移数据能由消费者控制,能很容易修改位移的值,实现重复消费历史数据

重设位移的两个维度 :

  • 位移维度 : 根据位移值重设 : 把消费者的位移值重设成自定义位移值
  • 时间维度 : 将位移调整成 > 该时间的最小位移

7 种重设策略 :

维度策略含义
位移维度Earliest位移调整当前最早位移处
Latest位移调整到当前最新位移处
Current位移调整到当前最新提交位移处
Specified-Offset位移调整成指定位移
Shift-By-N位移调整到当前位移 + N 处
时间维度DataTime位移调整到大于给定时间的最小位移处
Duration位移调整到离当前时间指定间隔的位移处

Earliest 策略 : 将位移调整到主题当前最早位移处

  • 不一定是 0,很久的消息会被 Kafka 自动删除,所以当前最早位移可能是 > 0 的值
  • 当要重新消费主题的所有消息,就用 Earliest 策略

Latest 策略 : 把位移重设成最新末端位移

  • 当向某个主题发送 15 条消息,最新末端位移是 15
  • 当要跳过所有历史消息,从最新的消息处开始消费,就用 Latest 策略

Current 策略 : 将位移调整成消费者当前提交的最新位移

  • 修改消费者代码,并重启消费者,结果发现代码有问题,回滚前的代码变更,还把位移重设为消费者重启时的位置

Specified-Offset 策略 : 消费者把位移值调整到你指定的位移处

  • 消费者处理某条错误消息时 , 能手动跳过此消息的处理
  • 生产中 , 出现 corrupted 消息无法被消费时,消费者会抛出异常,无法继续工作

Specified-Offset 策略 : 指定位移的绝对数值

Shift-By-N 策略 : 指定位移的相对数值 : 跳过一段消息的距离

  • 把位移重设成当前位移的前 100 条位移处,指定 N 为 -100

DateTime : 指定一个时间,将位移重置到该时间之后的最早位移处

  • 使用场景 : 重新消费昨天的数据,并使用该策略重设位移到昨天 0 点

Duration 策略 : 相对的时间间隔,将位移调整到距离当前给定时间间隔的位移处,格式是 PnDTnHnMnS

  • Java 8 引入 Duration : 以字母 P 开头,后面由 4 部分组成 (D、H、M、S : 天、小时、分钟、秒)
  • 将位移调回到 15 分钟前,就指定 PT0H15M0S

消费者 API

Earliest 策略 :

Properties consumerProperties = new Properties();
// 禁止自动提交位移
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 重设的消费者组的组 ID
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);String topic = "test";  // 要重设位移的 Kafka 主题 
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {consumer.subscribe(Collections.singleton(topic));consumer.poll(0);// 一次性构造主题的所有分区对象consumer.seekToBeginning( consumer.partitionsFor(topic).stream().map(partitionInfo ->new TopicPartition(topic, partitionInfo.partition())).collect(Collectors.toList()));
} 

Latest 策略 :

consumer.seekToEnd(consumer.partitionsFor(topic).stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())).collect(Collectors.toList()));

Current 策略 : 获取当前提交的最新位移 :

consumer.partitionsFor(topic).stream().map(info -> new TopicPartition(topic, info.partition())).forEach(tp -> {long committedOffset = consumer.committed(tp).offset();consumer.seek(tp, committedOffset);
});

Specified-Offset 策略 :

long targetOffset = 1234L;for (PartitionInfo info : consumer.partitionsFor(topic)) {TopicPartition tp = new TopicPartition(topic, info.partition());consumer.seek(tp, targetOffset);
}

实现 Shift-By-N 策略

for (PartitionInfo info : consumer.partitionsFor(topic)) {TopicPartition tp = new TopicPartition(topic, info.partition());// 假设向前跳 123 条消息long targetOffset = consumer.committed(tp).offset() + 123L; consumer.seek(tp, targetOffset);
}

DateTime 策略 : 重设位移到 2022 年 12 月 11 日晚上 8 点

long ts = LocalDateTime.of(2022, 12, 11, 20, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream().map(info -> new TopicPartition(topic, info.partition())).collect(Collectors.toMap(Function.identity(), tp -> ts));for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : consumer.offsetsForTimes(timeToSearch).entrySet()) {consumer.seek(entry.getKey(), entry.getValue().offset());
}

Duration 策略 : 将位移调回 30 分钟前

Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream().map(info -> new TopicPartition(topic, info.partition())).collect(Collectors.toMap(Function.identity(), tp -> System.currentTimeMillis() - 30 * 1000  * 60));for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : consumer.offsetsForTimes(timeToSearch).entrySet()) {consumer.seek(entry.getKey(), entry.getValue().offset());
}

命令行

Earliest 策略 :

bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--all-topics --to-earliest –execute

Latest 策略 :

bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--all-topics --to-latest --execute

Current 策略 :

bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--all-topics --to-current --execute

Specified-Offset 策略 :

bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--all-topics --to-offset <offset> --execute

Shift-By-N 策略 :

bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--shift-by <offset_N> --execute

DateTime 策略 :

bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--to-datetime 2019-06-20T20:00:00.000 --execute

Duration 策略 :

bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--by-duration PT0H30M0S --execute

相关文章:

Kafka 消费组位移

Kafka 消费组位移消费者 API命令行Kafka : 基于日志结构&#xff08;log-based&#xff09;的消息引擎 消费消息时&#xff0c;只是从磁盘文件上读取数据&#xff0c;不会删除消息数据位移数据能由消费者控制&#xff0c;能很容易修改位移的值&#xff0c;实现重复消费历史数据…...

Python|数学|贪心|数组|动态规划|单选记录:实现保留3位有效数字(四舍六入五成双规则)|用Python来创造一个提示用户输入数字的乘法表|最小路径和

1、实现保留3位有效数字&#xff08;四舍六入五成双规则&#xff09;&#xff08;数学&#xff0c;算法&#xff09; 贡献者&#xff1a;weixin_45782673 输入&#xff1a;1234 输出&#xff1a;1234 12 12.0 4 4.00 0.2 0.200 0.32 0.320 1.3 1.30 1.235 1.24 1.245 1.24 1.…...

【MySQL】MySQL的索引

目录 介绍 索引的分类 索引的操作-创建索引-单列索引-普通索引 格式 操作 索引的操作-创建索引-单列索引-唯一索引 索引的操作-创建索引-单列索引-主键索引 索引的操作-创建索引-组合索引 索引的操作-全文索引 索引的操作-空间索引 索引的验证 索引的特点 介绍…...

弱监督实例分割 Box-supervised Instance Segmentation with Level Set Evolution 论文笔记

弱监督实例分割 Box-supervised Instance Segmentation with Level Set Evolution 论文笔记一、Abstract二、引言三、相关工作3.1 基于 Box 的实例分割3.2 基于层级的分割四、提出的方法4.1 图像分割中的层级模型4.2 基于 Box 的实例分割在 Bounding Box 内的层级进化输入的数据…...

Springboot是什么

目录 为什么会要用springboot 1、之前 2、现在 springboot优点 springboot四大核心 自动装配介绍 1、自动装配作用是什么 2、自动装配原理 springboot starter是什么 1、starter作用 2、比如&#xff1a;我们想搭建java web框架 3、starter原理 SpringBootApplica…...

LeetCode 134. 加油站(函数图像法 / 贪心)

题目&#xff1a; 链接&#xff1a;LeetCode 134. 加油站 难度&#xff1a;中等 在一条环路上有 n 个加油站&#xff0c;其中第 i 个加油站有汽油 gas[i] 升。 你有一辆油箱容量无限的的汽车&#xff0c;从第 i 个加油站开往第 i1 个加油站需要消耗汽油 cost[i] 升。你从其中…...

王道计算机组成原理课代表 - 考研计算机 第三章 存储系统 究极精华总结笔记

本篇博客是考研期间学习王道课程 传送门 的笔记&#xff0c;以及一整年里对 计算机组成 知识点的理解的总结。希望对新一届的计算机考研人提供帮助&#xff01;&#xff01;&#xff01; 关于对 存储系统 章节知识点总结的十分全面&#xff0c;涵括了《计算机组成原理》课程里…...

Flask-mock接口数据流程

背景&#xff1a;由于在开发过程中&#xff0c;会遇到以下的痛点 1.服务端接口提测延期&#xff0c;具体接口逻辑未完成实现&#xff0c;接口未能正常调通&#xff0c;导致客户端提测停滞&#xff1b; 2.因为前期已在技术评审上已与客户端开发定好接口字段&#xff0c;客户端比…...

springboot项目配置序列化,反序列化器

介绍本文介绍在项目中时间类型、枚举类型的序列化和反序列化自定义的处理类&#xff0c;也可以使用注解。建议枚举都实现一个统一的接口&#xff0c;方便处理。我这定义了一个Dict接口。枚举类型注解处理这种方式比较灵活&#xff0c;可以让枚举按照自己的方式序列化&#xff0…...

c++11 标准模板(STL)(std::unordered_map)(九)

定义于头文件 <unordered_map> template< class Key, class T, class Hash std::hash<Key>, class KeyEqual std::equal_to<Key>, class Allocator std::allocator< std::pair<const Key, T> > > class unordered…...

Seay代码审计工具

一、简介Seay是基于C#语言开发的一款针对PHP代码安全性审计的系统&#xff0c;主要运行于Windows系统上。这款软件能够发现SQL注入、代码执行、命令执行、文件包含、文件上传、绕过转义防护、拒绝服务、XSS跨站、信息泄露、任意URL跳转等漏洞&#xff0c;基本上覆盖常见PHP漏洞…...

界面开发(4)--- PyQt5实现打开图像及视频播放功能

PyQt5创建打开图像及播放视频页面 上篇文章主要介绍了如何实现登录界面的账号密码注册及登录功能&#xff0c;还简单介绍了有关数据库的连接方法。这篇文章我们介绍一下如何在设计的页面中打开本地的图像&#xff0c;以及实现视频播放功能。 实现打开图像功能 为了便于记录实…...

核心系统国产平台迁移验证

核心系统国产平台迁移验证 摘要&#xff1a;信息技术应用创新&#xff0c;旨在实现信息技术领域的自主可控&#xff0c;保障国家信息安全。金融领域又是关系国家经济命脉的行业&#xff0c;而对核心交易系统的信息技术应用创新是交易所未来将要面临的重大挑战。为了推进国产化进…...

【数据结构之二叉树】——二叉树的概念及结构,特殊的二叉树和二叉树性质

文章目录一、二叉树的概念及结构1.概念2.现实中的二叉树3. 特殊的二叉树&#xff1a;3.二叉树的性质二、二叉树练习题总结一、二叉树的概念及结构 1.概念 一棵二叉树是结点的一个有限集合&#xff0c;该集合: 或者为空由一个根节点加上两棵别称为左子树和右子树的二叉树组成…...

Android学习之帧动画和视图动画

帧动画 帧动画中的每一帧其实都是一张图片&#xff0c;将许多图片连起来播放&#xff0c;就形成了帧动画。 在drawable目录下新建frmae_animation文件&#xff0c;在这个文件中定义了帧动画的每一帧要显示的图片&#xff0c;播放时&#xff0c;按从上到下显示。 <?xml v…...

vue2和vue3的区别

这周呢主要就是整理整理学的东西&#xff0c;不然看的也记不住&#xff0c;把这些学的东西做成笔记&#xff0c;感觉会清楚许多&#xff0c;这次就把vue2和vue3的区别总结一下&#xff0c;明天要考四级&#xff0c;嗐&#xff0c;本来想着复习四级&#xff0c;结果只写了一两套…...

【你不知道的事】JavaScript 中用一种更先进的方式进行深拷贝:structuredClone

你是否知道&#xff0c;JavaScript中有一种原生的方法来做对象的深拷贝? 本文我们要介绍的是 structuredClone 函数&#xff0c;它是内置在 JavaScript 运行时中的: const calendarEvent {title: "Builder.io Conf",date: new Date(123),attendees: ["Steve…...

XE开发Linux应用(二)-Webservice

新建一个工程。选择如图。继续输入服务名然后就生成对应的单元。增加linux 平台。完善对应的单元代码{ Invokable implementation File for Txaliontest which implements Ixaliontest }unit xaliontestImpl;interfaceuses Soap.InvokeRegistry, System.Types, Soap.XSBuiltIns…...

kubernetes实战与源码学习

1.1 关于Kubernetes的介绍与核心对象概念 关于Kubernetes的介绍与核心对象概念-阿里云开发者社区 k8s架构 核心对象 使用kubeadm10分钟部署k8集群 使用 KuboardSpray 安装kubernetes_v1.23.1 | Kuboard k8s-上部署第一个应用程序 Deployment基本概念 给应用添加service&a…...

CNCF x Alibaba云原生技术公开课 第八章 应用配置管理

Pod配置管理分类 可变配置就用 ConfigMap&#xff1b;敏感信息是用 Secret&#xff1b;身份认证是用 ServiceAccount&#xff1b;资源配置是用 Resources&#xff1b;安全管控是用 SecurityContext&#xff1b;前置校验是用 InitContainers。 1、ConfigMap 概念&#xff1a;…...

告别AI人像翻车!MusePublic艺术创作引擎保姆级入门教程

告别AI人像翻车&#xff01;MusePublic艺术创作引擎保姆级入门教程 &#x1f3db; MusePublic 艺术创作引擎是一款专为艺术感时尚人像创作设计的轻量化文本生成图像系统&#xff0c;基于MusePublic专属大模型&#xff0c;采用safetensors安全格式封装&#xff0c;深度优化优雅…...

OpenClaw跨平台部署:Qwen3.5-9B在mac/Windows/Linux下的差异处理

OpenClaw跨平台部署&#xff1a;Qwen3.5-9B在mac/Windows/Linux下的差异处理 1. 为什么需要关注跨平台差异&#xff1f; 去年我在帮团队搭建自动化办公流程时&#xff0c;发现一个有趣的现象&#xff1a;同样的OpenClaw配置脚本&#xff0c;在同事的MacBook上运行流畅&#x…...

GTE文本向量模型部署全攻略:从零到一搭建企业级文本处理服务

GTE文本向量模型部署全攻略&#xff1a;从零到一搭建企业级文本处理服务 1. 项目介绍与核心价值 如果你正在寻找一个能一站式解决中文文本分析难题的工具&#xff0c;那么GTE文本向量模型可能就是你的答案。想象一下&#xff0c;一个模型就能帮你识别文档里的关键人物、地点&…...

Chrome WebRTC 实战:构建高可靠实时通信系统的关键技术与避坑指南

最近在做一个需要实时音视频通信的项目&#xff0c;选型时自然想到了 WebRTC。虽然标准很美好&#xff0c;但在 Chrome 浏览器里真正把它用起来、特别是用到生产环境&#xff0c;那真是“坑”出不穷。从 NAT 穿不透导致连不上&#xff0c;到不同设备上视频卡成 PPT&#xff0c;…...

Go语言HTTP服务开发:从标准库到框架

Go语言HTTP服务开发&#xff1a;从标准库到框架 作为一个写了十几年代码的Go后端老兵&#xff0c;我在HTTP服务开发上踩过不少坑。今天就来分享一下Go语言HTTP服务开发的实践经验&#xff0c;从标准库到框架。 一、标准库net/http 1. 基本用法 package mainimport ("fmt&q…...

嵌入式系统的实时性能优化详解

嵌入式系统的实时性能优化详解 实时系统概述 实时系统是指能够在规定的时间内完成特定任务的系统&#xff0c;其正确性不仅取决于计算结果的正确性&#xff0c;还取决于结果产生的时间。在嵌入式系统中&#xff0c;实时性能优化至关重要。 实时系统分类 硬实时系统&#xf…...

若依分离版集成Activiti7:从零构建企业级流程中心

1. 环境准备与版本兼容性检查 在开始整合之前&#xff0c;我们需要先确认几个关键点。若依分离版是基于SpringBoot的前后端分离架构&#xff0c;而Activiti7作为新一代工作流引擎&#xff0c;两者整合最需要注意的就是版本兼容性。我去年在金融项目里就遇到过因为版本不匹配导致…...

Linux桌面定制——快速迁移状态栏位置的终端技巧

1. 为什么需要调整状态栏位置 第一次用Unity桌面时&#xff0c;我就被左侧的状态栏搞得浑身难受。作为常年使用Windows的用户&#xff0c;总觉得状态栏就该乖乖待在屏幕底部。后来发现不少Linux新手都有类似的困扰——明明是个高效的操作系统&#xff0c;却因为这种小细节影响使…...

从零搭建硬件测试台:手把手教你用LabVIEW连接菊水PBZ40电源并读取数据

从零搭建硬件测试台&#xff1a;LabVIEW与菊水PBZ40电源的深度集成实战 在工业自动化和科研测试领域&#xff0c;可编程电源的系统集成一直是工程师面临的常见挑战。菊水PBZ40系列作为实验室常用高精度电源&#xff0c;其RS232C接口与LabVIEW图形化编程环境的结合&#xff0c;能…...

从码农到冥府CTO:重建六道轮回系统的质量保障实践

第一章 职业跃迁&#xff1a;技术人的冥府晋升之路1.1 技术职级体系重构冥府技术团队沿用硅谷职级模型&#xff0c;但增设业力评估维度&#xff1a;L1 鬼卒程序员&#xff1a;执行生死簿数据录入&#xff08;日均处理10万条因果记录&#xff09;L3 无常高级工程师&#xff1a;负…...