当前位置: 首页 > news >正文

Rocketmq消费消息时不丢失不重复

消息消费不丢失

手动ACK

在消费者端,需要确保在消息拉取并消费成功之后再给Broker返回ACK,就可以保证消息不丢失了,如果这个过程中Broker一直没收到ACK,那么就可以重试。所以,在消费者的代码中,一定要在业务逻辑的最后一步return ConsumeConcurrentlyStatus.CONSUME_SUCCESS

spring boot中 消费消息确认

@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "${carInInfo.topic}",topic = "${carInInfo.topic}", selectorExpression = "*",consumeMode = ConsumeMode.ORDERLY)
public class CarInParkSynThirdMQ implements RocketMQListener<AddCarInParkDTO> {/*** 请不要捕获异常信息,否则无法进行消息重新推送** @param addCarInParkDTO*/@Overridepublic void onMessage(AddCarInParkDTO addCarInParkDTO) {System.out.println("收到消息:" + JSON.toJSONString(addCarInParkDTO));}

指定consumeMode = ConsumeMode.ORDERLY,实现消息确认,我们看下源码:

DefaultRocketMQListenerContainer.java这个类,看下其中一个类实现

public class DefaultMessageListenerOrderly implements MessageListenerOrderly {@SuppressWarnings("unchecked")@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt messageExt : msgs) {log.debug("received msg: {}", messageExt);try {long now = System.currentTimeMillis();handleMessage(messageExt);long costTime = System.currentTimeMillis() - now;log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);} catch (Exception e) {log.warn("consume message failed. messageExt:{}", messageExt, e);context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}return ConsumeOrderlyStatus.SUCCESS;}

只要没有异常出现,那么就会消费成功,有异常出现了就重新进行发送,那这个又是在哪里调用的呢?再看下这个private方法就明白了

 private void initRocketMQPushConsumer() throws MQClientException {......switch (consumeMode) {case ORDERLY:consumer.setMessageListener(new DefaultMessageListenerOrderly());break;case CONCURRENTLY:consumer.setMessageListener(new DefaultMessageListenerConcurrently());break;default:throw new IllegalArgumentException("Property 'consumeMode' was wrong.");}if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);} else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);}}

消息重试

对于普通的消息,当消费者消费消息失败后,可以通过设置返回状态达到消息重试的结果。

如何让消息进行重试

RocketMQ 提供消费重试的机制。在消息消费失败的时候,RocketMQ 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。

当然,RocketMQ 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 16 次重试次数时,Consumer 还是消费失败时,该消息就会进入到死信队列

@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")
public class SpringConsumer implements RocketMQListener<String> , RocketMQPushConsumerLifecycleListener {@Overridepublic void onMessage(String message) {System.out.println("Received message : "+ message);}@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {// 设置最大重试次数consumer.setMaxReconsumeTimes(5);}
}

一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的。

每次重试的间隔时间如下:

死信队列

当一条消息消费失败,RocketMQ就会自动进行消息重试。而如果消息超过最大重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的一种特殊队列:死信队列。

死信队列的特征:

  • 一个死信队列对应一个ConsumGroup,而不是对应某个消费者实例。
  • 如果一个ConsumeGroup没有产生死信队列,RocketMQ就不会为其创建相应的死信队列。
  • 一个死信队列包含了这个ConsumeGroup里的所有死信消息,而不区分该消息属于哪个Topic。
  • 死信队列中的消息不会再被消费者正常消费。
  • 死信队列的有效期跟正常消息相同。默认3天,对应broker.conf中的fileReservedTime属性。超过这个最长时间的消息都会被删除,而不管消息是否消费过。

通常,一条消息进入了死信队列,意味着消息在消费处理的过程中出现了比较严重的错误,并且无法自行恢复。此时,一般需要人工去查看死信队列中的消息,对错误原因进行排查。然后对死信消息进行处理,比如转发到正常的Topic重新进行消费,或者丢弃。

消息幂等

在MQ系统中,消息幂等有三种实现语义:

at most once 最多一次:每条消息最多只会被消费一次

at least once 至少一次:每条消息至少会被消费一次

exactly once 刚刚好一次:每条消息都只会确定的消费一次

这三种语义都有他适用的业务场景。

at most once是最好保证的。RocketMQ中可以直接用异步发送、sendOneWay等方式就可以保证。

at least once这个语义,RocketMQ也有同步发送、事务消息等很多方式能够保证。

而这个exactly once是MQ中最理想也是最难保证的一种语义。RocketMQ只能保证at least once,保证不了exactly once。

RocketMQ 消息重复的场景

发送时消息重复

当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

投递时消息重复消息

消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

负载均衡时消息重复

包括但不限于网络抖动、Broker 重启以及订阅方应用重启,当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

消息幂等解决方案

在RocketMQ中,是无法保证每个消息只被投递一次的,所以要在业务上自行来保证消息消费的幂等性。

ocketMQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以用这个MessageId来作为判断幂等的关键依据。

但是,这个MessageId是无法保证全局唯一的,也会有冲突的情况。所以在一些对幂等性要求严格的场景,最好是使用业务上唯一的一个标识比较靠谱。例如订单ID。而这个业务标识可以使用Message的Key来进行传递。

比如我们业务的消息消费逻辑是:插入某张订单表的数据,然后更新库存。

insert into t_order values .....
update t_inv set count = count-1 where good_id = 'good123';

要实现消息的幂等,我们可能会采取这样的方案:

select * from t_order where order_no = 'order123'
if(order  != null) {return ;//消息重复,直接返回
}

这对于很多情况下,的确能起到不错的效果,但是在并发场景下,还是会有问题。

还可以通过以下方式处理:

  • 使用数据库的行锁处理
  • 利用分布式锁处理不同服务间的并发。
  • 数据库对唯一值的入库字段设唯一索引。

相关文章:

Rocketmq消费消息时不丢失不重复

消息消费不丢失 手动ACK 在消费者端&#xff0c;需要确保在消息拉取并消费成功之后再给Broker返回ACK&#xff0c;就可以保证消息不丢失了&#xff0c;如果这个过程中Broker一直没收到ACK&#xff0c;那么就可以重试。所以&#xff0c;在消费者的代码中&#xff0c;一定要在业…...

RedisInsight——redis的桌面UI工具使用实践

下载 官网下载安装。下载地址在这里 填个邮箱地址就可以下载了。 安装使用。 安装成功后开始使用。 1. 你可以add一个地址。或者登录redis cloud 去auto-discover 2 . 新增你的redis库地址。注意index的取值 3。现在可以登录到redis了。看看结果 这是现在 在服务器上执行…...

JVM对象创建与内存分配

对象的创建 对象创建的主要流程&#xff1a; 类加载推荐博客&#xff1a;JVM类加载机制详解 类加载检查 虚拟机遇到一条new指令时&#xff0c;首先将去检查这个指令的参数是否能在常量池中定位到一个类的符号引用&#xff0c;并且检查这个符号引用代表的类是否已被加载、解析…...

央国企数字化转型难在哪?为什么要数字化转型?

科技在发展&#xff0c;技术在升级&#xff0c;全球信息化、数字化的步伐在加快&#xff0c;企业想要在未来的发展中抓住机会&#xff0c;更好地发展壮大&#xff0c;就需要加快企业数字化转型的速度&#xff0c;才能立足于信息化、数字化时代&#xff0c;央国企作为企业中的一…...

第7天:信息打点-资产泄漏amp;CMS识别amp;Git监控amp;SVNamp;DS_Storeamp;备份

第7天&#xff1a;信息打点-资产泄漏&CMS识别&Git监控&SVN&DS_Store&备份 知识点&#xff1a; 一、cms指纹识别获取方式 网上开源的程序&#xff0c;得到名字就可以搜索直接获取到源码。 cms在线识别&#xff1a; CMS识别&#xff1a;https://www.yun…...

不可思议,红警居然开源了!

红警&#xff0c;准确的说应该叫“红色警戒”&#xff0c;是大部分 80/90 后记忆里跟游戏二字关系最深的情节。 相信每一名 80/90 后&#xff0c;都有一段难忘的红警岁月&#xff0c;甚至可以说很多人的青春&#xff0c;就叫红警! 说到红色警戒游戏&#xff0c;估计应该是很多…...

yolo系列模型训练数据集全流程制作方法(附数据增强代码)

yolo系列的模型在目标检测领域里面受众非常广&#xff0c;也十分流行&#xff0c;但是在使用yolo进行目标检测训练的时候&#xff0c;往往要将VOC格式的数据集转化为yolo专属的数据集&#xff0c;而yolo的训练数据集制作方法呢&#xff0c;最常见的也是有两种&#xff0c;下面我…...

4、FFmpeg命令行操作7

转封装(1) 保持编码格式&#xff1a; ffmpeg -i test.mp4 -vcodec copy -acodec copy test_copy.ts ffmpeg -i test.mp4 -codec copy test_copy2.ts 改变编码格式&#xff1a; ffmpeg -i test.mp4 -vcodec libx265 -acodec libmp3lame out_h265_mp3.mkv …...

算法进阶——链表中环的入口节点

题目 给一个长度为n链表&#xff0c;若其中包含环&#xff0c;请找出该链表的环的入口结点&#xff0c;否则&#xff0c;返回null。 数据范围&#xff1a;1<结点值<10000 要求&#xff1a;空间复杂度O(1)&#xff0c;时间复杂度O(n) 例如&#xff0c;输入{1,2},{3,4,5…...

无线WiFi安全渗透与攻防(N.1)WPA渗透-pyrit:batch-table加速attack_db模块加速_“attack_db”模块加速

WPA渗透-pyrit:batch-table加速attack_db模块加速_“attack_db”模块加速 WPA渗透-pyrit:batch-table加速attack_db模块加速_“attack_db”模块加速1.渗透WIFI1.导入密码字典2.导入essid,破解完成记得删除3.批处理数据库,速度比较慢,耐心等待4.batch-table(批处理数据库)加…...

YOLOV8部署Android Studio安卓平台NCNN

下载Android Studio&#xff0c;配置安卓开发环境&#xff0c;这个过程比较漫长。 安装cmake&#xff0c;注意安装的是cmake3.10版本。 根据手机安卓版本选择相应的安卓版本&#xff0c;我的是红米K30Pro&#xff0c;安卓12。 使用腾讯开源的ncnn&#xff0c;这是一个为手机端极…...

【算法萌新闯力扣】:旋转字符串

力扣热题&#xff1a;796.旋转字符串 开篇 今天下午刷了6道力扣算法题&#xff0c;选了一道有多种解法的题目与大家分享。 题目链接:796.旋转字符串 题目描述 代码思路 完全按照题目的要求&#xff0c;利用StringBuffer中的方法对字符串进行旋转&#xff0c;寻找相同的一项 …...

可逆矩阵的性质

如果矩阵A可逆&#xff0c;那么它的逆矩阵也可逆&#xff0c;并且如果矩阵A可逆&#xff0c;假设是一个不为0的数&#xff0c;那么也可逆&#xff0c;并且如果矩阵A和都可逆&#xff0c;而且它们的阶数也相同&#xff0c;那么它们的乘积也是可逆的&#xff0c;并且如果矩阵A可逆…...

HIT 模式识别 手写汉字分类 Python实现

训练集数据 TrainSamples-400.csv&#xff0c;含 100 个不同汉字&#xff0c;每个汉字 400 个实例&#xff0c;每个实例均为 64*64 的二值图像&#xff1b; 训练集标注TrainSamples-400.csv&#xff0c;为 40000 个 0 到 99 间的整数&#xff0c;表示训练集中每个实例所属汉字类…...

GPT-4V-Act :一个多模态AI助手,能够像人类一样模拟通过鼠标和键盘进行网页浏览。

内容来源&#xff1a;xiaohuggg GPT-4V-Act &#xff1a;一个多模态AI助手&#xff0c;能够像人类一样模拟通过鼠标和键盘进行网页浏览。 它可以模拟人类浏览网页时的行为&#xff0c;如点击链接、填写表单、滚动页面等。 它通过视觉理解技术识别网页上的元素&#xff0c;就像…...

剪辑视频怎么把说话声音转成文字?

短视频已然成为了一种生活潮流&#xff0c;我们每天都在浏览各种短视频&#xff0c;或者用视频的形式记录生活&#xff0c;在制作视频的时候&#xff0c;字幕是一个很大的问题&#xff0c;给视频添加字幕可以更直观、更方便浏览。手动添加太费时间&#xff0c;下面就给大家分享…...

maven打包插件配置模板

主要有两类&#xff1a; 1、maven-shade-plugin 主要用于java程序编写的的打包 <build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</ve…...

clusterProfiler包学习

&#x1f4d6; Introduction | Biomedical Knowledge Mining using GOSemSim and clusterProfiler (yulab-smu.top) 部分使用 #GO classificationlibrary(clusterProfiler) data(geneList, package"DOSE") gene <- names(geneList)[abs(geneList) > 2]# Entre…...

【Qt开发流程之】布局管理

介绍 一个界面呈现&#xff0c;如果要让用户有更好的观感&#xff0c;布局必不可少。 【Qt之布局】QVBoxLayout、QHBoxLayout、QGridLayout、QFormLayout介绍及使用 链接: https://blog.csdn.net/MrHHHHHH/article/details/133915208 qt布局类图&#xff1a; Qt布局是Qt图形…...

建筑可视化中的 3D 纹理

在线工具推荐&#xff1a; 三维数字孪生场景工具 - GLTF/GLB在线编辑器 - Three.js AI自动纹理化开发 - YOLO 虚幻合成数据生成器 - 3D模型在线转换 - 3D模型预览图生成服务 1、什么是 3D 纹理&#xff1f; 纹理是将二维图像添加到三维模型的技术艺术。虽然对物体进行纹…...

零售行业自动化解决方案选型,核心看这几点:企业级智能体架构与落地实测分析

当前&#xff0c;零售行业正处于从“信息化”向“智能化”跨越的关键拐点。 面对全渠道运营的复杂性、劳动力成本的持续攀升以及消费者对交付时效的极致追求&#xff0c; 自动化解决方案已成为零售企业降本增效的核心战略工具。 然而&#xff0c;市场中各类技术路径分化严重&am…...

NitroShare高效使用指南:从安装到定制的全流程解析

NitroShare高效使用指南&#xff1a;从安装到定制的全流程解析 【免费下载链接】nitroshare-desktop Network file transfer application for Windows, OS X, & Linux 项目地址: https://gitcode.com/gh_mirrors/ni/nitroshare-desktop NitroShare是一款跨Windows、…...

如何让AI创作速度提升3倍?智能缓存技术TeaCache的完整指南

如何让AI创作速度提升3倍&#xff1f;智能缓存技术TeaCache的完整指南 【免费下载链接】ComfyUI-TeaCache 项目地址: https://gitcode.com/gh_mirrors/co/ComfyUI-TeaCache 你是否曾为AI图像和视频生成的速度而烦恼&#xff1f;看着进度条缓慢移动&#xff0c;等待数分…...

opencv利用freetype写中文

1、ubuntu需要安装环境 sudo apt install libfreetype6-dev libharfbuzz-dev 2、opencv和opencv_contril编译&#xff0c;勾选下面按钮 3、下载字体库 https://github.com/StellarCN/scp_zh/tree/master/fonts 下载SimHei.ttf 4、代码 #include <opencv2/freetype.hpp…...

云计算案例排错(云上3)

故障1 CPU&内存配额错误 solo-1工作负载启动失败&#xff0c;提示&#xff1a;重启启动容器失败。 解决方案&#xff1a;看下solo-1的更新升级中的容器规划配置&#xff0c;是否是正确的配置&#xff08;CPU配额&#xff1a;申请0.25Core 限制0.29Core&#xff1b;内存配额…...

AudioLDM-S与LangGraph:构建音效生成工作流引擎

AudioLDM-S与LangGraph&#xff1a;构建音效生成工作流引擎 1. 引言 想象一下这样的场景&#xff1a;电影制作人需要为一场雨夜追逐戏配乐&#xff0c;传统的工作流程需要先搜索音效库&#xff0c;筛选合适的雨声、脚步声、轮胎摩擦声&#xff0c;然后进行剪辑、混音&#xf…...

Modules 模块化:头文件地狱真的要终结了吗?我持怀疑态度

各位来宾&#xff0c;各位技术同仁&#xff0c;大家好&#xff01;今天我们齐聚一堂&#xff0c;探讨一个在C社区引发广泛讨论、充满期待又饱含争议的话题&#xff1a;C模块化。特别是关于“头文件地狱真的要终结了吗&#xff1f;”这个问题&#xff0c;我深知在座的许多人&…...

别再只盯着find提权了!盘点Linux下5种更隐蔽的权限维持姿势与排查手册

超越find提权&#xff1a;Linux系统下5种高阶权限维持技术与深度排查指南 当攻击者成功获取Linux系统权限后&#xff0c;权限维持&#xff08;Persistence&#xff09;往往成为攻防对抗的核心战场。传统安全培训常聚焦于SUID提权等基础手段&#xff0c;但真实APT攻击中&#xf…...

自动驾驶模拟平台模型配置全指南:从技术选型到场景验证

自动驾驶模拟平台模型配置全指南&#xff1a;从技术选型到场景验证 【免费下载链接】alpasim 项目地址: https://gitcode.com/GitHub_Trending/al/alpasim 一、AlpaSim核心价值&#xff1a;构建自动驾驶研发闭环 AlpaSim作为开源自动驾驶模拟平台&#xff0c;通过模块…...

如何用20万条真实动作数据,终结机器人动作“脑补”

3月30日&#xff0c;某知名媒体报道了一项来自南洋理工大学的前沿技术突破。研究团队利用超过20万条“4D交互数据”结合“运动学锚定”&#xff0c;研发出一种新型的“生成式仿真”技术&#xff0c;有效解决了机器人动作模拟中长期存在的“脑补”难题。据悉&#xff0c;这一技术…...