NYC's Blog - Kafka 2022-02-06T11:58:00+08:00 Typecho http://niyanchun.com/feed/atom/tag/kafka/ <![CDATA[Kafka的消费者分区分配策略]]> http://niyanchun.com/Kafka-consumer-partition-assignor.html 2022-02-06T11:58:00+08:00 2022-02-06T11:58:00+08:00 NYC https://niyanchun.com 本文是《Kafka的Consumer Group Rebalance》一文的补充部分,主要附加介绍一下Kafka内置的几种分区分配策略。

Kafka定义了一个消费者组内分区分配的接口ConsumerPartitionAssignor,该接口里面最核心的是assign方法:

package org.apache.kafka.clients.consumer;

public interface ConsumerPartitionAssignor {
    GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription);
    // 省略其它
}

该方法的两个参数分别是当前集群信息metadata和本组成员订阅的主题信息groupSubscription,根据这两个信息计算分配方案GroupAssignment。目前Kafka已经实现了几种具体的策略:

  • RangeAssignor(2.8版本中的默认策略)
  • RoundRobinAssignor
  • StickyAssignor
  • CooperativeStickyAssignor

完整的UML图如下:

uml

下面分别介绍。

RangeAssignor

该分配算法是逐个topic进行分配的(per-topic basis),对于每个topic:将分区按数值排序,将Consumer按member.id(如果用户指定了group.instance.id,则使用该id作为member.id,否则随机生成)进行字典序排列。用分区数除以消费者个数得出每个Consumer应该分配的分区个数N(不能整除时向上取整),然后依次给每个Consumer一次分配N个分区(最后一个可能不足N个)。

比如现在组内有2个Consumer C0和C1,订阅了2个topic t0和t1,每个topic有3个分区,即:t0p0, t0p1, t0p2, t1p0, t1p1, t1p2. 假设消费者排序后顺序为C0、C1,先开始分配topic t0,3个分区/2个Consumer等于1.5,向上取整为2,即每个Consumer分配2个分区,于是t0的分配结果为:

  • C0:[t0p0, t0p1]
  • C1:[t0p2]

然后再分配topic t1,和上面同理,分配结果如下:

  • C0:[t1p0, t1p1]
  • C1:[t1p2]​

所以最终合并后的分配结果为:

  • C0:[t0p0, t0p1, t1p0, t1p1]
  • C1:[t0p2, t1p2]

RoundRobinAssignor

Round Robin策略和Range的不同之处在于它是将所有topic的分区放在一起进行分配的。具体方式为:先将Consumer按member.id进行排序,将所有分区按数值排序。然后将分区以round robin的方式依次(每次一次)分配给各个Consumer。如果Consumer订阅的topic有差异的话,在分配某个topic的Partition的时候,如果当前Consumer没有订阅该topic,就会跳过该Consumer。举两个例子:

例子1:组内Consumer订阅信息相同。假设现在组内有2个Consumer C0和C1,订阅了2个topic t0和t1,每个topic有3个分区,即:t0p0, t0p1, t0p2, t1p0, t1p1, t1p2。将这6个分区以round-robin的方式分配给C0和C1,分配结果为:

  • C0:[t0p0, t0p2, t1p1]
  • C1:[t0p1, t1p0, t1p2]

例子2:组内Consumer订阅信息不同。假设有3个Consumer C0、C1、C2;有3个topic t0、t1、t2,3个topic的分区数分别为1、2、3,即所有分区为:t0p0, t1p0, t1p1, t2p0, t2p1, t2p2。其中C0订阅了t0,C1订阅了t0、t1,C2订阅了t0、t1、t2,则分配结果为:

  • C0:[t0p0]
  • C1:[t1p0]
  • C2:[t1p1, t2p0, t2p1, t2p2]

StickyAssignor

StickyAssignor算法在分配时有2个重要的考虑点:

  1. 尽量让分区分配的比较均衡
  2. 当发生重新分配的时候(即Rebalance的时候),在保证1的前提下,尽量保持原有的分配方案不变动

看2个例子。

例子1:假设有3个Consumer C0、C1、C2;有4个主题t0、t1、t2、t3,每个分区有2个分区,即所有分区为:t0p0, t0p1, t1p0, t1p1, t2p0, t2p1, t3p0, t3p1。现在所有Consumer都订阅了4个主题,则分配结果如下:

  • C0:[t0p0, t1p1, t3p0]
  • C1:[t0p1, t2p0, t3p1]
  • C2:[t1p0, t2p1]

这个结果和前面RoundRobinAssignor的分配结果是一样的。但当发生重新分配的时候,就不一样了。假设,现在C1挂掉了,需要重新分配。如果是RoundRobinAssignor,重新分配后的结果如下:

  • C0:[t0p0, t1p0, t2p0, t3p0]
  • C2:[t0p1, t1p1, t2p1, t3p1]

但如果使用StickyAssignor的话,重新分配后的结果如下:

  • C0:[t0p0, t1p1, t3p0, t2p0]
  • C2:[t1p0, t2p1, t0p1, t3p1]

可以看到,StickyAssignor将C1的分区按照Round Robin的方式分配给了C0和C2,在保证均衡的前提下,最大限度的保留了原有分配方案。

例子2:假设有3个Consumer C0、C1、C2;有3个主题t0、t1、t2,分区数依次为1、2、3,即所有分区为:t0p0, t1p0, t1p1, t2p0, t2p1, t2p2。现在C0订阅了t0,C1订阅了t0、t1,C2订阅了t0、t1、t2。如果使用RoundRobin,前面已经展示过一次了,分配结果为:

  • C0:[t0p0]
  • C1:[t1p0]
  • C2:[t1p1, t2p0, t2p1, t2p2]

但如果使用StickyAssignor的话,分配结果为:

  • C0:[t0p0]
  • C1:[t1p0, t1p1]
  • C2:[t2p0, t2p1, t2p2]

此时,如果C0挂掉,RoundRobin重新分配后的结果为:

  • C1:[t0p0, t1p1]
  • C2:[t1p0, t2p0, t2p1, t2p2]

有3个分区分配没有变化,且分配不均匀。

但StickyAssignor重新分配的结果为:

  • C1:[t1p0, t1p1, t0p0]
  • C2:[t2p0, t2p1, t2p2]

有5个分区分配没有变化,且分配均匀。

CooperativeStickyAssignor

该策略具体的分配方式和前面的StickyAssignor是一样的,但有一个重要的区别是该策略会使用Cooperative Rebalance,而StickyAssignor使用的则是Eager Rebalance,这两种Rebalance的区别参见我之前的文章,这里不再赘述。

对比总结

就通用场景而言,进行分区分配的时候,一方面我们比较关注分配的均衡性;另一方面也会比较关注当发生Consumer Group Rebalance的时候,能否最大限度的保持原有的分配。从这两个角度来看:

  • 关于均衡:RangeAssignor和RoundRobinAssignor的分配是否均衡,主要取决于组内Consumer订阅的主题情况以及每个主题的分区个数。而StickyAssignor和CooperativeAssignor则不太依赖这个条件,相当于在任何情况下,都能实现一个相对均衡的分配方案。
  • 当发生Rebalance的时候最大限度保持原有分配方案:RangeAssignor和RoundRobinAssignor算法自身其实完全没有考虑这点,要实现这个功能点,需要结合static membership特性来实现,即指定group.instance.id,这样相当于确定了Consumer的顺序,只要组内Consumer不变、订阅信息不变,就能有一个稳定的分配结果。而StickyAssignor和CooperativeAssignor则考虑了这点,但有一个注意点就是对于StickyAssignor,虽然会尽量保留原有的分配方案,但因为使用的是Eager Rebalance,所以在Rebalance的时候还是会回收所有分区,而CooperativeAssignor使用的是Cooperative Rebalance,所以只会回收有变化的分区。

一般而言,建议新的系统(Kafka 2.4及之后版本)使用CooperativeAssignor。当然,我们也可以实现自己的PartitionAssignor。

]]>
<![CDATA[Kafka的Consumer Group Rebalance]]> http://niyanchun.com/kafka-consumer-group-rebalance.html 2022-01-27T20:59:26+08:00 2022-01-27T20:59:26+08:00 NYC https://niyanchun.com 什么是Consumer Group Rebalance?

Kafka Consumer创建的时候都要指定一个组ID(group id),所有组ID一样的Consumer就组成了一个Consumer Group。对于一个Partition同一时刻只会分配给同一个Group内某一个Consumer,这就是大家熟知的Kafka消费模型。通过这个模型,Kafka的消费者(也就是应用/服务)可以很方便的实现Load Balance、HA和水平扩展。简单说这个模型就相当于现在有M个Partition,N个Consumer,然后把这M个Partition平均分配给N个Consumer,而且分配的时候有个限制条件:一个Partition只能分配给一个Consumer,Consumer Group Rebalance就是在需要的时候去做这个分配工作的,而且它的大原则是尽量保证Partition均衡的分配给组内各个Consumer。

那什么时候需要,或者说什么时候会发生Consumer Group Rebalance呢?看前面描述的职责就已经很明确了,当M或者N值发生变化的时候就需要Rebalance了,准确点就是:当Group内的Consumer个数或者Consumer订阅的Partition个数发生变化的时候就需要Rebalance了。下面列举一些常见的场景:

  • Group内有新的Consumer加入,比如应用扩容了、修改了已有应用的并行度(N值发生变化)
  • Group内Consumer个数减少了,比如应用缩容、挂掉、poll超时等(N值发生变化)
  • 现有Consumer修改了订阅的Topic,导致Group内的Partition个数变了(M值发生变化)
  • 已订阅的Topic修改了Partition的个数(M值发生变化)
  • ...

上面这些场景有些是主动的,有些是被动的,有些是无法避免的,有些是可以避免的,有些是正常的,有些是代码bug导致的...总之,当发现资源(Partition)有变更,需要重新分配以消除“贫富差距”的时候,就会发生Consumer Group Rebalance了。但是资源的分配不论是在现实世界,还是在分布式的世界中,都是一个难题。下面介绍Kafka是怎么做的。

Rebalance介绍

实质上,Rebalance是一个抽象、通用的资源分配协议,它不光可以用于Partition这种资源的分配,在Kafka中,有多个地方都有使用:

  • Confluent Schema Registry:使用Rebalance协议来选主
  • Kafka Connect:使用Rebalance协议来给connector分配任务
  • Kafka Stream:使用Rebalance协议来给实例分配Partition和任务

网上关于这新老协议的细节讲述已经非常多了,这里就概括性的介绍一下。

Rebalance Protocol

如下图,Rebalance协议由2部分组成,一部分在Broker端,一部分在Client端:

  • Group Membership Protocol(Broker端):主要负责整体协调
  • Client Embedded Protocol(Client端) :主要负责具体的资源分配

rebalance-protocol

这里注意一个细节就是一部分协议是在客户端的,而且用户可以按照约定好的协议进行自定义的实现,比如实现一个自己的资源分配方案,后面就会讲到。

下面还是以本文讨论的Consumer Group Rebalance的应用场景(即Partition资源的分配)来描述。对于每一个Consumer Group,都会有一个Coordinator节点(由某个Broker充当)负责这个Group的资源分配,也就是上面的Group Membership协议其实就是由这个Coordinator节点来实际运作的。假设现在新加入了一个Consumer,看下整个Rebalance过程的步骤:

  1. 该Consumer给Kafka集群发送FindCoordinator请求,找到它所属的Group对应的Coordinator;
  2. 找到后向Coordinator发送JoinGroup请求。该请求会携带客户端(即该Consumer)的一些用户配置(比如session.timeout.msmax.poll.interval.ms)和一些元数据(比如订阅了哪些主题等)。
  3. 收到JoinGroup请求后,Coordinator通过心跳响应(Heartbeat)响应通知组内其它成员要开始Rebalance了。然后其它Consumer像这个新加入的Consumer一样,也发送JoinGroup请求给Coordinator。
  4. 当Coordinator收到组内所有成员JoinGroup请求以后,会给所有成员发送一个JoinGroup响应。其中给Group Leader(加入组的第一个成员)发送的Response里面包含了成员信息、资源分配策略等元数据,其它成员则是一个空的Response。这个Leader拿到这些信息以后,本地计算出分配结果。
  5. 所有成员向Coordinator发送SyncGroup请求,Leader的请求中会包含自己计算的分配结果,其它成员则是空请求。
  6. Coordinator将Leader的分配结果通过SyncGroup响应发送给各个成员。如果Consumer实现了ConsumerRebalanceListener接口,则会调用该接口的onPartitionsAssignedMethod方法。

至此,整个Rebalance过程就结束了,这里再补充一些细节:

  • 上面提到了一个心跳的概念:Consumer内部有一个心跳线程定时发送心跳给Coordinator,以让Coordinator知道自己还活着。当需要Rebalance的时候,Coordinator会在心跳响应中通知所有Consumer要进行重平衡了,这就是上面提到的通过心跳通知。
  • 上面举的例子是由一个新加入的Consumer触发了Rebalance。很多其它行为也会触发,前面已经列举过常见的场景了。结合现在的流程和心跳知识再细化一下触发场景,比如当有Consumer正常停止的时候,在结束之前会发送LeaveGroup请求给Coordinator;如果是异常停止,Coordinator会通过心跳超时来判断Consumer已经没了。当然实际中,可能Consumer其实正常着,只是因为网络原因心跳超时了,或者Consumer里面没有及时调用poll方法等。
  • 前面提到Rebalance协议分为Broker端的“Group Membership Protocol”部分和Client端的“Client Embedded Protocol”部分,上面Group Leader计算分配方案,就属于“Client Embedded Protocol”部分的功能。提这个是因为Client这部分的协议除了默认的一些实现外,用户可以去自定义实现,后面马上要讲到的改进方案Incremental Cooperative Rebalance其实就是在这里实现的。

再放一个图(图片来自于引用文章From Eager to Smarter in Apache Kafka Consumer Rebalances,下同):
rebalance

问题分析

优化之前肯定要先分析清楚现有的问题,才能有针对性的进行优化。其实从前面的介绍我们已经很清楚,Rebalance要做的事情很简单:将M个资源(Partition/Task/Connector)平均分配给N个成员(Consumer/Instance/Worker),每个资源只能被一个成员拥有。事情本身不难,但难就难在需要在分布式环境中做这个分配工作。分布式环境中在任意时刻,网络可能分区、节点可能故障、还存在竞态条件(race condition),简单说就是分布式环境中无法实现可靠的通信,这让整个问题复杂化了。

前面介绍了现在的Rebalance开始的时候回收(revoke)所有成员的资源,然后大家一起参与Rebalance过程,等拿到新的资源分配方案,又重新开始工作。具体应用到Partition的分配,就是所有Consumer在发送JoinGroup请求前需要停止从Partition消费,“上交”自己拥有的Partition。这样当Coordinator收到所有Consumer的JoinGroup请求的时候,所有的Partition就处于未分配状态,此时整个系统达到了一个同步状态(Synchronization barrier):

eager-rebalance

所以,在重新分配之前,先回收所有资源其实是为了在不可靠的分布式环境中简化分配工作。然而,按现在这种方式,在分区被回收到收到新的分配方案之前,所有成员都无法工作,即“Stop The World”(借鉴了GC里面的概念),这也是Rebalance存在的最大的问题。默认Rebalance流程的超时时间为5分钟,也就是最差情况下,“Stop The World”效果可能持续5分钟。所以需要针对这个问题进行优化,思路也有两种:

  • 尽量减少Rebalance的发生
  • 减少Rebalance中“Stop The World”的影响

社区在2.3版本中同时引入了两个优化方案:KIP-345: Static MembershipKIP-429: Kafka Consumer Incremental Rebalance Protocol分别按照上述两种思路进行优化,下面分别介绍。

改进点1:Static Membership

Static Membership主要的优化目标是减少“闪断”场景导致的Rebalance,即解决的思路主要是尽量减少Rebalance的发生,我们看下是如何优化的。

在每次Rebalance的时候,Coordinator会随机给每个成员分配一个唯一的ID。然后当有新成员加入的时候,它的ID会是一个空字符串UNKNOWN_MEMBER_ID,这样Coordinator就知道它是新加入的,需要进行Rebalance了。Static Membership方案是给Consumer增加了group.instance.id选项,由用户负责设置以及保证唯一性,这个ID会替换原来由Coordinator每次Rebalance随机生成的ID(随机生成称之为“Dynamic Membership”),并且这个ID信息会加到JoinGroup请求中。那这个ID有什么用呢?

举个例子:某一刻Consumer应用因为内存使用过高,被系统OOM Killer干掉了,然后很快又被守护进程或者人为启动起来的。这个时候,如果是以前的情况,Coordinator会认为是有新的Consumer加入,需要进行一轮Rebalance,但如果是Static Membership的情况下,Coordinator通过ID发现这个Consumer之前就有,就不会重新触发整个Rebalance,而是将缓存的之前分配给该Consumer的Partition直接返回给他,这样就一定程度上避免了因为闪断导致的Rebalance。

当然,这里我用了“闪断”,主要是想表达意外挂掉又很快恢复的情况,更具体点:

  • 意外挂掉:指被kill、网络闪断等不会主动(或者说没有机会)给Coordinator发送LeaveGroup请求的场景。因为如果主动给Coordinator发送了LeaveGroup请求的话,Coordinator会马上开始一轮Rebalance。
  • 很快恢复:指在Coordinator检测到Consumer挂掉之前,就恢复了。具体点说就是在session.timeout.ms或者max.poll.interval.ms时间内就恢复了,否则Coordinator会认为Consumer挂了,开始Rebalance。这里简单提一下这两个配置项。在0.10.0及之前的版本中,心跳是和poll在一个线程里面的,只有session.timeout.ms一个参数。后来进行了优化拆分(KIP-62: Allow consumer to send heartbeats from a background thread),心跳是一个单独的线程,poll是一个线程,session.timeout.ms仍然是心跳的超时时间,而max.poll.interval.ms则是poll线程的超时时间。不管哪一个超时,Coordinator都会认为Consumer挂了,需要Rebalance。

如果我们要使用Static Membership特性,需要给Consumer增加group.instance.id设置。同时尽量将上面提到的超时时间设置的长一些。但显然弊端就是Consumer如果真的挂掉且无法恢复的话,Coordinator需要等较长一段时间才能发现,相当于牺牲了一定的可用性。果然没有免费的蛋糕。

改进点2:Incremental Cooperative Rebalancing

不同于Static Membership,Incremental Cooperative Rebalancing的思路是尽量减少Rebalance中“Stop The World”的时间和范围。那怎么做的呢?有这么几个关键点:

  • 所有成员还是会发送JoinGroup请求,但这次发送的时候资源并不会被回收(即不会停止工作),大家只是将自己目前拥有的资源信息加到元数据里面,发送给Coordinator。然后Coordinator把这些信息发送给Group Leader,Leader根据这些信息计算新的分配方案,计算的时候在保证均衡的情况下尽量对现有状态做最小改动(实际由实现的分配算法决定,默认的StickyAssianor策略就是这种),换句话说最核心的就是看哪些资源变更了成员,那就需要从原拥有者那里剔除这个资源,然后加到新的拥有者那里。
  • 然后Coordinator会将新的分配方案按照原有的方式通过SyncGroup响应发送给各个成员。各个成员收到新的分配方案以后,会和自己的现状做对比,如果没有变化或者只是新增了资源,则不需要额外做什么。但如果发现有资源被回收,则继续Rebalance的流程,接下来的流程和老版本的协议几乎一样,也需要回收资源,并发送JoinGroup请求,但这里仅回收需要被回收的资源。比如某个ConsumerRebalance之前拥有1、3、5三个分区,Rebalance中重新计算的新方案里面是1、3两个分区,则只回收5。

可以看到Incremental Cooperative Rebalancing是将原有的Rebalance流程进行了细化(分成了多轮),延迟了资源回收的时间和范围,改进后的Rebalance流程如下图:

Incremental Cooperative Rebalancing

那如何使用Incremental Cooperative Rebalancing呢?通过配置项partition.assignment.strategy进行配置,可以配置多个,越靠前优先级越高。前面提到了Rebalance协议分两部分,这里配置的其实就是客户端“Client Embedded Protocol”的实现类。2.8版本中已经支持的有:

  • org.apache.kafka.clients.consumer.RangeAssignor(默认值)
  • org.apache.kafka.clients.consumer.RoundRobinAssignor
  • org.apache.kafka.clients.consumer.StickyAssignor
  • org.apache.kafka.clients.consumer.CooperativeStickyAssignor

我们也可以通过实现org.apache.kafka.clients.consumer.ConsumerPartitionAssignor接口来实现自定义的Assignor。如果想使用Incremental Cooperative Rebalancing,就配置最后一个CooperativeStickyAssignor即可。不同Assignor的细节本文就不展开了,另外规划了一篇文章《Kafka的消费者分区分配策略》。更多关于Incremental Cooperative Rebalancing的细节,可以参考本文引用部分的文章:

总结

Kafka中的Rebalance本质上是解决分布式环境中资源分配的一种通用协议,由于分布式环境的复杂性,无法实现一个完美的方案,只能根据具体的场景进行有针对性的优化。比如实际中“闪断”是引起Rebalance的一种很常见且无法避免的原因,所以就有针对性的增加了Static Membership方案。另外Rebalance很严重的一个问题就是会“Stop The World”,然而实际中Rebalance的时候其实往往只需要变更极少量的资源所属权,所以就提出了Incremental Cooperative Rebalance方案,减少了Rebalance过程中“Stop The World”的时间和影响范围。好的架构不是设计出来的,而是进化而来的,Kafka Rebalance优化的脚步仍在继续。

另外,尽管现在已经做了诸多优化,效果也比较明显,但Rebalance仍然算是一个代价比较大的操作,实际应用的时候,我们还是要能避免的就避免。

References

]]>
<![CDATA[Kafka的监听地址配置]]> http://niyanchun.com/kafka-listener-config.html 2022-01-22T17:32:00+08:00 2022-01-22T17:32:00+08:00 NYC https://niyanchun.com 本文基于Kafka 2.8.

有时我们会碰到网络是通畅的,但却连不上Kafka,特别是在多网卡环境或者云环境上很容易出现,这个其实和Kafka的监听配置有关系。本文介绍监听相关的配置,目前监听相关的参数主要有下面几个:

  • listeners
  • advertised.listeners
  • listener.security.protocol.map
  • inter.broker.listener.name
  • security.inter.broker.protocol
  • advertised.host.name(历史遗留,已废弃,勿使用)
  • advertised.port(历史遗留,已废弃,勿使用)
  • host.name(历史遗留,已废弃,勿使用)

其中最重要的就是listenersadvertised.listeners:集群启动时监听listeners配置的地址,并将advertised.listeners配置的地址写到Zookeeper里面,作为集群元数据的一部分。我们可以将客户端(生产者/消费者)连接Kafka集群进行操作的过程分成2步:

  1. 通过listeners配置的连接信息(ip/host)连接到某个Broker(broker会定期获取并缓存zk中的元数据信息),获取元数据中advertised.listeners配置的地址信息。
  2. 通过第1步获取的advertised.listeners连接信息和Kafka集群通信(读/写)。

所以在存在内外网隔离的虚拟化环境中(比如Docker、公有云),外部客户端经常会出现可以连接到Kafka(第1步),但发送/消费数据时报连接超时(第2步),就是因为listeners配置的是外网地址,而advertised.listeners配置的却是内网地址。那这几个参数该如何配置呢?

先看连接信息的配置格式:{listener名字}://{HOST/IP}:{PORT}。HOST/IP、PORT很清楚,主要是这个“listener名字”字段。要理解这个得了解listener.security.protocol.map这个配置项:它的用途是配置listener名字和协议的映射(所以它是一个key-value的map),key是“listener名字”,value是“协议名称”,其默认值是“listener名字”和“协议名称”一样。有点绕,举个例子,比如:PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,冒号前面是key,即协议名字;后面是value,即协议名称。listener名字我们可以随便起,而协议名称则是固定可枚举的一个范围。所以如果我们自定义了listener名字,那就需要显式的设置其对应的协议名。

inter.broker.listener.namesecurity.inter.broker.protocol都是用于配置Broker之间通信的,前者配置名称(即listener.security.protocol.map中的key),后者配置协议(即listener.security.protocol.map中的value),默认值是PLAINTEXT。这两个配置项同时只能配置一个。

为什么一个连接要搞这么复杂呢?主要是为了各种不同的场景需求。下面举一个复杂一点的应用场景进行说明。比如我们在一个公有云上面部署了一个Kafka集群,该环境有一个外网地址external_hostname和一个内网地址internal_hostname;且在内部中是无法获取外网地址的(公有云大多都是这样的)。然后想实现内部客户端访问集群时走内部地址,且不需要加密;而外部客户端访问时则走外部地址,且需要加密。要实现这个需求,可以对集群进行如下配置:

listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
listeners=INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092
advertised.listeners=INTERNAL://{internal_hostname}:19092,EXTERNAL://{external_hostname}:9092
inter.broker.listener.name=INTERNAL

其实更进一步,我们还可以通过可选的control.plane.listener.name参数单独定制集群Controller节点与其他Broker节点的连接,那配置信息就变为:

listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL,CONTROL:SSL
listeners=INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092
advertised.listeners=INTERNAL://{internal_hostname}:19092,EXTERNAL://{external_hostname}:9092,CONTROL://{control_ip}:9094
inter.broker.listener.name=INTERNAL
control.plane.listener.name=CONTROL

最后给出这些配置项的默认值和一些注意事项:

  1. listeners如果不显式的配置,那会监听所有网卡,相当于配置了0.0.0.0。该配置项里面listeners名字和端口都必须是唯一的,不能重复。
  2. advertised.listeners如果不配置,默认使用listeners配置的值。如果listeners也没有显式配置,则使用java.net.InetAddress.getCanonicalHostName()获取的IP地址。如果listeners配置的是0.0.0.0,则必须显式的配置advertised.listeners,因为这个配置项必须是一个具体的地址,不允许是0.0.0.0(因为客户端无法根据这个地址连接到Broker)。另外,advertised.listeners中的端口允许重复。
  3. 对于listenersadvertised.listeners,有多个地址的时候,每一个地址都必须按照{listener名字}://{HOST/IP}:{PORT}格式进行配置,多个地址用英文逗号分隔。
  4. 如果集群所有节点的hostname在客户端和服务端各节点之间可以正确解析,优先使用hostname,而不是IP。因为代码里面使用了java.net.InetAddress.getCanonicalHostName(),有时使用IP会出现访问不通的情况。

总结:listeners地址是用于首次连接的;advertised.listeners的地址是会写到zk里面,客户端通过listeners地址建立连接获取该地址信息,然后通过该地址和集群交互。所以对于客户端,这2个地址必须都是可以访问的才可以。

]]>
<![CDATA[Kafka的Producer]]> http://niyanchun.com/kafka-producer.html 2022-01-16T17:32:00+08:00 2022-01-16T17:32:00+08:00 NYC https://niyanchun.com 上篇文章介绍了Kafka Consumer,这篇文章讨论Kafka Producer。

Kafka Producer流程概述

下面是一个简单的Producer实现:

public class SimpleProducer {

  public static void main(String[] args) {
    Properties config = new Properties();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ProducerConfig.CLIENT_ID_CONFIG, "test-write");
    config.put(ProducerConfig.ACKS_CONFIG, "all");
    config.put(ProducerConfig.RETRIES_CONFIG, 0);
    config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
    config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    Producer<String, String> producer = new KafkaProducer<>(config);

    for (int i = 0; i < 1000; i++) {
      ProducerRecord<String, String> record = new ProducerRecord<>("test-write", Integer.toString(i));
      producer.send(record, (m, e) -> {
        if (e != null) {
          e.printStackTrace();
        } else {
          System.out.printf("Produced record to topic %s partition [%d] @ offset %d%n",
              m.topic(), m.partition(), m.offset());
        }
      });
    }

    producer.flush();
    producer.close();
  }
}

大概流程就是:

  • 创建KafkaProducer(KafkaConsumer不是线程安全的,但KafkaProducer是线程安全的);
  • 创建ProducerRecord
  • 调用send方法发送record。该方法返回的是一个Future对象,而且也允许指定回调函数。所以要是想同步发送,则调用Future对象的get即可;但一般建议定义回调函数,异步发送;
  • 最后close掉对象,释放资源;

以上便是从用户代码角度看到的Producer的流程,下面从Kafka内部来看。如下图(来自参考部分第一篇文章):

kafka-producer-internal

KafkaProducer内部主要由一个Memory Buffer和一个后台IO线程组成,数据写入流程如下:

  1. 生成ProducerRecord对象,该对象必传的2个参数是topic和message value。
  2. 调用send发送生成的record。
  3. 调用指定的序列化函数序列化key和value,生成字节数组以方便在网络上传输。
  4. 计算分区号。Kafka的分区策略是这样的:如果record中指定了partition字段,则直接使用。如果没有指定,则检查record中是否指定了key,如果有则基于该key计算哈希。如果key也没有指定,则使用一个叫"sticky partition"的策略计算分区号。sticky partition是2.4版本加入的,之前的版本是“round-robin”。改成sticky是因为轮询策略会把一批数据分散到多个batch去,这样batch比较小,批量的效果就不是很好。而sticky实质是个优化版本的轮询,它会等一个batch满了以后,再将record分配给下个batch,具体见KIP-480
  5. 将数据加到按partition分组的mem buffer里面的batch里边。加到batch以后,send方法就返回了。
  6. 后台IO线程从mem buffer里面取一个batch的数据封装成一个request发送给kafka broker。
  7. 后端broker成功接收以后,就会返回RecordMetadata对象,该对象包含主题、分区号、分区内的offset。如果接收失败,就会返回错误,此时如果配置了重试,就会自动重试若干次。重试之后还失败,就会抛出异常。

一些重要配置

学习KafkaProducer的重点之一就在于掌握一些重要参数的使用,下面仅是进行简单介绍,完整的课参考官方文档。

可靠性相关

主要就是acks了,有3个值:

  • ack=0:数据发送后,不需要服务端确认;
  • ack=1:默认值,数据发送后,Leader Partition写成功就返回;
  • ack=all(或者-1),ISR中所有节点的Partition写成功才返回;

关于可靠性我之前已经有单独的文章《Kafka的可靠性》介绍过了,这里就不赘述了。

重试和超时

这部分配置主要影响的是是否会产生重复数据。

  • retries:send失败后的自动重试次数,默认为0. 当大于0的时候,重试可能导致request乱序。
  • max.in.flight.requests.per.connection:一个连接允许同时发送request的个数,超过这个个数会block。一般配合retris使用:当retries设置大于0时,该参数设置为1,仍然可以保证顺序。kafka 1.1版本之前的默认值是1,1.1及之后版本默认值是5.
  • request.timeout.ms:producer等待request响应的最大时间,如果超过这个时间,就会认为发送失败,会开始retry(如果配置了的话)。
  • delivery.timeout.ms:请求发送的上限时间,从send方法返回开始算起。delivery timeout = time delay prior to sending + time to await ack + time for retries.默认值为120000,即2分钟。
  • retries.backoff.ms:重试间隔

Batch

批量的定时达量配置:

  • batch.size:batch的大小,单位是字节。设置为0表示关闭batch功能。
  • linger.ms: the producer will wait for linger.ms time before sending the record. This allows multiple records to be batched together.

batch request发送到broker的条件是:达到batch.size的大小,或者等待时间超过linger.ms。即所谓的定时或者达量。

Memory Buffer

对Producer内存使用的控制:

  • buffer.memory:用于缓存record的内存总大小,单位是字节。超过以后send会阻塞。
  • max.block.ms:send方法允许阻塞的最长时间,超时就会抛出异常。

Compression

  • compression.type:压缩类型,默认为none,表示不压缩。可选的值有:none, gzip, snappy, lz4 or zstd.

压缩这里有几个注意点:

  • 压缩是针对整个batch压缩的,所以batch越大,压缩效率越高。
  • 一般来说一定要保证producer和broker端的压缩方法一致,不然会出现producer压缩发送到broker之后,broker又解压然后按自己的压缩算法重新压缩(CPU资源会使用比较多)。一般比较好的配置策略是producer压缩,broker端保持(即不配置压缩)。

Kafka 0.11版本引入了两个特殊的Producer:idempotent producertransactional producer,下面分别介绍。

Idempotent Producer

解决什么问题

幂等producer主要是为了解决下面这种重试导致数据重复的问题:

duplicate

使用幂等producer的方法是设置:enable.idempotence = true(默认是false)。这个配置等价于下面配置项的组合:

  • acks = all
  • retries = Integer.MAX_VALUE
  • max.in.flight.requests.per.connection = 1 (0.11 >= Kafka < 1.1) OR 5 (Kafka >= 1.1)

也就是幂等producer是为了提供“exactly once”语义。

如何解决

对于每个session,

  • kafka leader broker会给每个producer分配一个唯一的producer id;
  • 每次send之前,producer会给每个record分配一个单调递增的sequence number;
  • kafka broker会记录每个partition上面每个producer id的最大sequence number,对于某个producer,如果收到小的sequence number,就会丢掉这个record。

idempotent

但是注意:

  • 应用层自己的re-send操作导致的重复幂等producer不能去重,因为这个时候sequence number已经变了
  • 只能保证当前session内的exactly once。

如何使用

以下摘自Kafka 2.8 Kafka Producer Javadoc:

To take advantage of the idempotent producer, it is imperative to avoid application level re-sends since these cannot be de-duplicated. As such, if an application enables idempotence, it is recommended to leave the retries config unset, as it will be defaulted to Integer.MAX_VALUE. Additionally, if a send(ProducerRecord) returns an error even with infinite retries (for instance if the message expires in the buffer before being sent), then it is recommended to shut down the producer and check the contents of the last produced message to ensure that it is not duplicated. Finally, the producer can only guarantee idempotence for messages sent within a single session.

简单概括就是:

  • 应用层(失败时)不要自己重发,内部有自动尝试;如果有错误导致无限重试时,就shutdown程序来定位问题;
  • 设置enable.idempotence = true(此时retries会自动设置为Integer.MAX_VALUE,用户不要修改)
  • 注意幂等只能在一个session范围内提供幂等性

Transactional Producer

如其名,transactional producer能提供事务保证,使用方式很简单,给当前Producer设置以下两个参数:

  • 设置enable.idempotence = true
  • 设置一个transactional.id,这个id在消费同一个partition的producer里面要是唯一的( it should be unique to each producer instance running within a partitioned application)。

编码模式如下:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer  producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

producer.initTransactions();

try {
    producer.beginTransaction();
    for (int i = 0; i < 100; i++)
        producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // We can't recover from these exceptions, so our only option is to close the producer and exit.
    producer.close();
} catch (KafkaException e) {
    // For all other exceptions, just abort the transaction and try again.
    producer.abortTransaction();
}
producer.close();

这种模式大家很熟悉,和数据库的事务使用基本一样,语义也一致,这里就不赘述了。

另外,transactional producer是不依赖于session的,也就是即使session断了,甚至是进程重启都不影响。前面需要设置的那个transactional.id就是为了在出现问题时,能够跨session恢复。
还有一个注意点就是transactional producer的机制其实还是“预写日志(WAL)”,所以即使失败,数据也可能写入日志了。在DB中,这部分数据client肯定是读不到了,但在kafka里面能不能读到,是由客户端的参数isolation.level决定的,这个配置有2个可选值:

  • read_uncommitted:这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。
  • read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。

Transactional Producer提供的也是exactly once的语义保证。

KafkaProducer Class Javadoc

其实Kafka Producer的知识有2个非常好的获取方式都已经在Kafka Producer代码里面了,KafkaProducer类的Javadoc非常详细和全面的讲解了Kafka Producer,其onSend方法则可以看到发送的全流程,想深入了解的,可以从这2个途径获得更详细的信息。下面摘抄了kafka 2.8 KafkaProducer类完整的Javadoc,本文很多内容可以从里面找到出处和说明,完整内容如下(在线地址:Kafka 2.8 KafkaProducer Javadoc):

A Kafka client that publishes records to the Kafka cluster.
The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.
Here is a simple example of using the producer to send records with strings containing sequential numbers as the key/value pairs.

 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("linger.ms", 1);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
​
 Producer  producer = new KafkaProducer<>(props);
 for (int i = 0; i < 100; i++)
   producer.send(new ProducerRecord ("my-topic", Integer.toString(i), Integer.toString(i)));
​
 producer.close();

The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server as well as a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster. Failure to close the producer after use will leak these resources.

The send() method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.

The acks config controls the criteria under which requests are considered complete. The "all" setting we have specified will result in blocking on the full commit of the record, the slowest but most durable setting.

If the request fails, the producer can automatically retry, though since we have specified retries as 0 it won't. Enabling retries also opens up the possibility of duplicates (see the documentation on message delivery semantics for details).

The producer maintains buffers of unsent records for each partition. These buffers are of a size specified by the batch.size config. Making this larger can result in more batching, but requires more memory (since we will generally have one of these buffers for each active partition).

By default a buffer is available to send immediately even if there is additional unused space in the buffer. However if you want to reduce the number of requests you can set linger.ms to something greater than 0. This will instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records will arrive to fill up the same batch. This is analogous to Nagle's algorithm in TCP. For example, in the code snippet above, likely all 100 records would be sent in a single request since we set our linger time to 1 millisecond. However this setting would add 1 millisecond of latency to our request waiting for more records to arrive if we didn't fill up the buffer. Note that records that arrive close together in time will generally batch together even with linger.ms=0 so under heavy load batching will occur regardless of the linger configuration; however setting this to something larger than 0 can lead to fewer, more efficient requests when not under maximal load at the cost of a small amount of latency.

The buffer.memory controls the total amount of memory available to the producer for buffering. If records are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is exhausted additional send calls will block. The threshold for time to block is determined by max.block.ms after which it throws a TimeoutException.

The key.serializer and value.serializer instruct how to turn the key and value objects the user provides with their ProducerRecord into bytes. You can use the included org.apache.kafka.common.serialization.ByteArraySerializer or org.apache.kafka.common.serialization.StringSerializer for simple string or byte types.
From Kafka 0.11, the KafkaProducer supports two additional modes: the idempotent producer and the transactional producer. The idempotent producer strengthens Kafka's delivery semantics from at least once to exactly once delivery. In particular producer retries will no longer introduce duplicates. The transactional producer allows an application to send messages to multiple partitions (and topics!) atomically.

To enable idempotence, the enable.idempotence configuration must be set to true. If set, the retries config will default to Integer.MAX_VALUE and the acks config will default to all. There are no API changes for the idempotent producer, so existing applications will not need to be modified to take advantage of this feature.

To take advantage of the idempotent producer, it is imperative to avoid application level re-sends since these cannot be de-duplicated. As such, if an application enables idempotence, it is recommended to leave the retries config unset, as it will be defaulted to Integer.MAX_VALUE. Additionally, if a send(ProducerRecord) returns an error even with infinite retries (for instance if the message expires in the buffer before being sent), then it is recommended to shut down the producer and check the contents of the last produced message to ensure that it is not duplicated. Finally, the producer can only guarantee idempotence for messages sent within a single session.

To use the transactional producer and the attendant APIs, you must set the transactional.id configuration property. If the transactional.id is set, idempotence is automatically enabled along with the producer configs which idempotence depends on. Further, topics which are included in transactions should be configured for durability. In particular, the replication.factor should be at least 3, and the min.insync.replicas for these topics should be set to 2. Finally, in order for transactional guarantees to be realized from end-to-end, the consumers must be configured to read only committed messages as well.

The purpose of the transactional.id is to enable transaction recovery across multiple sessions of a single producer instance. It would typically be derived from the shard identifier in a partitioned, stateful, application. As such, it should be unique to each producer instance running within a partitioned application.

All the new transactional APIs are blocking and will throw exceptions on failure. The example below illustrates how the new APIs are meant to be used. It is similar to the example above, except that all 100 messages are part of a single transaction.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer  producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
​
producer.initTransactions();
​
try {
    producer.beginTransaction();
    for (int i = 0; i < 100; i++)
        producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // We can't recover from these exceptions, so our only option is to close the producer and exit.
    producer.close();
} catch (KafkaException e) {
    // For all other exceptions, just abort the transaction and try again.
    producer.abortTransaction();
}
producer.close();

As is hinted at in the example, there can be only one open transaction per producer. All messages sent between the beginTransaction() and commitTransaction() calls will be part of a single transaction. When the transactional.id is specified, all messages sent by the producer must be part of a transaction.

The transactional producer uses exceptions to communicate error states. In particular, it is not required to specify callbacks for producer.send() or to call .get() on the returned Future: a KafkaException would be thrown if any of the producer.send() or transactional calls hit an irrecoverable error during a transaction. See the send(ProducerRecord) documentation for more details about detecting errors from a transactional send.

By calling producer.abortTransaction() upon receiving a KafkaException we can ensure that any successful writes are marked as aborted, hence keeping the transactional guarantees.

This client can communicate with brokers that are version 0.10.0 or newer. Older or newer brokers may not support certain client features. For instance, the transactional APIs need broker versions 0.11.0 or later. You will receive an UnsupportedVersionException when invoking an API that is not available in the running broker version.

总结

KafkaConsumer是线程不安全的,所以之前介绍KafkaConsumer的文章重点在offset的控制,特别是多线程消费的时候。但KafkaProducer是线程安全的,所以他的重点和难点不在于多线程,而在于如何保证数据发送时的可靠性、效率、语义保证等。KafkaProducer提供了大量的配置,可以让用户根据自己的场景进行配置。还提供了幂等Producer和事务Producer,这二者目的都是为了提供exactly once语义保证,而且事务Producer还可以提供跨session的事务(当然性能也会比较差)。但是我们需要清楚,这些保证都是有条件的,比如应用层用户自己的重发是无法保证的,当然还有其它情况。所以和上篇文章中提到的一样,如果想实现端到端的exactly once,光靠kafka是不行的,需要全局考虑。

参考

]]>
<![CDATA[Kafka的多线程消费者实现]]> http://niyanchun.com/kafka-multi-thread-consumer.html 2022-01-09T14:18:00+08:00 2022-01-09T14:18:00+08:00 NYC https://niyanchun.com Kafka的消费者类KafkaConsumer是非线程安全的,那如何实现多线程的Consumer呢?先了解一下一般Consumer的流程。

1.jpg

如上图:

  1. 通过poll方法从kafka集群拉取数据;
  2. 处理数据
  3. 提交offset(如果开启了自动提交enable.auto.commit=true,则每次poll的时候会自动提交上一次poll的offset)

如此往复。翻译成代码类似下面这样:

    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
        ...

    // 创建consumer实例(非线程安全)
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    // 订阅主题
    consumer.subscribe(Collections.singletonList("test"));
    while (true) {
      // 拉取数据
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10_000));
      // 处理数据
      for (ConsumerRecord<String, String> record : records) {
        ...
      }
      // 提交offset
      consumer.commitSync();
    }

目前kafka consumer的多线程方案常见的有两种:

  1. Thread per consumer model:即每个线程都有自己的consumer实例,然后在一个线程里面完成数据的获取(poll)、处理(process)、offset提交。
  2. Multi-threaded consumer model:一个线程(也可能是多个)专门用于获取数据,另外一组线程专门用于处理。这种模型没有统一的标准。

下面分别介绍。

Thread-Per-Consumer Model

这种多线程模型是利用Kafka的topic分多个partition的机制来实现并行:每个线程都有自己的consumer实例,负责消费若干个partition。各个线程之间是完全独立的,不涉及任何线程同步和通信,所以实现起来非常简单,使用也最多,像Flink里面用的就是这种模型。比如下面是2个线程消费5个分区的示例图:

2.jpg

用代码实现起来的思路是:先确定线程数,然后将分区数平均分给这些线程。下面是一个示例代码(完整代码见这里 github):

/**
 * @author NiYanchun
 **/
public class ThreadPerConsumerModel {
  public static void main(String[] args) throws InterruptedException {
    Properties config = new Properties();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    final String topic = "test";
    int partitionNum = getPartitionNum(topic);
    final int threadNum = 4;
    int partitionNumPerThread = (partitionNum <= threadNum) ? 1 : partitionNum / threadNum + 1;

    ExecutorService threadPool = Executors.newFixedThreadPool(partitionNum);
    List<TopicPartition> topicPartitions = new ArrayList<>(partitionNumPerThread);
    for (int i = 0; i < partitionNum; i++) {
      topicPartitions.add(new TopicPartition(topic, i));
      if ((i + 1) % partitionNumPerThread == 0) {
        threadPool.submit(new Task(new ArrayList<>(topicPartitions), config));
        topicPartitions.clear();
      }
    }
    if (!topicPartitions.isEmpty()) {
      threadPool.submit(new Task(new ArrayList<>(topicPartitions), config));
    }

    threadPool.shutdown();
    threadPool.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
  }

  static class Task implements Runnable {
    private final List<TopicPartition> topicPartitions;
    private final Properties config;

    public Task(List<TopicPartition> topicPartitions, Properties config) {
      this.topicPartitions = topicPartitions;
      this.config = config;
    }

    @Override
    public void run() {
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);
      System.out.println(topicPartitions);
      consumer.assign(topicPartitions);
      try {
        while (true) {
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10_000));
          for (ConsumerRecord<String, String> record : records) {
            System.out.println(record);
          }
        }
      } catch (Exception e) {
        e.printStackTrace();
      } finally {
        consumer.close();
      }
    }
  }
  
  public static int getPartitionNum(String topic) {
    return 8;
  }
}

上面的代码仅是作为原理说明,所以有些处理都简化或者硬编码了,核心逻辑就是将分区数均分给所有线程,这里不再赘述。

这种模型最大的优点是简单,但并非完美。在这种模型里面,获取数据、处理数据都是在一个线程里面的,如果在处理流程需要耗费很长时间,甚至是不可控的时候就会产生问题。会有什么问题?Kafka有一个参数**max.poll.interval.ms**,它表示的是两次poll调用的最大时间间隔。如果超过这个时间间隔,kafka就会认为这个consumer已经挂了,就会触发consumer group rebalance。这个参数默认值是5分钟,也就是如果某次处理超过5分钟,那就可能导致rebalance。Rebalance有什么问题呢?问题很大,对于Rebalance以后再介绍,这里先简单概括一下:

  1. 引发Consumer Group Rebalance操作主要有3种情况:(1)consumer数量发生变化(2)consumer订阅的主题发生变化(2)主题的分区数发生变化
  2. Rebalance是一个比较重的操作,它需要consumer group的所有活着的consumer参与,当consumer比较多的时候,rebalance会很耗时(比如若干小时);而且在没有完成之前,大家都是干不了活的,体现在业务上就是停止处理了。
  3. Rebalance其实就是重新划分partition,如果是自己提交offset的话,处理不好,就可能产生重复数据,后面再说。

总之,Rebalance操作能避免应该尽可能避免,特别是因为编码不合理产生的应该坚决改掉。为了避免处理时间超过这个最大时间间隔,在仍然使用这种模型的前提下,一般可以通过调整下面的参数一定程度的解决问题:

  1. 控制每次poll的数据不要太大,即修改max.poll.records参数的值,默认是500,即在给定时间内一次最多poll 500个record。
  2. max.poll.interval.ms的值改大一些。

但有些场景通过改上面的2个参数也无法解决,而且可能还挺常见,举个例子,比如消费业务数据,处理后写入到外部存储。如果外部存储挂了,在没有恢复之前一般是不应该继续消费Kafka数据的。此时通过调整max.poll.interval.ms的方法就失效了,因为事先是完全不知道应该设置多少的。

另外一个多线程模型可以解决上面这些问题,但会复杂很多。

Multi-Thread Consumer Model

先看下模型的图(注意:多线程模型的设计方式没有统一标准,下面这种只是其中一种而已):

3.jpg

具体处理流程为:poll线程专门负责拉取数据,然后将数据按partition分组,交给处理线程池,每个线程一次只处理一个分区的数据。数据交给处理线程后,poll继续拉取数据。现在有2个问题:

  1. 如何像Thread-Per-Consumer那样,保证一个分区里面的数据有序
  2. 如何提交offset

数据有序性

要保证partition内数据有序,只要避免多个线程并行处理同一个partition的数据即可。在poll线程给线程池分发数据的时候,已经按partition做了分组,也就是保证了一次拉取的数据中同一个partition的数据只会分配给一个线程。现在只要保证分区数据处理完成之前不再拉取该分区的数据,就可以保证数据的有序了。KafkaConsumer类提供了一个pauseresume方法,参数都是分区信息的集合:

public void pause(Collection<TopicPartition> partitions)
public void resume(Collection<TopicPartition> partitions)

调用pause方法后,后续poll的时候,这些被pause的分区不会再返回任何数据,直到被resume。所以,可以在本次partition的数据处理完成之前,pause该partition。

Offset提交

很显然,poll线程和处理线程解耦异步以后,就不能使用自动提交offset了,必须手动提交,否则可能offset提交了,但数据其实还没有处理。提交是poll线程做的,而offset的值则是处理线程才知道的,两者之前需要一个信息传递机制。代码实现的时候需要考虑。

这里插一个关于offset提交的话题。我们知道有2种方式提交offset:

  • 自动提交:将enable.auto.commit设置为true即可;每次poll的时候会自动提交上一次的offset。
  • 手动提交:分为同步提交commitSync和异步提交commitAsync。同步就是阻塞式提交,异步就是非阻塞式。而且二者都支持指定具体partition的offset,也就是可以精细化的自定义offset。

这里不展开介绍具体细节,只讨论一个问题:offset提交和消息传递语义(Message Delivery Semantics)的关系。Kafka提供了3种消息传递保证:

  • at most once:最多一次,即不会产生重复数据,但可能会丢数据
  • at least once:至少一次,即可能会产生重复数据,但不会丢数据
  • exactly once:准确的一次,不多也不少

首先自动提交offset提供的是at least once的保证,因为他是在poll的时候提交上次数据的offset,也就是如果处理本次poll拉取的数据的时候异常了,导致没有执行到下次poll,那这次这些数据的offset就无法提交了,但数据可能已经处理了一部分了。要实现最多一次也很简单,每次poll完数据,先提交offset,提交成功之后再开始处理即可。而要实现exactly once很难,而且这里的exactly once其实仅指消费,但我们一般想要的是全系统数据链上的exactly once。这个问题比较复杂,要实现真正的端到端exactly once,可以去查查flink、dataflow的设计,需要有很多假设和权衡,这里就不展开了。

重点说明一种情况,考虑这样一个流程:poll数据,处理数据,手动同步提交offset,然后再poll。当然也可能会有变种,就是边处理数据,边提交offset,但实质一样。这种情况会不会产生数据重复?答案是:会。其实不管你用同步提交,还是异步提交,都可能产生数据重复。因为系统随时可能因为其它原因产生Rebalance。所以要真正避免重复数据,正确的方式应该是实现ConsumerRebalanceListener接口,监听Rebalance的发生。当监听到发生Rebalance导致当前consumer的partition发生变化时,同步提交offset。但即使这样,其实也不能严格避免,因为系统还可能在任意阶段挂掉。因此这种流程实际提供的也是at least once的保证。所以这里想要表达的意思是,如果你的系统不能容忍有重复数据(在不丢数据的前提下),光靠kafka是做不到的,这里通过一些编码优化只能减少发生的概率,无法杜绝。终极方案应该是在应用层解决。比如幂等设计、使用数据的时候去重等。

代码实现

代码实现分为poll线程的实现(PollThread.java)、处理线程的实现(ProcessThread.java)、测试类Main.java。完整的代码见github。下面对关键地方进行说明。

先看主流程:

// PollThread.java    
@Override
public void run() {
    try {
        // 主流程
        while (!stopped.get()) {
            // 消费数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10_000));
            // 按partition分组,分发数据给处理线程池
            handleFetchedRecords(records);
            // 检查正在处理的线程
            checkActiveTasks();
            // 提交offset
            commitOffsets();
        }

    } catch (WakeupException we) {
        if (!stopped.get()) {
            throw we;
        }
    } finally {
        consumer.close();
    }
}

主流程比较清晰:

  1. poll数据
  2. 按partition分组,分发数据给处理线程池
  3. 检查正在处理的线程
  4. 提交offset

poll数据没什么可说的,看下handleFetchedRecords

// PollThread.java
private void handleFetchedRecords(ConsumerRecords<String, String> records) {
    if (records.count() > 0) {
        List<TopicPartition> partitionsToPause = new ArrayList<>();
        // 按partition分组
        records.partitions().forEach(partition -> {
            List<ConsumerRecord<String, String>> consumerRecords = records.records(partition);
            // 提交一个分区的数据给处理线程池
            ProcessThread processThread = new ProcessThread(consumerRecords);
            processThreadPool.submit(processThread);
            // 记录分区与处理线程的关系,方便后面查询处理状态
            activeTasks.put(partition, processThread);
        });
        // pause已经在处理的分区,避免同个分区的数据被多个线程同时消费,从而保证分区内数据有序处理
        consumer.pause(partitionsToPause);
    }
}

该方法里面主要做了下面几件事情:

  1. 按partition将数据分组,然后每个分组交给线程池去处理
  2. 记录分区和处理线程的关系,因为后面要查询处理的状态
  3. 将正在处理的分区pause,即这些分区在后续poll中不会返回数据,直到被resume。原因前面说了,如果继续消费,可能会有多个线程同时处理一个分区来的数据,导致分区内数据的处理顺序无法保证。

接下来,先看下处理线程的实现,代码量不大,就全贴上了:

@Slf4j
public class ProcessThread implements Runnable {
  private final List<ConsumerRecord<String, String>> records;
  private final AtomicLong currentOffset = new AtomicLong();
  private volatile boolean stopped = false;
  private volatile boolean started = false;
  private volatile boolean finished = false;
  private final CompletableFuture<Long> completion = new CompletableFuture<>();
  private final ReentrantLock startStopLock = new ReentrantLock();

  public ProcessThread(List<ConsumerRecord<String, String>> records) {
    this.records = records;
  }

  @Override
  public void run() {
    startStopLock.lock();
    try {
      if (stopped) {
        return;
      }
      started = true;
    } finally {
      startStopLock.unlock();
    }

    for (ConsumerRecord<String, String> record : records) {
      if (stopped) {
        break;
      }
      // process record here and make sure you catch all exceptions;
      currentOffset.set(record.offset() + 1);
    }
    finished = true;
    completion.complete(currentOffset.get());
  }

  public long getCurrentOffset() {
    return currentOffset.get();
  }

  public void stop() {
    startStopLock.lock();
    try {
      this.stopped = true;
      if (!started) {
        finished = true;
        completion.complete(currentOffset.get());
      }
    } finally {
      startStopLock.unlock();
    }
  }

  public long waitForCompletion() {
    try {
      return completion.get();
    } catch (InterruptedException | ExecutionException e) {
      return -1;
    }
  }

  public boolean isFinished() {
    return finished;
  }
}

代码逻辑比较简单,很多设计都是在实现“如何优雅停止处理线程”的情况,这里简单说明一下。不考虑进程停止这种情况的话,其实需要停止处理线程的一个主要原因就是可能发生了rebalance,这个分区给其它consumer了。这个时候当前consumer处理线程优雅的停止方式就是完成目前正在处理的那个record,然后提交offset,然后停止。注意是完成当前正在处理的record,而不是完成本次分给该线程的所有records,因为处理完所有的records可能需要的时间会比较长。前面在offset提交的时候已经提到过,手动提交offset的时候,要想处理好rebalance这种情况,需要实现ConsumerRebalanceListener接口。这里PollThread实现了该接口:

@Slf4j
public class PollThread implements Runnable, ConsumerRebalanceListener {
  private final KafkaConsumer<String, String> consumer;
  private final Map<TopicPartition, ProcessThread> activeTasks = new HashMap<>();
  private final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
    
  // 省略其它代码
    
   @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // 1. stop all tasks handling records from revoked partitions
    Map<TopicPartition, ProcessThread> stoppedTask = new HashMap<>();
    for (TopicPartition partition : partitions) {
      ProcessThread processThread = activeTasks.remove(partition);
      if (processThread != null) {
        processThread.stop();
        stoppedTask.put(partition, processThread);
      }
    }

    // 2. wait for stopped task to complete processing of current record
    stoppedTask.forEach((partition, processThread) -> {
      long offset = processThread.waitForCompletion();
      if (offset > 0) {
        offsetsToCommit.put(partition, new OffsetAndMetadata(offset));
      }
    });

    // 3. collect offsets for revoked partitions
    Map<TopicPartition, OffsetAndMetadata> revokedPartitionOffsets = new HashMap<>();
    for (TopicPartition partition : partitions) {
      OffsetAndMetadata offsetAndMetadata = offsetsToCommit.remove(partition);
      if (offsetAndMetadata != null) {
        revokedPartitionOffsets.put(partition, offsetAndMetadata);
      }
    }

    // 4. commit offsets for revoked partitions
    try {
      consumer.commitSync(revokedPartitionOffsets);
    } catch (Exception e) {
      log.warn("Failed to commit offsets for revoked partitions!", e);
    }
  }

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // 如果分区之前没有pause过,那执行resume就不会有什么效果
    consumer.resume(partitions);
  }   
    
}

可以看到:

  • 当检测到有分区分配给当前consumer的时候,就会尝试resume这个分区,因为这个分区可能之前被其它consumer pause过。
  • 当检测到有分区要被回收时,执行了下面几个操作:

    1. 查看activeTasks,看是否有线程正在处理这些分区的数据,有的话调用ProcessThreadstop方法将这些处理线程的stopped标志位设置成true。同时记录找到的这些线程。
    2. ProcessThreadrun方法里面每次循环处理record的时候都会检测上一步置位的这个stopped标志位,从而实现完成当前处理的record后就停止的逻辑。然后这一步就是等待这些线程处理结束,拿到处理的offset值,放到待提交offset队列offsetsToCommit里面。
    3. 从待提交队列里面找到要被回收的分区的offset,放到revokedPartitionOffsets里面。
    4. 提交revokedPartitionOffsets里面的offset。

简单总结的话,当收到有某些partition要从当前consumer回收的消息的时候,就从处理线程里面找到正在处理这些分区的线程,然后通知它们处理完正在处理的那一个record之后就退出吧。当然退出的时候需要返回已处理数据的offset,然后PollThread线程提交这些分区的offset。

然后继续回到主流程,之前讨论到了handleFetchedRecords,主要是按partition分区,并将分区数据分发给处理线程,并将这些partition pause。然后看接下来checkActiveTasks

private void checkActiveTasks() {
    List<TopicPartition> finishedTasksPartitions = new ArrayList<>();
    activeTasks.forEach((partition, processThread) -> {
        if (processThread.isFinished()) {
            finishedTasksPartitions.add(partition);
        }
        long offset = processThread.getCurrentOffset();
        if (offset > 0) {
            offsetsToCommit.put(partition, new OffsetAndMetadata(offset));
        }
    });

    finishedTasksPartitions.forEach(activeTasks::remove);
    consumer.resume(finishedTasksPartitions);
}

该方法的主要作用是:找到已经处理完成的partition,并获取对应的offset,放到待提交队列里面。同时resume这些partition。

最后就是commitOffsets了:

private void commitOffsets() {
    try {
        long currentMillis = System.currentTimeMillis();
        if (currentMillis - lastCommitTime > 5000) {
            if (!offsetsToCommit.isEmpty()) {
                consumer.commitSync(offsetsToCommit);
                offsetsToCommit.clear();
            }
            lastCommitTime = currentMillis;
        }
    } catch (Exception e) {
        log.error("Failed to commit offsets!", e);
    }
}

这个方法比较简单,就是按一定的时间间隔同步提交待提交队列里面的offset。

至此,该模型的整体流程就结束了。其实这个模型还隐式的实现了限流:如果后端线程都在处理了,虽然poll线程还会继续poll,但这些partition因为被pause了,所以不会真正返回数据了。这个功能在将poll和处理解耦,分到不同线程以后是很重要的。如果没有限流且后面处理比较慢的话,如果限制了poll和线程池之间传递records的队列大小,那poll的数据要么丢掉,要么就等待,等待的话又会碰到之前可能会超max.poll.interval.ms的问题。但如果不限制的话,队列就会一直变大,最后OOM。

总结

从提供的功能来看,两种多线程模型都实现了基本的功能:

  1. 多线程处理,提高并发
  2. 提供partition内数据有序保证
  3. 提供 at least once语义保证

但第二种模型可以更好的应对处理流程比较慢的需求场景,之所以要处理这种场景根本原因其实还是kafka的max.poll.interval.ms机制,也就是我们不能无限期的阻塞poll调用。另外,如果不需要提供partition内数据有序的话,可以对模型进行改造,改成纯基于数据拆分的多线程模式。目前的实现其实和Thread-Per-Consumer一样,都是基于partition的拆分、并发,只不过将流程从同步改成了异步而已。

总体而言,两种模型各有利弊,但如果Thread-Per-Consumer模型能满足需求的话,肯定是应该优先使用的,简单明了。毕竟在软件世界,在满足需求的前提下,系统越简单越好。如无必要,勿增实体。

参考:

]]>
<![CDATA[Kafka的存储]]> http://niyanchun.com/kafka-storage.html 2020-10-31T15:07:00+08:00 2020-10-31T15:07:00+08:00 NYC https://niyanchun.com Kafka的存储层级概念上比较简单,一个topic分为若干partition,一个partition再分为若干segment。下图是一个示例:

topic是个逻辑概念,partition和segment则是真实存储数据的:一个partition对应磁盘上面一个目录,一个segment对应partition目录下的一个日志文件,消息数据就是以append-only的方式顺序写入segment文件

看个例子,创建一个有2个分区,1个副本的topic:

➜ bin/kafka-topics.sh \                                                             
--zookeeper localhost:2181/kafka_26 \
--create \
--topic test \  
--partitions 2  \
--replication-factor 1 \
--config segment.bytes=1024    # 这个选项后文会介绍

创建完之后,看下数据目录:

# 创建topic之前的数据目录(该集群还没有topic)
➜  kafka_2.13-2.6.0 ll data 
total 4.0K
-rw-r--r--. 1 root root  0 Oct 31 17:43 cleaner-offset-checkpoint
-rw-r--r--. 1 root root  0 Oct 31 17:43 log-start-offset-checkpoint
-rw-r--r--. 1 root root 90 Oct 31 17:43 meta.properties
-rw-r--r--. 1 root root  0 Oct 31 17:43 recovery-point-offset-checkpoint
-rw-r--r--. 1 root root  0 Oct 31 17:43 replication-offset-checkpoint

# 创建后
➜ ll -R data/
data:
total 8.0K
-rw-r--r--. 1 root root   0 Oct 31 17:43 cleaner-offset-checkpoint
-rw-r--r--. 1 root root   0 Oct 31 17:43 log-start-offset-checkpoint
-rw-r--r--. 1 root root  90 Oct 31 17:43 meta.properties
-rw-r--r--. 1 root root   0 Oct 31 17:43 recovery-point-offset-checkpoint
-rw-r--r--. 1 root root  22 Oct 31 17:47 replication-offset-checkpoint
drwxr-xr-x. 2 root root 141 Oct 31 17:47 test-0
drwxr-xr-x. 2 root root 141 Oct 31 17:47 test-1

# 目录内容
data/test-0:
total 21M
-rw-r--r--. 1 root root 10M Oct 31 17:47 00000000000000000000.index
-rw-r--r--. 1 root root   0 Oct 31 17:47 00000000000000000000.log
-rw-r--r--. 1 root root 10M Oct 31 17:47 00000000000000000000.timeindex
-rw-r--r--. 1 root root   8 Oct 31 17:47 leader-epoch-checkpoint

data/test-1:
total 21M
-rw-r--r--. 1 root root 10M Oct 31 17:47 00000000000000000000.index
-rw-r--r--. 1 root root   0 Oct 31 17:47 00000000000000000000.log
-rw-r--r--. 1 root root 10M Oct 31 17:47 00000000000000000000.timeindex
-rw-r--r--. 1 root root   8 Oct 31 17:47 leader-epoch-checkpoint

可以看到,创建test这个topic之后,多出来了两个目录,test-0和test-1,这两个就是partition对应的物理目录。Partition的目录格式为:{topic名}-{序号},序号从0开始。Partition目录的文件格式是一样的,初始状态有四个文件:

重点要说的是00000000000000000000.log这个文件,这个文件就是顺序写消息数据的,关于这个文件有下面几个关键点:

  • 这样1个文件就是1个segment;
  • segment文件是二进制编码的,人眼无法读,但可以借助工具查看内容;
  • segment文件会根据大小滚动产生新的;
  • 前面的数字(即文件名)是当前segment中第1个消息的起始offset。每个partition都有自己的offset,从0开始,每写入一条消息,就加1。

结合一些具体例子看一下。首先看下segment文件的构成,写入两条数据:

# 写入两条数据
➜ bin/kafka-console-producer.sh --broker-list localhost:9092  --topic test   
>hello world
>bye world
# 查看数据目录,可以看到数据分别写到了两个partition下面。Kafka会自己均衡负载
➜  kafka_2.13-2.6.0 ll -R data/test*                        
data/test-0:
total 21M
-rw-r--r--. 1 root root 10M Oct 31 17:47 00000000000000000000.index
-rw-r--r--. 1 root root  79 Oct 31 17:48 00000000000000000000.log    # 有数据了
-rw-r--r--. 1 root root 10M Oct 31 17:47 00000000000000000000.timeindex
-rw-r--r--. 1 root root   8 Oct 31 17:47 leader-epoch-checkpoint

data/test-1:
total 21M
-rw-r--r--. 1 root root 10M Oct 31 17:47 00000000000000000000.index
-rw-r--r--. 1 root root  77 Oct 31 17:48 00000000000000000000.log # 有数据了
-rw-r--r--. 1 root root 10M Oct 31 17:47 00000000000000000000.timeindex
-rw-r--r--. 1 root root   8 Oct 31 17:47 leader-epoch-checkpoint

# 直接查看segment文件
➜  kafka_2.13-2.6.0 cat data/test-0/00000000000000000000.log 
C;��ux�aZux�aZ��������������"hello world#                                                                                                 
root@NYC-DEV ➜  kafka_2.13-2.6.0 cat data/test-1/00000000000000000000.log
AM,��ux�reux�re��������������bye world# 

从上面可以看到,直接查看segment文件有乱码,因为是二进制的,但消息内容还是能够看出来的(如果我们开启了压缩的话,消息部分也是无法看的)。Kafka提供了查看segment的工具kafka-run-class.sh:

➜ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files data/test-0/00000000000000000000.log
Dumping data/test-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1604051296602 size: 79 magic: 2 compresscodec: NONE crc: 1003684042 isvalid: true | offset: 0 CreateTime: 1604051296602 keysize: -1 valuesize: 11 sequence: -1 headerKeys: [] payload: hello world

➜ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files data/test-1/00000000000000000000.log
Dumping data/test-1/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1604051300965 size: 77 magic: 2 compresscodec: NONE crc: 1294785255 isvalid: true | offset: 0 CreateTime: 1604051300965 keysize: -1 valuesize: 9 sequence: -1 headerKeys: [] payload: bye world

可以看到除了数据外,还有很多元数据信息。然后接着多写入一些数据,观察一下segment的切换。默认1个segment文件达到1GB(server.properties中的log.segment.bytes=1073741824)以后才会切换新的,为了演示方便,创建topic的时候我把这个配置设置成了1024(字节),也就是超过1KB就会切换,可以通过命令看下现在这个topic的配置:

➜ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name test --describe
Dynamic configs for topic test are:
  segment.bytes=1024 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:segment.bytes=1024, DEFAULT_CONFIG:log.segment.bytes=1073741824}

写入较多数据后,查看数据目录:

➜  kafka_2.13-2.6.0 ll data/test*
data/test-0:
total 21M
-rw-r--r--. 1 root root   0 Oct 31 18:09 00000000000000000000.index
-rw-r--r--. 1 root root 879 Oct 31 18:09 00000000000000000000.log
-rw-r--r--. 1 root root  12 Oct 31 18:09 00000000000000000000.timeindex
-rw-r--r--. 1 root root   0 Oct 31 18:10 00000000000000000005.index
-rw-r--r--. 1 root root 935 Oct 31 18:10 00000000000000000005.log
-rw-r--r--. 1 root root  10 Oct 31 18:09 00000000000000000005.snapshot
-rw-r--r--. 1 root root  12 Oct 31 18:10 00000000000000000005.timeindex
-rw-r--r--. 1 root root 10M Oct 31 18:10 00000000000000000013.index
-rw-r--r--. 1 root root 315 Oct 31 18:10 00000000000000000013.log
-rw-r--r--. 1 root root  10 Oct 31 18:10 00000000000000000013.snapshot
-rw-r--r--. 1 root root 10M Oct 31 18:10 00000000000000000013.timeindex
-rw-r--r--. 1 root root   8 Oct 31 18:05 leader-epoch-checkpoint

data/test-1:
total 21M
-rw-r--r--. 1 root root   0 Oct 31 18:09 00000000000000000000.index
-rw-r--r--. 1 root root 877 Oct 31 18:09 00000000000000000000.log
-rw-r--r--. 1 root root  12 Oct 31 18:09 00000000000000000000.timeindex
-rw-r--r--. 1 root root   0 Oct 31 18:10 00000000000000000005.index
-rw-r--r--. 1 root root 936 Oct 31 18:10 00000000000000000005.log
-rw-r--r--. 1 root root  10 Oct 31 18:09 00000000000000000005.snapshot
-rw-r--r--. 1 root root  12 Oct 31 18:10 00000000000000000005.timeindex
-rw-r--r--. 1 root root 10M Oct 31 18:10 00000000000000000013.index
-rw-r--r--. 1 root root 352 Oct 31 18:10 00000000000000000013.log
-rw-r--r--. 1 root root  10 Oct 31 18:10 00000000000000000013.snapshot
-rw-r--r--. 1 root root 10M Oct 31 18:10 00000000000000000013.timeindex
-rw-r--r--. 1 root root   8 Oct 31 18:05 leader-epoch-checkpoint

可以看到,已经多出了两个segment:00000000000000000005.log00000000000000000013.log。前面说segment文件名里面的序号是当前segment里面第一个消息的offset。这里以00000000000000000005.log为例验证一下:

# 00000000000000000005.log文件
➜ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files data/test-0/00000000000000000005.log
Dumping data/test-0/00000000000000000005.log
Starting offset: 5
baseOffset: 5 lastOffset: 5 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1604052567547 size: 200 magic: 2 compresscodec: NONE crc: 1854811761 isvalid: true | offset: 5 CreateTime: 1604052567547 keysize: -1 valuesize: 130 sequence: -1 headerKeys: [] payload: a long long long long long long long long long long long long long long long long long long long long long long long long string 9
baseOffset: 6 lastOffset: 6 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 200 CreateTime: 1604052614272 size: 105 magic: 2 compresscodec: NONE crc: 4253105822 isvalid: true | offset: 6 CreateTime: 1604052614272 keysize: -1 valuesize: 37 sequence: -1 headerKeys: [] payload: a long long long long long  string 12
baseOffset: 7 lastOffset: 7 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 305 CreateTime: 1604052625276 size: 105 magic: 2 compresscodec: NONE crc: 1470066054 isvalid: true | offset: 7 CreateTime: 1604052625276 keysize: -1 valuesize: 37 sequence: -1 headerKeys: [] payload: a long long long long long  string 14
baseOffset: 8 lastOffset: 8 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 410 CreateTime: 1604052628456 size: 105 magic: 2 compresscodec: NONE crc: 992483528 isvalid: true | offset: 8 CreateTime: 1604052628456 keysize: -1 valuesize: 37 sequence: -1 headerKeys: [] payload: a long long long long long  string 16
baseOffset: 9 lastOffset: 9 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 515 CreateTime: 1604052631303 size: 105 magic: 2 compresscodec: NONE crc: 799335592 isvalid: true | offset: 9 CreateTime: 1604052631303 keysize: -1 valuesize: 37 sequence: -1 headerKeys: [] payload: a long long long long long  string 18
baseOffset: 10 lastOffset: 10 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 620 CreateTime: 1604052634136 size: 105 magic: 2 compresscodec: NONE crc: 2893899522 isvalid: true | offset: 10 CreateTime: 1604052634136 keysize: -1 valuesize: 37 sequence: -1 headerKeys: [] payload: a long long long long long  string 20
baseOffset: 11 lastOffset: 11 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 725 CreateTime: 1604052636553 size: 105 magic: 2 compresscodec: NONE crc: 901773285 isvalid: true | offset: 11 CreateTime: 1604052636553 keysize: -1 valuesize: 37 sequence: -1 headerKeys: [] payload: a long long long long long  string 22
baseOffset: 12 lastOffset: 12 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 830 CreateTime: 1604052639396 size: 105 magic: 2 compresscodec: NONE crc: 3365784921 isvalid: true | offset: 12 CreateTime: 1604052639396 keysize: -1 valuesize: 37 sequence: -1 headerKeys: [] payload: a long long long long long  string 24

可以看到该文件的第一个offset是5,最后一个offset为12,所以当前segment文件是00000000000000000005.log;下个文件的第一个offset就是13,也就是后面的00000000000000000013.log文件。

kafka的log文件都是以appen-only的方式写的,无法修改。所以把partition分成很多个大小可控的segment会带来很多好处,比如数据的滚动、删除、查找等涉及物理层的操作底层都是以segment为单位的,这个也很容易理解。但是即使拆分后,对于1GB(默认)的文件进行高效搜索也是比较麻烦的,而指定offset进行消费又是使用非常普遍的情况。为了解决这个问题,kafka为每个segment创建一个索引文件,就是那个.index后缀的文件,该文件会存储每个offset在segment文件中的实际物理偏移量。这样当指定消费某个offset的数据时,可以先通过segment的文件名找到该offset对应的segment文件,然后再通过index文件直接定位到数据在该segment中的物理位置。相当于通过segment文件名和index文件给所有数据构造了一个二级索引,给定任何offset,通过两步即可定位到实际存储的位置。

最后就是数据删除,删除是以topic为单位,删除时会先标记,然后定期真正物理删除。比如下面的索引已经被标记为删除:

➜ ll data/test* 
total 20K
drwxr-xr-x. 2 root root 141 Oct 31 16:33 test-0.a897c64b91b8419d86426c1817fcf650-delete
drwxr-xr-x. 2 root root 141 Oct 31 16:33 test-1.0767a8ede4314439accbf67039089804-delete

最后总结一下,

  1. Kafka的topic逻辑上包含了若干个partition,每个partition对应磁盘上一个目录;写数据的时候就是以append-only的方式在partition目录写log文件。
  2. 为了防止文件过大不利于搜索和管理,又按照大小(默认1G)进行滚动,生成多个文件,每个文件称为一个segment。segment的文件名就是该文件里面第1个offset的值。
  3. 为了高效的根据offset进行消费,给每个segment文件生成了一个索引文件,索引里面记录了该segment里面每个offset对应的实际偏移量。
]]>
<![CDATA[Kafka的扩容和缩容]]> http://niyanchun.com/expand-or-decommissioning-kafka-cluster.html 2020-10-29T00:09:00+08:00 2020-10-29T00:09:00+08:00 NYC https://niyanchun.com 本文讨论Kafka的扩缩容以及故障后如何“补齐”分区,本文的操作基于Kafka 2.6版本。

扩容

扩容也就是新增节点,扩容后老的数据不会自动迁移,只有新创建的topic才可能会分配到新增的节点上面。如果我们不需要迁移旧数据,那直接把新的节点启动起来就行了,不需要做额外的操作。但有的时候,新增节点后,我们会将一些老数据迁移到新的节点上,以达到负载均衡的目的,这个时候就需要手动操作了。Kafka提供了一个脚本(在bin目录下):kafka-reassign-partitions.sh,通过这个脚本可以重新分配分区的分布。脚本的使用比较简单,提供一个JSON格式的分配方案,然后传给脚本,脚本根据我们的分配方案重新进行平衡。

举个例子,假如现在集群有181、182两个broker,上面有4个topic:test-1,test-2,test-3,test-4,这些topic都有4个分区,2个副本,如下:

# 两个broker
[zk: localhost:2181(CONNECTED) 0] ls /kafka_26/brokers/ids
[181, 182]

# 4个topic
➜ bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka_26
__consumer_offsets
test-1
test-1
test-3
test-4
# test-1
➜  bin/kafka-topics.sh --describe --topic test-1  --zookeeper localhost:2181/kafka_26
Topic: test-1   PartitionCount: 4       ReplicationFactor: 2    Configs: 
        Topic: test-1   Partition: 0    Leader: 181     Replicas: 181,182       Isr: 181,182
        Topic: test-1   Partition: 1    Leader: 182     Replicas: 182,181       Isr: 182,181
        Topic: test-1   Partition: 2    Leader: 181     Replicas: 181,182       Isr: 181,182
        Topic: test-1   Partition: 3    Leader: 182     Replicas: 182,181       Isr: 182,181
# test-2
➜  bin/kafka-topics.sh --describe --topic test-2  --zookeeper localhost:2181/kafka_26
Topic: test-2   PartitionCount: 4       ReplicationFactor: 2    Configs: 
        Topic: test-2   Partition: 0    Leader: 181     Replicas: 181,182       Isr: 181,182
        Topic: test-2   Partition: 1    Leader: 182     Replicas: 182,181       Isr: 182,181
        Topic: test-2   Partition: 2    Leader: 181     Replicas: 181,182       Isr: 181,182
        Topic: test-2   Partition: 3    Leader: 182     Replicas: 182,181       Isr: 182,181
# test-3
➜  bin/kafka-topics.sh --describe --topic test-3  --zookeeper localhost:2181/kafka_26
Topic: test-3   PartitionCount: 4       ReplicationFactor: 2    Configs: 
        Topic: test-3   Partition: 0    Leader: 181     Replicas: 181,182       Isr: 181,182
        Topic: test-3   Partition: 1    Leader: 182     Replicas: 182,181       Isr: 182,181
        Topic: test-3   Partition: 2    Leader: 181     Replicas: 181,182       Isr: 181,182
        Topic: test-3   Partition: 3    Leader: 182     Replicas: 182,181       Isr: 182,181
# test-4
➜ bin/kafka-topics.sh --describe --topic test-4  --zookeeper localhost:2181/kafka_26
Topic: test-4   PartitionCount: 4       ReplicationFactor: 2    Configs: 
        Topic: test-4   Partition: 0    Leader: 182     Replicas: 182,181       Isr: 182,181
        Topic: test-4   Partition: 1    Leader: 181     Replicas: 181,182       Isr: 181,182
        Topic: test-4   Partition: 2    Leader: 182     Replicas: 182,181       Isr: 182,181
        Topic: test-4   Partition: 3    Leader: 181     Replicas: 181,182       Isr: 181,182

现在扩容了,新增了两个节点:183和184。扩容后,我们想要把test-3,test-4迁移到183,184上面去。

首先我们可以准备如下JSON格式的文件(假设文件名为topics-to-move.json):

{
    "topics": [
        {
            "topic": "test-3"
        },
        {
            "topic": "test-4"
        }
    ],
    "version": 1
}

里面写明想要重新分配的topic。然后执行如下命令:

➜ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-move.json --broker-list "183,184" --generate
# 当前分区的分布情况
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test-3","partition":0,"replicas":[181,182],"log_dirs":["any","any"]},{"topic":"test-3","partition":1,"replicas":[182,181],"log_dirs":["any","any"]},{"topic":"test-3","partition":2,"replicas":[181,182],"log_dirs":["any","any"]},{"topic":"test-3","partition":3,"replicas":[182,181],"log_dirs":["any","any"]},{"topic":"test-4","partition":0,"replicas":[182,181],"log_dirs":["any","any"]},{"topic":"test-4","partition":1,"replicas":[181,182],"log_dirs":["any","any"]},{"topic":"test-4","partition":2,"replicas":[182,181],"log_dirs":["any","any"]},{"topic":"test-4","partition":3,"replicas":[181,182],"log_dirs":["any","any"]}]}
# 建议的分区分布情况
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test-3","partition":0,"replicas":[183,184],"log_dirs":["any","any"]},{"topic":"test-3","partition":1,"replicas":[184,183],"log_dirs":["any","any"]},{"topic":"test-3","partition":2,"replicas":[183,184],"log_dirs":["any","any"]},{"topic":"test-3","partition":3,"replicas":[184,183],"log_dirs":["any","any"]},{"topic":"test-4","partition":0,"replicas":[184,183],"log_dirs":["any","any"]},{"topic":"test-4","partition":1,"replicas":[183,184],"log_dirs":["any","any"]},{"topic":"test-4","partition":2,"replicas":[184,183],"log_dirs":["any","any"]},{"topic":"test-4","partition":3,"replicas":[183,184],"log_dirs":["any","any"]}]}

可以看到上面的命令会列出当前分区的分布情况,并且会给出一个建议的新分区分配方案,都是JSON格式的,内容也很简单。然后我们将建议的分配方案保存为一个文件(假设文件名为expand-cluster-reassignment.json),当然我们也可以手动修改这个方案,只要格式正确即可。然后执行下面命令使用新的方案进行分区重分配:

➜ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"test-3","partition":0,"replicas":[181,182],"log_dirs":["any","any"]},{"topic":"test-3","partition":1,"replicas":[182,181],"log_dirs":["any","any"]},{"topic":"test-3","partition":2,"replicas":[181,182],"log_dirs":["any","any"]},{"topic":"test-3","partition":3,"replicas":[182,181],"log_dirs":["any","any"]},{"topic":"test-4","partition":0,"replicas":[182,181],"log_dirs":["any","any"]},{"topic":"test-4","partition":1,"replicas":[181,182],"log_dirs":["any","any"]},{"topic":"test-4","partition":2,"replicas":[182,181],"log_dirs":["any","any"]},{"topic":"test-4","partition":3,"replicas":[181,182],"log_dirs":["any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for test-3-0,test-3-1,test-3-2,test-3-3,test-4-0,test-4-1,test-4-2,test-4-3

这样就提交了重分配的任务,可以使用下面的命令查看任务的执行状态:

➜ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition test-3-0 is complete.
Reassignment of partition test-3-1 is complete.
Reassignment of partition test-3-2 is complete.
Reassignment of partition test-3-3 is complete.
Reassignment of partition test-4-0 is complete.
Reassignment of partition test-4-1 is complete.
Reassignment of partition test-4-2 is complete.
Reassignment of partition test-4-3 is complete.

Clearing broker-level throttles on brokers 181,182,183,184
Clearing topic-level throttles on topics test-3,test-4

完成后,我们检查一下新的test-3和test-4的分区分配情况:

➜ bin/kafka-topics.sh --describe --topic test-3  --zookeeper localhost:2181/kafka_26
Topic: test-3   PartitionCount: 4       ReplicationFactor: 2    Configs: 
        Topic: test-3   Partition: 0    Leader: 183     Replicas: 183,184       Isr: 183,184
        Topic: test-3   Partition: 1    Leader: 184     Replicas: 184,183       Isr: 183,184
        Topic: test-3   Partition: 2    Leader: 183     Replicas: 183,184       Isr: 183,184
        Topic: test-3   Partition: 3    Leader: 184     Replicas: 184,183       Isr: 183,184
        
➜ bin/kafka-topics.sh --describe --topic test-4  --zookeeper localhost:2181/kafka_26
Topic: test-4   PartitionCount: 4       ReplicationFactor: 2    Configs: 
        Topic: test-4   Partition: 0    Leader: 184     Replicas: 184,183       Isr: 183,184
        Topic: test-4   Partition: 1    Leader: 183     Replicas: 183,184       Isr: 183,184
        Topic: test-4   Partition: 2    Leader: 184     Replicas: 184,183       Isr: 183,184
        Topic: test-4   Partition: 3    Leader: 183     Replicas: 183,184       Isr: 184,183

可以看到,这两个topic的数据已经全部分配到183和184节点上了。

缩容和故障场景

从上面可以看到,其实数据分配完全是由我们自己把控的,缩容也只是数据迁移而已,只需要提供正确的迁移方案即可。一般生产环境很少有缩容的,但有一个场景比较常见,就是某个节点故障了,且无法恢复。以前的文章提到过,节点故障后,这个节点上的分区就丢了,Kafka不会自动在其它可用节点上重新创建一个副本,这个时候就需要我们自己手动在其他可用节点创建副本,原理和扩容是一样的。接着上面的例子,比如现在184节点故障了,且无法恢复了,而test-3和test-4有部分分区是在该节点上面的,自然也就丢了:

# 节点挂了,zk中的节点已经没了
[zk: localhost:2181(CONNECTED) 15] ls /kafka_26/brokers/ids
[181, 182, 183]

# 可以看到ISR中已经没有184了
➜ bin/kafka-topics.sh --describe --topic test-3  --zookeeper localhost:2181/kafka_26
Topic: test-3   PartitionCount: 4       ReplicationFactor: 2    Configs: 
        Topic: test-3   Partition: 0    Leader: 183     Replicas: 183,184       Isr: 183
        Topic: test-3   Partition: 1    Leader: 183     Replicas: 184,183       Isr: 183
        Topic: test-3   Partition: 2    Leader: 183     Replicas: 183,184       Isr: 183
        Topic: test-3   Partition: 3    Leader: 183     Replicas: 184,183       Isr: 183
➜ bin/kafka-topics.sh --describe --topic test-4  --zookeeper localhost:2181/kafka_26
Topic: test-4   PartitionCount: 4       ReplicationFactor: 2    Configs: 
        Topic: test-4   Partition: 0    Leader: 183     Replicas: 184,183       Isr: 183
        Topic: test-4   Partition: 1    Leader: 183     Replicas: 183,184       Isr: 183
        Topic: test-4   Partition: 2    Leader: 183     Replicas: 184,183       Isr: 183
        Topic: test-4   Partition: 3    Leader: 183     Replicas: 183,184       Isr: 183

这个时候,我们准备把test-3原来在184上的分区分配到181上面去,把test-4在184上的分区分配到182上去,那分配方案就是下面这样的:

➜ cat expand-cluster-reassignment.json
{
  "version": 1,
  "partitions": [
    {
      "topic": "test-3",
      "partition": 0,
      "replicas": [183, 181],
      "log_dirs": ["any", "any"]
    },
    {
      "topic": "test-3",
      "partition": 1,
      "replicas": [181, 183],
      "log_dirs": ["any", "any"]
    },
    {
      "topic": "test-3",
      "partition": 2,
      "replicas": [183, 181],
      "log_dirs": ["any", "any"]
    },
    {
      "topic": "test-3",
      "partition": 3,
      "replicas": [181, 183],
      "log_dirs": ["any", "any"]
    },
    {
      "topic": "test-4",
      "partition": 0,
      "replicas": [182, 183],
      "log_dirs": ["any", "any"]
    },
    {
      "topic": "test-4",
      "partition": 1,
      "replicas": [183, 182],
      "log_dirs": ["any", "any"]
    },
    {
      "topic": "test-4",
      "partition": 2,
      "replicas": [182, 183],
      "log_dirs": ["any", "any"]
    },
    {
      "topic": "test-4",
      "partition": 3,
      "replicas": [183, 182],
      "log_dirs": ["any", "any"]
    }
  ]
}

然后执行分配方案即可:

# 执行分配方案
➜ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --execute
# 输出略

# 查看进度
➜ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --verify 
# 输出略

# 完成后查看test-3和test-4
➜ bin/kafka-topics.sh --describe --topic test-3  --zookeeper localhost:2181/kafka_26Topic: test-3   PartitionCount: 4       ReplicationFactor: 2    Configs: 
        Topic: test-3   Partition: 0    Leader: 183     Replicas: 183,181       Isr: 183,181
        Topic: test-3   Partition: 1    Leader: 183     Replicas: 181,183       Isr: 183,181
        Topic: test-3   Partition: 2    Leader: 183     Replicas: 183,181       Isr: 183,181
        Topic: test-3   Partition: 3    Leader: 183     Replicas: 181,183       Isr: 183,181
➜ bin/kafka-topics.sh --describe --topic test-4  --zookeeper localhost:2181/kafka_26Topic: test-4   PartitionCount: 4       ReplicationFactor: 2    Configs: 
        Topic: test-4   Partition: 0    Leader: 183     Replicas: 182,183       Isr: 183,182
        Topic: test-4   Partition: 1    Leader: 183     Replicas: 183,182       Isr: 183,182
        Topic: test-4   Partition: 2    Leader: 183     Replicas: 182,183       Isr: 183,182
        Topic: test-4   Partition: 3    Leader: 183     Replicas: 183,182       Isr: 183,182

总结

不管扩容还是缩容,或者是故障后手动补齐分区,实质都是分区重分配,使用kafka-reassign-partitions.sh脚本即可。该脚本使用也非常简单:

  1. 先提供一个JSON格式的需要重分配的topic列表,然后执行--generate生成迁移方案文件;
  2. 然后使用--execute执行新的分配方案;
  3. 最后使用--verify查看分配方案执行进度。

如果对于分配方案文件格式很熟悉,可以跳过1.

]]>
<![CDATA[Kafka的可靠性]]> http://niyanchun.com/kafka-availability-and-durability.html 2020-10-18T21:38:00+08:00 2020-10-18T21:38:00+08:00 NYC https://niyanchun.com 本文讨论一下Kafka广义上的可靠性,所谓广义是从不同的角度、不同维度去观察系统的可靠性,比如从生产者(Producer)的角度看如何保证已经确认的数据一定写入了系统,不会丢失;从消费者(Consumer)的角度,只要数据没有彻底丢失,就可以读取。再比如从服务端(Broker)的角度,如何保证在可用性(Availability)和持久性(Durability)方面做权衡。

副本机制

分布式系统的可靠性一般都是通过副本(replicas)机制去实现的,Kafka也不例外。如下图:

kafka

Kafka的topic是逻辑概念,一个topic由若干个分区(partition)组成,副本就是在分区这个粒度上实现的。关于Kafka的副本有几个注意点:

  • Kafka的副本数默认为1。用户可以修改(replication factor),但不能超过节点个数,因为Kafka要保证所有副本不在同一个节点上面。
  • Kafka的副本里面有且仅有一个副本被称为Leader Replica(通过选举产生),剩下的称为Follower Replica。我们说副本个数或者配置副本个数时说的个数指两者的总和,而不是单指Follower Replica。比如默认的副本数为1(default.replication.factor),那就表示只有Leader Replica,没有Follower Replica。如果改为3,则表示有1个Leader Replica,2个Follower Replicas。
  • 生产者将数据写入Leader后,所有Follower主动去Leader拉取数据,而不是Leader或者生产者向Follower推,这个拉的过程和消费者消费是同一套机制和接口,也就是Follower其实只是一种特殊点的消费者。
  • Follower和Leader的数据及offset是完全一样的(因为Follower是异步拉取Leader数据,所以在某一时刻数据可能会略有滞后)。
  • 只有Leader对外提供服务(即供消费者消费),Follower仅是起数据冗余的作用。

对于最后一条,Kafka为何要这样设计,我摘抄了极客时间《Kafka核心技术与实战》专栏里面的解释,供参考:

  1. 方便实现“Read-your-writes”:所谓Read-your-writes,顾名思义就是,当你使用生产者API向Kafka成功写入消息后,马上使用消费者API去读取刚才生产的消息。举个例子,比如你平时发微博时,你发完一条微博,肯定是希望能立即看到的,这就是典型的 Read-your-writes 场景。如果允许追随者副本对外提供服务,由于副本同步是异步的,因此有可能出现追随者副本还没有从领导者副本那里拉取到最新的消息,从而使得客户端看不到最新写入的消息
  2. 方便实现单调读(Monotonic Reads):什么是单调读呢?就是对于一个消费者用户而言,在多次消费消息时,它不会看到某条消息一会儿存在一会儿不存在。如果允许追随者副本提供读服务,那么假设当前有 2 个追随者副本 F1 和 F2,它们异步地拉取领导者副本数据。倘若 F1 拉取了 Leader 的最新消息而 F2 还未及时拉取,那么,此时如果有一个消费者先从 F1 读取消息之后又从 F2 拉取消息,它可能会看到这样的现象:第一次消费时看到的最新消息在第二次消费时不见了,这就不是单调读一致性。但是,如果所有的读请求都是由 Leader 来处理,那么 Kafka 就很容易实现单调读一致性。

另外,我个人觉得Kafka已经通过将topic分多个Partition的机制将整个数据分配到了多个节点上面,不管是写还是读,都已经达到了负载均衡的效果,选择只有Leader对外服务的机制也无伤大雅,还能简化系统的设计。

生产者的确认机制

对于存在副本的系统,从生产者角度,人们最关心的一个问题往往就是生产者写数据的时候是写主副本成功就返回,还是大部分写成功就返回,还是等所有的副本都成功才返回。比如之前介绍的ES,属于最后一种,所有副本写成功才返回,用户不可选择。而Kafka则灵活了很多,默认主副本写成功就返回,但用户也可以灵活配置。

生产者生产消息时,可以选择从服务端获取确认(ack)的方式,有三种:

  • ack=0:数据发送后,不需要服务端确认;
  • ack=1:默认值,数据发送后,Leader Partition写成功就返回;
  • ack=all(或者-1),ISR中所有节点的Partition写成功才返回;

前两种就不说了,重点是最后面all的情况。很容易理解成设置为all,就能保证数据成功写入Leader和所有Follower才返回,其实并不总是这样的。有时会退化成ack=1的情况。不过先简单介绍下ISR。

ISR

Kafka集群认为集群中的一个节点还存活着需要同时满足两个条件:

  1. 节点在zk中注册的临时节点存在;
  2. 如果该节点(的Partition)是Follower,则其与Leader的数据差距不能超过replica.lag.time.max.ms

第1点很好理解,典型的利用zk临时节点特性做服务发现的场景,不再赘述。第2点的话从之前的图里面可以看到,Follower是定期主动去Leader那里拉取数据的,如果它在replica.lag.time.max.ms时间内没有发起拉取请求,或者拉取的太慢,在这个时间范围内还没赶上Leader节点,那Leader就认为该Follower的数据已经和自己不同步了。Leader维护了一个列表,这个列表里面是自己以及和自己同步的Follower,我们把这个列表称为ISR(全称“in-sync replica”)。如果某个Follower和自己不同步了,Leader就会从ISR中将该Follower移除。后面赶上后,又会加进来。

看个例子,我们有3个Kafka节点,broker id分别为181、182、183。现在我们创建一个名为test-1的topic,并且给设置4个Partition,副本个数设置为2:

# 创建topic-1
bin/kafka-topics.sh \
--zookeeper localhost:2181/kafka_26 \
--create \
--topic test-1 \
--partitions 4  \
--replication-factor 2
    
    # 查看topic-1
 bin/kafka-topics.sh --describe --topic test-1  --zookeeper localhost:2181/kafka_26
Topic: test-1   PartitionCount: 4       ReplicationFactor: 2    Configs: 
    Topic: test-1   Partition: 0    Leader: 183     Replicas: 183,181       Isr: 183,181
    Topic: test-1   Partition: 1    Leader: 181     Replicas: 181,182       Isr: 181,182
    Topic: test-1   Partition: 2    Leader: 182     Replicas: 182,183       Isr: 182,183
    Topic: test-1   Partition: 3    Leader: 183     Replicas: 183,182       Isr: 183,182

回到之前的问题,ack=all只保证数据全部写入了ISR中的副本,最差情况下,如果所有的Follower都因为太慢被剔除了ISR,那ISR中就只剩Leader了,此时其实效果和ack=1是一模一样的。那为了避免这种问题,Kafka给了一个参数min.insync.replicas,这个参数的含义是当我们指定ack=all时,ISR中至少要有min.insync.replicas个副本写成功,生产者才会认为数据写成功了,否则生产者就会抛NotEnoughReplicas或者NotEnoughReplicasAfterAppend异常。这个参数的默认值是1。

这里有个注意点就是理论上min.insync.replicas的值应该小于等于replication.facotr,但实际Kafka并没有校验这个,也就是如果你设置的错了,在没有真正写数据,或者ack的值不是all的时候也不会报错。比如下面的创建语句也不会报错的:

bin/kafka-topics.sh \
    --zookeeper localhost:2181/kafka_26 \
    --create \
    --topic test-1 \
    --partitions 4  \
    --replication-factor 3 \
    --config min.insync.replicas=4

所以,如果追求系统的Durability,那我们应该

  1. 至少有2个Broker;
  2. 将副本数(replication.factor)设置为[2, Broker个数]范围内的某个值。
  3. min.insync.replicas设置为[2, replication.factor]范围内的某个值。

副本故障

副本故障涉及两个关键知识点,第一个直接说结论:副本故障后,Kafka不会自动重新分配新的副本。比如某个topic设置了2个副本(replication.factor=2),那其中1个故障后,就只剩1个副本了,Kafka不会自动在其它可用节点上面重新分配一个新的副本,以保证副本数足够。如果碰巧故障分区包含Leader副本,那Kafka会从ISR中选举一个Follower作为新的Leader(但此时,副本数仍然是比预设的少一个的)。如果出现这种情况,可以使用Kafka提供的kafka-reassign-partitions.sh脚本手动恢复分区数(具体操作见:Kafka的扩容和缩容

而如何选举Leader就是第二个关键知识点。如果Leader和所有Follower都故障了,那数据肯定是丢了。但如果只是ISR中的副本全丢了,但ISR之外还有Follower,并且还存活着,那默认情况下数据也是丢了,因为默认Leader只会从ISR中选。对于这种情况,实际中我们可能愿意接受从存活的Follower中选取新的Leader,毕竟这样只丢一部分数据(Follower滞后于Leader的那部分数据)。Kafka提供了一个参数unclean.leader.election.enable,默认为false,如果设置为true,则允许选择ISR之外的Follower作为新的Leader。其实就是可用性和一致性之前的一个权衡。

总结

综合上面的这些介绍,可以知道,Kafka提供的可靠性其实是有诸多条件的,官方的描述如下:

The guarantee that Kafka offers is that a committed message will not be lost, as long as there is at least one in sync replica alive, at all times.

另外,对于一致性和可用性的权衡,也是提供了多个参数,让用户去根据需要,灵活选择。如上篇文章介绍,Kafka是在明确需求场景下设计的系统,追求的不是完美,而是真实场景中的实用性。

]]>
<![CDATA[Kafka的高效]]> http://niyanchun.com/efficient-kafka.html 2020-10-11T22:18:00+08:00 2020-10-11T22:18:00+08:00 NYC https://niyanchun.com Kafka在大数据领域消息中间件的位置独占鳌头很多年了,很重要的一个原因就是其能很高效的承载海量数据,这里的高效指读写能做到低延迟、高吞吐。要做到高效,不是特别难,有很多MQ以及Redis之类的组件都可以做到;要做到支撑海量数据且有良好的水平扩展性,也有很多组件,但能同时兼顾二者的,的确就不多了。而Kafka能同时兼顾,主要是在设计上花费了很多心思,核心的一些点包括:

  • 磁盘的连续读写
  • 充分利用操作系统的PageCache(预读、后写)
  • 利用零拷贝技术
  • 端到端的批量压缩

磁盘连续读写

Kafka选择直接使用磁盘做持久化而非内存主要有两方面原因:

  • 和内存相比,磁盘的容量极大,可以支撑海量数据
  • Kafka使用JVM语言开发,内存越大,GC越慢;同时,内存对象的存储往往需要占用额外的内存空间

当然,选择将数据直接写入磁盘,磁盘与内存的性能差距将是一个避不开的问题。众所周知,Kafka通过磁盘连续读写+内核的PageCache解决了该问题。

磁盘之所以慢,主要慢在了寻址(seek)上面,一般而言,磁盘转动磁头做一次寻址平均需要10ms的时间;而寻址完成之后,往磁盘写入数据是比较快的,写入1MB的数据大约需要30ms(随着磁盘技术的发展以及不同转速规格的磁盘,数据稍有不一,但道理是一样的)。对于连续读写,只寻址一次,所以寻址时间几乎可以忽略;但对于随机读写,写一次就需要寻址一次。这里举个极端的例子感受一下差异,假设我们要写总共100MB的数据:

  • 方式一:随机写,每次写1字节,需要写约$100*10^6$次,需要的时间为$T_{随机写}= 100*10^6次寻址时间 + 100MB数据写入时间 ≈ 100*10^6* 10ms + 100*30ms ≈ 11.57天 $
  • 方式二:连续写,一次写100MB,则需要的时间为$T_{连续写}≈1次寻址时间+100MB数据写入时间 ≈ 3秒$

可以看到,差距是非常大的。这也是我们平时拷贝大量小文件非常慢,一般都要先打包甚至压缩成一个大文件在进行拷贝传输的重要原因之一。另外,在之前的文章里面也提到过,有评测磁盘的连续写入速度甚至可以媲美内存的随机读写(见The Pathologies of Big Data)。

既然连续写性能这么高,那如何实现呢?内核并没有提供连续写的API,也就是我们无法直接控制是随机写还是连续写,但一般追加写在底层都会变为连续读写。所以像常见的能接受只是追加写,且对写入性能要求较高的日志、数据库WAL等场景都是追加写。Kafka的数据在后台实际也是以日志文件的形式保存,也就是存储数据的时候是追加写的。追加写除了可以带来连续写的高效外,还有一些其它好处,比如无论数据量多大,所有顺序执行的操作复杂度都为O(1);读和写不会相互阻塞等。当然,为了解决超大文件给后面按照位移(offset)消费以及数据老化带来的问题,Kafka会限制文件的大小,超过后就滚动形成另外一个文件,同时还创建了一些索引文件,以提高后面的查找速度。

消费的时候在一个分区内也是按消息顺序消费的,所以写的时候连续写的数据,在消费的时候几乎也就是连续读了,所以也可以充分享受操作系统的预读特性,提高读的效率。

引申:因为机械盘的连续读写能力和SSD的读写能力其实差异不是特别大,所以尽管Kafka使用磁盘做持久化,但一般磁盘都不是Kafka的性能瓶颈,所以也通常无需使用SSD。

PageCache

虽然磁盘连续写速度已经很快了,但如果特别频繁的写小数据性能自然也无法保证,而且可能还会慢慢演变成随机写,这种情况一般的思路都是搞个buffer缓存一批数据后再写,以提高吞吐。而对于写磁盘这种情况,除了应用自己内部实现缓存外,还有一种选择就是直接利用操作系统自身的PageCache(也称Disk Cache)机制,Kafka选择了后一种。原因也很简单,后一种完全符合场景需求,而且还会带来额外的一些好处:完全由操作系统负责,应用无感知,简化了开发;PageCache不依赖于应用,即使应用重启,PageCache的缓存依然是有效且可用的;应用内不做缓存,可以减少应用的内存,降低GC的负担。

上面讨论的是写,读也能从PageCache中获益不少。如果数据处理及时,读数据的时候,写入的数据很可能还在PageCache中,这样就无需从磁盘加载,直接就从内存读了。这也是使用Kafka应该尽量做到的点。

引申:因为Kafka设计上使用PageCache做缓存,而不是自己在JVM进程中做应用级的缓存,所以Kafka的JVM Heap一般不需要设置很大,即使大数据量场景,通常5~6GB也足矣。但这不代表它不需要大内存,只是我们需要将更多的内存留给系统PageCache,让Kafka去使用。这点和ES非常像,其实所有比较依赖PageCache的进程都是如此。

零拷贝技术

关于零拷贝(zero-copy)技术的细节就不展开了,网上的文章多如牛毛,这里推荐两篇文章,网上绝大部分文章也都是从这两篇文章来的:

其实原理比较简单,一方面省去了内核态和用户态的数据拷贝(随之也省去了多次上下文切换),另一方面省去了内核态内部的数据拷贝。但对于零拷贝我觉得有两个点需要注意:

  1. 所谓的零拷贝是从内核和CPU的角度而言的,并不是说整个过程没有任何数据拷贝。首先,用户态和内核态的拷贝为0(用户态被直接跳过了);整个过程CPU的参与为0,所有数据中转都是通过DMA进行的:DMA负责将数据从磁盘拷贝到内存,然后从内存拷贝到网卡。
  2. 虽然零拷贝效率很高,但仅适用于从磁盘加载,且无需做任何数据改动就可以直接发送的场景。如果数据在发送前需要做处理,那还是必须从内核态buffer拷贝到用户态buffer,修改后再拷贝回去,也就是我们平时用的最多的场景。Kafka在设计上就就考虑到了这些,所以存储的数据是可以直接发送,而无需做处理的。

所以,这里的零拷贝技术主要是提高了Kafka的读能力。另外,Kafka的消费模型可能会存在多个消费者消费同一份数据,这样,当第一次消费数据时数据会从磁盘拷贝到内存,在缓存有效期间,后续消费者再次消费数据的时候就直接从内存读了,这也是一个设计亮点。

端到端的压缩

关于压缩,看着是最好理解的地方,但其中却有很多需要注意的地方。

论述之前,先提一下消息集(Message Set),虽然对于用户来说,生产和消费都是按照一个一个消息进行的,但Kafka内部的很多操作、传输及存储都不是以单个消息为单位的,而是以消息集为单位的。所谓消息集就是包含多个消息的集合,按照消息集处理主要是为了减少大量小IO操作(网络传输、内存分配、磁盘读写),以提高效率。

然后说压缩。压缩这个事情其实应用层可以自己做,但Kafka做的好处是可以“批量压缩”,即以消息集为单位进行压缩,而不是对单条消息进行压缩(注:以消息集为单位进行压缩是Kafka V2版本的消息格式后才有的,V1版本还是单条消息进行压缩的)。所以,通常的情况就是生产者生产的消息会被组装成消息集通过网络发送到服务端(Broker),然后服务端会按照收到的格式写入到文件,消费者消费的时候服务端再从文件读取直接发给消费者,整个过程中不需要改变消息(也就是用户态代码不需要更改从磁盘加载的数据),所以才可以使用零拷贝。

关于压缩,有个注意点就是在生产者(Producer)和服务端(Broker)都可以指定压缩格式,如果两边不一致,那服务端在收到消息后会重新进行压缩,应该要避免出现这种情况。

结语

Kafka之所以高效,是其设计之初就有了明确的场景和目标,然后针对目标,进行了一系列精巧的设计。单就某一个点去看,也没有什么特别的新颖独到之处,都是应用挺广挺成熟的技术,但环环相扣起来,却造就了一个精妙的系统。

]]>
<![CDATA[Kafka性能测试]]> http://niyanchun.com/kafka-benchmark.html 2019-11-03T22:22:00+08:00 2019-11-03T22:22:00+08:00 NYC https://niyanchun.com 本文内容来自于Jay Kreps于2014年在LinkedIn发布的一篇文章,英文原文见Benchmarking Apache Kafka: 2 Million Writes Per Second (On Three Cheap Machines)。Jay Kreps是Kafka的早期作者之一,也是提供商业版本Kafka的Confluent公司联合创始人兼CEO。尽管该文发布于2014年4月份,使用的版本是0.8.1,但其结果在五年后的今天依然具有很大的参考价值。因为尽管Kafka如今已经发展到2.3版本,但其核心架构在这5年间并未发生过大的调整,新的版本改动更多的都是在完善和丰富其Stream特性。

作者使用了6台机器,每台机器配置如下:

  • Intel Xeon 2.5 GHz processor with 6cores
  • Six 7200 RPM SATA drives
  • 32GB of RAM
  • 1Gb Ethernet(即千兆网卡)

其中每台机器上面的6块硬盘没有做任何RAID,直接挂载使用,6块硬盘的线性写入能力约为822MB/秒。测试中,使用3台机器部署Kafka,另外3台作为压力机以及部署Zookeeper。整个测试过程中,没有对Kafka进行任何调优。下面介绍测试过程。

测试生产者性能

写入性能测试共进行了4组,不同测试变化的是副本确认机制和写入线程数,其它参数配置均相同。因为有6块硬盘,所以创建了一个包含6个分区的Topic,每个磁盘分布一个分区。写入的消息大小都是100字节,共写入5千万条消息。另外,写的过程中没有任何读取操作。

场景1:1个写线程,无副本复制

测试场景:1个producer线程,数据只写1份(即只有leader replica,无follower replica)

测试结果:821,557 records/秒 或 78.3 MB/秒

结果分析:此时判断应该是网卡基本饱和。因为上面统计的测试结果只计算了原始数据(即每条100字节),但实际发送的时候,每条消息还会附加约22字节的元数据信息,每个请求还会附加topic、partition等信息,所以网络上的数据肯定是大于上述结果的。

场景2:1个写线程,3 replica,异步复制

测试场景:1个producer线程,3份数据(1个leader replica,2个follower replica),副本复制采用异步方式,即写入请求在leader replica将数据写入本地日志之后就返回了,无需等待2个follower replica写。

测试结果:786,980 records/秒 或 75.1MB/秒

结果分析:因为是follower是异步复制的,所以和场景1没有follower时的性能相近。

场景3:1个写线程,3 replica,同步复制

测试场景:和场景2类似,单复制过程采用同步方式,即只有当leader和2个follower都将数据持久化之后,写请求才会返回。

测试结果:421,823 records/秒 或 40.2MB/秒

结果分析:同步复制的延迟导致TPS下降了很多。

场景4:3个写线程,3 replica,异步复制

测试场景:和场景2类似,但使用3个producer线程。为了避免网络瓶颈,3个线程分布在3台机器上。

测试结果:2,024,032 records/秒 或 193MB/秒

结果分析:性能基本上是随着写线程数近线性增长的的。

测试消费者性能

测试消费者性能时,使用的是前面测试生产者产生的数据,即5千万条消息,每个消息100字节,3 replica。测试期间,没有任何生产者。另外,对于Kafka的消费者,需要明确两点:

  1. 在kafka中,只有leader replica对外提供服务,follower replica只是作为冗余,并不对外提供服务。所以消费者的性能与是否有follower replica及其个数没有关系。
  2. 一个消息只有follower replica完成复制后才对消费者可见,这与生产者是同步确认还是异步确认没有关系。

场景5:1个消费者

测试场景:1个消费者线程

测试结果:940,521 records/秒 或 89.7MB/秒

结果分析:读取速度基本达到网卡瓶颈了。

场景6:3个消费者

测试场景:3个消费者线程,分布在3台机器上,同时消费同一个topic

测试结果:2,615,968 records/秒 或 249.5 MB/秒

结果分析:基本上是接近线性增长的

生产者消费者一起

前面测的都是单独的生产者或者消费者的场景,实际中往往是一边生产一边消费,所以作者再测试了一下这种场景。

场景7:生产者消费者协调

测试场景:1个生产者线程,1个消费者线程,6个分区,3个replica,异步复制

测试结果:795,064 records/秒 或 75.8MB/秒(消费者的性能)

结果分析:上面的测试结果是消费者的性能,对比场景2可知该性能其实就是生产者的性能上限。所以,对于Kafka模型而言,消费者和生产者基本是互不影响的。而且二者一起的时候,往往还能更好的利用Page Cache。

通过这一系列的测试可以看到Kafka的性能是非常高的,这得益于它精良的设计。测试过程中使用到的脚本及配置见这里

]]>