当前位置: 首页 > news >正文

Spring RabbitMQ那些事(3-消息可靠传输和订阅)

目录

  • 一、序言
  • 二、生产者确保消息发送成功
    • 1、为什么需要Publisher Confirms
    • 2、哪些消息会被确认处理成功
  • 三、消费者保证消息被处理
  • 四、Spring RabbitMQ支持代码示例
    • 1、 application.yml
    • 2、RabbigtMQ配置
    • 3、可靠生产者配置
    • 4、可靠消费者配置
    • 5、测试用例

一、序言

在有些业务场景中,消息是不能丢的,比如分布式事务资金动账,出账方扣款,那么入账方就一定要收款。以前写了一篇分布式事务的文章,里面的跨地区转账就是一个实际案例。

消息是有可能丢的,比如生产者在发送消息时broker服务挂了,消息没有来得及落盘,这时消息就彻底丢了。

保证MQ消息可靠传输主要有两个方面,一方面是消息生产者确保消息一定发送成功,另一方面是消费者确保消息一定被处理。


二、生产者确保消息发送成功

1、为什么需要Publisher Confirms

在Spring AMQP中AmqpTemplate的实现RabbitTemplate已经支持 Publisher Confirms and Returns,所谓的publisher confirms意思就是消息发布者确认消息是否已经被发送。

在RabbitMQ官方文档描述中,持久化的消息在Broker重启时也是应该存活的,这里的词用的是应该,因为消息有可能在落地磁盘前Broker就挂了,导致消息丢失。

最直接的解决方案是通过事务,但是通过事务有两个问题:

  • 事务阻塞:发布者必须等待Broker处理完每条消息。
  • 事务很重:每次提交都会要求触发fsync(),强制磁盘,这个过程需要花很长的时间。

备注:在RabbitMQ官方测试中,通过事务去保证,发布10000条消息需要花至少4分钟的时间。

在这里插入图片描述

而通过Publisher Confirm机制,一旦Broker处理完就会确认消息,而且这个过程是异步的,生产者可以流式发布消息,不需要等待Broker,并且Broker会批量高效将消息落盘。

2、哪些消息会被确认处理成功

当Broker确认消息时,会通知消息发布者消息是否被成功处理,成功处理的基本规则如下:

  • 无法路由的mandatory(必须有符合条件的队列)和immediate(必须有消费者在线)类型在被basic.return后会被确认。
  • 非持久化消息在入队时会被确认。
  • 持久化消息当持久化到磁盘或者被消费者消费时会被确认。

三、消费者保证消息被处理

消费者端确保消息消费很简单,关闭消息自动确认就好,开启消息手动确认。当然有些场景消息只能被处理一次,可以通过分布式锁来实现。


四、Spring RabbitMQ支持代码示例

1、 application.yml

server:port: 8080
spring:rabbitmq:addresses: localhost:5672username: adminpassword: adminvirtual-host: /publisher-returns: truepublisher-confirm-type: correlatedlistener:type: simplesimple:acknowledge-mode: manualconcurrency: 5max-concurrency: 20prefetch: 5template:mandatory: true

备注:

  • 这里一定要设置spring.rabbitmq.publisher-returnstrue,并且设置spring.rabbitmq.publisher-confirm-typecorrelated,同时设置spring.rabbitmq.template.mandatorytrue
  • 上面我们将消费者的确认模式改为了手动确认

2、RabbigtMQ配置

@Configuration
public class RabbitReliableTransportConfig {/*** RabbitTemplate消息转换器配置,自动将对象转换为json字符串** @return*/@Beanpublic MessageConverter jackson2JsonMessageConverter() {Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();messageConverter.setClassMapper(new DefaultJackson2JavaTypeMapper());return messageConverter;}@Beanpublic Queue reliableQueue() {return QueueBuilder.durable("reliable-queue").build();}
}

3、可靠生产者配置

@Slf4j
@Component
@RequiredArgsConstructor
public class RabbitMqReliableProducer {private final RabbitTemplate rabbitTemplate;public void sendReliableMsg(String body) {// 发送可靠消息ReliableMsgDTO reliableMsgDTO = ReliableMsgDTO.builder().body(body).build();CorrelationData correlationData = new CorrelationData();rabbitTemplate.convertAndSend("reliable-queue", reliableMsgDTO, correlationData);// 发送确认逻辑CompletableFuture<Confirm> future = correlationData.getFuture().completable();future.whenComplete((confirm, throwable) -> {if (confirm.isAck()) {log.info("消息已经被成功发送, 消息内容:{}", JSON.toJSONString(reliableMsgDTO));return;}log.warn("消息发送未成功发送, 原因:{}, 消息内容:{}", confirm.getReason(), JSON.toJSONString(reliableMsgDTO), throwable);// 5秒后再发送LockSupport.parkNanos(5 * 1000 * 1000 * 1000L);rabbitTemplate.convertSendAndReceive(reliableMsgDTO, correlationData);});}
}

4、可靠消费者配置

@Slf4j
@Component
public class RabbitMQReliableConsumer {@RabbitListener(queues = "reliable-queue")public void handleMsgFromQueue(ReliableMsgDTO reliableMsgDTO, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {channel.basicAck(tag, false);// channel.basicNack(tag, false, false);log.info("Message received from queue, message body: {}", JSON.toJSONString(reliableMsgDTO));}
}

备注:这里我们开启了消息的手动确认,如果消息处理失败没有确认,那么消息将会在下次消费者参加连接时再次被投递。

5、测试用例

测试结果如下,每当消息发送至Broker成功后会触发回调,如果消息发送失败将会触发重新发送。

2024-01-20 18:13:11.399  INFO 12316 --- [78.107.127:5672] c.u.r.i.p.RabbitMqReliableProducer       : 消息已经被成功发送, 消息内容:{"body":"hello"}
2024-01-20 18:13:11.399  INFO 12316 --- [ntContainer#0-5] c.u.r.i.c.RabbitMQReliableConsumer       : Message received from queue, message body: {"body":"hello"}

在这里插入图片描述

相关文章:

Spring RabbitMQ那些事(3-消息可靠传输和订阅)

目录 一、序言二、生产者确保消息发送成功1、为什么需要Publisher Confirms2、哪些消息会被确认处理成功 三、消费者保证消息被处理四、Spring RabbitMQ支持代码示例1、 application.yml2、RabbigtMQ配置3、可靠生产者配置4、可靠消费者配置5、测试用例 一、序言 在有些业务场…...

揭秘 Kafka 高性能之谜:一文读懂背后的设计精粹与技术实现

Kafka在性能方面有着显著的优势&#xff0c;这也使得Kafka的应用非常广泛&#xff0c;那kakfa的性能为何如此优异呢&#xff1f;本文将带你探寻kafka高性能之谜。 kafka的高性能概括起来有如下几点&#xff1a;顺序写入磁盘与I/O优化、批量处理、页缓存、零拷贝技术、分区并行处…...

canvas绘制美国国旗(USA Flag)

查看专栏目录 canvas实例应用100专栏&#xff0c;提供canvas的基础知识&#xff0c;高级动画&#xff0c;相关应用扩展等信息。canvas作为html的一部分&#xff0c;是图像图标地图可视化的一个重要的基础&#xff0c;学好了canvas&#xff0c;在其他的一些应用上将会起到非常重…...

Python中的`__all__`魔法函数使用详解

概要 Python是一门灵活而强大的编程语言&#xff0c;提供了各种机制来控制模块的导入和访问。其中&#xff0c;__all__魔法函数是一种用于限制模块导入的机制&#xff0c;可以明确指定哪些变量、函数或类可以被导入。本文将深入探讨__all__的作用、用法以及示例&#xff0c;以…...

Studio One 6 mac 6.5.2 激活版 数字音乐编曲创作

PreSonus Studio One是PreSonus出品的一款功能强大的音乐创作软件。主要为用户提供音乐创作、录音、编辑、制作等功能。它可以让你创造音乐&#xff0c;无限的轨道&#xff0c;无限的MIDI和乐器轨道&#xff0c;虚拟乐器和效果通道&#xff0c;这些都是强大和完美的。 软件下载…...

GitHub图床TyporaPicGo相关配置

本文作者&#xff1a; slience_me 文章目录 GitHub图床&Typora&PicGo相关配置1. Github配置2. picGo配置3. Typora配置 GitHub图床&Typora&PicGo相关配置 关于Typora旧版的百度网盘下载路径 链接&#xff1a;https://pan.baidu.com/s/12mq-dMqWnRRoreGo4MTbKg?…...

FireAlpaca:轻量级、免费的Mac/Win绘图软件,让你的创意如火燃烧!

FireAlpaca是一款轻量级、免费的绘图软件&#xff0c;适用于Mac和Win系统&#xff0c;让你的创作过程更加快捷、简便。无论是绘制漫画、插图、设计作品还是进行简单的图片编辑&#xff0c;FireAlpaca都能满足你的需求。 首先&#xff0c;FireAlpaca具有直观友好的用户界面&…...

用 Python 制作可视化 GUI 界面,一键实现自动分类管理文件!

经常杂乱无章的文件夹会让我们找不到所想要的文件&#xff0c;因此小编特意制作了一个可视化GUI界面&#xff0c;通过输入路径一键点击实现文件分门别类的归档。 不同的文件后缀归类为不同的类别 我们先罗列一下大致有几类文件&#xff0c;根据文件的后缀来设定&#xff0c;大…...

【STM32】USB程序烧录需要重新上电 软件复位方法

文章目录 一、问题二、解决思路2.1 直接插拔USB2.2 给芯片复位 三、解决方法3.1 别人的解决方法3.2 在下载界面进行设置 一、问题 最近学习STM32的USB功能&#xff0c;主要是想要使用虚拟串口功能&#xff08;VCP&#xff09;&#xff0c;发现每次烧录之后都需要重新上电才可以…...

Java数据结构与算法:图算法之深度优先搜索(DFS)

Java数据结构与算法&#xff1a;图算法之深度优先搜索&#xff08;DFS&#xff09; 大家好&#xff0c;我是免费搭建查券返利机器人赚佣金就用微赚淘客系统3.0的小编&#xff0c;一个热爱编程的程序猿。今天&#xff0c;让我们一起探索图算法中的深度优先搜索&#xff08;DFS&…...

SpringBoot整合QQ邮箱发送验证码

一、QQ开启SMTP 打开QQ邮箱&#xff0c;点击设置&#xff0c;进入账号&#xff0c;往下滑后&#xff0c;看见服务状态后&#xff0c;点击管理服务 进入管理服务后&#xff0c;打开服务&#xff0c;然后获取授权码 二 、导入依赖 <!-- 邮箱--><dependency>&…...

云虚拟主机怎么修改代码?如何修改部署在虚拟主机的网站代码?

很多站长成功创建网站之后&#xff0c;或多或少都会对网站代码进行适当修改。比如boke112百科使用YIA主题后&#xff0c;也根据自己的需要进行了多个方面的小修改。 那么如果网站是部署在虚拟主机上的&#xff0c;那么应该如何修改这些网站代码呢&#xff1f;其实&#xff0c;…...

电脑加固态硬盘有什么好处

电脑加固态硬盘有很多好处&#xff0c;以下是一些主要的优点&#xff1a; 1. 启动速度更快&#xff1a;固态硬盘&#xff08;SSD&#xff09;的启动速度比传统机械硬盘&#xff08;HDD&#xff09;快得多。这是因为固态硬盘没有旋转部件&#xff0c;而传统硬盘的读写头需要不断…...

LabVIEW电火花线切割放电点位置

介绍了一个电火花线切割放电点位置分布评价系统&#xff0c;特别是在系统组成、硬件选择和LabVIEW软件应用方面。 本系统由两个主要部分组成&#xff1a;硬件和软件。硬件部分包括电流传感器、高速数据采集卡、开关电源、电阻和导线。软件部分则由LabVIEW编程环境构成&#xf…...

信通院发布《全球数字经济白皮书 (2023年)》解析

文章目录 前言一、白皮书目录二、白皮书核心观点(一)主要国家优化政策布局,数字经济政策导向更加明晰、体系更加完善(二) 数字经济加速构筑经济复苏关键支撑(三)全球数字经济多极化趋势进一步深化(四)数字经济重点领域发展成效显著三、白皮书的主要内容前言 当前,世…...

Spring5系列学习文章分享---第三篇(AOP概念+原理+动态代理+术语+Aspect+操作案例(注解与配置方式))

目录 AOP概念AOP底层原理AOP(JDK动态代理)使用 JDK 动态代理&#xff0c;使用 Proxy 类里面的方法创建代理对象**编写** **JDK** 动态代理代码 AOP(术语)AOP操作&#xff08;准备工作&#xff09;**AOP** **操作&#xff08;**AspectJ注解)**AOP** **操作&#xff08;**AspectJ…...

BL0942 内置时钟免校准计量芯片 用于智能家居领域 上海贝岭 低成本 使用指南

BL0939是上海贝岭股份有限公司开发的一款用于智能家居领域进行电能测量的专用芯片&#xff0c;支持两路测量&#xff0c;可同时进行计量和漏电故障检测&#xff0c;漏电检测电流可设&#xff0c;响应时间快&#xff0c;具有体积小&#xff0c;外围电路简单&#xff0c;成本低廉…...

【算法专题】动态规划之路径问题

动态规划2.0 动态规划 - - - 路径问题1. 不同路径2. 不同路径Ⅱ3. 珠宝的最高价值4. 下降路径最小和5. 最小路径和6. 地下城游戏 动态规划 - - - 路径问题 1. 不同路径 题目链接 -> Leetcode -62.不同路径 Leetcode -62.不同路径 题目&#xff1a;一个机器人位于一个 m …...

Python range函数

Python中的range()函数是一个强大的工具&#xff0c;用于生成一系列的整数。它在循环、迭代和序列生成等方面都有广泛的应用。本文将深入探讨range()函数的用法&#xff0c;提供详细的示例代码&#xff0c;并讨论其在Python编程中的实际应用。 什么是range()函数&#xff1f; …...

Unity中实现捏脸系统

前言 目前市面上常见的捏脸一般是基于BlendShapes和控制骨骼点坐标两种方案实现的。后者能够控制的精细程度更高&#xff0c;同时使用BlendShapes来控制表情。 控制骨骼点坐标 比如找到控制鼻子的骨骼节点修改localScale缩放&#xff0c;调节鼻子大小。 BlendShapes控制表…...

基于算法竞赛的c++编程(28)结构体的进阶应用

结构体的嵌套与复杂数据组织 在C中&#xff0c;结构体可以嵌套使用&#xff0c;形成更复杂的数据结构。例如&#xff0c;可以通过嵌套结构体描述多层级数据关系&#xff1a; struct Address {string city;string street;int zipCode; };struct Employee {string name;int id;…...

【根据当天日期输出明天的日期(需对闰年做判定)。】2022-5-15

缘由根据当天日期输出明天的日期(需对闰年做判定)。日期类型结构体如下&#xff1a; struct data{ int year; int month; int day;};-编程语言-CSDN问答 struct mdata{ int year; int month; int day; }mdata; int 天数(int year, int month) {switch (month){case 1: case 3:…...

树莓派超全系列教程文档--(61)树莓派摄像头高级使用方法

树莓派摄像头高级使用方法 配置通过调谐文件来调整相机行为 使用多个摄像头安装 libcam 和 rpicam-apps依赖关系开发包 文章来源&#xff1a; http://raspberry.dns8844.cn/documentation 原文网址 配置 大多数用例自动工作&#xff0c;无需更改相机配置。但是&#xff0c;一…...

python/java环境配置

环境变量放一起 python&#xff1a; 1.首先下载Python Python下载地址&#xff1a;Download Python | Python.org downloads ---windows -- 64 2.安装Python 下面两个&#xff0c;然后自定义&#xff0c;全选 可以把前4个选上 3.环境配置 1&#xff09;搜高级系统设置 2…...

《通信之道——从微积分到 5G》读书总结

第1章 绪 论 1.1 这是一本什么样的书 通信技术&#xff0c;说到底就是数学。 那些最基础、最本质的部分。 1.2 什么是通信 通信 发送方 接收方 承载信息的信号 解调出其中承载的信息 信息在发送方那里被加工成信号&#xff08;调制&#xff09; 把信息从信号中抽取出来&am…...

【论文笔记】若干矿井粉尘检测算法概述

总的来说&#xff0c;传统机器学习、传统机器学习与深度学习的结合、LSTM等算法所需要的数据集来源于矿井传感器测量的粉尘浓度&#xff0c;通过建立回归模型来预测未来矿井的粉尘浓度。传统机器学习算法性能易受数据中极端值的影响。YOLO等计算机视觉算法所需要的数据集来源于…...

ardupilot 开发环境eclipse 中import 缺少C++

目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...

高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数

高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数 在软件开发中,单例模式(Singleton Pattern)是一种常见的设计模式,确保一个类仅有一个实例,并提供一个全局访问点。在多线程环境下,实现单例模式时需要注意线程安全问题,以防止多个线程同时创建实例,导致…...

LeetCode - 199. 二叉树的右视图

题目 199. 二叉树的右视图 - 力扣&#xff08;LeetCode&#xff09; 思路 右视图是指从树的右侧看&#xff0c;对于每一层&#xff0c;只能看到该层最右边的节点。实现思路是&#xff1a; 使用深度优先搜索(DFS)按照"根-右-左"的顺序遍历树记录每个节点的深度对于…...

代码随想录刷题day30

1、零钱兑换II 给你一个整数数组 coins 表示不同面额的硬币&#xff0c;另给一个整数 amount 表示总金额。 请你计算并返回可以凑成总金额的硬币组合数。如果任何硬币组合都无法凑出总金额&#xff0c;返回 0 。 假设每一种面额的硬币有无限个。 题目数据保证结果符合 32 位带…...