什么是Consumer Group Rebalance?

Kafka Consumer创建的时候都要指定一个组ID(group id),所有组ID一样的Consumer就组成了一个Consumer Group。对于一个Partition同一时刻只会分配给同一个Group内某一个Consumer,这就是大家熟知的Kafka消费模型。通过这个模型,Kafka的消费者(也就是应用/服务)可以很方便的实现Load Balance、HA和水平扩展。简单说这个模型就相当于现在有M个Partition,N个Consumer,然后把这M个Partition平均分配给N个Consumer,而且分配的时候有个限制条件:一个Partition只能分配给一个Consumer,Consumer Group Rebalance就是在需要的时候去做这个分配工作的,而且它的大原则是尽量保证Partition均衡的分配给组内各个Consumer。

那什么时候需要,或者说什么时候会发生Consumer Group Rebalance呢?看前面描述的职责就已经很明确了,当M或者N值发生变化的时候就需要Rebalance了,准确点就是:当Group内的Consumer个数或者Consumer订阅的Partition个数发生变化的时候就需要Rebalance了。下面列举一些常见的场景:

  • Group内有新的Consumer加入,比如应用扩容了、修改了已有应用的并行度(N值发生变化)
  • Group内Consumer个数减少了,比如应用缩容、挂掉、poll超时等(N值发生变化)
  • 现有Consumer修改了订阅的Topic,导致Group内的Partition个数变了(M值发生变化)
  • 已订阅的Topic修改了Partition的个数(M值发生变化)
  • ...

上面这些场景有些是主动的,有些是被动的,有些是无法避免的,有些是可以避免的,有些是正常的,有些是代码bug导致的...总之,当发现资源(Partition)有变更,需要重新分配以消除“贫富差距”的时候,就会发生Consumer Group Rebalance了。但是资源的分配不论是在现实世界,还是在分布式的世界中,都是一个难题。下面介绍Kafka是怎么做的。

Rebalance介绍

实质上,Rebalance是一个抽象、通用的资源分配协议,它不光可以用于Partition这种资源的分配,在Kafka中,有多个地方都有使用:

  • Confluent Schema Registry:使用Rebalance协议来选主
  • Kafka Connect:使用Rebalance协议来给connector分配任务
  • Kafka Stream:使用Rebalance协议来给实例分配Partition和任务

网上关于这新老协议的细节讲述已经非常多了,这里就概括性的介绍一下。

Rebalance Protocol

如下图,Rebalance协议由2部分组成,一部分在Broker端,一部分在Client端:

  • Group Membership Protocol(Broker端):主要负责整体协调
  • Client Embedded Protocol(Client端) :主要负责具体的资源分配

rebalance-protocol

这里注意一个细节就是一部分协议是在客户端的,而且用户可以按照约定好的协议进行自定义的实现,比如实现一个自己的资源分配方案,后面就会讲到。

下面还是以本文讨论的Consumer Group Rebalance的应用场景(即Partition资源的分配)来描述。对于每一个Consumer Group,都会有一个Coordinator节点(由某个Broker充当)负责这个Group的资源分配,也就是上面的Group Membership协议其实就是由这个Coordinator节点来实际运作的。假设现在新加入了一个Consumer,看下整个Rebalance过程的步骤:

  1. 该Consumer给Kafka集群发送FindCoordinator请求,找到它所属的Group对应的Coordinator;
  2. 找到后向Coordinator发送JoinGroup请求。该请求会携带客户端(即该Consumer)的一些用户配置(比如session.timeout.msmax.poll.interval.ms)和一些元数据(比如订阅了哪些主题等)。
  3. 收到JoinGroup请求后,Coordinator通过心跳响应(Heartbeat)响应通知组内其它成员要开始Rebalance了。然后其它Consumer像这个新加入的Consumer一样,也发送JoinGroup请求给Coordinator。
  4. 当Coordinator收到组内所有成员JoinGroup请求以后,会给所有成员发送一个JoinGroup响应。其中给Group Leader(加入组的第一个成员)发送的Response里面包含了成员信息、资源分配策略等元数据,其它成员则是一个空的Response。这个Leader拿到这些信息以后,本地计算出分配结果。
  5. 所有成员向Coordinator发送SyncGroup请求,Leader的请求中会包含自己计算的分配结果,其它成员则是空请求。
  6. Coordinator将Leader的分配结果通过SyncGroup响应发送给各个成员。如果Consumer实现了ConsumerRebalanceListener接口,则会调用该接口的onPartitionsAssignedMethod方法。

至此,整个Rebalance过程就结束了,这里再补充一些细节:

  • 上面提到了一个心跳的概念:Consumer内部有一个心跳线程定时发送心跳给Coordinator,以让Coordinator知道自己还活着。当需要Rebalance的时候,Coordinator会在心跳响应中通知所有Consumer要进行重平衡了,这就是上面提到的通过心跳通知。
  • 上面举的例子是由一个新加入的Consumer触发了Rebalance。很多其它行为也会触发,前面已经列举过常见的场景了。结合现在的流程和心跳知识再细化一下触发场景,比如当有Consumer正常停止的时候,在结束之前会发送LeaveGroup请求给Coordinator;如果是异常停止,Coordinator会通过心跳超时来判断Consumer已经没了。当然实际中,可能Consumer其实正常着,只是因为网络原因心跳超时了,或者Consumer里面没有及时调用poll方法等。
  • 前面提到Rebalance协议分为Broker端的“Group Membership Protocol”部分和Client端的“Client Embedded Protocol”部分,上面Group Leader计算分配方案,就属于“Client Embedded Protocol”部分的功能。提这个是因为Client这部分的协议除了默认的一些实现外,用户可以去自定义实现,后面马上要讲到的改进方案Incremental Cooperative Rebalance其实就是在这里实现的。

再放一个图(图片来自于引用文章From Eager to Smarter in Apache Kafka Consumer Rebalances,下同):
rebalance

问题分析

优化之前肯定要先分析清楚现有的问题,才能有针对性的进行优化。其实从前面的介绍我们已经很清楚,Rebalance要做的事情很简单:将M个资源(Partition/Task/Connector)平均分配给N个成员(Consumer/Instance/Worker),每个资源只能被一个成员拥有。事情本身不难,但难就难在需要在分布式环境中做这个分配工作。分布式环境中在任意时刻,网络可能分区、节点可能故障、还存在竞态条件(race condition),简单说就是分布式环境中无法实现可靠的通信,这让整个问题复杂化了。

前面介绍了现在的Rebalance开始的时候回收(revoke)所有成员的资源,然后大家一起参与Rebalance过程,等拿到新的资源分配方案,又重新开始工作。具体应用到Partition的分配,就是所有Consumer在发送JoinGroup请求前需要停止从Partition消费,“上交”自己拥有的Partition。这样当Coordinator收到所有Consumer的JoinGroup请求的时候,所有的Partition就处于未分配状态,此时整个系统达到了一个同步状态(Synchronization barrier):

eager-rebalance

所以,在重新分配之前,先回收所有资源其实是为了在不可靠的分布式环境中简化分配工作。然而,按现在这种方式,在分区被回收到收到新的分配方案之前,所有成员都无法工作,即“Stop The World”(借鉴了GC里面的概念),这也是Rebalance存在的最大的问题。默认Rebalance流程的超时时间为5分钟,也就是最差情况下,“Stop The World”效果可能持续5分钟。所以需要针对这个问题进行优化,思路也有两种:

  • 尽量减少Rebalance的发生
  • 减少Rebalance中“Stop The World”的影响

社区在2.3版本中同时引入了两个优化方案:KIP-345: Static MembershipKIP-429: Kafka Consumer Incremental Rebalance Protocol分别按照上述两种思路进行优化,下面分别介绍。

改进点1:Static Membership

Static Membership主要的优化目标是减少“闪断”场景导致的Rebalance,即解决的思路主要是尽量减少Rebalance的发生,我们看下是如何优化的。

在每次Rebalance的时候,Coordinator会随机给每个成员分配一个唯一的ID。然后当有新成员加入的时候,它的ID会是一个空字符串UNKNOWN_MEMBER_ID,这样Coordinator就知道它是新加入的,需要进行Rebalance了。Static Membership方案是给Consumer增加了group.instance.id选项,由用户负责设置以及保证唯一性,这个ID会替换原来由Coordinator每次Rebalance随机生成的ID(随机生成称之为“Dynamic Membership”),并且这个ID信息会加到JoinGroup请求中。那这个ID有什么用呢?

举个例子:某一刻Consumer应用因为内存使用过高,被系统OOM Killer干掉了,然后很快又被守护进程或者人为启动起来的。这个时候,如果是以前的情况,Coordinator会认为是有新的Consumer加入,需要进行一轮Rebalance,但如果是Static Membership的情况下,Coordinator通过ID发现这个Consumer之前就有,就不会重新触发整个Rebalance,而是将缓存的之前分配给该Consumer的Partition直接返回给他,这样就一定程度上避免了因为闪断导致的Rebalance。

当然,这里我用了“闪断”,主要是想表达意外挂掉又很快恢复的情况,更具体点:

  • 意外挂掉:指被kill、网络闪断等不会主动(或者说没有机会)给Coordinator发送LeaveGroup请求的场景。因为如果主动给Coordinator发送了LeaveGroup请求的话,Coordinator会马上开始一轮Rebalance。
  • 很快恢复:指在Coordinator检测到Consumer挂掉之前,就恢复了。具体点说就是在session.timeout.ms或者max.poll.interval.ms时间内就恢复了,否则Coordinator会认为Consumer挂了,开始Rebalance。这里简单提一下这两个配置项。在0.10.0及之前的版本中,心跳是和poll在一个线程里面的,只有session.timeout.ms一个参数。后来进行了优化拆分(KIP-62: Allow consumer to send heartbeats from a background thread),心跳是一个单独的线程,poll是一个线程,session.timeout.ms仍然是心跳的超时时间,而max.poll.interval.ms则是poll线程的超时时间。不管哪一个超时,Coordinator都会认为Consumer挂了,需要Rebalance。

如果我们要使用Static Membership特性,需要给Consumer增加group.instance.id设置。同时尽量将上面提到的超时时间设置的长一些。但显然弊端就是Consumer如果真的挂掉且无法恢复的话,Coordinator需要等较长一段时间才能发现,相当于牺牲了一定的可用性。果然没有免费的蛋糕。

改进点2:Incremental Cooperative Rebalancing

不同于Static Membership,Incremental Cooperative Rebalancing的思路是尽量减少Rebalance中“Stop The World”的时间和范围。那怎么做的呢?有这么几个关键点:

  • 所有成员还是会发送JoinGroup请求,但这次发送的时候资源并不会被回收(即不会停止工作),大家只是将自己目前拥有的资源信息加到元数据里面,发送给Coordinator。然后Coordinator把这些信息发送给Group Leader,Leader根据这些信息计算新的分配方案,计算的时候在保证均衡的情况下尽量对现有状态做最小改动(实际由实现的分配算法决定,默认的StickyAssianor策略就是这种),换句话说最核心的就是看哪些资源变更了成员,那就需要从原拥有者那里剔除这个资源,然后加到新的拥有者那里。
  • 然后Coordinator会将新的分配方案按照原有的方式通过SyncGroup响应发送给各个成员。各个成员收到新的分配方案以后,会和自己的现状做对比,如果没有变化或者只是新增了资源,则不需要额外做什么。但如果发现有资源被回收,则继续Rebalance的流程,接下来的流程和老版本的协议几乎一样,也需要回收资源,并发送JoinGroup请求,但这里仅回收需要被回收的资源。比如某个ConsumerRebalance之前拥有1、3、5三个分区,Rebalance中重新计算的新方案里面是1、3两个分区,则只回收5。

可以看到Incremental Cooperative Rebalancing是将原有的Rebalance流程进行了细化(分成了多轮),延迟了资源回收的时间和范围,改进后的Rebalance流程如下图:

Incremental Cooperative Rebalancing

那如何使用Incremental Cooperative Rebalancing呢?通过配置项partition.assignment.strategy进行配置,可以配置多个,越靠前优先级越高。前面提到了Rebalance协议分两部分,这里配置的其实就是客户端“Client Embedded Protocol”的实现类。2.8版本中已经支持的有:

  • org.apache.kafka.clients.consumer.RangeAssignor(默认值)
  • org.apache.kafka.clients.consumer.RoundRobinAssignor
  • org.apache.kafka.clients.consumer.StickyAssignor
  • org.apache.kafka.clients.consumer.CooperativeStickyAssignor

我们也可以通过实现org.apache.kafka.clients.consumer.ConsumerPartitionAssignor接口来实现自定义的Assignor。如果想使用Incremental Cooperative Rebalancing,就配置最后一个CooperativeStickyAssignor即可。不同Assignor的细节本文就不展开了,另外规划了一篇文章《Kafka的消费者分区分配策略》。更多关于Incremental Cooperative Rebalancing的细节,可以参考本文引用部分的文章:

总结

Kafka中的Rebalance本质上是解决分布式环境中资源分配的一种通用协议,由于分布式环境的复杂性,无法实现一个完美的方案,只能根据具体的场景进行有针对性的优化。比如实际中“闪断”是引起Rebalance的一种很常见且无法避免的原因,所以就有针对性的增加了Static Membership方案。另外Rebalance很严重的一个问题就是会“Stop The World”,然而实际中Rebalance的时候其实往往只需要变更极少量的资源所属权,所以就提出了Incremental Cooperative Rebalance方案,减少了Rebalance过程中“Stop The World”的时间和影响范围。好的架构不是设计出来的,而是进化而来的,Kafka Rebalance优化的脚步仍在继续。

另外,尽管现在已经做了诸多优化,效果也比较明显,但Rebalance仍然算是一个代价比较大的操作,实际应用的时候,我们还是要能避免的就避免。

References