当前位置: 首页 > news >正文

RabbitMQ使用StringRedisTemplate-防止重复消费

造成重复消费的原因:

MQ向消费者推送message,消费者向MQ返回ack,告知所推送的消息消费成功。但是由于网络波动等原因,可能造成消费者向MQ返回的ack丢失。MQ长时间(一分钟)收不到ack,于是会向消费者再次推送该条message,这样就造成了重复消费。

解决重复消费的办法:

用存储(redis或者mysql)记录一下已经消费的message的id,当message被消费前先去存储中查一下消费记录,没有该条message的id则正常消费返回ack,有该条message的id的话不用消费直接返回ack给MQ。

当然实际生产中的话选用redis是比较好的选择,毕竟查mysql要进行磁盘IO,效率要低得多,而且绝大多数重复消费都是由于MQ没有收到消费者的ack于是造成MQ再次向消费者进行同一条message的投递。所以message的消费记录其实我们并不需要一直记录,只需要保存一段时间,当下次投递过来的时候消费者能查到消费记录然后准确返回ack给MQ就行。

yml

#配置rabbitMq 服务器rabbitmq:host: xxxx#rabbitmq相关配置 15672是Web管理界面的端口;5672是MQ访问的端口port: xxxxusername: xxxxpassword: xxxx#虚拟host 可以不设置,使用server默认hostvirtual-host: xxxxconnection-timeout: 0#确认消息已发送到队列(Queue)publisher-returns: true #确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated# 设置消费端手动 acklistener:simple:retry:# 开启消费者(程序出现异常的情况下)进行重试enabled: true#重试间隔时间max-interval: 1000# 最大重试次数max-attempts: 3#开启手动确认消息acknowledge-mode: manual

监听类
 

package com.rabbitmqprovider.service;import com.rabbitmq.client.Channel;
import com.rabbitmqprovider.commons.CommonUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;import java.io.IOException;/**** 防止重复消费*/
@Slf4j
@Service
public class TestBasicService {@Autowiredprivate StringRedisTemplate redisTemplate;/*** RabbitListener 可以写在类、方法上* @param channel* @param message* @throws IOException*///@RabbitListener(queues = {CommonUtils.queueStr})@RabbitHandlerpublic void getMessage(Channel channel, Message message) throws IOException {try{String messageId= message.getMessageProperties().getMessageId();String msg = new String(message.getBody(),"UTF-8");//判断messageId在redis中是否存在boolean flage=stringRedisTemplate(messageId,msg);if(!flage){log.error("消息已重复处理失败,拒绝再次接收...");channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息}else{//如果要防止 重复消费,则需要将 id值存在 redis,每次 都要去redis中拿id比对,是否存在,存在则消费过->messageIdchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);log.info("接收到的消息{}->"+redisTemplate.opsForValue().get(messageId));}}catch (Exception e){if (message.getMessageProperties().getRedelivered()) {log.error("消息已重复处理失败,拒绝再次接收...");channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息} else {log.error("消息即将再次返回队列处理...");channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}}/*** 判断Key是否存在* @param messageId 唯一表示key* @param msg       value值* @return*/private boolean stringRedisTemplate(String messageId,String msg){log.info("messageId="+messageId);//判断Key是否存在 有则返回true,没有则返回falseif(redisTemplate.hasKey(messageId)){return false;}else{redisTemplate.opsForValue().setIfAbsent(messageId, msg);}return true;}
}

------------------------------------------controller--------------------------------------------------

/*** 解决重复消费问题*/
@GetMapping("/sendMessageTestOnly")
public void sendMessageTestOnly(){JSONObject jsonObject = new JSONObject();jsonObject.put("message","世界很大!");jsonObject.put("msg","你想去看看么?");String json = jsonObject.toJSONString();String messageId=UUID.randomUUID()+"";Message message = MessageBuilder.withBody(json.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(messageId).build();rabbitTemplate.convertAndSend(CommonUtils.dirExchange,CommonUtils.routingKey,message,new CorrelationData(UUID.randomUUID().toString()));
}

---------------------------------回调------------------------------------------------------

package com.rabbitmqprovider.callback;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;/*** 队列防止消息丢失*/
@Slf4j
@Component
public class QueueCallback implements RabbitTemplate.ReturnCallback{@Overridepublic void returnedMessage(Message message,int replyCode, String replyText, String exchange, String routingKey) {log.info("消息 {} 经交换机 {} 通过routingKey={} 路由到队列失败,失败code为:{}, 失败原因为:{}",new String(message.getBody()), exchange, routingKey, replyCode, replyText);}
}
package com.rabbitmqprovider.callback;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;/*** 当消息由生产者发到交换机后会回调该接口中的confirm方法*/
@Component
@Slf4j
public class ExchangeCallback implements RabbitTemplate.ConfirmCallback{/* correlationData 内含消息内容* ack 交换机接受成功或者失败。 true表示交换机接受消息成功, false表示交换机接受失败* cause 表示失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack){log.info("交换机收到消息 消息内容为{}->", correlationData);}else {log.info("交换机未收到消息消息内容为{}, 原因为{}->", correlationData, cause);}}}

-----------------------------------------------------------------------------------------------------------------

执行顺序时:先发送消息;然后在接收消息,并判断消息是否重复,如果不重复 则回复消息,否则 拒绝回复;最后回调。

相关文章:

RabbitMQ使用StringRedisTemplate-防止重复消费

造成重复消费的原因: MQ向消费者推送message,消费者向MQ返回ack,告知所推送的消息消费成功。但是由于网络波动等原因,可能造成消费者向MQ返回的ack丢失。MQ长时间(一分钟)收不到ack,于是会向消…...

临沂大学张继群寄语

目录 寄语 1、不能有不良睹好 2、坚毅的个性和勤奋的品质 3、会存钱...

线程学习笔记

1:Thread 线程的生命周期控制 2:Runnable 可执行的任务和程序 3:Callable 执行程序后返回结果 4:Future 收集程序返回结果 5:Executor 线程池 6:ForkJoin 默认线程池 每个线程有工作队列 工作窃取 7:RunnableFuture FutureTask 实现 Runnable 和 Future 执…...

代码随想录算法训练营第四十二天|01背包问题,你该了解这些!、01背包问题,你该了解这些! 滚动数组 、416. 分割等和子集

文章目录 01背包问题,你该了解这些!01背包问题,你该了解这些! 滚动数组416. 分割等和子集 01背包问题,你该了解这些! 题目链接:代码随想录 二维数组解决0-1背包问题 解题思路: 1.dp…...

结构体指针、数组指针和结构体数组指针

结构体指针 首先让我们定义结构体: struct stu { char name[20]; long number; float score[4]; }; 再定义指向结构体类型变量的指针变量: struct stu *student; /*定义结构体类型指针*/ student malloc(sizeof(struct stu)); /*为指针变量分…...

项目架构一些注意点

考虑系统的 稳定性 一、微服务的稳定性 1、如何解决那些不稳定的因素/问题?也是常说的如何容错。 2、一个系统的高可用取决于它本身和其强依赖的组件的高可用 3、消除单点 保活机制 健康检查 注册中心如何保障稳定性 注册中心集群 微服务本身对注册信息的本地持…...

Forefront GPT-4免费版:开启无限畅聊时代,乐享人工智能快感,无限制“白嫖”,还能和N多角色一起聊天?赶紧注册,再过些时间估计就要收费了

目录 前言注册登录方式应用体验聊天体验绘图体验 “是打算先免费后收费吗?”建议其它资料下载 前言 近期,人工智能技术迎来重大飞跃,OpenAI的ChatGPT等工具成为全球数亿人探索提高生产力和增强创造力的新方法。人们现在可以使用人工智能驱动…...

深入浅出 Compose Compiler(1) Kotlin Compiler KCP

前言 Compose 的语法简洁、代码效率非常高,这主要得益于 Compose Compiler 的一系列编译期魔法,帮开发者生成了很多样板代码。但编译期插桩也阻碍了我们对于 Compose 运行原理的认知,想要真正读懂 Compose 就必须先了解它的 Compiler。本系列…...

BatchNormalization和LayerNormalization的理解、适用范围、PyTorch代码示例

文章目录 为什么要NormalizationBatchNormLayerNormtorch代码示例 学习神经网络归一化时,文章形形色色,但没找到适合小白通俗易懂且全面的。学习过后,特此记录。 为什么要Normalization 当输入数据量级极大或极小时,为保证输出数…...

大数据 | 实验二:文档倒排索引算法实现

文章目录 📚实验目的📚实验平台📚实验内容🐇在本地编写程序和调试🥕代码框架思路🥕代码实现 🐇在集群上提交作业并执行🥕在集群上提交作业并执行,同本地执行相比即需修改…...

Java文档注释-JavaDoc标签

标签含义author指定作者{code}使用代码字体以原样显示信息,不处理HTML样式deprecated指定程序元素已经过时{docRoot}指定当前文档的根目录路径exception标识由方法或构造函数抛出的异常{inheritDoc}从直接超类中继承注释{link}插入指向另外一个主题的内联链接{linkp…...

黑盒测试过程中【测试方法】详解5-输入域,输出域,猜错法

在黑盒测试过程中,有9种常用的方法:1.等价类划分 2.边界值分析 3.判定表法 4.正交实验法 5.流程图分析 6.因果图法 7.输入域覆盖法 8.输出域覆盖法 9.猜错法 黑盒测试过程中【测试方法】讲解1-等价类,边界值,判定表_朝一…...

Python学习之sh(shell脚本)在Python中的使用

文章目录 前言一、sh是什么?二、使用步骤1.安装2.使用示例3.使用sh执行命令4.关键字参数5.查找命令6.Baking参数 前言 本文章向大家介绍[Python库]分析一个python库–sh(系统调用),主要内容包括其使用实例、应用技巧、基本知识点…...

追求卓越:编写高质量代码的方法和技巧

本文讨论了编写高质量代码的重要性,并详细介绍了高质量代码的特征、编程实践技巧和软件工程方法论。通过遵循这些原则和实践,程序员可以编写出更稳定、可维护和可扩展的代码。 一、 前言 写出高质量代码是每个程序员的追求和目标。高质量的代码可以使程…...

MATLAB算法实战应用案例精讲-【人工智能】机器视觉(概念篇)(最终篇)

目录 前言 几个高频面试题目 如何评价一个光源的好坏? 如何依靠光源增强图像对比度?...

【老王读SpringMVC-3】根据 url 是如何找到 controller method 的?

前面分析了 request 与 handler method 映射关系的注册,现在再来分析一下 SpringMVC 是如何根据 request 来获取对应的 handler method 的? 可能有人会说,既然已经将 request 与 handler method 映射关系注册保存在了 AbstractHandlerMethodMapping.Ma…...

人机交互到艺术设计及玫瑰花绘制实例

Python库之图形用户界面 Riverbank Computing | Introduction Welcome to wxPython! | wxPython Overview — PyGObject Python库之游戏开发 https://www.pygame.org/news Panda3D | Open Source Framework for 3D Rendering & Games python.cocos2d.org Python库之…...

多臂老虎机问题

1.问题简介 多臂老虎机问题可以被看作简化版的强化学习问题,算是最简单的“和环境交互中的学习”的一种形式,不存在状态信息,只有动作和奖励。多臂老虎机中的探索与利用(exploration vs. exploitation)问题一直以来都…...

DNS 查询原理详解

DNS(Domain Name System)是互联网上的一种命名系统,它将域名转换为IP地址。在进行DNS查询时,先要明确需要查询的主机名,然后向本地DNS服务器发出查询请求。 1. 本地DNS服务器查询 当用户在浏览器中输入一个URL或者点…...

浅谈软件测试工程师的技能树

软件测试工程师是一个历史很悠久的职位,可以说从有软件开发这个行业以来,就开始有了软件测试工程师的角色。随着时代的发展,软件测试工程师的角色和职责也在悄然发生着变化,从一开始单纯的在瀑布式开发流程中担任测试阶段的执行者…...

浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)

✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义(Task Definition&…...

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造,完美适配AGV和无人叉车。同时,集成以太网与语音合成技术,为各类高级系统(如MES、调度系统、库位管理、立库等)提供高效便捷的语音交互体验。 L…...

React 第五十五节 Router 中 useAsyncError的使用详解

前言 useAsyncError 是 React Router v6.4 引入的一个钩子,用于处理异步操作(如数据加载)中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误:捕获在 loader 或 action 中发生的异步错误替…...

java_网络服务相关_gateway_nacos_feign区别联系

1. spring-cloud-starter-gateway 作用:作为微服务架构的网关,统一入口,处理所有外部请求。 核心能力: 路由转发(基于路径、服务名等)过滤器(鉴权、限流、日志、Header 处理)支持负…...

理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端

🌟 什么是 MCP? 模型控制协议 (MCP) 是一种创新的协议,旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议,它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...

linux 错误码总结

1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...

第一篇:Agent2Agent (A2A) 协议——协作式人工智能的黎明

AI 领域的快速发展正在催生一个新时代,智能代理(agents)不再是孤立的个体,而是能够像一个数字团队一样协作。然而,当前 AI 生态系统的碎片化阻碍了这一愿景的实现,导致了“AI 巴别塔问题”——不同代理之间…...

安卓基础(aar)

重新设置java21的环境,临时设置 $env:JAVA_HOME "D:\Android Studio\jbr" 查看当前环境变量 JAVA_HOME 的值 echo $env:JAVA_HOME 构建ARR文件 ./gradlew :private-lib:assembleRelease 目录是这样的: MyApp/ ├── app/ …...

网站指纹识别

网站指纹识别 网站的最基本组成:服务器(操作系统)、中间件(web容器)、脚本语言、数据厍 为什么要了解这些?举个例子:发现了一个文件读取漏洞,我们需要读/etc/passwd,如…...

QT开发技术【ffmpeg + QAudioOutput】音乐播放器

一、 介绍 使用ffmpeg 4.2.2 在数字化浪潮席卷全球的当下,音视频内容犹如璀璨繁星,点亮了人们的生活与工作。从短视频平台上令人捧腹的搞笑视频,到在线课堂中知识渊博的专家授课,再到影视平台上扣人心弦的高清大片,音…...