源码解析:Apache RocketMQ重置消费位点
引入
reset offset,即重置消费进度,一般在以下场景中使用:
- 需要重新消费已经消费过的消息,重置到最早位置或根据时间进行重置。
- 消息积压,不需要消费积压的消息,重置到最新位置,使其从最新位置开始消费。
重置到最早、或者根据时间进行重置与消息补发的区别?
● 消息补发是将原先的消息由生产者重发一次,与区别的那边消息本质上不是同一条消息(除了消息体一样以外)。
● 重置操作是操作消费位点(offset),本质上还是消费生产者之前发送的那条消息。
源码解析
重置offset起始调用位置:
org.apache.rocketmq.tools.admin.DefaultMQAdminExt#resetOffsetByTimestamporg.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetNewConcurrent
区别:
org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetNewConcurrent
● 这个看看用来并发的重置消费者的offset。可以多个consumer、多个queue可以同时进行处理。
org.apache.rocketmq.tools.admin.DefaultMQAdminExt#resetOffsetByTimestamp
● 用来根据给定的时间戳来重置消费者的偏移量。
这两个入口本质上都是resetOffset,没有本质上的区别,我们以resetOffsetNewConcurrent为例,具体流程如下图:

- 首先是examineTopicRouteInfo:主要是获取topic的路由信息,如果路由信息不存在,则无法进行后续操作。
- 再者是InvokeBrokerToResetOffset:根据上一步拿到的路由信息,遍历路由,一次向broker发起调用。
- 请求到达服务端(Broker端),判断是否是Broker端侧处理 ○ Broker端处理:
- 前置检查(look-ahead check):判断当前BrokerRole是否正确、检验当前Topic、ConsumerGroup是否存在,不满足任意条件,直接返回。
- 将传递过来的offset或者根据timestamp查询到的offset统一放置到queueOffsetMap中
- assignResetOffset:将上一步的queueOffsetMap的offset放到对应的resetOffsetTable和offsetTable中。
- 最后prepare reset result并返回response。
- Client端处理:
- 先执行queryOffset:查询当前topic下的group下offsetTable中是是否存储了offset信息,有就返回对应的值,没有返回-1;
- 前置检查(look-ahead check):检查上一步返回结果consumerOffset是否为-1,为-1表示当前group不存在;检查timeStampOffset是否满足条件;满足上述所有条件将timeStampOffset/consumerOffset中较为小的值放到offsetTable中,如果是C的客户端,直接将timestampOffset放入offsetTable中。
- 请求到达客户端后,先将对应的consumer挂起(suspend),清除ProcessQueue中的消息,在sleep 10s。
- 再执行updateConsumeOffset:更新consumerOffset。
- 最后再resume,继续消费。
补充:
如果是服务端重置,重置之后的offset会写入resetOffsetTable中,在后续进行拉取操作的时候会删除resetOffsetTable中对应的offset;如果queryThenEraseResetOffset中有返回值,将resetOffset作为GetMessageResult的nextBeginOffset,拉取操作用的offset。
public Long queryThenEraseResetOffset(String topic, String group, Integer queueId) {String key = topic + TOPIC_GROUP_SEPARATOR + group;ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);if (null == map) {return null;} else {return map.remove(queueId);}
}
参考:
● https://rocketmq.apache.org/
● https://github.com/apache/rocketmq
相关文章:
源码解析:Apache RocketMQ重置消费位点
引入 reset offset,即重置消费进度,一般在以下场景中使用: 需要重新消费已经消费过的消息,重置到最早位置或根据时间进行重置。消息积压,不需要消费积压的消息,重置到最新位置,使其从最新位置…...
Python 自动化之处理docx文件(一)
批量筛选docx文档中关键词 文章目录 批量筛选docx文档中关键词前言一、做成什么样子二、基本架构三、前期输入模块1.引入库2.路径输入3.关键词输入 三、数据处理模块1.基本架构2.如果是docx文档2.1.读取当前文档内容2.2.遍历匹配关键字2.3.触发匹配并记录日志 3.如果目录下还有…...
Vue mixins详解
文章目录 前言Vue中的mixins详解什么是mixins简单例子mixins的特点mixins与vuex的区别mixins与公共组件的区别前言 在Vue中,mixins是一种可重用的代码片段,可以在多个组件中共享。它可以包含组件的选项,如data、methods、computed等,以及生命周期钩子函数。 本文将详细介…...
ssl证书问题导致本地启动前端服务报500
报错如下:注意查看报错信息 问题:系统原是http,后台调整为https后,ssl证书有点问题, vue项目本地服务,使用代理,webpack默认,证书强校验,导致请求无法发出,后…...
Rust 学习
Rust 官网:https://www.rust-lang.org/zh-CN/ 模块 库:https://crates.io/ 1、Rust 简介 Rust 语言的主要目标之一是解决传统 系统级编程语言(如 C 和 C)中常见的安全性问题,例如空指针引用、数据竞争等。为了实现这个…...
1.1 【应用开发】应用开发简介
写在前面 Screen图形子系统基于客户端/服务器模型,其中应用程序是请求图形服务的客户端(Screen)。它包括一个合成窗口系统作为这些服务之一,这意味着所有应用程序渲染都是在离屏缓冲区上执行的,然后可以在稍后用于更新…...
在windows系统搭建LVGL模拟器(codeblock工程)
1.codeblock准备 下载codeblock(mingw),安装。可参考网上教程。 2.pc_simulator_win_codeblocks 工程获取 仓库地址:lvgl/lv_port_win_codeblocks: Windows PC simulator project for LVGL embedded GUI Library (github.com) 拉取代码到本地硬盘&…...
2023第十四届蓝桥杯国赛 C/C++ 大学 B 组
文章目录 前言试题 A: 子 2023作者思考题解答案 试题 B: 双子数作者思考题解 试题 C: 班级活动作者思考题解 试题 D: 合并数列作者思考题解 试题 E: 数三角作者思考题解 试题 F: 删边问题作者思考题解 试题 G: AB 路线作者思考题解 试题 H: 抓娃娃作者思考题解 试题 I: 拼数字试…...
如何在页面中加入百度地图
官方文档:jspopularGL | 百度地图API SDK (baidu.com) 添加一下代码就可以实现 <!DOCTYPE html> <html> <head><meta name"viewport" content"initial-scale1.0, user-scalableno"/><meta http-equiv"Conten…...
Windows VC++提升当前进程权限到管理员权限
Windows VC提升当前进程权限 Windows VC提升当前进程权限到管理员权限 Windows VC提升当前进程权限到管理员权限 有时候Windows下我们需要提升当前进程的权限到管理员权限,相关VC代码如下: #ifndef SAFE_CLOSE_HANDLE #define SAFE_CLOSE_HANDLE(handl…...
算法leetcode|92. 反转链表 II(rust重拳出击)
文章目录 92. 反转链表 II:样例 1:样例 2:提示:进阶: 分析:题解:rust:go:c:python:java: 92. 反转链表 II: 给你单链表的…...
Chapter 7 - 3. Congestion Management in Ethernet Storage Networks以太网存储网络的拥塞管理
Pause Threshold for Long Distance Links长途链路的暂停阈值 This section uses the following basic concepts: 本节使用以下基本概念: Bit Time (BT): It is the time taken to transmit one bit. It is the reciprocal of the bit rate. For example, BT of a 10 GbE po…...
优雅玩转实验室服务器(二)传输文件
使用服务器最重要的肯定是传输文件了,我们不仅需要本地的一些资源上传到服务器,好进行实验,也需要将服务器计算得到的实验结果传输到本地,来进行预览或者报告撰写。 首先,由于涉及到服务器操作,我强烈推荐…...
动态面板简介以及ERP原型图案列
动态面板简介以及ERP原型图案列 1.Axure动态面板简介2.使用Axure制作ERP登录界面3.使用Asure完成左侧菜单栏4.使用Axuer完成公告栏5.使用Axuer完成左边侧边栏 1.Axure动态面板简介 在Axure RP中,动态面板是一种强大的交互设计工具,它允许你创建可交互的…...
漏刻有时百度地图API实战开发(12)(切片工具的使用、添加自定义图层TileLayer)
TileLayer向地图中添加自定义图层 var tileLayer new BMap.TileLayer();tileLayer.getTilesUrl function (tileCoord, zoom) {var x tileCoord.x;var y tileCoord.y;return images/tiles/ zoom /tile- x _ y .png;}var lockMap new BMap.MapType(lock_map, tileLaye…...
python 爬虫 m3u8 视频文件 加密解密 整合mp4
文章目录 一、完整代码二、视频分析1. 认识m3u8文件2. 获取密钥,构建解密器3. 下载ts文件4. 合并ts文件为mp4 三、总结 一、完整代码 完整代码如下: import requests from multiprocessing import Pool import re import os from tqdm import tqdm fro…...
mybatis中xml文件容易搞混的属性
目录 第一章、1.1)MyBatis中resultMap标签1.2)MyBatis的resultType1.3)MyBatis的parameterType1.4)type属性1.5)jdbcType属性1.6)javaType属性1.7)ofType属性 友情提醒: 先看文章目录ÿ…...
android Retrofit2.0请求 延长超时操作
import okhttp3.OkHttpClient; import retrofit2.Retrofit; import retrofit2.converter.gson.GsonConverterFactory;public class MyApiClient {private static final String BASE_URL "https://api.example.com/";// 创建 OkHttpClient,并设置超时时间…...
Axure之动态面板轮播图
目录 一.介绍 二.好处 三.动态面板轮播图 四.动态面板多方式登录 五.ERP登录 六.ERP的左侧菜单栏 七.ERP的公告栏 今天就到这了哦!!!希望能帮到你了哦!!! 一.介绍 Axure中的动态面板是一个非常有用的组…...
一文读懂算法中的时间复杂度和空间复杂度,O(1)、O(logn)、O(n)、O(n^2)、O(2^n) 附举例说明,常见的时间复杂度,空间复杂度
时间复杂度和空间复杂度是什么 时间复杂度(Time Complexity)是描述算法运行时间长短的一个度量。空间复杂度(Space Complexity)是描述算法在运行过程中所需要的存储空间大小的一个度量。 时间复杂度和空间复杂度是衡量算法性能…...
Xilinx Video IP实战:如何将HDMI输入转换为AXI4-Stream(附仿真+上板测试)
Xilinx Video IP实战:HDMI转AXI4-Stream全流程开发指南 在FPGA视频处理系统中,将HDMI等视频输入接口转换为标准化的AXI4-Stream协议是构建复杂视频处理流水线的关键第一步。不同于简单的接口转换,这一过程涉及视频时序解析、数据位宽适配、时…...
从轨迹到网络:广州休闲步行空间格局刻画 | 论文全解析与方法论深度拆解
从轨迹到网络:广州休闲步行空间格局刻画 | 论文全解析与方法论拆解 原文:From trajectories to network: Delineating the spatial pattern of recreational walking in Guangzhou》 一、论文核心概览:摘要与关键词 1.1 核心摘要解析 本文的核心内容可拆解为5个核心模块,…...
探索前沿技术趋势:2024年最值得关注的创新应用场景
1. 生成式AI的爆发式应用 2024年最让人兴奋的技术趋势,莫过于生成式AI从实验室走向千家万户。我最近测试了十几个主流AI创作工具,发现它们已经能完成许多过去认为"只有人类能做到"的任务。比如用Midjourney生成产品设计图,只需要简…...
用Rust还是JavaScript?Tauri 2.0系统托盘开发的两种姿势与选型建议
Tauri 2.0系统托盘开发:Rust与JavaScript的技术选型深度解析 当桌面应用需要常驻后台运行时,系统托盘功能便成为用户体验的关键组件。Tauri 2.0作为新一代跨平台桌面框架,允许开发者在前端JavaScript与后端Rust两种技术栈中实现这一功能。本文…...
保研党必看:用本科论文逆袭IEEE二区期刊的5个关键操作(含时间管理秘籍)
保研党必看:用本科论文逆袭IEEE二区期刊的5个关键操作(含时间管理秘籍) 在保研竞争日益激烈的当下,一篇高质量的学术论文往往能成为决定成败的关键。对于大多数本科生来说,科研经历有限、资源匮乏是普遍面临的困境。但…...
LEDPatternLib:非阻塞LED动画库设计与嵌入式实践
1. 项目概述LEDPatternLib 是一款面向嵌入式 LED 动画控制的轻量级、模块化 Arduino 库,专为资源受限的微控制器平台设计。其核心目标并非替代底层驱动,而是构建在成熟硬件抽象层之上的非阻塞(non-blocking)模式动画调度框架。该库…...
Antares LoRaWAN库深度解析:嵌入式LoRaWAN MAC层实现指南
1. Antares LoRaWAN 库深度技术解析:面向嵌入式工程师的 LoRaWAN MAC 层实现指南 1.1 库定位与工程价值 Antares LoRaWAN 是一个专为 Arduino 生态设计的轻量级 LoRaWAN MAC 层实现库,其核心价值不在于功能堆砌,而在于 可理解性、可调试性与…...
告别标注烦恼:用DINOv2自监督模型,在Intel Image数据集上3个epoch实现93%准确率
零标注成本实战:DINOv2自监督模型在Intel Image数据集上的高效迁移方案 当我在实验室第一次尝试用传统方法训练一个图像分类模型时,面对数千张需要手动标注的图片,几乎要放弃这个课题。直到发现了自监督学习这个宝藏领域——特别是DINOv2这样…...
嵌入式工程师技术成长路径:从单片机到Linux驱动开发
嵌入式工程师职业发展路径的技术思考1. 职业发展阶段与技术演进1.1 单片机开发阶段对于刚毕业的电子工程专业学生,单片机开发通常是职业起点。这一阶段主要涉及:8/16/32位微控制器(如STM32系列)的应用开发基础外设驱动开发(GPIO、UART、SPI、I2C等)实时操…...
天津专业的阀门厂排名
在天津,阀门行业发展态势良好,众多阀门厂各有特色与优势。中国通用机械工业协会最新发布的《2026年阀门行业高质量发展白皮书》显示,天津的阀门产业在技术创新、产品质量和市场份额等方面都有不错的表现。下面为大家介绍几家天津比较知名的阀…...
