NYC's Blog - Filebeat http://niyanchun.com/tag/Filebeat/ zh-CN Sat, 06 Mar 2021 12:05:00 +0800 Sat, 06 Mar 2021 12:05:00 +0800 Filebeat源码浅析 http://niyanchun.com/filebeat-source-learning.html http://niyanchun.com/filebeat-source-learning.html Sat, 06 Mar 2021 12:05:00 +0800 NYC 本文对Filebeat代码进行简单分析,作为之前 Filebeat导致文件无法被删除的原因分析 一文的补充,当然也可单独阅读,了解Filebeat的代码逻辑。需要注意的是:本文不是全面、深度的Filebeat源码剖析,而是专注在通用配置下核心数据的流转上面,目标是理清楚数据从采集到中间流转,最后到发送的流程,而不是对每处代码细节进行分析讲解。本文核心点包括:

  1. Filebeat实例的创建和初始化流程;
  2. 文件的遍历、采集,包括Crawler、Input、Harvester、Reader等核心概念;
  3. 数据发送逻辑,即Publisher/Pipeline;
  4. Filebeat的ACK机制。

另外,本文包含大量代码,但为了尽量减少代码所占篇幅和突出重点逻辑,大部分代码都做了删减,有的地方有说明,有的没有说明。如果想看完整代码,建议按照下面的指导搭建自己的源码环境,对照着阅读。当然,本文假设你已经对Filebeat基本使用比较熟悉了。

源码准备

Filebeat是Elastic公司Beat系列的一员,所有Beat的源码都归档在一起,所以下载源码直接克隆Beat的代码即可。需要注意的是早期Go的模块依赖使用的是govendor,这种方式需要代码在GOPATH的src目录下面,而Go 1.11版本中引入了Go module作为新的模块依赖,并且在1.12版本中正式启用,作为默认选项(当然用户可以通过GO111MODULE这个环境变量来控制是否使用Go module功能)。这个特性的转变对于搭建Filebeat的源码环境也有一些影响,查看一下你想阅读的Beat的分支是否有go.mod文件,如果有则说明使用的是Go module,否则还是老的govendor。如果是govendor,那你必须将代码放到${GOPATH}/src/github.com/elastic,且GO111MODULE设置为auto或者off;如果是Go module,则没有这个要求。至于Golang的版本,不要比官方要求的版本低就行。

本文的讲解是基于6.8分支的代码(6.8.15),该版本的beat模块依赖还是早期的govendor,所以用如下命令搭建源码环境:

mkdir -p ${GOPATH}/src/github.com/elastic
git clone https://github.com/elastic/beats ${GOPATH}/src/github.com/elastic/beats
cd ${GOPATH}/src/github.com/elastic/beats
git checkout 6.8
如果你无法科学上网,从github拉取beat的代码会很慢(主要是因为.git目录非常大),所以我把自己的代码打包传到了百度网盘,实在从github上拉不下来的可以直接下载我上传的(点此进入下载页面,密码: 6bro)。我提供了2个版本,一个包含git,一个不包含,都不影响代码阅读,没有git的好处是代码会小很多。

我的Golang版本是1.15.7,编辑器是vscode。代码下载好以后,直接运行根目录下面的filebeat目录中的main.go,如果可以运行,那环境就算搭建好了。注意:如果提示配置文件filebeat.yml找不到,那环境也没问题,可能是因为你启动Filebeat的目录没有这个配置文件,可以通过-c <配置文件>这个命令行参数去指定一个filebeat配置即可。另外,Filebeat日志默认只输出到文件,如果调试的时候想输出到终端,再增加一个-e参数。如果是vscode,则直接修改.vscode/launch.json即可:

{
    // Use IntelliSense to learn about possible attributes.
    // Hover to view descriptions of existing attributes.
    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Launch file",
            "type": "go",
            "request": "launch",
            "mode": "debug",
            "program": "${file}",
            "args": ["-e"]
        }
    ]
}

filebeat与libbeat

熟悉Beat的应该都知道Beat里面有一个核心的libbeat模块,实现了很多基础功能,比如实例初始化、公共配置、队列、输出(output)、日志等,不同的Beat往往只需要实现如何采集数据,然后将数据以事件(event)的方式通过channel发给libbeat即可,这种方式大大简化了各个Beat的开发。下面是官方的一张说明Beat与libbeat关系的图:

beat-overview

上面提到的事件是在libbeat/beat/event.go中定义的:

// Event is the common event format shared by all beats.
// Every event must have a timestamp and provide encodable Fields in `Fields`.
// The `Meta`-fields can be used to pass additional meta-data to the outputs.
// Output can optionally publish a subset of Meta, or ignore Meta.
type Event struct {
    Timestamp time.Time
    Meta      common.MapStr
    Fields    common.MapStr
    Private   interface{} // for beats private use
}

Filebeat作为Beat的一员,也遵从上面的逻辑:Filebeat实现了文件的采集,并以event的方式发送给libbeat的Publisher,后续的工作都由libbeat完成。所以了解Filebeat完整的数据流程包含Filebeat和libbeat两部分代码。对应的代码在根目录下的filebeat目录和libbeat目录。为了方便理解,这里先给出一个精简的数据流图:

Filebeat精简数据流

各部分主要功能是:

  • Filebeat instance:Filebeat实例的创建和初始化;
  • Crawler:遍历配置中的input,创建、初始化Input实例;
  • Input:遍历配置的目录、文件,为每个需要采集的文件创建、初始化Harvester;
  • Harvester:读取文件内容,以event的方式发送到后端的队列(本文以最常使用的内存队列为例讲解);
  • Queue/Spooler:Filebeat内部的队列有两种:内存队列(Broker)和文件队列(Spooler),代码中文件队列叫spooler,但大多数地方spooler是queue的代名词,而不专指文件队列;
  • Publisher:libbeat中的发送逻辑,整个发送流程在代码中称为pipeline
  • Registrar:记录采集过的文件及位置(offset),和ACK机制一起实现Filebeat承诺的至少发送一次(at-least once)的保证。

下面分别介绍这些模块。

实例初始化 && Registrar

第一步是Filebeat的实例初始化。libbeat定义了一个Beater接口,不同的Beat实现这个接口,即可复用libbeat定义的诸多基础功能,这个接口定义如下:

// libbeat/beat/beat.go

// Beater is the interface that must be implemented by every Beat. A Beater
// provides the main Run-loop and a Stop method to break the Run-loop.
// Instantiation and Configuration is normally provided by a Beat-`Creator`.
//
// Once the beat is fully configured, the Run() method is invoked. The
// Run()-method implements the beat its run-loop. Once the Run()-method returns,
// the beat shuts down.
//
// The Stop() method is invoked the first time (and only the first time) a
// shutdown signal is received. The Stop()-method normally will stop the Run()-loop,
// such that the beat can gracefully shutdown.
type Beater interface {
    // The main event loop. This method should block until signalled to stop by an
    // invocation of the Stop() method.
    Run(b *Beat) error

    // Stop is invoked to signal that the Run method should finish its execution.
    // It will be invoked at most once.
    Stop()
}

// Beat contains the basic beat data and the publisher client used to publish
// events.
type Beat struct {
    Info      Info     // beat metadata.
    Publisher Pipeline // Publisher pipeline

    SetupMLCallback SetupMLCallback // setup callback for ML job configs
    InSetupCmd      bool            // this is set to true when the `setup` command is called

    OverwritePipelinesCallback OverwritePipelinesCallback // ingest pipeline loader callback
    // XXX: remove Config from public interface.
    //      It's currently used by filebeat modules to setup the Ingest Node
    //      pipeline and ML jobs.
    Config *BeatConfig // Common Beat configuration data.

    BeatConfig *common.Config // The beat's own configuration section

    Fields []byte // Data from fields.yml

    ConfigManager management.ConfigManager // config manager
}

Filebeat在filebeat/beater/filebeat.go中实现了这个接口。我们看下Filebeat是如何从main函数(filebeat/main.go)流转到filebeat.go的:

// filebeat/main.go

// The basic model of execution:
// - input: finds files in paths/globs to harvest, starts harvesters
// - harvester: reads a file, sends events to the spooler
// - spooler: buffers events until ready to flush to the publisher
// - publisher: writes to the network, notifies registrar
// - registrar: records positions of files read
// Finally, input uses the registrar information, on restart, to
// determine where in each file to restart a harvester.
func main() {
    if err := cmd.RootCmd.Execute(); err != nil {
        os.Exit(1)
    }
}

main前面的注释概述了Filebeat的运行逻辑,也是前面图的由来。main方法里面的代码很简单,下面直接给出关键的调用关系:

  1. filebeat/main.go#main
  2. filebeat/cmd/root.go#init: RootCmd = cmd.GenRootCmdWithRunFlags(Name, "", beater.New, runFlags)
  3. libbeat/cmd/root.go#GenRootCmdWithSettings: rootCmd.RunCmd = genRunCmd(settings, beatCreator, runFlags)
  4. libbeat/cmd/run.go#genRunCmd: err := instance.Run(settings, beatCreator)
  5. libbeat/cmd/instance/beat.go#Run: return b.launch(settings, bt)
  6. libbeat/cmd/instance/beat.go#launch: return beater.Run(&b.Beat)
  7. filebeat/beater/filebeat.go#Run

这部分的代码流转主要是在libbeat中,所以是一些比较通用化的配置,主要是构造Beater实例(此处是Filebeat实例),并最终进入到filebeat.go的Run方法中。Run方法中有两个比较重要的操作:registrar和crawler的创建、启动。

Filebeat的at-least once采集就是通过Registrar模块实现的,该文件会记录采集过的文件以及最后采集的位置,并且只有收到数据成功发送的确认后才会更新,其核心结构如下:

// filebeat/registrar/registrar.go
type Registrar struct {
    Channel      chan []file.State
    out          successLogger
    done         chan struct{}
    registryFile string      // Path to the Registry File
    fileMode     os.FileMode // Permissions to apply on the Registry File
    wg           sync.WaitGroup

    states               *file.States // Map with all file paths inside and the corresponding state
    gcRequired           bool         // gcRequired is set if registry state needs to be gc'ed before the next write
    gcEnabled            bool         // gcEnabled indicates the registry contains some state that can be gc'ed in the future
    flushTimeout         time.Duration
    bufferedStateUpdates int
}

每次重启Filebeat后,在registrar启动的时候会调用func (r *Registrar) loadStates()读取registry文件中的内容,和需要采集的文件作对比(func (p *Input) loadStates(states []file.State)),确认哪些文件采集过了,哪些有更新,需要接着上次的地方继续采集。

确认消息成功发送的回调(通过channel实现)函数是在Run方法里面注册的(这部分后面还会专门讲解):

err = b.Publisher.SetACKHandler(beat.PipelineACKHandler{
    ACKEvents: newEventACKer(finishedLogger, registrarChannel).ackEvents,
})

Registrar的记录文件默认为filebeat安装目录/data/registry,示例内容如下(为了方便查看,进行了格式化):

[{
    "source": "/Users/allan/Desktop/temp/test-logs/test-1.log",
    "offset": 362,
    "timestamp": "2021-02-28T21:28:55.435218+08:00",
    "ttl": -1,
    "type": "log",
    "meta": null,
    "FileStateOS": {
        "inode": 8712351738,
        "device": 16777225
    }
}, {
    "source": "/Users/allan/Desktop/temp/test-logs/test-2.log",
    "offset": 362,
    "timestamp": "2021-02-28T21:28:55.24922+08:00",
    "ttl": -1,
    "type": "log",
    "meta": null,
    "FileStateOS": {
        "inode": 8712603538,
        "device": 16777225
    }
}]

另外,为了保证数据不丢失,实例初始化的时候各大模块的启动顺序依次是:registrar --> publisher --> spooler --> crawler/input. 而实例停止的时候各大模块的停止顺序则正好相反。下面看Crawler和Input。

输入/文件遍历(Crawler && Input)

Crawler

我们知道filebeat采集数据是通过在配置文件里面配置若干个input实现的,而Crawler的作用就是解析这些input,并创建、启动Input。Crawler的核心代码在filebeat/crawler/crawler.go中。

核心结构定义如下:

type Crawler struct {
    inputs          map[uint64]*input.Runner   // 包含若干个Input
    inputConfigs    []*common.Config
    out             channel.Factory
    wg              sync.WaitGroup
    InputsFactory   cfgfile.RunnerFactory
    ModulesFactory  cfgfile.RunnerFactory
    modulesReloader *cfgfile.Reloader
    inputReloader   *cfgfile.Reloader
    once            bool
    beatVersion     string
    beatDone        chan struct{}
}

在Start方法中遍历配置中的input,并创建、启动input:

// Start starts the crawler with all inputs
func (c *Crawler) Start(
    pipeline beat.Pipeline,
    r *registrar.Registrar,
    configInputs *common.Config,
    configModules *common.Config,
    pipelineLoaderFactory fileset.PipelineLoaderFactory,
    overwritePipelines bool,
) error {

    logp.Info("Loading Inputs: %v", len(c.inputConfigs))

    // Prospect the globs/paths given on the command line and launch harvesters
    for _, inputConfig := range c.inputConfigs {
        err := c.startInput(pipeline, inputConfig, r.GetStates())
        if err != nil {
            return err
        }
    }

    // 省略部分代码
    logp.Info("Loading and starting Inputs completed. Enabled inputs: %v", len(c.inputs))

    return nil
}

// 创建、启动Input
func (c *Crawler) startInput(
    pipeline beat.Pipeline,
    config *common.Config,
    states []file.State,
) error {
    if !config.Enabled() {
        return nil
    }

    connector := channel.ConnectTo(pipeline, c.out)
    p, err := input.New(config, connector, c.beatDone, states, nil)
    if err != nil {
        return fmt.Errorf("Error in initing input: %s", err)
    }
    p.Once = c.once

    if _, ok := c.inputs[p.ID]; ok {
        return fmt.Errorf("Input with same ID already exists: %d", p.ID)
    }

    c.inputs[p.ID] = p

    p.Start()

    return nil
}

Input

Filebeat支持多种类型的Input,比如log、redis、stdin、docker等,这些代码都在filebeat/input目录,不同类型的目录下面实现了特定的input。通用Input接口定义如下:

// filebeat/input/input.go
// Input is the interface common to all input
type Input interface {
    Run()
    Stop()
    Wait()
}

// Start starts the input
func (p *Runner) Start() {
    p.wg.Add(1)
    logp.Info("Starting input of type: %v; ID: %d ", p.config.Type, p.ID)

    // 省略部分代码

    // Add waitgroup to make sure input is finished
    go func() {
        defer func() {
            onceWg.Done()
            p.stop()
            p.wg.Done()
        }()

        p.Run()
    }()
}

// Run starts scanning through all the file paths and fetch the related files. Start a harvester for each file
func (p *Runner) Run() {
    // Initial input run
    p.input.Run()

    // 省略部分代码
}

此处以最常用的log类型为例进行说明。p.input.Run()会跳转到filebeat/log/input.go#Run

// Input contains the input and its config
type Input struct {
    cfg           *common.Config
    config        config
    states        *file.States
    harvesters    *harvester.Registry   // 1个input包含多个harvesters
    outlet        channel.Outleter
    stateOutlet   channel.Outleter
    done          chan struct{}
    numHarvesters atomic.Uint32
    meta          map[string]string
    stopOnce      sync.Once
}


// Run runs the input
func (p *Input) Run() {
    logp.Debug("input", "Start next scan")

    // TailFiles is like ignore_older = 1ns and only on startup
    if p.config.TailFiles {
        ignoreOlder := p.config.IgnoreOlder

        // Overwrite ignore_older for the first scan
        p.config.IgnoreOlder = 1
        defer func() {
            // Reset ignore_older after first run
            p.config.IgnoreOlder = ignoreOlder
            // Disable tail_files after the first run
            p.config.TailFiles = false
        }()
    }
    p.scan()

    // It is important that a first scan is run before cleanup to make sure all new states are read first
    if p.config.CleanInactive > 0 || p.config.CleanRemoved {
        beforeCount := p.states.Count()
        cleanedStates, pendingClean := p.states.Cleanup()
        logp.Debug("input", "input states cleaned up. Before: %d, After: %d, Pending: %d",
            beforeCount, beforeCount-cleanedStates, pendingClean)
    }

    // Marking removed files to be cleaned up. Cleanup happens after next scan to make sure all states are updated first
    if p.config.CleanRemoved {
        for _, state := range p.states.GetStates() {
            // os.Stat will return an error in case the file does not exist
            stat, err := os.Stat(state.Source)
            if err != nil {
                if os.IsNotExist(err) {
                    p.removeState(state)
                    logp.Debug("input", "Remove state for file as file removed: %s", state.Source)
                } else {
                    logp.Err("input state for %s was not removed: %s", state.Source, err)
                }
            } else {
                // Check if existing source on disk and state are the same. Remove if not the case.
                newState := file.NewState(stat, state.Source, p.config.Type, p.meta)
                if !newState.FileStateOS.IsSame(state.FileStateOS) {
                    p.removeState(state)
                    logp.Debug("input", "Remove state for file as file removed or renamed: %s", state.Source)
                }
            }
        }
    }
}

对Filebeat配置比较熟悉的朋友看到这部分代码,应该很亲切,变量的命名和配置项几乎是对应的,很多判断逻辑都是对配置项的处理,很容易理解。其中比较关键的是p.scan():

// Scan starts a scanGlob for each provided path/glob
func (p *Input) scan() {
    var sortInfos []FileSortInfo
    var files []string

    // 获取目录下需要被采集的文件,是否需要被采集的逻辑就是在getFiles()方法中实现
    paths := p.getFiles()

    // 省略部分代码

    for i := 0; i < len(paths); i++ {
        // 省略一些判断是否采集过,以及采集到哪里的代码

        // Decides if previous state exists
        if lastState.IsEmpty() {
            logp.Debug("input", "Start harvester for new file: %s", newState.Source)
            // 启动harvester
            err := p.startHarvester(newState, 0)
            // 省略错误处理代码
        } else {
            p.harvestExistingFile(newState, lastState)
        }
    }
}

// harvestExistingFile continues harvesting a file with a known state if needed
func (p *Input) harvestExistingFile(newState file.State, oldState file.State) {
    logp.Debug("input", "Update existing file for harvesting: %s, offset: %v", newState.Source, oldState.Offset)

    if oldState.Finished && newState.Fileinfo.Size() > oldState.Offset {
        logp.Debug("input", "Resuming harvesting of file: %s, offset: %d, new size: %d", newState.Source, oldState.Offset, newState.Fileinfo.Size())
        // 启动harvester采集
        err := p.startHarvester(newState, oldState.Offset)
        if err != nil {
            logp.Err("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err)
        }
        return
    }
    // 省略后续代码
}

// startHarvester starts a new harvester with the given offset
// In case the HarvesterLimit is reached, an error is returned
func (p *Input) startHarvester(state file.State, offset int64) error {
    if p.numHarvesters.Inc() > p.config.HarvesterLimit && p.config.HarvesterLimit > 0 {
        p.numHarvesters.Dec()
        harvesterSkipped.Add(1)
        return errHarvesterLimit
    }
    // Set state to "not" finished to indicate that a harvester is running
    state.Finished = false
    state.Offset = offset

    // Create harvester with state
    h, err := p.createHarvester(state, func() { p.numHarvesters.Dec() })
    if err != nil {
        p.numHarvesters.Dec()
        return err
    }

    err = h.Setup()
    if err != nil {
        p.numHarvesters.Dec()
        return fmt.Errorf("error setting up harvester: %s", err)
    }

    // Update state before staring harvester
    // This makes sure the states is set to Finished: false
    // This is synchronous state update as part of the scan
    h.SendStateUpdate()

    if err = p.harvesters.Start(h); err != nil {
        p.numHarvesters.Dec()
    }
    return err
}

scan代码的核心逻辑就是遍历目录下的文件,找到需要采集的文件后就创建启动一个harvester实例进行采集。最后面的startHarvester方法中先是创建了一个harvester实例(p.createHarvester),然后配置(h.Setup)该实例,最后启动(p.harvesters.Start(h))实例。这部分在接下来的Harvester部分介绍。

filebeat/log/input.go文件中的代码中包含了大量配置项的代码逻辑,建议好好看一下。如果你对input部分的配置项比较熟悉,这部分代码看起来也比较简单。对照配置项说明文档进行查看,效果更佳。

数据采集Harvester

Harvester

一个Harvester就是一个goroutine,接口定义在filebeat/harvester/harvester.go中:

// Harvester contains all methods which must be supported by each harvester
// so the registry can be used by the input
type Harvester interface {
    ID() uuid.UUID      
    Run() error
    Stop()
}

不同的input类型会实现自己的Harvester,log类型的Harvester实现在filebeat/input/log/harvester.go中,核心结构定义如下:

// Harvester contains all harvester related data
type Harvester struct {
    id     uuid.UUID
    config config
    source harvester.Source // the source being watched

    // shutdown handling
    done     chan struct{}
    stopOnce sync.Once
    stopWg   *sync.WaitGroup
    stopLock sync.Mutex

    // internal harvester state
    state  file.State
    states *file.States
    log    *Log

    // file reader pipeline
    reader          reader.Reader
    encodingFactory encoding.EncodingFactory
    encoding        encoding.Encoding

    // event/state publishing
    outletFactory OutletFactory
    publishState  func(*util.Data) bool

    onTerminate func()
}

// Run start the harvester and reads files line by line and sends events to the defined output
func (h *Harvester) Run() error {
    // 该方法在上篇文章中已经介绍过,下面省略掉了大部分代码,只保留了读取和发送
    for {
        // 读取
        message, err := h.reader.Next()

        // 发送
        if !h.sendEvent(data, forwarder) {
            return nil
        }
    }
}

上篇文章中已经介绍过Harvester,其核心任务就是打开文件,根据配置读取文件内容,并发送。读取和发送都是在Run中实现,见上面的代码注释。本文再补充介绍下另外一个关键点reader.Reader

各种Reader

其实读取的真正操作是由一系列Reader完成的。Reader接口定义在filebeat/reader/reader.go中:

// Reader is the interface that wraps the basic Next method for
// getting a new message.
// Next returns the message being read or and error. EOF is returned
// if reader will not return any new message on subsequent calls.
type Reader interface {
    Next() (Message, error)
}

// Message represents a reader event with timestamp, content and actual number
// of bytes read from input before decoding.
type Message struct {
    Ts      time.Time     // timestamp the content was read
    Content []byte        // actual content read
    Bytes   int           // total number of bytes read to generate the message
    Fields  common.MapStr // optional fields that can be added by reader
}

该接口只包含一个Next方法,每调用一次,则读取一个Message。Filebeat实现了很多种Reader,这些Reader根据用户的配置形成一个调用链,对最原始的数据依次进行处理,就像一个流水线一样。每一个后面的Reader都包含了前面的Reader。这些Reader都定义在filebeat/reader目录,主要包括下面这些:

filebeat-reader

最底层的log.go#Read并不是一个Filebeat的reader.Reader实现,而是直接调用了Go底层的Read,实现读取指定长度的字节数据:

// filebeat/input/log/log.go
func (f *Log) Read(buf []byte) (int, error) {
    for {
        n, err := f.fs.Read(buf)  // 调用go底层os/file.go#Read
    }
}

// os/file.go#Read
// Read reads up to len(b) bytes from the File.
// It returns the number of bytes read and any error encountered.
// At end of file, Read returns 0, io.EOF.
func (f *File) Read(b []byte) (n int, err error) {
    if err := f.checkValid("read"); err != nil {
        return 0, err
    }
    n, e := f.read(b)
    return n, f.wrapErr("read", e)
}

再往上都是各种reader.Reader的实现,依次如下(都省略了部分代码):

LineReader,实现逐行读取的功能:

// filebeat/reader/readfile/line.go
// lineReader reads lines from underlying reader, decoding the input stream
// using the configured codec. The reader keeps track of bytes consumed
// from raw input stream for every decoded line.
type LineReader struct {
    reader     io.Reader
    codec      encoding.Encoding
    bufferSize int
    maxBytes   int
    nl         []byte
    inBuffer   *streambuf.Buffer
    outBuffer  *streambuf.Buffer
    inOffset   int // input buffer read offset
    byteCount  int // number of bytes decoded from input buffer into output buffer
    decoder    transform.Transformer
}

// Next reads the next line until the new line character
func (r *LineReader) Next() ([]byte, int, error) {
    for {
        err := r.advance()
    }
}

// Reads from the buffer until a new line character is detected
// Returns an error otherwise
func (r *LineReader) advance() error {
    // fill inBuffer until '\n' sequence has been found in input buffer
    for idx == -1 {
        // try to read more bytes into buffer   filebeat/input/log/log.go#Read
        n, err := r.reader.Read(buf)
    }
}

EncoderReader,对行数据进行编解码:

// filebeat/reader/readfile/encode.go
// Reader produces lines by reading lines from an io.Reader
// through a decoder converting the reader it's encoding to utf-8.
type EncoderReader struct {
    reader *LineReader
}

// Next reads the next line from it's initial io.Reader
// This converts a io.Reader to a reader.reader
func (r EncoderReader) Next() (reader.Message, error) {
    c, sz, err := r.reader.Next()
    // Creating message object
    return reader.Message{
        Ts:      time.Now(),
        Content: c,
        Bytes:   sz,
    }, err
}

JSON Reader,处理JSON格式的数据:

// filebeat/reader/json/json.go
type JSON struct {
    reader reader.Reader
    cfg    *Config
}

// Next decodes JSON and returns the filled Line object.
func (r *JSON) Next() (reader.Message, error) {
}

StripNewline Reader,去掉后面的换行符:

// filebeat/reader/readfile/strip_newline.go
// StripNewline reader removes the last trailing newline characters from
// read lines.
type StripNewline struct {
    reader reader.Reader
}

// Next returns the next line.
func (p *StripNewline) Next() (reader.Message, error) {
}

Timeout Reader,超时处理:

// filebeat/reader/readfile/timeout.go
// TimeoutReader will signal some configurable timeout error if no
// new line can be returned in time.
type TimeoutReader struct {
    reader  reader.Reader
    timeout time.Duration
    signal  error
    running bool
    ch      chan lineMessage
}

// Next returns the next line. If no line was returned before timeout, the
// configured timeout error is returned.
// For handline timeouts a goroutine is started for reading lines from
// configured line reader. Only when underlying reader returns an error, the
// goroutine will be finished.
func (r *TimeoutReader) Next() (reader.Message, error) {
}

Multiline Reader,多行合并处理:

// filebeat/reader/multiline/multiline.go
// MultiLine reader combining multiple line events into one multi-line event.
//
// Lines to be combined are matched by some configurable predicate using
// regular expression.
//
// The maximum number of bytes and lines to be returned is fully configurable.
// Even if limits are reached subsequent lines are matched, until event is
// fully finished.
//
// Errors will force the multiline reader to return the currently active
// multiline event first and finally return the actual error on next call to Next.
type Reader struct {
    reader       reader.Reader
    pred         matcher
    flushMatcher *match.Matcher
    maxBytes     int // bytes stored in content
    maxLines     int
    separator    []byte
    last         []byte
    numLines     int
    truncated    int
    err          error // last seen error
    state        func(*Reader) (reader.Message, error)
    message      reader.Message
}

// Next returns next multi-line event.
func (mlr *Reader) Next() (reader.Message, error) {
    return mlr.state(mlr)
}

LimitReader,单个event长度限制:

// filebeat/reader/readfile/limit.go
// Reader sets an upper limited on line length. Lines longer
// then the max configured line length will be snapped short.
type LimitReader struct {
    reader   reader.Reader
    maxBytes int
}

// Next returns the next line.
func (r *LimitReader) Next() (reader.Message, error) {
    message, err := r.reader.Next()
    if len(message.Content) > r.maxBytes {
        message.Content = message.Content[:r.maxBytes]
        message.AddFlagsWithKey("log.flags", "truncated")
    }
    return message, err
}

这么多的Reader并非在所有场景下都是必须的,需要根据用户配置进行装配,这个操作是在初始化Harvester时进行的:

// Setup opens the file handler and creates the reader for the harvester
func (h *Harvester) Setup() error {
    err := h.open()
    if err != nil {
        return fmt.Errorf("Harvester setup failed. Unexpected file opening error: %s", err)
    }

    // 在newLogFileReader中根据用户配置装配 Reader 流
    h.reader, err = h.newLogFileReader()
    if err != nil {
        if h.source != nil {
            h.source.Close()
        }
        return fmt.Errorf("Harvester setup failed. Unexpected encoding line reader error: %s", err)
    }

    return nil
}

Harvester的Run方法中通过无限循环调用h.reader.Next()时,会依次递归调用这些Reader的Next方法,加工数据,得到的最终数据发给给后端。

发送逻辑(Harvester内部)

发送的调用流程如下:

  1. filebeat/input/log/harvester.go#Run(): h.sendEvent(data, forwarder)
  2. filebeat/input/log/harvester.go#sendEvent: forwarder.Send(data)
  3. filebeat/harvester/forwarder.go#Send: f.Outlet.OnEvent(data)
  4. filebeat/channel/util.go#OnEvent

实际的发送是由filebeat/channel/util.gofilebeat/channel/outlet.go中的代码协作完成的:

filebeat/channel/util.go:

// filebeat/channel/util.go
func (o *subOutlet) OnEvent(d *util.Data) bool {

    o.mutex.Lock()
    defer o.mutex.Unlock()
    select {
    case <-o.done:
        return false
    default:
    }

    select {
    case <-o.done:
        return false
    // 数据写入channel
    case o.ch <- d:
        select {
        case <-o.done:
            return true
        // 等待结果
        case ret := <-o.res:
            return ret
        }
    }
}

// filebeat/channel/util.go
// SubOutlet create a sub-outlet, which can be closed individually, without closing the
// underlying outlet.
func SubOutlet(out Outleter) Outleter {
    s := &subOutlet{
        done: make(chan struct{}),
        ch:   make(chan *util.Data),
        res:  make(chan bool, 1),
    }

    go func() {
        // 从channel读取数据,并调用OnEvent发送数据
        for event := range s.ch {
            s.res <- out.OnEvent(event)
        }
    }()

    return s
}

filebeat/channel/outlet.go:

func (o *outlet) OnEvent(d *util.Data) bool {
    if !o.isOpen.Load() {
        return false
    }

    event := d.GetEvent()
    if d.HasState() {
        event.Private = d.GetState()
    }

    if o.wg != nil {
        o.wg.Add(1)
    }

    o.client.Publish(event) // 跳到libbeat/publisher/pipeline/client.go#Publish

    return o.isOpen.Load()
}

这部分代码逻辑是:

  1. 数据经由filebeat/channel/util.go#OnEvent中的case o.ch <- d写入到channel中,然后在内层的select中等待结果;
  2. 同时filebeat/channel/util.go#Outleter里面的一个goroutine会读取写入到channel的数据,并调用filebeat/channel/outlet.go#OnEvent中的o.client.Publish(event)发送数据,发送成功后,将结果写回到subOutlet的res channel,然后harvester返回。

上篇文章中harvester关闭不掉,就是因为卡在了o.client.Publish(event)无法返回,所以harvester卡在了subOutlet的res channel,即一直等待发送结果。

到目前为止,数据都是在Filebeat的代码中流转的,此处调用o.client.Publish(event)之后,就会进入到libbeat中(libbeat/publisher/pipeline/client.go#Publish)。

数据发送Publisher

从这部分开始,代码进入到libbeat中,这部分代码的复杂主要体现在涉及概念比较多,流转流程比较长。

Pipeline

Beat把Publisher中的整个流程称之为pipeline,是libbeat中非常核心的一部分,代码在libbeat/publisher/pipeline/pipeline.go中,这里贴一下包含较详细注释部分的代码:

// Package pipeline combines all publisher functionality (processors, queue,
// outputs) to create instances of complete publisher pipelines, beats can
// connect to publish events to.
package pipeline

// Pipeline implementation providint all beats publisher functionality.
// The pipeline consists of clients, processors, a central queue, an output
// controller and the actual outputs.
// The queue implementing the queue.Queue interface is the most central entity
// to the pipeline, providing support for pushung, batching and pulling events.
// The pipeline adds different ACKing strategies and wait close support on top
// of the queue. For handling ACKs, the pipeline keeps track of filtered out events,
// to be ACKed to the client in correct order.
// The output controller configures a (potentially reloadable) set of load
// balanced output clients. Events will be pulled from the queue and pushed to
// the output clients using a shared work queue for the active outputs.Group.
// Processors in the pipeline are executed in the clients go-routine, before
// entering the queue. No filtering/processing will occur on the output side.
type Pipeline struct {
    beatInfo beat.Info

    logger *logp.Logger
    queue  queue.Queue
    output *outputController

    observer observer

    eventer pipelineEventer

    // wait close support
    waitCloseMode    WaitCloseMode
    waitCloseTimeout time.Duration
    waitCloser       *waitCloser

    // pipeline ack
    ackMode    pipelineACKMode
    ackActive  atomic.Bool
    ackDone    chan struct{}
    ackBuilder ackBuilder
    eventSema  *sema

    processors pipelineProcessors
}

我整理了一个图:

beat-pipeline

下面介绍pipeline中的各个环节。接前文,Harvester中会持有一个最左边的client实例(一个Input实例中的所有Harvester共享该Client),然后通过这个client调用Producer将数据发送到一个buffer,这是一个channel,大小硬编码为20。同时需要注意的一个点是client的publish中还执行了processor,也就是如果定义了processor,他是在发送的Client里面执行的。这部分功能对应代码如下(只保留了关键代码):

// filebeat/channel/outlet.go
func (o *outlet) OnEvent(d *util.Data) bool {
    o.client.Publish(event) // 跳到libbeat/publisher/pipeline/client.go#Publish
}

// libbeat/publisher/pipeline/client.go
func (c *client) publish(e beat.Event) {
    // 如果定义了processor,则在此处执行
    if c.processors != nil {
        var err error

        event, err = c.processors.Run(event)
        publish = event != nil
        if err != nil {
            // TODO: introduce dead-letter queue?

            log.Errorf("Failed to publish event: %v", err)
        }
    }

    var published bool
    if c.canDrop {
        published = c.producer.TryPublish(pubEvent)
    } else {
        published = c.producer.Publish(pubEvent) // queue/memqueue/produce.go
    }
}

// libbeat/publisher/queue/memqueue/produce.go
func (p *forgetfulProducer) Publish(event publisher.Event) bool {
    return p.openState.publish(p.makeRequest(event))
}
func (st *openState) publish(req pushRequest) bool {
    select {
    // 将数据发送到events buffer中
    case st.events <- req:
        return true
    case <-st.done:
        st.events = nil
        return false
    }
}

然后EventLoop从events buffer中读取数据写到batchBuffer中,batchBuffer是一个Slice,其大小为queue.mem.events(默认值为4096)。这部分功能对应代码如下(只保留了关键代码):

// libbeat/publisher/queue/memqueue/eventloop.go
// 创建EventLoop
func newBufferingEventLoop(b *Broker, size int, minEvents int, flushTimeout time.Duration) *bufferingEventLoop {
    l := &bufferingEventLoop{
        broker:       b,
        maxEvents:    size,
        minEvents:    minEvents,
        flushTimeout: flushTimeout,
        // 直接使用Broker的events
        events:    b.events,
        get:       nil,
        pubCancel: b.pubCancel,
        acks:      b.acks,
    }
    l.buf = newBatchBuffer(l.minEvents)

    l.timer = time.NewTimer(flushTimeout)
    if !l.timer.Stop() {
        <-l.timer.C
    }

    return l
}

func (l *bufferingEventLoop) run() {
    var (
        broker = l.broker
    )

    for {
        select {
        case <-broker.done:
            return

        case req := <-l.events: // producer pushing new event
            l.handleInsert(&req)
        }
    }
}

func (l *bufferingEventLoop) handleInsert(req *pushRequest) {
    // insert会把数据写入batchBuffer
    if l.insert(req) {
        l.eventCount++
        if l.eventCount == l.maxEvents {
            // 队列面了就把chan设置为nil,此时写会被阻塞。等收到ack(handleACK)后又会恢复队列
            l.events = nil // stop inserting events if upper limit is reached
        }
    }
}

数据到batchBuffer之后,eventConsumer会按照用户配置的规则批量从batchBuffer读取数据,并写入workQueue,这是一个channel。这部分功能对应代码如下(只保留了关键代码):

// libbeat/publisher/pipeline/consumer.go
func (c *eventConsumer) loop(consumer queue.Consumer) {
    log := c.logger

    log.Debug("start pipeline event consumer")

    var (
        out    workQueue
        batch  *Batch
        paused = true
    )

    for {
        select {
        case <-c.done:
            log.Debug("stop pipeline event consumer")
            return
        case sig := <-c.sig:
            handleSignal(sig)
        // 将数据写入 workQueue
        case out <- batch:
            batch = nil
        }
    }
}

数据到workQueue之后,再由netClientWorker模块读取并通过调用output的Publish将数据真正的发送出去。这里以ElasticSearch类型的output为例展示代码流程(只保留了关键代码):

// libbeat/publisher/pipeline/output.go
func (w *netClientWorker) run() {
    for !w.closed.Load() {
        reconnectAttempts := 0

        // 从 workQueue读取数据
        // send loop
        for batch := range w.qu {
            if w.closed.Load() {
                if batch != nil {
                    batch.Cancelled()
                }
                return
            }
            // 发送到output
            err := w.client.Publish(batch) // libbeat/outputs/backoff.go
            if err != nil {
                logp.Err("Failed to publish events: %v", err)
                // on error return to connect loop
                break
            }
        }
    }
}

// libbeat/outputs/backoff.go
func (b *backoffClient) Publish(batch publisher.Batch) error {
    err := b.client.Publish(batch) // libbeat/outputs/elasticsearch/client.go
    if err != nil {
        b.client.Close()
    }
    backoff.WaitOnError(b.backoff, err)
    return err
}

// libbeat/outputs/elasticsearch/client.go
func (client *Client) Publish(batch publisher.Batch) error {
    events := batch.Events()
    rest, err := client.publishEvents(events)
    if len(rest) == 0 {
        batch.ACK() // libbeat/publisher/pipeline/batch.go
    } else {
        batch.RetryEvents(rest)
    }
    return err
}

至此,整个pipeline的数据流程就算完成了,其实都是各种代码调用,并不复杂,只是需要花时间去看代码而已。接下来,再补充介绍一下pipeline中比较核心的spooler。

Queue

Filebeat提供了2种队列:内存队列和文件队列。实际中绝大多数应该用的都是内存队列,这里也仅介绍内存队列,文件队列的实现在libbeat/publisher/queue/spool目录下,有兴趣的自行查看,核心的东西和内存队列一致。内存队列的定义在libbeat/publisher/queue/memqueue目录下,定义队列的文件是broker.go

// 内存队列在代码中叫Broker
type Broker struct {
    done chan struct{}

    logger logger

    bufSize int
    // buf         brokerBuffer
    // minEvents   int
    // idleTimeout time.Duration

    // api channels
    events    chan pushRequest
    requests  chan getRequest
    pubCancel chan producerCancelRequest

    // internal channels
    acks          chan int
    scheduledACKs chan chanList

    eventer queue.Eventer

    // wait group for worker shutdown
    wg          sync.WaitGroup
    waitOnClose bool
}

前面的介绍的event buffer就是这里的events字段。该文件中还有一个NewBroker函数比较重要,里面是创建一个Broker,并且定义了eventLoop接口,该接口有2个实现:

  • directEventLoop(queue.mem.flush.min_events = 1
  • bufferingEventLoop(queue.mem.flush.min_events > 1

queue.mem.flush.min_events的默认值为2048,所以创建的是bufferingEventLoop,这里仅介绍该类型的EventLoop:

// libbeat/publisher/queue/memqueue/eventloop.go
// bufferingEventLoop implements the broker main event loop.
// Events in the buffer are forwarded to consumers only if the buffer is full or on flush timeout.
type bufferingEventLoop struct {
    broker *Broker

    buf        *batchBuffer
    flushList  flushList
    eventCount int

    minEvents    int
    maxEvents    int
    flushTimeout time.Duration

    // active broker API channels
    events    chan pushRequest
    get       chan getRequest
    pubCancel chan producerCancelRequest

    // ack handling
    acks        chan int      // ackloop -> eventloop : total number of events ACKed by outputs
    schedACKS   chan chanList // eventloop -> ackloop : active list of batches to be acked
    pendingACKs chanList      // ordered list of active batches to be send to the ackloop
    ackSeq      uint          // ack batch sequence number to validate ordering

    // buffer flush timer state
    timer *time.Timer
    idleC <-chan time.Time
}

前面提到的batchBuffer就是这里的buf字段。另外,看代码的时候注意区分一下spooler这个词的含义,大多数时候它都指代的是queue。同时,文件队列也称为spooler。

至此,整个pipeline就介绍完了。到这里,从Filebeat实例创建,到数据采集、发送都全部介绍完了。还差的就是ACK流程,这个是和数据发送流程相反的。

ACK流程

Filebeat基于Registrar+ACK的方式实现了至少发送一次的保证,Registrar前面已经介绍过了,最后看下ACK的流程,这部分相对复杂一些。

再看下Registrar的逻辑:

// filebeat/registrar/registrar.go
type Registrar struct {
    Channel      chan []file.State
    out          successLogger
    done         chan struct{}
    registryFile string      // Path to the Registry File
    fileMode     os.FileMode // Permissions to apply on the Registry File
    wg           sync.WaitGroup

    states               *file.States // Map with all file paths inside and the corresponding state
    gcRequired           bool         // gcRequired is set if registry state needs to be gc'ed before the next write
    gcEnabled            bool         // gcEnabled indicates the registry contains some state that can be gc'ed in the future
    flushTimeout         time.Duration
    bufferedStateUpdates int
}


func (r *Registrar) Run() {
    logp.Debug("registrar", "Starting Registrar")
    // Writes registry on shutdown
    defer func() {
        r.writeRegistry()
        r.wg.Done()
    }()

    var (
        timer  *time.Timer
        flushC <-chan time.Time
    )

    for {
        select {
        case <-r.done:
            logp.Info("Ending Registrar")
            return
        case <-flushC:
            flushC = nil
            timer.Stop()
            r.flushRegistry()
        case states := <-r.Channel:         // 依靠这个channel通信
            // 收到channel中的确认信息之后,将数据写入registry文件
            r.onEvents(states)
            if r.flushTimeout <= 0 {
                r.flushRegistry()
            } else if flushC == nil {
                timer = time.NewTimer(r.flushTimeout)
                flushC = timer.C
            }
        }
    }
}

Registrar结构定义了一个Channel字段,这个channel就是用来接收ack消息的。Run方法里面从这个channel读取数据。然后看下ack数据是如何写到这个channel的。

首先filebeat.go初始化时注册了全局ACK处理回调:

// filebeat/beater/filebeat.go
// Run allows the beater to be run as a beat.
func (fb *Filebeat) Run(b *beat.Beat) error {

    // Make sure all events that were published in
    registrarChannel := newRegistrarLogger(registrar)

    // 注册消息成功发送的回调
    err = b.Publisher.SetACKHandler(beat.PipelineACKHandler{
        ACKEvents: newEventACKer(finishedLogger, registrarChannel).ackEvents,
    })
    if err != nil {
        logp.Err("Failed to install the registry with the publisher pipeline: %v", err)
        return err
    }
}
// filebeat/beater/filebeat.go
func newRegistrarLogger(reg *registrar.Registrar) *registrarLogger {
    return &registrarLogger{
        done: make(chan struct{}),
        ch:   reg.Channel,
    }
}

// filebeat/beater/channels.go
type registrarLogger struct {
    done chan struct{}
    ch   chan<- []file.State
}
// filebeat/beater/channels.go
func (l *registrarLogger) Published(states []file.State) {
    select {
    case <-l.done:
        // set ch to nil, so no more events will be send after channel close signal
        // has been processed the first time.
        // Note: nil channels will block, so only done channel will be actively
        //       report 'closed'.
        l.ch = nil
    case l.ch <- states:
    }
}

// filebeat/beater/acker.go
type statefulLogger interface {
    Published(states []file.State)
}
// filebeat/beater/acker.go
func newEventACKer(stateless statelessLogger, stateful statefulLogger) *eventACKer {
    return &eventACKer{stateless: stateless, stateful: stateful, log: logp.NewLogger("acker")}
}

// 注册的回调函数
func (a *eventACKer) ackEvents(data []interface{}) {
    stateless := 0
    states := make([]file.State, 0, len(data))
    for _, datum := range data {
        if datum == nil {
            stateless++
            continue
        }

        st, ok := datum.(file.State)
        if !ok {
            stateless++
            continue
        }

        states = append(states, st)
    }

    if len(states) > 0 {
        a.log.Debugw("stateful ack", "count", len(states))
        a.stateful.Published(states) // filebeat/beater/channels.go: func (l *registrarLogger) Published(states []file.State)
    }

    if stateless > 0 {
        a.log.Debugw("stateless ack", "count", stateless)
        a.stateless.Published(stateless) //
    }
}

这里最终注册的回调是eventACKer,里面的重点是a.stateful.Published,看下这个的实现:

// filebeat/beater/channels.go
func (l *registrarLogger) Published(states []file.State) {
    select {
    case <-l.done:
        // set ch to nil, so no more events will be send after channel close signal
        // has been processed the first time.
        // Note: nil channels will block, so only done channel will be actively
        //       report 'closed'.
        l.ch = nil
    case l.ch <- states:
    }
}

里面将最终的ack消息(states)写到了l.ch。这个channel就是Registrar那里的channel(从注册的代码里面可以分析出来),即回调函数将ack消息写入channel,然后Registrar从channel中读取states数据,写入registry文件,这样形成一个闭环。如下图:

ack-1

现在的问题就是:这个ackEvents回调函数的ack又是哪来的呢?是谁(who),在什么地方(where),什么时候(when),以何种方式(how)发送到ackEvents的?首先推断一下,既然是ack,那最源头当然应该是最终发送数据的地方发出,即发送数据完成得到外部确认之后,反向传递ack,正好和采集的时间传递方向相反,也就是核心应该在Publisher里面,或者说libbeat的pipeline里面。下面从pipeline中最核心的(内存)队列Broker模块开始分析。

// libbeat/publisher/queue/memqueue/broker.go
// NewBroker creates a new broker based in-memory queue holding up to sz number of events.
// If waitOnClose is set to true, the broker will block on Close, until all internal
// workers handling incoming messages and ACKs have been shut down.
func NewBroker(
    logger logger,
    settings Settings,
) *Broker {

    var eventLoop interface {
        run()
        processACK(chanList, int)
    }

    // 创建EventLoop
    if minEvents > 1 {
        eventLoop = newBufferingEventLoop(b, sz, minEvents, flushTimeout)
    } else {
        eventLoop = newDirectEventLoop(b, sz)
    }

    b.bufSize = sz
    // 创建AckLoop
    ack := newACKLoop(b, eventLoop.processACK)

    b.wg.Add(2)
    go func() {
        defer b.wg.Done()
        eventLoop.run()
    }()
    // 这个goroutine中启动 ack
    go func() {
        defer b.wg.Done()
        ack.run()
    }()
}

// libbeat/publisher/queue/memqueue/ackloop.go
func newACKLoop(b *Broker, processACK func(chanList, int)) *ackLoop {
    l := &ackLoop{broker: b}
    l.processACK = processACK
    return l
}

NewBroker中创建了eventLoop和ackLoop,前者用于发送数据,前面已经讲过,后者则用于处理ack。看下ackLoop的代码:

// libbeat/publisher/queue/memqueue/ackloop.go
// ackLoop implements the brokers asynchronous ACK worker.
// Multiple concurrent ACKs from consecutive published batches will be batched up by the
// worker, to reduce the number of signals to return to the producer and the
// broker event loop.
// Producer ACKs are run in the ackLoop go-routine.
type ackLoop struct {
    broker *Broker
    sig    chan batchAckMsg     // 确认消息发送的channel
    lst    chanList

    totalACK   uint64
    totalSched uint64

    batchesSched uint64
    batchesACKed uint64

    processACK func(chanList, int)
}

func (l *ackLoop) run() {
    for {
        select {
        case <-l.broker.done:
            return

        case acks <- acked:
            acks, acked = nil, 0

        case lst := <-l.broker.scheduledACKs:
            count, events := lst.count()
            l.lst.concat(&lst)

            l.batchesSched += uint64(count)
            l.totalSched += uint64(events)

        // 这里等待batch发送完成的确认信号
        case <-l.sig:
            acked += l.handleBatchSig()
            if acked > 0 {
                acks = l.broker.acks
            }
        }
    }
}

可以看到,ackloop中在等待batch发送完成的信号(sig),这里有2条线:

  1. 信号如何来的?
  2. 收到信号之后,后续是如何将这个信号发送给前面的那个回调函数?

先来看第1个问题:信号如何来的?根据前面的推断,应该由发送数据那里产生。而由pipeline部分的分析知道最终数据发送在output那里。此处继续以ES这种类型的output为例,看下最终发送的代码,从那里反推:

// libbeat/outputs/elasticsearch/client.go  这是之前ES部分最终的发送代码
func (client *Client) Publish(batch publisher.Batch) error {
    events := batch.Events()
    rest, err := client.publishEvents(events)
    if len(rest) == 0 {
        batch.ACK() // 重点看这里的ACK,这个会跳转到libbeat/publisher/pipeline/batch.go
    } else {
        batch.RetryEvents(rest)
    }
    return err
}

// libbeat/publisher/pipeline/batch.go
func (b *Batch) ACK() {
    b.ctx.observer.outBatchACKed(len(b.events)) // libbeat/publisher/pipeline/monitoring.go: func (o *metricsObserver) outBatchACKed(int) {}  这个是监控用的
    b.original.ACK()   // 重点看这里: libbeat/publiser/queue/memequeue/consumer.go
    releaseBatch(b)
}

// libbeat/publisher/queue/memqueue/consume.go
func (b *batch) ACK() {
    if b.state != batchActive {
        switch b.state {
        case batchACK:
            panic("Can not acknowledge already acknowledged batch")
        default:
            panic("inactive batch")
        }
    }

    b.report()      // 重点在这里
}
// libbeat/publisher/queue/memqueue/consume.go
func (b *batch) report() {
    b.ack.ch <- batchAckMsg{}      // 最终在这里发送了确认ACK
}

Bingo!在Publish里面发送完成(可能失败,可能成功)之后,就会发送ACK。然后根据调用关系往回推,最终在report中发送了ack,第1个问题就解答了,ack信号就是这样来的。然后看第2个问题:收到信号之后,后续是如何将这个信号发送给前面的那个回调函数?接着看收到信号后调用的handleBatchSig的代码:

// libbeat/publisher/queue/memqueue/ackloop.go
// handleBatchSig collects and handles a batch ACK/Cancel signal. handleBatchSig
// is run by the ackLoop.
func (l *ackLoop) handleBatchSig() int {

    if count > 0 {
        if e := l.broker.eventer; e != nil {
            e.OnACK(count)
        }
        // 这里会调用之前EventLoop的processACK,我们用的是bufferingEventLoop,所以会调用(*bufferingEventLoop)processACK
        // report acks to waiting clients
        l.processACK(lst, count) // libbeat/publisher/queue/memqueue/eventloop.go#(*bufferingEventLoop)processACK
    }

}

// libbeat/publisher/queue/memqueue/eventloop.go
func (l *bufferingEventLoop) processACK(lst chanList, N int) {
    for !lst.empty() {
            // 重点在这里,这里会调用ackEvents发送确认消息
            st.state.cb(int(count)) // libbeat/publisher/pipeline/acker.go (a *eventDataACK) ackEvents(n int)
        }
    }
}

handleBatchSig里面调用了对应类型的eventLoop的processACK方法,该方法内部会转到pipeline的acker.go,下面给出代码流转:

  1. libbeat/publisher/queue/memqueue/eventloop.go: (l *bufferingEventLoop) processACK --> st.state.cb(int(count))
  2. libbeat/publisher/pipeline/acker.go: func (a *eventDataACK) ackEvents(n int) { a.acker.ackEvents(n) }
  3. libbeat/publisher/pipeline/acker.go: func (a *boundGapCountACK) ackEvents(n int) { a.acker.ackEvents(n) }
  4. libbeat/publisher/pipeline/acker.go: func (a *gapCountACK) ackEvents(n int) {}

看下最后一个ackEvents:

func (a *gapCountACK) ackEvents(n int) {
    select {
    case <-a.pipeline.ackDone: // pipeline is closing down -> ignore event
        a.acks = nil
    // ack数n写入了a.acks
    case a.acks <- n: // send ack event to worker
    }
}

// gapCountACK returns event ACKs to the producer, taking account for dropped events.
// Events being dropped by processors will always be ACKed with the last batch ACKed
// by the broker. This way clients waiting for ACKs can expect all processed
// events being always ACKed.
type gapCountACK struct {
    pipeline *Pipeline

    fn func(total int, acked int)

    done chan struct{}

    drop chan struct{}
    acks chan int

    events atomic.Uint32
    lst    gapList
}

数据会写入gapCountACKacks之后,会在ackLoop中读取:

// libbeat/publisher/pipeline/acker.go
func (a *gapCountACK) ackLoop() {
    acks, drop := a.acks, a.drop
    closing := false
    for {
        select {
        case <-a.done:
            closing = true
            a.done = nil
            if a.events.Load() == 0 {
                // stop worker, if all events accounted for have been ACKed.
                // If new events are added after this acker won't handle them, which may
                // result in duplicates
                return
            }

        case <-a.pipeline.ackDone:
            return

        case n := <-acks:
            // 重点:从acks读出n之后调用handleACK处理
            empty := a.handleACK(n)
            if empty && closing && a.events.Load() == 0 {
                // stop worker, if and only if all events accounted for have been ACKed
                return
            }

        case <-drop:
            // TODO: accumulate multiple drop events + flush count with timer
            a.events.Sub(1)
            a.fn(1, 0)
        }
    }
}

func (a *gapCountACK) handleACK(n int) bool {
    a.events.Sub(uint32(total))
    a.fn(total, acked) // line 326: func (a *boundGapCountACK) onACK(total, acked int) {
    return emptyLst
}

从acks读出n之后调用handleACK处理,后续的调用流程如下:

1. libbeat/publisher/pipeline/acker.go: handleACK --> a.fn(total, acked)
2. libbeat/publisher/pipeline/acker.go: func (a *boundGapCountACK) onACK(total, acked int) --> a.fn(total, acked)
3. libbeat/publisher/pipeline/acker.go: func (a *eventDataACK) onACK(total, acked int) --> a.fn(data, acked) 
4. libbeat/publisher/pipeline/pipeline_ack.go: func (p *pipelineEventCB) onEvents(data []interface{}, acked int)

最终进入到pipelineEventCB中,这个结构是内部处理ack的(我理解pipelineEventCB命名的含义是pipeline中event的callback函数,可见它是处理ack的核心),看下关键代码:

// libbeat/publisher/pipeline/pipeline_ack.go
// pipelineEventCB internally handles active ACKs in the pipeline.
// It receives ACK events from the queue and the individual clients.
// Once the queue returns an ACK to the pipelineEventCB, the worker loop will collect
// events from all clients having published events in the last batch of events
// being ACKed.
// the PipelineACKHandler will be notified, once all events being ACKed
// (including dropped events) have been collected. Only one ACK-event is handled
// at a time. The pipeline global and clients ACK handler will be blocked for the time
// an ACK event is being processed.
type pipelineEventCB struct {
    done chan struct{}

    acks chan int

    // 这个字段是关键,确认信息会写到这个channel,然后在worker中读出,最终写入到Registrar的channel
    events        chan eventsDataMsg    
    droppedEvents chan eventsDataMsg

    mode    pipelineACKMode
    handler beat.PipelineACKHandler
}

其中的events字段是关键,确认信息会写到这个channel,然后在worker中读出,最终写入到Registrar的channel。接着之前的调用,看数据如何写到events这个channel:

// reportEvents sends a batch of ACKed events to the ACKer.
// The events array contains send and dropped events. The `acked` counters
// indicates the total number of events acked by the pipeline.
// That is, the number of dropped events is given by `len(events) - acked`.
// A client can report events with acked=0, iff the client has no waiting events
// in the pipeline (ACK ordering requirements)
//
// Note: the call blocks, until the ACK handler has collected all active events
//       from all clients. This ensure an ACK event being fully 'captured'
//       by the pipeline, before receiving/processing another ACK event.
//       In the meantime the queue has the chance of batching-up more ACK events,
//       such that only one ACK event is being reported to the pipeline handler
func (p *pipelineEventCB) onEvents(data []interface{}, acked int) {
    p.pushMsg(eventsDataMsg{data: data, total: len(data), acked: acked})
}

func (p *pipelineEventCB) pushMsg(msg eventsDataMsg) {
    if msg.acked == 0 {
        p.droppedEvents <- msg
    } else {
        msg.sig = make(chan struct{})
        p.events <- msg // 此处写入channel后,在(p *pipelineEventCB) worker()的collect中读出,最后reportEventsData
        <-msg.sig
    }
}

pushMsg中,将消息写入events中。然后看下channel另一端的读取:

func (p *pipelineEventCB) worker() {
    defer close(p.acks)
    defer close(p.events)
    defer close(p.droppedEvents)

    for {
        select {
        case count := <-p.acks:
            // 在collect中读取消息
            exit := p.collect(count)
            if exit {
                return
            }

            // short circuit dropped events, but have client block until all events
            // have been processed by pipeline ack handler
        case msg := <-p.droppedEvents:
            p.reportEventsData(msg.data, msg.total)
            if msg.sig != nil {
                close(msg.sig)
            }

        case <-p.done:
            return
        }
    }
}

func (p *pipelineEventCB) collect(count int) (exit bool) {
    for acked < count {
        var msg eventsDataMsg
        select {
        // 在此处读取消息
        case msg = <-p.events:
        case msg = <-p.droppedEvents:
        case <-p.done:
            exit = true
            return
        }
    }

    p.reportEventsData(data, total)
    return
}

func (p *pipelineEventCB) reportEventsData(data []interface{}, total int) {
    // report ACK back to the beat
    switch p.mode {
    case countACKMode:
        p.handler.ACKCount(total)
    case eventsACKMode:
        // 这里调用之前在 Filebeat中注册的 ACKEvents
        p.handler.ACKEvents(data) // filebeat/beater/acker.go: func (a *eventACKer) ackEvents(data []interface{})
    case lastEventsACKMode:
        p.handler.ACKLastEvents(data)
    }
}

可以看到从channel中读出确认消息后,最终会在reportEventsData中调用之前在Filebeat中注册的ACKEvents:eventACKer。至此,第2个问题也得到了解答,并且和之前的回调函数成功对接。把之前的图补充完整如下:

ack-2

总结

本文从源码角度对Filebeat的核心数据流转进行了简单的分析,为了突出重点,省掉了很多细节代码,比如各个环节的数据结构的实例是何时创建的,又是何时启动的,以及一些异常分支的处理、失败重传等。正如前文所说,本文并不是一篇全面的Filebeat代码深度剖析,而是通用配置下的核心数据流转分析。这篇文章我改了很多遍,总感觉没写什么东西,只是贴了了大量代码,然后讲了代码的调用点,而这些其实自己debug几遍也就清楚了。不过话说回来,如果事先知道整个逻辑流程,以及关键调用点,就能在debug时做到胸有成竹、有所侧重,从而节省很多时间。我想这篇文章对我最大的意义有两点:一个是让自己对Filebeat更加了解,另外一个是可能会给其它看源码的人节省一些时间。这就够了。

Reference

]]>
3 http://niyanchun.com/filebeat-source-learning.html#comments http://niyanchun.com/feed/tag/Filebeat/
Filebeat导致文件无法被删除的原因分析 http://niyanchun.com/filebeat-cannot-delete-files.html http://niyanchun.com/filebeat-cannot-delete-files.html Sat, 27 Feb 2021 12:12:00 +0800 NYC 之前有市场反馈Filebeat导致文件删除后磁盘空间无法释放,于是分析并重现了一下,下面记录一下。

熟悉Linux的人都知道,文件删除实际是将其inode链接数减1,当inode链接数变为0,且没有其它进程访问该文件时,操作系统就会真正删除文件,回收资源。“Filebeat导致文件删除后磁盘空间无法释放”的原因就是因为Filebeat打开着文件,所以系统无法真正删除文件,释放磁盘空间。而这个在官方文档里面其实是有说明的,且给出了解决方案:

Filebeat keeps open file handlers of deleted files for a long time?

In the default behaviour, Filebeat opens the files and keeps them open until it reaches the end of them. In situations when the configured output is blocked (e.g. Elasticsearch or Logstash is unavailable) for a long time, this can cause Filebeat to keep file handlers to files that were deleted from the file system in the mean time. As long as Filebeat keeps the deleted files open, the operating system doesn’t free up the space on disk, which can lead to increase disk utilisation or even out of disk situations.

To mitigate this issue, you can set the close_timeout setting to 5m. This will ensure every file handler is closed once every 5 minutes, regardless of whether it reached EOF or not. Note that this option can lead to data loss if the file is deleted before Filebeat reaches the end of the file.

第一部分说明了问题原因:当output也就是后端阻塞时,filebeat就会一直打开着文件。这样即使我们使用rm删除了文件,系统也没法真正删除文件并释放磁盘空间,这个问题可以很容易的复现一下。

问题复现

准备一个采集配置:

# test-delete.yml
filebeat.inputs:
  - type: log
    enabled: true      
    paths:
      - /tmp/test-logs/*.log
    close_inactive: 30s   # 默认5m
    close_removed: true   # 默认就是开启的

output.elasticsearch:
  enabled: true
  hosts: ["127.0.0.1:9200"]

queue.mem:
  events: 32
  flush.min_events: 32

注意:

  1. 要模拟后端阻塞,所以配置一个不存在的后端即可,这里使用的是ES,换成其它Filebeat支持的可阻塞后端即可(比如Kafka、Logstash),当然不论配哪个,都不需要真正去部署;
  2. 为了方便复现,将Filebeat内部的队列设置成最小值32;
  3. 准备一个日志文件,里面的日志行数最少有53行(后面解释原因)。

比如我在/tmp/test-logs/目录下创建了一个test-1.log,里面有53行日志:

line1
line2
...
line53

然后运行Filebeat:

➜  filebeat-6.8.8-linux-x86_64 ./filebeat -e -c delete-test.yml 
2021-02-27T10:40:15.452+0800    INFO    instance/beat.go:611    Home path: [/root/software/filebeat-6.8.8-linux-x86_64] Config path: [/root/software/filebeat-6.8.8-linux-x86_64] Data path: [/root/software/filebeat-6.8.8-linux-x86_64/data] Logs path: [/root/software/filebeat-6.8.8-linux-x86_64/logs]
# 省略中间日志
2021-02-27T10:40:15.470+0800    INFO    registrar/registrar.go:141      States Loaded from registrar: 0
2021-02-27T10:40:15.470+0800    INFO    crawler/crawler.go:72   Loading Inputs: 1
2021-02-27T10:40:15.471+0800    INFO    log/input.go:148        Configured paths: [/tmp/test-logs/*.log]
2021-02-27T10:40:15.471+0800    INFO    input/input.go:114      Starting input of type: log; ID: 13166799806221673713 
2021-02-27T10:40:15.471+0800    INFO    crawler/crawler.go:106  Loading and starting Inputs completed. Enabled inputs: 1
2021-02-27T10:40:15.472+0800    INFO    log/harvester.go:255    Harvester started for file: /tmp/test-logs/test-1.log
2021-02-27T10:40:15.474+0800    INFO    pipeline/output.go:95   Connecting to backoff(elasticsearch(http://127.0.0.1:9200))
2021-02-27T10:40:17.476+0800    ERROR   pipeline/output.go:100  Failed to connect to backoff(elasticsearch(http://127.0.0.1:9200)): Get http://127.0.0.1:9200: dial tcp 127.0.0.1:9200: connect: connection refused
2021-02-27T10:40:17.476+0800    INFO    pipeline/output.go:93   Attempting to reconnect to backoff(elasticsearch(http://127.0.0.1:9200)) with 1 reconnect attempt(s)
# 省略重试过程
2021-02-27T10:40:37.847+0800    ERROR   pipeline/output.go:100  Failed to connect to backoff(elasticsearch(http://127.0.0.1:9200)): Get http://127.0.0.1:9200: dial tcp 127.0.0.1:9200: connect: connection refused
2021-02-27T10:40:37.847+0800    INFO    pipeline/output.go:93   Attempting to reconnect to backoff(elasticsearch(http://127.0.0.1:9200)) with 4 reconnect attempt(s)
# 注意看里面的"harvester":{"open_files":1,"running":1,"started":1}}
2021-02-27T10:40:45.472+0800    INFO    [monitoring]    log/log.go:144  Non-zero metrics in the last 30s        {"monitoring": {"metrics": {"beat":{"cpu":{"system":{"ticks":70,"time":{"ms":73}},"total":{"ticks":140,"time":{"ms":151},"value":0},"user":{"ticks":70,"time":{"ms":78}}},"handles":{"limit":{"hard":100000,"soft":100000},"open":7},"info":{"ephemeral_id":"3f417841-c5cb-409b-ba88-16e577a0f07f","uptime":{"ms":30052}},"memstats":{"gc_next":4194304,"memory_alloc":2242696,"memory_total":5271520,"rss":27172864}},"filebeat":{"events":{"active":53,"added":54,"done":1},"harvester":{"open_files":1,"running":1,"started":1}},"libbeat":{"config":{"module":{"running":0}},"output":{"type":"elasticsearch"},"pipeline":{"clients":1,"events":{"active":53,"filtered":1,"published":52,"retry":128,"total":54}}},"registrar":{"states":{"current":1,"update":1},"writes":{"success":2,"total":2}},"system":{"cpu":{"cores":8},"load":{"1":0.57,"15":1.18,"5":0.85,"norm":{"1":0.0712,"15":0.1475,"5":0.1063}}}}}}

可以看到后端不通,而且后面的monitor部分显示“"open_files":1”,也就是文件一直被打开着。此时,删除正在被采集的文件/tmp/test-logs/test-1.log,然后观察一下:

# 删除文件
➜  test-logs rm test-1.log
# ll查看文件,的确已经不存在了
➜  test-logs ll
total 0
# 使用lsof +L1查看
➜  test-logs lsof +L1
COMMAND    PID USER   FD   TYPE DEVICE SIZE/OFF NLINK       NODE NAME
filebeat 18777 root    5r   REG  253,0      362     0 1115687240 /tmp/test-logs/test-1.log (deleted)

lsof +L1命令可以查看已经被删除但还被进程打开的文件。可以看到用rm删除之后,文件依旧被Filebeat占用,所以导致操作系统无法真正释放资源。此时,停掉Filebeat进程,再用lsof查看,就会发现已经没有了,文件已经被真正删除了。这样这个问题就复现了。

这部分最后解答一下之前为什么要求测试日志里面必须至少有53行日志?这个得结合Filebeat的原理和代码进行说明,后面计划单独写一篇Filebeat源码解析的文章,这里只简单解答这个问题。Filebeat的采集过程如下图:

filebeat-pipeline

  1. input:扫描要采集的文件,每个文件打开一个harvester(即一个gorouting);
  2. harvester:读取文件内容,以事件(event)的形式发送到spooler;
  3. spooler:即filebeat内部的队列,有内存和文件两种队列,默认为内存队列;
  4. publisher:负责从spooler中读取事件发送到后端(output),发送成功后,会给registrar发送ack;
  5. registrar:收到事件成功发送的ack之后,就将状态(采集的文件、及位置等信息)记录到registry文件中。

在上述过程中,不同的功能是由不用的goroutine实现的,这些goroutine之间就是通过go的channel通信的。上面画的是一个非常精简的流程,实际过程会比这个复杂一些:

filebeat-internal

实际上harvester是将数据发送到producer的队列(channel),这个队列的大小是写死的20(libbeat/publisher/queue/memqueue/broker.go: chanSize := 20,这是个channel),然后数据又会通过EventLoop转移到一个batchBuffer,这个缓存的大小才是在配置文件里面配的queue.mem.events。其实后面还有个consumer又将数据搬到另外一个output队列,最后由有一个netClientWorker负责将数据发送出去,这些留到下篇源码解析文章讨论。现在由于后端不通,所以数据会一直积攒在batchBuffer里面。这样harvester能够发送的总数据大小就是20+queue.meme.events=20+32=52了。这就是53的由来,整个链路最大只能容纳52个事件,第53个事件来的时候,队列已经满了,harvester发送的时候就会阻塞在broker的channel上面。queue.meme.events的默认值是4096,如果不修改的话,那就需要多准备一些日志。

问题解决

上面官方给的解决方法是close_timeout参数。在讨论这个参数之前,我们先想一下close_inactiveclose_removed两个参数能不能解决问题:

  • close_inactive:当文件在该参数设置的时间段内没有更新的话就关闭harvester,当然同时也会关闭文件;
  • close_removed:如果文件在采集阶段被删除了,就关闭harvester,当然同时也会关闭文件;

从这两个参数的功能来看,似乎可以避免上面的问题。但这两个配置在上面复现时也配置了,结果是并没有解决问题,为什么呢?这个就得从Filebeat的代码层面来解答了。看下harvester的部分代码:

// Run start the harvester and reads files line by line and sends events to the defined output
func (h *Harvester) Run() error {
    // 删除部分代码

    // Closes reader after timeout or when done channel is closed
    // This routine is also responsible to properly stop the reader
    go func(source string) {

        closeTimeout := make(<-chan time.Time)
        // starts close_timeout timer
        if h.config.CloseTimeout > 0 {
            closeTimeout = time.After(h.config.CloseTimeout)
        }

        select {
        // Applies when timeout is reached
        case <-closeTimeout:
            logp.Info("Closing harvester because close_timeout was reached: %s", source)
        // Required when reader loop returns and reader finished
        case <-h.done:
        }

        h.stop()
        h.log.Close()
    }(h.state.Source)

    logp.Info("Harvester started for file: %s", h.state.Source)

    for {
        select {
        case <-h.done:
            return nil
        default:
        }

        message, err := h.reader.Next()
        if err != nil {
            switch err {
            case ErrFileTruncate:
                logp.Info("File was truncated. Begin reading file from offset 0: %s", h.state.Source)
                h.state.Offset = 0
                filesTruncated.Add(1)
            case ErrRemoved:
                logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.state.Source)
            case ErrRenamed:
                logp.Info("File was renamed: %s. Closing because close_renamed is enabled.", h.state.Source)
            case ErrClosed:
                logp.Info("Reader was closed: %s. Closing.", h.state.Source)
            case io.EOF:
                logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.state.Source)
            case ErrInactive:
                logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive)
            case reader.ErrLineUnparsable:
                logp.Info("Skipping unparsable line in file: %v", h.state.Source)
                //line unparsable, go to next line
                continue
            default:
                logp.Err("Read line error: %v; File: %v", err, h.state.Source)
            }
            return nil
        }

        // 删除部分代码
        
        // Always send event to update state, also if lines was skipped
        // Stop harvester in case of an error
        if !h.sendEvent(data, forwarder) {
            return nil
        }

        // Update state of harvester as successfully sent
        h.state = state
    }
}

分析一下这部分代码(为了突出重点,把不相关的代码删掉了):

  1. 前面启动了一个goroutine,如果定义了close_timeout,就会启动一个定时器,当定时器触发后,就会关闭当前goroutine,这也是官方文档中提到的解决方式可行的原因;
  2. for循环里面的h.sendEvent(data, forwarder)就是向后边队列(实际是大小为20的channel)发送数据的;
  3. for循环里面的h.reader.Next()是读取文件的。

从这个逻辑就可以解释为什么close_inactiveclose_removed不能解决问题了:因为它们两个配置的逻辑是在for循环里面的switch部分的。当队列满了以后,当前harvester会阻塞在h.reader.Next()发送这里,根本无法回到前面的switch部分。不过反过来大家也可以实验一下,如果你的日志行数少于20+queue.mem.events,那在队列满之前就会走到switch部分,这样harvester就会关闭,同时也会关闭文件,就不会产生上面问题了。

close_timeout能解决问题就是因为它的功能是在另外一个独立的goroutine中实现的,所以即使采集的goroutine阻塞在channel上了,它也可以从“外部”将阻塞的goroutine停掉。但这个参数并非完美无缺,它是有弊端的,主要有两个潜在的问题:

  1. 可能造成数据丢失。因为文件没采集完,harvester就被强制关闭了。特别是当close_timeout大于等于ignore_older时,大概率会丢失数据。因为当close_timeout时间到了的时候,ignore_older的时间也到了,此时如果该文件数据没有采集完,且后面该文件没有再更新的话,这个文件里面剩余的数据就丢了,永远不会被采集了。所以如果设置这个参数,一定要保证它的值不大于ignore_older
  2. 如果使用了multiline,该合并成一个event的数据可能会被拆分成多个event。如果在multiline处理阶段超时时间到了的话,harvester就会将已经合并的部分作为一个event发送。当下次这个文件被采集时,剩下的部分又会被当成另外一个event发送。

总结

Filebeat有非常多的参数可以配置,但这也是一把双刃剑,如果你使用的好,它就是一个非常灵活,高度可配置的工具;使用的不好,就可能会丢数据,或者占用非常多的资源,你还不知道为什么。比如本文讨论的这个占用文件导致文件无法被删除的场景,虽然可能比较少,但的确可能会出现。当问题出现的时候,你必须明白为什么,以及使用close_timeout的时候也要知道他可能导致的问题,以及如何通过联动多个相关配置项来尽量去避免它的弊端。

]]>
0 http://niyanchun.com/filebeat-cannot-delete-files.html#comments http://niyanchun.com/feed/tag/Filebeat/
filebeat数据重复和截断及丢失问题分析 http://niyanchun.com/filebeat-truncate-bug.html http://niyanchun.com/filebeat-truncate-bug.html Sat, 31 Aug 2019 23:18:00 +0800 NYC 本文讨论Filebeat收集文件的时候可能产生的数据重复或者数据截断及丢失的问题。

数据重复

关于数据重复我们先看关于Filebeat的一段官方描述:

Filebeat guarantees that events will be delivered to the configured output at least once and with no data loss. Filebeat is able to achieve this behavior because it stores the delivery state of each event in the registry file.

从这里可以看出Filebeat对于收集到的数据(即event)的传输保证的是"at least once",而不是"exactly once",也就是Filebeat传输的数据是有可能有重复的。这里我们讨论一下可能产生重复数据的一些场景,我大概将其分为两类。

第一类:Filebeat重传导致数据重复。重传是因为Filebeat要保证数据至少发送一次,进而避免数据丢失。具体来说就是每条event发送到output后都要等待ack,只有收到ack了才会认为数据发送成功,然后将状态记录到registry。当然实际操作的时候为了高效是批量发送,批量确认的。而造成重传的场景(也就是没有收到ack)非常多,而且很多都不可避免,比如后端不可达、网络传输失败、程序突然挂掉等等。

第二类:配置不当或操作不当导致文件重复收集。Filebeat感知文件有没有被收集过靠的是registry文件里面记录的状态,如果一个文件已经被收集过了,但因为各种原因它的状态从registry文件中被移除了,而恰巧这个文件还在收集范围内,那就会再收集一次。

对于第一类产生的数据重复一般不可避免,而第二类可以避免,但总的来说,Filebeat提供的是at least once的机制,所以我们在使用时要明白数据是可能重复的。如果业务上不能接受数据重复,那就要在Filebeat之后的流程中去重。

数据截断或丢失

上面的官方描述中说了Filebeat可以保证”no data loss“,但实际中并非如此。就跟各种流处理框架宣传的无比优秀,比如可以保证”exactly once message processing“,但实际是情况是这种保证都是有前提条件的,只是更加优秀的框架这个前提条件更少而已(比如flink),但不管怎样,宣传中一般都绝口不提那些前提条件,因为别人都不提,我如果提了,那只有坏处,没有好处。有点跑远了。Filebeat这里保证的没有数据丢失也同样是有前提条件的,就是只保证数据传输时不丢失。说的再清楚一点就是如果你的数据在采集阶段没有丢失,那后面的传输可以保证不丢(因为有ack的确认和重传机制),但如果数据在采集阶段就丢失了,那就真丢了。Filebeat自身机制方面的一些缺陷,导致即使你的使用方式完全正确,数据在采集这一步就丢失的问题依旧是无法完全避免的,我们能做的就是明白丢失的原因,并有针对性的降低丢失的可能性。

接着我们来分析一下这个机制中的缺陷。Filebeat处理文件时会维护一个状态,这个状态里面记录了收集过的每一个带绝对路径的文件名,文件的inode值,文件内容上次收集的位置(即offset)以及其它一些信息。这个状态维护在内存里面,过一段时间会刷新到磁盘上,默认刷到registry这个文件里面。程序如果重启,就从这个文件重新加载,恢复之前的状态。可以认为这个文件就是Filebeat能正常工作的核心。而这个机制上的缺陷主要和Filebeat判断文件是否truncate的方式有关系。我们先看下判断的代码(Filebeat 6.4.3,github.com/elastic/beats/filebeat/input/log/input.go):

// harvestExistingFile continues harvesting a file with a known state if needed
func (p *Input) harvestExistingFile(newState file.State, oldState file.State) {
  // 省略部分代码

    // File size was reduced -> truncated file
    if oldState.Finished && newState.Fileinfo.Size() < oldState.Offset {
        logp.Debug("input", "Old file was truncated. Starting from the beginning: %s, offset: %d, new size: %d ", newState.Source, newState.Fileinfo.Size())
        err := p.startHarvester(newState, 0)
        if err != nil {
            logp.Err("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err)
        }

        filesTruncated.Add(1)
        return
    }

  // 省略部分代码
}

可以看到,Filebeat认为只要文件的大小比之前记录的这个文件(inode唯一标识)的offset小,就认为是truncate掉了,就会从文件头开始重新收集,这一般没什么问题。但如果这个文件的大小比offset大,就认为文件没变过,接着从上次的offset处理,但实际文件可能truncate过,甚至已经不是之前的文件了(inode重复导致)。registry文件里面虽然记录了文件名,但Filebeat唯一标识一个文件使用的是里面的inode值,而非文件名(所以文件重命名对于Filebeat正常工作没有影响),但操作系统的inode值是会复用的。这里举一个我碰到过的场景,比如原来有一个文件A,Filebeat处理过之后将其inode,以及处理的offset(假设为n)记录到了registry文件中。后来这个文件删除了,但registry里面记录的状态还没有自动删除,此时如果有另外一个文件B正好复用了之前A的inode,那Filebeat就会认为这个文件之前处理过,且已经处理到了offset为n处。如果B的文件比A小,即文件的end offset都小于n,那Filebeat就会认为是原来的A文件被truncate掉了,此时会从头开始收集,没有问题。但如果B的end offset大于等于n,那Filebeat就认为是A文件有更新,然后就会从offset为n处开始处理,于是B的前n个字节的数据就丢失了,这样我们就会看到数据有被截断。

其实,即使没有inode重用的问题,上面例子中的问题依旧可能。如果一个文件达到了限制(比如大小),不是重新创建一个新的文件写,而是将这个文件truncate掉继续复用(当然实际中这种场景好像比较少,但也并非没有),Filebeat下次来检查这个文件是否有变动的时候,这个文件的大小如果大于之前记录的offset,也会发生上面的情况。这个问题在github上面是有issue的,但目前还没有解决,官方回复是Filebeat的整个机制在重构中。

数据截断属于数据采集时丢失的一种情况。还有一些其它情况,比如文件数太多,Filebeat的处理能力有限,在还没来得及处理的时候这些文件就被删掉了(比如rotate给老化掉了)也会造成数据丢失。还有就是后端不可用,所以Filebeat还在重试,但源文件被删了,那数据也就丢了。因为Filebeat的重试并非一直发送已经收集到内存里面的event,必要的时候会重新从源文件读,比如程序重启。这些情况的话,只要不限制Filebeat的收集能力,同时保证后端的可用性,网络的可用性,一般问题不大。

所以,使用Filebeat既可能产生数据重复,也可能产生数据丢失,不同的业务场景,影响程度各异,但作为使用者,我们需要做到心中有数,出现这些问题时,明白为什么会这样。

]]>
5 http://niyanchun.com/filebeat-truncate-bug.html#comments http://niyanchun.com/feed/tag/Filebeat/
Filebeat重启后不收集的问题分析 http://niyanchun.com/analysize-filebeat-no-working-after-reboot.html http://niyanchun.com/analysize-filebeat-no-working-after-reboot.html Sat, 24 Aug 2019 14:10:00 +0800 NYC Filebeat作为elastic公司使用Golang开发的新一代的日志采集工具,旨在替换原来Logstash的日志收集功能(注意,Logstash的核心功能包含收集解析两大块,Filebeat的定位只是替代收集部分,所以Filebeat并不能完全取代Logstash)。因为其性能高,资源占用少且轻量级,已经逐渐被很多公司采用。而使用的问题也非常多,今天来讨论之前在elastic中文社区看到的一个问题:

filebeat采集log,每次filebeat程序断开,重新启动之后会生成一个registry.new文件,并且不会继续采集log,请问是什么原因呢?

要搞清楚这个问题,其实要回答两个问题:

  1. 为什么日志不继续采集?
  2. 为什么重启会生成一个registry.new文件?

本文的讨论都基于Filebeat6.4~6.8版本。如果你对Filebeat的配置项非常了解,其实这类不收集的问题很好排查。如果真的是不继续采集日志了,那原因很明确就是没有日志需要采集了,比如所有已存的日志已经都采集完了,而且没有新的日志产生。或者已经存在的日志不再采集范围内,这时候可通过以下一些点去排查:

  • 检查配置中的paths,确认需要采集的文件是否包含在设置的采集路径里面;如果设置了正则,确保文件能被匹配到(paths支持的正则只是有限的正则);
  • 检查上面paths有没有配置enable项,如果配置了确认是否为true(如果没配置,默认就是true);
  • 检查ignore_older是否有配置,是不是日志文件的最后修改时间(可使用stat命令查看)已经超过了忽略的时间点,即这些文件已经被忽略了;
  • 查看filebeat的日志,看filebeat是否在正常工作。如果是info级别的日志的话,每收集(打开)和关闭一个文件都会有打印一条日志,日志中包含文件的绝对路径及文件名。而且默认每30秒会打印一条monitor日志,里面包含了很多信息,包括正则运行的havester个数,已经确认的event等信息,看Filebeat状态是否正常;
  • 如果是某个文件不收集,就是去registry文件里面搜一下这个文件,看是否已经收集过了,并且注意对比一下时间;

一般这样排查下来之后,问题原因就能找到。当然,注意上面有个前提,就是真的是不继续采集日志了。而重启后,一般都不是不采集日志了,而是filebeat在从registry文件恢复状态。而这个恢复是比较慢的,registry文件越大,恢复时间越长。我之前测试过一次,一个大概2MB的registry文件,恢复了约三四十分钟,而这个恢复过程中输出的日志都是debug级别的,也就是默认info级别的情况下,你会发现filebeat既不工作,也没有输出任何日志,而且因为恢复的时候是单线程(更准确的说是单个Goroutine)的,所以CPU也占用也很少,所以给人感觉好像是filebeat不工作了,也就是问题中描述的“不继续采集log”。

至于registry.new文件,其实不光是会在恢复期间会产生,它在任意时刻都可能会产生,这是Filebeat写registry文件的机制。结合代码简单分析一下:

// registrar.go
func (r *Registrar) Run() {
    logp.Debug("registrar", "Starting Registrar")
    // Writes registry on shutdown
    defer func() {
        r.writeRegistry()
        r.wg.Done()
    }()

    var (
        timer  *time.Timer
        flushC <-chan time.Time
    )

    for {
        select {
        case <-r.done:
            logp.Info("Ending Registrar")
            return
        case <-flushC:
            flushC = nil
            timer.Stop()
            r.flushRegistry()
        case states := <-r.Channel:
            r.onEvents(states)
            if r.flushTimeout <= 0 {
                r.flushRegistry()
            } else if flushC == nil {
                timer = time.NewTimer(r.flushTimeout)
                flushC = timer.C
            }
        }
    }
}

registrar协程启动以后,在for循环里面会调用flushRegistry写registry文件,其实就是将内存中的数据写到磁盘。写的时机由filebeat.registry_flush配置项决定,其默认值为0。该配置项就是代码中的r.flushTimeout,等于0的时候,是每成功发送一批(batch)数据,就会写一次。如果大于0,则按照这个频次定期刷新。看下刷新的具体代码(有精简):

func (r *Registrar) flushRegistry() {
    if err := r.writeRegistry(); err != nil {
        logp.Err("Writing of registry returned error: %v. Continuing...", err)
    }
    // 省略
}

// writeRegistry writes the new json registry file to disk.
func (r *Registrar) writeRegistry() error {
    // 省略
    tempfile, err := writeTmpFile(r.registryFile, r.fileMode, states)
    if err != nil {
        registryFails.Inc()
        return err
    }

    err = helper.SafeFileRotate(r.registryFile, tempfile)
    if err != nil {
        registryFails.Inc()
        return err
    }

    logp.Debug("registrar", "Registry file updated. %d states written.", len(states))
    registrySuccess.Inc()

    return nil
}

func writeTmpFile(baseName string, perm os.FileMode, states []file.State) (string, error) {
    logp.Debug("registrar", "Write registry file: %s", baseName)

    tempfile := baseName + ".new"
    f, err := os.OpenFile(tempfile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_SYNC, perm)
    if err != nil {
        logp.Err("Failed to create tempfile (%s) for writing: %s", tempfile, err)
        return "", err
    }

    defer f.Close()

    encoder := json.NewEncoder(f)

    if err := encoder.Encode(states); err != nil {
        logp.Err("Error when encoding the states: %s", err)
        return "", err
    }

    // Commit the changes to storage to avoid corrupt registry files
    if err = f.Sync(); err != nil {
        logp.Err("Error when syncing new registry file contents: %s", err)
        return "", err
    }

    return tempfile, nil
}

从代码看着就非常清楚了,写的时候先写临时文件registry.new,写完以后重命名成registry(helper.SafeFileRotate(r.registryFile, tempfile))。我个人觉得恢复状态是一个比较重要的过程,而且又可能很慢,其实可以打印info级别的日志的(比如像Kafka集群恢复的时候会打印恢复过程,让用户知道集群目前在做什么)这样也不会对用户造成困扰。如果真的遇到这个问题,怎么来确认是不是在从registry恢复状态呢,这里我教你几个方法:

  • 修改日志级别改为debug,这个最直观,但不适用于生产环境。所以大家可以观察filebeat是否有输出这样一条日志:2019-07-04T14:52:43.987+0800 INFO crawler/crawler.go:106 Loading and starting Inputs completed. Enabled inputs: 1 ,状态恢复完毕之后,会打印这样一条日志,而这个日志是info级别的,且就在filebeat启动的时候打印。如果一直没有输出这条日志,那就是还在恢复。

好,现在问题一般就可以定位清楚了,但并没有被很好的解决。因为这个恢复时间太长了,特别是如果你的registry文件很大的话,恢复几个小时,十几个小时都是完全有可能的,在恢复完之前,filebeat都是不工作的。那如何避免呢?有两类方法。

  1. 不会丢数据的方法:将filebeat.registry_flush参数由默认的0改为非0。我测了一下,改成1s时,27MB(包含20万个文件的状态)不到3秒就可以加载完了。这样改的弊端就是当Filebeat故障时,重复采集的文件会多一些,因为持久化结果的频次降低了,但不会丢数据。大多数情况下,日志重复采集问题不大。
  2. 控制registry的大小,这种可能会丢数据。filebeat提供了两个clean开头的参数来让我们尽量控制registry文件的大小,但这两个参数都有可能在某些场景下导致数据丢失,所以使用时一定确保你特别清楚这两个参数的用途及副作用。官方对于参数描述比较详细,这里我只简单说明一下:

    • clean-inactive:这个参数配置一个时间,如果一个文件超过这个时间都没有变更过,那它的状态就会从registry文件中移除。这个时间的值必须大于ignore_older + scan_frequency两个参数的时间和,以保证一个文件在收集的时候不会被移除状态。需要注意,这个参数的默认值是0,表示这个功能是disable的,即默认没有开启
    • clean-removed:这个参数比较简单,就是如果文件从磁盘上移除了,那就会从registry中删除这个文件的状态,该参数默认是开启的

当然这两个参数除了用于减小registry文件的大小,也可以有效避免inode重用导致的问题(但无法杜绝)。Filebeat使用inode唯一标识一个文件,而不是文件名,而unix*系统的inode是可能重复的,一旦重复,如果这个状态还存储在registry文件中,那filebeat就会产生问题,表现出来的现象就是数据截断,这个以后再说。

最后附上恢复状态的代码,供有兴趣的同学研究。beat的代码更新非常快,变化也非常大,以下代码取自filebeat 6.4.3。加载状态的方法是loadStates,在github.com/elastic/beats/filebeat/input/log/input.go文件中:

// LoadStates loads states into input
// It goes through all states coming from the registry. Only the states which match the glob patterns of
// the input will be loaded and updated. All other states will not be touched.
func (p *Input) loadStates(states []file.State) error {
    logp.Debug("input", "exclude_files: %s. Number of stats: %d", p.config.ExcludeFiles, len(states))

    for _, state := range states {
        // Check if state source belongs to this input. If yes, update the state.
        if p.matchesFile(state.Source) && p.matchesMeta(state.Meta) {
            state.TTL = -1

            // In case a input is tried to be started with an unfinished state matching the glob pattern
            if !state.Finished {
                return fmt.Errorf("Can only start an input when all related states are finished: %+v", state)
            }

            // Update input states and send new states to registry
            err := p.updateState(state)
            if err != nil {
                logp.Err("Problem putting initial state: %+v", err)
                return err
            }
        }
    }

    logp.Debug("input", "input with previous states loaded: %v", p.states.Count())
    return nil
}

Filebeat虽然简单,但只是相对于ES、Logstash这些而言的,其本身代码量也非常大,只看官方文档其实只能明白怎么用,对于一些细节,还是只能从源码寻找答案,特别是遇到一些比较奇怪的问题或者有定制化需求的时候。

本文先介绍到这,后面会从源码角度再讨论Filebeat的一些机制和使用时的注意点。

]]>
1 http://niyanchun.com/analysize-filebeat-no-working-after-reboot.html#comments http://niyanchun.com/feed/tag/Filebeat/
ELK+Filebeat小试 http://niyanchun.com/ELK-Filebeat.html http://niyanchun.com/ELK-Filebeat.html Mon, 12 Feb 2018 11:44:00 +0800 NYC ELK

ELK(Elasticsearch, Logstash, Kibana)是什么?网上很多,本文就不赘述了,这里推荐IBM的一篇文章:ELK+Filebeat 集中式日志解决方案详解,感觉图文并茂,讲的挺不错,而且里面也对Beat做了简单的说明。

Logstash vs Filebeat

这里也推荐一篇文章:Filebeat vs. Logstash — The Evolution of a Log Shipper.

如果你觉得上面的文章有些长,那我总结一下。

先看下二者的历史:

  1. 把Logstash和FileBeat放在一起比较其实不是非常合适,因为二者往往是互补的关系,可以配合使用,但这里放在一起比较的原因主要是说明他们的关系。
  2. Logstash是由Jordan Sissel(链接是Twitter的)这个大牛开发的,后来这个人加入了Elastic公司,Logstash也就由一个单独的软件变成了ELK技术栈内的一员。Logstash是由Jruby写的,运行依赖于JVM,所以它的突出问题就是性能不是很好,尤其是对内存消耗比较高。
  3. 同时,有人用Go语言开发了Lumberjack,刚开始的初衷是实现一个轻量级的日志收集系统,而日志分析则转交给其他软件去做,比如Logstash。Lumberjack背后的核心思想就是开发一套高效、可处理大量数据、内存消耗低、支持加密的网络协议。后来Lumberjack更名为Logstash-Forwarder,从此以后,Lumberjack单纯的指网络协议,而Logstash-Forwarder则成为基于Lumberjack的日志系统。
  4. 等到Lumberjack协议出了第二个版本以后(新版本的主要改变是支持嵌套JSON和一些流控机制),Logstash-Forwarder也被废弃了,出现了基于第二版Lumberjack的日志收集系统Beats。Beats是一个家族,里面包含Filebeat、Packetbeat、Winlogbeat等,用于收集不用数据源的日志,比如Filebeat用于收集文件式的日志。其中第一个出现的是Packetbeat,用于分析网络数据,后来其开发者也加入了Elastic公司,进而就有了后来其他的Beat。详情参见:https://www.elastic.co/downloads/beats

所以从二者的发展过程中我们就能看出来,最初的Logstash是既可以做日志收集,又可以做一些复杂分析的,但缺点是性能差,特别是运行依赖于JVM,作为Agent部署到每个节点上去的时候显得有些太重。于是乎就出现了基于更高效的编译型语言Go开发的Beat系列,它们很好的替代了Logstash收集日志的那一部分功能(当然也可以做一些简单的分析),但目前还做不了复杂的分析,特别是聚集类的分析。所以在实际使用中,如果我们不需要对日志做复杂的分析,那基本上选择Beat+ES+Kibana就可以了。但如果需要做复杂的分析,往往就是Beat+Logstash+ES+Kibana。这些软件都是Elastic公司开发的,所以可以无缝配合使用。

最后附张图(图片来源于上面的文章):

Beats和Logstash关系

Filebeat+ES+Kibana实例

该例子来源于:https://github.com/elastic/examples

当然第一步得安装对应的软件,这些软件都可以在https://www.elastic.co/downloads下载。需要说明的是目前主要有两种安装方式:一种是tar.gz包;一种是对应平台的包,比如rmp、deb等。二者各有利弊:

  • 通过tar.gz的好处是用户可以自定义安装目录,所有东西都安装在该目录下,不好处是软件没有自动纳入到系统管理里面,简单说就是你不能通过systemctl(对于upstart系统是service)命令去对软件的开机自动启动、软件启停等去管理,如果需要这样,需要自己手动去加service文件。
  • 通过rpm、deb等包安装的好处就是安装完成之后你就可以使用systemctl命令去管理软件了,不好处就是安装目录无法指定,而且配置文件和其他文件是分开在不同地方的。默认安装在/usr/share/目录下,配置文件在/etc目录下。

萝卜白菜,各有所爱。我个人推荐至少使用tar.gz安装一次,看看里面的目录等长啥样,如果第一次就通过rpm、deb包安装,安装完后,你可能连安装到哪里了都不知道。对于下面的例子,Filebeat要使用tar.gz的方式安装。

具体的安装过程就不赘述了,官网写的比较详细,网上教程也很多。这里说几个注意点(我是基于Ubuntu 16.04的):

  1. 通过tar.gz包安装的话,就是解压,设置环境变量,配置等;通过rpm、deb包安装的话,一条命令就搞定了。
  2. 对于ES,

    • 需要修改一下系统允许的最大虚拟内存,否则启动会失败,报错信息类似[1]: max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]。修改方法为,在/etc/sysctl.conf文件中增加(如果有,则修改)vm.max_map_count=262144,需重启才可生效。
    • ES不允许以root用户运行,需要使用其他用户。所以要注意其他用户一定要有权限访问和执行ES安装目录以及对应的文件。
    • 需要后台运行的话,在elasticsearch启动的时候加上-d参数即可。
    • ES的配置需要修改一下监听地址network.host(默认值是192.168.0.1),可修改为具体的IP或者0.0.0.0,端口号默认9200,可以不用修改。同时建议设置一下监听数据目录path.data,要注意修改的目录一定要是运行ES的用户可读可写的。
    • 安装例子需要的ES插件:
    <path_to_elasticsearch_root_dir>/bin/elasticsearch-plugin install ingest-user-agent
    <path_to_elasticsearch_root_dir>/bin/elasticsearch-plugin install ingest-geoip
    • 安装好ES之后,在浏览器中输入(或者curl等)http://<your_ip>:9200/,可看到类似如下结果则表示安装成功:
    {
      "name" : "i-gOkTs",
      "cluster_name" : "elasticsearch",
      "cluster_uuid" : "kmCU4GOqQteFbR4CekK6dA",
      "version" : {
        "number" : "6.2.1",
        "build_hash" : "7299dc3",
        "build_date" : "2018-02-07T19:34:26.990113Z",
        "build_snapshot" : false,
        "lucene_version" : "7.2.1",
        "minimum_wire_compatibility_version" : "5.6.0",
        "minimum_index_compatibility_version" : "5.0.0"
      },
      "tagline" : "You Know, for Search"
    }
  3. 对于Kibana,需要设置监听IPserver.host、ES地址elasticsearch.url,默认端口是5601,可以不修改。装完之后,如果http://<your_ip>:5601/能打开,则表示安装成功。

安装完之后,启动ES和Kibana。下载日志文件:apache_logs,假设下载的文件放在/root/temp目录。然后在Filebeat安装目录执行以下命令:

./filebeat modules enable apache2
./filebeat modules list
 ./filebeat -e --modules apache2 --setup  -M "apache2.access.var.paths=[/root/temp/*]"

如果运行正常的话,日志数据就会经Filebeat采集到ES里面(这里不需要复杂分析,所以没有使用Logstash)。在浏览器输入http://<your_ip>:9200/filebeat-*/_count,如果看到"count":10000字样,表示数据已经全部进入ES。

然后打开Kibana界面,选择Management-Index Patterns,在里面输入filebeat-*,下一步可选择I don't want to use Time Filter。然后打开Dashboard,选择[Filebeat Apache2] Access and error logs。然后在右上角选择2015-05-17 00:00:00.0002015-05-21 12:00:00.000时间段,就可以看到如下分析图了:

Kibana Show

ELK Stack作为一套开源的日志分析系统,已经被很多企业所采用,现在Beat家族加入后,相信前途会更好。

And, I love Go and Python. And, Happy Lunar New Year!

]]>
0 http://niyanchun.com/ELK-Filebeat.html#comments http://niyanchun.com/feed/tag/Filebeat/