An error I ran into while doing append writes with Fluentd + WebHDFS

I am writing data to HDFS on Hadoop 1.2.1 in append mode via fluent-plugin-webhdfs, and recently errors have started to appear, which is causing trouble. I cannot reproduce them on demand.

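The fluentd log looks like the following. In short, from some moment on, writes to HDFS stop succeeding and the buffer eventually overflows. I don't think fluentd itself is misbehaving; rather, HDFS is in a bad state, the output gets stuck, and the buffer overflows as a result.
The file that can no longer be written to sometimes becomes CORRUPT and sometimes does not. Even when it does not become CORRUPT, MapReduce jobs that read it fail with a checksum error, as described later.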

2014-02-04 18:18:46 +0900 [warn]: failed to communicate hdfs cluster, path: ...
2014-02-04 18:18:46 +0900 [warn]: temporarily failed to flush the buffer. next_retry=2014-02-04 18:18:34 +0900 error_class="WebHDFS::IOError" error="{\"RemoteException\":{\"exception\":\"IOException\",\"javaClassName\":\"java.io.IOException\",\"message\":\"Error Recovery for block blk_2668047020098952858_1087552928 failed  because recovery from primary datanode … failed 6 times.  Pipeline was .... Aborting...\"}}" instance=188840060
  2014-02-04 18:18:46 +0900 [warn]: /usr/local/ruby-1.9.3-p484/lib/ruby/gems/1.9.1/gems/webhdfs-0.5.5/lib/webhdfs/client_v1.rb:317:in `request'
  2014-02-04 18:18:46 +0900 [warn]: /usr/local/ruby-1.9.3-p484/lib/ruby/gems/1.9.1/gems/webhdfs-0.5.5/lib/webhdfs/client_v1.rb:242:in `operate_requests'
  2014-02-04 18:18:46 +0900 [warn]: /usr/local/ruby-1.9.3-p484/lib/ruby/gems/1.9.1/gems/webhdfs-0.5.5/lib/webhdfs/client_v1.rb:56:in `append'
  2014-02-04 18:18:46 +0900 [warn]: /usr/local/ruby-1.9.3-p484/lib/ruby/gems/1.9.1/gems/fluent-plugin-webhdfs-0.2.1/lib/fluent/plugin/out_webhdfs.rb:182:in `send_data'
  2014-02-04 18:18:46 +0900 [warn]: /usr/local/ruby-1.9.3-p484/lib/ruby/gems/1.9.1/gems/fluent-plugin-webhdfs-0.2.1/lib/fluent/plugin/out_webhdfs.rb:200:in `write'
  2014-02-04 18:18:46 +0900 [warn]: /usr/local/ruby-1.9.3-p484/lib/ruby/gems/1.9.1/gems/fluentd-0.10.41/lib/fluent/buffer.rb:292:in `write_chunk'
  2014-02-04 18:18:46 +0900 [warn]: /usr/local/ruby-1.9.3-p484/lib/ruby/gems/1.9.1/gems/fluentd-0.10.41/lib/fluent/buffer.rb:272:in `pop'
  2014-02-04 18:18:46 +0900 [warn]: /usr/local/ruby-1.9.3-p484/lib/ruby/gems/1.9.1/gems/fluentd-0.10.41/lib/fluent/output.rb:305:in `try_flush'
  2014-02-04 18:18:46 +0900 [warn]: /usr/local/ruby-1.9.3-p484/lib/ruby/gems/1.9.1/gems/fluentd-0.10.41/lib/fluent/output.rb:131:in `run'
2014-02-04 18:18:46 +0900 [warn]: failed to communicate hdfs cluster, path: ...
2014-02-04 18:18:46 +0900 [warn]: temporarily failed to flush the buffer. next_retry=2014-02-04 18:18:36 +0900 error_class="WebHDFS::IOError" error="{\"RemoteException\":{\"exception\":\"AlreadyBeingCreatedException\",\"javaClassName\":\"org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException\",\"message\":\"org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException: failed to create file … for DFSClient_NONMAPREDUCE_-267011383_6211186 on client …, because this file is already being created by DFSClient_NONMAPREDUCE_-1261157395_6211186 on ...\"}}" instance=188840060
  2014-02-04 18:18:46 +0900 [warn]: /usr/local/ruby-1.9.3-p484/lib/ruby/gems/1.9.1/gems/webhdfs-0.5.5/lib/webhdfs/client_v1.rb:317:in `request'
  2014-02-04 18:18:46 +0900 [warn]: /usr/local/ruby-1.9.3-p484/lib/ruby/gems/1.9.1/gems/webhdfs-0.5.5/lib/webhdfs/client_v1.rb:242:in `operate_requests'
  2014-02-04 18:18:46 +0900 [warn]: /usr/local/ruby-1.9.3-p484/lib/ruby/gems/1.9.1/gems/webhdfs-0.5.5/lib/webhdfs/client_v1.rb:56:in `append'
  2014-02-04 18:18:46 +0900 [warn]: /usr/local/ruby-1.9.3-p484/lib/ruby/gems/1.9.1/gems/fluent-plugin-webhdfs-0.2.1/lib/fluent/plugin/out_webhdfs.rb:182:in `send_data'
  2014-02-04 18:18:46 +0900 [warn]: /usr/local/ruby-1.9.3-p484/lib/ruby/gems/1.9.1/gems/fluent-plugin-webhdfs-0.2.1/lib/fluent/plugin/out_webhdfs.rb:200:in `write'
  2014-02-04 18:18:46 +0900 [warn]: /usr/local/ruby-1.9.3-p484/lib/ruby/gems/1.9.1/gems/fluentd-0.10.41/lib/fluent/buffer.rb:292:in `write_chunk'
  2014-02-04 18:18:46 +0900 [warn]: /usr/local/ruby-1.9.3-p484/lib/ruby/gems/1.9.1/gems/fluentd-0.10.41/lib/fluent/buffer.rb:272:in `pop'
  2014-02-04 18:18:46 +0900 [warn]: /usr/local/ruby-1.9.3-p484/lib/ruby/gems/1.9.1/gems/fluentd-0.10.41/lib/fluent/output.rb:305:in `try_flush'
  2014-02-04 18:18:46 +0900 [warn]: /usr/local/ruby-1.9.3-p484/lib/ruby/gems/1.9.1/gems/fluentd-0.10.41/lib/fluent/output.rb:131:in `run'
  
  ...
  
  2014-02-04 19:41:37 +0900 [warn]: emit transaction failed  error_class=Fluent::BufferQueueLimitError error=#<Fluent::BufferQueueLimitError: queue size exceeds limit>
  
  ...

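The fluentd configuration looks like this. I run multiple processes on one machine, and the <<PORT>> placeholder is replaced with each process's port number. No two processes write to the same file.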

  <store>
    type webhdfs
    host ...
    port ...
    username ...
    path .../${hostname}_<<PORT>>.log

    field_separator TAB
    output_include_time false
    output_include_tag false
    output_data_type ...

    flush_interval 1
    buffer_chunk_limit 128M
    buffer_queue_limit 32
  </store>

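I tried increasing open_timeout and read_timeout, but it made no noticeable difference. Setting append no might help, but it would create a large number of files, so I'm reluctant. flush_interval is set to 1; raising it to something like 10 makes the load shoot up.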
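
As a rough check on how much data the settings above can hold before `Fluent::BufferQueueLimitError` appears, the arithmetic can be sketched in Python. The constants are copied from the `<store>` section and the timestamps from the log excerpt; the function and variable names are made up for illustration. The rate at the end is only a rough estimate: it assumes every queued chunk was full, ignores the chunk still being filled, and assumes the error shown is the first one of its kind.

```python
from datetime import datetime

# Values copied from the <store> section; fluentd reads the "M" suffix as MiB.
BUFFER_CHUNK_LIMIT = 128 * 1024 * 1024  # buffer_chunk_limit 128M
BUFFER_QUEUE_LIMIT = 32                 # buffer_queue_limit 32


def max_queued_bytes(chunk_limit: int, queue_limit: int) -> int:
    """Upper bound on the bytes the queue can hold (hypothetical helper)."""
    return chunk_limit * queue_limit


# Timestamps copied from the fluentd log excerpt.
first_failure = datetime(2014, 2, 4, 18, 18, 46)
queue_limit_error = datetime(2014, 2, 4, 19, 41, 37)
elapsed_seconds = int((queue_limit_error - first_failure).total_seconds())

capacity = max_queued_bytes(BUFFER_CHUNK_LIMIT, BUFFER_QUEUE_LIMIT)

# Rough estimate only: assumes all 32 queued chunks were completely full.
rough_rate_bytes_per_sec = capacity / elapsed_seconds

print(f"queue capacity : {capacity / 1024 ** 3:.1f} GiB per process")
print(f"time to fill   : {elapsed_seconds} s")
print(f"rough avg rate : {rough_rate_bytes_per_sec / 1024 ** 2:.2f} MiB/s")
```

So each process can absorb roughly 4 GiB of stalled output, which took a bit under an hour and a half to fill in this incident. Raising buffer_queue_limit would only extend that window, not fix the stuck append.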

hdfs-site.xml looks like this:

        <property>
                <name>dfs.webhdfs.enabled</name>
                <value>true</value>
        </property>
        <property>
                <name>dfs.support.append</name>
                <value>true</value>
        </property>
        <property>
                <name>dfs.support.broken.append</name>
                <value>true</value>
        </property>

The NameNode log at the time of the error looks like this.

2014-02-04 18:18:46,148 ERROR org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:... cause:org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException: failed to create file … for DFSClient_NONMAPREDUCE_-267011383_6211186 on client …, because this file is already being created by DFSClient_NONMAPREDUCE_-1261157395_6211186 on ...
2014-02-04 18:18:46,148 INFO org.apache.hadoop.ipc.Server: IPC Server handler 83 on 54310, call append(…, DFSClient_NONMAPREDUCE_-267011383_6211186) from …: error: org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException: failed to create file … for DFSClient_NONMAPREDUCE_-267011383_6211186 on client ..., because this file is already being created by DFSClient_NONMAPREDUCE_-1261157395_6211186 on …
org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException: failed to create file … for DFSClient_NONMAPREDUCE_-267011383_6211186 on client …, because this file is already being created by DFSClient_NONMAPREDUCE_-1261157395_6211186 on ...
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:1764)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:1586)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:1784)
        at org.apache.hadoop.hdfs.server.namenode.NameNode.append(NameNode.java:725)
        at sun.reflect.GeneratedMethodAccessor41.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:587)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1432)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1428)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1426)

The DataNode log looks like this. The checksums appear to be inconsistent.

java.io.IOException: Block blk_2668047020098952858_1087552928 is of size 125057123 but has 244253 checksums and each checksum size is 4 bytes.
        at org.apache.hadoop.hdfs.server.datanode.FSDataset.validateBlockMetadata(FSDataset.java:1928)
        at org.apache.hadoop.hdfs.server.datanode.FSDataset.startBlockRecovery(FSDataset.java:2193)
        at org.apache.hadoop.hdfs.server.datanode.DataNode.startBlockRecovery(DataNode.java:1844)
        at org.apache.hadoop.hdfs.server.datanode.DataNode.recoverBlock(DataNode.java:2033)
        at org.apache.hadoop.hdfs.server.datanode.DataNode.recoverBlock(DataNode.java:2175)
        at sun.reflect.GeneratedMethodAccessor70.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:587)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1432)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1428)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1426)
2014-02-04 18:18:44,088 ERROR org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:blk_2668047020098952858_1087552928 cause:java.io.IOException: All datanodes failed: block=blk_2668047020098952858_1087552928, datanodeids=[...]
2014-02-04 18:18:44,088 INFO org.apache.hadoop.ipc.Server: IPC Server handler 42 on 50020, call recoverBlock(blk_2668047020098952858_1087552928, true, [Lorg.apache.hadoop.hdfs.protocol.DatanodeInfo;@58a8ed9f) from …: error: java.io.IOException: All datanodes failed: block=blk_2668047020098952858_1087552928, datanodeids=[…]
java.io.IOException: All datanodes failed: block=blk_2668047020098952858_1087552928, datanodeids=[...]
        at org.apache.hadoop.hdfs.server.datanode.DataNode.recoverBlock(DataNode.java:2089)
        at org.apache.hadoop.hdfs.server.datanode.DataNode.recoverBlock(DataNode.java:2175)
        at sun.reflect.GeneratedMethodAccessor70.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:587)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1432)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1428)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1426)
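
To make the numbers in the first DataNode exception easier to reason about, here is a small Python sketch of the size-versus-checksum arithmetic. It assumes 512 bytes of data per checksum (the default io.bytes.per.checksum in Hadoop 1.x; whether this cluster overrides it is not known), and the helper names are hypothetical. It does not reproduce the DataNode's own validation logic, only the arithmetic behind it.

```python
# Assumption: 512 data bytes per checksum (Hadoop 1.x default); not confirmed
# for the cluster in this post.
BYTES_PER_CHECKSUM = 512


def expected_checksums(block_size: int, bytes_per_checksum: int = BYTES_PER_CHECKSUM) -> int:
    """Number of checksums needed to cover block_size bytes (ceiling division)."""
    return (block_size + bytes_per_checksum - 1) // bytes_per_checksum


def covered_range(num_checksums: int, bytes_per_checksum: int = BYTES_PER_CHECKSUM) -> tuple:
    """Smallest and largest block size (inclusive) that num_checksums can cover."""
    return ((num_checksums - 1) * bytes_per_checksum + 1,
            num_checksums * bytes_per_checksum)


# Numbers copied from the DataNode exception message.
block_size = 125057123
reported_checksums = 244253

low, high = covered_range(reported_checksums)
print("checksums needed for this size:", expected_checksums(block_size))
print("sizes covered by reported count:", low, "to", high)
print("size within covered range:", low <= block_size <= high)
```

Under the 512-byte assumption, the logged block size and checksum count are consistent with each other. That suggests either the count printed in the message is not the raw number of checksums stored in the block's meta file, or the bytes-per-checksum value differs on this cluster; the sketch cannot tell which.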

Running a MapReduce job that takes the unwritable file as input results in a checksum error, as shown below.

org.apache.hadoop.fs.ChecksumException: Checksum error: /blk_2668047020098952858:of:...
	at org.apache.hadoop.fs.FSInputChecker.verifySum(FSInputChecker.java:277)
	at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:241)
	at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:189)
	at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:158)
	at org.apache.hadoop.hdfs.DFSClient$RemoteBlockReader.read(DFSClient.java:1592)
	at org.apache.hadoop.hdfs.DFSClient$DFSInputStream.readBuffer(DFSClient.java:2366)
	at org.apache.hadoop.hdfs.DFSClient$DFSInputStream.read(DFSClient.java:2418)
	at java.io.DataInputStream.read(DataInputStream.java:100)
	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:205)
	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:169)
	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:139)
	at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:531)
	at org.apache.hadoop.mapreduce.MapContext.nextKeyValue(MapContext.java:67)
	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
	at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
	at org.apache.hadoop.mapred.Child.main(Child.java:249)

The Hadoop 1.2.1 I'm using does not come from the append branch lineage, so I suspect that is quite likely the cause. If anyone recognizes the errors above, I would appreciate any advice.

CDH has probably backported the append branch properly, so you may not run into these errors there.