rabbitmq五种模式的总结——附java-se实现(详细)
rabbitmq五种模式的总结
完整项目地址:https://github.com/9lucifer/rabbitmq4j-learning
一、简单模式
(一)简单模式概述
RabbitMQ 的简单模式是最基础的消息队列模式,包含以下两个角色:
- 生产者:负责发送消息到队列。
- 消费者:负责从队列中接收并处理消息。
在简单模式中,消息的传递是单向的,生产者将消息发送到队列,消费者从队列中接收消息。
(二)生产者代码解析
代码
生产者负责创建消息并将其发送到指定的队列中。
package top.miqiu._01_hello;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("ip(要换成真实的ip哦)");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列/*** 参数说明:* 1. 队列名称:01-hello2* 2. 是否持久化:true(重启后队列仍然存在)* 3. 是否独占队列:false(允许多个消费者连接)* 4. 是否自动删除:false(队列不会自动删除)* 5. 额外参数:null*/channel.queueDeclare("01-hello2", true, false, false, null);// 6. 发送消息/*** 参数说明:* 1. 交换机名称:空字符串(使用默认交换机)* 2. 路由键:队列名称(01-hello2)* 3. 额外属性:null* 4. 消息内容:字节数组*/channel.basicPublish("", "01-hello2", null, "hello rabbitmq2".getBytes());System.out.println("消息发送成功");// 7. 关闭资源channel.close();connection.close();}
}
结果
(三)消费者代码解析
代码
消费者负责从队列中接收并处理消息。
package top.miqiu._01_hello_c;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("ip(要换成真实的ip哦");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列(需与生产者保持一致)channel.queueDeclare("01-hello2", false, false, false, null);// 6. 接收消息/*** 参数说明:* 1. 队列名称:01-hello2* 2. 是否自动确认:true(消息被消费后自动确认)* 3. 消息处理回调:DeliverCallback* 4. 消息取消回调:CancelCallback*/channel.basicConsume("01-hello2", true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {System.out.println("接收到消息:" + new String(delivery.getBody()));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息被取消");}});}
}
结果
在mq中查看
(四)总结
- 简单模式:适用于一对一的简单消息传递场景。
- 生产者:负责创建队列并发送消息。
- 消费者:负责从队列中接收并处理消息。
- 注意事项:
- 队列名称需保持一致,不然一定会报错!
- 消息确认机制需根据业务需求选择自动或手动确认。
- 使用完资源后需显式关闭
Channel
和Connection
。
二、工作模式
(一)工作模式概述
工作模式是 RabbitMQ 的一种常见模式,用于将任务分发给多个消费者。它的特点是:
- 一个生产者:负责发送消息到队列。
- 多个消费者:共同消费同一个队列中的消息。
- 消息分发机制:默认情况下,RabbitMQ 会以轮询(Round-Robin)的方式将消息分发给消费者。
工作模式适用于任务分发场景,例如将耗时的任务分发给多个 Worker 处理。
(二)生产者代码解析
生产者负责创建消息并将其发送到指定的队列中。
package top.miqiu._02_work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("你的ip!别忘了改");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列/*** 参数说明:* 1. 队列名称:02-work1* 2. 是否持久化:true(重启后队列仍然存在)* 3. 是否独占队列:false(允许多个消费者连接)* 4. 是否自动删除:false(队列不会自动删除)* 5. 额外参数:null*/channel.queueDeclare("02-work1", true, false, false, null);// 6. 发送消息for (int i = 0; i < 20; i++) {String message = "hello work:" + i;channel.basicPublish("", "02-work1", null, message.getBytes());}System.out.println("消息发送成功");// 7. 关闭资源channel.close();connection.close();}
}
关键点:
- 队列声明(queueDeclare):创建队列并设置队列属性。
- 消息发送(basicPublish):通过循环发送多条消息到队列。
- 持久化队列:设置为
true
,确保队列在 RabbitMQ 重启后仍然存在。
(三)消费者代码解析
代码
消费者负责从队列中接收并处理消息。
package top.miqiu._02_work;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("你的ip!别忘了改");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列(需与生产者保持一致)channel.queueDeclare("02-work1", true, false, false, null);// 6. 设置每次只接收一条消息channel.basicQos(1);// 7. 接收消息/*** 参数说明:* 1. 队列名称:02-work1* 2. 是否自动确认:false(手动确认消息)* 3. 消息处理回调:DeliverCallback* 4. 消息取消回调:CancelCallback*/channel.basicConsume("02-work1", false, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {try {// 模拟消息处理耗时Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者1 接收到消息:" + new String(delivery.getBody()));// 手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息被取消");}});}
}
关键点:
- 队列声明(queueDeclare):确保队列存在,需与生产者保持一致。
- 消息预取(basicQos):设置每次只接收一条消息,避免某个消费者处理过多消息。
- 手动确认(basicAck):消息处理完成后手动确认,确保消息不会丢失。
- 消息处理耗时:通过
Thread.sleep(1000)
模拟消息处理耗时。
效果
(四)工作模式的特点
- 消息分发机制:
- 默认情况下,RabbitMQ 会以轮询的方式将消息分发给多个消费者。
- 可以通过
basicQos
设置每次只接收一条消息,避免某个消费者处理过多消息。
- 消息确认机制:
- 设置为手动确认(
autoAck=false
),确保消息处理完成后才确认。(防止业务处理失败的情况下丢失消息) - 如果消费者在处理消息时崩溃,未确认的消息会重新分发给其他消费者。
- 设置为手动确认(
- 适用场景:
- 任务分发场景,例如将耗时的任务分发给多个 Worker 处理。
(五)总结
- 工作模式:适用于任务分发场景,多个消费者共同消费同一个队列中的消息。
- 生产者:负责发送消息到队列。
- 消费者:负责接收并处理消息,支持手动确认和消息预取。
- 注意事项:
- 队列名称需保持一致。
- 消息确认机制需根据业务需求选择自动或手动确认。
- 使用
basicQos
控制消息分发,避免某个消费者处理过多消息。
三、发布订阅模式
(一)发布订阅模式概述
发布订阅模式(Publish/Subscribe Mode)是 RabbitMQ 的一种模式,用于将消息广播给多个消费者。它的特点是:
- 一个生产者:将消息发送到交换机(Exchange)。
- 多个消费者:每个消费者都有自己的队列,并与交换机绑定。
- 消息广播:交换机将消息广播给所有绑定的队列。
发布订阅模式适用于消息广播场景,例如日志系统、通知系统等。
(二)生产者代码解析
生产者负责创建消息并将其发送到指定的交换机中。
package top.miqiu._03_pubsub;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("用自己的ip!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明交换机/*** 参数说明:* 1. 交换机名称:03-pubsub* 2. 交换机类型:fanout(广播模式)*/channel.exchangeDeclare("03-pubsub", "fanout");// 6. 发送消息for (int i = 0; i < 20; i++) {String message = "hello work:" + i;/*** 参数说明:* 1. 交换机名称:03-pubsub* 2. 路由键:空字符串(fanout 模式忽略路由键)* 3. 消息属性:MessageProperties.TEXT_PLAIN* 4. 消息内容:字节数组*/channel.basicPublish("03-pubsub", "", MessageProperties.TEXT_PLAIN, message.getBytes());}System.out.println("消息发送成功");// 7. 关闭资源channel.close();connection.close();}
}
关键点:
- 交换机声明(exchangeDeclare):创建交换机并设置类型为
fanout
(广播模式)。 - 消息发送(basicPublish):将消息发送到交换机,路由键为空字符串(
fanout
模式忽略路由键)。 - 消息广播:消息会被广播到所有绑定到该交换机的队列。
(三)消费者代码解析
代码
消费者负责从队列中接收并处理消息。
package top.miqiu._03_pubsub;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("用自己的ip!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明交换机channel.exchangeDeclare("03-pubsub", "fanout");// 6. 创建临时队列String queue = channel.queueDeclare().getQueue();// 7. 绑定队列到交换机/*** 参数说明:* 1. 队列名称:queue* 2. 交换机名称:03-pubsub* 3. 路由键:空字符串(fanout 模式忽略路由键)*/channel.queueBind(queue, "03-pubsub", "");// 8. 接收消息/*** 参数说明:* 1. 队列名称:queue* 2. 是否自动确认:true(自动确认消息)* 3. 消息处理回调:DeliverCallback* 4. 消息取消回调:CancelCallback*/channel.basicConsume(queue, true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {try {// 模拟消息处理耗时Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者2 接收到消息:" + new String(delivery.getBody()));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息被取消");}});}
}
关键点:
- 交换机声明(exchangeDeclare):确保交换机存在,需与生产者保持一致。
- 临时队列(queueDeclare):创建一个临时队列,队列名称由 RabbitMQ 自动生成。
- 队列绑定(queueBind):将队列绑定到交换机,路由键为空字符串(
fanout
模式忽略路由键)。 - 消息接收(basicConsume):从队列中接收消息并处理。
结果
可以看到两个consumer都消费了相同的消息
(四)发布订阅模式的特点
- 消息广播:交换机将消息广播给所有绑定的队列。
- 临时队列:消费者可以创建临时队列,队列名称由 RabbitMQ 自动生成。
- 适用场景:
- 日志系统:将日志消息广播给多个消费者。
- 通知系统:将通知消息广播给多个用户。
(五)总结
- 发布订阅模式:适用于消息广播场景,多个消费者各自接收相同的消息。
- 生产者:负责将消息发送到交换机。
- 消费者:负责创建队列并绑定到交换机,接收并处理消息。
- 注意事项:
- 交换机类型需设置为
fanout
。 - 队列绑定到交换机时,路由键为空字符串。
- 临时队列的名称由 RabbitMQ 自动生成。
- 交换机类型需设置为
(六)RabbitMQ 交换机类型总结
交换机类型 | 描述 | 路由行为 | 适用场景 |
---|---|---|---|
Fanout | 广播模式,将消息发送到所有绑定到该交换机的队列。 | 忽略路由键(Routing Key),消息会被广播到所有绑定的队列。 | 日志系统、通知系统等需要广播消息的场景。 |
Direct | 直接模式,根据路由键将消息发送到匹配的队列。 | 消息的路由键必须与队列绑定的路由键完全匹配。 | 任务分发、点对点通信等需要精确路由的场景。 |
Topic | 主题模式,根据路由键的模式匹配将消息发送到符合条件的队列。 | 支持通配符匹配:* 匹配一个单词,# 匹配零个或多个单词。 | 消息分类、多条件路由等需要灵活匹配的场景。 |
Headers | 头部模式,根据消息的头部属性(Headers)进行匹配。 | 不依赖路由键,而是通过消息的头部属性匹配队列绑定的条件。 | 复杂的路由逻辑,例如根据消息的元数据进行路由。 |
详细说明
1. Fanout 交换机(广播,常用)
- 特点:
- 消息会被广播到所有绑定到该交换机的队列。
- 忽略路由键(Routing Key)。
- 适用场景:
- 日志系统:将日志消息广播给多个消费者。
- 通知系统:将通知消息广播给多个用户。
2. Direct 交换机
- 特点:
- 消息的路由键必须与队列绑定的路由键完全匹配。
- 支持一对一或一对多的精确路由。
- 适用场景:
- 任务分发:将特定任务路由到特定的 Worker。
- 点对点通信:将消息发送到特定的接收者。
3. Topic 交换机
- 特点:
- 支持通配符匹配:
*
匹配一个单词。#
匹配零个或多个单词。
- 路由键的格式通常是点分字符串(如
user.create
)。
- 支持通配符匹配:
- 适用场景:
- 消息分类:根据消息的主题进行路由。
- 多条件路由:支持灵活的路由规则。
4. Headers 交换机
- 特点:
- 不依赖路由键,而是通过消息的头部属性(Headers)进行匹配。
- 支持复杂的匹配规则(如
x-match
参数)。
- 适用场景:
- 复杂的路由逻辑:根据消息的元数据进行路由。
- 需要高度灵活性的场景。
对比
场景 | Fanout | Direct | Topic | Headers |
---|---|---|---|---|
日志广播 | 所有消费者接收相同的日志消息。 | 不适用。 | 不适用。 | 不适用。 |
任务分发 | 不适用。 | 将任务路由到特定的 Worker。 | 将任务分类路由到不同的 Worker。 | 根据任务的元数据进行路由。 |
通知系统 | 所有用户接收相同的通知。 | 特定用户接收特定通知。 | 根据通知类型路由到不同用户。 | 根据通知的元数据进行路由。 |
消息分类 | 不适用。 | 不适用。 | 根据消息主题进行路由。 | 根据消息的头部属性进行路由。 |
总结
- Fanout:适用于广播场景。
- Direct:适用于精确路由场景。
- Topic:适用于灵活的路由场景。
- Headers:适用于复杂的路由逻辑。
四、路由模式
(一)路由模式概述
路由模式是 RabbitMQ 的一种模式,使用 Direct 交换机 根据消息的 路由键(Routing Key) 将消息发送到匹配的队列。它的特点是:
- 一个生产者:将消息发送到 Direct 交换机,并指定路由键。
- 多个消费者:每个消费者可以绑定一个或多个路由键,只有匹配的路由键的消息才会被接收。
- 精确路由:消息的路由键必须与队列绑定的路由键完全匹配。
路由模式适用于需要根据特定条件精确路由消息的场景,例如日志级别分类、任务分发等。
(二)生产者代码解析
生产者负责创建消息并将其发送到 Direct 交换机,同时指定路由键。
package top.miqiu._04_routing;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("你的ip!!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明 Direct 交换机/*** 参数说明:* 1. 交换机名称:04-routing* 2. 交换机类型:direct*/channel.exchangeDeclare("04-routing", "direct");// 6. 发送消息for (int i = 0; i < 20; i++) {String message = "hello work:" + i;/*** 参数说明:* 1. 交换机名称:04-routing* 2. 路由键:err(消息将发送到绑定 err 路由键的队列)* 3. 消息属性:MessageProperties.TEXT_PLAIN* 4. 消息内容:字节数组*/channel.basicPublish("04-routing", "err", MessageProperties.TEXT_PLAIN, message.getBytes());}System.out.println("消息发送成功");// 7. 关闭资源channel.close();connection.close();}
}
关键点:
- 交换机声明(exchangeDeclare):创建 Direct 交换机,类型为
direct
。 - 消息发送(basicPublish):指定路由键(如
err
),消息会被发送到绑定该路由键的队列。 - 路由键匹配:只有队列绑定的路由键与消息的路由键完全匹配时,消息才会被路由到该队列。
(三)消费者代码解析
代码
消费者负责创建队列并绑定到 Direct 交换机,同时指定路由键。
package top.miqiu._04_routing;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("你的ip!!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明 Direct 交换机channel.exchangeDeclare("04-routing", "direct");// 6. 创建临时队列String queue = channel.queueDeclare().getQueue();// 7. 绑定队列到交换机,并指定路由键/*** 参数说明:* 1. 队列名称:queue* 2. 交换机名称:04-routing* 3. 路由键:info、err、waring*/channel.queueBind(queue, "04-routing", "info");channel.queueBind(queue, "04-routing", "err");channel.queueBind(queue, "04-routing", "waring");// 8. 接收消息/*** 参数说明:* 1. 队列名称:queue* 2. 是否自动确认:true(自动确认消息)* 3. 消息处理回调:DeliverCallback* 4. 消息取消回调:CancelCallback*/channel.basicConsume(queue, true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {try {// 模拟消息处理耗时Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者1 接收到消息:" + new String(delivery.getBody()));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息被取消");}});}
}
关键点:
- 交换机声明(exchangeDeclare):确保 Direct 交换机存在,需与生产者保持一致。
- 临时队列(queueDeclare):创建一个临时队列,队列名称由 RabbitMQ 自动生成。
- 队列绑定(queueBind):将队列绑定到交换机,并指定路由键(如
info
、err
、waring
)。 - 消息接收(basicConsume):从队列中接收消息并处理。
效果
consumer1绑定了[info,err,waring],所以在producer绑定了info时发送消息的情况下,consumer1可以接收到信息
由于consumer2绑定的是trace,所以consumer2是接收不到消息的
(四)路由模式的特点
- 精确路由:消息的路由键必须与队列绑定的路由键完全匹配。
- 多路由键支持:一个队列可以绑定多个路由键,接收多种类型的消息。
- 适用场景:
- 日志级别分类:将不同级别的日志(如
info
、err
)路由到不同的队列。 - 任务分发:将特定任务路由到特定的 Worker。
- 日志级别分类:将不同级别的日志(如
(五)总结
- 路由模式:适用于需要根据路由键精确路由消息的场景。
- 生产者:负责将消息发送到 Direct 交换机,并指定路由键。
- 消费者:负责创建队列并绑定到 Direct 交换机,同时指定路由键。
- 注意事项:
- 路由键必须完全匹配。
- 一个队列可以绑定多个路由键,接收多种类型的消息。
五、Topic 模式
(一)Topic 模式概述
Topic 模式是 RabbitMQ 的一种模式,使用 Topic 交换机 根据消息的 路由键(Routing Key) 进行模式匹配,将消息发送到符合条件的队列。它的特点是:
- 一个生产者:将消息发送到 Topic 交换机,并指定路由键。
- 多个消费者:每个消费者可以绑定一个或多个路由键模式,只有匹配的路由键的消息才会被接收。
- 灵活的路由:支持通配符匹配:
*
匹配一个单词。#
匹配零个或多个单词。
Topic 模式适用于需要根据复杂条件灵活路由消息的场景,例如消息分类、多条件路由等。
(二)生产者代码解析
代码
生产者负责创建消息并将其发送到 Topic 交换机,同时指定路由键。
package top.miqiu._05_topic;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("用自己的ip!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明 Topic 交换机/*** 参数说明:* 1. 交换机名称:05-topic* 2. 交换机类型:topic*/channel.exchangeDeclare("05-topic", "topic");// 6. 发送消息for (int i = 0; i < 20; i++) {String message = "hello work:" + i;/*** 参数说明:* 1. 交换机名称:05-topic* 2. 路由键:user.hi(消息将发送到匹配 user.* 或 user.# 的队列)* 3. 消息属性:MessageProperties.TEXT_PLAIN* 4. 消息内容:字节数组*/channel.basicPublish("05-topic", "user.hi", MessageProperties.TEXT_PLAIN, message.getBytes());}System.out.println("消息发送成功");// 7. 关闭资源channel.close();connection.close();}
}
关键点:
- 交换机声明(exchangeDeclare):创建 Topic 交换机,类型为
topic
。 - 消息发送(basicPublish):指定路由键(如
user.hi
),消息会被发送到匹配的队列。 - 通配符匹配:
*
匹配一个单词(如user.*
匹配user.hi
,但不匹配user.hi.there
)。#
匹配零个或多个单词(如user.#
匹配user.hi
和user.hi.there
)。
(三)消费者代码解析
代码
消费者负责创建队列并绑定到 Topic 交换机,同时指定路由键模式。
package top.miqiu._05_topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("用自己的ip!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明 Topic 交换机channel.exchangeDeclare("05-topic", "topic");// 6. 创建临时队列String queue = channel.queueDeclare().getQueue();// 7. 绑定队列到交换机,并指定路由键模式/*** 参数说明:* 1. 队列名称:queue* 2. 交换机名称:05-topic* 3. 路由键模式:user.*(匹配 user.hi、user.hello 等)*/channel.queueBind(queue, "05-topic", "user.*");// 8. 接收消息/*** 参数说明:* 1. 队列名称:queue* 2. 是否自动确认:true(自动确认消息)* 3. 消息处理回调:DeliverCallback* 4. 消息取消回调:CancelCallback*/channel.basicConsume(queue, true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {try {// 模拟消息处理耗时Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者2 user.* 接收到消息:" + new String(delivery.getBody()));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息被取消");}});}
}
关键点:
- 交换机声明(exchangeDeclare):确保 Topic 交换机存在,需与生产者保持一致。
- 临时队列(queueDeclare):创建一个临时队列,队列名称由 RabbitMQ 自动生成。
- 队列绑定(queueBind):将队列绑定到交换机,并指定路由键模式(如
user.*
)。 - 消息接收(basicConsume):从队列中接收消息并处理。
效果
当我在producer使用“employee.hi”作为路由key的时候,绑定了“employee.*”的consumer1可以消费这个消息
(四)Topic 模式的特点
- 灵活的路由:支持通配符匹配,可以根据复杂的条件路由消息。
- 多路由键支持:一个队列可以绑定多个路由键模式,接收多种类型的消息。
- 适用场景:
- 消息分类:根据消息的主题进行路由。
- 多条件路由:支持灵活的路由规则。
(五)总结
- Topic 模式:适用于需要根据复杂条件灵活路由消息的场景。
- 生产者:负责将消息发送到 Topic 交换机,并指定路由键。
- 消费者:负责创建队列并绑定到 Topic 交换机,同时指定路由键模式。
- 注意事项:
- 路由键模式支持通配符
*
和#
。 - 一个队列可以绑定多个路由键模式,接收多种类型的消息。
- 路由键模式支持通配符
相关文章:

rabbitmq五种模式的总结——附java-se实现(详细)
rabbitmq五种模式的总结 完整项目地址:https://github.com/9lucifer/rabbitmq4j-learning 一、简单模式 (一)简单模式概述 RabbitMQ 的简单模式是最基础的消息队列模式,包含以下两个角色: 生产者:负责发…...

Qt中基于开源库QRencode生成二维码(附工程源码链接)
目录 1.QRencode简介 2.编译qrencode 3.在Qt中直接使用QRencode源码 3.1.添加源码 3.2.用字符串生成二维码 3.3.用二进制数据生成二维码 3.4.界面设计 3.5.效果展示 4.注意事项 5.源码下载 1.QRencode简介 QRencode是一个开源的库,专门用于生成二维码&…...

Java数据结构---链表
目录 一、链表的概念和结构 1、概念 2、结构 二、链表的分类 三、链表的实现 1、创建节点类 2、定义表头 3、创建链表 4、打印链表 5、链表长度 6、看链表中是否包含key 7、在index位置插入val(0下标为第一个位置) 8、删除第一个关键字key …...
mongodb是怎么分库分表的
在构建高性能的数据库架构时,MongoDB的分库分表策略扮演着至关重要的角色,它通过一系列精细的步骤确保了数据的高效分布与访问。以下是对这一过程的详尽阐述,旨在提供一个清晰且优化过的理解框架。 确定分片键(Shard Key…...
C++自研游戏引擎-碰撞检测组件-八叉树AABB检测算法实现
八叉树碰撞检测是一种在三维空间中高效处理物体碰撞检测的算法,其原理可以类比为一个管理三维空间物体的智能系统。这个示例包含两个部分:八叉树部分用于宏观检测,AABB用于微观检测。AABB可以更换为均值或节点检测来提高检测精度。 八叉树的…...

spring boot对接clerk 实现用户信息获取
在现代Web应用中,用户身份验证和管理是一个关键的功能。Clerk是一个提供身份验证和用户管理的服务,可以帮助开发者快速集成这些功能。在本文中,我们将介绍如何使用Spring Boot对接Clerk,以实现用户信息的获取。 1.介绍 Clerk提供…...
一种动态地址的查询
背景 当我们注入一个进程,通过函数地址进行call时经常会遇到这样的一个问题。对方程序每周四会自动更新。更新后之前的函数地址就变化了,然后需要重新找地址。所以,我就使用了一个动态查询的方式。 第一步:先为需要call的函数生…...

周雨彤:用角色与生活,诠释审美的艺术
提到内娱审美优秀且持续在线的女演员,周雨彤绝对是其中最有代表性的一个。 独树一帜的表演美学 作为新生代演员中的实力派代表,周雨彤凭借细腻的表演和对角色的深度共情,在荧幕上留下了多个令人难忘的“出圈”形象。在《我在他乡挺好的》中…...

使用jks给空apk包签名
1、在平台官方下载空的apk包(上传应用时有提醒下载) 2、找到jdk目录,比如C:\Program Files\Java\jdk1.8\bin,并把下载的空包apk和jks文件放到bin下 3、以管理员身份运行cmd,如果不是管理员会签名失败 4、用cd定位到…...

500. 键盘行 771. 宝石与石头 简单 find接口的使用
500. 键盘行1 给你一个字符串数组 words ,只返回可以使用在 美式键盘 同一行的字母打印出来的单词。键盘如下图所示。 请注意,字符串 不区分大小写,相同字母的大小写形式都被视为在同一行。 美式键盘 中: 第一行由字符 "qwer…...

仙剑世界手游新手攻略 仙剑世界能用云手机玩吗
欢迎来到《仙剑世界》手游的仙侠世界!作为新手玩家,以下是一些详细的攻略和建议,帮助你快速上手并享受游戏的乐趣。 一、新手职业推荐 1.轩辕:这是一个偏辅助的职业,可以给队友提供输出加成等增益效果,不过…...
[题解]2024CCPC重庆站-小 C 的神秘图形
Sources:K - 小 C 的神秘图形Abstract:给定正整数 n ( 1 ≤ n ≤ 1 0 5 ) n(1\le n\le 10^5) n(1≤n≤105),三进制字符串 n 1 , n 2 ( ∣ n 1 ∣ ∣ n 2 ∣ n ) n_1,n_2(|n_1||n_2|n) n1,n2(∣n1∣∣n2∣n),按如下方法…...

NPS内网穿透SSH使用手册
1、说明 nps-一款轻量级、高性能、功能强大的内网穿透代理服务器 github地址:https://github.com/ehang-io/nps 官网文档地址:https://ehang-io.github.io/nps/#/?idnps 2、服务端 下载地址:https://github.com/ehang-io/nps/releases 下…...

大幂计算和大阶乘计算【C语言】
大幂计算: #include<stdio.h> long long int c[1000000]{0}; int main() {long long a,b,x1;c[0]1;printf("请输入底数:");scanf("%lld",&a);printf("请输入指数:");scanf("%lld",&b…...

【Linux】详谈 进程控制
目录 一、进程是什么 二、task_struct 三、查看进程 四、创建进程 4.1 fork函数的认识 4.2 2. fork函数的返回值 五、进程终止 5.1. 进程退出的场景 5.2. 进程常见的退出方法 5.2.1 从main返回 5.2.1.1 错误码 5.2.2 exit函数 5.2.3 _exit函数 5.2.4 缓冲区问题补…...
Linux top 命令
作用 top 是一个实时系统监控工具,用于查看系统的资源使用情况和进程状态。 示例 以下是一些常用的 top 命令示例: top :动态显示结果,每 3 秒刷新一次。 top -d 2:动态显示结果,每 2 秒刷新一次。 top …...

Leetcode 424-替换后的最长重复字符
给你一个字符串 s 和一个整数 k 。你可以选择字符串中的任一字符,并将其更改为任何其他大写英文字符。该操作最多可执行 k 次。 在执行上述操作后,返回 包含相同字母的最长子字符串的长度。 题解 可以先做LCR 167/Leetcode 03再做本题 滑动窗口&…...

《StyleDiffusion:通过扩散模型实现可控的解耦风格迁移》学习笔记
paper:2308.07863 目录 摘要 1、介绍 2、相关工作 2.1 神经风格迁移(NST) 2.2 解耦表示学习(DRL) 2.3 扩散模型(Diffusion Models) 3、方法 3.1 风格移除模块 3.2 风格转移模块 3.3 …...

Django 创建表时 “__str__ ”方法的使用
在 Django 模型中,__str__ 方法是一个 Python 特殊方法(也称为“魔术方法”),用于定义对象的字符串表示形式。它的作用是控制当对象被转换为字符串时,应该返回什么样的内容。 示例: 我在初学ModelForm时尝…...
图像处理之CSC
CSC 是 Color Space Conversion(色彩空间转换)的缩写,它涉及图像处理中的亮度、饱和度、对比度和色度等参数的调整。这些参数是图像处理中的核心概念,通常用于描述和操作图像的颜色信息。 以下是亮度、饱和度、对比度和色度与 CS…...

C++初阶-list的底层
目录 1.std::list实现的所有代码 2.list的简单介绍 2.1实现list的类 2.2_list_iterator的实现 2.2.1_list_iterator实现的原因和好处 2.2.2_list_iterator实现 2.3_list_node的实现 2.3.1. 避免递归的模板依赖 2.3.2. 内存布局一致性 2.3.3. 类型安全的替代方案 2.3.…...
golang循环变量捕获问题
在 Go 语言中,当在循环中启动协程(goroutine)时,如果在协程闭包中直接引用循环变量,可能会遇到一个常见的陷阱 - 循环变量捕获问题。让我详细解释一下: 问题背景 看这个代码片段: fo…...
模型参数、模型存储精度、参数与显存
模型参数量衡量单位 M:百万(Million) B:十亿(Billion) 1 B 1000 M 1B 1000M 1B1000M 参数存储精度 模型参数是固定的,但是一个参数所表示多少字节不一定,需要看这个参数以什么…...

2025年能源电力系统与流体力学国际会议 (EPSFD 2025)
2025年能源电力系统与流体力学国际会议(EPSFD 2025)将于本年度在美丽的杭州盛大召开。作为全球能源、电力系统以及流体力学领域的顶级盛会,EPSFD 2025旨在为来自世界各地的科学家、工程师和研究人员提供一个展示最新研究成果、分享实践经验及…...

vscode(仍待补充)
写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh? debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...
服务器硬防的应用场景都有哪些?
服务器硬防是指一种通过硬件设备层面的安全措施来防御服务器系统受到网络攻击的方式,避免服务器受到各种恶意攻击和网络威胁,那么,服务器硬防通常都会应用在哪些场景当中呢? 硬防服务器中一般会配备入侵检测系统和预防系统&#x…...

(二)原型模式
原型的功能是将一个已经存在的对象作为源目标,其余对象都是通过这个源目标创建。发挥复制的作用就是原型模式的核心思想。 一、源型模式的定义 原型模式是指第二次创建对象可以通过复制已经存在的原型对象来实现,忽略对象创建过程中的其它细节。 📌 核心特点: 避免重复初…...
Java多线程实现之Callable接口深度解析
Java多线程实现之Callable接口深度解析 一、Callable接口概述1.1 接口定义1.2 与Runnable接口的对比1.3 Future接口与FutureTask类 二、Callable接口的基本使用方法2.1 传统方式实现Callable接口2.2 使用Lambda表达式简化Callable实现2.3 使用FutureTask类执行Callable任务 三、…...

Cinnamon修改面板小工具图标
Cinnamon开始菜单-CSDN博客 设置模块都是做好的,比GNOME简单得多! 在 applet.js 里增加 const Settings imports.ui.settings;this.settings new Settings.AppletSettings(this, HTYMenusonichy, instance_id); this.settings.bind(menu-icon, menu…...
在web-view 加载的本地及远程HTML中调用uniapp的API及网页和vue页面是如何通讯的?
uni-app 中 Web-view 与 Vue 页面的通讯机制详解 一、Web-view 简介 Web-view 是 uni-app 提供的一个重要组件,用于在原生应用中加载 HTML 页面: 支持加载本地 HTML 文件支持加载远程 HTML 页面实现 Web 与原生的双向通讯可用于嵌入第三方网页或 H5 应…...