2024.2.29 模拟实现 RabbitMQ —— 项目展示
目录
项目介绍
核心功能
核心技术
演示直接交换机
演示扇出交换机
演示主题交换机
项目介绍
- 此处我们模拟 RabbitMQ 实现了一个消息队列服务器
核心功能
- 提供了 虚拟主机、交换机、队列、绑定、消息 概念的管理
- 九大核心 API 创建队列、销毁队列、创建交换机、销毁交换机、创建绑定、解除绑定、发布消息、订阅消息、确认消息
- 实现了三种典型的消息转换方式 直接交换机(Direct)、扇出交换机(Fanout)、主题交换机(Topic)
- 交换机、队列、绑定 使用 SQLite 数据库持久化,消息 使用文件持久化
- 基于 TCP + 自定义应用层协议 实现生产者/消费者和 Broker Server 之间的交互工作
核心技术
- Spring Boot / MyBatis / Lombok
- SQLite
- TCP
- 关于该项目的需求分析,可点击下方链接跳转
模拟实现 RabbitMQ —— 需求分析
- 关于该项目的核心类,可点击下方链接跳转
模拟实现 RabbitMQ —— 实现核心类
- 关于该项目的数据库操作,可点击下方链接跳转
模拟实现 RabbitMQ —— 数据库操作
- 关于该项目的消息持久化,可点击下方链接跳转
模拟实现 RabbitMQ —— 消息持久化
- 关于该项目的内存数据管理,可点击下方链接跳转
模拟实现 RabbitMQ —— 内存数据管理
- 关于该项目的虚拟机设计,可点击下方链接跳转
模拟实现 RabbitMQ —— 虚拟主机设计
- 关于该项目的交换机转发规则,可点击下方链接跳转
模拟实现 RabbitMQ —— 实现转发规则
- 关于该项目的消费逻辑,可点击下方链接跳转
模拟实现 RabbitMQ —— 实现消费消息逻辑
- 关于该项目网络通信设计,可点击下方链接跳转
模拟实现 RabbitMQ —— 网络通信设计(服务器)
模拟实现 RabbitMQ —— 网络通信设计(客户端)
演示直接交换机
- 简单写一个 demo 模拟 跨主机的生产者消费者模型
- 此处为了方便,就在本机演示
- 此处我们创建的交换机类型为 直接交换机
1、在 Spring Boot 项目的启动类中创建 Broker Server,绑定端口并启动!
@SpringBootApplication public class DemoApplication {public static ConfigurableApplicationContext context;public static void main(String[] args) throws IOException {context = SpringApplication.run(DemoApplication.class, args);BrokerServer brokerServer = new BrokerServer(9090);brokerServer.start();} }
2、编写生产者代码
/* * 这个类用来表示一个生产着 * 通常这是一个单独的服务器程序 * */ public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();// 创建交换机和队列channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);channel.queueDeclare("testQueue",true,false,false,null);// 创建一个消息并发送byte[] body = "hello".getBytes();boolean ok = channel.basicPublish("testExchange","testQueue",null,body);System.out.println("消息投递完成! ok = " + ok);Thread.sleep(500);channel.close();connection.close();} }
3、编写消费者代码
/* * 这个类表示一个消费者 * 通常这个类也应该是在一个独立的服务器中被执行 * */ public class DemoConsumer {public static void main(String[] args) throws IOException, InterruptedException, MqException {System.out.println("启动消费者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);channel.queueDeclare("testQueue",true,false,false,null);channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);String bodyString = new String(body,0,body.length);System.out.println("body = " + bodyString);System.out.println("[消费数据] 结束!");}});// 由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费while (true) {Thread.sleep(500);}} }
4、启动 Spring Boot 项目(启动 Broker Server)
5、运行消费者代码
6、运行生产者代码
7、继续观察消费者的控制台
演示扇出交换机
- 此处我们创建的交换机类型为 扇出交换机
1、编写生产者代码
/** 这个类用来表示一个生产着* 通常这是一个单独的服务器程序* */ public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel(); // 创建交换机channel.exchangeDeclare("testExchange", ExchangeType.FANOUT,true,false,null); // 创建一个消息并发送byte[] body = "hello".getBytes();boolean ok = channel.basicPublish("testExchange","",null,body);System.out.println("消息投递完成! ok = " + ok);Thread.sleep(500);channel.close();connection.close();} }
3、编写消费者A 代码
/** 这个类表示一个消费者A* 通常这个类也应该是在一个独立的服务器中被执行* */ public class DemoConsumerA {public static void main(String[] args) throws IOException, InterruptedException, MqException {System.out.println("启动消费者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel(); // 创建交换机channel.exchangeDeclare("testExchange",ExchangeType.FANOUT,true,false,null); // 创建队列channel.queueDeclare("testQueue1",true,false,false,null); // 设置绑定channel.queueBind("testQueue1","testExchange",""); // 订阅消息channel.basicConsume("testQueue1", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[testQueue1 消费数据] 开始!");System.out.println("consumerTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);String bodyString = new String(body,0,body.length);System.out.println("body = " + bodyString);System.out.println("[testQueue1 消费数据] 结束!");}});// 由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费while (true) {Thread.sleep(500);}} }
4、编写消费者B 代码
/** 这个类表示一个消费者B* 通常这个类也应该是在一个独立的服务器中被执行* */ public class DemoConsumerB {public static void main(String[] args) throws IOException, InterruptedException, MqException {System.out.println("启动消费者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel(); // 创建交换机channel.exchangeDeclare("testExchange", ExchangeType.FANOUT,true,false,null); // 创建队列channel.queueDeclare("testQueue2",true,false,false,null); // 设置绑定channel.queueBind("testQueue2","testExchange",""); // 订阅消息channel.basicConsume("testQueue2", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[testQueue1 消费数据] 开始!");System.out.println("consumerTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);String bodyString = new String(body,0,body.length);System.out.println("body = " + bodyString);System.out.println("[testQueue1 消费数据] 结束!");}});// 由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费while (true) {Thread.sleep(500);}} }
5、启动 Spring Boot 项目(启动 Broker Server)
6、运行消费者A 代码
7、运行消费者B 代码
8、运行生产者代码
9、继续观察消费者A 的控制台
10、继续观察消费者B 的控制台
演示主题交换机
- 此处我们创建的交换机为 主题交换机
1、编写生产者代码
/** 这个类用来表示一个生产着* 通常这是一个单独的服务器程序* */ public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();// 创建交换机和队列channel.exchangeDeclare("testExchange", ExchangeType.TOPIC,true,false,null);channel.queueDeclare("testQueue",true,false,false,null);// 创建消息A 并发送byte[] body = "hello".getBytes();boolean ok = channel.basicPublish("testExchange","ccc.aaa.bbb",null,body);System.out.println("消息投递完成! ok = " + ok);// 创建消息B 并发送body = "hi".getBytes();ok = channel.basicPublish("testExchange","aaa.bbb",null,body);System.out.println("消息投递完成! ok = " + ok);Thread.sleep(500);channel.close();connection.close();} }
3、编写消费者代码
/** 这个类表示一个消费者* 通常这个类也应该是在一个独立的服务器中被执行* */ public class DemoConsumer {public static void main(String[] args) throws IOException, InterruptedException, MqException {System.out.println("启动消费者!");ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(9090);Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel(); // 创建交换机channel.exchangeDeclare("testExchange", ExchangeType.TOPIC,true,false,null); // 创建队列channel.queueDeclare("testQueue",true,false,false,null); // 设置绑定channel.queueBind("testQueue","testExchange","*.aaa.bbb"); // 订阅消息channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag = " + consumerTag);System.out.println("basicProperties = " + basicProperties);String bodyString = new String(body,0,body.length);System.out.println("body = " + bodyString);System.out.println("[消费数据] 结束!");}}); // 由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费while (true) {Thread.sleep(500);}} }
4、启动 Spring Boot 项目(启动 Broker Server)
5、运行消费者代码
6、运行生产者代码
7、继续观察消费者的控制台
相关文章:
2024.2.29 模拟实现 RabbitMQ —— 项目展示
目录 项目介绍 核心功能 核心技术 演示直接交换机 演示扇出交换机 演示主题交换机 项目介绍 此处我们模拟 RabbitMQ 实现了一个消息队列服务器 核心功能 提供了 虚拟主机、交换机、队列、绑定、消息 概念的管理九大核心 API 创建队列、销毁队列、创建交换机、销毁交换机、…...
React htmlfor
注意,在添加属性时, class 属性需要写成 className ,for 属性需要写成 htmlFor ,这是因为 class 和 for 是 JavaScript 的保留字。 在React中,当我们需要为一个表单元素设置标签时,可以使用htmlFor属性。它…...
现代化数据架构升级:毫末智行自动驾驶如何应对年增20PB的数据规模挑战?
毫末智行是一家致力于自动驾驶的人工智能技术公司,其前身是长城汽车智能驾驶前瞻分部,以零事故、零拥堵、自由出行和高效物流为目标,助力合作伙伴重塑和全面升级整个社会的出行及物流方式。 在自动驾驶领域中,是什么原因让毫末智行…...
理解Stable Diffusion、LoRA、Dreambooth、Hypernetworks、Textual Inversion、Checkpoint
前言 在深度学习和人工智能的领域中,模型生成和调整技术的快速发展为创造性内容的自动化提供了新的可能性。本文将介绍四种重要的模型技术——Stable Diffusion、LoRA、Dreambooth、和Hypernetworks——它们在生成艺术、个性化模型调整和网络结构设计方面各自的特点…...
spring boot3登录开发-2(1图形验证码接口实现)
⛰️个人主页: 蒾酒 🔥系列专栏:《spring boot实战》 🌊山高路远,行路漫漫,终有归途。 目录 前置条件 内容简介 图形验证码接口实现 导入糊涂工具依赖 接口分析 编写验证码接口 测试验证码接口 前置条件 …...
网络编程中的问题总结
1、服务端重启后bind失败,因为TCP 套接字状态 TIME_WAIT 引起,该状态在套接字关闭后约保留 2 到 4 分钟。在 TIME_WAIT 状态退出之后,套接字被删除,该地址才能被重新绑定而不出问题。可以通过setsockopt()设置Socket描述符的选项S…...
数据结构-关键路径
介绍 在AOV网的基础上,如果用对应边来表示活动持续时间,这种有向图被称为AOE网在AOE网中,入度为0的为源点,出度为0的为汇点,整张网看做是一件事情完成的过程,那么这两个点就是事情的开始和结束。每个活动持…...
进程间通信学习笔记(共享内存)
内存映射概念: 共享内存可以通过mmap()映射普通文件使一个磁盘文件与内存中的一个缓冲区相映射,进程可以像访问普通文件一样对文件进行访问,不必再强调read,write。 mmap的优点: 实现了用户空间和内核空间的高效交互方式 mmap的…...
ChatGPT学习第三周
📖 学习目标 ChatGPT在各行各业的应用 探索ChatGPT在不同领域(如教育、客户服务等)的实际应用案例。 ChatGPT的局限性和挑战 讨论ChatGPT面临的挑战,包括偏见、误解及其限制。 ✍️ 学习活动 学习资料 《人工智能通用大模型(…...
R语言混合效应(多水平/层次/嵌套)模型及贝叶斯实现技术应用
回归分析是科学研究中十分重要的数据分析工具。随着现代统计技术发展,回归分析方法得到了极大改进。混合效应模型(Mixed effect model),即多水平模(Multilevel model)/分层模型(Hierarchical Model)/嵌套模…...
[C++]使用C++部署yolov9的tensorrt模型进行目标检测
部署YOLOv9的TensorRT模型进行目标检测是一个涉及多个步骤的过程,主要包括准备环境、模型转换、编写代码和模型推理。 首先,确保你的开发环境已安装了NVIDIA的TensorRT。TensorRT是一个用于高效推理的SDK,它能对TensorFlow、PyTorch等框架训…...
eureka注册中心做了哪些事情/原理?
1.服务注册: 将eureka client发送过来的元数据存储到注册表中 2.服务续约: eureka client默认会每30秒向eureka server发送一次心跳来进行服务续约,通过这一行动来表示自己没有出现故障; 3.服务…...
c语言经典测试题4
1.题1 #include <stdio.h>//没有break的话,输入什么都会往下一直执行下去,而且default在最后就会全都执行 int main() {char c;int v0 0, v1 0, v2 0;do{switch (c getchar())// 输入ADescriptor{casea:caseA:casee:caseE:casei:caseI:caseo:…...
设计模式(五)-观察者模式
前言 实际业务开发过程中,业务逻辑可能非常复杂,核心业务 N 个子业务。如果都放到一块儿去做,代码可能会很长,耦合度不断攀升,维护起来也麻烦,甚至头疼。还有一些业务场景不需要在一次请求中同步完成&…...
MySQL-七种SQL优化
一、插入数据 普通插入: 采用批量插入(一次插入的数据不建议超过1000条) insert into tb_test values(1,Tom),(3, Cat),(3, Jerry)....手动提交事务 start transaction; insert into tb_test values(1,Tom),(3, Cat),(3, Jerry); insert …...
针对Umi、React中遇到的 “xxxx”不能用作 JSX 组件 问题解决方案
一、处理方案 这是因为"types/react"、"types/react-dom"在子依赖中使用的版本不一致导致,一般情况npm会自动帮我们处理版本不一致的问题。如果npm处理不了,就需要我们自己手动处理在package.json中添加一项配置 {name:"test&…...
蓝桥杯备战刷题one(自用)
1.被污染的支票 #include <iostream> #include <vector> #include <map> #include <algorithm> using namespace std; int main() {int n;cin>>n;vector<int>L;map<int,int>mp;bool ok0;int num;for(int i1;i<n;i){cin>>nu…...
设计模式(十) - 工厂方式模式
前言 在此前的设计模式(四)简单工厂模式中我们介绍了简单工厂模式,在这篇文章中我们来介绍下工厂方法模式,它同样是创建型设计模式,而且又有些类似,文章的末尾会介绍他们之间的不同。 1.工厂方法模式简介 …...
http协议基础与Apache的简单介绍
一、相关介绍: 互联网:是网络的网络,是所有类型网络的母集因特网:世界上最大的互联网网络。即因特网概念从属于互联网概念。习惯上,大家把连接在因特网上的计算机都成为主机。万维网:WWW(world…...
RabbitMQ的死信队列和延迟队列
文章目录 死信队列如何配置死信队列死信队列的应用场景Spring Boot实现RabbitMQ的死信队列 延迟队列方案优劣:延迟队列的实现有两种方式: 死信队列 1)“死信”是RabbitMQ中的一种消息机制。 2)消息变成死信,可能是由于…...
Vivado 2023.4 与 ModelSim SE 2022.4 联合仿真环境搭建全攻略(附资源与常见报错解决)
Vivado 2023.4与ModelSim SE 2022.4联合仿真环境搭建实战指南 在FPGA开发领域,仿真环节的重要性不言而喻。作为Xilinx最新推出的设计套件,Vivado 2023.4与Mentor旗下ModelSim SE 2022.4的强强联合,能为开发者提供更高效的验证环境。本文将深入…...
终极华硕笔记本性能优化指南:3步快速配置GHelper轻量级硬件控制工具
终极华硕笔记本性能优化指南:3步快速配置GHelper轻量级硬件控制工具 【免费下载链接】g-helper Lightweight, open-source control tool for ASUS laptops and ROG Ally. Manage performance modes, fans, GPU, battery, and RGB lighting across Zephyrus, Flow, T…...
推荐一些可以用于论文降重的软件:哪些平台能同时降低查重率和AIGC疑似率?2026年实测TOP5对比,AIGC率最低降至5%!
【博主按】 各位CSDN的极客和科研搬砖人们,五月答辩季的“代码”都跑通了吗?最近后台收到海量求助报Bug:自己的论文好不容易把字面查重率“Debug”到了8%,结果一提交教务处的系统,直接弹出了个致命错误——“AIGC疑似率…...
反序列化漏洞详解(第二期):实战利用、工具实操与防御方案
反序列化漏洞详解(第二期):实战利用、工具实操与防御方案 摘要:承接反序列化漏洞第一期(基础认知与原理),本期聚焦实战落地——拆解Java、PHP反序列化漏洞的具体利用方法,结合DVWA、…...
从华为LTC到企业核心流程:聊聊SAP OTC/PTP如何融入大流程框架
从华为LTC到企业核心流程:SAP OTC/PTP如何融入大流程框架 在数字化转型的浪潮中,企业流程治理正经历着从职能导向到价值导向的深刻变革。当我们谈论SAP系统中的OTC(Order to Cash)或PTP(Procure to Pay)时&…...
[大模型实战 - 完结篇] 告别孤岛:拥抱 MCP 协议,为大模型打造标准“USB 接口”
前言 Kubernetes 本身并不复杂,是我们把它搞复杂的。无论是刻意为之还是那种虽然出于好意却将优雅的原语堆砌成 鲁布戈德堡机械 的狂热。平台最初提供的 ReplicaSets、Services、ConfigMaps,这些基础组件简单直接,甚至显得有些枯燥。但后来我…...
智能家居项目翻车实录:聊聊嵌入式IoT开发中那些容易踩的坑(附避坑指南)
智能家居开发实战:嵌入式IoT项目避坑指南 去年我接手了一个智能家居中控系统的开发项目,原本以为凭借多年的嵌入式开发经验能够轻松搞定,结果却遭遇了各种意想不到的问题——设备频繁离线、传感器数据延迟、OTA升级失败……这些问题不仅让项目…...
别再手动算积分了!用MATLAB integral函数搞定这6种‘奇葩’积分(含分段、无穷限)
别再手动算积分了!用MATLAB integral函数搞定这6种‘奇葩’积分(含分段、无穷限) 在科研计算和工程仿真中,积分问题就像隐藏在数据背后的幽灵——当你在信号处理中分析频谱特性时,在物理建模中求解场分布时,…...
LARS回归模型:高维数据特征选择与Python实现
## 1. LARS回归模型概述LARS(Least Angle Regression)是一种用于高维数据线性回归的变量选择算法。我第一次接触这个算法是在处理基因组数据时,当时需要从数千个基因表达特征中筛选出几十个关键预测因子。与传统逐步回归不同,LARS…...
5大核心功能!DamaiHelper演唱会抢票神器全攻略
5大核心功能!DamaiHelper演唱会抢票神器全攻略 【免费下载链接】damaihelper 支持大麦网,淘票票、缤玩岛等多个平台,演唱会演出抢票脚本 项目地址: https://gitcode.com/gh_mirrors/dam/damaihelper 还在为抢不到心仪演唱会门票而烦恼…...











