2024.2.29 模拟实现 RabbitMQ —— 项目展示
目录
项目介绍
核心功能
核心技术
演示直接交换机
演示扇出交换机
演示主题交换机
项目介绍
- 此处我们模拟 RabbitMQ 实现了一个消息队列服务器
核心功能
- 提供了 虚拟主机、交换机、队列、绑定、消息 概念的管理
- 九大核心 API 创建队列、销毁队列、创建交换机、销毁交换机、创建绑定、解除绑定、发布消息、订阅消息、确认消息
- 实现了三种典型的消息转换方式 直接交换机(Direct)、扇出交换机(Fanout)、主题交换机(Topic)
- 交换机、队列、绑定 使用 SQLite 数据库持久化,消息 使用文件持久化
- 基于 TCP + 自定义应用层协议 实现生产者/消费者和 Broker Server 之间的交互工作
核心技术
- Spring Boot / MyBatis / Lombok
- SQLite
- TCP
- 关于该项目的需求分析,可点击下方链接跳转
模拟实现 RabbitMQ —— 需求分析
- 关于该项目的核心类,可点击下方链接跳转
模拟实现 RabbitMQ —— 实现核心类
- 关于该项目的数据库操作,可点击下方链接跳转
模拟实现 RabbitMQ —— 数据库操作
- 关于该项目的消息持久化,可点击下方链接跳转
模拟实现 RabbitMQ —— 消息持久化
- 关于该项目的内存数据管理,可点击下方链接跳转
模拟实现 RabbitMQ —— 内存数据管理
- 关于该项目的虚拟机设计,可点击下方链接跳转
模拟实现 RabbitMQ —— 虚拟主机设计
- 关于该项目的交换机转发规则,可点击下方链接跳转
模拟实现 RabbitMQ —— 实现转发规则
- 关于该项目的消费逻辑,可点击下方链接跳转
模拟实现 RabbitMQ —— 实现消费消息逻辑
- 关于该项目网络通信设计,可点击下方链接跳转
模拟实现 RabbitMQ —— 网络通信设计(服务器)
模拟实现 RabbitMQ —— 网络通信设计(客户端)
演示直接交换机
- 简单写一个 demo 模拟 跨主机的生产者消费者模型
- 此处为了方便,就在本机演示
- 此处我们创建的交换机类型为 直接交换机
1、在 Spring Boot 项目的启动类中创建 Broker Server,绑定端口并启动!
@SpringBootApplication public class DemoApplication {public static ConfigurableApplicationContext context;public static void main(String[] args) throws IOException {context = SpringApplication.run(DemoApplication.class, args);BrokerServer brokerServer = new BrokerServer(9090);brokerServer.start();} }
2、编写生产者代码
/* * 这个类用来表示一个生产着 * 通常这是一个单独的服务器程序 * */ public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();// 创建交换机和队列channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);channel.queueDeclare("testQueue",true,false,false,null);// 创建一个消息并发送byte[] body = "hello".getBytes();boolean ok = channel.basicPublish("testExchange","testQueue",null,body);System.out.println("消息投递完成! ok = " + ok);Thread.sleep(500);channel.close();connection.close();} }
3、编写消费者代码
/* * 这个类表示一个消费者 * 通常这个类也应该是在一个独立的服务器中被执行 * */ public class DemoConsumer {public static void main(String[] args) throws IOException, InterruptedException, MqException {System.out.println("启动消费者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);channel.queueDeclare("testQueue",true,false,false,null);channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);String bodyString = new String(body,0,body.length);System.out.println("body = " + bodyString);System.out.println("[消费数据] 结束!");}});// 由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费while (true) {Thread.sleep(500);}} }
4、启动 Spring Boot 项目(启动 Broker Server)
5、运行消费者代码
6、运行生产者代码
7、继续观察消费者的控制台
演示扇出交换机
- 此处我们创建的交换机类型为 扇出交换机
1、编写生产者代码
/** 这个类用来表示一个生产着* 通常这是一个单独的服务器程序* */ public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel(); // 创建交换机channel.exchangeDeclare("testExchange", ExchangeType.FANOUT,true,false,null); // 创建一个消息并发送byte[] body = "hello".getBytes();boolean ok = channel.basicPublish("testExchange","",null,body);System.out.println("消息投递完成! ok = " + ok);Thread.sleep(500);channel.close();connection.close();} }
3、编写消费者A 代码
/** 这个类表示一个消费者A* 通常这个类也应该是在一个独立的服务器中被执行* */ public class DemoConsumerA {public static void main(String[] args) throws IOException, InterruptedException, MqException {System.out.println("启动消费者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel(); // 创建交换机channel.exchangeDeclare("testExchange",ExchangeType.FANOUT,true,false,null); // 创建队列channel.queueDeclare("testQueue1",true,false,false,null); // 设置绑定channel.queueBind("testQueue1","testExchange",""); // 订阅消息channel.basicConsume("testQueue1", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[testQueue1 消费数据] 开始!");System.out.println("consumerTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);String bodyString = new String(body,0,body.length);System.out.println("body = " + bodyString);System.out.println("[testQueue1 消费数据] 结束!");}});// 由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费while (true) {Thread.sleep(500);}} }
4、编写消费者B 代码
/** 这个类表示一个消费者B* 通常这个类也应该是在一个独立的服务器中被执行* */ public class DemoConsumerB {public static void main(String[] args) throws IOException, InterruptedException, MqException {System.out.println("启动消费者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel(); // 创建交换机channel.exchangeDeclare("testExchange", ExchangeType.FANOUT,true,false,null); // 创建队列channel.queueDeclare("testQueue2",true,false,false,null); // 设置绑定channel.queueBind("testQueue2","testExchange",""); // 订阅消息channel.basicConsume("testQueue2", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[testQueue1 消费数据] 开始!");System.out.println("consumerTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);String bodyString = new String(body,0,body.length);System.out.println("body = " + bodyString);System.out.println("[testQueue1 消费数据] 结束!");}});// 由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费while (true) {Thread.sleep(500);}} }
5、启动 Spring Boot 项目(启动 Broker Server)
6、运行消费者A 代码
7、运行消费者B 代码
8、运行生产者代码
9、继续观察消费者A 的控制台
10、继续观察消费者B 的控制台
演示主题交换机
- 此处我们创建的交换机为 主题交换机
1、编写生产者代码
/** 这个类用来表示一个生产着* 通常这是一个单独的服务器程序* */ public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();// 创建交换机和队列channel.exchangeDeclare("testExchange", ExchangeType.TOPIC,true,false,null);channel.queueDeclare("testQueue",true,false,false,null);// 创建消息A 并发送byte[] body = "hello".getBytes();boolean ok = channel.basicPublish("testExchange","ccc.aaa.bbb",null,body);System.out.println("消息投递完成! ok = " + ok);// 创建消息B 并发送body = "hi".getBytes();ok = channel.basicPublish("testExchange","aaa.bbb",null,body);System.out.println("消息投递完成! ok = " + ok);Thread.sleep(500);channel.close();connection.close();} }
3、编写消费者代码
/** 这个类表示一个消费者* 通常这个类也应该是在一个独立的服务器中被执行* */ public class DemoConsumer {public static void main(String[] args) throws IOException, InterruptedException, MqException {System.out.println("启动消费者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel(); // 创建交换机channel.exchangeDeclare("testExchange", ExchangeType.TOPIC,true,false,null); // 创建队列channel.queueDeclare("testQueue",true,false,false,null); // 设置绑定channel.queueBind("testQueue","testExchange","*.aaa.bbb"); // 订阅消息channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);String bodyString = new String(body,0,body.length);System.out.println("body = " + bodyString);System.out.println("[消费数据] 结束!");}}); // 由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费while (true) {Thread.sleep(500);}} }
4、启动 Spring Boot 项目(启动 Broker Server)
5、运行消费者代码
6、运行生产者代码
7、继续观察消费者的控制台
相关文章:

2024.2.29 模拟实现 RabbitMQ —— 项目展示
目录 项目介绍 核心功能 核心技术 演示直接交换机 演示扇出交换机 演示主题交换机 项目介绍 此处我们模拟 RabbitMQ 实现了一个消息队列服务器 核心功能 提供了 虚拟主机、交换机、队列、绑定、消息 概念的管理九大核心 API 创建队列、销毁队列、创建交换机、销毁交换机、…...
React htmlfor
注意,在添加属性时, class 属性需要写成 className ,for 属性需要写成 htmlFor ,这是因为 class 和 for 是 JavaScript 的保留字。 在React中,当我们需要为一个表单元素设置标签时,可以使用htmlFor属性。它…...

现代化数据架构升级:毫末智行自动驾驶如何应对年增20PB的数据规模挑战?
毫末智行是一家致力于自动驾驶的人工智能技术公司,其前身是长城汽车智能驾驶前瞻分部,以零事故、零拥堵、自由出行和高效物流为目标,助力合作伙伴重塑和全面升级整个社会的出行及物流方式。 在自动驾驶领域中,是什么原因让毫末智行…...
理解Stable Diffusion、LoRA、Dreambooth、Hypernetworks、Textual Inversion、Checkpoint
前言 在深度学习和人工智能的领域中,模型生成和调整技术的快速发展为创造性内容的自动化提供了新的可能性。本文将介绍四种重要的模型技术——Stable Diffusion、LoRA、Dreambooth、和Hypernetworks——它们在生成艺术、个性化模型调整和网络结构设计方面各自的特点…...

spring boot3登录开发-2(1图形验证码接口实现)
⛰️个人主页: 蒾酒 🔥系列专栏:《spring boot实战》 🌊山高路远,行路漫漫,终有归途。 目录 前置条件 内容简介 图形验证码接口实现 导入糊涂工具依赖 接口分析 编写验证码接口 测试验证码接口 前置条件 …...
网络编程中的问题总结
1、服务端重启后bind失败,因为TCP 套接字状态 TIME_WAIT 引起,该状态在套接字关闭后约保留 2 到 4 分钟。在 TIME_WAIT 状态退出之后,套接字被删除,该地址才能被重新绑定而不出问题。可以通过setsockopt()设置Socket描述符的选项S…...

数据结构-关键路径
介绍 在AOV网的基础上,如果用对应边来表示活动持续时间,这种有向图被称为AOE网在AOE网中,入度为0的为源点,出度为0的为汇点,整张网看做是一件事情完成的过程,那么这两个点就是事情的开始和结束。每个活动持…...

进程间通信学习笔记(共享内存)
内存映射概念: 共享内存可以通过mmap()映射普通文件使一个磁盘文件与内存中的一个缓冲区相映射,进程可以像访问普通文件一样对文件进行访问,不必再强调read,write。 mmap的优点: 实现了用户空间和内核空间的高效交互方式 mmap的…...

ChatGPT学习第三周
📖 学习目标 ChatGPT在各行各业的应用 探索ChatGPT在不同领域(如教育、客户服务等)的实际应用案例。 ChatGPT的局限性和挑战 讨论ChatGPT面临的挑战,包括偏见、误解及其限制。 ✍️ 学习活动 学习资料 《人工智能通用大模型(…...

R语言混合效应(多水平/层次/嵌套)模型及贝叶斯实现技术应用
回归分析是科学研究中十分重要的数据分析工具。随着现代统计技术发展,回归分析方法得到了极大改进。混合效应模型(Mixed effect model),即多水平模(Multilevel model)/分层模型(Hierarchical Model)/嵌套模…...

[C++]使用C++部署yolov9的tensorrt模型进行目标检测
部署YOLOv9的TensorRT模型进行目标检测是一个涉及多个步骤的过程,主要包括准备环境、模型转换、编写代码和模型推理。 首先,确保你的开发环境已安装了NVIDIA的TensorRT。TensorRT是一个用于高效推理的SDK,它能对TensorFlow、PyTorch等框架训…...
eureka注册中心做了哪些事情/原理?
1.服务注册: 将eureka client发送过来的元数据存储到注册表中 2.服务续约: eureka client默认会每30秒向eureka server发送一次心跳来进行服务续约,通过这一行动来表示自己没有出现故障; 3.服务…...

c语言经典测试题4
1.题1 #include <stdio.h>//没有break的话,输入什么都会往下一直执行下去,而且default在最后就会全都执行 int main() {char c;int v0 0, v1 0, v2 0;do{switch (c getchar())// 输入ADescriptor{casea:caseA:casee:caseE:casei:caseI:caseo:…...
设计模式(五)-观察者模式
前言 实际业务开发过程中,业务逻辑可能非常复杂,核心业务 N 个子业务。如果都放到一块儿去做,代码可能会很长,耦合度不断攀升,维护起来也麻烦,甚至头疼。还有一些业务场景不需要在一次请求中同步完成&…...
MySQL-七种SQL优化
一、插入数据 普通插入: 采用批量插入(一次插入的数据不建议超过1000条) insert into tb_test values(1,Tom),(3, Cat),(3, Jerry)....手动提交事务 start transaction; insert into tb_test values(1,Tom),(3, Cat),(3, Jerry); insert …...
针对Umi、React中遇到的 “xxxx”不能用作 JSX 组件 问题解决方案
一、处理方案 这是因为"types/react"、"types/react-dom"在子依赖中使用的版本不一致导致,一般情况npm会自动帮我们处理版本不一致的问题。如果npm处理不了,就需要我们自己手动处理在package.json中添加一项配置 {name:"test&…...

蓝桥杯备战刷题one(自用)
1.被污染的支票 #include <iostream> #include <vector> #include <map> #include <algorithm> using namespace std; int main() {int n;cin>>n;vector<int>L;map<int,int>mp;bool ok0;int num;for(int i1;i<n;i){cin>>nu…...

设计模式(十) - 工厂方式模式
前言 在此前的设计模式(四)简单工厂模式中我们介绍了简单工厂模式,在这篇文章中我们来介绍下工厂方法模式,它同样是创建型设计模式,而且又有些类似,文章的末尾会介绍他们之间的不同。 1.工厂方法模式简介 …...

http协议基础与Apache的简单介绍
一、相关介绍: 互联网:是网络的网络,是所有类型网络的母集因特网:世界上最大的互联网网络。即因特网概念从属于互联网概念。习惯上,大家把连接在因特网上的计算机都成为主机。万维网:WWW(world…...

RabbitMQ的死信队列和延迟队列
文章目录 死信队列如何配置死信队列死信队列的应用场景Spring Boot实现RabbitMQ的死信队列 延迟队列方案优劣:延迟队列的实现有两种方式: 死信队列 1)“死信”是RabbitMQ中的一种消息机制。 2)消息变成死信,可能是由于…...
基于算法竞赛的c++编程(28)结构体的进阶应用
结构体的嵌套与复杂数据组织 在C中,结构体可以嵌套使用,形成更复杂的数据结构。例如,可以通过嵌套结构体描述多层级数据关系: struct Address {string city;string street;int zipCode; };struct Employee {string name;int id;…...

YSYX学习记录(八)
C语言,练习0: 先创建一个文件夹,我用的是物理机: 安装build-essential 练习1: 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件,随机修改或删除一部分,之后…...

Psychopy音频的使用
Psychopy音频的使用 本文主要解决以下问题: 指定音频引擎与设备;播放音频文件 本文所使用的环境: Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...
OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别
OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别 直接训练提示词嵌入向量的核心区别 您提到的代码: prompt_embedding = initial_embedding.clone().requires_grad_(True) optimizer = torch.optim.Adam([prompt_embedding...
根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:
根据万维钢精英日课6的内容,使用AI(2025)可以参考以下方法: 四个洞见 模型已经比人聪明:以ChatGPT o3为代表的AI非常强大,能运用高级理论解释道理、引用最新学术论文,生成对顶尖科学家都有用的…...

图表类系列各种样式PPT模版分享
图标图表系列PPT模版,柱状图PPT模版,线状图PPT模版,折线图PPT模版,饼状图PPT模版,雷达图PPT模版,树状图PPT模版 图表类系列各种样式PPT模版分享:图表系列PPT模板https://pan.quark.cn/s/20d40aa…...
C++.OpenGL (14/64)多光源(Multiple Lights)
多光源(Multiple Lights) 多光源渲染技术概览 #mermaid-svg-3L5e5gGn76TNh7Lq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-3L5e5gGn76TNh7Lq .error-icon{fill:#552222;}#mermaid-svg-3L5e5gGn76TNh7Lq .erro…...

GruntJS-前端自动化任务运行器从入门到实战
Grunt 完全指南:从入门到实战 一、Grunt 是什么? Grunt是一个基于 Node.js 的前端自动化任务运行器,主要用于自动化执行项目开发中重复性高的任务,例如文件压缩、代码编译、语法检查、单元测试、文件合并等。通过配置简洁的任务…...

Razor编程中@Html的方法使用大全
文章目录 1. 基础HTML辅助方法1.1 Html.ActionLink()1.2 Html.RouteLink()1.3 Html.Display() / Html.DisplayFor()1.4 Html.Editor() / Html.EditorFor()1.5 Html.Label() / Html.LabelFor()1.6 Html.TextBox() / Html.TextBoxFor() 2. 表单相关辅助方法2.1 Html.BeginForm() …...

渗透实战PortSwigger靶场:lab13存储型DOM XSS详解
进来是需要留言的,先用做简单的 html 标签测试 发现面的</h1>不见了 数据包中找到了一个loadCommentsWithVulnerableEscapeHtml.js 他是把用户输入的<>进行 html 编码,输入的<>当成字符串处理回显到页面中,看来只是把用户输…...