当前位置: 首页 > news >正文

RabbitMQ之顺序消费

什么是顺序消费
例如:业务上产生者发送三条消息, 分别是对同一条数据的增加、修改、删除操作, 如果没有保证顺序消费,执行顺序可能变成删除、修改、增加,这就乱了。
如何保证顺序性
一般我们讨论如何保证消息的顺序性,会从下面三个方面考虑
1:发送消息的顺序
2:队列中消息的顺序
3:消费消息的顺序
发送消息的顺序
消息发送端的顺序,大部分业务不做要求,谁先发消息无所谓,如果遇到业务一定要发送消息也确保顺序,那意味着,只能全局加锁一个个的操作,一个个的发消息,不能并发发送消息。

队列中消息的顺序
RabbitMQ 中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,先进先出原则,这个由 RabbitMQ 保证,通常也不需要开发关心。

不同队列 中的消息顺序,是没有保证的,例如:进地铁站的时候,排了三个队伍,不同队伍之间的,不能确保谁先进站。

消费消息的顺序
我们说如何保证消息顺序性,通常说的就是消费者消费消息的顺序,在多个消费者消费同一个消息队列的场景,通常是无法保证消息顺序的,

虽然消息队列的消息是顺序的,但是多个消费者并发消费消息,获取的消息的速度、执行业务逻辑的速度快慢、执行异常等等原因都会导致消息顺序不一致。
例如:消息A、B、C按顺序进入队列,消费者A1拿到消息A、消费者B1拿到消息B, 结果消费者B执行速度快,就跑完了,又或者消费者A1挂了,都会导致消息顺序不一致。
解决消费顺序的问题, 通常就是一个队列只有一个消费者 , 这样就可以一个个消息按顺序处理, 缺点就是并发能力下降了,无法并发消费消息,这是个取舍问题。

如果业务又要顺序消费,又要增加并发,通常思路就是开启多个队列,业务根据规则将消息分发到不同的队列,通过增加队列的数量来提高并发度,例如:电商订单场景,只需要保证同一个用户的订单消息的顺序性就行,不同用户之间没有关系,所以只要让同一个用户的订单消息进入同一个队列就行,其他用户的订单消息,可以进入不同的队列。

以下为代码设计过程实现
首先我们必须保证只有一个消费者 那么问题就来了,我们的项目一般是多副本的,如何保证只有一个副本在消费呢
这时就会用到消费者 单活模式 x-single-active-consumer
使用下述配置实现


private Queue creatQueue(String name){// 创建一个 单活模式 队列HashMap<String, Object> args=new HashMap<>();args.put("x-single-active-consumer",true);return new Queue(name,true,false,false,args);}

创建之后,我们可以在控制台看到 消费者的激活状态
在这里插入图片描述

=======================>配置类
@Configuration
@SuppressWarnings("all")
public class DirectExchangeConfiguration {@Beanpublic Queue queue15_0() {return creatQueue(Message15.QUEUE_0);}@Beanpublic Queue queue15_1() {return creatQueue(Message15.QUEUE_1);}@Beanpublic Queue queue15_2() {return creatQueue(Message15.QUEUE_2);}@Beanpublic Queue queue15_3() {return creatQueue(Message15.QUEUE_3);}@Beanpublic DirectExchange exchange15() {// name: 交换机名字 | durable: 是否持久化 | exclusive: 是否排它return new DirectExchange(Message15.EXCHANGE, true, false);}@Beanpublic Binding binding15_0() {return BindingBuilder.bind(queue15_0()).to(exchange15()).with("0");}@Beanpublic Binding binding15_1() {return BindingBuilder.bind(queue15_1()).to(exchange15()).with("1");}@Beanpublic Binding binding15_2() {return BindingBuilder.bind(queue15_2()).to(exchange15()).with("2");}@Beanpublic Binding binding15_3() {return BindingBuilder.bind(queue15_3()).to(exchange15()).with("3");}/*** 创建一个 单活 模式的队列* 注意 :* <p>* 如果一个队列已经创建为非x-single-active-consumer,而你想更改其为x-single-active-consumer,要把之前创建的队列删除** @param name* @return queue*/private Queue creatQueue(String name) {// 创建一个 单活模式 队列HashMap<String, Object> args = new HashMap<>();args.put("x-single-active-consumer", true);return new Queue(name, true, false, false, args);}
=================================》生产者
@Component
@Slf4j
public class Producer15 {@Resourceprivate RabbitTemplate rabbitTemplate;/*** 这里的发送是 拟投递到多个队列中** @param id  业务id* @param msg 业务信息*/public void syncSend(int id, String msg) {Message15 message = new Message15(id, msg);rabbitTemplate.convertAndSend(Message15.EXCHANGE, this.getRoutingKey(id), message);}/*** 根据 id 取余来决定丢到那个队列中去** @param id id* @return routingKey*/private String getRoutingKey(int id) {return String.valueOf(id % Message15.QUEUE_COUNT);}
}
============================》消费者
/*** 要想保证消息的顺序,每个队列只能有一个消费者** @author 深漂码农@明哥* @date 2024-03-18*/
@Component
@RabbitListener(queues = Message15.QUEUE_0)
@RabbitListener(queues = Message15.QUEUE_1)
@RabbitListener(queues = Message15.QUEUE_2)
@RabbitListener(queues = Message15.QUEUE_3)
@Slf4j
public class Consumer15 {@RabbitHandlerpublic void onMessage(Message15 message) throws InterruptedException {log.info("[{}][Consumer15 onMessage][线程编号:{} 消息内容:{}]", LocalDateTime.now(), Thread.currentThread().getId(), message);// 这里随机睡一会,模拟业务处理时候的耗时long l = new Random(1000).nextLong();TimeUnit.MILLISECONDS.sleep(l);}
}
==============================》测试类
@Testvoid mock() throws InterruptedException {// 先启动这个测试类,模拟多个副本情况下,看如何消费new CountDownLatch(1).await();}@Testvoid syncSend() throws InterruptedException {// 模拟每个队列中扔 10 个数据,看看效果for (int i = 0; i < 10; i++) {for (int j = 0; j < 4; j++) {producer15.syncSend(j, " 编号:" + j + " 第:" + i + " 条消息");}}TimeUnit.SECONDS.sleep(20);}
}

ps:测试的时候时候 先启动 mock 方式。 在启动 syncSend 方法,模拟多个副本同时消费,观察是否可以
以上的是RabbitMQ之顺序消费实现的代码 若不了解rabbitmq的基本使用 建议先看看我前面对应的文章 文章链接:点我—>let’s go
若需完整代码 可识别二维码后 给您发代码。
在这里插入图片描述

相关文章:

RabbitMQ之顺序消费

什么是顺序消费 例如&#xff1a;业务上产生者发送三条消息&#xff0c; 分别是对同一条数据的增加、修改、删除操作&#xff0c; 如果没有保证顺序消费&#xff0c;执行顺序可能变成删除、修改、增加&#xff0c;这就乱了。 如何保证顺序性 一般我们讨论如何保证消息的顺序性&…...

轻松上手的LangChain学习说明书

一、Langchain是什么&#xff1f; 如今各类AI模型层出不穷&#xff0c;百花齐放&#xff0c;大佬们开发的速度永远遥遥领先于学习者的学习速度。。为了解放生产力&#xff0c;不让应用层开发人员受限于各语言模型的生产部署中…LangChain横空出世界。 Langchain可以说是现阶段…...

【论文笔记】Training language models to follow instructions with human feedback A部分

Training language models to follow instructions with human feedback A 部分 回顾一下第一代 GPT-1 &#xff1a; 设计思路是 “海量无标记文本进行无监督预训练少量有标签文本有监督微调” 范式&#xff1b;模型架构是基于 Transformer 的叠加解码器&#xff08;掩码自注意…...

嵌入式交叉编译:x265

下载 multicoreware / x265_git / Downloads — Bitbucket 解压编译 BUILD_DIR${HOME}/build_libs CROSS_NAMEaarch64-mix210-linuxcd build/aarch64-linuxmake cleancmake \-G "Unix Makefiles" \-DCMAKE_C_COMPILER${CROSS_NAME}-gcc \-DCMAKE_CXX_COMPILER${CR…...

一、Redis五种常用数据类型

Redis优势&#xff1a; 1、性能高—基于内存实现数据的存储 2、丰富的数据类型 5种常用&#xff0c;3种高级 3、原子—redis的所有单个操作都是原子性&#xff0c;即要么成功&#xff0c;要么失败。其多个操作也支持采用事务的方式实现原子性。 Redis特点&#xff1a; 1、支持…...

C语言动态内存管理malloc、calloc、realloc、free函数、内存泄漏、动态内存开辟的位置等的介绍

文章目录 前言一、为什么存在动态内存管理二、动态内存函数的介绍1. malloc函数2. 内存泄漏3. 动态内存开辟位置4. free函数5. calloc 函数6. realloc 函数7. realloc 传空指针 总结 前言 C语言动态内存管理malloc、calloc、realloc、free函数、内存泄漏、动态内存开辟的位置等…...

最近惊爆谷歌裁员

Python团队还没解散完&#xff0c;谷歌又对Flutter、Dart动手了。 什么原因呢&#xff0c;猜测啊。 谷歌裁员Python的具体原因可能是因为公司在进行技术栈的调整和优化。Python作为一种脚本语言&#xff0c;在某些情况下可能无法提供足够的性能或者扩展性&#xff0c;尤其是在…...

音频可视化:原生音频API为前端带来的全新可能!

音频API是一组提供给网页开发者的接口&#xff0c;允许他们直接在浏览器中处理音频内容。这些API使得在不依赖任何外部插件的情况下操作和控制音频成为可能。 Web Audio API 可以进行音频的播放、处理、合成以及分析等操作。借助于这些工具&#xff0c;开发者可以实现自定义的音…...

【中等】保研/考研408机试-动态规划1(01背包、完全背包、多重背包)

背包问题基本上都是模板题&#xff0c;重点&#xff1a;弄熟多重背包模板 dp[j]max(dp[j-v[i]]w[i],dp[j]) //核心思路代码&#xff08;一维数组版&#xff09; dp[i][j]max(dp[i-1][j], dp[i-1][j-v[i]]w[i])//二维数字版 一、 0-1背包 一般输入两个变量&#xff1a;体积&…...

[DEMO]给两个字符串取交集的词语

要求&#xff1a;2个英文字符串中&#xff0c;取相同的大于等于4个字母的词组 比如&#xff1a; 字符串1&#xff1a;" xingMeiLingabcdef WorldHello", 字符串2&#xff1a;"mnjqlup WorldLingLing xingMeiLingHello" 获取交接&#xff1a; [xingMeiLing…...

leetcode53-Maximum Subarray

题目 给你一个整数数组 nums &#xff0c;请你找出一个具有最大和的连续子数组&#xff08;子数组最少包含一个元素&#xff09;&#xff0c;返回其最大和。 子数组 是数组中的一个连续部分。 示例 1&#xff1a; 输入&#xff1a;nums [-2,1,-3,4,-1,2,1,-5,4] 输出&#xf…...

Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单人脸检测/识别实战案例 之七 简单进行人脸检测并添加面具特效实现

Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单人脸检测/识别实战案例 之七 简单进行人脸检测并添加面具特效实现 目录...

【go项目01_学习记录06】

学习记录 1 使用中间件1.1 测试一下1.2 push代码 2 URI 中的斜杆2.1 StrictSlash2.2 兼容 POST 请求 1 使用中间件 代码中存在重复率很高的代码 w.Header().Set("Content-Type", "text/html; charsetutf-8")统一对响应做处理的&#xff0c;我们可以使用中…...

Vue中Element的下载

打开vscode让项目在终端中打开 输入npm install element-ui2.15.3 然后进行下载 在node_modules中出现element-ui表示下载完成 然后在输入Vue.use(ElementUI); import Vue from vue import App from ./App.vue import router from ./router import ElementUI from element-ui…...

机器人项目相关

机器人项目相关 1. Nvidia 1.1 Jetson 1.1.1 初步安装Riva教程 llamaspeakJetson AGX Orin踩坑记录&#xff08;1&#xff09;安装Riva 参考知乎链接&#xff1a;https://zhuanlan.zhihu.com/p/670007305 1.1.2 NVIDIA Jetson AI Lab 借助 NVIDIA Jetson™ 将生成式 AI…...

Mac升级go版本某种错误情况处理

当看到 "go1.21 is keg-only, which means it was not symlinked into /opt/homebrew" 这样的信息时&#xff0c;意味着Homebrew没有自动为你创建指向新版本Go的符号链接&#xff08;symlink&#xff09;&#xff0c;因为这是一个旧版本Go的替代版本。 Homebrew中的…...

美团KV存储squirrel和Celler学习

文章目录 美团在KV存储squirrel优化和改进在水平方向1、对Gossip协议进行优化 在垂直扩展方面1、forkless RDB数据复制优化2、使用多线程&#xff0c;充分利用机器的多核能力 在高可用方面 美团持久化kv存储celler优化和改进水平扩展优化1、使用bulkload进行数据导入2、线程模型…...

Python学习笔记------处理数据和生成折线图

给定数据&#xff1a; jsonp_1629344292311_69436({"status":0,"msg":"success","data":[{"name":"美国","trend":{"updateDate":["2.22","2.23","2.24",&qu…...

知识图谱与大语言模型的协同(RAG)——MindMap

MindMap : Knowledge Graph Prompting Sparks Graph of Thoughts in Large Language Models 论文地址: https://arxiv.org/abs/2308.09729 代码:https://github.com/wylwilling/MindMap 1.概述 大型语言模型(LLMs)在处理新信息、防止生成幻觉内容、以及增强决策过程透明度…...

奶爸预备 |《P.E.T.父母效能训练:让亲子沟通如此高效而简单:21世纪版》 / 托马斯·戈登——读书笔记

目录 引出致中国读者译序前言第1章 父母总是被指责&#xff0c;而非受训练第2章 父母是人&#xff0c;不是神第3章 如何听&#xff0c;孩子才会说&#xff1a;接纳性语言第4章 让积极倾听发挥作用第5章 如何倾听不会说话的婴幼儿第6章 如何听&#xff0c;孩子才肯听第8章 通过改…...

论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(二)

HoST框架核心实现方法详解 - 论文深度解读(第二部分) 《Learning Humanoid Standing-up Control across Diverse Postures》 系列文章: 论文深度解读 + 算法与代码分析(二) 作者机构: 上海AI Lab, 上海交通大学, 香港大学, 浙江大学, 香港中文大学 论文主题: 人形机器人…...

前端导出带有合并单元格的列表

// 导出async function exportExcel(fileName "共识调整.xlsx") {// 所有数据const exportData await getAllMainData();// 表头内容let fitstTitleList [];const secondTitleList [];allColumns.value.forEach(column > {if (!column.children) {fitstTitleL…...

第25节 Node.js 断言测试

Node.js的assert模块主要用于编写程序的单元测试时使用&#xff0c;通过断言可以提早发现和排查出错误。 稳定性: 5 - 锁定 这个模块可用于应用的单元测试&#xff0c;通过 require(assert) 可以使用这个模块。 assert.fail(actual, expected, message, operator) 使用参数…...

Nginx server_name 配置说明

Nginx 是一个高性能的反向代理和负载均衡服务器&#xff0c;其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机&#xff08;Virtual Host&#xff09;。 1. 简介 Nginx 使用 server_name 指令来确定…...

鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个生活电费的缴纳和查询小程序

一、项目初始化与配置 1. 创建项目 ohpm init harmony/utility-payment-app 2. 配置权限 // module.json5 {"requestPermissions": [{"name": "ohos.permission.INTERNET"},{"name": "ohos.permission.GET_NETWORK_INFO"…...

MySQL中【正则表达式】用法

MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现&#xff08;两者等价&#xff09;&#xff0c;用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例&#xff1a; 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...

九天毕昇深度学习平台 | 如何安装库?

pip install 库名 -i https://pypi.tuna.tsinghua.edu.cn/simple --user 举个例子&#xff1a; 报错 ModuleNotFoundError: No module named torch 那么我需要安装 torch pip install torch -i https://pypi.tuna.tsinghua.edu.cn/simple --user pip install 库名&#x…...

AI病理诊断七剑下天山,医疗未来触手可及

一、病理诊断困局&#xff1a;刀尖上的医学艺术 1.1 金标准背后的隐痛 病理诊断被誉为"诊断的诊断"&#xff0c;医生需通过显微镜观察组织切片&#xff0c;在细胞迷宫中捕捉癌变信号。某省病理质控报告显示&#xff0c;基层医院误诊率达12%-15%&#xff0c;专家会诊…...

c++第七天 继承与派生2

这一篇文章主要内容是 派生类构造函数与析构函数 在派生类中重写基类成员 以及多继承 第一部分&#xff1a;派生类构造函数与析构函数 当创建一个派生类对象时&#xff0c;基类成员是如何初始化的&#xff1f; 1.当派生类对象创建的时候&#xff0c;基类成员的初始化顺序 …...

Android写一个捕获全局异常的工具类

项目开发和实际运行过程中难免会遇到异常发生&#xff0c;系统提供了一个可以捕获全局异常的工具Uncaughtexceptionhandler&#xff0c;它是Thread的子类&#xff08;就是package java.lang;里线程的Thread&#xff09;。本文将利用它将设备信息、报错信息以及错误的发生时间都…...