组件安装这里就不赘述了,主要说一下操作流程和需要注意的一些细节。
Logstash工作的流程由三部分组成:
所以在我们的场景里面,input就是kafka,output就是es。至于是否需要filter,看你的场景需不需要对input的数据做transform了,本文没有使用filter。input需要使用logstash-input-kafka
插件,该插件logstash默认就带了,可以使用bin/logstash-plugin list | grep kafka
命令确认。
创建一个的Logstash配置文件 kafka.conf,内容如下:
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["nyc-test"]
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "logstash-kafka-%{+YYYY.MM.dd}"
}
}
这个配置文件非常简单,
启动Logstash:bin/logstash -f kafka.conf
,确保没有错误。然后我们在kafka中创建上述配置文件中的topic,并写入一些数据:
-> % bin/kafka-console-producer.sh --broker-list localhost:9092 --topic nyc-test
>{"key1": "value1"}
>{"key2": "value2"}
>
如果没有出错的话,此时数据已经写入到es了,我们查看一下:
// 查看索引
-> % curl "http://localhost:9200/_cat/indices?v"
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
yellow open logstash-kafka-2019.09.19 TgpGY4xgT4OMBgEKfbnDbQ 5 1 2 0 8.2kb 8.2kb
// 查看索引里面的数据
-> % curl "http://localhost:9200/logstash-kafka-2019.09.19/_search?pretty"
{
"took" : 1,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [
{
"_index" : "logstash-kafka-2019.09.19",
"_type" : "doc",
"_id" : "t7O8SG0BpqLLPSZL6Tw4",
"_score" : 1.0,
"_source" : {
"@version" : "1",
"@timestamp" : "2019-09-19T14:56:42.617Z",
"message" : "{\"key1\": \"value1\"}"
}
},
{
"_index" : "logstash-kafka-2019.09.19",
"_type" : "doc",
"_id" : "uLO9SG0BpqLLPSZLojxO",
"_score" : 1.0,
"_source" : {
"@version" : "1",
"@timestamp" : "2019-09-19T14:57:30.328Z",
"message" : "{\"key2\": \"value2\"}"
}
}
]
}
}
没问题,索引创建出来了,并且数据也写进去了。这样基本功能就算完成了。kafka插件还提供了许多自定义的配置项,我结合一些实际场景来介绍一下。
es是按照json格式存储数据的,上面的例子中,我们输入到kafka的数据是json格式的,但是经Logstash写入到es之后,整条数据变成一个字符串存储到message
字段里面了。如果我们想要保持原来的json格式写入到es,只需要在input里面再加一条配置项:codec => "json"
.
Logstash的input读取数的时候可以多线程并行读取,logstash-input-kafka
插件中对应的配置项是consumer_threads
,默认值为1。一般这个默认值不是最佳选择,那这个值该配置多少呢?这个需要对kafka的模型有一定了解:
所以,对于kafka的consumer,一般最佳配置是同一个组内consumer个数(或线程数)等于topic的分区数,这样consumer就会均分topic的分区,达到比较好的均衡效果。举个例子,比如一个topic有n个分区,consumer有m个线程。那最佳场景就是n=m,此时一个线程消费一个分区。如果n小于m,即线程数多于分区数,那多出来的线程就会空闲。如果n大于m,那就会存在一些线程同时消费多个分区的数据,造成线程间负载不均衡。所以,一般consumer_threads
配置为你消费的topic的所包含的partition个数即可。如果有多个Logstash实例,那就让实例个数 * consumer_threads
等于分区数即可。
消费者组名可以通过group_id
配置,默认值为logstash
。
有些业务场景可能不能忍受重复数据,有一些配置项可以帮我们在一定程度上解决问题。这里需要先梳理一下可能造成重复数据的场景:
对于第1种场景,只要原始数据中有唯一字段就可以去重;对于第2种场景,不需要依赖业务数据就可以去重。去重的原理也很简单,利用es document id即可。对于es,如果写入数据时没有指定document id,就会随机生成一个uuid,如果指定了,就使用指定的值。对于需要去重的场景,我们指定document id即可。在output elasticsearch中可以通过document_id
字段指定document id。对于场景1非常简单,指定业务中的惟一字段为document id即可。主要看下场景2。
对于场景2,我们需要构造出一个“uuid”能惟一标识kafka中的一条数据,这个也非常简单:<topic>+<partition>+<offset>
,这三个值的组合就可以惟一标识kafka集群中的一条数据。input kafka插件也已经帮我们把消息对应的元数据信息记录到了@metadata
(Logstash的元数据字段,不会输出到output里面去)字段里面:
[@metadata][kafka][topic]
:索引信息[@metadata][kafka][consumer_group]
:消费者组信息[@metadata][kafka][partition]
:分区信息[@metadata][kafka][offset]
:offset信息[@metadata][kafka][key]
:消息的key(如果有的话)[@metadata][kafka][timestamp]
:时间戳信息(消息创建的时间或者broker收到的时间)所以,就可以这样配置document id了:
document_id => "%{[@metadata][kafka][topic]}-%{[@metadata][kafka][partition]}-%{[@metadata][kafka][offset]}"
当然,如果每条kafka消息都有一个唯一的uuid的话,也可以在写入kafka的时候,将其写为key,然后这里就可以使用[@metadata][kafka][key]
作为document id了。
最后一定要注意,只有当decorate_events
选项配置为true的时候,上面的@metadata才会记录那些元数据,否则不会记录。而该配置项的默认值是false,即不记录。
现在我们把上面提到的那些配置都加入到 kafka.conf 里面去,再运行一遍看看效果。新的配置文件:
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["nyc-test"]
consumer_threads => 2
group_id => "logstash"
codec => "json"
decorate_events => true
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "logstash-kafka-%{+YYYY.MM.dd}"
document_id => "%{[@metadata][kafka][topic]}-%{[@metadata][kafka][partition]}-%{[@metadata][kafka][offset]}"
}
}
之前创建"nyc-test"这个topic时,我建了两个partition。之前的测试中没有配置consumer_threads
,所以使用了默认值1,可以在Logstash中看到如下日志:
[2019-09-19T22:54:48,207][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Setting newly assigned partitions [nyc-test-1, nyc-test-0]
因为只有一个consumer,所以两个分区都分给了它。这次我们将consumer_threads
设置成了2,看下效果:
[2019-09-19T23:23:52,981][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Setting newly assigned partitions [nyc-test-0]
[2019-09-19T23:23:52,982][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-1, groupId=logstash] Setting newly assigned partitions [nyc-test-1]
有两个线程,即两个consumer,所以各分到一个partition。
然后我们再写入两条数据:
-> % bin/kafka-console-producer.sh --broker-list localhost:9092 --topic nyc-test
>{"key1": "value1"}
>{"key2": "value2"}
// 新写两条数据
>{"key3": "value3"}
>{"key4":{"key5": "value5"}}
然后查ES:
{
"_index" : "logstash-kafka-2019.09.19",
"_type" : "doc",
"_id" : "nyc-test-1-1",
"_score" : 1.0,
"_source" : {
"key3" : "value3",
"@version" : "1",
"@timestamp" : "2019-09-19T15:18:05.971Z"
}
},
{
"_index" : "logstash-kafka-2019.09.19",
"_type" : "doc",
"_id" : "nyc-test-0-1",
"_score" : 1.0,
"_source" : {
"@timestamp" : "2019-09-19T15:19:00.183Z",
"key4" : {
"key5" : "value5"
},
"@version" : "1"
}
}
可以看到新写的两条数据的document id已经变了,而且消息内容也成json了,而不是之前作为一个字符串放在message字段中。
另外,大家有兴趣可以试一下使用[@metadata][kafka][key]
做document id的情况。参考下面的方式指定kafka消息key(默认没有指定的话,key就是null):
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic nyc-test --property "parse.key=true" --property "key.separator=:"
当然,还有很多其它配置项,但大多不需要我们更改。有兴趣的可以查看文末链接。
@timestamp
,默认该字段存储的是Logstash收到消息/事件(event)的时间。很多时候我们用ELK是处理日志的,日志里面一般都是有时间的。而且很多时候我们只关注日志里面的时间,而不关注Logstash收到这条日志的时间。这个时候,一种方法是再增加一个字段,用来存储日志里面的时间,这种很简单;另一种方法是使用日志中的时间替换掉@timestamp
字段默认的时间。本文介绍第二种方法并总结一些关键知识点。现在有如下一条日志:
2018-02-26 15:48:32.708-[INFO ] main RestfulApiProvider - initializing restful api provider
我们先用最简单的Logstash规则解析,规则文件test.conf如下:
# 从标准输入读入数据
input { stdin {} }
# 只解析时间戳,其它信息不管
filter {
grok {
match => { "message" => "(?<timestamp>%{TIMESTAMP_ISO8601})" }
}
}
# 输出到标准输出
output {
stdout { codec => rubydebug }
}
启动Logstash:bin/logstash -f test.conf
在标准输入粘贴上面的日志并回车,输出如下:
{
"host" => "NiYanchuns-MacBook-Air.local",
"@timestamp" => 2018-10-18T12:20:51.603Z,
"timestamp" => "2018-02-26 15:48:32.708",
"message" => "2018-02-26 15:48:32.708-[INFO ] main RestfulApiProvider - initializing restful api provider",
"@version" => "1"
}
这里需要注意以下几个点:
@timestamp
字段是内置的,和之前说的一样,时间是Logstash收到消息的时间,而且注意使用的是UTC时间,我电脑的时间是北京时间晚上八点多。timestamp
字段是我们解析规则里面定义的字段(字段名随便起,不能有特殊符号,比如@
),该字段存储的是从日志里面解析出来的时间,注意这个时间格式完全是日志里面时间的格式。要注意我们加的timestamp
和系统内置的@timestamp
不是同一个字段。message
:也是程序内置字段,内容为消息原始内容。既然如此,那我们能不能通过自定义一个与程序内置的@timestamp
同名的字段来覆盖掉程序内置字段呢?比如将上面解析规则里面的timestamp
改为@timestamp
。
答案是不行。刚才已经说了,系统内置字段名前可以加@
,但我们定义字段的时候字段名不能加@
,也就是说我们不能通过定义一个@timestamp
字段覆盖掉系统默认的,那样配置语法检查就通不过。有(bu)兴(xin)趣(xie)的可以试一下。
那如何做呢?Logstash提供了一个Date插件可以实现该功能,使用文档见这里。我们修改一下刚才的规则:
input { stdin {} }
filter {
grok {
match => { "message" => "(?<timestamp>%{TIMESTAMP_ISO8601})" }
}
date {
match => [ "timestamp", "ISO8601" ]
}
}
output {
stdout { codec => rubydebug }
}
重启Logstash后,输入日志后,解析输出如下:
{
"@version" => "1",
"host" => "NiYanchuns-MacBook-Air.local",
"timestamp" => "2018-02-26 15:48:32.708",
"@timestamp" => 2018-02-26T07:48:32.708Z,
"message" => "2018-02-26 15:48:32.708-[INFO ] main RestfulApiProvider - initializing restful api provider"
}
OK,@timestamp
字段里面的值已经和timestamp
字段很像了,但不完全一样。Logstash将timestamp
的时间根据系统的时区转换为UTC时间存到了@timestamp
字段里面。
这个时候timestamp
就成多余的了,我们可以通过mutate
插件移除该字段。再次修改解析规则文件:
input { stdin {} }
filter {
grok {
match => { "message" => "(?<timestamp>%{TIMESTAMP_ISO8601})" }
}
date {
match => [ "timestamp", "ISO8601" ]
}
mutate {
remove_field => [ "ts_tmp","date", "time", "end_time_tmp", "end_time_tmp1" ]
}
}
output {
stdout { codec => rubydebug }
}
重启Logstash后,输入日志后,解析输出如下:
{
"message" => "2018-02-26 15:48:32.708-[INFO ] main RestfulApiProvider - initializing restful api provider",
"@version" => "1",
"@timestamp" => 2018-02-26T07:48:32.708Z,
"host" => "NiYanchuns-MacBook-Air.local"
}
timestamp
字段已经在输出中去掉了。
我们简单介绍一下Date
插件。Date常见的配置如下:
date {
match => [ "time_field", "yyyyMMdd HH:mm:ss.SSS" ]
# timezone => "UTC"
target => "end_time"
}
上述配置的含义是,将time_field
字段按照yyyyMMdd HH:mm:ss.SSS
格式解析后存到target指定的字段end_time
字段去。time_field
必须是已经定义的字段,最常见的就是在grok里面解析出来的某个时间字段。时间格式可查看Date插件的文档。如果没有指定target,默认就是@timestamp
字段,这就是为什么我们可以使用该插件来修改@timestamp
字段值的原因。
另外,timezone
字段在某些场景下也非常重要,如果从时间的值里面解析不出来时区,而且我们也没有指定时区的话,程序就会认为我们的时间字段的时区就是系统所处时区。比如上面从timestamp
转到@timestamp
的时候,时间值里面没有时区,所以使用了系统的时区东八区。当然,我们可以使用该字段指定时区。
OK,本文就到这里,干(hong)正(wo)事(wa)去了。
]]>