RabbitMQ消息可靠性等机制详解(精细版三)
目录
七 RabbitMQ的其他操作
7.1 消息的可靠性(发送可靠)
7.1.1 confim机制(保证发送可靠)
7.1.2 Return机制(保证发送可靠)
7.1.3 编写配置文件
7.1.4 开启Confirm和Return
7.2 手动Ack(保证接收可靠)
7.2.1 添加配置文件
7.2.2 手动ack
7.3 避免消息重复消费
7.3.1 导入依赖
7.3.2 编写配置文件
7.3.3 修改生产者
7.3.4 修改消费者
官方文档 RabbitMQ Documentation | RabbitMQ
MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。
RabbitMQ是一个Erlang开发的AMQP(高级消息排队 协议)(英文全称:Advanced Message Queuing Protocol )的开源实现。-------------接上章
七 RabbitMQ的其他操作
7.1 消息的可靠性(发送可靠)
7.1.1 confim机制(保证发送可靠)
RabbitMQ的事务:事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息。事务的操作,效率太低,加了事务操作后,比平时的操作效率至少要慢100倍。
RabbitMQ除了事务,还提供了Confirm的确认机制,这个效率比事务高很多。
消息传递可靠性 |
---|
![]() |
7.1.2 Return机制(保证发送可靠)
Confirm只能保证消息到达exchange,无法保证消息可以被exchange分发到指定queue。
而且exchange是不能持久化消息的,queue是可以持久化消息。
采用Return机制来监听消息是否从exchange送到了指定的queue中
消息传递可靠性 |
---|
![]() |
在消息发送方项目上加入下面内容:
7.1.3 编写配置文件
spring:rabbitmq:host: 你的地址port: 5672virtual-host: /tingyiusername: testpassword: testpublisher-confirms: truepublisher-returns: true
7.1.4 开启Confirm和Return
package com.tingyi.rabbitmq.config;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/*** @author 听忆*/
@Component
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback{
@Autowiredprivate RabbitTemplate rabbitTemplate;
@PostConstruct // init-methodpublic void initMethod(){//指定 ConfirmCallbackrabbitTemplate.setConfirmCallback(this);
//指定 ReturnCallbackrabbitTemplate.setReturnCallback(this);}
@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(ack){System.out.println("消息已经送达到Exchange");}else{System.out.println("消息没有送达到Exchange");}}
@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("消息没有送达到Queue");}
}
7.2 手动Ack(保证接收可靠)
7.2.1 添加配置文件
-
在消费方application.yml文件添加下面配置, 改为手动应答机制.
spring:rabbitmq:host: 你的地址port: 5672virtual-host: /tingyiusername: testpassword: testlistener:simple:acknowledge-mode: manual
7.2.2 手动ack
package com.tingyi.rabbitmq.topic;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/*** @author 听忆*/
@Component
public class Consumer {
@RabbitListener(queues = "boot-queue")public void getMessage(String msg, Channel channel, Message message) throws IOException {System.out.println("接收到消息:" + msg);try {int i = 1 / 0;/*** 消费者发起成功通知* 第一个参数: DeliveryTag,消息的唯一标识 channel+消息编号* 第二个参数:是否开启批量处理 false:不开启批量* 举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,* 当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。*/channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {e.printStackTrace();/*** 返回失败通知* 第一个参数: DeliveryTag,消息的唯一标识 channel+消息编号* 第二个boolean true所有消费者都会拒绝这个消息,false代表只有当前消费者拒绝* 第三个boolean true消息接收失败重新回到原有队列中*/channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);}
}
}
7.3 避免消息重复消费
重复消费消息,会对非幂等行操作造成问题
重复消费消息的原因是,消费者没有给RabbitMQ一个ack
重复消费 |
---|
![]() |
为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,现将消息的id放到Redis中,
id-0(正在执行业务)
id-1(执行业务成功)
然后使用ack给RabbitMQ返回消息
如果RabbitMQack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。
极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。
备注: java中的方法叫做setIfAbsent, redis中的命令叫做setnx
作用:如果为空就set值,并返回1, true 如果存在(不为空)不进行操作,并返回0, false
7.3.1 导入依赖
生产者和消费者都加入下面依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.4.5</version>
</dependency>
7.3.2 编写配置文件
spring:redis:host: 你的地址port: 6379
7.3.3 修改生产者
@Test
public void contextLoads() throws IOException {CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());//第四个参数: 设置消息唯一idrabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","你看听忆哇",messageId);System.in.read();
}
7.3.4 修改消费者
package com.tingyi.rabbitmq.topic;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/*** @author 听忆*/
/*** java中的方法叫做setIfAbsent, redis中的命令叫做setnx* 作用:* 如果为空就set值,并返回1, true* 如果存在(不为空)不进行操作,并返回0, false*/
@Component
public class Consumer {
@Autowiredprivate StringRedisTemplate redisTemplate;
@RabbitListener(queues = "boot-queue")public void getMessage(String msg, Channel channel, Message message) throws IOException {//0. 获取MessageId, 消息唯一idString messageId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");//1. 设置key到Redisif(redisTemplate.opsForValue().setIfAbsent(messageId,"0", 10, TimeUnit.SECONDS)) {
//2. 消费消息System.out.println("接收到消息:" + msg);
//3. 设置key的value为1redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
//4. 手动ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else {
//5. 获取Redis中的value即可 如果是1,手动ackif("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}
}
}
相关文章:

RabbitMQ消息可靠性等机制详解(精细版三)
目录 七 RabbitMQ的其他操作 7.1 消息的可靠性(发送可靠) 7.1.1 confim机制(保证发送可靠) 7.1.2 Return机制(保证发送可靠) 7.1.3 编写配置文件 7.1.4 开启Confirm和Return 7.2 手动Ack(保证接收可靠) 7.2.1 添加配置文件 7.2.2 手动ack 7.3 避免消息重复消费 7.3.…...
88888
49615...

深度学习之激活函数
激活函数的公式根据不同的函数类型而有所不同。以下是一些常见的激活函数及其数学公式: Sigmoid函数: 公式:f(x)特性:输出范围在0到1之间,常用于二分类问题,将输出转换为概率值。但存在梯度消失问题&#…...

OpenStack开源虚拟化平台(一)
目录 一、OpenStack背景介绍(一)OpenStack是什么(二)OpenStack的主要服务 二、计算服务Nova(一)Nova组件介绍(二)Libvirt简介(三)Nova中的RabbitMQ解析 OpenS…...

C++ | Leetcode C++题解之第207题课程表
题目: 题解: class Solution { private:vector<vector<int>> edges;vector<int> indeg;public:bool canFinish(int numCourses, vector<vector<int>>& prerequisites) {edges.resize(numCourses);indeg.resize(numCo…...
vue3中的自定义指令
全局自定义指令 假设我们要创建一个全局指令v-highlight,用于高亮显示元素。这个指令将接受一个颜色参数,并有一个可选的修饰符bold来决定是否加粗文本。 首先,在创建Vue应用时定义这个指令:(这里可以将指令抽离成单…...
Postman接口测试工具的原理及应用详解(一)
本系列文章简介: 在当今软件开发的世界中,接口测试作为保证软件质量的重要一环,其重要性不言而喻。随着前后端分离开发模式的普及,接口测试已成为连接前后端开发的桥梁,确保前后端之间的数据交互准确无误。在这样的背景…...

C++ initializer_list类型推导
目录 initializer_list C自动类型推断 auto typeid decltype initializer_list<T> C支持统一初始化{ },出现了一个新的类型initializer_list<T>,一切类型都可以用列表初始化。提供了一种更加灵活、安全和明确的方式来初始化对象。 class…...

造一个交互式3D火山数据可视化
本文由ScriptEcho平台提供技术支持 项目地址:传送门 使用 Plotly.js 创建交互式 3D 火山数据可视化 应用场景 本代码用于将火山数据库中的数据可视化,展示火山的高度、类型和状态。可用于地质学研究、教育和数据探索。 基本功能 该代码使用 Plotly…...
【网络安全】一文带你了解什么是【CSRF攻击】
CSRF(Cross-Site Request Forgery,跨站请求伪造)是一种网络攻击方式,它利用已认证用户在受信任网站上的身份,诱使用户在不知情的情况下执行恶意操作。具体来说,攻击者通过各种方式(如发送恶意链…...

短视频电商源码如何选择
在数字时代的浪潮下,短视频电商以其直观、生动、互动性强的特点,迅速崛起成为电商行业的一股新势力。对于有志于进军短视频电商领域的创业者来说,选择一款合适的短视频电商源码至关重要。本文将从多个角度探讨如何选择短视频电商源码…...
444444
356前期...

初识LangChain的快速入门指南
个人名片 🎓作者简介:java领域优质创作者 🌐个人主页:码农阿豪 📞工作室:新空间代码工作室(提供各种软件服务) 💌个人邮箱:[2435024119qq.com] 📱…...

OpenBayes 教程上新 | CVPR 获奖项目,BioCLlP 快速识别生物种类,再也不会弄混小浣熊和小熊猫了!
市面上有很多植物识别的 App,通过对植物的叶片、花朵、果实等特征进行准确的识别,从而确定植物的种类、名称。但动物识别的 App 却十分有限,这使我们很难区分一些外形相似的动物,例如小浣熊和小熊猫。 左侧为小浣熊,右…...

24 年程序员各岗位薪资待遇汇总(最新)
大家好,我是程序员鱼皮。今天分享 24 年 6 月最新的程序员各岗位薪资待遇汇总。 数据是从哪儿来的呢?其实很简单,BOSS 直聘上有一个免费的薪酬查询工具,只要认证成为招聘者就能直接看,便于招聘者了解市场,…...
Android SurfaceFlinger——系统动画服务启动(十四)
在了解了 SurfaceFlinger、HWC、OpenGL ES 和 EGL 等相关概念和基础信息后,我们通过系统动画的调用流程引入更多的内容。 一、解析init.rc 开机就启动进程,肯定就要从 rc 文件开始。负责开机动画的进程是 bootanimation。 1、bootanim.rc 源码位置:/frameworks/base/cmds…...

VaRest插件常用节点以及Http请求数据
1.解析json (1)Construct Json Object:构建json对象 (2)Decode Json:解析json 将string转换为json (3)Encode json:将json转换为string (4)Get S…...

【Linux】线程id与互斥(线程三)
上一期我们进行了线程控制的了解与相关操作,但是仍旧有一些问题没有解决 本章第一阶段就是解决tid的问题,第二阶段是进行模拟一个简易线程库(为了加深对于C库封装linux原生线程的理解),第三阶段就是互斥。 目录 线程id…...

JavaEE—什么是服务器?以及Tomcat安装到如何集成到IDEA中?
目录 ▐ 前言 ▐ JavaEE是指什么? ▐ 什么是服务器? ▐ Tomcat安装教程 * 修改服务端口号 ▐ 将Tomcat集成到IDEA中 ▐ 测试 ▐ 结语 ▐ 前言 至此,这半年来我已经完成了JavaSE,Mysql数据库,以及Web前端知识的学习了&am…...
主流分布式消息中间件RabbitMQ、RocketMQ
分布式消息中间件在现代分布式系统中起着至关重要的作用。以下是一些主流的分布式消息中间件: 1. Apache Kafka - 特点:高吞吐量、低延迟、持久化、水平可扩展、分布式日志系统。 - 使用场景:日志收集与处理、实时流处理、事件驱动架构、大数…...

【kafka】Golang实现分布式Masscan任务调度系统
要求: 输出两个程序,一个命令行程序(命令行参数用flag)和一个服务端程序。 命令行程序支持通过命令行参数配置下发IP或IP段、端口、扫描带宽,然后将消息推送到kafka里面。 服务端程序: 从kafka消费者接收…...
pam_env.so模块配置解析
在PAM(Pluggable Authentication Modules)配置中, /etc/pam.d/su 文件相关配置含义如下: 配置解析 auth required pam_env.so1. 字段分解 字段值说明模块类型auth认证类模块,负责验证用户身份&am…...
【ROS】Nav2源码之nav2_behavior_tree-行为树节点列表
1、行为树节点分类 在 Nav2(Navigation2)的行为树框架中,行为树节点插件按照功能分为 Action(动作节点)、Condition(条件节点)、Control(控制节点) 和 Decorator(装饰节点) 四类。 1.1 动作节点 Action 执行具体的机器人操作或任务,直接与硬件、传感器或外部系统…...
uniapp中使用aixos 报错
问题: 在uniapp中使用aixos,运行后报如下错误: AxiosError: There is no suitable adapter to dispatch the request since : - adapter xhr is not supported by the environment - adapter http is not available in the build 解决方案&…...

浪潮交换机配置track检测实现高速公路收费网络主备切换NQA
浪潮交换机track配置 项目背景高速网络拓扑网络情况分析通信线路收费网络路由 收费汇聚交换机相应配置收费汇聚track配置 项目背景 在实施省内一条高速公路时遇到的需求,本次涉及的主要是收费汇聚交换机的配置,浪潮网络设备在高速项目很少,通…...

RabbitMQ入门4.1.0版本(基于java、SpringBoot操作)
RabbitMQ 一、RabbitMQ概述 RabbitMQ RabbitMQ最初由LShift和CohesiveFT于2007年开发,后来由Pivotal Software Inc.(现为VMware子公司)接管。RabbitMQ 是一个开源的消息代理和队列服务器,用 Erlang 语言编写。广泛应用于各种分布…...

从“安全密码”到测试体系:Gitee Test 赋能关键领域软件质量保障
关键领域软件测试的"安全密码":Gitee Test如何破解行业痛点 在数字化浪潮席卷全球的今天,软件系统已成为国家关键领域的"神经中枢"。从国防军工到能源电力,从金融交易到交通管控,这些关乎国计民生的关键领域…...
LangFlow技术架构分析
🔧 LangFlow 的可视化技术栈 前端节点编辑器 底层框架:基于 (一个现代化的 React 节点绘图库) 功能: 拖拽式构建 LangGraph 状态机 实时连线定义节点依赖关系 可视化调试循环和分支逻辑 与 LangGraph 的深…...

mac:大模型系列测试
0 MAC 前几天经过学生优惠以及国补17K入手了mac studio,然后这两天亲自测试其模型行运用能力如何,是否支持微调、推理速度等能力。下面进入正文。 1 mac 与 unsloth 按照下面的进行安装以及测试,是可以跑通文章里面的代码。训练速度也是很快的。 注意…...

Kubernetes 节点自动伸缩(Cluster Autoscaler)原理与实践
在 Kubernetes 集群中,如何在保障应用高可用的同时有效地管理资源,一直是运维人员和开发者关注的重点。随着微服务架构的普及,集群内各个服务的负载波动日趋明显,传统的手动扩缩容方式已无法满足实时性和弹性需求。 Cluster Auto…...