RabbitMQ-消息的可靠性投递
文章目录
- 0. 什么是消息的可靠性投递
- 1. confirm机制
- 2. return机制
- 3. 总结
0. 什么是消息的可靠性投递
在生产环境中,如果因为一些不明原因导致RabbitMQ重启,RabbitMQ重启过程中是无法接收消息的,那么我们就需要生产者重新发送消息。或者在消息从交换机到队列的过程中出现意外,消息没有正常投放,我们需要消息回到交换机重新投放。
为了解决因为这些种种意外而产生的问题,就需要使用消息的可靠性投递。我们需要在三个过程保证消息传输的正常 :
- 消息
从生产者到交换机的过程 - 消息
从交换机到队列的过程 - 消息
从队列到消费者的过程
消息从生产者到交换机的过程中,我们可以使用confirm机制来保障消息的可靠性。
消息从交换机到队列的过程中,我们可以使用return机制来保障消息的可靠性。
消息从队列到消费者的过程中,我们可以使用ack这个参数来保障消息的可靠性。
其中confirm机制和return机制都是使用回调函数,当消息投放失败后在回调函数中将消息放入redis,用定时任务重新投放。需要我们自己编码。
ack参数是RabbitMQ实现的,我们需要手动ack。RabbitMQ将一条消息推送给消费者,如果没有接收到ack就会重发。
1. confirm机制
使用confirm机制需要在配置文件中将rabbitmq.publisher-confirm-type设置为correlated。表示使用confirm机制。
publisher-confirm-type: correlated
publisher-confirm-type有以下三种值:
simple:简单的执行ack的判断,在发布消息成功后使用 rabbitTemplate调用waitForConfirms或者waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判断下一步的逻辑。但是要注意的是当 waitForConfirmsOrDie 方法如果返回false则会关闭channel。(同步confirm,性能较差)correlated:打开消息确认机制, 执行ack的时候还会携带消息的元数据。none:禁用发布确认模式,默认值。
最后在RabbitTemplate中设置回调函数。这个函数不管是消息发送成功与否都会执行。该函数接收一个接口 :ConfirmCallback,此接口只有一个方法 :
void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);
-
correlationData :保存消息id以及相关信息
-
ack :交换机是否收到消息,收到为true
-
cause :消息接收不到原因,成功接收消息则为null
在这个函数里,可以判断ack,如果为false则代表信息发送失败,可以存入redis,让定时任务扫描redis重新发送消息。(解决方案有很多,这里简单提个建议)
@Slf4j
@Configuration
public class RabbitConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {// 打印日志log.info("ConfirmCallback:" + "相关数据:" + correlationData);log.info("ConfirmCallback:" + "确认情况:" + ack);log.info("ConfirmCallback:" + "原因:" + cause);// 存入数据库中定时重发.// ...});return rabbitTemplate;}
}
总结 :
confirm机制有两步 :
-
在yml配置文件中将
publisher-confirm-type设置为correlated,开启confirm机制publisher-confirm-type: correlated -
设置回调函数,setConfirmCallback
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {// 打印日志log.info("ConfirmCallback:" + "相关数据:" + correlationData);log.info("ConfirmCallback:" + "确认情况:" + ack);log.info("ConfirmCallback:" + "原因:" + cause);// 存入数据库中定时重发.// ... });
2. return机制
在了解return 机制前先了解一个参数 :mandatory。
mandatory是AMQP协议中basic.publish方法中的标识位。
如果交换机根据路由键找不到对应的队列,那么此时它有两个选择 :将消息返回交换机、将消息丢弃,当mandatory的值为true时,消息将返回给交换机;当mandatory的值为false时,消息将被丢弃。
我们想要实现消息的可靠性投递,当消息投放错时让消息重新投放,那么我们就需要将mandatory的值设置为true。
所以实现return机制有三步 :
想使用return机制,首先要在配置文件中将publisher-returns的值设置为true,代表开启return机制
publisher-returns: true
接下来将mandatory这个值设置为true,代表如果消息丢失或者出现意外,将消息返回,而不是丢弃。
rabbitTemplate.setMandatory(true);
最后实现ReturnCallback这个接口 :
void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey);
- message :此条消息
- replyCode :错误编码
- replyText :消息接收失败的原因
- exchange :此条消息的目标交换机
- routingKey :此条消息的路由键
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 打印日志log.info("ReturnCallback:" + "消息:" + message);log.info("ReturnCallback:" + "回应码:" + replyCode);log.info("ReturnCallback:" + "回应信息:" + replyText);log.info("ReturnCallback:" + "交换机:" + exchange);log.info("ReturnCallback:" + "路由键:" + routingKey);// 做其他处理...
});
执行到returnCallback函数就代表消息从交换机到队列的过程出现问题,例如路由键错误、网络问题、找不到相应队列…这些情况下可以先打印日志,再使用手动签收机制让交换机重新发送消息。
因为路由键错误这种就是代码写错了,重发消息也没用,只能打印日志等操作来尽可能提醒开发人员,但是网络问题可以重新发送消息。
3. 总结
实现消息的可靠性投递通过confirm机制和return机制。
-
在配置文件中开启这两种机制:
publisher-confirm-type: correlated publisher-returns: true -
注入RabbitTemplate时将mandatory设置为true,代表消息投递异常时将消息返回,而不是丢弃。
-
注入RabbitTemplate时实现ConfirmCallback和ReturnCallback。在里面完成消息投放错误后的保障工作。
@Slf4j @Configuration public class RabbitConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {// 打印日志log.info("ConfirmCallback:" + "相关数据:" + correlationData);log.info("ConfirmCallback:" + "确认情况:" + ack);log.info("ConfirmCallback:" + "原因:" + cause);// 存入redis数据库中定时重发.// ...});rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 打印日志log.info("ReturnCallback:" + "消息:" + message);log.info("ReturnCallback:" + "回应码:" + replyCode);log.info("ReturnCallback:" + "回应信息:" + replyText);log.info("ReturnCallback:" + "交换机:" + exchange);log.info("ReturnCallback:" + "路由键:" + routingKey);});return rabbitTemplate;} }- 在保证confirm机制和return机制的同时别忘记让消费者手动ack。
注意 :
为什么return机制和ack机制可以重新发送,而confirm机制只能借用外部手段呢(例如redis、xxl-job)?
因为return机制触发时消息已经到了交换机,可以通过判断通过mandatory这个参数来确定是重新发送还是丢弃。(只是在发送给队列时出现错误,人家RabbitMQ自己设计的可以重发。)
但是confirm机制触发时消息刚从生产者发送给交换机,还没进入RabbitMQ,只能借用外部手段了。
相关文章:
RabbitMQ-消息的可靠性投递
文章目录0. 什么是消息的可靠性投递1. confirm机制2. return机制3. 总结0. 什么是消息的可靠性投递 在生产环境中,如果因为一些不明原因导致RabbitMQ重启,RabbitMQ重启过程中是无法接收消息的,那么我们就需要生产者重新发送消息。或者在消息…...
华为OD机试题 - 最小叶子节点(JavaScript)| 含思路
华为OD机试题 最近更新的博客使用说明本篇题解:最小叶子节点题目输入输出示例一输入输出示例二输入输出Code解题思路华为OD其它语言版本最近更新的博客 华为od 2023 | 什么是华为od,od 薪资待遇,od机试题清单华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典【华为OD…...
嵌入式系统硬件设计与实践(开发过程)
【 声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing 163.com】 如果把电路设计看成是画板子的,这本身其实是狭隘了。嵌入式硬件设计其实是嵌入式系统中很重要的一个部分。里面软件做的什么样…...
入门vue(1-10)
正确学习方式:视频->动手实操->压缩提取->记录表述 1基础结构 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"&…...
C#开发的OpenRA的游戏主界面怎么样创建3
继续游戏主界面创建的主题, 我们知道游戏的主界面上有很多部件,比如显示文本的标签(LabelWidget), 显示按钮(ButtonWidget)。那么这些部件又是如何创建在主界面上的呢? 其实这些部件是否显示,都是来源于文件yaml,在这里就是文件mainmenu.yaml, 在这个文件里定义了所有…...
秒懂算法 | 基于主成分分析法、随机森林算法和SVM算法的人脸识别问题
本文的任务与手写数字识别非常相似,都是基于图片的多分类任务,也都是有监督的。 01、数据集介绍与分析 ORL人脸数据集共包含40个不同人的400张图像,是在1992年4月至1994年4月期间由英国剑桥的Olivetti研究实验室创建。 此数据集下包含40个目录,每个目录下有10张图像,每个…...
QML Loader(加载程序)
Loader加载器用于动态加载 QML 组件。加载程序可以加载 QML 文件(使用 source 属性)或组件对象(使用 sourceComponent 属性) 常用属性: active 活动asynchronous异步,默认为falseitem项目progress 进度so…...
C++——类型转换
目录 C语言中的类型转换 C强制类型转换 static_cast reinterpret_cast const_cast dynamic_cast 延伸问题 RTTI(了解) C语言中的类型转换 在C语言中,如果赋值运算符左右两侧类型不同,或者形参与实参类型不匹配,或…...
vue3:生命周期(onErrorCaptured)
一、背景 当项目如果发生报错,影响程序体验。如果能以捕获的方式得到错误信息,而且还能定位问题,这样就好了,本文介绍onErrorCaptured实现我们想要的效果。 vue2:errorCaptured。使用与vue3同理。 vue3:…...
vue过滤器
vue 过滤器 对要显示的数据进行特定格式化之后再显示 注册过滤器 Vue.filter(name,callback)new Vue({filters:{}}) 使用过滤器 {{ name | 过滤器名 }}v-band:属性“name | 过滤器名” 局部过滤器 <p>{{time | timeFormater }}</p> <!-- 过滤器可接受额外参…...
I/O模型
写在前面 前面聊完了IO方式, 也就意味着网络数据的收发通道是建立起来了。但业务场景中, 通道本身是不会发送数据的。在常见的网络应用中, server端会创建多个链接以服务更多client, 同时要求各个client尽可能互不影响。这是I/O模型(也就是IO方式线程模型)要解决的问题。由于加…...
前端必备技术之——AJAX
简介 AJAX 全称为 Asynchronous JavaScript And XML,就是异步的 JS 和 XML(现在已经基本被json取代)。通过 AJAX 可以在浏览器中向服务器发送异步请求,最大的优势:无刷新获取数据。AJAX 不是新的编程语言,而是一种将现有的标准组…...
MySQL数据库 各种指令操作大杂烩(DML增删改、DQL查询、SQL...)
文章目录前言一、DML 增删改添加数据修改数据删除数据二、DQL 查询基本查询条件查询聚合函数(count、max、min、avg、sum)分组查询(group by)排序查询(order by)分页查询(limit)DQL 语句练习三、SQLDCL 权限控制约束案例多表查询事务存储引擎字符串函数数值函数日期函数流程函数…...
Java分布式全局ID(一)
随着互联网的不断发展,互联网企业的业务在飞速变化,推动着系统架构也在不断地发生变化。 如今微服务技术越来越成熟,很多企业都采用微服务架构来支撑内部及对外的业务,尤其是在高 并发大流量的电商业务场景下,微服务…...
算法分析与设计之并查集详解
算法分析与设计之并查集1.前言2.并查集的基础2.1.关于动态连通性2.2.动态连通性的应用场景:2.3.对问题建模:2.4.建模思路:2.5.API2.7.Quick-Find算法:2.8.Quick-Union算法:3. 并查集的应用1.前言 本文主要介绍解决动态…...
Linux - 内存性能评估
文章目录概述free 命令指定的时间段内不间断地监控内存的使用情况通过watch与free相结合动态监控内存状况vmstat命令监控内存“sar –r”命令组合小结概述 内存的管理和优化是系统性能优化的一个重要部分,内存资源的充足与否直接影响应用系统的使用性能。在进行内存…...
00后初中辍学,转行程序员后,终于找到了女朋友
大家好,这里是程序员晚枫,今天继续分享我们的读者投稿,如需投稿赚稿费的朋友,请在后台私信我:投稿。下面我们进入正文吧~ 我是一位 00 后,从初一辍学,到目前为止已有 8 年的时间了,在…...
“Vue学习注意事项:掌握核心特性,注意性能优化和第三方库的使用“
Vue是一款易学易用的JavaScript框架,它可以帮助开发者构建动态、高性能的用户界面。Vue的核心概念包括数据绑定、指令、计算属性和组件化,学习Vue需要注意以下几个点:1. 理解Vue的基本概念和用法Vue的基本概念包括模板、组件、数据绑定、计算…...
计算机网络协议详解(二)
文章目录🔥HTTP协议介绍🔥HTTP协议特点🔥HTTP协议发展和版本🔥HTTP协议中URI、URL、URN🔥HTTP协议的请求分析🔥HTTP协议的响应分析🔥MIME类型🔥HTTP协议介绍 HTTP协议介绍 什么是超…...
【CSS】CSS 复合选择器 ② ( 子元素选择器 | 交集选择器 )
文章目录一、子元素选择器1、语法说明2、代码分析3、代码示例二、交集选择器1、语法说明2、代码示例一、子元素选择器 1、语法说明 子元素选择器 可以选择 某个基础选择器 选择出的 元素组 的 直接子元素 ( 亲儿子元素 ) 中 使用基础选择器 选择 元素 ; 子元素选择器语法 : 父选…...
java_网络服务相关_gateway_nacos_feign区别联系
1. spring-cloud-starter-gateway 作用:作为微服务架构的网关,统一入口,处理所有外部请求。 核心能力: 路由转发(基于路径、服务名等)过滤器(鉴权、限流、日志、Header 处理)支持负…...
Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动
一、前言说明 在2011版本的gb28181协议中,拉取视频流只要求udp方式,从2016开始要求新增支持tcp被动和tcp主动两种方式,udp理论上会丢包的,所以实际使用过程可能会出现画面花屏的情况,而tcp肯定不丢包,起码…...
Oracle查询表空间大小
1 查询数据库中所有的表空间以及表空间所占空间的大小 SELECTtablespace_name,sum( bytes ) / 1024 / 1024 FROMdba_data_files GROUP BYtablespace_name; 2 Oracle查询表空间大小及每个表所占空间的大小 SELECTtablespace_name,file_id,file_name,round( bytes / ( 1024 …...
关于nvm与node.js
1 安装nvm 安装过程中手动修改 nvm的安装路径, 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解,但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后,通常在该文件中会出现以下配置&…...
理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端
🌟 什么是 MCP? 模型控制协议 (MCP) 是一种创新的协议,旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议,它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...
AI编程--插件对比分析:CodeRider、GitHub Copilot及其他
AI编程插件对比分析:CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展,AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者,分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...
CSS设置元素的宽度根据其内容自动调整
width: fit-content 是 CSS 中的一个属性值,用于设置元素的宽度根据其内容自动调整,确保宽度刚好容纳内容而不会超出。 效果对比 默认情况(width: auto): 块级元素(如 <div>)会占满父容器…...
C++.OpenGL (14/64)多光源(Multiple Lights)
多光源(Multiple Lights) 多光源渲染技术概览 #mermaid-svg-3L5e5gGn76TNh7Lq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-3L5e5gGn76TNh7Lq .error-icon{fill:#552222;}#mermaid-svg-3L5e5gGn76TNh7Lq .erro…...
华为OD机考-机房布局
import java.util.*;public class DemoTest5 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseSystem.out.println(solve(in.nextLine()));}}priv…...
LRU 缓存机制详解与实现(Java版) + 力扣解决
📌 LRU 缓存机制详解与实现(Java版) 一、📖 问题背景 在日常开发中,我们经常会使用 缓存(Cache) 来提升性能。但由于内存有限,缓存不可能无限增长,于是需要策略决定&am…...
