当前位置: 首页 > news >正文

RabbitMq-接收消息+redis消费者重复接收

在接触RammitMQ时,好多文章都说在配置中设置属性 

# rabbitmq 配置
rabbitmq:host: xxx.xxx.xxx.xxxport: xxxxusername: xxxpassword: xxxxxx## 生产端配置# 开启发布确认,就是confirm模式. 消费端ack应答后,才将消息从队列中删除#确认消息已发送到队列(Queue)publisher-returns: true#确认消息已发送到交换机(Exchange)publisher-confirm-type: correlatedlistener: #消费者 端配置retry:enabled: true # 是否支持重试default-requeue-rejected: falsemax-attempts: 5 #最大重试次数initial-interval: 3000 # 重试时间间隔direct:acknowledge-mode: manualsimple:acknowledge-mode: manual

消息接收消息失败时,可以重复调用5次;按照此操作,发现没有重复调用。

----------------------------------正确思路---------------------------------------------------------------------------------

设置完配置文件属性后,在代码中利用redis与channel.basicNack联合使用,将错误记录保存至数据库,方便查找原因;

---------------------------------------代码

package com.charg.listener;import com.charg.common.constant.CacheConstants;
import com.charg.common.constant.Constants;
import com.charg.common.utils.JsonUtils;
import com.charg.common.utils.redis.RedisUtils;
import com.charg.constant.RabbitConstants;
import com.charg.product.domain.bo.ProductDeviceBo;
import com.charg.product.domain.bo.RabMsgLogBo;
import com.charg.product.service.IProductDeviceService;
import com.charg.product.service.IRabMsgLogService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.time.Duration;/*** rabbitmq 监听*/
@Slf4j
@Component
public class RabbitQueueListener {/*** 最大重试次数*/private static int maxReconsumeCount = 3;@Autowiredprivate StringRedisTemplate redisTemplate;/*** 监听  队列的处理器** @param message*/@RabbitListener(queues = "队列名称")@RabbitHandlerpublic void onMessage(Message message, Channel channel) {//唯一标识String messageId = message.getMessageProperties().getMessageId();try {//判断messageId在redis中是否存在if (verificationMessageId(messageId)) {log.error("消息已重复处理,拒绝再次接收...");// 拒绝消息channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} else {//不存在 则处理消息// 接收消息if (StringUtils.isNotBlank(new String(message.getBody()))) {//修改业务逻辑if (!false) {log.error("消息即将再次返回队列处理...逻辑错误");// 处理最大回调次数getMaximumNumber(message, channel);} else {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//加入缓存addMessageId(message);}} else {log.info("消息为空拒绝接收...");// 拒绝消息channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);}}} catch (Exception e) {e.printStackTrace();try {if (message.getMessageProperties().getRedelivered()) {log.error("消息已重复处理,拒绝再次接收----...");// 拒绝消息channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} else {log.error("消息即将再次返回队列处理...");// 处理最大回调次数getMaximumNumber(message, channel);}} catch (Exception exception) {exception.printStackTrace();}}}/*** 记录消息最大次数** @param message* @param channel* @throws IOException*/private void getMaximumNumber(Message message, Channel channel) {try {int recounsumeCounts = RedisUtils.getCacheObject("messageMaxKey"+message.getMessageProperties().getMessageId()) == null ? 0 : RedisUtils.getCacheObject("messageMaxKey"+message.getMessageProperties().getMessageId());if (maxReconsumeCount > recounsumeCounts) {log.info("maxMessageId(message.getMessageProperties().getMessageId())=" + recounsumeCounts);channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);// 记录重试次数maxMessageId(message.getMessageProperties().getMessageId());} else {log.info("次数达到三次了呢---------" + RedisUtils.getCacheObject(CacheConstants.MESSAGE_MAX_KEY + message.getMessageProperties().getMessageId()));// 将消息重新放回队列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);// 清除缓存RedisUtils.deleteObject("messageMaxKey" + message.getMessageProperties().getMessageId());//重试三次后,还是失败 需记录到数据库addRabMsgLog(message);}} catch (Exception e) {e.printStackTrace();}}/*** 设置消息的最大重试次数*/public void maxMessageId(String messageId) {String messageMax ="messageMaxKey"+ messageId;// 存入缓存,用来记录该消息重试了几次if (RedisUtils.hasKey(messageMax)) {RedisUtils.incrAtomicValue(messageMax);} else {//错误的消息-插入数据库RedisUtils.setCacheObject(messageMax, 1, Duration.ofHours(Constants.MESSAGE_TIME));}}/*** 校验消息是否消费过该消息** @param messageId 消息id* @return*/public boolean verificationMessageId(String messageId) {// 消息是否存在keyString verifyIsExistKey ="messageExistKey" + messageId;if ((RedisUtils.hasKey(verifyIsExistKey))) {return true;}return false;}/*** 保存消费过消息** @param message 消息* @return*/public void addMessageId(Message message) {// 存入缓存RedisUtils.setCacheObject("messageExistKey" + message.getMessageProperties().getMessageId(), message.getMessageProperties().getMessageId(), 1);}/*** 消息队列 失败日志 操作* 自己存数据库逻辑*/public void addRabMsgLog(Message message) {log.info("====操作日志===");//将内容记录到数据库}}
--------------------------------数据库表

 

相关文章:

RabbitMq-接收消息+redis消费者重复接收

在接触RammitMQ时,好多文章都说在配置中设置属性 # rabbitmq 配置 rabbitmq:host: xxx.xxx.xxx.xxxport: xxxxusername: xxxpassword: xxxxxx## 生产端配置# 开启发布确认,就是confirm模式. 消费端ack应答后,才将消息从队列中删除#确认消息已发送到队列(Queue)pub…...

Orangepi Zero2 全志H616简介

为什么学 学习目标依然是Linux 系统 ,平台是 ARM 架构 蜂巢快递柜,配送机器人,这些应用场景用C51,STM32单片机无法实现 第三方介入库的局限性,比如刷脸支付和公交车收费设备需要集成支付宝SDK,提供的libalipay.so 是…...

Golang每日一练(leetDay0047)

目录 138. 复制带随机指针的链表 Copy List with Random-pointer 🌟🌟 139. 单词拆分 Word Break 🌟🌟 140. 单词拆分 II Word Break II 🌟🌟🌟 🌟 每日一练刷题专栏 &…...

HCL Nomad Web 1.0.7发布和新功能验证

大家好,才是真的好。 要问在HCL Notes/Domino系列产品中,谁更新得最快,那么答案一定是HCL Nomad Web。 你看上图右边,从1.0.1更新到1.0.7,都没花多少时间。 从HCL Nomad Web 1.0.5版本开始,可以支持直接…...

春招网申简历填写三技巧

网申第一关很重要,不夸张的说网申决定了你的笔试机会,从如信银行考试中心了解到,银行网申筛选过程中,有机器筛选人工筛选两道程序,掌握填写技巧后对提升简历通过率有较大帮助,一定要把握住,关于…...

计算机网络基础知识总结

经过学习我们可以知道: 关于计算机网络: ip地址端口号协议协议分层TCP五层协议协议封装两台计算机之间的通信 目录 ip地址 端口号 协议 协议分层 五层协议体系结构 (1) 应用层 (2) 运输层 (3) 网络层 (4)数据链路层 (5)物理层 封装&分用 两台主机之间的通信 …...

(下)苹果有开源,但又怎样呢?

一开始,因为 MacOS X ,苹果与 FreeBSD 过往从密,不仅挖来 FreeBSD 创始人 Jordan Hubbard,更是在此基础上开源了 Darwin。但是,苹果并没有给予 Darwin 太多关注,作为苹果的首个开源项目,它算不上…...

row_number 和 cte 使用实例:考场监考安排

row_number 和 cte 使用实例:考场监考安排 考场监考安排使用 cte 模拟两个表的原始数据使用 master..spt_values 进行数据填充优先安排时长较长的考试使用 cte 安排第一个需要安排的科目统计老师已有的监考时长尝试使用 cte 递归,进行下一场考试安排&…...

2023天梯赛记录

文章目录 L2-001 紧急救援L2-002 链表去重L2-004 这是二叉搜索树吗?L2-005 集合相似度L2-006 树的遍历L2-007 家庭房产L2-010 排座位L2-011 玩转二叉树L2-012 关于堆的判断L2-013 红色警报L2-014 列车调度L2-016 愿天下有情人都是失散多年的兄妹L2-019 悄悄关注L2-0…...

被吐槽 GitHub仓 库太大,直接 600M 瘦身到 6M,这下舒服了

大家好,我是小富~ 前言 忙里偷闲学习了点技术写了点demo代码,打算提交到我那 2000Star 的Github仓库上,居然发现有5个Issues,最近的一条日期已经是2022/8/1了,以前我还真没留意过这些,我这人懒…...

OpenGL(三)——着色器

目录 一、前言 二、Shader 2 Shader 2.1 顶点着色器 2.2 片段着色器 三、APP 2 Shader 四、顶点颜色属性 五、着色器类C 一、前言 着色器Shader是运行在GPU上的小程序,为图形渲染管线的某个特定部分而运行。各阶段着色器之间无法通信,只有输入和输…...

【MySQL】单表查询

一、表的准备 查询操作的SQL演示将基于下面这四张表进行,我们先创建好这四张数据表,并为其添加数据。 1、第一张表为部门表,名称为包含三个字段:部门编号(deptno),部门名称(dname&…...

第一章 安装Unity

使用Unity开发游戏的话,首先要安装Unity Hub和Unity Editor两个软件。大家可以去官方地址下载:https://unity.cn/releases/full/2020 (这里我们选择的是2020版本) Unity Hub 是安装 Unity Editor、创建项目、管理帐户和许可证的主…...

20230425----重返学习-vue项目-vue自定义指令-vue-cli的配置

day-057-fifty-seven-20230425-vue项目-vue自定义指令-vue-cli的配置 vue项目 vuex版 普通版纯axios:切换页面,就会重新发送一次ajax请求普通版升级:vuex版vuex的常用功能 vuex 数据通信vuex 缓存数据 前进后退,切换页面&#…...

el-input 只能输入整数(包括正数、负数、0)或者只能输入整数(包括正数、负数、0)和小数

使用el-input-number标签 也可以使用typenumbe和v-model.number属性&#xff0c;两者结合使用&#xff0c;能满足大多数需求&#xff0c;如果还不满足&#xff0c;可以再结合正则表达式过滤 <el-input v-model.number"value" type"number" /> el-i…...

Docker Compose的常用命令与docker-compose.yml脚本属性配置

Docker Compose的常用命令与配置 常见命令ps&#xff1a;列出所有运行容器logs&#xff1a;查看服务日志输出port&#xff1a;打印绑定的公共端口build&#xff1a;构建或者重新构建服务start&#xff1a;启动指定服务已存在的容器stop&#xff1a;停止已运行的服务的容器&…...

with语句和上下文管理器(py编程)

1. with语句的使用 基础班向文件中写入数据的示例代码: # 1、以写的方式打开文件f open("1.txt", "w")# 2、写入文件内容f.write("hello world")# 3、关闭文件f.close()代码说明: 文件使用完后必须关闭&#xff0c;因为文件对象会占用操作系统…...

《JavaEE初阶》HTTP协议和HTTPS

《JavaEE初阶》HTTP协议和HTTPS 文章目录 《JavaEE初阶》HTTP协议和HTTPSHTTP协议是应用层协议:使用Fiddler抓取HTTP请求和响应:Fiddler的下载和基本使用:Fiddler的中间代理人身份:其他抓包工具: 先简单认识HTTP请求与HTTP响应:HTTP请求:HTTP响应: HTTP请求详解:首行&#xff1…...

微信小程序 | 基于高德地图+ChatGPT实现旅游规划小程序

&#x1f388;&#x1f388;效果预览&#x1f388;&#x1f388; ❤ 路劲规划 ❤ 功能总览 ❤ ChatGPT交互 一、需求背景 五一假期即即将到来&#xff0c;在大家都阳过之后&#xff0c;截止到目前这应该是最安全的一个假期。所以出去旅游想必是大多数人的选择。 然后&#x…...

Excel技能之实用技巧,高手私藏

今天来讲一下Excel技巧&#xff0c;工作常用&#xff0c;高手私藏。能帮到你是我最大的荣幸。 与其加班熬夜赶进度&#xff0c;不如下班学习提效率。能力有成长&#xff0c;效率提上去&#xff0c;自然不用加班。 消化吸收&#xff0c;工作中立马使用&#xff0c;感觉真不错。…...

<6>-MySQL表的增删查改

目录 一&#xff0c;create&#xff08;创建表&#xff09; 二&#xff0c;retrieve&#xff08;查询表&#xff09; 1&#xff0c;select列 2&#xff0c;where条件 三&#xff0c;update&#xff08;更新表&#xff09; 四&#xff0c;delete&#xff08;删除表&#xf…...

shell脚本--常见案例

1、自动备份文件或目录 2、批量重命名文件 3、查找并删除指定名称的文件&#xff1a; 4、批量删除文件 5、查找并替换文件内容 6、批量创建文件 7、创建文件夹并移动文件 8、在文件夹中查找文件...

MFC内存泄露

1、泄露代码示例 void X::SetApplicationBtn() {CMFCRibbonApplicationButton* pBtn GetApplicationButton();// 获取 Ribbon Bar 指针// 创建自定义按钮CCustomRibbonAppButton* pCustomButton new CCustomRibbonAppButton();pCustomButton->SetImage(IDB_BITMAP_Jdp26)…...

python/java环境配置

环境变量放一起 python&#xff1a; 1.首先下载Python Python下载地址&#xff1a;Download Python | Python.org downloads ---windows -- 64 2.安装Python 下面两个&#xff0c;然后自定义&#xff0c;全选 可以把前4个选上 3.环境配置 1&#xff09;搜高级系统设置 2…...

(二)原型模式

原型的功能是将一个已经存在的对象作为源目标,其余对象都是通过这个源目标创建。发挥复制的作用就是原型模式的核心思想。 一、源型模式的定义 原型模式是指第二次创建对象可以通过复制已经存在的原型对象来实现,忽略对象创建过程中的其它细节。 📌 核心特点: 避免重复初…...

跨链模式:多链互操作架构与性能扩展方案

跨链模式&#xff1a;多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈&#xff1a;模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展&#xff08;H2Cross架构&#xff09;&#xff1a; 适配层&#xf…...

HashMap中的put方法执行流程(流程图)

1 put操作整体流程 HashMap 的 put 操作是其最核心的功能之一。在 JDK 1.8 及以后版本中&#xff0c;其主要逻辑封装在 putVal 这个内部方法中。整个过程大致如下&#xff1a; 初始判断与哈希计算&#xff1a; 首先&#xff0c;putVal 方法会检查当前的 table&#xff08;也就…...

基于Springboot+Vue的办公管理系统

角色&#xff1a; 管理员、员工 技术&#xff1a; 后端: SpringBoot, Vue2, MySQL, Mybatis-Plus 前端: Vue2, Element-UI, Axios, Echarts, Vue-Router 核心功能&#xff1a; 该办公管理系统是一个综合性的企业内部管理平台&#xff0c;旨在提升企业运营效率和员工管理水…...

[ACTF2020 新生赛]Include 1(php://filter伪协议)

题目 做法 启动靶机&#xff0c;点进去 点进去 查看URL&#xff0c;有 ?fileflag.php说明存在文件包含&#xff0c;原理是php://filter 协议 当它与包含函数结合时&#xff0c;php://filter流会被当作php文件执行。 用php://filter加编码&#xff0c;能让PHP把文件内容…...

深入理解Optional:处理空指针异常

1. 使用Optional处理可能为空的集合 在Java开发中&#xff0c;集合判空是一个常见但容易出错的场景。传统方式虽然可行&#xff0c;但存在一些潜在问题&#xff1a; // 传统判空方式 if (!CollectionUtils.isEmpty(userInfoList)) {for (UserInfo userInfo : userInfoList) {…...