当前位置: 首页 > article >正文

基于SpringBoot解决RabbitMQ消息丢失问题

基于SpringBoot解决RabbitMQ消息丢失问题

  • 一、RabbitMQ解决消息丢失问题
  • 二、方案实践
    • 1、在生产者服务相关配置
    • 2、在消费者服务相关配置
  • 三、测试验证
    • 1、依次启动RabbitMQ、producer(建议先清空队列里面旧的测试消息再启动consumer)和consumer
    • 2、在producer中调用接口,发送消息
    • 3、观察控制台打印日志
  • 四、项目结构及源码
    • 1、项目结构
    • 2、源码下载

一、RabbitMQ解决消息丢失问题

RabbitMQ通过以下机制来保证消息的可靠性,从而解决消息丢失问题:

(1)消息持久化:RabbitMQ支持将消息持久化到磁盘,即使RabbitMQ服务器宕机或重启,消息也不会丢失。在发布消息时,可以设置消息的持久化标志,这样消息就会被写入磁盘中,而不是仅仅保存在内存中。在自定义MQ的配置中,设置消息队列和交换机,默认为true代表持久化。

(2)消息确认机制:RabbitMQ提供了消息确认机制,即生产者在发送消息后,可以等待RabbitMQ服务器返回确认信息,以确保消息已经被正确地接收和处理。如果RabbitMQ服务器没有返回确认信息,生产者可以选择重新发送消息或者采取其他的补救措施。在自定义MQ的配置中,配置RabbitTemplate,设置setConfirmCallback和setReturnCallback进行确认。

(3)事务机制:RabbitMQ还支持事务机制,即生产者可以将多个操作封装在一个事务中,只有当所有的操作都成功完成后,才提交事务。如果某个操作失败,整个事务会被回滚,从而保证消息的完整性和一致性。

(4)消息重试机制:如果消息在传输过程中出现异常,RabbitMQ会自动进行消息重试,直到消息被正确地处理为止。可以通过设置重试次数和重试时间间隔来控制消息重试的行为。可以在application.yaml配置文件中设置

  rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /#1、确保消息从发送端到服务端投递可靠(分为以下两个步骤)#1.1、确认消息已发送到交换机(Exchange) 可以把publisher-confirms: true 替换为  publisher-confirm-type: correlatepublisher-confirm-type: correlated#1.2、确认消息从交换机中到队列中publisher-returns: true

综上所述,RabbitMQ通过持久化、确认、事务和重试等机制来保证消息的可靠性,从而解决消息丢失的问题。下面按照上述4个机制进行代码实践。

二、方案实践

1、在生产者服务相关配置

1.1 通过自定义RabbitConfig设置消息投递失败的策略为返回到客户端,处理返回的消息(请注意!如果你使用了延迟队列插件,那么一定会调用该callback方法,因为数据并没有提交上去)。

@Slf4j
@Configuration
public class RabbitConfig {@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//设置消息投递失败的策略,有两种策略:自动删除或返回到客户端。//我们既然要做可靠性,当然是设置为返回到客户端(true是返回客户端,false是自动删除)rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("ConfirmCallback 关联数据:{},投递成功,确认情况:{}", correlationData, ack);} else {log.info("ConfirmCallback 关联数据:{},投递失败,确认情况:{},原因:{}", correlationData, ack, cause);}}});rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {// 请注意!如果你使用了延迟队列插件,那么一定会调用该callback方法,因为数据并没有提交上去,// 而是提交在交换器中,过期时间到了才提交上去,并非是bug!你可以用if进行判断交换机名称来捕捉该报错/*if (exchange.equals("你声明的延迟队列的交换机")) {return;}*/log.info("ReturnsCallback 消息被退回:{},回应码:{},回应信息:{},交换机:{},路由键:{}", returnedMessage.getMessage(), returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey());}});return rabbitTemplate;}
}

DirectRabbitConfig直连交换机配置

@Slf4j
@Configuration
public class DirectRabbitConfig {private static final String QUEUE = "TestDirectQueue";private static final String EXCHANGE = "TestDirectExchange";private static final String ROUTING_KEY = "TestDirectRouting";/*** 创建一个名为TestDirectQueue的队列** @return*/@Beanpublic Queue testDirectQueue() {// durable:是否持久化,默认为true,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,有消息者订阅本队列,然后所有消费者都解除订阅此队列,会自动删除。// arguments:队列携带的参数,比如设置队列的死信队列,消息的过期时间等等。return new Queue(QUEUE);}/*** 创建一个名为TestDirectExchange的Direct类型的交换机** @return*/@Beanpublic DirectExchange testDirectExchange() {// durable:是否持久化,true,持久化交换机。// autoDelete:是否自动删除,交换机先有队列或者其他交换机绑定的时候,然后当该交换机没有队列或其他交换机绑定的时候,会自动删除。// arguments:交换机设置的参数,比如设置交换机的备用交换机(Alternate Exchange),当消息不能被路由到该交换机绑定的队列上时,会自动路由到备用交换机return new DirectExchange(EXCHANGE);}/*** 绑定交换机和队列** @return*/@Beanpublic Binding bindingDirect() {//bind队列to交换机中with路由key(routing key)return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with(ROUTING_KEY);}
}

2、在消费者服务相关配置

2.1 配置DirectConsumer

/*** 直连交换机消息** @author* @DATE 2025/6/2**/
@RabbitListener(queues = "TestDirectQueue")
@Component
@Slf4j
public class DirectConsumer {@RabbitListener(queues = "TestDirectQueue")
@Component
@Slf4j
public class DirectConsumer {/*@RabbitHandlerpublic void process(Object data, Channel channel, Message message) throws IOException {log.info("消费者接受到的消息是:{},消息体为:{}", data, message);//由于配置设置了手动应答,所以这里要进行一个手动应答。注意:如果设置了自动应答,这里又进行手动应答,会出现double ack,那么程序会报错。channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}*/@RabbitHandlerpublic void process(Object message, Channel channel,@Headers Map<String,Object> map) {System.out.println(message);//这里只是模拟业务,其实还有很多可能,比如验证用户的银行卡号已经过期等等,都可以发出nackif (map.get("error")!= null){System.out.println("错误的消息");try {channel.basicNack((Long)map.get(AmqpHeaders.DELIVERY_TAG),false,true);      //否认消息return;} catch (IOException e) {e.printStackTrace();}}try {System.out.println("业务在这里执行!");channel.basicAck((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);            //确认消息} catch (IOException e) {//实际业务场景,可能需要将上面的channel.basicNack,放到异常里面进行重试!e.printStackTrace();}}}

三、测试验证

1、依次启动RabbitMQ、producer(建议先清空队列里面旧的测试消息再启动consumer)和consumer

在这里插入图片描述
首先调用接口,生成交换机和队列,否则,在启动consumer时候会报找不到交换机和队列错误。
然后在RabbitMQ界面手动清空队列消息,防止干扰本次实验。
在这里插入图片描述

2、在producer中调用接口,发送消息

在这里插入图片描述

3、观察控制台打印日志

生产者控制台
在这里插入图片描述
消费者控制台
在这里插入图片描述
可以看到由于接口中第1,4,5条消息会正常发送,所以在consumer已经进行了正常消费,并且针对第5条进行了业务重试。
由于第2,3条分别由于交换机错误或者队列错误,导致消息发送失败。

四、项目结构及源码

1、项目结构

在这里插入图片描述

2、源码下载

RabbitMQ,欢迎Star

相关文章:

基于SpringBoot解决RabbitMQ消息丢失问题

基于SpringBoot解决RabbitMQ消息丢失问题 一、RabbitMQ解决消息丢失问题二、方案实践1、在生产者服务相关配置2、在消费者服务相关配置 三、测试验证1、依次启动RabbitMQ、producer(建议先清空队列里面旧的测试消息再启动consumer)和consumer2、在producer中调用接口&#xff0…...

免费插件集-illustrator插件-Ai插件-随机填色

文章目录 1.介绍2.安装3.通过窗口>扩展>知了插件4.功能解释5.总结 1.介绍 本文介绍一款免费插件&#xff0c;加强illustrator使用人员工作效率&#xff0c;实现路径随机填色。首先从下载网址下载这款插件https://download.csdn.net/download/m0_67316550/87890501&#…...

使用 Unstructured 开源库快速入门指南

引言 本文将介绍如何使用 Unstructured 开源库&#xff08;GitHub&#xff0c;PyPI&#xff09;和 Python&#xff0c;在本地开发环境中将 PDF 文件拆分为标准的 Unstructured 文档元素和元数据。这些元素和元数据可用于 RAG&#xff08;检索增强生成&#xff09;应用、AI 代理…...

白银6月想法

一、市场回顾 2025年5月&#xff0c;SHFE白银主力合约总体呈现出震荡偏强的运行格局。从2025年5月1日至2025年5月30日&#xff0c;白银期货价格整体运行在7944元至8342元区间内&#xff0c;最高价出现在5月22日的8342.0元&#xff0c;最低价则为5月15日的7944元。最终在5月30日…...

OpenCV 滑动条调整图像对比度和亮度

一、知识点 1、int createTrackbar(const String & trackbarname, const String & winname, int * value, int count, TrackbarCallback onChange 0, void * userdata 0); (1)、创建一个滑动条并将其附在指定窗口上。 (2)、参数说明: trackbarname: 创建的…...

船舶事故海上搜救VR情景演练全场景 “复刻”,沉浸式救援体验​

船舶事故海上搜救 VR 情景演练系统的一大核心优势&#xff0c;便是能够全场景 “复刻” 海上事故&#xff0c;为使用者带来沉浸式的船舶事故海上搜救 VR 情景演练体验。​ 在船舶事故海上搜救 VR 情景演练的事故场景模拟方面&#xff0c;系统几乎涵盖了所有常见的船舶事故类型。…...

使用Caddy在Ubuntu 22.04上配置HTTPS反向代理

使用Caddy在Ubuntu 22.04上配置HTTPS反向代理(无域名/IP验证+密码保护) 一、 环境说明 环境说明:测试环境,生产环境请谨慎OS: Ubuntu 22.04.1 LTSCaddy版本:v2.10.0服务器IP: 192.168.3.88(内网)公网IP: 10.2.3.11(测试虚拟)代理端口: 9080后端服务: http://192.168.3…...

无人机目标检测与语义分割数据集(猫脸码客)

UAV 无人机数据集&#xff1a;驱动无人机配送研究迈向新高度 在科技浪潮的迅猛推动下&#xff0c;无人机配送这一新兴物流模式正以前所未有的态势&#xff0c;悄然改变着人们的生活图景。为深入挖掘并优化无人机配送技术&#xff0c;名为 UAV Delivery 的无人机数据集应运而生…...

Web设计之登录网页源码分享,PHP数据库连接,可一键运行!

HTML 页面结构&#xff08;index.html&#xff09; 1. 流星雨动态背景 2. 主体界面&#xff08;包含登录和注册表单&#xff09; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport&qu…...

Cursor + Claude 4:微信小程序流量主变现开发实战案例

前言 随着微信小程序生态的日益成熟&#xff0c;越来越多的开发者开始关注如何通过小程序实现流量变现。本文将详细介绍如何使用Cursor编辑器结合Claude 4 AI助手&#xff0c;快速开发一个具备流量主变现功能的微信小程序&#xff0c;并分享实际的开发经验和变现策略。 项目…...

㊗️高考加油

以下是极为详细的高考注意事项清单&#xff0c;涵盖考前、考中、考后全流程&#xff0c;建议逐条核对&#xff1a; 一、考前准备 1. 证件与物品 必带清单&#xff1a; 准考证&#xff1a;打印2份&#xff08;1份备用&#xff09;&#xff0c;塑封或夹在透明文件袋中防皱湿。身…...

Redis Key过期策略

概述 Redis的Key过期策略是其内存管理系统的核心组成部分&#xff0c;主要包括「被动过期」、「主动过期」和「内存淘汰」三个机制。其中「内存淘汰」相关内容已经在上一篇「Redis内存淘汰策略」中进行了详细的讲解&#xff0c;有信兴趣的同学可以在回顾上一篇文章。本文将着重…...

【C/C++】实现固定地址函数调用

在 C 里&#xff0c;函数地址在程序运行期间通常是固定的&#xff0c;不过在动态链接库&#xff08;DLL&#xff09;或者共享库&#xff08;SO&#xff09;中&#xff0c;函数地址可能会因为地址空间布局随机化&#xff08;ASLR&#xff09;而改变。所以我们想要通过地址直接调…...

多模态大语言模型arxiv论文略读(109)

Math-PUMA: Progressive Upward Multimodal Alignment to Enhance Mathematical Reasoning ➡️ 论文标题&#xff1a;Math-PUMA: Progressive Upward Multimodal Alignment to Enhance Mathematical Reasoning ➡️ 论文作者&#xff1a;Wenwen Zhuang, Xin Huang, Xiantao Z…...

性能优化笔记

性能优化转载 https://www.cnblogs.com/tengzijian/p/17858112.html 性能优化的一般策略及方法 简言之&#xff0c;非必要&#xff0c;不优化。先保证良好的设计&#xff0c;编写易于理解和修改的整洁代码。如果现有的代码很糟糕&#xff0c;先清理重构&#xff0c;然后再考…...

bat批量去掉本文件夹中的文件扩展名

本文本夹内 批量去掉本文件夹中的文件扩展名 假如你有一些文件&#xff0c;你想去掉他们的扩展名 有没有方便的办法呢 今天我们就分享一种办法。 下面&#xff0c;就来看看吧。 首先我们新建一个记事本&#xff0c;把名字改为&#xff0c;批量去掉本文件夹中的文件扩展名.txt 然…...

基于ROS2,撰写python脚本,根据给定的舵-桨动力学模型实现动力学更新

提问 #! /usr/bin/env python3from control_planner import usvParam as P from control_planner.courseController import courseLimitationimport tf_transformations # ROS2没有自带tf.transformations, 需装第三方库 import rclpy from rclpy.node import Node from pid_…...

Scrapy爬虫教程(新手)

1. Scrapy的核心组成 引擎&#xff08;engine&#xff09;&#xff1a;scrapy的核心&#xff0c;所有模块的衔接&#xff0c;数据流程梳理。 调度器&#xff08;scheduler&#xff09;&#xff1a;本质可以看成一个集合和队列&#xff0c;里面存放着一堆即将要发送的请求&#…...

数据可视化大屏案例落地实战指南:捷码平台7天交付方法论

分享大纲&#xff1a; 1、落地前置&#xff1a;数据可视化必备的规划要素 2、数据可视化双路径开发 3、验证案例&#xff1a;数据可视化落地成效 在当下数字化转型浪潮中&#xff0c;数据可视化建设已成为关键环节。数据可视化大屏的落地&#xff0c;成为企业数据可视化建设的难…...

第五篇:Go 并发模型全解析——Channel、Goroutine

第五篇:Go 并发模型全解析——Channel、Goroutine 一、序章:Java 的并发往事 在 Java 世界中,说到“并发”,你可能立马想到以下名词:Thread、Runnable、ExecutorService、synchronized、volatile。再复杂点,ReentrantLock、CountDownLatch、BlockingQueue 纷纷登场,仿…...

锁的艺术:深入浅出讲解乐观锁与悲观锁

在多线程和分布式系统中&#xff0c;数据一致性是一个核心问题。锁机制作为解决并发冲突的重要手段&#xff0c;被广泛应用于各种场景。乐观锁和悲观锁是两种常见的锁策略&#xff0c;它们在设计理念、实现方式和适用场景上各有特点。本文将深入探讨乐观锁和悲观锁的原理、实现…...

在网页加载时自动运行js的方法(2025最新)

在网页加载时自动运行JavaScript方法有多种实现方式&#xff0c;以下是常见的几种方法&#xff1a; 1. ​​使用 DOMContentLoaded 事件​​ 当初始HTML文档完全加载和解析后触发&#xff08;无需等待图片等资源加载&#xff09;&#xff1a; document.addEventListener(DOMC…...

在Windows下编译出llama_cpp_python的DLL后,在虚拟环境中使用方法

定位编译生成的文件 在VS2022编译完成后&#xff0c;在构建目录&#xff08;如build/Release或build/Debug&#xff09;中寻找以下关键文件&#xff1a; ggml.dll、ggml_base.dll、ggml_cpu.dll、ggml_cuda.dll、llama.dll&#xff08;核心动态链接库&#xff09; llama_cp…...

CSS radial-gradient函数详解

目录 基本语法 关键参数详解 1. 渐变形状&#xff08;Shape&#xff09; 2. 渐变大小&#xff08;Size&#xff09; 3. 中心点位置&#xff08;Position&#xff09; 4. 颜色断点&#xff08;Color Stops&#xff09; 常见应用场景 1. 基本圆形渐变 2. 椭圆渐变 3. 模…...

n8n 自动化平台 Docker 部署教程(附 PostgreSQL 与更新指南)

n8n 自动化平台 Docker 部署教程&#xff08;附 PostgreSQL 与更新指南&#xff09; n8n 是一个强大的可视化工作流自动化工具&#xff0c;支持无代码或低代码地集成各种服务。本文将手把手教你如何通过 Docker 快速部署 n8n&#xff0c;并介绍如何使用 PostgreSQL、设置时区以…...

关于datetime获取时间的问题

import datetime print(datetime.now())如果用上述代码&#xff0c;会报错&#xff1a; 以下才是正确代码&#xff1a; from datetime import datetime print(datetime.now()) 结果&#xff1a; 如果想格式化时间&#xff0c;使用代码&#xff1a; from datetime import da…...

前端面试五之vue2基础

1.属性绑定v-bind&#xff08;&#xff1a;&#xff09; v-bind 是 Vue 2 中用于动态绑定属性的核心指令&#xff0c;它支持多种语法和用法&#xff0c;能够灵活地绑定 DOM 属性、组件 prop&#xff0c;甚至动态属性名。通过 v-bind&#xff0c;可以实现数据与视图之间的高效同…...

使用python实现奔跑的线条效果

效果&#xff0c;展示&#xff08;视频效果展示&#xff09;&#xff1a; 奔跑的线条 from turtle import * import time t1Turtle() t2Turtle() t3Turtle() t1.hideturtle() t2.hideturtle() t3.hideturtle() t1.pencolor("red") t2.pencolor("green") t3…...

Oracle 审计参数:AUDIT_TRAIL 和 AUDIT_SYS_OPERATIONS

Oracle 审计参数&#xff1a;AUDIT_TRAIL 和 AUDIT_SYS_OPERATIONS 一 AUDIT_TRAIL 参数 1.1 参数功能 AUDIT_TRAIL 是 Oracle 数据库中最核心的审计控制参数&#xff0c;决定审计记录的存储位置和记录方式。 1.2 参数取值及含义 取值说明适用场景NONE禁用数据库审计测试环…...

Android LinearLayout、FrameLayout、RelativeLayout、ConstraintLayout大混战

一、为什么布局性能如此重要&#xff1f; 在Android应用中&#xff0c;布局渲染耗时直接决定了界面的流畅度。根据Google官方数据&#xff0c;超过60%的卡顿问题源于布局性能不佳。本文将彻底解析三大传统布局的性能奥秘&#xff0c;并提供可直接落地的优化方案。 二、三大布局…...