在logstash.conf配置中,可以使用ruby动态修改某个字段数据。
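# Stage 1: drop deployment events, strip the kafka envelope, and stamp each
# event with a yyyy.MM.dd day string in UTC+8 (used later as an index suffix).
filter {
  if [type] == "deployment" {
    drop {}
  }

  mutate {
    remove_field => ["kafka"]
  }

  ruby {
    code => "
      # @timestamp is UTC; shift by a fixed 8 h (28800 s) offset. No DST handling,
      # which is fine for a UTC+8 zone but would be wrong elsewhere.
      timestamp = event.get('@timestamp')
      localtime = timestamp.time + 28800
      event.set('localtime', localtime.strftime('%Y.%m.%d'))
    "
  }
}

# Stage 2: parse the access-log line, base64-decode the `d=` query value and
# explode it into key/value fields.
filter {
  grok {
    # NOTE(review): the named captures below appear as bare `(?` -- their
    # `<name>` parts were stripped in this copy. Restore them before deploying;
    # the fields referenced in the remove_field lists further down (remote_addr,
    # timelocal, string, status, ...) are the names the rest of the pipeline expects.
    #match => {"message"=>'(?\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) - (?(\s+)|-) (?.*) "(?.*?) (?.*?)\?d=(?.*?) (?\S+)" (?\d+) (?\d+) "(?.*?)" "(?.*?)" "(?.*?)'}
    #match => {"message"=>'(?\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) - (?(\s+)|-) (?.*) "(?.*?) (?.*?)\?d=(?.*?) (?\S+)" (?\d+) (?\d+) "(?.*?)" "(?.*?)".*?'}
    match => {"message"=>'(?\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) - (?(\s+)|-) \[(?.*)\] "(?.*?) (?.*?)\?(d=)?(?.*?) (?\S+)" (?\d+) (?\d+) "(?.*?)" "(?.*?)".*?'}
  }

  # Any tag at this point means grok (or the ruby filter) failed: discard.
  if [tags] {
    drop {}
  }

  # Only successful requests carry a payload worth indexing.
  if [status] != "200" {
    drop {}
  }

  ruby {
    init => "require 'base64'"
    code => "
      string = event.get('string')
      decoded = ''
      if string.is_a?(String) && !string.empty?
        begin
          # Base64.decode64 is lenient (non-alphabet bytes are skipped) and does
          # not raise ArgumentError on malformed input; the rescue only guards
          # against unexpected value types reaching unpack.
          decoded = Base64.decode64(string).force_encoding('utf-8')
          # The decoded bytes come from the client-supplied request URI. An
          # invalid UTF-8 sequence would make the kv split below raise, so treat
          # it as an empty (and therefore dropped) payload instead.
          decoded = '' unless decoded.valid_encoding?
        rescue ArgumentError, TypeError
          decoded = ''
        end
      end
      event.set('b64_decode', decoded)
    "
  }

  # Compare the field's value, not the existence of a field literally named
  # `b64_decode == ""` (the original `[b64_decode == ""]` never matched, so
  # empty payloads were indexed).
  if [b64_decode] == "" {
    drop {}
  }

  # The decoded payload is an untrusted query string; every key in it becomes
  # an event field. Consider constraining accepted keys (e.g. kv's include_keys)
  # if the downstream mapping should stay fixed.
  kv {
    source => "b64_decode"
    field_split => "&?"
    value_split => "="
  }

  if [type] == "template" {
    # Template events keep only the kv-derived fields; even @timestamp goes.
    mutate {
      remove_field => ["@timestamp", "@version", "b64_decode", "message", "string", "body_bytes_sent", "timelocal", "http_user_agent", "http_referer", "status", "protocol", "uri", "ver", "remote_user", "remote_addr", "host", "method", "path"]
    }
  } else {
    # Use the access-log time as the event time, then drop the raw log fields.
    date {
      match => ["timelocal", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
    mutate {
      remove_field => ["@version", "b64_decode", "message", "string", "body_bytes_sent", "timelocal", "http_user_agent", "http_referer", "status", "protocol", "uri", "ver", "remote_user", "remote_addr", "host", "method", "path"]
    }
  }
}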