RabbitMQ(二)
二、高级特性、应用问题以及集群搭建
高级特性
1.消息的可靠性投递
在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
rabbitMQ整个消息投递的路径为:
producer -> rabbitMQ broker -> exchange -> queue -> consumer
- confirm确认模式
confirm确认模式是再producer传递给exchange过程中控制消息的模式,当消息成功的从producer传递到了exchange,那么则会返回一个 confirmCallBack() 回调函数 - return 退回模式
return退回模式是指消息从exchange传递给queue过程中消息传递失败,则会返回一个returnCallBack() 回调函数
1.1 confirm确认模式的代码编写:
因为确认模式是producer到exchange,所以代码和配置修改应该写在生产者的模块中。
第一步:开启确认模式
新版本的rabbitmq弃用了publish-confirms:true,可以改用
publisher-confirm-type: correlated实现同样的效果
spring:rabbitmq:password: heimausername: heimaport: 5673virtual-host: itcasthost: 1.12.244.105#开启确认模式publisher-confirm-type: correlated
第二步:编写confirmCallBack()函数
回调函数confirm()的返回值在发送消息成功时ack为true,但是我遇到一个问题,就是消息发送成功了,在队列中也能看到,但是返回值ack为false,
clean channel shutdown;
这是因为convertAncSend()方法结束后rabbitMQ的资源也就关闭了,所以就算成功了,回调函数返回值也是false;所以我们在后面强制睡眠200ms,让资源晚点关闭,这样的话得到的ack就是true了
package com.rabbitmq.springboot_mqproducer;import com.rabbitmq.springboot_mqproducer.rabbitMQconfig.MQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
class SpringbootMqProducerApplicationTests {@ResourceRabbitTemplate rabbitTemplate;@Testvoid contextLoads() throws InterruptedException {rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/**** @param correlationData 相关的配置信息* @param b 消息是否发送成功* @param s 消息发送失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("confirm方法被执行了");System.out.println(b);if(b){System.out.println("消息从producer -> exchange成功");System.out.println("失败原因:" + s);}else{System.out.println("消息从producer -> exchange失败");System.out.println("失败原因:" + s);}}});rabbitTemplate.convertAndSend(MQConfig.EXCHANGE_NAME,"test.hello","测试springboot整合交换机");Thread.sleep(200);}
}
结果:
1.2 return回退模式的代码编写
第一步:开启回退模式
spring:rabbitmq:password: heimausername: heimaport: 5673virtual-host: itcasthost: 1.12.244.105#开启确认模式publisher-confirm-type: correlated#开启回退模式publisher-returns: true
第二步:编写returnCallBack()函数
第三步:设置exchange处理消息的模式
①setMandatory为true,如果消息没有到队列queue,则返回消息给发送方
②setMandatory为false,如果消息没有到队列queue,则丢弃消息(默认)
package com.rabbitmq.springboot_mqproducer;import com.rabbitmq.springboot_mqproducer.rabbitMQconfig.MQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.propertyeditors.StringArrayPropertyEditor;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
class SpringbootMqProducerApplicationTests {@ResourceRabbitTemplate rabbitTemplate;@Testvoid contextLoads() throws InterruptedException {//编写confirm回调函数rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/**** @param correlationData 相关的配置信息* @param b 消息是否发送成功* @param s 消息发送失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("confirm方法被执行了");System.out.println(b);if(b){System.out.println("消息从producer -> exchange成功");System.out.println("失败原因:" + s);}else{//消息发送失败,需要做一些处理System.out.println("消息从producer -> exchange失败");System.out.println("失败原因:" + s);}}});//编写return回调函数rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("return回退模式回调函数执行了");System.out.println("消息:"+returnedMessage.getMessage());System.out.println("exchange:"+returnedMessage.getExchange());System.out.println("replyCode:"+returnedMessage.getReplyCode());System.out.println("replyText:"+returnedMessage.getReplyText());System.out.println("routingKey:"+returnedMessage.getRoutingKey());}});//设置回退模式中,exchange处理消息的方式/*当将mandatory设置为false(默认值),如果RabbitMQ无法将消息路由,消息将会被静默丢弃,生产者不会收到通知。当设置mandatory为true时,意味着消息被视为"mandatory",如果在发布消息时RabbitMQ无法将消息路由到任何队列(例如由于没有匹配的队列与指定的路由键),则代理将通过调用ReturnListener回调的returnedMessage方法将消息返回给生产者(发布者)。生产者可以根据需要适当地处理这个返回的消息,例如记录日志或执行某些恢复操作。*/rabbitTemplate.setMandatory(true);//TODO 这里把routingKey写错,是为了让交换机找不到queue,从而触发returnCallBack()函数rabbitTemplate.convertAndSend(MQConfig.EXCHANGE_NAME,"testtttt.hello","测试springboot整合交换机");Thread.sleep(200);}}
消息的可靠投递小结:
- 设置配置publisher-confirm-type: correlated开启确认模式
- 使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true, 则发送成功,如果为false,则发送失败,需要处理。
- 设置ConnectionFactory的publisher-returns="true"开肩退回模式。
- 使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。
- 在RabbitMQ中也提供了事务机制,但是性能较差,此处不做讲解。
使用channel下列方法,完成事务控制:
txSelect(),用于将当前channel设置成transaction模式
txCommit(),用于提交事务
txRollback(),用于回滚事务
2.Consumer Ack
ack指Acknowledge,确认。表示消费端收到消息后的确认方式。
有三种确认方式:
- 自动确认:acknowledge=“none”
- 手动确认:acknowledge=“manual”
- 根据异常情况确认:acknowledge=“auto”(这种方式很麻烦,不做讲解)
其中自动确认是指,当消息一旦被Consumer接收到, 则自动确认收到,并将相应message从RabbitMQ的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck() 手动签收,如果出现异常,则调用channel.basicNack() 方法,让其自动重新发送消息。
代码编写:
发送消息的生产者端代码不用变,只需要能够发送消息就行
消费者端:
第一步:编写yml配置文件
spring:rabbitmq:username: heimapassword: heimavirtual-host: itcasthost: 1.12.244.105port: 5673#设置消息为手动签收listener:simple:acknowledge-mode: manual #消费者端确认模式:none自动确认 manual手动确认 auto通过抛出异常的类型,来做响应的处理concurrency: 1 #当前监听的数量max-concurrency: 5 #最大监听数量retry:enabled: true #是否支持重试max-attempts: 4 #最大重试次数,默认为3
第二步:编写消费者代码
消费者端创建一个listener并实现ChannelAwareMessageListener接口(其实也可以不实现该接口,只要 @RabbitListener 标记的方法,或者 @RabbitListener 标记的类+ @RabbitHandler 标记的方法的参数列表有[com.rabbitmq.client.Channel]和[org.springframework.amqp.core.Message]两个参数,都可以)
package com.rabbit.springboot_mqconsumer;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;/*** @author Watching* * @date 2023/7/19* * Describe:*/
@Component
public class RabbitMQListener implements ChannelAwareMessageListener {
// @RabbitListener(queues = {"boot_topic_queue"})//填写队列名称,可以以字符串数组的方式监听多个队列
// public void listener(Message message){
// System.out.println("message:"+message);
// }/*** 使用ChannelAwareMessageListener监听器接口中的onMessage()方法来充当消费者,如果上面注释的方法与当前方法同时存在,一条消息只会被消费一次。不会被两个方法都消费** @param message* @param channel* @throws Exception Consumer ACK机制:* 1.设置手动签收。acknowledge= "manual”* 2.让监听器类实现ChannelAwareMessageListener接口* 3.如果消息成功处理,则调用channel的basicAck()签收* 4.如果消息处理失败,则调用channel的basicNack( )拒绝签收,broker重新发送给consumer*/@RabbitListener(queues = "boot_topic_queue" )@Overridepublic void onMessage(Message message, Channel channel) throws Exception {try{//1.接收消息System.out.println("message:" + message);System.out.println("channel:" + channel);//2.处理业务逻辑System.out.println("模拟处理业务逻辑......");//3.手动签收/*void basicAck(long deliveryTag, boolean multiple) throws IOException;deliveryTag:当消费者接收到一条消息后,RabbitMQ 会为该消息分配一个唯一的 DeliveryTag。这个 DeliveryTag 是一个64位的长整型数值,并且只在该 Channel 内唯一,即相同 Channel 下的 DeliveryTag 不会重复。multiple:当 multiple 设置为 false 时,表示只确认当前指定的 deliveryTag 对应的一条消息。也就是说,只确认指定的单个消息已经成功被处理或处理失败。当 multiple 设置为 true 时,表示确认当前指定的 deliveryTag 及其之前所有未确认的消息(在同一个 Channel 下)。也就是说,会一次性确认多条消息的处理状态,将 deliveryTag 小于或等于指定 deliveryTag 的所有消息都确认处理了。这种批量确认的机制有助于提高消息的处理效率,特别是当消费者处理多条消息时,可以通过一次性确认多条消息的方式来减少网络开销和消费者端的负担。在使用 channel.basicAck(deliveryTag, multiple) 和 channel.basicNack(deliveryTag, multiple, requeue) 方法时,可以根据实际场景来选择是单条确认还是批量确认,以满足不同的业务需求。*/channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);System.out.println("完成手动签收");}catch(Exception e){//4.出现异常,拒绝签收/*deliveryTag:一个唯一标识消息的64位长整型数值,用于确认消息的消费状态。multiple:一个布尔类型的参数,用于决定是否批量处理多条消息。若设置为 true,则会否定当前指定 deliveryTag 及其之前的所有未确认消息;若设置为 false,则只否定当前指定 deliveryTag 对应的一条消息。requeue:一个布尔类型的参数,表示是否将消息重新放回队列。若设置为 true,则消息会被重新入队列,RabbitMQ 会再次将它发送给消费者;若设置为 false,则消息会被直接丢弃,不会重新放回队列。*/System.out.println("代码逻辑出现异常,拒收");channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);}}
}
只需要两步,就可以实现Consumer ack,下面我们来测试一下:
首先是正常运行的代码的结果:(业务逻辑代码无异常)
生产者端是用的前面测试boot整合的代码
然后我们来测试业务逻辑代码出错的情况,我们在业务逻辑代码处添加一个除数不能为0的异常
再次运行代码,一直在重试,一直再报错
消息的可靠性总结
1.持久化:
- exchange要持久化
- queue要持久化
- message要持久化
2.生产方确认Confirm(在后续文章中会讲解如何在回调函数中进行具体的处理
3.消费方确认Ack
4. Broker高可用(集群搭建
3.消费端限流
在A系统中,每秒最多只能处理1000条请求,如果在一秒钟只能瞬间有5000条请求打入A系统,那么A系统就会崩溃,所以我们在A系统中加入一个MQ中间件,让5000个请求先发送到MQ,然后A系统再分批次的从MQ中拉取1000条请求,这样A系统就避免了崩溃的情况。
这也是我们常说的MQ的削峰功能
设置MQ消费限流很简单,只需要设置两个属性:
- 确认模式设置为手动确认(在上面的Ack我们已经讲过)
- 设置prefetch属性,prefetch = n,n就是每次从MQ中获取消息的数量
其余的消费端代码和生产者端代码不用修改。
当设置了消费端限流后,如果从MQ中取出1条消息,消费者端没有进行确认,那么消费者端将不会再从MQ中取消息,直到消息被确认。
4.TTL
TTL全称Time To Live(存活时间/过期时间)。
当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ可以对消息设置过期时间,也可以对整个队列Queue设置过期时间。
举一个例子:
生活中我们在购买商品的时候会下订单,系统会提示我们要在30分钟之内付款,否则订单将会被取消。
Ⅰ、先在控制台模拟上面的情况
①创建一个交换机
②创建一个队列
③进入交换机exchange_ttl和队列queue_ttl进行绑定
④消息的发布
⑤在消息队列中查看
将鼠标放上ttl,就可以看到设置的时间,等时间一过,这条消息就会被自动清除。
Ⅱ、代码实现队列过期,和消息过期
①创建交换机,队列,以及绑定关系
package com.rabbitmq.springboot_mqproducer.rabbitMQconfig;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** @author Watching* * @date 2023/7/18* * Describe:*/
@Configuration
public class MQConfig {public static final String QUEUE_TTL_NAME = "queue_ttl";public static final String EXCHANGE_TTL_NAME = "exchange_ttl";/*
创建队列,测试ttl特性*/@Bean("test_queue_ttl")public Queue ttlQueue() {Map<String,Object> arguments = new HashMap<>();arguments.put("x-message-ttl",10000);//消息过期的时间arguments.put("x-expires",100000);//队列过期的时间//设置队列的ttl时间return QueueBuilder.durable(QUEUE_TTL_NAME).withArguments(arguments).build();//参数的属性可以在控制台上查看}/*
创建一个交换机测试队列ttl特性*/@Bean("test_exchange_ttl")public Exchange ttlExchange() {return ExchangeBuilder.topicExchange(EXCHANGE_TTL_NAME).durable(true).build();}/*绑定ttl交换机和队列*/@Beanpublic Binding ttlBinding(@Qualifier("test_exchange_ttl") Exchange exchange, @Qualifier("test_queue_ttl") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ttl.#").noargs();}
}
在创建队列时,我们指定了x-message-ttl,使队列中的所有消息都是一个固定的时间过期
我们还可以在发送消息时,指定每条消息的过期时间。
只需要在发送方法convertAndSend()方法中添加一个消息后处理参数即可
/*MessagePostProcessor 是 Spring AMQP 中的一个接口,用于对消息进行后处理。通过实现该接口,你可以在发送消息之前对消息进行一些自定义处理,例如添加自定义的消息头、修改消息内容等。*/MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//1.设置消息属性message.getMessageProperties().setExpiration("5000");//5000ms过期//2.返回该消息return message;}};@Testvoid testSend() throws InterruptedException {for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend(MQConfig.EXCHANGE_TTL_NAME, "ttl.hello", "测试ttl"+i,messagePostProcessor);}Thread.sleep(200);}
小细节:
①当队列设置了x-expires和x-messgae-ttl,消息过期时间以短的为准
②当队列设置了x-messgae-ttl,且发送消息时通过消息后处理也设置了过期时间,那么消息过期时间也以短的为准。
③当十条消息中只有一条消息设置了过期时间,这条消息过期后,只有处于队列顶端,即即将被消费时,才会对这条消息是否过期做判断。
5.死信队列
5.1 概念
死信队列,英文缩写: DLX ,Dead Letter Exchange (死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。(死信队列为什么英文翻译过来使死信交换机呢?因为交换机概念只有在RabbitMQ中才有,其它MQ中间件只有队列概念,所以习惯叫死信队列,而RabbitMQ中存在交换机概念,所以叫死信交换机。)
在这里我们需要理解的问题有:
①消息什么时候成为死信?
- 队列长度达到限制,比如队列最多容纳10条消息,当第11条消息进入时,这条消息就成为了死信消息。
- 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
- 原队列存在消息过期设置,消息到达超时时间却并未被消费
以上三种,满足一条即为死信消息
②队列如何绑定死信交换机?
队列设置参数:x-dead-letter-exchange和x-dead-letter-routing-key
x-dead-letter-exchange:死信交换机的名称
x-dead-letter-routing-key:消息发送时指定的routingKey
5.2 代码实现死信队列
创建死信队列:
- 1.声明正常的队列(test_queue_dLx)和交换机(test_exchange_dlx)
- 2.声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
- 3.正常队列绑定死信交换机,正常队列绑定死信队列不需要创建Binding Bean,只需要在正常队列创建时设置参数就可以
– 设置两个参数:
x-dead-letter-exchange:死信交换机名称
x-dead-letter-routing-key:发送给死信交换机的routingkey
设置正常队列中的消息的过期时间x-message-ttl
设置正常队列的长度限制x-max-length
package com.rabbitmq.springboot_mqproducer.rabbitMQconfig;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** @author Watching* * @date 2023/7/18* * Describe:*/
@Configuration
public class MQConfig {/*** 测试死信队列*//*创建普通交换机和普通队列*/@Bean("test_exchange_dlx")public Exchange testDlxExchange() {return ExchangeBuilder.topicExchange("test_exchange_dlx").durable(true).build();}@Bean("test_queue_dlx")public Queue testDlxQueue() {Map<String,Object> map = new HashMap<>();//x-dead-letter-exchange:死信交换机名称map.put("x-dead-letter-exchange","exchange_dlx");//x-dead-letter-routing-key:发送给死信交换机的routingkeymap.put("x-dead-letter-routing-key","dlx.hehe");//这个routingkey只需要满足死信交换机的路由规则就可以//设置正常队列中的消息的过期时间ttlmap.put("x-message-ttl",10000);//设置正常队列的长度限制max-lengthmap.put("x-max_length",10);return QueueBuilder.durable("test_queue_dlx").withArguments(map).build();}@Beanpublic Binding binding1(@Qualifier("test_exchange_dlx") Exchange exchange,@Qualifier("test_queue_dlx")Queue queue){return BindingBuilder.bind(queue).to(exchange).with("test.dlx.#").noargs();}/*创建死信交换机和死信队列*/@Bean("exchange_dlx")public Exchange dlxExchange() {return ExchangeBuilder.topicExchange("exchange_dlx").durable(true).build();}@Bean("queue_dlx")public Queue dlxQueue() {return QueueBuilder.durable("queue_dlx").build();}@Beanpublic Binding binding2(@Qualifier("exchange_dlx") Exchange exchange,@Qualifier("queue_dlx")Queue queue){return BindingBuilder.bind(queue).to(exchange).with("dlx.#").noargs();}/*绑定普通队列和死信交换机,并不需要写一个Binding,只需要在普通队列中添加参数就行*/
}
发送消息测试死信消息:
1.过期时间
2.长度限制
3.消息拒收
@Testvoid testDlx() {//1.过期时间
// rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hello","测试消息超出过期时间变成死信");//2.超出队列消息数量限制
// for (int i = 0; i < 20; i++) {
// rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.hello", "测试消息超出队列数量限制变成死信");
// }//3.消费端拒收rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hello","测试消息被拒收变成死信");}
死信队列小结:
1.死信交换机,死信队列和普通交换机,普通队列没有区别.
2.当消息成为死信后,如果该队列绑定了死信交换机,则消息会被重新路由到死信队列中
3.消息成为死信的三种情况
- 消息在队列中到达超时时间并未被消费
- 消息在消费者端被拒收,且设置了不重回队列
- 队列长度存在限制,消息数量超出了限制
6.延迟队列
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
需求:
1.下单后,30分钟未支付,取消订单,回滚库存。
2.新用户注册成功7天后,发送短信问候。
实现方式:
1.定时器
2.延迟队列
订单系统将订单放入延迟队列种,30分钟后取出,去库存系统中判断订单是否已经支付,再进行后续的支付或者未支付操作
但是!
RabbitMQ官方没有提供延迟队列,所以我们需要使用ttl+死信队列构成延迟队列
普通队列设置为30min中过期,过期后消息路由到死信队列,库存系统从死信队列中取消息,这样就形成了一个延迟队列
代码实现延迟队列
1.定义正常交换机(order_exchange)和队列(order_queue),同时绑定
2.定义死信交换机(order_exchange_dlx) 和队列(order_queue_dlx),同时绑定
3.绑定正常队列和死信交换机,设置正常队列过期时间为10秒
/*** 测试延迟队列*//*1.定义正常交换机(order_exchange)和队列(order_queue)*/@Bean("orderQueue")public Queue orderQueue(){//3.正常队列绑定死信交换机Map<String,Object> map = new HashMap<>();map.put("x-dead-letter-exchange","order_exchange_dlx");map.put("x-dead-letter-routing-key","dlx.order.hehe");//设置正常队列的消息过期时间map.put("x-message-ttl",10000);return QueueBuilder.durable("order_queue").withArguments(map).build();}@Bean("orderExchange")public Exchange orderExchange(){return ExchangeBuilder.topicExchange("order_exchange").build();}@Beanpublic Binding orderBinding(@Qualifier("orderQueue")Queue queue,@Qualifier("orderExchange")Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();}/*2.定义死信交换机(order_exchange_dlx) 和队列(order_queue_dlx)*/@Bean("orderQueueDlx")public Queue orderQueueDlx(){return QueueBuilder.durable("order_queue_dlx").build();}@Bean("orderExchangeDlx")public Exchange orderExchangeDlx(){return ExchangeBuilder.topicExchange("order_exchange_dlx").build();}@Beanpublic Binding orderBindingDlx(@Qualifier("orderQueueDlx")Queue queue,@Qualifier("orderExchangeDlx")Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("dlx.order.#").noargs();}
4.创建生产者发送消息
/*** 测试延迟队列*/@Testvoid testDelay() throws InterruptedException {rabbitTemplate.convertAndSend("order_exchange","order.test","测试延迟队列");for (int i = 10;i > 0;i--){System.out.println(i+"...");Thread.sleep(1000);}}
5.创建消费者
package com.rabbit.springboot_mqconsumer;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;/*** @author Watching* * @date 2023/8/2* * Describe:*/
@Component
public class OrderListener implements ChannelAwareMessageListener {@RabbitListener(queues = "order_queue_dlx")//监听死信队列@Overridepublic void onMessage(Message message, Channel channel) throws Exception {try{//1.接收messageSystem.out.println("message:"+message);//2.处理业务逻辑System.out.println("处理业务逻辑");System.out.println("根据订单id在数据库中查询订单状态");System.out.println("判断订单是否支付成功");System.out.println("未支付,回滚库存,取消订单");//3.手动签收channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}catch (Exception e){//4.业务出错,拒绝签收channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);//业务出错,拒签后要将这条消息重新放回死信队列}}
}
延迟队列小结:
1.延迟队列指消息进入队列后,可以被延迟一定时间,再进行消费。
2. RabbitMQ没有提供延迟队列功能,但是可以使用: TTL + DLX来实现延迟队列效果。
应用问题
1.消息补偿
消息补偿机制
2.幂等性保障
幂等性保障
相关文章:

RabbitMQ(二)
二、高级特性、应用问题以及集群搭建 高级特性 1.消息的可靠性投递 在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。 rabbitMQ整个消息投递的路径为: produ…...
Linux软件实操
systemctl命令 Linux系统的很多内置或第三方的软件均支持使用systemctl命令控制软件(服务)的启动、停止、开机自启 systemctl start(启动) 或 stop(关闭) 或 status(查看状态) 或 enable(开启开机自启) disable(关闭开机自启) 服务名: 控制服务的状态 系统内置的服务: Netwo…...

kagNet:对常识推理的知识感知图网络 8.4+8.5
这里写目录标题 摘要介绍概述问题陈述推理流程 模式图基础概念识别模式图构造概念网通过寻找路径来匹配子图基于KG嵌入的路径修剪 知识感知图网络图卷积网络(GCN)关系路径编码分层注意机制 实验数据集和使用步骤比较方法KAGNET是实施细节性能比较和分析I…...

Jmeter 压测工具使用手册[详细]
1. jemter 简介 jmeter 是 apache 公司基于 java 开发的一款开源压力测试工具,体积小,功能全,使用方便,是一个比较轻量级的测试工具,使用起来非常简 单。因为 jmeter 是 java 开发的,所以运行的时候必须先…...

matlab智能算法程序包89套最新高清录制!matlab专题系列!
关于我为什么要做代码分享这件事? 助力科研旅程! 面对茫茫多的文献,想复现却不知从何做起,我们通过打包成品代码,将过程完善,让您可以拿到一手的复现过程以及资料,从而在此基础上,照…...

caj文件怎么转换成pdf?了解一下这种方法
caj文件怎么转换成pdf?如果你曾经遇到过需要将CAJ文件转换成PDF格式的情况,那么你一定知道这是一件麻烦的事情。幸运的是,现在有许多软件和工具可以帮助你完成这项任务。下面就给大家介绍一款使用工具。 【迅捷PDF转换器】是一款功能强大的工…...

windows 同时安装 Mysql 5.7 和8.0
下载链接 https://dev.mysql.com/downloads/mysql/ 推荐下载 MSI,可以通过图像化界面配置 8.1 版本 安装5.7 系统安装两个MySQL 怎么访问 都是mysql,所以环境变量 配置,只能一个生效,生效就是谁靠前谁生效 cmd 录入 services.m…...

数字孪生的「三张皮」问题:数据隐私、安全与伦理挑战
引言 随着数字化时代的来临,数据成为了当今社会的宝贵资源。然而,数据的广泛使用也带来了一系列隐私、安全与伦理挑战。数字孪生作为一种虚拟的数字化实体,通过收集和分析大量数据,模拟和预测现实世界中的各种情境,为…...

Hadoop学习:深入解析MapReduce的大数据魔力(上)
Hadoop学习:深入解析MapReduce的大数据魔力(上) 前言1.MapReduce概述1.1MapReduce 定义1.2MapReduce 优缺点优点缺点 1.3MapReduce 核心思想1.4 MapReduce 进程1.5 官方WordCount源码1.6 常用数据序列化类型1.7 MapReduce 编程规范1.8 WordCo…...

MQ(一)-MQ理论与消息中间件简介
MQ理论 队列,是一种FIFO 先进先出的数据结构。消息:在不同应用程序之间传递的数据。将消息以队列的形式存储起来,并且在不同的应用程序之间进行传递,这就成了MessageQueue。MQ通常三大作用: 异步、解耦、限流 Spring…...
vb与EXCEL的连接
一、 VB读写EXCEL表: VB本身提自动化功能可以读写EXCEL表,其方法如下: 1、在工程中引用Microsoft Excel类型库: 从"工程"菜单中选择"引用"栏;选择Microsoft Excel 9.0 Object Libraryÿ…...

java使用openOffice将excel转换pdf时,将所有列显示在一页
1.接上文,格式转换的基础问题已解决,但还有些细节问题需要单独处理,如excel转换至pdf时,如何将所有列显示在一页的问题,此问题大家都有遇到,解决方案也比较多,我也尝试过重写某类,来…...

python数据容器
目录 数据容器 反向索引 list列表 语法 案例 列表的特点 列表的下表索引 list的常用操作 list列表的遍历 while循环遍历 for循环遍历 tuple元组 前言 元组定义 元组特点 获取元组元素 元组的相关操作 元组的遍历 while循环遍历 for循环遍历 字符串 前言…...

【TypeScript】中定义与使用 Class 类的解读理解
目录 类的概念类的继承 :类的存取器:类的静态方法与静态属性:类的修饰符:参数属性:抽象类:类的类型: 总结: 类的概念 类是用于创建对象的模板。他们用代码封装数据以处理该数据。JavaScript 中的…...

好用的数据库管理软件之idea(idea也有数据库???)
1.建立maven项目(maven项目添加依赖,对于后期连接数据库很方便) 2.连接数据库。。。 这里一定注意端口号,不要搞错了 和上一张图片不一样哦 3.数据库测试代码。。。 然后你就可以在这里边写MySQL代码了,这个工具对于新…...
《操作系统-李治军》测验错题集
章节测试1 启动保护模式以后,指令jmpi 0, 8执行和没有启动保护模式有何区别?() 答:得出跳转地址的方式不同 实模式:cs<<4 ip 保护模式:cs查表 ip 在系统调用的实现中,在i…...

DP-GAN-判别器代码
将输出的rgb作为输入,输入到判别器中。接着执行一个for循环,看一下body_down列表的组成和x经过body_down之后的值。 body_down是由残差块D组成的列表: 残差块的参数为:(3,128),(128,128),(128,256),(256,256),(256,512),(512,5…...

基于多线程实现服务器并发
看大丙老师的B站视频总结的笔记19-基于多线程实现服务器并发分析_哔哩哔哩_bilibilihttps://www.bilibili.com/video/BV1F64y1U7A2/?p19&spm_id_frompageDriver&vd_sourcea934d7fc6f47698a29dac90a922ba5a3 思路:首先accept是有一个线程的,另外…...

Golang之路---03 面向对象——接口与多态
接口与多态 何为接口 在面向对象的领域里,接口一般这样定义:接口定义一个对象的行为。接口只指定了对象应该做什么,至于如何实现这个行为(即实现细节),则由对象本身去确定。 在 Go 语言中,…...

一条自由游动的鲸鱼
先看效果: 再看代码: <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>鲸鱼</title><style>#canvas-container {width: 100%;height: 100vh;overflow: hidden;}&l…...
基于算法竞赛的c++编程(28)结构体的进阶应用
结构体的嵌套与复杂数据组织 在C中,结构体可以嵌套使用,形成更复杂的数据结构。例如,可以通过嵌套结构体描述多层级数据关系: struct Address {string city;string street;int zipCode; };struct Employee {string name;int id;…...
【网络】每天掌握一个Linux命令 - iftop
在Linux系统中,iftop是网络管理的得力助手,能实时监控网络流量、连接情况等,帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...
C++:std::is_convertible
C++标志库中提供is_convertible,可以测试一种类型是否可以转换为另一只类型: template <class From, class To> struct is_convertible; 使用举例: #include <iostream> #include <string>using namespace std;struct A { }; struct B : A { };int main…...

理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端
🌟 什么是 MCP? 模型控制协议 (MCP) 是一种创新的协议,旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议,它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...
Java多线程实现之Callable接口深度解析
Java多线程实现之Callable接口深度解析 一、Callable接口概述1.1 接口定义1.2 与Runnable接口的对比1.3 Future接口与FutureTask类 二、Callable接口的基本使用方法2.1 传统方式实现Callable接口2.2 使用Lambda表达式简化Callable实现2.3 使用FutureTask类执行Callable任务 三、…...
土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等
🔍 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术,可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势,还能有效评价重大生态工程…...
Java毕业设计:WML信息查询与后端信息发布系统开发
JAVAWML信息查询与后端信息发布系统实现 一、系统概述 本系统基于Java和WML(无线标记语言)技术开发,实现了移动设备上的信息查询与后端信息发布功能。系统采用B/S架构,服务器端使用Java Servlet处理请求,数据库采用MySQL存储信息࿰…...
CRMEB 中 PHP 短信扩展开发:涵盖一号通、阿里云、腾讯云、创蓝
目前已有一号通短信、阿里云短信、腾讯云短信扩展 扩展入口文件 文件目录 crmeb\services\sms\Sms.php 默认驱动类型为:一号通 namespace crmeb\services\sms;use crmeb\basic\BaseManager; use crmeb\services\AccessTokenServeService; use crmeb\services\sms\…...
Vite中定义@软链接
在webpack中可以直接通过符号表示src路径,但是vite中默认不可以。 如何实现: vite中提供了resolve.alias:通过别名在指向一个具体的路径 在vite.config.js中 import { join } from pathexport default defineConfig({plugins: [vue()],//…...

CVPR2025重磅突破:AnomalyAny框架实现单样本生成逼真异常数据,破解视觉检测瓶颈!
本文介绍了一种名为AnomalyAny的创新框架,该方法利用Stable Diffusion的强大生成能力,仅需单个正常样本和文本描述,即可生成逼真且多样化的异常样本,有效解决了视觉异常检测中异常样本稀缺的难题,为工业质检、医疗影像…...