当前位置: 首页 > news >正文

SpringCloud微服务技术栈.黑马跟学(十二)

SpringCloud微服务技术栈.黑马跟学 十二

  • 今日目标
  • 服务异步通信-高级篇
  • 1.消息可靠性
    • 1.1.生产者消息确认
      • 1.1.1.修改配置
      • 1.1.2.定义Return回调
      • 1.1.3.定义ConfirmCallback
    • 1.2.消息持久化
      • 1.2.1.交换机持久化
      • 1.2.2.队列持久化
      • 1.2.3.消息持久化
    • 1.3.消费者消息确认
      • 1.3.1.演示none模式
      • 1.3.2.演示auto模式
    • 1.4.消费失败重试机制
      • 1.4.1.本地重试
      • 1.4.2.失败策略
    • 1.5.总结
  • 2.死信交换机
    • 2.1.初识死信交换机
      • 2.1.1.什么是死信交换机
      • 2.1.2.利用死信交换机接收死信(拓展)
      • 2.1.3.总结
    • 2.2.TTL
      • 2.2.1.接收超时死信的死信交换机
      • 2.2.2.声明一个队列,并且指定TTL
      • 2.2.3.发送消息时,设定TTL
      • 2.2.4.总结
    • 2.3.延迟队列
      • 2.3.1.安装DelayExchange插件
        • 1.安装DelayExchange插件
        • 2.下载插件
        • 3.上传插件
        • 4.安装插件
      • 2.3.2.DelayExchange原理
      • 2.3.3.使用DelayExchange
        • 1)声明DelayExchange交换机
        • 2)发送消息
      • 2.3.4.总结
  • 3.惰性队列
    • 3.1.消息堆积问题
    • 3.2.惰性队列
      • 3.2.1.基于命令行设置lazy-queue
      • 3.2.2.基于@Bean声明lazy-queue
      • 3.2.3.基于@RabbitListener声明LazyQueue
      • 3.3.总结
  • 4.MQ集群
    • 4.1.集群分类
    • 4.2.普通集群
      • 4.2.1.集群结构和特征
      • 4.2.2.部署
        • 1.集群分类
        • 2.获取cookie
        • 3.准备集群配置
        • 4.启动集群
        • 5.测试
          • 6.数据共享测试
          • 7.可用性测试
    • 4.3.镜像集群
      • 4.3.1.集群结构和特征
      • 4.3.2.部署
      • 1.镜像模式的特征
        • 2.镜像模式的配置
        • 3.exactly模式
        • 4.模式
        • 5.nodes模式
        • 6.测试
        • 7.测试数据共享
        • 8.测试高可用
  • 5.仲裁队列
    • 4.4.仲裁队列
      • 4.4.1.集群特征
      • 4.4.2.部署
        • 1.添加仲裁队列
        • 2.测试
        • 3.集群扩容
        • 4.加入集群
        • 5.增加仲裁队列副本
      • 4.4.3.Java代码创建仲裁队列
      • 4.4.4.SpringAMQP连接MQ集群

今日目标

在这里插入图片描述

服务异步通信-高级篇

消息队列在使用过程中,面临着很多实际问题需要思考:
在这里插入图片描述

1.消息可靠性

消息从发送,到消费者接收,会经理多个过程:
在这里插入图片描述
其中的每一步都可能导致消息丢失,常见的丢失原因包括:

  • 发送时丢失:
    • 生产者发送的消息未送达exchange
    • 消息到达exchange后未到达queue
  • MQ宕机,queue将消息丢失
  • consumer接收到消息后未消费就宕机

针对这些问题,RabbitMQ分别给出了解决方案:

  • 生产者确认机制
  • mq持久化
  • 消费者确认机制
  • 失败重试机制

下面我们就通过案例来演示每一个步骤。
首先,导入课前资料提供的demo工程:
在这里插入图片描述
项目结构如下:
在这里插入图片描述
用docker启动即可

docker start mq

要创建一个队列起名simple.queue
在这里插入图片描述
然后在交换机中把amq.topic交换机,和上面创建的队列simple.queue绑定,我们手动配置
在这里插入图片描述
进入amq.topic交换机后,绑定队列
在这里插入图片描述
绑定后如图:
在这里插入图片描述

1.1.生产者消息确认

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。

返回结果有两种方式:

  • publisher-confirm,发送者确认
    • 消息成功投递到交换机,返回ack
    • 消息未投递到交换机,返回nack
  • publisher-return,发送者回执
    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。

在这里插入图片描述
在这里插入图片描述

注意:
在这里插入图片描述

1.1.1.修改配置

首先,修改publisher服务中的application.yml文件,添加下面的内容:

spring:rabbitmq:publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true

说明:

  • publish-confirm-type:开启publisher-confirm,这里支持两种类型:
    • simple:同步等待confirm结果,直到超时
    • correlated⭐:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
  • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

1.1.2.定义Return回调

每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置:

修改publisher服务,添加一个:

package cn.itcast.mq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 投递失败,记录日志log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",replyCode, replyText, exchange, routingKey, message.toString());// 如果有业务需要,可以重发消息});}
}

1.1.3.定义ConfirmCallback

ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同。

在publisher服务的cn.itcast.mq.spring.SpringAmqpTest类中,定义一个单元测试方法:

public void testSendMessage2SimpleQueue() throws InterruptedException {// 1.消息体String message = "hello, spring amqp!";// 2.全局唯一的消息ID,需要封装到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 3.添加callbackcorrelationData.getFuture().addCallback(result -> {if(result.isAck()){// 3.1.ack,消息成功log.debug("消息发送成功, ID:{}", correlationData.getId());}else{// 3.2.nack,消息失败log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());}},ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage()));// 4.发送消息rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);// 休眠一会儿,等待ack回执Thread.sleep(2000);
}

全部配置完后,运行测试类SpringAmqpTest.java,这说明消息发送成功
在这里插入图片描述
然后呢,我们来一个消息发送失败的情况,我们故意填错交换机的名字
在这里插入图片描述
调用后,后台打印日志如下:
在这里插入图片描述
然后我们尝试填错,routingKey看一下
在这里插入图片描述
报错信息如下:
在这里插入图片描述
之后我们恢复代码,都保证正确即可

总结:
SpringAMQP中处理消息确认的几种情况:
● publisher-comfirm:

  • 消息成功发送到exchange,返回ack
  • 消息发送失败,没有到达交换机,返回nack
  • 消息发送过程中出现异常,没有收到回执

● 消息成功发送到exchange, 但没有路由到queue,

  • 调用ReturnCallback

1.2.消息持久化

生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。

要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。

  • 交换机持久化
  • 队列持久化
  • 消息持久化

1.2.1.交换机持久化

RabbitMQ中交换机默认是非持久化的,mq重启后就丢失。

我们通过命令
重启mq

docker restart mq

然后查看队列、交换机的情况,比如我们创建的是持久化队列
在这里插入图片描述
SpringAMQP中可以通过代码指定交换机持久化:

@Bean
public DirectExchange simpleExchange(){// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除return new DirectExchange("simple.direct", true, false);
}

事实上,默认情况下,由SpringAMQP声明的交换机都是持久化的。

可以在RabbitMQ控制台看到持久化的交换机都会带上D的标示:
在这里插入图片描述

1.2.2.队列持久化

RabbitMQ中队列默认是非持久化的,mq重启后就丢失。
SpringAMQP中可以通过代码指定交换机持久化:

我们可以先去mq图形化界面把simple.queue删除

@Bean
public Queue simpleQueue(){// 使用QueueBuilder构建队列,durable就是持久化的return QueueBuilder.durable("simple.queue").build();
}

事实上,默认情况下,由SpringAMQP声明的队列都是持久化的。
可以在RabbitMQ控制台看到持久化的队列都会带上D的标示:
在这里插入图片描述

这些做完后,我们启动ConsumerApplication.java,然后查看mq的图形化界面
交换机是持久的
在这里插入图片描述
队列是持久的
在这里插入图片描述

1.2.3.消息持久化

首先把consumer服务停了,不要消费我们的消息
我们在mq的图形化界面,点击simple.queue队列,然后编辑消息,点击发送
在这里插入图片描述
查看有1条消息
在这里插入图片描述
然后我们重启docker中的mq

docker restart mq

然后再回来看mq的图形化界面,发现队列还在,但是消息没了
在这里插入图片描述

利用SpringAMQP发送消息时,可以设置消息的属性(MessageProperties),指定delivery-mode:

  • 1:非持久化
  • 2:持久化

用java代码指定:
在这里插入图片描述

默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意指定。
运行测试类SpringAmqpTest.java之后,查看mq的图形化界面
在这里插入图片描述
查看一下具体消息
在这里插入图片描述
然后我们重启一下docker的mq容器

docker restart mq

在这里插入图片描述

注意:AMQP中创建的交换机、队列、消息默认都是持久的
交换机:
在这里插入图片描述
队列:
在这里插入图片描述

消息:
在这里插入图片描述
在这里插入图片描述

1.3.消费者消息确认

RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。
而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。

设想这样的场景:

  • 1)RabbitMQ投递消息给消费者
  • 2)消费者获取消息后,返回ACK给RabbitMQ
  • 3)RabbitMQ删除消息
  • 4)消费者宕机,消息尚未处理

这样,消息就丢失了。因此消费者返回ACK的时机非常重要。

而SpringAMQP则允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack。
  • auto⭐:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack。
  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

由此可知:

  • none模式下,消息投递是不可靠的,可能丢失
  • auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
  • manual:自己根据业务情况,判断什么时候该ack

一般,我们都是使用默认的auto即可。

1.3.1.演示none模式

修改consumer服务的application.yml文件,添加下面内容:

spring:rabbitmq:listener:simple:acknowledge-mode: none # 关闭ack

修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理异常:
修改SpringRabbitListener.java

@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {log.info("消费者接收到simple.queue的消息:【{}】", msg);// 模拟异常System.out.println(1 / 0);log.debug("消息处理完成!");
}

测试可以发现,当消息处理抛异常时,消息依然被RabbitMQ删除了。
dubug启动Consumer
发现消息还没接收呢,直接就没了
在这里插入图片描述

在这里插入图片描述
也就是说,消费者虽然接收到了消息,但是假如消费者还没有读取,发生了报错或者宕机,这个消息就会丢失

1.3.2.演示auto模式

再次把确认机制修改为auto:

spring:rabbitmq:listener:simple:acknowledge-mode: auto # 关闭ack

我们去mq的图形化界面创建消息
在这里插入图片描述
发送后,我们看到图形化界面中有1条消息
在这里插入图片描述
IDEA后台因为我们认为写了1/0的错误算数运算,导致IDEA不停重发请求重试消息的推送,这显然也不符合我们的要求
在这里插入图片描述

在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unack(未确定状态):
在这里插入图片描述
抛出异常后,因为Spring会自动返回nack,所以消息恢复至Ready状态,并且没有被RabbitMQ删除:
在这里插入图片描述

1.4.消费失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:
在这里插入图片描述
怎么办呢?

1.4.1.本地重试

我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改consumer服务的application.yml文件,添加内容:

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初始的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 4 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

修改SpringRabbitListener.java
修改为日志打印的形式

    @RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg) {log.debug("消费者接收到simple.queue的消息:【" + msg + "】");System.out.println(1 / 0);log.info("消费者处理消息成功!");}

重启consumer服务,重复之前的测试。可以发现:
在这里插入图片描述

  • 在重试4次后,SpringAMQP会抛出异常

在这里插入图片描述
在这里插入图片描述

AmqpRejectAndDontRequeueException,说明本地重试触发了

  • 查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息了

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回ack,消息会被丢弃

1.4.2.失败策略

在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机⭐
    在这里插入图片描述

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

1)在consumer服务中定义处理失败消息的交换机和队列

@Bean
public DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

2)定义一个RepublishMessageRecoverer,关联队列和交换机

@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

完整代码:

package cn.itcast.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;@Configuration
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

以上配置完之后,我们再重复步骤发送消息
在这里插入图片描述
发送后我们看到失败交换机有了
在这里插入图片描述
队列也有了
在这里插入图片描述
看一下IDEA的后台
在这里插入图片描述
看一下error.queue中的消息,很清晰把错误栈都输出了
在这里插入图片描述

1.5.总结

如何确保RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

2.死信交换机

2.1.初识死信交换机

2.1.1.什么是死信交换机

什么是死信?

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

如果这个包含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)。

如图,一个消息被消费者拒绝了,变成了死信:
在这里插入图片描述
因为simple.queue绑定了死信交换机 dl.direct,因此死信会投递给这个交换机:
在这里插入图片描述

如果这个死信交换机也绑定了一个队列,则消息最终会进入这个存放死信的队列:
在这里插入图片描述

另外,队列将死信投递给死信交换机时,必须知道两个信息:

  • 死信交换机名称
  • 死信交换机与死信队列绑定的RoutingKey

这样才能确保投递的消息能到达死信交换机,并且正确的路由到死信队列。
在这里插入图片描述

2.1.2.利用死信交换机接收死信(拓展)

在失败重试策略中,默认的RejectAndDontRequeueRecoverer会在本地重试次数耗尽后,发送reject给RabbitMQ,消息变成死信,被丢弃。

我们可以给simple.queue添加一个死信交换机,给死信交换机绑定一个队列。这样消息变成死信后也不会丢弃,而是最终投递到死信交换机,路由到与死信交换机绑定的队列。
在这里插入图片描述

我们在consumer服务中,定义一组死信交换机、死信队列:

// 声明普通的 simple.queue队列,并且为其指定死信交换机:dl.direct
@Bean
public Queue simpleQueue2(){return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化.deadLetterExchange("dl.direct") // 指定死信交换机.build();
}
// 声明死信交换机 dl.direct
@Bean
public DirectExchange dlExchange(){return new DirectExchange("dl.direct", true, false);
}
// 声明存储死信的队列 dl.queue
@Bean
public Queue dlQueue(){return new Queue("dl.queue", true);
}
// 将死信队列 与 死信交换机绑定
@Bean
public Binding dlBinding(){return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}

2.1.3.总结

什么样的消息会成为死信?

  • 消息被消费者reject或者返回nack
  • 消息超时未消费
  • 队列满了

死信交换机的使用场景是什么?

  • 如果队列绑定了死信交换机,死信会投递到死信交换机;
  • 可以利用死信交换机收集所有消费者处理失败的消息(死信),交由人工处理,进一步提高消息队列的可靠性。

2.2.TTL

一个队列中的消息如果超时未消费,则会变为死信,超时分为两种情况:

  • 消息所在的队列设置了超时时间
  • 消息本身设置了超时时间
    在这里插入图片描述

2.2.1.接收超时死信的死信交换机

在consumer服务的SpringRabbitListener中,定义一个新的消费者,并且声明 死信交换机、死信队列:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "dl.ttl.queue", durable = "true"),exchange = @Exchange(name = "dl.ttl.direct"),key = "ttl"
))
public void listenDlQueue(String msg){log.info("接收到 dl.ttl.queue的延迟消息:{}", msg);
}

2.2.2.声明一个队列,并且指定TTL

要给队列设置超时时间,需要在声明队列时配置x-message-ttl属性:

@Bean
public Queue ttlQueue(){return QueueBuilder.durable("ttl.queue") // 指定队列名称,并持久化.ttl(10000) // 设置队列的超时时间,10秒.deadLetterExchange("dl.ttl.direct") // 指定死信交换机.build();
}

注意,这个队列设定了死信交换机为dl.ttl.direct

声明交换机,将ttl与交换机绑定:

@Bean
public DirectExchange ttlExchange(){return new DirectExchange("ttl.direct");
}
@Bean
public Binding ttlBinding(){return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}

发送消息,但是不要指定TTL:

@Test
public void testTTLQueue() {// 创建消息String message = "hello, ttl queue";// 消息ID,需要封装到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 发送消息rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);// 记录日志log.debug("发送消息成功");
}

发送消息的日志:
在这里插入图片描述

查看下接收消息的日志:
在这里插入图片描述

因为队列的TTL值是10000ms,也就是10秒。可以看到消息发送与接收之间的时差刚好是10秒。

2.2.3.发送消息时,设定TTL

在发送消息时,也可以指定TTL:

@Test
public void testTTLMsg() {// 创建消息Message message = MessageBuilder.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8)).setExpiration("5000").build();// 消息ID,需要封装到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 发送消息rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);log.debug("发送消息成功");
}

查看发送消息日志:
在这里插入图片描述

接收消息日志:
在这里插入图片描述

这次,发送与接收的延迟只有5秒。说明当队列、消息都设置了TTL时,任意一个到期就会成为死信。

2.2.4.总结

消息超时的两种方式是?

  • 给队列设置ttl属性,进入队列后超过ttl时间的消息变为死信
  • 给消息设置ttl属性,队列接收到消息超过ttl时间后变为死信

如何实现发送一个消息20秒后消费者才收到消息?

  • 给消息的目标队列指定死信交换机
  • 将消费者监听的队列绑定到死信交换机
  • 发送消息时给消息设置超时时间为20秒

2.3.延迟队列

利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。

延迟队列的使用场景包括:

  • 延迟发送短信
  • 用户下单,如果用户在15 分钟内未支付,则自动取消
  • 预约工作会议,20分钟后自动通知所有参会人员

因为延迟队列的需求非常多,所以RabbitMQ的官方也推出了一个插件,原生支持延迟队列效果。

这个插件就是DelayExchange插件。参考RabbitMQ的插件列表页面:https://www.rabbitmq.com/community-plugins.html
在这里插入图片描述

使用方式可以参考官网地址:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

2.3.1.安装DelayExchange插件

参考课前资料:
在这里插入图片描述

1.安装DelayExchange插件

官方的安装指南地址为:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

上述文档是基于linux原生安装RabbitMQ,然后安装插件。

因为我们之前是基于Docker安装RabbitMQ,所以下面我们会讲解基于Docker来安装RabbitMQ插件。

2.下载插件

RabbitMQ有一个官方的插件社区,地址为:https://www.rabbitmq.com/community-plugins.html

其中包含各种各样的插件,包括我们要使用的DelayExchange插件:

在这里插入图片描述

大家可以去对应的GitHub页面下载3.8.9版本的插件,地址为https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9这个对应RabbitMQ的3.8.5以上版本。

课前资料也提供了下载好的插件:
在这里插入图片描述

3.上传插件

因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。如果不是基于Docker的同学,请参考第一章部分,重新创建Docker容器。

我们之前设定的RabbitMQ的数据卷名称为mq-plugins,所以我们使用下面命令查看数据卷:

docker volume inspect mq-plugins

可以得到下面结果:

在这里插入图片描述
接下来,将插件上传到这个目录即可:
在这里插入图片描述

4.安装插件

最后就是安装了,需要进入MQ容器内部来执行安装。我的容器名为mq,所以执行下面命令:

docker exec -it mq bash

执行时,请将其中的 -it 后面的mq替换为你自己的容器名.

进入容器内部后,执行下面命令开启插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

结果如下:
在这里插入图片描述

2.3.2.DelayExchange原理

DelayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,流程如下:

在这里插入图片描述

  • 接收消息
  • 判断消息是否具备x-delay属性
  • 如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间
  • 返回routing not found结果给消息发送者
  • x-delay时间到期后,重新投递消息到指定队列

2.3.3.使用DelayExchange

插件的使用也非常简单:声明一个交换机,交换机的类型可以是任意类型,只需要设定delayed属性为true即可,然后声明队列与其绑定即可。

1)声明DelayExchange交换机

基于注解方式(推荐):
在这里插入图片描述
SpringRabbitListener.java

/*** 延迟交换机和队列** @param message*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"))public void listenDelayExchange(String message) {log.info("消费者接收到了delay.queue的消息" + message);}

也可以基于@Bean的方式:
在这里插入图片描述

2)发送消息

发送消息时,一定要携带x-delay属性,指定延迟的时间:
在这里插入图片描述
SpringAmqpTest.java

    /*** 发送延迟消息** @throws InterruptedException*/@Testpublic void testSendDealyMessage() throws InterruptedException {String routingKey = "delay";// 创建消息Message message = MessageBuilder.withBody("hello, delay message !".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).setHeader("x-delay", 5000).build();// 准备CorrelationDataCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend("delay.direct", routingKey, message, correlationData);}log.info("发送delay消息成功!");

看一下mq的图形界面
在这里插入图片描述

运行测试类SpringAmqpTest.java发送消息,虽然发送delay消息成功,但是下面报了错误
在这里插入图片描述
查看consumer,5秒后接收消息成功
在这里插入图片描述

那这里为什么会报错呢,这是因为delay的交换机是将消息持有了5秒后发送的,这里其实不是报错,而是消息暂存了5秒,过了5秒才发送到队列,那我们能不让它们报错吗,当然可以,我们继续修改。
根据receivedDelay是否有值判断是否重发,有就不重发
修改publisher中的CommonConfig.java
在这里插入图片描述

    @Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplate对象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 配置eturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 判断是否是延迟消息if (message.getMessageProperties().getReceivedDelay() > 0) {// 是一个延迟消息,忽略报错信息return;}// 记录日志log.error("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},routingKey:{},消息:{}",replyCode, replyText, exchange, routingKey, message);// 如果有失败的,可以进行消息的重发});}

再此通过测试类发送消息,报错就没有了
在这里插入图片描述

2.3.4.总结

延迟队列插件的使用步骤包括哪些?

•声明一个交换机,添加delayed属性为true

•发送消息时,添加x-delay头,值为超时时间

3.惰性队列

3.1.消息堆积问题

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题。

在这里插入图片描述
解决消息堆积有三种思路:

  • 增加更多消费者,提高消费速度。也就是我们之前说的work queue模式
  • 在消费者内开启线程池加快消息处理速度
  • 扩大队列容积,提高堆积上限

要提升队列容积,把消息保存在内存中显然是不行的。

3.2.惰性队列

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储

3.2.1.基于命令行设置lazy-queue

而要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。可以通过命令行将一个运行中的队列修改为惰性队列:

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  

命令解读:

  • rabbitmqctl :RabbitMQ的命令行工具
  • set_policy :添加一个策略
  • Lazy :策略名称,可以自定义
  • "^lazy-queue$" :用正则表达式匹配队列的名字
  • '{"queue-mode":"lazy"}' :设置队列模式为lazy模式
  • --apply-to queues :策略的作用对象,是所有的队列

3.2.2.基于@Bean声明lazy-queue

在这里插入图片描述
新建类LazyConfig.java

@Configuration
public class LazyConfig {/*** 惰性队列** @return*/@Beanpublic Queue lazyQueue() {return QueueBuilder.durable("lazy.queue").lazy().build();}/*** 普通队列** @return*/@Beanpublic Queue normalQueue() {return QueueBuilder.durable("normal.queue").build();}
}

启动Consumer服务,查看mq图形界面
在这里插入图片描述
修改SpringAmqpTest.java

    /*** 测试惰性队列** @throws InterruptedException*/@Testpublic void testSendLazyQueue() throws InterruptedException {for (int i = 0; i < 1000000; i++) {String routingKey = "lazy.queue";// 创建消息Message message = MessageBuilder.withBody("hello, lazy queue".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();rabbitTemplate.convertAndSend(routingKey, message);//log.info("发送lazy队列消息成功!");}}/*** 测试惰性队列** @throws InterruptedException*/@Testpublic void testSendNormalQueue() throws InterruptedException {for (int i = 0; i < 1000000; i++) {String routingKey = "normal.queue";// 创建消息Message message = MessageBuilder.withBody("hello, normal queue".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();rabbitTemplate.convertAndSend(routingKey, message);//log.info("发送normal队列消息成功!");}}

我们看到lazy queue中,memory中没有,直接写磁盘,比较稳定
在这里插入图片描述
我们看normal queue中,往内存中写,而且写一会儿就会往磁盘刷新一部分,稳定性不好
在这里插入图片描述

3.2.3.基于@RabbitListener声明LazyQueue

在这里插入图片描述

3.3.总结

消息堆积问题的解决方案?

  • 队列上绑定多个消费者,提高消费速度
  • 使用惰性队列,可以再mq中保存更多消息

惰性队列的优点有哪些?

  • 基于磁盘存储,消息上限高
  • 没有间歇性的page-out,性能比较稳定

惰性队列的缺点有哪些?

  • 基于磁盘存储,消息时效性会降低
  • 性能受限于磁盘的IO

4.MQ集群

4.1.集群分类

RabbitMQ的是基于Erlang语言编写,而Erlang又是一个面向并发的语言,天然支持集群模式。RabbitMQ的集群有两种模式:

普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。

镜像集群:是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。

镜像集群虽然支持主从,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险。因此在RabbitMQ的3.8版本以后,推出了新的功能:仲裁队列来代替镜像集群,底层采用Raft协议确保主从的数据一致性。

4.2.普通集群

4.2.1.集群结构和特征

普通集群,或者叫标准集群(classic cluster),具备下列特征:

  • 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息。
  • 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
  • 队列所在节点宕机,队列中的消息就会丢失

结构如图:

在这里插入图片描述

4.2.2.部署

参考课前资料:《RabbitMQ部署指南.md》

集群部署
接下来,我们看看如何安装RabbitMQ的集群。

1.集群分类

在RabbitMQ的官方文档中,讲述了两种集群的配置方式:

  • 普通模式:普通模式集群不进行数据同步,每个MQ都有自己的队列、数据信息(其它元数据信息如交换机等会同步)。例如我们有2个MQ:mq1,和mq2,如果你的消息在mq1,而你连接到了mq2,那么mq2会去mq1拉取消息,然后返回给你。如果mq1宕机,消息就会丢失。
  • 镜像模式:与普通模式不同,队列会在各个mq的镜像节点之间同步,因此你连接到任何一个镜像节点,均可获取到消息。而且如果一个节点宕机,并不会导致数据丢失。不过,这种方式增加了数据同步的带宽消耗。

我们先来看普通模式集群,我们的计划部署3节点的mq集群:

主机名控制台端口amqp通信端口
mq18081 —> 156728071 —> 5672
mq28082 —> 156728072 —> 5672
mq38083 —> 156728073 —> 5672

集群中的节点标示默认都是:rabbit@[hostname],因此以上三个节点的名称分别为:

  • rabbit@mq1
  • rabbit@mq2
  • rabbit@mq3

2.获取cookie

RabbitMQ底层依赖于Erlang,而Erlang虚拟机就是一个面向分布式的语言,默认就支持集群模式。集群模式中的每个RabbitMQ 节点使用 cookie 来确定它们是否被允许相互通信。

要使两个节点能够通信,它们必须具有相同的共享秘密,称为Erlang cookie。cookie 只是一串最多 255 个字符的字母数字字符。

每个集群节点必须具有相同的 cookie。实例之间也需要它来相互通信。

我们先在之前启动的mq容器中获取一个cookie值,作为集群的cookie。执行下面的命令:

docker exec -it mq cat /var/lib/rabbitmq/.erlang.cookie

可以看到cookie值如下:

FXZMCVGLBIXZCDEMMVZQ

接下来,停止并删除当前的mq容器,我们重新搭建集群。

docker rm -f mq

在这里插入图片描述
清理数据卷

docker volume prune

3.准备集群配置

在/tmp目录新建一个配置文件 rabbitmq.conf:

cd /tmp
# 创建文件
touch rabbitmq.conf

文件内容如下:

loopback_users.guest = false
listeners.tcp.default = 5672
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq1
cluster_formation.classic_config.nodes.2 = rabbit@mq2
cluster_formation.classic_config.nodes.3 = rabbit@mq3

再创建一个文件,记录cookie

cd /tmp
# 创建cookie文件
touch .erlang.cookie
# 写入cookie
echo "FXZMCVGLBIXZCDEMMVZQ" > .erlang.cookie
# 修改cookie文件的权限
chmod 600 .erlang.cookie

准备三个目录,mq1、mq2、mq3:

cd /tmp
# 创建目录
mkdir mq1 mq2 mq3

然后拷贝rabbitmq.conf、cookie文件到mq1、mq2、mq3:

# 进入/tmp
cd /tmp
# 拷贝
cp rabbitmq.conf mq1
cp rabbitmq.conf mq2
cp rabbitmq.conf mq3
cp .erlang.cookie mq1
cp .erlang.cookie mq2
cp .erlang.cookie mq3

4.启动集群

创建一个网络:

docker network create mq-net

docker volume create

运行命令

docker run -d --net mq-net \
-v ${PWD}/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq1 \
--hostname mq1 \
-p 8071:5672 \
-p 8081:15672 \
rabbitmq:3.8-management
docker run -d --net mq-net \
-v ${PWD}/mq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq2 \
--hostname mq2 \
-p 8072:5672 \
-p 8082:15672 \
rabbitmq:3.8-management
docker run -d --net mq-net \
-v ${PWD}/mq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq3 \
--hostname mq3 \
-p 8073:5672 \
-p 8083:15672 \
rabbitmq:3.8-management

5.测试

在mq1这个节点上添加一个队列:
在这里插入图片描述

如图,在mq2和mq3两个控制台也都能看到:
在这里插入图片描述

6.数据共享测试

点击这个队列,进入管理页面:
在这里插入图片描述

然后利用控制台发送一条消息到这个队列:
在这里插入图片描述

结果在mq2、mq3上都能看到这条消息:

在这里插入图片描述

7.可用性测试

我们让其中一台节点mq1宕机:

docker stop mq1

然后登录mq2或mq3的控制台,发现simple.queue也不可用了:
在这里插入图片描述

说明数据并没有拷贝到mq2和mq3。

4.3.镜像集群

4.3.1.集群结构和特征

镜像集群:本质是主从模式,具备下面的特征:

  • 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。
  • 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点。
  • 一个队列的主节点可能是另一个队列的镜像节点
  • 所有操作都是主节点完成,然后同步给镜像节点
  • 主宕机后,镜像节点会替代成新的主

结构如图:
在这里插入图片描述

4.3.2.部署

参考课前资料:《RabbitMQ部署指南.md》
镜像模式

在刚刚的案例中,一旦创建队列的主机宕机,队列就会不可用。不具备高可用能力。如果要解决这个问题,必须使用官方提供的镜像集群方案。

官方文档地址:https://www.rabbitmq.com/ha.html

1.镜像模式的特征

默认情况下,队列只保存在创建该队列的节点上。而镜像模式下,创建队列的节点被称为该队列的主节点,队列还会拷贝到集群中的其它节点,也叫做该队列的镜像节点。

但是,不同队列可以在集群中的任意节点上创建,因此不同队列的主节点可以不同。甚至,一个队列的主节点可能是另一个队列的镜像节点

用户发送给队列的一切请求,例如发送消息、消息回执默认都会在主节点完成,如果是从节点接收到请求,也会路由到主节点去完成。镜像节点仅仅起到备份数据作用

当主节点接收到消费者的ACK时,所有镜像都会删除节点中的数据。

总结如下:

  • 镜像队列结构是一主多从(从就是镜像)
  • 所有操作都是主节点完成,然后同步给镜像节点
  • 主宕机后,镜像节点会替代成新的主(如果在主从同步完成前,主就已经宕机,可能出现数据丢失)
  • 不具备负载均衡功能,因为所有操作都会有主节点完成(但是不同队列,其主节点可以不同,可以利用这个提高吞吐量)

2.镜像模式的配置

镜像模式的配置有3种模式:

ha-modeha-params效果
准确模式exactly队列的副本量count集群中队列副本(主服务器和镜像服务器之和)的数量。count如果为1意味着单个副本:即队列主节点。count值为2表示2个副本:1个队列主和1个队列镜像。换句话说:count = 镜像数量 + 1。如果群集中的节点数少于count,则该队列将镜像到所有节点。如果有集群总数大于count+1,并且包含镜像的节点出现故障,则将在另一个节点上创建一个新的镜像。
all(none)队列在群集中的所有节点之间进行镜像。队列将镜像到任何新加入的节点。镜像到所有节点将对所有群集节点施加额外的压力,包括网络I / O,磁盘I / O和磁盘空间使用情况。推荐使用exactly,设置副本数为(N / 2 +1)。
nodesnode names指定队列创建到哪些节点,如果指定的节点全部不存在,则会出现异常。如果指定的节点在集群中存在,但是暂时不可用,会创建节点到当前客户端连接到的节点。

这里我们以rabbitmqctl命令作为案例来讲解配置语法。

语法示例:

3.exactly模式

rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
  • rabbitmqctl set_policy:固定写法
  • ha-two:策略名称,自定义
  • "^two\.":匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以two.开头的队列名称
  • '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}': 策略内容
    • "ha-mode":"exactly":策略模式,此处是exactly模式,指定副本数量
    • "ha-params":2:策略参数,这里是2,就是副本数量为2,1主1镜像
    • "ha-sync-mode":"automatic":同步策略,默认是manual,即新加入的镜像节点不会同步旧的消息。如果设置为automatic,则新加入的镜像节点会把主节点中所有消息都同步,会带来额外的网络开销

4.模式

rabbitmqctl set_policy ha-all "^all\." '{"ha-mode":"all"}'
  • ha-all:策略名称,自定义
  • "^all\.":匹配所有以all.开头的队列名
  • '{"ha-mode":"all"}':策略内容
    • "ha-mode":"all":策略模式,此处是all模式,即所有节点都会称为镜像节点

5.nodes模式

rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
  • rabbitmqctl set_policy:固定写法
  • ha-nodes:策略名称,自定义
  • "^nodes\.":匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以nodes.开头的队列名称
  • '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}': 策略内容
    • "ha-mode":"nodes":策略模式,此处是nodes模式
    • "ha-params":["rabbit@mq1", "rabbit@mq2"]:策略参数,这里指定副本所在节点名称

6.测试

我们使用exactly模式的镜像,因为集群节点数量为3,因此镜像数量就设置为2.

运行下面的命令:

docker exec -it mq1 rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

下面,我们创建一个新的队列:

在这里插入图片描述

在任意一个mq控制台查看队列:
在这里插入图片描述

7.测试数据共享

给two.queue发送一条消息:
在这里插入图片描述

然后在mq1、mq2、mq3的任意控制台查看消息:

在这里插入图片描述

8.测试高可用

现在,我们让two.queue的主节点mq1宕机:

docker stop mq1

查看集群状态:
在这里插入图片描述

查看队列状态:
在这里插入图片描述

发现依然是健康的!并且其主节点切换到了rabbit@mq2上

5.仲裁队列

从RabbitMQ 3.8版本开始,引入了新的仲裁队列,他具备与镜像队里类似的功能,但使用更加方便。

4.4.仲裁队列

4.4.1.集群特征

仲裁队列:仲裁队列是3.8版本以后才有的新功能,用来替代镜像队列,具备下列特征:

  • 与镜像队列一样,都是主从模式,支持主从数据同步
  • 使用非常简单,没有复杂的配置
  • 主从同步基于Raft协议,强一致

4.4.2.部署

参考课前资料:《RabbitMQ部署指南.md》

仲裁队列
从RabbitMQ 3.8版本开始,引入了新的仲裁队列,他具备与镜像队里类似的功能,但使用更加方便。

1.添加仲裁队列

在任意控制台添加一个队列,一定要选择队列类型为Quorum类型。
在这里插入图片描述
在任意控制台查看队列:
在这里插入图片描述

可以看到,仲裁队列的 + 2字样。代表这个队列有2个镜像节点。

因为仲裁队列默认的镜像数为5。如果你的集群有7个节点,那么镜像数肯定是5;而我们集群只有3个节点,因此镜像数量就是3.

2.测试

可以参考对镜像集群的测试,效果是一样的。

3.集群扩容

4.加入集群

1)启动一个新的MQ容器:

docker run -d --net mq-net \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=itcast \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq4 \
--hostname mq5 \
-p 8074:15672 \
-p 8084:15672 \
rabbitmq:3.8-management

2)进入容器控制台:

docker exec -it mq4 bash

3)停止mq进程

rabbitmqctl stop_app

4)重置RabbitMQ中的数据:

rabbitmqctl reset

5)加入mq1:

rabbitmqctl join_cluster rabbit@mq1

6)再次启动mq进程

rabbitmqctl start_app

在这里插入图片描述

5.增加仲裁队列副本

我们先查看下quorum.queue这个队列目前的副本情况,进入mq1容器:

docker exec -it mq1 bash

执行命令:

rabbitmq-queues quorum_status "quorum.queue"

结果:
在这里插入图片描述
现在,我们让mq4也加入进来:

rabbitmq-queues add_member "quorum.queue" "rabbit@mq4"

结果:
在这里插入图片描述

再次查看:

rabbitmq-queues quorum_status "quorum.queue"

在这里插入图片描述
查看控制台,发现quorum.queue的镜像数量也从原来的 +2 变成了 +3:
在这里插入图片描述

4.4.3.Java代码创建仲裁队列

@Bean
public Queue quorumQueue() {return QueueBuilder.durable("quorum.queue") // 持久化.quorum() // 仲裁队列.build();
}

4.4.4.SpringAMQP连接MQ集群

注意,这里用address来代替host、port方式

spring:rabbitmq:addresses: 192.168.150.105:8071, 192.168.150.105:8072, 192.168.150.105:8073username: itcastpassword: 123321virtual-host: /

相关文章:

SpringCloud微服务技术栈.黑马跟学(十二)

SpringCloud微服务技术栈.黑马跟学 十二今日目标服务异步通信-高级篇1.消息可靠性1.1.生产者消息确认1.1.1.修改配置1.1.2.定义Return回调1.1.3.定义ConfirmCallback1.2.消息持久化1.2.1.交换机持久化1.2.2.队列持久化1.2.3.消息持久化1.3.消费者消息确认1.3.1.演示none模式1.3…...

HashMap集合存储学生对象并遍历

需求&#xff1a;创建一个HashMap集合&#xff0c;键是学生对象&#xff08;Student&#xff09;,值是居住地。存储多个键值对元素&#xff0c;并遍历。 要求保证键的唯一性&#xff1a;如果学生对象的成员变量值相同&#xff0c;我们就认为是同一个对象 思路&#xff1a; 定义…...

“提效”|教你用ChatGPT玩数据

ChatGPT与数据分析&#xff08;二&#xff09; 上文给简单聊了一下为什么ChatGPT不能取代数据分析师&#xff0c;本文我们来深入感受一下如何让GPT帮助数据分析师“提效”。 场景一&#xff1a;SQL取数 背景&#xff1a;多数数据分析师都要用SQL语言从数据库中提取数据&#x…...

https://app.hackthebox.com/machines/Inject

https://app.hackthebox.com/machines/Inject Ref&#xff1a; 1.https://blog.csdn.net/qq_58869808/article/details/129505388 2.https://blog.csdn.net/m0_73998094/article/details/129474782 info collecting ┌──(kwkl㉿kwkl)-[~/HODL/htb/Inject] └─$ nmap -A …...

Java Web 实战 15 - 计算机网络之网络编程套接字

文章目录一 . 网络编程中的基本概念1.1 网络编程1.2 客户端(client) / 服务器(server)1.3 请求(request) / 响应(response)1.4 客户端和服务器之间的交互数据1.4.1 一问一答1.4.2 多问一答1.4.3 一问多答1.4.4 多问多答二 . socket 套接字2.1 UDP 的 Socket API2.1.1 引子2.1.2…...

基于pdf2docx模块Python实现批量将PDF转Word文档(安装+完整代码教程)

PDF文件是一种常见的文档格式&#xff0c;但是在编辑和修改时不太方便&#xff0c;因为PDF本质上是一种静态的文档格式。因此&#xff0c;有时候我们需要将PDF文件转换成Word格式&#xff0c;以便更好地编辑和修改文档。在本篇文章中&#xff0c;我们将介绍如何使用Python实现P…...

3.21~3.22

识编程语言中的&#xff0c;局部变量&#xff0c;全局变量&#xff0c;以及变量生存周期&#xff0c;整形&#xff0c;浮点型数据的内存表示&#xff0c;od的内存窗口的使用 先看一个代码样例 #include<windows.h> #include<stdio.h>#pragma warning(disable:499…...

Chromium 改造实录:增加 MPEG TS 格式支持

在《选择最新 Chromium&#xff0c;支持 H264 / H265》一文中&#xff0c;记录了我通过升级 Chromium 版本解决了 H264 / H265 视频支持难题。然而难题接踵而至&#xff0c;这次的难题是 MPEG TS 流的支持。MPEG2-TS 传输流广泛应用于数字电视广播系统&#xff0c;所以是一个不…...

性能优化之-事件代理

js中的事件委托或是事件代理简单理解 事件委托也叫事件代理&#xff0c;“事件代理”即是把原本需要绑定在子元素的响应事件&#xff08;click、keydown…&#xff09;委托给父元素&#xff0c;让父元素担当事件监听的职务。事件代理的原理是DOM元素的事件冒泡。 概述&#x…...

MSDS 即化学品安全说明书

MSDS 即化学品安全说明书&#xff0c;亦可译为化学品安全技术说明书或化学品安全数据说明书&#xff0c;是化学品生产商和进口商用来阐明化学品的理化特性&#xff08;如PH值&#xff0c;闪点&#xff0c;易燃度&#xff0c;反应活性等&#xff09;以及对使用者的健康&#xff…...

真人手办没法实现网购?我有一个好办法!

记得以前在网上看到过一个冷笑话式的问答&#xff0c;问的是中国最早的手办是什么&#xff0c;有网友回答是秦始皇兵马俑&#xff0c;这个抖机灵式的回答简直妙得让人会心一笑。 你接触过手办吗&#xff1f; 提到手办&#xff0c;大家第一时间想到的&#xff0c;肯定都会是各…...

2019湖南省大学生程序设计竞赛题解(D)

D-Modulo Nine 很妙的类似区间dp&#xff0c; 我自己是想不到&#xff0c;本题解题思路来自学长的博客&#xff1a; 长沙橘子猫 题意 有一个长度为 nnn 的序列&#xff0c;你可以给每个位置填 0∼90\sim90∼9 的一个数&#xff0c;有 mmm 个限制&#xff0c;每个限制 [li,ri…...

【开发】中间件——RocketMQ

分布式消息系统 RocketMQ概念&#xff0c;用途&#xff0c;特性安装RocketMQ掌握RocketMQ的api使用对producer、consumer进行详解了解RocketMQ的存储特点 简介及相关概念JavaAPISpringBoot整合RocketMQ消息的顺序收发消息系统的事务、存储、重试策略消息系统的集群 RocketMQ R…...

36 UnitTest框架 - 参数化

目录 一、参数化环境准备 1、方式一&#xff1a;在终端&#xff08;cmd&#xff09;安装parameterized 2、方式二&#xff1a;在Pycharm中安装parameterized 二、参数化 1、什么事参数化&#xff1f; 2、参数化引入案例 &#xff08;1&#xff09;需求 &#xff08;2&a…...

Qt源码阅读(四) 事件循环

事件系统 文章为本人理解&#xff0c;如有理解不到位之处&#xff0c;烦请各位指正。 文章目录事件系统什么是事件循环&#xff1f;事件是如何产生的&#xff1f;sendEventpostEvent事件是如何处理的&#xff1f;事件循环是怎么遍历的&#xff1f;事件过滤器event夹带私货时间Q…...

银行数字化转型导师坚鹏:银行数字化领导力提升之道

银行数字化领导力提升之道 ——融合中西智慧&#xff0c;践行知行合一思想&#xff0c;实现知行果合一 课程背景&#xff1a; 很多银行存在以下问题&#xff1a;不知道如何领导数字员工&#xff1f;不清楚银行数字化领导力模型的内涵&#xff1f;不知道如何开展银行数字化…...

Vue2 -- 自定义单选内容的单选框组件

自定义单选内容的单选框组件 之前做的一个项目&#xff0c;在项目中有一个关于人员权限分配的功能&#xff0c;给人员指定各个模块的权限信息&#xff0c;分为 write 可写权限read 可读权限none 没有权限 项目要求画面中只显示 W R 两个按钮控制指定权限信息&#xff0c;都不…...

让PyTorch训练速度更快,你需要掌握这17种方法

掌握这 17 种方法&#xff0c;用最省力的方式&#xff0c;加速你的 Pytorch 深度学习训练。近日&#xff0c;Reddit 上一个帖子热度爆表。主题内容是关于怎样加速 PyTorch 训练。原文作者是来自苏黎世联邦理工学院的计算机科学硕士生 LORENZ KUHN&#xff0c;文章向我们介绍了在…...

LeetCode-309. 最佳买卖股票时机含冷冻期

目录题目思路动态规划题目来源 309. 最佳买卖股票时机含冷冻期 题目思路 每天最多只可能有三种状态中的一种 0表示当前处于买入状态(持有股票) 1表示当前处于卖出状态(不持有股票) 2表示当前处于冷冻状态 设dp[i][j]表示i - 1天状态为j时所拥有的最大现金 dp[i][0] Math.ma…...

AUTOSAR知识点Com(七):CANSM初认知

目录 1、概述 2、CanSM主要做什么 2.1、CAN控制器状态管理 2.2、CAN收发器状态管理 2.3、Busoff检测 1、概述 CANSM&#xff08;Controller Area Network State Manager&#xff09;是AUTOSAR&#xff08;Automotive Open System Architecture&#xff09;标准中的一个模块…...

【Java学习笔记】Arrays类

Arrays 类 1. 导入包&#xff1a;import java.util.Arrays 2. 常用方法一览表 方法描述Arrays.toString()返回数组的字符串形式Arrays.sort()排序&#xff08;自然排序和定制排序&#xff09;Arrays.binarySearch()通过二分搜索法进行查找&#xff08;前提&#xff1a;数组是…...

Qt Widget类解析与代码注释

#include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this); }Widget::~Widget() {delete ui; }//解释这串代码&#xff0c;写上注释 当然可以&#xff01;这段代码是 Qt …...

页面渲染流程与性能优化

页面渲染流程与性能优化详解&#xff08;完整版&#xff09; 一、现代浏览器渲染流程&#xff08;详细说明&#xff09; 1. 构建DOM树 浏览器接收到HTML文档后&#xff0c;会逐步解析并构建DOM&#xff08;Document Object Model&#xff09;树。具体过程如下&#xff1a; (…...

【AI学习】三、AI算法中的向量

在人工智能&#xff08;AI&#xff09;算法中&#xff0c;向量&#xff08;Vector&#xff09;是一种将现实世界中的数据&#xff08;如图像、文本、音频等&#xff09;转化为计算机可处理的数值型特征表示的工具。它是连接人类认知&#xff08;如语义、视觉特征&#xff09;与…...

三体问题详解

从物理学角度&#xff0c;三体问题之所以不稳定&#xff0c;是因为三个天体在万有引力作用下相互作用&#xff0c;形成一个非线性耦合系统。我们可以从牛顿经典力学出发&#xff0c;列出具体的运动方程&#xff0c;并说明为何这个系统本质上是混沌的&#xff0c;无法得到一般解…...

【Android】Android 开发 ADB 常用指令

查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...

LabVIEW双光子成像系统技术

双光子成像技术的核心特性 双光子成像通过双低能量光子协同激发机制&#xff0c;展现出显著的技术优势&#xff1a; 深层组织穿透能力&#xff1a;适用于活体组织深度成像 高分辨率观测性能&#xff1a;满足微观结构的精细研究需求 低光毒性特点&#xff1a;减少对样本的损伤…...

9-Oracle 23 ai Vector Search 特性 知识准备

很多小伙伴是不是参加了 免费认证课程&#xff08;限时至2025/5/15&#xff09; Oracle AI Vector Search 1Z0-184-25考试&#xff0c;都顺利拿到certified了没。 各行各业的AI 大模型的到来&#xff0c;传统的数据库中的SQL还能不能打&#xff0c;结构化和非结构的话数据如何和…...

MFE(微前端) Module Federation:Webpack.config.js文件中每个属性的含义解释

以Module Federation 插件详为例&#xff0c;Webpack.config.js它可能的配置和含义如下&#xff1a; 前言 Module Federation 的Webpack.config.js核心配置包括&#xff1a; name filename&#xff08;定义应用标识&#xff09; remotes&#xff08;引用远程模块&#xff0…...

在鸿蒙HarmonyOS 5中使用DevEco Studio实现指南针功能

指南针功能是许多位置服务应用的基础功能之一。下面我将详细介绍如何在HarmonyOS 5中使用DevEco Studio实现指南针功能。 1. 开发环境准备 确保已安装DevEco Studio 3.1或更高版本确保项目使用的是HarmonyOS 5.0 SDK在项目的module.json5中配置必要的权限 2. 权限配置 在mo…...