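This article gives a brief analysis of the Filebeat source code. It supplements the earlier article "Analysis of Why Filebeat Prevents Files from Being Deleted", but it can also be read on its own to understand Filebeat's code logic. Note that this is not a comprehensive, in-depth dissection of the Filebeat source. It focuses on how the core data flows under a typical configuration. The goal is to trace the data from collection, through the intermediate stages, to sending, rather than to explain every code detail. The key topics are: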

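  1. How the Filebeat instance is created and initialized;
  2. How files are traversed and collected, including the core concepts Crawler, Input, Harvester and Reader;
  3. The data-sending logic, i.e. the Publisher/Pipeline;
  4. Filebeat's ACK mechanism.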



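Filebeat is a member of Elastic's Beats family. The source code of all Beats is kept in a single repository, so you only need to clone the Beats repository.

Dependency management has changed over time, and this affects how you set up the source environment:

- Early versions managed Go dependencies with govendor, which requires the code to live under the src directory of GOPATH.
- Go 1.11 introduced Go modules as the new dependency mechanism.
- Go 1.13 switched module mode on automatically whenever a go.mod file is present, even inside GOPATH.
- Go 1.16 made module mode the default. You can still control it with the GO111MODULE environment variable.

Check whether the Beats branch you want to read contains a go.mod file. If it does, it uses Go modules and there is no placement requirement. Otherwise it still uses govendor: you must place the code under ${GOPATH}/src/github.com/elastic and set GO111MODULE to auto or off. As for the Go version, anything not older than the officially required version will do.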


mkdir -p ${GOPATH}/src/github.com/elastic
git clone https://github.com/elastic/beats ${GOPATH}/src/github.com/elastic/beats
cd ${GOPATH}/src/github.com/elastic/beats
git checkout 6.8
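My Go version is 1.15.7 and my editor is VS Code. Once the code is downloaded, run main.go in the filebeat directory under the repository root. If it runs, the environment is ready.

If it complains that the configuration file filebeat.yml cannot be found, the environment is still fine. The directory you started Filebeat from probably just lacks that file; point to a Filebeat configuration with the -c <config file> command-line flag. Also, Filebeat logs only to files by default. To log to the terminal while debugging, add the -e flag. In VS Code, simply edit .vscode/launch.json: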

{
    // Use IntelliSense to learn about possible attributes.
    // Hover to view descriptions of existing attributes.
    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Launch file",
            "type": "go",
            "request": "launch",
            "mode": "debug",
            "program": "${file}",
            "args": ["-e"]
        }
    ]
}





// Event is the common event format shared by all beats.
// Every event must have a timestamp and provide encodable Fields in `Fields`.
// The `Meta`-fields can be used to pass additional meta-data to the outputs.
// Output can optionally publish a subset of Meta, or ignore Meta.
type Event struct {
    Timestamp time.Time
    Meta      common.MapStr
    Fields    common.MapStr
    Private   interface{} // for beats private use
}




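  • Filebeat instance: creation and initialization of the Filebeat instance;
  • Crawler: iterates over the inputs in the configuration and creates and initializes the Input instances;
  • Input: walks the configured directories and files and creates and initializes a Harvester for each file that needs to be collected;
  • Harvester: reads file contents and sends them as events to the queue behind it (this article uses the most common choice, the memory queue, as the example);
  • Queue/Spooler: Filebeat has two kinds of internal queue, the memory queue (Broker) and the file queue (Spooler). In the code the file queue is called spooler, but in most places "spooler" is just another name for queue and does not refer specifically to the file queue;
  • Publisher: the sending logic in libbeat; the whole sending flow is called the pipeline in the code;
  • Registrar: records the files that have been collected and their positions (offsets). Together with the ACK mechanism, it implements Filebeat's at-least-once delivery guarantee.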


Instance Initialization && Registrar


// libbeat/beat/beat.go

// Beater is the interface that must be implemented by every Beat. A Beater
// provides the main Run-loop and a Stop method to break the Run-loop.
// Instantiation and Configuration is normally provided by a Beat-`Creator`.
// Once the beat is fully configured, the Run() method is invoked. The
// Run()-method implements the beat its run-loop. Once the Run()-method returns,
// the beat shuts down.
// The Stop() method is invoked the first time (and only the first time) a
// shutdown signal is received. The Stop()-method normally will stop the Run()-loop,
// such that the beat can gracefully shutdown.
type Beater interface {
    // The main event loop. This method should block until signalled to stop by an
    // invocation of the Stop() method.
    Run(b *Beat) error

    // Stop is invoked to signal that the Run method should finish its execution.
    // It will be invoked at most once.
    Stop()
}

// Beat contains the basic beat data and the publisher client used to publish
// events.
type Beat struct {
    Info      Info     // beat metadata.
    Publisher Pipeline // Publisher pipeline

    SetupMLCallback SetupMLCallback // setup callback for ML job configs
    InSetupCmd      bool            // this is set to true when the `setup` command is called

    OverwritePipelinesCallback OverwritePipelinesCallback // ingest pipeline loader callback
    // XXX: remove Config from public interface.
    //      It's currently used by filebeat modules to setup the Ingest Node
    //      pipeline and ML jobs.
    Config *BeatConfig // Common Beat configuration data.

    BeatConfig *common.Config // The beat's own configuration section

    Fields []byte // Data from fields.yml

    ConfigManager management.ConfigManager // config manager
}


// filebeat/main.go

// The basic model of execution:
// - input: finds files in paths/globs to harvest, starts harvesters
// - harvester: reads a file, sends events to the spooler
// - spooler: buffers events until ready to flush to the publisher
// - publisher: writes to the network, notifies registrar
// - registrar: records positions of files read
// Finally, input uses the registrar information, on restart, to
// determine where in each file to restart a harvester.
func main() {
    if err := cmd.RootCmd.Execute(); err != nil {
        os.Exit(1)
    }
}


  1. filebeat/main.go#main
  2. filebeat/cmd/root.go#init: RootCmd = cmd.GenRootCmdWithRunFlags(Name, "", beater.New, runFlags)
  3. libbeat/cmd/root.go#GenRootCmdWithSettings: rootCmd.RunCmd = genRunCmd(settings, beatCreator, runFlags)
  4. libbeat/cmd/run.go#genRunCmd: err := instance.Run(settings, beatCreator)
  5. libbeat/cmd/instance/beat.go#Run: return b.launch(settings, bt)
  6. libbeat/cmd/instance/beat.go#launch: return beater.Run(&b.Beat)
  7. filebeat/beater/filebeat.go#Run
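The chain ends in beater.Run. The Beater contract shown above boils down to two rules: Run blocks until Stop is called, and Stop takes effect only once. The following is a minimal sketch of that contract. Run is simplified to take no *beat.Beat argument, and demoBeater is a hypothetical type, not part of libbeat.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// demoBeater is a hypothetical Beater-like type: Run loops until Stop is
// called, and Stop may be called repeatedly without side effects.
type demoBeater struct {
	done     chan struct{}
	stopOnce sync.Once
	ticks    int
}

func newDemoBeater() *demoBeater {
	return &demoBeater{done: make(chan struct{})}
}

// Run is the main loop; it does periodic "work" until done is closed.
func (b *demoBeater) Run() error {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-b.done:
			return nil
		case <-ticker.C:
			b.ticks++ // stands in for real work such as scanning inputs
		}
	}
}

// Stop signals Run to return; sync.Once makes a second call a no-op
// instead of a "close of closed channel" panic.
func (b *demoBeater) Stop() {
	b.stopOnce.Do(func() { close(b.done) })
}

func main() {
	b := newDemoBeater()
	errc := make(chan error, 1)
	go func() { errc <- b.Run() }()

	time.Sleep(30 * time.Millisecond)
	b.Stop()
	b.Stop()
	fmt.Println("Run returned:", <-errc)
}
```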


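Filebeat's at-least-once collection is implemented by the Registrar module. The Registrar records which files have been collected and the last collected position in each. It updates these records only after receiving confirmation that the data was sent successfully. Its core structure is: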

// filebeat/registrar/registrar.go
type Registrar struct {
    Channel      chan []file.State
    out          successLogger
    done         chan struct{}
    registryFile string      // Path to the Registry File
    fileMode     os.FileMode // Permissions to apply on the Registry File
    wg           sync.WaitGroup

    states               *file.States // Map with all file paths inside and the corresponding state
    gcRequired           bool         // gcRequired is set if registry state needs to be gc'ed before the next write
    gcEnabled            bool         // gcEnabled indicates the registry contains some state that can be gc'ed in the future
    flushTimeout         time.Duration
    bufferedStateUpdates int
}

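Each time Filebeat restarts, the registrar calls func (r *Registrar) loadStates() at startup to read the contents of the registry file. These states are then compared with the files that need to be collected (func (p *Input) loadStates(states []file.State)). The comparison determines which files have already been collected, and which have been updated and must be resumed from where the previous run stopped.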
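To make the registry lookup concrete, here is a minimal Go sketch. It decodes registry entries shaped like the registry sample in this article and picks a start offset per file. It assumes, as the FileStateOS fields suggest, that a file is matched by inode and device rather than by path. registryState, fileKey, loadRegistry and startOffset are hypothetical names, not Filebeat APIs.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// registryState mirrors a subset of the fields in the registry sample.
// It is a simplified stand-in, not Filebeat's file.State type.
type registryState struct {
	Source      string `json:"source"`
	Offset      int64  `json:"offset"`
	FileStateOS struct {
		Inode  uint64 `json:"inode"`
		Device uint64 `json:"device"`
	} `json:"FileStateOS"`
}

// fileKey identifies a file by inode and device, so a renamed file can still
// be matched with its previous state (an assumption of this sketch).
type fileKey struct{ inode, device uint64 }

// loadRegistry decodes a registry document (a JSON array of states) into a
// map keyed by inode/device.
func loadRegistry(data []byte) (map[fileKey]registryState, error) {
	var states []registryState
	if err := json.Unmarshal(data, &states); err != nil {
		return nil, err
	}
	m := make(map[fileKey]registryState, len(states))
	for _, s := range states {
		m[fileKey{s.FileStateOS.Inode, s.FileStateOS.Device}] = s
	}
	return m, nil
}

// startOffset returns the recorded offset for a known file, or 0 for a file
// that was never collected before.
func startOffset(reg map[fileKey]registryState, inode, device uint64) int64 {
	if s, ok := reg[fileKey{inode, device}]; ok {
		return s.Offset
	}
	return 0
}

const sample = `[{"source":"/tmp/test-1.log","offset":362,"FileStateOS":{"inode":8712351738,"device":16777225}}]`

func main() {
	reg, err := loadRegistry([]byte(sample))
	if err != nil {
		panic(err)
	}
	fmt.Println(startOffset(reg, 8712351738, 16777225)) // 362: resume
	fmt.Println(startOffset(reg, 1, 16777225))          // 0: new file
}
```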


err = b.Publisher.SetACKHandler(beat.PipelineACKHandler{
    ACKEvents: newEventACKer(finishedLogger, registrarChannel).ackEvents,
})


[{
    "source": "/Users/allan/Desktop/temp/test-logs/test-1.log",
    "offset": 362,
    "timestamp": "2021-02-28T21:28:55.435218+08:00",
    "ttl": -1,
    "type": "log",
    "meta": null,
    "FileStateOS": {
        "inode": 8712351738,
        "device": 16777225
    }
}, {
    "source": "/Users/allan/Desktop/temp/test-logs/test-2.log",
    "offset": 362,
    "timestamp": "2021-02-28T21:28:55.24922+08:00",
    "ttl": -1,
    "type": "log",
    "meta": null,
    "FileStateOS": {
        "inode": 8712603538,
        "device": 16777225
    }
}]
In addition, to avoid losing data, the instance starts its major modules in the order registrar --> publisher --> spooler --> crawler/input, and stops them in exactly the reverse order. Next, let's look at the Crawler and Input.
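The ordering rule can be sketched generically: start components in dependency order and stop them in reverse. That way, whatever consumes a component's output is already running when the component starts, and on shutdown the producers stop before the parts that drain their events. The component interface, named, and startAll below are hypothetical illustrations, not Filebeat code.

```go
package main

import "fmt"

// component is a hypothetical interface for a long-running module.
type component interface {
	Start() error
	Stop()
}

// named is a toy component that records lifecycle events into a shared log.
type named struct {
	name string
	log  *[]string
}

func (n named) Start() error {
	*n.log = append(*n.log, "start "+n.name)
	return nil
}

func (n named) Stop() {
	*n.log = append(*n.log, "stop "+n.name)
}

// startAll starts components in order. If one fails, the ones already
// started are stopped in reverse order. On success it returns a function
// that stops everything in reverse order.
func startAll(cs []component) (func(), error) {
	for i, c := range cs {
		if err := c.Start(); err != nil {
			for j := i - 1; j >= 0; j-- {
				cs[j].Stop()
			}
			return nil, err
		}
	}
	return func() {
		for j := len(cs) - 1; j >= 0; j-- {
			cs[j].Stop()
		}
	}, nil
}

func main() {
	var log []string
	order := []component{
		named{"registrar", &log}, named{"publisher", &log},
		named{"spooler", &log}, named{"crawler", &log},
	}
	stop, err := startAll(order)
	if err != nil {
		panic(err)
	}
	stop()
	for _, entry := range log {
		fmt.Println(entry)
	}
}
```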

Input/File Traversal (Crawler && Input)




type Crawler struct {
    inputs          map[uint64]*input.Runner   // contains several Inputs
    inputConfigs    []*common.Config
    out             channel.Factory
    wg              sync.WaitGroup
    InputsFactory   cfgfile.RunnerFactory
    ModulesFactory  cfgfile.RunnerFactory
    modulesReloader *cfgfile.Reloader
    inputReloader   *cfgfile.Reloader
    once            bool
    beatVersion     string
    beatDone        chan struct{}
}

// Start starts the crawler with all inputs
func (c *Crawler) Start(
    pipeline beat.Pipeline,
    r *registrar.Registrar,
    configInputs *common.Config,
    configModules *common.Config,
    pipelineLoaderFactory fileset.PipelineLoaderFactory,
    overwritePipelines bool,
) error {

    logp.Info("Loading Inputs: %v", len(c.inputConfigs))

    // Prospect the globs/paths given on the command line and launch harvesters
    for _, inputConfig := range c.inputConfigs {
        err := c.startInput(pipeline, inputConfig, r.GetStates())
        if err != nil {
            return err
        }
    }

    // some code omitted
    logp.Info("Loading and starting Inputs completed. Enabled inputs: %v", len(c.inputs))

    return nil
}

// creates and starts an Input
func (c *Crawler) startInput(
    pipeline beat.Pipeline,
    config *common.Config,
    states []file.State,
) error {
    if !config.Enabled() {
        return nil
    }

    connector := channel.ConnectTo(pipeline, c.out)
    p, err := input.New(config, connector, c.beatDone, states, nil)
    if err != nil {
        return fmt.Errorf("Error in initing input: %s", err)
    }
    p.Once = c.once

    if _, ok := c.inputs[p.ID]; ok {
        return fmt.Errorf("Input with same ID already exists: %d", p.ID)
    }

    c.inputs[p.ID] = p

    // ...

    return nil
}



// filebeat/input/input.go
// Input is the interface common to all input
type Input interface {
    // ...
}

// Start starts the input
func (p *Runner) Start() {
    logp.Info("Starting input of type: %v; ID: %d ", p.config.Type, p.ID)

    // some code omitted

    // Add waitgroup to make sure input is finished
    go func() {
        defer func() {
            // ...
        }()
        // ...
    }()
}

// Run starts scanning through all the file paths and fetch the related files. Start a harvester for each file
func (p *Runner) Run() {
    // Initial input run

    // some code omitted
}


// Input contains the input and its config
type Input struct {
    cfg           *common.Config
    config        config
    states        *file.States
    harvesters    *harvester.Registry   // one input contains multiple harvesters
    outlet        channel.Outleter
    stateOutlet   channel.Outleter
    done          chan struct{}
    numHarvesters atomic.Uint32
    meta          map[string]string
    stopOnce      sync.Once
}

// Run runs the input
func (p *Input) Run() {
    logp.Debug("input", "Start next scan")

    // TailFiles is like ignore_older = 1ns and only on startup
    if p.config.TailFiles {
        ignoreOlder := p.config.IgnoreOlder

        // Overwrite ignore_older for the first scan
        p.config.IgnoreOlder = 1
        defer func() {
            // Reset ignore_older after first run
            p.config.IgnoreOlder = ignoreOlder
            // Disable tail_files after the first run
            p.config.TailFiles = false
        }()
    }
    // ...

    // It is important that a first scan is run before cleanup to make sure all new states are read first
    if p.config.CleanInactive > 0 || p.config.CleanRemoved {
        beforeCount := p.states.Count()
        cleanedStates, pendingClean := p.states.Cleanup()
        logp.Debug("input", "input states cleaned up. Before: %d, After: %d, Pending: %d",
            beforeCount, beforeCount-cleanedStates, pendingClean)
    }

    // Marking removed files to be cleaned up. Cleanup happens after next scan to make sure all states are updated first
    if p.config.CleanRemoved {
        for _, state := range p.states.GetStates() {
            // os.Stat will return an error in case the file does not exist
            stat, err := os.Stat(state.Source)
            if err != nil {
                if os.IsNotExist(err) {
                    // ...
                    logp.Debug("input", "Remove state for file as file removed: %s", state.Source)
                } else {
                    logp.Err("input state for %s was not removed: %s", state.Source, err)
                }
            } else {
                // Check if existing source on disk and state are the same. Remove if not the case.
                newState := file.NewState(stat, state.Source, p.config.Type, p.meta)
                if !newState.FileStateOS.IsSame(state.FileStateOS) {
                    // ...
                    logp.Debug("input", "Remove state for file as file removed or renamed: %s", state.Source)
                }
            }
        }
    }
}

// Scan starts a scanGlob for each provided path/glob
func (p *Input) scan() {
    var sortInfos []FileSortInfo
    var files []string

    // Get the files under the configured paths that should be collected;
    // the logic deciding whether a file should be collected lives in getFiles()
    paths := p.getFiles()

    // some code omitted

    for i := 0; i < len(paths); i++ {
        // code omitted: checks whether the file was collected before, and up to which offset

        // Decides if previous state exists
        if lastState.IsEmpty() {
            logp.Debug("input", "Start harvester for new file: %s", newState.Source)
            // start a harvester
            err := p.startHarvester(newState, 0)
            // error handling omitted
        } else {
            p.harvestExistingFile(newState, lastState)
        }
    }
}

// harvestExistingFile continues harvesting a file with a known state if needed
func (p *Input) harvestExistingFile(newState file.State, oldState file.State) {
    logp.Debug("input", "Update existing file for harvesting: %s, offset: %v", newState.Source, oldState.Offset)

    if oldState.Finished && newState.Fileinfo.Size() > oldState.Offset {
        logp.Debug("input", "Resuming harvesting of file: %s, offset: %d, new size: %d", newState.Source, oldState.Offset, newState.Fileinfo.Size())
        // start a harvester to continue collecting
        err := p.startHarvester(newState, oldState.Offset)
        if err != nil {
            logp.Err("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err)
        }
    }
    // the rest of the function is omitted
}

// startHarvester starts a new harvester with the given offset
// In case the HarvesterLimit is reached, an error is returned
func (p *Input) startHarvester(state file.State, offset int64) error {
    if p.numHarvesters.Inc() > p.config.HarvesterLimit && p.config.HarvesterLimit > 0 {
        // ...
        return errHarvesterLimit
    }
    // Set state to "not" finished to indicate that a harvester is running
    state.Finished = false
    state.Offset = offset

    // Create harvester with state
    h, err := p.createHarvester(state, func() { p.numHarvesters.Dec() })
    if err != nil {
        return err
    }

    err = h.Setup()
    if err != nil {
        return fmt.Errorf("error setting up harvester: %s", err)
    }

    // Update state before starting harvester
    // This makes sure the states is set to Finished: false
    // This is synchronous state update as part of the scan
    // ...

    if err = p.harvesters.Start(h); err != nil {
        // ...
    }
    return err
}
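The decision logic spread across scan, harvestExistingFile and startHarvester can be condensed into a small sketch. It uses simplified types (state, input). Unlike the excerpt above, which elides that part, it spells out that a rejected start gives its harvester slot back; that behavior is an assumption of the sketch. plan, acquire and release are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// state is a simplified, hypothetical stand-in for file.State.
type state struct {
	Offset   int64
	Finished bool // true when no harvester is currently reading the file
}

var errHarvesterLimit = errors.New("harvester limit reached")

// input counts running harvesters; limit 0 means "no limit",
// matching the HarvesterLimit > 0 check in startHarvester.
type input struct {
	running int32
	limit   int32
}

// acquire reserves a harvester slot. Giving the slot back when the limit
// is exceeded is an assumption of this sketch.
func (p *input) acquire() error {
	if n := atomic.AddInt32(&p.running, 1); p.limit > 0 && n > p.limit {
		atomic.AddInt32(&p.running, -1)
		return errHarvesterLimit
	}
	return nil
}

// release is what the onTerminate callback passed to createHarvester does.
func (p *input) release() {
	atomic.AddInt32(&p.running, -1)
}

// plan condenses the scan/harvestExistingFile branches: no previous state
// means start at offset 0; a finished file that has grown resumes at the
// recorded offset; otherwise nothing is started.
func plan(last *state, size int64) (offset int64, start bool) {
	if last == nil {
		return 0, true
	}
	if last.Finished && size > last.Offset {
		return last.Offset, true
	}
	return 0, false
}

func main() {
	// new file, grown file, file that is still being harvested
	fmt.Println(plan(nil, 100))
	fmt.Println(plan(&state{Offset: 362, Finished: true}, 500))
	fmt.Println(plan(&state{Offset: 362, Finished: false}, 500))

	p := &input{limit: 1}
	fmt.Println(p.acquire()) // <nil>
	fmt.Println(p.acquire()) // harvester limit reached
	p.release()
	fmt.Println(p.acquire()) // <nil> again
}
```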






// Harvester contains all methods which must be supported by each harvester
// so the registry can be used by the input
type Harvester interface {
    ID() uuid.UUID
    Run() error
    // ...
}


// Harvester contains all harvester related data
type Harvester struct {
    id     uuid.UUID
    config config
    source harvester.Source // the source being watched

    // shutdown handling
    done     chan struct{}
    stopOnce sync.Once
    stopWg   *sync.WaitGroup
    stopLock sync.Mutex

    // internal harvester state
    state  file.State
    states *file.States
    log    *Log

    // file reader pipeline
    reader          reader.Reader
    encodingFactory encoding.EncodingFactory
    encoding        encoding.Encoding

    // event/state publishing
    outletFactory OutletFactory
    publishState  func(*util.Data) bool

    onTerminate func()
}

// Run start the harvester and reads files line by line and sends events to the defined output
func (h *Harvester) Run() error {
    // This method was covered in the previous article. Most of it is omitted
    // below; only the read and send steps are kept.
    for {
        // read
        message, err := h.reader.Next()

        // ...

        // send
        if !h.sendEvent(data, forwarder) {
            return nil
        }
    }
}




// Reader is the interface that wraps the basic Next method for
// getting a new message.
// Next returns the message being read or and error. EOF is returned
// if reader will not return any new message on subsequent calls.
type Reader interface {
    Next() (Message, error)
}

// Message represents a reader event with timestamp, content and actual number
// of bytes read from input before decoding.
type Message struct {
    Ts      time.Time     // timestamp the content was read
    Content []byte        // actual content read
    Bytes   int           // total number of bytes read to generate the message
    Fields  common.MapStr // optional fields that can be added by reader
}




// filebeat/input/log/log.go
func (f *Log) Read(buf []byte) (int, error) {
    for {
        n, err := f.fs.Read(buf)  // calls the standard library's os/file.go#Read
        // ...
    }
}

// os/file.go#Read
// Read reads up to len(b) bytes from the File.
// It returns the number of bytes read and any error encountered.
// At end of file, Read returns 0, io.EOF.
func (f *File) Read(b []byte) (n int, err error) {
    if err := f.checkValid("read"); err != nil {
        return 0, err
    }
    n, e := f.read(b)
    return n, f.wrapErr("read", e)
}



// filebeat/reader/readfile/line.go
// lineReader reads lines from underlying reader, decoding the input stream
// using the configured codec. The reader keeps track of bytes consumed
// from raw input stream for every decoded line.
type LineReader struct {
    reader     io.Reader
    codec      encoding.Encoding
    bufferSize int
    maxBytes   int
    nl         []byte
    inBuffer   *streambuf.Buffer
    outBuffer  *streambuf.Buffer
    inOffset   int // input buffer read offset
    byteCount  int // number of bytes decoded from input buffer into output buffer
    decoder    transform.Transformer
}

// Next reads the next line until the new line character
func (r *LineReader) Next() ([]byte, int, error) {
    for {
        err := r.advance()
        // ...
    }
}

// Reads from the buffer until a new line character is detected
// Returns an error otherwise
func (r *LineReader) advance() error {
    // ...
    // fill inBuffer until '\n' sequence has been found in input buffer
    for idx == -1 {
        // try to read more bytes into buffer   filebeat/input/log/log.go#Read
        n, err := r.reader.Read(buf)
        // ...
    }
    // ...
}


// filebeat/reader/readfile/encode.go
// Reader produces lines by reading lines from an io.Reader
// through a decoder converting the reader it's encoding to utf-8.
type EncoderReader struct {
    reader *LineReader
}

// Next reads the next line from it's initial io.Reader
// This converts a io.Reader to a reader.reader
func (r EncoderReader) Next() (reader.Message, error) {
    c, sz, err := r.reader.Next()
    // Creating message object
    return reader.Message{
        Ts:      time.Now(),
        Content: c,
        Bytes:   sz,
    }, err
}

JSON Reader, which handles JSON-formatted data:

// filebeat/reader/json/json.go
type JSON struct {
    reader reader.Reader
    cfg    *Config
}

// Next decodes JSON and returns the filled Line object.
func (r *JSON) Next() (reader.Message, error) {
    // ...
}

StripNewline Reader, which removes the trailing newline:

// filebeat/reader/readfile/strip_newline.go
// StripNewline reader removes the last trailing newline characters from
// read lines.
type StripNewline struct {
    reader reader.Reader
}

// Next returns the next line.
func (p *StripNewline) Next() (reader.Message, error) {
    // ...
}

Timeout Reader, which handles timeouts:

// filebeat/reader/readfile/timeout.go
// TimeoutReader will signal some configurable timeout error if no
// new line can be returned in time.
type TimeoutReader struct {
    reader  reader.Reader
    timeout time.Duration
    signal  error
    running bool
    ch      chan lineMessage
}

// Next returns the next line. If no line was returned before timeout, the
// configured timeout error is returned.
// For handline timeouts a goroutine is started for reading lines from
// configured line reader. Only when underlying reader returns an error, the
// goroutine will be finished.
func (r *TimeoutReader) Next() (reader.Message, error) {
    // ...
}

Multiline Reader, which merges multiple lines into one event:

// filebeat/reader/multiline/multiline.go
// MultiLine reader combining multiple line events into one multi-line event.
// Lines to be combined are matched by some configurable predicate using
// regular expression.
// The maximum number of bytes and lines to be returned is fully configurable.
// Even if limits are reached subsequent lines are matched, until event is
// fully finished.
// Errors will force the multiline reader to return the currently active
// multiline event first and finally return the actual error on next call to Next.
type Reader struct {
    reader       reader.Reader
    pred         matcher
    flushMatcher *match.Matcher
    maxBytes     int // bytes stored in content
    maxLines     int
    separator    []byte
    last         []byte
    numLines     int
    truncated    int
    err          error // last seen error
    state        func(*Reader) (reader.Message, error)
    message      reader.Message
}

// Next returns next multi-line event.
func (mlr *Reader) Next() (reader.Message, error) {
    return mlr.state(mlr)
}


// filebeat/reader/readfile/limit.go
// Reader sets an upper limit on line length. Lines longer
// than the max configured line length will be snapped short.
type LimitReader struct {
    reader   reader.Reader
    maxBytes int
}

// Next returns the next line.
func (r *LimitReader) Next() (reader.Message, error) {
    message, err := r.reader.Next()
    if len(message.Content) > r.maxBytes {
        message.Content = message.Content[:r.maxBytes]
        message.AddFlagsWithKey("log.flags", "truncated")
    }
    return message, err
}


// Setup opens the file handler and creates the reader for the harvester
func (h *Harvester) Setup() error {
    err := h.open()
    if err != nil {
        return fmt.Errorf("Harvester setup failed. Unexpected file opening error: %s", err)
    }

    // newLogFileReader assembles the chain of Readers according to the user's configuration
    h.reader, err = h.newLogFileReader()
    if err != nil {
        if h.source != nil {
            // ...
        }
        return fmt.Errorf("Harvester setup failed. Unexpected encoding line reader error: %s", err)
    }

    return nil
}




  1. filebeat/input/log/harvester.go#Run(): h.sendEvent(data, forwarder)
  2. filebeat/input/log/harvester.go#sendEvent: forwarder.Send(data)
  3. filebeat/harvester/forwarder.go#Send: f.Outlet.OnEvent(data)
  4. filebeat/channel/util.go#OnEvent



// filebeat/channel/util.go
func (o *subOutlet) OnEvent(d *util.Data) bool {

    o.mutex.Lock()
    defer o.mutex.Unlock()
    select {
    case <-o.done:
        return false
    default:
    }

    select {
    case <-o.done:
        return false
    // write the data into the channel
    case o.ch <- d:
        select {
        case <-o.done:
            return true
        // wait for the result
        case ret := <-o.res:
            return ret
        }
    }
}

// filebeat/channel/util.go
// SubOutlet create a sub-outlet, which can be closed individually, without closing the
// underlying outlet.
func SubOutlet(out Outleter) Outleter {
    s := &subOutlet{
        done: make(chan struct{}),
        ch:   make(chan *util.Data),
        res:  make(chan bool, 1),
    }

    go func() {
        // read data from the channel and call OnEvent to send it
        for event := range s.ch {
            s.res <- out.OnEvent(event)
        }
    }()

    return s
}


func (o *outlet) OnEvent(d *util.Data) bool {
    if !o.isOpen.Load() {
        return false
    }

    event := d.GetEvent()
    if d.HasState() {
        event.Private = d.GetState()
    }

    if o.wg != nil {
        // ...
    }

    o.client.Publish(event) // jumps to libbeat/publisher/pipeline/client.go#Publish

    return o.isOpen.Load()
}


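  1. The data is written into the channel by the case o.ch <- d branch in filebeat/channel/util.go#OnEvent, after which the inner select waits for the result;
  2. Meanwhile, the goroutine started in filebeat/channel/util.go#SubOutlet reads the data from that channel and calls filebeat/channel/outlet.go#OnEvent, where o.client.Publish(event) publishes it. When that call returns, the result is written back to the subOutlet's res channel, and the harvester's call returns.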

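In the previous article, the harvester could not be shut down precisely because o.client.Publish(event) never returned. As a result, the harvester was stuck on the subOutlet's res channel, waiting indefinitely for the send result.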
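The handshake can be reproduced with plain channels. This sketch simplifies the real code: events are strings instead of *util.Data, and a hypothetical newSubOutlet constructor takes the downstream publish function. A publish function that never returns keeps OnEvent waiting on res, and only closing done releases it. That is the situation described above.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// subOutlet hands events to a forwarding goroutine over ch and waits for the
// downstream result on res, unless done is closed first.
type subOutlet struct {
	mu        sync.Mutex
	done      chan struct{}
	ch        chan string
	res       chan bool
	closeOnce sync.Once
}

func newSubOutlet(publish func(string) bool) *subOutlet {
	s := &subOutlet{
		done: make(chan struct{}),
		ch:   make(chan string),
		res:  make(chan bool, 1),
	}
	go func() {
		for ev := range s.ch {
			s.res <- publish(ev) // blocks for as long as publish blocks
		}
	}()
	return s
}

func (s *subOutlet) OnEvent(ev string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	select { // non-blocking check: already closed?
	case <-s.done:
		return false
	default:
	}
	select {
	case <-s.done:
		return false
	case s.ch <- ev:
		select {
		case <-s.done:
			return true
		case ret := <-s.res:
			return ret
		}
	}
}

// Close only closes done; the forwarding goroutine is left running here.
func (s *subOutlet) Close() { s.closeOnce.Do(func() { close(s.done) }) }

func main() {
	healthy := newSubOutlet(func(string) bool { return true })
	fmt.Println("normal publish:", healthy.OnEvent("line 1"))

	block := make(chan struct{})
	stuck := newSubOutlet(func(string) bool { <-block; return true })
	go func() {
		time.Sleep(50 * time.Millisecond)
		stuck.Close() // without this, the next OnEvent would wait forever
	}()
	fmt.Println("stuck publish, released by Close:", stuck.OnEvent("line 2"))
	fmt.Println("after Close:", stuck.OnEvent("line 3"))
}
```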






// Package pipeline combines all publisher functionality (processors, queue,
// outputs) to create instances of complete publisher pipelines, beats can
// connect to publish events to.
package pipeline

// Pipeline implementation providing all beats publisher functionality.
// The pipeline consists of clients, processors, a central queue, an output
// controller and the actual outputs.
// The queue implementing the queue.Queue interface is the most central entity
// to the pipeline, providing support for pushing, batching and pulling events.
// The pipeline adds different ACKing strategies and wait close support on top
// of the queue. For handling ACKs, the pipeline keeps track of filtered out events,
// to be ACKed to the client in correct order.
// The output controller configures a (potentially reloadable) set of load
// balanced output clients. Events will be pulled from the queue and pushed to
// the output clients using a shared work queue for the active outputs.Group.
// Processors in the pipeline are executed in the clients go-routine, before
// entering the queue. No filtering/processing will occur on the output side.
type Pipeline struct {
    beatInfo beat.Info

    logger *logp.Logger
    queue  queue.Queue
    output *outputController

    observer observer

    eventer pipelineEventer

    // wait close support
    waitCloseMode    WaitCloseMode
    waitCloseTimeout time.Duration
    waitCloser       *waitCloser

    // pipeline ack
    ackMode    pipelineACKMode
    ackActive  atomic.Bool
    ackDone    chan struct{}
    ackBuilder ackBuilder
    eventSema  *sema

    processors pipelineProcessors
}




// filebeat/channel/outlet.go
func (o *outlet) OnEvent(d *util.Data) bool {
    // ...
    o.client.Publish(event) // jumps to libbeat/publisher/pipeline/client.go#Publish
    // ...
}

// libbeat/publisher/pipeline/client.go
func (c *client) publish(e beat.Event) {
    // ...

    // if processors are defined, they run here
    if c.processors != nil {
        var err error

        event, err = c.processors.Run(event)
        publish = event != nil
        if err != nil {
            // TODO: introduce dead-letter queue?

            log.Errorf("Failed to publish event: %v", err)
        }
    }

    // ...

    var published bool
    if c.canDrop {
        published = c.producer.TryPublish(pubEvent)
    } else {
        published = c.producer.Publish(pubEvent) // queue/memqueue/produce.go
    }
    // ...
}

// libbeat/publisher/queue/memqueue/produce.go
func (p *forgetfulProducer) Publish(event publisher.Event) bool {
    return p.openState.publish(p.makeRequest(event))
}

func (st *openState) publish(req pushRequest) bool {
    select {
    // send the data into the events buffer
    case st.events <- req:
        return true
    case <-st.done:
        st.events = nil
        return false
    }
}

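The EventLoop then reads requests from the events buffer and writes them into a batchBuffer, which is backed by a slice. In total the loop holds at most queue.mem.events events (default 4096). As the newBatchBuffer(l.minEvents) call below shows, each batchBuffer is sized by minEvents (queue.mem.flush.min_events). The corresponding code follows (only the key parts are kept):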

// libbeat/publisher/queue/memqueue/eventloop.go
// create the EventLoop
func newBufferingEventLoop(b *Broker, size int, minEvents int, flushTimeout time.Duration) *bufferingEventLoop {
    l := &bufferingEventLoop{
        broker:       b,
        maxEvents:    size,
        minEvents:    minEvents,
        flushTimeout: flushTimeout,
        // use the Broker's events channel directly
        events:    b.events,
        get:       nil,
        pubCancel: b.pubCancel,
        acks:      b.acks,
    }
    l.buf = newBatchBuffer(l.minEvents)

    l.timer = time.NewTimer(flushTimeout)
    if !l.timer.Stop() {
        <-l.timer.C
    }

    return l
}

func (l *bufferingEventLoop) run() {
    var (
        broker = l.broker
        // ...
    )

    for {
        select {
        case <-broker.done:
            // ...
        case req := <-l.events: // producer pushing new event
            // ...
        // ...
        }
    }
}

func (l *bufferingEventLoop) handleInsert(req *pushRequest) {
    // insert writes the data into the batchBuffer
    if l.insert(req) {
        if l.eventCount == l.maxEvents {
            // When the queue is full, set the channel to nil so that writes block.
            // The queue is re-enabled once an ACK arrives (handleACK).
            l.events = nil // stop inserting events if upper limit is reached
        }
        // ...
    }
}
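Two channel idioms carry the producer-to-event-loop hand-off above:

- a select on the events channel and done gives a blocking Publish, and in this sketch TryPublish adds a default branch so it gives up immediately;
- setting a channel variable to nil disables a select case while the buffer is full.

The sketch below shows both, using ints as events. producer, eventLoop, step and ack are hypothetical names. The event loop is driven one non-blocking step at a time so the behavior is easy to observe.

```go
package main

import "fmt"

// producer mirrors the two publish paths: Publish blocks until the event is
// accepted or done is closed; TryPublish gives up when the queue is full.
type producer struct {
	events chan<- int
	done   <-chan struct{}
}

func (p producer) Publish(ev int) bool {
	select {
	case p.events <- ev:
		return true
	case <-p.done:
		return false
	}
}

func (p producer) TryPublish(ev int) bool {
	select {
	case p.events <- ev:
		return true
	case <-p.done:
		return false
	default:
		return false // queue full: drop instead of blocking
	}
}

// eventLoop buffers up to maxEvents events. When full it sets in to nil; a
// receive from a nil channel is never ready, so that case is switched off
// until ack restores the channel.
type eventLoop struct {
	src       chan int // the shared events channel
	in        chan int // currently active channel, nil while full
	buf       []int
	maxEvents int
}

// step runs one non-blocking iteration and reports whether an event was accepted.
func (l *eventLoop) step() bool {
	select {
	case ev := <-l.in:
		l.buf = append(l.buf, ev)
		if len(l.buf) == l.maxEvents {
			l.in = nil // stop accepting events
		}
		return true
	default:
		return false
	}
}

// ack removes n acknowledged events from the front and re-enables input.
func (l *eventLoop) ack(n int) {
	l.buf = l.buf[n:]
	l.in = l.src
}

func main() {
	events := make(chan int, 3)
	done := make(chan struct{})
	p := producer{events: events, done: done}
	for i := 1; i <= 3; i++ {
		p.Publish(i)
	}
	fmt.Println("TryPublish on a full queue:", p.TryPublish(4)) // false

	l := &eventLoop{src: events, in: events, maxEvents: 2}
	first, second, third := l.step(), l.step(), l.step()
	fmt.Println(first, second, third) // true true false
	l.ack(2)
	accepted := l.step()
	fmt.Println(accepted, l.buf) // true [3]
}
```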


// libbeat/publisher/pipeline/consumer.go
func (c *eventConsumer) loop(consumer queue.Consumer) {
    log := c.logger

    log.Debug("start pipeline event consumer")

    var (
        out    workQueue
        batch  *Batch
        paused = true
    )

    for {
        // ...
        select {
        case <-c.done:
            log.Debug("stop pipeline event consumer")
            // ...
        case sig := <-c.sig:
            // ...
        // write the batch into the workQueue
        case out <- batch:
            batch = nil
        }
    }
}


// libbeat/publisher/pipeline/output.go
func (w *netClientWorker) run() {
    for !w.closed.Load() {
        reconnectAttempts := 0

        // ...

        // read from the workQueue
        // send loop
        for batch := range w.qu {
            if w.closed.Load() {
                if batch != nil {
                    // ...
                }
                // ...
            }

            // send to the output
            err := w.client.Publish(batch) // libbeat/outputs/backoff.go
            if err != nil {
                logp.Err("Failed to publish events: %v", err)
                // on error return to connect loop
                break
            }
        }
    }
}

// libbeat/outputs/backoff.go
func (b *backoffClient) Publish(batch publisher.Batch) error {
    err := b.client.Publish(batch) // libbeat/outputs/elasticsearch/client.go
    if err != nil {
        // ...
    }
    backoff.WaitOnError(b.backoff, err)
    return err
}

// libbeat/outputs/elasticsearch/client.go
func (client *Client) Publish(batch publisher.Batch) error {
    events := batch.Events()
    rest, err := client.publishEvents(events)
    if len(rest) == 0 {
        batch.ACK() // libbeat/publisher/pipeline/batch.go
    } else {
        // ...
    }
    return err
}




// the memory queue is called Broker in the code
type Broker struct {
    done chan struct{}

    logger logger

    bufSize int
    // buf         brokerBuffer
    // minEvents   int
    // idleTimeout time.Duration

    // api channels
    events    chan pushRequest
    requests  chan getRequest
    pubCancel chan producerCancelRequest

    // internal channels
    acks          chan int
    scheduledACKs chan chanList

    eventer queue.Eventer

    // wait group for worker shutdown
    wg          sync.WaitGroup
    waitOnClose bool
}

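The events buffer introduced earlier is the events field here. Another important function in this file is NewBroker. It creates a Broker and defines an eventLoop interface, which has two implementations: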

  • directEventLoop (queue.mem.flush.min_events = 1)
  • bufferingEventLoop (queue.mem.flush.min_events > 1)


// libbeat/publisher/queue/memqueue/eventloop.go
// bufferingEventLoop implements the broker main event loop.
// Events in the buffer are forwarded to consumers only if the buffer is full or on flush timeout.
type bufferingEventLoop struct {
    broker *Broker

    buf        *batchBuffer
    flushList  flushList
    eventCount int

    minEvents    int
    maxEvents    int
    flushTimeout time.Duration

    // active broker API channels
    events    chan pushRequest
    get       chan getRequest
    pubCancel chan producerCancelRequest

    // ack handling
    acks        chan int      // ackloop -> eventloop : total number of events ACKed by outputs
    schedACKS   chan chanList // eventloop -> ackloop : active list of batches to be acked
    pendingACKs chanList      // ordered list of active batches to be send to the ackloop
    ackSeq      uint          // ack batch sequence number to validate ordering

    // buffer flush timer state
    timer *time.Timer
    idleC <-chan time.Time
}






// filebeat/registrar/registrar.go

func (r *Registrar) Run() {
    logp.Debug("registrar", "Starting Registrar")
    // Writes registry on shutdown
    defer func() {
        // ...
    }()

    var (
        timer  *time.Timer
        flushC <-chan time.Time
    )

    for {
        select {
        case <-r.done:
            logp.Info("Ending Registrar")
            // ...
        case <-flushC:
            flushC = nil
            // ...
        case states := <-r.Channel:         // the registrar is driven through this channel
            // after receiving confirmations from the channel, write the data to the registry file
            // ...
            if r.flushTimeout <= 0 {
                // ...
            } else if flushC == nil {
                timer = time.NewTimer(r.flushTimeout)
                flushC = timer.C
            }
        }
    }
}



// filebeat/beater/filebeat.go
// Run allows the beater to be run as a beat.
func (fb *Filebeat) Run(b *beat.Beat) error {
    // ...

    // Make sure all events that were published in
    // ...
    registrarChannel := newRegistrarLogger(registrar)

    // Register the callback that is invoked once events have been published successfully
    err = b.Publisher.SetACKHandler(beat.PipelineACKHandler{
        ACKEvents: newEventACKer(finishedLogger, registrarChannel).ackEvents,
    })
    if err != nil {
        logp.Err("Failed to install the registry with the publisher pipeline: %v", err)
        return err
    }
    // ...
}

// filebeat/beater/filebeat.go
func newRegistrarLogger(reg *registrar.Registrar) *registrarLogger {
    return &registrarLogger{
        done: make(chan struct{}),
        ch:   reg.Channel,
    }
}

// filebeat/beater/channels.go
type registrarLogger struct {
    done chan struct{}
    ch   chan<- []file.State
}

// filebeat/beater/channels.go
func (l *registrarLogger) Published(states []file.State) {
    select {
    case <-l.done:
        // set ch to nil, so no more events will be send after channel close signal
        // has been processed the first time.
        // Note: nil channels will block, so only done channel will be actively
        //       report 'closed'.
        l.ch = nil
    case l.ch <- states:
    }
}
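
The comment in Published relies on a Go rule: a send on a nil channel never proceeds. Once l.ch has been set to nil, only the done case can be selected. The following is a minimal sketch of the same pattern. The logger type is hypothetical, and string payloads stand in for []file.State. Unlike the original, this Published returns a bool so the behavior can be observed.

```go
package main

// logger is a hypothetical stand-in for registrarLogger.
type logger struct {
    done chan struct{}
    ch   chan<- []string
}

// Published forwards states to ch unless done has been closed. The first time
// done is observed, ch is set to nil; because a send on a nil channel blocks
// forever, every later call takes the done branch. The result reports whether
// the states were forwarded.
func (l *logger) Published(states []string) bool {
    select {
    case <-l.done:
        l.ch = nil
        return false
    case l.ch <- states:
        return true
    }
}
```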

// filebeat/beater/acker.go
type statefulLogger interface {
    Published(states []file.State)
}

// filebeat/beater/acker.go
func newEventACKer(stateless statelessLogger, stateful statefulLogger) *eventACKer {
    return &eventACKer{stateless: stateless, stateful: stateful, log: logp.NewLogger("acker")}
}

// The registered callback
func (a *eventACKer) ackEvents(data []interface{}) {
    stateless := 0
    states := make([]file.State, 0, len(data))
    for _, datum := range data {
        if datum == nil {
            stateless++
            continue
        }

        st, ok := datum.(file.State)
        if !ok {
            stateless++
            continue
        }

        states = append(states, st)
    }

    if len(states) > 0 {
        a.log.Debugw("stateful ack", "count", len(states))
        a.stateful.Published(states) // filebeat/beater/channels.go: func (l *registrarLogger) Published(states []file.State)
    }

    if stateless > 0 {
        a.log.Debugw("stateless ack", "count", stateless)
        a.stateless.Published(stateless)
    }
}
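
The loop in ackEvents sorts the data passed to the callback into two groups. File states are forwarded to the registrar; anything else, including nil, is only counted. The following is a minimal sketch of that split. It assumes a hypothetical State struct in place of file.State and a hypothetical splitACKs helper.

```go
package main

// State is a hypothetical stand-in for file.State.
type State struct {
    Source string
    Offset int64
}

// splitACKs mirrors the loop in ackEvents: entries holding a State are
// collected for the registrar, everything else is only counted.
func splitACKs(data []interface{}) (states []State, stateless int) {
    states = make([]State, 0, len(data))
    for _, datum := range data {
        st, ok := datum.(State) // a type assertion on a nil interface yields ok == false
        if !ok {
            stateless++
            continue
        }
        states = append(states, st)
    }
    return states, stateless
}
```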






// libbeat/publisher/queue/memqueue/broker.go
// NewBroker creates a new broker based in-memory queue holding up to sz number of events.
// If waitOnClose is set to true, the broker will block on Close, until all internal
// workers handling incoming messages and ACKs have been shut down.
func NewBroker(
    logger logger,
    settings Settings,
) *Broker {
    // ...

    var eventLoop interface {
        // ...
        processACK(chanList, int)
    }

    // Create the EventLoop
    if minEvents > 1 {
        eventLoop = newBufferingEventLoop(b, sz, minEvents, flushTimeout)
    } else {
        eventLoop = newDirectEventLoop(b, sz)
    }

    b.bufSize = sz
    // Create the ACK loop
    ack := newACKLoop(b, eventLoop.processACK)

    // ...
    go func() {
        defer b.wg.Done()
        // ...
    }()
    // The ACK loop is started in this goroutine
    go func() {
        defer b.wg.Done()
        // ...
    }()

    // ...
}

// libbeat/publisher/queue/memqueue/ackloop.go
func newACKLoop(b *Broker, processACK func(chanList, int)) *ackLoop {
    l := &ackLoop{broker: b}
    l.processACK = processACK
    return l
}


// libbeat/publisher/queue/memqueue/ackloop.go
// ackLoop implements the brokers asynchronous ACK worker.
// Multiple concurrent ACKs from consecutive published batches will be batched up by the
// worker, to reduce the number of signals to return to the producer and the
// broker event loop.
// Producer ACKs are run in the ackLoop go-routine.
type ackLoop struct {
    broker *Broker
    sig    chan batchAckMsg // channel on which batch ACK signals arrive
    lst    chanList

    totalACK   uint64
    totalSched uint64

    batchesSched uint64
    batchesACKed uint64

    processACK func(chanList, int)
}

func (l *ackLoop) run() {
    // ... (local acks channel and acked counter are declared here)
    for {
        select {
        case <-l.broker.done:
            // ...

        case acks <- acked:
            acks, acked = nil, 0

        case lst := <-l.broker.scheduledACKs:
            count, events := lst.count()
            // ...
            l.batchesSched += uint64(count)
            l.totalSched += uint64(events)

        // Wait here for the signal that a batch has been sent and acknowledged
        case <-l.sig:
            acked += l.handleBatchSig()
            if acked > 0 {
                acks = l.broker.acks
            }
        }
    }
}
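
In run, the acks variable stays nil until handleBatchSig reports a positive count. While it is nil, the case acks <- acked is disabled. After a successful send, acks is reset to nil. This lets the loop keep accepting new signals while a count is pending, instead of blocking on the send. The following is a minimal sketch of that idiom. ackAggregator and its channels are hypothetical, and the elided declarations of acks and acked are assumed to start as nil and 0.

```go
package main

// ackAggregator is a hypothetical simplification of ackLoop.run: it sums ACK
// counts arriving on sig and forwards the running total on out, enabling the
// send case only while there is something to report.
func ackAggregator(sig <-chan int, out chan<- int, done <-chan struct{}) {
    var (
        acked int
        acks  chan<- int // nil => the send case below is disabled
    )
    for {
        select {
        case <-done:
            return
        case acks <- acked:
            acks, acked = nil, 0
        case n := <-sig:
            acked += n
            if acked > 0 {
                acks = out
            }
        }
    }
}
```

Because nobody is receiving on out while the sender is still pushing signals, the two counts below are merged into a single report.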


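The ACK loop therefore waits on l.sig, which raises two questions:

  1. Where does the signal come from?
  2. Once the signal arrives, how is it passed on to the callback registered earlier?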


// libbeat/outputs/elasticsearch/client.go: the final send path from the earlier Elasticsearch section
func (client *Client) Publish(batch publisher.Batch) error {
    events := batch.Events()
    rest, err := client.publishEvents(events)
    if len(rest) == 0 {
        batch.ACK() // Key point: this ACK leads to libbeat/publisher/pipeline/batch.go
    } else {
        // ...
    }
    return err
}

// libbeat/publisher/pipeline/batch.go
func (b *Batch) ACK() {
    b.ctx.observer.outBatchACKed(len(b.events)) // monitoring only: libbeat/publisher/pipeline/monitoring.go: func (o *metricsObserver) outBatchACKed(int) {}
    b.original.ACK() // Key point: libbeat/publisher/queue/memqueue/consume.go
}

// libbeat/publisher/queue/memqueue/consume.go
func (b *batch) ACK() {
    if b.state != batchActive {
        switch b.state {
        case batchACK:
            panic("Can not acknowledge already acknowledged batch")
        default:
            panic("inactive batch")
        }
    }

    b.report() // Key point
}

// libbeat/publisher/queue/memqueue/consume.go
func (b *batch) report() {
    b.ack.ch <- batchAckMsg{} // this is where the ACK signal is finally sent
}
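
batch.ACK refuses to acknowledge a batch that is not active, then signals the ACK loop through a channel. The following is a minimal sketch of that guard. It assumes the state is switched to batchACK on the first call; the excerpt does not show where that happens. The types here are simplified stand-ins, and batchInactive is a hypothetical extra state used to exercise the default branch.

```go
package main

type batchState uint8

const (
    batchActive   batchState = iota
    batchACK                 // already acknowledged
    batchInactive            // hypothetical: any other non-active state
)

type ackMsg struct{}

// batch is a hypothetical simplification of memqueue's batch.
type batch struct {
    state batchState
    ch    chan ackMsg // read by the ACK loop
}

// ACK may succeed only once; afterwards (or for a non-active batch) it panics.
func (b *batch) ACK() {
    if b.state != batchActive {
        switch b.state {
        case batchACK:
            panic("Can not acknowledge already acknowledged batch")
        default:
            panic("inactive batch")
        }
    }
    b.state = batchACK // assumption: mark the batch as acknowledged
    b.ch <- ackMsg{}   // the equivalent of report()
}
```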


// libbeat/publisher/queue/memqueue/ackloop.go
// handleBatchSig collects and handles a batch ACK/Cancel signal. handleBatchSig
// is run by the ackLoop.
func (l *ackLoop) handleBatchSig() int {
    // ... (lst and count are computed here)

    if count > 0 {
        if e := l.broker.eventer; e != nil {
            // ...
        }

        // This calls processACK on the EventLoop created earlier. We use the
        // bufferingEventLoop, so this ends up in (*bufferingEventLoop).processACK
        // report acks to waiting clients
        l.processACK(lst, count) // libbeat/publisher/queue/memqueue/eventloop.go#(*bufferingEventLoop)processACK
    }

    // ...
}


// libbeat/publisher/queue/memqueue/eventloop.go
func (l *bufferingEventLoop) processACK(lst chanList, N int) {
    // ...
    for !lst.empty() {
        // ...
        // Key point: this calls ackEvents to deliver the acknowledgment
        st.state.cb(int(count)) // libbeat/publisher/pipeline/acker.go (a *eventDataACK) ackEvents(n int)
        // ...
    }
}


From processACK, the ACK count travels down this chain of calls:

  1. libbeat/publisher/queue/memqueue/eventloop.go: (l *bufferingEventLoop) processACK --> st.state.cb(int(count))
  2. libbeat/publisher/pipeline/acker.go: func (a *eventDataACK) ackEvents(n int) { a.acker.ackEvents(n) }
  3. libbeat/publisher/pipeline/acker.go: func (a *boundGapCountACK) ackEvents(n int) { a.acker.ackEvents(n) }
  4. libbeat/publisher/pipeline/acker.go: func (a *gapCountACK) ackEvents(n int) {}
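
Every step in this chain is a thin wrapper that forwards the count to the acker it wraps. The following is a minimal sketch of that delegation. eventAcker, forwardACK and sinkACK are hypothetical types that only record the call order; the real ackers also do their own bookkeeping.

```go
package main

// eventAcker is a hypothetical interface for the ackEvents(n int) method that
// each acker in the chain implements.
type eventAcker interface {
    ackEvents(n int)
}

// forwardACK plays the role of eventDataACK / boundGapCountACK: it records
// that it was called and delegates to the next acker.
type forwardACK struct {
    name  string
    trace *[]string
    next  eventAcker
}

func (a *forwardACK) ackEvents(n int) {
    *a.trace = append(*a.trace, a.name)
    a.next.ackEvents(n)
}

// sinkACK plays the role of gapCountACK at the end of the chain; here it only
// accumulates the count it receives.
type sinkACK struct {
    trace *[]string
    total int
}

func (s *sinkACK) ackEvents(n int) {
    *s.trace = append(*s.trace, "gapCountACK")
    s.total += n
}
```

The queue only holds a func(int). Storing a method value of the outermost acker is enough to reach the end of the chain, which is how st.state.cb can be used.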


// libbeat/publisher/pipeline/acker.go
func (a *gapCountACK) ackEvents(n int) {
    select {
    case <-a.pipeline.ackDone: // pipeline is closing down -> ignore event
        a.acks = nil
    // the ACK count n is written to a.acks
    case a.acks <- n: // send ack event to worker
    }
}

// gapCountACK returns event ACKs to the producer, taking account for dropped events.
// Events being dropped by processors will always be ACKed with the last batch ACKed
// by the broker. This way clients waiting for ACKs can expect all processed
// events being always ACKed.
type gapCountACK struct {
    pipeline *Pipeline

    fn func(total int, acked int)

    done chan struct{}

    drop chan struct{}
    acks chan int

    events atomic.Uint32
    lst    gapList
}


// libbeat/publisher/pipeline/acker.go
func (a *gapCountACK) ackLoop() {
    acks, drop := a.acks, a.drop
    closing := false
    for {
        select {
        case <-a.done:
            closing = true
            a.done = nil
            if a.events.Load() == 0 {
                // stop worker, if all events accounted for have been ACKed.
                // If new events are added after this acker won't handle them, which may
                // result in duplicates
                return
            }

        case <-a.pipeline.ackDone:
            // ...

        case n := <-acks:
            // Key point: once n has been read from acks, handleACK processes it
            empty := a.handleACK(n)
            if empty && closing && a.events.Load() == 0 {
                // stop worker, if and only if all events accounted for have been ACKed
                return
            }

        case <-drop:
            // TODO: accumulate multiple drop events + flush count with timer
            a.fn(1, 0)
        }
    }
}
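
The shutdown rule in ackLoop keeps the worker alive after done has been closed until every counted event has been ACKed. The following is a minimal sketch of just that rule. It assumes a hypothetical pendingTracker type with an atomic counter and leaves out gap tracking and dropped events.

```go
package main

import "sync/atomic"

// pendingTracker is a hypothetical reduction of gapCountACK's shutdown logic.
type pendingTracker struct {
    events int64 // published but not yet ACKed; accessed atomically
    acks   chan int
    done   chan struct{}
}

func (t *pendingTracker) add(n int) { atomic.AddInt64(&t.events, int64(n)) }

// loop exits once done has been closed AND the outstanding count is zero.
func (t *pendingTracker) loop() {
    done, closing := t.done, false
    for {
        select {
        case <-done:
            closing = true
            done = nil // a closed channel is always ready; disable this case
            if atomic.LoadInt64(&t.events) == 0 {
                return
            }
        case n := <-t.acks:
            remaining := atomic.AddInt64(&t.events, -int64(n))
            if closing && remaining == 0 {
                return
            }
        }
    }
}
```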

func (a *gapCountACK) handleACK(n int) bool {
    // ... (total, acked and emptyLst are computed here)
    a.fn(total, acked) // libbeat/publisher/pipeline/acker.go: func (a *boundGapCountACK) onACK(total, acked int)
    return emptyLst
}


handleACK then passes the result back up through a second chain of callbacks:

  1. libbeat/publisher/pipeline/acker.go: handleACK --> a.fn(total, acked)
  2. libbeat/publisher/pipeline/acker.go: func (a *boundGapCountACK) onACK(total, acked int) --> a.fn(total, acked)
  3. libbeat/publisher/pipeline/acker.go: func (a *eventDataACK) onACK(total, acked int) --> a.fn(data, acked)
  4. libbeat/publisher/pipeline/pipeline_ack.go: func (p *pipelineEventCB) onEvents(data []interface{}, acked int)


// libbeat/publisher/pipeline/pipeline_ack.go
// pipelineEventCB internally handles active ACKs in the pipeline.
// It receives ACK events from the queue and the individual clients.
// Once the queue returns an ACK to the pipelineEventCB, the worker loop will collect
// events from all clients having published events in the last batch of events
// being ACKed.
// the PipelineACKHandler will be notified, once all events being ACKed
// (including dropped events) have been collected. Only one ACK-event is handled
// at a time. The pipeline global and clients ACK handler will be blocked for the time
// an ACK event is being processed.
type pipelineEventCB struct {
    done chan struct{}

    acks chan int

    // This field is the key one: ACK messages are written to this channel, read back
    // in worker, and eventually delivered to the Registrar's channel
    events        chan eventsDataMsg
    droppedEvents chan eventsDataMsg

    mode    pipelineACKMode
    handler beat.PipelineACKHandler
}


// reportEvents sends a batch of ACKed events to the ACKer.
// The events array contains send and dropped events. The `acked` counters
// indicates the total number of events acked by the pipeline.
// That is, the number of dropped events is given by `len(events) - acked`.
// A client can report events with acked=0, iff the client has no waiting events
// in the pipeline (ACK ordering requirements)
// Note: the call blocks, until the ACK handler has collected all active events
//       from all clients. This ensure an ACK event being fully 'captured'
//       by the pipeline, before receiving/processing another ACK event.
//       In the meantime the queue has the chance of batching-up more ACK events,
//       such that only one ACK event is being reported to the pipeline handler
func (p *pipelineEventCB) onEvents(data []interface{}, acked int) {
    p.pushMsg(eventsDataMsg{data: data, total: len(data), acked: acked})
}

func (p *pipelineEventCB) pushMsg(msg eventsDataMsg) {
    if msg.acked == 0 {
        p.droppedEvents <- msg
    } else {
        msg.sig = make(chan struct{})
        p.events <- msg // read back by collect in (p *pipelineEventCB) worker(), then passed to reportEventsData
        // ...
    }
}
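
The comment on onEvents says the call blocks until the ACK handler has collected the events. The excerpt does not show how pushMsg waits. The following is only a sketch of one common way to build such a handshake. It assumes the worker closes msg.sig once it has processed the message, as the msg.sig != nil check in worker suggests. msg, push and worker are hypothetical names.

```go
package main

// msg is a hypothetical analogue of eventsDataMsg.
type msg struct {
    data []string
    sig  chan struct{}
}

// push hands data to the worker and blocks until the worker closes sig,
// i.e. until the message has been fully processed.
func push(events chan<- msg, data []string) {
    m := msg{data: data, sig: make(chan struct{})}
    events <- m
    <-m.sig
}

// worker processes each message with handle and only then releases the
// producer by closing sig.
func worker(events <-chan msg, handle func([]string)) {
    for m := range events {
        handle(m.data)
        close(m.sig)
    }
}
```

Closing sig happens after handle returns, so once push returns the producer can rely on the handler having seen its data.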


func (p *pipelineEventCB) worker() {
    defer close(p.acks)
    defer close(p.events)
    defer close(p.droppedEvents)

    for {
        select {
        case count := <-p.acks:
            // messages are read inside collect
            exit := p.collect(count)
            if exit {
                return
            }

            // short circuit dropped events, but have client block until all events
            // have been processed by pipeline ack handler
        case msg := <-p.droppedEvents:
            p.reportEventsData(msg.data, msg.total)
            if msg.sig != nil {
                // ...
            }

        case <-p.done:
            // ...
        }
    }
}

func (p *pipelineEventCB) collect(count int) (exit bool) {
    // ...
    for acked < count {
        var msg eventsDataMsg
        select {
        // messages are read here
        case msg = <-p.events:
        case msg = <-p.droppedEvents:
        case <-p.done:
            exit = true
            return
        }
        // ...
    }

    p.reportEventsData(data, total)
    // ...
}
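
collect keeps reading from both channels until enough ACKs have accumulated, then reports everything in a single reportEventsData call. The following is a minimal sketch of that accumulation. It assumes that each message's acked count is added up until it reaches count; the excerpt elides this part. eventsMsg and collect here are hypothetical.

```go
package main

// eventsMsg is a hypothetical analogue of eventsDataMsg.
type eventsMsg struct {
    data  []string
    acked int
}

// collect gathers messages from events and dropped until their acked counts
// add up to count, then returns all collected data at once. exit is true if
// done was closed before enough ACKs arrived.
func collect(count int, events, dropped <-chan eventsMsg, done <-chan struct{}) (data []string, exit bool) {
    acked := 0
    for acked < count {
        var m eventsMsg
        select {
        case m = <-events:
        case m = <-dropped:
        case <-done:
            return data, true
        }
        data = append(data, m.data...)
        acked += m.acked
    }
    return data, false
}
```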

func (p *pipelineEventCB) reportEventsData(data []interface{}, total int) {
    // report ACK back to the beat
    switch p.mode {
    case countACKMode:
        // ...
    case eventsACKMode:
        // This calls the ACKEvents callback registered earlier in Filebeat
        p.handler.ACKEvents(data) // filebeat/beater/acker.go: func (a *eventACKer) ackEvents(data []interface{})
    case lastEventsACKMode:
        // ...
    }
}




