Hadoopを使ったログ解析システムにおける時間別のジョブ、パーティションについてつらつらと書いてみる。

ログを1時間毎にためてそれをHadoopで処理するなんていうのはよくあるパターンではないかと思う。例えば時間別のPV, UUを求めたいとかね。

その場合20130806-1800, 20130806-1900みたいなHiveパーティションを用意するのではないだろうか。20130806-1800パーティションには2013/08/06の18:00-18:59:59までのログがあると思ってくだされ。

ただ大抵はログを出力しているマシンとログを収集してHDFSにputするマシンとは別なので転送に時間がかかる。
またログの中身のタイムスタンプを見ないでHDFSにputする場合はHiveパーティションの時間とログの時間が必ずしも一致するとは限らない。

例えばログファイルの中身に18:59:59のレコードがあったとしてもそれをHDFSにputする頃には19時台になっているため20130806-1900のパーティションに18:59:59ののレコードがある可能性がある。

まあそれはともかくとして20130806-1900のパーティションのデータをinputとしてhourlyのジョブが20:10分に実行されるとしよう。毎時10分に1時間前のログを処理するジョブがあるというイメージである。

理想的なケースではこれはうまくいくが世の中そう単純ではない。

例えばジョブが非常に混み合っていて本来なら20:10に実行されるべきジョブが21:10になっても始まらないというようなケースがある。二重起動しないようにしたほうが良いというのもあるが、このようなケースではジョブがどんどん遅れることになる。まあそうでなくても一時的にジョブをkillすることはありうるので、必ず毎時10分に1時間前のログを処理できると想定することはあまり現実的ではない。

なのでそのようなケースでも対応できるようにしておくのがベターである。

どう対応すべきかは完全にケースバイケースだがHDFS上のinputを受け取ってMapReduceでHFileつくってHBaseにbulk loadするようなケースならinputを複数持つことも可能だろう。FileInputFormat#setInputPathsは複数のpathを持てるのでこれで対応する。どこまで処理をしたかのチェックポイントをどこかに持っておくのも重要である。チェックポイントが20130806-1800で21:10にジョブが走るなら20130806-1900と20130806-2000がinputとなるようなイメージである。

これは複数のinputに対してもoutputが1つだから出来る技で、そうでないケースもある。

例えば20130806-1900のパーティションのデータを集計してMySQLに20130806-1900をキーにinsertするようなケースである。この場合はinputが20130806-1900と20130806-2000なら1回のジョブで2回集計処理が走るようにしておくと良いと思う。

要はジョブの実行が遅れても遅れを取り戻せるような仕組みにしておくと良いという話である。

いくぶん抽象的な話を書いたのでちょっと伝わりづらいと思うが、なんとなく書いてみたでござる。