1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hdfs;
19 
20 import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
21 
22 import java.io.BufferedOutputStream;
23 import java.io.DataInputStream;
24 import java.io.DataOutputStream;
25 import java.io.FileNotFoundException;
26 import java.io.IOException;
27 import java.io.InputStream;
28 import java.io.InterruptedIOException;
29 import java.io.OutputStream;
30 import java.net.InetAddress;
31 import java.net.InetSocketAddress;
32 import java.net.Socket;
33 import java.nio.channels.ClosedChannelException;
34 import java.util.ArrayList;
35 import java.util.Arrays;
36 import java.util.EnumSet;
37 import java.util.HashSet;
38 import java.util.LinkedList;
39 import java.util.List;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicBoolean;
42 import java.util.concurrent.atomic.AtomicInteger;
43 import java.util.concurrent.atomic.AtomicReference;
44 
45 import org.apache.hadoop.HadoopIllegalArgumentException;
46 import org.apache.hadoop.classification.InterfaceAudience;
47 import org.apache.hadoop.crypto.CryptoProtocolVersion;
48 import org.apache.hadoop.fs.CanSetDropBehind;
49 import org.apache.hadoop.fs.CreateFlag;
50 import org.apache.hadoop.fs.FSOutputSummer;
51 import org.apache.hadoop.fs.FileAlreadyExistsException;
52 import org.apache.hadoop.fs.FileEncryptionInfo;
53 import org.apache.hadoop.fs.ParentNotDirectoryException;
54 import org.apache.hadoop.fs.permission.FsPermission;
55 import org.apache.hadoop.fs.StorageType;
56 import org.apache.hadoop.fs.Syncable;
57 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
58 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
59 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
60 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
61 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
62 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
63 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
64 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
65 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
66 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
67 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
68 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
69 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
70 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
71 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
72 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
73 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
74 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
75 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
76 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
77 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
78 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
79 import org.apache.hadoop.hdfs.protocolPB.PBHelper;
80 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
81 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
82 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
83 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
84 import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
85 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
86 import org.apache.hadoop.hdfs.util.ByteArrayManager;
87 import org.apache.hadoop.io.EnumSetWritable;
88 import org.apache.hadoop.io.IOUtils;
89 import org.apache.hadoop.ipc.RemoteException;
90 import org.apache.hadoop.net.NetUtils;
91 import org.apache.hadoop.security.AccessControlException;
92 import org.apache.hadoop.security.token.Token;
93 import org.apache.hadoop.util.Daemon;
94 import org.apache.hadoop.util.DataChecksum;
95 import org.apache.hadoop.util.DataChecksum.Type;
96 import org.apache.hadoop.util.Progressable;
97 import org.apache.hadoop.util.Time;
98 import org.apache.htrace.NullScope;
99 import org.apache.htrace.Sampler;
100 import org.apache.htrace.Span;
101 import org.apache.htrace.Trace;
102 import org.apache.htrace.TraceInfo;
103 import org.apache.htrace.TraceScope;
104 
105 import com.google.common.annotations.VisibleForTesting;
106 import com.google.common.base.Preconditions;
107 import com.google.common.cache.CacheBuilder;
108 import com.google.common.cache.CacheLoader;
109 import com.google.common.cache.LoadingCache;
110 import com.google.common.cache.RemovalListener;
111 import com.google.common.cache.RemovalNotification;
112 
113 
114 /****************************************************************
115  * DFSOutputStream creates files from a stream of bytes.
116  *
117  * The client application writes data that is cached internally by
118  * this stream. Data is broken up into packets, each typically
119  * 64K in size. A packet comprises chunks. Each chunk is
120  * typically 512 bytes and has an associated checksum.
121  *
122  * When a client application fills up the currentPacket, it is
123  * enqueued into the dataQueue.  The DataStreamer thread picks up
124  * packets from the dataQueue, sends each one to the first datanode in
125  * the pipeline, and moves it from the dataQueue to the ackQueue.
126  * The ResponseProcessor receives acks from the datanodes. When a
127  * successful ack for a packet is received from all datanodes, the
128  * ResponseProcessor removes the corresponding packet from the
129  * ackQueue.
130  *
131  * In case of error, all outstanding packets are moved from the
132  * ackQueue back to the dataQueue. A new pipeline is set up by
133  * eliminating the bad datanode from the original pipeline. The
134  * DataStreamer then resumes sending packets from the dataQueue.
135 ****************************************************************/
136 @InterfaceAudience.Private
137 public class DFSOutputStream extends FSOutputSummer
138     implements Syncable, CanSetDropBehind {
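  // A minimal usage sketch (illustrative only; client code normally obtains this
  // stream through the public FileSystem API rather than constructing it directly,
  // and the path and payload below are hypothetical):
  //
  //   FileSystem fs = FileSystem.get(conf);              // a DistributedFileSystem
  //   try (FSDataOutputStream out = fs.create(new Path("/tmp/example"))) {
  //     out.write(payload);   // buffered into ~512-byte chunks, packed into ~64K packets
  //     out.hflush();         // push buffered packets to the datanode pipeline
  //   }                       // close() waits for every queued packet to be acked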
139   private final long dfsclientSlowLogThresholdMs;
140   /**
141    * Number of times to retry creating a file when there are transient
142    * errors (typically related to encryption zones and KeyProvider operations).
143    */
144   @VisibleForTesting
145   static final int CREATE_RETRY_COUNT = 10;
146   @VisibleForTesting
147   static CryptoProtocolVersion[] SUPPORTED_CRYPTO_VERSIONS =
148       CryptoProtocolVersion.supported();
149 
150   private final DFSClient dfsClient;
151   private final ByteArrayManager byteArrayManager;
152   private Socket s;
153   // closed is accessed by different threads under different locks.
154   private volatile boolean closed = false;
155 
156   private String src;
157   private final long fileId;
158   private final long blockSize;
159   /** Only for DataTransferProtocol.writeBlock(..) */
160   private final DataChecksum checksum4WriteBlock;
161   private final int bytesPerChecksum;
162 
163   // both dataQueue and ackQueue are protected by dataQueue lock
164   private final LinkedList<DFSPacket> dataQueue = new LinkedList<DFSPacket>();
165   private final LinkedList<DFSPacket> ackQueue = new LinkedList<DFSPacket>();
166   private DFSPacket currentPacket = null;
167   private DataStreamer streamer;
168   private long currentSeqno = 0;
169   private long lastQueuedSeqno = -1;
170   private long lastAckedSeqno = -1;
171   private long bytesCurBlock = 0; // bytes written in current block
172   private int packetSize = 0; // write packet size, not including the header.
173   private int chunksPerPacket = 0;
174   private final AtomicReference<IOException> lastException = new AtomicReference<IOException>();
175   private long artificialSlowdown = 0;
176   private long lastFlushOffset = 0; // offset when flush was invoked
177   //persist blocks on namenode
178   private final AtomicBoolean persistBlocks = new AtomicBoolean(false);
179   private volatile boolean appendChunk = false;   // appending to existing partial block
180   private long initialFileSize = 0; // at time of file open
181   private final Progressable progress;
182   private final short blockReplication; // replication factor of file
183   private boolean shouldSyncBlock = false; // force blocks to disk upon close
184   private final AtomicReference<CachingStrategy> cachingStrategy;
185   private boolean failPacket = false;
186   private FileEncryptionInfo fileEncryptionInfo;
187   private static final BlockStoragePolicySuite blockStoragePolicySuite =
188       BlockStoragePolicySuite.createDefaultSuite();
189 
190   /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
191   private DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
192       long seqno, boolean lastPacketInBlock) throws InterruptedIOException {
193     final byte[] buf;
194     final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize;
195 
196     try {
197       buf = byteArrayManager.newByteArray(bufferSize);
198     } catch (InterruptedException ie) {
199       final InterruptedIOException iioe = new InterruptedIOException(
200           "seqno=" + seqno);
201       iioe.initCause(ie);
202       throw iioe;
203     }
204 
205     return new DFSPacket(buf, chunksPerPkt, offsetInBlock, seqno,
206                          getChecksumSize(), lastPacketInBlock);
207   }
208 
209   /**
210    * For heartbeat packets, create the buffer directly with new byte[]
211    * since heartbeats should not be blocked.
212    */
213   private DFSPacket createHeartbeatPacket() throws InterruptedIOException {
214     final byte[] buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
215     return new DFSPacket(buf, 0, 0, DFSPacket.HEART_BEAT_SEQNO,
216                          getChecksumSize(), false);
217   }
218 
219   //
220   // The DataStreamer class is responsible for sending data packets to the
221   // datanodes in the pipeline. It retrieves a new blockid and block locations
222   // from the namenode, and starts streaming packets to the pipeline of
223   // Datanodes. Every packet has a sequence number associated with
224   // it. When all the packets for a block are sent out and acks for each
225   // of them are received, the DataStreamer closes the current block.
226   //
227   class DataStreamer extends Daemon {
228     private volatile boolean streamerClosed = false;
229     private volatile ExtendedBlock block; // its length is number of bytes acked
230     private Token<BlockTokenIdentifier> accessToken;
231     private DataOutputStream blockStream;
232     private DataInputStream blockReplyStream;
233     private ResponseProcessor response = null;
234     private volatile DatanodeInfo[] nodes = null; // list of targets for current block
235     private volatile StorageType[] storageTypes = null;
236     private volatile String[] storageIDs = null;
237     private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes =
238         CacheBuilder.newBuilder()
239         .expireAfterWrite(
240             dfsClient.getConf().excludedNodesCacheExpiry,
241             TimeUnit.MILLISECONDS)
242         .removalListener(new RemovalListener<DatanodeInfo, DatanodeInfo>() {
243           @Override
244           public void onRemoval(
245               RemovalNotification<DatanodeInfo, DatanodeInfo> notification) {
246             DFSClient.LOG.info("Removing node " +
247                 notification.getKey() + " from the excluded nodes list");
248           }
249         })
250         .build(new CacheLoader<DatanodeInfo, DatanodeInfo>() {
251           @Override
252           public DatanodeInfo load(DatanodeInfo key) throws Exception {
253             return key;
254           }
255         });
256     private String[] favoredNodes;
257     volatile boolean hasError = false;
258     volatile int errorIndex = -1;
259     // Restarting node index
260     AtomicInteger restartingNodeIndex = new AtomicInteger(-1);
261     private long restartDeadline = 0; // Deadline of DN restart
262     private BlockConstructionStage stage;  // block construction stage
263     private long bytesSent = 0; // number of bytes that've been sent
264     private final boolean isLazyPersistFile;
265 
266     /** Nodes have been used in the pipeline before and have failed. */
267     private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
268     /** The last ack sequence number before pipeline failure. */
269     private long lastAckedSeqnoBeforeFailure = -1;
270     private int pipelineRecoveryCount = 0;
271     /** Has the current block been hflushed? */
272     private boolean isHflushed = false;
273     /** Append on an existing block? */
274     private final boolean isAppend;
275 
276     private DataStreamer(HdfsFileStatus stat, ExtendedBlock block) {
277       isAppend = false;
278       isLazyPersistFile = isLazyPersist(stat);
279       this.block = block;
280       stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
281     }
282 
283     /**
284      * Construct a data streamer for appending to the last partial block
285      * @param lastBlock last block of the file to be appended
286      * @param stat status of the file to be appended
287      * @param bytesPerChecksum number of bytes per checksum
288      * @throws IOException if error occurs
289      */
290     private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
291         int bytesPerChecksum) throws IOException {
292       isAppend = true;
293       stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
294       block = lastBlock.getBlock();
295       bytesSent = block.getNumBytes();
296       accessToken = lastBlock.getBlockToken();
297       isLazyPersistFile = isLazyPersist(stat);
298       long usedInLastBlock = stat.getLen() % blockSize;
299       int freeInLastBlock = (int)(blockSize - usedInLastBlock);
300 
301       // calculate the amount of free space in the pre-existing
302       // last crc chunk
303       int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
304       int freeInCksum = bytesPerChecksum - usedInCksum;
305 
306       // if there is space in the last block, then we have to
307       // append to that block
308       if (freeInLastBlock == blockSize) {
309         throw new IOException("The last block for file " +
310             src + " is full.");
311       }
312 
313       if (usedInCksum > 0 && freeInCksum > 0) {
314         // if there is space in the last partial chunk, then
315         // setup in such a way that the next packet will have only
316         // one chunk that fills up the partial chunk.
317         //
318         computePacketChunkSize(0, freeInCksum);
319         setChecksumBufSize(freeInCksum);
320         appendChunk = true;
321       } else {
322         // if the remaining space in the block is smaller than
323         // the expected size of a packet, then create a
324         // smaller packet.
325         //
326         computePacketChunkSize(Math.min(dfsClient.getConf().writePacketSize, freeInLastBlock),
327             bytesPerChecksum);
328       }
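      // A worked example with illustrative numbers: for bytesPerChecksum = 512 and
      // stat.getLen() = 1000, usedInCksum = 1000 % 512 = 488 and freeInCksum = 24,
      // so the first branch above arranges for the next packet to carry a single
      // 24-byte chunk that completes the partial crc chunk before normal-sized
      // packets resume.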
329 
330       // setup pipeline to append to the last block XXX retries??
331       setPipeline(lastBlock);
332       errorIndex = -1;   // no errors yet.
333       if (nodes.length < 1) {
334         throw new IOException("Unable to retrieve block locations" +
335             " for last block " + block +
336             " of file " + src);
337 
338       }
339     }
340 
341     private void setPipeline(LocatedBlock lb) {
342       setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
343     }
344     private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes,
345         String[] storageIDs) {
346       this.nodes = nodes;
347       this.storageTypes = storageTypes;
348       this.storageIDs = storageIDs;
349     }
350 
351     private void setFavoredNodes(String[] favoredNodes) {
352       this.favoredNodes = favoredNodes;
353     }
354 
355     /**
356      * Initialize for data streaming
357      */
358     private void initDataStreaming() {
359       this.setName("DataStreamer for file " + src +
360           " block " + block);
361       response = new ResponseProcessor(nodes);
362       response.start();
363       stage = BlockConstructionStage.DATA_STREAMING;
364     }
365 
366     private void endBlock() {
367       if(DFSClient.LOG.isDebugEnabled()) {
368         DFSClient.LOG.debug("Closing old block " + block);
369       }
370       this.setName("DataStreamer for file " + src);
371       closeResponder();
372       closeStream();
373       setPipeline(null, null, null);
374       stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
375     }
376 
377     /*
378      * streamer thread is the only thread that opens streams to datanode,
379      * and closes them. Any error recovery is also done by this thread.
380      */
381     @Override
382     public void run() {
383       long lastPacket = Time.monotonicNow();
384       TraceScope scope = NullScope.INSTANCE;
385       while (!streamerClosed && dfsClient.clientRunning) {
386         // if the Responder encountered an error, shutdown Responder
387         if (hasError && response != null) {
388           try {
389             response.close();
390             response.join();
391             response = null;
392           } catch (InterruptedException  e) {
393             DFSClient.LOG.warn("Caught exception ", e);
394           }
395         }
396 
397         DFSPacket one;
398         try {
399           // process datanode IO errors if any
400           boolean doSleep = false;
401           if (hasError && (errorIndex >= 0 || restartingNodeIndex.get() >= 0)) {
402             doSleep = processDatanodeError();
403           }
404 
405           synchronized (dataQueue) {
406             // wait for a packet to be sent.
407             long now = Time.monotonicNow();
408             while ((!streamerClosed && !hasError && dfsClient.clientRunning
409                 && dataQueue.size() == 0 &&
410                 (stage != BlockConstructionStage.DATA_STREAMING ||
411                  stage == BlockConstructionStage.DATA_STREAMING &&
412                  now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
413               long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
414               timeout = timeout <= 0 ? 1000 : timeout;
415               timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
416                  timeout : 1000;
417               try {
418                 dataQueue.wait(timeout);
419               } catch (InterruptedException  e) {
420                 DFSClient.LOG.warn("Caught exception ", e);
421               }
422               doSleep = false;
423               now = Time.monotonicNow();
424             }
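            // Assuming the default 60 second dfs.client.socket-timeout, an idle
            // stream falls out of this wait roughly every 30 seconds and sends a
            // heartbeat packet (created below) to keep the pipeline alive.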
425             if (streamerClosed || hasError || !dfsClient.clientRunning) {
426               continue;
427             }
428             // get packet to be sent.
429             if (dataQueue.isEmpty()) {
430               one = createHeartbeatPacket();
431               assert one != null;
432             } else {
433               one = dataQueue.getFirst(); // regular data packet
434               long parents[] = one.getTraceParents();
435               if (parents.length > 0) {
436                 scope = Trace.startSpan("dataStreamer", new TraceInfo(0, parents[0]));
437                 // TODO: use setParents API once it's available from HTrace 3.2
438 //                scope = Trace.startSpan("dataStreamer", Sampler.ALWAYS);
439 //                scope.getSpan().setParents(parents);
440               }
441             }
442           }
443 
444           // get new block from namenode.
445           if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
446             if(DFSClient.LOG.isDebugEnabled()) {
447               DFSClient.LOG.debug("Allocating new block");
448             }
449             setPipeline(nextBlockOutputStream());
450             initDataStreaming();
451           } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
452             if(DFSClient.LOG.isDebugEnabled()) {
453               DFSClient.LOG.debug("Append to block " + block);
454             }
455             setupPipelineForAppendOrRecovery();
456             initDataStreaming();
457           }
458 
459           long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
460           if (lastByteOffsetInBlock > blockSize) {
461             throw new IOException("BlockSize " + blockSize +
462                 " is smaller than data size. " +
463                 " Offset of packet in block " +
464                 lastByteOffsetInBlock +
465                 " Aborting file " + src);
466           }
467 
468           if (one.isLastPacketInBlock()) {
469             // wait for all data packets to be successfully acked
470             synchronized (dataQueue) {
471               while (!streamerClosed && !hasError &&
472                   ackQueue.size() != 0 && dfsClient.clientRunning) {
473                 try {
474                   // wait for acks to arrive from datanodes
475                   dataQueue.wait(1000);
476                 } catch (InterruptedException  e) {
477                   DFSClient.LOG.warn("Caught exception ", e);
478                 }
479               }
480             }
481             if (streamerClosed || hasError || !dfsClient.clientRunning) {
482               continue;
483             }
484             stage = BlockConstructionStage.PIPELINE_CLOSE;
485           }
486 
487           // send the packet
488           Span span = null;
489           synchronized (dataQueue) {
490             // move packet from dataQueue to ackQueue
491             if (!one.isHeartbeatPacket()) {
492               span = scope.detach();
493               one.setTraceSpan(span);
494               dataQueue.removeFirst();
495               ackQueue.addLast(one);
496               dataQueue.notifyAll();
497             }
498           }
499 
500           if (DFSClient.LOG.isDebugEnabled()) {
501             DFSClient.LOG.debug("DataStreamer block " + block +
502                 " sending packet " + one);
503           }
504 
505           // write out data to remote datanode
506           TraceScope writeScope = Trace.startSpan("writeTo", span);
507           try {
508             one.writeTo(blockStream);
509             blockStream.flush();
510           } catch (IOException e) {
511             // HDFS-3398: treat the primary DN as down since the client is unable
512             // to write to it. If a failed or restarting node has already
513             // been recorded by the responder, the following call will have no
514             // effect. Pipeline recovery can handle only one node error at a
515             // time. If the primary node fails again during the recovery, it
516             // will be taken out then.
517             tryMarkPrimaryDatanodeFailed();
518             throw e;
519           } finally {
520             writeScope.close();
521           }
522           lastPacket = Time.monotonicNow();
523 
524           // update bytesSent
525           long tmpBytesSent = one.getLastByteOffsetBlock();
526           if (bytesSent < tmpBytesSent) {
527             bytesSent = tmpBytesSent;
528           }
529 
530           if (streamerClosed || hasError || !dfsClient.clientRunning) {
531             continue;
532           }
533 
534           // Is this block full?
535           if (one.isLastPacketInBlock()) {
536             // wait for the close packet to be acked
537             synchronized (dataQueue) {
538               while (!streamerClosed && !hasError &&
539                   ackQueue.size() != 0 && dfsClient.clientRunning) {
540                 dataQueue.wait(1000);// wait for acks to arrive from datanodes
541               }
542             }
543             if (streamerClosed || hasError || !dfsClient.clientRunning) {
544               continue;
545             }
546 
547             endBlock();
548           }
549           if (progress != null) { progress.progress(); }
550 
551           // This is used by unit test to trigger race conditions.
552           if (artificialSlowdown != 0 && dfsClient.clientRunning) {
553             Thread.sleep(artificialSlowdown);
554           }
555         } catch (Throwable e) {
556           // Log warning if there was a real error.
557           if (restartingNodeIndex.get() == -1) {
558             DFSClient.LOG.warn("DataStreamer Exception", e);
559           }
560           if (e instanceof IOException) {
561             setLastException((IOException)e);
562           } else {
563             setLastException(new IOException("DataStreamer Exception: ",e));
564           }
565           hasError = true;
566           if (errorIndex == -1 && restartingNodeIndex.get() == -1) {
567             // Not a datanode issue
568             streamerClosed = true;
569           }
570         } finally {
571           scope.close();
572         }
573       }
574       closeInternal();
575     }
576 
577     private void closeInternal() {
578       closeResponder();       // close and join
579       closeStream();
580       streamerClosed = true;
581       setClosed();
582       synchronized (dataQueue) {
583         dataQueue.notifyAll();
584       }
585     }
586 
587     /*
588      * close both streamer and DFSOutputStream, should be called only
589      * by an external thread and only after all data to be sent has
590      * been flushed to datanode.
591      *
592      * Interrupt this data streamer if force is true
593      *
594      * @param force if this data stream is forced to be closed
595      */
596     void close(boolean force) {
597       streamerClosed = true;
598       synchronized (dataQueue) {
599         dataQueue.notifyAll();
600       }
601       if (force) {
602         this.interrupt();
603       }
604     }
605 
606     private void closeResponder() {
607       if (response != null) {
608         try {
609           response.close();
610           response.join();
611         } catch (InterruptedException  e) {
612           DFSClient.LOG.warn("Caught exception ", e);
613         } finally {
614           response = null;
615         }
616       }
617     }
618 
619     private void closeStream() {
620       if (blockStream != null) {
621         try {
622           blockStream.close();
623         } catch (IOException e) {
624           setLastException(e);
625         } finally {
626           blockStream = null;
627         }
628       }
629       if (blockReplyStream != null) {
630         try {
631           blockReplyStream.close();
632         } catch (IOException e) {
633           setLastException(e);
634         } finally {
635           blockReplyStream = null;
636         }
637       }
638       if (null != s) {
639         try {
640           s.close();
641         } catch (IOException e) {
642           setLastException(e);
643         } finally {
644           s = null;
645         }
646       }
647     }
648 
649     // The following synchronized methods are used whenever
650     // errorIndex or restartingNodeIndex is set. This is because
651     // check & set needs to be atomic. Simply reading the variables
652     // does not require synchronization. When the responder is
653     // not running (e.g. during pipeline recovery), there is no
654     // need to use these methods.
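    // For example, tryMarkPrimaryDatanodeFailed() below must read errorIndex and
    // restartingNodeIndex and then conditionally set errorIndex as one atomic step;
    // without the lock, a concurrent setRestartingNodeIndex() from the responder
    // could slip in between the check and the set, leaving both indices set at once.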
655 
656     /** Set the error node index. Called by responder */
657     synchronized void setErrorIndex(int idx) {
658       errorIndex = idx;
659     }
660 
661     /** Set the restarting node index. Called by responder */
662     synchronized void setRestartingNodeIndex(int idx) {
663       restartingNodeIndex.set(idx);
664       // If the data streamer has already set the primary node
665       // bad, clear it. It is likely that the write failed due to
666       // the DN shutdown. Even if it was a real failure, the pipeline
667       // recovery will take care of it.
668       errorIndex = -1;
669     }
670 
671     /**
672      * This method is used when no explicit error report was received,
673      * but something failed. When the primary node is a suspect or the
674      * cause of the failure is unclear, the primary node is marked as failed.
675      */
676     synchronized void tryMarkPrimaryDatanodeFailed() {
677       // There should be no existing error and no ongoing restart.
678       if ((errorIndex == -1) && (restartingNodeIndex.get() == -1)) {
679         errorIndex = 0;
680       }
681     }
682 
683     /**
684      * Examine whether it is worth waiting for a node to restart.
685      * @param index the node index
686      */
687     boolean shouldWaitForRestart(int index) {
688       // Only one node in the pipeline.
689       if (nodes.length == 1) {
690         return true;
691       }
692 
693       // Is it a local node?
694       InetAddress addr = null;
695       try {
696         addr = InetAddress.getByName(nodes[index].getIpAddr());
697       } catch (java.net.UnknownHostException e) {
698         // we are passing an ip address. this should not happen.
699         assert false;
700       }
701 
702       if (addr != null && NetUtils.isLocalAddress(addr)) {
703         return true;
704       }
705       return false;
706     }
707 
708     //
709     // Processes responses from the datanodes.  A packet is removed
710     // from the ackQueue when its response arrives.
711     //
712     private class ResponseProcessor extends Daemon {
713 
714       private volatile boolean responderClosed = false;
715       private DatanodeInfo[] targets = null;
716       private boolean isLastPacketInBlock = false;
717 
718       ResponseProcessor (DatanodeInfo[] targets) {
719         this.targets = targets;
720       }
721 
722       @Override
723       public void run() {
724 
725         setName("ResponseProcessor for block " + block);
726         PipelineAck ack = new PipelineAck();
727 
728         TraceScope scope = NullScope.INSTANCE;
729         while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
730           // process responses from datanodes.
731           try {
732             // read an ack from the pipeline
733             long begin = Time.monotonicNow();
734             ack.readFields(blockReplyStream);
735             long duration = Time.monotonicNow() - begin;
736             if (duration > dfsclientSlowLogThresholdMs
737                 && ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
738               DFSClient.LOG
739                   .warn("Slow ReadProcessor read fields took " + duration
740                       + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
741                       + ack + ", targets: " + Arrays.asList(targets));
742             } else if (DFSClient.LOG.isDebugEnabled()) {
743               DFSClient.LOG.debug("DFSClient " + ack);
744             }
745 
746             long seqno = ack.getSeqno();
747             // processes response status from datanodes.
748             for (int i = ack.getNumOfReplies()-1; i >=0  && dfsClient.clientRunning; i--) {
749               final Status reply = PipelineAck.getStatusFromHeader(ack
750                 .getHeaderFlag(i));
751               // Restart will not be treated differently unless it is
752               // the local node or the only one in the pipeline.
753               if (PipelineAck.isRestartOOBStatus(reply) &&
754                   shouldWaitForRestart(i)) {
755                 restartDeadline = dfsClient.getConf().datanodeRestartTimeout
756                     + Time.monotonicNow();
757                 setRestartingNodeIndex(i);
758                 String message = "A datanode is restarting: " + targets[i];
759                 DFSClient.LOG.info(message);
760                throw new IOException(message);
761               }
762               // node error
763               if (reply != SUCCESS) {
764                 setErrorIndex(i); // first bad datanode
765                 throw new IOException("Bad response " + reply +
766                     " for block " + block +
767                     " from datanode " +
768                     targets[i]);
769               }
770             }
771 
772             assert seqno != PipelineAck.UNKOWN_SEQNO :
773               "Ack for unknown seqno should be a failed ack: " + ack;
774             if (seqno == DFSPacket.HEART_BEAT_SEQNO) {  // a heartbeat ack
775               continue;
776             }
777 
778             // a success ack for a data packet
779             DFSPacket one;
780             synchronized (dataQueue) {
781               one = ackQueue.getFirst();
782             }
783             if (one.getSeqno() != seqno) {
784               throw new IOException("ResponseProcessor: Expecting seqno " +
785                                     one.getSeqno() + " for block " + block +
786                                     " but received " + seqno);
787             }
788             isLastPacketInBlock = one.isLastPacketInBlock();
789 
790             // Fail the packet write for testing in order to force a
791             // pipeline recovery.
792             if (DFSClientFaultInjector.get().failPacket() &&
793                 isLastPacketInBlock) {
794               failPacket = true;
795               throw new IOException(
796                     "Failing the last packet for testing.");
797             }
798 
799             // update bytesAcked
800             block.setNumBytes(one.getLastByteOffsetBlock());
801 
802             synchronized (dataQueue) {
803               scope = Trace.continueSpan(one.getTraceSpan());
804               one.setTraceSpan(null);
805               lastAckedSeqno = seqno;
806               ackQueue.removeFirst();
807               dataQueue.notifyAll();
808 
809               one.releaseBuffer(byteArrayManager);
810             }
811           } catch (Exception e) {
812             if (!responderClosed) {
813               if (e instanceof IOException) {
814                 setLastException((IOException)e);
815               }
816               hasError = true;
817               // If no explicit error report was received, mark the primary
818               // node as failed.
819               tryMarkPrimaryDatanodeFailed();
820               synchronized (dataQueue) {
821                 dataQueue.notifyAll();
822               }
823               if (restartingNodeIndex.get() == -1) {
824                 DFSClient.LOG.warn("DFSOutputStream ResponseProcessor exception "
825                      + " for block " + block, e);
826               }
827               responderClosed = true;
828             }
829           } finally {
830             scope.close();
831           }
832         }
833       }
834 
835       void close() {
836         responderClosed = true;
837         this.interrupt();
838       }
839     }
840 
841     // If this stream has encountered any errors so far, shutdown
842     // threads and mark stream as closed. Returns true if we should
843     // sleep for a while after returning from this call.
844     //
845     private boolean processDatanodeError() throws IOException {
846       if (response != null) {
847         DFSClient.LOG.info("Error Recovery for " + block +
848         " waiting for responder to exit. ");
849         return true;
850       }
851       closeStream();
852 
853       // move packets from ack queue to front of the data queue
854       synchronized (dataQueue) {
855         dataQueue.addAll(0, ackQueue);
856         ackQueue.clear();
857       }
858 
859       // Record the new pipeline failure recovery.
860       if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) {
861          lastAckedSeqnoBeforeFailure = lastAckedSeqno;
862          pipelineRecoveryCount = 1;
863       } else {
864         // If we had to recover the pipeline five times in a row for the
865         // same packet, this client likely has corrupt data or the data is
866         // being corrupted during transmission.
867         if (++pipelineRecoveryCount > 5) {
868           DFSClient.LOG.warn("Error recovering pipeline for writing " +
869               block + ". Already retried 5 times for the same packet.");
870           lastException.set(new IOException("Failing write. Tried pipeline " +
871               "recovery 5 times without success."));
872           streamerClosed = true;
873           return false;
874         }
875       }
876       boolean doSleep = setupPipelineForAppendOrRecovery();
877 
878       if (!streamerClosed && dfsClient.clientRunning) {
879         if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
880 
881           // If we had an error while closing the pipeline, we go through a fast-path
882           // where the BlockReceiver does not run. Instead, the DataNode just finalizes
883           // the block immediately during the 'connect ack' process. So, we want to pull
884           // the end-of-block packet from the dataQueue, since we don't actually have
885           // a true pipeline to send it over.
886           //
887           // We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that
888           // a client waiting on close() will be aware that the flush finished.
889           synchronized (dataQueue) {
890             DFSPacket endOfBlockPacket = dataQueue.remove();  // remove the end of block packet
891             Span span = endOfBlockPacket.getTraceSpan();
892             if (span != null) {
893               // Close any trace span associated with this Packet
894               TraceScope scope = Trace.continueSpan(span);
895               scope.close();
896             }
897             assert endOfBlockPacket.isLastPacketInBlock();
898             assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1;
899             lastAckedSeqno = endOfBlockPacket.getSeqno();
900             dataQueue.notifyAll();
901           }
902           endBlock();
903         } else {
904           initDataStreaming();
905         }
906       }
907 
908       return doSleep;
909     }
910 
911     private void setHflush() {
912       isHflushed = true;
913     }
914 
915     private int findNewDatanode(final DatanodeInfo[] original
916         ) throws IOException {
917       if (nodes.length != original.length + 1) {
918         throw new IOException(
919             new StringBuilder()
920             .append("Failed to replace a bad datanode on the existing pipeline ")
921             .append("due to no more good datanodes being available to try. ")
922             .append("(Nodes: current=").append(Arrays.asList(nodes))
923             .append(", original=").append(Arrays.asList(original)).append("). ")
924             .append("The current failed datanode replacement policy is ")
925             .append(dfsClient.dtpReplaceDatanodeOnFailure).append(", and ")
926             .append("a client may configure this via '")
927             .append(DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY)
928             .append("' in its configuration.")
929             .toString());
930       }
931       for(int i = 0; i < nodes.length; i++) {
932         int j = 0;
933         for(; j < original.length && !nodes[i].equals(original[j]); j++);
934         if (j == original.length) {
935           return i;
936         }
937       }
938       throw new IOException("Failed: new datanode not found: nodes="
939           + Arrays.asList(nodes) + ", original=" + Arrays.asList(original));
940     }
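    // A hedged configuration sketch (illustrative only) for the replacement policy
    // referenced in the message above: a client that prefers to keep writing with
    // the remaining datanodes rather than fail the write could set, before creating
    // its FileSystem instance,
    //
    //   Configuration conf = new HdfsConfiguration();
    //   conf.set(DFSConfigKeys
    //       .DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY, "DEFAULT");
    //   conf.setBoolean(DFSConfigKeys
    //       .DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY, true);
    //   FileSystem fs = FileSystem.get(conf);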
941 
942     private void addDatanode2ExistingPipeline() throws IOException {
943       if (DataTransferProtocol.LOG.isDebugEnabled()) {
944         DataTransferProtocol.LOG.debug("lastAckedSeqno = " + lastAckedSeqno);
945       }
946       /*
947        * Is data transfer necessary?  We have the following cases.
948        *
949        * Case 1: Failure in Pipeline Setup
950        * - Append
951        *    + Transfer the stored replica, which may be an RBW or a finalized replica.
952        * - Create
953        *    + If no data, then no transfer is required.
954        *    + If data has been written, transfer the RBW. This case may happen
955        *      when there was a streaming failure earlier in this pipeline.
956        *
957        * Case 2: Failure in Streaming
958        * - Append/Create:
959        *    + transfer RBW
960        *
961        * Case 3: Failure in Close
962        * - Append/Create:
963        *    + no transfer; let the NameNode replicate the block.
964        */
965       if (!isAppend && lastAckedSeqno < 0
966           && stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
967           //no data has been written
968         return;
969       } else if (stage == BlockConstructionStage.PIPELINE_CLOSE
970           || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
971         //pipeline is closing
972         return;
973       }
974 
975       int tried = 0;
976       final DatanodeInfo[] original = nodes;
977       final StorageType[] originalTypes = storageTypes;
978       final String[] originalIDs = storageIDs;
979       IOException caughtException = null;
980       ArrayList<DatanodeInfo> exclude = new ArrayList<DatanodeInfo>(failed);
981       while (tried < 3) {
982         LocatedBlock lb;
983         //get a new datanode
984         lb = dfsClient.namenode.getAdditionalDatanode(
985             src, fileId, block, nodes, storageIDs,
986             exclude.toArray(new DatanodeInfo[exclude.size()]),
987             1, dfsClient.clientName);
988         // a new node was allocated by the namenode. Update nodes.
989         setPipeline(lb);
990 
991         //find the new datanode
992         final int d = findNewDatanode(original);
993         //transfer replica. pick a source from the original nodes
994         final DatanodeInfo src = original[tried % original.length];
995         final DatanodeInfo[] targets = {nodes[d]};
996         final StorageType[] targetStorageTypes = {storageTypes[d]};
997 
998         try {
999           transfer(src, targets, targetStorageTypes, lb.getBlockToken());
1000         } catch (IOException ioe) {
1001           DFSClient.LOG.warn("Error transferring data from " + src + " to " +
1002               nodes[d] + ": " + ioe.getMessage());
1003           caughtException = ioe;
1004           // add the allocated node to the exclude list.
1005           exclude.add(nodes[d]);
1006           setPipeline(original, originalTypes, originalIDs);
1007           tried++;
1008           continue;
1009         }
1010         return; // finished successfully
1011       }
1012       // All retries failed
1013       throw (caughtException != null) ? caughtException :
1014          new IOException("Failed to add a node");
1015     }
1016 
1017     private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
1018         final StorageType[] targetStorageTypes,
1019         final Token<BlockTokenIdentifier> blockToken) throws IOException {
1020       //transfer replica to the new datanode
1021       Socket sock = null;
1022       DataOutputStream out = null;
1023       DataInputStream in = null;
1024       try {
1025         sock = createSocketForPipeline(src, 2, dfsClient);
1026         final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
1027 
1028         // Transfer timeout multiplier based on the transfer size:
1029         // one extra unit per 200 packets (~12.8MB at the default 64K packet size); minimum is 2.
1030         int multi = 2 + (int)(bytesSent/dfsClient.getConf().writePacketSize)/200;
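        // Worked example, assuming the default 64K write packet size: if bytesSent
        // is 64MB (~1024 packets), then multi = 2 + 1024/200 = 7.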
1031         final long readTimeout = dfsClient.getDatanodeReadTimeout(multi);
1032 
1033         OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout);
1034         InputStream unbufIn = NetUtils.getInputStream(sock, readTimeout);
1035         IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock,
1036           unbufOut, unbufIn, dfsClient, blockToken, src);
1037         unbufOut = saslStreams.out;
1038         unbufIn = saslStreams.in;
1039         out = new DataOutputStream(new BufferedOutputStream(unbufOut,
1040             HdfsConstants.SMALL_BUFFER_SIZE));
1041         in = new DataInputStream(unbufIn);
1042 
1043         //send the TRANSFER_BLOCK request
1044         new Sender(out).transferBlock(block, blockToken, dfsClient.clientName,
1045             targets, targetStorageTypes);
1046         out.flush();
1047 
1048         //ack
1049         BlockOpResponseProto response =
1050           BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
1051         if (SUCCESS != response.getStatus()) {
1052           throw new IOException("Failed to add a datanode");
1053         }
1054       } finally {
1055         IOUtils.closeStream(in);
1056         IOUtils.closeStream(out);
1057         IOUtils.closeSocket(sock);
1058       }
1059     }
1060 
1061     /**
1062      * Open a DataOutputStream to a DataNode pipeline so that
1063      * it can be written to.
1064      * This happens when a file is appended or data streaming fails.
1065      * It keeps on trying until a pipeline is set up.
1066      */
1067     private boolean setupPipelineForAppendOrRecovery() throws IOException {
1068       // check number of datanodes
1069       if (nodes == null || nodes.length == 0) {
1070         String msg = "Could not get block locations. " + "Source file \""
1071             + src + "\" - Aborting...";
1072         DFSClient.LOG.warn(msg);
1073         setLastException(new IOException(msg));
1074         streamerClosed = true;
1075         return false;
1076       }
1077 
1078       boolean success = false;
1079       long newGS = 0L;
1080       while (!success && !streamerClosed && dfsClient.clientRunning) {
1081         // Sleep before reconnect if a dn is restarting.
1082         // This process will be repeated until the deadline or the datanode
1083         // starts back up.
1084         if (restartingNodeIndex.get() >= 0) {
1085           // 4 seconds or the configured deadline period, whichever is shorter.
1086           // This is the retry interval and recovery will be retried in this
1087           // interval until timeout or success.
1088           long delay = Math.min(dfsClient.getConf().datanodeRestartTimeout,
1089               4000L);
1090           try {
1091             Thread.sleep(delay);
1092           } catch (InterruptedException ie) {
1093             lastException.set(new IOException("Interrupted while waiting for " +
1094                 "datanode to restart. " + nodes[restartingNodeIndex.get()]));
1095             streamerClosed = true;
1096             return false;
1097           }
1098         }
1099         boolean isRecovery = hasError;
1100         // remove bad datanode from list of datanodes.
1101         // If errorIndex was not set (i.e. appends), then do not remove
1102         // any datanodes
1103         //
1104         if (errorIndex >= 0) {
1105           StringBuilder pipelineMsg = new StringBuilder();
1106           for (int j = 0; j < nodes.length; j++) {
1107             pipelineMsg.append(nodes[j]);
1108             if (j < nodes.length - 1) {
1109               pipelineMsg.append(", ");
1110             }
1111           }
1112           if (nodes.length <= 1) {
1113             lastException.set(new IOException("All datanodes " + pipelineMsg
1114                 + " are bad. Aborting..."));
1115             streamerClosed = true;
1116             return false;
1117           }
1118           DFSClient.LOG.warn("Error Recovery for block " + block +
1119               " in pipeline " + pipelineMsg +
1120               ": bad datanode " + nodes[errorIndex]);
1121           failed.add(nodes[errorIndex]);
1122 
1123           DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
1124           arraycopy(nodes, newnodes, errorIndex);
1125 
1126           final StorageType[] newStorageTypes = new StorageType[newnodes.length];
1127           arraycopy(storageTypes, newStorageTypes, errorIndex);
1128 
1129           final String[] newStorageIDs = new String[newnodes.length];
1130           arraycopy(storageIDs, newStorageIDs, errorIndex);
1131 
1132           setPipeline(newnodes, newStorageTypes, newStorageIDs);
1133 
1134           // Just took care of a node error while waiting for a node restart
1135           if (restartingNodeIndex.get() >= 0) {
1136             // If the error came from a node further away than the restarting
1137             // node, the restart must have been complete.
1138             if (errorIndex > restartingNodeIndex.get()) {
1139               restartingNodeIndex.set(-1);
1140             } else if (errorIndex < restartingNodeIndex.get()) {
1141               // the node index has shifted.
1142               restartingNodeIndex.decrementAndGet();
1143             } else {
1144               // this shouldn't happen...
1145               assert false;
1146             }
1147           }
1148 
1149           if (restartingNodeIndex.get() == -1) {
1150             hasError = false;
1151           }
1152           lastException.set(null);
1153           errorIndex = -1;
1154         }
1155 
1156         // Check if replace-datanode policy is satisfied.
1157         if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(blockReplication,
1158             nodes, isAppend, isHflushed)) {
1159           try {
1160             addDatanode2ExistingPipeline();
1161           } catch(IOException ioe) {
1162             if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) {
1163               throw ioe;
1164             }
1165             DFSClient.LOG.warn("Failed to replace datanode."
1166                 + " Continue with the remaining datanodes since "
1167                 + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_BEST_EFFORT_KEY
1168                 + " is set to true.", ioe);
1169           }
1170         }
1171 
1172         // get a new generation stamp and an access token
1173         LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName);
1174         newGS = lb.getBlock().getGenerationStamp();
1175         accessToken = lb.getBlockToken();
1176 
1177         // set up the pipeline again with the remaining nodes
1178         if (failPacket) { // for testing
1179           success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
1180           failPacket = false;
1181           try {
1182             // Give DNs time to send in bad reports. In real situations,
1183             // good reports should follow bad ones, if client committed
1184             // with those nodes.
1185             Thread.sleep(2000);
1186           } catch (InterruptedException ie) {}
1187         } else {
1188           success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
1189         }
1190 
1191         if (restartingNodeIndex.get() >= 0) {
1192           assert hasError == true;
1193           // check errorIndex set above
1194           if (errorIndex == restartingNodeIndex.get()) {
1195             // ignore, if came from the restarting node
1196             errorIndex = -1;
1197           }
1198           // still within the deadline
1199           if (Time.monotonicNow() < restartDeadline) {
1200             continue; // still within the deadline
1201           }
1202           // expired. declare the restarting node dead
1203           restartDeadline = 0;
1204           int expiredNodeIndex = restartingNodeIndex.get();
1205           restartingNodeIndex.set(-1);
1206           DFSClient.LOG.warn("Datanode did not restart in time: " +
1207               nodes[expiredNodeIndex]);
1208           // Mark the restarting node as failed. If there is any other failed
1209           // node during the last pipeline construction attempt, it will not be
1210           // overwritten/dropped. In this case, the restarting node will get
1211           // excluded in the following attempt, if it still does not come up.
1212           if (errorIndex == -1) {
1213             errorIndex = expiredNodeIndex;
1214           }
1215           // From this point on, normal pipeline recovery applies.
1216         }
1217       } // while
1218 
1219       if (success) {
1220         // update pipeline at the namenode
1221         ExtendedBlock newBlock = new ExtendedBlock(
1222             block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
1223         dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
1224             nodes, storageIDs);
1225         // update client side generation stamp
1226         block = newBlock;
1227       }
1228       return false; // do not sleep, continue processing
1229     }
1230 
1231     /**
1232      * Open a DataOutputStream to a DataNode so that it can be written to.
1233      * This happens when a file is created and each time a new block is allocated.
1234      * Must get block ID and the IDs of the destinations from the namenode.
1235      * Returns the list of target datanodes.
1236      */
1237     private LocatedBlock nextBlockOutputStream() throws IOException {
1238       LocatedBlock lb = null;
1239       DatanodeInfo[] nodes = null;
1240       StorageType[] storageTypes = null;
1241       int count = dfsClient.getConf().nBlockWriteRetry;
1242       boolean success = false;
1243       ExtendedBlock oldBlock = block;
1244       do {
1245         hasError = false;
1246         lastException.set(null);
1247         errorIndex = -1;
1248         success = false;
1249 
1250         DatanodeInfo[] excluded =
1251             excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
1252             .keySet()
1253             .toArray(new DatanodeInfo[0]);
1254         block = oldBlock;
1255         lb = locateFollowingBlock(excluded.length > 0 ? excluded : null);
1256         block = lb.getBlock();
1257         block.setNumBytes(0);
1258         bytesSent = 0;
1259         accessToken = lb.getBlockToken();
1260         nodes = lb.getLocations();
1261         storageTypes = lb.getStorageTypes();
1262 
1263         //
1264         // Connect to first DataNode in the list.
1265         //
1266         success = createBlockOutputStream(nodes, storageTypes, 0L, false);
1267 
1268         if (!success) {
1269           DFSClient.LOG.info("Abandoning " + block);
1270           dfsClient.namenode.abandonBlock(block, fileId, src,
1271               dfsClient.clientName);
1272           block = null;
1273           DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]);
1274           excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
1275         }
1276       } while (!success && --count >= 0);
1277 
1278       if (!success) {
1279         throw new IOException("Unable to create new block.");
1280       }
1281       return lb;
1282     }
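    // A rough sketch of the retry behaviour above (assuming the default
    // configuration): nBlockWriteRetry maps to dfs.client.block.write.retries
    // (default 3), so up to four datanode sets are tried before giving up.
    // Each failed attempt abandons the block at the namenode and adds the
    // datanode blamed for the failure to the time-expiring excludedNodes
    // cache, so the next addBlock() call avoids it.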
1283 
1284     // connects to the first datanode in the pipeline
1285     // Returns true on success and false on failure.
1286     //
1287     private boolean createBlockOutputStream(DatanodeInfo[] nodes,
1288         StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) {
1289       if (nodes.length == 0) {
1290         DFSClient.LOG.info("nodes are empty for write pipeline of block "
1291             + block);
1292         return false;
1293       }
1294       Status pipelineStatus = SUCCESS;
1295       String firstBadLink = "";
1296       boolean checkRestart = false;
1297       if (DFSClient.LOG.isDebugEnabled()) {
1298         for (int i = 0; i < nodes.length; i++) {
1299           DFSClient.LOG.debug("pipeline = " + nodes[i]);
1300         }
1301       }
1302 
1303       // persist blocks on namenode on next flush
1304       persistBlocks.set(true);
1305 
1306       int refetchEncryptionKey = 1;
1307       while (true) {
1308         boolean result = false;
1309         DataOutputStream out = null;
1310         try {
1311           assert null == s : "Previous socket unclosed";
1312           assert null == blockReplyStream : "Previous blockReplyStream unclosed";
1313           s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
1314           long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
1315 
1316           OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
1317           InputStream unbufIn = NetUtils.getInputStream(s);
1318           IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
1319             unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
1320           unbufOut = saslStreams.out;
1321           unbufIn = saslStreams.in;
1322           out = new DataOutputStream(new BufferedOutputStream(unbufOut,
1323               HdfsConstants.SMALL_BUFFER_SIZE));
1324           blockReplyStream = new DataInputStream(unbufIn);
1325 
1326           //
1327           // Xmit header info to datanode
1328           //
1329 
1330           BlockConstructionStage bcs = recoveryFlag ? stage.getRecoveryStage() : stage;
1331 
1332           // We cannot change the block length in 'block' as it counts the number
1333           // of bytes ack'ed.
1334           ExtendedBlock blockCopy = new ExtendedBlock(block);
1335           blockCopy.setNumBytes(blockSize);
1336 
1337           boolean[] targetPinnings = getPinnings(nodes, true);
1338           // send the request
1339           new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
1340               dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
1341               nodes.length, block.getNumBytes(), bytesSent, newGS,
1342               checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
1343             (targetPinnings == null ? false : targetPinnings[0]), targetPinnings);
1344 
1345           // receive ack for connect
1346           BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
1347               PBHelper.vintPrefixed(blockReplyStream));
1348           pipelineStatus = resp.getStatus();
1349           firstBadLink = resp.getFirstBadLink();
1350 
1351           // Got a restart OOB ack.
1352           // If a node is already restarting, this status is not likely from
1353           // the same node. If it is from a different node, it is not
1354           // from the local datanode. Thus it is safe to treat this as a
1355           // regular node error.
1356           if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
1357             restartingNodeIndex.get() == -1) {
1358             checkRestart = true;
1359             throw new IOException("A datanode is restarting.");
1360           }
1361 
1362           String logInfo = "ack with firstBadLink as " + firstBadLink;
1363           DataTransferProtoUtil.checkBlockOpStatus(resp, logInfo);
1364 
1365           assert null == blockStream : "Previous blockStream unclosed";
1366           blockStream = out;
1367           result =  true; // success
1368           restartingNodeIndex.set(-1);
1369           hasError = false;
1370         } catch (IOException ie) {
1371           if (restartingNodeIndex.get() == -1) {
1372             DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
1373           }
1374           if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
1375             DFSClient.LOG.info("Will fetch a new encryption key and retry, "
1376                 + "encryption key was invalid when connecting to "
1377                 + nodes[0] + " : " + ie);
1378             // The encryption key used is invalid.
1379             refetchEncryptionKey--;
1380             dfsClient.clearDataEncryptionKey();
1381             // Don't close the socket/exclude this node just yet. Try again with
1382             // a new encryption key.
1383             continue;
1384           }
1385 
1386           // find the datanode that matches
1387           if (firstBadLink.length() != 0) {
1388             for (int i = 0; i < nodes.length; i++) {
1389               // NB: Unconditionally using the xfer addr w/o hostname
1390               if (firstBadLink.equals(nodes[i].getXferAddr())) {
1391                 errorIndex = i;
1392                 break;
1393               }
1394             }
1395           } else {
1396             assert checkRestart == false;
1397             errorIndex = 0;
1398           }
1399           // Check whether there is a restart worth waiting for.
1400           if (checkRestart && shouldWaitForRestart(errorIndex)) {
1401             restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
1402                 Time.monotonicNow();
1403             restartingNodeIndex.set(errorIndex);
1404             errorIndex = -1;
1405             DFSClient.LOG.info("Waiting for the datanode to be restarted: " +
1406                 nodes[restartingNodeIndex.get()]);
1407           }
1408           hasError = true;
1409           setLastException(ie);
1410           result =  false;  // error
1411         } finally {
1412           if (!result) {
1413             IOUtils.closeSocket(s);
1414             s = null;
1415             IOUtils.closeStream(out);
1416             out = null;
1417             IOUtils.closeStream(blockReplyStream);
1418             blockReplyStream = null;
1419           }
1420         }
1421         return result;
1422       }
1423     }
1424 
1425     private boolean[] getPinnings(DatanodeInfo[] nodes, boolean shouldLog) {
1426       if (favoredNodes == null) {
1427         return null;
1428       } else {
1429         boolean[] pinnings = new boolean[nodes.length];
1430         HashSet<String> favoredSet =
1431             new HashSet<String>(Arrays.asList(favoredNodes));
1432         for (int i = 0; i < nodes.length; i++) {
1433           pinnings[i] = favoredSet.remove(nodes[i].getXferAddrWithHostname());
1434           if (DFSClient.LOG.isDebugEnabled()) {
1435             DFSClient.LOG.debug(nodes[i].getXferAddrWithHostname() +
1436                 " was chosen by name node (favored=" + pinnings[i] +
1437                 ").");
1438           }
1439         }
1440         if (shouldLog && !favoredSet.isEmpty()) {
1441           // One or more favored nodes were not allocated.
1442           DFSClient.LOG.warn(
1443               "These favored nodes were specified but not chosen: " +
1444               favoredSet +
1445               " Specified favored nodes: " + Arrays.toString(favoredNodes));
1446 
1447         }
1448         return pinnings;
1449       }
1450     }
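    // Illustrative example (hostnames are hypothetical): if favoredNodes is
    // {"dn1.example.com:1004"} and the namenode allocated
    // [dn1.example.com:1004, dn2.example.com:1004], pinnings would be
    // [true, false] and no warning is logged, because matching is done on the
    // transfer address including the hostname and every favored node was found.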
1451 
1452     private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)  throws IOException {
1453       int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
1454       long sleeptime = 400;
1455       while (true) {
1456         long localstart = Time.monotonicNow();
1457         while (true) {
1458           try {
1459             return dfsClient.namenode.addBlock(src, dfsClient.clientName,
1460                 block, excludedNodes, fileId, favoredNodes);
1461           } catch (RemoteException e) {
1462             IOException ue =
1463               e.unwrapRemoteException(FileNotFoundException.class,
1464                                       AccessControlException.class,
1465                                       NSQuotaExceededException.class,
1466                                       DSQuotaExceededException.class,
1467                                       UnresolvedPathException.class);
1468             if (ue != e) {
1469               throw ue; // no need to retry these exceptions
1470             }
1471 
1472 
1473             if (NotReplicatedYetException.class.getName().
1474                 equals(e.getClassName())) {
1475               if (retries == 0) {
1476                 throw e;
1477               } else {
1478                 --retries;
1479                 DFSClient.LOG.info("Exception while adding a block", e);
1480                 long elapsed = Time.monotonicNow() - localstart;
1481                 if (elapsed > 5000) {
1482                   DFSClient.LOG.info("Waiting for replication for "
1483                       + (elapsed / 1000) + " seconds");
1484                 }
1485                 try {
1486                   DFSClient.LOG.warn("NotReplicatedYetException sleeping " + src
1487                       + " retries left " + retries);
1488                   Thread.sleep(sleeptime);
1489                   sleeptime *= 2;
1490                 } catch (InterruptedException ie) {
1491                   DFSClient.LOG.warn("Caught exception ", ie);
1492                 }
1493               }
1494             } else {
1495               throw e;
1496             }
1497 
1498           }
1499         }
1500       }
1501     }
1502 
1503     ExtendedBlock getBlock() {
1504       return block;
1505     }
1506 
1507     DatanodeInfo[] getNodes() {
1508       return nodes;
1509     }
1510 
1511     Token<BlockTokenIdentifier> getBlockToken() {
1512       return accessToken;
1513     }
1514 
1515     private void setLastException(IOException e) {
1516       lastException.compareAndSet(null, e);
1517     }
1518   }
1519 
1520   /**
1521    * Create a socket for a write pipeline
1522    * @param first the first datanode
1523    * @param length the pipeline length
1524    * @param client client
1525    * @return the socket connected to the first datanode
1526    */
1527   static Socket createSocketForPipeline(final DatanodeInfo first,
1528       final int length, final DFSClient client) throws IOException {
1529     final String dnAddr = first.getXferAddr(
1530         client.getConf().connectToDnViaHostname);
1531     if (DFSClient.LOG.isDebugEnabled()) {
1532       DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
1533     }
1534     final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr);
1535     final Socket sock = client.socketFactory.createSocket();
1536     final int timeout = client.getDatanodeReadTimeout(length);
1537     NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), client.getConf().socketTimeout);
1538     sock.setSoTimeout(timeout);
1539     sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
1540     if (DFSClient.LOG.isDebugEnabled()) {
1541       DFSClient.LOG.debug("Send buf size " + sock.getSendBufferSize());
1542     }
1543     return sock;
1544   }
1545 
1546   @Override
1547   protected void checkClosed() throws IOException {
1548     if (isClosed()) {
1549       IOException e = lastException.get();
1550       throw e != null ? e : new ClosedChannelException();
1551     }
1552   }
1553 
1554   //
1555   // returns the list of targets, if any, that are currently being used.
1556   //
1557   @VisibleForTesting
1558   public synchronized DatanodeInfo[] getPipeline() {
1559     if (streamer == null) {
1560       return null;
1561     }
1562     DatanodeInfo[] currentNodes = streamer.getNodes();
1563     if (currentNodes == null) {
1564       return null;
1565     }
1566     DatanodeInfo[] value = new DatanodeInfo[currentNodes.length];
1567     for (int i = 0; i < currentNodes.length; i++) {
1568       value[i] = currentNodes[i];
1569     }
1570     return value;
1571   }
1572 
1573   /**
1574    * @return the object for computing checksum.
1575    *         The type is NULL if checksum is not computed.
1576    */
1577   private static DataChecksum getChecksum4Compute(DataChecksum checksum,
1578       HdfsFileStatus stat) {
1579     if (isLazyPersist(stat) && stat.getReplication() == 1) {
1580       // do not compute checksum for writing to single replica to memory
1581       return DataChecksum.newDataChecksum(Type.NULL,
1582           checksum.getBytesPerChecksum());
1583     }
1584     return checksum;
1585   }
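  // Example of the rule above: a file written with the in-memory
  // (LAZY_PERSIST) storage policy and a replication factor of 1 gets a
  // Type.NULL checksum, so no CRCs are computed for the single memory replica;
  // every other file keeps the checksum supplied by the caller.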
1586 
1587   private DFSOutputStream(DFSClient dfsClient, String src, Progressable progress,
1588       HdfsFileStatus stat, DataChecksum checksum) throws IOException {
1589     super(getChecksum4Compute(checksum, stat));
1590     this.dfsClient = dfsClient;
1591     this.src = src;
1592     this.fileId = stat.getFileId();
1593     this.blockSize = stat.getBlockSize();
1594     this.blockReplication = stat.getReplication();
1595     this.fileEncryptionInfo = stat.getFileEncryptionInfo();
1596     this.progress = progress;
1597     this.cachingStrategy = new AtomicReference<CachingStrategy>(
1598         dfsClient.getDefaultWriteCachingStrategy());
1599     if ((progress != null) && DFSClient.LOG.isDebugEnabled()) {
1600       DFSClient.LOG.debug(
1601           "Set non-null progress callback on DFSOutputStream " + src);
1602     }
1603 
1604     this.bytesPerChecksum = checksum.getBytesPerChecksum();
1605     if (bytesPerChecksum <= 0) {
1606       throw new HadoopIllegalArgumentException(
1607           "Invalid value: bytesPerChecksum = " + bytesPerChecksum + " <= 0");
1608     }
1609     if (blockSize % bytesPerChecksum != 0) {
1610       throw new HadoopIllegalArgumentException("Invalid values: "
1611           + DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY + " (=" + bytesPerChecksum
1612           + ") must divide block size (=" + blockSize + ").");
1613     }
1614     this.checksum4WriteBlock = checksum;
1615 
1616     this.dfsclientSlowLogThresholdMs =
1617       dfsClient.getConf().dfsclientSlowIoWarningThresholdMs;
1618     this.byteArrayManager = dfsClient.getClientContext().getByteArrayManager();
1619   }
1620 
1621   /** Construct a new output stream for creating a file. */
1622   private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
1623       EnumSet<CreateFlag> flag, Progressable progress,
1624       DataChecksum checksum, String[] favoredNodes) throws IOException {
1625     this(dfsClient, src, progress, stat, checksum);
1626     this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
1627 
1628     computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
1629 
1630     streamer = new DataStreamer(stat, null);
1631     if (favoredNodes != null && favoredNodes.length != 0) {
1632       streamer.setFavoredNodes(favoredNodes);
1633     }
1634   }
1635 
1636   static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
1637       FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
1638       short replication, long blockSize, Progressable progress, int buffersize,
1639       DataChecksum checksum, String[] favoredNodes) throws IOException {
1640     TraceScope scope =
1641         dfsClient.getPathTraceScope("newStreamForCreate", src);
1642     try {
1643       HdfsFileStatus stat = null;
1644 
1645       // Retry the create if we get a RetryStartFileException up to a maximum
1646       // number of times
1647       boolean shouldRetry = true;
1648       int retryCount = CREATE_RETRY_COUNT;
1649       while (shouldRetry) {
1650         shouldRetry = false;
1651         try {
1652           stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
1653               new EnumSetWritable<CreateFlag>(flag), createParent, replication,
1654               blockSize, SUPPORTED_CRYPTO_VERSIONS);
1655           break;
1656         } catch (RemoteException re) {
1657           IOException e = re.unwrapRemoteException(
1658               AccessControlException.class,
1659               DSQuotaExceededException.class,
1660               FileAlreadyExistsException.class,
1661               FileNotFoundException.class,
1662               ParentNotDirectoryException.class,
1663               NSQuotaExceededException.class,
1664               RetryStartFileException.class,
1665               SafeModeException.class,
1666               UnresolvedPathException.class,
1667               SnapshotAccessControlException.class,
1668               UnknownCryptoProtocolVersionException.class);
1669           if (e instanceof RetryStartFileException) {
1670             if (retryCount > 0) {
1671               shouldRetry = true;
1672               retryCount--;
1673             } else {
1674               throw new IOException("Too many retries because of encryption" +
1675                   " zone operations", e);
1676             }
1677           } else {
1678             throw e;
1679           }
1680         }
1681       }
1682       Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
1683       final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
1684           flag, progress, checksum, favoredNodes);
1685       out.start();
1686       return out;
1687     } finally {
1688       scope.close();
1689     }
1690   }
1691 
1692   /** Construct a new output stream for append. */
1693   private DFSOutputStream(DFSClient dfsClient, String src,
1694       EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
1695       HdfsFileStatus stat, DataChecksum checksum) throws IOException {
1696     this(dfsClient, src, progress, stat, checksum);
1697     initialFileSize = stat.getLen(); // length of file when opened
1698     this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK);
1699 
1700     boolean toNewBlock = flags.contains(CreateFlag.NEW_BLOCK);
1701 
1702     // The last partial block of the file has to be filled.
1703     if (!toNewBlock && lastBlock != null) {
1704       // indicate that we are appending to an existing block
1705       bytesCurBlock = lastBlock.getBlockSize();
1706       streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum);
1707     } else {
1708       computePacketChunkSize(dfsClient.getConf().writePacketSize,
1709           bytesPerChecksum);
1710       streamer = new DataStreamer(stat,
1711           lastBlock != null ? lastBlock.getBlock() : null);
1712     }
1713     this.fileEncryptionInfo = stat.getFileEncryptionInfo();
1714   }
1715 
1716   static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
1717       EnumSet<CreateFlag> flags, int bufferSize, Progressable progress,
1718       LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum,
1719       String[] favoredNodes) throws IOException {
1720     TraceScope scope =
1721         dfsClient.getPathTraceScope("newStreamForAppend", src);
1722     try {
1723       final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags,
1724           progress, lastBlock, stat, checksum);
1725       if (favoredNodes != null && favoredNodes.length != 0) {
1726         out.streamer.setFavoredNodes(favoredNodes);
1727       }
1728       out.start();
1729       return out;
1730     } finally {
1731       scope.close();
1732     }
1733   }
1734 
1735   private static boolean isLazyPersist(HdfsFileStatus stat) {
1736     final BlockStoragePolicy p = blockStoragePolicySuite.getPolicy(
1737         HdfsConstants.MEMORY_STORAGE_POLICY_NAME);
1738     return p != null && stat.getStoragePolicy() == p.getId();
1739   }
1740 
1741   private void computePacketChunkSize(int psize, int csize) {
1742     final int bodySize = psize - PacketHeader.PKT_MAX_HEADER_LEN;
1743     final int chunkSize = csize + getChecksumSize();
1744     chunksPerPacket = Math.max(bodySize/chunkSize, 1);
1745     packetSize = chunkSize*chunksPerPacket;
1746     if (DFSClient.LOG.isDebugEnabled()) {
1747       DFSClient.LOG.debug("computePacketChunkSize: src=" + src +
1748                 ", chunkSize=" + chunkSize +
1749                 ", chunksPerPacket=" + chunksPerPacket +
1750                 ", packetSize=" + packetSize);
1751     }
1752   }
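  // A worked example of the sizing above (figures are illustrative and assume
  // the common defaults of a 64K write packet, 512-byte chunks and 4-byte
  // checksums): chunkSize = 512 + 4 = 516 and bodySize = 65536 minus the
  // maximum packet header length, which yields chunksPerPacket = 126 and
  // packetSize = 126 * 516 = 65016 bytes of data plus checksums per packet.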
1753 
1754   private void queueCurrentPacket() {
1755     synchronized (dataQueue) {
1756       if (currentPacket == null) return;
1757       currentPacket.addTraceParent(Trace.currentSpan());
1758       dataQueue.addLast(currentPacket);
1759       lastQueuedSeqno = currentPacket.getSeqno();
1760       if (DFSClient.LOG.isDebugEnabled()) {
1761         DFSClient.LOG.debug("Queued packet " + currentPacket.getSeqno());
1762       }
1763       currentPacket = null;
1764       dataQueue.notifyAll();
1765     }
1766   }
1767 
1768   private void waitAndQueueCurrentPacket() throws IOException {
1769     synchronized (dataQueue) {
1770       try {
1771       // If queue is full, then wait till we have enough space
1772         boolean firstWait = true;
1773         try {
1774           while (!isClosed() && dataQueue.size() + ackQueue.size() >
1775               dfsClient.getConf().writeMaxPackets) {
1776             if (firstWait) {
1777               Span span = Trace.currentSpan();
1778               if (span != null) {
1779                 span.addTimelineAnnotation("dataQueue.wait");
1780               }
1781               firstWait = false;
1782             }
1783             try {
1784               dataQueue.wait();
1785             } catch (InterruptedException e) {
1786               // If we get interrupted while waiting to queue data, we still need to get rid
1787               // of the current packet. This is because we have an invariant that if
1788               // currentPacket gets full, it will get queued before the next writeChunk.
1789               //
1790               // Rather than wait around for space in the queue, we should instead try to
1791               // return to the caller as soon as possible, even though we slightly overrun
1792               // the MAX_PACKETS length.
1793               Thread.currentThread().interrupt();
1794               break;
1795             }
1796           }
1797         } finally {
1798           Span span = Trace.currentSpan();
1799           if ((span != null) && (!firstWait)) {
1800             span.addTimelineAnnotation("end.wait");
1801           }
1802         }
1803         checkClosed();
1804         queueCurrentPacket();
1805       } catch (ClosedChannelException e) {
1806       }
1807     }
1808   }
1809 
1810   // @see FSOutputSummer#writeChunk()
1811   @Override
1812   protected synchronized void writeChunk(byte[] b, int offset, int len,
1813       byte[] checksum, int ckoff, int cklen) throws IOException {
1814     TraceScope scope =
1815         dfsClient.getPathTraceScope("DFSOutputStream#writeChunk", src);
1816     try {
1817       writeChunkImpl(b, offset, len, checksum, ckoff, cklen);
1818     } finally {
1819       scope.close();
1820     }
1821   }
1822 
1823   private synchronized void writeChunkImpl(byte[] b, int offset, int len,
1824           byte[] checksum, int ckoff, int cklen) throws IOException {
1825     dfsClient.checkOpen();
1826     checkClosed();
1827 
1828     if (len > bytesPerChecksum) {
1829       throw new IOException("writeChunk() buffer size " + len +
1830                             " is larger than the supported bytesPerChecksum " +
1831                             bytesPerChecksum);
1832     }
1833     if (cklen != 0 && cklen != getChecksumSize()) {
1834       throw new IOException("writeChunk() checksum size is supposed to be " +
1835                             getChecksumSize() + " but found to be " + cklen);
1836     }
1837 
1838     if (currentPacket == null) {
1839       currentPacket = createPacket(packetSize, chunksPerPacket,
1840           bytesCurBlock, currentSeqno++, false);
1841       if (DFSClient.LOG.isDebugEnabled()) {
1842         DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
1843             currentPacket.getSeqno() +
1844             ", src=" + src +
1845             ", packetSize=" + packetSize +
1846             ", chunksPerPacket=" + chunksPerPacket +
1847             ", bytesCurBlock=" + bytesCurBlock);
1848       }
1849     }
1850 
1851     currentPacket.writeChecksum(checksum, ckoff, cklen);
1852     currentPacket.writeData(b, offset, len);
1853     currentPacket.incNumChunks();
1854     bytesCurBlock += len;
1855 
1856     // If packet is full, enqueue it for transmission
1857     //
1858     if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
1859         bytesCurBlock == blockSize) {
1860       if (DFSClient.LOG.isDebugEnabled()) {
1861         DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
1862             currentPacket.getSeqno() +
1863             ", src=" + src +
1864             ", bytesCurBlock=" + bytesCurBlock +
1865             ", blockSize=" + blockSize +
1866             ", appendChunk=" + appendChunk);
1867       }
1868       waitAndQueueCurrentPacket();
1869 
1870       // If the reopened file did not end at a chunk boundary and the above
1871       // write filled up its partial chunk, tell the summer to generate full
1872       // crc chunks from now on.
1873       if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
1874         appendChunk = false;
1875         resetChecksumBufSize();
1876       }
1877 
1878       if (!appendChunk) {
1879         int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);
1880         computePacketChunkSize(psize, bytesPerChecksum);
1881       }
1882       //
1883       // if encountering a block boundary, send an empty packet to
1884       // indicate the end of block and reset bytesCurBlock.
1885       //
1886       if (bytesCurBlock == blockSize) {
1887         currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
1888         currentPacket.setSyncBlock(shouldSyncBlock);
1889         waitAndQueueCurrentPacket();
1890         bytesCurBlock = 0;
1891         lastFlushOffset = 0;
1892       }
1893     }
1894   }
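  // In rough terms (a sketch, not a guarantee): with the default sizes above a
  // packet is queued after about 126 chunks (~64K of user data), and when
  // bytesCurBlock reaches blockSize an extra empty packet marking the end of
  // the block is queued and the per-block counters are reset to zero.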
1895 
1896   @Deprecated
1897   public void sync() throws IOException {
1898     hflush();
1899   }
1900 
1901   /**
1902    * Flushes out to all replicas of the block. The data is in the buffers
1903    * of the DNs but not necessarily in the DNs' OS buffers.
1904    *
1905    * It is a synchronous operation. When it returns,
1906    * it guarantees that flushed data become visible to new readers.
1907    * It is not guaranteed that data has been flushed to
1908    * persistent store on the datanode.
1909    * Block allocations are persisted on namenode.
1910    */
1911   @Override
1912   public void hflush() throws IOException {
1913     TraceScope scope =
1914         dfsClient.getPathTraceScope("hflush", src);
1915     try {
1916       flushOrSync(false, EnumSet.noneOf(SyncFlag.class));
1917     } finally {
1918       scope.close();
1919     }
1920   }
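  // Hedged usage sketch (the stream and data below are hypothetical):
  //   out.write(data);
  //   out.hflush();
  // guarantees that a new reader sees the written bytes, but the data can
  // still be lost if every datanode in the pipeline fails before writing it
  // to disk; use hsync() when durability is required.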
1921 
1922   @Override
1923   public void hsync() throws IOException {
1924     TraceScope scope =
1925         dfsClient.getPathTraceScope("hsync", src);
1926     try {
1927       flushOrSync(true, EnumSet.noneOf(SyncFlag.class));
1928     } finally {
1929       scope.close();
1930     }
1931   }
1932 
1933   /**
1934    * The expected semantics are that all data has been flushed out to all replicas
1935    * and all replicas have done the equivalent of a posix fsync - i.e. the OS has
1936    * flushed it to the disk device (but the disk may still have it in its cache).
1937    *
1938    * Note that only the current block is flushed to the disk device.
1939    * To guarantee durable sync across block boundaries the stream should
1940    * be created with {@link CreateFlag#SYNC_BLOCK}.
1941    *
1942    * @param syncFlags
1943    *          Indicate the semantic of the sync. Currently used to specify
1944    *          whether or not to update the block length in NameNode.
1945    */
1946   public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException {
1947     TraceScope scope =
1948         dfsClient.getPathTraceScope("hsync", src);
1949     try {
1950       flushOrSync(true, syncFlags);
1951     } finally {
1952       scope.close();
1953     }
1954   }
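  // Hedged usage sketch (variable names are illustrative): callers typically
  // reach this overload through HdfsDataOutputStream, e.g.
  //   ((HdfsDataOutputStream) fsDataOut).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
  // which also records the new file length on the namenode via an fsync RPC
  // on every call, at the cost of that extra round trip.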
1955 
1956   /**
1957    * Flush/Sync buffered data to DataNodes.
1958    *
1959    * @param isSync
1960    *          Whether or not to require all replicas to flush data to the disk
1961    *          device
1962    * @param syncFlags
1963    *          Indicate extra detailed semantic of the flush/sync. Currently
1964    *          mainly used to specify whether or not to update the file length in
1965    *          the NameNode
1966    * @throws IOException
1967    */
1968   private void flushOrSync(boolean isSync, EnumSet<SyncFlag> syncFlags)
1969       throws IOException {
1970     dfsClient.checkOpen();
1971     checkClosed();
1972     try {
1973       long toWaitFor;
1974       long lastBlockLength = -1L;
1975       boolean updateLength = syncFlags.contains(SyncFlag.UPDATE_LENGTH);
1976       boolean endBlock = syncFlags.contains(SyncFlag.END_BLOCK);
1977       synchronized (this) {
1978         // flush checksum buffer, but keep checksum buffer intact if we do not
1979         // need to end the current block
1980         int numKept = flushBuffer(!endBlock, true);
1981         // bytesCurBlock potentially incremented if there was buffered data
1982 
1983         if (DFSClient.LOG.isDebugEnabled()) {
1984           DFSClient.LOG.debug("DFSClient flush():"
1985               + " bytesCurBlock=" + bytesCurBlock
1986               + " lastFlushOffset=" + lastFlushOffset
1987               + " createNewBlock=" + endBlock);
1988         }
1989         // Flush only if we haven't already flushed till this offset.
1990         if (lastFlushOffset != bytesCurBlock) {
1991           assert bytesCurBlock > lastFlushOffset;
1992           // record the valid offset of this flush
1993           lastFlushOffset = bytesCurBlock;
1994           if (isSync && currentPacket == null && !endBlock) {
1995             // Nothing to send right now,
1996             // but sync was requested.
1997             // Send an empty packet if we do not end the block right now
1998             currentPacket = createPacket(packetSize, chunksPerPacket,
1999                 bytesCurBlock, currentSeqno++, false);
2000           }
2001         } else {
2002           if (isSync && bytesCurBlock > 0 && !endBlock) {
2003             // Nothing to send right now,
2004             // and the block was partially written,
2005             // and sync was requested.
2006             // So send an empty sync packet if we do not end the block right now
2007             currentPacket = createPacket(packetSize, chunksPerPacket,
2008                 bytesCurBlock, currentSeqno++, false);
2009           } else if (currentPacket != null) {
2010             // just discard the current packet since it has already been sent.
2011             currentPacket.releaseBuffer(byteArrayManager);
2012             currentPacket = null;
2013           }
2014         }
2015         if (currentPacket != null) {
2016           currentPacket.setSyncBlock(isSync);
2017           waitAndQueueCurrentPacket();
2018         }
2019         if (endBlock && bytesCurBlock > 0) {
2020           // Need to end the current block, thus send an empty packet to
2021           // indicate this is the end of the block and reset bytesCurBlock
2022           currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
2023           currentPacket.setSyncBlock(shouldSyncBlock || isSync);
2024           waitAndQueueCurrentPacket();
2025           bytesCurBlock = 0;
2026           lastFlushOffset = 0;
2027         } else {
2028           // Restore state of stream. Record the last flush offset
2029           // of the last full chunk that was flushed.
2030           bytesCurBlock -= numKept;
2031         }
2032 
2033         toWaitFor = lastQueuedSeqno;
2034       } // end synchronized
2035 
2036       waitForAckedSeqno(toWaitFor);
2037 
2038       // look up the acked length of the current block; it may be needed for the fsync call below
2039       if (updateLength || persistBlocks.get()) {
2040         synchronized (this) {
2041           if (streamer != null && streamer.block != null) {
2042             lastBlockLength = streamer.block.getNumBytes();
2043           }
2044         }
2045       }
2046       // If 1) any new blocks were allocated since the last flush, or 2) the
2047       // length needs to be updated in the NN, then persist the block locations
2048       // on the namenode.
2049       if (persistBlocks.getAndSet(false) || updateLength) {
2050         try {
2051           dfsClient.namenode.fsync(src, fileId, dfsClient.clientName,
2052               lastBlockLength);
2053         } catch (IOException ioe) {
2054           DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe);
2055           // If we got an error here, it might be because some other thread called
2056           // close before our hflush completed. In that case, we should throw an
2057           // exception that the stream is closed.
2058           checkClosed();
2059           // If we aren't closed but failed to sync, we should expose that to the
2060           // caller.
2061           throw ioe;
2062         }
2063       }
2064 
2065       synchronized(this) {
2066         if (streamer != null) {
2067           streamer.setHflush();
2068         }
2069       }
2070     } catch (InterruptedIOException interrupt) {
2071       // This kind of error doesn't mean that the stream itself is broken - just the
2072       // flushing thread got interrupted. So, we shouldn't close down the writer,
2073       // but instead just propagate the error
2074       throw interrupt;
2075     } catch (IOException e) {
2076       DFSClient.LOG.warn("Error while syncing", e);
2077       synchronized (this) {
2078         if (!isClosed()) {
2079           lastException.set(new IOException("IOException flush: " + e));
2080           closeThreads(true);
2081         }
2082       }
2083       throw e;
2084     }
2085   }
2086 
2087   /**
2088    * @deprecated use {@link HdfsDataOutputStream#getCurrentBlockReplication()}.
2089    */
2090   @Deprecated
2091   public synchronized int getNumCurrentReplicas() throws IOException {
2092     return getCurrentBlockReplication();
2093   }
2094 
2095   /**
2096    * Note that this is not a public API;
2097    * use {@link HdfsDataOutputStream#getCurrentBlockReplication()} instead.
2098    *
2099    * @return the number of valid replicas of the current block
2100    */
2101   public synchronized int getCurrentBlockReplication() throws IOException {
2102     dfsClient.checkOpen();
2103     checkClosed();
2104     if (streamer == null) {
2105       return blockReplication; // no pipeline, return repl factor of file
2106     }
2107     DatanodeInfo[] currentNodes = streamer.getNodes();
2108     if (currentNodes == null) {
2109       return blockReplication; // no pipeline, return repl factor of file
2110     }
2111     return currentNodes.length;
2112   }
2113 
2114   /**
2115    * Waits till all existing data is flushed and confirmations
2116    * received from datanodes.
2117    */
2118   private void flushInternal() throws IOException {
2119     long toWaitFor;
2120     synchronized (this) {
2121       dfsClient.checkOpen();
2122       checkClosed();
2123       //
2124       // If there is data in the current buffer, send it across
2125       //
2126       queueCurrentPacket();
2127       toWaitFor = lastQueuedSeqno;
2128     }
2129 
2130     waitForAckedSeqno(toWaitFor);
2131   }
2132 
2133   private void waitForAckedSeqno(long seqno) throws IOException {
2134     TraceScope scope = Trace.startSpan("waitForAckedSeqno", Sampler.NEVER);
2135     try {
2136       if (DFSClient.LOG.isDebugEnabled()) {
2137         DFSClient.LOG.debug("Waiting for ack for: " + seqno);
2138       }
2139       long begin = Time.monotonicNow();
2140       try {
2141         synchronized (dataQueue) {
2142           while (!isClosed()) {
2143             checkClosed();
2144             if (lastAckedSeqno >= seqno) {
2145               break;
2146             }
2147             try {
2148               dataQueue.wait(1000); // when we receive an ack, we notify on
2149               // dataQueue
2150             } catch (InterruptedException ie) {
2151               throw new InterruptedIOException(
2152                   "Interrupted while waiting for data to be acknowledged by pipeline");
2153             }
2154           }
2155         }
2156         checkClosed();
2157       } catch (ClosedChannelException e) {
2158       }
2159       long duration = Time.monotonicNow() - begin;
2160       if (duration > dfsclientSlowLogThresholdMs) {
2161         DFSClient.LOG.warn("Slow waitForAckedSeqno took " + duration
2162             + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)");
2163       }
2164     } finally {
2165       scope.close();
2166     }
2167   }
2168 
2169   private synchronized void start() {
2170     streamer.start();
2171   }
2172 
2173   /**
2174    * Aborts this output stream and releases any system
2175    * resources associated with this stream.
2176    */
2177   void abort() throws IOException {
2178     synchronized (this) {
2179       if (isClosed()) {
2180         return;
2181       }
2182       streamer.setLastException(new IOException("Lease timeout of "
2183           + (dfsClient.getHdfsTimeout() / 1000) + " seconds expired."));
2184       closeThreads(true);
2185     }
2186     dfsClient.endFileLease(fileId);
2187   }
2188 
2189   boolean isClosed() {
2190     return closed;
2191   }
2192 
2193   void setClosed() {
2194     closed = true;
2195     synchronized (dataQueue) {
2196       releaseBuffer(dataQueue, byteArrayManager);
2197       releaseBuffer(ackQueue, byteArrayManager);
2198     }
2199   }
2200 
2201   private static void releaseBuffer(List<DFSPacket> packets, ByteArrayManager bam) {
2202     for (DFSPacket p : packets) {
2203       p.releaseBuffer(bam);
2204     }
2205     packets.clear();
2206   }
2207 
2208   // shutdown datastreamer and responseprocessor threads.
2209   // interrupt datastreamer if force is true
2210   private void closeThreads(boolean force) throws IOException {
2211     try {
2212       streamer.close(force);
2213       streamer.join();
2214       if (s != null) {
2215         s.close();
2216       }
2217     } catch (InterruptedException e) {
2218       throw new IOException("Failed to shutdown streamer");
2219     } finally {
2220       streamer = null;
2221       s = null;
2222       setClosed();
2223     }
2224   }
2225 
2226   /**
2227    * Closes this output stream and releases any system
2228    * resources associated with this stream.
2229    */
2230   @Override
2231   public void close() throws IOException {
2232     synchronized (this) {
2233       TraceScope scope = dfsClient.getPathTraceScope("DFSOutputStream#close",
2234           src);
2235       try {
2236         closeImpl();
2237       } finally {
2238         scope.close();
2239       }
2240     }
2241     dfsClient.endFileLease(fileId);
2242   }
2243 
2244   private synchronized void closeImpl() throws IOException {
2245     if (isClosed()) {
2246       IOException e = lastException.getAndSet(null);
2247       if (e == null)
2248         return;
2249       else
2250         throw e;
2251     }
2252 
2253     try {
2254       flushBuffer();       // flush from all upper layers
2255 
2256       if (currentPacket != null) {
2257         waitAndQueueCurrentPacket();
2258       }
2259 
2260       if (bytesCurBlock != 0) {
2261         // send an empty packet to mark the end of the block
2262         currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
2263         currentPacket.setSyncBlock(shouldSyncBlock);
2264       }
2265 
2266       flushInternal();             // flush all data to Datanodes
2267       // get last block before destroying the streamer
2268       ExtendedBlock lastBlock = streamer.getBlock();
2269       closeThreads(false);
2270       TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
2271       try {
2272         completeFile(lastBlock);
2273       } finally {
2274         scope.close();
2275       }
2276     } catch (ClosedChannelException e) {
2277     } finally {
2278       setClosed();
2279     }
2280   }
2281 
2282   // should be called holding (this) lock since setTestFilename() may
2283   // be called during unit tests
2284   private void completeFile(ExtendedBlock last) throws IOException {
2285     long localstart = Time.monotonicNow();
2286     long localTimeout = 400;
2287     boolean fileComplete = false;
2288     int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
2289     while (!fileComplete) {
2290       fileComplete =
2291           dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId);
2292       if (!fileComplete) {
2293         final int hdfsTimeout = dfsClient.getHdfsTimeout();
2294         if (!dfsClient.clientRunning
2295             || (hdfsTimeout > 0
2296                 && localstart + hdfsTimeout < Time.monotonicNow())) {
2297             String msg = "Unable to close file because dfsclient" +
2298                           " was unable to contact the HDFS servers." +
2299                           " clientRunning " + dfsClient.clientRunning +
2300                           " hdfsTimeout " + hdfsTimeout;
2301             DFSClient.LOG.info(msg);
2302             throw new IOException(msg);
2303         }
2304         try {
2305           if (retries == 0) {
2306             throw new IOException("Unable to close file because the last block"
2307                 + " does not have enough replicas.");
2308           }
2309           retries--;
2310           Thread.sleep(localTimeout);
2311           localTimeout *= 2;
2312           if (Time.monotonicNow() - localstart > 5000) {
2313             DFSClient.LOG.info("Could not complete " + src + " retrying...");
2314           }
2315         } catch (InterruptedException ie) {
2316           DFSClient.LOG.warn("Caught exception ", ie);
2317         }
2318       }
2319     }
2320   }
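  // Retry cadence of the loop above, for reference: the sleep starts at 400 ms
  // and doubles after every unsuccessful complete() call, and the retry budget
  // comes from dfs.client.block.write.locateFollowingBlock.retries (assumed
  // default of 5), so the client waits roughly 400+800+1600+3200+6400 ms in
  // total before failing with the "not enough replicas" error.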
2321 
2322   @VisibleForTesting
2323   public void setArtificialSlowdown(long period) {
2324     artificialSlowdown = period;
2325   }
2326 
2327   @VisibleForTesting
2328   public synchronized void setChunksPerPacket(int value) {
2329     chunksPerPacket = Math.min(chunksPerPacket, value);
2330     packetSize = (bytesPerChecksum + getChecksumSize()) * chunksPerPacket;
2331   }
2332 
2333   synchronized void setTestFilename(String newname) {
2334     src = newname;
2335   }
2336 
2337   /**
2338    * Returns the size of a file as it was when this stream was opened
2339    */
2340   public long getInitialLen() {
2341     return initialFileSize;
2342   }
2343 
2344   /**
2345    * @return the FileEncryptionInfo for this stream, or null if not encrypted.
2346    */
2347   public FileEncryptionInfo getFileEncryptionInfo() {
2348     return fileEncryptionInfo;
2349   }
2350 
2351   /**
2352    * Returns the access token currently used by streamer, for testing only
2353    */
2354   synchronized Token<BlockTokenIdentifier> getBlockToken() {
2355     return streamer.getBlockToken();
2356   }
2357 
2358   @Override
2359   public void setDropBehind(Boolean dropBehind) throws IOException {
2360     CachingStrategy prevStrategy, nextStrategy;
2361     // CachingStrategy is immutable.  So build a new CachingStrategy with the
2362     // modifications we want, and compare-and-swap it in.
2363     do {
2364       prevStrategy = this.cachingStrategy.get();
2365       nextStrategy = new CachingStrategy.Builder(prevStrategy).
2366                         setDropBehind(dropBehind).build();
2367     } while (!this.cachingStrategy.compareAndSet(prevStrategy, nextStrategy));
2368   }
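  // Design note on the loop above: because CachingStrategy is immutable,
  // concurrent callers can race here safely; a losing compareAndSet simply
  // rebuilds the strategy from the freshly read value and retries, so no
  // explicit lock is needed.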
2369 
2370   @VisibleForTesting
2371   ExtendedBlock getBlock() {
2372     return streamer.getBlock();
2373   }
2374 
2375   @VisibleForTesting
2376   public long getFileId() {
2377     return fileId;
2378   }
2379 
2380   private static <T> void arraycopy(T[] srcs, T[] dsts, int skipIndex) {
2381     System.arraycopy(srcs, 0, dsts, 0, skipIndex);
2382     System.arraycopy(srcs, skipIndex+1, dsts, skipIndex, dsts.length-skipIndex);
2383   }
2384 }
2385