掌握RabbitMQ:全面知识点汇总与实践指南
前言
RabbitMQ 是基于 AMQP 高级消息队列协议的消息队列技术。
特点:它通过发布/订阅模型,实现了服务间的高度解耦。因为消费者不需要确保提供者的存在。
作用:服务间异步通信;顺序消费;定时任务;请求削峰;
1、AMQP协议定义
AMQP(Advanced Message Queuing Protocol)高级消息队列协议,是一个高效的、跨平台的应用层协议
MQTT(Message Queuing Telemetry Transport)消息队列遥测传输
特性 | AMQP | MQTT |
---|---|---|
适用场景 | 大型企业级应用、金融交易、云服务 | 物联网、移动应用、智能家居 |
通信模式 | 生产者-消费者 | 发布-订阅 |
消息大小 | 较大,适合复杂的消息结构 | 小型,适合简单的消息 |
QoS 级别 | 支持,但不如 MQTT 精细 | 详细的 QoS 级别,特别是针对 IoT 场景 |
性能要求 | 对性能有一定要求,但更注重可靠性和安全性 | 极低的带宽消耗和资源占用 |
安全性 | 强调端到端的安全性 | 支持基本的安全特性,适用于资源受限环境 |
2、AMQP机制
1>AMQP生产者、消费者工作机制
AMQP高级消息队列协议,基于生产者消费者模式,消息基于交换器Exchange、队列Queue、绑定Binding进行路由。
- 生产者发送消息到Broker消息代理服务
- 交换器接收生产者发送的消息,根据预定义规则,分发给一个或多个队列
- 队列存储消息,直到消费者取走消息
- 消费者,读取队列中的消息
AMQP定义了严格的消息结构,使用了类型化数据表示法描述消息内容来兼容不同的系统。
类型化数据表示法(Typed Representation of Data)是指在计算机编程语言中,数据和其相关联的类型信息一起被表示的方法。
2>AMQP消息传递方式
特性 | 点对点模式 (P2P) | 发布/订阅模式 (Pub/Sub) |
---|---|---|
消息传递方式 | 每条消息仅被一个消费者处理 | 一条消息可以被多个消费者同时接收 |
队列数量 | 单个队列 | 每个消费者有自己的队列 |
生产者行为 | 直接发送到队列 | 发送到交换器,由交换器负责路由 |
消费者行为 | 从同一队列中竞争消费 | 各自独立消费自己的队列中的消息 |
适用场景 | 任务分配、工作流管理 | 广播通知、日志记录、事件驱动架构 |
扩展性 | 受限于单个队列的吞吐量 | 可以通过增加更多的消费者来提高整体吞吐量 |
复杂度 | 较低,易于理解和实现 | 需要考虑交换器类型、路由规则等因素,稍微复杂 |
- 1、点对点
生产者将消息发送到一个特定的队列中,而消费者则从该队列中获取消息
每个消息只会被一个消费者处理,即使有多个消费者监听同一个队列。
竞争消费:多个实例尝试处理同一个消息时,可能出现重复消费或消息未及时得到处理的情况。
(1)竞争消费问题
在k8s部署多实例场景下,虽然提升了系统的吞吐量,通过调度器实现了负载均衡,多个实例从一个队列中读取消息,但是并发场景客观存在竞争消费的情况,导致重复消费消息。
(2)解决建议
合理配置消息队列、业务方法幂等性设计、分布式锁控制、增加监控告警和自动恢复动作。
// 生产者代码片段
Channel channel = connection.createChannel();
channel.queueDeclare("task_queue", true, false, false, null);
String message = "Task to be processed";
channel.basicPublish("", "task_queue", null, message.getBytes());// 消费者代码片段
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");// 执行任务...channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume("task_queue", false, deliverCallback, consumerTag -> {});
- 2、发布订阅
生产者将消息发送到一个交换器(Exchange),而不是直接发送到队列。交换器根据预定义的路由规则(Binding Key)将消息转发给一个或多个匹配的队列。每个队列可以有多个消费者订阅,所有订阅者都能收到相同的消息副本
(1)主题分区
为不同类型的时间,创建不同的主题或分区,来减少不必要的复制。实例只订阅感兴趣的主题,降低资源开销
(2)限流
避免过载,限制单位之间内消费的最大消息
// 生产者代码片段
Channel channel = connection.createChannel();
channel.exchangeDeclare("logs_exchange", "fanout");
String message = "Info log message";
channel.basicPublish("logs_exchange", "", null, message.getBytes());// 消费者代码片段
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs_exchange", "");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");// 处理日志...
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
3、AMQP消息只被消费一次
- 1、合理配置消息队列ACK机制
大多数消息队列都提供了手动确认(ACK)的功能,允许消费者成功处理后,主动通知消息代理
// 使用 RabbitMQ 的手动确认示例
channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});
// 处理完成后发送 ACK
channel.basicAck(envelope.getDeliveryTag(), false);
- 2、合理配置消息队列预取数量
防止一次性去除较多的未处理消息。
// 设置预取计数
channel.basicQos(prefetchCount);
- 3、消费者幂等性设计
针对消息全局唯一的ID,入库,每次收到消息时先检查是否已入库
确保同一条消息多次处理的结果是一致的,避免重复的消息执行两次结果不一致
增加补偿机制,比如退款,退积分等概念的操作 - 4、分布式锁
借助Redis 的 Redlock 算法协调多个消费者实例之间的消息处理,只有获取到锁的消息可以处理,其他的放弃或等待。 - 5、监控告警机制
监控消息队列服务健康情况,针对可能重复消费的消息及时告警到服务负责人介入处理。 - 6、事务性消息
指的是消息和业务操作,一起成功或一起失败的机制。
(1)本地事务+补偿机制
(2)二阶段提交
引入协调者和参与者的概念。
客户端向协调者发起事务请求,协调者询问各参与者是否准备好。全部准备好,则发出提交事务命令,否则全部回滚。每个参与者返回ack结果,协调者汇总执行结果,释放占用的资源。
(3)三阶段提交
针对二阶段提交完善事务性消息机制。
首先客户端向协调者发起事务请求,协调者询问各参与者是否准备好。全部准备好,则发出预执行事务命令。各参与者收到命令,执行事务,但不提交。并返回ack,等待最终命令。协调者收到全部准备好,则发出提交事务命令。
4、AMQP 消息顺序消费
- 单实例独占队列,可以保证顺序消费,但是分布式高可用场景一般都是多实例部署,独占队列无法解决消息顺序消费的问题。
- 为了保证顺序消费,通常建议针对预取消息数量Prefetch Count设置为1:
channel.basicQos(1);
- 可以使用分布式锁确保消息消费是同步操作,并发安全,在成功处理消息后,手动发送ack确认到消息代理。
- 另外使用幂等性设计来避免重复消费。
- 增加补偿机制来处理幂等性设计无法保证的场景,比如退款等操作
- 增加监控告警到服务负责人。
- 可以对消息根据业务类型或特定的前缀规则,将不同的消息分到不同的分区或队列中,每个队列和分区内部是遵循先进先出规则来保证顺序消费的。
5、AMQP消息可靠性
- 事务支持
允许一组操作作为一个整体提交或回滚。 - ACK确认机制
当消息成功投递后,接收方会向发送方发送 ACK 确认;如果发生错误,则发送 NACK 拒绝。 - 持久化选项
可以选择是否将消息保存到磁盘上,以防服务中断时丢失重要数据。
6、RabbitMQ配置ACK
1>rabbitmq.conf或rabbitmq.ini开启配置
# 启用自动恢复功能,确保在网络中断后能够自动重连
connection.cached = true
# 设置心跳检测间隔,防止长时间无通信导致连接断开
heartbeat = 60
# 启用 Publisher Confirms,允许生产者收到消息确认
publisher_confirms = on
2>消费者手动确认
声明队列,确保队列存在
设置预取计数,限制每次从队列中拉取的消息数量为 1,以避免过载
开启手动确认模式,通过 channel.basicConsume 方法中的第二个参数 false 来关闭自动确认,改为手动确认
发送 ACK 确认,在成功处理完消息后,调用 channel.basicAck 方法发送确认
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class RabbitMQConsumer {private final static String QUEUE_NAME = "task_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明队列,确保它存在channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 设置预取计数为 1,确保每次只处理一条消息channel.basicQos(1);// 开启手动确认模式DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");try {// 模拟任务处理时间Thread.sleep(1000);} catch (InterruptedException _ignored) {Thread.currentThread().interrupt();} finally {System.out.println(" [x] Done");// 手动发送 ACK 确认channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};// 开始消费消息channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});}}
}
3>配置 Publisher Confirms和Transaction
允许生产者在发送消息后等待消息代理的确认
// 开启 Publisher Confirms 模式
channel.confirmSelect();
try {channel.basicPublish(exchangeName, routingKey, null, message.getBytes());if (!channel.waitForConfirmsOrDie(timeout)) {// 处理未确认的消息...}
} catch (Exception e) {// 处理异常情况...
}
Channel channel = connection.createChannel();
// 开启 Publisher Confirms 模式
channel.confirmSelect();
// 发送消息并等待确认
try {channel.basicPublish(exchangeName, routingKey, null, message.getBytes());if (!channel.waitForConfirmsOrDie(timeout)) {// 处理未确认的消息...}
} catch (Exception e) {// 处理异常情况...
}// 开启事务模式
channel.txSelect();
try {channel.basicPublish(exchangeName, routingKey, null, message.getBytes());channel.txCommit();
} catch (Exception e) {channel.txRollback();
}
7、RabbitMQ配置协议
1>rabbitmq.conf
RabbitMQ默认是AMQP 0-9-1协议。支持设置监听端口。
支持启用SSL认证提高安全性。
支持设置心跳保证客户端和服务端连接保持活跃。
# 设置 AMQP 0-9-1 的监听端口
listeners.tcp.default = 5672
# 确保 AMQP 插件已启用,AMQP 0-9-1 是默认启用的
enabled_plugins = [rabbitmq_amqp1_0]# 启用 SSL/TLS 支持
ssl_options.cacertfile = /path/to/ca_certificate.pem
ssl_options.certfile = /path/to/server_certificate.pem
ssl_options.keyfile = /path/to/private_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
# 设置 SSL/TLS 监听端口
listeners.ssl.default = 5671# 设置心跳间隔时间为 60 秒
heartbeat = 60
8、RabbitMQ消息持久化
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class PersistentExample {private final static String QUEUE_NAME = "persistent_queue";private final static String EXCHANGE_NAME = "persistent_exchange";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 创建持久化的交换器channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);// 创建持久化的队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 绑定队列到交换器channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routing_key");// 发送持久化的消息String message = "Persistent message!";AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2) // 2 表示持久化.build();channel.basicPublish(EXCHANGE_NAME, "routing_key", props, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}
}
- 1、持久化队列
channel.queueDeclare("durable_queue", true, false, false, null);
- 2、交换器持久化
确保在 RabbitMQ 启动时已经预声明了所有必要的交换器和队列绑定,以避免消息丢失
channel.exchangeDeclare("durable_exchange", "direct", true);
- 3、消息持久化
delivery_mode 参数:设置为 2 表示持久化消息;设置为 1(默认)则表示非持久化消息
channel.basicPublish("exchange_name", "routing_key", new AMQP.BasicProperties.Builder().deliveryMode(2).build(), message.getBytes());
9、RabbitMQ自动重连
网络中断或其他异常情况下自动重新连接到 RabbitMQ 并恢复之前的连接状态
ConnectionFactory factory = new ConnectionFactory();
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
10、RabbitMQ组件
组件 | 名称 | 说明 |
---|---|---|
Producer | 生产者 | 负责生成并发送消息的应用程序。 |
Consumer | 消费者 | 负责接收并处理消息的应用程序。 |
Message | 消息 | 承载业务数据的基本单元,包含消息体(Body)、属性(Properties)等信息。 |
Exchange | 交换机 | 用于接收来自生产者的消息,并根据路由规则将其分发到一个或多个队列中。 |
Queue | 队列 | 存储待处理消息的地方,消费者从中拉取消息进行处理。 |
Binding | 绑定 | 定义了交换机和队列之间的关系,包括路由键等参数。 |
Virtual Host | 虚拟主机 | 类似于命名空间的概念,用于隔离不同的应用环境,每个虚拟主机都有自己独立的一套用户、权限、交换机、队列等资源。 |
11、RabbitMQ核心组件交换器和路由键
交换器(Exchange)和路由键(Routing Key)是消息传递系统的核心组件,它们共同决定了消息如何从生产者传递到正确的队列。
消息提供方生产消息,根据预定规则,路由至匹配的一个或多个队列。
消息创建时设定路由键,消息发布到交换器时,通过队列路由键,把队列绑定到交换器上。消息到达交换器后,RabbitMQ会将消息的路由键与队列的路由键进行匹配。
若队列至少有一个消费者订阅,消息将以轮询方式发给消费者。
交换器 | 说明 | 应用场景 |
---|---|---|
Direct | 精确匹配路由键 | 只有当路由键完全匹配时,消息才会被发送到对应的队列。适用于一对一的消息分发。 |
Topic | 基于通配符模式匹配路由键 | 适用于灵活的消息过滤和多条件匹配。 |
Fanout | 广播所有消息给所有绑定的队列 | 适用于需要将相同消息发送给多个消费者的场景。 |
Headers | 根据消息头属性进行路由 | 适用于复杂的消息路由需求,例如根据多个字段组合来决定消息去向。 |
1>Direct Exchange 精准匹配路由键交换器
根据路由键完全匹配队列,如果找不到匹配的队列,则消息会被丢弃。
- 生产者
// 创建 Direct Exchange
channel.exchangeDeclare("direct_logs", "direct");// 绑定队列到 Exchange,并指定 Binding Key
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "direct_logs", "info");
channel.queueBind(queueName, "direct_logs", "warning");
channel.queueBind(queueName, "direct_logs", "error");// 发送消息时指定 Routing Key
channel.basicPublish("direct_logs", "info", null, "Info log message".getBytes());
- 消费者
import com.rabbitmq.client.*;public class DirectConsumer {private final static String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明 Exchangechannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 创建临时队列String queueName = channel.queueDeclare().getQueue();// 绑定队列到 Exchange,并指定 Binding Keyif (argv.length < 1) {System.err.println("Usage: DirectConsumer [info] [warning] [error]");System.exit(1);}for (String severity : argv) {channel.queueBind(queueName, EXCHANGE_NAME, severity);}System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}}
}
2>Fanout Exchange广播交换器
广播所有消息给所有绑定的队列
- 生产者
// 创建 Fanout Exchange
channel.exchangeDeclare("logs", "fanout");// 绑定队列到 Exchange
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs", "");// 发送消息时不指定 Routing Key
channel.basicPublish("logs", "", null, "Broadcast log message".getBytes());
- 消费者
import com.rabbitmq.client.*;public class FanoutConsumer {private final static String EXCHANGE_NAME = "logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明 Exchangechannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 创建临时队列String queueName = channel.queueDeclare().getQueue();// 绑定队列到 Exchangechannel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}}
}
3>Topic Exchange 通配符路由器
*:匹配一个单词;#:匹配零个或多个单词
- 生产者
// 创建 Topic Exchange
channel.exchangeDeclare("topic_logs", "topic");// 绑定队列到 Exchange,并指定 Binding Key 模式
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "topic_logs", "*.orange.*");
channel.queueBind(queueName, "topic_logs", "*.*.rabbit");
channel.queueBind(queueName, "topic_logs", "lazy.#");// 发送消息时指定符合模式的 Routing Key
channel.basicPublish("topic_logs", "quick.orange.rabbit", null, "Quick orange rabbit".getBytes());
- 消费者
import com.rabbitmq.client.*;public class TopicConsumer {private final static String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明 Exchangechannel.exchangeDeclare(EXCHANGE_NAME, "topic");// 创建临时队列String queueName = channel.queueDeclare().getQueue();// 绑定队列到 Exchange,并指定 Binding Key 模式if (argv.length < 1) {System.err.println("Usage: TopicConsumer [binding_key_pattern]");System.exit(1);}for (String bindingKey : argv) {channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);}System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}}
}
4>Headers Exchange 根据消息头属性进行路由
不依赖路由键,当消息的 headers 完全匹配时,才会将消息发送到对应的队列。
- 生产者
// 创建 Headers Exchange
channel.exchangeDeclare("headers_exchange", "headers");// 绑定队列到 Exchange,并指定 Headers 匹配规则
Map<String, Object> headers = new HashMap<>();
headers.put("user_id", "12345");
headers.put("order_status", "pending");
AMQP.Queue.BindOk bindOk = channel.queueBind(queueName, "headers_exchange", "", headers);// 发送带有 Headers 的消息
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(headers).build();
channel.basicPublish("headers_exchange", "", props, "Message with specific headers".getBytes());
- 消费者
import com.rabbitmq.client.*;public class HeadersConsumer {private final static String EXCHANGE_NAME = "headers_exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明 Exchangechannel.exchangeDeclare(EXCHANGE_NAME, "headers");// 创建临时队列String queueName = channel.queueDeclare().getQueue();// 绑定队列到 Exchange,并指定 Headers 匹配规则Map<String, Object> headers = new HashMap<>();headers.put("user_id", "12345");headers.put("order_status", "pending");AMQP.Queue.BindOk bindOk = channel.queueBind(queueName, EXCHANGE_NAME, "", headers);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}}
}
12、RabbitMQ核心方法及参数说明
1>newConnection 创建连接工程并开启连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
2>createChannel 创建信道
RabbitMQ 使用信道的方式来传输数据
信道是建立在真实的 TCP 连接内的虚拟连接,且每条 TCP 连接可以创建多个信道,每个信道都是独立的通信线路,可以并发地发送和接收消息。
Channel channel = connection.createChannel();
3>exchangeDeclare 交换器声明
channel.exchangeDeclare("my_exchange", "direct", true, false, null);
exchange: 交换器名称。
type: 交换器类型(如 “direct”, “fanout”, “topic”, “headers”)。
durable: 持久化标志,true 表示持久化,false 表示非持久化。
autoDelete: 自动删除标志,true 表示当最后一个队列断开时自动删除交换器。
internal: 内部交换器标志,true 表示该交换器只能被其他交换器使用,不能直接由生产者发布消息。
arguments: 其他可选参数,例如死信交换器、过期时间等。
4>queueDeclare 队列声明
// 创建临时队列
String queueName = channel.queueDeclare().getQueue();
queue: 队列名称,为空字符串时表示创建临时队列。
durable: 持久化标志,true 表示持久化,false 表示非持久化。
exclusive: 排他性标志,true 表示仅当前连接可用,连接关闭后自动删除。
autoDelete: 自动删除标志,true 表示当最后一个消费者断开时自动删除队列。
arguments: 其他可选参数,例如死信队列、过期时间等。
5>queueBind 队列绑定
将队列绑定到指定的交换器上,并提供路由键或匹配规则
channel.queueBind(queueName, "my_exchange", "routing_key");
queue: 队列名称。
exchange: 交换器名称。
routingKey: 路由键,对于某些类型的交换器(如 Direct 或 Topic),这个值是必须的;对于 Fanout 类型,通常为空字符串。
arguments: 可选参数,主要用于 Headers Exchange 的匹配规则
6>basicPublish 发布消息
向指定的交换器发布一条消息
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().contentType("text/plain").deliveryMode(2) // 2 表示持久化.build();
channel.basicPublish("my_exchange", "routing_key", props, message.getBytes());
exchange: 交换器名称。
routingKey: 路由键。
props: 消息属性,包括内容类型、编码、持久化模式等。
body: 消息体,即要发送的数据。
7>basicConsume 消费消息
费来自指定队列的消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
queue: 队列名称。
autoAck: 自动确认标志,true 表示收到消息后立即确认,false 表示手动确认。
deliverCallback: 回调函数,用于处理接收到的消息。
cancelCallback: 取消回调函数,当消费者的取消通知到达时调用
8>basicAck 消息确认
手动确认模式下,当消费者成功处理完消息后,需要调用此方法来确认消息已被消费
channel.basicAck(envelope.getDeliveryTag(), false);
9>basicNack 消息丢弃
当消费者无法处理某条消息时,可以拒绝这条消息,并决定是否重新入队或者丢弃
// 第三个参数表示是否重新入队
channel.basicNack(envelope.getDeliveryTag(), false, true);
13、RabbitMQ镜像集群模式
搭建RabbitMQ保证消息队列的高可用。
创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue。
优点:高可用,单个节点挂掉,其他节点仍可用
缺点:高负载,如果某个队列消息很重,则镜像复制的实例下也会很重,性能开销大。
参考博客:消息队列中点对点与发布订阅区别
Powered by niaonao
相关文章:

掌握RabbitMQ:全面知识点汇总与实践指南
前言 RabbitMQ 是基于 AMQP 高级消息队列协议的消息队列技术。 特点:它通过发布/订阅模型,实现了服务间的高度解耦。因为消费者不需要确保提供者的存在。 作用:服务间异步通信;顺序消费;定时任务;请求削…...

go如何从入门进阶到高级
针对Go语言的学习,不同阶段应采取不同的学习方式,以达到最佳效果.本文将Go的学习分为入门、实战、进阶三个阶段,下面分别详细介绍 一、社区 Go语言中文网 作为专注于Go语言学习与推广的平台,Go语言中文网为开发者提供了丰富的中…...
在环境冲突情况下调整优先级以解决ROS Catkin构建中缺少模块的问题【ubuntu20.04】
在机器人操作系统(ROS)的开发过程中,构建工作空间时遇到各种依赖性问题是常见的挑战之一。尤其是在多Python环境共存的情况下,环境变量的冲突往往导致诸如缺少empy模块等错误。本文将详细介绍在ROS Noetic与Anaconda Python环境共…...

github 个人主页配置
Guthub 个人主页 (官方称呼是 profile)可以展示很多有用的信息,例如添加一个首页被访问次数的计数器,一个被 Star 与 Commit 的概览信息,以及各种技能标签,设备标签等,还可以利用 wakatime 显示…...
STM32-笔记30-编程实现esp8266联网功能
串口2连接ESP8266模块 复制项目文件34-ESP8266串口间的通信 重命名为35-编程实现ESP8266联网功能 打开项目文件 main.c #include "sys.h" #include "delay.h" #include "led.h" #include "uart1.h" #include "esp8266.h"…...

oscp备考 oscp系列——Kioptix Level 1靶场 古老的 Apache Vuln
目录 前言 1. 主机发现 2. 端口扫描 3. 指纹识别 4. 目录扫描 5. 漏洞搜索和利用 前言 oscp备考,oscp系列——Kioptix Level 1靶场 Kioptix Level 1难度为简单靶场,主要考察 nmap的使用已经是否会看输出,以及是否会通过应用查找对应漏…...

《机器学习》——随机森林
文章目录 什么是随机森林?随机森林的原理随机森林的优缺点优点缺点 随机森林模型API主要参数 实例实例步骤导入数据处理数据,切分数据构建模型训练模型测试数据并输出分类报告和混淆矩阵画出模型的前十重要性的特征 扩展 什么是随机森林? -随…...

指代消解:自然语言处理中的核心任务与技术进展
目录 前言1. 指代消解的基本概念与分类1.1 回指与共指 2. 指代消解的技术方法2.1 端到端指代消解2.2 高阶推理模型2.3 基于BERT的模型 3. 事件共指消解:跨文档的挑战与进展3.1 联合模型3.2 语义嵌入模型(EPASE) 4. 应用场景与前景展望4.1 关键…...
记录一下Unity webgl cannot read properties of undefined reading apply 错误
出现这个问题说明你Build 文件夹的内容和最新的打包内容冲突了 解决方法是把Build文件夹里面的东西全部删了 然后使用Unity重新生成这些文件 后续发现还是有这个问题 然后想了一下本地冲突应该在前端吧本地的文件删了重新拉取服务器的文件才行 以下是解决方法 <script t…...

【C语言程序设计——选择结构程序设计】求阶跃函数的值(头歌实践教学平台习题)【合集】
目录😋 任务描述 相关知识 1. 选择结构基本概念 2. 主要语句类型(if、if-else、switch) 3. 跃迁函数中变量的取值范围 4. 计算阶跃函数的值 编程要求 测试说明 通关代码 测试结果 任务描述 本关任务:输入x的值&#x…...

unity 播放 序列帧图片 动画
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、方法一:代码控制播放序列帧1、设置图片属性2、创建Image组件3、简单的代码控制4、挂载代码并赋值 二、方法二:直接使用1.Image上添加…...
HTML - <a>
目录 1.简介 2.属性 2.1 href 2.2 hreflang 2.3 title 2.4 target 2.5 rel 2.6 referrerpolicy 2.7 ping 2.8 type 2.9 download 3.邮件链接 4.电话链接 1.简介 链接(hyperlink)是互联网的核心。它允许用户在页面上,从一个网址…...

Unity学习笔记(六)使用状态机重构角色移动、跳跃、冲刺
前言 本文为Udemy课程The Ultimate Guide to Creating an RPG Game in Unity学习笔记 整体状态框架(简化) Player 是操作对象的类: 继承了 MonoBehaviour 用于定义游戏对象的行为,每个挂载在 Unity 游戏对象上的脚本都需要继承自 MonoBehaviour&#x…...

【C++数据结构——树】二叉树的遍历算法(头歌教学实验平台习题) 【合集】
目录😋 任务描述 相关知识 1. 二叉树的基本概念与结构定义 2. 建立二叉树 3. 先序遍历 4. 中序遍历 5. 后序遍历 6. 层次遍历 测试说明 通关代码 测试结果 任务描述 本关任务:实现二叉树的遍历 相关知识 为了完成本关任务,你需要掌…...
Android Telephony | 协议测试针对 test SIM attach network 的问题解决(3GPP TS 36523-1-i60)
背景 除了运营商实网卡之外,在各种lab的协议测试中需要 follow 3GPP 协议定义(可以查询3gpp.org website 获取),那么 feature 需要支持覆盖的卡就不止运营商本身了。 本文介绍 IA APN流程,重点关注在协议/lab测试中,针对测试卡、非实网卡的的设置项,记录遇到的问题分…...

jenkins入门3 --执行一个小demo
1、新建视图 视图可以理解为是item的集合,这样可以将item分类。新建视频可以选择加入已有的item 2、新建item 1)输入任务名称、选择一个类型,常用的是第一个freestyle project 2)进行item相关配置,general 设置项目名字,描述,参数…...

STM32传感器系列:GPS定位模块
简介 我们在做一些项目的时候,可能需要使用到GPS模块,我们可以通过这个模块获得当前的位置以及时间,我这里就教大家如何去使用GPS定位模块,并且把示例代码开源到评论区下面,有需要自取即可,我我这里用到的…...

技术成长战略是什么?
文章目录 技术成长战略是什么?1. 前言2. 跟技术大牛学成长战略2.1 系统性能专家案例2.2 从开源到企业案例2.3 技术媒体大V案例2.4 案例小结 3. 学习金字塔和刻意训练4. 战略思维的诞生5. 建议 技术成长战略是什么? 1. 前言 在波波的微信技术交流群里头…...

【前端】Vue3与Element Plus结合使用的超详细教程:从入门到精通
文章目录 Moss前沿AI一、教程概述1.1 目标读者1.2 学习目标 二、为什么选择Vue3与Element Plus2.1 Vue3的优势2.2 Element Plus的优势2.3 二者结合的优势 三、环境搭建3.1 创建Vue3项目3.2 安装Element Plus3.3 引入Element Plus 四、Element Plus常用组件使用详解4.1 按钮&…...

Linux 35.6 + JetPack v5.1.4之 pytorch升级
Linux 35.6 JetPack v5.1.4之 pytorch升级 1. 源由2. 升级步骤1:获取二进制版本步骤2:安装二进制版本步骤3:获取torchvision步骤4:安装torchvision步骤5:检查安装版本 3. 使用4. 补充4.1 torchvision版本问题4.2 支持…...
[Java 基础]Java 中的关键字
在 Java 编程语言中,关键字 (Keywords) 是预定义的、具有特殊含义的标识符 (identifiers)。它们是 Java 语言语法的一部分,被 Java 编译器赋予了特定的功能和用途。因此,你不能将关键字用作变量名、类名、方法名或其他用户自定义的标识符。 …...
Java项目中常用的中间件及其高频问题避坑
Java项目中常用的中间件及其高频问题避坑如下: 一、常用中间件分类及作用 1. 消息队列中间件 作用:解耦系统、异步通信、削峰填谷。代表产品: Kafka:高吞吐量流处理,适合日志收集、实时分析。RocketMQ:金融级可靠性,支持事务消…...
每日算法刷题Day25 6.7:leetcode二分答案3道题,用时1h40min(遇到两道动态规划和贪心时间较长)
3. 1631.最小体力消耗路径(中等,dfs不熟练) 1631. 最小体力消耗路径 - 力扣(LeetCode) 思想 1.你准备参加一场远足活动。给你一个二维 rows x columns 的地图 heights ,其中 heights[row][col] 表示格子 (row, col) 的高度。一开始你在最左…...
使用Caddy在Ubuntu 22.04上配置HTTPS反向代理
使用Caddy在Ubuntu 22.04上配置HTTPS反向代理(无域名/IP验证+密码保护) 一、 环境说明 环境说明:测试环境,生产环境请谨慎OS: Ubuntu 22.04.1 LTSCaddy版本:v2.10.0服务器IP: 192.168.3.88(内网)公网IP: 10.2.3.11(测试虚拟)代理端口: 9080后端服务: http://192.168.3…...

多模态大语言模型arxiv论文略读(109)
Math-PUMA: Progressive Upward Multimodal Alignment to Enhance Mathematical Reasoning ➡️ 论文标题:Math-PUMA: Progressive Upward Multimodal Alignment to Enhance Mathematical Reasoning ➡️ 论文作者:Wenwen Zhuang, Xin Huang, Xiantao Z…...
Windows 系统安装 Redis 详细教程
Windows 系统安装 Redis 详细教程 一、Redis 简介 Redis(Remote Dictionary Server)是一个开源的、基于内存的高性能键值存储系统,常被用作数据库、缓存和消息中间件。相比传统数据库,Redis 具有以下优势: 超高性能…...

Python_day47
作业:对比不同卷积层热图可视化的结果 一、不同卷积层的特征特性 卷积层类型特征类型特征抽象程度对输入的依赖程度低层卷积层(如第 1 - 3 层)边缘、纹理、颜色、简单形状等基础特征低高,直接与输入像素关联中层卷积层(…...

Spring AI(10)——STUDIO传输的MCP服务端
Spring AI MCP(模型上下文协议)服务器Starters提供了在 Spring Boot 应用程序中设置 MCP 服务器的自动配置。它支持将 MCP 服务器功能与 Spring Boot 的自动配置系统无缝集成。 本文主要演示支持STDIO传输的MCP服务器 仅支持STDIO传输的MCP服务器 导入j…...

【Java微服务组件】分布式协调P4-一文打通Redisson:从API实战到分布式锁核心源码剖析
欢迎来到啾啾的博客🐱。 记录学习点滴。分享工作思考和实用技巧,偶尔也分享一些杂谈💬。 有很多很多不足的地方,欢迎评论交流,感谢您的阅读和评论😄。 目录 引言Redisson基本信息Redisson网站 Redisson应用…...

Prompt提示工程指南#Kontext图像到图像
重要提示:单个prompt的最大token数为512 # 核心能力 Kontext图像编辑系统能够: 理解图像上下文语义实现精准的局部修改保持原始图像风格一致性支持复杂的多步迭代编辑 # 基础对象修改 示例场景:改变汽车颜色 Prompt设计: Change …...