1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hdfs; 19 20 import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS; 21 22 import java.io.BufferedOutputStream; 23 import java.io.DataInputStream; 24 import java.io.DataOutputStream; 25 import java.io.FileNotFoundException; 26 import java.io.IOException; 27 import java.io.InputStream; 28 import java.io.InterruptedIOException; 29 import java.io.OutputStream; 30 import java.net.InetAddress; 31 import java.net.InetSocketAddress; 32 import java.net.Socket; 33 import java.nio.channels.ClosedChannelException; 34 import java.util.ArrayList; 35 import java.util.Arrays; 36 import java.util.EnumSet; 37 import java.util.HashSet; 38 import java.util.LinkedList; 39 import java.util.List; 40 import java.util.concurrent.TimeUnit; 41 import java.util.concurrent.atomic.AtomicBoolean; 42 import java.util.concurrent.atomic.AtomicInteger; 43 import java.util.concurrent.atomic.AtomicReference; 44 45 import org.apache.hadoop.HadoopIllegalArgumentException; 46 import org.apache.hadoop.classification.InterfaceAudience; 47 import org.apache.hadoop.crypto.CryptoProtocolVersion; 48 import 
org.apache.hadoop.fs.CanSetDropBehind; 49 import org.apache.hadoop.fs.CreateFlag; 50 import org.apache.hadoop.fs.FSOutputSummer; 51 import org.apache.hadoop.fs.FileAlreadyExistsException; 52 import org.apache.hadoop.fs.FileEncryptionInfo; 53 import org.apache.hadoop.fs.ParentNotDirectoryException; 54 import org.apache.hadoop.fs.permission.FsPermission; 55 import org.apache.hadoop.fs.StorageType; 56 import org.apache.hadoop.fs.Syncable; 57 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; 58 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag; 59 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; 60 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; 61 import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 62 import org.apache.hadoop.hdfs.protocol.ExtendedBlock; 63 import org.apache.hadoop.hdfs.protocol.HdfsConstants; 64 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; 65 import org.apache.hadoop.hdfs.protocol.LocatedBlock; 66 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; 67 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; 68 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; 69 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; 70 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; 71 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; 72 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; 73 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; 74 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; 75 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; 76 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; 77 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; 78 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; 79 import 
org.apache.hadoop.hdfs.protocolPB.PBHelper; 80 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; 81 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; 82 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; 83 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; 84 import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException; 85 import org.apache.hadoop.hdfs.server.namenode.SafeModeException; 86 import org.apache.hadoop.hdfs.util.ByteArrayManager; 87 import org.apache.hadoop.io.EnumSetWritable; 88 import org.apache.hadoop.io.IOUtils; 89 import org.apache.hadoop.ipc.RemoteException; 90 import org.apache.hadoop.net.NetUtils; 91 import org.apache.hadoop.security.AccessControlException; 92 import org.apache.hadoop.security.token.Token; 93 import org.apache.hadoop.util.Daemon; 94 import org.apache.hadoop.util.DataChecksum; 95 import org.apache.hadoop.util.DataChecksum.Type; 96 import org.apache.hadoop.util.Progressable; 97 import org.apache.hadoop.util.Time; 98 import org.apache.htrace.NullScope; 99 import org.apache.htrace.Sampler; 100 import org.apache.htrace.Span; 101 import org.apache.htrace.Trace; 102 import org.apache.htrace.TraceInfo; 103 import org.apache.htrace.TraceScope; 104 105 import com.google.common.annotations.VisibleForTesting; 106 import com.google.common.base.Preconditions; 107 import com.google.common.cache.CacheBuilder; 108 import com.google.common.cache.CacheLoader; 109 import com.google.common.cache.LoadingCache; 110 import com.google.common.cache.RemovalListener; 111 import com.google.common.cache.RemovalNotification; 112 113 114 /**************************************************************** 115 * DFSOutputStream creates files from a stream of bytes. 116 * 117 * The client application writes data that is cached internally by 118 * this stream. Data is broken up into packets, each packet is 119 * typically 64K in size. A packet comprises of chunks. 
Each chunk 120 * is typically 512 bytes and has an associated checksum with it. 121 * 122 * When a client application fills up the currentPacket, it is 123 * enqueued into dataQueue. The DataStreamer thread picks up 124 * packets from the dataQueue, sends it to the first datanode in 125 * the pipeline and moves it from the dataQueue to the ackQueue. 126 * The ResponseProcessor receives acks from the datanodes. When an 127 * successful ack for a packet is received from all datanodes, the 128 * ResponseProcessor removes the corresponding packet from the 129 * ackQueue. 130 * 131 * In case of error, all outstanding packets and moved from 132 * ackQueue. A new pipeline is setup by eliminating the bad 133 * datanode from the original pipeline. The DataStreamer now 134 * starts sending packets from the dataQueue. 135 ****************************************************************/ 136 @InterfaceAudience.Private 137 public class DFSOutputStream extends FSOutputSummer 138 implements Syncable, CanSetDropBehind { 139 private final long dfsclientSlowLogThresholdMs; 140 /** 141 * Number of times to retry creating a file when there are transient 142 * errors (typically related to encryption zones and KeyProvider operations). 143 */ 144 @VisibleForTesting 145 static final int CREATE_RETRY_COUNT = 10; 146 @VisibleForTesting 147 static CryptoProtocolVersion[] SUPPORTED_CRYPTO_VERSIONS = 148 CryptoProtocolVersion.supported(); 149 150 private final DFSClient dfsClient; 151 private final ByteArrayManager byteArrayManager; 152 private Socket s; 153 // closed is accessed by different threads under different locks. 154 private volatile boolean closed = false; 155 156 private String src; 157 private final long fileId; 158 private final long blockSize; 159 /** Only for DataTransferProtocol.writeBlock(..) 
*/ 160 private final DataChecksum checksum4WriteBlock; 161 private final int bytesPerChecksum; 162 163 // both dataQueue and ackQueue are protected by dataQueue lock 164 private final LinkedList<DFSPacket> dataQueue = new LinkedList<DFSPacket>(); 165 private final LinkedList<DFSPacket> ackQueue = new LinkedList<DFSPacket>(); 166 private DFSPacket currentPacket = null; 167 private DataStreamer streamer; 168 private long currentSeqno = 0; 169 private long lastQueuedSeqno = -1; 170 private long lastAckedSeqno = -1; 171 private long bytesCurBlock = 0; // bytes written in current block 172 private int packetSize = 0; // write packet size, not including the header. 173 private int chunksPerPacket = 0; 174 private final AtomicReference<IOException> lastException = new AtomicReference<IOException>(); 175 private long artificialSlowdown = 0; 176 private long lastFlushOffset = 0; // offset when flush was invoked 177 //persist blocks on namenode 178 private final AtomicBoolean persistBlocks = new AtomicBoolean(false); 179 private volatile boolean appendChunk = false; // appending to existing partial block 180 private long initialFileSize = 0; // at time of file open 181 private final Progressable progress; 182 private final short blockReplication; // replication factor of file 183 private boolean shouldSyncBlock = false; // force blocks to disk upon close 184 private final AtomicReference<CachingStrategy> cachingStrategy; 185 private boolean failPacket = false; 186 private FileEncryptionInfo fileEncryptionInfo; 187 private static final BlockStoragePolicySuite blockStoragePolicySuite = 188 BlockStoragePolicySuite.createDefaultSuite(); 189 190 /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/ createPacket(int packetSize, int chunksPerPkt, long offsetInBlock, long seqno, boolean lastPacketInBlock)191 private DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock, 192 long seqno, boolean lastPacketInBlock) throws 
InterruptedIOException { 193 final byte[] buf; 194 final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize; 195 196 try { 197 buf = byteArrayManager.newByteArray(bufferSize); 198 } catch (InterruptedException ie) { 199 final InterruptedIOException iioe = new InterruptedIOException( 200 "seqno=" + seqno); 201 iioe.initCause(ie); 202 throw iioe; 203 } 204 205 return new DFSPacket(buf, chunksPerPkt, offsetInBlock, seqno, 206 getChecksumSize(), lastPacketInBlock); 207 } 208 209 /** 210 * For heartbeat packets, create buffer directly by new byte[] 211 * since heartbeats should not be blocked. 212 */ createHeartbeatPacket()213 private DFSPacket createHeartbeatPacket() throws InterruptedIOException { 214 final byte[] buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN]; 215 return new DFSPacket(buf, 0, 0, DFSPacket.HEART_BEAT_SEQNO, 216 getChecksumSize(), false); 217 } 218 219 // 220 // The DataStreamer class is responsible for sending data packets to the 221 // datanodes in the pipeline. It retrieves a new blockid and block locations 222 // from the namenode, and starts streaming packets to the pipeline of 223 // Datanodes. Every packet has a sequence number associated with 224 // it. When all the packets for a block are sent out and acks for each 225 // if them are received, the DataStreamer closes the current block. 
226 // 227 class DataStreamer extends Daemon { 228 private volatile boolean streamerClosed = false; 229 private volatile ExtendedBlock block; // its length is number of bytes acked 230 private Token<BlockTokenIdentifier> accessToken; 231 private DataOutputStream blockStream; 232 private DataInputStream blockReplyStream; 233 private ResponseProcessor response = null; 234 private volatile DatanodeInfo[] nodes = null; // list of targets for current block 235 private volatile StorageType[] storageTypes = null; 236 private volatile String[] storageIDs = null; 237 private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes = 238 CacheBuilder.newBuilder() 239 .expireAfterWrite( 240 dfsClient.getConf().excludedNodesCacheExpiry, 241 TimeUnit.MILLISECONDS) 242 .removalListener(new RemovalListener<DatanodeInfo, DatanodeInfo>() { 243 @Override 244 public void onRemoval( 245 RemovalNotification<DatanodeInfo, DatanodeInfo> notification) { 246 DFSClient.LOG.info("Removing node " + 247 notification.getKey() + " from the excluded nodes list"); 248 } 249 }) 250 .build(new CacheLoader<DatanodeInfo, DatanodeInfo>() { 251 @Override 252 public DatanodeInfo load(DatanodeInfo key) throws Exception { 253 return key; 254 } 255 }); 256 private String[] favoredNodes; 257 volatile boolean hasError = false; 258 volatile int errorIndex = -1; 259 // Restarting node index 260 AtomicInteger restartingNodeIndex = new AtomicInteger(-1); 261 private long restartDeadline = 0; // Deadline of DN restart 262 private BlockConstructionStage stage; // block construction stage 263 private long bytesSent = 0; // number of bytes that've been sent 264 private final boolean isLazyPersistFile; 265 266 /** Nodes have been used in the pipeline before and have failed. */ 267 private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>(); 268 /** The last ack sequence number before pipeline failure. 
*/ 269 private long lastAckedSeqnoBeforeFailure = -1; 270 private int pipelineRecoveryCount = 0; 271 /** Has the current block been hflushed? */ 272 private boolean isHflushed = false; 273 /** Append on an existing block? */ 274 private final boolean isAppend; 275 DataStreamer(HdfsFileStatus stat, ExtendedBlock block)276 private DataStreamer(HdfsFileStatus stat, ExtendedBlock block) { 277 isAppend = false; 278 isLazyPersistFile = isLazyPersist(stat); 279 this.block = block; 280 stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; 281 } 282 283 /** 284 * Construct a data streamer for appending to the last partial block 285 * @param lastBlock last block of the file to be appended 286 * @param stat status of the file to be appended 287 * @param bytesPerChecksum number of bytes per checksum 288 * @throws IOException if error occurs 289 */ DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat, int bytesPerChecksum)290 private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat, 291 int bytesPerChecksum) throws IOException { 292 isAppend = true; 293 stage = BlockConstructionStage.PIPELINE_SETUP_APPEND; 294 block = lastBlock.getBlock(); 295 bytesSent = block.getNumBytes(); 296 accessToken = lastBlock.getBlockToken(); 297 isLazyPersistFile = isLazyPersist(stat); 298 long usedInLastBlock = stat.getLen() % blockSize; 299 int freeInLastBlock = (int)(blockSize - usedInLastBlock); 300 301 // calculate the amount of free space in the pre-existing 302 // last crc chunk 303 int usedInCksum = (int)(stat.getLen() % bytesPerChecksum); 304 int freeInCksum = bytesPerChecksum - usedInCksum; 305 306 // if there is space in the last block, then we have to 307 // append to that block 308 if (freeInLastBlock == blockSize) { 309 throw new IOException("The last block for file " + 310 src + " is full."); 311 } 312 313 if (usedInCksum > 0 && freeInCksum > 0) { 314 // if there is space in the last partial chunk, then 315 // setup in such a way that the next packet will have only 316 // 
one chunk that fills up the partial chunk. 317 // 318 computePacketChunkSize(0, freeInCksum); 319 setChecksumBufSize(freeInCksum); 320 appendChunk = true; 321 } else { 322 // if the remaining space in the block is smaller than 323 // that expected size of of a packet, then create 324 // smaller size packet. 325 // 326 computePacketChunkSize(Math.min(dfsClient.getConf().writePacketSize, freeInLastBlock), 327 bytesPerChecksum); 328 } 329 330 // setup pipeline to append to the last block XXX retries?? 331 setPipeline(lastBlock); 332 errorIndex = -1; // no errors yet. 333 if (nodes.length < 1) { 334 throw new IOException("Unable to retrieve blocks locations " + 335 " for last block " + block + 336 "of file " + src); 337 338 } 339 } 340 setPipeline(LocatedBlock lb)341 private void setPipeline(LocatedBlock lb) { 342 setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs()); 343 } setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes, String[] storageIDs)344 private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes, 345 String[] storageIDs) { 346 this.nodes = nodes; 347 this.storageTypes = storageTypes; 348 this.storageIDs = storageIDs; 349 } 350 setFavoredNodes(String[] favoredNodes)351 private void setFavoredNodes(String[] favoredNodes) { 352 this.favoredNodes = favoredNodes; 353 } 354 355 /** 356 * Initialize for data streaming 357 */ initDataStreaming()358 private void initDataStreaming() { 359 this.setName("DataStreamer for file " + src + 360 " block " + block); 361 response = new ResponseProcessor(nodes); 362 response.start(); 363 stage = BlockConstructionStage.DATA_STREAMING; 364 } 365 endBlock()366 private void endBlock() { 367 if(DFSClient.LOG.isDebugEnabled()) { 368 DFSClient.LOG.debug("Closing old block " + block); 369 } 370 this.setName("DataStreamer for file " + src); 371 closeResponder(); 372 closeStream(); 373 setPipeline(null, null, null); 374 stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; 375 } 376 377 /* 378 * 
streamer thread is the only thread that opens streams to datanode, 379 * and closes them. Any error recovery is also done by this thread. 380 */ 381 @Override run()382 public void run() { 383 long lastPacket = Time.monotonicNow(); 384 TraceScope scope = NullScope.INSTANCE; 385 while (!streamerClosed && dfsClient.clientRunning) { 386 // if the Responder encountered an error, shutdown Responder 387 if (hasError && response != null) { 388 try { 389 response.close(); 390 response.join(); 391 response = null; 392 } catch (InterruptedException e) { 393 DFSClient.LOG.warn("Caught exception ", e); 394 } 395 } 396 397 DFSPacket one; 398 try { 399 // process datanode IO errors if any 400 boolean doSleep = false; 401 if (hasError && (errorIndex >= 0 || restartingNodeIndex.get() >= 0)) { 402 doSleep = processDatanodeError(); 403 } 404 405 synchronized (dataQueue) { 406 // wait for a packet to be sent. 407 long now = Time.monotonicNow(); 408 while ((!streamerClosed && !hasError && dfsClient.clientRunning 409 && dataQueue.size() == 0 && 410 (stage != BlockConstructionStage.DATA_STREAMING || 411 stage == BlockConstructionStage.DATA_STREAMING && 412 now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) { 413 long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket); 414 timeout = timeout <= 0 ? 1000 : timeout; 415 timeout = (stage == BlockConstructionStage.DATA_STREAMING)? 416 timeout : 1000; 417 try { 418 dataQueue.wait(timeout); 419 } catch (InterruptedException e) { 420 DFSClient.LOG.warn("Caught exception ", e); 421 } 422 doSleep = false; 423 now = Time.monotonicNow(); 424 } 425 if (streamerClosed || hasError || !dfsClient.clientRunning) { 426 continue; 427 } 428 // get packet to be sent. 
429 if (dataQueue.isEmpty()) { 430 one = createHeartbeatPacket(); 431 assert one != null; 432 } else { 433 one = dataQueue.getFirst(); // regular data packet 434 long parents[] = one.getTraceParents(); 435 if (parents.length > 0) { 436 scope = Trace.startSpan("dataStreamer", new TraceInfo(0, parents[0])); 437 // TODO: use setParents API once it's available from HTrace 3.2 438 // scope = Trace.startSpan("dataStreamer", Sampler.ALWAYS); 439 // scope.getSpan().setParents(parents); 440 } 441 } 442 } 443 444 // get new block from namenode. 445 if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) { 446 if(DFSClient.LOG.isDebugEnabled()) { 447 DFSClient.LOG.debug("Allocating new block"); 448 } 449 setPipeline(nextBlockOutputStream()); 450 initDataStreaming(); 451 } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) { 452 if(DFSClient.LOG.isDebugEnabled()) { 453 DFSClient.LOG.debug("Append to block " + block); 454 } 455 setupPipelineForAppendOrRecovery(); 456 initDataStreaming(); 457 } 458 459 long lastByteOffsetInBlock = one.getLastByteOffsetBlock(); 460 if (lastByteOffsetInBlock > blockSize) { 461 throw new IOException("BlockSize " + blockSize + 462 " is smaller than data size. 
" + 463 " Offset of packet in block " + 464 lastByteOffsetInBlock + 465 " Aborting file " + src); 466 } 467 468 if (one.isLastPacketInBlock()) { 469 // wait for all data packets have been successfully acked 470 synchronized (dataQueue) { 471 while (!streamerClosed && !hasError && 472 ackQueue.size() != 0 && dfsClient.clientRunning) { 473 try { 474 // wait for acks to arrive from datanodes 475 dataQueue.wait(1000); 476 } catch (InterruptedException e) { 477 DFSClient.LOG.warn("Caught exception ", e); 478 } 479 } 480 } 481 if (streamerClosed || hasError || !dfsClient.clientRunning) { 482 continue; 483 } 484 stage = BlockConstructionStage.PIPELINE_CLOSE; 485 } 486 487 // send the packet 488 Span span = null; 489 synchronized (dataQueue) { 490 // move packet from dataQueue to ackQueue 491 if (!one.isHeartbeatPacket()) { 492 span = scope.detach(); 493 one.setTraceSpan(span); 494 dataQueue.removeFirst(); 495 ackQueue.addLast(one); 496 dataQueue.notifyAll(); 497 } 498 } 499 500 if (DFSClient.LOG.isDebugEnabled()) { 501 DFSClient.LOG.debug("DataStreamer block " + block + 502 " sending packet " + one); 503 } 504 505 // write out data to remote datanode 506 TraceScope writeScope = Trace.startSpan("writeTo", span); 507 try { 508 one.writeTo(blockStream); 509 blockStream.flush(); 510 } catch (IOException e) { 511 // HDFS-3398 treat primary DN is down since client is unable to 512 // write to primary DN. If a failed or restarting node has already 513 // been recorded by the responder, the following call will have no 514 // effect. Pipeline recovery can handle only one node error at a 515 // time. If the primary node fails again during the recovery, it 516 // will be taken out then. 
517 tryMarkPrimaryDatanodeFailed(); 518 throw e; 519 } finally { 520 writeScope.close(); 521 } 522 lastPacket = Time.monotonicNow(); 523 524 // update bytesSent 525 long tmpBytesSent = one.getLastByteOffsetBlock(); 526 if (bytesSent < tmpBytesSent) { 527 bytesSent = tmpBytesSent; 528 } 529 530 if (streamerClosed || hasError || !dfsClient.clientRunning) { 531 continue; 532 } 533 534 // Is this block full? 535 if (one.isLastPacketInBlock()) { 536 // wait for the close packet has been acked 537 synchronized (dataQueue) { 538 while (!streamerClosed && !hasError && 539 ackQueue.size() != 0 && dfsClient.clientRunning) { 540 dataQueue.wait(1000);// wait for acks to arrive from datanodes 541 } 542 } 543 if (streamerClosed || hasError || !dfsClient.clientRunning) { 544 continue; 545 } 546 547 endBlock(); 548 } 549 if (progress != null) { progress.progress(); } 550 551 // This is used by unit test to trigger race conditions. 552 if (artificialSlowdown != 0 && dfsClient.clientRunning) { 553 Thread.sleep(artificialSlowdown); 554 } 555 } catch (Throwable e) { 556 // Log warning if there was a real error. 557 if (restartingNodeIndex.get() == -1) { 558 DFSClient.LOG.warn("DataStreamer Exception", e); 559 } 560 if (e instanceof IOException) { 561 setLastException((IOException)e); 562 } else { 563 setLastException(new IOException("DataStreamer Exception: ",e)); 564 } 565 hasError = true; 566 if (errorIndex == -1 && restartingNodeIndex.get() == -1) { 567 // Not a datanode issue 568 streamerClosed = true; 569 } 570 } finally { 571 scope.close(); 572 } 573 } 574 closeInternal(); 575 } 576 closeInternal()577 private void closeInternal() { 578 closeResponder(); // close and join 579 closeStream(); 580 streamerClosed = true; 581 setClosed(); 582 synchronized (dataQueue) { 583 dataQueue.notifyAll(); 584 } 585 } 586 587 /* 588 * close both streamer and DFSOutputStream, should be called only 589 * by an external thread and only after all data to be sent has 590 * been flushed to datanode. 
*
     * Interrupt this data streamer if force is true
     *
     * @param force if this data stream is forced to be closed
     */
    void close(boolean force) {
      streamerClosed = true;
      synchronized (dataQueue) {
        dataQueue.notifyAll();
      }
      if (!force) {
        return;
      }
      this.interrupt();
    }

    /**
     * Stop the ResponseProcessor, if one is running, wait for it to finish
     * and drop the reference. An interrupt while joining is only logged.
     */
    private void closeResponder() {
      if (response == null) {
        return;
      }
      try {
        response.close();
        response.join();
      } catch (InterruptedException ie) {
        DFSClient.LOG.warn("Caught exception ", ie);
      } finally {
        response = null;
      }
    }

    /**
     * Close, in order, the block output stream, the block reply stream and
     * the socket, clearing each reference afterwards. A failure to close
     * any of them is passed to setLastException instead of being thrown.
     */
    private void closeStream() {
      if (blockStream != null) {
        try {
          blockStream.close();
        } catch (IOException ioe) {
          setLastException(ioe);
        } finally {
          blockStream = null;
        }
      }
      if (blockReplyStream != null) {
        try {
          blockReplyStream.close();
        } catch (IOException ioe) {
          setLastException(ioe);
        } finally {
          blockReplyStream = null;
        }
      }
      if (s != null) {
        try {
          s.close();
        } catch (IOException ioe) {
          setLastException(ioe);
        } finally {
          s = null;
        }
      }
    }

    // The following synchronized methods are used whenever
    // errorIndex or restartingNodeIndex is set. This is because
    // check & set needs to be atomic. Simply reading variables
    // does not require a synchronization. When responder is
    // not running (e.g. during pipeline recovery), there is no
    // need to use these methods.

    /** Set the error node index. Called by responder */
    synchronized void setErrorIndex(int idx) {
      errorIndex = idx;
    }

    /** Set the restarting node index. Called by responder */
    synchronized void setRestartingNodeIndex(int idx) {
      restartingNodeIndex.set(idx);
      // If the data streamer has already set the primary node
      // bad, clear it. It is likely that the write failed due to
      // the DN shutdown. Even if it was a real failure, the pipeline
      // recovery will take care of it.
      errorIndex = -1;
    }

    /**
     * This method is used when no explicit error report was received,
     * but something failed. When the primary node is a suspect or
     * unsure about the cause, the primary node is marked as failed.
     * It does nothing if an error or a restart has already been recorded.
     */
    synchronized void tryMarkPrimaryDatanodeFailed() {
      // There should be no existing error and no ongoing restart.
      if (errorIndex != -1 || restartingNodeIndex.get() != -1) {
        return;
      }
      errorIndex = 0;
    }

    /**
     * Examine whether it is worth waiting for a node to restart.
     * @param index the node index
     * @return true if the pipeline has a single node, or if the node at
     *         {@code index} resolves to a local address
     */
    boolean shouldWaitForRestart(int index) {
      // Only one node in the pipeline.
      if (nodes.length == 1) {
        return true;
      }

      // Is it a local node?
      InetAddress address = null;
      try {
        address = InetAddress.getByName(nodes[index].getIpAddr());
      } catch (java.net.UnknownHostException uhe) {
        // we are passing an ip address. this should not happen.
        assert false;
      }
      return address != null && NetUtils.isLocalAddress(address);
    }

    //
    // Processes responses from the datanodes.  A packet is removed
    // from the ackQueue when its response arrives.
711 // 712 private class ResponseProcessor extends Daemon { 713 714 private volatile boolean responderClosed = false; 715 private DatanodeInfo[] targets = null; 716 private boolean isLastPacketInBlock = false; 717 ResponseProcessor(DatanodeInfo[] targets)718 ResponseProcessor (DatanodeInfo[] targets) { 719 this.targets = targets; 720 } 721 722 @Override run()723 public void run() { 724 725 setName("ResponseProcessor for block " + block); 726 PipelineAck ack = new PipelineAck(); 727 728 TraceScope scope = NullScope.INSTANCE; 729 while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) { 730 // process responses from datanodes. 731 try { 732 // read an ack from the pipeline 733 long begin = Time.monotonicNow(); 734 ack.readFields(blockReplyStream); 735 long duration = Time.monotonicNow() - begin; 736 if (duration > dfsclientSlowLogThresholdMs 737 && ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) { 738 DFSClient.LOG 739 .warn("Slow ReadProcessor read fields took " + duration 740 + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: " 741 + ack + ", targets: " + Arrays.asList(targets)); 742 } else if (DFSClient.LOG.isDebugEnabled()) { 743 DFSClient.LOG.debug("DFSClient " + ack); 744 } 745 746 long seqno = ack.getSeqno(); 747 // processes response status from datanodes. 748 for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) { 749 final Status reply = PipelineAck.getStatusFromHeader(ack 750 .getHeaderFlag(i)); 751 // Restart will not be treated differently unless it is 752 // the local node or the only one in the pipeline. 
753 if (PipelineAck.isRestartOOBStatus(reply) && 754 shouldWaitForRestart(i)) { 755 restartDeadline = dfsClient.getConf().datanodeRestartTimeout 756 + Time.monotonicNow(); 757 setRestartingNodeIndex(i); 758 String message = "A datanode is restarting: " + targets[i]; 759 DFSClient.LOG.info(message); 760 throw new IOException(message); 761 } 762 // node error 763 if (reply != SUCCESS) { 764 setErrorIndex(i); // first bad datanode 765 throw new IOException("Bad response " + reply + 766 " for block " + block + 767 " from datanode " + 768 targets[i]); 769 } 770 } 771 772 assert seqno != PipelineAck.UNKOWN_SEQNO : 773 "Ack for unknown seqno should be a failed ack: " + ack; 774 if (seqno == DFSPacket.HEART_BEAT_SEQNO) { // a heartbeat ack 775 continue; 776 } 777 778 // a success ack for a data packet 779 DFSPacket one; 780 synchronized (dataQueue) { 781 one = ackQueue.getFirst(); 782 } 783 if (one.getSeqno() != seqno) { 784 throw new IOException("ResponseProcessor: Expecting seqno " + 785 " for block " + block + 786 one.getSeqno() + " but received " + seqno); 787 } 788 isLastPacketInBlock = one.isLastPacketInBlock(); 789 790 // Fail the packet write for testing in order to force a 791 // pipeline recovery. 792 if (DFSClientFaultInjector.get().failPacket() && 793 isLastPacketInBlock) { 794 failPacket = true; 795 throw new IOException( 796 "Failing the last packet for testing."); 797 } 798 799 // update bytesAcked 800 block.setNumBytes(one.getLastByteOffsetBlock()); 801 802 synchronized (dataQueue) { 803 scope = Trace.continueSpan(one.getTraceSpan()); 804 one.setTraceSpan(null); 805 lastAckedSeqno = seqno; 806 ackQueue.removeFirst(); 807 dataQueue.notifyAll(); 808 809 one.releaseBuffer(byteArrayManager); 810 } 811 } catch (Exception e) { 812 if (!responderClosed) { 813 if (e instanceof IOException) { 814 setLastException((IOException)e); 815 } 816 hasError = true; 817 // If no explicit error report was received, mark the primary 818 // node as failed. 
819 tryMarkPrimaryDatanodeFailed(); 820 synchronized (dataQueue) { 821 dataQueue.notifyAll(); 822 } 823 if (restartingNodeIndex.get() == -1) { 824 DFSClient.LOG.warn("DFSOutputStream ResponseProcessor exception " 825 + " for block " + block, e); 826 } 827 responderClosed = true; 828 } 829 } finally { 830 scope.close(); 831 } 832 } 833 } 834 close()835 void close() { 836 responderClosed = true; 837 this.interrupt(); 838 } 839 } 840 841 // If this stream has encountered any errors so far, shutdown 842 // threads and mark stream as closed. Returns true if we should 843 // sleep for a while after returning from this call. 844 // processDatanodeError()845 private boolean processDatanodeError() throws IOException { 846 if (response != null) { 847 DFSClient.LOG.info("Error Recovery for " + block + 848 " waiting for responder to exit. "); 849 return true; 850 } 851 closeStream(); 852 853 // move packets from ack queue to front of the data queue 854 synchronized (dataQueue) { 855 dataQueue.addAll(0, ackQueue); 856 ackQueue.clear(); 857 } 858 859 // Record the new pipeline failure recovery. 860 if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) { 861 lastAckedSeqnoBeforeFailure = lastAckedSeqno; 862 pipelineRecoveryCount = 1; 863 } else { 864 // If we had to recover the pipeline five times in a row for the 865 // same packet, this client likely has corrupt data or corrupting 866 // during transmission. 867 if (++pipelineRecoveryCount > 5) { 868 DFSClient.LOG.warn("Error recovering pipeline for writing " + 869 block + ". Already retried 5 times for the same packet."); 870 lastException.set(new IOException("Failing write. 
Tried pipeline " + 871 "recovery 5 times without success.")); 872 streamerClosed = true; 873 return false; 874 } 875 } 876 boolean doSleep = setupPipelineForAppendOrRecovery(); 877 878 if (!streamerClosed && dfsClient.clientRunning) { 879 if (stage == BlockConstructionStage.PIPELINE_CLOSE) { 880 881 // If we had an error while closing the pipeline, we go through a fast-path 882 // where the BlockReceiver does not run. Instead, the DataNode just finalizes 883 // the block immediately during the 'connect ack' process. So, we want to pull 884 // the end-of-block packet from the dataQueue, since we don't actually have 885 // a true pipeline to send it over. 886 // 887 // We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that 888 // a client waiting on close() will be aware that the flush finished. 889 synchronized (dataQueue) { 890 DFSPacket endOfBlockPacket = dataQueue.remove(); // remove the end of block packet 891 Span span = endOfBlockPacket.getTraceSpan(); 892 if (span != null) { 893 // Close any trace span associated with this Packet 894 TraceScope scope = Trace.continueSpan(span); 895 scope.close(); 896 } 897 assert endOfBlockPacket.isLastPacketInBlock(); 898 assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1; 899 lastAckedSeqno = endOfBlockPacket.getSeqno(); 900 dataQueue.notifyAll(); 901 } 902 endBlock(); 903 } else { 904 initDataStreaming(); 905 } 906 } 907 908 return doSleep; 909 } 910 setHflush()911 private void setHflush() { 912 isHflushed = true; 913 } 914 findNewDatanode(final DatanodeInfo[] original )915 private int findNewDatanode(final DatanodeInfo[] original 916 ) throws IOException { 917 if (nodes.length != original.length + 1) { 918 throw new IOException( 919 new StringBuilder() 920 .append("Failed to replace a bad datanode on the existing pipeline ") 921 .append("due to no more good datanodes being available to try. 
") 922 .append("(Nodes: current=").append(Arrays.asList(nodes)) 923 .append(", original=").append(Arrays.asList(original)).append("). ") 924 .append("The current failed datanode replacement policy is ") 925 .append(dfsClient.dtpReplaceDatanodeOnFailure).append(", and ") 926 .append("a client may configure this via '") 927 .append(DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY) 928 .append("' in its configuration.") 929 .toString()); 930 } 931 for(int i = 0; i < nodes.length; i++) { 932 int j = 0; 933 for(; j < original.length && !nodes[i].equals(original[j]); j++); 934 if (j == original.length) { 935 return i; 936 } 937 } 938 throw new IOException("Failed: new datanode not found: nodes=" 939 + Arrays.asList(nodes) + ", original=" + Arrays.asList(original)); 940 } 941 addDatanode2ExistingPipeline()942 private void addDatanode2ExistingPipeline() throws IOException { 943 if (DataTransferProtocol.LOG.isDebugEnabled()) { 944 DataTransferProtocol.LOG.debug("lastAckedSeqno = " + lastAckedSeqno); 945 } 946 /* 947 * Is data transfer necessary? We have the following cases. 948 * 949 * Case 1: Failure in Pipeline Setup 950 * - Append 951 * + Transfer the stored replica, which may be a RBW or a finalized. 952 * - Create 953 * + If no data, then no transfer is required. 954 * + If there are data written, transfer RBW. This case may happens 955 * when there are streaming failure earlier in this pipeline. 956 * 957 * Case 2: Failure in Streaming 958 * - Append/Create: 959 * + transfer RBW 960 * 961 * Case 3: Failure in Close 962 * - Append/Create: 963 * + no transfer, let NameNode replicates the block. 
964 */ 965 if (!isAppend && lastAckedSeqno < 0 966 && stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) { 967 //no data have been written 968 return; 969 } else if (stage == BlockConstructionStage.PIPELINE_CLOSE 970 || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { 971 //pipeline is closing 972 return; 973 } 974 975 int tried = 0; 976 final DatanodeInfo[] original = nodes; 977 final StorageType[] originalTypes = storageTypes; 978 final String[] originalIDs = storageIDs; 979 IOException caughtException = null; 980 ArrayList<DatanodeInfo> exclude = new ArrayList<DatanodeInfo>(failed); 981 while (tried < 3) { 982 LocatedBlock lb; 983 //get a new datanode 984 lb = dfsClient.namenode.getAdditionalDatanode( 985 src, fileId, block, nodes, storageIDs, 986 exclude.toArray(new DatanodeInfo[exclude.size()]), 987 1, dfsClient.clientName); 988 // a new node was allocated by the namenode. Update nodes. 989 setPipeline(lb); 990 991 //find the new datanode 992 final int d = findNewDatanode(original); 993 //transfer replica. pick a source from the original nodes 994 final DatanodeInfo src = original[tried % original.length]; 995 final DatanodeInfo[] targets = {nodes[d]}; 996 final StorageType[] targetStorageTypes = {storageTypes[d]}; 997 998 try { 999 transfer(src, targets, targetStorageTypes, lb.getBlockToken()); 1000 } catch (IOException ioe) { 1001 DFSClient.LOG.warn("Error transferring data from " + src + " to " + 1002 nodes[d] + ": " + ioe.getMessage()); 1003 caughtException = ioe; 1004 // add the allocated node to the exclude list. 1005 exclude.add(nodes[d]); 1006 setPipeline(original, originalTypes, originalIDs); 1007 tried++; 1008 continue; 1009 } 1010 return; // finished successfully 1011 } 1012 // All retries failed 1013 throw (caughtException != null) ? 
caughtException : 1014 new IOException("Failed to add a node"); 1015 } 1016 transfer(final DatanodeInfo src, final DatanodeInfo[] targets, final StorageType[] targetStorageTypes, final Token<BlockTokenIdentifier> blockToken)1017 private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets, 1018 final StorageType[] targetStorageTypes, 1019 final Token<BlockTokenIdentifier> blockToken) throws IOException { 1020 //transfer replica to the new datanode 1021 Socket sock = null; 1022 DataOutputStream out = null; 1023 DataInputStream in = null; 1024 try { 1025 sock = createSocketForPipeline(src, 2, dfsClient); 1026 final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2); 1027 1028 // transfer timeout multiplier based on the transfer size 1029 // One per 200 packets = 12.8MB. Minimum is 2. 1030 int multi = 2 + (int)(bytesSent/dfsClient.getConf().writePacketSize)/200; 1031 final long readTimeout = dfsClient.getDatanodeReadTimeout(multi); 1032 1033 OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); 1034 InputStream unbufIn = NetUtils.getInputStream(sock, readTimeout); 1035 IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock, 1036 unbufOut, unbufIn, dfsClient, blockToken, src); 1037 unbufOut = saslStreams.out; 1038 unbufIn = saslStreams.in; 1039 out = new DataOutputStream(new BufferedOutputStream(unbufOut, 1040 HdfsConstants.SMALL_BUFFER_SIZE)); 1041 in = new DataInputStream(unbufIn); 1042 1043 //send the TRANSFER_BLOCK request 1044 new Sender(out).transferBlock(block, blockToken, dfsClient.clientName, 1045 targets, targetStorageTypes); 1046 out.flush(); 1047 1048 //ack 1049 BlockOpResponseProto response = 1050 BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in)); 1051 if (SUCCESS != response.getStatus()) { 1052 throw new IOException("Failed to add a datanode"); 1053 } 1054 } finally { 1055 IOUtils.closeStream(in); 1056 IOUtils.closeStream(out); 1057 IOUtils.closeSocket(sock); 1058 } 1059 } 1060 1061 /** 1062 * Open a 
DataOutputStream to a DataNode pipeline so that 1063 * it can be written to. 1064 * This happens when a file is appended or data streaming fails 1065 * It keeps on trying until a pipeline is setup 1066 */ setupPipelineForAppendOrRecovery()1067 private boolean setupPipelineForAppendOrRecovery() throws IOException { 1068 // check number of datanodes 1069 if (nodes == null || nodes.length == 0) { 1070 String msg = "Could not get block locations. " + "Source file \"" 1071 + src + "\" - Aborting..."; 1072 DFSClient.LOG.warn(msg); 1073 setLastException(new IOException(msg)); 1074 streamerClosed = true; 1075 return false; 1076 } 1077 1078 boolean success = false; 1079 long newGS = 0L; 1080 while (!success && !streamerClosed && dfsClient.clientRunning) { 1081 // Sleep before reconnect if a dn is restarting. 1082 // This process will be repeated until the deadline or the datanode 1083 // starts back up. 1084 if (restartingNodeIndex.get() >= 0) { 1085 // 4 seconds or the configured deadline period, whichever is shorter. 1086 // This is the retry interval and recovery will be retried in this 1087 // interval until timeout or success. 1088 long delay = Math.min(dfsClient.getConf().datanodeRestartTimeout, 1089 4000L); 1090 try { 1091 Thread.sleep(delay); 1092 } catch (InterruptedException ie) { 1093 lastException.set(new IOException("Interrupted while waiting for " + 1094 "datanode to restart. " + nodes[restartingNodeIndex.get()])); 1095 streamerClosed = true; 1096 return false; 1097 } 1098 } 1099 boolean isRecovery = hasError; 1100 // remove bad datanode from list of datanodes. 1101 // If errorIndex was not set (i.e. 
appends), then do not remove 1102 // any datanodes 1103 // 1104 if (errorIndex >= 0) { 1105 StringBuilder pipelineMsg = new StringBuilder(); 1106 for (int j = 0; j < nodes.length; j++) { 1107 pipelineMsg.append(nodes[j]); 1108 if (j < nodes.length - 1) { 1109 pipelineMsg.append(", "); 1110 } 1111 } 1112 if (nodes.length <= 1) { 1113 lastException.set(new IOException("All datanodes " + pipelineMsg 1114 + " are bad. Aborting...")); 1115 streamerClosed = true; 1116 return false; 1117 } 1118 DFSClient.LOG.warn("Error Recovery for block " + block + 1119 " in pipeline " + pipelineMsg + 1120 ": bad datanode " + nodes[errorIndex]); 1121 failed.add(nodes[errorIndex]); 1122 1123 DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1]; 1124 arraycopy(nodes, newnodes, errorIndex); 1125 1126 final StorageType[] newStorageTypes = new StorageType[newnodes.length]; 1127 arraycopy(storageTypes, newStorageTypes, errorIndex); 1128 1129 final String[] newStorageIDs = new String[newnodes.length]; 1130 arraycopy(storageIDs, newStorageIDs, errorIndex); 1131 1132 setPipeline(newnodes, newStorageTypes, newStorageIDs); 1133 1134 // Just took care of a node error while waiting for a node restart 1135 if (restartingNodeIndex.get() >= 0) { 1136 // If the error came from a node further away than the restarting 1137 // node, the restart must have been complete. 1138 if (errorIndex > restartingNodeIndex.get()) { 1139 restartingNodeIndex.set(-1); 1140 } else if (errorIndex < restartingNodeIndex.get()) { 1141 // the node index has shifted. 1142 restartingNodeIndex.decrementAndGet(); 1143 } else { 1144 // this shouldn't happen... 1145 assert false; 1146 } 1147 } 1148 1149 if (restartingNodeIndex.get() == -1) { 1150 hasError = false; 1151 } 1152 lastException.set(null); 1153 errorIndex = -1; 1154 } 1155 1156 // Check if replace-datanode policy is satisfied. 
1157 if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(blockReplication, 1158 nodes, isAppend, isHflushed)) { 1159 try { 1160 addDatanode2ExistingPipeline(); 1161 } catch(IOException ioe) { 1162 if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) { 1163 throw ioe; 1164 } 1165 DFSClient.LOG.warn("Failed to replace datanode." 1166 + " Continue with the remaining datanodes since " 1167 + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY 1168 + " is set to true.", ioe); 1169 } 1170 } 1171 1172 // get a new generation stamp and an access token 1173 LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName); 1174 newGS = lb.getBlock().getGenerationStamp(); 1175 accessToken = lb.getBlockToken(); 1176 1177 // set up the pipeline again with the remaining nodes 1178 if (failPacket) { // for testing 1179 success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery); 1180 failPacket = false; 1181 try { 1182 // Give DNs time to send in bad reports. In real situations, 1183 // good reports should follow bad ones, if client committed 1184 // with those nodes. 1185 Thread.sleep(2000); 1186 } catch (InterruptedException ie) {} 1187 } else { 1188 success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery); 1189 } 1190 1191 if (restartingNodeIndex.get() >= 0) { 1192 assert hasError == true; 1193 // check errorIndex set above 1194 if (errorIndex == restartingNodeIndex.get()) { 1195 // ignore, if came from the restarting node 1196 errorIndex = -1; 1197 } 1198 // still within the deadline 1199 if (Time.monotonicNow() < restartDeadline) { 1200 continue; // with in the deadline 1201 } 1202 // expired. declare the restarting node dead 1203 restartDeadline = 0; 1204 int expiredNodeIndex = restartingNodeIndex.get(); 1205 restartingNodeIndex.set(-1); 1206 DFSClient.LOG.warn("Datanode did not restart in time: " + 1207 nodes[expiredNodeIndex]); 1208 // Mark the restarting node as failed. 
If there is any other failed 1209 // node during the last pipeline construction attempt, it will not be 1210 // overwritten/dropped. In this case, the restarting node will get 1211 // excluded in the following attempt, if it still does not come up. 1212 if (errorIndex == -1) { 1213 errorIndex = expiredNodeIndex; 1214 } 1215 // From this point on, normal pipeline recovery applies. 1216 } 1217 } // while 1218 1219 if (success) { 1220 // update pipeline at the namenode 1221 ExtendedBlock newBlock = new ExtendedBlock( 1222 block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS); 1223 dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock, 1224 nodes, storageIDs); 1225 // update client side generation stamp 1226 block = newBlock; 1227 } 1228 return false; // do not sleep, continue processing 1229 } 1230 1231 /** 1232 * Open a DataOutputStream to a DataNode so that it can be written to. 1233 * This happens when a file is created and each time a new block is allocated. 1234 * Must get block ID and the IDs of the destinations from the namenode. 1235 * Returns the list of target datanodes. 1236 */ nextBlockOutputStream()1237 private LocatedBlock nextBlockOutputStream() throws IOException { 1238 LocatedBlock lb = null; 1239 DatanodeInfo[] nodes = null; 1240 StorageType[] storageTypes = null; 1241 int count = dfsClient.getConf().nBlockWriteRetry; 1242 boolean success = false; 1243 ExtendedBlock oldBlock = block; 1244 do { 1245 hasError = false; 1246 lastException.set(null); 1247 errorIndex = -1; 1248 success = false; 1249 1250 DatanodeInfo[] excluded = 1251 excludedNodes.getAllPresent(excludedNodes.asMap().keySet()) 1252 .keySet() 1253 .toArray(new DatanodeInfo[0]); 1254 block = oldBlock; 1255 lb = locateFollowingBlock(excluded.length > 0 ? 
excluded : null); 1256 block = lb.getBlock(); 1257 block.setNumBytes(0); 1258 bytesSent = 0; 1259 accessToken = lb.getBlockToken(); 1260 nodes = lb.getLocations(); 1261 storageTypes = lb.getStorageTypes(); 1262 1263 // 1264 // Connect to first DataNode in the list. 1265 // 1266 success = createBlockOutputStream(nodes, storageTypes, 0L, false); 1267 1268 if (!success) { 1269 DFSClient.LOG.info("Abandoning " + block); 1270 dfsClient.namenode.abandonBlock(block, fileId, src, 1271 dfsClient.clientName); 1272 block = null; 1273 DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]); 1274 excludedNodes.put(nodes[errorIndex], nodes[errorIndex]); 1275 } 1276 } while (!success && --count >= 0); 1277 1278 if (!success) { 1279 throw new IOException("Unable to create new block."); 1280 } 1281 return lb; 1282 } 1283 1284 // connects to the first datanode in the pipeline 1285 // Returns true if success, otherwise return failure. 1286 // createBlockOutputStream(DatanodeInfo[] nodes, StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag)1287 private boolean createBlockOutputStream(DatanodeInfo[] nodes, 1288 StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) { 1289 if (nodes.length == 0) { 1290 DFSClient.LOG.info("nodes are empty for write pipeline of block " 1291 + block); 1292 return false; 1293 } 1294 Status pipelineStatus = SUCCESS; 1295 String firstBadLink = ""; 1296 boolean checkRestart = false; 1297 if (DFSClient.LOG.isDebugEnabled()) { 1298 for (int i = 0; i < nodes.length; i++) { 1299 DFSClient.LOG.debug("pipeline = " + nodes[i]); 1300 } 1301 } 1302 1303 // persist blocks on namenode on next flush 1304 persistBlocks.set(true); 1305 1306 int refetchEncryptionKey = 1; 1307 while (true) { 1308 boolean result = false; 1309 DataOutputStream out = null; 1310 try { 1311 assert null == s : "Previous socket unclosed"; 1312 assert null == blockReplyStream : "Previous blockReplyStream unclosed"; 1313 s = createSocketForPipeline(nodes[0], nodes.length, 
dfsClient); 1314 long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length); 1315 1316 OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout); 1317 InputStream unbufIn = NetUtils.getInputStream(s); 1318 IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s, 1319 unbufOut, unbufIn, dfsClient, accessToken, nodes[0]); 1320 unbufOut = saslStreams.out; 1321 unbufIn = saslStreams.in; 1322 out = new DataOutputStream(new BufferedOutputStream(unbufOut, 1323 HdfsConstants.SMALL_BUFFER_SIZE)); 1324 blockReplyStream = new DataInputStream(unbufIn); 1325 1326 // 1327 // Xmit header info to datanode 1328 // 1329 1330 BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage(): stage; 1331 1332 // We cannot change the block length in 'block' as it counts the number 1333 // of bytes ack'ed. 1334 ExtendedBlock blockCopy = new ExtendedBlock(block); 1335 blockCopy.setNumBytes(blockSize); 1336 1337 boolean[] targetPinnings = getPinnings(nodes, true); 1338 // send the request 1339 new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken, 1340 dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, 1341 nodes.length, block.getNumBytes(), bytesSent, newGS, 1342 checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile, 1343 (targetPinnings == null ? false : targetPinnings[0]), targetPinnings); 1344 1345 // receive ack for connect 1346 BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( 1347 PBHelper.vintPrefixed(blockReplyStream)); 1348 pipelineStatus = resp.getStatus(); 1349 firstBadLink = resp.getFirstBadLink(); 1350 1351 // Got an restart OOB ack. 1352 // If a node is already restarting, this status is not likely from 1353 // the same node. If it is from a different node, it is not 1354 // from the local datanode. Thus it is safe to treat this as a 1355 // regular node error. 
1356 if (PipelineAck.isRestartOOBStatus(pipelineStatus) && 1357 restartingNodeIndex.get() == -1) { 1358 checkRestart = true; 1359 throw new IOException("A datanode is restarting."); 1360 } 1361 1362 String logInfo = "ack with firstBadLink as " + firstBadLink; 1363 DataTransferProtoUtil.checkBlockOpStatus(resp, logInfo); 1364 1365 assert null == blockStream : "Previous blockStream unclosed"; 1366 blockStream = out; 1367 result = true; // success 1368 restartingNodeIndex.set(-1); 1369 hasError = false; 1370 } catch (IOException ie) { 1371 if (restartingNodeIndex.get() == -1) { 1372 DFSClient.LOG.info("Exception in createBlockOutputStream", ie); 1373 } 1374 if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { 1375 DFSClient.LOG.info("Will fetch a new encryption key and retry, " 1376 + "encryption key was invalid when connecting to " 1377 + nodes[0] + " : " + ie); 1378 // The encryption key used is invalid. 1379 refetchEncryptionKey--; 1380 dfsClient.clearDataEncryptionKey(); 1381 // Don't close the socket/exclude this node just yet. Try again with 1382 // a new encryption key. 1383 continue; 1384 } 1385 1386 // find the datanode that matches 1387 if (firstBadLink.length() != 0) { 1388 for (int i = 0; i < nodes.length; i++) { 1389 // NB: Unconditionally using the xfer addr w/o hostname 1390 if (firstBadLink.equals(nodes[i].getXferAddr())) { 1391 errorIndex = i; 1392 break; 1393 } 1394 } 1395 } else { 1396 assert checkRestart == false; 1397 errorIndex = 0; 1398 } 1399 // Check whether there is a restart worth waiting for. 
1400 if (checkRestart && shouldWaitForRestart(errorIndex)) { 1401 restartDeadline = dfsClient.getConf().datanodeRestartTimeout + 1402 Time.monotonicNow(); 1403 restartingNodeIndex.set(errorIndex); 1404 errorIndex = -1; 1405 DFSClient.LOG.info("Waiting for the datanode to be restarted: " + 1406 nodes[restartingNodeIndex.get()]); 1407 } 1408 hasError = true; 1409 setLastException(ie); 1410 result = false; // error 1411 } finally { 1412 if (!result) { 1413 IOUtils.closeSocket(s); 1414 s = null; 1415 IOUtils.closeStream(out); 1416 out = null; 1417 IOUtils.closeStream(blockReplyStream); 1418 blockReplyStream = null; 1419 } 1420 } 1421 return result; 1422 } 1423 } 1424 getPinnings(DatanodeInfo[] nodes, boolean shouldLog)1425 private boolean[] getPinnings(DatanodeInfo[] nodes, boolean shouldLog) { 1426 if (favoredNodes == null) { 1427 return null; 1428 } else { 1429 boolean[] pinnings = new boolean[nodes.length]; 1430 HashSet<String> favoredSet = 1431 new HashSet<String>(Arrays.asList(favoredNodes)); 1432 for (int i = 0; i < nodes.length; i++) { 1433 pinnings[i] = favoredSet.remove(nodes[i].getXferAddrWithHostname()); 1434 if (DFSClient.LOG.isDebugEnabled()) { 1435 DFSClient.LOG.debug(nodes[i].getXferAddrWithHostname() + 1436 " was chosen by name node (favored=" + pinnings[i] + 1437 ")."); 1438 } 1439 } 1440 if (shouldLog && !favoredSet.isEmpty()) { 1441 // There is one or more favored nodes that were not allocated. 
1442 DFSClient.LOG.warn( 1443 "These favored nodes were specified but not chosen: " + 1444 favoredSet + 1445 " Specified favored nodes: " + Arrays.toString(favoredNodes)); 1446 1447 } 1448 return pinnings; 1449 } 1450 } 1451 locateFollowingBlock(DatanodeInfo[] excludedNodes)1452 private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) throws IOException { 1453 int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry; 1454 long sleeptime = 400; 1455 while (true) { 1456 long localstart = Time.monotonicNow(); 1457 while (true) { 1458 try { 1459 return dfsClient.namenode.addBlock(src, dfsClient.clientName, 1460 block, excludedNodes, fileId, favoredNodes); 1461 } catch (RemoteException e) { 1462 IOException ue = 1463 e.unwrapRemoteException(FileNotFoundException.class, 1464 AccessControlException.class, 1465 NSQuotaExceededException.class, 1466 DSQuotaExceededException.class, 1467 UnresolvedPathException.class); 1468 if (ue != e) { 1469 throw ue; // no need to retry these exceptions 1470 } 1471 1472 1473 if (NotReplicatedYetException.class.getName(). 
1474 equals(e.getClassName())) { 1475 if (retries == 0) { 1476 throw e; 1477 } else { 1478 --retries; 1479 DFSClient.LOG.info("Exception while adding a block", e); 1480 long elapsed = Time.monotonicNow() - localstart; 1481 if (elapsed > 5000) { 1482 DFSClient.LOG.info("Waiting for replication for " 1483 + (elapsed / 1000) + " seconds"); 1484 } 1485 try { 1486 DFSClient.LOG.warn("NotReplicatedYetException sleeping " + src 1487 + " retries left " + retries); 1488 Thread.sleep(sleeptime); 1489 sleeptime *= 2; 1490 } catch (InterruptedException ie) { 1491 DFSClient.LOG.warn("Caught exception ", ie); 1492 } 1493 } 1494 } else { 1495 throw e; 1496 } 1497 1498 } 1499 } 1500 } 1501 } 1502 getBlock()1503 ExtendedBlock getBlock() { 1504 return block; 1505 } 1506 getNodes()1507 DatanodeInfo[] getNodes() { 1508 return nodes; 1509 } 1510 getBlockToken()1511 Token<BlockTokenIdentifier> getBlockToken() { 1512 return accessToken; 1513 } 1514 setLastException(IOException e)1515 private void setLastException(IOException e) { 1516 lastException.compareAndSet(null, e); 1517 } 1518 } 1519 1520 /** 1521 * Create a socket for a write pipeline 1522 * @param first the first datanode 1523 * @param length the pipeline length 1524 * @param client client 1525 * @return the socket connected to the first datanode 1526 */ createSocketForPipeline(final DatanodeInfo first, final int length, final DFSClient client)1527 static Socket createSocketForPipeline(final DatanodeInfo first, 1528 final int length, final DFSClient client) throws IOException { 1529 final String dnAddr = first.getXferAddr( 1530 client.getConf().connectToDnViaHostname); 1531 if (DFSClient.LOG.isDebugEnabled()) { 1532 DFSClient.LOG.debug("Connecting to datanode " + dnAddr); 1533 } 1534 final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr); 1535 final Socket sock = client.socketFactory.createSocket(); 1536 final int timeout = client.getDatanodeReadTimeout(length); 1537 NetUtils.connect(sock, isa, 
client.getRandomLocalInterfaceAddr(), client.getConf().socketTimeout); 1538 sock.setSoTimeout(timeout); 1539 sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); 1540 if(DFSClient.LOG.isDebugEnabled()) { 1541 DFSClient.LOG.debug("Send buf size " + sock.getSendBufferSize()); 1542 } 1543 return sock; 1544 } 1545 1546 @Override checkClosed()1547 protected void checkClosed() throws IOException { 1548 if (isClosed()) { 1549 IOException e = lastException.get(); 1550 throw e != null ? e : new ClosedChannelException(); 1551 } 1552 } 1553 1554 // 1555 // returns the list of targets, if any, that is being currently used. 1556 // 1557 @VisibleForTesting getPipeline()1558 public synchronized DatanodeInfo[] getPipeline() { 1559 if (streamer == null) { 1560 return null; 1561 } 1562 DatanodeInfo[] currentNodes = streamer.getNodes(); 1563 if (currentNodes == null) { 1564 return null; 1565 } 1566 DatanodeInfo[] value = new DatanodeInfo[currentNodes.length]; 1567 for (int i = 0; i < currentNodes.length; i++) { 1568 value[i] = currentNodes[i]; 1569 } 1570 return value; 1571 } 1572 1573 /** 1574 * @return the object for computing checksum. 1575 * The type is NULL if checksum is not computed. 
1576 */ getChecksum4Compute(DataChecksum checksum, HdfsFileStatus stat)1577 private static DataChecksum getChecksum4Compute(DataChecksum checksum, 1578 HdfsFileStatus stat) { 1579 if (isLazyPersist(stat) && stat.getReplication() == 1) { 1580 // do not compute checksum for writing to single replica to memory 1581 return DataChecksum.newDataChecksum(Type.NULL, 1582 checksum.getBytesPerChecksum()); 1583 } 1584 return checksum; 1585 } 1586 DFSOutputStream(DFSClient dfsClient, String src, Progressable progress, HdfsFileStatus stat, DataChecksum checksum)1587 private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress, 1588 HdfsFileStatus stat, DataChecksum checksum) throws IOException { 1589 super(getChecksum4Compute(checksum, stat)); 1590 this.dfsClient = dfsClient; 1591 this.src = src; 1592 this.fileId = stat.getFileId(); 1593 this.blockSize = stat.getBlockSize(); 1594 this.blockReplication = stat.getReplication(); 1595 this.fileEncryptionInfo = stat.getFileEncryptionInfo(); 1596 this.progress = progress; 1597 this.cachingStrategy = new AtomicReference<CachingStrategy>( 1598 dfsClient.getDefaultWriteCachingStrategy()); 1599 if ((progress != null) && DFSClient.LOG.isDebugEnabled()) { 1600 DFSClient.LOG.debug( 1601 "Set non-null progress callback on DFSOutputStream " + src); 1602 } 1603 1604 this.bytesPerChecksum = checksum.getBytesPerChecksum(); 1605 if (bytesPerChecksum <= 0) { 1606 throw new HadoopIllegalArgumentException( 1607 "Invalid value: bytesPerChecksum = " + bytesPerChecksum + " <= 0"); 1608 } 1609 if (blockSize % bytesPerChecksum != 0) { 1610 throw new HadoopIllegalArgumentException("Invalid values: " 1611 + DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum 1612 + ") must divide block size (=" + blockSize + ")."); 1613 } 1614 this.checksum4WriteBlock = checksum; 1615 1616 this.dfsclientSlowLogThresholdMs = 1617 dfsClient.getConf().dfsclientSlowIoWarningThresholdMs; 1618 this.byteArrayManager = 
dfsClient.getClientContext().getByteArrayManager(); 1619 } 1620 1621 /** Construct a new output stream for creating a file. */ DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, EnumSet<CreateFlag> flag, Progressable progress, DataChecksum checksum, String[] favoredNodes)1622 private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, 1623 EnumSet<CreateFlag> flag, Progressable progress, 1624 DataChecksum checksum, String[] favoredNodes) throws IOException { 1625 this(dfsClient, src, progress, stat, checksum); 1626 this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK); 1627 1628 computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum); 1629 1630 streamer = new DataStreamer(stat, null); 1631 if (favoredNodes != null && favoredNodes.length != 0) { 1632 streamer.setFavoredNodes(favoredNodes); 1633 } 1634 } 1635 newStreamForCreate(DFSClient dfsClient, String src, FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize, DataChecksum checksum, String[] favoredNodes)1636 static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, 1637 FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent, 1638 short replication, long blockSize, Progressable progress, int buffersize, 1639 DataChecksum checksum, String[] favoredNodes) throws IOException { 1640 TraceScope scope = 1641 dfsClient.getPathTraceScope("newStreamForCreate", src); 1642 try { 1643 HdfsFileStatus stat = null; 1644 1645 // Retry the create if we get a RetryStartFileException up to a maximum 1646 // number of times 1647 boolean shouldRetry = true; 1648 int retryCount = CREATE_RETRY_COUNT; 1649 while (shouldRetry) { 1650 shouldRetry = false; 1651 try { 1652 stat = dfsClient.namenode.create(src, masked, dfsClient.clientName, 1653 new EnumSetWritable<CreateFlag>(flag), createParent, replication, 1654 blockSize, SUPPORTED_CRYPTO_VERSIONS); 1655 
break; 1656 } catch (RemoteException re) { 1657 IOException e = re.unwrapRemoteException( 1658 AccessControlException.class, 1659 DSQuotaExceededException.class, 1660 FileAlreadyExistsException.class, 1661 FileNotFoundException.class, 1662 ParentNotDirectoryException.class, 1663 NSQuotaExceededException.class, 1664 RetryStartFileException.class, 1665 SafeModeException.class, 1666 UnresolvedPathException.class, 1667 SnapshotAccessControlException.class, 1668 UnknownCryptoProtocolVersionException.class); 1669 if (e instanceof RetryStartFileException) { 1670 if (retryCount > 0) { 1671 shouldRetry = true; 1672 retryCount--; 1673 } else { 1674 throw new IOException("Too many retries because of encryption" + 1675 " zone operations", e); 1676 } 1677 } else { 1678 throw e; 1679 } 1680 } 1681 } 1682 Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!"); 1683 final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat, 1684 flag, progress, checksum, favoredNodes); 1685 out.start(); 1686 return out; 1687 } finally { 1688 scope.close(); 1689 } 1690 } 1691 1692 /** Construct a new output stream for append. */ DFSOutputStream(DFSClient dfsClient, String src, EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum)1693 private DFSOutputStream(DFSClient dfsClient, String src, 1694 EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock, 1695 HdfsFileStatus stat, DataChecksum checksum) throws IOException { 1696 this(dfsClient, src, progress, stat, checksum); 1697 initialFileSize = stat.getLen(); // length of file when opened 1698 this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK); 1699 1700 boolean toNewBlock = flags.contains(CreateFlag.NEW_BLOCK); 1701 1702 // The last partial block of the file has to be filled. 
1703 if (!toNewBlock && lastBlock != null) { 1704 // indicate that we are appending to an existing block 1705 bytesCurBlock = lastBlock.getBlockSize(); 1706 streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum); 1707 } else { 1708 computePacketChunkSize(dfsClient.getConf().writePacketSize, 1709 bytesPerChecksum); 1710 streamer = new DataStreamer(stat, 1711 lastBlock != null ? lastBlock.getBlock() : null); 1712 } 1713 this.fileEncryptionInfo = stat.getFileEncryptionInfo(); 1714 } 1715 newStreamForAppend(DFSClient dfsClient, String src, EnumSet<CreateFlag> flags, int bufferSize, Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes)1716 static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src, 1717 EnumSet<CreateFlag> flags, int bufferSize, Progressable progress, 1718 LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum, 1719 String[] favoredNodes) throws IOException { 1720 TraceScope scope = 1721 dfsClient.getPathTraceScope("newStreamForAppend", src); 1722 try { 1723 final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags, 1724 progress, lastBlock, stat, checksum); 1725 if (favoredNodes != null && favoredNodes.length != 0) { 1726 out.streamer.setFavoredNodes(favoredNodes); 1727 } 1728 out.start(); 1729 return out; 1730 } finally { 1731 scope.close(); 1732 } 1733 } 1734 isLazyPersist(HdfsFileStatus stat)1735 private static boolean isLazyPersist(HdfsFileStatus stat) { 1736 final BlockStoragePolicy p = blockStoragePolicySuite.getPolicy( 1737 HdfsConstants.MEMORY_STORAGE_POLICY_NAME); 1738 return p != null && stat.getStoragePolicy() == p.getId(); 1739 } 1740 computePacketChunkSize(int psize, int csize)1741 private void computePacketChunkSize(int psize, int csize) { 1742 final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN; 1743 final int chunkSize = csize + getChecksumSize(); 1744 chunksPerPacket = Math.max(bodySize/chunkSize, 1); 1745 packetSize = 
chunkSize*chunksPerPacket; 1746 if (DFSClient.LOG.isDebugEnabled()) { 1747 DFSClient.LOG.debug("computePacketChunkSize: src=" + src + 1748 ", chunkSize=" + chunkSize + 1749 ", chunksPerPacket=" + chunksPerPacket + 1750 ", packetSize=" + packetSize); 1751 } 1752 } 1753 queueCurrentPacket()1754 private void queueCurrentPacket() { 1755 synchronized (dataQueue) { 1756 if (currentPacket == null) return; 1757 currentPacket.addTraceParent(Trace.currentSpan()); 1758 dataQueue.addLast(currentPacket); 1759 lastQueuedSeqno = currentPacket.getSeqno(); 1760 if (DFSClient.LOG.isDebugEnabled()) { 1761 DFSClient.LOG.debug("Queued packet " + currentPacket.getSeqno()); 1762 } 1763 currentPacket = null; 1764 dataQueue.notifyAll(); 1765 } 1766 } 1767 waitAndQueueCurrentPacket()1768 private void waitAndQueueCurrentPacket() throws IOException { 1769 synchronized (dataQueue) { 1770 try { 1771 // If queue is full, then wait till we have enough space 1772 boolean firstWait = true; 1773 try { 1774 while (!isClosed() && dataQueue.size() + ackQueue.size() > 1775 dfsClient.getConf().writeMaxPackets) { 1776 if (firstWait) { 1777 Span span = Trace.currentSpan(); 1778 if (span != null) { 1779 span.addTimelineAnnotation("dataQueue.wait"); 1780 } 1781 firstWait = false; 1782 } 1783 try { 1784 dataQueue.wait(); 1785 } catch (InterruptedException e) { 1786 // If we get interrupted while waiting to queue data, we still need to get rid 1787 // of the current packet. This is because we have an invariant that if 1788 // currentPacket gets full, it will get queued before the next writeChunk. 1789 // 1790 // Rather than wait around for space in the queue, we should instead try to 1791 // return to the caller as soon as possible, even though we slightly overrun 1792 // the MAX_PACKETS length. 
1793 Thread.currentThread().interrupt(); 1794 break; 1795 } 1796 } 1797 } finally { 1798 Span span = Trace.currentSpan(); 1799 if ((span != null) && (!firstWait)) { 1800 span.addTimelineAnnotation("end.wait"); 1801 } 1802 } 1803 checkClosed(); 1804 queueCurrentPacket(); 1805 } catch (ClosedChannelException e) { 1806 } 1807 } 1808 } 1809 1810 // @see FSOutputSummer#writeChunk() 1811 @Override writeChunk(byte[] b, int offset, int len, byte[] checksum, int ckoff, int cklen)1812 protected synchronized void writeChunk(byte[] b, int offset, int len, 1813 byte[] checksum, int ckoff, int cklen) throws IOException { 1814 TraceScope scope = 1815 dfsClient.getPathTraceScope("DFSOutputStream#writeChunk", src); 1816 try { 1817 writeChunkImpl(b, offset, len, checksum, ckoff, cklen); 1818 } finally { 1819 scope.close(); 1820 } 1821 } 1822 writeChunkImpl(byte[] b, int offset, int len, byte[] checksum, int ckoff, int cklen)1823 private synchronized void writeChunkImpl(byte[] b, int offset, int len, 1824 byte[] checksum, int ckoff, int cklen) throws IOException { 1825 dfsClient.checkOpen(); 1826 checkClosed(); 1827 1828 if (len > bytesPerChecksum) { 1829 throw new IOException("writeChunk() buffer size is " + len + 1830 " is larger than supported bytesPerChecksum " + 1831 bytesPerChecksum); 1832 } 1833 if (cklen != 0 && cklen != getChecksumSize()) { 1834 throw new IOException("writeChunk() checksum size is supposed to be " + 1835 getChecksumSize() + " but found to be " + cklen); 1836 } 1837 1838 if (currentPacket == null) { 1839 currentPacket = createPacket(packetSize, chunksPerPacket, 1840 bytesCurBlock, currentSeqno++, false); 1841 if (DFSClient.LOG.isDebugEnabled()) { 1842 DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 1843 currentPacket.getSeqno() + 1844 ", src=" + src + 1845 ", packetSize=" + packetSize + 1846 ", chunksPerPacket=" + chunksPerPacket + 1847 ", bytesCurBlock=" + bytesCurBlock); 1848 } 1849 } 1850 1851 currentPacket.writeChecksum(checksum, 
ckoff, cklen); 1852 currentPacket.writeData(b, offset, len); 1853 currentPacket.incNumChunks(); 1854 bytesCurBlock += len; 1855 1856 // If packet is full, enqueue it for transmission 1857 // 1858 if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() || 1859 bytesCurBlock == blockSize) { 1860 if (DFSClient.LOG.isDebugEnabled()) { 1861 DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" + 1862 currentPacket.getSeqno() + 1863 ", src=" + src + 1864 ", bytesCurBlock=" + bytesCurBlock + 1865 ", blockSize=" + blockSize + 1866 ", appendChunk=" + appendChunk); 1867 } 1868 waitAndQueueCurrentPacket(); 1869 1870 // If the reopened file did not end at chunk boundary and the above 1871 // write filled up its partial chunk. Tell the summer to generate full 1872 // crc chunks from now on. 1873 if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) { 1874 appendChunk = false; 1875 resetChecksumBufSize(); 1876 } 1877 1878 if (!appendChunk) { 1879 int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize); 1880 computePacketChunkSize(psize, bytesPerChecksum); 1881 } 1882 // 1883 // if encountering a block boundary, send an empty packet to 1884 // indicate the end of block and reset bytesCurBlock. 1885 // 1886 if (bytesCurBlock == blockSize) { 1887 currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true); 1888 currentPacket.setSyncBlock(shouldSyncBlock); 1889 waitAndQueueCurrentPacket(); 1890 bytesCurBlock = 0; 1891 lastFlushOffset = 0; 1892 } 1893 } 1894 } 1895 1896 @Deprecated sync()1897 public void sync() throws IOException { 1898 hflush(); 1899 } 1900 1901 /** 1902 * Flushes out to all replicas of the block. The data is in the buffers 1903 * of the DNs but not necessarily in the DN's OS buffers. 1904 * 1905 * It is a synchronous operation. When it returns, 1906 * it guarantees that flushed data become visible to new readers. 
1907 * It is not guaranteed that data has been flushed to 1908 * persistent store on the datanode. 1909 * Block allocations are persisted on namenode. 1910 */ 1911 @Override hflush()1912 public void hflush() throws IOException { 1913 TraceScope scope = 1914 dfsClient.getPathTraceScope("hflush", src); 1915 try { 1916 flushOrSync(false, EnumSet.noneOf(SyncFlag.class)); 1917 } finally { 1918 scope.close(); 1919 } 1920 } 1921 1922 @Override hsync()1923 public void hsync() throws IOException { 1924 TraceScope scope = 1925 dfsClient.getPathTraceScope("hsync", src); 1926 try { 1927 flushOrSync(true, EnumSet.noneOf(SyncFlag.class)); 1928 } finally { 1929 scope.close(); 1930 } 1931 } 1932 1933 /** 1934 * The expected semantics is all data have flushed out to all replicas 1935 * and all replicas have done posix fsync equivalent - ie the OS has 1936 * flushed it to the disk device (but the disk may have it in its cache). 1937 * 1938 * Note that only the current block is flushed to the disk device. 1939 * To guarantee durable sync across block boundaries the stream should 1940 * be created with {@link CreateFlag#SYNC_BLOCK}. 1941 * 1942 * @param syncFlags 1943 * Indicate the semantic of the sync. Currently used to specify 1944 * whether or not to update the block length in NameNode. 1945 */ hsync(EnumSet<SyncFlag> syncFlags)1946 public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException { 1947 TraceScope scope = 1948 dfsClient.getPathTraceScope("hsync", src); 1949 try { 1950 flushOrSync(true, syncFlags); 1951 } finally { 1952 scope.close(); 1953 } 1954 } 1955 1956 /** 1957 * Flush/Sync buffered data to DataNodes. 1958 * 1959 * @param isSync 1960 * Whether or not to require all replicas to flush data to the disk 1961 * device 1962 * @param syncFlags 1963 * Indicate extra detailed semantic of the flush/sync. 
Currently 1964 * mainly used to specify whether or not to update the file length in 1965 * the NameNode 1966 * @throws IOException 1967 */ flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)1968 private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags) 1969 throws IOException { 1970 dfsClient.checkOpen(); 1971 checkClosed(); 1972 try { 1973 long toWaitFor; 1974 long lastBlockLength = -1L; 1975 boolean updateLength = syncFlags.contains(SyncFlag.UPDATE_LENGTH); 1976 boolean endBlock = syncFlags.contains(SyncFlag.END_BLOCK); 1977 synchronized (this) { 1978 // flush checksum buffer, but keep checksum buffer intact if we do not 1979 // need to end the current block 1980 int numKept = flushBuffer(!endBlock, true); 1981 // bytesCurBlock potentially incremented if there was buffered data 1982 1983 if (DFSClient.LOG.isDebugEnabled()) { 1984 DFSClient.LOG.debug("DFSClient flush():" 1985 + " bytesCurBlock=" + bytesCurBlock 1986 + " lastFlushOffset=" + lastFlushOffset 1987 + " createNewBlock=" + endBlock); 1988 } 1989 // Flush only if we haven't already flushed till this offset. 1990 if (lastFlushOffset != bytesCurBlock) { 1991 assert bytesCurBlock > lastFlushOffset; 1992 // record the valid offset of this flush 1993 lastFlushOffset = bytesCurBlock; 1994 if (isSync && currentPacket == null && !endBlock) { 1995 // Nothing to send right now, 1996 // but sync was requested. 1997 // Send an empty packet if we do not end the block right now 1998 currentPacket = createPacket(packetSize, chunksPerPacket, 1999 bytesCurBlock, currentSeqno++, false); 2000 } 2001 } else { 2002 if (isSync && bytesCurBlock > 0 && !endBlock) { 2003 // Nothing to send right now, 2004 // and the block was partially written, 2005 // and sync was requested. 
2006 // So send an empty sync packet if we do not end the block right now 2007 currentPacket = createPacket(packetSize, chunksPerPacket, 2008 bytesCurBlock, currentSeqno++, false); 2009 } else if (currentPacket != null) { 2010 // just discard the current packet since it is already been sent. 2011 currentPacket.releaseBuffer(byteArrayManager); 2012 currentPacket = null; 2013 } 2014 } 2015 if (currentPacket != null) { 2016 currentPacket.setSyncBlock(isSync); 2017 waitAndQueueCurrentPacket(); 2018 } 2019 if (endBlock && bytesCurBlock > 0) { 2020 // Need to end the current block, thus send an empty packet to 2021 // indicate this is the end of the block and reset bytesCurBlock 2022 currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true); 2023 currentPacket.setSyncBlock(shouldSyncBlock || isSync); 2024 waitAndQueueCurrentPacket(); 2025 bytesCurBlock = 0; 2026 lastFlushOffset = 0; 2027 } else { 2028 // Restore state of stream. Record the last flush offset 2029 // of the last full chunk that was flushed. 2030 bytesCurBlock -= numKept; 2031 } 2032 2033 toWaitFor = lastQueuedSeqno; 2034 } // end synchronized 2035 2036 waitForAckedSeqno(toWaitFor); 2037 2038 // update the block length first time irrespective of flag 2039 if (updateLength || persistBlocks.get()) { 2040 synchronized (this) { 2041 if (streamer != null && streamer.block != null) { 2042 lastBlockLength = streamer.block.getNumBytes(); 2043 } 2044 } 2045 } 2046 // If 1) any new blocks were allocated since the last flush, or 2) to 2047 // update length in NN is required, then persist block locations on 2048 // namenode. 
2049 if (persistBlocks.getAndSet(false) || updateLength) { 2050 try { 2051 dfsClient.namenode.fsync(src, fileId, dfsClient.clientName, 2052 lastBlockLength); 2053 } catch (IOException ioe) { 2054 DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe); 2055 // If we got an error here, it might be because some other thread called 2056 // close before our hflush completed. In that case, we should throw an 2057 // exception that the stream is closed. 2058 checkClosed(); 2059 // If we aren't closed but failed to sync, we should expose that to the 2060 // caller. 2061 throw ioe; 2062 } 2063 } 2064 2065 synchronized(this) { 2066 if (streamer != null) { 2067 streamer.setHflush(); 2068 } 2069 } 2070 } catch (InterruptedIOException interrupt) { 2071 // This kind of error doesn't mean that the stream itself is broken - just the 2072 // flushing thread got interrupted. So, we shouldn't close down the writer, 2073 // but instead just propagate the error 2074 throw interrupt; 2075 } catch (IOException e) { 2076 DFSClient.LOG.warn("Error while syncing", e); 2077 synchronized (this) { 2078 if (!isClosed()) { 2079 lastException.set(new IOException("IOException flush: " + e)); 2080 closeThreads(true); 2081 } 2082 } 2083 throw e; 2084 } 2085 } 2086 2087 /** 2088 * @deprecated use {@link HdfsDataOutputStream#getCurrentBlockReplication()}. 2089 */ 2090 @Deprecated getNumCurrentReplicas()2091 public synchronized int getNumCurrentReplicas() throws IOException { 2092 return getCurrentBlockReplication(); 2093 } 2094 2095 /** 2096 * Note that this is not a public API; 2097 * use {@link HdfsDataOutputStream#getCurrentBlockReplication()} instead. 
2098 * 2099 * @return the number of valid replicas of the current block 2100 */ getCurrentBlockReplication()2101 public synchronized int getCurrentBlockReplication() throws IOException { 2102 dfsClient.checkOpen(); 2103 checkClosed(); 2104 if (streamer == null) { 2105 return blockReplication; // no pipeline, return repl factor of file 2106 } 2107 DatanodeInfo[] currentNodes = streamer.getNodes(); 2108 if (currentNodes == null) { 2109 return blockReplication; // no pipeline, return repl factor of file 2110 } 2111 return currentNodes.length; 2112 } 2113 2114 /** 2115 * Waits till all existing data is flushed and confirmations 2116 * received from datanodes. 2117 */ flushInternal()2118 private void flushInternal() throws IOException { 2119 long toWaitFor; 2120 synchronized (this) { 2121 dfsClient.checkOpen(); 2122 checkClosed(); 2123 // 2124 // If there is data in the current buffer, send it across 2125 // 2126 queueCurrentPacket(); 2127 toWaitFor = lastQueuedSeqno; 2128 } 2129 2130 waitForAckedSeqno(toWaitFor); 2131 } 2132 waitForAckedSeqno(long seqno)2133 private void waitForAckedSeqno(long seqno) throws IOException { 2134 TraceScope scope = Trace.startSpan("waitForAckedSeqno", Sampler.NEVER); 2135 try { 2136 if (DFSClient.LOG.isDebugEnabled()) { 2137 DFSClient.LOG.debug("Waiting for ack for: " + seqno); 2138 } 2139 long begin = Time.monotonicNow(); 2140 try { 2141 synchronized (dataQueue) { 2142 while (!isClosed()) { 2143 checkClosed(); 2144 if (lastAckedSeqno >= seqno) { 2145 break; 2146 } 2147 try { 2148 dataQueue.wait(1000); // when we receive an ack, we notify on 2149 // dataQueue 2150 } catch (InterruptedException ie) { 2151 throw new InterruptedIOException( 2152 "Interrupted while waiting for data to be acknowledged by pipeline"); 2153 } 2154 } 2155 } 2156 checkClosed(); 2157 } catch (ClosedChannelException e) { 2158 } 2159 long duration = Time.monotonicNow() - begin; 2160 if (duration > dfsclientSlowLogThresholdMs) { 2161 DFSClient.LOG.warn("Slow 
waitForAckedSeqno took " + duration 2162 + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)"); 2163 } 2164 } finally { 2165 scope.close(); 2166 } 2167 } 2168 start()2169 private synchronized void start() { 2170 streamer.start(); 2171 } 2172 2173 /** 2174 * Aborts this output stream and releases any system 2175 * resources associated with this stream. 2176 */ abort()2177 void abort() throws IOException { 2178 synchronized (this) { 2179 if (isClosed()) { 2180 return; 2181 } 2182 streamer.setLastException(new IOException("Lease timeout of " 2183 + (dfsClient.getHdfsTimeout() / 1000) + " seconds expired.")); 2184 closeThreads(true); 2185 } 2186 dfsClient.endFileLease(fileId); 2187 } 2188 isClosed()2189 boolean isClosed() { 2190 return closed; 2191 } 2192 setClosed()2193 void setClosed() { 2194 closed = true; 2195 synchronized (dataQueue) { 2196 releaseBuffer(dataQueue, byteArrayManager); 2197 releaseBuffer(ackQueue, byteArrayManager); 2198 } 2199 } 2200 releaseBuffer(List<DFSPacket> packets, ByteArrayManager bam)2201 private static void releaseBuffer(List<DFSPacket> packets, ByteArrayManager bam) { 2202 for (DFSPacket p : packets) { 2203 p.releaseBuffer(bam); 2204 } 2205 packets.clear(); 2206 } 2207 2208 // shutdown datastreamer and responseprocessor threads. 2209 // interrupt datastreamer if force is true closeThreads(boolean force)2210 private void closeThreads(boolean force) throws IOException { 2211 try { 2212 streamer.close(force); 2213 streamer.join(); 2214 if (s != null) { 2215 s.close(); 2216 } 2217 } catch (InterruptedException e) { 2218 throw new IOException("Failed to shutdown streamer"); 2219 } finally { 2220 streamer = null; 2221 s = null; 2222 setClosed(); 2223 } 2224 } 2225 2226 /** 2227 * Closes this output stream and releases any system 2228 * resources associated with this stream. 
2229 */ 2230 @Override close()2231 public void close() throws IOException { 2232 synchronized (this) { 2233 TraceScope scope = dfsClient.getPathTraceScope("DFSOutputStream#close", 2234 src); 2235 try { 2236 closeImpl(); 2237 } finally { 2238 scope.close(); 2239 } 2240 } 2241 dfsClient.endFileLease(fileId); 2242 } 2243 closeImpl()2244 private synchronized void closeImpl() throws IOException { 2245 if (isClosed()) { 2246 IOException e = lastException.getAndSet(null); 2247 if (e == null) 2248 return; 2249 else 2250 throw e; 2251 } 2252 2253 try { 2254 flushBuffer(); // flush from all upper layers 2255 2256 if (currentPacket != null) { 2257 waitAndQueueCurrentPacket(); 2258 } 2259 2260 if (bytesCurBlock != 0) { 2261 // send an empty packet to mark the end of the block 2262 currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true); 2263 currentPacket.setSyncBlock(shouldSyncBlock); 2264 } 2265 2266 flushInternal(); // flush all data to Datanodes 2267 // get last block before destroying the streamer 2268 ExtendedBlock lastBlock = streamer.getBlock(); 2269 closeThreads(false); 2270 TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER); 2271 try { 2272 completeFile(lastBlock); 2273 } finally { 2274 scope.close(); 2275 } 2276 } catch (ClosedChannelException e) { 2277 } finally { 2278 setClosed(); 2279 } 2280 } 2281 2282 // should be called holding (this) lock since setTestFilename() may 2283 // be called during unit tests completeFile(ExtendedBlock last)2284 private void completeFile(ExtendedBlock last) throws IOException { 2285 long localstart = Time.monotonicNow(); 2286 long localTimeout = 400; 2287 boolean fileComplete = false; 2288 int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry; 2289 while (!fileComplete) { 2290 fileComplete = 2291 dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId); 2292 if (!fileComplete) { 2293 final int hdfsTimeout = dfsClient.getHdfsTimeout(); 2294 if (!dfsClient.clientRunning 2295 || 
(hdfsTimeout > 0 2296 && localstart + hdfsTimeout < Time.monotonicNow())) { 2297 String msg = "Unable to close file because dfsclient " + 2298 " was unable to contact the HDFS servers." + 2299 " clientRunning " + dfsClient.clientRunning + 2300 " hdfsTimeout " + hdfsTimeout; 2301 DFSClient.LOG.info(msg); 2302 throw new IOException(msg); 2303 } 2304 try { 2305 if (retries == 0) { 2306 throw new IOException("Unable to close file because the last block" 2307 + " does not have enough number of replicas."); 2308 } 2309 retries--; 2310 Thread.sleep(localTimeout); 2311 localTimeout *= 2; 2312 if (Time.monotonicNow() - localstart > 5000) { 2313 DFSClient.LOG.info("Could not complete " + src + " retrying..."); 2314 } 2315 } catch (InterruptedException ie) { 2316 DFSClient.LOG.warn("Caught exception ", ie); 2317 } 2318 } 2319 } 2320 } 2321 2322 @VisibleForTesting setArtificialSlowdown(long period)2323 public void setArtificialSlowdown(long period) { 2324 artificialSlowdown = period; 2325 } 2326 2327 @VisibleForTesting setChunksPerPacket(int value)2328 public synchronized void setChunksPerPacket(int value) { 2329 chunksPerPacket = Math.min(chunksPerPacket, value); 2330 packetSize = (bytesPerChecksum + getChecksumSize()) * chunksPerPacket; 2331 } 2332 setTestFilename(String newname)2333 synchronized void setTestFilename(String newname) { 2334 src = newname; 2335 } 2336 2337 /** 2338 * Returns the size of a file as it was when this stream was opened 2339 */ getInitialLen()2340 public long getInitialLen() { 2341 return initialFileSize; 2342 } 2343 2344 /** 2345 * @return the FileEncryptionInfo for this stream, or null if not encrypted. 
*/
  public FileEncryptionInfo getFileEncryptionInfo() {
    return fileEncryptionInfo;
  }

  /**
   * Returns the access token currently used by streamer, for testing only
   */
  synchronized Token<BlockTokenIdentifier> getBlockToken() {
    return streamer.getBlockToken();
  }

  @Override
  public void setDropBehind(Boolean dropBehind) throws IOException {
    // CachingStrategy is immutable: build a modified copy and install it with
    // compare-and-set, retrying if another thread swapped it in the meantime.
    while (true) {
      final CachingStrategy current = this.cachingStrategy.get();
      final CachingStrategy updated = new CachingStrategy.Builder(current)
          .setDropBehind(dropBehind).build();
      if (this.cachingStrategy.compareAndSet(current, updated)) {
        return;
      }
    }
  }

  @VisibleForTesting
  ExtendedBlock getBlock() {
    return streamer.getBlock();
  }

  @VisibleForTesting
  public long getFileId() {
    return fileId;
  }

  /**
   * Copies {@code srcs} into {@code dsts}, leaving out the element at
   * {@code skipIndex}. {@code srcs} must hold at least
   * {@code dsts.length + 1} elements.
   */
  private static <T> void arraycopy(T[] srcs, T[] dsts, int skipIndex) {
    final int tailLength = dsts.length - skipIndex;
    System.arraycopy(srcs, 0, dsts, 0, skipIndex);
    System.arraycopy(srcs, skipIndex + 1, dsts, skipIndex, tailLength);
  }
}