关于RabbitMQ你了解多少?
关于RabbitMQ你了解多少?
文章目录
- 关于RabbitMQ你了解多少?
- 基础篇
- 同步和异步
- MQ技术选型
- 介绍和安装
- 数据隔离
- SpringAMQP
- 快速入门
- Work queues
- 交换机
- Fanout交换机
- Direct交换机
- Topic交换机
- 声明队列和交换机
- MQ消息转换器
- 高级篇
- 消息可靠性问题
- 发送者的可靠性
- 生产者重连
- 生产者确认
- SpringAMQP实现生产者确认
RabbitMQ是目前企业中应用非常广泛的高性能的异步通讯组件
基础篇
同步和异步
同步调用:
- 优势:时效性强,等待到结果后才返回
- 不足:拓展性差、性能下降、级联失败问题
异步调用:
- 异步调用方式其实就是基于消息通知的方式,一般包含三个角色
- 消息发送者:投递消息的人,就是原来的调用方
- 消息代理:管理、暂存、转发消息
- 消息接收者:接收和处理消息的人,就是原来的服务提供方
- 优势:解除耦合,拓展性强、无需等待,性能好、故障隔离、缓存消息,流量削峰填谷
- 不足:不能立刻得到调用结果,时效性差、不确定下游业务执行是否成功、业务安全依赖于Broker的可靠性
MQ技术选型
MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是异步调用中的Broker。
| RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
|---|---|---|---|---|
| 公司/社区 | Rabbit | Apache | 阿里 | Apache |
| 开发语言 | Erlang | Java | Java | Scala&Java |
| 协议支持 | AMQP、XMPP、SMTP、STOMP | OpenWire、STOMP、REST、XMPP、AMQP | 自定义协议 | 自定义协议 |
| 可用性 | 高 | 一般 | 高 | 高 |
| 单机吞吐量 | 一般 | 差 | 高 | 非常高 |
| 消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
| 消息可靠性 | 高 | 一般 | 高 | 一般 |
介绍和安装
RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址
同样基于Docker来安装RabbitMQ,使用下面的命令即可:
docker run \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=admin123 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network hmall \-d \rabbitmq:3.8-management
-
15672:RabbitMQ提供的管理控制台的端口
-
5672:RabbitMQ的消息发送处理接口
-
安装完成后访问管理控制台。首次访问需要登录,默认的用户名和密码在配置文件中已经指定了。 登录后即可看到管理控制台总览页面:

RabbitMQ对应的整体架构核心概念:
- publisher:生产者,也就是发送消息的一方
- consumer:消费者,也就是消费消息的一方
- queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
- exchange:交换机,负责消息路由、转发消息。生产者发送的消息由交换机决定投递到哪个队列,无存储能力
- virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

数据隔离
- 每个Virtual Host提供了一个独立的消息代理,它们之间完全隔离,相互之间不共享任何资源。通过将不同的应用程序或服务分配到不同的Virtual Host,可以实现数据的隔离。
- 每个Virtual Host都有自己独立的交换机、队列和绑定规则。这样,消息只能在同一个Virtual Host内进行路由和传递,不会跨越Virtual Host。
- 通过使用Virtual Hosts,可以将不同的应用程序或服务的消息进行逻辑隔离,确保它们之间不会互相干扰或访问彼此的数据。这在多租户环境下尤为有用,可以确保不同的租户之间的消息数据完全隔离,提高安全性和隐私性。
SpringAMQP
SpringAMQP的官方地址
- AMQP:Advanced Message Queuing Protocol,是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
- Spring AMQP:Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
快速入门
-
引入spring-amqp依赖,这样publisher和consumer服务都可以使用:
<!——AMQP依赖,包含RabbitMQ--> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency> -
配置RabbitMQ服务端信息
spring:rabbitmq:host: 192.168.100.101 #主机名port: 5672 # 端口virtual-host: /liner #虚拟主机username: admin #用户名password: admin123 #密码 -
发送消息:SpringAMQP提供了RabbitTemplate工具类,方便我们发送消息。
@Autowired private RabbitTemplate rabbitTemplate; @Test public void testSimpleQueue() {//队列名称String queueName = "simple.queue";//消息String message = "hello,spring amqp!";//发送消息rabbitTemplate.convertAndSend(queueName,message); } -
接收消息:SpringAMQP提供声明式的消息监听,只需通过注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法
@slf4j @Component public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {log.info("spring消费者接收到消息:【"+ msg + "】");if (true) {throw new MessageconversionException("故意的");}log.info("消息处理完成");} }
Work queues
Work queues,任务模型。让多个消费者绑定到一个队列,共同消费队列中的消息。
消费者消息推送限制
默认情况下,RabbitMQ会将消息依次轮询投递给绑定在队列上的每一个消费者。这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。
因此需要修改application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息
spring:rabbitmq:listener:simple:prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息
work模型的使用:
- 多个消费者绑定到一个队列,可以加快消息处理速度
- 同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳
交换机
真正生产环境都会经过exchange来发送消息,而不是直接发送到队列,交换机的类型有以下三种:
- Fanout:广播
- Direct:定向
- Topic:话题
交换机的作用:
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
Fanout交换机
Fanout Exchange:会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式。

Direct交换机
Direct Exchange :会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

Topic交换机
TopicExchange:与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以 “.” 分割。
Queue与Exchange指定BIndingKey时可以使用通配符:
#:代指0个或多个单词*:代指一个单词

声明队列和交换机
SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:
- Queue:用于声明队列,可以用工厂类QueueBuilder构建
- Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
- Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

@Configuration
public class FanoutConfiguration{@Beanpublic FanoutExchange fanoutExchange(){//ExchangeBuilder.fanoutExchange("").build();return new FanoutExchange("liner.fanout");}@Beanpublic Queue fanoutQueue(){//QueueBuilder.durable("").build();return new Queue("fanout.queue");}@Beanpublic Binding fanoutBinding(Queue fanoutQueue,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);}@Beanpublic Binding fanoutBinding2(){return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());}}
SpringAMQP还提供了基于 @RabbitListener 注解来声明队列和交换机的方式:
@RabbitListener(bindings = @QueueBinding(value = Queue (name = "direct.queue",durable = "true"),exchange = @Exchange(name = "liner.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}
))
public void listenDirectQueue(String msg){System.out.println("消费者1接收到Direct消息:【"+msg+"】");
}
MQ消息转换器
Spring的消息发送代码接收的消息体是一个Object:
而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。 默认情况下Spring采用的序列化方式是JDK序列化。存在问题:数据体积过大、有安全漏洞、可读性差

因此建议采用JSON序列化代替默认的JDK序列化
- 在
publisher和consumer两个服务中都引入依赖:
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。
- 配置消息转换器,在
publisher和consumer两个服务的启动类中添加Bean:
@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}
消息转换器中添加的messageId可以便于我们将来做幂等性判断

高级篇
消息可靠性问题
发送者的可靠性
生产者重连
有的时候由于网络波动,可能会出现客户端连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制:
spring:rabbitmq:connection-timeout: 1s #设置MQ的连接超时时间template:retry:enabled: true #开启超时重试机制initial-interval: 1000ms #失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 #最大重试次数
注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
生产者确认
RabbitMQ有Publisher Confirm和Publisher Return两种确认机制。开启确机制认后,在MQ成功收到消息后会返回确认消息给生产者。返回的结果有以下几种情况:
- 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
- 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
- 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
- 其它情况都会返回NACK,告知投递失败
SpringAMQP实现生产者确认
-
在publisher这个微服务的application.yml中添加配置:
spring:rabbitmq:publisher-confirm-type: correlated #开启publisher confirm机制,并设置confirm类型publisher-returns: true #开启publisher return机制#这里publisher-confirm-type有三种模式可选: # none: 关闭confirm机制 # simple: 同步阻塞等待MQ的回执消息 # correlated: MQ异步回调方式返回回执消息 -
每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:
@Slf4j @Configuration public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {//获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);//设置ReturnCallbackrabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {log.info("消息发送失败,应答码{{},原因{},交换机{},路由键{},消息{}",replyCode,replyText,exchange,routingKey,message.toString());});} } -
发送消息,指定消息ID、消息ConfirmCallback
@Test void testPublisherConfirm() throws InterruptedException {//1.创建CorrelationDataCorrelationData cd = new CorrelationData();//2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future发生异常时的处理逻辑,基本不会触发log.error("handle message ack fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ // result.isAck(), boolean类型, true代表ack回执,false代表nack回执log.debug("发送消息成功,收到ack!");}else{ // result.getReason(), String类型,返回nack时的异常描述log.error("发送消息失败,收到nack,reason:{}",result.getReason());}}});//3.发送消息rabbitTemplate.convertAndSend("liner.direct","red","hello",cd); }
相关文章:
关于RabbitMQ你了解多少?
关于RabbitMQ你了解多少? 文章目录 关于RabbitMQ你了解多少?基础篇同步和异步MQ技术选型介绍和安装数据隔离SpringAMQP快速入门Work queues交换机Fanout交换机Direct交换机Topic交换机 声明队列和交换机MQ消息转换器 高级篇消息可靠性问题发送者的可靠性…...
Vulkan-着色器及编译SPIR-V
1.着色器模块介绍 Vulkan着色器代码一定要用字节码格式,而不是人类可读的语法如GLSL和HLSL。这个字节码就是SPIR-V,设计用于Vulkan和OpenCL。这是一个可以用于编写图形和计算着色器的格式,但是我们主要关注的是Vulkan的图形管线。使用字节码格…...
从MVC到DDD,该如何下手重构?
作者:付政委 博客:bugstack.cn 沉淀、分享、成长,让自己和他人都能有所收获!😄 大家好,我是技术UP主小傅哥。多年的 DDD 应用,使我开了技术的眼界! MVC 旧工程腐化严重,…...
论文阅读:基于隐马尔可夫模型的蛋白质多序列比对方法研究
本文来自chatpaper Basic Information: • Title: Research on Protein Multiple Sequence Alignment Method Based on Hidden Markov Model (基于隐马尔可夫模型的蛋白质多序列比对方法研究) • Authors: Zhan Qing • Affiliation: Harbin Institute of Technology (哈尔滨工…...
Vim同时打开多个文件
分屏模式 在 Vim 中,可以同时打开多个文件并使用分屏模式来查看它们。以下是一些常见的方法和命令: 在启动 Vim 时打开多个文件 使用 -o 选项打开文件并水平分屏: vim -o file1.txt file2.txt使用 -O 选项打开文件并垂直分屏: v…...
SpringCloudStreamkafka接收jsonarray字符串失败
文章目录 场景现象问题处理 场景现象 kafka作为消息队列,作为前端设备数据到后端消费的渠道,也被多个不同微服务消费一个服务与前端边缘计算设备建立socket消息,接收实时交通事件推送,再将事件发送到kafka里面。此处使用的是Spri…...
面向对象特性分析大全集
面向对象特性分析 先进行专栏介绍 面向对象总析前提小知识分类浅析封装浅析继承浅析多态面向对象编程优点abc 核心思想实际应用总结 封装概念详解关键主要目的核心思想优点12 缺点12 Java代码实现封装特性 继承概念详解语法示例关键主要目的核心思想优点12 缺点12 Java代码实现…...
【数据结构】队列和栈
大家中秋节快乐,玩了好几天没有学习,今天分享的是栈以及队列的相关知识,以及栈和队列相关的面试题 1.栈 1.1栈的概念及结构 栈:一种特殊的线性表,其只允许在固定的一端进行插入和删除元素操作。进行数据插入和删除操作…...
WordPress主题开发( 十)之—— 条件标签函数(上)
这里写目录标题 什么是条件标签函数?条件标签函数的使用场景使用条件标签函数的注意事项常用的条件标签函数主页示例:is_front_page() 示例:管理后台is_admin() 示例:单个文章页面is_single() 示例:is_single(17) 示例:is_single(Hello World) 示例:is_single(hello…...
vue学习-10vue整合SpringBoot跨域请求
在Vue.js应用整合Spring Boot后端时,需要处理跨域请求。跨域请求通常发生在前端应用运行在不同的域名或端口上时,而后端服务运行在不同的域名或端口上。以下是一种处理跨域请求的常见方式: 后端(Spring Boot)配置 在…...
ElasticSearch - 基于 JavaRestClient 查询文档(match、精确、复合查询,以及排序、分页、高亮)
目录 一、基于 JavaRestClient 查询文档 1.1、查询 API 演示 1.1.1、查询基本框架 DSL 请求的对应格式 响应的解析 1.1.2、全文检索查询 1.1.3、精确查询 1.1.4、复合查询 1.1.5、排序和分页 1.1.6、高亮 一、基于 JavaRestClient 查询文档 1.1、查询 API 演示 1.1.…...
简易实现通讯录(2.0)
这篇文章是在上期实现的通讯录基础上,增加了自动增容的功能,也解决了一开始通讯录自动开辟一个空间,可能会浪费空间,或者是信息过多无法增容的痛点,由于我们使用的是malloc这类函数来开辟空间,我们也需要来…...
Jasypt 实现自定义加解密
如下文章已经讲解了, Jasypt 是什么,怎么集成 Jasypt,怎么使用 Jasypt。 Jasypt 开源加密库使用教程_jasyptstringencryptor-CSDN博客Jasypt 加密框架概述1、Jasypt Spring Boot 为 spring boot 应用程序中的属性源提供加密支持,…...
Leetcode 554. 砖墙
文章目录 题目代码(9.25 首刷自解) 题目 Leetcode 554. 砖墙 代码(9.25 首刷自解) class Solution { public:int leastBricks(vector<vector<int>>& wall) {unordered_map<int, int> mp;int count 0;for…...
Python 内置函数详解 (3) 进制转换
近期在外旅游,本篇是出发前定时发布的,不完整,旅游回来后再补充。 Python 内置函数 Python3.11共有75个内置函数,其来历和分类请参考:Python 新版本有75个内置函数,你不会不知道吧_Hann Yang的博客-CSDN博客 函数列表 abs aiter all …...
SPSS列联表分析
前言: 本专栏参考教材为《SPSS22.0从入门到精通》,由于软件版本原因,部分内容有所改变,为适应软件版本的变化,特此创作此专栏便于大家学习。本专栏使用软件为:SPSS25.0 本专栏所有的数据文件可在个人主页—…...
聊聊并发编程——并发容器和阻塞队列
目录 一.ConcurrentHashMap 1.为什么要使用ConcurrentHashMap? 2.ConcurrentHashMap的类图 3.ConcurrentHashMap的结构图 二.阻塞队列 Java中的7个阻塞队列 ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。 LinkedBlockingQueue…...
我庄严承诺终生不去承德旅游
虽然人微言轻,但也要尽一份力。 在此,我庄严承诺: 如果承德相关机构不返还那名"灵活就业人员"105.82万元的财产,并进行公开道歉。 我将终生不去承德旅游, 我将终生不买承德出产的任何产品。 我还将劝诫我…...
【python】python实现杨辉三角的三种方法
文章目录 1.杨辉三角介绍:2.方法一:迭代3.方法二:生成器4.方法三:递归 1.杨辉三角介绍: 杨辉三角是一种数学图形,由数字排列成类似三角形的形状。它的每个数值等于它上方两个数值之和。这个三角形的形状可以…...
GitHub 基本操作
最近要发展一下自己的 github 账号了,把以前的项目代码规整规整上传上去,这里总结了一些经验,经过数次实践之后,已解决几乎所有基本操作中的bug,根据下面的操作步骤来,绝对没错了。(若有其他问题…...
Python实现prophet 理论及参数优化
文章目录 Prophet理论及模型参数介绍Python代码完整实现prophet 添加外部数据进行模型优化 之前初步学习prophet的时候,写过一篇简单实现,后期随着对该模型的深入研究,本次记录涉及到prophet 的公式以及参数调优,从公式可以更直观…...
Linux云原生安全:零信任架构与机密计算
Linux云原生安全:零信任架构与机密计算 构建坚不可摧的云原生防御体系 引言:云原生安全的范式革命 随着云原生技术的普及,安全边界正在从传统的网络边界向工作负载内部转移。Gartner预测,到2025年,零信任架构将成为超…...
【OSG学习笔记】Day 16: 骨骼动画与蒙皮(osgAnimation)
骨骼动画基础 骨骼动画是 3D 计算机图形中常用的技术,它通过以下两个主要组件实现角色动画。 骨骼系统 (Skeleton):由层级结构的骨头组成,类似于人体骨骼蒙皮 (Mesh Skinning):将模型网格顶点绑定到骨骼上,使骨骼移动…...
QT: `long long` 类型转换为 `QString` 2025.6.5
在 Qt 中,将 long long 类型转换为 QString 可以通过以下两种常用方法实现: 方法 1:使用 QString::number() 直接调用 QString 的静态方法 number(),将数值转换为字符串: long long value 1234567890123456789LL; …...
初学 pytest 记录
安装 pip install pytest用例可以是函数也可以是类中的方法 def test_func():print()class TestAdd: # def __init__(self): 在 pytest 中不可以使用__init__方法 # self.cc 12345 pytest.mark.api def test_str(self):res add(1, 2)assert res 12def test_int(self):r…...
return this;返回的是谁
一个审批系统的示例来演示责任链模式的实现。假设公司需要处理不同金额的采购申请,不同级别的经理有不同的审批权限: // 抽象处理者:审批者 abstract class Approver {protected Approver successor; // 下一个处理者// 设置下一个处理者pub…...
JS手写代码篇----使用Promise封装AJAX请求
15、使用Promise封装AJAX请求 promise就有reject和resolve了,就不必写成功和失败的回调函数了 const BASEURL ./手写ajax/test.jsonfunction promiseAjax() {return new Promise((resolve, reject) > {const xhr new XMLHttpRequest();xhr.open("get&quo…...
Razor编程中@Html的方法使用大全
文章目录 1. 基础HTML辅助方法1.1 Html.ActionLink()1.2 Html.RouteLink()1.3 Html.Display() / Html.DisplayFor()1.4 Html.Editor() / Html.EditorFor()1.5 Html.Label() / Html.LabelFor()1.6 Html.TextBox() / Html.TextBoxFor() 2. 表单相关辅助方法2.1 Html.BeginForm() …...
LRU 缓存机制详解与实现(Java版) + 力扣解决
📌 LRU 缓存机制详解与实现(Java版) 一、📖 问题背景 在日常开发中,我们经常会使用 缓存(Cache) 来提升性能。但由于内存有限,缓存不可能无限增长,于是需要策略决定&am…...
【Android】Android 开发 ADB 常用指令
查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...
