当前位置: 首页 > news >正文

RabbitMQ深度探索:消息幂等性问题

  1. RabbitMQ 消息自动重试机制:
    1. 让我们消费者处理我们业务代码的时候,如果抛出异常的情况下,在这时候 MQ 会自动触发重试机制,默认的情况下 RabbitMQ 时无限次数的重试
    2. 需要认为指定重试次数限制问题
  2. 在什么情况下消费者实现重试策略:
    1. 消费者调用第三方接口,但是调用第三方接口失败后,需要实现重试策略,网络延迟只是暂时调不通,重试多次有可能会调通
    2. 消费者获取代码后,因为代码问题抛出数据异常,此时不需要实现重试策略
      1. 我们需要将日志存放起来,后期通过定时任务或者人工补偿形式
      2. 如果是重试多次还是失败消息,需要重新发布消费者版本实现消费
      3. 可以使用死信队列
    3. MQ 在重试的过程中,可能会引发消费者重复消费的问题
    4. MQ 消费者需要解决幂等性问题
      1. 幂等性:保证数据唯一
  3. 解决幂等性问题:
    1. 生产者在投递消息的时候,生成一个唯一 id 放在我们消息中
    2. 消费者获取到该消息,可以根据全局唯一 id 实现去重
    3. 全局唯一 id 根据业务来定的,订单号码作为全局的 id 
    4. 实际上还是需要在 DB 层面解决数据防重复
    5. 业务逻辑是在做 insert 操作使用唯一主键约束
    6. 业务逻辑是在做 update 操作,使用乐观锁
      1. 当消费者业务逻辑代码中抛出异常自动实现重试(默认是无数次重试)
      2. 应该对 RabbitMQ 重试次数实现限制,比如最多重试 5 次,每次间隔 30 秒
      3. 重试多次还是失败的情况下,存放到死信队列或者存放到数据库表中记录后期人工补偿
  4. 如何选择消息重试:
    1. 消费者获取消息后,调用第三方接口,但是调用第三方接口失败后是否要重试?
    2. 消费者获取消息后,如果代码问题抛出数据异常,是否需要重试?
    3. 总结:
      1. 如果消费者处理消息时,因为代码原因抛出异常是需要重新发布版本才能解决,就不要重试
      2. 存放到死信队列或者是数据库记录、后期人工实现补偿
  5. 实现:
    1. yml 文件:
      spring:rabbitmq:####连接地址host: 127.0.0.1####端口号port: 5672####账号username: guest####密码password: guest### 地址virtual-host: boyatopVirtualHostlistener:simple:retry:#开启消费者进行重试(程序异常的情况)enabled: true#最大重试次数max-attempts: 5#重试间隔时间initial-interval: 3000#手动确认机制acknowledge-mode: manualdatasource:url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8username: rootpassword: rootdriver-class-name: com.mysql.jdbc.Driverboyatop:#备胎交换机dlx:exchange: boyatop_dlx_exchangequeue: boyatop_dlx_queueroutingKey: dlx#普通交换机order:exchange: boyatop_order_exchangequeue: boyatop_order_queueroutingKey: order
    2. 配置类:
      @Component
      public class IdempotentExchangeConfig {//交换机@Value("${boyatop.order.exchange}")private  String order_exchange;//普通队列@Value("${boyatop.order.queue}")private String order_queue;//普通队列的 key@Value("${boyatop.order.routingKey}")private String order_rotingKey;//死信交换机@Value("${boyatop.dlx.exchange}")private String dlx_exchange;//死信队列@Value("${boyatop.dlx.queue}")private String dlx_queue;//死信队列的 key@Value("${boyatop.dlx.routingKey}")private String dlx_routingKey;//定义死信交换机@Beanpublic DirectExchange dlxExchange(){return new DirectExchange(dlx_exchange);}//定义死信队列@Beanpublic Queue dlxQueue(){return new Queue(dlx_queue);}//定义普通交换机@Beanpublic DirectExchange orderExchange(){return new DirectExchange(order_exchange);}//定义普通队列@Beanpublic Queue orderQueue(){//订单队列绑定死信交换机Map<String,Object> arguments = new HashMap<>(2);arguments.put("x-dead-letter-exchange",dlx_exchange);arguments.put("x-dead-letter-routing-key",dlx_routingKey);return new Queue(order_queue,true,false,false,arguments);
      //        return QueueBuilder.durable(order_queue).withArguments(arguments).build();}//订单队列绑定交换机@Beanpublic Binding bindingOrderExchange(DirectExchange orderExchange, Queue orderQueue){return BindingBuilder.bind(orderQueue).to(orderExchange).with(order_rotingKey);}//死信队列绑定交换机@Beanpublic Binding bindingDlxExchange(DirectExchange dlxExchange, Queue dlxQueue){return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(dlx_routingKey);}}
    3. 实体类:
      @Data
      @NoArgsConstructor
      public class OrderEntity implements Serializable {private Integer id;private String orderName;private String orderId;public OrderEntity(String orderName, String orderId) {this.orderName = orderName;this.orderId = orderId;}
      }
    4. Mapper:
      public interface OrderMapper {@Insert("INSERT into order_entity value (null,#{orderName},#{orderId})")int addOrder(OrderEntity orderEntity);@Select("select * from order_entity where order_id = #{orderId} ")OrderEntity getOrder(String orderId);
      }
    5. 生产者:
      @Component
      @Slf4j
      public class OrderProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@Value("${boyatop.order.exchange}")private  String order_exchange;//普通队列的 key@Value("${boyatop.order.routingKey}")private String order_rotingKey;public void sendMsg(String orderName,String orderId){OrderEntity orderEntity = new OrderEntity(orderName,orderId);rabbitTemplate.convertAndSend(order_exchange,order_rotingKey,orderEntity,message -> {message.getMessageProperties().setExpiration("5000");return message;});}
      }
    6. 消费者:
      @Component
      @Slf4j
      @RabbitListener(queues = "boyatop_order_queue")
      public class OrderConsumer {@Autowiredprivate OrderMapper orderMapper;@RabbitHandlerpublic void process(OrderEntity orderEntity, Message message, Channel channel){try{String orderId = orderEntity.getOrderId();if(StringUtils.isEmpty(orderId)){return;}OrderEntity dbOrderEntity = orderMapper.getOrder(orderId);if(dbOrderEntity != null){//出现异常,消息拒收,进入死信队列人为处理channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);}int result = orderMapper.addOrder(orderEntity);//出现异常int i = 1 / 0;channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);System.out.println("监听内容:" + orderEntity);}catch (Exception e){// 记录该消息日志形式  存放数据库db中、后期通过定时任务实现消息补偿、人工实现补偿//将该消息存放到死信队列中,单独写一个死信消费者实现消费。}}
      }

相关文章:

RabbitMQ深度探索:消息幂等性问题

RabbitMQ 消息自动重试机制&#xff1a; 让我们消费者处理我们业务代码的时候&#xff0c;如果抛出异常的情况下&#xff0c;在这时候 MQ 会自动触发重试机制&#xff0c;默认的情况下 RabbitMQ 时无限次数的重试需要认为指定重试次数限制问题 在什么情况下消费者实现重试策略…...

Linux网络 | 进入数据链路层,学习相关协议与概念

前言&#xff1a;本节内容进入博主讲解的网络层级中的最后一层&#xff1a;数据链路层。 首先博主还是会线代友友们认识一下数据链路层的报文。 然后会带大家重新理解一些概念&#xff0c;比如局域网交换机等等。然后就是ARP协议。 讲完这些&#xff0c; 本节任务就算结束。 那…...

芝法酱学习笔记(2.6)——flink-cdc监听mysql binlog并同步数据至elastic-search和更新redis缓存

一、需求背景 在有的项目中&#xff0c;尤其是进销存类的saas软件&#xff0c;一开始为了快速把产品做出来&#xff0c;并没有考虑缓存问题。而这类软件&#xff0c;有着复杂的业务逻辑。如果想在原先的代码中&#xff0c;添加redis缓存&#xff0c;改动面将非常大&#xff0c…...

JavaScript系列(58)--性能监控系统详解

JavaScript性能监控系统详解 &#x1f4ca; 今天&#xff0c;让我们深入探讨JavaScript的性能监控系统。性能监控对于保证应用的稳定性和用户体验至关重要。 性能监控基础概念 &#x1f31f; &#x1f4a1; 小知识&#xff1a;JavaScript性能监控是指通过收集和分析各种性能指…...

GESP2023年12月认证C++六级( 第三部分编程题(1)闯关游戏)

参考程序代码&#xff1a; #include <cstdio> #include <cstdlib> #include <cstring> #include <algorithm> #include <string> #include <map> #include <iostream> #include <cmath> using namespace std;const int N 10…...

git 新项目

新项目git 新建的项目如何进行git 配置git git config --global user.name "cc" git config --global user.email ccexample.com配置远程仓库路径 // 添加 git remote add origin http://gogs/cc/mc.git //如果配错了&#xff0c;删除 git remote remove origin初…...

系统URL整合系列视频一(需求方案)

视频 系统URL整合系列视频一&#xff08;需求方案&#xff09; 视频介绍 &#xff08;全国&#xff09;某大型分布式系统Web资源URL整合需求实现方案讲解。当今社会各行各业对软件系统的web资源访问权限控制越来越严格&#xff0c;控制粒度也越来越细。安全级别提高的同时也增…...

Vue.js 使用组件库构建 UI

Vue.js 使用组件库构建 UI 在 Vue.js 项目中&#xff0c;构建漂亮又高效的用户界面&#xff08;UI&#xff09;是很重要的一环。组件库就是你开发 UI 的好帮手&#xff0c;它可以大大提高开发效率&#xff0c;减少重复工作&#xff0c;还能让你的项目更具一致性和专业感。今天…...

计算图 Compute Graph 和自动求导 Autograd | PyTorch 深度学习实战

前一篇文章&#xff0c;Tensor 基本操作5 device 管理&#xff0c;使用 GPU 设备 | PyTorch 深度学习实战 本系列文章 GitHub Repo: https://github.com/hailiang-wang/pytorch-get-started PyTorch 计算图和 Autograd 微积分之于机器学习Computational Graphs 计算图Autograd…...

51单片机入门_05_LED闪烁(常用的延时方法:软件延时、定时器延时;while循环;unsigned char 可以表示的数字是0~255)

本篇介绍编程实现LED灯闪烁&#xff0c;需要学到一些新的C语言知识。由于单片机执行的速度是非常快的&#xff0c;如果不进行延时的话&#xff0c;人眼是无法识别(停留时间要大于20ms)出LED灯是否在闪烁所以需要学习如何实现软件延时。另外IO口与一个字节位的数据对应关系。 文…...

如何获取sql数据中时间的月份、年份(类型为date)

可用自带的函数month来实现 如&#xff1a; 创建表及插入数据&#xff1a; create table test (id int,begindate datetime) insert into test values (1,2015-01-01) insert into test values (2,2015-02-01) 执行sql语句,获取月份&#xff1a; select MONTH(begindate)…...

【单层神经网络】softmax回归的从零开始实现(图像分类)

softmax回归 该回归分析为后续的多层感知机做铺垫 基本概念 softmax回归用于离散模型预测&#xff08;分类问题&#xff0c;含标签&#xff09; softmax运算本质上是对网络的多个输出进行了归一化&#xff0c;使结果有一个统一的判断标准&#xff0c;不必纠结为什么要这么算…...

使用开源项目:pdf2docx,让PDF转换为Word

目录 1.安装python 2.安装 pdf2docx 3.使用 pdf2docx 转换 PDF 到 Word pdf2docx&#xff1a;GitCode - 全球开发者的开源社区,开源代码托管平台 环境&#xff1a;windows电脑 1.安装python Download Python | Python.org 最好下载3.8以上的版本 安装时记得选择上&#…...

保姆级教程Docker部署KRaft模式的Kafka官方镜像

目录 一、安装Docker及可视化工具 二、单节点部署 1、创建挂载目录 2、运行Kafka容器 3、Compose运行Kafka容器 4、查看Kafka运行状态 三、集群部署 四、部署可视化工具 1、创建挂载目录 2、运行Kafka-ui容器 3、Compose运行Kafka-ui容器 4、查看Kafka-ui运行状态 …...

ChatGPT提问技巧:行业热门应用提示词案例--咨询法律知识

ChatGPT除了可以协助办公&#xff0c;写作文案和生成短视频脚本外&#xff0c;和还可以做为一个法律工具&#xff0c;当用户面临一些法律知识盲点时&#xff0c;可以向ChatGPT咨询获得解答。赋予ChatGPT专家的身份&#xff0c;用户能够得到较为满意的解答。 1.咨询法律知识 举…...

openRv1126 AI算法部署实战之——Tensorflow模型部署实战

在RV1126开发板上部署Tensorflow算法&#xff0c;实时目标检测RTSP传输。视频演示地址 rv1126 yolov5 实时目标检测 rtsp传输_哔哩哔哩_bilibili ​ 一、准备工作 从官网下载tensorflow模型和数据集 手动在线下载&#xff1a; https://github.com/tensorflow/models/b…...

STM32 TIM定时器配置

TIM简介 TIM&#xff08;Timer&#xff09;定时器 定时器可以对输入的时钟进行计数&#xff0c;并在计数值达到设定值时触发中断 16位计数器、预分频器、自动重装寄存器的时基单元&#xff0c;在72MHz计数时钟下可以实现最大59.65s的定时 不仅具备基本的定时中断功能&#xff…...

51单片机 05 矩阵键盘

嘻嘻&#xff0c;LCD在RC板子上可以勉强装上&#xff0c;会有一点歪。 一、矩阵键盘 在键盘中按键数量较多时&#xff0c;为了减少I/O口的占用&#xff0c;通常将按键排列成矩阵形式&#xff1b;采用逐行或逐列的“扫描”&#xff0c;就可以读出任何位置按键的状态。&#xf…...

SSRF 漏洞利用 Redis 实战全解析:原理、攻击与防范

目录 前言 SSRF 漏洞深度剖析 Redis&#xff1a;强大的内存数据库 Redis 产生漏洞的原因 SSRF 漏洞利用 Redis 实战步骤 准备环境 下载安装 Redis 配置漏洞环境 启动 Redis 攻击机远程连接 Redis 利用 Redis 写 Webshell 防范措施 前言 在网络安全领域&#xff0…...

kubernetes学习-配置管理(九)

一、ConfigMap &#xff08;1&#xff09;通过指定目录&#xff0c;创建configmap # 创建一个config目录 [rootk8s-master k8s]# mkdir config[rootk8s-master k8s]# cd config/ [rootk8s-master config]# mkdir test [rootk8s-master config]# cd test [rootk8s-master test…...

松江少儿英语口碑好的?

松江少儿英语口碑好的 环球乐学少儿英语&#xff0c;指出幼儿英语学习三大痛点&#xff1a; 1. 兴趣不足易抵触&#xff1a;教学形式枯燥&#xff0c;多以机械记单词、跟读为主&#xff0c;不符合幼儿认知特点&#xff0c;易产生厌学情绪。 2. 缺语境不会运用&#xff1a…...

高并发场景下的FUTURE POLICE服务架构设计

高并发场景下的FUTURE POLICE服务架构设计 最近和几个做智能语音项目的朋友聊天&#xff0c;大家普遍遇到一个头疼的问题&#xff1a;模型效果不错&#xff0c;但用户一多&#xff0c;服务就卡顿甚至崩溃。特别是像FUTURE POLICE这类语音合成模型&#xff0c;生成一段高质量的…...

Cursor滑跪开源技术报告:Kimi基模这样微调能干翻Claude

Cursor滑跪开源技术报告&#xff1a;Kimi基模这样微调能干翻Claude 导读&#xff1a;当"套壳"成为一门技术活&#xff0c;Cursor用一份技术报告告诉我们&#xff1a;基于中国开源模型Kimi K2.5&#xff0c;通过持续预训练异步强化学习&#xff0c;完全可以在代码Agen…...

FlyByWire A32NX与A380X实战指南:5个提升飞行模拟体验的关键技巧

FlyByWire A32NX与A380X实战指南&#xff1a;5个提升飞行模拟体验的关键技巧 【免费下载链接】aircraft The A32NX & A380X Project are community driven open source projects to create free Airbus aircraft in Microsoft Flight Simulator that is as close to realit…...

Qwen2.5-72B-Instruct实战:vLLM + FastAPI 构建标准化OpenAI兼容接口

Qwen2.5-72B-Instruct实战&#xff1a;vLLM FastAPI 构建标准化OpenAI兼容接口 1. 模型介绍 Qwen2.5-72B-Instruct-GPTQ-Int4是Qwen大语言模型系列的最新版本&#xff0c;代表了当前开源大模型领域的顶尖水平。这个72B参数的指令调优模型经过GPTQ 4-bit量化处理&#xff0c;…...

开源阅读鸿蒙版:打造完全自定义的鸿蒙电子书阅读器终极指南

开源阅读鸿蒙版&#xff1a;打造完全自定义的鸿蒙电子书阅读器终极指南 【免费下载链接】legado-Harmony 开源阅读鸿蒙版仓库 项目地址: https://gitcode.com/gh_mirrors/le/legado-Harmony 开源阅读鸿蒙版是一款专为HarmonyOS系统设计的免费开源电子书阅读器应用&#…...

模拟面试回答第十三问:JVM内存模型

JVM简介 JVM是Java程序运行的基石&#xff0c;包括程序计数器&#xff0c;两种栈&#xff0c;堆和方法区五个区域。包含保存类元数据&#xff0c;保存方法字节码执行顺序&#xff0c;保存符号引用与直接地址的映射&#xff0c;为对象实例分配内存&#xff0c;为堆中内存分配对象…...

Phi-3-mini-128k-instruct创意写作效果集锦:技术博客、邮件、周报一键生成

Phi-3-mini-128k-instruct创意写作效果集锦&#xff1a;技术博客、邮件、周报一键生成 每次打开文档&#xff0c;面对空白的页面&#xff0c;你是不是也有过那种“万事开头难”的感觉&#xff1f;特别是写技术博客、整理会议邮件、或者汇总项目周报的时候&#xff0c;明明脑子…...

Aspose.Words避坑指南:Java实现Word转PDF时如何去除水印(2023最新版)

Aspose.Words商业应用实战&#xff1a;Java版Word转PDF无水印解决方案深度解析 在企业级文档处理系统中&#xff0c;Word到PDF的转换需求几乎无处不在——合同归档、报告生成、电子发票导出等场景都依赖这一基础功能。作为Java开发者&#xff0c;当我们选择Aspose.Words这一业界…...

MGeo中文地址结构化教程:从原始文本到标准GeoJSON格式输出的完整转换流程

MGeo中文地址结构化教程&#xff1a;从原始文本到标准GeoJSON格式输出的完整转换流程 1. 引言&#xff1a;为什么我们需要地址结构化&#xff1f; 你有没有遇到过这样的场景&#xff1f;用户填写的收货地址五花八门&#xff1a;“北京市海淀区中关村大街27号”、“北京海淀中…...