参考RabbitMQ实现一个消息队列
文章目录
- 前言
- 小小消息管家
- 1.项目介绍
- 2. 需求分析
- 2.1 API
- 2.2 消息应答
- 2.3 网络通信协议设计
- 3. 开发环境
- 4. 项目结构介绍
- 4.1 配置信息
- 5. 项目演示
前言
消息队列的本质就是阻塞队列,它的最大用途就是用来实现生产者消费者模型,从而实现解耦合以及削峰填谷。
在分布式系统中不再是单个服务器而是服务器“集群”,如果我们我们直接A服务器给B服务器发送请求,B服务器给A服务器返回响应,这样的话我们AB的耦合较大,如果A或者B服务器挂了,我们业务也就崩溃了。引入消息队列之后我们将请求和响应都通过消息队列这个中间人来传递,就降低了耦合度。
同样的,如果我们AB服务器直接进行通信,如果A服务器突然发送许多请求,我们B服务器也会收到巨多请求的影响,AB由于硬件资源限制可能都会崩溃。如果我们引入消息队列,消息队列可以将许多请求接收存储下来,B服务器依然可以按照原有节奏取请求,不会一下子接收大量请求。这也就是我们所说的削峰。 填谷,也是类似的,就算请求过少,我们的服务器依然可以从消息队列中取出挤压得请求。
在分布式系统中,跨主机之间使用生产者消费者模型就显得非常重要了。 我们通常把阻塞队列封装成⼀个独立的服务器程序,并且新增一些功能。这样的程序我们就称为 消息队列。
所以此次就仿照市面上得RabbitMQ实现一个简易的消息队列。
小小消息管家
1.项目介绍
将消息队列分成服务器模块、客户端模块、公共模块来实现。
- 服务器模块通过虚拟主机实现交换机、队列、绑定、消息等相关操作的隔离。虚拟主机主要负责对上述内容的数据进行硬盘(数据库和文件)以及内存管理、消息的三种转发方式如Direct、Fanout、Topic、为提供客户端API的实现。
- 客户端模块:实现连接管理提供建立连接、信道管理通过信道实现TCP连接的复用。在信道管理中实现客户端调用的API。
- 公共模块:约定客户端服务器的通信协议、数据传输过程中的序列化以及反序列化。
2. 需求分析
由于是实现一个生产者消费者模型,在消息队列中我们要实现的逻辑如下:我们的生产者客户端向服务器发送消息,消费者客户端向服务器订阅消息,服务器负责消息的存储和转发。
概念解读:
- 虚拟主机 (VirtualHost):是⼀个逻辑上的集合,⼀个 BrokerServer 上可以存在多个 VirtualHost。
- 交换机 (Exchange):生产者把消息先发送到 BrokerServer 的 Exchange 上。 再根据不同的规则把消息转发给不同的 Queue。
- 队列 (Queue):真正用来存储消息,每个消费者决定自己从哪个 Queue 上读取消息。
- 绑定 (Binding):Exchange 和 Queue 之间的关联关系。Exchange 和 Queue 可以理解成 “多对多” 关
系。 - 消息 (Message): 传递的内容。
2.1 API
我们的服务器提供以下API是西安消息队列的基本功能。
- 创建队列 (queueDeclare)
- 销毁队列 (queueDelete)
- 创建交换机 (exchangeDeclare)
- 销毁交换机 (exchangeDelete)
- 创建绑定 (queueBind)
- 解除绑定 (queueUnbind)
- 发布消息 (basicPublish)
- 订阅消息 (basicConsume)
- 确认消息 (basicAck)
我们的客户端提供以下API供客户使用消息队列,为了复用TCP连接,我们提供了一个Channel逻辑通道。所以在客户端我们还需要提供Channel的创建和关闭:
- 创建 Connection
- 关闭 Connection
- 创建 Channel
- 关闭 Channel
- 创建队列 (queueDeclare)
- 销毁队列 (queueDelete)
- 创建交换机 (exchangeDeclare)
- 销毁交换机 (exchangeDelete)
- 创建绑定 (queueBind)
- 解除绑定 (queueUnbind)
- 发布消息 (basicPublish)
- 订阅消息 (basicConsume)
- 确认消息 (basicAck)
2.2 消息应答
被消费者消费的消息,需要进行应答来确定我们消费者正确消费了消息。我们设置两种应答模式:
- 自动应答:消费者只要消费了消息,就算应答完毕。Broker直接删除这个消息。
- 手动应答:消费者手动调用应答接口(确认消息),Broker收到应答请求后删除这个消息。
2.3 网络通信协议设计
我们使用TCP 协议,来作为通信的底层协议。在这个基础上自定义应用层协议,实现客户端对服务器提供的功能远程调用。所以我们在协议中约定type标记调用的功能,length为消息的长度,payload为消息的具体内容。
请求和响应的格式如下:
其中 type取值如下:
• 0x1 创建 channel
• 0x2 关闭 channel
• 0x3 创建 exchange
• 0x4 销毁 exchange
• 0x5 创建 queue
• 0x6 销毁 queue
• 0x7 创建 binding
• 0x8 销毁 binding
• 0x9 发送 message
• 0xa 订阅 message
• 0xb 返回 ack
• 0xc 服务器给客户端推送的消息(被订阅的消息)
其中 payload 部分,会根据不同的 type,存在不同的格式。对于请求来说, payload 表示这次方法调用的各种参数信息,我们定义对应的类实现。
对于响应,payload 表示这次方法调用的返回值。
3. 开发环境
数据库:SQLite
开发语言:Java
技术框架:SpringBoot、SpringMVC、Mybatis
管理工具:Maven
开发工具:Intellij IDEA 2020.1.4
操作系统:Windows10
4. 项目结构介绍
- common 包中约定通信协议包括请求响应格式以及不同的payload对应的数据格式;创建自定义异常类;实现序列化反序列化;定义消费者以及处理消息调用的函数接口。
- demo 创建了一个消费者和生产者用于测试项目。
- mqclient 客户端模块,提供创建连接的工厂类;定义完整连接的内容;定义channel实现客户端api
- mqserver.core 定义了交换机、队列、消息、交换机类型、转发规则、消费消息的逻辑。
- mqserver.datacenter 定义了交换机、队列、消息、绑定的存储以及管理。
- mqserver.mapper:实现对sqlite数据库的操作。
4.1 配置信息
由于我们对于数据库的存储只涉及一小部分,所以此处我们利用sqlite进行数据管理。
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.14</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>mq</artifactId><version>0.0.1-SNAPSHOT</version><name>mq</name><description>mq</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.3.1</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><!-- https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc --><dependency><groupId>org.xerial</groupId><artifactId>sqlite-jdbc</artifactId><version>3.41.0.1</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter-test</artifactId><version>2.3.1</version><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>
application.yml: 配置数据库信息
5. 项目演示
创建一个生产者客户端向服务器发送一个消息:
package com.example.mq.demo;import com.example.mq.mqclient.Channel;
import com.example.mq.mqclient.Connection;
import com.example.mq.mqclient.ConnectionFactory;
import com.example.mq.mqserver.core.ExchangeType;import java.io.IOException;/*** 这个类用来表示一个生产者.* 通常这是一个单独的服务器程序.*/
public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者");//创建一个连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);//得到逻辑链接,这个就类似于socketConnection connection = factory.newConnection();Channel channel = connection.createChannel();// 通过逻辑连接创建交换机和队列channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("testQueue", true, false, false, null);// 创建一个消息并发送byte[] body = "hello 欢迎消费消息".getBytes();boolean ok = channel.basicPublish("testExchange", "testQueue", null, body);System.out.println("消息投递完成! ok=" + ok);Thread.sleep(500);channel.close();connection.close();}
}
创建一个消费者客户端,订阅消费消息
package com.example.mq.demo;import com.example.mq.common.Consumer;
import com.example.mq.common.MqException;
import com.example.mq.mqclient.Channel;
import com.example.mq.mqclient.Connection;
import com.example.mq.mqclient.ConnectionFactory;
import com.example.mq.mqserver.core.BasicProperties;
import com.example.mq.mqserver.core.ExchangeType;import java.io.IOException;/*** @author zq* @date 2023-08-05 19:29*//** 这个类表示一个消费者.* 通常这个类也应该是在一个独立的服务器中被执行*/
public class DemoConsumer {public static void main(String[] args) throws IOException, MqException, InterruptedException {System.out.println("启动消费者!");ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("testQueue", true, false, false, null);//订阅消息,并定义如何消费消息channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag=" + consumerTag);System.out.println("basicProperties=" + basicProperties);String bodyString = new String(body, 0, body.length);System.out.println("body=" + bodyString);System.out.println("[消费数据] 结束!");}});// 通过这个循环模拟一直等待消费完生产者生产的所有消息while (true) {Thread.sleep(500);}}
}
实现结果如下 :
通过结果我们可以看出,我们消费者成功取出订阅的队列中的消息进行消费
相关文章:

参考RabbitMQ实现一个消息队列
文章目录 前言小小消息管家1.项目介绍2. 需求分析2.1 API2.2 消息应答2.3 网络通信协议设计 3. 开发环境4. 项目结构介绍4.1 配置信息 5. 项目演示 前言 消息队列的本质就是阻塞队列,它的最大用途就是用来实现生产者消费者模型,从而实现解耦合以及削峰填…...
SpringBoot+JWT
一、maven坐标 <!-- JWT依赖 --><dependency><groupId>io.jsonwebtoken</groupId><artifactId>jjwt</artifactId><version>0.9.1</version></dependency><dependency><groupId>com.auth0</groupId>&…...
Cad二次开发EqualPoint
在 CAD 软件的二次开发中,Tolerance.Global.EqualPoint 是一个特定的属性或方法,用于表示全局的相等性公差值。这个属性或方法通常是由 CAD 软件的开发平台或 API 提供的,用于处理浮点数的相等性比较。 具体来说,Tolerance.Globa…...

20230806将ASF格式的视频转换为MP4
20230806将ASF格式的视频转换为MP4 2023/8/6 18:47 缘起,自考中山大学的《计算机网络》,考试《数据库系统原理》的时候找到视频,由于个人的原因,使用字幕更加有学习效率! 由于【重型】的PR2023占用资源较多,…...
【MySQL】——常用接口API即相关函数说明
目录 1、MySQL结构体的说明 1、MYSQL结构体 2.MYSQL_RES结构体 3. MYSQL_FIELD 2. 接口的使用步骤 3、mysql_init()——MYSQL对象初始化 4、mysql_real_connect()——数据库引擎建立连接 5. mysql_query()——查询数据库某表内容 6、mysql_real_query——执行SQL语句 …...
ts + axios + useRequest (ahooks)—— 实现请求封装
现在越来越多的项目开始ts化,我们今天就一块学习一下,关于ts的请求封装。 首先要安装两个依赖: npm i axios -S npm i ahooks -S 引入: import { useRequest } from ahooks; import axios, { AxiosRequestConfig, AxiosRespo…...

Springboot @Validated注解详细说明
在Spring Boot中,Validated注解用于验证请求参数。它可以应用在Controller类或方法上 1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId> </depen…...
STM32初学者,到底选标准库还是HAL库?
当初学者尝试学习STM32开发时,通常会面临一个关键的选择:是选择STM32的标准库,还是HAL库?这两个库各自有着优势与适用场景,本文将从多个角度分析,帮助初学者更好地选择适合自己的库。 在开始之前ÿ…...

小学生作业随机加减乘除运算计算习题答案 html源码
小学生作业随机加减乘除运算计算习题答案 html源码 这道题目提供了多种选项,包括运算符和输入的运算数范围。题目数量也可以选择。如果你选择好了选项,就可以点击出题按钮进行练习。 为了方便,题目答案可以打印出来。但是,如果隐藏了横线,就会去除等号后面的下划线。推荐使用…...

nvm下载安装配置
一、作用 nvm是node版本管理的工具,具有管理、下载、切换node版本等能力。经常不同项目需要依赖不同版本的node,此时nvm就能有效的解决node版本切换的问题。 二、nvm下载安装配置 (1)安装包地址 https://github.com/coreybutl…...
2023-08-07力扣每日一题
链接: 344. 反转字符串 题意: 如题 解: 初级算法做过的题啊-感觉这几天重复题还蛮多的 实际代码: #include<iostream> #include<vector> #include<algorithm> using namespace std; /* void reverseStri…...

uni——不规则tab切换(skew)
案例展示 案例代码 <!-- 切换栏 --> <view class"tabBoxs"><view class"tabBox"><block v-for"(item,index) in tabList" :key"index"><view class"tabItem":class"current item.id&…...

Docker安装Grafana以及Grafana应用
Doker基础 安装 1、 卸载旧的版本 sudo yum remove docker docker-client docker-client-latest docker-common docker-latest docker-latest-logrotate docker-logrotate docker-engine 2、需要的安装包 sudo yum install -y yum-utils 3、设置镜像的仓库 yum-config-m…...

OpenSource - 分布式重试平台
文章目录 概述重试方案对比设计思想流量管理平台预览场景应用强通知场景发送MQ场景回调场景异步场景 概述 在当前广泛流行的分布式系统中,确保系统数据的一致性和正确性是一项重大挑战。为了解决分布式事务问题,涌现了许多理论和业务实践,其…...
oracle稳定执行计划
二、稳定执行计划 (一)sql profile的好处 稳定执行计划 在不能修改目标sql的sql文本的情况下使目标sql语句按照指定的执行计划运行。 1、automatic类型的sql profile 本质是针对目标sql的一些额外的调整信息,这些额外的调整信息需要与原目标s…...

docker安装neo4j
参考文章: 1、Mac 本地以 docker 方式配置 neo4j_neo4j mac docker_Abandon_first的博客-CSDN博客 2、https://www.cnblogs.com/caoyusang/p/13610408.html 安装的时候,参考了以上文章。遇到了一些问题,记录下自己的安装过程: …...

第十五章 定义 HL7 的 DTL 数据转换
文章目录 第十五章 定义 HL7 的 DTL 数据转换 第十五章 定义 HL7 的 DTL 数据转换 每个接口可能需要一定数量的数据转换。创建转换时,不要使用保留的包名称。 重要提示:请勿在数据转换中手动更改 HL7 转义序列;自动处理这些。 可以使用“数…...

【笔记】移动光猫改桥接
1. 登录后台 移动光猫的超管和密码(百度的) 账号:CMCCAdmin 密码:aDm8H%MdA 浏览器访问 192.168.1.1 并登录 2. 选择连接 点击“网络”,在“连接名称”下拉框选择 INTENET_R_VID 字样的连接,并截图备…...

网络安全进阶学习第十四课——MSSQL注入
文章目录 一、MSsql数据库二、MSsql结构三、MSsql重点表1、master 数据库中的Sysdatabases 表2、Sysobjects 表3、Syscolumns 表 四、Mssql常用函数五、Mssql的报错注入六、Mssql的盲注常用以下函数进行盲注: 七、联合注入1、获取当前表的列数2、获取当前数据库名3、…...

【C语言】初阶结构体
🎈个人主页:库库的里昂 🎐CSDN新晋作者 🎉欢迎 👍点赞✍评论⭐收藏 ✨收录专栏:C语言初阶 ✨其他专栏:代码小游戏 🤝希望作者的文章能对你有所帮助,有不足的地方请在评论…...

Docker 离线安装指南
参考文章 1、确认操作系统类型及内核版本 Docker依赖于Linux内核的一些特性,不同版本的Docker对内核版本有不同要求。例如,Docker 17.06及之后的版本通常需要Linux内核3.10及以上版本,Docker17.09及更高版本对应Linux内核4.9.x及更高版本。…...
Java 语言特性(面试系列1)
一、面向对象编程 1. 封装(Encapsulation) 定义:将数据(属性)和操作数据的方法绑定在一起,通过访问控制符(private、protected、public)隐藏内部实现细节。示例: public …...
SciencePlots——绘制论文中的图片
文章目录 安装一、风格二、1 资源 安装 # 安装最新版 pip install githttps://github.com/garrettj403/SciencePlots.git# 安装稳定版 pip install SciencePlots一、风格 简单好用的深度学习论文绘图专用工具包–Science Plot 二、 1 资源 论文绘图神器来了:一行…...

【7色560页】职场可视化逻辑图高级数据分析PPT模版
7种色调职场工作汇报PPT,橙蓝、黑红、红蓝、蓝橙灰、浅蓝、浅绿、深蓝七种色调模版 【7色560页】职场可视化逻辑图高级数据分析PPT模版:职场可视化逻辑图分析PPT模版https://pan.quark.cn/s/78aeabbd92d1...

AI+无人机如何守护濒危物种?YOLOv8实现95%精准识别
【导读】 野生动物监测在理解和保护生态系统中发挥着至关重要的作用。然而,传统的野生动物观察方法往往耗时耗力、成本高昂且范围有限。无人机的出现为野生动物监测提供了有前景的替代方案,能够实现大范围覆盖并远程采集数据。尽管具备这些优势…...
解决:Android studio 编译后报错\app\src\main\cpp\CMakeLists.txt‘ to exist
现象: android studio报错: [CXX1409] D:\GitLab\xxxxx\app.cxx\Debug\3f3w4y1i\arm64-v8a\android_gradle_build.json : expected buildFiles file ‘D:\GitLab\xxxxx\app\src\main\cpp\CMakeLists.txt’ to exist 解决: 不要动CMakeLists.…...

第一篇:Liunx环境下搭建PaddlePaddle 3.0基础环境(Liunx Centos8.5安装Python3.10+pip3.10)
第一篇:Liunx环境下搭建PaddlePaddle 3.0基础环境(Liunx Centos8.5安装Python3.10pip3.10) 一:前言二:安装编译依赖二:安装Python3.10三:安装PIP3.10四:安装Paddlepaddle基础框架4.1…...

企业大模型服务合规指南:深度解析备案与登记制度
伴随AI技术的爆炸式发展,尤其是大模型(LLM)在各行各业的深度应用和整合,企业利用AI技术提升效率、创新服务的步伐不断加快。无论是像DeepSeek这样的前沿技术提供者,还是积极拥抱AI转型的传统企业,在面向公众…...

对象回调初步研究
_OBJECT_TYPE结构分析 在介绍什么是对象回调前,首先要熟悉下结构 以我们上篇线程回调介绍过的导出的PsProcessType 结构为例,用_OBJECT_TYPE这个结构来解析它,0x80处就是今天要介绍的回调链表,但是先不着急,先把目光…...
验证redis数据结构
一、功能验证 1.验证redis的数据结构(如字符串、列表、哈希、集合、有序集合等)是否按照预期工作。 2、常见的数据结构验证方法: ①字符串(string) 测试基本操作 set、get、incr、decr 验证字符串的长度和内容是否正…...