当前位置: 首页 > news >正文

rabbitmq五种模式的总结——附java-se实现(详细)

rabbitmq五种模式的总结

完整项目地址:https://github.com/9lucifer/rabbitmq4j-learning

在这里插入图片描述

一、简单模式

(一)简单模式概述

RabbitMQ 的简单模式是最基础的消息队列模式,包含以下两个角色:

  1. 生产者:负责发送消息到队列。
  2. 消费者:负责从队列中接收并处理消息。

在简单模式中,消息的传递是单向的,生产者将消息发送到队列,消费者从队列中接收消息。

image-20250216063036914


(二)生产者代码解析

代码

生产者负责创建消息并将其发送到指定的队列中。

package top.miqiu._01_hello;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("ip(要换成真实的ip哦)");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列/*** 参数说明:* 1. 队列名称:01-hello2* 2. 是否持久化:true(重启后队列仍然存在)* 3. 是否独占队列:false(允许多个消费者连接)* 4. 是否自动删除:false(队列不会自动删除)* 5. 额外参数:null*/channel.queueDeclare("01-hello2", true, false, false, null);// 6. 发送消息/*** 参数说明:* 1. 交换机名称:空字符串(使用默认交换机)* 2. 路由键:队列名称(01-hello2)* 3. 额外属性:null* 4. 消息内容:字节数组*/channel.basicPublish("", "01-hello2", null, "hello rabbitmq2".getBytes());System.out.println("消息发送成功");// 7. 关闭资源channel.close();connection.close();}
}
结果

image-20250216063335314


(三)消费者代码解析

代码

消费者负责从队列中接收并处理消息。

package top.miqiu._01_hello_c;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("ip(要换成真实的ip哦");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列(需与生产者保持一致)channel.queueDeclare("01-hello2", false, false, false, null);// 6. 接收消息/*** 参数说明:* 1. 队列名称:01-hello2* 2. 是否自动确认:true(消息被消费后自动确认)* 3. 消息处理回调:DeliverCallback* 4. 消息取消回调:CancelCallback*/channel.basicConsume("01-hello2", true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {System.out.println("接收到消息:" + new String(delivery.getBody()));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息被取消");}});}
}
结果

image-20250216063418597

在mq中查看

image-20250216063443454


(四)总结

  1. 简单模式:适用于一对一的简单消息传递场景。
  2. 生产者:负责创建队列并发送消息。
  3. 消费者:负责从队列中接收并处理消息。
  4. 注意事项
    • 队列名称需保持一致,不然一定会报错!
    • 消息确认机制需根据业务需求选择自动或手动确认。
    • 使用完资源后需显式关闭 ChannelConnection

二、工作模式

(一)工作模式概述

工作模式是 RabbitMQ 的一种常见模式,用于将任务分发给多个消费者。它的特点是:

  1. 一个生产者:负责发送消息到队列。
  2. 多个消费者:共同消费同一个队列中的消息。
  3. 消息分发机制:默认情况下,RabbitMQ 会以轮询(Round-Robin)的方式将消息分发给消费者。

工作模式适用于任务分发场景,例如将耗时的任务分发给多个 Worker 处理。

image-20250216065036476


(二)生产者代码解析

生产者负责创建消息并将其发送到指定的队列中。

package top.miqiu._02_work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("你的ip!别忘了改");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列/*** 参数说明:* 1. 队列名称:02-work1* 2. 是否持久化:true(重启后队列仍然存在)* 3. 是否独占队列:false(允许多个消费者连接)* 4. 是否自动删除:false(队列不会自动删除)* 5. 额外参数:null*/channel.queueDeclare("02-work1", true, false, false, null);// 6. 发送消息for (int i = 0; i < 20; i++) {String message = "hello work:" + i;channel.basicPublish("", "02-work1", null, message.getBytes());}System.out.println("消息发送成功");// 7. 关闭资源channel.close();connection.close();}
}
关键点:
  1. 队列声明(queueDeclare):创建队列并设置队列属性。
  2. 消息发送(basicPublish):通过循环发送多条消息到队列。
  3. 持久化队列:设置为 true,确保队列在 RabbitMQ 重启后仍然存在。

(三)消费者代码解析

代码

消费者负责从队列中接收并处理消息。

package top.miqiu._02_work;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("你的ip!别忘了改");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列(需与生产者保持一致)channel.queueDeclare("02-work1", true, false, false, null);// 6. 设置每次只接收一条消息channel.basicQos(1);// 7. 接收消息/*** 参数说明:* 1. 队列名称:02-work1* 2. 是否自动确认:false(手动确认消息)* 3. 消息处理回调:DeliverCallback* 4. 消息取消回调:CancelCallback*/channel.basicConsume("02-work1", false, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {try {// 模拟消息处理耗时Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者1 接收到消息:" + new String(delivery.getBody()));// 手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息被取消");}});}
}
关键点:
  1. 队列声明(queueDeclare):确保队列存在,需与生产者保持一致。
  2. 消息预取(basicQos):设置每次只接收一条消息,避免某个消费者处理过多消息。
  3. 手动确认(basicAck):消息处理完成后手动确认,确保消息不会丢失。
  4. 消息处理耗时:通过 Thread.sleep(1000) 模拟消息处理耗时。
效果

image-20250216065155794

image-20250216065214992


(四)工作模式的特点

  1. 消息分发机制
    • 默认情况下,RabbitMQ 会以轮询的方式将消息分发给多个消费者。
    • 可以通过 basicQos 设置每次只接收一条消息,避免某个消费者处理过多消息。
  2. 消息确认机制
    • 设置为手动确认(autoAck=false),确保消息处理完成后才确认。(防止业务处理失败的情况下丢失消息)
    • 如果消费者在处理消息时崩溃,未确认的消息会重新分发给其他消费者。
  3. 适用场景
    • 任务分发场景,例如将耗时的任务分发给多个 Worker 处理。

(五)总结

  1. 工作模式:适用于任务分发场景,多个消费者共同消费同一个队列中的消息。
  2. 生产者:负责发送消息到队列。
  3. 消费者:负责接收并处理消息,支持手动确认和消息预取。
  4. 注意事项
    • 队列名称需保持一致。
    • 消息确认机制需根据业务需求选择自动或手动确认。
    • 使用 basicQos 控制消息分发,避免某个消费者处理过多消息。

三、发布订阅模式

(一)发布订阅模式概述

发布订阅模式(Publish/Subscribe Mode)是 RabbitMQ 的一种模式,用于将消息广播给多个消费者。它的特点是:

  1. 一个生产者:将消息发送到交换机(Exchange)。
  2. 多个消费者:每个消费者都有自己的队列,并与交换机绑定。
  3. 消息广播:交换机将消息广播给所有绑定的队列。

发布订阅模式适用于消息广播场景,例如日志系统、通知系统等。

image-20250216071658856


(二)生产者代码解析

生产者负责创建消息并将其发送到指定的交换机中。

package top.miqiu._03_pubsub;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("用自己的ip!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明交换机/*** 参数说明:* 1. 交换机名称:03-pubsub* 2. 交换机类型:fanout(广播模式)*/channel.exchangeDeclare("03-pubsub", "fanout");// 6. 发送消息for (int i = 0; i < 20; i++) {String message = "hello work:" + i;/*** 参数说明:* 1. 交换机名称:03-pubsub* 2. 路由键:空字符串(fanout 模式忽略路由键)* 3. 消息属性:MessageProperties.TEXT_PLAIN* 4. 消息内容:字节数组*/channel.basicPublish("03-pubsub", "", MessageProperties.TEXT_PLAIN, message.getBytes());}System.out.println("消息发送成功");// 7. 关闭资源channel.close();connection.close();}
}
关键点:
  1. 交换机声明(exchangeDeclare):创建交换机并设置类型为 fanout(广播模式)。
  2. 消息发送(basicPublish):将消息发送到交换机,路由键为空字符串(fanout 模式忽略路由键)。
  3. 消息广播:消息会被广播到所有绑定到该交换机的队列。

(三)消费者代码解析

代码

消费者负责从队列中接收并处理消息。

package top.miqiu._03_pubsub;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("用自己的ip!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明交换机channel.exchangeDeclare("03-pubsub", "fanout");// 6. 创建临时队列String queue = channel.queueDeclare().getQueue();// 7. 绑定队列到交换机/*** 参数说明:* 1. 队列名称:queue* 2. 交换机名称:03-pubsub* 3. 路由键:空字符串(fanout 模式忽略路由键)*/channel.queueBind(queue, "03-pubsub", "");// 8. 接收消息/*** 参数说明:* 1. 队列名称:queue* 2. 是否自动确认:true(自动确认消息)* 3. 消息处理回调:DeliverCallback* 4. 消息取消回调:CancelCallback*/channel.basicConsume(queue, true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {try {// 模拟消息处理耗时Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者2 接收到消息:" + new String(delivery.getBody()));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息被取消");}});}
}
关键点:
  1. 交换机声明(exchangeDeclare):确保交换机存在,需与生产者保持一致。
  2. 临时队列(queueDeclare):创建一个临时队列,队列名称由 RabbitMQ 自动生成。
  3. 队列绑定(queueBind):将队列绑定到交换机,路由键为空字符串(fanout 模式忽略路由键)。
  4. 消息接收(basicConsume):从队列中接收消息并处理。
结果

image-20250216071734904

image-20250216071749761

可以看到两个consumer都消费了相同的消息


(四)发布订阅模式的特点

  1. 消息广播:交换机将消息广播给所有绑定的队列。
  2. 临时队列:消费者可以创建临时队列,队列名称由 RabbitMQ 自动生成。
  3. 适用场景
    • 日志系统:将日志消息广播给多个消费者。
    • 通知系统:将通知消息广播给多个用户。

(五)总结

  1. 发布订阅模式:适用于消息广播场景,多个消费者各自接收相同的消息。
  2. 生产者:负责将消息发送到交换机。
  3. 消费者:负责创建队列并绑定到交换机,接收并处理消息。
  4. 注意事项
    • 交换机类型需设置为 fanout
    • 队列绑定到交换机时,路由键为空字符串。
    • 临时队列的名称由 RabbitMQ 自动生成。

(六)RabbitMQ 交换机类型总结

交换机类型描述路由行为适用场景
Fanout广播模式,将消息发送到所有绑定到该交换机的队列。忽略路由键(Routing Key),消息会被广播到所有绑定的队列。日志系统、通知系统等需要广播消息的场景。
Direct直接模式,根据路由键将消息发送到匹配的队列。消息的路由键必须与队列绑定的路由键完全匹配。任务分发、点对点通信等需要精确路由的场景。
Topic主题模式,根据路由键的模式匹配将消息发送到符合条件的队列。支持通配符匹配:* 匹配一个单词,# 匹配零个或多个单词。消息分类、多条件路由等需要灵活匹配的场景。
Headers头部模式,根据消息的头部属性(Headers)进行匹配。不依赖路由键,而是通过消息的头部属性匹配队列绑定的条件。复杂的路由逻辑,例如根据消息的元数据进行路由。

详细说明

1. Fanout 交换机(广播,常用)
  • 特点
    • 消息会被广播到所有绑定到该交换机的队列。
    • 忽略路由键(Routing Key)。
  • 适用场景
    • 日志系统:将日志消息广播给多个消费者。
    • 通知系统:将通知消息广播给多个用户。
2. Direct 交换机
  • 特点
    • 消息的路由键必须与队列绑定的路由键完全匹配。
    • 支持一对一或一对多的精确路由。
  • 适用场景
    • 任务分发:将特定任务路由到特定的 Worker。
    • 点对点通信:将消息发送到特定的接收者。
3. Topic 交换机
  • 特点
    • 支持通配符匹配:
      • * 匹配一个单词。
      • # 匹配零个或多个单词。
    • 路由键的格式通常是点分字符串(如 user.create)。
  • 适用场景
    • 消息分类:根据消息的主题进行路由。
    • 多条件路由:支持灵活的路由规则。
4. Headers 交换机
  • 特点
    • 不依赖路由键,而是通过消息的头部属性(Headers)进行匹配。
    • 支持复杂的匹配规则(如 x-match 参数)。
  • 适用场景
    • 复杂的路由逻辑:根据消息的元数据进行路由。
    • 需要高度灵活性的场景。

对比
场景FanoutDirectTopicHeaders
日志广播所有消费者接收相同的日志消息。不适用。不适用。不适用。
任务分发不适用。将任务路由到特定的 Worker。将任务分类路由到不同的 Worker。根据任务的元数据进行路由。
通知系统所有用户接收相同的通知。特定用户接收特定通知。根据通知类型路由到不同用户。根据通知的元数据进行路由。
消息分类不适用。不适用。根据消息主题进行路由。根据消息的头部属性进行路由。

总结
  • Fanout:适用于广播场景。
  • Direct:适用于精确路由场景。
  • Topic:适用于灵活的路由场景。
  • Headers:适用于复杂的路由逻辑。

四、路由模式

(一)路由模式概述

路由模式是 RabbitMQ 的一种模式,使用 Direct 交换机 根据消息的 路由键(Routing Key) 将消息发送到匹配的队列。它的特点是:

  1. 一个生产者:将消息发送到 Direct 交换机,并指定路由键。
  2. 多个消费者:每个消费者可以绑定一个或多个路由键,只有匹配的路由键的消息才会被接收。
  3. 精确路由:消息的路由键必须与队列绑定的路由键完全匹配。

路由模式适用于需要根据特定条件精确路由消息的场景,例如日志级别分类、任务分发等。

image-20250216073521308


(二)生产者代码解析

生产者负责创建消息并将其发送到 Direct 交换机,同时指定路由键。

package top.miqiu._04_routing;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("你的ip!!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明 Direct 交换机/*** 参数说明:* 1. 交换机名称:04-routing* 2. 交换机类型:direct*/channel.exchangeDeclare("04-routing", "direct");// 6. 发送消息for (int i = 0; i < 20; i++) {String message = "hello work:" + i;/*** 参数说明:* 1. 交换机名称:04-routing* 2. 路由键:err(消息将发送到绑定 err 路由键的队列)* 3. 消息属性:MessageProperties.TEXT_PLAIN* 4. 消息内容:字节数组*/channel.basicPublish("04-routing", "err", MessageProperties.TEXT_PLAIN, message.getBytes());}System.out.println("消息发送成功");// 7. 关闭资源channel.close();connection.close();}
}
关键点:
  1. 交换机声明(exchangeDeclare):创建 Direct 交换机,类型为 direct
  2. 消息发送(basicPublish):指定路由键(如 err),消息会被发送到绑定该路由键的队列。
  3. 路由键匹配:只有队列绑定的路由键与消息的路由键完全匹配时,消息才会被路由到该队列。

(三)消费者代码解析

代码

消费者负责创建队列并绑定到 Direct 交换机,同时指定路由键。

package top.miqiu._04_routing;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("你的ip!!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明 Direct 交换机channel.exchangeDeclare("04-routing", "direct");// 6. 创建临时队列String queue = channel.queueDeclare().getQueue();// 7. 绑定队列到交换机,并指定路由键/*** 参数说明:* 1. 队列名称:queue* 2. 交换机名称:04-routing* 3. 路由键:info、err、waring*/channel.queueBind(queue, "04-routing", "info");channel.queueBind(queue, "04-routing", "err");channel.queueBind(queue, "04-routing", "waring");// 8. 接收消息/*** 参数说明:* 1. 队列名称:queue* 2. 是否自动确认:true(自动确认消息)* 3. 消息处理回调:DeliverCallback* 4. 消息取消回调:CancelCallback*/channel.basicConsume(queue, true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {try {// 模拟消息处理耗时Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者1 接收到消息:" + new String(delivery.getBody()));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息被取消");}});}
}
关键点:
  1. 交换机声明(exchangeDeclare):确保 Direct 交换机存在,需与生产者保持一致。
  2. 临时队列(queueDeclare):创建一个临时队列,队列名称由 RabbitMQ 自动生成。
  3. 队列绑定(queueBind):将队列绑定到交换机,并指定路由键(如 infoerrwaring)。
  4. 消息接收(basicConsume):从队列中接收消息并处理。
效果

consumer1绑定了[info,err,waring],所以在producer绑定了info时发送消息的情况下,consumer1可以接收到信息

image-20250216073403669

由于consumer2绑定的是trace,所以consumer2是接收不到消息的

image-20250216073447112


(四)路由模式的特点

  1. 精确路由:消息的路由键必须与队列绑定的路由键完全匹配。
  2. 多路由键支持:一个队列可以绑定多个路由键,接收多种类型的消息。
  3. 适用场景
    • 日志级别分类:将不同级别的日志(如 infoerr)路由到不同的队列。
    • 任务分发:将特定任务路由到特定的 Worker。

(五)总结

  1. 路由模式:适用于需要根据路由键精确路由消息的场景。
  2. 生产者:负责将消息发送到 Direct 交换机,并指定路由键。
  3. 消费者:负责创建队列并绑定到 Direct 交换机,同时指定路由键。
  4. 注意事项
    • 路由键必须完全匹配。
    • 一个队列可以绑定多个路由键,接收多种类型的消息。

五、Topic 模式

(一)Topic 模式概述

Topic 模式是 RabbitMQ 的一种模式,使用 Topic 交换机 根据消息的 路由键(Routing Key) 进行模式匹配,将消息发送到符合条件的队列。它的特点是:

  1. 一个生产者:将消息发送到 Topic 交换机,并指定路由键。
  2. 多个消费者:每个消费者可以绑定一个或多个路由键模式,只有匹配的路由键的消息才会被接收。
  3. 灵活的路由:支持通配符匹配:
    • * 匹配一个单词。
    • # 匹配零个或多个单词。

Topic 模式适用于需要根据复杂条件灵活路由消息的场景,例如消息分类、多条件路由等。

image-20250216075115591


(二)生产者代码解析

代码

生产者负责创建消息并将其发送到 Topic 交换机,同时指定路由键。

package top.miqiu._05_topic;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("用自己的ip!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明 Topic 交换机/*** 参数说明:* 1. 交换机名称:05-topic* 2. 交换机类型:topic*/channel.exchangeDeclare("05-topic", "topic");// 6. 发送消息for (int i = 0; i < 20; i++) {String message = "hello work:" + i;/*** 参数说明:* 1. 交换机名称:05-topic* 2. 路由键:user.hi(消息将发送到匹配 user.* 或 user.# 的队列)* 3. 消息属性:MessageProperties.TEXT_PLAIN* 4. 消息内容:字节数组*/channel.basicPublish("05-topic", "user.hi", MessageProperties.TEXT_PLAIN, message.getBytes());}System.out.println("消息发送成功");// 7. 关闭资源channel.close();connection.close();}
}
关键点:
  1. 交换机声明(exchangeDeclare):创建 Topic 交换机,类型为 topic
  2. 消息发送(basicPublish):指定路由键(如 user.hi),消息会被发送到匹配的队列。
  3. 通配符匹配
    • * 匹配一个单词(如 user.* 匹配 user.hi,但不匹配 user.hi.there)。
    • # 匹配零个或多个单词(如 user.# 匹配 user.hiuser.hi.there)。

(三)消费者代码解析

代码

消费者负责创建队列并绑定到 Topic 交换机,同时指定路由键模式。

package top.miqiu._05_topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("用自己的ip!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明 Topic 交换机channel.exchangeDeclare("05-topic", "topic");// 6. 创建临时队列String queue = channel.queueDeclare().getQueue();// 7. 绑定队列到交换机,并指定路由键模式/*** 参数说明:* 1. 队列名称:queue* 2. 交换机名称:05-topic* 3. 路由键模式:user.*(匹配 user.hi、user.hello 等)*/channel.queueBind(queue, "05-topic", "user.*");// 8. 接收消息/*** 参数说明:* 1. 队列名称:queue* 2. 是否自动确认:true(自动确认消息)* 3. 消息处理回调:DeliverCallback* 4. 消息取消回调:CancelCallback*/channel.basicConsume(queue, true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {try {// 模拟消息处理耗时Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者2 user.* 接收到消息:" + new String(delivery.getBody()));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息被取消");}});}
}
关键点:
  1. 交换机声明(exchangeDeclare):确保 Topic 交换机存在,需与生产者保持一致。
  2. 临时队列(queueDeclare):创建一个临时队列,队列名称由 RabbitMQ 自动生成。
  3. 队列绑定(queueBind):将队列绑定到交换机,并指定路由键模式(如 user.*)。
  4. 消息接收(basicConsume):从队列中接收消息并处理。
效果

当我在producer使用“employee.hi”作为路由key的时候,绑定了“employee.*”的consumer1可以消费这个消息

image-20250216075306371


(四)Topic 模式的特点

  1. 灵活的路由:支持通配符匹配,可以根据复杂的条件路由消息。
  2. 多路由键支持:一个队列可以绑定多个路由键模式,接收多种类型的消息。
  3. 适用场景
    • 消息分类:根据消息的主题进行路由。
    • 多条件路由:支持灵活的路由规则。

(五)总结

  1. Topic 模式:适用于需要根据复杂条件灵活路由消息的场景。
  2. 生产者:负责将消息发送到 Topic 交换机,并指定路由键。
  3. 消费者:负责创建队列并绑定到 Topic 交换机,同时指定路由键模式。
  4. 注意事项
    • 路由键模式支持通配符 *#
    • 一个队列可以绑定多个路由键模式,接收多种类型的消息。

相关文章:

rabbitmq五种模式的总结——附java-se实现(详细)

rabbitmq五种模式的总结 完整项目地址&#xff1a;https://github.com/9lucifer/rabbitmq4j-learning 一、简单模式 &#xff08;一&#xff09;简单模式概述 RabbitMQ 的简单模式是最基础的消息队列模式&#xff0c;包含以下两个角色&#xff1a; 生产者&#xff1a;负责发…...

Qt中基于开源库QRencode生成二维码(附工程源码链接)

目录 1.QRencode简介 2.编译qrencode 3.在Qt中直接使用QRencode源码 3.1.添加源码 3.2.用字符串生成二维码 3.3.用二进制数据生成二维码 3.4.界面设计 3.5.效果展示 4.注意事项 5.源码下载 1.QRencode简介 QRencode是一个开源的库&#xff0c;专门用于生成二维码&…...

Java数据结构---链表

目录 一、链表的概念和结构 1、概念 2、结构 二、链表的分类 三、链表的实现 1、创建节点类 2、定义表头 3、创建链表 4、打印链表 5、链表长度 6、看链表中是否包含key 7、在index位置插入val&#xff08;0下标为第一个位置&#xff09; 8、删除第一个关键字key …...

mongodb是怎么分库分表的

在构建高性能的数据库架构时&#xff0c;MongoDB的分库分表策略扮演着至关重要的角色&#xff0c;它通过一系列精细的步骤确保了数据的高效分布与访问。以下是对这一过程的详尽阐述&#xff0c;旨在提供一个清晰且优化过的理解框架。 确定分片键&#xff08;Shard Key&#xf…...

C++自研游戏引擎-碰撞检测组件-八叉树AABB检测算法实现

八叉树碰撞检测是一种在三维空间中高效处理物体碰撞检测的算法&#xff0c;其原理可以类比为一个管理三维空间物体的智能系统。这个示例包含两个部分&#xff1a;八叉树部分用于宏观检测&#xff0c;AABB用于微观检测。AABB可以更换为均值或节点检测来提高检测精度。 八叉树的…...

spring boot对接clerk 实现用户信息获取

在现代Web应用中&#xff0c;用户身份验证和管理是一个关键的功能。Clerk是一个提供身份验证和用户管理的服务&#xff0c;可以帮助开发者快速集成这些功能。在本文中&#xff0c;我们将介绍如何使用Spring Boot对接Clerk&#xff0c;以实现用户信息的获取。 1.介绍 Clerk提供…...

一种动态地址的查询

背景 当我们注入一个进程&#xff0c;通过函数地址进行call时经常会遇到这样的一个问题。对方程序每周四会自动更新。更新后之前的函数地址就变化了&#xff0c;然后需要重新找地址。所以&#xff0c;我就使用了一个动态查询的方式。 第一步&#xff1a;先为需要call的函数生…...

周雨彤:用角色与生活,诠释审美的艺术

提到内娱审美优秀且持续在线的女演员&#xff0c;周雨彤绝对是其中最有代表性的一个。 独树一帜的表演美学 作为新生代演员中的实力派代表&#xff0c;周雨彤凭借细腻的表演和对角色的深度共情&#xff0c;在荧幕上留下了多个令人难忘的“出圈”形象。在《我在他乡挺好的》中…...

使用jks给空apk包签名

1、在平台官方下载空的apk包&#xff08;上传应用时有提醒下载&#xff09; 2、找到jdk目录&#xff0c;比如C:\Program Files\Java\jdk1.8\bin&#xff0c;并把下载的空包apk和jks文件放到bin下 3、以管理员身份运行cmd&#xff0c;如果不是管理员会签名失败 4、用cd定位到…...

500. 键盘行 771. 宝石与石头 简单 find接口的使用

500. 键盘行1 给你一个字符串数组 words &#xff0c;只返回可以使用在 美式键盘 同一行的字母打印出来的单词。键盘如下图所示。 请注意&#xff0c;字符串 不区分大小写&#xff0c;相同字母的大小写形式都被视为在同一行。 美式键盘 中&#xff1a; 第一行由字符 "qwer…...

仙剑世界手游新手攻略 仙剑世界能用云手机玩吗

欢迎来到《仙剑世界》手游的仙侠世界&#xff01;作为新手玩家&#xff0c;以下是一些详细的攻略和建议&#xff0c;帮助你快速上手并享受游戏的乐趣。 一、新手职业推荐 1.轩辕&#xff1a;这是一个偏辅助的职业&#xff0c;可以给队友提供输出加成等增益效果&#xff0c;不过…...

[题解]2024CCPC重庆站-小 C 的神秘图形

Sources&#xff1a;K - 小 C 的神秘图形Abstract&#xff1a;给定正整数 n ( 1 ≤ n ≤ 1 0 5 ) n(1\le n\le 10^5) n(1≤n≤105)&#xff0c;三进制字符串 n 1 , n 2 ( ∣ n 1 ∣ ∣ n 2 ∣ n ) n_1,n_2(|n_1||n_2|n) n1​,n2​(∣n1​∣∣n2​∣n)&#xff0c;按如下方法…...

NPS内网穿透SSH使用手册

1、说明 nps-一款轻量级、高性能、功能强大的内网穿透代理服务器 github地址&#xff1a;https://github.com/ehang-io/nps 官网文档地址&#xff1a;https://ehang-io.github.io/nps/#/?idnps 2、服务端 下载地址&#xff1a;https://github.com/ehang-io/nps/releases 下…...

大幂计算和大阶乘计算【C语言】

大幂计算&#xff1a; #include<stdio.h> long long int c[1000000]{0}; int main() {long long a,b,x1;c[0]1;printf("请输入底数&#xff1a;");scanf("%lld",&a);printf("请输入指数&#xff1a;");scanf("%lld",&b…...

【Linux】详谈 进程控制

目录 一、进程是什么 二、task_struct 三、查看进程 四、创建进程 4.1 fork函数的认识 4.2 2. fork函数的返回值 五、进程终止 5.1. 进程退出的场景 5.2. 进程常见的退出方法 5.2.1 从main返回 5.2.1.1 错误码 5.2.2 exit函数 5.2.3 _exit函数 5.2.4 缓冲区问题补…...

Linux top 命令

作用 top 是一个实时系统监控工具&#xff0c;用于查看系统的资源使用情况和进程状态。 示例 以下是一些常用的 top 命令示例&#xff1a; top &#xff1a;动态显示结果&#xff0c;每 3 秒刷新一次。 top -d 2&#xff1a;动态显示结果&#xff0c;每 2 秒刷新一次。 top …...

Leetcode 424-替换后的最长重复字符

给你一个字符串 s 和一个整数 k 。你可以选择字符串中的任一字符&#xff0c;并将其更改为任何其他大写英文字符。该操作最多可执行 k 次。 在执行上述操作后&#xff0c;返回 包含相同字母的最长子字符串的长度。 题解 可以先做LCR 167/Leetcode 03再做本题 滑动窗口&…...

《StyleDiffusion:通过扩散模型实现可控的解耦风格迁移》学习笔记

paper&#xff1a;2308.07863 目录 摘要 1、介绍 2、相关工作 2.1 神经风格迁移&#xff08;NST&#xff09; 2.2 解耦表示学习&#xff08;DRL&#xff09; 2.3 扩散模型&#xff08;Diffusion Models&#xff09; 3、方法 3.1 风格移除模块 3.2 风格转移模块 3.3 …...

Django 创建表时 “__str__ ”方法的使用

在 Django 模型中&#xff0c;__str__ 方法是一个 Python 特殊方法&#xff08;也称为“魔术方法”&#xff09;&#xff0c;用于定义对象的字符串表示形式。它的作用是控制当对象被转换为字符串时&#xff0c;应该返回什么样的内容。 示例&#xff1a; 我在初学ModelForm时尝…...

图像处理之CSC

CSC 是 Color Space Conversion&#xff08;色彩空间转换&#xff09;的缩写&#xff0c;它涉及图像处理中的亮度、饱和度、对比度和色度等参数的调整。这些参数是图像处理中的核心概念&#xff0c;通常用于描述和操作图像的颜色信息。 以下是亮度、饱和度、对比度和色度与 CS…...

手游刚开服就被攻击怎么办?如何防御DDoS?

开服初期是手游最脆弱的阶段&#xff0c;极易成为DDoS攻击的目标。一旦遭遇攻击&#xff0c;可能导致服务器瘫痪、玩家流失&#xff0c;甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案&#xff0c;帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...

零门槛NAS搭建:WinNAS如何让普通电脑秒变私有云?

一、核心优势&#xff1a;专为Windows用户设计的极简NAS WinNAS由深圳耘想存储科技开发&#xff0c;是一款收费低廉但功能全面的Windows NAS工具&#xff0c;主打“无学习成本部署” 。与其他NAS软件相比&#xff0c;其优势在于&#xff1a; 无需硬件改造&#xff1a;将任意W…...

React Native 开发环境搭建(全平台详解)

React Native 开发环境搭建&#xff08;全平台详解&#xff09; 在开始使用 React Native 开发移动应用之前&#xff0c;正确设置开发环境是至关重要的一步。本文将为你提供一份全面的指南&#xff0c;涵盖 macOS 和 Windows 平台的配置步骤&#xff0c;如何在 Android 和 iOS…...

基于Uniapp开发HarmonyOS 5.0旅游应用技术实践

一、技术选型背景 1.跨平台优势 Uniapp采用Vue.js框架&#xff0c;支持"一次开发&#xff0c;多端部署"&#xff0c;可同步生成HarmonyOS、iOS、Android等多平台应用。 2.鸿蒙特性融合 HarmonyOS 5.0的分布式能力与原子化服务&#xff0c;为旅游应用带来&#xf…...

服务器硬防的应用场景都有哪些?

服务器硬防是指一种通过硬件设备层面的安全措施来防御服务器系统受到网络攻击的方式&#xff0c;避免服务器受到各种恶意攻击和网络威胁&#xff0c;那么&#xff0c;服务器硬防通常都会应用在哪些场景当中呢&#xff1f; 硬防服务器中一般会配备入侵检测系统和预防系统&#x…...

短视频矩阵系统文案创作功能开发实践,定制化开发

在短视频行业迅猛发展的当下&#xff0c;企业和个人创作者为了扩大影响力、提升传播效果&#xff0c;纷纷采用短视频矩阵运营策略&#xff0c;同时管理多个平台、多个账号的内容发布。然而&#xff0c;频繁的文案创作需求让运营者疲于应对&#xff0c;如何高效产出高质量文案成…...

Docker 本地安装 mysql 数据库

Docker: Accelerated Container Application Development 下载对应操作系统版本的 docker &#xff1b;并安装。 基础操作不再赘述。 打开 macOS 终端&#xff0c;开始 docker 安装mysql之旅 第一步 docker search mysql 》〉docker search mysql NAME DE…...

视觉slam十四讲实践部分记录——ch2、ch3

ch2 一、使用g++编译.cpp为可执行文件并运行(P30) g++ helloSLAM.cpp ./a.out运行 二、使用cmake编译 mkdir build cd build cmake .. makeCMakeCache.txt 文件仍然指向旧的目录。这表明在源代码目录中可能还存在旧的 CMakeCache.txt 文件,或者在构建过程中仍然引用了旧的路…...

通过MicroSip配置自己的freeswitch服务器进行调试记录

之前用docker安装的freeswitch的&#xff0c;启动是正常的&#xff0c; 但用下面的Microsip连接不上 主要原因有可能一下几个 1、通过下面命令可以看 [rootlocalhost default]# docker exec -it freeswitch fs_cli -x "sofia status profile internal"Name …...

华为OD最新机试真题-数组组成的最小数字-OD统一考试(B卷)

题目描述 给定一个整型数组,请从该数组中选择3个元素 组成最小数字并输出 (如果数组长度小于3,则选择数组中所有元素来组成最小数字)。 输入描述 行用半角逗号分割的字符串记录的整型数组,0<数组长度<= 100,0<整数的取值范围<= 10000。 输出描述 由3个元素组成…...