当前位置: 首页 > news >正文

源码解析:Apache RocketMQ重置消费位点

引入

reset offset,即重置消费进度,一般在以下场景中使用:

  1. 需要重新消费已经消费过的消息,重置到最早位置或根据时间进行重置。
  2. 消息积压,不需要消费积压的消息,重置到最新位置,使其从最新位置开始消费。

重置到最早、或者根据时间进行重置与消息补发的区别?
● 消息补发是将原先的消息由生产者重发一次,与区别的那边消息本质上不是同一条消息(除了消息体一样以外)。
● 重置操作是操作消费位点(offset),本质上还是消费生产者之前发送的那条消息。
源码解析

重置offset起始调用位置:

org.apache.rocketmq.tools.admin.DefaultMQAdminExt#resetOffsetByTimestamporg.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetNewConcurrent

区别:
org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetNewConcurrent
● 这个看看用来并发的重置消费者的offset。可以多个consumer、多个queue可以同时进行处理。
org.apache.rocketmq.tools.admin.DefaultMQAdminExt#resetOffsetByTimestamp
● 用来根据给定的时间戳来重置消费者的偏移量。

这两个入口本质上都是resetOffset,没有本质上的区别,我们以resetOffsetNewConcurrent为例,具体流程如下图:
在这里插入图片描述

  • 首先是examineTopicRouteInfo:主要是获取topic的路由信息,如果路由信息不存在,则无法进行后续操作。
  • 再者是InvokeBrokerToResetOffset:根据上一步拿到的路由信息,遍历路由,一次向broker发起调用。
  • 请求到达服务端(Broker端),判断是否是Broker端侧处理 ○ Broker端处理:
    • 前置检查(look-ahead check):判断当前BrokerRole是否正确、检验当前Topic、ConsumerGroup是否存在,不满足任意条件,直接返回。
    • 将传递过来的offset或者根据timestamp查询到的offset统一放置到queueOffsetMap中
    • assignResetOffset:将上一步的queueOffsetMap的offset放到对应的resetOffsetTable和offsetTable中。
    • 最后prepare reset result并返回response。
  • Client端处理:
    • 先执行queryOffset:查询当前topic下的group下offsetTable中是是否存储了offset信息,有就返回对应的值,没有返回-1;
    • 前置检查(look-ahead check):检查上一步返回结果consumerOffset是否为-1,为-1表示当前group不存在;检查timeStampOffset是否满足条件;满足上述所有条件将timeStampOffset/consumerOffset中较为小的值放到offsetTable中,如果是C的客户端,直接将timestampOffset放入offsetTable中。
    • 请求到达客户端后,先将对应的consumer挂起(suspend),清除ProcessQueue中的消息,在sleep 10s。
    • 再执行updateConsumeOffset:更新consumerOffset。
    • 最后再resume,继续消费。

补充:
如果是服务端重置,重置之后的offset会写入resetOffsetTable中,在后续进行拉取操作的时候会删除resetOffsetTable中对应的offset;如果queryThenEraseResetOffset中有返回值,将resetOffset作为GetMessageResult的nextBeginOffset,拉取操作用的offset。

public Long queryThenEraseResetOffset(String topic, String group, Integer queueId) {String key = topic + TOPIC_GROUP_SEPARATOR + group;ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);if (null == map) {return null;} else {return map.remove(queueId);}
}

参考:
● https://rocketmq.apache.org/
● https://github.com/apache/rocketmq

相关文章:

源码解析:Apache RocketMQ重置消费位点

引入 reset offset&#xff0c;即重置消费进度&#xff0c;一般在以下场景中使用&#xff1a; 需要重新消费已经消费过的消息&#xff0c;重置到最早位置或根据时间进行重置。消息积压&#xff0c;不需要消费积压的消息&#xff0c;重置到最新位置&#xff0c;使其从最新位置…...

Python 自动化之处理docx文件(一)

批量筛选docx文档中关键词 文章目录 批量筛选docx文档中关键词前言一、做成什么样子二、基本架构三、前期输入模块1.引入库2.路径输入3.关键词输入 三、数据处理模块1.基本架构2.如果是docx文档2.1.读取当前文档内容2.2.遍历匹配关键字2.3.触发匹配并记录日志 3.如果目录下还有…...

Vue mixins详解

文章目录 前言Vue中的mixins详解什么是mixins简单例子mixins的特点mixins与vuex的区别mixins与公共组件的区别前言 在Vue中,mixins是一种可重用的代码片段,可以在多个组件中共享。它可以包含组件的选项,如data、methods、computed等,以及生命周期钩子函数。 本文将详细介…...

ssl证书问题导致本地启动前端服务报500

报错如下&#xff1a;注意查看报错信息 问题&#xff1a;系统原是http&#xff0c;后台调整为https后&#xff0c;ssl证书有点问题&#xff0c; vue项目本地服务&#xff0c;使用代理&#xff0c;webpack默认&#xff0c;证书强校验&#xff0c;导致请求无法发出&#xff0c;后…...

Rust 学习

Rust 官网&#xff1a;https://www.rust-lang.org/zh-CN/ 模块 库&#xff1a;https://crates.io/ 1、Rust 简介 Rust 语言的主要目标之一是解决传统 系统级编程语言&#xff08;如 C 和 C&#xff09;中常见的安全性问题&#xff0c;例如空指针引用、数据竞争等。为了实现这个…...

1.1 【应用开发】应用开发简介

写在前面 Screen图形子系统基于客户端/服务器模型&#xff0c;其中应用程序是请求图形服务的客户端&#xff08;Screen&#xff09;。它包括一个合成窗口系统作为这些服务之一&#xff0c;这意味着所有应用程序渲染都是在离屏缓冲区上执行的&#xff0c;然后可以在稍后用于更新…...

在windows系统搭建LVGL模拟器(codeblock工程)

1.codeblock准备 下载codeblock(mingw)&#xff0c;安装。可参考网上教程。 2.pc_simulator_win_codeblocks 工程获取 仓库地址&#xff1a;lvgl/lv_port_win_codeblocks: Windows PC simulator project for LVGL embedded GUI Library (github.com) 拉取代码到本地硬盘&…...

2023第十四届蓝桥杯国赛 C/C++ 大学 B 组

文章目录 前言试题 A: 子 2023作者思考题解答案 试题 B: 双子数作者思考题解 试题 C: 班级活动作者思考题解 试题 D: 合并数列作者思考题解 试题 E: 数三角作者思考题解 试题 F: 删边问题作者思考题解 试题 G: AB 路线作者思考题解 试题 H: 抓娃娃作者思考题解 试题 I: 拼数字试…...

如何在页面中加入百度地图

官方文档&#xff1a;jspopularGL | 百度地图API SDK (baidu.com) 添加一下代码就可以实现 <!DOCTYPE html> <html> <head><meta name"viewport" content"initial-scale1.0, user-scalableno"/><meta http-equiv"Conten…...

Windows VC++提升当前进程权限到管理员权限

Windows VC提升当前进程权限 Windows VC提升当前进程权限到管理员权限 Windows VC提升当前进程权限到管理员权限 有时候Windows下我们需要提升当前进程的权限到管理员权限&#xff0c;相关VC代码如下&#xff1a; #ifndef SAFE_CLOSE_HANDLE #define SAFE_CLOSE_HANDLE(handl…...

算法leetcode|92. 反转链表 II(rust重拳出击)

文章目录 92. 反转链表 II&#xff1a;样例 1&#xff1a;样例 2&#xff1a;提示&#xff1a;进阶&#xff1a; 分析&#xff1a;题解&#xff1a;rust&#xff1a;go&#xff1a;c&#xff1a;python&#xff1a;java&#xff1a; 92. 反转链表 II&#xff1a; 给你单链表的…...

Chapter 7 - 3. Congestion Management in Ethernet Storage Networks以太网存储网络的拥塞管理

Pause Threshold for Long Distance Links长途链路的暂停阈值 This section uses the following basic concepts: 本节使用以下基本概念: Bit Time (BT): It is the time taken to transmit one bit. It is the reciprocal of the bit rate. For example, BT of a 10 GbE po…...

优雅玩转实验室服务器(二)传输文件

使用服务器最重要的肯定是传输文件了&#xff0c;我们不仅需要本地的一些资源上传到服务器&#xff0c;好进行实验&#xff0c;也需要将服务器计算得到的实验结果传输到本地&#xff0c;来进行预览或者报告撰写。 首先&#xff0c;由于涉及到服务器操作&#xff0c;我强烈推荐…...

动态面板简介以及ERP原型图案列

动态面板简介以及ERP原型图案列 1.Axure动态面板简介2.使用Axure制作ERP登录界面3.使用Asure完成左侧菜单栏4.使用Axuer完成公告栏5.使用Axuer完成左边侧边栏 1.Axure动态面板简介 在Axure RP中&#xff0c;动态面板是一种强大的交互设计工具&#xff0c;它允许你创建可交互的…...

漏刻有时百度地图API实战开发(12)(切片工具的使用、添加自定义图层TileLayer)

TileLayer向地图中添加自定义图层 var tileLayer new BMap.TileLayer();tileLayer.getTilesUrl function (tileCoord, zoom) {var x tileCoord.x;var y tileCoord.y;return images/tiles/ zoom /tile- x _ y .png;}var lockMap new BMap.MapType(lock_map, tileLaye…...

python 爬虫 m3u8 视频文件 加密解密 整合mp4

文章目录 一、完整代码二、视频分析1. 认识m3u8文件2. 获取密钥&#xff0c;构建解密器3. 下载ts文件4. 合并ts文件为mp4 三、总结 一、完整代码 完整代码如下&#xff1a; import requests from multiprocessing import Pool import re import os from tqdm import tqdm fro…...

mybatis中xml文件容易搞混的属性

目录 第一章、1.1&#xff09;MyBatis中resultMap标签1.2&#xff09;MyBatis的resultType1.3&#xff09;MyBatis的parameterType1.4&#xff09;type属性1.5&#xff09;jdbcType属性1.6&#xff09;javaType属性1.7&#xff09;ofType属性 友情提醒: 先看文章目录&#xff…...

android Retrofit2.0请求 延长超时操作

import okhttp3.OkHttpClient; import retrofit2.Retrofit; import retrofit2.converter.gson.GsonConverterFactory;public class MyApiClient {private static final String BASE_URL "https://api.example.com/";// 创建 OkHttpClient&#xff0c;并设置超时时间…...

Axure之动态面板轮播图

目录 一.介绍 二.好处 三.动态面板轮播图 四.动态面板多方式登录 五.ERP登录 六.ERP的左侧菜单栏 七.ERP的公告栏 今天就到这了哦&#xff01;&#xff01;&#xff01;希望能帮到你了哦&#xff01;&#xff01;&#xff01; 一.介绍 Axure中的动态面板是一个非常有用的组…...

一文读懂算法中的时间复杂度和空间复杂度,O(1)、O(logn)、O(n)、O(n^2)、O(2^n) 附举例说明,常见的时间复杂度,空间复杂度

时间复杂度和空间复杂度是什么 时间复杂度&#xff08;Time Complexity&#xff09;是描述算法运行时间长短的一个度量。空间复杂度&#xff08;Space Complexity&#xff09;是描述算法在运行过程中所需要的存储空间大小的一个度量。 时间复杂度和空间复杂度是衡量算法性能…...

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造&#xff0c;完美适配AGV和无人叉车。同时&#xff0c;集成以太网与语音合成技术&#xff0c;为各类高级系统&#xff08;如MES、调度系统、库位管理、立库等&#xff09;提供高效便捷的语音交互体验。 L…...

synchronized 学习

学习源&#xff1a; https://www.bilibili.com/video/BV1aJ411V763?spm_id_from333.788.videopod.episodes&vd_source32e1c41a9370911ab06d12fbc36c4ebc 1.应用场景 不超卖&#xff0c;也要考虑性能问题&#xff08;场景&#xff09; 2.常见面试问题&#xff1a; sync出…...

Lombok 的 @Data 注解失效,未生成 getter/setter 方法引发的HTTP 406 错误

HTTP 状态码 406 (Not Acceptable) 和 500 (Internal Server Error) 是两类完全不同的错误&#xff0c;它们的含义、原因和解决方法都有显著区别。以下是详细对比&#xff1a; 1. HTTP 406 (Not Acceptable) 含义&#xff1a; 客户端请求的内容类型与服务器支持的内容类型不匹…...

基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销&#xff0c;平衡网络负载&#xff0c;延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...

基于ASP.NET+ SQL Server实现(Web)医院信息管理系统

医院信息管理系统 1. 课程设计内容 在 visual studio 2017 平台上&#xff0c;开发一个“医院信息管理系统”Web 程序。 2. 课程设计目的 综合运用 c#.net 知识&#xff0c;在 vs 2017 平台上&#xff0c;进行 ASP.NET 应用程序和简易网站的开发&#xff1b;初步熟悉开发一…...

从零实现富文本编辑器#5-编辑器选区模型的状态结构表达

先前我们总结了浏览器选区模型的交互策略&#xff0c;并且实现了基本的选区操作&#xff0c;还调研了自绘选区的实现。那么相对的&#xff0c;我们还需要设计编辑器的选区表达&#xff0c;也可以称为模型选区。编辑器中应用变更时的操作范围&#xff0c;就是以模型选区为基准来…...

Day131 | 灵神 | 回溯算法 | 子集型 子集

Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣&#xff08;LeetCode&#xff09; 思路&#xff1a; 笔者写过很多次这道题了&#xff0c;不想写题解了&#xff0c;大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...

c++ 面试题(1)-----深度优先搜索(DFS)实现

操作系统&#xff1a;ubuntu22.04 IDE:Visual Studio Code 编程语言&#xff1a;C11 题目描述 地上有一个 m 行 n 列的方格&#xff0c;从坐标 [0,0] 起始。一个机器人可以从某一格移动到上下左右四个格子&#xff0c;但不能进入行坐标和列坐标的数位之和大于 k 的格子。 例…...

DeepSeek 技术赋能无人农场协同作业:用 AI 重构农田管理 “神经网”

目录 一、引言二、DeepSeek 技术大揭秘2.1 核心架构解析2.2 关键技术剖析 三、智能农业无人农场协同作业现状3.1 发展现状概述3.2 协同作业模式介绍 四、DeepSeek 的 “农场奇妙游”4.1 数据处理与分析4.2 作物生长监测与预测4.3 病虫害防治4.4 农机协同作业调度 五、实际案例大…...

【分享】推荐一些办公小工具

1、PDF 在线转换 https://smallpdf.com/cn/pdf-tools 推荐理由&#xff1a;大部分的转换软件需要收费&#xff0c;要么功能不齐全&#xff0c;而开会员又用不了几次浪费钱&#xff0c;借用别人的又不安全。 这个网站它不需要登录或下载安装。而且提供的免费功能就能满足日常…...