RabbitMq-接收消息+redis消费者重复接收
在接触RammitMQ时,好多文章都说在配置中设置属性
# rabbitmq 配置 rabbitmq:host: xxx.xxx.xxx.xxxport: xxxxusername: xxxpassword: xxxxxx## 生产端配置# 开启发布确认,就是confirm模式. 消费端ack应答后,才将消息从队列中删除#确认消息已发送到队列(Queue)publisher-returns: true#确认消息已发送到交换机(Exchange)publisher-confirm-type: correlatedlistener: #消费者 端配置retry:enabled: true # 是否支持重试default-requeue-rejected: falsemax-attempts: 5 #最大重试次数initial-interval: 3000 # 重试时间间隔direct:acknowledge-mode: manualsimple:acknowledge-mode: manual
消息接收消息失败时,可以重复调用5次;按照此操作,发现没有重复调用。
----------------------------------正确思路---------------------------------------------------------------------------------
设置完配置文件属性后,在代码中利用redis与channel.basicNack联合使用,将错误记录保存至数据库,方便查找原因;
---------------------------------------代码
package com.charg.listener;import com.charg.common.constant.CacheConstants;
import com.charg.common.constant.Constants;
import com.charg.common.utils.JsonUtils;
import com.charg.common.utils.redis.RedisUtils;
import com.charg.constant.RabbitConstants;
import com.charg.product.domain.bo.ProductDeviceBo;
import com.charg.product.domain.bo.RabMsgLogBo;
import com.charg.product.service.IProductDeviceService;
import com.charg.product.service.IRabMsgLogService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.time.Duration;/*** rabbitmq 监听*/
@Slf4j
@Component
public class RabbitQueueListener {/*** 最大重试次数*/private static int maxReconsumeCount = 3;@Autowiredprivate StringRedisTemplate redisTemplate;/*** 监听 队列的处理器** @param message*/@RabbitListener(queues = "队列名称")@RabbitHandlerpublic void onMessage(Message message, Channel channel) {//唯一标识String messageId = message.getMessageProperties().getMessageId();try {//判断messageId在redis中是否存在if (verificationMessageId(messageId)) {log.error("消息已重复处理,拒绝再次接收...");// 拒绝消息channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} else {//不存在 则处理消息// 接收消息if (StringUtils.isNotBlank(new String(message.getBody()))) {//修改业务逻辑if (!false) {log.error("消息即将再次返回队列处理...逻辑错误");// 处理最大回调次数getMaximumNumber(message, channel);} else {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//加入缓存addMessageId(message);}} else {log.info("消息为空拒绝接收...");// 拒绝消息channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);}}} catch (Exception e) {e.printStackTrace();try {if (message.getMessageProperties().getRedelivered()) {log.error("消息已重复处理,拒绝再次接收----...");// 拒绝消息channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} else {log.error("消息即将再次返回队列处理...");// 处理最大回调次数getMaximumNumber(message, channel);}} catch (Exception exception) {exception.printStackTrace();}}}/*** 记录消息最大次数** @param message* @param channel* @throws IOException*/private void getMaximumNumber(Message message, Channel channel) {try {int recounsumeCounts = RedisUtils.getCacheObject("messageMaxKey"+message.getMessageProperties().getMessageId()) == null ? 0 : RedisUtils.getCacheObject("messageMaxKey"+message.getMessageProperties().getMessageId());if (maxReconsumeCount > recounsumeCounts) {log.info("maxMessageId(message.getMessageProperties().getMessageId())=" + recounsumeCounts);channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);// 记录重试次数maxMessageId(message.getMessageProperties().getMessageId());} else {log.info("次数达到三次了呢---------" + RedisUtils.getCacheObject(CacheConstants.MESSAGE_MAX_KEY + message.getMessageProperties().getMessageId()));// 将消息重新放回队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);// 清除缓存RedisUtils.deleteObject("messageMaxKey" + message.getMessageProperties().getMessageId());//重试三次后,还是失败 需记录到数据库addRabMsgLog(message);}} catch (Exception e) {e.printStackTrace();}}/*** 设置消息的最大重试次数*/public void maxMessageId(String messageId) {String messageMax ="messageMaxKey"+ messageId;// 存入缓存,用来记录该消息重试了几次if (RedisUtils.hasKey(messageMax)) {RedisUtils.incrAtomicValue(messageMax);} else {//错误的消息-插入数据库RedisUtils.setCacheObject(messageMax, 1, Duration.ofHours(Constants.MESSAGE_TIME));}}/*** 校验消息是否消费过该消息** @param messageId 消息id* @return*/public boolean verificationMessageId(String messageId) {// 消息是否存在keyString verifyIsExistKey ="messageExistKey" + messageId;if ((RedisUtils.hasKey(verifyIsExistKey))) {return true;}return false;}/*** 保存消费过消息** @param message 消息* @return*/public void addMessageId(Message message) {// 存入缓存RedisUtils.setCacheObject("messageExistKey" + message.getMessageProperties().getMessageId(), message.getMessageProperties().getMessageId(), 1);}/*** 消息队列 失败日志 操作* 自己存数据库逻辑*/public void addRabMsgLog(Message message) {log.info("====操作日志===");//将内容记录到数据库}}
--------------------------------数据库表

相关文章:
RabbitMq-接收消息+redis消费者重复接收
在接触RammitMQ时,好多文章都说在配置中设置属性 # rabbitmq 配置 rabbitmq:host: xxx.xxx.xxx.xxxport: xxxxusername: xxxpassword: xxxxxx## 生产端配置# 开启发布确认,就是confirm模式. 消费端ack应答后,才将消息从队列中删除#确认消息已发送到队列(Queue)pub…...
Orangepi Zero2 全志H616简介
为什么学 学习目标依然是Linux 系统 ,平台是 ARM 架构 蜂巢快递柜,配送机器人,这些应用场景用C51,STM32单片机无法实现 第三方介入库的局限性,比如刷脸支付和公交车收费设备需要集成支付宝SDK,提供的libalipay.so 是…...
Golang每日一练(leetDay0047)
目录 138. 复制带随机指针的链表 Copy List with Random-pointer 🌟🌟 139. 单词拆分 Word Break 🌟🌟 140. 单词拆分 II Word Break II 🌟🌟🌟 🌟 每日一练刷题专栏 &…...
HCL Nomad Web 1.0.7发布和新功能验证
大家好,才是真的好。 要问在HCL Notes/Domino系列产品中,谁更新得最快,那么答案一定是HCL Nomad Web。 你看上图右边,从1.0.1更新到1.0.7,都没花多少时间。 从HCL Nomad Web 1.0.5版本开始,可以支持直接…...
春招网申简历填写三技巧
网申第一关很重要,不夸张的说网申决定了你的笔试机会,从如信银行考试中心了解到,银行网申筛选过程中,有机器筛选人工筛选两道程序,掌握填写技巧后对提升简历通过率有较大帮助,一定要把握住,关于…...
计算机网络基础知识总结
经过学习我们可以知道: 关于计算机网络: ip地址端口号协议协议分层TCP五层协议协议封装两台计算机之间的通信 目录 ip地址 端口号 协议 协议分层 五层协议体系结构 (1) 应用层 (2) 运输层 (3) 网络层 (4)数据链路层 (5)物理层 封装&分用 两台主机之间的通信 …...
(下)苹果有开源,但又怎样呢?
一开始,因为 MacOS X ,苹果与 FreeBSD 过往从密,不仅挖来 FreeBSD 创始人 Jordan Hubbard,更是在此基础上开源了 Darwin。但是,苹果并没有给予 Darwin 太多关注,作为苹果的首个开源项目,它算不上…...
row_number 和 cte 使用实例:考场监考安排
row_number 和 cte 使用实例:考场监考安排 考场监考安排使用 cte 模拟两个表的原始数据使用 master..spt_values 进行数据填充优先安排时长较长的考试使用 cte 安排第一个需要安排的科目统计老师已有的监考时长尝试使用 cte 递归,进行下一场考试安排&…...
2023天梯赛记录
文章目录 L2-001 紧急救援L2-002 链表去重L2-004 这是二叉搜索树吗?L2-005 集合相似度L2-006 树的遍历L2-007 家庭房产L2-010 排座位L2-011 玩转二叉树L2-012 关于堆的判断L2-013 红色警报L2-014 列车调度L2-016 愿天下有情人都是失散多年的兄妹L2-019 悄悄关注L2-0…...
被吐槽 GitHub仓 库太大,直接 600M 瘦身到 6M,这下舒服了
大家好,我是小富~ 前言 忙里偷闲学习了点技术写了点demo代码,打算提交到我那 2000Star 的Github仓库上,居然发现有5个Issues,最近的一条日期已经是2022/8/1了,以前我还真没留意过这些,我这人懒…...
OpenGL(三)——着色器
目录 一、前言 二、Shader 2 Shader 2.1 顶点着色器 2.2 片段着色器 三、APP 2 Shader 四、顶点颜色属性 五、着色器类C 一、前言 着色器Shader是运行在GPU上的小程序,为图形渲染管线的某个特定部分而运行。各阶段着色器之间无法通信,只有输入和输…...
【MySQL】单表查询
一、表的准备 查询操作的SQL演示将基于下面这四张表进行,我们先创建好这四张数据表,并为其添加数据。 1、第一张表为部门表,名称为包含三个字段:部门编号(deptno),部门名称(dname&…...
第一章 安装Unity
使用Unity开发游戏的话,首先要安装Unity Hub和Unity Editor两个软件。大家可以去官方地址下载:https://unity.cn/releases/full/2020 (这里我们选择的是2020版本) Unity Hub 是安装 Unity Editor、创建项目、管理帐户和许可证的主…...
20230425----重返学习-vue项目-vue自定义指令-vue-cli的配置
day-057-fifty-seven-20230425-vue项目-vue自定义指令-vue-cli的配置 vue项目 vuex版 普通版纯axios:切换页面,就会重新发送一次ajax请求普通版升级:vuex版vuex的常用功能 vuex 数据通信vuex 缓存数据 前进后退,切换页面&#…...
el-input 只能输入整数(包括正数、负数、0)或者只能输入整数(包括正数、负数、0)和小数
使用el-input-number标签 也可以使用typenumbe和v-model.number属性,两者结合使用,能满足大多数需求,如果还不满足,可以再结合正则表达式过滤 <el-input v-model.number"value" type"number" /> el-i…...
Docker Compose的常用命令与docker-compose.yml脚本属性配置
Docker Compose的常用命令与配置 常见命令ps:列出所有运行容器logs:查看服务日志输出port:打印绑定的公共端口build:构建或者重新构建服务start:启动指定服务已存在的容器stop:停止已运行的服务的容器&…...
with语句和上下文管理器(py编程)
1. with语句的使用 基础班向文件中写入数据的示例代码: # 1、以写的方式打开文件f open("1.txt", "w")# 2、写入文件内容f.write("hello world")# 3、关闭文件f.close()代码说明: 文件使用完后必须关闭,因为文件对象会占用操作系统…...
《JavaEE初阶》HTTP协议和HTTPS
《JavaEE初阶》HTTP协议和HTTPS 文章目录 《JavaEE初阶》HTTP协议和HTTPSHTTP协议是应用层协议:使用Fiddler抓取HTTP请求和响应:Fiddler的下载和基本使用:Fiddler的中间代理人身份:其他抓包工具: 先简单认识HTTP请求与HTTP响应:HTTP请求:HTTP响应: HTTP请求详解:首行࿱…...
微信小程序 | 基于高德地图+ChatGPT实现旅游规划小程序
🎈🎈效果预览🎈🎈 ❤ 路劲规划 ❤ 功能总览 ❤ ChatGPT交互 一、需求背景 五一假期即即将到来,在大家都阳过之后,截止到目前这应该是最安全的一个假期。所以出去旅游想必是大多数人的选择。 然后&#x…...
Excel技能之实用技巧,高手私藏
今天来讲一下Excel技巧,工作常用,高手私藏。能帮到你是我最大的荣幸。 与其加班熬夜赶进度,不如下班学习提效率。能力有成长,效率提上去,自然不用加班。 消化吸收,工作中立马使用,感觉真不错。…...
AI-调查研究-01-正念冥想有用吗?对健康的影响及科学指南
点一下关注吧!!!非常感谢!!持续更新!!! 🚀 AI篇持续更新中!(长期更新) 目前2025年06月05日更新到: AI炼丹日志-28 - Aud…...
【机器视觉】单目测距——运动结构恢复
ps:图是随便找的,为了凑个封面 前言 在前面对光流法进行进一步改进,希望将2D光流推广至3D场景流时,发现2D转3D过程中存在尺度歧义问题,需要补全摄像头拍摄图像中缺失的深度信息,否则解空间不收敛…...
如何将联系人从 iPhone 转移到 Android
从 iPhone 换到 Android 手机时,你可能需要保留重要的数据,例如通讯录。好在,将通讯录从 iPhone 转移到 Android 手机非常简单,你可以从本文中学习 6 种可靠的方法,确保随时保持连接,不错过任何信息。 第 1…...
WordPress插件:AI多语言写作与智能配图、免费AI模型、SEO文章生成
厌倦手动写WordPress文章?AI自动生成,效率提升10倍! 支持多语言、自动配图、定时发布,让内容创作更轻松! AI内容生成 → 不想每天写文章?AI一键生成高质量内容!多语言支持 → 跨境电商必备&am…...
k8s业务程序联调工具-KtConnect
概述 原理 工具作用是建立了一个从本地到集群的单向VPN,根据VPN原理,打通两个内网必然需要借助一个公共中继节点,ktconnect工具巧妙的利用k8s原生的portforward能力,简化了建立连接的过程,apiserver间接起到了中继节…...
Web 架构之 CDN 加速原理与落地实践
文章目录 一、思维导图二、正文内容(一)CDN 基础概念1. 定义2. 组成部分 (二)CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 (三)CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 …...
docker 部署发现spring.profiles.active 问题
报错: org.springframework.boot.context.config.InvalidConfigDataPropertyException: Property spring.profiles.active imported from location class path resource [application-test.yml] is invalid in a profile specific resource [origin: class path re…...
SiFli 52把Imagie图片,Font字体资源放在指定位置,编译成指定img.bin和font.bin的问题
分区配置 (ptab.json) img 属性介绍: img 属性指定分区存放的 image 名称,指定的 image 名称必须是当前工程生成的 binary 。 如果 binary 有多个文件,则以 proj_name:binary_name 格式指定文件名, proj_name 为工程 名&…...
智能AI电话机器人系统的识别能力现状与发展水平
一、引言 随着人工智能技术的飞速发展,AI电话机器人系统已经从简单的自动应答工具演变为具备复杂交互能力的智能助手。这类系统结合了语音识别、自然语言处理、情感计算和机器学习等多项前沿技术,在客户服务、营销推广、信息查询等领域发挥着越来越重要…...
推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材)
推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材) 这个项目能干嘛? 使用 gemini 2.0 的 api 和 google 其他的 api 来做衍生处理 简化和优化了文生图和图生图的行为(我的最主要) 并且有一些目标检测和切割(我用不到) 视频和 imagefx 因为没 a…...
