当前位置: 首页 > article >正文

【RabbitMQ】RabbitMQ的下载安装及使用

安装RabbitMQ

下载网站:https://www.rabbitmq.com/docs/install-windows

在这里插入图片描述

点击后,会直接定位到依赖介绍位置,告诉你需要安装Erlang

在这里插入图片描述

下载Erlang

Erlang也是一种编程语言,只是比较小众,但其拥有极为出色的性能

在这里插入图片描述

这个网站是到GitHub上下载的,可能需要点魔法,也可以去Erlang官网下载(能下,但慢)

在这里插入图片描述

下载RabbitMQ

下载Erlang的同时,也顺便下载RabbitMQ吧

在这里插入图片描述

或者直接使用别人下载好的包,比如我这提供的包

安装Erlang

运行下载好的exe文件

  • 点击Next即可

在这里插入图片描述

  • 选择安装路径,点击Next继续

在这里插入图片描述

  • 点击Install安装

在这里插入图片描述

  • 安装完后,点击Close即可

在这里插入图片描述

安装RabbitMQ

运行下载好的exe文件

  • 点击Next即可

在这里插入图片描述

  • 选择安装路径,点击Install安装

在这里插入图片描述

  • 安装成功后,点击Next继续

在这里插入图片描述

  • 点击Finish完成安装

在这里插入图片描述

安装插件

找到RabbitMQ目录下的sbin目录,打开CMD控制台,输入rabbitmq-plugins.bat enable rabbitmq_management命令

在这里插入图片描述

重启RabbitMQ服务后访问http://localhost:15672

在这里插入图片描述

默认账号密码均为guest

在这里插入图片描述

使用RabbitMQ

  • 导入依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.24.0</version>
</dependency>

一对一队列模型

  • 生产者发送消息
/*** 一对一消息队列模型:生产者*/
public class SingleProducer {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {// 创建连接工厂,用于创建到RabbitMQ服务器的连接ConnectionFactory factory = new ConnectionFactory();// 设置RabbitMQ服务器地址factory.setHost("localhost");// 创建一个连接,用于和RabbitMQ服务器建立通信通道try (Connection connection = factory.newConnection();// 创建一个通道Channel channel = connection.createChannel()) {// 声明一个队列,队列名为hellochannel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";// 将消息发布到指定队列中channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));System.out.println(" [x] Sent '" + message + "'");}}
}

运行后,RabbitMQ管理后台会增加一个队列

在这里插入图片描述

  • 消费者消费消息
/*** 一对一消息队列模型:消费者*/
public class SingleConsumer {// 定义我们正在监听的队列名称private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {// 创建连接工厂并配置连接信息ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");// 从工厂中获取一个新的连接Connection connection = factory.newConnection();// 创建一个新的通道Channel channel = connection.createChannel();// 声明一个队列,在该通道中声明我们要监听的队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 创建一个回调函数,用于处理从队列中接收到的消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 获取消息体并转换为字符串String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] Received '" + message + "'");};// 在通道上开始消费队列中的消息,接收的消息会传递给deliverCallback进行处理,会持续阻塞channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}
}

未运行代码前

在这里插入图片描述

运行代码后

在这里插入图片描述

此时在不中断消费者代码运行的情况下,再去运行生产者代码,会发现消费者会持续消费生产者增加的消息

在这里插入图片描述

一对多队列模型

  • 生产者发送消息
/*** 一对多消息队列:生产者*/
public class MultiProducer {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// durable参数设置为 true,服务器重启后队列不丢失channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);// 此处使用scanner循环输入消息,模拟生产者不定时多次生产Scanner scanner = new Scanner(System.in); while (scanner.hasNext()){String message = scanner.nextLine();// 指定 MessageProperties.PERSISTENT_TEXT_PLAIN,表示消息持久化channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}}
}

运行代码,可以模拟发送多条数据

在这里插入图片描述

  • 消费者消费信息
/*** 一对多消息队列:消费者*/
public class MultiConsumer {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");final Connection connection = factory.newConnection();final Channel channel = connection.createChannel();// 设置持久化channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");channel.basicQos(1);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");try {System.out.println(" [x] Received '" + message + "'");doWork(message);} finally {System.out.println(" [x] Done");// 手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};// 开始消费消息,传入队列名称。关闭自动确认,投递回调和消费者取消回调channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });}// 模拟消息处理,消息中每有一个“.”就让线程暂停10s,模拟复杂的耗时工程private static void doWork(String task) {for (char ch : task.toCharArray()) {if (ch == '.') {try {Thread.sleep(10000);} catch (InterruptedException _ignored) {Thread.currentThread().interrupt();}}}}
}

运行消费者代码

在这里插入图片描述

队列中的消息不是一次性全部接收的,而是需要等待消费者完成消费(处理完事务后)并主动确认完成后,才会继续发送下一条消息

在这里插入图片描述

这与一对多有什么关系呢?不急,我们停掉消费者代码运行先,然后让生产者进行生成消息

在这里插入图片描述

然后,直接拷贝一份消费者代码,命名为MultiConsumer2。此时运行两个消费者看效果

在这里插入图片描述

会发现,两个消费者会轮流接收队列消息,消费完(完成任务)后才会确认并接收新的队列消息,直至队列所有消息被消费完

在这里插入图片描述
下面交换机模型为初学者见解,可能存在理解错误,看看就好,我后面也会深入学习,但大概率不会再修改本文章,所以要专业的、准确的还是得看官方文档,这里会用就行,被误导了概不负责,谢谢

交换机模型

交换机是消息队列中的一个组件,有点类似于中转站,上面两个模型都是生产者创建消息队列,然后由消费者去接收指定消息队列中的消息,而交换机模型中,生产者不再创建指定的消息队列,而是创建一个交换机,由消费者去绑定交换机并创建消息队列,然后再接收生产者的消息。由直接变间接。

这就有点像网络路由一样,最初,两台电脑要互发消息,就必须各自开一个网口连接网线,三台电脑要交互就各开两个网口,随着电脑接入的越多,一台电脑上要的网口就越多,网线交错也就越复杂,这时为了更好梳理网线和减少网口,就有了集线器、交互器、路由器等,而RabbitMQ中的交换机也是同样道理,为了方便管理多个消息队列及其后续变动

在这里插入图片描述

交换机有direct, topic, headersfanout四种类型,因为headers交换机的性能较差,不太推荐使用,了解有该类型即可

Fanout交换机

fanout有扇出的意思,该类型交换机会把消息一次性扇出(发布)给所有与该交换机绑定的消息队列,适用于广播消息,如更新文章后,广播消息给所有订阅文章的用户

在这里插入图片描述

  • 生产者发送消息
/*** 交换机模型:生产者*/
public class FanoutProducer {// 定义交换机的名称private static final String EXCHANGE_NAME = "exchange";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 设置工厂的主机地址factory.setHost("localhost");// 创建一个连接和通道try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {//  声明一个交换机,交换机名称为exchange,类型为fanoutchannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 发布消息Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){// 获取用户输入String message = scanner.nextLine();// 将消息发送到指定的交换机上,交换机名称为exchange,路由键为空,消息属性为null,消息内容为用户输入的字符串channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}}
}

此时运行生产者代码,并输入内容,会发现队列表中没有新增队列

在这里插入图片描述

但可以再Exchanges中看到名为exchange、类型为fanout的交换机信息

在这里插入图片描述

  • 消费者消费消息
/*** 交换机模型:消费者*/
public class FanoutConsumer {// 声明交换机的名称private static final String EXCHANGE_NAME = "exchange";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 设置工厂的主机地址factory.setHost("localhost");// 创建一个连接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明一个交换机,交换机名称为exchange,类型为fanoutchannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// String queueName = channel.queueDeclare().getQueue();// 创建队列,名称为fanout_queue,并绑定到交换机上String queueName = "yg1_queue";channel.queueDeclare(queueName, true, false, false, null);channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [员工1] Received '" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}

运行消费者代码,会发现队列表中新增了yg1_queue队列

在这里插入图片描述

此时拷贝多一份消费者代码,并修改队列名为yg2_queue

在这里插入图片描述

运行后,队列表会新增yg2_queue队列

在这里插入图片描述

此时生产者发送消息后,会同时发送给两位消费者进行处理

在这里插入图片描述

需要注意的是,如果生产者先发送消息,再创建消费者,因为还没有创建存储的消息队列,所以是无法存储消息的,即消费者无法接收队列创建前的旧消息;但如果消费者已经启动过一次了(RabbitMQ中已有其消息队列),那么生产者发送消息后再启动消费者,还是能接收到消息的;

就比如你没有QQ号,那么别人发送不了消息给你,当你创建好QQ号后,无论你是否上线,别人都能发送消息给你

以一对多队列模型为例

  • 我们删除队列表中所有队列

在这里插入图片描述

  • 然后只运行生产者代码,会发现它直接创建好了一个消息队列

在这里插入图片描述

  • 此时发送消息后再启动消费者,消费者是能接收到队列消息的

在这里插入图片描述

因为消息只在消息队列中传递,交换机只是中间件。这里的生产者只创建交换机,不创建队列,队列有消费者创建

在这里插入图片描述

在这里插入图片描述

为什么要用交换机呢? (个人理解)

打个不恰当的比喻:消息队列就是Q群,此时你有n个Q群,你要给每个群都发送一个拼手气红包,让群友去争抢;你自然可以手动一个一个去发,但更好的方式是选择采用脚本(交换机),通过脚本(交换机)去给该账号下的每个群(消息队列)都去发送一个拼手气红包。这样的好处在于,后面不论是有新群还是有群被解散,你都无需理会,你只需在意是否是自己q号上的群(是否绑定在交换机上)

Direct交换机

fanout就像AOE技能,无差别的范围攻击,而Direct就像是指定性单体技能,即使有多个消息队列绑定在其上,也能根据路由键给指定消息队列发送消息,适用于指派任务,通过路由键分发任务给指定消息队列

  • 生产者发送消息
/*** direct交换机模型:生产者*/
public class DirectProducer {// 定义交换机名称private static final String EXCHANGE_NAME = "direct_exchange";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 设置连接工厂的主机地址为本地主机factory.setHost("localhost");// 建立连接并创建通道try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 使用通道声明交换机,类型为directchannel.exchangeDeclare(EXCHANGE_NAME, "direct");Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){// 读取用户输入内容,并以空格分隔String userInput = scanner.nextLine();String[] parts = userInput.split(" ");if(parts.length < 1){continue;}String message = parts[0];// 路由键,用于确定消息被路由到哪个队列String severity = parts[1];// 发布消息到交换机channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));System.out.println(" [生产者] 发送 '" + severity + "':'" + message + "'");}}}
}

运行生产者代码,同样不会生成消息队列而是创建类型为direct的交换机

在这里插入图片描述

  • 消费者消费消息
/*** direct交换机模型:消费者*/
public class DirectConsumer {// 定义我们正在监听的交换机名称private static final String EXCHANGE_NAME = "direct_exchange";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 设置连接工厂的主机地址为本地主机factory.setHost("localhost");// 建立与 RabbitMQ 服务器的连接Connection connection = factory.newConnection();// 创建一个通道Channel channel = connection.createChannel();// 声明一个 direct 类型的交换机channel.exchangeDeclare(EXCHANGE_NAME, "direct");// 声明一个匿名队列,并获取队列名称// String queueName = channel.queueDeclare().getQueue();// 手动声明队列名称String queueName = "cy_queue";// 创建队列,并设置队列的持久化属性为 truechannel.queueDeclare(queueName, true, false, false, null);// 绑定队列到交换机上,并指定绑定路由键为“cy”channel.queueBind(queueName, EXCHANGE_NAME, "cy");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 创建一个 DeliverCallback 实例来处理接收到的消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [残炎] 收到了 '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};// 开始消费队列中的消息,设置自动确认消息已被消费channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}

运行消费者代码,会生成指定名称的队列

在这里插入图片描述

此时,生产者发送消息并指定路由键后,对应的消费者会收到消息,而不属于它 的消息不会收到

在这里插入图片描述

同样的,拷贝一份消费者代码并启动

在这里插入图片描述

生产者每次发送给不同目标发送消息,都会精准无误地转发到指定目标

在这里插入图片描述

其实上面也同样演示了一个问题,那就是发送消息给不存在的路由键目标,也就是还没拷贝第二份消费者(fy)代码时生产者给fy发送的消息,是直接丢弃的。

direct就像是能输入具体发送目标类型的红包脚本,它允许我们自行选择要发送的目标群类型,而不是账号下的所有群,毕竟有些群可能是不相干的人拉你进的,群友与你没有任何瓜葛,你又何必给他们发呢?又或者你只想给自己所有的家族群发(只要你有标记哪些群是家族群)

direct能否给不同队列发送消息?

可以的,官网明确说了,不同消息队列允许绑定相同的路由键,而我们发送消息只关注路由键是否存在,并不在意有几个队列绑定在同一路由键上,所以我们可以将同类型的消息队列绑定在同一路由键上

在这里插入图片描述

Topic交换机

topic交换机与direct交换机类型,也是指定性,只不过它不再是单体指定,而是允许指定多个目标(注意,这里的目标指的是路由键而非具体的消息队列)。

  • 生产者发送消息
/*** topic交换机模型:生产者*/
public class TopicProducer {// 交换机名称private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] argv) throws Exception {// 创建连接工厂并设置连接参数ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");// 建立连接并创建通道try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 使用通道声明交换机,类型为topicchannel.exchangeDeclare(EXCHANGE_NAME, "topic");Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String userInput = scanner.nextLine();String[] parts = userInput.split(" ");if (parts.length < 1) {continue;}String message = parts[0];String routingKey = parts[1];// 发布消息到指定的交换机和路由键channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));System.out.println(" [生产者] 发送 '" + routingKey + "':'" + message + "'");}}}
}

运行生产者代码,同样不会生成消息队列而是创建类型为topic的交换机

在这里插入图片描述

  • 消费者消费消息
/*** topic交换机模型:消费者*/
public class TopicConsumer {// 定义监听的交换机名称private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");// 创建连接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明使用的交换机,并指定类型为topicchannel.exchangeDeclare(EXCHANGE_NAME, "topic");// 创建cy队列String queueName = "cy_queue";// 声明队列,并设置队列未持久化、非排他、非自动删除channel.queueDeclare(queueName, true, false, false, null);// 绑定队列到交换机上,并指定路由键模式为“#.cy.#”channel.queueBind(queueName, EXCHANGE_NAME, "#.cy.#");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [cy组] 收到 '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}

同样拷贝一份消费者代码并运行

在这里插入图片描述

与direct对比,除了routing key采用格式不同外,表面上好像并没有太大的区别

在这里插入图片描述

但topic可以以组合的发送而direct并不能,如下,cy组和fy组可以以任意形式组合,并发送对应消息给他们

在这里插入图片描述

这样,我们就可以一次性指定多个具体目标去处理指定消息

值得注意的一点,topic允许一个消息队列绑定多个绑定键,然后只要匹配其中一个即可收到消息

我们再拷贝一份消费者代码,修改如下,这里我们绑定的路由键包含cy与fy

在这里插入图片描述

此时无论我们给cy发,还是fy发,cyfy组都能收到消息

在这里插入图片描述

且需要注意的是,即使我们同时给cy和fy发消息(项目3 cy.fy),cyfy组都只会收到一条消息,不同出现重复接收的情况

在这里插入图片描述

topic就像是比direct更高级的脚本(官方说的,topic是比direct更复杂),direct这个脚本只能指定一个群类型,假设我除了想给所有家族群发,还想给我创建的群发,或者我管理的群,很显然,这需要我重新设置目标类型并再一次启动脚本。

在这里插入图片描述

这自然没有任何问题,无非就是需要操作多次的问题,可事实上,可能有些家族群是由我创建的或者由我管理的,那么就会出现一个问题,脚本会在某些群里发送多次红包,这很显然不符合我给指定目标群发送一个拼手气红包的目的。topic便是能处理这个问题的脚本,我能一次性设置家族群、我是群主、我是管理员三个类型,然后一次性给我Q号下满足这些类型的群发送一个拼手气红包,且及时有些群满足多个条件,也只会发送一个。

项目实战

项目中有很多种方法使用RabbitMQ,如使用官方的客户端,或者使用别人封装好的客户端,因为我的项目使用的 SpringBoot 框架,所以这里直接选择 Spring官方提供的封装好的客户端:Spring Boot RabbitMQ Starter

  • 导入依赖
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.7.2</version>
</dependency>
  • yml配置
spring:rabbitmq:# rabbitmq 主机地址host: localhost# 端口port: 5672# 登录账号和密码username: guestpassword: guest

在项目使用MQ前,需先创建好交换机和队列

  • MqInitMain
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** 用于创建测试程序用到的交换机和队列(只用在程序启动前执行一次)*/
public class MqInitMain {public static void main(String[] args) {try{// 创建链接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");// 创建连接Connection connection = factory.newConnection();// 创建通道Channel channel = connection.createChannel();// 定义交换机的名称为“code_exchange”String EXCHAMGE_NAME = "code_exchange";// 声明交换机,并指定类型channel.exchangeDeclare(EXCHAMGE_NAME, "direct");// 创建队列,随机分配一个队列名称String queueName = "code_queue";// 声明队列,设置队列持久化、非独占、非自动删除、并传入额外参数为nullchannel.queueDeclare(queueName, true, false, false, null);// 将队列绑定到交换机,指定路由键为“my_routingKey”channel.queueBind(queueName, EXCHAMGE_NAME, "my_routingKey");}catch (Exception e){e.printStackTrace();}}
}

运行后关闭即可,RabbitMQ后台会增加对应交换机和队列

在这里插入图片描述

  • 生产者:MyMessageProducer
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
public class MyMessageProducer {@Resourceprivate RabbitTemplate rabbitTemplate;/*** 发送消息* @param exchange  交换机名称,指定消息要发送到哪个交换机* @param routingKey 路由键,指定消息要根据什么规则路由到对应的队列* @param message 消息内容,要发生的具体消息*/public void sendMessage(String exchange, String routingKey, Object message) {// 使用rabbitTimplate的convertAndSend方法发送消息到指定交换机(exchange)和路由键(routingKey)rabbitTemplate.convertAndSend(exchange, routingKey, message);}
}
  • 消费者:MyMessageConsumer
@Component
@Slf4j
public class MyMessageConsumer {/***** @param message* @param channel* @param deliveryTag*/@SneakyThrows// 使用 @RabbitListener 注解,监听code-queue队列,并设置消息的确认机制为手动确认@RabbitListener(queues = {"code_queue"},ackMode = "MANUAL")public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {log.info("接收到消息:{}", message);// deliveryTag是一个数字标识,在消息消费者接收到消息后用于向RabbitMQ确认消息的处理状态,告知该消息已经被消费,可以进行确认和从队列中删除。channel.basicAck(deliveryTag, false);}
}

需要注意,填写的队列名一定得在RabbitMQ服务中存在,否则会直接报错

在这里插入图片描述

所以推荐把队列名、交换机名、路由键给抽出来作为常数管理

public interface BiMqConstant {// 交换机、队列、路由键信息String BI_EXCHANGE_NAME = "bi_exchange";String BI_QUEUE_NAME = "bi_queue";String BI_ROUTING_KEY = "bi_routingKey";
}

测试

@SpringBootTest
class MyMessageProducerTest {@Resourceprivate MyMessageProducer myMessageProducer;@Testvoid sendMessage() {myMessageProducer.sendMessage("code_exchange", "my_routingKey", "Hello, RabbitMQ!");}
}

消费者监听到队列有消息,就会接收并处理

在这里插入图片描述

当我们路由键写错,RabbitMQ没有对应路由键,RabbitMQ会直接丢弃该消息

在这里插入图片描述

如果本项目没有对应消息队列的消费者,那么也不会去消费该消息队列的消息

项目中使用

  • Controller/server层
@Resource
private BiMessageProducer biMessageProducer;public BaseResponse<BiResponse> genChartByAiAsyncMq(@RequestPart("file") MultipartFile multipartFile,GenChartByAiRequest genChartByAiRequest, HttpServletRequest request){// TODO 处理代码逻辑......// 这里是先将用户信息处理好后,先存入到数据库中,并标准执行状态为等待// 在返回结果前先提交一个任务// 建议处理任务队列满之后,抛异常的情况(因为提交任务报错了,前端会返回异常)biMessageProducer.sendMessage(String.valueOf(newChartId));// TODO 处理代码逻辑......return ResultUtils.success(biResponse);
}
  • BiMessageProducer
/*** 发送消息(固定发送目标)* @param message 消息内容,要发生的具体消息*/
public void sendMessage(Object message) {// 使用rabbitTimplate的convertAndSend方法发送消息到指定交换机(exchange)和路由键(routingKey)rabbitTemplate.convertAndSend(BiMqConstant.BI_EXCHANGE_NAME, BiMqConstant.BI_ROUTING_KEY, message);
}
  • BiMessageConsumer
@SneakyThrows
// 使用 @RabbitListener 注解,监听code-queue队列,并设置消息的确认机制为手动确认
@RabbitListener(queues = {BiMqConstant.BI_QUEUE_NAME},ackMode = "MANUAL")
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {log.info("接收到消息:{}", message);if (StringUtils.isBlank(message)){// 如果更多失败,拒绝当前消息,让消息重新进入队列channel.basicNack(deliveryTag,false,false);throw new BusinessException(ErrorCode.SYSTEM_ERROR,"消息为空");}// TODP 进行业务逻辑处理......// 在需要抛出异常处,即业务处理失败时,拒绝消息if (!updateResult){// 只要不成功,就拒绝消息channel.basicNack(deliveryTag,false,false);handleChartUpdateError(chart.getId(), "更新失败");}// 如果业务处理完了,就确认消息channel.basicAck(deliveryTag, false);
}

当用户提交一个请求后,会发送一个消息到RabbitMQ上。此时不会理会该消息是否被消费,而是直接返回一个等待结果给用户;当有进程空闲时,消费者会开始监听并处理该消息的业务,如果业务处理异常,则拒绝消息,让消息重回队列中等再次被人接收;否则确认消息已被消费。此时用户可在后台中看到请求已完成

相关文章:

【RabbitMQ】RabbitMQ的下载安装及使用

安装RabbitMQ 下载网站&#xff1a;https://www.rabbitmq.com/docs/install-windows 点击后&#xff0c;会直接定位到依赖介绍位置&#xff0c;告诉你需要安装Erlang 下载Erlang Erlang也是一种编程语言&#xff0c;只是比较小众&#xff0c;但其拥有极为出色的性能 这个网站是…...

Stylelint 如何处理 CSS 预处理器

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…...

Word中Ctrl+V粘贴报错问题

Word中CtrlV粘贴时显示“文件未找到&#xff1a;MathPage.WLL”的问题 Word的功能栏中有MathType&#xff0c;但无法使用&#xff0c;显示灰色。 解决方法如下&#xff1a; 首先找到MathType安装目录下MathPage.wll文件以及MathType Commands 2016.dotm文件&#xff0c;分别复…...

jmeter逻辑控制器9

1&#xff0c;简单控制器2&#xff0c;录制控制器3&#xff0c;循环控制器4&#xff0c;随机控制器5&#xff0c;随机顺序控制器6&#xff0c;if控制器7&#xff0c;模块控制器8&#xff0c;Include控制器9&#xff0c;事物控制器本文永久更新地址: 1&#xff0c;简单控制器 不…...

uniapp mqttjs 小程序开发

在UniApp中集成MQTT.js开发微信小程序时&#xff0c;需注意平台差异、协议兼容性及消息处理等问题。以下是关键步骤与注意事项的综合指南&#xff1a; 一、环境配置与依赖安装 安装MQTT.js 推荐使用兼容性较好的版本&#xff1a;mqtt4.1.0&#xff08;H5和小程序兼容性最佳&…...

GitHub Copilot Agent 模式系统提示词

系统提示词 你是一名 AI 编程助手。 当被问及你的名字时&#xff0c;你必须回答“GitHub Copilot”。请严格且完整地遵循用户的要求。 遵守微软内容政策。 避免涉及侵犯版权的内容。如果有人要求你生成有害、仇恨、种族主义、性别歧视、淫秽、暴力或与软件工程完全无关的内容&…...

【设计模式】【行为型模式】模板方法模式(Template Method)

&#x1f44b;hi&#xff0c;我不是一名外包公司的员工&#xff0c;也不会偷吃茶水间的零食&#xff0c;我的梦想是能写高端CRUD &#x1f525; 2025本人正在沉淀中… 博客更新速度 &#x1f4eb; 欢迎V&#xff1a; flzjcsg2&#xff0c;我们共同讨论Java深渊的奥秘 &#x1f…...

w200基于spring boot的个人博客系统的设计与实现

&#x1f64a;作者简介&#xff1a;多年一线开发工作经验&#xff0c;原创团队&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的网站项目。 代码可以查看文章末尾⬇️联系方式获取&#xff0c;记得注明来意哦~&#x1f339;赠送计算机毕业设计600个选题excel文…...

React 生命周期函数详解

React 组件在其生命周期中有多个阶段&#xff0c;每个阶段都有特定的生命周期函数&#xff08;Lifecycle Methods&#xff09;。这些函数允许你在组件的不同阶段执行特定的操作。以下是 React 组件生命周期的主要阶段及其对应的生命周期函数&#xff0c;并结合了 React 16.3 的…...

docker grafana安装

mkdir /root/grafana-storage chmod 777 -R /root/grafana-storage docker run -d -p 3000:3000 --namedocker-apisix-grafana-1 --network docker-apisix_apisix -v /root/grafana-storage:/var/lib/grafana grafana/grafana:9.1.0 浏览器访问&#xff1a; http://192.…...

H5+CSS+JS制作好看的轮播图

先来看效果 点击下方按钮可以做到平滑切换轮播&#xff0c;轮播图片可以根据自定义随心变化。 先来看一下页面代码结构 <div class"container"><div class"lunbo-wrap"><div id"slide"></div><div class"butto…...

aio-pika 快速上手(Python 异步 RabbitMQ 客户端)

目录 简介官方文档如何使用 简介 aio-pika 是一个 Python 异步 RabbitMQ 客户端。5.0.0 以前 aio-pika 基于 pika 进行封装&#xff0c;5.0.0 及以后使用 aiormq 进行封装。 https://github.com/mosquito/aio-pikahttps://pypi.org/project/aio-pika/ pip install aio-pika官…...

表单与交互:HTML表单标签全面解析

目录 前言 一.HTML表单的基本结构 基本结构 示例 二.常用表单控件 文本输入框 选择控件 文件上传 按钮 综合案例 三.标签的作用 四.注意事项 前言 HTML&#xff08;超文本标记语言&#xff09;是构建网页的基础&#xff0c;其中表单&#xff08;<form>&…...

非递减子序列(力扣491)

这道题的难点依旧是去重&#xff0c;但是与之前做过的子集类问题的区别就是&#xff0c;这里是求子序列&#xff0c;意味着我们不能先给数组中的元素排序。因为子序列中的元素的相对位置跟原数组中的相对位置是一样的&#xff0c;如果我们改变数组中元素的顺序&#xff0c;子序…...

Python基础-元组tuple的学习

在 Python 中&#xff0c;元组&#xff08;tuple&#xff09;是一种不可变的序列类型&#xff0c;允许存储不同类型的元素。元组非常类似于列表&#xff08;list&#xff09;&#xff0c;但与列表不同的是&#xff0c;元组一旦创建&#xff0c;就不能修改其内容。 1 元组的创建…...

Vue与Konva:解锁Canvas绘图的无限可能

前言 在现代Web开发中&#xff0c;动态、交互式的图形界面已成为提升用户体验的关键要素。Vue.js&#xff0c;作为一款轻量级且高效的前端框架&#xff0c;凭借其响应式数据绑定和组件化开发模式&#xff0c;赢得了众多开发者的青睐。而当Vue.js邂逅Konva.js&#xff0c;两者结…...

如何修改DNS解析?

DNS(域名系统)就像互联网的“电话簿”&#xff0c;负责将我们输入的网址转换为计算机能够理解的IP地址。如果DNS解析出现问题&#xff0c;访问网站就会受到影响。那我们该如何修改DNS解析呢?接下来&#xff0c;我们就来介绍一下这个话题。 为什么要修改DNS解析? 使用默认的…...

go语言文件和目录

打开和关闭文件 os.Open()函数能够打开一个文件&#xff0c;返回一个*File 和一个 err。操作完成文件对象以后一定要记得关闭文件。 package mainimport ("fmt""os" )func main() {// 只读方式打开当前目录下的 main.go 文件file, err : os.Open(".…...

Solidity09 Solidity构造函数和修饰器

文章目录 一、构造函数二、修饰器三、OpenZeppelin的Ownable标准实现四、Remix 演示示例五、代码示例 这一讲&#xff0c;我们将用合约权限控制&#xff08; Ownable&#xff09;的例子介绍 Solidity语言中构造函数&#xff08; constructor&#xff09;和独有的修饰器&…...

ES6 Map 数据结构是用总结

1. Map 基本概念 Map 是 ES6 提供的新的数据结构&#xff0c;它类似于对象&#xff0c;但是"键"的范围不限于字符串&#xff0c;各种类型的值&#xff08;包括对象&#xff09;都可以当作键。Map 也可以跟踪键值对的原始插入顺序。 1.1 基本用法 // 创建一个空Map…...

使用wpa_supplicant和wpa_cli 扫描wifi热点及配网

一&#xff1a;简要说明 交叉编译wpa_supplicant工具后会有wpa_supplicant和wpa_cli两个程序生产&#xff0c;如果知道需要连接的wifi热点及密码的话不需要遍历及查询所有wifi热点的名字及信号强度等信息的话&#xff0c;使用wpa_supplicant即可&#xff0c;否则还需要使用wpa_…...

json转excel,在excel内导入json, json-to-excel插件

简介 JSON 转 Excel 是一款 Microsoft Excel 插件&#xff0c;可将 JSON 数据转换为 Excel 表格。 要求 此插件适用于以下环境&#xff1a;Excel 2013 Service Pack 1 或更高版本、Excel 2016 for Mac、Excel 2016 或更高版本、Excel Online。 快速开始 本快速开始指南适用…...

大模型应用与实战:专栏概要与内容目录

文章目录 大模型应用与实战&#x1f4da; 核心内容模块一、大模型推理与部署1.1 推理框架应用实践1.2 框架源码深度解析1.3 高并发部署优化1.4 国产化平台适配 二、Agent框架专题2.1 Langchain系列2.2 Qwen-Agent系列2.3 Dify应用实践2.4 框架对比与迁移 三、微调技术研究3.1 微…...

ZooKeeper 的典型应用场景:从概念到实践

引言 在分布式系统的生态中&#xff0c;ZooKeeper 作为一个协调服务框架&#xff0c;扮演着至关重要的角色。它的设计目的是提供一个简单高效的解决方案来处理分布式系统中常见的协调问题。本文将详细探讨 ZooKeeper 的典型应用场景&#xff0c;包括但不限于配置管理、命名服务…...

Arbess基础教程-创建流水线

Arbess(谐音阿尔卑斯) 是一款开源免费的 CI/CD 工具&#xff0c;本文将介绍如何使用 Arbess 配置你的第一条流水线&#xff0c;以快速入门上手。 1. 创建流水线 根据不同需求来创建不同的流水线。 1.1 配置基本信息 配置流水线的基本信息&#xff0c;如分组&#xff0c;环境&…...

科普书《从一到无穷大》的科普知识推翻百年集论

科普书《从一到无穷大》的科普知识推翻百年集论 黄小宁 “我们给两组无穷大数列中的各个数一一配对&#xff0c;如果最后这两组都一个不剩&#xff0c;这两组无穷大就是相等的&#xff1b;如果有一组还有些数没有配出去&#xff0c;这一组就比另一组大些&#xff0c;或者说强些…...

【键盘识别】实例分割

第一步 键盘检测 方案一 canny边缘检测 canny边缘检测检测结果不稳定,容易因为复杂背景或光线变换检测出其他目标。 如图是用canny边缘检测方法标出的检测出的边缘的四个红点。 参考的是这篇文章OpenCV实战之三 | 基于OpenCV实现图像校正_opencv 图像校正-CSDN博客 方案二…...

25/2/7 <机器人基础>雅可比矩阵计算 雅可比伪逆

雅可比矩阵计算 雅可比矩阵的定义 假设我们有一个简单的两个关节的平面机器人臂&#xff0c;其末端执行器的位置可以表示为&#xff1a; 其中&#xff1a; L1​ 和 L2 是机器人臂的长度。θ1​ 和 θ2是关节的角度。 计算雅可比矩阵 雅可比矩阵 JJ 的定义是将关节速度与末…...

apisix的real-ip插件使用说明

k8s集群入口一般都需要过负载均衡&#xff0c;然后再到apisix。 这时候如果后台业务需要获取客户端ip&#xff0c;可能拿到的是lb或者网关的内网ip。 这里一般要获取真实ip需要做几个处理。 1. 负载均衡上&#xff0c;一般支持配置获取真实ip参数&#xff0c;需要配置上。然…...

位图的深入解析:从数据结构到图像处理与C++实现

在学习优选算法课程的时候&#xff0c;博主学习位运算了解到位运算的这个概念&#xff0c;之前没有接触过&#xff0c;就查找了相关的资料&#xff0c;丰富一下自身&#xff0c;当作课外知识来了解一下。 位图&#xff08;Bitmap&#xff09;是一种用于表示图像的数据结构&…...