Kafka的存储层级概念上比较简单,一个topic分为若干partition,一个partition再分为若干segment。下图是一个示例:

topic是个逻辑概念,partition和segment则是真实存储数据的:一个partition对应磁盘上面一个目录,一个segment对应partition目录下的一个日志文件,消息数据就是以append-only的方式顺序写入segment文件

看个例子,创建一个有2个分区,1个副本的topic:

➜ bin/kafka-topics.sh \                                                             
--zookeeper localhost:2181/kafka_26 \
--create \
--topic test \  
--partitions 2  \
--replication-factor 1 \
--config segment.bytes=1024    # 这个选项后文会介绍

创建完之后,看下数据目录:

# 创建topic之前的数据目录(该集群还没有topic)
➜  kafka_2.13-2.6.0 ll data 
total 4.0K
-rw-r--r--. 1 root root  0 Oct 31 17:43 cleaner-offset-checkpoint
-rw-r--r--. 1 root root  0 Oct 31 17:43 log-start-offset-checkpoint
-rw-r--r--. 1 root root 90 Oct 31 17:43 meta.properties
-rw-r--r--. 1 root root  0 Oct 31 17:43 recovery-point-offset-checkpoint
-rw-r--r--. 1 root root  0 Oct 31 17:43 replication-offset-checkpoint

# 创建后
➜ ll -R data/
data:
total 8.0K
-rw-r--r--. 1 root root   0 Oct 31 17:43 cleaner-offset-checkpoint
-rw-r--r--. 1 root root   0 Oct 31 17:43 log-start-offset-checkpoint
-rw-r--r--. 1 root root  90 Oct 31 17:43 meta.properties
-rw-r--r--. 1 root root   0 Oct 31 17:43 recovery-point-offset-checkpoint
-rw-r--r--. 1 root root  22 Oct 31 17:47 replication-offset-checkpoint
drwxr-xr-x. 2 root root 141 Oct 31 17:47 test-0
drwxr-xr-x. 2 root root 141 Oct 31 17:47 test-1

# 目录内容
data/test-0:
total 21M
-rw-r--r--. 1 root root 10M Oct 31 17:47 00000000000000000000.index
-rw-r--r--. 1 root root   0 Oct 31 17:47 00000000000000000000.log
-rw-r--r--. 1 root root 10M Oct 31 17:47 00000000000000000000.timeindex
-rw-r--r--. 1 root root   8 Oct 31 17:47 leader-epoch-checkpoint

data/test-1:
total 21M
-rw-r--r--. 1 root root 10M Oct 31 17:47 00000000000000000000.index
-rw-r--r--. 1 root root   0 Oct 31 17:47 00000000000000000000.log
-rw-r--r--. 1 root root 10M Oct 31 17:47 00000000000000000000.timeindex
-rw-r--r--. 1 root root   8 Oct 31 17:47 leader-epoch-checkpoint

可以看到,创建test这个topic之后,多出来了两个目录,test-0和test-1,这两个就是partition对应的物理目录。Partition的目录格式为:{topic名}-{序号},序号从0开始。Partition目录的文件格式是一样的,初始状态有四个文件:

重点要说的是00000000000000000000.log这个文件,这个文件就是顺序写消息数据的,关于这个文件有下面几个关键点:

  • 这样1个文件就是1个segment;
  • segment文件是二进制编码的,人眼无法读,但可以借助工具查看内容;
  • segment文件会根据大小滚动产生新的;
  • 前面的数字(即文件名)是当前segment中第1个消息的起始offset。每个partition都有自己的offset,从0开始,每写入一条消息,就加1。

结合一些具体例子看一下。首先看下segment文件的构成,写入两条数据:

# 写入两条数据
➜ bin/kafka-console-producer.sh --broker-list localhost:9092  --topic test   
>hello world
>bye world
# 查看数据目录,可以看到数据分别写到了两个partition下面。Kafka会自己均衡负载
➜  kafka_2.13-2.6.0 ll -R data/test*                        
data/test-0:
total 21M
-rw-r--r--. 1 root root 10M Oct 31 17:47 00000000000000000000.index
-rw-r--r--. 1 root root  79 Oct 31 17:48 00000000000000000000.log    # 有数据了
-rw-r--r--. 1 root root 10M Oct 31 17:47 00000000000000000000.timeindex
-rw-r--r--. 1 root root   8 Oct 31 17:47 leader-epoch-checkpoint

data/test-1:
total 21M
-rw-r--r--. 1 root root 10M Oct 31 17:47 00000000000000000000.index
-rw-r--r--. 1 root root  77 Oct 31 17:48 00000000000000000000.log # 有数据了
-rw-r--r--. 1 root root 10M Oct 31 17:47 00000000000000000000.timeindex
-rw-r--r--. 1 root root   8 Oct 31 17:47 leader-epoch-checkpoint

# 直接查看segment文件
➜  kafka_2.13-2.6.0 cat data/test-0/00000000000000000000.log 
C;��ux�aZux�aZ��������������"hello world#                                                                                                 
root@NYC-DEV ➜  kafka_2.13-2.6.0 cat data/test-1/00000000000000000000.log
AM,��ux�reux�re��������������bye world# 

从上面可以看到,直接查看segment文件有乱码,因为是二进制的,但消息内容还是能够看出来的(如果我们开启了压缩的话,消息部分也是无法看的)。Kafka提供了查看segment的工具kafka-run-class.sh:

➜ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files data/test-0/00000000000000000000.log
Dumping data/test-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1604051296602 size: 79 magic: 2 compresscodec: NONE crc: 1003684042 isvalid: true | offset: 0 CreateTime: 1604051296602 keysize: -1 valuesize: 11 sequence: -1 headerKeys: [] payload: hello world

➜ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files data/test-1/00000000000000000000.log
Dumping data/test-1/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1604051300965 size: 77 magic: 2 compresscodec: NONE crc: 1294785255 isvalid: true | offset: 0 CreateTime: 1604051300965 keysize: -1 valuesize: 9 sequence: -1 headerKeys: [] payload: bye world

可以看到除了数据外,还有很多元数据信息。然后接着多写入一些数据,观察一下segment的切换。默认1个segment文件达到1GB(server.properties中的log.segment.bytes=1073741824)以后才会切换新的,为了演示方便,创建topic的时候我把这个配置设置成了1024(字节),也就是超过1KB就会切换,可以通过命令看下现在这个topic的配置:

➜ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name test --describe
Dynamic configs for topic test are:
  segment.bytes=1024 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:segment.bytes=1024, DEFAULT_CONFIG:log.segment.bytes=1073741824}

写入较多数据后,查看数据目录:

➜  kafka_2.13-2.6.0 ll data/test*
data/test-0:
total 21M
-rw-r--r--. 1 root root   0 Oct 31 18:09 00000000000000000000.index
-rw-r--r--. 1 root root 879 Oct 31 18:09 00000000000000000000.log
-rw-r--r--. 1 root root  12 Oct 31 18:09 00000000000000000000.timeindex
-rw-r--r--. 1 root root   0 Oct 31 18:10 00000000000000000005.index
-rw-r--r--. 1 root root 935 Oct 31 18:10 00000000000000000005.log
-rw-r--r--. 1 root root  10 Oct 31 18:09 00000000000000000005.snapshot
-rw-r--r--. 1 root root  12 Oct 31 18:10 00000000000000000005.timeindex
-rw-r--r--. 1 root root 10M Oct 31 18:10 00000000000000000013.index
-rw-r--r--. 1 root root 315 Oct 31 18:10 00000000000000000013.log
-rw-r--r--. 1 root root  10 Oct 31 18:10 00000000000000000013.snapshot
-rw-r--r--. 1 root root 10M Oct 31 18:10 00000000000000000013.timeindex
-rw-r--r--. 1 root root   8 Oct 31 18:05 leader-epoch-checkpoint

data/test-1:
total 21M
-rw-r--r--. 1 root root   0 Oct 31 18:09 00000000000000000000.index
-rw-r--r--. 1 root root 877 Oct 31 18:09 00000000000000000000.log
-rw-r--r--. 1 root root  12 Oct 31 18:09 00000000000000000000.timeindex
-rw-r--r--. 1 root root   0 Oct 31 18:10 00000000000000000005.index
-rw-r--r--. 1 root root 936 Oct 31 18:10 00000000000000000005.log
-rw-r--r--. 1 root root  10 Oct 31 18:09 00000000000000000005.snapshot
-rw-r--r--. 1 root root  12 Oct 31 18:10 00000000000000000005.timeindex
-rw-r--r--. 1 root root 10M Oct 31 18:10 00000000000000000013.index
-rw-r--r--. 1 root root 352 Oct 31 18:10 00000000000000000013.log
-rw-r--r--. 1 root root  10 Oct 31 18:10 00000000000000000013.snapshot
-rw-r--r--. 1 root root 10M Oct 31 18:10 00000000000000000013.timeindex
-rw-r--r--. 1 root root   8 Oct 31 18:05 leader-epoch-checkpoint

可以看到,已经多出了两个segment:00000000000000000005.log00000000000000000013.log。前面说segment文件名里面的序号是当前segment里面第一个消息的offset。这里以00000000000000000005.log为例验证一下:

# 00000000000000000005.log文件
➜ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration --print-data-log --files data/test-0/00000000000000000005.log
Dumping data/test-0/00000000000000000005.log
Starting offset: 5
baseOffset: 5 lastOffset: 5 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1604052567547 size: 200 magic: 2 compresscodec: NONE crc: 1854811761 isvalid: true | offset: 5 CreateTime: 1604052567547 keysize: -1 valuesize: 130 sequence: -1 headerKeys: [] payload: a long long long long long long long long long long long long long long long long long long long long long long long long string 9
baseOffset: 6 lastOffset: 6 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 200 CreateTime: 1604052614272 size: 105 magic: 2 compresscodec: NONE crc: 4253105822 isvalid: true | offset: 6 CreateTime: 1604052614272 keysize: -1 valuesize: 37 sequence: -1 headerKeys: [] payload: a long long long long long  string 12
baseOffset: 7 lastOffset: 7 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 305 CreateTime: 1604052625276 size: 105 magic: 2 compresscodec: NONE crc: 1470066054 isvalid: true | offset: 7 CreateTime: 1604052625276 keysize: -1 valuesize: 37 sequence: -1 headerKeys: [] payload: a long long long long long  string 14
baseOffset: 8 lastOffset: 8 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 410 CreateTime: 1604052628456 size: 105 magic: 2 compresscodec: NONE crc: 992483528 isvalid: true | offset: 8 CreateTime: 1604052628456 keysize: -1 valuesize: 37 sequence: -1 headerKeys: [] payload: a long long long long long  string 16
baseOffset: 9 lastOffset: 9 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 515 CreateTime: 1604052631303 size: 105 magic: 2 compresscodec: NONE crc: 799335592 isvalid: true | offset: 9 CreateTime: 1604052631303 keysize: -1 valuesize: 37 sequence: -1 headerKeys: [] payload: a long long long long long  string 18
baseOffset: 10 lastOffset: 10 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 620 CreateTime: 1604052634136 size: 105 magic: 2 compresscodec: NONE crc: 2893899522 isvalid: true | offset: 10 CreateTime: 1604052634136 keysize: -1 valuesize: 37 sequence: -1 headerKeys: [] payload: a long long long long long  string 20
baseOffset: 11 lastOffset: 11 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 725 CreateTime: 1604052636553 size: 105 magic: 2 compresscodec: NONE crc: 901773285 isvalid: true | offset: 11 CreateTime: 1604052636553 keysize: -1 valuesize: 37 sequence: -1 headerKeys: [] payload: a long long long long long  string 22
baseOffset: 12 lastOffset: 12 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 830 CreateTime: 1604052639396 size: 105 magic: 2 compresscodec: NONE crc: 3365784921 isvalid: true | offset: 12 CreateTime: 1604052639396 keysize: -1 valuesize: 37 sequence: -1 headerKeys: [] payload: a long long long long long  string 24

可以看到该文件的第一个offset是5,最后一个offset为12,所以当前segment文件是00000000000000000005.log;下个文件的第一个offset就是13,也就是后面的00000000000000000013.log文件。

kafka的log文件都是以appen-only的方式写的,无法修改。所以把partition分成很多个大小可控的segment会带来很多好处,比如数据的滚动、删除、查找等涉及物理层的操作底层都是以segment为单位的,这个也很容易理解。但是即使拆分后,对于1GB(默认)的文件进行高效搜索也是比较麻烦的,而指定offset进行消费又是使用非常普遍的情况。为了解决这个问题,kafka为每个segment创建一个索引文件,就是那个.index后缀的文件,该文件会存储每个offset在segment文件中的实际物理偏移量。这样当指定消费某个offset的数据时,可以先通过segment的文件名找到该offset对应的segment文件,然后再通过index文件直接定位到数据在该segment中的物理位置。相当于通过segment文件名和index文件给所有数据构造了一个二级索引,给定任何offset,通过两步即可定位到实际存储的位置。

最后就是数据删除,删除是以topic为单位,删除时会先标记,然后定期真正物理删除。比如下面的索引已经被标记为删除:

➜ ll data/test* 
total 20K
drwxr-xr-x. 2 root root 141 Oct 31 16:33 test-0.a897c64b91b8419d86426c1817fcf650-delete
drwxr-xr-x. 2 root root 141 Oct 31 16:33 test-1.0767a8ede4314439accbf67039089804-delete

最后总结一下,

  1. Kafka的topic逻辑上包含了若干个partition,每个partition对应磁盘上一个目录;写数据的时候就是以append-only的方式在partition目录写log文件。
  2. 为了防止文件过大不利于搜索和管理,又按照大小(默认1G)进行滚动,生成多个文件,每个文件称为一个segment。segment的文件名就是该文件里面第1个offset的值。
  3. 为了高效的根据offset进行消费,给每个segment文件生成了一个索引文件,索引里面记录了该segment里面每个offset对应的实际偏移量。