На этой странице показаны настройки logstash и патерны для парсинга логов, которые использованы в этом проекте. Конфигурация обновляется, если вы заметили ошибки в парсере, пожалуйста, напишите в комментариях. Для передачи логов linux систем, используется syslog, конечно, для настройки парсинга syslog в elasticseach нужно некоторое время, но такая конфигурация кажется мне более правильной. Использование syslog позволяет избежать установки дополнительных программ на серверах linux, т.к. rsyslog – это встроенный механизм ведения журналов Linux.
Конфигурации logstash находятся по этому пути:
cd /etc/logstash/conf.d/
[root@elk7int conf.d]# pwd /etc/logstash/conf.d [root@elk7int conf.d]# ll total 20 -rw-r--r-- 1 root root 1647 Feb 16 18:42 filter-beats-http.conf -rw-r--r-- 1 root root 3152 Feb 16 19:27 filter-beats-sshd.conf -rw-r--r-- 1 root root 1660 Feb 16 19:44 filter-syslog-main.conf -rw-r--r-- 1 root root 171 Feb 2 15:41 input.conf -rw-r--r-- 1 root root 446 Feb 2 15:43 output.conf [root@elk7int conf.d]#
Патерны для logstash находятся по этому пути:
cd /etc/logstash/patterns/
[root@elk7int patterns]# pwd /etc/logstash/patterns [root@elk7int patterns]# ll total 12 -rw-r--r-- 1 root root 1046 Feb 16 19:25 http.conf -rw-r--r-- 1 root root 1784 Feb 16 18:12 modsec.conf -rw-r--r-- 1 root root 1333 Feb 16 20:04 ssh.conf [root@elk7int patterns]#
LOGSTASH INPUT CONFIG (INPUT.CONF)
В этом проекте используется двойная конфигурация logstash. Подобный подход позволяет принимать логи в формате syslog и как обычно данные от filebeat, metricbeat и других типов *beat.
vi /etc/logstash/conf.d/input.conf
input { beats { port => 5044 tags => "beats" type => "beats" ssl => false } udp { port => 5055 tags => "syslog" type => "syslog" } }
Как видно в конфигурационном файле “input.conf” логи от *beats (Windows/Linux) принимаются на 5044 порту, а логи syslog (Linux) на 5055 порту.
LOGSTASH OUTPUT CONFIG (OUTPUT.CONF)
vi /etc/logstash/conf.d/output.conf
output { if "beats" in [tags] { elasticsearch { hosts => ["localhost:9200"] manage_template => false index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}" } } else if "syslog" in [tags] { elasticsearch { hosts => ["localhost:9200"] manage_template => false index => "syslog-%{+YYYY.MM.dd}" } } }
Для разделения потоков *beat и syslog используются tags (метки). Это позволяет удобнее управлять индексами в elasticsearch. Также, в итоговом варианте, конфигурации я стал разделять потоки от *beat. Это позволяет легко удалять ненужные данные, например от metricbeat, когда их становится очень много.
MAIN SYSLOG FILTER (FILTER-SYSLOG-MAIN.CONF)
vi /etc/logstash/conf.d/filter-syslog-main.conf
filter { if [type] == "syslog" { grok { match => { "message" => [ "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_application} %{DATA:syslog_localdate} %{DATA:syslog_localtime} %{DATA:syslog_process} (?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}", "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_application}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" ]} } grok { patterns_dir => ["/etc/logstash/patterns"] match => { "syslog_message" => [ "%{SSH_ACCESS_01}", "%{SSH_ACCESS_02}", "%{SSH_ERROR_01}", "%{SSH_ERROR_02}", "%{SSH_ERROR_03}", "%{SSH_ERROR_04}", "%{SSH_ERROR_05}", "%{SSH_ERROR_06}", "%{SSH_ERROR_07}", "%{SSH_ERROR_08}", "%{SSH_ERROR_09}", "%{SSH_ERROR_10}", "%{HTTP_ACCESS_01}", "%{HTTP_ACCESS_02}", "%{HTTP_ERROR_01}", "%{MODSEC_MESSAGE_01}" ]} } mutate { copy => { "[http][access][remote_addr]" => "[source][ip]" } copy => { "[http][access][remote_user]" => "[destination][username]" } copy => { "[http][access][url]" => "[request][url]" } copy => { "[modsecurity][uri]" => "[request][url]" } } geoip { source => "[source][ip]" target => "[geoip]" } } }
SYSLOG MESSAGE – SSH LOGSTASH PATTERN (SSH.CONF)
Парсинг логов ssh в формате syslog, которые поступают на 5055 порт.
vi /etc/logstash/patterns/ssh.conf
SSH_ERROR_01 .*Invalid user %{USERNAME:[destination][username]} from %{IP:[source][ip]} port %{BASE10NUM:[source][port]} SSH_ERROR_02 .*Disconnected from.*user %{USERNAME:[destination][username]} %{IP:[source][ip]} port %{BASE10NUM:[source][port]}.* SSH_ERROR_03 .*Disconnected from %{IP:[source][ip]} port %{BASE10NUM:[source][port]}.* SSH_ERROR_04 .*Received disconnect from %{IP:[source][ip]} port %{BASE10NUM:[source][port]}.* SSH_ERROR_05 .*Failed.*user %{USERNAME:[destination][username]} from %{IP:[source][ip]} port %{BASE10NUM:[source][port]} ssh2 SSH_ERROR_06 .*Failed password for %{USERNAME:[destination][username]} from %{IP:[source][ip]} port %{BASE10NUM:[source][port]} ssh2 SSH_ERROR_07 .*Did not receive identification string from %{IP:[source][ip]} port %{BASE10NUM:[source][port]} SSH_ERROR_08 .*Connection.*user %{USERNAME:[destination][username]} %{IP:[source][ip]} port %{BASE10NUM:[source][port]}.* SSH_ERROR_09 .*Connection closed by %{IP:[source][ip]} port %{BASE10NUM:[source][port]}.* SSH_ERROR_10 .*Connection reset by %{IP:[source][ip]} port %{BASE10NUM:[source][port]}.* SSH_ACCESS_01 .*Accepted password for %{USERNAME:[destination][username]} from %{IP:[source][ip]} port %{BASE10NUM:[source][port]}.* SSH_ACCESS_02 .*Accepted publickey for %{USERNAME:[destination][username]} from %{IP:[source][ip]} port %{BASE10NUM:[source][port]}.*
SYSLOG MESSAGE – HTTP LOGSTASH PATTERN (HTTP.CONF)
Парсинг логов web-серверов apache/nginx, которые поступают на 5055 порт.
vi /etc/logstash/patterns/http.conf
HTTP_ACCESS_01 %{IPORHOST:[http][access][remote_addr]} - %{DATA:[http][access][remote_user]} \[%{HTTPDATE:[http][access][time_local]}\] \"%{WORD:[http][access][method]} %{DATA:[http][access][url]} HTTP/%{NUMBER:[http][access][http_version]}\" %{NUMBER:[http][access][status]} %{NUMBER:[http][access][body_bytes_sent]} \"%{DATA:[http][access][http_referer]}\" \"%{DATA:[http][access][http_user_agent]}\" HTTP_ACCESS_02 %{IPORHOST:[http][access][remote_addr]} - %{DATA:[http][access][remote_user]} \[%{HTTPDATE:[http][access][time_local]}\] \"%{DATA:[http][access][url]} HTTP/%{NUMBER:[http][access][http_version]}\" %{NUMBER:[http][access][status]} %{NUMBER:[http][access][body_bytes_sent]} \"%{DATA:[http][access][http_referer]}\" \"%{DATA:[http][access][http_user_agent]}\"(?:\"%{DATA:[http][access][http_x_forwarded_for]}\")? HTTP_ERROR_01 %{DATA:[http][error][time]} \[%{DATA:[http][error][level]}\] %{NUMBER:[http][error][pid]}#%{NUMBER:[http][error][tid]}: (\*%{NUMBER:[http][error][connection_id]} )?%{GREEDYDATA:[http][error][message]}
SYSLOG MESSAGE – MODSECURITY LOGSTASH PATTERN APACHE MOD (MODSEC.CONF)
Патерны для парсинга логов ModSecurity (Web Application Firewall) в формате syslog, которые поступают на 5055 порт.
vi /etc/logstash/patterns/modsec.conf
MODSEC_LOGLEVEL .*\[\:%{LOGLEVEL:severity} MODSEC_CLIENT .*client\s%{IPORHOST:[source][ip]}:%{BASE10NUM:[source][port]}\] MODSEC_INFO .*ModSecurity:\s+%{GREEDYDATA:[modsecurity][info]} MODSEC_FILE .*\[file\s\"%{DATA:[modsecurity][file]}\"\] MODSEC_LINE .*\[line\s\"%{BASE10NUM:[modsecurity][line]}\"\] MODSEC_ID .*\[id\s\"%{BASE10NUM:[modsecurity][id]}\"\] MODSEC_REV (\s+\[rev\s\"%{BASE10NUM:[modsecurity][rev]}\"\])? MODSEC_MSG .*\[msg\s\"%{DATA:[modsecurity][msg]}\"\] MODSEC_DATA (.*\[data\s\"%{DATA:[modsecurity][data]}\"\])? MODSEC_SEVIRITY (.*\[severity\s\"%{DATA:[modsecurity][severity]}\"\])? MODSEC_VER (.*\[ver\s\"%{DATA:[modsecurity][ver]}\"\])? MODSEC_MATURITY (.*\[maturity\s\"%{BASE10NUM:[modsecurity][maturity]}\"\])? MODSEC_ACCURACY (.*\[accuracy\s\"%{BASE10NUM:[modsecurity][accuracy]}\"\])? MODSEC_TAGS (\s+\[tag\s\"%{DATA:[modsecurity][tag]}\"\])?(\s+\[tag\s\"%{DATA:[modsecurity][tag]}\"\])?(\s+\[tag\s\"%{DATA:[modsecurity][tag]}\"\])?(\s+\[tag\s\"%{DATA:[modsecurity][tag]}\"\])?(\s+\[tag\s\"%{DATA:[modsecurity][tag]}\"\])?(\s+\[tag\s\"%{DATA:[modsecurity][tag]}\"\])?(\s+\[tag\s\"%{DATA:[modsecurity][tag]}\"\])?(\s+\[tag\s\"%{DATA:[modsecurity][tag]}\"\])?(\s+\[tag\s\"%{DATA:[modsecurity][tag]}\"\])?(\s+\[tag\s\"%{DATA:[modsecurity][tag]}\"\])? MODSEC_HOSTNAME (.*\[hostname\s\"%{DATA:[modsecurity][hostname]}\"\])? MODSEC_URI (.*\[uri\s\"%{DATA:[modsecurity][uri]}\"\])? MODSEC_UNIQUE_ID (.*\[unique_id\s\"%{DATA:[modsecurity][unique_id]}\"\])? MODSEC_MESSAGE_01 %{MODSEC_LOGLEVEL}%{MODSEC_CLIENT}%{MODSEC_INFO}%{MODSEC_FILE}%{MODSEC_LINE}%{MODSEC_ID}%{MODSEC_REV}%{MODSEC_MSG}%{MODSEC_DATA}%{MODSEC_SEVIRITY}%{MODSEC_VER}%{MODSEC_MATURITY}%{MODSEC_ACCURACY}%{MODSEC_TAGS}%{MODSEC_HOSTNAME}%{MODSEC_URI}%{MODSEC_UNIQUE_ID}.*
FILEBEAT NGINX LOGSTASH CONFIG (FILTER-BEATS-HTTP.CONF)
Парсинг логов nginx, котороые поступают от filebeat на 5044 порт:
vi /etc/logstash/conf.d/filter-beats-http.conf
filter { if [type] == "beats" { if [event][module] == "nginx" { if [fileset][name] == "access" { grok { match => { "message" => ["%{IPORHOST:[nginx][access][remote_ip]} - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}\" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} \"%{DATA:[nginx][access][referrer]}\" \"%{DATA:[nginx][access][agent]}\""] } # remove_field => "message" } mutate { add_field => { "read_timestamp" => "%{@timestamp}" } } date { match => [ "[nginx][access][time]", "dd/MMM/YYYY:H:m:s Z" ] remove_field => "[nginx][access][time]" } useragent { source => "[nginx][access][agent]" target => "[nginx][access][user_agent]" remove_field => "[nginx][access][agent]" } geoip { source => "[nginx][access][remote_ip]" target => "[location]" } } else if [fileset][name] == "error" { grok { match => { "message" => ["%{DATA:[nginx][error][time]} \[%{DATA:[nginx][error][level]}\] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (\*%{NUMBER:[nginx][error][connection_id]} )?%{GREEDYDATA:[nginx][error][message]}"] } # remove_field => "message" } mutate { rename => { "@timestamp" => "read_timestamp" } } date { match => [ "[nginx][error][time]", "YYYY/MM/dd H:m:s" ] remove_field => "[nginx][error][time]" } } } } }
FILEBEAT SSH LOGSTASH CONFIG (FILTER-BEATS-SSHD.CONF)
Парсинг логов ssh, котороые поступают от filebeat на 5044 порт:
vi /etc/logstash/conf.d/filter-beats-sshd.conf
filter { if [type] == "beats" { if [event][module] == "system" { if [fileset][name] == "auth" { grok { match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?", "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}", "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}", "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}", "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:\[%{POSINT:[system][auth][pid]}\])?: new group: name=%{DATA:system.auth.groupadd.name}, GID=%{NUMBER:system.auth.groupadd.gid}", "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:\[%{POSINT:[system][auth][pid]}\])?: new user: name=%{DATA:[system][auth][useradd][name]}, UID=%{NUMBER:[system][auth][useradd][uid]}, GID=%{NUMBER:[system][auth][useradd][gid]}, home=%{DATA:[system][auth][useradd][home]}, shell=%{DATA:[system][auth][useradd][shell]}$", "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:\[%{POSINT:[system][auth][pid]}\])?: %{GREEDYMULTILINE:[system][auth][message]}"] } pattern_definitions => { "GREEDYMULTILINE"=> "(.|\n)*" } # remove_field => "message" } date { match => [ "[system][auth][timestamp]", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ] } geoip { source => "[system][auth][ssh][ip]" target => "[location]" } } else if [fileset][name] == "syslog" { grok { match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] } pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" } # remove_field => "message" } date { match => [ "[system][syslog][timestamp]", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ] } } } } }