rabbitmq五种模式的总结——附java-se实现(详细)
rabbitmq五种模式的总结
完整项目地址:https://github.com/9lucifer/rabbitmq4j-learning

一、简单模式
(一)简单模式概述
RabbitMQ 的简单模式是最基础的消息队列模式,包含以下两个角色:
- 生产者:负责发送消息到队列。
- 消费者:负责从队列中接收并处理消息。
在简单模式中,消息的传递是单向的,生产者将消息发送到队列,消费者从队列中接收消息。

(二)生产者代码解析
代码
生产者负责创建消息并将其发送到指定的队列中。
package top.miqiu._01_hello;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("ip(要换成真实的ip哦)");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列/*** 参数说明:* 1. 队列名称:01-hello2* 2. 是否持久化:true(重启后队列仍然存在)* 3. 是否独占队列:false(允许多个消费者连接)* 4. 是否自动删除:false(队列不会自动删除)* 5. 额外参数:null*/channel.queueDeclare("01-hello2", true, false, false, null);// 6. 发送消息/*** 参数说明:* 1. 交换机名称:空字符串(使用默认交换机)* 2. 路由键:队列名称(01-hello2)* 3. 额外属性:null* 4. 消息内容:字节数组*/channel.basicPublish("", "01-hello2", null, "hello rabbitmq2".getBytes());System.out.println("消息发送成功");// 7. 关闭资源channel.close();connection.close();}
}
结果

(三)消费者代码解析
代码
消费者负责从队列中接收并处理消息。
package top.miqiu._01_hello_c;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("ip(要换成真实的ip哦");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列(需与生产者保持一致)channel.queueDeclare("01-hello2", false, false, false, null);// 6. 接收消息/*** 参数说明:* 1. 队列名称:01-hello2* 2. 是否自动确认:true(消息被消费后自动确认)* 3. 消息处理回调:DeliverCallback* 4. 消息取消回调:CancelCallback*/channel.basicConsume("01-hello2", true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {System.out.println("接收到消息:" + new String(delivery.getBody()));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息被取消");}});}
}
结果

在mq中查看

(四)总结
- 简单模式:适用于一对一的简单消息传递场景。
- 生产者:负责创建队列并发送消息。
- 消费者:负责从队列中接收并处理消息。
- 注意事项:
- 队列名称需保持一致,不然一定会报错!
- 消息确认机制需根据业务需求选择自动或手动确认。
- 使用完资源后需显式关闭
Channel和Connection。
二、工作模式
(一)工作模式概述
工作模式是 RabbitMQ 的一种常见模式,用于将任务分发给多个消费者。它的特点是:
- 一个生产者:负责发送消息到队列。
- 多个消费者:共同消费同一个队列中的消息。
- 消息分发机制:默认情况下,RabbitMQ 会以轮询(Round-Robin)的方式将消息分发给消费者。
工作模式适用于任务分发场景,例如将耗时的任务分发给多个 Worker 处理。

(二)生产者代码解析
生产者负责创建消息并将其发送到指定的队列中。
package top.miqiu._02_work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("你的ip!别忘了改");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列/*** 参数说明:* 1. 队列名称:02-work1* 2. 是否持久化:true(重启后队列仍然存在)* 3. 是否独占队列:false(允许多个消费者连接)* 4. 是否自动删除:false(队列不会自动删除)* 5. 额外参数:null*/channel.queueDeclare("02-work1", true, false, false, null);// 6. 发送消息for (int i = 0; i < 20; i++) {String message = "hello work:" + i;channel.basicPublish("", "02-work1", null, message.getBytes());}System.out.println("消息发送成功");// 7. 关闭资源channel.close();connection.close();}
}
关键点:
- 队列声明(queueDeclare):创建队列并设置队列属性。
- 消息发送(basicPublish):通过循环发送多条消息到队列。
- 持久化队列:设置为
true,确保队列在 RabbitMQ 重启后仍然存在。
(三)消费者代码解析
代码
消费者负责从队列中接收并处理消息。
package top.miqiu._02_work;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("你的ip!别忘了改");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列(需与生产者保持一致)channel.queueDeclare("02-work1", true, false, false, null);// 6. 设置每次只接收一条消息channel.basicQos(1);// 7. 接收消息/*** 参数说明:* 1. 队列名称:02-work1* 2. 是否自动确认:false(手动确认消息)* 3. 消息处理回调:DeliverCallback* 4. 消息取消回调:CancelCallback*/channel.basicConsume("02-work1", false, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {try {// 模拟消息处理耗时Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者1 接收到消息:" + new String(delivery.getBody()));// 手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息被取消");}});}
}
关键点:
- 队列声明(queueDeclare):确保队列存在,需与生产者保持一致。
- 消息预取(basicQos):设置每次只接收一条消息,避免某个消费者处理过多消息。
- 手动确认(basicAck):消息处理完成后手动确认,确保消息不会丢失。
- 消息处理耗时:通过
Thread.sleep(1000)模拟消息处理耗时。
效果


(四)工作模式的特点
- 消息分发机制:
- 默认情况下,RabbitMQ 会以轮询的方式将消息分发给多个消费者。
- 可以通过
basicQos设置每次只接收一条消息,避免某个消费者处理过多消息。
- 消息确认机制:
- 设置为手动确认(
autoAck=false),确保消息处理完成后才确认。(防止业务处理失败的情况下丢失消息) - 如果消费者在处理消息时崩溃,未确认的消息会重新分发给其他消费者。
- 设置为手动确认(
- 适用场景:
- 任务分发场景,例如将耗时的任务分发给多个 Worker 处理。
(五)总结
- 工作模式:适用于任务分发场景,多个消费者共同消费同一个队列中的消息。
- 生产者:负责发送消息到队列。
- 消费者:负责接收并处理消息,支持手动确认和消息预取。
- 注意事项:
- 队列名称需保持一致。
- 消息确认机制需根据业务需求选择自动或手动确认。
- 使用
basicQos控制消息分发,避免某个消费者处理过多消息。
三、发布订阅模式
(一)发布订阅模式概述
发布订阅模式(Publish/Subscribe Mode)是 RabbitMQ 的一种模式,用于将消息广播给多个消费者。它的特点是:
- 一个生产者:将消息发送到交换机(Exchange)。
- 多个消费者:每个消费者都有自己的队列,并与交换机绑定。
- 消息广播:交换机将消息广播给所有绑定的队列。
发布订阅模式适用于消息广播场景,例如日志系统、通知系统等。

(二)生产者代码解析
生产者负责创建消息并将其发送到指定的交换机中。
package top.miqiu._03_pubsub;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("用自己的ip!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明交换机/*** 参数说明:* 1. 交换机名称:03-pubsub* 2. 交换机类型:fanout(广播模式)*/channel.exchangeDeclare("03-pubsub", "fanout");// 6. 发送消息for (int i = 0; i < 20; i++) {String message = "hello work:" + i;/*** 参数说明:* 1. 交换机名称:03-pubsub* 2. 路由键:空字符串(fanout 模式忽略路由键)* 3. 消息属性:MessageProperties.TEXT_PLAIN* 4. 消息内容:字节数组*/channel.basicPublish("03-pubsub", "", MessageProperties.TEXT_PLAIN, message.getBytes());}System.out.println("消息发送成功");// 7. 关闭资源channel.close();connection.close();}
}
关键点:
- 交换机声明(exchangeDeclare):创建交换机并设置类型为
fanout(广播模式)。 - 消息发送(basicPublish):将消息发送到交换机,路由键为空字符串(
fanout模式忽略路由键)。 - 消息广播:消息会被广播到所有绑定到该交换机的队列。
(三)消费者代码解析
代码
消费者负责从队列中接收并处理消息。
package top.miqiu._03_pubsub;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("用自己的ip!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明交换机channel.exchangeDeclare("03-pubsub", "fanout");// 6. 创建临时队列String queue = channel.queueDeclare().getQueue();// 7. 绑定队列到交换机/*** 参数说明:* 1. 队列名称:queue* 2. 交换机名称:03-pubsub* 3. 路由键:空字符串(fanout 模式忽略路由键)*/channel.queueBind(queue, "03-pubsub", "");// 8. 接收消息/*** 参数说明:* 1. 队列名称:queue* 2. 是否自动确认:true(自动确认消息)* 3. 消息处理回调:DeliverCallback* 4. 消息取消回调:CancelCallback*/channel.basicConsume(queue, true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {try {// 模拟消息处理耗时Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者2 接收到消息:" + new String(delivery.getBody()));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息被取消");}});}
}
关键点:
- 交换机声明(exchangeDeclare):确保交换机存在,需与生产者保持一致。
- 临时队列(queueDeclare):创建一个临时队列,队列名称由 RabbitMQ 自动生成。
- 队列绑定(queueBind):将队列绑定到交换机,路由键为空字符串(
fanout模式忽略路由键)。 - 消息接收(basicConsume):从队列中接收消息并处理。
结果


可以看到两个consumer都消费了相同的消息
(四)发布订阅模式的特点
- 消息广播:交换机将消息广播给所有绑定的队列。
- 临时队列:消费者可以创建临时队列,队列名称由 RabbitMQ 自动生成。
- 适用场景:
- 日志系统:将日志消息广播给多个消费者。
- 通知系统:将通知消息广播给多个用户。
(五)总结
- 发布订阅模式:适用于消息广播场景,多个消费者各自接收相同的消息。
- 生产者:负责将消息发送到交换机。
- 消费者:负责创建队列并绑定到交换机,接收并处理消息。
- 注意事项:
- 交换机类型需设置为
fanout。 - 队列绑定到交换机时,路由键为空字符串。
- 临时队列的名称由 RabbitMQ 自动生成。
- 交换机类型需设置为
(六)RabbitMQ 交换机类型总结
| 交换机类型 | 描述 | 路由行为 | 适用场景 |
|---|---|---|---|
| Fanout | 广播模式,将消息发送到所有绑定到该交换机的队列。 | 忽略路由键(Routing Key),消息会被广播到所有绑定的队列。 | 日志系统、通知系统等需要广播消息的场景。 |
| Direct | 直接模式,根据路由键将消息发送到匹配的队列。 | 消息的路由键必须与队列绑定的路由键完全匹配。 | 任务分发、点对点通信等需要精确路由的场景。 |
| Topic | 主题模式,根据路由键的模式匹配将消息发送到符合条件的队列。 | 支持通配符匹配:* 匹配一个单词,# 匹配零个或多个单词。 | 消息分类、多条件路由等需要灵活匹配的场景。 |
| Headers | 头部模式,根据消息的头部属性(Headers)进行匹配。 | 不依赖路由键,而是通过消息的头部属性匹配队列绑定的条件。 | 复杂的路由逻辑,例如根据消息的元数据进行路由。 |
详细说明
1. Fanout 交换机(广播,常用)
- 特点:
- 消息会被广播到所有绑定到该交换机的队列。
- 忽略路由键(Routing Key)。
- 适用场景:
- 日志系统:将日志消息广播给多个消费者。
- 通知系统:将通知消息广播给多个用户。
2. Direct 交换机
- 特点:
- 消息的路由键必须与队列绑定的路由键完全匹配。
- 支持一对一或一对多的精确路由。
- 适用场景:
- 任务分发:将特定任务路由到特定的 Worker。
- 点对点通信:将消息发送到特定的接收者。
3. Topic 交换机
- 特点:
- 支持通配符匹配:
*匹配一个单词。#匹配零个或多个单词。
- 路由键的格式通常是点分字符串(如
user.create)。
- 支持通配符匹配:
- 适用场景:
- 消息分类:根据消息的主题进行路由。
- 多条件路由:支持灵活的路由规则。
4. Headers 交换机
- 特点:
- 不依赖路由键,而是通过消息的头部属性(Headers)进行匹配。
- 支持复杂的匹配规则(如
x-match参数)。
- 适用场景:
- 复杂的路由逻辑:根据消息的元数据进行路由。
- 需要高度灵活性的场景。
对比
| 场景 | Fanout | Direct | Topic | Headers |
|---|---|---|---|---|
| 日志广播 | 所有消费者接收相同的日志消息。 | 不适用。 | 不适用。 | 不适用。 |
| 任务分发 | 不适用。 | 将任务路由到特定的 Worker。 | 将任务分类路由到不同的 Worker。 | 根据任务的元数据进行路由。 |
| 通知系统 | 所有用户接收相同的通知。 | 特定用户接收特定通知。 | 根据通知类型路由到不同用户。 | 根据通知的元数据进行路由。 |
| 消息分类 | 不适用。 | 不适用。 | 根据消息主题进行路由。 | 根据消息的头部属性进行路由。 |
总结
- Fanout:适用于广播场景。
- Direct:适用于精确路由场景。
- Topic:适用于灵活的路由场景。
- Headers:适用于复杂的路由逻辑。
四、路由模式
(一)路由模式概述
路由模式是 RabbitMQ 的一种模式,使用 Direct 交换机 根据消息的 路由键(Routing Key) 将消息发送到匹配的队列。它的特点是:
- 一个生产者:将消息发送到 Direct 交换机,并指定路由键。
- 多个消费者:每个消费者可以绑定一个或多个路由键,只有匹配的路由键的消息才会被接收。
- 精确路由:消息的路由键必须与队列绑定的路由键完全匹配。
路由模式适用于需要根据特定条件精确路由消息的场景,例如日志级别分类、任务分发等。

(二)生产者代码解析
生产者负责创建消息并将其发送到 Direct 交换机,同时指定路由键。
package top.miqiu._04_routing;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("你的ip!!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明 Direct 交换机/*** 参数说明:* 1. 交换机名称:04-routing* 2. 交换机类型:direct*/channel.exchangeDeclare("04-routing", "direct");// 6. 发送消息for (int i = 0; i < 20; i++) {String message = "hello work:" + i;/*** 参数说明:* 1. 交换机名称:04-routing* 2. 路由键:err(消息将发送到绑定 err 路由键的队列)* 3. 消息属性:MessageProperties.TEXT_PLAIN* 4. 消息内容:字节数组*/channel.basicPublish("04-routing", "err", MessageProperties.TEXT_PLAIN, message.getBytes());}System.out.println("消息发送成功");// 7. 关闭资源channel.close();connection.close();}
}
关键点:
- 交换机声明(exchangeDeclare):创建 Direct 交换机,类型为
direct。 - 消息发送(basicPublish):指定路由键(如
err),消息会被发送到绑定该路由键的队列。 - 路由键匹配:只有队列绑定的路由键与消息的路由键完全匹配时,消息才会被路由到该队列。
(三)消费者代码解析
代码
消费者负责创建队列并绑定到 Direct 交换机,同时指定路由键。
package top.miqiu._04_routing;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("你的ip!!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明 Direct 交换机channel.exchangeDeclare("04-routing", "direct");// 6. 创建临时队列String queue = channel.queueDeclare().getQueue();// 7. 绑定队列到交换机,并指定路由键/*** 参数说明:* 1. 队列名称:queue* 2. 交换机名称:04-routing* 3. 路由键:info、err、waring*/channel.queueBind(queue, "04-routing", "info");channel.queueBind(queue, "04-routing", "err");channel.queueBind(queue, "04-routing", "waring");// 8. 接收消息/*** 参数说明:* 1. 队列名称:queue* 2. 是否自动确认:true(自动确认消息)* 3. 消息处理回调:DeliverCallback* 4. 消息取消回调:CancelCallback*/channel.basicConsume(queue, true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {try {// 模拟消息处理耗时Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者1 接收到消息:" + new String(delivery.getBody()));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息被取消");}});}
}
关键点:
- 交换机声明(exchangeDeclare):确保 Direct 交换机存在,需与生产者保持一致。
- 临时队列(queueDeclare):创建一个临时队列,队列名称由 RabbitMQ 自动生成。
- 队列绑定(queueBind):将队列绑定到交换机,并指定路由键(如
info、err、waring)。 - 消息接收(basicConsume):从队列中接收消息并处理。
效果
consumer1绑定了[info,err,waring],所以在producer绑定了info时发送消息的情况下,consumer1可以接收到信息

由于consumer2绑定的是trace,所以consumer2是接收不到消息的

(四)路由模式的特点
- 精确路由:消息的路由键必须与队列绑定的路由键完全匹配。
- 多路由键支持:一个队列可以绑定多个路由键,接收多种类型的消息。
- 适用场景:
- 日志级别分类:将不同级别的日志(如
info、err)路由到不同的队列。 - 任务分发:将特定任务路由到特定的 Worker。
- 日志级别分类:将不同级别的日志(如
(五)总结
- 路由模式:适用于需要根据路由键精确路由消息的场景。
- 生产者:负责将消息发送到 Direct 交换机,并指定路由键。
- 消费者:负责创建队列并绑定到 Direct 交换机,同时指定路由键。
- 注意事项:
- 路由键必须完全匹配。
- 一个队列可以绑定多个路由键,接收多种类型的消息。
五、Topic 模式
(一)Topic 模式概述
Topic 模式是 RabbitMQ 的一种模式,使用 Topic 交换机 根据消息的 路由键(Routing Key) 进行模式匹配,将消息发送到符合条件的队列。它的特点是:
- 一个生产者:将消息发送到 Topic 交换机,并指定路由键。
- 多个消费者:每个消费者可以绑定一个或多个路由键模式,只有匹配的路由键的消息才会被接收。
- 灵活的路由:支持通配符匹配:
*匹配一个单词。#匹配零个或多个单词。
Topic 模式适用于需要根据复杂条件灵活路由消息的场景,例如消息分类、多条件路由等。

(二)生产者代码解析
代码
生产者负责创建消息并将其发送到 Topic 交换机,同时指定路由键。
package top.miqiu._05_topic;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("用自己的ip!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明 Topic 交换机/*** 参数说明:* 1. 交换机名称:05-topic* 2. 交换机类型:topic*/channel.exchangeDeclare("05-topic", "topic");// 6. 发送消息for (int i = 0; i < 20; i++) {String message = "hello work:" + i;/*** 参数说明:* 1. 交换机名称:05-topic* 2. 路由键:user.hi(消息将发送到匹配 user.* 或 user.# 的队列)* 3. 消息属性:MessageProperties.TEXT_PLAIN* 4. 消息内容:字节数组*/channel.basicPublish("05-topic", "user.hi", MessageProperties.TEXT_PLAIN, message.getBytes());}System.out.println("消息发送成功");// 7. 关闭资源channel.close();connection.close();}
}
关键点:
- 交换机声明(exchangeDeclare):创建 Topic 交换机,类型为
topic。 - 消息发送(basicPublish):指定路由键(如
user.hi),消息会被发送到匹配的队列。 - 通配符匹配:
*匹配一个单词(如user.*匹配user.hi,但不匹配user.hi.there)。#匹配零个或多个单词(如user.#匹配user.hi和user.hi.there)。
(三)消费者代码解析
代码
消费者负责创建队列并绑定到 Topic 交换机,同时指定路由键模式。
package top.miqiu._05_topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("用自己的ip!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明 Topic 交换机channel.exchangeDeclare("05-topic", "topic");// 6. 创建临时队列String queue = channel.queueDeclare().getQueue();// 7. 绑定队列到交换机,并指定路由键模式/*** 参数说明:* 1. 队列名称:queue* 2. 交换机名称:05-topic* 3. 路由键模式:user.*(匹配 user.hi、user.hello 等)*/channel.queueBind(queue, "05-topic", "user.*");// 8. 接收消息/*** 参数说明:* 1. 队列名称:queue* 2. 是否自动确认:true(自动确认消息)* 3. 消息处理回调:DeliverCallback* 4. 消息取消回调:CancelCallback*/channel.basicConsume(queue, true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {try {// 模拟消息处理耗时Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者2 user.* 接收到消息:" + new String(delivery.getBody()));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息被取消");}});}
}
关键点:
- 交换机声明(exchangeDeclare):确保 Topic 交换机存在,需与生产者保持一致。
- 临时队列(queueDeclare):创建一个临时队列,队列名称由 RabbitMQ 自动生成。
- 队列绑定(queueBind):将队列绑定到交换机,并指定路由键模式(如
user.*)。 - 消息接收(basicConsume):从队列中接收消息并处理。
效果
当我在producer使用“employee.hi”作为路由key的时候,绑定了“employee.*”的consumer1可以消费这个消息

(四)Topic 模式的特点
- 灵活的路由:支持通配符匹配,可以根据复杂的条件路由消息。
- 多路由键支持:一个队列可以绑定多个路由键模式,接收多种类型的消息。
- 适用场景:
- 消息分类:根据消息的主题进行路由。
- 多条件路由:支持灵活的路由规则。
(五)总结
- Topic 模式:适用于需要根据复杂条件灵活路由消息的场景。
- 生产者:负责将消息发送到 Topic 交换机,并指定路由键。
- 消费者:负责创建队列并绑定到 Topic 交换机,同时指定路由键模式。
- 注意事项:
- 路由键模式支持通配符
*和#。 - 一个队列可以绑定多个路由键模式,接收多种类型的消息。
- 路由键模式支持通配符
相关文章:
rabbitmq五种模式的总结——附java-se实现(详细)
rabbitmq五种模式的总结 完整项目地址:https://github.com/9lucifer/rabbitmq4j-learning 一、简单模式 (一)简单模式概述 RabbitMQ 的简单模式是最基础的消息队列模式,包含以下两个角色: 生产者:负责发…...
Qt中基于开源库QRencode生成二维码(附工程源码链接)
目录 1.QRencode简介 2.编译qrencode 3.在Qt中直接使用QRencode源码 3.1.添加源码 3.2.用字符串生成二维码 3.3.用二进制数据生成二维码 3.4.界面设计 3.5.效果展示 4.注意事项 5.源码下载 1.QRencode简介 QRencode是一个开源的库,专门用于生成二维码&…...
Java数据结构---链表
目录 一、链表的概念和结构 1、概念 2、结构 二、链表的分类 三、链表的实现 1、创建节点类 2、定义表头 3、创建链表 4、打印链表 5、链表长度 6、看链表中是否包含key 7、在index位置插入val(0下标为第一个位置) 8、删除第一个关键字key …...
mongodb是怎么分库分表的
在构建高性能的数据库架构时,MongoDB的分库分表策略扮演着至关重要的角色,它通过一系列精细的步骤确保了数据的高效分布与访问。以下是对这一过程的详尽阐述,旨在提供一个清晰且优化过的理解框架。 确定分片键(Shard Key…...
C++自研游戏引擎-碰撞检测组件-八叉树AABB检测算法实现
八叉树碰撞检测是一种在三维空间中高效处理物体碰撞检测的算法,其原理可以类比为一个管理三维空间物体的智能系统。这个示例包含两个部分:八叉树部分用于宏观检测,AABB用于微观检测。AABB可以更换为均值或节点检测来提高检测精度。 八叉树的…...
spring boot对接clerk 实现用户信息获取
在现代Web应用中,用户身份验证和管理是一个关键的功能。Clerk是一个提供身份验证和用户管理的服务,可以帮助开发者快速集成这些功能。在本文中,我们将介绍如何使用Spring Boot对接Clerk,以实现用户信息的获取。 1.介绍 Clerk提供…...
一种动态地址的查询
背景 当我们注入一个进程,通过函数地址进行call时经常会遇到这样的一个问题。对方程序每周四会自动更新。更新后之前的函数地址就变化了,然后需要重新找地址。所以,我就使用了一个动态查询的方式。 第一步:先为需要call的函数生…...
周雨彤:用角色与生活,诠释审美的艺术
提到内娱审美优秀且持续在线的女演员,周雨彤绝对是其中最有代表性的一个。 独树一帜的表演美学 作为新生代演员中的实力派代表,周雨彤凭借细腻的表演和对角色的深度共情,在荧幕上留下了多个令人难忘的“出圈”形象。在《我在他乡挺好的》中…...
使用jks给空apk包签名
1、在平台官方下载空的apk包(上传应用时有提醒下载) 2、找到jdk目录,比如C:\Program Files\Java\jdk1.8\bin,并把下载的空包apk和jks文件放到bin下 3、以管理员身份运行cmd,如果不是管理员会签名失败 4、用cd定位到…...
500. 键盘行 771. 宝石与石头 简单 find接口的使用
500. 键盘行1 给你一个字符串数组 words ,只返回可以使用在 美式键盘 同一行的字母打印出来的单词。键盘如下图所示。 请注意,字符串 不区分大小写,相同字母的大小写形式都被视为在同一行。 美式键盘 中: 第一行由字符 "qwer…...
仙剑世界手游新手攻略 仙剑世界能用云手机玩吗
欢迎来到《仙剑世界》手游的仙侠世界!作为新手玩家,以下是一些详细的攻略和建议,帮助你快速上手并享受游戏的乐趣。 一、新手职业推荐 1.轩辕:这是一个偏辅助的职业,可以给队友提供输出加成等增益效果,不过…...
[题解]2024CCPC重庆站-小 C 的神秘图形
Sources:K - 小 C 的神秘图形Abstract:给定正整数 n ( 1 ≤ n ≤ 1 0 5 ) n(1\le n\le 10^5) n(1≤n≤105),三进制字符串 n 1 , n 2 ( ∣ n 1 ∣ ∣ n 2 ∣ n ) n_1,n_2(|n_1||n_2|n) n1,n2(∣n1∣∣n2∣n),按如下方法…...
NPS内网穿透SSH使用手册
1、说明 nps-一款轻量级、高性能、功能强大的内网穿透代理服务器 github地址:https://github.com/ehang-io/nps 官网文档地址:https://ehang-io.github.io/nps/#/?idnps 2、服务端 下载地址:https://github.com/ehang-io/nps/releases 下…...
大幂计算和大阶乘计算【C语言】
大幂计算: #include<stdio.h> long long int c[1000000]{0}; int main() {long long a,b,x1;c[0]1;printf("请输入底数:");scanf("%lld",&a);printf("请输入指数:");scanf("%lld",&b…...
【Linux】详谈 进程控制
目录 一、进程是什么 二、task_struct 三、查看进程 四、创建进程 4.1 fork函数的认识 4.2 2. fork函数的返回值 五、进程终止 5.1. 进程退出的场景 5.2. 进程常见的退出方法 5.2.1 从main返回 5.2.1.1 错误码 5.2.2 exit函数 5.2.3 _exit函数 5.2.4 缓冲区问题补…...
Linux top 命令
作用 top 是一个实时系统监控工具,用于查看系统的资源使用情况和进程状态。 示例 以下是一些常用的 top 命令示例: top :动态显示结果,每 3 秒刷新一次。 top -d 2:动态显示结果,每 2 秒刷新一次。 top …...
Leetcode 424-替换后的最长重复字符
给你一个字符串 s 和一个整数 k 。你可以选择字符串中的任一字符,并将其更改为任何其他大写英文字符。该操作最多可执行 k 次。 在执行上述操作后,返回 包含相同字母的最长子字符串的长度。 题解 可以先做LCR 167/Leetcode 03再做本题 滑动窗口&…...
《StyleDiffusion:通过扩散模型实现可控的解耦风格迁移》学习笔记
paper:2308.07863 目录 摘要 1、介绍 2、相关工作 2.1 神经风格迁移(NST) 2.2 解耦表示学习(DRL) 2.3 扩散模型(Diffusion Models) 3、方法 3.1 风格移除模块 3.2 风格转移模块 3.3 …...
Django 创建表时 “__str__ ”方法的使用
在 Django 模型中,__str__ 方法是一个 Python 特殊方法(也称为“魔术方法”),用于定义对象的字符串表示形式。它的作用是控制当对象被转换为字符串时,应该返回什么样的内容。 示例: 我在初学ModelForm时尝…...
图像处理之CSC
CSC 是 Color Space Conversion(色彩空间转换)的缩写,它涉及图像处理中的亮度、饱和度、对比度和色度等参数的调整。这些参数是图像处理中的核心概念,通常用于描述和操作图像的颜色信息。 以下是亮度、饱和度、对比度和色度与 CS…...
记第一次运行codex
一、问的问题 › 我有3个c文件:" file1.c(定义变量的地方)#include <stdio.h>// 定义全局变量(只定义一次)int global_var 100;void print_value(){printf("file1.c 中的 global_var %d\n", global_var);}…...
使用 Taotoken 为你的 Node.js 后端服务稳定接入多模型能力
使用 Taotoken 为你的 Node.js 后端服务稳定接入多模型能力 1. 场景需求与方案选择 假设你正在开发一个需要 AI 对话功能的 Web 应用,后端采用 Node.js 技术栈。这类场景通常面临几个核心需求:需要稳定可靠的大模型调用接口、能够灵活切换不同模型以适…...
如何高效使用VLC媒体播放器:5个必备技巧与完整指南
如何高效使用VLC媒体播放器:5个必备技巧与完整指南 【免费下载链接】vlc VLC media player - All pull requests are ignored, please use MRs on https://code.videolan.org/videolan/vlc 项目地址: https://gitcode.com/gh_mirrors/vl/vlc VLC媒体播放器作…...
别再死记硬背Transformer了!用PyTorch手把手实现一个简易翻译模型(附完整代码)
用PyTorch从零构建Transformer翻译模型:代码驱动的深度学习实践 如果你已经读过Transformer的论文或看过相关教程,却依然对如何实现这个革命性架构感到迷茫,那么这篇文章正是为你准备的。我们将避开繁琐的理论推导,直接进入代码层…...
告别FreeRTOS?手把手教你用NuttX在STM32上跑个“小Linux”(附完整配置流程)
从FreeRTOS到NuttX:在STM32上构建类Linux开发环境的完整指南 嵌入式开发者们是否厌倦了传统RTOS繁琐的API调用?是否渴望在资源受限的微控制器上获得接近Linux的开发体验?NuttX正是为解决这些痛点而生。这个独特的实时操作系统将POSIX兼容性带…...
新房装修、养宠除味、母婴抗敏:霍尼韦尔三款空气净化器全场景推荐
众所周知,空气质量直接影响日常生活的舒适度与健康。面对市面上繁多的空气净化器品牌与型号,不少消费者在选购时感到困惑。霍尼韦尔空气净化器凭借卓越的技术与良好的品牌声誉,在中国高端空气净化器市场的全渠道监测销额中位列第一࿰…...
如何用Botty实现暗黑2重制版自动化刷宝:从新手到高手的完整指南
如何用Botty实现暗黑2重制版自动化刷宝:从新手到高手的完整指南 【免费下载链接】botty D2R Pixel Bot 项目地址: https://gitcode.com/gh_mirrors/bo/botty 还在为暗黑2重制版中重复的刷怪、拾取、整理而疲惫吗?Botty作为一款开源的像素级自动化…...
从零部署到SLO达标:MCP 2026推理引擎集成避坑清单(含12个已验证的Kubernetes Operator配置缺陷)
更多请点击: https://intelliparadigm.com 第一章:从零部署到SLO达标:MCP 2026推理引擎集成避坑清单(含12个已验证的Kubernetes Operator配置缺陷) MCP 2026 是新一代低延迟、高吞吐推理引擎,其 Operator …...
初创团队如何利用 Taotoken 统一管理多个 AI 项目的 API 密钥与访问
初创团队如何利用 Taotoken 统一管理多个 AI 项目的 API 密钥与访问 1. 多项目密钥管理的核心挑战 初创团队在同时推进多个 AI 应用原型开发时,通常会面临三个典型问题。首先是密钥分散管理带来的安全隐患,不同成员可能将 API Key 硬编码在代码或配置文…...
Taotoken 用量看板与账单追溯功能如何帮助控制项目预算
Taotoken 用量看板与账单追溯功能如何帮助控制项目预算 1. 用量看板的核心观测维度 Taotoken 用量看板为项目管理者提供了多维度的实时观测能力。在控制台首页的用量概览区域,可以直观查看当前计费周期内的总 Token 消耗量、各模型调用占比以及费用分布。这些数据…...
