0x01 前言

之前我在服务器上配置了Elastic Stack这个技术栈,经过2周的使用,真觉得用它来处理日志真是棒极了。

它不但可以处理nginx日志,还可以处理WAF、IDS甚至防火墙的日志都没问题。只要日志有固定的格式,几乎没有它处理不了的。

0x02 准备

首先要知道你现在所用的nginx日志格式,可以参考以下文章:

nginx日志格式错误修复

我的日志格式是:

'$remote_addr - $remote_user [$time_local] "$request" $http_host ' 
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for" '
'$upstream_addr $upstream_status $upstream_cache_status "$upstream_http_content_type" $upstream_response_time > $request_time';

生成之后是这样的:

119.139.58.12 - - [27/Nov/2016:08:25:23 +0600] "GET / HTTP/2.0" proj.org.cn 200 158441 "https://ngx.hk/?s=elk" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.98 Safari/537.36" "-" 127.0.0.1:8080 200 MISS "text/html; charset=UTF-8" 1.884 > 2.189

然后需要准备ELK5,安装配置过程请参考以下文章:

安装配置Elastic Stack 5

0x03 匹配

我通过filebeat读取日志后传送至logstash进行处理,处理完成再保存在elasticsearch中。其中最重要的一步就是logstash的处理,我们需要根据日志的格式编写相关的匹配代码,以便logstash进行匹配处理。

在这里我使用过滤插件中的Grok插件,具体技术文档请点击以下链接:

Filter plugins » grok

你还需要以下这个网站对你所编写的过滤匹配代码进行debug:

grok debuger

我上面的日志的过滤代码如下:

%{IPORHOST:client_ip} - %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|-)\" (%{IPORHOST:domain}|%{URIHOST:domain}|-) %{NUMBER:response} %{NUMBER:bytes} %{QS:referrer} %{QS:agent} \"(%{WORD:x_forword}|-)\" (%{URIHOST:upstream_host}|-) (%{NUMBER:upstream_response}|-) (%{WORD:upstream_cache_status}|-) %{QS:upstream_content_type} (%{BASE16FLOAT:upstream_response_time}|-) > %{BASE16FLOAT:request_time}

如果一些顺利,在debuger中就可以生成正确的信息:

1480215162

你还需要了解相关的语法,这个可以通过以下链接获取:

grok-patterns

0x04 logstash

完成相关的过滤代码后,还需要将代码添加到logstash中。如果你依据我的安装过程安装了logstash,那么,你需要在以下文件夹新建文件:

#进入文件夹
[root@web ~]# cd /etc/logstash/conf.d/

里面应该有一个有配置监听信息的文件:

[root@web conf.d]# cat 00-logstash-listen-5044.conf 
input {
  beats {
    port => 5044
  }
}

output {
  elasticsearch {
    hosts => "localhost:9200"
    user => "elastic"
    password => "changeme"
    manage_template => true
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    document_type => "%{[@metadata][type]}"
  }
}

我们需要添加一个新的文件:

#新建文件
[root@web conf.d]# vim 13-proj.org.cn-fliter.conf

#填入内容
filter {
  if "hk1_server_ngx_access_log" in [tags] {
    grok {
      match => { "message" => "%{IPORHOST:client_ip} - %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|-)\" (%{IPORHOST:domain}|%{URIHOST:domain}|-) %{NUMBER:response} %{NUMBER:bytes} %{QS:referrer} %{QS:agent} \"(%{WORD:x_forword}|-)\" (%{URIHOST:upstream_host}|-) (%{NUMBER:upstream_response}|-) (%{WORD:upstream_cache_status}|-) %{QS:upstream_content_type} (%{BASE16FLOAT:upstream_response_time}|-) > %{BASE16FLOAT:request_time}" }

      match => { "message" => "%{IPORHOST:client_ip} - %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|-)\" %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent}" }

      match => { "message" => "%{IPORHOST:client_ip} - %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(%{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|-)\" %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent}" }

      match => { "message" => "%{IPORHOST:client_ip} - %{USER:auth} \[%{HTTPDATE:timestamp}\] \"((%{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|-)|)\" %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent}" }

      match => { "message" => "%{IPORHOST:client_ip} - %{USER:auth} \[%{HTTPDATE:timestamp}\] %{QS:request} %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent}" }

    }

    geoip {
      source => "client_ip"
    }

    date {
      match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
    }

    useragent {
      source => "agent"
      target => "ua"
    }
  }
}

上面这个配置文件我匹配了5种我所使用的nginx日志格式,其中还有一下配置信息:

  • geoip:将用户IP进行分析,得出大概GPS位置、所属国家、城市、电信服务商等信息;
  • date:将日志中的时间作为logstash处理的时间;
  • useragent:可是分离出用户所用的浏览器、操作系统、UA等信息。

配置文件中还有以下这句判断:

if "hk1_server_ngx_access_log" in [tags]

因为我将各种各样的日志都通过logstash分析,所以在filebeat添加了自定义tags以便区分不同的log:

[root@web conf.d]# cat /etc/filebeat/filebeat.yml

########################################
- input_type: log
  paths:
    - /usr/local/html/*/logs/ngx_access.log
    - /var/log/nginx/access.log
  tags: hk1_server_ngx_access_log
########################################

完成上面这一切后,请通过以下命令测试并重新加载logstash:

[root@web conf.d]# /usr/share/logstash/bin/logstash -t -f /etc/logstash/conf.d/ --path.settings /etc/logstash/ --config.reload.automatic

0x05 检查

重新加载的过程需要点时间,但完成后,即可通过kibana检查配置是否正确:

1480215163

0x06 结语

编写匹配代码是最麻烦的一步,要经过很多次的调整才能完美匹配。