RocketMQ架构篇 - 读写队列与生产者如何选择队列
读、写队列
创建主题时,可以指定 writeQueueNums(写队列的个数)、readQueueNums(读队列的个数)。生产者发送消息时,使用写队列的个数返回路由信息;消费者消费消息时,使用读队列的个数返回路由信息。在物理文件层面,只有写队列才会创建文件。默认读、写队列的个数都是 16。
比如写队列的个数是 16,则创建 16 个文件夹,代表 0 - 15;读队列的个数是 8,则只会消费 0 - 7 这 8 个队列中的消息。
要求 readQueueNums >= writeQueueNums,最佳方案是两者相等。RocketMQ 设置读、写队列的目的是方便队列的扩容、缩容。
比如在原来指定读、写队列都是 16 的基础上进行扩容到 8 个。在不需要重启应用程序的情况下,先缩容写队列,由 0 - 15 缩容至 0 - 7。等到 8 - 15 队列中的消息全部消费完之后,再缩容读队列,由 0 - 15 缩容至 0 - 7。
队列的选择
方式一、指定 queueId 来选择具体的队列
DefaultMQProducer 的 send / sendOneway 方法中可携带 MessageQueue 参数。而 MessageQueue 可以指定 topic、queueId、brokerName 三个参数。
public MessageQueue(String topic, String brokerName, int queueId) {this.topic = topic;this.brokerName = brokerName;this.queueId = queueId;
}
方式二、根据 MessageQueueSelector 策略来选择队列
DefaultMQProducer 的 send / sendOneway 方法中可携带 MessageQueueSelector 参数。
public SendResult send(Message msg, MessageQueueSelector selector, Object arg);
public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback);
RocketMQ 内部定义了三种 MessageQueueSelector 策略。
- SelectMessageQueueByHash:基于方法参数arg的哈希值,对队列总数取模,选择对应下标的队列。
- SelectMessageQueueByRandom:基于队列总数生成一个随机数,选择对应下标的队列。
- SelectMessageQueueByMachineRoom:返回空。
public class SelectMessageQueueByHash implements MessageQueueSelector {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// 取arg方法参数的哈希值,再对队列总数取模int value = arg.hashCode() % mqs.size();if (value < 0) {value = Math.abs(value);}// 选择对应的队列return mqs.get(value);}
}
public class SelectMessageQueueByRandom implements MessageQueueSelector {private Random random = new Random(System.currentTimeMillis());@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// 基于队列总数生成一个随机数int value = random.nextInt(mqs.size());// 选择对应的队列return mqs.get(value);}
}
方式三、基于Broker的可用性采取轮询的策略选择队列
DefaultMQProducer 的 send / sendOneway 方法可以不携带 MessageQueue、MessageQueueSelector,简单看下这种方式的队列是如何选择。
这种方式下的 send / sendOneway 方法中内部会调用如下方法:
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
进入方法内部,看一下处理逻辑。
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
MQFaultStrategy
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {// 如果开启了发送延迟规避机制,默认falseif (this.sendLatencyFaultEnable) {try {int index = tpInfo.getSendWhichQueue().incrementAndGet();for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0)pos = 0;// 获取指定下标的队列MessageQueue mq = tpInfo.getMessageQueueList().get(pos);// 如果队列对应的Broker判定为可用,则返回该队列;否则基于轮询的策略选择下一个队列重复上述步骤进行判断if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))return mq;}final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();// 根据BrokerName获取存储的写队列的总数int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);}return mq;} else {latencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}return tpInfo.selectOneMessageQueue();}return tpInfo.selectOneMessageQueue(lastBrokerName);
}
LatencyFaultToleranceImpl
@Override
public boolean isAvailable(final String name) {// 从缓存中获取指定brokerName对应的FaultItem实例final FaultItem faultItem = this.faultItemTable.get(name);// 如果缓存命中if (faultItem != null) {// 判断是否可用,即当前时间-startTimestamp是否>=0return faultItem.isAvailable();}return true;
}@Override
public String pickOneAtLeast() {final Enumeration<FaultItem> elements = this.faultItemTable.elements();List<FaultItem> tmpList = new LinkedList<FaultItem>();while (elements.hasMoreElements()) {final FaultItem faultItem = elements.nextElement();tmpList.add(faultItem);}if (!tmpList.isEmpty()) {Collections.sort(tmpList);final int half = tmpList.size() / 2;if (half <= 0) {return tmpList.get(0).getName();} else {final int i = this.whichItemWorst.incrementAndGet() % half;return tmpList.get(i).getName();}}return null;
}@Override
public void remove(final String name) {this.faultItemTable.remove(name);
}
TopicPublishInfo
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {if (lastBrokerName == null) {return selectOneMessageQueue();} else {for (int i = 0; i < this.messageQueueList.size(); i++) {int index = this.sendWhichQueue.incrementAndGet();int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;MessageQueue mq = this.messageQueueList.get(pos);if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}return selectOneMessageQueue();}
}public MessageQueue selectOneMessageQueue() {int index = this.sendWhichQueue.incrementAndGet();int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;return this.messageQueueList.get(pos);
}
额外分析一下 DefaultMQProducerImpl 的 updateFaultItem 方法。
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
}
接着看下 MQFaultStrategy 的 updateFaultItem 方法。
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {// 如果开启了发送延迟规避机制if (this.sendLatencyFaultEnable) {// 根据延迟时间计算不可用的时间long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);// 更新faultItemTable缓存this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);}
}private long computeNotAvailableDuration(final long currentLatency) {for (int i = latencyMax.length - 1; i >= 0; i--) {// 根据延迟时间计算不可用的时间if (currentLatency >= latencyMax[i])return this.notAvailableDuration[i];}return 0;
}
接着分析 LatencyFaultToleranceImpl 的 updateFaultItem 方法的处理逻辑。
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {// 从缓存中获取指定BrokerName对应的FaultItem实例FaultItem old = this.faultItemTable.get(name);// 如果缓存未命中if (null == old) {// 构造 FaultItem 实例final FaultItem faultItem = new FaultItem(name);// 更新 currentLatecy、startTimestamp 属性faultItem.setCurrentLatency(currentLatency);faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);// 更新缓存old = this.faultItemTable.putIfAbsent(name, faultItem);if (old != null) {// 更新 currentLatecy、startTimestamp 属性old.setCurrentLatency(currentLatency);old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);}// 如果缓存命中 } else {// 更新 currentLatecy、startTimestamp 属性old.setCurrentLatency(currentLatency);old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);}
}
相关文章:
RocketMQ架构篇 - 读写队列与生产者如何选择队列
读、写队列 创建主题时,可以指定 writeQueueNums(写队列的个数)、readQueueNums(读队列的个数)。生产者发送消息时,使用写队列的个数返回路由信息;消费者消费消息时,使用读队列的个…...
华为OD机试真题Python实现【通信误码】真题+解题思路+代码(20222023)
通信误码 题目 信号传播过程中会出现一些误码,不同的数字表示不同的误码 ID,取值范围为 1~65535,用一个数组记录误码出现的情况,每个误码出现的次数代表误码频度,请找出记录中包含频度最高误码的最小子数组长度。 🔥🔥🔥🔥🔥👉👉👉👉👉👉 华为OD…...
【单目3D目标检测】MonoDDE论文精读与代码解析
文章目录PrefacePros and ConsAbstractContributionsPreliminaryDirect depth estimationDepth from heightPespective-n-point(PnP)PipelineDiverse Depth EstimationsRobust Depth CombinationOutput distributionSelecting and combining reliable de…...
复习 Kotlin 从小白到大牛 第二版 笔记要点
4.2.2 常量和只读变量 常量和只读变量一旦初始化就不能再被修改。在kotlin中,声明常量是在标识符的前面加上val或const val 关键字。 1. val 声明的是运行时变量,在运行时进行初始化 2.const val 声明的是编译时常量,在编译时初始化 val …...
X264简介-Android使用(二)
X264简介-Android使用(二) 4、Ubuntu上安装ffmpeg: 检查更新本地软件包(如果未更新,reboot Vmware): sudo apt update sudo apt upgrade官网下载的source文件安装: http://ffmpe…...
【独家】华为OD机试 - 统计差异值大于相似值二元组个数(C 语言解题)
最近更新的博客 华为od 2023 | 什么是华为od,od 薪资待遇,od机试题清单华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南)华为od机试,独家整理 已参加机试人员的实战技巧文章目录 最近更新的博客使用说明本期…...
掌握好Framework 才是王道~
现在面试对Android开发者的要求越来越高了!从最开始的阿里、头条、腾讯等大厂,到现在的互联网车企,面试总喜欢问道 Framework底层原理的相关问题 Android Framework的三大核心功能: 1、View.java:View工作原理,实现包…...
Codeforces Round 856 (Div. 2) A — C
Codeforces Round 856 (Div. 2) 文章目录A. Prefix and Suffix Array题目大意题目分析codeB. Not Dividing题目大意题目分析codeC. Scoring Subsequences题目大意题目分析codeA. Prefix and Suffix Array 题目大意 给出一个字符串所有的非空前后缀,判断原字符串是…...
2022年MathorCup数学建模B题无人仓的搬运机器人调度问题解题全过程文档加程序
2022年第十二届MathorCup高校数学建模 B题 无人仓的搬运机器人调度问题 原题再现 本题考虑在无人仓内的仓库管理问题之一,搬运机器人 AGV 的调度问题。更多的背景介绍请参看附件-背景介绍。对于无人仓来说,仓库的地图模型可以简化为图的数据结构。 仓库…...
开源项目的演进会遇到哪些“坑”?KubeVela 从发起到晋级 CNCF 孵化的全程回顾
作者:孙健波、曾庆国 点击查看:「开源人说」第五期《KubeVela:一场向应用交付标准的冲锋》 2023 年 2 月,**KubeVela [ 1] ** 经过全体 ToC 投票成功进入 CNCF Incubation,是云原生领域首个晋级孵化的面向应用的交付…...
MSDP实验配置
目录 配置MSDP 配置PIM SM协议 配置各PIM SM域内的静态RP 配置MSDP对等体 配置域内的MSDP对等体 AR8和AR9建立EBGP邻居 配置域间的MSDP对等体 进行实验验证 什么是MSDP MSDP(Multicast Source Discovery Protocol)组播源发现协议的简称 用来传递…...
惊!初中生也来卷了……
大家好,我是良许。 前两天在抖音直播的时候,突然来了一位不速之客…… 他自称是初中生,一开始我还有点不太相信,直到跟他连麦,听到他还略带一些稚嫩的声音,我才知道,他没有骗我…… 他说他想学…...
kafka相关配置介绍
kafka默认配置 每个kafka broker中配置文件server.properties默认必须配置的属性如下: broker.id0 num.network.threads2 num.io.threads8 socket.send.buffer.bytes1048576 socket.receive.buffer.bytes1048576 socket.request.max.bytes104857600 log.dirs/tmp/…...
【PyTorch】教程:torch.nn.Hardtanh
torch.nn.Hardtanh 原型 CLASS torch.nn.Hardtanh(min_val- 1.0, max_val1.0, inplaceFalse, min_valueNone, max_valueNone) 参数 min_val ([float]) – 线性区域的最小值,默认为 -1max_val ([float]) – 线性区域的最大值,默认为 1inplace ([bool]) …...
神垕古镇景区5A级十年都没有实现的三大主因
钧 瓷 内 参 第40期(总第371期) 2023年3月5日 神垕古镇景区5A级十年都没有实现的三大主因 这是2013年,禹州市市政府第一次提出创建5A级景区到今年三月份整整十年啊! 目前神垕古镇景区是4A级景区,5A级一直进行中&a…...
react函数组件常用的几个钩子函数useState、useEffect、useRef、useCallback
react框架react框架包括包括两大类:类组件函数组件。类组件构成:constructor自定义方法。调用方法通过this.方法名()。constructor(superstate)构造器里面必有super字段。render()方法里面写页面布局。函数组件构成:各种钩子函数return()方法…...
4N60-ASEMI高压MOS管4N60
编辑-Z 4N60在TO-220封装里的静态漏极源导通电阻(RDS(ON))为2.5Ω,是一款N沟道高压MOS管。4N60的最大脉冲正向电流ISM为16A,零栅极电压漏极电流(IDSS)为1uA,其工作时耐温度范围为-55~150摄氏度。4N60功耗(…...
现代神经网络(VGG),并用VGG16进行实战CIFAR10分类
专栏:神经网络复现目录 本章介绍的是现代神经网络的结构和复现,包括深度卷积神经网络(AlexNet),VGG,NiN,GoogleNet,残差网络(ResNet),稠密连接网络…...
Java代码弱点与修复之——Dereference null return value(间接引用空返回值)
弱点描述 Dereference null return value,间接引用空返回值。是Coverity Scan静态代码分析工具中的一个警告,表示代码中有对可能为空(null)的方法或函数返回值进行间接引用(Dereference)操作。 该类型的漏洞可能会导致 NullPointerException 异常,并且会导致程序崩溃或…...
【冲刺蓝桥杯的最后30天】day3
大家好😃,我是想要慢慢变得优秀的向阳🌞同学👨💻,断更了整整一年,又开始恢复CSDN更新,从今天开始更新备战蓝桥30天系列,一共30天,如果对你有帮助或者正在备…...
浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)
✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义(Task Definition&…...
华为云AI开发平台ModelArts
华为云ModelArts:重塑AI开发流程的“智能引擎”与“创新加速器”! 在人工智能浪潮席卷全球的2025年,企业拥抱AI的意愿空前高涨,但技术门槛高、流程复杂、资源投入巨大的现实,却让许多创新构想止步于实验室。数据科学家…...
深入剖析AI大模型:大模型时代的 Prompt 工程全解析
今天聊的内容,我认为是AI开发里面非常重要的内容。它在AI开发里无处不在,当你对 AI 助手说 "用李白的风格写一首关于人工智能的诗",或者让翻译模型 "将这段合同翻译成商务日语" 时,输入的这句话就是 Prompt。…...
超短脉冲激光自聚焦效应
前言与目录 强激光引起自聚焦效应机理 超短脉冲激光在脆性材料内部加工时引起的自聚焦效应,这是一种非线性光学现象,主要涉及光学克尔效应和材料的非线性光学特性。 自聚焦效应可以产生局部的强光场,对材料产生非线性响应,可能…...
《从零掌握MIPI CSI-2: 协议精解与FPGA摄像头开发实战》-- CSI-2 协议详细解析 (一)
CSI-2 协议详细解析 (一) 1. CSI-2层定义(CSI-2 Layer Definitions) 分层结构 :CSI-2协议分为6层: 物理层(PHY Layer) : 定义电气特性、时钟机制和传输介质(导线&#…...
GitHub 趋势日报 (2025年06月08日)
📊 由 TrendForge 系统生成 | 🌐 https://trendforge.devlive.org/ 🌐 本日报中的项目描述已自动翻译为中文 📈 今日获星趋势图 今日获星趋势图 884 cognee 566 dify 414 HumanSystemOptimization 414 omni-tools 321 note-gen …...
三体问题详解
从物理学角度,三体问题之所以不稳定,是因为三个天体在万有引力作用下相互作用,形成一个非线性耦合系统。我们可以从牛顿经典力学出发,列出具体的运动方程,并说明为何这个系统本质上是混沌的,无法得到一般解…...
C# 求圆面积的程序(Program to find area of a circle)
给定半径r,求圆的面积。圆的面积应精确到小数点后5位。 例子: 输入:r 5 输出:78.53982 解释:由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982,因为我们只保留小数点后 5 位数字。 输…...
python报错No module named ‘tensorflow.keras‘
是由于不同版本的tensorflow下的keras所在的路径不同,结合所安装的tensorflow的目录结构修改from语句即可。 原语句: from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后: from tensorflow.python.keras.lay…...
管理学院权限管理系统开发总结
文章目录 🎓 管理学院权限管理系统开发总结 - 现代化Web应用实践之路📝 项目概述🏗️ 技术架构设计后端技术栈前端技术栈 💡 核心功能特性1. 用户管理模块2. 权限管理系统3. 统计报表功能4. 用户体验优化 🗄️ 数据库设…...
