2024.2.29 模拟实现 RabbitMQ —— 项目展示
目录
项目介绍
核心功能
核心技术
演示直接交换机
演示扇出交换机
演示主题交换机
项目介绍
- 此处我们模拟 RabbitMQ 实现了一个消息队列服务器
核心功能
- 提供了 虚拟主机、交换机、队列、绑定、消息 概念的管理
- 九大核心 API 创建队列、销毁队列、创建交换机、销毁交换机、创建绑定、解除绑定、发布消息、订阅消息、确认消息
- 实现了三种典型的消息转换方式 直接交换机(Direct)、扇出交换机(Fanout)、主题交换机(Topic)
- 交换机、队列、绑定 使用 SQLite 数据库持久化,消息 使用文件持久化
- 基于 TCP + 自定义应用层协议 实现生产者/消费者和 Broker Server 之间的交互工作
核心技术
- Spring Boot / MyBatis / Lombok
- SQLite
- TCP
- 关于该项目的需求分析,可点击下方链接跳转
模拟实现 RabbitMQ —— 需求分析
- 关于该项目的核心类,可点击下方链接跳转
模拟实现 RabbitMQ —— 实现核心类
- 关于该项目的数据库操作,可点击下方链接跳转
模拟实现 RabbitMQ —— 数据库操作
- 关于该项目的消息持久化,可点击下方链接跳转
模拟实现 RabbitMQ —— 消息持久化
- 关于该项目的内存数据管理,可点击下方链接跳转
模拟实现 RabbitMQ —— 内存数据管理
- 关于该项目的虚拟机设计,可点击下方链接跳转
模拟实现 RabbitMQ —— 虚拟主机设计
- 关于该项目的交换机转发规则,可点击下方链接跳转
模拟实现 RabbitMQ —— 实现转发规则
- 关于该项目的消费逻辑,可点击下方链接跳转
模拟实现 RabbitMQ —— 实现消费消息逻辑
- 关于该项目网络通信设计,可点击下方链接跳转
模拟实现 RabbitMQ —— 网络通信设计(服务器)
模拟实现 RabbitMQ —— 网络通信设计(客户端)
演示直接交换机
- 简单写一个 demo 模拟 跨主机的生产者消费者模型
- 此处为了方便,就在本机演示
- 此处我们创建的交换机类型为 直接交换机
1、在 Spring Boot 项目的启动类中创建 Broker Server,绑定端口并启动!
@SpringBootApplication public class DemoApplication {public static ConfigurableApplicationContext context;public static void main(String[] args) throws IOException {context = SpringApplication.run(DemoApplication.class, args);BrokerServer brokerServer = new BrokerServer(9090);brokerServer.start();} }
2、编写生产者代码
/* * 这个类用来表示一个生产着 * 通常这是一个单独的服务器程序 * */ public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();// 创建交换机和队列channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);channel.queueDeclare("testQueue",true,false,false,null);// 创建一个消息并发送byte[] body = "hello".getBytes();boolean ok = channel.basicPublish("testExchange","testQueue",null,body);System.out.println("消息投递完成! ok = " + ok);Thread.sleep(500);channel.close();connection.close();} }
3、编写消费者代码
/* * 这个类表示一个消费者 * 通常这个类也应该是在一个独立的服务器中被执行 * */ public class DemoConsumer {public static void main(String[] args) throws IOException, InterruptedException, MqException {System.out.println("启动消费者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);channel.queueDeclare("testQueue",true,false,false,null);channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);String bodyString = new String(body,0,body.length);System.out.println("body = " + bodyString);System.out.println("[消费数据] 结束!");}});// 由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费while (true) {Thread.sleep(500);}} }
4、启动 Spring Boot 项目(启动 Broker Server)
5、运行消费者代码
6、运行生产者代码
7、继续观察消费者的控制台
演示扇出交换机
- 此处我们创建的交换机类型为 扇出交换机
1、编写生产者代码
/** 这个类用来表示一个生产着* 通常这是一个单独的服务器程序* */ public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel(); // 创建交换机channel.exchangeDeclare("testExchange", ExchangeType.FANOUT,true,false,null); // 创建一个消息并发送byte[] body = "hello".getBytes();boolean ok = channel.basicPublish("testExchange","",null,body);System.out.println("消息投递完成! ok = " + ok);Thread.sleep(500);channel.close();connection.close();} }
3、编写消费者A 代码
/** 这个类表示一个消费者A* 通常这个类也应该是在一个独立的服务器中被执行* */ public class DemoConsumerA {public static void main(String[] args) throws IOException, InterruptedException, MqException {System.out.println("启动消费者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel(); // 创建交换机channel.exchangeDeclare("testExchange",ExchangeType.FANOUT,true,false,null); // 创建队列channel.queueDeclare("testQueue1",true,false,false,null); // 设置绑定channel.queueBind("testQueue1","testExchange",""); // 订阅消息channel.basicConsume("testQueue1", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[testQueue1 消费数据] 开始!");System.out.println("consumerTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);String bodyString = new String(body,0,body.length);System.out.println("body = " + bodyString);System.out.println("[testQueue1 消费数据] 结束!");}});// 由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费while (true) {Thread.sleep(500);}} }
4、编写消费者B 代码
/** 这个类表示一个消费者B* 通常这个类也应该是在一个独立的服务器中被执行* */ public class DemoConsumerB {public static void main(String[] args) throws IOException, InterruptedException, MqException {System.out.println("启动消费者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel(); // 创建交换机channel.exchangeDeclare("testExchange", ExchangeType.FANOUT,true,false,null); // 创建队列channel.queueDeclare("testQueue2",true,false,false,null); // 设置绑定channel.queueBind("testQueue2","testExchange",""); // 订阅消息channel.basicConsume("testQueue2", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[testQueue1 消费数据] 开始!");System.out.println("consumerTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);String bodyString = new String(body,0,body.length);System.out.println("body = " + bodyString);System.out.println("[testQueue1 消费数据] 结束!");}});// 由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费while (true) {Thread.sleep(500);}} }
5、启动 Spring Boot 项目(启动 Broker Server)
6、运行消费者A 代码
7、运行消费者B 代码
8、运行生产者代码
9、继续观察消费者A 的控制台
10、继续观察消费者B 的控制台
演示主题交换机
- 此处我们创建的交换机为 主题交换机
1、编写生产者代码
/** 这个类用来表示一个生产着* 通常这是一个单独的服务器程序* */ public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();// 创建交换机和队列channel.exchangeDeclare("testExchange", ExchangeType.TOPIC,true,false,null);channel.queueDeclare("testQueue",true,false,false,null);// 创建消息A 并发送byte[] body = "hello".getBytes();boolean ok = channel.basicPublish("testExchange","ccc.aaa.bbb",null,body);System.out.println("消息投递完成! ok = " + ok);// 创建消息B 并发送body = "hi".getBytes();ok = channel.basicPublish("testExchange","aaa.bbb",null,body);System.out.println("消息投递完成! ok = " + ok);Thread.sleep(500);channel.close();connection.close();} }
3、编写消费者代码
/** 这个类表示一个消费者* 通常这个类也应该是在一个独立的服务器中被执行* */ public class DemoConsumer {public static void main(String[] args) throws IOException, InterruptedException, MqException {System.out.println("启动消费者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel(); // 创建交换机channel.exchangeDeclare("testExchange", ExchangeType.TOPIC,true,false,null); // 创建队列channel.queueDeclare("testQueue",true,false,false,null); // 设置绑定channel.queueBind("testQueue","testExchange","*.aaa.bbb"); // 订阅消息channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);String bodyString = new String(body,0,body.length);System.out.println("body = " + bodyString);System.out.println("[消费数据] 结束!");}}); // 由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费while (true) {Thread.sleep(500);}} }
4、启动 Spring Boot 项目(启动 Broker Server)
5、运行消费者代码
6、运行生产者代码
7、继续观察消费者的控制台
相关文章:
2024.2.29 模拟实现 RabbitMQ —— 项目展示
目录 项目介绍 核心功能 核心技术 演示直接交换机 演示扇出交换机 演示主题交换机 项目介绍 此处我们模拟 RabbitMQ 实现了一个消息队列服务器 核心功能 提供了 虚拟主机、交换机、队列、绑定、消息 概念的管理九大核心 API 创建队列、销毁队列、创建交换机、销毁交换机、…...
React htmlfor
注意,在添加属性时, class 属性需要写成 className ,for 属性需要写成 htmlFor ,这是因为 class 和 for 是 JavaScript 的保留字。 在React中,当我们需要为一个表单元素设置标签时,可以使用htmlFor属性。它…...
现代化数据架构升级:毫末智行自动驾驶如何应对年增20PB的数据规模挑战?
毫末智行是一家致力于自动驾驶的人工智能技术公司,其前身是长城汽车智能驾驶前瞻分部,以零事故、零拥堵、自由出行和高效物流为目标,助力合作伙伴重塑和全面升级整个社会的出行及物流方式。 在自动驾驶领域中,是什么原因让毫末智行…...
理解Stable Diffusion、LoRA、Dreambooth、Hypernetworks、Textual Inversion、Checkpoint
前言 在深度学习和人工智能的领域中,模型生成和调整技术的快速发展为创造性内容的自动化提供了新的可能性。本文将介绍四种重要的模型技术——Stable Diffusion、LoRA、Dreambooth、和Hypernetworks——它们在生成艺术、个性化模型调整和网络结构设计方面各自的特点…...
spring boot3登录开发-2(1图形验证码接口实现)
⛰️个人主页: 蒾酒 🔥系列专栏:《spring boot实战》 🌊山高路远,行路漫漫,终有归途。 目录 前置条件 内容简介 图形验证码接口实现 导入糊涂工具依赖 接口分析 编写验证码接口 测试验证码接口 前置条件 …...
网络编程中的问题总结
1、服务端重启后bind失败,因为TCP 套接字状态 TIME_WAIT 引起,该状态在套接字关闭后约保留 2 到 4 分钟。在 TIME_WAIT 状态退出之后,套接字被删除,该地址才能被重新绑定而不出问题。可以通过setsockopt()设置Socket描述符的选项S…...
数据结构-关键路径
介绍 在AOV网的基础上,如果用对应边来表示活动持续时间,这种有向图被称为AOE网在AOE网中,入度为0的为源点,出度为0的为汇点,整张网看做是一件事情完成的过程,那么这两个点就是事情的开始和结束。每个活动持…...
进程间通信学习笔记(共享内存)
内存映射概念: 共享内存可以通过mmap()映射普通文件使一个磁盘文件与内存中的一个缓冲区相映射,进程可以像访问普通文件一样对文件进行访问,不必再强调read,write。 mmap的优点: 实现了用户空间和内核空间的高效交互方式 mmap的…...
ChatGPT学习第三周
📖 学习目标 ChatGPT在各行各业的应用 探索ChatGPT在不同领域(如教育、客户服务等)的实际应用案例。 ChatGPT的局限性和挑战 讨论ChatGPT面临的挑战,包括偏见、误解及其限制。 ✍️ 学习活动 学习资料 《人工智能通用大模型(…...
R语言混合效应(多水平/层次/嵌套)模型及贝叶斯实现技术应用
回归分析是科学研究中十分重要的数据分析工具。随着现代统计技术发展,回归分析方法得到了极大改进。混合效应模型(Mixed effect model),即多水平模(Multilevel model)/分层模型(Hierarchical Model)/嵌套模…...
[C++]使用C++部署yolov9的tensorrt模型进行目标检测
部署YOLOv9的TensorRT模型进行目标检测是一个涉及多个步骤的过程,主要包括准备环境、模型转换、编写代码和模型推理。 首先,确保你的开发环境已安装了NVIDIA的TensorRT。TensorRT是一个用于高效推理的SDK,它能对TensorFlow、PyTorch等框架训…...
eureka注册中心做了哪些事情/原理?
1.服务注册: 将eureka client发送过来的元数据存储到注册表中 2.服务续约: eureka client默认会每30秒向eureka server发送一次心跳来进行服务续约,通过这一行动来表示自己没有出现故障; 3.服务…...
c语言经典测试题4
1.题1 #include <stdio.h>//没有break的话,输入什么都会往下一直执行下去,而且default在最后就会全都执行 int main() {char c;int v0 0, v1 0, v2 0;do{switch (c getchar())// 输入ADescriptor{casea:caseA:casee:caseE:casei:caseI:caseo:…...
设计模式(五)-观察者模式
前言 实际业务开发过程中,业务逻辑可能非常复杂,核心业务 N 个子业务。如果都放到一块儿去做,代码可能会很长,耦合度不断攀升,维护起来也麻烦,甚至头疼。还有一些业务场景不需要在一次请求中同步完成&…...
MySQL-七种SQL优化
一、插入数据 普通插入: 采用批量插入(一次插入的数据不建议超过1000条) insert into tb_test values(1,Tom),(3, Cat),(3, Jerry)....手动提交事务 start transaction; insert into tb_test values(1,Tom),(3, Cat),(3, Jerry); insert …...
针对Umi、React中遇到的 “xxxx”不能用作 JSX 组件 问题解决方案
一、处理方案 这是因为"types/react"、"types/react-dom"在子依赖中使用的版本不一致导致,一般情况npm会自动帮我们处理版本不一致的问题。如果npm处理不了,就需要我们自己手动处理在package.json中添加一项配置 {name:"test&…...
蓝桥杯备战刷题one(自用)
1.被污染的支票 #include <iostream> #include <vector> #include <map> #include <algorithm> using namespace std; int main() {int n;cin>>n;vector<int>L;map<int,int>mp;bool ok0;int num;for(int i1;i<n;i){cin>>nu…...
设计模式(十) - 工厂方式模式
前言 在此前的设计模式(四)简单工厂模式中我们介绍了简单工厂模式,在这篇文章中我们来介绍下工厂方法模式,它同样是创建型设计模式,而且又有些类似,文章的末尾会介绍他们之间的不同。 1.工厂方法模式简介 …...
http协议基础与Apache的简单介绍
一、相关介绍: 互联网:是网络的网络,是所有类型网络的母集因特网:世界上最大的互联网网络。即因特网概念从属于互联网概念。习惯上,大家把连接在因特网上的计算机都成为主机。万维网:WWW(world…...
RabbitMQ的死信队列和延迟队列
文章目录 死信队列如何配置死信队列死信队列的应用场景Spring Boot实现RabbitMQ的死信队列 延迟队列方案优劣:延迟队列的实现有两种方式: 死信队列 1)“死信”是RabbitMQ中的一种消息机制。 2)消息变成死信,可能是由于…...
网络编程(Modbus进阶)
思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…...
golang循环变量捕获问题
在 Go 语言中,当在循环中启动协程(goroutine)时,如果在协程闭包中直接引用循环变量,可能会遇到一个常见的陷阱 - 循环变量捕获问题。让我详细解释一下: 问题背景 看这个代码片段: fo…...
高危文件识别的常用算法:原理、应用与企业场景
高危文件识别的常用算法:原理、应用与企业场景 高危文件识别旨在检测可能导致安全威胁的文件,如包含恶意代码、敏感数据或欺诈内容的文档,在企业协同办公环境中(如Teams、Google Workspace)尤为重要。结合大模型技术&…...
土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等
🔍 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术,可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势,还能有效评价重大生态工程…...
GitHub 趋势日报 (2025年06月08日)
📊 由 TrendForge 系统生成 | 🌐 https://trendforge.devlive.org/ 🌐 本日报中的项目描述已自动翻译为中文 📈 今日获星趋势图 今日获星趋势图 884 cognee 566 dify 414 HumanSystemOptimization 414 omni-tools 321 note-gen …...
浅谈不同二分算法的查找情况
二分算法原理比较简单,但是实际的算法模板却有很多,这一切都源于二分查找问题中的复杂情况和二分算法的边界处理,以下是博主对一些二分算法查找的情况分析。 需要说明的是,以下二分算法都是基于有序序列为升序有序的情况…...
MySQL用户和授权
开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务: test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...
推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材)
推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材) 这个项目能干嘛? 使用 gemini 2.0 的 api 和 google 其他的 api 来做衍生处理 简化和优化了文生图和图生图的行为(我的最主要) 并且有一些目标检测和切割(我用不到) 视频和 imagefx 因为没 a…...
规则与人性的天平——由高考迟到事件引发的思考
当那位身着校服的考生在考场关闭1分钟后狂奔而至,他涨红的脸上写满绝望。铁门内秒针划过的弧度,成为改变人生的残酷抛物线。家长声嘶力竭的哀求与考务人员机械的"这是规定",构成当代中国教育最尖锐的隐喻。 一、刚性规则的必要性 …...
基于单片机的宠物屋智能系统设计与实现(论文+源码)
本设计基于单片机的宠物屋智能系统核心是实现对宠物生活环境及状态的智能管理。系统以单片机为中枢,连接红外测温传感器,可实时精准捕捉宠物体温变化,以便及时发现健康异常;水位检测传感器时刻监测饮用水余量,防止宠物…...











