基于SpringBoot解决RabbitMQ消息丢失问题
基于SpringBoot解决RabbitMQ消息丢失问题
- 一、RabbitMQ解决消息丢失问题
- 二、方案实践
- 1、在生产者服务相关配置
- 2、在消费者服务相关配置
- 三、测试验证
- 1、依次启动RabbitMQ、producer(建议先清空队列里面旧的测试消息再启动consumer)和consumer
- 2、在producer中调用接口,发送消息
- 3、观察控制台打印日志
- 四、项目结构及源码
- 1、项目结构
- 2、源码下载
一、RabbitMQ解决消息丢失问题
RabbitMQ通过以下机制来保证消息的可靠性,从而解决消息丢失问题:
(1)消息持久化:RabbitMQ支持将消息持久化到磁盘,即使RabbitMQ服务器宕机或重启,消息也不会丢失。在发布消息时,可以设置消息的持久化标志,这样消息就会被写入磁盘中,而不是仅仅保存在内存中。在自定义MQ的配置中,设置消息队列和交换机,默认为true代表持久化。
(2)消息确认机制:RabbitMQ提供了消息确认机制,即生产者在发送消息后,可以等待RabbitMQ服务器返回确认信息,以确保消息已经被正确地接收和处理。如果RabbitMQ服务器没有返回确认信息,生产者可以选择重新发送消息或者采取其他的补救措施。在自定义MQ的配置中,配置RabbitTemplate,设置setConfirmCallback和setReturnCallback进行确认。
(3)事务机制:RabbitMQ还支持事务机制,即生产者可以将多个操作封装在一个事务中,只有当所有的操作都成功完成后,才提交事务。如果某个操作失败,整个事务会被回滚,从而保证消息的完整性和一致性。
(4)消息重试机制:如果消息在传输过程中出现异常,RabbitMQ会自动进行消息重试,直到消息被正确地处理为止。可以通过设置重试次数和重试时间间隔来控制消息重试的行为。可以在application.yaml配置文件中设置
rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /#1、确保消息从发送端到服务端投递可靠(分为以下两个步骤)#1.1、确认消息已发送到交换机(Exchange) 可以把publisher-confirms: true 替换为 publisher-confirm-type: correlatepublisher-confirm-type: correlated#1.2、确认消息从交换机中到队列中publisher-returns: true
综上所述,RabbitMQ通过持久化、确认、事务和重试等机制来保证消息的可靠性,从而解决消息丢失的问题。下面按照上述4个机制进行代码实践。
二、方案实践
1、在生产者服务相关配置
1.1 通过自定义RabbitConfig设置消息投递失败的策略为返回到客户端,处理返回的消息(请注意!如果你使用了延迟队列插件,那么一定会调用该callback方法,因为数据并没有提交上去)。
@Slf4j
@Configuration
public class RabbitConfig {@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//设置消息投递失败的策略,有两种策略:自动删除或返回到客户端。//我们既然要做可靠性,当然是设置为返回到客户端(true是返回客户端,false是自动删除)rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("ConfirmCallback 关联数据:{},投递成功,确认情况:{}", correlationData, ack);} else {log.info("ConfirmCallback 关联数据:{},投递失败,确认情况:{},原因:{}", correlationData, ack, cause);}}});rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {// 请注意!如果你使用了延迟队列插件,那么一定会调用该callback方法,因为数据并没有提交上去,// 而是提交在交换器中,过期时间到了才提交上去,并非是bug!你可以用if进行判断交换机名称来捕捉该报错/*if (exchange.equals("你声明的延迟队列的交换机")) {return;}*/log.info("ReturnsCallback 消息被退回:{},回应码:{},回应信息:{},交换机:{},路由键:{}", returnedMessage.getMessage(), returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey());}});return rabbitTemplate;}
}
DirectRabbitConfig直连交换机配置
@Slf4j
@Configuration
public class DirectRabbitConfig {private static final String QUEUE = "TestDirectQueue";private static final String EXCHANGE = "TestDirectExchange";private static final String ROUTING_KEY = "TestDirectRouting";/*** 创建一个名为TestDirectQueue的队列** @return*/@Beanpublic Queue testDirectQueue() {// durable:是否持久化,默认为true,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,有消息者订阅本队列,然后所有消费者都解除订阅此队列,会自动删除。// arguments:队列携带的参数,比如设置队列的死信队列,消息的过期时间等等。return new Queue(QUEUE);}/*** 创建一个名为TestDirectExchange的Direct类型的交换机** @return*/@Beanpublic DirectExchange testDirectExchange() {// durable:是否持久化,true,持久化交换机。// autoDelete:是否自动删除,交换机先有队列或者其他交换机绑定的时候,然后当该交换机没有队列或其他交换机绑定的时候,会自动删除。// arguments:交换机设置的参数,比如设置交换机的备用交换机(Alternate Exchange),当消息不能被路由到该交换机绑定的队列上时,会自动路由到备用交换机return new DirectExchange(EXCHANGE);}/*** 绑定交换机和队列** @return*/@Beanpublic Binding bindingDirect() {//bind队列to交换机中with路由key(routing key)return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with(ROUTING_KEY);}
}
2、在消费者服务相关配置
2.1 配置DirectConsumer
/*** 直连交换机消息** @author* @DATE 2025/6/2**/
@RabbitListener(queues = "TestDirectQueue")
@Component
@Slf4j
public class DirectConsumer {@RabbitListener(queues = "TestDirectQueue")
@Component
@Slf4j
public class DirectConsumer {/*@RabbitHandlerpublic void process(Object data, Channel channel, Message message) throws IOException {log.info("消费者接受到的消息是:{},消息体为:{}", data, message);//由于配置设置了手动应答,所以这里要进行一个手动应答。注意:如果设置了自动应答,这里又进行手动应答,会出现double ack,那么程序会报错。channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}*/@RabbitHandlerpublic void process(Object message, Channel channel,@Headers Map<String,Object> map) {System.out.println(message);//这里只是模拟业务,其实还有很多可能,比如验证用户的银行卡号已经过期等等,都可以发出nackif (map.get("error")!= null){System.out.println("错误的消息");try {channel.basicNack((Long)map.get(AmqpHeaders.DELIVERY_TAG),false,true); //否认消息return;} catch (IOException e) {e.printStackTrace();}}try {System.out.println("业务在这里执行!");channel.basicAck((Long)map.get(AmqpHeaders.DELIVERY_TAG),false); //确认消息} catch (IOException e) {//实际业务场景,可能需要将上面的channel.basicNack,放到异常里面进行重试!e.printStackTrace();}}}
三、测试验证
1、依次启动RabbitMQ、producer(建议先清空队列里面旧的测试消息再启动consumer)和consumer
首先调用接口,生成交换机和队列,否则,在启动consumer时候会报找不到交换机和队列错误。
然后在RabbitMQ界面手动清空队列消息,防止干扰本次实验。
2、在producer中调用接口,发送消息
3、观察控制台打印日志
生产者控制台
消费者控制台
可以看到由于接口中第1,4,5条消息会正常发送,所以在consumer已经进行了正常消费,并且针对第5条进行了业务重试。
由于第2,3条分别由于交换机错误或者队列错误,导致消息发送失败。
四、项目结构及源码
1、项目结构
2、源码下载
RabbitMQ,欢迎Star
相关文章:

基于SpringBoot解决RabbitMQ消息丢失问题
基于SpringBoot解决RabbitMQ消息丢失问题 一、RabbitMQ解决消息丢失问题二、方案实践1、在生产者服务相关配置2、在消费者服务相关配置 三、测试验证1、依次启动RabbitMQ、producer(建议先清空队列里面旧的测试消息再启动consumer)和consumer2、在producer中调用接口࿰…...

免费插件集-illustrator插件-Ai插件-随机填色
文章目录 1.介绍2.安装3.通过窗口>扩展>知了插件4.功能解释5.总结 1.介绍 本文介绍一款免费插件,加强illustrator使用人员工作效率,实现路径随机填色。首先从下载网址下载这款插件https://download.csdn.net/download/m0_67316550/87890501&#…...
使用 Unstructured 开源库快速入门指南
引言 本文将介绍如何使用 Unstructured 开源库(GitHub,PyPI)和 Python,在本地开发环境中将 PDF 文件拆分为标准的 Unstructured 文档元素和元数据。这些元素和元数据可用于 RAG(检索增强生成)应用、AI 代理…...
白银6月想法
一、市场回顾 2025年5月,SHFE白银主力合约总体呈现出震荡偏强的运行格局。从2025年5月1日至2025年5月30日,白银期货价格整体运行在7944元至8342元区间内,最高价出现在5月22日的8342.0元,最低价则为5月15日的7944元。最终在5月30日…...
OpenCV 滑动条调整图像对比度和亮度
一、知识点 1、int createTrackbar(const String & trackbarname, const String & winname, int * value, int count, TrackbarCallback onChange 0, void * userdata 0); (1)、创建一个滑动条并将其附在指定窗口上。 (2)、参数说明: trackbarname: 创建的…...
船舶事故海上搜救VR情景演练全场景 “复刻”,沉浸式救援体验
船舶事故海上搜救 VR 情景演练系统的一大核心优势,便是能够全场景 “复刻” 海上事故,为使用者带来沉浸式的船舶事故海上搜救 VR 情景演练体验。 在船舶事故海上搜救 VR 情景演练的事故场景模拟方面,系统几乎涵盖了所有常见的船舶事故类型。…...
使用Caddy在Ubuntu 22.04上配置HTTPS反向代理
使用Caddy在Ubuntu 22.04上配置HTTPS反向代理(无域名/IP验证+密码保护) 一、 环境说明 环境说明:测试环境,生产环境请谨慎OS: Ubuntu 22.04.1 LTSCaddy版本:v2.10.0服务器IP: 192.168.3.88(内网)公网IP: 10.2.3.11(测试虚拟)代理端口: 9080后端服务: http://192.168.3…...
无人机目标检测与语义分割数据集(猫脸码客)
UAV 无人机数据集:驱动无人机配送研究迈向新高度 在科技浪潮的迅猛推动下,无人机配送这一新兴物流模式正以前所未有的态势,悄然改变着人们的生活图景。为深入挖掘并优化无人机配送技术,名为 UAV Delivery 的无人机数据集应运而生…...

Web设计之登录网页源码分享,PHP数据库连接,可一键运行!
HTML 页面结构(index.html) 1. 流星雨动态背景 2. 主体界面(包含登录和注册表单) <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport&qu…...

Cursor + Claude 4:微信小程序流量主变现开发实战案例
前言 随着微信小程序生态的日益成熟,越来越多的开发者开始关注如何通过小程序实现流量变现。本文将详细介绍如何使用Cursor编辑器结合Claude 4 AI助手,快速开发一个具备流量主变现功能的微信小程序,并分享实际的开发经验和变现策略。 项目…...
㊗️高考加油
以下是极为详细的高考注意事项清单,涵盖考前、考中、考后全流程,建议逐条核对: 一、考前准备 1. 证件与物品 必带清单: 准考证:打印2份(1份备用),塑封或夹在透明文件袋中防皱湿。身…...

Redis Key过期策略
概述 Redis的Key过期策略是其内存管理系统的核心组成部分,主要包括「被动过期」、「主动过期」和「内存淘汰」三个机制。其中「内存淘汰」相关内容已经在上一篇「Redis内存淘汰策略」中进行了详细的讲解,有信兴趣的同学可以在回顾上一篇文章。本文将着重…...

【C/C++】实现固定地址函数调用
在 C 里,函数地址在程序运行期间通常是固定的,不过在动态链接库(DLL)或者共享库(SO)中,函数地址可能会因为地址空间布局随机化(ASLR)而改变。所以我们想要通过地址直接调…...

多模态大语言模型arxiv论文略读(109)
Math-PUMA: Progressive Upward Multimodal Alignment to Enhance Mathematical Reasoning ➡️ 论文标题:Math-PUMA: Progressive Upward Multimodal Alignment to Enhance Mathematical Reasoning ➡️ 论文作者:Wenwen Zhuang, Xin Huang, Xiantao Z…...

性能优化笔记
性能优化转载 https://www.cnblogs.com/tengzijian/p/17858112.html 性能优化的一般策略及方法 简言之,非必要,不优化。先保证良好的设计,编写易于理解和修改的整洁代码。如果现有的代码很糟糕,先清理重构,然后再考…...
bat批量去掉本文件夹中的文件扩展名
本文本夹内 批量去掉本文件夹中的文件扩展名 假如你有一些文件,你想去掉他们的扩展名 有没有方便的办法呢 今天我们就分享一种办法。 下面,就来看看吧。 首先我们新建一个记事本,把名字改为,批量去掉本文件夹中的文件扩展名.txt 然…...
基于ROS2,撰写python脚本,根据给定的舵-桨动力学模型实现动力学更新
提问 #! /usr/bin/env python3from control_planner import usvParam as P from control_planner.courseController import courseLimitationimport tf_transformations # ROS2没有自带tf.transformations, 需装第三方库 import rclpy from rclpy.node import Node from pid_…...

Scrapy爬虫教程(新手)
1. Scrapy的核心组成 引擎(engine):scrapy的核心,所有模块的衔接,数据流程梳理。 调度器(scheduler):本质可以看成一个集合和队列,里面存放着一堆即将要发送的请求&#…...
数据可视化大屏案例落地实战指南:捷码平台7天交付方法论
分享大纲: 1、落地前置:数据可视化必备的规划要素 2、数据可视化双路径开发 3、验证案例:数据可视化落地成效 在当下数字化转型浪潮中,数据可视化建设已成为关键环节。数据可视化大屏的落地,成为企业数据可视化建设的难…...
第五篇:Go 并发模型全解析——Channel、Goroutine
第五篇:Go 并发模型全解析——Channel、Goroutine 一、序章:Java 的并发往事 在 Java 世界中,说到“并发”,你可能立马想到以下名词:Thread、Runnable、ExecutorService、synchronized、volatile。再复杂点,ReentrantLock、CountDownLatch、BlockingQueue 纷纷登场,仿…...
锁的艺术:深入浅出讲解乐观锁与悲观锁
在多线程和分布式系统中,数据一致性是一个核心问题。锁机制作为解决并发冲突的重要手段,被广泛应用于各种场景。乐观锁和悲观锁是两种常见的锁策略,它们在设计理念、实现方式和适用场景上各有特点。本文将深入探讨乐观锁和悲观锁的原理、实现…...
在网页加载时自动运行js的方法(2025最新)
在网页加载时自动运行JavaScript方法有多种实现方式,以下是常见的几种方法: 1. 使用 DOMContentLoaded 事件 当初始HTML文档完全加载和解析后触发(无需等待图片等资源加载): document.addEventListener(DOMC…...

在Windows下编译出llama_cpp_python的DLL后,在虚拟环境中使用方法
定位编译生成的文件 在VS2022编译完成后,在构建目录(如build/Release或build/Debug)中寻找以下关键文件: ggml.dll、ggml_base.dll、ggml_cpu.dll、ggml_cuda.dll、llama.dll(核心动态链接库) llama_cp…...
CSS radial-gradient函数详解
目录 基本语法 关键参数详解 1. 渐变形状(Shape) 2. 渐变大小(Size) 3. 中心点位置(Position) 4. 颜色断点(Color Stops) 常见应用场景 1. 基本圆形渐变 2. 椭圆渐变 3. 模…...
n8n 自动化平台 Docker 部署教程(附 PostgreSQL 与更新指南)
n8n 自动化平台 Docker 部署教程(附 PostgreSQL 与更新指南) n8n 是一个强大的可视化工作流自动化工具,支持无代码或低代码地集成各种服务。本文将手把手教你如何通过 Docker 快速部署 n8n,并介绍如何使用 PostgreSQL、设置时区以…...

关于datetime获取时间的问题
import datetime print(datetime.now())如果用上述代码,会报错: 以下才是正确代码: from datetime import datetime print(datetime.now()) 结果: 如果想格式化时间,使用代码: from datetime import da…...
前端面试五之vue2基础
1.属性绑定v-bind(:) v-bind 是 Vue 2 中用于动态绑定属性的核心指令,它支持多种语法和用法,能够灵活地绑定 DOM 属性、组件 prop,甚至动态属性名。通过 v-bind,可以实现数据与视图之间的高效同…...
使用python实现奔跑的线条效果
效果,展示(视频效果展示): 奔跑的线条 from turtle import * import time t1Turtle() t2Turtle() t3Turtle() t1.hideturtle() t2.hideturtle() t3.hideturtle() t1.pencolor("red") t2.pencolor("green") t3…...
Oracle 审计参数:AUDIT_TRAIL 和 AUDIT_SYS_OPERATIONS
Oracle 审计参数:AUDIT_TRAIL 和 AUDIT_SYS_OPERATIONS 一 AUDIT_TRAIL 参数 1.1 参数功能 AUDIT_TRAIL 是 Oracle 数据库中最核心的审计控制参数,决定审计记录的存储位置和记录方式。 1.2 参数取值及含义 取值说明适用场景NONE禁用数据库审计测试环…...
Android LinearLayout、FrameLayout、RelativeLayout、ConstraintLayout大混战
一、为什么布局性能如此重要? 在Android应用中,布局渲染耗时直接决定了界面的流畅度。根据Google官方数据,超过60%的卡顿问题源于布局性能不佳。本文将彻底解析三大传统布局的性能奥秘,并提供可直接落地的优化方案。 二、三大布局…...