目次
river-wikipediaの前々回の記事で書きましたが、bulk_sizeに関連して登録件数がやけにきりが良いのが気になると書いていました。
で、Riverの仕組みを勉强がてら、elasticsearch-river-wikipediaのソース(1.2.0)を読んでみました。
Riverの作り
Riverはorg.elasticsearch.river.Riverというinterfaceを実装することで作らています。 ただ、Riverがinterfaceとなっていますが、o.e.river.AbstractRiverComponentというクラスを継承して作られています。
AbstractRiverComponentにはRiverの名前や設定などが用意されています。 ま、ここはそれほど重要じゃないので、軽く流してと。
Riverの設定関連は実装したRiverクラス(ここでは、WikipediaRiverクラス)のコンストラクタで、設定値の読み取りなどの記述を記載します。
このコンストラクタが、_river/hogehoge/_meta
をPUTした時のJSONを元にElasticSearchから呼ばれて、Riverのインスタンスが作成されます。(たぶん、このへんがその処理だと思う。。。このあたりはまた今度)
実際のRiverの処理はWikipediaRiverクラスのstart()メソッド内部に記述されています。
@Override
public void start() {
logger.info("starting wikipedia stream");
try {
① client.admin().indices().prepareCreate(indexName).execute().actionGet();
} catch (Exception e) {
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
// that's fine
} else if (ExceptionsHelper.unwrapCause(e) instanceof ClusterBlockException) {
// ok, not recovered yet..., lets start indexing and hope we recover by the first bulk
// TODO: a smarter logic can be to register for cluster event listener here, and only start sampling when the block is removed...
} else {
logger.warn("failed to create index [{}], disabling river...", e, indexName);
return;
}
}
② currentRequest = client.prepareBulk();
③ WikiXMLParser parser = WikiXMLParserFactory.getSAXParser(url);
try {
④ parser.setPageCallback(new PageCallback());
} catch (Exception e) {
logger.error("failed to create parser", e);
return;
}
⑤ thread = EsExecutors.daemonThreadFactory(settings.globalSettings(), "wikipedia_slurper").newThread(new Parser(parser));
thread.start();
}
内部では
- インデックスの作成
- バルクアップデート用クライアントの設定
- WikiXMLのパーサの初期化
- ページごとにキックされるコールバック処理の登録
- デーモンスレッドの起動と起動
といった処理の流れになっています。
で、このスレッドの起動後は、4.で用意したparser.parse()処理がグルグル回ります。
1ページがパースされるたびに、WikipediaRiver.PageCallback
クラスのproess()
メソッドが呼ばれます。
このメソッドの最後で、processBulkIfNeeded()
メソッドが呼ばれています。ここで、実際にパースしたページをインデックスに登録する処理が実行されます。
このメソッドの1行目が鍵でした。 bulkSize以上の件数がバルクのリクエストに貯まった時だけ、実際にインデックスに登録する処理が実行されます。 このため、スレッドが回っている間は、bulkSize以上のデータが貯まらないと、インデックスへの登録は行われないわけです。
次に、このスレッドを止めるには、前々回書いたように、_riverにPUTした、Riverの設定をDELETEするしかありません。(あとは、ElasticSearchを停止するとかでしょうか。)
で、DELETEが実行される呼ばれるのが、WikipediaRiver
クラスのclose()
メソッドです。
@Override
public void close() {
logger.info("closing wikipedia river");
closed = true;
if (thread != null) {
thread.interrupt();
}
}
見ていただくと分かりますが、スレッド止めて終了です。
問題点は?
ということで、
- WikipediaのXMLを読み込んでもRiverは停止しない
- Riverの停止を行ってもスレッドが止められるだけ。
- bulkSize以下の件数が
currentRequest
に残っているけど、破棄される
とまぁ、こんな流れになっているので、最後の端数のドキュメントがインデックスに登録されないようです。 (まだ、ちゃんと確認してないんですが、備忘録のため先に書いちゃいました。。。)
じゃあ、全部うまく登録するにはどうしたもんかなぁと。 いまのところ思いついたのはこんな感じです。 他にいい案があったら教えて下さい。
- 案1:close()処理の中で、スレッド停止後に、
currentRequest
に貯まっているデータをインデックスに登録しちゃう - 案2:bulkSize以外に、定期的(指定された時間)で登録処理を実行してしまう。
簡単なのでとりあえず、案1を実装してみるかなぁと。 (さっさとコード書けよって話ですね。。。スミマセン) その前にMLで質問ですかねぇ、英語で。
WikipediaのRiverをざっと眺めてみた感じですが、わかりやすい作りだなぁと。 他のRiverがどうなってるかをちゃんと見てませんが、他にもbulkSize指定をするRiverの場合は、このように件数がbulkSizeに満たない状態ではデータが登録されないといったことがあるかもしれません。
ElasticSearchのソースを読み始める取っ掛かりとしては面白いかと思いますので、興味ある方は読んで作ってみるといいかもしれません。(私は読んだだけですがw)
追記(2013/09/13 21:00)
MLで質問してみました。とりあえず、案1を。
river-wikipedia does not index all pages
他のRiverでは対応してるしバグだね、Issue上げてとのことで、あげときました。 ついでにプルリクも出せばいいんでしょうが、プルリクまだやったことないヘタレです。。。
あと、案2についても同じトピックで質問してます。
どうやら、BulkProcessorにその機能があるよと。
flushinterval
というプロパティがありそうです。どうやって設定して、どうやって動くのかとか見てないので、
調査してブログorLTかな。
bulk udpにはその値を設定できそうなのがあるんだよなぁ。
追記その2(2013/09/16 23:50)
さっそく修正版がコミット(コミットログ)されてました。
結構変わってます。BulkProcessorにflush_interval
の設定をすれば、よしなにやってくれる仕組みがすでに実装されているようです。
bulkSize
についても同様に、BulkProcessorに設定すれば良いようです。
Riverの仕組みが結構スッキリしています。
もともと実装されていた、bulkSizeごとの処理も消されています。
確かに、BulkProcessorの仕組みとして実装されている方がしっくりきますね。
ということで、考える暇もなくコミットされてしまいました。 こうやって質問しつつ、少しずつソースを読んでいこうかなと思ってるとこです。
comments powered by Disqus
See Also by Hugo
- ElasticSearchにプラグインで日本語Wikipediaデータを入れてみました
- OData式と日本語の検索(NGram)とフレーズ検索
- 辞書の更新についての注意点
- 日本語Wikipediaをインデクシング(Kuromojiバージョン)
- ElasticsearchのアーキテクチャとStateless / Serverless