RabbitMQ之发送者(生产者)可靠性
文章目录
- 前言
- 一、生产者重试机制
- 二、生产者确认机制
- 实现生产者确认
- (1)定义ReturnCallback
- (2)定义ConfirmCallback
- 总结
前言
生产者重试机制、生产者确认机制。
一、生产者重试机制
- 问题:生产者发送消息时,出现了网络故障,导致与MQ的连接中断。
- 解决:SpringAMQP提供的消息发送时的重试机制。即:当RabbitTemplate与MQ连接超时后,多次重试。
实现:
需要配置application.yaml文件
spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数
我们利用命令停掉RabbitMQ服务:
docker stop mq
然后测试发送一条消息,会发现会每隔1秒重试1次,总共重试了3次。消息发送的超时重试机制配置成功了!
注意:
当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式
的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。
如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。
二、生产者确认机制
- 一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。
- 不过,在少数情况下,也会出现消息发送到MQ之后丢失的现象,比如:
- MQ内部处理消息的进程发生了异常
- 生产者发送消息到达MQ后未找到Exchange
- 生产者发送消息到达MQ的Exchange后,未找到合适的Queue,因此无法路由
- 针对上述情况,RabbitMQ提供了生产者消息确认机制,包括Publisher Confirm和Publisher Return两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执。
总结如下:
- 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功
- 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
- 持久消息投递到了MQ,并且
入队完成持久化
,返回ACK ,告知投递成功 - 其它情况都会返回NACK,告知投递失败
- 其中ack和nack属于Publisher Confirm机制,ack是投递成功;nack是投递失败。而return则属于Publisher Return机制。
默认两种机制都是关闭状态,需要通过配置文件来开启。
实现生产者确认
发送者的application.yaml配置:
spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制
这里publisher-confirm-type有三种模式可选:
- none:关闭confirm机制
- simple:同步阻塞等待MQ的回执
- correlated:MQ异步回调返回回执
一般我们推荐使用correlated,回调机制。
(1)定义ReturnCallback
每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在发送者定义一个配置类:
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class MqConfirmConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallback(回调)rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.debug("收到消息的return callback, exchange:{},key:{}, msg:{}, code:{},text:{}", returnedMessage.getExchange(),returnedMessage.getRoutingKey(),returnedMessage.getMessage(),returnedMessage.getReplyCode(),returnedMessage.getReplyText());}});}
}
(2)定义ConfirmCallback
由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:
这里的CorrelationData中包含两个核心的东西:
- id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
- SettableListenableFuture:回执结果的Future对象
将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:
发送消息的测试类(添加ConfirmCallback):
@Test
void testPublisherConfirm() {// 1.创建CorrelationDataCorrelationData cd = new CorrelationData();// 2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future发生异常时的处理逻辑,基本不会触发log.error("send message fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执log.debug("发送消息成功,收到 ack!");}else{ // result.getReason(),String类型,返回nack时的异常描述log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});// 3.发送消息 RoutingKey是错误的rabbitTemplate.convertAndSend("dragon.direct", "q", "hello", cd);
}
由于传递的RoutingKey是错误的,路由失败后,触发了return callback,同时也收到了ack。当我们修改为正确的RoutingKey以后,就不会触发return callback了,只收到ack。而如果连交换机都是错误的,则只会收到nack。
注意:
开启生产者确认比较消耗MQ性能,一般不建议开启。而且大家思考一下触发确认的几种情况:
- 路由失败:一般是因为RoutingKey错误导致,往往是编程导致
- 交换机名称错误:同样是编程错误导致
- MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以了。
总结
以上就是全部讲解。
相关文章:

RabbitMQ之发送者(生产者)可靠性
文章目录 前言一、生产者重试机制二、生产者确认机制实现生产者确认(1)定义ReturnCallback(2)定义ConfirmCallback 总结 前言 生产者重试机制、生产者确认机制。 一、生产者重试机制 问题:生产者发送消息时࿰…...

乐得瑞LDR6020 VR串流线方案:实现同时充电传输视频信号
VR(Virtual Reality),俗称虚拟现实技术,是一项具有巨大潜力的技术创新,正在以惊人的速度改变我们的生活方式和体验,利用专门设计的设备,如头戴式显示器(VR头盔)、手柄、定…...

【libGDX】Mesh纹理贴图
1 前言 纹理贴图的本质是将图片的纹理坐标与模型的顶点坐标建立一一映射关系。纹理坐标的 x、y 轴正方向分别朝右和朝下,如下。 2 纹理贴图 本节将使用 Mesh、ShaderProgram、Shader 实现纹理贴图,OpenGL ES 的实现见博客 → 纹理贴图。 DesktopLauncher…...
基线扫描tomcat安全加固-检查是否支持HTTPS等加密协议
背景:基线扫描时,docker镜像中的tomcat在检查是否支持HTTPS等加密协议这一项上未通过。 思路:先通过JDK自带的keytool工具生成证书,再从tomcat的server.xml配置文件中增加配置。 我不确定不同版本的JDK生成的证书是否可以通用&a…...

基于 STM32F7 和神经网络的实时人脸特征提取与匹配算法实现
本文讨论了如何使用 STM32F7 和神经网络模型来实现实时人脸特征提取与匹配算法。首先介绍了 STM32F7 的硬件和软件特点,然后讨论了人脸特征提取和匹配算法的基本原理。接下来,我们将重点讨论如何在 STM32F7 上实现基于神经网络的人脸特征提取与匹配算法&…...

Android笔记(十四):JetPack Compose中附带效应(一)
在Android应用中可以通过定义可组合函数来搭建应用界面。应用界面的更新往往是与可组合函数内部定义的状态值相关联的。当界面的状态值发生变更,会导致应用界面进行更新。在Android笔记(九):Compose组件的状态,对Compo…...

【web】Fastapi自动生成接口文档(Swagger、ReDoc )
简介 FastAPI是流行的Python web框架,适用于开发高吞吐量API和微服务(直接支持异步编程) FastAPI的优势之一:通过提供高级抽象和自动数据模型转换,简化请求数据的处理(用户不需要手动处理原始请求数据&am…...

竞赛选题 题目:基于FP-Growth的新闻挖掘算法系统的设计与实现
文章目录 0 前言1 项目背景2 算法架构3 FP-Growth算法原理3.1 FP树3.2 算法过程3.3 算法实现3.3.1 构建FP树 3.4 从FP树中挖掘频繁项集 4 系统设计展示5 最后 0 前言 🔥 优质竞赛项目系列,今天要分享的是 基于FP-Growth的新闻挖掘算法系统的设计与实现…...

188. 股票买卖问题(交易次数为任意正整数)
题目 题解 class Solution:def maxProfit(self, k: int, prices: List[int]) -> int:N len(prices)# 定义状态:dp[i][j][k]表示在第i天,有j次交易机会,持有或不持有的最大利润dp [[[0 for i in range(2)] for j in range(k1)] for m in range(N)]f…...
Typescript怎样对URL参数进行编码?
URL中的参数需要进行编码(URL encoding)是为了确保传输的参数不包含特殊字符,同时确保数据的可靠性和安全性。 特殊字符如空格、&、?等在URL中有特殊含义,如果直接包含在参数值中,可能会导致解析错误或者安全问题…...

AndroidStudio2022.3.1 Patch3使用国内下载源加速
记录一下这个版本的as在使用国内下载源加速碰到的诸多问题。 一、gradle-8.0-bin.zip下载慢 编辑项目文件夹/gradle/wrapper/gradle-wrapper.properties,文件内容改为如下: #Fri Nov 24 18:50:06 CST 2023 distributionBaseGRADLE_USER_HOME distribu…...

Go语言的学习笔记2——Go语言源文件的结构布局
用一个只有main函数的go文件来简单说一下Go语言的源文件结构布局,主要分为包名、引入的包和具体函数。下边是main.go示例代码: package mainimport "fmt"func main() { fmt.Println("hello, world") }package main就是表明这个文件…...
python给视频增加字幕
python给视频增加字幕 安装所需库 在开始之前,我们需要安装一些Python库。主要使用到的库如下: moviepy:用于处理视频和音频的库。 pydub:用于处理音频的库。 speech_recognition:用于语音识别的库。 首先࿰…...

相机设置参数:黑电平(Black Level)详解和示例
本文通过原理和示例对相机设置参数“黑电平”进行讲解,以帮助大家理解和使用。 原理 相机中黑电平原理是将电平增大,可以显示更多暗区细节,可能会损失一些亮区,但图像更多的关注暗区,获取完图像信息再减掉。只是为了…...

Mac Ubuntu双系统解决WiFi和WiFi 5G网络不可用问题
文章目录 设备信息1. Ubuntu WiFi不可用解决方式查看Mac的网卡型号根据网卡型号搜索获取到的解决方法查看WiFi名字问题参考链接 2. 解决WiFi重启后失效问题打开终端创建.sh脚本文件编辑脚本文件复制粘贴脚本修改脚本权限创建并编辑systemd service文件复制粘贴下文到systemd se…...

数据分析基础之《matplotlib(2)—折线图》
一、折线图绘制与保存图片 1、matplotlib.pyplot模块 matplotlib.pyplot包含了一系列类似于matlab的画图函数。它的函数作用于当前图形(figure)的当前坐标系(axes) import matplotlib.pyplot as plt 2、折线图绘制与显示 展示城…...
Rust语言入门教程(三) - 函数与模块系统
函数 函数的定义 根据Rust的格式规范,函数名的格式应遵从蛇形命名法,即是用小写字母以及下划线组成,如: fn do_stuff(){ }Rust并不要求函数定义的位置必须在调用它之前,所以如果你习惯于把main函数放在最前面的话&a…...

ubuntu22.04 arrch64版在线安装java环境
脚本 #安装java#!/bin/bashif type -p java; thenecho "Java has been installed."else#2.Installed Java , must install wgetwget -c https://repo.huaweicloud.com/java/jdk/8u151-b12/jdk-8u151-linux-arm64-vfp-hflt.tar.gz;tar -zxvf ./jdk-8u151-linux-arm6…...

概率论与数理统计中常见的随机变量分布律、数学期望、方差及其介绍
1 离散型随机变量 1.1 0-1分布 设随机变量X的所有可能取值为0与1两个值,其分布律为 若分布律如上所示,则称X服从以P为参数的(0-1)分布或两点分布。记作X~ B(1,p) 0-1分布的分布律利用表格法表示为: X01P1-PP 0-1分布的数学期望E(X) 0 *…...

骨传导耳机的优缺点都有哪些?骨传导耳机值得入手吗?
骨传导耳机的优点还是很多的,相比于传统耳机,骨传导耳机要更值得入手! 下面让我们了解下骨传导耳机的优缺点都有哪些: 一、优点 1、使用更安全 传统的耳机,在使用时会听不到外界的声音,而骨传导耳机通过…...

MongoDB学习和应用(高效的非关系型数据库)
一丶 MongoDB简介 对于社交类软件的功能,我们需要对它的功能特点进行分析: 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具: mysql:关系型数据库&am…...

DAY 47
三、通道注意力 3.1 通道注意力的定义 # 新增:通道注意力模块(SE模块) class ChannelAttention(nn.Module):"""通道注意力模块(Squeeze-and-Excitation)"""def __init__(self, in_channels, reduction_rat…...

听写流程自动化实践,轻量级教育辅助
随着智能教育工具的发展,越来越多的传统学习方式正在被数字化、自动化所优化。听写作为语文、英语等学科中重要的基础训练形式,也迎来了更高效的解决方案。 这是一款轻量但功能强大的听写辅助工具。它是基于本地词库与可选在线语音引擎构建,…...
jmeter聚合报告中参数详解
sample、average、min、max、90%line、95%line,99%line、Error错误率、吞吐量Thoughput、KB/sec每秒传输的数据量 sample(样本数) 表示测试中发送的请求数量,即测试执行了多少次请求。 单位,以个或者次数表示。 示例:…...
WebRTC从入门到实践 - 零基础教程
WebRTC从入门到实践 - 零基础教程 目录 WebRTC简介 基础概念 工作原理 开发环境搭建 基础实践 三个实战案例 常见问题解答 1. WebRTC简介 1.1 什么是WebRTC? WebRTC(Web Real-Time Communication)是一个支持网页浏览器进行实时语音…...

HubSpot推出与ChatGPT的深度集成引发兴奋与担忧
上周三,HubSpot宣布已构建与ChatGPT的深度集成,这一消息在HubSpot用户和营销技术观察者中引发了极大的兴奋,但同时也存在一些关于数据安全的担忧。 许多网络声音声称,这对SaaS应用程序和人工智能而言是一场范式转变。 但向任何技…...

永磁同步电机无速度算法--基于卡尔曼滤波器的滑模观测器
一、原理介绍 传统滑模观测器采用如下结构: 传统SMO中LPF会带来相位延迟和幅值衰减,并且需要额外的相位补偿。 采用扩展卡尔曼滤波器代替常用低通滤波器(LPF),可以去除高次谐波,并且不用相位补偿就可以获得一个误差较小的转子位…...

pgsql:还原数据库后出现重复序列导致“more than one owned sequence found“报错问题的解决
问题: pgsql数据库通过备份数据库文件进行还原时,如果表中有自增序列,还原后可能会出现重复的序列,此时若向表中插入新行时会出现“more than one owned sequence found”的报错提示。 点击菜单“其它”-》“序列”,…...

李沐--动手学深度学习--GRU
1.GRU从零开始实现 #9.1.2GRU从零开始实现 import torch from torch import nn from d2l import torch as d2l#首先读取 8.5节中使用的时间机器数据集 batch_size,num_steps 32,35 train_iter,vocab d2l.load_data_time_machine(batch_size,num_steps) #初始化模型参数 def …...
统计学(第8版)——统计抽样学习笔记(考试用)
一、统计抽样的核心内容与问题 研究内容 从总体中科学抽取样本的方法利用样本数据推断总体特征(均值、比率、总量)控制抽样误差与非抽样误差 解决的核心问题 在成本约束下,用少量样本准确推断总体特征量化估计结果的可靠性(置…...