HDFSのファイル書き込み部分のソースを読んでみた Part2

前回はこちら
HDFSのファイル書き込み部分のソースを読んでみた - wyukawa’s blog

ResponseProcessorスレッドをstartする前にnextBlockOutputStreamメソッドを呼び出しデータノードと接続します。

              // get new block from namenode.
              if (blockStream == null) {
                LOG.debug("Allocating new block");
                nodes = nextBlockOutputStream(src); 
                this.setName("DataStreamer for file " + src +
                             " block " + block);
                response = new ResponseProcessor(nodes);
                response.start();
              }

nextBlockOutputStreamメソッドは下記のとおりです。

createBlockOutputStreamメソッドで複製数分のデータノードの集合であるパイプラインの先頭ノードに接続します。

先頭ノードとの接続で
Exception in createBlockOutputStream java.io.IOException: Bad connect ack with firstBadLink
とか
Exception in createBlockOutputStream java.net.SocketTimeoutException: 69000 millis timeout while waiting for channel to be ready for
とか言われてエラーになった場合はfalseを返すのでリトライします。

タイムアウトの69秒の内訳は60+複製数x3です。複製数が3だと69秒です。

それにしてもdo-while文は久しぶりに見ました。かならず1回は実行するからこうなっているんでしょう。

    /**
     * Open a DataOutputStream to a DataNode so that it can be written to.
     * This happens when a file is created and each time a new block is allocated.
     * Must get block ID and the IDs of the destinations from the namenode.
     * Returns the list of target datanodes.
     */
    private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
      LocatedBlock lb = null;
      boolean retry = false;
      DatanodeInfo[] nodes;
      int count = conf.getInt("dfs.client.block.write.retries", 3);
      boolean success;
      do {
        hasError = false;
        lastException = null;
        errorIndex = 0;
        retry = false;
        nodes = null;
        success = false;
                
        long startTime = System.currentTimeMillis();
        lb = locateFollowingBlock(startTime);
        block = lb.getBlock();
        nodes = lb.getLocations();
  
        //
        // Connect to first DataNode in the list.
        //
        success = createBlockOutputStream(nodes, clientName, false);

        if (!success) {
          LOG.info("Abandoning block " + block);
          namenode.abandonBlock(block, src, clientName);

          // Connection failed.  Let's wait a little bit and retry
          retry = true;
          try {
            if (System.currentTimeMillis() - startTime > 5000) {
              LOG.info("Waiting to find target node: " + nodes[0].getName());
            }
            Thread.sleep(6000);
          } catch (InterruptedException iex) {
          }
        }
      } while (retry && --count >= 0);

      if (!success) {
        throw new IOException("Unable to create new block.");
      }
      return nodes;
    }