Filebeat作为elastic公司使用Golang开发的新一代的日志采集工具,旨在替换原来Logstash的日志收集功能(注意,Logstash的核心功能包含收集和解析两大块,Filebeat的定位只是替代收集部分,所以Filebeat并不能完全取代Logstash)。因为其性能高,资源占用少且轻量级,已经逐渐被很多公司采用。而使用的问题也非常多,今天来讨论之前在elastic中文社区看到的一个问题:
filebeat采集log,每次filebeat程序断开,重新启动之后会生成一个registry.new文件,并且不会继续采集log,请问是什么原因呢?
要搞清楚这个问题,其实要回答两个问题:
- 为什么日志不继续采集?
- 为什么重启会生成一个registry.new文件?
本文的讨论都基于Filebeat6.4~6.8版本。如果你对Filebeat的配置项非常了解,其实这类不收集的问题很好排查。如果真的是不继续采集日志了,那原因很明确就是没有日志需要采集了,比如所有已存的日志已经都采集完了,而且没有新的日志产生。或者已经存在的日志不再采集范围内,这时候可通过以下一些点去排查:
- 检查配置中的
paths
,确认需要采集的文件是否包含在设置的采集路径里面;如果设置了正则,确保文件能被匹配到(paths支持的正则只是有限的正则); - 检查上面
paths
有没有配置enable
项,如果配置了确认是否为true
(如果没配置,默认就是true); - 检查
ignore_older
是否有配置,是不是日志文件的最后修改时间(可使用stat
命令查看)已经超过了忽略的时间点,即这些文件已经被忽略了; - 查看filebeat的日志,看filebeat是否在正常工作。如果是info级别的日志的话,每收集(打开)和关闭一个文件都会有打印一条日志,日志中包含文件的绝对路径及文件名。而且默认每30秒会打印一条monitor日志,里面包含了很多信息,包括正则运行的havester个数,已经确认的event等信息,看Filebeat状态是否正常;
- 如果是某个文件不收集,就是去registry文件里面搜一下这个文件,看是否已经收集过了,并且注意对比一下时间;
一般这样排查下来之后,问题原因就能找到。当然,注意上面有个前提,就是真的是不继续采集日志了。而重启后,一般都不是不采集日志了,而是filebeat在从registry文件恢复状态。而这个恢复是比较慢的,registry文件越大,恢复时间越长。我之前测试过一次,一个大概2MB的registry文件,恢复了约三四十分钟,而这个恢复过程中输出的日志都是debug级别的,也就是默认info级别的情况下,你会发现filebeat既不工作,也没有输出任何日志,而且因为恢复的时候是单线程(更准确的说是单个Goroutine)的,所以CPU也占用也很少,所以给人感觉好像是filebeat不工作了,也就是问题中描述的“不继续采集log”。
至于registry.new文件,其实不光是会在恢复期间会产生,它在任意时刻都可能会产生,这是Filebeat写registry文件的机制。结合代码简单分析一下:
// registrar.go
func (r *Registrar) Run() {
logp.Debug("registrar", "Starting Registrar")
// Writes registry on shutdown
defer func() {
r.writeRegistry()
r.wg.Done()
}()
var (
timer *time.Timer
flushC <-chan time.Time
)
for {
select {
case <-r.done:
logp.Info("Ending Registrar")
return
case <-flushC:
flushC = nil
timer.Stop()
r.flushRegistry()
case states := <-r.Channel:
r.onEvents(states)
if r.flushTimeout <= 0 {
r.flushRegistry()
} else if flushC == nil {
timer = time.NewTimer(r.flushTimeout)
flushC = timer.C
}
}
}
}
registrar协程启动以后,在for循环里面会调用flushRegistry
写registry文件,其实就是将内存中的数据写到磁盘。写的时机由filebeat.registry_flush
配置项决定,其默认值为0。该配置项就是代码中的r.flushTimeout
,等于0的时候,是每成功发送一批(batch)数据,就会写一次。如果大于0,则按照这个频次定期刷新。看下刷新的具体代码(有精简):
func (r *Registrar) flushRegistry() {
if err := r.writeRegistry(); err != nil {
logp.Err("Writing of registry returned error: %v. Continuing...", err)
}
// 省略
}
// writeRegistry writes the new json registry file to disk.
func (r *Registrar) writeRegistry() error {
// 省略
tempfile, err := writeTmpFile(r.registryFile, r.fileMode, states)
if err != nil {
registryFails.Inc()
return err
}
err = helper.SafeFileRotate(r.registryFile, tempfile)
if err != nil {
registryFails.Inc()
return err
}
logp.Debug("registrar", "Registry file updated. %d states written.", len(states))
registrySuccess.Inc()
return nil
}
func writeTmpFile(baseName string, perm os.FileMode, states []file.State) (string, error) {
logp.Debug("registrar", "Write registry file: %s", baseName)
tempfile := baseName + ".new"
f, err := os.OpenFile(tempfile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_SYNC, perm)
if err != nil {
logp.Err("Failed to create tempfile (%s) for writing: %s", tempfile, err)
return "", err
}
defer f.Close()
encoder := json.NewEncoder(f)
if err := encoder.Encode(states); err != nil {
logp.Err("Error when encoding the states: %s", err)
return "", err
}
// Commit the changes to storage to avoid corrupt registry files
if err = f.Sync(); err != nil {
logp.Err("Error when syncing new registry file contents: %s", err)
return "", err
}
return tempfile, nil
}
从代码看着就非常清楚了,写的时候先写临时文件registry.new,写完以后重命名成registry(helper.SafeFileRotate(r.registryFile, tempfile)
)。我个人觉得恢复状态是一个比较重要的过程,而且又可能很慢,其实可以打印info级别的日志的(比如像Kafka集群恢复的时候会打印恢复过程,让用户知道集群目前在做什么)这样也不会对用户造成困扰。如果真的遇到这个问题,怎么来确认是不是在从registry恢复状态呢,这里我教你几个方法:
- 修改日志级别改为
debug
,这个最直观,但不适用于生产环境。所以大家可以观察filebeat是否有输出这样一条日志:2019-07-04T14:52:43.987+0800 INFO crawler/crawler.go:106 Loading and starting Inputs completed. Enabled inputs: 1
,状态恢复完毕之后,会打印这样一条日志,而这个日志是info级别的,且就在filebeat启动的时候打印。如果一直没有输出这条日志,那就是还在恢复。
好,现在问题一般就可以定位清楚了,但并没有被很好的解决。因为这个恢复时间太长了,特别是如果你的registry文件很大的话,恢复几个小时,十几个小时都是完全有可能的,在恢复完之前,filebeat都是不工作的。那如何避免呢?有两类方法。
- 不会丢数据的方法:将
filebeat.registry_flush
参数由默认的0改为非0。我测了一下,改成1s时,27MB(包含20万个文件的状态)不到3秒就可以加载完了。这样改的弊端就是当Filebeat故障时,重复采集的文件会多一些,因为持久化结果的频次降低了,但不会丢数据。大多数情况下,日志重复采集问题不大。 控制registry的大小,这种可能会丢数据。filebeat提供了两个
clean
开头的参数来让我们尽量控制registry文件的大小,但这两个参数都有可能在某些场景下导致数据丢失,所以使用时一定确保你特别清楚这两个参数的用途及副作用。官方对于参数描述比较详细,这里我只简单说明一下:- clean-inactive:这个参数配置一个时间,如果一个文件超过这个时间都没有变更过,那它的状态就会从registry文件中移除。这个时间的值必须大于
ignore_older + scan_frequency
两个参数的时间和,以保证一个文件在收集的时候不会被移除状态。需要注意,这个参数的默认值是0,表示这个功能是disable的,即默认没有开启。 - clean-removed:这个参数比较简单,就是如果文件从磁盘上移除了,那就会从registry中删除这个文件的状态,该参数默认是开启的。
- clean-inactive:这个参数配置一个时间,如果一个文件超过这个时间都没有变更过,那它的状态就会从registry文件中移除。这个时间的值必须大于
当然这两个参数除了用于减小registry文件的大小,也可以有效避免inode重用导致的问题(但无法杜绝)。Filebeat使用inode唯一标识一个文件,而不是文件名,而unix*系统的inode是可能重复的,一旦重复,如果这个状态还存储在registry文件中,那filebeat就会产生问题,表现出来的现象就是数据截断,这个以后再说。
最后附上恢复状态的代码,供有兴趣的同学研究。beat的代码更新非常快,变化也非常大,以下代码取自filebeat 6.4.3。加载状态的方法是loadStates
,在github.com/elastic/beats/filebeat/input/log/input.go文件中:
// LoadStates loads states into input
// It goes through all states coming from the registry. Only the states which match the glob patterns of
// the input will be loaded and updated. All other states will not be touched.
func (p *Input) loadStates(states []file.State) error {
logp.Debug("input", "exclude_files: %s. Number of stats: %d", p.config.ExcludeFiles, len(states))
for _, state := range states {
// Check if state source belongs to this input. If yes, update the state.
if p.matchesFile(state.Source) && p.matchesMeta(state.Meta) {
state.TTL = -1
// In case a input is tried to be started with an unfinished state matching the glob pattern
if !state.Finished {
return fmt.Errorf("Can only start an input when all related states are finished: %+v", state)
}
// Update input states and send new states to registry
err := p.updateState(state)
if err != nil {
logp.Err("Problem putting initial state: %+v", err)
return err
}
}
}
logp.Debug("input", "input with previous states loaded: %v", p.states.Count())
return nil
}
Filebeat虽然简单,但只是相对于ES、Logstash这些而言的,其本身代码量也非常大,只看官方文档其实只能明白怎么用,对于一些细节,还是只能从源码寻找答案,特别是遇到一些比较奇怪的问题或者有定制化需求的时候。
本文先介绍到这,后面会从源码角度再讨论Filebeat的一些机制和使用时的注意点。
2021.4.26更新
registry.new
的分析filebeat.registry_flush
参数说明