当前位置: 首页 > article >正文

RabbitMQ架构原理及消息分发机制

RabbitMQ架构原理及消息分发机制

在现代分布式系统中,消息队列是不可或缺的组件之一。它不仅能够解耦系统模块,还能实现异步通信和削峰填谷。在众多消息队列中,RabbitMQ 因其高并发、高可靠性和丰富的功能而备受青睐。本文将从 RabbitMQ 的基础概念、架构原理、消息分发机制、持久化与内存管理、插件管理、Java API 编程以及 Spring 集成等方面,全面解析 RabbitMQ 的核心技术和应用场景。

一、RabbitMQ 简介

RabbitMQ 是一个基于 AMQP(Advanced Message Queuing Protocol)协议的消息中间件,由 Pivotal 公司开发并维护。它最初于 2007 年发布,使用 Erlang 语言编写,专为高并发和分布式场景设计。RabbitMQ 支持多种协议(如 STOMP、MQTT、HTTP 和 WebSocket),并且在国内大厂(如头条、美团、滴滴等)中广泛使用。

RabbitMQ 的名字来源于其高性能和高扩展性,正如兔子奔跑迅速且繁殖能力强一样,RabbitMQ 能够高效处理海量消息。

二、RabbitMQ 架构原理

RabbitMQ 的核心架构基于 AMQP 协议,主要由以下几个关键组件构成:

  1. Broker(代理/中介)
    Broker 是 RabbitMQ 的服务器实例,负责存储和转发消息。它通过 TCP 长连接与客户端通信。

  2. Connection(连接)
    客户端与 Broker 之间的 TCP 长连接,用于建立通信通道。

  3. Channel(通道)
    为了避免频繁创建和释放 TCP 连接的性能损耗,RabbitMQ 引入了 Channel 概念。Channel 是基于 Connection 的虚拟连接,用于发送和接收消息。

  4. Queue(队列)
    Queue 是存储消息的对象,支持多种类型(如 classic 和 quorum)。消息在队列中按照 FIFO(先进先出)原则存储和消费。

  5. Exchange(交换机)
    Exchange 是负责路由消息的组件,根据绑定键(binding key)和路由键(routing key)将消息分发到一个或多个队列。

  6. Vhost(虚拟主机)
    Vhost 是用于资源隔离和权限控制的逻辑单元,类似于命名空间。不同 Vhost 中可以有同名的 Exchange 和 Queue。

三、RabbitMQ 消息分发机制

RabbitMQ 提供了四种主要的交换机类型,用于灵活的消息路由:

  1. Direct(直连型)
    消息根据路由键(routing key)与绑定键(binding key)的完全匹配,路由到对应的队列。

  2. Topic(主题型)
    支持通配符(*#),用于模糊匹配路由键。例如,*.gupao.* 可以匹配所有以 gupao 为中间部分的路由键。

  3. Fanout(广播型)
    将消息广播到所有绑定的队列,无需路由键。

  4. Headers(头部型)
    通过消息头部的键值对进行路由,较少使用。

四、RabbitMQ 持久化与内存管理

  1. 持久化机制

    • 消息持久化:通过设置 deliveryMode 为 2,将消息写入磁盘。
    • 队列和交换机持久化:通过参数设置,确保服务重启后仍存在。
  2. 内存控制
    RabbitMQ 通过内存阈值(默认 40% 的系统内存)控制内存使用。当内存达到阈值时,会阻塞客户端连接并触发内存告警。

  3. 磁盘控制
    当磁盘剩余空间低于阈值(默认 50MB)时,RabbitMQ 会阻塞生产者,避免磁盘耗尽。

五、RabbitMQ 插件管理

RabbitMQ 提供了丰富的插件支持,例如管理插件(rabbitmq_management)和延迟插件(x-delayed-message)。插件可以通过命令行管理:

rabbitmq-plugins enable <plugin_name>
rabbitmq-plugins disable <plugin_name>

六、Java API 编程

RabbitMQ 提供了原生的 Java API,用于发送和接收消息。以下是一个简单的生产者和消费者示例:

生产者代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class MyProducer {private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.8.149");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("admin");factory.setPassword("123456");Connection conn = factory.newConnection();Channel channel = conn.createChannel();String msg = "Hello world, Rabbit MQ";channel.basicPublish(EXCHANGE_NAME, "gupao.best", null, msg.getBytes());channel.close();conn.close();}
}

消费者代码

import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP;public class MyConsumer {private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE";private final static String QUEUE_NAME = "SIMPLE_QUEUE";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.8.149");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("admin");factory.setPassword("123456");Connection conn = factory.newConnection();Channel channel = conn.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, false, null);channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "gupao.best");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "UTF-8");System.out.println("Received message : '" + msg + "'");}};channel.basicConsume(QUEUE_NAME, true, consumer);}
}

七、Spring 集成 RabbitMQ

Spring 提供了对 RabbitMQ 的封装,简化了开发流程。以下是 Spring Boot 的集成示例:

配置文件

spring.rabbitmq.host=192.168.8.149
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/

定义交换机和队列

@Configuration
public class RabbitConfig {@Beanpublic DirectExchange directExchange() {return new DirectExchange("vipDirectExchange");}@Beanpublic Queue firstQueue() {return new Queue("vipFirstQueue");}@Beanpublic Binding bindFirst(Queue firstQueue, DirectExchange directExchange) {return BindingBuilder.bind(firstQueue).to(directExchange).with("gupao.best");}
}

消费者

@Component
@RabbitListener(queues = "vipFirstQueue")
public class FirstConsumer {@RabbitHandlerpublic void process(@Payload String message) {System.out.println("Received message: " + message);}
}

生产者

@Service
public class RabbitSender {@Autowiredprivate AmqpTemplate amqpTemplate;public void send() {String message = "Hello, RabbitMQ!";amqpTemplate.convertAndSend("vipDirectExchange", "gupao.best", message);}
}

八、总结

RabbitMQ 作为一款功能强大的消息中间件,在高并发和分布式系统中有着广泛的应用。通过本文的介绍,我们了解了 RabbitMQ 的架构原理、消息分发机制、持久化与内存管理、插件管理以及如何通过 Java API 和 Spring 集成进行开发。希望本文能够帮助你快速掌握 RabbitMQ 的核心技术和应用场景。

参考链接

  • RabbitMQ 官方文档:https://www.rabbitmq.com/documentation.html
  • Spring AMQP 文档:https://docs.spring.io/spring-amqp/docs/current/reference/html/
  • Spring Boot 文档:https://docs.spring.io/spring-boot/docs/current/reference/html/

相关文章:

RabbitMQ架构原理及消息分发机制

RabbitMQ架构原理及消息分发机制 在现代分布式系统中&#xff0c;消息队列是不可或缺的组件之一。它不仅能够解耦系统模块&#xff0c;还能实现异步通信和削峰填谷。在众多消息队列中&#xff0c;RabbitMQ 因其高并发、高可靠性和丰富的功能而备受青睐。本文将从 RabbitMQ 的基…...

React 项目src文件结构

SCSS 组件库 SCSS为预处理器 支持除原生CSS外的其他语句 别名路径 在项目下的第一级目录就加入craco.config.js文件并且修改packpage.js 中的部分 // 扩展webpage的配置const path require(path)module.exports {// exports配置webpack:{// 配置别名alias:{:path.resolve(__d…...

Redis --- 基本数据类型

Redis --- 基本数据类型 Redis Intro5种基础数据类型 Redis Intro Redis&#xff08;Remote Dictionary Server&#xff09;是一款开源的高性能键值存储系统&#xff0c;常用于缓存、消息中间件和实时数据处理场景。以下是其核心特点、数据类型及典型使用场景&#xff1a; 核心…...

React 高级特性与最佳实践

在掌握了 React 的基础知识后&#xff0c;我们可以进一步探索 React 的高级特性和最佳实践。这些特性将帮助你构建更高效、可维护和可扩展的 React 应用。本文重点介绍 Hooks、Context、Refs 和高阶组件等核心高级特性。 1. Hooks&#xff1a;函数组件的强大工具 Hooks 是 Rea…...

一个由通义千问以及FFmpeg的AVFrame、buffer引起的bug:前面几帧影响后面帧数据

目录 1 问题描述 2 我最开始的代码----错误代码 3 正确的代码 4 为什么前面帧的结果会叠加到了后面帧上----因为ffmpeg新一帧只更新上一帧变化的部分 5 以后不要用通义千问写代码 1 问题描述 某个项目中&#xff0c;需要做人脸马赛克&#xff0c;然后这个是君正的某款芯片…...

12.第二阶段x64游戏实战-远程调试

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 本次游戏没法给 内容参考于&#xff1a;微尘网络安全 上一个内容&#xff1a;11.第二阶段x64游戏实战-框架代码细节优化 本次写的内容是关于调试、排错相关的…...

Coze 和 n8n 的详细介绍及多维度对比分析,涵盖功能、架构、适用场景、成本等关键指标

以下是 Coze 和 n8n 的详细介绍及多维度对比分析&#xff0c;涵盖功能、架构、适用场景、成本等关键指标&#xff1a; 一、Coze 详细介绍 1. 基础信息 类型&#xff1a;低代码自动化平台&#xff08;SaaS&#xff09;。开源性&#xff1a;闭源&#xff08;企业版需付费&…...

咋用fliki的AI生成各类视频?AI生成视频教程

最近想制作视频&#xff0c;多方考查了决定用fliki&#xff0c;于是订阅了一年试试&#xff0c;这个AI生成的视频效果来看真是不错&#xff0c;感兴趣的自己官网注册个账号体验一下就知道了。 fliki官网 Fliki生成视频教程 创建账户并登录 首先&#xff0c;访问fliki官网并注…...

【NLP】 20. Attention 和 self-attention

1. 背景与基本概念 1.1 编码器&#xff0d;解码器模型的瓶颈问题 传统的序列到序列&#xff08;Seq2Seq&#xff09;模型主要依靠编码器生成单一固定长度的上下文向量&#xff0c;然后由解码器逐步生成输出。这个过程存在两个主要问题&#xff1a; 瓶颈问题&#xff1a;固定…...

vue3+element-plus实现省市区三级地址多选

目录 背景实现功能点遗留问题完整代码参考 背景 需要实现&#xff1a;选择省级地址时&#xff0c;回传节点为 [ 省级地址 id]&#xff0c; 选择市级地址时&#xff0c;回传节点为 [ 省级地址 id&#xff0c;市级地址 id]&#xff0c; 选择区县地址时&#xff0c;回传节点为 [ …...

centos部署的openstack发布windows虚拟机

‌CentOS上部署的OpenStack可以发布Windows虚拟机‌。在CentOS上部署OpenStack后&#xff0c;可以通过OpenStack平台创建和管理Windows虚拟机。以下是具体的步骤和注意事项&#xff1a; ‌安装和配置OpenStack‌&#xff1a; 首先&#xff0c;确保系统满足OpenStack的最低硬件…...

Linux : 进程等待以及进程终止

进程控制之进程等待 &#xff08;一&#xff09;fork函数1*fork函数返回值2.父子进程的写时拷贝 &#xff08;二&#xff09;进程终止1.进程退出码2.进程常见退出方法&#xff08;1&#xff09;_exit&#xff08;2&#xff09;exit&#xff08;3&#xff09;return 3.进程的异常…...

LSTM结合LightGBM高纬时序预测

1. LSTM 时间序列预测 LSTM 是 RNN&#xff08;Recurrent Neural Network&#xff09;的一种变体&#xff0c;它解决了普通 RNN 训练时的梯度消失和梯度爆炸问题&#xff0c;适用于长期依赖的时间序列建模。 LSTM 结构 LSTM 由 输入门&#xff08;Input Gate&#xff09;、遗…...

详细解释MCP项目中安装命令 bunx 和 npx区别

详细解释 bunx 和 npx 1. bunx bunx 是 Bun 的一个命令行工具&#xff0c;用于自动安装和运行来自 npm 的包。它是 Bun 生态系统中类似于 npx 或 yarn dlx 的工具。以下是 bunx 的主要特点和使用方法&#xff1a; 自动安装和运行&#xff1a; bunx 会自动从 npm 安装所需的包…...

【统信UOS操作系统】python3.11安装numpy库及导入问题解决

一、安装Python3.11.4 首先来安装Python3.11.4。所用操作系统&#xff1a;统信UOS 前提是准备好Python3.11.4的安装包&#xff08;可从官网下载&#xff08;链接&#xff09;&#xff09;&#xff0c;并解压到本地&#xff1a; 右键&#xff0c;选择“在终端中打开”&#xff…...

【中间件】nginx反向代理实操

一、说明 nginx用于做反向代理&#xff0c;其目标是将浏览器中的请求进行转发&#xff0c;应用场景如下&#xff1a; 说明&#xff1a; 1、用户在浏览器中发送请求 2、nginx监听到浏览器中的请求时&#xff0c;将该请求转发到网关 3、网关再将请求转发至对应服务 二、具体操作…...

嵌入式硬件篇---加法减法积分微分器

文章目录 前言1. 加法器&#xff08;Summing Amplifier&#xff09;结构反相加法器同相加法器 特点反相输出虚地特性 应用 2. 减法器&#xff08;差分放大器&#xff09;结构特点差分放大共模抑制比 应用 3. 积分器结构特点直流漂移问题应用 4. 微分器结构特点应用关键注意事项…...

Spring Cloud Gateway 的执行链路详解

Spring Cloud Gateway 的执行链路详解 &#x1f3af; 核心目标 明确 Spring Cloud Gateway 的请求处理全过程&#xff08;从接收到请求 → 到转发 → 到返回响应&#xff09;&#xff0c;方便你在合适的生命周期节点插入你的逻辑。 &#x1f9f1; 核心执行链路图&#xff08;执…...

鸿蒙应用(医院诊疗系统)开发篇2·Axios网络请求封装全流程解析

一、项目初始化与环境准备 1. 创建鸿蒙工程 src/main/ets/ ├── api/ │ ├── api.ets # 接口聚合入口 │ ├── login.ets # 登录模块接口 │ └── request.ets # 网络请求核心封装 └── pages/ └── login.ets # 登录页面逻辑…...

突发重磅消息!!!CVE项目将被取消?

突发重磅消息&#xff01;&#xff01;&#xff01;CVE项目将被取消&#xff1f;突发&#xff01;来自可靠消息来源。MITRE 对 CVE 项目的支持将于明天到期。附件信件已发送给 CVE 董事会成员。https://mp.weixin.qq.com/s/N3qkiHaDfzDuBMK3JbBCjw...

详解与FTP服务器相关操作

目录 什么是FTP服务器 搭建FTP服务器相关 ​编辑 Unity中与FTP相关的类 上传文件到FTP服务器 使用FTP服务器上传文件的关键点 开始上传 从FTP服务器下载文件到客户端 使用FTP下载文件的关键点 开始下载 关于FTP服务器的其他操作 将文件的上传&#xff0c;下载&…...

远程登录一个Linux系统,如何用命令快速知道该系统属于Linux的哪个发行版,以及该服务器的各种配置参数,运行状态?

远程登录一个Linux系统&#xff0c;如何用命令快速知道该系统属于Linux的哪个发行版&#xff0c;以及该服务器的各种配置参数&#xff0c;运行状态&#xff1f; 查看Linux发行版信息 查看发行版名称和版本&#xff1a; cat /etc/*-release或 lsb_release -a查看内核版本&#…...

解决 .Net 6.0 项目发布到IIS报错:HTTP Error 500.30

今天在将自己开发许久的项目上线的时候&#xff0c;发现 IIS 发布后请求后端老是报一个 HTTP Error 500.30 的异常&#xff0c;如下图所示。   后来仔细调查了一下发现是自己的程序中写了 UseStaticFiles 的依赖注入&#xff0c;这个的主要作用就是发布后端后&#xff0c;想…...

STM32F103_HAL库+寄存器学习笔记16 - 监控CAN发送失败(轮询方式)

导言 《STM32F103_HAL库寄存器学习笔记15 - 梳理CAN发送失败时&#xff0c;涉及哪些寄存器》从上一章节看到&#xff0c;当CAN消息发送失败时&#xff0c;CAN错误状态寄存器ESR的TEC会持续累加&#xff0c;LEC等于0x03&#xff08;ACK错误&#xff09;。本次实验的目的是编写一…...

Java并发-AQS框架原理解析与实现类详解

什么是AQS&#xff1f; AQS&#xff08;AbstractQueuedSynchronizer&#xff09;是Java并发包&#xff08;JUC&#xff09;的核心基础框架&#xff0c;它为构建锁和同步器提供了高效、灵活的底层支持。本文将从设计原理、核心机制及典型实现类三个维度展开&#xff0c;帮助读者…...

实现定长的内存池

池化技术 所谓的池化技术&#xff0c;就是程序预先向系统申请过量的资源&#xff0c;然后自己管理起来&#xff0c;以备不时之需。这个操作的价值就是&#xff0c;如果申请与释放资源的开销较大&#xff0c;提前申请资源并在使用后并不释放而是重复利用&#xff0c;能够提高程序…...

vs2022使用git方法

1、创建git 2、在cmd下执行 git push -f origin master &#xff0c;会把本地代码全部推送到远程&#xff0c;同时会覆盖远程代码。 3、需要设置【Git全局设置】&#xff0c;修改的代码才会显示可以提交&#xff0c;否则是灰色的不能提交。 4、创建的分支&#xff0c;只要点击…...

Mysql中表的使用(3)

目录 1.updata的使用 2.delete(删除表中数据)drop&#xff08;删除表&#xff09; 数据库的约束 1.NOT NULL 指定列不能为空 2.UNIQUE指定列唯一 3.DEFAULT(默认值) 4.PRIMARY KEY 5.自增主键 1.updata的使用 1.0update 表名 set 列名x where 列名y; 2.0update 表名 s…...

BUUCTF-Web(1-20)

目录 一.SQL注入 (1)[极客大挑战 2019]EasySQL 万能密码 (7)[SUCTF 2019]EasySQL 堆叠注入 解一&#xff1a; 解二&#xff1a; (10)[强网杯 2019]随便注 堆叠注入 解一&#xff1a; 解二&#xff1a; 解三&#xff1a; (8)[极客大挑战 2019]LoveSQL 联…...

Uniapp:确认框

目录 一、 出现场景二、 效果展示三、具体使用 一、 出现场景 在项目的开发中&#xff0c;会经常出现删除数据的情况&#xff0c;如果直接删除的话&#xff0c;可能会存在误删&#xff0c;用户体验不好&#xff0c;所以需要增加一个消息提示&#xff0c;提醒用户是否删除。 二…...