当前位置: 首页 > news >正文

RabbitMq 消息可靠性问题(一) --- publisher发送时丢失

前言

消息从生产者发送到exchange, 再到 queue, 再到消费者。这个过程中有哪些有消息丢失的可能性呢?

  • 发送时丢失:
    • 生产者发送的消息未送达 exchange
    • 消息到达 exchange 后未到达 queue
  • MQ 宕机,queue将消息丢失
  • consumer 接收到消息后未消费就宕机
    在这里插入图片描述
    消息可靠性问题及其对应的解决方案:
场景publisher发送时丢失MQ消息丢失consumer消费问题
解决方案生产者确认机制消息持久化消费者消息确认&&失败重试机制

下面我们先说一下publisher 发送时丢失的问题应该如何处理

生产者确认机制的理论说明

RabbitMQ 提供了 publisher confirm 机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后, 会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:

  • publish-confirm, 发送者确认
    • 消息成功投递到交换机,返回ack
    • 消息未投递到交换机,返回nack
  • publish-return, 发送回执
    • 消息投递到交换机,但是没有路由到队列,返回ACK, 及路由失败原因

注意: 确认机制发送消息时, 需要给每个消息设置一个全局唯一 id, 以区分不同消息,避免ack 冲突

在这里插入图片描述

代码实现

下面基于SpringAMQP 实现的生产者确认机制

  1. 在 publisher 服务的 application,yml 中添加以下配置:
spring:rabbitmq:publisher-confirm-type: correlated # 开启异步回调publisher-returns: truetemplate:mandatory: true

配置说明:

  • publish-confirm-type: 开启 publisher-confirm, 这里支持两种类型:
    • simple: 同步等待 confirm 结果, 直到超时
    • correlated: 异步回调, 定义ConfirmCallback, MQ 返回结果时会回调这个ConfirmCallback
  • publish-returns: 开启 publish-return 功能,同样是基于 callback 机制,不过是定义 ReturnCallbcak
  • template.mandatory: 定义消息路由失败时的策略。true, 则调用ReturnCallback, false: 则直接丢弃消息

ConfirmCallBack是基于每条消息设置的,所以需要一个全局唯一id 进行区分。
ReturenCallbcak 则是基于每个RabbitTemplate操作实例,是一种全局性的回调。

  1. 由于每个 RabbitTemplate 只能配置一个 ReturnCallback, 因此需要在项目启动过程中配置:
    (这里可以实现ApplicationContextAware,它可以在SpringIOC 容器初始化的时候,进行一些全局性回调的操作)
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取 RabbitTemplate对象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 配置 ReturnCallbackrabbitTemplate.setReturnCallback((message, replayCode, replayText,exchange, routingKey) -> {// 记录日志log.error("消息发送到队列失败, 响应码:{}, 失败原因:{},交换机:{}, 路由key:{},消息:{},",replayCode, replayText, exchange, routingKey, message.toString());// 如果有需要的话,重发消息});}}
}	
  1. 为每条发送的消息,指定消息 ID, 并编写对应的 ConfirmCallback
public void testSendMessage2SimpleQueue() throws InterruptedException {// 1. 准备消息String message = "hello, spring amqp!";// 2. 准备CorrelationData// 2.1 消息idCorrelationData correlationData = newCorrelationData(UUID.randomUUID().toString());// 2.2 准备 ConfirmCallbackcorrelationData.getFuture().addCallback(confirm -> {// 判断结果if(confirm.isAck()){// ACKlog.debug("消息成功投递到交换机!消息ID:{}",correlationData.getId());}else {// NACKlog.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());// 重发消息}}, throwable -> {// 记录日志log.error("消息发送失败", throwable);// 重发消息});// 3.发送消息rabbitTemplate.convertAndSend("amq.topic", "asimple.test", message, correlationData);
}

总结

SpringAMQP 中处理消息确认的几种情况:

  • publisher-confirm:
    • 消息发送到 exchange, 返回 ack
    • 消息发送失败,没有到达交换机,返回 nack
    • 消息发送过程中出现异常,没有收到回执
  • 消息成功发送到 exchange, 但没有路由到 queue, 调用 ReturnCallback

相关文章:

RabbitMq 消息可靠性问题(一) --- publisher发送时丢失

前言 消息从生产者发送到exchange, 再到 queue, 再到消费者。这个过程中有哪些有消息丢失的可能性呢? 发送时丢失: 生产者发送的消息未送达 exchange消息到达 exchange 后未到达 queue MQ 宕机,queue将消息丢失consumer 接收到消息后未消费…...

Java初识泛型

目录 一、包装类 1、基本数据类型和对应的包装类 2、装箱和拆箱 3、自动装箱和自动拆箱 二、什么是泛型 三、引出泛型 1、泛型的语法 四、泛型类的使用 1、语法 2、示例 3、类型推导(Type Inference) 六、泛型如何编译的 1、擦除机制 2、为什么不能实例化泛型类…...

寸照换底色技巧大全,超详细图文教程

在日常的设计工作中,我们常常需要将图片的背景色进行修改,以适应不同的场景和需求。其中最常用的方法就是寸照换底色技巧。本文将为大家介绍一些常见的寸照换底色技巧,并提供超详细的图文教程,帮助大家轻松完成这项任务。 一、使…...

这篇文章价值很大:股票历史分时成交数据怎么简单获取?【干货】

文章目录前言一、准备二、使用步骤1.引入库2,使用这个API查询历史分时数据:3.查询完整历史分时数据4.其他查询方法参数格式:[(市场代码, 股票代码), ...]参数:市场代码, 股票代码, 文件名, 起始位置, 数量参数:市场代码…...

muduo源码剖析--Buffer

Buffer类 Buffer类是自定义处理数据输入缓冲的类&#xff0c;底层是vector< char >&#xff0c;通过readIdx和writeIdx将缓冲区分为3个部分&#xff0c;第一部分是预留的8字节已经读出的缓冲区字节数、第二部分是还未读出的部分、第三部分是可写的部分。 Buffer类的设计…...

AI人工智能简介和其定义

全称&#xff1a;人工智能&#xff08;Artificial Intelligence&#xff09; 缩写&#xff1a;AI / ai 人工智能研究 亦称智械、机器智能&#xff0c;指由人制造出来的可以表现出智能的机器。通常人工智能是指通过普通计算机程序来呈现人类智能的技术。该词也指出研究这样的智…...

python数据清洗

数据清洗包括&#xff1a;空值&#xff0c;异常值&#xff0c;重复值&#xff0c;类型转换和数据整合这里数据清洗需要用到的库是pandas库&#xff0c;下载方式还是在终端运行 &#xff1a; pip install pandas.首先我们需要对数据进行读取import pandas as pddata pd.read_cs…...

Python3 os.makedirs() 方法、Python3 os.read() 方法

Python3 os.makedirs() 方法 概述 os.makedirs() 方法用于递归创建目录。像 mkdir(), 但创建的所有intermediate-level文件夹需要包含子目录。 语法 makedirs()方法语法格式如下&#xff1a; os.makedirs(path, mode0o777)参数 path -- 需要递归创建的目录。 mode -- 权限…...

【Linux安装数据库】Ubuntu安装mysql并连接navicat

Linux系统部署Django项目 文章目录Linux系统部署Django项目一、mysql安装二、mysql配置文件三、新建数据库和用户四、nivacat链接mysql一、mysql安装 linux安装mysql数据库有很多教程&#xff0c;根据安装方式不同&#xff0c;相关的步骤也不同。可以参考&#xff1a;【Linux安…...

GaussDB工作级开发者认证—第一章GaussDB数据库介绍

一. GaussDB概述 GaussDB是华为基于openGauss自研生态推出的企业级分布式关系型数据库。具备企业级复杂事物混合负载能力&#xff0c;同时支持分布式事务强一致性&#xff0c;同城跨AZ部署&#xff0c;数据0丢失&#xff0c;支持1000的计算节点扩展能力&#xff0c;4PB海量存储…...

阿里张勇:所有行业都值得用大模型重新做一遍!

‍数据智能产业创新服务媒体——聚焦数智 改变商业“2023阿里云峰会”于4月11日在北京国际会议中心隆重召开&#xff0c;本次峰会以" 与实俱进 为创新提速&#xff01;"为主题&#xff0c;阿里巴巴集团董事会主席兼首席执行官张勇、阿里云智能集团首席技术官周靖人、…...

ES6(字符串的扩展与新增方法)

字符串的扩展与新增方法 1. 模板字符串 模板字符串解决了之前的字符串拼接 ESC下那个键&#xff1a;反引号&#xff08;&#xff09;包裹>替换引号 ${变量名/表达式/函数}>替换引引加加导致的代码冗余 //ES5(引引加加) $(#result).append(There are <b> basket.c…...

rk3568点亮LCD(lvds)

rk3568 Android11/12 适配 lvds 屏 LVDS&#xff08;Low Voltage Differential Signal&#xff09;即低电压差分信号。1994年由美国国家半导体&#xff08;NS&#xff09;公司为克服以TTL电平方式传输宽带高码率数据时功耗大、电磁干扰大等缺点而研制的一种数字视频信号传输方…...

全终端办公电子邮件集成方案

面临挑战 应用场景复杂&#xff0c;经常需要在不同终端进行切换&#xff0c;多屏、跨屏及移动办公要求高&#xff1b; 业务系统较多&#xff0c;需要同时支持多种业务的开展&#xff0c;对第三方应用集成及协同办公要求高&#xff1b; 对邮件系统的稳定及高效性要求高&#x…...

再不转型为ChatGPT程序员,有遭受降维打击的危险

Open AI在演示GPT-4的时候&#xff0c;有这么一个场景&#xff1a;给一个界面草图&#xff0c;就可以生成网页代码。这个演示非常简单&#xff0c;如果界面原型比较复杂呢&#xff1f;像这样&#xff1a;ChatGPT能不能直接生成HTML, CSS,JavaScript代码&#xff0c;把这个网页给…...

maven使用教程

文章目录IDEA创建maven项目maven项目必有得目录结构项目构建关键字cleanvalidatecompiletestpackageverifyinstallsitedeploy命令使用方法方法一 在terminal终端执行方法二 在右侧得maven中双击依赖管理在pom.xml下 导包、scope的传递范围、打包方式依赖冲突声明优先原则就近原…...

Emlog底部显示当前在线人数

第一步&#xff1a;在模板文件里面创建“visitor.php”的文件吧下面代码入进去 code <?php//首先你要有读写文件的权限&#xff0c;首次访问肯不显示&#xff0c;正常情况刷新即可$online_log "slzxrs.dat"; //保存人数的文件到根目录,$timeout 30;//30秒内没…...

【java踩坑搞起】MybatisPlus封装的mapper不支持 join,那咋办

众所周知&#xff0c;Mybatis Plus 封装的 mapper 不支持 join&#xff0c;如果需要支持就必须自己去实现。但是对于大部分的业务场景来说&#xff0c;都需要多表 join&#xff0c;要不然就没必要采用关系型数据库了。 直到前几天&#xff0c;偶然碰到了这么一款叫做mybatis-p…...

【创造者】——什么是数学

吉姆罗恩在不经意间这样说过&#xff0c;要么你主宰生活&#xff0c;要么你被生活主宰。这不禁令我深思. 既然如此, 康德说过一句著名的话&#xff0c;既然我已经踏上这条道路&#xff0c;那么&#xff0c;任何东西都不应妨碍我沿着这条路走下去。带着这句话, 我们还要更加慎重…...

ROS系列——错误syntax error near unexpected token `$‘do\r‘‘

ROS系列——错误syntax error near unexpected token $do\r说明解决方法问题原因解决1.终端运行2.本文使用的方法&#xff0c;适用于代码行数较少其他方法&#xff0c;本质就是替换3.重新运行脚本说明 在运行.sh脚本时&#xff0c;报错&#xff1a; syntax error near unexpec…...

【网络】每天掌握一个Linux命令 - iftop

在Linux系统中&#xff0c;iftop是网络管理的得力助手&#xff0c;能实时监控网络流量、连接情况等&#xff0c;帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...

ServerTrust 并非唯一

NSURLAuthenticationMethodServerTrust 只是 authenticationMethod 的冰山一角 要理解 NSURLAuthenticationMethodServerTrust, 首先要明白它只是 authenticationMethod 的选项之一, 并非唯一 1 先厘清概念 点说明authenticationMethodURLAuthenticationChallenge.protectionS…...

Axios请求超时重发机制

Axios 超时重新请求实现方案 在 Axios 中实现超时重新请求可以通过以下几种方式&#xff1a; 1. 使用拦截器实现自动重试 import axios from axios;// 创建axios实例 const instance axios.create();// 设置超时时间 instance.defaults.timeout 5000;// 最大重试次数 cons…...

CRMEB 框架中 PHP 上传扩展开发:涵盖本地上传及阿里云 OSS、腾讯云 COS、七牛云

目前已有本地上传、阿里云OSS上传、腾讯云COS上传、七牛云上传扩展 扩展入口文件 文件目录 crmeb\services\upload\Upload.php namespace crmeb\services\upload;use crmeb\basic\BaseManager; use think\facade\Config;/*** Class Upload* package crmeb\services\upload* …...

多模态大语言模型arxiv论文略读(108)

CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文标题&#xff1a;CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文作者&#xff1a;Sayna Ebrahimi, Sercan O. Arik, Tejas Nama, Tomas Pfister ➡️ 研究机构: Google Cloud AI Re…...

鸿蒙DevEco Studio HarmonyOS 5跑酷小游戏实现指南

1. 项目概述 本跑酷小游戏基于鸿蒙HarmonyOS 5开发&#xff0c;使用DevEco Studio作为开发工具&#xff0c;采用Java语言实现&#xff0c;包含角色控制、障碍物生成和分数计算系统。 2. 项目结构 /src/main/java/com/example/runner/├── MainAbilitySlice.java // 主界…...

Python基于历史模拟方法实现投资组合风险管理的VaR与ES模型项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档&#xff09;&#xff0c;如需数据代码文档可以直接到文章最后关注获取。 1.项目背景 在金融市场日益复杂和波动加剧的背景下&#xff0c;风险管理成为金融机构和个人投资者关注的核心议题之一。VaR&…...

音视频——I2S 协议详解

I2S 协议详解 I2S (Inter-IC Sound) 协议是一种串行总线协议&#xff0c;专门用于在数字音频设备之间传输数字音频数据。它由飞利浦&#xff08;Philips&#xff09;公司开发&#xff0c;以其简单、高效和广泛的兼容性而闻名。 1. 信号线 I2S 协议通常使用三根或四根信号线&a…...

【Nginx】使用 Nginx+Lua 实现基于 IP 的访问频率限制

使用 NginxLua 实现基于 IP 的访问频率限制 在高并发场景下&#xff0c;限制某个 IP 的访问频率是非常重要的&#xff0c;可以有效防止恶意攻击或错误配置导致的服务宕机。以下是一个详细的实现方案&#xff0c;使用 Nginx 和 Lua 脚本结合 Redis 来实现基于 IP 的访问频率限制…...

Unity UGUI Button事件流程

场景结构 测试代码 public class TestBtn : MonoBehaviour {void Start(){var btn GetComponent<Button>();btn.onClick.AddListener(OnClick);}private void OnClick(){Debug.Log("666");}}当添加事件时 // 实例化一个ButtonClickedEvent的事件 [Formerl…...