源码解析:Apache RocketMQ重置消费位点
引入
reset offset,即重置消费进度,一般在以下场景中使用:
- 需要重新消费已经消费过的消息,重置到最早位置或根据时间进行重置。
- 消息积压,不需要消费积压的消息,重置到最新位置,使其从最新位置开始消费。
重置到最早、或者根据时间进行重置与消息补发的区别?
● 消息补发是将原先的消息由生产者重发一次,与区别的那边消息本质上不是同一条消息(除了消息体一样以外)。
● 重置操作是操作消费位点(offset),本质上还是消费生产者之前发送的那条消息。
源码解析
重置offset起始调用位置:
org.apache.rocketmq.tools.admin.DefaultMQAdminExt#resetOffsetByTimestamporg.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetNewConcurrent
区别:
org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetNewConcurrent
● 这个看看用来并发的重置消费者的offset。可以多个consumer、多个queue可以同时进行处理。
org.apache.rocketmq.tools.admin.DefaultMQAdminExt#resetOffsetByTimestamp
● 用来根据给定的时间戳来重置消费者的偏移量。
这两个入口本质上都是resetOffset,没有本质上的区别,我们以resetOffsetNewConcurrent为例,具体流程如下图:
- 首先是examineTopicRouteInfo:主要是获取topic的路由信息,如果路由信息不存在,则无法进行后续操作。
- 再者是InvokeBrokerToResetOffset:根据上一步拿到的路由信息,遍历路由,一次向broker发起调用。
- 请求到达服务端(Broker端),判断是否是Broker端侧处理 ○ Broker端处理:
- 前置检查(look-ahead check):判断当前BrokerRole是否正确、检验当前Topic、ConsumerGroup是否存在,不满足任意条件,直接返回。
- 将传递过来的offset或者根据timestamp查询到的offset统一放置到queueOffsetMap中
- assignResetOffset:将上一步的queueOffsetMap的offset放到对应的resetOffsetTable和offsetTable中。
- 最后prepare reset result并返回response。
- Client端处理:
- 先执行queryOffset:查询当前topic下的group下offsetTable中是是否存储了offset信息,有就返回对应的值,没有返回-1;
- 前置检查(look-ahead check):检查上一步返回结果consumerOffset是否为-1,为-1表示当前group不存在;检查timeStampOffset是否满足条件;满足上述所有条件将timeStampOffset/consumerOffset中较为小的值放到offsetTable中,如果是C的客户端,直接将timestampOffset放入offsetTable中。
- 请求到达客户端后,先将对应的consumer挂起(suspend),清除ProcessQueue中的消息,在sleep 10s。
- 再执行updateConsumeOffset:更新consumerOffset。
- 最后再resume,继续消费。
补充:
如果是服务端重置,重置之后的offset会写入resetOffsetTable中,在后续进行拉取操作的时候会删除resetOffsetTable中对应的offset;如果queryThenEraseResetOffset中有返回值,将resetOffset作为GetMessageResult的nextBeginOffset,拉取操作用的offset。
public Long queryThenEraseResetOffset(String topic, String group, Integer queueId) {String key = topic + TOPIC_GROUP_SEPARATOR + group;ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);if (null == map) {return null;} else {return map.remove(queueId);}
}
参考:
● https://rocketmq.apache.org/
● https://github.com/apache/rocketmq
相关文章:

源码解析:Apache RocketMQ重置消费位点
引入 reset offset,即重置消费进度,一般在以下场景中使用: 需要重新消费已经消费过的消息,重置到最早位置或根据时间进行重置。消息积压,不需要消费积压的消息,重置到最新位置,使其从最新位置…...

Python 自动化之处理docx文件(一)
批量筛选docx文档中关键词 文章目录 批量筛选docx文档中关键词前言一、做成什么样子二、基本架构三、前期输入模块1.引入库2.路径输入3.关键词输入 三、数据处理模块1.基本架构2.如果是docx文档2.1.读取当前文档内容2.2.遍历匹配关键字2.3.触发匹配并记录日志 3.如果目录下还有…...
Vue mixins详解
文章目录 前言Vue中的mixins详解什么是mixins简单例子mixins的特点mixins与vuex的区别mixins与公共组件的区别前言 在Vue中,mixins是一种可重用的代码片段,可以在多个组件中共享。它可以包含组件的选项,如data、methods、computed等,以及生命周期钩子函数。 本文将详细介…...

ssl证书问题导致本地启动前端服务报500
报错如下:注意查看报错信息 问题:系统原是http,后台调整为https后,ssl证书有点问题, vue项目本地服务,使用代理,webpack默认,证书强校验,导致请求无法发出,后…...

Rust 学习
Rust 官网:https://www.rust-lang.org/zh-CN/ 模块 库:https://crates.io/ 1、Rust 简介 Rust 语言的主要目标之一是解决传统 系统级编程语言(如 C 和 C)中常见的安全性问题,例如空指针引用、数据竞争等。为了实现这个…...

1.1 【应用开发】应用开发简介
写在前面 Screen图形子系统基于客户端/服务器模型,其中应用程序是请求图形服务的客户端(Screen)。它包括一个合成窗口系统作为这些服务之一,这意味着所有应用程序渲染都是在离屏缓冲区上执行的,然后可以在稍后用于更新…...

在windows系统搭建LVGL模拟器(codeblock工程)
1.codeblock准备 下载codeblock(mingw),安装。可参考网上教程。 2.pc_simulator_win_codeblocks 工程获取 仓库地址:lvgl/lv_port_win_codeblocks: Windows PC simulator project for LVGL embedded GUI Library (github.com) 拉取代码到本地硬盘&…...
2023第十四届蓝桥杯国赛 C/C++ 大学 B 组
文章目录 前言试题 A: 子 2023作者思考题解答案 试题 B: 双子数作者思考题解 试题 C: 班级活动作者思考题解 试题 D: 合并数列作者思考题解 试题 E: 数三角作者思考题解 试题 F: 删边问题作者思考题解 试题 G: AB 路线作者思考题解 试题 H: 抓娃娃作者思考题解 试题 I: 拼数字试…...

如何在页面中加入百度地图
官方文档:jspopularGL | 百度地图API SDK (baidu.com) 添加一下代码就可以实现 <!DOCTYPE html> <html> <head><meta name"viewport" content"initial-scale1.0, user-scalableno"/><meta http-equiv"Conten…...
Windows VC++提升当前进程权限到管理员权限
Windows VC提升当前进程权限 Windows VC提升当前进程权限到管理员权限 Windows VC提升当前进程权限到管理员权限 有时候Windows下我们需要提升当前进程的权限到管理员权限,相关VC代码如下: #ifndef SAFE_CLOSE_HANDLE #define SAFE_CLOSE_HANDLE(handl…...

算法leetcode|92. 反转链表 II(rust重拳出击)
文章目录 92. 反转链表 II:样例 1:样例 2:提示:进阶: 分析:题解:rust:go:c:python:java: 92. 反转链表 II: 给你单链表的…...

Chapter 7 - 3. Congestion Management in Ethernet Storage Networks以太网存储网络的拥塞管理
Pause Threshold for Long Distance Links长途链路的暂停阈值 This section uses the following basic concepts: 本节使用以下基本概念: Bit Time (BT): It is the time taken to transmit one bit. It is the reciprocal of the bit rate. For example, BT of a 10 GbE po…...

优雅玩转实验室服务器(二)传输文件
使用服务器最重要的肯定是传输文件了,我们不仅需要本地的一些资源上传到服务器,好进行实验,也需要将服务器计算得到的实验结果传输到本地,来进行预览或者报告撰写。 首先,由于涉及到服务器操作,我强烈推荐…...

动态面板简介以及ERP原型图案列
动态面板简介以及ERP原型图案列 1.Axure动态面板简介2.使用Axure制作ERP登录界面3.使用Asure完成左侧菜单栏4.使用Axuer完成公告栏5.使用Axuer完成左边侧边栏 1.Axure动态面板简介 在Axure RP中,动态面板是一种强大的交互设计工具,它允许你创建可交互的…...

漏刻有时百度地图API实战开发(12)(切片工具的使用、添加自定义图层TileLayer)
TileLayer向地图中添加自定义图层 var tileLayer new BMap.TileLayer();tileLayer.getTilesUrl function (tileCoord, zoom) {var x tileCoord.x;var y tileCoord.y;return images/tiles/ zoom /tile- x _ y .png;}var lockMap new BMap.MapType(lock_map, tileLaye…...

python 爬虫 m3u8 视频文件 加密解密 整合mp4
文章目录 一、完整代码二、视频分析1. 认识m3u8文件2. 获取密钥,构建解密器3. 下载ts文件4. 合并ts文件为mp4 三、总结 一、完整代码 完整代码如下: import requests from multiprocessing import Pool import re import os from tqdm import tqdm fro…...
mybatis中xml文件容易搞混的属性
目录 第一章、1.1)MyBatis中resultMap标签1.2)MyBatis的resultType1.3)MyBatis的parameterType1.4)type属性1.5)jdbcType属性1.6)javaType属性1.7)ofType属性 友情提醒: 先看文章目录ÿ…...
android Retrofit2.0请求 延长超时操作
import okhttp3.OkHttpClient; import retrofit2.Retrofit; import retrofit2.converter.gson.GsonConverterFactory;public class MyApiClient {private static final String BASE_URL "https://api.example.com/";// 创建 OkHttpClient,并设置超时时间…...

Axure之动态面板轮播图
目录 一.介绍 二.好处 三.动态面板轮播图 四.动态面板多方式登录 五.ERP登录 六.ERP的左侧菜单栏 七.ERP的公告栏 今天就到这了哦!!!希望能帮到你了哦!!! 一.介绍 Axure中的动态面板是一个非常有用的组…...
一文读懂算法中的时间复杂度和空间复杂度,O(1)、O(logn)、O(n)、O(n^2)、O(2^n) 附举例说明,常见的时间复杂度,空间复杂度
时间复杂度和空间复杂度是什么 时间复杂度(Time Complexity)是描述算法运行时间长短的一个度量。空间复杂度(Space Complexity)是描述算法在运行过程中所需要的存储空间大小的一个度量。 时间复杂度和空间复杂度是衡量算法性能…...

超短脉冲激光自聚焦效应
前言与目录 强激光引起自聚焦效应机理 超短脉冲激光在脆性材料内部加工时引起的自聚焦效应,这是一种非线性光学现象,主要涉及光学克尔效应和材料的非线性光学特性。 自聚焦效应可以产生局部的强光场,对材料产生非线性响应,可能…...
uni-app学习笔记二十二---使用vite.config.js全局导入常用依赖
在前面的练习中,每个页面需要使用ref,onShow等生命周期钩子函数时都需要像下面这样导入 import {onMounted, ref} from "vue" 如果不想每个页面都导入,需要使用node.js命令npm安装unplugin-auto-import npm install unplugin-au…...
Java多线程实现之Callable接口深度解析
Java多线程实现之Callable接口深度解析 一、Callable接口概述1.1 接口定义1.2 与Runnable接口的对比1.3 Future接口与FutureTask类 二、Callable接口的基本使用方法2.1 传统方式实现Callable接口2.2 使用Lambda表达式简化Callable实现2.3 使用FutureTask类执行Callable任务 三、…...
C# SqlSugar:依赖注入与仓储模式实践
C# SqlSugar:依赖注入与仓储模式实践 在 C# 的应用开发中,数据库操作是必不可少的环节。为了让数据访问层更加简洁、高效且易于维护,许多开发者会选择成熟的 ORM(对象关系映射)框架,SqlSugar 就是其中备受…...

代码规范和架构【立芯理论一】(2025.06.08)
1、代码规范的目标 代码简洁精炼、美观,可持续性好高效率高复用,可移植性好高内聚,低耦合没有冗余规范性,代码有规可循,可以看出自己当时的思考过程特殊排版,特殊语法,特殊指令,必须…...
【Android】Android 开发 ADB 常用指令
查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...

Linux中《基础IO》详细介绍
目录 理解"文件"狭义理解广义理解文件操作的归类认知系统角度文件类别 回顾C文件接口打开文件写文件读文件稍作修改,实现简单cat命令 输出信息到显示器,你有哪些方法stdin & stdout & stderr打开文件的方式 系统⽂件I/O⼀种传递标志位…...

C++中vector类型的介绍和使用
文章目录 一、vector 类型的简介1.1 基本介绍1.2 常见用法示例1.3 常见成员函数简表 二、vector 数据的插入2.1 push_back() —— 在尾部插入一个元素2.2 emplace_back() —— 在尾部“就地”构造对象2.3 insert() —— 在任意位置插入一个或多个元素2.4 emplace() —— 在任意…...
Linux信号保存与处理机制详解
Linux信号的保存与处理涉及多个关键机制,以下是详细的总结: 1. 信号的保存 进程描述符(task_struct):每个进程的PCB中包含信号相关信息。 pending信号集:记录已到达但未处理的信号(未决信号&a…...

Git 使用大全:从入门到精通
Git 是目前最流行的分布式版本控制系统,被广泛应用于软件开发中。本文将全面介绍 Git 的各种功能和使用方法,包含大量代码示例和实践建议。 文章目录 Git 基础概念版本控制系统Git 的特点Git 的三个区域Git 文件状态 Git 安装与配置安装 GitLinuxmacOSWi…...