当前位置: 首页 > news >正文

RabbitMQ应用问题 - 消息顺序性保证、消息积压问题

文章目录

  • MQ 消息顺序性保证
    • 概述
    • 原因分析
    • 解决方案
    • 基于 spring-cloud-stream 实现`分区消费`
  • 消息挤压问题
    • 概述
    • 原因分析
    • 解决方案

MQ 消息顺序性保证


概述

a)消息顺序性:消费者消费的消息的顺序 和 生产者发送消息的顺序是一致的.

例如 生产者 发送消息顺序是 msg1、msg2、msg3,那么消费者也需要按照 msg1、msg2、msg3 的顺序进行消费.

b)顺序不一致可能会导致哪些问题?

例如用户系统中,用户需要对昵称进行了两次修改,此时生产者发送两条消息:

  1. 消息1:修改 用户318 的昵称为 “白天”.
  2. 消息2:修改 用户318 的昵称为 “黑夜”.
    那么,按正常的逻辑来讲,用户318 的名称最后因该为 “黑夜”,但如果 消息1 是最后一个被消费者消费的消息,那么 用户318 的名称就变成了 “白天”.

原因分析

Note:以下场景成立的前提是,只能有一个生产者!因为生产者发送消息给 mq,中间都需要经过网络传输,而网络的不确定性是非常大的,因此无法保证多个生产者的消息谁先到 mq.

a)多个消费者:
多个消息会被不同的消费者并行处理,也就意味着有的消费者消费的快,有的消费者消费的慢,从而导致消息处理的顺序性无法保证.

b)网络波动:
网络波动可能会导致消费者消费完消息返回的 ack 丢失,从而使得 mq 以为消息发给消费者的中途丢失了,进而使得消息重新入队,这就意味着 如果队列中此时还有其他消息,那么这个重新入队的消息就会排在队列尾部,而头部的消息会被优先消费,导致顺序性问题.

实际上,也就意味着,只要触发了消息重新入队的操作,就会导致顺序性问题.

c)消息路由问题:
在复杂的路由场景中(例如大量应用 Topic 交换机),消息可能会根据 routingKey 被分发到不同的队列,使得无法保证全局的顺序性.

d)死信队列:
消息因为一些原因(例如被消费者返回 nack + requeue=false),然后放入死信队列,那么死信队列无论是网络传输,还是处理死信队列的消费者和普通队列的消费者并行处理,都会导致顺序不一致的情况.

解决方案

顺序性的保证分为 局部顺序性保证全局顺序性保证.
例如如下,假设消息入队的顺序为 msg1、msg2、msg3、msg4、msg5…
在这里插入图片描述
在这里插入图片描述
消息顺序性保证的常见策略:

Note:以下顺序性保证策略往往不是单独使用进行保证的,而是多种组合使用.

a)单队列,单消费者(全局顺序性)
最简单的方式就是使用单个队列,并由单个消费者进行消息. 对于消息在队列先进先出,这是 RabbitMQ 给我们保证的.

b)业务逻辑控制(全局顺序性)
例如给每个消息引入一个序号(类似 TCP 确认应答),序号 3 消费之前,要保证序号 2 被成功消费…

c)手动消息确认机制(局部顺序性)
消费者在处理完消息后,显式的发送确认,这样 RabbitMQ 才会移除并继续处理下一个消息.

Ps:在 RabbitMQ 中,当消费者接收到一条消息时,这条消息并不会立即从队列中删除。相反,消息会保持在队列中,直到 RabbitMQ 收到消费者发回的确认.

d)分区消费(局部顺序性)
单个消费者的吞吐量太低了,当需要多个消费者来提高处理速度时,可以使用分区消费. 也就是把一个队列分割成多个分区(例如根据订单系统,将 订单id 进行 hash 或者其他算法 -> 保证同样的订单 id,经过这个算法后,得到的队列名称是一致的(如果 同样的订单 id 一会跑到队列1,一会跑到队列2,就会导致多个消费并行消费,最终消费顺序不一致)),最后每个分区由一个消费者处理,保证每个分区内消息的顺序性.

Ps:RabbitMQ 本身没有实现分区消费

基于 spring-cloud-stream 实现分区消费

Note: https://docs.spring.io/spring-cloud-stream/reference/rabbit/rabbit_partitions.html

RabbitMQ 并没有实现分区消费,因此这里可以引入一些其他的机制来实现.

a)引入依赖

    <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.3.2</version><relativePath/> <!-- lookup parent from repository --></parent><dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>2023.0.2</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-rabbit</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies>

b)配置文件如下:

spring:rabbitmq:host: env-baseport: 5672username: rootpassword: 1111cloud:stream:bindings: # bindings 表示消息通道绑定配置generate-out-0: # generate-out-0 是一个输出通信的名称,表示这是生成消息的第一个通道(还可能由类似 generate-out-1 的其他通道)destination: partitioned.destination # 消息发送的名称为 "partitioned.destination" 的目的地(目的地在这里就是 mq 消息队列).producer: # 生产者配置# partitioned: truepartition-key-expression: headers['partitionKey'] # 表示消息应该发送到哪个分区(这个跟代码里配置的 header 有关)partition-count: 2 # 表示有两个分区(两个队列). 生产者会根据 "partition-key-expression" 计算的结果,将消息分配到这两个分区之一required-groups: # 配置消费组- myGroup

c)代码如下:

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;import java.util.Random;
import java.util.function.Supplier;@SpringBootApplication
public class SpringCloudStreamMqApplication {private static final Random RANDOM = new Random(System.currentTimeMillis());private static final String[] data = new String[] {"abc1", "abc2", "abc3","abc4",};public static void main(String[] args) {new SpringApplicationBuilder(SpringCloudStreamMqApplication.class).web(WebApplicationType.NONE) //不运行其他 web 组件.run(args);}/*** 分区消息:* 方法返回一个函数,这个函数每次调用都会从 data 中随机选择一个字符串,* 生成一个带有分区键(partitionKey)的消息,并将这个消息返回.*/@Beanpublic Supplier<Message<?>> generate() {return () -> {String value = data[RANDOM.nextInt(data.length)];System.out.println("Sending: " + value);return MessageBuilder.withPayload(value).setHeader("partitionKey", value).build();};}}

d)效果演示:
在 mq 管理平台可以看到多出来了一个交换机 和 两个队列(分区)
在这里插入图片描述
在这里插入图片描述
在 partitioned.destination.myGroup-0 中获取消息,可以看到都是 “abc2” 和 “abc4”
在这里插入图片描述
在 partitioned.destination.myGroup-1 中获取消息,可以看到都是 “abc1” 和 “abc3”
在这里插入图片描述

消息挤压问题


概述

消息挤压:在消息队列中,待处理的消息数量超过了消费者的处理能力,导致消息在队列中不断堆积的现象.

原因分析

a)消息生产过快
在流量较大的情况下,生产者发送消息速率大于消费者消费消息速率.

b)消费者处理能力不足

  • 消费端业务复杂,耗时长.
  • 系统资源限制,例如 CPU、内存、磁盘I/O 限制消费者处理速度.
  • 消费者在处理消息时出现异常,导致消息无法被正确处理和确认.
  • 服务器端配置过低

c)网络问题
由于网络抖动,消费者没有及时反馈 ack/nack,导致消息不断重发.

d)消费者代码逻辑异常,引发重试
消费者配置了手动 ack + requeue= true,导致一旦由于消费者代码逻辑引发异常,就会造成消息不断重新入队,不断重试,进而导致消息积压.

解决方案

Note:实际工作中,更多的是处理消费者的效率

a)提高消费者效率:

  • 提高消费者的数量,比如新增机器.
  • 如果消费端业务分散耗时,可以考虑使用 CompletableFuture 实现多线程异步编排.
  • 设置 prefetchCount,当一个消费者阻塞时,消息转发到其他没有阻塞的消费者.
  • 消息引发异常时,考虑配置重试机制,或者转入死信队列.

b)限制生产者速率:

  • 使用限流工具,限制消息发送速率的上限.
  • 设置消息过期时间. 如果消息过期没有消费,可以配置死信队列,不仅避免消息丢失,还减少了主队列的压力.

在这里插入图片描述

相关文章:

RabbitMQ应用问题 - 消息顺序性保证、消息积压问题

文章目录 MQ 消息顺序性保证概述原因分析解决方案基于 spring-cloud-stream 实现分区消费 消息挤压问题概述原因分析解决方案 MQ 消息顺序性保证 概述 a&#xff09;消息顺序性&#xff1a;消费者消费的消息的顺序 和 生产者发送消息的顺序是一致的. 例如 生产者 发送消息顺序…...

linux tcp通讯demo

linux tcp通讯demo代码。通过用chatgpt生成的代码。做一个代码记录。 一、基本的通讯demo server.c #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <sys/types.h> #include <sys/socket.h>…...

在 MongoDB 中,如何配置副本集以实现读写分离?

在 MongoDB 中&#xff0c;配置副本集以实现读写分离主要涉及以下几个步骤&#xff1a; 初始化副本集&#xff1a; 创建副本集时&#xff0c;需要在所有参与节点上运行 rs.initiate() 命令。这将初始化一个新的副本集。 添加成员到副本集&#xff1a; 使用 rs.add() 命令将所有…...

虚拟dom-Diff算法

虚拟dom-Diff算法 vue2 diff算法在vue2中就是patch&#xff0c;通过新旧虚拟dom对比&#xff0c;找到最小变化然后进行dom操作 在页面首次渲染的时候会调用一次patch并创建新的vnode&#xff0c;不会进行深层次的比较&#xff0c;然后再组件中数据发生变化的时候&#xff0c;…...

01创建型设计模式——单例模式

一、单例模式简介 单例模式&#xff08;Singleton Pattern&#xff09;是一种创建型设计模式&#xff08;GoF书中解释创建型设计模式&#xff1a;一种用来处理对象的创建过程的模式&#xff09;&#xff0c;单例模式是其中的一种&#xff0c;它确保一个类只有一个实例&#xff…...

图像分割(一)

一、概述 语义分割&#xff1a;是把每个像素都打上标签&#xff08;这个像素点是人、树、背景等&#xff09; 实例分割&#xff1a;不光要区别类别&#xff0c;还要区分类别中的每一个个体 损失函数&#xff1a;逐像素的交叉熵&#xff1b;样本均衡问题 MIOU指标&#xff1a…...

C++ 新经典:设计模式 目录(先留框架,慢慢来~)

C 新经典&#xff1a;设计模式 C 新经典&#xff1a;设计模式 C 新经典&#xff1a;设计模式第1章 设计模式与软件开发思想、编程环境介绍第2章 模板方法模式第3章 工厂模式、原型模式、建造者模式第4章 策略模式第5章 观察者模式第6章 装饰模式第7章 单件模式第8章 外观模式第…...

go之命令行工具urfave-cli

一、urfave/cli urfave/cli 是一个声明性的、简单、快速且有趣的包&#xff0c;用于用 Go 构建命令行工具。 二、快速使用 2.1 引入依赖 go get github.com/urfave/cli/v2 2.2 demo package mainimport ("fmt""log""os""github.com/ur…...

四种应用层协议——MQTT、CoAP、WebSockets和HTTP——在工业物联网监控系统中的性能比较

目录 摘要(Abstract) 实验设置 实验结果 节选自《A Comparative Analysis of Application Layer Protocols within an Industrial Internet of Things Monitoring System》&#xff0c;作者是 Jurgen Aquilina、Peter Albert Xuereb、Emmanuel Francalanza、Jasmine Mallia …...

MySQL的脏读、不可重复读、幻读与隔离级别

脏读/不可重复读/幻读 脏读 脏读(Dirty Read)发生在一个事务读取了另一个事务尚未提交的数据。如果第二个事务失败并回滚&#xff0c;第一个事务读到的数据就是错误的。这意味着数据从一开始就是不稳定或者“脏”的。 举例 事务A读取了某条记录的值为X。事务B修改该记录的值…...

程序员前端开发者的AI绘画副业之路:在裁员危机中寻找新机遇

正文&#xff1a; 在这个充满变数的时代&#xff0c;作为一名前端开发者&#xff0c;我经历了行业的起伏&#xff0c;见证了裁员危机和中年失业危机的残酷。在这样的背景下&#xff0c;我开始了利用AI绘画作为副业的探索&#xff0c;不仅为了寻求经济上的稳定&#xff0c;更是为…...

Burp Suite的使用和文件上传漏洞靶场试验

第一步&#xff1a;分析如何利用漏洞&#xff0c;通过对代码的查阅发现&#xff0c;代码的逻辑是先上传后删除&#xff0c;意味着&#xff0c;我可以利用webshell.php文件在上传到删除之间的间隙&#xff0c;执行webshell.php的代码&#xff0c;给上级目录创建一个shell.php木马…...

如何在Ubuntu中安装deepin wine版的企业微信

如何在Ubuntu中安装deepin wine版的企业微信 运行如下一条命令将移植仓库添加到系统中 wget -O- https://deepin-wine.i-m.dev/setup.sh | sh自此以后&#xff0c;你可以像对待普通的软件包一样&#xff0c;使用apt-get系列命令进行各种应用安装、更新和卸载清理了。 安装企业…...

案例:Nginx + Tomcat集群(负载均衡 动静分离)

目录 案例 案例环境 案例步骤 部署Tomcat服务器 部署Nginx服务器 实现负载均衡和读写分离 日志控制 案例 案例环境 操作系统 IP 地址 角色 CentOS 192.168.10.101 Nginx服务器&#xff08;调度器&#xff09; CentOS 192.168.10.102 Tomcat服务器① CentOS 1…...

【密码学】密码协议的分类:②认证协议

密码协议的分类有很多种方式&#xff0c;这里我采取的是基于协议实现的目的来分类。可以将密码协议分成三类&#xff1a;认证协议、密钥建立协议、认证密钥建立协议。 一、认证协议是什么&#xff1f; 认证协议都在认证些什么东西呢&#xff1f;认证一般要认证三个东西&#x…...

异步编程(Promise详解)

目录 异步编程 回调函数 回调地狱 Promise 基本概念 Promise的特点 1.Promise是一种构造函数 2.Promise接收函数创建实例 3.Promise对象有三种状态 4.Promise状态转变不可逆 5.Promise 实例创建即执行 6.Promise可注册处理函数 7.Promise支持链式调用 Promise的静…...

DjangoORM注入分享

DjangoORM注入 简介 ​ 这篇文章中&#xff0c;分享一些关于django orm相关的技术积累和如果orm注入相关的安全问题讨论。 ​ 攻击效果同数据库注入 从Django-Orm开始 开发角度 ​ Django ORM&#xff08;Object-Relational Mapping&#xff09;是Django框架中用于处理数…...

【HBZ分享】Redis各种类型的数据结构应用场景

String(字符串类型) 计数器&#xff1a; incr / decr, 比如商品库存&#xff0c;业务号的发号器业务数据key-value缓存&#xff0c; 缓存结果数据&#xff0c;提高网站性能&#xff0c;缓解DB压力分布式session会话&#xff0c; 集群环境下存储token鉴权信息分布式锁&#xff…...

anaconda创建并且配置pytorch(完整版)

&#x1f4da;博客主页&#xff1a;knighthood2001 ✨公众号&#xff1a;认知up吧 ** &#x1f383;知识星球&#xff1a;【认知up吧|成长|副业】介绍** ❤️如遇文章付费&#xff0c;可先看看我公众号中是否发布免费文章❤️ &#x1f64f;笔者水平有限&#xff0c;欢迎各位大…...

高级java每日一道面试题-2024年8月10日-网络篇-你对跨域了解多少?

如果有遗漏,评论区告诉我进行补充 面试官: 你对跨域了解多少? 我回答: 跨域问题&#xff0c;即Cross-Origin Resource Sharing&#xff08;CORS&#xff09;&#xff0c;是现代Web开发中一个非常重要的概念&#xff0c;涉及到浏览器的安全策略——同源策略&#xff08;Same…...

基于算法竞赛的c++编程(28)结构体的进阶应用

结构体的嵌套与复杂数据组织 在C中&#xff0c;结构体可以嵌套使用&#xff0c;形成更复杂的数据结构。例如&#xff0c;可以通过嵌套结构体描述多层级数据关系&#xff1a; struct Address {string city;string street;int zipCode; };struct Employee {string name;int id;…...

Vue记事本应用实现教程

文章目录 1. 项目介绍2. 开发环境准备3. 设计应用界面4. 创建Vue实例和数据模型5. 实现记事本功能5.1 添加新记事项5.2 删除记事项5.3 清空所有记事 6. 添加样式7. 功能扩展&#xff1a;显示创建时间8. 功能扩展&#xff1a;记事项搜索9. 完整代码10. Vue知识点解析10.1 数据绑…...

Linux 文件类型,目录与路径,文件与目录管理

文件类型 后面的字符表示文件类型标志 普通文件&#xff1a;-&#xff08;纯文本文件&#xff0c;二进制文件&#xff0c;数据格式文件&#xff09; 如文本文件、图片、程序文件等。 目录文件&#xff1a;d&#xff08;directory&#xff09; 用来存放其他文件或子目录。 设备…...

DockerHub与私有镜像仓库在容器化中的应用与管理

哈喽&#xff0c;大家好&#xff0c;我是左手python&#xff01; Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库&#xff0c;用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...

Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)

概述 在 Swift 开发语言中&#xff0c;各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过&#xff0c;在涉及到多个子类派生于基类进行多态模拟的场景下&#xff0c;…...

JDK 17 新特性

#JDK 17 新特性 /**************** 文本块 *****************/ python/scala中早就支持&#xff0c;不稀奇 String json “”" { “name”: “Java”, “version”: 17 } “”"; /**************** Switch 语句 -> 表达式 *****************/ 挺好的&#xff…...

是否存在路径(FIFOBB算法)

题目描述 一个具有 n 个顶点e条边的无向图&#xff0c;该图顶点的编号依次为0到n-1且不存在顶点与自身相连的边。请使用FIFOBB算法编写程序&#xff0c;确定是否存在从顶点 source到顶点 destination的路径。 输入 第一行两个整数&#xff0c;分别表示n 和 e 的值&#xff08;1…...

今日学习:Spring线程池|并发修改异常|链路丢失|登录续期|VIP过期策略|数值类缓存

文章目录 优雅版线程池ThreadPoolTaskExecutor和ThreadPoolTaskExecutor的装饰器并发修改异常并发修改异常简介实现机制设计原因及意义 使用线程池造成的链路丢失问题线程池导致的链路丢失问题发生原因 常见解决方法更好的解决方法设计精妙之处 登录续期登录续期常见实现方式特…...

初学 pytest 记录

安装 pip install pytest用例可以是函数也可以是类中的方法 def test_func():print()class TestAdd: # def __init__(self): 在 pytest 中不可以使用__init__方法 # self.cc 12345 pytest.mark.api def test_str(self):res add(1, 2)assert res 12def test_int(self):r…...

Python ROS2【机器人中间件框架】 简介

销量过万TEEIS德国护膝夏天用薄款 优惠券冠生园 百花蜂蜜428g 挤压瓶纯蜂蜜巨奇严选 鞋子除臭剂360ml 多芬身体磨砂膏280g健70%-75%酒精消毒棉片湿巾1418cm 80片/袋3袋大包清洁食品用消毒 优惠券AIMORNY52朵红玫瑰永生香皂花同城配送非鲜花七夕情人节生日礼物送女友 热卖妙洁棉…...