消息队列RabbitMQ
文章目录
- 1. 简介与安装
- 2. 基本概念
- 3. SpringAMQP
- 4. 交换机类型
- 5. 消息转换器
- 5.1 默认转换器
- 5.2 配置JSON转换器
- 6 生产者的可靠性
- 6.1 生产者超时重连机制
- 6.2 生产者确认机制
- 6. MQ的可靠性
- 6.1 数据持久化
- 6.2 惰性队列 Lazy Queue
- 7. 消费者的可靠性
- 7.1 消费者确认机制
- 7.2 失败重试机制
- 7.3 失败处理策略
- 7.4 业务幂等性方案
- 7.4.1 唯一消息ID
- 7.4.2 业务判断
- 7.5 兜底策略
- 8. 延迟消息
- 8.1 死信交换机
- 8.2 DelayExchange插件
1. 简介与安装
RabbitMQ是基于Erlang语言开发的开源消息通信中间件,支持AMQP,XMPP,SMTP,STOMP协议,消息延迟时微秒级别的。
Ubuntu系统RabbitMQ的安装
2. 基本概念
- Publisher 生产者,发送消息的一方
- Consumer 消费者,接收消息的一方
- Queue 队列,存储消息
- Exchange 交换机,负责消息路由,生产者发送的消息由交换机负责投递到相应的队列。不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
- VirtualHost 虚拟主机,起到数据隔离的作用,有各自的交换机和队列
3. SpringAMQP
-
导入Maven依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
SpringAMQP提供了RabbitTemplate工具,用于发送消息
-
yml配置
spring:rabbitmq:host: 127.0.0.1 # MQ部署的机器IPport: 5672 # 端口virtual-host: /test # 虚拟主机username: admin # 用户名password: admin # 密码
-
RabbitMQ管理系统配置
- 创建虚拟主机/test
- 创建交换机test.direct
- 创建队列test.queue
- 将队列test.queue绑定到交换机test.direct
-
发送消息测试
class LearnApplicationTests {@AutowiredRabbitTemplate rabbitTemplate;@Testvoid testSend() {String exchange = "test.direct";String msg = "hello RabbitMQ";rabbitTemplate.convertAndSend(exchange, "", msg);} }
-
接收消息测试
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;/*** @author zyq*/ @Component public class SpringRabbitListener {@RabbitListener(queues = "test.queue")public void listenSimpleQueueMessage(String msg) {System.out.println("消费者接收到消息:【" + msg + "】");} }
-
一个队列上存在多个监听器时,假定队列
test.queue
上有两个消费者listener1和listener2,默认情况下队列上的消息会由两个消费者平均分配(第一个消息发给listener1,第二个消息发给listener2,第三个消息发给listener1, …),如果两个消费者的性能存在差异,那么性能好的消费者的资源无法充分利用,可以通过配置prefetch = 1
切换到“能者多劳”策略。spring:rabbitmq:host: 127.0.0.1 # MQ部署的机器IPport: 5672 # 端口virtual-host: /test_host # 虚拟主机username: admin # 用户名password: admin # 密码listener:simple:prefetch: 1 # 能者多劳,不配置的话是将消息平均发送给消费者
4. 交换机类型
- Fanout交换机 广播交换机,将消息发送到所有绑定的队列
- Direct交换机 根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routingkey完全一致,才会接收到消息。
- Topic交换机 可以让队列在绑定BindingKey 的时候使用通配符
#
匹配一个或多个词*
匹配一个词
5. 消息转换器
5.1 默认转换器
在数据传输时,发送的消息被序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。 只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
- 数据体积过大
- 有安全漏洞
- 可读性差
5.2 配置JSON转换器
- 引入Maven依赖
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId> </dependency>
- 配置Bean
@Bean public MessageConverter messageConverter(){return new Jackson2JsonMessageConverter(); }
6 生产者的可靠性
一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况。少数情况下,可能出现投递的消息没有成功入队。
6.1 生产者超时重连机制
在生产者服务中进行如下配置
spring:rabbitmq:connection-timeout: 1s # 连接超时时间template:retry:enabled: true # 开启超时重连机制initial-interval: 1000ms # 初始等待时间multiplier: 1 # 等待时长倍数,下次等待时长 initial-interval * multipliermax-attempts: 3 # 重试次数
当网络不稳定时,超时重连机制可以提高消息的发送成功率,但是SpringAMQP提供的重连机制时阻塞式
的。不建议开启该功能,若业务需要,需要配置合理的等待时间和重试次数,也可以使用异步线程来执行发送消息的代码。
6.2 生产者确认机制
配置文件配置选项
spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型# none:关闭confirm机制; simple:同步阻塞等待MQ的回执; correlated:MQ异步回调返回回执(推荐)publisher-returns: true # 开启publisher return机制
-
Publisher Return 消息成功到达交换机,但是路由失败时会触发ReturnCallback,往往时编程导致的,可以避免
@Configuration public class MqConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);rabbitTemplate.setReturnCallback((message, code, text, exchange, key) -> {// 实现return callbackSystem.err.println("【Return Call】 message: " + message + ", replyText: " + text);});} }
-
Publisher Confirm
- 消息投递到交换机,但是路由失败,触发ReturnCallback,返回ACK
- 临时消息(不需要持久化)投递到交换机并入队成功,返回ACK
- 持久化消息投递到交换机,入队成功并完成持久化,返回ACK
- 其他情况返回NACK,标识投递失败
@Test void contextLoads() {// new CorrelationData(UUID.randomUUID().toString());CorrelationData cd = new CorrelationData();cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable throwable) {// Future本身发生错误,一般不需要处理}@Overridepublic void onSuccess(CorrelationData.Confirm confirm) {// Future处理成功if (confirm.isAck()) {// 消息发送成功 ACK} else {// 消息发送失败 NACK// 执行消息发送失败的业务逻辑}}});String exchange = "";String routingKey = "";String msg = "";rabbitTemplate.convertAndSend(exchange, routingKey, msg, cd); }
-
总结
生产者确认机制比较耗费资源,一般不开启,不开启确认每秒钟可以投递数万的消息,而开启后只能投递数千。若业务需要高可靠性,只需要开启Publisher Confirm处理NACK的情况即可。
6. MQ的可靠性
消息到达MQ以后,如果MQ不能及时保存,也会导致消息丢失。
- MQ宕机;
- 内存空间不足,引发MQ阻塞执行持久化;
6.1 数据持久化
- 交换机持久化(默认开启);
- 队列持久化(默认开启);
- 消息持久化(Delivery-mode需要指定为2,也就是持久化)
- 若不开启消息持久化,在内存不足时,会发生MQ阻塞写磁盘PageOut;
- 若开启消息持久化,会同步将消息写到磁盘,MQ不会出现阻塞的现象,速度稍微慢一点点。
6.2 惰性队列 Lazy Queue
- 从3.6.0开始支持,从3.12开始默认使用该策略
- 接收到消息后直接写入磁盘(内存默认只保留2048条),消息消费时才加载到内存,支持百万消息存储
7. 消费者的可靠性
当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消费者消费消息可能出现故障,比如:
- 消息投递的过程中出现了网络故障
- 消费者接收到消息后突然宕机
- 消费者接收到消息后,因处理不当导致异常
7.1 消费者确认机制
RabbitMQ提供了消费者确认机制(Consumer Acknowledgement),当消费者处理消息后,向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
- ack:成功处理消息,RabbitMQ从队列中删除该消息
- nack:消息处理失败,RabbitMQ需要再次投递消息
- reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
消息确认机制的实现方式
- none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
- manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
- auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:
- 如果是业务异常,会自动返回nack;
- 如果是消息处理或校验异常,自动返回reject,比如发生MessageConversionException
7.2 失败重试机制
开启消费者确认机制后,如果消息处理一直返回NACK,那么消息会反复进行入队和处理,会导致MQ压力飙升。
而开启失败重试机制后,消息会在本地重试,而不是重新入队,本地重试达到最大次数后,默认会返回reject丢弃消息。
在消费者服务的配置文件中进行配置
spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
7.3 失败处理策略
本地重试达到最大次数后,默认会返回reject丢弃消息,而有些业务显然无法接受消息的丢失。MQ支持之定义重试次数耗尽后的处理策略
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息,默认方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。(后续可进行人工处理)
需要定义如下配置类
@Configuration
public class MqErrorConfig {private final static String ERROR_EXCHANGE = "error.direct";private final static String ERROR_QUEUE = "error.queue";private final static String ERROR_ROTING_KEY = "error";/*** 创建处理失败消息的交换机* @return*/@Beanpublic DirectExchange errorExchange() {return new DirectExchange(ERROR_EXCHANGE);}/*** 创建存放失败消息的队列* @return*/@Beanpublic Queue errorQueue() {return new Queue(ERROR_QUEUE);}/*** 交换机与队列绑定* @param errorQueue* @param errorExchange* @return*/@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with(ERROR_ROTING_KEY);}/*** 注册处理失败消息处理策略* @param rabbitTemplate* @return*/@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, ERROR_EXCHANGE, ERROR_ROTING_KEY);}
}
7.4 业务幂等性方案
在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。
7.4.1 唯一消息ID
- 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
- 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
- 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
进行如下配置,SpringAMQP会在消息头部自动添加唯一ID
@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}
7.4.2 业务判断
非幂等性业务,会对数据进行更改,那么我们在执行业务逻辑前,可先判断数据记录是否处于未处理状态,比如可以根据订单的状态。
7.5 兜底策略
开启定时任务主动去查询数据库,判断数据有需要处理的数据。
8. 延迟消息
8.1 死信交换机
设计两个队列两个交换机,当消息过期时,消息会被投递到死信队列,只需监听死信队列即可。通过设置队列dead-letter-exchange
指定过期的消息投递的交换机,也就是死信交换机。对于消息,通过expration指定过期时间。
然而,RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。 当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的TTL时间不一定准确。
8.2 DelayExchange插件
开启队列的delayed配置,并且在投递消息时设置delay时长。
延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。 因此,不建议设置延迟时间过长的延迟消息。
改进策略,将消息的delay时长分段,比如将延迟时间切割成10s 10s 10s 15s 15s …,大部分消息在前30s内就已经可以被消费,不需要等到30分钟,可以有效防止消息堆积。
参考资料:https://www.bilibili.com/video/BV1mN4y1Z7t9/
相关文章:

消息队列RabbitMQ
文章目录 1. 简介与安装2. 基本概念3. SpringAMQP4. 交换机类型5. 消息转换器5.1 默认转换器5.2 配置JSON转换器 6 生产者的可靠性6.1 生产者超时重连机制6.2 生产者确认机制 6. MQ的可靠性6.1 数据持久化6.2 惰性队列 Lazy Queue 7. 消费者的可靠性7.1 消费者确认机制7.2 失败…...

RabbitMQ概述
什么是MQ MQ (message queue)消息队列 MQ从字⾯意思上看,本质是个队列,FIFO先⼊先出,只不过队列中存放的内容是消息(message).消息可以⾮常简单,⽐如只包含⽂本字符串,JSON等,也可以很复杂,⽐如内嵌对象 RabbitMQ是MQ的一种实现,是Rabbit 企业下的⼀个消息队列产…...
Golang学习路线
以下是一条学习Golang(Go语言)的路线: 一、基础入门 1. 环境搭建 安装Go编译器,在官网(https://golang.org/dl/)下载适合操作系统的安装包并配置好环境变量。 2. 语法学习学习变量、数据类型(…...
Flink从ck拉起任务脚本
#!/bin/bashAPP_NAME"orderTest"CHECKPOINT_BASE_PATH"hdfs:///jobs/flink/checkpoints/aaa-test/"is_running$(yarn application -list | grep -w "$APP_NAME" | grep -c "RUNNING")if [ $is_running -gt 0 ]; thenecho "应用程…...

GADBench Revisiting and Benchmarking Supervised Graph Anomaly Detection
Neurips 23 推荐指数: #paper/⭐⭐⭐ 领域:图异常检测 胡言乱语: neurips 的benchmark模块的文章总能给人一些启发性的理解,这篇的insight真有意思。个人感兴趣的地方会加粗。此外,这篇文章和腾讯AIlab合作ÿ…...

某象异形滑块99%准确率方案
注意,本文只提供学习的思路,严禁违反法律以及破坏信息系统等行为,本文只提供思路 如有侵犯,请联系作者下架 该文章模型已经上线ocr识别网站,欢迎测试!!,地址:https://yxlocr.windy-rain.cn/ocr/slider/6 所谓的顶象异形滑块,是指没有采用常规的缺口,使用各种形状的…...

CDN绕过学习
1.什么是CDN? CDN就是分布在各个地区的服务器,这些服务器储存着数据的副本。 哪些服务器比较接近你,当你发起请求时,提前就会快速为你提供服务。 总结来说就是: 其实就是用来加速访问的,以及缓解压力&a…...

SpringBoot日常:redission的接入使用和源码解析
文章目录 一、简介二、集成redissionpom文件redission 配置文件application.yml文件启动类 三、JAVA 操作案例字符串操作哈希操作列表操作集合操作有序集合操作布隆过滤器操作分布式锁操作 四、源码解析 一、简介 Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格…...
npm包管理深度探索:从基础到进阶全面教程!
目录 一、npm概述1、npm简介(1)什么是npm?(2)npm的核心功能(3)npm的工作原理(4)npm的优势(5)npm的局限性(6)总结 2、npm的…...

最新免费GPT4O和Midjourney
一、什么是GPT4O? GPT-4 是 OpenAI 研发的大型语言模型。它具有强大的语言理解和生成能力,在自然语言处理等诸多领域有着广泛的应用和表现。 二、什么是Midjourney? Midjourney 是一款人工智能图像生成工具。它可以根据用户输入的描述或提…...
python操作OpenAI教程
python操作OpenAI pip install -U openai代码: from openai import OpenAI# 解决请求超时问题 import os os.environ["http_proxy"] "http://localhost:7890" os.environ["https_proxy"] "http://localhost:7890"# 需要…...
如何版本REST API:综合指南
目录 总则什么是REST API版本控制?为什么API版本控制很重要?如何对REST API进行版本控制 理解API契约评估需求选择版本控制策略沟通变化保持向后兼容性详细文档记录REST API版本控制最佳实践REST API版本控制常见问题:REST API版本控制总则 版本化REST API对于确保软件应用…...
Docker 环境下 Nginx 监控实战:使用 Prometheus 实现 Nginx 性能监控的完整部署指南
Docker 环境下 Nginx 监控实战:使用 Prometheus 实现 Nginx 性能监控的完整部署指南 文章目录 Docker 环境下 Nginx 监控实战:使用 Prometheus 实现 Nginx 性能监控的完整部署指南一 查看模块是否安装二 配置 status 访问端点三 Docker 部署 nginx-prome…...

网络安全-IPv4和IPv6的区别
1. 2409:8c20:6:1135:0:ff:b027:210d。 这是一个IPv6地址。IPv6(互联网协议版本6)是用于标识网络中的设备的一种协议,它可以提供比IPv4更大的地址空间。这个地址由八组十六进制数字组成,每组之间用冒号分隔。IPv6地址通常用于替代…...

【移动端】事件基础
一、移动端事件分类 移动端事件主要分为以下几类: 1. 触摸事件(Touch Events) 触摸事件是移动设备特有的事件,用来处理用户通过触摸屏幕进行的操作。主要的触摸事件有: touchstart:手指触摸屏幕时触发。…...

软件测试比赛-学习
一、环境配置 二、浏览器适配 //1.设置浏览器的位置,google浏览器位置是默认且固定在电脑里的//2.设置浏览器驱动的位置,C:\Users\27743\AppData\Local\Google\Chrome\ApplicationSystem.setProperty("webdriver.chrome.driver", "C:\\Users\\27743\\AppData\\…...

力扣LeetCode-链表中的循环与递归使用
标题做题的时候发现循环与递归的使用差别: 看两道题: 两道题都是不知道链表有多长,所以需要用到循环,用到循环就可以把整个过程分成多个循环体,就是每一次循环要执行的内容。 反转链表: 把null–>1…...

AFSim仿真系统 --- 系统简解_08 传感器与特征
传感器与特征 传感器是平台的一部分,为拥有该平台提供了探测其他平台及其组成部分的能力。 特征是平台的一种属性,用于确定特定传感器是否能够探测到特征所拥有的平台。 以下是用于探测平台的一些特征属性列表: 声学红外光学雷达 AFSIM …...

已经安装了qt,想添加mingw组件,包含gcc等
1、已经安装了qt,想添加mingw组件, 步骤1 双击打开MaintenanceTool.exe, 步骤2: 选择清华大学开源软件镜像网站,选择相应QT版本添加网址https://mirrors.tuna.tsinghua.edu.cn/qt/online/qtsdkrepository/windows_x8…...

数据库管理-第250期 深入浅出多主多活数据库技术- Cantian存储引擎(一)(20241009)
数据库管理250期 2024-10-09 数据库管理-第250期 深入浅出多主多活数据库技术- Cantian存储引擎(一)(20241009)1 简介2 引擎构成3 引擎架构4 文件分布5 分布式MVCC6 限制/要求总结 数据库管理-第250期 深入浅出多主多活数据库技术…...

TDengine 快速体验(Docker 镜像方式)
简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能,本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用 安装包的方式快…...

JavaScript 中的 ES|QL:利用 Apache Arrow 工具
作者:来自 Elastic Jeffrey Rengifo 学习如何将 ES|QL 与 JavaScript 的 Apache Arrow 客户端工具一起使用。 想获得 Elastic 认证吗?了解下一期 Elasticsearch Engineer 培训的时间吧! Elasticsearch 拥有众多新功能,助你为自己…...

React19源码系列之 事件插件系统
事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...

UR 协作机器人「三剑客」:精密轻量担当(UR7e)、全能协作主力(UR12e)、重型任务专家(UR15)
UR协作机器人正以其卓越性能在现代制造业自动化中扮演重要角色。UR7e、UR12e和UR15通过创新技术和精准设计满足了不同行业的多样化需求。其中,UR15以其速度、精度及人工智能准备能力成为自动化领域的重要突破。UR7e和UR12e则在负载规格和市场定位上不断优化…...

ArcGIS Pro制作水平横向图例+多级标注
今天介绍下载ArcGIS Pro中如何设置水平横向图例。 之前我们介绍了ArcGIS的横向图例制作:ArcGIS横向、多列图例、顺序重排、符号居中、批量更改图例符号等等(ArcGIS出图图例8大技巧),那这次我们看看ArcGIS Pro如何更加快捷的操作。…...
#Uniapp篇:chrome调试unapp适配
chrome调试设备----使用Android模拟机开发调试移动端页面 Chrome://inspect/#devices MuMu模拟器Edge浏览器:Android原生APP嵌入的H5页面元素定位 chrome://inspect/#devices uniapp单位适配 根路径下 postcss.config.js 需要装这些插件 “postcss”: “^8.5.…...

边缘计算网关提升水产养殖尾水处理的远程运维效率
一、项目背景 随着水产养殖行业的快速发展,养殖尾水的处理成为了一个亟待解决的环保问题。传统的尾水处理方式不仅效率低下,而且难以实现精准监控和管理。为了提升尾水处理的效果和效率,同时降低人力成本,某大型水产养殖企业决定…...

麒麟系统使用-进行.NET开发
文章目录 前言一、搭建dotnet环境1.获取相关资源2.配置dotnet 二、使用dotnet三、其他说明总结 前言 麒麟系统的内核是基于linux的,如果需要进行.NET开发,则需要安装特定的应用。由于NET Framework 是仅适用于 Windows 版本的 .NET,所以要进…...
LangChain【6】之输出解析器:结构化LLM响应的关键工具
文章目录 一 LangChain输出解析器概述1.1 什么是输出解析器?1.2 主要功能与工作原理1.3 常用解析器类型 二 主要输出解析器类型2.1 Pydantic/Json输出解析器2.2 结构化输出解析器2.3 列表解析器2.4 日期解析器2.5 Json输出解析器2.6 xml输出解析器 三 高级使用技巧3…...

break 语句和 continue 语句
break语句和continue语句都具有跳转作用,可以让代码不按既有的顺序执行 break break语句用于跳出代码块或循环 1 2 3 4 5 6 for (var i 0; i < 5; i) { if (i 3){ break; } console.log(i); } continue continue语句用于立即终…...