【RabbitMQ】 RabbitMQ高级特性(二)
文章目录
- 一、重试机制
- 1.1、重试配置
- 1.2、配置交换机&队列
- 1.3、发送消息
- 1.4、消费消息
- 1.5、运行程序
- 1.6、 手动确认
- 二、TTL
- 2.1、设置消息的TTL
- 2.2、设置队列的TTL
- 2.3、两者区别
- 三 、死信队列
- 6.1 死信的概念
- 3.2 代码示例
- 3.2.1、声明队列和交换机
- 3.2.2、正常队列绑定死信交换机
- 3.2.3 制造死信产生的条件
- 3.2.4、发送消息
- 3.2.5、测试死信
- 3.3、常见面试题
- 四、延迟队列
- 4.1、概念
- 4.2、应用场景
- 4.3、TTL+死信队列实现
- 4.4、常见面试题
- 五、事务
- 5.1、配置事务管理器
- 5.2、声明队列
- 5.3、生产者
- 结语
本文延续上文RabbitMQ高级特性(一)为大家继续讲解RabbitMQ其他高级特性
一、重试机制
在消息传递过程中, 可能会遇到各种问题, 如网络故障, 服务不可用, 资源不足等, 这些问题可能导致消息处理失败. 为了解决这些问题, RabbitMQ 提供了重试机制, 允许消息在处理失败后重新发送. 但如果是程序逻辑引起的错误, 那么多次重试也是没有⽤的, 可以设置重试次数
1.1、重试配置
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/jiaohuanlistener:simple:acknowledge-mode: auto #消息接收确认retry:enabled: true # 开启消费者失败重试initial-interval: 5000ms # 初始失败等待时⻓为5秒max-attempts: 5 # 最⼤重试次数(包括⾃⾝消费的⼀次)
1.2、配置交换机&队列
//重试机制public static final String RETRY_QUEUE = "retry_queue";public static final String RETRY_EXCHANGE_NAME = "retry_exchange";//重试机制 发布订阅模式//1. 交换机@Bean("retryExchange")public Exchange retryExchange() {return ExchangeBuilder.fanoutExchange(Constant.RETRY_EXCHANGE_NAME).durable(true).build();}//2. 队列@Bean("retryQueue")public Queue retryQueue() {return QueueBuilder.durable(Constant.RETRY_QUEUE).build();}//3. 队列和交换机绑定 Binding@Bean("retryBinding")public Binding retryBinding(@Qualifier("retryExchange") FanoutExchangeexchange, @Qualifier("retryQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange);}
1.3、发送消息
@RequestMapping("/retry")
public String retry(){rabbitTemplate.convertAndSend(Constant.RETRY_EXCHANGE_NAME, "", "retry test...");return "发送成功!"; }
1.4、消费消息
@Component
public class RetryQueueListener {//指定监听队列的名称@RabbitListener(queues = Constant.RETRY_QUEUE)public void ListenerQueue(Message message) throws Exception {System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),message.getMessageProperties().getDeliveryTag());//模拟处理失败int num = 3 / 0;System.out.println("处理完成");}
}
1.5、运行程序
我们可以观察到结果
接收到消息: retry test..., deliveryTag: 1
接收到消息: retry test..., deliveryTag: 1
接收到消息: retry test..., deliveryTag: 1
接收到消息: retry test..., deliveryTag: 1
接收到消息: retry test..., deliveryTag: 1
o.s.a.r.r.RejectAndDontRequeueRecoverer : Retries exhausted for message (Body:'consumer ack test...' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=ack_exchange, receivedRoutingKey=ack, deliveryTag=1, consumerTag=amq.ctag-vYckQBt9_0-5v2oG9oBnFw, consumerQueue=ack_queue])
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method 'public void com.jiaohuan.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.amqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'threw exception
如果对异常进行捕获, 那么就不会进行重试 代码修改如下:
System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(),"UTF-8"),
message.getMessageProperties().getDeliveryTag());
//模拟处理失败
try {int num = 3/0;System.out.println("处理完成");
}catch (Exception e){System.out.println("处理失败");
}
重新运行程序, 结果如下:
接收到消息: consumer ack test..., deliveryTag: 1
处理失败
1.6、 手动确认
改为手动确认
@RabbitListener(queues = Constant.RETRY_QUEUE)public void ListenerQueue(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),message.getMessageProperties().getDeliveryTag());//模拟处理失败int num = 3 / 0;System.out.println("处理完成");//3. ⼿动签收channel.basicAck(deliveryTag, true);} catch (Exception e) {//4. 异常了就拒绝签收Thread.sleep(1000);//第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送,,若为false, 则直接丢弃channel.basicNack(deliveryTag, true, true);}}
运⾏结果:
接收到消息: retry test..., deliveryTag: 1
接收到消息: retry test..., deliveryTag: 2
接收到消息: retry test..., deliveryTag: 3
接收到消息: retry test..., deliveryTag: 4
接收到消息: retry test..., deliveryTag: 5
接收到消息: retry test..., deliveryTag: 6
接收到消息: retry test..., deliveryTag: 7
接收到消息: retry test..., deliveryTag: 8
接收到消息: retry test..., deliveryTag: 9
接收到消息: retry test..., deliveryTag: 10
接收到消息: retry test..., deliveryTag: 11
可以看到, 手动确认模式时, 重试次数的限制不会像在自动确认模式下那样直接生效, 因为是否重试以及何时重试更多地取决于应⽤程序的逻辑和消费者的实现. ⾃动确认模式下, RabbitMQ 会在消息被投递给消费者后自动确认消息. 如果消费者处理消息时抛出异 常, RabbitMQ 根据配置的重试参数自动将消息重新⼊队, 从而实现重试. 重试次数和重试间隔等参数可以直接在RabbitMQ的配置中设定,并且RabbitMQ会负责执行这些重试策略.
⼿动确认模式下, 消费者需要显式地对消息进行确认. 如果消费者在处理消息时遇到异常, 可以选择不确认消息使消息可以重新⼊队. 重试的控制权在于应用程序本身, 而不是RabbitMQ的内部机制. 应用程序 可以通过自己的逻辑和利用RabbitMQ的⾼级特性来实现有效的重试策略。
使⽤重试机制时需要注意:
1 . ⾃动确认模式下: 程序逻辑异常, 多次重试还是失败, 消息就会被自动确认, 那么消息就丢失 了
2 . ⼿动确认模式下: 程序逻辑异常, 多次重试消息依然处理失败, 无法被确认, 就⼀直是unacked的状态, 导致消息积压
二、TTL
TTL(Time to Live, 过期时间), 即过期时间. RabbitMQ可以对消息和队列设置TTL.当消息到达存活时间之后, 还没有被消费, 就会被⾃动清除。
咱们在⽹上购物, 经常会遇到⼀个场景, 当下单超过24⼩时还未付款, 订单会被⾃动取消 还有类似的, 申请退款之后, 超过7天未被处理, 则⾃动退款
2.1、设置消息的TTL
目前有两种方法可以设置消息的TTL.
⼀是设置队列的TTL, 队列中所有消息都有相同的过期时间. ⼆是对消息本身进行单独设置, 每条消息的TTL可以不同. 如果两种方法⼀起使用, 则消息的TTL以两者之间较小的那个数值为准. 先看针对每条消息设置TTL。针对每条消息设置TTL的方法是在发送消息的方法中加入expiration的属性参数,单位为毫秒.
配置交换机&队列:
//TTLpublic static final String TTL_QUEUE = "ttl_queue";public static final String TTL_EXCHANGE_NAME = "ttl_exchange";//ttl//1. 交换机@Bean("ttlExchange")public Exchange ttlExchange() {return ExchangeBuilder.fanoutExchange(Constant.TTL_EXCHANGE_NAME).durable(true).build();}//2. 队列@Bean("ttlQueue")public Queue ttlQueue() {return QueueBuilder.durable(Constant.TTL_QUEUE).build();}//3. 队列和交换机绑定 Binding@Bean("ttlBinding")public Binding ttlBinding(@Qualifier("ttlExchange") FanoutExchange exchange,@Qualifier("ttlQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange);}
发送消息:
@RequestMapping("/ttl")public String ttl() {String ttlTime = "10000";//10srabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE_NAME, "", "ttl test...", messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration(ttlTime);return messagePostProcessor;});return"发送成功!";}
观看结果:
发送消息后, 可以看到, Ready消息为1:
10秒钟之后, 刷新页面, 发现消息已被删除:
如果不设置TTL,则表⽰此消息不会过期;如果将TTL设置为0,则表示除非此时可以直接将消息投递到 消费者,否则该消息会被立即丢弃
2.2、设置队列的TTL
设置队列TTL的方法是在创建队列时, 加⼊ x-message-ttl 参数实现的, 单位是毫秒。
配置队列和绑定关系:
public static final String TTL_QUEUE2 = "ttl_queue2";//设置ttl@Bean("ttlQueue2")public Queue ttlQueue2() {//设置20秒过期return QueueBuilder.durable(Constant.TTL_QUEUE2).ttl(20 * 1000).build();}//3. 队列和交换机绑定 Binding@Bean("ttlBinding2")public Binding ttlBinding2(@Qualifier("ttlExchange") FanoutExchange exchange,@Qualifier("ttlQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange);}
设置过期时间, 也可以采⽤以下方式:
@Bean("ttlQueue2")public Queue ttlQueue2() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-message-ttl", 20000);//20秒过期return QueueBuilder.durable(Constant.TTL_QUEUE2).withArguments(arguments).build();}
发送消息:
@RequestMapping("/ttl")public String ttl() {// String ttlTime = "30000";//10s// //发送带ttl的消息// rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE_NAME, "", "ttl test...", messagePostProcessor -> {// messagePostProcessor.getMessageProperties().setExpiration(ttlTime);// return messagePostProcessor;//});//发送不带ttl的消息rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE_NAME, "", "ttl test...");return "发送成功!";}
运行程序:
运行之后发现,新增了⼀个队列, 队列Features有⼀个TTL标识:
调用接口, 发送消息:
发送消息后, 可以看到, Ready消息为1:
采⽤发布订阅模式, 所有与该交换机绑定的队列(ttl_queue和ttl_queue2)都会收到消息
20秒钟之后, 刷新页面, 发现消息已被删除
由于ttl_queue队列, 未设置过期时间, 所以ttl_queue的消息未删除。
2.3、两者区别
设置队列TTL属性的方法, ⼀旦消息过期, 就会从队列中删除 设置消息TTL的方法, 即使消息过期, 也不会马上从队列中删除, 而是在即将投递到消费者之前进行判定的.
为什么这两种方法处理的方式不⼀样?
因为设置队列过期时间, 队列中已过期的消息肯定在队列头部, RabbitMQ只要定期从队头开始扫描是否 有过期的消息即可. ⽽设置消息TTL的方式, 每条消息的过期时间不同, 如果要删除所有过期消息需要扫描整个队列, 所以不 如等到此消息即将被消费时再判定是否过期, 如果过期再进⾏删除即可.
三 、死信队列
6.1 死信的概念
死信(dead message) 简单理解就是因为种种原因, ⽆法被消费的信息, 就是死信. 有死信, ⾃然就有死信队列. 当消息在⼀个队列中变成死信之后,它能被重新被发送到另⼀个交换器 中,这个交换器就是DLX( Dead Letter Exchange ), 绑定DLX的队列, 就称为死信队列(Dead Letter Queue,简称DLQ)
消息变成死信⼀般是由于以下几种情况:
- 消息被拒绝( Basic.Reject/Basic.Nack ),并且设置 requeue 参数为 false.
- 消息过期.
- 队列达到最大长度
3.2 代码示例
3.2.1、声明队列和交换机
包含两部分:
• 声明正常的队列和正常的交换机
• 声明死信队列和死信交换机
死信交换机和死信队列和普通的交换机, 队列没有区别
//死信队列
public static final String DLX_EXCHANGE_NAME = "dlx_exchange";
public static final String DLX_QUEUE = "dlx_queue";
public static final String NORMAL_EXCHANGE_NAME = "normal_exchange";
public static final String NORMAL_QUEUE = "normal_queue";import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import rabbitmq.Constant;/*** 死信队列相关配置*/
@Configuration
public class DLXConfig {//死信交换机@Bean("dlxExchange")public Exchange dlxExchange() {returnExchangeBuilder.topicExchange(Constant.DLX_EXCHANGE_NAME).durable(true).build();}//2. 死信队列@Bean("dlxQueue")public Queue dlxQueue() {return QueueBuilder.durable(Constant.DLX_QUEUE).build();}//3. 死信队列和交换机绑定 Binding@Bean("dlxBinding")public Binding dlxBinding(@Qualifier("dlxExchange") Exchange exchange,@Qualifier("dlxQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();}//正常交换机@Bean("normalExchange")public Exchange normalExchange() {return ExchangeBuilder.topicExchange(Constant.NORMAL_EXCHANGE_NAME).durable(true).build();}//正常队列@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constant.NORMAL_QUEUE).build();}//正常队列和交换机绑定 Binding@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalExchange") Exchangeexchange, @Qualifier("normalQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();}
}
3.2.2、正常队列绑定死信交换机
当这个队列中存在死信时, RabbitMQ会自动的把这个消息重新发布到设置的DLX上, 进而被路由到另一个队列, 即死信队列.可以监听这个死信队列中的消息以进⾏相应的处理
@Bean("normalQueue")public Queue normalQueue() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", Constant.DLX_EXCHANGE_NAME);//绑定死 信队列arguments.put("x-dead-letter-routing-key", "dlx");//设置发送给死信队列的RoutingKeyreturn QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(arguments).build();}
3.2.3 制造死信产生的条件
@Bean("normalQueue")public Queue normalQueue() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", Constant.DLX_EXCHANGE_NAME);//绑定死 信队列arguments.put("x-dead-letter-routing-key", "dlx");//设置发送给死信队列的RoutingKey//制造死信产⽣的条件arguments.put("x-message-ttl", 10000);//10秒过期arguments.put("x-max-length", 10);//队列⻓度return QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(arguments).build();}
3.2.4、发送消息
@RequestMapping("/dlx")public void dlx() {//测试过期时间, 当时间达到TTL, 消息⾃动进⼊到死信队列rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");//测试队列⻓度// for (int i = 0; i < 20; i++) {// rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");// }//测试消息拒收// rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");}
3.2.5、测试死信
程序启动之后, 观察队列:
队列Features说明:
D: durable的缩写, 设置持久化
TTL: Time to Live, 队列设置了TTL
Lim: 队列设置了长度(x-max-length)
DLX: 队列设置了死信交换机(x-dead-letter-exchange)
DLK: 队列设置了死信RoutingKey(x-dead-letter-routing-key)
- 测试过期时间, 到达过期时间之后, 进⼊死信队列
发送之后:
10秒后, 消息进入到死信队列:
生产者首先发送⼀条消息,然后经过交换器(normal_exchange)顺利地存储到队列(normal_queue)中. 由于队列normal_queue设置了过期时间为10s, 在这10s内没有消费者消费这条消息, 那么判定这条消息过期. 由于设置了DLX, 过期之时, 消息会被丢给交换器(dlx_exchange)中, 这时根据RoutingKey匹配, 找到匹配的队列(dlx_queue), 最后消息被存储在queue.dlx这个死信队列中.
- 测试达到队列长度, 消息进入死信队列
队列⻓度设置为10, 我们发送20条数据, 会有10条数据直接进⼊到死信队列 发送前, 死信队列只有⼀条数据
发送20条消息:
//测试队列⻓度
for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");
}
运⾏后, 可以看到死信队列变成了11条
过期之后, 正常队列的10条也会进入到死信队列
3.3、常见面试题
死信队列作为RabbitMQ的高级特性,也是面试的一大重点。
- 死信队列的概念 死信(Dead Letter)是消息队列中的⼀种特殊消息, 它指的是那些无法被正常消费或处理的消息. 在消息队列系统中, 如RabbitMQ, 死信队列用于存储这些死信消息
- 死信的来源
1)消息过期: 消息在队列中存活的时间超过了设定的TTL
2)消息被拒绝: 消费者在处理消息时, 可能因为消息内容错误, 处理逻辑异常等原因拒绝处理该消息. 如果拒绝时指定不重新入队(requeue=false), 消息也会成为死信.
3)队列满了: 当队列达到最大长度, 无法再容纳新的消息时, 新来的消息会被处理为死信. - 死信队列的应用场景 对于RabbitMQ来说, 死信队列是⼀个非常有用的特性. 它可以处理异常情况下,消息不能够被消费者正 确消费而被置⼊死信队列中的情况, 应用程序可以通过消费这个死信队列中的内容来分析当时所遇到的 异常情况, 进而可以改善和优化系统. 比如: 用户支付订单之后, 支付系统会给订单系统返回当前订单的⽀付状态。为了保证支付信息不丢失, 需要使用到死信队列机制. 当消息消费异常时, 将消息投入到死信队列中, 由订单系统的其他消费者来监听这个队列, 并对数据进行处理(比如发送工单等,进行人工确认).
场景的应用场景还有:
• 消息重试:将死信消息重新发送到原队列或另⼀个队列进行重试处理.
• 消息丢弃:直接丢弃这些无法处理的消息,以避免它们占⽤系统资源.
• ⽇志收集:将死信消息作为日志收集起来,用于后续分析和问题定位.
四、延迟队列
4.1、概念
延迟队列(Delayed Queue),即消息被发送以后, 并不想让消费者⽴刻拿到消息, 而是等待特定时间后, 消费者才能拿到这个消息进行消费.
4.2、应用场景
延迟队列的使用场景有很多, 比如:
- 智能家居: 用户希望通过手机远程遥控家⾥的智能设备在指定的时间进行⼯作. 这时候就可以将用户指令发送到延迟队列, 当指令设定的时间到了再将指令推送到智能设备.
- ⽇常管理: 预定会议后,需要在会议开始前十五分钟提醒参会⼈参加会议
- ⽤⼾注册成功后, 7天后发送短信, 提高用户活跃度等
- …
RabbitMQ本身没有直接支持延迟队列的的功能, 但是可以通过前面所介绍的TTL+死信队列的⽅式组合模拟出延迟队列的功能. 假设⼀个应用中需要将每条消息都设置为10秒的延迟, 生产者通过 normal_exchange 这个交换器将 发送的消息存储在 normal_queue 这个队列中. 消费者订阅的并非是 normal_queue 这个队列, 而是 dlx_queue 这个队列. 当消息从normal_queue 这个队列中过期之后被存入 dlx_queue 这个 队列中,消费者就恰巧消费到了延迟10秒的这条消息.
所以死信队列展⽰的也是延迟队列的使用.
4.3、TTL+死信队列实现
代码实现:
先看TTL+死信队列实现延迟队列
继续沿用死信队列的代码即可
声明队列
//正常队列@Bean("normalQueue")public Queue normalQueue() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-dead-letter-exchange", Constant.DLX_EXCHANGE_NAME);//绑定死 信队列arguments.put("x-dead-letter-routing-key", "dlx");//设置发送给死信队列的RoutingKeyreturn QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(arguments).build();}
⽣产者:
发送两条消息, ⼀条消息10s后过期, 第二条20s后过期
@RequestMapping("/delay")public String delay() {//发送带ttl的消息rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal","ttl test 10s..." + new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration("10000");//10s过期return messagePostProcessor;});rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal","ttl test 20s..." + new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration("20000");//20s过期return messagePostProcessor;});return "发送成功!";}
消费者:
//指定监听队列的名称
@RabbitListener(queues = Constant.DLX_QUEUE)
public void ListenerDLXQueue(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("%tc 死信队列接收到消息: %s, deliveryTag: %d%n", newDate(),newString(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());
}
运行程序:
通过控制台观察死信队列消费情况:
死信队列接收到消息: ttl test 10s…Wed May 22 11:58:50 CST , deliveryTag: 1
死信队列接收到消息: ttl test 20s…Wed May 22 11:58:50 CST , deliveryTag: 2
可以看到, 两条消息按照过期时间依次进入了死信队列. 延迟队列, 就是希望等待特定的时间之后, 消费者才能拿到这个消息. TTL刚好可以让消息延迟⼀段时间 成为死信, 成为死信的消息会被投递到死信队列⾥, 这样消费者⼀直消费死信队列里的消息就可以了.
存在问题
接下来把⽣产消息的顺序修改⼀下 先发送20s过期数据, 再发送10s过期数据
@RequestMapping("/delay")public String delay() {//发送带ttl的消息rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal","ttl test 20s..." + new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration("20000");//20s过期return messagePostProcessor;});rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal","ttl test 10s..." + new Date(), messagePostProcessor -> {messagePostProcessor.getMessageProperties().setExpiration("10000");//10s过期return messagePostProcessor;});return "发送成功!";}
这时会发现: 10s过期的消息,,也是在20s后才进入到死信队列.
消息过期之后, 不⼀定会被马上丢弃. 因为RabbitMQ只会检查队首消息是否过期, 如果过期则丢到死信队列. 此时就会造成⼀个问题, 如果第⼀个消息的延时时间很长, 第二个消息的延时时间很短, 那第二个 消息并不会优先得到执行.
所以在考虑使用TTL+死信队列实现延迟任务队列的时候, 需要确认业务上每个任务的延迟时间是⼀致 的, 如果遇到不同的任务类型需要不同的延迟的话, 需要为每⼀种不同延迟时间的消息建⽴单独的消息队列。
另外注意,同样可以使用插件使得消息按照延迟时间到达消费者
4.4、常见面试题
延迟队列作为RabbitMQ的高级特性,也是面试的一大重点. 介绍下RabbitMQ的延迟队列。延迟队列是⼀个特殊的队列, 消息发送之后, 并不立即给消费者, 而是等待特定的时间, 才发送给消费者. 延迟队列的应用场景有很多, 比如:
- 订单在十分钟内未支付自动取消
- 用户注册成功后, 3天后发调查问卷
- 用户发起退款, 24小时后商家未处理, 则默认同意, 自动退款
- …
但RabbitMQ本身并没直接实现延迟队列, 通常有两种方法:
1 . TTL+死信队列组合的方式
2 . 使用官方提供的延迟插件实现延迟功能
⼆者对比:
- 基于死信实现的延迟队列
a. 优点: 1) 灵活不需要额外的插件支持
b. 缺点: 1) 存在消息顺序问题 2) 需要额外的逻辑来处理死信队列的消息, 增加了系统的复杂性 - 基于插件实现的延迟队列
a. 优点: 1) 通过插件可以直接创建延迟队列, 简化延迟消息的实现. 2) 避免了DLX的时序问题
b. 缺点: 1) 需要依赖特定的插件, 有运维工作 2) 只适用特定版本
五、事务
RabbitMQ是基于AMQP协议实现的, 该协议实现了事务机制, 因此RabbitMQ也支持事务机制. SpringAMQP也提供了对事务相关的操作. RabbitMQ事务允许开发者确保消息的发送和接收是原子性的, 要么全部成功, 要么全部失败.
5.1、配置事务管理器
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TransactionConfig {@Beanpublic RabbitTransactionManagertransactionManager(CachingConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}@Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactoryconnectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;}
}
5.2、声明队列
@Bean("transQueue")public Queue transQueue() {return QueueBuilder.durable("trans_queue").build();}
5.3、生产者
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RequestMapping("/trans")
@RestController
public class TransactionProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@Transactional@RequestMapping("/send")public String send() {rabbitTemplate.convertAndSend("", "trans_queue", "trans test 1...");int a = 5 / 0;rabbitTemplate.convertAndSend("", "trans_queue", "trans test 2...");return "发送成功";}
}
通过测试发现:
- 不加 @Transactional , 会发现消息1发送成功
- 添加 @Transactional , 消息1和消息2全部发送失败
结语
本篇文章主要介绍了RAbbitMQ中的部分高级特性,主要从重试机制、有效时间TTL、死信队列、延迟队列和事务几方面展开。以上就是本文全部内容,感谢各位能够看到最后,如有问题,欢迎各位大佬在评论区指正,希望大家可以有所收获!创作不易,希望大家多多支持!
最后,大家再见!祝好!我们下期见!
相关文章:

【RabbitMQ】 RabbitMQ高级特性(二)
文章目录 一、重试机制1.1、重试配置1.2、配置交换机&队列1.3、发送消息1.4、消费消息1.5、运行程序1.6、 手动确认 二、TTL2.1、设置消息的TTL2.2、设置队列的TTL2.3、两者区别 三 、死信队列6.1 死信的概念3.2 代码示例3.2.1、声明队列和交换机3.2.2、正常队列绑定死信交…...
大数据技术全景解析:HDFS、HBase、MapReduce 与 Chukwa
大数据技术全景解析:HDFS、HBase、MapReduce 与 Chukwa 在当今这个信息爆炸的时代,大数据已经成为企业竞争力的重要组成部分。从电商的用户行为分析到金融的风险控制,从医疗健康的数据挖掘到智能制造的实时监控,大数据技术无处不…...

电子电路:什么是电流离散性特征?
关于电荷的量子化,即电荷的最小单位是电子的电荷量e。在宏观电路中,由于电子数量极大,电流看起来是连续的。但在微观层面,比如纳米器件或单电子晶体管中,单个电子的移动就会引起可观测的离散电流。 还要提到散粒噪声,这是电流离散性的表现之一。当电流非常小时,例如在二…...

深入理解位图(Bit - set):概念、实现与应用
目录 引言 一、位图概念 (一)基本原理 (二)适用场景 二、位图的实现(C 代码示例) 三、位图应用 1. 快速查找某个数据是否在一个集合中 2. 排序 去重 3. 求两个集合的交集、并集等 4. 操作系…...

猫番阅读APP:丰富资源,优质体验,满足你的阅读需求
猫番阅读APP是一款专为书籍爱好者设计的移动阅读应用,致力于提供丰富的阅读体验和多样化的书籍资源。它不仅涵盖了小说、非虚构、杂志等多个领域的电子书,还提供了个性化推荐、书架管理、离线下载等功能,满足不同读者的阅读需求。无论是通勤路…...
Java文件读写程序
1.引言 在日常的软件开发中,文件操作是常见的功能之一。不仅要了解如何读写文件,更要知道如何安全地操作文件以避免程序崩溃或数据丢失。这篇文章将深入分析一个简单的 Java 文件读写程序 Top.java,包括其基本实现、潜在问题以及改进建议&am…...
深入解析Java事件监听机制与应用
Java事件监听机制详解 一、事件监听模型组成 事件源(Event Source) 产生事件的对象(如按钮、文本框等组件) 事件对象(Event Object) 封装事件信息的对象(如ActionEvent包含事件源信息…...

MetaMask安装及使用-使用水龙头获取测试币的坑?
常见的异常有: 1.unable to request drip, please try again later. 2.You must hold at least 1 LINK on Ethereum Mainnet to request native tokens. 3.The address provided does not have sufficient historical activity or balance on the Ethereum Mainne…...

AI:OpenAI论坛分享—《AI重塑未来:技术、经济与战略》
AI:OpenAI论坛分享—《AI重塑未来:技术、经济与战略》 导读:2025年4月24日,OpenAI论坛全面探讨了 AI 的发展趋势、技术范式、地缘政治影响以及对经济和社会的广泛影响。强调了 AI 的通用性、可扩展性和高级推理能力,以…...

Linux配置vimplus
配置vimplus CentOS的配置方案很简单,但是Ubuntu的解决方案网上也很多但是有效的很少,尤其是22和24的解决方案,在此我整理了一下我遇到的问题解决方法 CentOS7 一键配置VimForCPP 基本上不会有什么特别难解决的报错 sudo yum install vims…...

服务端HttpServletRequest、HttpServletResponse、HttpSession
一、概述 在JavaWeb 开发中,获取客户端传递的参数至关重要。http请求是客户端向服务端发起数据传输协议,主要包含包含请求行、请求头、空行和请求体四个部分,在这四部分中分别携带客户端传递到服务端的数据。常见的http请求方式有get、post、…...

实验九视图索引
设计性实验 1. 创建视图V_A包括学号,姓名,性别,课程号,课程名、成绩; 一个语句把学号103 课程号3-105 的姓名改为陆君茹1,性别为女 ,然后查看学生表的信息变化,再把上述数据改为原…...

git 本地提交后修改注释
dos命令行进入目录,idea可以点击Terminal 进入命令行 git commit --amend -m "修改内容"...

面向具身智能的视觉-语言-动作模型(VLA)综述
具身智能被广泛认为是通用人工智能(AGI)的关键要素,因为它涉及控制具身智能体在物理世界中执行任务。在大语言模型和视觉语言模型成功的基础上,一种新的多模态模型——视觉语言动作模型(VLA)已经出现&#…...
Thrust库中的Gather和Scatter操作
Thrust库中的Gather和Scatter操作 Thrust是CUDA提供的一个类似于C STL的并行算法库,其中包含两个重要的数据操作:gather(聚集)和scatter(散开)。 Gather操作 Gather操作从一个源数组中按照指定的索引收集元素到目标数组中。 函数原型: t…...

计算机发展的历程
计算机系统的概述 一, 计算机系统的定义 计算机系统的概念 计算机系统 硬件 软件 硬件的概念 计算机的实体, 如主机, 外设等 计算机系统的物理基础 决定了计算机系统的天花板瓶颈 软件的概念 由具有各类特殊功能的程序组成 决定了把硬件的性能发挥到什么程度 软件的分类…...

深度学习驱动下的目标检测技术:原理、算法与应用创新(三)
五、基于深度学习的目标检测代码实现 5.1 开发环境搭建 开发基于深度学习的目标检测项目,首先需要搭建合适的开发环境,确保所需的工具和库能够正常运行。以下将详细介绍 Python、PyTorch 等关键开发工具和库的安装与配置过程。 Python 是一种广泛应用于…...
Python爬虫实战:研究 RPC 远程调用机制,实现逆向解密
1. 引言 在网络爬虫技术的实际应用中,目标网站通常采用各种加密手段保护其数据传输和业务逻辑。这些加密机制给爬虫开发带来了巨大挑战,传统的爬虫技术往往难以应对复杂的加密算法。逆向解密作为一种应对策略,旨在通过分析和破解目标网站的加密机制,获取原始数据。 然而,…...
[学习] RTKLib详解:qzslex.c、rcvraw.c与solution.c
RTKLib详解:qzslex.c、rcvraw.c与solution.c 本文是 RTKLlib详解 系列文章的一篇,目前该系列文章还在持续总结写作中,以发表的如下,有兴趣的可以翻阅。 [学习] RTKlib详解:功能、工具与源码结构解析 [学习]RTKLib详解…...

jenkins流水线常规配置教程!
Jenkins流水线是在工作中实现CI/CD常用的工具。以下是一些我在工作和学习中总结出来常用的一些流水线配置:变量需要加双引号括起来 "${main}" 一 引用无账号的凭据 使用变量方式引用,这种方式只适合只由密码,没有用户名的凭证。例…...
Java中序列化和反序列化的理解
基本概念 序列化(Serialization)是将对象的状态信息转换为可以存储或传输的形式的过程,而反序列化(Deserialization)则是将这种形式重新转换为对象的过程。 核心作用 持久化存储:将对象状态保存到文件或数据库中 网络传输:在网络间传递对象…...

基于OpenCV的SIFT特征和FLANN匹配器的指纹认证
文章目录 引言一、概述二、代码解析1. 图像显示函数2. 核心认证函数2.1 创建SIFT特征提取器2.2 检测关键点和计算描述符(源图像)2.3 检测关键点和计算描述符(模板图像)2.4 创建FLANN匹配器2.5 使用K近邻匹配 3. 匹配点筛选4. 认证…...
零基础学Java——第十一章:实战项目 - 桌面应用开发(JavaFX入门)
第十一章:实战项目 - 桌面应用开发(JavaFX入门) 欢迎来到我们实战项目的桌面应用开发部分!在前面的章节中,我们可能已经接触了Swing。现在,我们将目光投向JavaFX,一个更现代、功能更丰富的用于…...
Milvus 视角看主流嵌入式模型(Embeddings)
嵌入是一种机器学习概念,用于将数据映射到高维空间,其中语义相似的数据被紧密排列在一起。嵌入模型通常是 BERT 或其他 Transformer 系列的深度神经网络,它能够有效地用一系列数字(称为向量)来表示文本、图像和其他数据…...

leetcode:58. 最后一个单词的长度(python3解法)
难度:简单 给你一个字符串 s,由若干单词组成,单词前后用一些空格字符隔开。返回字符串中 最后一个 单词的长度。 单词 是指仅由字母组成、不包含任何空格字符的最大子字符串。 示例 1: 输入:s "Hello World"…...

虹科应用 | 探索PCAN卡与医疗机器人的革命性结合
随着医疗技术的不断进步,医疗机器人在提高手术精度、减少感染风险以及提升患者护理质量方面发挥着越来越重要的作用。医疗机器人的精确操作依赖于稳定且高效的数据通信系统,虹科提供的PCAN四通道mini PCIe转CAN FD卡,正是为了满足这一需求而设…...

entity线段材质设置
在cesium中,我们可以改变其entity线段材质,这里以直线为例. 首先我们先创建一条直线 const redLine viewer.entities.add({polyline: {positions: Cesium.Cartesian3.fromDegreesArray([-75,35,-125,35,]),width: 5,material:material, 保存后可看到在地图上创建了一条线段…...

[STM32] 5-1 时钟树(上)
文章目录 前言5-1 时钟树(上)时钟树的基本介绍时钟树的基本结构大树和小树频率运算简介计数器和分频STM32内部结构树的结构于关键节点SYSCLK(System Clock) 系统时钟 72M maxHCLK(AHB Clock) AHB时钟 36M maxPLCK(APB1 Clock) APB1时钟 36M maxPLCK2(APB…...

【Linux网络与网络编程】12.NAT技术内网穿透代理服务
1. NAT技术 之前我们说到过 IPv4 协议中IP 地址数量不充足的问题可以使用 NAT 技术来解决。还提到过本地主机向公网中的一个服务器发起了一个网络请求,服务器是怎么将应答返回到该本地主机呢?(如何进行内网转发?) 这就…...
【HTTPS基础概念与原理】TLS握手过程详解
以下是 TLS握手过程的详细拆解,涵盖客户端与服务器之间的关键交互步骤,包括ClientHello、ServerHello、证书验证、密钥交换等核心阶段,并对比TLS 1.2与TLS 1.3的差异: 一、TLS握手的核心目标 协商协议版本:确定双方支…...