RabbitMQ之发送者(生产者)可靠性
文章目录
- 前言
- 一、生产者重试机制
- 二、生产者确认机制
- 实现生产者确认
- (1)定义ReturnCallback
- (2)定义ConfirmCallback
- 总结
前言
生产者重试机制、生产者确认机制。
一、生产者重试机制
- 问题:生产者发送消息时,出现了网络故障,导致与MQ的连接中断。
- 解决:SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ连接超时后,多次重试。
实现:
需要配置application.yaml文件
spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数
我们利用命令停掉RabbitMQ服务:
docker stop mq
然后测试发送一条消息,会发现会每隔1秒重试1次,总共重试了3次。消息发送的超时重试机制配置成功了!
注意:
当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
二、生产者确认机制
- 一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。
- 不过,在少数情况下,也会出现消息发送到MQ之后丢失的现象,比如:
- MQ内部处理消息的进程发生了异常
- 生产者发送消息到达MQ后未找到Exchange
- 生产者发送消息到达MQ的Exchange后,未找到合适的Queue,因此无法路由
- 针对上述情况,RabbitMQ提供了生产者消息确认机制,包括Publisher Confirm和Publisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执。
总结如下:
- 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功
- 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
- 持久消息投递到了MQ,并且
入队完成持久化,返回ACK ,告知投递成功 - 其它情况都会返回NACK,告知投递失败
- 其中ack和nack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。
默认两种机制都是关闭状态,需要通过配置文件来开启。
实现生产者确认
发送者的application.yaml配置:
spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制
这里publisher-confirm-type有三种模式可选:
- none:关闭confirm机制
- simple:同步阻塞等待MQ的回执
- correlated:MQ异步回调返回回执
一般我们推荐使用correlated,回调机制。
(1)定义ReturnCallback
每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在发送者定义一个配置类:
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class MqConfirmConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallback(回调)rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.debug("收到消息的return callback, exchange:{},key:{}, msg:{}, code:{},text:{}", returnedMessage.getExchange(),returnedMessage.getRoutingKey(),returnedMessage.getMessage(),returnedMessage.getReplyCode(),returnedMessage.getReplyText());}});}
}
(2)定义ConfirmCallback
由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:

这里的CorrelationData中包含两个核心的东西:
- id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
- SettableListenableFuture:回执结果的Future对象
将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:

发送消息的测试类(添加ConfirmCallback):
@Test
void testPublisherConfirm() {// 1.创建CorrelationDataCorrelationData cd = new CorrelationData();// 2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future发生异常时的处理逻辑,基本不会触发log.error("send message fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执log.debug("发送消息成功,收到 ack!");}else{ // result.getReason(),String类型,返回nack时的异常描述log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});// 3.发送消息 RoutingKey是错误的rabbitTemplate.convertAndSend("dragon.direct", "q", "hello", cd);
}
由于传递的RoutingKey是错误的,路由失败后,触发了return callback,同时也收到了ack。当我们修改为正确的RoutingKey以后,就不会触发return callback了,只收到ack。而如果连交换机都是错误的,则只会收到nack。
注意:
开启生产者确认比较消耗MQ性能,一般不建议开启。而且大家思考一下触发确认的几种情况:
- 路由失败:一般是因为RoutingKey错误导致,往往是编程导致
- 交换机名称错误:同样是编程错误导致
- MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了。
总结
以上就是全部讲解。
相关文章:
RabbitMQ之发送者(生产者)可靠性
文章目录 前言一、生产者重试机制二、生产者确认机制实现生产者确认(1)定义ReturnCallback(2)定义ConfirmCallback 总结 前言 生产者重试机制、生产者确认机制。 一、生产者重试机制 问题:生产者发送消息时࿰…...
乐得瑞LDR6020 VR串流线方案:实现同时充电传输视频信号
VR(Virtual Reality),俗称虚拟现实技术,是一项具有巨大潜力的技术创新,正在以惊人的速度改变我们的生活方式和体验,利用专门设计的设备,如头戴式显示器(VR头盔)、手柄、定…...
【libGDX】Mesh纹理贴图
1 前言 纹理贴图的本质是将图片的纹理坐标与模型的顶点坐标建立一一映射关系。纹理坐标的 x、y 轴正方向分别朝右和朝下,如下。 2 纹理贴图 本节将使用 Mesh、ShaderProgram、Shader 实现纹理贴图,OpenGL ES 的实现见博客 → 纹理贴图。 DesktopLauncher…...
基线扫描tomcat安全加固-检查是否支持HTTPS等加密协议
背景:基线扫描时,docker镜像中的tomcat在检查是否支持HTTPS等加密协议这一项上未通过。 思路:先通过JDK自带的keytool工具生成证书,再从tomcat的server.xml配置文件中增加配置。 我不确定不同版本的JDK生成的证书是否可以通用&a…...
基于 STM32F7 和神经网络的实时人脸特征提取与匹配算法实现
本文讨论了如何使用 STM32F7 和神经网络模型来实现实时人脸特征提取与匹配算法。首先介绍了 STM32F7 的硬件和软件特点,然后讨论了人脸特征提取和匹配算法的基本原理。接下来,我们将重点讨论如何在 STM32F7 上实现基于神经网络的人脸特征提取与匹配算法&…...
Android笔记(十四):JetPack Compose中附带效应(一)
在Android应用中可以通过定义可组合函数来搭建应用界面。应用界面的更新往往是与可组合函数内部定义的状态值相关联的。当界面的状态值发生变更,会导致应用界面进行更新。在Android笔记(九):Compose组件的状态,对Compo…...
【web】Fastapi自动生成接口文档(Swagger、ReDoc )
简介 FastAPI是流行的Python web框架,适用于开发高吞吐量API和微服务(直接支持异步编程) FastAPI的优势之一:通过提供高级抽象和自动数据模型转换,简化请求数据的处理(用户不需要手动处理原始请求数据&am…...
竞赛选题 题目:基于FP-Growth的新闻挖掘算法系统的设计与实现
文章目录 0 前言1 项目背景2 算法架构3 FP-Growth算法原理3.1 FP树3.2 算法过程3.3 算法实现3.3.1 构建FP树 3.4 从FP树中挖掘频繁项集 4 系统设计展示5 最后 0 前言 🔥 优质竞赛项目系列,今天要分享的是 基于FP-Growth的新闻挖掘算法系统的设计与实现…...
188. 股票买卖问题(交易次数为任意正整数)
题目 题解 class Solution:def maxProfit(self, k: int, prices: List[int]) -> int:N len(prices)# 定义状态:dp[i][j][k]表示在第i天,有j次交易机会,持有或不持有的最大利润dp [[[0 for i in range(2)] for j in range(k1)] for m in range(N)]f…...
Typescript怎样对URL参数进行编码?
URL中的参数需要进行编码(URL encoding)是为了确保传输的参数不包含特殊字符,同时确保数据的可靠性和安全性。 特殊字符如空格、&、?等在URL中有特殊含义,如果直接包含在参数值中,可能会导致解析错误或者安全问题…...
AndroidStudio2022.3.1 Patch3使用国内下载源加速
记录一下这个版本的as在使用国内下载源加速碰到的诸多问题。 一、gradle-8.0-bin.zip下载慢 编辑项目文件夹/gradle/wrapper/gradle-wrapper.properties,文件内容改为如下: #Fri Nov 24 18:50:06 CST 2023 distributionBaseGRADLE_USER_HOME distribu…...
Go语言的学习笔记2——Go语言源文件的结构布局
用一个只有main函数的go文件来简单说一下Go语言的源文件结构布局,主要分为包名、引入的包和具体函数。下边是main.go示例代码: package mainimport "fmt"func main() { fmt.Println("hello, world") }package main就是表明这个文件…...
python给视频增加字幕
python给视频增加字幕 安装所需库 在开始之前,我们需要安装一些Python库。主要使用到的库如下: moviepy:用于处理视频和音频的库。 pydub:用于处理音频的库。 speech_recognition:用于语音识别的库。 首先࿰…...
相机设置参数:黑电平(Black Level)详解和示例
本文通过原理和示例对相机设置参数“黑电平”进行讲解,以帮助大家理解和使用。 原理 相机中黑电平原理是将电平增大,可以显示更多暗区细节,可能会损失一些亮区,但图像更多的关注暗区,获取完图像信息再减掉。只是为了…...
Mac Ubuntu双系统解决WiFi和WiFi 5G网络不可用问题
文章目录 设备信息1. Ubuntu WiFi不可用解决方式查看Mac的网卡型号根据网卡型号搜索获取到的解决方法查看WiFi名字问题参考链接 2. 解决WiFi重启后失效问题打开终端创建.sh脚本文件编辑脚本文件复制粘贴脚本修改脚本权限创建并编辑systemd service文件复制粘贴下文到systemd se…...
数据分析基础之《matplotlib(2)—折线图》
一、折线图绘制与保存图片 1、matplotlib.pyplot模块 matplotlib.pyplot包含了一系列类似于matlab的画图函数。它的函数作用于当前图形(figure)的当前坐标系(axes) import matplotlib.pyplot as plt 2、折线图绘制与显示 展示城…...
Rust语言入门教程(三) - 函数与模块系统
函数 函数的定义 根据Rust的格式规范,函数名的格式应遵从蛇形命名法,即是用小写字母以及下划线组成,如: fn do_stuff(){ }Rust并不要求函数定义的位置必须在调用它之前,所以如果你习惯于把main函数放在最前面的话&a…...
ubuntu22.04 arrch64版在线安装java环境
脚本 #安装java#!/bin/bashif type -p java; thenecho "Java has been installed."else#2.Installed Java , must install wgetwget -c https://repo.huaweicloud.com/java/jdk/8u151-b12/jdk-8u151-linux-arm64-vfp-hflt.tar.gz;tar -zxvf ./jdk-8u151-linux-arm6…...
概率论与数理统计中常见的随机变量分布律、数学期望、方差及其介绍
1 离散型随机变量 1.1 0-1分布 设随机变量X的所有可能取值为0与1两个值,其分布律为 若分布律如上所示,则称X服从以P为参数的(0-1)分布或两点分布。记作X~ B(1,p) 0-1分布的分布律利用表格法表示为: X01P1-PP 0-1分布的数学期望E(X) 0 *…...
骨传导耳机的优缺点都有哪些?骨传导耳机值得入手吗?
骨传导耳机的优点还是很多的,相比于传统耳机,骨传导耳机要更值得入手! 下面让我们了解下骨传导耳机的优缺点都有哪些: 一、优点 1、使用更安全 传统的耳机,在使用时会听不到外界的声音,而骨传导耳机通过…...
具微科技完成A+++轮融资,聚焦特种场景,欲打造具身智能发展新范式
36氪获悉,全域移动智能机器人公司具微科技近期完成A轮融资,总融资金额达数亿元。资金将用于技术研发与场景落地,其产品聚焦特种场景,优势显著。融资情况与团队实力具微科技此次A轮融资由滨州国投等联合领投,和达控股等…...
线性代数在数据挖掘中的核心应用,机器学习必须了解
线性代数在数据挖掘中扮演着核心数学工具的角色,其应用贯穿于数据预处理、特征工程、模型构建与优化的全过程 。 以下将从核心知识点、具体用途及实践教程三个层面进行详细阐述。 一、核心知识点及其在数据挖掘中的用途 线性代数在数据挖掘中的应用主要围绕以下几…...
计算机毕业设计:Python股票市场数据可视化与深度学习预测系统 Flask框架 LSTM Keras 数据分析 可视化 深度学习 大数据 爬虫(建议收藏)✅
博主介绍:✌全网粉丝10W,前互联网大厂软件研发、集结硕博英豪成立工作室。专注于计算机相关专业项目实战6年之久,选择我们就是选择放心、选择安心毕业✌ > 🍅想要获取完整文章或者源码,或者代做,拉到文章底部即可与…...
【限时开源】车规级Docker守护进程加固包(已通过ASPICE L2认证):含17项车载专属健康检查、断电保护快照及CAN FD透传模块
第一章:车规级Docker守护进程加固包概述车规级Docker守护进程加固包(Automotive-Grade Docker Daemon Hardening Package,简称AG-DDHP)是一套面向ISO 21434与UNECE R156合规要求设计的轻量级安全增强组件,专为车载信息…...
金融敏感数据零泄漏配置指南,深度解析Docker Secrets+Vault+TLS双向认证的闭环实践
第一章:金融敏感数据零泄漏配置指南总览金融行业对数据安全的合规性要求极为严苛,GDPR、PCI DSS、《金融数据安全分级指南》及《个人信息保护法》均明确要求对客户身份信息、账户凭证、交易流水等敏感数据实施端到端防护。零泄漏并非追求理论上的绝对安全…...
NVIDIA与Snowflake合作:GPU加速与数据云的AI开发革命
1. 当GPU加速遇上数据云:NVIDIA与Snowflake如何重塑AI开发流程上周在旧金山参加数据科学峰会时,听到同行们讨论最多的就是NVIDIA和Snowflake的这次合作。作为在数据工程领域摸爬滚打多年的从业者,我立刻意识到这不仅仅是又一场科技巨头的公关…...
STM32+ST7735S屏幕,手把手教你移植LVGL v8显示驱动(附完整代码)
STM32ST7735S屏幕移植LVGL v8显示驱动的实战指南 1. 硬件选型与基础环境搭建 在嵌入式GUI开发中,选择合适的硬件平台是项目成功的第一步。STM32系列微控制器因其丰富的外设资源和稳定的性能,成为众多开发者的首选。本次项目采用STM32F103C8T6作为主控芯片…...
CustomTkinter:解决Python GUI现代化渲染与跨平台适配的技术架构
CustomTkinter:解决Python GUI现代化渲染与跨平台适配的技术架构 【免费下载链接】CustomTkinter A modern and customizable python UI-library based on Tkinter 项目地址: https://gitcode.com/gh_mirrors/cu/CustomTkinter Python的Tkinter框架在桌面GUI…...
微信聊天记录永久保存:3步打造你的个人数字档案馆
微信聊天记录永久保存:3步打造你的个人数字档案馆 【免费下载链接】WeChatMsg 提取微信聊天记录,将其导出成HTML、Word、CSV文档永久保存,对聊天记录进行分析生成年度聊天报告 项目地址: https://gitcode.com/GitHub_Trending/we/WeChatMsg…...
告别数据丢失!深入解析M24C08 EEPROM的页写缓冲与自定时写入周期
告别数据丢失!深入解析M24C08 EEPROM的页写缓冲与自定时写入周期 在嵌入式系统开发中,数据可靠性往往决定着产品的成败。想象这样一个场景:你的设备刚刚完成了一次关键数据写入,系统立即读取验证却发现数据异常——这不是代码逻辑…...
