RabbitMQ消息可靠性等机制详解(精细版三)
目录
七 RabbitMQ的其他操作
7.1 消息的可靠性(发送可靠)
7.1.1 confim机制(保证发送可靠)
7.1.2 Return机制(保证发送可靠)
7.1.3 编写配置文件
7.1.4 开启Confirm和Return
7.2 手动Ack(保证接收可靠)
7.2.1 添加配置文件
7.2.2 手动ack
7.3 避免消息重复消费
7.3.1 导入依赖
7.3.2 编写配置文件
7.3.3 修改生产者
7.3.4 修改消费者
官方文档 RabbitMQ Documentation | RabbitMQ
MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。
RabbitMQ是一个Erlang开发的AMQP(高级消息排队 协议)(英文全称:Advanced Message Queuing Protocol )的开源实现。-------------接上章
七 RabbitMQ的其他操作
7.1 消息的可靠性(发送可靠)
7.1.1 confim机制(保证发送可靠)
RabbitMQ的事务:事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息。事务的操作,效率太低,加了事务操作后,比平时的操作效率至少要慢100倍。
RabbitMQ除了事务,还提供了Confirm的确认机制,这个效率比事务高很多。
| 消息传递可靠性 |
|---|
![]() |
7.1.2 Return机制(保证发送可靠)
Confirm只能保证消息到达exchange,无法保证消息可以被exchange分发到指定queue。
而且exchange是不能持久化消息的,queue是可以持久化消息。
采用Return机制来监听消息是否从exchange送到了指定的queue中
| 消息传递可靠性 |
|---|
![]() |
在消息发送方项目上加入下面内容:
7.1.3 编写配置文件
spring:rabbitmq:host: 你的地址port: 5672virtual-host: /tingyiusername: testpassword: testpublisher-confirms: truepublisher-returns: true
7.1.4 开启Confirm和Return
package com.tingyi.rabbitmq.config;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/*** @author 听忆*/
@Component
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback{
@Autowiredprivate RabbitTemplate rabbitTemplate;
@PostConstruct // init-methodpublic void initMethod(){//指定 ConfirmCallbackrabbitTemplate.setConfirmCallback(this);
//指定 ReturnCallbackrabbitTemplate.setReturnCallback(this);}
@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(ack){System.out.println("消息已经送达到Exchange");}else{System.out.println("消息没有送达到Exchange");}}
@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("消息没有送达到Queue");}
}
7.2 手动Ack(保证接收可靠)
7.2.1 添加配置文件
-
在消费方application.yml文件添加下面配置, 改为手动应答机制.
spring:rabbitmq:host: 你的地址port: 5672virtual-host: /tingyiusername: testpassword: testlistener:simple:acknowledge-mode: manual
7.2.2 手动ack
package com.tingyi.rabbitmq.topic;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/*** @author 听忆*/
@Component
public class Consumer {
@RabbitListener(queues = "boot-queue")public void getMessage(String msg, Channel channel, Message message) throws IOException {System.out.println("接收到消息:" + msg);try {int i = 1 / 0;/*** 消费者发起成功通知* 第一个参数: DeliveryTag,消息的唯一标识 channel+消息编号* 第二个参数:是否开启批量处理 false:不开启批量* 举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,* 当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。*/channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {e.printStackTrace();/*** 返回失败通知* 第一个参数: DeliveryTag,消息的唯一标识 channel+消息编号* 第二个boolean true所有消费者都会拒绝这个消息,false代表只有当前消费者拒绝* 第三个boolean true消息接收失败重新回到原有队列中*/channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);}
}
}
7.3 避免消息重复消费
重复消费消息,会对非幂等行操作造成问题
重复消费消息的原因是,消费者没有给RabbitMQ一个ack
| 重复消费 |
|---|
![]() |
为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,现将消息的id放到Redis中,
id-0(正在执行业务)
id-1(执行业务成功)
然后使用ack给RabbitMQ返回消息
如果RabbitMQack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。
极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。
备注: java中的方法叫做setIfAbsent, redis中的命令叫做setnx
作用:如果为空就set值,并返回1, true 如果存在(不为空)不进行操作,并返回0, false
7.3.1 导入依赖
生产者和消费者都加入下面依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.4.5</version>
</dependency>
7.3.2 编写配置文件
spring:redis:host: 你的地址port: 6379
7.3.3 修改生产者
@Test
public void contextLoads() throws IOException {CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());//第四个参数: 设置消息唯一idrabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","你看听忆哇",messageId);System.in.read();
}
7.3.4 修改消费者
package com.tingyi.rabbitmq.topic;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/*** @author 听忆*/
/*** java中的方法叫做setIfAbsent, redis中的命令叫做setnx* 作用:* 如果为空就set值,并返回1, true* 如果存在(不为空)不进行操作,并返回0, false*/
@Component
public class Consumer {
@Autowiredprivate StringRedisTemplate redisTemplate;
@RabbitListener(queues = "boot-queue")public void getMessage(String msg, Channel channel, Message message) throws IOException {//0. 获取MessageId, 消息唯一idString messageId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");//1. 设置key到Redisif(redisTemplate.opsForValue().setIfAbsent(messageId,"0", 10, TimeUnit.SECONDS)) {
//2. 消费消息System.out.println("接收到消息:" + msg);
//3. 设置key的value为1redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
//4. 手动ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else {
//5. 获取Redis中的value即可 如果是1,手动ackif("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}
}
}
相关文章:
RabbitMQ消息可靠性等机制详解(精细版三)
目录 七 RabbitMQ的其他操作 7.1 消息的可靠性(发送可靠) 7.1.1 confim机制(保证发送可靠) 7.1.2 Return机制(保证发送可靠) 7.1.3 编写配置文件 7.1.4 开启Confirm和Return 7.2 手动Ack(保证接收可靠) 7.2.1 添加配置文件 7.2.2 手动ack 7.3 避免消息重复消费 7.3.…...
88888
49615...
深度学习之激活函数
激活函数的公式根据不同的函数类型而有所不同。以下是一些常见的激活函数及其数学公式: Sigmoid函数: 公式:f(x)特性:输出范围在0到1之间,常用于二分类问题,将输出转换为概率值。但存在梯度消失问题&#…...
OpenStack开源虚拟化平台(一)
目录 一、OpenStack背景介绍(一)OpenStack是什么(二)OpenStack的主要服务 二、计算服务Nova(一)Nova组件介绍(二)Libvirt简介(三)Nova中的RabbitMQ解析 OpenS…...
C++ | Leetcode C++题解之第207题课程表
题目: 题解: class Solution { private:vector<vector<int>> edges;vector<int> indeg;public:bool canFinish(int numCourses, vector<vector<int>>& prerequisites) {edges.resize(numCourses);indeg.resize(numCo…...
vue3中的自定义指令
全局自定义指令 假设我们要创建一个全局指令v-highlight,用于高亮显示元素。这个指令将接受一个颜色参数,并有一个可选的修饰符bold来决定是否加粗文本。 首先,在创建Vue应用时定义这个指令:(这里可以将指令抽离成单…...
Postman接口测试工具的原理及应用详解(一)
本系列文章简介: 在当今软件开发的世界中,接口测试作为保证软件质量的重要一环,其重要性不言而喻。随着前后端分离开发模式的普及,接口测试已成为连接前后端开发的桥梁,确保前后端之间的数据交互准确无误。在这样的背景…...
C++ initializer_list类型推导
目录 initializer_list C自动类型推断 auto typeid decltype initializer_list<T> C支持统一初始化{ },出现了一个新的类型initializer_list<T>,一切类型都可以用列表初始化。提供了一种更加灵活、安全和明确的方式来初始化对象。 class…...
造一个交互式3D火山数据可视化
本文由ScriptEcho平台提供技术支持 项目地址:传送门 使用 Plotly.js 创建交互式 3D 火山数据可视化 应用场景 本代码用于将火山数据库中的数据可视化,展示火山的高度、类型和状态。可用于地质学研究、教育和数据探索。 基本功能 该代码使用 Plotly…...
【网络安全】一文带你了解什么是【CSRF攻击】
CSRF(Cross-Site Request Forgery,跨站请求伪造)是一种网络攻击方式,它利用已认证用户在受信任网站上的身份,诱使用户在不知情的情况下执行恶意操作。具体来说,攻击者通过各种方式(如发送恶意链…...
短视频电商源码如何选择
在数字时代的浪潮下,短视频电商以其直观、生动、互动性强的特点,迅速崛起成为电商行业的一股新势力。对于有志于进军短视频电商领域的创业者来说,选择一款合适的短视频电商源码至关重要。本文将从多个角度探讨如何选择短视频电商源码…...
444444
356前期...
初识LangChain的快速入门指南
个人名片 🎓作者简介:java领域优质创作者 🌐个人主页:码农阿豪 📞工作室:新空间代码工作室(提供各种软件服务) 💌个人邮箱:[2435024119qq.com] 📱…...
OpenBayes 教程上新 | CVPR 获奖项目,BioCLlP 快速识别生物种类,再也不会弄混小浣熊和小熊猫了!
市面上有很多植物识别的 App,通过对植物的叶片、花朵、果实等特征进行准确的识别,从而确定植物的种类、名称。但动物识别的 App 却十分有限,这使我们很难区分一些外形相似的动物,例如小浣熊和小熊猫。 左侧为小浣熊,右…...
24 年程序员各岗位薪资待遇汇总(最新)
大家好,我是程序员鱼皮。今天分享 24 年 6 月最新的程序员各岗位薪资待遇汇总。 数据是从哪儿来的呢?其实很简单,BOSS 直聘上有一个免费的薪酬查询工具,只要认证成为招聘者就能直接看,便于招聘者了解市场,…...
Android SurfaceFlinger——系统动画服务启动(十四)
在了解了 SurfaceFlinger、HWC、OpenGL ES 和 EGL 等相关概念和基础信息后,我们通过系统动画的调用流程引入更多的内容。 一、解析init.rc 开机就启动进程,肯定就要从 rc 文件开始。负责开机动画的进程是 bootanimation。 1、bootanim.rc 源码位置:/frameworks/base/cmds…...
VaRest插件常用节点以及Http请求数据
1.解析json (1)Construct Json Object:构建json对象 (2)Decode Json:解析json 将string转换为json (3)Encode json:将json转换为string (4)Get S…...
【Linux】线程id与互斥(线程三)
上一期我们进行了线程控制的了解与相关操作,但是仍旧有一些问题没有解决 本章第一阶段就是解决tid的问题,第二阶段是进行模拟一个简易线程库(为了加深对于C库封装linux原生线程的理解),第三阶段就是互斥。 目录 线程id…...
JavaEE—什么是服务器?以及Tomcat安装到如何集成到IDEA中?
目录 ▐ 前言 ▐ JavaEE是指什么? ▐ 什么是服务器? ▐ Tomcat安装教程 * 修改服务端口号 ▐ 将Tomcat集成到IDEA中 ▐ 测试 ▐ 结语 ▐ 前言 至此,这半年来我已经完成了JavaSE,Mysql数据库,以及Web前端知识的学习了&am…...
主流分布式消息中间件RabbitMQ、RocketMQ
分布式消息中间件在现代分布式系统中起着至关重要的作用。以下是一些主流的分布式消息中间件: 1. Apache Kafka - 特点:高吞吐量、低延迟、持久化、水平可扩展、分布式日志系统。 - 使用场景:日志收集与处理、实时流处理、事件驱动架构、大数…...
SAP ECC6 2027年停服倒计时:手把手教你评估四大迁移路径与成本(含第三方支持避坑指南)
SAP ECC6 2027年停服倒计时:企业迁移决策全景指南 当2027年的钟声敲响时,全球仍在运行SAP ECC6系统的企业将面临一个关键转折点。这不是简单的技术升级,而是一次关乎企业数字化未来的战略抉择。作为经历过三次SAP重大版本迁移的顾问ÿ…...
在PyCharm中上传代码到Gitee仓库
最近学习python,使用pycharm过程中配置远程仓库方式,以gitee为例新建一个项目作为演示点击菜单中的VCS,选择启用版本控制集成弹出的窗口直接点确认在左侧的菜单中找到仓库全选输入提交消息,并点击提交或者提交或推送在弹出的窗口中…...
对比直接调用与通过聚合平台调用大模型的体验差异
🚀 告别海外账号与网络限制!稳定直连全球优质大模型,限时半价接入中。 👉 点击领取海量免费额度 对比直接调用与通过聚合平台调用大模型的体验差异 作为一名需要频繁使用多种大语言模型的开发者,我曾长期维护着来自不…...
Cursor配置管理:使用符号链接与CLI实现多项目环境一键切换
1. 项目概述:为什么我们需要管理Cursor的配置?如果你和我一样,每天大部分时间都泡在Cursor这个AI驱动的代码编辑器里,那你肯定遇到过这样的场景:早上打开电脑,准备开始一个全新的前端项目,你熟练…...
容器化自动化数据抓取平台OpenClaw-Compose部署与实战指南
1. 项目概述:一个容器化的开源自动化抓取与处理平台最近在折腾一个自动化数据抓取和处理的项目,发现了一个挺有意思的GitHub仓库:alexleach/openclaw-compose。乍一看标题,你可能会觉得这又是一个普通的Docker Compose编排文件集合…...
免费跨平台绘图神器:draw.io桌面版终极使用指南
免费跨平台绘图神器:draw.io桌面版终极使用指南 【免费下载链接】drawio-desktop Official electron build of draw.io 项目地址: https://gitcode.com/GitHub_Trending/dr/drawio-desktop 还在为不同系统间的图表文件兼容性而烦恼吗?ᾑ…...
RK3576开发板AIoT实战:从模型转换到边缘部署全流程解析
1. 项目概述:从一块开发板到AI应用落地的完整旅程 最近几年,AIoT(人工智能物联网)的概念越来越火,但很多开发者朋友拿到一块功能强大的开发板后,往往卡在“如何把AI模型真正跑起来”这一步。我手头这块RK35…...
深度架构解析:深圳地铁大数据客流分析系统的技术演进与架构哲学
深度架构解析:深圳地铁大数据客流分析系统的技术演进与架构哲学 【免费下载链接】SZT-bigdata 深圳地铁大数据客流分析系统🚇🚄🌟 项目地址: https://gitcode.com/gh_mirrors/sz/SZT-bigdata 在智慧城市建设的浪潮中&#…...
从V1到V3:手把手教你用PyTorch复现MobileNet进化史(附完整代码)
从V1到V3:手把手教你用PyTorch复现MobileNet进化史(附完整代码) 在移动端和嵌入式设备上部署深度学习模型一直是计算机视觉领域的核心挑战之一。2017年,Google推出的MobileNet系列彻底改变了轻量级卷积神经网络的设计范式…...
QtScrcpy:将手机屏幕变成电脑扩展屏的终极解决方案
QtScrcpy:将手机屏幕变成电脑扩展屏的终极解决方案 【免费下载链接】QtScrcpy Android实时投屏软件,此应用程序提供USB(或通过TCP/IP)连接的Android设备的显示和控制。它不需要任何root访问权限 项目地址: https://gitcode.com/barry-ran/QtScrcpy …...


