ES作为全文检索兼存储系统,数据可靠性至关重要,本文讨论ES是如何实现数据可靠性的。ES底层基于Lucene,所以有必要先搞清楚一些相关的概念。

refresh && flush && commit

Lucene中,有flush和commit的概念。所谓flush,就是定期将内存Buffer里面的数据刷新到Directory这样一个抽象的文件存储层,其实就是生成segment。需要注意的是,因为操作系统file cache的原因,这里的刷新未必会真的落盘,一般只是从内存buffer刷到了file cache而已,实质还是在内存中,所以是一个相对比较高效和轻量级的操作。flush方法的java doc是这样描述的:

Moves all in-memory segments to the Directory, but does not commit (fsync) them (call commit() for that).”

关于segment再补充一点:形成segment以后,数据就可以被搜索了,但因为Lucene flush一般比较频繁(ES里面执行频率默认是1秒),所以会产生很多小的segment文件,一方面太多的文件会占用太多的文件描述符;另一方面,搜索时文件太多也会影响搜索效率,所以Lucene有专门的Merge线程定期将小的segment文件merge为大文件。

Lucene的commit上面的Java doc已经提到了,它会调用fsync,commit方法的java doc如下:

Commits all pending changes (added and deleted documents, segment merges, added indexes, etc.) to the index, and syncs all referenced index files, such that a reader will see the changes and the index updates will survive an OS or machine crash or power loss.

因为会调用fsync,所以commit之后,文件肯定会被持久化到磁盘上,所以这是一个重操作,一方面是磁盘的性能比较差,另一方面是commit的时候会执行更新索引等操作。commit一般是当我们认为系统到达一个稳定点的时候,commit一次,类似于流式系统里面的checkpoint。当系统出现故障的时候,Lucene会从最近的一次commit point进行恢复,而不是最近的一次flush。

总结一下,flush会生成segment,之后数据就能被搜索了,是一个轻量级操作,但此时并不保证数据被持久化了。commit是一个比较重的落盘操作,用于持久化,不能被频繁执行。

以上是Lucene,现在我们来看ES。ES中有refresh和flush的概念,其实是和Lucene一一对应的,不过换了个名字。ES里面的refresh就是Lucene里面的flush;ES里面的flush就是Lucene里面的commit。所以,ES里面的refresh默认1秒执行一次,也就是数据写入ES之后最快1秒之后可以被搜索到,这也就是ES提供的近实时搜索NRT(Near Realtime)。而flush的执行时机有两个点:一个是ES会根据内存使用情况启发式的执行flush,另外一个时机是当translog达到512MB(默认值)时执行一次flush。网上很多文章(一般都比较早了)都提到每30分钟也会执行一次,但我在6.6版本的代码及文档里面没有找到这部分说明。

这里提到了translog,下面我们来介绍一下translog。

translog

仔细想一下上面介绍的Lucene的flush和commit,就会发现如果在数据还没有被commit之前,机器宕掉了,那上次commit之后到宕机前的数据就丢了。实际上,Lucene也是这么做的,异常恢复时会丢掉最后一次commit之后的数据(如果有的话)。这对于绝大多数业务是不能接受的,所以ES引入了translog来解决这个问题。其实也不算什么新技术,就是类似于传统DB里面的预写日志(WAL),不过在ES里面叫事务日志(transaction log),简称translog。

这样ES的写入的时候,先在内存buffer中进行Lucene documents的写入,写入成功后再写translog。内存buffer中的Lucene documents经过refresh会形成file system cache中的segment,此时,内容就可以被搜索到了。然后经过flush,持久化到磁盘上面。整个流程如下图:

写入流程

这里有几个问题需要说明一下。

  1. 为了保证数据完全的可靠,一般的写入流程都是先写WAL,再写内存,但ES是先写内存buffer,然后再写translog。这个顺序目前没有找到官方的说明,网上大部分说的是写的过程比较复杂,容易出错,先写内存可以降低处理的复杂性。不过这个顺序个人认为对于用户而言其实不是很关键,因为不管先写谁,最终两者都写成功才会返回给客户端。
  2. translog的落盘(即图中的fsync过程)有两种策略,分别对应不同的可靠程度。第一种是每次请求(一个index、update、delete或者bulk操作)都会执行fsync,fsync成功后,才会给客户端返回成功,也就是请求同步刷盘,这种可靠性最高,只要返回成功,那数据一定已经落盘了,这也是默认的方式。第二种是异步的,按照定时达量的方式,默认每5秒或者512MB的时候就fsync一次。异步一般可以获得更高的吞吐量,但弊端是存在数据丢失的风险。
  3. ES的flush(或者Lucene的commit)也是落盘,为什么不直接用,而加一个translog?translog或者所有的WAL的一大特性就是他们都是追加写,这样大多数时候都可以实现顺序读写,读和写可以完全并发,所以性能一般都是非常好的。有一些测试表明,磁盘的顺序写甚至比内存的随机写更快,见The Pathologies of Big Data的Figure 3。
  4. translog不是全局的,而是每个shard(也就是Lucene的index)一个,且每个shard的所有translog中同一时刻只会有一个translog文件是可写的,其它都是只读的(如果有的话)。具体细节可查看Translog类的Java doc说明。
  5. translog的老化机制在6.0之前是segment flush到磁盘后,就删掉了。6.0之后是按定时达量的策略进行删除,默认是512MB或者12小时。

因为ES的版本更迭比较快,配置项也是经常变更,所以上面没有列出具体的配置项,这里的默认值都来自 ES 6.8 translog文档,有兴趣的可以点击链接查看。

副本

引入translog之后,解决了进程突然挂掉或者机器突然宕机导致还处于内存,没有被持久化到磁盘的数据丢失的问题,但数据仅落到磁盘还是无法完全保证数据的安全,比如磁盘损坏等。分布式领域解决这个问题最直观和最简单的方式就是采用副本机制,比如HDFS、Kafka等都是此类代表,ES也使用了副本的机制。一个索引由一个primary和n(n≥0)个replica组成,replica的个数支持可以通过接口动态调整。为了可靠,primary和replica的shard不能在同一台机器上面。

这里要注意区分一下replica和shard的关系:比如创建一个索引的时候指定了5个shard,那么此时primary分片就有5个shard,每个replica也会有5个shard。我们可以通过接口随意修改replica的个数,但不能修改每个primary/replica包含5个shard这个事实。 当然,shard的个数也可以通过shrink接口进行调整,但这是一个很重的操作,而且有诸多限制,比如shard个数只能减少,且新个数是原来的因子,比如原来是8个shard,那只能选择调整为4、2或1个;如果原来是5个,那就只能调整为1个了。所以实际中,shard的个数一般要预先计划好(经验值是保证一个shard的大小在30~50GB之间),而replica的个数可以根据实际情况后面再做调整。

在数据写入的时候,数据会先写primary,写成功之后,同时分发给多个replica,待所有replica都返回成功之后,再返回给客户端。所以一个写请求的耗时可以粗略认为是写primary的时间+耗时最长的那个replica的时间。

引申:写入优化

总体来说,ES的写入能力不算太好,所以经常需要对写入性能做优化,除了保证良好的硬件配置外,还可以从ES自身进行机制进行优化,结合上面的介绍,可以很容易得出下面的一些优化手段:

  1. 如果对于搜索的实时性要求不高,可以适当增加refresh的时间,比如从默认的1秒改为30秒或者1min,甚至更长。如果是离线导入再搜索的场景,可以直接设置为"-1",即关闭自动的refresh,等导入完成后,通过接口手动refresh。其提高性能的原理是增加refresh的时间可以减少大量小的segment文件,这样在可以提高flush的效率,减小merge的压力。
  2. 如果对于数据可靠性要求不是特别高,可以将translog的落盘机制由默认的请求同步落盘,改为定时达量的异步落盘,提高落盘的效率。
  3. 如果对于数据可靠性要求不是特别高,可以在写入高峰期先不设置副本,待过了高峰之后再通过接口增加副本。这个可以通过ES的ILM策略,实现自动化。

优化往往都是有代价的,需要根据自己的实际场景进行评估。

引申:写入流程代码分析

本来计划是写一篇从原理、源码分析ES关于写入流程的文章,但发现已经有一些比较好的文章了,就换了个角度,从使用角度宏观介绍了用户一般比较关心的数据可靠性问题。这里推荐两篇介绍ES写入流程的文章:

这两篇文章都结合代码对写入流程进行了深入解析,文章非常不错,这里我再补充一些细节,供有兴趣debug源码的读者阅读。源码阅读环境搭建见我之前的文章IDEA或Eclipse中编译调试ElasticSearch源码,里面使用的ES版本是6.6(6.6.3 snapshot),下面代码的说明也基于这个版本。

ES提供两种类型的接口:对外的REST接口,对内的Transport接口。早一点的版本中,transport接口也开放给外部客户端使用,但现在已经逐步只用于内部节点之间通信了。不过REST接口其实只是在Transport接口进行了封装,请求到ES之后还是会转发到Transport接口对应的处理器去处理。REST和Transport接口的处理器都是在ActionModule类中注册的:

static Map<String, ActionHandler<?, ?>> setupActions(List<ActionPlugin> actionPlugins) {
    
    // 省略部分代码

    ActionRegistry actions = new ActionRegistry();

    // Transport接口处理器注册
    actions.register(MainAction.INSTANCE, TransportMainAction.class);
    actions.register(NodesInfoAction.INSTANCE, TransportNodesInfoAction.class);
    actions.register(RemoteInfoAction.INSTANCE, TransportRemoteInfoAction.class);
    actions.register(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
    actions.register(NodesUsageAction.INSTANCE, TransportNodesUsageAction.class);
    actions.register(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
    actions.register(ListTasksAction.INSTANCE, TransportListTasksAction.class);
    actions.register(GetTaskAction.INSTANCE, TransportGetTaskAction.class);
    actions.register(CancelTasksAction.INSTANCE, TransportCancelTasksAction.class);

    // 省略部分代码
}


public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
    List<AbstractCatAction> catActions = new ArrayList<>();
    Consumer<RestHandler> registerHandler = a -> {
        if (a instanceof AbstractCatAction) {
            catActions.add((AbstractCatAction) a);
        }
    };

    // REST接口处理器注册
    registerHandler.accept(new RestMainAction(settings, restController));
    registerHandler.accept(new RestNodesInfoAction(settings, restController, settingsFilter));
    registerHandler.accept(new RestRemoteClusterInfoAction(settings, restController));
    registerHandler.accept(new RestNodesStatsAction(settings, restController));
    registerHandler.accept(new RestNodesUsageAction(settings, restController));
    registerHandler.accept(new RestNodesHotThreadsAction(settings, restController));
    registerHandler.accept(new RestClusterAllocationExplainAction(settings, restController));
    registerHandler.accept(new RestClusterStatsAction(settings, restController));
    registerHandler.accept(new RestClusterStateAction(settings, restController, settingsFilter));
    registerHandler.accept(new RestClusterHealthAction(settings, restController));
    // 省略部分代码
}

所以我们要调试某个功能的时候其实只要在这里找到REST接口的处理器以及其对应的Transport接口对应的处理器即可。这里以数据写入为例,ES的写入主要分单条的index和批量的bulk操作,单条其实是bulk的一种特殊情况,所以处理复用的是bulk的逻辑。

写入的REST接口处理器是RestIndexAction

// 单条
registerHandler.accept(new RestIndexAction(settings, restController));
// 批量
registerHandler.accept(new RestBulkAction(settings, restController));

它们的构造函数里面也明确的定义了我们使用的REST接口:

public RestIndexAction(Settings settings, RestController controller) {
    super(settings);
    controller.registerHandler(POST, "/{index}/{type}", this); // auto id creation
    controller.registerHandler(PUT, "/{index}/{type}/{id}", this);
    controller.registerHandler(POST, "/{index}/{type}/{id}", this);
    CreateHandler createHandler = new CreateHandler(settings);
    controller.registerHandler(PUT, "/{index}/{type}/{id}/_create", createHandler);
    controller.registerHandler(POST, "/{index}/{type}/{id}/_create", createHandler);
}

public RestBulkAction(Settings settings, RestController controller) {
    super(settings);

    controller.registerHandler(POST, "/_bulk", this);
    controller.registerHandler(PUT, "/_bulk", this);
    controller.registerHandler(POST, "/{index}/_bulk", this);
    controller.registerHandler(PUT, "/{index}/_bulk", this);
    controller.registerHandler(POST, "/{index}/{type}/_bulk", this);
    controller.registerHandler(PUT, "/{index}/{type}/_bulk", this);

    this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings);
}

这两个接口最终都会调用TransportBulkAction,该类会将bulk中的操作按照shard进行分类,然后交给TransportBulkAction执行。这里我以index操作为例列出调用链上的关键类和方法,供参考:

  1. RestIndexAction#prepareRequest:解析参数,构建IndexRequest,然后转发给TransportBulkAction#doExecute执行
  2. TransportBulkAction

    • doExecute:检查是否需要创建索引,如果需要则创建
    • executeIngestAndBulk:检查是否需要执行pipeline,如果有且本节点是ingest节点,则执行,否则转发到ingest节点
    • executeBulk->BulkOperation#doRun:按需生成doc id,计算操作在哪个shard执行,按照shard进行分组
  3. ReplicationOperation#execute:写主分片、写副本分片,等待返回。下面给出写主分片的调用,副本分片同理。

    • primaryResult = primary.perform(request); -> TransportReplicationAction#perform -> TransportShardBulkAction#shardOperationOnPrimary -> TransportShardBulkAction#performOnPrimary -> TransportShardBulkAction#executeBulkItemRequest ->

      TransportShardBulkAction#executeIndexRequestOnPrimary ->

      IndexShard#applyIndexOperationOnPrimary -> IndexShard#index ->

      InternalEngine#index

这里看下最后的InternalEngine#index部分关键代码:

 @Override
public IndexResult index(Index index) throws IOException {    
    
    // 省略部分代码
    
    final IndexResult indexResult;
    if (plan.earlyResultOnPreFlightError.isPresent()) {
        indexResult = plan.earlyResultOnPreFlightError.get();
        assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType();
    } else if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
        // 这里会调用 Lucene 的接口
        indexResult = indexIntoLucene(index, plan);
    } else {
        indexResult = new IndexResult(
                plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
    }
    if (index.origin().isFromTranslog() == false) {
        final Translog.Location location;
        if (indexResult.getResultType() == Result.Type.SUCCESS) {
            // Lucene写成功后写 translog
            location = translog.add(new Translog.Index(index, indexResult));
        } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
            // if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no
            final NoOp noOp = new NoOp(indexResult.getSeqNo(), index.primaryTerm(), index.origin(),
                index.startTime(), indexResult.getFailure().toString());
            location = innerNoOp(noOp).getTranslogLocation();
        } else {
            location = null;
        }
        indexResult.setTranslogLocation(location);
    }
        
    // 省略部分代码
    
}

在indexIntoLucene里面就可以看到熟悉的Lucene接口IndexWriter了:

private void addDocs(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
    if (docs.size() > 1) {
        indexWriter.addDocuments(docs);
    } else {
        indexWriter.addDocument(docs.get(0));
    }
    numDocAppends.inc(docs.size());
}


private void updateDocs(final Term uid, final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
    if (softDeleteEnabled) {
        if (docs.size() > 1) {
            indexWriter.softUpdateDocuments(uid, docs, softDeletesField);
        } else {
            indexWriter.softUpdateDocument(uid, docs.get(0), softDeletesField);
        }
    } else {
        if (docs.size() > 1) {
            indexWriter.updateDocuments(uid, docs);
        } else {
            indexWriter.updateDocument(uid, docs.get(0));
        }
    }
    numDocUpdates.inc(docs.size());
}

以上就是一些关键代码。

总结

ES的数据可靠性是从两个维度进行保证的:一方面,通过增加translog,解决了因为进程挂掉和机器宕机引发的内存数据丢失的问题,另一方面,通过副本机制解决了单点故障(当然还可以分担查询的压力。