当前位置: 首页 > news >正文

Kafka分区管理大师指南:扩容、均衡、迁移与限流全解析

#作者:孙德新

文章目录

  • 分区分配操作(kafka-reassign-partitions.sh)
    • 1.1 分区扩容、数据均衡、迁移(kafka-reassign-partitions.sh)
    • 1.2、修改topic分区partition的副本数(扩缩容副本)
    • 1.3、Partition Reassign场景限流
    • 1.4、节点内副本移动到不同目录的场景限流
    • 1.5、集群Broker恢复时,副本数据同步场景限流

分区分配操作(kafka-reassign-partitions.sh)

1.1 分区扩容、数据均衡、迁移(kafka-reassign-partitions.sh)

Kafka系统提供了一个分区重新分配工具(kafka-reassign-partitions.sh),该工具可用于在Broker之间迁移分区。理想情况下,将确保所有Broker的数据和分区均匀分配。分区重新分配工具无法自动分析Kafka群集中的数据分布并迁移分区以实现均匀的负载均衡。因此,管理员在操作的时候,必须弄清楚应该迁移哪些Topic或分区。
Kafka物理节点扩容方法:只要每个kafka配置文件的brokerid 需要全局唯一,不冲突,启动新kafka节点,即可自动加入原有kafka集群。但是原集群topic数据不会自动移动到新kafka节点上,需要手动迁移。新topic则可以自动分配到新节点上。

–topics-to-move-json-file 指定json文件,文件内容为topic配置 Json文件格式如下:

{"topics": [{"topic": "test_create_topic1"}],"version": 1
}

–generate 尝试给出副本重分配的策略,该命令并不实际执行
–broker-list 指定想要分配的Broker节点,用于尝试给出分配策略,与–generate搭配使用 。–broker-list “0,1,2,3”
–broker-list举例:
./kafka-reassign-partitions.sh --zookeeper xxx:2181 --topics-to-move-json-file config/move-json-file.json --broker-list “0,1,2,3” --generate

执行完毕之后会打印如下,需求注意的是,此时分区移动尚未开始,它只是告诉你当前的分配和建议。保存当前分配,以防你想要回滚它
Current partition replica assignment//当前副本分配方式

Current partition replica assignment//当前副本分配方式
{"version":1,"partitions":[{"topic":"test_create_topic1","partition":2,"replicas":[1],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":1,"replicas":[3],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":0,"replicas":[2],"log_dirs":["any"]}]}Proposed partition reassignment configuration//期望的重新分配方式
{"version":1,"partitions":[{"topic":"test_create_topic1","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"test_create_topic1","partition":0,"replicas":[0],"log_dirs":["any"]}]}

–reassignment-json-file 指定要重分配的json文件,与–execute搭配使用

–execute 执行。开始执行重分配任务,与–reassignment-json-file搭配使用

–verify 验证任务是否执行成功,检查分区重新分配的状态。当有使用–throttle限流的话,该命令还会移除限流;该命令很重要,不移除限流对正常的副本之间同步会有影响。 该选项用于检查分区重新分配的状态,同时–throttle流量限制也会被移除掉; 否则可能会导致定期复制操作的流量也受到限制。

–throttle 迁移过程Broker之间现在流程传输的速率,单位 bytes/sec – throttle 500000 。迁移过程注意流量陡增对集群的影响。Kafka提供一个broker之间复制传输的流量限制,限制了副本从机器到另一台机器的带宽上限,当重新平衡集群,引导新broker,添加或移除broker时候,这是很有用的。因为它限制了这些密集型的数据操作从而保障了对用户的影响

> sh bin/kafka-reassign-partitions.sh --zookeeper xxxxx:2181 --reassignment-json-file config/reassignment-json-file.json --execute -- throttle 50000000

加上一个–throttle 50000000 参数, 那么执行移动分区的时候,会被限制流量在50000000 B/s,加上参数后可以看到如下

The throttle limit was set to 50000000 B/s
Successfully started reassignment of partitions.

需要注意的是,如果你迁移的时候包含 副本跨路径迁移(同一个Broker多个路径)那么这个限流措施不会生效,你需要再加上–replica-alter-log-dirs-throttle 这个限流参数,它限制的是同一个Broker不同路径直接迁移的限流;

如果你想在重新平衡期间修改限制,增加吞吐量,以便完成的更快。你可以重新运行execute命令,用相同的reassignment-json-file

–replica-alter-log-dirs-throttle broker内部副本跨路径迁移数据流量限制功能,限制数据拷贝从一个目录到另外一个目录带宽上。单位bytes/sec --replica-alter-log-dirs-throttle 100000

–disable-rack-aware 关闭机架感知能力,在分配的时候就不参考机架的信息

–bootstrap-server 如果是副本跨路径迁移必须有此参数

下面是topic数据均衡操作(迁移操作也是这个思路,只是最终kafka节点目标少):
背景:原有3台kafka集群:192.168.40.11–13,扩容一台kafka 192.168.40.14,把原有3台数据均衡到4台kafka上
首先在现有集群的基础上再添加⼀个Kafka节点,进行物理组件节点扩容,然后使⽤Kafka⾃带的kafka-reassign-partitions.sh ⼯具来重新分布分区,进行数据均衡。该⼯具有三种使⽤模式:
generate模式,给定需要重新分配的Topic,⾃动⽣成reassign plan(并不执⾏)。在此模式下,给定Topic列表和Broker列表,该工具会生成候选重新分配,以将指定Topic的所有分区迁移到新Broker中。此选项仅提供了一种方便的方法,可在给定Topic和目标Broker列表的情况下生成分区重新分配计划。
execute模式,根据指定的reassign plan重新分配Partition。在此模式下,该工具将根据用户提供的重新分配计划启动分区的重新分配。 (使用–reassignment-json-file选项)。由管理员手动制定自定义重新分配计划,也可以使用–generate选项提供。
verify模式,验证重新分配Partition是否成功。在此模式下,该工具将验证最后一次–execute期间列出的所有分区的重新分配状态。状态可以有成功、失败或正在进行等状态。

基本步骤如下:

  1. 查看集群中当前所有可用的topic,目标是名为three的topic进行数据均衡到1234,4个kafka节点上
[root@localhost bin]# ./kafka-topics.sh --list --bootstrap-server 192.168.40.11:9092
__consumer_offsets
first
second
three
topic_first
  1. 查看计划数据均衡的topic的详细信息
    列出 对应的topic名称的详细信息,包括topic的分区数量,副本及leader等。
./kafka-topics.sh --describe --zookeeper  zk-ip:port  --topic  topic名称  (命令模板)
[root@localhost bin]# ./kafka-topics.sh --describe --bootstrap-server 192.168.40.11:9092  --topic three
[2024-03-06 01:25:53,446] WARN [AdminClient clientId=adminclient-1] Connection to node 4 (/192.168.40.14:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Topic: three	PartitionCount: 5	ReplicationFactor: 3	Configs: segment.bytes=1073741824Topic: three	Partition: 0	Leader: 1	Replicas: 1,3,2	Isr: 2,3,1Topic: three	Partition: 1	Leader: 2	Replicas: 2,1,3	Isr: 2,3,1Topic: three	Partition: 2	Leader: 3	Replicas: 3,2,1	Isr: 2,3,1Topic: three	Partition: 3	Leader: 1	Replicas: 1,2,3	Isr: 2,3,1Topic: three	Partition: 4	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1
  1. 编制负载均衡topic主题清单的json文件:
    借助kafka-reassign-partitions.sh⼯具⽣成reassign 计划,不过我们先得按照要求定义⼀个json⽂件,文件名自定义。⾥⾯说明哪些topic需要重新分区,版本号固定为1,⽂件内容如下:
[root@node1 ~]# cat topics-to-move.json 
{ 	"topics": [ 		{ "topic":"three" } 	], 	"version":1}

或如下

[root@localhost ~]# cat topic-to-move.json 
{"topics": [{"topic": "three"}],
"version":1
}
  1. 生成副本均衡迁移计划:
    然后使⽤ kafka-reassign-partitions.sh⼯具利用–generate⽣成reassign计划
./kafka-reassign-partitions.sh --zookeeper 192.168.12.100:2181 --topics-to-move-json-file  ./plans/topic-to-move.json  --broker-list "1,2,3,4,5,6" --generate  (命令模板)

参数–broker-list “1,2,3,4” 表示期望均衡、迁移的本kafka集群全部节点(也可是部分节点,但节点个数不能小于副本数量)。执行后输出两段内容,其中黄色标识部分就是副本存储计划文件内容,⽣成的就是将分区重新分布到broker 上的结果

[root@localhost bin]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --topics-to-move-json-file  /root/topic-to-move.json  --broker-list  "1,2,3,4"  --generate
Current partition replica assignment  //目前的副本分配状态,这个需要保留,可以用它回滚!
{"version":1,"partitions":[{"topic":"three","partition":0,"replicas":[1,3,2],"log_dirs":["any","any","any"]},{"topic":"three","partition":1,"replicas":[2,1,3],"log_dirs":["any","any","any"]},{"topic":"three","partition":2,"replicas":[3,2,1],"log_dirs":["any","any","any"]},{"topic":"three","partition":3,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"three","partition":4,"replicas":[2,3,1],"log_dirs":["any","any","any"]}]}Proposed partition reassignment configuration    //期望计划的副本分配状态
{"version":1,"partitions":[{"topic":"three","partition":0,"replicas":[2,4,1],"log_dirs":["any","any","any"]},{"topic":"three","partition":1,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"three","partition":2,"replicas":[4,2,3],"log_dirs":["any","any","any"]},{"topic":"three","partition":3,"replicas":[1,3,4],"log_dirs":["any","any","any"]},{"topic":"three","partition":4,"replicas":[2,1,3],"log_dirs":["any","any","any"]}]}注:
上述黄色输出内容说明解释,"partitions":[{"topic":"three","partition":0,"replicas":[2,4,1],是计划将"topic":"three","partition":0分区分配到broker2、4、1上,后面内容也同样
  1. 编制存储计划json文件:
    上面最后一段(黄色标识)就是副本存储计划文件内容,⽣成的就是将分区重新分布到broker 上的结果,再创建一个存储计划json文件,文件名自定义
[root@localhost bin]#  vim  increase-replication-factor.json
{"version":1,"partitions":[{"topic":"three","partition":0,"replicas":[2,4,1],"log_dirs":["any","any","any"]},{"topic":"three","partition":1,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"three","partition":2,"replicas":[4,2,3],"log_dirs":["any","any","any"]},{"topic":"three","partition":3,"replicas":[1,3,4],"log_dirs":["any","any","any"]},{"topic":"three","partition":4,"replicas":[2,1,3],"log_dirs":["any","any","any"]}]}
  1. 执行存储计划:
    该命令会将指定分区的副本重新分配到新的 broker上。集群控制器通过为每个分区添加新副本实现重新分配(增加复制系数)。新的副本将从分区的首领那里复制所有数据。根据分区大小的不同,复制过程可能需要花一些时间,因为数据是通过网络复制到新副本上的。在复制完成之后,控制器将旧副本从副本清单里移除(恢复到原先的复制系数)。
[root@localhost bin]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file    /root/increase-replication-factor.json  --execute
Current partition replica assignment{"version":1,"partitions":[{"topic":"three","partition":0,"replicas":[1,3,2],"log_dirs":["any","any","any"]},{"topic":"three","partition":1,"replicas":[2,1,3],"log_dirs":["any","any","any"]},{"topic":"three","partition":2,"replicas":[3,2,1],"log_dirs":["any","any","any"]},{"topic":"three","partition":3,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"three","partition":4,"replicas":[2,3,1],"log_dirs":["any","any","any"]}]}Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for three-0,three-1,three-2,three-3,three-4
  1. 验证副本存储计划:
    在重分配进行过程中或者完成之后,可以使用 kafka-reassign-partitions.sh 工具验证重分配 的状态。它可以显示重分配的进度、已经完成重分配的分区以及错误信息。
[root@localhost bin]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file    /root/increase-replication-factor.json  --verify
Status of partition reassignment:
Reassignment of partition three-0 is complete.
Reassignment of partition three-1 is complete.
Reassignment of partition three-2 is complete.
Reassignment of partition three-3 is complete.
Reassignment of partition three-4 is complete.Clearing broker-level throttles on brokers 1,2,3,4    //说明已经分配在kafka集群的全部4个节点上了
Clearing topic-level throttles on topic three
  1. 再次验证副本存储计划:
    说明没问题,成功。14主机上分配了分区0、2、3,共3个分区
[root@localhost bin]#  ./kafka-topics.sh --describe --bootstrap-server 192.168.40.11:9092  --topic three
Topic: three	PartitionCount: 5	ReplicationFactor: 3	Configs: segment.bytes=1073741824Topic: three	Partition: 0	Leader: 1	Replicas: 2,4,1	Isr: 2,1,4Topic: three	Partition: 1	Leader: 2	Replicas: 3,1,2	Isr: 2,3,1Topic: three	Partition: 2	Leader: 4	Replicas: 4,2,3	Isr: 2,3,4Topic: three	Partition: 3	Leader: 1	Replicas: 1,3,4	Isr: 3,1,4Topic: three	Partition: 4	Leader: 2	Replicas: 2,1,3	Isr: 2,3,1
到kafka-14主机查看数据,已经有数据了,分区0、2、3在14主机上
[root@localhost log]# ll
total 16
-rw-r--r--. 1 root root   0 Mar  4 21:48 cleaner-offset-checkpoint
-rw-r--r--  1 root root   4 Mar  6 06:55 log-start-offset-checkpoint
-rw-r--r--  1 root root  88 Mar  6 01:47 meta.properties
-rw-r--r--  1 root root  34 Mar  6 06:55 recovery-point-offset-checkpoint
-rw-r--r--  1 root root  34 Mar  6 06:55 replication-offset-checkpoint
drwxr-xr-x  2 root root 141 Mar  6 03:01 three-0
drwxr-xr-x  2 root root 141 Mar  6 03:06 three-2
drwxr-xr-x  2 root root 141 Mar  6 03:01 three-3
[root@localhost log]# cd three-0
[root@localhost three-0]# ll
total 0
-rw-r--r-- 1 root root 10485760 Mar  6 03:01 00000000000000000000.index
-rw-r--r-- 1 root root        0 Mar  6 03:01 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 Mar  6 03:01 00000000000000000000.timeindex
-rw-r--r-- 1 root root        0 Mar  6 03:01 leader-epoch-checkpoint

1.2、修改topic分区partition的副本数(扩缩容副本)

分区重分配工具提供了一些特性,用于改变分区的复制系数(副本数)。如果在创建分区时指定了错误的复制系数、副本数(比如在创建主题时没有足够多可用的 broker),那么就有必要修改它们。这可以通过创建一个 JSON 对象来完成,该对象使用分区重新分配的执行步骤中使用的格式,显式指定分区所需的副本数量。集群将完成重分配过程,并使用新的复制系数、副本数。
Kafka不能通过命令行方式修改副本,需要通过json等其他方式修改。无论是增加或减少副本,都可以针对每个分区进行个性增加或减少,不是必须所有分区都增加或同时减少。

增加副本操作:

创建json文件:
1)方法一:使用命令自动创建(有待进一步验证)
指定要修改副本数的Topic和分区,以及新的副本分配。这个JSON文件可以通过kafka-reassign-partitions.sh脚本的–generate选项自动生成。命令模板如下,11 14 18为kafka的broker id。
首先创建一个文件addReplicas.json,现有业务topic名称为aaa

[root@kafka18 ~]# cat addReplicas.json 
{"topics": [{"topic": "aaa"}],"version": 1
}[root@kafka18 ~]# /usr/local/kafka_2.13-2.7.1/bin/kafka-reassign-partitions.sh --bootstrap-server  192.168.40.18:9092 --topics-to-move-json-file addReplicas.json --broker-list "11,14,18" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"aaa","partition":0,"replicas":[14,18,11],"log_dirs":["any","any","any"]},{"topic":"aaa","partition":1,"replicas":[11,14,18],"log_dirs":["any","any","any"]},{"topic":"aaa","partition":2,"replicas":[18,11,14],"log_dirs":["any","any","any"]}]}Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"aaa","partition":0,"replicas":[11,14,18],"log_dirs":["any","any","any"]},{"topic":"aaa","partition":1,"replicas":[14,18,11],"log_dirs":["any","any","any"]},{"topic":"aaa","partition":2,"replicas":[18,11,14],"log_dirs":["any","any","any"]}]}

2)方法二:手动编辑创建
假设topic已有3个分区2个副本,需要增加到3个副本。创建一个预定义的topic分区副本json文件,即副本存储计划:

]# cat addReplicas.json   //该名称自定义,该文件与下面reassign.json文件内容一致,仅格式不同,使用reassign.json文件
{"version": 1,         //固定写法"partitions": [{"topic": "test",    //业务的topic名称"partition": 0,    //partition的序号,必须从0开始,是第一个分区"replicas": [  //期望增加到的副本数,数字为broker的id,说明期望增加到012,3个broker上,为3个副本0,1,2]},{"topic": "test","partition": 1,"replicas": [1,0,2]},{"topic": "test","partition": 2,"replicas": [2,0,1]}]
}

或如下格式编写
在这里插入图片描述
整体操作步骤如下:

首先查看topic的副本情况,发现有5个分区,每个分区有2个副本
bin]# ./kafka-topics.sh --bootstrap-server 192.168.40.11:9092 --describe
Topic: three PartitionCount: 5 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: three Partition: 0 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: three Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: three Partition: 2 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: three Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: three Partition: 4 Leader: 2 Replicas: 2,3 Isr: 2,3

首先编写配置文件,把上面查询输出复制过来即可:

vim  reassign.json
{"version":1,"partitions":[{"topic":"three","partition":0,"replicas":[1,3,2]},{"topic":"three","partition":1,"replicas":[2,1,3]},{"topic":"three","partition":2,"replicas":[3,2,1]},{"topic":"three","partition":3,"replicas":[1,2,3]},{"topic":"three","partition":4,"replicas":[2,3,1]}
]}

执行副本存储计划

]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file /root/reassign.json --execute
Current partition replica assignment{"version":1,"partitions":[{"topic":"three","partition":0,"replicas":[1,3],"log_dirs":["any","any"]},{"topic":"three","partition":1,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"three","partition":2,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"three","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"three","partition":4,"replicas":[2,3],"log_dirs":["any","any"]}]}Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for three-0,three-1,three-2,three-3,three-4

查看执行的状态:

[root@localhost bin]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file /root/reassign.json --verify
Status of partition reassignment:
Reassignment of partition three-0 is complete.
Reassignment of partition three-1 is complete.
Reassignment of partition three-2 is complete.
Reassignment of partition three-3 is complete.
Reassignment of partition three-4 is complete.Clearing broker-level throttles on brokers 1,2,3
Clearing topic-level throttles on topic three

再次查看验证结果,黄色部分broker的id,说明副本都增加成功了,分布在原有kafka集群了

]# ./kafka-topics.sh --bootstrap-server 192.168.40.11:9092,192.168.40.12:9092  --describe
Topic: three	PartitionCount: 5	ReplicationFactor: 3	Configs: segment.bytes=1073741824Topic: three	Partition: 0	Leader: 1	Replicas: 1,3,2	Isr: 1,3,2Topic: three	Partition: 1	Leader: 2	Replicas: 2,1,3	Isr: 2,1,3Topic: three	Partition: 2	Leader: 3	Replicas: 3,2,1	Isr: 3,2,1Topic: three	Partition: 3	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3Topic: three	Partition: 4	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1

减少topic副本操作:(即从某个kafka节点下线副本)

把下面4分区3副本,名为topic_first的topic,减少计划如下,黄色为计划减掉的副本

[root@localhost bin]# ./kafka-topics.sh --bootstrap-server 192.168.40.11:9092,192.168.40.12:9092  --describe
Topic: topic_first	PartitionCount: 4	ReplicationFactor: 3	Configs: segment.bytes=1073741824Topic: topic_first	Partition: 0	Leader: 3	Replicas: 3,1,2	Isr: 3,1,2Topic: topic_first	Partition: 1	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3Topic: topic_first	Partition: 2	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1Topic: topic_first	Partition: 3	Leader: 3	Replicas: 3,2,1	Isr: 3,2,1

我们必须保留Leader副本,再把计划保留的其他Follower副本编辑上,首先编写配置文件,把上面查询输出复制过来即可:

vim  reassign.json
{"version":1,"partitions":[{"topic":"topic_first","partition":0,"replicas":[3,1]},{"topic":"topic_first","partition":1,"replicas":[1]},{"topic":"topic_first","partition":2,"replicas":[2,1]},{"topic":"topic_first","partition":3,"replicas":[3,2]}
]}

执行计划

]# ./kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --reassignment-json-file reassign.json --execute  //有zk写法
]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file reassign.json --execute  //无zk

Json文件要用绝对路径,或者确保可以找到json文件

]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file /root/reassign.json --execute
Current partition replica assignment{"version":1,"partitions":[{"topic":"topic_first","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"topic_first","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"topic_first","partition":2,"replicas":[2,3,1],"log_dirs":["any","any","any"]},{"topic":"topic_first","partition":3,"replicas":[3,2,1],"log_dirs":["any","any","any"]}]}Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for topic_first-0,topic_first-1,topic_first-2,topic_first-3

查询执行结果

]# ./kafka-reassign-partitions.sh --bootstrap-server 192.168.40.11:9092 --reassignment-json-file /root/reassign.json --verify
Status of partition reassignment:
Reassignment of partition topic_first-0 is complete.
Reassignment of partition topic_first-1 is complete.
Reassignment of partition topic_first-2 is complete.
Reassignment of partition topic_first-3 is complete.Clearing broker-level throttles on brokers 1,2,3
Clearing topic-level throttles on topic topic_first

再次查看 Topic 的详情,否按照预想减少副本,结果成功!

[root@localhost bin]#  ./kafka-topics.sh --bootstrap-server 192.168.40.11:9092,192.168.40.12:9092  --describe
Topic: topic_first	PartitionCount: 4	ReplicationFactor: 2	Configs: segment.bytes=1073741824Topic: topic_first	Partition: 0	Leader: 3	Replicas: 3,1	Isr: 3,1Topic: topic_first	Partition: 1	Leader: 1	Replicas: 1	Isr: 1Topic: topic_first	Partition: 2	Leader: 2	Replicas: 2,1	Isr: 2,1Topic: topic_first	Partition: 3	Leader: 3	Replicas: 3,2	Isr: 3,2

故障恢复方法:
宕机如何恢复:
少部分副本宕机
当leader宕机了,会从follower选择⼀个作为leader。当宕机的重新恢复时,会把之前commit的数据清空,重新从leader⾥pull数据。
全部副本宕机
当全部副本宕机了有两种恢复⽅式:
等待ISR中的⼀个恢复后,并选它作为leader。(等待时间较⻓,降低可⽤性)
选择第⼀个恢复的副本作为新的leader,⽆论是否在ISR中。(并未包含之前leader commit的数据,因此造成数据丢失)

只有那些跟Leader保持同步的Follower才应该被选作新的Leader。
Kafka会在Zookeeper上针对每个Topic维护⼀个称为ISR(in-sync replica,已同步的副本)的集合,该集合中是⼀些分区的副本。
只有当这些副本都跟Leader中的副本同步了之后,kafka才会认为消息已提交,并反馈给消息的⽣产者。
如果这个集合有增减,kafka会更新zookeeper上的记录。

如果所有的ISR副本都失败了怎么办?此时有两种⽅法可选:
等待ISR集合中的副本复活,
选择任何⼀个⽴即可⽤的副本,⽽这个副本不⼀定是在ISR集合中。需要设置 unclean.leader.election.enable=true,如果设置为true就意味着当leader下线时候可以从非ISR集合中选举出新的leader,这样有可能造成数据的丢失
这两种⽅法各有利弊,实际⽣产中按需选择。
如果要等待ISR副本复活,虽然可以保证⼀致性,但可能需要很⻓时间。⽽如果选择⽴即可⽤的副本,则很可能该副本并不⼀致。

总结:
Kafka中Leader分区选举,通过维护⼀个动态变化的ISR集合来实现,⼀旦Leader分区丢掉,则从ISR中随机挑选⼀个副本做新的Leader分区。
如果ISR中的副本都丢失了,则:
可以等待ISR中的副本任何⼀个恢复,接着对外提供服务,需要时间等待。
从OSR中选出⼀个副本做Leader副本,此时会造成数据丢失

还有一个ISR,该参数全称,in-sync replica,它维护了一个集合,例如截图里的2,0,1,代表2,0,1副本保存的消息日志与leader 副本是保持一致的,只有保持一致的副本(包括所有副本),才会被维护在ISR集合里,当出现一定程度的不同步时,就会将该对应已经不一致的副本移出ISR集合,但是,这种移出并非永久的,一旦被移出的副本慢慢又恢复与leader一样时,那么,又会被加回isr集合当中。注意一点,只有在这个ISR里的副本服务器,才能在leader出现问题时有机会被选举为新的leader。

补充
用步骤1.1的 --generate 获取一下当前的分配情况,得到如下json

{"version": 1,"partitions": [{"topic": "test_create_topic1","partition": 2,"replicas": [2],"log_dirs": ["any"]}, {"topic": "test_create_topic1","partition": 1,"replicas": [1],"log_dirs": ["any"]}, {"topic": "test_create_topic1","partition": 0,"replicas": [0],"log_dirs": ["any"]
}]}假如想把所有分区的副本都变成2, 那只需修改"replicas": []里面的值了,这里面是Broker列表,排在第一个的是Leader; 所以根据自己想要的分配规则修改一下json文件就变成如下
{"version": 1,"partitions": [{"topic": "test_create_topic1","partition": 2,"replicas": [2,0],"log_dirs": ["any","any"]}, {"topic": "test_create_topic1","partition": 1,"replicas": [1,2],"log_dirs": ["any","any"]}, {"topic": "test_create_topic1","partition": 0,"replicas": [0,1],"log_dirs": ["any","any"]}]}

注意log_dirs里面的数量要和replicas数量匹配;或者直接把log_dirs选项删除掉; 这个log_dirs是副本跨路径迁移时候的绝对路径
执行–execute

如果想在重新平衡期间修改限制,增加吞吐量,以便完成的更快。可以重新运行execute命令,用相同的reassignment-json-file:
验证–verify,完事之后,副本数量就增加了

副本缩容
副本缩容跟扩容是一个意思; 当副本分配少于之前的数量时候,多出来的副本会被删除;
比如刚新增了一个副本,想重新恢复到一个副本
执行下面的json文件

{"version": 1,"partitions": [{"topic": "test_create_topic1","partition": 2,"replicas": [2],"log_dirs": ["any"]}, {"topic": "test_create_topic1","partition": 1,"replicas": [1],"log_dirs": ["any"]}, {"topic": "test_create_topic1","partition": 0,"replicas": [0],"log_dirs": ["any"]}]}

1.3、Partition Reassign场景限流

Reassign中文重新分配
Partition Reassign限流注意事项:

限流速度不能过小,如果限流速度过小,将不能触发实际的reassign复制过程。
限流参数不会对正常的副本fetch流量进行限速。
任务完成后,您需要通过verify参数移除Topic和Broker上的限速参数配置。
如果刚开始已经设置了throttle参数,则可以通过execute命令再次修改throttle参数。
如果刚开始没有设置throttle参数,则需要使用kafka-configs.sh命令修改Topic上的leader.replication.throttled.replicas和follower.replication.throttled.replicas参数、修改Broker上的leader.replication.throttled.rate和follower.replication.throttled.rate参数。

使用kafka-reassign-partitions.sh来进行Partition Reassign操作,通过使用throttle参数来设置限流的大小。示例如下所示:

  1. 创建测试Topic。
./kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create

通过以下命令查看Topic详情。

./kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
  1. 执行以下命令,模拟数据写入。
./kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1  linger.ms=0 bootstrap.servers=core-1-1:9092
  1. 设置throttle参数并执行reassign操作。
    创建reassignment-json-file文件reassign.json,写入如下内容。
{"version":1,"partitions":[{"topic":"test-throttled","partition":0,"replicas":[2,0,3],"log_dirs":["any","any","any"]}]}

执行reassign操作。
由于模拟的写入速度为10 Mbit/s,所以将reassign限流速度设置为30 Mbit/s。

./kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --throttle 30000000 --execute
  1. 查看限流参数。
    查看指定节点的Broker参数。2是broker的id
    ./kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --entity-name 2 --describe

查看指定Topic的参数。
./kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --entity-name test-throttled --describe

  1. 查看reassign任务执行情况。
    ./kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --verify

说明:任务完成后,您需要重复执行上述命令以移除限流参数。

1.4、节点内副本移动到不同目录的场景限流

通过kafka-reassign-partitions.sh可以进行Broker节点内的副本迁移,参数replica-alter-log-dirs-throttle可以对节点内的迁移IO进行限制。示例如下所示:

  1. 创建测试Topic。
    执行以下命令,创建测试Topic。
./kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create

可以通过以下命令查看Topic详情。

./kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
  1. 执行以下命令,模拟数据写入。
./kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1  linger.ms=0 bootstrap.servers=core-1-1:9092
  1. 设置参数replica-alter-log-dirs-throttle并执行reassign操作。
创建文件reassign.json,将目标目录写入reassignment文件中,内容如下。
{"version":1,"partitions":[{"topic":"test-throttled","partition":0,"replicas":[2,0,3],"log_dirs":["any","/mnt/disk1/kafka/log","any"]}]}执行replicas movement操作。
./kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --replica-alter-log-dirs-throttle 30000000 --execute
  1. 查看限流参数。
    Broker节点内目录间移动副本会在Broker上配置限流参数,参数名为Brokerreplica.alter.log.dirs.io.max.bytes.per.second。

执行以下命令,查看指定节点的Broker参数。

./kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --describe --entity-name 0
  1. 查看reassign任务执行情况。
    ./kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --verify

说明:任务完成后,您需要重复执行上述命令以移除限流参数。

1.5、集群Broker恢复时,副本数据同步场景限流

重要提示:
限流速度不能过小,如果限流速度过小,将不能触发实际的reassign复制过程。
限流参数不会对正常的副本fetch流量进行限速。
数据恢复完成后,需要使用kafka-configs.sh命令删除相应的参数。

步骤:
当Broker重启时,需要从leader副本进行副本数据的同步。在Broker节点迁移、坏盘修复重新上线等场景时,由于之前的副本数据完全丢失、副本数据恢复会产生大量的同步流量,有必要对恢复过程进行限流避免恢复流量过大影响正常流量。

  1. 创建测试Topic。
    执行以下命令,创建测试Topic。
./kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create

可以通过以下命令查看Topic详情。

./kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
  1. 执行以下命令,写入测试数据。
./kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1  linger.ms=0 bootstrap.servers=core-1-1:9092
  1. 通过kafka-configs.sh命令设置限流参数。
//设置Topic上的限流参数。
./kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type topics --entity-name test-throttled --alter --add-config "leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*"//设置Broker上的限流参数。
./kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 0
./kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 1
./kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 2
  1. 停止Broker 1节点进程。

  2. 删除Broker 1上的副本数据,模拟数据丢失的场景。
    rm -rf /mnt/disk2/kafka/log/test-throttled-0/

  3. 启动Broker 1节点,观察限流参数是否起作用。

  4. 待Broker 1相应的副本恢复到ISR列表后,使用kafka-configs.sh命令删除限流参数的配置。

//删除Topic上的限流参数。
./kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas' --entity-name test-throttled//删除Broker上的限流参数
./kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas,leader.replication.throttled.rate,follower.replication.throttled.rate' --entity-name 0

相关文章:

Kafka分区管理大师指南:扩容、均衡、迁移与限流全解析

#作者:孙德新 文章目录 分区分配操作(kafka-reassign-partitions.sh)1.1 分区扩容、数据均衡、迁移(kafka-reassign-partitions.sh)1.2、修改topic分区partition的副本数(扩缩容副本)1.3、Partition Reassign场景限流1.4、节点内副本移动到不…...

3.从零开始学会Vue--{{生命周期,工程化,组件化}}

1.生命周期钩子 1.是什么 生命周期 概念:就是一个Vue实例从创建 到 销毁 的整个过程 生命周期包括:① 创建 ② 挂载 ③ 更新 ④ 销毁 四个阶段 1.创建阶段:创建响应式数据 2.挂载阶段:渲染模板 3.更新阶段:修改…...

Python--网络编程

3. 网络编程与Socket 3.1 Socket基础 创建Socket import socket# TCP Socket tcp_socket socket.socket(socket.AF_INET, socket.SOCK_STREAM)# UDP Socket udp_socket socket.socket(socket.AF_INET, socket.SOCK_DGRAM)服务器端函数 函数描述​bind((host, port))​绑定…...

【java】方法的基本内存原理(栈和堆)

java内存主要分为栈和堆,方法相关的部分主要在栈内存里,每个方法调用时会在栈里创建一个栈帧,存放局部变量和方法执行的信息。执行完后栈帧被销毁,局部变量消失。而对象实例存在堆里,由垃圾回收器管理。 **Java方法内…...

SQLMesh 系列教程4- 详解模型特点及模型类型

SQLMesh 作为一款强大的数据建模工具,以其灵活的模型设计和高效的增量处理能力脱颖而出。本文将详细介绍 SQLMesh 模型的特点和类型,帮助读者快速了解其强大功能。我们将深入探讨不同模型类型(如增量模型、全量模型、SCD Type 2 等&#xff0…...

SpringBoot(接受参数相关注解)

文章目录 1.基本介绍2.PathVariable 路径参数获取信息 1.代码实例 1.index.html2.ParameterController.java3.测试 2.细节说明 3.RequestHeader 请求头获取信息 1.代码实例 1.index.html2.ParameterController.java3.测试 2.细节说明 4.RequestParameter 请求获取参数信息 1.…...

hbase合并队列超长问题分析

问题现象 hbase集群合并队列超长,有节点上合并任务已经运行超过1天未结束,合并队列总长不断增加。 问题分析 参数配置: 配置参数默认值含义hbase.hregion.memstore.flush.size128MMemStore达到该值会Flush成StoreFilehbase.hregion.memstore.block.multiplier4当region中…...

FPGA的星辰大海

编者按 时下风头正盛的DeepSeek,正值喜好宏大叙事的米国大统领二次上岗就业,OpenAI、软银、甲骨文等宣布投资高达5000亿美元“星际之门”之际,对比尤为强烈。 某种程度上,,是低成本创新理念的直接落地。 包括来自开源社区的诸多赞誉是,并非体现技术有多“超越”,而是…...

认识vue-admin

认识vue-admin **核心交付:** 为什么要基于现成架子二次开发 什么是二次开发:基于已有的代码(项目工程,脚手架)开进行新功能的开发 所以看懂已有的框架中的既有代码,变得很重要了 1. 背景知识 后台管理系统是一种最…...

STM32、GD32驱动TM1640原理图、源码分享

一、原理图分享 二、源码分享 /************************************************* * copyright: * author:Xupeng * date:2024-07-18 * description: **************************************************/ #include "smg.h"#define DBG_TAG "smg&…...

spring boot 对接aws 的S3 服务,实现上传和查询

1.aws S3介绍 AWS S3(Amazon Simple Storage Service)是亚马逊提供的一种对象存储服务,旨在提供可扩展、高可用性和安全的数据存储解决方案。以下是AWS S3的一些主要特点和功能: 1.1. 对象存储 对象存储模型:S3使用…...

PH热榜 | 2025-02-12

1. FirstHR 2.0 with HR Copilot 标语:小型企业的一站式人力资源平台 介绍:对小型企业来说,FirstHR是一个人力资源平台,专注于招聘和团队发展,并融合了一点人工智能技术。 产品网站: 立即访问 Product …...

通过例子学 rust 个人精简版 1-1

1-1 Hello World fn main() {println!("Hello World!");// 动手试一试println!("Im a Rustacean!"); }Hello World! Im a Rustacean!要点1 :println 自带换行符 注释 fn main() {let x 5 /* 90 */ 5;println!("Is x 10 or 100? x …...

HTTP的前世今生:如何塑造现代互联网的交互方式?

一、关于HTTP 1.1 简介 “没有HTTP协议,就没有今天的互联网。” 从简单的文本传输到支撑全球数十亿设备的实时交互,HTTP协议始终是Web世界的核心纽带。本文将深入剖析其设计思想、演进历程及底层工作原理。 HTTP(HyperText Transfer Protoco…...

Flutter_学习记录_动画的简单了解

用AnimationController简单实现如下的效果图&#xff1a; 1. 只用AnimationController实现简单动画 1.1 完整代码案例 import package:flutter/material.dart;class AnimationDemo extends StatefulWidget {const AnimationDemo({super.key});overrideState<AnimationDe…...

【java】for (int num : numbers) { System.out.print(num + “ “); } for里的是什么意思

for (int num : numbers) 是 Java 中的一种 增强型 for 循环&#xff08;也称为 for-each 循环&#xff09;。它的作用是遍历数组或集合中的每一个元素&#xff0c;并对每个元素执行循环体中的操作。 1. 增强型 for 循环的语法 java Copy for (元素类型 变量名 : 数组或集合…...

内容中台驱动企业CMS架构优化与高效策略

内容概要 在数字化转型浪潮中&#xff0c;企业内容管理系统&#xff08;CMS&#xff09;正面临从单一内容存储向智能化、协同化方向演进的迫切需求。通过引入内容中台架构&#xff0c;企业能够有效整合元数据管理、版本控制与智能协作能力&#xff0c;从而优化传统CMS的底层逻…...

我用 Cursor 开发了一款个人小记系统

https://note.iiter.cn 项目背景 在日常工作和学习中,我们经常需要快速记录一些想法、收藏一些有用的链接或者保存一些重要的文本、图片内容。虽然市面上已经有很多笔记软件,但我想要一个更轻量、更简单的工具,专注于快速记录和智能检索。于是我开发了这款个人小记系统。 系统…...

百问网(100ask)提供的烧写工具的原理和详解;将自己编译生成的u-boot镜像文件烧写到eMMC中

百问网(100ask)提供的烧写工具的原理 具体的实现原理见链接 http://wiki.100ask.org/100ask_imx6ull_tool 为了防止上面这个链接失效&#xff0c;我还对上面这个链接指向的页面保存成了mhtml文件&#xff0c;这个mhtml文件的百度网盘下载链接&#xff1a; https://pan.baidu.c…...

doris:异步物化视图概述

物化视图作为一种高效的解决方案&#xff0c;兼具了视图的灵活性和物理表的高性能优势。 它能够预先计算并存储查询的结果集&#xff0c;从而在查询请求到达时&#xff0c;直接从已存储的物化视图中快速获取结果&#xff0c;避免了重新执行复杂的查询语句所带来的开销。 使用场…...

SkyWalking 10.2.0 SWCK 配置过程

SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外&#xff0c;K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案&#xff0c;全安装在K8S群集中。 具体可参…...

QMC5883L的驱动

简介 本篇文章的代码已经上传到了github上面&#xff0c;开源代码 作为一个电子罗盘模块&#xff0c;我们可以通过I2C从中获取偏航角yaw&#xff0c;相对于六轴陀螺仪的yaw&#xff0c;qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...

【SpringBoot】100、SpringBoot中使用自定义注解+AOP实现参数自动解密

在实际项目中,用户注册、登录、修改密码等操作,都涉及到参数传输安全问题。所以我们需要在前端对账户、密码等敏感信息加密传输,在后端接收到数据后能自动解密。 1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId...

【Go】3、Go语言进阶与依赖管理

前言 本系列文章参考自稀土掘金上的 【字节内部课】公开课&#xff0c;做自我学习总结整理。 Go语言并发编程 Go语言原生支持并发编程&#xff0c;它的核心机制是 Goroutine 协程、Channel 通道&#xff0c;并基于CSP&#xff08;Communicating Sequential Processes&#xff0…...

【AI学习】三、AI算法中的向量

在人工智能&#xff08;AI&#xff09;算法中&#xff0c;向量&#xff08;Vector&#xff09;是一种将现实世界中的数据&#xff08;如图像、文本、音频等&#xff09;转化为计算机可处理的数值型特征表示的工具。它是连接人类认知&#xff08;如语义、视觉特征&#xff09;与…...

CMake控制VS2022项目文件分组

我们可以通过 CMake 控制源文件的组织结构,使它们在 VS 解决方案资源管理器中以“组”(Filter)的形式进行分类展示。 🎯 目标 通过 CMake 脚本将 .cpp、.h 等源文件分组显示在 Visual Studio 2022 的解决方案资源管理器中。 ✅ 支持的方法汇总(共4种) 方法描述是否推荐…...

Maven 概述、安装、配置、仓库、私服详解

目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...

Reasoning over Uncertain Text by Generative Large Language Models

https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829 1. 概述 文本中的不确定性在许多语境中传达,从日常对话到特定领域的文档(例如医学文档)(Heritage 2013;Landmark、Gulbrandsen 和 Svenevei…...

LeetCode - 199. 二叉树的右视图

题目 199. 二叉树的右视图 - 力扣&#xff08;LeetCode&#xff09; 思路 右视图是指从树的右侧看&#xff0c;对于每一层&#xff0c;只能看到该层最右边的节点。实现思路是&#xff1a; 使用深度优先搜索(DFS)按照"根-右-左"的顺序遍历树记录每个节点的深度对于…...

BLEU评分:机器翻译质量评估的黄金标准

BLEU评分&#xff1a;机器翻译质量评估的黄金标准 1. 引言 在自然语言处理(NLP)领域&#xff0c;衡量一个机器翻译模型的性能至关重要。BLEU (Bilingual Evaluation Understudy) 作为一种自动化评估指标&#xff0c;自2002年由IBM的Kishore Papineni等人提出以来&#xff0c;…...