当前位置: 首页 > news >正文

Kafka-副本分配策略

一、上下文

《Kafka-创建topic源码》我们大致分析了topic创建的流程,为了保持它的完整性和清晰度。细节并没有展开分析。下面我们就来分析下副本的分配策略以及副本中的leader角色的确定逻辑。当有了副本分配策略,才会得到分区对应的broker,才可以在topic目录下写入对应的数据。Controller端才可以让这些分区和副本上线去提供服务。副本的leader角色确定后才能使producer生产的数据知道第一个写入的broker节点是哪个?以及follower的同步工作。

二、目的

1、将副本均匀地分布在broker之间

2、对于分配给特定broker的分区,其其他副本分布在其他broker上

3、如果所有broker都有机架信息,请尽可能将每个分区的副本分配给不同的机架

如果不考虑机架的情况下也要实现副本分配的目标,我们的做法是这样的

1、从broker列表中的随机位置开始,通过循环分配每个分区的第一个副本

2、以递增的移位分配每个分区的剩余副本

三、示例

1、场景描述

假如一个topic有6个分区(0, 1, 2, 3, 4, 5)且副本因子为3,对应的集群情况如下图:

每个机架对应的brokerid如下:

机架brokerId列表
rack10,5
rack23,4
rack31,2

获取机架交错的broker列表:(0, 3, 1, 5, 4, 2)

有了这个列表就课可以以简单的循环方式将副本分配给broker,确保每个broker上的leader和follower数量均匀分布,并将副本分配到所有机架。

2、分配结果

分区副本所在的brokerId列表
00,3,1
13,1,5
21,5,4
35,4,2
44,2,0
52,0,3

机架感知分配总是使用机架交替broker列表上的轮询来选择分区的第一个副本。对于其余的副本,它将偏向于机架上没有任何副本分配的broker,直到每个机架都有一个副本。然后,任务将回到broker 列表上的循环。

因此,如果副本的数量 >= 机架的数量,它将确保每个机架至少获得一个副本。否则,每个机架最多只能获得一个副本。在副本数量与机架数量相同并且每个机架具有相同数量的代理的完美情况下,它保证了副本在broker和机架之间的分布是均匀的。

此时如果再增加一个分区(6分区)呢?按照规律分配的副本所在的broker列表应该是0,3,1。但如果这样就违背了目的3。一旦它完成了第一轮循环,如果有更多的分区要分配,算法将开始转移follower 。这是为了确保我们不会总是得到相同的序列集。因此分区6可能分配到的副本broker列表为0,4,2。

四、源码

    //fixedStartIndex =-1 , startPartitionId = -1private static Map<Integer, List<Integer>> assignReplicasToBrokersRackAware(int nPartitions,int replicationFactor,Collection<BrokerMetadata> brokerMetadatas,int fixedStartIndex,int startPartitionId) {//存放broker和机架的对应关系Map<Integer, String> brokerRackMap = new HashMap<>();//获取broker和机架的对应关系brokerMetadatas.forEach(m -> brokerRackMap.put(m.id, m.rack.get()));//机架数量int numRacks = new HashSet<>(brokerRackMap.values()).size();//获取交替机架的broker列表List<Integer> arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap);//broker 数量int numBrokers = arrangedBrokerList.size();Map<Integer, List<Integer>> ret = new HashMap<>();// fixedStartIndex 的初始值是 -1 ,因此 startIndex = 一个 0 至 broker 数量 之间的随机整数int startIndex = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(arrangedBrokerList.size());// startPartitionId 的初始值是 -1 ,因此 currentPartitionId = 0int currentPartitionId = Math.max(0, startPartitionId);//下一个要分配的副本 ,第一次应该是 一个 0 至 broker 数量 之间的随机整数int nextReplicaShift = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(arrangedBrokerList.size());//循环分区列表,对每个分区进行副本分配for (int i = 0; i < nPartitions; i++) {if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size() == 0))nextReplicaShift += 1;//第一个副本索引int firstReplicaIndex = (currentPartitionId + startIndex) % arrangedBrokerList.size();//默认将第一个副本 作为leaderint leader = arrangedBrokerList.get(firstReplicaIndex);List<Integer> replicaBuffer = new ArrayList<>();replicaBuffer.add(leader);Set<String> racksWithReplicas = new HashSet<>();racksWithReplicas.add(brokerRackMap.get(leader));Set<Integer> brokersWithReplicas = new HashSet<>();brokersWithReplicas.add(leader);//根据副本因子,进行副本的分配,因为有了leader,因此只用循环处理 replicationFactor - 1 的甚于副本分配int k = 0;for (int j = 0; j < replicationFactor - 1; j++) {boolean done = false;while (!done) {Integer broker = arrangedBrokerList.get(replicaIndex(firstReplicaIndex, nextReplicaShift * numRacks, k, arrangedBrokerList.size()));String rack = brokerRackMap.get(broker);// 跳过这个broker ,如果满足以下2个条件中的1个//1、同一机架中已经有一个broker分配了副本,并且有一个或多个机架没有任何副本,或者//2、broker已经分配了副本,但有一个或多个broker没有分配副本if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size() == numRacks)&& (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size() == numBrokers)) {replicaBuffer.add(broker);racksWithReplicas.add(rack);brokersWithReplicas.add(broker);done = true;}k += 1;}}//返回分区对应的副本的broker列表ret.put(currentPartitionId, replicaBuffer);currentPartitionId += 1;}return ret;}

相关文章:

Kafka-副本分配策略

一、上下文 《Kafka-创建topic源码》我们大致分析了topic创建的流程&#xff0c;为了保持它的完整性和清晰度。细节并没有展开分析。下面我们就来分析下副本的分配策略以及副本中的leader角色的确定逻辑。当有了副本分配策略&#xff0c;才会得到分区对应的broker&#xff0c;…...

市场波动不断,如何自我提高交易心理韧性?

交易市场&#xff0c;一个由无数变量交织而成的复杂领域&#xff0c;常常因各方因素的微妙变化而掀起波澜。在这里&#xff0c;机遇与挑战并存&#xff0c;诱人的利润与潜在的风险如影随形&#xff0c;共同考验着每一位交易员的智慧与心理承受能力。在这样的环境下&#xff0c;…...

加速科技精彩亮相中国国际半导体博览会IC China 2024

11月18日—20日&#xff0c;第二十一届中国国际半导体博览会&#xff08;IC China 2024&#xff09;在北京国家会议中心顺利举办&#xff0c;加速科技携重磅产品及全系测试解决方案精彩亮相&#xff0c;加速科技创始人兼董事长邬刚受邀在先进封装创新发展论坛与半导体产业前沿与…...

利用c语言详细介绍下选择排序

选择排序&#xff08;Selection sort&#xff09;是一种简单直观的排序算法。它是每次选出最小或者最大的元素放在开头或者结尾位置&#xff08;采用升序的方式&#xff09;&#xff0c;最终完成列表排序的算法。 一、图文介绍 我们还是使用数组【10&#xff0c;5&#xff0c;3…...

华为流程L1-L6业务流程深度细化到可执行

该文档主要介绍了华为业务流程的深度细化及相关内容,包括流程框架、建模方法、流程模块描述、流程图建模等,旨在帮助企业构建有效的流程体系,实现战略目标。具体内容如下: 华为业务流程的深度细化 流程层级:华为业务流程分为 L1 - L6 六个层级,L1 为流程大类,L2 为流程…...

bridge-multicast-igmpsnooping

# 1.topo # 2.创建命名空间 ip netns add ns0 ip netns add ns1 ip netns add ns2 ip netns add ns3 # 3.创建veth设备 ip link add ns0-veth0 type veth peer name hn0-veth0 ip link add ns1-veth0 type veth peer name hn1-veth0 ip link add ns2-veth0 type veth pe…...

git使用(一)

git使用&#xff08;一&#xff09; 为什么学习git?两种版本控制系统在github上创建一个仓库&#xff08;repository&#xff09;windows上配置git环境在Linux上配置git环境 为什么学习git? 代码写了好久不小心删了&#xff0c;可以使用git防止&#xff0c;每写一部分代码通…...

Linux环境安装MongoDB

文章目录 1. 查看Linux系统的发行版本2. 下载MongoDB3. 安装MongoDB3.1 新建几个目录&#xff0c;分别用来存储 MongoDB 的数据和日志3.2 新建日志文件3.3 新建配置文件 4. 将MongoDB注册为服务4.1 新建服务文件4.2 编写服务文件 5. MongoDB服务相关操作5.1 启动MongoDB服务5.2…...

Cyberchef使用功能之-多种压缩/解压缩操作对比

cyberchef的compression操作大类中有大量的压缩和解压缩操作&#xff0c;每种操作的功能和区别是什么&#xff0c;本章将进行讲解&#xff0c;作为我的专栏《Cyberchef 从入门到精通教程》中的一篇&#xff0c;详见这里。 关于文件格式和压缩算法的理论部分在之前的文章《压缩…...

TypeScript 装饰器都有那些应用场景?如何更快的上手?

TypeScript 装饰器简介 在 TypeScript 中&#xff0c;装饰器&#xff08;Decorators&#xff09;是一种特殊的语法&#xff0c;用于在类、类方法、属性、访问器等上动态地添加行为或修改现有行为。装饰器可以用来增强类的功能、修改方法的行为&#xff0c;或者修改类的元数据等…...

堆优化版本的Prim

prim和dijkstra每轮找最小边的松弛操作其实是同源的&#xff0c;因而受dijkstra堆优化的启发&#xff0c;那么prim也可以采用小根堆进行优化。时间复杂度也由 O ( n 2 ) O(n^2) O(n2)降为 O ( n l o g n ) O(nlogn) O(nlogn)。 测试一下吧&#xff1a;原题链接 #include <i…...

Ubuntu上安装MySQL并且实现远程登录

目录 下载网络工具 查看网络连接 更新系统软件包&#xff1b; 安装mysql数据库 查看mysql数据库状态 以数字ip形式显示mysql的监听状态。&#xff08;默认监听端口是3306&#xff09; 查看安装mysql数据库时系统创建的目录信息。 根据查询到的系统用户名以及随机密码&a…...

蓝桥杯每日真题 - 第21天

题目&#xff1a;(空间) 题目描述&#xff08;12届 C&C B组A题&#xff09; 解题思路&#xff1a; 转换单位&#xff1a; 内存总大小为 256MB&#xff0c;换算为字节&#xff1a; 25610241024268,435,456字节 计算每个整数占用空间&#xff1a; 每个 32 位整数占用…...

(长期更新)《零基础入门 ArcGIS(ArcMap) 》实验一(下)----空间数据的编辑与处理(超超超详细!!!)

续上篇博客&#xff08;长期更新&#xff09;《零基础入门 ArcGIS(ArcMap) 》实验一&#xff08;上&#xff09;----空间数据的编辑与处理&#xff08;超超超详细&#xff01;&#xff01;&#xff01;&#xff09;-CSDN博客 继续更新 目录 什么是拓扑&#xff1f; 1.3.5道路…...

NLP论文速读(CVPR 2024)|使用DPO进行diffusion模型对齐

论文速读|Diffusion Model Alignment Using Direct Preference Optimization 论文信息&#xff1a; 简介&#xff1a; 本文探讨的背景是大型语言模型&#xff08;LLMs&#xff09;通过人类比较数据和从人类反馈中学习&#xff08;RLHF&#xff09;的方法进行微调&#xff0c;以…...

操作系统——揭开盖子

计算机执行时——取指执行 es:bx等于从0x9000开始&#xff0c;到0x90200结束...

如何在 React 项目中应用 TypeScript?应该注意那些点?结合实际项目示例及代码进行讲解!

在 React 项目中应用 TypeScript 是提升开发效率、增强代码可维护性和可读性的好方法。TypeScript 提供了静态类型检查、自动补全和代码提示等功能&#xff0c;这对于 React 开发者来说&#xff0c;能够帮助早期发现潜在的 bug&#xff0c;提高开发体验。 1. 项目初始化 在现…...

C++学习第四天

创作过程中难免有不足&#xff0c;若您发现本文内容有误&#xff0c;恳请不吝赐教。 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 一、计算类对象的大小 #include<iostream> using namespace std;class Date { public:void Init(int year, in…...

【从零开始的LeetCode-算法】3232. 判断是否可以赢得数字游戏

给你一个 正整数 数组 nums。 Alice 和 Bob 正在玩游戏。在游戏中&#xff0c;Alice 可以从 nums 中选择所有个位数 或 所有两位数&#xff0c;剩余的数字归 Bob 所有。如果 Alice 所选数字之和 严格大于 Bob 的数字之和&#xff0c;则 Alice 获胜。 如果 Alice 能赢得这场游…...

一种简单高效的RTSP流在线检测方法,不需要再过渡拉流就可以获取设备状态以及对应音视频通道与编码格式

平台如何检测一路RTSP流是否在线&#xff1f; 在之前的流媒体平台方案中&#xff0c;我们都是通过定时RTSP拉流的方式&#xff0c;走一个完整的RTSP流程&#xff1a;包括OPTIONS、DESCRIBE、SETUP、PLAY、RTP收流&#xff0c;这种方式去取流&#xff0c;然后取到流之后进行流解…...

STM32+rt-thread判断是否联网

一、根据NETDEV_FLAG_INTERNET_UP位判断 static bool is_conncected(void) {struct netdev *dev RT_NULL;dev netdev_get_first_by_flags(NETDEV_FLAG_INTERNET_UP);if (dev RT_NULL){printf("wait netdev internet up...");return false;}else{printf("loc…...

理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端

&#x1f31f; 什么是 MCP&#xff1f; 模型控制协议 (MCP) 是一种创新的协议&#xff0c;旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议&#xff0c;它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...

CentOS下的分布式内存计算Spark环境部署

一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架&#xff0c;相比 MapReduce 具有以下核心优势&#xff1a; 内存计算&#xff1a;数据可常驻内存&#xff0c;迭代计算性能提升 10-100 倍&#xff08;文档段落&#xff1a;3-79…...

稳定币的深度剖析与展望

一、引言 在当今数字化浪潮席卷全球的时代&#xff0c;加密货币作为一种新兴的金融现象&#xff0c;正以前所未有的速度改变着我们对传统货币和金融体系的认知。然而&#xff0c;加密货币市场的高度波动性却成为了其广泛应用和普及的一大障碍。在这样的背景下&#xff0c;稳定…...

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据 Power Query 具有大量专门帮助您清理和准备数据以供分析的功能。 您将了解如何简化复杂模型、更改数据类型、重命名对象和透视数据。 您还将了解如何分析列&#xff0c;以便知晓哪些列包含有价值的数据&#xff0c;…...

C#中的CLR属性、依赖属性与附加属性

CLR属性的主要特征 封装性&#xff1a; 隐藏字段的实现细节 提供对字段的受控访问 访问控制&#xff1a; 可单独设置get/set访问器的可见性 可创建只读或只写属性 计算属性&#xff1a; 可以在getter中执行计算逻辑 不需要直接对应一个字段 验证逻辑&#xff1a; 可以…...

tomcat指定使用的jdk版本

说明 有时候需要对tomcat配置指定的jdk版本号&#xff0c;此时&#xff0c;我们可以通过以下方式进行配置 设置方式 找到tomcat的bin目录中的setclasspath.bat。如果是linux系统则是setclasspath.sh set JAVA_HOMEC:\Program Files\Java\jdk8 set JRE_HOMEC:\Program Files…...

Vue3 PC端 UI组件库我更推荐Naive UI

一、Vue3生态现状与UI库选择的重要性 随着Vue3的稳定发布和Composition API的广泛采用&#xff0c;前端开发者面临着UI组件库的重新选择。一个好的UI库不仅能提升开发效率&#xff0c;还能确保项目的长期可维护性。本文将对比三大主流Vue3 UI库&#xff08;Naive UI、Element …...

用神经网络读懂你的“心情”:揭秘情绪识别系统背后的AI魔法

用神经网络读懂你的“心情”:揭秘情绪识别系统背后的AI魔法 大家好,我是Echo_Wish。最近刷短视频、看直播,有没有发现,越来越多的应用都开始“懂你”了——它们能感知你的情绪,推荐更合适的内容,甚至帮客服识别用户情绪,提升服务体验。这背后,神经网络在悄悄发力,撑起…...

Java求职者面试指南:Spring、Spring Boot、Spring MVC与MyBatis技术点解析

Java求职者面试指南&#xff1a;Spring、Spring Boot、Spring MVC与MyBatis技术点解析 第一轮&#xff1a;基础概念问题 请解释Spring框架的核心容器是什么&#xff1f;它的作用是什么&#xff1f; 程序员JY回答&#xff1a;Spring框架的核心容器是IoC容器&#xff08;控制反转…...