This article covers scaling a Kafka cluster out and in, and how to "backfill" partitions after a broker failure. All operations below are based on Kafka 2.6.
Scaling out
Scaling out means adding broker nodes. After an expansion, existing data is not migrated automatically; only newly created topics may be assigned to the new brokers. If we don't need to migrate old data, simply starting the new brokers is enough. Often, though, we want to move some existing data onto the new brokers to balance the load, and that requires manual work. Kafka ships a script for this (in the bin directory): kafka-reassign-partitions.sh. It reassigns partitions according to a plan we supply: we write the assignment as a JSON document, pass it to the script, and the script rebalances the partitions accordingly.
For example, suppose the cluster currently has two brokers, 181 and 182, hosting four topics: test-1, test-2, test-3, test-4, each with 4 partitions and 2 replicas:
# two brokers
[zk: localhost:2181(CONNECTED) 0] ls /kafka_26/brokers/ids
[181, 182]
# four topics
➜ bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka_26
__consumer_offsets
test-1
test-2
test-3
test-4
# test-1
➜ bin/kafka-topics.sh --describe --topic test-1 --zookeeper localhost:2181/kafka_26
Topic: test-1 PartitionCount: 4 ReplicationFactor: 2 Configs:
Topic: test-1 Partition: 0 Leader: 181 Replicas: 181,182 Isr: 181,182
Topic: test-1 Partition: 1 Leader: 182 Replicas: 182,181 Isr: 182,181
Topic: test-1 Partition: 2 Leader: 181 Replicas: 181,182 Isr: 181,182
Topic: test-1 Partition: 3 Leader: 182 Replicas: 182,181 Isr: 182,181
# test-2
➜ bin/kafka-topics.sh --describe --topic test-2 --zookeeper localhost:2181/kafka_26
Topic: test-2 PartitionCount: 4 ReplicationFactor: 2 Configs:
Topic: test-2 Partition: 0 Leader: 181 Replicas: 181,182 Isr: 181,182
Topic: test-2 Partition: 1 Leader: 182 Replicas: 182,181 Isr: 182,181
Topic: test-2 Partition: 2 Leader: 181 Replicas: 181,182 Isr: 181,182
Topic: test-2 Partition: 3 Leader: 182 Replicas: 182,181 Isr: 182,181
# test-3
➜ bin/kafka-topics.sh --describe --topic test-3 --zookeeper localhost:2181/kafka_26
Topic: test-3 PartitionCount: 4 ReplicationFactor: 2 Configs:
Topic: test-3 Partition: 0 Leader: 181 Replicas: 181,182 Isr: 181,182
Topic: test-3 Partition: 1 Leader: 182 Replicas: 182,181 Isr: 182,181
Topic: test-3 Partition: 2 Leader: 181 Replicas: 181,182 Isr: 181,182
Topic: test-3 Partition: 3 Leader: 182 Replicas: 182,181 Isr: 182,181
# test-4
➜ bin/kafka-topics.sh --describe --topic test-4 --zookeeper localhost:2181/kafka_26
Topic: test-4 PartitionCount: 4 ReplicationFactor: 2 Configs:
Topic: test-4 Partition: 0 Leader: 182 Replicas: 182,181 Isr: 182,181
Topic: test-4 Partition: 1 Leader: 181 Replicas: 181,182 Isr: 181,182
Topic: test-4 Partition: 2 Leader: 182 Replicas: 182,181 Isr: 182,181
Topic: test-4 Partition: 3 Leader: 181 Replicas: 181,182 Isr: 181,182
Now we scale out by adding two brokers: 183 and 184. After the expansion, we want to migrate test-3 and test-4 onto 183 and 184.
First, prepare a JSON file like the following (say, topics-to-move.json):
{
  "topics": [
    { "topic": "test-3" },
    { "topic": "test-4" }
  ],
  "version": 1
}
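When this step is part of scripted maintenance, the file is trivial to generate. A convenience sketch (the helper name is ours, not a Kafka API; Kafka only cares about the file's contents):

```python
import json

def build_topics_to_move(topics):
    """Render the body expected by --topics-to-move-json-file
    for a list of topic names."""
    return json.dumps(
        {"topics": [{"topic": t} for t in topics], "version": 1},
        indent=2,
    )

print(build_topics_to_move(["test-3", "test-4"]))
```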
It simply lists the topics to reassign. Then run the following command:
➜ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-move.json --broker-list "183,184" --generate
# current partition assignment
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test-3","partition":0,"replicas":[181,182],"log_dirs":["any","any"]},{"topic":"test-3","partition":1,"replicas":[182,181],"log_dirs":["any","any"]},{"topic":"test-3","partition":2,"replicas":[181,182],"log_dirs":["any","any"]},{"topic":"test-3","partition":3,"replicas":[182,181],"log_dirs":["any","any"]},{"topic":"test-4","partition":0,"replicas":[182,181],"log_dirs":["any","any"]},{"topic":"test-4","partition":1,"replicas":[181,182],"log_dirs":["any","any"]},{"topic":"test-4","partition":2,"replicas":[182,181],"log_dirs":["any","any"]},{"topic":"test-4","partition":3,"replicas":[181,182],"log_dirs":["any","any"]}]}
# proposed partition assignment
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test-3","partition":0,"replicas":[183,184],"log_dirs":["any","any"]},{"topic":"test-3","partition":1,"replicas":[184,183],"log_dirs":["any","any"]},{"topic":"test-3","partition":2,"replicas":[183,184],"log_dirs":["any","any"]},{"topic":"test-3","partition":3,"replicas":[184,183],"log_dirs":["any","any"]},{"topic":"test-4","partition":0,"replicas":[184,183],"log_dirs":["any","any"]},{"topic":"test-4","partition":1,"replicas":[183,184],"log_dirs":["any","any"]},{"topic":"test-4","partition":2,"replicas":[184,183],"log_dirs":["any","any"]},{"topic":"test-4","partition":3,"replicas":[183,184],"log_dirs":["any","any"]}]}
As shown above, the command prints the current partition assignment and proposes a new one, both in straightforward JSON. Save the proposed plan to a file (say, expand-cluster-reassignment.json); you can also edit the plan by hand, as long as the format stays valid. Then run the following command to execute the reassignment with the new plan:
➜ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test-3","partition":0,"replicas":[181,182],"log_dirs":["any","any"]},{"topic":"test-3","partition":1,"replicas":[182,181],"log_dirs":["any","any"]},{"topic":"test-3","partition":2,"replicas":[181,182],"log_dirs":["any","any"]},{"topic":"test-3","partition":3,"replicas":[182,181],"log_dirs":["any","any"]},{"topic":"test-4","partition":0,"replicas":[182,181],"log_dirs":["any","any"]},{"topic":"test-4","partition":1,"replicas":[181,182],"log_dirs":["any","any"]},{"topic":"test-4","partition":2,"replicas":[182,181],"log_dirs":["any","any"]},{"topic":"test-4","partition":3,"replicas":[181,182],"log_dirs":["any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for test-3-0,test-3-1,test-3-2,test-3-3,test-4-0,test-4-1,test-4-2,test-4-3
This submits the reassignment task. Check its progress with:
➜ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition test-3-0 is complete.
Reassignment of partition test-3-1 is complete.
Reassignment of partition test-3-2 is complete.
Reassignment of partition test-3-3 is complete.
Reassignment of partition test-4-0 is complete.
Reassignment of partition test-4-1 is complete.
Reassignment of partition test-4-2 is complete.
Reassignment of partition test-4-3 is complete.
Clearing broker-level throttles on brokers 181,182,183,184
Clearing topic-level throttles on topics test-3,test-4
Once it completes, check the new assignment of test-3 and test-4:
➜ bin/kafka-topics.sh --describe --topic test-3 --zookeeper localhost:2181/kafka_26
Topic: test-3 PartitionCount: 4 ReplicationFactor: 2 Configs:
Topic: test-3 Partition: 0 Leader: 183 Replicas: 183,184 Isr: 183,184
Topic: test-3 Partition: 1 Leader: 184 Replicas: 184,183 Isr: 183,184
Topic: test-3 Partition: 2 Leader: 183 Replicas: 183,184 Isr: 183,184
Topic: test-3 Partition: 3 Leader: 184 Replicas: 184,183 Isr: 183,184
➜ bin/kafka-topics.sh --describe --topic test-4 --zookeeper localhost:2181/kafka_26
Topic: test-4 PartitionCount: 4 ReplicationFactor: 2 Configs:
Topic: test-4 Partition: 0 Leader: 184 Replicas: 184,183 Isr: 183,184
Topic: test-4 Partition: 1 Leader: 183 Replicas: 183,184 Isr: 183,184
Topic: test-4 Partition: 2 Leader: 184 Replicas: 184,183 Isr: 183,184
Topic: test-4 Partition: 3 Leader: 183 Replicas: 183,184 Isr: 184,183
As you can see, the data of these two topics now lives entirely on brokers 183 and 184.
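The proposal printed by --generate spreads replicas across the target brokers in an alternating pattern. A naive sketch of such a proposal generator (conceptually similar output; this is not Kafka's actual assignment algorithm, which also handles rack awareness and randomized starting points):

```python
import json
from itertools import cycle

def propose_assignment(partition_counts, brokers, replication_factor=2):
    """Produce a reassignment plan that rotates the leader position
    across the target brokers, partition by partition."""
    plan = {"version": 1, "partitions": []}
    leader_offset = cycle(range(len(brokers)))
    for topic, count in partition_counts.items():
        for partition in range(count):
            start = next(leader_offset)
            # replica list: leader first, followers wrap around the broker list
            replicas = [brokers[(start + i) % len(brokers)]
                        for i in range(replication_factor)]
            plan["partitions"].append({
                "topic": topic,
                "partition": partition,
                "replicas": replicas,
                "log_dirs": ["any"] * replication_factor,
            })
    return plan

plan = propose_assignment({"test-3": 4, "test-4": 4}, [183, 184])
print(json.dumps(plan))
```

Writing this JSON to a file gives something you can hand to --execute, or tweak by hand first, exactly like the generated proposal above.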
Scaling in and failure scenarios
As the above shows, the assignment is entirely under our control, and scaling in is just another data migration; all we need is a correct plan. Scaling in is rare in production, but one related scenario is common: a broker fails and cannot be recovered. As discussed in an earlier article, when a broker dies, the partition replicas on it are lost, and Kafka does not automatically recreate them on other live brokers. We have to create the replicas on other brokers manually, and the mechanism is exactly the same as for scaling out. Continuing the example above, suppose broker 184 fails and cannot be recovered. Some partitions of test-3 and test-4 lived on it, so those replicas are gone:
# the broker is down; its znode is gone
[zk: localhost:2181(CONNECTED) 15] ls /kafka_26/brokers/ids
[181, 182, 183]
# note that 184 no longer appears in any ISR
➜ bin/kafka-topics.sh --describe --topic test-3 --zookeeper localhost:2181/kafka_26
Topic: test-3 PartitionCount: 4 ReplicationFactor: 2 Configs:
Topic: test-3 Partition: 0 Leader: 183 Replicas: 183,184 Isr: 183
Topic: test-3 Partition: 1 Leader: 183 Replicas: 184,183 Isr: 183
Topic: test-3 Partition: 2 Leader: 183 Replicas: 183,184 Isr: 183
Topic: test-3 Partition: 3 Leader: 183 Replicas: 184,183 Isr: 183
➜ bin/kafka-topics.sh --describe --topic test-4 --zookeeper localhost:2181/kafka_26
Topic: test-4 PartitionCount: 4 ReplicationFactor: 2 Configs:
Topic: test-4 Partition: 0 Leader: 183 Replicas: 184,183 Isr: 183
Topic: test-4 Partition: 1 Leader: 183 Replicas: 183,184 Isr: 183
Topic: test-4 Partition: 2 Leader: 183 Replicas: 184,183 Isr: 183
Topic: test-4 Partition: 3 Leader: 183 Replicas: 183,184 Isr: 183
Now we want to move test-3's replicas that were on 184 to 181, and test-4's replicas that were on 184 to 182, so the plan looks like this:
➜ cat expand-cluster-reassignment.json
{
  "version": 1,
  "partitions": [
    {
      "topic": "test-3",
      "partition": 0,
      "replicas": [183, 181],
      "log_dirs": ["any", "any"]
    },
    {
      "topic": "test-3",
      "partition": 1,
      "replicas": [181, 183],
      "log_dirs": ["any", "any"]
    },
    {
      "topic": "test-3",
      "partition": 2,
      "replicas": [183, 181],
      "log_dirs": ["any", "any"]
    },
    {
      "topic": "test-3",
      "partition": 3,
      "replicas": [181, 183],
      "log_dirs": ["any", "any"]
    },
    {
      "topic": "test-4",
      "partition": 0,
      "replicas": [182, 183],
      "log_dirs": ["any", "any"]
    },
    {
      "topic": "test-4",
      "partition": 1,
      "replicas": [183, 182],
      "log_dirs": ["any", "any"]
    },
    {
      "topic": "test-4",
      "partition": 2,
      "replicas": [182, 183],
      "log_dirs": ["any", "any"]
    },
    {
      "topic": "test-4",
      "partition": 3,
      "replicas": [183, 182],
      "log_dirs": ["any", "any"]
    }
  ]
}
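A plan like this can also be derived mechanically from the current assignment by swapping the failed broker id for a live one. A sketch (the helper name and per-topic mapping are illustrative, not a Kafka API):

```python
def replace_failed_broker(current, failed_id, replacement_for):
    """Build a reassignment plan from the current assignment by replacing
    every occurrence of a failed broker id with a live broker, chosen
    per topic via the replacement_for mapping (topic name -> broker id)."""
    plan = {"version": 1, "partitions": []}
    for p in current["partitions"]:
        replicas = [replacement_for[p["topic"]] if r == failed_id else r
                    for r in p["replicas"]]
        plan["partitions"].append({
            "topic": p["topic"],
            "partition": p["partition"],
            "replicas": replicas,
            "log_dirs": ["any"] * len(replicas),
        })
    return plan

# the "Current partition replica assignment" JSON printed by the script
# can be fed in directly; a two-partition excerpt for illustration:
current = {"version": 1, "partitions": [
    {"topic": "test-3", "partition": 0, "replicas": [183, 184]},
    {"topic": "test-4", "partition": 0, "replicas": [184, 183]},
]}
plan = replace_failed_broker(current, 184, {"test-3": 181, "test-4": 182})
```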
Then execute the plan:
# execute the plan
➜ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --execute
# output omitted
# check progress
➜ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --verify
# output omitted
# when done, inspect test-3 and test-4
➜ bin/kafka-topics.sh --describe --topic test-3 --zookeeper localhost:2181/kafka_26
Topic: test-3 PartitionCount: 4 ReplicationFactor: 2 Configs:
Topic: test-3 Partition: 0 Leader: 183 Replicas: 183,181 Isr: 183,181
Topic: test-3 Partition: 1 Leader: 183 Replicas: 181,183 Isr: 183,181
Topic: test-3 Partition: 2 Leader: 183 Replicas: 183,181 Isr: 183,181
Topic: test-3 Partition: 3 Leader: 183 Replicas: 181,183 Isr: 183,181
➜ bin/kafka-topics.sh --describe --topic test-4 --zookeeper localhost:2181/kafka_26
Topic: test-4 PartitionCount: 4 ReplicationFactor: 2 Configs:
Topic: test-4 Partition: 0 Leader: 183 Replicas: 182,183 Isr: 183,182
Topic: test-4 Partition: 1 Leader: 183 Replicas: 183,182 Isr: 183,182
Topic: test-4 Partition: 2 Leader: 183 Replicas: 182,183 Isr: 183,182
Topic: test-4 Partition: 3 Leader: 183 Replicas: 183,182 Isr: 183,182
Summary
Whether we are scaling out, scaling in, or manually backfilling partitions after a failure, the essence is the same: partition reassignment, done with the kafka-reassign-partitions.sh script. Using it is simple:

1. Provide a JSON file listing the topics to reassign, and run with --generate to produce a plan file;
2. Run with --execute to apply the new plan;
3. Run with --verify to check the progress.

If you are already familiar with the plan file format, you can skip step 1.