RabbitMq-接收消息+redis消费者重复接收
在接触RammitMQ时,好多文章都说在配置中设置属性
# rabbitmq 配置 rabbitmq:host: xxx.xxx.xxx.xxxport: xxxxusername: xxxpassword: xxxxxx## 生产端配置# 开启发布确认,就是confirm模式. 消费端ack应答后,才将消息从队列中删除#确认消息已发送到队列(Queue)publisher-returns: true#确认消息已发送到交换机(Exchange)publisher-confirm-type: correlatedlistener: #消费者 端配置retry:enabled: true # 是否支持重试default-requeue-rejected: falsemax-attempts: 5 #最大重试次数initial-interval: 3000 # 重试时间间隔direct:acknowledge-mode: manualsimple:acknowledge-mode: manual
消息接收消息失败时,可以重复调用5次;按照此操作,发现没有重复调用。
----------------------------------正确思路---------------------------------------------------------------------------------
设置完配置文件属性后,在代码中利用redis与channel.basicNack联合使用,将错误记录保存至数据库,方便查找原因;
---------------------------------------代码
package com.charg.listener;import com.charg.common.constant.CacheConstants;
import com.charg.common.constant.Constants;
import com.charg.common.utils.JsonUtils;
import com.charg.common.utils.redis.RedisUtils;
import com.charg.constant.RabbitConstants;
import com.charg.product.domain.bo.ProductDeviceBo;
import com.charg.product.domain.bo.RabMsgLogBo;
import com.charg.product.service.IProductDeviceService;
import com.charg.product.service.IRabMsgLogService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.time.Duration;/*** rabbitmq 监听*/
@Slf4j
@Component
public class RabbitQueueListener {/*** 最大重试次数*/private static int maxReconsumeCount = 3;@Autowiredprivate StringRedisTemplate redisTemplate;/*** 监听 队列的处理器** @param message*/@RabbitListener(queues = "队列名称")@RabbitHandlerpublic void onMessage(Message message, Channel channel) {//唯一标识String messageId = message.getMessageProperties().getMessageId();try {//判断messageId在redis中是否存在if (verificationMessageId(messageId)) {log.error("消息已重复处理,拒绝再次接收...");// 拒绝消息channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} else {//不存在 则处理消息// 接收消息if (StringUtils.isNotBlank(new String(message.getBody()))) {//修改业务逻辑if (!false) {log.error("消息即将再次返回队列处理...逻辑错误");// 处理最大回调次数getMaximumNumber(message, channel);} else {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//加入缓存addMessageId(message);}} else {log.info("消息为空拒绝接收...");// 拒绝消息channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);}}} catch (Exception e) {e.printStackTrace();try {if (message.getMessageProperties().getRedelivered()) {log.error("消息已重复处理,拒绝再次接收----...");// 拒绝消息channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} else {log.error("消息即将再次返回队列处理...");// 处理最大回调次数getMaximumNumber(message, channel);}} catch (Exception exception) {exception.printStackTrace();}}}/*** 记录消息最大次数** @param message* @param channel* @throws IOException*/private void getMaximumNumber(Message message, Channel channel) {try {int recounsumeCounts = RedisUtils.getCacheObject("messageMaxKey"+message.getMessageProperties().getMessageId()) == null ? 0 : RedisUtils.getCacheObject("messageMaxKey"+message.getMessageProperties().getMessageId());if (maxReconsumeCount > recounsumeCounts) {log.info("maxMessageId(message.getMessageProperties().getMessageId())=" + recounsumeCounts);channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);// 记录重试次数maxMessageId(message.getMessageProperties().getMessageId());} else {log.info("次数达到三次了呢---------" + RedisUtils.getCacheObject(CacheConstants.MESSAGE_MAX_KEY + message.getMessageProperties().getMessageId()));// 将消息重新放回队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);// 清除缓存RedisUtils.deleteObject("messageMaxKey" + message.getMessageProperties().getMessageId());//重试三次后,还是失败 需记录到数据库addRabMsgLog(message);}} catch (Exception e) {e.printStackTrace();}}/*** 设置消息的最大重试次数*/public void maxMessageId(String messageId) {String messageMax ="messageMaxKey"+ messageId;// 存入缓存,用来记录该消息重试了几次if (RedisUtils.hasKey(messageMax)) {RedisUtils.incrAtomicValue(messageMax);} else {//错误的消息-插入数据库RedisUtils.setCacheObject(messageMax, 1, Duration.ofHours(Constants.MESSAGE_TIME));}}/*** 校验消息是否消费过该消息** @param messageId 消息id* @return*/public boolean verificationMessageId(String messageId) {// 消息是否存在keyString verifyIsExistKey ="messageExistKey" + messageId;if ((RedisUtils.hasKey(verifyIsExistKey))) {return true;}return false;}/*** 保存消费过消息** @param message 消息* @return*/public void addMessageId(Message message) {// 存入缓存RedisUtils.setCacheObject("messageExistKey" + message.getMessageProperties().getMessageId(), message.getMessageProperties().getMessageId(), 1);}/*** 消息队列 失败日志 操作* 自己存数据库逻辑*/public void addRabMsgLog(Message message) {log.info("====操作日志===");//将内容记录到数据库}}
--------------------------------数据库表

相关文章:
RabbitMq-接收消息+redis消费者重复接收
在接触RammitMQ时,好多文章都说在配置中设置属性 # rabbitmq 配置 rabbitmq:host: xxx.xxx.xxx.xxxport: xxxxusername: xxxpassword: xxxxxx## 生产端配置# 开启发布确认,就是confirm模式. 消费端ack应答后,才将消息从队列中删除#确认消息已发送到队列(Queue)pub…...
Orangepi Zero2 全志H616简介
为什么学 学习目标依然是Linux 系统 ,平台是 ARM 架构 蜂巢快递柜,配送机器人,这些应用场景用C51,STM32单片机无法实现 第三方介入库的局限性,比如刷脸支付和公交车收费设备需要集成支付宝SDK,提供的libalipay.so 是…...
Golang每日一练(leetDay0047)
目录 138. 复制带随机指针的链表 Copy List with Random-pointer 🌟🌟 139. 单词拆分 Word Break 🌟🌟 140. 单词拆分 II Word Break II 🌟🌟🌟 🌟 每日一练刷题专栏 &…...
HCL Nomad Web 1.0.7发布和新功能验证
大家好,才是真的好。 要问在HCL Notes/Domino系列产品中,谁更新得最快,那么答案一定是HCL Nomad Web。 你看上图右边,从1.0.1更新到1.0.7,都没花多少时间。 从HCL Nomad Web 1.0.5版本开始,可以支持直接…...
春招网申简历填写三技巧
网申第一关很重要,不夸张的说网申决定了你的笔试机会,从如信银行考试中心了解到,银行网申筛选过程中,有机器筛选人工筛选两道程序,掌握填写技巧后对提升简历通过率有较大帮助,一定要把握住,关于…...
计算机网络基础知识总结
经过学习我们可以知道: 关于计算机网络: ip地址端口号协议协议分层TCP五层协议协议封装两台计算机之间的通信 目录 ip地址 端口号 协议 协议分层 五层协议体系结构 (1) 应用层 (2) 运输层 (3) 网络层 (4)数据链路层 (5)物理层 封装&分用 两台主机之间的通信 …...
(下)苹果有开源,但又怎样呢?
一开始,因为 MacOS X ,苹果与 FreeBSD 过往从密,不仅挖来 FreeBSD 创始人 Jordan Hubbard,更是在此基础上开源了 Darwin。但是,苹果并没有给予 Darwin 太多关注,作为苹果的首个开源项目,它算不上…...
row_number 和 cte 使用实例:考场监考安排
row_number 和 cte 使用实例:考场监考安排 考场监考安排使用 cte 模拟两个表的原始数据使用 master..spt_values 进行数据填充优先安排时长较长的考试使用 cte 安排第一个需要安排的科目统计老师已有的监考时长尝试使用 cte 递归,进行下一场考试安排&…...
2023天梯赛记录
文章目录 L2-001 紧急救援L2-002 链表去重L2-004 这是二叉搜索树吗?L2-005 集合相似度L2-006 树的遍历L2-007 家庭房产L2-010 排座位L2-011 玩转二叉树L2-012 关于堆的判断L2-013 红色警报L2-014 列车调度L2-016 愿天下有情人都是失散多年的兄妹L2-019 悄悄关注L2-0…...
被吐槽 GitHub仓 库太大,直接 600M 瘦身到 6M,这下舒服了
大家好,我是小富~ 前言 忙里偷闲学习了点技术写了点demo代码,打算提交到我那 2000Star 的Github仓库上,居然发现有5个Issues,最近的一条日期已经是2022/8/1了,以前我还真没留意过这些,我这人懒…...
OpenGL(三)——着色器
目录 一、前言 二、Shader 2 Shader 2.1 顶点着色器 2.2 片段着色器 三、APP 2 Shader 四、顶点颜色属性 五、着色器类C 一、前言 着色器Shader是运行在GPU上的小程序,为图形渲染管线的某个特定部分而运行。各阶段着色器之间无法通信,只有输入和输…...
【MySQL】单表查询
一、表的准备 查询操作的SQL演示将基于下面这四张表进行,我们先创建好这四张数据表,并为其添加数据。 1、第一张表为部门表,名称为包含三个字段:部门编号(deptno),部门名称(dname&…...
第一章 安装Unity
使用Unity开发游戏的话,首先要安装Unity Hub和Unity Editor两个软件。大家可以去官方地址下载:https://unity.cn/releases/full/2020 (这里我们选择的是2020版本) Unity Hub 是安装 Unity Editor、创建项目、管理帐户和许可证的主…...
20230425----重返学习-vue项目-vue自定义指令-vue-cli的配置
day-057-fifty-seven-20230425-vue项目-vue自定义指令-vue-cli的配置 vue项目 vuex版 普通版纯axios:切换页面,就会重新发送一次ajax请求普通版升级:vuex版vuex的常用功能 vuex 数据通信vuex 缓存数据 前进后退,切换页面&#…...
el-input 只能输入整数(包括正数、负数、0)或者只能输入整数(包括正数、负数、0)和小数
使用el-input-number标签 也可以使用typenumbe和v-model.number属性,两者结合使用,能满足大多数需求,如果还不满足,可以再结合正则表达式过滤 <el-input v-model.number"value" type"number" /> el-i…...
Docker Compose的常用命令与docker-compose.yml脚本属性配置
Docker Compose的常用命令与配置 常见命令ps:列出所有运行容器logs:查看服务日志输出port:打印绑定的公共端口build:构建或者重新构建服务start:启动指定服务已存在的容器stop:停止已运行的服务的容器&…...
with语句和上下文管理器(py编程)
1. with语句的使用 基础班向文件中写入数据的示例代码: # 1、以写的方式打开文件f open("1.txt", "w")# 2、写入文件内容f.write("hello world")# 3、关闭文件f.close()代码说明: 文件使用完后必须关闭,因为文件对象会占用操作系统…...
《JavaEE初阶》HTTP协议和HTTPS
《JavaEE初阶》HTTP协议和HTTPS 文章目录 《JavaEE初阶》HTTP协议和HTTPSHTTP协议是应用层协议:使用Fiddler抓取HTTP请求和响应:Fiddler的下载和基本使用:Fiddler的中间代理人身份:其他抓包工具: 先简单认识HTTP请求与HTTP响应:HTTP请求:HTTP响应: HTTP请求详解:首行࿱…...
微信小程序 | 基于高德地图+ChatGPT实现旅游规划小程序
🎈🎈效果预览🎈🎈 ❤ 路劲规划 ❤ 功能总览 ❤ ChatGPT交互 一、需求背景 五一假期即即将到来,在大家都阳过之后,截止到目前这应该是最安全的一个假期。所以出去旅游想必是大多数人的选择。 然后&#x…...
Excel技能之实用技巧,高手私藏
今天来讲一下Excel技巧,工作常用,高手私藏。能帮到你是我最大的荣幸。 与其加班熬夜赶进度,不如下班学习提效率。能力有成长,效率提上去,自然不用加班。 消化吸收,工作中立马使用,感觉真不错。…...
linux之kylin系统nginx的安装
一、nginx的作用 1.可做高性能的web服务器 直接处理静态资源(HTML/CSS/图片等),响应速度远超传统服务器类似apache支持高并发连接 2.反向代理服务器 隐藏后端服务器IP地址,提高安全性 3.负载均衡服务器 支持多种策略分发流量…...
【Linux】shell脚本忽略错误继续执行
在 shell 脚本中,可以使用 set -e 命令来设置脚本在遇到错误时退出执行。如果你希望脚本忽略错误并继续执行,可以在脚本开头添加 set e 命令来取消该设置。 举例1 #!/bin/bash# 取消 set -e 的设置 set e# 执行命令,并忽略错误 rm somefile…...
循环冗余码校验CRC码 算法步骤+详细实例计算
通信过程:(白话解释) 我们将原始待发送的消息称为 M M M,依据发送接收消息双方约定的生成多项式 G ( x ) G(x) G(x)(意思就是 G ( x ) G(x) G(x) 是已知的)࿰…...
SCAU期末笔记 - 数据分析与数据挖掘题库解析
这门怎么题库答案不全啊日 来简单学一下子来 一、选择题(可多选) 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘:专注于发现数据中…...
【MATLAB代码】基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),附源代码|订阅专栏后可直接查看
文章所述的代码实现了基于最大相关熵准则(MCC)的三维鲁棒卡尔曼滤波算法(MCC-KF),针对传感器观测数据中存在的脉冲型异常噪声问题,通过非线性加权机制提升滤波器的抗干扰能力。代码通过对比传统KF与MCC-KF在含异常值场景下的表现,验证了后者在状态估计鲁棒性方面的显著优…...
日常一水C
多态 言简意赅:就是一个对象面对同一事件时做出的不同反应 而之前的继承中说过,当子类和父类的函数名相同时,会隐藏父类的同名函数转而调用子类的同名函数,如果要调用父类的同名函数,那么就需要对父类进行引用&#…...
深度学习之模型压缩三驾马车:模型剪枝、模型量化、知识蒸馏
一、引言 在深度学习中,我们训练出的神经网络往往非常庞大(比如像 ResNet、YOLOv8、Vision Transformer),虽然精度很高,但“太重”了,运行起来很慢,占用内存大,不适合部署到手机、摄…...
文件上传漏洞防御全攻略
要全面防范文件上传漏洞,需构建多层防御体系,结合技术验证、存储隔离与权限控制: 🔒 一、基础防护层 前端校验(仅辅助) 通过JavaScript限制文件后缀名(白名单)和大小,提…...
【java面试】微服务篇
【java面试】微服务篇 一、总体框架二、Springcloud(一)Springcloud五大组件(二)服务注册和发现1、Eureka2、Nacos (三)负载均衡1、Ribbon负载均衡流程2、Ribbon负载均衡策略3、自定义负载均衡策略4、总结 …...
前端工具库lodash与lodash-es区别详解
lodash 和 lodash-es 是同一工具库的两个不同版本,核心功能完全一致,主要区别在于模块化格式和优化方式,适合不同的开发环境。以下是详细对比: 1. 模块化格式 lodash 使用 CommonJS 模块格式(require/module.exports&a…...
