HDFSのファイル書き込み部分のソースを読んでみた Part2
前回はこちら
HDFSのファイル書き込み部分のソースを読んでみた - wyukawa’s blog
ResponseProcessorスレッドをstartする前にnextBlockOutputStreamメソッドを呼び出しデータノードと接続します。
// get new block from namenode. if (blockStream == null) { LOG.debug("Allocating new block"); nodes = nextBlockOutputStream(src); this.setName("DataStreamer for file " + src + " block " + block); response = new ResponseProcessor(nodes); response.start(); }
nextBlockOutputStreamメソッドは下記のとおりです。
createBlockOutputStreamメソッドで複製数分のデータノードの集合であるパイプラインの先頭ノードに接続します。
先頭ノードとの接続で
Exception in createBlockOutputStream java.io.IOException: Bad connect ack with firstBadLink
とか
Exception in createBlockOutputStream java.net.SocketTimeoutException: 69000 millis timeout while waiting for channel to be ready for
とか言われてエラーになった場合はfalseを返すのでリトライします。
タイムアウトの69秒の内訳は60+複製数x3です。複製数が3だと69秒です。
それにしてもdo-while文は久しぶりに見ました。かならず1回は実行するからこうなっているんでしょう。
/** * Open a DataOutputStream to a DataNode so that it can be written to. * This happens when a file is created and each time a new block is allocated. * Must get block ID and the IDs of the destinations from the namenode. * Returns the list of target datanodes. */ private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException { LocatedBlock lb = null; boolean retry = false; DatanodeInfo[] nodes; int count = conf.getInt("dfs.client.block.write.retries", 3); boolean success; do { hasError = false; lastException = null; errorIndex = 0; retry = false; nodes = null; success = false; long startTime = System.currentTimeMillis(); lb = locateFollowingBlock(startTime); block = lb.getBlock(); nodes = lb.getLocations(); // // Connect to first DataNode in the list. // success = createBlockOutputStream(nodes, clientName, false); if (!success) { LOG.info("Abandoning block " + block); namenode.abandonBlock(block, src, clientName); // Connection failed. Let's wait a little bit and retry retry = true; try { if (System.currentTimeMillis() - startTime > 5000) { LOG.info("Waiting to find target node: " + nodes[0].getName()); } Thread.sleep(6000); } catch (InterruptedException iex) { } } } while (retry && --count >= 0); if (!success) { throw new IOException("Unable to create new block."); } return nodes; }