当前位置: 首页 > news >正文

RocketMQ架构篇 - 读写队列与生产者如何选择队列

读、写队列

创建主题时,可以指定 writeQueueNums(写队列的个数)、readQueueNums(读队列的个数)。生产者发送消息时,使用写队列的个数返回路由信息;消费者消费消息时,使用读队列的个数返回路由信息。在物理文件层面,只有写队列才会创建文件。默认读、写队列的个数都是 16。

比如写队列的个数是 16,则创建 16 个文件夹,代表 0 - 15;读队列的个数是 8,则只会消费 0 - 7 这 8 个队列中的消息。

要求 readQueueNums >= writeQueueNums,最佳方案是两者相等。RocketMQ 设置读、写队列的目的是方便队列的扩容、缩容。

比如在原来指定读、写队列都是 16 的基础上进行扩容到 8 个。在不需要重启应用程序的情况下,先缩容写队列,由 0 - 15 缩容至 0 - 7。等到 8 - 15 队列中的消息全部消费完之后,再缩容读队列,由 0 - 15 缩容至 0 - 7。

队列的选择

方式一、指定 queueId 来选择具体的队列

DefaultMQProducer 的 send / sendOneway 方法中可携带 MessageQueue 参数。而 MessageQueue 可以指定 topic、queueId、brokerName 三个参数。

public MessageQueue(String topic, String brokerName, int queueId) {this.topic = topic;this.brokerName = brokerName;this.queueId = queueId;
}

方式二、根据 MessageQueueSelector 策略来选择队列

DefaultMQProducer 的 send / sendOneway 方法中可携带 MessageQueueSelector 参数。

public SendResult send(Message msg, MessageQueueSelector selector, Object arg);
public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback);

RocketMQ 内部定义了三种 MessageQueueSelector 策略。

  • SelectMessageQueueByHash:基于方法参数arg的哈希值,对队列总数取模,选择对应下标的队列。
  • SelectMessageQueueByRandom:基于队列总数生成一个随机数,选择对应下标的队列。
  • SelectMessageQueueByMachineRoom:返回空。
public class SelectMessageQueueByHash implements MessageQueueSelector {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// 取arg方法参数的哈希值,再对队列总数取模int value = arg.hashCode() % mqs.size();if (value < 0) {value = Math.abs(value);}// 选择对应的队列return mqs.get(value);}
}
public class SelectMessageQueueByRandom implements MessageQueueSelector {private Random random = new Random(System.currentTimeMillis());@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// 基于队列总数生成一个随机数int value = random.nextInt(mqs.size());// 选择对应的队列return mqs.get(value);}
}

方式三、基于Broker的可用性采取轮询的策略选择队列

DefaultMQProducer 的 send / sendOneway 方法可以不携带 MessageQueue、MessageQueueSelector,简单看下这种方式的队列是如何选择。

这种方式下的 send / sendOneway 方法中内部会调用如下方法:

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

进入方法内部,看一下处理逻辑。

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}

MQFaultStrategy

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {// 如果开启了发送延迟规避机制,默认falseif (this.sendLatencyFaultEnable) {try {int index = tpInfo.getSendWhichQueue().incrementAndGet();for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0)pos = 0;// 获取指定下标的队列MessageQueue mq = tpInfo.getMessageQueueList().get(pos);// 如果队列对应的Broker判定为可用,则返回该队列;否则基于轮询的策略选择下一个队列重复上述步骤进行判断if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))return mq;}final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();// 根据BrokerName获取存储的写队列的总数int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);}return mq;} else {latencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}return tpInfo.selectOneMessageQueue();}return tpInfo.selectOneMessageQueue(lastBrokerName);
}

LatencyFaultToleranceImpl

@Override
public boolean isAvailable(final String name) {// 从缓存中获取指定brokerName对应的FaultItem实例final FaultItem faultItem = this.faultItemTable.get(name);// 如果缓存命中if (faultItem != null) {// 判断是否可用,即当前时间-startTimestamp是否>=0return faultItem.isAvailable();}return true;
}@Override
public String pickOneAtLeast() {final Enumeration<FaultItem> elements = this.faultItemTable.elements();List<FaultItem> tmpList = new LinkedList<FaultItem>();while (elements.hasMoreElements()) {final FaultItem faultItem = elements.nextElement();tmpList.add(faultItem);}if (!tmpList.isEmpty()) {Collections.sort(tmpList);final int half = tmpList.size() / 2;if (half <= 0) {return tmpList.get(0).getName();} else {final int i = this.whichItemWorst.incrementAndGet() % half;return tmpList.get(i).getName();}}return null;
}@Override
public void remove(final String name) {this.faultItemTable.remove(name);
}

TopicPublishInfo

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {if (lastBrokerName == null) {return selectOneMessageQueue();} else {for (int i = 0; i < this.messageQueueList.size(); i++) {int index = this.sendWhichQueue.incrementAndGet();int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;MessageQueue mq = this.messageQueueList.get(pos);if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}return selectOneMessageQueue();}
}public MessageQueue selectOneMessageQueue() {int index = this.sendWhichQueue.incrementAndGet();int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;return this.messageQueueList.get(pos);
}

额外分析一下 DefaultMQProducerImpl 的 updateFaultItem 方法。

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
}

接着看下 MQFaultStrategy 的 updateFaultItem 方法。

private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {// 如果开启了发送延迟规避机制if (this.sendLatencyFaultEnable) {// 根据延迟时间计算不可用的时间long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);// 更新faultItemTable缓存this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);}
}private long computeNotAvailableDuration(final long currentLatency) {for (int i = latencyMax.length - 1; i >= 0; i--) {// 根据延迟时间计算不可用的时间if (currentLatency >= latencyMax[i])return this.notAvailableDuration[i];}return 0;
}

接着分析 LatencyFaultToleranceImpl 的 updateFaultItem 方法的处理逻辑。

@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {// 从缓存中获取指定BrokerName对应的FaultItem实例FaultItem old = this.faultItemTable.get(name);// 如果缓存未命中if (null == old) {// 构造 FaultItem 实例final FaultItem faultItem = new FaultItem(name);// 更新 currentLatecy、startTimestamp 属性faultItem.setCurrentLatency(currentLatency);faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);// 更新缓存old = this.faultItemTable.putIfAbsent(name, faultItem);if (old != null) {// 更新 currentLatecy、startTimestamp 属性old.setCurrentLatency(currentLatency);old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);}// 如果缓存命中  } else {// 更新 currentLatecy、startTimestamp 属性old.setCurrentLatency(currentLatency);old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);}
}

相关文章:

RocketMQ架构篇 - 读写队列与生产者如何选择队列

读、写队列 创建主题时&#xff0c;可以指定 writeQueueNums&#xff08;写队列的个数&#xff09;、readQueueNums&#xff08;读队列的个数&#xff09;。生产者发送消息时&#xff0c;使用写队列的个数返回路由信息&#xff1b;消费者消费消息时&#xff0c;使用读队列的个…...

华为OD机试真题Python实现【通信误码】真题+解题思路+代码(20222023)

通信误码 题目 信号传播过程中会出现一些误码,不同的数字表示不同的误码 ID,取值范围为 1~65535,用一个数组记录误码出现的情况,每个误码出现的次数代表误码频度,请找出记录中包含频度最高误码的最小子数组长度。 🔥🔥🔥🔥🔥👉👉👉👉👉👉 华为OD…...

【单目3D目标检测】MonoDDE论文精读与代码解析

文章目录PrefacePros and ConsAbstractContributionsPreliminaryDirect depth estimationDepth from heightPespective-n-point&#xff08;PnP&#xff09;PipelineDiverse Depth EstimationsRobust Depth CombinationOutput distributionSelecting and combining reliable de…...

复习 Kotlin 从小白到大牛 第二版 笔记要点

4.2.2 常量和只读变量 常量和只读变量一旦初始化就不能再被修改。在kotlin中&#xff0c;声明常量是在标识符的前面加上val或const val 关键字。 1. val 声明的是运行时变量&#xff0c;在运行时进行初始化 2.const val 声明的是编译时常量&#xff0c;在编译时初始化 val …...

X264简介-Android使用(二)

X264简介-Android使用&#xff08;二&#xff09; 4、Ubuntu上安装ffmpeg&#xff1a; 检查更新本地软件包&#xff08;如果未更新&#xff0c;reboot Vmware&#xff09;&#xff1a; sudo apt update sudo apt upgrade官网下载的source文件安装&#xff1a; http://ffmpe…...

【独家】华为OD机试 - 统计差异值大于相似值二元组个数(C 语言解题)

最近更新的博客 华为od 2023 | 什么是华为od,od 薪资待遇,od机试题清单华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南)华为od机试,独家整理 已参加机试人员的实战技巧文章目录 最近更新的博客使用说明本期…...

掌握好Framework 才是王道~

现在面试对Android开发者的要求越来越高了&#xff01;从最开始的阿里、头条、腾讯等大厂&#xff0c;到现在的互联网车企&#xff0c;面试总喜欢问道 Framework底层原理的相关问题 Android Framework的三大核心功能&#xff1a; 1、View.java:View工作原理&#xff0c;实现包…...

Codeforces Round 856 (Div. 2) A — C

Codeforces Round 856 (Div. 2) 文章目录A. Prefix and Suffix Array题目大意题目分析codeB. Not Dividing题目大意题目分析codeC. Scoring Subsequences题目大意题目分析codeA. Prefix and Suffix Array 题目大意 给出一个字符串所有的非空前后缀&#xff0c;判断原字符串是…...

2022年MathorCup数学建模B题无人仓的搬运机器人调度问题解题全过程文档加程序

2022年第十二届MathorCup高校数学建模 B题 无人仓的搬运机器人调度问题 原题再现 本题考虑在无人仓内的仓库管理问题之一&#xff0c;搬运机器人 AGV 的调度问题。更多的背景介绍请参看附件-背景介绍。对于无人仓来说&#xff0c;仓库的地图模型可以简化为图的数据结构。 仓库…...

开源项目的演进会遇到哪些“坑”?KubeVela 从发起到晋级 CNCF 孵化的全程回顾

作者&#xff1a;孙健波、曾庆国 点击查看&#xff1a;「开源人说」第五期《KubeVela&#xff1a;一场向应用交付标准的冲锋》 2023 年 2 月&#xff0c;**KubeVela [ 1] ** 经过全体 ToC 投票成功进入 CNCF Incubation&#xff0c;是云原生领域首个晋级孵化的面向应用的交付…...

MSDP实验配置

目录 配置MSDP 配置PIM SM协议 配置各PIM SM域内的静态RP 配置MSDP对等体 配置域内的MSDP对等体 AR8和AR9建立EBGP邻居 配置域间的MSDP对等体 进行实验验证 什么是MSDP MSDP&#xff08;Multicast Source Discovery Protocol&#xff09;组播源发现协议的简称 用来传递…...

惊!初中生也来卷了……

大家好&#xff0c;我是良许。 前两天在抖音直播的时候&#xff0c;突然来了一位不速之客…… 他自称是初中生&#xff0c;一开始我还有点不太相信&#xff0c;直到跟他连麦&#xff0c;听到他还略带一些稚嫩的声音&#xff0c;我才知道&#xff0c;他没有骗我…… 他说他想学…...

kafka相关配置介绍

kafka默认配置 每个kafka broker中配置文件server.properties默认必须配置的属性如下&#xff1a; broker.id0 num.network.threads2 num.io.threads8 socket.send.buffer.bytes1048576 socket.receive.buffer.bytes1048576 socket.request.max.bytes104857600 log.dirs/tmp/…...

【PyTorch】教程:torch.nn.Hardtanh

torch.nn.Hardtanh 原型 CLASS torch.nn.Hardtanh(min_val- 1.0, max_val1.0, inplaceFalse, min_valueNone, max_valueNone) 参数 min_val ([float]) – 线性区域的最小值&#xff0c;默认为 -1max_val ([float]) – 线性区域的最大值&#xff0c;默认为 1inplace ([bool]) …...

神垕古镇景区5A级十年都没有实现的三大主因

钧 瓷 内 参 第40期&#xff08;总第371期&#xff09; 2023年3月5日 神垕古镇景区5A级十年都没有实现的三大主因 这是2013年&#xff0c;禹州市市政府第一次提出创建5A级景区到今年三月份整整十年啊&#xff01; 目前神垕古镇景区是4A级景区&#xff0c;5A级一直进行中&a…...

react函数组件常用的几个钩子函数useState、useEffect、useRef、useCallback

react框架react框架包括包括两大类&#xff1a;类组件函数组件。类组件构成&#xff1a;constructor自定义方法。调用方法通过this.方法名()。constructor(superstate)构造器里面必有super字段。render()方法里面写页面布局。函数组件构成&#xff1a;各种钩子函数return()方法…...

4N60-ASEMI高压MOS管4N60

编辑-Z 4N60在TO-220封装里的静态漏极源导通电阻&#xff08;RDS(ON)&#xff09;为2.5Ω&#xff0c;是一款N沟道高压MOS管。4N60的最大脉冲正向电流ISM为16A&#xff0c;零栅极电压漏极电流(IDSS)为1uA&#xff0c;其工作时耐温度范围为-55~150摄氏度。4N60功耗&#xff08;…...

现代神经网络(VGG),并用VGG16进行实战CIFAR10分类

专栏&#xff1a;神经网络复现目录 本章介绍的是现代神经网络的结构和复现&#xff0c;包括深度卷积神经网络&#xff08;AlexNet&#xff09;&#xff0c;VGG&#xff0c;NiN&#xff0c;GoogleNet&#xff0c;残差网络&#xff08;ResNet&#xff09;&#xff0c;稠密连接网络…...

Java代码弱点与修复之——Dereference null return value(间接引用空返回值)

弱点描述 Dereference null return value,间接引用空返回值。是Coverity Scan静态代码分析工具中的一个警告,表示代码中有对可能为空(null)的方法或函数返回值进行间接引用(Dereference)操作。 该类型的漏洞可能会导致 NullPointerException 异常,并且会导致程序崩溃或…...

【冲刺蓝桥杯的最后30天】day3

大家好&#x1f603;&#xff0c;我是想要慢慢变得优秀的向阳&#x1f31e;同学&#x1f468;‍&#x1f4bb;&#xff0c;断更了整整一年&#xff0c;又开始恢复CSDN更新&#xff0c;从今天开始更新备战蓝桥30天系列&#xff0c;一共30天&#xff0c;如果对你有帮助或者正在备…...

React 第五十五节 Router 中 useAsyncError的使用详解

前言 useAsyncError 是 React Router v6.4 引入的一个钩子&#xff0c;用于处理异步操作&#xff08;如数据加载&#xff09;中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误&#xff1a;捕获在 loader 或 action 中发生的异步错误替…...

使用VSCode开发Django指南

使用VSCode开发Django指南 一、概述 Django 是一个高级 Python 框架&#xff0c;专为快速、安全和可扩展的 Web 开发而设计。Django 包含对 URL 路由、页面模板和数据处理的丰富支持。 本文将创建一个简单的 Django 应用&#xff0c;其中包含三个使用通用基本模板的页面。在此…...

ssc377d修改flash分区大小

1、flash的分区默认分配16M、 / # df -h Filesystem Size Used Available Use% Mounted on /dev/root 1.9M 1.9M 0 100% / /dev/mtdblock4 3.0M...

SCAU期末笔记 - 数据分析与数据挖掘题库解析

这门怎么题库答案不全啊日 来简单学一下子来 一、选择题&#xff08;可多选&#xff09; 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘&#xff1a;专注于发现数据中…...

什么是库存周转?如何用进销存系统提高库存周转率?

你可能听说过这样一句话&#xff1a; “利润不是赚出来的&#xff0c;是管出来的。” 尤其是在制造业、批发零售、电商这类“货堆成山”的行业&#xff0c;很多企业看着销售不错&#xff0c;账上却没钱、利润也不见了&#xff0c;一翻库存才发现&#xff1a; 一堆卖不动的旧货…...

使用van-uploader 的UI组件,结合vue2如何实现图片上传组件的封装

以下是基于 vant-ui&#xff08;适配 Vue2 版本 &#xff09;实现截图中照片上传预览、删除功能&#xff0c;并封装成可复用组件的完整代码&#xff0c;包含样式和逻辑实现&#xff0c;可直接在 Vue2 项目中使用&#xff1a; 1. 封装的图片上传组件 ImageUploader.vue <te…...

Module Federation 和 Native Federation 的比较

前言 Module Federation 是 Webpack 5 引入的微前端架构方案&#xff0c;允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...

Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)

目录 一、&#x1f44b;&#x1f3fb;前言 二、&#x1f608;sinx波动的基本原理 三、&#x1f608;波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、&#x1f30a;波动优化…...

初学 pytest 记录

安装 pip install pytest用例可以是函数也可以是类中的方法 def test_func():print()class TestAdd: # def __init__(self): 在 pytest 中不可以使用__init__方法 # self.cc 12345 pytest.mark.api def test_str(self):res add(1, 2)assert res 12def test_int(self):r…...

#Uniapp篇:chrome调试unapp适配

chrome调试设备----使用Android模拟机开发调试移动端页面 Chrome://inspect/#devices MuMu模拟器Edge浏览器&#xff1a;Android原生APP嵌入的H5页面元素定位 chrome://inspect/#devices uniapp单位适配 根路径下 postcss.config.js 需要装这些插件 “postcss”: “^8.5.…...