深入RabbitMQ世界:探索3种队列、4种交换机、7大工作模式及常见概念
文章目录
- 文章导图
- RabbitMQ架构及相关概念
- 四大核心概念
- 名词解读
- 七大工作模式及四大交换机类型
- 0、前置了解-默认交换机DirectExchange
- 1、简单模式(Simple Queue)-默认DirectExchange
- 2、 工作队列模式(Work Queues)-默认DirectExchange
- 3、发布/订阅模式(Publish/Subscribe)-FanoutExchange
- 4、路由模式(Routing)-自定义DirectExchange
- 5、主题模式(Topics)-TopicExchange
- 总结
- 三种队列类型
- 普通队列
- 死信队列(Dead Letter Queue, DLQ)
- 定义
- 触发条件
- 应用场景
- 配置
- 延迟队列(Delayed Queue)
- 定义
- 实现方式
- 应用场景
- 两者区别
- 代码实战
- 1. 延迟队列:TTL+DLX死信队列
- 配置步骤
- 2. 延迟队列:RabbitMQ延迟消息插件
- 配置步骤
- 3、死信队列: basic.reject或basic.nack
- 1. 引入依赖
- 2. 配置交换机、队列和死信队列
- 3. 生产者发送消息
- 4. 消费者监听并拒绝消息
- 5. 消费者监听死信队列
- 总结
RabbitMQ系列文章 |
---|
深入RabbitMQ世界:探索3种队列、4种交换机、7大工作模式及常见概念 |
TODO:RabbitMQ可靠性 |
TODO:RabbtiMQ顺序性 |
TODO:RabbitMQ常见问题整理 |
文章导图
RabbitMQ架构及相关概念
四大核心概念
生产者
产生数据发送消息的程序是生产者。
交换机
交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推 送到多个队列,亦或者是把消息丢弃,这个得由交换机类型决定 。
队列
队列是RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存 储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可 以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式 。
消费者
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费 者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者
名词解读
- Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
- Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类 似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务 时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
- Connection:publisher/consumer 和 broker 之间的 TCP 连接 Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接, 如果应用程序支持多线程,通常每个thread创建单独的
- channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。 Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
- Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发 消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
- Queue:消息最终被送到这里等待 consumer 取走 Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。
- Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
七大工作模式及四大交换机类型
网上查了很多资料有的说是五种,有的说是四种,可以看到在RabbitMQ在官网提到的共有7种工作模式:https://www.rabbitmq.com/tutorials
第6种是RPC调用,这个我们正常肯定不用这个实现RPC,而第7种是confirm 确认模式,可以用于保证生产者消息发送的可靠性,这个我后面会再专门介绍。
现在我们主要讲前5种工作模式,实际上总结来说5种又可以总结为是3种,其实第1、2、4根据他们都是Direct交换机可以归结为一种,下文我会详细讲解一下。
0、前置了解-默认交换机DirectExchange
RabbitMQ有一个自带的交换机,也被称为AMQP default exchange。当消息发送到RabbitMQ时,如果没有指定交换机,就会被发送到默认交换机。默认交换机的类型为direct类型,路由键与队列名相同。
如果消息的路由键和某个队列的名称一致,那么消息就会被发送到这个队列中。如果消息的路由键和任何一个队列的名称都不一致,那么消息会被丢弃。 默认交换机可以通过设置routing_key来指定消息的目的地,例如:
// 将消息发送到名称为test_queue的队列中,空字符串代表默认交换机
channel.basic_publish(exchange="", routing_key="test_queue", body="Hello, RabbitMQ!")
但是,建议应用程序在发送消息时显式地指定交换机,以避免不必要的麻烦或错误。默认交换机只是一个简单的机制,不应被用于复杂的应用程序。
1、简单模式(Simple Queue)-默认DirectExchange
这个和别的几种模式对比看着没有X,这个其实用了默认的交换机,我们需要提供一个生产者一个队列以及一个消费者。消息传播图如下:
//Config
@Bean
Queue queue1() {return new Queue("simpleQueue");
}// 生产者
@Autowired
private RabbitTemplate rabbitTemplate;public void sendSimpleMessage(String message) {rabbitTemplate.convertAndSend("simpleQueue", message);
}// 消费者
@RabbitListener(queues = "simpleQueue")
public void receiveSimpleMessage(String message) {System.out.println("Received: " + message);
}
这个时候使用的其实是默认的直连交换机(DirectExchange),DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key
相同的 Queue 上,例如消息队列名为 “simpleQueue”,则 routingkey 为 “simpleQueue” 的消息会被该消息队列接收。
具体可以看下源码发送convertAndSend
:
/** Alias for amq.direct default exchange. */
private static final String DEFAULT_EXCHANGE = "";private static final String DEFAULT_ROUTING_KEY = "";private volatile String exchange = DEFAULT_EXCHANGE;
private volatile String routingKey = DEFAULT_ROUTING_KEY;@Override
public void convertAndSend(String routingKey, final Object object) throws AmqpException {//可以发现这个this.exchange就是DEFAULT_EXCHANGE = ""convertAndSend(this.exchange, routingKey, object, (CorrelationData) null);
}
2、 工作队列模式(Work Queues)-默认DirectExchange
这种情况是这样的:
一个生产者,也是一个默认的交换机(DirectExchange),一个队列,两个消费者。
一个队列对应了多个消费者,默认情况下,由队列对消息进行平均分配,消息会被分到不同的消费者手中。消费者可以配置各自的并发能力,进而提高消息的消费能力,也可以配置手动 ack,来决定是否要消费某一条消息。
和第一种对比主要体现在有多个消费者进行消费,主要优势在于可以通过增加消费者来提高处理能力。
//Config
@Bean
Queue queue1() {return new Queue("workQueue");
}// Producer
@Autowired
private RabbitTemplate rabbitTemplate;public void sendWorkMessage(String message) {rabbitTemplate.convertAndSend("workQueue", message);
}// Consumer
@RabbitListener(queues = "workQueue")
public void receiveWorkMessage(String message) {System.out.println("Received: " + message);// Simulate workThread.sleep(1000);
}
3、发布/订阅模式(Publish/Subscribe)-FanoutExchange
FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用,FanoutExchange 配置方式如下:
在这里首先创建 FanoutExchange,参数含义与创建 DirectExchange 参数含义一致,然后创建两个 Queue,再将这两个 Queue 都绑定到 FanoutExchange 上。
//Config
@Bean
public FanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange");
}@Bean
public Queue fanoutQueue1() {return new Queue("fanoutQueue1");
}@Bean
public Queue fanoutQueue2() {return new Queue("fanoutQueue2");
}@Bean
public Binding binding1(FanoutExchange fanoutExchange, Queue fanoutQueue1) {return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}@Bean
public Binding binding2(FanoutExchange fanoutExchange, Queue fanoutQueue2) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
接下来创建两个消费者,两个消费者分别消费两个消息队列中的消息,然后在单元测试中发送消息,如下:
// Producer
@Autowired
private RabbitTemplate rabbitTemplate;public void sendFanoutMessage(String message) {rabbitTemplate.convertAndSend("fanoutExchange", null, message);
}// Consumer
@RabbitListener(queues = "fanoutQueue1")
public void receiveFanoutMessage1(String message) {System.out.println("Queue1 Received: " + message);
}@RabbitListener(queues = "fanoutQueue2")
public void receiveFanoutMessage2(String message) {System.out.println("Queue2 Received: " + message);
}
注意这里发送消息时不需要 routingkey
,指定 exchange
即可,routingkey
可以直接传一个 null
。
4、路由模式(Routing)-自定义DirectExchange
DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,例如消息队列名为 “directQueue1”,则 routingkey 为 “directQueue1” 的消息会被该消息队列接收。
// Config
@Bean
public DirectExchange directExchange() {return new DirectExchange("directExchange");
}@Bean
public Queue directQueue1() {return new Queue("directQueue1");
}@Bean
public Queue directQueue2() {return new Queue("directQueue2");
}@Bean
public Binding directBinding1(DirectExchange directExchange, Queue directQueue1) {return BindingBuilder.bind(directQueue1).to(directExchange).with("info");
}@Bean
public Binding directBinding2(DirectExchange directExchange, Queue directQueue2) {return BindingBuilder.bind(directQueue2).to(directExchange).with("error");
}
可以发现我们可以根据routingKey控制发送到哪个队列上,这个本质上和我们前面2种模式都是一样的,采用的都是DirectExchange,只不过前面2种的交换机DirectExchange是""默认值,现在我们这里是指定了自己的DirectExchange
// Producer
@Autowired
private RabbitTemplate rabbitTemplate;public void sendDirectMessage(String message, String routingKey) {rabbitTemplate.convertAndSend("directExchange", routingKey, message);
}// Consumer
@RabbitListener(queues = "directQueue1")
public void receiveDirectMessage1(String message) {System.out.println("Queue1 Received: " + message);
}@RabbitListener(queues = "directQueue2")
public void receiveDirectMessage2(String message) {System.out.println("Queue2 Received: " + message);
}
特别注意:如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃。
5、主题模式(Topics)-TopicExchange
在 RabbitMQ 的主题模式(Topics)中,消息通过带有路由键的主题交换机(
TopicExchange
)进行路由。消息的路由键是一个点分隔的字符串,消费者可以使用绑定键(带有通配符)来订阅感兴趣的消息。
- 队列
topicQueue1
使用绑定键*.orange.*
,匹配任意第一个和第三个单词,以orange
为第二个单词的消息。 - 队列
topicQueue2
使用绑定键*.*.rabbit
,匹配任意前两个单词,以rabbit
为第三个单词的消息。
// Config
@Bean
public TopicExchange topicExchange() {return new TopicExchange("topicExchange");
}@Bean
public Queue topicQueue1() {return new Queue("topicQueue1");
}@Bean
public Queue topicQueue2() {return new Queue("topicQueue2");
}@Bean
public Binding topicBinding1(TopicExchange topicExchange, Queue topicQueue1) {return BindingBuilder.bind(topicQueue1).to(topicExchange).with("*.orange.*");
}@Bean
public Binding topicBinding2(TopicExchange topicExchange, Queue topicQueue2) {return BindingBuilder.bind(topicQueue2).to(topicExchange).with("*.*.rabbit");
}
topicQueue1
和topicQueue2
接收匹配其绑定键的消息。- 灵活路由: 主题模式允许通过复杂的路由键实现灵活的消息路由。
- 使用场景: 适用于需要按模式匹配路由消息的场景,比如日志分级、区域性数据分发等。
// Producer
@Autowired
private RabbitTemplate rabbitTemplate;public void sendTopicMessage(String message, String routingKey) {rabbitTemplate.convertAndSend("topicExchange", routingKey, message);
}// Consumer
@RabbitListener(queues = "topicQueue1")
public void receiveTopicMessage1(String message) {System.out.println("Queue1 Received: " + message);
}@RabbitListener(queues = "topicQueue2")
public void receiveTopicMessage2(String message) {System.out.println("Queue2 Received: " + message);
}
总结
看了上面的5个例子,其实本质上我们可以根据Exchange交换机类型归结为3种工作模式Direct、Fanout、Topic
- Direct:定向,把消息交给符合指定routing key 的队列 (第1、2、4其实都是这种交换机)
- Fanout:广播,将消息交给所有绑定到交换机的队列 第**(第3种模式)**
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列**(第5种模式)**
这里提一下,交换机还有一种类型,Headers:头匹配,基于MQ的消息头匹配,不过这种用的非常少,可以忽略!
不难发现,这三种类型本质上是告诉交换机应该把消息发送给哪些那些队列的,三种类别对应着三种判断角度。
- direct —— 消息发送时都附带一个字段叫routing_key,direct 模式的交换机就会直接把该字段理解成队列名,找到对应的队列并发送;
- fanout —— 相当于广播,不作任何选择,发送给所有连接的队列;
- topic —— 交换机会把routing_key理解成一个主题,恰好,队列绑定交换机时也可以以缩略形式指定主题,所以找到匹配主题的队列就发送;
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
三种队列类型
普通队列
我们平常发送的正常都是普通队列,比如上面5种工作模式说的都是普通队列,就不多说了
死信队列(Dead Letter Queue, DLQ)
特别注意:
- 队列和消息都有个TTL生存时间,队列的TTL到达后队列会自动删除,消息不会进入死信队列;
- 消息的生存时间到达后会进入死信队列。消息的生存时间可以在队列设置所有消息的TTL,也可以对某个消息单独设置TTL。
定义
死信队列是用于处理无法被消费者正确处理的消息的队列。当消息在原始队列中无法被消费时,会被转移到死信队列中。
触发条件
消息会变成死信并进入死信队列的几种情况:
- 消息被消费者拒绝(通过
basic.reject
或basic.nack
),并且requeue=false
。 - 消息在队列中超过了TTL(Time To Live)时间。
- 队列达到最大长度限制,无法再接收新消息。
应用场景
- 处理无法被消费的消息,避免消息堆积影响其他消息的消费。
- 记录和监控消息处理错误,方便进行后续处理
配置
- 通过设置
x-dead-letter-exchange
和x-dead-letter-routing-key
将消息路由到死信队列。 - 在原始队列中设置死信交换机和死信队列的相关参数
延迟队列(Delayed Queue)
定义
延迟队列是一种特殊的队列,消息在发送到队列后,需要等待一段时间后才能被消费。
实现方式
-
通过死信队列实现延迟任务:
把死信队列就当成延迟队列,具体来说是这样:
假如一条消息需要延迟 30 分钟执行,我们就设置这条消息的有效期为 30 分钟,同时为这条消息配置死信交换机和死信
routing_key
,并且不为这个消息队列设置消费者,那么 30 分钟后,这条消息由于没有被消费者消费而进入死信队列,此时我们有一个消费者就在“蹲点”这个死信队列,消息一进入死信队列,就立马被消费了。- 将消息发送到一个没有消费者的队列,设置消息的TTL。
- 消息过期后进入死信队列,再由死信队列的消费者处理。
-
通过RabbitMQ延迟插件:
- 使用RabbitMQ的延迟插件(
rabbitmq_delayed_message_exchange
插件),消息在延迟一段时间后再投递到目标队列中进行消费。
- 使用RabbitMQ的延迟插件(
应用场景
- 订单超时未支付自动取消。
- 用户注册后未登录的提醒。
- 预定会议前的通知
两者区别
使用TTL和死信队列实现延迟插件其实是会有一些问题的:
- 问题一:当我们的业务比较复杂的时候, 需要针对不同的业务消息类型设置不同的过期时间策略, 必然我们也需要为不同的队列消息的过期时间创建很多的Queue的Bean对象, 当业务复杂到一定程度时, 这种方式维护成本过高;
- 问题二:就是队列的先进先出原则导致的问题,当先进入队列的消息的过期时间比后进入消息中的过期时间长的时候,消息是串行被消费的,所以必然是等到先进入队列的消息的过期时间结束, 后进入队列的消息的过期时间才会被监听,然而实际上这个消息早就过期了,这就导致了本来过期时间为3秒的消息,实际上过了13秒才会被处理,这在实际应用场景中肯定是不被允许的;
延迟交换机插件可以在一定程度上解决上述两种问题。
特性 | 死信队列 | 延迟队列 |
---|---|---|
定义 | 处理无法被消费的消息 | 消息在指定时间后才被消费 |
触发条件 | 消息被拒绝、消息过期、队列满 | 消息设置了TTL或使用延迟插件 |
应用场景 | 处理消费失败的消息,避免队列堵塞 | 订单超时取消、提醒通知等延迟处理场景 |
实现方式 | 配置死信交换机和死信队列 | 使用TTL和死信队列或延迟插件 |
消息处理 | 进入死信队列后进行特殊处理 | 延迟一段时间后再投递到目标队列 |
代码实战
1. 延迟队列:TTL+DLX死信队列
配置步骤
1、引入依赖
在pom.xml
中引入Spring Boot和RabbitMQ的依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、配置交换机和队列
在Spring Boot的配置类中,配置一个普通队列、一个死信队列、一个普通交换机和一个死信交换机:
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {// 普通交换机@Beanpublic DirectExchange normalExchange() {return new DirectExchange("normal_exchange");}// 死信交换机@Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange("dead_letter_exchange");}// 普通队列并绑定到普通交换机@Beanpublic Queue normalQueue() {return QueueBuilder.durable("normal_queue").withArgument("x-dead-letter-exchange", "dead_letter_exchange").withArgument("x-dead-letter-routing-key", "dead_letter_routing_key").build();}@Beanpublic Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("normal_routing_key");}// 死信队列并绑定到死信交换机@Beanpublic Queue deadLetterQueue() {return new Queue("dead_letter_queue");}@Beanpublic Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("dead_letter_routing_key");}
}
3、生产者发送消息
在生产者发送消息时,可以指定消息的TTL(Time-To-Live)。TTL到期后,消息会被转发到死信队列:
- 创建了一个匿名内部类实现了MessagePostProcessor接口,并重写了postProcessMessage()方法。在该方法中,设置了消息的延迟时间为传进来的delay延迟时间
java复制代码import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/send")public String send(@RequestParam String message, @RequestParam int delay) {rabbitTemplate.convertAndSend("normal_exchange", "normal_routing_key", message, msg -> {//创建了一个匿名内部类实现了MessagePostProcessor接口,并重写了postProcessMessage()方法。在该方法中,设置了消息的延迟时间为传进来的delay延迟时间msg.getMessageProperties().setExpiration(String.valueOf(delay));return msg;});return "Message sent with delay: " + delay;}
}
4、消费者监听死信队列
消费者监听死信队列,接收到消息后处理:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class Consumer {@RabbitListener(queues = "dead_letter_queue")public void receiveMessage(String message) {System.out.println("Received message: " + message);}
}
2. 延迟队列:RabbitMQ延迟消息插件
RabbitMQ有一个插件 rabbitmq-delayed-message-exchange
可以直接支持延迟消息队列。
配置步骤
1、安装RabbitMQ延迟消息插件
首先,确保RabbitMQ服务器上已安装rabbitmq-delayed-message-exchange
插件。
rabbitmq-plugins enable rabbitmq_delayed_message_exchange/21、**配置交换机和队列**
2、在Spring Boot中配置使用延迟消息交换机:
- 使用CustomExchange类创建一个自定义交换机对象。CustomExchange是Spring AMQP库提供的一个类,用于创建自定义的交换机。构造方法的参数依次为交换机的名称、类型、是否持久化、是否自动删除和属性。
- 交换机的名称为DELAYED_EXCHANGE,类型为"x-delayed-message",持久化为true,自动删除为false,属性为之前创建的HashMap对象。
java复制代码import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitDelayedConfig {@Beanpublic CustomExchange delayedExchange() {return new CustomExchange("delayed_exchange", "x-delayed-message", true, false, Map.of("x-delayed-type", "direct"));}@Beanpublic Queue delayedQueue() {return new Queue("delayed_queue");}@Beanpublic Binding delayedBinding(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("delayed_routing_key").noargs();}
}
3、生产者发送消息
生产者在发送消息时,可以设置延迟时间:
java复制代码import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import java.util.HashMap;
import java.util.Map;@RestController
public class DelayedProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendDelayed")public String sendDelayed(@RequestParam String message, @RequestParam int delay) {Map<String, Object> headers = new HashMap<>();headers.put("x-delay", delay);rabbitTemplate.convertAndSend("delayed_exchange", "delayed_routing_key", message, msg -> {msg.getMessageProperties().getHeaders().putAll(headers);return msg;});return "Delayed message sent with delay: " + delay;}
}
4、消费者监听延迟队列
与TTL+DLX方法相同,消费者直接监听队列接收消息:
java复制代码import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class DelayedConsumer {@RabbitListener(queues = "delayed_queue")public void receiveDelayedMessage(String message) {System.out.println("Received delayed message: " + message);}
}
3、死信队列: basic.reject或basic.nack
死信队列有3种情况: 这里就举常见的手动ack的情况拒绝消息实现死信队列
要在Spring Boot中使用RabbitMQ实现死信队列(Dead Letter Queue,DLQ),并处理消息被消费者拒绝的情况(通过basic.reject
或basic.nack
并且requeue=false
),可以按照以下步骤来实现。
1. 引入依赖
首先,在pom.xml
中引入Spring Boot和RabbitMQ的依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. 配置交换机、队列和死信队列
接下来,在Spring Boot的配置类中配置一个普通队列、一个死信队列、一个普通交换机和一个死信交换机。
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {// 普通交换机@Beanpublic DirectExchange normalExchange() {return new DirectExchange("normal_exchange");}// 死信交换机@Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange("dead_letter_exchange");}// 普通队列并绑定到普通交换机@Beanpublic Queue normalQueue() {return QueueBuilder.durable("normal_queue").withArgument("x-dead-letter-exchange", "dead_letter_exchange") // 设置死信交换机.withArgument("x-dead-letter-routing-key", "dead_letter_routing_key") // 设置死信RoutingKey.build();}@Beanpublic Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("normal_routing_key");}// 死信队列并绑定到死信交换机@Beanpublic Queue deadLetterQueue() {return new Queue("dead_letter_queue");}@Beanpublic Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("dead_letter_routing_key");}
}
3. 生产者发送消息
在生产者中发送消息到普通队列:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/send")public String send(@RequestParam String message) {rabbitTemplate.convertAndSend("normal_exchange", "normal_routing_key", message);return "Message sent: " + message;}
}
4. 消费者监听并拒绝消息
注意这里的前提是要开启手动ack:
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestlistener:simple:acknowledge-mode: manual # 手动ack
消费者监听普通队列并有条件地拒绝消息,将消息转发到死信队列:
- 当发送的消息内容为
"reject"
时,该消息会被拒绝并转发到死信队列。 - 当发送其他内容的消息时,消息会被正常消费。
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;@Component
public class Consumer {@RabbitListener(queues = "normal_queue")public void receiveMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {try {String msg = new String(message.getBody());System.out.println("Received message: " + msg);// 根据某些条件判断是否拒绝消息if ("reject".equals(msg)) {// 拒绝消息,并且不重新入队(requeue=false)channel.basicReject(tag, false);System.out.println("Message rejected: " + msg);} else {// 消费成功,确认消息channel.basicAck(tag, false);}} catch (Exception e) {// 异常情况也可以使用basicNack将消息拒绝,并且不重新入队channel.basicNack(tag, false, false);}}
}
5. 消费者监听死信队列
最后,消费者监听死信队列,处理被拒绝的消息:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class DeadLetterConsumer {@RabbitListener(queues = "dead_letter_queue")public void receiveDeadLetterMessage(String message) {System.out.println("Received dead letter message: " + message);}
}
总结
- 配置普通队列和死信队列,并通过设置
x-dead-letter-exchange
和x-dead-letter-routing-key
来实现消息被拒绝后的处理。 - 消费者可以根据业务逻辑通过
basic.reject
或basic.nack
拒绝消息,并指定不重新入队(requeue=false
),从而将消息转发到死信队列。 - 死信队列中的消息可以被另一个消费者监听和处理。
相关文章:

深入RabbitMQ世界:探索3种队列、4种交换机、7大工作模式及常见概念
文章目录 文章导图RabbitMQ架构及相关概念四大核心概念名词解读 七大工作模式及四大交换机类型0、前置了解-默认交换机DirectExchange1、简单模式(Simple Queue)-默认DirectExchange2、 工作队列模式(Work Queues)-默认DirectExchange3、发布/订阅模式(Publish/Subscribe)-Fano…...

将目标检测模型导出到C++|RT-DETR、YOLO-NAS、YOLOv10、YOLOv9、YOLOv8
点击下方卡片,关注“小白玩转Python”公众号 最近,出现了更新的YOLO模型,还有RT-DETR模型,这是一个声称能击败YOLO模型的变换器模型,我想将这些模型导出并进行比较,并将它们添加到我的库中。在这篇文章中&a…...

【Windows】解决新版 Edge 浏览器开机自启问题(简单有效)
文章目录 1.前言2.查找资料3.查找方法4.解决办法1.点击浏览器的三个...,然后点击设置2.选择【开始、主页和新建标签页】选项卡,然后关闭【Windows设备启动时】 结语 参考文章: 解决新版 Edge 浏览器开机自启问题(简单有效…...

如何给3D人物换衣服CC4
1.导入人物 2.设置人物Apose 3.导入衣服 create -> accessory 选择fbx文件 设置衣服的大小和位置。 4.绑定衣服 设置衣服的权重 添加动作就可以看效果了。...

如何对列表、字符串进行分组
如何对列表、字符串进行分组 1、效果 2、代码 使用python自带库collections中的Counter函数即可实现 代码如下: # -*- coding: utf-8 -*-""" @contact: @file: test.py @time: 2024/9/8 11:18 @author: LDC """ from collections import Co…...

【GEE代码实例教程详解:NDVI时间序列趋势分析】
GEE(Google Earth Engine)是一个强大的云计算平台,用于处理和分析大规模地球科学数据集。以下是一个关于如何使用GEE进行NDVI(归一化植被指数)时间序列趋势分析的详细教程。 一、引言 NDVI时间序列趋势分析是一种统计…...

51单片机-DS1302(RTC实时时钟芯片)
数据手册在主页资源免费贡献 开发板芯片数据手册 https://www.alipan.com/s/nnkdHhMGjrz 提取码: 95ik 点击链接保存,...

FreeRTOS学习笔记—②RTOS的认识及任务管理篇
由于正在学习韦东山老师的RTOS课程,结合了网上的一些资料,整理记录了下自己的感悟,用于以后自己的回顾。如有不对的地方请各位大佬纠正。 文章目录 一、RTOS的优势二、RTOS的核心功能2.1 任务管理2.1.1 任务的创建2.1.2 任务的删除*2.1.3 任…...

【C++从练气到飞升】22---C++中的异常
🎈个人主页:库库的里昂 ✨收录专栏:C从练气到飞升 🎉鸟欲高飞先振翅,人求上进先读书🎉 目录 ⛳️推荐 一、C语言传统的处理错误的方式 二、C异常 三、异常的使用 3.1 异常的抛出和捕获 3.1.1 异常的抛…...

前端:HTML、CSS、JS、Vue
1 前端 内容概要 了解前端三件套(HTML、CSS、JS)在前端所起的作用掌握HTML标签的功能,掌握重要标签(a标签,form标签)了解CSS了解JS的基础语法掌握Vue的基础语法重点掌握Vue项目怎么启动项目掌握前后端分离是什么。前端做什么事情,后端做什么…...

RocksDB简介
一、RocksDB是什么 常见的数据库如 Redis Mysql Mongo 可以单独提供网络服务RocksDB提供存储服务,是一个嵌入式KV存储引擎 Rocksdb没有server code,用户需要自己实现server的部分来得到c-s架构的数据库。二、RocksDB的诞生 基于flash存储和ssd普及,网络latency在query worklo…...

[VC] Visual Studio中读写权限冲突
前置场景: 编译没有报错,但是运行提示 内存异常: 情景1: 如下代码运行异常,提示引发了异常:写入权限冲突。*** 是 0xFFFFF..... char* str (char*)malloc(10);str[0] 0x30; 解决方案:要包含头…...

ChatGPT3.5/4.0新手使用手册,国内中文版使用教程
引言 欢迎使用ChatGPT!无论你是刚开始接触AI聊天机器人,还是已经有了一些使用经验,这篇新手使用手册将帮助你快速上手,并且从ChatGPT中获得最优的体验。本文主要聚焦于提示词(Prompt)的使用教学࿰…...

基于MicroPython的ESP8266与超声波传感器设计方案
基于MicroPython的ESP8266与超声波传感器的设计方案: 一、硬件准备 1. ESP8266 开发板(如NodeMCU) 2. 超声波传感器(如HC-SR04) 3. 杜邦线若干 二、硬件连接 1. 将超声波传感器的VCC引脚和ESP8266 的3.3V引脚,分别连接5V和3.3V电…...

仿华为车机UI--图标从Workspace拖动到Hotseat同时保留图标在原来位置
基于Android13 Launcher3,原生系统如果把图标从Workspace拖动到Hotseat里则Workspace就没有了,需求是执行拖拽动作后,图标同时保留在原位置。 实现效果如下: 实现思路: 1.如果在workspace中拖动,则保留原来“改变图标…...

C++ 中的 override 和 overload的区别
目录 1.Overload(重载) 2.override(重写) 3.override 和 overload 的根本区别 4.override 和 overload 的实际应用 5.override 和 overload 的常见误区 6.总结 1.Overload(重载) 定义:在同一个作用域内,可以声明几个功能类似的函数名相同的函数&am…...

spring boot3框架@Validated失效
项目中使用的springboot3.2.1,在使用Validated校验controller里参数时始终不生效;在网上查了相关资料,添加了spring-boot-starter-validation依赖但还是不行 经过层层调试,终于发现问题; springboot3添加Validated后校验的是 ja…...

UE5引擎工具链知识点
当我们提到“引擎工具链的开发”时,通常指的是为游戏开发或其他类型的软件开发创建一系列工具和技术栈的过程。这包括但不限于游戏引擎本身(如Unity或Unreal Engine),以及围绕这些引擎构建的各种工具和服务,比如用于构…...

Python的图像算术与逻辑运算详解
一.图像加法运算 图像加法运算主要有两种方法。第一种是调用Numpy库实现,目标图像像素为两张图像的像素之和;第二种是通过OpenCV调用add()函数实现。第二种方法的函数原型如下: dst add(src1, src2[, dst[, mask[, dtype]]]) – src1表示第…...

WSL 下的 CentOS 装 Docker
WSL 下的 CentOS 装 Docker 卸载旧版本安装前的准备工作1. 安装 yum-utils2. 添加阿里云的 yum 镜像仓库3. 快速生成 Yum 缓存 安装Docker启动docker运行 hello-world卸载 Docker 引擎参考资料 卸载旧版本 sudo yum remove docker \ docker-client \ docker-client-latest \ d…...

v0.dev快速开发
探索v0.dev:次世代开发者之利器 今之技艺日新月异,开发者之工具亦随之进步不辍。v0.dev者,新兴之开发者利器也,迅速引起众多开发者之瞩目。本文将引汝探究v0.dev之基本功能与优势,助汝速速上手,提升开发之…...

python之字符串
创建字符串 s "Hello, World!"常用字符串操作 获取字符串长度 length len(s) print(length) # 输出: 13字符串拼接 s1 "Hello" s2 "World" s3 s1 ", " s2 "!" print(s3) # 输出: Hello, World!重复字符串 s …...

算法打卡 Day28(回溯算法)-组合总数 + 组合总数 Ⅱ+ 电话号码的字母组合
文章目录 Leetcode 17-电话号码的字母组合题目描述解题思路 Leetcode 39-组合总数题目描述解题思路 Leetcode 216-组合总数 Ⅲ题目描述解题思路 Leetcode 17-电话号码的字母组合 题目描述 https://leetcode.cn/problems/letter-combinations-of-a-phone-number/description/ …...

【Hadoop|MapReduce篇】MapReduce概述
1. MapReduce定义 MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。 MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。 2. Map…...

设置Virtualbox虚拟机共享文件夹
由于工作环境的原因,选择Virtualbox的方式安装虚拟操作系统,常用的操作系统为ubuntu,不知道道友是否也曾遇到这样的问题,就是虚拟机和主机进行文件拖拽的时候,会因为手抖造成拖拽失败,虚拟机界面显示大个的…...

从零开始的机器学习之旅
尊敬的读者们,在这个快速发展的数字时代,机器学习无疑已经成为了科技领域的一颗璀璨明星。它如同一把打开未来之门的钥匙,让我们能够窥探到数据背后的无限可能。今天,我将带领大家开启一段从零开始的机器学习之旅,让我…...

开源还是封闭?人工智能的两难选择
这篇文章于 2024 年 7 月 29 日首次出现在 The New Stack 上。人工智能正处于软件行业的完美风暴中,现在马克扎克伯格 (Mark Zuckerberg) 正在呼吁开源 AI。 关于如何控制 AI 的三个强大观点正在发生碰撞: 1 . 所有 AI 都应该是开…...

Prometheus 服务监控
官网:https://prometheus.io Prometheus 是什么 Prometheus 是一个开源的系统监控和报警工具,专注于记录和存储时间序列数据(time-series data)。它最初由 SoundCloud 开发,并已成为 CNCF(云原生计算基金会…...

建模杂谈系列252 规则的串行改并行
说明 提到规则,还是需要看一眼RETE算法: Rete算法是一种用于高效处理基于规则的系统中的模式匹配问题的算法,广泛应用于专家系统、推理引擎和生产系统。它的设计目的是在大量规则和数据的组合中快速找到满足特定规则条件的模式。 Rete算法…...

0.ffmpeg面向对象oopc
因为查rtsp相关问题,接触了下ffmpeg源码,发现它和linux内核一样,虽然都是c写的,但是都是面向对象的思想,c的面向对象称之为oopc。 这让我想起来一件好玩的事,有些搞linux内核驱动的只会c的开发人员不知道l…...