当前位置: 首页 > news >正文

rabbitmq五种模式的总结——附java-se实现(详细)

rabbitmq五种模式的总结

完整项目地址:https://github.com/9lucifer/rabbitmq4j-learning

在这里插入图片描述

一、简单模式

(一)简单模式概述

RabbitMQ 的简单模式是最基础的消息队列模式,包含以下两个角色:

  1. 生产者:负责发送消息到队列。
  2. 消费者:负责从队列中接收并处理消息。

在简单模式中,消息的传递是单向的,生产者将消息发送到队列,消费者从队列中接收消息。

image-20250216063036914


(二)生产者代码解析

代码

生产者负责创建消息并将其发送到指定的队列中。

package top.miqiu._01_hello;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("ip(要换成真实的ip哦)");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列/*** 参数说明:* 1. 队列名称:01-hello2* 2. 是否持久化:true(重启后队列仍然存在)* 3. 是否独占队列:false(允许多个消费者连接)* 4. 是否自动删除:false(队列不会自动删除)* 5. 额外参数:null*/channel.queueDeclare("01-hello2", true, false, false, null);// 6. 发送消息/*** 参数说明:* 1. 交换机名称:空字符串(使用默认交换机)* 2. 路由键:队列名称(01-hello2)* 3. 额外属性:null* 4. 消息内容:字节数组*/channel.basicPublish("", "01-hello2", null, "hello rabbitmq2".getBytes());System.out.println("消息发送成功");// 7. 关闭资源channel.close();connection.close();}
}
结果

image-20250216063335314


(三)消费者代码解析

代码

消费者负责从队列中接收并处理消息。

package top.miqiu._01_hello_c;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("ip(要换成真实的ip哦");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列(需与生产者保持一致)channel.queueDeclare("01-hello2", false, false, false, null);// 6. 接收消息/*** 参数说明:* 1. 队列名称:01-hello2* 2. 是否自动确认:true(消息被消费后自动确认)* 3. 消息处理回调:DeliverCallback* 4. 消息取消回调:CancelCallback*/channel.basicConsume("01-hello2", true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {System.out.println("接收到消息:" + new String(delivery.getBody()));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息被取消");}});}
}
结果

image-20250216063418597

在mq中查看

image-20250216063443454


(四)总结

  1. 简单模式:适用于一对一的简单消息传递场景。
  2. 生产者:负责创建队列并发送消息。
  3. 消费者:负责从队列中接收并处理消息。
  4. 注意事项
    • 队列名称需保持一致,不然一定会报错!
    • 消息确认机制需根据业务需求选择自动或手动确认。
    • 使用完资源后需显式关闭 ChannelConnection

二、工作模式

(一)工作模式概述

工作模式是 RabbitMQ 的一种常见模式,用于将任务分发给多个消费者。它的特点是:

  1. 一个生产者:负责发送消息到队列。
  2. 多个消费者:共同消费同一个队列中的消息。
  3. 消息分发机制:默认情况下,RabbitMQ 会以轮询(Round-Robin)的方式将消息分发给消费者。

工作模式适用于任务分发场景,例如将耗时的任务分发给多个 Worker 处理。

image-20250216065036476


(二)生产者代码解析

生产者负责创建消息并将其发送到指定的队列中。

package top.miqiu._02_work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("你的ip!别忘了改");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列/*** 参数说明:* 1. 队列名称:02-work1* 2. 是否持久化:true(重启后队列仍然存在)* 3. 是否独占队列:false(允许多个消费者连接)* 4. 是否自动删除:false(队列不会自动删除)* 5. 额外参数:null*/channel.queueDeclare("02-work1", true, false, false, null);// 6. 发送消息for (int i = 0; i < 20; i++) {String message = "hello work:" + i;channel.basicPublish("", "02-work1", null, message.getBytes());}System.out.println("消息发送成功");// 7. 关闭资源channel.close();connection.close();}
}
关键点:
  1. 队列声明(queueDeclare):创建队列并设置队列属性。
  2. 消息发送(basicPublish):通过循环发送多条消息到队列。
  3. 持久化队列:设置为 true,确保队列在 RabbitMQ 重启后仍然存在。

(三)消费者代码解析

代码

消费者负责从队列中接收并处理消息。

package top.miqiu._02_work;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("你的ip!别忘了改");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明队列(需与生产者保持一致)channel.queueDeclare("02-work1", true, false, false, null);// 6. 设置每次只接收一条消息channel.basicQos(1);// 7. 接收消息/*** 参数说明:* 1. 队列名称:02-work1* 2. 是否自动确认:false(手动确认消息)* 3. 消息处理回调:DeliverCallback* 4. 消息取消回调:CancelCallback*/channel.basicConsume("02-work1", false, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {try {// 模拟消息处理耗时Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者1 接收到消息:" + new String(delivery.getBody()));// 手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息被取消");}});}
}
关键点:
  1. 队列声明(queueDeclare):确保队列存在,需与生产者保持一致。
  2. 消息预取(basicQos):设置每次只接收一条消息,避免某个消费者处理过多消息。
  3. 手动确认(basicAck):消息处理完成后手动确认,确保消息不会丢失。
  4. 消息处理耗时:通过 Thread.sleep(1000) 模拟消息处理耗时。
效果

image-20250216065155794

image-20250216065214992


(四)工作模式的特点

  1. 消息分发机制
    • 默认情况下,RabbitMQ 会以轮询的方式将消息分发给多个消费者。
    • 可以通过 basicQos 设置每次只接收一条消息,避免某个消费者处理过多消息。
  2. 消息确认机制
    • 设置为手动确认(autoAck=false),确保消息处理完成后才确认。(防止业务处理失败的情况下丢失消息)
    • 如果消费者在处理消息时崩溃,未确认的消息会重新分发给其他消费者。
  3. 适用场景
    • 任务分发场景,例如将耗时的任务分发给多个 Worker 处理。

(五)总结

  1. 工作模式:适用于任务分发场景,多个消费者共同消费同一个队列中的消息。
  2. 生产者:负责发送消息到队列。
  3. 消费者:负责接收并处理消息,支持手动确认和消息预取。
  4. 注意事项
    • 队列名称需保持一致。
    • 消息确认机制需根据业务需求选择自动或手动确认。
    • 使用 basicQos 控制消息分发,避免某个消费者处理过多消息。

三、发布订阅模式

(一)发布订阅模式概述

发布订阅模式(Publish/Subscribe Mode)是 RabbitMQ 的一种模式,用于将消息广播给多个消费者。它的特点是:

  1. 一个生产者:将消息发送到交换机(Exchange)。
  2. 多个消费者:每个消费者都有自己的队列,并与交换机绑定。
  3. 消息广播:交换机将消息广播给所有绑定的队列。

发布订阅模式适用于消息广播场景,例如日志系统、通知系统等。

image-20250216071658856


(二)生产者代码解析

生产者负责创建消息并将其发送到指定的交换机中。

package top.miqiu._03_pubsub;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("用自己的ip!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明交换机/*** 参数说明:* 1. 交换机名称:03-pubsub* 2. 交换机类型:fanout(广播模式)*/channel.exchangeDeclare("03-pubsub", "fanout");// 6. 发送消息for (int i = 0; i < 20; i++) {String message = "hello work:" + i;/*** 参数说明:* 1. 交换机名称:03-pubsub* 2. 路由键:空字符串(fanout 模式忽略路由键)* 3. 消息属性:MessageProperties.TEXT_PLAIN* 4. 消息内容:字节数组*/channel.basicPublish("03-pubsub", "", MessageProperties.TEXT_PLAIN, message.getBytes());}System.out.println("消息发送成功");// 7. 关闭资源channel.close();connection.close();}
}
关键点:
  1. 交换机声明(exchangeDeclare):创建交换机并设置类型为 fanout(广播模式)。
  2. 消息发送(basicPublish):将消息发送到交换机,路由键为空字符串(fanout 模式忽略路由键)。
  3. 消息广播:消息会被广播到所有绑定到该交换机的队列。

(三)消费者代码解析

代码

消费者负责从队列中接收并处理消息。

package top.miqiu._03_pubsub;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("用自己的ip!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明交换机channel.exchangeDeclare("03-pubsub", "fanout");// 6. 创建临时队列String queue = channel.queueDeclare().getQueue();// 7. 绑定队列到交换机/*** 参数说明:* 1. 队列名称:queue* 2. 交换机名称:03-pubsub* 3. 路由键:空字符串(fanout 模式忽略路由键)*/channel.queueBind(queue, "03-pubsub", "");// 8. 接收消息/*** 参数说明:* 1. 队列名称:queue* 2. 是否自动确认:true(自动确认消息)* 3. 消息处理回调:DeliverCallback* 4. 消息取消回调:CancelCallback*/channel.basicConsume(queue, true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {try {// 模拟消息处理耗时Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者2 接收到消息:" + new String(delivery.getBody()));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息被取消");}});}
}
关键点:
  1. 交换机声明(exchangeDeclare):确保交换机存在,需与生产者保持一致。
  2. 临时队列(queueDeclare):创建一个临时队列,队列名称由 RabbitMQ 自动生成。
  3. 队列绑定(queueBind):将队列绑定到交换机,路由键为空字符串(fanout 模式忽略路由键)。
  4. 消息接收(basicConsume):从队列中接收消息并处理。
结果

image-20250216071734904

image-20250216071749761

可以看到两个consumer都消费了相同的消息


(四)发布订阅模式的特点

  1. 消息广播:交换机将消息广播给所有绑定的队列。
  2. 临时队列:消费者可以创建临时队列,队列名称由 RabbitMQ 自动生成。
  3. 适用场景
    • 日志系统:将日志消息广播给多个消费者。
    • 通知系统:将通知消息广播给多个用户。

(五)总结

  1. 发布订阅模式:适用于消息广播场景,多个消费者各自接收相同的消息。
  2. 生产者:负责将消息发送到交换机。
  3. 消费者:负责创建队列并绑定到交换机,接收并处理消息。
  4. 注意事项
    • 交换机类型需设置为 fanout
    • 队列绑定到交换机时,路由键为空字符串。
    • 临时队列的名称由 RabbitMQ 自动生成。

(六)RabbitMQ 交换机类型总结

交换机类型描述路由行为适用场景
Fanout广播模式,将消息发送到所有绑定到该交换机的队列。忽略路由键(Routing Key),消息会被广播到所有绑定的队列。日志系统、通知系统等需要广播消息的场景。
Direct直接模式,根据路由键将消息发送到匹配的队列。消息的路由键必须与队列绑定的路由键完全匹配。任务分发、点对点通信等需要精确路由的场景。
Topic主题模式,根据路由键的模式匹配将消息发送到符合条件的队列。支持通配符匹配:* 匹配一个单词,# 匹配零个或多个单词。消息分类、多条件路由等需要灵活匹配的场景。
Headers头部模式,根据消息的头部属性(Headers)进行匹配。不依赖路由键,而是通过消息的头部属性匹配队列绑定的条件。复杂的路由逻辑,例如根据消息的元数据进行路由。

详细说明

1. Fanout 交换机(广播,常用)
  • 特点
    • 消息会被广播到所有绑定到该交换机的队列。
    • 忽略路由键(Routing Key)。
  • 适用场景
    • 日志系统:将日志消息广播给多个消费者。
    • 通知系统:将通知消息广播给多个用户。
2. Direct 交换机
  • 特点
    • 消息的路由键必须与队列绑定的路由键完全匹配。
    • 支持一对一或一对多的精确路由。
  • 适用场景
    • 任务分发:将特定任务路由到特定的 Worker。
    • 点对点通信:将消息发送到特定的接收者。
3. Topic 交换机
  • 特点
    • 支持通配符匹配:
      • * 匹配一个单词。
      • # 匹配零个或多个单词。
    • 路由键的格式通常是点分字符串(如 user.create)。
  • 适用场景
    • 消息分类:根据消息的主题进行路由。
    • 多条件路由:支持灵活的路由规则。
4. Headers 交换机
  • 特点
    • 不依赖路由键,而是通过消息的头部属性(Headers)进行匹配。
    • 支持复杂的匹配规则(如 x-match 参数)。
  • 适用场景
    • 复杂的路由逻辑:根据消息的元数据进行路由。
    • 需要高度灵活性的场景。

对比
场景FanoutDirectTopicHeaders
日志广播所有消费者接收相同的日志消息。不适用。不适用。不适用。
任务分发不适用。将任务路由到特定的 Worker。将任务分类路由到不同的 Worker。根据任务的元数据进行路由。
通知系统所有用户接收相同的通知。特定用户接收特定通知。根据通知类型路由到不同用户。根据通知的元数据进行路由。
消息分类不适用。不适用。根据消息主题进行路由。根据消息的头部属性进行路由。

总结
  • Fanout:适用于广播场景。
  • Direct:适用于精确路由场景。
  • Topic:适用于灵活的路由场景。
  • Headers:适用于复杂的路由逻辑。

四、路由模式

(一)路由模式概述

路由模式是 RabbitMQ 的一种模式,使用 Direct 交换机 根据消息的 路由键(Routing Key) 将消息发送到匹配的队列。它的特点是:

  1. 一个生产者:将消息发送到 Direct 交换机,并指定路由键。
  2. 多个消费者:每个消费者可以绑定一个或多个路由键,只有匹配的路由键的消息才会被接收。
  3. 精确路由:消息的路由键必须与队列绑定的路由键完全匹配。

路由模式适用于需要根据特定条件精确路由消息的场景,例如日志级别分类、任务分发等。

image-20250216073521308


(二)生产者代码解析

生产者负责创建消息并将其发送到 Direct 交换机,同时指定路由键。

package top.miqiu._04_routing;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("你的ip!!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明 Direct 交换机/*** 参数说明:* 1. 交换机名称:04-routing* 2. 交换机类型:direct*/channel.exchangeDeclare("04-routing", "direct");// 6. 发送消息for (int i = 0; i < 20; i++) {String message = "hello work:" + i;/*** 参数说明:* 1. 交换机名称:04-routing* 2. 路由键:err(消息将发送到绑定 err 路由键的队列)* 3. 消息属性:MessageProperties.TEXT_PLAIN* 4. 消息内容:字节数组*/channel.basicPublish("04-routing", "err", MessageProperties.TEXT_PLAIN, message.getBytes());}System.out.println("消息发送成功");// 7. 关闭资源channel.close();connection.close();}
}
关键点:
  1. 交换机声明(exchangeDeclare):创建 Direct 交换机,类型为 direct
  2. 消息发送(basicPublish):指定路由键(如 err),消息会被发送到绑定该路由键的队列。
  3. 路由键匹配:只有队列绑定的路由键与消息的路由键完全匹配时,消息才会被路由到该队列。

(三)消费者代码解析

代码

消费者负责创建队列并绑定到 Direct 交换机,同时指定路由键。

package top.miqiu._04_routing;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("你的ip!!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明 Direct 交换机channel.exchangeDeclare("04-routing", "direct");// 6. 创建临时队列String queue = channel.queueDeclare().getQueue();// 7. 绑定队列到交换机,并指定路由键/*** 参数说明:* 1. 队列名称:queue* 2. 交换机名称:04-routing* 3. 路由键:info、err、waring*/channel.queueBind(queue, "04-routing", "info");channel.queueBind(queue, "04-routing", "err");channel.queueBind(queue, "04-routing", "waring");// 8. 接收消息/*** 参数说明:* 1. 队列名称:queue* 2. 是否自动确认:true(自动确认消息)* 3. 消息处理回调:DeliverCallback* 4. 消息取消回调:CancelCallback*/channel.basicConsume(queue, true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {try {// 模拟消息处理耗时Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者1 接收到消息:" + new String(delivery.getBody()));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息被取消");}});}
}
关键点:
  1. 交换机声明(exchangeDeclare):确保 Direct 交换机存在,需与生产者保持一致。
  2. 临时队列(queueDeclare):创建一个临时队列,队列名称由 RabbitMQ 自动生成。
  3. 队列绑定(queueBind):将队列绑定到交换机,并指定路由键(如 infoerrwaring)。
  4. 消息接收(basicConsume):从队列中接收消息并处理。
效果

consumer1绑定了[info,err,waring],所以在producer绑定了info时发送消息的情况下,consumer1可以接收到信息

image-20250216073403669

由于consumer2绑定的是trace,所以consumer2是接收不到消息的

image-20250216073447112


(四)路由模式的特点

  1. 精确路由:消息的路由键必须与队列绑定的路由键完全匹配。
  2. 多路由键支持:一个队列可以绑定多个路由键,接收多种类型的消息。
  3. 适用场景
    • 日志级别分类:将不同级别的日志(如 infoerr)路由到不同的队列。
    • 任务分发:将特定任务路由到特定的 Worker。

(五)总结

  1. 路由模式:适用于需要根据路由键精确路由消息的场景。
  2. 生产者:负责将消息发送到 Direct 交换机,并指定路由键。
  3. 消费者:负责创建队列并绑定到 Direct 交换机,同时指定路由键。
  4. 注意事项
    • 路由键必须完全匹配。
    • 一个队列可以绑定多个路由键,接收多种类型的消息。

五、Topic 模式

(一)Topic 模式概述

Topic 模式是 RabbitMQ 的一种模式,使用 Topic 交换机 根据消息的 路由键(Routing Key) 进行模式匹配,将消息发送到符合条件的队列。它的特点是:

  1. 一个生产者:将消息发送到 Topic 交换机,并指定路由键。
  2. 多个消费者:每个消费者可以绑定一个或多个路由键模式,只有匹配的路由键的消息才会被接收。
  3. 灵活的路由:支持通配符匹配:
    • * 匹配一个单词。
    • # 匹配零个或多个单词。

Topic 模式适用于需要根据复杂条件灵活路由消息的场景,例如消息分类、多条件路由等。

image-20250216075115591


(二)生产者代码解析

代码

生产者负责创建消息并将其发送到 Topic 交换机,同时指定路由键。

package top.miqiu._05_topic;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("用自己的ip!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明 Topic 交换机/*** 参数说明:* 1. 交换机名称:05-topic* 2. 交换机类型:topic*/channel.exchangeDeclare("05-topic", "topic");// 6. 发送消息for (int i = 0; i < 20; i++) {String message = "hello work:" + i;/*** 参数说明:* 1. 交换机名称:05-topic* 2. 路由键:user.hi(消息将发送到匹配 user.* 或 user.# 的队列)* 3. 消息属性:MessageProperties.TEXT_PLAIN* 4. 消息内容:字节数组*/channel.basicPublish("05-topic", "user.hi", MessageProperties.TEXT_PLAIN, message.getBytes());}System.out.println("消息发送成功");// 7. 关闭资源channel.close();connection.close();}
}
关键点:
  1. 交换机声明(exchangeDeclare):创建 Topic 交换机,类型为 topic
  2. 消息发送(basicPublish):指定路由键(如 user.hi),消息会被发送到匹配的队列。
  3. 通配符匹配
    • * 匹配一个单词(如 user.* 匹配 user.hi,但不匹配 user.hi.there)。
    • # 匹配零个或多个单词(如 user.# 匹配 user.hiuser.hi.there)。

(三)消费者代码解析

代码

消费者负责创建队列并绑定到 Topic 交换机,同时指定路由键模式。

package top.miqiu._05_topic;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置 RabbitMQ 服务器的 IP、端口、用户名和密码connectionFactory.setHost("用自己的ip!!");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");// 3. 创建连接对象Connection connection = connectionFactory.newConnection();// 4. 创建 ChannelChannel channel = connection.createChannel();// 5. 声明 Topic 交换机channel.exchangeDeclare("05-topic", "topic");// 6. 创建临时队列String queue = channel.queueDeclare().getQueue();// 7. 绑定队列到交换机,并指定路由键模式/*** 参数说明:* 1. 队列名称:queue* 2. 交换机名称:05-topic* 3. 路由键模式:user.*(匹配 user.hi、user.hello 等)*/channel.queueBind(queue, "05-topic", "user.*");// 8. 接收消息/*** 参数说明:* 1. 队列名称:queue* 2. 是否自动确认:true(自动确认消息)* 3. 消息处理回调:DeliverCallback* 4. 消息取消回调:CancelCallback*/channel.basicConsume(queue, true, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery delivery) throws IOException {try {// 模拟消息处理耗时Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者2 user.* 接收到消息:" + new String(delivery.getBody()));}}, new CancelCallback() {@Overridepublic void handle(String consumerTag) throws IOException {System.out.println("消息被取消");}});}
}
关键点:
  1. 交换机声明(exchangeDeclare):确保 Topic 交换机存在,需与生产者保持一致。
  2. 临时队列(queueDeclare):创建一个临时队列,队列名称由 RabbitMQ 自动生成。
  3. 队列绑定(queueBind):将队列绑定到交换机,并指定路由键模式(如 user.*)。
  4. 消息接收(basicConsume):从队列中接收消息并处理。
效果

当我在producer使用“employee.hi”作为路由key的时候,绑定了“employee.*”的consumer1可以消费这个消息

image-20250216075306371


(四)Topic 模式的特点

  1. 灵活的路由:支持通配符匹配,可以根据复杂的条件路由消息。
  2. 多路由键支持:一个队列可以绑定多个路由键模式,接收多种类型的消息。
  3. 适用场景
    • 消息分类:根据消息的主题进行路由。
    • 多条件路由:支持灵活的路由规则。

(五)总结

  1. Topic 模式:适用于需要根据复杂条件灵活路由消息的场景。
  2. 生产者:负责将消息发送到 Topic 交换机,并指定路由键。
  3. 消费者:负责创建队列并绑定到 Topic 交换机,同时指定路由键模式。
  4. 注意事项
    • 路由键模式支持通配符 *#
    • 一个队列可以绑定多个路由键模式,接收多种类型的消息。

相关文章:

rabbitmq五种模式的总结——附java-se实现(详细)

rabbitmq五种模式的总结 完整项目地址&#xff1a;https://github.com/9lucifer/rabbitmq4j-learning 一、简单模式 &#xff08;一&#xff09;简单模式概述 RabbitMQ 的简单模式是最基础的消息队列模式&#xff0c;包含以下两个角色&#xff1a; 生产者&#xff1a;负责发…...

Qt中基于开源库QRencode生成二维码(附工程源码链接)

目录 1.QRencode简介 2.编译qrencode 3.在Qt中直接使用QRencode源码 3.1.添加源码 3.2.用字符串生成二维码 3.3.用二进制数据生成二维码 3.4.界面设计 3.5.效果展示 4.注意事项 5.源码下载 1.QRencode简介 QRencode是一个开源的库&#xff0c;专门用于生成二维码&…...

Java数据结构---链表

目录 一、链表的概念和结构 1、概念 2、结构 二、链表的分类 三、链表的实现 1、创建节点类 2、定义表头 3、创建链表 4、打印链表 5、链表长度 6、看链表中是否包含key 7、在index位置插入val&#xff08;0下标为第一个位置&#xff09; 8、删除第一个关键字key …...

mongodb是怎么分库分表的

在构建高性能的数据库架构时&#xff0c;MongoDB的分库分表策略扮演着至关重要的角色&#xff0c;它通过一系列精细的步骤确保了数据的高效分布与访问。以下是对这一过程的详尽阐述&#xff0c;旨在提供一个清晰且优化过的理解框架。 确定分片键&#xff08;Shard Key&#xf…...

C++自研游戏引擎-碰撞检测组件-八叉树AABB检测算法实现

八叉树碰撞检测是一种在三维空间中高效处理物体碰撞检测的算法&#xff0c;其原理可以类比为一个管理三维空间物体的智能系统。这个示例包含两个部分&#xff1a;八叉树部分用于宏观检测&#xff0c;AABB用于微观检测。AABB可以更换为均值或节点检测来提高检测精度。 八叉树的…...

spring boot对接clerk 实现用户信息获取

在现代Web应用中&#xff0c;用户身份验证和管理是一个关键的功能。Clerk是一个提供身份验证和用户管理的服务&#xff0c;可以帮助开发者快速集成这些功能。在本文中&#xff0c;我们将介绍如何使用Spring Boot对接Clerk&#xff0c;以实现用户信息的获取。 1.介绍 Clerk提供…...

一种动态地址的查询

背景 当我们注入一个进程&#xff0c;通过函数地址进行call时经常会遇到这样的一个问题。对方程序每周四会自动更新。更新后之前的函数地址就变化了&#xff0c;然后需要重新找地址。所以&#xff0c;我就使用了一个动态查询的方式。 第一步&#xff1a;先为需要call的函数生…...

周雨彤:用角色与生活,诠释审美的艺术

提到内娱审美优秀且持续在线的女演员&#xff0c;周雨彤绝对是其中最有代表性的一个。 独树一帜的表演美学 作为新生代演员中的实力派代表&#xff0c;周雨彤凭借细腻的表演和对角色的深度共情&#xff0c;在荧幕上留下了多个令人难忘的“出圈”形象。在《我在他乡挺好的》中…...

使用jks给空apk包签名

1、在平台官方下载空的apk包&#xff08;上传应用时有提醒下载&#xff09; 2、找到jdk目录&#xff0c;比如C:\Program Files\Java\jdk1.8\bin&#xff0c;并把下载的空包apk和jks文件放到bin下 3、以管理员身份运行cmd&#xff0c;如果不是管理员会签名失败 4、用cd定位到…...

500. 键盘行 771. 宝石与石头 简单 find接口的使用

500. 键盘行1 给你一个字符串数组 words &#xff0c;只返回可以使用在 美式键盘 同一行的字母打印出来的单词。键盘如下图所示。 请注意&#xff0c;字符串 不区分大小写&#xff0c;相同字母的大小写形式都被视为在同一行。 美式键盘 中&#xff1a; 第一行由字符 "qwer…...

仙剑世界手游新手攻略 仙剑世界能用云手机玩吗

欢迎来到《仙剑世界》手游的仙侠世界&#xff01;作为新手玩家&#xff0c;以下是一些详细的攻略和建议&#xff0c;帮助你快速上手并享受游戏的乐趣。 一、新手职业推荐 1.轩辕&#xff1a;这是一个偏辅助的职业&#xff0c;可以给队友提供输出加成等增益效果&#xff0c;不过…...

[题解]2024CCPC重庆站-小 C 的神秘图形

Sources&#xff1a;K - 小 C 的神秘图形Abstract&#xff1a;给定正整数 n ( 1 ≤ n ≤ 1 0 5 ) n(1\le n\le 10^5) n(1≤n≤105)&#xff0c;三进制字符串 n 1 , n 2 ( ∣ n 1 ∣ ∣ n 2 ∣ n ) n_1,n_2(|n_1||n_2|n) n1​,n2​(∣n1​∣∣n2​∣n)&#xff0c;按如下方法…...

NPS内网穿透SSH使用手册

1、说明 nps-一款轻量级、高性能、功能强大的内网穿透代理服务器 github地址&#xff1a;https://github.com/ehang-io/nps 官网文档地址&#xff1a;https://ehang-io.github.io/nps/#/?idnps 2、服务端 下载地址&#xff1a;https://github.com/ehang-io/nps/releases 下…...

大幂计算和大阶乘计算【C语言】

大幂计算&#xff1a; #include<stdio.h> long long int c[1000000]{0}; int main() {long long a,b,x1;c[0]1;printf("请输入底数&#xff1a;");scanf("%lld",&a);printf("请输入指数&#xff1a;");scanf("%lld",&b…...

【Linux】详谈 进程控制

目录 一、进程是什么 二、task_struct 三、查看进程 四、创建进程 4.1 fork函数的认识 4.2 2. fork函数的返回值 五、进程终止 5.1. 进程退出的场景 5.2. 进程常见的退出方法 5.2.1 从main返回 5.2.1.1 错误码 5.2.2 exit函数 5.2.3 _exit函数 5.2.4 缓冲区问题补…...

Linux top 命令

作用 top 是一个实时系统监控工具&#xff0c;用于查看系统的资源使用情况和进程状态。 示例 以下是一些常用的 top 命令示例&#xff1a; top &#xff1a;动态显示结果&#xff0c;每 3 秒刷新一次。 top -d 2&#xff1a;动态显示结果&#xff0c;每 2 秒刷新一次。 top …...

Leetcode 424-替换后的最长重复字符

给你一个字符串 s 和一个整数 k 。你可以选择字符串中的任一字符&#xff0c;并将其更改为任何其他大写英文字符。该操作最多可执行 k 次。 在执行上述操作后&#xff0c;返回 包含相同字母的最长子字符串的长度。 题解 可以先做LCR 167/Leetcode 03再做本题 滑动窗口&…...

《StyleDiffusion:通过扩散模型实现可控的解耦风格迁移》学习笔记

paper&#xff1a;2308.07863 目录 摘要 1、介绍 2、相关工作 2.1 神经风格迁移&#xff08;NST&#xff09; 2.2 解耦表示学习&#xff08;DRL&#xff09; 2.3 扩散模型&#xff08;Diffusion Models&#xff09; 3、方法 3.1 风格移除模块 3.2 风格转移模块 3.3 …...

Django 创建表时 “__str__ ”方法的使用

在 Django 模型中&#xff0c;__str__ 方法是一个 Python 特殊方法&#xff08;也称为“魔术方法”&#xff09;&#xff0c;用于定义对象的字符串表示形式。它的作用是控制当对象被转换为字符串时&#xff0c;应该返回什么样的内容。 示例&#xff1a; 我在初学ModelForm时尝…...

图像处理之CSC

CSC 是 Color Space Conversion&#xff08;色彩空间转换&#xff09;的缩写&#xff0c;它涉及图像处理中的亮度、饱和度、对比度和色度等参数的调整。这些参数是图像处理中的核心概念&#xff0c;通常用于描述和操作图像的颜色信息。 以下是亮度、饱和度、对比度和色度与 CS…...

[特殊字符] 智能合约中的数据是如何在区块链中保持一致的?

&#x1f9e0; 智能合约中的数据是如何在区块链中保持一致的&#xff1f; 为什么所有区块链节点都能得出相同结果&#xff1f;合约调用这么复杂&#xff0c;状态真能保持一致吗&#xff1f;本篇带你从底层视角理解“状态一致性”的真相。 一、智能合约的数据存储在哪里&#xf…...

从深圳崛起的“机器之眼”:赴港乐动机器人的万亿赛道赶考路

进入2025年以来&#xff0c;尽管围绕人形机器人、具身智能等机器人赛道的质疑声不断&#xff0c;但全球市场热度依然高涨&#xff0c;入局者持续增加。 以国内市场为例&#xff0c;天眼查专业版数据显示&#xff0c;截至5月底&#xff0c;我国现存在业、存续状态的机器人相关企…...

汽车生产虚拟实训中的技能提升与生产优化​

在制造业蓬勃发展的大背景下&#xff0c;虚拟教学实训宛如一颗璀璨的新星&#xff0c;正发挥着不可或缺且日益凸显的关键作用&#xff0c;源源不断地为企业的稳健前行与创新发展注入磅礴强大的动力。就以汽车制造企业这一极具代表性的行业主体为例&#xff0c;汽车生产线上各类…...

【SQL学习笔记1】增删改查+多表连接全解析(内附SQL免费在线练习工具)

可以使用Sqliteviz这个网站免费编写sql语句&#xff0c;它能够让用户直接在浏览器内练习SQL的语法&#xff0c;不需要安装任何软件。 链接如下&#xff1a; sqliteviz 注意&#xff1a; 在转写SQL语法时&#xff0c;关键字之间有一个特定的顺序&#xff0c;这个顺序会影响到…...

MySQL用户和授权

开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务&#xff1a; test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...

人机融合智能 | “人智交互”跨学科新领域

本文系统地提出基于“以人为中心AI(HCAI)”理念的人-人工智能交互(人智交互)这一跨学科新领域及框架,定义人智交互领域的理念、基本理论和关键问题、方法、开发流程和参与团队等,阐述提出人智交互新领域的意义。然后,提出人智交互研究的三种新范式取向以及它们的意义。最后,总结…...

MySQL 部分重点知识篇

一、数据库对象 1. 主键 定义 &#xff1a;主键是用于唯一标识表中每一行记录的字段或字段组合。它具有唯一性和非空性特点。 作用 &#xff1a;确保数据的完整性&#xff0c;便于数据的查询和管理。 示例 &#xff1a;在学生信息表中&#xff0c;学号可以作为主键&#xff…...

Golang——7、包与接口详解

包与接口详解 1、Golang包详解1.1、Golang中包的定义和介绍1.2、Golang包管理工具go mod1.3、Golang中自定义包1.4、Golang中使用第三包1.5、init函数 2、接口详解2.1、接口的定义2.2、空接口2.3、类型断言2.4、结构体值接收者和指针接收者实现接口的区别2.5、一个结构体实现多…...

vue3 daterange正则踩坑

<el-form-item label"空置时间" prop"vacantTime"> <el-date-picker v-model"form.vacantTime" type"daterange" start-placeholder"开始日期" end-placeholder"结束日期" clearable :editable"fal…...

五子棋测试用例

一.项目背景 1.1 项目简介 传统棋类文化的推广 五子棋是一种古老的棋类游戏&#xff0c;有着深厚的文化底蕴。通过将五子棋制作成网页游戏&#xff0c;可以让更多的人了解和接触到这一传统棋类文化。无论是国内还是国外的玩家&#xff0c;都可以通过网页五子棋感受到东方棋类…...