Framework——【MessageQueue】消息队列
定义
队列是 Apache RocketMQ 中消息存储和传输的实际容器,也是 Apache RocketMQ 消息的最小存储单元。 Apache RocketMQ 的所有主题都是由多个队列组成,以此实现队列数量的水平拆分和队列内部的流式存储。
队列的主要作用如下:
- 存储顺序性 队列天然具备顺序性,即消息按照进入队列的顺序写入存储,同一队列间的消息天然存在顺序关系,队列头部为最早写入的消息,队列尾部为最新写入的消息。消息在队列中的位置和消息之间的顺序通过位点(Offset)进行标记管理。
- 流式操作语义 Apache RocketMQ 基于队列的存储模型可确保消息从任意位点读取任意数量的消息,以此实现类似聚合读取、回溯读取等特性,这些特性是RabbitMQ、ActiveMQ等非队列存储模型不具备的。
模型关系
在整个 Apache RocketMQ 的领域模型中,队列所处的流程和位置如下:
Apache RocketMQ 默认提供消息可靠存储机制,所有发送成功的消息都被持久化存储到队列中,配合生产者和消费者客户端的调用可实现至少投递一次的可靠性语义。
Apache RocketMQ 队列模型和Kafka的分区(Partition)模型类似。在 Apache RocketMQ 消息收发模型中,队列属于主题的一部分,虽然所有的消息资源以主题粒度管理,但实际的操作实现是面向队列。例如,生产者指定某个主题,向主题内发送消息,但实际消息发送到该主题下的某个队列中。
Apache RocketMQ 中通过修改队列数量,以此实现横向的水平扩容和缩容。
内部属性
读写权限
- 定义:当前队列是否可以读写数据。
- 取值:由服务端定义,枚举值如下
- 6:读写状态,当前队列允许读取消息和写入消息。
- 4:只读状态,当前队列只允许读取消息,不允许写入消息。
- 2:只写状态,当前队列只允许写入消息,不允许读取消息。
- 0:不可读写状态,当前队列不允许读取消息和写入消息。
- 约束:队列的读写权限属于运维侧操作,不建议频繁修改。
行为约束
每个主题下会由一到多个队列来存储消息,每个主题对应的队列数与消息类型以及实例所处地域(Region)相关,队列数暂不支持修改。
版本兼容性
队列的名称属性在 Apache RocketMQ 服务端的不同版本中有如下差异:
- 服务端3.x/4.x版本:队列名称由{主题名称}+{BrokerID}+{QueueID}三元组组成,和物理节点绑定。
- 服务端5.x版本:队列名称为一个集群分配的全局唯一的字符串组成,和物理节点解耦。
因此,在开发过程中,建议不要对队列名称做任何假设和绑定。如果您在代码中自定义拼接队列名称并和其他操作进行绑定,一旦服务端版本升级,可能会出现队列名称无法解析的兼容性问题。
使用建议
按照实际业务消耗设置队列数
Apache RocketMQ 的队列数可在创建主题或变更主题时设置修改,队列数量的设置应遵循少用够用原则,避免随意增加队列数量。
主题内队列数过多可能对导致如下问题:
- 集群元数据膨胀 Apache RocketMQ 会以队列粒度采集指标和监控数据,队列过多容易造成管控元数据膨胀。
- 客户端压力过大 Apache RocketMQ 的消息读写都是针对队列进行操作,队列过多容易产生空轮询请求,增加系统负荷。
常见队列增加场景
- 需要增加队列实现物理节点负载均衡 Apache RocketMQ 每个主题的多个队列可以分布在不同的服务节点上,在集群水平扩容增加节点后,为了保证集群流量的负载均衡,建议在新的服务节点上新增队列,或将旧的队列迁移到新的服务节点上。
- 需要增加队列实现顺序消息性能扩展 在 Apache RocketMQ 服务端4.x版本中,顺序消息的顺序性在队列内生效的,因此顺序消息的并发度会在一定程度上受队列数量的影响,因此建议仅在系统性能瓶颈时再增加队列。
消息队列相关概念
- 生产者(Producer): 负责产生消息;
- 消费者(Consumer): 负责消费消息;
- 消息(Message): 在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象;
- 消息队列(Message Queue): 一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
- 消息代理(Message Broker): 负责存储/转发消息,转发分为推和拉两种。
- 拉是指Consumer主动从Message Broker获取消息;
- 推是指Message Broker主动将Consumer感兴趣的消息推送给Consumer。
消息队列的消费场景
消息至多被消费一次
该场景是最容易满足的,特点是整个消息队列吞吐量大,实现简单。适合能容忍丢消息,消息重复消费的任务。
a)Producer发送消息到Message Broker阶段:
Producer发消息给Message Broker,不要求Message Broker对接收到的消息响应确认,Producer也不用关心Message Broker是否收到消息了。
b)Message Broker存储/转发阶段:
对Message Broker的存储不要求持久性,转发消息时也不用关心Consumer是否真的收到了。
c)Consumer消费阶段:
Consumer从Message Broker中获取到消息后,可以从Message Broker删除消息,或Message Broker在消息被Consumer拿去消费时删除消息,不用关心Consumer最后对消息的处理结果。
消息至少被消费一次
适合不能容忍丢消息,但允许重复消费的任务。
a)Producer发送消息到Message Broker阶段:
Producer发消息给Message Broker,Message Broker必须响应对消息的确认。
b)Message Broker存储/转发阶段:
Message Broker必须提供持久性保障,转发消息时,Message Broker需要Consumer通知删除消息,才能将消息删除。
c)Consumer消费阶段:
Consumer从Message Broker中获取到消息,必须在消费完成后,Message Broker上的消息才能被删除。
消息仅被消费一次
适合对消息消费情况要求非常高的任务,实现较为复杂,这里的“仅被消费一次”包含如下两种场景:
1)Message Broker上存储的消息被Consumer仅消费一次,场景要求:
a)Producer发送消息到Message Broker阶段:
Producer发消息给Message Broker,不要求Message Broker对接收到的消息响应确认,Producer也不用关心Message Broker是否收到消息了。
b)Message Broker存储/转发阶段:
Message Broker必须提供持久性保障,并且每条消息在其消费队列里有唯一标识(这个唯一标识可以由Producer产生,也可以由Message Broker产生)。
c)Consumer消费阶段:
Consumer从Message Broker中获取到消息后,需要记录下消费的消息标识,以便在后续消费中防止对某个消息重复消费。比如Consumer获取到消息,消费完后,还没来得及从Message Broker删除消息,就挂了,这样Message Broker如果把消息重新加入待消费队列的话,那么这条消息就会被重复消费了。
2)Producer上产生的消息被Consumer仅消费一次,场景要求:
a)Producer发送消息到Message Broker阶段:
Producer发消息给Message Broker,Message Broker必须响应对消息的确认,并且Producer负责为该消息产生唯一标识,以防止Consumer重复消费(因为Producer发消息给Message Broker后,由于网络问题没收到Message Broker的响应,可能会重发消息给到Message Broker)。
b)Message Broker存储/转发阶段:
Message Broker必须提供持久性保障,并且每条消息在其消费队列里有唯一标识(这个唯一标识需要由Producer产生)。
c)Consumer消费阶段:
Consumer从Message Broker中获取到消息后,需要记录下消费的消息标识,以便在后续消费中防止对某个消息重复消费。比如Consumer获取到消息,消费完后,还没来得及从Message Broker删除消息,就挂了,这样Message Broker如果把消息重新加入待消费队列的话,那么这条消息就会被重复消费了。
实践Hello World
/// <summary>
/// 发送按钮事件
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void button1_Click(object sender, EventArgs e)
{if (!string.IsNullOrEmpty(textBox1.Text.Trim())){try{//判断私有队列是否存在if (!MessageQueue.Exists(@".\Private$\MyPrivateQueue")){//创建一个私有队列MessageQueue.Create(@".\Private$\MyPrivateQueue");}//实例一个队列var queue = new MessageQueue(@".\Private$\MyPrivateQueue");//发送消息(第一个参数为消息内容,第二个参数为消息标签或名称)queue.Send(textBox1.Text.Trim(), "TestLable");}catch (MessageQueueException ex){MessageBox.Show(ex.Message, "异常消息", MessageBoxButtons.OK, MessageBoxIcon.Error);}}
}
/// <summary>/// 接收按钮事件/// </summary>/// <param name="sender"></param>/// <param name="e"></param>private void button2_Click(object sender, EventArgs e){try{//判断私有队列是否存在if (!MessageQueue.Exists(@".\Private$\MyPrivateQueue")){//创建一个私有队列MessageQueue.Create(@".\Private$\MyPrivateQueue");}//实例一个队列var queue = new MessageQueue(@".\Private$\MyPrivateQueue");//读取消息,设置格式化方式queue.Formatter = new XmlMessageFormatter(new string[] { "System.String" });//读取第一个消息var message = queue.Receive();//显示消息内容label1.Text = message.Body.ToString();}catch (MessageQueueException ex){MessageBox.Show(ex.Message);}}
全文讲述framework中的消息队列的原理及简单的实践方法;更多framework的学习可以参考《framework精编手册》里面是有关framework的源码解析与技术点全家桶。点击即可前往
消息队列 好处或功能:
1、消息可以在断开连接的环境下发送。不需要同时运行正在发送和正在接收的应用程序。
2、使用快捷模式,消息可以非常快地发送。在快捷模式下,消息存储在内存中。
3、对于可恢复的机制,消息可以使用有保证的交付方式发送。可恢复的消`息存储在文件中。在服务器重新启动时发送它们。
4、用访问控制列表来保护消息队列,可以确定哪些用户可以发送或接收队列中的消息。消息还可以加密,避免网络嗅探器读取其中的数据。消息在发送时可以指定优先级,这样可以更快地处理高优先级的项。
5、Message Queuing 3.0支持多播消息的发送。
6、Message queuing 4.0支持病毒消息。病毒消息不能解析。可以定义病毒队列中不能解析的消息是可以移动的。例如,如果从正常的队列中读取消息后,对应作业要把消息插入数据库中,但消息不能插入数据库,因此该作业失败,该消息就会发送到病毒队列中。有人负责处理病毒队列,这个人应以能解析病毒消息的方式来处理该消息。
7、Message Queuing 5.0支持更安全学身份验证算法,可以处理大量队列(Message queuing 4.0在处理几千个队列时有性能问题)。
相关文章:

Framework——【MessageQueue】消息队列
定义 队列是 Apache RocketMQ 中消息存储和传输的实际容器,也是 Apache RocketMQ 消息的最小存储单元。 Apache RocketMQ 的所有主题都是由多个队列组成,以此实现队列数量的水平拆分和队列内部的流式存储。 队列的主要作用如下: 存储顺序性…...

SpringBoot依赖原理分析及配置文件
💟💟前言 友友们大家好,我是你们的小王同学😗😗 今天给大家打来的是 SpringBoot依赖原理分析及配置文件 希望能给大家带来有用的知识 觉得小王写的不错的话麻烦动动小手 点赞👍 收藏⭐ 评论📄…...

智慧机场,或将成为航空领域数字孪生技术得完美应用
在《智慧民航建设路线图》文件中,民航局明确指出,智慧机场是实现智慧民航的四个核心抓手之一。这一战略性举措旨在推进数字化技术与航空产业的深度融合,为旅客提供更加智能化、便捷化、安全化的出行服务,进一步提升我国民航发展的…...
SQL64 对顾客ID和日期排序
描述有Orders表cust_idorder_numorder_dateandyaaaa2021-01-01 00:00:00andybbbb2021-01-01 12:00:00bobcccc2021-01-10 12:00:00dickdddd2021-01-11 00:00:00【问题】编写 SQL 语句,从 Orders 表中检索顾客 ID(cust_id)和订单号(…...

MybatisPlus使用聚合函数
前言 今天遇到了一个求总数返回的情况,我一想这不是用sum就完事了吗。 但是仔细想想,MybatisPlus好像没有直接使用sum的api。 虽然没有直接提供,但是办法还是有的,下面就分享下如何实现的: 首先如果使用sql是这么写…...
工程管理系统源码企业工程管理系统简介
一、立项管理 1、招标立项申请 功能点:招标类项目立项申请入口,用户可以保存为草稿,提交。 2、非招标立项申请 功能点:非招标立项申请入口、用户可以保存为草稿、提交。 3、采购立项列表 功能点:对草稿进行编辑&#x…...
《计算机视觉和图像处理简介 - 中英双语版》:使用 OpenCV对图像进行空间滤波
文章大纲 Linear Filtering 线性滤波器Filtering Noise 过滤噪声Gaussian Blur 高斯滤波Image Sharpening 图像锐化Edges 边缘滤波Median 中值滤波Threshold Function Parameters 阈值函数参数References本文大概需要40分钟 Spatial Operations in Image Processing 图像处理中…...
FreeRTOS软件定时器 | FreeRTOS十三
目录 说明: 一、定时器简介 1.1、定时器 1.2、软件定时器 1.3、硬件定时器 1.4、FreeRTOS软件定时器 1.5、软件定时器服务任务作用 1.6、软件定时器的命令队列 1.7、软件定时器相关配置 1.8、单次定时器和周期定时器 1.9、软件定时器结构体 二、软件定时…...

电脑文件被误删?360文件恢复工具,免费的文件恢复软件
电脑里面保存着各种文件,因为误操作我们把还需要用的文件给删除了。很多人都想要使用不收费的文件恢复软件来进行恢复操作,但是又不清楚有哪些文件可以帮到我们。接下来就给大家介绍,一款真正免费的数据 恢复app,一起来看看&#…...
pg_cron优化案例--terminate pg_cron launcher可自动拉起
场景 在PostgreSQL中我们可以使用pg_cron来实现数据库定时任务 我有一个select 1的定时任务,每分钟触发一次 testdb# select * from cron.job ;jobid | schedule | command | nodename | nodeport | database | username | active | jobname -------…...

Python 之 NumPy 随机函数和常用函数
文章目录一、随机函数1. numpy.random.rand(d0,d1,…,dn)2. numpy.random.randn(d0,d1,…,dn)3. numpy.random.normal()4. numpy.random.randint()5. numpy.random.sample6. 随机种子np.random.seed()7. 正态分布 numpy.random.normal二、数组的其他函数1. numpy.resize()2. nu…...

【目标检测】K-means和K-means++计算anchors结果比较(附完整代码,全网最详细的手把手教程)
写在前面: 首先感谢兄弟们的订阅,让我有创作的动力,在创作过程我会尽最大努力,保证作品的质量,如果有问题,可以私信我,让我们携手共进,共创辉煌。 一、介绍 YOLO系列目标检测算法中基于anchor的模型还是比较多的,例如YOLOv3、YOLOv4、YOLOv5等,我们可以随机初始化a…...

Java高手速成 | 图说重定向与转发
我们先回顾一下Servlet的工作原理,Servlet的工作原理跟小猪同学食堂就餐的过程很类似。小猪同学点了烤鸡腿(要奥尔良风味的),食堂窗口的服务员记下了菜单,想了想后厨的所有厨师,然后将菜单和餐盘交给专门制…...
Git:不小心在主分支master上进行修改,怎么才能将修改的数据保存到正确的分支中
1.如果还没有push commit 代码第一步:将所修改的代码提交到暂存区git stash第二步:切换到正确的分支git checkout 分支名第三步:从暂存区中取出保存到正确的分支中git stash pop第四步:重新提交git push origin 分支名2.如果已经p…...

都2023年了,如果不会Stream流、函数式编程?你确定能看懂公司代码?
👳我亲爱的各位大佬们好😘😘😘 ♨️本篇文章记录的为 Stream流、函数式编程 相关内容,适合在学Java的小白,帮助新手快速上手,也适合复习中,面试中的大佬🙉🙉🙉。 ♨️如果…...

亚马逊云科技汽车行业解决方案
当今,随着万物智联、云计算等领域的高速发展,创新智能网联汽车和车路协同技术正在成为车企加速发展的关键途径,推动着汽车产品从出行代步工具向着“超级智能移动终端”快速转变。 挑战无处不在,如何抢先预判? 随着近…...

为什么学了模数电还是看不懂较复杂的电路图
看懂电路并不难。 (1) 首先要摆正心态,不要看到错综复杂的电路图就一脸懵逼,不知所错。你要明白,再复杂的电路也是由一个个的基本电路拼装出来的。 (2) 基础知识当然是少不了的,常用的基本电路结构搞搞清楚。 (3) 分析电路之前先要…...

帮公司面试了一个30岁培训班出来的程序员,没啥工作经验...
首先,我说一句:培训出来的,优秀学员大有人在,我不希望因为带着培训的标签而无法达到用人单位和候选人的双向匹配,是非常遗憾的事情。 最近,在网上看到这样一个留言,引发了程序员这个圈子不少的…...

勒索软件、网络钓鱼、零信任和网络安全的新常态
当疫情来袭时,网络罪犯看到了他们的机会。随着公司办公、政府机构、学校和大学从以往的工作模式转向远程线上办公模式,甚至许多医疗保健设施都转向线上,这种快速的过渡性质导致了不可避免的网络安全漏洞。消费者宽带和个人设备破坏了企业安全…...
python3 字符串拼接与抽取
我们经常会有对字符串进行拼接和抽取的需求,下面有几个例子可以作为参考。 需求1:取出ip地址的网络地址与网络掩码进行拼接,分别使用shell脚本和python3实现 # echo "192.168.0.1"|awk -F. {print $1"."$2"."…...

Lombok 的 @Data 注解失效,未生成 getter/setter 方法引发的HTTP 406 错误
HTTP 状态码 406 (Not Acceptable) 和 500 (Internal Server Error) 是两类完全不同的错误,它们的含义、原因和解决方法都有显著区别。以下是详细对比: 1. HTTP 406 (Not Acceptable) 含义: 客户端请求的内容类型与服务器支持的内容类型不匹…...

工业安全零事故的智能守护者:一体化AI智能安防平台
前言: 通过AI视觉技术,为船厂提供全面的安全监控解决方案,涵盖交通违规检测、起重机轨道安全、非法入侵检测、盗窃防范、安全规范执行监控等多个方面,能够实现对应负责人反馈机制,并最终实现数据的统计报表。提升船厂…...

.Net框架,除了EF还有很多很多......
文章目录 1. 引言2. Dapper2.1 概述与设计原理2.2 核心功能与代码示例基本查询多映射查询存储过程调用 2.3 性能优化原理2.4 适用场景 3. NHibernate3.1 概述与架构设计3.2 映射配置示例Fluent映射XML映射 3.3 查询示例HQL查询Criteria APILINQ提供程序 3.4 高级特性3.5 适用场…...

定时器任务——若依源码分析
分析util包下面的工具类schedule utils: ScheduleUtils 是若依中用于与 Quartz 框架交互的工具类,封装了定时任务的 创建、更新、暂停、删除等核心逻辑。 createScheduleJob createScheduleJob 用于将任务注册到 Quartz,先构建任务的 JobD…...
GitHub 趋势日报 (2025年06月08日)
📊 由 TrendForge 系统生成 | 🌐 https://trendforge.devlive.org/ 🌐 本日报中的项目描述已自动翻译为中文 📈 今日获星趋势图 今日获星趋势图 884 cognee 566 dify 414 HumanSystemOptimization 414 omni-tools 321 note-gen …...
【无标题】路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论
路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论 一、传统路径模型的根本缺陷 在经典正方形路径问题中(图1): mermaid graph LR A((A)) --- B((B)) B --- C((C)) C --- D((D)) D --- A A -.- C[无直接路径] B -…...

在 Visual Studio Code 中使用驭码 CodeRider 提升开发效率:以冒泡排序为例
目录 前言1 插件安装与配置1.1 安装驭码 CodeRider1.2 初始配置建议 2 示例代码:冒泡排序3 驭码 CodeRider 功能详解3.1 功能概览3.2 代码解释功能3.3 自动注释生成3.4 逻辑修改功能3.5 单元测试自动生成3.6 代码优化建议 4 驭码的实际应用建议5 常见问题与解决建议…...
2025.6.9总结(利与弊)
凡事都有两面性。在大厂上班也不例外。今天找开发定位问题,从一个接口人不断溯源到另一个 接口人。有时候,不知道是谁的责任填。将工作内容分的很细,每个人负责其中的一小块。我清楚的意识到,自己就是个可以随时替换的螺丝钉&…...
raid存储技术
1. 存储技术概念 数据存储架构是对数据存储方式、存储设备及相关组件的组织和规划,涵盖存储系统的布局、数据存储策略等,它明确数据如何存储、管理与访问,为数据的安全、高效使用提供支撑。 由计算机中一组存储设备、控制部件和管理信息调度的…...

EasyRTC音视频实时通话功能在WebRTC与智能硬件整合中的应用与优势
一、WebRTC与智能硬件整合趋势 随着物联网和实时通信需求的爆发式增长,WebRTC作为开源实时通信技术,为浏览器与移动应用提供免插件的音视频通信能力,在智能硬件领域的融合应用已成必然趋势。智能硬件不再局限于单一功能,对实时…...