当前位置: 首页 > news >正文

Rocketmq消费消息时不丢失不重复

消息消费不丢失

手动ACK

在消费者端,需要确保在消息拉取并消费成功之后再给Broker返回ACK,就可以保证消息不丢失了,如果这个过程中Broker一直没收到ACK,那么就可以重试。所以,在消费者的代码中,一定要在业务逻辑的最后一步return ConsumeConcurrentlyStatus.CONSUME_SUCCESS

spring boot中 消费消息确认

@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "${carInInfo.topic}",topic = "${carInInfo.topic}", selectorExpression = "*",consumeMode = ConsumeMode.ORDERLY)
public class CarInParkSynThirdMQ implements RocketMQListener<AddCarInParkDTO> {/*** 请不要捕获异常信息,否则无法进行消息重新推送** @param addCarInParkDTO*/@Overridepublic void onMessage(AddCarInParkDTO addCarInParkDTO) {System.out.println("收到消息:" + JSON.toJSONString(addCarInParkDTO));}

指定consumeMode = ConsumeMode.ORDERLY,实现消息确认,我们看下源码:

DefaultRocketMQListenerContainer.java这个类,看下其中一个类实现

public class DefaultMessageListenerOrderly implements MessageListenerOrderly {@SuppressWarnings("unchecked")@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt messageExt : msgs) {log.debug("received msg: {}", messageExt);try {long now = System.currentTimeMillis();handleMessage(messageExt);long costTime = System.currentTimeMillis() - now;log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);} catch (Exception e) {log.warn("consume message failed. messageExt:{}", messageExt, e);context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}return ConsumeOrderlyStatus.SUCCESS;}

只要没有异常出现,那么就会消费成功,有异常出现了就重新进行发送,那这个又是在哪里调用的呢?再看下这个private方法就明白了

 private void initRocketMQPushConsumer() throws MQClientException {......switch (consumeMode) {case ORDERLY:consumer.setMessageListener(new DefaultMessageListenerOrderly());break;case CONCURRENTLY:consumer.setMessageListener(new DefaultMessageListenerConcurrently());break;default:throw new IllegalArgumentException("Property 'consumeMode' was wrong.");}if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);} else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);}}

消息重试

对于普通的消息,当消费者消费消息失败后,可以通过设置返回状态达到消息重试的结果。

如何让消息进行重试

RocketMQ 提供消费重试的机制。在消息消费失败的时候,RocketMQ 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。

当然,RocketMQ 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 16 次重试次数时,Consumer 还是消费失败时,该消息就会进入到死信队列

@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")
public class SpringConsumer implements RocketMQListener<String> , RocketMQPushConsumerLifecycleListener {@Overridepublic void onMessage(String message) {System.out.println("Received message : "+ message);}@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {// 设置最大重试次数consumer.setMaxReconsumeTimes(5);}
}

一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的。

每次重试的间隔时间如下:

死信队列

当一条消息消费失败,RocketMQ就会自动进行消息重试。而如果消息超过最大重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的一种特殊队列:死信队列。

死信队列的特征:

  • 一个死信队列对应一个ConsumGroup,而不是对应某个消费者实例。
  • 如果一个ConsumeGroup没有产生死信队列,RocketMQ就不会为其创建相应的死信队列。
  • 一个死信队列包含了这个ConsumeGroup里的所有死信消息,而不区分该消息属于哪个Topic。
  • 死信队列中的消息不会再被消费者正常消费。
  • 死信队列的有效期跟正常消息相同。默认3天,对应broker.conf中的fileReservedTime属性。超过这个最长时间的消息都会被删除,而不管消息是否消费过。

通常,一条消息进入了死信队列,意味着消息在消费处理的过程中出现了比较严重的错误,并且无法自行恢复。此时,一般需要人工去查看死信队列中的消息,对错误原因进行排查。然后对死信消息进行处理,比如转发到正常的Topic重新进行消费,或者丢弃。

消息幂等

在MQ系统中,消息幂等有三种实现语义:

at most once 最多一次:每条消息最多只会被消费一次

at least once 至少一次:每条消息至少会被消费一次

exactly once 刚刚好一次:每条消息都只会确定的消费一次

这三种语义都有他适用的业务场景。

at most once是最好保证的。RocketMQ中可以直接用异步发送、sendOneWay等方式就可以保证。

at least once这个语义,RocketMQ也有同步发送、事务消息等很多方式能够保证。

而这个exactly once是MQ中最理想也是最难保证的一种语义。RocketMQ只能保证at least once,保证不了exactly once。

RocketMQ 消息重复的场景

发送时消息重复

当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

投递时消息重复消息

消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

负载均衡时消息重复

包括但不限于网络抖动、Broker 重启以及订阅方应用重启,当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

消息幂等解决方案

在RocketMQ中,是无法保证每个消息只被投递一次的,所以要在业务上自行来保证消息消费的幂等性。

ocketMQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以用这个MessageId来作为判断幂等的关键依据。

但是,这个MessageId是无法保证全局唯一的,也会有冲突的情况。所以在一些对幂等性要求严格的场景,最好是使用业务上唯一的一个标识比较靠谱。例如订单ID。而这个业务标识可以使用Message的Key来进行传递。

比如我们业务的消息消费逻辑是:插入某张订单表的数据,然后更新库存。

insert into t_order values .....
update t_inv set count = count-1 where good_id = 'good123';

要实现消息的幂等,我们可能会采取这样的方案:

select * from t_order where order_no = 'order123'
if(order  != null) {return ;//消息重复,直接返回
}

这对于很多情况下,的确能起到不错的效果,但是在并发场景下,还是会有问题。

还可以通过以下方式处理:

  • 使用数据库的行锁处理
  • 利用分布式锁处理不同服务间的并发。
  • 数据库对唯一值的入库字段设唯一索引。

相关文章:

Rocketmq消费消息时不丢失不重复

消息消费不丢失 手动ACK 在消费者端&#xff0c;需要确保在消息拉取并消费成功之后再给Broker返回ACK&#xff0c;就可以保证消息不丢失了&#xff0c;如果这个过程中Broker一直没收到ACK&#xff0c;那么就可以重试。所以&#xff0c;在消费者的代码中&#xff0c;一定要在业…...

RedisInsight——redis的桌面UI工具使用实践

下载 官网下载安装。下载地址在这里 填个邮箱地址就可以下载了。 安装使用。 安装成功后开始使用。 1. 你可以add一个地址。或者登录redis cloud 去auto-discover 2 . 新增你的redis库地址。注意index的取值 3。现在可以登录到redis了。看看结果 这是现在 在服务器上执行…...

JVM对象创建与内存分配

对象的创建 对象创建的主要流程&#xff1a; 类加载推荐博客&#xff1a;JVM类加载机制详解 类加载检查 虚拟机遇到一条new指令时&#xff0c;首先将去检查这个指令的参数是否能在常量池中定位到一个类的符号引用&#xff0c;并且检查这个符号引用代表的类是否已被加载、解析…...

央国企数字化转型难在哪?为什么要数字化转型?

科技在发展&#xff0c;技术在升级&#xff0c;全球信息化、数字化的步伐在加快&#xff0c;企业想要在未来的发展中抓住机会&#xff0c;更好地发展壮大&#xff0c;就需要加快企业数字化转型的速度&#xff0c;才能立足于信息化、数字化时代&#xff0c;央国企作为企业中的一…...

第7天:信息打点-资产泄漏amp;CMS识别amp;Git监控amp;SVNamp;DS_Storeamp;备份

第7天&#xff1a;信息打点-资产泄漏&CMS识别&Git监控&SVN&DS_Store&备份 知识点&#xff1a; 一、cms指纹识别获取方式 网上开源的程序&#xff0c;得到名字就可以搜索直接获取到源码。 cms在线识别&#xff1a; CMS识别&#xff1a;https://www.yun…...

不可思议,红警居然开源了!

红警&#xff0c;准确的说应该叫“红色警戒”&#xff0c;是大部分 80/90 后记忆里跟游戏二字关系最深的情节。 相信每一名 80/90 后&#xff0c;都有一段难忘的红警岁月&#xff0c;甚至可以说很多人的青春&#xff0c;就叫红警! 说到红色警戒游戏&#xff0c;估计应该是很多…...

yolo系列模型训练数据集全流程制作方法(附数据增强代码)

yolo系列的模型在目标检测领域里面受众非常广&#xff0c;也十分流行&#xff0c;但是在使用yolo进行目标检测训练的时候&#xff0c;往往要将VOC格式的数据集转化为yolo专属的数据集&#xff0c;而yolo的训练数据集制作方法呢&#xff0c;最常见的也是有两种&#xff0c;下面我…...

4、FFmpeg命令行操作7

转封装(1) 保持编码格式&#xff1a; ffmpeg -i test.mp4 -vcodec copy -acodec copy test_copy.ts ffmpeg -i test.mp4 -codec copy test_copy2.ts 改变编码格式&#xff1a; ffmpeg -i test.mp4 -vcodec libx265 -acodec libmp3lame out_h265_mp3.mkv …...

算法进阶——链表中环的入口节点

题目 给一个长度为n链表&#xff0c;若其中包含环&#xff0c;请找出该链表的环的入口结点&#xff0c;否则&#xff0c;返回null。 数据范围&#xff1a;1<结点值<10000 要求&#xff1a;空间复杂度O(1)&#xff0c;时间复杂度O(n) 例如&#xff0c;输入{1,2},{3,4,5…...

无线WiFi安全渗透与攻防(N.1)WPA渗透-pyrit:batch-table加速attack_db模块加速_“attack_db”模块加速

WPA渗透-pyrit:batch-table加速attack_db模块加速_“attack_db”模块加速 WPA渗透-pyrit:batch-table加速attack_db模块加速_“attack_db”模块加速1.渗透WIFI1.导入密码字典2.导入essid,破解完成记得删除3.批处理数据库,速度比较慢,耐心等待4.batch-table(批处理数据库)加…...

YOLOV8部署Android Studio安卓平台NCNN

下载Android Studio&#xff0c;配置安卓开发环境&#xff0c;这个过程比较漫长。 安装cmake&#xff0c;注意安装的是cmake3.10版本。 根据手机安卓版本选择相应的安卓版本&#xff0c;我的是红米K30Pro&#xff0c;安卓12。 使用腾讯开源的ncnn&#xff0c;这是一个为手机端极…...

【算法萌新闯力扣】:旋转字符串

力扣热题&#xff1a;796.旋转字符串 开篇 今天下午刷了6道力扣算法题&#xff0c;选了一道有多种解法的题目与大家分享。 题目链接:796.旋转字符串 题目描述 代码思路 完全按照题目的要求&#xff0c;利用StringBuffer中的方法对字符串进行旋转&#xff0c;寻找相同的一项 …...

可逆矩阵的性质

如果矩阵A可逆&#xff0c;那么它的逆矩阵也可逆&#xff0c;并且如果矩阵A可逆&#xff0c;假设是一个不为0的数&#xff0c;那么也可逆&#xff0c;并且如果矩阵A和都可逆&#xff0c;而且它们的阶数也相同&#xff0c;那么它们的乘积也是可逆的&#xff0c;并且如果矩阵A可逆…...

HIT 模式识别 手写汉字分类 Python实现

训练集数据 TrainSamples-400.csv&#xff0c;含 100 个不同汉字&#xff0c;每个汉字 400 个实例&#xff0c;每个实例均为 64*64 的二值图像&#xff1b; 训练集标注TrainSamples-400.csv&#xff0c;为 40000 个 0 到 99 间的整数&#xff0c;表示训练集中每个实例所属汉字类…...

GPT-4V-Act :一个多模态AI助手,能够像人类一样模拟通过鼠标和键盘进行网页浏览。

内容来源&#xff1a;xiaohuggg GPT-4V-Act &#xff1a;一个多模态AI助手&#xff0c;能够像人类一样模拟通过鼠标和键盘进行网页浏览。 它可以模拟人类浏览网页时的行为&#xff0c;如点击链接、填写表单、滚动页面等。 它通过视觉理解技术识别网页上的元素&#xff0c;就像…...

剪辑视频怎么把说话声音转成文字?

短视频已然成为了一种生活潮流&#xff0c;我们每天都在浏览各种短视频&#xff0c;或者用视频的形式记录生活&#xff0c;在制作视频的时候&#xff0c;字幕是一个很大的问题&#xff0c;给视频添加字幕可以更直观、更方便浏览。手动添加太费时间&#xff0c;下面就给大家分享…...

maven打包插件配置模板

主要有两类&#xff1a; 1、maven-shade-plugin 主要用于java程序编写的的打包 <build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</ve…...

clusterProfiler包学习

&#x1f4d6; Introduction | Biomedical Knowledge Mining using GOSemSim and clusterProfiler (yulab-smu.top) 部分使用 #GO classificationlibrary(clusterProfiler) data(geneList, package"DOSE") gene <- names(geneList)[abs(geneList) > 2]# Entre…...

【Qt开发流程之】布局管理

介绍 一个界面呈现&#xff0c;如果要让用户有更好的观感&#xff0c;布局必不可少。 【Qt之布局】QVBoxLayout、QHBoxLayout、QGridLayout、QFormLayout介绍及使用 链接: https://blog.csdn.net/MrHHHHHH/article/details/133915208 qt布局类图&#xff1a; Qt布局是Qt图形…...

建筑可视化中的 3D 纹理

在线工具推荐&#xff1a; 三维数字孪生场景工具 - GLTF/GLB在线编辑器 - Three.js AI自动纹理化开发 - YOLO 虚幻合成数据生成器 - 3D模型在线转换 - 3D模型预览图生成服务 1、什么是 3D 纹理&#xff1f; 纹理是将二维图像添加到三维模型的技术艺术。虽然对物体进行纹…...

dedecms 织梦自定义表单留言增加ajax验证码功能

增加ajax功能模块&#xff0c;用户不点击提交按钮&#xff0c;只要输入框失去焦点&#xff0c;就会提前提示验证码是否正确。 一&#xff0c;模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...

macOS多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用

文章目录 问题现象问题原因解决办法 问题现象 macOS启动台&#xff08;Launchpad&#xff09;多出来了&#xff1a;Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用。 问题原因 很明显&#xff0c;都是Google家的办公全家桶。这些应用并不是通过独立安装的…...

【ROS】Nav2源码之nav2_behavior_tree-行为树节点列表

1、行为树节点分类 在 Nav2(Navigation2)的行为树框架中,行为树节点插件按照功能分为 Action(动作节点)、Condition(条件节点)、Control(控制节点) 和 Decorator(装饰节点) 四类。 1.1 动作节点 Action 执行具体的机器人操作或任务,直接与硬件、传感器或外部系统…...

鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院查看报告小程序

一、开发环境准备 ​​工具安装​​&#xff1a; 下载安装DevEco Studio 4.0&#xff08;支持HarmonyOS 5&#xff09;配置HarmonyOS SDK 5.0确保Node.js版本≥14 ​​项目初始化​​&#xff1a; ohpm init harmony/hospital-report-app 二、核心功能模块实现 1. 报告列表…...

c#开发AI模型对话

AI模型 前面已经介绍了一般AI模型本地部署&#xff0c;直接调用现成的模型数据。这里主要讲述讲接口集成到我们自己的程序中使用方式。 微软提供了ML.NET来开发和使用AI模型&#xff0c;但是目前国内可能使用不多&#xff0c;至少实践例子很少看见。开发训练模型就不介绍了&am…...

JVM暂停(Stop-The-World,STW)的原因分类及对应排查方案

JVM暂停(Stop-The-World,STW)的完整原因分类及对应排查方案,结合JVM运行机制和常见故障场景整理而成: 一、GC相关暂停​​ 1. ​​安全点(Safepoint)阻塞​​ ​​现象​​:JVM暂停但无GC日志,日志显示No GCs detected。​​原因​​:JVM等待所有线程进入安全点(如…...

图表类系列各种样式PPT模版分享

图标图表系列PPT模版&#xff0c;柱状图PPT模版&#xff0c;线状图PPT模版&#xff0c;折线图PPT模版&#xff0c;饼状图PPT模版&#xff0c;雷达图PPT模版&#xff0c;树状图PPT模版 图表类系列各种样式PPT模版分享&#xff1a;图表系列PPT模板https://pan.quark.cn/s/20d40aa…...

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据 Power Query 具有大量专门帮助您清理和准备数据以供分析的功能。 您将了解如何简化复杂模型、更改数据类型、重命名对象和透视数据。 您还将了解如何分析列&#xff0c;以便知晓哪些列包含有价值的数据&#xff0c;…...

深度学习习题2

1.如果增加神经网络的宽度&#xff0c;精确度会增加到一个特定阈值后&#xff0c;便开始降低。造成这一现象的可能原因是什么&#xff1f; A、即使增加卷积核的数量&#xff0c;只有少部分的核会被用作预测 B、当卷积核数量增加时&#xff0c;神经网络的预测能力会降低 C、当卷…...

React---day11

14.4 react-redux第三方库 提供connect、thunk之类的函数 以获取一个banner数据为例子 store&#xff1a; 我们在使用异步的时候理应是要使用中间件的&#xff0c;但是configureStore 已经自动集成了 redux-thunk&#xff0c;注意action里面要返回函数 import { configureS…...