SpringBoot教程(十五) | SpringBoot集成RabbitMq(消息丢失、消息重复、消息顺序、消息顺序)
SpringBoot教程(十五) | SpringBoot集成RabbitMq(消息丢失、消息重复、消息顺序、消息顺序)
- RabbitMQ常见问题解决方案
- 问题一:消息丢失的解决方案
- (1)生成者丢失消息
- 丢失的情景
- 解决方案1:发送方确认机制(推荐,最常用)
- 解决方案2:事务(不推荐,因为性能差)
- (2)MQ丢失消息
- 丢失的情景
- 解决方案:开启RabbitMQ的持久化+开启镜像队列
- (3)消费者丢失消息
- 丢失的情景 1
- 解决方案:无需解决
- 丢失的情景 2
- 扩展:重试机制
- 解决方案:消费者方确认机制(推荐,最常用)
- 问题二:消息重复的解决方案
- 什么时候会重复消费
- 如何解决
- 问题三:保证消息顺序的解决方案
- 单一队列和单一消费者模式(RabbitMQ)
- 问题四:消息堆积的解决方案
- 消息堆积原因
- 预防措施
- 已出事故的解决措施
RabbitMQ常见问题解决方案
问题一:消息丢失的解决方案
首先明确一条消息的传送流程:生产者->MQ->消费者
所以这三个节点都可能丢失数据
(1)生成者丢失消息
丢失的情景
发送消息过程中出现网络问题:生产者以为发送成功,但RabbitMQ server没有收到
解决方案1:发送方确认机制(推荐,最常用)
发送方确认机制最大的好处在于它是异步的,等信道返回ark确认的同时继续发送下一条消息(不会堵塞其他消息的发送)
(一)修改application.properties配置
# 确认消息已发送到交换机(Exchange)
spring.rabbitmq.publisher-confirms=true #旧版本
spring.rabbitmq.publisher-confirm-type=correlated #新版本
# 确认消息已发送到队列(Queue)
spring.rabbitmq.publisher-returns=true
springBoot 2.2.0.RELEASE版本之前 是使用 spring.rabbitmq.publisher-confirms=true
在2.2.0及之后 使用spring.rabbitmq.publisher-confirm-type=correlated 属性配置代替
(二)新建配置文件RabbitTemplate
对于 发送确认 写法有多种方式,以下的是其中的一种方式
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//setMandatory设置表示:消息在没有被队列接收时是否应该被退回给生产者(true:退回;false:丢弃)。//通常与yml配置文件中的publisher-returns配合一起使用,若不配置该项,setReutrnCallback将不会有消息返回rabbitTemplate.setMandatory(true);//帮助生产者判断 确认消息是否成功发送到RabbitMQ//ack 为true表示已发送成功 false表示发送失败rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {System.out.println("ConfirmCallback: "+"相关数据:"+correlationData);System.out.println("ConfirmCallback: "+"确认情况:"+ack);System.out.println("ConfirmCallback: "+"原因:"+cause);});//当消息无法 放到队列里面时 返回的提醒rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {System.out.println("ReturnCallback: "+"消息:"+message);System.out.println("ReturnCallback: "+"回应码:"+replyCode);System.out.println("ReturnCallback: "+"回应信息:"+replyText);System.out.println("ReturnCallback: "+"交换机:"+exchange);System.out.println("ReturnCallback: "+"路由键:"+routingKey);});return rabbitTemplate;}
}
解决方案2:事务(不推荐,因为性能差)
RabbitMQ提供的事务功能,在生产者发送数据之前开启RabbitMQ事务
(2)MQ丢失消息
丢失的情景
RabbitMQ服务端接收到消息后由于服务器宕机或重启等原因(消息默认存在内存中)导致消息丢失;
解决方案:开启RabbitMQ的持久化+开启镜像队列
RabbitMQ的持久化分为三个部分:交换器的持久化、队列的持久化、消息的持久化
三者 都 持久化 才能保证 RabbitMQ服务重启之后,消息才能存在且能发出去
交换机持久化
交换机持久化描述的是当这个交换机上没有注册队列时,这个交换机是否删除。
如果要打开持久化的话也很简单 (上面列子都是有体现的)
//定义直接交换机
@Bean
public DirectExchange directExchange() {//第一个参数:定义交换机的名称,第二个参数:是否持久化,第三个参数:是否自动删除return new DirectExchange("directExchange", true, false);
}
队列持久化
队列持久化描述的是当这个队列没有消费者在监听时,是否进行删除。
持久化做法:
//定义队列
@Bean
public Queue directQueue() {//第一个参数:队列的名称,第二个参数:是否持久化return new Queue("directQueue", true);
}
消息持久化
关键配置 持久化(MessageDeliveryMode.PERSISTENT)
@Test
public void testDurableMessage() {// 1.准备消息Message message = MessageBuilder.withBody("hello, rabbitmq".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();// 2.消息ID,封装到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 3.发送消息rabbitTemplate.convertAndSend("simple.queue", message, correlationData);log.info("发送消息成功");
}
(3)消费者丢失消息
丢失的情景 1
RabbitMQ服务端向消费者发送完消息之后,网络断了,消息并没有到达消费者
解决方案:无需解决
无需解决。因为此情景下服务端收不到确认消息,会再次发送的。
丢失的情景 2
启用了重试机制,重试指定次数之后,还没成功,但消息被确认。
扩展:重试机制
重试机制的三大前提
- 重试模式已启用:通过配置 spring.rabbitmq.listener.simple.retry.enabled=true 来启用重试模式。
- 抛出了异常:在 @RabbitListener 标注的方法中抛出了异常,通常是 RuntimeException 或 Error。
Spring AMQP 会捕获这些异常并根据配置的重试策略来重试消息。- 未达到最大重试次数:消息的重试次数尚未达到配置的最大值(spring.rabbitmq.listener.simple.retry.maxAttempts)。
配置以下即可实现重试操作
# 是否支持重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 重试次数(默认3次)
spring.rabbitmq.listener.simple.retry.max-attempts=5
解决方案:消费者方确认机制(推荐,最常用)
改成手动后就 可以实现 “先操作业务逻辑(数据库操作)后,再手动从队列上删除这个消息” 的动作
其中“从队列上删除这个消息“这个动作体现就是 使用 channel.basicAck 去完成的。
切记改成手动后,这个channel.basicAck方法一定要写。
(一)修改application.properties配置
# 设置消费端手动 ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
(二)修改Service接收信息项
当消息在进入 emailProcess、smsProcess(被@RabbitListener注解) 方法时就已经被视为“接收到了”,但是需要 你 执行 channel.basicAck(手动确认)才能让这个消息从队列上删除。
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Service;import java.io.IOException;@Service
public class DirectReceiver {@RabbitHandler@RabbitListener(queues = "emailQueue") //监听的队列名称public void emailProcess(Channel channel, Message message) throws IOException {try{System.out.println(new String(message.getBody(),"UTF-8"));//TODO 具体业务.......//你使用手动消息确认模式时,basicAck 一定要执行,不然会导致会保留在队列中,无法被消费//第1个参数表示消息投递序号//第2个参数false只确认当前一个消息收到(大多数情况下都设置为false),true确认所有consumer获得的消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {//若是消息没有成功接收,第二个参数设置为true的话,代表重新放回队列中,false则为丢弃,在此也可以做成放置死信队列的操作channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);}}}
确认和拒绝消息:
- basicAck: 这个方法用于确认消息已被成功处理。
第一个参数是消息的delivery tag(用于标识消息),
第二个参数指定是否批量确认(false
表示只确认当前消息)。- basicReject: 这个方法用于拒绝消息。
第一个参数同样是delivery tag,
第二个参数指定是否将消息重新放回队列(false
表示不重新放回,即丢弃消息)。
方法解释:
- emailProcess: 这个方法监听
emailQueue
队列。
当队列中有消息时,它会打印出消息的内容,并尝试确认消息。
如果处理过程中发生异常,它会拒绝消息,但不会重新放回队列(第二个参数为false
)。
问题二:消息重复的解决方案
什么时候会重复消费
1.自动提交模式时
消费者收到消息后,要自动提交,但提交后,网络出故障,RabbitMQ服务器没收到提交消息,那么此消息会被重新放入队列,会再次发给消费者。
2.手动提交模式时
情景1:网络故障问题,同上。
情景2:接收到消息并处理结束了,此时消费者挂了,没有手动提交消息。
总体来说就是:网络不可达、消费端宕机。
如何解决
消费端处理消息的业务逻辑保持幂等性
比如你拿个数据要写库,先根据主键查一下,如果这数据有了,就别插入了,update 一下。
比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
问题三:保证消息顺序的解决方案
单一队列和单一消费者模式(RabbitMQ)
在RabbitMQ中,可以确保一个队列只被一个消费者消费,这样可以保证消息按照发送的顺序被处理。
因为队列本身就是一个先进先出的结构。
适用场景:RabbitMQ用户且对消息顺序有严格要求的场景。
优点:实现简单,易于管理。
缺点:可能成为性能瓶颈,在处理大量消息时需要考虑扩展性。
问题四:消息堆积的解决方案
消息堆积原因
消息堆积即消息没及时被消费,是生产者生产消息速度快于消费者消费的速度导致的。
消费者消费慢可能是因为:本身逻辑耗费时间较长、阻塞了。
预防措施
生产者
1.减少发布频率
3.考虑使用队列最大长度限制
消费者
1.优化代码
已出事故的解决措施
情况1:堆积的消息还需要使用
方案1:简单修复
修复 消费者(consumer)的问题,让他恢复消费速度,然后等待几个小时消费完毕
方案2:复杂修复
单队列消费转变为多队列并行消费
也是需要先 修复 消费者(consumer)的问题,再进行下面的步骤
步骤 1: 队列和路由设置
1.创建新队列:在RabbitMQ中创建10个新队列,每个队列分配一个独特的名称。
2. 设置交换机:定义一个直连型(Direct)交换机。
3. 绑定路由键:将每个新队列通过唯一的路由键绑定到直连型交换机上。
伪代码例子:
// 假设这是配置类的一部分
@Bean
Queue queue1() { return new Queue("queue1", false);
}
@Bean
Queue queue2() { return new Queue("queue2", false);
}
// 以此类推,为其他9个队列创建Bean
.........
@Bean
DirectExchange exchange() { return new DirectExchange("myExchange");
}
@Bean
Binding binding1(Queue queue1, DirectExchange exchange) { return BindingBuilder.bind(queue1).to(exchange).with("routingKey1");
}
@Bean
Binding binding2(Queue queue2, DirectExchange exchange) { return BindingBuilder.bind(queue2).to(exchange).with("routingKey2");
}
// 以此类推,为其他队列和路由键创建绑定
......
步骤 2: 消息分发
1.接收堆积数据:现有消费者(或分发者)接收从发送者处堆积的数据。
2.分发到新队列:实现分发逻辑,将接收到的消息根据路由键分发到相应的10个新队列中。
伪代码例子:
@RabbitListener(queues = "oldQueue")
public void emailProcess(Message message, Channel channel) throws IOException { try { // 生成1-10之间的顺序数 SequentialRandom sequentialRandom = new SequentialRandom()String key = sequentialRandom.getNextSequentialRandom();// 重新发送消息到交换机,交换机将根据routingKey将消息路由到正确的队列 rabbitTemplate.convertAndSend("myExchange", "routingKey"+key, new String(message.getBody(),"UTF-8")); // 确认原始队列中的消息(如果您想要的话) channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 处理异常,可能包括记录日志、发送警报等 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); }
}
public class SequentialRandom { private int currentIndex = 1; // 初始索引为1 /** * 获取下一个顺序数* @return 下一个数字,从1到10循环 */ public int getNextSequentialRandom() { int next = currentIndex; currentIndex = (currentIndex % 10) + 1; // 使用模运算实现循环,并更新索引 return next; }
}
步骤 3: 并行消费
1.开发新消费端:编写新的消费端程序,该程序能够监并处理来自10个新队列的消息。
2. 部署并启动:将新消费端程序部署到服务器,并启动它以开始并行消费。
伪代码例子:
@Component
public class ParallelConsumer { @RabbitListener(queues = {"queue1"}) public void receiveMessage1(Message message) { // 处理消息 } @RabbitListener(queues = {"queue2"}) public void receiveMessage2(Message message) { // 处理消息 } // ... @RabbitListener(queues = {"queue10"}) public void receiveMessage3(Message message) { // 处理消息 }
}
情况2:堆积的消息不需要使用
删除消息即可。(可以在RabbitMQ控制台删除,或者使用命令)。
相关文章:

SpringBoot教程(十五) | SpringBoot集成RabbitMq(消息丢失、消息重复、消息顺序、消息顺序)
SpringBoot教程(十五) | SpringBoot集成RabbitMq(消息丢失、消息重复、消息顺序、消息顺序) RabbitMQ常见问题解决方案问题一:消息丢失的解决方案(1)生成者丢失消息丢失的情景解决方案1…...

TensorRT-LLM高级用法
--multi_block_mode decoding phase, 推理1个新token, 平时:按照batch样本,按照head,将计算平均分给所有SM; batch_size*num_heads和SM数目相比较小时:有些SM会空闲;加了--multi_block_mode&…...

文心一言功能新升级:读文档、懂翻译、能识图
9月4日,百度文心一言官网显示,在向全社会开放一周年之际,文心一言进行了功能最新全面升级,同时在周年期间为新老会员增加1个月专业版免费使用体验。 据了解,针对网页版用户需求,文心一言实现了创作内容更加…...

C++机试——走方格的方案
题目 请计算n*m的棋盘格子(n为横向的格子数,m为竖向的格子数)从棋盘左上角出发沿着边缘线从左上角走到右下角,总共有多少种走法,要求不能走回头路,即:只能往右和往下走,不能往左和往…...

Bootstrap 字体图标无法显示问题,<i>标签字体图标无法显示问题
bootstrap fileInput 以及 Bootstrap 字体图标无法显示问题。 今天在用 bootstrap fileInput 插件的时候发现图标无法显示,如下: 查看DOM,发现那些图标是<i>标签做的: 网上的方案 方案1 网上很多人说是我们打乱了boots…...

docker registry 仓库加密
docker registry 仓库加密 1、背景 公司一直用的镜像仓库是docker registry,但是有个安全问题,就是仓库从web ui的浏览到镜像的拉取都是可以直接使用的,还是放到了公网上,只需要知道你的域名那就是畅通无阻了,可以…...

利用高德+ArcGIS优雅获取任何感兴趣的矢量边界
荷花十里,清风鉴水,明月天衣。 四时之景不同,乐亦无穷尽也。今天呢,梧桐君给大家讲解一下,如何利用高德地图,随机所欲的获取shp边界数据。 文章主要分成以下几个步骤: 首先搜索你想获取的矢量…...

炮弹【USACO】
题目背景 时/空限制:1s / 64MB 题目描述 贝茜已经精通了变成炮弹并沿着长度为 N 的数轴弹跳的艺术,数轴上的位置从左到右编号为 1,2,…,N 。 她从某个整数位置 S 开始,以 1 的起始能量向右弹跳。 如果贝茜的能量为 k ,则她将…...

python如何读取excel文件内的数据
目录 前言一、安装openpyxl二、读取Excel数据总结前言 在Python中读取Excel数据,最常用的库之一是openpyxl(用于.xlsx格式)和xlrd(尽管xlrd从版本2.0开始不再支持.xlsx,仅支持旧的.xls格式)。然而,对于大多数现代应用来说,openpyxl是一个更好的选择,因为它支持.xlsx格…...

Java项目: 基于SpringBoot+mybatis+maven+mysql教师工作量管理系统(含源码+数据库+毕业论文)
一、项目简介 本项目是一套基于SpringBootmybatismavenmysql教师工作量管理系统 包含:项目源码、数据库脚本等,该项目附带全部源码可作为毕设使用。 项目都经过严格调试,eclipse或者idea 确保可以运行! 该系统功能完善、界面美观…...

项目开发--数据库--postgresql数据库操作
背景 1、安装postgresql的基础方法 2、基本操作命令 解决方案 安装命令 在ubuntu环境当中进行安装。 sudo apt install postgresql安装完毕之后直接进行测试,如果看到如下内容则安装成功。 sudo systemctl status postgresql使用DBeaver进行连接报错ÿ…...

c语言——用一维数组输出杨辉三角形
一.代码 #include <stdio.h> int Num[100]; int Hang; int Lie; int a; int Flag; int main() {Lie 1;Hang 1;a 0;while (1) {//列1为1if (Lie 1) {Num[1] 1;Lie;}//数据存到数组里面while (Hang > Lie && Hang ! 2) { if (Hang!Lie) {Flag Num[Lie] …...

Codeforces Round 971 (Div. 4) (A~G1)
A、B题太简单,不做解释 C 对于 x y 两个方向,每一个方向至少需要 x / k 向上取整的步数,取最大值。 由于 x 方向先移动,假如 x 方向需要的步数多于 y 方向的步数,那么最后 y 方向的那一步就不需要了,答案…...

为什么构造函数不能为虚函数?为什么析构函数可以为虚函数,如果不设为虚函数可能会存在什么问题?
目录 一、为什么构造函数不能为虚函数? 二、为什么析构函数可以是虚函数?如果不设为虚函数可能会存在什么问题? 构造函数不能为虚函数,因为在构造过程中,虚函数机制尚未生效,对象还未完成构造,…...

【数据结构】单链表功能的实现
目录 1.链表的概念及结构 2.单链表功能的实现 2.1打印单链表 2.2创建节点 2.3单链表尾插 2.3单链表头插 2.5单链表尾删 2.6单链表头删 2.7单链表的查找 2.8在指定位置之前插入数据 2.9在指定位置之后插入数据 2.10删除pos节点 2.11删除pos之后的节点 2.12销毁链表…...

最新车型库大全|阿里云实现调用API接口
整体请求流程: 介绍: 本次解析通过阿里云云市场的云服务来实现查询车型库大全查询,首先需要选择一家可以提供查询的商品。 [探数API]车型库查询_API专区_云市场-阿里云 步骤1: 选择商品 如图点击免费试用,即可免费申请该接口数…...

70. 爬楼梯
70. 爬楼梯 假设你正在爬楼梯。需要 n 阶你才能到达楼顶。 每次你可以爬 1 或 2 个台阶。你有多少种不同的方法可以爬到楼顶呢? 示例 1: 输入:n 2 输出:2 解释:有两种方法可以爬到楼顶。 1.1 阶 1 阶 2.2 阶 示例…...

pytorch正向传播没问题,loss.backward()使定义的神经网络中权重参数变为nan
记录一个非常坑爹的bug:loss回传导致神经网络中一个linear层的权重参数变为nan 1.首先loss值是正常数值; 2.查了好多网上的解决办法:检查原始输入神经网络数据有没有nan值,初始化权重参数,使用relu激活函数,梯度裁剪&a…...

❤《实战纪录片 1 》原生开发小程序中遇到的问题和解决方案
《实战纪录片 1 》原生开发小程序中遇到的问题和解决方案 文章目录 《实战纪录片 1 》原生开发小程序中遇到的问题和解决方案1、问题一:原生开发中 request请求中返回 的数据无法 使用this传递给 data{}中怎么办?2、刚登录后如何将token信息保存…...

2024.9.6 作业
手写unique_ptr指针指针 代码: #include <iostream> #include <stdexcept>template <typename T> class unique_ptr { public:// 构造函数explicit unique_ptr(T* ptr nullptr) : m_ptr(ptr) {}// 析构函数~unique_ptr() {delete m_ptr;}// 禁…...

2024年架构设计师论文-“模型驱动架构设计方法及其应用”
论模型驱动架构设计方法及其应用 模型驱动架构设计是一种用于应用系统开发的软件设计方法,以模型构造、模型转换和精化为核心,提供了一套软件设计的指导规范。在模型驱动架构环境下,通过创建出机器可读和高度抽象的模型实现对不同问题域的描述…...

Tapd敏捷开发平台的使用心得
Tapd敏捷开发平台的使用心得 一、Tapd 简介 TAPD(Tencent Agile Product Development),腾讯敏捷产品研发平台行业领先的敏捷协作方案,贯穿敏捷产品研发生命周期的一站式服务,了解敏捷如下图 二、几个核心模块概念 需求迭代缺陷故事墙前期项目需求的管理,可以按类别建…...

远程桌面 Rust Desk 自建服务器
因为某些原因(诈骗),Rush Desk 服务已暂停国内访问,今天我们介绍如何利用自己的服务器搭建 Rust Desk 远程桌面,低延迟电脑远程手机,手机远程电脑等 一、准备工作 准备一台服务器,我用的腾讯云服务器,一年…...

开源网安引领AIGC+开发安全,智能防护铸就软件安全新高度
近日,国内网络安全领域知名媒体数说安全正式发布了《2024年中国网络安全市场100强》和《2024年中国网络安全十大创新方向》。开源网安凭借在市场表现力、资源支持力以及产品在AI方向的创新力上的优秀表现成功入选百强榜单,并被评为“AIGC开发安全”典型厂…...

树和二叉树
树 节点(Node:) 树由一系列的节点组成,每个节点可以包含数据和指向其他节点的链接。 节点通常包含一个数据元素和若干指向其他节点的指针 根节点(Root): 树的顶部节点称为根节点,…...

一篇带你速通差分算法(C/C++)
个人主页:摆烂小白敲代码 创作领域:算法、C/C 持续更新算法领域的文章,让博主在您的算法之路上祝您一臂之力 欢迎各位大佬莅临我的博客,您的关注、点赞、收藏、评论是我持续创作最大的动力 差分算法是一种在计算机科学中常用的算法…...

贷款利率高低跟什么有关?仅凭身份证就能贷到款?额度是多少?
在金融的广阔舞台上,借款人的“信用基石”——即其综合资质,是决定贷款利率高低的决定性因素。这并非偶然,而是银行基于详尽的风险评估与收益预期所做出的精准判断。 需明确的是,贷款的易得性并不意味着无门槛的放任。它更像是设置…...

苹果电脑需要安装杀毒软件吗?探索Mac的安全世界!
在聊到电脑安全时,许多Mac用户都骄傲地声称:“我的Mac是不会中病毒的!”确实,与Windows PC相比,Mac因其UNIX-based的操作系统构架,天生就更加安全。但这是否意味着Mac完全不需要杀毒软件呢?让我…...

Oracle start with connect BY 死循环
解决办法 检查start with前有没有where条件, 如果有的话,套一层select,再 Oracle start with connect BY...

力扣接雨水
给定 n 个非负整数表示每个宽度为 1 的柱子的高度图,计算按此排列的柱子,下雨之后能接多少雨水。 示例 1: 输入:height [0,1,0,2,1,0,1,3,2,1,2,1] 输出:6 解释:上面是由数组 [0,1,0,2,1,0,1,3,2,1,2,1] 表…...