ShuffleがうまくいかなくてReduceが遅くなる現象

たまに起こる現象でちょっと困っているのがMapReduceのジョブを実行していてShuffleがうまくいかなくてReduceが遅くなる現象というのがある。

遅くなっているMapReduceのジョブのログをみるとこんな感じになっていてTaskTrackerからのcopyに失敗しているように見える。

2013-06-24 21:35:04,289 WARN org.apache.hadoop.mapred.ReduceTask: attempt_… copy failed: attempt_… from TaskTrackerのホスト名
2013-06-24 21:35:04,289 WARN org.apache.hadoop.mapred.ReduceTask: java.net.SocketTimeoutException: Read timed out
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.read(SocketInputStream.java:129)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:256)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:317)
        at sun.net.www.http.ChunkedInputStream.fastRead(ChunkedInputStream.java:221)
        at sun.net.www.http.ChunkedInputStream.read(ChunkedInputStream.java:662)
        at java.io.FilterInputStream.read(FilterInputStream.java:116)
        at sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:2668)
        at org.apache.hadoop.mapred.IFileInputStream.doRead(IFileInputStream.java:149)
        at org.apache.hadoop.mapred.IFileInputStream.read(IFileInputStream.java:101)
        at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.shuffleInMemory(ReduceTask.java:1699)
        at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.getMapOutput(ReduceTask.java:1545)
        at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.copyOutput(ReduceTask.java:1394)
        at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$MapOutputCopier.run(ReduceTask.java:1326)

で、該当のTaskTrackerのログをみるとこんな感じになっている。

2013-06-24 23:59:09,669 WARN org.mortbay.log: Committed before 410 getMapOutput(attempt_...) failed :
java.io.IOException: Broken pipe
        at sun.nio.ch.EPollArrayWrapper.interrupt(Native Method)
        at sun.nio.ch.EPollArrayWrapper.interrupt(EPollArrayWrapper.java:256)
        at sun.nio.ch.EPollSelectorImpl.wakeup(EPollSelectorImpl.java:175)
        at org.mortbay.io.nio.SelectorManager$SelectSet.wakeup(SelectorManager.java:831)
        at org.mortbay.io.nio.SelectChannelEndPoint.updateKey(SelectChannelEndPoint.java:335)
        at org.mortbay.io.nio.SelectChannelEndPoint.blockWritable(SelectChannelEndPoint.java:278)
        at org.mortbay.jetty.AbstractGenerator$Output.blockForOutput(AbstractGenerator.java:545)
        at org.mortbay.jetty.AbstractGenerator$Output.flush(AbstractGenerator.java:572)
        at org.mortbay.jetty.HttpConnection$Output.flush(HttpConnection.java:1012)
        at org.apache.hadoop.mapred.TaskTracker$MapOutputServlet.doGet(TaskTracker.java:3917)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
        at org.mortbay.jetty.servlet.ServletHolder.handle(ServletHolder.java:511)
        at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1221)
        at org.apache.hadoop.http.HttpServer$QuotingInputFilter.doFilter(HttpServer.java:835)
        at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
        at org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:399)
        at org.mortbay.jetty.security.SecurityHandler.handle(SecurityHandler.java:216)
        at org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:182)
        at org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:766)
        at org.mortbay.jetty.webapp.WebAppContext.handle(WebAppContext.java:450)
        at org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:230)
        at org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)
        at org.mortbay.jetty.Server.handle(Server.java:326)
        at org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:542)
        at org.mortbay.jetty.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:928)
        at org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:549)
        at org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:212)
        at org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:404)
        at org.mortbay.io.nio.SelectChannelEndPoint.run(SelectChannelEndPoint.java:410)
        at org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)
...
2013-06-24 23:59:09,669 ERROR org.mortbay.log: /mapOutput
java.lang.IllegalStateException: Committed
        at org.mortbay.jetty.Response.resetBuffer(Response.java:1023)
        at org.mortbay.jetty.Response.sendError(Response.java:240)
        at org.apache.hadoop.mapred.TaskTracker$MapOutputServlet.doGet(TaskTracker.java:3945)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
        at org.mortbay.jetty.servlet.ServletHolder.handle(ServletHolder.java:511)
        at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1221)
        at org.apache.hadoop.http.HttpServer$QuotingInputFilter.doFilter(HttpServer.java:835)
        at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
        at org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:399)
        at org.mortbay.jetty.security.SecurityHandler.handle(SecurityHandler.java:216)
        at org.mortbay.jetty.servlet.SessionHandler.handle(SessionHandler.java:182)
        at org.mortbay.jetty.handler.ContextHandler.handle(ContextHandler.java:766)
        at org.mortbay.jetty.webapp.WebAppContext.handle(WebAppContext.java:450)
        at org.mortbay.jetty.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:230)
        at org.mortbay.jetty.handler.HandlerWrapper.handle(HandlerWrapper.java:152)
        at org.mortbay.jetty.Server.handle(Server.java:326)
        at org.mortbay.jetty.HttpConnection.handleRequest(HttpConnection.java:542)
        at org.mortbay.jetty.HttpConnection$RequestHandler.headerComplete(HttpConnection.java:928)
        at org.mortbay.jetty.HttpParser.parseNext(HttpParser.java:549)
        at org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:212)
        at org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:404)
        at org.mortbay.io.nio.SelectChannelEndPoint.run(SelectChannelEndPoint.java:410)
        at org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)

map数、reduce数が多い(約数千のオーダー)ジョブが実行されている場合にこうなることが多いのだが詳細は不明。
対処策としては該当のTaskTrackerを再起動すれば一応解決する。

shuffleのメトリクスを取ってみてみようと思ってhttp://localhost:50060/jmxHadoop:service=TaskTracker,name=ShuffleServerMetricsから取れるメトリクスをモニタリングしてみると本現象が起こっているときはshuffle_exceptions_caughtが上昇している。

#ただshuffle_exceptions_caughtが上昇してなくてもreduceが遅くなっていることもある。。。

またGraylisted Nodesにも登録されているという状況だった。

この問題の困ったところは1台のTaskTrackerのせいで動いている全てのジョブが影響を受けてしまうところ。

JobTrackerのメトリクス(Hadoop:service=JobTracker,name=JobTrackerInfoのGraylistedNodesInfoJsonおよびBlacklistedNodesInfoJson)を見てGraylisted NodesもしくはBlacklisted Nodesに登録されていたらTaskTrackerを再起動するようにcronを仕込んで今は様子を見ているところ。