源码解析:Apache RocketMQ重置消费位点
引入
reset offset,即重置消费进度,一般在以下场景中使用:
- 需要重新消费已经消费过的消息,重置到最早位置或根据时间进行重置。
- 消息积压,不需要消费积压的消息,重置到最新位置,使其从最新位置开始消费。
重置到最早、或者根据时间进行重置与消息补发的区别?
● 消息补发是将原先的消息由生产者重发一次,与区别的那边消息本质上不是同一条消息(除了消息体一样以外)。
● 重置操作是操作消费位点(offset),本质上还是消费生产者之前发送的那条消息。
源码解析
重置offset起始调用位置:
org.apache.rocketmq.tools.admin.DefaultMQAdminExt#resetOffsetByTimestamporg.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetNewConcurrent
区别:
org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetNewConcurrent
● 这个看看用来并发的重置消费者的offset。可以多个consumer、多个queue可以同时进行处理。
org.apache.rocketmq.tools.admin.DefaultMQAdminExt#resetOffsetByTimestamp
● 用来根据给定的时间戳来重置消费者的偏移量。
这两个入口本质上都是resetOffset,没有本质上的区别,我们以resetOffsetNewConcurrent为例,具体流程如下图:

- 首先是examineTopicRouteInfo:主要是获取topic的路由信息,如果路由信息不存在,则无法进行后续操作。
- 再者是InvokeBrokerToResetOffset:根据上一步拿到的路由信息,遍历路由,一次向broker发起调用。
- 请求到达服务端(Broker端),判断是否是Broker端侧处理 ○ Broker端处理:
- 前置检查(look-ahead check):判断当前BrokerRole是否正确、检验当前Topic、ConsumerGroup是否存在,不满足任意条件,直接返回。
- 将传递过来的offset或者根据timestamp查询到的offset统一放置到queueOffsetMap中
- assignResetOffset:将上一步的queueOffsetMap的offset放到对应的resetOffsetTable和offsetTable中。
- 最后prepare reset result并返回response。
- Client端处理:
- 先执行queryOffset:查询当前topic下的group下offsetTable中是是否存储了offset信息,有就返回对应的值,没有返回-1;
- 前置检查(look-ahead check):检查上一步返回结果consumerOffset是否为-1,为-1表示当前group不存在;检查timeStampOffset是否满足条件;满足上述所有条件将timeStampOffset/consumerOffset中较为小的值放到offsetTable中,如果是C的客户端,直接将timestampOffset放入offsetTable中。
- 请求到达客户端后,先将对应的consumer挂起(suspend),清除ProcessQueue中的消息,在sleep 10s。
- 再执行updateConsumeOffset:更新consumerOffset。
- 最后再resume,继续消费。
补充:
如果是服务端重置,重置之后的offset会写入resetOffsetTable中,在后续进行拉取操作的时候会删除resetOffsetTable中对应的offset;如果queryThenEraseResetOffset中有返回值,将resetOffset作为GetMessageResult的nextBeginOffset,拉取操作用的offset。
public Long queryThenEraseResetOffset(String topic, String group, Integer queueId) {String key = topic + TOPIC_GROUP_SEPARATOR + group;ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);if (null == map) {return null;} else {return map.remove(queueId);}
}
参考:
● https://rocketmq.apache.org/
● https://github.com/apache/rocketmq
相关文章:
源码解析:Apache RocketMQ重置消费位点
引入 reset offset,即重置消费进度,一般在以下场景中使用: 需要重新消费已经消费过的消息,重置到最早位置或根据时间进行重置。消息积压,不需要消费积压的消息,重置到最新位置,使其从最新位置…...
Python 自动化之处理docx文件(一)
批量筛选docx文档中关键词 文章目录 批量筛选docx文档中关键词前言一、做成什么样子二、基本架构三、前期输入模块1.引入库2.路径输入3.关键词输入 三、数据处理模块1.基本架构2.如果是docx文档2.1.读取当前文档内容2.2.遍历匹配关键字2.3.触发匹配并记录日志 3.如果目录下还有…...
Vue mixins详解
文章目录 前言Vue中的mixins详解什么是mixins简单例子mixins的特点mixins与vuex的区别mixins与公共组件的区别前言 在Vue中,mixins是一种可重用的代码片段,可以在多个组件中共享。它可以包含组件的选项,如data、methods、computed等,以及生命周期钩子函数。 本文将详细介…...
ssl证书问题导致本地启动前端服务报500
报错如下:注意查看报错信息 问题:系统原是http,后台调整为https后,ssl证书有点问题, vue项目本地服务,使用代理,webpack默认,证书强校验,导致请求无法发出,后…...
Rust 学习
Rust 官网:https://www.rust-lang.org/zh-CN/ 模块 库:https://crates.io/ 1、Rust 简介 Rust 语言的主要目标之一是解决传统 系统级编程语言(如 C 和 C)中常见的安全性问题,例如空指针引用、数据竞争等。为了实现这个…...
1.1 【应用开发】应用开发简介
写在前面 Screen图形子系统基于客户端/服务器模型,其中应用程序是请求图形服务的客户端(Screen)。它包括一个合成窗口系统作为这些服务之一,这意味着所有应用程序渲染都是在离屏缓冲区上执行的,然后可以在稍后用于更新…...
在windows系统搭建LVGL模拟器(codeblock工程)
1.codeblock准备 下载codeblock(mingw),安装。可参考网上教程。 2.pc_simulator_win_codeblocks 工程获取 仓库地址:lvgl/lv_port_win_codeblocks: Windows PC simulator project for LVGL embedded GUI Library (github.com) 拉取代码到本地硬盘&…...
2023第十四届蓝桥杯国赛 C/C++ 大学 B 组
文章目录 前言试题 A: 子 2023作者思考题解答案 试题 B: 双子数作者思考题解 试题 C: 班级活动作者思考题解 试题 D: 合并数列作者思考题解 试题 E: 数三角作者思考题解 试题 F: 删边问题作者思考题解 试题 G: AB 路线作者思考题解 试题 H: 抓娃娃作者思考题解 试题 I: 拼数字试…...
如何在页面中加入百度地图
官方文档:jspopularGL | 百度地图API SDK (baidu.com) 添加一下代码就可以实现 <!DOCTYPE html> <html> <head><meta name"viewport" content"initial-scale1.0, user-scalableno"/><meta http-equiv"Conten…...
Windows VC++提升当前进程权限到管理员权限
Windows VC提升当前进程权限 Windows VC提升当前进程权限到管理员权限 Windows VC提升当前进程权限到管理员权限 有时候Windows下我们需要提升当前进程的权限到管理员权限,相关VC代码如下: #ifndef SAFE_CLOSE_HANDLE #define SAFE_CLOSE_HANDLE(handl…...
算法leetcode|92. 反转链表 II(rust重拳出击)
文章目录 92. 反转链表 II:样例 1:样例 2:提示:进阶: 分析:题解:rust:go:c:python:java: 92. 反转链表 II: 给你单链表的…...
Chapter 7 - 3. Congestion Management in Ethernet Storage Networks以太网存储网络的拥塞管理
Pause Threshold for Long Distance Links长途链路的暂停阈值 This section uses the following basic concepts: 本节使用以下基本概念: Bit Time (BT): It is the time taken to transmit one bit. It is the reciprocal of the bit rate. For example, BT of a 10 GbE po…...
优雅玩转实验室服务器(二)传输文件
使用服务器最重要的肯定是传输文件了,我们不仅需要本地的一些资源上传到服务器,好进行实验,也需要将服务器计算得到的实验结果传输到本地,来进行预览或者报告撰写。 首先,由于涉及到服务器操作,我强烈推荐…...
动态面板简介以及ERP原型图案列
动态面板简介以及ERP原型图案列 1.Axure动态面板简介2.使用Axure制作ERP登录界面3.使用Asure完成左侧菜单栏4.使用Axuer完成公告栏5.使用Axuer完成左边侧边栏 1.Axure动态面板简介 在Axure RP中,动态面板是一种强大的交互设计工具,它允许你创建可交互的…...
漏刻有时百度地图API实战开发(12)(切片工具的使用、添加自定义图层TileLayer)
TileLayer向地图中添加自定义图层 var tileLayer new BMap.TileLayer();tileLayer.getTilesUrl function (tileCoord, zoom) {var x tileCoord.x;var y tileCoord.y;return images/tiles/ zoom /tile- x _ y .png;}var lockMap new BMap.MapType(lock_map, tileLaye…...
python 爬虫 m3u8 视频文件 加密解密 整合mp4
文章目录 一、完整代码二、视频分析1. 认识m3u8文件2. 获取密钥,构建解密器3. 下载ts文件4. 合并ts文件为mp4 三、总结 一、完整代码 完整代码如下: import requests from multiprocessing import Pool import re import os from tqdm import tqdm fro…...
mybatis中xml文件容易搞混的属性
目录 第一章、1.1)MyBatis中resultMap标签1.2)MyBatis的resultType1.3)MyBatis的parameterType1.4)type属性1.5)jdbcType属性1.6)javaType属性1.7)ofType属性 友情提醒: 先看文章目录ÿ…...
android Retrofit2.0请求 延长超时操作
import okhttp3.OkHttpClient; import retrofit2.Retrofit; import retrofit2.converter.gson.GsonConverterFactory;public class MyApiClient {private static final String BASE_URL "https://api.example.com/";// 创建 OkHttpClient,并设置超时时间…...
Axure之动态面板轮播图
目录 一.介绍 二.好处 三.动态面板轮播图 四.动态面板多方式登录 五.ERP登录 六.ERP的左侧菜单栏 七.ERP的公告栏 今天就到这了哦!!!希望能帮到你了哦!!! 一.介绍 Axure中的动态面板是一个非常有用的组…...
一文读懂算法中的时间复杂度和空间复杂度,O(1)、O(logn)、O(n)、O(n^2)、O(2^n) 附举例说明,常见的时间复杂度,空间复杂度
时间复杂度和空间复杂度是什么 时间复杂度(Time Complexity)是描述算法运行时间长短的一个度量。空间复杂度(Space Complexity)是描述算法在运行过程中所需要的存储空间大小的一个度量。 时间复杂度和空间复杂度是衡量算法性能…...
React第五十七节 Router中RouterProvider使用详解及注意事项
前言 在 React Router v6.4 中,RouterProvider 是一个核心组件,用于提供基于数据路由(data routers)的新型路由方案。 它替代了传统的 <BrowserRouter>,支持更强大的数据加载和操作功能(如 loader 和…...
循环冗余码校验CRC码 算法步骤+详细实例计算
通信过程:(白话解释) 我们将原始待发送的消息称为 M M M,依据发送接收消息双方约定的生成多项式 G ( x ) G(x) G(x)(意思就是 G ( x ) G(x) G(x) 是已知的)࿰…...
新能源汽车智慧充电桩管理方案:新能源充电桩散热问题及消防安全监管方案
随着新能源汽车的快速普及,充电桩作为核心配套设施,其安全性与可靠性备受关注。然而,在高温、高负荷运行环境下,充电桩的散热问题与消防安全隐患日益凸显,成为制约行业发展的关键瓶颈。 如何通过智慧化管理手段优化散…...
Spring Boot面试题精选汇总
🤟致敬读者 🟩感谢阅读🟦笑口常开🟪生日快乐⬛早点睡觉 📘博主相关 🟧博主信息🟨博客首页🟫专栏推荐🟥活动信息 文章目录 Spring Boot面试题精选汇总⚙️ **一、核心概…...
根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:
根据万维钢精英日课6的内容,使用AI(2025)可以参考以下方法: 四个洞见 模型已经比人聪明:以ChatGPT o3为代表的AI非常强大,能运用高级理论解释道理、引用最新学术论文,生成对顶尖科学家都有用的…...
vue3+vite项目中使用.env文件环境变量方法
vue3vite项目中使用.env文件环境变量方法 .env文件作用命名规则常用的配置项示例使用方法注意事项在vite.config.js文件中读取环境变量方法 .env文件作用 .env 文件用于定义环境变量,这些变量可以在项目中通过 import.meta.env 进行访问。Vite 会自动加载这些环境变…...
全志A40i android7.1 调试信息打印串口由uart0改为uart3
一,概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本:2014.07; Kernel版本:Linux-3.10; 二,Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01),并让boo…...
保姆级教程:在无网络无显卡的Windows电脑的vscode本地部署deepseek
文章目录 1 前言2 部署流程2.1 准备工作2.2 Ollama2.2.1 使用有网络的电脑下载Ollama2.2.2 安装Ollama(有网络的电脑)2.2.3 安装Ollama(无网络的电脑)2.2.4 安装验证2.2.5 修改大模型安装位置2.2.6 下载Deepseek模型 2.3 将deepse…...
Web后端基础(基础知识)
BS架构:Browser/Server,浏览器/服务器架构模式。客户端只需要浏览器,应用程序的逻辑和数据都存储在服务端。 优点:维护方便缺点:体验一般 CS架构:Client/Server,客户端/服务器架构模式。需要单独…...
DBLP数据库是什么?
DBLP(Digital Bibliography & Library Project)Computer Science Bibliography是全球著名的计算机科学出版物的开放书目数据库。DBLP所收录的期刊和会议论文质量较高,数据库文献更新速度很快,很好地反映了国际计算机科学学术研…...
