NYC's Blog - Logstash 2019-09-28T17:55:55+08:00 Typecho http://niyanchun.com/feed/atom/tag/logstash/ <![CDATA[使用Logstash将Kafka中的数据导入到ElasticSearch]]> http://niyanchun.com/export-data-from-kafka-to-es-with-logstash.html 2019-09-28T17:55:55+08:00 2019-09-28T17:55:55+08:00 NYC https://niyanchun.com 本文介绍如何使用Logstash将Kafka中的数据写入到ElasticSearch。使用的各个组件版本如下:

  • kafka_2.11-2.2.0
  • elasticsearch 6.8.3
  • logstash 6.8.3

组件安装这里就不赘述了,主要说一下操作流程和需要注意的一些细节。

Logstash工作的流程由三部分组成:

  • input:输入(即source),必须要有,指明从那里读取数据。
  • filter:过滤,logstash对数据的ETL就是在这个里面进行的,这一步可选。
  • output:输出(即sink),必须要有,指明数据写入到哪里去。

所以在我们的场景里面,input就是kafka,output就是es。至于是否需要filter,看你的场景需不需要对input的数据做transform了,本文没有使用filter。input需要使用logstash-input-kafka插件,该插件logstash默认就带了,可以使用bin/logstash-plugin list | grep kafka命令确认。

1. 基本功能

创建一个的Logstash配置文件 kafka.conf,内容如下:

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["nyc-test"]
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "logstash-kafka-%{+YYYY.MM.dd}"
  }
}

这个配置文件非常简单,

  • 在input中配置了kafka的broker和topic信息,有了这两项配置,Logstash就知道从哪个kafka的哪个topic读取数据了;
  • 在output中配置了es的hosts和index信息,有了这两项配置,Logstash就知道数据写到哪个es的哪个index里面去了。

启动Logstash:bin/logstash -f kafka.conf,确保没有错误。然后我们在kafka中创建上述配置文件中的topic,并写入一些数据:

-> % bin/kafka-console-producer.sh --broker-list localhost:9092 --topic nyc-test
>{"key1": "value1"}
>{"key2": "value2"}
>

如果没有出错的话,此时数据已经写入到es了,我们查看一下:

// 查看索引
-> % curl "http://localhost:9200/_cat/indices?v"
health status index                     uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   logstash-kafka-2019.09.19 TgpGY4xgT4OMBgEKfbnDbQ   5   1          2            0      8.2kb          8.2kb

// 查看索引里面的数据
-> % curl "http://localhost:9200/logstash-kafka-2019.09.19/_search?pretty"
{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "logstash-kafka-2019.09.19",
        "_type" : "doc",
        "_id" : "t7O8SG0BpqLLPSZL6Tw4",
        "_score" : 1.0,
        "_source" : {
          "@version" : "1",
          "@timestamp" : "2019-09-19T14:56:42.617Z",
          "message" : "{\"key1\": \"value1\"}"
        }
      },
      {
        "_index" : "logstash-kafka-2019.09.19",
        "_type" : "doc",
        "_id" : "uLO9SG0BpqLLPSZLojxO",
        "_score" : 1.0,
        "_source" : {
          "@version" : "1",
          "@timestamp" : "2019-09-19T14:57:30.328Z",
          "message" : "{\"key2\": \"value2\"}"
        }
      }
    ]
  }
}

没问题,索引创建出来了,并且数据也写进去了。这样基本功能就算完成了。kafka插件还提供了许多自定义的配置项,我结合一些实际场景来介绍一下。

2. 一些有用的配置项

2.1 反序列化JSON

es是按照json格式存储数据的,上面的例子中,我们输入到kafka的数据是json格式的,但是经Logstash写入到es之后,整条数据变成一个字符串存储到message字段里面了。如果我们想要保持原来的json格式写入到es,只需要在input里面再加一条配置项:codec => "json".

2.2 并行传输

Logstash的input读取数的时候可以多线程并行读取,logstash-input-kafka插件中对应的配置项是consumer_threads,默认值为1。一般这个默认值不是最佳选择,那这个值该配置多少呢?这个需要对kafka的模型有一定了解:

  • kafka的topic是分区的,数据存储在每个分区内;
  • kafka的consumer是分组的,任何一个consumer属于某一个组,一个组可以包含多个consumer,同一个组内的consumer不会重复消费的同一份数据。

所以,对于kafka的consumer,一般最佳配置是同一个组内consumer个数(或线程数)等于topic的分区数,这样consumer就会均分topic的分区,达到比较好的均衡效果。举个例子,比如一个topic有n个分区,consumer有m个线程。那最佳场景就是n=m,此时一个线程消费一个分区。如果n小于m,即线程数多于分区数,那多出来的线程就会空闲。如果n大于m,那就会存在一些线程同时消费多个分区的数据,造成线程间负载不均衡。所以,一般consumer_threads配置为你消费的topic的所包含的partition个数即可。如果有多个Logstash实例,那就让实例个数 * consumer_threads等于分区数即可。

消费者组名可以通过group_id配置,默认值为logstash

2.3 如何避免重复数据

有些业务场景可能不能忍受重复数据,有一些配置项可以帮我们在一定程度上解决问题。这里需要先梳理一下可能造成重复数据的场景:

  1. 数据产生的时候就有重复,业务想对重复数据去重(注意是去重,不是merge)。
  2. 数据写入到Kafka时没有重复,但后续流程可能因为网络抖动、传输失败等导致重试造成数据重复。

对于第1种场景,只要原始数据中有唯一字段就可以去重;对于第2种场景,不需要依赖业务数据就可以去重。去重的原理也很简单,利用es document id即可。对于es,如果写入数据时没有指定document id,就会随机生成一个uuid,如果指定了,就使用指定的值。对于需要去重的场景,我们指定document id即可。在output elasticsearch中可以通过document_id字段指定document id。对于场景1非常简单,指定业务中的惟一字段为document id即可。主要看下场景2。

对于场景2,我们需要构造出一个“uuid”能惟一标识kafka中的一条数据,这个也非常简单:<topic>+<partition>+<offset>,这三个值的组合就可以惟一标识kafka集群中的一条数据。input kafka插件也已经帮我们把消息对应的元数据信息记录到了@metadata(Logstash的元数据字段,不会输出到output里面去)字段里面:

  • [@metadata][kafka][topic]:索引信息
  • [@metadata][kafka][consumer_group]:消费者组信息
  • [@metadata][kafka][partition]:分区信息
  • [@metadata][kafka][offset]:offset信息
  • [@metadata][kafka][key]:消息的key(如果有的话)
  • [@metadata][kafka][timestamp]:时间戳信息(消息创建的时间或者broker收到的时间)

所以,就可以这样配置document id了:

document_id => "%{[@metadata][kafka][topic]}-%{[@metadata][kafka][partition]}-%{[@metadata][kafka][offset]}"

当然,如果每条kafka消息都有一个唯一的uuid的话,也可以在写入kafka的时候,将其写为key,然后这里就可以使用[@metadata][kafka][key]作为document id了。

最后一定要注意,只有当decorate_events选项配置为true的时候,上面的@metadata才会记录那些元数据,否则不会记录。而该配置项的默认值是false,即不记录。

现在我们把上面提到的那些配置都加入到 kafka.conf 里面去,再运行一遍看看效果。新的配置文件:

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["nyc-test"]
    consumer_threads => 2
    group_id => "logstash"
    codec => "json"
    decorate_events => true
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "logstash-kafka-%{+YYYY.MM.dd}"
    document_id => "%{[@metadata][kafka][topic]}-%{[@metadata][kafka][partition]}-%{[@metadata][kafka][offset]}"
  }
}

之前创建"nyc-test"这个topic时,我建了两个partition。之前的测试中没有配置consumer_threads,所以使用了默认值1,可以在Logstash中看到如下日志:

[2019-09-19T22:54:48,207][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Setting newly assigned partitions [nyc-test-1, nyc-test-0]

因为只有一个consumer,所以两个分区都分给了它。这次我们将consumer_threads设置成了2,看下效果:

[2019-09-19T23:23:52,981][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Setting newly assigned partitions [nyc-test-0]
[2019-09-19T23:23:52,982][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-1, groupId=logstash] Setting newly assigned partitions [nyc-test-1]

有两个线程,即两个consumer,所以各分到一个partition。

然后我们再写入两条数据:

-> % bin/kafka-console-producer.sh --broker-list localhost:9092 --topic nyc-test
>{"key1": "value1"}
>{"key2": "value2"}
// 新写两条数据
>{"key3": "value3"}
>{"key4":{"key5": "value5"}}

然后查ES:

{
    "_index" : "logstash-kafka-2019.09.19",
    "_type" : "doc",
    "_id" : "nyc-test-1-1",
    "_score" : 1.0,
    "_source" : {
      "key3" : "value3",
      "@version" : "1",
      "@timestamp" : "2019-09-19T15:18:05.971Z"
    }
},
{
    "_index" : "logstash-kafka-2019.09.19",
    "_type" : "doc",
    "_id" : "nyc-test-0-1",
    "_score" : 1.0,
    "_source" : {
        "@timestamp" : "2019-09-19T15:19:00.183Z",
        "key4" : {
        "key5" : "value5"
        },
        "@version" : "1"
    }
}

可以看到新写的两条数据的document id已经变了,而且消息内容也成json了,而不是之前作为一个字符串放在message字段中。

另外,大家有兴趣可以试一下使用[@metadata][kafka][key]做document id的情况。参考下面的方式指定kafka消息key(默认没有指定的话,key就是null):

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic nyc-test --property "parse.key=true" --property "key.separator=:"

当然,还有很多其它配置项,但大多不需要我们更改。有兴趣的可以查看文末链接。

References

]]>
<![CDATA[修改Logstash的@timestamp字段为业务时间]]> http://niyanchun.com/modify-attimestamp-field-in-logstash.html 2018-10-18T21:08:00+08:00 2018-10-18T21:08:00+08:00 NYC https://niyanchun.com Logstash在处理数据的时候,会自动生成一个字段@timestamp,默认该字段存储的是Logstash收到消息/事件(event)的时间。很多时候我们用ELK是处理日志的,日志里面一般都是有时间的。而且很多时候我们只关注日志里面的时间,而不关注Logstash收到这条日志的时间。这个时候,一种方法是再增加一个字段,用来存储日志里面的时间,这种很简单;另一种方法是使用日志中的时间替换掉@timestamp字段默认的时间。本文介绍第二种方法并总结一些关键知识点。

现在有如下一条日志:

2018-02-26 15:48:32.708-[INFO ] main RestfulApiProvider - initializing restful api provider

我们先用最简单的Logstash规则解析,规则文件test.conf如下:

# 从标准输入读入数据
input { stdin {} }            

# 只解析时间戳,其它信息不管
filter {
    grok {
        match => {  "message" => "(?<timestamp>%{TIMESTAMP_ISO8601})"  }
    }
}

# 输出到标准输出
output {
 stdout { codec => rubydebug }
}

启动Logstash:bin/logstash -f test.conf

在标准输入粘贴上面的日志并回车,输出如下:

{
    "host" => "NiYanchuns-MacBook-Air.local",
    "@timestamp" => 2018-10-18T12:20:51.603Z,
    "timestamp" => "2018-02-26 15:48:32.708",
    "message" => "2018-02-26 15:48:32.708-[INFO ] main RestfulApiProvider - initializing restful api provider",
    "@version" => "1"
}

这里需要注意以下几个点:

  • 解析规则里面的grok是Logstash的一个正则解析插件,Logstash强大的解析能力主要来自于它,使用文档看这里
  • @timestamp字段是内置的,和之前说的一样,时间是Logstash收到消息的时间,而且注意使用的是UTC时间,我电脑的时间是北京时间晚上八点多。
  • timestamp字段是我们解析规则里面定义的字段(字段名随便起,不能有特殊符号,比如@),该字段存储的是从日志里面解析出来的时间,注意这个时间格式完全是日志里面时间的格式。要注意我们加的timestamp和系统内置的@timestamp不是同一个字段。
  • message:也是程序内置字段,内容为消息原始内容。

既然如此,那我们能不能通过自定义一个与程序内置的@timestamp同名的字段来覆盖掉程序内置字段呢?比如将上面解析规则里面的timestamp改为@timestamp

答案是不行。刚才已经说了,系统内置字段名前可以加@,但我们定义字段的时候字段名不能加@,也就是说我们不能通过定义一个@timestamp字段覆盖掉系统默认的,那样配置语法检查就通不过。有(bu)兴(xin)趣(xie)的可以试一下。

那如何做呢?Logstash提供了一个Date插件可以实现该功能,使用文档见这里。我们修改一下刚才的规则:

input { stdin {} }

filter {
    grok {
        match => {  "message" => "(?<timestamp>%{TIMESTAMP_ISO8601})"  }
    }
    date {
        match => [ "timestamp", "ISO8601" ]
    }
}

output {
 stdout { codec => rubydebug }
}

重启Logstash后,输入日志后,解析输出如下:

{
    "@version" => "1",
    "host" => "NiYanchuns-MacBook-Air.local",
    "timestamp" => "2018-02-26 15:48:32.708",
    "@timestamp" => 2018-02-26T07:48:32.708Z,
    "message" => "2018-02-26 15:48:32.708-[INFO ] main RestfulApiProvider - initializing restful api provider"
}

OK,@timestamp字段里面的值已经和timestamp字段很像了,但不完全一样。Logstash将timestamp的时间根据系统的时区转换为UTC时间存到了@timestamp字段里面。

这个时候timestamp就成多余的了,我们可以通过mutate插件移除该字段。再次修改解析规则文件:

input { stdin {} }

filter {
    grok {
        match => {  "message" => "(?<timestamp>%{TIMESTAMP_ISO8601})"  }
    }
    date {
        match => [ "timestamp", "ISO8601" ]
    }
    mutate {
         remove_field => [ "ts_tmp","date", "time", "end_time_tmp", "end_time_tmp1" ]
    }
}

output {
 stdout { codec => rubydebug }
}

重启Logstash后,输入日志后,解析输出如下:

{
    "message" => "2018-02-26 15:48:32.708-[INFO ] main RestfulApiProvider - initializing restful api provider",
    "@version" => "1",
    "@timestamp" => 2018-02-26T07:48:32.708Z,
    "host" => "NiYanchuns-MacBook-Air.local"
}

timestamp字段已经在输出中去掉了。

我们简单介绍一下Date插件。Date常见的配置如下:

date {
    match => [ "time_field", "yyyyMMdd HH:mm:ss.SSS" ]
    # timezone => "UTC"
    target => "end_time"
}

上述配置的含义是,将time_field字段按照yyyyMMdd HH:mm:ss.SSS格式解析后存到target指定的字段end_time字段去。time_field必须是已经定义的字段,最常见的就是在grok里面解析出来的某个时间字段。时间格式可查看Date插件的文档。如果没有指定target,默认就是@timestamp字段,这就是为什么我们可以使用该插件来修改@timestamp字段值的原因。

另外,timezone字段在某些场景下也非常重要,如果从时间的值里面解析不出来时区,而且我们也没有指定时区的话,程序就会认为我们的时间字段的时区就是系统所处时区。比如上面从timestamp转到@timestamp的时候,时间值里面没有时区,所以使用了系统的时区东八区。当然,我们可以使用该字段指定时区。

OK,本文就到这里,干(hong)正(wo)事(wa)去了。

]]>