模拟实现消息队列项目(完结) -- 基于MQ的生产者消费者模型
目录
前言
1. 生产者
2. 消费者
3. 启动消息队列服务器
4. 运行效果
结语
前言
在上一章节,我们完成了消息队列的客户端部分,至此我们整个消息队列项目就构建完成了,那我们做的这个消息队列到底有什么效果,以及如何去使用我们自己的消息队列呢?那么本文,就将我们的MQ进行实战操作,写一个基于MQ的生产者消费者模型.本项目全部代码已上传Gitee,链接放在文章末尾,欢迎大家访问!

1. 生产者
我们的生产者就是一个客户端,需要将自己生产出来的消息发送到消息队列中,供消费者进行使用.
我们创建一个生产者,在服务器端创建交换机(直接),队列,然后往对应的队列进行投递消息.
1. 实例化创建连接的工厂类
2. 设置消息队列服务器的IP地址以及端口号
3. 新建一个连接,创建Channel,交换机,队列
4. 新建一个消息转换成字节文件进行发送,此时给线程一个休眠的时间,确保已经发送到消息队列服务器
5. 关闭通道,关闭连接
package com.example.demo.demo;import com.example.demo.mqclient.Channel;
import com.example.demo.mqclient.Connection;
import com.example.demo.mqclient.ConnectionFactory;
import com.example.demo.mqserver.core.ExchangeType;import java.io.IOException;/*** Created with IntelliJ IDEA.* Description:生产者 通常是一个单独的服务器程序* User: YAO* Date: 2023-08-03* Time: 16:06*/
public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者");ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 创建交换机和队列channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("testQueue", true, false, false, null);// 创建一个消息并发送byte[] body = "hello".getBytes();boolean ok = channel.basicPublish("testExchange", "testQueue", null, body);System.out.println("消息投递完成! ok=" + ok);Thread.sleep(500);channel.close();connection.close();}
}
2. 消费者
消费者也是客户端,所做的前期工作是一样的,只不过是发送的请求不同.
1. 消费者需要进行订阅消息,接收到消息之后,执行回调进行消费消息.
2. 消费者需要循环等待消息队列的响应,等待消费.
package com.example.demo.demo;import com.example.demo.common.Consumer;
import com.example.demo.common.MqException;
import com.example.demo.mqclient.Channel;
import com.example.demo.mqclient.Connection;
import com.example.demo.mqclient.ConnectionFactory;
import com.example.demo.mqserver.core.BasicProperties;
import com.example.demo.mqserver.core.ExchangeType;import java.io.IOException;
/*** Created with IntelliJ IDEA.* Description:消费者 通常是一个单独的服务器程序* User: YAO* Date: 2023-08-03* Time: 16:07*/
public class DemoConsumer {public static void main(String[] args) throws MqException, InterruptedException, IOException {System.out.println("启动消费者!");ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("testQueue", true, false, false, null);channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag=" + consumerTag);System.out.println("basicProperties=" + basicProperties);String bodyString = new String(body, 0, body.length);System.out.println("body=" + bodyString);System.out.println("[消费数据] 结束!");}});// 由于消费者也不知道生产者要生产多少, 就在这里通过这个循环模拟一直等待消费.while (true) {Thread.sleep(500);}}
}
3. 启动消息队列服务器
在Spring Boot 项目的启动类中,实例化Broker Server,传入端口号,进行启动服务器.
package com.example.demo;import com.example.demo.mqserver.BrokerServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;import java.io.IOException;@SpringBootApplication
public class DemoApplication {public static ConfigurableApplicationContext context;public static void main(String[] args) throws IOException {context = SpringApplication.run(DemoApplication.class, args);BrokerServer brokerServer = new BrokerServer(9090);brokerServer.start();}}
4. 运行效果
1. 服务器启动:

2. 此时如果再重启服务器,会提示数据库已经存在,就会将数据恢复到内存

3. 启动生产者进行投递消息

上述就是按照我们自定义的应用层协议进行发送请求.
我们再来看服务器这边的日志:

4. 启动消费者进行消费消息

我们再来看服务器这边日志

结语
以上就是一个简单的Demo,实现了基于MQ的生产者消费者模型.其他的功能,大家可以在做完这个项目之后自行进行测试.至此这个消息队列的项目就全部完结了,内容还是很多的,希望可以通过这个系列能够帮助到大家去了解消息队列的实现原理.也希望大家能够有所收获,那就到这里吧.接下来就要开始新的项目了(实现论坛系统),又是一个挑战,我们一起加油!❤️
完整的项目代码已上传Gitee,欢迎大家访问.👇👇👇
模拟实现消息队列
https://gitee.com/yao-fa/advanced-java-ee/tree/master/My-mq
相关文章:
模拟实现消息队列项目(完结) -- 基于MQ的生产者消费者模型
目录 前言 1. 生产者 2. 消费者 3. 启动消息队列服务器 4. 运行效果 结语 前言 在上一章节,我们完成了消息队列的客户端部分,至此我们整个消息队列项目就构建完成了,那我们做的这个消息队列到底有什么效果,以及如何去使用我们自己的消息队列呢?那么本文,就将我们的MQ进行实战操…...
专业商城财务一体化-线上商城+进销存管理软件,批发零售全行业免费更新
订货流程繁琐?订单处理效率低?小程序商城与进销存系统不打通?数据需要手动输入同步?财务与的结算对账需要大量手工处理?零售批发从业者,如何你也有以上烦恼,可以看看进销存小程序订货商城&#…...
深度思考mysql面经
推荐 1 索引下推 Mysql性能优化:什么是索引下推? 1.1 定义 索引下推(Index Condition Pushdown,简称 ICP)是一种数据库优化技术。在传统的数据库查询中,数据库首先使用索引检索来找到符合索引条件的行&…...
2023-08-09力扣每日一题
链接: 1281. 整数的各位积和之差 题意: 十进制每一位的积减去每一位的和 解: 十进制位处理 实际代码: #include<iostream> using namespace std; int subtractProductAndSum(int n) {int t11,t20;while(n){t1*n%10;t…...
[23] Instruct 3D-to-3D: Text Instruction Guided 3D-to-3D conversion
本文提出一种3D-to-3D转换方法:Instruct 3D-to-3D;借助预训练的Image-to-Image扩散模型,本文方法可以使各个视角图片的似然最大;本文方法显式地将source 3D场景作为condition,可以有效提升3D连续性和可控性。同时&…...
设计模式行为型——访问者模式
目录 访问者模式的定义 访问者模式的实现 访问者模式角色 访问者模式类图 访问者模式举例 访问者模式代码实现 访问者模式的特点 优点 缺点 使用场景 注意事项 实际应用 访问者模式的定义 访问者模式(Visitor Pattern)属于行为型设计模式&am…...
vue3官网文档学习、复习笔记(快速上手)
目录 2.Attribute 绑定(v-bind) 3.事件监听(v-on) 4.表单绑定(v-model) 5.条件渲染(v-if) 6.列表渲染(v-for) all.value all.value.filter(…...
0基础学习VR全景平台篇 第81篇:全景相机-临云镜如何直播推流
临云镜全景相机是阿里巴巴定制全景设备,实现空间三维信息的快速采集,与阿里云三维空间重建平台搭配,帮助品牌商与平台以较低的成本完成空间的快速采集,并支持对室内/室外空间的三维全景展示及空间漫游,同时支持VR浏览、…...
分数线划定
题目描述 查看题目信息 世博会志愿者的选拔工作正在A 市如火如荼的进行。为了选拔最合适的人才,A 市对所有报名的选手进行了笔试,笔试分数达到面试分数线的选手方可进入面试。 面试分数线根据计划录取人数的150%划定,即如果计划录取m名志愿…...
考研C语言进阶题库——更新26-30题
目录 26.一个正整数,如果等于组成它的各个数字的阶数之和,该整数称为阶乘合数,例如1451阶加四阶加五阶,则145是一个三位阶乘合数,输入一个数,问共有多少个阶乘合数?(十万之内) 27.与2相关的数…...
用C语言实现定积分计算(包括无穷积分/可自定义精度)
关于严谨性的声明: 在用C语言进行定积分的计算之前,我需要声明以下几点: 一、我们所进行定积分计算的函数都是应当是黎曼可积的,这保证了我们即使均匀地分割区间也保证了积分的收敛性。 二、我们同时还应该认识到,鉴…...
使用Presto、Trino数据库时提示“The datetime zone id ‘GMT+08:00‘ is not recognised”
出现这个问题的原因是:Presto、Trino的驱动使用了joda这个库来处理时区的问题。但这个库的编写人似乎对java zone的格式没有太多经验。先看一下出错的代码: com.facebook.presto.jdbc.internal.joda.time.DateTimeZone#forID 根据String类型的zoneId转成…...
C# BeginInvoke 加 EndInvoke实现异步操作
1、定义一个委托 delegate long MyDel(int first, int second); 2、 需异步操作的函数 static int sum(int x,int y) {Console.WriteLine("InSide Sum1");Thread.Sleep(1000);Console.WriteLine("InSide Sum2");return x y;} 3、回调方法…...
“华为杯”研究生数学建模竞赛2015年-【华为杯】B题:数据的多流形结构分析(续)
目录 4.2.2 算法复杂度分析 4.2.3 参数影响 4.2.4 问题 3(a)求解 4.3 问题 3(b) 4.3.1 加权稀疏子空间聚类</...
R语言APSIM模型高级应用及批量模拟
随着数字农业和智慧农业的发展,基于过程的农业生产系统模型在模拟作物对气候变化的响应与适应、农田管理优化、作物品种和株型筛选、农田固碳和温室气体排放等领域扮演着越来越重要的作用。APSIM (Agricultural Production Systems sIMulator)模型是世界知名的作物生…...
【硬件设计】模拟电子基础三--集成运算放大电路
模拟电子基础三--集成运算放大电路 一、集成运算放大器1.1 定义、组成与性能1.2 电流源电路1.3 差动放大电路1.4 理想运算放大器 二、集成运算放大器的应用2.1 反向比例运算电路2.2 同向比例运算电路2.3 反向加法运算电路2.4 反向减法运算电路2.5 积分运算电路2.6 微分运算电路…...
JavaWeb(11)——前端综合案例5(小黑记事本)
一、实例需求 ⌛ 功能需求: ① 列表渲染 ② 删除功能 ③ 添加功能 ④ 底部统计 和 清空任务 二、代码实现 ☕ <!DOCTYPE html> <html lang"en"> <head> <meta charset"UTF-8" /> <meta http-equiv"X-UA-Compa…...
在使用TensorFlow的时候内部报错:内部某个方法或属性不存在
看到TensorFlow内部封装的方法报错的时候,我的第一反应是版本不匹配,立马去搜了对应版本,按照网上给的TensorFlow 2.2.0keras 2.3.1 python 3.7,反反复复安装、卸载、升级、降低版本了很多回还是八行,就在心态快要爆爆…...
dubbo之高可用
负载均衡 概述 负载均衡是指在集群中,将多个数据请求分散到不同的单元上执行,主要是为了提高系统的容错能力和对数据的处理能力。 Dubbo 负载均衡机制是决定一次服务调用使用哪个提供者的服务。 策略 在Dubbo中提供了7中负载均衡策略,默…...
gitee代码扫描js代码,降低复杂度,减少if-else判断的处理方法
把if-else换成如下形式 页面上的代码 <el-button id"btnSave" type"primary" :loading"loadingEdit" click"saveEdit(put,baseSet)"> {{ $t("formLabel.save") }} </el-button> methods代码: // 编…...
Lumafly:让空洞骑士模组管理变得像呼吸一样简单
Lumafly:让空洞骑士模组管理变得像呼吸一样简单 【免费下载链接】Lumafly A cross platform mod manager for Hollow Knight written in Avalonia. 项目地址: https://gitcode.com/gh_mirrors/lu/Lumafly 还在为空洞骑士模组安装的繁琐流程而烦恼吗…...
从自然语言到图形化程序:VI Generator如何重塑LabVIEW开发流程
1. VI Generator:当LabVIEW遇上大模型 第一次听说VI Generator时,我正在调试一个自动化测试平台。客户临时要求增加数据滤波功能,这意味着我又要重复拖拽那些熟悉的While循环和数组操作节点。就在我机械地复制粘贴代码时,同事发来…...
个人 DIY 传动套件开发计划
最近刚忙完电控部分相关的学习工作,终于可以推进自己的 DIY 项目了!之前已经完成了多款减速器的 3D 打印原型验证,涵盖偏心活齿、凸轮活齿、摆线减速器等经典结构。接下来的核心计划,是在现有传动结构的基础上,完成完整…...
抖音批量下载神器:5分钟搞定无水印视频批量下载
抖音批量下载神器:5分钟搞定无水印视频批量下载 【免费下载链接】douyin-downloader A practical Douyin downloader for both single-item and profile batch downloads, with progress display, retries, SQLite deduplication, and browser fallback support. 抖…...
时频分析在隔振与运动控制中的联合应用
1. 时频分析在隔振与运动控制中的核心价值 在精密制造和科研实验中,隔振台和运动台的联合控制是个经典难题。传统方法就像用两种不同的语言描述同一个现象——隔振台习惯用频域的"振动频谱"说话,运动台则偏爱时域的"误差曲线"表达。…...
跨平台图像采集封装头文件: 一行代码切换 Basler / 海康 / Baumer工业相机?
一行代码切换 Basler / 海康 / USB 摄像头? 开源:跨平台图像采集统一头文件来了! “项目要支持三家相机,难道写三套采集逻辑?” “Windows 上跑得好好的,一到 Linux 就崩?” 在工业视觉、机器人…...
从2D照片到3D场景的终极转换:深度实战fSpy相机匹配工具
从2D照片到3D场景的终极转换:深度实战fSpy相机匹配工具 【免费下载链接】fSpy A cross platform app for quick and easy still image camera matching 项目地址: https://gitcode.com/gh_mirrors/fs/fSpy 你是否曾面对一张建筑照片,想要在3D软件…...
Python爬虫新手必看:Image-Downloader搭配ChromeDriver的完整配置指南(附常见报错解决)
Python爬虫实战:Image-Downloader与ChromeDriver的深度配置手册 当你第一次尝试用Python爬取网页图片时,是否曾被各种环境配置问题搞得焦头烂额?作为过来人,我完全理解那种看着满屏报错信息却无从下手的挫败感。本文将带你深入理解…...
netsh interface portproxy实战:Windows本地端口转发与虚拟IP配置全解析
1. 为什么需要Windows本地端口转发? 很多开发者都遇到过这样的场景:你在本地机器上跑了一个Web服务,监听的是127.0.0.1:8080,这时候同一局域网的其他设备想要访问这个服务,直接输入你的IP地址加端口是访问不了的。这是…...
Windsurf的Write和Chat模式怎么选?一篇讲清点数消耗、模型降级和你的真实开发场景
Windsurf编程助手:Write与Chat模式深度选择指南 1. 理解两种模式的核心差异 Windsurf作为新一代AI编程助手,其Write和Chat模式的设计初衷完全不同。Write模式更像是你的代码自动生成器,它能根据上下文快速产出完整代码块;而Chat模…...
