RabbitMQ之顺序消费
什么是顺序消费
例如:业务上产生者发送三条消息, 分别是对同一条数据的增加、修改、删除操作, 如果没有保证顺序消费,执行顺序可能变成删除、修改、增加,这就乱了。
如何保证顺序性
一般我们讨论如何保证消息的顺序性,会从下面三个方面考虑
1:发送消息的顺序
2:队列中消息的顺序
3:消费消息的顺序
发送消息的顺序
消息发送端的顺序,大部分业务不做要求,谁先发消息无所谓,如果遇到业务一定要发送消息也确保顺序,那意味着,只能全局加锁一个个的操作,一个个的发消息,不能并发发送消息。
队列中消息的顺序
RabbitMQ 中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,先进先出原则,这个由 RabbitMQ 保证,通常也不需要开发关心。
不同队列 中的消息顺序,是没有保证的,例如:进地铁站的时候,排了三个队伍,不同队伍之间的,不能确保谁先进站。
消费消息的顺序
我们说如何保证消息顺序性,通常说的就是消费者消费消息的顺序,在多个消费者消费同一个消息队列的场景,通常是无法保证消息顺序的,
虽然消息队列的消息是顺序的,但是多个消费者并发消费消息,获取的消息的速度、执行业务逻辑的速度快慢、执行异常等等原因都会导致消息顺序不一致。
例如:消息A、B、C按顺序进入队列,消费者A1拿到消息A、消费者B1拿到消息B, 结果消费者B执行速度快,就跑完了,又或者消费者A1挂了,都会导致消息顺序不一致。
解决消费顺序的问题, 通常就是一个队列只有一个消费者 , 这样就可以一个个消息按顺序处理, 缺点就是并发能力下降了,无法并发消费消息,这是个取舍问题。
如果业务又要顺序消费,又要增加并发,通常思路就是开启多个队列,业务根据规则将消息分发到不同的队列,通过增加队列的数量来提高并发度,例如:电商订单场景,只需要保证同一个用户的订单消息的顺序性就行,不同用户之间没有关系,所以只要让同一个用户的订单消息进入同一个队列就行,其他用户的订单消息,可以进入不同的队列。
以下为代码设计过程实现
首先我们必须保证只有一个消费者 那么问题就来了,我们的项目一般是多副本的,如何保证只有一个副本在消费呢
这时就会用到消费者 单活模式 x-single-active-consumer
使用下述配置实现
private Queue creatQueue(String name){// 创建一个 单活模式 队列HashMap<String, Object> args=new HashMap<>();args.put("x-single-active-consumer",true);return new Queue(name,true,false,false,args);}
创建之后,我们可以在控制台看到 消费者的激活状态

=======================>配置类
@Configuration
@SuppressWarnings("all")
public class DirectExchangeConfiguration {@Beanpublic Queue queue15_0() {return creatQueue(Message15.QUEUE_0);}@Beanpublic Queue queue15_1() {return creatQueue(Message15.QUEUE_1);}@Beanpublic Queue queue15_2() {return creatQueue(Message15.QUEUE_2);}@Beanpublic Queue queue15_3() {return creatQueue(Message15.QUEUE_3);}@Beanpublic DirectExchange exchange15() {// name: 交换机名字 | durable: 是否持久化 | exclusive: 是否排它return new DirectExchange(Message15.EXCHANGE, true, false);}@Beanpublic Binding binding15_0() {return BindingBuilder.bind(queue15_0()).to(exchange15()).with("0");}@Beanpublic Binding binding15_1() {return BindingBuilder.bind(queue15_1()).to(exchange15()).with("1");}@Beanpublic Binding binding15_2() {return BindingBuilder.bind(queue15_2()).to(exchange15()).with("2");}@Beanpublic Binding binding15_3() {return BindingBuilder.bind(queue15_3()).to(exchange15()).with("3");}/*** 创建一个 单活 模式的队列* 注意 :* <p>* 如果一个队列已经创建为非x-single-active-consumer,而你想更改其为x-single-active-consumer,要把之前创建的队列删除** @param name* @return queue*/private Queue creatQueue(String name) {// 创建一个 单活模式 队列HashMap<String, Object> args = new HashMap<>();args.put("x-single-active-consumer", true);return new Queue(name, true, false, false, args);}
=================================》生产者
@Component
@Slf4j
public class Producer15 {@Resourceprivate RabbitTemplate rabbitTemplate;/*** 这里的发送是 拟投递到多个队列中** @param id 业务id* @param msg 业务信息*/public void syncSend(int id, String msg) {Message15 message = new Message15(id, msg);rabbitTemplate.convertAndSend(Message15.EXCHANGE, this.getRoutingKey(id), message);}/*** 根据 id 取余来决定丢到那个队列中去** @param id id* @return routingKey*/private String getRoutingKey(int id) {return String.valueOf(id % Message15.QUEUE_COUNT);}
}
============================》消费者
/*** 要想保证消息的顺序,每个队列只能有一个消费者** @author 深漂码农@明哥* @date 2024-03-18*/
@Component
@RabbitListener(queues = Message15.QUEUE_0)
@RabbitListener(queues = Message15.QUEUE_1)
@RabbitListener(queues = Message15.QUEUE_2)
@RabbitListener(queues = Message15.QUEUE_3)
@Slf4j
public class Consumer15 {@RabbitHandlerpublic void onMessage(Message15 message) throws InterruptedException {log.info("[{}][Consumer15 onMessage][线程编号:{} 消息内容:{}]", LocalDateTime.now(), Thread.currentThread().getId(), message);// 这里随机睡一会,模拟业务处理时候的耗时long l = new Random(1000).nextLong();TimeUnit.MILLISECONDS.sleep(l);}
}
==============================》测试类
@Testvoid mock() throws InterruptedException {// 先启动这个测试类,模拟多个副本情况下,看如何消费new CountDownLatch(1).await();}@Testvoid syncSend() throws InterruptedException {// 模拟每个队列中扔 10 个数据,看看效果for (int i = 0; i < 10; i++) {for (int j = 0; j < 4; j++) {producer15.syncSend(j, " 编号:" + j + " 第:" + i + " 条消息");}}TimeUnit.SECONDS.sleep(20);}
}
ps:测试的时候时候 先启动 mock 方式。 在启动 syncSend 方法,模拟多个副本同时消费,观察是否可以
以上的是RabbitMQ之顺序消费实现的代码 若不了解rabbitmq的基本使用 建议先看看我前面对应的文章 文章链接:点我—>let’s go
若需完整代码 可识别二维码后 给您发代码。

相关文章:
RabbitMQ之顺序消费
什么是顺序消费 例如:业务上产生者发送三条消息, 分别是对同一条数据的增加、修改、删除操作, 如果没有保证顺序消费,执行顺序可能变成删除、修改、增加,这就乱了。 如何保证顺序性 一般我们讨论如何保证消息的顺序性&…...
轻松上手的LangChain学习说明书
一、Langchain是什么? 如今各类AI模型层出不穷,百花齐放,大佬们开发的速度永远遥遥领先于学习者的学习速度。。为了解放生产力,不让应用层开发人员受限于各语言模型的生产部署中…LangChain横空出世界。 Langchain可以说是现阶段…...
【论文笔记】Training language models to follow instructions with human feedback A部分
Training language models to follow instructions with human feedback A 部分 回顾一下第一代 GPT-1 : 设计思路是 “海量无标记文本进行无监督预训练少量有标签文本有监督微调” 范式;模型架构是基于 Transformer 的叠加解码器(掩码自注意…...
嵌入式交叉编译:x265
下载 multicoreware / x265_git / Downloads — Bitbucket 解压编译 BUILD_DIR${HOME}/build_libs CROSS_NAMEaarch64-mix210-linuxcd build/aarch64-linuxmake cleancmake \-G "Unix Makefiles" \-DCMAKE_C_COMPILER${CROSS_NAME}-gcc \-DCMAKE_CXX_COMPILER${CR…...
一、Redis五种常用数据类型
Redis优势: 1、性能高—基于内存实现数据的存储 2、丰富的数据类型 5种常用,3种高级 3、原子—redis的所有单个操作都是原子性,即要么成功,要么失败。其多个操作也支持采用事务的方式实现原子性。 Redis特点: 1、支持…...
C语言动态内存管理malloc、calloc、realloc、free函数、内存泄漏、动态内存开辟的位置等的介绍
文章目录 前言一、为什么存在动态内存管理二、动态内存函数的介绍1. malloc函数2. 内存泄漏3. 动态内存开辟位置4. free函数5. calloc 函数6. realloc 函数7. realloc 传空指针 总结 前言 C语言动态内存管理malloc、calloc、realloc、free函数、内存泄漏、动态内存开辟的位置等…...
最近惊爆谷歌裁员
Python团队还没解散完,谷歌又对Flutter、Dart动手了。 什么原因呢,猜测啊。 谷歌裁员Python的具体原因可能是因为公司在进行技术栈的调整和优化。Python作为一种脚本语言,在某些情况下可能无法提供足够的性能或者扩展性,尤其是在…...
音频可视化:原生音频API为前端带来的全新可能!
音频API是一组提供给网页开发者的接口,允许他们直接在浏览器中处理音频内容。这些API使得在不依赖任何外部插件的情况下操作和控制音频成为可能。 Web Audio API 可以进行音频的播放、处理、合成以及分析等操作。借助于这些工具,开发者可以实现自定义的音…...
【中等】保研/考研408机试-动态规划1(01背包、完全背包、多重背包)
背包问题基本上都是模板题,重点:弄熟多重背包模板 dp[j]max(dp[j-v[i]]w[i],dp[j]) //核心思路代码(一维数组版) dp[i][j]max(dp[i-1][j], dp[i-1][j-v[i]]w[i])//二维数字版 一、 0-1背包 一般输入两个变量:体积&…...
[DEMO]给两个字符串取交集的词语
要求:2个英文字符串中,取相同的大于等于4个字母的词组 比如: 字符串1:" xingMeiLingabcdef WorldHello", 字符串2:"mnjqlup WorldLingLing xingMeiLingHello" 获取交接: [xingMeiLing…...
leetcode53-Maximum Subarray
题目 给你一个整数数组 nums ,请你找出一个具有最大和的连续子数组(子数组最少包含一个元素),返回其最大和。 子数组 是数组中的一个连续部分。 示例 1: 输入:nums [-2,1,-3,4,-1,2,1,-5,4] 输出…...
Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单人脸检测/识别实战案例 之七 简单进行人脸检测并添加面具特效实现
Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单人脸检测/识别实战案例 之七 简单进行人脸检测并添加面具特效实现 目录...
【go项目01_学习记录06】
学习记录 1 使用中间件1.1 测试一下1.2 push代码 2 URI 中的斜杆2.1 StrictSlash2.2 兼容 POST 请求 1 使用中间件 代码中存在重复率很高的代码 w.Header().Set("Content-Type", "text/html; charsetutf-8")统一对响应做处理的,我们可以使用中…...
Vue中Element的下载
打开vscode让项目在终端中打开 输入npm install element-ui2.15.3 然后进行下载 在node_modules中出现element-ui表示下载完成 然后在输入Vue.use(ElementUI); import Vue from vue import App from ./App.vue import router from ./router import ElementUI from element-ui…...
机器人项目相关
机器人项目相关 1. Nvidia 1.1 Jetson 1.1.1 初步安装Riva教程 llamaspeakJetson AGX Orin踩坑记录(1)安装Riva 参考知乎链接:https://zhuanlan.zhihu.com/p/670007305 1.1.2 NVIDIA Jetson AI Lab 借助 NVIDIA Jetson™ 将生成式 AI…...
Mac升级go版本某种错误情况处理
当看到 "go1.21 is keg-only, which means it was not symlinked into /opt/homebrew" 这样的信息时,意味着Homebrew没有自动为你创建指向新版本Go的符号链接(symlink),因为这是一个旧版本Go的替代版本。 Homebrew中的…...
美团KV存储squirrel和Celler学习
文章目录 美团在KV存储squirrel优化和改进在水平方向1、对Gossip协议进行优化 在垂直扩展方面1、forkless RDB数据复制优化2、使用多线程,充分利用机器的多核能力 在高可用方面 美团持久化kv存储celler优化和改进水平扩展优化1、使用bulkload进行数据导入2、线程模型…...
Python学习笔记------处理数据和生成折线图
给定数据: jsonp_1629344292311_69436({"status":0,"msg":"success","data":[{"name":"美国","trend":{"updateDate":["2.22","2.23","2.24",&qu…...
知识图谱与大语言模型的协同(RAG)——MindMap
MindMap : Knowledge Graph Prompting Sparks Graph of Thoughts in Large Language Models 论文地址: https://arxiv.org/abs/2308.09729 代码:https://github.com/wylwilling/MindMap 1.概述 大型语言模型(LLMs)在处理新信息、防止生成幻觉内容、以及增强决策过程透明度…...
奶爸预备 |《P.E.T.父母效能训练:让亲子沟通如此高效而简单:21世纪版》 / 托马斯·戈登——读书笔记
目录 引出致中国读者译序前言第1章 父母总是被指责,而非受训练第2章 父母是人,不是神第3章 如何听,孩子才会说:接纳性语言第4章 让积极倾听发挥作用第5章 如何倾听不会说话的婴幼儿第6章 如何听,孩子才肯听第8章 通过改…...
macOS风格光标主题:从视觉革新到交互未来的全面探索
macOS风格光标主题:从视觉革新到交互未来的全面探索 【免费下载链接】apple_cursor Free & Open source macOS Cursors. 项目地址: https://gitcode.com/gh_mirrors/ap/apple_cursor 价值解析:重新定义数字交互的视觉语言 在当今多设备协同的…...
Redis管理效率革命:AnotherRedisDesktopManager实战指南
Redis管理效率革命:AnotherRedisDesktopManager实战指南 【免费下载链接】AnotherRedisDesktopManager qishibo/AnotherRedisDesktopManager: Another Redis Desktop Manager 是一款跨平台的Redis桌面管理工具,提供图形用户界面,支持连接到Re…...
实验结果与分析篇 | 本科/硕士必备,一文搞定实验结果与分析部分!基于改进 ConvNeXt 的农作物病虫害识别系统
前言 “代码跑通了,论文怎么写?”,这恐怕是无数 CV 算法/人工智能萌新在面对毕设或期刊投稿时最大的痛。纯缝合模型容易被拒(看你写作能力了),实验分析写成了干巴巴的报流水账,缺乏深度的理论支…...
Quartus中生成与烧录FPGA板载Flash的jic文件全流程解析
1. 为什么需要jic文件? 刚接触FPGA开发的朋友可能会疑惑:为什么编译生成的sof文件不能直接烧录到Flash?这个问题要从FPGA的特性说起。FPGA芯片内部是基于SRAM结构的,这意味着每次断电后配置数据都会丢失。想象一下你正在用电脑写文…...
效果实测:nli-distilroberta-base处理长文本与跨语言推理能力
效果实测:nli-distilroberta-base处理长文本与跨语言推理能力 1. 模型核心能力概览 nli-distilroberta-base作为轻量级自然语言推理模型,在文本理解任务中展现出独特优势。这个基于RoBERTa架构的蒸馏版本,保留了原模型90%以上的性能&#x…...
Canvas动画实战:用requestAnimationFrame打造会飘动的云朵与彩虹
1. Canvas动画基础入门 第一次接触Canvas动画时,我被它强大的绘图能力惊艳到了。记得当时为了做一个简单的太阳升起动画,硬是用setInterval写了上百行代码,结果动画卡得像幻灯片一样。后来才发现,原来浏览器早就为我们准备了更专业…...
清音刻墨镜像免配置亮点:内置10+中文领域词典(医疗/法律/IT)开箱即用
清音刻墨镜像免配置亮点:内置10中文领域词典(医疗/法律/IT)开箱即用 1. 为什么字幕对齐需要专业词典? 做视频字幕的朋友都知道,最头疼的不是生成文字,而是让文字和声音完美对齐。普通字幕工具遇到专业术语…...
为什么你的Ping总是丢包?这7个隐藏原因90%的人都忽略了(含Wireshark分析技巧)
为什么你的Ping总是丢包?这7个隐藏原因90%的人都忽略了(含Wireshark分析技巧) 在网络运维的日常工作中,Ping命令就像网络工程师的听诊器,简单却至关重要。但当你发现Ping测试频繁丢包时,问题往往不像表面看…...
告别频繁输密码!域环境下Windows软件静默安装的两种野路子(慎用)
告别频繁输密码!域环境下Windows软件静默安装的两种野路子(慎用) 在中小企业IT运维的日常中,软件批量部署和远程协助安装堪称两大高频痛点。想象这样的场景:财务部急需更新报税软件,二十台电脑需要同时处理…...
OpenClaw文件处理自动化:nanobot轻量模型实战案例
OpenClaw文件处理自动化:nanobot轻量模型实战案例 1. 为什么选择nanobot处理文件自动化 作为一个长期被各种文件整理工作困扰的技术写作者,我一直在寻找一个既轻量又智能的自动化解决方案。直到遇到OpenClaw框架下的nanobot镜像,这个内置Qw…...
