RabbitMQ应用问题 - 消息顺序性保证、消息积压问题
文章目录
- MQ 消息顺序性保证
- 概述
- 原因分析
- 解决方案
- 基于 spring-cloud-stream 实现`分区消费`
- 消息挤压问题
- 概述
- 原因分析
- 解决方案
MQ 消息顺序性保证
概述
a)消息顺序性:消费者消费的消息的顺序 和 生产者发送消息的顺序是一致的.
例如 生产者 发送消息顺序是 msg1、msg2、msg3,那么消费者也需要按照 msg1、msg2、msg3 的顺序进行消费.
b)顺序不一致可能会导致哪些问题?
例如用户系统中,用户需要对昵称进行了两次修改,此时生产者发送两条消息:
- 消息1:修改 用户318 的昵称为 “白天”.
- 消息2:修改 用户318 的昵称为 “黑夜”.
那么,按正常的逻辑来讲,用户318 的名称最后因该为 “黑夜”,但如果 消息1 是最后一个被消费者消费的消息,那么 用户318 的名称就变成了 “白天”.
原因分析
Note:以下场景成立的前提是,只能有一个生产者!因为生产者发送消息给 mq,中间都需要经过网络传输,而网络的不确定性是非常大的,因此无法保证多个生产者的消息谁先到 mq.
a)多个消费者:
多个消息会被不同的消费者并行
处理,也就意味着有的消费者消费的快,有的消费者消费的慢,从而导致消息处理的顺序性无法保证.
b)网络波动:
网络波动可能会导致消费者消费完消息返回的 ack 丢失,从而使得 mq 以为消息发给消费者的中途丢失了,进而使得消息重新入队,这就意味着 如果队列中此时还有其他消息,那么这个重新入队的消息就会排在队列尾部,而头部的消息会被优先消费
,导致顺序性问题.
实际上,也就意味着,只要触发了消息重新入队的操作,就会导致顺序性问题.
c)消息路由问题:
在复杂的路由场景中(例如大量应用 Topic 交换机),消息可能会根据 routingKey 被分发到不同的队列,使得无法保证全局的顺序性.
d)死信队列:
消息因为一些原因(例如被消费者返回 nack + requeue=false),然后放入死信队列,那么死信队列无论是网络传输,还是处理死信队列的消费者和普通队列的消费者并行处理,都会导致顺序不一致的情况.
解决方案
顺序性的保证分为 局部顺序性保证
和 全局顺序性保证
.
例如如下,假设消息入队的顺序为 msg1、msg2、msg3、msg4、msg5…
消息顺序性保证的常见策略:
Note:以下顺序性保证策略往往不是单独使用进行保证的,而是多种组合使用.
a)单队列,单消费者(全局顺序性)
最简单的方式就是使用单个队列,并由单个消费者进行消息. 对于消息在队列先进先出,这是 RabbitMQ 给我们保证的.
b)业务逻辑控制(全局顺序性)
例如给每个消息引入一个序号(类似 TCP 确认应答),序号 3 消费之前,要保证序号 2 被成功消费…
c)手动消息确认机制(局部顺序性)
消费者在处理完消息后,显式的发送确认,这样 RabbitMQ 才会移除并继续处理下一个消息.
Ps:在 RabbitMQ 中,当消费者接收到一条消息时,这条消息并不会立即从队列中删除。相反,消息会保持在队列中,直到 RabbitMQ 收到消费者发回的确认.
d)分区消费(局部顺序性)
单个消费者的吞吐量太低了,当需要多个消费者来提高处理速度时,可以使用分区消费. 也就是把一个队列分割成多个分区(例如根据订单系统,将 订单id 进行 hash 或者其他算法
-> 保证同样的订单 id,经过这个算法后,得到的队列名称是一致的(如果 同样的订单 id 一会跑到队列1,一会跑到队列2,就会导致多个消费并行消费,最终消费顺序不一致)
),最后每个分区由一个消费者处理,保证每个分区内消息的顺序性.
Ps:RabbitMQ 本身没有实现
分区消费
基于 spring-cloud-stream 实现分区消费
Note: https://docs.spring.io/spring-cloud-stream/reference/rabbit/rabbit_partitions.html
RabbitMQ 并没有实现分区消费
,因此这里可以引入一些其他的机制来实现.
a)引入依赖
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.3.2</version><relativePath/> <!-- lookup parent from repository --></parent><dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>2023.0.2</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-rabbit</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies>
b)配置文件如下:
spring:rabbitmq:host: env-baseport: 5672username: rootpassword: 1111cloud:stream:bindings: # bindings 表示消息通道绑定配置generate-out-0: # generate-out-0 是一个输出通信的名称,表示这是生成消息的第一个通道(还可能由类似 generate-out-1 的其他通道)destination: partitioned.destination # 消息发送的名称为 "partitioned.destination" 的目的地(目的地在这里就是 mq 消息队列).producer: # 生产者配置# partitioned: truepartition-key-expression: headers['partitionKey'] # 表示消息应该发送到哪个分区(这个跟代码里配置的 header 有关)partition-count: 2 # 表示有两个分区(两个队列). 生产者会根据 "partition-key-expression" 计算的结果,将消息分配到这两个分区之一required-groups: # 配置消费组- myGroup
c)代码如下:
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;import java.util.Random;
import java.util.function.Supplier;@SpringBootApplication
public class SpringCloudStreamMqApplication {private static final Random RANDOM = new Random(System.currentTimeMillis());private static final String[] data = new String[] {"abc1", "abc2", "abc3","abc4",};public static void main(String[] args) {new SpringApplicationBuilder(SpringCloudStreamMqApplication.class).web(WebApplicationType.NONE) //不运行其他 web 组件.run(args);}/*** 分区消息:* 方法返回一个函数,这个函数每次调用都会从 data 中随机选择一个字符串,* 生成一个带有分区键(partitionKey)的消息,并将这个消息返回.*/@Beanpublic Supplier<Message<?>> generate() {return () -> {String value = data[RANDOM.nextInt(data.length)];System.out.println("Sending: " + value);return MessageBuilder.withPayload(value).setHeader("partitionKey", value).build();};}}
d)效果演示:
在 mq 管理平台可以看到多出来了一个交换机 和 两个队列(分区)
在 partitioned.destination.myGroup-0 中获取消息,可以看到都是 “abc2” 和 “abc4”
在 partitioned.destination.myGroup-1 中获取消息,可以看到都是 “abc1” 和 “abc3”
消息挤压问题
概述
消息挤压:在消息队列中,待处理的消息数量超过了消费者的处理能力,导致消息在队列中不断堆积的现象.
原因分析
a)消息生产过快
在流量较大的情况下,生产者发送消息速率
大于消费者消费消息速率
.
b)消费者处理能力不足
- 消费端业务复杂,耗时长.
- 系统资源限制,例如 CPU、内存、磁盘I/O 限制消费者处理速度.
- 消费者在处理消息时出现异常,导致消息无法被正确处理和确认.
- 服务器端配置过低
c)网络问题
由于网络抖动,消费者没有及时反馈 ack/nack,导致消息不断重发.
d)消费者代码逻辑异常,引发重试
消费者配置了手动 ack + requeue= true,导致一旦由于消费者代码逻辑引发异常,就会造成消息不断重新入队,不断重试,进而导致消息积压.
解决方案
Note:实际工作中,更多的是处理消费者的效率
a)提高消费者效率:
- 提高消费者的数量,比如新增机器.
- 如果消费端业务分散耗时,可以考虑使用 CompletableFuture 实现多线程异步编排.
- 设置 prefetchCount,当一个消费者阻塞时,消息转发到其他没有阻塞的消费者.
- 消息引发异常时,考虑配置重试机制,或者转入死信队列.
b)限制生产者速率:
- 使用限流工具,限制消息发送速率的上限.
- 设置消息过期时间. 如果消息过期没有消费,可以配置死信队列,不仅避免消息丢失,还减少了主队列的压力.
相关文章:

RabbitMQ应用问题 - 消息顺序性保证、消息积压问题
文章目录 MQ 消息顺序性保证概述原因分析解决方案基于 spring-cloud-stream 实现分区消费 消息挤压问题概述原因分析解决方案 MQ 消息顺序性保证 概述 a)消息顺序性:消费者消费的消息的顺序 和 生产者发送消息的顺序是一致的. 例如 生产者 发送消息顺序…...

linux tcp通讯demo
linux tcp通讯demo代码。通过用chatgpt生成的代码。做一个代码记录。 一、基本的通讯demo server.c #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <sys/types.h> #include <sys/socket.h>…...

在 MongoDB 中,如何配置副本集以实现读写分离?
在 MongoDB 中,配置副本集以实现读写分离主要涉及以下几个步骤: 初始化副本集: 创建副本集时,需要在所有参与节点上运行 rs.initiate() 命令。这将初始化一个新的副本集。 添加成员到副本集: 使用 rs.add() 命令将所有…...

虚拟dom-Diff算法
虚拟dom-Diff算法 vue2 diff算法在vue2中就是patch,通过新旧虚拟dom对比,找到最小变化然后进行dom操作 在页面首次渲染的时候会调用一次patch并创建新的vnode,不会进行深层次的比较,然后再组件中数据发生变化的时候,…...

01创建型设计模式——单例模式
一、单例模式简介 单例模式(Singleton Pattern)是一种创建型设计模式(GoF书中解释创建型设计模式:一种用来处理对象的创建过程的模式),单例模式是其中的一种,它确保一个类只有一个实例ÿ…...

图像分割(一)
一、概述 语义分割:是把每个像素都打上标签(这个像素点是人、树、背景等) 实例分割:不光要区别类别,还要区分类别中的每一个个体 损失函数:逐像素的交叉熵;样本均衡问题 MIOU指标:…...

C++ 新经典:设计模式 目录(先留框架,慢慢来~)
C 新经典:设计模式 C 新经典:设计模式 C 新经典:设计模式第1章 设计模式与软件开发思想、编程环境介绍第2章 模板方法模式第3章 工厂模式、原型模式、建造者模式第4章 策略模式第5章 观察者模式第6章 装饰模式第7章 单件模式第8章 外观模式第…...

go之命令行工具urfave-cli
一、urfave/cli urfave/cli 是一个声明性的、简单、快速且有趣的包,用于用 Go 构建命令行工具。 二、快速使用 2.1 引入依赖 go get github.com/urfave/cli/v2 2.2 demo package mainimport ("fmt""log""os""github.com/ur…...

四种应用层协议——MQTT、CoAP、WebSockets和HTTP——在工业物联网监控系统中的性能比较
目录 摘要(Abstract) 实验设置 实验结果 节选自《A Comparative Analysis of Application Layer Protocols within an Industrial Internet of Things Monitoring System》,作者是 Jurgen Aquilina、Peter Albert Xuereb、Emmanuel Francalanza、Jasmine Mallia …...

MySQL的脏读、不可重复读、幻读与隔离级别
脏读/不可重复读/幻读 脏读 脏读(Dirty Read)发生在一个事务读取了另一个事务尚未提交的数据。如果第二个事务失败并回滚,第一个事务读到的数据就是错误的。这意味着数据从一开始就是不稳定或者“脏”的。 举例 事务A读取了某条记录的值为X。事务B修改该记录的值…...

程序员前端开发者的AI绘画副业之路:在裁员危机中寻找新机遇
正文: 在这个充满变数的时代,作为一名前端开发者,我经历了行业的起伏,见证了裁员危机和中年失业危机的残酷。在这样的背景下,我开始了利用AI绘画作为副业的探索,不仅为了寻求经济上的稳定,更是为…...

Burp Suite的使用和文件上传漏洞靶场试验
第一步:分析如何利用漏洞,通过对代码的查阅发现,代码的逻辑是先上传后删除,意味着,我可以利用webshell.php文件在上传到删除之间的间隙,执行webshell.php的代码,给上级目录创建一个shell.php木马…...

如何在Ubuntu中安装deepin wine版的企业微信
如何在Ubuntu中安装deepin wine版的企业微信 运行如下一条命令将移植仓库添加到系统中 wget -O- https://deepin-wine.i-m.dev/setup.sh | sh自此以后,你可以像对待普通的软件包一样,使用apt-get系列命令进行各种应用安装、更新和卸载清理了。 安装企业…...

案例:Nginx + Tomcat集群(负载均衡 动静分离)
目录 案例 案例环境 案例步骤 部署Tomcat服务器 部署Nginx服务器 实现负载均衡和读写分离 日志控制 案例 案例环境 操作系统 IP 地址 角色 CentOS 192.168.10.101 Nginx服务器(调度器) CentOS 192.168.10.102 Tomcat服务器① CentOS 1…...
【密码学】密码协议的分类:②认证协议
密码协议的分类有很多种方式,这里我采取的是基于协议实现的目的来分类。可以将密码协议分成三类:认证协议、密钥建立协议、认证密钥建立协议。 一、认证协议是什么? 认证协议都在认证些什么东西呢?认证一般要认证三个东西&#x…...

异步编程(Promise详解)
目录 异步编程 回调函数 回调地狱 Promise 基本概念 Promise的特点 1.Promise是一种构造函数 2.Promise接收函数创建实例 3.Promise对象有三种状态 4.Promise状态转变不可逆 5.Promise 实例创建即执行 6.Promise可注册处理函数 7.Promise支持链式调用 Promise的静…...

DjangoORM注入分享
DjangoORM注入 简介 这篇文章中,分享一些关于django orm相关的技术积累和如果orm注入相关的安全问题讨论。 攻击效果同数据库注入 从Django-Orm开始 开发角度 Django ORM(Object-Relational Mapping)是Django框架中用于处理数…...

【HBZ分享】Redis各种类型的数据结构应用场景
String(字符串类型) 计数器: incr / decr, 比如商品库存,业务号的发号器业务数据key-value缓存, 缓存结果数据,提高网站性能,缓解DB压力分布式session会话, 集群环境下存储token鉴权信息分布式锁ÿ…...

anaconda创建并且配置pytorch(完整版)
📚博客主页:knighthood2001 ✨公众号:认知up吧 ** 🎃知识星球:【认知up吧|成长|副业】介绍** ❤️如遇文章付费,可先看看我公众号中是否发布免费文章❤️ 🙏笔者水平有限,欢迎各位大…...

高级java每日一道面试题-2024年8月10日-网络篇-你对跨域了解多少?
如果有遗漏,评论区告诉我进行补充 面试官: 你对跨域了解多少? 我回答: 跨域问题,即Cross-Origin Resource Sharing(CORS),是现代Web开发中一个非常重要的概念,涉及到浏览器的安全策略——同源策略(Same…...

AtCoder Beginner Contest 365 A~E
A.Leap Year(思维) 题意: 给你一个介于 1583 1583 1583和 2023 2023 2023之间的整数 Y Y Y。 求公历 Y Y Y年的天数。 在给定的范围内, Y Y Y年的天数如下: 如果 Y Y Y不是 4 4 4的倍数,则为 365 365 …...

多机部署, 负载均衡-LoadBalance
目录 1.负载均衡介绍 1.1问题描述 1.2什么是负载均衡 1.3负载均衡的一些实现 服务端负载均衡 客户端负载均衡 2.Spring Cloud LoadBalancer 2.1快速上手实现负载均衡 2.2负载均衡策略 自定义负载均衡策略 3.服务部署(Linux) 3.1服务构建打包…...

(回溯) LeetCode 78. 子集
原题链接 一. 题目描述 给你一个整数数组 nums ,数组中的元素 互不相同 。返回该数组所有可能的 子集 (幂集)。 解集 不能 包含重复的子集。你可以按 任意顺序 返回解集。 示例 1: 输入:nums [1,2,3] 输出&…...

DQL数据查询语言(多表处理)—/—<7>
一、多表处理 当前有两个表,一个是学生表student,一个是分数表score student表字段名表示如下(共1000条数据): score表字段表示如下(共6000条数据): 1、求每个学生的总分 SELECT …...

力扣刷题总结
去年有段时间一直在刷题,进步神速,解决了以往刷完就忘的问题,这里总结下经验,给有需要的人参考下,核心观点就仨: 1. 打好数据结构与算法基础 2. 多刷题多练习 3. 形成自己的知识体系 下图是我梳理的知识体…...

BLDC ESC 无刷直流电子调速器驱动方式
BLDC ESC 无刷直流电子调速器驱动方式 1. 源由2. 驱动方法2.1 Trapezoidal 1202.2 Trapezoidal 1502.3 Sinusoidal 1802.4 Field-Orientated Control (FOC) 3. FOC(Field-Oriented Control)3.1 引入坐标系3.2 Clarke and Park变换Clarke 变换(…...

解决 IntelliJ IDEA 编译错误 “Groovyc: Internal groovyc error: code 1” 及 JVM 内存配置问题
在使用 IntelliJ IDEA 进行开发时,我们可能会遇到各种编译和运行错误,其中之一就是 Groovy 编译器错误(Groovyc: Internal groovyc error: code 1)或 JVM 内存不足错误。这类错误可能会影响开发效率,但通过调整 JVM 内…...

LeetCode.2940.找到Alice和Bob可以相遇的建筑
友情提示:这个方法并没有通过案例,只通过了944个案例(很难受),超时了,但是想着还是分享出来吧 题目描述: 给你一个下标从 0 开始的正整数数组 heights ,其中 heights[i] 表示第 i …...

OFD板式文件创建JAVA工具-EASYOFD 四、文字 Text
JAVA版本的OFD板式文件创建工具easyofd. 功能包含了图像、 图像、 文字、和模版页功能。同时也支持OFD文件的数字签名及验签,电子签章及验签。 本JAVA版本的easyofd使用原生方式创建板式文件,不依赖JAVA的SWT库。 项目地址:http://…...

【概念速通】李群 lie group
李群 lie group 概念速通 快速示例介绍:【引入】单位复数 (The unit complex numbers) 是李群 (lie group) 最简单的例子之一【进一步】SO(2): The 2D rotation matrices【Typical uses】SE(2): Pose of a robot in the plane Group & Lie Group 定义࿱…...