当前位置: 首页 > news >正文

RocketMQ架构篇 - 读写队列与生产者如何选择队列

读、写队列

创建主题时,可以指定 writeQueueNums(写队列的个数)、readQueueNums(读队列的个数)。生产者发送消息时,使用写队列的个数返回路由信息;消费者消费消息时,使用读队列的个数返回路由信息。在物理文件层面,只有写队列才会创建文件。默认读、写队列的个数都是 16。

比如写队列的个数是 16,则创建 16 个文件夹,代表 0 - 15;读队列的个数是 8,则只会消费 0 - 7 这 8 个队列中的消息。

要求 readQueueNums >= writeQueueNums,最佳方案是两者相等。RocketMQ 设置读、写队列的目的是方便队列的扩容、缩容。

比如在原来指定读、写队列都是 16 的基础上进行扩容到 8 个。在不需要重启应用程序的情况下,先缩容写队列,由 0 - 15 缩容至 0 - 7。等到 8 - 15 队列中的消息全部消费完之后,再缩容读队列,由 0 - 15 缩容至 0 - 7。

队列的选择

方式一、指定 queueId 来选择具体的队列

DefaultMQProducer 的 send / sendOneway 方法中可携带 MessageQueue 参数。而 MessageQueue 可以指定 topic、queueId、brokerName 三个参数。

public MessageQueue(String topic, String brokerName, int queueId) {this.topic = topic;this.brokerName = brokerName;this.queueId = queueId;
}

方式二、根据 MessageQueueSelector 策略来选择队列

DefaultMQProducer 的 send / sendOneway 方法中可携带 MessageQueueSelector 参数。

public SendResult send(Message msg, MessageQueueSelector selector, Object arg);
public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback);

RocketMQ 内部定义了三种 MessageQueueSelector 策略。

  • SelectMessageQueueByHash:基于方法参数arg的哈希值,对队列总数取模,选择对应下标的队列。
  • SelectMessageQueueByRandom:基于队列总数生成一个随机数,选择对应下标的队列。
  • SelectMessageQueueByMachineRoom:返回空。
public class SelectMessageQueueByHash implements MessageQueueSelector {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// 取arg方法参数的哈希值,再对队列总数取模int value = arg.hashCode() % mqs.size();if (value < 0) {value = Math.abs(value);}// 选择对应的队列return mqs.get(value);}
}
public class SelectMessageQueueByRandom implements MessageQueueSelector {private Random random = new Random(System.currentTimeMillis());@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// 基于队列总数生成一个随机数int value = random.nextInt(mqs.size());// 选择对应的队列return mqs.get(value);}
}

方式三、基于Broker的可用性采取轮询的策略选择队列

DefaultMQProducer 的 send / sendOneway 方法可以不携带 MessageQueue、MessageQueueSelector,简单看下这种方式的队列是如何选择。

这种方式下的 send / sendOneway 方法中内部会调用如下方法:

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

进入方法内部,看一下处理逻辑。

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}

MQFaultStrategy

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {// 如果开启了发送延迟规避机制,默认falseif (this.sendLatencyFaultEnable) {try {int index = tpInfo.getSendWhichQueue().incrementAndGet();for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0)pos = 0;// 获取指定下标的队列MessageQueue mq = tpInfo.getMessageQueueList().get(pos);// 如果队列对应的Broker判定为可用,则返回该队列;否则基于轮询的策略选择下一个队列重复上述步骤进行判断if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))return mq;}final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();// 根据BrokerName获取存储的写队列的总数int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);}return mq;} else {latencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}return tpInfo.selectOneMessageQueue();}return tpInfo.selectOneMessageQueue(lastBrokerName);
}

LatencyFaultToleranceImpl

@Override
public boolean isAvailable(final String name) {// 从缓存中获取指定brokerName对应的FaultItem实例final FaultItem faultItem = this.faultItemTable.get(name);// 如果缓存命中if (faultItem != null) {// 判断是否可用,即当前时间-startTimestamp是否>=0return faultItem.isAvailable();}return true;
}@Override
public String pickOneAtLeast() {final Enumeration<FaultItem> elements = this.faultItemTable.elements();List<FaultItem> tmpList = new LinkedList<FaultItem>();while (elements.hasMoreElements()) {final FaultItem faultItem = elements.nextElement();tmpList.add(faultItem);}if (!tmpList.isEmpty()) {Collections.sort(tmpList);final int half = tmpList.size() / 2;if (half <= 0) {return tmpList.get(0).getName();} else {final int i = this.whichItemWorst.incrementAndGet() % half;return tmpList.get(i).getName();}}return null;
}@Override
public void remove(final String name) {this.faultItemTable.remove(name);
}

TopicPublishInfo

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {if (lastBrokerName == null) {return selectOneMessageQueue();} else {for (int i = 0; i < this.messageQueueList.size(); i++) {int index = this.sendWhichQueue.incrementAndGet();int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;MessageQueue mq = this.messageQueueList.get(pos);if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}return selectOneMessageQueue();}
}public MessageQueue selectOneMessageQueue() {int index = this.sendWhichQueue.incrementAndGet();int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;return this.messageQueueList.get(pos);
}

额外分析一下 DefaultMQProducerImpl 的 updateFaultItem 方法。

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
}

接着看下 MQFaultStrategy 的 updateFaultItem 方法。

private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {// 如果开启了发送延迟规避机制if (this.sendLatencyFaultEnable) {// 根据延迟时间计算不可用的时间long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);// 更新faultItemTable缓存this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);}
}private long computeNotAvailableDuration(final long currentLatency) {for (int i = latencyMax.length - 1; i >= 0; i--) {// 根据延迟时间计算不可用的时间if (currentLatency >= latencyMax[i])return this.notAvailableDuration[i];}return 0;
}

接着分析 LatencyFaultToleranceImpl 的 updateFaultItem 方法的处理逻辑。

@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {// 从缓存中获取指定BrokerName对应的FaultItem实例FaultItem old = this.faultItemTable.get(name);// 如果缓存未命中if (null == old) {// 构造 FaultItem 实例final FaultItem faultItem = new FaultItem(name);// 更新 currentLatecy、startTimestamp 属性faultItem.setCurrentLatency(currentLatency);faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);// 更新缓存old = this.faultItemTable.putIfAbsent(name, faultItem);if (old != null) {// 更新 currentLatecy、startTimestamp 属性old.setCurrentLatency(currentLatency);old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);}// 如果缓存命中  } else {// 更新 currentLatecy、startTimestamp 属性old.setCurrentLatency(currentLatency);old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);}
}

相关文章:

RocketMQ架构篇 - 读写队列与生产者如何选择队列

读、写队列 创建主题时&#xff0c;可以指定 writeQueueNums&#xff08;写队列的个数&#xff09;、readQueueNums&#xff08;读队列的个数&#xff09;。生产者发送消息时&#xff0c;使用写队列的个数返回路由信息&#xff1b;消费者消费消息时&#xff0c;使用读队列的个…...

华为OD机试真题Python实现【通信误码】真题+解题思路+代码(20222023)

通信误码 题目 信号传播过程中会出现一些误码,不同的数字表示不同的误码 ID,取值范围为 1~65535,用一个数组记录误码出现的情况,每个误码出现的次数代表误码频度,请找出记录中包含频度最高误码的最小子数组长度。 🔥🔥🔥🔥🔥👉👉👉👉👉👉 华为OD…...

【单目3D目标检测】MonoDDE论文精读与代码解析

文章目录PrefacePros and ConsAbstractContributionsPreliminaryDirect depth estimationDepth from heightPespective-n-point&#xff08;PnP&#xff09;PipelineDiverse Depth EstimationsRobust Depth CombinationOutput distributionSelecting and combining reliable de…...

复习 Kotlin 从小白到大牛 第二版 笔记要点

4.2.2 常量和只读变量 常量和只读变量一旦初始化就不能再被修改。在kotlin中&#xff0c;声明常量是在标识符的前面加上val或const val 关键字。 1. val 声明的是运行时变量&#xff0c;在运行时进行初始化 2.const val 声明的是编译时常量&#xff0c;在编译时初始化 val …...

X264简介-Android使用(二)

X264简介-Android使用&#xff08;二&#xff09; 4、Ubuntu上安装ffmpeg&#xff1a; 检查更新本地软件包&#xff08;如果未更新&#xff0c;reboot Vmware&#xff09;&#xff1a; sudo apt update sudo apt upgrade官网下载的source文件安装&#xff1a; http://ffmpe…...

【独家】华为OD机试 - 统计差异值大于相似值二元组个数(C 语言解题)

最近更新的博客 华为od 2023 | 什么是华为od,od 薪资待遇,od机试题清单华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南)华为od机试,独家整理 已参加机试人员的实战技巧文章目录 最近更新的博客使用说明本期…...

掌握好Framework 才是王道~

现在面试对Android开发者的要求越来越高了&#xff01;从最开始的阿里、头条、腾讯等大厂&#xff0c;到现在的互联网车企&#xff0c;面试总喜欢问道 Framework底层原理的相关问题 Android Framework的三大核心功能&#xff1a; 1、View.java:View工作原理&#xff0c;实现包…...

Codeforces Round 856 (Div. 2) A — C

Codeforces Round 856 (Div. 2) 文章目录A. Prefix and Suffix Array题目大意题目分析codeB. Not Dividing题目大意题目分析codeC. Scoring Subsequences题目大意题目分析codeA. Prefix and Suffix Array 题目大意 给出一个字符串所有的非空前后缀&#xff0c;判断原字符串是…...

2022年MathorCup数学建模B题无人仓的搬运机器人调度问题解题全过程文档加程序

2022年第十二届MathorCup高校数学建模 B题 无人仓的搬运机器人调度问题 原题再现 本题考虑在无人仓内的仓库管理问题之一&#xff0c;搬运机器人 AGV 的调度问题。更多的背景介绍请参看附件-背景介绍。对于无人仓来说&#xff0c;仓库的地图模型可以简化为图的数据结构。 仓库…...

开源项目的演进会遇到哪些“坑”?KubeVela 从发起到晋级 CNCF 孵化的全程回顾

作者&#xff1a;孙健波、曾庆国 点击查看&#xff1a;「开源人说」第五期《KubeVela&#xff1a;一场向应用交付标准的冲锋》 2023 年 2 月&#xff0c;**KubeVela [ 1] ** 经过全体 ToC 投票成功进入 CNCF Incubation&#xff0c;是云原生领域首个晋级孵化的面向应用的交付…...

MSDP实验配置

目录 配置MSDP 配置PIM SM协议 配置各PIM SM域内的静态RP 配置MSDP对等体 配置域内的MSDP对等体 AR8和AR9建立EBGP邻居 配置域间的MSDP对等体 进行实验验证 什么是MSDP MSDP&#xff08;Multicast Source Discovery Protocol&#xff09;组播源发现协议的简称 用来传递…...

惊!初中生也来卷了……

大家好&#xff0c;我是良许。 前两天在抖音直播的时候&#xff0c;突然来了一位不速之客…… 他自称是初中生&#xff0c;一开始我还有点不太相信&#xff0c;直到跟他连麦&#xff0c;听到他还略带一些稚嫩的声音&#xff0c;我才知道&#xff0c;他没有骗我…… 他说他想学…...

kafka相关配置介绍

kafka默认配置 每个kafka broker中配置文件server.properties默认必须配置的属性如下&#xff1a; broker.id0 num.network.threads2 num.io.threads8 socket.send.buffer.bytes1048576 socket.receive.buffer.bytes1048576 socket.request.max.bytes104857600 log.dirs/tmp/…...

【PyTorch】教程:torch.nn.Hardtanh

torch.nn.Hardtanh 原型 CLASS torch.nn.Hardtanh(min_val- 1.0, max_val1.0, inplaceFalse, min_valueNone, max_valueNone) 参数 min_val ([float]) – 线性区域的最小值&#xff0c;默认为 -1max_val ([float]) – 线性区域的最大值&#xff0c;默认为 1inplace ([bool]) …...

神垕古镇景区5A级十年都没有实现的三大主因

钧 瓷 内 参 第40期&#xff08;总第371期&#xff09; 2023年3月5日 神垕古镇景区5A级十年都没有实现的三大主因 这是2013年&#xff0c;禹州市市政府第一次提出创建5A级景区到今年三月份整整十年啊&#xff01; 目前神垕古镇景区是4A级景区&#xff0c;5A级一直进行中&a…...

react函数组件常用的几个钩子函数useState、useEffect、useRef、useCallback

react框架react框架包括包括两大类&#xff1a;类组件函数组件。类组件构成&#xff1a;constructor自定义方法。调用方法通过this.方法名()。constructor(superstate)构造器里面必有super字段。render()方法里面写页面布局。函数组件构成&#xff1a;各种钩子函数return()方法…...

4N60-ASEMI高压MOS管4N60

编辑-Z 4N60在TO-220封装里的静态漏极源导通电阻&#xff08;RDS(ON)&#xff09;为2.5Ω&#xff0c;是一款N沟道高压MOS管。4N60的最大脉冲正向电流ISM为16A&#xff0c;零栅极电压漏极电流(IDSS)为1uA&#xff0c;其工作时耐温度范围为-55~150摄氏度。4N60功耗&#xff08;…...

现代神经网络(VGG),并用VGG16进行实战CIFAR10分类

专栏&#xff1a;神经网络复现目录 本章介绍的是现代神经网络的结构和复现&#xff0c;包括深度卷积神经网络&#xff08;AlexNet&#xff09;&#xff0c;VGG&#xff0c;NiN&#xff0c;GoogleNet&#xff0c;残差网络&#xff08;ResNet&#xff09;&#xff0c;稠密连接网络…...

Java代码弱点与修复之——Dereference null return value(间接引用空返回值)

弱点描述 Dereference null return value,间接引用空返回值。是Coverity Scan静态代码分析工具中的一个警告,表示代码中有对可能为空(null)的方法或函数返回值进行间接引用(Dereference)操作。 该类型的漏洞可能会导致 NullPointerException 异常,并且会导致程序崩溃或…...

【冲刺蓝桥杯的最后30天】day3

大家好&#x1f603;&#xff0c;我是想要慢慢变得优秀的向阳&#x1f31e;同学&#x1f468;‍&#x1f4bb;&#xff0c;断更了整整一年&#xff0c;又开始恢复CSDN更新&#xff0c;从今天开始更新备战蓝桥30天系列&#xff0c;一共30天&#xff0c;如果对你有帮助或者正在备…...

初识Git,带你深入学习Git相关的知识

在之前的博客中&#xff0c;我都会在博客的开头放一个gitee的链接。Gitee是什么呢&#xff1f;它是一个远程的代码托管库。在我们学习和项目管理的时候起着非常重要的作用。 本期我就带领着大家一起学习Git相关的知识内容。学习它的操作&#xff0c;了解其在企业级开发中的作用…...

stealth.js全解析:40+反检测补丁的配置与优化技巧

Stealth.js全解析&#xff1a;40反检测补丁的配置与优化技巧 在当今的Web自动化领域&#xff0c;反检测技术已成为开发者必须掌握的核心技能之一。无论是数据采集、自动化测试还是其他需要模拟真实用户行为的场景&#xff0c;如何让脚本"隐形"都是决定成败的关键因素…...

运放跟随器:电路设计中最容易被低估的‘保镖‘(隔离驱动全解析)

运放跟随器&#xff1a;电路设计中最容易被低估的"保镖"&#xff08;隔离&驱动全解析&#xff09; 在硬件工程师的日常设计中&#xff0c;运放跟随器常常被视为一个"可有可无"的组件——毕竟它的电压增益仅为1&#xff0c;看起来似乎只是将输入信号原封…...

Windows 11 离线部署 WSL2 与 Ubuntu:绕过商店限制的完整实战

1. 为什么需要离线部署 WSL2 与 Ubuntu 很多开发者在 Windows 11 上使用 WSL2 时都会遇到一个头疼的问题&#xff1a;微软商店经常无法正常访问或下载速度极慢。我自己就遇到过好几次&#xff0c;明明网络连接正常&#xff0c;但就是卡在下载环节&#xff0c;进度条一动不动。这…...

别再只会用高德百度了!这7种专业地图(附GIS工具推荐)帮你搞定数据分析

7种专业地图与GIS工具实战指南&#xff1a;从用户分布到物流优化的全场景解决方案 打开手机地图应用查看路线&#xff0c;可能是大多数人对地理数据的唯一接触。但当你需要分析千万级用户的区域活跃度、规划全国物流网络或评估新店选址时&#xff0c;高德百度提供的标准化地图就…...

Zotero插件版本兼容性问题深度解析:从冲突到解决方案

Zotero插件版本兼容性问题深度解析&#xff1a;从冲突到解决方案 【免费下载链接】zotero-format-metadata Linter for Zotero. An addon for Zotero to format item metadata. Shortcut to set title rich text; set journal abbreviations, university places, and item lang…...

遥感影像处理避坑指南:为什么你的SHP裁剪总失败?ArcMap与ENVI协作全解析

遥感影像裁剪实战避坑手册&#xff1a;从坐标系校准到多工具协同 当你在深夜盯着屏幕上那个扭曲变形的裁剪结果时&#xff0c;是否曾怀疑过人生&#xff1f;遥感影像的矢量裁剪看似简单&#xff0c;实则暗藏玄机。本文将带你深入剖析那些教科书上不会告诉你的实战细节&#xff…...

用计算机科学与技术的视角,把谈恋爱流程化:构建可运行、可调试、可迭代的情感操作系统

用计算机科学与技术的视角&#xff0c;把谈恋爱流程化&#xff1a;构建可运行、可调试、可迭代的情感操作系统 善灵驿站 成长心理 技术思维深度融合系列 作者&#xff1a;培风图南以星河揽胜 专栏链接&#xff1a;善灵驿站 &#x1f4cc; 导读&#xff1a;为什么技术人更需要…...

AUTOSAR SPI配置进阶:如何为你的车载传感器设计高效可靠的通信序列?

AUTOSAR SPI配置进阶&#xff1a;车载传感器通信序列设计实战指南 在智能驾驶系统开发中&#xff0c;SPI总线作为连接毫米波雷达、IMU等关键传感器的神经末梢&#xff0c;其通信效率直接影响着环境感知的实时性。传统配置手册往往止步于基础参数说明&#xff0c;而本文将带您深…...

PADS 9.5集成的组件

PADS 9.5是一个高度集成的PCB设计平台&#xff0c;主要由三大核心组件构成&#xff1a;PADS Logic&#xff08;原理图设计&#xff09;、PADS Layout&#xff08;PCB布局设计&#xff09;和PADS Router&#xff08;交互式布线&#xff09;。这三个模块各司其职&#xff0c;又紧…...