NYC's Blog - ElasticSearch 2021-05-30T21:21:00+08:00 Typecho http://niyanchun.com/feed/atom/tag/elasticsearch/ <![CDATA[ES基于磁盘的shard分配机制浅析]]> http://niyanchun.com/es-disk-based-shard-allocation.html 2021-05-30T21:21:00+08:00 2021-05-30T21:21:00+08:00 NYC https://niyanchun.com 先回顾几个概念:ES的Index是个逻辑概念,实际由若干shard组成,而shard就是Lucene的Index,即真正存储数据的实体。当有数据需要存储的时候,就需要先分配shard。具体来说需要分配shard的场景包括:数据恢复,主分片(primary)、副本分片的分配,再平衡(rebalancing),节点的新增、删除。对于分布式存储系统来说,数据的分布非常重要,ES shard的分配工作由ES的master节点负责。

ES提供了多种分配策略的支持,简单来说就是用户可以通过配置定义一些“策略”或者叫“路由规则”,然后ES会在遵守这些策略的前提下,尽可能的让数据均匀分布。比如可以配置机房、机架属性,ES会尽量让主数据和副本数据分配在不同的机房、机架,起到容灾的作用。再比如,可以配置一些策略,让数据不分配到某些节点上面,这在滚动升级或者数据迁移的时候非常有用。不过本文并不会介绍所有这些策略,只聚焦于默认的基于磁盘的分配策略,因为这部分是最常用的。

先说一下再平衡。

再平衡

再平衡这个概念在分布式存储系统里面很常见,几乎是标配。因为各种各样的原因(比如分配策略不够智能、新增节点等),系统各个节点的数据存储可能分布不均,这时候就需要有能够重新让数据均衡的机制,即所谓的再平衡。有些系统的再平衡需要用户手动执行,有些则是自动的。ES就属于后者,它的再平衡是自动的,用户不参与,也几乎不感知。

那到底怎样才算平衡(balanced)?ES官方对此的定义是:

A cluster is balanced when it has an equal number of shards on each node without having a concentration of shards from any index on any node.

简单说就是看每个节点上面的shard个数是否相等,越相近就越平衡。这里注意计数的时候是“无差别”的,即不管是哪个索引的shard,也不管是主分片的shard,还是副本的shard,一视同仁,只算个数。ES后台会有进程专门检查整个集群是否平衡,以及执行再平衡的操作。再平衡也就是将shard数多的迁移到shard数少的节点,让集群尽可能的平衡。

关于再平衡,需要注意2个点:

  1. 平衡的状态是一个范围,而不是一个点。即不是说各个节点的shard数严格相等才算平衡,而是大家的差别在一个可接受的范围内就算平衡。这个范围(也称阈值或权重)是可配置的,用户一般是无需参与的。
  2. 再平衡是一个尽力而为的动作,它会在遵守各种策略的前提下,尽量让集群趋于平衡。

看个简单的例子吧。有一个集群刚开始有2个节点(node42,node43),我们创建一个1 replica、6 primary shard的索引shard_alloc_test

PUT shard_alloc_test
{
  "settings": {
      "index": {   
          "number_of_shards" : "6",            
          "number_of_replicas": "1"
      }
  }   
}

查看一下shard的分配:

GET _cat/shards?v
index                               shard prirep state   docs  store ip        node
shard_alloc_test                    3     r      STARTED    0   208b 10.8.4.43 node-43
shard_alloc_test                    3     p      STARTED    0   208b 10.8.4.42 node-42
shard_alloc_test                    2     p      STARTED    0   208b 10.8.4.43 node-43
shard_alloc_test                    2     r      STARTED    0   208b 10.8.4.42 node-42
shard_alloc_test                    4     p      STARTED    0   208b 10.8.4.43 node-43
shard_alloc_test                    4     r      STARTED    0   208b 10.8.4.42 node-42
shard_alloc_test                    1     r      STARTED    0   208b 10.8.4.43 node-43
shard_alloc_test                    1     p      STARTED    0   208b 10.8.4.42 node-42
shard_alloc_test                    5     r      STARTED    0   208b 10.8.4.43 node-43
shard_alloc_test                    5     p      STARTED    0   208b 10.8.4.42 node-42
shard_alloc_test                    0     p      STARTED    0   208b 10.8.4.43 node-43
shard_alloc_test                    0     r      STARTED    0   208b 10.8.4.42 node-42

可以看到,primary的6个shard和replica的6个shard的分配是非常均衡的:一方面,12个shard均匀分配到了2个节点上面;另一方面,primary shard和replica shard也是均匀交叉的分配到了2个节点上面。

此时,我们对集群进行扩容,再增加一台节点:node-41。待节点成功加入集群后,我们看一下shard_alloc_test的shard分配:

GET _cat/shards?v
index                               shard prirep state   docs  store ip        node
shard_alloc_test                    3     p      STARTED    0   208b 10.8.4.41 node-41
shard_alloc_test                    3     r      STARTED    0   208b 10.8.4.43 node-43
shard_alloc_test                    2     p      STARTED    0   208b 10.8.4.41 node-41
shard_alloc_test                    2     r      STARTED    0   208b 10.8.4.42 node-42
shard_alloc_test                    4     p      STARTED    0   208b 10.8.4.41 node-41
shard_alloc_test                    4     r      STARTED    0   208b 10.8.4.42 node-42
shard_alloc_test                    1     r      STARTED    0   208b 10.8.4.43 node-43
shard_alloc_test                    1     p      STARTED    0   208b 10.8.4.42 node-42
shard_alloc_test                    5     p      STARTED    0   208b 10.8.4.41 node-41
shard_alloc_test                    5     r      STARTED    0   208b 10.8.4.43 node-43
shard_alloc_test                    0     p      STARTED    0   208b 10.8.4.43 node-43
shard_alloc_test                    0     r      STARTED    0   208b 10.8.4.42 node-42


GET _cat/tasks?v
action                         task_id                       parent_task_id              type      start_time    timestamp running_time ip        node
indices:data/read/get[s]       0KbNzeuGR1iZ1I1_3fIDVg:192304 -                           transport 1622513252244 02:07:32  1.6ms        10.8.4.42 node-42
indices:data/read/get          jgfZk3snRb-uEUToQgo9pw:1841   -                           transport 1622513252487 02:07:32  1.4ms        10.8.4.43 node-43

可以看到,shard自动的进行了再分配,均匀的分配到了3个节点上面。如果再平衡的时间稍长一点,你还可以通过task接口看到集群间的数据迁移任务。然后我们再缩容,停掉node-41这个节点,数据也会再次自动重新分配:

GET _cat/shards?v
index                               shard prirep state   docs  store ip        node
shard_alloc_test                    3     p      STARTED    0   208b 10.8.4.43 node-43
shard_alloc_test                    3     r      STARTED    0   208b 10.8.4.42 node-42
shard_alloc_test                    2     r      STARTED    0   208b 10.8.4.43 node-43
shard_alloc_test                    2     p      STARTED    0   208b 10.8.4.42 node-42
shard_alloc_test                    1     r      STARTED    0   208b 10.8.4.43 node-43
shard_alloc_test                    1     p      STARTED    0   208b 10.8.4.42 node-42
shard_alloc_test                    4     r      STARTED    0   208b 10.8.4.43 node-43
shard_alloc_test                    4     p      STARTED    0   208b 10.8.4.42 node-42
shard_alloc_test                    5     p      STARTED    0   208b 10.8.4.43 node-43
shard_alloc_test                    5     r      STARTED    0   208b 10.8.4.42 node-42
shard_alloc_test                    0     p      STARTED    0   208b 10.8.4.43 node-43
shard_alloc_test                    0     r      STARTED    0   208b 10.8.4.42 node-42

所以,ES的再平衡功能还是非常好用和易用的,完全自动化。但是细心的同学应该已经意识到一个问题:光靠保证shard个数均衡其实是没法保证数据均衡的,因为有些shard可能很大,存了很多数据;有些shard可能很小,只存了几条数据。的确是这样,所以光靠再平衡还是无法保证数据的均衡的,至少从存储容量的角度来说是不能保证均衡的。所以,ES还有一个默认就开启的基于磁盘容量的shard分配器。

基于磁盘容量的shard分配

基于磁盘容量的shard分配策略(Disk-based shard allocation)默认就是开启的,其机制也非常简单,主要就是3条非常重要的分水线(watermark):

  • low watermark:默认值是85%。磁盘使用超过这个阈值,就认为“危险”快来了,这个时候就不会往该节点再分配replica shard了,但新创建的索引的primary shard还是可以分配。特别注意必须是新创建的索引(什么是“老的”?比如再平衡时其它节点上已经存在的primary shard就算老的,这部分也是不能够迁移到进入low watermark的节点上来的)。
  • high watermark:默认值是90%。磁盘使用超过这个阈值,就认为“危险”已经来了,这个时候不会再往该节点分配任何shard,即primary shard和replica shard都不会分配。并且会开始尝试将节点上的shard迁移到其它节点上。
  • flood stage watermark:默认值是95%。磁盘使用超过这个阈值,就认为已经病入膏肓了,需要做最后的挽救了,挽救方式也很简单——断臂求生:将有在该节点上分配shard的所有索引设置为只读,不允许再往这些索引写数据,但允许删除索引(index.blocks.read_only_allow_delete)。

大概总结一下:

  1. 当进入low watermark的时候,就放弃新创建的索引的副本分片数据了(即不创建对应的shard),但还是允许创建主分片数据;
  2. 当进入high watermark的时候,新创建索引的主分片、副本分片全部放弃了,但之前已经创建的索引还是可以正常继续写入数据的;同时尝试将节点上的数据向其它节点迁移;
  3. 当进入flood stage watermark,完全不允许往该节点上写入数据了,这是最后一道保护。只要在high watermark阶段,数据可以迁移到其它节点,并且迁移的速度比写入的速度快,那就不会进入该阶段。

一些相关的配置如下:

  • cluster.routing.allocation.disk.threshold_enabled:是否开启基于磁盘的分配策略,默认为true,表示开启。
  • cluster.info.update.interval:多久检查一次磁盘使用,默认值是30s。
  • cluster.routing.allocation.disk.watermark.low:配置low watermark,默认85%。
  • cluster.routing.allocation.disk.watermark.high:配置high watermark,默认90%。
  • cluster.routing.allocation.disk.watermark.flood_stage:配置flood stage watermark,默认95%。

后面3个配置阈值的配置项除了可以使用百分比以外,也可以使用具体的值,比如配置为low watermark为10gb,表示剩余空闲磁盘低于10gb的时候,就认为到low watermark了。但是需要注意,要么3个配置项都配置百分比,要么都配置具体的值,不允许百分比和具体的值混用。

另外需要注意:如果一个节点配置了多个磁盘,决策时会采用磁盘使用最高的那个。比如一个节点有2个磁盘,一个磁盘使用是84%,一个使用是86%,那也认为该节点进入low watermark了。

最后,如果节点进入flood stage watermark阶段,涉及的索引被设置成read-only以后,如何恢复呢?第一步当然是先通过删数据或增加磁盘/节点等方式让磁盘使用率降到flood stage watermark的阈值以下。然后第二步就是恢复索引状态,取消只读。在7.4.0及以后版本,一旦检测到磁盘使用率低于阈值后,会自动恢复;7.4.0以前的版本,必须手动执行以下命令来取消只读状态

// 恢复单个索引
PUT /索引名称/_settings
{
  "index.blocks.read_only_allow_delete": null
}

// 恢复所有索引
PUT _settings
{
    "index": {
        "blocks": {"read_only_allow_delete": null}
    }
}

// curl命令
curl -XPUT -H "Content-Type: application/json" http://localhost:9200/_all/_settings -d '{"index.blocks.read_only_allow_delete": null}'

下面看一下具体的例子。还是前面node-42和node-43组成的集群,每个节点的磁盘总空间是1GB。

小技巧:为了方便验证这个功能,Linux用户可以挂载小容量的内存盘来进行操作。这样既免去了没有多个磁盘的烦恼,而且磁盘大小可以设置的比较小,容易操作。比如我测试的2个节点的数据目录就是挂载的1GB的内存盘。

为了方便验证和看的更加清楚,我重新设置几个watermark的阈值,这里不使用百分比,而是用具体的值。

PUT _cluster/settings
{
  "transient": {
    "cluster.routing.allocation.disk.watermark.low": "800mb",
    "cluster.routing.allocation.disk.watermark.high": "600mb",
    "cluster.routing.allocation.disk.watermark.flood_stage": "500mb",
    "cluster.info.update.interval": "10s"
  }
}

# 检查下磁盘使用
GET _cat/nodes?v&h=n,dt,du,dup
n        dt    du  dup
node-43 1gb 328kb 0.03
node-42 1gb 328kb 0.03

也就是当空闲磁盘低于800mb时进入low watermark;低于600mb时进入high watermark;低于500mb时进入flood stage;每10秒检查一次。接下来写入一些数据,先让磁盘进入low watermark。

// 为了查看方便,使用1个 primary shard,1个副本
PUT shard_alloc_test
{
  "settings": {
      "index": {   
          "number_of_shards" : "1",            
          "number_of_replicas": "1"
      }
  }   
}

// 磁盘低于800mb,进入low watermark
GET _cat/nodes?v&h=n,dt,du,dup,da
n        dt      du   dup      da
node-43 1gb 236.1mb 23.06 787.8mb
node-42 1gb 236.1mb 23.06 787.8mb

// 进入low watermark时的日志
[2021-06-01T10:24:59,795][INFO ][o.e.c.r.a.DiskThresholdMonitor] [node-42] low disk watermark [800mb] exceeded on [jgfZk3snRb-uEUToQgo9pw][node-43][/tmp/es-data/nodes/0] free: 799mb[78%], replicas will not be assigned to this node
[2021-06-01T10:24:59,799][INFO ][o.e.c.r.a.DiskThresholdMonitor] [node-42] low disk watermark [800mb] exceeded on [0KbNzeuGR1iZ1I1_3fIDVg][node-42][/tmp/es-data/nodes/0] free: 799mb[78%], replicas will not be assigned to this node

可以看到,空闲磁盘小于800MB的时候就进入了low watermark,ES也有对应的日志提示。这个时候副本已经不能分配到这节点上了,我们新建一个索引shard_alloc_test1验证一下。

PUT shard_alloc_test1
{
  "settings": {
      "index": {   
          "number_of_shards" : "1",            
          "number_of_replicas": "1"
      }
  }   
}

GET _cat/shards?v
index                               shard prirep state      docs  store ip        node
shard_alloc_test                    0     p      STARTED       0  3.9mb 10.8.4.43 node-43
shard_alloc_test                    0     r      STARTED       0  3.9mb 10.8.4.42 node-42
shard_alloc_test1                   0     p      STARTED       0   208b 10.8.4.43 node-43
shard_alloc_test1                   0     r      UNASSIGNED                       

可以看到,shard_alloc_test1的primary shard分配了,但replica shard没有分配,符合预期。再接着往shard_alloc_test写数据,让进入high watermark。

GET _cat/nodes?v&h=n,dt,du,dup,da
n        dt      du   dup      da
node-43 1gb 468.2mb 45.73 555.7mb
node-42 1gb 468.1mb 45.72 555.8mb

[2021-06-01T10:32:59,875][WARN ][o.e.c.r.a.DiskThresholdMonitor] [node-42] high disk watermark [600mb] exceeded on [jgfZk3snRb-uEUToQgo9pw][node-43][/tmp/es-data/nodes/0] free: 555.8mb[54.2%], shards will be relocated away from this node; currently relocating away shards totalling [0] bytes; the node is expected to continue to exceed the high disk watermark when these relocations are complete
[2021-06-01T10:32:59,876][WARN ][o.e.c.r.a.DiskThresholdMonitor] [node-42] high disk watermark [600mb] exceeded on [0KbNzeuGR1iZ1I1_3fIDVg][node-42][/tmp/es-data/nodes/0] free: 555.7mb[54.2%], shards will be relocated away from this node; currently relocating away shards totalling [0] bytes; the node is expected to continue to exceed the high disk watermark when these relocations are complete

可以看到,磁盘低于600MB的时候,进入high watermark。这个时候应该不会往该节点分配任何shard了(同时因为只有2个节点,且都引入high watermark了,所以也无法将节点上的shard迁移到其它节点),我们创建新索引shard_alloc_test2验证一下。

PUT shard_alloc_test2
{
  "settings": {
      "index": {   
          "number_of_shards" : "1",            
          "number_of_replicas": "1"
      }
  }   
}


GET _cat/shards?v
index                               shard prirep state      docs  store ip        node
shard_alloc_test                    0     p      STARTED       0 17.5mb 10.8.4.43 node-43
shard_alloc_test                    0     r      STARTED       0 17.5mb 10.8.4.42 node-42
shard_alloc_test2                   0     p      UNASSIGNED                       
shard_alloc_test2                   0     r      UNASSIGNED                       
shard_alloc_test1                   0     p      STARTED       0   208b 10.8.4.43 node-43
shard_alloc_test1                   0     r      UNASSIGNED                       

可以看到,主分片、副本分片的shard都没有分配,符合预期。虽然新创建的索引的shard无法分配,但原有的索引还是可以正常写的,我们继续写数据,使磁盘进入flood stage。

GET _cat/nodes?v&h=n,dt,du,dup,da
n        dt      du   dup      da
node-43 1gb 535.3mb 52.28 488.6mb
node-42 1gb 535.8mb 52.33 488.1mb

[2021-06-01T10:42:20,024][WARN ][o.e.c.r.a.DiskThresholdMonitor] [node-42] flood stage disk watermark [500mb] exceeded on [jgfZk3snRb-uEUToQgo9pw][node-43][/tmp/es-data/nodes/0] free: 488.6mb[47.7%], all indices on this node will be marked read-only
[2021-06-01T10:42:20,024][WARN ][o.e.c.r.a.DiskThresholdMonitor] [node-42] flood stage disk watermark [500mb] exceeded on [0KbNzeuGR1iZ1I1_3fIDVg][node-42][/tmp/es-data/nodes/0] free: 488.1mb[47.6%], all indices on this node will be marked read-only

顺利进入flood stage,索引被设置为read-only。此时,客户端再次写入会收到类似如下错误(这里是JSON格式的日志):

{
  "error" : {
    "root_cause" : [
      {
        "type" : "cluster_block_exception",
        "reason" : "index [shard_alloc_test] blocked by: [TOO_MANY_REQUESTS/12/disk usage exceeded flood-stage watermark, index has read-only-allow-delete block];"
      }
    ],
    "type" : "cluster_block_exception",
    "reason" : "index [shard_alloc_test] blocked by: [TOO_MANY_REQUESTS/12/disk usage exceeded flood-stage watermark, index has read-only-allow-delete block];"
  },
  "status" : 429
}

也就是当你的客户端出现“read-only-allow-delete block”错误日志时,表名ES的磁盘空间已经满了。如果是7.4.0版本之前的ES,除了恢复磁盘空间外,还要手动恢复索引的状态,取消只读。

总结

shard是ES中非常重要的一个概念,而且大部分时候我们不需要太多的关注shard分配的细节,ES默认就会帮我们处理好。但基本的原理还是要有一些了解,一方面可以让我们事先设计合理的方案;另一方面当出现问题时,也知道问题原因和解决方案。

]]>
<![CDATA[ES数据可靠性分析]]> http://niyanchun.com/es-data-reliability.html 2020-09-16T08:12:00+08:00 2020-09-16T08:12:00+08:00 NYC https://niyanchun.com ES作为全文检索兼存储系统,数据可靠性至关重要,本文讨论ES是如何实现数据可靠性的。ES底层基于Lucene,所以有必要先搞清楚一些相关的概念。

refresh && flush && commit

Lucene中,有flush和commit的概念。所谓flush,就是定期将内存Buffer里面的数据刷新到Directory这样一个抽象的文件存储层,其实就是生成segment。需要注意的是,因为操作系统file cache的原因,这里的刷新未必会真的落盘,一般只是从内存buffer刷到了file cache而已,实质还是在内存中,所以是一个相对比较高效和轻量级的操作。flush方法的java doc是这样描述的:

Moves all in-memory segments to the Directory, but does not commit (fsync) them (call commit() for that).”

关于segment再补充一点:形成segment以后,数据就可以被搜索了,但因为Lucene flush一般比较频繁(ES里面执行频率默认是1秒),所以会产生很多小的segment文件,一方面太多的文件会占用太多的文件描述符;另一方面,搜索时文件太多也会影响搜索效率,所以Lucene有专门的Merge线程定期将小的segment文件merge为大文件。

Lucene的commit上面的Java doc已经提到了,它会调用fsync,commit方法的java doc如下:

Commits all pending changes (added and deleted documents, segment merges, added indexes, etc.) to the index, and syncs all referenced index files, such that a reader will see the changes and the index updates will survive an OS or machine crash or power loss.

因为会调用fsync,所以commit之后,文件肯定会被持久化到磁盘上,所以这是一个重操作,一方面是磁盘的性能比较差,另一方面是commit的时候会执行更新索引等操作。commit一般是当我们认为系统到达一个稳定点的时候,commit一次,类似于流式系统里面的checkpoint。当系统出现故障的时候,Lucene会从最近的一次commit point进行恢复,而不是最近的一次flush。

总结一下,flush会生成segment,之后数据就能被搜索了,是一个轻量级操作,但此时并不保证数据被持久化了。commit是一个比较重的落盘操作,用于持久化,不能被频繁执行。

以上是Lucene,现在我们来看ES。ES中有refresh和flush的概念,其实是和Lucene一一对应的,不过换了个名字。ES里面的refresh就是Lucene里面的flush;ES里面的flush就是Lucene里面的commit。所以,ES里面的refresh默认1秒执行一次,也就是数据写入ES之后最快1秒之后可以被搜索到,这也就是ES提供的近实时搜索NRT(Near Realtime)。而flush的执行时机有两个点:一个是ES会根据内存使用情况启发式的执行flush,另外一个时机是当translog达到512MB(默认值)时执行一次flush。网上很多文章(一般都比较早了)都提到每30分钟也会执行一次,但我在6.6版本的代码及文档里面没有找到这部分说明。

这里提到了translog,下面我们来介绍一下translog。

translog

仔细想一下上面介绍的Lucene的flush和commit,就会发现如果在数据还没有被commit之前,机器宕掉了,那上次commit之后到宕机前的数据就丢了。实际上,Lucene也是这么做的,异常恢复时会丢掉最后一次commit之后的数据(如果有的话)。这对于绝大多数业务是不能接受的,所以ES引入了translog来解决这个问题。其实也不算什么新技术,就是类似于传统DB里面的预写日志(WAL),不过在ES里面叫事务日志(transaction log),简称translog。

这样ES的写入的时候,先在内存buffer中进行Lucene documents的写入,写入成功后再写translog。内存buffer中的Lucene documents经过refresh会形成file system cache中的segment,此时,内容就可以被搜索到了。然后经过flush,持久化到磁盘上面。整个流程如下图:

写入流程

这里有几个问题需要说明一下。

  1. 为了保证数据完全的可靠,一般的写入流程都是先写WAL,再写内存,但ES是先写内存buffer,然后再写translog。这个顺序目前没有找到官方的说明,网上大部分说的是写的过程比较复杂,容易出错,先写内存可以降低处理的复杂性。不过这个顺序个人认为对于用户而言其实不是很关键,因为不管先写谁,最终两者都写成功才会返回给客户端。
  2. translog的落盘(即图中的fsync过程)有两种策略,分别对应不同的可靠程度。第一种是每次请求(一个index、update、delete或者bulk操作)都会执行fsync,fsync成功后,才会给客户端返回成功,也就是请求同步刷盘,这种可靠性最高,只要返回成功,那数据一定已经落盘了,这也是默认的方式。第二种是异步的,按照定时达量的方式,默认每5秒或者512MB的时候就fsync一次。异步一般可以获得更高的吞吐量,但弊端是存在数据丢失的风险。
  3. ES的flush(或者Lucene的commit)也是落盘,为什么不直接用,而加一个translog?translog或者所有的WAL的一大特性就是他们都是追加写,这样大多数时候都可以实现顺序读写,读和写可以完全并发,所以性能一般都是非常好的。有一些测试表明,磁盘的顺序写甚至比内存的随机写更快,见The Pathologies of Big Data的Figure 3。
  4. translog不是全局的,而是每个shard(也就是Lucene的index)一个,且每个shard的所有translog中同一时刻只会有一个translog文件是可写的,其它都是只读的(如果有的话)。具体细节可查看Translog类的Java doc说明。
  5. translog的老化机制在6.0之前是segment flush到磁盘后,就删掉了。6.0之后是按定时达量的策略进行删除,默认是512MB或者12小时。

因为ES的版本更迭比较快,配置项也是经常变更,所以上面没有列出具体的配置项,这里的默认值都来自 ES 6.8 translog文档,有兴趣的可以点击链接查看。

副本

引入translog之后,解决了进程突然挂掉或者机器突然宕机导致还处于内存,没有被持久化到磁盘的数据丢失的问题,但数据仅落到磁盘还是无法完全保证数据的安全,比如磁盘损坏等。分布式领域解决这个问题最直观和最简单的方式就是采用副本机制,比如HDFS、Kafka等都是此类代表,ES也使用了副本的机制。一个索引由一个primary和n(n≥0)个replica组成,replica的个数支持可以通过接口动态调整。为了可靠,primary和replica的shard不能在同一台机器上面。

这里要注意区分一下replica和shard的关系:比如创建一个索引的时候指定了5个shard,那么此时primary分片就有5个shard,每个replica也会有5个shard。我们可以通过接口随意修改replica的个数,但不能修改每个primary/replica包含5个shard这个事实。 当然,shard的个数也可以通过shrink接口进行调整,但这是一个很重的操作,而且有诸多限制,比如shard个数只能减少,且新个数是原来的因子,比如原来是8个shard,那只能选择调整为4、2或1个;如果原来是5个,那就只能调整为1个了。所以实际中,shard的个数一般要预先计划好(经验值是保证一个shard的大小在30~50GB之间),而replica的个数可以根据实际情况后面再做调整。

在数据写入的时候,数据会先写primary,写成功之后,同时分发给多个replica,待所有replica都返回成功之后,再返回给客户端。所以一个写请求的耗时可以粗略认为是写primary的时间+耗时最长的那个replica的时间。

引申:写入优化

总体来说,ES的写入能力不算太好,所以经常需要对写入性能做优化,除了保证良好的硬件配置外,还可以从ES自身进行机制进行优化,结合上面的介绍,可以很容易得出下面的一些优化手段:

  1. 如果对于搜索的实时性要求不高,可以适当增加refresh的时间,比如从默认的1秒改为30秒或者1min,甚至更长。如果是离线导入再搜索的场景,可以直接设置为"-1",即关闭自动的refresh,等导入完成后,通过接口手动refresh。其提高性能的原理是增加refresh的时间可以减少大量小的segment文件,这样在可以提高flush的效率,减小merge的压力。
  2. 如果对于数据可靠性要求不是特别高,可以将translog的落盘机制由默认的请求同步落盘,改为定时达量的异步落盘,提高落盘的效率。
  3. 如果对于数据可靠性要求不是特别高,可以在写入高峰期先不设置副本,待过了高峰之后再通过接口增加副本。这个可以通过ES的ILM策略,实现自动化。

优化往往都是有代价的,需要根据自己的实际场景进行评估。

引申:写入流程代码分析

本来计划是写一篇从原理、源码分析ES关于写入流程的文章,但发现已经有一些比较好的文章了,就换了个角度,从使用角度宏观介绍了用户一般比较关心的数据可靠性问题。这里推荐两篇介绍ES写入流程的文章:

这两篇文章都结合代码对写入流程进行了深入解析,文章非常不错,这里我再补充一些细节,供有兴趣debug源码的读者阅读。源码阅读环境搭建见我之前的文章IDEA或Eclipse中编译调试ElasticSearch源码,里面使用的ES版本是6.6(6.6.3 snapshot),下面代码的说明也基于这个版本。

ES提供两种类型的接口:对外的REST接口,对内的Transport接口。早一点的版本中,transport接口也开放给外部客户端使用,但现在已经逐步只用于内部节点之间通信了。不过REST接口其实只是在Transport接口进行了封装,请求到ES之后还是会转发到Transport接口对应的处理器去处理。REST和Transport接口的处理器都是在ActionModule类中注册的:

static Map<String, ActionHandler<?, ?>> setupActions(List<ActionPlugin> actionPlugins) {
    
    // 省略部分代码

    ActionRegistry actions = new ActionRegistry();

    // Transport接口处理器注册
    actions.register(MainAction.INSTANCE, TransportMainAction.class);
    actions.register(NodesInfoAction.INSTANCE, TransportNodesInfoAction.class);
    actions.register(RemoteInfoAction.INSTANCE, TransportRemoteInfoAction.class);
    actions.register(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
    actions.register(NodesUsageAction.INSTANCE, TransportNodesUsageAction.class);
    actions.register(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
    actions.register(ListTasksAction.INSTANCE, TransportListTasksAction.class);
    actions.register(GetTaskAction.INSTANCE, TransportGetTaskAction.class);
    actions.register(CancelTasksAction.INSTANCE, TransportCancelTasksAction.class);

    // 省略部分代码
}


public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
    List<AbstractCatAction> catActions = new ArrayList<>();
    Consumer<RestHandler> registerHandler = a -> {
        if (a instanceof AbstractCatAction) {
            catActions.add((AbstractCatAction) a);
        }
    };

    // REST接口处理器注册
    registerHandler.accept(new RestMainAction(settings, restController));
    registerHandler.accept(new RestNodesInfoAction(settings, restController, settingsFilter));
    registerHandler.accept(new RestRemoteClusterInfoAction(settings, restController));
    registerHandler.accept(new RestNodesStatsAction(settings, restController));
    registerHandler.accept(new RestNodesUsageAction(settings, restController));
    registerHandler.accept(new RestNodesHotThreadsAction(settings, restController));
    registerHandler.accept(new RestClusterAllocationExplainAction(settings, restController));
    registerHandler.accept(new RestClusterStatsAction(settings, restController));
    registerHandler.accept(new RestClusterStateAction(settings, restController, settingsFilter));
    registerHandler.accept(new RestClusterHealthAction(settings, restController));
    // 省略部分代码
}

所以我们要调试某个功能的时候其实只要在这里找到REST接口的处理器以及其对应的Transport接口对应的处理器即可。这里以数据写入为例,ES的写入主要分单条的index和批量的bulk操作,单条其实是bulk的一种特殊情况,所以处理复用的是bulk的逻辑。

写入的REST接口处理器是RestIndexAction

// 单条
registerHandler.accept(new RestIndexAction(settings, restController));
// 批量
registerHandler.accept(new RestBulkAction(settings, restController));

它们的构造函数里面也明确的定义了我们使用的REST接口:

public RestIndexAction(Settings settings, RestController controller) {
    super(settings);
    controller.registerHandler(POST, "/{index}/{type}", this); // auto id creation
    controller.registerHandler(PUT, "/{index}/{type}/{id}", this);
    controller.registerHandler(POST, "/{index}/{type}/{id}", this);
    CreateHandler createHandler = new CreateHandler(settings);
    controller.registerHandler(PUT, "/{index}/{type}/{id}/_create", createHandler);
    controller.registerHandler(POST, "/{index}/{type}/{id}/_create", createHandler);
}

public RestBulkAction(Settings settings, RestController controller) {
    super(settings);

    controller.registerHandler(POST, "/_bulk", this);
    controller.registerHandler(PUT, "/_bulk", this);
    controller.registerHandler(POST, "/{index}/_bulk", this);
    controller.registerHandler(PUT, "/{index}/_bulk", this);
    controller.registerHandler(POST, "/{index}/{type}/_bulk", this);
    controller.registerHandler(PUT, "/{index}/{type}/_bulk", this);

    this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings);
}

这两个接口最终都会调用TransportBulkAction,该类会将bulk中的操作按照shard进行分类,然后交给TransportBulkAction执行。这里我以index操作为例列出调用链上的关键类和方法,供参考:

  1. RestIndexAction#prepareRequest:解析参数,构建IndexRequest,然后转发给TransportBulkAction#doExecute执行
  2. TransportBulkAction

    • doExecute:检查是否需要创建索引,如果需要则创建
    • executeIngestAndBulk:检查是否需要执行pipeline,如果有且本节点是ingest节点,则执行,否则转发到ingest节点
    • executeBulk->BulkOperation#doRun:按需生成doc id,计算操作在哪个shard执行,按照shard进行分组
  3. ReplicationOperation#execute:写主分片、写副本分片,等待返回。下面给出写主分片的调用,副本分片同理。

    • primaryResult = primary.perform(request); -> TransportReplicationAction#perform -> TransportShardBulkAction#shardOperationOnPrimary -> TransportShardBulkAction#performOnPrimary -> TransportShardBulkAction#executeBulkItemRequest ->

      TransportShardBulkAction#executeIndexRequestOnPrimary ->

      IndexShard#applyIndexOperationOnPrimary -> IndexShard#index ->

      InternalEngine#index

这里看下最后的InternalEngine#index部分关键代码:

 @Override
public IndexResult index(Index index) throws IOException {    
    
    // 省略部分代码
    
    final IndexResult indexResult;
    if (plan.earlyResultOnPreFlightError.isPresent()) {
        indexResult = plan.earlyResultOnPreFlightError.get();
        assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType();
    } else if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
        // 这里会调用 Lucene 的接口
        indexResult = indexIntoLucene(index, plan);
    } else {
        indexResult = new IndexResult(
                plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
    }
    if (index.origin().isFromTranslog() == false) {
        final Translog.Location location;
        if (indexResult.getResultType() == Result.Type.SUCCESS) {
            // Lucene写成功后写 translog
            location = translog.add(new Translog.Index(index, indexResult));
        } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
            // if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no
            final NoOp noOp = new NoOp(indexResult.getSeqNo(), index.primaryTerm(), index.origin(),
                index.startTime(), indexResult.getFailure().toString());
            location = innerNoOp(noOp).getTranslogLocation();
        } else {
            location = null;
        }
        indexResult.setTranslogLocation(location);
    }
        
    // 省略部分代码
    
}

在indexIntoLucene里面就可以看到熟悉的Lucene接口IndexWriter了:

private void addDocs(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
    if (docs.size() > 1) {
        indexWriter.addDocuments(docs);
    } else {
        indexWriter.addDocument(docs.get(0));
    }
    numDocAppends.inc(docs.size());
}


private void updateDocs(final Term uid, final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
    if (softDeleteEnabled) {
        if (docs.size() > 1) {
            indexWriter.softUpdateDocuments(uid, docs, softDeletesField);
        } else {
            indexWriter.softUpdateDocument(uid, docs.get(0), softDeletesField);
        }
    } else {
        if (docs.size() > 1) {
            indexWriter.updateDocuments(uid, docs);
        } else {
            indexWriter.updateDocument(uid, docs.get(0));
        }
    }
    numDocUpdates.inc(docs.size());
}

以上就是一些关键代码。

总结

ES的数据可靠性是从两个维度进行保证的:一方面,通过增加translog,解决了因为进程挂掉和机器宕机引发的内存数据丢失的问题,另一方面,通过副本机制解决了单点故障(当然还可以分担查询的压力。

]]>
<![CDATA[Lucene系列(10)——相似度评分机制浅析(终篇)]]> http://niyanchun.com/lucene-learning-10.html 2019-11-23T17:22:00+08:00 2019-11-23T17:22:00+08:00 NYC https://niyanchun.com 注:本文基于Lucene 8.2.0 版本。

本文是Lucene系列的终篇,在这篇文章中,我们会简单聊一下Lucene的相似度评分机制。

TF-IDF

Bag-of-words模型

先介绍一下NLP和IR领域里面非常简单且使用极其广泛的bag-fo-words model,即词袋模型。假设有这么一句话:"John likes to watch movies. Mary likes movies too."。那这句话用JSON格式的词袋模型表示的话就是:

BoW = {"John":1,"likes":2,"to":1,"watch":1,"movies":2,"Mary":1,"too":1};

可以看到,词袋模型关注的是词的出现次数,而没有记录词的位置信息。所以不同的语句甚至相反含义的语句其词袋可能是一样的,比如"Mary is quicker than John""John is quicker than Mary"这两句话,其词袋是一样的,但含义是完全相反的。所以凡是完全基于词袋模型的一些算法一般也存在这样该问题。

Term frequency

词频就是一个词(term)在一个文档中(document)出现的次数(frequency),记为 $tf_{t,d}$。这是一种最简单的定义方式,实际使用中还有一些变种:

  • 布尔词频:如果词在文档中出现,则 $tf_{t,d}=1$,否则为0。
  • 根据文档长短做调整的词频:$tf_{t,d}/length$,其中length为文档中的总词数。
  • 对数词频:$log(1+tf_{t,d})$,加1是防止对0求对数(0没有对数)。 一般选取常用对数或者自然对数。

词频的优点是简单,但缺点也很显然:

  1. 词频中没有包含词的位置信息,所以从词频的角度来看,"Mary is quicker than John""John is quicker than Mary"两条文档是完全一致的,但显然它们的含义是完全相反的。
  2. 词频没有考虑不同词的重要性一般是不一样的,比如停用词的词频都很高,但它们并不重要。

Inverse document frequency

一个词的逆文档频率用于衡量该词提供了多少信息,计算方式定义如下:

$$ idf_t = log\frac{N}{df_t}=-log\frac{df_t}{N} $$

其中,$t$ 代表term,$D$ 代表文档,$N$ 代表语料库中文档总数,$df_t$ 代表语料库中包含 $t$ 的文档的数据,即文档频率(document frequency)。如果语料库中不包含 $t$,那 $df_t$ 就等于0,为了避免除零操作,可以采用后面的公式,将 $df_t$ 作为分子,也有的变种给 $df_t$ 加了1.

对于固定的语料库,N是固定的,一个词的 $df_t$ 越大,其$idf(t, D)$ 就越小。所以那些很稀少的词的 $idf$ 值会很高,而像停用词这种出现频率很高的词 $idf$ 值很低。

TF-IDF

TF-IDF就是将TF和IDF结合起来,其实就是简单的相乘:

$$ tfidf(t, d)=tf_{t,d} \cdot idf_t $$

从公式可以分析出来,一个词 $t$ 在某个文档 $d$ 中的tf-idf值:

  • 当该词在少数文档出现很多次的时候,其值接近最大值;(场景1
  • 当该词在文档中出现次数少或者在很多文档中都出现时,其值较小;(场景2
  • 当该词几乎在所有文档中都出现时,其值接近最小值。(场景3

下面用一个例子来实战一下,还是以《Lucene系列(2)——代码实践》文中的4首英文短诗中的前3首为例。假设这3首诗组成了我们的语料库,每首诗就是一个文档(doc1:Fog、doc2:Freedom And Love、doc3:Love's Secret),诗里面的每个单词就是一个个词(我们把标题也包含在里面)。然后我们选取"the"、 "freedom"、"love"三个词来分别计算它们在每个文档的TF-IDF,计算中使用自然对数形式。

  1. "the"在doc1中出现了1次,在doc2中出现了2次,在doc3中出现了1次,整个语料库有3个文档,包含"the"的文档也是3个。所以:

    • doc1: $tf=ln(1+1), idf=ln\frac{3}{3}, tfidf("the","doc1")= tf \cdot idf =ln2 \cdot ln1=0$
    • doc2: $tf=ln(1+2), idf=ln\frac{3}{3}, tfidf("the","doc2")=tf \cdot idf=ln3 \cdot ln1=0$
    • doc3: $tf=ln(1+1), idf=ln\frac{3}{3}, tfidf("the","doc3")=tf \cdot idf=ln2 \cdot ln1=0$
  2. "freedom"在doc1中出现了0次,在doc2中出现了1次,在doc3中出现了0次,语料库中包含"freedom"的文档只有1个。所以:

    • doc1: $tf=ln(1+0), idf=ln\frac{3}{1}, tfidf("freedom","doc1")= tf \cdot idf =ln1 \cdot ln3=0$
    • doc2: $tf=ln(1+1), idf=ln\frac{3}{1}, tfidf("freedom","doc2")= tf \cdot idf =ln2 \cdot ln3=0.76$
    • doc3: $tf=ln(1+0), idf=ln\frac{3}{1}, tfidf("freedom","doc3")= tf \cdot idf =ln1 \cdot ln3=0$
  3. "love"在doc1中现了0次,在doc2中出现了3次,在doc3中出现了5次,整个语料库有3个文档,包含"love"的文档有2个。所以:

    • doc1: $tf=ln(1+0), idf=ln\frac{3}{2}, tfidf("love","doc1")= tf \cdot idf =ln1 \cdot ln\frac{3}{2}=0$
    • doc2: $tf=ln(1+3), idf=ln\frac{3}{2}, tfidf("love","doc2")= tf \cdot idf =ln4 \cdot ln\frac{3}{2}=0.56$
    • doc3: $tf=ln(1+5), idf=ln\frac{3}{2}, tfidf("love","doc3")= tf \cdot idf =ln6 \cdot ln\frac{3}{2}=0.73$

我们简单分析一下结果:"the"在所有文档中都出现了,所以其tf-idf值最低,为0,验证了上面公式分析中的场景3;"freedom"只有在第2个文档中出现了,所以其它两个的tf-idf值为0,表示不包含该词;"love"在第2、3个文档中都出现了,但在第3个文档中出现的频率更高,所以其tf-idf值最高。所以tf-idf算法的结果还是能很好的表示实际结果的。

Vector Space Model

通过TF-IDF算法,我们可以计算出每个词在语料库中的权重,而通过VSM(Vector Space Model),则可以计算两个文档的相似度。

假设有两个文档:

  • 文档1:"Jack Ma regrets setting up Alibaba."
  • 文档2:"Richard Liu does not know he has a beautiful wife."

这是原始的文档,然后通过词袋模型转化后为:

  • BoW1 = {"jack":1, "ma":1, "regret":1, "set":1, "up":1, "alibaba":1}
  • BoW2 = {"richard":1, "liu":1, "does":1, "not":1, "know":1, "he":1, "has":1, "a": 1, "beautiful":1, "wife":1}

接着,分别用TF-IDF算法计算每个文档词袋中每个词的tf-idf值(值是随便写的,仅供原理说明):

  • tf-idf_doc1 = { 0.41, 0.12, 0.76, 0.83, 0.21, 0.47 }
  • tf-idf_doc2 = { 0.12, 0.25, 0.67, 0.98, 0.43, 0.76, 0.89, 0.51, 0.19, 0.37 }

如果将上面的tf-idf_doc1和tf-idf_doc2看成是2个向量,那我们就通过上面的方式将原始的文档转换成了向量,这个向量就是VSM中的Vector。在VSM中,一个Vector就代表一个文档(记为 $V(q)$),Vector中的每个值就是原来文档中term的权重(这个权重一般使用tf-idf计算,也可以通过其他方式计算)。这样语料库中的很多文档就会产生很多的向量,这些向量一起构成了一个向量空间,也就是Vector Space。

假设有一个查询语句为"Jack Alibaba",我们可以用同样的方式将其转化一个向量,假设这个向量叫查询向量 $V(q)$。这样在语料库中检索和 $q$ 相近文档的问题就转换成求语料库中每个向量 $V(d)$ 与 $V(q)$ 的相似度问题了。而衡量两个向量相似度最常用的方法就是余弦相似度(Cosine similarity),以下内容来自维基百科:

余弦相似性通过测量两个向量的夹角的余弦值来度量它们之间的相似性。0度角的余弦值是1,而其他任何角度的余弦值都不大于1;并且其最小值是-1。从而两个向量之间的角度的余弦值确定两个向量是否大致指向相同的方向。两个向量有相同的指向时,余弦相似度的值为1;两个向量夹角为90°时,余弦相似度的值为0;两个向量指向完全相反的方向时,余弦相似度的值为-1。这结果是与向量的长度无关的,仅仅与向量的指向方向相关。余弦相似度通常用于正空间,因此给出的值为0到1之间。

用公式表示就是:

$$ cosineSimilarity(q,d)=\frac{V(q) \cdot V(d)}{|V(q)||V(d)|}=v(q) \cdot v(d) $$

其中,$V(q)$是查询向量,$V(d)$是文档向量,$|V(q)|$和$|V(q)|$是两个向量的长度,$v(q)$和$v(q)$则是对应的单位向量。

这个就是Vector Space Model。

TFIDFSimilarity

Lucene使用Boolean model (BM) of Information Retrieval模型来计算一个文档是否和搜索词匹配,对于匹配的文档使用基于VSM的评分算法来计算得分。具体的实现类是org.apache.lucene.search.similarities.TFIDFSimilarity,但做了一些修正。本文不讨论BM算法,只介绍评分算法。TFIDFSimilarity采用的评分公式如下:

$$ Score(q, d)=\sum_{t\in q}(tf_{t, d} \cdot {idf_t}^2 \cdot t.getBoost() \cdot norm(t, d)) $$

我们从外到内剖析一下这个公式。

  1. 最外层的累加。搜索语句一般是由多个词组成的,比如"Jack Alibaba"就是有"Jack"和"Alibaba"两个词组成。计算搜索语句和每个匹配文档的得分的时候就是计算搜索语句中每个词和匹配文档的得分,然后累加起来就是搜索语句和该匹配文档的得分。这就是最外层的累加。
  2. $t.getBoost()$。之前的系列文章中介绍过,在查询或者索引阶段我们可以人为设定某些term的权重,t.getBoost()获取的就是这个阶段设置的权重。所以查询或索引阶段设置的权重也就是在这个时候起作用的。
  3. $norm(t, d)$。之前的系列文章中也介绍过,查询的时候一个文档的长短也是会影响词的重要性,匹配次数一样的情况下,越长的文档评分越低。这个也好理解,比如我们搜"Alibaba",有两个文档里面都出现了一次该词,但其中一个文档总共包含100万个词,而另外一个只包含10个词,很显然,极大多数情况下,后者与搜索词的相关度是比前者高的。实际计算的时候使用的公式如下:

    $$ norm(t,d) = \frac{1}{\sqrt{length}} $$

    其中 $length$是文档 $d$ 的长度。

  4. $tf_{t, d} \cdot {idf_t}^2$。Lucene假设一个词在搜索语句中的词频为1(即使出现多次也不影响,就是重复计算多次而已),所以可以把这个公式拆开写:

    $$ tf_{t, d} \cdot {idf_t}^2 = tf_{t, d} \cdot {idf_t} \cdot 1 \cdot {idf_t}=(tf_{t, d} \cdot {idf_t}) \cdot (tf_{t, q} \cdot {idf_t}) $$

    这里的$(tf_{t, d} \cdot {idf_t}) \cdot (tf_{t, q} \cdot {idf_t})$就对应上面的$v(d) \cdot v(q)$.

    在Lucene中,采用的TF计算公式为:

    $$ tf_{t,d} = \sqrt {frequency} $$

    IDF计算公式为:

    $$ idf_t =1+log\frac{N+1}{df_t+1} $$

其实TFIDFSimilarity是一个抽象类,真正实现上述相似度计算的是org.apache.lucene.search.similarities.ClassicSimilarity类,上面列举的公式在其对应的方法中也可以找到。除了基于TFIDF这种方式外,Lucene还支持另外一种相似度算法BM25,并且从6.0.0版本开始,BM25已经替代ClassicSimilarity,作为默认的评分算法。下面就来看看BM25.

BM25Similarity

BM25全称“Best Match 25”,其中“25”是指现在BM25中的计算公式是第25次迭代优化。该算法是几位大牛在1994年TREC-3(Third Text REtrieval Conference)会议上提出的,它将文本相似度问题转化为概率模型,可以看做是TF-IDF的改良版,我们看下它是如何进行改良的。

对IDF的改良

BM25中的IDF公式为:

$$ idf_t^{BM25} = log(1+\frac{N-df_t+0.5}{df_t+0.5}) $$

注意:原版BM25的log中是没有加1的,Lucene为了防止产生负值,做了一点小优化。

虽然对公式进行了更改,但其实和原来的公式没有实质性的差异,下面是新旧函数曲线对比:

idf-comparation

对TF的改良1

BM25中TF的公式为:

$$ tf_{t,d}^{BM25}=((k+1)*tf)/(k+tf) $$

其中$tf$是传统的词频值。先来看下改良前后的函数曲线对比吧(下图中$k=1.2$):

tf-comparation

可以看到,传统的tf计算公式中,词频越高,tf值就越大,没有上限。但BM中的tf,随着词频的增长,tf值会无限逼近$(k+1)$,相当于是有上限的。这就是二者的区别。一般 $k$ 取 1.2,Lucene中也使用1.2作为 $k$ 的默认值。

对TF的改良2

在传统的计算公式中,还有一个norm。BM25将这个因素加到了TF的计算公式中,结合了norm因素的BM25中的TF计算公式为:

$$ tf_{t,d}^{BM25}=((k+1)*tf)/(k*(1.0-b+b*L)+tf) $$

和之前相比,就是给分母上面的 $k$ 加了一个乘数 $(1.0-b+b*L)$. 其中的 $L$ 的计算公式为:

$$ L = \frac{|d|}{avgDl} $$

其中,$|d|$是当前文档的长度,$avgDl$ 是语料库中所有文档的平均长度。

$b$ 是一个常数,用来控制 $L$ 对最总评分影响的大小,一般取0~1之间的数(取0则代表完全忽略 $L$ )。Lucene中 $b$ 的默认值为 0.75.

通过这些细节上的改良,BM25在很多实际场景中的表现都优于传统的TF-IDF,所以从Lucene 6.0.0版本开始,上位成为默认的相似度评分算法。

Reference

]]>
<![CDATA[Lucene系列(9)——QueryParser介绍]]> http://niyanchun.com/lucene-learning-9.html 2019-11-02T11:17:00+08:00 2019-11-02T11:17:00+08:00 NYC https://niyanchun.com 注:本文基于Lucene 8.2.0 版本。

本文介绍一个比较“特殊”的查询API——QueryParser,它的特殊之处在于定义了一些查询语法,通过这些语法几乎可以实现前文介绍的所有Query API提供的功能,但它的存在并不是为了替换那些API,而是用在一些交互式场景中。本文不会再细述Lucene各个查询的含义及用法(比如什么是edit distance),所以如果你还不熟悉,请务必先阅读《Lucene系列(8)——常用Query介绍》一文。

QueryParser概述

其实在《Lucene系列(2)——代码实践》一文中我们已经使用过QueryParser进行查询了,这里再贴一下部分代码:

/**
 * Minimal Search Files code
 **/
public class SearchFilesMinimal {

    public static void main(String[] args) throws Exception {
        // 索引保存目录
        final String indexPath = "/Users/allan/Git/allan/github/CodeSnippet/Java/lucene-learning/indices/poems-index";
        // 搜索的字段
        final String searchField = "contents";

        // 从索引目录读取索引信息
        IndexReader indexReader = DirectoryReader.open(FSDirectory.open(Paths.get(indexPath)));
        // 创建索引查询对象
        IndexSearcher searcher = new IndexSearcher(indexReader);
        // 使用标准分词器
        Analyzer analyzer = new StandardAnalyzer();

        // 从终端获取查询语句
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        // 创建查询语句解析对象
        QueryParser queryParser = new QueryParser(searchField, analyzer);
        while (true) {
            System.out.println("Enter query: ");

            String input = in.readLine();
            if (input == null) {
                break;
            }

            input = input.trim();
            if (input.length() == 0) {
                break;
            }

            // 解析用户输入的查询语句:build query
            Query query = queryParser.parse(input);
            System.out.println("searching for: " + query.toString(searchField));
            // 查询
            TopDocs results = searcher.search(query, 10);
            // 省略后面查询结果打印的代码
            }
        }
    }
}

在这段代码中,先读取了已经创建好的索引文件,然后创建了一个QueryParser实例(queryParser)。接着不断读取用户输入(input),并传给QueryParser的parse方法,该方法通过用户的输入构建一个Query对象用于查询。

QueryParser的构造函数为QueryParser(String f, Analyzer a),第1个参数指定一个默认的查询字段,如果后面输入的input里面没有指定查询字段,则默认查询该该字段,比如输入hello表示在默认字段中查询"hello",而content: hello则表示在content字段中查询"hello"。第2个参数指定一个分析器,一般该分析器应该选择和索引阶段同样的Analyzer。

另外有两个点需要特别注意:

  1. QueryParser默认使用TermQuery进行多个Term的OR关系查询(后文布尔查询那里会再介绍)。比如输入hello world,表示先将hello world分词(一般会分为hello和world两个词),然后使用TermQuery查询。如果需要全词匹配(即使用PhraseQuery),则需要将搜索词用双引号引起来,比如"hello world"
  2. 指定搜索字段时,该字段仅对紧随其后的第一个词或第一个用双引号引起来的串有效。比如title:hello world这个输入,title仅对hello有效,即搜索时只会在title字段中搜索hello,然后在默认搜索字段中搜索world。如果想要在一个字段中搜索多个词或多个用双引号引起来的词组时,将这些词用小括号括起来即可,比如title:(hello world)

下面通过一些例子看一下如何通过QueryParser提供的语法在一个输入串中实现前文介绍的各种搜索。

QueryParser语法

Wildcard搜索

通配符搜索和WildcardQuery API一样,仅支持?*两个通配符,前者用于匹配1个字符,后者匹配0到多个字符。输入title:te?t,则可以匹配到title中的"test"、"text"等词。

注意:使用QueryParser中的wildcard搜索时,不允许以?*开头,否则会抛异常,但直接使用WildcardQuery API时,允许以通配符开头,只是因为性能原因,不推荐使用。这样设计的原因我猜是因为QueryParser的输入是面向用户的,用户对于通配符开头造成的后果并不清楚,所以直接禁掉;而WildcardQuery是给开发者使用的,开发者在开发阶段很清楚如果允许这样做造成的后果是否可以接受,如果不能接受,也是可以通过接口禁掉开头就是用通配符的情况。

Regexp搜索

正则搜索和RegexpQuery一样,不同之处在于QueryParser中输入的正则表达式需要使用两个斜线("/")包围起来,比如匹配"moat"或"boat"的正则为/[mb]oat/

Fuzzy搜索

在QueryParser中,通过在搜索词后面加波浪字符来实现FuzzyQuery,比如love~,默认edit distance是2,可以在波浪符后面加具体的整数值来修改默认值,合法的值为0、1、2.

Phrase slop搜索

PhraseQuery中可以指定slop(默认值为0,精确匹配)来实现相似性搜索,QueryParser中同样可以,使用方法与Fuzzy类似——将搜索字符串用双引号引起来,然后在末尾加上波浪符,比如"jakarta apache"~10。这里对数edit distance没有限制,合法值为非负数,默认值为0.

Range搜索

QueryParser的范围搜索同时支持TermRangeQuery和数值型的范围搜索,排序使用的是字典序开区间使用大括号,闭区间使用方括号。比如搜索修改日期介于2019年9月份和10月份的文档:mod_date:[20190901 TO 20191031],再比如搜索标题字段中包含hatelove的词(但不包含这两个词)的文档:title:{hate TO love}.

提升权重(boost)

查询时可以通过给搜索的关键字或双引号引起来的搜索串后面添加脱字符(^)及一个正数来提升其计算相关性时的权重(默认为1),比如love^5 China"love China"^0.3

Boolean操作符

QueryParser中提供了5种布尔操作符:AND+ORNOT-,所有的操作符必须大写

  • OR是默认的操作符,表示满足任意一个term即可。比如搜索love ChinaloveChina之间就是OR的关系,检索时文档匹配任意一个词即视为匹配。OR也可以使用可用||代替。
  • AND表示必须满足所有term才可以,可以使用&&代替。
  • +用在term之前,表示该term必须存在。比如+love China表示匹配文档中必须包含loveChina则可包含也可不含。
  • -用在term之前,表示该term必须不存在。比如-"hate China" "love China"表示匹配文档中包含"love China",但不包含"hate China"的词。

分组

前面已经介绍过,可以使用小括号进行分组,通过分组可以表达一些复杂的逻辑。举两个例子:

  • (jakarta OR apache) AND website表示匹配文档中必须包含webiste,同时需要至少包含jakartaapache二者之一。
  • title:(+return +"pink panther")表示匹配文档中的title字段中必须同时存在return"pink panther"串。

特殊字符

从前面的介绍可知,有很多符号在QueryParser中具有特殊含义,目前所有的特殊符号包括:+- && || ! ( ) { } [ ] ^ " ~ * ? : \ /。如果搜索关键字中存在这些特殊符号,则需要使用反斜线(\)转义。比如搜索(1+1)*2则必须写为\(1\+1\)\*2

相比于Lucene的其它搜索API,QueryParser提供了一种方式,让普通用户可以不需要写代码,只是掌握一些语法就可以进行复杂的搜索,在一些交互式检索场景中,还是非常方便的。

References

]]>
<![CDATA[三个臭皮匠不如一个诸葛亮之DisjunctionMaxQuery查询介绍]]> http://niyanchun.com/DisjunctionMaxQuery-introduction.html 2019-10-27T08:59:28+08:00 2019-10-27T08:59:28+08:00 NYC https://niyanchun.com 本文介绍Lucene/ElasticSearch/Solr中的DisjunctionMaxQuery,这里我先给出Lucene 8.2.0版本JavaDoc对于该查询接口的描述:

A query that generates the union of documents produced by its subqueries, and that scores each document with the maximum score for that document as produced by any subquery, plus a tie breaking increment for any additional matching subqueries. This is useful when searching for a word in multiple fields with different boost factors (so that the fields cannot be combined equivalently into a single search field). We want the primary score to be the one associated with the highest boost, not the sum of the field scores (as BooleanQuery would give). If the query is "albino elephant" this ensures that "albino" matching one field and "elephant" matching another gets a higher score than "albino" matching both fields. To get this result, use both BooleanQuery and DisjunctionMaxQuery: for each term a DisjunctionMaxQuery searches for it in each field, while the set of these DisjunctionMaxQuery's is combined into a BooleanQuery. The tie breaker capability allows results that include the same term in multiple fields to be judged better than results that include this term in only the best of those multiple fields, without confusing this with the better case of two different terms in the multiple fields.

如果你已经知道DisjunctionMaxQuery的含义,就很容易理解上面这段话:该查询生成多个子查询的合集,对于一个文档,如果同时匹配多个子查询,则取其中评分最高的那个子查询的评分作为每个文档的最终评分。有些绕,直接通过例子来看这个查询是用来解决什么问题的。看完之后,你也就明白上面再说什么了。

一个例子

为了方便,这里以ES为例进行说明。先创建一个名为 dis-max-test 的索引,并插入2条文档,每个文档包含一个 nameintroduction 字段:

// 插入数据
PUT dis-max-test/_bulk
{ "index": {}}
{ "name": "William Henry Gates III, Bill Gates", "introduction": "Founder of Microsoft Corporation."}        // 第一条数据:Bill Gates的信息
{ "index": {}}
{ "name": "Melinda Gates", "introduction": "Wife of Gates, a former general manager at Microsoft."}            // 第二条数据:Melinda Gates的信息

假设现在我们想搜索和“Bill Gates”相关的内容,则可以通过如下语句方式进行搜索:

# 搜索语句
GET dis-max-test/_search
{
  "query": {
    "bool": {
      "should": [
        { "match": { "name": "Bill Gates"} },
        { "match": { "introduction": "Bill Gates"}}
      ]
    }
  }
}

上面这个语句的含义是搜索name或者introduction字段里面包含“Bill Gates”的文档,其查询结果如下:

# 搜索结果
{
  "took" : 4,
  "timed_out" : false,
  "hits" : {
    "max_score" : 0.8281169,
    "hits" : [
      {
        "_index" : "dis-max-test",
        "_type" : "_doc",
        "_id" : "Ge7O3W0BYOeS6h1DGlUi",
        "_score" : 0.8281169,        # 评分
        "_source" : {
          "name" : "Melinda Gates",
          "introduction" : "Wife of Gates, a former general manager at Microsoft."
        }
      },
      {
        "_index" : "dis-max-test",
        "_type" : "_doc",
        "_id" : "GO7O3W0BYOeS6h1DGlUi",
        "_score" : 0.7952278,         # 评分
        "_source" : {
          "name" : "William Henry Gates III, Bill Gates",
          "introduction" : "Founder of Microsoft Corporation."
        }
      }
    ]
  }
}

这个搜索结果是正确的,Match搜索的时候会把“Bill Gates”先分词,结果是BillGates,搜索的结果里面也都至少其中一个。但是有一点让人不是很满意,按照我们的搜索意图,上面的第二条结果才更贴近,因为它里面包含完整的“Bill Gates”。但结果它的评分却比第一条低(即匹配度低),排在了后面。在分析原因之前,我们换成Lucene的DisjunctionMaxQuery(在ES里面叫dis_max)来查询一下:

# 查询语句
GET dis-max-test/_search
{
  "query": {
    "dis_max": {
      "queries": [
        { "match": { "name": "Bill Gates"} },
        { "match": { "introduction": "Bill Gates"}}
        ]
    }
  }
}

dis_max由多个match组成,其查询条件和上面的bool-should相同,看下查询结果:

{
  "took" : 3,
  "timed_out" : false,
  "hits" : {
    "max_score" : 0.7952278,
    "hits" : [
      {
        "_index" : "dis-max-test",
        "_type" : "_doc",
        "_id" : "GO7O3W0BYOeS6h1DGlUi",
        "_score" : 0.7952278,
        "_source" : {
          "name" : "William Henry Gates III, Bill Gates",
          "introduction" : "Founder of Microsoft Corporation."
        }
      },
      {
        "_index" : "dis-max-test",
        "_type" : "_doc",
        "_id" : "Ge7O3W0BYOeS6h1DGlUi",
        "_score" : 0.59891266,
        "_source" : {
          "name" : "Melinda Gates",
          "introduction" : "Wife of Gates, a former general manager at Microsoft."
        }
      }
    ]
  }
}

可以看到,查询结果与之前的一样,区别在于完全包含“Bill Gates”一词的那条文档排在了前面,因为它的评分高于Melinda Gates的那条文档,这个结果也正是我们想要的。看到这里,你应该已经有一点感觉了,虽然dis_max和boolean-should的查询条件相近,但其对于结果的评分却不一样,似乎dis_max更贴近我们的搜索意图。下面来探索一下造成这种差别的原因。

原理分析

ES的查询中支持一个explain的参数,如果将其设置为true的话,查询结果中就会额外输出计算得分的过程(_explanation 部分)。该参数默认是false的,我们将其改为true,然后再执行一下上面的两个查询,来看看造成两种不同结果背后的细节。

先看Boolean-should查询:

# 查询语句
GET dis-max-test/_search
{
  "explain": true, 
  "query": {
    "bool": {
      "should": [
        { "match": { "name": "Bill Gates"} },
        { "match": { "introduction": "Bill Gates"}}
      ]
    }
  }
}

# 查询结果
{
  "took" : 5,
  "timed_out" : false,
  "hits" : {
    "max_score" : 0.8281169,
    "hits" : [
      {
        "_shard" : "[dis-max-test][0]",
        "_node" : "aIzM2bJFT_afjUgEMxWosg",
        "_index" : "dis-max-test",
        "_type" : "_doc",
        "_id" : "Ge7O3W0BYOeS6h1DGlUi",
        "_score" : 0.8281169,
        "_source" : {
          "name" : "Melinda Gates",
          "introduction" : "Wife of Gates, a former general manager at Microsoft."
        },
        "_explanation" : {
          "value" : 0.8281169,
          "description" : "sum of:",        # 注意这里!!!
          "details" : [
            {
              "value" : 0.22920427,
              "description" : "sum of:",
              "details" : [
                {
                  "value" : 0.22920427,
                  "description" : "weight(name:gates in 0) [PerFieldSimilarity], result of:",
                  "details" : [/* 省略计算得分的细节 */]
                }
              ]
            },
            {
              "value" : 0.59891266,
              "description" : "sum of:",
              "details" : [
                {
                  "value" : 0.59891266,
                  "description" : "weight(introduction:gates in 0) [PerFieldSimilarity], result of:",
                  "details" : [/* 省略计算得分的细节 */]
                }
              ]
            }
          ]
        }
      },
      {
        "_shard" : "[dis-max-test][0]",
        "_node" : "aIzM2bJFT_afjUgEMxWosg",
        "_index" : "dis-max-test",
        "_type" : "_doc",
        "_id" : "GO7O3W0BYOeS6h1DGlUi",
        "_score" : 0.7952278,
        "_source" : {
          "name" : "William Henry Gates III, Bill Gates",
          "introduction" : "Founder of Microsoft Corporation."
        },
        "_explanation" : {
          "value" : 0.7952278,
          "description" : "sum of:",
          "details" : [
            {
              "value" : 0.7952278,
              "description" : "sum of:",
              "details" : [
                {
                  "value" : 0.5754429,
                  "description" : "weight(name:bill in 0) [PerFieldSimilarity], result of:",
                  "details" : [/* 省略计算得分的细节 */]
                },
                {
                  "value" : 0.21978492,
                  "description" : "weight(name:gates in 0) [PerFieldSimilarity], result of:",
                  "details" : [/* 省略计算得分的细节 */]
                }
              ]
            }
          ]
        }
      }
    ]
  }
}

为了节省篇幅以及看的更清楚,省略了计算评分的细节,这部分后面有单独的文章介绍。在这个查询中,Melinda Gates对应文档的评分0.8281169,高于Bill Gates对应文档的评分0.7952278,即对于这个查询而言,ES认为Melinda Gates对应文档比Bill Gates对应文档更贴近我们的搜索词“Bill Gates”。其原因是这样的:

  • 对于Melinda Gates对应文档而言,它的评分0.8281169是由下面details数组里面两个子查询的评分0.22920427和0.59891266两个评分加来的(即description字段的"sum of" 含义):0.22920427这个评分是name字段中包含了Gates这个搜索词而得的,0.59891266这个评分是introduction字段中包含也包含Gates而得的。
  • 对于Bill Gates对应的文档而言,它的评分0.7952278是由下面的0.5754429和0.21978492相加而来。0.5754429是name中包含bill获得的,0.21978492是name中包含gates获得的。introduction字段中没有匹配项,所以没有得分。

这样我们就明白了为什么虽然Bill Gates的文档更贴近搜索意图,其评分却低的原因。因为对于Boolean查询而言,其总评分是多个子查询的评分相加而来的(上面查询结果json中details数组里面一个元素代表一个查询结果)。Melinda Gates文档中虽然没有bill,但却包含多个Gates,所以累加下来总评分就高。但实际中对于有些场景通过这种累加所有子查询的结果并不能准确的代表查询意图,就好比三个臭皮匠很多时候是顶不了一个诸葛亮的。

为了解决这个问题,就产生了本文的主角DisjunctionMaxQuery,看下面查询:

GET dis-max-test/_search
{
  "explain": true, 
  "query": {
    "dis_max": {
      "queries": [
        { "match": { "name": "Bill Gates"} },
        { "match": { "introduction": "Bill Gates"}}
        ]
    }
  }
}

# 查询结果
{
  "took" : 5,
  "timed_out" : false,
  "hits" : {
    "max_score" : 0.7952278,
    "hits" : [
      {
        "_shard" : "[dis-max-test][0]",
        "_node" : "aIzM2bJFT_afjUgEMxWosg",
        "_index" : "dis-max-test",
        "_type" : "_doc",
        "_id" : "GO7O3W0BYOeS6h1DGlUi",
        "_score" : 0.7952278,
        "_source" : {
          "name" : "William Henry Gates III, Bill Gates",
          "introduction" : "Founder of Microsoft Corporation."
        },
        "_explanation" : {
          "value" : 0.7952278,
          "description" : "max of:",        # 注意这里
          "details" : [
            {
              "value" : 0.7952278,
              "description" : "sum of:",
              "details" : [
                {
                  "value" : 0.5754429,
                  "description" : "weight(name:bill in 0) [PerFieldSimilarity], result of:",
                  "details" : [/* 省略计算得分的细节 */]
                },
                {
                  "value" : 0.21978492,
                  "description" : "weight(name:gates in 0) [PerFieldSimilarity], result of:",
                  "details" : [/* 省略计算得分的细节 */]
                }
              ]
            }
          ]
        }
      },
      {
        "_shard" : "[dis-max-test][0]",
        "_node" : "aIzM2bJFT_afjUgEMxWosg",
        "_index" : "dis-max-test",
        "_type" : "_doc",
        "_id" : "Ge7O3W0BYOeS6h1DGlUi",
        "_score" : 0.59891266,
        "_source" : {
          "name" : "Melinda Gates",
          "introduction" : "Wife of Gates, a former general manager at Microsoft."
        },
        "_explanation" : {
          "value" : 0.59891266,
          "description" : "max of:",       # 注意这里
          "details" : [
            {
              "value" : 0.22920427,
              "description" : "sum of:",
              "details" : [
                {
                  "value" : 0.22920427,
                  "description" : "weight(name:gates in 0) [PerFieldSimilarity], result of:",
                  "details" : [/* 省略计算得分的细节 */]
                }
              ]
            },
            {
              "value" : 0.59891266,
              "description" : "sum of:",
              "details" : [
                {
                  "value" : 0.59891266,
                  "description" : "weight(introduction:gates in 0) [PerFieldSimilarity], result of:",
                  "details" : [/* 省略计算得分的细节 */]
                }
              ]
            }
          ]
        }
      }
    ]
  }
}

这个和上边的类似,但是dis_max在计算最终评分的时候并不是累加各个匹配的子查询,而是取评分最高的子查询结果作为最终结果(即"description" : "max of:",这里注意区分一下,DisjunctionMaxQuery这一层取max,而子查询内层依旧使用的是sum的方式来计算评分)。

到这里,我们就明白DisjunctionMaxQuery查询的含义了,它和BooleanQuery类似,也由多个子查询组成。BooleanQuery计算一个匹配文档的总评分时,是累加所有子查询的评分,而DisjunctionMaxQuery则是取评分最高的那个子查询的评分作为文档的最终得分。

还拿臭皮匠为例,如果说诸葛亮的IQ是145,而三个臭皮匠的IQ分别为91,85,84。如果你问BooleanQuery是诸葛亮聪明还是三个臭皮匠聪明,那它会告诉你三个臭皮匠聪明,因为诸葛亮IQ是145,而三个臭皮匠的IQ是91+85+84=260。显然这样是不对的。但如果你问DisjunctionMaxQuery同样的问题,它则会告诉你诸葛亮聪明,因为诸葛亮的IQ是145,而三个臭皮匠的IQ是91.

当然呢,有时候人多还是力量大的。三个臭皮匠在一起不一定能胜过一个诸葛亮,但一般还是可以胜过他们之中任意一个人的,所以直接取最高的,忽略掉另外两个人的IQ有时候也不太合适,特别是他们如果技能领域各不相同的话。所以DisjunctionMaxQuery又提供了一个tie_breaker参数,该参数合法值范围为[0, 1],默认取0. 计算最终得分的时候,DisjunctionMaxQuery会取最高分,同时加上各个子查询的得分乘以tie_breaker的值。即不是像BooleanQuery那样粗暴相加,而是给非最高分的评分给一个权重,毕竟量变可能会引起质变,完全忽略也不是很合适。至于tie_breaker该设置多少,这个需要结合具体的使用场景。

还是上面的dis_max查询,但我们将tie_breaker由默认值0改为0.9,会发现它的查询结果也发生了变化:

# 查询
GET dis-max-test/_search
{
  "query": {
    "dis_max": {
      "queries": [
        { "match": { "name": "Bill Gates"} },
        { "match": { "introduction": "Bill Gates"}}
        ],
        "tie_breaker": 0.9
    }
  }
}

# 查询结果
{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 0.80519646,
    "hits" : [
      {
        "_index" : "dis-max-test",
        "_type" : "_doc",
        "_id" : "Ge7O3W0BYOeS6h1DGlUi",
        "_score" : 0.80519646,
        "_source" : {
          "name" : "Melinda Gates",
          "introduction" : "Wife of Gates, a former general manager at Microsoft."
        }
      },
      {
        "_index" : "dis-max-test",
        "_type" : "_doc",
        "_id" : "GO7O3W0BYOeS6h1DGlUi",
        "_score" : 0.7952278,
        "_source" : {
          "name" : "William Henry Gates III, Bill Gates",
          "introduction" : "Founder of Microsoft Corporation."
        }
      }
    ]
  }
}

使用explain查看上述查询评分的过程:

GET dis-max-test/_search
{
  "explain": true, 
  "query": {
    "dis_max": {
      "queries": [
        { "match": { "name": "Bill Gates"} },
        { "match": { "introduction": "Bill Gates"}}
        ],
        "tie_breaker": 0.9
    }
  }
}

GET dis-max-test/_search
{
  "explain": true, 
  "query": {
    "dis_max": {
      "queries": [
        { "match": { "name": "Bill Gates"} },
        { "match": { "introduction": "Bill Gates"}}
        ],
        "tie_breaker": 0.9
    }
  }
}


# 查询结果
{
  "took" : 5,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 0.80519646,
    "hits" : [
      {
        "_shard" : "[dis-max-test][0]",
        "_node" : "aIzM2bJFT_afjUgEMxWosg",
        "_index" : "dis-max-test",
        "_type" : "_doc",
        "_id" : "Ge7O3W0BYOeS6h1DGlUi",
        "_score" : 0.80519646,
        "_source" : {
          "name" : "Melinda Gates",
          "introduction" : "Wife of Gates, a former general manager at Microsoft."
        },
        "_explanation" : {
          "value" : 0.80519646,
          "description" : "max plus 0.9 times others of:",
          "details" : [
            {
              "value" : 0.22920427,
              "description" : "sum of:",
              "details" : [
                {
                  "value" : 0.22920427,
                  "description" : "weight(name:gates in 0) [PerFieldSimilarity], result of:",
                  "details" : [/* 省略计算得分的细节 */]
                }
              ]
            },
            {
              "value" : 0.59891266,
              "description" : "sum of:",
              "details" : [
                {
                  "value" : 0.59891266,
                  "description" : "weight(introduction:gates in 0) [PerFieldSimilarity], result of:",
                  "details" : [/* 省略计算得分的细节 */]
                }
              ]
            }
          ]
        }
      },
      {
        "_shard" : "[dis-max-test][0]",
        "_node" : "aIzM2bJFT_afjUgEMxWosg",
        "_index" : "dis-max-test",
        "_type" : "_doc",
        "_id" : "GO7O3W0BYOeS6h1DGlUi",
        "_score" : 0.7952278,
        "_source" : {
          "name" : "William Henry Gates III, Bill Gates",
          "introduction" : "Founder of Microsoft Corporation."
        },
        "_explanation" : {
          "value" : 0.7952278,
          "description" : "max plus 0.9 times others of:",
          "details" : [
            {
              "value" : 0.7952278,
              "description" : "sum of:",
              "details" : [
                {
                  "value" : 0.5754429,
                  "description" : "weight(name:bill in 0) [PerFieldSimilarity], result of:",
                  "details" : [/* 省略计算得分的细节 */]
                },
                {
                  "value" : 0.21978492,
                  "description" : "weight(name:gates in 0) [PerFieldSimilarity], result of:",
                  "details" : [/* 省略计算得分的细节 */]
                }
              ]
            }
          ]
        }
      }
    ]
  }
}

本文就介绍到这里。

References

]]>
<![CDATA[Lucene系列(8)——常用Query介绍]]> http://niyanchun.com/lucene-learning-8.html 2019-10-20T08:09:00+08:00 2019-10-20T08:09:00+08:00 NYC https://niyanchun.com 注:本文基于Lucene 8.2.0 版本。

搜索是使用Lucene的根本目的,本文介绍Lucene提供的常用查询。下面的讲述中,会以之前《Lucene系列(2)——代码实践》文章中4首短诗的索引数据为例进行查询,你可以先阅读那篇文章构建索引。在Lucene中,Term是查询的基本单元(unit),所有查询类的父类是org.apache.lucene.search.Query,本文会介绍下图中这些主要的Query子类:

Lucene Query

DisjunctionMaxQuery主要用于控制评分机制,SpanQuery代表一类查询,有很多的实现。这两类查询不是非常常用,放在以后的文章单独介绍。本文所有、示例的完整代码见这里

TermQuery

TermQuery是最基础最常用的的一个查询了,对应的类是org.apache.lucene.search.TermQuery。其功能很简单,就是查询哪些文档中包含指定的term。

看下面代码:

/**
 * Query Demo.
 *
 * @author NiYanchun
 **/
public class QueryDemo {

    /**
     * 搜索的字段
     */
    private static final String SEARCH_FIELD = "contents";

    public static void main(String[] args) throws Exception {
        // 索引保存目录
        final String indexPath = "indices/poems-index";
        // 读取索引
        IndexReader indexReader = DirectoryReader.open(FSDirectory.open(Paths.get(indexPath)));
        IndexSearcher searcher = new IndexSearcher(indexReader);

        // TermQuery
        termQueryDemo(searcher);
    }

    private static void termQueryDemo(IndexSearcher searcher) throws IOException {
        System.out.println("TermQuery, search for 'death':");
        TermQuery termQuery = new TermQuery(new Term(SEARCH_FIELD, "death"));

        resultPrint(searcher, termQuery);
    }

    private static void resultPrint(IndexSearcher searcher, Query query) throws IOException {
        TopDocs topDocs = searcher.search(query, 10);
        if (topDocs.totalHits.value == 0) {
            System.out.println("not found!\n");
            return;
        }

        ScoreDoc[] hits = topDocs.scoreDocs;

        System.out.println(topDocs.totalHits.value + " result(s) matched: ");
        for (ScoreDoc hit : hits) {
            Document doc = searcher.doc(hit.doc);
            System.out.println("doc=" + hit.doc + " score=" + hit.score + " file: " + doc.get("path"));
        }
        System.out.println();
    }
}

上面代码先读取索引文件,然后执行了一个term查询,查询所有包含death关键词的文档。为了方便打印,我们封装了一个resultPrint函数用于打印查询结果。On Death一诗包含了death关键字,所以程序执行结果为:

TermQuery, search for 'death':
1 result(s) matched: 
doc=3 score=0.6199532 file: data/poems/OnDeath.txt

后面的示例代码会基于上述代码结构再增加。

BooleanQuery

BooleanQuery用于将若干个查询按照与或的逻辑关系组织起来,支持嵌套。目前支持4个逻辑关系:

  • SHOULD:逻辑的关系,文档满足任意一个查询即视为匹配。
  • MUST:逻辑的关系,文档必须满足所有查询才视为匹配。
  • FILTER:逻辑的关系,与must的区别是不计算score,所以性能会比must好。如果只关注是否匹配,而不关注匹配程度(即得分),应该优先使用filter。
  • MUST NOT:逻辑与的关系,且取反。文档不满足所有查询的条件才视为匹配。

使用方式也比较简单,以下的代码使用BooleanQuery查询contents字段包含love但不包含seek的词:

private static void booleanQueryDemo(IndexSearcher searcher) throws IOException {
    System.out.println("BooleanQuery, must contain 'love' but absolutely not 'seek': ");
    BooleanQuery.Builder builder = new BooleanQuery.Builder();
    builder.add(new TermQuery(new Term(SEARCH_FIELD, "love")), BooleanClause.Occur.MUST);
    builder.add(new TermQuery(new Term(SEARCH_FIELD, "seek")), BooleanClause.Occur.MUST_NOT);
    BooleanQuery booleanQuery = builder.build();

    resultPrint(searcher, booleanQuery);
}

Love's SecretFreedom and Love两首诗中均包含了love一词,但前者还包含了seek一词,所以最终的搜索结果为Freedom and Love

PhraseQuery

PhraseQuery用于搜索term序列,比如搜索“hello world”这个由两个term组成的一个序列。对于Phrase类的查询需要掌握两个点:

  1. Phrase查询需要term的position信息,所以如果indexing阶段没有保存position信息,就无法使用phrase类的查询。
  2. 理解slop的概念:Slop就是两个term或者两个term序列的edit distance。后面的FuzzyQuery也用到了该概念,这里简单介绍一下。

Edit distance

Edit distance用于描述两个字符串(词也是一种特殊的字符串)的相似度,其定义有多种,比较常用的是 Levenshtein distance 和其扩展 Damerau–Levenshtein distance。Lucene使用的就是这两种。Levenshtein distance是这样定义edit distance的:如果最少通过n个 增加(Insertion)/删除(Deletion)/替换(Substitution) 单个符号(symbol)的操作能使两个字符串相等,那这两个字符串的距离就是 n。这里有三个注意点:

  1. 只允许使用增、删、替换三种操作。
  2. 一次只能操作1个符号。如果是计算两个词的距离,那一个符号就代表一个字母;如果是计算两个句子的距离,那一个符号就代表一个词。
  3. 计算的是最少达到目标的操作数。

举几个例子:

  • cat与cut的edit distance是1,因为通过将cat的a替换为u这1个字母替换操作就可以让cat和cut相等。
  • cate与cut的edit distance是2,因为需要将cate的a替换为u,再将e删除两个操作才可以使得cate和cut相等。
  • cat与cta的edit distance是2,因为需要将cat的a替换为t,t替换为a两个操作才可以使得cat和cta相等。
  • "a bad boy"和"a good boy"的edit distance是1,因为需要将bad替换为good才能使两个句子相等(注意这里计算的是两个句子的distance,所以一个symbol就是一个词)。
  • "good boy"和"boy good"的edit distance是2,因为最少需要两步操作才可以使两个句子相等。

Damerau–Levenshtein distance对Levenshtein distance做了一个扩展:增加了一个transposition操作,定义 相邻 symbol的位置交换为1次操作,即distance为1。 这样的话在Levenshtein distance中,cat和cta的距离为2,但在Damerau–Levenshtein distance中,它们的距离就是1了;同理,"good boy"和"boy good"的距离也就是1了。

这就是所谓的Edit distance,PhraseQuery使用的是Levenshtein distance,且默认的slop值是0,也就是只检索完全匹配的term序列。看下面这个例子:

private static void phraseQueryDemo(IndexSearcher searcher) throws IOException {
    System.out.println("\nPhraseQuery, search 'love that'");

    PhraseQuery.Builder builder = new PhraseQuery.Builder();
    builder.add(new Term(SEARCH_FIELD, "love"));
    builder.add(new Term(SEARCH_FIELD, "that"));
    PhraseQuery phraseQueryWithSlop = builder.build();

    resultPrint(searcher, phraseQueryWithSlop);
}


// 运行结果
PhraseQuery, search 'love that'
1 result(s) matched: 
doc=2 score=0.7089927 file: data/poems/Love'sSecret.txt

Love‘s Secret里面有这么一句:"Love that never told shall be",是能够匹配"love that"的。我们也可以修改slop的值,使得与搜索序列的edit distance小于等于slop的文档都可以被检索到,同时距离越小的文档评分越高。看下面例子:

private static void phraseQueryWithSlopDemo(IndexSearcher searcher) throws IOException {
    System.out.println("PhraseQuery with slop: 'love <slop> never");
    PhraseQuery phraseQueryWithSlop = new PhraseQuery(1, SEARCH_FIELD, "love", "never");

    resultPrint(searcher, phraseQueryWithSlop);
}

// 运行结果
PhraseQuery with slop: 'love <slop> never
1 result(s) matched: 
doc=2 score=0.43595996 file: data/poems/Love'sSecret.txt

MultiPhraseQuery

不论是官方文档或是网上的资料,对于MultiPhraseQuery讲解的都比较少。但其实它的功能很简单,举个例子就明白了:我们提供两个由term组成的数组:["love", "hate"], ["him", "her"],然后把这两个数组传给MultiPhraseQuery,它就会去检索 "love him", "love her", "hate him", "hate her"的组合,每一个组合其实就是一个上面介绍的PhraseQuery。当然MultiPhraseQuery也可以接受更高维的组合。

由上面的例子可以看到PhraseQuery其实是MultiPhraseQuery的一种特殊形式而已,如果给MultiPhraseQuery传递的每个数组里面只有一个term,那就退化成PhraseQuery了。在MultiPhraseQuery中,一个数组内的元素匹配时是 或(OR) 的关系,也就是这些term共享同一个position。 还记得之前的文章中我们说过在同一个position放多个term,可以实现同义词的搜索。的确MultiPhraseQuery实际中主要用于同义词的查询。比如查询一个“我爱土豆”,那可以构造这样两个数组传递给MultiPhraseQuery查询:["喜欢",“爱”], ["土豆","马铃薯","洋芋"],这样查出来的结果就会更全面一些。

最后来个例子:

private static void multiPhraseQueryDemo(IndexSearcher searcher) throws IOException {
    System.out.println("MultiPhraseQuery:");

    // On Death 一诗中有这样一句: I know not what into my ear
    // Fog 一诗中有这样一句: It sits looking over harbor and city
    // 以下的查询可以匹配 "know harbor, know not, over harbor, over not" 4种情况
    MultiPhraseQuery.Builder builder = new MultiPhraseQuery.Builder();
    Term[] termArray1 = new Term[2];
    termArray1[0] = new Term(SEARCH_FIELD, "know");
    termArray1[1] = new Term(SEARCH_FIELD, "over");
    Term[] termArray2 = new Term[2];
    termArray2[0] = new Term(SEARCH_FIELD, "harbor");
    termArray2[1] = new Term(SEARCH_FIELD, "not");
    builder.add(termArray1);
    builder.add(termArray2);
    MultiPhraseQuery multiPhraseQuery = builder.build();

    resultPrint(searcher, multiPhraseQuery);
}

// 程序输出
MultiPhraseQuery:
2 result(s) matched: 
doc=0 score=2.7032354 file: data/poems/Fog.txt
doc=3 score=2.4798129 file: data/poems/OnDeath.txt

PrefixQuery, WildcardQuery, RegexpQuery

这三个查询提供模糊模糊查询的功能:

  • PrefixQuery只支持指定前缀模糊查询,用户指定一个前缀,查询时会匹配所有该前缀开头的term。
  • WildcardQuery比PrefixQuery更进一步,支持 *(匹配0个或多个字符)和 ?(匹配一个字符) 两个通配符。从效果上看,PrefixQuery是WildcardQuery的一种特殊情况,但其底层不是基于WildcardQuery,而是另外一种单独的实现。
  • RegexpQuery是比WildcardQuery更宽泛的查询,它支持正则表达式。支持的正则语法范围见org.apache.lucene.util.automaton.RegExp类。

需要注意,WildcardQuery和RegexpQuery的性能会差一些,因为它们需要遍历很多文档。特别是极力不推荐以模糊匹配开头。当然这里的差是相对其它查询来说的,我粗略测试过,2台16C+32G的ES,比较简短的文档,千万级以下的查询也能毫秒级返回。最后看几个使用的例子:

private static void prefixQueryDemo(IndexSearcher searcher) throws IOException {
    System.out.println("PrefixQuery, search terms begin with 'co'");
    PrefixQuery prefixQuery = new PrefixQuery(new Term(SEARCH_FIELD, "co"));

    resultPrint(searcher, prefixQuery);
}

private static void wildcardQueryDemo(IndexSearcher searcher) throws IOException {
    System.out.println("WildcardQuery, search terms 'har*'");
    WildcardQuery wildcardQuery = new WildcardQuery(new Term(SEARCH_FIELD, "har*"));

    resultPrint(searcher, wildcardQuery);
}

private static void regexpQueryDemo(IndexSearcher searcher) throws IOException {
    System.out.println("RegexpQuery, search regexp 'l[ao]*'");
    RegexpQuery regexpQuery = new RegexpQuery(new Term(SEARCH_FIELD, "l[ai].*"));

    resultPrint(searcher, regexpQuery);
}


// 程序输出
PrefixQuery, search terms begin with 'co'
2 result(s) matched: 
doc=0 score=1.0 file: data/poems/Fog.txt
doc=2 score=1.0 file: data/poems/Love'sSecret.txt

WildcardQuery, search terms 'har*'
1 result(s) matched: 
doc=0 score=1.0 file: data/poems/Fog.txt

RegexpQuery, search regexp 'l[ao]*'
2 result(s) matched: 
doc=0 score=1.0 file: data/poems/Fog.txt
doc=3 score=1.0 file: data/poems/OnDeath.txt

FuzzyQuery

FuzzyQuery和PhraseQuery一样,都是基于上面介绍的edit distance做匹配的,差异是在PhraseQuery中搜索词的是一个term序列,此时edit distance中定义的一个symbol就是一个词;而FuzzyQuery的搜索词就是一个term,所以它对应的edit distance中的symbol就是一个字符了。另外使用时还有几个注意点:

  • PhraseQuery采用Levenshtein distance计算edit distance,即相邻symbol交换是2个slop,而FuzzyQuery默认使用Damerau–Levenshtein distance,所以相邻symbol交换是1个slop,但支持用户使用Levenshtein distance。
  • FuzzyQuery限制最大允许的edit distance为2(LevenshteinAutomata.MAXIMUM_SUPPORTED_DISTANCE值限定),因为对于更大的edit distance会匹配出特别多的词,但FuzzyQuery的定位是解决诸如美式英语和英式英语在拼写上的细微差异。
  • FuzzyQuery匹配的时候还有个要求就是搜索的term和待匹配的term的edit distance必须小于它们二者长度的最小值。比如搜索词为"abcd",设定允许的maxEdits(允许的最大edit distance)为2,那么按照edit distance的计算方式"ab"这个词是匹配的,因为它们的距离是2,不大于设定的maxEdits。但是,由于 2 < min( len("abcd"), len("ab") ) = 2不成立,所以算不匹配。

最后看个例子:

private static void fuzzyQueryDemo(IndexSearcher searcher) throws IOException {
    System.out.println("FuzzyQuery, search 'remembre'");
    // 这里把remember拼成了remembre
    FuzzyQuery fuzzyQuery = new FuzzyQuery(new Term(SEARCH_FIELD, "remembre"), 1);

    resultPrint(searcher, fuzzyQuery);
}

// 程序输出
FuzzyQuery, search 'remembre'
1 result(s) matched: 
doc=1 score=0.4473783 file: data/poems/FreedomAndLove.txt

PointRangeQuery

前面介绍Field的时候,我们介绍过几种常用的数值型Field:IntPoint、LongPoint、FloatPoint、DoublePoint。PointRangeQuery就是给数值型数据提供范围查询的一个Query,功能和原理都很简单,我们直接看一个完整的例子吧:

/**
 * Point Query Demo.
 *
 * @author NiYanchun
 **/
public class PointQueryDemo {

    public static void main(String[] args) throws Exception {
        // 索引保存目录
        final String indexPath = "indices/point-index";
        Directory indexDir = FSDirectory.open(Paths.get(indexPath));
        IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
        iwc.setOpenMode(IndexWriterConfig.OpenMode.CREATE);
        IndexWriter writer = new IndexWriter(indexDir, iwc);

        // 向索引中插入10条document,每个document包含一个field字段,字段值是0~10之间的数字
        for (int i = 0; i < 10; i++) {
            Document doc = new Document();
            Field pointField = new IntPoint("field", i);
            doc.add(pointField);
            writer.addDocument(doc);
        }
        writer.close();

        // 查询
        IndexReader indexReader = DirectoryReader.open(FSDirectory.open(Paths.get(indexPath)));
        IndexSearcher searcher = new IndexSearcher(indexReader);

        // 查询field字段值在[5, 8]范围内的文档
        Query query = IntPoint.newRangeQuery("field", 5, 8);
        TopDocs topDocs = searcher.search(query, 10);

        if (topDocs.totalHits.value == 0) {
            System.out.println("not found!");
            return;
        }

        ScoreDoc[] hits = topDocs.scoreDocs;

        System.out.println(topDocs.totalHits.value + " result(s) matched: ");
        for (ScoreDoc hit : hits) {
            System.out.println("doc=" + hit.doc + " score=" + hit.score);
        }
    }
}

// 程序输出
4 result(s) matched: 
doc=5 score=1.0
doc=6 score=1.0
doc=7 score=1.0
doc=8 score=1.0

完整代码见这里

TermRangeQuery

TermRangeQuery和PointRangeQuery功能类似,不过它比较的是字符串,而非数值。比较基于org.apache.lucene.util.BytesRef.compareTo(BytesRef other)方法。直接看例子:

private static void termRangeQueryDemo(IndexSearcher searcher) throws IOException {
    System.out.println("TermRangeQuery, search term between 'loa' and 'lov'");
    // 后面的true和false分别表示 loa <= 待匹配的term < lov
    TermRangeQuery termRangeQuery = new TermRangeQuery(SEARCH_FIELD, new BytesRef("loa"), new BytesRef("lov"), true, false);

    resultPrint(searcher, termRangeQuery);
}

// 程序输出
TermRangeQuery, search term between 'loa' and 'lov'
1 result(s) matched: 
doc=0 score=1.0 file: data/poems/Fog.txt    // Fog中的term 'looking' 符合搜索条件

ConstantScoreQuery

ConstantScoreQuery很简单,它的功能是将其它查询包装起来,并将它们查询结果中的评分改为一个常量值(默认为1.0)。上面FuzzyQuery一节里面最后举得例子中返回的查询结果score=0.4473783,现在我们用ConstantScoreQuery包装一下看下效果:

private static void constantScoreQueryDemo(IndexSearcher searcher) throws IOException {
    System.out.println("ConstantScoreQuery:");
    ConstantScoreQuery constantScoreQuery = new ConstantScoreQuery(
            new FuzzyQuery(new Term(SEARCH_FIELD, "remembre"), 1));

    resultPrint(searcher, constantScoreQuery);
}

// 运行结果
ConstantScoreQuery:
1 result(s) matched: 
doc=1 score=1.0 file: data/poems/FreedomAndLove.txt

另外有个知识点需要注意:ConstantScoreQuery嵌套Filter和BooleanQuery嵌套Filter的查询结果不考虑评分的话是一样的,但前面在BooleanQuery中介绍过Filter,其功能与MUST相同,但不计算评分;而ConstantScoreQuery就是用来设置一个评分的。所以两者的查询结果是一样的,但ConstantScoreQuery嵌套Filter返回结果是附带评分的,而BooleanQuery嵌套Filter的返回结果是没有评分的(score字段的值为0)。

MatchAllDocsQuery

这个查询很简单,就是匹配所有文档,用于没有特定查询条件,只想预览部分数据的场景。直接看例子:

private static void matchAllDocsQueryDemo(IndexSearcher searcher) throws IOException {
    System.out.println("MatchAllDocsQueryDemo:");
    MatchAllDocsQuery matchAllDocsQuery = new MatchAllDocsQuery();

    resultPrint(searcher, matchAllDocsQuery);
}

// 程序输出
MatchAllDocsQueryDemo:
4 result(s) matched: 
doc=0 score=1.0 file: data/poems/Fog.txt
doc=1 score=1.0 file: data/poems/FreedomAndLove.txt
doc=2 score=1.0 file: data/poems/Love'sSecret.txt
doc=3 score=1.0 file: data/poems/OnDeath.txt

References

]]>
<![CDATA[Lucene系列(7)——索引存储文件介绍]]> http://niyanchun.com/lucene-learning-7.html 2019-10-19T08:00:00+08:00 2019-10-19T08:00:00+08:00 NYC https://niyanchun.com 注:本文基于Lucene 8.2.0 版本。

本文讨论Lucene底层索引数据存储。对于绝大数多人来说了解Lucene的上层概念足矣,无需关注底层的存储格式。所以本文虽然是讨论底层数据存储的,但也不会深入到具体的数据结构、压缩算法等。如果你有兴趣,可以查看对应版本的Lucene Java doc(8.2.0版本的链接已经附在文末)。另外,如果你对index、document、term、segment、term vector、norm等上层概念还不清楚,建议先阅读该系列文章的前几篇。

索引文件格式

不论是Solr还是ES,底层index的存储都是完全使用Lucene原生的方式,没有做改变,所以本文会以ES为例来介绍。需要注意的是Lucene的index在ES中称为shard,本文中提到的index都指的是Lucene的index,即ES中的shard。先来看一个某个index的数据目录:

index-file

可以看到一个索引包含了很多文件,似乎很复杂。但仔细观察之后会发现乱中似乎又有些规律:很多文件前缀一样,只是后缀不同,比如有很多_3c开头的文件。回想一下之前文章的介绍,index由若干个segment组成,而一个index目录下前缀相同表示这些文件都属于同一个segment

那各种各样的后缀又代表什么含义呢?Lucene存储segment时有两种方式:

  • multifile格式。该模式下会产生很多文件,不同的文件存储不同的信息,其弊端是读取index时需要打开很多文件,可能造成文件描述符超出系统限制。
  • compound格式。一般简写为CFS(Compound File System),该模式下会将很多小文件合并成一个大文件,以减少文件描述符的使用。

我们先来介绍multifile格式下的各个文件:

  • write.lock:每个index目录都会有一个该文件,用于防止多个IndexWriter同时写一个文件。
  • segments_N:该文件记录index所有segment的相关信息,比如该索引包含了哪些segment。IndexWriter每次commit都会生成一个(N的值会递增),新文件生成后旧文件就会删除。所以也说该文件用于保存commit point信息。

上面这两个文件是针对当前index的,所以每个index目录下都只会有1个(segments_N可能因为旧的没有及时删除临时存在两个)。下面介绍的文件都是针对segment的,每个segment就会有1个。

  • .siSegment Info的缩写,用于记录segment的一些元数据信息。
  • .fnmFields,用于记录fields设置类信息,比如字段的index option信息,是否存储了norm信息、DocValue等。
  • .fdtField Data,存储字段信息。当通过StoredField或者Field.Store.YES指定存储原始field数据时,这些数据就会存储在该文件中。
  • .fdxField Index.fdt文件的索引/指针。通过该文件可以快速从.fdt文件中读取field数据。
  • .docFrequencies,存储了一个documents列表,以及它们的term frequency信息。
  • .posPositions,和.doc类似,但保存的是position信息。
  • .pay:Payloads,和.doc类似,但保存的是payloads和offset信息。
  • .timTerm Dictionary,存储所有文档analyze出来的term信息。同时还包含term对应的document number以及若干指向.doc, .pos, .pay的指针,从而可以快速获取term的term vector信息。。
  • .tipTerm Index,该文件保存了Term Dictionary的索引信息,使得可以对Term Dictionary进行随机访问。
  • .nvd, .nvmNorms,这两个都是用来存储Norms信息的,前者用于存储norms的数据,后者用于存储norms的元数据。
  • .dvd, .dvmPer-Document Values,这两个都是用来存储DocValues信息的,前者用于数据,后者用于存储元数据。
  • .tvdTerm Vector Data,用于存储term vector数据。
  • .tvxTerm Vector Index,用于存储Term Vector Data的索引数据。
  • .livLive Documents,用于记录segment中哪些documents没有被删除。一般不存在该文件,表示segment内的所有document都是live的。如果有documents被删除,就会产生该文件。以前是使用一个.del后缀的文件来记录被删除的documents,现在改为使用该文件了。
  • .dim,.diiPoint values,这两个文件用于记录indexing的Point信息,前者保存数据,后者保存索引/指针,用于快速访问前者。

上面介绍了很多文件类型,实际中不一定都有,如果indexing阶段不保存字段的term vector信息,那存储term vector的相关文件可能就不存在。如果一个index的segment非常多,那将会有非常非常多的文件,检索时,这些文件都是要打开的,很可能会造成文件描述符不够用,所以Lucene引入了前面介绍的CFS格式,它把上述每个segment的众多文件做了一个合并压缩(.liv.si没有被合并,依旧单独写文件),最终形成了两个新文件:.cfs.cfe,前者用于保存数据,后者保存了前者的一个Entry Table,用于快速访问。所以,如果使用CFS的话,最终对于每个segment,最多就只存在.cfs, .cfe, .si, .liv4个文件了。Lucene从1.4版本开始,默认使用CFS来保存segment数据,但开发者仍然可以选择使用multifile格式。一般来说,对于小的segment使用CFS,对于大的segment,使用multifile格式。比如Lucene的org.apache.lucene.index.MergePolicy构造函数中就提供merge时在哪些条件下使用CFS:

  /**
   * Default ratio for compound file system usage. Set to <tt>1.0</tt>, always use 
   * compound file system.
   */
  protected static final double DEFAULT_NO_CFS_RATIO = 1.0;

  /**
   * Default max segment size in order to use compound file system. Set to {@link Long#MAX_VALUE}.
   */
  protected static final long DEFAULT_MAX_CFS_SEGMENT_SIZE = Long.MAX_VALUE;

  /** If the size of the merge segment exceeds this ratio of
   *  the total index size then it will remain in
   *  non-compound format */
  protected double noCFSRatio = DEFAULT_NO_CFS_RATIO;
  
  /** If the size of the merged segment exceeds
   *  this value then it will not use compound file format. */
  protected long maxCFSSegmentSize = DEFAULT_MAX_CFS_SEGMENT_SIZE;

  /**
   * Creates a new merge policy instance.
   */
  public MergePolicy() {
    this(DEFAULT_NO_CFS_RATIO, DEFAULT_MAX_CFS_SEGMENT_SIZE);
  }
  
  /**
   * Creates a new merge policy instance with default settings for noCFSRatio
   * and maxCFSSegmentSize. This ctor should be used by subclasses using different
   * defaults than the {@link MergePolicy}
   */
  protected MergePolicy(double defaultNoCFSRatio, long defaultMaxCFSSegmentSize) {
    this.noCFSRatio = defaultNoCFSRatio;
    this.maxCFSSegmentSize = defaultMaxCFSSegmentSize;
  }

接下来让我们使用ES做一些操作来具体感受一下。

一些例子

首先在ES中创建一个索引:

PUT nyc-test
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    "refresh_interval": -1
  }
}

这里设置1个shard,0个副本,并且将refresh_interval设置为-1,表示不自动刷新。创建完之后就可以在es的数据目录找到该索引,es的后台索引的目录结构为:<数据目录>/nodes/0/indices/<索引UUID>/<shard>/index,这里的shard就是Lucene的index。我们看下刚创建的index的目录:

-> % ll
总用量 4.0K
-rw-rw-r-- 1 allan allan 230 10月 11 21:45 segments_2
-rw-rw-r-- 1 allan allan   0 10月 11 21:45 write.lock

可以看到,现在还没有写入任何数据,所以只有index级别的segments_Nwrite.lock文件,没有segment级别的文件。写入1条数据并查看索引目录的变化:

PUT nyc-test/doc/1
{
  "name": "Jack"
}

# 查看索引目录
-> % ll
总用量 4.0K
-rw-rw-r-- 1 allan allan   0 10月 11 22:20 _0.fdt
-rw-rw-r-- 1 allan allan   0 10月 11 22:20 _0.fdx
-rw-rw-r-- 1 allan allan 230 10月 11 22:19 segments_2
-rw-rw-r-- 1 allan allan   0 10月 11 22:19 write.lock

可以看到出现了1个segment的数据,因为ES把数据缓存在内存里面,所以文件大小为0。然后再写入1条数据,并查看目录变化:

PUT nyc-test/doc/2
{
  "name": "Allan"
}

# 查看目录
-> % ll
总用量 4.0K
-rw-rw-r-- 1 allan allan   0 10月 11 22:20 _0.fdt
-rw-rw-r-- 1 allan allan   0 10月 11 22:20 _0.fdx
-rw-rw-r-- 1 allan allan 230 10月 11 22:19 segments_2
-rw-rw-r-- 1 allan allan   0 10月 11 22:19 write.lock

因为ES缓存机制的原因,目录没有变化。显式的refresh一下,让内存中的数据落地:

POST nyc-test/_refresh

-> % ll
总用量 16K
-rw-rw-r-- 1 allan allan  405 10月 11 22:22 _0.cfe
-rw-rw-r-- 1 allan allan 2.5K 10月 11 22:22 _0.cfs
-rw-rw-r-- 1 allan allan  393 10月 11 22:22 _0.si
-rw-rw-r-- 1 allan allan  230 10月 11 22:19 segments_2
-rw-rw-r-- 1 allan allan    0 10月 11 22:19 write.lock

ES的refresh操作会将内存中的数据写入到一个新的segment中,所以refresh之后写入的两条数据形成了一个segment,并且使用CFS格式存储了。然后再插入1条数据,接着update这条数据:

PUT nyc-test/doc/3
{
  "name": "Patric"
}

# 查看
-> % ll
总用量 16K
-rw-rw-r-- 1 allan allan  405 10月 11 22:22 _0.cfe
-rw-rw-r-- 1 allan allan 2.5K 10月 11 22:22 _0.cfs
-rw-rw-r-- 1 allan allan  393 10月 11 22:22 _0.si
-rw-rw-r-- 1 allan allan    0 10月 11 22:23 _1.fdt
-rw-rw-r-- 1 allan allan    0 10月 11 22:23 _1.fdx
-rw-rw-r-- 1 allan allan  230 10月 11 22:19 segments_2
-rw-rw-r-- 1 allan allan    0 10月 11 22:19 write.lock

# 更新数据
PUT nyc-test/doc/3?refresh=true
{
  "name": "James"
}

# 查看
-> % ll
总用量 32K
-rw-rw-r-- 1 allan allan  405 10月 11 22:22 _0.cfe
-rw-rw-r-- 1 allan allan 2.5K 10月 11 22:22 _0.cfs
-rw-rw-r-- 1 allan allan  393 10月 11 22:22 _0.si
-rw-rw-r-- 1 allan allan   67 10月 11 22:24 _1_1.liv
-rw-rw-r-- 1 allan allan  405 10月 11 22:24 _1.cfe
-rw-rw-r-- 1 allan allan 2.5K 10月 11 22:24 _1.cfs
-rw-rw-r-- 1 allan allan  393 10月 11 22:24 _1.si
-rw-rw-r-- 1 allan allan  230 10月 11 22:19 segments_2
-rw-rw-r-- 1 allan allan    0 10月 11 22:19 write.lock

可以看到,再次refresh的时候又形成了一个新的segment,并且因为update,导致删掉了1条document,所以产生了一个.liv文件。但前面的这些流程中,segments_N文件也就是segments_2一直没有变过,这是因为一直没有Lucene概念中的commit操作发生过。ES的flush操作对应的是Lucene的commit,我们触发一次Lucene commit看下变化:

# 触发Lucene commit
POST nyc-test/_flush?wait_if_ongoing

# 查看目录
-> % ll
总用量 32K
-rw-rw-r-- 1 allan allan  405 10月 11 22:22 _0.cfe
-rw-rw-r-- 1 allan allan 2.5K 10月 11 22:22 _0.cfs
-rw-rw-r-- 1 allan allan  393 10月 11 22:22 _0.si
-rw-rw-r-- 1 allan allan   67 10月 11 22:24 _1_1.liv
-rw-rw-r-- 1 allan allan  405 10月 11 22:24 _1.cfe
-rw-rw-r-- 1 allan allan 2.5K 10月 11 22:24 _1.cfs
-rw-rw-r-- 1 allan allan  393 10月 11 22:24 _1.si
-rw-rw-r-- 1 allan allan  361 10月 11 22:25 segments_3
-rw-rw-r-- 1 allan allan    0 10月 11 22:19 write.lock

# 查看segment信息
GET _cat/segments/nyc-test?v

index    shard prirep ip        segment generation docs.count docs.deleted  size size.memory committed searchable version compound
nyc-test 0     p      10.8.4.42 _0               0          2            0 3.2kb        1184 true      true       7.4.0   true
nyc-test 0     p      10.8.4.42 _1               1          1            2 3.2kb        1184 true      true       7.4.0   true

触发Lucene commit之后,可以看到segments_2变成了segments_3。然后调用_cat接口查看索引的segment信息也能看到目前有2个segment,而且都已经commit过了,并且compound是true,表示是CFS格式存储的。当然Lucene的segment是可以合并的。我们通过ES的forcemerge接口进行合并,并且将所有segment合并成1个segment,forcemerge的时候会自动调用flush,即会触发Lucene commit:

POST nyc-test/_forcemerge?max_num_segments=1

-> % ll
总用量 60K
-rw-rw-r-- 1 allan allan  69 10月 11 22:27 _2.dii
-rw-rw-r-- 1 allan allan 123 10月 11 22:27 _2.dim
-rw-rw-r-- 1 allan allan 142 10月 11 22:27 _2.fdt
-rw-rw-r-- 1 allan allan  83 10月 11 22:27 _2.fdx
-rw-rw-r-- 1 allan allan 945 10月 11 22:27 _2.fnm
-rw-rw-r-- 1 allan allan 110 10月 11 22:27 _2_Lucene50_0.doc
-rw-rw-r-- 1 allan allan  80 10月 11 22:27 _2_Lucene50_0.pos
-rw-rw-r-- 1 allan allan 287 10月 11 22:27 _2_Lucene50_0.tim
-rw-rw-r-- 1 allan allan 145 10月 11 22:27 _2_Lucene50_0.tip
-rw-rw-r-- 1 allan allan 100 10月 11 22:27 _2_Lucene70_0.dvd
-rw-rw-r-- 1 allan allan 469 10月 11 22:27 _2_Lucene70_0.dvm
-rw-rw-r-- 1 allan allan  59 10月 11 22:27 _2.nvd
-rw-rw-r-- 1 allan allan 100 10月 11 22:27 _2.nvm
-rw-rw-r-- 1 allan allan 572 10月 11 22:27 _2.si
-rw-rw-r-- 1 allan allan 296 10月 11 22:27 segments_4
-rw-rw-r-- 1 allan allan   0 10月 11 22:19 write.lock


GET _cat/segments/nyc-test?v

index    shard prirep ip        segment generation docs.count docs.deleted  size size.memory committed searchable version compound
nyc-test 0     p      10.8.4.42 _2               2          3            0 3.2kb        1224 true      true       7.4.0   false

可以看到,force merge之后只有一个segment了,并且使用了multifile格式存储,而不是compound。当然这并非Lucene的机制,而是ES自己的设计。

最后用图总结一下:

Lucene-Index-Files-Format.png

本文就介绍到这里,对于绝大多数使用者来说,只需要知道Lucene索引后台存储的组织逻辑和层次,以更好的使用Lucene及基于Lucene的产品即可。

References

]]>
<![CDATA[ES中Term Aggregation的准确性问题讨论]]> http://niyanchun.com/term-aggregation-accuracy-in-es.html 2019-10-13T07:28:00+08:00 2019-10-13T07:28:00+08:00 NYC https://niyanchun.com 本文讨论ElasticSearch Bucket Aggregations中的Term Aggregation结果准确性的问题,所有测试基于ES 7.1.0版本,DSL语句通过Kibana dev tool执行。

问题演示

我们先来看一个查询不准确的例子,该例子借鉴自官方文档(链接见文末),但为了操作方便,减小了数据规模。创建一个名为products,包含3个shard,0个副本的索引,及一个keyword类型的 name 字段。通过以下命令创建该索引,并写入一些测试数据:

# 创建索引
PUT products
{
  "settings": {
    "number_of_shards": 3, 
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "name": {
        "type": "keyword"
      }
    }
  }
}

# 写入测试数据
PUT products/_bulk?refresh
{ "index" :{ "routing": "key1"}}
{ "name": "A" }
{ "index" :{ "routing": "key1"}}
{ "name": "A" }
{ "index" :{ "routing": "key1"}}
{ "name": "A" }
{ "index" :{ "routing": "key1"}}
{ "name": "A" }
{ "index" :{ "routing": "key1"}}
{ "name": "A" }
{ "index" :{ "routing": "key1"}}
{ "name": "A" }
{ "index" :{ "routing": "key1"}}
{ "name": "B" }
{ "index" :{ "routing": "key1"}}
{ "name": "B" }
{ "index" :{ "routing": "key1"}}
{ "name": "C" }
{ "index" :{ "routing": "key1"}}
{ "name": "C" }
{ "index" :{ "routing": "key1"}}
{ "name": "C" }
{ "index" :{ "routing": "key1"}}
{ "name": "C" }
{ "index" :{ "routing": "key1"}}
{ "name": "C" }
{ "index" :{ "routing": "key1"}}
{ "name": "D" }
{ "index" :{ "routing": "key2"}}
{ "name": "A" }
{ "index" :{ "routing": "key2"}}
{ "name": "A" }
{ "index" :{ "routing": "key2"}}
{ "name": "A" }
{ "index" :{ "routing": "key2"}}
{ "name": "A" }
{ "index" :{ "routing": "key2"}}
{ "name": "A" }
{ "index" :{ "routing": "key2"}}
{ "name": "B" }
{ "index" :{ "routing": "key2"}}
{ "name": "B" }
{ "index" :{ "routing": "key2"}}
{ "name": "B" }
{ "index" :{ "routing": "key2"}}
{ "name": "D" }
{ "index" :{ "routing": "key2"}}
{ "name": "D" }
{ "index" :{ "routing": "key2"}}
{ "name": "D" }
{ "index" :{ "routing": "key2"}}
{ "name": "D" }
{ "index" :{ "routing": "key3"}}
{ "name": "A" }
{ "index" :{ "routing": "key3"}}
{ "name": "A" }
{ "index" :{ "routing": "key3"}}
{ "name": "A" }
{ "index" :{ "routing": "key3"}}
{ "name": "A" }
{ "index" :{ "routing": "key3"}}
{ "name": "A" }
{ "index" :{ "routing": "key3"}}
{ "name": "B" }
{ "index" :{ "routing": "key3"}}
{ "name": "B" }
{ "index" :{ "routing": "key3"}}
{ "name": "B" }
{ "index" :{ "routing": "key3"}}
{ "name": "C" }
{ "index" :{ "routing": "key3"}}
{ "name": "D" }
{ "index" :{ "routing": "key3"}}
{ "name": "D" }

# 查看数据分布
# GET _cat/shards/products?v
index    shard prirep state   docs store ip         node
products 2     p      STARTED   12 3.6kb 172.19.0.5 es7_01
products 1     p      STARTED   11 3.6kb 172.19.0.2 es7_02
products 0     p      STARTED   14 3.7kb 172.19.0.5 es7_01

通过上面的一些命令,我们构造了一个如下分布的索引数据:

shard

products索引包含A、B、C、D四种产品,每个产品都包含了若干条文档,且分布在不同的shard上面,总体情况如下:

  • 37 条数据,Shard 0上 14 条,Shard 1上 11 条, Shard 2上 12 条。
  • 产品 A 共 16 条数据,产品 B 共 8 条数据,产品 C 共 6 条数据,产品 D 共 7 条数据。

现在我们使用Term Aggregation求一下产品个数的Top 2产品:

GET products/_search
{
  "size": 0, 
  "aggs": {
    "product": {
      "terms": {
        "field": "name",
        "size": 2,
        "shard_size": 2
      }
    }
  }
}

返回结果如下:

{
  "took" : 5,
  "timed_out" : false,
  "_shards" : {
    "total" : 3,
    "successful" : 3,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 37,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "product" : {
      "doc_count_error_upper_bound" : 12,
      "sum_other_doc_count" : 16,
      "buckets" : [
        {
          "key" : "A",
          "doc_count" : 16
        },
        {
          "key" : "C",
          "doc_count" : 5
        }
      ]
    }
  }
}

我们关注一下aggregations部分,Term Aggregation默认是按照文档个数 doc_count 降序排列的,所以可以看到Top 2的结果是A(16条),C(5条)。但根据前面构造的数据我们知道正确的Top 2产品应该是A(16条),B(8条)。那为什么ES计算的结果是不准确的呢?

因为ES的是分布式的,所以查询、聚集等任务会分派到各个节点,每个节点查询自己上面的所有Shard并返回计算结果,再由一个协调节点(Coordinating Node)汇总各个节点返回的数据,得到最终结果。以上面的查询为例,计算的是Top 2(即 size 的值为2),同时指定了 shard_size 为2,即每个shard返回2条数据(shard_size的默认值是size * 1.5 + 10,因为构造的原始数据量太小,如果使用默认值,每个节点就返回了几乎全部数据,不会存在准确性问题,所以这里显式的设置为2,用来说明问题)。具体执行的时候,每个shard会遍历自己的所有数据,求出Top M的数据(M为shard_size指定的值),在上例中M为2,所以各个Shard计算的Top 2为:

  • Shard 0 计算的Top 2:A(6), C(5)
  • Shard 1 计算的Top 2:A(5), B(3)
  • Shard 2 计算的Top 2:A(5), D(4)

协调节点汇总这些数据之后得到的结果为:A(16), B(3), C(5), D(4),所以最终返回的Top 2为A(16), C(5),即上面查询的返回结果。可以看到最终结果的不准确还是因为在计算的过程中没有任何一个环节能看到全量的数据。这其实也是分布式计算中的经典问题,CAP理论的一个具体体现。要保证快速响应,就无法一次计算全量数据;如果计算全量数据,就无法保证响应速度。比如流式计算实时性高,但并不是accurate的;离线计算虽然是accurate的,但又是非实时的,所以就诞生了Lambda架构:使用流式计算做预处理保证实时性及局部准确性,再由离线计算保证全局的accuracy。当然最新的流式框架已经在通过state机制解决流式计算的accuracy问题了,有兴趣的可以阅读《Streaming Systems》一书。

正因为结算结果可能存在的不准确性,Term Aggregation在返回结果中提供了一些评估不准确性的字段,总共有三个:

  • 全局的doc_count_error_upper_bound
  • sum_other_doc_count
  • 每个bucket的doc_count_error_upper_bound

其中前两个默认就会返回,最后一个需要在查询时显式的设置:

GET products/_search
{
  "size": 0, 
  "aggs": {
    "product": {
      "terms": {
        "field": "name",
        "size": 2,
        "shard_size": 2,
        "show_term_doc_count_error": true        # 将此参数设置为true
      }
    }
  }
}

# 返回结果
{
  "took" : 7,
  "timed_out" : false,
  "_shards" : {
    "total" : 3,
    "successful" : 3,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 37,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "product" : {
      "doc_count_error_upper_bound" : 12,
      "sum_other_doc_count" : 16,
      "buckets" : [
        {
          "key" : "A",
          "doc_count" : 16,
          "doc_count_error_upper_bound" : 0   # 多出了一项
        },
        {
          "key" : "C",
          "doc_count" : 5,
          "doc_count_error_upper_bound" : 7   # 多出了一项
        }
      ]
    }
  }
}

下面我们来分析这三个用于评估准确性的值。

1. 全局的doc_count_error_upper_bound

这个值的计算方式是累加每个shard返回的最后一个term的doc_count值。上例中,每个Shard返回的最后一个term及其doc_count依次为:C(5), B(3), D(4), 所以doc_count_error_upper_bound的值为 5+3+4=12。

该值代表的含义是某个没有被纳入到最终结果的term的最大可能的doc_count值。举个例子,比如有一个产品E分别分布在三个shard上面,且依次为E(5), E(3), E(4),因为它和每个shard上的前一个term的doc_count相同,所以可能不会被最终返回,而它的实际doc_count就是5+3+4=12。也就是最坏的情况下可能有一个doc_count为12(在上面的例子中按照doc_count应该排在第2位)的一个产品没有返回到最终Top N的结果中。

2. 每个bucket的doc_count_error_upper_bound

这个值的计算方式是累加没有返回该bucket的所有Shard的最后一个term的doc_count值。有些拗口,上例中,比如对于A的bucket,其doc_count_error_upper_bound值为0,是因为3个shard的返回中都包含了A。而对于C的bucket,Shard 1和Shard 2都没有返回C,所以它的doc_count_error_upper_bound值就是Shard 1和Shard 2的最后一个term的doc_count值之和,即 3+4=7.

3. sum_other_doc_count

这个值的含义是最终没有被返回的doc的总数,计算方式是总doc数减去返回的doc数。上例中,总的doc数为37,返回的doc数为16+5=21,所以没返回的doc总数就是37-21=16.

该值代表的含义是最差情况下统计的doc_count的偏差。比如C的实际doc_count是6,而最终返回的结果是5,差了1. 而最差情况下可能的差值是7,这种情况就是Shard 1和Shard 2中都包含C,且doc_count依次为3和4,但因为它们和B、D的doc_count相同,可能被排在了后面,没有返回到最终结果里面。换句话说,如果该值为0,那就代表该bucket的doc_count值是准确的,否则就可能是不准确的,最大可能的偏差就是该字段的值。该值的另外一个作用就是可以作为shard_size值的参考。

最后需要注意的是,上面两个doc_count_error_upper_bound的值只有在采用doc_count进行降序排列(默认情况)时才会计算,doc_count升序或者采用其它字段排序时是不计算的(不计算的时候其值可能是0——按照_key排序,也可能是-1——按照sub aggregation排序)。

不准确的解决方案

那该如何解决这种不准确性呢?

  • 方案1:增大shard_size的值。很显然,shard_size的值如果不小于每个shard上面的doc数量,就相当于把所有shard上面符合过滤条件(如果有的话)的数据全部发送给协调节点去处理,也就是全局全量数据处理了,自然不会出现准确性问题。但如果数据量比较大,显然协调节点会成为处理瓶颈。所以该方案在实际中存在一定的限制,只能在情况允许的情况下增加shard_size的值以提高准确性,但不可能无限制的增加。
  • 方案2:将所有数据存储到一个shard上面。这样就避免了分布式计算,自然也不会产生不准确的问题。但如果数据量很大,就会造成shard很大,单个计算很容易成为瓶颈,所以该方案仅适用于数据量不是很大的场景,比如一些指标数据。
  • 方案3:索引时使用Term Aggregation的field字段做routing,保证同一个bucket的数据存储在同一个shard上面,这样也可以避免不准确的问题。潜在的风险就是如果这个field的数据分布不均匀,那最终的存储就可能不均衡。

每种方案都有利有弊,需要根据实际情况进行选择。

References

  1. Elasticsearch Reference#Term Aggregation
]]>
<![CDATA[详解ElasticSearch中的路由(_routing)机制]]> http://niyanchun.com/routing-in-es.html 2019-09-30T22:23:00+08:00 2019-09-30T22:23:00+08:00 NYC https://niyanchun.com 注:本文的所有测试基于ES 7.1.0版本。

ES中的路由(routing)机制决定一个document存储到索引的哪个shard上面去,即文档到shard的路由。计算公式为:

shard_num = hash(_routing) % num_primary_shards

其中_routing是路由字段的值,默认使用文档的ID字段:_id。如果我们想自己控制数据的路由规则的话,那可以修改这个默认值。修改的方式非常简单,只需要在插入数据的时候指定路由的key即可。虽然使用简单,但有许多的细节需要注意。我们从一个例子看起(注:本文关于ES的命令都是在Kibana dev tool中执行的):

// 步骤1:先创建一个名为route_test的索引,该索引有3个shard,0个副本
PUT route_test/
{
  "settings": {
    "number_of_shards": 2,
    "number_of_replicas": 0
  }
}

// 步骤2:查看shard
GET _cat/shards/route_test?v
index      shard prirep state   docs store ip         node
route_test 1     p      STARTED    0  230b 172.19.0.2 es7_02
route_test 0     p      STARTED    0  230b 172.19.0.5 es7_01

// 步骤3:插入第1条数据
PUT route_test/_doc/a?refresh
{
  "data": "A"
}

// 步骤4:查看shard
GET _cat/shards/route_test?v
index      shard prirep state   docs store ip         node
route_test 1     p      STARTED    0  230b 172.19.0.2 es7_02
route_test 0     p      STARTED    1 3.3kb 172.19.0.5 es7_01

// 步骤5:插入第2条数据
PUT route_test/_doc/b?refresh
{
  "data": "B"
}

// 步骤6:查看数据
GET _cat/shards/route_test?v
index      shard prirep state   docs store ip         node
route_test 1     p      STARTED    1 3.3kb 172.19.0.2 es7_02
route_test 0     p      STARTED    1 3.3kb 172.19.0.5 es7_01

// 步骤7:查看此时索引里面的数据
GET route_test/_search
{
  "took" : 5,
  "timed_out" : false,
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "route_test",
        "_type" : "_doc",
        "_id" : "a",
        "_score" : 1.0,
        "_source" : {
          "data" : "A"
        }
      },
      {
        "_index" : "route_test",
        "_type" : "_doc",
        "_id" : "b",
        "_score" : 1.0,
        "_source" : {
          "data" : "B"
        }
      }
    ]
  }
}

上面这个例子比较简单,先创建了一个拥有2个shard,0个副本(为了方便观察)的索引 route_test 。创建完之后查看两个shard的信息,此时shard为空,里面没有任何文档( docs 列为0)。接着我们插入了两条数据,每次插完之后,都检查shard的变化。通过对比可以发现 docid=a 的第一条数据写入了0号shard,docid=b 的第二条数据写入了1号 shard。需要注意的是这里的doc id我选用的是字母"a"和"b",而非数字。原因是连续的数字很容易路由到一个shard中去。以上的过程就是不指定routing时候的默认行为。接着,我们指定routing,看一些有趣的变化。

// 步骤8:插入第3条数据
PUT route_test/_doc/c?routing=key1&refresh
{
  "data": "C"
}

// 步骤9:查看shard
GET _cat/shards/route_test?v
index      shard prirep state   docs store ip         node
route_test 1     p      STARTED    1 3.4kb 172.19.0.2 es7_02
route_test 0     p      STARTED    2 6.9kb 172.19.0.5 es7_01

// 步骤10:查看索引数据
GET route_test/_search
{
  "took" : 5,
  "timed_out" : false,
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 3,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "route_test",
        "_type" : "_doc",
        "_id" : "a",
        "_score" : 1.0,
        "_source" : {
          "data" : "A"
        }
      },
      {
        "_index" : "route_test",
        "_type" : "_doc",
        "_id" : "c",
        "_score" : 1.0,
        "_routing" : "key1",
        "_source" : {
          "data" : "C"
        }
      },
      {
        "_index" : "route_test",
        "_type" : "_doc",
        "_id" : "b",
        "_score" : 1.0,
        "_source" : {
          "data" : "B"
        }
      }
    ]
  }
}

我们又插入了1条 docid=c 的新数据,但这次我们指定了路由,路由的值是一个字符串"key1". 通过查看shard信息,能看出这条数据路由到了0号shard。也就是说用"key1"做路由时,文档会写入到0号shard。接着我们使用该路由再插入两条数据,但这两条数据的 docid 分别为之前使用过的 "a"和"b",你猜一下最终结果会是什么样?

// 步骤11:插入 docid=a 的数据,并指定 routing=key1
PUT route_test/_doc/a?routing=key1&refresh
{
  "data": "A with routing key1"
}

// es的返回信息为:
{
  "_index" : "route_test",
  "_type" : "_doc",
  "_id" : "a",
  "_version" : 2,
  "result" : "updated",        // 注意此处为updated,之前的三次插入返回都为created
  "forced_refresh" : true,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 2,
  "_primary_term" : 1
}

// 步骤12:查看shard
GET _cat/shards/route_test?v
index      shard prirep state   docs  store ip         node
route_test 1     p      STARTED    1  3.4kb 172.19.0.2 es7_02
route_test 0     p      STARTED    2 10.5kb 172.19.0.5 es7_01

// 步骤13:查询索引
GET route_test/_search
{
  "took" : 6,
  "timed_out" : false,
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 3,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "route_test",
        "_type" : "_doc",
        "_id" : "c",
        "_score" : 1.0,
        "_routing" : "key1",
        "_source" : {
          "data" : "C"
        }
      },
      {
        "_index" : "route_test",
        "_type" : "_doc",
        "_id" : "a",
        "_score" : 1.0,
        "_routing" : "key1",
        "_source" : {
          "data" : "A with routing key1"
        }
      },
      {
        "_index" : "route_test",
        "_type" : "_doc",
        "_id" : "b",
        "_score" : 1.0,
        "_source" : {
          "data" : "B"
        }
      }
    ]
  }
}

之前 docid=a 的数据就在0号shard中,这次依旧写入到0号shard中了,因为docid重复,所以文档被更新了。然后再插入 docid=b 的数据:

// 步骤14:插入 docid=b的数据,使用key1作为路由字段的值
PUT route_test/_doc/b?routing=key1&refresh
{
  "data": "B with routing key1"
}

// es返回的信息
{
  "_index" : "route_test",
  "_type" : "_doc",
  "_id" : "b",
  "_version" : 1,
  "result" : "created",        // 注意这里不是updated
  "forced_refresh" : true,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 3,
  "_primary_term" : 1
}

// 步骤15:查看shard信息
GET _cat/shards/route_test?v
index      shard prirep state   docs store ip         node
route_test 1     p      STARTED    1 3.4kb 172.19.0.2 es7_02
route_test 0     p      STARTED    3  11kb 172.19.0.5 es7_01

// 步骤16:查询索引内容
{
  "took" : 6,
  "timed_out" : false,
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 4,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "route_test",
        "_type" : "_doc",
        "_id" : "c",
        "_score" : 1.0,
        "_routing" : "key1",
        "_source" : {
          "data" : "C"
        }
      },
      {
        "_index" : "route_test",
        "_type" : "_doc",
        "_id" : "a",
        "_score" : 1.0,
        "_routing" : "key1",
        "_source" : {
          "data" : "A with routing key1"
        }
      },
      {
        "_index" : "route_test",
        "_type" : "_doc",
        "_id" : "b",
        "_score" : 1.0,
        "_routing" : "key1",        // 和下面的 id=b 的doc相比,多了一个这个字段
        "_source" : {
          "data" : "B with routing key1"
        }
      },
      {
        "_index" : "route_test",
        "_type" : "_doc",
        "_id" : "b",
        "_score" : 1.0,
        "_source" : {
          "data" : "B"
        }
      }
    ]
  }
}

和步骤11插入docid=a 的那条数据相比,这次这个有些不同,我们来分析一下。步骤11中插入 docid=a 时,es返回的是updated,也就是更新了步骤2中插入的docid为a的数据,步骤12和13中查询的结果也能看出,并没有新增数据,route_test中还是只有3条数据。而步骤14插入 docid=b 的数据时,es返回的是created,也就是新增了一条数据,而不是updated原来docid为b的数据,步骤15和16的确也能看出多了一条数据,现在有4条数据。而且从步骤16查询的结果来看,有两条docid为b的数据,但一个有routing,一个没有。而且也能分析出有routing的在0号shard上面,没有的那个在1号shard上。

这个就是我们自定义routing后会导致的一个问题:docid不再全局唯一。ES shard的实质是Lucene的索引,所以其实每个shard都是一个功能完善的倒排索引。ES能保证docid全局唯一是采用do id作为了路由,所以同样的docid肯定会路由到同一个shard上面,如果出现docid重复,就会update或者抛异常,从而保证了集群内docid唯一标识一个doc。但如果我们换用其它值做routing,那这个就保证不了了,如果用户还需要docid的全局唯一性,那只能自己保证了。因为docid不再全局唯一,所以doc的增删改查API就可能产生问题,比如下面的查询:

GET route_test/_doc/b

// es返回
{
  "_index" : "route_test",
  "_type" : "_doc",
  "_id" : "b",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "data" : "B"
  }
}


GET route_test/_doc/b?routing=key1

// es返回
{
  "_index" : "route_test",
  "_type" : "_doc",
  "_id" : "b",
  "_version" : 1,
  "_seq_no" : 3,
  "_primary_term" : 1,
  "_routing" : "key1",
  "found" : true,
  "_source" : {
    "data" : "B with routing key1"
  }
}

上面两个查询,虽然指定的docid都是b,但返回的结果是不一样的。所以,如果自定义了routing字段的话,一般doc的增删改查接口都要加上routing参数以保证一致性。为此,ES在mapping中提供了一个选项,可以强制检查doc的增删改查接口是否加了routing参数,如果没有加,就会报错。设置方式如下:

PUT <索引名>/
{
  "settings": {
    "number_of_shards": 2,
    "number_of_replicas": 0
  },
  "mappings": {
    "_routing": {
      "required": true        // 设置为true,则强制检查;false则不检查,默认为false
    }
  }
}

举个例子:

PUT route_test1/
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 0
  },
  "mappings": {
    "_routing": {
      "required": true
    }
  }
}

// 写入一条数据
PUT route_test1/_doc/b?routing=key1
{
  "data": "b with routing"
}

// 以下的增删改查都会抱错
GET route_test1/_doc/b
PUT route_test1/_doc/b
{
  "data": "B"
}
DELETE route_test1/_doc/b

// 错误信息
  "error": {
    "root_cause": [
      {
        "type": "routing_missing_exception",
        "reason": "routing is required for [route_test1]/[_doc]/[b]",
        "index_uuid": "_na_",
        "index": "route_test1"
      }
    ],
    "type": "routing_missing_exception",
    "reason": "routing is required for [route_test1]/[_doc]/[b]",
    "index_uuid": "_na_",
    "index": "route_test1"
  },
  "status": 400
}

当然,很多时候自定义路由是为了减少查询时扫描shard的个数,从而提高查询效率。默认查询接口会搜索所有的shard,但也可以指定routing字段,这样就只会查询routing计算出来的shard,提高查询速度。使用方式也非常简单,只需在查询语句上面指定routing即可,允许指定多个:

GET route_test/_search?routing=key1,key2 
{
  "query": {
    "match": {
      "data": "b"
    }
  }
}

另外,指定routing还有个弊端就是容易造成负载不均衡。所以ES提供了一种机制可以将数据路由到一组shard上面,而不是某一个。只需在创建索引时(也只能在创建时)设置index.routing_partition_size,默认值是1,即只路由到1个shard,可以将其设置为大于1且小于索引shard总数的某个值,就可以路由到一组shard了。值越大,数据越均匀。当然,从设置就能看出来,这个设置是针对单个索引的,可以加入到动态模板中,以对多个索引生效。指定后,shard的计算方式变为:

shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards

对于同一个routing值,hash(_routing)的结果固定的,hash(_id) % routing_partition_size的结果有 routing_partition_size 个可能的值,两个组合在一起,对于同一个routing值的不同doc,也就能计算出 routing_partition_size 可能的shard num了,即一个shard集合。但要注意这样做以后有两个限制:

  1. 索引的mapping中不能再定义join关系的字段,原因是join强制要求关联的doc必须路由到同一个shard,如果采用shard集合,这个是保证不了的。
  2. 索引mapping中_routingrequired必须设置为true。

但是对于第2点我测试了一下,如果不写mapping,是可以的,此时_routingrequired默认值其实是false的。但如果显式的写了,就必须设置为true,否则创建索引会报错。

// 不显式的设置mapping,可以成功创建索引
PUT route_test_3/
{
  "settings": {
    "number_of_shards": 2,
    "number_of_replicas": 0,
    "routing_partition_size": 2
  }
}
// 查询也可以不用带routing,也可以正确执行,增删改也一样
GET route_test_3/_doc/a

// 如果显式的设置了mappings域,且required设置为false,创建索引就会失败,必须改为true
PUT route_test_4/
{
  "settings": {
    "number_of_shards": 2,
    "number_of_replicas": 0,
    "routing_partition_size": 2
  },
  "mappings": {
    "_routing": {
      "required": false
    }
  }
}

不知道这算不算一个bug。ElasticSearch的routing算是一个高级用法,但的确非常有用。举个例子,比如互联网中的用户数据,我们可以用userid作为routing,这样就能保证同一个用户的数据全部保存到同一个shard去,后面检索的时候,同样使用userid作为routing,就可以精准的从某个shard获取数据了。对于超大数据量的搜索,routing再配合hot&warm的架构,是非常有用的一种解决方案。而且同一种属性的数据写到同一个shard还有很多好处,比如可以提高aggregation的准确性,以后的文章再介绍。

最后,祝祖国母亲70周年快乐!

]]>
<![CDATA[使用Logstash将Kafka中的数据导入到ElasticSearch]]> http://niyanchun.com/export-data-from-kafka-to-es-with-logstash.html 2019-09-28T17:55:55+08:00 2019-09-28T17:55:55+08:00 NYC https://niyanchun.com 本文介绍如何使用Logstash将Kafka中的数据写入到ElasticSearch。使用的各个组件版本如下:

  • kafka_2.11-2.2.0
  • elasticsearch 6.8.3
  • logstash 6.8.3

组件安装这里就不赘述了,主要说一下操作流程和需要注意的一些细节。

Logstash工作的流程由三部分组成:

  • input:输入(即source),必须要有,指明从那里读取数据。
  • filter:过滤,logstash对数据的ETL就是在这个里面进行的,这一步可选。
  • output:输出(即sink),必须要有,指明数据写入到哪里去。

所以在我们的场景里面,input就是kafka,output就是es。至于是否需要filter,看你的场景需不需要对input的数据做transform了,本文没有使用filter。input需要使用logstash-input-kafka插件,该插件logstash默认就带了,可以使用bin/logstash-plugin list | grep kafka命令确认。

1. 基本功能

创建一个的Logstash配置文件 kafka.conf,内容如下:

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["nyc-test"]
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "logstash-kafka-%{+YYYY.MM.dd}"
  }
}

这个配置文件非常简单,

  • 在input中配置了kafka的broker和topic信息,有了这两项配置,Logstash就知道从哪个kafka的哪个topic读取数据了;
  • 在output中配置了es的hosts和index信息,有了这两项配置,Logstash就知道数据写到哪个es的哪个index里面去了。

启动Logstash:bin/logstash -f kafka.conf,确保没有错误。然后我们在kafka中创建上述配置文件中的topic,并写入一些数据:

-> % bin/kafka-console-producer.sh --broker-list localhost:9092 --topic nyc-test
>{"key1": "value1"}
>{"key2": "value2"}
>

如果没有出错的话,此时数据已经写入到es了,我们查看一下:

// 查看索引
-> % curl "http://localhost:9200/_cat/indices?v"
health status index                     uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   logstash-kafka-2019.09.19 TgpGY4xgT4OMBgEKfbnDbQ   5   1          2            0      8.2kb          8.2kb

// 查看索引里面的数据
-> % curl "http://localhost:9200/logstash-kafka-2019.09.19/_search?pretty"
{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "logstash-kafka-2019.09.19",
        "_type" : "doc",
        "_id" : "t7O8SG0BpqLLPSZL6Tw4",
        "_score" : 1.0,
        "_source" : {
          "@version" : "1",
          "@timestamp" : "2019-09-19T14:56:42.617Z",
          "message" : "{\"key1\": \"value1\"}"
        }
      },
      {
        "_index" : "logstash-kafka-2019.09.19",
        "_type" : "doc",
        "_id" : "uLO9SG0BpqLLPSZLojxO",
        "_score" : 1.0,
        "_source" : {
          "@version" : "1",
          "@timestamp" : "2019-09-19T14:57:30.328Z",
          "message" : "{\"key2\": \"value2\"}"
        }
      }
    ]
  }
}

没问题,索引创建出来了,并且数据也写进去了。这样基本功能就算完成了。kafka插件还提供了许多自定义的配置项,我结合一些实际场景来介绍一下。

2. 一些有用的配置项

2.1 反序列化JSON

es是按照json格式存储数据的,上面的例子中,我们输入到kafka的数据是json格式的,但是经Logstash写入到es之后,整条数据变成一个字符串存储到message字段里面了。如果我们想要保持原来的json格式写入到es,只需要在input里面再加一条配置项:codec => "json".

2.2 并行传输

Logstash的input读取数的时候可以多线程并行读取,logstash-input-kafka插件中对应的配置项是consumer_threads,默认值为1。一般这个默认值不是最佳选择,那这个值该配置多少呢?这个需要对kafka的模型有一定了解:

  • kafka的topic是分区的,数据存储在每个分区内;
  • kafka的consumer是分组的,任何一个consumer属于某一个组,一个组可以包含多个consumer,同一个组内的consumer不会重复消费的同一份数据。

所以,对于kafka的consumer,一般最佳配置是同一个组内consumer个数(或线程数)等于topic的分区数,这样consumer就会均分topic的分区,达到比较好的均衡效果。举个例子,比如一个topic有n个分区,consumer有m个线程。那最佳场景就是n=m,此时一个线程消费一个分区。如果n小于m,即线程数多于分区数,那多出来的线程就会空闲。如果n大于m,那就会存在一些线程同时消费多个分区的数据,造成线程间负载不均衡。所以,一般consumer_threads配置为你消费的topic的所包含的partition个数即可。如果有多个Logstash实例,那就让实例个数 * consumer_threads等于分区数即可。

消费者组名可以通过group_id配置,默认值为logstash

2.3 如何避免重复数据

有些业务场景可能不能忍受重复数据,有一些配置项可以帮我们在一定程度上解决问题。这里需要先梳理一下可能造成重复数据的场景:

  1. 数据产生的时候就有重复,业务想对重复数据去重(注意是去重,不是merge)。
  2. 数据写入到Kafka时没有重复,但后续流程可能因为网络抖动、传输失败等导致重试造成数据重复。

对于第1种场景,只要原始数据中有唯一字段就可以去重;对于第2种场景,不需要依赖业务数据就可以去重。去重的原理也很简单,利用es document id即可。对于es,如果写入数据时没有指定document id,就会随机生成一个uuid,如果指定了,就使用指定的值。对于需要去重的场景,我们指定document id即可。在output elasticsearch中可以通过document_id字段指定document id。对于场景1非常简单,指定业务中的惟一字段为document id即可。主要看下场景2。

对于场景2,我们需要构造出一个“uuid”能惟一标识kafka中的一条数据,这个也非常简单:<topic>+<partition>+<offset>,这三个值的组合就可以惟一标识kafka集群中的一条数据。input kafka插件也已经帮我们把消息对应的元数据信息记录到了@metadata(Logstash的元数据字段,不会输出到output里面去)字段里面:

  • [@metadata][kafka][topic]:索引信息
  • [@metadata][kafka][consumer_group]:消费者组信息
  • [@metadata][kafka][partition]:分区信息
  • [@metadata][kafka][offset]:offset信息
  • [@metadata][kafka][key]:消息的key(如果有的话)
  • [@metadata][kafka][timestamp]:时间戳信息(消息创建的时间或者broker收到的时间)

所以,就可以这样配置document id了:

document_id => "%{[@metadata][kafka][topic]}-%{[@metadata][kafka][partition]}-%{[@metadata][kafka][offset]}"

当然,如果每条kafka消息都有一个唯一的uuid的话,也可以在写入kafka的时候,将其写为key,然后这里就可以使用[@metadata][kafka][key]作为document id了。

最后一定要注意,只有当decorate_events选项配置为true的时候,上面的@metadata才会记录那些元数据,否则不会记录。而该配置项的默认值是false,即不记录。

现在我们把上面提到的那些配置都加入到 kafka.conf 里面去,再运行一遍看看效果。新的配置文件:

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["nyc-test"]
    consumer_threads => 2
    group_id => "logstash"
    codec => "json"
    decorate_events => true
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "logstash-kafka-%{+YYYY.MM.dd}"
    document_id => "%{[@metadata][kafka][topic]}-%{[@metadata][kafka][partition]}-%{[@metadata][kafka][offset]}"
  }
}

之前创建"nyc-test"这个topic时,我建了两个partition。之前的测试中没有配置consumer_threads,所以使用了默认值1,可以在Logstash中看到如下日志:

[2019-09-19T22:54:48,207][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Setting newly assigned partitions [nyc-test-1, nyc-test-0]

因为只有一个consumer,所以两个分区都分给了它。这次我们将consumer_threads设置成了2,看下效果:

[2019-09-19T23:23:52,981][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Setting newly assigned partitions [nyc-test-0]
[2019-09-19T23:23:52,982][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-1, groupId=logstash] Setting newly assigned partitions [nyc-test-1]

有两个线程,即两个consumer,所以各分到一个partition。

然后我们再写入两条数据:

-> % bin/kafka-console-producer.sh --broker-list localhost:9092 --topic nyc-test
>{"key1": "value1"}
>{"key2": "value2"}
// 新写两条数据
>{"key3": "value3"}
>{"key4":{"key5": "value5"}}

然后查ES:

{
    "_index" : "logstash-kafka-2019.09.19",
    "_type" : "doc",
    "_id" : "nyc-test-1-1",
    "_score" : 1.0,
    "_source" : {
      "key3" : "value3",
      "@version" : "1",
      "@timestamp" : "2019-09-19T15:18:05.971Z"
    }
},
{
    "_index" : "logstash-kafka-2019.09.19",
    "_type" : "doc",
    "_id" : "nyc-test-0-1",
    "_score" : 1.0,
    "_source" : {
        "@timestamp" : "2019-09-19T15:19:00.183Z",
        "key4" : {
        "key5" : "value5"
        },
        "@version" : "1"
    }
}

可以看到新写的两条数据的document id已经变了,而且消息内容也成json了,而不是之前作为一个字符串放在message字段中。

另外,大家有兴趣可以试一下使用[@metadata][kafka][key]做document id的情况。参考下面的方式指定kafka消息key(默认没有指定的话,key就是null):

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic nyc-test --property "parse.key=true" --property "key.separator=:"

当然,还有很多其它配置项,但大多不需要我们更改。有兴趣的可以查看文末链接。

References

]]>