源码解析:Apache RocketMQ重置消费位点
引入
reset offset,即重置消费进度,一般在以下场景中使用:
- 需要重新消费已经消费过的消息,重置到最早位置或根据时间进行重置。
- 消息积压,不需要消费积压的消息,重置到最新位置,使其从最新位置开始消费。
重置到最早、或者根据时间进行重置与消息补发的区别?
● 消息补发是将原先的消息由生产者重发一次,与区别的那边消息本质上不是同一条消息(除了消息体一样以外)。
● 重置操作是操作消费位点(offset),本质上还是消费生产者之前发送的那条消息。
源码解析
重置offset起始调用位置:
org.apache.rocketmq.tools.admin.DefaultMQAdminExt#resetOffsetByTimestamporg.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetNewConcurrent
区别:
org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetNewConcurrent
● 这个看看用来并发的重置消费者的offset。可以多个consumer、多个queue可以同时进行处理。
org.apache.rocketmq.tools.admin.DefaultMQAdminExt#resetOffsetByTimestamp
● 用来根据给定的时间戳来重置消费者的偏移量。
这两个入口本质上都是resetOffset,没有本质上的区别,我们以resetOffsetNewConcurrent为例,具体流程如下图:

- 首先是examineTopicRouteInfo:主要是获取topic的路由信息,如果路由信息不存在,则无法进行后续操作。
- 再者是InvokeBrokerToResetOffset:根据上一步拿到的路由信息,遍历路由,一次向broker发起调用。
- 请求到达服务端(Broker端),判断是否是Broker端侧处理 ○ Broker端处理:
- 前置检查(look-ahead check):判断当前BrokerRole是否正确、检验当前Topic、ConsumerGroup是否存在,不满足任意条件,直接返回。
- 将传递过来的offset或者根据timestamp查询到的offset统一放置到queueOffsetMap中
- assignResetOffset:将上一步的queueOffsetMap的offset放到对应的resetOffsetTable和offsetTable中。
- 最后prepare reset result并返回response。
- Client端处理:
- 先执行queryOffset:查询当前topic下的group下offsetTable中是是否存储了offset信息,有就返回对应的值,没有返回-1;
- 前置检查(look-ahead check):检查上一步返回结果consumerOffset是否为-1,为-1表示当前group不存在;检查timeStampOffset是否满足条件;满足上述所有条件将timeStampOffset/consumerOffset中较为小的值放到offsetTable中,如果是C的客户端,直接将timestampOffset放入offsetTable中。
- 请求到达客户端后,先将对应的consumer挂起(suspend),清除ProcessQueue中的消息,在sleep 10s。
- 再执行updateConsumeOffset:更新consumerOffset。
- 最后再resume,继续消费。
补充:
如果是服务端重置,重置之后的offset会写入resetOffsetTable中,在后续进行拉取操作的时候会删除resetOffsetTable中对应的offset;如果queryThenEraseResetOffset中有返回值,将resetOffset作为GetMessageResult的nextBeginOffset,拉取操作用的offset。
public Long queryThenEraseResetOffset(String topic, String group, Integer queueId) {String key = topic + TOPIC_GROUP_SEPARATOR + group;ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);if (null == map) {return null;} else {return map.remove(queueId);}
}
参考:
● https://rocketmq.apache.org/
● https://github.com/apache/rocketmq
相关文章:
源码解析:Apache RocketMQ重置消费位点
引入 reset offset,即重置消费进度,一般在以下场景中使用: 需要重新消费已经消费过的消息,重置到最早位置或根据时间进行重置。消息积压,不需要消费积压的消息,重置到最新位置,使其从最新位置…...
Python 自动化之处理docx文件(一)
批量筛选docx文档中关键词 文章目录 批量筛选docx文档中关键词前言一、做成什么样子二、基本架构三、前期输入模块1.引入库2.路径输入3.关键词输入 三、数据处理模块1.基本架构2.如果是docx文档2.1.读取当前文档内容2.2.遍历匹配关键字2.3.触发匹配并记录日志 3.如果目录下还有…...
Vue mixins详解
文章目录 前言Vue中的mixins详解什么是mixins简单例子mixins的特点mixins与vuex的区别mixins与公共组件的区别前言 在Vue中,mixins是一种可重用的代码片段,可以在多个组件中共享。它可以包含组件的选项,如data、methods、computed等,以及生命周期钩子函数。 本文将详细介…...
ssl证书问题导致本地启动前端服务报500
报错如下:注意查看报错信息 问题:系统原是http,后台调整为https后,ssl证书有点问题, vue项目本地服务,使用代理,webpack默认,证书强校验,导致请求无法发出,后…...
Rust 学习
Rust 官网:https://www.rust-lang.org/zh-CN/ 模块 库:https://crates.io/ 1、Rust 简介 Rust 语言的主要目标之一是解决传统 系统级编程语言(如 C 和 C)中常见的安全性问题,例如空指针引用、数据竞争等。为了实现这个…...
1.1 【应用开发】应用开发简介
写在前面 Screen图形子系统基于客户端/服务器模型,其中应用程序是请求图形服务的客户端(Screen)。它包括一个合成窗口系统作为这些服务之一,这意味着所有应用程序渲染都是在离屏缓冲区上执行的,然后可以在稍后用于更新…...
在windows系统搭建LVGL模拟器(codeblock工程)
1.codeblock准备 下载codeblock(mingw),安装。可参考网上教程。 2.pc_simulator_win_codeblocks 工程获取 仓库地址:lvgl/lv_port_win_codeblocks: Windows PC simulator project for LVGL embedded GUI Library (github.com) 拉取代码到本地硬盘&…...
2023第十四届蓝桥杯国赛 C/C++ 大学 B 组
文章目录 前言试题 A: 子 2023作者思考题解答案 试题 B: 双子数作者思考题解 试题 C: 班级活动作者思考题解 试题 D: 合并数列作者思考题解 试题 E: 数三角作者思考题解 试题 F: 删边问题作者思考题解 试题 G: AB 路线作者思考题解 试题 H: 抓娃娃作者思考题解 试题 I: 拼数字试…...
如何在页面中加入百度地图
官方文档:jspopularGL | 百度地图API SDK (baidu.com) 添加一下代码就可以实现 <!DOCTYPE html> <html> <head><meta name"viewport" content"initial-scale1.0, user-scalableno"/><meta http-equiv"Conten…...
Windows VC++提升当前进程权限到管理员权限
Windows VC提升当前进程权限 Windows VC提升当前进程权限到管理员权限 Windows VC提升当前进程权限到管理员权限 有时候Windows下我们需要提升当前进程的权限到管理员权限,相关VC代码如下: #ifndef SAFE_CLOSE_HANDLE #define SAFE_CLOSE_HANDLE(handl…...
算法leetcode|92. 反转链表 II(rust重拳出击)
文章目录 92. 反转链表 II:样例 1:样例 2:提示:进阶: 分析:题解:rust:go:c:python:java: 92. 反转链表 II: 给你单链表的…...
Chapter 7 - 3. Congestion Management in Ethernet Storage Networks以太网存储网络的拥塞管理
Pause Threshold for Long Distance Links长途链路的暂停阈值 This section uses the following basic concepts: 本节使用以下基本概念: Bit Time (BT): It is the time taken to transmit one bit. It is the reciprocal of the bit rate. For example, BT of a 10 GbE po…...
优雅玩转实验室服务器(二)传输文件
使用服务器最重要的肯定是传输文件了,我们不仅需要本地的一些资源上传到服务器,好进行实验,也需要将服务器计算得到的实验结果传输到本地,来进行预览或者报告撰写。 首先,由于涉及到服务器操作,我强烈推荐…...
动态面板简介以及ERP原型图案列
动态面板简介以及ERP原型图案列 1.Axure动态面板简介2.使用Axure制作ERP登录界面3.使用Asure完成左侧菜单栏4.使用Axuer完成公告栏5.使用Axuer完成左边侧边栏 1.Axure动态面板简介 在Axure RP中,动态面板是一种强大的交互设计工具,它允许你创建可交互的…...
漏刻有时百度地图API实战开发(12)(切片工具的使用、添加自定义图层TileLayer)
TileLayer向地图中添加自定义图层 var tileLayer new BMap.TileLayer();tileLayer.getTilesUrl function (tileCoord, zoom) {var x tileCoord.x;var y tileCoord.y;return images/tiles/ zoom /tile- x _ y .png;}var lockMap new BMap.MapType(lock_map, tileLaye…...
python 爬虫 m3u8 视频文件 加密解密 整合mp4
文章目录 一、完整代码二、视频分析1. 认识m3u8文件2. 获取密钥,构建解密器3. 下载ts文件4. 合并ts文件为mp4 三、总结 一、完整代码 完整代码如下: import requests from multiprocessing import Pool import re import os from tqdm import tqdm fro…...
mybatis中xml文件容易搞混的属性
目录 第一章、1.1)MyBatis中resultMap标签1.2)MyBatis的resultType1.3)MyBatis的parameterType1.4)type属性1.5)jdbcType属性1.6)javaType属性1.7)ofType属性 友情提醒: 先看文章目录ÿ…...
android Retrofit2.0请求 延长超时操作
import okhttp3.OkHttpClient; import retrofit2.Retrofit; import retrofit2.converter.gson.GsonConverterFactory;public class MyApiClient {private static final String BASE_URL "https://api.example.com/";// 创建 OkHttpClient,并设置超时时间…...
Axure之动态面板轮播图
目录 一.介绍 二.好处 三.动态面板轮播图 四.动态面板多方式登录 五.ERP登录 六.ERP的左侧菜单栏 七.ERP的公告栏 今天就到这了哦!!!希望能帮到你了哦!!! 一.介绍 Axure中的动态面板是一个非常有用的组…...
一文读懂算法中的时间复杂度和空间复杂度,O(1)、O(logn)、O(n)、O(n^2)、O(2^n) 附举例说明,常见的时间复杂度,空间复杂度
时间复杂度和空间复杂度是什么 时间复杂度(Time Complexity)是描述算法运行时间长短的一个度量。空间复杂度(Space Complexity)是描述算法在运行过程中所需要的存储空间大小的一个度量。 时间复杂度和空间复杂度是衡量算法性能…...
HTML 列表、表格、表单
1 列表标签 作用:布局内容排列整齐的区域 列表分类:无序列表、有序列表、定义列表。 例如: 1.1 无序列表 标签:ul 嵌套 li,ul是无序列表,li是列表条目。 注意事项: ul 标签里面只能包裹 li…...
2.Vue编写一个app
1.src中重要的组成 1.1main.ts // 引入createApp用于创建应用 import { createApp } from "vue"; // 引用App根组件 import App from ./App.vue;createApp(App).mount(#app)1.2 App.vue 其中要写三种标签 <template> <!--html--> </template>…...
c++ 面试题(1)-----深度优先搜索(DFS)实现
操作系统:ubuntu22.04 IDE:Visual Studio Code 编程语言:C11 题目描述 地上有一个 m 行 n 列的方格,从坐标 [0,0] 起始。一个机器人可以从某一格移动到上下左右四个格子,但不能进入行坐标和列坐标的数位之和大于 k 的格子。 例…...
【决胜公务员考试】求职OMG——见面课测验1
2025最新版!!!6.8截至答题,大家注意呀! 博主码字不易点个关注吧,祝期末顺利~~ 1.单选题(2分) 下列说法错误的是:( B ) A.选调生属于公务员系统 B.公务员属于事业编 C.选调生有基层锻炼的要求 D…...
rnn判断string中第一次出现a的下标
# coding:utf8 import torch import torch.nn as nn import numpy as np import random import json""" 基于pytorch的网络编写 实现一个RNN网络完成多分类任务 判断字符 a 第一次出现在字符串中的位置 """class TorchModel(nn.Module):def __in…...
jmeter聚合报告中参数详解
sample、average、min、max、90%line、95%line,99%line、Error错误率、吞吐量Thoughput、KB/sec每秒传输的数据量 sample(样本数) 表示测试中发送的请求数量,即测试执行了多少次请求。 单位,以个或者次数表示。 示例:…...
面试高频问题
文章目录 🚀 消息队列核心技术揭秘:从入门到秒杀面试官1️⃣ Kafka为何能"吞云吐雾"?性能背后的秘密1.1 顺序写入与零拷贝:性能的双引擎1.2 分区并行:数据的"八车道高速公路"1.3 页缓存与批量处理…...
React从基础入门到高级实战:React 实战项目 - 项目五:微前端与模块化架构
React 实战项目:微前端与模块化架构 欢迎来到 React 开发教程专栏 的第 30 篇!在前 29 篇文章中,我们从 React 的基础概念逐步深入到高级技巧,涵盖了组件设计、状态管理、路由配置、性能优化和企业级应用等核心内容。这一次&…...
轻量级Docker管理工具Docker Switchboard
简介 什么是 Docker Switchboard ? Docker Switchboard 是一个轻量级的 Web 应用程序,用于管理 Docker 容器。它提供了一个干净、用户友好的界面来启动、停止和监控主机上运行的容器,使其成为本地开发、家庭实验室或小型服务器设置的理想选择…...
RabbitMQ 各类交换机
为什么要用交换机? 交换机用来路由消息。如果直发队列,这个消息就被处理消失了,那别的队列也需要这个消息怎么办?那就要用到交换机 交换机类型 1,fanout:广播 特点 广播所有消息:将消息…...
