This article is best viewed as a PDF download: https://download.zhoufengjie.cn/document/loganalysis/filebeat-logstash-ldnslog-uploadtoelasticsearch.pdf
0. Environment notes:
Environment setup: https://download.zhoufengjie.cn/document/loganalysis/elasticsearch-logstash-kibana-install.pdf
filebeat + logstash collect China Mobile LDNS logs and upload them to elasticsearch; the results are browsed through kibana.
filebeat host IP: 192.168.0.105
logstash host IP: 192.168.0.105
elasticsearch host IP: 192.168.0.97
kibana host IP: 192.168.0.97
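Before going further it is worth confirming that elasticsearch is reachable from the collection host; a quick sanity check, assuming the stack from the setup document above is already running:

curl http://192.168.0.97:9200/                         # should return the cluster name and version as JSON
curl http://192.168.0.97:9200/_cluster/health?pretty   # status should be green or yellow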
The LDNS log format is defined as:
####
1. User IP: the IP of the user issuing the DNS request; both IPv4 and IPv6 user addresses must be supported;
2. Requested domain: the domain name the user queried (the value of the Query field);
3. Resolution time: the time at which the DNS server replied to the user;
4. A-record address: the first A-record IP address in the Answers section of the response to the user's query;
5. Result code (RCODE): a decimal value, where 0-NOERROR, 1-FORMERR, 2-SERVFAIL, 3-NXDOMAIN, 4-NOTIMP, 5-REFUSED, 6~15-reserved;
6. Requested DNS record type: the query type (A, AAAA, CNAME, etc.) as a decimal value, where 1-A, 28-AAAA, 5-CNAME;
7. cname: all CNAME domains of the requested domain, stored in order and separated by semicolons;
8. AAAA-record address: the first AAAA-record IP address in the Answers section of the response to the user's query;
9. Service IP: the IP address of the DNS server providing the service; provinces that need it are advised to report the actual DNS cache address, others may omit the value but must keep the field;
10. Resolution latency (optional);
####
Log samples:
####
2409:8a44:1e00:84a2:0e37:47ff:fe76:3eae|iesdouyin.com.|20191031002459||0|1|||2409:8088:0000:0000:0000:0000:0000:0008
223.88.236.41|sina.cn.|20191031002459|221.179.175.207|0|1|||211.138.24.66
223.88.189.98|www.sina.com.cn.|20191031002459|120.192.83.125|0|1|spool.grid.sinaedge.com.||211.138.24.66
223.104.108.208|wspeed.qq.com.|20191031002459||2|28|||211.138.24.66
2409:8945:7a40:2752:5055:cd86:b0f8:6333|k35.gslb.ksyuncdn.com.|20191031002459|111.7.69.2|0|1|||2409:8088:0000:0000:0000:0000:0000:0008
111.7.89.133|cdn.cloudforest.ltd.|20191031002459||0|28|||211.138.24.66
223.88.54.236|apissl.ksapisrv.com.|20191031002459|103.107.217.103|0|1|api.ksapisrv.com.;nls-kix.ksapisrv.com.||211.138.24.66
223.91.103.90|gs.a.sohu.com.|20191031002459||0|28|fjsyyd.a.sohu.com.|2409:8c00:3001:0000:0000:0000:0000:0004|211.138.24.66
223.90.15.3|www.w3.org.|20191031002459|128.30.52.100|0|1|||211.138.24.66
####
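To make the field positions concrete, here is the third sample record pulled apart with a throwaway awk one-liner; the labels are ad-hoc names for the fields defined above (field 10, latency, is optional and absent from the samples, so only nine fields appear):

echo '223.88.189.98|www.sina.com.cn.|20191031002459|120.192.83.125|0|1|spool.grid.sinaedge.com.||211.138.24.66' | \
  awk -F'|' '{ printf "user_ip=%s domain=%s time=%s a_addr=%s rcode=%s qtype=%s cname=%s aaaa_addr=%s server_ip=%s\n", $1, $2, $3, $4, $5, $6, $7, $8, $9 }'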
1. Deploying filebeat:
Edit /etc/filebeat/filebeat.yml (vi /etc/filebeat/filebeat.yml) and enter the following content [on top of the existing nginx log collection, this adds collection of the DNS logs]:
filebeat.inputs:
- input_type: log
  paths:
    - /var/log/dns/dns*.log
  exclude_files: ['.gz$']
  tags: ["filebeat-dns-log"]
  document_type: dnslog
- input_type: log
  paths:
    - /var/log/nginx/access*.log
  exclude_files: ['.gz$']
  tags: ["filebeat-nginx-accesslog"]
  document_type: nginxaccess
- input_type: log
  paths:
    - /var/log/nginx/error*.log
  tags: ["filebeat-nginx-errorlog"]
  exclude_files: ['.gz$']
  document_type: nginxerror

tags: ["105-filebeat-logs"]

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

output.logstash:
  hosts: ["127.0.0.1:5044"]

#output:
#  console:
#    pretty: true
Temporary test [while testing, you can switch the output to console and watch what gets printed]:
/usr/bin/filebeat -e -c /etc/filebeat/filebeat.yml
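If the installed filebeat is 6.x or newer, its built-in test subcommands give another quick check; these only validate, they do not ship any logs:

/usr/bin/filebeat test config -c /etc/filebeat/filebeat.yml   # validate the YAML
/usr/bin/filebeat test output -c /etc/filebeat/filebeat.yml   # probe the logstash endpoint at 127.0.0.1:5044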
Start filebeat:
systemctl start filebeat
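On a systemd host you will usually also want filebeat to come back after reboots, plus a status check to confirm it started cleanly:

systemctl enable filebeat
systemctl status filebeat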
2. Configuring logstash:
Create /etc/logstash/conf.d/rec-filebeat-log.conf, a dedicated logstash pipeline that filters and splits the fields of the logs filebeat ships up (both the DNS and the nginx logs), and enter the following content:
input {
  beats {
    port => 5044
    host => "127.0.0.1"
  }
}

filter {
  if "105-filebeat-logs" in [tags] {
    if "filebeat-dns-log" in [tags] {
      # LDNS lines: capture the user IP and domain, keep the rest in "content"
      grok {
        match => { "message" => ["%{IP:[ldns][request][remote_ip]}\|%{HOSTNAME:[ldns][request][domain]}\|%{GREEDYDATA:content}"] }
        remove_field => "message"
        add_field => { "logtype" => "ldnslogs" }
      }
      # inside mutate, split runs before add_field, so the positional references below are safe
      mutate {
        #add_field => { "read_timestamp" => "%{@timestamp}" }
        #convert => ["totaltime","float"]
        split => ["content","|"]
        add_field => { "[ldns][resolve][time]" => "%{[content][0]}" }
        add_field => { "[ldns][resolve][addr]" => "%{[content][1]}" }
        add_field => { "[ldns][resolve][code]" => "%{[content][2]}" }
        add_field => { "[ldns][request][type]" => "%{[content][3]}" }
        add_field => { "[ldns][resolve][cname]" => "%{[content][4]}" }
        add_field => { "[ldns][resolve][ipv6addr]" => "%{[content][5]}" }
        add_field => { "[ldns][resolve][serverip]" => "%{[content][6]}" }
        remove_field => "content"
      }
      # geo-locate both the requesting client and the resolved A-record address
      geoip {
        source => "[ldns][request][remote_ip]"
        database => "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-geoip-6.0.3-java/vendor/GeoLite2-City.mmdb"
        target => "[ldns][request][geoip]"
        fields => ["ip","country_name","region_name","city_name","latitude","longitude","region_code"]
        add_field => [ "[ldns][request][geoip][coordinates]", "%{[ldns][request][geoip][longitude]}" ]
        add_field => [ "[ldns][request][geoip][coordinates]", "%{[ldns][request][geoip][latitude]}" ]
      }
      geoip {
        source => "[ldns][resolve][addr]"
        database => "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-geoip-6.0.3-java/vendor/GeoLite2-City.mmdb"
        target => "[ldns][resolve][geoip]"
        fields => ["ip","country_name","region_name","city_name","latitude","longitude","region_code"]
        add_field => [ "[ldns][resolve][geoip][coordinates]", "%{[ldns][resolve][geoip][longitude]}" ]
        add_field => [ "[ldns][resolve][geoip][coordinates]", "%{[ldns][resolve][geoip][latitude]}" ]
      }
    }
    if "filebeat-nginx-accesslog" in [tags] {
      grok {
        match => { "message" => ["%{DATA:[nginx][access][time]} %{DATA:[nginx][access][request_time]} %{IPORHOST:[nginx][access][remote_ip]} %{DATA:[nginx][access][upstream][cache_status]} %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} %{WORD:[nginx][access][method]} %{DATA:[nginx][access][scheme]} %{DATA:[nginx][access][domain]} %{DATA:[nginx][access][url]} %{DATA:[nginx][access][args]} %{DATA:[nginx][access][user_name]} %{DATA:[nginx][access][upstream][upstream_ip]} %{NUMBER:[nginx][access][upstream][response_code]} %{DATA:[nginx][access][upstream][response_time]} \"%{DATA:[nginx][access][upstream][content_type]}\" \"%{DATA:[nginx][access][referrer]}\" \"%{DATA:[nginx][access][agent]}\" \"%{GREEDYDATA:[nginx][access][cookie]}\""] }
        remove_field => "message"
        add_field => { "logtype" => "nginxLogs" }
      }
      grok {
        match => { "[nginx][access][url]" => "%{URIPATH:api}" }
      }
      mutate {
        add_field => { "read_timestamp" => "%{@timestamp}" }
        #convert => ["totaltime","float"]
      }
      #date {
      #  match => [ "[nginx][access][time]", "dd/MMM/YYYY:H:m:s Z" ]
      #  remove_field => "[nginx][access][time]"
      #}
      useragent {
        source => "[nginx][access][agent]"
        target => "[nginx][access][user_agent]"
        remove_field => "[nginx][access][agent]"
      }
      geoip {
        source => "[nginx][access][remote_ip]"
        target => "[nginx][access][geoip]"
      }
    }
    if "filebeat-nginx-errorlog" in [tags] {
      grok {
        match => { "message" => ["%{DATA:[nginx][error][time]} \[%{DATA:[nginx][error][level]}\] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (\*%{NUMBER:[nginx][error][connection_id]} )?%{GREEDYDATA:[nginx][error][message]}"] }
        remove_field => "message"
        add_field => { "logtype" => "nginxLogs" }
      }
      mutate {
        rename => { "@timestamp" => "read_timestamp" }
      }
      date {
        match => [ "[nginx][error][time]", "YYYY/MM/dd H:m:s" ]
        remove_field => "[nginx][error][time]"
      }
    }
  }
}

output {
  if "105-filebeat-logs" in [tags] {
    # only the LDNS logs are shipped for now; the nginx output is kept commented out
    if [logtype] == "ldnslogs" {
      elasticsearch {
        hosts => ["192.168.0.97:9200"]
        manage_template => false
        index => "ldnslogs-%{[@metadata][beat]}-%{+YYYY.MM.dd}"
      }
    }
    #if [logtype] == "nginxLogs" {
    #  elasticsearch {
    #    hosts => ["192.168.0.97:9200"]
    #    manage_template => false
    #    index => "nginxlogs-%{[@metadata][beat]}-%{+YYYY.MM.dd}"
    #  }
    #}
    #stdout {
    #  codec => rubydebug
    #}
  }
}
Note: grok pattern reference: https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns
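Before restarting, logstash can validate the pipeline syntax itself; a sketch assuming the standard RPM/DEB package layout:

/usr/share/logstash/bin/logstash --path.settings /etc/logstash \
  --config.test_and_exit -f /etc/logstash/conf.d/rec-filebeat-log.conf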
Restart logstash:
systemctl restart logstash
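Once a few DNS log lines have been shipped, the daily index should appear on the elasticsearch side; a quick check with the _cat API:

curl 'http://192.168.0.97:9200/_cat/indices/ldnslogs-*?v'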
3. Viewing the logs in kibana
Management => Index Patterns => Create index pattern => ldnslogs-filebeat* (the pattern must match the index name set in the logstash output, ldnslogs-filebeat-YYYY.MM.dd).
From there you can analyze further; click Discover to browse the logs that were uploaded to elasticsearch.
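The data can also be queried directly, bypassing kibana; a minimal search sketch, assuming the default dynamic mapping for the ldns.* fields (manage_template is false, so no custom template is involved):

curl -s -H 'Content-Type: application/json' \
  'http://192.168.0.97:9200/ldnslogs-filebeat-*/_search?pretty' \
  -d '{ "size": 1, "query": { "match": { "ldns.request.domain": "www.sina.com.cn." } } }'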