Filebeat作为elastic公司使用Golang开发的新一代的日志采集工具,旨在替换原来Logstash的日志收集功能(注意,Logstash的核心功能包含收集解析两大块,Filebeat的定位只是替代收集部分,所以Filebeat并不能完全取代Logstash)。因为其性能高,资源占用少且轻量级,已经逐渐被很多公司采用。而使用的问题也非常多,今天来讨论之前在elastic中文社区看到的一个问题:

filebeat采集log,每次filebeat程序断开,重新启动之后会生成一个registry.new文件,并且不会继续采集log,请问是什么原因呢?

要搞清楚这个问题,其实要回答两个问题:

  1. 为什么日志不继续采集?
  2. 为什么重启会生成一个registry.new文件?

本文的讨论都基于Filebeat6.4~6.8版本。如果你对Filebeat的配置项非常了解,其实这类不收集的问题很好排查。如果真的是不继续采集日志了,那原因很明确就是没有日志需要采集了,比如所有已存的日志已经都采集完了,而且没有新的日志产生。或者已经存在的日志不再采集范围内,这时候可通过以下一些点去排查:

  • 检查配置中的paths,确认需要采集的文件是否包含在设置的采集路径里面;如果设置了正则,确保文件能被匹配到(paths支持的正则只是有限的正则);
  • 检查上面paths有没有配置enable项,如果配置了确认是否为true(如果没配置,默认就是true);
  • 检查ignore_older是否有配置,是不是日志文件的最后修改时间(可使用stat命令查看)已经超过了忽略的时间点,即这些文件已经被忽略了;
  • 查看filebeat的日志,看filebeat是否在正常工作。如果是info级别的日志的话,每收集(打开)和关闭一个文件都会有打印一条日志,日志中包含文件的绝对路径及文件名。而且默认每30秒会打印一条monitor日志,里面包含了很多信息,包括正则运行的havester个数,已经确认的event等信息,看Filebeat状态是否正常;
  • 如果是某个文件不收集,就是去registry文件里面搜一下这个文件,看是否已经收集过了,并且注意对比一下时间;

一般这样排查下来之后,问题原因就能找到。当然,注意上面有个前提,就是真的是不继续采集日志了。而重启后,一般都不是不采集日志了,而是filebeat在从registry文件恢复状态。而这个恢复是比较慢的,registry文件越大,恢复时间越长。我之前测试过一次,一个大概2MB的registry文件,恢复了约三四十分钟,而这个恢复过程中输出的日志都是debug级别的,也就是默认info级别的情况下,你会发现filebeat既不工作,也没有输出任何日志,而且因为恢复的时候是单线程(更准确的说是单个Goroutine)的,所以CPU也占用也很少,所以给人感觉好像是filebeat不工作了,也就是问题中描述的“不继续采集log”。

至于registry.new文件,其实不光是会在恢复期间会产生,它在任意时刻都可能会产生,这是Filebeat写registry文件的机制。结合代码简单分析一下:

// registrar.go
func (r *Registrar) Run() {
    logp.Debug("registrar", "Starting Registrar")
    // Writes registry on shutdown
    defer func() {
        r.writeRegistry()
        r.wg.Done()
    }()

    var (
        timer  *time.Timer
        flushC <-chan time.Time
    )

    for {
        select {
        case <-r.done:
            logp.Info("Ending Registrar")
            return
        case <-flushC:
            flushC = nil
            timer.Stop()
            r.flushRegistry()
        case states := <-r.Channel:
            r.onEvents(states)
            if r.flushTimeout <= 0 {
                r.flushRegistry()
            } else if flushC == nil {
                timer = time.NewTimer(r.flushTimeout)
                flushC = timer.C
            }
        }
    }
}

registrar协程启动以后,在for循环里面会调用flushRegistry写registry文件,其实就是将内存中的数据写到磁盘。写的时机由filebeat.registry_flush配置项决定,其默认值为0。该配置项就是代码中的r.flushTimeout,等于0的时候,是每成功发送一批(batch)数据,就会写一次。如果大于0,则按照这个频次定期刷新。看下刷新的具体代码(有精简):

func (r *Registrar) flushRegistry() {
    if err := r.writeRegistry(); err != nil {
        logp.Err("Writing of registry returned error: %v. Continuing...", err)
    }
    // 省略
}

// writeRegistry writes the new json registry file to disk.
func (r *Registrar) writeRegistry() error {
    // 省略
    tempfile, err := writeTmpFile(r.registryFile, r.fileMode, states)
    if err != nil {
        registryFails.Inc()
        return err
    }

    err = helper.SafeFileRotate(r.registryFile, tempfile)
    if err != nil {
        registryFails.Inc()
        return err
    }

    logp.Debug("registrar", "Registry file updated. %d states written.", len(states))
    registrySuccess.Inc()

    return nil
}

func writeTmpFile(baseName string, perm os.FileMode, states []file.State) (string, error) {
    logp.Debug("registrar", "Write registry file: %s", baseName)

    tempfile := baseName + ".new"
    f, err := os.OpenFile(tempfile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_SYNC, perm)
    if err != nil {
        logp.Err("Failed to create tempfile (%s) for writing: %s", tempfile, err)
        return "", err
    }

    defer f.Close()

    encoder := json.NewEncoder(f)

    if err := encoder.Encode(states); err != nil {
        logp.Err("Error when encoding the states: %s", err)
        return "", err
    }

    // Commit the changes to storage to avoid corrupt registry files
    if err = f.Sync(); err != nil {
        logp.Err("Error when syncing new registry file contents: %s", err)
        return "", err
    }

    return tempfile, nil
}

从代码看着就非常清楚了,写的时候先写临时文件registry.new,写完以后重命名成registry(helper.SafeFileRotate(r.registryFile, tempfile))。我个人觉得恢复状态是一个比较重要的过程,而且又可能很慢,其实可以打印info级别的日志的(比如像Kafka集群恢复的时候会打印恢复过程,让用户知道集群目前在做什么)这样也不会对用户造成困扰。如果真的遇到这个问题,怎么来确认是不是在从registry恢复状态呢,这里我教你几个方法:

  • 修改日志级别改为debug,这个最直观,但不适用于生产环境。所以大家可以观察filebeat是否有输出这样一条日志:2019-07-04T14:52:43.987+0800 INFO crawler/crawler.go:106 Loading and starting Inputs completed. Enabled inputs: 1 ,状态恢复完毕之后,会打印这样一条日志,而这个日志是info级别的,且就在filebeat启动的时候打印。如果一直没有输出这条日志,那就是还在恢复。

好,现在问题一般就可以定位清楚了,但并没有被很好的解决。因为这个恢复时间太长了,特别是如果你的registry文件很大的话,恢复几个小时,十几个小时都是完全有可能的,在恢复完之前,filebeat都是不工作的。那如何避免呢?有两类方法。

  1. 不会丢数据的方法:将filebeat.registry_flush参数由默认的0改为非0。我测了一下,改成1s时,27MB(包含20万个文件的状态)不到3秒就可以加载完了。这样改的弊端就是当Filebeat故障时,重复采集的文件会多一些,因为持久化结果的频次降低了,但不会丢数据。大多数情况下,日志重复采集问题不大。
  2. 控制registry的大小,这种可能会丢数据。filebeat提供了两个clean开头的参数来让我们尽量控制registry文件的大小,但这两个参数都有可能在某些场景下导致数据丢失,所以使用时一定确保你特别清楚这两个参数的用途及副作用。官方对于参数描述比较详细,这里我只简单说明一下:

    • clean-inactive:这个参数配置一个时间,如果一个文件超过这个时间都没有变更过,那它的状态就会从registry文件中移除。这个时间的值必须大于ignore_older + scan_frequency两个参数的时间和,以保证一个文件在收集的时候不会被移除状态。需要注意,这个参数的默认值是0,表示这个功能是disable的,即默认没有开启
    • clean-removed:这个参数比较简单,就是如果文件从磁盘上移除了,那就会从registry中删除这个文件的状态,该参数默认是开启的

当然这两个参数除了用于减小registry文件的大小,也可以有效避免inode重用导致的问题(但无法杜绝)。Filebeat使用inode唯一标识一个文件,而不是文件名,而unix*系统的inode是可能重复的,一旦重复,如果这个状态还存储在registry文件中,那filebeat就会产生问题,表现出来的现象就是数据截断,这个以后再说。

最后附上恢复状态的代码,供有兴趣的同学研究。beat的代码更新非常快,变化也非常大,以下代码取自filebeat 6.4.3。加载状态的方法是loadStates,在github.com/elastic/beats/filebeat/input/log/input.go文件中:

// LoadStates loads states into input
// It goes through all states coming from the registry. Only the states which match the glob patterns of
// the input will be loaded and updated. All other states will not be touched.
func (p *Input) loadStates(states []file.State) error {
    logp.Debug("input", "exclude_files: %s. Number of stats: %d", p.config.ExcludeFiles, len(states))

    for _, state := range states {
        // Check if state source belongs to this input. If yes, update the state.
        if p.matchesFile(state.Source) && p.matchesMeta(state.Meta) {
            state.TTL = -1

            // In case a input is tried to be started with an unfinished state matching the glob pattern
            if !state.Finished {
                return fmt.Errorf("Can only start an input when all related states are finished: %+v", state)
            }

            // Update input states and send new states to registry
            err := p.updateState(state)
            if err != nil {
                logp.Err("Problem putting initial state: %+v", err)
                return err
            }
        }
    }

    logp.Debug("input", "input with previous states loaded: %v", p.states.Count())
    return nil
}

Filebeat虽然简单,但只是相对于ES、Logstash这些而言的,其本身代码量也非常大,只看官方文档其实只能明白怎么用,对于一些细节,还是只能从源码寻找答案,特别是遇到一些比较奇怪的问题或者有定制化需求的时候。

本文先介绍到这,后面会从源码角度再讨论Filebeat的一些机制和使用时的注意点。