RabbitMQ使用StringRedisTemplate-防止重复消费
造成重复消费的原因:
MQ向消费者推送message,消费者向MQ返回ack,告知所推送的消息消费成功。但是由于网络波动等原因,可能造成消费者向MQ返回的ack丢失。MQ长时间(一分钟)收不到ack,于是会向消费者再次推送该条message,这样就造成了重复消费。
解决重复消费的办法:
用存储(redis或者mysql)记录一下已经消费的message的id,当message被消费前先去存储中查一下消费记录,没有该条message的id则正常消费返回ack,有该条message的id的话不用消费直接返回ack给MQ。
当然实际生产中的话选用redis是比较好的选择,毕竟查mysql要进行磁盘IO,效率要低得多,而且绝大多数重复消费都是由于MQ没有收到消费者的ack于是造成MQ再次向消费者进行同一条message的投递。所以message的消费记录其实我们并不需要一直记录,只需要保存一段时间,当下次投递过来的时候消费者能查到消费记录然后准确返回ack给MQ就行。
yml
#配置rabbitMq 服务器rabbitmq:host: xxxx#rabbitmq相关配置 15672是Web管理界面的端口;5672是MQ访问的端口port: xxxxusername: xxxxpassword: xxxx#虚拟host 可以不设置,使用server默认hostvirtual-host: xxxxconnection-timeout: 0#确认消息已发送到队列(Queue)publisher-returns: true #确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated# 设置消费端手动 acklistener:simple:retry:# 开启消费者(程序出现异常的情况下)进行重试enabled: true#重试间隔时间max-interval: 1000# 最大重试次数max-attempts: 3#开启手动确认消息acknowledge-mode: manual
监听类
package com.rabbitmqprovider.service;import com.rabbitmq.client.Channel; import com.rabbitmqprovider.commons.CommonUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service;import java.io.IOException;/**** 防止重复消费*/ @Slf4j @Service public class TestBasicService {@Autowiredprivate StringRedisTemplate redisTemplate;/*** RabbitListener 可以写在类、方法上* @param channel* @param message* @throws IOException*///@RabbitListener(queues = {CommonUtils.queueStr})@RabbitHandlerpublic void getMessage(Channel channel, Message message) throws IOException {try{String messageId= message.getMessageProperties().getMessageId();String msg = new String(message.getBody(),"UTF-8");//判断messageId在redis中是否存在boolean flage=stringRedisTemplate(messageId,msg);if(!flage){log.error("消息已重复处理失败,拒绝再次接收...");channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息}else{//如果要防止 重复消费,则需要将 id值存在 redis,每次 都要去redis中拿id比对,是否存在,存在则消费过->messageIdchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);log.info("接收到的消息{}->"+redisTemplate.opsForValue().get(messageId));}}catch (Exception e){if (message.getMessageProperties().getRedelivered()) {log.error("消息已重复处理失败,拒绝再次接收...");channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息} else {log.error("消息即将再次返回队列处理...");channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}}/*** 判断Key是否存在* @param messageId 唯一表示key* @param msg value值* @return*/private boolean stringRedisTemplate(String messageId,String msg){log.info("messageId="+messageId);//判断Key是否存在 有则返回true,没有则返回falseif(redisTemplate.hasKey(messageId)){return false;}else{redisTemplate.opsForValue().setIfAbsent(messageId, msg);}return true;} }
------------------------------------------controller--------------------------------------------------
/*** 解决重复消费问题*/ @GetMapping("/sendMessageTestOnly") public void sendMessageTestOnly(){JSONObject jsonObject = new JSONObject();jsonObject.put("message","世界很大!");jsonObject.put("msg","你想去看看么?");String json = jsonObject.toJSONString();String messageId=UUID.randomUUID()+"";Message message = MessageBuilder.withBody(json.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(messageId).build();rabbitTemplate.convertAndSend(CommonUtils.dirExchange,CommonUtils.routingKey,message,new CorrelationData(UUID.randomUUID().toString())); }
---------------------------------回调------------------------------------------------------
package com.rabbitmqprovider.callback;import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;/*** 队列防止消息丢失*/ @Slf4j @Component public class QueueCallback implements RabbitTemplate.ReturnCallback{@Overridepublic void returnedMessage(Message message,int replyCode, String replyText, String exchange, String routingKey) {log.info("消息 {} 经交换机 {} 通过routingKey={} 路由到队列失败,失败code为:{}, 失败原因为:{}",new String(message.getBody()), exchange, routingKey, replyCode, replyText);} }
package com.rabbitmqprovider.callback;import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils;/*** 当消息由生产者发到交换机后会回调该接口中的confirm方法*/ @Component @Slf4j public class ExchangeCallback implements RabbitTemplate.ConfirmCallback{/* correlationData 内含消息内容* ack 交换机接受成功或者失败。 true表示交换机接受消息成功, false表示交换机接受失败* cause 表示失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack){log.info("交换机收到消息 消息内容为{}->", correlationData);}else {log.info("交换机未收到消息消息内容为{}, 原因为{}->", correlationData, cause);}}}
-----------------------------------------------------------------------------------------------------------------
执行顺序时:先发送消息;然后在接收消息,并判断消息是否重复,如果不重复 则回复消息,否则 拒绝回复;最后回调。
相关文章:
RabbitMQ使用StringRedisTemplate-防止重复消费
造成重复消费的原因: MQ向消费者推送message,消费者向MQ返回ack,告知所推送的消息消费成功。但是由于网络波动等原因,可能造成消费者向MQ返回的ack丢失。MQ长时间(一分钟)收不到ack,于是会向消…...
临沂大学张继群寄语
目录 寄语 1、不能有不良睹好 2、坚毅的个性和勤奋的品质 3、会存钱...
线程学习笔记
1:Thread 线程的生命周期控制 2:Runnable 可执行的任务和程序 3:Callable 执行程序后返回结果 4:Future 收集程序返回结果 5:Executor 线程池 6:ForkJoin 默认线程池 每个线程有工作队列 工作窃取 7:RunnableFuture FutureTask 实现 Runnable 和 Future 执…...

代码随想录算法训练营第四十二天|01背包问题,你该了解这些!、01背包问题,你该了解这些! 滚动数组 、416. 分割等和子集
文章目录 01背包问题,你该了解这些!01背包问题,你该了解这些! 滚动数组416. 分割等和子集 01背包问题,你该了解这些! 题目链接:代码随想录 二维数组解决0-1背包问题 解题思路: 1.dp…...
结构体指针、数组指针和结构体数组指针
结构体指针 首先让我们定义结构体: struct stu { char name[20]; long number; float score[4]; }; 再定义指向结构体类型变量的指针变量: struct stu *student; /*定义结构体类型指针*/ student malloc(sizeof(struct stu)); /*为指针变量分…...
项目架构一些注意点
考虑系统的 稳定性 一、微服务的稳定性 1、如何解决那些不稳定的因素/问题?也是常说的如何容错。 2、一个系统的高可用取决于它本身和其强依赖的组件的高可用 3、消除单点 保活机制 健康检查 注册中心如何保障稳定性 注册中心集群 微服务本身对注册信息的本地持…...

Forefront GPT-4免费版:开启无限畅聊时代,乐享人工智能快感,无限制“白嫖”,还能和N多角色一起聊天?赶紧注册,再过些时间估计就要收费了
目录 前言注册登录方式应用体验聊天体验绘图体验 “是打算先免费后收费吗?”建议其它资料下载 前言 近期,人工智能技术迎来重大飞跃,OpenAI的ChatGPT等工具成为全球数亿人探索提高生产力和增强创造力的新方法。人们现在可以使用人工智能驱动…...

深入浅出 Compose Compiler(1) Kotlin Compiler KCP
前言 Compose 的语法简洁、代码效率非常高,这主要得益于 Compose Compiler 的一系列编译期魔法,帮开发者生成了很多样板代码。但编译期插桩也阻碍了我们对于 Compose 运行原理的认知,想要真正读懂 Compose 就必须先了解它的 Compiler。本系列…...

BatchNormalization和LayerNormalization的理解、适用范围、PyTorch代码示例
文章目录 为什么要NormalizationBatchNormLayerNormtorch代码示例 学习神经网络归一化时,文章形形色色,但没找到适合小白通俗易懂且全面的。学习过后,特此记录。 为什么要Normalization 当输入数据量级极大或极小时,为保证输出数…...

大数据 | 实验二:文档倒排索引算法实现
文章目录 📚实验目的📚实验平台📚实验内容🐇在本地编写程序和调试🥕代码框架思路🥕代码实现 🐇在集群上提交作业并执行🥕在集群上提交作业并执行,同本地执行相比即需修改…...
Java文档注释-JavaDoc标签
标签含义author指定作者{code}使用代码字体以原样显示信息,不处理HTML样式deprecated指定程序元素已经过时{docRoot}指定当前文档的根目录路径exception标识由方法或构造函数抛出的异常{inheritDoc}从直接超类中继承注释{link}插入指向另外一个主题的内联链接{linkp…...

黑盒测试过程中【测试方法】详解5-输入域,输出域,猜错法
在黑盒测试过程中,有9种常用的方法:1.等价类划分 2.边界值分析 3.判定表法 4.正交实验法 5.流程图分析 6.因果图法 7.输入域覆盖法 8.输出域覆盖法 9.猜错法 黑盒测试过程中【测试方法】讲解1-等价类,边界值,判定表_朝一…...
Python学习之sh(shell脚本)在Python中的使用
文章目录 前言一、sh是什么?二、使用步骤1.安装2.使用示例3.使用sh执行命令4.关键字参数5.查找命令6.Baking参数 前言 本文章向大家介绍[Python库]分析一个python库–sh(系统调用),主要内容包括其使用实例、应用技巧、基本知识点…...
追求卓越:编写高质量代码的方法和技巧
本文讨论了编写高质量代码的重要性,并详细介绍了高质量代码的特征、编程实践技巧和软件工程方法论。通过遵循这些原则和实践,程序员可以编写出更稳定、可维护和可扩展的代码。 一、 前言 写出高质量代码是每个程序员的追求和目标。高质量的代码可以使程…...
MATLAB算法实战应用案例精讲-【人工智能】机器视觉(概念篇)(最终篇)
目录 前言 几个高频面试题目 如何评价一个光源的好坏? 如何依靠光源增强图像对比度?...

【老王读SpringMVC-3】根据 url 是如何找到 controller method 的?
前面分析了 request 与 handler method 映射关系的注册,现在再来分析一下 SpringMVC 是如何根据 request 来获取对应的 handler method 的? 可能有人会说,既然已经将 request 与 handler method 映射关系注册保存在了 AbstractHandlerMethodMapping.Ma…...

人机交互到艺术设计及玫瑰花绘制实例
Python库之图形用户界面 Riverbank Computing | Introduction Welcome to wxPython! | wxPython Overview — PyGObject Python库之游戏开发 https://www.pygame.org/news Panda3D | Open Source Framework for 3D Rendering & Games python.cocos2d.org Python库之…...

多臂老虎机问题
1.问题简介 多臂老虎机问题可以被看作简化版的强化学习问题,算是最简单的“和环境交互中的学习”的一种形式,不存在状态信息,只有动作和奖励。多臂老虎机中的探索与利用(exploration vs. exploitation)问题一直以来都…...
DNS 查询原理详解
DNS(Domain Name System)是互联网上的一种命名系统,它将域名转换为IP地址。在进行DNS查询时,先要明确需要查询的主机名,然后向本地DNS服务器发出查询请求。 1. 本地DNS服务器查询 当用户在浏览器中输入一个URL或者点…...

浅谈软件测试工程师的技能树
软件测试工程师是一个历史很悠久的职位,可以说从有软件开发这个行业以来,就开始有了软件测试工程师的角色。随着时代的发展,软件测试工程师的角色和职责也在悄然发生着变化,从一开始单纯的在瀑布式开发流程中担任测试阶段的执行者…...
PHP和Node.js哪个更爽?
先说结论,rust完胜。 php:laravel,swoole,webman,最开始在苏宁的时候写了几年php,当时觉得php真的是世界上最好的语言,因为当初活在舒适圈里,不愿意跳出来,就好比当初活在…...

智能在线客服平台:数字化时代企业连接用户的 AI 中枢
随着互联网技术的飞速发展,消费者期望能够随时随地与企业进行交流。在线客服平台作为连接企业与客户的重要桥梁,不仅优化了客户体验,还提升了企业的服务效率和市场竞争力。本文将探讨在线客服平台的重要性、技术进展、实际应用,并…...

Keil 中设置 STM32 Flash 和 RAM 地址详解
文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...
Python爬虫(二):爬虫完整流程
爬虫完整流程详解(7大核心步骤实战技巧) 一、爬虫完整工作流程 以下是爬虫开发的完整流程,我将结合具体技术点和实战经验展开说明: 1. 目标分析与前期准备 网站技术分析: 使用浏览器开发者工具(F12&…...

《基于Apache Flink的流处理》笔记
思维导图 1-3 章 4-7章 8-11 章 参考资料 源码: https://github.com/streaming-with-flink 博客 https://flink.apache.org/bloghttps://www.ververica.com/blog 聚会及会议 https://flink-forward.orghttps://www.meetup.com/topics/apache-flink https://n…...

解析两阶段提交与三阶段提交的核心差异及MySQL实现方案
引言 在分布式系统的事务处理中,如何保障跨节点数据操作的一致性始终是核心挑战。经典的两阶段提交协议(2PC)通过准备阶段与提交阶段的协调机制,以同步决策模式确保事务原子性。其改进版本三阶段提交协议(3PC…...
[USACO23FEB] Bakery S
题目描述 Bessie 开了一家面包店! 在她的面包店里,Bessie 有一个烤箱,可以在 t C t_C tC 的时间内生产一块饼干或在 t M t_M tM 单位时间内生产一块松糕。 ( 1 ≤ t C , t M ≤ 10 9 ) (1 \le t_C,t_M \le 10^9) (1≤tC,tM≤109)。由于空间…...

Java中HashMap底层原理深度解析:从数据结构到红黑树优化
一、HashMap概述与核心特性 HashMap作为Java集合框架中最常用的数据结构之一,是基于哈希表的Map接口非同步实现。它允许使用null键和null值(但只能有一个null键),并且不保证映射顺序的恒久不变。与Hashtable相比,Hash…...

SOC-ESP32S3部分:30-I2S音频-麦克风扬声器驱动
飞书文档https://x509p6c8to.feishu.cn/wiki/SKZzwIRH3i7lsckUOlzcuJsdnVf I2S简介 I2S(Inter-Integrated Circuit Sound)是一种用于传输数字音频数据的通信协议,广泛应用于音频设备中。 ESP32-S3 包含 2 个 I2S 外设,通过配置…...

C#学习12——预处理
一、预处理指令: 解释:是在编译前由预处理器执行的命令,用于控制编译过程。这些命令以 # 开头,每行只能有一个预处理指令,且不能包含在方法或类中。 个人理解:就是游戏里面的备战阶段(不同对局…...