本文对Filebeat代码进行简单分析,作为之前 Filebeat导致文件无法被删除的原因分析 一文的补充,当然也可单独阅读,了解Filebeat的代码逻辑。需要注意的是:本文不是全面、深度的Filebeat源码剖析,而是专注在通用配置下核心数据的流转上面,目标是理清楚数据从采集到中间流转,最后到发送的流程,而不是对每处代码细节进行分析讲解。本文核心点包括:

  1. Filebeat实例的创建和初始化流程;
  2. 文件的遍历、采集,包括Crawler、Input、Harvester、Reader等核心概念;
  3. 数据发送逻辑,即Publisher/Pipeline;
  4. Filebeat的ACK机制。

另外,本文包含大量代码,但为了尽量减少代码所占篇幅和突出重点逻辑,大部分代码都做了删减,有的地方有说明,有的没有说明。如果想看完整代码,建议按照下面的指导搭建自己的源码环境,对照着阅读。当然,本文假设你已经对Filebeat基本使用比较熟悉了。

源码准备

Filebeat是Elastic公司Beat系列的一员,所有Beat的源码都归档在一起,所以下载源码直接克隆Beat的代码即可。需要注意的是早期Go的模块依赖使用的是govendor,这种方式需要代码在GOPATH的src目录下面,而Go 1.11版本中引入了Go module作为新的模块依赖,并且在1.12版本中正式启用,作为默认选项(当然用户可以通过GO111MODULE这个环境变量来控制是否使用Go module功能)。这个特性的转变对于搭建Filebeat的源码环境也有一些影响,查看一下你想阅读的Beat的分支是否有go.mod文件,如果有则说明使用的是Go module,否则还是老的govendor。如果是govendor,那你必须将代码放到${GOPATH}/src/github.com/elastic,且GO111MODULE设置为auto或者off;如果是Go module,则没有这个要求。至于Golang的版本,不要比官方要求的版本低就行。

本文的讲解是基于6.8分支的代码(6.8.15),该版本的beat模块依赖还是早期的govendor,所以用如下命令搭建源码环境:

mkdir -p ${GOPATH}/src/github.com/elastic
git clone https://github.com/elastic/beats ${GOPATH}/src/github.com/elastic/beats
cd ${GOPATH}/src/github.com/elastic/beats
git checkout 6.8

如果你无法科学上网,从github拉取beat的代码会很慢(主要是因为.git目录非常大),所以我把自己的代码打包传到了百度网盘,实在从github上拉不下来的可以直接下载我上传的(点此进入下载页面,密码: 6bro)。我提供了2个版本,一个包含git,一个不包含,都不影响代码阅读,没有git的好处是代码会小很多。

我的Golang版本是1.15.7,编辑器是vscode。代码下载好以后,直接运行根目录下面的filebeat目录中的main.go,如果可以运行,那环境就算搭建好了。注意:如果提示配置文件filebeat.yml找不到,那环境也没问题,可能是因为你启动Filebeat的目录没有这个配置文件,可以通过-c <配置文件>这个命令行参数去指定一个filebeat配置即可。另外,Filebeat日志默认只输出到文件,如果调试的时候想输出到终端,再增加一个-e参数。如果是vscode,则直接修改.vscode/launch.json即可:

{
    // Use IntelliSense to learn about possible attributes.
    // Hover to view descriptions of existing attributes.
    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Launch file",
            "type": "go",
            "request": "launch",
            "mode": "debug",
            "program": "${file}",
            "args": ["-e"]
        }
    ]
}

filebeat与libbeat

熟悉Beat的应该都知道Beat里面有一个核心的libbeat模块,实现了很多基础功能,比如实例初始化、公共配置、队列、输出(output)、日志等,不同的Beat往往只需要实现如何采集数据,然后将数据以事件(event)的方式通过channel发给libbeat即可,这种方式大大简化了各个Beat的开发。下面是官方的一张说明Beat与libbeat关系的图:

beat-overview

上面提到的事件是在libbeat/beat/event.go中定义的:

// Event is the common event format shared by all beats.
// Every event must have a timestamp and provide encodable Fields in `Fields`.
// The `Meta`-fields can be used to pass additional meta-data to the outputs.
// Output can optionally publish a subset of Meta, or ignore Meta.
type Event struct {
    Timestamp time.Time
    Meta      common.MapStr
    Fields    common.MapStr
    Private   interface{} // for beats private use
}

Filebeat作为Beat的一员,也遵从上面的逻辑:Filebeat实现了文件的采集,并以event的方式发送给libbeat的Publisher,后续的工作都由libbeat完成。所以了解Filebeat完整的数据流程包含Filebeat和libbeat两部分代码。对应的代码在根目录下的filebeat目录和libbeat目录。为了方便理解,这里先给出一个精简的数据流图:

Filebeat精简数据流

各部分主要功能是:

  • Filebeat instance:Filebeat实例的创建和初始化;
  • Crawler:遍历配置中的input,创建、初始化Input实例;
  • Input:遍历配置的目录、文件,为每个需要采集的文件创建、初始化Harvester;
  • Harvester:读取文件内容,以event的方式发送到后端的队列(本文以最常使用的内存队列为例讲解);
  • Queue/Spooler:Filebeat内部的队列有两种:内存队列(Broker)和文件队列(Spooler),代码中文件队列叫spooler,但大多数地方spooler是queue的代名词,而不专指文件队列;
  • Publisher:libbeat中的发送逻辑,整个发送流程在代码中称为pipeline
  • Registrar:记录采集过的文件及位置(offset),和ACK机制一起实现Filebeat承诺的至少发送一次(at-least once)的保证。

下面分别介绍这些模块。

实例初始化 && Registrar

第一步是Filebeat的实例初始化。libbeat定义了一个Beater接口,不同的Beat实现这个接口,即可复用libbeat定义的诸多基础功能,这个接口定义如下:

// libbeat/beat/beat.go

// Beater is the interface that must be implemented by every Beat. A Beater
// provides the main Run-loop and a Stop method to break the Run-loop.
// Instantiation and Configuration is normally provided by a Beat-`Creator`.
//
// Once the beat is fully configured, the Run() method is invoked. The
// Run()-method implements the beat its run-loop. Once the Run()-method returns,
// the beat shuts down.
//
// The Stop() method is invoked the first time (and only the first time) a
// shutdown signal is received. The Stop()-method normally will stop the Run()-loop,
// such that the beat can gracefully shutdown.
type Beater interface {
    // The main event loop. This method should block until signalled to stop by an
    // invocation of the Stop() method.
    Run(b *Beat) error

    // Stop is invoked to signal that the Run method should finish its execution.
    // It will be invoked at most once.
    Stop()
}

// Beat contains the basic beat data and the publisher client used to publish
// events.
type Beat struct {
    Info      Info     // beat metadata.
    Publisher Pipeline // Publisher pipeline

    SetupMLCallback SetupMLCallback // setup callback for ML job configs
    InSetupCmd      bool            // this is set to true when the `setup` command is called

    OverwritePipelinesCallback OverwritePipelinesCallback // ingest pipeline loader callback
    // XXX: remove Config from public interface.
    //      It's currently used by filebeat modules to setup the Ingest Node
    //      pipeline and ML jobs.
    Config *BeatConfig // Common Beat configuration data.

    BeatConfig *common.Config // The beat's own configuration section

    Fields []byte // Data from fields.yml

    ConfigManager management.ConfigManager // config manager
}

Filebeat在filebeat/beater/filebeat.go中实现了这个接口。我们看下Filebeat是如何从main函数(filebeat/main.go)流转到filebeat.go的:

// filebeat/main.go

// The basic model of execution:
// - input: finds files in paths/globs to harvest, starts harvesters
// - harvester: reads a file, sends events to the spooler
// - spooler: buffers events until ready to flush to the publisher
// - publisher: writes to the network, notifies registrar
// - registrar: records positions of files read
// Finally, input uses the registrar information, on restart, to
// determine where in each file to restart a harvester.
func main() {
    if err := cmd.RootCmd.Execute(); err != nil {
        os.Exit(1)
    }
}

main前面的注释概述了Filebeat的运行逻辑,也是前面图的由来。main方法里面的代码很简单,下面直接给出关键的调用关系:

  1. filebeat/main.go#main
  2. filebeat/cmd/root.go#init: RootCmd = cmd.GenRootCmdWithRunFlags(Name, "", beater.New, runFlags)
  3. libbeat/cmd/root.go#GenRootCmdWithSettings: rootCmd.RunCmd = genRunCmd(settings, beatCreator, runFlags)
  4. libbeat/cmd/run.go#genRunCmd: err := instance.Run(settings, beatCreator)
  5. libbeat/cmd/instance/beat.go#Run: return b.launch(settings, bt)
  6. libbeat/cmd/instance/beat.go#launch: return beater.Run(&b.Beat)
  7. filebeat/beater/filebeat.go#Run

这部分的代码流转主要是在libbeat中,所以是一些比较通用化的配置,主要是构造Beater实例(此处是Filebeat实例),并最终进入到filebeat.go的Run方法中。Run方法中有两个比较重要的操作:registrar和crawler的创建、启动。

Filebeat的at-least once采集就是通过Registrar模块实现的,该文件会记录采集过的文件以及最后采集的位置,并且只有收到数据成功发送的确认后才会更新,其核心结构如下:

// filebeat/registrar/registrar.go
type Registrar struct {
    Channel      chan []file.State
    out          successLogger
    done         chan struct{}
    registryFile string      // Path to the Registry File
    fileMode     os.FileMode // Permissions to apply on the Registry File
    wg           sync.WaitGroup

    states               *file.States // Map with all file paths inside and the corresponding state
    gcRequired           bool         // gcRequired is set if registry state needs to be gc'ed before the next write
    gcEnabled            bool         // gcEnabled indicates the registry contains some state that can be gc'ed in the future
    flushTimeout         time.Duration
    bufferedStateUpdates int
}

确认消息成功发送的回调(通过channel实现)函数是在Run方法里面注册的(这部分后面还会专门讲解):

err = b.Publisher.SetACKHandler(beat.PipelineACKHandler{
    ACKEvents: newEventACKer(finishedLogger, registrarChannel).ackEvents,
})

Registrar的记录文件默认为filebeat安装目录/data/registry,示例内容如下(为了方便查看,进行了格式化):

[{
    "source": "/Users/allan/Desktop/temp/test-logs/test-1.log",
    "offset": 362,
    "timestamp": "2021-02-28T21:28:55.435218+08:00",
    "ttl": -1,
    "type": "log",
    "meta": null,
    "FileStateOS": {
        "inode": 8712351738,
        "device": 16777225
    }
}, {
    "source": "/Users/allan/Desktop/temp/test-logs/test-2.log",
    "offset": 362,
    "timestamp": "2021-02-28T21:28:55.24922+08:00",
    "ttl": -1,
    "type": "log",
    "meta": null,
    "FileStateOS": {
        "inode": 8712603538,
        "device": 16777225
    }
}]

另外,为了保证数据不丢失,实例初始化的时候各大模块的启动顺序依次是:registrar --> publisher --> spooler --> crawler/input. 而实例停止的时候各大模块的停止顺序则正好相反。下面看Crawler和Input。

输入/文件遍历(Crawler && Input)

Crawler

我们知道filebeat采集数据是通过在配置文件里面配置若干个input实现的,而Crawler的作用就是解析这些input,并创建、启动Input。Crawler的核心代码在filebeat/crawler/crawler.go中。

核心结构定义如下:

type Crawler struct {
    inputs          map[uint64]*input.Runner   // 包含若干个Input
    inputConfigs    []*common.Config
    out             channel.Factory
    wg              sync.WaitGroup
    InputsFactory   cfgfile.RunnerFactory
    ModulesFactory  cfgfile.RunnerFactory
    modulesReloader *cfgfile.Reloader
    inputReloader   *cfgfile.Reloader
    once            bool
    beatVersion     string
    beatDone        chan struct{}
}

在Start方法中遍历配置中的input,并创建、启动input:

// Start starts the crawler with all inputs
func (c *Crawler) Start(
    pipeline beat.Pipeline,
    r *registrar.Registrar,
    configInputs *common.Config,
    configModules *common.Config,
    pipelineLoaderFactory fileset.PipelineLoaderFactory,
    overwritePipelines bool,
) error {

    logp.Info("Loading Inputs: %v", len(c.inputConfigs))

    // Prospect the globs/paths given on the command line and launch harvesters
    for _, inputConfig := range c.inputConfigs {
        err := c.startInput(pipeline, inputConfig, r.GetStates())
        if err != nil {
            return err
        }
    }

    // 省略部分代码
    logp.Info("Loading and starting Inputs completed. Enabled inputs: %v", len(c.inputs))

    return nil
}

// 创建、启动Input
func (c *Crawler) startInput(
    pipeline beat.Pipeline,
    config *common.Config,
    states []file.State,
) error {
    if !config.Enabled() {
        return nil
    }

    connector := channel.ConnectTo(pipeline, c.out)
    p, err := input.New(config, connector, c.beatDone, states, nil)
    if err != nil {
        return fmt.Errorf("Error in initing input: %s", err)
    }
    p.Once = c.once

    if _, ok := c.inputs[p.ID]; ok {
        return fmt.Errorf("Input with same ID already exists: %d", p.ID)
    }

    c.inputs[p.ID] = p

    p.Start()

    return nil
}

Input

Filebeat支持多种类型的Input,比如log、redis、stdin、docker等,这些代码都在filebeat/input目录,不同类型的目录下面实现了特定的input。通用Input接口定义如下:

// filebeat/input/input.go
// Input is the interface common to all input
type Input interface {
    Run()
    Stop()
    Wait()
}

// Start starts the input
func (p *Runner) Start() {
    p.wg.Add(1)
    logp.Info("Starting input of type: %v; ID: %d ", p.config.Type, p.ID)

    // 省略部分代码

    // Add waitgroup to make sure input is finished
    go func() {
        defer func() {
            onceWg.Done()
            p.stop()
            p.wg.Done()
        }()

        p.Run()
    }()
}

// Run starts scanning through all the file paths and fetch the related files. Start a harvester for each file
func (p *Runner) Run() {
    // Initial input run
    p.input.Run()

    // 省略部分代码
}

此处以最常用的log类型为例进行说明。p.input.Run()会跳转到filebeat/log/input.go#Run

// Input contains the input and its config
type Input struct {
    cfg           *common.Config
    config        config
    states        *file.States
    harvesters    *harvester.Registry   // 1个input包含多个harvesters
    outlet        channel.Outleter
    stateOutlet   channel.Outleter
    done          chan struct{}
    numHarvesters atomic.Uint32
    meta          map[string]string
    stopOnce      sync.Once
}


// Run runs the input
func (p *Input) Run() {
    logp.Debug("input", "Start next scan")

    // TailFiles is like ignore_older = 1ns and only on startup
    if p.config.TailFiles {
        ignoreOlder := p.config.IgnoreOlder

        // Overwrite ignore_older for the first scan
        p.config.IgnoreOlder = 1
        defer func() {
            // Reset ignore_older after first run
            p.config.IgnoreOlder = ignoreOlder
            // Disable tail_files after the first run
            p.config.TailFiles = false
        }()
    }
    p.scan()

    // It is important that a first scan is run before cleanup to make sure all new states are read first
    if p.config.CleanInactive > 0 || p.config.CleanRemoved {
        beforeCount := p.states.Count()
        cleanedStates, pendingClean := p.states.Cleanup()
        logp.Debug("input", "input states cleaned up. Before: %d, After: %d, Pending: %d",
            beforeCount, beforeCount-cleanedStates, pendingClean)
    }

    // Marking removed files to be cleaned up. Cleanup happens after next scan to make sure all states are updated first
    if p.config.CleanRemoved {
        for _, state := range p.states.GetStates() {
            // os.Stat will return an error in case the file does not exist
            stat, err := os.Stat(state.Source)
            if err != nil {
                if os.IsNotExist(err) {
                    p.removeState(state)
                    logp.Debug("input", "Remove state for file as file removed: %s", state.Source)
                } else {
                    logp.Err("input state for %s was not removed: %s", state.Source, err)
                }
            } else {
                // Check if existing source on disk and state are the same. Remove if not the case.
                newState := file.NewState(stat, state.Source, p.config.Type, p.meta)
                if !newState.FileStateOS.IsSame(state.FileStateOS) {
                    p.removeState(state)
                    logp.Debug("input", "Remove state for file as file removed or renamed: %s", state.Source)
                }
            }
        }
    }
}

对Filebeat配置比较熟悉的朋友看到这部分代码,应该很亲切,变量的命名和配置项几乎是对应的,很多判断逻辑都是对配置项的处理,很容易理解。其中比较关键的是p.scan():

// Scan starts a scanGlob for each provided path/glob
func (p *Input) scan() {
    var sortInfos []FileSortInfo
    var files []string

    // 获取目录下需要被采集的文件,是否需要被采集的逻辑就是在getFiles()方法中实现
    paths := p.getFiles()

    // 省略部分代码

    for i := 0; i < len(paths); i++ {
        // 省略一些判断是否采集过,以及采集到哪里的代码

        // Decides if previous state exists
        if lastState.IsEmpty() {
            logp.Debug("input", "Start harvester for new file: %s", newState.Source)
            // 启动harvester
            err := p.startHarvester(newState, 0)
            // 省略错误处理代码
        } else {
            p.harvestExistingFile(newState, lastState)
        }
    }
}

// harvestExistingFile continues harvesting a file with a known state if needed
func (p *Input) harvestExistingFile(newState file.State, oldState file.State) {
    logp.Debug("input", "Update existing file for harvesting: %s, offset: %v", newState.Source, oldState.Offset)

    if oldState.Finished && newState.Fileinfo.Size() > oldState.Offset {
        logp.Debug("input", "Resuming harvesting of file: %s, offset: %d, new size: %d", newState.Source, oldState.Offset, newState.Fileinfo.Size())
        // 启动harvester采集
        err := p.startHarvester(newState, oldState.Offset)
        if err != nil {
            logp.Err("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err)
        }
        return
    }
    // 省略后续代码
}

// startHarvester starts a new harvester with the given offset
// In case the HarvesterLimit is reached, an error is returned
func (p *Input) startHarvester(state file.State, offset int64) error {
    if p.numHarvesters.Inc() > p.config.HarvesterLimit && p.config.HarvesterLimit > 0 {
        p.numHarvesters.Dec()
        harvesterSkipped.Add(1)
        return errHarvesterLimit
    }
    // Set state to "not" finished to indicate that a harvester is running
    state.Finished = false
    state.Offset = offset

    // Create harvester with state
    h, err := p.createHarvester(state, func() { p.numHarvesters.Dec() })
    if err != nil {
        p.numHarvesters.Dec()
        return err
    }

    err = h.Setup()
    if err != nil {
        p.numHarvesters.Dec()
        return fmt.Errorf("error setting up harvester: %s", err)
    }

    // Update state before staring harvester
    // This makes sure the states is set to Finished: false
    // This is synchronous state update as part of the scan
    h.SendStateUpdate()

    if err = p.harvesters.Start(h); err != nil {
        p.numHarvesters.Dec()
    }
    return err
}

scan代码的核心逻辑就是遍历目录下的文件,找到需要采集的文件后就创建启动一个harvester实例进行采集。最后面的startHarvester方法中先是创建了一个harvester实例(p.createHarvester),然后配置(h.Setup)该实例,最后启动(p.harvesters.Start(h))实例。这部分在接下来的Harvester部分介绍。

filebeat/log/input.go文件中的代码中包含了大量配置项的代码逻辑,建议好好看一下。如果你对input部分的配置项比较熟悉,这部分代码看起来也比较简单。对照配置项说明文档进行查看,效果更佳。

数据采集Harvester

Harvester

一个Harvester就是一个goroutine,接口定义在filebeat/harvester/harvester.go中:

// Harvester contains all methods which must be supported by each harvester
// so the registry can be used by the input
type Harvester interface {
    ID() uuid.UUID      
    Run() error
    Stop()
}

不同的input类型会实现自己的Harvester,log类型的Harvester实现在filebeat/input/log/harvester.go中,核心结构定义如下:

// Harvester contains all harvester related data
type Harvester struct {
    id     uuid.UUID
    config config
    source harvester.Source // the source being watched

    // shutdown handling
    done     chan struct{}
    stopOnce sync.Once
    stopWg   *sync.WaitGroup
    stopLock sync.Mutex

    // internal harvester state
    state  file.State
    states *file.States
    log    *Log

    // file reader pipeline
    reader          reader.Reader
    encodingFactory encoding.EncodingFactory
    encoding        encoding.Encoding

    // event/state publishing
    outletFactory OutletFactory
    publishState  func(*util.Data) bool

    onTerminate func()
}

// Run start the harvester and reads files line by line and sends events to the defined output
func (h *Harvester) Run() error {
    // 该方法在上篇文章中已经介绍过,下面省略掉了大部分代码,只保留了读取和发送
    for {
        // 读取
        message, err := h.reader.Next()

        // 发送
        if !h.sendEvent(data, forwarder) {
            return nil
        }
    }
}

上篇文章中已经介绍过Harvester,其核心任务就是打开文件,根据配置读取文件内容,并发送。读取和发送都是在Run中实现,见上面的代码注释。本文再补充介绍下另外一个关键点reader.Reader

各种Reader

其实读取的真正操作是由一系列Reader完成的。Reader接口定义在filebeat/reader/reader.go中:

// Reader is the interface that wraps the basic Next method for
// getting a new message.
// Next returns the message being read or and error. EOF is returned
// if reader will not return any new message on subsequent calls.
type Reader interface {
    Next() (Message, error)
}

// Message represents a reader event with timestamp, content and actual number
// of bytes read from input before decoding.
type Message struct {
    Ts      time.Time     // timestamp the content was read
    Content []byte        // actual content read
    Bytes   int           // total number of bytes read to generate the message
    Fields  common.MapStr // optional fields that can be added by reader
}

该接口只包含一个Next方法,每调用一次,则读取一个Message。Filebeat实现了很多种Reader,这些Reader根据用户的配置形成一个调用链,对最原始的数据依次进行处理,就像一个流水线一样。每一个后面的Reader都包含了前面的Reader。这些Reader都定义在filebeat/reader目录,主要包括下面这些:

filebeat-reader

最底层的log.go#Read并不是一个Filebeat的reader.Reader实现,而是直接调用了Go底层的Read,实现读取指定长度的字节数据:

// filebeat/input/log/log.go
func (f *Log) Read(buf []byte) (int, error) {
    for {
        n, err := f.fs.Read(buf)  // 调用go底层os/file.go#Read
    }
}

// os/file.go#Read
// Read reads up to len(b) bytes from the File.
// It returns the number of bytes read and any error encountered.
// At end of file, Read returns 0, io.EOF.
func (f *File) Read(b []byte) (n int, err error) {
    if err := f.checkValid("read"); err != nil {
        return 0, err
    }
    n, e := f.read(b)
    return n, f.wrapErr("read", e)
}

再往上都是各种reader.Reader的实现,依次如下(都省略了部分代码):

LineReader,实现逐行读取的功能:

// filebeat/reader/readfile/line.go
// lineReader reads lines from underlying reader, decoding the input stream
// using the configured codec. The reader keeps track of bytes consumed
// from raw input stream for every decoded line.
type LineReader struct {
    reader     io.Reader
    codec      encoding.Encoding
    bufferSize int
    maxBytes   int
    nl         []byte
    inBuffer   *streambuf.Buffer
    outBuffer  *streambuf.Buffer
    inOffset   int // input buffer read offset
    byteCount  int // number of bytes decoded from input buffer into output buffer
    decoder    transform.Transformer
}

// Next reads the next line until the new line character
func (r *LineReader) Next() ([]byte, int, error) {
    for {
        err := r.advance()
    }
}

// Reads from the buffer until a new line character is detected
// Returns an error otherwise
func (r *LineReader) advance() error {
    // fill inBuffer until '\n' sequence has been found in input buffer
    for idx == -1 {
        // try to read more bytes into buffer   filebeat/input/log/log.go#Read
        n, err := r.reader.Read(buf)
    }
}

EncoderReader,对行数据进行编解码:

// filebeat/reader/readfile/encode.go
// Reader produces lines by reading lines from an io.Reader
// through a decoder converting the reader it's encoding to utf-8.
type EncoderReader struct {
    reader *LineReader
}

// Next reads the next line from it's initial io.Reader
// This converts a io.Reader to a reader.reader
func (r EncoderReader) Next() (reader.Message, error) {
    c, sz, err := r.reader.Next()
    // Creating message object
    return reader.Message{
        Ts:      time.Now(),
        Content: c,
        Bytes:   sz,
    }, err
}

JSON Reader,处理JSON格式的数据:

// filebeat/reader/json/json.go
type JSON struct {
    reader reader.Reader
    cfg    *Config
}

// Next decodes JSON and returns the filled Line object.
func (r *JSON) Next() (reader.Message, error) {
}

StripNewline Reader,去掉后面的换行符:

// filebeat/reader/readfile/strip_newline.go
// StripNewline reader removes the last trailing newline characters from
// read lines.
type StripNewline struct {
    reader reader.Reader
}

// Next returns the next line.
func (p *StripNewline) Next() (reader.Message, error) {
}

Timeout Reader,超时处理:

// filebeat/reader/readfile/timeout.go
// TimeoutReader will signal some configurable timeout error if no
// new line can be returned in time.
type TimeoutReader struct {
    reader  reader.Reader
    timeout time.Duration
    signal  error
    running bool
    ch      chan lineMessage
}

// Next returns the next line. If no line was returned before timeout, the
// configured timeout error is returned.
// For handline timeouts a goroutine is started for reading lines from
// configured line reader. Only when underlying reader returns an error, the
// goroutine will be finished.
func (r *TimeoutReader) Next() (reader.Message, error) {
}

Multiline Reader,多行合并处理:

// filebeat/reader/multiline/multiline.go
// MultiLine reader combining multiple line events into one multi-line event.
//
// Lines to be combined are matched by some configurable predicate using
// regular expression.
//
// The maximum number of bytes and lines to be returned is fully configurable.
// Even if limits are reached subsequent lines are matched, until event is
// fully finished.
//
// Errors will force the multiline reader to return the currently active
// multiline event first and finally return the actual error on next call to Next.
type Reader struct {
    reader       reader.Reader
    pred         matcher
    flushMatcher *match.Matcher
    maxBytes     int // bytes stored in content
    maxLines     int
    separator    []byte
    last         []byte
    numLines     int
    truncated    int
    err          error // last seen error
    state        func(*Reader) (reader.Message, error)
    message      reader.Message
}

// Next returns next multi-line event.
func (mlr *Reader) Next() (reader.Message, error) {
    return mlr.state(mlr)
}

LimitReader,单个event长度限制:

// filebeat/reader/readfile/limit.go
// Reader sets an upper limited on line length. Lines longer
// then the max configured line length will be snapped short.
type LimitReader struct {
    reader   reader.Reader
    maxBytes int
}

// Next returns the next line.
func (r *LimitReader) Next() (reader.Message, error) {
    message, err := r.reader.Next()
    if len(message.Content) > r.maxBytes {
        message.Content = message.Content[:r.maxBytes]
        message.AddFlagsWithKey("log.flags", "truncated")
    }
    return message, err
}

这么多的Reader并非在所有场景下都是必须的,需要根据用户配置进行装配,这个操作是在初始化Harvester时进行的:

// Setup opens the file handler and creates the reader for the harvester
func (h *Harvester) Setup() error {
    err := h.open()
    if err != nil {
        return fmt.Errorf("Harvester setup failed. Unexpected file opening error: %s", err)
    }

    // 在newLogFileReader中根据用户配置装配 Reader 流
    h.reader, err = h.newLogFileReader()
    if err != nil {
        if h.source != nil {
            h.source.Close()
        }
        return fmt.Errorf("Harvester setup failed. Unexpected encoding line reader error: %s", err)
    }

    return nil
}

Harvester的Run方法中通过无限循环调用h.reader.Next()时,会依次递归调用这些Reader的Next方法,加工数据,得到的最终数据发给给后端。

发送逻辑(Harvester内部)

发送的调用流程如下:

  1. filebeat/input/log/harvester.go#Run(): h.sendEvent(data, forwarder)
  2. filebeat/input/log/harvester.go#sendEvent: forwarder.Send(data)
  3. filebeat/harvester/forwarder.go#Send: f.Outlet.OnEvent(data)
  4. filebeat/channel/util.go#OnEvent

实际的发送是由filebeat/channel/util.gofilebeat/channel/outlet.go中的代码协作完成的:

filebeat/channel/util.go:

// filebeat/channel/util.go
func (o *subOutlet) OnEvent(d *util.Data) bool {

    o.mutex.Lock()
    defer o.mutex.Unlock()
    select {
    case <-o.done:
        return false
    default:
    }

    select {
    case <-o.done:
        return false
    // 数据写入channel
    case o.ch <- d:
        select {
        case <-o.done:
            return true
        // 等待结果
        case ret := <-o.res:
            return ret
        }
    }
}

// filebeat/channel/util.go
// SubOutlet create a sub-outlet, which can be closed individually, without closing the
// underlying outlet.
func SubOutlet(out Outleter) Outleter {
    s := &subOutlet{
        done: make(chan struct{}),
        ch:   make(chan *util.Data),
        res:  make(chan bool, 1),
    }

    go func() {
        // 从channel读取数据,并调用OnEvent发送数据
        for event := range s.ch {
            s.res <- out.OnEvent(event)
        }
    }()

    return s
}

filebeat/channel/outlet.go:

func (o *outlet) OnEvent(d *util.Data) bool {
    if !o.isOpen.Load() {
        return false
    }

    event := d.GetEvent()
    if d.HasState() {
        event.Private = d.GetState()
    }

    if o.wg != nil {
        o.wg.Add(1)
    }

    o.client.Publish(event) // 跳到libbeat/publisher/pipeline/client.go#Publish

    return o.isOpen.Load()
}

这部分代码逻辑是:

  1. 数据经由filebeat/channel/util.go#OnEvent中的case o.ch <- d写入到channel中,然后在内层的select中等待结果;
  2. 同时filebeat/channel/util.go#Outleter里面的一个goroutine会读取写入到channel的数据,并调用filebeat/channel/outlet.go#OnEvent中的o.client.Publish(event)发送数据,发送成功后,将结果写回到subOutlet的res channel,然后harvester返回。

上篇文章中harvester关闭不掉,就是因为卡在了o.client.Publish(event)无法返回,所以harvester卡在了subOutlet的res channel,即一直等待发送结果。

到目前为止,数据都是在Filebeat的代码中流转的,此处调用o.client.Publish(event)之后,就会进入到libbeat中(libbeat/publisher/pipeline/client.go#Publish)。

数据发送Publisher

从这部分开始,代码进入到libbeat中,这部分代码的复杂主要体现在涉及概念比较多,流转流程比较长。

Pipeline

Beat把Publisher中的整个流程称之为pipeline,是libbeat中非常核心的一部分,代码在libbeat/publisher/pipeline/pipeline.go中,这里贴一下包含较详细注释部分的代码:

// Package pipeline combines all publisher functionality (processors, queue,
// outputs) to create instances of complete publisher pipelines, beats can
// connect to publish events to.
package pipeline

// Pipeline implementation providint all beats publisher functionality.
// The pipeline consists of clients, processors, a central queue, an output
// controller and the actual outputs.
// The queue implementing the queue.Queue interface is the most central entity
// to the pipeline, providing support for pushung, batching and pulling events.
// The pipeline adds different ACKing strategies and wait close support on top
// of the queue. For handling ACKs, the pipeline keeps track of filtered out events,
// to be ACKed to the client in correct order.
// The output controller configures a (potentially reloadable) set of load
// balanced output clients. Events will be pulled from the queue and pushed to
// the output clients using a shared work queue for the active outputs.Group.
// Processors in the pipeline are executed in the clients go-routine, before
// entering the queue. No filtering/processing will occur on the output side.
type Pipeline struct {
    beatInfo beat.Info

    logger *logp.Logger
    queue  queue.Queue
    output *outputController

    observer observer

    eventer pipelineEventer

    // wait close support
    waitCloseMode    WaitCloseMode
    waitCloseTimeout time.Duration
    waitCloser       *waitCloser

    // pipeline ack
    ackMode    pipelineACKMode
    ackActive  atomic.Bool
    ackDone    chan struct{}
    ackBuilder ackBuilder
    eventSema  *sema

    processors pipelineProcessors
}

我整理了一个图:

beat-pipeline

下面介绍pipeline中的各个环节。接前文,Harvester中会持有一个最左边的client实例(一个Input实例中的所有Harvester共享该Client),然后通过这个client调用Producer将数据发送到一个buffer,这是一个channel,大小硬编码为20。同时需要注意的一个点是client的publish中还执行了processor,也就是如果定义了processor,他是在发送的Client里面执行的。这部分功能对应代码如下(只保留了关键代码):

// filebeat/channel/outlet.go
func (o *outlet) OnEvent(d *util.Data) bool {
    o.client.Publish(event) // 跳到libbeat/publisher/pipeline/client.go#Publish
}

// libbeat/publisher/pipeline/client.go
func (c *client) publish(e beat.Event) {
    // 如果定义了processor,则在此处执行
    if c.processors != nil {
        var err error

        event, err = c.processors.Run(event)
        publish = event != nil
        if err != nil {
            // TODO: introduce dead-letter queue?

            log.Errorf("Failed to publish event: %v", err)
        }
    }

    var published bool
    if c.canDrop {
        published = c.producer.TryPublish(pubEvent)
    } else {
        published = c.producer.Publish(pubEvent) // queue/memqueue/produce.go
    }
}

// libbeat/publisher/queue/memqueue/produce.go
func (p *forgetfulProducer) Publish(event publisher.Event) bool {
    return p.openState.publish(p.makeRequest(event))
}
func (st *openState) publish(req pushRequest) bool {
    select {
    // 将数据发送到events buffer中
    case st.events <- req:
        return true
    case <-st.done:
        st.events = nil
        return false
    }
}

然后EventLoop从events buffer中读取数据写到batchBuffer中,batchBuffer是一个Slice,其大小为queue.mem.events(默认值为4096)。这部分功能对应代码如下(只保留了关键代码):

// libbeat/publisher/queue/memqueue/eventloop.go
// 创建EventLoop
func newBufferingEventLoop(b *Broker, size int, minEvents int, flushTimeout time.Duration) *bufferingEventLoop {
    l := &bufferingEventLoop{
        broker:       b,
        maxEvents:    size,
        minEvents:    minEvents,
        flushTimeout: flushTimeout,
        // 直接使用Broker的events
        events:    b.events,
        get:       nil,
        pubCancel: b.pubCancel,
        acks:      b.acks,
    }
    l.buf = newBatchBuffer(l.minEvents)

    l.timer = time.NewTimer(flushTimeout)
    if !l.timer.Stop() {
        <-l.timer.C
    }

    return l
}

func (l *bufferingEventLoop) run() {
    var (
        broker = l.broker
    )

    for {
        select {
        case <-broker.done:
            return

        case req := <-l.events: // producer pushing new event
            l.handleInsert(&req)
        }
    }
}

func (l *bufferingEventLoop) handleInsert(req *pushRequest) {
    // insert会把数据写入batchBuffer
    if l.insert(req) {
        l.eventCount++
        if l.eventCount == l.maxEvents {
            // 队列面了就把chan设置为nil,此时写会被阻塞。等收到ack(handleACK)后又会恢复队列
            l.events = nil // stop inserting events if upper limit is reached
        }
    }
}

数据到batchBuffer之后,eventConsumer会按照用户配置的规则批量从batchBuffer读取数据,并写入workQueue,这是一个channel。这部分功能对应代码如下(只保留了关键代码):

// libbeat/publisher/pipeline/consumer.go
func (c *eventConsumer) loop(consumer queue.Consumer) {
    log := c.logger

    log.Debug("start pipeline event consumer")

    var (
        out    workQueue
        batch  *Batch
        paused = true
    )

    for {
        select {
        case <-c.done:
            log.Debug("stop pipeline event consumer")
            return
        case sig := <-c.sig:
            handleSignal(sig)
        // 将数据写入 workQueue
        case out <- batch:
            batch = nil
        }
    }
}

数据到workQueue之后,再由netClientWorker模块读取并通过调用output的Publish将数据真正的发送出去。这里以ElasticSearch类型的output为例展示代码流程(只保留了关键代码):

// libbeat/publisher/pipeline/output.go
func (w *netClientWorker) run() {
    for !w.closed.Load() {
        reconnectAttempts := 0

        // 从 workQueue读取数据
        // send loop
        for batch := range w.qu {
            if w.closed.Load() {
                if batch != nil {
                    batch.Cancelled()
                }
                return
            }
            // 发送到output
            err := w.client.Publish(batch) // libbeat/outputs/backoff.go
            if err != nil {
                logp.Err("Failed to publish events: %v", err)
                // on error return to connect loop
                break
            }
        }
    }
}

// libbeat/outputs/backoff.go
func (b *backoffClient) Publish(batch publisher.Batch) error {
    err := b.client.Publish(batch) // libbeat/outputs/elasticsearch/client.go
    if err != nil {
        b.client.Close()
    }
    backoff.WaitOnError(b.backoff, err)
    return err
}

// libbeat/outputs/elasticsearch/client.go
func (client *Client) Publish(batch publisher.Batch) error {
    events := batch.Events()
    rest, err := client.publishEvents(events)
    if len(rest) == 0 {
        batch.ACK() // libbeat/publisher/pipeline/batch.go
    } else {
        batch.RetryEvents(rest)
    }
    return err
}

至此,整个pipeline的数据流程就算完成了,其实都是各种代码调用,并不复杂,只是需要花时间去看代码而已。接下来,再补充介绍一下pipeline中比较核心的spooler。

Queue

Filebeat提供了2种队列:内存队列和文件队列。实际中绝大多数应该用的都是内存队列,这里也仅介绍内存队列,文件队列的实现在libbeat/publisher/queue/spool目录下,有兴趣的自行查看,核心的东西和内存队列一致。内存队列的定义在libbeat/publisher/queue/memqueue目录下,定义队列的文件是broker.go

// 内存队列在代码中叫Broker
type Broker struct {
    done chan struct{}

    logger logger

    bufSize int
    // buf         brokerBuffer
    // minEvents   int
    // idleTimeout time.Duration

    // api channels
    events    chan pushRequest
    requests  chan getRequest
    pubCancel chan producerCancelRequest

    // internal channels
    acks          chan int
    scheduledACKs chan chanList

    eventer queue.Eventer

    // wait group for worker shutdown
    wg          sync.WaitGroup
    waitOnClose bool
}

前面的介绍的event buffer就是这里的events字段。该文件中还有一个NewBroker函数比较重要,里面是创建一个Broker,并且定义了eventLoop接口,该接口有2个实现:

  • directEventLoop(queue.mem.flush.min_events = 1
  • bufferingEventLoop(queue.mem.flush.min_events > 1

queue.mem.flush.min_events的默认值为2048,所以创建的是bufferingEventLoop,这里仅介绍该类型的EventLoop:

// libbeat/publisher/queue/memqueue/eventloop.go
// bufferingEventLoop implements the broker main event loop.
// Events in the buffer are forwarded to consumers only if the buffer is full or on flush timeout.
type bufferingEventLoop struct {
    broker *Broker

    buf        *batchBuffer
    flushList  flushList
    eventCount int

    minEvents    int
    maxEvents    int
    flushTimeout time.Duration

    // active broker API channels
    events    chan pushRequest
    get       chan getRequest
    pubCancel chan producerCancelRequest

    // ack handling
    acks        chan int      // ackloop -> eventloop : total number of events ACKed by outputs
    schedACKS   chan chanList // eventloop -> ackloop : active list of batches to be acked
    pendingACKs chanList      // ordered list of active batches to be send to the ackloop
    ackSeq      uint          // ack batch sequence number to validate ordering

    // buffer flush timer state
    timer *time.Timer
    idleC <-chan time.Time
}

前面提到的batchBuffer就是这里的buf字段。另外,看代码的时候注意区分一下spooler这个词的含义,大多数时候它都指代的是queue。同时,文件队列也称为spooler。

至此,整个pipeline就介绍完了。到这里,从Filebeat实例创建,到数据采集、发送都全部介绍完了。还差的就是ACK流程,这个是和数据发送流程相反的。

ACK流程

Filebeat基于Registrar+ACK的方式实现了至少发送一次的保证,Registrar前面已经介绍过了,最后看下ACK的流程,这部分相对复杂一些。

再看下Registrar的逻辑:

// filebeat/registrar/registrar.go
type Registrar struct {
    Channel      chan []file.State
    out          successLogger
    done         chan struct{}
    registryFile string      // Path to the Registry File
    fileMode     os.FileMode // Permissions to apply on the Registry File
    wg           sync.WaitGroup

    states               *file.States // Map with all file paths inside and the corresponding state
    gcRequired           bool         // gcRequired is set if registry state needs to be gc'ed before the next write
    gcEnabled            bool         // gcEnabled indicates the registry contains some state that can be gc'ed in the future
    flushTimeout         time.Duration
    bufferedStateUpdates int
}


func (r *Registrar) Run() {
    logp.Debug("registrar", "Starting Registrar")
    // Writes registry on shutdown
    defer func() {
        r.writeRegistry()
        r.wg.Done()
    }()

    var (
        timer  *time.Timer
        flushC <-chan time.Time
    )

    for {
        select {
        case <-r.done:
            logp.Info("Ending Registrar")
            return
        case <-flushC:
            flushC = nil
            timer.Stop()
            r.flushRegistry()
        case states := <-r.Channel:         // 依靠这个channel通信
            // 收到channel中的确认信息之后,将数据写入registry文件
            r.onEvents(states)
            if r.flushTimeout <= 0 {
                r.flushRegistry()
            } else if flushC == nil {
                timer = time.NewTimer(r.flushTimeout)
                flushC = timer.C
            }
        }
    }
}

Registrar结构定义了一个Channel字段,这个channel就是用来接收ack消息的。Run方法里面从这个channel读取数据。然后看下ack数据是如何写到这个channel的。

首先filebeat.go初始化时注册了全局ACK处理回调:

// filebeat/beater/filebeat.go
// Run allows the beater to be run as a beat.
func (fb *Filebeat) Run(b *beat.Beat) error {

    // Make sure all events that were published in
    registrarChannel := newRegistrarLogger(registrar)

    // 注册消息成功发送的回调
    err = b.Publisher.SetACKHandler(beat.PipelineACKHandler{
        ACKEvents: newEventACKer(finishedLogger, registrarChannel).ackEvents,
    })
    if err != nil {
        logp.Err("Failed to install the registry with the publisher pipeline: %v", err)
        return err
    }
}
// filebeat/beater/filebeat.go
func newRegistrarLogger(reg *registrar.Registrar) *registrarLogger {
    return &registrarLogger{
        done: make(chan struct{}),
        ch:   reg.Channel,
    }
}

// filebeat/beater/channels.go
type registrarLogger struct {
    done chan struct{}
    ch   chan<- []file.State
}
// filebeat/beater/channels.go
func (l *registrarLogger) Published(states []file.State) {
    select {
    case <-l.done:
        // set ch to nil, so no more events will be send after channel close signal
        // has been processed the first time.
        // Note: nil channels will block, so only done channel will be actively
        //       report 'closed'.
        l.ch = nil
    case l.ch <- states:
    }
}

// filebeat/beater/acker.go
type statefulLogger interface {
    Published(states []file.State)
}
// filebeat/beater/acker.go
func newEventACKer(stateless statelessLogger, stateful statefulLogger) *eventACKer {
    return &eventACKer{stateless: stateless, stateful: stateful, log: logp.NewLogger("acker")}
}

// 注册的回调函数
func (a *eventACKer) ackEvents(data []interface{}) {
    stateless := 0
    states := make([]file.State, 0, len(data))
    for _, datum := range data {
        if datum == nil {
            stateless++
            continue
        }

        st, ok := datum.(file.State)
        if !ok {
            stateless++
            continue
        }

        states = append(states, st)
    }

    if len(states) > 0 {
        a.log.Debugw("stateful ack", "count", len(states))
        a.stateful.Published(states) // filebeat/beater/channels.go: func (l *registrarLogger) Published(states []file.State)
    }

    if stateless > 0 {
        a.log.Debugw("stateless ack", "count", stateless)
        a.stateless.Published(stateless) //
    }
}

这里最终注册的回调是eventACKer,里面的重点是a.stateful.Published,看下这个的实现:

// filebeat/beater/channels.go
func (l *registrarLogger) Published(states []file.State) {
    select {
    case <-l.done:
        // set ch to nil, so no more events will be send after channel close signal
        // has been processed the first time.
        // Note: nil channels will block, so only done channel will be actively
        //       report 'closed'.
        l.ch = nil
    case l.ch <- states:
    }
}

里面将最终的ack消息(states)写到了l.ch。这个channel就是Registrar那里的channel(从注册的代码里面可以分析出来),即回调函数将ack消息写入channel,然后Registrar从channel中读取states数据,写入registry文件,这样形成一个闭环。如下图:

ack-1

现在的问题就是:这个ackEvents回调函数的ack又是哪来的呢?是谁(who),在什么地方(where),什么时候(when),以何种方式(how)发送到ackEvents的?首先推断一下,既然是ack,那最源头当然应该是最终发送数据的地方发出,即发送数据完成得到外部确认之后,反向传递ack,正好和采集的时间传递方向相反,也就是核心应该在Publisher里面,或者说libbeat的pipeline里面。下面从pipeline中最核心的(内存)队列Broker模块开始分析。

// libbeat/publisher/queue/memqueue/broker.go
// NewBroker creates a new broker based in-memory queue holding up to sz number of events.
// If waitOnClose is set to true, the broker will block on Close, until all internal
// workers handling incoming messages and ACKs have been shut down.
func NewBroker(
    logger logger,
    settings Settings,
) *Broker {

    var eventLoop interface {
        run()
        processACK(chanList, int)
    }

    // 创建EventLoop
    if minEvents > 1 {
        eventLoop = newBufferingEventLoop(b, sz, minEvents, flushTimeout)
    } else {
        eventLoop = newDirectEventLoop(b, sz)
    }

    b.bufSize = sz
    // 创建AckLoop
    ack := newACKLoop(b, eventLoop.processACK)

    b.wg.Add(2)
    go func() {
        defer b.wg.Done()
        eventLoop.run()
    }()
    // 这个goroutine中启动 ack
    go func() {
        defer b.wg.Done()
        ack.run()
    }()
}

// libbeat/publisher/queue/memqueue/ackloop.go
func newACKLoop(b *Broker, processACK func(chanList, int)) *ackLoop {
    l := &ackLoop{broker: b}
    l.processACK = processACK
    return l
}

NewBroker中创建了eventLoop和ackLoop,前者用于发送数据,前面已经讲过,后者则用于处理ack。看下ackLoop的代码:

// libbeat/publisher/queue/memqueue/ackloop.go
// ackLoop implements the brokers asynchronous ACK worker.
// Multiple concurrent ACKs from consecutive published batches will be batched up by the
// worker, to reduce the number of signals to return to the producer and the
// broker event loop.
// Producer ACKs are run in the ackLoop go-routine.
type ackLoop struct {
    broker *Broker
    sig    chan batchAckMsg     // 确认消息发送的channel
    lst    chanList

    totalACK   uint64
    totalSched uint64

    batchesSched uint64
    batchesACKed uint64

    processACK func(chanList, int)
}

func (l *ackLoop) run() {
    for {
        select {
        case <-l.broker.done:
            return

        case acks <- acked:
            acks, acked = nil, 0

        case lst := <-l.broker.scheduledACKs:
            count, events := lst.count()
            l.lst.concat(&lst)

            l.batchesSched += uint64(count)
            l.totalSched += uint64(events)

        // 这里等待batch发送完成的确认信号
        case <-l.sig:
            acked += l.handleBatchSig()
            if acked > 0 {
                acks = l.broker.acks
            }
        }
    }
}

可以看到,ackloop中在等待batch发送完成的信号(sig),这里有2条线:

  1. 信号如何来的?
  2. 收到信号之后,后续是如何将这个信号发送给前面的那个回调函数?

先来看第1个问题:信号如何来的?根据前面的推断,应该由发送数据那里产生。而由pipeline部分的分析知道最终数据发送在output那里。此处继续以ES这种类型的output为例,看下最终发送的代码,从那里反推:

// libbeat/outputs/elasticsearch/client.go  这是之前ES部分最终的发送代码
func (client *Client) Publish(batch publisher.Batch) error {
    events := batch.Events()
    rest, err := client.publishEvents(events)
    if len(rest) == 0 {
        batch.ACK() // 重点看这里的ACK,这个会跳转到libbeat/publisher/pipeline/batch.go
    } else {
        batch.RetryEvents(rest)
    }
    return err
}

// libbeat/publisher/pipeline/batch.go
func (b *Batch) ACK() {
    b.ctx.observer.outBatchACKed(len(b.events)) // libbeat/publisher/pipeline/monitoring.go: func (o *metricsObserver) outBatchACKed(int) {}  这个是监控用的
    b.original.ACK()   // 重点看这里: libbeat/publiser/queue/memequeue/consumer.go
    releaseBatch(b)
}

// libbeat/publisher/queue/memqueue/consume.go
func (b *batch) ACK() {
    if b.state != batchActive {
        switch b.state {
        case batchACK:
            panic("Can not acknowledge already acknowledged batch")
        default:
            panic("inactive batch")
        }
    }

    b.report()      // 重点在这里
}
// libbeat/publisher/queue/memqueue/consume.go
func (b *batch) report() {
    b.ack.ch <- batchAckMsg{}      // 最终在这里发送了确认ACK
}

Bingo!在Publish里面发送完成(可能失败,可能成功)之后,就会发送ACK。然后根据调用关系往回推,最终在report中发送了ack,第1个问题就解答了,ack信号就是这样来的。然后看第2个问题:收到信号之后,后续是如何将这个信号发送给前面的那个回调函数?接着看收到信号后调用的handleBatchSig的代码:

// libbeat/publisher/queue/memqueue/ackloop.go
// handleBatchSig collects and handles a batch ACK/Cancel signal. handleBatchSig
// is run by the ackLoop.
func (l *ackLoop) handleBatchSig() int {

    if count > 0 {
        if e := l.broker.eventer; e != nil {
            e.OnACK(count)
        }
        // 这里会调用之前EventLoop的processACK,我们用的是bufferingEventLoop,所以会调用(*bufferingEventLoop)processACK
        // report acks to waiting clients
        l.processACK(lst, count) // libbeat/publisher/queue/memqueue/eventloop.go#(*bufferingEventLoop)processACK
    }

}

// libbeat/publisher/queue/memqueue/eventloop.go
func (l *bufferingEventLoop) processACK(lst chanList, N int) {
    for !lst.empty() {
            // 重点在这里,这里会调用ackEvents发送确认消息
            st.state.cb(int(count)) // libbeat/publisher/pipeline/acker.go (a *eventDataACK) ackEvents(n int)
        }
    }
}

handleBatchSig里面调用了对应类型的eventLoop的processACK方法,该方法内部会转到pipeline的acker.go,下面给出代码流转:

  1. libbeat/publisher/queue/memqueue/eventloop.go: (l *bufferingEventLoop) processACK --> st.state.cb(int(count))
  2. libbeat/publisher/pipeline/acker.go: func (a *eventDataACK) ackEvents(n int) { a.acker.ackEvents(n) }
  3. libbeat/publisher/pipeline/acker.go: func (a *boundGapCountACK) ackEvents(n int) { a.acker.ackEvents(n) }
  4. libbeat/publisher/pipeline/acker.go: func (a *gapCountACK) ackEvents(n int) {}

看下最后一个ackEvents:

func (a *gapCountACK) ackEvents(n int) {
    select {
    case <-a.pipeline.ackDone: // pipeline is closing down -> ignore event
        a.acks = nil
    // ack数n写入了a.acks
    case a.acks <- n: // send ack event to worker
    }
}

// gapCountACK returns event ACKs to the producer, taking account for dropped events.
// Events being dropped by processors will always be ACKed with the last batch ACKed
// by the broker. This way clients waiting for ACKs can expect all processed
// events being always ACKed.
type gapCountACK struct {
    pipeline *Pipeline

    fn func(total int, acked int)

    done chan struct{}

    drop chan struct{}
    acks chan int

    events atomic.Uint32
    lst    gapList
}

数据会写入gapCountACKacks之后,会在ackLoop中读取:

// libbeat/publisher/pipeline/acker.go
func (a *gapCountACK) ackLoop() {
    acks, drop := a.acks, a.drop
    closing := false
    for {
        select {
        case <-a.done:
            closing = true
            a.done = nil
            if a.events.Load() == 0 {
                // stop worker, if all events accounted for have been ACKed.
                // If new events are added after this acker won't handle them, which may
                // result in duplicates
                return
            }

        case <-a.pipeline.ackDone:
            return

        case n := <-acks:
            // 重点:从acks读出n之后调用handleACK处理
            empty := a.handleACK(n)
            if empty && closing && a.events.Load() == 0 {
                // stop worker, if and only if all events accounted for have been ACKed
                return
            }

        case <-drop:
            // TODO: accumulate multiple drop events + flush count with timer
            a.events.Sub(1)
            a.fn(1, 0)
        }
    }
}

func (a *gapCountACK) handleACK(n int) bool {
    a.events.Sub(uint32(total))
    a.fn(total, acked) // line 326: func (a *boundGapCountACK) onACK(total, acked int) {
    return emptyLst
}

从acks读出n之后调用handleACK处理,后续的调用流程如下:

1. libbeat/publisher/pipeline/acker.go: handleACK --> a.fn(total, acked)
2. libbeat/publisher/pipeline/acker.go: func (a *boundGapCountACK) onACK(total, acked int) --> a.fn(total, acked)
3. libbeat/publisher/pipeline/acker.go: func (a *eventDataACK) onACK(total, acked int) --> a.fn(data, acked) 
4. libbeat/publisher/pipeline/pipeline_ack.go: func (p *pipelineEventCB) onEvents(data []interface{}, acked int)

最终进入到pipelineEventCB中,这个结构是内部处理ack的(我理解pipelineEventCB命名的含义是pipeline中event的callback函数,可见它是处理ack的核心),看下关键代码:

// libbeat/publisher/pipeline/pipeline_ack.go
// pipelineEventCB internally handles active ACKs in the pipeline.
// It receives ACK events from the queue and the individual clients.
// Once the queue returns an ACK to the pipelineEventCB, the worker loop will collect
// events from all clients having published events in the last batch of events
// being ACKed.
// the PipelineACKHandler will be notified, once all events being ACKed
// (including dropped events) have been collected. Only one ACK-event is handled
// at a time. The pipeline global and clients ACK handler will be blocked for the time
// an ACK event is being processed.
type pipelineEventCB struct {
    done chan struct{}

    acks chan int

    // 这个字段是关键,确认信息会写到这个channel,然后在worker中读出,最终写入到Registrar的channel
    events        chan eventsDataMsg    
    droppedEvents chan eventsDataMsg

    mode    pipelineACKMode
    handler beat.PipelineACKHandler
}

其中的events字段是关键,确认信息会写到这个channel,然后在worker中读出,最终写入到Registrar的channel。接着之前的调用,看数据如何写到events这个channel:

// reportEvents sends a batch of ACKed events to the ACKer.
// The events array contains send and dropped events. The `acked` counters
// indicates the total number of events acked by the pipeline.
// That is, the number of dropped events is given by `len(events) - acked`.
// A client can report events with acked=0, iff the client has no waiting events
// in the pipeline (ACK ordering requirements)
//
// Note: the call blocks, until the ACK handler has collected all active events
//       from all clients. This ensure an ACK event being fully 'captured'
//       by the pipeline, before receiving/processing another ACK event.
//       In the meantime the queue has the chance of batching-up more ACK events,
//       such that only one ACK event is being reported to the pipeline handler
func (p *pipelineEventCB) onEvents(data []interface{}, acked int) {
    p.pushMsg(eventsDataMsg{data: data, total: len(data), acked: acked})
}

func (p *pipelineEventCB) pushMsg(msg eventsDataMsg) {
    if msg.acked == 0 {
        p.droppedEvents <- msg
    } else {
        msg.sig = make(chan struct{})
        p.events <- msg // 此处写入channel后,在(p *pipelineEventCB) worker()的collect中读出,最后reportEventsData
        <-msg.sig
    }
}

pushMsg中,将消息写入events中。然后看下channel另一端的读取:

func (p *pipelineEventCB) worker() {
    defer close(p.acks)
    defer close(p.events)
    defer close(p.droppedEvents)

    for {
        select {
        case count := <-p.acks:
            // 在collect中读取消息
            exit := p.collect(count)
            if exit {
                return
            }

            // short circuit dropped events, but have client block until all events
            // have been processed by pipeline ack handler
        case msg := <-p.droppedEvents:
            p.reportEventsData(msg.data, msg.total)
            if msg.sig != nil {
                close(msg.sig)
            }

        case <-p.done:
            return
        }
    }
}

func (p *pipelineEventCB) collect(count int) (exit bool) {
    for acked < count {
        var msg eventsDataMsg
        select {
        // 在此处读取消息
        case msg = <-p.events:
        case msg = <-p.droppedEvents:
        case <-p.done:
            exit = true
            return
        }
    }

    p.reportEventsData(data, total)
    return
}

func (p *pipelineEventCB) reportEventsData(data []interface{}, total int) {
    // report ACK back to the beat
    switch p.mode {
    case countACKMode:
        p.handler.ACKCount(total)
    case eventsACKMode:
        // 这里调用之前在 Filebeat中注册的 ACKEvents
        p.handler.ACKEvents(data) // filebeat/beater/acker.go: func (a *eventACKer) ackEvents(data []interface{})
    case lastEventsACKMode:
        p.handler.ACKLastEvents(data)
    }
}

可以看到从channel中读出确认消息后,最终会在reportEventsData中调用之前在Filebeat中注册的ACKEvents:eventACKer。至此,第2个问题也得到了解答,并且和之前的回调函数成功对接。把之前的图补充完整如下:

ack-2

总结

本文从源码角度对Filebeat的核心数据流转进行了简单的分析,为了突出重点,省掉了很多细节代码,比如各个环节的数据结构的实例是何时创建的,又是何时启动的,以及一些异常分支的处理、失败重传等。正如前文所说,本文并不是一篇全面的Filebeat代码深度剖析,而是通用配置下的核心数据流转分析。这篇文章我改了很多遍,总感觉没写什么东西,只是贴了了大量代码,然后讲了代码的调用点,而这些其实自己debug几遍也就清楚了。不过话说回来,如果事先知道整个逻辑流程,以及关键调用点,就能在debug时做到胸有成竹、有所侧重,从而节省很多时间。我想这篇文章对我最大的意义有两点:一个是让自己对Filebeat更加了解,另外一个是可能会给其它看源码的人节省一些时间。这就够了。

Reference