当前位置: 首页 > news >正文

RocketMQ重复消费的解决方案::分布式锁直击面试!

文章目录

  • 场景分析
    • 方法的幂等
    • 分布式锁
      • Redis实现分布式锁
        • 抢锁的设计思路
    • 分布式锁案例
  • 直击面试
    • rocketmq什么时候重复消费
    • 消息丢失的问题
      • 消息在哪里丢失
      • 发送端确保发送成功并且配合失败的业务处理
      • 消费端确保消息不丢失
      • rocketmq 主从+同步刷盘

场景分析

在这里插入图片描述
分布式系统架构中,队列是分布式的,生产端是分布式集群,消费端也是分布式集群.

相当于有多个消费端同时监听队列,同时减库存,写入订单.

面试题:如何处理消息重复消费的问题. 重复消费大部分场景,需要解决的.

引入2个概念来解决: 幂等的业务方法,和消息的分布式锁.

方法的幂等

结论: 一个方法的一次业务逻辑调用和N次调用的结果是一致的,我们称这种方法就是幂等.

一旦重复消费,一定要把消费的业务逻辑方法(orderAdd)设计成幂等的.

  • 幂等的方法
    • GET方法: 查询方法,天生幂等.
    • DELETE方法: 删除方法,天生幂等.
    • PUT方法: 修改,并不是天生幂等,需要设计
      • 减少库存:
        • update stock_tbl set stock=stock-#{stock} where id=#{id}(不是幂等)
        • select * from stock_log where order_id=#{orderId}(查询日志,判断是否已经见过库存了),没有数据 update stock_tbl set stock=stock-#{stock} where id=#{id},insert into stock_log (字段) values (订单id,商品减库存信息) (这样设计就幂等了,依然有问题)
    • POST方法: 新增,并不是天生幂等,需要设计
      • 新增订单: insert into order_tbl (order_id,order_item_id,count,user_id) values (各种属性);如果使用唯一属性校验,作用在order_id order_sn(编号).同一张订单,这个字段值是相同(幂等满足,没做幂等不满足)
  • 当前orderAdd方法设计幂等的解决思路(之一)
    • 使用订单id或者订单编号,**userId+商品id(**这个只满足当前我们的案例特点,不满足实际场景.)查询订单,如果已经存在了,库存不减少,订单不增了,购物车不用删除了
@Override
public void orderAdd(OrderAddDTO orderAddDTO) {//幂等设计思路: 利用userId和commodityCode 查询,如果已经存在了订单,方法直接执行结束//如果结果不存在,减库存,生单,删除购物车int count=orderMapper.selectExists(orderAddDTO);if (count>0){log.debug("订单已经新增了");return;}StockReduceCountDTO countDTO=new StockReduceCountDTO();countDTO.setCommodityCode(orderAddDTO.getCommodityCode());countDTO.setReduceCount(orderAddDTO.getCount());// 利用Dubbo调用stock模块减少库存的业务逻辑层方法实现功能stockService.reduceCommodityCount(countDTO);// 2.从购物车中删除用户选中的商品(调用Cart模块删除购物车中商品的方法)// 利用dubbo调用cart模块删除购物车中商品的方法实现功能Order order=new Order();BeanUtils.copyProperties(orderAddDTO,order);// 下面执行新增 假设insert是幂等的.orderMapper.insertOrder(order);log.info("新增订单信息为:{}",order);cartService.cartDelete(orderAddDTO);
}@Select("select count(id) from " +
"order_tbl where user_id=#{userId} and commodity_code=#{commodityCode}")
int selectExists(OrderAddDTO orderAddDTO);

分布式锁

当前分布式消费架构
在这里插入图片描述
即使,将方法设计成幂等,这个架构中,消息重复消费

,满足线程安全问题的所有因素

  • 并发/多线程
  • 写操作
  • 共享数据

只要解决其中一点,线程安全问题就消失了.

并发多线程–>串行

写操作–> 避免写(不能满足当前案例,必须写)

共享数据–>个体数据(不能满足,重复消费,重复订单是前提)

分布式线程安全问题的解决方案—分布式锁

错误思路: 引入synchronized同步锁,不能解决分布式场景下,多个进程的并发线程安全问题.

概念: 分布式场景下,多进程,多线程并发的抢锁机制. 抢到资源锁,执行业务逻辑,抢不到等待或者放弃执行.能够避免对同一个资源出现并发多线程操作的解决方案.

和synchronized的区别在于 synchronizeds本地锁.管理一个进程中的多线程,分布式锁是管理多个进程中的多线程.

分布式锁当前落地方案: redis setnx命令

Redis实现分布式锁

抢锁的设计思路

  • 目标: 多线程执行业务之前,先判断执行权限,抢锁,抢到锁的才能执行业务,抢不到的不执行.(当前案例中,抢锁,然后执行的业务逻辑是:orderAdd)
    在这里插入图片描述
    抢锁如何执行?: setnx key “”

  • key值如何设计?: 需要结合业务,设计key值(redis中最主要的功能,都关系到key值的设计),抢锁的逻辑中,满足是业务数据,满足重复消费的重复数据.就可以实现这个key值的设计. 消息Id是重复的.

  • 当前业务流程设计缺陷: 如果有一个消费者抢到锁了,执行了业务方法.执行完成后,没有释放锁的机制.如果引入等待重抢的机制,由于抢到锁的没有释放,会导致死锁.
    在这里插入图片描述
    释放锁的逻辑引入
    在这里插入图片描述
    上述整改的流程中避免了死锁问题,但是存在删除失败导致死锁的问题.
    在这里插入图片描述
    所以,要保证del释放没有成功,在redis也一定不会长期保存.
    在这里插入图片描述
    兜底的解决死锁问题.基本不会出现死锁了.
    在这里插入图片描述为了解决误删除的问题,抢锁的时候setnx key value值设计成一个随机数.
    在这里插入图片描述
    随机数两个消费,多个消费者生成相同的可能性极低.

分布式锁案例

@Component
@RocketMQMessageListener(topic = "business-order-topic",consumerGroup = "${rocketmq.consumer.group}",selectorExpression = "orderAdd")
@Slf4j
public class OrderAddConsumerListener implements RocketMQListener<MessageExt> {@Autowiredprivate IOrderService orderService;@Autowiredprivate StringRedisTemplate redisTemplate;@Overridepublic void onMessage(MessageExt msg) {//拿到底层消息对象的bodybyte[] body = msg.getBody();//尝试先解析成stringString orderJson=new String(body, StandardCharsets.UTF_8);System.out.println(orderJson);OrderAddDTO orderAddDTO=JSON.toJavaObject(JSON.parseObject(orderJson),OrderAddDTO.class);System.out.println(orderAddDTO);//1.生成锁的key值,生成当前这把锁的随机数//准备锁keyString msgKeyLock="msg:order:add:"+msg.getMsgId();//准备随机数 4 6 8位String randCode=new Random().nextInt(9000)+1000+"";ValueOperations<String, String> stringOps = redisTemplate.opsForValue();try{//补充消息消费的抢锁机制//2.抢锁 setnx msgKeyLock randCode expire 10sBoolean tryLockSuccess = stringOps.setIfAbsent(msgKeyLock, randCode, 10, TimeUnit.SECONDS);//3.判断 抢锁成功还是失败if(!tryLockSuccess){//3.2 失败了 可以等待5秒重新抢锁,也可以直接结束//尝试这里使用while编写等待5秒重新抢的逻辑log.info("有别人抢锁了,msgKey:{},value:{}",msgKeyLock,randCode);return;}//3.1 成功了 执行orderAddorderService.orderAdd(orderAddDTO);}catch (CoolSharkServiceException e){//业务异常,说明订单新增业务性失败,比如库存没了log.error("库存减少失败,库存触底了:{},异常信息:{}",orderAddDTO,e.getMessage());}finally {//释放锁 读以下锁的value值,等于当前生成value才释放String s = stringOps.get(msgKeyLock);if (s!=null && s.equals(randCode)){//del msgKeyLockredisTemplate.delete(msgKeyLock);}}}
}

直击面试

rocketmq什么时候重复消费

  • 在broker做扩展的时候,消息队列的消息,做扩展的时候,原本存储在原队列的消息,会进行rebalance重平衡.
  • 消费开始阶段
    在这里插入图片描述消费者consumer1 所在group1 绑定队列,push消费模式,使得消费者接受到了queue1 queue2的6条消息.消费过程,成功执行,即将返回确认.
    在这里插入图片描述
    总结:
  • 消费者并发消费的逻辑,同一组消费者绑定分布式队列,推送批量的消息
  • 在某个消费者还没有来得及消费,或者没来得及返回确认给rocketmq,队列发生了扩容缩容
  • rocketmq会对队列中所有的消息做rebalance重平衡(消息重新分配给不同队列),消费者绑定也充平衡
  • 导致已经推送的但是未返回确认的消息,被发送给不同消费者多次.

消息丢失的问题

rocketmq kafka rabbitmq activemq都是队列.只要谈到其中一个.

  1. 重复消费的问题(方法必须设计成幂等,一旦设计成幂等,可能造成线程安全隐患,所以引入分布式锁)
  2. 消息丢失如何处理.

面试题:消息丢失如何处理.

消息在哪里丢失

  • 发送没成功,没有解决不成功的业务逻辑
  • rocketmq保存的时候,断电,宕机,丢失消息(运行的时候,消息存储在内存)
  • 消费端丢失消息(没有成功处理消息,就直接返回success,并不是所有的消费逻辑都是先消费,再确认的,如果关注的是消费速度,不关注成功或者是否丢失,就可以这样处理)

发送端确保发送成功并且配合失败的业务处理

同步发送,接收发送结果,SEND_OK才结束.

客户端代码底层都有默认重试(retry 3 times).发送重试都失败了.

处理发送失败的逻辑.

  1. 发送到备用/失败的队列
  2. 记录日志,将消息来源,目标和消息内容,详细记录,等待监控系统,维护人员来直接处理

消费端确保消息不丢失

一定是先消息费,在确认,消费失败,返回失败(rocketmq消费点位保持原有位置不变,同一个消费者组,会重新拿到消息)

rocketmq 主从+同步刷盘

在这里插入图片描述
同步刷盘(消息数据可靠性保证): 如果持久化内存消息数据到磁盘失败,发送结果没有成功.

异步刷盘: 只要内存接收到了生产端的消息数据,数据是否持久化到磁盘,都会给生产端发送成功接收信息.

主从的双机热备: broker可以配置主从,考虑数据可靠性,和性能,一般主master做同步刷盘,slave做异步刷盘.(都同步刷盘,100%保证消息只要到达rocketmq就不会丢失,但是性能不能保证.)

相关文章:

RocketMQ重复消费的解决方案::分布式锁直击面试!

文章目录 场景分析方法的幂等分布式锁Redis实现分布式锁抢锁的设计思路 分布式锁案例 直击面试rocketmq什么时候重复消费消息丢失的问题消息在哪里丢失发送端确保发送成功并且配合失败的业务处理消费端确保消息不丢失rocketmq 主从同步刷盘 场景分析 分布式系统架构中,队列是分…...

如何降低TCP在局域网环境下的数据传输延迟

以Ping为例。本案例是一个测试题目&#xff0c;只有现象展示&#xff0c;不含解决方案。 ROS_Kinetic_26 使用rosserial_windows实现windows与ROS master发送与接收消息_windows 接收ros1 消息 什么是ping&#xff1f; AI&#xff1a; ping是互联网控制消息协议&#xff08;…...

【LeetCode】78.子集

题目 给你一个整数数组 nums &#xff0c;数组中的元素 互不相同 。返回该数组所有可能的子集&#xff08;幂集&#xff09;。 解集 不能 包含重复的子集。你可以按 任意顺序 返回解集。 示例 1&#xff1a; 输入&#xff1a;nums [1,2,3] 输出&#xff1a;[[],[1],[2],[1…...

认可功能介绍 - 技术声誉靠认可

需求 大家在学习和工作中&#xff0c; 经常碰到一些热心帮助自己的人&#xff0c; 我们怎么向他们表示感谢呢&#xff1f; 各位博主在 CSDN 也做了很多贡献&#xff0c;也有不少用户在做各种各样的社区活动&#xff0c;这些活动给我们的领军人物什么回馈呢&#xff1f; 这些…...

EtherNet/IP转CAN网关can协议标准

生产管理设备中&#xff0c;会有设备与其他设备的协议不同&#xff0c;数据无法互通&#xff0c;让你的工作陷入困境。这时&#xff0c;一款神奇的产品出现了——远创智控YC-EIP-CAN通讯网关&#xff01; 1, 这款通讯网关采用ETHERNET/IP从站功能&#xff0c;可以将各种CAN总线…...

解决代理IP负载均衡与性能优化的双重挑战

在当今数字化时代&#xff0c;代理IP的应用范围日益广泛&#xff0c;它不仅在数据爬取、网络抓取等领域发挥着重要作用&#xff0c;也成为网络安全和隐私保护的有力工具。然而&#xff0c;面对庞大的数据流量和复杂的网络环境&#xff0c;如何实现代理IP的负载均衡和性能优化成…...

深度探索 Elasticsearch 8.X:function_score 参数解读与实战案例分析

在 Elasticsearch 中&#xff0c;function_score 可以让我们在查询的同时对搜索结果进行自定义评分。 function_score 提供了一系列的参数和函数让我们可以根据需求灵活地进行设置。 近期有同学反馈&#xff0c;function_score 的相关参数不好理解&#xff0c;本文将深入探讨 f…...

测牛学堂:软件测试之andorid app性能测试面试知识点总结(二)

APP性能测试指标之FPS 如果经常玩游戏的同学应该听过FPS。 FPS本来是图像领域中的概念&#xff0c;是指画面每秒传输的帧数。每秒钟帧数越多&#xff0c;所显示的动作就会越流畅。 但是因为功耗的限制&#xff0c;一般60fps就是跑满的效果了。 我们测试的话&#xff0c;一般…...

尚医通06:数据字典+EasyExcel+mongodb

内容介绍 1、数据字典列表前端 2、EasyExcel介绍、实例 3、数据字典导出接口、前端 4、数据字典导入接口、前端 5、数据字典添加redis缓存 6、MongoDB简介 7、MongoDB安装 8、MongoDB基本概念 数据字典列表前端 1、测试问题 &#xff08;1&#xff09;报错日志 &am…...

【前端知识】React 基础巩固(三十二)——Redux的三大原则、使用流程及实践

React 基础巩固(三十二)——Redux的三大原则 一、Redux的三大原则 单一数据源 整个应用程序的state被存储在一颗object tree 中&#xff0c;并且这个object tree 只存储在一个store中&#xff1b;Redux并没有强制让我们不能创建多个Store&#xff0c;但是那样做不利于数据维护…...

[NLP]使用Alpaca-Lora基于llama模型进行微调教程

Stanford Alpaca 是在 LLaMA 整个模型上微调&#xff0c;即对预训练模型中的所有参数都进行微调&#xff08;full fine-tuning&#xff09;。但该方法对于硬件成本要求仍然偏高且训练低效。 [NLP]理解大型语言模型高效微调(PEFT) 因此&#xff0c; Alpaca-Lora 则是利用 Lora…...

Linux Shell 脚本编程学习之【第5章 文件的排序、合并与分割 (第四部分之cut命令) 】

第5章 文件的排序、合并与分割 &#xff08;第四部分之cut命令&#xff09; 4 cut 命令4.1 选项及其意义4.2 输出字符 &#xff08;-c&#xff09;4.3 改变分隔符&#xff08;-d&#xff09;和提取特定域&#xff08;-f&#xff09; 5 paste 命令5.1 paste 命令选项及其意义5.2…...

php-golang-rpc jsonrpc和php客户端tivoka/tivoka包实践

golang 代码&#xff1a; package main import ( "fmt" "net" "net/rpc" "net/rpc/jsonrpc" ) type App struct{} type Res struct { Code int json:"code" Msg string json:"msg" Data any json:"…...

flutter 打包iOS安装包

flutter iOS Xcode打包并导出ipa文件安装包 1、 Xcode配置 1、 启动打包 1、 等待打包 1、 打包完成、准备导出ipa 1、 选择模式 1、 选择配置文件 1、 导出 1、 选择导出位置 1、 得到ipa...

二进制重排

二进制重排作用 二进制重排的主要目的是将连续调用的函数连接到相邻的虚拟内存地址&#xff0c;这样在启动时可以减少缺页中断的发生&#xff0c;提升启动速度。目前网络上关于ios应用启动优化&#xff0c;通过XCode实现的版本比较多。MacOS上的应用也是通过clang进行编译的&am…...

【Linux后端服务器开发】MAC地址与其他重要协议

目录 一、以太网 二、MAC地址 三、MTU 四、ARP协议 五、DNS系统 六、ICMP协议 七、NAT技术 八、代理服务器 一、以太网 “以太网”不是一种具体的网路&#xff0c;而是一种技术标准&#xff1a;既包含了数据链路层的内容&#xff0c;也包含了一些物理层的内容&#xf…...

WebGPU入门

1. 引言 前序博客&#xff1a; CUDA入门WebGPUZKP&#xff1a;客户端证明 WebGPU——Draft 2023.7.17 由苹果、谷歌、Mozilla团队发起&#xff0c;当前处于草稿阶段&#xff0c;旨在成为W3C推荐标准。 WebGPU为 在图形处理单元&#xff08;GPU&#xff09;上执行诸如渲染和…...

React Dva项目中.roadhogrc.mock.js直接自动导入mock目录下所有文件方式

上文 React Dva项目中模仿网络请求数据方法 中&#xff0c;我们书写了Dva项目模拟后端数据的方式 但是 我们.roadhogrc.mock.js中的这个处理其实并不好用 我们还需要一个一个的引入 我们可以直接靠一段代码 import fs from fs; import path from path; const mock {} fs.re…...

跨境独立站如何应对恶意网络爬虫?

目录 跨境出海独立站纷纷成立 爬虫威胁跨境电商生存 如何有效识别爬虫&#xff1f; 技术反爬方案 防爬虫才能保发展 中国出海跨境电商业务&#xff0c;主要选择大平台开设店铺&#xff0c;例如&#xff0c;亚马逊、eBay、Walmart、AliExpress、Zalando等。随着业务的扩大&…...

C# SourceGenerator 源生成器初探

简介 注意&#xff1a; 坑极多。而且截至2023年&#xff0c;这个东西仅仅是半成品 利用SourceGenerator可以在编译结束前生成一些代码参与编译&#xff0c;比如编译时反射之类的&#xff0c;还有模板代码生成都很好用。 演示仓库传送门-Github-yueh0607 使用 1. 创建项目 …...

如何高效配置Kodi PVR IPTV Simple:专业级家庭IPTV直播系统部署指南

如何高效配置Kodi PVR IPTV Simple&#xff1a;专业级家庭IPTV直播系统部署指南 【免费下载链接】pvr.iptvsimple IPTV Simple client for Kodi PVR 项目地址: https://gitcode.com/gh_mirrors/pv/pvr.iptvsimple Kodi PVR IPTV Simple是一款功能强大的开源IPTV客户端插…...

别再踩坑了!Django Ckeditor配置全指南:从基础使用到高级定制(2023最新版)

Django Ckeditor实战手册&#xff1a;2023年高效配置与深度定制技巧 如果你正在为Django项目寻找一个功能强大且可定制的富文本编辑器&#xff0c;Ckeditor无疑是最佳选择之一。但配置过程中那些令人头疼的兼容性问题、图片上传失败、工具栏自定义困难&#xff0c;确实让不少开…...

从NDVI到地表温度:用ENVI Band Math一次性搞定植被与热环境分析

ENVI波段运算实战&#xff1a;NDVI与地表温度的高效批量处理技巧 遥感影像分析中&#xff0c;植被指数和地表温度是最基础却又最关键的指标。传统操作流程往往需要反复切换不同工具模块&#xff0c;既耗时又容易出错。而ENVI的Band Math功能就像一把瑞士军刀&#xff0c;能将这…...

UDS诊断服务-10例程控制服务(0x31)实战:从协议解析到车辆传感器校准

1. 从车辆抖动问题认识0x31服务的重要性 去年夏天&#xff0c;我遇到一辆行驶里程8万公里的SUV&#xff0c;车主反映急加速时发动机抖动明显。用诊断仪读取故障码显示"P0172 - 燃油修正系统过浓"&#xff0c;但更换氧传感器和火花塞后问题依旧。这时候就需要请出我们…...

DCT-Net人像卡通化效果展示:高清人脸转二次元虚拟形象作品集

DCT-Net人像卡通化效果展示&#xff1a;高清人脸转二次元虚拟形象作品集 一键将真人照片变成二次元虚拟形象&#xff0c;体验AI绘画的神奇魅力 1. 效果惊艳&#xff1a;从真人到二次元的华丽变身 DCT-Net人像卡通化技术能够将普通的人物照片转换成精美的二次元虚拟形象&#x…...

Umi-OCR技术解密:离线文字识别的3大创新与全行业实践指南

Umi-OCR技术解密&#xff1a;离线文字识别的3大创新与全行业实践指南 【免费下载链接】Umi-OCR Umi-OCR: 这是一个免费、开源、可批量处理的离线OCR软件&#xff0c;适用于Windows系统&#xff0c;支持截图OCR、批量OCR、二维码识别等功能。 项目地址: https://gitcode.com/G…...

C++并发编程实战:std::atomic的exchange与compare_exchange操作到底怎么选?

C并发编程实战&#xff1a;std::atomic的exchange与compare_exchange操作到底怎么选&#xff1f; 在构建高性能并发系统时&#xff0c;开发者常面临一个关键抉择&#xff1a;当需要原子更新共享数据时&#xff0c;究竟该选择exchange、compare_exchange_weak还是compare_exchan…...

为什么纯向量 RAG 难以支撑长记忆?Graph RAG 的架构优势解析

前几天在调试一个企业级 Agent 时&#xff0c;遇到一个经典崩溃点&#xff1a;当用户问起“去年 10 月项目 A 失败的根本原因是什么”时&#xff0c;纯向量搜索&#xff08;Vector Search&#xff09;直接输出了几个毫不相关的会议纪要片段。 这是企业知识库问答中最常见的一类…...

嵌入式开发中回调函数的解耦实践与高级应用

1. 回调函数在嵌入式开发中的解耦实践在嵌入式系统开发中&#xff0c;模块间的耦合度直接影响着代码的可维护性和可扩展性。最近我在重构一个智能家居项目时&#xff0c;就遇到了模块间强耦合导致修改困难的问题。通过引入回调函数机制&#xff0c;成功将原本紧密交织的代码逻辑…...

SAP物料账核心:手把手配置OBYC中的GBB与PRD科目(含OMSK评估类关联详解)

SAP物料账核心&#xff1a;手把手配置OBYC中的GBB与PRD科目&#xff08;含OMSK评估类关联详解&#xff09; 在SAP系统中&#xff0c;物料账管理是连接物流与财务的关键桥梁。对于财务人员而言&#xff0c;理解物料移动如何触发财务过账&#xff0c;以及如何通过后台配置实现精准…...