RabbitMQ消息可靠性等机制详解(精细版三)
目录
七 RabbitMQ的其他操作
7.1 消息的可靠性(发送可靠)
7.1.1 confim机制(保证发送可靠)
7.1.2 Return机制(保证发送可靠)
7.1.3 编写配置文件
7.1.4 开启Confirm和Return
7.2 手动Ack(保证接收可靠)
7.2.1 添加配置文件
7.2.2 手动ack
7.3 避免消息重复消费
7.3.1 导入依赖
7.3.2 编写配置文件
7.3.3 修改生产者
7.3.4 修改消费者
官方文档 RabbitMQ Documentation | RabbitMQ
MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。
RabbitMQ是一个Erlang开发的AMQP(高级消息排队 协议)(英文全称:Advanced Message Queuing Protocol )的开源实现。-------------接上章
七 RabbitMQ的其他操作
7.1 消息的可靠性(发送可靠)
7.1.1 confim机制(保证发送可靠)
RabbitMQ的事务:事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息。事务的操作,效率太低,加了事务操作后,比平时的操作效率至少要慢100倍。
RabbitMQ除了事务,还提供了Confirm的确认机制,这个效率比事务高很多。
| 消息传递可靠性 |
|---|
![]() |
7.1.2 Return机制(保证发送可靠)
Confirm只能保证消息到达exchange,无法保证消息可以被exchange分发到指定queue。
而且exchange是不能持久化消息的,queue是可以持久化消息。
采用Return机制来监听消息是否从exchange送到了指定的queue中
| 消息传递可靠性 |
|---|
![]() |
在消息发送方项目上加入下面内容:
7.1.3 编写配置文件
spring:rabbitmq:host: 你的地址port: 5672virtual-host: /tingyiusername: testpassword: testpublisher-confirms: truepublisher-returns: true
7.1.4 开启Confirm和Return
package com.tingyi.rabbitmq.config;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/*** @author 听忆*/
@Component
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback{
@Autowiredprivate RabbitTemplate rabbitTemplate;
@PostConstruct // init-methodpublic void initMethod(){//指定 ConfirmCallbackrabbitTemplate.setConfirmCallback(this);
//指定 ReturnCallbackrabbitTemplate.setReturnCallback(this);}
@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(ack){System.out.println("消息已经送达到Exchange");}else{System.out.println("消息没有送达到Exchange");}}
@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("消息没有送达到Queue");}
}
7.2 手动Ack(保证接收可靠)
7.2.1 添加配置文件
-
在消费方application.yml文件添加下面配置, 改为手动应答机制.
spring:rabbitmq:host: 你的地址port: 5672virtual-host: /tingyiusername: testpassword: testlistener:simple:acknowledge-mode: manual
7.2.2 手动ack
package com.tingyi.rabbitmq.topic;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/*** @author 听忆*/
@Component
public class Consumer {
@RabbitListener(queues = "boot-queue")public void getMessage(String msg, Channel channel, Message message) throws IOException {System.out.println("接收到消息:" + msg);try {int i = 1 / 0;/*** 消费者发起成功通知* 第一个参数: DeliveryTag,消息的唯一标识 channel+消息编号* 第二个参数:是否开启批量处理 false:不开启批量* 举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,* 当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。*/channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);} catch (Exception e) {e.printStackTrace();/*** 返回失败通知* 第一个参数: DeliveryTag,消息的唯一标识 channel+消息编号* 第二个boolean true所有消费者都会拒绝这个消息,false代表只有当前消费者拒绝* 第三个boolean true消息接收失败重新回到原有队列中*/channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);}
}
}
7.3 避免消息重复消费
重复消费消息,会对非幂等行操作造成问题
重复消费消息的原因是,消费者没有给RabbitMQ一个ack
| 重复消费 |
|---|
![]() |
为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,现将消息的id放到Redis中,
id-0(正在执行业务)
id-1(执行业务成功)
然后使用ack给RabbitMQ返回消息
如果RabbitMQack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。
极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。
备注: java中的方法叫做setIfAbsent, redis中的命令叫做setnx
作用:如果为空就set值,并返回1, true 如果存在(不为空)不进行操作,并返回0, false
7.3.1 导入依赖
生产者和消费者都加入下面依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.4.5</version>
</dependency>
7.3.2 编写配置文件
spring:redis:host: 你的地址port: 6379
7.3.3 修改生产者
@Test
public void contextLoads() throws IOException {CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());//第四个参数: 设置消息唯一idrabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","你看听忆哇",messageId);System.in.read();
}
7.3.4 修改消费者
package com.tingyi.rabbitmq.topic;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/*** @author 听忆*/
/*** java中的方法叫做setIfAbsent, redis中的命令叫做setnx* 作用:* 如果为空就set值,并返回1, true* 如果存在(不为空)不进行操作,并返回0, false*/
@Component
public class Consumer {
@Autowiredprivate StringRedisTemplate redisTemplate;
@RabbitListener(queues = "boot-queue")public void getMessage(String msg, Channel channel, Message message) throws IOException {//0. 获取MessageId, 消息唯一idString messageId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");//1. 设置key到Redisif(redisTemplate.opsForValue().setIfAbsent(messageId,"0", 10, TimeUnit.SECONDS)) {
//2. 消费消息System.out.println("接收到消息:" + msg);
//3. 设置key的value为1redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
//4. 手动ackchannel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else {
//5. 获取Redis中的value即可 如果是1,手动ackif("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}
}
}
相关文章:
RabbitMQ消息可靠性等机制详解(精细版三)
目录 七 RabbitMQ的其他操作 7.1 消息的可靠性(发送可靠) 7.1.1 confim机制(保证发送可靠) 7.1.2 Return机制(保证发送可靠) 7.1.3 编写配置文件 7.1.4 开启Confirm和Return 7.2 手动Ack(保证接收可靠) 7.2.1 添加配置文件 7.2.2 手动ack 7.3 避免消息重复消费 7.3.…...
88888
49615...
深度学习之激活函数
激活函数的公式根据不同的函数类型而有所不同。以下是一些常见的激活函数及其数学公式: Sigmoid函数: 公式:f(x)特性:输出范围在0到1之间,常用于二分类问题,将输出转换为概率值。但存在梯度消失问题&#…...
OpenStack开源虚拟化平台(一)
目录 一、OpenStack背景介绍(一)OpenStack是什么(二)OpenStack的主要服务 二、计算服务Nova(一)Nova组件介绍(二)Libvirt简介(三)Nova中的RabbitMQ解析 OpenS…...
C++ | Leetcode C++题解之第207题课程表
题目: 题解: class Solution { private:vector<vector<int>> edges;vector<int> indeg;public:bool canFinish(int numCourses, vector<vector<int>>& prerequisites) {edges.resize(numCourses);indeg.resize(numCo…...
vue3中的自定义指令
全局自定义指令 假设我们要创建一个全局指令v-highlight,用于高亮显示元素。这个指令将接受一个颜色参数,并有一个可选的修饰符bold来决定是否加粗文本。 首先,在创建Vue应用时定义这个指令:(这里可以将指令抽离成单…...
Postman接口测试工具的原理及应用详解(一)
本系列文章简介: 在当今软件开发的世界中,接口测试作为保证软件质量的重要一环,其重要性不言而喻。随着前后端分离开发模式的普及,接口测试已成为连接前后端开发的桥梁,确保前后端之间的数据交互准确无误。在这样的背景…...
C++ initializer_list类型推导
目录 initializer_list C自动类型推断 auto typeid decltype initializer_list<T> C支持统一初始化{ },出现了一个新的类型initializer_list<T>,一切类型都可以用列表初始化。提供了一种更加灵活、安全和明确的方式来初始化对象。 class…...
造一个交互式3D火山数据可视化
本文由ScriptEcho平台提供技术支持 项目地址:传送门 使用 Plotly.js 创建交互式 3D 火山数据可视化 应用场景 本代码用于将火山数据库中的数据可视化,展示火山的高度、类型和状态。可用于地质学研究、教育和数据探索。 基本功能 该代码使用 Plotly…...
【网络安全】一文带你了解什么是【CSRF攻击】
CSRF(Cross-Site Request Forgery,跨站请求伪造)是一种网络攻击方式,它利用已认证用户在受信任网站上的身份,诱使用户在不知情的情况下执行恶意操作。具体来说,攻击者通过各种方式(如发送恶意链…...
短视频电商源码如何选择
在数字时代的浪潮下,短视频电商以其直观、生动、互动性强的特点,迅速崛起成为电商行业的一股新势力。对于有志于进军短视频电商领域的创业者来说,选择一款合适的短视频电商源码至关重要。本文将从多个角度探讨如何选择短视频电商源码…...
444444
356前期...
初识LangChain的快速入门指南
个人名片 🎓作者简介:java领域优质创作者 🌐个人主页:码农阿豪 📞工作室:新空间代码工作室(提供各种软件服务) 💌个人邮箱:[2435024119qq.com] 📱…...
OpenBayes 教程上新 | CVPR 获奖项目,BioCLlP 快速识别生物种类,再也不会弄混小浣熊和小熊猫了!
市面上有很多植物识别的 App,通过对植物的叶片、花朵、果实等特征进行准确的识别,从而确定植物的种类、名称。但动物识别的 App 却十分有限,这使我们很难区分一些外形相似的动物,例如小浣熊和小熊猫。 左侧为小浣熊,右…...
24 年程序员各岗位薪资待遇汇总(最新)
大家好,我是程序员鱼皮。今天分享 24 年 6 月最新的程序员各岗位薪资待遇汇总。 数据是从哪儿来的呢?其实很简单,BOSS 直聘上有一个免费的薪酬查询工具,只要认证成为招聘者就能直接看,便于招聘者了解市场,…...
Android SurfaceFlinger——系统动画服务启动(十四)
在了解了 SurfaceFlinger、HWC、OpenGL ES 和 EGL 等相关概念和基础信息后,我们通过系统动画的调用流程引入更多的内容。 一、解析init.rc 开机就启动进程,肯定就要从 rc 文件开始。负责开机动画的进程是 bootanimation。 1、bootanim.rc 源码位置:/frameworks/base/cmds…...
VaRest插件常用节点以及Http请求数据
1.解析json (1)Construct Json Object:构建json对象 (2)Decode Json:解析json 将string转换为json (3)Encode json:将json转换为string (4)Get S…...
【Linux】线程id与互斥(线程三)
上一期我们进行了线程控制的了解与相关操作,但是仍旧有一些问题没有解决 本章第一阶段就是解决tid的问题,第二阶段是进行模拟一个简易线程库(为了加深对于C库封装linux原生线程的理解),第三阶段就是互斥。 目录 线程id…...
JavaEE—什么是服务器?以及Tomcat安装到如何集成到IDEA中?
目录 ▐ 前言 ▐ JavaEE是指什么? ▐ 什么是服务器? ▐ Tomcat安装教程 * 修改服务端口号 ▐ 将Tomcat集成到IDEA中 ▐ 测试 ▐ 结语 ▐ 前言 至此,这半年来我已经完成了JavaSE,Mysql数据库,以及Web前端知识的学习了&am…...
主流分布式消息中间件RabbitMQ、RocketMQ
分布式消息中间件在现代分布式系统中起着至关重要的作用。以下是一些主流的分布式消息中间件: 1. Apache Kafka - 特点:高吞吐量、低延迟、持久化、水平可扩展、分布式日志系统。 - 使用场景:日志收集与处理、实时流处理、事件驱动架构、大数…...
LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明
LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造,完美适配AGV和无人叉车。同时,集成以太网与语音合成技术,为各类高级系统(如MES、调度系统、库位管理、立库等)提供高效便捷的语音交互体验。 L…...
Spring Boot 实现流式响应(兼容 2.7.x)
在实际开发中,我们可能会遇到一些流式数据处理的场景,比如接收来自上游接口的 Server-Sent Events(SSE) 或 流式 JSON 内容,并将其原样中转给前端页面或客户端。这种情况下,传统的 RestTemplate 缓存机制会…...
ElasticSearch搜索引擎之倒排索引及其底层算法
文章目录 一、搜索引擎1、什么是搜索引擎?2、搜索引擎的分类3、常用的搜索引擎4、搜索引擎的特点二、倒排索引1、简介2、为什么倒排索引不用B+树1.创建时间长,文件大。2.其次,树深,IO次数可怕。3.索引可能会失效。4.精准度差。三. 倒排索引四、算法1、Term Index的算法2、 …...
Springboot社区养老保险系统小程序
一、前言 随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,社区养老保险系统小程序被用户普遍使用,为方…...
GruntJS-前端自动化任务运行器从入门到实战
Grunt 完全指南:从入门到实战 一、Grunt 是什么? Grunt是一个基于 Node.js 的前端自动化任务运行器,主要用于自动化执行项目开发中重复性高的任务,例如文件压缩、代码编译、语法检查、单元测试、文件合并等。通过配置简洁的任务…...
Kafka入门-生产者
生产者 生产者发送流程: 延迟时间为0ms时,也就意味着每当有数据就会直接发送 异步发送API 异步发送和同步发送的不同在于:异步发送不需要等待结果,同步发送必须等待结果才能进行下一步发送。 普通异步发送 首先导入所需的k…...
Golang——6、指针和结构体
指针和结构体 1、指针1.1、指针地址和指针类型1.2、指针取值1.3、new和make 2、结构体2.1、type关键字的使用2.2、结构体的定义和初始化2.3、结构体方法和接收者2.4、给任意类型添加方法2.5、结构体的匿名字段2.6、嵌套结构体2.7、嵌套匿名结构体2.8、结构体的继承 3、结构体与…...
android13 app的触摸问题定位分析流程
一、知识点 一般来说,触摸问题都是app层面出问题,我们可以在ViewRootImpl.java添加log的方式定位;如果是touchableRegion的计算问题,就会相对比较麻烦了,需要通过adb shell dumpsys input > input.log指令,且通过打印堆栈的方式,逐步定位问题,并找到修改方案。 问题…...
Linux部署私有文件管理系统MinIO
最近需要用到一个文件管理服务,但是又不想花钱,所以就想着自己搭建一个,刚好我们用的一个开源框架已经集成了MinIO,所以就选了这个 我这边对文件服务性能要求不是太高,单机版就可以 安装非常简单,几个命令就…...
【Veristand】Veristand环境安装教程-Linux RT / Windows
首先声明,此教程是针对Simulink编译模型并导入Veristand中编写的,同时需要注意的是老用户编译可能用的是Veristand Model Framework,那个是历史版本,且NI不会再维护,新版本编译支持为VeriStand Model Generation Suppo…...


