当前位置: 首页 > news >正文

springcloud rocketmq 新增的消费者组从哪里开始消费

如果新建一个新的消费者组,是否会消费历史消息,导致重复消费?

直接在 console 界面新增消费者组,但是没有办法绑定订阅关系,没有找到入口,在 控制台项目源码 rocketmq-externals 也没有找到可以确定订阅关系的接口,在阿里云的生产控制台也没有绑定的入口。
在这里插入图片描述
在这里插入图片描述
所以只能是消费者启动后再注册订阅关系。
消费者从哪里消费的计算:
RebalancePushImpl.java
在这里插入图片描述

默认走的是:CONSUME_FROM_LAST_OFFSET 规则,按照官方说法,是从最后的消费位点开始继续消费。
关键的获取消费位点的逻辑:readOffset方法:
RemoteBrokerOffsetStore.java
在这里插入图片描述
集群模式下,是从远程获取的偏移量,跟据 fetchConsumeOffsetFromBroker 方法:
在这里插入图片描述
在这里插入图片描述
报错,其实就是服务端没有该消费者组的offset,被catch住默认返回 -1.
又不是重试队列,所以拿最大的偏置,broker-a queue-8 的 brokerOffset 是 25
在这里插入图片描述
出来到了 RebalanceImpl.java 的 updateProcessQueueTableInRebalance 方法。
在这里插入图片描述
然后会被添加到 pullRequestList 通过 this.dispatchPullRequest(pullRequestList)

控制台topic消费进度中已经保存了新的消费者组的消费进度,但 consumeOffset都是 0, 还有 759 个消息没有消费。
在这里插入图片描述

消费者消费了一些比较早前的消息:
在这里插入图片描述
在这里插入图片描述

消费进度也随之更新。
在这里插入图片描述
为什么和官方的说法不一致呢?CONSUME_FROM_LAST_OFFSET 为什么没有起到作用?
参考官方的修复:Fix CONSUME_FROM_LAST_OFFSET mode may pull data from 0L #4909

~~
至于这个console怎么看?参考以下:
在rocketmq的控制台中,选择 topic -> consumer manage,就可以查看一个主题下的消费者组、集群、队列的消费情况。
在这里插入图片描述
其中,10.122.24.41 是本人的内网ip,如果我本地线程卡住了(或者debug中),这个在线状态也会下线的。目前我是分配到了其中的8个集群队列,broker-a(8~15)。
在这里插入图片描述
offsetTable 的内容和我所描述的一致。
此外:
我还有对比组,是之前创建的废弃的消费者组,集群位点 brokerOffset 不变,消费位点 consumerOffset 落后了许多,落后的总量 diffTotal 代表此消费者组还有这么多未消费的消息。而且也没有在线的消费者客户端 consumerClient。
在这里插入图片描述
如果这时,我配置启动消费者去消费此消费者组。预计会消费 delay = 294 个消息。
结果也确实如此,将消息消费完,而且分配到了所有集群的所有队列。
在这里插入图片描述

测试结果:默认策略会从offset = 0 开始消费。
在这里插入图片描述
所以该参数没有用,还是从0开始消费了,这时候只能靠消费者组重置位点操作了。

~~
tags过滤,是服务端过滤,为什么会直接将不需要的消息也丢失掉呢?
这就要涉及到订阅关系一致性。
在这里插入图片描述
在这里插入图片描述

参考:https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-5-x-series/developer-reference/message-filtering

tags过滤会将不匹配的直接跳过(丢失)
我的理解是,现在没有为某个tags有单独记录消费进度的地方,所谓的服务端过滤,也只是说用hashcode快速匹配拉取而已,之后也是直接将offset拉到队列尾的。

参考:https://mp.weixin.qq.com/s/RnS675dt-wErnEuolK6Zeg?spm=a2c6h.12873639.article-detail.18.3ba035175CHVos

相关文章:

springcloud rocketmq 新增的消费者组从哪里开始消费

如果新建一个新的消费者组,是否会消费历史消息,导致重复消费? 直接在 console 界面新增消费者组,但是没有办法绑定订阅关系,没有找到入口,在 控制台项目源码 rocketmq-externals 也没有找到可以确定订阅关系…...

Redis-缓存

什么是缓存? 缓存就像自行车和越野车的避震器,降低硬着陆造成的损害 缓存就是系统的避震器,,防止过高的数据访问猛冲系统,导致其操作线程无法及时处理信息而瘫痪 缓存(Cache),就是数据交换的缓冲区,俗称的缓存就是缓冲区内的数据,一般从数…...

MySQL练习05

题目 步骤 触发器 use mydb16_trigger; #使用数据库create table goods( gid char(8) primary key, name varchar(10), price decimal(8,2), num int);create table orders( oid int primary key auto_increment, gid char(10) not null, name varchar(10), price decima…...

[C++][STL源码剖析] 详解AVL树的实现

目录 1.概念 2.实现 2.1 初始化 2.2 插入 2.2.1 旋转(重点) 左单旋 右单旋 双旋 2.❗ 双旋后,对平衡因子的处理 2.3 判断测试 完整代码: 拓展:删除 1.概念 二叉搜索树虽可以缩短查找的效率,但…...

Kubernetes存储 - Node本地存储卷

官方文档 Kubernetes管理的Node本地存储目前有三种,分别是EmptyDir,HostPath,Local,EmptyDir是一种与Pod同生命周期的Node临时存储;HostPath是Node的目录;Local是基于持久卷(PV)管理的Node目录。接下来详细说明这几种类型如何以存…...

Cocos Creator2D游戏开发-(2)Cocos 常见名词

场景(Scene): 它一个容器,容纳游戏中的各个元素,如精灵,标签,节点对象。它负责着游戏的运行逻辑,以帧为单位渲染这些内容。就是你理解到的那个场景; 个人理解就是一个画面, 一个游戏不同的关卡,会有不同的…...

【不同设备间的数据库连接】被连接设备如何开权限给申请连接的设备

为了方便叙述,简称申请连接数据库的设备为a,被连接的为b 1.确保在同一局域网下,检查a的ip 如果你设置的动态ip,那么每重启一次这个ip都会变。两种选择,每次都给b同步一下你的最新ip,或者a设置成静态ip。具…...

Whisper离线部署问题处理

Whisper是OpenAI开发一款开源语音识别模型,可以帮我们低成本的拥有语音识别的能力。具体的安装部署方法,我在这里就不详细说了,网上有很多相关文章: 使用OpenAI的Whisper 模型进行语音识别 (baidu.com) 我这里主要想说的是&…...

【Hive SQL】数据探查-数据抽样

文章目录 数据随机抽样1、随机数排序抽样(rand())2、数据块抽样(tablesample())3、分桶抽样 数据随机抽样 在大规模数据量的数据分析及建模任务中,往往针对全量数据进行挖掘分析时会十分耗时和占用集群资源&#xff0c…...

微信答题小程序产品研发-需求分析与原型设计

欲知应候何时节,六月初迎大暑风。 我前面说过,我决意仿一款答题小程序,所以我做了大量的调研。 题库软件产品开发不仅仅是写代码这一环,它包含从需求调研、分析与构思、设计到开发、测试再到部署上线一系列复杂过程。 需求分析…...

基础模板Mybatis-plus+Springboot+Mysql开发配置文件

1.pom.xml <dependencies><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.2.2</version></dependency>// mybatisplus功能<dependency&g…...

java-poi实现excel自定义注解生成数据并导出

因为项目很多地方需要使用导出数据excel的功能&#xff0c;所以开发了一个简易的统一生成导出方法。 依赖 <dependency> <groupId>org.apache.poi</groupId> <artifactId>poi-ooxml</artifactId> <version>4.0.1</version…...

LeetCode707 设计链表

前言 题目&#xff1a; 707. 设计链表 文档&#xff1a; 代码随想录——设计链表 编程语言&#xff1a; C 解题状态&#xff1a; 代码功底不够&#xff0c;只能写个大概 思路 主要考察对链表结构的熟悉程度&#xff0c;对链表的增删改查&#xff0c;比较考验代码功底以及对链表…...

[Mysql-DDL数据操作语句]

目录 DDL语句操作数据库 库&#xff1a; 查看&#xff1a;show 创建&#xff1a;creat 删除&#xff1a;drop 使用(切换)&#xff1a;use 表&#xff1a; 查看&#xff1a;desc show 创建&#xff1a;create 表结构修改 rename as add drop modify change rename as …...

google 浏览器插件开发简单学习案例:TodoList;打包成crx离线包

参考&#xff1a; google插件支持&#xff1a; https://blog.csdn.net/weixin_42357472/article/details/140412993 这里是把前面做的TodoList做成google插件&#xff0c;具体网页可以参考下面链接 TodoList网页&#xff1a; https://blog.csdn.net/weixin_42357472/article/de…...

如何学习Doris:糙快猛的大数据之路(从入门到专家)

引言:大数据世界的新玩家 还记得我第一次听说"Doris"这个名字时的情景吗?那是在一个炎热的夏日午后,我正在办公室里为接下来的大数据项目发愁。作为一个刚刚跨行到大数据领域的新手,我感觉自己就像是被丢进了深海的小鱼—周围全是陌生的概念和技术。 就在这时,我的…...

梯度下降算法,gradient descent algorithm

定义&#xff1a;是一个优化算法&#xff0c;也成最速下降算法&#xff0c;主要的部的士通过迭代找到目标函数的最小值&#xff0c;或者收敛到最小值。 说人话就是求一个函数的极值点&#xff0c;极大值或者极小值 算法过程中有几个超参数&#xff1a; 学习率n&#xff0c;又称…...

Spring boot 2.0 升级到 3.3.1 的相关问题 (六)

文章目录 Spring boot 2.0 升级到 3.3.1 的相关问题 &#xff08;六&#xff09;spring-data-redis 和 Spring AOP 警告的问题问题描述问题调研结论解决方案方案1-将冲突的Bean 提升为InfrastructureBean方案2 其他相关资料 Spring boot 2.0 升级到 3.3.1 的相关问题 &#xff…...

C++模版基础知识与STL基本介绍

目录 一. 泛型编程 二. 函数模板 1. 概念 2. 函数模版格式 3. 函数模版的原理 4. 模版函数的实例化 (1). 隐式实例化 (2.) 显式实例化 5. 模版参数的匹配原则 三. 类模板 1. 类模板的定义格式 2. 类模板的实例化 四. STL的介绍 1. 什么是STL&#xff1f; 2. STL的版…...

Android 防止重复点击

1.第一种方式&#xff1a; // 两次点击按钮之间的点击间隔不能少于1000毫秒 private static final int MIN_CLICK_DELAY_TIME 700; private static long lastClickTime; /** * 是否是快速点击 * return */ public static boolean isFastClick() { …...

接口测试中缓存处理策略

在接口测试中&#xff0c;缓存处理策略是一个关键环节&#xff0c;直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性&#xff0c;避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明&#xff1a; 一、缓存处理的核…...

web vue 项目 Docker化部署

Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段&#xff1a; 构建阶段&#xff08;Build Stage&#xff09;&#xff1a…...

【人工智能】神经网络的优化器optimizer(二):Adagrad自适应学习率优化器

一.自适应梯度算法Adagrad概述 Adagrad&#xff08;Adaptive Gradient Algorithm&#xff09;是一种自适应学习率的优化算法&#xff0c;由Duchi等人在2011年提出。其核心思想是针对不同参数自动调整学习率&#xff0c;适合处理稀疏数据和不同参数梯度差异较大的场景。Adagrad通…...

ip子接口配置及删除

配置永久生效的子接口&#xff0c;2个IP 都可以登录你这一台服务器。重启不失效。 永久的 [应用] vi /etc/sysconfig/network-scripts/ifcfg-eth0修改文件内内容 TYPE"Ethernet" BOOTPROTO"none" NAME"eth0" DEVICE"eth0" ONBOOT&q…...

Unsafe Fileupload篇补充-木马的详细教程与木马分享(中国蚁剑方式)

在之前的皮卡丘靶场第九期Unsafe Fileupload篇中我们学习了木马的原理并且学了一个简单的木马文件 本期内容是为了更好的为大家解释木马&#xff08;服务器方面的&#xff09;的原理&#xff0c;连接&#xff0c;以及各种木马及连接工具的分享 文件木马&#xff1a;https://w…...

Docker 本地安装 mysql 数据库

Docker: Accelerated Container Application Development 下载对应操作系统版本的 docker &#xff1b;并安装。 基础操作不再赘述。 打开 macOS 终端&#xff0c;开始 docker 安装mysql之旅 第一步 docker search mysql 》〉docker search mysql NAME DE…...

处理vxe-table 表尾数据是单独一个接口,表格tableData数据更新后,需要点击两下,表尾才是正确的

修改bug思路&#xff1a; 分别把 tabledata 和 表尾相关数据 console.log() 发现 更新数据先后顺序不对 settimeout延迟查询表格接口 ——测试可行 升级↑&#xff1a;async await 等接口返回后再开始下一个接口查询 ________________________________________________________…...

STM32HAL库USART源代码解析及应用

STM32HAL库USART源代码解析 前言STM32CubeIDE配置串口USART和UART的选择使用模式参数设置GPIO配置DMA配置中断配置硬件流控制使能生成代码解析和使用方法串口初始化__UART_HandleTypeDef结构体浅析HAL库代码实际使用方法使用轮询方式发送使用轮询方式接收使用中断方式发送使用中…...

LangFlow技术架构分析

&#x1f527; LangFlow 的可视化技术栈 前端节点编辑器 底层框架&#xff1a;基于 &#xff08;一个现代化的 React 节点绘图库&#xff09; 功能&#xff1a; 拖拽式构建 LangGraph 状态机 实时连线定义节点依赖关系 可视化调试循环和分支逻辑 与 LangGraph 的深…...

pycharm 设置环境出错

pycharm 设置环境出错 pycharm 新建项目&#xff0c;设置虚拟环境&#xff0c;出错 pycharm 出错 Cannot open Local Failed to start [powershell.exe, -NoExit, -ExecutionPolicy, Bypass, -File, C:\Program Files\JetBrains\PyCharm 2024.1.3\plugins\terminal\shell-int…...