RabbitMQ 架构分析
文章目录
- 前言
- 一、RabbitMQ架构分析
- 1、Broker
- 2、Vhost
- 3、Producer
- 4、Messages
- 5、Connections
- 6、Channel
- 7、Exchange
- 7、Queue
- 8、Consumer
- 二、消息路由机制
- 1、Direct Exchange
- 2、Topic Exchange
- 3、Fanout Exchange
- 4、Headers Exchange
- 5、notice
- 5.1、备用交换机(Alternate Exchange)
- 5.2、Mandatory标志
- 5.3、Dead Letter Exchange(死信交换机)
前言
RabbitMQ是一款功能强大、利用广泛的消息中间件,通常用于分布式系统和微服务架构中的异步通信。理解其架构有助于正确配置和优化系统,实现高效可靠的消息传递
一、RabbitMQ架构分析
架构图
1、Broker
我们要使用 RabbitMQ 来收发消息,必须要安装一个 RabbitMQ 的服务,可以安装在 Windows 上面也可以安装在 Linux 上面,默认是 5672 的端口。
这台 RabbitMQ 的服务器我们把它叫做 Broker。
2、Vhost
我们每个需要实现基于 RabbitMQ 的异步通信的系统,都需要在 Broker 上创建自己要用到的交换机、队列和它们的绑定关系。
如果某个业务系统不想跟别人混用一个 Broker,有办法不需要安装多个 RabbitMQ 的服务吗?那就是建立一个新的虚拟主机 VHOST。 VHOST 除了可以提高硬件资源的利用率之外,还可以实现资源的隔离和权限的控制。
它的作用类似于其他编程语言中的 namespace 和 package,不同的 VHOST 中可以有同名的 Exchange 和 Queue,它们是完全独立的。
我们可以为不同的业务系统创建专属于他们自己的 VHOST,然后再为他们创建专属的用户,给用户分配对应的 VHOST 的权限。
我们安装 RabbitMQ 的时候会自带一个默认的 VHOST,名字是“/”。
3、Producer
消息生产者(Producer)是发送消息的应用程序。在RabbitMQ中,生产者将消息发送到交换机(Exchange)而不是直接发给队列。
生产者可以是任何能够与RabbitMQ服务器进行通信并使用AMQP协议发送消息的应用。
4、Messages
消息是由生产者发送给RabbitMQ的内容单元,每条消息由负载(payload)和一些元数据(metadata)组成。负载是消息的实际内容,而元数据包含消息的属性或标识。
5、Connections
生产者和消费者与RabbitMQ服务器之间的TCP长连接,每个连接用于消息的传递和通信。
6、Channel
如果所有的生产者发送消息和消费者接收消息,都直接创建和释放 TCP 长连接的话,对于 Broker 来说肯定会造成很大的性能损耗,也会浪费时间。
在 AMQP 里面引入了 Channel (消息信道)的概念,它是一个虚拟的连接。这样我们就可以在保持的 TCP 长连接里面去创建和释放Channel,大大了减少了资源消耗。
不同的 Channel 是相互隔离的,每个 Channel 都有自己的编号。每个客户端线程独占一个channel。
Channel 是 RabbitMQ 原生 API 里面的最重要的编程接口,也就是说我们定义交换机、队列、绑定关系,发送消息,消费消息,调用的都是Channel 接口上的方法。
7、Exchange
现在我们来思考一个问题,如果要把一条消息发送给多个队列,给多个消费者消费,如果交给生产者来做,当有成千上万个队列的时候,那要发送大量消息,耗费太多资源。
如何更优雅的处理呢?RabbitMQ考虑到了这一点,它设计了一个帮我们路由消息的组件,叫做 Exchange。
不管有多少个队列需要接收消息,我都只需要发送到 Exchange 就OK了,由它帮我来分发。Exchange 是不会存储消息的,它只做一件事情,根据规则分发消息。 分发就需要有规则,并且与队列绑定,然后进行个性话分发。
Exchange 和队列是多对多的绑定关系,也就说,一个交换机的消息一个路由给多个队列,一个队列也可以接收来自多个交换机的消息。
绑定关系建立好之后,生产者发送消息到 Exchange,也会携带一个特殊的标识。 当这个标识跟绑定的标识匹配的时候,消息就会发给一个或者多个符合规则的队列。
创建Exchange可以配置相关的属性
属性 | 含义 |
---|---|
name | 交换机的名称 |
type | 交换机的类型决定了它如何路由消息。主要有以下几种类型。Direct:路由规则是完全匹配路由键。 Fanout:把接收到的消息广播到所有绑定的队列上,不需要匹配键。 Topic:根据路由键的通配符模式进行路由。 Headers:根据消息头属性进行匹配,而不是路由键。 |
durable | 是否持久化(重启 rabbitmq 之后, 交换机是否还存在),默认false |
autoDelete | 当所有绑定到这个交换机的队列都不再使用时,交换机会自动删除。默认为false |
internal | 交换机将是内部的,不能被生产者直接发送消息,只能用于交换机到交换机的绑定(即路由)。默认为false |
arguments | 一个字典,可以包含与RabbitMQ插件或未来版本兼容的其他设置。具体参数可能会依赖于具体的交换机类型和业务需求,比如 alternate-exchange:配置一个备用交换机(当消息无法路由时,消息将发送到这个备用交换机) |
7、Queue
在 Broker 上有一个对象用来存储消息,在 RabbitMQ 里面这个对象叫做 Queue。
队列也是生产者和消费者的纽带,生产者发送的消息到达队列,在队列中存储,消费者从队列获取消息进行消费。
队列的相关属性
属性 | 含义 |
---|---|
name | |
durable | (默认为false)设置队列是否为持久化队列。如果设置为true,即使RabbitMQ服务器重启,队列仍然存在。 |
exclusive | (默认为false)设置队列是否为排他队列。如果设置为true,队列仅限于首次声明它的连接(Connection)使用,并且连接关闭时队列会自动删除。这个参数优先级高于autoDelete |
autoDelete | (默认为false)设置队列是否在所有消费者断开连接后自动删除。如果设置为true,当不再有任何消费者订阅该队列时,它会被删除。 |
arguments | x-message-ttl: 每个消息的生存时间(以毫秒为单位)。如果消息在队列中停留超过这个时间则会被删除。 x-expires: 在队列空闲(没有消费者订阅)超过指定时间(以毫秒为单位)后,队列会被删除。 x-max-length: 队列允许的最大消息数。超过限制时,新消息将从队列的头部移除(可选丢弃策略)。 x-max-length-bytes: 队列允许的最大字节数。超过限制时,新消息将从队列的头部移除(可选丢弃策略)。 x-dead-letter-exchange: 设置消息从该队列被移除(由于TTL、长度限制等)后发送到的备用交换机(Dead Letter Exchange)。 x-dead-letter-routing-key: 设置消息重新投递到备用交换机时使用的路由键。 x-max-priority: 队列的最大优先级,如果设置,队列将成为一个优先级队列。 x-queue-mode: 可以设置为 lazy,表明队列应尽可能保存消息到磁盘而不是内存中,以减少RAM的占用。 x-queue-master-locator: 用于集群模式下,控制队列主节点的分布策略。 |
8、Consumer
消费者消费消息有两种模式。
- Pull 模式,对应的方法是 basicGet。消息存放在服务端,只有消费者主动获取才能拿到消息。如果每隔一段时间获取一次消息,消息的实时性会降低。但是好处是可以根据自己的消费能力决定获取消息的频率
- Push 模式,对应的方法是 basicConsume,只要生产者发消息到服务器,就马上推送给消费者,消息保存在客户端,实时性很高,如果消费不过来有可能会造成消息积压。
Spring AMQP 是 push 方式,通过事件机制对队列进行监听,只要有消息到达队列,就会触发消费消息的方法。
RabbitMQ 中 pull 和 push 都有实现。而 kafka 和 RocketMQ 只有 pull。
由于队列有 FIFO 的特性,只有确定前一条消息被消费者接收之后,Broker 才会把这条消息从数据库删除,继续投递下一条消息。
一个消费者是可以监听多个队列的,一个队列也可以被多个消费者监听。 但是在生产环境中,我们一般是建议一个消费者只处理一个队列的消息。
如果需要提升处理消息的能力,可以增加多个消费者。这个时候消息会在多个消费者之间轮询。
二、消息路由机制
RabbitMQ 中一共有四种类型的交换机,Direct、Topic、Fanout、Headers(不常用)。
1、Direct Exchange
特点:
- 直接交换机根据消息的路由键(Routing Key)与绑定键(Binding Key)进行精确匹配,将消息路由到绑定的队列。
工作原理:
- 当生产者发送消息时,需要指定路由键。
- 交换机会查找与该路由键完全匹配的绑定队列,并将消息路由到这些队列。
例如发送如下消息
# 只有 binding key = rabbit 能收到消息
channel.basicPublish(“MY_DIRECT_EXCHANGE”,”rabbit”,”msg 1”);# 匹配不上,消息丢失
channel.basicPublish(“MY_DIRECT_EXCHANGE”,”rabbit0000”,”msg 1”);
应用场景:
- 适用于需要精确匹配路由键的场景,比如日志系统中不同级别的日志(“info”, “error”)可以精确路由到不同的队列进行处理。
2、Topic Exchange
特点:
- 主题交换机通过模式匹配路由键和绑定键来路由消息,支持部分匹配和通配符。
工作原理:
- 生产者发送消息时指定带有模式的路由键,路由键可以包含点号(.)分隔的多个单词,例如 a.bc.def 是 3 个单词。
- 绑定键可以包含通配符:"*“匹配一个单词,”#"匹配零个或多个单词,可以当前缀也可以当后缀。
分析:
- rabbit.#:支持路由键以 rabbit 开头的消息路由,后面可以有单词,也可以没有
- #.rocket:支持路由键以 rocket 结尾,前面可以有也可以没有单词的消息路由
- kafka.*:支持路由键以 kafka 开头,并且后面是一个单词的消息路由
例如发送如下消息
# 只有 binding key = rabbit.# 能收到消息
channel.basicPublish("MY_TOPIC_EXCHANGE","rabbit.abc.jvm","msg 2");# 只有 binding key = kafka.* 能收到消息
channel.basicPublish("MY_TOPIC_EXCHANGE","kafka.jvm","msg 2");# 只有 binding key = #.rocket 能收到消息
channel.basicPublish("MY_TOPIC_EXCHANGE","kafka.abc.rocket","msg 2");# binding key = rabbit.# , binding key = #.rocket 能收到消息
channel.basicPublish("MY_TOPIC_EXCHANGE","rabbit.dad.rocket","msg 2");# binding key = #.rocket , binding key = kafka.* 能收到消息
channel.basicPublish("MY_TOPIC_EXCHANGE","kafka.rocket","msg 2");
应用场景:
- 适用于多层次、复杂的路由场景,例如新闻系统可以根据新闻类别和地区进行路由。
3、Fanout Exchange
特点:
- 广播交换机会将接收到的每条消息路由到所有绑定的队列,而不考虑路由键。
工作原理:
- 生产者发送消息时无需指定路由键。
- 消息会被广播到所有与交换机绑定的队列中。
例如发送如下消息
# 三个队列都会收到 msg 4
channel.basicPublish("MY_FANOUT_EXCHANGE", "", "msg 4");
应用场景:
- 适用于广播场景,将一条消息分发给多个消费者,常用于发布-订阅模式。比如广告、日志等
4、Headers Exchange
特点:
- 头交换机不是通过路由键,而是通过消息头(Headers)中的属性进行匹配路由。
工作原理:
- 绑定包含一组键值对,消息头也包含键值对。
- 交换机将消息头与绑定键值对进行匹配来路由消息,可选择完全匹配或者部分匹配。
应用场景:
- 适用于消息头包含丰富信息,且需要基于多属性进行路由的场景。例如,需要根据多种复杂条件路由的应用。
5、notice
Topic Exchange 和 Direct Exchange 匹配不上都可能导致消息丢失。
可以按照如下方式处理
5.1、备用交换机(Alternate Exchange)
备用交换机是RabbitMQ提供的一种机制,用来处理没有匹配队列的消息。你可以为一个交换机配置一个备用交换机,当消息未能被路由到任何队列时,这些消息会被发送到备用交换机进行处理。
配置备用交换机步骤:
- 创建一个备用交换机。
- 在主要交换机上设置该备用交换机。
示例:
- 假设我们有一个主交换机名为primary_exchange和一个备用交换机名为alternate_exchange,你可以用以下方式设置:
// 当消息在primary_exchange中没有匹配的队列时,消息将被路由到alternate_exchange。
Map<String, Object> args = new HashMap<String, Object>();
args.put("alternate-exchange", "alternate_exchange");
channel.exchangeDeclare("primary_exchange", "direct", true, false, args);
5.2、Mandatory标志
Mandatory标志是RabbitMQ的消息属性之一。在生产者发送消息时,如果设置了mandatory标志为true,而消息未能找到匹配的队列,那么消息不会被丢弃,Broker会将消息返回给生产者。
如何使用:
- 在生产者发送消息时,可以配置mandatory标志。
String message = "Hello World!";
channel.basicPublish(exchangeName, routingKey, true, null, message.getBytes());
Callback机制可以用来处理返回的未路由消息。需要设置ReturnCallback:
channel.addReturnListener(new ReturnCallback() {public void handle (Return r){
// 处理未路由的消息System.out.println("Returned message: " + new String(r.getBody()));}
});
5.3、Dead Letter Exchange(死信交换机)
未被正确路由的消息有时会被重定向到称为死信交换机(DLX)的特殊交换机。当一个队列的消息变为死信时,这些消息可以被路由到一个死信交换机上。
一些情况会导致消息变为死信:
- 消息被拒绝(basic.reject 或 basic.nack)并且 requeue 属性设置为 false。
- 消息在队列中存活时间超过TTL(Time to Live)。
- 队列的消息数超过了最大长度(队列溢出)。
你可以配置队列使用死信交换机:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "dlx_exchange");
channel.queueDeclare("queue_name", true, false, false, args);
相关文章:

RabbitMQ 架构分析
文章目录 前言一、RabbitMQ架构分析1、Broker2、Vhost3、Producer4、Messages5、Connections6、Channel7、Exchange7、Queue8、Consumer 二、消息路由机制1、Direct Exchange2、Topic Exchange3、Fanout Exchange4、Headers Exchange5、notice5.1、备用交换机(Alter…...

Qt Enter和HoverEnter事件
介绍 做PC开发的过程中或多或少都会接触到鼠标的悬停事件,Qt中处理鼠标悬停有Enter和HoverEnter两种事件 相同点 QEvent::Enter对应QEnterEvent,描述的是鼠标进入控件坐标范围之内的行为,QEnterEvent可以抓取鼠标的位置;QEvent…...

大语言模型之prompt工程
前言 随着人工智能的快速发展,我们正慢慢进入AIGC的新时代,其中对自然语言的处理成为了智能化的关键一环,在这个大背景下,“Prompt工程”由此产生,并且正逐渐成为有力的工具... LLM (Large Language Mode…...

WPF基础 | WPF 常用控件实战:Button、TextBox 等的基础应用
WPF基础 | WPF 常用控件实战:Button、TextBox 等的基础应用 一、前言二、Button 控件基础2.1 Button 的基本定义与显示2.2 按钮样式设置2.3 按钮大小与布局 三、Button 的交互功能3.1 点击事件处理3.2 鼠标悬停与离开效果3.3 按钮禁用与启用 四、TextBox 控件基础4.…...

[笔记] 极狐GitLab实例 : 手动备份步骤总结
官方备份文档 : 备份和恢复极狐GitLab 一. 要求 为了能够进行备份和恢复,请确保您系统已安装 Rsync。 如果您安装了极狐GitLab: 如果您使用 Omnibus 软件包,则无需额外操作。如果您使用源代码安装,您需要确定是否安装了 rsync。…...

随笔十七、eth0单网卡绑定双ip的问题
在调试语音对讲过程中遇到过一个“奇怪”问题:泰山派作为一端,可以收到对方发来的语音,而对方不能收到泰山派发出的语音。 用wireshark抓包UDP发现,泰山派发送的地址是192.168.1.30,而给泰山派实际设置的静态地址是19…...

逻辑复制parallel并发参数测试
逻辑复制parallel并发参数测试 一、测试结果、测试环境描述 1.1、测试结果 cpu表中有1000万条数据,大小为1652MB,当更新的数据量多于10万条的时候有明显变化,多余30万条的时候相差2倍。 更新的数据量较多时,逻辑复制使用并发参数相比于使用…...

Cursor 帮你写一个小程序
Cursor注册地址 首先下载客户端 点击链接下载 1 打开微信开发者工具创建一个小程序项目 选择TS-基础模版 官方 2 然后使用Cursor打开小程序创建的项目 3 在CHAT聊天框输入自己的需求 比如 小程序功能描述:吃什么助手 项目名称: 吃什么小程序 功能目标…...

WordPress免费证书插件
为了在您的网站上启用HTTPS,您可以使用本插件快速获取Let’s Encrypt免费证书。 主要功能: 支持快速申请Let’s Encrypt免费证书支持通配符证书申请,每个证书最多可以绑定100个域名支持自动续期证书支持重颁发证书,证书过期或失…...

Linux:多线程[2] 线程控制
了解: Linux底层提供创建轻量级进程/进程的接口clone,通过选择是否共享资源创建。 vfork和fork都调用的clone进行实现,vfork和父进程共享地址空间-轻量级进程。 库函数pthread_create调用的也是底层的clone。 POSIX线程库 与线程有关的函数构…...

C++——list的了解和使用
目录 引言 forward_list与list 标准库中的list 一、list的常用接口 1.list的迭代器 2.list的初始化 3.list的容量操作 4.list的访问操作 5.list的修改操作 6.list的其他操作 二、list与vector的对比 结束语 引言 本篇博客要介绍的是STL中的list。 求点赞收藏评论…...

Agent群舞,在亚马逊云科技搭建数字营销多代理(Multi-Agent)(下篇)
在本系列的上篇中,小李哥为大家介绍了如何在亚马逊云科技上给社交数字营销场景创建AI代理的方案,用于社交动态的生成和对文章进行推广曝光。在本篇中小李哥将继续本系列的介绍,为大家介绍如何创建主代理,将多个子代理挂载到主代理…...

DBeaver连接MySQL数据库
打开DBeaver,点击“新建数据库连接”选项。 点击“测试连接”,首次连接mysql会提示下载对应的JDBC驱动,点击下载即可。 填写服务器地址(这里是本地测试)、mysql的用户名(root)和密码ÿ…...

Leetcode40: 组合总和 II
题目描述: 给定一个候选人编号的集合 candidates 和一个目标数 target ,找出 candidates 中所有可以使数字和为 target 的组合。 candidates 中的每个数字在每个组合中只能使用 一次 。 注意:解集不能包含重复的组合。 代码思路ÿ…...

win32汇编环境,对话框程序中使用进度条控件
;运行效果 ;win32汇编环境,对话框程序中使用进度条控件 ;进度条控件主要涉及的是长度单位,每步步长,推进的时间。 ;比如你的长度是1000,步长是100,每秒走1次,则10秒走完全程 ;比如你的长度是1000,步长是10,每秒走1次,则100秒走完全程,但每格格子的长度与上面一样 ;以下…...

AIGC时代下的Vue组件开发深度探索
文章目录 一、AIGC时代对Vue组件开发的深远影响二、Vue组件开发基础与最佳实践三、AIGC技术在Vue组件开发中的具体应用四、结论与展望 随着人工智能技术的飞速发展,AIGC(人工智能生成内容)时代已经悄然来临。在这个时代背景下,软件…...

在Kubernets Cluster中部署LVM类型的StorageClass - 上
适用场景 看到B站技术部门的文章,是关于如何在k8s集群部署Elastic Search和Click House等IO密集型数据库应用的。 因为要充分利用NvME SSD盘的IOPS,所有数据库应用都直接调用本地SSD盘做为stateful application的 Persistent Volumes。为了可用动态的分…...

一次StarRocks分析的经历
起因 有人反馈说SR,在系统资源还有空闲的时候,被操作系统杀掉了。没有日志,怀疑是bug,如果要解决这个bug。据说在网上查到要升级。请我准备一下升级。 质疑 StarRocks是一款分析型数据库,2021年正式开源,…...

Django网站搭建流程
使用Django搭建网站是一个系统的过程,涉及从环境搭建到部署上线的多个步骤。以下是详细的流程: 1. 环境搭建 (1)安装Python Django是基于Python的Web框架,因此需要先安装Python。建议安装Python 3.8及以上版本。 下载地…...

Vue-day2
7.Vue的生命周期 mounted函数:在页面加载完毕时,发送异步请求,加载数据,渲染页面 createApp({date(){},methods:{},mounted:function(){console.log(Vue挂载完毕,发送请求获取数据)} }).mount(#{app}) 8.ajax函数库…...

Day44:列表元素的修改
在 Python 中,列表是一种可变的数据结构,意味着我们可以对列表中的元素进行修改。修改列表元素的方式有很多种,包括通过索引修改、切片修改、使用 append() 和 extend() 添加新元素、以及删除元素等。今天,我们将学习如何在列表中…...

在 AMD GPU 上使用 vLLM 的 Triton 推理服务器
Triton Inference Server with vLLM on AMD GPUs — ROCm Blogs 2025年1月8日,作者:Fabricio Flores,Tiffany Mintz,Eliot Li,Yao Liu,Ted Themistokleous,Brian Pickrell,Vish Vadl…...

day7手机拍照装备
对焦对不上:1、光太暗;2、离太近;3、颜色太单一没有区分点 滤镜可以后期P 渐变灰滤镜:均衡色彩,暗的地方亮一些,亮的地方暗一些 中灰滤镜:减少光差 手机支架:最基本70cm即可 手…...

HarmonyOS:创建应用静态快捷方式
一、前言 静态快捷方式是一种在系统中创建的可以快速访问应用程序或特定功能的链接。它通常可以在长按应用图标,以图标和相应的文字出现在应用图标的上方,用户可以迅速启动对应应用程序的组件。使用快捷方式,可以提高效率,节省了查…...

[SUCTF 2018]MultiSQL1
进去题目页面如下 发现可能注入点只有登录和注册,那么我们先注册一个用户,发现跳转到了/user/user.php, 查看用户信息,发现有传参/user/user.php?id1 用?id1 and 11,和?id1 and 12,判断为数字型注入 原本以为是简单的数字型注入,看到大…...

kafka-部署安装
一. 简述: Kafka 是一个分布式流处理平台,常用于构建实时数据管道和流应用。 二. 安装部署: 1. 依赖: a). Java:Kafka 需要 Java 8 或更高版本。 b). zookeeper: #tar fxvz zookeeper-3.7.0.tar.gz #…...

VUE3 使用路由守卫函数实现类型服务器端中间件效果
vue3中的router组件,有一个函数 router.beforeEach,可以实现请求中间件效果 使用方法如下: 前提已经在Vue3 项目中引入router组件,在router.js文件中加入router.beforeEach //路由守卫函数,类似于中间件session效果…...

|Python新手小白中级教程|第二十九章:面向对象编程(Python类的拓展延伸与10道实操题目)(5)
文章目录 前言1.类变量与实例变量2.静态方法和类方法1.静态方法2.类方法 3.实操使用1. 创建一个名为Person的类,包含属性name和age,并且有一个方法introduce()用于介绍自己的名字和年龄。2. 创建一个名为Circle的类,包含属性radius和color&am…...

项目概述与规划 (I)
项目概述与规划 (I) JavaScript的学习已经接近尾声了,最后我们将通过一个项目来讲我们在JavaScript中学习到的所有都在这个项目中展现出来,这个项目的DEMO来自于Udemy中的课程,作者是Jonas Schmedtmann; 项目规划 项目步骤 用户…...

mysql学习笔记-数据库的设计规范
1、范式简介 在关系型数据库中,关于数据表设计的基本原则、规则就称为范式。 1.1键和相关属性的概念 超键:能唯一标识元组的属性集叫做超键。 候选键:如果超键不包括多余的属性,那么这个超键就是候选键 主键:用户可以从候选键中选择一个作为主键。 外…...