本文讨论ElasticSearch Bucket Aggregations中的Term Aggregation结果准确性的问题,所有测试基于ES 7.1.0版本,DSL语句通过Kibana dev tool执行。

问题演示

我们先来看一个查询不准确的例子,该例子借鉴自官方文档(链接见文末),但为了操作方便,减小了数据规模。创建一个名为products,包含3个shard,0个副本的索引,及一个keyword类型的 name 字段。通过以下命令创建该索引,并写入一些测试数据:

# 创建索引
PUT products
{
  "settings": {
    "number_of_shards": 3, 
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "name": {
        "type": "keyword"
      }
    }
  }
}

# 写入测试数据
PUT products/_bulk?refresh
{ "index" :{ "routing": "key1"}}
{ "name": "A" }
{ "index" :{ "routing": "key1"}}
{ "name": "A" }
{ "index" :{ "routing": "key1"}}
{ "name": "A" }
{ "index" :{ "routing": "key1"}}
{ "name": "A" }
{ "index" :{ "routing": "key1"}}
{ "name": "A" }
{ "index" :{ "routing": "key1"}}
{ "name": "A" }
{ "index" :{ "routing": "key1"}}
{ "name": "B" }
{ "index" :{ "routing": "key1"}}
{ "name": "B" }
{ "index" :{ "routing": "key1"}}
{ "name": "C" }
{ "index" :{ "routing": "key1"}}
{ "name": "C" }
{ "index" :{ "routing": "key1"}}
{ "name": "C" }
{ "index" :{ "routing": "key1"}}
{ "name": "C" }
{ "index" :{ "routing": "key1"}}
{ "name": "C" }
{ "index" :{ "routing": "key1"}}
{ "name": "D" }
{ "index" :{ "routing": "key2"}}
{ "name": "A" }
{ "index" :{ "routing": "key2"}}
{ "name": "A" }
{ "index" :{ "routing": "key2"}}
{ "name": "A" }
{ "index" :{ "routing": "key2"}}
{ "name": "A" }
{ "index" :{ "routing": "key2"}}
{ "name": "A" }
{ "index" :{ "routing": "key2"}}
{ "name": "B" }
{ "index" :{ "routing": "key2"}}
{ "name": "B" }
{ "index" :{ "routing": "key2"}}
{ "name": "B" }
{ "index" :{ "routing": "key2"}}
{ "name": "D" }
{ "index" :{ "routing": "key2"}}
{ "name": "D" }
{ "index" :{ "routing": "key2"}}
{ "name": "D" }
{ "index" :{ "routing": "key2"}}
{ "name": "D" }
{ "index" :{ "routing": "key3"}}
{ "name": "A" }
{ "index" :{ "routing": "key3"}}
{ "name": "A" }
{ "index" :{ "routing": "key3"}}
{ "name": "A" }
{ "index" :{ "routing": "key3"}}
{ "name": "A" }
{ "index" :{ "routing": "key3"}}
{ "name": "A" }
{ "index" :{ "routing": "key3"}}
{ "name": "B" }
{ "index" :{ "routing": "key3"}}
{ "name": "B" }
{ "index" :{ "routing": "key3"}}
{ "name": "B" }
{ "index" :{ "routing": "key3"}}
{ "name": "C" }
{ "index" :{ "routing": "key3"}}
{ "name": "D" }
{ "index" :{ "routing": "key3"}}
{ "name": "D" }

# 查看数据分布
# GET _cat/shards/products?v
index    shard prirep state   docs store ip         node
products 2     p      STARTED   12 3.6kb 172.19.0.5 es7_01
products 1     p      STARTED   11 3.6kb 172.19.0.2 es7_02
products 0     p      STARTED   14 3.7kb 172.19.0.5 es7_01

通过上面的一些命令,我们构造了一个如下分布的索引数据:

shard

products索引包含A、B、C、D四种产品,每个产品都包含了若干条文档,且分布在不同的shard上面,总体情况如下:

  • 37 条数据,Shard 0上 14 条,Shard 1上 11 条, Shard 2上 12 条。
  • 产品 A 共 16 条数据,产品 B 共 8 条数据,产品 C 共 6 条数据,产品 D 共 7 条数据。

现在我们使用Term Aggregation求一下产品个数的Top 2产品:

GET products/_search
{
  "size": 0, 
  "aggs": {
    "product": {
      "terms": {
        "field": "name",
        "size": 2,
        "shard_size": 2
      }
    }
  }
}

返回结果如下:

{
  "took" : 5,
  "timed_out" : false,
  "_shards" : {
    "total" : 3,
    "successful" : 3,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 37,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "product" : {
      "doc_count_error_upper_bound" : 12,
      "sum_other_doc_count" : 16,
      "buckets" : [
        {
          "key" : "A",
          "doc_count" : 16
        },
        {
          "key" : "C",
          "doc_count" : 5
        }
      ]
    }
  }
}

我们关注一下aggregations部分,Term Aggregation默认是按照文档个数 doc_count 降序排列的,所以可以看到Top 2的结果是A(16条),C(5条)。但根据前面构造的数据我们知道正确的Top 2产品应该是A(16条),B(8条)。那为什么ES计算的结果是不准确的呢?

因为ES的是分布式的,所以查询、聚集等任务会分派到各个节点,每个节点查询自己上面的所有Shard并返回计算结果,再由一个协调节点(Coordinating Node)汇总各个节点返回的数据,得到最终结果。以上面的查询为例,计算的是Top 2(即 size 的值为2),同时指定了 shard_size 为2,即每个shard返回2条数据(shard_size的默认值是size * 1.5 + 10,因为构造的原始数据量太小,如果使用默认值,每个节点就返回了几乎全部数据,不会存在准确性问题,所以这里显式的设置为2,用来说明问题)。具体执行的时候,每个shard会遍历自己的所有数据,求出Top M的数据(M为shard_size指定的值),在上例中M为2,所以各个Shard计算的Top 2为:

  • Shard 0 计算的Top 2:A(6), C(5)
  • Shard 1 计算的Top 2:A(5), B(3)
  • Shard 2 计算的Top 2:A(5), D(4)

协调节点汇总这些数据之后得到的结果为:A(16), B(3), C(5), D(4),所以最终返回的Top 2为A(16), C(5),即上面查询的返回结果。可以看到最终结果的不准确还是因为在计算的过程中没有任何一个环节能看到全量的数据。这其实也是分布式计算中的经典问题,CAP理论的一个具体体现。要保证快速响应,就无法一次计算全量数据;如果计算全量数据,就无法保证响应速度。比如流式计算实时性高,但并不是accurate的;离线计算虽然是accurate的,但又是非实时的,所以就诞生了Lambda架构:使用流式计算做预处理保证实时性及局部准确性,再由离线计算保证全局的accuracy。当然最新的流式框架已经在通过state机制解决流式计算的accuracy问题了,有兴趣的可以阅读《Streaming Systems》一书。

正因为结算结果可能存在的不准确性,Term Aggregation在返回结果中提供了一些评估不准确性的字段,总共有三个:

  • 全局的doc_count_error_upper_bound
  • sum_other_doc_count
  • 每个bucket的doc_count_error_upper_bound

其中前两个默认就会返回,最后一个需要在查询时显式的设置:

GET products/_search
{
  "size": 0, 
  "aggs": {
    "product": {
      "terms": {
        "field": "name",
        "size": 2,
        "shard_size": 2,
        "show_term_doc_count_error": true        # 将此参数设置为true
      }
    }
  }
}

# 返回结果
{
  "took" : 7,
  "timed_out" : false,
  "_shards" : {
    "total" : 3,
    "successful" : 3,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 37,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "product" : {
      "doc_count_error_upper_bound" : 12,
      "sum_other_doc_count" : 16,
      "buckets" : [
        {
          "key" : "A",
          "doc_count" : 16,
          "doc_count_error_upper_bound" : 0   # 多出了一项
        },
        {
          "key" : "C",
          "doc_count" : 5,
          "doc_count_error_upper_bound" : 7   # 多出了一项
        }
      ]
    }
  }
}

下面我们来分析这三个用于评估准确性的值。

1. 全局的doc_count_error_upper_bound

这个值的计算方式是累加每个shard返回的最后一个term的doc_count值。上例中,每个Shard返回的最后一个term及其doc_count依次为:C(5), B(3), D(4), 所以doc_count_error_upper_bound的值为 5+3+4=12。

该值代表的含义是某个没有被纳入到最终结果的term的最大可能的doc_count值。举个例子,比如有一个产品E分别分布在三个shard上面,且依次为E(5), E(3), E(4),因为它和每个shard上的前一个term的doc_count相同,所以可能不会被最终返回,而它的实际doc_count就是5+3+4=12。也就是最坏的情况下可能有一个doc_count为12(在上面的例子中按照doc_count应该排在第2位)的一个产品没有返回到最终Top N的结果中。

2. 每个bucket的doc_count_error_upper_bound

这个值的计算方式是累加没有返回该bucket的所有Shard的最后一个term的doc_count值。有些拗口,上例中,比如对于A的bucket,其doc_count_error_upper_bound值为0,是因为3个shard的返回中都包含了A。而对于C的bucket,Shard 1和Shard 2都没有返回C,所以它的doc_count_error_upper_bound值就是Shard 1和Shard 2的最后一个term的doc_count值之和,即 3+4=7.

3. sum_other_doc_count

这个值的含义是最终没有被返回的doc的总数,计算方式是总doc数减去返回的doc数。上例中,总的doc数为37,返回的doc数为16+5=21,所以没返回的doc总数就是37-21=16.

该值代表的含义是最差情况下统计的doc_count的偏差。比如C的实际doc_count是6,而最终返回的结果是5,差了1. 而最差情况下可能的差值是7,这种情况就是Shard 1和Shard 2中都包含C,且doc_count依次为3和4,但因为它们和B、D的doc_count相同,可能被排在了后面,没有返回到最终结果里面。换句话说,如果该值为0,那就代表该bucket的doc_count值是准确的,否则就可能是不准确的,最大可能的偏差就是该字段的值。该值的另外一个作用就是可以作为shard_size值的参考。

最后需要注意的是,上面两个doc_count_error_upper_bound的值只有在采用doc_count进行降序排列(默认情况)时才会计算,doc_count升序或者采用其它字段排序时是不计算的(不计算的时候其值可能是0——按照_key排序,也可能是-1——按照sub aggregation排序)。

不准确的解决方案

那该如何解决这种不准确性呢?

  • 方案1:增大shard_size的值。很显然,shard_size的值如果不小于每个shard上面的doc数量,就相当于把所有shard上面符合过滤条件(如果有的话)的数据全部发送给协调节点去处理,也就是全局全量数据处理了,自然不会出现准确性问题。但如果数据量比较大,显然协调节点会成为处理瓶颈。所以该方案在实际中存在一定的限制,只能在情况允许的情况下增加shard_size的值以提高准确性,但不可能无限制的增加。
  • 方案2:将所有数据存储到一个shard上面。这样就避免了分布式计算,自然也不会产生不准确的问题。但如果数据量很大,就会造成shard很大,单个计算很容易成为瓶颈,所以该方案仅适用于数据量不是很大的场景,比如一些指标数据。
  • 方案3:索引时使用Term Aggregation的field字段做routing,保证同一个bucket的数据存储在同一个shard上面,这样也可以避免不准确的问题。潜在的风险就是如果这个field的数据分布不均匀,那最终的存储就可能不均衡。

每种方案都有利有弊,需要根据实际情况进行选择。

References

  1. Elasticsearch Reference#Term Aggregation