手写消息队列(基于RabbitMQ)
一、什么是消息队列?
提到消息队列是否唤醒了你脑海深处的记忆?回看前面的这篇文章:《Java 多线程系列Ⅳ(单例模式+阻塞式队列+定时器+线程池)》,其中我们在介绍阻塞队列时说过,阻塞队列最大的用途就是实现 生产者消费者模型。
我们知道对于生产者消费者模型来说,它具有两个十分亮眼的特点:
- 解耦合.
- 削峰填谷.
(1)解耦合
在引入生产者消费者模型之前,两台服务器之间通常是直接交互,这种交互模式使得服务器之间的耦合是非常大的。而引入生产者消费者模型之后,两台服务器之间不再进行直接通信,而是借助阻塞队列进行业务处理,起到了解耦的效果。

(2)削峰填谷
在引入生产者消费者模型之前,同样是两台服务器进行直接通信,如果在一个时间点,服务器 A 突然发送一组请求峰值,此刻服务器 B 也会随之感受到峰值,这种情况下很可能造成服务器故障。如果此时使用阻塞队列,A 将收到的请求发给队列,虽然队列中有很多请求,但是服务器 B 仍然和以按照原有的节奏读取请求。

其实正是因为生产者消费者模型具有以上诸多好处,在实际的后端开发中,特别是分布式系统里,跨主机使用生产者消费者模型是非常普遍的需求。因此通常会把阻塞队列单独分离出来,赋予更加丰富的功能,封装成一个独立的服务器程序,这个程序就称为 消息队列。
二、需求整理
1、生产者消费者模型核心概念
生产者 (Producer): 负责将消息发送到消息队列中。
消费者 (Consumer): 从消息队列中接收和处理消息。。
中间人 (Broker): 它负责接收发布者发送的消息,并将这些消息存储在队列中,然后将这些消息传递给订阅者。
发布 (Publish): 生产者将消息投递到中间人的过程。
订阅 (Subscribe): 消费者在中间人这里注册的过程。只有消费者注册之后,当一个消息发布到消息队列时消息才会被发送给相应的订阅者。
根据以上概念,我们可以大致画出生产者消费者模型概念图:(PS:下面的每个模块均表示服务器)
一个生产者,一个消费者:

N 个生产者,N 个消费者:

2、Broker 设计概要
我们当前的目的是为了实现一个消息队列,其中 Broker 是最核心的部分,它主要负责消息的 存储 和 转发,其中涉及到的核心概念如下:
虚拟主机 (VirtualHost): 类似于 MySQL 的 “database”,是⼀个逻辑上的集合。在实际的开发中一个 BrokerServer 可能会同时管理多组业务线上的数据,此时可以使用不同的 VirtualHost 进行区分。
交换机 (Exchange): 生产者把消息先发送到 Broker 的 Exchange 上,再根据不同的规则,把消息转发给不同的 Queue。
队列 (Queue): 真正用来存储消息的部分,每个消费者决定自己从哪个 Queue 上读取消息(根据订阅的队列)。⼀个 Exchange 可以绑定多个 Queue (可以向多个 Queue 中转发消息),一个 Queue 也可以被多个 Exchange 绑定
(一个 Queue 中的消息可以来自于多个 Exchange)。绑定 (Binding): Exchange 和 Queue 之间的关联关系,Exchange 和 Queue 可以理解成 “多对多” 关系。使用一个关联表就可以把这两个概念联系起来。
消息 (Message): 具体来说是服务器之间的请求和响应。一个消息,可以视为一个字符串(二进制数据),具体由程序员自定义。
上述概念在 Broker 中的体现如图所示:

补充说明1:数据存储
以上这些概念对应的数据,既需要在内存中存储,也需要在硬盘上存储,以内存为主,硬盘为辅:
- 内存存储:对于 MQ 来说,能够高效的处理转发数据时非常关键的指标,因此使用内存组织上述数据,能够得到较高的效率。
- 硬盘存储:主要是为了防止内存中的数据随着进程/主机重启而丢失。
补充说明:2: 交换机类型与转发规则
上面我们提到,在生产者发送消息时,首先会将消息发送到 Broker 的交换机上,再由交换机根据不同的规则转发到相应的队列中。在 MQ 中支持四种类型的交换机,它们分别是: Direct(直接交换机)、Fanout(扇出交换机)、Topic(主题交换机)、Header(头部交换机)。其中 Header 这种方式比较复杂,也比较少见,当前项目中主要实现了前三种,下面分别对他们进行详细介绍:
前要说明:
- 以下 bindingKey(绑定键)是在创建队列和交换机绑定关系时指定的关键字。
- 以下 routingKey(路由键)是生产者发送消息时指定的关键字。
(1)Direct(直接交换机)
- 生产者发送消息时,会指定一个"目标队列"的名字(此时的 routingKey 就是 队列的名字,bindingKey 无效)
- 交换机收到消息后,查看当前交换机对应的绑定里面是否存在队列名字为routingKey的队列
- 如果有,就转发过去(把消息塞进对应的“目标队列”中)
- 如果没有,消息直接丢弃

(2)Fanout(扇出交换机)
- 生产者无需指定routingKey,直接发送消息到指定交换机
- 交换机收到消息后,直接将消息转发给当前交换机已绑定的所有队列中。(此时的 bindingKey 和 routingKey 对扇出交换机无效。)

(3)Topic(主题交换机)
- 生产者发送消息时,指定一个 routingKey
- 交换机收到消息后,查看当前交换机对应的绑定中是否存在一个 bindingKey 通过一定的规则和 routingKey 相匹配
- 如果有,就将消息转发到对应的绑定队列中。
- 如果没有,则将消息丢弃。

PS:以上所有概念出自 AMQP 协议:一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
3、Broker 核心 API
Broker 基于以上概念和功能,需要实现的核心 API 如下:
- 创建队列 (queueDeclare)
- 销毁队列 (queueDelete)
- 创建交换机 (exchangeDeclare)
- 销毁交换机 (exchangeDelete)
- 创建绑定 (queueBind)
- 解除绑定 (queueUnbind)
- 发布消息 (basicPublish)
- 订阅消息 (basicConsume)
- 确认消息 (basicAck)
补充说明:
- 从上面可以看出来,在进行创建操作时并没有使用 create,而是使用 declare,这是从语义上说明,这里的创建起到的效果是不存在则创建,存在则啥也不做。
- 上述并没有创建一个“消费消息”的API,这是因为当前我们使用的工作模式是 push(推),Broker 会将消息主动推送给订阅的消费者。当然也有 pull(拉) 工作模式,需要消费者主动调用 Broker 的 API 获取消息,当前项目不涉及这种模式。
- 在MQ中有两种应答方式,一种是自动应答,这种方式下 Broker 将消息推送给订阅的消费者后就算应答完毕。另一种应答方式是手动应答,上述确认消息(basicAck)起到的效果,是可以让消费者 显式 的告诉 Broker,这个消息我处理完毕了,提高整个系统的可靠性。
4、客户端设计概要
生产者、消费者都是客户端程序,broker 则是作为服务器,通过网络进行通信。此处设定,使用 TCP + 自定义的应用层协议 实现 生产者/消费者 和 BrokerServer 之间的交互工作,这里需要给客户端提供一组 API,让客户端的业务代码来调用,从而通过网络通信的方式远程调用 brokerserver 上的方法。
客户端核心API:
- 创建 Connection
- 关闭 Connection
- 创建 Channel
- 关闭 Channel
- 创建队列 (queueDeclare)
- 销毁队列 (queueDelete)
- 创建交换机 (exchangeDeclare)
- 销毁交换机 (exchangeDelete)
- 创建绑定 (queueBind)
- 解除绑定 (queueUnbind)
- 发布消息 (basicPublish)
- 订阅消息 (basicConsume)
- 确认消息 (basicAck)
补充说明:
- 和 Broker 服务器 API 相比,客户端程序还提供了如下 4 个 API:创建 Connection、关闭 Connection、创建 Channel、关闭 Channel。
- 这里的一个Connection对象代表一个TCP连接。
- Channel 是 Connection 内部逻辑上的链接,多个Channel复用同一个TCP连接,一个Connection 中可以包含多个Channel,每个Channel负责客户端中不同的模块,其中传输的数据是互不相干的。
- 这样的设定主要是为了能够更好的复用 TCP 连接, 达到长连接的效果, 避免频繁的创建关闭 TCP 连接。
- 上述客户端提供的 API 只是给业务代码进行调用,真正的方法执行是交给了BrokerServer。这个过程称为 远程过程调用(Remote Procedure Call,简称RPC)是一种计算机通信协议,它允许程序调用另一个地址空间(通常是不同的计算机)中的过程或函数,而无需程序员显式编写网络代码。通过使用RPC,应用程序可以像调用本地进程一样调用远程服务器上的进程。
5、小结
最后简单总结一下,我们大致需要做的工作,其中涉及到的细节问题,我们后面在进行补充:
- 实现生产者、BrokerServer、消费者三个部分
- 针对生产者、消费者来说,主要编写客户端和服务器的网络通信部分。
- 重点实现BrokerServer,包括内部的基本概念和核心 API。
- 数据的持久化存储。
三、具体实现
附上连接:
1、消息队列详细设计与实现思维导图
2、Gitee 完整代码地址
相关文章:
手写消息队列(基于RabbitMQ)
一、什么是消息队列? 提到消息队列是否唤醒了你脑海深处的记忆?回看前面的这篇文章:《Java 多线程系列Ⅳ(单例模式阻塞式队列定时器线程池)》,其中我们在介绍阻塞队列时说过,阻塞队列最大的用途…...
kafka本地安装报错
Error: VM option ‘UseG1GC’ is experimental and must be enabled via -XX:UnlockExperimentalVMOptions. #打开 bin/kafka-run-class.sh KAFKA_JVM_PERFORMANCE_OPTS“-server -XX:UseG1GC -XX:MaxGCPauseMillis20 -XX:InitiatingHeapOccupancyPercent35 -XX:ExplicitGCInv…...
王者荣耀游戏
游戏运行如下: sxt Background package sxt;import java.awt.*; //背景类 public class Background extends GameObject{public Background(GameFrame gameFrame) {super(gameFrame);}Image bg Toolkit.getDefaultToolkit().getImage("C:\\Users\\24465\\D…...
MobaXterm如何连接CentOS7的Linux虚拟机?Redis可视化客户端工具如何连接Linux版Redis?
一、打开Lunix虚拟机,进入虚拟机中,在终端中输入ifconfig,得到以下信息,红框中为ip地址 二、打开MobaXterm,点击session 选择SSH,在Remote host中输入linux得到的IP地址,Specify username中可起一个任意的连接名称。 输入密码 四、…...
python实现炫酷的屏幕保护程序
shigen日更文章的博客写手,擅长Java、python、vue、shell等编程语言和各种应用程序、脚本的开发。记录成长,分享认知,留住感动。 上次的文章如何实现一个下班倒计时程序的阅读量很高,觉得也很实用酷炫,下边是昨天的体验…...
java学习part06数组
62-数组-数组的概述_哔哩哔哩_bilibili 这篇 Java 基础,我吹不动了 - 掘金 (juejin.cn) 1.数组概念 重点 2.数组声明和初始化 new的时候要么给出静态初始化的数据{a,b,c},要么给出动态初始化指定长度 [4]。 否则报错,初始化必须确定长度…...
Java 的异常体系
Java 中 Throwable 是所有异常和错误的超类,两个直接子类是 Error(错误)和 Exception(异常) 在Java中,异常的根类是java.lang.Throwable类,而根类又分为两大类:Error和Exception&…...
V100 GPU服务器安装CUDA教程
大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…...
快速弄懂Python3.11中的新特性
Python 3.11 引入了许多新特性和改进,让我们逐一详细了解这些更新: 1. 更详细的错误消息 Python 3.11 在错误报告方面做出了显著改进,与 Python 3.10 相比,它提供了更详细的错误消息,能够指出表达式中具体哪个部分导…...
七,vi和vim
Linux系统会内置vi文本编辑器 Vim具有程序编辑的能力,可以看做是Vi的增强版本,可以主动的以字体颜色辨别语法的正确性,方便程序设计。代码补完、编译及错误跳转等方便编程的功能特别丰富,在程序员中被广泛使用。 vi和vim常用的三…...
湖科大计网:传输层
一、传输层概述 一、基本概念 传输层是端到端的协议。 因特网的两种不同的传输层协议: TCP:面向连接 UDP:无连接 我们在学习的过程中,只需要关注传输层之间的通信,不需要关注传输层协议数据是经过路由器转发至目的网络…...
设计模式(二)-创建者模式(3)-抽象工厂模式
一、为什么需要抽象工厂模式? 在工厂模式中,我们需要定义多个继承于共同工厂抽象基类的工厂子类,这些子类负责创建一个对应的对象。工厂模式存在一个缺点就是:每次扩展新的工厂子类,就会增加系统的复杂度。 如果我们…...
[计算机网络]网络层概述
呼,写了这么久终于重新开始啦! 自己落下了太多东西了.....是时候应该重新拾掇起来了. 关于后面的代码项目,我的想法是vilas.js仍然使用js来进行编写,但是后续其他的项目会开始尝试使用ts来进行书写了. 就算是前端也需要点规范吧..... 0.写在前面 这篇文章要和大家道个歉,首…...
猫12分类:使用yolov5训练检测模型
前言: 在使用yolov5之前,尝试过到百度飞桨平台(小白不建议)、AutoDL平台(这个比较友好,经济实惠)训练模型。但还是没有本地训练模型来的舒服。因此远程了一台学校电脑来搭建自己的检测模型。配置…...
Kubernetes Dashboard部署ImagePullBackOff问题处理
通常,出现ImagePullBackOff问题是由于Kubernetes集群无法拉取所需的镜像导致的。解决这个问题的方法通常包括以下步骤: 1. 检查Pod的描述信息: kubectl describe pod/[pod名称] --namespacekubernetes-dashboard 查看Events部分是否有关于…...
十四、Docker的基本操作
目录 (一)镜像命令 一、拉取Nginx 二、查看镜像 三、导出文件 四、删除镜像 五、加载镜像 (二)容器命令 一、例子:运行一个nginx容器 1、输入运行命令 2、使用命令查看宿主机ip 3、在外部浏览器访问 4、查看…...
C#,数值计算——插值和外推,分段线性插值(Linear_interp)的计算方法与源程序
1 文本格式 using System; namespace Legalsoft.Truffer { /// <summary> /// 分段线性插值 /// Piecewise linear interpolation object. /// Construct with x and y vectors, then call interp for interpolated values. /// </summary> …...
详细讲解什么是单例模式
当谈到单例模式时,我们指的是一种设计模式,它确保一个类只有一个实例,并提供一个全局访问点来访问该实例。这种模式在软件开发中很常见,特别是需要控制资源访问、配置管理、日志记录器等情况下。 让我们用一个简单的例子来解释单…...
在springBoot中同时使用mysql和MongoDB
在SpringBoot中非关系向数据库MongoDB和关系型数据库MySQL都可通过引入相关依赖并按照指定配置单独集成; mysql引入依赖: compile "org.springframework.boot:spring-boot-starter-web:1.5.18.RELEASE"compile "org.springframework.boot:spring-boot-start…...
2023.11.19 hadoop之MapReduce
目录 1.简介 2.分布式计算框架-Map Reduce 3.mapreduce的步骤 4.MapReduce底层原理 map阶段 shuffle阶段 reduce阶段 1.简介 Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架; Mapreduce核心功能是…...
Java 语言特性(面试系列2)
一、SQL 基础 1. 复杂查询 (1)连接查询(JOIN) 内连接(INNER JOIN):返回两表匹配的记录。 SELECT e.name, d.dept_name FROM employees e INNER JOIN departments d ON e.dept_id d.dept_id; 左…...
K8S认证|CKS题库+答案| 11. AppArmor
目录 11. AppArmor 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作: 1)、切换集群 2)、切换节点 3)、切换到 apparmor 的目录 4)、执行 apparmor 策略模块 5)、修改 pod 文件 6)、…...
STM32+rt-thread判断是否联网
一、根据NETDEV_FLAG_INTERNET_UP位判断 static bool is_conncected(void) {struct netdev *dev RT_NULL;dev netdev_get_first_by_flags(NETDEV_FLAG_INTERNET_UP);if (dev RT_NULL){printf("wait netdev internet up...");return false;}else{printf("loc…...
Golang dig框架与GraphQL的完美结合
将 Go 的 Dig 依赖注入框架与 GraphQL 结合使用,可以显著提升应用程序的可维护性、可测试性以及灵活性。 Dig 是一个强大的依赖注入容器,能够帮助开发者更好地管理复杂的依赖关系,而 GraphQL 则是一种用于 API 的查询语言,能够提…...
【SQL学习笔记1】增删改查+多表连接全解析(内附SQL免费在线练习工具)
可以使用Sqliteviz这个网站免费编写sql语句,它能够让用户直接在浏览器内练习SQL的语法,不需要安装任何软件。 链接如下: sqliteviz 注意: 在转写SQL语法时,关键字之间有一个特定的顺序,这个顺序会影响到…...
HarmonyOS运动开发:如何用mpchart绘制运动配速图表
##鸿蒙核心技术##运动开发##Sensor Service Kit(传感器服务)# 前言 在运动类应用中,运动数据的可视化是提升用户体验的重要环节。通过直观的图表展示运动过程中的关键数据,如配速、距离、卡路里消耗等,用户可以更清晰…...
【7色560页】职场可视化逻辑图高级数据分析PPT模版
7种色调职场工作汇报PPT,橙蓝、黑红、红蓝、蓝橙灰、浅蓝、浅绿、深蓝七种色调模版 【7色560页】职场可视化逻辑图高级数据分析PPT模版:职场可视化逻辑图分析PPT模版https://pan.quark.cn/s/78aeabbd92d1...
RabbitMQ入门4.1.0版本(基于java、SpringBoot操作)
RabbitMQ 一、RabbitMQ概述 RabbitMQ RabbitMQ最初由LShift和CohesiveFT于2007年开发,后来由Pivotal Software Inc.(现为VMware子公司)接管。RabbitMQ 是一个开源的消息代理和队列服务器,用 Erlang 语言编写。广泛应用于各种分布…...
C语言中提供的第三方库之哈希表实现
一. 简介 前面一篇文章简单学习了C语言中第三方库(uthash库)提供对哈希表的操作,文章如下: C语言中提供的第三方库uthash常用接口-CSDN博客 本文简单学习一下第三方库 uthash库对哈希表的操作。 二. uthash库哈希表操作示例 u…...
AI语音助手的Python实现
引言 语音助手(如小爱同学、Siri)通过语音识别、自然语言处理(NLP)和语音合成技术,为用户提供直观、高效的交互体验。随着人工智能的普及,Python开发者可以利用开源库和AI模型,快速构建自定义语音助手。本文由浅入深,详细介绍如何使用Python开发AI语音助手,涵盖基础功…...
