Kafka分区管理大师指南:扩容、均衡、迁移与限流全解析
#作者:孙德新
文章目录
- 分区分配操作(kafka-reassign-partitions.sh)
- 1.1 分区扩容、数据均衡、迁移(kafka-reassign-partitions.sh)
- 1.2、修改topic分区partition的副本数(扩缩容副本)
- 1.3、Partition Reassign场景限流
- 1.4、节点内副本移动到不同目录的场景限流
- 1.5、集群Broker恢复时,副本数据同步场景限流
分区分配操作(kafka-reassign-partitions.sh)
1.1 分区扩容、数据均衡、迁移(kafka-reassign-partitions.sh)
Kafka系统提供了一个分区重新分配工具(kafka-reassign-partitions.sh),该工具可用于在Broker之间迁移分区。理想情况下,将确保所有Broker的数据和分区均匀分配。分区重新分配工具无法自动分析Kafka群集中的数据分布并迁移分区以实现均匀的负载均衡。因此,管理员在操作的时候,必须弄清楚应该迁移哪些Topic或分区。
Kafka物理节点扩容方法:只要每个kafka配置文件的brokerid 需要全局唯一,不冲突,启动新kafka节点,即可自动加入原有kafka集群。但是原集群topic数据不会自动移动到新kafka节点上,需要手动迁移。新topic则可以自动分配到新节点上。
–topics-to-move-json-file 指定json文件,文件内容为topic配置 Json文件格式如下:
{"topics": [{"topic": "test_create_topic1"}],"version": 1
}
–generate 尝试给出副本重分配的策略,该命令并不实际执行
–broker-list 指定想要分配的Broker节点,用于尝试给出分配策略,与–generate搭配使用 。–broker-list “0,1,2,3”
–broker-list举例:
./kafka-reassign-partitions.sh --zookeeper xxx:2181 --topics-to-move-json-file config/move-json-file.json --broker-list “0,1,2,3” --generate
执行完毕之后会打印如下,需求注意的是,此时分区移动尚未开始,它只是告诉你当前的分配和建议。保存当前分配,以防你想要回滚它
Current partition replica assignment//当前副本分配方式
Current partition replica assignment//当前副本分配方式
{"version":1,"partitions":[{"topic":"test_create_topic1","partition":2,"replicas":[1],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":1,"replicas":[3],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":0,"replicas":[2],"log_dirs":["any"]}]}Proposed partition reassignment configuration//期望的重新分配方式
{"version":1,"partitions":[{"topic":"test_create_topic1","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":0,"replicas":[0],"log_dirs":["any"]}]}
–reassignment-json-file 指定要重分配的json文件,与–execute搭配使用
–execute 执行。开始执行重分配任务,与–reassignment-json-file搭配使用
–verify 验证任务是否执行成功,检查分区重新分配的状态。当有使用–throttle限流的话,该命令还会移除限流;该命令很重要,不移除限流对正常的副本之间同步会有影响。 该选项用于检查分区重新分配的状态,同时–throttle流量限制也会被移除掉; 否则可能会导致定期复制操作的流量也受到限制。
–throttle 迁移过程Broker之间现在流程传输的速率,单位 bytes/sec – throttle 500000 。迁移过程注意流量陡增对集群的影响。Kafka提供一个broker之间复制传输的流量限制,限制了副本从机器到另一台机器的带宽上限,当重新平衡集群,引导新broker,添加或移除broker时候,这是很有用的。因为它限制了这些密集型的数据操作从而保障了对用户的影响
> sh bin/kafka-reassign-partitions.sh --zookeeper xxxxx:2181 --reassignment-json-file config/reassignment-json-file.json --execute -- throttle 50000000
加上一个–throttle 50000000 参数, 那么执行移动分区的时候,会被限制流量在50000000 B/s,加上参数后可以看到如下
The throttle limit was set to 50000000 B/s
Successfully started reassignment of partitions.
需要注意的是,如果你迁移的时候包含 副本跨路径迁移(同一个Broker多个路径)那么这个限流措施不会生效,你需要再加上–replica-alter-log-dirs-throttle 这个限流参数,它限制的是同一个Broker不同路径直接迁移的限流;
如果你想在重新平衡期间修改限制,增加吞吐量,以便完成的更快。你可以重新运行execute命令,用相同的reassignment-json-file
–replica-alter-log-dirs-throttle broker内部副本跨路径迁移数据流量限制功能,限制数据拷贝从一个目录到另外一个目录带宽上。单位bytes/sec --replica-alter-log-dirs-throttle 100000
–disable-rack-aware 关闭机架感知能力,在分配的时候就不参考机架的信息
–bootstrap-server 如果是副本跨路径迁移必须有此参数
下面是topic数据均衡操作(迁移操作也是这个思路,只是最终kafka节点目标少):
背景:原有3台kafka集群:192.168.40.11–13,扩容一台kafka 192.168.40.14,把原有3台数据均衡到4台kafka上
首先在现有集群的基础上再添加⼀个Kafka节点,进行物理组件节点扩容,然后使⽤Kafka⾃带的kafka-reassign-partitions.sh ⼯具来重新分布分区,进行数据均衡。该⼯具有三种使⽤模式:
generate模式,给定需要重新分配的Topic,⾃动⽣成reassign plan(并不执⾏)。在此模式下,给定Topic列表和Broker列表,该工具会生成候选重新分配,以将指定Topic的所有分区迁移到新Broker中。此选项仅提供了一种方便的方法,可在给定Topic和目标Broker列表的情况下生成分区重新分配计划。
execute模式,根据指定的reassign plan重新分配Partition。在此模式下,该工具将根据用户提供的重新分配计划启动分区的重新分配。 (使用–reassignment-json-file选项)。由管理员手动制定自定义重新分配计划,也可以使用–generate选项提供。
verify模式,验证重新分配Partition是否成功。在此模式下,该工具将验证最后一次–execute期间列出的所有分区的重新分配状态。状态可以有成功、失败或正在进行等状态。
基本步骤如下:
- 查看集群中当前所有可用的topic,目标是名为three的topic进行数据均衡到1234,4个kafka节点上
[root@localhost bin]# ./kafka-topics.sh --list --bootstrap-server 192.168.40.11:9092
__consumer_offsets
first
second
three
topic_first
- 查看计划数据均衡的topic的详细信息
列出 对应的topic名称的详细信息,包括topic的分区数量,副本及leader等。
./kafka-topics.sh --describe --zookeeper zk-ip:port --topic topic名称 (命令模板)
[root@localhost bin]# ./kafka-topics.sh --describe --bootstrap-server 192.168.40.11:9092 --topic three
[2024-03-06 01:25:53,446] WARN [AdminClient clientId=adminclient-1] Connection to node 4 (/192.168.40.14:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Topic: three PartitionCount: 5 ReplicationFactor: 3 Configs: segment.bytes=1073741824Topic: three Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 2,3,1Topic: three Partition: 1 Leader: 2 Replicas: 2,1,3 Isr: 2,3,1Topic: three Partition: 2 Leader: 3 Replicas: 3,2,1 Isr: 2,3,1Topic: three Partition: 3 Leader: 1 Replicas: 1,2,3 Isr: 2,3,1Topic: three Partition: 4 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
- 编制负载均衡topic主题清单的json文件:
借助kafka-reassign-partitions.sh⼯具⽣成reassign 计划,不过我们先得按照要求定义⼀个json⽂件,文件名自定义。⾥⾯说明哪些topic需要重新分区,版本号固定为1,⽂件内容如下:
[root@node1 ~]# cat topics-to-move.json
{ "topics": [ { "topic":"three" } ], "version":1}
或如下
[root@localhost ~]# cat topic-to-move.json
{"topics": [{"topic": "three"}],
"version":1
}
- 生成副本均衡迁移计划:
然后使⽤ kafka-reassign-partitions.sh⼯具利用–generate⽣成reassign计划
./kafka-reassign-partitions.sh --zookeeper 192.168.12.100:2181 --topics-to-move-json-file ./plans/topic-to-move.json --broker-list "1,2,3,4,5,6" --generate (命令模板)
参数–broker-list “1,2,3,4” 表示期望均衡、迁移的本kafka集群全部节点(也可是部分节点,但节点个数不能小于副本数量)。执行后输出两段内容,其中黄色标识部分就是副本存储计划文件内容,⽣成的就是将分区重新分布到broker 上的结果
[root@localhost bin]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --topics-to-move-json-file /root/topic-to-move.json --broker-list "1,2,3,4" --generate
Current partition replica assignment //目前的副本分配状态,这个需要保留,可以用它回滚!
{"version":1,"partitions":[{"topic":"three","partition":0,"replicas":[1,3,2],"log_dirs":["any","any","any"]},{"topic":"three","partition":1,"replicas":[2,1,3],"log_dirs":["any","any","any"]},{"topic":"three","partition":2,"replicas":[3,2,1],"log_dirs":["any","any","any"]},{"topic":"three","partition":3,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"three","partition":4,"replicas":[2,3,1],"log_dirs":["any","any","any"]}]}Proposed partition reassignment configuration //期望计划的副本分配状态
{"version":1,"partitions":[{"topic":"three","partition":0,"replicas":[2,4,1],"log_dirs":["any","any","any"]},{"topic":"three","partition":1,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"three","partition":2,"replicas":[4,2,3],"log_dirs":["any","any","any"]},{"topic":"three","partition":3,"replicas":[1,3,4],"log_dirs":["any","any","any"]},{"topic":"three","partition":4,"replicas":[2,1,3],"log_dirs":["any","any","any"]}]}注:
上述黄色输出内容说明解释,"partitions":[{"topic":"three","partition":0,"replicas":[2,4,1],是计划将"topic":"three","partition":0分区分配到broker2、4、1上,后面内容也同样
- 编制存储计划json文件:
上面最后一段(黄色标识)就是副本存储计划文件内容,⽣成的就是将分区重新分布到broker 上的结果,再创建一个存储计划json文件,文件名自定义
[root@localhost bin]# vim increase-replication-factor.json
{"version":1,"partitions":[{"topic":"three","partition":0,"replicas":[2,4,1],"log_dirs":["any","any","any"]},{"topic":"three","partition":1,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"three","partition":2,"replicas":[4,2,3],"log_dirs":["any","any","any"]},{"topic":"three","partition":3,"replicas":[1,3,4],"log_dirs":["any","any","any"]},{"topic":"three","partition":4,"replicas":[2,1,3],"log_dirs":["any","any","any"]}]}
- 执行存储计划:
该命令会将指定分区的副本重新分配到新的 broker上。集群控制器通过为每个分区添加新副本实现重新分配(增加复制系数)。新的副本将从分区的首领那里复制所有数据。根据分区大小的不同,复制过程可能需要花一些时间,因为数据是通过网络复制到新副本上的。在复制完成之后,控制器将旧副本从副本清单里移除(恢复到原先的复制系数)。
[root@localhost bin]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file /root/increase-replication-factor.json --execute
Current partition replica assignment{"version":1,"partitions":[{"topic":"three","partition":0,"replicas":[1,3,2],"log_dirs":["any","any","any"]},{"topic":"three","partition":1,"replicas":[2,1,3],"log_dirs":["any","any","any"]},{"topic":"three","partition":2,"replicas":[3,2,1],"log_dirs":["any","any","any"]},{"topic":"three","partition":3,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"three","partition":4,"replicas":[2,3,1],"log_dirs":["any","any","any"]}]}Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for three-0,three-1,three-2,three-3,three-4
- 验证副本存储计划:
在重分配进行过程中或者完成之后,可以使用 kafka-reassign-partitions.sh 工具验证重分配 的状态。它可以显示重分配的进度、已经完成重分配的分区以及错误信息。
[root@localhost bin]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file /root/increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition three-0 is complete.
Reassignment of partition three-1 is complete.
Reassignment of partition three-2 is complete.
Reassignment of partition three-3 is complete.
Reassignment of partition three-4 is complete.Clearing broker-level throttles on brokers 1,2,3,4 //说明已经分配在kafka集群的全部4个节点上了
Clearing topic-level throttles on topic three
- 再次验证副本存储计划:
说明没问题,成功。14主机上分配了分区0、2、3,共3个分区
[root@localhost bin]# ./kafka-topics.sh --describe --bootstrap-server 192.168.40.11:9092 --topic three
Topic: three PartitionCount: 5 ReplicationFactor: 3 Configs: segment.bytes=1073741824Topic: three Partition: 0 Leader: 1 Replicas: 2,4,1 Isr: 2,1,4Topic: three Partition: 1 Leader: 2 Replicas: 3,1,2 Isr: 2,3,1Topic: three Partition: 2 Leader: 4 Replicas: 4,2,3 Isr: 2,3,4Topic: three Partition: 3 Leader: 1 Replicas: 1,3,4 Isr: 3,1,4Topic: three Partition: 4 Leader: 2 Replicas: 2,1,3 Isr: 2,3,1
到kafka-14主机查看数据,已经有数据了,分区0、2、3在14主机上
[root@localhost log]# ll
total 16
-rw-r--r--. 1 root root 0 Mar 4 21:48 cleaner-offset-checkpoint
-rw-r--r-- 1 root root 4 Mar 6 06:55 log-start-offset-checkpoint
-rw-r--r-- 1 root root 88 Mar 6 01:47 meta.properties
-rw-r--r-- 1 root root 34 Mar 6 06:55 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root 34 Mar 6 06:55 replication-offset-checkpoint
drwxr-xr-x 2 root root 141 Mar 6 03:01 three-0
drwxr-xr-x 2 root root 141 Mar 6 03:06 three-2
drwxr-xr-x 2 root root 141 Mar 6 03:01 three-3
[root@localhost log]# cd three-0
[root@localhost three-0]# ll
total 0
-rw-r--r-- 1 root root 10485760 Mar 6 03:01 00000000000000000000.index
-rw-r--r-- 1 root root 0 Mar 6 03:01 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 Mar 6 03:01 00000000000000000000.timeindex
-rw-r--r-- 1 root root 0 Mar 6 03:01 leader-epoch-checkpoint
1.2、修改topic分区partition的副本数(扩缩容副本)
分区重分配工具提供了一些特性,用于改变分区的复制系数(副本数)。如果在创建分区时指定了错误的复制系数、副本数(比如在创建主题时没有足够多可用的 broker),那么就有必要修改它们。这可以通过创建一个 JSON 对象来完成,该对象使用分区重新分配的执行步骤中使用的格式,显式指定分区所需的副本数量。集群将完成重分配过程,并使用新的复制系数、副本数。
Kafka不能通过命令行方式修改副本,需要通过json等其他方式修改。无论是增加或减少副本,都可以针对每个分区进行个性增加或减少,不是必须所有分区都增加或同时减少。
增加副本操作:
创建json文件:
1)方法一:使用命令自动创建(有待进一步验证)
指定要修改副本数的Topic和分区,以及新的副本分配。这个JSON文件可以通过kafka-reassign-partitions.sh脚本的–generate选项自动生成。命令模板如下,11 14 18为kafka的broker id。
首先创建一个文件addReplicas.json,现有业务topic名称为aaa
[root@kafka18 ~]# cat addReplicas.json
{"topics": [{"topic": "aaa"}],"version": 1
}[root@kafka18 ~]# /usr/local/kafka_2.13-2.7.1/bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.40.18:9092 --topics-to-move-json-file addReplicas.json --broker-list "11,14,18" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"aaa","partition":0,"replicas":[14,18,11],"log_dirs":["any","any","any"]},{"topic":"aaa","partition":1,"replicas":[11,14,18],"log_dirs":["any","any","any"]},{"topic":"aaa","partition":2,"replicas":[18,11,14],"log_dirs":["any","any","any"]}]}Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"aaa","partition":0,"replicas":[11,14,18],"log_dirs":["any","any","any"]},{"topic":"aaa","partition":1,"replicas":[14,18,11],"log_dirs":["any","any","any"]},{"topic":"aaa","partition":2,"replicas":[18,11,14],"log_dirs":["any","any","any"]}]}
2)方法二:手动编辑创建
假设topic已有3个分区2个副本,需要增加到3个副本。创建一个预定义的topic分区副本json文件,即副本存储计划:
]# cat addReplicas.json //该名称自定义,该文件与下面reassign.json文件内容一致,仅格式不同,使用reassign.json文件
{"version": 1, //固定写法"partitions": [{"topic": "test", //业务的topic名称"partition": 0, //partition的序号,必须从0开始,是第一个分区"replicas": [ //期望增加到的副本数,数字为broker的id,说明期望增加到012,3个broker上,为3个副本0,1,2]},{"topic": "test","partition": 1,"replicas": [1,0,2]},{"topic": "test","partition": 2,"replicas": [2,0,1]}]
}
或如下格式编写

整体操作步骤如下:
首先查看topic的副本情况,发现有5个分区,每个分区有2个副本
bin]# ./kafka-topics.sh --bootstrap-server 192.168.40.11:9092 --describe
Topic: three PartitionCount: 5 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: three Partition: 0 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: three Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: three Partition: 2 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: three Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: three Partition: 4 Leader: 2 Replicas: 2,3 Isr: 2,3
首先编写配置文件,把上面查询输出复制过来即可:
vim reassign.json
{"version":1,"partitions":[{"topic":"three","partition":0,"replicas":[1,3,2]},{"topic":"three","partition":1,"replicas":[2,1,3]},{"topic":"three","partition":2,"replicas":[3,2,1]},{"topic":"three","partition":3,"replicas":[1,2,3]},{"topic":"three","partition":4,"replicas":[2,3,1]}
]}
执行副本存储计划
]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file /root/reassign.json --execute
Current partition replica assignment{"version":1,"partitions":[{"topic":"three","partition":0,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"three","partition":1,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"three","partition":2,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"three","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"three","partition":4,"replicas":[2,3],"log_dirs":["any","any"]}]}Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for three-0,three-1,three-2,three-3,three-4
查看执行的状态:
[root@localhost bin]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file /root/reassign.json --verify
Status of partition reassignment:
Reassignment of partition three-0 is complete.
Reassignment of partition three-1 is complete.
Reassignment of partition three-2 is complete.
Reassignment of partition three-3 is complete.
Reassignment of partition three-4 is complete.Clearing broker-level throttles on brokers 1,2,3
Clearing topic-level throttles on topic three
再次查看验证结果,黄色部分broker的id,说明副本都增加成功了,分布在原有kafka集群了
]# ./kafka-topics.sh --bootstrap-server 192.168.40.11:9092,192.168.40.12:9092 --describe
Topic: three PartitionCount: 5 ReplicationFactor: 3 Configs: segment.bytes=1073741824Topic: three Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2Topic: three Partition: 1 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3Topic: three Partition: 2 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1Topic: three Partition: 3 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3Topic: three Partition: 4 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
减少topic副本操作:(即从某个kafka节点下线副本)
把下面4分区3副本,名为topic_first的topic,减少计划如下,黄色为计划减掉的副本
[root@localhost bin]# ./kafka-topics.sh --bootstrap-server 192.168.40.11:9092,192.168.40.12:9092 --describe
Topic: topic_first PartitionCount: 4 ReplicationFactor: 3 Configs: segment.bytes=1073741824Topic: topic_first Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2Topic: topic_first Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3Topic: topic_first Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1Topic: topic_first Partition: 3 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
我们必须保留Leader副本,再把计划保留的其他Follower副本编辑上,首先编写配置文件,把上面查询输出复制过来即可:
vim reassign.json
{"version":1,"partitions":[{"topic":"topic_first","partition":0,"replicas":[3,1]},{"topic":"topic_first","partition":1,"replicas":[1]},{"topic":"topic_first","partition":2,"replicas":[2,1]},{"topic":"topic_first","partition":3,"replicas":[3,2]}
]}
执行计划
]# ./kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --reassignment-json-file reassign.json --execute //有zk写法
]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file reassign.json --execute //无zk
Json文件要用绝对路径,或者确保可以找到json文件
]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file /root/reassign.json --execute
Current partition replica assignment{"version":1,"partitions":[{"topic":"topic_first","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"topic_first","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"topic_first","partition":2,"replicas":[2,3,1],"log_dirs":["any","any","any"]},{"topic":"topic_first","partition":3,"replicas":[3,2,1],"log_dirs":["any","any","any"]}]}Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for topic_first-0,topic_first-1,topic_first-2,topic_first-3
查询执行结果
]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file /root/reassign.json --verify
Status of partition reassignment:
Reassignment of partition topic_first-0 is complete.
Reassignment of partition topic_first-1 is complete.
Reassignment of partition topic_first-2 is complete.
Reassignment of partition topic_first-3 is complete.Clearing broker-level throttles on brokers 1,2,3
Clearing topic-level throttles on topic topic_first
再次查看 Topic 的详情,否按照预想减少副本,结果成功!
[root@localhost bin]# ./kafka-topics.sh --bootstrap-server 192.168.40.11:9092,192.168.40.12:9092 --describe
Topic: topic_first PartitionCount: 4 ReplicationFactor: 2 Configs: segment.bytes=1073741824Topic: topic_first Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1Topic: topic_first Partition: 1 Leader: 1 Replicas: 1 Isr: 1Topic: topic_first Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1Topic: topic_first Partition: 3 Leader: 3 Replicas: 3,2 Isr: 3,2
故障恢复方法:
宕机如何恢复:
少部分副本宕机
当leader宕机了,会从follower选择⼀个作为leader。当宕机的重新恢复时,会把之前commit的数据清空,重新从leader⾥pull数据。
全部副本宕机
当全部副本宕机了有两种恢复⽅式:
等待ISR中的⼀个恢复后,并选它作为leader。(等待时间较⻓,降低可⽤性)
选择第⼀个恢复的副本作为新的leader,⽆论是否在ISR中。(并未包含之前leader commit的数据,因此造成数据丢失)
只有那些跟Leader保持同步的Follower才应该被选作新的Leader。
Kafka会在Zookeeper上针对每个Topic维护⼀个称为ISR(in-sync replica,已同步的副本)的集合,该集合中是⼀些分区的副本。
只有当这些副本都跟Leader中的副本同步了之后,kafka才会认为消息已提交,并反馈给消息的⽣产者。
如果这个集合有增减,kafka会更新zookeeper上的记录。
如果所有的ISR副本都失败了怎么办?此时有两种⽅法可选:
等待ISR集合中的副本复活,
选择任何⼀个⽴即可⽤的副本,⽽这个副本不⼀定是在ISR集合中。需要设置 unclean.leader.election.enable=true,如果设置为true就意味着当leader下线时候可以从非ISR集合中选举出新的leader,这样有可能造成数据的丢失
这两种⽅法各有利弊,实际⽣产中按需选择。
如果要等待ISR副本复活,虽然可以保证⼀致性,但可能需要很⻓时间。⽽如果选择⽴即可⽤的副本,则很可能该副本并不⼀致。
总结:
Kafka中Leader分区选举,通过维护⼀个动态变化的ISR集合来实现,⼀旦Leader分区丢掉,则从ISR中随机挑选⼀个副本做新的Leader分区。
如果ISR中的副本都丢失了,则:
可以等待ISR中的副本任何⼀个恢复,接着对外提供服务,需要时间等待。
从OSR中选出⼀个副本做Leader副本,此时会造成数据丢失
还有一个ISR,该参数全称,in-sync replica,它维护了一个集合,例如截图里的2,0,1,代表2,0,1副本保存的消息日志与leader 副本是保持一致的,只有保持一致的副本(包括所有副本),才会被维护在ISR集合里,当出现一定程度的不同步时,就会将该对应已经不一致的副本移出ISR集合,但是,这种移出并非永久的,一旦被移出的副本慢慢又恢复与leader一样时,那么,又会被加回isr集合当中。注意一点,只有在这个ISR里的副本服务器,才能在leader出现问题时有机会被选举为新的leader。
补充
用步骤1.1的 --generate 获取一下当前的分配情况,得到如下json
{"version": 1,"partitions": [{"topic": "test_create_topic1","partition": 2,"replicas": [2],"log_dirs": ["any"]}, {"topic": "test_create_topic1","partition": 1,"replicas": [1],"log_dirs": ["any"]}, {"topic": "test_create_topic1","partition": 0,"replicas": [0],"log_dirs": ["any"]
}]}假如想把所有分区的副本都变成2, 那只需修改"replicas": []里面的值了,这里面是Broker列表,排在第一个的是Leader; 所以根据自己想要的分配规则修改一下json文件就变成如下
{"version": 1,"partitions": [{"topic": "test_create_topic1","partition": 2,"replicas": [2,0],"log_dirs": ["any","any"]}, {"topic": "test_create_topic1","partition": 1,"replicas": [1,2],"log_dirs": ["any","any"]}, {"topic": "test_create_topic1","partition": 0,"replicas": [0,1],"log_dirs": ["any","any"]}]}
注意log_dirs里面的数量要和replicas数量匹配;或者直接把log_dirs选项删除掉; 这个log_dirs是副本跨路径迁移时候的绝对路径
执行–execute
如果想在重新平衡期间修改限制,增加吞吐量,以便完成的更快。可以重新运行execute命令,用相同的reassignment-json-file:
验证–verify,完事之后,副本数量就增加了
副本缩容
副本缩容跟扩容是一个意思; 当副本分配少于之前的数量时候,多出来的副本会被删除;
比如刚新增了一个副本,想重新恢复到一个副本
执行下面的json文件
{"version": 1,"partitions": [{"topic": "test_create_topic1","partition": 2,"replicas": [2],"log_dirs": ["any"]}, {"topic": "test_create_topic1","partition": 1,"replicas": [1],"log_dirs": ["any"]}, {"topic": "test_create_topic1","partition": 0,"replicas": [0],"log_dirs": ["any"]}]}
1.3、Partition Reassign场景限流
Reassign中文重新分配
Partition Reassign限流注意事项:
限流速度不能过小,如果限流速度过小,将不能触发实际的reassign复制过程。
限流参数不会对正常的副本fetch流量进行限速。
任务完成后,您需要通过verify参数移除Topic和Broker上的限速参数配置。
如果刚开始已经设置了throttle参数,则可以通过execute命令再次修改throttle参数。
如果刚开始没有设置throttle参数,则需要使用kafka-configs.sh命令修改Topic上的leader.replication.throttled.replicas和follower.replication.throttled.replicas参数、修改Broker上的leader.replication.throttled.rate和follower.replication.throttled.rate参数。
使用kafka-reassign-partitions.sh来进行Partition Reassign操作,通过使用throttle参数来设置限流的大小。示例如下所示:
- 创建测试Topic。
./kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create
通过以下命令查看Topic详情。
./kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
- 执行以下命令,模拟数据写入。
./kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1 linger.ms=0 bootstrap.servers=core-1-1:9092
- 设置throttle参数并执行reassign操作。
创建reassignment-json-file文件reassign.json,写入如下内容。
{"version":1,"partitions":[{"topic":"test-throttled","partition":0,"replicas":[2,0,3],"log_dirs":["any","any","any"]}]}
执行reassign操作。
由于模拟的写入速度为10 Mbit/s,所以将reassign限流速度设置为30 Mbit/s。
./kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --throttle 30000000 --execute
- 查看限流参数。
查看指定节点的Broker参数。2是broker的id
./kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --entity-name 2 --describe
查看指定Topic的参数。
./kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --entity-name test-throttled --describe
- 查看reassign任务执行情况。
./kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --verify
说明:任务完成后,您需要重复执行上述命令以移除限流参数。
1.4、节点内副本移动到不同目录的场景限流
通过kafka-reassign-partitions.sh可以进行Broker节点内的副本迁移,参数replica-alter-log-dirs-throttle可以对节点内的迁移IO进行限制。示例如下所示:
- 创建测试Topic。
执行以下命令,创建测试Topic。
./kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create
可以通过以下命令查看Topic详情。
./kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
- 执行以下命令,模拟数据写入。
./kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1 linger.ms=0 bootstrap.servers=core-1-1:9092
- 设置参数replica-alter-log-dirs-throttle并执行reassign操作。
创建文件reassign.json,将目标目录写入reassignment文件中,内容如下。
{"version":1,"partitions":[{"topic":"test-throttled","partition":0,"replicas":[2,0,3],"log_dirs":["any","/mnt/disk1/kafka/log","any"]}]}执行replicas movement操作。
./kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --replica-alter-log-dirs-throttle 30000000 --execute
- 查看限流参数。
Broker节点内目录间移动副本会在Broker上配置限流参数,参数名为Brokerreplica.alter.log.dirs.io.max.bytes.per.second。
执行以下命令,查看指定节点的Broker参数。
./kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --describe --entity-name 0
- 查看reassign任务执行情况。
./kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --verify
说明:任务完成后,您需要重复执行上述命令以移除限流参数。
1.5、集群Broker恢复时,副本数据同步场景限流
重要提示:
限流速度不能过小,如果限流速度过小,将不能触发实际的reassign复制过程。
限流参数不会对正常的副本fetch流量进行限速。
数据恢复完成后,需要使用kafka-configs.sh命令删除相应的参数。
步骤:
当Broker重启时,需要从leader副本进行副本数据的同步。在Broker节点迁移、坏盘修复重新上线等场景时,由于之前的副本数据完全丢失、副本数据恢复会产生大量的同步流量,有必要对恢复过程进行限流避免恢复流量过大影响正常流量。
- 创建测试Topic。
执行以下命令,创建测试Topic。
./kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create
可以通过以下命令查看Topic详情。
./kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
- 执行以下命令,写入测试数据。
./kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1 linger.ms=0 bootstrap.servers=core-1-1:9092
- 通过kafka-configs.sh命令设置限流参数。
//设置Topic上的限流参数。
./kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type topics --entity-name test-throttled --alter --add-config "leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*"//设置Broker上的限流参数。
./kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 0
./kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 1
./kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 2
-
停止Broker 1节点进程。
-
删除Broker 1上的副本数据,模拟数据丢失的场景。
rm -rf /mnt/disk2/kafka/log/test-throttled-0/ -
启动Broker 1节点,观察限流参数是否起作用。
-
待Broker 1相应的副本恢复到ISR列表后,使用kafka-configs.sh命令删除限流参数的配置。
//删除Topic上的限流参数。
./kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas' --entity-name test-throttled//删除Broker上的限流参数
./kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas,leader.replication.throttled.rate,follower.replication.throttled.rate' --entity-name 0
相关文章:
Kafka分区管理大师指南:扩容、均衡、迁移与限流全解析
#作者:孙德新 文章目录 分区分配操作(kafka-reassign-partitions.sh)1.1 分区扩容、数据均衡、迁移(kafka-reassign-partitions.sh)1.2、修改topic分区partition的副本数(扩缩容副本)1.3、Partition Reassign场景限流1.4、节点内副本移动到不…...
3.从零开始学会Vue--{{生命周期,工程化,组件化}}
1.生命周期钩子 1.是什么 生命周期 概念:就是一个Vue实例从创建 到 销毁 的整个过程 生命周期包括:① 创建 ② 挂载 ③ 更新 ④ 销毁 四个阶段 1.创建阶段:创建响应式数据 2.挂载阶段:渲染模板 3.更新阶段:修改…...
Python--网络编程
3. 网络编程与Socket 3.1 Socket基础 创建Socket import socket# TCP Socket tcp_socket socket.socket(socket.AF_INET, socket.SOCK_STREAM)# UDP Socket udp_socket socket.socket(socket.AF_INET, socket.SOCK_DGRAM)服务器端函数 函数描述bind((host, port))绑定…...
【java】方法的基本内存原理(栈和堆)
java内存主要分为栈和堆,方法相关的部分主要在栈内存里,每个方法调用时会在栈里创建一个栈帧,存放局部变量和方法执行的信息。执行完后栈帧被销毁,局部变量消失。而对象实例存在堆里,由垃圾回收器管理。 **Java方法内…...
SQLMesh 系列教程4- 详解模型特点及模型类型
SQLMesh 作为一款强大的数据建模工具,以其灵活的模型设计和高效的增量处理能力脱颖而出。本文将详细介绍 SQLMesh 模型的特点和类型,帮助读者快速了解其强大功能。我们将深入探讨不同模型类型(如增量模型、全量模型、SCD Type 2 等࿰…...
SpringBoot(接受参数相关注解)
文章目录 1.基本介绍2.PathVariable 路径参数获取信息 1.代码实例 1.index.html2.ParameterController.java3.测试 2.细节说明 3.RequestHeader 请求头获取信息 1.代码实例 1.index.html2.ParameterController.java3.测试 2.细节说明 4.RequestParameter 请求获取参数信息 1.…...
hbase合并队列超长问题分析
问题现象 hbase集群合并队列超长,有节点上合并任务已经运行超过1天未结束,合并队列总长不断增加。 问题分析 参数配置: 配置参数默认值含义hbase.hregion.memstore.flush.size128MMemStore达到该值会Flush成StoreFilehbase.hregion.memstore.block.multiplier4当region中…...
FPGA的星辰大海
编者按 时下风头正盛的DeepSeek,正值喜好宏大叙事的米国大统领二次上岗就业,OpenAI、软银、甲骨文等宣布投资高达5000亿美元“星际之门”之际,对比尤为强烈。 某种程度上,,是低成本创新理念的直接落地。 包括来自开源社区的诸多赞誉是,并非体现技术有多“超越”,而是…...
认识vue-admin
认识vue-admin **核心交付:** 为什么要基于现成架子二次开发 什么是二次开发:基于已有的代码(项目工程,脚手架)开进行新功能的开发 所以看懂已有的框架中的既有代码,变得很重要了 1. 背景知识 后台管理系统是一种最…...
STM32、GD32驱动TM1640原理图、源码分享
一、原理图分享 二、源码分享 /************************************************* * copyright: * author:Xupeng * date:2024-07-18 * description: **************************************************/ #include "smg.h"#define DBG_TAG "smg&…...
spring boot 对接aws 的S3 服务,实现上传和查询
1.aws S3介绍 AWS S3(Amazon Simple Storage Service)是亚马逊提供的一种对象存储服务,旨在提供可扩展、高可用性和安全的数据存储解决方案。以下是AWS S3的一些主要特点和功能: 1.1. 对象存储 对象存储模型:S3使用…...
PH热榜 | 2025-02-12
1. FirstHR 2.0 with HR Copilot 标语:小型企业的一站式人力资源平台 介绍:对小型企业来说,FirstHR是一个人力资源平台,专注于招聘和团队发展,并融合了一点人工智能技术。 产品网站: 立即访问 Product …...
通过例子学 rust 个人精简版 1-1
1-1 Hello World fn main() {println!("Hello World!");// 动手试一试println!("Im a Rustacean!"); }Hello World! Im a Rustacean!要点1 :println 自带换行符 注释 fn main() {let x 5 /* 90 */ 5;println!("Is x 10 or 100? x …...
HTTP的前世今生:如何塑造现代互联网的交互方式?
一、关于HTTP 1.1 简介 “没有HTTP协议,就没有今天的互联网。” 从简单的文本传输到支撑全球数十亿设备的实时交互,HTTP协议始终是Web世界的核心纽带。本文将深入剖析其设计思想、演进历程及底层工作原理。 HTTP(HyperText Transfer Protoco…...
Flutter_学习记录_动画的简单了解
用AnimationController简单实现如下的效果图: 1. 只用AnimationController实现简单动画 1.1 完整代码案例 import package:flutter/material.dart;class AnimationDemo extends StatefulWidget {const AnimationDemo({super.key});overrideState<AnimationDe…...
【java】for (int num : numbers) { System.out.print(num + “ “); } for里的是什么意思
for (int num : numbers) 是 Java 中的一种 增强型 for 循环(也称为 for-each 循环)。它的作用是遍历数组或集合中的每一个元素,并对每个元素执行循环体中的操作。 1. 增强型 for 循环的语法 java Copy for (元素类型 变量名 : 数组或集合…...
内容中台驱动企业CMS架构优化与高效策略
内容概要 在数字化转型浪潮中,企业内容管理系统(CMS)正面临从单一内容存储向智能化、协同化方向演进的迫切需求。通过引入内容中台架构,企业能够有效整合元数据管理、版本控制与智能协作能力,从而优化传统CMS的底层逻…...
我用 Cursor 开发了一款个人小记系统
https://note.iiter.cn 项目背景 在日常工作和学习中,我们经常需要快速记录一些想法、收藏一些有用的链接或者保存一些重要的文本、图片内容。虽然市面上已经有很多笔记软件,但我想要一个更轻量、更简单的工具,专注于快速记录和智能检索。于是我开发了这款个人小记系统。 系统…...
百问网(100ask)提供的烧写工具的原理和详解;将自己编译生成的u-boot镜像文件烧写到eMMC中
百问网(100ask)提供的烧写工具的原理 具体的实现原理见链接 http://wiki.100ask.org/100ask_imx6ull_tool 为了防止上面这个链接失效,我还对上面这个链接指向的页面保存成了mhtml文件,这个mhtml文件的百度网盘下载链接: https://pan.baidu.c…...
doris:异步物化视图概述
物化视图作为一种高效的解决方案,兼具了视图的灵活性和物理表的高性能优势。 它能够预先计算并存储查询的结果集,从而在查询请求到达时,直接从已存储的物化视图中快速获取结果,避免了重新执行复杂的查询语句所带来的开销。 使用场…...
Java如何权衡是使用无序的数组还是有序的数组
在 Java 中,选择有序数组还是无序数组取决于具体场景的性能需求与操作特点。以下是关键权衡因素及决策指南: ⚖️ 核心权衡维度 维度有序数组无序数组查询性能二分查找 O(log n) ✅线性扫描 O(n) ❌插入/删除需移位维护顺序 O(n) ❌直接操作尾部 O(1) ✅内存开销与无序数组相…...
Module Federation 和 Native Federation 的比较
前言 Module Federation 是 Webpack 5 引入的微前端架构方案,允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...
【python异步多线程】异步多线程爬虫代码示例
claude生成的python多线程、异步代码示例,模拟20个网页的爬取,每个网页假设要0.5-2秒完成。 代码 Python多线程爬虫教程 核心概念 多线程:允许程序同时执行多个任务,提高IO密集型任务(如网络请求)的效率…...
什么是Ansible Jinja2
理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具,可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板,允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板,并通…...
AI病理诊断七剑下天山,医疗未来触手可及
一、病理诊断困局:刀尖上的医学艺术 1.1 金标准背后的隐痛 病理诊断被誉为"诊断的诊断",医生需通过显微镜观察组织切片,在细胞迷宫中捕捉癌变信号。某省病理质控报告显示,基层医院误诊率达12%-15%,专家会诊…...
C/C++ 中附加包含目录、附加库目录与附加依赖项详解
在 C/C 编程的编译和链接过程中,附加包含目录、附加库目录和附加依赖项是三个至关重要的设置,它们相互配合,确保程序能够正确引用外部资源并顺利构建。虽然在学习过程中,这些概念容易让人混淆,但深入理解它们的作用和联…...
Caliper 配置文件解析:fisco-bcos.json
config.yaml 文件 config.yaml 是 Caliper 的主配置文件,通常包含以下内容: test:name: fisco-bcos-test # 测试名称description: Performance test of FISCO-BCOS # 测试描述workers:type: local # 工作进程类型number: 5 # 工作进程数量monitor:type: - docker- pro…...
springboot 日志类切面,接口成功记录日志,失败不记录
springboot 日志类切面,接口成功记录日志,失败不记录 自定义一个注解方法 import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target;/***…...
沙箱虚拟化技术虚拟机容器之间的关系详解
问题 沙箱、虚拟化、容器三者分开一一介绍的话我知道他们各自都是什么东西,但是如果把三者放在一起,它们之间到底什么关系?又有什么联系呢?我不是很明白!!! 就比如说: 沙箱&#…...
xmind转换为markdown
文章目录 解锁思维导图新姿势:将XMind转为结构化Markdown 一、认识Xmind结构二、核心转换流程详解1.解压XMind文件(ZIP处理)2.解析JSON数据结构3:递归转换树形结构4:Markdown层级生成逻辑 三、完整代码 解锁思维导图新…...
