当前位置: 首页 > news >正文

【RabbitMQ消息队列原理与应用】

RabbitMQ消息队列原理与应用

一、消息队列概述

(一)概念

消息队列(Message Queue,简称MQ)是一种应用程序间的通信方式,它允许应用程序通过将消息放入队列中,而不是直接调用其他应用程序的接口,实现应用程序间的解耦。发送消息的应用程序称为生产者(Producer),接收消息的应用程序称为消费者(Consumer),消息在队列(Queue)中暂存,直到被消费者消费。

(二)作用

1. 解耦
  • 在分布式系统中,不同服务之间可能存在复杂的依赖关系,使用消息队列可以将这些服务解耦。例如,在一个电商系统中,订单服务产生订单后,需要通知库存服务和物流服务,若不使用消息队列,订单服务可能需要直接调用库存服务和物流服务的接口。而使用消息队列后,订单服务只需将消息发送到队列,库存服务和物流服务作为消费者从队列中获取消息,这样它们之间就不需要直接调用,减少了服务间的耦合,使得系统更易于维护和扩展。从源码角度来看,生产者只关心将消息发送到队列,消费者只关心从队列接收消息,它们通过队列这个中介实现了间接通信,而无需知晓对方的具体实现。
2. 异步处理
  • 一些操作可能比较耗时,如发送邮件、短信通知等,使用消息队列可以将这些操作异步化。生产者将消息发送到队列后即可继续处理后续业务,而不必等待这些耗时操作完成。在 RabbitMQ 的 Channel 类中,生产者使用 basicPublish() 方法将消息发送到队列,此过程是异步的,发送完消息后,生产者的线程可以立即处理其他任务,而耗时的消息处理工作则由消费者在其他时间处理。
3. 流量削峰
  • 在高并发场景下,如电商平台的秒杀活动,可能会瞬间产生大量请求,使用消息队列可以将这些请求暂存到队列中,消费者按照自己的处理能力从队列中取出消息进行处理,避免系统因瞬间流量过大而崩溃。在 RabbitMQ 的架构中,队列起到缓冲作用,将高峰流量存储下来,再由消费者慢慢消化,避免后端服务直接承受高并发压力。

(三)应用场景

1. 分布式系统集成
  • 不同服务间通过消息队列进行通信和协作,实现系统的分布式部署和松耦合,如在微服务架构中,各个微服务可通过消息队列进行事件通知、数据传递等。例如,用户注册服务完成用户注册后,可将消息发送到消息队列,其他服务(如积分服务、通知服务等)可根据这些消息进行相应操作。
2. 日志处理
  • 将日志信息发送到消息队列,然后由专门的日志处理服务进行处理和存储,避免日志处理影响主要业务逻辑。多个服务作为生产者将日志消息发送到队列,日志处理服务作为消费者从队列中拉取消息,将日志存储到数据库或文件系统中。

二、RabbitMQ 的安装与配置

(一)安装过程

使用 Docker 安装:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

这个命令启动了一个 RabbitMQ 容器,同时开启了管理界面(端口 15672)和消息队列服务端口(5672),方便管理和使用 RabbitMQ。

详细安装过程请参考这篇文章 RabbitMQ实战

(二)代码示例:核心概念与操作

1. 发送消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class RabbitMQProducer {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello, RabbitMQ!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}
}

代码解释:

  • ConnectionFactory 用于创建连接 RabbitMQ 的工厂,设置了 RabbitMQ 服务器的主机地址、端口以及用户名和密码。
    connection.createChannel() 创建了一个通信通道,通过该通道可以进行消息的发送和接收操作。
  • channel.queueDeclare(QUEUE_NAME, false, false, false, null) 声明了一个队列,这里的参数依次表示队列名称、是否持久化、是否独占、是否自动删除、队列的其他属性。
  • channel.basicPublish(“”, QUEUE_NAME, null, message.getBytes(“UTF-8”)) 将消息发送到队列,第一个参数是交换机名称(这里为空表示使用默认交换机),第二个参数是队列名称,第三个参数是消息属性,最后是消息的字节数组表示。
2. 接收消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.ConnectionFactory;public class RabbitMQConsumer {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

代码解释:

  • 同样使用 ConnectionFactory 创建连接和通道。
  • channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {}) 用于消费消息,第一个参数是队列名称,第二个参数表示是否自动确认消费(这里设置为 true),第三个参数是消息到达时的回调函数,最后是取消消费的回调函数。

(三)核心概念

  1. 交换机(Exchange)
    交换机是消息发送的入口,根据路由键(Routing Key)将消息路由到不同的队列。在 RabbitMQ 的 Exchange 类中,负责消息的分发,不同类型的交换机有不同的路由策略。
  2. 队列(Queue)
    存储消息的地方,在 Queue 类中,负责消息的存储和管理,队列可以设置持久化、独占、自动删除等属性,不同属性会影响队列的存储和使用方式。
  3. 绑定(Binding)
    绑定将交换机和队列联系起来,通过路由键将消息从交换机路由到队列,在 Binding 类中,存储了交换机、队列和路由键的绑定关系,根据这个关系实现消息的路由。

(四)不同类型的交换机及应用场景

1. 直连交换机(Direct Exchange)
  • 特点:根据消息的路由键将消息路由到相应的队列,消息的路由键必须完全匹配队列绑定的路由键。在 DirectExchange 类中,会根据路由键进行精确匹配,找到对应的队列。
  • 应用场景:适用于需要精确路由的场景,如订单处理系统中,根据订单类型(如普通订单、团购订单等)将消息路由到不同队列,由不同的服务处理不同类型的订单。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class DirectExchangeProducer {private final static String EXCHANGE_NAME = "direct_exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "direct");String message = "Direct Exchange Message";String routingKey = "order.normal";channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "' with routing key '" + routingKey + "'");}}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.ConnectionFactory;public class DirectExchangeConsumer {private final static String EXCHANGE_NAME = "direct_exchange";private final static String QUEUE_NAME = "normal_order_queue";private final static String ROUTING_KEY = "order.normal";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "direct");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}
2. 主题交换机(Topic Exchange)
  • 特点:使用通配符(* 表示一个单词,# 表示零个或多个单词)的路由键,更灵活地路由消息。在 TopicExchange 类中,会根据通配符匹配路由键,将消息路由到多个队列。
  • 应用场景:适用于更灵活的路由场景,如日志系统,根据日志级别(如 *.info、#.error 等)将不同级别的日志消息路由到不同队列。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class TopicExchangeProducer {private final static String EXCHANGE_NAME = "topic_exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "topic");String message = "Topic Exchange Message";String routingKey = "log.info";channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "' with routing key '" + routingKey + "'");}}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.ConnectionFactory;public class TopicExchangeConsumer {private final static String EXCHANGE_NAME = "topic_exchange";private final static String QUEUE_NAME = "info_log_queue";private final static String ROUTING_KEY = "log.info";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "topic");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}
3. 扇出交换机(Fanout Exchange)
  • 特点:将消息广播到所有绑定的队列,不考虑路由键。在 FanoutExchange 类中,会将收到的消息复制到所有绑定的队列中。
  • 应用场景:适用于需要广播消息的场景,如消息通知系统,将一条消息同时发送给多个服务或用户,如在一个在线教育平台,当课程更新时,将更新消息发送给所有订阅该课程的用户。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class FanoutExchangeProducer {private final static String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String message = "Fanout Exchange Message";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.ConnectionFactory;public class FanoutExchangeConsumer {private final static String EXCHANGE_NAME = "fanout_exchange";private final static String QUEUE_NAME = "fanout_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

三、RabbitMQ 的消息持久化机制与可靠性保证

(一)消息持久化机制

1. 队列持久化
  • 在声明队列时,将 durable 参数设置为 true,可使队列持久化,在 RabbitMQ 重启后队列依然存在。在 Queue 类的 queueDeclare 方法中,会根据 durable 参数将队列信息存储到磁盘上,确保队列在服务器重启后不会丢失。
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
2. 消息持久化
  • 在发送消息时,设置消息的 deliveryMode 属性为 2,表示消息是持久化的,会将消息存储在磁盘上。在 BasicProperties 类中,设置 deliveryMode 可以将消息持久化,这样即使 RabbitMQ 服务器重启,消息也不会丢失。
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class PersistentProducer {private final static String QUEUE_NAME = "persistent_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, true, false, false, null);String message = "Persistent Message";AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().deliveryMode(2).build();channel.basicPublish("", QUEUE_NAME, properties, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}
}

代码解释:

  • ConnectionFactory 用于创建与 RabbitMQ 的连接,通过设置 host、port、username 和 password 来指定连接的相关信息。
  • channel.queueDeclare(QUEUE_NAME, true, false, false, null) 声明了一个持久化队列,这里将 durable 参数设为 true,表示该队列会在 RabbitMQ 服务器重启后仍然存在,确保队列的持久性。
  • AMQP.BasicProperties 类用于设置消息的属性,通过 builder().deliveryMode(2).build() 为消息设置 deliveryMode 为 2,使消息持久化,确保即使在 RabbitMQ 服务器重启或意外关闭的情况下,消息不会丢失。
  • channel.basicPublish(“”, QUEUE_NAME, properties, message.getBytes(“UTF-8”)) 方法将带有持久化属性的消息发送到指定队列,确保消息被存储在磁盘上。

(二)确认机制

1. 生产者确认

生产者可以使用确认机制确保消息已被正确发送到 RabbitMQ 服务器。在 Channel 类中,使用 confirmSelect() 方法开启确认模式,然后通过 waitForConfirms() 或 waitForConfirmsOrDie() 方法等待确认消息。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConfirmListener;public class ConfirmedProducer {private final static String QUEUE_NAME = "confirmed_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.confirmSelect();String message = "Confirmed Message";channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));if (channel.waitForConfirms()) {System.out.println(" [x] Sent '" + message + "' and confirmed");} else {System.out.println(" [x] Sent '" + message + "' but not confirmed");}}}
}

代码解释:

  • channel.confirmSelect() 开启确认模式,使得生产者可以确认消息是否已被服务器接收。
  • channel.basicPublish() 发送消息。
  • channel.waitForConfirms() 等待服务器确认消息,如果收到确认,则消息成功发送;否则,可能出现发送失败的情况。
2. 消费者确认

消费者可以使用确认机制告知 RabbitMQ 消息是否已被成功处理。在 Channel 类中,通过 basicAck() 方法手动确认消息已被成功消费,使用 basicNack() 或 basicReject() 方法表示消息处理失败。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.ConnectionFactory;public class ConfirmedConsumer {private final static String QUEUE_NAME = "confirmed_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");try {// 模拟消息处理过程Thread.sleep(1000);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);System.out.println(" [x] Acknowledged");} catch (Exception e) {channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);System.out.println(" [x] Not acknowledged");}};channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});}
}

代码解释:

  • channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {}) 中,将自动确认设置为 false,需要手动确认消息的消费。
  • deliverCallback 是消息到达时的回调函数,收到消息后,会对消息进行处理(这里模拟处理过程,使用 Thread.sleep(1000))。
  • channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false) 手动确认消息已成功处理,通知 RabbitMQ 可以将消息从队列中移除。
  • channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true) 表示消息处理失败,第三个参数 true 表示重新入队,将消息重新放回队列中,以便再次消费。

(三)消息的可靠性保证策略

1. 持久化与确认机制结合
  • 结合队列和消息的持久化以及生产者和消费者的确认机制,可以大大提高消息传递的可靠性。
  • 生产者发送持久化消息并等待确认,消费者手动确认消息,确保消息在整个生命周期内的可靠性。
2. 备份交换机(Alternate Exchange)

当消息无法路由到任何队列时,可以使用备份交换机将消息路由到备用队列。在 Exchange 类中,设置 alternate-exchange 属性可以将无法路由的消息发送到备份交换机。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class AlternateExchangeProducer {private final static String MAIN_EXCHANGE_NAME = "main_exchange";private final static String BACKUP_EXCHANGE_NAME = "backup_exchange";private final static String QUEUE_NAME = "backup_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(MAIN_EXCHANGE_NAME, "direct", true, false, null);channel.exchangeDeclare(BACKUP_EXCHANGE_NAME, "fanout", true, false, null);channel.queueDeclare(QUEUE_NAME, true, false, false, null);channel.queueBind(QUEUE_NAME, BACKUP_EXCHANGE_NAME, "");channel.exchangeBind(MAIN_EXCHANGE_NAME, BACKUP_EXCHANGE_NAME, "");String message = "Message for Alternate Exchange";channel.basicPublish(MAIN_EXCHANGE_NAME, "non_existing_routing_key", null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}
}

代码解释:

  • channel.exchangeDeclare(MAIN_EXCHANGE_NAME, “direct”, true, false, null) 声明主交换机,设置为持久化。
  • channel.exchangeDeclare(BACKUP_EXCHANGE_NAME, “fanout”, true, false, null) 声明备份交换机,这里使用扇出交换机将消息广播到所有绑定队列。
  • channel.queueDeclare(QUEUE_NAME, true, false, false, null) 声明备份队列,设置为持久化。
  • channel.queueBind(QUEUE_NAME, BACKUP_EXCHANGE_NAME, “”) 将备份队列绑定到备份交换机。
  • channel.exchangeBind(MAIN_EXCHANGE_NAME, BACKUP_EXCHANGE_NAME, “”) 将备份交换机绑定到主交换机,作为备用路由。
  • channel.basicPublish(MAIN_EXCHANGE_NAME, “non_existing_routing_key”, null, message.getBytes(“UTF-8”)) 发送消息到主交换机,但使用了一个不存在的路由键,会导致消息无法路由到正常队列,从而被路由到备份交换机,最终到达备份队列。
3. 死信队列(Dead Letter Queue)

当消息在队列中满足一定条件(如过期、被拒绝、超出队列长度等)时,会被发送到死信队列,方便对这些消息进行后续处理。在 Queue 类中,通过设置 x-dead-letter-exchange 和 x-dead-letter-routing-key 参数将队列关联到死信队列。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class DeadLetterQueueProducer {private final static String QUEUE_NAME = "normal_queue";private final static String DEAD_LETTER_QUEUE_NAME = "dead_letter_queue";private final static String EXCHANGE_NAME = "dead_letter_exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "direct");channel.queueDeclare(DEAD_LETTER_QUEUE_NAME, true, false, false, null);channel.queueBind(DEAD_LETTER_QUEUE_NAME, EXCHANGE_NAME, "dead_letter_key");Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", EXCHANGE_NAME);args.put("x-dead-letter-routing-key", "dead_letter_key");channel.queueDeclare(QUEUE_NAME, true, false, false, args);String message = "Message for Dead Letter Queue";channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}
}

代码解释:

  • channel.exchangeDeclare(EXCHANGE_NAME, “direct”) 声明一个用于死信队列的交换机。
  • channel.queueDeclare(DEAD_LETTER_QUEUE_NAME, true, false, false, null) 声明死信队列并绑定到该交换机。
  • channel.queueBind(DEAD_LETTER_QUEUE_NAME, EXCHANGE_NAME, “dead_letter_key”) 完成绑定。
  • args.put(“x-dead-letter-exchange”, EXCHANGE_NAME) 和 args.put(“x-dead-letter-routing-key”, “dead_letter_key”) 为正常队列设置死信队列相关属性,当正常队列中的消息满足死信条件时,会被路由到死信队列。
  • channel.basicPublish(“”, QUEUE_NAME, null, message.getBytes(“UTF-8”)) 发送消息到正常队列。

四、实际项目案例展示

(一)案例背景

考虑一个在线票务系统,包括用户下单、库存管理、支付处理等服务,这些服务之间需要高效、可靠的消息传递。

(二)架构设计

生产者服务(用户下单):

用户下单后,订单服务作为生产者将订单消息发送到 RabbitMQ 的队列或交换机。使用直连交换机将不同类型的订单(如电影票订单、演出票订单)路由到不同队列。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class TicketOrderProducer {private final static String EXCHANGE_NAME = "ticket_order_exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "direct");String message = "New Ticket Order";String routingKey = "movie_ticket_order";channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "' with routing key '" + routingKey + "'");}}
}
消费者服务(库存管理):

库存管理服务作为消费者,从相应队列接收订单消息,处理订单的库存扣减操作。使用确认机制确保消息可靠处理。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.ConnectionFactory;public class InventoryConsumer {private final static String EXCHANGE_NAME = "ticket_order_exchange";private final static String QUEUE_NAME = "movie_ticket_order_queue";private final static String ROUTING_KEY = "movie_ticket_order";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "direct");channel.queueDeclare(QUEUE_NAME, true, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");try {// 模拟库存扣减操作Thread.sleep(2000);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);System.out.println(" [x] Inventory deducted and acknowledged");} catch (Exception e) {channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);System.out.println(" [x] Inventory deduction failed, not acknowledged");}};channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});}
}
消费者服务(支付处理):

支付服务作为另一个消费者,从相应队列接收订单消息,处理订单的支付操作,同样使用确认机制保证消息的可靠处理。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.ConnectionFactory;public class PaymentConsumer {private final static String EXCHANGE_NAME = "ticket_order_exchange";private final static String QUEUE_NAME = "payment_order_queue";private final static String ROUTING_KEY = "payment_order";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "direct");channel.queueDeclare(QUEUE_NAME, true, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");try {// 模拟支付操作Thread.sleep(1500);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);System.out.println(" [x] Payment processed and acknowledged");} catch (Exception e) {channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);System.out.println(" [x] Payment processing failed, not acknowledged");}};channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});}
}

代码解释:

  • ConnectionFactory 类用于创建与 RabbitMQ 的连接,通过设置 host、port、username 和 password 来配置连接信息。
  • channel.exchangeDeclare(EXCHANGE_NAME, “direct”) 声明一个直连类型的交换机,用于路由消息。
  • channel.queueDeclare(QUEUE_NAME, true, false, false, null) 声明一个持久化的队列,该队列在 RabbitMQ 重启后不会丢失。
  • channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY) 将队列绑定到交换机上,使用 ROUTING_KEY 作为路由键,确保消息能正确路由到该队列。
  • DeliverCallback 是一个回调函数,当消息被接收时会被调用。在这个回调函数中:
    String message = new String(delivery.getBody(), “UTF-8”); 将接收到的消息体转换为字符串。
  • Thread.sleep(1500); 模拟支付操作的耗时。
  • channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 表示成功处理消息后手动确认,通知 RabbitMQ 可以将该消息从队列中移除。
  • channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true); 表示处理消息失败,将消息重新放回队列(第三个参数 true 表示重新入队),以便后续再次处理。
  • channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {}); 开始消费消息,false 表示需要手动确认消息消费,deliverCallback 是消息接收的回调函数,consumerTag -> {} 是取消消费的回调函数(这里未实现具体逻辑)。

(三)性能优化与可靠性保障

1. 性能优化
连接池的使用:

可以使用连接池来管理与 RabbitMQ 的连接,避免频繁创建和关闭连接,提高性能。例如,使用 Apache Commons Pool 等库创建连接池。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;public class RabbitMQConnectionPool {private final static String QUEUE_NAME = "pooled_queue";private final ObjectPool<Channel> channelPool;public RabbitMQConnectionPool() {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");Connection connection;try {connection = factory.newConnection();} catch (Exception e) {throw new RuntimeException("Failed to create connection", e);}GenericObjectPoolConfig<Channel> poolConfig = new GenericObjectPoolConfig<>();poolConfig.setMaxTotal(10);channelPool = new GenericObjectPool<>(new RabbitMQChannelFactory(connection), poolConfig);}public void sendMessage(String message) throws Exception {try (Channel channel = channelPool.borrowObject()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}private static class RabbitMQChannelFactory extends BasePooledObjectFactory<Channel> {private final Connection connection;public RabbitMQChannelFactory(Connection connection) {this.connection = connection;}@Overridepublic Channel create() throws Exception {return connection.createChannel();}@Overridepublic PooledObject<Channel> wrap(Channel channel) {return new DefaultPooledObject<>(channel);}}public static void main(String[] args) throws Exception {RabbitMQConnectionPool pool = new RabbitMQConnectionPool();pool.sendMessage("Message from connection pool");}
}

代码解释:

  • GenericObjectPoolConfig 用于配置连接池的参数,如 setMaxTotal(10) 设置连接池的最大连接数。
  • RabbitMQChannelFactory 是一个自定义的连接工厂,用于创建 Channel 对象。
  • channelPool 是一个 ObjectPool 类型的连接池,通过 GenericObjectPool 实现。
  • sendMessage 方法从连接池获取 Channel 对象,发送消息后将 Channel 对象归还给连接池,避免频繁创建和关闭 Channel,提高性能。
批量发送和接收消息:

生产者可以将多个消息批量发送,减少网络开销。消费者也可以批量处理消息,提高处理效率。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.AMQP.BasicProperties;import java.util.ArrayList;
import java.util.List;public class RabbitMQBatchProducer {private final static String QUEUE_NAME = "batch_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);List<byte[]> messages = new ArrayList<>();for (int i = 0; i < 10; i++) {messages.add(("Batch Message " + i).getBytes("UTF-8"));}channel.confirmSelect();channel.basicPublish("", QUEUE_NAME, new BasicProperties().builder().deliveryMode(2).build(), messages.get(0));for (int i = 1; i < messages.size(); i++) {channel.basicPublish("", QUEUE_NAME, null, messages.get(i));}if (channel.waitForConfirms()) {System.out.println(" [x] Sent batch messages and confirmed");} else {System.out.println(" [x] Failed to send batch messages");}}}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.ConnectionFactory;
import java.util.ArrayList;
import java.util.List;public class RabbitMQBatchConsumer {private final static String QUEUE_NAME = "batch_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);List<String> receivedMessages = new ArrayList<>();DeliverCallback deliverCallback = (consumerTag, delivery) -> {receivedMessages.add(new String(delivery.getBody(), "UTF-8"));if (receivedMessages.size() >= 5) {processBatchMessages(receivedMessages);receivedMessages.clear();}};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}private static void processBatchMessages(List<String> messages) {for (String message : messages) {System.out.println(" [x] Received '" + message + "'");// 在这里可以进行批量消息的处理操作}}
}

代码解释:

在 RabbitMQBatchProducer 中:

  • messages 列表存储要发送的多个消息。
  • channel.confirmSelect() 开启确认模式。
  • 先发送第一个消息并设置属性,后续消息使用 channel.basicPublish(“”, QUEUE_NAME, null, messages.get(i)) 发送,减少属性设置次数。
  • channel.waitForConfirms() 等待确认。
  • 在 RabbitMQBatchConsumer 中:
  • receivedMessages 列表存储接收到的消息,当达到一定数量(这里是 5 条)时,调用 processBatchMessages 进行批量处理。
2. 可靠性保障

持久化、确认机制和重试机制的综合运用:
确保队列和消息的持久化,生产者和消费者都使用确认机制,并结合重试机制处理异常情况,提高系统的可靠性。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.ConfirmListener;
import java.util.concurrent.atomic.AtomicInteger;public class ReliableMessageSystem {private final static String QUEUE_NAME = "reliable_queue";private static final int MAX_RETRIES = 3;public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, true, false, false, null);channel.confirmSelect();AtomicInteger retries = new AtomicInteger(0);channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) {System.out.println(" [x] Message acknowledged");retries.set(0);}@Overridepublic void handleNack(long deliveryTag, boolean multiple) {if (retries.getAndIncrement() < MAX_RETRIES) {try {System.out.println(" [x] Message not acknowledged, retrying...");// 重新发送消息String message = "Retry Message";channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));} catch (Exception e) {System.out.println(" [x] Failed to retry");}} else {System.out.println(" [x] Exceeded max retries");}}});String message = "Reliable Message";channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}
}

代码解释:

  • channel.queueDeclare(QUEUE_NAME, true, false, false, null) 确保队列持久化。
  • channel.confirmSelect() 开启生产者确认模式。
  • AtomicInteger retries 用于记录重试次数。
  • channel.addConfirmListener 添加确认监听器,根据 handleAck 和 handleNack 处理确认和未确认的情况,未确认时根据重试次数进行消息的重新发送。

(四)总结

在分布式系统中,RabbitMQ 作为一种强大的消息队列,通过其丰富的功能和特性,可以帮助我们实现系统的解耦、异步处理和流量削峰,提升系统的性能和可靠性。通过深入理解其核心概念,如交换机、队列、绑定,以及使用不同类型的交换机满足不同的路由需求,我们可以构建出灵活多样的消息传递架构。同时,通过消息持久化、确认机制、备份交换机、死信队列等手段,可以保证消息的可靠性。在实际项目中,我们可以根据不同的业务场景和性能要求,结合性能优化技术(如连接池、批量处理)和可靠性保障策略(如持久化、确认和重试机制),构建出高效、可靠的消息传递系统。
RabbitMQ 的源码实现细节展现了其在消息存储、路由、消费和确认等方面的精细设计,例如在 Channel 类中对消息的发送、接收和确认操作的实现,以及 Queue 类对队列的管理和存储等。通过上述的代码示例和实际案例,希望你能更好地掌握 RabbitMQ 的使用,在分布式系统开发中灵活运用消息队列技术,提高系统的可维护性和可扩展性,满足不同业务场景下的消息传递需求。
需要注意的是,在实际使用 RabbitMQ 时,要根据系统的负载、性能要求和可靠性要求,合理配置和优化各项参数,确保系统在不同的场景下都能稳定、高效地运行。同时,持续关注 RabbitMQ 的官方文档和社区资源,以便更好地应对可能出现的问题和利用新的特性。

相关资料已更新
关注公众号:搜 架构研究站,回复:资料领取,即可获取全部面试题以及1000+份学习资料
在这里插入图片描述

相关文章:

【RabbitMQ消息队列原理与应用】

RabbitMQ消息队列原理与应用 一、消息队列概述 &#xff08;一&#xff09;概念 消息队列&#xff08;Message Queue&#xff0c;简称MQ&#xff09;是一种应用程序间的通信方式&#xff0c;它允许应用程序通过将消息放入队列中&#xff0c;而不是直接调用其他应用程序的接口…...

反欺诈风控体系及策略

本文详细介绍了互联网领域金融信贷行业的反欺诈策略。首先&#xff0c;探讨了反欺诈的定义、重要性以及在当前互联网发展背景下欺诈风险的加剧。接着&#xff0c;分析了反欺诈的主要手段和基础技术&#xff0c;包括对中介和黑产的了解、欺诈风险的具体类型和表现方式&#xff0…...

Mac 12.1安装tiger-vnc问题-routines:CRYPTO_internal:bad key length

背景&#xff1a;因为某些原因需要从本地mac连接远程linxu桌面查看一些内容&#xff0c;必须使用桌面查看&#xff0c;所以ssh无法满足&#xff0c;所以决定安装vnc客户端。 问题&#xff1a; 在mac上通过 brew install tiger-vnc命令安装, 但是报错如下&#xff1a; > D…...

【代码分析】Unet-Pytorch

1&#xff1a;unet_parts.py 主要包含&#xff1a; 【1】double conv&#xff0c;双层卷积 【2】down&#xff0c;下采样 【3】up&#xff0c;上采样 【4】out conv&#xff0c;输出卷积 """ Parts of the U-Net model """import torch im…...

【LLM入门系列】01 深度学习入门介绍

NLP Github 项目&#xff1a; NLP 项目实践&#xff1a;fasterai/nlp-project-practice 介绍&#xff1a;该仓库围绕着 NLP 任务模型的设计、训练、优化、部署和应用&#xff0c;分享大模型算法工程师的日常工作和实战经验 AI 藏经阁&#xff1a;https://gitee.com/fasterai/a…...

安卓系统主板_迷你安卓主板定制开发_联发科MTK安卓主板方案

安卓主板搭载联发科MT8766处理器&#xff0c;采用了四核Cortex-A53架构&#xff0c;高效能和低功耗设计。其在4G网络待机时的电流消耗仅为10-15mA/h&#xff0c;支持高达2.0GHz的主频。主板内置IMG GE832 GPU&#xff0c;运行Android 9.0系统&#xff0c;内存配置选项丰富&…...

关键点检测——HRNet原理详解篇

&#x1f34a;作者简介&#xff1a;秃头小苏&#xff0c;致力于用最通俗的语言描述问题 &#x1f34a;专栏推荐&#xff1a;深度学习网络原理与实战 &#x1f34a;近期目标&#xff1a;写好专栏的每一篇文章 &#x1f34a;支持小苏&#xff1a;点赞&#x1f44d;&#x1f3fc;、…...

25考研总结

11408确实难&#xff0c;25英一直接单科斩杀&#x1f62d; 对过去这一年多备考的经历进行复盘&#xff0c;以及考试期间出现的问题进行思考。 考408的人&#xff0c;政治英语都不能拖到最后&#xff0c;408会惩罚每一个偷懒的人&#xff01; 政治 之所以把政治写在最开始&am…...

网络安全态势感知

一、网络安全态势感知&#xff08;Cyber Situational Awareness&#xff09;是一种通过收集、处理和分析网络数据来理解当前和预测未来网络安全状态的能力。它的目的是提供实时的、安全的网络全貌&#xff0c;帮助组织理解当前网络中发生的事情&#xff0c;评估风险&#xff0c…...

在K8S中,节点状态notReady如何排查?

在kubernetes集群中&#xff0c;当一个节点&#xff08;Node&#xff09;的状态变为NotReady时&#xff0c;意味着该节点可能无法运行Pod或不能正确相应kubernetes控制平面。排查NotReady节点通常涉及以下步骤&#xff1a; 1. 获取基本信息 使用kubectl命令行工具获取节点状态…...

深度学习在光学成像中是如何发挥作用的?

深度学习在光学成像中的作用主要体现在以下几个方面&#xff1a; 1. **图像重建和去模糊**&#xff1a;深度学习可以通过优化图像重建算法来处理模糊图像或降噪&#xff0c;改善成像质量。这涉及到从低分辨率图像生成高分辨率图像&#xff0c;突破传统光学系统的分辨率限制。 …...

树莓派linux内核源码编译

Raspberry Pi 内核 托管在 GitHub 上&#xff1b;更新滞后于上游 Linux内核&#xff0c;Raspberry Pi 会将 Linux 内核的长期版本整合到 Raspberry Pi 内核中。 1 构建内核 操作系统随附的默认编译器和链接器被配置为构建在该操作系统上运行的可执行文件。原生编译使用这些默…...

本地小主机安装HomeAssistant开源智能家居平台打造个人AI管家

文章目录 前言1. 添加镜像源2. 部署HomeAssistant3. HA系统初始化配置4. HA系统添加智能设备4.1 添加已发现的设备4.2 添加HACS插件安装设备 5. 安装cpolar内网穿透5.1 配置HA公网地址 6. 配置固定公网地址 前言 大家好&#xff01;今天我要向大家展示如何将一台迷你的香橙派Z…...

SpringBoot返回文件让前端下载的几种方式

01 背景 在后端开发中&#xff0c;通常会有文件下载的需求&#xff0c;常用的解决方案有两种&#xff1a; 不通过后端应用&#xff0c;直接使用nginx直接转发文件地址下载&#xff08;适用于一些公开的文件&#xff0c;因为这里不需要授权&#xff09;通过后端进行下载&#…...

人工智能及深度学习的一些题目

1、一个含有2个隐藏层的多层感知机&#xff08;MLP&#xff09;&#xff0c;神经元个数都为20&#xff0c;输入和输出节点分别由8和5个节点&#xff0c;这个网络有多少权重值&#xff1f; 答&#xff1a;在MLP中&#xff0c;权重是连接神经元的参数&#xff0c;每个连接都有一…...

15-利用dubbo远程服务调用

本文介绍利用apache dubbo调用远程服务的开发过程&#xff0c;其中利用zookeeper作为注册中心。关于zookeeper的环境搭建&#xff0c;可以参考我的另一篇博文&#xff1a;14-zookeeper环境搭建。 0、环境 jdk&#xff1a;1.8zookeeper&#xff1a;3.8.4dubbo&#xff1a;2.7.…...

【Rust自学】8.5. HashMap Pt.1:HashMap的定义、创建、合并与访问

8.5.0. 本章内容 第八章主要讲的是Rust中常见的集合。Rust中提供了很多集合类型的数据结构&#xff0c;这些集合可以包含很多值。但是第八章所讲的集合与数组和元组有所不同。 第八章中的集合是存储在堆内存上而非栈内存上的&#xff0c;这也意味着这些集合的数据大小无需在编…...

未来网络技术的新征程:5G、物联网与边缘计算(10/10)

一、5G 网络&#xff1a;引领未来通信新潮流 &#xff08;一&#xff09;5G 网络的特点 高速率&#xff1a;5G 依托良好技术架构&#xff0c;提供更高的网络速度&#xff0c;峰值要求不低于 20Gb/s&#xff0c;下载速度最高达 10Gbps。相比 4G 网络&#xff0c;5G 的基站速度…...

LLM(十二)| DeepSeek-V3 技术报告深度解读——开源模型的巅峰之作

近年来&#xff0c;大型语言模型&#xff08;LLMs&#xff09;的发展突飞猛进&#xff0c;逐步缩小了与通用人工智能&#xff08;AGI&#xff09;的差距。DeepSeek-AI 团队最新发布的 DeepSeek-V3&#xff0c;作为一款强大的混合专家模型&#xff08;Mixture-of-Experts, MoE&a…...

Uniapp在浏览器拉起导航

Uniapp在浏览器拉起导航 最近涉及到要在浏览器中拉起导航&#xff0c;对目标点进行路线规划等功能&#xff0c;踩了一些坑&#xff0c;找到了使用方法。&#xff08;浏览器拉起&#xff09; 效果展示 可以拉起三大平台及苹果导航 点击选中某个导航&#xff0c;会携带经纬度跳转…...

手游刚开服就被攻击怎么办?如何防御DDoS?

开服初期是手游最脆弱的阶段&#xff0c;极易成为DDoS攻击的目标。一旦遭遇攻击&#xff0c;可能导致服务器瘫痪、玩家流失&#xff0c;甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案&#xff0c;帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...

uni-app学习笔记二十二---使用vite.config.js全局导入常用依赖

在前面的练习中&#xff0c;每个页面需要使用ref&#xff0c;onShow等生命周期钩子函数时都需要像下面这样导入 import {onMounted, ref} from "vue" 如果不想每个页面都导入&#xff0c;需要使用node.js命令npm安装unplugin-auto-import npm install unplugin-au…...

从深圳崛起的“机器之眼”:赴港乐动机器人的万亿赛道赶考路

进入2025年以来&#xff0c;尽管围绕人形机器人、具身智能等机器人赛道的质疑声不断&#xff0c;但全球市场热度依然高涨&#xff0c;入局者持续增加。 以国内市场为例&#xff0c;天眼查专业版数据显示&#xff0c;截至5月底&#xff0c;我国现存在业、存续状态的机器人相关企…...

【机器视觉】单目测距——运动结构恢复

ps&#xff1a;图是随便找的&#xff0c;为了凑个封面 前言 在前面对光流法进行进一步改进&#xff0c;希望将2D光流推广至3D场景流时&#xff0c;发现2D转3D过程中存在尺度歧义问题&#xff0c;需要补全摄像头拍摄图像中缺失的深度信息&#xff0c;否则解空间不收敛&#xf…...

376. Wiggle Subsequence

376. Wiggle Subsequence 代码 class Solution { public:int wiggleMaxLength(vector<int>& nums) {int n nums.size();int res 1;int prediff 0;int curdiff 0;for(int i 0;i < n-1;i){curdiff nums[i1] - nums[i];if( (prediff > 0 && curdif…...

智能分布式爬虫的数据处理流水线优化:基于深度强化学习的数据质量控制

在数字化浪潮席卷全球的今天&#xff0c;数据已成为企业和研究机构的核心资产。智能分布式爬虫作为高效的数据采集工具&#xff0c;在大规模数据获取中发挥着关键作用。然而&#xff0c;传统的数据处理流水线在面对复杂多变的网络环境和海量异构数据时&#xff0c;常出现数据质…...

JVM虚拟机:内存结构、垃圾回收、性能优化

1、JVM虚拟机的简介 Java 虚拟机(Java Virtual Machine 简称:JVM)是运行所有 Java 程序的抽象计算机,是 Java 语言的运行环境,实现了 Java 程序的跨平台特性。JVM 屏蔽了与具体操作系统平台相关的信息,使得 Java 程序只需生成在 JVM 上运行的目标代码(字节码),就可以…...

MFC 抛体运动模拟:常见问题解决与界面美化

在 MFC 中开发抛体运动模拟程序时,我们常遇到 轨迹残留、无效刷新、视觉单调、物理逻辑瑕疵 等问题。本文将针对这些痛点,详细解析原因并提供解决方案,同时兼顾界面美化,让模拟效果更专业、更高效。 问题一:历史轨迹与小球残影残留 现象 小球运动后,历史位置的 “残影”…...

LRU 缓存机制详解与实现(Java版) + 力扣解决

&#x1f4cc; LRU 缓存机制详解与实现&#xff08;Java版&#xff09; 一、&#x1f4d6; 问题背景 在日常开发中&#xff0c;我们经常会使用 缓存&#xff08;Cache&#xff09; 来提升性能。但由于内存有限&#xff0c;缓存不可能无限增长&#xff0c;于是需要策略决定&am…...

在鸿蒙HarmonyOS 5中使用DevEco Studio实现企业微信功能

1. 开发环境准备 ​​安装DevEco Studio 3.1​​&#xff1a; 从华为开发者官网下载最新版DevEco Studio安装HarmonyOS 5.0 SDK ​​项目配置​​&#xff1a; // module.json5 {"module": {"requestPermissions": [{"name": "ohos.permis…...