当前位置: 首页 > article >正文

RabbitMQ的其中工作模式介绍以及Java的实现

文章目录

  • 前文
  • 一、模式介绍
    • 1. 简单模式
    • 2. 工作队列模式
    • 3. 广播模式
    • 4. 路由模式
    • 5. 通配符模式
    • 6. RPC模式
    • 7. 发布确认模式
  • 二、代码实现
    • 1、简单模式
    • 2、工作队列模式
      • 生产者
      • 消费者
        • 消费者 1
        • 消费者 2
    • 3、广播模式 (Fanout Mode)
      • 生产者
      • 消费者
    • 4、路由模式 (Direct Mode)
      • 生产者
      • 消费者
    • 5、通配符模式
    • 6、RPC模式 (Remote Procedure Call Mode)
      • 服务器 (Server)
      • 客户端 (Client)
    • 7、发布确认模式 (Publisher Confirms)
      • 1. 单独确认 (Publishing Messages Individually)
      • 2. 批量确认 (Publishing Messages in Batches)
      • 3. 异步确认
      • 对比总结

前文

为了更好的理解RabbitMQ中的工作模式,最好先了解RabbitMQ的几种常见交换机的类型

  1. Fanout(扇出交换机)
    它会忽视路由键,把消息发送给所有绑定了该交换机的所有队列

  2. Direct(直接交换机)
    根据生产者发送消息时设置的routingKey和交换机与不同队列绑定的bindingKey进行匹配,如果匹配把消息发送给对应的队列

  3. Topic(通配符交换机)
    可以认为是Direct的升级版。Direct中bindingKey必须是一个常量字符串,在Topic中bindingKey可以是一个通配符,类似于正则表达式。只要routingKey符合bindingKey的字符串模式,那么就可以把消息发送给指定队列

RabbitMQ中用 .来分割每一个单词。*表示匹配一个任意单词,可以是单个字母。#表示可以匹配0个或者多个单词,比*宽松。
例如,# 可以匹配 a、a.b、a.b.c 等,而 . 只能匹配正好两个单词的路由键(如 a.b)

  1. Header
    这种交换机不依赖于routingKey和bindingKey。它会根据消息中的headers属性进行匹配。但是由于其性能低下,因此很少用。

此外,代码实现部分博客使用的RabbitMQ自带的依赖包。Spring也支持RabbitMQ。
两者在RabbitMQ官网都有说明。


一、模式介绍

1. 简单模式

七个模式中最简单的模式,特点是一个生产者p、一个消费者c,消息只能被消费一次。适用于消息只能被单个消费者消费的场景。
在这里插入图片描述

2. 工作队列模式

概述: ⼀个⽣产者P,多个消费者C1,C2. 在多个消息的情况下, Work Queue 会将消息分派给不同的消费者, 每个消费者都会接收到不同的消息

特点: 消息不会重复, 分配给不同的消费者

适⽤场景: 集群环境中做异步处理。例如12306候补成功的短信服务,其中每个短信服务功能是一样的,消息给到那个消费者都可以,类似于集群:在这里插入图片描述

在这里插入图片描述

3. 广播模式

概述: 图中x是exchange,exchange会根据消息中的routingkey与Q1、Q2绑定的bindkey进行匹配,如果匹配成功,把消息转发给指定的队列。
特点: 一个生产者发送给exchange的消息,会被exchange复制多分,分别发送给绑定了这个exchange的queue。每个消费者获得的消息都是一样的
应用场景: 比如1001就老喜欢这种东西了,想给自己的客户推销广告,用广播模式,就可以把消息发送给所有的用户。
在这里插入图片描述

4. 路由模式

概述: 这个模式相当于是广播模式的一个约束,它会根据消息中的routingKey和与其他队列绑定的bindingKey进行匹配,如果匹配才会把消息发送给指定队列。

💡routingKey 和bindgKey必须完全一直才能匹配成功

在这里插入图片描述

5. 通配符模式

概述: 相当于路由模式的升级版,只要消息中的routingKey与指定队列的通配符匹配进行发送消息。
在这里插入图片描述

根据上图示例:

  1. ff.a.j与*.a.*匹配,该消息就会发送到Q1
  2. 消息:c.jojo.hyy 与c.#匹配,该消息就会发送到Q2

6. RPC模式

概述: RCP模式下 没有Producter和Consumer的概念,取而代之的是Client和Server的结构。Client发送消息给Server并且希望Server能发送一个期望的响应给Client,可以使用RPC模式.

特点: Client发送消息会设定两个字段relyTo、correlationId。replyTo用于指定Server使用哪一个回调队列(图中使用的Reply)发送响应给到Client。Client会等待回调队列发送reply给到自己,根据correlationId确保是Cilen需要的响应。
在这里插入图片描述


7. 发布确认模式

概述: 发布确认机制是RabbitMQ用于保证消息可靠性的其中一个方式。

  1. producter把对应的channel设置成confirm模式(通过channel.confirmSelect方法实现),并且设定一个消息唯一ID,把消息与唯一ID关联起来
  2. exchange接收到消息后会发送一个ACK响应给到producter(响应中含有唯一ID),表明消息已经送达。
    这种方法可以尽可能的避免在消息发送过程中的丢失问题。

二、代码实现

代码实现,主要有两种,一个是RabbitMQ官方提供的依赖包,另一个是Spring官方AMQP对RabbitMQ的封装实现,两者都会演示

RabbitMQ中央仓库
找到合适的版本导入即可,本博客使用的5.20.0版本。

1、简单模式

生产者:

public class Producer {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//RabbitMQ的IP地址factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,默认是5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码Connection connection = factory.newConnection();return connection;}public static void main(String[] args) throws IOException, TimeoutException {//1、建立连接Connection connection = createConnection();//2、开启信道Channel channel = connection.createChannel();//3、声明队列channel.queueDeclare(Constants.SIMPLE_QUEUE, true, false, false, null);//4、发布消息channel.basicPublish("", Constants.SIMPLE_QUEUE, null, "呵呵".getBytes());System.out.println("执行了发布");//5、关闭连接channel.close();connection.close();}
}

💡

  1. Channel、Connection的相关包都来自于com.rabbitmq.client不要导错了
  2. 步骤四中参数 “” 的意思是使用默认交换机(Direct类型),bindingKey就是已经绑定的队列名字。
    消费者:
public class Consumer {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//RabbitMQ的IP地址factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,模式是5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码Connection connection = factory.newConnection();return connection;}public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1、建立连接Connection connection = createConnection();//2、开启信道Channel channel = connection.createChannel();//3、声明队列channel.queueDeclare(Constants.SIMPLE_QUEUE, true, false, false, null);//5、定义consumer逻辑DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//根据实际场景消费消息System.out.println("消息已经被消费,获取的消息内容:" + new String(body));}};//6、消费内容channel.basicConsume(Constants.SIMPLE_QUEUE, true, consumer);//7、关闭连接channel.close();connection.close();}
}

💡
consumerTag: 标识不同消费者的唯一标签
envelope: 描述了消息传递的细节,如该消息是由那个交换机发送的,消息指定的routingKey是什么,消息的唯一标识deliveryTag。
properties: 用于设定RabbitMQ的高级属性
body: 消息的本体,以二进制方式存储

2、工作队列模式

工作队列模式(Work Queue Mode)是一种任务分发的模式,允许多个消费者从同一个队列中获取消息并处理,从而实现任务的负载均衡。消息会被轮询(Round-Robin)分发到不同的消费者,适合处理耗时任务的场景。

生产者

import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Product {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST); // RabbitMQ的IP地址factory.setPort(Constants.PORT); // RabbitMQ的服务端口号,默认是5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机factory.setUsername(Constants.USER_NAME); // RabbitMQ登录账号factory.setPassword(Constants.PASSWORD); // RabbitMQ登录密码return factory.newConnection();}public static void main(String[] args) throws IOException, TimeoutException {// 1、建立连接Connection connection = createConnection();// 2、开启信道Channel channel = connection.createChannel();// 3、声明队列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);// 4、发送20条消息for (int i = 0; i < 20; i++) {channel.basicPublish("", Constants.WORK_QUEUE, null, ("工作队列的消息" + i).getBytes());}// 5、关闭连接channel.close();connection.close();}
}

说明:

  • 队列声明:使用 channel.queueDeclare 声明一个持久化的队列(durable=true),确保队列在 RabbitMQ 重启后依然存在。
  • 消息发送:通过 basicPublish 方法向默认交换机("")发送消息,路由键为队列名称(Constants.WORK_QUEUE)。
  • 消息内容:循环发送 20 条消息,每条消息为 "工作队列的消息" + i
  • 连接关闭:发送完成后关闭信道和连接。

💡 注意

  • 参数 "" 表示使用默认交换机(Direct 类型),路由键直接绑定到队列名称。

消费者

消费者从工作队列中获取消息并处理。以下是两个消费者的实现,分别命名为 Consumer1Consumer2,它们共享同一队列的消息,每个消费者拿到不同的消息。

消费者 1
public class Consumer1 {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST); // RabbitMQ的IP地址factory.setPort(Constants.PORT); // RabbitMQ的服务端口号,默认是5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机factory.setUsername(Constants.USER_NAME); // RabbitMQ登录账号factory.setPassword(Constants.PASSWORD); // RabbitMQ登录密码return factory.newConnection();}public static void main(String[] args) throws IOException, TimeoutException {// 1、建立连接Connection connection = createConnection();// 2、开启信道Channel channel = connection.createChannel();// 3、声明队列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);// 4、定义消费逻辑DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("第一个消费者消费消息:" + new String(body));}};// 5、消费channel.basicConsume(Constants.WORK_QUEUE, true, consumer);// 6、保持连接(注释掉关闭连接的代码)// channel.close();// connection.close();}
}
消费者 2

public class Consumer2 {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST); // RabbitMQ的IP地址factory.setPort(Constants.PORT); // RabbitMQ的服务端口号,默认是5672factory.setVirtualHost(Constants.VIRTUAL_HOST); // 虚拟主机factory.setUsername(Constants.USER_NAME); // RabbitMQ登录账号factory.setPassword(Constants.PASSWORD); // RabbitMQ登录密码return factory.newConnection();}public static void main(String[] args) throws IOException, TimeoutException {// 1、建立连接Connection connection = createConnection();// 2、开启信道Channel channel = connection.createChannel();// 3、声明队列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);// 4、定义消费逻辑DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("第二个消费者消费消息:" + new String(body));}};// 5、消费channel.basicConsume(Constants.WORK_QUEUE, true, consumer);// 6、保持连接(注释掉关闭连接的代码)// channel.close();// connection.close();}
}

说明:

  • 队列声明:与生产者一致,消费者也声明相同的队列,确保队列存在。
  • 消费逻辑:通过继承 DefaultConsumer 并重写 handleDelivery 方法,定义消息处理逻辑。Consumer1Consumer2 分别打印接收到的消息,标识为“第一个消费者”和“第二个消费者”。
  • 消息消费:使用 channel.basicConsume 订阅队列,autoAck=true 表示自动确认消息(消费者接收消息后自动通知 RabbitMQ,把消息从队列中删除)。

💡 注意

  • consumerTag:标识消费者的唯一标签,用于区分不同的消费者。
  • envelope:包含消息的元数据,如路由键、交换机和 deliveryTag(消息的唯一标识)。
  • properties:消息的附加属性,可用于高级配置。
  • body:消息的实际内容,以字节数组形式存储。

3、广播模式 (Fanout Mode)

广播模式通过 Fanout 交换机将消息分发到所有绑定的队列,忽略路由键,适合发布/订阅场景。以下基于提供的代码续写。

生产者

关键点:

  • 声明 Fanout 交换机 (exchangeDeclare)。
  • 声明并绑定多个队列到交换机 (queueDeclare, queueBind)。
  • 发布消息到交换机,路由键为空 (basicPublish)。

消费者

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);}
}
public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD);  //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.FANOUT_QUEUE2, true, consumer);}
}

关键点:

  • 声明队列并监听消息 (queueDeclare, basicConsume)。
  • 每个消费者独立消费绑定队列的消息。
  • 自动确认消息 (autoAck=true)。

💡 注意:

  • Fanout 模式下,路由键被忽略,消息广播到所有绑定队列。
  • 确保交换机和队列正确绑定,避免消息丢失。

4、路由模式 (Direct Mode)

路由模式通过 Direct 交换机根据路由键精确分发消息到匹配的队列,适合需要条件路由的场景。

生产者

public class Producer {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//RabbitMQ的IP地址factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,模式是5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码Connection connection = factory.newConnection();return connection;}public static void main(String[] args) throws IOException, TimeoutException {//1、建立连接Connection connection = createConnection();//2、开启信道Channel channel=connection.createChannel();//3、声明交换机channel.exchangeDeclare(Constants.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT);//4、声明队列channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);//5、交换机绑定队列q1 q2channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,"q1");channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"q2");//6、生产者发送消息channel.basicPublish(Constants.DIRECT_EXCHANGE,"q1",null,("q1需要接收到这个消息").getBytes());channel.basicPublish(Constants.DIRECT_EXCHANGE,"q2",null,("q2需要接收到这个消息").getBytes());//7、关闭资源channel.close();connection.close();}
}

关键点:

  • 声明 Direct 交换机 (exchangeDeclare)。
  • 声明队列并绑定到交换机,指定路由键 (queueBind)。
  • 发布消息时指定路由键 (basicPublish)。

消费者

public class Consumer1 {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//RabbitMQ的IP地址factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,模式是5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码Connection connection = factory.newConnection();return connection;}public static void main(String[] args) throws IOException, TimeoutException {//1、建立连接Connection connection = createConnection();//2、开启信道Channel channel=connection.createChannel();//3、声明交换机channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);//4、声明队列channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);//5、定义消费逻辑DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+new String(body));}};//5、消费channel.basicConsume(Constants.DIRECT_QUEUE1,true,consumer);//6、关闭连接channel.close();connection.close();}
}
public class Consumer2 {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//RabbitMQ的IP地址factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,模式是5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码Connection connection = factory.newConnection();return connection;}public static void main(String[] args) throws IOException, TimeoutException {//1、建立连接Connection connection = createConnection();//2、开启信道Channel channel=connection.createChannel();//3、声明交换机channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);//4、声明队列channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);//5、定义消费逻辑DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+new String(body));}};//5、消费channel.basicConsume(Constants.DIRECT_QUEUE2,true,consumer);//6、关闭连接channel.close();connection.close();}
}

关键点:

  • 声明队列并监听消息 (queueDeclare, basicConsume)。
  • 根据队列绑定的路由键接收对应消息。
  • 自动确认消息 (autoAck=true)。

💡 注意:

  • 路由键必须精确匹配,消息才会分发到对应队列。
  • 队列可以绑定多个路由键,增加灵活性。
  • 未绑定路由键的队列不会收到消息。

5、通配符模式

通配符模式和路由模式实现的不同点就是交换机使用TOPIC类型,交换机和队列绑定使用通配符,其他代码几乎一致,这里就不演示了。

6、RPC模式 (Remote Procedure Call Mode)

RPC模式通过RabbitMQ实现客户端与服务器的双向通信,客户端发送请求到服务器并等待响应,适合需要同步响应的场景。

服务器 (Server)

public class Server {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//RabbitMQ的IP地址factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,模式是5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码Connection connection = factory.newConnection();return connection;}public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1、建立连接Connection connection = createConnection();//2、开启信道Channel channel = connection.createChannel();//3、声明队列并且设定对多处理消息数channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);channel.basicQos(1);//5、定义consumer逻辑DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//根据实际场景消费消息System.out.println("客服端发送了这个消息:" + new String(body));//获取消息中的correID将其发送会客户端AMQP.BasicProperties proper = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();//给客户端发送响应,指定使用replayTochannel.basicPublish("", properties.getReplyTo(), proper, ("收到来自客户端的请求").getBytes());channel.basicAck(envelope.getDeliveryTag(), false);}};//6、消费内容channel.basicConsume(Constants.RPC_REQUEST_QUEUE, true, consumer);}
}

关键点:

  • 声明请求和响应队列 (queueDeclare)。
  • 设置消息处理限制 (basicQos),确保按序处理。
  • 消费请求队列消息,发送响应到客户端指定的 replyTo 队列。
  • 使用 correlationId 关联请求和响应。

客户端 (Client)

public class Client {private static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);//RabbitMQ的IP地址factory.setPort(Constants.PORT);//RabbitMQ的服务端口号,模式是5672factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机,可以在RabbitMQ终端创建一个factory.setUsername(Constants.USER_NAME);//RabbitMQ登录账号factory.setPassword(Constants.PASSWORD);//RabbitMQ登录密码Connection connection = factory.newConnection();return connection;}public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1、建立连接Connection connection = createConnection();//2、开启信道Channel channel = connection.createChannel();//3、声明队列channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);//4、生成唯一ID用于区分当前消息String corrID= UUID.randomUUID().toString();//5、配置请求相关属性AMQP.BasicProperties props = new AMQP.BasicProperties().builder().correlationId(corrID).replyTo(Constants.RPC_RESPONSE_QUEUE).build();//4、发布消息channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, "呵呵".getBytes());System.out.println("执行了发布");//5、等待响应final BlockingQueue<String> bq= new LinkedBlockingQueue<>(1);DefaultConsumer consumer=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String response=new String(body);if(properties.getCorrelationId().equals(corrID)){System.out.println("接收到回调消息:"+response);bq.offer(response);}}};channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);bq.take();}
}

关键点:

  • 声明请求和响应队列 (queueDeclare)。
  • 生成唯一 correlationId 标识请求。
  • 发送请求到 RPC_REQUEST_QUEUE,指定 replyTo 为响应队列。
  • 监听 RPC_RESPONSE_QUEUE,验证 correlationId 匹配后处理响应。

7、发布确认模式 (Publisher Confirms)

发布确认模式确保生产者发送的消息被RabbitMQ正确接收,提供可靠性保证。确认方式有以下三种:

1. 单独确认 (Publishing Messages Individually)

  private static void individually() throws Exception {try(Connection connection = createConnection()) {//1. 开启信道Channel channel = connection.createChannel();//2. 设置信道为confirm模式channel.confirmSelect();//3. 声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);//4. 发送消息, 并等待确认long start = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publisher confirms"+i;channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());//等待确认channel.waitForConfirmsOrDie(5000);}long end = System.currentTimeMillis();System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);}}

关键点:

  • 启用确认模式 (confirmSelect)。
  • 每发送一条消息,同步等待确认 (waitForConfirmsOrDie)。
  • 适合小规模消息发送,因为性能较低。

2. 批量确认 (Publishing Messages in Batches)

 private static void publishingMessagesInBatches() throws Exception{try(Connection connection = createConnection()) {//1. 开启信道Channel channel = connection.createChannel();//2. 设置信道为confirm模式channel.confirmSelect();//3. 声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);//4. 发送消息, 并进行确认long start = System.currentTimeMillis();int batchSize = 100;int outstandingMessageCount = 0;for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publisher confirms"+i;channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());outstandingMessageCount++;if (outstandingMessageCount==batchSize){channel.waitForConfirmsOrDie(5000);outstandingMessageCount = 0;}}if (outstandingMessageCount>0){channel.waitForConfirmsOrDie(5000);}long end = System.currentTimeMillis();System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);}}

关键点:

  • 启用确认模式 (confirmSelect)。
  • 每发送一批消息 (如100条),同步等待确认 (waitForConfirmsOrDie)。
  • 平衡了性能与可靠性。
  • 但在一些消息容易遗失的场景,我们不清楚具体是那个消息出现问题,需要批量重发消息,性能可能不增返降。

3. 异步确认

    private static void asynchronously() throws Exception{try (Connection connection = createConnection()){//1. 开启信道Channel channel = connection.createChannel();//2. 设置信道为confirm模式channel.confirmSelect();//3. 声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);//4. 监听confirm//集合中存储的是未确认的消息IDlong start = System.currentTimeMillis();SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {if (multiple){confirmSeqNo.headSet(deliveryTag+1).clear();}else {confirmSeqNo.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {if (multiple){confirmSeqNo.headSet(deliveryTag+1).clear();}else {confirmSeqNo.remove(deliveryTag);}//业务需要根据实际场景进行处理, 比如重发, 此处代码省略}});//5. 发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publisher confirms"+i;long seqNo = channel.getNextPublishSeqNo();channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());confirmSeqNo.add(seqNo);}while (!confirmSeqNo.isEmpty()){Thread.sleep(10);}long end = System.currentTimeMillis();System.out.printf("异步确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);}}

关键点:

  • 启用确认模式 (confirmSelect)。
  • 使用 channel.ConfirmListener() 开启监听,异步处理确认 (handleAck, handleNack)。
  • 通过 SortedSet 跟踪未确认消息。
  • 最高吞吐量,适合大规模消息发送。

对比总结

策略优点缺点适用场景
单独确认简单,高可靠性延迟高,吞吐量低小规模、可靠性优先
批量确认平衡性能与可靠性仍需同步等待,部分延迟中等规模、可靠性与性能兼顾
异步确认高吞吐量,低延迟实现复杂,需处理失败重发大规模、高性能需求

相关文章:

RabbitMQ的其中工作模式介绍以及Java的实现

文章目录 前文一、模式介绍1. 简单模式2. 工作队列模式3. 广播模式4. 路由模式5. 通配符模式6. RPC模式7. 发布确认模式 二、代码实现1、简单模式2、工作队列模式生产者消费者消费者 1消费者 2 3、广播模式 (Fanout Mode)生产者消费者 4、路由模式 (Direct Mode)生产者消费者 5…...

vue2项目搭建

作者碎碎念&#xff1a;开历史倒车了&#xff0c;没想到不兼容&#xff0c;只能从vue3->vue2了。 1 vue3和vue2 这部分参考了官网的《vue3迁移指南》&#xff1a;Vue 3 的支持库进行了重大更新。以下是新的默认建议的摘要: 新版本的 Router, Devtools & test utils 来…...

Spring AI 源码解析:Tool Calling链路调用流程及示例

Tool工具允许模型与一组API或工具进行交互&#xff0c;增强模型功能&#xff0c;主要用于&#xff1a; 信息检索&#xff1a;从外部数据源检索信息&#xff0c;如数据库、Web服务、文件系统或Web搜索引擎等 采取行动&#xff1a;可用于在软件系统中执行特定操作&#xff0c;如…...

2025年- H48-Lc156 --236. 二叉树的最近公共祖先(递归、深搜)--Java版

1.题目描述 递归终止条件&#xff1a; 如果当前节点 root 为 null&#xff0c;表示到达了叶子节点的空子树&#xff1b; 如果当前节点是 p 或 q&#xff0c;就返回它&#xff08;因为从这里可以回溯寻找公共祖先&#xff09;。 2.思路 &#xff08;1&#xff09; 如果当前节…...

【人工智能】低代码-模版引擎

模板引擎是一种将数据与静态模板结合&#xff0c;生成动态内容的工具。它的核心作用是将业务逻辑与展示层分离&#xff0c;使代码更易维护、复用和管理。 核心功能 变量替换&#xff1a;将模板中的占位符替换为动态数据。 逻辑控制&#xff1a;支持条件判断&#xff08;if/els…...

Hertz+Kitex快速上手开发

本篇文章以用户注册接口为例&#xff0c;快速上手HertzKitex 以用户注册接口来演示hertz结合kitex实现网关微服务架构的最简易版本 项目结构 api- gateway&#xff1a;网关实现&#xff0c;这里采用hertz框架 idl&#xff1a;接口定义用来生成kitex代码 kitex_gen&#xff…...

线程池配置经验总结

1. 核心线程数配置(corePoolSize) 1.1 核心线程数的配置影响因素 CPU核心数 CPU密集型任务&#xff1a;核心线程数 ≈ CPU核心数 1IO密集型任务&#xff1a;核心线程数 ≈ CPU核心数 (1 平均等待时间/平均计算时间) 一般经验值&#xff1a;2 CPU核心数 内存大小&#xff…...

机器学习课程设计报告 —— 基于二分类的岩石与金属识别模型

机器学习课程设计报告 题 目&#xff1a; 基于二分类的岩石与金属识别模型 专 业&#xff1a; 机器人工程 学生姓名&#xff1a; XXX 指导教师&#xff1a; XXX 完成日期&#xff1a…...

分词算法BPE详解和CLIP的应用

一、TL&#xff1b;DR BPE通过替换相邻最频繁的字符和持续迭代来实现压缩CLIP对text进行标准化和预分词后&#xff0c;对每一个单词进行BPE编码和查表&#xff0c;完成token_id的转换 二、BPE算法 2.1 核心思想和原理 paper&#xff1a;Neural Machine Translation of Rare…...

STM32F103_Bootloader程序开发02 - Bootloader程序架构与STM32F103ZET6的Flash内存规划

导言 在工业设备和机器人项目中&#xff0c;固件远程升级能力已成为提升设备维护性与生命周期的关键手段。本文将围绕STM32平台&#xff0c;系统性介绍一个简洁、可靠的Bootloader程序设计思路。 我们将Bootloader核心流程划分为五大功能模块&#xff1a; 启动入口与升级模式判…...

通过Auto平台与VScode搭建远程开发环境(以Stable Diffusion Web UI为例)

文章目录 Stable Diffusion Web UI一、&#x1f3af;主要功能概述二、&#x1f9e0;支持的主要模型体系三、&#x1f4e6;安装方式简述✅ 一、前提准备✅ 二、安装步骤混乱版本&#xff08;仅用于记录测试过程&#xff09;第一步&#xff1a;克隆仓库&#xff08;使用清华大学镜…...

Windows_Rider C#语言开发环境构建

Windows_Rider C#语言开发环境构建 一、C#语言简介历史背景语言特点应用领域开发工具未来发展方向 二、Rider简介功能特点支持的语言免费版本最新更新 三、开发环境构建&#xff08;一&#xff09;安装 JetBrains Rider&#xff08;二&#xff09;安装 .NET SDK&#xff08;三&…...

Unity 打包程序全屏置顶无边框

该模块功能: 1. 打包无边框 2. 置顶 3. 不允许切屏 4.多显示器状态下,程序只在主显示上运行 5.全屏 Unity 打包设置: 如果更改打包设置,最好将Version版本增加一下,否则可能不会覆盖前配置文件 代码: 挂在场景中即可 using UnityEngine; using System; // 确保这行存…...

GAMES104 Piccolo引擎搭建配置

操作系统&#xff1a;windows11 家庭版 inter 17 12 th 显卡&#xff1a;amd 运行内存&#xff1a;>12 1、如何构建&#xff1f; 在github下载&#xff1a;网址如下 https://github.com/BoomingTech/Piccolo 下载后安装 git、vs2022 Git Visual Studio 2022 IDE - …...

第 29 场 蓝桥·算法入门赛

1. 不油腻的星座 "我们只欢迎不油腻的星座&#xff01;" 在「非哺乳动物星座联盟」的派对上&#xff0c;主持人突然宣布&#xff1a;"请在场的 12 星座中&#xff0c;名字里包含哺乳动物的立刻离场"&#xff0c;结果白羊、金牛、狮子、摩羯 44 个星座红着脸…...

用service 和 SCAN实现sqlplus/jdbc连接Oracle 11g RAC时负载均衡

说明 11.2推出的SCAN &#xff0c;简化了客户端连接&#xff08;当增加或者减少RAC实例时&#xff0c;不需要修改客户端配置&#xff0c;并且scan listener有各个实例的负载情况&#xff0c;可以实现连接时负载均衡。 不过客户端需要使用专门建立的service,而不能用RAC数据库…...

Jenkins 中获取构建触发用户的完整指南

在持续集成(CI/CD)流程中,追踪构建的触发用户是排查问题、审计操作或通知相关人员的重要需求。然而,Jenkins 默认不直接暴露触发构建的用户信息,尤其是在自动触发场景下。本文将详细介绍 多种获取 Jenkins 构建触发用户的方法,涵盖插件使用、脚本编写和 API 查询,并提供…...

防火墙流量管理

带宽管理介绍 针对企业用户流量&#xff0c;防火墙提供了带宽管理功能&#xff0c;基于出/入接口、源/目的安全区域、源/目的地址、时间段、报文DSCP优先级等信息&#xff0c;对通过自身的流量进行管理和控制。 带宽管理提供带宽限制、带宽保证和连接数限制功能&#xff0c;可…...

uniapp+ts 多环境编译

1. 创建项目 npx degit dcloudio/uni-preset-vue#vite-ts [项目名称] 2.创建env目录 多环境配置文件命名为.env.别名 添加index.d.ts interface ImportMetaEnv{readonly VITE_ENV:string,readonly UNI_PLATFORM:string,readonly VITE_APPID:string,readonly VITE_NAME:stri…...

Linux系统移植①:uboot概念

Linux系统移植①&#xff1a;uboot概念 uboot概念 1、uboot是一个比较复杂的裸机程序。 2、uboot就是一个bootloader,作用就是用原于启动Linux或其他系统。uboot最主要的工作就是初始化DDR。因为Linux是运行再DDR里面的。一般Linux镜像zImage&#xff08;uImage&#xff09;设…...

linux 学习之位图(bitmap)数据结构

bitmap 可以高效地表示大量的布尔值&#xff0c;并且在许多情况下可以提供快速的位操作。 1 定义 enum device_state{DOWN,DOEN_DONE,MAILBOX_READY,MAILBOX_PENDING,STATE_BUILD };DECLARE_BITMAP(state,STATE_BUILD)&#xff1b;相当于》u32 state[BITS_TO_LONGS(4)] BIT…...

DAY 35

import torch import torch.nn as nn import torch.optim as optim from sklearn.datasets import load_iris from sklearn.model_selection import train_test_split from sklearn.preprocessing import MinMaxScaler import time import matplotlib.pyplot as plt# 设置GPU设…...

理论篇一:了解webpack是什么,能解决什么问题,如何使用

Webpack 是前端工程化的核心工具之一,它的核心目标是将前端项目中的各种资源(JS、CSS、图片等)高效打包成浏览器可运行的静态文件。以下是系统化的解答: 一、Webpack 是什么? 1. 定义 Webpack 是一个 静态模块打包工具(Static Module Bundler),它通过分析项目的依赖关…...

AWS EC2实例安全远程访问最佳实践

EC2 远程连接方案对比 远程访问 Amazon EC2 实例主要有以下四种方式&#xff1a; Secure Shell (SSH) 远程访问AWS Systems Manager 会话管理器适用于 Linux 实例的 EC2 Serial ConsoleAmazon EC2 Instance Connect SSH 远程访问 SSH&#xff08;Secure Shell&#xff09;广…...

集群、容器云与裸金属服务器的全面对比分析

文章目录 引言 集群 2.1 定义 2.2 特点 2.3 应用场景 容器云 3.1 定义 3.2 核心功能 3.3 应用场景 裸金属 4.1 定义 4.2 特点 4.3 应用场景 三者的区别 5.1 架构与性能 5.2 管理与运维 5.3 成本与灵活性 总结 1. 引言 在云计算和数据中心领域&#xff0c;50…...

【强化学习】#7 基于表格型方法的规划和学习

主要参考学习资料&#xff1a;《强化学习&#xff08;第2版&#xff09;》[加]Richard S.Suttion [美]Andrew G.Barto 著 文章源文件&#xff1a;https://github.com/INKEM/Knowledge_Base 本章更是厘清概念厘到头秃&#xff0c;如有表达不恰当之处还请多多指教—— 概述 环境…...

EasyRTC嵌入式音视频通信SDK一对一音视频通信,打造远程办公/医疗/教育等场景解决方案

一、方案概述​ 数字技术发展促使在线教育、远程医疗等行业对一对一实时音视频通信需求激增。传统方式存在低延迟、高画质及多场景适配不足等问题&#xff0c;而EasyRTC凭借音视频处理、高效信令交互与智能网络适配技术&#xff0c;打造稳定低延迟通信&#xff0c;满足基础通信…...

Linux/aarch64架构下安装Python的Orekit开发环境

1.背景 国产化趋势越来越强&#xff0c;从软件到硬件&#xff0c;从操作系统到CPU&#xff0c;甚至显卡&#xff0c;就产生了在国产ARM CPU和Kylin系统下部署Orekit的需求&#xff0c;且之前的开发是基于Python的&#xff0c;需要做适配。 2.X86架构下安装Python/Orekit开发环…...

网络安全-等级保护(等保) 3-2-1 GB/T 28449-2019 第6章 方案编制活动

################################################################################ GB/T 28449-2019《信息安全技术 网络安全等级保护测评过程指南》是规定了等级测评过程&#xff0c;是纵向的流程&#xff0c;包括&#xff1a;四个基本测评活动:测评准备活动、方案编制活…...

Oracle Enqueue Names

Oracle Enqueue Names Enqueue&#xff08;排队锁&#xff09;是Oracle数据库中用于协调多进程并发访问共享资源的锁机制。 This appendix lists Oracle enqueues. Enqueues are shared memory structures (locks) that serialize access to database resources. They can be…...