@johtaniの日記 2nd

@johtani ‘s blog 2nd edition

Elasticsearch-river-wikipediaの疑問点

river-wikipediaの前々回の記事で書きましたが、bulk_sizeに関連して登録件数がやけにきりが良いのが気になると書いていました。

で、Riverの仕組みを勉强がてら、elasticsearch-river-wikipediaのソース(1.2.0)を読んでみました。

Riverの作り

Riverはorg.elasticsearch.river.Riverというinterfaceを実装することで作らています。 ただ、Riverがinterfaceとなっていますが、o.e.river.AbstractRiverComponentというクラスを継承して作られています。

AbstractRiverComponentにはRiverの名前や設定などが用意されています。 ま、ここはそれほど重要じゃないので、軽く流してと。

Riverの設定関連は実装したRiverクラス(ここでは、WikipediaRiverクラス)のコンストラクタで、設定値の読み取りなどの記述を記載します。 このコンストラクタが、_river/hogehoge/_metaをPUTした時のJSONを元にElasticSearchから呼ばれて、Riverのインスタンスが作成されます。(たぶん、このへんがその処理だと思う。。。このあたりはまた今度)

実際のRiverの処理はWikipediaRiverクラスのstart()メソッド内部に記述されています。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public void start() {
    logger.info("starting wikipedia stream");
    try {
            client.admin().indices().prepareCreate(indexName).execute().actionGet();
    } catch (Exception e) {
        if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
            // that's fine
        } else if (ExceptionsHelper.unwrapCause(e) instanceof ClusterBlockException) {
            // ok, not recovered yet..., lets start indexing and hope we recover by the first bulk
            // TODO: a smarter logic can be to register for cluster event listener here, and only start sampling when the block is removed...
        } else {
            logger.warn("failed to create index [{}], disabling river...", e, indexName);
            return;
        }
    }
        currentRequest = client.prepareBulk();
        WikiXMLParser parser = WikiXMLParserFactory.getSAXParser(url);
    try {
            parser.setPageCallback(new PageCallback());
    } catch (Exception e) {
        logger.error("failed to create parser", e);
        return;
    }
        thread = EsExecutors.daemonThreadFactory(settings.globalSettings(), "wikipedia_slurper").newThread(new Parser(parser));
    thread.start();
}

内部では

  1. インデックスの作成
  2. バルクアップデート用クライアントの設定
  3. WikiXMLのパーサの初期化
  4. ページごとにキックされるコールバック処理の登録
  5. デーモンスレッドの起動と起動

といった処理の流れになっています。

で、このスレッドの起動後は、4.で用意したparser.parse()処理がグルグル回ります。

1ページがパースされるたびに、WikipediaRiver.PageCallbackクラスのproess()メソッドが呼ばれます。 このメソッドの最後で、processBulkIfNeeded()メソッドが呼ばれています。ここで、実際にパースしたページをインデックスに登録する処理が実行されます。

このメソッドの1行目が鍵でした。 bulkSize以上の件数がバルクのリクエストに貯まった時だけ、実際にインデックスに登録する処理が実行されます。 このため、スレッドが回っている間は、bulkSize以上のデータが貯まらないと、インデックスへの登録は行われないわけです。

次に、このスレッドを止めるには、前々回書いたように、_riverにPUTした、Riverの設定をDELETEするしかありません。(あとは、ElasticSearchを停止するとかでしょうか。)

で、DELETEが実行される呼ばれるのが、WikipediaRiverクラスのclose()メソッドです。

1
2
3
4
5
6
7
8
    @Override
    public void close() {
        logger.info("closing wikipedia river");
        closed = true;
        if (thread != null) {
            thread.interrupt();
        }
    }

見ていただくと分かりますが、スレッド止めて終了です。

問題点は?

ということで、

  • WikipediaのXMLを読み込んでもRiverは停止しない
  • Riverの停止を行ってもスレッドが止められるだけ。
  • bulkSize以下の件数がcurrentRequestに残っているけど、破棄される

とまぁ、こんな流れになっているので、最後の端数のドキュメントがインデックスに登録されないようです。 (まだ、ちゃんと確認してないんですが、備忘録のため先に書いちゃいました。。。)

じゃあ、全部うまく登録するにはどうしたもんかなぁと。 いまのところ思いついたのはこんな感じです。 他にいい案があったら教えて下さい。

  • 案1:close()処理の中で、スレッド停止後に、currentRequestに貯まっているデータをインデックスに登録しちゃう
  • 案2:bulkSize以外に、定期的(指定された時間)で登録処理を実行してしまう。

簡単なのでとりあえず、案1を実装してみるかなぁと。 (さっさとコード書けよって話ですね。。。スミマセン) その前にMLで質問ですかねぇ、英語で。

WikipediaのRiverをざっと眺めてみた感じですが、わかりやすい作りだなぁと。 他のRiverがどうなってるかをちゃんと見てませんが、他にもbulkSize指定をするRiverの場合は、このように件数がbulkSizeに満たない状態ではデータが登録されないといったことがあるかもしれません。

ElasticSearchのソースを読み始める取っ掛かりとしては面白いかと思いますので、興味ある方は読んで作ってみるといいかもしれません。(私は読んだだけですがw)

追記(2013/09/13 21:00)

MLで質問してみました。とりあえず、案1を。

river-wikipedia does not index all pages

他のRiverでは対応してるしバグだね、Issue上げてとのことで、あげときました。 ついでにプルリクも出せばいいんでしょうが、プルリクまだやったことないヘタレです。。。

あと、案2についても同じトピックで質問してます。 どうやら、BulkProcessorにその機能があるよと。 flushintervalというプロパティがありそうです。どうやって設定して、どうやって動くのかとか見てないので、 調査してブログorLTかな。

bulk udpにはその値を設定できそうなのがあるんだよなぁ。

追記その2(2013/09/16 23:50)

さっそく修正版がコミット(コミットログ)されてました。 結構変わってます。BulkProcessorにflush_intervalの設定をすれば、よしなにやってくれる仕組みがすでに実装されているようです。 bulkSizeについても同様に、BulkProcessorに設定すれば良いようです。 Riverの仕組みが結構スッキリしています。 もともと実装されていた、bulkSizeごとの処理も消されています。 確かに、BulkProcessorの仕組みとして実装されている方がしっくりきますね。

ということで、考える暇もなくコミットされてしまいました。 こうやって質問しつつ、少しずつソースを読んでいこうかなと思ってるとこです。

Comments