当前位置: 首页 > news >正文

RabbitMQ高级-高级特性

1.消息可靠性传递

在使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ为我们提供了两种方式来控制消息的投递可靠性模式

1.confirm 确认模式        确认模式是由exchange决定的

2.return 退回模式        回退模式是由routingKey决定的

rabbitmq整个消息投递的路径为:

producer -> rabbitmq broker -> exchange -> queue ->consumer

消息从producer到exchange则会返回一个confirmCallback

消息从exchange->queue投递失败则会返回一个returnCallback

我们将利用这两个callback控制消息的可靠性投递

创建rabbitmq-producer-spring项目 并配置

配置文件:开启模式

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/contexthttps://www.springframework.org/schema/context/spring-context.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!--加载配置文件--><context:property-placeholder location="classpath:rabbitmq.properties"/><!-- 定义rabbitmq connectionFactory确认模式开启:publisher-confirms="true"回退模式开启: publisher-returns="true"--><rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"port="${rabbitmq.port}"username="${rabbitmq.username}"password="${rabbitmq.password}"virtual-host="${rabbitmq.virtual-host}"publisher-confirms="true"  publisher-returns="true"/><!--定义管理交换机、队列--><rabbit:admin connection-factory="connectionFactory"/><!--定义rabbitTemplate对象操作可以在代码中方便发送消息--><rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/><!--消息可靠性投递(生产端)--><rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue><rabbit:direct-exchange name="test_exchange_confirm"><rabbit:bindings><rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding></rabbit:bindings></rabbit:direct-exchange></beans>

确认模式 (publisher-confirms="true") 

@RunWith(SpringRunner.class)
@ContextConfiguration(locations = "classpath:/spring-rabbitmq-producer.xml")
public class ProducerTest {//@Autowired   在配置文件配好了//RabbitAdmin rabbitAdmin;//用于操作交换机、队列@Autowiredprivate RabbitTemplate rabbitTemplate;//确认模式,收到消息与否都会被执行@Testpublic void testConfirm() {//1. 开启去确认回调函数  publisher-confirms="true"//2. 注册回调函数,当交换机收到消息,confirm方法会被调用rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("correlationData = " + correlationData); //关联数据,在发送消息时指定的//确认机制:ack true表示收到了 false表示没收到if (ack){System.out.println("接收成功消息" + ack);}else {System.out.println("接收失败消息" + cause);//做一些处理,让消息再次发送。}}});//3. 发送消息rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "confirm info...");//成功//测试不正常情况//rabbitTemplate.convertAndSend("test_exchange_confirm000", "confirm", "message confirm....");//失败}
}

回退模式 (publisher-returns="true")

@RunWith(SpringRunner.class)
@ContextConfiguration(locations = "classpath:/spring-rabbitmq-producer.xml")
public class ProducerTest {//@Autowired   在配置文件配好了//RabbitAdmin rabbitAdmin;//用于操作交换机、队列@Autowiredprivate RabbitTemplate rabbitTemplate;//回退模式@Testpublic void testReturn() {//1.开启回退模式 publisher-returns="true"//2.注册回退函数(ReturnCallBack),当没有任何队列与交换机绑定,或即使绑定也没有任何匹配时,将执行回退方法//      成功接收,不执行return方法//3. 必须设置强制回退 setMandatory(true)  设置交换机处理失败消息的模式rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText,String exchange, String routingKey) {System.out.println("消息对象message = " + message);System.out.println("错误码replyCode = " + replyCode);System.out.println("错误信息replyText = " + replyText);System.out.println("交换机exchange = " + exchange);System.out.println("路由键routingKey = " + routingKey);//处理}});//3. 发送消息//rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "confirm info...");//成功//测试不正常情况rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm123", "confirm info...");//成功}}

小结:

设置 ConnectionFactory的publisher-confirms="true" 开启 确认模式

使用 rabbitTemplate.setConfirmCallback 设置回调函数。当消息发送到 exchange 后回调 confirm 方法。在方法中判断 ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。

设置 ConnectionFactory 的 publisher-returns="true" 开启 退回模式

使用 rabbitTemplate.setReturnCallback 设置退回函数,当消息从exchange 路由到 queue 失败后,如果设置了 rabbitTemplate.setMandatory(true) 参数,则会将消息退回给 producer并执行回调函数returnedMessage

2.Consumer Ack (消费端确认方式)

ack指Acknowledge,确认。 表示消费端收到消息后的确认方式

有三种确认方式:

自动确认:acknowledge=“none”   默认

手动确认:acknowledge=“manual”

根据异常情况确认:acknowledge=“auto”,(这种方式使用麻烦)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。

如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息

创建rabbitmq-consumer-spring项目并配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/contexthttps://www.springframework.org/schema/context/spring-context.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!--加载配置文件--><context:property-placeholder location="classpath:rabbitmq.properties"/><!-- 定义rabbitmq connectionFactory --><rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"port="${rabbitmq.port}"username="${rabbitmq.username}"password="${rabbitmq.password}"virtual-host="${rabbitmq.virtual-host}"/><context:component-scan base-package="com.listener" /><!--定义监听器容器默认为自动确认acknowledge="manual":手动签收--><rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"><!--<rabbit:listener ref="qosListener" queue-names="test_queue_confirm"></rabbit:listener>--><rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener></rabbit:listener-container>
</beans>
package com;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {@Testpublic void test(){while (true){}}
}

1.自动确认 (默认) 

//自动确认
@Component
//implements MessageListener 自动确认
//ChannelAwareMessageListener 手动确认
public class AckListener implements MessageListener {//自动确认@Overridepublic void onMessage(Message message) {try {System.out.println(new String(message.getBody(),"UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
}

2.手动确认 (acknowledge="manual":手动签收) 

package com.listener;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;import javax.sound.midi.Soundbank;
import java.io.UnsupportedEncodingException;/*
* 1.设置手动签收:acknowledge=”manual“
* 2.让监听器类实现ChannelAwareMessageListener接口
* 3.如果消息成功处理,则调用channel的basicAck()签收
* 4.如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer
* */
@Component
//implements MessageListener 自动确认
//ChannelAwareMessageListener 手动确认
public class AckListener implements ChannelAwareMessageListener {//手动确认@Overridepublic void onMessage(Message message, Channel channel) throws Exception {//消息唯一标识long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1.接收转换消息System.out.println(new String(message.getBody(),"UTF-8"));//消息得到后需要进行业务逻辑处理,如存储数据库,发送邮件、短信、保存到Redis等等//2.处理业务逻辑System.out.println("处理业务逻辑...");int i = 3/0;//模拟业务异常//3.手动签收//业务逻辑成功,才会确认,否则不予确认channel.basicAck(deliveryTag,true);//消息唯一标识和是否确认多个} catch (Exception e) {e.printStackTrace();//4.拒绝签收//NoAck 不予确认/**  三个参数*  deliveryTag 消息标签,唯一标识,类似数据库主键*  multiple true 拒绝接收的所有消息 false,拒绝刚刚的消息*  requeue true 表示是否重回原有队列,false表示丢弃或成为死信 (被死信队列接收)*/channel.basicNack(deliveryTag,true,false);}}
}

3.消费端限流 (prefetch="2"限流为2) 

 

就是在消费这些消息的时候做了限流

修改配置

 <context:component-scan base-package="com.listener" /><!--定义监听器容器默认为自动确认acknowledge="manual":手动签收--><rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="2"><!--<rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener>--><rabbit:listener ref="qosListener" queue-names="test_queue_confirm"></rabbit:listener></rabbit:listener-container>

新建 com.listener.QosListener 

package com.listener;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;/*** Consumer 限流机制*  1. 确保消息被确认。不确认是不继续处理其他消息的*  2. listener-container配置属性*      prefetch=1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉取下一条消息。*/
@Component
public class QosListener implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1.获取消息System.out.println(new String(message.getBody()));//2.确认消息channel.basicAck(deliveryTag,true);}catch (Exception e){e.printStackTrace();channel.basicNack(deliveryTag,true,false);}}
}

小结:

在<rabbit:listener-container>中配置prefetch属性设置消费端一次拉去多少消息

消费端的确认模式一定为手动确认。acknowledge=“manual”

4.TTL (消息存活时间)

TTL 全称 Time To Live(存活时间/过期时间)

当消息到达存活时间后,还没有被消费,会被自动清除

RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间

控制后台演示消息过期 

修改管理后台界面,增加队列

参数:表示过期时间,单位毫秒 ,10000表示10秒

增加交换机

绑定队列

发送消息

Delivery mode:2-Persistent表示需要进行持久化

查看消息,可以看到消息,但十秒之后,消息自动消失,因为我们设置了十秒消息过期

代码实现 

修改配置文件 spring-rabbitmq-producer.xml

    <!--TTL 队列--><rabbit:queue name="test_queue_ttl" id="test_queue_ttl"><!--设置queue的参数--><rabbit:queue-arguments><!--设置x-message-ttl队列的过期时间默认情况下value-type的类型是String类型,但时间的类型是number类型,所以需要设置成integer类型--><entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry><!--指定类型?因为队列已经存在,配置队列需要和已存在队列属性相同--><entry key="x-queue-type" value="classic" value-type="java.lang.String"></entry></rabbit:queue-arguments></rabbit:queue><!--设置交换机--><rabbit:topic-exchange name="test_exchange_ttl"><!--交换机绑定队列--><rabbit:bindings><rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding></rabbit:bindings></rabbit:topic-exchange>

在测试类 ProducerTest 中,添加测试方法,发送消息 

    /*** TTL:过期时间*  1. 队列统一过期*  2. 消息单独过期* 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。*/@Testpublic void testTTL() {//队列统一过期for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.hehe","message ttl");}}@Testpublic void testMessageTtl() {// 消息后处理对象,设置一些消息的参数信息MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//1.设置message的信息// 第二个方法:消息的过期时间 ,5秒之后过期message.getMessageProperties().setExpiration("5000");//2.返回该消息return message;}};

小结

设置消息队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期

设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期

如果两者都进行了设置,以时间段的为准

5.DLX(死信队列)

死信队列,英文缩写:DLX 。DeadLetter Exchange(死信交换机),无法被消费的消息,当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX

面试题: 消息成为死信的三种情况:

队列消息数量到达限制;比如队列最大只能存储10条消息,而发了11条消息,根据先进先出,最先发的消息会进入死信队列

消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false

消息超时,原队列存在消息过期设置,消息到达超时时间未被消费

队列绑定死信交换机:

给队列设置参数:x-dead-letter-exchange和x-dead-letter-routing-key

修改配置文件 spring-rabbitmq-producer.xml

    <!--死信队列--><!--1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)--><rabbit:queue name="test_queue_dlx" id="test_queue_dlx"><!--3. 正常队列绑定死信交换机 设置条件--><rabbit:queue-arguments><!--3.1 x-dead-letter-exchange:死信交换机名称--><entry key="x-dead-letter-exchange" value="exchange_dlx"/><!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey--><entry key="x-dead-letter-routing-key" value="dlx.hehe"></entry><!--4.1 设置队列的过期时间 ttl--><entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/><!--4.2 设置队列的长度限制 max-length --><entry key="x-max-length" value="10" value-type="java.lang.Integer"/></rabbit:queue-arguments></rabbit:queue><!--正常交换机--><rabbit:topic-exchange name="test_exchange_dlx"><rabbit:bindings><rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding></rabbit:bindings></rabbit:topic-exchange><!--2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)--><rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue><rabbit:topic-exchange name="exchange_dlx"><rabbit:bindings><rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding></rabbit:bindings></rabbit:topic-exchange>

在测试类 ProducerTest 中,添加测试方法

三种成为死信情况 

    /*发送测试死信消息:1、过期时间2、长度限制3、消息拒收*/@Testpublic void testDlx(){//1. 测试过期时间,死信消息//rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");//2. 测试长度限制后,消息死信/*for (int i = 0; i < 11; i++) {rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?i =" + i);}*///3.发送消息:拒收情况rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");}

消费端需创建DlxListener并在配置添加对应监听容器 

小结

1.死信交换机和死信队列和普通的没有区别

2.当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

6.延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费

延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费

需求场景:

1. 下单后,30分钟未支付,取消订单,回滚库存

2. 新用户注册成功30分钟后,发送短信问候

实现方式

1.定时器

2.延迟队列

但是RabbitMQ没有提供延迟队列功能

但是可以使用:TTL+死信队列 组合实现延迟队列的效果

代码实现 

修改配置文件 spring-rabbitmq-producer.xml

<!--延迟队列:1、定义正常交换机(order_exchange)和队列(order_queue)2、定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)3、绑定,设置正常队列过期时间为30分钟
--><!--1、定义正常交换机(order_exchange)和队列(order_queue)--><rabbit:queue name="order_queue" id="order_queue"><!--3、绑定,设置正常队列过期时间为30分钟--><rabbit:queue-arguments><entry key="x-dead-letter-exchange" value="order_exchange_dlx"></entry><entry key="x-dead-letter-routing-key" value="dlx.order.cancel"></entry><entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/></rabbit:queue-arguments></rabbit:queue><rabbit:topic-exchange name="order_exchange"><rabbit:bindings><rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding></rabbit:bindings></rabbit:topic-exchange><!--2、定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)--><rabbit:queue name="order_queue_dlx" id="order_queue_dlx"></rabbit:queue><rabbit:topic-exchange name="order_exchange_dlx"><rabbit:bindings><rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding></rabbit:bindings></rabbit:topic-exchange>

生产端:ProducerTest

package com;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@ContextConfiguration(locations = "classpath:/spring-rabbitmq-producer.xml")
public class ProducerTest {//@Autowired   在配置文件配好了//RabbitAdmin rabbitAdmin;//用于操作交换机、队列@Autowiredprivate RabbitTemplate rabbitTemplate;//确认模式,收到消息与否都会被执行@Testpublic void testConfirm() {//1. 开启去确认回调函数  publisher-confirms="true"//2. 注册回调函数,当交换机收到消息,confirm方法会被调用rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("correlationData = " + correlationData); //关联数据,在发送消息时指定的//确认机制:ack true表示收到了 false表示没收到if (ack){System.out.println("接收成功消息" + ack);}else {System.out.println("接收失败消息" + cause);//做一些处理,让消息再次发送。}}});//3. 发送消息rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "confirm info...");//成功//测试不正常情况//rabbitTemplate.convertAndSend("test_exchange_confirm000", "confirm", "message confirm....");//失败}@Testpublic void testSend(){for (int i = 0; i < 10; i++) {//发送消息rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","confirm info...");}}/*** TTL:过期时间*  1. 队列统一过期*  2. 消息单独过期* 如果设置了消息的过期时间,也设置了队列的过期时间,它以时间短的为准。*/@Testpublic void testTTL() {//队列统一过期for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.hehe","message ttl");}}@Testpublic void testMessageTtl() {// 消息后处理对象,设置一些消息的参数信息MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//1.设置message的信息// 第二个方法:消息的过期时间 ,5秒之后过期message.getMessageProperties().setExpiration("5000");//2.返回该消息return message;}};}/*发送测试死信消息:1、过期时间2、长度限制3、消息拒收*/@Testpublic void testDlx(){//1. 测试过期时间,死信消息//rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");//2. 测试长度限制后,消息死信/*for (int i = 0; i < 11; i++) {rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?i =" + i);}*///3.发送消息:拒收情况rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");}@Testpublic void testDelay() throws InterruptedException {//1、发送订单消息。将来是在订单系统中,下单成功后,发送消息rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息:id=1,time=333333");//2、打印倒计时for (int i = 10; i > 0; i--) {System.out.println(i+"...");Thread.sleep(1000);}}
}

消费端:OrderListener

package com.listener;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;@Component
public class OrderListener implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {Thread.sleep(1000);long deliveryTag = message.getMessageProperties().getDeliveryTag();try{//1.接收转换消息System.out.println(new String(message.getBody()));//2.处理业务逻辑System.out.println("处理业务逻辑...");System.out.println("根据订单id查询其状态...");System.out.println("判断状态是否为支付成功?");System.out.println("取消订单,回滚库存!");//3.手动签收channel.basicAck(deliveryTag,true);}catch (Exception e){//4.拒绝签收/** 第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端** */System.out.println("出现异常,拒绝接收!");//拒绝接收,不重回队列requeue=falsechannel.basicNack(deliveryTag,true,false);//channel.basicReject(deliveryTag,true);只允许单挑确认}}
}

修改配置:spring-rabbitmq-consumer.xml 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/contexthttps://www.springframework.org/schema/context/spring-context.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!--加载配置文件--><context:property-placeholder location="classpath:rabbitmq.properties"/><!-- 定义rabbitmq connectionFactory --><rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"port="${rabbitmq.port}"username="${rabbitmq.username}"password="${rabbitmq.password}"virtual-host="${rabbitmq.virtual-host}"/><context:component-scan base-package="com.listener" /><!--定义监听器容器默认为自动确认acknowledge="manual":手动签收--><rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="2"><!--<rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener>--><!--<rabbit:listener ref="qosListener" queue-names="test_queue_confirm"></rabbit:listener>--><!--定义监听器,监听正常的队列--><!--<rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"></rabbit:listener>--><!--注意实现延迟队列功能时,消费端监听的是死信队列--><rabbit:listener ref="orderListener" queue-names="order_queue_dlx"></rabbit:listener></rabbit:listener-container></beans>

消费端:ConsumerTest

package com;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {@Testpublic void test(){while (true){}}
}

7.日志与监控

1.RabbitMQ日志

RabbitMQ默认日志存放路径: /var/log/rabbitmq/rabbit@xxx.log

日志包含了RabbitMQ的版本号,Erlang的版本号,RabbitMQ服务结点名称,cookie的hash值,RabbitMQ配置文件地址,内存限制,磁盘限制,默认账户guest的创建以及权限配置等等

2.web管控台监控 

8.消息追踪-Firehose 

在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于RabbitMQ而言,可能是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位

在RabbitMQ中可以使用 Firehose 和 rabbitmq_tracing插件 功能来实现消息追踪

firehose的机制是将生产者投递给rabbitmq的消息,rabbitmq投递给消费者的消息按照指定的格式发送到默认的exchange上。这个默认的exchange的名称为amq.rabbitmq.trace,它是一个topic类型的exchange。发送到这个exchange上的消息的routing key为 publish.exchangename 和 deliver.queuename。其中exchangename和queuename为实际exchange和queue的名称,分别对应生产者投递到exchange的消息,和消费者从queue上获取的消息

注意: 打开 trace 会影响消息写入功能,适当打开后请关闭

开启Firehose命令:

rabbitmqctl trace_on

关闭Firehose命令:

rabbitmqctl trace_off

相关文章:

RabbitMQ高级-高级特性

1.消息可靠性传递 在使用RabbitMQ的时候&#xff0c;作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ为我们提供了两种方式来控制消息的投递可靠性模式 1.confirm 确认模式 确认模式是由exchange决定的 2.return 退回模式 回退模式是由routing…...

Word粘贴时出现“运行时错误53,文件未找到:MathPage.WLL“的解决方案

在安装完MathType后&#xff0c;打开word复制粘贴时报错“运行时错误53,文件未找到&#xff1a;MathPage.WLL” 首先确定自己电脑的位数&#xff08;这里默认32位&#xff09; 右击MathType桌面图标&#xff0c;点击“打开文件所在位置”&#xff0c; 然后分别找到MathPage.W…...

html元素基本使用

前言 大家好&#xff0c;我是jiantaoyab&#xff0c;第一次学习前端的html&#xff0c;写一篇笔记总结常用的元素 语义化 例如只要是 不管字体的大小是怎么样&#xff0c;有没有加粗都是标题&#xff0c;元素显示到页面中的效果应该由css决定&#xff0c;这就是语义化。 文…...

PHP+golang开源办公系统CRM管理系统

基于ThinkPHP6 Layui MySQL的企业办公系统。集成系统设置、人事管理、消息管理、审批管理、日常办公、客户管理、合同管理、项目管理、财务管理、电销接口集成、在线签章等模块。系统简约&#xff0c;易于功能扩展&#xff0c;方便二次开发。 服务器运行环境要求 PHP > 7.…...

smartmontools-5.43交叉编译Smartctl

嵌入式系统的sata盘经常故障&#xff0c;需要使用smatctl工具监控和诊断sata故障。 1. 从网上下载开源smartmontools-5.43包。 2. 修改makefile进行交叉编译。 由于软件包中已经包含Makefile.am&#xff0c;Makefile.in。直接运行 automake --add-missing 生成Makefile。 3.…...

idea找不到或无法加载主类

前言 今天在运行项目的时候突然出了这样一个错误&#xff1a;IDEA 错误 找不到或无法加载主类,相信只要是用过IDEA的朋友都 遇到过它吧&#xff0c;但是每次遇到都是一顿焦头烂额、抓耳挠腮、急赤白咧&#xff01;咋整呢&#xff1f;听我给你吹~ 瞧我这张嘴~ 问题报错 找不…...

2.二进制的方式读写文件

文章目录 写入文件代码运行结果 读出文件代码运行结果 文件打开模式标记&#xff08;查表&#xff09; 写入文件 ------写文件一共五步&#xff1a;------ 第一步&#xff1a;包含头文件 第二步&#xff1a;创建流对象 第三步&#xff1a;指定方式打开文件 第四步&#xff1a;…...

Seata的详细解释

什么是seata Seata&#xff08;Simple Extensible Autonomous Transaction Architecture&#xff09;是一个开源的分布式事务解决方案。它是由阿里巴巴集团开发的&#xff0c;旨在解决分布式系统中的事务一致性问题。 Seata提供了一种简单易用的方式来实现跨多个数据库和服务的…...

JS手写实现洋葱圈模型

解释洋葱圈模型&#xff1a; 当我们执行第一个中间件时&#xff0c;首先输出1&#xff0c;然后调用next()&#xff0c;那么此时它会等第二个中间件执行完毕才会继续执行第一个中间件。然后执行第二个中间件&#xff0c;输出3&#xff0c;调用next()&#xff0c;执行第三中间件…...

3.Windows下安装MongoDB和Compass教程

Windows下安装MongoDB 总体体验下来&#xff0c;&#xff0c;要比MySQL的安装简单了许多&#xff0c;没有过多的配置&#xff0c;直接就上手了&#xff01; 1、下载 进入官方的下载页面https://www.mongodb.com/try/download/community&#xff0c;如下选择&#xff0c;我选…...

go反射实战

文章目录 demo1 数据类型判断demo2 打印任意类型数据 demo1 数据类型判断 使用reflect.TypeOf()方法打印go中数据类型&#xff0c;可参考go官方API文档;使用格式化参数%T也能打印数据类型。 package mainimport "fmt" import "reflect" import "io&…...

Docker 中 MySQL 的部署与管理

目录 一、Docker 中部署 MySQL1.1 部署 MySQL1.2 进入容器并创建数据库1.3 Navicat 可视化工具连接 二、可能存在的问题2.1 1130 - Host ‘172.17.0.1‘ is not allowed to connect to this MySQL server 参考资料 一、Docker 中部署 MySQL 1.1 部署 MySQL 首先&#xff0c;从…...

基础练习题之函数

前言 这些题目来自与一些刷题网站,以及c primer plus,继续练习 第一题 给你一个数&#xff0c;让他进行巴啦啦能量&#xff0c;沙鲁沙鲁&#xff0c;小魔仙大变身&#xff0c;如果进行变身的数不满足条件的话&#xff0c;就继续让他变身。。。直到满足条件为止。 巴啦啦能量…...

Java NIO浅析

NIO&#xff08;Non-blocking I/O&#xff0c;在Java领域&#xff0c;也称为New I/O&#xff09;&#xff0c;是一种同步非阻塞的I/O模型&#xff0c;也是I/O多路复用的基础&#xff0c;已经被越来越多地应用到大型应用服务器&#xff0c;成为解决高并发与大量连接、I/O处理问题…...

数据挖掘与大数据的结合

随着大数据技术的不断发展和普及&#xff0c;数据挖掘在大数据环境下的应用也变得更加广泛和深入。以下将探讨大数据技术对数据挖掘的影响&#xff0c;以及如何利用大数据技术处理海量数据并进行有效的数据挖掘&#xff0c;同时分析大数据环境下的数据挖掘挑战和解决方案。 1.…...

分布式链路追踪(一)SkyWalking(2)使用

一、使用方法 1、简介 agent探针可以让我们不修改代码的情况下&#xff0c;对Java应用上使用到的组件进行动态监控&#xff0c;获取运行数据发送到OAP上进行统计和存储。agent探针在Java使用中是使用Java agent技术实现。不需要更改任何代码&#xff0c;Java agent会通过虚拟…...

【QT入门】VS2019+QT的开发环境配置

声明&#xff1a;该专栏为本人学习Qt知识点时候的笔记汇总&#xff0c;希望能给初学的朋友们一点帮助(加油&#xff01;) 往期回顾&#xff1a; 【QT入门】什么是qt&#xff0c;发展历史&#xff0c;特征&#xff0c;应用&#xff0c;QtCreator-CSDN博客【QT入门】Windows平台下…...

RTP 控制协议 (RTCP) 反馈用于拥塞控制

摘要 有效的 RTP 拥塞控制算法&#xff0c;需要比标准 RTP 控制协议(RTCP)发送方报告(SR)和接收方报告(RR)数据包提供的关于数据包丢失、定时和显式拥塞通知 (ECN) 标记的更细粒度的反馈。 本文档描述了 RTCP 反馈消息&#xff0c;旨在使用 RTP 对交互式实时流量启用拥塞控制…...

基于SpringBoot SSM vue办公自动化系统

基于SpringBoot SSM vue办公自动化系统 系统功能 登录 个人中心 请假信息管理 考勤信息管理 出差信息管理 行政领导管理 代办事项管理 文档管理 公告信息管理 企业信息管理 会议室信息管理 资产设备管理 员工信息管理 开发环境和技术 开发语言&#xff1a;Java 使用框架: S…...

SpingBoot集成Rabbitmq及Docker部署

文章目录 介绍RabbitMQ的特点Rabbitmq术语消息发布接收流程 Docker部署管理界面说明Overview: 这个页面显示了RabbitMQ服务器的一般信息&#xff0c;例如集群节点的名字、状态、运行时间等。Connections: 在这里&#xff0c;可以查看、管理和关闭当前所有的TCP连接。Channels: …...

子组件自定义事件$emit实现新页面弹窗关闭之后父界面刷新

文章目录 需求弹窗关闭之后父界面刷新展示最新数据 实现方案AVUE 大文本默认展开slotVUE 自定义事件实现 父界面刷新那么如何用呢? 思路核心代码1. 事件定义2. 帕斯卡命名组件且在父组件中引入以及注册3. 子组件被引用与父事件监听4.父组件回调函数 5.按钮弹窗事件 需求 弹窗…...

【框架】跨端开发框架介绍(Windows/MacOS/Linux/Andriod/iOS/H5/小程序)

1. 跨端框架介绍 跨端框架 基本信息 说明 移动端 &#xff08;性能&#xff1a;uniapp < ReactNative < Flutter&#xff09; uniapp 注&#xff1a;weex已经嵌入uniapp 适用范围&#xff1a;Andriod、iOS、H5、国产小程序、快应用 引擎&#xff1a; 所属公司&#x…...

亚马逊云科技 Lambda 运行selenium

有些定时任务需要使用自动化测试的工具&#xff0c;如果使用亚马逊云科技 Lambda来实现这个功能的话&#xff0c;那么就需要图形框架&#xff0c;而我们知道lambda其实是一个虚拟机&#xff0c;而且按照系统级别依赖比较困难。所以这里选择使用容器的形式进行发布。 在dockerf…...

算法——前缀和之除自身以外数组的乘积、和为K的子数组、和可被K整除的子数组、连续数组、矩阵区域和

这几道题对于我们前面讲过的一维、二维前缀和进行了运用,包含了面对特殊情况的反操作 目录 4.除自身以外数组的乘积 4.1解析 4.2题解 5.和为K的子数组 5.1解析 5.2题解 6.和可被K整除的子数组 6.1解析 6.2题解 7.连续数组 7.1题解 7.2题解 8.矩阵区域和 8.1解析 …...

Text-to-SQL 工具Vanna + MySQL本地部署 | 数据库对话机器人

今天我们来重点研究与实测一个开源的Text2SQL优化框架 – Vanna 1. Vanna 简介【Text-to-SQL 工具】 Vanna 是一个基于 MIT 许可的开源 Python RAG&#xff08;检索增强生成&#xff09;框架&#xff0c;用于 SQL 生成和相关功能。它允许用户在数据上训练一个 RAG “模型”&a…...

linux最佳入门(笔记)

1、内核的主要功能 2、常用命令 3、通配符&#xff1a;这个在一些启动文件中很常见 4、输入/输出重定向 意思就是将结果输出到别的地方&#xff0c;例如&#xff1a;ls标准会输出文件&#xff0c;默认是输出到屏幕&#xff0c;但是用>dir后&#xff0c;是将结果输出到dir文…...

加速 PyTorch 模型预测常见方法梳理

目录 1. 使用 GPU 加速 2. 批量推理 3. 使用半精度浮点数 (FP16) 4. 禁用梯度计算 5. 模型简化与量化 6. 使用 TorchScript 7. 模型并行和数据并行 结论 在使用 PyTorch 进行模型预测时&#xff0c;可以通过多种方法来加快推理速度。以下是一些加速模型预测的常用方法&…...

【STM32定时器 TIM小总结】

STM32 TIM详解 TIM介绍定时器类型基本定时器通用定时器高级定时器常用名词时序图预分频时序计数器时序图 定时器中断配置图定时器定时 代码调试 TIM介绍 定时器&#xff08;Timer&#xff09;是微控制器中的一个重要模块&#xff0c;用于生成定时和延时信号&#xff0c;以及处…...

RISC-V 编译环境搭建:riscv-gnu-toolchain 和 riscv-tools

RISC-V 编译环境搭建&#xff1a;riscv-gnu-toolchain 和 riscv-tools 编译环境搭建以及说明 操作系统&#xff1a;什么系统都可以 虚拟机&#xff1a;VMmare Workstation Pro 17.50.x (版本不限) 编译环境&#xff1a;Ubuntu 18.04.5 CPU&#xff1a;i7-8750h(虚拟机分配4核…...

一文速通ESP32(基于MicroPython)——含示例代码

ESP32 简介 ESP32-S3 是一款集成 2.4 GHz Wi-Fi 和 Bluetooth 5 (LE) 的 MCU 芯片&#xff0c;支持远距离模式 (Long Range)。ESP32-S3 搭载 Xtensa 32 位 LX7 双核处理器&#xff0c;主频高达 240 MHz&#xff0c;内置 512 KB SRAM (TCM)&#xff0c;具有 45 个可编程 GPIO 管…...