NYC's Blog - Logstash http://niyanchun.com/tag/logstash/ 使用Logstash将Kafka中的数据导入到ElasticSearch http://niyanchun.com/export-data-from-kafka-to-es-with-logstash.html 2019-09-28T17:55:55+08:00 本文介绍如何使用Logstash将Kafka中的数据写入到ElasticSearch。使用的各个组件版本如下:kafka_2.11-2.2.0elasticsearch 6.8.3logstash 6.8.3组件安装这里就不赘述了,主要说一下操作流程和需要注意的一些细节。Logstash工作的流程由三部分组成:input:输入(即source),必须要有,指明从那里读取数据。filter:过滤,logstash对数据的ETL就是在这个里面进行的,这一步可选。output:输出(即sink),必须要有,指明数据写入到哪里去。所以在我们的场景里面,input就是kafka,output就是es。至于是否需要filter,看你的场景需不需要对input的数据做transform了,本文没有使用filter。input需要使用logstash-input-kafka插件,该插件logstash默认就带了,可以使用bin/logstash-plugin list | grep kafka命令确认。1. 基本功能创建一个的Logstash配置文件 kafka.conf,内容如下:input { kafka { bootstrap_servers => "localhost:9092" topics => ["nyc-test"] } } output { elasticsearch { hosts => ["http://localhost:9200"] index => "logstash-kafka-%{+YYYY.MM.dd}" } }这个配置文件非常简单,在input中配置了kafka的broker和topic信息,有了这两项配置,Logstash就知道从哪个kafka的哪个topic读取数据了;在output中配置了es的hosts和index信息,有了这两项配置,Logstash就知道数据写到哪个es的哪个index里面去了。启动Logstash:bin/logstash -f kafka.conf,确保没有错误。然后我们在kafka中创建上述配置文件中的topic,并写入一些数据:-> % bin/kafka-console-producer.sh --broker-list localhost:9092 --topic nyc-test >{"key1": "value1"} >{"key2": "value2"} >如果没有出错的话,此时数据已经写入到es了,我们查看一下:// 查看索引 -> % curl "http://localhost:9200/_cat/indices?v" health status index uuid pri rep docs.count docs.deleted store.size pri.store.size yellow open logstash-kafka-2019.09.19 TgpGY4xgT4OMBgEKfbnDbQ 5 1 2 0 8.2kb 8.2kb // 查看索引里面的数据 -> % curl "http://localhost:9200/logstash-kafka-2019.09.19/_search?pretty" { "took" : 1, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "logstash-kafka-2019.09.19", "_type" : "doc", "_id" : "t7O8SG0BpqLLPSZL6Tw4", "_score" : 1.0, "_source" : { "@version" : "1", "@timestamp" : "2019-09-19T14:56:42.617Z", "message" : "{\"key1\": \"value1\"}" } }, { "_index" : "logstash-kafka-2019.09.19", "_type" : "doc", "_id" : "uLO9SG0BpqLLPSZLojxO", "_score" : 1.0, "_source" : { "@version" : "1", "@timestamp" : "2019-09-19T14:57:30.328Z", "message" : "{\"key2\": \"value2\"}" } } ] } }没问题,索引创建出来了,并且数据也写进去了。这样基本功能就算完成了。kafka插件还提供了许多自定义的配置项,我结合一些实际场景来介绍一下。2. 一些有用的配置项2.1 反序列化JSONes是按照json格式存储数据的,上面的例子中,我们输入到kafka的数据是json格式的,但是经Logstash写入到es之后,整条数据变成一个字符串存储到message字段里面了。如果我们想要保持原来的json格式写入到es,只需要在input里面再加一条配置项:codec => "json".2.2 并行传输Logstash的input读取数的时候可以多线程并行读取,logstash-input-kafka插件中对应的配置项是consumer_threads,默认值为1。一般这个默认值不是最佳选择,那这个值该配置多少呢?这个需要对kafka的模型有一定了解:kafka的topic是分区的,数据存储在每个分区内;kafka的consumer是分组的,任何一个consumer属于某一个组,一个组可以包含多个consumer,同一个组内的consumer不会重复消费的同一份数据。所以,对于kafka的consumer,一般最佳配置是同一个组内consumer个数(或线程数)等于topic的分区数,这样consumer就会均分topic的分区,达到比较好的均衡效果。举个例子,比如一个topic有n个分区,consumer有m个线程。那最佳场景就是n=m,此时一个线程消费一个分区。如果n小于m,即线程数多于分区数,那多出来的线程就会空闲。如果n大于m,那就会存在一些线程同时消费多个分区的数据,造成线程间负载不均衡。所以,一般consumer_threads配置为你消费的topic的所包含的partition个数即可。如果有多个Logstash实例,那就让实例个数 * consumer_threads等于分区数即可。消费者组名可以通过group_id配置,默认值为logstash。2.3 如何避免重复数据有些业务场景可能不能忍受重复数据,有一些配置项可以帮我们在一定程度上解决问题。这里需要先梳理一下可能造成重复数据的场景:数据产生的时候就有重复,业务想对重复数据去重(注意是去重,不是merge)。数据写入到Kafka时没有重复,但后续流程可能因为网络抖动、传输失败等导致重试造成数据重复。对于第1种场景,只要原始数据中有唯一字段就可以去重;对于第2种场景,不需要依赖业务数据就可以去重。去重的原理也很简单,利用es document id即可。对于es,如果写入数据时没有指定document id,就会随机生成一个uuid,如果指定了,就使用指定的值。对于需要去重的场景,我们指定document id即可。在output elasticsearch中可以通过document_id字段指定document id。对于场景1非常简单,指定业务中的惟一字段为document id即可。主要看下场景2。对于场景2,我们需要构造出一个“uuid”能惟一标识kafka中的一条数据,这个也非常简单:<topic>+<partition>+<offset>,这三个值的组合就可以惟一标识kafka集群中的一条数据。input kafka插件也已经帮我们把消息对应的元数据信息记录到了@metadata(Logstash的元数据字段,不会输出到output里面去)字段里面:[@metadata][kafka][topic]:索引信息[@metadata][kafka][consumer_group]:消费者组信息[@metadata][kafka][partition]:分区信息[@metadata][kafka][offset]:offset信息[@metadata][kafka][key]:消息的key(如果有的话)[@metadata][kafka][timestamp]:时间戳信息(消息创建的时间或者broker收到的时间)所以,就可以这样配置document id了:document_id => "%{[@metadata][kafka][topic]}-%{[@metadata][kafka][partition]}-%{[@metadata][kafka][offset]}"当然,如果每条kafka消息都有一个唯一的uuid的话,也可以在写入kafka的时候,将其写为key,然后这里就可以使用[@metadata][kafka][key]作为document id了。最后一定要注意,只有当decorate_events选项配置为true的时候,上面的@metadata才会记录那些元数据,否则不会记录。而该配置项的默认值是false,即不记录。现在我们把上面提到的那些配置都加入到 kafka.conf 里面去,再运行一遍看看效果。新的配置文件:input { kafka { bootstrap_servers => "localhost:9092" topics => ["nyc-test"] consumer_threads => 2 group_id => "logstash" codec => "json" decorate_events => true } } output { elasticsearch { hosts => ["http://localhost:9200"] index => "logstash-kafka-%{+YYYY.MM.dd}" document_id => "%{[@metadata][kafka][topic]}-%{[@metadata][kafka][partition]}-%{[@metadata][kafka][offset]}" } }之前创建"nyc-test"这个topic时,我建了两个partition。之前的测试中没有配置consumer_threads,所以使用了默认值1,可以在Logstash中看到如下日志:[2019-09-19T22:54:48,207][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Setting newly assigned partitions [nyc-test-1, nyc-test-0]因为只有一个consumer,所以两个分区都分给了它。这次我们将consumer_threads设置成了2,看下效果:[2019-09-19T23:23:52,981][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Setting newly assigned partitions [nyc-test-0] [2019-09-19T23:23:52,982][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-1, groupId=logstash] Setting newly assigned partitions [nyc-test-1]有两个线程,即两个consumer,所以各分到一个partition。然后我们再写入两条数据:-> % bin/kafka-console-producer.sh --broker-list localhost:9092 --topic nyc-test >{"key1": "value1"} >{"key2": "value2"} // 新写两条数据 >{"key3": "value3"} >{"key4":{"key5": "value5"}}然后查ES:{ "_index" : "logstash-kafka-2019.09.19", "_type" : "doc", "_id" : "nyc-test-1-1", "_score" : 1.0, "_source" : { "key3" : "value3", "@version" : "1", "@timestamp" : "2019-09-19T15:18:05.971Z" } }, { "_index" : "logstash-kafka-2019.09.19", "_type" : "doc", "_id" : "nyc-test-0-1", "_score" : 1.0, "_source" : { "@timestamp" : "2019-09-19T15:19:00.183Z", "key4" : { "key5" : "value5" }, "@version" : "1" } }可以看到新写的两条数据的document id已经变了,而且消息内容也成json了,而不是之前作为一个字符串放在message字段中。另外,大家有兴趣可以试一下使用[@metadata][kafka][key]做document id的情况。参考下面的方式指定kafka消息key(默认没有指定的话,key就是null):bin/kafka-console-producer.sh --broker-list localhost:9092 --topic nyc-test --property "parse.key=true" --property "key.separator=:"当然,还有很多其它配置项,但大多不需要我们更改。有兴趣的可以查看文末链接。Referenceshttps://www.elastic.co/guide/en/logstash/6.8/plugins-inputs-kafka.html 修改Logstash的@timestamp字段为业务时间 http://niyanchun.com/modify-attimestamp-field-in-logstash.html 2018-10-18T21:08:00+08:00 Logstash在处理数据的时候,会自动生成一个字段@timestamp,默认该字段存储的是Logstash收到消息/事件(event)的时间。很多时候我们用ELK是处理日志的,日志里面一般都是有时间的。而且很多时候我们只关注日志里面的时间,而不关注Logstash收到这条日志的时间。这个时候,一种方法是再增加一个字段,用来存储日志里面的时间,这种很简单;另一种方法是使用日志中的时间替换掉@timestamp字段默认的时间。本文介绍第二种方法并总结一些关键知识点。现在有如下一条日志:2018-02-26 15:48:32.708-[INFO ] main RestfulApiProvider - initializing restful api provider我们先用最简单的Logstash规则解析,规则文件test.conf如下:# 从标准输入读入数据 input { stdin {} } # 只解析时间戳,其它信息不管 filter { grok { match => { "message" => "(?<timestamp>%{TIMESTAMP_ISO8601})" } } } # 输出到标准输出 output { stdout { codec => rubydebug } }启动Logstash:bin/logstash -f test.conf在标准输入粘贴上面的日志并回车,输出如下:{ "host" => "NiYanchuns-MacBook-Air.local", "@timestamp" => 2018-10-18T12:20:51.603Z, "timestamp" => "2018-02-26 15:48:32.708", "message" => "2018-02-26 15:48:32.708-[INFO ] main RestfulApiProvider - initializing restful api provider", "@version" => "1" }这里需要注意以下几个点:解析规则里面的grok是Logstash的一个正则解析插件,Logstash强大的解析能力主要来自于它,使用文档看这里。@timestamp字段是内置的,和之前说的一样,时间是Logstash收到消息的时间,而且注意使用的是UTC时间,我电脑的时间是北京时间晚上八点多。timestamp字段是我们解析规则里面定义的字段(字段名随便起,不能有特殊符号,比如@),该字段存储的是从日志里面解析出来的时间,注意这个时间格式完全是日志里面时间的格式。要注意我们加的timestamp和系统内置的@timestamp不是同一个字段。message:也是程序内置字段,内容为消息原始内容。既然如此,那我们能不能通过自定义一个与程序内置的@timestamp同名的字段来覆盖掉程序内置字段呢?比如将上面解析规则里面的timestamp改为@timestamp。答案是不行。刚才已经说了,系统内置字段名前可以加@,但我们定义字段的时候字段名不能加@,也就是说我们不能通过定义一个@timestamp字段覆盖掉系统默认的,那样配置语法检查就通不过。有(bu)兴(xin)趣(xie)的可以试一下。那如何做呢?Logstash提供了一个Date插件可以实现该功能,使用文档见这里。我们修改一下刚才的规则:input { stdin {} } filter { grok { match => { "message" => "(?<timestamp>%{TIMESTAMP_ISO8601})" } } date { match => [ "timestamp", "ISO8601" ] } } output { stdout { codec => rubydebug } }重启Logstash后,输入日志后,解析输出如下:{ "@version" => "1", "host" => "NiYanchuns-MacBook-Air.local", "timestamp" => "2018-02-26 15:48:32.708", "@timestamp" => 2018-02-26T07:48:32.708Z, "message" => "2018-02-26 15:48:32.708-[INFO ] main RestfulApiProvider - initializing restful api provider" }OK,@timestamp字段里面的值已经和timestamp字段很像了,但不完全一样。Logstash将timestamp的时间根据系统的时区转换为UTC时间存到了@timestamp字段里面。这个时候timestamp就成多余的了,我们可以通过mutate插件移除该字段。再次修改解析规则文件:input { stdin {} } filter { grok { match => { "message" => "(?<timestamp>%{TIMESTAMP_ISO8601})" } } date { match => [ "timestamp", "ISO8601" ] } mutate { remove_field => [ "ts_tmp","date", "time", "end_time_tmp", "end_time_tmp1" ] } } output { stdout { codec => rubydebug } }重启Logstash后,输入日志后,解析输出如下:{ "message" => "2018-02-26 15:48:32.708-[INFO ] main RestfulApiProvider - initializing restful api provider", "@version" => "1", "@timestamp" => 2018-02-26T07:48:32.708Z, "host" => "NiYanchuns-MacBook-Air.local" }timestamp字段已经在输出中去掉了。我们简单介绍一下Date插件。Date常见的配置如下:date { match => [ "time_field", "yyyyMMdd HH:mm:ss.SSS" ] # timezone => "UTC" target => "end_time" }上述配置的含义是,将time_field字段按照yyyyMMdd HH:mm:ss.SSS格式解析后存到target指定的字段end_time字段去。time_field必须是已经定义的字段,最常见的就是在grok里面解析出来的某个时间字段。时间格式可查看Date插件的文档。如果没有指定target,默认就是@timestamp字段,这就是为什么我们可以使用该插件来修改@timestamp字段值的原因。另外,timezone字段在某些场景下也非常重要,如果从时间的值里面解析不出来时区,而且我们也没有指定时区的话,程序就会认为我们的时间字段的时区就是系统所处时区。比如上面从timestamp转到@timestamp的时候,时间值里面没有时区,所以使用了系统的时区东八区。当然,我们可以使用该字段指定时区。OK,本文就到这里,干(hong)正(wo)事(wa)去了。