当前位置: 首页 > news >正文

RocketMQ架构篇 - 读写队列与生产者如何选择队列

读、写队列

创建主题时,可以指定 writeQueueNums(写队列的个数)、readQueueNums(读队列的个数)。生产者发送消息时,使用写队列的个数返回路由信息;消费者消费消息时,使用读队列的个数返回路由信息。在物理文件层面,只有写队列才会创建文件。默认读、写队列的个数都是 16。

比如写队列的个数是 16,则创建 16 个文件夹,代表 0 - 15;读队列的个数是 8,则只会消费 0 - 7 这 8 个队列中的消息。

要求 readQueueNums >= writeQueueNums,最佳方案是两者相等。RocketMQ 设置读、写队列的目的是方便队列的扩容、缩容。

比如在原来指定读、写队列都是 16 的基础上进行扩容到 8 个。在不需要重启应用程序的情况下,先缩容写队列,由 0 - 15 缩容至 0 - 7。等到 8 - 15 队列中的消息全部消费完之后,再缩容读队列,由 0 - 15 缩容至 0 - 7。

队列的选择

方式一、指定 queueId 来选择具体的队列

DefaultMQProducer 的 send / sendOneway 方法中可携带 MessageQueue 参数。而 MessageQueue 可以指定 topic、queueId、brokerName 三个参数。

public MessageQueue(String topic, String brokerName, int queueId) {this.topic = topic;this.brokerName = brokerName;this.queueId = queueId;
}

方式二、根据 MessageQueueSelector 策略来选择队列

DefaultMQProducer 的 send / sendOneway 方法中可携带 MessageQueueSelector 参数。

public SendResult send(Message msg, MessageQueueSelector selector, Object arg);
public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback);

RocketMQ 内部定义了三种 MessageQueueSelector 策略。

  • SelectMessageQueueByHash:基于方法参数arg的哈希值,对队列总数取模,选择对应下标的队列。
  • SelectMessageQueueByRandom:基于队列总数生成一个随机数,选择对应下标的队列。
  • SelectMessageQueueByMachineRoom:返回空。
public class SelectMessageQueueByHash implements MessageQueueSelector {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// 取arg方法参数的哈希值,再对队列总数取模int value = arg.hashCode() % mqs.size();if (value < 0) {value = Math.abs(value);}// 选择对应的队列return mqs.get(value);}
}
public class SelectMessageQueueByRandom implements MessageQueueSelector {private Random random = new Random(System.currentTimeMillis());@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// 基于队列总数生成一个随机数int value = random.nextInt(mqs.size());// 选择对应的队列return mqs.get(value);}
}

方式三、基于Broker的可用性采取轮询的策略选择队列

DefaultMQProducer 的 send / sendOneway 方法可以不携带 MessageQueue、MessageQueueSelector,简单看下这种方式的队列是如何选择。

这种方式下的 send / sendOneway 方法中内部会调用如下方法:

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

进入方法内部,看一下处理逻辑。

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}

MQFaultStrategy

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {// 如果开启了发送延迟规避机制,默认falseif (this.sendLatencyFaultEnable) {try {int index = tpInfo.getSendWhichQueue().incrementAndGet();for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0)pos = 0;// 获取指定下标的队列MessageQueue mq = tpInfo.getMessageQueueList().get(pos);// 如果队列对应的Broker判定为可用,则返回该队列;否则基于轮询的策略选择下一个队列重复上述步骤进行判断if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))return mq;}final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();// 根据BrokerName获取存储的写队列的总数int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);}return mq;} else {latencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}return tpInfo.selectOneMessageQueue();}return tpInfo.selectOneMessageQueue(lastBrokerName);
}

LatencyFaultToleranceImpl

@Override
public boolean isAvailable(final String name) {// 从缓存中获取指定brokerName对应的FaultItem实例final FaultItem faultItem = this.faultItemTable.get(name);// 如果缓存命中if (faultItem != null) {// 判断是否可用,即当前时间-startTimestamp是否>=0return faultItem.isAvailable();}return true;
}@Override
public String pickOneAtLeast() {final Enumeration<FaultItem> elements = this.faultItemTable.elements();List<FaultItem> tmpList = new LinkedList<FaultItem>();while (elements.hasMoreElements()) {final FaultItem faultItem = elements.nextElement();tmpList.add(faultItem);}if (!tmpList.isEmpty()) {Collections.sort(tmpList);final int half = tmpList.size() / 2;if (half <= 0) {return tmpList.get(0).getName();} else {final int i = this.whichItemWorst.incrementAndGet() % half;return tmpList.get(i).getName();}}return null;
}@Override
public void remove(final String name) {this.faultItemTable.remove(name);
}

TopicPublishInfo

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {if (lastBrokerName == null) {return selectOneMessageQueue();} else {for (int i = 0; i < this.messageQueueList.size(); i++) {int index = this.sendWhichQueue.incrementAndGet();int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;MessageQueue mq = this.messageQueueList.get(pos);if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}return selectOneMessageQueue();}
}public MessageQueue selectOneMessageQueue() {int index = this.sendWhichQueue.incrementAndGet();int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;return this.messageQueueList.get(pos);
}

额外分析一下 DefaultMQProducerImpl 的 updateFaultItem 方法。

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
}

接着看下 MQFaultStrategy 的 updateFaultItem 方法。

private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {// 如果开启了发送延迟规避机制if (this.sendLatencyFaultEnable) {// 根据延迟时间计算不可用的时间long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);// 更新faultItemTable缓存this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);}
}private long computeNotAvailableDuration(final long currentLatency) {for (int i = latencyMax.length - 1; i >= 0; i--) {// 根据延迟时间计算不可用的时间if (currentLatency >= latencyMax[i])return this.notAvailableDuration[i];}return 0;
}

接着分析 LatencyFaultToleranceImpl 的 updateFaultItem 方法的处理逻辑。

@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {// 从缓存中获取指定BrokerName对应的FaultItem实例FaultItem old = this.faultItemTable.get(name);// 如果缓存未命中if (null == old) {// 构造 FaultItem 实例final FaultItem faultItem = new FaultItem(name);// 更新 currentLatecy、startTimestamp 属性faultItem.setCurrentLatency(currentLatency);faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);// 更新缓存old = this.faultItemTable.putIfAbsent(name, faultItem);if (old != null) {// 更新 currentLatecy、startTimestamp 属性old.setCurrentLatency(currentLatency);old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);}// 如果缓存命中  } else {// 更新 currentLatecy、startTimestamp 属性old.setCurrentLatency(currentLatency);old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);}
}

相关文章:

RocketMQ架构篇 - 读写队列与生产者如何选择队列

读、写队列 创建主题时&#xff0c;可以指定 writeQueueNums&#xff08;写队列的个数&#xff09;、readQueueNums&#xff08;读队列的个数&#xff09;。生产者发送消息时&#xff0c;使用写队列的个数返回路由信息&#xff1b;消费者消费消息时&#xff0c;使用读队列的个…...

华为OD机试真题Python实现【通信误码】真题+解题思路+代码(20222023)

通信误码 题目 信号传播过程中会出现一些误码,不同的数字表示不同的误码 ID,取值范围为 1~65535,用一个数组记录误码出现的情况,每个误码出现的次数代表误码频度,请找出记录中包含频度最高误码的最小子数组长度。 🔥🔥🔥🔥🔥👉👉👉👉👉👉 华为OD…...

【单目3D目标检测】MonoDDE论文精读与代码解析

文章目录PrefacePros and ConsAbstractContributionsPreliminaryDirect depth estimationDepth from heightPespective-n-point&#xff08;PnP&#xff09;PipelineDiverse Depth EstimationsRobust Depth CombinationOutput distributionSelecting and combining reliable de…...

复习 Kotlin 从小白到大牛 第二版 笔记要点

4.2.2 常量和只读变量 常量和只读变量一旦初始化就不能再被修改。在kotlin中&#xff0c;声明常量是在标识符的前面加上val或const val 关键字。 1. val 声明的是运行时变量&#xff0c;在运行时进行初始化 2.const val 声明的是编译时常量&#xff0c;在编译时初始化 val …...

X264简介-Android使用(二)

X264简介-Android使用&#xff08;二&#xff09; 4、Ubuntu上安装ffmpeg&#xff1a; 检查更新本地软件包&#xff08;如果未更新&#xff0c;reboot Vmware&#xff09;&#xff1a; sudo apt update sudo apt upgrade官网下载的source文件安装&#xff1a; http://ffmpe…...

【独家】华为OD机试 - 统计差异值大于相似值二元组个数(C 语言解题)

最近更新的博客 华为od 2023 | 什么是华为od,od 薪资待遇,od机试题清单华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南)华为od机试,独家整理 已参加机试人员的实战技巧文章目录 最近更新的博客使用说明本期…...

掌握好Framework 才是王道~

现在面试对Android开发者的要求越来越高了&#xff01;从最开始的阿里、头条、腾讯等大厂&#xff0c;到现在的互联网车企&#xff0c;面试总喜欢问道 Framework底层原理的相关问题 Android Framework的三大核心功能&#xff1a; 1、View.java:View工作原理&#xff0c;实现包…...

Codeforces Round 856 (Div. 2) A — C

Codeforces Round 856 (Div. 2) 文章目录A. Prefix and Suffix Array题目大意题目分析codeB. Not Dividing题目大意题目分析codeC. Scoring Subsequences题目大意题目分析codeA. Prefix and Suffix Array 题目大意 给出一个字符串所有的非空前后缀&#xff0c;判断原字符串是…...

2022年MathorCup数学建模B题无人仓的搬运机器人调度问题解题全过程文档加程序

2022年第十二届MathorCup高校数学建模 B题 无人仓的搬运机器人调度问题 原题再现 本题考虑在无人仓内的仓库管理问题之一&#xff0c;搬运机器人 AGV 的调度问题。更多的背景介绍请参看附件-背景介绍。对于无人仓来说&#xff0c;仓库的地图模型可以简化为图的数据结构。 仓库…...

开源项目的演进会遇到哪些“坑”?KubeVela 从发起到晋级 CNCF 孵化的全程回顾

作者&#xff1a;孙健波、曾庆国 点击查看&#xff1a;「开源人说」第五期《KubeVela&#xff1a;一场向应用交付标准的冲锋》 2023 年 2 月&#xff0c;**KubeVela [ 1] ** 经过全体 ToC 投票成功进入 CNCF Incubation&#xff0c;是云原生领域首个晋级孵化的面向应用的交付…...

MSDP实验配置

目录 配置MSDP 配置PIM SM协议 配置各PIM SM域内的静态RP 配置MSDP对等体 配置域内的MSDP对等体 AR8和AR9建立EBGP邻居 配置域间的MSDP对等体 进行实验验证 什么是MSDP MSDP&#xff08;Multicast Source Discovery Protocol&#xff09;组播源发现协议的简称 用来传递…...

惊!初中生也来卷了……

大家好&#xff0c;我是良许。 前两天在抖音直播的时候&#xff0c;突然来了一位不速之客…… 他自称是初中生&#xff0c;一开始我还有点不太相信&#xff0c;直到跟他连麦&#xff0c;听到他还略带一些稚嫩的声音&#xff0c;我才知道&#xff0c;他没有骗我…… 他说他想学…...

kafka相关配置介绍

kafka默认配置 每个kafka broker中配置文件server.properties默认必须配置的属性如下&#xff1a; broker.id0 num.network.threads2 num.io.threads8 socket.send.buffer.bytes1048576 socket.receive.buffer.bytes1048576 socket.request.max.bytes104857600 log.dirs/tmp/…...

【PyTorch】教程:torch.nn.Hardtanh

torch.nn.Hardtanh 原型 CLASS torch.nn.Hardtanh(min_val- 1.0, max_val1.0, inplaceFalse, min_valueNone, max_valueNone) 参数 min_val ([float]) – 线性区域的最小值&#xff0c;默认为 -1max_val ([float]) – 线性区域的最大值&#xff0c;默认为 1inplace ([bool]) …...

神垕古镇景区5A级十年都没有实现的三大主因

钧 瓷 内 参 第40期&#xff08;总第371期&#xff09; 2023年3月5日 神垕古镇景区5A级十年都没有实现的三大主因 这是2013年&#xff0c;禹州市市政府第一次提出创建5A级景区到今年三月份整整十年啊&#xff01; 目前神垕古镇景区是4A级景区&#xff0c;5A级一直进行中&a…...

react函数组件常用的几个钩子函数useState、useEffect、useRef、useCallback

react框架react框架包括包括两大类&#xff1a;类组件函数组件。类组件构成&#xff1a;constructor自定义方法。调用方法通过this.方法名()。constructor(superstate)构造器里面必有super字段。render()方法里面写页面布局。函数组件构成&#xff1a;各种钩子函数return()方法…...

4N60-ASEMI高压MOS管4N60

编辑-Z 4N60在TO-220封装里的静态漏极源导通电阻&#xff08;RDS(ON)&#xff09;为2.5Ω&#xff0c;是一款N沟道高压MOS管。4N60的最大脉冲正向电流ISM为16A&#xff0c;零栅极电压漏极电流(IDSS)为1uA&#xff0c;其工作时耐温度范围为-55~150摄氏度。4N60功耗&#xff08;…...

现代神经网络(VGG),并用VGG16进行实战CIFAR10分类

专栏&#xff1a;神经网络复现目录 本章介绍的是现代神经网络的结构和复现&#xff0c;包括深度卷积神经网络&#xff08;AlexNet&#xff09;&#xff0c;VGG&#xff0c;NiN&#xff0c;GoogleNet&#xff0c;残差网络&#xff08;ResNet&#xff09;&#xff0c;稠密连接网络…...

Java代码弱点与修复之——Dereference null return value(间接引用空返回值)

弱点描述 Dereference null return value,间接引用空返回值。是Coverity Scan静态代码分析工具中的一个警告,表示代码中有对可能为空(null)的方法或函数返回值进行间接引用(Dereference)操作。 该类型的漏洞可能会导致 NullPointerException 异常,并且会导致程序崩溃或…...

【冲刺蓝桥杯的最后30天】day3

大家好&#x1f603;&#xff0c;我是想要慢慢变得优秀的向阳&#x1f31e;同学&#x1f468;‍&#x1f4bb;&#xff0c;断更了整整一年&#xff0c;又开始恢复CSDN更新&#xff0c;从今天开始更新备战蓝桥30天系列&#xff0c;一共30天&#xff0c;如果对你有帮助或者正在备…...

C++实现分布式网络通信框架RPC(3)--rpc调用端

目录 一、前言 二、UserServiceRpc_Stub 三、 CallMethod方法的重写 头文件 实现 四、rpc调用端的调用 实现 五、 google::protobuf::RpcController *controller 头文件 实现 六、总结 一、前言 在前边的文章中&#xff0c;我们已经大致实现了rpc服务端的各项功能代…...

【机器视觉】单目测距——运动结构恢复

ps&#xff1a;图是随便找的&#xff0c;为了凑个封面 前言 在前面对光流法进行进一步改进&#xff0c;希望将2D光流推广至3D场景流时&#xff0c;发现2D转3D过程中存在尺度歧义问题&#xff0c;需要补全摄像头拍摄图像中缺失的深度信息&#xff0c;否则解空间不收敛&#xf…...

抖音增长新引擎:品融电商,一站式全案代运营领跑者

抖音增长新引擎&#xff1a;品融电商&#xff0c;一站式全案代运营领跑者 在抖音这个日活超7亿的流量汪洋中&#xff0c;品牌如何破浪前行&#xff1f;自建团队成本高、效果难控&#xff1b;碎片化运营又难成合力——这正是许多企业面临的增长困局。品融电商以「抖音全案代运营…...

Vue2 第一节_Vue2上手_插值表达式{{}}_访问数据和修改数据_Vue开发者工具

文章目录 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染2. 插值表达式{{}}3. 访问数据和修改数据4. vue响应式5. Vue开发者工具--方便调试 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染 准备容器引包创建Vue实例 new Vue()指定配置项 ->渲染数据 准备一个容器,例如: …...

反射获取方法和属性

Java反射获取方法 在Java中&#xff0c;反射&#xff08;Reflection&#xff09;是一种强大的机制&#xff0c;允许程序在运行时访问和操作类的内部属性和方法。通过反射&#xff0c;可以动态地创建对象、调用方法、改变属性值&#xff0c;这在很多Java框架中如Spring和Hiberna…...

土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等

&#x1f50d; 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术&#xff0c;可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势&#xff0c;还能有效评价重大生态工程…...

第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词

Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵&#xff0c;其中每行&#xff0c;每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid&#xff0c;其中有多少个 3 3 的 “幻方” 子矩阵&am…...

企业如何增强终端安全?

在数字化转型加速的今天&#xff0c;企业的业务运行越来越依赖于终端设备。从员工的笔记本电脑、智能手机&#xff0c;到工厂里的物联网设备、智能传感器&#xff0c;这些终端构成了企业与外部世界连接的 “神经末梢”。然而&#xff0c;随着远程办公的常态化和设备接入的爆炸式…...

大语言模型(LLM)中的KV缓存压缩与动态稀疏注意力机制设计

随着大语言模型&#xff08;LLM&#xff09;参数规模的增长&#xff0c;推理阶段的内存占用和计算复杂度成为核心挑战。传统注意力机制的计算复杂度随序列长度呈二次方增长&#xff0c;而KV缓存的内存消耗可能高达数十GB&#xff08;例如Llama2-7B处理100K token时需50GB内存&a…...

人机融合智能 | “人智交互”跨学科新领域

本文系统地提出基于“以人为中心AI(HCAI)”理念的人-人工智能交互(人智交互)这一跨学科新领域及框架,定义人智交互领域的理念、基本理论和关键问题、方法、开发流程和参与团队等,阐述提出人智交互新领域的意义。然后,提出人智交互研究的三种新范式取向以及它们的意义。最后,总结…...