logstash

Elastic Stackの歩きかた#7

Logstashによるデータ出力


1年以上、更新していなかったでしょうか・・・(書きたいなとは思いつつ忙殺されてました) Elastic Stackのバージョンも7.8になりました。一応、過去記事からの流れで Logstashを使用したログの流し込みです。


elasticstack_wg_logical8


1.設計要素

今回の筆者の前提環境では、Linux サーバのSyslogとCiscoルータのログをLogstash経由でelasticsearchへ流し込む方式です。

LinuxサーバのSyslogは、FilebeatでLogstashへ連携することにします。CiscoルータのログはSyslog転送で Elastic Stack サーバへ転送した後、Logstashで取り込むことにします。

Linux サーバ:Syslog → Fil;ebeat → Elastic Stack Server:Logstash → Elasticsearch

Cisco 841MJ:Syslog転送 Elastic Stack Server:Logstash → Elasticsearch

実際の各種ログ取り込み方式としては、以下の方針(個人的主観)が良いと思います。

・各サーバ(機器)のログはSyslog転送を使用してIngestノード(Logstashを配置しているサーバ)へ保存する。転送したログをLogstashまたはBeatsでElasticsearchへ出力する。

・Syslog転送が使用できない場合は、Beatsを使用して Ingestノード(Logstashを配置しているサーバ)またはElasticsearchへ転送する。


2.インストール

LogstashやFilebeatのインストールについては、Elastic Stackの歩きかた#5,#6あたりを参照してください。


3.各設定

■Cisco841MJ側の設定

Ciscoルータ側はElasticStackサーバへsyslog転送する設定(どのファシリティを使用するかは任意)

logging facility local5
logging host 192.168.11.210 ← 転送先サーバのIP

Linuxサーバ側の設定

・filebeat.ymlの設定

①Kibanaホストの指定

# =================================== Kibana ===================================

# Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API.
# This requires a Kibana endpoint configuration.
setup.kibana:

  # Kibana Host
  # Scheme and port can be left out and will be set to the default (http and 5601)
  # In case you specify and additional path, the scheme is required: http://localhost:5601/path
  # IPv6 addresses should always be defined as: https://[2001:db8::1]:5601
  host: "192.168.11.210:5601" ← ElasticStackサーバのIP

②Outputの設定

 Logstashへ転送するので、Elasticsearchへの出力はコメントにする。

# ---------------------------- Elasticsearch Output ----------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
  # hosts: ["localhost:9200"]

  # Protocol - either `http` (default) or `https`.
  #protocol: "https"

  # Authentication credentials - either API key or username/password.
  #api_key: "id:api_key"
  #username: "elastic"
  #password: "changeme"

# ------------------------------ Logstash Output -------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["192.168.11.210:5044"]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

③systemモジュールの有効化

以下コマンドでモジュールを有効にする。

# filebeat modules enable system

モジュールの有効化を確認

# filebeat modules list
Enabled:
system

Disabled:
activemq
apache
auditd
aws
~省略~

ElasticStackサーバ側の設定

・Rsyslogの設定

Rsyslog設定は転送ログを受信できるように、デフォルトのコメントを外す。(受信ポートを変更したい場合は任意で変更)ファシリティはlocal5を指定し出力先を指定。

# syslog転送設定
# Provides TCP syslog reception
$ModLoad imtcp
$InputTCPServerRun 514


# ログ出力先設定
#Cisco 841 Syslog
local5.*                                                /var/log/network.log

・Logstashの設定

①Cisco841MJ用のconf

local5へ転送されたsyslogの出力ファイルをinputに指定しElasticsearchへ出力。(grokフィルターは参考程度に見といてください)

input {
     file {
        path => "/var/log/network.log"
        sincedb_path => "/tmp/.ciscolog.sincedb"
        start_position => "beginning"
     }
}

filter {
    if "FW-" in [message]  {
       grok {
           match => { "message" => "%{SYSLOGTIMESTAMP:timestamp}\s%{DATA:type}\s%{DATA:seqno1}:\s%{DATA:seqno2}:\s%{DATA:timestamp_jst}:\s%{DATA:evnt_type}:\s%{GREEDYDATA:action}\s%{DATA:protocol}\ssession\s%{IP:src_ip}:%{DATA:src_port}\s%{IP:dest_ip}:%{DATA:dest_port}\s%{GREEDYDATA:logmsg}" }
           break_on_match => false
           tag_on_failure => ["_message_parse_failure"]
           add_field => {
               "dest_ip_fqdn" => "%{dest_ip}"
           }
       }
       geoip {
          source => ["dest_ip"]
          target => "geoip"
       }
       dns {
          reverse => [ "dest_ip_fqdn" ]
          action => "replace"
       }
   } else {
       grok {
           match => { "message" => "%{SYSLOGTIMESTAMP:timestamp}\s%{DATA:type}\s%{DATA:seqno1}:\s%{DATA:seqno2}:\s%{DATA:timestamp_jst}:\s%{DATA:evnt_type}:\s%{GREEDYDATA:logmsg}" }
           break_on_match => false
           tag_on_failure => ["_message_parse_failure"]
       }
   }
   date {
          match => ["timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss"]
   }
}

output {
     elasticsearch {
         hosts => ["localhost:9200"]
         index => "cisco_syslog-%{+YYYY-MM}"
     }
}

②Linuxサーバのsyslog用のconf

filebeatで転送してきているので、beatsをinputに指定。

input {
     beats {
        port => 5044
     }
}
filter {
   grok {
        match => { "message" => "%{SYSLOGTIMESTAMP:timestamp}\s%{GREEDYDATA:restof}" }
   }
   date {
          match => ["timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss"]
   }
   mutate {
      remove_field => [ "restof" ]
   }
}
output {
     elasticsearch {
         hosts => ["localhost:9200"]
         index => "sv_syslog-%{+YYYY-MM}"
     }
}

4.INDEXの確認

DiscoverでINDEXにログが登録されている事を確認します。

Linuxサーバのログ

syslog_discover

Cisco841MJのログ

 cisco_syslog_discover

Dropパケットの宛先をMapで表示してみました。cisco_drop_dest

ElasticStackの歩き方は今回で終了にします。今後は色々な使い方や機能について記事にしていきたいと思います。

いじょ。