HDFSにデータを書き込む際の4つのパラメータ

最近知ったんですがHDFSにデータを書き込む際のパラメータとして以下の4つがあります。
ま、他にもあるんでしょうけど、今回はこの4つを取り上げます。まあ取り上げるというか自分へのメモです。なのである程度前提知識ある人じゃないと読んでも意味不明だと思います(汗

parameter default
dfs.socket.timeout 60seconds
dfs.datanode.socket.write.timeout 480seconds
dfs.client.block.write.retries 3
dfs.client.block.write.locateFollowingBlock.retries 5

なおデータの書き込みはDFSClient内でざっくり下記のようなフローで行っているのでこの辺のソースを読んでみるといろいろ分かってくると思います。

DataStreamer#run → nextBlockOutputStream → locateFollowingBlockを呼び出して得たnodeにたいしてcreateBlockOutputStream


dfs.socket.timeoutは読み込みのtimeout, dfs.datanode.socket.write.timeoutは書き込みのtimeoutを設定します。

createBlockOutputStreamの一部を抜粋すると以下のようになっています。なお本エントリはhadoop 1.0.4のソースを対象にしています。

        LOG.debug("Connecting to " + nodes[0].getName());
        InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
        s = socketFactory.createSocket();
        timeoutValue = 3000 * nodes.length + socketTimeout;
        NetUtils.connect(s, target, timeoutValue);
        s.setSoTimeout(timeoutValue);
        s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
        LOG.debug("Send buf size " + s.getSendBufferSize());
        long writeTimeout = HdfsConstants.WRITE_TIMEOUT_EXTENSION * nodes.length +
                            datanodeWriteTimeout;

        //
        // Xmit header info to datanode
        //
        DataOutputStream out = new DataOutputStream(
            new BufferedOutputStream(NetUtils.getOutputStream(s, writeTimeout), 
                                     DataNode.SMALL_BUFFER_SIZE));
        blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));

        out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
        out.write( DataTransferProtocol.OP_WRITE_BLOCK );
        out.writeLong( block.getBlockId() );
        out.writeLong( block.getGenerationStamp() );
        out.writeInt( nodes.length );
        out.writeBoolean( recoveryFlag );       // recovery flag
        Text.writeString( out, client );
        out.writeBoolean(false); // Not sending src node information
        out.writeInt( nodes.length - 1 );
        for (int i = 1; i < nodes.length; i++) {
          nodes[i].write(out);
        }
        accessToken.write(out);
        checksum.writeHeader( out );
        out.flush();

        // receive ack for connect
        pipelineStatus = blockReplyStream.readShort();
        firstBadLink = Text.readString(blockReplyStream);
        if (pipelineStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
          if (pipelineStatus == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
            throw new InvalidBlockTokenException(
                "Got access token error for connect ack with firstBadLink as "
                    + firstBadLink);
          } else {
            throw new IOException("Bad connect ack with firstBadLink as "
                + firstBadLink);
          }
        }

        blockStream = out;
        result = true;     // success

以下の部分が読み込みのtimeoutを設定している部分です。

 timeoutValue = 3000 * nodes.length + socketTimeout;

java.net.SocketTimeoutException: 69000 millis timeout while waiting for channel to be ready for

とか言われた場合の69000というのは 3000 * dfs.replication + dfs.socket.timeout に相当します。

この辺は以前書いたHDFSのファイル書き込み部分のソースを読んでみた Part2 - wyukawa’s blogにもちょっと関係しますね。

以下の部分が書き込みのtimeoutを設定している部分です。

        long writeTimeout = HdfsConstants.WRITE_TIMEOUT_EXTENSION * nodes.length +
                            datanodeWriteTimeout;

こちらは5000 * dfs.replication + dfs.datanode.socket.write.timeoutに相当します。


dfs.client.block.write.retriesはdatanodeへの書き込みのretry回数です。
dfs.client.block.write.retriesを使うnextBlockOutputStreamメソッドは下記のようになっています。

    /**
     * Open a DataOutputStream to a DataNode so that it can be written to.
     * This happens when a file is created and each time a new block is allocated.
     * Must get block ID and the IDs of the destinations from the namenode.
     * Returns the list of target datanodes.
     */
    private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
      LocatedBlock lb = null;
      boolean retry = false;
      DatanodeInfo[] nodes;
      int count = conf.getInt("dfs.client.block.write.retries", 3);
      boolean success;
      do {
        hasError = false;
        lastException = null;
        errorIndex = 0;
        retry = false;
        nodes = null;
        success = false;
                
        long startTime = System.currentTimeMillis();

        DatanodeInfo[] excluded = excludedNodes.toArray(new DatanodeInfo[0]);
        lb = locateFollowingBlock(startTime, excluded.length > 0 ? excluded : null);
        block = lb.getBlock();
        accessToken = lb.getBlockToken();
        nodes = lb.getLocations();
  
        //
        // Connect to first DataNode in the list.
        //
        success = createBlockOutputStream(nodes, clientName, false);

        if (!success) {
          LOG.info("Abandoning block " + block);
          namenode.abandonBlock(block, src, clientName);

          if (errorIndex < nodes.length) {
            LOG.info("Excluding datanode " + nodes[errorIndex]);
            excludedNodes.add(nodes[errorIndex]);
          }

          // Connection failed.  Let's wait a little bit and retry
          retry = true;
        }
      } while (retry && --count >= 0);

      if (!success) {
        throw new IOException("Unable to create new block.");
      }
      return nodes;
    }

こちらも参考になると思います。
Day After Neet: DFSClientで"Bad connect ack with firstBadLink..."が稀に発生する時の対処


dfs.client.block.write.locateFollowingBlock.retriesはnamenodeへの接続のretry回数ですね。
nextBlockOutputStreamから呼ばれdfs.client.block.write.locateFollowingBlock.retriesを使うlocateFollowingBlockメソッドは下記のようになっています。

    private LocatedBlock locateFollowingBlock(long start,
                                              DatanodeInfo[] excludedNodes
                                              ) throws IOException {     
      int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5);
      long sleeptime = 400;
      while (true) {
        long localstart = System.currentTimeMillis();
        while (true) {
          try {
            if (serverSupportsHdfs630) {
              return namenode.addBlock(src, clientName, excludedNodes);
            } else {
              return namenode.addBlock(src, clientName);
            }
          } catch (RemoteException e) {
            IOException ue = 
              e.unwrapRemoteException(FileNotFoundException.class,
                                      AccessControlException.class,
                                      NSQuotaExceededException.class,
                                      DSQuotaExceededException.class);
            if (ue != e) { 
              throw ue; // no need to retry these exceptions
            }

            if (e.getMessage().startsWith(
                  "java.io.IOException: java.lang.NoSuchMethodException: " +
                  "org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock(" +
                  "java.lang.String, java.lang.String, " +
                  "[Lorg.apache.hadoop.hdfs.protocol.DatanodeInfo;)")) {
              // We're talking to a server that doesn't implement HDFS-630.
              // Mark that and try again
              serverSupportsHdfs630 = false;
              continue;
            }

            if (NotReplicatedYetException.class.getName().
                equals(e.getClassName())) {

                if (retries == 0) { 
                  throw e;
                } else {
                  --retries;
                  LOG.info(StringUtils.stringifyException(e));
                  if (System.currentTimeMillis() - localstart > 5000) {
                    LOG.info("Waiting for replication for "
                        + (System.currentTimeMillis() - localstart) / 1000
                        + " seconds");
                  }
                  try {
                    LOG.warn("NotReplicatedYetException sleeping " + src
                        + " retries left " + retries);
                    Thread.sleep(sleeptime);
                    sleeptime *= 2;
                  } catch (InterruptedException ie) {
                  }
                }
            } else {
              throw e;
            }
          }
        }
      } 
    }

こちらも参考になると思います。
hadoop fs コマンドで dfs にコピーする際のリトライ回数を増やすには - n3104のブログ