@johtaniの日記 2nd

@johtani ‘s blog 2nd edition

Stream2esと複数データの登録

kopfの記事の続きも書く必要があるんだけど、こんなツイートを見つけてしまったので。。。

前に思いついたけど、放ったらかしにしてた疑問が再浮上してきたので、せっかくだから調べてみようかなと。

複数JSONデータがある場合にもっと楽にデータを入れる方法ないかなぁと思って、これかな?というのがあったのですが、 そのまま手を動かさずに放置してたので、一念発起してブログ書いてます。

Bulk APIって?

ElasticsearchはURLにアクセスしてデータを登録できます。 基本的には次のように1件毎の登録になります。

1
2
3
4
5
6
7
8
$ curl -XPUT http://localhost:9200/bookshop/books/1 -d
'
{
  "book_id" : 1,
  "title" : "ElasticSearch Server Japanese Edition",
  "price" : 3024,
  "publisher" : "KADOKAWA"
}'

これでもいいのですが、大量のデータを登録するときは、Elasticsearch側での効率が悪いです。 そこで、Elasticsearchは大量データを登録するためにBulk APIというものを用意しています。

これは、次のような形式のJSONを作ってデータを登録します。

1
2
3
4
{ "index" : { "_index" : "bookshop", "_type" : "books", "_id" : "1" } }
{ "book_id" : 1, "title" : "ElasticSearch Server Japanese Edition", "price" : 3024, "publisher" : "KADOKAWA"}
{ "index" : { "_index" : "bookshop", "_type" : "books", "_id" : "2" } }
{ "book_id" : 2, "title" : "Introduction of Apache Solr", "price" : 3888, "publisher" : "gihyo"}

これは、次のような構成になっています。

1
2
3
4
5
コマンド
データ
コマンド
データ
...

これで効率よくデータが登録できるのですが、このようなJSONデータを別途作って上げる必要が出てきます。 結局、複数のJSONがあるのに、特殊なJSONを生成しないといけないということでプログラム書いて実行することになります。 これだと、Elasticsearchへのアクセスをプログラムで書くのとあまり大差がないかもしれません。

stream2es

もっとお手軽に複数のJSONを登録できないかな?と目をつけていたのが、stream2esです。

どんなもの?

Clojureで作られた、Elasticsearchにデータを流し込むためのツールです。 Java 7がインストールされていれば、ダウンロードしてくれば動作せることができます。

インストール

公式ページに載っている方法そのままです。

1
$ curl -O download.elasticsearch.org/stream2es/stream2es; chmod +x stream2es

実行したディレクトリにコマンドがコピーされます。 あとは、コマンドを実行すればOKです。

実行

データは次のような形式でsample.jsonに保存してあるとします。

1
2
{ "book_id" : 1, "title" : "ElasticSearch Server Japanese Edition", "price" : 3024, "publisher" : "KADOKAWA"}
{ "book_id" : 2, "title" : "Introduction of Apache Solr", "price" : 3888, "publisher" : "gihyo"}

先ほどのBulk APIで利用したJSONよりも、スッキリしていますね。 1行1ドキュメント1JSONです。

あとは、次のコマンドを実行するだけです。

1
$ ./stream2es stdin --target http://localhost:9200/bookshop/books < sample.json

ファイルをstream2esに流し込んで、stream2esが1行ずつパースして、Elasticsearchに投げ込んでくれます。

登録されたデータは次のようになります。 IDは自動で付与されています。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
{
   "_index": "bookstore",
   "_type": "books",
   "_id": "0Hvy4IJCRkKrvGb4Dgam_w",
   "_version": 1,
   "found": true,
   "_source": {
      "book_id": 1,
      "title": "ElasticSearch Server Japanese Edition",
      "price": 3024,
      "publisher": "KADOKAWA"
   }
},
{
   "_index": "bookstore",
   "_type": "books",
   "_id": "b9M6TooFQzGYyJeix_t_WA",
   "_version": 1,
   "found": true,
   "_source": {
      "book_id": 2,
      "title": "Introduction of Apache Solr",
      "price": 3888,
      "publisher": "gihyo"
   }
}

せっかく、book_idがあるんだし、_idをインデックスの設定に指定します。

1
2
3
4
5
6
7
8
9
10
11
$ curl -XDELETE http://localhost:9200/bookshop
$ curl -XPUT http://localhost:9200/bookshop -d '
{
  "mappings": {
    "books" : {
      "_id" : {
        "path": "book_id"
      }
    }
  }
}'

あとは、登録すればbook_id_idに採用されます。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
{
   "_index": "bookshop",
   "_type": "books",
   "_id": "1",
   "_version": 1,
   "found": true,
   "_source": {
      "book_id": 1,
      "title": "ElasticSearch Server Japanese Edition",
      "price": 3024,
      "publisher": "KADOKAWA"
   }
},
{
   "_index": "bookshop",
   "_type": "books",
   "_id": "2",
   "_version": 1,
   "found": true,
   "_source": {
      "book_id": 2,
      "title": "Introduction of Apache Solr",
      "price": 3888,
      "publisher": "gihyo"
   }
}

複数ファイル

ディレクトリに複数のJSONファイルが有った場合は、次のようなコマンドでOK

1
$ cat sample_data/*.json |./stream2es stdin --target http://localhost:9200/bookshop/books

まぁ、catして流してるだけですが。。。

ダメだったケース

  • JSONが複数行になっているようなデータだとエラーが出てしまいました。
    jqコマンドで1行に整形したりできるかなぁ?)

  • また、1行に2つのJSONが書いてある場合は、1つ目のJSONをパースしたら、そこでおしまいみたいで、その後に記述されたデータは登録されませんでした。

インデックスがない場合

stream2esで登録するインデックスがElasticsearchに存在しない場合、stream2esがインデックスを作成してくれるのですが、 この時、シャード数などはstream2es内部に記述があるので注意が必要です。 以下がその設定です。

  • index.number_of_shards : 2
  • index.number_of_replicas : 0
  • index.refresh_interval : 5s

課題?

内部的にはおそらく、Bulkでデータを登録していると思うのですが、まだよくわかっていません。 Clojureが読めないので、せっかくだから、Clojureの勉強も兼ねてちょっとソースを読んでみようかなと思います。 それほど量があるわけでもないので。

あとは、その他にWikipediaのデータやTwitterのデータ登録、 ElasticsearchからデータをScrollで読み出しつつ、別のElasticsearchに流しこむといったこともできそうなので、そちらも試してみようかと。 他にもオプションがいくつかありそうです。

今回は2件ほどでしたが、大量データを流し込んだ時にどうなるか(stream2esが悲鳴を上げるのか、Elasticsearchで詰まることがあったらどうなるか)なども 気になるので、なんか適当なデータで試してみるのもいいかなぁと。 (ということで、だれか、いろいろ試してみてもらえると楽できるなぁ。)

Comments