parameter | default |
dfs.socket.timeout | 60seconds |
dfs.datanode.socket.write.timeout | 480seconds |
dfs.client.block.write.retries | 3 |
dfs.client.block.write.locateFollowingBlock.retries | 5 |
DataStreamer#run → nextBlockOutputStream → locateFollowingBlockを呼び出して得たnodeにたいしてcreateBlockOutputStream
dfs.socket.timeoutは読み込みのtimeout, dfs.datanode.socket.write.timeoutは書き込みのtimeoutを設定します。
createBlockOutputStreamの一部を抜粋すると以下のようになっています。なお本エントリはhadoop 1.0.4のソースを対象にしています。
LOG.debug("Connecting to " + nodes[0].getName()); InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName()); s = socketFactory.createSocket(); timeoutValue = 3000 * nodes.length + socketTimeout; NetUtils.connect(s, target, timeoutValue); s.setSoTimeout(timeoutValue); s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE); LOG.debug("Send buf size " + s.getSendBufferSize()); long writeTimeout = HdfsConstants.WRITE_TIMEOUT_EXTENSION * nodes.length + datanodeWriteTimeout; // // Xmit header info to datanode // DataOutputStream out = new DataOutputStream( new BufferedOutputStream(NetUtils.getOutputStream(s, writeTimeout), DataNode.SMALL_BUFFER_SIZE)); blockReplyStream = new DataInputStream(NetUtils.getInputStream(s)); out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION ); out.write( DataTransferProtocol.OP_WRITE_BLOCK ); out.writeLong( block.getBlockId() ); out.writeLong( block.getGenerationStamp() ); out.writeInt( nodes.length ); out.writeBoolean( recoveryFlag ); // recovery flag Text.writeString( out, client ); out.writeBoolean(false); // Not sending src node information out.writeInt( nodes.length - 1 ); for (int i = 1; i < nodes.length; i++) { nodes[i].write(out); } accessToken.write(out); checksum.writeHeader( out ); out.flush(); // receive ack for connect pipelineStatus = blockReplyStream.readShort(); firstBadLink = Text.readString(blockReplyStream); if (pipelineStatus != DataTransferProtocol.OP_STATUS_SUCCESS) { if (pipelineStatus == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) { throw new InvalidBlockTokenException( "Got access token error for connect ack with firstBadLink as " + firstBadLink); } else { throw new IOException("Bad connect ack with firstBadLink as " + firstBadLink); } } blockStream = out; result = true; // success
timeoutValue = 3000 * nodes.length + socketTimeout;
java.net.SocketTimeoutException: 69000 millis timeout while waiting for channel to be ready for
とか言われた場合の69000というのは 3000 * dfs.replication + dfs.socket.timeout に相当します。
この辺は以前書いたHDFSのファイル書き込み部分のソースを読んでみた Part2 - wyukawa’s blogにもちょっと関係しますね。
long writeTimeout = HdfsConstants.WRITE_TIMEOUT_EXTENSION * nodes.length +
こちらは5000 * dfs.replication + dfs.datanode.socket.write.timeoutに相当します。
/** * Open a DataOutputStream to a DataNode so that it can be written to. * This happens when a file is created and each time a new block is allocated. * Must get block ID and the IDs of the destinations from the namenode. * Returns the list of target datanodes. */ private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException { LocatedBlock lb = null; boolean retry = false; DatanodeInfo[] nodes; int count = conf.getInt("dfs.client.block.write.retries", 3); boolean success; do { hasError = false; lastException = null; errorIndex = 0; retry = false; nodes = null; success = false; long startTime = System.currentTimeMillis(); DatanodeInfo[] excluded = excludedNodes.toArray(new DatanodeInfo[0]); lb = locateFollowingBlock(startTime, excluded.length > 0 ? excluded : null); block = lb.getBlock(); accessToken = lb.getBlockToken(); nodes = lb.getLocations(); // // Connect to first DataNode in the list. // success = createBlockOutputStream(nodes, clientName, false); if (!success) { LOG.info("Abandoning block " + block); namenode.abandonBlock(block, src, clientName); if (errorIndex < nodes.length) { LOG.info("Excluding datanode " + nodes[errorIndex]); excludedNodes.add(nodes[errorIndex]); } // Connection failed. Let's wait a little bit and retry retry = true; } } while (retry && --count >= 0); if (!success) { throw new IOException("Unable to create new block."); } return nodes; }
Day After Neet: DFSClientで"Bad connect ack with firstBadLink..."が稀に発生する時の対処
private LocatedBlock locateFollowingBlock(long start, DatanodeInfo[] excludedNodes ) throws IOException { int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5); long sleeptime = 400; while (true) { long localstart = System.currentTimeMillis(); while (true) { try { if (serverSupportsHdfs630) { return namenode.addBlock(src, clientName, excludedNodes); } else { return namenode.addBlock(src, clientName); } } catch (RemoteException e) { IOException ue = e.unwrapRemoteException(FileNotFoundException.class, AccessControlException.class, NSQuotaExceededException.class, DSQuotaExceededException.class); if (ue != e) { throw ue; // no need to retry these exceptions } if (e.getMessage().startsWith( "java.io.IOException: java.lang.NoSuchMethodException: " + "org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock(" + "java.lang.String, java.lang.String, " + "[Lorg.apache.hadoop.hdfs.protocol.DatanodeInfo;)")) { // We're talking to a server that doesn't implement HDFS-630. // Mark that and try again serverSupportsHdfs630 = false; continue; } if (NotReplicatedYetException.class.getName(). equals(e.getClassName())) { if (retries == 0) { throw e; } else { --retries; LOG.info(StringUtils.stringifyException(e)); if (System.currentTimeMillis() - localstart > 5000) { LOG.info("Waiting for replication for " + (System.currentTimeMillis() - localstart) / 1000 + " seconds"); } try { LOG.warn("NotReplicatedYetException sleeping " + src + " retries left " + retries); Thread.sleep(sleeptime); sleeptime *= 2; } catch (InterruptedException ie) { } } } else { throw e; } } } } }
hadoop fs コマンドで dfs にコピーする際のリトライ回数を増やすには - n3104のブログ