Rocketmq消费消息时不丢失不重复
消息消费不丢失
手动ACK
在消费者端,需要确保在消息拉取并消费成功之后再给Broker返回ACK,就可以保证消息不丢失了,如果这个过程中Broker一直没收到ACK,那么就可以重试。所以,在消费者的代码中,一定要在业务逻辑的最后一步return ConsumeConcurrentlyStatus.CONSUME_SUCCESS
spring boot中 消费消息确认
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "${carInInfo.topic}",topic = "${carInInfo.topic}", selectorExpression = "*",consumeMode = ConsumeMode.ORDERLY)
public class CarInParkSynThirdMQ implements RocketMQListener<AddCarInParkDTO> {/*** 请不要捕获异常信息,否则无法进行消息重新推送** @param addCarInParkDTO*/@Overridepublic void onMessage(AddCarInParkDTO addCarInParkDTO) {System.out.println("收到消息:" + JSON.toJSONString(addCarInParkDTO));}
指定consumeMode = ConsumeMode.ORDERLY,实现消息确认,我们看下源码:
DefaultRocketMQListenerContainer.java这个类,看下其中一个类实现
public class DefaultMessageListenerOrderly implements MessageListenerOrderly {@SuppressWarnings("unchecked")@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt messageExt : msgs) {log.debug("received msg: {}", messageExt);try {long now = System.currentTimeMillis();handleMessage(messageExt);long costTime = System.currentTimeMillis() - now;log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);} catch (Exception e) {log.warn("consume message failed. messageExt:{}", messageExt, e);context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}return ConsumeOrderlyStatus.SUCCESS;}
只要没有异常出现,那么就会消费成功,有异常出现了就重新进行发送,那这个又是在哪里调用的呢?再看下这个private方法就明白了
private void initRocketMQPushConsumer() throws MQClientException {......switch (consumeMode) {case ORDERLY:consumer.setMessageListener(new DefaultMessageListenerOrderly());break;case CONCURRENTLY:consumer.setMessageListener(new DefaultMessageListenerConcurrently());break;default:throw new IllegalArgumentException("Property 'consumeMode' was wrong.");}if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);} else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);}}
消息重试
对于普通的消息,当消费者消费消息失败后,可以通过设置返回状态达到消息重试的结果。
如何让消息进行重试
RocketMQ 提供消费重试的机制。在消息消费失败的时候,RocketMQ 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。
当然,RocketMQ 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 16 次重试次数时,Consumer 还是消费失败时,该消息就会进入到死信队列。
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")
public class SpringConsumer implements RocketMQListener<String> , RocketMQPushConsumerLifecycleListener {@Overridepublic void onMessage(String message) {System.out.println("Received message : "+ message);}@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {// 设置最大重试次数consumer.setMaxReconsumeTimes(5);}
}
一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的。
每次重试的间隔时间如下:

死信队列
当一条消息消费失败,RocketMQ就会自动进行消息重试。而如果消息超过最大重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的一种特殊队列:死信队列。
死信队列的特征:
- 一个死信队列对应一个ConsumGroup,而不是对应某个消费者实例。
- 如果一个ConsumeGroup没有产生死信队列,RocketMQ就不会为其创建相应的死信队列。
- 一个死信队列包含了这个ConsumeGroup里的所有死信消息,而不区分该消息属于哪个Topic。
- 死信队列中的消息不会再被消费者正常消费。
- 死信队列的有效期跟正常消息相同。默认3天,对应broker.conf中的fileReservedTime属性。超过这个最长时间的消息都会被删除,而不管消息是否消费过。
通常,一条消息进入了死信队列,意味着消息在消费处理的过程中出现了比较严重的错误,并且无法自行恢复。此时,一般需要人工去查看死信队列中的消息,对错误原因进行排查。然后对死信消息进行处理,比如转发到正常的Topic重新进行消费,或者丢弃。
消息幂等
在MQ系统中,消息幂等有三种实现语义:
at most once 最多一次:每条消息最多只会被消费一次
at least once 至少一次:每条消息至少会被消费一次
exactly once 刚刚好一次:每条消息都只会确定的消费一次
这三种语义都有他适用的业务场景。
at most once是最好保证的。RocketMQ中可以直接用异步发送、sendOneWay等方式就可以保证。
at least once这个语义,RocketMQ也有同步发送、事务消息等很多方式能够保证。
而这个exactly once是MQ中最理想也是最难保证的一种语义。RocketMQ只能保证at least once,保证不了exactly once。
RocketMQ 消息重复的场景
发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
投递时消息重复消息
消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
负载均衡时消息重复
包括但不限于网络抖动、Broker 重启以及订阅方应用重启,当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
消息幂等解决方案
在RocketMQ中,是无法保证每个消息只被投递一次的,所以要在业务上自行来保证消息消费的幂等性。
ocketMQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以用这个MessageId来作为判断幂等的关键依据。
但是,这个MessageId是无法保证全局唯一的,也会有冲突的情况。所以在一些对幂等性要求严格的场景,最好是使用业务上唯一的一个标识比较靠谱。例如订单ID。而这个业务标识可以使用Message的Key来进行传递。
比如我们业务的消息消费逻辑是:插入某张订单表的数据,然后更新库存。
insert into t_order values .....
update t_inv set count = count-1 where good_id = 'good123';
要实现消息的幂等,我们可能会采取这样的方案:
select * from t_order where order_no = 'order123'
if(order != null) {return ;//消息重复,直接返回
}
这对于很多情况下,的确能起到不错的效果,但是在并发场景下,还是会有问题。
还可以通过以下方式处理:
- 使用数据库的行锁处理
- 利用分布式锁处理不同服务间的并发。
- 数据库对唯一值的入库字段设唯一索引。
相关文章:
Rocketmq消费消息时不丢失不重复
消息消费不丢失 手动ACK 在消费者端,需要确保在消息拉取并消费成功之后再给Broker返回ACK,就可以保证消息不丢失了,如果这个过程中Broker一直没收到ACK,那么就可以重试。所以,在消费者的代码中,一定要在业…...
RedisInsight——redis的桌面UI工具使用实践
下载 官网下载安装。下载地址在这里 填个邮箱地址就可以下载了。 安装使用。 安装成功后开始使用。 1. 你可以add一个地址。或者登录redis cloud 去auto-discover 2 . 新增你的redis库地址。注意index的取值 3。现在可以登录到redis了。看看结果 这是现在 在服务器上执行…...
JVM对象创建与内存分配
对象的创建 对象创建的主要流程: 类加载推荐博客:JVM类加载机制详解 类加载检查 虚拟机遇到一条new指令时,首先将去检查这个指令的参数是否能在常量池中定位到一个类的符号引用,并且检查这个符号引用代表的类是否已被加载、解析…...
央国企数字化转型难在哪?为什么要数字化转型?
科技在发展,技术在升级,全球信息化、数字化的步伐在加快,企业想要在未来的发展中抓住机会,更好地发展壮大,就需要加快企业数字化转型的速度,才能立足于信息化、数字化时代,央国企作为企业中的一…...
第7天:信息打点-资产泄漏amp;CMS识别amp;Git监控amp;SVNamp;DS_Storeamp;备份
第7天:信息打点-资产泄漏&CMS识别&Git监控&SVN&DS_Store&备份 知识点: 一、cms指纹识别获取方式 网上开源的程序,得到名字就可以搜索直接获取到源码。 cms在线识别: CMS识别:https://www.yun…...
不可思议,红警居然开源了!
红警,准确的说应该叫“红色警戒”,是大部分 80/90 后记忆里跟游戏二字关系最深的情节。 相信每一名 80/90 后,都有一段难忘的红警岁月,甚至可以说很多人的青春,就叫红警! 说到红色警戒游戏,估计应该是很多…...
yolo系列模型训练数据集全流程制作方法(附数据增强代码)
yolo系列的模型在目标检测领域里面受众非常广,也十分流行,但是在使用yolo进行目标检测训练的时候,往往要将VOC格式的数据集转化为yolo专属的数据集,而yolo的训练数据集制作方法呢,最常见的也是有两种,下面我…...
4、FFmpeg命令行操作7
转封装(1) 保持编码格式: ffmpeg -i test.mp4 -vcodec copy -acodec copy test_copy.ts ffmpeg -i test.mp4 -codec copy test_copy2.ts 改变编码格式: ffmpeg -i test.mp4 -vcodec libx265 -acodec libmp3lame out_h265_mp3.mkv …...
算法进阶——链表中环的入口节点
题目 给一个长度为n链表,若其中包含环,请找出该链表的环的入口结点,否则,返回null。 数据范围:1<结点值<10000 要求:空间复杂度O(1),时间复杂度O(n) 例如,输入{1,2},{3,4,5…...
无线WiFi安全渗透与攻防(N.1)WPA渗透-pyrit:batch-table加速attack_db模块加速_“attack_db”模块加速
WPA渗透-pyrit:batch-table加速attack_db模块加速_“attack_db”模块加速 WPA渗透-pyrit:batch-table加速attack_db模块加速_“attack_db”模块加速1.渗透WIFI1.导入密码字典2.导入essid,破解完成记得删除3.批处理数据库,速度比较慢,耐心等待4.batch-table(批处理数据库)加…...
YOLOV8部署Android Studio安卓平台NCNN
下载Android Studio,配置安卓开发环境,这个过程比较漫长。 安装cmake,注意安装的是cmake3.10版本。 根据手机安卓版本选择相应的安卓版本,我的是红米K30Pro,安卓12。 使用腾讯开源的ncnn,这是一个为手机端极…...
【算法萌新闯力扣】:旋转字符串
力扣热题:796.旋转字符串 开篇 今天下午刷了6道力扣算法题,选了一道有多种解法的题目与大家分享。 题目链接:796.旋转字符串 题目描述 代码思路 完全按照题目的要求,利用StringBuffer中的方法对字符串进行旋转,寻找相同的一项 …...
可逆矩阵的性质
如果矩阵A可逆,那么它的逆矩阵也可逆,并且如果矩阵A可逆,假设是一个不为0的数,那么也可逆,并且如果矩阵A和都可逆,而且它们的阶数也相同,那么它们的乘积也是可逆的,并且如果矩阵A可逆…...
HIT 模式识别 手写汉字分类 Python实现
训练集数据 TrainSamples-400.csv,含 100 个不同汉字,每个汉字 400 个实例,每个实例均为 64*64 的二值图像; 训练集标注TrainSamples-400.csv,为 40000 个 0 到 99 间的整数,表示训练集中每个实例所属汉字类…...
GPT-4V-Act :一个多模态AI助手,能够像人类一样模拟通过鼠标和键盘进行网页浏览。
内容来源:xiaohuggg GPT-4V-Act :一个多模态AI助手,能够像人类一样模拟通过鼠标和键盘进行网页浏览。 它可以模拟人类浏览网页时的行为,如点击链接、填写表单、滚动页面等。 它通过视觉理解技术识别网页上的元素,就像…...
剪辑视频怎么把说话声音转成文字?
短视频已然成为了一种生活潮流,我们每天都在浏览各种短视频,或者用视频的形式记录生活,在制作视频的时候,字幕是一个很大的问题,给视频添加字幕可以更直观、更方便浏览。手动添加太费时间,下面就给大家分享…...
maven打包插件配置模板
主要有两类: 1、maven-shade-plugin 主要用于java程序编写的的打包 <build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</ve…...
clusterProfiler包学习
📖 Introduction | Biomedical Knowledge Mining using GOSemSim and clusterProfiler (yulab-smu.top) 部分使用 #GO classificationlibrary(clusterProfiler) data(geneList, package"DOSE") gene <- names(geneList)[abs(geneList) > 2]# Entre…...
【Qt开发流程之】布局管理
介绍 一个界面呈现,如果要让用户有更好的观感,布局必不可少。 【Qt之布局】QVBoxLayout、QHBoxLayout、QGridLayout、QFormLayout介绍及使用 链接: https://blog.csdn.net/MrHHHHHH/article/details/133915208 qt布局类图: Qt布局是Qt图形…...
建筑可视化中的 3D 纹理
在线工具推荐: 三维数字孪生场景工具 - GLTF/GLB在线编辑器 - Three.js AI自动纹理化开发 - YOLO 虚幻合成数据生成器 - 3D模型在线转换 - 3D模型预览图生成服务 1、什么是 3D 纹理? 纹理是将二维图像添加到三维模型的技术艺术。虽然对物体进行纹…...
多模态2025:技术路线“神仙打架”,视频生成冲上云霄
文|魏琳华 编|王一粟 一场大会,聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中,汇集了学界、创业公司和大厂等三方的热门选手,关于多模态的集中讨论达到了前所未有的热度。其中,…...
通过Wrangler CLI在worker中创建数据库和表
官方使用文档:Getting started Cloudflare D1 docs 创建数据库 在命令行中执行完成之后,会在本地和远程创建数据库: npx wranglerlatest d1 create prod-d1-tutorial 在cf中就可以看到数据库: 现在,您的Cloudfla…...
视频字幕质量评估的大规模细粒度基准
大家读完觉得有帮助记得关注和点赞!!! 摘要 视频字幕在文本到视频生成任务中起着至关重要的作用,因为它们的质量直接影响所生成视频的语义连贯性和视觉保真度。尽管大型视觉-语言模型(VLMs)在字幕生成方面…...
零基础设计模式——行为型模式 - 责任链模式
第四部分:行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习!行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想:使多个对象都有机会处…...
Maven 概述、安装、配置、仓库、私服详解
目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...
Pinocchio 库详解及其在足式机器人上的应用
Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库,专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性,并提供了一个通用的框架&…...
AI+无人机如何守护濒危物种?YOLOv8实现95%精准识别
【导读】 野生动物监测在理解和保护生态系统中发挥着至关重要的作用。然而,传统的野生动物观察方法往往耗时耗力、成本高昂且范围有限。无人机的出现为野生动物监测提供了有前景的替代方案,能够实现大范围覆盖并远程采集数据。尽管具备这些优势…...
为什么要创建 Vue 实例
核心原因:Vue 需要一个「控制中心」来驱动整个应用 你可以把 Vue 实例想象成你应用的**「大脑」或「引擎」。它负责协调模板、数据、逻辑和行为,将它们变成一个活的、可交互的应用**。没有这个实例,你的代码只是一堆静态的 HTML、JavaScript 变量和函数,无法「活」起来。 …...
Elastic 获得 AWS 教育 ISV 合作伙伴资质,进一步增强教育解决方案产品组合
作者:来自 Elastic Udayasimha Theepireddy (Uday), Brian Bergholm, Marianna Jonsdottir 通过搜索 AI 和云创新推动教育领域的数字化转型。 我们非常高兴地宣布,Elastic 已获得 AWS 教育 ISV 合作伙伴资质。这一重要认证表明,Elastic 作为 …...
如何做好一份技术文档?从规划到实践的完整指南
如何做好一份技术文档?从规划到实践的完整指南 🌟 嗨,我是IRpickstars! 🌌 总有一行代码,能点亮万千星辰。 🔍 在技术的宇宙中,我愿做永不停歇的探索者。 ✨ 用代码丈量世界&…...
