RocketMQ架构篇 - 读写队列与生产者如何选择队列
读、写队列
创建主题时,可以指定 writeQueueNums(写队列的个数)、readQueueNums(读队列的个数)。生产者发送消息时,使用写队列的个数返回路由信息;消费者消费消息时,使用读队列的个数返回路由信息。在物理文件层面,只有写队列才会创建文件。默认读、写队列的个数都是 16。
比如写队列的个数是 16,则创建 16 个文件夹,代表 0 - 15;读队列的个数是 8,则只会消费 0 - 7 这 8 个队列中的消息。
要求 readQueueNums >= writeQueueNums,最佳方案是两者相等。RocketMQ 设置读、写队列的目的是方便队列的扩容、缩容。
比如在原来指定读、写队列都是 16 的基础上进行扩容到 8 个。在不需要重启应用程序的情况下,先缩容写队列,由 0 - 15 缩容至 0 - 7。等到 8 - 15 队列中的消息全部消费完之后,再缩容读队列,由 0 - 15 缩容至 0 - 7。
队列的选择
方式一、指定 queueId 来选择具体的队列
DefaultMQProducer 的 send / sendOneway 方法中可携带 MessageQueue 参数。而 MessageQueue 可以指定 topic、queueId、brokerName 三个参数。
public MessageQueue(String topic, String brokerName, int queueId) {this.topic = topic;this.brokerName = brokerName;this.queueId = queueId;
}
方式二、根据 MessageQueueSelector 策略来选择队列
DefaultMQProducer 的 send / sendOneway 方法中可携带 MessageQueueSelector 参数。
public SendResult send(Message msg, MessageQueueSelector selector, Object arg);
public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback);
RocketMQ 内部定义了三种 MessageQueueSelector 策略。
- SelectMessageQueueByHash:基于方法参数arg的哈希值,对队列总数取模,选择对应下标的队列。
- SelectMessageQueueByRandom:基于队列总数生成一个随机数,选择对应下标的队列。
- SelectMessageQueueByMachineRoom:返回空。
public class SelectMessageQueueByHash implements MessageQueueSelector {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// 取arg方法参数的哈希值,再对队列总数取模int value = arg.hashCode() % mqs.size();if (value < 0) {value = Math.abs(value);}// 选择对应的队列return mqs.get(value);}
}
public class SelectMessageQueueByRandom implements MessageQueueSelector {private Random random = new Random(System.currentTimeMillis());@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// 基于队列总数生成一个随机数int value = random.nextInt(mqs.size());// 选择对应的队列return mqs.get(value);}
}
方式三、基于Broker的可用性采取轮询的策略选择队列
DefaultMQProducer 的 send / sendOneway 方法可以不携带 MessageQueue、MessageQueueSelector,简单看下这种方式的队列是如何选择。
这种方式下的 send / sendOneway 方法中内部会调用如下方法:
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
进入方法内部,看一下处理逻辑。
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
MQFaultStrategy
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {// 如果开启了发送延迟规避机制,默认falseif (this.sendLatencyFaultEnable) {try {int index = tpInfo.getSendWhichQueue().incrementAndGet();for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0)pos = 0;// 获取指定下标的队列MessageQueue mq = tpInfo.getMessageQueueList().get(pos);// 如果队列对应的Broker判定为可用,则返回该队列;否则基于轮询的策略选择下一个队列重复上述步骤进行判断if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))return mq;}final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();// 根据BrokerName获取存储的写队列的总数int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);}return mq;} else {latencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}return tpInfo.selectOneMessageQueue();}return tpInfo.selectOneMessageQueue(lastBrokerName);
}
LatencyFaultToleranceImpl
@Override
public boolean isAvailable(final String name) {// 从缓存中获取指定brokerName对应的FaultItem实例final FaultItem faultItem = this.faultItemTable.get(name);// 如果缓存命中if (faultItem != null) {// 判断是否可用,即当前时间-startTimestamp是否>=0return faultItem.isAvailable();}return true;
}@Override
public String pickOneAtLeast() {final Enumeration<FaultItem> elements = this.faultItemTable.elements();List<FaultItem> tmpList = new LinkedList<FaultItem>();while (elements.hasMoreElements()) {final FaultItem faultItem = elements.nextElement();tmpList.add(faultItem);}if (!tmpList.isEmpty()) {Collections.sort(tmpList);final int half = tmpList.size() / 2;if (half <= 0) {return tmpList.get(0).getName();} else {final int i = this.whichItemWorst.incrementAndGet() % half;return tmpList.get(i).getName();}}return null;
}@Override
public void remove(final String name) {this.faultItemTable.remove(name);
}
TopicPublishInfo
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {if (lastBrokerName == null) {return selectOneMessageQueue();} else {for (int i = 0; i < this.messageQueueList.size(); i++) {int index = this.sendWhichQueue.incrementAndGet();int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;MessageQueue mq = this.messageQueueList.get(pos);if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}return selectOneMessageQueue();}
}public MessageQueue selectOneMessageQueue() {int index = this.sendWhichQueue.incrementAndGet();int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;return this.messageQueueList.get(pos);
}
额外分析一下 DefaultMQProducerImpl 的 updateFaultItem 方法。
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
}
接着看下 MQFaultStrategy 的 updateFaultItem 方法。
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {// 如果开启了发送延迟规避机制if (this.sendLatencyFaultEnable) {// 根据延迟时间计算不可用的时间long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);// 更新faultItemTable缓存this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);}
}private long computeNotAvailableDuration(final long currentLatency) {for (int i = latencyMax.length - 1; i >= 0; i--) {// 根据延迟时间计算不可用的时间if (currentLatency >= latencyMax[i])return this.notAvailableDuration[i];}return 0;
}
接着分析 LatencyFaultToleranceImpl 的 updateFaultItem 方法的处理逻辑。
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {// 从缓存中获取指定BrokerName对应的FaultItem实例FaultItem old = this.faultItemTable.get(name);// 如果缓存未命中if (null == old) {// 构造 FaultItem 实例final FaultItem faultItem = new FaultItem(name);// 更新 currentLatecy、startTimestamp 属性faultItem.setCurrentLatency(currentLatency);faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);// 更新缓存old = this.faultItemTable.putIfAbsent(name, faultItem);if (old != null) {// 更新 currentLatecy、startTimestamp 属性old.setCurrentLatency(currentLatency);old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);}// 如果缓存命中 } else {// 更新 currentLatecy、startTimestamp 属性old.setCurrentLatency(currentLatency);old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);}
}
相关文章:
RocketMQ架构篇 - 读写队列与生产者如何选择队列
读、写队列 创建主题时,可以指定 writeQueueNums(写队列的个数)、readQueueNums(读队列的个数)。生产者发送消息时,使用写队列的个数返回路由信息;消费者消费消息时,使用读队列的个…...
华为OD机试真题Python实现【通信误码】真题+解题思路+代码(20222023)
通信误码 题目 信号传播过程中会出现一些误码,不同的数字表示不同的误码 ID,取值范围为 1~65535,用一个数组记录误码出现的情况,每个误码出现的次数代表误码频度,请找出记录中包含频度最高误码的最小子数组长度。 🔥🔥🔥🔥🔥👉👉👉👉👉👉 华为OD…...
【单目3D目标检测】MonoDDE论文精读与代码解析
文章目录PrefacePros and ConsAbstractContributionsPreliminaryDirect depth estimationDepth from heightPespective-n-point(PnP)PipelineDiverse Depth EstimationsRobust Depth CombinationOutput distributionSelecting and combining reliable de…...
复习 Kotlin 从小白到大牛 第二版 笔记要点
4.2.2 常量和只读变量 常量和只读变量一旦初始化就不能再被修改。在kotlin中,声明常量是在标识符的前面加上val或const val 关键字。 1. val 声明的是运行时变量,在运行时进行初始化 2.const val 声明的是编译时常量,在编译时初始化 val …...
X264简介-Android使用(二)
X264简介-Android使用(二) 4、Ubuntu上安装ffmpeg: 检查更新本地软件包(如果未更新,reboot Vmware): sudo apt update sudo apt upgrade官网下载的source文件安装: http://ffmpe…...
【独家】华为OD机试 - 统计差异值大于相似值二元组个数(C 语言解题)
最近更新的博客 华为od 2023 | 什么是华为od,od 薪资待遇,od机试题清单华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南)华为od机试,独家整理 已参加机试人员的实战技巧文章目录 最近更新的博客使用说明本期…...
掌握好Framework 才是王道~
现在面试对Android开发者的要求越来越高了!从最开始的阿里、头条、腾讯等大厂,到现在的互联网车企,面试总喜欢问道 Framework底层原理的相关问题 Android Framework的三大核心功能: 1、View.java:View工作原理,实现包…...
Codeforces Round 856 (Div. 2) A — C
Codeforces Round 856 (Div. 2) 文章目录A. Prefix and Suffix Array题目大意题目分析codeB. Not Dividing题目大意题目分析codeC. Scoring Subsequences题目大意题目分析codeA. Prefix and Suffix Array 题目大意 给出一个字符串所有的非空前后缀,判断原字符串是…...
2022年MathorCup数学建模B题无人仓的搬运机器人调度问题解题全过程文档加程序
2022年第十二届MathorCup高校数学建模 B题 无人仓的搬运机器人调度问题 原题再现 本题考虑在无人仓内的仓库管理问题之一,搬运机器人 AGV 的调度问题。更多的背景介绍请参看附件-背景介绍。对于无人仓来说,仓库的地图模型可以简化为图的数据结构。 仓库…...
开源项目的演进会遇到哪些“坑”?KubeVela 从发起到晋级 CNCF 孵化的全程回顾
作者:孙健波、曾庆国 点击查看:「开源人说」第五期《KubeVela:一场向应用交付标准的冲锋》 2023 年 2 月,**KubeVela [ 1] ** 经过全体 ToC 投票成功进入 CNCF Incubation,是云原生领域首个晋级孵化的面向应用的交付…...
MSDP实验配置
目录 配置MSDP 配置PIM SM协议 配置各PIM SM域内的静态RP 配置MSDP对等体 配置域内的MSDP对等体 AR8和AR9建立EBGP邻居 配置域间的MSDP对等体 进行实验验证 什么是MSDP MSDP(Multicast Source Discovery Protocol)组播源发现协议的简称 用来传递…...
惊!初中生也来卷了……
大家好,我是良许。 前两天在抖音直播的时候,突然来了一位不速之客…… 他自称是初中生,一开始我还有点不太相信,直到跟他连麦,听到他还略带一些稚嫩的声音,我才知道,他没有骗我…… 他说他想学…...
kafka相关配置介绍
kafka默认配置 每个kafka broker中配置文件server.properties默认必须配置的属性如下: broker.id0 num.network.threads2 num.io.threads8 socket.send.buffer.bytes1048576 socket.receive.buffer.bytes1048576 socket.request.max.bytes104857600 log.dirs/tmp/…...
【PyTorch】教程:torch.nn.Hardtanh
torch.nn.Hardtanh 原型 CLASS torch.nn.Hardtanh(min_val- 1.0, max_val1.0, inplaceFalse, min_valueNone, max_valueNone) 参数 min_val ([float]) – 线性区域的最小值,默认为 -1max_val ([float]) – 线性区域的最大值,默认为 1inplace ([bool]) …...
神垕古镇景区5A级十年都没有实现的三大主因
钧 瓷 内 参 第40期(总第371期) 2023年3月5日 神垕古镇景区5A级十年都没有实现的三大主因 这是2013年,禹州市市政府第一次提出创建5A级景区到今年三月份整整十年啊! 目前神垕古镇景区是4A级景区,5A级一直进行中&a…...
react函数组件常用的几个钩子函数useState、useEffect、useRef、useCallback
react框架react框架包括包括两大类:类组件函数组件。类组件构成:constructor自定义方法。调用方法通过this.方法名()。constructor(superstate)构造器里面必有super字段。render()方法里面写页面布局。函数组件构成:各种钩子函数return()方法…...
4N60-ASEMI高压MOS管4N60
编辑-Z 4N60在TO-220封装里的静态漏极源导通电阻(RDS(ON))为2.5Ω,是一款N沟道高压MOS管。4N60的最大脉冲正向电流ISM为16A,零栅极电压漏极电流(IDSS)为1uA,其工作时耐温度范围为-55~150摄氏度。4N60功耗(…...
现代神经网络(VGG),并用VGG16进行实战CIFAR10分类
专栏:神经网络复现目录 本章介绍的是现代神经网络的结构和复现,包括深度卷积神经网络(AlexNet),VGG,NiN,GoogleNet,残差网络(ResNet),稠密连接网络…...
Java代码弱点与修复之——Dereference null return value(间接引用空返回值)
弱点描述 Dereference null return value,间接引用空返回值。是Coverity Scan静态代码分析工具中的一个警告,表示代码中有对可能为空(null)的方法或函数返回值进行间接引用(Dereference)操作。 该类型的漏洞可能会导致 NullPointerException 异常,并且会导致程序崩溃或…...
【冲刺蓝桥杯的最后30天】day3
大家好😃,我是想要慢慢变得优秀的向阳🌞同学👨💻,断更了整整一年,又开始恢复CSDN更新,从今天开始更新备战蓝桥30天系列,一共30天,如果对你有帮助或者正在备…...
SpringBoot-17-MyBatis动态SQL标签之常用标签
文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…...
国防科技大学计算机基础课程笔记02信息编码
1.机内码和国标码 国标码就是我们非常熟悉的这个GB2312,但是因为都是16进制,因此这个了16进制的数据既可以翻译成为这个机器码,也可以翻译成为这个国标码,所以这个时候很容易会出现这个歧义的情况; 因此,我们的这个国…...
线程同步:确保多线程程序的安全与高效!
全文目录: 开篇语前序前言第一部分:线程同步的概念与问题1.1 线程同步的概念1.2 线程同步的问题1.3 线程同步的解决方案 第二部分:synchronized关键字的使用2.1 使用 synchronized修饰方法2.2 使用 synchronized修饰代码块 第三部分ÿ…...
现代密码学 | 椭圆曲线密码学—附py代码
Elliptic Curve Cryptography 椭圆曲线密码学(ECC)是一种基于有限域上椭圆曲线数学特性的公钥加密技术。其核心原理涉及椭圆曲线的代数性质、离散对数问题以及有限域上的运算。 椭圆曲线密码学是多种数字签名算法的基础,例如椭圆曲线数字签…...
Unit 1 深度强化学习简介
Deep RL Course ——Unit 1 Introduction 从理论和实践层面深入学习深度强化学习。学会使用知名的深度强化学习库,例如 Stable Baselines3、RL Baselines3 Zoo、Sample Factory 和 CleanRL。在独特的环境中训练智能体,比如 SnowballFight、Huggy the Do…...
根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:
根据万维钢精英日课6的内容,使用AI(2025)可以参考以下方法: 四个洞见 模型已经比人聪明:以ChatGPT o3为代表的AI非常强大,能运用高级理论解释道理、引用最新学术论文,生成对顶尖科学家都有用的…...
ABAP设计模式之---“简单设计原则(Simple Design)”
“Simple Design”(简单设计)是软件开发中的一个重要理念,倡导以最简单的方式实现软件功能,以确保代码清晰易懂、易维护,并在项目需求变化时能够快速适应。 其核心目标是避免复杂和过度设计,遵循“让事情保…...
深度学习水论文:mamba+图像增强
🧀当前视觉领域对高效长序列建模需求激增,对Mamba图像增强这方向的研究自然也逐渐火热。原因在于其高效长程建模,以及动态计算优势,在图像质量提升和细节恢复方面有难以替代的作用。 🧀因此短时间内,就有不…...
Windows安装Miniconda
一、下载 https://www.anaconda.com/download/success 二、安装 三、配置镜像源 Anaconda/Miniconda pip 配置清华镜像源_anaconda配置清华源-CSDN博客 四、常用操作命令 Anaconda/Miniconda 基本操作命令_miniconda创建环境命令-CSDN博客...
tomcat入门
1 tomcat 是什么 apache开发的web服务器可以为java web程序提供运行环境tomcat是一款高效,稳定,易于使用的web服务器tomcathttp服务器Servlet服务器 2 tomcat 目录介绍 -bin #存放tomcat的脚本 -conf #存放tomcat的配置文件 ---catalina.policy #to…...
