Logstashによるデータ出力
1年以上、更新していなかったでしょうか・・・(書きたいなとは思いつつ忙殺されてました) Elastic Stackのバージョンも7.8になりました。一応、過去記事からの流れで Logstashを使用したログの流し込みです。
1.設計要素
今回の筆者の前提環境では、Linux サーバのSyslogとCiscoルータのログをLogstash経由でelasticsearchへ流し込む方式です。
LinuxサーバのSyslogは、FilebeatでLogstashへ連携することにします。CiscoルータのログはSyslog転送で Elastic Stack サーバへ転送した後、Logstashで取り込むことにします。
■Linux サーバ:Syslog → Fil;ebeat → Elastic Stack Server:Logstash → Elasticsearch
■Cisco 841MJ:Syslog転送 Elastic Stack Server:Logstash → Elasticsearch
実際の各種ログ取り込み方式としては、以下の方針(個人的主観)が良いと思います。
・各サーバ(機器)のログはSyslog転送を使用してIngestノード(Logstashを配置しているサーバ)へ保存する。転送したログをLogstashまたはBeatsでElasticsearchへ出力する。
・Syslog転送が使用できない場合は、Beatsを使用して Ingestノード(Logstashを配置しているサーバ)またはElasticsearchへ転送する。
2.インストール
LogstashやFilebeatのインストールについては、Elastic Stackの歩きかた#5,#6あたりを参照してください。
3.各設定
■Cisco841MJ側の設定
Ciscoルータ側はElasticStackサーバへsyslog転送する設定(どのファシリティを使用するかは任意)
logging facility local5 logging host 192.168.11.210 ← 転送先サーバのIP
■Linuxサーバ側の設定
・filebeat.ymlの設定
①Kibanaホストの指定
# =================================== Kibana =================================== # Starting with Beats version 6.0.0, the dashboards are loaded via the Kibana API. # This requires a Kibana endpoint configuration. setup.kibana: # Kibana Host # Scheme and port can be left out and will be set to the default (http and 5601) # In case you specify and additional path, the scheme is required: http://localhost:5601/path # IPv6 addresses should always be defined as: https://[2001:db8::1]:5601 host: "192.168.11.210:5601" ← ElasticStackサーバのIP
②Outputの設定
Logstashへ転送するので、Elasticsearchへの出力はコメントにする。
# ---------------------------- Elasticsearch Output ---------------------------- #output.elasticsearch: # Array of hosts to connect to. # hosts: ["localhost:9200"] # Protocol - either `http` (default) or `https`. #protocol: "https" # Authentication credentials - either API key or username/password. #api_key: "id:api_key" #username: "elastic" #password: "changeme" # ------------------------------ Logstash Output ------------------------------- output.logstash: # The Logstash hosts hosts: ["192.168.11.210:5044"] # Optional SSL. By default is off. # List of root certificates for HTTPS server verifications #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"] # Certificate for SSL client authentication #ssl.certificate: "/etc/pki/client/cert.pem" # Client Certificate Key #ssl.key: "/etc/pki/client/cert.key"
③systemモジュールの有効化
以下コマンドでモジュールを有効にする。
# filebeat modules enable system
モジュールの有効化を確認
# filebeat modules list Enabled: system Disabled: activemq apache auditd aws ~省略~
■ElasticStackサーバ側の設定
・Rsyslogの設定
Rsyslog設定は転送ログを受信できるように、デフォルトのコメントを外す。(受信ポートを変更したい場合は任意で変更)ファシリティはlocal5を指定し出力先を指定。
# syslog転送設定 # Provides TCP syslog reception $ModLoad imtcp $InputTCPServerRun 514 # ログ出力先設定 #Cisco 841 Syslog local5.* /var/log/network.log
・Logstashの設定
①Cisco841MJ用のconf
local5へ転送されたsyslogの出力ファイルをinputに指定しElasticsearchへ出力。(grokフィルターは参考程度に見といてください)
input { file { path => "/var/log/network.log" sincedb_path => "/tmp/.ciscolog.sincedb" start_position => "beginning" } } filter { if "FW-" in [message] { grok { match => { "message" => "%{SYSLOGTIMESTAMP:timestamp}\s%{DATA:type}\s%{DATA:seqno1}:\s%{DATA:seqno2}:\s%{DATA:timestamp_jst}:\s%{DATA:evnt_type}:\s%{GREEDYDATA:action}\s%{DATA:protocol}\ssession\s%{IP:src_ip}:%{DATA:src_port}\s%{IP:dest_ip}:%{DATA:dest_port}\s%{GREEDYDATA:logmsg}" } break_on_match => false tag_on_failure => ["_message_parse_failure"] add_field => { "dest_ip_fqdn" => "%{dest_ip}" } } geoip { source => ["dest_ip"] target => "geoip" } dns { reverse => [ "dest_ip_fqdn" ] action => "replace" } } else { grok { match => { "message" => "%{SYSLOGTIMESTAMP:timestamp}\s%{DATA:type}\s%{DATA:seqno1}:\s%{DATA:seqno2}:\s%{DATA:timestamp_jst}:\s%{DATA:evnt_type}:\s%{GREEDYDATA:logmsg}" } break_on_match => false tag_on_failure => ["_message_parse_failure"] } } date { match => ["timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss"] } } output { elasticsearch { hosts => ["localhost:9200"] index => "cisco_syslog-%{+YYYY-MM}" } }
②Linuxサーバのsyslog用のconf
filebeatで転送してきているので、beatsをinputに指定。
input { beats { port => 5044 } } filter { grok { match => { "message" => "%{SYSLOGTIMESTAMP:timestamp}\s%{GREEDYDATA:restof}" } } date { match => ["timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss"] } mutate { remove_field => [ "restof" ] } } output { elasticsearch { hosts => ["localhost:9200"] index => "sv_syslog-%{+YYYY-MM}" } }
4.INDEXの確認
DiscoverでINDEXにログが登録されている事を確認します。
・Linuxサーバのログ
・Cisco841MJのログ
ElasticStackの歩き方は今回で終了にします。今後は色々な使い方や機能について記事にしていきたいと思います。
いじょ。