This article presents a brief analysis of the Filebeat code base. It serves as a companion to my earlier article on why Filebeat can prevent files from being deleted, but it can also be read on its own as an introduction to Filebeat's code logic. Note that this is not a comprehensive, in-depth dissection of the Filebeat source; it focuses on how the core data flows under a typical configuration, aiming to clarify the path from collection through internal hand-offs to final delivery, rather than explaining every code detail. The main topics are:
- Creation and initialization of the Filebeat instance;
- File traversal and collection, including the core concepts of Crawler, Input, Harvester, and Reader;
- The data sending logic, i.e. the Publisher/Pipeline;
- Filebeat's ACK mechanism.
This article also contains a lot of code. To keep the code excerpts short and to highlight the essential logic, most snippets have been trimmed, sometimes with an explicit note and sometimes without. If you want the complete code, follow the instructions below to set up your own source environment and read along. Naturally, this article assumes you are already familiar with the basic usage of Filebeat.
Preparing the Source Code
Filebeat is a member of Elastic's Beats family, and all of the Beats are archived in a single source repository, so you get the source simply by cloning the beats repo. One caveat: early Go dependency management used govendor, which requires the code to live under the src directory of GOPATH, while Go 1.11 introduced Go modules as the new dependency mechanism, officially enabled as the default in 1.12 (users can still control this via the GO111MODULE environment variable). This transition affects how you set up a Filebeat source environment: check whether the Beats branch you want to read contains a go.mod file. If it does, that branch uses Go modules; otherwise it still uses the older govendor. With govendor you must place the code under ${GOPATH}/src/github.com/elastic and set GO111MODULE to auto or off; with Go modules there is no such requirement. As for the Go version, anything no older than what the project officially requires is fine.
This article is based on the 6.8 branch (6.8.15). That version still uses the old govendor for dependency management, so set up the source environment with the following commands:
mkdir -p ${GOPATH}/src/github.com/elastic
git clone https://github.com/elastic/beats ${GOPATH}/src/github.com/elastic/beats
cd ${GOPATH}/src/github.com/elastic/beats
git checkout 6.8
If you cannot access GitHub freely, cloning the beats repo can be very slow (mainly because the .git directory is huge), so I packaged my copy of the code and uploaded it to Baidu Netdisk; if you really cannot pull it from GitHub, download mine instead (see the download page, password: 6bro). I provide two versions, one with the git history and one without; either is fine for reading the code, and the one without git is much smaller.
My Go version is 1.15.7 and my editor is VS Code. Once the code is downloaded, run main.go in the filebeat directory at the repository root; if it runs, the environment is ready. Note: if it complains that the configuration file filebeat.yml cannot be found, the environment is still fine; it probably just means the directory you started Filebeat from has no such file, and you can point to one with the -c <config file> command-line flag. Also, by default Filebeat only logs to a file; if you want output on the terminal while debugging, add the -e flag. In VS Code, simply edit .vscode/launch.json:
{
    // Use IntelliSense to learn about possible attributes.
    // Hover to view descriptions of existing attributes.
    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Launch file",
            "type": "go",
            "request": "launch",
            "mode": "debug",
            "program": "${file}",
            "args": ["-e"]
        }
    ]
}
filebeat and libbeat
Anyone familiar with the Beats knows they share a core module called libbeat, which implements much of the foundation: instance initialization, common configuration, the queue, outputs, logging, and so on. An individual Beat usually only needs to implement the data collection itself and hand the data to libbeat as events over a channel, which greatly simplifies Beat development. Below is the official diagram of the relationship between a Beat and libbeat:
The event mentioned above is defined in libbeat/beat/event.go:
// Event is the common event format shared by all beats.
// Every event must have a timestamp and provide encodable Fields in `Fields`.
// The `Meta`-fields can be used to pass additional meta-data to the outputs.
// Output can optionally publish a subset of Meta, or ignore Meta.
type Event struct {
    Timestamp time.Time
    Meta      common.MapStr
    Fields    common.MapStr
    Private   interface{} // for beats private use
}
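For intuition, here is a minimal, hypothetical sketch (not taken from the Filebeat code) of what a filled-in log event might look like; the field values are invented purely for illustration:

package main

import (
    "time"

    "github.com/elastic/beats/libbeat/beat"
    "github.com/elastic/beats/libbeat/common"
)

func main() {
    // A hypothetical log event: the raw line typically ends up in
    // Fields["message"], and Private would carry the file state that the
    // ACK machinery later hands back to the Registrar.
    event := beat.Event{
        Timestamp: time.Now(),
        Fields: common.MapStr{
            "message": "2021-02-28 21:28:55 INFO something happened",
            "source":  "/var/log/test-1.log",
        },
    }
    _ = event
}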
As one of the Beats, Filebeat follows the same pattern: it implements file collection and sends the data as events to libbeat's Publisher, which handles everything from there. So understanding Filebeat's complete data flow involves both the filebeat and the libbeat code, located in the filebeat and libbeat directories at the repository root. To make things easier to follow, here is a simplified data-flow diagram first:
The main responsibilities of each part:
- Filebeat instance: creation and initialization of the Filebeat instance;
- Crawler: iterates over the inputs in the configuration, creating and initializing Input instances;
- Input: walks the configured directories and files, creating and initializing a Harvester for each file that needs collecting;
- Harvester: reads the file content and sends it as events to the backing queue (this article uses the most common setup, the memory queue, as its example);
- Queue/Spooler: Filebeat has two internal queues, the memory queue (Broker) and the file queue (Spooler); in the code the file queue is called the spooler, but in most places "spooler" is used as a synonym for the queue in general rather than for the file queue specifically;
- Publisher: the sending logic in libbeat; in the code the whole sending flow is called the pipeline;
- Registrar: records the collected files and positions (offsets); together with the ACK mechanism it delivers Filebeat's promised at-least-once guarantee.
Each of these modules is introduced below.
Instance Initialization && Registrar
The first step is initializing the Filebeat instance. libbeat defines a Beater interface; any Beat that implements it can reuse the many foundation features libbeat provides. The interface is defined as follows:
// libbeat/beat/beat.go

// Beater is the interface that must be implemented by every Beat. A Beater
// provides the main Run-loop and a Stop method to break the Run-loop.
// Instantiation and Configuration is normally provided by a Beat-`Creator`.
//
// Once the beat is fully configured, the Run() method is invoked. The
// Run()-method implements the beat its run-loop. Once the Run()-method returns,
// the beat shuts down.
//
// The Stop() method is invoked the first time (and only the first time) a
// shutdown signal is received. The Stop()-method normally will stop the Run()-loop,
// such that the beat can gracefully shutdown.
type Beater interface {
    // The main event loop. This method should block until signalled to stop by an
    // invocation of the Stop() method.
    Run(b *Beat) error

    // Stop is invoked to signal that the Run method should finish its execution.
    // It will be invoked at most once.
    Stop()
}

// Beat contains the basic beat data and the publisher client used to publish
// events.
type Beat struct {
    Info      Info     // beat metadata.
    Publisher Pipeline // Publisher pipeline

    SetupMLCallback SetupMLCallback // setup callback for ML job configs
    InSetupCmd      bool            // this is set to true when the `setup` command is called

    OverwritePipelinesCallback OverwritePipelinesCallback // ingest pipeline loader callback

    // XXX: remove Config from public interface.
    // It's currently used by filebeat modules to setup the Ingest Node
    // pipeline and ML jobs.
    Config *BeatConfig // Common Beat configuration data.

    BeatConfig *common.Config // The beat's own configuration section

    Fields []byte // Data from fields.yml

    ConfigManager management.ConfigManager // config manager
}
Filebeat implements this interface in filebeat/beater/filebeat.go. Let's see how execution flows from the main function (filebeat/main.go) into filebeat.go:
// filebeat/main.go

// The basic model of execution:
// - input: finds files in paths/globs to harvest, starts harvesters
// - harvester: reads a file, sends events to the spooler
// - spooler: buffers events until ready to flush to the publisher
// - publisher: writes to the network, notifies registrar
// - registrar: records positions of files read
// Finally, input uses the registrar information, on restart, to
// determine where in each file to restart a harvester.
func main() {
    if err := cmd.RootCmd.Execute(); err != nil {
        os.Exit(1)
    }
}
The comment above main summarizes Filebeat's model of execution, and it is where the earlier diagram comes from. The body of main is trivial; the key call chain is:
- filebeat/main.go#main
- filebeat/cmd/root.go#init: RootCmd = cmd.GenRootCmdWithRunFlags(Name, "", beater.New, runFlags)
- libbeat/cmd/root.go#GenRootCmdWithSettings: rootCmd.RunCmd = genRunCmd(settings, beatCreator, runFlags)
- libbeat/cmd/run.go#genRunCmd: err := instance.Run(settings, beatCreator)
- libbeat/cmd/instance/beat.go#Run: return b.launch(settings, bt)
- libbeat/cmd/instance/beat.go#launch: return beater.Run(&b.Beat)
- filebeat/beater/filebeat.go#Run
This part of the flow lives mostly in libbeat and is fairly generic setup: it constructs the Beater instance (here, the Filebeat instance) and eventually enters the Run method of filebeat.go. Run performs two important operations: creating and starting the registrar and the crawler.
Filebeat's at-least-once collection is implemented by the Registrar module. The registry file records each collected file and its last collected position, and is only updated after confirmation that the data was sent successfully. Its core structure:
// filebeat/registrar/registrar.go
type Registrar struct {
    Channel      chan []file.State
    out          successLogger
    done         chan struct{}
    registryFile string      // Path to the Registry File
    fileMode     os.FileMode // Permissions to apply on the Registry File
    wg           sync.WaitGroup

    states               *file.States // Map with all file paths inside and the corresponding state
    gcRequired           bool         // gcRequired is set if registry state needs to be gc'ed before the next write
    gcEnabled            bool         // gcEnabled indicates the registry contains some state that can be gc'ed in the future
    flushTimeout         time.Duration
    bufferedStateUpdates int
}
On every Filebeat restart, the registrar calls func (r *Registrar) loadStates() during startup to read the registry file and compare it against the files to be collected (func (p *Input) loadStates(states []file.State)), determining which files have already been collected and which have new data to pick up from where the last run stopped.
The callback confirming successful delivery (implemented via a channel) is registered in the Run method (this is covered in detail later):
err = b.Publisher.SetACKHandler(beat.PipelineACKHandler{
    ACKEvents: newEventACKer(finishedLogger, registrarChannel).ackEvents,
})
The Registrar's record file defaults to <Filebeat install dir>/data/registry. Sample content (formatted for readability):
[{
    "source": "/Users/allan/Desktop/temp/test-logs/test-1.log",
    "offset": 362,
    "timestamp": "2021-02-28T21:28:55.435218+08:00",
    "ttl": -1,
    "type": "log",
    "meta": null,
    "FileStateOS": {
        "inode": 8712351738,
        "device": 16777225
    }
}, {
    "source": "/Users/allan/Desktop/temp/test-logs/test-2.log",
    "offset": 362,
    "timestamp": "2021-02-28T21:28:55.24922+08:00",
    "ttl": -1,
    "type": "log",
    "meta": null,
    "FileStateOS": {
        "inode": 8712603538,
        "device": 16777225
    }
}]
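Since the registry is plain JSON, you can decode it yourself for inspection. Below is a minimal standalone sketch under that assumption; the State struct here is a hypothetical subset of filebeat's file.State, limited to the fields visible above:

package main

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
)

// State mirrors just enough of filebeat's file.State to read the registry.
type State struct {
    Source string `json:"source"`
    Offset int64  `json:"offset"`
    Type   string `json:"type"`
}

func main() {
    raw, err := ioutil.ReadFile("data/registry")
    if err != nil {
        panic(err)
    }
    var states []State
    if err := json.Unmarshal(raw, &states); err != nil {
        panic(err)
    }
    for _, s := range states {
        fmt.Printf("%s: collected up to offset %d\n", s.Source, s.Offset)
    }
}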
Additionally, to guarantee that no data is lost, the modules are started during instance initialization in this order: registrar --> publisher --> spooler --> crawler/input. When the instance stops, the modules are stopped in exactly the reverse order. Next up: the Crawler and the Input.
Input and File Traversal (Crawler && Input)
Crawler
As we know, Filebeat collects data according to the inputs defined in the configuration file; the Crawler's job is to parse them and create and start each Input. The core Crawler code is in filebeat/crawler/crawler.go.
Its core structure:
type Crawler struct {
    inputs          map[uint64]*input.Runner // the Inputs managed by the crawler
    inputConfigs    []*common.Config
    out             channel.Factory
    wg              sync.WaitGroup
    InputsFactory   cfgfile.RunnerFactory
    ModulesFactory  cfgfile.RunnerFactory
    modulesReloader *cfgfile.Reloader
    inputReloader   *cfgfile.Reloader
    once            bool
    beatVersion     string
    beatDone        chan struct{}
}
The Start method iterates over the configured inputs, creating and starting each one:
// Start starts the crawler with all inputs
func (c *Crawler) Start(
    pipeline beat.Pipeline,
    r *registrar.Registrar,
    configInputs *common.Config,
    configModules *common.Config,
    pipelineLoaderFactory fileset.PipelineLoaderFactory,
    overwritePipelines bool,
) error {
    logp.Info("Loading Inputs: %v", len(c.inputConfigs))

    // Prospect the globs/paths given on the command line and launch harvesters
    for _, inputConfig := range c.inputConfigs {
        err := c.startInput(pipeline, inputConfig, r.GetStates())
        if err != nil {
            return err
        }
    }

    // some code omitted

    logp.Info("Loading and starting Inputs completed. Enabled inputs: %v", len(c.inputs))
    return nil
}

// create and start an Input
func (c *Crawler) startInput(
    pipeline beat.Pipeline,
    config *common.Config,
    states []file.State,
) error {
    if !config.Enabled() {
        return nil
    }

    connector := channel.ConnectTo(pipeline, c.out)
    p, err := input.New(config, connector, c.beatDone, states, nil)
    if err != nil {
        return fmt.Errorf("Error in initing input: %s", err)
    }
    p.Once = c.once

    if _, ok := c.inputs[p.ID]; ok {
        return fmt.Errorf("Input with same ID already exists: %d", p.ID)
    }

    c.inputs[p.ID] = p
    p.Start()
    return nil
}
Input
Filebeat supports multiple input types, such as log, redis, stdin, and docker. The code lives in the filebeat/input directory, with each type implemented in its own subdirectory. The common Input interface is defined as follows:
// filebeat/input/input.go

// Input is the interface common to all input
type Input interface {
    Run()
    Stop()
    Wait()
}

// Start starts the input
func (p *Runner) Start() {
    p.wg.Add(1)
    logp.Info("Starting input of type: %v; ID: %d ", p.config.Type, p.ID)

    // some code omitted

    // Add waitgroup to make sure input is finished
    go func() {
        defer func() {
            onceWg.Done()
            p.stop()
            p.wg.Done()
        }()
        p.Run()
    }()
}

// Run starts scanning through all the file paths and fetch the related files. Start a harvester for each file
func (p *Runner) Run() {
    // Initial input run
    p.input.Run()
    // some code omitted
}
We use the most common type, log, as the example. p.input.Run() dispatches to filebeat/input/log/input.go#Run:
// Input contains the input and its config
type Input struct {
    cfg           *common.Config
    config        config
    states        *file.States
    harvesters    *harvester.Registry // one input holds multiple harvesters
    outlet        channel.Outleter
    stateOutlet   channel.Outleter
    done          chan struct{}
    numHarvesters atomic.Uint32
    meta          map[string]string
    stopOnce      sync.Once
}

// Run runs the input
func (p *Input) Run() {
    logp.Debug("input", "Start next scan")

    // TailFiles is like ignore_older = 1ns and only on startup
    if p.config.TailFiles {
        ignoreOlder := p.config.IgnoreOlder

        // Overwrite ignore_older for the first scan
        p.config.IgnoreOlder = 1
        defer func() {
            // Reset ignore_older after first run
            p.config.IgnoreOlder = ignoreOlder
            // Disable tail_files after the first run
            p.config.TailFiles = false
        }()
    }

    p.scan()

    // It is important that a first scan is run before cleanup to make sure all new states are read first
    if p.config.CleanInactive > 0 || p.config.CleanRemoved {
        beforeCount := p.states.Count()
        cleanedStates, pendingClean := p.states.Cleanup()
        logp.Debug("input", "input states cleaned up. Before: %d, After: %d, Pending: %d",
            beforeCount, beforeCount-cleanedStates, pendingClean)
    }

    // Marking removed files to be cleaned up. Cleanup happens after next scan to make sure all states are updated first
    if p.config.CleanRemoved {
        for _, state := range p.states.GetStates() {
            // os.Stat will return an error in case the file does not exist
            stat, err := os.Stat(state.Source)
            if err != nil {
                if os.IsNotExist(err) {
                    p.removeState(state)
                    logp.Debug("input", "Remove state for file as file removed: %s", state.Source)
                } else {
                    logp.Err("input state for %s was not removed: %s", state.Source, err)
                }
            } else {
                // Check if existing source on disk and state are the same. Remove if not the case.
                newState := file.NewState(stat, state.Source, p.config.Type, p.meta)
                if !newState.FileStateOS.IsSame(state.FileStateOS) {
                    p.removeState(state)
                    logp.Debug("input", "Remove state for file as file removed or renamed: %s", state.Source)
                }
            }
        }
    }
}
If you are familiar with Filebeat's configuration, this code will look familiar: the variable names correspond almost one-to-one with configuration options, and much of the branching simply handles those options, so it is easy to follow. The key call is p.scan():
// Scan starts a scanGlob for each provided path/glob
func (p *Input) scan() {
    var sortInfos []FileSortInfo
    var files []string

    // get the files under the directories that need collecting; the logic
    // deciding whether a file qualifies lives in getFiles()
    paths := p.getFiles()

    // some code omitted

    for i := 0; i < len(paths); i++ {
        // omitted: code that decides whether the file was collected before,
        // and up to which offset

        // Decides if previous state exists
        if lastState.IsEmpty() {
            logp.Debug("input", "Start harvester for new file: %s", newState.Source)
            // start a harvester
            err := p.startHarvester(newState, 0)
            // error handling omitted
        } else {
            p.harvestExistingFile(newState, lastState)
        }
    }
}

// harvestExistingFile continues harvesting a file with a known state if needed
func (p *Input) harvestExistingFile(newState file.State, oldState file.State) {
    logp.Debug("input", "Update existing file for harvesting: %s, offset: %v", newState.Source, oldState.Offset)

    if oldState.Finished && newState.Fileinfo.Size() > oldState.Offset {
        logp.Debug("input", "Resuming harvesting of file: %s, offset: %d, new size: %d", newState.Source, oldState.Offset, newState.Fileinfo.Size())
        // start a harvester to continue collecting
        err := p.startHarvester(newState, oldState.Offset)
        if err != nil {
            logp.Err("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err)
        }
        return
    }

    // rest omitted
}

// startHarvester starts a new harvester with the given offset
// In case the HarvesterLimit is reached, an error is returned
func (p *Input) startHarvester(state file.State, offset int64) error {
    if p.numHarvesters.Inc() > p.config.HarvesterLimit && p.config.HarvesterLimit > 0 {
        p.numHarvesters.Dec()
        harvesterSkipped.Add(1)
        return errHarvesterLimit
    }

    // Set state to "not" finished to indicate that a harvester is running
    state.Finished = false
    state.Offset = offset

    // Create harvester with state
    h, err := p.createHarvester(state, func() { p.numHarvesters.Dec() })
    if err != nil {
        p.numHarvesters.Dec()
        return err
    }

    err = h.Setup()
    if err != nil {
        p.numHarvesters.Dec()
        return fmt.Errorf("error setting up harvester: %s", err)
    }

    // Update state before staring harvester
    // This makes sure the states is set to Finished: false
    // This is synchronous state update as part of the scan
    h.SendStateUpdate()

    if err = p.harvesters.Start(h); err != nil {
        p.numHarvesters.Dec()
    }
    return err
}
The core logic of scan is to walk the files under the configured paths and, for each file that needs collecting, create and start a harvester instance. The startHarvester method at the end first creates a harvester instance (p.createHarvester), then configures it (h.Setup), and finally starts it (p.harvesters.Start(h)). This is covered in the Harvester section next.
The code in filebeat/input/log/input.go implements the logic behind a large number of configuration options and is well worth a careful read. If you know the input configuration options, it is fairly straightforward; reading it side by side with the configuration reference works even better.
Data Collection: Harvester
Harvester
A Harvester is a goroutine. The interface is defined in filebeat/harvester/harvester.go:
// Harvester contains all methods which must be supported by each harvester
// so the registry can be used by the input
type Harvester interface {
    ID() uuid.UUID
    Run() error
    Stop()
}
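To get a feel for how small this contract is, here is a hypothetical, do-nothing implementation (not from the Filebeat code base; it assumes the same uuid package the interface above uses):

// A toy harvester satisfying the interface above; purely illustrative.
type noopHarvester struct {
    id   uuid.UUID
    done chan struct{}
}

func (h *noopHarvester) ID() uuid.UUID { return h.id }

// Run blocks until Stop is called, the way a real harvester's read loop
// blocks in reader.Next() until shutdown.
func (h *noopHarvester) Run() error {
    <-h.done
    return nil
}

func (h *noopHarvester) Stop() { close(h.done) }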
Each input type implements its own Harvester. The log Harvester lives in filebeat/input/log/harvester.go, with this core structure:
// Harvester contains all harvester related data
type Harvester struct {
    id     uuid.UUID
    config config
    source harvester.Source // the source being watched

    // shutdown handling
    done     chan struct{}
    stopOnce sync.Once
    stopWg   *sync.WaitGroup
    stopLock sync.Mutex

    // internal harvester state
    state  file.State
    states *file.States
    log    *Log

    // file reader pipeline
    reader          reader.Reader
    encodingFactory encoding.EncodingFactory
    encoding        encoding.Encoding

    // event/state publishing
    outletFactory OutletFactory
    publishState  func(*util.Data) bool

    onTerminate func()
}

// Run start the harvester and reads files line by line and sends events to the defined output
func (h *Harvester) Run() error {
    // this method was covered in the previous article; most of its code is
    // omitted below, keeping only the read and the send
    for {
        // read
        message, err := h.reader.Next()
        // send
        if !h.sendEvent(data, forwarder) {
            return nil
        }
    }
}
The previous article already introduced the Harvester: its core job is to open the file, read its content according to the configuration, and send it on. Reading and sending both happen in Run, as the comments above indicate. Here we add detail on another key piece: reader.Reader.
The Various Readers
The actual reading is performed by a series of Readers. The Reader interface is defined in filebeat/reader/reader.go:
// Reader is the interface that wraps the basic Next method for
// getting a new message.
// Next returns the message being read or and error. EOF is returned
// if reader will not return any new message on subsequent calls.
type Reader interface {
    Next() (Message, error)
}

// Message represents a reader event with timestamp, content and actual number
// of bytes read from input before decoding.
type Message struct {
    Ts      time.Time     // timestamp the content was read
    Content []byte        // actual content read
    Bytes   int           // total number of bytes read to generate the message
    Fields  common.MapStr // optional fields that can be added by reader
}
The interface has a single Next method; each call reads one Message. Filebeat implements many Readers, and they are assembled into a call chain according to the user's configuration, processing the raw data stage by stage like an assembly line: each downstream Reader wraps the one before it. They all live under the filebeat/reader directory; the main ones are listed below.
At the bottom of the stack, log.go#Read is not a Filebeat reader.Reader implementation; it calls Go's underlying Read directly to fetch a given number of bytes:
// filebeat/input/log/log.go
func (f *Log) Read(buf []byte) (int, error) {
    for {
        n, err := f.fs.Read(buf) // calls Go's underlying os/file.go#Read
    }
}

// os/file.go#Read

// Read reads up to len(b) bytes from the File.
// It returns the number of bytes read and any error encountered.
// At end of file, Read returns 0, io.EOF.
func (f *File) Read(b []byte) (n int, err error) {
    if err := f.checkValid("read"); err != nil {
        return 0, err
    }
    n, e := f.read(b)
    return n, f.wrapErr("read", e)
}
Everything above that is a reader.Reader implementation, stacked in the following order (all shown with portions omitted):
LineReader, which implements line-by-line reading:
// filebeat/reader/readfile/line.go

// lineReader reads lines from underlying reader, decoding the input stream
// using the configured codec. The reader keeps track of bytes consumed
// from raw input stream for every decoded line.
type LineReader struct {
    reader     io.Reader
    codec      encoding.Encoding
    bufferSize int
    maxBytes   int
    nl         []byte
    inBuffer   *streambuf.Buffer
    outBuffer  *streambuf.Buffer
    inOffset   int // input buffer read offset
    byteCount  int // number of bytes decoded from input buffer into output buffer
    decoder    transform.Transformer
}

// Next reads the next line until the new line character
func (r *LineReader) Next() ([]byte, int, error) {
    for {
        err := r.advance()
    }
}

// Reads from the buffer until a new line character is detected
// Returns an error otherwise
func (r *LineReader) advance() error {
    // fill inBuffer until '\n' sequence has been found in input buffer
    for idx == -1 {
        // try to read more bytes into buffer filebeat/input/log/log.go#Read
        n, err := r.reader.Read(buf)
    }
}
EncoderReader, which decodes each line:
// filebeat/reader/readfile/encode.go

// Reader produces lines by reading lines from an io.Reader
// through a decoder converting the reader it's encoding to utf-8.
type EncoderReader struct {
    reader *LineReader
}

// Next reads the next line from it's initial io.Reader
// This converts a io.Reader to a reader.reader
func (r EncoderReader) Next() (reader.Message, error) {
    c, sz, err := r.reader.Next()
    // Creating message object
    return reader.Message{
        Ts:      time.Now(),
        Content: c,
        Bytes:   sz,
    }, err
}
JSON Reader, which handles JSON-formatted data:
// filebeat/reader/json/json.go
type JSON struct {
    reader reader.Reader
    cfg    *Config
}

// Next decodes JSON and returns the filled Line object.
func (r *JSON) Next() (reader.Message, error) {
}
StripNewline Reader, which strips the trailing newline characters:
// filebeat/reader/readfile/strip_newline.go

// StripNewline reader removes the last trailing newline characters from
// read lines.
type StripNewline struct {
    reader reader.Reader
}

// Next returns the next line.
func (p *StripNewline) Next() (reader.Message, error) {
}
Timeout Reader, which handles timeouts:
// filebeat/reader/readfile/timeout.go

// TimeoutReader will signal some configurable timeout error if no
// new line can be returned in time.
type TimeoutReader struct {
    reader  reader.Reader
    timeout time.Duration
    signal  error
    running bool
    ch      chan lineMessage
}

// Next returns the next line. If no line was returned before timeout, the
// configured timeout error is returned.
// For handline timeouts a goroutine is started for reading lines from
// configured line reader. Only when underlying reader returns an error, the
// goroutine will be finished.
func (r *TimeoutReader) Next() (reader.Message, error) {
}
Multiline Reader, which merges multiple lines into one event:
// filebeat/reader/multiline/multiline.go

// MultiLine reader combining multiple line events into one multi-line event.
//
// Lines to be combined are matched by some configurable predicate using
// regular expression.
//
// The maximum number of bytes and lines to be returned is fully configurable.
// Even if limits are reached subsequent lines are matched, until event is
// fully finished.
//
// Errors will force the multiline reader to return the currently active
// multiline event first and finally return the actual error on next call to Next.
type Reader struct {
    reader       reader.Reader
    pred         matcher
    flushMatcher *match.Matcher
    maxBytes     int // bytes stored in content
    maxLines     int
    separator    []byte
    last         []byte
    numLines     int
    truncated    int
    err          error // last seen error
    state        func(*Reader) (reader.Message, error)
    message      reader.Message
}

// Next returns next multi-line event.
func (mlr *Reader) Next() (reader.Message, error) {
    return mlr.state(mlr)
}
LimitReader, which caps the length of a single event:
// filebeat/reader/readfile/limit.go

// Reader sets an upper limited on line length. Lines longer
// then the max configured line length will be snapped short.
type LimitReader struct {
    reader   reader.Reader
    maxBytes int
}

// Next returns the next line.
func (r *LimitReader) Next() (reader.Message, error) {
    message, err := r.reader.Next()
    if len(message.Content) > r.maxBytes {
        message.Content = message.Content[:r.maxBytes]
        message.AddFlagsWithKey("log.flags", "truncated")
    }
    return message, err
}
Not all of these Readers are required in every scenario; they are assembled according to the user's configuration when the Harvester is initialized:
// Setup opens the file handler and creates the reader for the harvester
func (h *Harvester) Setup() error {
    err := h.open()
    if err != nil {
        return fmt.Errorf("Harvester setup failed. Unexpected file opening error: %s", err)
    }

    // newLogFileReader assembles the Reader chain according to the user's configuration
    h.reader, err = h.newLogFileReader()
    if err != nil {
        if h.source != nil {
            h.source.Close()
        }
        return fmt.Errorf("Harvester setup failed. Unexpected encoding line reader error: %s", err)
    }

    return nil
}
When the Harvester's Run method calls h.reader.Next() in its infinite loop, the call recurses down through the Next methods of these Readers, each processing the data in turn, and the final result is sent to the backend.
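The pattern is plain Go decorator composition. A minimal standalone sketch of the idea, with hypothetical readers rather than the real Filebeat types:

package main

import "fmt"

// Message and Reader mirror the shape of Filebeat's reader interfaces.
type Message struct{ Content []byte }

type Reader interface {
    Next() (Message, error)
}

// source is the innermost reader, yielding raw lines.
type source struct{ lines []string }

func (s *source) Next() (Message, error) {
    if len(s.lines) == 0 {
        return Message{}, fmt.Errorf("EOF")
    }
    m := Message{Content: []byte(s.lines[0])}
    s.lines = s.lines[1:]
    return m, nil
}

// limit wraps another Reader and truncates long messages, like LimitReader.
type limit struct {
    inner    Reader
    maxBytes int
}

func (l *limit) Next() (Message, error) {
    m, err := l.inner.Next()
    if len(m.Content) > l.maxBytes {
        m.Content = m.Content[:l.maxBytes]
    }
    return m, err
}

func main() {
    // Assemble the chain innermost-first, as newLogFileReader does.
    var r Reader = &source{lines: []string{"short", "a very long line indeed"}}
    r = &limit{inner: r, maxBytes: 10}
    for {
        m, err := r.Next()
        if err != nil {
            break
        }
        fmt.Println(string(m.Content))
    }
}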
Sending Logic (inside the Harvester)
The sending call chain is:
- filebeat/input/log/harvester.go#Run(): h.sendEvent(data, forwarder)
- filebeat/input/log/harvester.go#sendEvent: forwarder.Send(data)
- filebeat/harvester/forwarder.go#Send: f.Outlet.OnEvent(data)
- filebeat/channel/util.go#OnEvent
The actual sending is done cooperatively by the code in filebeat/channel/util.go and filebeat/channel/outlet.go:
filebeat/channel/util.go:
// filebeat/channel/util.go
func (o *subOutlet) OnEvent(d *util.Data) bool {
    o.mutex.Lock()
    defer o.mutex.Unlock()
    select {
    case <-o.done:
        return false
    default:
    }

    select {
    case <-o.done:
        return false

    // write the data into the channel
    case o.ch <- d:
        select {
        case <-o.done:
            return true

        // wait for the result
        case ret := <-o.res:
            return ret
        }
    }
}

// filebeat/channel/util.go

// SubOutlet create a sub-outlet, which can be closed individually, without closing the
// underlying outlet.
func SubOutlet(out Outleter) Outleter {
    s := &subOutlet{
        done: make(chan struct{}),
        ch:   make(chan *util.Data),
        res:  make(chan bool, 1),
    }

    go func() {
        // read the data from the channel and call OnEvent to send it
        for event := range s.ch {
            s.res <- out.OnEvent(event)
        }
    }()

    return s
}
filebeat/channel/outlet.go:
func (o *outlet) OnEvent(d *util.Data) bool {
    if !o.isOpen.Load() {
        return false
    }

    event := d.GetEvent()
    if d.HasState() {
        event.Private = d.GetState()
    }

    if o.wg != nil {
        o.wg.Add(1)
    }

    o.client.Publish(event) // jumps to libbeat/publisher/pipeline/client.go#Publish

    return o.isOpen.Load()
}
The logic here:
- Data is written into the channel at case o.ch <- d in filebeat/channel/util.go#OnEvent, which then waits for the result in the inner select;
- Meanwhile, the goroutine started in filebeat/channel/util.go#SubOutlet reads the data written to the channel and sends it via o.client.Publish(event) in filebeat/channel/outlet.go#OnEvent; once the send completes, the result is written back to the subOutlet's res channel and the harvester's call returns.
In the previous article, the harvester that could not be shut down was stuck precisely because o.client.Publish(event) never returned, so the harvester blocked on the subOutlet's res channel, waiting forever for the send result.
Up to this point, the data has been flowing entirely within Filebeat's own code. Once o.client.Publish(event) is called, we cross into libbeat (libbeat/publisher/pipeline/client.go#Publish).
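The subOutlet construct above is a classic Go request/response pairing over channels. A stripped-down standalone sketch of the same pattern (hypothetical names, not Filebeat code):

package main

import "fmt"

// sendSync writes a request into ch and blocks until a result arrives on res,
// the way subOutlet.OnEvent blocks on o.res after writing to o.ch.
func sendSync(ch chan<- string, res <-chan bool, data string) bool {
    ch <- data
    return <-res
}

func main() {
    ch := make(chan string)
    res := make(chan bool, 1)

    // The consumer side, like the goroutine started in SubOutlet.
    go func() {
        for msg := range ch {
            fmt.Println("publishing:", msg)
            res <- true // report the send result back
        }
    }()

    ok := sendSync(ch, res, "hello event")
    fmt.Println("acked:", ok)
    // If the consumer never writes to res (e.g. a blocked Publish), sendSync
    // blocks forever -- exactly the harvester shutdown problem described above.
}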
Data Publishing: Publisher
From here on the code lives in libbeat. The complexity of this part comes mainly from the number of concepts involved and the length of the flow.
Pipeline
Beats call the entire Publisher flow the pipeline. It is one of the most central parts of libbeat; the code is in libbeat/publisher/pipeline/pipeline.go. Here is the part with the more detailed comments:
// Package pipeline combines all publisher functionality (processors, queue,
// outputs) to create instances of complete publisher pipelines, beats can
// connect to publish events to.
package pipeline

// Pipeline implementation providint all beats publisher functionality.
// The pipeline consists of clients, processors, a central queue, an output
// controller and the actual outputs.
// The queue implementing the queue.Queue interface is the most central entity
// to the pipeline, providing support for pushung, batching and pulling events.
// The pipeline adds different ACKing strategies and wait close support on top
// of the queue. For handling ACKs, the pipeline keeps track of filtered out events,
// to be ACKed to the client in correct order.
// The output controller configures a (potentially reloadable) set of load
// balanced output clients. Events will be pulled from the queue and pushed to
// the output clients using a shared work queue for the active outputs.Group.
// Processors in the pipeline are executed in the clients go-routine, before
// entering the queue. No filtering/processing will occur on the output side.
type Pipeline struct {
    beatInfo beat.Info

    logger *logp.Logger

    queue    queue.Queue
    output   *outputController
    observer observer

    eventer pipelineEventer

    // wait close support
    waitCloseMode    WaitCloseMode
    waitCloseTimeout time.Duration
    waitCloser       *waitCloser

    // pipeline ack
    ackMode    pipelineACKMode
    ackActive  atomic.Bool
    ackDone    chan struct{}
    ackBuilder ackBuilder
    eventSema  *sema

    processors pipelineProcessors
}
I put together a diagram:
The sections below walk through each stage of the pipeline. Continuing from earlier: a Harvester holds one of the leftmost client instances (all Harvesters of a single Input share that client) and uses it to invoke the Producer, which sends the data into a buffer, a channel whose size is hard-coded to 20. Also note that the client's publish runs the processors as well: if processors are defined, they execute inside the sending client. The corresponding code (key parts only):
// filebeat/channel/outlet.go
func (o *outlet) OnEvent(d *util.Data) bool {
    o.client.Publish(event) // jumps to libbeat/publisher/pipeline/client.go#Publish
}

// libbeat/publisher/pipeline/client.go
func (c *client) publish(e beat.Event) {
    // if processors are defined, they run here
    if c.processors != nil {
        var err error

        event, err = c.processors.Run(event)
        publish = event != nil
        if err != nil {
            // TODO: introduce dead-letter queue?
            log.Errorf("Failed to publish event: %v", err)
        }
    }

    var published bool
    if c.canDrop {
        published = c.producer.TryPublish(pubEvent)
    } else {
        published = c.producer.Publish(pubEvent) // queue/memqueue/produce.go
    }
}

// libbeat/publisher/queue/memqueue/produce.go
func (p *forgetfulProducer) Publish(event publisher.Event) bool {
    return p.openState.publish(p.makeRequest(event))
}

func (st *openState) publish(req pushRequest) bool {
    select {
    // send the data into the events buffer
    case st.events <- req:
        return true
    case <-st.done:
        st.events = nil
        return false
    }
}
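Note the c.canDrop branch above: TryPublish may drop the event when the queue is full, while Publish blocks. In plain Go this is the difference between a select with a default case and a plain blocking send; a minimal standalone sketch (hypothetical, not the Filebeat code):

package main

import "fmt"

// publish blocks until the queue accepts the event, like producer.Publish.
func publish(q chan<- int, ev int) bool {
    q <- ev
    return true
}

// tryPublish never blocks: if the queue is full the event is dropped,
// like producer.TryPublish when the client is allowed to drop events.
func tryPublish(q chan<- int, ev int) bool {
    select {
    case q <- ev:
        return true
    default:
        return false
    }
}

func main() {
    q := make(chan int, 1)
    fmt.Println(tryPublish(q, 1)) // true: buffer has room
    fmt.Println(tryPublish(q, 2)) // false: buffer full, event dropped
    <-q
    fmt.Println(publish(q, 3)) // true: blocks only until there is room
}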
The EventLoop then reads data from the events buffer and writes it into the batchBuffer, a slice whose size is queue.mem.events (default 4096). The corresponding code (key parts only):
// libbeat/publisher/queue/memqueue/eventloop.go

// create the EventLoop
func newBufferingEventLoop(b *Broker, size int, minEvents int, flushTimeout time.Duration) *bufferingEventLoop {
    l := &bufferingEventLoop{
        broker:       b,
        maxEvents:    size,
        minEvents:    minEvents,
        flushTimeout: flushTimeout,

        // uses the Broker's events channel directly
        events:    b.events,
        get:       nil,
        pubCancel: b.pubCancel,
        acks:      b.acks,
    }
    l.buf = newBatchBuffer(l.minEvents)

    l.timer = time.NewTimer(flushTimeout)
    if !l.timer.Stop() {
        <-l.timer.C
    }

    return l
}

func (l *bufferingEventLoop) run() {
    var (
        broker = l.broker
    )

    for {
        select {
        case <-broker.done:
            return

        case req := <-l.events: // producer pushing new event
            l.handleInsert(&req)
        }
    }
}

func (l *bufferingEventLoop) handleInsert(req *pushRequest) {
    // insert writes the data into the batchBuffer
    if l.insert(req) {
        l.eventCount++
        if l.eventCount == l.maxEvents {
            // when the queue is full, the channel is set to nil so that writes
            // block; once an ack arrives (handleACK) the queue is restored
            l.events = nil // stop inserting events if upper limit is reached
        }
    }
}
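Setting l.events to nil above relies on a common Go idiom: a send or receive on a nil channel blocks forever, so a nil channel effectively disables its case inside a select. A tiny standalone demonstration:

package main

import "fmt"

func main() {
    events := make(chan int, 1)
    resume := make(chan struct{}, 1)
    events <- 42
    saved := events

    for i := 0; i < 2; i++ {
        select {
        case v := <-events:
            fmt.Println("got event:", v)
            events = nil // disable this case, like the full-queue handling above
            resume <- struct{}{}
        case <-resume:
            fmt.Println("ack received, resuming")
            events = saved // re-enable the case, as handleACK does
        }
    }
}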
Once the data is in the batchBuffer, the eventConsumer reads batches from it according to the user-configured rules and writes them into the workQueue, which is a channel. The corresponding code (key parts only):
// libbeat/publisher/pipeline/consumer.go
func (c *eventConsumer) loop(consumer queue.Consumer) {
    log := c.logger

    log.Debug("start pipeline event consumer")

    var (
        out    workQueue
        batch  *Batch
        paused = true
    )

    for {
        select {
        case <-c.done:
            log.Debug("stop pipeline event consumer")
            return
        case sig := <-c.sig:
            handleSignal(sig)

        // write the batch into the workQueue
        case out <- batch:
            batch = nil
        }
    }
}
After the data reaches the workQueue, the netClientWorker module reads it and performs the real send by calling the output's Publish. The following shows the code path for the Elasticsearch output (key parts only):
// libbeat/publisher/pipeline/output.go
func (w *netClientWorker) run() {
    for !w.closed.Load() {
        reconnectAttempts := 0

        // read batches from the workQueue
        // send loop
        for batch := range w.qu {
            if w.closed.Load() {
                if batch != nil {
                    batch.Cancelled()
                }
                return
            }

            // send to the output
            err := w.client.Publish(batch) // libbeat/outputs/backoff.go
            if err != nil {
                logp.Err("Failed to publish events: %v", err)
                // on error return to connect loop
                break
            }
        }
    }
}

// libbeat/outputs/backoff.go
func (b *backoffClient) Publish(batch publisher.Batch) error {
    err := b.client.Publish(batch) // libbeat/outputs/elasticsearch/client.go
    if err != nil {
        b.client.Close()
    }
    backoff.WaitOnError(b.backoff, err)
    return err
}

// libbeat/outputs/elasticsearch/client.go
func (client *Client) Publish(batch publisher.Batch) error {
    events := batch.Events()
    rest, err := client.publishEvents(events)
    if len(rest) == 0 {
        batch.ACK() // libbeat/publisher/pipeline/batch.go
    } else {
        batch.RetryEvents(rest)
    }
    return err
}
That completes the pipeline's data flow. It is really just a chain of function calls, nothing complicated, though it takes time to read through. Next, a supplementary look at a core part of the pipeline: the spooler (the queue).
Queue
Filebeat offers two kinds of queue: the memory queue and the file queue. In practice the memory queue is what almost everyone uses, so it is the only one covered here; the file queue implementation lives under libbeat/publisher/queue/spool for those interested, and its core ideas match the memory queue. The memory queue is defined under libbeat/publisher/queue/memqueue, and the queue itself is defined in broker.go:
// the memory queue is called Broker in the code
type Broker struct {
    done chan struct{}

    logger logger

    bufSize int
    // buf         brokerBuffer
    // minEvents   int
    // idleTimeout time.Duration

    // api channels
    events    chan pushRequest
    requests  chan getRequest
    pubCancel chan producerCancelRequest

    // internal channels
    acks          chan int
    scheduledACKs chan chanList

    eventer queue.Eventer

    // wait group for worker shutdown
    wg          sync.WaitGroup
    waitOnClose bool
}
The event buffer introduced earlier is the events field here. The NewBroker function in this file also matters: it creates a Broker and defines the eventLoop interface, which has two implementations:
- directEventLoop (queue.mem.flush.min_events = 1)
- bufferingEventLoop (queue.mem.flush.min_events > 1)
The default value of queue.mem.flush.min_events is 2048, so the bufferingEventLoop is what gets created, and it is the only one described here:
// libbeat/publisher/queue/memqueue/eventloop.go

// bufferingEventLoop implements the broker main event loop.
// Events in the buffer are forwarded to consumers only if the buffer is full or on flush timeout.
type bufferingEventLoop struct {
    broker *Broker

    buf        *batchBuffer
    flushList  flushList
    eventCount int

    minEvents    int
    maxEvents    int
    flushTimeout time.Duration

    // active broker API channels
    events    chan pushRequest
    get       chan getRequest
    pubCancel chan producerCancelRequest

    // ack handling
    acks        chan int      // ackloop -> eventloop : total number of events ACKed by outputs
    schedACKS   chan chanList // eventloop -> ackloop : active list of batches to be acked
    pendingACKs chanList      // ordered list of active batches to be send to the ackloop
    ackSeq      uint          // ack batch sequence number to validate ordering

    // buffer flush timer state
    timer *time.Timer
    idleC <-chan time.Time
}
The batchBuffer mentioned earlier is the buf field here. Also, when reading the code, watch for the two meanings of the word spooler: most of the time it refers to the queue in general, but the file queue is also called the spooler.
That wraps up the pipeline. At this point everything from Filebeat instance creation through data collection and sending has been covered. What remains is the ACK flow, which runs in the direction opposite to data sending.
The ACK Flow
Filebeat implements its at-least-once delivery guarantee with the Registrar plus the ACK mechanism. The Registrar was introduced earlier; what remains is the ACK flow, which is the relatively complex part.
First, another look at the Registrar logic:
// filebeat/registrar/registrar.go
type Registrar struct {
    Channel      chan []file.State
    out          successLogger
    done         chan struct{}
    registryFile string      // Path to the Registry File
    fileMode     os.FileMode // Permissions to apply on the Registry File
    wg           sync.WaitGroup

    states               *file.States // Map with all file paths inside and the corresponding state
    gcRequired           bool         // gcRequired is set if registry state needs to be gc'ed before the next write
    gcEnabled            bool         // gcEnabled indicates the registry contains some state that can be gc'ed in the future
    flushTimeout         time.Duration
    bufferedStateUpdates int
}

func (r *Registrar) Run() {
    logp.Debug("registrar", "Starting Registrar")
    // Writes registry on shutdown
    defer func() {
        r.writeRegistry()
        r.wg.Done()
    }()

    var (
        timer  *time.Timer
        flushC <-chan time.Time
    )

    for {
        select {
        case <-r.done:
            logp.Info("Ending Registrar")
            return
        case <-flushC:
            flushC = nil
            timer.Stop()
            r.flushRegistry()
        case states := <-r.Channel: // the ack messages arrive over this channel
            // after receiving the acknowledgments, write the data to the registry file
            r.onEvents(states)
            if r.flushTimeout <= 0 {
                r.flushRegistry()
            } else if flushC == nil {
                timer = time.NewTimer(r.flushTimeout)
                flushC = timer.C
            }
        }
    }
}
The Registrar struct defines a Channel field; that is the channel over which ack messages arrive, and the Run method reads from it. Now let's see how the ack data gets written into this channel.
First, filebeat.go registers the global ACK handling callback during initialization:
// filebeat/beater/filebeat.go

// Run allows the beater to be run as a beat.
func (fb *Filebeat) Run(b *beat.Beat) error {
    // Make sure all events that were published in
    registrarChannel := newRegistrarLogger(registrar)

    // register the callback for successfully delivered messages
    err = b.Publisher.SetACKHandler(beat.PipelineACKHandler{
        ACKEvents: newEventACKer(finishedLogger, registrarChannel).ackEvents,
    })
    if err != nil {
        logp.Err("Failed to install the registry with the publisher pipeline: %v", err)
        return err
    }
}

// filebeat/beater/filebeat.go
func newRegistrarLogger(reg *registrar.Registrar) *registrarLogger {
    return &registrarLogger{
        done: make(chan struct{}),
        ch:   reg.Channel,
    }
}

// filebeat/beater/channels.go
type registrarLogger struct {
    done chan struct{}
    ch   chan<- []file.State
}

// filebeat/beater/channels.go
func (l *registrarLogger) Published(states []file.State) {
    select {
    case <-l.done:
        // set ch to nil, so no more events will be send after channel close signal
        // has been processed the first time.
        // Note: nil channels will block, so only done channel will be actively
        // report 'closed'.
        l.ch = nil
    case l.ch <- states:
    }
}

// filebeat/beater/acker.go
type statefulLogger interface {
    Published(states []file.State)
}

// filebeat/beater/acker.go
func newEventACKer(stateless statelessLogger, stateful statefulLogger) *eventACKer {
    return &eventACKer{stateless: stateless, stateful: stateful, log: logp.NewLogger("acker")}
}

// the registered callback
func (a *eventACKer) ackEvents(data []interface{}) {
    stateless := 0
    states := make([]file.State, 0, len(data))
    for _, datum := range data {
        if datum == nil {
            stateless++
            continue
        }

        st, ok := datum.(file.State)
        if !ok {
            stateless++
            continue
        }

        states = append(states, st)
    }

    if len(states) > 0 {
        a.log.Debugw("stateful ack", "count", len(states))
        a.stateful.Published(states) // filebeat/beater/channels.go: func (l *registrarLogger) Published(states []file.State)
    }

    if stateless > 0 {
        a.log.Debugw("stateless ack", "count", stateless)
        a.stateless.Published(stateless)
    }
}
The callback ultimately registered is the eventACKer, whose key call is a.stateful.Published. Its implementation:
// filebeat/beater/channels.go
func (l *registrarLogger) Published(states []file.State) {
    select {
    case <-l.done:
        // set ch to nil, so no more events will be send after channel close signal
        // has been processed the first time.
        // Note: nil channels will block, so only done channel will be actively
        // report 'closed'.
        l.ch = nil
    case l.ch <- states:
    }
}
Here the final ack message (the states) is written to l.ch. That channel is the Registrar's channel (this follows from the registration code): the callback writes ack messages into the channel, and the Registrar reads the states from it and writes them to the registry file, closing the loop. As shown below:
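Put together, the closed loop is just two goroutines joined by a channel of states. A minimal standalone sketch of the shape of it (hypothetical types; the real code paths are the ones listed above):

package main

import "fmt"

// state stands in for file.State: which file, and how far it was acked.
type state struct {
    source string
    offset int64
}

func main() {
    ackCh := make(chan []state)
    done := make(chan struct{})

    // The Registrar side: read acked states and persist them.
    go func() {
        defer close(done)
        for states := range ackCh {
            for _, s := range states {
                // The real Registrar serializes these into the registry file.
                fmt.Printf("flush %s@%d to registry\n", s.source, s.offset)
            }
        }
    }()

    // The eventACKer side: the pipeline reports a batch as delivered.
    ackCh <- []state{{source: "/var/log/test-1.log", offset: 362}}
    close(ackCh)
    <-done
}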
The remaining question: where does the ack arriving at the ackEvents callback come from? Who sends it, where, when, and how? A first deduction: since it is an ack, it should originate where the data is finally sent; once a send is confirmed externally, the ack propagates backwards, in the direction opposite to collection. So the core should be inside the Publisher, i.e. libbeat's pipeline. We start the analysis from the pipeline's central (memory) queue, the Broker module.
// libbeat/publisher/queue/memqueue/broker.go

// NewBroker creates a new broker based in-memory queue holding up to sz number of events.
// If waitOnClose is set to true, the broker will block on Close, until all internal
// workers handling incoming messages and ACKs have been shut down.
func NewBroker(
    logger logger,
    settings Settings,
) *Broker {
    var eventLoop interface {
        run()
        processACK(chanList, int)
    }

    // create the EventLoop
    if minEvents > 1 {
        eventLoop = newBufferingEventLoop(b, sz, minEvents, flushTimeout)
    } else {
        eventLoop = newDirectEventLoop(b, sz)
    }

    b.bufSize = sz

    // create the ackLoop
    ack := newACKLoop(b, eventLoop.processACK)

    b.wg.Add(2)
    go func() {
        defer b.wg.Done()
        eventLoop.run()
    }()

    // the ack loop is started in this goroutine
    go func() {
        defer b.wg.Done()
        ack.run()
    }()
}

// libbeat/publisher/queue/memqueue/ackloop.go
func newACKLoop(b *Broker, processACK func(chanList, int)) *ackLoop {
    l := &ackLoop{broker: b}
    l.processACK = processACK
    return l
}
NewBroker creates both the eventLoop and the ackLoop; the former sends data and was covered earlier, the latter handles acks. The ackLoop code:
// libbeat/publisher/queue/memqueue/ackloop.go

// ackLoop implements the brokers asynchronous ACK worker.
// Multiple concurrent ACKs from consecutive published batches will be batched up by the
// worker, to reduce the number of signals to return to the producer and the
// broker event loop.
// Producer ACKs are run in the ackLoop go-routine.
type ackLoop struct {
    broker *Broker
    sig    chan batchAckMsg // the channel signaling that a batch has been sent
    lst    chanList

    totalACK   uint64
    totalSched uint64

    batchesSched uint64
    batchesACKed uint64

    processACK func(chanList, int)
}

func (l *ackLoop) run() {
    for {
        select {
        case <-l.broker.done:
            return

        case acks <- acked:
            acks, acked = nil, 0

        case lst := <-l.broker.scheduledACKs:
            count, events := lst.count()
            l.lst.concat(&lst)

            l.batchesSched += uint64(count)
            l.totalSched += uint64(events)

        // wait here for the signal confirming a batch was sent
        case <-l.sig:
            acked += l.handleBatchSig()
            if acked > 0 {
                acks = l.broker.acks
            }
        }
    }
}
As you can see, the ackLoop waits for the signal (sig) that a batch has finished sending. There are two threads to follow here:
- Where does the signal come from?
- After the signal is received, how does it get forwarded to the callback registered earlier?
Start with the first question: where does the signal come from? Following the earlier deduction, it should be produced where the data is actually sent, and the pipeline analysis showed that the final send happens in the output. Sticking with the ES output as the example, look at the final send code and work backwards from there:
// libbeat/outputs/elasticsearch/client.go -- the final ES send code from earlier
func (client *Client) Publish(batch publisher.Batch) error {
    events := batch.Events()
    rest, err := client.publishEvents(events)
    if len(rest) == 0 {
        batch.ACK() // the key ACK; this jumps to libbeat/publisher/pipeline/batch.go
    } else {
        batch.RetryEvents(rest)
    }
    return err
}

// libbeat/publisher/pipeline/batch.go
func (b *Batch) ACK() {
    b.ctx.observer.outBatchACKed(len(b.events)) // libbeat/publisher/pipeline/monitoring.go: func (o *metricsObserver) outBatchACKed(int) {} -- used for monitoring
    b.original.ACK()                            // the key call: libbeat/publisher/queue/memqueue/consume.go
    releaseBatch(b)
}

// libbeat/publisher/queue/memqueue/consume.go
func (b *batch) ACK() {
    if b.state != batchActive {
        switch b.state {
        case batchACK:
            panic("Can not acknowledge already acknowledged batch")
        default:
            panic("inactive batch")
        }
    }
    b.report() // the key is here
}

// libbeat/publisher/queue/memqueue/consume.go
func (b *batch) report() {
    b.ack.ch <- batchAckMsg{} // this is where the confirming ACK is finally sent
}
Bingo! After Publish finishes (whether the send failed or succeeded), the ACK is emitted. Tracing the call chain back, the ack is ultimately sent in report, which answers the first question: that is where the ack signal comes from. On to the second question: after the signal is received, how is it forwarded to the callback registered earlier? Look at handleBatchSig, which is called when the signal arrives:
// libbeat/publisher/queue/memqueue/ackloop.go

// handleBatchSig collects and handles a batch ACK/Cancel signal. handleBatchSig
// is run by the ackLoop.
func (l *ackLoop) handleBatchSig() int {
    if count > 0 {
        if e := l.broker.eventer; e != nil {
            e.OnACK(count)
        }

        // this calls the EventLoop's processACK; since we use the
        // bufferingEventLoop, it is (*bufferingEventLoop).processACK
        // report acks to waiting clients
        l.processACK(lst, count) // libbeat/publisher/queue/memqueue/eventloop.go#(*bufferingEventLoop)processACK
    }
}

// libbeat/publisher/queue/memqueue/eventloop.go
func (l *bufferingEventLoop) processACK(lst chanList, N int) {
    for !lst.empty() {
        // key point: this calls ackEvents to deliver the acknowledgment
        st.state.cb(int(count)) // libbeat/publisher/pipeline/acker.go (a *eventDataACK) ackEvents(n int)
    }
}
handleBatchSig calls the processACK method of the eventLoop in use, which leads into the pipeline's acker.go. The code path:
- libbeat/publisher/queue/memqueue/eventloop.go: (l *bufferingEventLoop) processACK --> st.state.cb(int(count))
- libbeat/publisher/pipeline/acker.go: func (a *eventDataACK) ackEvents(n int) { a.acker.ackEvents(n) }
- libbeat/publisher/pipeline/acker.go: func (a *boundGapCountACK) ackEvents(n int) { a.acker.ackEvents(n) }
- libbeat/publisher/pipeline/acker.go: func (a *gapCountACK) ackEvents(n int) {}
The last ackEvents:
func (a *gapCountACK) ackEvents(n int) {
    select {
    case <-a.pipeline.ackDone: // pipeline is closing down -> ignore event
        a.acks = nil

    // the ack count n is written into a.acks
    case a.acks <- n: // send ack event to worker
    }
}

// gapCountACK returns event ACKs to the producer, taking account for dropped events.
// Events being dropped by processors will always be ACKed with the last batch ACKed
// by the broker. This way clients waiting for ACKs can expect all processed
// events being always ACKed.
type gapCountACK struct {
    pipeline *Pipeline

    fn func(total int, acked int)

    done chan struct{}
    drop chan struct{}
    acks chan int

    events atomic.Uint32
    lst    gapList
}
After the ack count is written into gapCountACK's acks, it is read out in ackLoop:
// libbeat/publisher/pipeline/acker.go
func (a *gapCountACK) ackLoop() {
    acks, drop := a.acks, a.drop
    closing := false

    for {
        select {
        case <-a.done:
            closing = true
            a.done = nil
            if a.events.Load() == 0 {
                // stop worker, if all events accounted for have been ACKed.
                // If new events are added after this acker won't handle them, which may
                // result in duplicates
                return
            }

        case <-a.pipeline.ackDone:
            return

        case n := <-acks:
            // key: after reading n from acks, handleACK processes it
            empty := a.handleACK(n)
            if empty && closing && a.events.Load() == 0 {
                // stop worker, if and only if all events accounted for have been ACKed
                return
            }

        case <-drop:
            // TODO: accumulate multiple drop events + flush count with timer
            a.events.Sub(1)
            a.fn(1, 0)
        }
    }
}

func (a *gapCountACK) handleACK(n int) bool {
    a.events.Sub(uint32(total))
    a.fn(total, acked) // func (a *boundGapCountACK) onACK(total, acked int)
    return emptyLst
}
After n is read from acks, handleACK takes over; the subsequent call chain is:
1. libbeat/publisher/pipeline/acker.go: handleACK --> a.fn(total, acked)
2. libbeat/publisher/pipeline/acker.go: func (a *boundGapCountACK) onACK(total, acked int) --> a.fn(total, acked)
3. libbeat/publisher/pipeline/acker.go: func (a *eventDataACK) onACK(total, acked int) --> a.fn(data, acked)
4. libbeat/publisher/pipeline/pipeline_ack.go: func (p *pipelineEventCB) onEvents(data []interface{}, acked int)
This finally lands in pipelineEventCB, the structure that handles active acks inside the pipeline (I read the name as "the callback for pipeline events", which hints that it is the heart of ack handling). The key code:
// libbeat/publisher/pipeline/pipeline_ack.go

// pipelineEventCB internally handles active ACKs in the pipeline.
// It receives ACK events from the queue and the individual clients.
// Once the queue returns an ACK to the pipelineEventCB, the worker loop will collect
// events from all clients having published events in the last batch of events
// being ACKed.
// the PipelineACKHandler will be notified, once all events being ACKed
// (including dropped events) have been collected. Only one ACK-event is handled
// at a time. The pipeline global and clients ACK handler will be blocked for the time
// an ACK event is being processed.
type pipelineEventCB struct {
    done chan struct{}

    acks chan int

    // this field is the key: acknowledgments are written to this channel,
    // read out in the worker, and finally written to the Registrar's channel
    events        chan eventsDataMsg
    droppedEvents chan eventsDataMsg

    mode    pipelineACKMode
    handler beat.PipelineACKHandler
}
The events field is the key: acknowledgments are written into this channel, read out in the worker, and finally written into the Registrar's channel. Continuing the call chain, here is how data is written into the events channel:
// reportEvents sends a batch of ACKed events to the ACKer.
// The events array contains send and dropped events. The `acked` counters
// indicates the total number of events acked by the pipeline.
// That is, the number of dropped events is given by `len(events) - acked`.
// A client can report events with acked=0, iff the client has no waiting events
// in the pipeline (ACK ordering requirements)
//
// Note: the call blocks, until the ACK handler has collected all active events
// from all clients. This ensure an ACK event being fully 'captured'
// by the pipeline, before receiving/processing another ACK event.
// In the meantime the queue has the chance of batching-up more ACK events,
// such that only one ACK event is being reported to the pipeline handler
func (p *pipelineEventCB) onEvents(data []interface{}, acked int) {
    p.pushMsg(eventsDataMsg{data: data, total: len(data), acked: acked})
}

func (p *pipelineEventCB) pushMsg(msg eventsDataMsg) {
    if msg.acked == 0 {
        p.droppedEvents <- msg
    } else {
        msg.sig = make(chan struct{})

        // after the write here, the message is read out in the collect step of
        // (p *pipelineEventCB) worker(), ending in reportEventsData
        p.events <- msg
        <-msg.sig
    }
}
pushMsg writes the message into events. Now look at the reading side of the channel:
func (p *pipelineEventCB) worker() {
    defer close(p.acks)
    defer close(p.events)
    defer close(p.droppedEvents)

    for {
        select {
        case count := <-p.acks:
            // the messages are read inside collect
            exit := p.collect(count)
            if exit {
                return
            }

        // short circuit dropped events, but have client block until all events
        // have been processed by pipeline ack handler
        case msg := <-p.droppedEvents:
            p.reportEventsData(msg.data, msg.total)
            if msg.sig != nil {
                close(msg.sig)
            }

        case <-p.done:
            return
        }
    }
}

func (p *pipelineEventCB) collect(count int) (exit bool) {
    for acked < count {
        var msg eventsDataMsg
        select {
        // the message is read here
        case msg = <-p.events:
        case msg = <-p.droppedEvents:
        case <-p.done:
            exit = true
            return
        }
    }

    p.reportEventsData(data, total)
    return
}

func (p *pipelineEventCB) reportEventsData(data []interface{}, total int) {
    // report ACK back to the beat
    switch p.mode {
    case countACKMode:
        p.handler.ACKCount(total)
    case eventsACKMode:
        // this calls the ACKEvents registered earlier in Filebeat
        p.handler.ACKEvents(data) // filebeat/beater/acker.go: func (a *eventACKer) ackEvents(data []interface{})
    case lastEventsACKMode:
        p.handler.ACKLastEvents(data)
    }
}
As you can see, once the acknowledgment is read from the channel, reportEventsData eventually invokes the ACKEvents registered earlier in Filebeat: the eventACKer. That answers the second question and closes the loop back to the callback described before. The completed diagram:
Summary
This article walked through Filebeat's core data flow from the source-code perspective. To keep the focus, many details were omitted, such as when the data-structure instances at each stage are created and started, error-branch handling, retry on failure, and so on. As stated at the beginning, this is not a comprehensive deep dive into the Filebeat code, but an analysis of the core data flow under a common configuration. I rewrote this article several times and kept feeling that it says little beyond pasting a lot of code and pointing out the call sites, which you could figure out yourself after a few debugging sessions. Then again, knowing the overall flow and the key call sites in advance lets you debug with confidence and focus, saving a lot of time. To me this article matters in two ways: it deepened my own understanding of Filebeat, and it may save some time for others reading the source. That is enough.