RabbitMQ消息可靠性(二)-- 消费者消息确认
一、消费者消息确认是什么?
在这种机制下,消费者在接收到消息后,需要向 RabbitMQ 发送确认信息,告知 RabbitMQ 已经接收到该消息,并已经处理完毕。如果 RabbitMQ 没有接收到确认信息,则会将该消息重新加入队列,等待其他消费者继续处理。
消费者消息确认机制能够保证消息不会因为消费者宕机或其他原因而丢失,从而保证了消息的可靠性和稳定性。
RabbitMQ 支持两种消费者消息确认机制:自动确认和手动确认。在自动确认模式下,消费者在接收到消息后,RabbitMQ 会自动将该消息标记为已经确认。在手动确认模式下,消费者需要向 RabbitMQ 显式地发送确认信息,才能完成消息的确认。
二、代码实现
1.修改application.yml 配置
spring:rabbitmq:listener:simple:# RabbitMQ开启手动确认acknowledge-mode: manual
而SpringAMQP则允许配置三种确认模式:
- manual:手动ack,需要在业务代码结束后,调用api发送ack。
- auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
- none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
2.消费者确认
生产者发送一笔需要消费的订单到Direct Exchange直连交换机
@GetMapping("/sendDirectMessage")@ApiOperation(value = "sendDirectMessage")@ApiOperationSupport(order = 1)public String sendDirectMessage(@RequestParam String orderNo){//设置消息唯一IDString uniqueId = "MQ"+ DateUtils.dateTimeNow("yyyyMMddHHmmss")+ RandomUtil.randomNumbers(4);CorrelationData correlationData = new CorrelationData(uniqueId);log.info("------生产者发送消息,消息唯一id {},订单编号 {}-------",uniqueId,orderNo);rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting",orderNo,correlationData);return "ok";}
下面是消费者的处理逻辑
这里的消息序号是系统自动生成的,还需要注意的是,在手动确认模式下,如果消费者在处理消息时发生了异常或错误的时候
需要确保将该消息重新加入队列或者删除队列之后将该信息保存至数据库中记录下来,否则该消息将被认为已经成功处理并确认。因此,在编写消费者代码时,需要谨慎处理异常情况,避免因为异常而导致消息丢失或重复处理等问题。
/*** 消费者,用于消费队列信息*/
@Component
@Slf4j
public class DirectConsumer {@ResourceRedisService redisService;@RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueuepublic void process(Message message, Channel channel) {// 消息序号long deliveryTag = message.getMessageProperties().getDeliveryTag();//取出消息唯一标识String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");// 取出订单编码String orderNo = new String(message.getBody(), StandardCharsets.UTF_8);log.info("------消费者收到消息,消息唯一id {},订单编号 {}-------",messageId,orderNo);try {//消费者在消费消息之前,先去redis中查看消息状态是否已被消费if (redisService.setCacheMapIfAbsent("rabbit-tag", messageId, Boolean.FALSE)){//删除过期订单.......//消费完消息后,设置key的值为trueredisService.setCacheMapValue("rabbit-tag", messageId, Boolean.TRUE);channel.basicAck(deliveryTag,false);log.info("------订单处理完毕,订单编号 {}--------", orderNo);}else {//如果从redis中获取消息的value是TRUE,表示已消费,直接发送确认信号,避免重复消费if (Boolean.TRUE.equals(redisService.getCacheMapValue("rabbit-tag",messageId))) {/*** TODO 手动确认消息* tag:消息序号* multiple:消息的标识,是否确认多条,false只确认当前一个消息收到,true确认所有consumer获得的消息(成功消费,消息从队列中删除*/channel.basicAck(deliveryTag, false);log.info("--------订单已经被消费过了,订单编号 {}-------", orderNo);}}} catch (Exception e) {e.printStackTrace();try {/*** TODO 消费者消费消息异常,手动否认信息,将消息退回到队列中* tag:消息序号* multiple:消息的标识,是否确认多条,false只确认当前一个消息收到,true确认所有consumer获得的消息(成功消费,消息从队列中删除* requeue:是否要退回到队列*/channel.basicNack(deliveryTag, true, false);redisService.setCacheMapValue("rabbit-tag", messageId, Boolean.FALSE);log.error("------------订单消费失败,已从队列删除.订单编号 {}, 原因 {}--------",orderNo, e.getMessage());} catch (IOException ex) {ex.printStackTrace();}}log.info("------消费者处理完毕-------");}
}相关文章:
RabbitMQ消息可靠性(二)-- 消费者消息确认
一、消费者消息确认是什么? 在这种机制下,消费者在接收到消息后,需要向 RabbitMQ 发送确认信息,告知 RabbitMQ 已经接收到该消息,并已经处理完毕。如果 RabbitMQ 没有接收到确认信息,则会将该消息重新加入…...
【python第7课 实例,类】
文章目录 一、实例1.1实例的变量1.2实例方法1.3 构造方法1.4析构函数1.4预置实例属性: 二,类1.1类变量1.2类方法1.3静态方法1.4类属性的增删改查 一、实例 1.1实例的变量 使用示例 class dog:def __init__(self,k,c,a):self.kinds kself.color csel…...
RocketMQ源码解析(上)
一、ACL权限控制 应用场景: RocketMQ提供了针对队列、用户等不同维度的非常全面的权限管理机制。通常来说,RocketMQ作为一个内部服务,是不需要进行权限控制的,但是,如果要通过RocketMQ进行跨部门甚至跨公司的合作&…...
Webpack打包CSS文件,解决You may need an appropriate loader to handle this file type报错
在项目文件夹下创建webpack.config.js文件,该文件就是Webpack的配置文件 注意:该文件中遵循Node.js的代码格式规范 ,需要对导出配置文件中的内容 Webpack在默认情况下只能打包js文件,如果我们希望他能够打包其他类型的文件&#…...
轮换对称性
二重积分 普通对称性–D关于 y x yx yx对称: ∬ D f ( x , y ) d σ { 2 ∬ D 1 f ( x , y ) d σ f ( x , y ) f ( y , x ) 0 f ( x , y ) − f ( y , x ) \iint_{D}f(x,y)d\sigma\begin{cases} 2\iint_{D_1}f(x,y)d\sigma\ \ \ \ \ \ f(x,y)f(y,x) \\ 0 \ \…...
【MySQL基础】--- 约束
个人主页:兜里有颗棉花糖 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 兜里有颗棉花糖 原创 收录于专栏【MySQL学习专栏】🎈 本专栏旨在分享学习MySQL的一点学习心得,欢迎大家在评论区讨论💌 目录 一、什么…...
ROS2 的行为树 — 第 1 部分:解锁高级机器人决策和控制
一、说明 在复杂而迷人的机器人世界中,行为树(BT)已成为决策过程中不可或缺的一部分。它们提供了一种结构化、模块化和高效的方法来对机器人的行为进行编程。BT起源于视频游戏行业,用于控制非玩家角色,他们在机器人领域…...
kafka事务的详解
一 kafka事务的机制 1.1 kafka的事务机制 通过事务机制,KAFKA 可以实现对多个 topic 的多个 partition 的原子性的写入,即处于同一个事务内的所有消息,不管最终需要落地到哪个 topic 的哪个 partition, 最终结果都是要么全部写成功…...
Flutter Fair逻辑动态化架构设计与实现
本文的核心内容包括: 数据逻辑处理布局中的逻辑处理Flutter类型数据处理一、数据逻辑处理 我们接触的每一个Flutter界面,大多由布局和逻辑相关的代码组成。如Flutter初始工程的Counting Demo的代码: class _MyHomePageState extends State<MyHomePage> {// 变量 int…...
【每日一题】74. 搜索二维矩阵
74. 搜索二维矩阵 - 力扣(LeetCode) 给你一个满足下述两条属性的 m x n 整数矩阵: 每行中的整数从左到右按非递减顺序排列。每行的第一个整数大于前一行的最后一个整数。 给你一个整数 target ,如果 target 在矩阵中,返…...
软件测试进大厂,拿高薪,怎么做?看这里!
有些同学大学专业不对口,但有想进大厂想拿高薪心,只要你有想法,那就一定有实现的方法。 俗话说:“世间无难事,只怕有心人”。仔细思索一下,哪家大厂能缺软件测试这一重要职位。相对大学所学专业而言&#…...
【读书笔记】基于世界500强的高薪实战Kubernetes课程
第1章 课程简介&&自我介绍 1-1 自我介绍 1-2 课程大纲内容介绍 1-3 课程更新通知 第2章 K8s必备知识-Docker容器基础入门 2-1 课程介绍 2-2 docker容器介绍 2-3 docker优缺点 2-4 安装和配置docker 2-5 修改内核参数 2-6 配置镜像加速器 2-7 配置常用镜像加…...
【Java 基础篇】Java并发包详解
多线程编程是Java开发中一个重要的方面,它能够提高程序的性能和响应能力。然而,多线程编程也伴随着一系列的挑战,如线程安全、死锁、性能问题等。为了解决这些问题,Java提供了一套强大的并发包。本文将详细介绍Java并发包的各个组…...
MYSQL存储引擎基础知识介绍
下面重点介绍几种常用的存储引擎,并对比各个存储引擎之间的区别,以帮助读者理解 不同存储引擎的使用方式。 MyISAM MyISAM是 MySQL的默认存储引擎。MyISAM不支持事务、也不支持外键,其优势是访 问的速度快,对事务完整性没有要求或者以 SEL…...
vue学习之element-ui组件集成
1. element-ui 链接 https://element.eleme.cn/#/zh-CN 2. element-ui 安装 cnpm install element-ui3. 创建项目 https://blog.csdn.net/qq_36940806/article/details/132921688?spm=1001.2014.3001.5502 4. 引入element库 /src/main.js 引入 element-uiimport Vue from…...
如何通过百度SEO优化提升网站排名(掌握基础概念,实现有效优化)
随着互联网的发展,搜索引擎优化(SEO)成为了网站优化中不可或缺的一部分。在中国,百度搜索引擎占据着主导地位,因此掌握百度SEO概念和优化技巧对网站的排名和曝光非常重要。 百度SEO排名的6个有效方法: 首…...
Golang 字符串
目录 1. Golang 字符串1.1. 基础概念1.2. 字符串编码1.3. 遍历字符串1.4. 类型转换1.5. 总结1.6. String Concatenation (字符串连接)1.6.1. Using the operator1.6.2. Using the operator1.6.3. Using the Join method1.6.4. Using Sprintf method1.6.5. Using Go string Bu…...
python应用中使用了multiprocessing多进程,使用pyinstaller打包出来的程序可能产生多个窗口
问题现象 我用pyside(类似pyqt)开发了一个应用程序。直接使用pycharm运行,一切都正常。但当我使用pyinstaller将它打包之后,再去运运行,发现窗口总是产生多个。 问题分析 直接运行没有问题,那么问题肯定…...
数据结构与算法——13.队列的拓展
这篇文章主要讲一下双端队列,优先队列,阻塞队列等队列的拓展内容。 目录 1.队列拓展概述 2.双端队列的链表实现 3.双端队列的数组实现 4.优先队列无序数组实现 5.阻塞队列 6.总结 1.队列拓展概述 首先来看一张图,来大致了解一下他们的…...
机器学习入门教学——损失函数(交叉熵法)
1、前言 我们在训练神经网络时,最常用到的方法就是梯度下降法。在了解梯度下降法前,我们需要了解什么是损失(代价)函数。所谓求的梯度,就是损失函数的梯度。如果不知道什么是梯度下降的,可以看一下这篇文章:机器学习入…...
谷歌浏览器插件
项目中有时候会用到插件 sync-cookie-extension1.0.0:开发环境同步测试 cookie 至 localhost,便于本地请求服务携带 cookie 参考地址:https://juejin.cn/post/7139354571712757767 里面有源码下载下来,加在到扩展即可使用FeHelp…...
css实现圆环展示百分比,根据值动态展示所占比例
代码如下 <view class""><view class"circle-chart"><view v-if"!!num" class"pie-item" :style"{background: conic-gradient(var(--one-color) 0%,#E9E6F1 ${num}%),}"></view><view v-else …...
【力扣数据库知识手册笔记】索引
索引 索引的优缺点 优点1. 通过创建唯一性索引,可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度(创建索引的主要原因)。3. 可以加速表和表之间的连接,实现数据的参考完整性。4. 可以在查询过程中,…...
YSYX学习记录(八)
C语言,练习0: 先创建一个文件夹,我用的是物理机: 安装build-essential 练习1: 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件,随机修改或删除一部分,之后…...
JUC笔记(上)-复习 涉及死锁 volatile synchronized CAS 原子操作
一、上下文切换 即使单核CPU也可以进行多线程执行代码,CPU会给每个线程分配CPU时间片来实现这个机制。时间片非常短,所以CPU会不断地切换线程执行,从而让我们感觉多个线程是同时执行的。时间片一般是十几毫秒(ms)。通过时间片分配算法执行。…...
使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台
🎯 使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台 📌 项目背景 随着大语言模型(LLM)的广泛应用,开发者常面临多个挑战: 各大模型(OpenAI、Claude、Gemini、Ollama)接口风格不统一;缺乏一个统一平台进行模型调用与测试;本地模型 Ollama 的集成与前…...
VM虚拟机网络配置(ubuntu24桥接模式):配置静态IP
编辑-虚拟网络编辑器-更改设置 选择桥接模式,然后找到相应的网卡(可以查看自己本机的网络连接) windows连接的网络点击查看属性 编辑虚拟机设置更改网络配置,选择刚才配置的桥接模式 静态ip设置: 我用的ubuntu24桌…...
MacOS下Homebrew国内镜像加速指南(2025最新国内镜像加速)
macos brew国内镜像加速方法 brew install 加速formula.jws.json下载慢加速 🍺 最新版brew安装慢到怀疑人生?别怕,教你轻松起飞! 最近Homebrew更新至最新版,每次执行 brew 命令时都会自动从官方地址 https://formulae.…...
tomcat入门
1 tomcat 是什么 apache开发的web服务器可以为java web程序提供运行环境tomcat是一款高效,稳定,易于使用的web服务器tomcathttp服务器Servlet服务器 2 tomcat 目录介绍 -bin #存放tomcat的脚本 -conf #存放tomcat的配置文件 ---catalina.policy #to…...
掌握 HTTP 请求:理解 cURL GET 语法
cURL 是一个强大的命令行工具,用于发送 HTTP 请求和与 Web 服务器交互。在 Web 开发和测试中,cURL 经常用于发送 GET 请求来获取服务器资源。本文将详细介绍 cURL GET 请求的语法和使用方法。 一、cURL 基本概念 cURL 是 "Client URL" 的缩写…...
