Патерны logstash для syslog и *beat в elasticsearch. Двойная конфигурация приема логов windows/linux.

testsoft.net syslog index patterns

На этой странице показаны настройки logstash и патерны для парсинга логов, которые использованы в этом проекте. Конфигурация обновляется, если вы заметили ошибки в парсере, пожалуйста, напишите в комментариях. Для передачи логов linux систем, используется syslog, конечно, для настройки парсинга syslog в elasticseach нужно некоторое время, но такая конфигурация кажется мне более правильной. Использование syslog позволяет избежать установки дополнительных программ на серверах linux, т.к. rsyslog – это встроенный механизм ведения журналов Linux.

Конфигурации logstash находятся по этому пути:
cd /etc/logstash/conf.d/

[root@elk7int conf.d]# pwd
/etc/logstash/conf.d
[root@elk7int conf.d]# ll
total 20
-rw-r--r-- 1 root root 1647 Feb 16 18:42 filter-beats-http.conf
-rw-r--r-- 1 root root 3152 Feb 16 19:27 filter-beats-sshd.conf
-rw-r--r-- 1 root root 1660 Feb 16 19:44 filter-syslog-main.conf
-rw-r--r-- 1 root root  171 Feb  2 15:41 input.conf
-rw-r--r-- 1 root root  446 Feb  2 15:43 output.conf
[root@elk7int conf.d]#

Патерны для logstash находятся по этому пути:
cd /etc/logstash/patterns/

[root@elk7int patterns]# pwd
/etc/logstash/patterns
[root@elk7int patterns]# ll
total 12
-rw-r--r-- 1 root root 1046 Feb 16 19:25 http.conf
-rw-r--r-- 1 root root 1784 Feb 16 18:12 modsec.conf
-rw-r--r-- 1 root root 1333 Feb 16 20:04 ssh.conf
[root@elk7int patterns]#


LOGSTASH INPUT CONFIG (INPUT.CONF)
В этом проекте используется двойная конфигурация logstash. Подобный подход позволяет принимать логи в формате syslog и как обычно данные от filebeat, metricbeat и других типов *beat.
vi /etc/logstash/conf.d/input.conf

input {
  beats {
    port => 5044
    tags => "beats"
    type => "beats"
    ssl => false
  }

  udp {
    port  => 5055
    tags => "syslog"
    type => "syslog"
  }
}

Как видно в конфигурационном файле “input.conf” логи от *beats (Windows/Linux) принимаются на 5044 порту, а логи syslog (Linux) на 5055 порту.


LOGSTASH OUTPUT CONFIG (OUTPUT.CONF)
vi /etc/logstash/conf.d/output.conf

output {

  if "beats" in [tags] {
        elasticsearch {
                hosts => ["localhost:9200"]
                manage_template => false
                index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
        }
  }
  else if "syslog" in [tags] {
        elasticsearch {
                hosts => ["localhost:9200"]
                manage_template => false
                index => "syslog-%{+YYYY.MM.dd}"
    }
  }
}

Для разделения потоков *beat и syslog используются tags (метки). Это позволяет удобнее управлять индексами в elasticsearch. Также, в итоговом варианте, конфигурации я стал разделять потоки от *beat. Это позволяет легко удалять ненужные данные, например от metricbeat, когда их становится очень много.


MAIN SYSLOG FILTER (FILTER-SYSLOG-MAIN.CONF)
vi /etc/logstash/conf.d/filter-syslog-main.conf

filter {

if [type] == "syslog" {
                grok {
                        match => { "message" => [
									"%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_application} %{DATA:syslog_localdate} %{DATA:syslog_localtime} %{DATA:syslog_process} (?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}",
                                                                        "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_application}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}"
                                                ]}

				}
				grok {
						patterns_dir => ["/etc/logstash/patterns"]
						match => { "syslog_message" => [
															"%{SSH_ACCESS_01}",
															"%{SSH_ACCESS_02}",
															"%{SSH_ERROR_01}",
															"%{SSH_ERROR_02}",
															"%{SSH_ERROR_03}",
															"%{SSH_ERROR_04}",
															"%{SSH_ERROR_05}",
															"%{SSH_ERROR_06}",
															"%{SSH_ERROR_07}",
															"%{SSH_ERROR_08}",
															"%{SSH_ERROR_09}",
															"%{SSH_ERROR_10}",
															"%{HTTP_ACCESS_01}",
															"%{HTTP_ACCESS_02}",
															"%{HTTP_ERROR_01}",
															"%{MODSEC_MESSAGE_01}"
														]}
				}	
				mutate {
					copy => { "[http][access][remote_addr]" => "[source][ip]" }
					copy => { "[http][access][remote_user]" => "[destination][username]" }
					copy => { "[http][access][url]" => "[request][url]" }
					copy => { "[modsecurity][uri]" => "[request][url]" }
				}
				geoip {
					source => "[source][ip]"
					target => "[geoip]"
				}				
}
}


SYSLOG MESSAGE – SSH LOGSTASH PATTERN (SSH.CONF)
Парсинг логов ssh в формате syslog, которые поступают на 5055 порт.
vi /etc/logstash/patterns/ssh.conf

SSH_ERROR_01 .*Invalid user %{USERNAME:[destination][username]} from %{IP:[source][ip]} port %{BASE10NUM:[source][port]}
SSH_ERROR_02 .*Disconnected from.*user %{USERNAME:[destination][username]} %{IP:[source][ip]} port %{BASE10NUM:[source][port]}.*
SSH_ERROR_03 .*Disconnected from %{IP:[source][ip]} port %{BASE10NUM:[source][port]}.*
SSH_ERROR_04 .*Received disconnect from %{IP:[source][ip]} port %{BASE10NUM:[source][port]}.*
SSH_ERROR_05 .*Failed.*user %{USERNAME:[destination][username]} from %{IP:[source][ip]} port %{BASE10NUM:[source][port]} ssh2
SSH_ERROR_06 .*Failed password for %{USERNAME:[destination][username]} from %{IP:[source][ip]} port %{BASE10NUM:[source][port]} ssh2
SSH_ERROR_07 .*Did not receive identification string from %{IP:[source][ip]} port %{BASE10NUM:[source][port]}
SSH_ERROR_08 .*Connection.*user %{USERNAME:[destination][username]} %{IP:[source][ip]} port %{BASE10NUM:[source][port]}.*
SSH_ERROR_09 .*Connection closed by %{IP:[source][ip]} port %{BASE10NUM:[source][port]}.*
SSH_ERROR_10 .*Connection reset by %{IP:[source][ip]} port %{BASE10NUM:[source][port]}.*
SSH_ACCESS_01 .*Accepted password for %{USERNAME:[destination][username]} from %{IP:[source][ip]} port %{BASE10NUM:[source][port]}.*
SSH_ACCESS_02 .*Accepted publickey for %{USERNAME:[destination][username]} from %{IP:[source][ip]} port %{BASE10NUM:[source][port]}.*


SYSLOG MESSAGE – HTTP LOGSTASH PATTERN (HTTP.CONF)
Парсинг логов web-серверов apache/nginx, которые поступают на 5055 порт.
vi /etc/logstash/patterns/http.conf

HTTP_ACCESS_01 %{IPORHOST:[http][access][remote_addr]} - %{DATA:[http][access][remote_user]} \[%{HTTPDATE:[http][access][time_local]}\] \"%{WORD:[http][access][method]} %{DATA:[http][access][url]} HTTP/%{NUMBER:[http][access][http_version]}\" %{NUMBER:[http][access][status]} %{NUMBER:[http][access][body_bytes_sent]} \"%{DATA:[http][access][http_referer]}\" \"%{DATA:[http][access][http_user_agent]}\"
HTTP_ACCESS_02 %{IPORHOST:[http][access][remote_addr]} - %{DATA:[http][access][remote_user]} \[%{HTTPDATE:[http][access][time_local]}\] \"%{DATA:[http][access][url]} HTTP/%{NUMBER:[http][access][http_version]}\" %{NUMBER:[http][access][status]} %{NUMBER:[http][access][body_bytes_sent]} \"%{DATA:[http][access][http_referer]}\" \"%{DATA:[http][access][http_user_agent]}\"(?:\"%{DATA:[http][access][http_x_forwarded_for]}\")?
HTTP_ERROR_01 %{DATA:[http][error][time]} \[%{DATA:[http][error][level]}\] %{NUMBER:[http][error][pid]}#%{NUMBER:[http][error][tid]}: (\*%{NUMBER:[http][error][connection_id]} )?%{GREEDYDATA:[http][error][message]}


SYSLOG MESSAGE – MODSECURITY LOGSTASH PATTERN APACHE MOD (MODSEC.CONF)
Патерны для парсинга логов ModSecurity (Web Application Firewall) в формате syslog, которые поступают на 5055 порт.
vi /etc/logstash/patterns/modsec.conf

MODSEC_LOGLEVEL .*\[\:%{LOGLEVEL:severity}
MODSEC_CLIENT .*client\s%{IPORHOST:[source][ip]}:%{BASE10NUM:[source][port]}\]
MODSEC_INFO .*ModSecurity:\s+%{GREEDYDATA:[modsecurity][info]}
MODSEC_FILE .*\[file\s\"%{DATA:[modsecurity][file]}\"\]
MODSEC_LINE .*\[line\s\"%{BASE10NUM:[modsecurity][line]}\"\]
MODSEC_ID .*\[id\s\"%{BASE10NUM:[modsecurity][id]}\"\]
MODSEC_REV (\s+\[rev\s\"%{BASE10NUM:[modsecurity][rev]}\"\])?
MODSEC_MSG .*\[msg\s\"%{DATA:[modsecurity][msg]}\"\]
MODSEC_DATA (.*\[data\s\"%{DATA:[modsecurity][data]}\"\])?
MODSEC_SEVIRITY (.*\[severity\s\"%{DATA:[modsecurity][severity]}\"\])?
MODSEC_VER (.*\[ver\s\"%{DATA:[modsecurity][ver]}\"\])?
MODSEC_MATURITY (.*\[maturity\s\"%{BASE10NUM:[modsecurity][maturity]}\"\])?
MODSEC_ACCURACY (.*\[accuracy\s\"%{BASE10NUM:[modsecurity][accuracy]}\"\])?
MODSEC_TAGS (\s+\[tag\s\"%{DATA:[modsecurity][tag]}\"\])?(\s+\[tag\s\"%{DATA:[modsecurity][tag]}\"\])?(\s+\[tag\s\"%{DATA:[modsecurity][tag]}\"\])?(\s+\[tag\s\"%{DATA:[modsecurity][tag]}\"\])?(\s+\[tag\s\"%{DATA:[modsecurity][tag]}\"\])?(\s+\[tag\s\"%{DATA:[modsecurity][tag]}\"\])?(\s+\[tag\s\"%{DATA:[modsecurity][tag]}\"\])?(\s+\[tag\s\"%{DATA:[modsecurity][tag]}\"\])?(\s+\[tag\s\"%{DATA:[modsecurity][tag]}\"\])?(\s+\[tag\s\"%{DATA:[modsecurity][tag]}\"\])?
MODSEC_HOSTNAME (.*\[hostname\s\"%{DATA:[modsecurity][hostname]}\"\])?
MODSEC_URI (.*\[uri\s\"%{DATA:[modsecurity][uri]}\"\])?
MODSEC_UNIQUE_ID (.*\[unique_id\s\"%{DATA:[modsecurity][unique_id]}\"\])?

MODSEC_MESSAGE_01 %{MODSEC_LOGLEVEL}%{MODSEC_CLIENT}%{MODSEC_INFO}%{MODSEC_FILE}%{MODSEC_LINE}%{MODSEC_ID}%{MODSEC_REV}%{MODSEC_MSG}%{MODSEC_DATA}%{MODSEC_SEVIRITY}%{MODSEC_VER}%{MODSEC_MATURITY}%{MODSEC_ACCURACY}%{MODSEC_TAGS}%{MODSEC_HOSTNAME}%{MODSEC_URI}%{MODSEC_UNIQUE_ID}.*


FILEBEAT NGINX LOGSTASH CONFIG (FILTER-BEATS-HTTP.CONF)
Парсинг логов nginx, котороые поступают от filebeat на 5044 порт:
vi /etc/logstash/conf.d/filter-beats-http.conf

filter {

if [type] == "beats" {

    if [event][module] == "nginx" {
    if [fileset][name] == "access" {
      grok {
        match => { "message" => ["%{IPORHOST:[nginx][access][remote_ip]} - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}\" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} \"%{DATA:[nginx][access][referrer]}\" \"%{DATA:[nginx][access][agent]}\""] }
#        remove_field => "message"
      }
      mutate {
        add_field => { "read_timestamp" => "%{@timestamp}" }
      }
      date {
        match => [ "[nginx][access][time]", "dd/MMM/YYYY:H:m:s Z" ]
        remove_field => "[nginx][access][time]"
      }
      useragent {
        source => "[nginx][access][agent]"
        target => "[nginx][access][user_agent]"
        remove_field => "[nginx][access][agent]"
      }

      geoip {
        source => "[nginx][access][remote_ip]"
        target => "[location]"
      }

    }
    else if [fileset][name] == "error" {
      grok {
        match => { "message" => ["%{DATA:[nginx][error][time]} \[%{DATA:[nginx][error][level]}\] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (\*%{NUMBER:[nginx][error][connection_id]} )?%{GREEDYDATA:[nginx][error][message]}"] }
#        remove_field => "message"
      }
      mutate {
        rename => { "@timestamp" => "read_timestamp" }
      }
      date {
        match => [ "[nginx][error][time]", "YYYY/MM/dd H:m:s" ]
        remove_field => "[nginx][error][time]"
      }
    }
  }

}
}


FILEBEAT SSH LOGSTASH CONFIG (FILTER-BEATS-SSHD.CONF)
Парсинг логов ssh, котороые поступают от filebeat на 5044 порт:
vi /etc/logstash/conf.d/filter-beats-sshd.conf

filter {

if [type] == "beats" {

  if [event][module] == "system" {
    if [fileset][name] == "auth" {
      grok {
        match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:\[%{POSINT:[system][auth][pid]}\])?: new group: name=%{DATA:system.auth.groupadd.name}, GID=%{NUMBER:system.auth.groupadd.gid}",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:\[%{POSINT:[system][auth][pid]}\])?: new user: name=%{DATA:[system][auth][useradd][name]}, UID=%{NUMBER:[system][auth][useradd][uid]}, GID=%{NUMBER:[system][auth][useradd][gid]}, home=%{DATA:[system][auth][useradd][home]}, shell=%{DATA:[system][auth][useradd][shell]}$",
                  "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:\[%{POSINT:[system][auth][pid]}\])?: %{GREEDYMULTILINE:[system][auth][message]}"] }
        pattern_definitions => {
          "GREEDYMULTILINE"=> "(.|\n)*"
        }
#        remove_field => "message"
      }
      date {
        match => [ "[system][auth][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
      }

      geoip {
        source => "[system][auth][ssh][ip]"
        target => "[location]"
      }

    }
    else if [fileset][name] == "syslog" {
      grok {
        match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }
        pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
 #       remove_field => "message"
      }
      date {
        match => [ "[system][syslog][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
      }
    }
  }

}
}