当前位置: 首页 > news >正文

参考RabbitMQ实现一个消息队列

文章目录

  • 前言
  • 小小消息管家
    • 1.项目介绍
    • 2. 需求分析
      • 2.1 API
      • 2.2 消息应答
      • 2.3 网络通信协议设计
    • 3. 开发环境
    • 4. 项目结构介绍
      • 4.1 配置信息
    • 5. 项目演示

前言

消息队列的本质就是阻塞队列,它的最大用途就是用来实现生产者消费者模型,从而实现解耦合以及削峰填谷

在分布式系统中不再是单个服务器而是服务器“集群”,如果我们我们直接A服务器给B服务器发送请求,B服务器给A服务器返回响应,这样的话我们AB的耦合较大,如果A或者B服务器挂了,我们业务也就崩溃了。引入消息队列之后我们将请求和响应都通过消息队列这个中间人来传递,就降低了耦合度。

同样的,如果我们AB服务器直接进行通信,如果A服务器突然发送许多请求,我们B服务器也会收到巨多请求的影响,AB由于硬件资源限制可能都会崩溃。如果我们引入消息队列,消息队列可以将许多请求接收存储下来,B服务器依然可以按照原有节奏取请求,不会一下子接收大量请求。这也就是我们所说的削峰填谷,也是类似的,就算请求过少,我们的服务器依然可以从消息队列中取出挤压得请求。

在分布式系统中,跨主机之间使用生产者消费者模型就显得非常重要了。 我们通常把阻塞队列封装成⼀个独立的服务器程序,并且新增一些功能。这样的程序我们就称为 消息队列

所以此次就仿照市面上得RabbitMQ实现一个简易的消息队列。

小小消息管家

1.项目介绍

将消息队列分成服务器模块、客户端模块、公共模块来实现。

  1. 服务器模块通过虚拟主机实现交换机、队列、绑定、消息等相关操作的隔离。虚拟主机主要负责对上述内容的数据进行硬盘(数据库和文件)以及内存管理、消息的三种转发方式如Direct、Fanout、Topic、为提供客户端API的实现。
  2. 客户端模块:实现连接管理提供建立连接、信道管理通过信道实现TCP连接的复用。在信道管理中实现客户端调用的API。
  3. 公共模块:约定客户端服务器的通信协议、数据传输过程中的序列化以及反序列化。

在这里插入图片描述

2. 需求分析

由于是实现一个生产者消费者模型,在消息队列中我们要实现的逻辑如下:我们的生产者客户端向服务器发送消息,消费者客户端向服务器订阅消息,服务器负责消息的存储和转发。

在这里插入图片描述

概念解读:

  • 虚拟主机 (VirtualHost):是⼀个逻辑上的集合,⼀个 BrokerServer 上可以存在多个 VirtualHost。
  • 交换机 (Exchange):生产者把消息先发送到 BrokerServer 的 Exchange 上。 再根据不同的规则把消息转发给不同的 Queue。
  • 队列 (Queue):真正用来存储消息,每个消费者决定自己从哪个 Queue 上读取消息。
  • 绑定 (Binding):Exchange 和 Queue 之间的关联关系。Exchange 和 Queue 可以理解成 “多对多” 关
    系。
  • 消息 (Message): 传递的内容。

2.1 API

我们的服务器提供以下API是西安消息队列的基本功能。

  1. 创建队列 (queueDeclare)
  2. 销毁队列 (queueDelete)
  3. 创建交换机 (exchangeDeclare)
  4. 销毁交换机 (exchangeDelete)
  5. 创建绑定 (queueBind)
  6. 解除绑定 (queueUnbind)
  7. 发布消息 (basicPublish)
  8. 订阅消息 (basicConsume)
  9. 确认消息 (basicAck)

我们的客户端提供以下API供客户使用消息队列,为了复用TCP连接,我们提供了一个Channel逻辑通道。所以在客户端我们还需要提供Channel的创建和关闭:

  1. 创建 Connection
  2. 关闭 Connection
  3. 创建 Channel
  4. 关闭 Channel
  5. 创建队列 (queueDeclare)
  6. 销毁队列 (queueDelete)
  7. 创建交换机 (exchangeDeclare)
  8. 销毁交换机 (exchangeDelete)
  9. 创建绑定 (queueBind)
  10. 解除绑定 (queueUnbind)
  11. 发布消息 (basicPublish)
  12. 订阅消息 (basicConsume)
  13. 确认消息 (basicAck)

2.2 消息应答

被消费者消费的消息,需要进行应答来确定我们消费者正确消费了消息。我们设置两种应答模式:

  1. 自动应答:消费者只要消费了消息,就算应答完毕。Broker直接删除这个消息。
  2. 手动应答:消费者手动调用应答接口(确认消息),Broker收到应答请求后删除这个消息。

2.3 网络通信协议设计

我们使用TCP 协议,来作为通信的底层协议。在这个基础上自定义应用层协议,实现客户端对服务器提供的功能远程调用。所以我们在协议中约定type标记调用的功能,length为消息的长度,payload为消息的具体内容。

请求和响应的格式如下:
在这里插入图片描述

其中 type取值如下:
• 0x1 创建 channel
• 0x2 关闭 channel
• 0x3 创建 exchange
• 0x4 销毁 exchange
• 0x5 创建 queue
• 0x6 销毁 queue
• 0x7 创建 binding
• 0x8 销毁 binding
• 0x9 发送 message
• 0xa 订阅 message
• 0xb 返回 ack
• 0xc 服务器给客户端推送的消息(被订阅的消息)

其中 payload 部分,会根据不同的 type,存在不同的格式。对于请求来说, payload 表示这次方法调用的各种参数信息,我们定义对应的类实现。
对于响应,payload 表示这次方法调用的返回值。

3. 开发环境

数据库:SQLite
开发语言:Java
技术框架:SpringBoot、SpringMVC、Mybatis
管理工具:Maven
开发工具:Intellij IDEA 2020.1.4
操作系统:Windows10

4. 项目结构介绍

在这里插入图片描述

  • common 包中约定通信协议包括请求响应格式以及不同的payload对应的数据格式;创建自定义异常类;实现序列化反序列化;定义消费者以及处理消息调用的函数接口。
  • demo 创建了一个消费者和生产者用于测试项目。
  • mqclient 客户端模块,提供创建连接的工厂类;定义完整连接的内容;定义channel实现客户端api
  • mqserver.core 定义了交换机、队列、消息、交换机类型、转发规则、消费消息的逻辑。
  • mqserver.datacenter 定义了交换机、队列、消息、绑定的存储以及管理。
  • mqserver.mapper:实现对sqlite数据库的操作。

4.1 配置信息

由于我们对于数据库的存储只涉及一小部分,所以此处我们利用sqlite进行数据管理。

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.14</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>mq</artifactId><version>0.0.1-SNAPSHOT</version><name>mq</name><description>mq</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.3.1</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><!-- https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc --><dependency><groupId>org.xerial</groupId><artifactId>sqlite-jdbc</artifactId><version>3.41.0.1</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter-test</artifactId><version>2.3.1</version><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

application.yml: 配置数据库信息
在这里插入图片描述

5. 项目演示

创建一个生产者客户端向服务器发送一个消息:

package com.example.mq.demo;import com.example.mq.mqclient.Channel;
import com.example.mq.mqclient.Connection;
import com.example.mq.mqclient.ConnectionFactory;
import com.example.mq.mqserver.core.ExchangeType;import java.io.IOException;/*** 这个类用来表示一个生产者.*  通常这是一个单独的服务器程序.*/
public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者");//创建一个连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);//得到逻辑链接,这个就类似于socketConnection connection = factory.newConnection();Channel channel = connection.createChannel();// 通过逻辑连接创建交换机和队列channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("testQueue", true, false, false, null);// 创建一个消息并发送byte[] body = "hello 欢迎消费消息".getBytes();boolean ok = channel.basicPublish("testExchange", "testQueue", null, body);System.out.println("消息投递完成! ok=" + ok);Thread.sleep(500);channel.close();connection.close();}
}

创建一个消费者客户端,订阅消费消息

package com.example.mq.demo;import com.example.mq.common.Consumer;
import com.example.mq.common.MqException;
import com.example.mq.mqclient.Channel;
import com.example.mq.mqclient.Connection;
import com.example.mq.mqclient.ConnectionFactory;
import com.example.mq.mqserver.core.BasicProperties;
import com.example.mq.mqserver.core.ExchangeType;import java.io.IOException;/*** @author zq* @date 2023-08-05 19:29*//** 这个类表示一个消费者.* 通常这个类也应该是在一个独立的服务器中被执行*/
public class DemoConsumer {public static void main(String[] args) throws IOException, MqException, InterruptedException {System.out.println("启动消费者!");ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("testQueue", true, false, false, null);//订阅消息,并定义如何消费消息channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag=" + consumerTag);System.out.println("basicProperties=" + basicProperties);String bodyString = new String(body, 0, body.length);System.out.println("body=" + bodyString);System.out.println("[消费数据] 结束!");}});// 通过这个循环模拟一直等待消费完生产者生产的所有消息while (true) {Thread.sleep(500);}}
}

实现结果如下 :

在这里插入图片描述
在这里插入图片描述

通过结果我们可以看出,我们消费者成功取出订阅的队列中的消息进行消费

相关文章:

参考RabbitMQ实现一个消息队列

文章目录 前言小小消息管家1.项目介绍2. 需求分析2.1 API2.2 消息应答2.3 网络通信协议设计 3. 开发环境4. 项目结构介绍4.1 配置信息 5. 项目演示 前言 消息队列的本质就是阻塞队列&#xff0c;它的最大用途就是用来实现生产者消费者模型&#xff0c;从而实现解耦合以及削峰填…...

SpringBoot+JWT

一、maven坐标 <!-- JWT依赖 --><dependency><groupId>io.jsonwebtoken</groupId><artifactId>jjwt</artifactId><version>0.9.1</version></dependency><dependency><groupId>com.auth0</groupId>&…...

Cad二次开发EqualPoint

在 CAD 软件的二次开发中&#xff0c;Tolerance.Global.EqualPoint 是一个特定的属性或方法&#xff0c;用于表示全局的相等性公差值。这个属性或方法通常是由 CAD 软件的开发平台或 API 提供的&#xff0c;用于处理浮点数的相等性比较。 具体来说&#xff0c;Tolerance.Globa…...

20230806将ASF格式的视频转换为MP4

20230806将ASF格式的视频转换为MP4 2023/8/6 18:47 缘起&#xff0c;自考中山大学的《计算机网络》&#xff0c;考试《数据库系统原理》的时候找到视频&#xff0c;由于个人的原因&#xff0c;使用字幕更加有学习效率&#xff01; 由于【重型】的PR2023占用资源较多&#xff0c…...

【MySQL】——常用接口API即相关函数说明

目录 1、MySQL结构体的说明 1、MYSQL结构体 2.MYSQL_RES结构体 3. MYSQL_FIELD 2. 接口的使用步骤 3、mysql_init()——MYSQL对象初始化 4、mysql_real_connect()——数据库引擎建立连接 5. mysql_query()——查询数据库某表内容 6、mysql_real_query——执行SQL语句 …...

ts + axios + useRequest (ahooks)—— 实现请求封装

现在越来越多的项目开始ts化&#xff0c;我们今天就一块学习一下&#xff0c;关于ts的请求封装。 首先要安装两个依赖&#xff1a; npm i axios -S npm i ahooks -S 引入&#xff1a; import { useRequest } from ahooks; import axios, { AxiosRequestConfig, AxiosRespo…...

Springboot @Validated注解详细说明

在Spring Boot中&#xff0c;Validated注解用于验证请求参数。它可以应用在Controller类或方法上 1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-validation</artifactId> </depen…...

STM32初学者,到底选标准库还是HAL库?

当初学者尝试学习STM32开发时&#xff0c;通常会面临一个关键的选择&#xff1a;是选择STM32的标准库&#xff0c;还是HAL库&#xff1f;这两个库各自有着优势与适用场景&#xff0c;本文将从多个角度分析&#xff0c;帮助初学者更好地选择适合自己的库。 在开始之前&#xff…...

小学生作业随机加减乘除运算计算习题答案 html源码

小学生作业随机加减乘除运算计算习题答案 html源码 这道题目提供了多种选项,包括运算符和输入的运算数范围。题目数量也可以选择。如果你选择好了选项,就可以点击出题按钮进行练习。 为了方便,题目答案可以打印出来。但是,如果隐藏了横线,就会去除等号后面的下划线。推荐使用…...

nvm下载安装配置

一、作用 nvm是node版本管理的工具&#xff0c;具有管理、下载、切换node版本等能力。经常不同项目需要依赖不同版本的node&#xff0c;此时nvm就能有效的解决node版本切换的问题。 二、nvm下载安装配置 &#xff08;1&#xff09;安装包地址 https://github.com/coreybutl…...

2023-08-07力扣每日一题

链接&#xff1a; 344. 反转字符串 题意&#xff1a; 如题 解&#xff1a; 初级算法做过的题啊-感觉这几天重复题还蛮多的 实际代码&#xff1a; #include<iostream> #include<vector> #include<algorithm> using namespace std; /* void reverseStri…...

uni——不规则tab切换(skew)

案例展示 案例代码 <!-- 切换栏 --> <view class"tabBoxs"><view class"tabBox"><block v-for"(item,index) in tabList" :key"index"><view class"tabItem":class"current item.id&…...

Docker安装Grafana以及Grafana应用

Doker基础 安装 1、 卸载旧的版本 sudo yum remove docker docker-client docker-client-latest docker-common docker-latest docker-latest-logrotate docker-logrotate docker-engine 2、需要的安装包 sudo yum install -y yum-utils 3、设置镜像的仓库 yum-config-m…...

OpenSource - 分布式重试平台

文章目录 概述重试方案对比设计思想流量管理平台预览场景应用强通知场景发送MQ场景回调场景异步场景 概述 在当前广泛流行的分布式系统中&#xff0c;确保系统数据的一致性和正确性是一项重大挑战。为了解决分布式事务问题&#xff0c;涌现了许多理论和业务实践&#xff0c;其…...

oracle稳定执行计划

二、稳定执行计划 &#xff08;一&#xff09;sql profile的好处 稳定执行计划 在不能修改目标sql的sql文本的情况下使目标sql语句按照指定的执行计划运行。 1、automatic类型的sql profile 本质是针对目标sql的一些额外的调整信息&#xff0c;这些额外的调整信息需要与原目标s…...

docker安装neo4j

参考文章&#xff1a; 1、Mac 本地以 docker 方式配置 neo4j_neo4j mac docker_Abandon_first的博客-CSDN博客 2、https://www.cnblogs.com/caoyusang/p/13610408.html 安装的时候&#xff0c;参考了以上文章。遇到了一些问题&#xff0c;记录下自己的安装过程&#xff1a; …...

第十五章 定义 HL7 的 DTL 数据转换

文章目录 第十五章 定义 HL7 的 DTL 数据转换 第十五章 定义 HL7 的 DTL 数据转换 每个接口可能需要一定数量的数据转换。创建转换时&#xff0c;不要使用保留的包名称。 重要提示&#xff1a;请勿在数据转换中手动更改 HL7 转义序列&#xff1b;自动处理这些。 可以使用“数…...

【笔记】移动光猫改桥接

1. 登录后台 移动光猫的超管和密码&#xff08;百度的&#xff09; 账号&#xff1a;CMCCAdmin 密码&#xff1a;aDm8H%MdA 浏览器访问 192.168.1.1 并登录 2. 选择连接 点击“网络”&#xff0c;在“连接名称”下拉框选择 INTENET_R_VID 字样的连接&#xff0c;并截图备…...

网络安全进阶学习第十四课——MSSQL注入

文章目录 一、MSsql数据库二、MSsql结构三、MSsql重点表1、master 数据库中的Sysdatabases 表2、Sysobjects 表3、Syscolumns 表 四、Mssql常用函数五、Mssql的报错注入六、Mssql的盲注常用以下函数进行盲注&#xff1a; 七、联合注入1、获取当前表的列数2、获取当前数据库名3、…...

【C语言】初阶结构体

&#x1f388;个人主页&#xff1a;库库的里昂 &#x1f390;CSDN新晋作者 &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏 ✨收录专栏&#xff1a;C语言初阶 ✨其他专栏&#xff1a;代码小游戏 &#x1f91d;希望作者的文章能对你有所帮助&#xff0c;有不足的地方请在评论…...

AMD Ryzen SDT调试工具:突破性实战指南,让你的处理器性能飙升200%

AMD Ryzen SDT调试工具&#xff1a;突破性实战指南&#xff0c;让你的处理器性能飙升200% 【免费下载链接】SMUDebugTool A dedicated tool to help write/read various parameters of Ryzen-based systems, such as manual overclock, SMU, PCI, CPUID, MSR and Power Table. …...

AI赋能编辑器:借助快马为Notepad++理念添加智能编程助手

今天想和大家分享一个有趣的实践&#xff1a;如何为传统代码编辑器&#xff08;比如Notepad&#xff09;注入AI能力。虽然Notepad本身轻量高效&#xff0c;但缺乏现代智能辅助功能。通过结合InsCode(快马)平台的AI能力&#xff0c;我们可以轻松实现智能补全、错误检查和代码优化…...

Sulpho-Methyltetrazine-NHS ester,磺化甲基四嗪-琥珀酰亚胺酯的结构特点与功能

Sulpho-Methyltetrazine-NHS ester 是一种结合了磺酸基团、甲基四嗪和 NHS 酯三大功能模块的化学试剂&#xff0c;在生物化学和药物研发等领域具有广泛应用。以下是对其详细介绍&#xff1a;一、基本信息英文名称&#xff1a;Sulpho-Methyltetrazine-NHS ester&#xff08;或 S…...

实战应用:基于快马平台开发排序算法性能对比分析工具

今天想和大家分享一个特别实用的工具开发经历——用InsCode(快马)平台快速搭建了一个排序算法性能对比分析工具。这个项目不仅帮我巩固了算法知识&#xff0c;还意外发现了很多实际应用中的细节问题&#xff0c;特别适合用来理解不同排序算法的实战表现。 1. 为什么需要这个工…...

iptables实战指南:从链表关系到规则配置的完整解析

1. iptables基础概念与核心组件 第一次接触iptables时&#xff0c;我盯着那些复杂的规则配置看了整整一个下午。后来才发现&#xff0c;理解iptables的关键在于掌握它的"四表五链"架构。简单来说&#xff0c;iptables就像是一个多层安检系统&#xff0c;数据包要经过…...

Pixel Fashion Atelier快速上手:非对称RPG菜单布局与像素按键交互详解

Pixel Fashion Atelier快速上手&#xff1a;非对称RPG菜单布局与像素按键交互详解 1. 项目概览 Pixel Fashion Atelier是一款基于Stable Diffusion与Anything-v5的图像生成工作站&#xff0c;它彻底改变了传统AI工具的界面设计理念。这款工具将复古日系RPG游戏的"明亮城…...

WSL2下USB串口设备‘失踪’?手把手教你找回/dev/ttyUSB0(以Quectel模块为例)

WSL2下USB串口设备消失的终极解决方案&#xff1a;从原理到实战 最近在WSL2环境下调试Quectel模块时&#xff0c;发现一个奇怪现象&#xff1a;lsusb明明能识别设备&#xff0c;但/dev/ttyUSB0却神秘失踪。这让我想起去年调试树莓派时遇到的类似问题&#xff0c;但WSL2的环境特…...

ContextMenuManager:让Windows交互回归高效本质

ContextMenuManager&#xff1a;让Windows交互回归高效本质 【免费下载链接】ContextMenuManager &#x1f5b1;️ 纯粹的Windows右键菜单管理程序 项目地址: https://gitcode.com/gh_mirrors/co/ContextMenuManager 当你在Windows系统中右键点击文件时&#xff0c;是否…...

S2-Pro+C语言教学系统:代码逻辑讲解与典型错误自动纠正

S2-ProC语言教学系统&#xff1a;代码逻辑讲解与典型错误自动纠正 1. 智能编程助教初体验 第一次看到S2-Pro在C语言教学中的应用效果时&#xff0c;确实让人眼前一亮。想象一下&#xff0c;当学生提交一段指针运算代码后&#xff0c;系统不仅能指出错误&#xff0c;还能像经验…...

告别“差不多就行”:用Cascade R-CNN解决目标检测中那些“似对非对”的边界框

从边界框“模糊地带”到工业级精度&#xff1a;Cascade R-CNN实战全解析 当你在自动驾驶系统中看到车辆识别框与真实车身存在5个像素的偏移&#xff0c;或在工业质检场景中某个关键缺陷的检测框刚好漏掉了1毫米的裂纹区域&#xff0c;这些“看似正确实则不准”的预测结果&#…...