当前位置: 首页 > news >正文

RabbitMQ 应用

在这里插入图片描述

文章目录

  • 前言
  • 1. Simple 简单模式
  • 2. Work Queue 工作队列模式
  • 3. Pubulish/Subscribe 发布/订阅模式
    • Exchange 的类型
  • 4. Routing 路由模式
  • 5. Topics 通配符模式
  • 6. RPC RPC通信
  • 7. Publisher Confirms 发布确认
    • 1. 单独确认
    • 2. 批量确认
    • 3. 异步确认

前言

前面我们学习了 RabbitMQ 的基本使用以及 RabbitMQ 的快速上手,那么这篇文章我将为大家介绍 RabbitMQ 提供的 7 种工作模式,我们上一篇快速入门实现的案例其实就是 7 种工作模式中的简单模式。

第一种:Simple 简单模式
在这里插入图片描述
第二种:Work Queue 工作队列模式
在这里插入图片描述
第三种:Publish/Subscribe 发布/订阅模式
在这里插入图片描述
第四种:Routing 路由模式
在这里插入图片描述
第五种:Topic 通配符模式
在这里插入图片描述
第六种:RPC 模式
在这里插入图片描述

第七种就是 Publisher Confirms 发布确认模式。

1. Simple 简单模式

在这里插入图片描述
简单模式主要由一个 Producer、一个 Queue和一个 Consumer 组成。

简单模式的特点就是:一个生产者 P,一个消费者 C,消息只能被消费一次,也称为点对点(Point-to-Point)模式。

这里的具体实现上一篇文章我就写了,这里就不写了,大家可以去看看。

2. Work Queue 工作队列模式

在这里插入图片描述
工作队列模式由一个生产者 P,一个队列 Queue 和多个消费者 C1、C2…组成,在这种模式下,Work Queue 会将消息分派给不同的消费者,每个消费者都会接收到不同的消息,意思就是 Work Queue 接收到了 10 条消息,那么 Work Queue 会将这 10 条消息分成两部分,每个部分 5 条消息,每条消息都不重复,然后将这五条消息分别发送给 C1 和 C2。

特点:消息不会重复,分配给不同的消费者。

适用场景:集群环境中做异步处理。比如我们平时在银行中办理业务取号的时候,当要办理业务的人取号(生产者)了之后,那么这些号码就会被存放在队列中,银行中的每个窗口(消费者)会给不同号的人办理业务。

在这里插入图片描述
那么我们看看通过 Java 代码如何实现一个工作队列模式。

对于这些经常使用到的变量,我们将其归到一个类中进行管理:

public class Constants {public static final String IP = "*.*.*.*";public static final Integer PORT = 5672;public static final String VIRTUAL_HOST = "test";public static final String USER_NAME = "admin";public static final String PASSWORD = "xxx";public static final String WORK_QUEUE = "work.queue";
}

生产者代码:

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.IP);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);//发送消息for (int i = 0; i < 10; i++) {String msg = "work queue" + i;channel.basicPublish("",Constants.WORK_QUEUE,null,msg.getBytes());}System.out.println("消息发送成功");channel.close();connection.close();}
}

消费者1 和消费者 2 的代码是一样的:

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.IP);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);//消费消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接受到消息:" + new String(body));}};channel.basicConsume(Constants.WORK_QUEUE,consumer);//释放资源//channel.close(); 我们这里可以先不释放资源,不然当我们先运行消费者的时候queue中没有消息,consumer的连接就会直接关闭了//connection.close();}
}

先启动两个消费者,然后再启动生产者:

在这里插入图片描述
在这里插入图片描述

可以看到 Consumer1 和 Consumer2 拿到的消息都是不重复的消息。

3. Pubulish/Subscribe 发布/订阅模式

在这里插入图片描述

Exchange 的类型

可以发现这个模式相较于前面两个模式,多出了一个 X,这个 X 指的是 Exchange 交换机。

交换机的作用是:生产者将消息发送到 Exchange,由交换机将消息按照一定的规则路由到一个或者多个队列中。

AMQP 协议中的交换机的类型有六种:fanout,direct,topic,headers,System和自定义,但是 RabbitMQ 中交换机的类型只有前四种。

  • Fanout:广播,将消息交给所有绑定到交换机的队列(Publish/Subcribe模式)
  • Direct:定向,把消息交给符合指定 routing key 的队列(Routing 模式)
  • Topic:通配符,把消息交给符合 routing pattern(路由模式)的队列(Topic 模式)
  • headers类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。headers 类型的交换机性能很差,而且也不实用,基本上不会看到它的存在

Exchaneg 只负责转发消息,不具备存储消息的能力,因此如果没有队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息就会丢失。

  • Routing Key:路由键。生产者将消息发送给转换机的时候,指定的一个字符串,用来告诉交换机应该如何处理这个消息
  • Binding Key:绑定。RabbitMQ 中通过 Binding 将交换机和队列关联起来,在绑定的时候一般会指定一个 Binding Key,这样 RabbitMQ 就知道如何正确的将消息路由到队列了。

在这里插入图片描述
当 Exchange 拿到生产者发送来的消息之后,会将消息中带的 Routing Key 和与该交换机绑定的队列的 Binding Key 进行匹配,然后将这个消息发送给 Routing Key 和 Binding Key 匹配的队列。

当知道了交换机的几种类型之后,我们来看看如何使用代码实现出来一个 Publish/Subscribe 模式。

生产者代码:

首先还是与 RabbitMQ-Server 建立连接,开启信道,与前面不同的操作是,前面我们声明交换机的时候因为使用的是默认的交换机,所以就没有显式的声明交换机,但是在涉及到交换机类型的时候,我们就需要显式的声明交换机,虽然 RabbitMQ 默认为我们提供了各个类型的交换机,但是名字可能不好记,所以不如我们自己实现一个:

在这里插入图片描述

Java 中声明一个交换机的方法主要是 exchangeDeclare() 方法,这个方法有很多个重载方法,但是我们主要使用下面的这种方法:

在这里插入图片描述

AMQP.Exchange.DeclareOk exchangeDeclare(String var1, BuiltinExchangeType var2, boolean var3) throws IOException;
  • String var1: 这个参数是交换机的名称。它是必须的,用于在RabbitMQ中唯一标识一个交换机。你可以根据需要为这个交换机命名。
  • BuiltinExchangeType var2: 这个参数指定了交换机的类型。该类是一个枚举类,内部枚举了交换器的类型
  • boolean var3: 这个布尔值参数指定交换机是否应该被标记为持久的(即,在RabbitMQ重启后仍然存在)。如果设置为true,交换机将持久化;如果设置为false,交换机则不会持久化。
public enum BuiltinExchangeType {DIRECT("direct"),FANOUT("fanout"),TOPIC("topic"),HEADERS("headers");private final String type;private BuiltinExchangeType(String type) {this.type = type;}public String getType() {return this.type;}
}

声明完成交换器后,就是声明队列,声明队列之后就是需要绑定交换器和队列了:

绑定交换器和队列使用的方法是 queueBind() 方法,该方法也是有两个重载的方法:

AMQP.Queue.BindOk queueBind(String var1, String var2, String var3) throws IOException;AMQP.Queue.BindOk queueBind(String var1, String var2, String var3, Map<String, Object> var4) throws IOException;
  • String var1: 队列的名称。这是你想要绑定到交换机的队列的唯一标识符。
  • String var2: 交换机的名称。这是你想要将队列绑定到的交换机的名称。
  • String var3: 路由键。当消息发送到交换机时,交换机将使用路由键来确定哪些队列应该接收这个消息。路由键可以是任何字符串,其解释取决于交换机的类型。
  • Map<String, Object> var4: 绑定参数。这是一个可选参数,允许你为绑定指定额外的参数,这些参数将根据交换机和队列的特定需求进行解释。例如,对于某些交换机类型(如headers交换机),绑定参数可能用于定义消息头中的条件。对于大多数用途,这个参数可能为空或未使用。

我们这里没有使用到额外的参数,所以就使用三个参数的方法:

channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");
channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.IP);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT,true);//声明队列channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);//绑定交换机和队列channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");//生产消息for (int i = 0; i < 10; i++) {String msg = "fanout exchange" + i;channel.basicPublish(Constants.FANOUT_EXCHANGE,"",null,msg.getBytes());}System.out.println("消息发送成功");channel.close();connection.close();}
}

消费者代码:

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.IP);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);//消费消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume(Constants.FANOUT_QUEUE1,consumer);}
}

两个消费者的代码基本上都一样,就是声明队列的时候声明的是两个不同的队列。

先启动两个消费者,然后再启动生产者:

在这里插入图片描述
在这里插入图片描述
可以看到,Exchange 将收到的生产者生产的消息复制为 N 份发送给了所有与其绑定的队列,然后消费者拿到的消息就是一样的消息。这就是 Publish/Subscribe 模式。

4. Routing 路由模式

在这里插入图片描述
路由模式其实和发布/订阅模式非常相似,它是在发布订阅模式的基础上,增加了路由 key,其实也不算增加,只不过发布/订阅模式交换器和队列的 Binding key 都是一样的,然后生产者发送的消息中携带的 Routing Key 也是和这些 Binding Key 是相匹配的,所有交换器会将收到的消息发送给所有与其绑定的队列。

而路由模式则是交换器和队列的 Binding Key 并不是完全相同的,而是存在差异,这样当交换器接收到 Routing Key 的时候,就会将消息发送给与之匹配的队列。那么我们看看如何使用代码来实现 路由模式。

生产者代码

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.IP);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明交换器channel.exchangeDeclare(Constants.ROUTING_EXCHANGE, BuiltinExchangeType.DIRECT,true);//声明队列channel.queueDeclare(Constants.ROUTING_QUEUE1,true,false,false,null);//绑定交换器和队列channel.queueBind(Constants.ROUTING_QUEUE1,Constants.ROUTING_EXCHANGE,"a");channel.queueBind(Constants.ROUTING_QUEUE1,Constants.ROUTING_EXCHANGE,"b");channel.queueBind(Constants.ROUTING_QUEUE2,Constants.ROUTING_EXCHANGE,"a");//生产消息String msg = "routing exchange a";channel.basicPublish(Constants.ROUTING_EXCHANGE,"a",null,msg.getBytes());msg = "routing exchange b";channel.basicPublish(Constants.ROUTING_EXCHANGE,"b",null,msg.getBytes());System.out.println("消息发送成功");channel.close();connection.close();}
}

消费者代码:

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.IP);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare(Constants.ROUTING_QUEUE1,true,false,false,null);//消费消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume(Constants.ROUTING_QUEUE1,consumer);}
}

在这里插入图片描述
在这里插入图片描述

5. Topics 通配符模式

在这里插入图片描述
跟 Java、MySQL 一样,我们的 RabbitMQ 也是支持 通配符的,所以我们的 Routing Key 和 Binding Key 也是可以有通配符的。在 RabbitMQ 中的匹配字符有两个 * 和 #。

  • *匹配一个单词
  • #匹配0个或者多个单词

注意 RabbitMQ 中匹配字符匹配的不是字符,而是单词,RabbitMQ 中由 . 分隔一个单词。a.b.c,a b c 都叫做一个单词,aa.bb.cc,aa bb cc 也就做一个单词。

生产者代码:

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.IP);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明交换器channel.exchangeDeclare(Constants.TOPICS_EXCHANGE, BuiltinExchangeType.TOPIC,true);//声明队列channel.queueDeclare(Constants.TOPICS_QUEUE1,true,false,false,null);channel.queueDeclare(Constants.TOPICS_QUEUE2,true,false,false,null);//绑定交换器和队列channel.queueBind(Constants.TOPICS_QUEUE1,Constants.TOPICS_EXCHANGE,"*.a.*");channel.queueBind(Constants.TOPICS_QUEUE1,Constants.TOPICS_EXCHANGE,"c.#");channel.queueBind(Constants.TOPICS_QUEUE2,Constants.TOPICS_EXCHANGE,"*.a.*");//生产消息String msg = "topics exchange *.a.*";channel.basicPublish(Constants.TOPICS_EXCHANGE,"hello.a.r",null,msg.getBytes());msg = "topics exchange c.#";channel.basicPublish(Constants.TOPICS_EXCHANGE,"c.world",null,msg.getBytes());channel.close();connection.close();}
}

消费者代码:

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.IP);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare(Constants.TOPICS_QUEUE1,true,false,false,null);//消费消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:" + new String(body));}};channel.basicConsume(Constants.TOPICS_QUEUE1,consumer);}
}

在这里插入图片描述
在这里插入图片描述

6. RPC RPC通信

在这里插入图片描述

在 RPC 通信的过程中,没有生产者和消费者,比较像咱们的 RPC 远程调用,大概就是通过两个队列实现了一个可回调的过程。

在这里插入图片描述
RPC 通信的过程:

  1. 客户端发送消息到一个指定的队列,并在消息属性中设置 replyTo 字段,这个字段指定了一个回调队列,用于接收服务器的响应
  2. 服务端收到请求后,处理请求并发送响应消息到 replyTo 指定的回调队列
  3. 客户端在回调队列上等待消息,一旦收到响应,客户端会检查消息的 replyTo 属性,以确保它是所期望的响应

RPC 通信客户端代码:

public class RpcClient {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.IP);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明交换器 我们这是使用默认的交换器//生命队列channel.queueDeclare(Constants.RPC_REQUEST_QUEUE,true,false,false,null);channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE,true,false,false,null);//发送请求String msg = "rpc...";//设置请求的唯一标识String correlationId = UUID.randomUUID().toString();//设置请求的相关属性AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().correlationId(correlationId).replyTo(Constants.RPC_RESPONSE_QUEUE).build();channel.basicPublish("",Constants.RPC_REQUEST_QUEUE,properties,msg.getBytes());//接收响应//通过阻塞队列来接收响应BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1);Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String response = new String(body);System.out.println("接收到回调消息:" + response);if (correlationId.equals(properties.getCorrelationId())) {blockingQueue.offer(response);}}};channel.basicConsume(Constants.RPC_RESPONSE_QUEUE,consumer);String result = blockingQueue.take();System.out.println("[RPC Client 响应结果]:" + result);}
}

RPC 服务端代码:

public class RpcServer {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.IP);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare(Constants.RPC_REQUEST_QUEUE,true,false,false,null);channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE,true,false,false,null);//接收请求channel.basicQos(1); //这个的作用后面再为大家介绍Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String request = new String(body);System.out.println("接收到请求:" + request);String response = "针对request:" + request + ",相应成功";AMQP.BasicProperties properties1 = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();channel.basicPublish("",Constants.RPC_RESPONSE_QUEUE,properties1,response.getBytes());//envelope.getDeliveryTag() 每个消息都有一个唯一的deliveryTag//false表示是否批量确认消息channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(Constants.RPC_REQUEST_QUEUE,consumer);}
}

在这里插入图片描述
在这里插入图片描述

7. Publisher Confirms 发布确认

RabbitMQ的Publisher Confirms(发布确认)机制是一种确保消息从生产者(Publisher)安全发送到RabbitMQ服务器的机制。当生产者向RabbitMQ发送消息时,它可能希望知道消息是否已经被RabbitMQ服务器成功接收并存储起来,以确保消息的可靠性。Publisher Confirms机制就是为了满足这一需求而设计的。

  1. 生产者将 Channel 设置为 confirm 模式 (通过调用 channel.confirmSelect()完成)后,发布的每一条消息都会获得一个唯一的 ID,生产者可以将这些序列号与消息关联起来,以便跟踪消息的状态
  2. 当消息被 RabbitMQ 服务器接收并处理之后,服务器会异步的像生产者发送一个确认 ACK 给生产者(包含消息的唯一ID),表明消息已经送达

通过这个 Publisher Confirms 模式,可以避免消息的丢失问题。

消息丢失大概分为三种情况:

  1. 生产者问题,因为程序故障,网络抖动等原因,生产者没有向 Broker 发送消息
  2. 消息中间价问题,也就是我们的 RabbitMQ Broker 出现问题,生产者将消息成功的发送给了 Broker,但是 Broker 没有将消息保存好,导致消息丢失
  3. 消费者问题,Broker 将消息成功发送给了消费者,但是消费者在消费的时候,因为没有处理好,导致消费者这里的消息丢失了,并且 broker 也将消费者失败的消息从队列中删除了

RabbitMQ 对于上面可能出现的三种问题都给出了解决,问题 2 可以通过持久化机制解决,问题 3 可以通过消息应答机制解决。针对问题 1,则可以通过 Publisher Confirms 机制解决。

RabbitMQ 发布确认是 0.9.1 协议的扩展,默认情况下他不会被启用,生产者可以通过 channel.confirmSelect() 将信道设置为 confirm 模式。

RabbitMQ 提供的发布确认有三种策略,那么接下来我们来了解一下这三种策略的优劣。

1. 单独确认

它要求生产者(Publisher)在发送每一个消息后,都等待RabbitMQ服务器的确认(Confirm),确保消息已经被RabbitMQ成功接收并处理,然后再继续发送下一个消息。

2. 批量确认

在 RabbitMQ 的发布确认(Publisher Confirms)机制中,批量确认(Batch Acknowledgment)是一个重要的特性,它允许 RabbitMQ 在一次确认消息中同时确认多个消息。这对于提高性能和减少网络开销非常有帮助。

3. 异步确认

RabbitMQ 发布确认中的异步确认策略是一种高效且可靠的机制,用于在消息发送过程中异步地接收确认回调,以提高生产者的吞吐量和性能。

public class PublisherConfirms {private static final Integer MESSAGE_COUNT = 1000;static Connection createConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.IP);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER_NAME);factory.setPassword(Constants.PASSWORD);return factory.newConnection();}public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//Strategy #1: Publishing Messages Individually//单独确认publishingMessagesIndividually();//Strategy #2: Publishing Messages in Batches//批量确认publishingMessagesInBatches();//Strategy #3: Handling Publisher Confirms Asynchronously//异步确认handlingPublisherConfirmsAsynchronously();}/*** 单独确认* @throws IOException* @throws TimeoutException* @throws InterruptedException*/private static void publishingMessagesIndividually() throws IOException, TimeoutException, InterruptedException {try (Connection connection = createConnection()) {//1. 开启信道Channel channel = connection.createChannel();//2. 设置信道为confirm模式channel.confirmSelect();//3. 声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);//4. 发送消息, 并等待确认long start = System.currentTimeMillis();for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "publisher confirms"+i;channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());//等待确认channel.waitForConfirmsOrDie(5000);}long end = System.currentTimeMillis();System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);}}/*** 批量确认* @throws IOException* @throws TimeoutException* @throws InterruptedException*/private static void publishingMessagesInBatches() throws IOException, TimeoutException, InterruptedException {try(Connection connection = createConnection()) {//1. 开启信道Channel channel = connection.createChannel();//2. 设置信道为confirm模式channel.confirmSelect();//3. 声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);//4. 发送消息, 并进行确认long start = System.currentTimeMillis();int batchSize = 100;int outstandingMessageCount = 0;for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publisher confirms"+i;channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());outstandingMessageCount++;if (outstandingMessageCount==batchSize){channel.waitForConfirmsOrDie(5000);outstandingMessageCount = 0;}}if (outstandingMessageCount>0){channel.waitForConfirmsOrDie(5000);}long end = System.currentTimeMillis();System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);}}/*** 异步确认* @throws IOException* @throws TimeoutException*/private static void handlingPublisherConfirmsAsynchronously() throws IOException, TimeoutException {try (Connection connection = createConnection()) {//开启信道Channel channel = connection.createChannel();//设置信道为confirm模式channel.confirmSelect();//声明转换器 这里我们使用默认的转换器//声明队列channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3,true,false,false,null);//监听confirm//记录开始时间long start= System.currentTimeMillis();//该集合用来存放未确认的消息的IDSortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {//deliveryTag 是消息的唯一ID multiple 表示是否批量确认if (multiple) {//如果是批量确认,则将deliveryTag之前的消息ID都删除了confirmSeqNo.headSet(deliveryTag + 1).clear();}else {confirmSeqNo.remove(deliveryTag);}}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {//这里为了简单,当RabbitMQ Broker无法正确处理消息的话,我们也认为它处理了if (multiple) {confirmSeqNo.headSet(deliveryTag + 1).clear();}else {confirmSeqNo.remove(deliveryTag);}}});//发送消息for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "pulisher confirms" + i;long seqNo = channel.getNextPublishSeqNo();channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE3,null,msg.getBytes());confirmSeqNo.add(seqNo);}while (!confirmSeqNo.isEmpty()) {try {Thread.sleep(10);} catch (InterruptedException e) {throw new RuntimeException(e);}}long end = System.currentTimeMillis();System.out.printf("异步确认策略,消息条数:%d,耗时:%d ms \n",MESSAGE_COUNT,end - start);}}
}

在这里插入图片描述
可以看到单独确认策略所需要的时间是比较多的,而异步策略则能够快速的处理这些。

相关文章:

RabbitMQ 应用

文章目录 前言1. Simple 简单模式2. Work Queue 工作队列模式3. Pubulish/Subscribe 发布/订阅模式Exchange 的类型 4. Routing 路由模式5. Topics 通配符模式6. RPC RPC通信7. Publisher Confirms 发布确认1. 单独确认2. 批量确认3. 异步确认 前言 前面我们学习了 RabbitMQ 的…...

使用Python读取Excel数据的详细指南

在数据分析中&#xff0c;Excel文件是一种常见的数据存储格式。使用Python读取Excel数据可以帮助我们更方便地进行数据处理和分析。本文将介绍如何在Python 2和Python 3中读取Excel数据&#xff0c;具体步骤和代码示例详细说明。 准备工作 在开始之前&#xff0c;请确保你已经…...

VitePress 动态路由与路径加载器详解

在使用 VitePress 构建静态网站时&#xff0c;动态路由功能允许我们通过单个 Markdown 文件和动态数据生成多个页面。本文将详细介绍如何使用动态路由以及路径加载器文件来生成这些页面&#xff0c;并提供实例代码和解释说明。 动态路由基础 动态路由的核心在于使用带有参数的…...

C#编程语言及.NET 平台快速入门指南

Office Word 不显示 Citavi 插件&#xff0c;如何修复&#xff1f;_citavi安装后word无加载项-CSDN博客 https://blog.csdn.net/Viviane_2022/article/details/128946061?spm1001.2100.3001.7377&utm_mediumdistribute.pc_feed_blog_category.none-task-blog-classify_ta…...

高等代数精解【9】

文章目录 向量空间与矩阵矩阵的行列式矩阵A的秩保持不变方阵的行列式线性无关的条件1. 线性组合为零向量的唯一性2. 矩阵的秩3. 几何解释&#xff08;对于二维和三维空间&#xff09;4. 行列式&#xff08;对于方阵&#xff09;总结 矩阵的非零子式基础重要性例子注意事项 非奇…...

谷粒商城の缓存篇

文章目录 前言一、本地缓存和分布式缓存1.本地缓存2.分布式缓存 二、项目实战1.配置Redis2.整合业务代码2.1 缓存击穿2.2 缓存雪崩2.3 缓存穿透2.4 业务代码1.0版2.5 分布式锁1.0版2.6 分布式锁2.0版2.7 Spring Cache及缓存一致性问题2.7.1 Spring Cache2.7.2 缓存一致性问题2.…...

永远学习:为什么人工智能难以适应新挑战

理解深度学习的局限性并追求真正的持续适应 欢迎来到雲闪世界。 “智者适应环境&#xff0c;正如水适应水瓶。”——中国谚语 “适应或灭亡&#xff0c;现在和以往一样&#xff0c;是大自然的必然法则。”——赫伯特乔治威尔斯 近年来&#xff0c;人工智能取得了长足的进步。所…...

【spring】 Jackson :@JsonIgnore 注解

@JsonIgnore 是 Jackson 库中的一个注解,用于在序列化和反序列化过程中忽略某个字段。也就是说,当对象被转换为 JSON 或从 JSON 转换为对象时,带有 @JsonIgnore 注解的字段将不会被包含在内在这个示例中,ignoredField 字段将不会出现在生成的 JSON 字符串中。 import com.…...

Dependencies与DependencyManagement的区别

现在Maven项目管理&#xff0c;在开发中时比较常用的&#xff0c;在一些项目汇总遇到依赖冲突的问题之后&#xff0c;还是没有能有一个很好的解决办法&#xff0c;这次就来看看在使用Maven管理依赖的过程中dependencies与dependencyManagement的区别。 DepencyManagement应用场…...

git svn 日记

1. git log -p -1 --name-only 该命令用于查看最新的一次提交记录的详细信息&#xff0c;包括文件更改情况。 git log&#xff1a;显示 Git 仓库的提交历史。-p&#xff1a;显示每次提交的差异 (diff)&#xff0c;也就是文件内容的修改部分。-1&#xff1a;表示只显示最近的一…...

FSMC

RAM ROM RAM和ROM相比&#xff0c;两者的最大区别是RAM在断电以后保存在上面的数据会自动消失&#xff0c;而ROM不会自动消失&#xff0c;可以长时间断电保存。 并且RAM的速度要远远高于ROM的速度。 SRAM SRAM 的存储单元以锁存器来存储数据&#xff0c;种电路结构不需要定时…...

NAT技术+代理服务器+内网穿透

NAT技术 IPv4协议中&#xff0c;会存在IP地址数量不充足的问题&#xff0c;所以不同的子网中会存在相同IP地址的主机。那么就可以理解为私有网络的IP地址并不是唯一对应的&#xff0c;而公网中的IP地址都是唯一的&#xff0c;所以NAT&#xff08;Network Address Translation&…...

【ABAP】ole2 excel多sheet导入导出

原理就不分享了 原来是用了动态表格&#xff0c;但是要导出不方便&#xff0c;所以就写死了&#xff0c;excel多sheet导入的类放在另一篇文章里 REPORT zcdemo17. INCLUDE ole2incl.DATA: excel TYPE ole2_object,workbooks TYPE ole2_object,workbook TYPE ole2_object…...

图像配准-小结

图像配准&#xff1a;找到一对图像间的几何变换关系&#xff0c;并且将待配准图像根据几何变换关系对齐到参考图像上&#xff0c;从而为图像融合、变化检测/监测提供基础。图像匹配&#xff0c;在某些语境中可能与上面的图像配准指的是一个东西&#xff0c;而在某些语境中可能指…...

【2025】基于Python的空气质量综合分析系统的设计与实现(源码+文档+调试+答疑)

博主介绍&#xff1a; ✌我是阿龙&#xff0c;一名专注于Java技术领域的程序员&#xff0c;全网拥有10W粉丝。作为CSDN特邀作者、博客专家、新星计划导师&#xff0c;我在计算机毕业设计开发方面积累了丰富的经验。同时&#xff0c;我也是掘金、华为云、阿里云、InfoQ等平台…...

计算机基础知识-2

x86架构的寄存器 AT&T汇编和Intel汇编的区别 每取出完一条指令&#xff0c;PC会自动&#xff0b;"1"&#xff0c;指向下一条要被执行的指令。这里的1是指下一条指令&#xff0c;但是指令本身可能占用多个字节&#xff0c;所以地址可能不是以1叠加 当前执行的是10…...

Ubuntu2204配置连续失败后账户锁定

配置启用pam_faillock sudo nano /etc/pam.d/common-auth在最上面添加以下内容 auth required pam_faillock.so preauth silent audit auth sufficient pam_unix.so nullok try_first_pass auth [defaultdie] pam_faillock.so authfail auditsudo nano /etc/pam.d/…...

windows下安装elasticSearch和kibana

下载es 下载地址官网 下载后是个压缩包(elasticsearch-8.15.0-windows-x86_64)&#xff0c;解压即可 启动 配置 改一下 /conf/jvm.options文件&#xff0c;最后加一行编码配置&#xff0c;这个是为了启动后防止控制台乱码 -Dfile.encodingGBK启动es 依赖jdk8环境&#xf…...

Java-IDEA模拟一个Redis服务器,与Redis客户端进行一次简单的交互。默认端口号:6379

首先要了解Redis的交互协议。 摘抄&#xff1a; 简单字符串&#xff08;Simple Strings&#xff09;: 以 “” 开头&#xff0c;例如 “OK\r\n” 表示一个成功的响应。错误&#xff08;Errors&#xff09;: 以 “-” 开头&#xff0c;例如 “-ERR unknown command\r\n” 表示一…...

WEB服务与虚拟主机/IIS中间件部署

WWW&#xff08;庞大的信息系统&#xff09;是基于客户机/服务器⽅式的信息发现技术和超⽂本技术的综合。网页浏览器//网页服务器 WWW的构建基于三项核⼼技术&#xff1a; HTTP&#xff1a;超文本传输协议&#xff0c;⽤于在Web服务器和客户端之间传输数据。HTML&#xff1a;⽤…...

JAVA开源项目 图书个性化推荐系统 计算机毕业设计

本文项目编号 T 015 &#xff0c;文末自助获取源码 \color{red}{T015&#xff0c;文末自助获取源码} T015&#xff0c;文末自助获取源码 目录 一、系统介绍1.1 业务分析1.2 用例设计1.3 时序设计 二、演示录屏三、启动教程四、功能截图五、文案资料5.1 选题背景5.2 国内外研究…...

Spring Boot 注解探秘:HTTP 请求的魅力之旅

在SpringBoot应用开发中&#xff0c;处理Http请求是一项基础且重要的任务。Spring Boot通过提供一系列丰富的注解极大地简化了这一过程&#xff0c;使得定义请求处理器和路由变得更加直观与便捷。这些注解不仅帮助开发者清晰地定义不同类型的HTTP请求如何被处理&#xff0c;同时…...

TYPE-C USB设计

目录 摘要 TYPE-C电路 握手过程 USB电路 摘要 TYPE-C,是USB的一种接口&#xff0c;USB的第一种接口为常见的USB接口&#xff0c;U盘即为这种接口&#xff1b;第二种接口的形状类似一个凸字&#xff0c;常应用在打印机中&#xff0c;第三种接口即为TYPE-C&#xff0c;支持正…...

Python炒股自动化,怎样理解股票交易性质

炒股自动化&#xff1a;申请官方API接口&#xff0c;散户也可以 python炒股自动化&#xff08;0&#xff09;&#xff0c;申请券商API接口 python炒股自动化&#xff08;1&#xff09;&#xff0c;量化交易接口区别 Python炒股自动化&#xff08;2&#xff09;&#xff1a;获取…...

Vue2 day-02

目录 一. Vue脚手架(Vue CLI) 1.1 安装新版本的Vue脚手架vue/cli 1.2 用命令创建Vue项目 1.2.1 命令创建vue项目 1.2.2 默认创建 1.2.3 自定义创建 1.2.4 基于ui界面创建Vue项目 1.3 分析Vue脚手架生成的项目结构及代码执行 1.3.1 默认创建文件结构 1.3.2 分开放置文…...

什么?!新版 Node.js V22.5 自带 SQLite 模块啦

前言 2024年7月&#xff0c;Node.js V22.5.0 版本发布&#xff0c;自带了 SQLite 模块&#xff0c;意味着开发者可以直接在程序中使用 SQLite 数据库&#xff0c;而无需引入第三方库&#x1f44d;。 话不多说&#xff0c;感觉来体验一波✈。 安装/升级 我现在用的是21.4.0版…...

Maven持续集成(Continuous integration,简称CI)版本友好管理

从Maven 3.5.0-beta-1 版本开始可以在pom文件中使用 r e v i s i o n 、 {revision}、 revision、{sha1}、${changelist}做为版本的占位符。 一、单module简单使用${revision}的场景 <project><modelVersion>4.0.0</modelVersion><parent><groupId…...

EvoSuite使用总结

1.安装EvoSuite插件 以IDEA为例&#xff0c;在Plugins栏搜索EvoSuite后点击install&#xff0c;安装完成后重启IDEA 2.使用EvoSuite 选中文件右键选择Run EvoSuite 生成成功可以看到如下提示&#xff1a; 注意事项&#xff1a; 生成路径&#xff1a;src/test/java 使用juni…...

Cortex-A7:简单中断处理(不可嵌套中断)机制

0 参考资料 ARM Cortex-A(armV7)编程手册V4.0.pdf ARM体系结构与编程第2版1 前言 Cortex-M系列内核MCU中断硬件原生支持嵌套中断&#xff0c;开发者不需要为了实现嵌套中断而进行额外的工作。但在Cortex-A7中&#xff0c;硬件原生是不支持嵌套中断的&#xff0c;这从Cortex-A…...

k8s HPA

水平自动扩容和缩容HPA HPA全称Horizontal Pod Autoscaler&#xff0c;即pod水平自动伸缩。HPA可以基于CPU利用率对replication controller、deployment和replicaset中的pod数量进行自动扩缩容&#xff08;除了CPU利用率&#xff0c;也可以基于其他应用程序提供的度量指标cust…...