HadoopのMapReduceのシャッフル

象本2版の6.4 シャッフルとソートを読んでMapReduceのシャッフルって面白いし興味深いなーと思い、ついでに軽くHadoop 0.20系のソースもあわせて読んでみたのでメモっておく。

シャッフルっていうとまずソートのイメージだよね。ていうか僕自身はそうだった。

ワードカウントの例でいうとこんな感じ。キーでソートしてるね。

まあソートもメインな処理なんだろうけど他にもいろいろやってます。

map関数の出力からreduce関数の入力までの一連の処理をすべてシャッフルと呼ぶと思うんだけど、以下のようなことをやってます。

まずmapタスク側の処理を見て行きましょう。HadoopのソースはMapTaskが該当します。

mapタスクでは出力が書き込まれる循環メモリバッファがあります。これがMapOutputBufferに相当します。

このバッファの大きさがデフォルト100Mで80%書き込まれるとバックグラウンドスレッド(SpillThread)がディスクにスピルファイルとして書き出します。

sortAndSpillというメソッドのなかでソート、combineして書き出しているようです。

スピルファイルはパーティション化され、マージされます。これでmapタスクが終了します。

次にreduceタスク側を見ていきましょう。HadoopのソースはReduceTaskが該当します。

まずコピーフェーズというのがあります。

これはmapの出力をコピーしてくることです。ReduceCopier#fetchOutputsメソッドが該当します。

MapOutputCopierというコピースレッドによって並列にmapの出力を取得します。

ちなみにmapとreduceが同じノードで動くとは限らないので場合によってはネットワーク越しのコピーになります。

その場合どうやっているかというとHTTP通信します。MapOutputServletというサーブレットからdoGetします。

HadoopはJettyというサーブレットコンテナを内蔵しており、TaskTracker起動時にサーブレットコンテナも起動します。

コピーフェーズが終わるとマージしてreduce関数に処理が移ります。

だいたいこんな感じのようです。


そういえばmap数が1000とかでreduce数が10とかのジョブの場合にmap処理が完了してもなかなかreduce処理が終わらなかった記憶がありますが、シャッフルのコピーフェーズで時間かかっていたのかもしれませんね。