RabbitMQ-消息的可靠性投递
文章目录
- 0. 什么是消息的可靠性投递
- 1. confirm机制
- 2. return机制
- 3. 总结
0. 什么是消息的可靠性投递
在生产环境中,如果因为一些不明原因导致RabbitMQ重启,RabbitMQ重启过程中是无法接收消息的,那么我们就需要生产者重新发送消息。或者在消息从交换机到队列的过程中出现意外,消息没有正常投放,我们需要消息回到交换机重新投放。
为了解决因为这些种种意外而产生的问题,就需要使用消息的可靠性投递
。我们需要在三个过程保证消息传输的正常 :
- 消息
从生产者到交换机
的过程 - 消息
从交换机到队列
的过程 - 消息
从队列到消费者
的过程
消息从生产者到交换机的过程中,我们可以使用confirm机制
来保障消息的可靠性。
消息从交换机到队列的过程中,我们可以使用return机制
来保障消息的可靠性。
消息从队列到消费者的过程中,我们可以使用ack
这个参数来保障消息的可靠性。
其中confirm机制
和return机制
都是使用回调函数,当消息投放失败后在回调函数中将消息放入redis,用定时任务重新投放。需要我们自己编码。
ack
参数是RabbitMQ实现的,我们需要手动ack。RabbitMQ将一条消息推送给消费者,如果没有接收到ack就会重发。
1. confirm机制
使用confirm机制需要在配置文件中将rabbitmq.publisher-confirm-type
设置为correlated
。表示使用confirm机制。
publisher-confirm-type: correlated
publisher-confirm-type
有以下三种值:
simple
:简单的执行ack的判断,在发布消息成功后使用 rabbitTemplate调用waitForConfirms或者waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判断下一步的逻辑。但是要注意的是当 waitForConfirmsOrDie 方法如果返回false则会关闭channel。(同步confirm,性能较差)correlated
:打开消息确认机制, 执行ack的时候还会携带消息的元数据。none
:禁用发布确认模式,默认值。
最后在RabbitTemplate中设置回调函数。这个函数不管是消息发送成功与否都会执行。该函数接收一个接口 :ConfirmCallback,此接口只有一个方法 :
void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);
-
correlationData :保存消息id以及相关信息
-
ack :交换机是否收到消息,收到为true
-
cause :消息接收不到原因,成功接收消息则为null
在这个函数里,可以判断ack,如果为false则代表信息发送失败,可以存入redis,让定时任务扫描redis重新发送消息。(解决方案有很多,这里简单提个建议)
@Slf4j
@Configuration
public class RabbitConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {// 打印日志log.info("ConfirmCallback:" + "相关数据:" + correlationData);log.info("ConfirmCallback:" + "确认情况:" + ack);log.info("ConfirmCallback:" + "原因:" + cause);// 存入数据库中定时重发.// ...});return rabbitTemplate;}
}
总结 :
confirm机制有两步 :
-
在yml配置文件中将
publisher-confirm-type
设置为correlated
,开启confirm机制publisher-confirm-type: correlated
-
设置回调函数,setConfirmCallback
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {// 打印日志log.info("ConfirmCallback:" + "相关数据:" + correlationData);log.info("ConfirmCallback:" + "确认情况:" + ack);log.info("ConfirmCallback:" + "原因:" + cause);// 存入数据库中定时重发.// ... });
2. return机制
在了解return 机制前先了解一个参数 :mandatory
。
mandatory
是AMQP协议中basic.publish方法中的标识位。
如果交换机根据路由键找不到对应的队列,那么此时它有两个选择 :将消息返回交换机、将消息丢弃,当mandatory的值为true时,消息将返回给交换机;当mandatory的值为false时,消息将被丢弃。
我们想要实现消息的可靠性投递
,当消息投放错时让消息重新投放,那么我们就需要将mandatory
的值设置为true。
所以实现return机制有三步 :
想使用return机制,首先要在配置文件中将publisher-returns
的值设置为true
,代表开启return机制
publisher-returns: true
接下来将mandatory这个值设置为true,代表如果消息丢失或者出现意外,将消息返回,而不是丢弃。
rabbitTemplate.setMandatory(true);
最后实现ReturnCallback
这个接口 :
void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey);
- message :此条消息
- replyCode :错误编码
- replyText :消息接收失败的原因
- exchange :此条消息的目标交换机
- routingKey :此条消息的路由键
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 打印日志log.info("ReturnCallback:" + "消息:" + message);log.info("ReturnCallback:" + "回应码:" + replyCode);log.info("ReturnCallback:" + "回应信息:" + replyText);log.info("ReturnCallback:" + "交换机:" + exchange);log.info("ReturnCallback:" + "路由键:" + routingKey);// 做其他处理...
});
执行到returnCallback函数就代表消息从交换机到队列的过程出现问题,例如路由键错误
、网络问题
、找不到相应队列
…这些情况下可以先打印日志,再使用手动签收机制让交换机重新发送消息。
因为路由键错误这种就是代码写错了,重发消息也没用,只能打印日志等操作来尽可能提醒开发人员,但是网络问题可以重新发送消息。
3. 总结
实现消息的可靠性投递通过confirm机制和return机制。
-
在配置文件中开启这两种机制:
publisher-confirm-type: correlated publisher-returns: true
-
注入RabbitTemplate时将mandatory设置为true,代表消息投递异常时将消息返回,而不是丢弃。
-
注入RabbitTemplate时实现ConfirmCallback和ReturnCallback。在里面完成消息投放错误后的保障工作。
@Slf4j @Configuration public class RabbitConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {// 打印日志log.info("ConfirmCallback:" + "相关数据:" + correlationData);log.info("ConfirmCallback:" + "确认情况:" + ack);log.info("ConfirmCallback:" + "原因:" + cause);// 存入redis数据库中定时重发.// ...});rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 打印日志log.info("ReturnCallback:" + "消息:" + message);log.info("ReturnCallback:" + "回应码:" + replyCode);log.info("ReturnCallback:" + "回应信息:" + replyText);log.info("ReturnCallback:" + "交换机:" + exchange);log.info("ReturnCallback:" + "路由键:" + routingKey);});return rabbitTemplate;} }
- 在保证confirm机制和return机制的同时别忘记让消费者手动ack。
注意 :
为什么return机制和ack机制可以重新发送,而confirm机制只能借用外部手段呢(例如redis、xxl-job)?
因为return机制触发时消息已经到了交换机,可以通过判断通过mandatory这个参数来确定是重新发送还是丢弃。(只是在发送给队列时出现错误,人家RabbitMQ自己设计的可以重发。)
但是confirm机制触发时消息刚从生产者发送给交换机,还没进入RabbitMQ,只能借用外部手段了。
相关文章:
RabbitMQ-消息的可靠性投递
文章目录0. 什么是消息的可靠性投递1. confirm机制2. return机制3. 总结0. 什么是消息的可靠性投递 在生产环境中,如果因为一些不明原因导致RabbitMQ重启,RabbitMQ重启过程中是无法接收消息的,那么我们就需要生产者重新发送消息。或者在消息…...
华为OD机试题 - 最小叶子节点(JavaScript)| 含思路
华为OD机试题 最近更新的博客使用说明本篇题解:最小叶子节点题目输入输出示例一输入输出示例二输入输出Code解题思路华为OD其它语言版本最近更新的博客 华为od 2023 | 什么是华为od,od 薪资待遇,od机试题清单华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典【华为OD…...
嵌入式系统硬件设计与实践(开发过程)
【 声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing 163.com】 如果把电路设计看成是画板子的,这本身其实是狭隘了。嵌入式硬件设计其实是嵌入式系统中很重要的一个部分。里面软件做的什么样…...

入门vue(1-10)
正确学习方式:视频->动手实操->压缩提取->记录表述 1基础结构 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"&…...
C#开发的OpenRA的游戏主界面怎么样创建3
继续游戏主界面创建的主题, 我们知道游戏的主界面上有很多部件,比如显示文本的标签(LabelWidget), 显示按钮(ButtonWidget)。那么这些部件又是如何创建在主界面上的呢? 其实这些部件是否显示,都是来源于文件yaml,在这里就是文件mainmenu.yaml, 在这个文件里定义了所有…...

秒懂算法 | 基于主成分分析法、随机森林算法和SVM算法的人脸识别问题
本文的任务与手写数字识别非常相似,都是基于图片的多分类任务,也都是有监督的。 01、数据集介绍与分析 ORL人脸数据集共包含40个不同人的400张图像,是在1992年4月至1994年4月期间由英国剑桥的Olivetti研究实验室创建。 此数据集下包含40个目录,每个目录下有10张图像,每个…...

QML Loader(加载程序)
Loader加载器用于动态加载 QML 组件。加载程序可以加载 QML 文件(使用 source 属性)或组件对象(使用 sourceComponent 属性) 常用属性: active 活动asynchronous异步,默认为falseitem项目progress 进度so…...

C++——类型转换
目录 C语言中的类型转换 C强制类型转换 static_cast reinterpret_cast const_cast dynamic_cast 延伸问题 RTTI(了解) C语言中的类型转换 在C语言中,如果赋值运算符左右两侧类型不同,或者形参与实参类型不匹配,或…...

vue3:生命周期(onErrorCaptured)
一、背景 当项目如果发生报错,影响程序体验。如果能以捕获的方式得到错误信息,而且还能定位问题,这样就好了,本文介绍onErrorCaptured实现我们想要的效果。 vue2:errorCaptured。使用与vue3同理。 vue3:…...
vue过滤器
vue 过滤器 对要显示的数据进行特定格式化之后再显示 注册过滤器 Vue.filter(name,callback)new Vue({filters:{}}) 使用过滤器 {{ name | 过滤器名 }}v-band:属性“name | 过滤器名” 局部过滤器 <p>{{time | timeFormater }}</p> <!-- 过滤器可接受额外参…...

I/O模型
写在前面 前面聊完了IO方式, 也就意味着网络数据的收发通道是建立起来了。但业务场景中, 通道本身是不会发送数据的。在常见的网络应用中, server端会创建多个链接以服务更多client, 同时要求各个client尽可能互不影响。这是I/O模型(也就是IO方式线程模型)要解决的问题。由于加…...

前端必备技术之——AJAX
简介 AJAX 全称为 Asynchronous JavaScript And XML,就是异步的 JS 和 XML(现在已经基本被json取代)。通过 AJAX 可以在浏览器中向服务器发送异步请求,最大的优势:无刷新获取数据。AJAX 不是新的编程语言,而是一种将现有的标准组…...
MySQL数据库 各种指令操作大杂烩(DML增删改、DQL查询、SQL...)
文章目录前言一、DML 增删改添加数据修改数据删除数据二、DQL 查询基本查询条件查询聚合函数(count、max、min、avg、sum)分组查询(group by)排序查询(order by)分页查询(limit)DQL 语句练习三、SQLDCL 权限控制约束案例多表查询事务存储引擎字符串函数数值函数日期函数流程函数…...

Java分布式全局ID(一)
随着互联网的不断发展,互联网企业的业务在飞速变化,推动着系统架构也在不断地发生变化。 如今微服务技术越来越成熟,很多企业都采用微服务架构来支撑内部及对外的业务,尤其是在高 并发大流量的电商业务场景下,微服务…...

算法分析与设计之并查集详解
算法分析与设计之并查集1.前言2.并查集的基础2.1.关于动态连通性2.2.动态连通性的应用场景:2.3.对问题建模:2.4.建模思路:2.5.API2.7.Quick-Find算法:2.8.Quick-Union算法:3. 并查集的应用1.前言 本文主要介绍解决动态…...

Linux - 内存性能评估
文章目录概述free 命令指定的时间段内不间断地监控内存的使用情况通过watch与free相结合动态监控内存状况vmstat命令监控内存“sar –r”命令组合小结概述 内存的管理和优化是系统性能优化的一个重要部分,内存资源的充足与否直接影响应用系统的使用性能。在进行内存…...
00后初中辍学,转行程序员后,终于找到了女朋友
大家好,这里是程序员晚枫,今天继续分享我们的读者投稿,如需投稿赚稿费的朋友,请在后台私信我:投稿。下面我们进入正文吧~ 我是一位 00 后,从初一辍学,到目前为止已有 8 年的时间了,在…...
“Vue学习注意事项:掌握核心特性,注意性能优化和第三方库的使用“
Vue是一款易学易用的JavaScript框架,它可以帮助开发者构建动态、高性能的用户界面。Vue的核心概念包括数据绑定、指令、计算属性和组件化,学习Vue需要注意以下几个点:1. 理解Vue的基本概念和用法Vue的基本概念包括模板、组件、数据绑定、计算…...

计算机网络协议详解(二)
文章目录🔥HTTP协议介绍🔥HTTP协议特点🔥HTTP协议发展和版本🔥HTTP协议中URI、URL、URN🔥HTTP协议的请求分析🔥HTTP协议的响应分析🔥MIME类型🔥HTTP协议介绍 HTTP协议介绍 什么是超…...

【CSS】CSS 复合选择器 ② ( 子元素选择器 | 交集选择器 )
文章目录一、子元素选择器1、语法说明2、代码分析3、代码示例二、交集选择器1、语法说明2、代码示例一、子元素选择器 1、语法说明 子元素选择器 可以选择 某个基础选择器 选择出的 元素组 的 直接子元素 ( 亲儿子元素 ) 中 使用基础选择器 选择 元素 ; 子元素选择器语法 : 父选…...
Python爬虫实战:研究MechanicalSoup库相关技术
一、MechanicalSoup 库概述 1.1 库简介 MechanicalSoup 是一个 Python 库,专为自动化交互网站而设计。它结合了 requests 的 HTTP 请求能力和 BeautifulSoup 的 HTML 解析能力,提供了直观的 API,让我们可以像人类用户一样浏览网页、填写表单和提交请求。 1.2 主要功能特点…...

第19节 Node.js Express 框架
Express 是一个为Node.js设计的web开发框架,它基于nodejs平台。 Express 简介 Express是一个简洁而灵活的node.js Web应用框架, 提供了一系列强大特性帮助你创建各种Web应用,和丰富的HTTP工具。 使用Express可以快速地搭建一个完整功能的网站。 Expre…...

51c自动驾驶~合集58
我自己的原文哦~ https://blog.51cto.com/whaosoft/13967107 #CCA-Attention 全局池化局部保留,CCA-Attention为LLM长文本建模带来突破性进展 琶洲实验室、华南理工大学联合推出关键上下文感知注意力机制(CCA-Attention),…...
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...
镜像里切换为普通用户
如果你登录远程虚拟机默认就是 root 用户,但你不希望用 root 权限运行 ns-3(这是对的,ns3 工具会拒绝 root),你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案:创建非 roo…...

高危文件识别的常用算法:原理、应用与企业场景
高危文件识别的常用算法:原理、应用与企业场景 高危文件识别旨在检测可能导致安全威胁的文件,如包含恶意代码、敏感数据或欺诈内容的文档,在企业协同办公环境中(如Teams、Google Workspace)尤为重要。结合大模型技术&…...

【Zephyr 系列 10】实战项目:打造一个蓝牙传感器终端 + 网关系统(完整架构与全栈实现)
🧠关键词:Zephyr、BLE、终端、网关、广播、连接、传感器、数据采集、低功耗、系统集成 📌目标读者:希望基于 Zephyr 构建 BLE 系统架构、实现终端与网关协作、具备产品交付能力的开发者 📊篇幅字数:约 5200 字 ✨ 项目总览 在物联网实际项目中,**“终端 + 网关”**是…...

前端开发面试题总结-JavaScript篇(一)
文章目录 JavaScript高频问答一、作用域与闭包1.什么是闭包(Closure)?闭包有什么应用场景和潜在问题?2.解释 JavaScript 的作用域链(Scope Chain) 二、原型与继承3.原型链是什么?如何实现继承&a…...
[Java恶补day16] 238.除自身以外数组的乘积
给你一个整数数组 nums,返回 数组 answer ,其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法,且在 O(n) 时间复杂度…...
ip子接口配置及删除
配置永久生效的子接口,2个IP 都可以登录你这一台服务器。重启不失效。 永久的 [应用] vi /etc/sysconfig/network-scripts/ifcfg-eth0修改文件内内容 TYPE"Ethernet" BOOTPROTO"none" NAME"eth0" DEVICE"eth0" ONBOOT&q…...