Kafka-副本分配策略
一、上下文
《Kafka-创建topic源码》我们大致分析了topic创建的流程,为了保持它的完整性和清晰度。细节并没有展开分析。下面我们就来分析下副本的分配策略以及副本中的leader角色的确定逻辑。当有了副本分配策略,才会得到分区对应的broker,才可以在topic目录下写入对应的数据。Controller端才可以让这些分区和副本上线去提供服务。副本的leader角色确定后才能使producer生产的数据知道第一个写入的broker节点是哪个?以及follower的同步工作。
二、目的
1、将副本均匀地分布在broker之间
2、对于分配给特定broker的分区,其其他副本分布在其他broker上
3、如果所有broker都有机架信息,请尽可能将每个分区的副本分配给不同的机架
如果不考虑机架的情况下也要实现副本分配的目标,我们的做法是这样的
1、从broker列表中的随机位置开始,通过循环分配每个分区的第一个副本
2、以递增的移位分配每个分区的剩余副本
三、示例
1、场景描述
假如一个topic有6个分区(0, 1, 2, 3, 4, 5)且副本因子为3,对应的集群情况如下图:
每个机架对应的brokerid如下:
机架 | brokerId列表 |
rack1 | 0,5 |
rack2 | 3,4 |
rack3 | 1,2 |
获取机架交错的broker列表:(0, 3, 1, 5, 4, 2)
有了这个列表就课可以以简单的循环方式将副本分配给broker,确保每个broker上的leader和follower数量均匀分布,并将副本分配到所有机架。
2、分配结果
分区 | 副本所在的brokerId列表 |
0 | 0,3,1 |
1 | 3,1,5 |
2 | 1,5,4 |
3 | 5,4,2 |
4 | 4,2,0 |
5 | 2,0,3 |
机架感知分配总是使用机架交替broker列表上的轮询来选择分区的第一个副本。对于其余的副本,它将偏向于机架上没有任何副本分配的broker,直到每个机架都有一个副本。然后,任务将回到broker 列表上的循环。
因此,如果副本的数量 >= 机架的数量,它将确保每个机架至少获得一个副本。否则,每个机架最多只能获得一个副本。在副本数量与机架数量相同并且每个机架具有相同数量的代理的完美情况下,它保证了副本在broker和机架之间的分布是均匀的。
此时如果再增加一个分区(6分区)呢?按照规律分配的副本所在的broker列表应该是0,3,1。但如果这样就违背了目的3。一旦它完成了第一轮循环,如果有更多的分区要分配,算法将开始转移follower 。这是为了确保我们不会总是得到相同的序列集。因此分区6可能分配到的副本broker列表为0,4,2。
四、源码
//fixedStartIndex =-1 , startPartitionId = -1private static Map<Integer, List<Integer>> assignReplicasToBrokersRackAware(int nPartitions,int replicationFactor,Collection<BrokerMetadata> brokerMetadatas,int fixedStartIndex,int startPartitionId) {//存放broker和机架的对应关系Map<Integer, String> brokerRackMap = new HashMap<>();//获取broker和机架的对应关系brokerMetadatas.forEach(m -> brokerRackMap.put(m.id, m.rack.get()));//机架数量int numRacks = new HashSet<>(brokerRackMap.values()).size();//获取交替机架的broker列表List<Integer> arrangedBrokerList = getRackAlternatedBrokerList(brokerRackMap);//broker 数量int numBrokers = arrangedBrokerList.size();Map<Integer, List<Integer>> ret = new HashMap<>();// fixedStartIndex 的初始值是 -1 ,因此 startIndex = 一个 0 至 broker 数量 之间的随机整数int startIndex = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(arrangedBrokerList.size());// startPartitionId 的初始值是 -1 ,因此 currentPartitionId = 0int currentPartitionId = Math.max(0, startPartitionId);//下一个要分配的副本 ,第一次应该是 一个 0 至 broker 数量 之间的随机整数int nextReplicaShift = fixedStartIndex >= 0 ? fixedStartIndex : RAND.nextInt(arrangedBrokerList.size());//循环分区列表,对每个分区进行副本分配for (int i = 0; i < nPartitions; i++) {if (currentPartitionId > 0 && (currentPartitionId % arrangedBrokerList.size() == 0))nextReplicaShift += 1;//第一个副本索引int firstReplicaIndex = (currentPartitionId + startIndex) % arrangedBrokerList.size();//默认将第一个副本 作为leaderint leader = arrangedBrokerList.get(firstReplicaIndex);List<Integer> replicaBuffer = new ArrayList<>();replicaBuffer.add(leader);Set<String> racksWithReplicas = new HashSet<>();racksWithReplicas.add(brokerRackMap.get(leader));Set<Integer> brokersWithReplicas = new HashSet<>();brokersWithReplicas.add(leader);//根据副本因子,进行副本的分配,因为有了leader,因此只用循环处理 replicationFactor - 1 的甚于副本分配int k = 0;for (int j = 0; j < replicationFactor - 1; j++) {boolean done = false;while (!done) {Integer broker = arrangedBrokerList.get(replicaIndex(firstReplicaIndex, nextReplicaShift * numRacks, k, arrangedBrokerList.size()));String rack = brokerRackMap.get(broker);// 跳过这个broker ,如果满足以下2个条件中的1个//1、同一机架中已经有一个broker分配了副本,并且有一个或多个机架没有任何副本,或者//2、broker已经分配了副本,但有一个或多个broker没有分配副本if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size() == numRacks)&& (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size() == numBrokers)) {replicaBuffer.add(broker);racksWithReplicas.add(rack);brokersWithReplicas.add(broker);done = true;}k += 1;}}//返回分区对应的副本的broker列表ret.put(currentPartitionId, replicaBuffer);currentPartitionId += 1;}return ret;}
相关文章:

Kafka-副本分配策略
一、上下文 《Kafka-创建topic源码》我们大致分析了topic创建的流程,为了保持它的完整性和清晰度。细节并没有展开分析。下面我们就来分析下副本的分配策略以及副本中的leader角色的确定逻辑。当有了副本分配策略,才会得到分区对应的broker,…...

市场波动不断,如何自我提高交易心理韧性?
交易市场,一个由无数变量交织而成的复杂领域,常常因各方因素的微妙变化而掀起波澜。在这里,机遇与挑战并存,诱人的利润与潜在的风险如影随形,共同考验着每一位交易员的智慧与心理承受能力。在这样的环境下,…...

加速科技精彩亮相中国国际半导体博览会IC China 2024
11月18日—20日,第二十一届中国国际半导体博览会(IC China 2024)在北京国家会议中心顺利举办,加速科技携重磅产品及全系测试解决方案精彩亮相,加速科技创始人兼董事长邬刚受邀在先进封装创新发展论坛与半导体产业前沿与…...

利用c语言详细介绍下选择排序
选择排序(Selection sort)是一种简单直观的排序算法。它是每次选出最小或者最大的元素放在开头或者结尾位置(采用升序的方式),最终完成列表排序的算法。 一、图文介绍 我们还是使用数组【10,5,3…...
华为流程L1-L6业务流程深度细化到可执行
该文档主要介绍了华为业务流程的深度细化及相关内容,包括流程框架、建模方法、流程模块描述、流程图建模等,旨在帮助企业构建有效的流程体系,实现战略目标。具体内容如下: 华为业务流程的深度细化 流程层级:华为业务流程分为 L1 - L6 六个层级,L1 为流程大类,L2 为流程…...

bridge-multicast-igmpsnooping
# 1.topo # 2.创建命名空间 ip netns add ns0 ip netns add ns1 ip netns add ns2 ip netns add ns3 # 3.创建veth设备 ip link add ns0-veth0 type veth peer name hn0-veth0 ip link add ns1-veth0 type veth peer name hn1-veth0 ip link add ns2-veth0 type veth pe…...

git使用(一)
git使用(一) 为什么学习git?两种版本控制系统在github上创建一个仓库(repository)windows上配置git环境在Linux上配置git环境 为什么学习git? 代码写了好久不小心删了,可以使用git防止,每写一部分代码通…...

Linux环境安装MongoDB
文章目录 1. 查看Linux系统的发行版本2. 下载MongoDB3. 安装MongoDB3.1 新建几个目录,分别用来存储 MongoDB 的数据和日志3.2 新建日志文件3.3 新建配置文件 4. 将MongoDB注册为服务4.1 新建服务文件4.2 编写服务文件 5. MongoDB服务相关操作5.1 启动MongoDB服务5.2…...

Cyberchef使用功能之-多种压缩/解压缩操作对比
cyberchef的compression操作大类中有大量的压缩和解压缩操作,每种操作的功能和区别是什么,本章将进行讲解,作为我的专栏《Cyberchef 从入门到精通教程》中的一篇,详见这里。 关于文件格式和压缩算法的理论部分在之前的文章《压缩…...
TypeScript 装饰器都有那些应用场景?如何更快的上手?
TypeScript 装饰器简介 在 TypeScript 中,装饰器(Decorators)是一种特殊的语法,用于在类、类方法、属性、访问器等上动态地添加行为或修改现有行为。装饰器可以用来增强类的功能、修改方法的行为,或者修改类的元数据等…...
堆优化版本的Prim
prim和dijkstra每轮找最小边的松弛操作其实是同源的,因而受dijkstra堆优化的启发,那么prim也可以采用小根堆进行优化。时间复杂度也由 O ( n 2 ) O(n^2) O(n2)降为 O ( n l o g n ) O(nlogn) O(nlogn)。 测试一下吧:原题链接 #include <i…...

Ubuntu上安装MySQL并且实现远程登录
目录 下载网络工具 查看网络连接 更新系统软件包; 安装mysql数据库 查看mysql数据库状态 以数字ip形式显示mysql的监听状态。(默认监听端口是3306) 查看安装mysql数据库时系统创建的目录信息。 根据查询到的系统用户名以及随机密码&a…...

蓝桥杯每日真题 - 第21天
题目:(空间) 题目描述(12届 C&C B组A题) 解题思路: 转换单位: 内存总大小为 256MB,换算为字节: 25610241024268,435,456字节 计算每个整数占用空间: 每个 32 位整数占用…...

(长期更新)《零基础入门 ArcGIS(ArcMap) 》实验一(下)----空间数据的编辑与处理(超超超详细!!!)
续上篇博客(长期更新)《零基础入门 ArcGIS(ArcMap) 》实验一(上)----空间数据的编辑与处理(超超超详细!!!)-CSDN博客 继续更新 目录 什么是拓扑? 1.3.5道路…...

NLP论文速读(CVPR 2024)|使用DPO进行diffusion模型对齐
论文速读|Diffusion Model Alignment Using Direct Preference Optimization 论文信息: 简介: 本文探讨的背景是大型语言模型(LLMs)通过人类比较数据和从人类反馈中学习(RLHF)的方法进行微调,以…...

操作系统——揭开盖子
计算机执行时——取指执行 es:bx等于从0x9000开始,到0x90200结束...
如何在 React 项目中应用 TypeScript?应该注意那些点?结合实际项目示例及代码进行讲解!
在 React 项目中应用 TypeScript 是提升开发效率、增强代码可维护性和可读性的好方法。TypeScript 提供了静态类型检查、自动补全和代码提示等功能,这对于 React 开发者来说,能够帮助早期发现潜在的 bug,提高开发体验。 1. 项目初始化 在现…...

C++学习第四天
创作过程中难免有不足,若您发现本文内容有误,恳请不吝赐教。 提示:以下是本篇文章正文内容,下面案例可供参考 一、计算类对象的大小 #include<iostream> using namespace std;class Date { public:void Init(int year, in…...

【从零开始的LeetCode-算法】3232. 判断是否可以赢得数字游戏
给你一个 正整数 数组 nums。 Alice 和 Bob 正在玩游戏。在游戏中,Alice 可以从 nums 中选择所有个位数 或 所有两位数,剩余的数字归 Bob 所有。如果 Alice 所选数字之和 严格大于 Bob 的数字之和,则 Alice 获胜。 如果 Alice 能赢得这场游…...
一种简单高效的RTSP流在线检测方法,不需要再过渡拉流就可以获取设备状态以及对应音视频通道与编码格式
平台如何检测一路RTSP流是否在线? 在之前的流媒体平台方案中,我们都是通过定时RTSP拉流的方式,走一个完整的RTSP流程:包括OPTIONS、DESCRIBE、SETUP、PLAY、RTP收流,这种方式去取流,然后取到流之后进行流解…...

基于FPGA的PID算法学习———实现PID比例控制算法
基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容:参考网站: PID算法控制 PID即:Proportional(比例)、Integral(积分&…...
脑机新手指南(八):OpenBCI_GUI:从环境搭建到数据可视化(下)
一、数据处理与分析实战 (一)实时滤波与参数调整 基础滤波操作 60Hz 工频滤波:勾选界面右侧 “60Hz” 复选框,可有效抑制电网干扰(适用于北美地区,欧洲用户可调整为 50Hz)。 平滑处理&…...

智慧工地云平台源码,基于微服务架构+Java+Spring Cloud +UniApp +MySql
智慧工地管理云平台系统,智慧工地全套源码,java版智慧工地源码,支持PC端、大屏端、移动端。 智慧工地聚焦建筑行业的市场需求,提供“平台网络终端”的整体解决方案,提供劳务管理、视频管理、智能监测、绿色施工、安全管…...
反射获取方法和属性
Java反射获取方法 在Java中,反射(Reflection)是一种强大的机制,允许程序在运行时访问和操作类的内部属性和方法。通过反射,可以动态地创建对象、调用方法、改变属性值,这在很多Java框架中如Spring和Hiberna…...

多种风格导航菜单 HTML 实现(附源码)
下面我将为您展示 6 种不同风格的导航菜单实现,每种都包含完整 HTML、CSS 和 JavaScript 代码。 1. 简约水平导航栏 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport&qu…...

全志A40i android7.1 调试信息打印串口由uart0改为uart3
一,概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本:2014.07; Kernel版本:Linux-3.10; 二,Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01),并让boo…...
AspectJ 在 Android 中的完整使用指南
一、环境配置(Gradle 7.0 适配) 1. 项目级 build.gradle // 注意:沪江插件已停更,推荐官方兼容方案 buildscript {dependencies {classpath org.aspectj:aspectjtools:1.9.9.1 // AspectJ 工具} } 2. 模块级 build.gradle plu…...
鸿蒙DevEco Studio HarmonyOS 5跑酷小游戏实现指南
1. 项目概述 本跑酷小游戏基于鸿蒙HarmonyOS 5开发,使用DevEco Studio作为开发工具,采用Java语言实现,包含角色控制、障碍物生成和分数计算系统。 2. 项目结构 /src/main/java/com/example/runner/├── MainAbilitySlice.java // 主界…...
Java 二维码
Java 二维码 **技术:**谷歌 ZXing 实现 首先添加依赖 <!-- 二维码依赖 --><dependency><groupId>com.google.zxing</groupId><artifactId>core</artifactId><version>3.5.1</version></dependency><de…...

GruntJS-前端自动化任务运行器从入门到实战
Grunt 完全指南:从入门到实战 一、Grunt 是什么? Grunt是一个基于 Node.js 的前端自动化任务运行器,主要用于自动化执行项目开发中重复性高的任务,例如文件压缩、代码编译、语法检查、单元测试、文件合并等。通过配置简洁的任务…...