本文是《Kafka的Consumer Group Rebalance》一文的补充部分,主要附加介绍一下Kafka内置的几种分区分配策略。

Kafka定义了一个消费者组内分区分配的接口ConsumerPartitionAssignor,该接口里面最核心的是assign方法:

package org.apache.kafka.clients.consumer;

public interface ConsumerPartitionAssignor {
    GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription);
    // 省略其它
}

该方法的两个参数分别是当前集群信息metadata和本组成员订阅的主题信息groupSubscription,根据这两个信息计算分配方案GroupAssignment。目前Kafka已经实现了几种具体的策略:

  • RangeAssignor(2.8版本中的默认策略)
  • RoundRobinAssignor
  • StickyAssignor
  • CooperativeStickyAssignor

完整的UML图如下:

uml

下面分别介绍。

RangeAssignor

该分配算法是逐个topic进行分配的(per-topic basis),对于每个topic:将分区按数值排序,将Consumer按member.id(如果用户指定了group.instance.id,则使用该id作为member.id,否则随机生成)进行字典序排列。用分区数除以消费者个数得出每个Consumer应该分配的分区个数N(不能整除时向上取整),然后依次给每个Consumer一次分配N个分区(最后一个可能不足N个)。

比如现在组内有2个Consumer C0和C1,订阅了2个topic t0和t1,每个topic有3个分区,即:t0p0, t0p1, t0p2, t1p0, t1p1, t1p2. 假设消费者排序后顺序为C0、C1,先开始分配topic t0,3个分区/2个Consumer等于1.5,向上取整为2,即每个Consumer分配2个分区,于是t0的分配结果为:

  • C0:[t0p0, t0p1]
  • C1:[t0p2]

然后再分配topic t1,和上面同理,分配结果如下:

  • C0:[t1p0, t1p1]
  • C1:[t1p2]​

所以最终合并后的分配结果为:

  • C0:[t0p0, t0p1, t1p0, t1p1]
  • C1:[t0p2, t1p2]

RoundRobinAssignor

Round Robin策略和Range的不同之处在于它是将所有topic的分区放在一起进行分配的。具体方式为:先将Consumer按member.id进行排序,将所有分区按数值排序。然后将分区以round robin的方式依次(每次一次)分配给各个Consumer。如果Consumer订阅的topic有差异的话,在分配某个topic的Partition的时候,如果当前Consumer没有订阅该topic,就会跳过该Consumer。举两个例子:

例子1:组内Consumer订阅信息相同。假设现在组内有2个Consumer C0和C1,订阅了2个topic t0和t1,每个topic有3个分区,即:t0p0, t0p1, t0p2, t1p0, t1p1, t1p2。将这6个分区以round-robin的方式分配给C0和C1,分配结果为:

  • C0:[t0p0, t0p2, t1p1]
  • C1:[t0p1, t1p0, t1p2]

例子2:组内Consumer订阅信息不同。假设有3个Consumer C0、C1、C2;有3个topic t0、t1、t2,3个topic的分区数分别为1、2、3,即所有分区为:t0p0, t1p0, t1p1, t2p0, t2p1, t2p2。其中C0订阅了t0,C1订阅了t0、t1,C2订阅了t0、t1、t2,则分配结果为:

  • C0:[t0p0]
  • C1:[t1p0]
  • C2:[t1p1, t2p0, t2p1, t2p2]

StickyAssignor

StickyAssignor算法在分配时有2个重要的考虑点:

  1. 尽量让分区分配的比较均衡
  2. 当发生重新分配的时候(即Rebalance的时候),在保证1的前提下,尽量保持原有的分配方案不变动

看2个例子。

例子1:假设有3个Consumer C0、C1、C2;有4个主题t0、t1、t2、t3,每个分区有2个分区,即所有分区为:t0p0, t0p1, t1p0, t1p1, t2p0, t2p1, t3p0, t3p1。现在所有Consumer都订阅了4个主题,则分配结果如下:

  • C0:[t0p0, t1p1, t3p0]
  • C1:[t0p1, t2p0, t3p1]
  • C2:[t1p0, t2p1]

这个结果和前面RoundRobinAssignor的分配结果是一样的。但当发生重新分配的时候,就不一样了。假设,现在C1挂掉了,需要重新分配。如果是RoundRobinAssignor,重新分配后的结果如下:

  • C0:[t0p0, t1p0, t2p0, t3p0]
  • C2:[t0p1, t1p1, t2p1, t3p1]

但如果使用StickyAssignor的话,重新分配后的结果如下:

  • C0:[t0p0, t1p1, t3p0, t2p0]
  • C2:[t1p0, t2p1, t0p1, t3p1]

可以看到,StickyAssignor将C1的分区按照Round Robin的方式分配给了C0和C2,在保证均衡的前提下,最大限度的保留了原有分配方案。

例子2:假设有3个Consumer C0、C1、C2;有3个主题t0、t1、t2,分区数依次为1、2、3,即所有分区为:t0p0, t1p0, t1p1, t2p0, t2p1, t2p2。现在C0订阅了t0,C1订阅了t0、t1,C2订阅了t0、t1、t2。如果使用RoundRobin,前面已经展示过一次了,分配结果为:

  • C0:[t0p0]
  • C1:[t1p0]
  • C2:[t1p1, t2p0, t2p1, t2p2]

但如果使用StickyAssignor的话,分配结果为:

  • C0:[t0p0]
  • C1:[t1p0, t1p1]
  • C2:[t2p0, t2p1, t2p2]

此时,如果C0挂掉,RoundRobin重新分配后的结果为:

  • C1:[t0p0, t1p1]
  • C2:[t1p0, t2p0, t2p1, t2p2]

有3个分区分配没有变化,且分配不均匀。

但StickyAssignor重新分配的结果为:

  • C1:[t1p0, t1p1, t0p0]
  • C2:[t2p0, t2p1, t2p2]

有5个分区分配没有变化,且分配均匀。

CooperativeStickyAssignor

该策略具体的分配方式和前面的StickyAssignor是一样的,但有一个重要的区别是该策略会使用Cooperative Rebalance,而StickyAssignor使用的则是Eager Rebalance,这两种Rebalance的区别参见我之前的文章,这里不再赘述。

对比总结

就通用场景而言,进行分区分配的时候,一方面我们比较关注分配的均衡性;另一方面也会比较关注当发生Consumer Group Rebalance的时候,能否最大限度的保持原有的分配。从这两个角度来看:

  • 关于均衡:RangeAssignor和RoundRobinAssignor的分配是否均衡,主要取决于组内Consumer订阅的主题情况以及每个主题的分区个数。而StickyAssignor和CooperativeAssignor则不太依赖这个条件,相当于在任何情况下,都能实现一个相对均衡的分配方案。
  • 当发生Rebalance的时候最大限度保持原有分配方案:RangeAssignor和RoundRobinAssignor算法自身其实完全没有考虑这点,要实现这个功能点,需要结合static membership特性来实现,即指定group.instance.id,这样相当于确定了Consumer的顺序,只要组内Consumer不变、订阅信息不变,就能有一个稳定的分配结果。而StickyAssignor和CooperativeAssignor则考虑了这点,但有一个注意点就是对于StickyAssignor,虽然会尽量保留原有的分配方案,但因为使用的是Eager Rebalance,所以在Rebalance的时候还是会回收所有分区,而CooperativeAssignor使用的是Cooperative Rebalance,所以只会回收有变化的分区。

一般而言,建议新的系统(Kafka 2.4及之后版本)使用CooperativeAssignor。当然,我们也可以实现自己的PartitionAssignor。