当前位置: 首页 > news >正文

模拟实现消息队列项目(完结) -- 基于MQ的生产者消费者模型

目录

前言

1. 生产者

2. 消费者

3. 启动消息队列服务器

4. 运行效果

 结语


前言

        在上一章节,我们完成了消息队列的客户端部分,至此我们整个消息队列项目就构建完成了,那我们做的这个消息队列到底有什么效果,以及如何去使用我们自己的消息队列呢?那么本文,就将我们的MQ进行实战操作,写一个基于MQ的生产者消费者模型.本项目全部代码已上传Gitee,链接放在文章末尾,欢迎大家访问!


1. 生产者

我们的生产者就是一个客户端,需要将自己生产出来的消息发送到消息队列中,供消费者进行使用.

我们创建一个生产者,在服务器端创建交换机(直接),队列,然后往对应的队列进行投递消息.

1. 实例化创建连接的工厂类

2. 设置消息队列服务器的IP地址以及端口号

3. 新建一个连接,创建Channel,交换机,队列

4. 新建一个消息转换成字节文件进行发送,此时给线程一个休眠的时间,确保已经发送到消息队列服务器

5. 关闭通道,关闭连接

package com.example.demo.demo;import com.example.demo.mqclient.Channel;
import com.example.demo.mqclient.Connection;
import com.example.demo.mqclient.ConnectionFactory;
import com.example.demo.mqserver.core.ExchangeType;import java.io.IOException;/*** Created with IntelliJ IDEA.* Description:生产者  通常是一个单独的服务器程序* User: YAO* Date: 2023-08-03* Time: 16:06*/
public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者");ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 创建交换机和队列channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("testQueue", true, false, false, null);// 创建一个消息并发送byte[] body = "hello".getBytes();boolean ok = channel.basicPublish("testExchange", "testQueue", null, body);System.out.println("消息投递完成! ok=" + ok);Thread.sleep(500);channel.close();connection.close();}
}

2. 消费者

消费者也是客户端,所做的前期工作是一样的,只不过是发送的请求不同.

1. 消费者需要进行订阅消息,接收到消息之后,执行回调进行消费消息.

2. 消费者需要循环等待消息队列的响应,等待消费.

package com.example.demo.demo;import com.example.demo.common.Consumer;
import com.example.demo.common.MqException;
import com.example.demo.mqclient.Channel;
import com.example.demo.mqclient.Connection;
import com.example.demo.mqclient.ConnectionFactory;
import com.example.demo.mqserver.core.BasicProperties;
import com.example.demo.mqserver.core.ExchangeType;import java.io.IOException;
/*** Created with IntelliJ IDEA.* Description:消费者  通常是一个单独的服务器程序* User: YAO* Date: 2023-08-03* Time: 16:07*/
public class DemoConsumer {public static void main(String[] args) throws MqException, InterruptedException, IOException {System.out.println("启动消费者!");ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("testQueue", true, false, false, null);channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag=" + consumerTag);System.out.println("basicProperties=" + basicProperties);String bodyString = new String(body, 0, body.length);System.out.println("body=" + bodyString);System.out.println("[消费数据] 结束!");}});// 由于消费者也不知道生产者要生产多少, 就在这里通过这个循环模拟一直等待消费.while (true) {Thread.sleep(500);}}
}

3. 启动消息队列服务器

在Spring Boot 项目的启动类中,实例化Broker Server,传入端口号,进行启动服务器.

package com.example.demo;import com.example.demo.mqserver.BrokerServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;import java.io.IOException;@SpringBootApplication
public class DemoApplication {public static ConfigurableApplicationContext context;public static void main(String[] args) throws IOException {context = SpringApplication.run(DemoApplication.class, args);BrokerServer brokerServer = new BrokerServer(9090);brokerServer.start();}}

4. 运行效果

1. 服务器启动:

2. 此时如果再重启服务器,会提示数据库已经存在,就会将数据恢复到内存

3. 启动生产者进行投递消息

上述就是按照我们自定义的应用层协议进行发送请求. 

我们再来看服务器这边的日志:

4. 启动消费者进行消费消息 

 我们再来看服务器这边日志


 结语

         以上就是一个简单的Demo,实现了基于MQ的生产者消费者模型.其他的功能,大家可以在做完这个项目之后自行进行测试.至此这个消息队列的项目就全部完结了,内容还是很多的,希望可以通过这个系列能够帮助到大家去了解消息队列的实现原理.也希望大家能够有所收获,那就到这里吧.接下来就要开始新的项目了(实现论坛系统),又是一个挑战,我们一起加油!❤️

        完整的项目代码已上传Gitee,欢迎大家访问.👇👇👇

模拟实现消息队列icon-default.png?t=N6B9https://gitee.com/yao-fa/advanced-java-ee/tree/master/My-mq

相关文章:

模拟实现消息队列项目(完结) -- 基于MQ的生产者消费者模型

目录 前言 1. 生产者 2. 消费者 3. 启动消息队列服务器 4. 运行效果 结语 前言 在上一章节,我们完成了消息队列的客户端部分,至此我们整个消息队列项目就构建完成了,那我们做的这个消息队列到底有什么效果,以及如何去使用我们自己的消息队列呢?那么本文,就将我们的MQ进行实战操…...

专业商城财务一体化-线上商城+进销存管理软件,批发零售全行业免费更新

订货流程繁琐?订单处理效率低?小程序商城与进销存系统不打通?数据需要手动输入同步?财务与的结算对账需要大量手工处理?零售批发从业者,如何你也有以上烦恼,可以看看进销存小程序订货商城&#…...

深度思考mysql面经

推荐 1 索引下推 Mysql性能优化:什么是索引下推? 1.1 定义 索引下推(Index Condition Pushdown,简称 ICP)是一种数据库优化技术。在传统的数据库查询中,数据库首先使用索引检索来找到符合索引条件的行&…...

2023-08-09力扣每日一题

链接&#xff1a; 1281. 整数的各位积和之差 题意&#xff1a; 十进制每一位的积减去每一位的和 解&#xff1a; 十进制位处理 实际代码&#xff1a; #include<iostream> using namespace std; int subtractProductAndSum(int n) {int t11,t20;while(n){t1*n%10;t…...

[23] Instruct 3D-to-3D: Text Instruction Guided 3D-to-3D conversion

本文提出一种3D-to-3D转换方法&#xff1a;Instruct 3D-to-3D&#xff1b;借助预训练的Image-to-Image扩散模型&#xff0c;本文方法可以使各个视角图片的似然最大&#xff1b;本文方法显式地将source 3D场景作为condition&#xff0c;可以有效提升3D连续性和可控性。同时&…...

设计模式行为型——访问者模式

目录 访问者模式的定义 访问者模式的实现 访问者模式角色 访问者模式类图 访问者模式举例 访问者模式代码实现 访问者模式的特点 优点 缺点 使用场景 注意事项 实际应用 访问者模式的定义 访问者模式&#xff08;Visitor Pattern&#xff09;属于行为型设计模式&am…...

vue3官网文档学习、复习笔记(快速上手)

目录 2.Attribute 绑定&#xff08;v-bind&#xff09; 3.事件监听&#xff08;v-on&#xff09; 4.表单绑定&#xff08;v-model&#xff09; 5.条件渲染&#xff08;v-if&#xff09; 6.列表渲染&#xff08;v-for&#xff09; all.value all.value.filter&#xff08;…...

0基础学习VR全景平台篇 第81篇:全景相机-临云镜如何直播推流

临云镜全景相机是阿里巴巴定制全景设备&#xff0c;实现空间三维信息的快速采集&#xff0c;与阿里云三维空间重建平台搭配&#xff0c;帮助品牌商与平台以较低的成本完成空间的快速采集&#xff0c;并支持对室内/室外空间的三维全景展示及空间漫游&#xff0c;同时支持VR浏览、…...

分数线划定

题目描述 查看题目信息 世博会志愿者的选拔工作正在A 市如火如荼的进行。为了选拔最合适的人才&#xff0c;A 市对所有报名的选手进行了笔试&#xff0c;笔试分数达到面试分数线的选手方可进入面试。 面试分数线根据计划录取人数的150%划定&#xff0c;即如果计划录取m名志愿…...

考研C语言进阶题库——更新26-30题

目录 26.一个正整数&#xff0c;如果等于组成它的各个数字的阶数之和&#xff0c;该整数称为阶乘合数&#xff0c;例如1451阶加四阶加五阶&#xff0c;则145是一个三位阶乘合数&#xff0c;输入一个数&#xff0c;问共有多少个阶乘合数&#xff1f;(十万之内) 27.与2相关的数…...

用C语言实现定积分计算(包括无穷积分/可自定义精度)

关于严谨性的声明&#xff1a; 在用C语言进行定积分的计算之前&#xff0c;我需要声明以下几点&#xff1a; 一、我们所进行定积分计算的函数都是应当是黎曼可积的&#xff0c;这保证了我们即使均匀地分割区间也保证了积分的收敛性。 二、我们同时还应该认识到&#xff0c;鉴…...

使用Presto、Trino数据库时提示“The datetime zone id ‘GMT+08:00‘ is not recognised”

出现这个问题的原因是&#xff1a;Presto、Trino的驱动使用了joda这个库来处理时区的问题。但这个库的编写人似乎对java zone的格式没有太多经验。先看一下出错的代码&#xff1a; com.facebook.presto.jdbc.internal.joda.time.DateTimeZone#forID 根据String类型的zoneId转成…...

C# BeginInvoke 加 EndInvoke实现异步操作

1、定义一个委托 delegate long MyDel(int first, int second); 2、 需异步操作的函数 static int sum(int x,int y) {Console.WriteLine("InSide Sum1");Thread.Sleep(1000);Console.WriteLine("InSide Sum2");return x y;} 3、回调方法…...

“华为杯”研究生数学建模竞赛2015年-【华为杯】B题:数据的多流形结构分析(续)

目录 4.2.2 算法复杂度分析 4.2.3 参数影响 4.2.4 问题 3(a)求解 4.3 问题 3(b) 4.3.1 加权稀疏子空间聚类</...

R语言APSIM模型高级应用及批量模拟

随着数字农业和智慧农业的发展&#xff0c;基于过程的农业生产系统模型在模拟作物对气候变化的响应与适应、农田管理优化、作物品种和株型筛选、农田固碳和温室气体排放等领域扮演着越来越重要的作用。APSIM (Agricultural Production Systems sIMulator)模型是世界知名的作物生…...

【硬件设计】模拟电子基础三--集成运算放大电路

模拟电子基础三--集成运算放大电路 一、集成运算放大器1.1 定义、组成与性能1.2 电流源电路1.3 差动放大电路1.4 理想运算放大器 二、集成运算放大器的应用2.1 反向比例运算电路2.2 同向比例运算电路2.3 反向加法运算电路2.4 反向减法运算电路2.5 积分运算电路2.6 微分运算电路…...

JavaWeb(11)——前端综合案例5(小黑记事本)

一、实例需求 ⌛ 功能需求&#xff1a; ① 列表渲染 ② 删除功能 ③ 添加功能 ④ 底部统计 和 清空任务 二、代码实现 ☕ <!DOCTYPE html> <html lang"en"> <head> <meta charset"UTF-8" /> <meta http-equiv"X-UA-Compa…...

在使用TensorFlow的时候内部报错:内部某个方法或属性不存在

看到TensorFlow内部封装的方法报错的时候&#xff0c;我的第一反应是版本不匹配&#xff0c;立马去搜了对应版本&#xff0c;按照网上给的TensorFlow 2.2.0keras 2.3.1 python 3.7&#xff0c;反反复复安装、卸载、升级、降低版本了很多回还是八行&#xff0c;就在心态快要爆爆…...

dubbo之高可用

负载均衡 概述 负载均衡是指在集群中&#xff0c;将多个数据请求分散到不同的单元上执行&#xff0c;主要是为了提高系统的容错能力和对数据的处理能力。 Dubbo 负载均衡机制是决定一次服务调用使用哪个提供者的服务。 策略 在Dubbo中提供了7中负载均衡策略&#xff0c;默…...

gitee代码扫描js代码,降低复杂度,减少if-else判断的处理方法

把if-else换成如下形式 页面上的代码 <el-button id"btnSave" type"primary" :loading"loadingEdit" click"saveEdit(put,baseSet)"> {{ $t("formLabel.save") }} </el-button> methods代码&#xff1a; // 编…...

变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析

一、变量声明设计&#xff1a;let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性&#xff0c;这种设计体现了语言的核心哲学。以下是深度解析&#xff1a; 1.1 设计理念剖析 安全优先原则&#xff1a;默认不可变强制开发者明确声明意图 let x 5; …...

挑战杯推荐项目

“人工智能”创意赛 - 智能艺术创作助手&#xff1a;借助大模型技术&#xff0c;开发能根据用户输入的主题、风格等要求&#xff0c;生成绘画、音乐、文学作品等多种形式艺术创作灵感或初稿的应用&#xff0c;帮助艺术家和创意爱好者激发创意、提高创作效率。 ​ - 个性化梦境…...

谷歌浏览器插件

项目中有时候会用到插件 sync-cookie-extension1.0.0&#xff1a;开发环境同步测试 cookie 至 localhost&#xff0c;便于本地请求服务携带 cookie 参考地址&#xff1a;https://juejin.cn/post/7139354571712757767 里面有源码下载下来&#xff0c;加在到扩展即可使用FeHelp…...

内存分配函数malloc kmalloc vmalloc

内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...

应用升级/灾备测试时使用guarantee 闪回点迅速回退

1.场景 应用要升级,当升级失败时,数据库回退到升级前. 要测试系统,测试完成后,数据库要回退到测试前。 相对于RMAN恢复需要很长时间&#xff0c; 数据库闪回只需要几分钟。 2.技术实现 数据库设置 2个db_recovery参数 创建guarantee闪回点&#xff0c;不需要开启数据库闪回。…...

逻辑回归:给不确定性划界的分类大师

想象你是一名医生。面对患者的检查报告&#xff08;肿瘤大小、血液指标&#xff09;&#xff0c;你需要做出一个**决定性判断**&#xff1a;恶性还是良性&#xff1f;这种“非黑即白”的抉择&#xff0c;正是**逻辑回归&#xff08;Logistic Regression&#xff09;** 的战场&a…...

【JVM】- 内存结构

引言 JVM&#xff1a;Java Virtual Machine 定义&#xff1a;Java虚拟机&#xff0c;Java二进制字节码的运行环境好处&#xff1a; 一次编写&#xff0c;到处运行自动内存管理&#xff0c;垃圾回收的功能数组下标越界检查&#xff08;会抛异常&#xff0c;不会覆盖到其他代码…...

1.3 VSCode安装与环境配置

进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件&#xff0c;然后打开终端&#xff0c;进入下载文件夹&#xff0c;键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...

基础测试工具使用经验

背景 vtune&#xff0c;perf, nsight system等基础测试工具&#xff0c;都是用过的&#xff0c;但是没有记录&#xff0c;都逐渐忘了。所以写这篇博客总结记录一下&#xff0c;只要以后发现新的用法&#xff0c;就记得来编辑补充一下 perf 比较基础的用法&#xff1a; 先改这…...

鱼香ros docker配置镜像报错:https://registry-1.docker.io/v2/

使用鱼香ros一件安装docker时的https://registry-1.docker.io/v2/问题 一键安装指令 wget http://fishros.com/install -O fishros && . fishros出现问题&#xff1a;docker pull 失败 网络不同&#xff0c;需要使用镜像源 按照如下步骤操作 sudo vi /etc/docker/dae…...