当前位置: 首页 > news >正文

RabbitMq 消息可靠性问题(一) --- publisher发送时丢失

前言

消息从生产者发送到exchange, 再到 queue, 再到消费者。这个过程中有哪些有消息丢失的可能性呢?

  • 发送时丢失:
    • 生产者发送的消息未送达 exchange
    • 消息到达 exchange 后未到达 queue
  • MQ 宕机,queue将消息丢失
  • consumer 接收到消息后未消费就宕机
    在这里插入图片描述
    消息可靠性问题及其对应的解决方案:
场景publisher发送时丢失MQ消息丢失consumer消费问题
解决方案生产者确认机制消息持久化消费者消息确认&&失败重试机制

下面我们先说一下publisher 发送时丢失的问题应该如何处理

生产者确认机制的理论说明

RabbitMQ 提供了 publisher confirm 机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后, 会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:

  • publish-confirm, 发送者确认
    • 消息成功投递到交换机,返回ack
    • 消息未投递到交换机,返回nack
  • publish-return, 发送回执
    • 消息投递到交换机,但是没有路由到队列,返回ACK, 及路由失败原因

注意: 确认机制发送消息时, 需要给每个消息设置一个全局唯一 id, 以区分不同消息,避免ack 冲突

在这里插入图片描述

代码实现

下面基于SpringAMQP 实现的生产者确认机制

  1. 在 publisher 服务的 application,yml 中添加以下配置:
spring:rabbitmq:publisher-confirm-type: correlated # 开启异步回调publisher-returns: truetemplate:mandatory: true

配置说明:

  • publish-confirm-type: 开启 publisher-confirm, 这里支持两种类型:
    • simple: 同步等待 confirm 结果, 直到超时
    • correlated: 异步回调, 定义ConfirmCallback, MQ 返回结果时会回调这个ConfirmCallback
  • publish-returns: 开启 publish-return 功能,同样是基于 callback 机制,不过是定义 ReturnCallbcak
  • template.mandatory: 定义消息路由失败时的策略。true, 则调用ReturnCallback, false: 则直接丢弃消息

ConfirmCallBack是基于每条消息设置的,所以需要一个全局唯一id 进行区分。
ReturenCallbcak 则是基于每个RabbitTemplate操作实例,是一种全局性的回调。

  1. 由于每个 RabbitTemplate 只能配置一个 ReturnCallback, 因此需要在项目启动过程中配置:
    (这里可以实现ApplicationContextAware,它可以在SpringIOC 容器初始化的时候,进行一些全局性回调的操作)
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取 RabbitTemplate对象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 配置 ReturnCallbackrabbitTemplate.setReturnCallback((message, replayCode, replayText,exchange, routingKey) -> {// 记录日志log.error("消息发送到队列失败, 响应码:{}, 失败原因:{},交换机:{}, 路由key:{},消息:{},",replayCode, replayText, exchange, routingKey, message.toString());// 如果有需要的话,重发消息});}}
}	
  1. 为每条发送的消息,指定消息 ID, 并编写对应的 ConfirmCallback
public void testSendMessage2SimpleQueue() throws InterruptedException {// 1. 准备消息String message = "hello, spring amqp!";// 2. 准备CorrelationData// 2.1 消息idCorrelationData correlationData = newCorrelationData(UUID.randomUUID().toString());// 2.2 准备 ConfirmCallbackcorrelationData.getFuture().addCallback(confirm -> {// 判断结果if(confirm.isAck()){// ACKlog.debug("消息成功投递到交换机!消息ID:{}",correlationData.getId());}else {// NACKlog.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());// 重发消息}}, throwable -> {// 记录日志log.error("消息发送失败", throwable);// 重发消息});// 3.发送消息rabbitTemplate.convertAndSend("amq.topic", "asimple.test", message, correlationData);
}

总结

SpringAMQP 中处理消息确认的几种情况:

  • publisher-confirm:
    • 消息发送到 exchange, 返回 ack
    • 消息发送失败,没有到达交换机,返回 nack
    • 消息发送过程中出现异常,没有收到回执
  • 消息成功发送到 exchange, 但没有路由到 queue, 调用 ReturnCallback

相关文章:

RabbitMq 消息可靠性问题(一) --- publisher发送时丢失

前言 消息从生产者发送到exchange, 再到 queue, 再到消费者。这个过程中有哪些有消息丢失的可能性呢? 发送时丢失: 生产者发送的消息未送达 exchange消息到达 exchange 后未到达 queue MQ 宕机,queue将消息丢失consumer 接收到消息后未消费…...

Java初识泛型

目录 一、包装类 1、基本数据类型和对应的包装类 2、装箱和拆箱 3、自动装箱和自动拆箱 二、什么是泛型 三、引出泛型 1、泛型的语法 四、泛型类的使用 1、语法 2、示例 3、类型推导(Type Inference) 六、泛型如何编译的 1、擦除机制 2、为什么不能实例化泛型类…...

寸照换底色技巧大全,超详细图文教程

在日常的设计工作中,我们常常需要将图片的背景色进行修改,以适应不同的场景和需求。其中最常用的方法就是寸照换底色技巧。本文将为大家介绍一些常见的寸照换底色技巧,并提供超详细的图文教程,帮助大家轻松完成这项任务。 一、使…...

这篇文章价值很大:股票历史分时成交数据怎么简单获取?【干货】

文章目录前言一、准备二、使用步骤1.引入库2,使用这个API查询历史分时数据:3.查询完整历史分时数据4.其他查询方法参数格式:[(市场代码, 股票代码), ...]参数:市场代码, 股票代码, 文件名, 起始位置, 数量参数:市场代码…...

muduo源码剖析--Buffer

Buffer类 Buffer类是自定义处理数据输入缓冲的类&#xff0c;底层是vector< char >&#xff0c;通过readIdx和writeIdx将缓冲区分为3个部分&#xff0c;第一部分是预留的8字节已经读出的缓冲区字节数、第二部分是还未读出的部分、第三部分是可写的部分。 Buffer类的设计…...

AI人工智能简介和其定义

全称&#xff1a;人工智能&#xff08;Artificial Intelligence&#xff09; 缩写&#xff1a;AI / ai 人工智能研究 亦称智械、机器智能&#xff0c;指由人制造出来的可以表现出智能的机器。通常人工智能是指通过普通计算机程序来呈现人类智能的技术。该词也指出研究这样的智…...

python数据清洗

数据清洗包括&#xff1a;空值&#xff0c;异常值&#xff0c;重复值&#xff0c;类型转换和数据整合这里数据清洗需要用到的库是pandas库&#xff0c;下载方式还是在终端运行 &#xff1a; pip install pandas.首先我们需要对数据进行读取import pandas as pddata pd.read_cs…...

Python3 os.makedirs() 方法、Python3 os.read() 方法

Python3 os.makedirs() 方法 概述 os.makedirs() 方法用于递归创建目录。像 mkdir(), 但创建的所有intermediate-level文件夹需要包含子目录。 语法 makedirs()方法语法格式如下&#xff1a; os.makedirs(path, mode0o777)参数 path -- 需要递归创建的目录。 mode -- 权限…...

【Linux安装数据库】Ubuntu安装mysql并连接navicat

Linux系统部署Django项目 文章目录Linux系统部署Django项目一、mysql安装二、mysql配置文件三、新建数据库和用户四、nivacat链接mysql一、mysql安装 linux安装mysql数据库有很多教程&#xff0c;根据安装方式不同&#xff0c;相关的步骤也不同。可以参考&#xff1a;【Linux安…...

GaussDB工作级开发者认证—第一章GaussDB数据库介绍

一. GaussDB概述 GaussDB是华为基于openGauss自研生态推出的企业级分布式关系型数据库。具备企业级复杂事物混合负载能力&#xff0c;同时支持分布式事务强一致性&#xff0c;同城跨AZ部署&#xff0c;数据0丢失&#xff0c;支持1000的计算节点扩展能力&#xff0c;4PB海量存储…...

阿里张勇:所有行业都值得用大模型重新做一遍!

‍数据智能产业创新服务媒体——聚焦数智 改变商业“2023阿里云峰会”于4月11日在北京国际会议中心隆重召开&#xff0c;本次峰会以" 与实俱进 为创新提速&#xff01;"为主题&#xff0c;阿里巴巴集团董事会主席兼首席执行官张勇、阿里云智能集团首席技术官周靖人、…...

ES6(字符串的扩展与新增方法)

字符串的扩展与新增方法 1. 模板字符串 模板字符串解决了之前的字符串拼接 ESC下那个键&#xff1a;反引号&#xff08;&#xff09;包裹>替换引号 ${变量名/表达式/函数}>替换引引加加导致的代码冗余 //ES5(引引加加) $(#result).append(There are <b> basket.c…...

rk3568点亮LCD(lvds)

rk3568 Android11/12 适配 lvds 屏 LVDS&#xff08;Low Voltage Differential Signal&#xff09;即低电压差分信号。1994年由美国国家半导体&#xff08;NS&#xff09;公司为克服以TTL电平方式传输宽带高码率数据时功耗大、电磁干扰大等缺点而研制的一种数字视频信号传输方…...

全终端办公电子邮件集成方案

面临挑战 应用场景复杂&#xff0c;经常需要在不同终端进行切换&#xff0c;多屏、跨屏及移动办公要求高&#xff1b; 业务系统较多&#xff0c;需要同时支持多种业务的开展&#xff0c;对第三方应用集成及协同办公要求高&#xff1b; 对邮件系统的稳定及高效性要求高&#x…...

再不转型为ChatGPT程序员,有遭受降维打击的危险

Open AI在演示GPT-4的时候&#xff0c;有这么一个场景&#xff1a;给一个界面草图&#xff0c;就可以生成网页代码。这个演示非常简单&#xff0c;如果界面原型比较复杂呢&#xff1f;像这样&#xff1a;ChatGPT能不能直接生成HTML, CSS,JavaScript代码&#xff0c;把这个网页给…...

maven使用教程

文章目录IDEA创建maven项目maven项目必有得目录结构项目构建关键字cleanvalidatecompiletestpackageverifyinstallsitedeploy命令使用方法方法一 在terminal终端执行方法二 在右侧得maven中双击依赖管理在pom.xml下 导包、scope的传递范围、打包方式依赖冲突声明优先原则就近原…...

Emlog底部显示当前在线人数

第一步&#xff1a;在模板文件里面创建“visitor.php”的文件吧下面代码入进去 code <?php//首先你要有读写文件的权限&#xff0c;首次访问肯不显示&#xff0c;正常情况刷新即可$online_log "slzxrs.dat"; //保存人数的文件到根目录,$timeout 30;//30秒内没…...

【java踩坑搞起】MybatisPlus封装的mapper不支持 join,那咋办

众所周知&#xff0c;Mybatis Plus 封装的 mapper 不支持 join&#xff0c;如果需要支持就必须自己去实现。但是对于大部分的业务场景来说&#xff0c;都需要多表 join&#xff0c;要不然就没必要采用关系型数据库了。 直到前几天&#xff0c;偶然碰到了这么一款叫做mybatis-p…...

【创造者】——什么是数学

吉姆罗恩在不经意间这样说过&#xff0c;要么你主宰生活&#xff0c;要么你被生活主宰。这不禁令我深思. 既然如此, 康德说过一句著名的话&#xff0c;既然我已经踏上这条道路&#xff0c;那么&#xff0c;任何东西都不应妨碍我沿着这条路走下去。带着这句话, 我们还要更加慎重…...

ROS系列——错误syntax error near unexpected token `$‘do\r‘‘

ROS系列——错误syntax error near unexpected token $do\r说明解决方法问题原因解决1.终端运行2.本文使用的方法&#xff0c;适用于代码行数较少其他方法&#xff0c;本质就是替换3.重新运行脚本说明 在运行.sh脚本时&#xff0c;报错&#xff1a; syntax error near unexpec…...

conda相比python好处

Conda 作为 Python 的环境和包管理工具&#xff0c;相比原生 Python 生态&#xff08;如 pip 虚拟环境&#xff09;有许多独特优势&#xff0c;尤其在多项目管理、依赖处理和跨平台兼容性等方面表现更优。以下是 Conda 的核心好处&#xff1a; 一、一站式环境管理&#xff1a…...

基于FPGA的PID算法学习———实现PID比例控制算法

基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容&#xff1a;参考网站&#xff1a; PID算法控制 PID即&#xff1a;Proportional&#xff08;比例&#xff09;、Integral&#xff08;积分&…...

新能源汽车智慧充电桩管理方案:新能源充电桩散热问题及消防安全监管方案

随着新能源汽车的快速普及&#xff0c;充电桩作为核心配套设施&#xff0c;其安全性与可靠性备受关注。然而&#xff0c;在高温、高负荷运行环境下&#xff0c;充电桩的散热问题与消防安全隐患日益凸显&#xff0c;成为制约行业发展的关键瓶颈。 如何通过智慧化管理手段优化散…...

《基于Apache Flink的流处理》笔记

思维导图 1-3 章 4-7章 8-11 章 参考资料 源码&#xff1a; https://github.com/streaming-with-flink 博客 https://flink.apache.org/bloghttps://www.ververica.com/blog 聚会及会议 https://flink-forward.orghttps://www.meetup.com/topics/apache-flink https://n…...

什么?连接服务器也能可视化显示界面?:基于X11 Forwarding + CentOS + MobaXterm实战指南

文章目录 什么是X11?环境准备实战步骤1️⃣ 服务器端配置(CentOS)2️⃣ 客户端配置(MobaXterm)3️⃣ 验证X11 Forwarding4️⃣ 运行自定义GUI程序(Python示例)5️⃣ 成功效果![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/55aefaea8a9f477e86d065227851fe3d.pn…...

MySQL用户和授权

开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务&#xff1a; test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...

Caliper 配置文件解析:fisco-bcos.json

config.yaml 文件 config.yaml 是 Caliper 的主配置文件,通常包含以下内容: test:name: fisco-bcos-test # 测试名称description: Performance test of FISCO-BCOS # 测试描述workers:type: local # 工作进程类型number: 5 # 工作进程数量monitor:type: - docker- pro…...

PHP 8.5 即将发布:管道操作符、强力调试

前不久&#xff0c;PHP宣布了即将在 2025 年 11 月 20 日 正式发布的 PHP 8.5&#xff01;作为 PHP 语言的又一次重要迭代&#xff0c;PHP 8.5 承诺带来一系列旨在提升代码可读性、健壮性以及开发者效率的改进。而更令人兴奋的是&#xff0c;借助强大的本地开发环境 ServBay&am…...

tomcat入门

1 tomcat 是什么 apache开发的web服务器可以为java web程序提供运行环境tomcat是一款高效&#xff0c;稳定&#xff0c;易于使用的web服务器tomcathttp服务器Servlet服务器 2 tomcat 目录介绍 -bin #存放tomcat的脚本 -conf #存放tomcat的配置文件 ---catalina.policy #to…...

苹果AI眼镜:从“工具”到“社交姿态”的范式革命——重新定义AI交互入口的未来机会

在2025年的AI硬件浪潮中,苹果AI眼镜(Apple Glasses)正在引发一场关于“人机交互形态”的深度思考。它并非简单地替代AirPods或Apple Watch,而是开辟了一个全新的、日常可接受的AI入口。其核心价值不在于功能的堆叠,而在于如何通过形态设计打破社交壁垒,成为用户“全天佩戴…...