当前位置: 首页 > news >正文

RocketMQ生产者消息发送出去了,消费者一直接收不到怎么办?(Rocket MQ订阅关系一致性)

问题: 使用RocketMQ消息队列,生产者将数据发送出去了,但是生产者一致没接收到(或者是间隔好几分钟,突然接收到一条数据)怎么办?并且通过rocket web控制台查看消息的状态为NOT_ONELINE或者NOT_CONSUME,(如下图) 这种诡异现象该怎么解决?
在这里插入图片描述

1. 先说解决方案

这种情况99%是由于订阅关系不一致导致的,可以排查下程序看看是否有多个消费者使用了同一个group,并且订阅了不同的主题。逻辑图展示如下:
在这里插入图片描述
这种情况只需要将不同的消费者的group区分一下即可, 逻辑关系图变成如下这种:
在这里插入图片描述

到此为止,是不是惊奇的发现,问题解决了?

2. 注意事项:订阅关系一致性

看下Rocket MQ官方文档给出的说明:

定义
消费者分组是 Apache RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。

和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。在 Apache RocketMQ 中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。

在这里插入图片描述
这里面只描述出了Tag的一致,事实上下面这种订阅关系也是错误的,同一个group中的两个消费者分别订阅了不同的主题, 违背了定义中的消费行为一致原则:

//Consumer c1
Consumer c1 = ConsumerBuilder.build(groupA);
c1.subscribe(topicA);
//Consumer c2Consumer 
c2 = ConsumerBuilder.build(groupA);
c2.subscribe(topicB);

3. 剖析源码实现,分析原因

从GitHub下载rocketmq源码通过idea打开之后,从官方提供的example进来:
在这里插入图片描述
进入到DefaultMQPushConsumer构造方法中,可以发现初始化了一个DefaultMQPushConsumerImpl类:

    public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,AllocateMessageQueueStrategy allocateMessageQueueStrategy) {this.consumerGroup = consumerGroup;this.namespace = namespace;this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;// 这里初始化一个默认的push类型的Consumer实现类defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);}

然后继续进入到DefaultMQPushConsumerImpl类中, 可以看见有一个成员变量MQClientInstance mQClientFactory, 在DefaultMQPushConsumerImpl类的start()(启动消费者)方法中会通过MQClientManager初始化MQClientInstance类.
在这里插入图片描述
接着跳转到MQClientInstance构造方法中, 会发现有这样一行代码, 初始化了一个rebalanceService. 这个rebalanceService就是RocketMQ隔一段时间进行rebalance的核心实现.
在这里插入图片描述
继续剖析RebalanceService类, 发现其实现了Runnable接口, 话不多说, 直接看其 run()方法中做了什么事.

呀! 原来是隔一段时间调用一次上述咱们提到的DefaultMQPushConsumerImpl类中的doRebalance()方法, 搞了半天又绕回来了. … … … … … …

直接进入到这里面, 看看rebalance的逻辑:
在这里插入图片描述
集群部署模式下, 会进行rebalance操作, 根据topic名称和group名称获取到所有的consumer列表.

case CLUSTERING: {Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);// 这里根据topic名称和Group进行获取到所有的consumerList<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);if (null == mqSet) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {this.messageQueueChanged(topic, Collections.<MessageQueue>emptySet(), Collections.<MessageQueue>emptySet());log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}}

但是进去这行代码里面发现, topic名称仅仅用来获取Broker的网络地址, 真正获取到所有Consumer列表的是通过Group名称获取的, 看到这里相信大家基本上能够恍然大悟. 回归到上面的问题: 如果一个一个Group中的多个消费者分别订阅了不同的主题, 即: 消费行为不一致, 无论这个属于当前Group中的消费者是否订阅了这个主题, 都会参与rebalance.
在这里插入图片描述
画图解释一下, 假设在同一个Group下, 两个Consumer都分别订阅了Topic1和Topic2, 这种情况订阅关系一致,
在这里插入图片描述
假设消费者1消费Topic2的速度比较快, 经过一次rebalance之后, Consumer订阅的队列逻辑有可能成为这样的:
在这里插入图片描述
此时由于订阅关系的一致性, 整体系统并不会出现问题. 接下来看一种情况, 同一个消费组中的Consumer1 订阅了Topic1, Consumer2订阅了Topic2, 初始情况逻辑关系是这样:
在这里插入图片描述
由于进行rebalance是通过Group获取对应的消费者客户端ID, 因此rebalance之后可能出现Consumer1 指向了Topic2中的某一个队列, 同理, Consumer2指向了Topic1中的队列. 但是这与Consumer中设定的topic不一致, 因此会出现RocketMQ中消息状态为为NOT_COMSUME_YET

(个人通过对源码的简单梳理总结的文章, 如有错误欢迎指正)

相关文章:

RocketMQ生产者消息发送出去了,消费者一直接收不到怎么办?(Rocket MQ订阅关系一致性)

问题: 使用RocketMQ消息队列&#xff0c;生产者将数据发送出去了&#xff0c;但是生产者一致没接收到&#xff08;或者是间隔好几分钟&#xff0c;突然接收到一条数据&#xff09;怎么办&#xff1f;并且通过rocket web控制台查看消息的状态为NOT_ONELINE或者NOT_CONSUME&#…...

使用Golang开发硬件驱动

1. 介绍 Golang是一种简洁、高效的编程语言&#xff0c;它的强大并发性能和丰富的标准库使得它成为了开发硬件驱动的理想选择。在本文中&#xff0c;我们将探讨如何使用Golang开发硬件驱动程序&#xff0c;并提供一个实例来帮助你入门。 2. 准备工作 在开始之前&#xff0c;…...

设计模式(19)命令模式

一、介绍&#xff1a; 1、定义&#xff1a;命令模式&#xff08;Command Pattern&#xff09;是一种行为设计模式&#xff0c;它将请求封装为一个对象&#xff0c;从而使你可以使用不同的请求对客户端进行参数化。命令模式还支持请求的排队、记录日志、撤销操作等功能。 2、组…...

QModelIndex 与QStandardItem相互转换

目录 1、 QModelIndex 转换成QStandardItem 2 、QStandardItem 转换成 QModelIndex 3、示例 4、总结 1、 QModelIndex 转换成QStandardItem QStandardItem * itemQStandardItemModel::​itemFromIndex(const QModelIndex & index) const 借助QStandardItemModel来完成…...

Linux - 进程地址空间

前言 首先&#xff0c;我们先要对 内存当中存储 各个数据之间的 结构要有一个 大概的了解&#xff1a; 各个区当中存储的数据使用类型不同&#xff0c;所以&#xff0c;这些数据在使用方式上是有差别的。比如下面这个例子&#xff1a; 在C 语言当中我们不能直接对 上述的 str…...

系统架构设计师-第16章-嵌入式系统架构设计理论与实践-软考学习笔记

嵌入式系统( Embedded System) 是为了特定应用而专门构建的计算机系统&#xff0c;其架构是随着嵌入式系统的逐步应用而发展形成的。嵌入式软件架构的设计与嵌入式系统的体系架构是密不可分的。因此&#xff0c;本常首先介绍嵌入式系统硬件相关知识&#xff08;系统特征、硬件组…...

pod进阶

目录 资源限制 CPU 资源单位 内存 资源单位 实例 健康检查 探针的三种规则&#xff1a; Probe支持三种检查方法&#xff1a; 示例1&#xff1a;exec方式 示例2&#xff1a;httpGet方式 示例3&#xff1a;tcpSocket方式 示例4&#xff1a;就绪检测 扩展 资源限制 当定…...

系列四十七、Spring的事务传播行为案例演示(七)#NOT_SUPPORTED

一、演示Spring的传播行为&#xff08;NOT_SUPPORTED&#xff09; 1.1、StockServiceImplNOT_SUPPORTED /*** Author : 一叶浮萍归大海* Date: 2023/10/30 15:43* Description: 演示NOT_SUPPORTED的传播行为* 外部不存在事务&#xff1a;不开启新的事务* 外部存在…...

54.RabbitMQ快速实战以及核心概念详解

MQ MQ&#xff1a;MessageQueue&#xff0c;消息队列。这东西分两个部分来理解&#xff1a; 队列&#xff0c;是一种FIFO 先进先出的数据结构。 消息&#xff1a;在不同应用程序之间传递的数据。将消息以队列的形式存储起来&#xff0c;并且在不同的应用程序之间进行传递&am…...

Qt TreeView 设置节点不可编辑

目录 1. 创建treeview 2、节点不可编辑 3、设置logo 4、实例代码 1. 创建treeview //声明模型 QStandardItemModel *model;//创建4行&#xff0c;1列的模型 model new QStandardItemModel(4,1);//添加标题 model->setHeaderData(0, Qt::Horizontal, tr("Tree View…...

python django获取某个角色的某个数据和——例如:获取所有订单的应付金额总和

model关系如下&#xff1a; class Order(models.Model):订单product models.ForeignKey(Product, on_deletemodels.SET_NULL, blankTrue, nullTrue, verbose_name"产品")no models.CharField(max_length50, blankTrue, nullTrue, verbose_name订单编号, db_indexT…...

如何在React项目中引用less

安装less npm install less less-loader --save-dev暴露 webpack 文件 利用 npx create-react-app 搭建的 React 项目&#xff0c;默认隐藏 webpack 配置文件&#xff0c;引入 less 需要修改 webpack 配置文件&#xff0c;因此我们需要执行命令暴露 webpack 配置文件。 请先将…...

NUXT前端服务端渲染技术框架

服务端渲染又称SSR&#xff08;Server Side Render&#xff09;实在服务端完成页面的内容&#xff0c;而不是在客户端通过AJAX获取数据 优势&#xff1a;更好的SEO&#xff0c;由于搜索引擎爬虫抓取工具可以直接查看完全渲染的页面 Nuxt.js是一个基于Vue.js的轻量级应用框架&a…...

力扣每日一题90:子集

题目描述&#xff1a; 给你一个整数数组 nums &#xff0c;其中可能包含重复元素&#xff0c;请你返回该数组所有可能的子集&#xff08;幂集&#xff09;。 解集 不能 包含重复的子集。返回的解集中&#xff0c;子集可以按 任意顺序 排列。 示例 1&#xff1a; 输入&#x…...

「linux基础」上传代码到github/gitee

一、在gitee创建一个仓库 1.创建仓库 2.获取仓库地址 二、克隆仓库文件到linux中 1.查看Linux中是否安装git&#xff1a;git --version 如果没有&#xff0c;在root下使用指令 yum install -y git 安装。 2.使用 git clone 仓库地址&#xff0c;克隆仓库文件到linux中 三、第…...

Hafnium总体考虑

安全之安全(security)博客目录导读 目录 一、安全世界构建平台 二、安全分区调度 三、平台拓扑...

C#__对Json文件的解析和序列化

Json: 存储和交换文本信息的语法。&#xff08;类似XML&#xff0c;语法独立&#xff09; 一种轻量级的数据交换格式。&#xff08;更小&#xff0c;更快&#xff0c;更易解析&#xff09; 语法规则: 数据在键值对里面&#xff0c;数据由逗号分隔开。 …...

如果一定要在C++和JAVA中选择,是C++还是java?

如果一定要在C和JAVA中选择&#xff0c;是C还是java&#xff1f; 计算机专业的同学对这个问题有疑惑的&#xff0c;- 定要看一下这个回答! 上来直接给出最中肯的建议: 如果你是刚刚步入大学的大一时间非常充裕的同学&#xff0c;猪学长强烈建议先学C/C.因为C 非常 最近很多…...

如何运行深度学习项目代码

运行项目代码是第一步哦&#xff01; 配环境 使用anaconda环境&#xff1b; conda 环境 按照项目提示的README.md&#xff0c;安装指定版本的python&#xff1b; 当然新版python会兼容旧版&#xff0c;也就是你的环境下python版本比它高也不要紧&#xff1b; 但是更新的pyt…...

C语言 每日一题 day9

求最大值及其下标 本题要求编写程序&#xff0c;找出给定的n个数中的最大值及其对应的最小下标&#xff08;下标从0开始&#xff09;。 输入格式 : 输入在第一行中给出一个正整数n&#xff08;1 < n≤10&#xff09;。第二行输入n个整数&#xff0c;用空格分开。 输出格式 …...

装饰模式(Decorator Pattern)重构java邮件发奖系统实战

前言 现在我们有个如下的需求&#xff0c;设计一个邮件发奖的小系统&#xff0c; 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式&#xff08;Decorator Pattern&#xff09;允许向一个现有的对象添加新的功能&#xff0c;同时又不改变其…...

【CSS position 属性】static、relative、fixed、absolute 、sticky详细介绍,多层嵌套定位示例

文章目录 ★ position 的五种类型及基本用法 ★ 一、position 属性概述 二、position 的五种类型详解(初学者版) 1. static(默认值) 2. relative(相对定位) 3. absolute(绝对定位) 4. fixed(固定定位) 5. sticky(粘性定位) 三、定位元素的层级关系(z-i…...

屋顶变身“发电站” ,中天合创屋面分布式光伏发电项目顺利并网!

5月28日&#xff0c;中天合创屋面分布式光伏发电项目顺利并网发电&#xff0c;该项目位于内蒙古自治区鄂尔多斯市乌审旗&#xff0c;项目利用中天合创聚乙烯、聚丙烯仓库屋面作为场地建设光伏电站&#xff0c;总装机容量为9.96MWp。 项目投运后&#xff0c;每年可节约标煤3670…...

spring:实例工厂方法获取bean

spring处理使用静态工厂方法获取bean实例&#xff0c;也可以通过实例工厂方法获取bean实例。 实例工厂方法步骤如下&#xff1a; 定义实例工厂类&#xff08;Java代码&#xff09;&#xff0c;定义实例工厂&#xff08;xml&#xff09;&#xff0c;定义调用实例工厂&#xff…...

Matlab | matlab常用命令总结

常用命令 一、 基础操作与环境二、 矩阵与数组操作(核心)三、 绘图与可视化四、 编程与控制流五、 符号计算 (Symbolic Math Toolbox)六、 文件与数据 I/O七、 常用函数类别重要提示这是一份 MATLAB 常用命令和功能的总结,涵盖了基础操作、矩阵运算、绘图、编程和文件处理等…...

(转)什么是DockerCompose?它有什么作用?

一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用&#xff0c;而无需手动一个个创建和运行容器。 Compose文件是一个文本文件&#xff0c;通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...

大学生职业发展与就业创业指导教学评价

这里是引用 作为软工2203/2204班的学生&#xff0c;我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要&#xff0c;而您认真负责的教学态度&#xff0c;让课程的每一部分都充满了实用价值。 尤其让我…...

Linux --进程控制

本文从以下五个方面来初步认识进程控制&#xff1a; 目录 进程创建 进程终止 进程等待 进程替换 模拟实现一个微型shell 进程创建 在Linux系统中我们可以在一个进程使用系统调用fork()来创建子进程&#xff0c;创建出来的进程就是子进程&#xff0c;原来的进程为父进程。…...

LINUX 69 FTP 客服管理系统 man 5 /etc/vsftpd/vsftpd.conf

FTP 客服管理系统 实现kefu123登录&#xff0c;不允许匿名访问&#xff0c;kefu只能访问/data/kefu目录&#xff0c;不能查看其他目录 创建账号密码 useradd kefu echo 123|passwd -stdin kefu [rootcode caozx26420]# echo 123|passwd --stdin kefu 更改用户 kefu 的密码…...

Java编程之桥接模式

定义 桥接模式&#xff08;Bridge Pattern&#xff09;属于结构型设计模式&#xff0c;它的核心意图是将抽象部分与实现部分分离&#xff0c;使它们可以独立地变化。这种模式通过组合关系来替代继承关系&#xff0c;从而降低了抽象和实现这两个可变维度之间的耦合度。 用例子…...