消息队列(一):需求分析
为什么要做这样一个项目?
首先,我们在之前学习的时候,就认识了一下 生产者消费者模式,这样一个模式有两大好处:
- 解耦合
- 本来有个分布式系统,A服务器 调⽤ B服务器(A给B发请求,B给A返回响应)===》 A 和 B 的耦合是⽐较⼤的!
- 引⼊消息队列后,A把请求发送到消息队列,B再从消息队列获取到请求;此时就算 A 或者 B 崩溃了,都不会影响到对方
-
- 削峰填谷
- ⽐如A是⼊⼝服务器,A 调⽤ B 完成⼀些具体业务,如果是 A 和 B 直接通信,如果突然A 收到 ⼀组⽤户的请求的峰值,此时 B 也会随着受到峰值
- 引⼊消息队列后,A把请求发送到消息队列,B再从消息队列获取到请求。 (虽然A收到很多请 求,队列也收到了很多请求,但是B仍旧可以按照原来的节奏处理请求。不⾄于说⼀下就收到太 多的并发量。)
- 举个例⼦:⾼铁⽕⻋站,进站⼝。 乘客好⽐A ,进站⼝好⽐B,是有限的,就需要⼀个队列来 排队,这样不管⼈多少,就不会影响到乘客进站以后的坐⻋。
-
什么是消息队列?
队列可以说是一个数据结构,可以存储数据,如下图,我们从右侧(队尾)插入元素(入队),从队头获取元素(出队)。
了解了队列之后,我们来看一下什么是消息队列,消息队列就是我们常说的MQ,英文叫Message Queue,是作为一个单独的中间件产品存在的,独立部署。
这个消息队列有啥好处呢?
我们先前已经提过了 解耦合 、 削峰填谷 ;除此之外还有一个好处 : 异步,关于这个异步,等到正式介绍的相关代码我再来细说,这里就不多介绍了。
市⾯上⼀些知名的消息队列
- RabbitMQ
- Kafka
- RocketMQ
- ActiveMQ
本项目是 基于 RabbitMQ 实现的较为简易的(所谓建议的意思就是还可以继续完善) MQ;RabbitMQ 是一款比较 “ 年长 ” 的消息队列,比较经典,所以选择了这一款消息队列。
需求分析
核心概念1
- ⽣产者(Producer)
- 消费者(Consumer)
- 中间⼈(Broker)
- 发布(Push) ⽣产者向中间⼈这⾥投递消息的过程
- 订阅(Subscribe) 哪些消费者要从中间⼈取数据,这个注册的过程,称为 "订阅"
- 消费 (Consume) 消费者从中间⼈这⾥取数据的动作
一个生产者,一个消费者
多个生产者,多个消费者
核心概念2
- 虚拟主机(Virtual Host),类似于 MySQL 中的 database,算是⼀个 "逻辑" 上的数据集合
- ⼀个Broker server 上可以组织多种不同类别数据,可以使⽤ Virtual Host 做出逻辑上的区分
- 实际开发中,⼀个 Broker server也可能同时⽤来管理多个 业务线上的数据,就可以使⽤ Virtual Host 做出逻辑上的区分。
- 交换机(Exchange)
- ⽣产者把消息投递给 Broker Server,实际上是把消息先交给了 (公司某⼀层楼)Broker Server 上的交换机,再由交换机把消息交给对应的队列。 (交换机类似于“前台⼩姐姐”)
- 队列(Queue)
- 真正⽤来存储处理消息的实体,后续消费者也是从对应的队列中取数据
- ⼀个⼤的消息队列中,可以有很多具体的⼩队列
- 绑定(Binding)
- 把交换机和队列之间,建⽴关系。
- 可以把 交换机 和 队列 视为,数据库中 多对多的关系。可以想象,在 MQ 中,也是有⼀个这 样的中间表,所谓的 “绑定’其实就是中间表中的⼀项
- 消息(Message)
- 具体来说,是 服务器A 发给 B 的请求(通过MQ转发), 服务器B 给 服务器A返回的响应(通过MQ转发)
- ⼀个消息,可以视为⼀个字符串(⼆进制数据),具体由程序员⾃定义
持久化
关于持久化,上述 虚拟机、交换机、队列、绑定、消息,需要存储起来,光是存内存显然是不够的(为啥不够呢?万一服务器宕机了,我们消息还没有处理,所以还需要将消息恢复一下),所以我们还需要存硬盘,这里以内存为主、硬盘为辅。
关于存硬盘,我们这里要进行一个区分 :
- 存数据库:交换机、队列、绑定
- 存文件:主要是与消息相关的
在内存中存储的原因:
对于 MQ 来说,能够⾼效的转发处理数据,是⾮常关键的指标! 因此对于使⽤内存来组织数据,得 到的效率,就⽐放硬盘要⾼很多
在硬盘中存储的原因:
为了防⽌内存中数据随着进程重启/主机重启⽽丢失
核心 API
- 创建队列(queueDeclare)
- 此处不⽤ Create这样的术语,原因是Create仅仅是创建;⽽ Declare 起到的效果是,不存在则创建,存在就啥也不做
- 销毁队列(queueDelete)
- 创建交换机(exchangeDeclare)
- 销毁交换机(exchageDelete)
- 创建绑定(queueBind)
- 解除绑定(queueUnbind)
- 发布消息(basicPublish)
- 订阅消息(basicConsume)
- 确认消息(basicAck)
- 这个API起到的效果,是可以让消费者显式的告诉 broker server,这个消息我处理完毕了,提⾼ 整个系统的可靠性~保证消息处理没有遗漏
- RabbitMQ 提供了 肯定 和 否定的 确认,此处我们项⽬就只有 肯定确认
交换机类型
交换机在转发消息的时候,有一套转发规则的
消息队列提供了几种不同的 交换机类型(ExchangeType) 来描述这里不同的转发规则
RabbitMQ主要实现了如下四种交换机(也是由 AMQP协议定义的)
- Direct 直接交换机
- Fanout 扇出交换机
- Topic 主题交换机
- Header 消息头交换机
而在我们手写的交换机只实现了前三种:
a. ⽣产者发送消息时,会指定⼀个"⽬标队列"的名字(此时的 routingKey就是 队列的名字)b. 交换机收到后,就看看绑定的队列⾥⾯,有没有匹配的队列c. 如果有,就转发过去(把消息塞进对应的队列中)d. 如果没有,消息直接丢弃
a. 会把消息放到交换机绑定的每个队列b. 只要和这个交换机绑定任何队列都会转发消息
a. bindingKey:把队列和交换机绑定的时候,指定⼀个单词(像是⼀个暗号⼀样)b. routingKey:⽣产者发送消息的时候,也指定⼀个单词c. 如果当前 bindingKey 和 routingKey 对上了,就可以把消息转发到对应的队列
- 专属红包 ======== 直接交换机
- 发个10块钱红包,⼤家都能领 10块钱红包 ======== 扇出交换机
- 我发个⼝令红包,只有输⼊对应⼝令才能领导红包 ======== 主题交换机
至于为啥没有实现 header主题头 交换机 ,那是因为 header 的实现规则复杂,并且实用场景少。
网络通信
其他的服务器(生产者/消费者)通过网络,和咱们的 BrokerServer 进行交互的。
此处设定,使用 TCP + 自定义应用层协议 实现 生产者/消费者 和 BrokerServer 之间进行交互。
应用层协议主要工作:就是让客户端可以通过网络,调用 BrokerServer 提供的编程接口(从而达到看似是生产者远程调用服务器的API)
消息队列服务器(BrokerServer),要提供的核心 API
- 创建队列(queueDeclare)
- 此处不⽤ Create这样的术语,原因是Create仅仅是创建;⽽ Declare 起到的效果是,不存在则创建,存在就啥也不做
- 销毁队列(queueDelete)
- 创建交换机(exchangeDeclare)
- 销毁交换机(exchageDelete)
- 创建绑定(queueBind)
- 解除绑定(queueUnbind)
- 发布消息(basicPublish)
- 订阅消息(basicConsume)
- 确认消息(basicAck)
- 这个API起到的效果,是可以让消费者显式的告诉 broker server,这个消息我处理完毕了,提⾼ 整个系统的可靠性~保证消息处理没有遗漏
- RabbitMQ 提供了 肯定 和 否定的 确认,此处我们项⽬就只有 肯定确认
因此,客户端这边也需要提供上述 API,只有服务器是真正干实事的;客户端只是发送请求、接收响应:
客户端除了提供了上述 9 个方法之外,还需要提供 4个 额外的方法,支撑其他工作
- 创建 Connection
- 关闭 Connection
- 此处⽤的 TCP 连接,⼀个 Connection 对象,就代表⼀个 TCP连接
-
- 创建 Channel
- ⼀个Connection ⾥⾯包含多个 Channel,每个 Channel 上传输的数据都是互不相⼲的
- TCP中,建⽴/断开⼀个连接,成本挺⾼的,因此很多时候不希望频繁建⽴断开 TCP 连接
- 所以定义⼀个 Channel ,不⽤的时候,销毁 Channel,此处 Channel 是逻辑概念,⽐ TCP 轻量很多
-
- 关闭 Channel
消息应答模式
- 自动应答,消费者把这个消息取走了(调用了回调函数),就算应答了
- 手动应答,调用 basicAck 这个方法属于手动应答(消费者需要主动调用这个 API 来进行应答)
总结
- 需要实现 ⽣产者,消费者,brokerserver 三个部分
- 针对⽣产者消费者来说,主要编写的是 客户端和服务器的通信部分,给客户端提供⼀组 api,让客 户端的业务代码来调⽤,从⽽通过⽹络通信的⽅式远程调⽤ brokerserver 上的⽅法
- ⽐如创建交换机,客户端这边只需要提供相关参数即可,然后通过 socket 将 request 传⼊到 ⽹卡中,然后服务器从 ⽹卡中读取 request 解析。然后计算请求得到 response,再通过 socket 写回去⽹卡。
- 实现 brokerserver 【重点】
- 持久化
- 上述这些关键数据,在硬盘中怎么存储,啥格式存储,存储在哪?
- 后续服务器宕机或是重启了,如何读取这些数据?
如果还是不太理解的话,可以结合下列思维导图一起理解:
思维导图
相关文章:

消息队列(一):需求分析
为什么要做这样一个项目? 首先,我们在之前学习的时候,就认识了一下 生产者消费者模式,这样一个模式有两大好处: 解耦合 本来有个分布式系统,A服务器 调⽤ B服务器(A给B发请求,B给A…...

ImageViewer技术实现细节
第1章 ImageViewer工具使用方法 1.1. 图像加载 1.1.1. 单图像加载 左上角菜单,“File”->“单图像”,或者Ctrl-S,弹出文件对话框,选择图像文件,当前支持bmp,png,jpg格式。 结果如下图所示: 1.1.2. 多图像加载 左上角菜单,“File”->“多图像”,或者Ctrl-M…...
MFC多文档程序,从菜单关闭一个文档和直接点击右上角的x效果不同
MFC多文档程序,从菜单关闭一个文档和直接点击右上角的x效果不同 若文档内容有修改,则前者会询问用户,是否保存修改;后者不保存修改直接关闭。 原因在于,从菜单关闭时,调用OnClose,一定会调用Sa…...

【数据结构】C++实现AVL平衡树
文章目录 1.AVL树的概念2.AVL树的实现AVL树结点的定义AVL树的插入AVL树的旋转左单旋右单旋左右双旋右左双旋插入代码 AVL树的验证AVL树的查找AVL树的修改AVL树的删除AVL树的性能 AVL树的代码测试 1.AVL树的概念 二叉搜索树虽然可以提高我们查找数据的效率,但如果插…...

图神经网络系列之序章
文章目录 一、为什么需要图神经网络?二、图的定义1.图的定义和种类2.一些关于图的重要概念2.1 子图2.2 连通图2.3 顶点的度、入度和出度2.4 边的权和网2.5 稠密图、稀疏图 3.图的存储结构3.1 邻接矩阵3.2 邻接表3.3 边集数组3.4 邻接多重表3.5 十字链表3.6 链式前向…...

Unity中 UI Shader的基本功能
文章目录 前言一、实现思路1、暴露一个 2D 类型的属性来接受UI的纹理2、设置shader的层级为TransParent半透明渲染层级,一般UI都是在这个渲染层级3、更改混合模式,是 UI 使用的纹理,该透明的地方透明 二、代码实现 前言 Unity中 UI Shader的…...

【自学开发之旅】Flask-标准化返回-连接数据库-分表-orm-migrate-增删改查(三)
业务逻辑不能用http状态码判断,应该有自己的逻辑判断。想要前端需要判断(好多if…else),所以需要标准化,标准化返回。 json标准化返回: 最外面:data,message,code三个字段。 data:返回的数据 co…...
numpy增删改查
NumPy是一个用于科学计算的Python库,它提供了一个多维数组对象以及许多用于操作这些数组的函数。下面是关于如何在NumPy中进行增删改查操作的一些基本示例: 创建NumPy数组: import numpy as np # 创建一个一维数组 arr np.array([1, 2, 3, …...
【kafka】kafka重要的集群参数配置
如何规划Kafka 对于实际应用的生产环境中,需要尽量先规划设计好集群,避免后期业务上线后费力调整。在考量部署方案时需要通盘考虑,不能仅从单个维度上进行评估,下面是几个重要的维度的考量和建议: 这里重点说说操作系…...

cs224w_colab3_2023 And cs224w_colab4_2023学习笔记
class GNNStack(torch.nn.Module):def __init__(self, input_dim, hidden_dim, output_dim, args, embFalse):super(GNNStack, self).__init__() #这里的继承表示参见 https://blog.csdn.net/wanzew/article/details/106993425 # 继承时运行继承类别的函数 总之 __mro__的目的…...

Cannot find module ‘prop-types‘
把这个import删了。...
LeetCode-63-不同路径Ⅱ-动态规划
题目描述: 一个机器人位于一个 m x n 网格的左上角 (起始点在下图中标记为 “Start” )。 机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角(在下图中标记为 “Finish”)。 现在考虑网格中有障碍物。那…...

unity 使用Photon进行网络同步
Pun使用教程 第一步:请确保使用的 Unity 版本等于或高于 2017.4(不建议使用测试版)创建一个新项目。 第二步:打开资源商店并找到 PUN 2 资源并下载/安装它。 导入所有资源后,让 Unity 重新编译。 第三步…...
大数据课程M1——ELK的概述
文章作者邮箱:yugongshiyesina.cn 地址:广东惠州 ▲ 本章节目的 ⚪ 了解ELK的定义; ⚪ 掌握ELK的使用; 一、什么是ELK 1. 简介 ELK 是elastic公司提供的一套完整的日志收集以及展示的解决方案,是三个…...
C# byte[] 如何转换成byte*
目标:将byte[]转成byte*以方便使用memcpy [DllImport("kernel32.dll", EntryPoint "RtlCopyMemory", CharSet CharSet.Ansi)] public extern static long CopyMemory(IntPtr dest, IntPtr source, int size); private void butTemp_Click(object…...
MySQL与Oracle的分页
MySQL与Oracle的分页 当我们通过SQL去查询一个结果集的时候,并不需要查看所有行,可能只是查看前几行,或者中间的几行。则需要像MySQL的limit或Oracle的ROWNUM与FETCH NEXT来实现。 MySQL 语法 SELECT * FROM table_name LIMIT [offset,] ro…...

git基本手册
Git and GitHub for Beginners Tutorial - YouTube Kevin Stratvert git config --global user.name “xxx” git config --global user.email xxxxx.com 设置默认分支 git config --global init.default branch main git config -h查看帮助 详细帮助 git help config 清除 cl…...

每日一题(两数相加)
每日一题(两数相加) 2. 两数相加 - 力扣(LeetCode) 思路 思路: 由于链表从头开始向后存储的是低权值位的数据,所以只需要两个指针p1和p2,分别从链表的头节点开始遍历。同时创建一个新的指针new…...

恒运资本:沪指震荡涨0.28%,医药板块强势拉升,金融等板块上扬
15日早盘,沪指盘中震荡上扬,科创50指数表现强势;北向资金小幅净流入。 到午间收盘,沪指涨0.28%报3135.31点,深成指、创业板指涨均0.11%,科创50指数涨1.04%;两市合计成交4357亿元,北…...

【计算机网络】Tcp详解
文章目录 前言Tcp协议段格式TCP的可靠性面向字节流应答机制超时重传流量控制滑动窗口(重要)拥塞控制延迟应答捎带应答标志位具体标志位三次握手四次挥手粘包问题TCP异常情况listen的第二个参数 前言 前面我们学习了传输层协议Udp,今天我们一…...
谷歌浏览器插件
项目中有时候会用到插件 sync-cookie-extension1.0.0:开发环境同步测试 cookie 至 localhost,便于本地请求服务携带 cookie 参考地址:https://juejin.cn/post/7139354571712757767 里面有源码下载下来,加在到扩展即可使用FeHelp…...
三维GIS开发cesium智慧地铁教程(5)Cesium相机控制
一、环境搭建 <script src"../cesium1.99/Build/Cesium/Cesium.js"></script> <link rel"stylesheet" href"../cesium1.99/Build/Cesium/Widgets/widgets.css"> 关键配置点: 路径验证:确保相对路径.…...
Oracle查询表空间大小
1 查询数据库中所有的表空间以及表空间所占空间的大小 SELECTtablespace_name,sum( bytes ) / 1024 / 1024 FROMdba_data_files GROUP BYtablespace_name; 2 Oracle查询表空间大小及每个表所占空间的大小 SELECTtablespace_name,file_id,file_name,round( bytes / ( 1024 …...
多场景 OkHttpClient 管理器 - Android 网络通信解决方案
下面是一个完整的 Android 实现,展示如何创建和管理多个 OkHttpClient 实例,分别用于长连接、普通 HTTP 请求和文件下载场景。 <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas…...
连锁超市冷库节能解决方案:如何实现超市降本增效
在连锁超市冷库运营中,高能耗、设备损耗快、人工管理低效等问题长期困扰企业。御控冷库节能解决方案通过智能控制化霜、按需化霜、实时监控、故障诊断、自动预警、远程控制开关六大核心技术,实现年省电费15%-60%,且不改动原有装备、安装快捷、…...
2023赣州旅游投资集团
单选题 1.“不登高山,不知天之高也;不临深溪,不知地之厚也。”这句话说明_____。 A、人的意识具有创造性 B、人的认识是独立于实践之外的 C、实践在认识过程中具有决定作用 D、人的一切知识都是从直接经验中获得的 参考答案: C 本题解…...

Docker 本地安装 mysql 数据库
Docker: Accelerated Container Application Development 下载对应操作系统版本的 docker ;并安装。 基础操作不再赘述。 打开 macOS 终端,开始 docker 安装mysql之旅 第一步 docker search mysql 》〉docker search mysql NAME DE…...
A2A JS SDK 完整教程:快速入门指南
目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库ÿ…...
xmind转换为markdown
文章目录 解锁思维导图新姿势:将XMind转为结构化Markdown 一、认识Xmind结构二、核心转换流程详解1.解压XMind文件(ZIP处理)2.解析JSON数据结构3:递归转换树形结构4:Markdown层级生成逻辑 三、完整代码 解锁思维导图新…...
Spring Boot + MyBatis 集成支付宝支付流程
Spring Boot MyBatis 集成支付宝支付流程 核心流程 商户系统生成订单调用支付宝创建预支付订单用户跳转支付宝完成支付支付宝异步通知支付结果商户处理支付结果更新订单状态支付宝同步跳转回商户页面 代码实现示例(电脑网站支付) 1. 添加依赖 <!…...