当前位置: 首页 > article >正文

RabbitMQ 消息顺序性保证

方式一:Consumer设置exclusive

在这里插入图片描述

注意条件

  • 作用于basic.consume
  • 不支持quorum queue
    在这里插入图片描述
    当同时有A、B两个消费者调用basic.consume方法消费,并将exclusive设置为true时,第二个消费者会抛出异常:
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - queue 'test' in vhost '/' in exclusive use, class-id=60, method-id=20)at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:223)at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:981)at com.dms.rabbitmq.TopicSender.lambda$main$2(TopicSender.java:63)at java.base/java.lang.Thread.run(Thread.java:840)

Spring AMQP 如何通过exclusive实现顺序消费:

在这里插入图片描述
核心逻辑

while (!DirectMessageListenerContainer.this.started && isRunning()) {this.cancellationLock.reset();try {for (String queue : queueNames) {consumeFromQueue(queue);}}catch (AmqpConnectException | AmqpIOException e) {long nextBackOff = backOffExecution.nextBackOff();if (nextBackOff < 0 || e.getCause() instanceof AmqpApplicationContextClosedException) {DirectMessageListenerContainer.this.aborted = true;shutdown();this.logger.error("Failed to start container - fatal error or backOffs exhausted",e);this.taskScheduler.schedule(this::stop, Instant.now());break;}this.logger.error("Error creating consumer; retrying in " + nextBackOff, e);doShutdown();try {Thread.sleep(nextBackOff); // NOSONAR}catch (InterruptedException e1) {Thread.currentThread().interrupt();}continue; // initialization failed; try again having rested for backOff-interval}DirectMessageListenerContainer.this.started = true;DirectMessageListenerContainer.this.startedLatch.countDown();
}
  1. 抛出异常后,会重试
  2. 重试间隔、次数受recoveryInterval(默认无限)、recoveryBackOff控制

方式二:single active consumer

在这里插入图片描述

原理:

在这里插入图片描述

代码示例

Channel ch = ...;
Map<String, Object> arguments = newHashMap<String, Object>();
arguments.put("x-single-active-consumer", true);
ch.queueDeclare("my-queue", false, false, false, arguments);

在这里插入图片描述
参考资料:https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams

相关文章:

RabbitMQ 消息顺序性保证

方式一&#xff1a;Consumer设置exclusive 注意条件 作用于basic.consume不支持quorum queue 当同时有A、B两个消费者调用basic.consume方法消费&#xff0c;并将exclusive设置为true时&#xff0c;第二个消费者会抛出异常&#xff1a; com.rabbitmq.client.AlreadyClosedEx…...

DeepSeek R1 简单指南:架构、训练、本地部署和硬件要求

DeepSeek R1 简单指南&#xff1a;架构、训练、本地部署和硬件要求 DeepSeek 的 LLM 推理新方法 DeepSeek 推出了一种创新方法&#xff0c;通过强化学习 (RL) 来提高大型语言模型 (LLM) 的推理能力&#xff0c;其最新论文 DeepSeek-R1 对此进行了详细介绍。这项研究代表了我们…...

1.攻防世界 unserialize3(wakeup()魔术方法、反序列化工作原理)

进入题目页面如下 直接开审 <?php // 定义一个名为 xctf 的类 class xctf {// 声明一个公共属性 $flag&#xff0c;初始值为字符串 111public $flag 111;// 定义一个魔术方法 __wakeup()// 当对象被反序列化时&#xff0c;__wakeup() 方法会自动调用public function __wa…...

麒麟系统编译安装git

有些版本的麒麟系统上没有git&#xff0c;官网又找不到现成的安装包&#xff0c;只好下载编译进行编译安装 1、下载源码 下载源码&#xff0c;地址&#xff1a;https://git-scm.com/downloads/linux。 2、解压 直接鼠标右键解压&#xff0c;或者用命令行&#xff1a; tar …...

Web - CSS3过渡与动画

过渡 基本使用 transition过渡属性是css3浓墨重彩的特性&#xff0c;过渡可以为一个元素在不同样式之间变化自动添加补间动画。 过渡从kIE10开始兼容&#xff0c;移动端兼容良好&#xff0c;网页上的动画特效基本都是由JavaScript定时器实现的&#xff0c;现在逐步改为css3过…...

Git 常见错误与解决方案全指南

&#x1f680; Git 常见错误与解决方案全指南 这份指南涵盖了你在 Git 操作过程中遇到的所有常见错误、问题及其对应的解决方案&#xff0c;确保你在日常开发中能够快速定位问题并高效解决。 &#x1f517; 1. 如何将本地项目上传到 GitHub 仓库&#xff1f; 步骤&#xff1a…...

OpenStack四种创建虚拟机的方式

实例&#xff08;Instances&#xff09;是在云内部运行的虚拟机。您可以从以下来源启动实例&#xff1a; 一、上传到镜像服务的镜像&#xff08;Image&#xff09; 使用已上传到镜像服务的镜像来启动实例。 二、复制到持久化卷的镜像&#xff08;Volume&#xff09; 使用已…...

线上hbase rs 读写请求个数指标重置问题分析

问题描述: 客户想通过调用hbase的jmx接口获取hbase的读写请求个数,以此来分析HBase读写请求每日增量。 但是发现生产,测试多个集群,Hbase服务指标regionserver读写请求个数存在突然下降到0或者大幅度下降情况。 需要排查原因: 某个Region的读写请求数:会发现经常会重置为…...

【R语言】卡方检验

一、定义 卡方检验是用来检验样本观测次数与理论或总体次数之间差异性的推断性统计方法&#xff0c;其原理是比较观测值与理论值之间的差异。两者之间的差异越小&#xff0c;检验的结果越不容易达到显著水平&#xff1b;反之&#xff0c;检验结果越可能达到显著水平。 二、用…...

2025.2.9机器学习笔记:PINN文献阅读

2025.2.9周报 文献阅读题目信息摘要Abstract创新点网络架构实验结论缺点以及后续展望 文献阅读 题目信息 题目&#xff1a; GPT-PINN:Generative Pre-Trained Physics-Informed Neural Networks toward non-intrusive Meta-learning of parametric PDEs期刊&#xff1a; Fini…...

c语言:取绝对值

假设我们有一个 long 类型的变量 l&#xff0c;我们希望恢复其绝对值。以下是两种方法的对比&#xff1a; 方法1&#xff1a;使用条件语句 这个很好理解&#xff0c;负数时取负运算 &#xff0c;用于数值的符号反转。 long abs_value(long l) {if (l < 0) {return -l;} e…...

JVM(Java 虚拟机)

Java语言的解释性和编译性&#xff08;通过JVM 的执行引擎&#xff09; Java 代码&#xff08;.java 文件&#xff09;要先使用 javac 编译器编译为 .class 文件&#xff08;字节码&#xff09;&#xff0c;紧接着再通过JVM 的执行引擎&#xff08;Execution Engine&#xff09…...

利用二分法进行 SQL 盲注

什么是sql注入&#xff1f; SQL 注入&#xff08;SQL Injection&#xff09;是一种常见的 Web 安全漏洞&#xff0c;攻击者可以通过构造恶意 SQL 语句来访问数据库中的敏感信息。在某些情况下&#xff0c;服务器不会直接返回查询结果&#xff0c;而是通过布尔值&#xff08;Tr…...

大模型数据集全面整理:444个数据集下载地址

本文针对Datasets for Large Language Models: A Comprehensive Survey 中的 444 个数据集&#xff08;涵盖8种语言类别和32个领域&#xff09;进行完整下载地址整理收集。 2024-02-28&#xff0c;由杨刘、曹家欢、刘崇宇、丁凯、金连文等作者编写&#xff0c;深入探讨了大型语…...

Ubuntu 下 nginx-1.24.0 源码分析 ngx_tm_t 类型

src\os\unix\ngx_time.h 中 typedef struct tm ngx_tm_t; tm 是 C 标准库中定义的一个结构体&#xff0c;通常用于表示日期和时间的信息。它通常定义在 <time.h> 头文件中 struct tm {int tm_sec; /* 秒&#xff0c;范围 0-59 */int tm_min; /* …...

Linux 创建进程 fork()、vfork() 与进程管理

Linux 创建进程 fork、vfork、进程管理 一、Linux的0号、1号、2号进程二、Linux的进程标识三、fork() 函数1、基本概念2、函数特点3、用法以及应用场景&#xff08;1&#xff09;父子进程执行不同的代码&#xff08;2&#xff09;进程执行另一个程序 4、工作原理 四、vfork() 函…...

2025web寒假作业二

一、整体功能概述 该代码构建了一个简单的后台管理系统界面&#xff0c;主要包含左侧导航栏和右侧内容区域。左侧导航栏有 logo、管理员头像、导航菜单和安全退出按钮&#xff1b;右侧内容区域包括页头、用户信息管理内容&#xff08;含搜索框和用户数据表格&#xff09;以及页…...

鸿蒙NEXT API使用指导之文件压缩和邮件创建

鸿蒙NEXT API 使用指导 一、前言二、邮件创建1、拉起垂类应用2、 UIAbilityContext.startAbilityByType 原型2.1、wantParam2.2、abilityStartCallback 与 callback 3、拉起邮箱类应用3.1、单纯拉起邮箱应用3.2、传入带附件的邮件 三、压缩文件1、认识 zlib2、压缩处理2.1、单文…...

javaEE-10.CSS入门

目录 一.什么是CSS ​编辑二.语法规则: 三.使用方式 1.行内样式: 2.内部样式: 3.外部样式: 空格规范 : 四.CSS选择器类型 1.标签选择器 2.类选择器 3.ID选择器 4.通配符选择器 5.复合选择器 五.常用的CSS样式 1.color:设置字体颜色 2.font-size:设置字体大小 3…...

Spring Boot牵手Redisson:分布式锁实战秘籍

一、引言 在当今的分布式系统架构中,随着业务规模的不断扩大和系统复杂度的日益增加,如何确保多个服务节点之间的数据一致性和操作的原子性成为了一个至关重要的问题。在单机环境下,我们可以轻松地使用线程锁或进程锁来控制对共享资源的访问,但在分布式系统中,由于各个服务…...

制药行业 BI 可视化数据分析方案

一、行业背景 随着医药行业数字化转型的深入&#xff0c;企业积累了海量的数据&#xff0c;包括销售数据、生产数据、研发数据、市场数据等。如何利用这些数据&#xff0c;挖掘其价值&#xff0c;为企业决策提供支持&#xff0c;成为医药企业面临的重大挑战。在当今竞争激烈的…...

[学习笔记] Kotlin Compose-Multiplatform

Compose-Multiplatform 原文&#xff1a;https://github.com/zimoyin/StudyNotes-master/blob/master/compose-multiplatform/compose.md Compose Multiplatform 是 JetBrains 为桌面平台&#xff08;macOS&#xff0c;Linux&#xff0c;Windows&#xff09;和Web编写Kotlin UI…...

ubutun系统常用配置

目录 1. 更新系统 2. 安装 vim 文本编辑器 3. 扩展文件系统 4. 设置静态IP地址&#xff08;可选&#xff09; 5. 安装图形驱动 6. 安装常用软件 7. 调整启动项 8. 清理系统 9. 配置SSH 10. 安装VNC服务器&#xff08;可选&#xff09; 11. 安装桌面环境&#xff08;…...

PHP函数介绍—get_headers(): 获取URL的响应头信息

概述&#xff1a;在PHP开发中&#xff0c;我们经常需要获取网页或远程资源的响应头信息。PHP函数get_headers()能够方便地获取目标URL的响应头信息&#xff0c;并以数组形式返回。本文将介绍get_headers()函数的用法&#xff0c;以及提供一些相关的代码示例。 get_headers()函…...

web前端录制canvas视频和video的声音,并合并成一个文件进行下载

一、captureStream ‌captureStream‌是一个Web API方法&#xff0c;用于捕获指定元素的媒体流。该方法通常用于从<video>、<audio>或<canvas>元素中捕获实时视频流或音频流&#xff0c;以便进行进一步的处理&#xff0c;如直播、录制或分析‌。 captureStr…...

Golang 并发机制-7:sync.Once实战应用指南

Go的并发模型是其突出的特性之一&#xff0c;但强大的功能也带来了巨大的责任。sync.Once是由Go的sync包提供的同步原语。它的目的是确保一段代码只执行一次&#xff0c;而不管有多少协程试图执行它。这听起来可能很简单&#xff0c;但它改变了并发环境中管理一次性操作的规则。…...

【AI实践】Cursor上手-跑通Hello World和时间管理功能

背景 学习目的&#xff1a;熟悉Cursor使用环境&#xff0c;跑通基本开发链路。 本人背景&#xff1a;安卓开发不熟悉&#xff0c;了解科技软硬件常识 实践 基础操作 1&#xff0c;下载安装安卓Android Studio 创建一个empty project 工程&#xff0c;名称为helloworld 2&am…...

深度学习 视频推荐

以下为你呈现一个基于深度学习实现视频推荐的简化代码示例。这里我们使用的是协同过滤思想结合神经网络的方式,借助 TensorFlow 和 Keras 库来构建模型。在这个示例中,假设已有用户对视频的评分数据,目标是预测用户对未评分视频的评分,进而为用户推荐可能感兴趣的视频。 1…...

缓存组件<keep-alive>

缓存组件<keep-alive> 1.组件作用 组件, 默认会缓存内部的所有组件实例&#xff0c;当组件需要缓存时首先考虑使用此组件。 2.使用场景 场景1&#xff1a;tab切换时&#xff0c;对应的组件保持原状态&#xff0c;使用keep-alive组件 使用&#xff1a;KeepAlive | Vu…...

SpringBoot单机模式的极限是什么?为什么会引入分布式?

Spring Boot 单机模式的极限 Spring Boot 单机模式的极限主要体现在以下几个方面&#xff1a; 硬件资源限制&#xff1a; CPU&#xff1a;单机性能受限于 CPU 核心数和主频&#xff0c;无法无限扩展。内存&#xff1a;内存容量有限&#xff0c;无法应对大规模数据处理或高并发…...