本文讨论Filebeat收集文件的时候可能产生的数据重复或者数据截断及丢失的问题。

数据重复

关于数据重复我们先看关于Filebeat的一段官方描述:

Filebeat guarantees that events will be delivered to the configured output at least once and with no data loss. Filebeat is able to achieve this behavior because it stores the delivery state of each event in the registry file.

从这里可以看出Filebeat对于收集到的数据(即event)的传输保证的是"at least once",而不是"exactly once",也就是Filebeat传输的数据是有可能有重复的。这里我们讨论一下可能产生重复数据的一些场景,我大概将其分为两类。

第一类:Filebeat重传导致数据重复。重传是因为Filebeat要保证数据至少发送一次,进而避免数据丢失。具体来说就是每条event发送到output后都要等待ack,只有收到ack了才会认为数据发送成功,然后将状态记录到registry。当然实际操作的时候为了高效是批量发送,批量确认的。而造成重传的场景(也就是没有收到ack)非常多,而且很多都不可避免,比如后端不可达、网络传输失败、程序突然挂掉等等。

第二类:配置不当或操作不当导致文件重复收集。Filebeat感知文件有没有被收集过靠的是registry文件里面记录的状态,如果一个文件已经被收集过了,但因为各种原因它的状态从registry文件中被移除了,而恰巧这个文件还在收集范围内,那就会再收集一次。

对于第一类产生的数据重复一般不可避免,而第二类可以避免,但总的来说,Filebeat提供的是at least once的机制,所以我们在使用时要明白数据是可能重复的。如果业务上不能接受数据重复,那就要在Filebeat之后的流程中去重。

数据截断或丢失

上面的官方描述中说了Filebeat可以保证”no data loss“,但实际中并非如此。就跟各种流处理框架宣传的无比优秀,比如可以保证”exactly once message processing“,但实际是情况是这种保证都是有前提条件的,只是更加优秀的框架这个前提条件更少而已(比如flink),但不管怎样,宣传中一般都绝口不提那些前提条件,因为别人都不提,我如果提了,那只有坏处,没有好处。有点跑远了。Filebeat这里保证的没有数据丢失也同样是有前提条件的,就是只保证数据传输时不丢失。说的再清楚一点就是如果你的数据在采集阶段没有丢失,那后面的传输可以保证不丢(因为有ack的确认和重传机制),但如果数据在采集阶段就丢失了,那就真丢了。Filebeat自身机制方面的一些缺陷,导致即使你的使用方式完全正确,数据在采集这一步就丢失的问题依旧是无法完全避免的,我们能做的就是明白丢失的原因,并有针对性的降低丢失的可能性。

接着我们来分析一下这个机制中的缺陷。Filebeat处理文件时会维护一个状态,这个状态里面记录了收集过的每一个带绝对路径的文件名,文件的inode值,文件内容上次收集的位置(即offset)以及其它一些信息。这个状态维护在内存里面,过一段时间会刷新到磁盘上,默认刷到registry这个文件里面。程序如果重启,就从这个文件重新加载,恢复之前的状态。可以认为这个文件就是Filebeat能正常工作的核心。而这个机制上的缺陷主要和Filebeat判断文件是否truncate的方式有关系。我们先看下判断的代码(Filebeat 6.4.3,github.com/elastic/beats/filebeat/input/log/input.go):

// harvestExistingFile continues harvesting a file with a known state if needed
func (p *Input) harvestExistingFile(newState file.State, oldState file.State) {
  // 省略部分代码

    // File size was reduced -> truncated file
    if oldState.Finished && newState.Fileinfo.Size() < oldState.Offset {
        logp.Debug("input", "Old file was truncated. Starting from the beginning: %s, offset: %d, new size: %d ", newState.Source, newState.Fileinfo.Size())
        err := p.startHarvester(newState, 0)
        if err != nil {
            logp.Err("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err)
        }

        filesTruncated.Add(1)
        return
    }

  // 省略部分代码
}

可以看到,Filebeat认为只要文件的大小比之前记录的这个文件(inode唯一标识)的offset小,就认为是truncate掉了,就会从文件头开始重新收集,这一般没什么问题。但如果这个文件的大小比offset大,就认为文件没变过,接着从上次的offset处理,但实际文件可能truncate过,甚至已经不是之前的文件了(inode重复导致)。registry文件里面虽然记录了文件名,但Filebeat唯一标识一个文件使用的是里面的inode值,而非文件名(所以文件重命名对于Filebeat正常工作没有影响),但操作系统的inode值是会复用的。这里举一个我碰到过的场景,比如原来有一个文件A,Filebeat处理过之后将其inode,以及处理的offset(假设为n)记录到了registry文件中。后来这个文件删除了,但registry里面记录的状态还没有自动删除,此时如果有另外一个文件B正好复用了之前A的inode,那Filebeat就会认为这个文件之前处理过,且已经处理到了offset为n处。如果B的文件比A小,即文件的end offset都小于n,那Filebeat就会认为是原来的A文件被truncate掉了,此时会从头开始收集,没有问题。但如果B的end offset大于等于n,那Filebeat就认为是A文件有更新,然后就会从offset为n处开始处理,于是B的前n个字节的数据就丢失了,这样我们就会看到数据有被截断。

其实,即使没有inode重用的问题,上面例子中的问题依旧可能。如果一个文件达到了限制(比如大小),不是重新创建一个新的文件写,而是将这个文件truncate掉继续复用(当然实际中这种场景好像比较少,但也并非没有),Filebeat下次来检查这个文件是否有变动的时候,这个文件的大小如果大于之前记录的offset,也会发生上面的情况。这个问题在github上面是有issue的,但目前还没有解决,官方回复是Filebeat的整个机制在重构中。

数据截断属于数据采集时丢失的一种情况。还有一些其它情况,比如文件数太多,Filebeat的处理能力有限,在还没来得及处理的时候这些文件就被删掉了(比如rotate给老化掉了)也会造成数据丢失。还有就是后端不可用,所以Filebeat还在重试,但源文件被删了,那数据也就丢了。因为Filebeat的重试并非一直发送已经收集到内存里面的event,必要的时候会重新从源文件读,比如程序重启。这些情况的话,只要不限制Filebeat的收集能力,同时保证后端的可用性,网络的可用性,一般问题不大。

所以,使用Filebeat既可能产生数据重复,也可能产生数据丢失,不同的业务场景,影响程度各异,但作为使用者,我们需要做到心中有数,出现这些问题时,明白为什么会这样。