当前位置: 首页 > news >正文

源码解析:Apache RocketMQ重置消费位点

引入

reset offset,即重置消费进度,一般在以下场景中使用:

  1. 需要重新消费已经消费过的消息,重置到最早位置或根据时间进行重置。
  2. 消息积压,不需要消费积压的消息,重置到最新位置,使其从最新位置开始消费。

重置到最早、或者根据时间进行重置与消息补发的区别?
● 消息补发是将原先的消息由生产者重发一次,与区别的那边消息本质上不是同一条消息(除了消息体一样以外)。
● 重置操作是操作消费位点(offset),本质上还是消费生产者之前发送的那条消息。
源码解析

重置offset起始调用位置:

org.apache.rocketmq.tools.admin.DefaultMQAdminExt#resetOffsetByTimestamporg.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetNewConcurrent

区别:
org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetNewConcurrent
● 这个看看用来并发的重置消费者的offset。可以多个consumer、多个queue可以同时进行处理。
org.apache.rocketmq.tools.admin.DefaultMQAdminExt#resetOffsetByTimestamp
● 用来根据给定的时间戳来重置消费者的偏移量。

这两个入口本质上都是resetOffset,没有本质上的区别,我们以resetOffsetNewConcurrent为例,具体流程如下图:
在这里插入图片描述

  • 首先是examineTopicRouteInfo:主要是获取topic的路由信息,如果路由信息不存在,则无法进行后续操作。
  • 再者是InvokeBrokerToResetOffset:根据上一步拿到的路由信息,遍历路由,一次向broker发起调用。
  • 请求到达服务端(Broker端),判断是否是Broker端侧处理 ○ Broker端处理:
    • 前置检查(look-ahead check):判断当前BrokerRole是否正确、检验当前Topic、ConsumerGroup是否存在,不满足任意条件,直接返回。
    • 将传递过来的offset或者根据timestamp查询到的offset统一放置到queueOffsetMap中
    • assignResetOffset:将上一步的queueOffsetMap的offset放到对应的resetOffsetTable和offsetTable中。
    • 最后prepare reset result并返回response。
  • Client端处理:
    • 先执行queryOffset:查询当前topic下的group下offsetTable中是是否存储了offset信息,有就返回对应的值,没有返回-1;
    • 前置检查(look-ahead check):检查上一步返回结果consumerOffset是否为-1,为-1表示当前group不存在;检查timeStampOffset是否满足条件;满足上述所有条件将timeStampOffset/consumerOffset中较为小的值放到offsetTable中,如果是C的客户端,直接将timestampOffset放入offsetTable中。
    • 请求到达客户端后,先将对应的consumer挂起(suspend),清除ProcessQueue中的消息,在sleep 10s。
    • 再执行updateConsumeOffset:更新consumerOffset。
    • 最后再resume,继续消费。

补充:
如果是服务端重置,重置之后的offset会写入resetOffsetTable中,在后续进行拉取操作的时候会删除resetOffsetTable中对应的offset;如果queryThenEraseResetOffset中有返回值,将resetOffset作为GetMessageResult的nextBeginOffset,拉取操作用的offset。

public Long queryThenEraseResetOffset(String topic, String group, Integer queueId) {String key = topic + TOPIC_GROUP_SEPARATOR + group;ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);if (null == map) {return null;} else {return map.remove(queueId);}
}

参考:
● https://rocketmq.apache.org/
● https://github.com/apache/rocketmq

相关文章:

源码解析:Apache RocketMQ重置消费位点

引入 reset offset&#xff0c;即重置消费进度&#xff0c;一般在以下场景中使用&#xff1a; 需要重新消费已经消费过的消息&#xff0c;重置到最早位置或根据时间进行重置。消息积压&#xff0c;不需要消费积压的消息&#xff0c;重置到最新位置&#xff0c;使其从最新位置…...

Python 自动化之处理docx文件(一)

批量筛选docx文档中关键词 文章目录 批量筛选docx文档中关键词前言一、做成什么样子二、基本架构三、前期输入模块1.引入库2.路径输入3.关键词输入 三、数据处理模块1.基本架构2.如果是docx文档2.1.读取当前文档内容2.2.遍历匹配关键字2.3.触发匹配并记录日志 3.如果目录下还有…...

Vue mixins详解

文章目录 前言Vue中的mixins详解什么是mixins简单例子mixins的特点mixins与vuex的区别mixins与公共组件的区别前言 在Vue中,mixins是一种可重用的代码片段,可以在多个组件中共享。它可以包含组件的选项,如data、methods、computed等,以及生命周期钩子函数。 本文将详细介…...

ssl证书问题导致本地启动前端服务报500

报错如下&#xff1a;注意查看报错信息 问题&#xff1a;系统原是http&#xff0c;后台调整为https后&#xff0c;ssl证书有点问题&#xff0c; vue项目本地服务&#xff0c;使用代理&#xff0c;webpack默认&#xff0c;证书强校验&#xff0c;导致请求无法发出&#xff0c;后…...

Rust 学习

Rust 官网&#xff1a;https://www.rust-lang.org/zh-CN/ 模块 库&#xff1a;https://crates.io/ 1、Rust 简介 Rust 语言的主要目标之一是解决传统 系统级编程语言&#xff08;如 C 和 C&#xff09;中常见的安全性问题&#xff0c;例如空指针引用、数据竞争等。为了实现这个…...

1.1 【应用开发】应用开发简介

写在前面 Screen图形子系统基于客户端/服务器模型&#xff0c;其中应用程序是请求图形服务的客户端&#xff08;Screen&#xff09;。它包括一个合成窗口系统作为这些服务之一&#xff0c;这意味着所有应用程序渲染都是在离屏缓冲区上执行的&#xff0c;然后可以在稍后用于更新…...

在windows系统搭建LVGL模拟器(codeblock工程)

1.codeblock准备 下载codeblock(mingw)&#xff0c;安装。可参考网上教程。 2.pc_simulator_win_codeblocks 工程获取 仓库地址&#xff1a;lvgl/lv_port_win_codeblocks: Windows PC simulator project for LVGL embedded GUI Library (github.com) 拉取代码到本地硬盘&…...

2023第十四届蓝桥杯国赛 C/C++ 大学 B 组

文章目录 前言试题 A: 子 2023作者思考题解答案 试题 B: 双子数作者思考题解 试题 C: 班级活动作者思考题解 试题 D: 合并数列作者思考题解 试题 E: 数三角作者思考题解 试题 F: 删边问题作者思考题解 试题 G: AB 路线作者思考题解 试题 H: 抓娃娃作者思考题解 试题 I: 拼数字试…...

如何在页面中加入百度地图

官方文档&#xff1a;jspopularGL | 百度地图API SDK (baidu.com) 添加一下代码就可以实现 <!DOCTYPE html> <html> <head><meta name"viewport" content"initial-scale1.0, user-scalableno"/><meta http-equiv"Conten…...

Windows VC++提升当前进程权限到管理员权限

Windows VC提升当前进程权限 Windows VC提升当前进程权限到管理员权限 Windows VC提升当前进程权限到管理员权限 有时候Windows下我们需要提升当前进程的权限到管理员权限&#xff0c;相关VC代码如下&#xff1a; #ifndef SAFE_CLOSE_HANDLE #define SAFE_CLOSE_HANDLE(handl…...

算法leetcode|92. 反转链表 II(rust重拳出击)

文章目录 92. 反转链表 II&#xff1a;样例 1&#xff1a;样例 2&#xff1a;提示&#xff1a;进阶&#xff1a; 分析&#xff1a;题解&#xff1a;rust&#xff1a;go&#xff1a;c&#xff1a;python&#xff1a;java&#xff1a; 92. 反转链表 II&#xff1a; 给你单链表的…...

Chapter 7 - 3. Congestion Management in Ethernet Storage Networks以太网存储网络的拥塞管理

Pause Threshold for Long Distance Links长途链路的暂停阈值 This section uses the following basic concepts: 本节使用以下基本概念: Bit Time (BT): It is the time taken to transmit one bit. It is the reciprocal of the bit rate. For example, BT of a 10 GbE po…...

优雅玩转实验室服务器(二)传输文件

使用服务器最重要的肯定是传输文件了&#xff0c;我们不仅需要本地的一些资源上传到服务器&#xff0c;好进行实验&#xff0c;也需要将服务器计算得到的实验结果传输到本地&#xff0c;来进行预览或者报告撰写。 首先&#xff0c;由于涉及到服务器操作&#xff0c;我强烈推荐…...

动态面板简介以及ERP原型图案列

动态面板简介以及ERP原型图案列 1.Axure动态面板简介2.使用Axure制作ERP登录界面3.使用Asure完成左侧菜单栏4.使用Axuer完成公告栏5.使用Axuer完成左边侧边栏 1.Axure动态面板简介 在Axure RP中&#xff0c;动态面板是一种强大的交互设计工具&#xff0c;它允许你创建可交互的…...

漏刻有时百度地图API实战开发(12)(切片工具的使用、添加自定义图层TileLayer)

TileLayer向地图中添加自定义图层 var tileLayer new BMap.TileLayer();tileLayer.getTilesUrl function (tileCoord, zoom) {var x tileCoord.x;var y tileCoord.y;return images/tiles/ zoom /tile- x _ y .png;}var lockMap new BMap.MapType(lock_map, tileLaye…...

python 爬虫 m3u8 视频文件 加密解密 整合mp4

文章目录 一、完整代码二、视频分析1. 认识m3u8文件2. 获取密钥&#xff0c;构建解密器3. 下载ts文件4. 合并ts文件为mp4 三、总结 一、完整代码 完整代码如下&#xff1a; import requests from multiprocessing import Pool import re import os from tqdm import tqdm fro…...

mybatis中xml文件容易搞混的属性

目录 第一章、1.1&#xff09;MyBatis中resultMap标签1.2&#xff09;MyBatis的resultType1.3&#xff09;MyBatis的parameterType1.4&#xff09;type属性1.5&#xff09;jdbcType属性1.6&#xff09;javaType属性1.7&#xff09;ofType属性 友情提醒: 先看文章目录&#xff…...

android Retrofit2.0请求 延长超时操作

import okhttp3.OkHttpClient; import retrofit2.Retrofit; import retrofit2.converter.gson.GsonConverterFactory;public class MyApiClient {private static final String BASE_URL "https://api.example.com/";// 创建 OkHttpClient&#xff0c;并设置超时时间…...

Axure之动态面板轮播图

目录 一.介绍 二.好处 三.动态面板轮播图 四.动态面板多方式登录 五.ERP登录 六.ERP的左侧菜单栏 七.ERP的公告栏 今天就到这了哦&#xff01;&#xff01;&#xff01;希望能帮到你了哦&#xff01;&#xff01;&#xff01; 一.介绍 Axure中的动态面板是一个非常有用的组…...

一文读懂算法中的时间复杂度和空间复杂度,O(1)、O(logn)、O(n)、O(n^2)、O(2^n) 附举例说明,常见的时间复杂度,空间复杂度

时间复杂度和空间复杂度是什么 时间复杂度&#xff08;Time Complexity&#xff09;是描述算法运行时间长短的一个度量。空间复杂度&#xff08;Space Complexity&#xff09;是描述算法在运行过程中所需要的存储空间大小的一个度量。 时间复杂度和空间复杂度是衡量算法性能…...

Xilinx Video IP实战:如何将HDMI输入转换为AXI4-Stream(附仿真+上板测试)

Xilinx Video IP实战&#xff1a;HDMI转AXI4-Stream全流程开发指南 在FPGA视频处理系统中&#xff0c;将HDMI等视频输入接口转换为标准化的AXI4-Stream协议是构建复杂视频处理流水线的关键第一步。不同于简单的接口转换&#xff0c;这一过程涉及视频时序解析、数据位宽适配、时…...

从轨迹到网络:广州休闲步行空间格局刻画 | 论文全解析与方法论深度拆解

从轨迹到网络:广州休闲步行空间格局刻画 | 论文全解析与方法论拆解 原文:From trajectories to network: Delineating the spatial pattern of recreational walking in Guangzhou》 一、论文核心概览:摘要与关键词 1.1 核心摘要解析 本文的核心内容可拆解为5个核心模块,…...

探索前沿技术趋势:2024年最值得关注的创新应用场景

1. 生成式AI的爆发式应用 2024年最让人兴奋的技术趋势&#xff0c;莫过于生成式AI从实验室走向千家万户。我最近测试了十几个主流AI创作工具&#xff0c;发现它们已经能完成许多过去认为"只有人类能做到"的任务。比如用Midjourney生成产品设计图&#xff0c;只需要简…...

用Rust还是JavaScript?Tauri 2.0系统托盘开发的两种姿势与选型建议

Tauri 2.0系统托盘开发&#xff1a;Rust与JavaScript的技术选型深度解析 当桌面应用需要常驻后台运行时&#xff0c;系统托盘功能便成为用户体验的关键组件。Tauri 2.0作为新一代跨平台桌面框架&#xff0c;允许开发者在前端JavaScript与后端Rust两种技术栈中实现这一功能。本文…...

保研党必看:用本科论文逆袭IEEE二区期刊的5个关键操作(含时间管理秘籍)

保研党必看&#xff1a;用本科论文逆袭IEEE二区期刊的5个关键操作&#xff08;含时间管理秘籍&#xff09; 在保研竞争日益激烈的当下&#xff0c;一篇高质量的学术论文往往能成为决定成败的关键。对于大多数本科生来说&#xff0c;科研经历有限、资源匮乏是普遍面临的困境。但…...

LEDPatternLib:非阻塞LED动画库设计与嵌入式实践

1. 项目概述LEDPatternLib 是一款面向嵌入式 LED 动画控制的轻量级、模块化 Arduino 库&#xff0c;专为资源受限的微控制器平台设计。其核心目标并非替代底层驱动&#xff0c;而是构建在成熟硬件抽象层之上的非阻塞&#xff08;non-blocking&#xff09;模式动画调度框架。该库…...

Antares LoRaWAN库深度解析:嵌入式LoRaWAN MAC层实现指南

1. Antares LoRaWAN 库深度技术解析&#xff1a;面向嵌入式工程师的 LoRaWAN MAC 层实现指南 1.1 库定位与工程价值 Antares LoRaWAN 是一个专为 Arduino 生态设计的轻量级 LoRaWAN MAC 层实现库&#xff0c;其核心价值不在于功能堆砌&#xff0c;而在于 可理解性、可调试性与…...

告别标注烦恼:用DINOv2自监督模型,在Intel Image数据集上3个epoch实现93%准确率

零标注成本实战&#xff1a;DINOv2自监督模型在Intel Image数据集上的高效迁移方案 当我在实验室第一次尝试用传统方法训练一个图像分类模型时&#xff0c;面对数千张需要手动标注的图片&#xff0c;几乎要放弃这个课题。直到发现了自监督学习这个宝藏领域——特别是DINOv2这样…...

嵌入式工程师技术成长路径:从单片机到Linux驱动开发

嵌入式工程师职业发展路径的技术思考1. 职业发展阶段与技术演进1.1 单片机开发阶段对于刚毕业的电子工程专业学生&#xff0c;单片机开发通常是职业起点。这一阶段主要涉及&#xff1a;8/16/32位微控制器(如STM32系列)的应用开发基础外设驱动开发(GPIO、UART、SPI、I2C等)实时操…...

天津专业的阀门厂排名

在天津&#xff0c;阀门行业发展态势良好&#xff0c;众多阀门厂各有特色与优势。中国通用机械工业协会最新发布的《2026年阀门行业高质量发展白皮书》显示&#xff0c;天津的阀门产业在技术创新、产品质量和市场份额等方面都有不错的表现。下面为大家介绍几家天津比较知名的阀…...