本文是《Kafka的Consumer Group Rebalance》一文的补充部分,主要附加介绍一下Kafka内置的几种分区分配策略。
Kafka定义了一个消费者组内分区分配的接口ConsumerPartitionAssignor
,该接口里面最核心的是assign
方法:
package org.apache.kafka.clients.consumer;
public interface ConsumerPartitionAssignor {
GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription);
// 省略其它
}
该方法的两个参数分别是当前集群信息metadata
和本组成员订阅的主题信息groupSubscription
,根据这两个信息计算分配方案GroupAssignment
。目前Kafka已经实现了几种具体的策略:
- RangeAssignor(2.8版本中的默认策略)
- RoundRobinAssignor
- StickyAssignor
- CooperativeStickyAssignor
完整的UML图如下:
下面分别介绍。
RangeAssignor
该分配算法是逐个topic进行分配的(per-topic basis),对于每个topic:将分区按数值排序,将Consumer按member.id
(如果用户指定了group.instance.id
,则使用该id作为member.id
,否则随机生成)进行字典序排列。用分区数除以消费者个数得出每个Consumer应该分配的分区个数N(不能整除时向上取整),然后依次给每个Consumer一次分配N个分区(最后一个可能不足N个)。
比如现在组内有2个Consumer C0和C1,订阅了2个topic t0和t1,每个topic有3个分区,即:t0p0, t0p1, t0p2, t1p0, t1p1, t1p2. 假设消费者排序后顺序为C0、C1,先开始分配topic t0,3个分区/2个Consumer等于1.5,向上取整为2,即每个Consumer分配2个分区,于是t0的分配结果为:
- C0:[t0p0, t0p1]
- C1:[t0p2]
然后再分配topic t1,和上面同理,分配结果如下:
- C0:[t1p0, t1p1]
- C1:[t1p2]
所以最终合并后的分配结果为:
- C0:[t0p0, t0p1, t1p0, t1p1]
- C1:[t0p2, t1p2]
RoundRobinAssignor
Round Robin策略和Range的不同之处在于它是将所有topic的分区放在一起进行分配的。具体方式为:先将Consumer按member.id
进行排序,将所有分区按数值排序。然后将分区以round robin的方式依次(每次一次)分配给各个Consumer。如果Consumer订阅的topic有差异的话,在分配某个topic的Partition的时候,如果当前Consumer没有订阅该topic,就会跳过该Consumer。举两个例子:
例子1:组内Consumer订阅信息相同。假设现在组内有2个Consumer C0和C1,订阅了2个topic t0和t1,每个topic有3个分区,即:t0p0, t0p1, t0p2, t1p0, t1p1, t1p2。将这6个分区以round-robin的方式分配给C0和C1,分配结果为:
- C0:[t0p0, t0p2, t1p1]
- C1:[t0p1, t1p0, t1p2]
例子2:组内Consumer订阅信息不同。假设有3个Consumer C0、C1、C2;有3个topic t0、t1、t2,3个topic的分区数分别为1、2、3,即所有分区为:t0p0, t1p0, t1p1, t2p0, t2p1, t2p2。其中C0订阅了t0,C1订阅了t0、t1,C2订阅了t0、t1、t2,则分配结果为:
- C0:[t0p0]
- C1:[t1p0]
- C2:[t1p1, t2p0, t2p1, t2p2]
StickyAssignor
StickyAssignor算法在分配时有2个重要的考虑点:
- 尽量让分区分配的比较均衡
- 当发生重新分配的时候(即Rebalance的时候),在保证1的前提下,尽量保持原有的分配方案不变动
看2个例子。
例子1:假设有3个Consumer C0、C1、C2;有4个主题t0、t1、t2、t3,每个分区有2个分区,即所有分区为:t0p0, t0p1, t1p0, t1p1, t2p0, t2p1, t3p0, t3p1。现在所有Consumer都订阅了4个主题,则分配结果如下:
- C0:[t0p0, t1p1, t3p0]
- C1:[t0p1, t2p0, t3p1]
- C2:[t1p0, t2p1]
这个结果和前面RoundRobinAssignor的分配结果是一样的。但当发生重新分配的时候,就不一样了。假设,现在C1挂掉了,需要重新分配。如果是RoundRobinAssignor,重新分配后的结果如下:
- C0:[t0p0, t1p0, t2p0, t3p0]
- C2:[t0p1, t1p1, t2p1, t3p1]
但如果使用StickyAssignor的话,重新分配后的结果如下:
- C0:[t0p0, t1p1, t3p0, t2p0]
- C2:[t1p0, t2p1, t0p1, t3p1]
可以看到,StickyAssignor将C1的分区按照Round Robin的方式分配给了C0和C2,在保证均衡的前提下,最大限度的保留了原有分配方案。
例子2:假设有3个Consumer C0、C1、C2;有3个主题t0、t1、t2,分区数依次为1、2、3,即所有分区为:t0p0, t1p0, t1p1, t2p0, t2p1, t2p2。现在C0订阅了t0,C1订阅了t0、t1,C2订阅了t0、t1、t2。如果使用RoundRobin,前面已经展示过一次了,分配结果为:
- C0:[t0p0]
- C1:[t1p0]
- C2:[t1p1, t2p0, t2p1, t2p2]
但如果使用StickyAssignor的话,分配结果为:
- C0:[t0p0]
- C1:[t1p0, t1p1]
- C2:[t2p0, t2p1, t2p2]
此时,如果C0挂掉,RoundRobin重新分配后的结果为:
- C1:[t0p0, t1p1]
- C2:[t1p0, t2p0, t2p1, t2p2]
有3个分区分配没有变化,且分配不均匀。
但StickyAssignor重新分配的结果为:
- C1:[t1p0, t1p1, t0p0]
- C2:[t2p0, t2p1, t2p2]
有5个分区分配没有变化,且分配均匀。
CooperativeStickyAssignor
该策略具体的分配方式和前面的StickyAssignor是一样的,但有一个重要的区别是该策略会使用Cooperative Rebalance,而StickyAssignor使用的则是Eager Rebalance,这两种Rebalance的区别参见我之前的文章,这里不再赘述。
对比总结
就通用场景而言,进行分区分配的时候,一方面我们比较关注分配的均衡性;另一方面也会比较关注当发生Consumer Group Rebalance的时候,能否最大限度的保持原有的分配。从这两个角度来看:
- 关于均衡:RangeAssignor和RoundRobinAssignor的分配是否均衡,主要取决于组内Consumer订阅的主题情况以及每个主题的分区个数。而StickyAssignor和CooperativeAssignor则不太依赖这个条件,相当于在任何情况下,都能实现一个相对均衡的分配方案。
- 当发生Rebalance的时候最大限度保持原有分配方案:RangeAssignor和RoundRobinAssignor算法自身其实完全没有考虑这点,要实现这个功能点,需要结合static membership特性来实现,即指定
group.instance.id
,这样相当于确定了Consumer的顺序,只要组内Consumer不变、订阅信息不变,就能有一个稳定的分配结果。而StickyAssignor和CooperativeAssignor则考虑了这点,但有一个注意点就是对于StickyAssignor,虽然会尽量保留原有的分配方案,但因为使用的是Eager Rebalance,所以在Rebalance的时候还是会回收所有分区,而CooperativeAssignor使用的是Cooperative Rebalance,所以只会回收有变化的分区。
一般而言,建议新的系统(Kafka 2.4及之后版本)使用CooperativeAssignor。当然,我们也可以实现自己的PartitionAssignor。