RocketMQ重复消费的解决方案::分布式锁直击面试!
文章目录
- 场景分析
- 方法的幂等
- 分布式锁
- Redis实现分布式锁
- 抢锁的设计思路
- 分布式锁案例
- 直击面试
- rocketmq什么时候重复消费
- 消息丢失的问题
- 消息在哪里丢失
- 发送端确保发送成功并且配合失败的业务处理
- 消费端确保消息不丢失
- rocketmq 主从+同步刷盘
场景分析

分布式系统架构中,队列是分布式的,生产端是分布式集群,消费端也是分布式集群.
相当于有多个消费端同时监听队列,同时减库存,写入订单.
面试题:如何处理消息重复消费的问题. 重复消费大部分场景,需要解决的.
引入2个概念来解决: 幂等的业务方法,和消息的分布式锁.
方法的幂等
结论: 一个方法的一次业务逻辑调用和N次调用的结果是一致的,我们称这种方法就是幂等.
一旦重复消费,一定要把消费的业务逻辑方法(orderAdd)设计成幂等的.
- 幂等的方法
- GET方法: 查询方法,天生幂等.
- DELETE方法: 删除方法,天生幂等.
- PUT方法: 修改,并不是天生幂等,需要设计
- 减少库存:
- update stock_tbl set stock=stock-#{stock} where id=#{id}(不是幂等)
- select * from stock_log where order_id=#{orderId}(查询日志,判断是否已经见过库存了),没有数据 update stock_tbl set stock=stock-#{stock} where id=#{id},insert into stock_log (字段) values (订单id,商品减库存信息) (这样设计就幂等了,依然有问题)
- 减少库存:
- POST方法: 新增,并不是天生幂等,需要设计
- 新增订单: insert into order_tbl (order_id,order_item_id,count,user_id) values (各种属性);如果使用唯一属性校验,作用在order_id order_sn(编号).同一张订单,这个字段值是相同(幂等满足,没做幂等不满足)
- 当前orderAdd方法设计幂等的解决思路(之一)
- 使用订单id或者订单编号,**userId+商品id(**这个只满足当前我们的案例特点,不满足实际场景.)查询订单,如果已经存在了,库存不减少,订单不增了,购物车不用删除了
@Override
public void orderAdd(OrderAddDTO orderAddDTO) {//幂等设计思路: 利用userId和commodityCode 查询,如果已经存在了订单,方法直接执行结束//如果结果不存在,减库存,生单,删除购物车int count=orderMapper.selectExists(orderAddDTO);if (count>0){log.debug("订单已经新增了");return;}StockReduceCountDTO countDTO=new StockReduceCountDTO();countDTO.setCommodityCode(orderAddDTO.getCommodityCode());countDTO.setReduceCount(orderAddDTO.getCount());// 利用Dubbo调用stock模块减少库存的业务逻辑层方法实现功能stockService.reduceCommodityCount(countDTO);// 2.从购物车中删除用户选中的商品(调用Cart模块删除购物车中商品的方法)// 利用dubbo调用cart模块删除购物车中商品的方法实现功能Order order=new Order();BeanUtils.copyProperties(orderAddDTO,order);// 下面执行新增 假设insert是幂等的.orderMapper.insertOrder(order);log.info("新增订单信息为:{}",order);cartService.cartDelete(orderAddDTO);
}@Select("select count(id) from " +
"order_tbl where user_id=#{userId} and commodity_code=#{commodityCode}")
int selectExists(OrderAddDTO orderAddDTO);
分布式锁
当前分布式消费架构

即使,将方法设计成幂等,这个架构中,消息重复消费
,满足线程安全问题的所有因素
- 并发/多线程
- 写操作
- 共享数据
只要解决其中一点,线程安全问题就消失了.
并发多线程–>串行
写操作–> 避免写(不能满足当前案例,必须写)
共享数据–>个体数据(不能满足,重复消费,重复订单是前提)
分布式线程安全问题的解决方案—分布式锁
错误思路: 引入synchronized同步锁,不能解决分布式场景下,多个进程的并发线程安全问题.
概念: 分布式场景下,多进程,多线程并发的抢锁机制. 抢到资源锁,执行业务逻辑,抢不到等待或者放弃执行.能够避免对同一个资源出现并发多线程操作的解决方案.
和synchronized的区别在于 synchronizeds本地锁.管理一个进程中的多线程,分布式锁是管理多个进程中的多线程.
分布式锁当前落地方案: redis setnx命令
Redis实现分布式锁
抢锁的设计思路
-
目标: 多线程执行业务之前,先判断执行权限,抢锁,抢到锁的才能执行业务,抢不到的不执行.(当前案例中,抢锁,然后执行的业务逻辑是:orderAdd)

抢锁如何执行?: setnx key “” -
key值如何设计?: 需要结合业务,设计key值(redis中最主要的功能,都关系到key值的设计),抢锁的逻辑中,满足是业务数据,满足重复消费的重复数据.就可以实现这个key值的设计. 消息Id是重复的.
-
当前业务流程设计缺陷: 如果有一个消费者抢到锁了,执行了业务方法.执行完成后,没有释放锁的机制.如果引入等待重抢的机制,由于抢到锁的没有释放,会导致死锁.

释放锁的逻辑引入

上述整改的流程中避免了死锁问题,但是存在删除失败导致死锁的问题.

所以,要保证del释放没有成功,在redis也一定不会长期保存.

兜底的解决死锁问题.基本不会出现死锁了.
为了解决误删除的问题,抢锁的时候setnx key value值设计成一个随机数.

随机数两个消费,多个消费者生成相同的可能性极低.
分布式锁案例
@Component
@RocketMQMessageListener(topic = "business-order-topic",consumerGroup = "${rocketmq.consumer.group}",selectorExpression = "orderAdd")
@Slf4j
public class OrderAddConsumerListener implements RocketMQListener<MessageExt> {@Autowiredprivate IOrderService orderService;@Autowiredprivate StringRedisTemplate redisTemplate;@Overridepublic void onMessage(MessageExt msg) {//拿到底层消息对象的bodybyte[] body = msg.getBody();//尝试先解析成stringString orderJson=new String(body, StandardCharsets.UTF_8);System.out.println(orderJson);OrderAddDTO orderAddDTO=JSON.toJavaObject(JSON.parseObject(orderJson),OrderAddDTO.class);System.out.println(orderAddDTO);//1.生成锁的key值,生成当前这把锁的随机数//准备锁keyString msgKeyLock="msg:order:add:"+msg.getMsgId();//准备随机数 4 6 8位String randCode=new Random().nextInt(9000)+1000+"";ValueOperations<String, String> stringOps = redisTemplate.opsForValue();try{//补充消息消费的抢锁机制//2.抢锁 setnx msgKeyLock randCode expire 10sBoolean tryLockSuccess = stringOps.setIfAbsent(msgKeyLock, randCode, 10, TimeUnit.SECONDS);//3.判断 抢锁成功还是失败if(!tryLockSuccess){//3.2 失败了 可以等待5秒重新抢锁,也可以直接结束//尝试这里使用while编写等待5秒重新抢的逻辑log.info("有别人抢锁了,msgKey:{},value:{}",msgKeyLock,randCode);return;}//3.1 成功了 执行orderAddorderService.orderAdd(orderAddDTO);}catch (CoolSharkServiceException e){//业务异常,说明订单新增业务性失败,比如库存没了log.error("库存减少失败,库存触底了:{},异常信息:{}",orderAddDTO,e.getMessage());}finally {//释放锁 读以下锁的value值,等于当前生成value才释放String s = stringOps.get(msgKeyLock);if (s!=null && s.equals(randCode)){//del msgKeyLockredisTemplate.delete(msgKeyLock);}}}
}
直击面试
rocketmq什么时候重复消费
- 在broker做扩展的时候,消息队列的消息,做扩展的时候,原本存储在原队列的消息,会进行rebalance重平衡.
- 消费开始阶段
消费者consumer1 所在group1 绑定队列,push消费模式,使得消费者接受到了queue1 queue2的6条消息.消费过程,成功执行,即将返回确认.

总结: - 消费者并发消费的逻辑,同一组消费者绑定分布式队列,推送批量的消息
- 在某个消费者还没有来得及消费,或者没来得及返回确认给rocketmq,队列发生了扩容缩容
- rocketmq会对队列中所有的消息做rebalance重平衡(消息重新分配给不同队列),消费者绑定也充平衡
- 导致已经推送的但是未返回确认的消息,被发送给不同消费者多次.
消息丢失的问题
rocketmq kafka rabbitmq activemq都是队列.只要谈到其中一个.
- 重复消费的问题(方法必须设计成幂等,一旦设计成幂等,可能造成线程安全隐患,所以引入分布式锁)
- 消息丢失如何处理.
面试题:消息丢失如何处理.
消息在哪里丢失
- 发送没成功,没有解决不成功的业务逻辑
- rocketmq保存的时候,断电,宕机,丢失消息(运行的时候,消息存储在内存)
- 消费端丢失消息(没有成功处理消息,就直接返回success,并不是所有的消费逻辑都是先消费,再确认的,如果关注的是消费速度,不关注成功或者是否丢失,就可以这样处理)
发送端确保发送成功并且配合失败的业务处理
同步发送,接收发送结果,SEND_OK才结束.
客户端代码底层都有默认重试(retry 3 times).发送重试都失败了.
处理发送失败的逻辑.
- 发送到备用/失败的队列
- 记录日志,将消息来源,目标和消息内容,详细记录,等待监控系统,维护人员来直接处理
消费端确保消息不丢失
一定是先消息费,在确认,消费失败,返回失败(rocketmq消费点位保持原有位置不变,同一个消费者组,会重新拿到消息)
rocketmq 主从+同步刷盘

同步刷盘(消息数据可靠性保证): 如果持久化内存消息数据到磁盘失败,发送结果没有成功.
异步刷盘: 只要内存接收到了生产端的消息数据,数据是否持久化到磁盘,都会给生产端发送成功接收信息.
主从的双机热备: broker可以配置主从,考虑数据可靠性,和性能,一般主master做同步刷盘,slave做异步刷盘.(都同步刷盘,100%保证消息只要到达rocketmq就不会丢失,但是性能不能保证.)
相关文章:
RocketMQ重复消费的解决方案::分布式锁直击面试!
文章目录 场景分析方法的幂等分布式锁Redis实现分布式锁抢锁的设计思路 分布式锁案例 直击面试rocketmq什么时候重复消费消息丢失的问题消息在哪里丢失发送端确保发送成功并且配合失败的业务处理消费端确保消息不丢失rocketmq 主从同步刷盘 场景分析 分布式系统架构中,队列是分…...
如何降低TCP在局域网环境下的数据传输延迟
以Ping为例。本案例是一个测试题目,只有现象展示,不含解决方案。 ROS_Kinetic_26 使用rosserial_windows实现windows与ROS master发送与接收消息_windows 接收ros1 消息 什么是ping? AI: ping是互联网控制消息协议(…...
【LeetCode】78.子集
题目 给你一个整数数组 nums ,数组中的元素 互不相同 。返回该数组所有可能的子集(幂集)。 解集 不能 包含重复的子集。你可以按 任意顺序 返回解集。 示例 1: 输入:nums [1,2,3] 输出:[[],[1],[2],[1…...
认可功能介绍 - 技术声誉靠认可
需求 大家在学习和工作中, 经常碰到一些热心帮助自己的人, 我们怎么向他们表示感谢呢? 各位博主在 CSDN 也做了很多贡献,也有不少用户在做各种各样的社区活动,这些活动给我们的领军人物什么回馈呢? 这些…...
EtherNet/IP转CAN网关can协议标准
生产管理设备中,会有设备与其他设备的协议不同,数据无法互通,让你的工作陷入困境。这时,一款神奇的产品出现了——远创智控YC-EIP-CAN通讯网关! 1, 这款通讯网关采用ETHERNET/IP从站功能,可以将各种CAN总线…...
解决代理IP负载均衡与性能优化的双重挑战
在当今数字化时代,代理IP的应用范围日益广泛,它不仅在数据爬取、网络抓取等领域发挥着重要作用,也成为网络安全和隐私保护的有力工具。然而,面对庞大的数据流量和复杂的网络环境,如何实现代理IP的负载均衡和性能优化成…...
深度探索 Elasticsearch 8.X:function_score 参数解读与实战案例分析
在 Elasticsearch 中,function_score 可以让我们在查询的同时对搜索结果进行自定义评分。 function_score 提供了一系列的参数和函数让我们可以根据需求灵活地进行设置。 近期有同学反馈,function_score 的相关参数不好理解,本文将深入探讨 f…...
测牛学堂:软件测试之andorid app性能测试面试知识点总结(二)
APP性能测试指标之FPS 如果经常玩游戏的同学应该听过FPS。 FPS本来是图像领域中的概念,是指画面每秒传输的帧数。每秒钟帧数越多,所显示的动作就会越流畅。 但是因为功耗的限制,一般60fps就是跑满的效果了。 我们测试的话,一般…...
尚医通06:数据字典+EasyExcel+mongodb
内容介绍 1、数据字典列表前端 2、EasyExcel介绍、实例 3、数据字典导出接口、前端 4、数据字典导入接口、前端 5、数据字典添加redis缓存 6、MongoDB简介 7、MongoDB安装 8、MongoDB基本概念 数据字典列表前端 1、测试问题 (1)报错日志 &am…...
【前端知识】React 基础巩固(三十二)——Redux的三大原则、使用流程及实践
React 基础巩固(三十二)——Redux的三大原则 一、Redux的三大原则 单一数据源 整个应用程序的state被存储在一颗object tree 中,并且这个object tree 只存储在一个store中;Redux并没有强制让我们不能创建多个Store,但是那样做不利于数据维护…...
[NLP]使用Alpaca-Lora基于llama模型进行微调教程
Stanford Alpaca 是在 LLaMA 整个模型上微调,即对预训练模型中的所有参数都进行微调(full fine-tuning)。但该方法对于硬件成本要求仍然偏高且训练低效。 [NLP]理解大型语言模型高效微调(PEFT) 因此, Alpaca-Lora 则是利用 Lora…...
Linux Shell 脚本编程学习之【第5章 文件的排序、合并与分割 (第四部分之cut命令) 】
第5章 文件的排序、合并与分割 (第四部分之cut命令) 4 cut 命令4.1 选项及其意义4.2 输出字符 (-c)4.3 改变分隔符(-d)和提取特定域(-f) 5 paste 命令5.1 paste 命令选项及其意义5.2…...
php-golang-rpc jsonrpc和php客户端tivoka/tivoka包实践
golang 代码: package main import ( "fmt" "net" "net/rpc" "net/rpc/jsonrpc" ) type App struct{} type Res struct { Code int json:"code" Msg string json:"msg" Data any json:"…...
flutter 打包iOS安装包
flutter iOS Xcode打包并导出ipa文件安装包 1、 Xcode配置 1、 启动打包 1、 等待打包 1、 打包完成、准备导出ipa 1、 选择模式 1、 选择配置文件 1、 导出 1、 选择导出位置 1、 得到ipa...
二进制重排
二进制重排作用 二进制重排的主要目的是将连续调用的函数连接到相邻的虚拟内存地址,这样在启动时可以减少缺页中断的发生,提升启动速度。目前网络上关于ios应用启动优化,通过XCode实现的版本比较多。MacOS上的应用也是通过clang进行编译的&am…...
【Linux后端服务器开发】MAC地址与其他重要协议
目录 一、以太网 二、MAC地址 三、MTU 四、ARP协议 五、DNS系统 六、ICMP协议 七、NAT技术 八、代理服务器 一、以太网 “以太网”不是一种具体的网路,而是一种技术标准:既包含了数据链路层的内容,也包含了一些物理层的内容…...
WebGPU入门
1. 引言 前序博客: CUDA入门WebGPUZKP:客户端证明 WebGPU——Draft 2023.7.17 由苹果、谷歌、Mozilla团队发起,当前处于草稿阶段,旨在成为W3C推荐标准。 WebGPU为 在图形处理单元(GPU)上执行诸如渲染和…...
React Dva项目中.roadhogrc.mock.js直接自动导入mock目录下所有文件方式
上文 React Dva项目中模仿网络请求数据方法 中,我们书写了Dva项目模拟后端数据的方式 但是 我们.roadhogrc.mock.js中的这个处理其实并不好用 我们还需要一个一个的引入 我们可以直接靠一段代码 import fs from fs; import path from path; const mock {} fs.re…...
跨境独立站如何应对恶意网络爬虫?
目录 跨境出海独立站纷纷成立 爬虫威胁跨境电商生存 如何有效识别爬虫? 技术反爬方案 防爬虫才能保发展 中国出海跨境电商业务,主要选择大平台开设店铺,例如,亚马逊、eBay、Walmart、AliExpress、Zalando等。随着业务的扩大&…...
C# SourceGenerator 源生成器初探
简介 注意: 坑极多。而且截至2023年,这个东西仅仅是半成品 利用SourceGenerator可以在编译结束前生成一些代码参与编译,比如编译时反射之类的,还有模板代码生成都很好用。 演示仓库传送门-Github-yueh0607 使用 1. 创建项目 …...
大话软工笔记—需求分析概述
需求分析,就是要对需求调研收集到的资料信息逐个地进行拆分、研究,从大量的不确定“需求”中确定出哪些需求最终要转换为确定的“功能需求”。 需求分析的作用非常重要,后续设计的依据主要来自于需求分析的成果,包括: 项目的目的…...
Unity3D中Gfx.WaitForPresent优化方案
前言 在Unity中,Gfx.WaitForPresent占用CPU过高通常表示主线程在等待GPU完成渲染(即CPU被阻塞),这表明存在GPU瓶颈或垂直同步/帧率设置问题。以下是系统的优化方案: 对惹,这里有一个游戏开发交流小组&…...
工业安全零事故的智能守护者:一体化AI智能安防平台
前言: 通过AI视觉技术,为船厂提供全面的安全监控解决方案,涵盖交通违规检测、起重机轨道安全、非法入侵检测、盗窃防范、安全规范执行监控等多个方面,能够实现对应负责人反馈机制,并最终实现数据的统计报表。提升船厂…...
8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂
蛋白质结合剂(如抗体、抑制肽)在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上,高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术,但这类方法普遍面临资源消耗巨大、研发周期冗长…...
HTML 列表、表格、表单
1 列表标签 作用:布局内容排列整齐的区域 列表分类:无序列表、有序列表、定义列表。 例如: 1.1 无序列表 标签:ul 嵌套 li,ul是无序列表,li是列表条目。 注意事项: ul 标签里面只能包裹 li…...
高等数学(下)题型笔记(八)空间解析几何与向量代数
目录 0 前言 1 向量的点乘 1.1 基本公式 1.2 例题 2 向量的叉乘 2.1 基础知识 2.2 例题 3 空间平面方程 3.1 基础知识 3.2 例题 4 空间直线方程 4.1 基础知识 4.2 例题 5 旋转曲面及其方程 5.1 基础知识 5.2 例题 6 空间曲面的法线与切平面 6.1 基础知识 6.2…...
关键领域软件测试的突围之路:如何破解安全与效率的平衡难题
在数字化浪潮席卷全球的今天,软件系统已成为国家关键领域的核心战斗力。不同于普通商业软件,这些承载着国家安全使命的软件系统面临着前所未有的质量挑战——如何在确保绝对安全的前提下,实现高效测试与快速迭代?这一命题正考验着…...
Linux离线(zip方式)安装docker
目录 基础信息操作系统信息docker信息 安装实例安装步骤示例 遇到的问题问题1:修改默认工作路径启动失败问题2 找不到对应组 基础信息 操作系统信息 OS版本:CentOS 7 64位 内核版本:3.10.0 相关命令: uname -rcat /etc/os-rele…...
浪潮交换机配置track检测实现高速公路收费网络主备切换NQA
浪潮交换机track配置 项目背景高速网络拓扑网络情况分析通信线路收费网络路由 收费汇聚交换机相应配置收费汇聚track配置 项目背景 在实施省内一条高速公路时遇到的需求,本次涉及的主要是收费汇聚交换机的配置,浪潮网络设备在高速项目很少,通…...
PAN/FPN
import torch import torch.nn as nn import torch.nn.functional as F import mathclass LowResQueryHighResKVAttention(nn.Module):"""方案 1: 低分辨率特征 (Query) 查询高分辨率特征 (Key, Value).输出分辨率与低分辨率输入相同。"""def __…...
