Logstashを利用したApacheアクセスログのインポート

Posted by johtani on Friday, November 21, 2014

目次

JJUG CCCや第7回Elasticsearch勉強会のKibana4のデモにアクセスログを利用しました。

ただ、セッションでは、どうやってElasticsearchに投入したのかという詳しい話をしていませんでした。 本記事では、データ取り込み時に利用したLogstashの設定ファイルについて説明します。

Logstashの設定の説明に入る前に、全体の流れを。 「ApacheアクセスログをKibana4により可視化」です。

材料の準備

「ApacheアクセスログをKibana4により可視化」に必要な材料は次の通りです。 (今回は起動するところまでいかないので、実際に必要なのは次回以降になります。)

  • Java 7(u55以上を1つ)
  • Logstash 1.4.2(1つ)
  • Elasticsearch 1.4.0(1つ)
  • Kibana4 Beta2(1つ)
  • Apacheのアクセスログ(適量)

Apacheのアクセスログ以外は、公式サイトからダウンロードできます。 それぞれをダウンロードして、起動できるようにしておきましょう。

※1台のマシン上で行う場合は、アクセスログの量を少なめにするなどの対策をとりましょう。 ※今回は、1台のマシン(Mac)上で、VMなどを利用せず、それぞれ直接起動するものとします。

可視化の手順と流れ

可視化の流れとしては、

  1. Logstashでファイルを読み込み、各種処理(パースしたり、情報を追加したり、切り出したり)
  2. Elasticsearchに保存
  3. Kibanaでグラフを作ったり、検索してみたり

です。

今回は、1のLogstashでファイルを読み込んだりする設定ファイルの説明です。

Logstashの設定

Logstashの基本

まずは、Logstashの設定ですが、簡単にLogstashの説明を。 Logstashは大きく3つのパーツに分かれています。

  1. input:データの入力処理
  2. filter:inputで読み込んだデータに対する操作など
  3. output:データの出力処理

inputでデータを読み込み(複数可)、filterでデータに対して各種処理を行い、outputでデータを指定されたところに出力(複数可)します。

アクセスログの読み込み設定

アクセスログの読み込み処理は大まかに次のようなものとなります。

  1. アクセスログを読み込む(input/file)
  2. 読み取ったアクセスログを各フィールド(IPアドレス、ユーザエージェントなど)に分割(filter/grok)
  3. 日付のパース(filter/date)
  4. クライアントIPアドレスにgeoipの情報を付加(filter/geoip)
  5. リクエストのパスの第1階層の抽出(filter/grok)
  6. ユーザエージェントのパース(filter/useragent)
  7. Elasticsearchへの出力(output/elasticsearch)

設定ファイルは次のようなものになります。

input {
  file {
    path => "/Users/johtani/demo_access_log/*/*.log"
    start_position => "beginning"
  }
}

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
    break_on_match => false
    tag_on_failure => ["_message_parse_failure"]
  }
  date {
    match => ["timestamp", "dd/MMM/YYYY:HH:mm:ss Z"]
    locale => en
  }
  geoip {
    source => ["clientip"]
  }
  grok {
    match => { "request" => "^/%{WORD:first_path}/%{GREEDYDATA}$" }
    tag_on_failure => ["_request_parse_failure"]
  }
  useragent {
    source => "agent"
    target => "useragent"
  }
}

output {
  elasticsearch {
    host => "localhost"
    index => "new_demo_access_log-%{year}"
    cluster => "demo_cluster"
    protocol => "http"
  }
}
1. アクセスログを読み込む(input/file)

inputのfileモジュール(a)を使用してアクセスログのファイルを読み込みます。 pathでアクセスログのファイルのパスを指定します。 今回利用したアクセスログはdemo_access_log/2010/access20100201.logといった日毎のファイルに分割されていたため、 *を利用してファイルのパスを指定しました。 また、今回は既存のファイルの読み込みだけのため、start_positionbeginningを指定してあります。 デフォルトではendが指定されるため、Logstashを起動後に追記されたログから対象になってしまうためです。 その他の設定については、公式ガイドをご覧ください。

input {
  file { # a
    path => "/Users/johtani/demo_access_log/*/*.log" # b
    start_position => "beginning" # c
  }
}

Logstashでは、ファイルをどこまで読み込んだかという情報を保持するために、sincedbを利用しています。 設定変更後に同じファイルを最初から読み込みたい場合などは、こちらのファイルを一旦削除するなどの対応が必要です。

ちなみに、読み込んだデータは次のようなJSONになっています。

{
  "message": "読み込んだアクセスログ",
  "@version": "1",
  "@timestamp":"2014-11-21T06:16:21.644Z",
  "host":"jupiter.local",
  "path":"/Users/johtani/demo_access_log/2010/access20100201.log"}
}

特に指定がない場合は、messageに読み込んだデータが入ってきます。 @timestampがLogstashが読み込んだ時刻、hostはLogstashが動作しているホスト名です。 pathはfileモジュールが読み込んだファイルのパスを設定しています。 この後の処理で、どこの項目に対して処理を行うかといったことが重要になるので、

2. 読み取ったアクセスログを各フィールド(IPアドレス、ユーザエージェントなど)に分割(filter/grok)

2.〜6.の処理は、inputで読み込んだ1アクセスログに対する処理となります。

ここでは、grokフィルタを使用して Apacheのアクセスログを各フィールドに分割します。 Logastashでは、簡単に使えるようにいくつかのパターンが用意されています。 Apacheのログのために、COMBINEDAPACHELOGというのが用意されています。 今回はこちらを使用しています。その他にも日付などパターンが用意されているので、試してみてください。

messageにアクセスログが入っているので、こちらの項目に対してCOMBINEDAPACHELOGのパターンを matchで適用してフィールドに抜き出します。 tag_on_failureは、matchでパースに失敗した場合に、tagというフィールドに指定した文字列を出力する機能になります。 デフォルトだと_grokparsefailureが付与されますが、ここでは、どの処理で失敗したがを判別するために文字列を変更しています。

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
    break_on_match => false
    tag_on_failure => ["_message_parse_failure"]
  }
  ...

clientipidentauthtimestampverbrequesthttpversionresponsebytesreferreragentがgrokフィルタにより抜き出された項目です。

{
  "message":"アクセスログ",
  "@version":"1",
  "@timestamp":"2014-11-21T07:20:54.387Z",
  "host":"jupiter.local",
  "path":"/Users/johtani/demo_access_log/2010/access20100201.log",
  "clientip":"クライアントのIPアドレス",
  "ident":"-",
  "auth":"-",
  "timestamp":"01/Feb/2010:00:00:26 +0900",
  "verb":"GET",
  "request":"/images/favicon.ico",
  "httpversion":"1.1",
  "response":"200",
  "bytes":"318",
  "referrer":"\"-\"",
  "agent":"\"Mozilla/5.0 (Windows; U; Windows NT 5.1; ja; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 (.NET CLR 3.5.30729)\""
}
3. 日付のパース(filter/date)

Logstashは特に指定がない場合、inputでデータを取り出した日付が@timestampとなります。 そして、このフィールドが特に指定がない場合は、Elasticsearchのデータの日付となり、Kibanaで利用する日付となります。

リアルタイムにアクセスログを読み込む場合は、読み込んだ日時でもほぼ問題はありませんが、過去データの場合はそうもいきません。 そこで、dateフィルタを使用して、@timestampの値を書き換えます。

date {
  match => ["timestamp", "dd/MMM/YYYY:HH:mm:ss Z"]
  locale => en
}

上記では、timestampという項目に対してdd/MMM/YYYY:HH:mm:ss Zという日付パターンの場合に値を書き換える設定となります。 なお、日付の月の部分がFebとなっているため、localeenを指定しています。Logstashが動作するマシンのlocalejaなどの場合にパースに失敗するためです。

4. クライアントIPアドレスにgeoipの情報を付加(filter/geoip)

どの国からのアクセスかなどを判別したいので、IPアドレスを元にgeoipを利用してより詳細な情報を付与します。 Logstashでもこの機能が用意されており、簡単に利用ができます。

geoip {
  source => ["clientip"]
}

これだけです。対象とするIPアドレスのフィールドを指定しているだけです。 geoipというフィールドが追加され、次のような情報が付与されます。 国名、緯度経度、タイムゾーンなどです。

{
  ...  
  "geoip": {
    "ip": "IPアドレス",
    "country_code2": "JP",
    "country_code3": "JPN",
    "country_name": "Japan",
    "continent_code": "AS",
    "latitude": 36,
    "longitude": 138,
    "timezone": "Asia/Tokyo",
    "location": [
      138,
      36
    ]
  }
  ...
}
5. リクエストのパスの第1階層の抽出(filter/grok)

リクエストされたURLはrequestフィールドにありますが、個別のURLだと、大まかな集計が大変です。 もちろん、クエリで処理することもできますが、Logstashで処理するついでに、第1階層のディレクトリ名を抽出しておくことで、 検索や集計を行いやすくしておきます。

grok {
  match => { "request" => "^/%{WORD:first_path}/%{GREEDYDATA}$" }
  tag_on_failure => ["_request_parse_failure"]
}

また、grokフィルタの登場です。 今回は、WORD:first_pathという記述方法で、WORDパターンにマッチした文字列をfirst_pathというフィールドに展開する指定をしています。

例えば、サイトのスクリプトなどがscriptsというディレクトリにある場合は、first_pathの値を利用して、 後続のフィルタでログデータを出力しないといった処理にも使えます。

6. ユーザエージェントのパース(filter/useragent)

Logstashではユーザエージェントの文字列から、いくつかの情報を付与するフィルタも用意されています。 useragentフィルタです。

useragent {
  source => "agent"
  target => "useragent"
}

agentというフィールドにユーザエージェントの文字列があるので、このフィールドに対してフィルタを適用します。 元の文字列も取っておきたいので、useragentという別のフィールドに出力するように指定してあります。

"useragent": {
  "name": "Firefox",
  "os": "Windows XP",
  "os_name": "Windows XP",
  "device": "Other",
  "major": "17",
  "minor": "0"
},

このように、OS名やバージョン名などが抽出できます。

7. Elasticsearchへの出力(output/elasticsearch)

最後は、Elasticsearchへのデータの出力設定です。

indexにて、出力するindex名を指定してあります。 また、年毎のインデックス名にするために%{year}を利用しています。 sprintf formatです。

elasticsearch {
  host => "localhost"
  index => "new_demo_access_log-%{year}"
  cluster => "demo_cluster"
  protocol => "http"
}

まとめ

ということで、今回はアクセスログをLogstashにて読み込む時の設定について説明してきました。 次回は、実際にLogstashを起動してElasticsearchにデータを登録するところまでを説明します。

JJUG CCCや勉強会のデモに用いたデータは、 Elasticsearchにデータを登録する前にテンプレートも設定してありました。こちらについても、次回説明しようと思います。

不明な点、誤植などありましたら、コメント欄へお願いします。


comments powered by Disqus

See Also by Hugo


Related by prelims-cli