当前位置: 首页 > news >正文

手写消息队列(基于RabbitMQ)

一、什么是消息队列?

提到消息队列是否唤醒了你脑海深处的记忆?回看前面的这篇文章:《Java 多线程系列Ⅳ(单例模式+阻塞式队列+定时器+线程池)》,其中我们在介绍阻塞队列时说过,阻塞队列最大的用途就是实现 生产者消费者模型

我们知道对于生产者消费者模型来说,它具有两个十分亮眼的特点:

  1. 解耦合.
  2. 削峰填谷.

(1)解耦合
在引入生产者消费者模型之前,两台服务器之间通常是直接交互,这种交互模式使得服务器之间的耦合是非常大的。而引入生产者消费者模型之后,两台服务器之间不再进行直接通信,而是借助阻塞队列进行业务处理,起到了解耦的效果。

(2)削峰填谷
在引入生产者消费者模型之前,同样是两台服务器进行直接通信,如果在一个时间点,服务器 A 突然发送一组请求峰值,此刻服务器 B 也会随之感受到峰值,这种情况下很可能造成服务器故障。如果此时使用阻塞队列,A 将收到的请求发给队列,虽然队列中有很多请求,但是服务器 B 仍然和以按照原有的节奏读取请求。

其实正是因为生产者消费者模型具有以上诸多好处,在实际的后端开发中,特别是分布式系统里,跨主机使用生产者消费者模型是非常普遍的需求。因此通常会把阻塞队列单独分离出来,赋予更加丰富的功能,封装成一个独立的服务器程序,这个程序就称为 消息队列

二、需求整理

1、生产者消费者模型核心概念

  1. 生产者 (Producer): 负责将消息发送到消息队列中。

  2. 消费者 (Consumer): 从消息队列中接收和处理消息。。

  3. 中间人 (Broker): 它负责接收发布者发送的消息,并将这些消息存储在队列中,然后将这些消息传递给订阅者。

  4. 发布 (Publish): 生产者将消息投递到中间人的过程。

  5. 订阅 (Subscribe): 消费者在中间人这里注册的过程。只有消费者注册之后,当一个消息发布到消息队列时消息才会被发送给相应的订阅者。

根据以上概念,我们可以大致画出生产者消费者模型概念图:(PS:下面的每个模块均表示服务器)

一个生产者,一个消费者:

N 个生产者,N 个消费者:

2、Broker 设计概要

我们当前的目的是为了实现一个消息队列,其中 Broker 是最核心的部分,它主要负责消息的 存储转发,其中涉及到的核心概念如下:

  1. 虚拟主机 (VirtualHost): 类似于 MySQL 的 “database”,是⼀个逻辑上的集合。在实际的开发中一个 BrokerServer 可能会同时管理多组业务线上的数据,此时可以使用不同的 VirtualHost 进行区分。

  2. 交换机 (Exchange): 生产者把消息先发送到 Broker 的 Exchange 上,再根据不同的规则,把消息转发给不同的 Queue。

  3. 队列 (Queue): 真正用来存储消息的部分,每个消费者决定自己从哪个 Queue 上读取消息(根据订阅的队列)。⼀个 Exchange 可以绑定多个 Queue (可以向多个 Queue 中转发消息),一个 Queue 也可以被多个 Exchange 绑定
    (一个 Queue 中的消息可以来自于多个 Exchange)。

  4. 绑定 (Binding): Exchange 和 Queue 之间的关联关系,Exchange 和 Queue 可以理解成 “多对多” 关系。使用一个关联表就可以把这两个概念联系起来。

  5. 消息 (Message): 具体来说是服务器之间的请求和响应。一个消息,可以视为一个字符串(二进制数据),具体由程序员自定义。

上述概念在 Broker 中的体现如图所示:

补充说明1:数据存储

以上这些概念对应的数据,既需要在内存中存储,也需要在硬盘上存储,以内存为主,硬盘为辅:

  • 内存存储:对于 MQ 来说,能够高效的处理转发数据时非常关键的指标,因此使用内存组织上述数据,能够得到较高的效率。
  • 硬盘存储:主要是为了防止内存中的数据随着进程/主机重启而丢失。

补充说明:2: 交换机类型与转发规则

上面我们提到,在生产者发送消息时,首先会将消息发送到 Broker 的交换机上,再由交换机根据不同的规则转发到相应的队列中。在 MQ 中支持四种类型的交换机,它们分别是: Direct(直接交换机)、Fanout(扇出交换机)、Topic(主题交换机)、Header(头部交换机)。其中 Header 这种方式比较复杂,也比较少见,当前项目中主要实现了前三种,下面分别对他们进行详细介绍:

前要说明:

  • 以下 bindingKey(绑定键)是在创建队列和交换机绑定关系时指定的关键字。
  • 以下 routingKey(路由键)是生产者发送消息时指定的关键字。

(1)Direct(直接交换机)

  1. 生产者发送消息时,会指定一个"目标队列"的名字(此时的 routingKey 就是 队列的名字,bindingKey 无效)
  2. 交换机收到消息后,查看当前交换机对应的绑定里面是否存在队列名字为routingKey的队列
  3. 如果有,就转发过去(把消息塞进对应的“目标队列”中)
  4. 如果没有,消息直接丢弃

(2)Fanout(扇出交换机)

  1. 生产者无需指定routingKey,直接发送消息到指定交换机
  2. 交换机收到消息后,直接将消息转发给当前交换机已绑定的所有队列中。(此时的 bindingKey 和 routingKey 对扇出交换机无效。)

(3)Topic(主题交换机)

  1. 生产者发送消息时,指定一个 routingKey
  2. 交换机收到消息后,查看当前交换机对应的绑定中是否存在一个 bindingKey 通过一定的规则和 routingKey 相匹配
  3. 如果有,就将消息转发到对应的绑定队列中。
  4. 如果没有,则将消息丢弃。

PS:以上所有概念出自 AMQP 协议:一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

3、Broker 核心 API

Broker 基于以上概念和功能,需要实现的核心 API 如下:

  1. 创建队列 (queueDeclare)
  2. 销毁队列 (queueDelete)
  3. 创建交换机 (exchangeDeclare)
  4. 销毁交换机 (exchangeDelete)
  5. 创建绑定 (queueBind)
  6. 解除绑定 (queueUnbind)
  7. 发布消息 (basicPublish)
  8. 订阅消息 (basicConsume)
  9. 确认消息 (basicAck)

补充说明:

  1. 从上面可以看出来,在进行创建操作时并没有使用 create,而是使用 declare,这是从语义上说明,这里的创建起到的效果是不存在则创建,存在则啥也不做。
  2. 上述并没有创建一个“消费消息”的API,这是因为当前我们使用的工作模式是 push(推),Broker 会将消息主动推送给订阅的消费者。当然也有 pull(拉) 工作模式,需要消费者主动调用 Broker 的 API 获取消息,当前项目不涉及这种模式。
  3. 在MQ中有两种应答方式,一种是自动应答,这种方式下 Broker 将消息推送给订阅的消费者后就算应答完毕。另一种应答方式是手动应答,上述确认消息(basicAck)起到的效果,是可以让消费者 显式 的告诉 Broker,这个消息我处理完毕了,提高整个系统的可靠性。

4、客户端设计概要

生产者、消费者都是客户端程序,broker 则是作为服务器,通过网络进行通信。此处设定,使用 TCP + 自定义的应用层协议 实现 生产者/消费者 和 BrokerServer 之间的交互工作,这里需要给客户端提供一组 API,让客户端的业务代码来调用,从而通过网络通信的方式远程调用 brokerserver 上的方法。

客户端核心API:

  1. 创建 Connection
  2. 关闭 Connection
  3. 创建 Channel
  4. 关闭 Channel
  5. 创建队列 (queueDeclare)
  6. 销毁队列 (queueDelete)
  7. 创建交换机 (exchangeDeclare)
  8. 销毁交换机 (exchangeDelete)
  9. 创建绑定 (queueBind)
  10. 解除绑定 (queueUnbind)
  11. 发布消息 (basicPublish)
  12. 订阅消息 (basicConsume)
  13. 确认消息 (basicAck)

补充说明:

  1. 和 Broker 服务器 API 相比,客户端程序还提供了如下 4 个 API:创建 Connection、关闭 Connection、创建 Channel、关闭 Channel。
  2. 这里的一个Connection对象代表一个TCP连接。
  3. Channel 是 Connection 内部逻辑上的链接,多个Channel复用同一个TCP连接,一个Connection 中可以包含多个Channel,每个Channel负责客户端中不同的模块,其中传输的数据是互不相干的。
  4. 这样的设定主要是为了能够更好的复用 TCP 连接, 达到长连接的效果, 避免频繁的创建关闭 TCP 连接。
  5. 上述客户端提供的 API 只是给业务代码进行调用,真正的方法执行是交给了BrokerServer。这个过程称为 远程过程调用(Remote Procedure Call,简称RPC)是一种计算机通信协议,它允许程序调用另一个地址空间(通常是不同的计算机)中的过程或函数,而无需程序员显式编写网络代码。通过使用RPC,应用程序可以像调用本地进程一样调用远程服务器上的进程。

5、小结

最后简单总结一下,我们大致需要做的工作,其中涉及到的细节问题,我们后面在进行补充:

  1. 实现生产者、BrokerServer、消费者三个部分
  2. 针对生产者、消费者来说,主要编写客户端和服务器的网络通信部分。
  3. 重点实现BrokerServer,包括内部的基本概念和核心 API。
  4. 数据的持久化存储。

三、具体实现

附上连接:

1、消息队列详细设计与实现思维导图
2、Gitee 完整代码地址

相关文章:

手写消息队列(基于RabbitMQ)

一、什么是消息队列? 提到消息队列是否唤醒了你脑海深处的记忆?回看前面的这篇文章:《Java 多线程系列Ⅳ(单例模式阻塞式队列定时器线程池)》,其中我们在介绍阻塞队列时说过,阻塞队列最大的用途…...

kafka本地安装报错

Error: VM option ‘UseG1GC’ is experimental and must be enabled via -XX:UnlockExperimentalVMOptions. #打开 bin/kafka-run-class.sh KAFKA_JVM_PERFORMANCE_OPTS“-server -XX:UseG1GC -XX:MaxGCPauseMillis20 -XX:InitiatingHeapOccupancyPercent35 -XX:ExplicitGCInv…...

王者荣耀游戏

游戏运行如下: sxt Background package sxt;import java.awt.*; //背景类 public class Background extends GameObject{public Background(GameFrame gameFrame) {super(gameFrame);}Image bg Toolkit.getDefaultToolkit().getImage("C:\\Users\\24465\\D…...

MobaXterm如何连接CentOS7的Linux虚拟机?Redis可视化客户端工具如何连接Linux版Redis?

一、打开Lunix虚拟机,进入虚拟机中,在终端中输入ifconfig,得到以下信息,红框中为ip地址 二、打开MobaXterm,点击session 选择SSH,在Remote host中输入linux得到的IP地址,Specify username中可起一个任意的连接名称。 输入密码 四、…...

python实现炫酷的屏幕保护程序

shigen日更文章的博客写手,擅长Java、python、vue、shell等编程语言和各种应用程序、脚本的开发。记录成长,分享认知,留住感动。 上次的文章如何实现一个下班倒计时程序的阅读量很高,觉得也很实用酷炫,下边是昨天的体验…...

java学习part06数组

62-数组-数组的概述_哔哩哔哩_bilibili 这篇 Java 基础,我吹不动了 - 掘金 (juejin.cn) 1.数组概念 重点 2.数组声明和初始化 new的时候要么给出静态初始化的数据{a,b,c},要么给出动态初始化指定长度 [4]。 否则报错,初始化必须确定长度…...

Java 的异常体系

Java 中 Throwable 是所有异常和错误的超类,两个直接子类是 Error(错误)和 Exception(异常) 在Java中,异常的根类是java.lang.Throwable类,而根类又分为两大类:Error和Exception&…...

V100 GPU服务器安装CUDA教程

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…...

快速弄懂Python3.11中的新特性

Python 3.11 引入了许多新特性和改进,让我们逐一详细了解这些更新: 1. 更详细的错误消息 Python 3.11 在错误报告方面做出了显著改进,与 Python 3.10 相比,它提供了更详细的错误消息,能够指出表达式中具体哪个部分导…...

七,vi和vim

Linux系统会内置vi文本编辑器 Vim具有程序编辑的能力,可以看做是Vi的增强版本,可以主动的以字体颜色辨别语法的正确性,方便程序设计。代码补完、编译及错误跳转等方便编程的功能特别丰富,在程序员中被广泛使用。 vi和vim常用的三…...

湖科大计网:传输层

一、传输层概述 一、基本概念 传输层是端到端的协议。 因特网的两种不同的传输层协议: TCP:面向连接 UDP:无连接 我们在学习的过程中,只需要关注传输层之间的通信,不需要关注传输层协议数据是经过路由器转发至目的网络…...

设计模式(二)-创建者模式(3)-抽象工厂模式

一、为什么需要抽象工厂模式? 在工厂模式中,我们需要定义多个继承于共同工厂抽象基类的工厂子类,这些子类负责创建一个对应的对象。工厂模式存在一个缺点就是:每次扩展新的工厂子类,就会增加系统的复杂度。 如果我们…...

[计算机网络]网络层概述

呼,写了这么久终于重新开始啦! 自己落下了太多东西了.....是时候应该重新拾掇起来了. 关于后面的代码项目,我的想法是vilas.js仍然使用js来进行编写,但是后续其他的项目会开始尝试使用ts来进行书写了. 就算是前端也需要点规范吧..... 0.写在前面 这篇文章要和大家道个歉,首…...

猫12分类:使用yolov5训练检测模型

前言: 在使用yolov5之前,尝试过到百度飞桨平台(小白不建议)、AutoDL平台(这个比较友好,经济实惠)训练模型。但还是没有本地训练模型来的舒服。因此远程了一台学校电脑来搭建自己的检测模型。配置…...

Kubernetes Dashboard部署ImagePullBackOff问题处理

通常,出现ImagePullBackOff问题是由于Kubernetes集群无法拉取所需的镜像导致的。解决这个问题的方法通常包括以下步骤: 1. 检查Pod的描述信息: kubectl describe pod/[pod名称] --namespacekubernetes-dashboard 查看Events部分是否有关于…...

十四、Docker的基本操作

目录 (一)镜像命令 一、拉取Nginx 二、查看镜像 三、导出文件 四、删除镜像 五、加载镜像 (二)容器命令 一、例子:运行一个nginx容器 1、输入运行命令 2、使用命令查看宿主机ip 3、在外部浏览器访问 4、查看…...

C#,数值计算——插值和外推,分段线性插值(Linear_interp)的计算方法与源程序

1 文本格式 using System; namespace Legalsoft.Truffer { /// <summary> /// 分段线性插值 /// Piecewise linear interpolation object. /// Construct with x and y vectors, then call interp for interpolated values. /// </summary> …...

详细讲解什么是单例模式

当谈到单例模式时&#xff0c;我们指的是一种设计模式&#xff0c;它确保一个类只有一个实例&#xff0c;并提供一个全局访问点来访问该实例。这种模式在软件开发中很常见&#xff0c;特别是需要控制资源访问、配置管理、日志记录器等情况下。 让我们用一个简单的例子来解释单…...

在springBoot中同时使用mysql和MongoDB

在SpringBoot中非关系向数据库MongoDB和关系型数据库MySQL都可通过引入相关依赖并按照指定配置单独集成; mysql引入依赖: compile "org.springframework.boot:spring-boot-starter-web:1.5.18.RELEASE"compile "org.springframework.boot:spring-boot-start…...

2023.11.19 hadoop之MapReduce

目录 1.简介 2.分布式计算框架-Map Reduce 3.mapreduce的步骤 4.MapReduce底层原理 map阶段 shuffle阶段 reduce阶段 1.简介 Mapreduce是一个分布式运算程序的编程框架&#xff0c;是用户开发“基于hadoop的数据分析应用”的核心框架&#xff1b; Mapreduce核心功能是…...

力扣第841题 钥匙和房间 C++ DFS BFS 附Java代码

题目 841. 钥匙和房间 中等 相关标签 深度优先搜索 广度优先搜索 图 有 n 个房间&#xff0c;房间按从 0 到 n - 1 编号。最初&#xff0c;除 0 号房间外的其余所有房间都被锁住。你的目标是进入所有的房间。然而&#xff0c;你不能在没有获得钥匙的时候进入锁住的房间…...

React 中 react-i18next 切换语言( 项目国际化 )

背景 平时中会遇到需求&#xff0c;就是切换语言&#xff0c;语种等。其实总的来说都是用i18n来实现的 思路 首先在项目中安装i18n插件&#xff0c;然后将插件引入到项目&#xff0c;然后配置语言包&#xff08;语言包需要你自己来进行配置&#xff0c;自己编写语言包&#xff…...

antd design 5 版本 文件上传

<UploadcustomRequest{customRequest}accept".csv" showUploadList{false}><Button icon{<UploadOutlined />}>上传 CSV 文件</Button></Upload> accept 代表限制的上传类型 也可设置 .excel // 文件上传 ( CSV ) const customReques…...

【如何学习Python自动化测试】—— 浏览器操作

4 、 浏览器操作 4.1 浏览器最大化 Webdriver 打开浏览器后&#xff0c;默认不是最大化&#xff0c;如果需要界面最大化&#xff0c;需要通过 maximize_window()方法来实现&#xff0c;代码如下&#xff1a; maximize_window()方法是Selenium WebDriver提供的一个方法&#xf…...

Python编程技巧 – 使用字典

Python编程技巧 – 使用字典 Python Programming Skills – Using Dictionary Dictionary, 即字典&#xff0c;这是Python语言的一种重要的数据结构&#xff1b;Python字典是以键&#xff08;key&#xff09;值(value)对为元素&#xff0c;来存储数据的集合。 前文提到Python列…...

el-tree 与table表格联动

html部分 <div class"org-left"><el-input v-model"filterText" placeholder"" size"default" /><el-tree ref"treeRef" class"filter-tree" :data"treeData" :props"defaultProp…...

Leetcode刷题详解——删除并获得点数

1. 题目链接&#xff1a;740. 删除并获得点数 2. 题目描述&#xff1a; 给你一个整数数组 nums &#xff0c;你可以对它进行一些操作。 每次操作中&#xff0c;选择任意一个 nums[i] &#xff0c;删除它并获得 nums[i] 的点数。之后&#xff0c;你必须删除 所有 等于 nums[i] …...

HTTP四种请求方式,状态码,请求和响应报文

1.get请求 一般用于获取数据请求参数在URL后面请求参数的大小有限制 2.post请求 一般用于修改数据提交的数据在请求体中提交数据的大小没有限制 3.put请求 一般用于添加数据 4.delete请求 一般用于删除数据 5.一次完整的http请求过程 域名解析&#xff1a;使用DNS协议…...

Python - Wave2lip 环境配置与 Wave2lip x GFP-GAN 实战 [超详细!]

一.引言 前面介绍了 GFP-GAN 的原理与应用&#xff0c;其用于优化图像画质。本文关注另外一个相关的项目 Wave2lip&#xff0c;其可以通过人物视频与自定义音频进行适配&#xff0c;改变视频中人物的嘴型与音频对应。 二.Wave2Lip 简介 Wave2lip 研究 lip-syncing 以达到视频…...

2311rust,1.31版本更新

1.31.0稳定版 Rust1.31可能是最激动人心的版本! 使用Cargo创建一个新项目: cargo new foo以下是Cargo.toml的内容: [package] name "foo" version "0.1.0" authors ["名字"] edition "2018" //版本. [dependencies]在[package]…...