当前位置: 首页 > news >正文

Rabbitmq消息不丢失

目录

  • 一、消息不丢失
    • 1.消息确认
    • 2.消息确认业务封装
      • 2.1 发送确认消息测试
      • 2.2 消息发送失败,设置重发机制

一、消息不丢失

消息的不丢失,在MQ角度考虑,一般有三种途径:
1,生产者不丢数据
2,MQ服务器不丢数据
3,消费者不丢数据
保证消息不丢失有两种实现方式:
1,开启事务模式
2,消息确认模式
说明:开启事务会大幅降低消息发送及接收效率,使用的相对较少,因此我们生产环境一般都采取消息确认模式,以下我们只是讲解消息确认模式

1.消息确认

消息持久化
如果希望RabbitMQ重启之后消息不丢失,那么需要对以下3种实体均配置持久化
Exchange
声明exchange时设置持久化(durable = true)并且不自动删除(autoDelete = false)
Queue
声明queue时设置持久化(durable = true)并且不自动删除(autoDelete = false)
message
发送消息时通过设置deliveryMode=2持久化消息

处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。那么如何持久化呢,其实也很容易,就下面两步:
1、将queue的持久化标识durable设置为true,则代表是一个持久的队列
2、发送消息的时候将deliveryMode=2
这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据

发送确认
有时,业务处理成功,消息也发了,但是我们并不知道消息是否成功到达了rabbitmq,如果由于网络等原因导致业务成功而消息发送失败,那么发送方将出现不一致的问题,此时可以使用rabbitmq的发送确认功能,即要求rabbitmq显式告知我们消息是否已成功发送。

手动消费确认
有时,消息被正确投递到消费方,但是消费方处理失败,那么便会出现消费方的不一致问题。比如:订单已创建的消息发送到用户积分子系统中用于增加用户积分,但是积分消费方处理却都失败了,用户就会问:我购买了东西为什么积分并没有增加呢?
要解决这个问题,需要引入消费方确认,即只有消息被成功处理之后才告知rabbitmq以ack,否则告知rabbitmq以nack

2.消息确认业务封装

service-mq修改配置
开启rabbitmq消息确认配置,在common的配置文件中都已经配置好了!

spring:rabbitmq:host: 192.168.121.140port: 5672username: adminpassword: adminpublisher-confirms-type: correlated  #交换机的确认publisher-returns: true  #队列的确认listener:simple:acknowledge-mode: manual #默认情况下消息消费者是自动确认消息的,如果要手动确认消息则需要修改确认模式为manualprefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发

搭建rabbit-util模块
由于消息队列是公共模块,我们把mq的相关业务封装到该模块,其他service微服务模块都可能使用,因此我们把他封装到一个单独的模块,需要使用mq的模块直接引用该模块即可
搭建方式如:
pom.xml

    <dependencies><!--rabbitmq消息队列--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><!--rabbitmq 协议--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-amqp</artifactId></dependency></dependencies>

4.2.4 封装发送端消息确认

/*** @Description 消息发送确认* <p>* ConfirmCallback  只确认消息是否正确到达 Exchange 中* ReturnCallback   消息没有正确到达队列时触发回调,如果正确到达队列不执行* <p>* 1. 如果消息没有到exchange,则confirm回调,ack=false* 2. 如果消息到达exchange,则confirm回调,ack=true* 3. exchange到queue成功,则不回调return* 4. exchange到queue失败,则回调return* */
@Component
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;// 修饰一个非静态的void()方法,在服务器加载Servlet的时候运行,并且只会被服务器执行一次在构造函数之后执行,init()方法之前执行。@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);            //指定 ConfirmCallbackrabbitTemplate.setReturnCallback(this);             //指定 ReturnCallback}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("消息发送成功:" + JSON.toJSONString(correlationData));} else {log.info("消息发送失败:" + cause + " 数据:" + JSON.toJSONString(correlationData));}}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {// 反序列化对象输出System.out.println("消息主体: " + new String(message.getBody()));System.out.println("应答码: " + replyCode);System.out.println("描述:" + replyText);System.out.println("消息使用的交换器 exchange : " + exchange);System.out.println("消息使用的路由键 routing : " + routingKey);}}

封装消息发送

@Service
public class RabbitService {@Autowiredprivate RabbitTemplate rabbitTemplate;/***  发送消息* @param exchange 交换机* @param routingKey 路由键* @param message 消息*/public boolean sendMessage(String exchange, String routingKey, Object message) {rabbitTemplate.convertAndSend(exchange, routingKey, message);return true;}}

2.1 发送确认消息测试

消息发送端

@RestController
@RequestMapping("/mq")
public class MqController {@Autowiredprivate RabbitService rabbitService;/*** 消息发送*///http://localhost:8282/mq/sendConfirm@GetMapping("sendConfirm")public Result sendConfirm() {rabbitService.sendMessage("exchange.confirm", "routing.confirm", "来人了,开始接客吧!");return Result.ok();}
}

消息接收端

@Component
public class ConfirmReceiver {@SneakyThrows
@RabbitListener(bindings=@QueueBinding(value = @Queue(value = "queue.confirm",autoDelete = "false"),exchange = @Exchange(value = "exchange.confirm",autoDelete = "true"),key = {"routing.confirm"}))
public void process(Message message, Channel channel){System.out.println("RabbitListener:"+new String(message.getBody()));// false 确认一个消息,true 批量确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
}

测试:http://localhost:8282/mq/sendConfirm

2.2 消息发送失败,设置重发机制

实现思路:借助redis来实现重发机制
模块中添加依赖

<!-- redis -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency><!-- spring2.X集成redis所需common-pool2-->
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId>
</dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId>
</dependency>

自定义一个实体类来接收消息

@Data
public class GmallCorrelationData extends CorrelationData {//  消息主体private Object message;//  交换机private String exchange;//  路由键private String routingKey;//  重试次数private int retryCount = 0;//  消息类型  是否是延迟消息private boolean isDelay = false;//  延迟时间private int delayTime = 10;
}

修改发送方法

//  封装一个发送消息的方法
public Boolean sendMsg(String exchange,String routingKey, Object msg){//  将发送的消息 赋值到 自定义的实体类GmallCorrelationData gmallCorrelationData = new GmallCorrelationData();//  声明一个correlationId的变量String correlationId = UUID.randomUUID().toString().replaceAll("-","");gmallCorrelationData.setId(correlationId);gmallCorrelationData.setExchange(exchange);gmallCorrelationData.setRoutingKey(routingKey);gmallCorrelationData.setMessage(msg);//  发送消息的时候,将这个gmallCorrelationData 对象放入缓存。redisTemplate.opsForValue().set(correlationId, JSON.toJSONString(gmallCorrelationData),10, TimeUnit.MINUTES);//  调用发送消息方法//this.rabbitTemplate.convertAndSend(exchange,routingKey,msg);this.rabbitTemplate.convertAndSend(exchange,routingKey,msg,gmallCorrelationData);//  默认返回truereturn true;
}发送失败调用重发方法  MQProducerAckConfig 类中修改
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {//  ack = true 说明消息正确发送到了交换机if (ack){System.out.println("哥们你来了.");log.info("消息发送到了交换机");}else {//  消息没有到交换机log.info("消息没发送到交换机");//  调用重试发送方法this.retrySendMsg(correlationData);}
}@Override
public void returnedMessage(Message message, int code, String codeText, String exchange, String routingKey) {System.out.println("消息主体: " + new String(message.getBody()));System.out.println("应答码: " + code);System.out.println("描述:" + codeText);System.out.println("消息使用的交换器 exchange : " + exchange);System.out.println("消息使用的路由键 routing : " + routingKey);//  获取这个CorrelationData对象的Id  spring_returned_message_correlationString correlationDataId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");//  因为在发送消息的时候,已经将数据存储到缓存,通过 correlationDataId 来获取缓存的数据String strJson = (String) this.redisTemplate.opsForValue().get(correlationDataId);//  消息没有到队列的时候,则会调用重试发送方法GmallCorrelationData gmallCorrelationData = JSON.parseObject(strJson,GmallCorrelationData.class);//  调用方法  gmallCorrelationData 这对象中,至少的有,交换机,路由键,消息等内容.this.retrySendMsg(gmallCorrelationData);
}/*** 重试发送方法* @param correlationData   父类对象  它下面还有个子类对象 GmallCorrelationData*/
private void retrySendMsg(CorrelationData correlationData) {//  数据类型转换  统一转换为子类处理GmallCorrelationData gmallCorrelationData = (GmallCorrelationData) correlationData;//  获取到重试次数 初始值 0int retryCount = gmallCorrelationData.getRetryCount();//  判断if (retryCount>=3){//  不需要重试了log.error("重试次数已到,发送消息失败:"+JSON.toJSONString(gmallCorrelationData));} else {//  变量更新retryCount+=1;//  重新赋值重试次数 第一次重试 0->1 1->2 2->3gmallCorrelationData.setRetryCount(retryCount);System.out.println("重试次数:\t"+retryCount);//  更新缓存中的数据this.redisTemplate.opsForValue().set(gmallCorrelationData.getId(),JSON.toJSONString(gmallCorrelationData),10, TimeUnit.MINUTES);//  调用发送消息方法 表示发送普通消息  发送消息的时候,不能调用 new RabbitService().sendMsg() 这个方法this.rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(),gmallCorrelationData.getRoutingKey(),gmallCorrelationData.getMessage(),gmallCorrelationData);}
}

测试:只需修改(错误信息)
在这里插入图片描述
在这里插入图片描述

相关文章:

Rabbitmq消息不丢失

目录 一、消息不丢失1.消息确认2.消息确认业务封装2.1 发送确认消息测试2.2 消息发送失败&#xff0c;设置重发机制 一、消息不丢失 消息的不丢失&#xff0c;在MQ角度考虑&#xff0c;一般有三种途径&#xff1a; 1&#xff0c;生产者不丢数据 2&#xff0c;MQ服务器不丢数据…...

Kotlin runBlocking launch多个协程读写mutableListOf时序

Kotlin runBlocking launch多个协程读写mutableListOf时序 import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.runBlockingfun main(args: Array<String>) {var lists mutableListOf<String>()runBlocking {launch {r…...

Spring Cloud微服务治理框架深度解析

在学习一个技术之前&#xff0c;首先我们要了解它是做什么的&#xff0c;我们为什么要用它。不然看再多资料都理解不了&#xff0c;因此我们先来讲解下Spring Cloud Spring Cloud是一套微服务治理框架&#xff0c;几乎考虑到了微服务治理的方方面面。那么接下来具体说下 Spring…...

设计模式之原型模式Prototype的C++实现

1、原型模式提出 在软件功能设计中&#xff0c;经常面临着“某些结构复杂的对象”的创建工作&#xff0c;且创建的对象想拥有其他对象在某一刻的状态&#xff0c;则可以使用原型模型。原型模型是通过拷贝构造函数来创建对象&#xff0c;并且该对象拥有其他对象在某一刻的状态。…...

Java 中操作 Redis

文章目录 一、Redis 常用数据类型二、Redis 常用操作命令1. 字符串命令2. 哈希命令3. 列表命令4. 集合命令5. 有序集合命令6. 通用命令 三、在 Java 中操作 Redis1. 导入 maven 坐标2. 配置 Redis 数据源3. 编写配置类 四、在代码中的具体使用 一、Redis 常用数据类型 Redis 存…...

数据结构--最短路径 Dijkstra算法

数据结构–最短路径 Dijkstra算法 Dijkstra算法 计算 b e g i n 点到各个点的最短路 \color{red}计算\ begin\ 点到各个点的最短路 计算 begin 点到各个点的最短路 如果是无向图&#xff0c;可以先把无向图转化成有向图 我们需要2个数组 final[] &#xff08;标记各顶点是否已…...

在 Linux 虚拟机上使用 Azure 自定义脚本扩展版本

参考 azure创建虚拟机,创建虚拟机注意入站端口规则开放80端口、 2.转到资源&#xff0c;点击扩展应用程序&#xff0c;创建存储账户&#xff0c;创建容器&#xff0c;上传文件&#xff0c;选择文件&#xff0c;会自动执行部署。 apt-get update -y && apt-get insta…...

W5500-EVB-PICO 做UDP Server进行数据回环测试(七)

前言 前面我们用W5500-EVB-PICO 开发板在TCP Client和TCP Server模式下&#xff0c;分别进行数据回环测试&#xff0c;本章我们将用开发板在UDP Server模式下进行数据回环测试。 UDP是什么&#xff1f;什么是UDP Server&#xff1f;能干什么&#xff1f; UDP (User Dataqram P…...

ES搜索引擎入门+最佳实践(九):项目实战(二)--elasticsearch java api 进行数据增删改查

本篇是这个系列的最后一篇了,在这之前可以先看看前面的内容: ES搜索引擎入门最佳实践(一)_flame.liu的博客-CSDN博客 ES搜索引擎入门最佳实践(二)_flame.liu的博客-CSDN博客 ES搜索引擎入门最佳实践(三)_flame.liu的博客-CSDN博客 ES搜索引擎入门最佳实践(四)_flame.liu的博…...

android内存分析工具记录,请利用好最后2个神器

相机见证了java内存暴增和native持续增长的问题&#xff0c;因此这里记录一下使用的工具情况&#xff0c;方便后续继续使用 一、java 内存 如果是java层的内存可以直接借助leakCanary工具&#xff0c;配置也很简单&#xff0c;直接在build.gradle中添加依赖即可&#xff1a; …...

安科瑞变电所运维平台在电力系统中应用分析

摘要&#xff1a;现代居民生活、工作对电力资源的需求量相对较多&#xff0c;给我国的电力产业带来了良好的发展机遇与挑战。探索电力系统基本构成&#xff0c; 将变电运维安全管理以及相应的设备维护工作系统性开展&#xff0c;能够根据项目实践工作要求&#xff0c;将满足要求…...

uniapp开发微信小程序使用painter将页面转换为图片并保存到本地相册

引言 我使用到painter的原因是&#xff0c;在uniapp开发微信小程序时&#xff0c;需要将一个页面的内容转换成图片保存到本地相册。 起初在网上找到很多都是在uniapp中使用 html2canvas 将网页转换成图片再jspdf将图片转换为pdf&#xff0c;但是这种方式在小程序环境不支持&am…...

790. 数的三次方根

文章目录 QuestionIdeasCode Question 给定一个浮点数 n &#xff0c;求它的三次方根。 输入格式 共一行&#xff0c;包含一个浮点数 n 。 输出格式 共一行&#xff0c;包含一个浮点数&#xff0c;表示问题的解。 注意&#xff0c;结果保留 6 位小数。 数据范围 −10000≤…...

POSTGRESQL 关于2023-08-14 数据库自动启动文章中使用KILL 来进行配置RELOAD的问题解释...

开头还是介绍一下群&#xff0c;如果感兴趣Polardb ,mongodb ,MySQL ,Postgresql ,redis &#xff0c;SQL SERVER ,ORACLE,Oceanbase 等有问题&#xff0c;有需求都可以加群群内有各大数据库行业大咖&#xff0c;CTO&#xff0c;可以解决你的问题。加群请加 liuaustin3微信号 &…...

vue 使用插件高德地图--vue-amap

第一步&#xff1a;安装 vue-amap npm install vue-amap第二步&#xff1a;在你的 Vue 项目中注册 vue-amap&#xff1a; // main.js import Vue from vue; import VueAMap from vue-amap;Vue.use(VueAMap);VueAMap.initAMapApiLoader({// 高德开发者平台申请key值key: cc9c098…...

减速比如何计算

减速比是用来衡量机械系统中输入轴和输出轴转速之间的比例关系&#xff0c;通常用来描述传动装置&#xff08;如齿轮传动、皮带传动等&#xff09;的效果。计算减速比的公式取决于传动装置的类型。以下是一些常见传动装置的减速比计算方法&#xff1a; 齿轮传动&#xff1a; 对…...

HarmonyOS/OpenHarmony应用开发-ArkTSAPI组件总体分类与说明(下)

六、文本与输入 Text 显示一段文本的组件。 Span 作为Text组件的子组件&#xff0c;用于显示行内文本片段的组件。 Search 搜索框组件&#xff0c;适用于浏览器的搜索内容输入框等应用场景。 TextArea 多行文本输入框组件&#xff0c;当输入的文本内容超过组件宽度时会自动换行…...

势函数和鞅的停时定理

前置芝士 鞅&#xff1a; 鞅是一类特殊的随机过程&#xff0c;假设我们从一开始就在观察一场赌博游戏&#xff0c;现在已经得到了前t秒的观测值&#xff0c;那么当第t1 秒观测值的期望等于第t秒的观测值时&#xff0c;我们称这是一个公平赌博游戏。 具体来说&#xff0c;对于…...

途乐证券-炒股开户流程是怎样的?

炒股是一种危险较大但收益也相对较高的出资方法&#xff0c;而开户则是出资炒股的前提。跟着科技的开展&#xff0c;炒股开户已经能够在线完结&#xff0c;但流程相对来说仍是比较繁琐的。那么&#xff0c;炒股开户流程是怎样的呢&#xff1f;下面从多个视点剖析。 一、炒股开户…...

Eclipse如何设置快捷键

在eclopse设置注释行和取消注释行 // 打开eclipse&#xff0c;依次打开&#xff1a;Window -> Preferences -> General -> Key&#xff0c;...

如何高效优化Windows系统性能:AtlasOS完整调优指南

如何高效优化Windows系统性能&#xff1a;AtlasOS完整调优指南 【免费下载链接】Atlas &#x1f680; An open and lightweight modification to Windows, designed to optimize performance, privacy and usability. 项目地址: https://gitcode.com/GitHub_Trending/atlas1/…...

LongCat-Video:AI视频生成技术的范式突破与实践指南

LongCat-Video&#xff1a;AI视频生成技术的范式突破与实践指南 【免费下载链接】LongCat-Video 项目地址: https://ai.gitcode.com/hf_mirrors/meituan-longcat/LongCat-Video 在数字内容创作领域&#xff0c;AI视频生成技术正经历从实验性探索到产业化应用的关键转折…...

DocRes实战指南:高效统一文档图像修复任务的完整解决方案

DocRes实战指南&#xff1a;高效统一文档图像修复任务的完整解决方案 【免费下载链接】DocRes [CVPR 2024] DocRes: A Generalist Model Toward Unifying Document Image Restoration Tasks 项目地址: https://gitcode.com/gh_mirrors/do/DocRes DocRes是一个革命性的通…...

AWCII 040 CPU模块

AWCII 040 CPU 模块AWCII 040 是工业自动化控制系统中的中央处理单元&#xff08;CPU 模块&#xff09;&#xff0c;主要用于执行控制程序、数据运算及系统管理&#xff0c;是整个控制系统的核心“大脑”。一、基本概述AWCII 040 CPU 模块集成了处理器、存储单元及系统管理功能…...

丹青幻境保姆级教程:LoRA卷轴版本管理与热更新机制在生产环境落地

丹青幻境保姆级教程&#xff1a;LoRA卷轴版本管理与热更新机制在生产环境落地 1. 项目背景与核心价值 丹青幻境是一款专为数字艺术创作者设计的AI绘画工具&#xff0c;它巧妙地将现代AI技术与传统东方美学相结合。与传统的技术工具不同&#xff0c;丹青幻境采用了宣纸质感界面…...

3步搞定B站4K视频下载:开源工具bilibili-downloader终极指南

3步搞定B站4K视频下载&#xff1a;开源工具bilibili-downloader终极指南 【免费下载链接】bilibili-downloader B站视频下载&#xff0c;支持下载大会员清晰度4K&#xff0c;持续更新中 项目地址: https://gitcode.com/gh_mirrors/bil/bilibili-downloader 想要免费下载…...

突破音乐加密限制:Unlock Music实现跨平台音频自由解决方案

突破音乐加密限制&#xff1a;Unlock Music实现跨平台音频自由解决方案 【免费下载链接】unlock-music 在浏览器中解锁加密的音乐文件。原仓库&#xff1a; 1. https://github.com/unlock-music/unlock-music &#xff1b;2. https://git.unlock-music.dev/um/web 项目地址: …...

终极Redis可视化工具:Another Redis Desktop Manager完全使用指南

终极Redis可视化工具&#xff1a;Another Redis Desktop Manager完全使用指南 【免费下载链接】AnotherRedisDesktopManager &#x1f680;&#x1f680;&#x1f680;A faster, better and more stable Redis desktop manager [GUI client], compatible with Linux, Windows, …...

实战指南|OpenWrt磁盘扩容全流程解析与避坑技巧

1. 为什么需要给OpenWrt扩容&#xff1f; 很多朋友第一次接触OpenWrt时都会遇到一个尴尬的问题&#xff1a;系统默认分配的存储空间太小了。我自己刚开始用OpenWrt时也踩过这个坑&#xff0c;当时想装个Docker跑点服务&#xff0c;结果发现连最基本的镜像都拉不下来。这就像给…...

Java 零基础全套视频教程,String StringBuffer StringBuilder 类,笔记142-146

Java 零基础全套视频教程&#xff0c;String StringBuffer StringBuilder 类&#xff0c;笔记142-146 一、参考资料 【尚硅谷Java零基础全套视频教程(宋红康主讲&#xff0c;java入门自学必备)】 https://www.bilibili.com/video/BV1PY411e7J6/?p142&share_sourcecopy_web…...