Spring boot框架下的RabbitMQ消息中间件
1. RabbitMQ 基础概念
1.1 消息处理流程与组件配合
- Producer(生产者) 发送消息。消息先发送到 Exchange(交换机),而不是直接到队列。
- Exchange(交换机) 接收到消息后,根据 Routing Key(路由键) 和 Binding(绑定规则),决定将消息发送到哪些 Queue(队列)。
- Queue(队列) 存储消息,等待 Consumer(消费者) 消费。
- Consumer(消费者) 从队列中接收并处理消息。
Producer(生产者)
作用:负责发送消息到 RabbitMQ 的入口,指定消息的 Exchange 和 Routing Key。
关键点:
- Producer 只需要知道 Exchange 和 Routing Key,不关心队列。
- Producer 不直接与队列交互,消息的路由和存储由 Exchange 和 Binding 决定。
代码示例:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(String exchange, String routingKey, String message) {rabbitTemplate.convertAndSend(exchange, routingKey, message);System.out.println("Sent message: " + message);}
}
调用示例:
producer.sendMessage("direct-exchange", "key1", "Hello RabbitMQ");
direct-exchange:目标交换机。key1:消息的路由键。
Exchange(交换机)
作用:接收来自 Producer 的消息,并根据 Routing Key 和 Binding 的配置,决定将消息发送到哪些队列。
Exchange 通常需要手动注册为 Bean。
RabbitMQ 的 Exchange 是通过名称来标识的。
在 Spring Boot 中,您通过
@Bean方法注册 Exchange 时,实际上是将 Exchange 的名称和类型绑定到 RabbitMQ 服务器。发送消息时,RabbitMQ 客户端会根据 Exchange 的名称找到对应的 Exchange,并根据 Routing Key 将消息路由到队列。
类型:
-
Direct Exchange:精确匹配 Routing Key。消息的 Routing Key 必须与 Binding 的 Routing Key 完全一致。
-
Topic Exchange:支持通配符匹配。例如,
with("key.*")可以匹配key.1、key.2等。 -
Fanout Exchange:忽略 Routing Key,消息会被广播到所有绑定的队列。
-
Headers Exchange:忽略 Routing Key,根据消息头属性匹配。
代码示例(定义交换机):
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ExchangeConfig {@Beanpublic DirectExchange directExchange() {return new DirectExchange("direct-exchange");}@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanout-exchange");}@Beanpublic TopicExchange topicExchange() {return new TopicExchange("topic-exchange");}@Beanpublic HeadersExchange headersExchange() {return new HeadersExchange("headers-exchange");}
}
Queue(队列)
作用:消息的存储容器,等待消费者从中取出消息进行处理。
Queue 也需要手动注册为 Bean。Spring Boot 不会自动注册队列,因为队列的名称和属性(如是否持久化、是否排他等)需要根据业务需求进行配置。
关键点:
- 消息会保存在队列中,直到被消费。
- 队列可以是持久化的(重启 RabbitMQ 后消息仍然存在)或非持久化的。
代码示例(定义队列):
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class QueueConfig {@Beanpublic Queue demoQueue() {return new Queue("demo-queue", true); // 持久化队列}
}
Routing Key(路由键)
作用:决定消息如何从交换机路由到队列。
关键点:
- Routing Key 由 Producer 指定。
- 在 Direct 和 Topic 类型的 Exchange 中,Routing Key 决定队列是否接收消息。
Binding(绑定)
- 作用:将队列与交换机连接,并定义路由规则。
- 关键点:
- Binding 定义了队列接受消息的条件。
- 结合 Routing Key 和交换机类型,共同决定消息的路由方式。
代码示例(定义绑定):
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class BindingConfig {@Beanpublic Binding binding(Queue demoQueue, DirectExchange directExchange) {return BindingBuilder.bind(demoQueue).to(directExchange).with("key1");}
}
with("key1") 的作用是 指定 Binding 的 Routing Key。它的含义是:
-
当消息发送到 Exchange 时,Exchange 会根据消息的 Routing Key 和 Binding 的 Routing Key 进行匹配。
-
如果匹配成功,消息会被路由到对应的队列;如果匹配失败,消息会被丢弃或进入死信队列(如果有配置)。
Consumer(消费者)
作用:从队列中接收并处理消息。
关键点:
- 消费者与队列直接关联。
- 多个消费者可以监听同一队列,实现负载均衡。
代码示例:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;@Service
public class Consumer {@RabbitListener(queues = "demo-queue")public void receiveMessage(String message) {System.out.println("Received message: " + message);}
}
1.2 RabbitMQ 消息传输模型
点对点模型
定义:消息从生产者发送到队列,由消费者从队列中接收,消息只能被一个消费者消费。
实现:
- 使用默认交换机(空字符串
"")。 - 直接将消息发送到队列。
代码示例:
rabbitTemplate.convertAndSend("", "demo-queue", "Point-to-Point Message");
发布订阅模型
定义:生产者将消息发送到 Fanout 类型的交换机,消息会广播到所有绑定的队列。
实现:
- 不需要 Routing Key。
- 所有绑定到 Fanout 交换机的队列都会接收消息。
代码示例:
rabbitTemplate.convertAndSend("fanout-exchange", "", "Fanout Message");
路由模型
定义:生产者将消息发送到 Direct 类型的交换机,根据 Routing Key 精确匹配队列。
实现:
- 队列通过 Binding 绑定到交换机时,指定 Routing Key。
- 消息的 Routing Key 必须与 Binding 的 Routing Key 一致。
代码示例:
rabbitTemplate.convertAndSend("direct-exchange", "key1", "Routing Message");
2. 环境准备
2.1 安装与配置 RabbitMQ
下载 Docker
-
访问 Docker 官方网站:Docker: Accelerated Container Application Development。
-
根据您的操作系统(Windows、macOS 或 Linux)下载并安装 Docker Desktop。
启动 Docker
-
安装完成后,启动 Docker Desktop。
-
确保 Docker 正在运行(任务栏或菜单栏中可以看到 Docker 图标)。
使用 Docker 快速部署 RabbitMQ
Docker 是部署 RabbitMQ 的最简单方式。通过以下命令,您可以快速启动一个 RabbitMQ 容器:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
参数说明:
-
-d:以后台模式运行容器。 -
--name rabbitmq:为容器指定名称(rabbitmq)。 -
-p 5672:5672:将容器的 5672 端口映射到主机的 5672 端口(RabbitMQ 的消息通信端口)。 -
-p 15672:15672:将容器的 15672 端口映射到主机的 15672 端口(RabbitMQ 管理插件的 Web 界面端口)。 -
rabbitmq:management:使用带有管理插件的 RabbitMQ 镜像。
验证 RabbitMQ 是否运行
运行以下命令,查看容器是否正常运行:
docker ps
如果看到 rabbitmq 容器正在运行,说明 RabbitMQ 已成功启动。
2.2 使用 RabbitMQ 管理插件
RabbitMQ 提供了一个 Web 管理界面,方便您监控和管理 RabbitMQ。
访问管理界面
-
打开浏览器,访问
http://localhost:15672。 -
使用默认用户名和密码登录:
-
用户名:
guest -
密码:
guest
-
管理界面功能
-
Overview:查看 RabbitMQ 的整体状态,如连接数、队列数、消息速率等。
-
Connections:查看当前连接到 RabbitMQ 的客户端。
-
Channels:查看当前打开的通道。
-
Exchanges:查看和管理 Exchange。
-
Queues:查看和管理 Queue。
-
Admin:管理用户和权限。
2.3 用户与权限配置
默认情况下,RabbitMQ 只有一个用户 guest,密码也是 guest。为了安全性和权限管理,建议创建新用户并分配权限。
1. 创建新用户
在 RabbitMQ 管理界面中:
-
点击顶部导航栏的 Admin。
-
在用户列表下方,点击 Add a user。
-
输入用户名和密码,例如:
-
用户名:
admin -
密码:
admin123
-
-
点击 Add user 完成创建。
2. 分配权限
-
在用户列表中,找到刚创建的用户(如
admin)。 -
点击用户右侧的 Set permission。
-
在权限设置页面:
-
Virtual Host:选择
/(默认的虚拟主机)。 -
Configure:输入
.*,表示允许用户配置所有资源。 -
Write:输入
.*,表示允许用户写入所有资源。 -
Read:输入
.*,表示允许用户读取所有资源。
-
-
点击 Set permission 完成权限分配。
3. 使用新用户登录
-
退出当前用户(点击右上角的
guest,选择 Log out)。 -
使用新用户(如
admin)登录。
2.4 Spring Boot 中引入 RabbitMQ 依赖
在 pom.xml 中添加以下依赖:
<dependencies><!-- RabbitMQ 依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
</dependencies>
spring-boot-starter-amqp 是 Spring Boot 提供的 RabbitMQ 集成依赖,它包含了以下内容:
-
RabbitMQ 客户端库:
-
自动引入 RabbitMQ 的 Java 客户端库(
amqp-client),用于与 RabbitMQ 服务器通信。
-
-
Spring AMQP 支持:
-
提供了 Spring 对 AMQP(Advanced Message Queuing Protocol)的支持,包括
RabbitTemplate、@RabbitListener等。
-
2.5 Spring Boot 配置 RabbitMQ
在 Spring Boot 项目中,您需要在 application.properties 或 application.yml 中配置 RabbitMQ 的连接信息。
示例配置
# RabbitMQ 连接配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin123
配置说明:
-
spring.rabbitmq.host:RabbitMQ 服务器地址(默认localhost)。 -
spring.rabbitmq.port:RabbitMQ 消息通信端口(默认5672)。 -
spring.rabbitmq.username:RabbitMQ 用户名。 -
spring.rabbitmq.password:RabbitMQ 密码。
3. Spring Boot 集成 RabbitMQ 的消息生产和消费
3.1 消息生产者(Producer)
在 Spring Boot 中,我们使用 RabbitTemplate 来发送消息。它由 spring-boot-starter-amqp 自动配置成为一个 Bean,可直接通过 @Autowired 注入。
如果 message 不是 String 类型的处理
- Spring AMQP(
spring-boot-starter-amqp)在使用RabbitTemplate时,默认的消息转换器(MessageConverter)通常会将对象序列化为 JSON 或者将字符串消息转换为字节。 - 如果你的业务数据不是
String,常见做法是:- 在发送时把非字符串对象序列化(如转换为 JSON 字符串);
- 或者配置自定义的
MessageConverter,让 Spring 帮你把对象自动序列化/反序列化。
典型做法:手动序列化为 JSON 再发送
@Service
public class CustomObjectProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendCustomObject(String queueName, MyCustomObject obj) {// 1. 将自定义对象序列化为 JSON 字符串String jsonString = new Gson().toJson(obj);// 2. 发送 JSON 字符串到 RabbitMQrabbitTemplate.convertAndSend(queueName, jsonString);}
}
- 在消费者端,你也可以将消息(JSON 字符串)反序列化为
MyCustomObject。
配置自定义 Converter(可选)
- Spring AMQP 提供了
Jackson2JsonMessageConverter等现成转换器。
@Configuration
public class RabbitConfig {@Beanpublic MessageConverter jsonMessageConverter() {return new Jackson2JsonMessageConverter();}// 配置 RabbitTemplate 使用该转换器@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setMessageConverter(jsonMessageConverter());return template;}
}
- 这样一来,
rabbitTemplate.convertAndSend(queueName, myObject)会自动把myObject转成 JSON 发送;消费者端则自动解析为同样的 Java 对象。
1)基本消息发送
场景:
将消息直接发送到指定的队列,跳过交换机的路由,让 RabbitMQ 把消息放到这个队列中。
核心代码示例:
@Service
public class BasicProducer {@Autowiredprivate RabbitTemplate rabbitTemplate; // 1.自动注入的 RabbitTemplate/*** 2.发送基本消息到指定的队列* @param queueName 目标队列名称* @param message 消息内容*/public void sendToQueue(String queueName, String message) {// 3.调用 convertAndSend,直接将消息放入指定队列rabbitTemplate.convertAndSend(queueName, message);System.out.println("Message sent to queue: " + queueName + ", content: " + message);}
}
代码详解:
-
@Autowiredprivate RabbitTemplate rabbitTemplate;`-
Spring Boot 自动为我们配置了
RabbitTemplate,不用手动定义 Bean。 -
通过依赖注入即可使用所有与 RabbitMQ 交互的方法。
-
-
public void sendToQueue(String queueName, String message)-
方法参数包括:
-
queueName: 目标队列的名称。 -
message: 要发送的字符串类型消息内容。
-
-
-
rabbitTemplate.convertAndSend(queueName, message)-
convertAndSend方法会将消息转换(转换为字节)并发送到指定队列。 -
如果该队列不存在,RabbitMQ 会尝试自动创建(前提是 Broker 端配置允许自动创建队列)。
-
2)发送到交换机
场景:
将消息发送到一个交换机(Exchange),再由交换机通过 Routing Key 将消息路由到匹配的队列中。
核心代码示例:
@Service
public class ExchangeProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送消息到指定交换机* @param exchangeName 交换机名称* @param routingKey 路由键* @param message 消息内容*/public void sendToExchange(String exchangeName, String routingKey, String message) {// 将消息发送到 exchangeName 指定的交换机,使用 routingKey 进行路由rabbitTemplate.convertAndSend(exchangeName, routingKey, message);System.out.println("Message sent to exchange: " + exchangeName + " with routingKey: " + routingKey);}
}
代码详解:
-
exchangeName- 要发送到的交换机名称,例如
"direct-exchange"、"fanout-exchange"等。
- 要发送到的交换机名称,例如
-
routingKey- 路由键,用来匹配绑定(Binding)。
- 例如:对
DirectExchange而言,需要队列绑定时的路由键与发送时的路由键相同,消息才能到达队列。
-
rabbitTemplate.convertAndSend(exchangeName, routingKey, message)- 将消息先发送到交换机,再根据路由键将消息投递到目标队列。
3)发送带消息属性的消息
场景:
需要为消息设置 TTL(过期时间)或优先级等属性,控制消息在队列中的行为。
核心代码示例:
@Service
public class PropertyProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送带消息属性的消息(如 TTL, 优先级)*/public void sendMessageWithProperties(String exchange, String routingKey, String messageContent) {// 1.创建 MessageProperties 对象,用于指定消息的属性MessageProperties properties = new MessageProperties();properties.setExpiration("10000"); // 过期时间:10秒 (单位:毫秒)properties.setPriority(5); // 优先级设为 5// 2.根据消息体和属性构建 Message 对象Message message = new Message(messageContent.getBytes(), properties);// 3.使用 send 方法(而非 convertAndSend)直接发送 Message 对象rabbitTemplate.send(exchange, routingKey, message);System.out.println("Message with properties sent: " + messageContent);}
}
代码详解:
-
MessageProperties properties = new MessageProperties();MessageProperties用于设置 AMQP 协议层的各种消息头信息。
-
properties.setExpiration("10000");setExpiration设置消息的 TTL(Time-To-Live),单位是毫秒。- 如果到达时间后消息仍未被消费,RabbitMQ 会将其从队列中移除并送入死信队列(如果配置了死信队列)。
-
properties.setPriority(5);- 设置消息的优先级为 5,前提是队列本身需要支持优先级队列(创建队列时指定
x-max-priority)。
- 设置消息的优先级为 5,前提是队列本身需要支持优先级队列(创建队列时指定
-
new Message(messageContent.getBytes(), properties)- 将纯文本消息转换为
Message对象,结合了消息属性和消息体。
- 将纯文本消息转换为
-
rabbitTemplate.send(exchange, routingKey, message);- 与
convertAndSend不同,它不会尝试进行消息转换(如 JSON、字符串),而是直接发送完整的 AMQPMessage对象。
- 与
Message构造函数public Message(byte[] body, MessageProperties messageProperties) {this.body = body;this.messageProperties = messageProperties; }
body:消息体的字节数组。messageProperties:AMQP 的消息属性,包括 TTL、优先级、headers 等。、如果消息体不是String类型
- 手动转换为字节:你可以先将自定义对象转换为字节数组(例如通过 JSON 序列化或 Java 序列化),再放入
new Message(...)的第一个参数。MyCustomObject obj = new MyCustomObject(); // 假设你想用 JSON String jsonString = new Gson().toJson(obj); byte[] body = jsonString.getBytes(StandardCharsets.UTF_8);MessageProperties properties = new MessageProperties(); // 设置一些属性 Message message = new Message(body, properties);
- 为什么不会自动转 JSON?使用
new Message(...)构造方法是“纯” AMQP 层的做法,不会调用 Spring 的转换器,因此你必须自己处理序列化。- 使用
Message构造函数 时,你必须自行处理对象到byte[]的转换(无论是字符串、JSON,还是其他格式)。- 如果想让 Spring AMQP 自动转换,你通常使用
rabbitTemplate.convertAndSend(Object msg)这种高级 API,或者配置自定义MessageConverter。
3.2 消息消费者(Consumer)
消费者的核心功能是在指定的队列中监听消息,并根据配置的确认模式(自动确认或手动确认)对消息进行处理或拒绝。
1)监听队列并消费消息
核心代码示例(自动确认模式):
@Service
public class Consumer {/*** 使用注解 @RabbitListener 指定要监听的队列* 由于默认为 auto-ack 模式,* 当消息到达后,RabbitMQ 会自动确认并从队列中删除该消息。*/@RabbitListener(queues = "demo-queue")public void receiveMessage(String message) {// 1.从 queueName 队列中取到的消息内容System.out.println("Received message: " + message);// 2.在 auto-ack 模式下,无需手动 ack// 如果这里出现异常,RabbitMQ 不会再次发送消息给消费者,消息会丢失。}
}
代码详解(自动确认模式):
-
@RabbitListener(queues = "demo-queue")- 声明监听名为
demo-queue的队列。 - 一旦有新消息到达该队列,就会自动回调此方法。
- 声明监听名为
-
public void receiveMessage(String message)- 默认参数类型为字符串,当 RabbitMQ 收到消息后会尝试将其转换为
String并注入到message中。
- 默认参数类型为字符串,当 RabbitMQ 收到消息后会尝试将其转换为
-
自动确认(auto-ack)的风险
- 如果消费者在处理消息时抛出异常,消息已经被 RabbitMQ 标记为“已确认”,不会再重新发送或进入死信队列,导致消息丢失。
2)确认机制
自动确认(auto-ack)
-
行为:
-
当消费者从队列中获取消息后,RabbitMQ 会立即将该消息标记为已确认(acknowledged),并从队列中删除。
-
-
问题:
-
如果消息处理失败(例如消费者抛出异常),消息已经被确认并从队列中删除,无法重新处理。
-
如果消费者崩溃或断开连接,未处理的消息会丢失。
-
-
适用场景:
-
对消息处理的可靠性要求不高的场景。
-
手动确认(manual-ack)
-
行为:
-
消费者处理完消息后,必须显式调用
basicAck方法确认消息。 -
如果消息处理失败,可以调用
basicNack或basicReject方法拒绝消息。
-
-
优点:
-
确保消息处理的可靠性。
-
支持消息重新入队或发送到死信队列。
-
-
适用场景:
-
对消息处理的可靠性要求较高的场景。
-
核心代码示例:
@Service
public class ManualAckConsumer {/*** 在 application.properties 中配置:* spring.rabbitmq.listener.simple.acknowledge-mode=manual* 使得 RabbitMQ 使用手动确认模式*/@RabbitListener(queues = "demo-queue")public void receiveMessage(Message message, Channel channel) throws IOException {try {// 1.从消息中获取消息体String body = new String(message.getBody());System.out.println("Processing message: " + body);// 2.如果业务处理成功,则调用 basicAck 手动确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {System.err.println("Message processing failed: " + e.getMessage());// 3.如果处理失败,需要决定是重新入队还是拒绝并进入死信队列// requeue = true -> 重新入队// requeue = false -> 丢弃或进入死信队列(根据队列配置)channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);}}
}
代码详解:
-
配置手动确认:
- 在
application.properties添加spring.rabbitmq.listener.simple.acknowledge-mode=manual - 表示 Spring AMQP 使用手动确认模式(manual-ack)。
- 在
-
public void receiveMessage(Message message, Channel channel):- 与自动确认不同,这里不仅接收字符串,还接收了
org.springframework.amqp.core.Message对象和com.rabbitmq.client.Channel。 Message:包含消息体(body)和消息属性(headers 等)。Channel:给我们提供了basicAck,basicNack,basicReject等底层 AMQP 操作。
- 与自动确认不同,这里不仅接收字符串,还接收了
-
手动确认成功:
channel.basicAck(deliveryTag, multiple):deliveryTag:本次消息的唯一标记,从message.getMessageProperties().getDeliveryTag()获取。multiple = false:只确认当前这条消息。
basicAck(long deliveryTag, boolean multiple)
- 这里的
deliveryTag并不是在你构造Message时生成的,而是 RabbitMQ Broker 在投递消息给消费者时由底层 AMQP 协议自动分配的一个递增的序号。long deliveryTag = message.getMessageProperties().getDeliveryTag();
-
手动确认失败:
channel.basicNack(deliveryTag, multiple, requeue)或basicReject:requeue = true:将消息重新放回队列等待下一次消费(可能导致死循环,如处理一直失败)。requeue = false:拒绝消息,若配置了死信队列,则进入死信队列;否则丢弃消息。
3)处理消费失败
自动确认模式下的处理
-
在自动确认模式下,如果消息处理失败,RabbitMQ 不会重新发送消息,因为消息已经被确认并从队列中删除。
-
问题:
-
消息丢失,无法重新处理。
-
手动确认模式下的处理
- 在手动确认模式下,如果消息处理失败,可以通过以下方式处理:
-
重新入队:
-
调用
basicNack或basicReject方法,并将requeue参数设置为true。 -
消息会重新进入队列,等待下一次消费。
-
-
发送到死信队列:
-
调用
basicNack或basicReject方法,并将requeue参数设置为false。 -
如果队列配置了死信队列,消息会被发送到死信队列。
-
重试机制(Spring AMQP 提供的简单重试)(只支持手动确认机制)
是重试失败了才会将消息重新入队 ,所以重试在前,重新入队在后
# 启用重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=3
# 初始重试间隔
spring.rabbitmq.listener.simple.retry.initial-interval=1000
# 间隔倍数
spring.rabbitmq.listener.simple.retry.multiplier=2.0
# 最大重试间隔
spring.rabbitmq.listener.simple.retry.max-interval=10000
-
Spring AMQP 提供了 重试机制,可以在消费者处理消息失败时,自动进行多次重试,而不是直接将消息重新入队。
行为
-
当消息处理失败时,Spring AMQP 会在 本地 进行重试(即不将消息重新入队),直到达到最大重试次数。
-
如果重试次数用尽,消息会被拒绝(
basicNack或basicReject),并根据配置决定是否重新入队或发送到死信队列。
死信队列(DLQ)
-
当消息被拒绝或过期时,RabbitMQ 会将其发送到我们配置的死信交换机(DLX),再路由到死信队列(DLQ)。
-
配置示例:
@Configuration public class RabbitConfig {@Beanpublic Queue normalQueue() {return QueueBuilder.durable("normal-queue").withArgument("x-dead-letter-exchange", "dead-letter-exchange") // 指定死信交换机.withArgument("x-dead-letter-routing-key", "dead-letter-routing-key") // 指定死信路由键.build();}@Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange("dead-letter-exchange");}@Beanpublic Queue deadLetterQueue() {return new Queue("dead-letter-queue");}@Beanpublic Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dead-letter-routing-key");} } -
原理:
- 正常队列通过
x-dead-letter-exchange指定死信交换机,一旦消息被拒绝(requeue=false)或超时(TTL 到期),RabbitMQ 会把消息发送到dead-letter-exchange。 dead-letter-exchange与dead-letter-queue进行绑定(路由键dead-letter-routing-key),从而实现死信队列的存储。
- 正常队列通过
-
重新入队 vs 发送到死信队列
- 重新入队:
channel.basicNack(deliveryTag, false, true)- 适用于临时性错误,比如数据库锁冲突、网络抖动等,等待后续重新处理。
- 发送到死信队列:
channel.basicNack(deliveryTag, false, false)- 适用于永久性错误,比如消息格式无法解析,或业务逻辑指定不应再尝试。
- 重新入队:
相关文章:
Spring boot框架下的RabbitMQ消息中间件
1. RabbitMQ 基础概念 1.1 消息处理流程与组件配合 Producer(生产者) 发送消息。消息先发送到 Exchange(交换机),而不是直接到队列。Exchange(交换机) 接收到消息后,根据 Routing …...
1 行命令引发的 Go 应用崩溃
一、前言 不久前,阿里云 ARMS 团队、编译器团队、MSE 团队携手合作,共同发布并开源了 Go 语言的编译时自动插桩技术。该技术以其零侵入的特性,为 Go 应用提供了与 Java 监控能力相媲美的解决方案。开发者只需将 go build 替换为新编译命令 o…...
ScratchLLMStepByStep:训练自己的Tokenizer
1. 引言 分词器是每个大语言模型必不可少的组件,但每个大语言模型的分词器几乎都不相同。如果要训练自己的分词器,可以使用huggingface的tokenizers框架,tokenizers包含以下主要组件: Tokenizer: 分词器的核心组件,定…...
G1原理—10.如何优化G1中的FGC
大纲 1.G1的FGC可以优化的点 2.一个bug导致的FGC(Kafka发送重试 subList导致List越来越大) 3.为什么G1的FGC比ParNew CMS要更严重 4.FGC的一些参数及优化思路 1.G1的FGC可以优化的点 (1)FGC的基本原理 (2)遇到FGC应该怎么处理 (3)应该如何操作来规避FGC (4)应该如何操…...
Java基础——概念和常识(语言特点、JVM、JDK、JRE、AOT/JIT等介绍)
我是一个计算机专业研0的学生卡蒙Camel🐫🐫🐫(刚保研) 记录每天学习过程(主要学习Java、python、人工智能),总结知识点(内容来自:自我总结网上借鉴࿰…...
2025.1.16——三、supersqli 绕过|堆叠注入|handler查询法|预编译绕过法|修改原查询法
题目来源:攻防世界supersqli 目录 一、打开靶机,整理已知信息 二、sqlmap解题 step 1:爆数据库 step 2:爆表 二、手工注入解题 step 1:判断注入类型 step 2:判断字段数 step 3:查询数据…...
浅谈计算机网络03 | 现代网络组成
现代网络组成 一 、网络生态体系1.1网络生态系统的多元主体1.2 网络接入设施的多样类型 二、现代网络的典型体系结构解析三、高速网络技术3.1 以太网技术3.2 Wi-Fi技术的深度剖析3.2.1 应用场景的多元覆盖3.2.2 标准升级与性能提升 3.3 4G/5G蜂窝网的技术演进3.3.1 蜂窝技术的代…...
Red Hat8:搭建FTP服务器
目录 一、匿名FTP访问 1、新建挂载文件 2、挂载 3、关闭防火墙 4、搭建yum源 5、安装VSFTPD 6、 打开配置文件 7、设置配置文件如下几个参数 8、重启vsftpd服务 9、进入图形化界面配置网络 10、查看IP地址 11、安装ftp服务 12、遇到拒绝连接 13、测试 二、本地…...
EWM 批次管理 / Batch Management
目录 1 简介 2 业务数据 2.1 基于 PO,创建 ERP LE - Delivery 内向交货单,同时同步到 EWM 内向交货单 2.2 在 EWM 内向交货单,创建批次。EWM 批次创建的前提条件来自于物料主数据批次分类(023)。SAP 提供的标准条件…...
Java 面试题 - ArrayList 和 LinkedList 的区别,哪个集合是线程安全的?
Java 面试题 - ArrayList 和 LinkedList 的区别,哪个集合是线程安全的? 在 Java 开发中,ArrayList和LinkedList是两个常用的集合类,它们在数据结构和性能上有诸多不同,同时线程安全性也各有特点。深入理解这些差异&am…...
初学SpringBoot
目录 什么是SpringBoot 使用 Spring Boot有什么好处 Spring Boot 特点 在线构建 IntelliJ IDEA在线模板构建 IntelliJ IDEA 通maven项目构建 SpringBoot的常用配置 入口类和相关注解 定制Banner 修改banner图标 关闭banner 常规属性修改 tomcat端口号修改 常规属性…...
【网络云SRE运维开发】2025第3周-每日【2025/01/15】小测-【第14章ospf高级配置】理论和实操解析
文章目录 14.1 选择题解题思路和参考答案14.2 理论题解题思路和参考答案14.3 实操题解题思路和参考答案思科(Cisco)设备华为(Huawei)设备小米/锐捷(或其他支持标准CLI命令的设备)通过网络管理工具注意事项 …...
AWS S3 跨账户访问 Cross Account Access
进入S3对应的存储桶,上面选项选权限,存储桶策略 -- 编辑,输入对应的policy。 完全控制,包含上传删除权限,policy如下: {"Version": "2012-10-17","Statement": [{"Si…...
Ubuntu20.4和docker终端指令、安装Go环境、安装搜狗输入法、安装WPS2019:保姆级图文详解
目录 前言1、docker、node、curl版本查看终端命令1.1、查看docker版本1.2、查看node.js版本1.3、查看curl版本1.4、Ubuntu安装curl1.5、Ubuntu终端保存命令 2、安装docker-compose、Go语言2.1、安装docker-compose2.2、go语言安装步骤2.3、git版本查看 3、Ubuntu20.4安装搜狗输…...
Kotlin语言的正则表达式
Kotlin语言中的正则表达式 引言 正则表达式(Regular Expression,简称Regex)是一种用于匹配字符串中字符组合的工具。在数据处理、文本解析等领域,正则表达式以其强大的字符串处理能力得到了广泛的应用。而Kotlin作为一种现代的编…...
npm的包管理
从哪里下载包 国外有一家 IT 公司,叫做 npm,Inc.这家公司旗下有一个非常著名的网站: https://www.npmjs.com/,它是全球最大的包共享平台,你可以从这个网站上搜索到任何你需要的包,只要你有足够的耐心!到目前位置,全球约…...
深度学习在文本情感分析中的应用
引言 情感分析是自然语言处理(NLP)中的一个重要任务,旨在识别和提取文本中的主观信息。随着深度学习技术的发展,我们可以使用深度学习模型来提高情感分析的准确性和效率。本文将介绍如何使用深度学习进行文本情感分析,…...
【大模型系列篇】数字人音唇同步模型——腾讯开源MuseTalk
之前有一期我们体验了阿里开源的半身数字人项目EchoMimicV2,感兴趣的小伙伴可跳转至《AI半身数字人开箱体验——开源项目EchoMimicV2》,今天带大家来体验腾讯开源的数字人音唇同步模型MuseTalk。 MuseTalk 是一个实时高品质音频驱动的唇形同步模型&#…...
Formality:参考设计/实现设计以及顶层设计
相关阅读 Formalityhttps://blog.csdn.net/weixin_45791458/category_12841971.html?spm1001.2014.3001.5482 Formality存在两个重要的概念:参考设计/实现设计和顶层设计,本文就将对此进行详细阐述。参考设计/实现设计是中两个重要的全局概念&am…...
RPA赋能内容创作:打造小红书入门词语图片的全自动化流程
🌟 嗨,我是LucianaiB! 🌍 总有人间一两风,填我十万八千梦。 🚀 路漫漫其修远兮,吾将上下而求索。 用RPA全自动化批量生产【入门词语】图片做小红书商单,保姆级工具开发教程 最近由…...
OpenLayers 可视化之热力图
注:当前使用的是 ol 5.3.0 版本,天地图使用的key请到天地图官网申请,并替换为自己的key 热力图(Heatmap)又叫热点图,是一种通过特殊高亮显示事物密度分布、变化趋势的数据可视化技术。采用颜色的深浅来显示…...
电脑插入多块移动硬盘后经常出现卡顿和蓝屏
当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时,可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案: 1. 检查电源供电问题 问题原因:多块移动硬盘同时运行可能导致USB接口供电不足&#x…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一)
宇树机器人多姿态起立控制强化学习框架论文解析 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一) 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化…...
2025盘古石杯决赛【手机取证】
前言 第三届盘古石杯国际电子数据取证大赛决赛 最后一题没有解出来,实在找不到,希望有大佬教一下我。 还有就会议时间,我感觉不是图片时间,因为在电脑看到是其他时间用老会议系统开的会。 手机取证 1、分析鸿蒙手机检材&#x…...
深入解析C++中的extern关键字:跨文件共享变量与函数的终极指南
🚀 C extern 关键字深度解析:跨文件编程的终极指南 📅 更新时间:2025年6月5日 🏷️ 标签:C | extern关键字 | 多文件编程 | 链接与声明 | 现代C 文章目录 前言🔥一、extern 是什么?&…...
(转)什么是DockerCompose?它有什么作用?
一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用,而无需手动一个个创建和运行容器。 Compose文件是一个文本文件,通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...
ios苹果系统,js 滑动屏幕、锚定无效
现象:window.addEventListener监听touch无效,划不动屏幕,但是代码逻辑都有执行到。 scrollIntoView也无效。 原因:这是因为 iOS 的触摸事件处理机制和 touch-action: none 的设置有关。ios有太多得交互动作,从而会影响…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
论文笔记——相干体技术在裂缝预测中的应用研究
目录 相关地震知识补充地震数据的认识地震几何属性 相干体算法定义基本原理第一代相干体技术:基于互相关的相干体技术(Correlation)第二代相干体技术:基于相似的相干体技术(Semblance)基于多道相似的相干体…...
算法:模拟
1.替换所有的问号 1576. 替换所有的问号 - 力扣(LeetCode) 遍历字符串:通过外层循环逐一检查每个字符。遇到 ? 时处理: 内层循环遍历小写字母(a 到 z)。对每个字母检查是否满足: 与…...
