本文讨论一下Kafka广义上的可靠性,所谓广义是从不同的角度、不同维度去观察系统的可靠性,比如从生产者(Producer)的角度看如何保证已经确认的数据一定写入了系统,不会丢失;从消费者(Consumer)的角度,只要数据没有彻底丢失,就可以读取。再比如从服务端(Broker)的角度,如何保证在可用性(Availability)和持久性(Durability)方面做权衡。

副本机制

分布式系统的可靠性一般都是通过副本(replicas)机制去实现的,Kafka也不例外。如下图:

kafka

Kafka的topic是逻辑概念,一个topic由若干个分区(partition)组成,副本就是在分区这个粒度上实现的。关于Kafka的副本有几个注意点:

  • Kafka的副本数默认为1。用户可以修改(replication factor),但不能超过节点个数,因为Kafka要保证所有副本不在同一个节点上面。
  • Kafka的副本里面有且仅有一个副本被称为Leader Replica(通过选举产生),剩下的称为Follower Replica。我们说副本个数或者配置副本个数时说的个数指两者的总和,而不是单指Follower Replica。比如默认的副本数为1(default.replication.factor),那就表示只有Leader Replica,没有Follower Replica。如果改为3,则表示有1个Leader Replica,2个Follower Replicas。
  • 生产者将数据写入Leader后,所有Follower主动去Leader拉取数据,而不是Leader或者生产者向Follower推,这个拉的过程和消费者消费是同一套机制和接口,也就是Follower其实只是一种特殊点的消费者。
  • Follower和Leader的数据及offset是完全一样的(因为Follower是异步拉取Leader数据,所以在某一时刻数据可能会略有滞后)。
  • 只有Leader对外提供服务(即供消费者消费),Follower仅是起数据冗余的作用。

对于最后一条,Kafka为何要这样设计,我摘抄了极客时间《Kafka核心技术与实战》专栏里面的解释,供参考:

  1. 方便实现“Read-your-writes”:所谓Read-your-writes,顾名思义就是,当你使用生产者API向Kafka成功写入消息后,马上使用消费者API去读取刚才生产的消息。举个例子,比如你平时发微博时,你发完一条微博,肯定是希望能立即看到的,这就是典型的 Read-your-writes 场景。如果允许追随者副本对外提供服务,由于副本同步是异步的,因此有可能出现追随者副本还没有从领导者副本那里拉取到最新的消息,从而使得客户端看不到最新写入的消息
  2. 方便实现单调读(Monotonic Reads):什么是单调读呢?就是对于一个消费者用户而言,在多次消费消息时,它不会看到某条消息一会儿存在一会儿不存在。如果允许追随者副本提供读服务,那么假设当前有 2 个追随者副本 F1 和 F2,它们异步地拉取领导者副本数据。倘若 F1 拉取了 Leader 的最新消息而 F2 还未及时拉取,那么,此时如果有一个消费者先从 F1 读取消息之后又从 F2 拉取消息,它可能会看到这样的现象:第一次消费时看到的最新消息在第二次消费时不见了,这就不是单调读一致性。但是,如果所有的读请求都是由 Leader 来处理,那么 Kafka 就很容易实现单调读一致性。

另外,我个人觉得Kafka已经通过将topic分多个Partition的机制将整个数据分配到了多个节点上面,不管是写还是读,都已经达到了负载均衡的效果,选择只有Leader对外服务的机制也无伤大雅,还能简化系统的设计。

生产者的确认机制

对于存在副本的系统,从生产者角度,人们最关心的一个问题往往就是生产者写数据的时候是写主副本成功就返回,还是大部分写成功就返回,还是等所有的副本都成功才返回。比如之前介绍的ES,属于最后一种,所有副本写成功才返回,用户不可选择。而Kafka则灵活了很多,默认主副本写成功就返回,但用户也可以灵活配置。

生产者生产消息时,可以选择从服务端获取确认(ack)的方式,有三种:

  • ack=0:数据发送后,不需要服务端确认;
  • ack=1:默认值,数据发送后,Leader Partition写成功就返回;
  • ack=all(或者-1),ISR中所有节点的Partition写成功才返回;

前两种就不说了,重点是最后面all的情况。很容易理解成设置为all,就能保证数据成功写入Leader和所有Follower才返回,其实并不总是这样的。有时会退化成ack=1的情况。不过先简单介绍下ISR。

ISR

Kafka集群认为集群中的一个节点还存活着需要同时满足两个条件:

  1. 节点在zk中注册的临时节点存在;
  2. 如果该节点(的Partition)是Follower,则其与Leader的数据差距不能超过replica.lag.time.max.ms

第1点很好理解,典型的利用zk临时节点特性做服务发现的场景,不再赘述。第2点的话从之前的图里面可以看到,Follower是定期主动去Leader那里拉取数据的,如果它在replica.lag.time.max.ms时间内没有发起拉取请求,或者拉取的太慢,在这个时间范围内还没赶上Leader节点,那Leader就认为该Follower的数据已经和自己不同步了。Leader维护了一个列表,这个列表里面是自己以及和自己同步的Follower,我们把这个列表称为ISR(全称“in-sync replica”)。如果某个Follower和自己不同步了,Leader就会从ISR中将该Follower移除。后面赶上后,又会加进来。

看个例子,我们有3个Kafka节点,broker id分别为181、182、183。现在我们创建一个名为test-1的topic,并且给设置4个Partition,副本个数设置为2:

# 创建topic-1
bin/kafka-topics.sh \
--zookeeper localhost:2181/kafka_26 \
--create \
--topic test-1 \
--partitions 4  \
--replication-factor 2
    
    # 查看topic-1
 bin/kafka-topics.sh --describe --topic test-1  --zookeeper localhost:2181/kafka_26
Topic: test-1   PartitionCount: 4       ReplicationFactor: 2    Configs: 
    Topic: test-1   Partition: 0    Leader: 183     Replicas: 183,181       Isr: 183,181
    Topic: test-1   Partition: 1    Leader: 181     Replicas: 181,182       Isr: 181,182
    Topic: test-1   Partition: 2    Leader: 182     Replicas: 182,183       Isr: 182,183
    Topic: test-1   Partition: 3    Leader: 183     Replicas: 183,182       Isr: 183,182

回到之前的问题,ack=all只保证数据全部写入了ISR中的副本,最差情况下,如果所有的Follower都因为太慢被剔除了ISR,那ISR中就只剩Leader了,此时其实效果和ack=1是一模一样的。那为了避免这种问题,Kafka给了一个参数min.insync.replicas,这个参数的含义是当我们指定ack=all时,ISR中至少要有min.insync.replicas个副本写成功,生产者才会认为数据写成功了,否则生产者就会抛NotEnoughReplicas或者NotEnoughReplicasAfterAppend异常。这个参数的默认值是1。

这里有个注意点就是理论上min.insync.replicas的值应该小于等于replication.facotr,但实际Kafka并没有校验这个,也就是如果你设置的错了,在没有真正写数据,或者ack的值不是all的时候也不会报错。比如下面的创建语句也不会报错的:

bin/kafka-topics.sh \
    --zookeeper localhost:2181/kafka_26 \
    --create \
    --topic test-1 \
    --partitions 4  \
    --replication-factor 3 \
    --config min.insync.replicas=4

所以,如果追求系统的Durability,那我们应该

  1. 至少有2个Broker;
  2. 将副本数(replication.factor)设置为[2, Broker个数]范围内的某个值。
  3. min.insync.replicas设置为[2, replication.factor]范围内的某个值。

副本故障

副本故障涉及两个关键知识点,第一个直接说结论:副本故障后,Kafka不会自动重新分配新的副本。比如某个topic设置了2个副本(replication.factor=2),那其中1个故障后,就只剩1个副本了,Kafka不会自动在其它可用节点上面重新分配一个新的副本,以保证副本数足够。如果碰巧故障分区包含Leader副本,那Kafka会从ISR中选举一个Follower作为新的Leader(但此时,副本数仍然是比预设的少一个的)。如果出现这种情况,可以使用Kafka提供的kafka-reassign-partitions.sh脚本手动恢复分区数(具体操作见:Kafka的扩容和缩容

而如何选举Leader就是第二个关键知识点。如果Leader和所有Follower都故障了,那数据肯定是丢了。但如果只是ISR中的副本全丢了,但ISR之外还有Follower,并且还存活着,那默认情况下数据也是丢了,因为默认Leader只会从ISR中选。对于这种情况,实际中我们可能愿意接受从存活的Follower中选取新的Leader,毕竟这样只丢一部分数据(Follower滞后于Leader的那部分数据)。Kafka提供了一个参数unclean.leader.election.enable,默认为false,如果设置为true,则允许选择ISR之外的Follower作为新的Leader。其实就是可用性和一致性之前的一个权衡。

总结

综合上面的这些介绍,可以知道,Kafka提供的可靠性其实是有诸多条件的,官方的描述如下:

The guarantee that Kafka offers is that a committed message will not be lost, as long as there is at least one in sync replica alive, at all times.

另外,对于一致性和可用性的权衡,也是提供了多个参数,让用户去根据需要,灵活选择。如上篇文章介绍,Kafka是在明确需求场景下设计的系统,追求的不是完美,而是真实场景中的实用性。