【RabbitMQ】队列模型
1.概述
RabbitMQ作为消息队列,有6种队列模型,分别在不同的场景进行使用,分别是Hello World,Work queues,Publish/Subscribe,Routing,Topics,RPC。
下面就分别对几个模型进行讲述。
2.Hello World
这个模型也叫直连模型,从这个名字来看就知道它的原理很简单,是一个线性的且没有分支的模型。

生产者生产消息,把消息给队列,队列在把消息给消费者进行消费。
代码实现
工具类
public class RabbitMQUtils {public static Connection getConnection() throws IOException, TimeoutException {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.153.132");//设置rabbitmq的主机地址factory.setPort(5672);//设置rabbitmq的端口号factory.setUsername("admin");//设置用户名factory.setPassword("admin");//设置密码factory.setVirtualHost("/abc");//设置虚拟主机//根据连接工厂获取一个连接对象Connection connection = factory.newConnection();return connection;}
}
生产者
/*** 基于HelloWorld消息模型的消息生产者*/
public class MessageProvider1 {public static void main(String[] args) throws IOException, TimeoutException {Connection connection= RabbitMQUtils.getConnection();//根据连接对象创建一个Channel(信道)Channel channel = connection.createChannel();/*** 根据Channel声明一个队列* 参数1:队列的名称* 参数2:描述队列是否持久化(true 讲队列以文件的形式保存在硬盘上,下一次启动mq服务还可以看到该队列)* 参数3:是否独占队列(true 只有当前会话才能使用该队列)* 参数4:队列是否会自动删除* 参数5:对队列进行的额外设置*/channel.queueDeclare("hello", true, false, false, null);/*** 发布消息* 参数1:交换机的名称* 参数2:队列的名称* 参数3:消息进行额外设置 MessageProperties.PERSISTENT_TEXT_PLAIN消息持久化* 参数4:消息主体,以字节数组的方式进行消息的发送*/channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());//关闭资源channel.close();connection.close();}
}
消费者部分
/*** 基于HelloWorld消息模型的消息消费者*/
public class MessageConsumer1 {public static void main(String[] args) throws IOException, TimeoutException {Connection connection= RabbitMQUtils.getConnection();Channel channel = connection.createChannel();//声明一个队列channel.queueDeclare("hello", true, false, false, null);/*** 消息的消费* 参数1:队列的名称* 参数2:是否开启自动确认机制 true 开启* 参数3:处理队列里面消息的回调函数*/channel.basicConsume("hello",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body);System.out.println(msg);}});}
}
这时候先启动消费者等待生产者的消息,然后再开启生产者,那么消费者就会接收到消息,这里是”Hello rabbit“
这里的队列名称要自己再RabbitMQ的管理界面进行创建

在这个界面可以进行手动创建
3.Work queues
它与第一种模型不同的是,消息队列不再是单一的讲消息传送给一个消费者,而是可以传送给多个消费者。它就解决了生产者生产消息的速率大于消费者消费消息的速率,使消息不断地堆积在队列中。
同样的它的模型也很简单,也是生产消息给队列,队列再将消息分给队列。
代码实现
生产者
/*** 消息的生产者 -- 工作队列实现*/
public class MessageProvider2 {public static void main(String[] args) throws Exception {Connection connection = RabbitMQUtils.getConnection();// 创建信道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare("work", false, false, false, null);// 发送消息for (int i = 1; i <= 10; i++) {String message = "hello,SpringCloud" + i;channel.basicPublish("", "work", null, message.getBytes());}// 关闭资源channel.close();connection.close();}
}
消费者
public class MessageConsumer_q2 {public static void main(String[] args) throws Exception {Connection connection = RabbitMQUtils.getConnection();final Channel channel = connection.createChannel();//声明队列channel.queueDeclare("work",false,false,false,null);//消费消息 参数2 false 关闭消息的自动确认机制channel.basicConsume("work",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消息是:" + new String(body));}});}
}
消费者的代码都一样的,就不重复写了。
但现在要思考一个问题,就是我这里的代码是开启了消息的自动确认机制,就是意思是队列会将消息按照顺序平均分配给消费者,也就是消费者会先再队列中确认消息(确认意味着消息从队列中删除),这样就会出现一个问题,有的消费者消费得快,有得消费者消费得慢,这样得结果显然不是我们想要的,我们肯定希望能者多劳,也就是按需分配,而不是按量平均分配 。还有一个问题,就是消费者一下子确认这么多消息然后慢慢消费,万一某个消费者在消费的时候挂掉了,这些消息都已经不在队列中了,那不就直接导致消息丢失了。
因此我们需要对消息进行手动确认而不是自动确认
public class MessageConsumer_q1 {public static void main(String[] args) throws Exception {Connection connection = RabbitMQUtils.getConnection();final Channel channel = connection.createChannel();//声明队列channel.queueDeclare("work", false, false, false, null);//消费消息 第二个参数改成false,即关闭自动确认channel.basicConsume("work", false, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 开启手动确认 不再同时确认多条消息channel.basicAck(envelope.getDeliveryTag(), false);System.out.println("消息是:" + new String(body));}});}
}
4.fanout
这个模型就相对复杂点,多了一个交换机。

广播类型的交换机可以和任意类型的队列进行配对。
每个消费者有自己的队列。
每个队列都要绑定到交换机.
生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
交换机把消息发送给绑定过的所有队列 队列的消费者都能拿到消息。
实现一条消息被多个消费者消费
代码实现
生产者
public class MessageProvider3 {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();/*** 声明交换机* 参数1:交换机的名称* 参数2:交换机的类型 fanout广播类型的交换机(可以和任何队列进行绑定)*/channel.exchangeDeclare("exchange_fanout","fanout");/*** 发布消息* 参数1:交换机的名称* 参数2;路由key 由于是广播类型的交换机,所以不用定义路由key* 参数3:对消息进行的额外设置* 参数4:消息主体*/channel.basicPublish("exchange_fanout","",null,"hello rabbitmq".getBytes());}
}
消费者
public class MessageConsumer3 {public static void main(String[] args) throws Exception {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare("exchange_fanout", "fanout");//声明队列channel.queueDeclare("q1", false, false, false, null);//将交换机和队列进行绑定 参数1: 队列的名称 参数2: 交换机的名称 参数3: 路由keychannel.queueBind("q1", "exchange_fanout", "");//消费消息channel.basicConsume("q1", true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}});}
}
一条消息会被所有消费者消费
5.Direct
在前面的模型中,一条消息被所有的消费者消费,但是在某些特定的场景下,我们希望不同的消息被不同的队列消费

生产者向交换机发送消息时,会指定一个routing key
交换机在接收到消息时会根据这个key来对它绑定的队列的routing key进行匹配,匹配的才会发送过去。
代码实现
生产者
/*** 消息的生产者 -- direct消息模型*/
public class MessageProvider {public static void main(String[] args) throws Exception {// 获取与RabbitMQ的连接Connection connection = RabbitMQUtils.getConnection();// 创建信道Channel channel = connection.createChannel();// 声明交换机,名称为exchange_direct,类型为directchannel.exchangeDeclare("exchange_direct", "direct");// 发布消息,发送到exchange_direct交换机,routingKey为add,消息内容为"hello, this is direct message"channel.basicPublish("exchange_direct", "add", null, "hello, this is direct message".getBytes());// 关闭信道channel.close();// 关闭连接connection.close();}
}
消费者1
/*** 消息的消费者1 -- direct类型*/
public class ConsumerProvider_1 {public static void main(String[] args) throws Exception {// 获取与RabbitMQ的连接Connection connection = RabbitMQUtils.getConnection();// 创建信道Channel channel = connection.createChannel();// 声明交换机,名称为exchange_direct,类型为directchannel.exchangeDeclare("exchange_direct", "direct");// 声明队列,名称为queue_directchannel.queueDeclare("queue_direct", false, false, false, null);// 使用routingKey绑定队列和交换机,分别绑定add、update的routingKeychannel.queueBind("queue_direct", "exchange_direct", "add");channel.queueBind("queue_direct", "exchange_direct", "update");// 消费消息,设置为自动确认channel.basicConsume("queue_direct", true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印接收到的消息内容System.out.println(new String(body));}});}
}
消费者2
/*** 消息的消费者2 -- direct类型*/
public class ConsumerProvider_2 {public static void main(String[] args) throws Exception {// 获取与RabbitMQ的连接Connection connection = RabbitMQUtils.getConnection();// 创建信道Channel channel = connection.createChannel();// 声明交换机,名称为exchange_direct,类型为directchannel.exchangeDeclare("exchange_direct", "direct");// 声明队列,名称为queue_direct1channel.queueDeclare("queue_direct1", false, false, false, null);// 使用routingKey绑定队列和交换机,分别绑定update、delete的routingKeychannel.queueBind("queue_direct1", "exchange_direct", "update");channel.queueBind("queue_direct1", "exchange_direct", "delete");// 消费消息,设置为自动确认channel.basicConsume("queue_direct1", true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 打印接收到的消息内容System.out.println(new String(body));}});}
}
消费者1的队列与交换机绑定的routing key分别是 add、update
消费者2的队列与交换机绑定的routing key分别是update、delete
示例代码中的生产者生产的消息的routing key是add,那么只有消费者1能接收到消息。
6.Topics
这个模型与Direct的原理是一样的,差异就在于它的routing key是有模糊匹配的机制。
但是它的routing key一般是由多个单词拼在一起,中间由”.“进行隔开,例如item.insert

这里的通配符有两种
分别是*和#
*可以匹配一个单词
#可以匹配一个或多个单词
代码演示
生产者
public class MessageProvider5 {public static void main(String[] args) throws Exception {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();//声明topic类型的交换机channel.exchangeDeclare("topic_exchange", "topic");//发布消息channel.basicPublish("topic_exchange","user.product.add",null,"hello,this is topic message".getBytes());//关闭资源channel.close();connection.close();}
}
消费者1
public class MessageConsumer5_1 {public static void main(String[] args) throws Exception {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare("topic_exchange", "topic");//声明队列channel.queueDeclare("topic_q1", false, false, false, null);//将队列和交换机进行绑定channel.queueBind("topic_q1", "topic_exchange", "user.*");//消费消息channel.basicConsume("topic_q1", true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}});}
}
消费者2
public class MessageConsumer5_2 {public static void main(String[] args) throws Exception {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare("topic_exchange", "topic");//声明队列channel.queueDeclare("topic_q2", false, false, false, null);//将队列和交换机进行绑定channel.queueBind("topic_q2", "topic_exchange", "user.#");//消费消息channel.basicConsume("topic_q2", true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}});}
}
7.RPC(了解)

- 客户端:
- 与 RabbitMQ 建立连接并创建信道。
- 声明一个用于接收响应的回调队列。
- 生成一个唯一的
correlationId,用于匹配请求和响应。 - 发送请求消息到 RPC 队列,同时设置
replyTo为回调队列名称,correlationId为生成的唯一标识。 - 从回调队列接收响应消息,并根据
correlationId进行匹配。
- 服务端:
- 与 RabbitMQ 建立连接并创建信道。
- 声明 RPC 队列。
- 从 RPC 队列接收请求消息。
- 处理请求消息,生成响应结果。
- 将响应消息发送到
replyTo指定的回调队列,并带上相同的correlationId。
相关文章:
【RabbitMQ】队列模型
1.概述 RabbitMQ作为消息队列,有6种队列模型,分别在不同的场景进行使用,分别是Hello World,Work queues,Publish/Subscribe,Routing,Topics,RPC。 下面就分别对几个模型进行讲述。…...
StarRocks 助力首汽约车精细化运营
作者:任智红,首汽约车大数据负责人 更多交流,联系我们:https://wx.focussend.com/weComLink/mobileQrCodeLink/334%201%202/ffbe5 导读: 本文整理自首汽约车大数据负责人任智红在 StarRocks 年度峰会上的演讲…...
Springboot--Kafka客户端参数关键参数的调整方法
调整 Kafka 客户端参数需结合生产者、消费者和 Broker 的配置,以实现性能优化、可靠性保障或资源限制。以下是关键参数的调整方法和注意事项: 一、生产者参数调整 max.request.size 作用:限制单个请求的最大字节数(包括消…...
C++ 基类的虚析构函数与派生的析构函数关系
1、基类非虚析构函数,派生类析构函数,基类指针指向派生类 class Base { public:~Base() { // 非虚析构函数std::cout << "Base class destructor" << std::endl;} };class Derived : public Base { public:~Derived() { // 派生…...
解决Spring Boot上传默认限制文件大小和完善超限异常(若依框架)
文章目录 报错信息问题分析技术原理解决方法1️⃣调整 Spring Boot 配置文件2️⃣检查内嵌 Tomcat 配置(可选)3️⃣ 代码自定义配置(覆盖配置文件) 全局异常处理代码 报错信息 org.springframework.web.multipart.MaxUploadSizeE…...
AI平台如何实现推理?数算岛是一个开源的AI平台(主要用于管理和调度分布式AI训练和推理任务。)
数算岛是一个开源的AI平台,主要用于管理和调度分布式AI训练和推理任务。它基于Kubernetes构建,支持多种深度学习框架(如TensorFlow、PyTorch等)。以下是数算岛实现模型推理的核心原理、架构及具体实现步骤: 一、数算岛…...
痉挛性斜颈康复助力:饮食调养指南
痉挛性斜颈患者除了积极治疗,合理饮食也能辅助缓解症状,提升生活质量。其健康饮食可从以下方面着手: 高蛋白质食物助力肌肉修复 痉挛性斜颈会导致颈部肌肉异常收缩,消耗较多能量,蛋白质有助于肌肉的修复与维持。日常可…...
mysql镜像创建docker容器,及其可能遇到的问题
前提,已经弄好基本的docker服务了。 一、基本流程 1、目录准备 我自己的资料喜欢放在 /data 目录下,所以老规矩: 先进入 /data 目录: cd /data 创建 mysql 目录并进入: mkdir mysql cd mysql 2、镜像查找 docke…...
Dify平台
目录 安装介绍Dify:开源大语言模型应用开发平台核心功能应用场景架构设计优势 安装 基于RDS PostgreSQL与Dify平台构建AI应用 使用RDS PostgreSQL打造RAG应用 介绍 Dify是一个开源的大语言模型(LLM)应用开发平台,融合了后端即…...
荣耀90 GT信息
外观设计 屏幕:采用 6.7 英寸 AMOLED 荣耀绿洲护眼屏,超窄边框设计,其上边框 1.6mm,左右黑边 1.25mm,屏占较高,带来更广阔的视觉体验。屏幕还支持 120Hz 自由刷新率,可根据使用场景自动切换刷新…...
JavaEE——线程的状态
目录 前言1. NEW2. TERMINATED3. RUNNABLE4. 三种阻塞状态总结 前言 本篇文章来讲解线程的几种状态。在Java中,线程的状态是一个枚举类型,Thread.State。其中一共分为了六个状态。分别为:NEW,RUNNABLE,BLOCKED,WAITING,TIMED_WAITING, TERMI…...
spring mvc 在拦截器、控制器和视图中获取和使用国际化区域信息的完整示例
在拦截器、控制器和视图中获取和使用国际化区域信息的完整示例 1. 核心组件代码示例 1.1 配置类(Spring Boot) import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.spring…...
1021 Deepest Root
1021 Deepest Root 分数 25 全屏浏览 切换布局 作者 CHEN, Yue 单位 浙江大学 A graph which is connected and acyclic can be considered a tree. The height of the tree depends on the selected root. Now you are supposed to find the root that results in a highest…...
RuntimeError: Error(s) in loading state_dict for ChartParser
一 bug错误 最近使用千问大模型有一个bug,报错信息如下 raise RuntimeError(Error(s) in loading state_dict for {}:\n\t{}.format( RuntimeError: Error(s) in loading state_dict for ChartParser:Unexpected key(s) in state_dict: "pretrained_model.em…...
WHAT - React 惰性初始化
目录 在 React 中如何使用惰性初始化示例:常规初始化 vs. 惰性初始化1. 常规初始化2. 惰性初始化 为什么使用惰性初始化示例:从 localStorage 获取值并使用惰性初始化总结 在 React 中,惰性初始化(Lazy Initialization)…...
2025 年安徽交安安全员考试:利用记忆宫殿强化记忆
安徽考生在面对交安安全员考试繁杂的知识点时,记忆宫殿是强大的记忆工具。选择一个熟悉且空间结构清晰的场所作为记忆宫殿,如自己居住的房屋。将房屋的不同区域,如客厅、卧室、厨房等,分别对应不同知识板块,像客厅对应…...
安全编码课程 实验6 整数安全
实验项目 实现安全计数器:实现 Counter 结构,确保计数范围为 0~100。 实验要求: 1、使用 struct 封装计数值value; 2、计数器初值为 0; 3、increment() 方法增加计数,但不能超过 100; 4、decrem…...
解决上传PDF、视频、音频等格式文件到FTP站点时报错“将文件复制到FTP服务器时发生错误。请检查是否有权限将文件放到该服务器上”问题
一、问题描述 可以将文本文件(.txt格式),图像文件(.jpg、.png等格式)上传到我们的FTP服务器上;但是上传一些PDF文件、视频等文件时就会报错“ 将文件复制到FTP服务器时发生错误。请检查是否有权限将文件放到该服务器上。 详细信息: 200 Type set to l. 227 Entering Pas…...
【Linux操作系统】:信号
Linux操作系统下的信号 一、引言 首先我们可以简单理解一下信号的概念,信号,顾名思义,就是我们操作系统发送给进程的消息。举个简单的例子,我们在写C/C程序的时候,当执行a / 0类似的操作的时候,程序直接就挂…...
经典频域分析法(Bode图、Nyquist判据) —— 理论、案例与交互式 GUI 实现
目录 经典频域分析法(Bode图、Nyquist判据) —— 理论、案例与交互式 GUI 实现一、引言二、经典频域分析方法的基本原理2.1 Bode 图分析2.2 Nyquist 判据三、数学建模与公式推导3.1 一阶系统的频域响应3.2 多极系统的 Bode 图绘制3.3 Nyquist 判据的数学描述四、经典频域分析…...
使用scoop一键下载jdk和实现版本切换
安装 在 PowerShell 中输入下面内容,保证允许本地脚本的执行: set-executionpolicy remotesigned -scope currentuser然后执行下面的命令安装 Scoop: iwr -useb get.scoop.sh | iex国内用户可以使用镜像源安装:powershell iwr -us…...
对状态模式的理解
对状态模式的理解 一、场景二、不采用状态模式1、代码2、缺点 三、采用状态模式1、代码1.1 状态类1.2 上下文(这里指:媒体播放器)1.3 客户端 2、优点 一、场景 同一个东西(例如:媒体播放器),有一…...
LangChain与LangGraph内置回调函数
LangChain与LangGraph回调函数指南 回调函数概述 LangChain和LangGraph共享同一套回调系统,通过BaseCallbackHandler类提供了丰富的生命周期钩子,可用于监控、调试和跟踪AI应用的执行过程。 回调函数流程图 #mermaid-svg-EsqgET3Cjlj0l0Z1 {font-fami…...
字符串哈希算法详解:原理、实现与应用
字符串哈希是一种高效处理字符串匹配和比较的技术,它通过将字符串映射为一个唯一的数值(哈希值),从而在O(1)时间内完成子串的比较。本文将结合代码实现,详细讲解前缀哈希法的工作原理,并通过流程图逐步解析…...
vue2(webpack)集成electron和 electron 打包
前言 之前发过一篇vue集成electron的文章,但是用vue3vite实现的,在vue2webpack工程可能不适用,所以这篇文章就主要介绍vue2webpack集成electron方法 创建项目 vue create vue-electron-demo目录架构 vue-electron-demo/ ├── src/ …...
C++内存管理优化实战:提升应用性能与效率
🧑 博主简介:CSDN博客专家、CSDN平台优质创作者,高级开发工程师,数学专业,拥有高级工程师证书;擅长C/C、C#等开发语言,熟悉Java常用开发技术,能熟练应用常用数据库SQL server,Oracle…...
redis数据迁移之通过redis-dump镜像
这里写目录标题 一、redis-dump 镜像打包1.1 安装windows docker1.2 idea项目创建1.3 idea镜像打包 二、redis数据迁移2.1 数据导出2.2 数据导入 一、redis-dump 镜像打包 没有找到可用的redis-dump镜像,需要自己打包一下,这里我是在idea直接打包的 1.…...
C语言单链表的增删改补
目录 (一)单链表的结构定义及初始化 (二)单链表的尾插,头插 (三)单链表的尾删,头删 (四)单链表的查找,删除,销毁 单链表是数据结构课程里的第二个数据结构。单链表在逻辑结构是连续的,在物理…...
redis导入成功,缺不显示数据
SpringBootTest class SecurityApplicationTests {AutowiredStringRedisTemplate template; //添加这句代码,自动装载,即可解决文章三处代码报错Testvoid contextLoads() {String compact Jwts.builder().signWith(Jwts.SIG.HS512.key().build()).subj…...
从表格到序列:Swift 如何优雅地解 LeetCode 251 展开二维向量
文章目录 摘要描述题解答案题解代码分析示例测试及结果时间复杂度空间复杂度总结 摘要 在这篇文章中,我们将深入探讨 LeetCode 第 251 题——“展开二维向量”的问题。通过 Swift 语言,我们不仅会提供可运行的示例代码,还会结合实际场景进行…...
