当前位置: 首页 > news >正文

Spring AMQP-保证消费者消息的可靠性

为什么要保证消息的可靠性?

当MQ向消费者发送一个消息之后需要得到消费者的状态,因为消息并不一定就真的被消费者给消费了,可能在消费的过程中出现了一些意外,比如

1. 网络问题

2. 消息转换有问题

3. 消费者本身的业务处理有问题


消费者确认机制

消费者消息处理状态:

  • ack:消息成功接收,并且成功被处理,MQ将此消息删除
  • nack:消息处理失败,需要MQ重新发送消息
  • reject:消息处理失败并且拒绝该消息,MQ将此消息删除

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用

  • manual:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活

  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:

    • 如果是业务异常,会自动返回nack

    • 如果是消息处理或校验异常,自动返回reject;


在配置文件中通过下面的配置即可设置ACK的处理方式

spring:rabbitmq:listener:simple:acknowledge-mode: c # none 默认 auto 自动确认 manual 手动确认 

消费者重试机制

消费者接收了一个消息,但是在处理的过程中出现异常了,那么AMQP会不断的重试,直到把资源占完然后崩掉,这个时候就必须要设置重试机制,限制重试的次数,避免无限制重试。

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

测试

先别配置重试机制,然后在需要在接收消息的地方手动抛出一个异常,查看控制台就会看见消费者在尝试不断的获取消息,但是一直获取不到无限制的重试

  @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "fanout.hamll.query1", // 队列名称durable = "true"), // 是否持久exchange = @Exchange(name = "fanout.hamll", type = ExchangeTypes.FANOUT) // 交换机名称))public void query2(String message) {System.err.println("fanout.hamll.query1 消息内容:" + message); throw  new RuntimeException("故意的错误"); // 抛出异常}

配置好重试之后到了三次就会直接停止,这样子就很好的减少了系统资源的消耗


业务的幂等性判断

什么是幂等性?

在Java领域,幂等性是指同一个请求,不管发送多少次执行的结构都是一样的。

比如支付和交易,支付成功之后通知交易服务修改状态。在交易服务需要查询订单并判断订单的状态,这样子不管同一个订单重复发起多少次请求,都不会对业务的结果造成影响。


MQ保证消息的幂等性

MQ中的幂等是说,不管消息是否被重复消费,都不会对业务造成影响、处理的结果都是一致的。


MQ实现业务幂等性

为每个消息都创建一个唯一的MessageId在操作的时候将其存入数据库,然后在进行判断消息是否存在,存在就直接跳过业务的处理,不存在就继续操作。

    @Beanpublic MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}

业务实现幂等性

在业务的操作中,比如支付和交易服务,支付成功之后会通知交易服务修改订单的状态,而在交易服务应该做判断,判断该订单的状态是否未未支付。如果是未支付就继续处理接下来的业务,否则就直接结束。

package com.hmall.trade.listener;import com.hmall.trade.domain.po.Order;
import com.hmall.trade.service.IOrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class PayStatusListener {@AutowiredIOrderService orderService;@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "trade.pay.success.queue", durable = "true"),exchange = @Exchange(value = "pay.direct", type = ExchangeTypes.DIRECT),key = "pay.success"))public void paySuccess(Long orderId) {log.info("支付成功,订单号:{}", orderId);//查询当前订单 判断幂等性Order order = orderService.getById(orderId);//判断状态以及对象是否存在if (order == null || order.getStatus() != 1) {return;}orderService.markOrderPaySuccess(orderId);}
}

相关文章:

Spring AMQP-保证消费者消息的可靠性

为什么要保证消息的可靠性? 当MQ向消费者发送一个消息之后需要得到消费者的状态,因为消息并不一定就真的被消费者给消费了,可能在消费的过程中出现了一些意外,比如 1. 网络问题 2. 消息转换有问题 3. 消费者本身的业务处理有问题 …...

Linux(Centos 7.6)命令详解:mkdir

1.命令作用 如果目录还不存在,则创建目录(Create the DIRECTORY, if they do not already exist.) 2.命令语法 Usage: mkdir [OPTION]... DIRECTORY... 3.参数详解 OPTION: -m, --modeMODE,创建新目录同时设置权限模式-p, --parents,创…...

在K8S上部署OceanBase的最佳实践

在K8S上部署OceanBase的最佳实践 目录 1. 背景与选型 1.1 为什么选择OB1.2 为什么选择ob-operator实现OB on K8S 2. 部署实操 2.1 环境准备2.2 安装 ob-operator2.3 配置 OB 集群2.4 配置 OBProxy 集群2.5 Headless Service 和 CoreDNS 配置2.6 监控与运维 2.6.1 Promethues部…...

IDEA中Maven依赖包导入失败报红的潜在原因

在上网试了别人的八个问题总结之后依然没有解决&#xff1a; IDEA中Maven依赖包导入失败报红问题总结最有效8种解决方案_idea导入依赖还是报红-CSDN博客https://blog.csdn.net/qq_43705131/article/details/106165960 江郎才尽之后突然想到一个原因&#xff1a;<dep…...

【计算机网络】课程 实验五 静态路由配置

实验五 静态路由配置 一、实验目的 理解静态路由的工作原理&#xff0c;掌握如何配置静态路由。 二、实验分析与设计 【背景描述】 假设校园网分为 2 个区域&#xff0c;每个区域内使用 1 台路由器连接 2 个子网&#xff0c; 现要在路由器上 做适当配置&#xff0c;实现校…...

基于单片机的数字气压计设计

摘要:在嵌入式技术快速发展过程中&#xff0c;智能测量仪器被广泛应用于工业生产以及人们日常生活领域。数字气压计在实际应用中&#xff0c;利用气压传感器检测环境中的压力大小&#xff0c;便于实现对设备进行智能化的控制操作。数字气压计在气象监测、矿产开采、科学实验等环…...

【Docker项目实战】使用Docker部署Typemill轻量级平面文件CMS

【Docker项目实战】使用Docker部署Typemill轻量级平面文件CMS 一、Typemill介绍1.1 Typemill简介1.2 主要特点1.3 主要使用场景二、本次实践规划2.1 本地环境规划2.2 本次实践介绍三、本地环境检查3.1 检查Docker服务状态3.2 检查Docker版本3.3 检查docker compose 版本四、下载…...

react ts 定义基本类型,组件通过ref调用时类型提示

记录&#xff0c;以防忘记 子组件 import React, { forwardRef, Ref, useImperativeHandle, useState } from react;// 类型定义方式1 interface IProps {/**参数1 */params1: number | string | undefined/**参数2 */params2: number | string | undefined/**方法 */openDia…...

二十三种设计模式-原型模式

原型模式&#xff08;Prototype Pattern&#xff09;是一种创建型设计模式&#xff0c;它通过拷贝现有的实例来创建新的实例&#xff0c;而不是通过新建实例。这种方式可以避免复杂的构造过程&#xff0c;同时还能保持对象的创建和使用分离&#xff0c;提高系统的灵活性和扩展性…...

提升汽车金融租赁系统的效率与风险管理策略探讨

内容概要 在汽车金融租赁系统这个复杂的生态中&#xff0c;提升整体效率是每个企业都渴望达成的目标。首先&#xff0c;优化业务流程是实现高效运行的基础。通过分析目前的流程&#xff0c;找出冗余环节并进行简化&#xff0c;能够帮助企业缩短审批时间&#xff0c;提高客户满…...

Spring Framework 5.3.x源码构建 (jdk-1.8, gradle 7.5.1, idea2024.3)

1、下载jdk安装并配置环境变量&#xff08;自行百度&#xff09; https://www.oracle.com/java/technologies/downloads/#java8 2、下载spring-framework源码&#xff0c;切换分支到5.3.x https://github.com/spring-projects/spring-framework.git 备用地址 https://gitco…...

leetcode 2241. 设计一个 ATM 机器 中等

一个 ATM 机器&#xff0c;存有 5 种面值的钞票&#xff1a;20 &#xff0c;50 &#xff0c;100 &#xff0c;200 和 500 美元。初始时&#xff0c;ATM 机是空的。用户可以用它存或者取任意数目的钱。 取款时&#xff0c;机器会优先取 较大 数额的钱。 比方说&#xff0c;你想…...

IO模型与NIO基础

File类 File类主要是JAVA为文件这块的操作(如删除、新建等)而设计的相关类File类的包名是java.io&#xff0c;其实现了Serializable, Comparable两大接口以便于其对象可序列化和比较 创建一个文件/文件夹 删除文件/文件夹 获取文件/文件夹 判断文件/文件夹是否存在 对文件夹进…...

上门按摩系统架构与功能分析

一、系统架构 服务端&#xff1a;Java&#xff08;最低JDK1.8&#xff0c;支持JDK11以及JDK17&#xff09;数据库&#xff1a;MySQL数据库&#xff08;标配5.7版本&#xff0c;支持MySQL8&#xff09;ORM框架&#xff1a;Mybatis&#xff08;集成通用tk-mapper&#xff0c;支持…...

ubuntu安装ssh9.2

删除旧版本&#xff1a; dpkg --list|grep ssh apt remove sshwget https://ftp.openbsd.org/pub/OpenBSD/OpenSSH/portable/openssh-9.2p1.tar.gz tar xzvf openssh-9.2p1.tar.gz cd openssh-9.2p1 #下载依赖#开始编译安装 ./configure && make && make inst…...

linux wsl配置 redis远程连接

✅ 1. 修改 Redis 配置文件 在 WSL 的 Redis 配置文件中&#xff0c;找到 redis.conf 或 /etc/redis/redis.conf 文件&#xff0c;编辑以下配置项&#xff1a; ➡️ 更新 bind 配置项 将 bind 127.0.0.1 ::1 修改为&#xff1a; bind 0.0.0.0这样&#xff0c;Redis 将监听所…...

JVM 优化指南

JVM 优化指南 1. JVM 参数配置 1.1 基础参数配置 设置堆内存大小 -Xms2048m -Xmx2048m 设置新生代大小 -Xmn1024m 设置元空间大小 -XX:MetaspaceSize256m -XX:MaxMetaspaceSize256m 设置线程栈大小 -Xss512k1.2 垃圾回收器配置 使用 G1 垃圾回收器 -XX:UseG1GC 设置期望停顿…...

关机重启后,GitLab服务异常

整理机房,关闭了所有主机重新上架。 上架后开机,所有主机硬件启动正常。 其中一台GitLab服务器启动正常,使用gitlab-ctl status查看服务业正常。 但使用web登陆却失败,如下图: 反复测试,发现无论使用正确密码还是错误密码都是同样的提示。很大可能是数据库的问题。 使…...

谷粒商城-高级篇完结-Sleuth+Zipkin 服务链路追踪

1、基本概念和整合 1.1、为什么用 微服务架构是一个分布式架构&#xff0c;它按业务划分服务单元&#xff0c;一个分布式系统往往有很多个服务单元。由于服务单元数量众多&#xff0c;业务的复杂性&#xff0c;如果出现了错误和异常&#xff0c;很难去定位 。主要体现在&#…...

C语言基本知识复习浓缩版:标识符、函数、进制、数据类型

C语言基本知识复习浓缩版&#xff1a;标识符、函数、进制、数据类型 【c语言期末复习3小时速成【完整全集】期末速成含考试题c语言期末速成突击复习C语言补考C语言期末大一】 B站看到的复习C语言视频&#xff0c;感觉非常棒&#xff0c;就跟着进行了一下学习。众所周知&#…...

C++实现分布式网络通信框架RPC(3)--rpc调用端

目录 一、前言 二、UserServiceRpc_Stub 三、 CallMethod方法的重写 头文件 实现 四、rpc调用端的调用 实现 五、 google::protobuf::RpcController *controller 头文件 实现 六、总结 一、前言 在前边的文章中&#xff0c;我们已经大致实现了rpc服务端的各项功能代…...

超短脉冲激光自聚焦效应

前言与目录 强激光引起自聚焦效应机理 超短脉冲激光在脆性材料内部加工时引起的自聚焦效应&#xff0c;这是一种非线性光学现象&#xff0c;主要涉及光学克尔效应和材料的非线性光学特性。 自聚焦效应可以产生局部的强光场&#xff0c;对材料产生非线性响应&#xff0c;可能…...

macOS多出来了:Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用

文章目录 问题现象问题原因解决办法 问题现象 macOS启动台&#xff08;Launchpad&#xff09;多出来了&#xff1a;Google云端硬盘、YouTube、表格、幻灯片、Gmail、Google文档等应用。 问题原因 很明显&#xff0c;都是Google家的办公全家桶。这些应用并不是通过独立安装的…...

ETLCloud可能遇到的问题有哪些?常见坑位解析

数据集成平台ETLCloud&#xff0c;主要用于支持数据的抽取&#xff08;Extract&#xff09;、转换&#xff08;Transform&#xff09;和加载&#xff08;Load&#xff09;过程。提供了一个简洁直观的界面&#xff0c;以便用户可以在不同的数据源之间轻松地进行数据迁移和转换。…...

CMake 从 GitHub 下载第三方库并使用

有时我们希望直接使用 GitHub 上的开源库,而不想手动下载、编译和安装。 可以利用 CMake 提供的 FetchContent 模块来实现自动下载、构建和链接第三方库。 FetchContent 命令官方文档✅ 示例代码 我们将以 fmt 这个流行的格式化库为例,演示如何: 使用 FetchContent 从 GitH…...

成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战

在现代战争中&#xff0c;电磁频谱已成为继陆、海、空、天之后的 “第五维战场”&#xff0c;雷达作为电磁频谱领域的关键装备&#xff0c;其干扰与抗干扰能力的较量&#xff0c;直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器&#xff0c;凭借数字射…...

ios苹果系统,js 滑动屏幕、锚定无效

现象&#xff1a;window.addEventListener监听touch无效&#xff0c;划不动屏幕&#xff0c;但是代码逻辑都有执行到。 scrollIntoView也无效。 原因&#xff1a;这是因为 iOS 的触摸事件处理机制和 touch-action: none 的设置有关。ios有太多得交互动作&#xff0c;从而会影响…...

零基础在实践中学习网络安全-皮卡丘靶场(第九期-Unsafe Fileupload模块)(yakit方式)

本期内容并不是很难&#xff0c;相信大家会学的很愉快&#xff0c;当然对于有后端基础的朋友来说&#xff0c;本期内容更加容易了解&#xff0c;当然没有基础的也别担心&#xff0c;本期内容会详细解释有关内容 本期用到的软件&#xff1a;yakit&#xff08;因为经过之前好多期…...

JAVA后端开发——多租户

数据隔离是多租户系统中的核心概念&#xff0c;确保一个租户&#xff08;在这个系统中可能是一个公司或一个独立的客户&#xff09;的数据对其他租户是不可见的。在 RuoYi 框架&#xff08;您当前项目所使用的基础框架&#xff09;中&#xff0c;这通常是通过在数据表中增加一个…...

LangChain知识库管理后端接口:数据库操作详解—— 构建本地知识库系统的基础《二》

这段 Python 代码是一个完整的 知识库数据库操作模块&#xff0c;用于对本地知识库系统中的知识库进行增删改查&#xff08;CRUD&#xff09;操作。它基于 SQLAlchemy ORM 框架 和一个自定义的装饰器 with_session 实现数据库会话管理。 &#x1f4d8; 一、整体功能概述 该模块…...