Spring boot框架下的RabbitMQ消息中间件
1. RabbitMQ 基础概念
1.1 消息处理流程与组件配合
- Producer(生产者) 发送消息。消息先发送到 Exchange(交换机),而不是直接到队列。
- Exchange(交换机) 接收到消息后,根据 Routing Key(路由键) 和 Binding(绑定规则),决定将消息发送到哪些 Queue(队列)。
- Queue(队列) 存储消息,等待 Consumer(消费者) 消费。
- Consumer(消费者) 从队列中接收并处理消息。
Producer(生产者)
作用:负责发送消息到 RabbitMQ 的入口,指定消息的 Exchange 和 Routing Key。
关键点:
- Producer 只需要知道 Exchange 和 Routing Key,不关心队列。
- Producer 不直接与队列交互,消息的路由和存储由 Exchange 和 Binding 决定。
代码示例:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessage(String exchange, String routingKey, String message) {rabbitTemplate.convertAndSend(exchange, routingKey, message);System.out.println("Sent message: " + message);}
}
调用示例:
producer.sendMessage("direct-exchange", "key1", "Hello RabbitMQ");
direct-exchange
:目标交换机。key1
:消息的路由键。
Exchange(交换机)
作用:接收来自 Producer 的消息,并根据 Routing Key 和 Binding 的配置,决定将消息发送到哪些队列。
Exchange 通常需要手动注册为 Bean。
RabbitMQ 的 Exchange 是通过名称来标识的。
在 Spring Boot 中,您通过
@Bean
方法注册 Exchange 时,实际上是将 Exchange 的名称和类型绑定到 RabbitMQ 服务器。发送消息时,RabbitMQ 客户端会根据 Exchange 的名称找到对应的 Exchange,并根据 Routing Key 将消息路由到队列。
类型:
-
Direct Exchange:精确匹配 Routing Key。消息的 Routing Key 必须与 Binding 的 Routing Key 完全一致。
-
Topic Exchange:支持通配符匹配。例如,
with("key.*")
可以匹配key.1
、key.2
等。 -
Fanout Exchange:忽略 Routing Key,消息会被广播到所有绑定的队列。
-
Headers Exchange:忽略 Routing Key,根据消息头属性匹配。
代码示例(定义交换机):
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ExchangeConfig {@Beanpublic DirectExchange directExchange() {return new DirectExchange("direct-exchange");}@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanout-exchange");}@Beanpublic TopicExchange topicExchange() {return new TopicExchange("topic-exchange");}@Beanpublic HeadersExchange headersExchange() {return new HeadersExchange("headers-exchange");}
}
Queue(队列)
作用:消息的存储容器,等待消费者从中取出消息进行处理。
Queue 也需要手动注册为 Bean。Spring Boot 不会自动注册队列,因为队列的名称和属性(如是否持久化、是否排他等)需要根据业务需求进行配置。
关键点:
- 消息会保存在队列中,直到被消费。
- 队列可以是持久化的(重启 RabbitMQ 后消息仍然存在)或非持久化的。
代码示例(定义队列):
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class QueueConfig {@Beanpublic Queue demoQueue() {return new Queue("demo-queue", true); // 持久化队列}
}
Routing Key(路由键)
作用:决定消息如何从交换机路由到队列。
关键点:
- Routing Key 由 Producer 指定。
- 在 Direct 和 Topic 类型的 Exchange 中,Routing Key 决定队列是否接收消息。
Binding(绑定)
- 作用:将队列与交换机连接,并定义路由规则。
- 关键点:
- Binding 定义了队列接受消息的条件。
- 结合 Routing Key 和交换机类型,共同决定消息的路由方式。
代码示例(定义绑定):
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class BindingConfig {@Beanpublic Binding binding(Queue demoQueue, DirectExchange directExchange) {return BindingBuilder.bind(demoQueue).to(directExchange).with("key1");}
}
with("key1")
的作用是 指定 Binding 的 Routing Key。它的含义是:
-
当消息发送到 Exchange 时,Exchange 会根据消息的 Routing Key 和 Binding 的 Routing Key 进行匹配。
-
如果匹配成功,消息会被路由到对应的队列;如果匹配失败,消息会被丢弃或进入死信队列(如果有配置)。
Consumer(消费者)
作用:从队列中接收并处理消息。
关键点:
- 消费者与队列直接关联。
- 多个消费者可以监听同一队列,实现负载均衡。
代码示例:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;@Service
public class Consumer {@RabbitListener(queues = "demo-queue")public void receiveMessage(String message) {System.out.println("Received message: " + message);}
}
1.2 RabbitMQ 消息传输模型
点对点模型
定义:消息从生产者发送到队列,由消费者从队列中接收,消息只能被一个消费者消费。
实现:
- 使用默认交换机(空字符串
""
)。 - 直接将消息发送到队列。
代码示例:
rabbitTemplate.convertAndSend("", "demo-queue", "Point-to-Point Message");
发布订阅模型
定义:生产者将消息发送到 Fanout 类型的交换机,消息会广播到所有绑定的队列。
实现:
- 不需要 Routing Key。
- 所有绑定到 Fanout 交换机的队列都会接收消息。
代码示例:
rabbitTemplate.convertAndSend("fanout-exchange", "", "Fanout Message");
路由模型
定义:生产者将消息发送到 Direct 类型的交换机,根据 Routing Key 精确匹配队列。
实现:
- 队列通过 Binding 绑定到交换机时,指定 Routing Key。
- 消息的 Routing Key 必须与 Binding 的 Routing Key 一致。
代码示例:
rabbitTemplate.convertAndSend("direct-exchange", "key1", "Routing Message");
2. 环境准备
2.1 安装与配置 RabbitMQ
下载 Docker
-
访问 Docker 官方网站:Docker: Accelerated Container Application Development。
-
根据您的操作系统(Windows、macOS 或 Linux)下载并安装 Docker Desktop。
启动 Docker
-
安装完成后,启动 Docker Desktop。
-
确保 Docker 正在运行(任务栏或菜单栏中可以看到 Docker 图标)。
使用 Docker 快速部署 RabbitMQ
Docker 是部署 RabbitMQ 的最简单方式。通过以下命令,您可以快速启动一个 RabbitMQ 容器:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
参数说明:
-
-d
:以后台模式运行容器。 -
--name rabbitmq
:为容器指定名称(rabbitmq
)。 -
-p 5672:5672
:将容器的 5672 端口映射到主机的 5672 端口(RabbitMQ 的消息通信端口)。 -
-p 15672:15672
:将容器的 15672 端口映射到主机的 15672 端口(RabbitMQ 管理插件的 Web 界面端口)。 -
rabbitmq:management
:使用带有管理插件的 RabbitMQ 镜像。
验证 RabbitMQ 是否运行
运行以下命令,查看容器是否正常运行:
docker ps
如果看到 rabbitmq
容器正在运行,说明 RabbitMQ 已成功启动。
2.2 使用 RabbitMQ 管理插件
RabbitMQ 提供了一个 Web 管理界面,方便您监控和管理 RabbitMQ。
访问管理界面
-
打开浏览器,访问
http://localhost:15672
。 -
使用默认用户名和密码登录:
-
用户名:
guest
-
密码:
guest
-
管理界面功能
-
Overview:查看 RabbitMQ 的整体状态,如连接数、队列数、消息速率等。
-
Connections:查看当前连接到 RabbitMQ 的客户端。
-
Channels:查看当前打开的通道。
-
Exchanges:查看和管理 Exchange。
-
Queues:查看和管理 Queue。
-
Admin:管理用户和权限。
2.3 用户与权限配置
默认情况下,RabbitMQ 只有一个用户 guest
,密码也是 guest
。为了安全性和权限管理,建议创建新用户并分配权限。
1. 创建新用户
在 RabbitMQ 管理界面中:
-
点击顶部导航栏的 Admin。
-
在用户列表下方,点击 Add a user。
-
输入用户名和密码,例如:
-
用户名:
admin
-
密码:
admin123
-
-
点击 Add user 完成创建。
2. 分配权限
-
在用户列表中,找到刚创建的用户(如
admin
)。 -
点击用户右侧的 Set permission。
-
在权限设置页面:
-
Virtual Host:选择
/
(默认的虚拟主机)。 -
Configure:输入
.*
,表示允许用户配置所有资源。 -
Write:输入
.*
,表示允许用户写入所有资源。 -
Read:输入
.*
,表示允许用户读取所有资源。
-
-
点击 Set permission 完成权限分配。
3. 使用新用户登录
-
退出当前用户(点击右上角的
guest
,选择 Log out)。 -
使用新用户(如
admin
)登录。
2.4 Spring Boot 中引入 RabbitMQ 依赖
在 pom.xml
中添加以下依赖:
<dependencies><!-- RabbitMQ 依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
</dependencies>
spring-boot-starter-amqp
是 Spring Boot 提供的 RabbitMQ 集成依赖,它包含了以下内容:
-
RabbitMQ 客户端库:
-
自动引入 RabbitMQ 的 Java 客户端库(
amqp-client
),用于与 RabbitMQ 服务器通信。
-
-
Spring AMQP 支持:
-
提供了 Spring 对 AMQP(Advanced Message Queuing Protocol)的支持,包括
RabbitTemplate
、@RabbitListener
等。
-
2.5 Spring Boot 配置 RabbitMQ
在 Spring Boot 项目中,您需要在 application.properties
或 application.yml
中配置 RabbitMQ 的连接信息。
示例配置
# RabbitMQ 连接配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin123
配置说明:
-
spring.rabbitmq.host
:RabbitMQ 服务器地址(默认localhost
)。 -
spring.rabbitmq.port
:RabbitMQ 消息通信端口(默认5672
)。 -
spring.rabbitmq.username
:RabbitMQ 用户名。 -
spring.rabbitmq.password
:RabbitMQ 密码。
3. Spring Boot 集成 RabbitMQ 的消息生产和消费
3.1 消息生产者(Producer)
在 Spring Boot 中,我们使用 RabbitTemplate
来发送消息。它由 spring-boot-starter-amqp
自动配置成为一个 Bean,可直接通过 @Autowired
注入。
如果 message 不是 String 类型的处理
- Spring AMQP(
spring-boot-starter-amqp
)在使用RabbitTemplate
时,默认的消息转换器(MessageConverter
)通常会将对象序列化为 JSON 或者将字符串消息转换为字节。 - 如果你的业务数据不是
String
,常见做法是:- 在发送时把非字符串对象序列化(如转换为 JSON 字符串);
- 或者配置自定义的
MessageConverter
,让 Spring 帮你把对象自动序列化/反序列化。
典型做法:手动序列化为 JSON 再发送
@Service
public class CustomObjectProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendCustomObject(String queueName, MyCustomObject obj) {// 1. 将自定义对象序列化为 JSON 字符串String jsonString = new Gson().toJson(obj);// 2. 发送 JSON 字符串到 RabbitMQrabbitTemplate.convertAndSend(queueName, jsonString);}
}
- 在消费者端,你也可以将消息(JSON 字符串)反序列化为
MyCustomObject
。
配置自定义 Converter(可选)
- Spring AMQP 提供了
Jackson2JsonMessageConverter
等现成转换器。
@Configuration
public class RabbitConfig {@Beanpublic MessageConverter jsonMessageConverter() {return new Jackson2JsonMessageConverter();}// 配置 RabbitTemplate 使用该转换器@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setMessageConverter(jsonMessageConverter());return template;}
}
- 这样一来,
rabbitTemplate.convertAndSend(queueName, myObject)
会自动把myObject
转成 JSON 发送;消费者端则自动解析为同样的 Java 对象。
1)基本消息发送
场景:
将消息直接发送到指定的队列,跳过交换机的路由,让 RabbitMQ 把消息放到这个队列中。
核心代码示例:
@Service
public class BasicProducer {@Autowiredprivate RabbitTemplate rabbitTemplate; // 1.自动注入的 RabbitTemplate/*** 2.发送基本消息到指定的队列* @param queueName 目标队列名称* @param message 消息内容*/public void sendToQueue(String queueName, String message) {// 3.调用 convertAndSend,直接将消息放入指定队列rabbitTemplate.convertAndSend(queueName, message);System.out.println("Message sent to queue: " + queueName + ", content: " + message);}
}
代码详解:
-
@Autowired
private RabbitTemplate rabbitTemplate;`-
Spring Boot 自动为我们配置了
RabbitTemplate
,不用手动定义 Bean。 -
通过依赖注入即可使用所有与 RabbitMQ 交互的方法。
-
-
public void sendToQueue(String queueName, String message)
-
方法参数包括:
-
queueName
: 目标队列的名称。 -
message
: 要发送的字符串类型消息内容。
-
-
-
rabbitTemplate.convertAndSend(queueName, message)
-
convertAndSend
方法会将消息转换(转换为字节)并发送到指定队列。 -
如果该队列不存在,RabbitMQ 会尝试自动创建(前提是 Broker 端配置允许自动创建队列)。
-
2)发送到交换机
场景:
将消息发送到一个交换机(Exchange),再由交换机通过 Routing Key
将消息路由到匹配的队列中。
核心代码示例:
@Service
public class ExchangeProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送消息到指定交换机* @param exchangeName 交换机名称* @param routingKey 路由键* @param message 消息内容*/public void sendToExchange(String exchangeName, String routingKey, String message) {// 将消息发送到 exchangeName 指定的交换机,使用 routingKey 进行路由rabbitTemplate.convertAndSend(exchangeName, routingKey, message);System.out.println("Message sent to exchange: " + exchangeName + " with routingKey: " + routingKey);}
}
代码详解:
-
exchangeName
- 要发送到的交换机名称,例如
"direct-exchange"
、"fanout-exchange"
等。
- 要发送到的交换机名称,例如
-
routingKey
- 路由键,用来匹配绑定(Binding)。
- 例如:对
DirectExchange
而言,需要队列绑定时的路由键与发送时的路由键相同,消息才能到达队列。
-
rabbitTemplate.convertAndSend(exchangeName, routingKey, message)
- 将消息先发送到交换机,再根据路由键将消息投递到目标队列。
3)发送带消息属性的消息
场景:
需要为消息设置 TTL(过期时间)或优先级等属性,控制消息在队列中的行为。
核心代码示例:
@Service
public class PropertyProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送带消息属性的消息(如 TTL, 优先级)*/public void sendMessageWithProperties(String exchange, String routingKey, String messageContent) {// 1.创建 MessageProperties 对象,用于指定消息的属性MessageProperties properties = new MessageProperties();properties.setExpiration("10000"); // 过期时间:10秒 (单位:毫秒)properties.setPriority(5); // 优先级设为 5// 2.根据消息体和属性构建 Message 对象Message message = new Message(messageContent.getBytes(), properties);// 3.使用 send 方法(而非 convertAndSend)直接发送 Message 对象rabbitTemplate.send(exchange, routingKey, message);System.out.println("Message with properties sent: " + messageContent);}
}
代码详解:
-
MessageProperties properties = new MessageProperties();
MessageProperties
用于设置 AMQP 协议层的各种消息头信息。
-
properties.setExpiration("10000");
setExpiration
设置消息的 TTL(Time-To-Live),单位是毫秒。- 如果到达时间后消息仍未被消费,RabbitMQ 会将其从队列中移除并送入死信队列(如果配置了死信队列)。
-
properties.setPriority(5);
- 设置消息的优先级为 5,前提是队列本身需要支持优先级队列(创建队列时指定
x-max-priority
)。
- 设置消息的优先级为 5,前提是队列本身需要支持优先级队列(创建队列时指定
-
new Message(messageContent.getBytes(), properties)
- 将纯文本消息转换为
Message
对象,结合了消息属性和消息体。
- 将纯文本消息转换为
-
rabbitTemplate.send(exchange, routingKey, message);
- 与
convertAndSend
不同,它不会尝试进行消息转换(如 JSON、字符串),而是直接发送完整的 AMQPMessage
对象。
- 与
Message
构造函数public Message(byte[] body, MessageProperties messageProperties) {this.body = body;this.messageProperties = messageProperties; }
body
:消息体的字节数组。messageProperties
:AMQP 的消息属性,包括 TTL、优先级、headers 等。、如果消息体不是String类型
- 手动转换为字节:你可以先将自定义对象转换为字节数组(例如通过 JSON 序列化或 Java 序列化),再放入
new Message(...)
的第一个参数。MyCustomObject obj = new MyCustomObject(); // 假设你想用 JSON String jsonString = new Gson().toJson(obj); byte[] body = jsonString.getBytes(StandardCharsets.UTF_8);MessageProperties properties = new MessageProperties(); // 设置一些属性 Message message = new Message(body, properties);
- 为什么不会自动转 JSON?使用
new Message(...)
构造方法是“纯” AMQP 层的做法,不会调用 Spring 的转换器,因此你必须自己处理序列化。- 使用
Message
构造函数 时,你必须自行处理对象到byte[]
的转换(无论是字符串、JSON,还是其他格式)。- 如果想让 Spring AMQP 自动转换,你通常使用
rabbitTemplate.convertAndSend(Object msg)
这种高级 API,或者配置自定义MessageConverter
。
3.2 消息消费者(Consumer)
消费者的核心功能是在指定的队列中监听消息,并根据配置的确认模式(自动确认或手动确认)对消息进行处理或拒绝。
1)监听队列并消费消息
核心代码示例(自动确认模式):
@Service
public class Consumer {/*** 使用注解 @RabbitListener 指定要监听的队列* 由于默认为 auto-ack 模式,* 当消息到达后,RabbitMQ 会自动确认并从队列中删除该消息。*/@RabbitListener(queues = "demo-queue")public void receiveMessage(String message) {// 1.从 queueName 队列中取到的消息内容System.out.println("Received message: " + message);// 2.在 auto-ack 模式下,无需手动 ack// 如果这里出现异常,RabbitMQ 不会再次发送消息给消费者,消息会丢失。}
}
代码详解(自动确认模式):
-
@RabbitListener(queues = "demo-queue")
- 声明监听名为
demo-queue
的队列。 - 一旦有新消息到达该队列,就会自动回调此方法。
- 声明监听名为
-
public void receiveMessage(String message)
- 默认参数类型为字符串,当 RabbitMQ 收到消息后会尝试将其转换为
String
并注入到message
中。
- 默认参数类型为字符串,当 RabbitMQ 收到消息后会尝试将其转换为
-
自动确认(auto-ack)的风险
- 如果消费者在处理消息时抛出异常,消息已经被 RabbitMQ 标记为“已确认”,不会再重新发送或进入死信队列,导致消息丢失。
2)确认机制
自动确认(auto-ack)
-
行为:
-
当消费者从队列中获取消息后,RabbitMQ 会立即将该消息标记为已确认(acknowledged),并从队列中删除。
-
-
问题:
-
如果消息处理失败(例如消费者抛出异常),消息已经被确认并从队列中删除,无法重新处理。
-
如果消费者崩溃或断开连接,未处理的消息会丢失。
-
-
适用场景:
-
对消息处理的可靠性要求不高的场景。
-
手动确认(manual-ack)
-
行为:
-
消费者处理完消息后,必须显式调用
basicAck
方法确认消息。 -
如果消息处理失败,可以调用
basicNack
或basicReject
方法拒绝消息。
-
-
优点:
-
确保消息处理的可靠性。
-
支持消息重新入队或发送到死信队列。
-
-
适用场景:
-
对消息处理的可靠性要求较高的场景。
-
核心代码示例:
@Service
public class ManualAckConsumer {/*** 在 application.properties 中配置:* spring.rabbitmq.listener.simple.acknowledge-mode=manual* 使得 RabbitMQ 使用手动确认模式*/@RabbitListener(queues = "demo-queue")public void receiveMessage(Message message, Channel channel) throws IOException {try {// 1.从消息中获取消息体String body = new String(message.getBody());System.out.println("Processing message: " + body);// 2.如果业务处理成功,则调用 basicAck 手动确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {System.err.println("Message processing failed: " + e.getMessage());// 3.如果处理失败,需要决定是重新入队还是拒绝并进入死信队列// requeue = true -> 重新入队// requeue = false -> 丢弃或进入死信队列(根据队列配置)channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);}}
}
代码详解:
-
配置手动确认:
- 在
application.properties
添加spring.rabbitmq.listener.simple.acknowledge-mode=manual
- 表示 Spring AMQP 使用手动确认模式(manual-ack)。
- 在
-
public void receiveMessage(Message message, Channel channel)
:- 与自动确认不同,这里不仅接收字符串,还接收了
org.springframework.amqp.core.Message
对象和com.rabbitmq.client.Channel
。 Message
:包含消息体(body)和消息属性(headers 等)。Channel
:给我们提供了basicAck
,basicNack
,basicReject
等底层 AMQP 操作。
- 与自动确认不同,这里不仅接收字符串,还接收了
-
手动确认成功:
channel.basicAck(deliveryTag, multiple)
:deliveryTag
:本次消息的唯一标记,从message.getMessageProperties().getDeliveryTag()
获取。multiple = false
:只确认当前这条消息。
basicAck(long deliveryTag, boolean multiple)
- 这里的
deliveryTag
并不是在你构造Message
时生成的,而是 RabbitMQ Broker 在投递消息给消费者时由底层 AMQP 协议自动分配的一个递增的序号。long deliveryTag = message.getMessageProperties().getDeliveryTag();
-
手动确认失败:
channel.basicNack(deliveryTag, multiple, requeue)
或basicReject
:requeue = true
:将消息重新放回队列等待下一次消费(可能导致死循环,如处理一直失败)。requeue = false
:拒绝消息,若配置了死信队列,则进入死信队列;否则丢弃消息。
3)处理消费失败
自动确认模式下的处理
-
在自动确认模式下,如果消息处理失败,RabbitMQ 不会重新发送消息,因为消息已经被确认并从队列中删除。
-
问题:
-
消息丢失,无法重新处理。
-
手动确认模式下的处理
- 在手动确认模式下,如果消息处理失败,可以通过以下方式处理:
-
重新入队:
-
调用
basicNack
或basicReject
方法,并将requeue
参数设置为true
。 -
消息会重新进入队列,等待下一次消费。
-
-
发送到死信队列:
-
调用
basicNack
或basicReject
方法,并将requeue
参数设置为false
。 -
如果队列配置了死信队列,消息会被发送到死信队列。
-
重试机制(Spring AMQP 提供的简单重试)(只支持手动确认机制)
是重试失败了才会将消息重新入队 ,所以重试在前,重新入队在后
# 启用重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=3
# 初始重试间隔
spring.rabbitmq.listener.simple.retry.initial-interval=1000
# 间隔倍数
spring.rabbitmq.listener.simple.retry.multiplier=2.0
# 最大重试间隔
spring.rabbitmq.listener.simple.retry.max-interval=10000
-
Spring AMQP 提供了 重试机制,可以在消费者处理消息失败时,自动进行多次重试,而不是直接将消息重新入队。
行为
-
当消息处理失败时,Spring AMQP 会在 本地 进行重试(即不将消息重新入队),直到达到最大重试次数。
-
如果重试次数用尽,消息会被拒绝(
basicNack
或basicReject
),并根据配置决定是否重新入队或发送到死信队列。
死信队列(DLQ)
-
当消息被拒绝或过期时,RabbitMQ 会将其发送到我们配置的死信交换机(DLX),再路由到死信队列(DLQ)。
-
配置示例:
@Configuration public class RabbitConfig {@Beanpublic Queue normalQueue() {return QueueBuilder.durable("normal-queue").withArgument("x-dead-letter-exchange", "dead-letter-exchange") // 指定死信交换机.withArgument("x-dead-letter-routing-key", "dead-letter-routing-key") // 指定死信路由键.build();}@Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange("dead-letter-exchange");}@Beanpublic Queue deadLetterQueue() {return new Queue("dead-letter-queue");}@Beanpublic Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dead-letter-routing-key");} }
-
原理:
- 正常队列通过
x-dead-letter-exchange
指定死信交换机,一旦消息被拒绝(requeue=false
)或超时(TTL 到期),RabbitMQ 会把消息发送到dead-letter-exchange
。 dead-letter-exchange
与dead-letter-queue
进行绑定(路由键dead-letter-routing-key
),从而实现死信队列的存储。
- 正常队列通过
-
重新入队 vs 发送到死信队列
- 重新入队:
channel.basicNack(deliveryTag, false, true)
- 适用于临时性错误,比如数据库锁冲突、网络抖动等,等待后续重新处理。
- 发送到死信队列:
channel.basicNack(deliveryTag, false, false)
- 适用于永久性错误,比如消息格式无法解析,或业务逻辑指定不应再尝试。
- 重新入队:
相关文章:
Spring boot框架下的RabbitMQ消息中间件
1. RabbitMQ 基础概念 1.1 消息处理流程与组件配合 Producer(生产者) 发送消息。消息先发送到 Exchange(交换机),而不是直接到队列。Exchange(交换机) 接收到消息后,根据 Routing …...

1 行命令引发的 Go 应用崩溃
一、前言 不久前,阿里云 ARMS 团队、编译器团队、MSE 团队携手合作,共同发布并开源了 Go 语言的编译时自动插桩技术。该技术以其零侵入的特性,为 Go 应用提供了与 Java 监控能力相媲美的解决方案。开发者只需将 go build 替换为新编译命令 o…...

ScratchLLMStepByStep:训练自己的Tokenizer
1. 引言 分词器是每个大语言模型必不可少的组件,但每个大语言模型的分词器几乎都不相同。如果要训练自己的分词器,可以使用huggingface的tokenizers框架,tokenizers包含以下主要组件: Tokenizer: 分词器的核心组件,定…...

G1原理—10.如何优化G1中的FGC
大纲 1.G1的FGC可以优化的点 2.一个bug导致的FGC(Kafka发送重试 subList导致List越来越大) 3.为什么G1的FGC比ParNew CMS要更严重 4.FGC的一些参数及优化思路 1.G1的FGC可以优化的点 (1)FGC的基本原理 (2)遇到FGC应该怎么处理 (3)应该如何操作来规避FGC (4)应该如何操…...

Java基础——概念和常识(语言特点、JVM、JDK、JRE、AOT/JIT等介绍)
我是一个计算机专业研0的学生卡蒙Camel🐫🐫🐫(刚保研) 记录每天学习过程(主要学习Java、python、人工智能),总结知识点(内容来自:自我总结网上借鉴࿰…...

2025.1.16——三、supersqli 绕过|堆叠注入|handler查询法|预编译绕过法|修改原查询法
题目来源:攻防世界supersqli 目录 一、打开靶机,整理已知信息 二、sqlmap解题 step 1:爆数据库 step 2:爆表 二、手工注入解题 step 1:判断注入类型 step 2:判断字段数 step 3:查询数据…...

浅谈计算机网络03 | 现代网络组成
现代网络组成 一 、网络生态体系1.1网络生态系统的多元主体1.2 网络接入设施的多样类型 二、现代网络的典型体系结构解析三、高速网络技术3.1 以太网技术3.2 Wi-Fi技术的深度剖析3.2.1 应用场景的多元覆盖3.2.2 标准升级与性能提升 3.3 4G/5G蜂窝网的技术演进3.3.1 蜂窝技术的代…...

Red Hat8:搭建FTP服务器
目录 一、匿名FTP访问 1、新建挂载文件 2、挂载 3、关闭防火墙 4、搭建yum源 5、安装VSFTPD 6、 打开配置文件 7、设置配置文件如下几个参数 8、重启vsftpd服务 9、进入图形化界面配置网络 10、查看IP地址 11、安装ftp服务 12、遇到拒绝连接 13、测试 二、本地…...

EWM 批次管理 / Batch Management
目录 1 简介 2 业务数据 2.1 基于 PO,创建 ERP LE - Delivery 内向交货单,同时同步到 EWM 内向交货单 2.2 在 EWM 内向交货单,创建批次。EWM 批次创建的前提条件来自于物料主数据批次分类(023)。SAP 提供的标准条件…...
Java 面试题 - ArrayList 和 LinkedList 的区别,哪个集合是线程安全的?
Java 面试题 - ArrayList 和 LinkedList 的区别,哪个集合是线程安全的? 在 Java 开发中,ArrayList和LinkedList是两个常用的集合类,它们在数据结构和性能上有诸多不同,同时线程安全性也各有特点。深入理解这些差异&am…...

初学SpringBoot
目录 什么是SpringBoot 使用 Spring Boot有什么好处 Spring Boot 特点 在线构建 IntelliJ IDEA在线模板构建 IntelliJ IDEA 通maven项目构建 SpringBoot的常用配置 入口类和相关注解 定制Banner 修改banner图标 关闭banner 常规属性修改 tomcat端口号修改 常规属性…...
【网络云SRE运维开发】2025第3周-每日【2025/01/15】小测-【第14章ospf高级配置】理论和实操解析
文章目录 14.1 选择题解题思路和参考答案14.2 理论题解题思路和参考答案14.3 实操题解题思路和参考答案思科(Cisco)设备华为(Huawei)设备小米/锐捷(或其他支持标准CLI命令的设备)通过网络管理工具注意事项 …...
AWS S3 跨账户访问 Cross Account Access
进入S3对应的存储桶,上面选项选权限,存储桶策略 -- 编辑,输入对应的policy。 完全控制,包含上传删除权限,policy如下: {"Version": "2012-10-17","Statement": [{"Si…...
Ubuntu20.4和docker终端指令、安装Go环境、安装搜狗输入法、安装WPS2019:保姆级图文详解
目录 前言1、docker、node、curl版本查看终端命令1.1、查看docker版本1.2、查看node.js版本1.3、查看curl版本1.4、Ubuntu安装curl1.5、Ubuntu终端保存命令 2、安装docker-compose、Go语言2.1、安装docker-compose2.2、go语言安装步骤2.3、git版本查看 3、Ubuntu20.4安装搜狗输…...
Kotlin语言的正则表达式
Kotlin语言中的正则表达式 引言 正则表达式(Regular Expression,简称Regex)是一种用于匹配字符串中字符组合的工具。在数据处理、文本解析等领域,正则表达式以其强大的字符串处理能力得到了广泛的应用。而Kotlin作为一种现代的编…...

npm的包管理
从哪里下载包 国外有一家 IT 公司,叫做 npm,Inc.这家公司旗下有一个非常著名的网站: https://www.npmjs.com/,它是全球最大的包共享平台,你可以从这个网站上搜索到任何你需要的包,只要你有足够的耐心!到目前位置,全球约…...
深度学习在文本情感分析中的应用
引言 情感分析是自然语言处理(NLP)中的一个重要任务,旨在识别和提取文本中的主观信息。随着深度学习技术的发展,我们可以使用深度学习模型来提高情感分析的准确性和效率。本文将介绍如何使用深度学习进行文本情感分析,…...

【大模型系列篇】数字人音唇同步模型——腾讯开源MuseTalk
之前有一期我们体验了阿里开源的半身数字人项目EchoMimicV2,感兴趣的小伙伴可跳转至《AI半身数字人开箱体验——开源项目EchoMimicV2》,今天带大家来体验腾讯开源的数字人音唇同步模型MuseTalk。 MuseTalk 是一个实时高品质音频驱动的唇形同步模型&#…...

Formality:参考设计/实现设计以及顶层设计
相关阅读 Formalityhttps://blog.csdn.net/weixin_45791458/category_12841971.html?spm1001.2014.3001.5482 Formality存在两个重要的概念:参考设计/实现设计和顶层设计,本文就将对此进行详细阐述。参考设计/实现设计是中两个重要的全局概念&am…...

RPA赋能内容创作:打造小红书入门词语图片的全自动化流程
🌟 嗨,我是LucianaiB! 🌍 总有人间一两风,填我十万八千梦。 🚀 路漫漫其修远兮,吾将上下而求索。 用RPA全自动化批量生产【入门词语】图片做小红书商单,保姆级工具开发教程 最近由…...
谷歌浏览器插件
项目中有时候会用到插件 sync-cookie-extension1.0.0:开发环境同步测试 cookie 至 localhost,便于本地请求服务携带 cookie 参考地址:https://juejin.cn/post/7139354571712757767 里面有源码下载下来,加在到扩展即可使用FeHelp…...

Flask RESTful 示例
目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题: 下面创建一个简单的Flask RESTful API示例。首先,我们需要创建环境,安装必要的依赖,然后…...
【Linux】shell脚本忽略错误继续执行
在 shell 脚本中,可以使用 set -e 命令来设置脚本在遇到错误时退出执行。如果你希望脚本忽略错误并继续执行,可以在脚本开头添加 set e 命令来取消该设置。 举例1 #!/bin/bash# 取消 set -e 的设置 set e# 执行命令,并忽略错误 rm somefile…...

Xshell远程连接Kali(默认 | 私钥)Note版
前言:xshell远程连接,私钥连接和常规默认连接 任务一 开启ssh服务 service ssh status //查看ssh服务状态 service ssh start //开启ssh服务 update-rc.d ssh enable //开启自启动ssh服务 任务二 修改配置文件 vi /etc/ssh/ssh_config //第一…...

DAY 47
三、通道注意力 3.1 通道注意力的定义 # 新增:通道注意力模块(SE模块) class ChannelAttention(nn.Module):"""通道注意力模块(Squeeze-and-Excitation)"""def __init__(self, in_channels, reduction_rat…...
oracle与MySQL数据库之间数据同步的技术要点
Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异,它们的数据同步要求既要保持数据的准确性和一致性,又要处理好性能问题。以下是一些主要的技术要点: 数据结构差异 数据类型差异ÿ…...
python报错No module named ‘tensorflow.keras‘
是由于不同版本的tensorflow下的keras所在的路径不同,结合所安装的tensorflow的目录结构修改from语句即可。 原语句: from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后: from tensorflow.python.keras.lay…...

论文笔记——相干体技术在裂缝预测中的应用研究
目录 相关地震知识补充地震数据的认识地震几何属性 相干体算法定义基本原理第一代相干体技术:基于互相关的相干体技术(Correlation)第二代相干体技术:基于相似的相干体技术(Semblance)基于多道相似的相干体…...

springboot整合VUE之在线教育管理系统简介
可以学习到的技能 学会常用技术栈的使用 独立开发项目 学会前端的开发流程 学会后端的开发流程 学会数据库的设计 学会前后端接口调用方式 学会多模块之间的关联 学会数据的处理 适用人群 在校学生,小白用户,想学习知识的 有点基础,想要通过项…...

【JVM】Java虚拟机(二)——垃圾回收
目录 一、如何判断对象可以回收 (一)引用计数法 (二)可达性分析算法 二、垃圾回收算法 (一)标记清除 (二)标记整理 (三)复制 (四ÿ…...