0x01 前言
之前我在服务器上配置了Elastic Stack这个技术栈,经过2周的使用,真觉得用它来处理日志真是棒极了。
它不但可以处理nginx日志,还可以处理WAF、IDS甚至防火墙的日志都没问题。只要日志有固定的格式,几乎没有它处理不了的。
0x02 准备
首先要知道你现在所用的nginx日志格式,可以参考以下文章:
我的日志格式是:
'$remote_addr - $remote_user [$time_local] "$request" $http_host ' '$status $body_bytes_sent "$http_referer" ' '"$http_user_agent" "$http_x_forwarded_for" ' '$upstream_addr $upstream_status $upstream_cache_status "$upstream_http_content_type" $upstream_response_time > $request_time';
生成之后是这样的:
119.139.58.12 - - [27/Nov/2016:08:25:23 +0600] "GET / HTTP/2.0" proj.org.cn 200 158441 "https://ngx.hk/?s=elk" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.98 Safari/537.36" "-" 127.0.0.1:8080 200 MISS "text/html; charset=UTF-8" 1.884 > 2.189
然后需要准备ELK5,安装配置过程请参考以下文章:
0x03 匹配
我通过filebeat读取日志后传送至logstash进行处理,处理完成再保存在elasticsearch中。其中最重要的一步就是logstash的处理,我们需要根据日志的格式编写相关的匹配代码,以便logstash进行匹配处理。
在这里我使用过滤插件中的Grok插件,具体技术文档请点击以下链接:
你还需要以下这个网站对你所编写的过滤匹配代码进行debug:
我上面的日志的过滤代码如下:
%{IPORHOST:client_ip} - %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|-)\" (%{IPORHOST:domain}|%{URIHOST:domain}|-) %{NUMBER:response} %{NUMBER:bytes} %{QS:referrer} %{QS:agent} \"(%{WORD:x_forword}|-)\" (%{URIHOST:upstream_host}|-) (%{NUMBER:upstream_response}|-) (%{WORD:upstream_cache_status}|-) %{QS:upstream_content_type} (%{BASE16FLOAT:upstream_response_time}|-) > %{BASE16FLOAT:request_time}
如果一些顺利,在debuger中就可以生成正确的信息:
你还需要了解相关的语法,这个可以通过以下链接获取:
0x04 logstash
完成相关的过滤代码后,还需要将代码添加到logstash中。如果你依据我的安装过程安装了logstash,那么,你需要在以下文件夹新建文件:
#进入文件夹 [root@web ~]# cd /etc/logstash/conf.d/
里面应该有一个有配置监听信息的文件:
[root@web conf.d]# cat 00-logstash-listen-5044.conf input { beats { port => 5044 } } output { elasticsearch { hosts => "localhost:9200" user => "elastic" password => "changeme" manage_template => true index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}" document_type => "%{[@metadata][type]}" } }
我们需要添加一个新的文件:
#新建文件 [root@web conf.d]# vim 13-proj.org.cn-fliter.conf #填入内容 filter { if "hk1_server_ngx_access_log" in [tags] { grok { match => { "message" => "%{IPORHOST:client_ip} - %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|-)\" (%{IPORHOST:domain}|%{URIHOST:domain}|-) %{NUMBER:response} %{NUMBER:bytes} %{QS:referrer} %{QS:agent} \"(%{WORD:x_forword}|-)\" (%{URIHOST:upstream_host}|-) (%{NUMBER:upstream_response}|-) (%{WORD:upstream_cache_status}|-) %{QS:upstream_content_type} (%{BASE16FLOAT:upstream_response_time}|-) > %{BASE16FLOAT:request_time}" } match => { "message" => "%{IPORHOST:client_ip} - %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|-)\" %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent}" } match => { "message" => "%{IPORHOST:client_ip} - %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(%{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|-)\" %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent}" } match => { "message" => "%{IPORHOST:client_ip} - %{USER:auth} \[%{HTTPDATE:timestamp}\] \"((%{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|-)|)\" %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent}" } match => { "message" => "%{IPORHOST:client_ip} - %{USER:auth} \[%{HTTPDATE:timestamp}\] %{QS:request} %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent}" } } geoip { source => "client_ip" } date { match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ] } useragent { source => "agent" target => "ua" } } }
上面这个配置文件我匹配了5种我所使用的nginx日志格式,其中还有一下配置信息:
- geoip:将用户IP进行分析,得出大概GPS位置、所属国家、城市、电信服务商等信息;
- date:将日志中的时间作为logstash处理的时间;
- useragent:可是分离出用户所用的浏览器、操作系统、UA等信息。
配置文件中还有以下这句判断:
if "hk1_server_ngx_access_log" in [tags]
因为我将各种各样的日志都通过logstash分析,所以在filebeat添加了自定义tags以便区分不同的log:
[root@web conf.d]# cat /etc/filebeat/filebeat.yml ######################################## - input_type: log paths: - /usr/local/html/*/logs/ngx_access.log - /var/log/nginx/access.log tags: hk1_server_ngx_access_log ########################################
完成上面这一切后,请通过以下命令测试并重新加载logstash:
[root@web conf.d]# /usr/share/logstash/bin/logstash -t -f /etc/logstash/conf.d/ --path.settings /etc/logstash/ --config.reload.automatic
0x05 检查
重新加载的过程需要点时间,但完成后,即可通过kibana检查配置是否正确:
0x06 结语
编写匹配代码是最麻烦的一步,要经过很多次的调整才能完美匹配。