Rocketmq消费消息时不丢失不重复
消息消费不丢失
手动ACK
在消费者端,需要确保在消息拉取并消费成功之后再给Broker返回ACK,就可以保证消息不丢失了,如果这个过程中Broker一直没收到ACK,那么就可以重试。所以,在消费者的代码中,一定要在业务逻辑的最后一步return ConsumeConcurrentlyStatus.CONSUME_SUCCESS
spring boot中 消费消息确认
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "${carInInfo.topic}",topic = "${carInInfo.topic}", selectorExpression = "*",consumeMode = ConsumeMode.ORDERLY)
public class CarInParkSynThirdMQ implements RocketMQListener<AddCarInParkDTO> {/*** 请不要捕获异常信息,否则无法进行消息重新推送** @param addCarInParkDTO*/@Overridepublic void onMessage(AddCarInParkDTO addCarInParkDTO) {System.out.println("收到消息:" + JSON.toJSONString(addCarInParkDTO));}
指定consumeMode = ConsumeMode.ORDERLY,实现消息确认,我们看下源码:
DefaultRocketMQListenerContainer.java这个类,看下其中一个类实现
public class DefaultMessageListenerOrderly implements MessageListenerOrderly {@SuppressWarnings("unchecked")@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt messageExt : msgs) {log.debug("received msg: {}", messageExt);try {long now = System.currentTimeMillis();handleMessage(messageExt);long costTime = System.currentTimeMillis() - now;log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);} catch (Exception e) {log.warn("consume message failed. messageExt:{}", messageExt, e);context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}return ConsumeOrderlyStatus.SUCCESS;}
只要没有异常出现,那么就会消费成功,有异常出现了就重新进行发送,那这个又是在哪里调用的呢?再看下这个private方法就明白了
private void initRocketMQPushConsumer() throws MQClientException {......switch (consumeMode) {case ORDERLY:consumer.setMessageListener(new DefaultMessageListenerOrderly());break;case CONCURRENTLY:consumer.setMessageListener(new DefaultMessageListenerConcurrently());break;default:throw new IllegalArgumentException("Property 'consumeMode' was wrong.");}if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);} else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);}}
消息重试
对于普通的消息,当消费者消费消息失败后,可以通过设置返回状态达到消息重试的结果。
如何让消息进行重试
RocketMQ 提供消费重试的机制。在消息消费失败的时候,RocketMQ 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。
当然,RocketMQ 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 16 次重试次数时,Consumer 还是消费失败时,该消息就会进入到死信队列。
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")
public class SpringConsumer implements RocketMQListener<String> , RocketMQPushConsumerLifecycleListener {@Overridepublic void onMessage(String message) {System.out.println("Received message : "+ message);}@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {// 设置最大重试次数consumer.setMaxReconsumeTimes(5);}
}
一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的。
每次重试的间隔时间如下:

死信队列
当一条消息消费失败,RocketMQ就会自动进行消息重试。而如果消息超过最大重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的一种特殊队列:死信队列。
死信队列的特征:
- 一个死信队列对应一个ConsumGroup,而不是对应某个消费者实例。
- 如果一个ConsumeGroup没有产生死信队列,RocketMQ就不会为其创建相应的死信队列。
- 一个死信队列包含了这个ConsumeGroup里的所有死信消息,而不区分该消息属于哪个Topic。
- 死信队列中的消息不会再被消费者正常消费。
- 死信队列的有效期跟正常消息相同。默认3天,对应broker.conf中的fileReservedTime属性。超过这个最长时间的消息都会被删除,而不管消息是否消费过。
通常,一条消息进入了死信队列,意味着消息在消费处理的过程中出现了比较严重的错误,并且无法自行恢复。此时,一般需要人工去查看死信队列中的消息,对错误原因进行排查。然后对死信消息进行处理,比如转发到正常的Topic重新进行消费,或者丢弃。
消息幂等
在MQ系统中,消息幂等有三种实现语义:
at most once 最多一次:每条消息最多只会被消费一次
at least once 至少一次:每条消息至少会被消费一次
exactly once 刚刚好一次:每条消息都只会确定的消费一次
这三种语义都有他适用的业务场景。
at most once是最好保证的。RocketMQ中可以直接用异步发送、sendOneWay等方式就可以保证。
at least once这个语义,RocketMQ也有同步发送、事务消息等很多方式能够保证。
而这个exactly once是MQ中最理想也是最难保证的一种语义。RocketMQ只能保证at least once,保证不了exactly once。
RocketMQ 消息重复的场景
发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
投递时消息重复消息
消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
负载均衡时消息重复
包括但不限于网络抖动、Broker 重启以及订阅方应用重启,当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
消息幂等解决方案
在RocketMQ中,是无法保证每个消息只被投递一次的,所以要在业务上自行来保证消息消费的幂等性。
ocketMQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以用这个MessageId来作为判断幂等的关键依据。
但是,这个MessageId是无法保证全局唯一的,也会有冲突的情况。所以在一些对幂等性要求严格的场景,最好是使用业务上唯一的一个标识比较靠谱。例如订单ID。而这个业务标识可以使用Message的Key来进行传递。
比如我们业务的消息消费逻辑是:插入某张订单表的数据,然后更新库存。
insert into t_order values .....
update t_inv set count = count-1 where good_id = 'good123';
要实现消息的幂等,我们可能会采取这样的方案:
select * from t_order where order_no = 'order123'
if(order != null) {return ;//消息重复,直接返回
}
这对于很多情况下,的确能起到不错的效果,但是在并发场景下,还是会有问题。
还可以通过以下方式处理:
- 使用数据库的行锁处理
- 利用分布式锁处理不同服务间的并发。
- 数据库对唯一值的入库字段设唯一索引。
相关文章:
Rocketmq消费消息时不丢失不重复
消息消费不丢失 手动ACK 在消费者端,需要确保在消息拉取并消费成功之后再给Broker返回ACK,就可以保证消息不丢失了,如果这个过程中Broker一直没收到ACK,那么就可以重试。所以,在消费者的代码中,一定要在业…...
RedisInsight——redis的桌面UI工具使用实践
下载 官网下载安装。下载地址在这里 填个邮箱地址就可以下载了。 安装使用。 安装成功后开始使用。 1. 你可以add一个地址。或者登录redis cloud 去auto-discover 2 . 新增你的redis库地址。注意index的取值 3。现在可以登录到redis了。看看结果 这是现在 在服务器上执行…...
JVM对象创建与内存分配
对象的创建 对象创建的主要流程: 类加载推荐博客:JVM类加载机制详解 类加载检查 虚拟机遇到一条new指令时,首先将去检查这个指令的参数是否能在常量池中定位到一个类的符号引用,并且检查这个符号引用代表的类是否已被加载、解析…...
央国企数字化转型难在哪?为什么要数字化转型?
科技在发展,技术在升级,全球信息化、数字化的步伐在加快,企业想要在未来的发展中抓住机会,更好地发展壮大,就需要加快企业数字化转型的速度,才能立足于信息化、数字化时代,央国企作为企业中的一…...
第7天:信息打点-资产泄漏amp;CMS识别amp;Git监控amp;SVNamp;DS_Storeamp;备份
第7天:信息打点-资产泄漏&CMS识别&Git监控&SVN&DS_Store&备份 知识点: 一、cms指纹识别获取方式 网上开源的程序,得到名字就可以搜索直接获取到源码。 cms在线识别: CMS识别:https://www.yun…...
不可思议,红警居然开源了!
红警,准确的说应该叫“红色警戒”,是大部分 80/90 后记忆里跟游戏二字关系最深的情节。 相信每一名 80/90 后,都有一段难忘的红警岁月,甚至可以说很多人的青春,就叫红警! 说到红色警戒游戏,估计应该是很多…...
yolo系列模型训练数据集全流程制作方法(附数据增强代码)
yolo系列的模型在目标检测领域里面受众非常广,也十分流行,但是在使用yolo进行目标检测训练的时候,往往要将VOC格式的数据集转化为yolo专属的数据集,而yolo的训练数据集制作方法呢,最常见的也是有两种,下面我…...
4、FFmpeg命令行操作7
转封装(1) 保持编码格式: ffmpeg -i test.mp4 -vcodec copy -acodec copy test_copy.ts ffmpeg -i test.mp4 -codec copy test_copy2.ts 改变编码格式: ffmpeg -i test.mp4 -vcodec libx265 -acodec libmp3lame out_h265_mp3.mkv …...
算法进阶——链表中环的入口节点
题目 给一个长度为n链表,若其中包含环,请找出该链表的环的入口结点,否则,返回null。 数据范围:1<结点值<10000 要求:空间复杂度O(1),时间复杂度O(n) 例如,输入{1,2},{3,4,5…...
无线WiFi安全渗透与攻防(N.1)WPA渗透-pyrit:batch-table加速attack_db模块加速_“attack_db”模块加速
WPA渗透-pyrit:batch-table加速attack_db模块加速_“attack_db”模块加速 WPA渗透-pyrit:batch-table加速attack_db模块加速_“attack_db”模块加速1.渗透WIFI1.导入密码字典2.导入essid,破解完成记得删除3.批处理数据库,速度比较慢,耐心等待4.batch-table(批处理数据库)加…...
YOLOV8部署Android Studio安卓平台NCNN
下载Android Studio,配置安卓开发环境,这个过程比较漫长。 安装cmake,注意安装的是cmake3.10版本。 根据手机安卓版本选择相应的安卓版本,我的是红米K30Pro,安卓12。 使用腾讯开源的ncnn,这是一个为手机端极…...
【算法萌新闯力扣】:旋转字符串
力扣热题:796.旋转字符串 开篇 今天下午刷了6道力扣算法题,选了一道有多种解法的题目与大家分享。 题目链接:796.旋转字符串 题目描述 代码思路 完全按照题目的要求,利用StringBuffer中的方法对字符串进行旋转,寻找相同的一项 …...
可逆矩阵的性质
如果矩阵A可逆,那么它的逆矩阵也可逆,并且如果矩阵A可逆,假设是一个不为0的数,那么也可逆,并且如果矩阵A和都可逆,而且它们的阶数也相同,那么它们的乘积也是可逆的,并且如果矩阵A可逆…...
HIT 模式识别 手写汉字分类 Python实现
训练集数据 TrainSamples-400.csv,含 100 个不同汉字,每个汉字 400 个实例,每个实例均为 64*64 的二值图像; 训练集标注TrainSamples-400.csv,为 40000 个 0 到 99 间的整数,表示训练集中每个实例所属汉字类…...
GPT-4V-Act :一个多模态AI助手,能够像人类一样模拟通过鼠标和键盘进行网页浏览。
内容来源:xiaohuggg GPT-4V-Act :一个多模态AI助手,能够像人类一样模拟通过鼠标和键盘进行网页浏览。 它可以模拟人类浏览网页时的行为,如点击链接、填写表单、滚动页面等。 它通过视觉理解技术识别网页上的元素,就像…...
剪辑视频怎么把说话声音转成文字?
短视频已然成为了一种生活潮流,我们每天都在浏览各种短视频,或者用视频的形式记录生活,在制作视频的时候,字幕是一个很大的问题,给视频添加字幕可以更直观、更方便浏览。手动添加太费时间,下面就给大家分享…...
maven打包插件配置模板
主要有两类: 1、maven-shade-plugin 主要用于java程序编写的的打包 <build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</ve…...
clusterProfiler包学习
📖 Introduction | Biomedical Knowledge Mining using GOSemSim and clusterProfiler (yulab-smu.top) 部分使用 #GO classificationlibrary(clusterProfiler) data(geneList, package"DOSE") gene <- names(geneList)[abs(geneList) > 2]# Entre…...
【Qt开发流程之】布局管理
介绍 一个界面呈现,如果要让用户有更好的观感,布局必不可少。 【Qt之布局】QVBoxLayout、QHBoxLayout、QGridLayout、QFormLayout介绍及使用 链接: https://blog.csdn.net/MrHHHHHH/article/details/133915208 qt布局类图: Qt布局是Qt图形…...
建筑可视化中的 3D 纹理
在线工具推荐: 三维数字孪生场景工具 - GLTF/GLB在线编辑器 - Three.js AI自动纹理化开发 - YOLO 虚幻合成数据生成器 - 3D模型在线转换 - 3D模型预览图生成服务 1、什么是 3D 纹理? 纹理是将二维图像添加到三维模型的技术艺术。虽然对物体进行纹…...
RestClient
什么是RestClient RestClient 是 Elasticsearch 官方提供的 Java 低级 REST 客户端,它允许HTTP与Elasticsearch 集群通信,而无需处理 JSON 序列化/反序列化等底层细节。它是 Elasticsearch Java API 客户端的基础。 RestClient 主要特点 轻量级ÿ…...
css实现圆环展示百分比,根据值动态展示所占比例
代码如下 <view class""><view class"circle-chart"><view v-if"!!num" class"pie-item" :style"{background: conic-gradient(var(--one-color) 0%,#E9E6F1 ${num}%),}"></view><view v-else …...
【网络安全产品大调研系列】2. 体验漏洞扫描
前言 2023 年漏洞扫描服务市场规模预计为 3.06(十亿美元)。漏洞扫描服务市场行业预计将从 2024 年的 3.48(十亿美元)增长到 2032 年的 9.54(十亿美元)。预测期内漏洞扫描服务市场 CAGR(增长率&…...
JVM垃圾回收机制全解析
Java虚拟机(JVM)中的垃圾收集器(Garbage Collector,简称GC)是用于自动管理内存的机制。它负责识别和清除不再被程序使用的对象,从而释放内存空间,避免内存泄漏和内存溢出等问题。垃圾收集器在Ja…...
【SQL学习笔记1】增删改查+多表连接全解析(内附SQL免费在线练习工具)
可以使用Sqliteviz这个网站免费编写sql语句,它能够让用户直接在浏览器内练习SQL的语法,不需要安装任何软件。 链接如下: sqliteviz 注意: 在转写SQL语法时,关键字之间有一个特定的顺序,这个顺序会影响到…...
tree 树组件大数据卡顿问题优化
问题背景 项目中有用到树组件用来做文件目录,但是由于这个树组件的节点越来越多,导致页面在滚动这个树组件的时候浏览器就很容易卡死。这种问题基本上都是因为dom节点太多,导致的浏览器卡顿,这里很明显就需要用到虚拟列表的技术&…...
Spring AI与Spring Modulith核心技术解析
Spring AI核心架构解析 Spring AI(https://spring.io/projects/spring-ai)作为Spring生态中的AI集成框架,其核心设计理念是通过模块化架构降低AI应用的开发复杂度。与Python生态中的LangChain/LlamaIndex等工具类似,但特别为多语…...
Springboot社区养老保险系统小程序
一、前言 随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,社区养老保险系统小程序被用户普遍使用,为方…...
Vue ③-生命周期 || 脚手架
生命周期 思考:什么时候可以发送初始化渲染请求?(越早越好) 什么时候可以开始操作dom?(至少dom得渲染出来) Vue生命周期: 一个Vue实例从 创建 到 销毁 的整个过程。 生命周期四个…...
深度学习之模型压缩三驾马车:模型剪枝、模型量化、知识蒸馏
一、引言 在深度学习中,我们训练出的神经网络往往非常庞大(比如像 ResNet、YOLOv8、Vision Transformer),虽然精度很高,但“太重”了,运行起来很慢,占用内存大,不适合部署到手机、摄…...
