1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.hdfs.nfs.nfs3;
19 
20 import java.io.File;
21 import java.io.FileNotFoundException;
22 import java.io.FileOutputStream;
23 import java.io.IOException;
24 import java.io.RandomAccessFile;
25 import java.nio.ByteBuffer;
26 import java.nio.channels.ClosedChannelException;
27 import java.util.EnumSet;
28 import java.util.Iterator;
29 import java.util.Map.Entry;
30 import java.util.concurrent.ConcurrentNavigableMap;
31 import java.util.concurrent.ConcurrentSkipListMap;
32 import java.util.concurrent.atomic.AtomicLong;
33 
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.hadoop.fs.FSDataInputStream;
37 import org.apache.hadoop.hdfs.DFSClient;
38 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
39 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
40 import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
41 import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
42 import org.apache.hadoop.hdfs.nfs.nfs3.WriteCtx.DataState;
43 import org.apache.hadoop.io.BytesWritable.Comparator;
44 import org.apache.hadoop.io.IOUtils;
45 import org.apache.hadoop.nfs.nfs3.FileHandle;
46 import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
47 import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
48 import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
49 import org.apache.hadoop.nfs.nfs3.Nfs3Status;
50 import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
51 import org.apache.hadoop.nfs.nfs3.response.COMMIT3Response;
52 import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
53 import org.apache.hadoop.nfs.nfs3.response.WccAttr;
54 import org.apache.hadoop.nfs.nfs3.response.WccData;
55 import org.apache.hadoop.oncrpc.XDR;
56 import org.apache.hadoop.oncrpc.security.VerifierNone;
57 import org.apache.hadoop.security.IdMappingServiceProvider;
58 import org.apache.hadoop.util.Daemon;
59 import org.apache.hadoop.util.Time;
60 import org.jboss.netty.channel.Channel;
61 
62 import com.google.common.annotations.VisibleForTesting;
63 import com.google.common.base.Preconditions;
64 
65 /**
66  * OpenFileCtx saves the context of one HDFS file output stream. Access to it is
67  * synchronized by its member lock.
68  */
69 class OpenFileCtx {
70   public static final Log LOG = LogFactory.getLog(OpenFileCtx.class);
71 
72   // Pending writes water mark for dump, 1MB
73   private static long DUMP_WRITE_WATER_MARK = 1024 * 1024;
74 
75   static enum COMMIT_STATUS {
76     COMMIT_FINISHED,
77     COMMIT_WAIT,
78     COMMIT_INACTIVE_CTX,
79     COMMIT_INACTIVE_WITH_PENDING_WRITE,
80     COMMIT_ERROR,
81     COMMIT_DO_SYNC,
82     /**
83      * A deferred COMMIT response could cause file uploads to fail. The
84      * following two statuses are introduced as a solution. 1. If the client
85      * asks to commit a non-sequential chunk of data, the NFS gateway returns
86      * success in the hope that the client will send the prerequisite writes.
87      * 2. If the client asks to commit a sequential chunk (meaning it can be
88      * flushed to HDFS), the NFS gateway returns the special error
89      * NFS3ERR_JUKEBOX, indicating that the client needs to retry. Meanwhile,
90      * the NFS gateway keeps flushing data to HDFS and eventually does the sync.
91      *
92      * The reason to make the client wait is that we want it to wait for the
93      * last commit. Otherwise, the client thinks the file upload has finished
94      * (e.g., a cp command returns success) while NFS could still be flushing
95      * staged data to HDFS. However, we don't know which commit is the last
96      * one, so we assume that a commit after sequential writes may be the last.
97      * See HDFS-7259 for more details.
98      */
99     COMMIT_SPECIAL_WAIT,    // the scoped pending writes are sequential
100     COMMIT_SPECIAL_SUCCESS; // the scoped pending writes are not sequential
101   }
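  // A worked example of the two special statuses (assuming large file upload
  // is enabled and AIX compatibility mode is off): suppose the flushed offset
  // is 64KB, nextOffset is 96KB, and a COMMIT arrives covering data up to
  // 120KB. If pendingWrites holds [96KB, 128KB), the commit range is
  // sequential, so checkCommitInternal() returns COMMIT_SPECIAL_WAIT and the
  // client is told to retry (NFS3ERR_JUKEBOX) while the gateway keeps writing.
  // If pendingWrites instead holds [112KB, 128KB), there is a hole at
  // [96KB, 112KB), so the result is COMMIT_SPECIAL_SUCCESS and the client is
  // expected to send the missing writes.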
102 
103   private final DFSClient client;
104   private final IdMappingServiceProvider iug;
105 
106   // The stream status. False means the stream is closed.
107   private volatile boolean activeState;
108   // The stream write-back status. True means one thread is doing write back.
109   private volatile boolean asyncStatus;
110   private volatile long asyncWriteBackStartOffset;
111 
112   /**
113    * The current offset of the file in HDFS. All the content before this offset
114    * has been written back to HDFS.
115    */
116   private AtomicLong nextOffset;
117   private final HdfsDataOutputStream fos;
118   private final boolean aixCompatMode;
119 
120   // It's updated after each sync to HDFS
121   private Nfs3FileAttributes latestAttr;
122 
123   private final ConcurrentNavigableMap<OffsetRange, WriteCtx> pendingWrites;
124 
125   private final ConcurrentNavigableMap<Long, CommitCtx> pendingCommits;
126 
127   static class CommitCtx {
128     private final long offset;
129     private final Channel channel;
130     private final int xid;
131     private final Nfs3FileAttributes preOpAttr;
132 
133     public final long startTime;
134 
135     long getOffset() {
136       return offset;
137     }
138 
139     Channel getChannel() {
140       return channel;
141     }
142 
143     int getXid() {
144       return xid;
145     }
146 
147     Nfs3FileAttributes getPreOpAttr() {
148       return preOpAttr;
149     }
150 
151     long getStartTime() {
152       return startTime;
153     }
154 
155     CommitCtx(long offset, Channel channel, int xid,
156         Nfs3FileAttributes preOpAttr) {
157       this.offset = offset;
158       this.channel = channel;
159       this.xid = xid;
160       this.preOpAttr = preOpAttr;
161       this.startTime = System.nanoTime();
162     }
163 
164     @Override
165     public String toString() {
166       return String.format("offset: %d xid: %d startTime: %d", offset, xid,
167           startTime);
168     }
169   }
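  // CommitCtx.startTime is captured with System.nanoTime(), so it is only
  // meaningful for measuring elapsed time (see Nfs3Utils.getElapsedTime() as
  // used in processCommits()), not as a wall-clock timestamp.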
170 
171   // The time of the last write, commit request, or write-back event. It is
172   // updated to keep the output stream alive.
173   private long lastAccessTime;
174 
175   private volatile boolean enabledDump;
176   private FileOutputStream dumpOut;
177 
178   /** Tracks the data buffered in memory related to non-sequential writes */
179   private AtomicLong nonSequentialWriteInMemory;
180 
181   private RandomAccessFile raf;
182   private final String dumpFilePath;
183   private Daemon dumpThread;
184   private final boolean uploadLargeFile;
185 
186   private void updateLastAccessTime() {
187     lastAccessTime = Time.monotonicNow();
188   }
189 
190   private boolean checkStreamTimeout(long streamTimeout) {
191     return Time.monotonicNow() - lastAccessTime > streamTimeout;
192   }
193 
194   long getLastAccessTime() {
195     return lastAccessTime;
196   }
197 
198   public long getNextOffset() {
199     return nextOffset.get();
200   }
201 
202   boolean getActiveState() {
203     return this.activeState;
204   }
205 
206   boolean hasPendingWork() {
207     return (pendingWrites.size() != 0 || pendingCommits.size() != 0);
208   }
209 
210   /** Increase or decrease the memory occupation of non-sequential writes */
211   private long updateNonSequentialWriteInMemory(long count) {
212     long newValue = nonSequentialWriteInMemory.addAndGet(count);
213     if (LOG.isDebugEnabled()) {
214       LOG.debug("Update nonSequentialWriteInMemory by " + count + " new value: "
215           + newValue);
216     }
217 
218     Preconditions.checkState(newValue >= 0,
219         "nonSequentialWriteInMemory is negative " + newValue
220             + " after update with count " + count);
221     return newValue;
222   }
223 
224   OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
225       String dumpFilePath, DFSClient client, IdMappingServiceProvider iug) {
226     this(fos, latestAttr, dumpFilePath, client, iug, false,
227         new NfsConfiguration());
228   }
229 
230   OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr,
231       String dumpFilePath, DFSClient client, IdMappingServiceProvider iug,
232       boolean aixCompatMode, NfsConfiguration config) {
233     this.fos = fos;
234     this.latestAttr = latestAttr;
235     this.aixCompatMode = aixCompatMode;
236     // We use ReverseComparatorOnMin as the comparator of the map. This way,
237     // the data with the largest offsets is dumped first, while the last
238     // element (the lowest offset) is retrieved to write back to HDFS.
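    // For example, with pending ranges [0, 64K) and [128K, 192K), iteration
    // starts at [128K, 192K) (what the dumper spills first), while lastEntry()
    // is [0, 64K), the lowest-offset range and the next candidate to stream to
    // HDFS.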
239     pendingWrites = new ConcurrentSkipListMap<OffsetRange, WriteCtx>(
240         OffsetRange.ReverseComparatorOnMin);
241 
242     pendingCommits = new ConcurrentSkipListMap<Long, CommitCtx>();
243 
244     updateLastAccessTime();
245     activeState = true;
246     asyncStatus = false;
247     asyncWriteBackStartOffset = 0;
248     dumpOut = null;
249     raf = null;
250     nonSequentialWriteInMemory = new AtomicLong(0);
251 
252     this.dumpFilePath = dumpFilePath;
253     enabledDump = dumpFilePath != null;
254     nextOffset = new AtomicLong();
255     nextOffset.set(latestAttr.getSize());
256     try {
257       assert(nextOffset.get() == this.fos.getPos());
258     } catch (IOException e) {}
259     dumpThread = null;
260     this.client = client;
261     this.iug = iug;
262     this.uploadLargeFile = config.getBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD,
263         NfsConfigKeys.LARGE_FILE_UPLOAD_DEFAULT);
264   }
265 
266   public Nfs3FileAttributes getLatestAttr() {
267     return latestAttr;
268   }
269 
270   // Get flushed offset. Note that flushed data may not be persisted.
271   private long getFlushedOffset() throws IOException {
272     return fos.getPos();
273   }
274 
275   // Check whether we need to dump the new writes
276   private void waitForDump() {
277     if (!enabledDump) {
278       if (LOG.isDebugEnabled()) {
279         LOG.debug("Do nothing, dump is disabled.");
280       }
281       return;
282     }
283 
284     if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
285       return;
286     }
287 
288     // wake up the dumper thread to dump the data
289     synchronized (this) {
290       if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
291         if (LOG.isDebugEnabled()) {
292           LOG.debug("Asking dumper to dump...");
293         }
294         if (dumpThread == null) {
295           dumpThread = new Daemon(new Dumper());
296           dumpThread.start();
297         } else {
298           this.notifyAll();
299         }
300       }
301 
302       while (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
303         try {
304           this.wait();
305         } catch (InterruptedException ignored) {
306         }
307       }
308 
309     }
310   }
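  // waitForDump() and Dumper.run() form a simple producer/consumer handshake
  // over the OpenFileCtx monitor: request threads block in wait() once the
  // buffered non-sequential data reaches DUMP_WRITE_WATER_MARK, the dumper
  // spills WriteCtx data to the dump file, and notifyAll() on either side
  // wakes the other once the in-memory total drops below the watermark again.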
311 
312   class Dumper implements Runnable {
313     /** Dump data into a file */
314     private void dump() {
315       // Create dump outputstream for the first time
316       if (dumpOut == null) {
317         LOG.info("Create dump file: " + dumpFilePath);
318         File dumpFile = new File(dumpFilePath);
319         try {
320           synchronized (this) {
321             // check if alive again
322             Preconditions.checkState(dumpFile.createNewFile(),
323                 "The dump file should not exist: %s", dumpFilePath);
324             dumpOut = new FileOutputStream(dumpFile);
325           }
326         } catch (IOException e) {
327           LOG.error("Got failure when creating dump stream " + dumpFilePath, e);
328           enabledDump = false;
329           if (dumpOut != null) {
330             try {
331               dumpOut.close();
332             } catch (IOException e1) {
333               LOG.error("Can't close dump stream " + dumpFilePath, e);
334             }
335           }
336           return;
337         }
338       }
339 
340       // Get raf for the first dump
341       if (raf == null) {
342         try {
343           raf = new RandomAccessFile(dumpFilePath, "r");
344         } catch (FileNotFoundException e) {
345           LOG.error("Can't get random access to file " + dumpFilePath);
346           // Disable dump
347           enabledDump = false;
348           return;
349         }
350       }
351 
352       if (LOG.isDebugEnabled()) {
353         LOG.debug("Start dump. Before dump, nonSequentialWriteInMemory == "
354             + nonSequentialWriteInMemory.get());
355       }
356 
357       Iterator<OffsetRange> it = pendingWrites.keySet().iterator();
358       while (activeState && it.hasNext()
359           && nonSequentialWriteInMemory.get() > 0) {
360         OffsetRange key = it.next();
361         WriteCtx writeCtx = pendingWrites.get(key);
362         if (writeCtx == null) {
363           // This write was just deleted
364           continue;
365         }
366         try {
367           long dumpedDataSize = writeCtx.dumpData(dumpOut, raf);
368           if (dumpedDataSize > 0) {
369             updateNonSequentialWriteInMemory(-dumpedDataSize);
370           }
371         } catch (IOException e) {
372           LOG.error("Dump data failed: " + writeCtx + " with error: " + e
373               + " OpenFileCtx state: " + activeState);
374           // Disable dump
375           enabledDump = false;
376           return;
377         }
378       }
379 
380       if (LOG.isDebugEnabled()) {
381         LOG.debug("After dump, nonSequentialWriteInMemory == "
382             + nonSequentialWriteInMemory.get());
383       }
384     }
385 
386     @Override
387     public void run() {
388       while (activeState && enabledDump) {
389         try {
390           if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
391             dump();
392           }
393           synchronized (OpenFileCtx.this) {
394             if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
395               OpenFileCtx.this.notifyAll();
396               try {
397                 OpenFileCtx.this.wait();
398                 if (LOG.isDebugEnabled()) {
399                   LOG.debug("Dumper woke up");
400                 }
401               } catch (InterruptedException e) {
402                 LOG.info("Dumper is interrupted, dumpFilePath= "
403                     + OpenFileCtx.this.dumpFilePath);
404               }
405             }
406           }
407           if (LOG.isDebugEnabled()) {
408             LOG.debug("Dumper checking OpenFileCtx activeState: " + activeState
409                 + " enabledDump: " + enabledDump);
410           }
411         } catch (Throwable t) {
412           // unblock threads with new request
413           synchronized (OpenFileCtx.this) {
414             OpenFileCtx.this.notifyAll();
415           }
416           LOG.info("Dumper got Throwable: " + t + ". dumpFilePath: "
417               + OpenFileCtx.this.dumpFilePath, t);
418           activeState = false;
419         }
420       }
421     }
422   }
423 
424   private WriteCtx checkRepeatedWriteRequest(WRITE3Request request,
425       Channel channel, int xid) {
426     OffsetRange range = new OffsetRange(request.getOffset(),
427         request.getOffset() + request.getCount());
428     WriteCtx writeCtx = pendingWrites.get(range);
429     if (writeCtx == null) {
430       return null;
431     } else {
432       if (xid != writeCtx.getXid()) {
433         LOG.warn("Got a repeated request, same range, with a different xid: "
434             + xid + " xid in old request: " + writeCtx.getXid());
435         //TODO: better handling.
436       }
437       return writeCtx;
438     }
439   }
440 
441   public void receivedNewWrite(DFSClient dfsClient, WRITE3Request request,
442       Channel channel, int xid, AsyncDataService asyncDataService,
443       IdMappingServiceProvider iug) {
444 
445     if (!activeState) {
446       LOG.info("OpenFileCtx is inactive, fileId: "
447           + request.getHandle().getFileId());
448       WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
449       WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
450           fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
451       Nfs3Utils.writeChannel(channel,
452           response.serialize(new XDR(), xid, new VerifierNone()),
453           xid);
454     } else {
455       // Update the write time first
456       updateLastAccessTime();
457 
458       // Handle repeated write requests (same xid or not).
459       // If already replied, send reply again. If not replied, drop the
460       // repeated request.
461       WriteCtx existantWriteCtx = checkRepeatedWriteRequest(request, channel,
462           xid);
463       if (existantWriteCtx != null) {
464         if (!existantWriteCtx.getReplied()) {
465           if (LOG.isDebugEnabled()) {
466             LOG.debug("Repeated write request which hasn't been served: xid="
467                 + xid + ", drop it.");
468           }
469         } else {
470           if (LOG.isDebugEnabled()) {
471             LOG.debug("Repeated write request which is already served: xid="
472                 + xid + ", resend response.");
473           }
474           WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
475           WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
476               fileWcc, request.getCount(), request.getStableHow(),
477               Nfs3Constant.WRITE_COMMIT_VERF);
478           Nfs3Utils.writeChannel(channel, response.serialize(
479               new XDR(), xid, new VerifierNone()), xid);
480         }
481       } else {
482         // not a repeated write request
483         receivedNewWriteInternal(dfsClient, request, channel, xid,
484             asyncDataService, iug);
485       }
486     }
487   }
488 
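  /**
   * Trim a write that partially overlaps data already accepted so that only
   * the truly appended bytes remain. For example, with a cached (next) offset
   * of 3072, a request for offset=1024 count=4096 is rewritten to offset=3072
   * count=2048, and the buffer position is advanced past the 2048 overlapped
   * bytes.
   */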
489   @VisibleForTesting
490   public static void alterWriteRequest(WRITE3Request request, long cachedOffset) {
491     long offset = request.getOffset();
492     int count = request.getCount();
493     long smallerCount = offset + count - cachedOffset;
494     if (LOG.isDebugEnabled()) {
495       LOG.debug(String.format("Got overwrite with appended data (%d-%d),"
496           + " current offset %d," + " drop the overlapped section (%d-%d)"
497           + " and append new data (%d-%d).", offset, (offset + count - 1),
498           cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
499               + count - 1)));
500     }
501 
502     ByteBuffer data = request.getData();
503     Preconditions.checkState(data.position() == 0,
504         "The write request data has non-zero position");
505     data.position((int) (cachedOffset - offset));
506     Preconditions.checkState(data.limit() - data.position() == smallerCount,
507         "The write request buffer has wrong limit/position regarding count");
508 
509     request.setOffset(cachedOffset);
510     request.setCount((int) smallerCount);
511   }
512 
513   /**
514    * Creates and adds a WriteCtx into the pendingWrites map. This is a
515    * synchronized method to handle concurrent writes.
516    *
517    * @return A non-null {@link WriteCtx} instance if the incoming write
518    *         request's offset >= nextOffset. Otherwise null.
519    */
520   private synchronized WriteCtx addWritesToCache(WRITE3Request request,
521       Channel channel, int xid) {
522     long offset = request.getOffset();
523     int count = request.getCount();
524     long cachedOffset = nextOffset.get();
525     int originalCount = WriteCtx.INVALID_ORIGINAL_COUNT;
526 
527     if (LOG.isDebugEnabled()) {
528       LOG.debug("requested offset=" + offset + " and current offset="
529           + cachedOffset);
530     }
531 
532     // Handle a special case first
533     if ((offset < cachedOffset) && (offset + count > cachedOffset)) {
534       // One Linux client behavior: after a file is closed and reopened for
535       // writing, the client sometimes combines previously written data (which
536       // could still be in the kernel buffer) with newly appended data in one
537       // write. This is usually the first write after the file is reopened. In
538       // this case, we log the event and drop the overlapped section.
539       LOG.warn(String.format("Got overwrite with appended data (%d-%d),"
540           + " current offset %d," + " drop the overlapped section (%d-%d)"
541           + " and append new data (%d-%d).", offset, (offset + count - 1),
542           cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset
543               + count - 1)));
544 
545       if (!pendingWrites.isEmpty()) {
546         LOG.warn("There are other pending writes, fail this jumbo write");
547         return null;
548       }
549 
550       LOG.warn("Modify this write to write only the appended data");
551       alterWriteRequest(request, cachedOffset);
552 
553       // Update local variable
554       originalCount = count;
555       offset = request.getOffset();
556       count = request.getCount();
557     }
558 
559     // Fail non-append call
560     if (offset < cachedOffset) {
561       LOG.warn("(offset,count,nextOffset): " + "(" + offset + "," + count + ","
562           + nextOffset + ")");
563       return null;
564     } else {
565       DataState dataState = offset == cachedOffset ? WriteCtx.DataState.NO_DUMP
566           : WriteCtx.DataState.ALLOW_DUMP;
567       WriteCtx writeCtx = new WriteCtx(request.getHandle(),
568           request.getOffset(), request.getCount(), originalCount,
569           request.getStableHow(), request.getData(), channel, xid, false,
570           dataState);
571       if (LOG.isDebugEnabled()) {
572         LOG.debug("Add new write to the list with nextOffset " + cachedOffset
573             + " and requested offset=" + offset);
574       }
575       if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
576         // update the memory size
577         updateNonSequentialWriteInMemory(count);
578       }
579       // check if there is a WriteCtx with the same range in pendingWrites
580       WriteCtx oldWriteCtx = checkRepeatedWriteRequest(request, channel, xid);
581       if (oldWriteCtx == null) {
582         pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
583         if (LOG.isDebugEnabled()) {
584           LOG.debug("New write buffered with xid " + xid + " nextOffset "
585               + cachedOffset + " req offset=" + offset + " mapsize="
586               + pendingWrites.size());
587         }
588       } else {
589         LOG.warn("Got a repeated request, same range, with xid: " + xid
590             + " nextOffset " + cachedOffset + " req offset=" + offset);
591       }
592       return writeCtx;
593     }
594   }
595 
596   /** Process an overwrite write request */
597   private void processOverWrite(DFSClient dfsClient, WRITE3Request request,
598       Channel channel, int xid, IdMappingServiceProvider iug) {
599     WccData wccData = new WccData(latestAttr.getWccAttr(), null);
600     long offset = request.getOffset();
601     int count = request.getCount();
602     WriteStableHow stableHow = request.getStableHow();
603     WRITE3Response response;
604     long cachedOffset = nextOffset.get();
605     if (offset + count > cachedOffset) {
606       LOG.warn("Treat this jumbo write as a real random write; random writes are not supported.");
607       response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
608           WriteStableHow.UNSTABLE, Nfs3Constant.WRITE_COMMIT_VERF);
609     } else {
610       if (LOG.isDebugEnabled()) {
611         LOG.debug("Process perfectOverWrite");
612       }
613       // TODO: let executor handle perfect overwrite
614       response = processPerfectOverWrite(dfsClient, offset, count, stableHow,
615           request.getData().array(),
616           Nfs3Utils.getFileIdPath(request.getHandle()), wccData, iug);
617     }
618     updateLastAccessTime();
619     Nfs3Utils.writeChannel(channel,
620         response.serialize(new XDR(), xid, new VerifierNone()),
621         xid);
622   }
623 
624   /**
625    * Check if we can start the write (back to HDFS) now. If there is no hole for
626    * writing, and no other thread is writing (i.e., asyncStatus is
627    * false), start the writing and set asyncStatus to true.
628    *
629    * @return True if the new write is sequential and we can start writing
630    *         (including the case that there is already a thread writing).
631    */
632   private synchronized boolean checkAndStartWrite(
633       AsyncDataService asyncDataService, WriteCtx writeCtx) {
634 
635     if (writeCtx.getOffset() == nextOffset.get()) {
636       if (!asyncStatus) {
637         if (LOG.isDebugEnabled()) {
638           LOG.debug("Trigger the write back task. Current nextOffset: "
639               + nextOffset.get());
640         }
641         asyncStatus = true;
642         asyncWriteBackStartOffset = writeCtx.getOffset();
643         asyncDataService.execute(new AsyncDataService.WriteBackTask(this));
644       } else {
645         if (LOG.isDebugEnabled()) {
646           LOG.debug("The write back thread is working.");
647         }
648       }
649       return true;
650     } else {
651       return false;
652     }
653   }
654 
655   private void receivedNewWriteInternal(DFSClient dfsClient,
656       WRITE3Request request, Channel channel, int xid,
657       AsyncDataService asyncDataService, IdMappingServiceProvider iug) {
658     WriteStableHow stableHow = request.getStableHow();
659     WccAttr preOpAttr = latestAttr.getWccAttr();
660     int count = request.getCount();
661 
662     WriteCtx writeCtx = addWritesToCache(request, channel, xid);
663     if (writeCtx == null) {
664       // offset < nextOffset
665       processOverWrite(dfsClient, request, channel, xid, iug);
666     } else {
667       // The write is added to pendingWrites.
668       // Check and start writing back if necessary
669       boolean startWriting = checkAndStartWrite(asyncDataService, writeCtx);
670       if (!startWriting) {
671         // offset > nextOffset. check if we need to dump data
672         waitForDump();
673 
674         // In testing, we noticed that some Linux clients send a batch (e.g.,
675         // 1MB) of reordered writes and won't send more writes until they get
676         // responses for the previous batch. So here we send the response
677         // immediately for an unstable non-sequential write.
678         if (stableHow != WriteStableHow.UNSTABLE) {
679           LOG.info("Have to change stable write to unstable write: "
680               + request.getStableHow());
681           stableHow = WriteStableHow.UNSTABLE;
682         }
683 
684         if (LOG.isDebugEnabled()) {
685           LOG.debug("UNSTABLE write request, send response for offset: "
686               + writeCtx.getOffset());
687         }
688         WccData fileWcc = new WccData(preOpAttr, latestAttr);
689         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
690             fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
691         RpcProgramNfs3.metrics.addWrite(Nfs3Utils
692             .getElapsedTime(writeCtx.startTime));
693         Nfs3Utils
694             .writeChannel(channel, response.serialize(new XDR(),
695                 xid, new VerifierNone()), xid);
696         writeCtx.setReplied(true);
697       }
698     }
699   }
700 
701   /**
702    * Honor two kinds of overwrites: 1) support applications like touch, which
703    * write the same content back to change the mtime; 2) the client somehow
704    * sends the same write again in a different RPC.
705    */
706   private WRITE3Response processPerfectOverWrite(DFSClient dfsClient,
707       long offset, int count, WriteStableHow stableHow, byte[] data,
708       String path, WccData wccData, IdMappingServiceProvider iug) {
709     WRITE3Response response;
710 
711     // Read the content back
712     byte[] readbuffer = new byte[count];
713 
714     int readCount = 0;
715     FSDataInputStream fis = null;
716     try {
717       // Sync file data and length to avoid partial read failure
718       fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
719     } catch (ClosedChannelException closedException) {
720       LOG.info("The FSDataOutputStream has been closed. "
721           + "Continue processing the perfect overwrite.");
722     } catch (IOException e) {
723       LOG.info("hsync failed when processing possible perfect overwrite, path="
724           + path + " error: " + e);
725       return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
726           Nfs3Constant.WRITE_COMMIT_VERF);
727     }
728 
729     try {
730       fis = dfsClient.createWrappedInputStream(dfsClient.open(path));
731       readCount = fis.read(offset, readbuffer, 0, count);
732       if (readCount < count) {
733         LOG.error("Can't read back " + count + " bytes, partial read size: "
734             + readCount);
735         return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
736             Nfs3Constant.WRITE_COMMIT_VERF);
737       }
738     } catch (IOException e) {
739       LOG.info("Read failed when processing possible perfect overwrite, path="
740           + path, e);
741       return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
742           Nfs3Constant.WRITE_COMMIT_VERF);
743     } finally {
744       IOUtils.cleanup(LOG, fis);
745     }
746 
747     // Compare with the request
748     Comparator comparator = new Comparator();
749     if (comparator.compare(readbuffer, 0, readCount, data, 0, count) != 0) {
750       LOG.info("Perfect overwrite has different content");
751       response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0,
752           stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
753     } else {
754       LOG.info("Perfect overwrite has same content,"
755           + " updating the mtime, then return success");
756       Nfs3FileAttributes postOpAttr = null;
757       try {
758         dfsClient.setTimes(path, Time.monotonicNow(), -1);
759         postOpAttr = Nfs3Utils.getFileAttr(dfsClient, path, iug);
760       } catch (IOException e) {
761         LOG.info("Got error when processing perfect overwrite, path=" + path
762             + " error: " + e);
763         return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow,
764             Nfs3Constant.WRITE_COMMIT_VERF);
765       }
766 
767       wccData.setPostOpAttr(postOpAttr);
768       response = new WRITE3Response(Nfs3Status.NFS3_OK, wccData, count,
769           stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
770     }
771     return response;
772   }
773 
774   /**
775    * Check the commit status with the given offset
776    * @param commitOffset the offset to commit
777    * @param channel the channel to return response
778    * @param xid the xid of the commit request
779    * @param preOpAttr the preOp attribute
780    * @param fromRead whether the commit is triggered from read request
781    * @return one commit status: COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
782    * COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, COMMIT_SPECIAL_WAIT or COMMIT_SPECIAL_SUCCESS
783    */
784   public COMMIT_STATUS checkCommit(DFSClient dfsClient, long commitOffset,
785       Channel channel, int xid, Nfs3FileAttributes preOpAttr, boolean fromRead) {
786     if (!fromRead) {
787       Preconditions.checkState(channel != null && preOpAttr != null);
788       // Keep stream active
789       updateLastAccessTime();
790     }
791     Preconditions.checkState(commitOffset >= 0);
792 
793     COMMIT_STATUS ret = checkCommitInternal(commitOffset, channel, xid,
794         preOpAttr, fromRead);
795     if (LOG.isDebugEnabled()) {
796       LOG.debug("Got commit status: " + ret.name());
797     }
798     // Do the sync outside the lock
799     if (ret == COMMIT_STATUS.COMMIT_DO_SYNC
800         || ret == COMMIT_STATUS.COMMIT_FINISHED) {
801       try {
802         // Sync file data and length
803         fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
804         ret = COMMIT_STATUS.COMMIT_FINISHED; // Remove COMMIT_DO_SYNC status
805         // Nothing to do for metadata since attr related change is pass-through
806       } catch (ClosedChannelException cce) {
807         if (pendingWrites.isEmpty()) {
808           ret = COMMIT_STATUS.COMMIT_FINISHED;
809         } else {
810           ret = COMMIT_STATUS.COMMIT_ERROR;
811         }
812       } catch (IOException e) {
813         LOG.error("Got stream error during data sync: " + e);
814         // Do nothing. Stream will be closed eventually by StreamMonitor.
815         // status = Nfs3Status.NFS3ERR_IO;
816         ret = COMMIT_STATUS.COMMIT_ERROR;
817       }
818     }
819     return ret;
820   }
821 
822   // Check if the to-commit range is sequential
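  // For example, with nextOffset=100 and pendingWrites covering [100, 200) and
  // [200, 350), a commitOffset of 300 is sequential (no hole up to 300), while
  // a commitOffset of 400 is not, because the writes so far stop at 350.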
823   @VisibleForTesting
824   synchronized boolean checkSequential(final long commitOffset,
825       final long nextOffset) {
826     Preconditions.checkState(commitOffset >= nextOffset, "commitOffset "
827         + commitOffset + " less than nextOffset " + nextOffset);
828     long offset = nextOffset;
829     Iterator<OffsetRange> it = pendingWrites.descendingKeySet().iterator();
830     while (it.hasNext()) {
831       OffsetRange range = it.next();
832       if (range.getMin() != offset) {
833         // got a hole
834         return false;
835       }
836       offset = range.getMax();
837       if (offset > commitOffset) {
838         return true;
839       }
840     }
841     // there is a gap between the last pending write and commitOffset
842     return false;
843   }
844 
845   private COMMIT_STATUS handleSpecialWait(boolean fromRead, long commitOffset,
846       Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
847     if (!fromRead) {
848       // let client retry the same request, add pending commit to sync later
849       CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid, preOpAttr);
850       pendingCommits.put(commitOffset, commitCtx);
851     }
852     if (LOG.isDebugEnabled()) {
853       LOG.debug("return COMMIT_SPECIAL_WAIT");
854     }
855     return COMMIT_STATUS.COMMIT_SPECIAL_WAIT;
856   }
857 
858   @VisibleForTesting
859   synchronized COMMIT_STATUS checkCommitInternal(long commitOffset,
860       Channel channel, int xid, Nfs3FileAttributes preOpAttr, boolean fromRead) {
861     if (!activeState) {
862       if (pendingWrites.isEmpty()) {
863         return COMMIT_STATUS.COMMIT_INACTIVE_CTX;
864       } else {
865         // TODO: return success if already committed
866         return COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE;
867       }
868     }
869 
870     long flushed = 0;
871     try {
872       flushed = getFlushedOffset();
873     } catch (IOException e) {
874       LOG.error("Can't get flushed offset, error:" + e);
875       return COMMIT_STATUS.COMMIT_ERROR;
876     }
877 
878     if (LOG.isDebugEnabled()) {
879       LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset
880           + " nextOffset=" + nextOffset.get());
881     }
882 
883     if (pendingWrites.isEmpty()) {
884       if (aixCompatMode) {
885         // Note that there is no guarantee the data is synced. The caller should
886         // still do a sync here even though the output stream might be closed.
887         return COMMIT_STATUS.COMMIT_FINISHED;
888       } else {
889         if (flushed < nextOffset.get()) {
890           if (LOG.isDebugEnabled()) {
891             LOG.debug("get commit while still writing to the requested offset,"
892                 + " with empty queue");
893           }
894           return handleSpecialWait(fromRead, nextOffset.get(), channel, xid,
895               preOpAttr);
896         } else {
897           return COMMIT_STATUS.COMMIT_FINISHED;
898         }
899       }
900     }
901 
902     Preconditions.checkState(flushed <= nextOffset.get(), "flushed " + flushed
903         + " is larger than nextOffset " + nextOffset.get());
904     // Handle large file upload
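    // Decision sketch for this branch: let co be the commit end offset (or the
    // last byte of the highest pending write when commitOffset == 0).
    //   co <= flushed              -> COMMIT_DO_SYNC
    //   flushed < co < nextOffset  -> COMMIT_SPECIAL_WAIT (sync will catch up)
    //   co >= nextOffset           -> COMMIT_SPECIAL_WAIT if the pending writes
    //                                 are sequential up to co, otherwise
    //                                 COMMIT_SPECIAL_SUCCESS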
905     if (uploadLargeFile && !aixCompatMode) {
906       long co = (commitOffset > 0) ? commitOffset : pendingWrites.firstEntry()
907           .getKey().getMax() - 1;
908 
909       if (co <= flushed) {
910         return COMMIT_STATUS.COMMIT_DO_SYNC;
911       } else if (co < nextOffset.get()) {
912         if (LOG.isDebugEnabled()) {
913           LOG.debug("get commit while still writing to the requested offset");
914         }
915         return handleSpecialWait(fromRead, co, channel, xid, preOpAttr);
916       } else {
917         // co >= nextOffset
918         if (checkSequential(co, nextOffset.get())) {
919           return handleSpecialWait(fromRead, co, channel, xid, preOpAttr);
920         } else {
921           if (LOG.isDebugEnabled()) {
922             LOG.debug("return COMMIT_SPECIAL_SUCCESS");
923           }
924           return COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS;
925         }
926       }
927     }
928 
929     if (commitOffset > 0) {
930       if (aixCompatMode) {
931         // The AIX NFS client misinterprets RFC-1813 and will always send 4096
932         // for the commitOffset even if fewer bytes than that have ever (or will
933         // ever) be sent by the client. So, if in AIX compatibility mode, we
934         // will always DO_SYNC if the number of bytes to commit have already all
935         // been flushed, else we will fall through to the logic below which
936         // checks for pending writes in the case that we're being asked to
937         // commit more bytes than have so far been flushed. See HDFS-6549 for
938         // more info.
939         if (commitOffset <= flushed) {
940           return COMMIT_STATUS.COMMIT_DO_SYNC;
941         }
942       } else {
943         if (commitOffset > flushed) {
944           if (!fromRead) {
945             CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid,
946                 preOpAttr);
947             pendingCommits.put(commitOffset, commitCtx);
948           }
949           return COMMIT_STATUS.COMMIT_WAIT;
950         } else {
951           return COMMIT_STATUS.COMMIT_DO_SYNC;
952         }
953       }
954     }
955 
956     Entry<OffsetRange, WriteCtx> key = pendingWrites.firstEntry();
957 
958     // Commit whole file, commitOffset == 0
959     if (!fromRead) {
960       // Insert commit
961       long maxOffset = key.getKey().getMax() - 1;
962       Preconditions.checkState(maxOffset > 0);
963       CommitCtx commitCtx = new CommitCtx(maxOffset, channel, xid, preOpAttr);
964       pendingCommits.put(maxOffset, commitCtx);
965     }
966     return COMMIT_STATUS.COMMIT_WAIT;
967   }
968 
969   /**
970    * Check the stream status to decide if it should be closed.
971    * @return true if the stream should be removed; false if it should be kept
972    */
973   public synchronized boolean streamCleanup(long fileId, long streamTimeout) {
974     Preconditions
975         .checkState(streamTimeout >= NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
976     if (!activeState) {
977       return true;
978     }
979 
980     boolean flag = false;
981     // Check the stream timeout
982     if (checkStreamTimeout(streamTimeout)) {
983       if (LOG.isDebugEnabled()) {
984         LOG.debug("stream can be closed for fileId: " + fileId);
985       }
986       flag = true;
987     }
988     return flag;
989   }
990 
991   /**
992    * Get (and remove) the next WriteCtx from {@link #pendingWrites} if possible.
993    *
994    * @return null if {@link #pendingWrites} is empty, or the next WriteCtx's
995    *         offset is larger than nextOffset; otherwise the next WriteCtx.
996    */
997   private synchronized WriteCtx offerNextToWrite() {
998     if (pendingWrites.isEmpty()) {
999       if (LOG.isDebugEnabled()) {
1000         LOG.debug("The async write task has no pending writes, fileId: "
1001             + latestAttr.getFileId());
1002       }
1003       // process pending commit again to handle this race: a commit is added
1004       // to pendingCommits map just after the last doSingleWrite returns.
1005       // There is no pending write and the commit should be handled by the
1006       // last doSingleWrite. Due to the race, the commit is left alone and
1007       // can't be processed until cleanup. Therefore, we should do another
1008       // processCommits to fix the race issue.
1009       processCommits(nextOffset.get()); // nextOffset has same value as
1010                                         // flushedOffset
1011       this.asyncStatus = false;
1012       return null;
1013     }
1014 
1015     Entry<OffsetRange, WriteCtx> lastEntry = pendingWrites.lastEntry();
1016     OffsetRange range = lastEntry.getKey();
1017     WriteCtx toWrite = lastEntry.getValue();
1018 
1019     if (LOG.isTraceEnabled()) {
1020       LOG.trace("range.getMin()=" + range.getMin() + " nextOffset="
1021           + nextOffset);
1022     }
1023 
1024     long offset = nextOffset.get();
1025     if (range.getMin() > offset) {
1026       if (LOG.isDebugEnabled()) {
1027         LOG.debug("The next sequential write has not arrived yet");
1028       }
1029       processCommits(nextOffset.get()); // handle race
1030       this.asyncStatus = false;
1031     } else if (range.getMin() < offset && range.getMax() > offset) {
1032       // shouldn't happen since we do sync for overlapped concurrent writers
1033       LOG.warn("Got an overlapping write (" + range.getMin() + ", "
1034           + range.getMax() + "), nextOffset=" + offset
1035           + ". Silently drop it now");
1036       pendingWrites.remove(range);
1037       processCommits(nextOffset.get()); // handle race
1038     } else {
1039       if (LOG.isDebugEnabled()) {
1040         LOG.debug("Remove write(" + range.getMin() + "-" + range.getMax()
1041             + ") from the list");
1042       }
1043       // after writing, remove the WriteCtx from cache
1044       pendingWrites.remove(range);
1045       // update nextOffset
1046       nextOffset.addAndGet(toWrite.getCount());
1047       if (LOG.isDebugEnabled()) {
1048         LOG.debug("Change nextOffset to " + nextOffset.get());
1049       }
1050       return toWrite;
1051     }
1052 
1053     return null;
1054   }
1055 
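  // Only one write-back task runs at a time: asyncStatus is set under the
  // OpenFileCtx lock in checkAndStartWrite() and is cleared in the finally
  // block below only if no newer task has updated asyncWriteBackStartOffset
  // in the meantime.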
1056   /** Invoked by AsyncDataService to write back to HDFS */
1057   void executeWriteBack() {
1058     Preconditions.checkState(asyncStatus,
1059         "openFileCtx has false asyncStatus, fileId: " + latestAttr.getFileId());
1060     final long startOffset = asyncWriteBackStartOffset;
1061     try {
1062       while (activeState) {
1063         // asyncStatus could be changed to false in offerNextToWrite()
1064         WriteCtx toWrite = offerNextToWrite();
1065         if (toWrite != null) {
1066           // Do the write
1067           doSingleWrite(toWrite);
1068           updateLastAccessTime();
1069         } else {
1070           break;
1071         }
1072       }
1073 
1074       if (!activeState && LOG.isDebugEnabled()) {
1075         LOG.debug("The openFileCtx is not active anymore, fileId: "
1076             + latestAttr.getFileId());
1077       }
1078     } finally {
1079       // Make sure to reset asyncStatus to false unless a race happens
1080       synchronized (this) {
1081         if (startOffset == asyncWriteBackStartOffset) {
1082           asyncStatus = false;
1083         } else {
1084           LOG.info("Another async task is already started before this one"
1085               + " is finalized. fileId: " + latestAttr.getFileId()
1086               + " asyncStatus: " + asyncStatus + " original startOffset: "
1087               + startOffset + " new startOffset: " + asyncWriteBackStartOffset
1088               + ". Won't change asyncStatus here.");
1089         }
1090       }
1091     }
1092   }
1093 
1094   private void processCommits(long offset) {
1095     Preconditions.checkState(offset > 0);
1096     long flushedOffset = 0;
1097     Entry<Long, CommitCtx> entry = null;
1098 
1099     int status = Nfs3Status.NFS3ERR_IO;
1100     try {
1101       flushedOffset = getFlushedOffset();
1102       entry = pendingCommits.firstEntry();
1103       if (entry == null || entry.getValue().offset > flushedOffset) {
1104         return;
1105       }
1106 
1107       // Now do sync for the ready commits
1108       // Sync file data and length
1109       fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
1110       status = Nfs3Status.NFS3_OK;
1111     } catch (ClosedChannelException cce) {
1112       if (!pendingWrites.isEmpty()) {
1113         LOG.error("Can't sync for fileId: " + latestAttr.getFileId()
1114             + ". Channel closed with writes pending.", cce);
1115       }
1116       status = Nfs3Status.NFS3ERR_IO;
1117     } catch (IOException e) {
1118       LOG.error("Got stream error during data sync: ", e);
1119       // Do nothing. Stream will be closed eventually by StreamMonitor.
1120       status = Nfs3Status.NFS3ERR_IO;
1121     }
1122 
1123     // Update latestAttr
1124     try {
1125       latestAttr = Nfs3Utils.getFileAttr(client,
1126           Nfs3Utils.getFileIdPath(latestAttr.getFileId()), iug);
1127     } catch (IOException e) {
1128       LOG.error("Can't get new file attr, fileId: " + latestAttr.getFileId(), e);
1129       status = Nfs3Status.NFS3ERR_IO;
1130     }
1131 
1132     if (latestAttr.getSize() != offset) {
1133       LOG.error("After sync, the expected file size: " + offset
1134           + ", however the actual file size is: " + latestAttr.getSize());
1135       status = Nfs3Status.NFS3ERR_IO;
1136     }
1137     WccData wccData = new WccData(Nfs3Utils.getWccAttr(latestAttr), latestAttr);
1138 
1139     // Send response for the ready commits
1140     while (entry != null && entry.getValue().offset <= flushedOffset) {
1141       pendingCommits.remove(entry.getKey());
1142       CommitCtx commit = entry.getValue();
1143 
1144       COMMIT3Response response = new COMMIT3Response(status, wccData,
1145           Nfs3Constant.WRITE_COMMIT_VERF);
1146       RpcProgramNfs3.metrics.addCommit(Nfs3Utils
1147           .getElapsedTime(commit.startTime));
1148       Nfs3Utils.writeChannelCommit(commit.getChannel(), response
1149           .serialize(new XDR(), commit.getXid(),
1150               new VerifierNone()), commit.getXid());
1151 
1152       if (LOG.isDebugEnabled()) {
1153         LOG.debug("FileId: " + latestAttr.getFileId() + " Service time: "
1154             + Nfs3Utils.getElapsedTime(commit.startTime)
1155             + "ns. Sent response for commit: " + commit);
1156       }
1157       entry = pendingCommits.firstEntry();
1158     }
1159   }
1160 
1161   private void doSingleWrite(final WriteCtx writeCtx) {
1162     Channel channel = writeCtx.getChannel();
1163     int xid = writeCtx.getXid();
1164 
1165     long offset = writeCtx.getOffset();
1166     int count = writeCtx.getCount();
1167     WriteStableHow stableHow = writeCtx.getStableHow();
1168 
1169     FileHandle handle = writeCtx.getHandle();
1170     if (LOG.isDebugEnabled()) {
1171       LOG.debug("do write, fileId: " + handle.getFileId() + " offset: "
1172           + offset + " length: " + count + " stableHow: " + stableHow.name());
1173     }
1174 
1175     try {
1176       // The write is not protected by lock. asyncStatus is used to make sure
1177       // there is one thread doing write back at any time
1178       writeCtx.writeData(fos);
1179       RpcProgramNfs3.metrics.incrBytesWritten(writeCtx.getCount());
1180 
1181       long flushedOffset = getFlushedOffset();
1182       if (flushedOffset != (offset + count)) {
1183         throw new IOException("output stream is out of sync, pos="
1184             + flushedOffset + " and nextOffset should be "
1185             + (offset + count));
1186       }
1187 
1188 
1189       // Reduce the memory occupation size if the request was allowed to be dumped
1190       if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
1191         synchronized (writeCtx) {
1192           if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
1193             writeCtx.setDataState(WriteCtx.DataState.NO_DUMP);
1194             updateNonSequentialWriteInMemory(-count);
1195             if (LOG.isDebugEnabled()) {
1196               LOG.debug("After writing " + handle.getFileId() + " at offset "
1197                   + offset + ", updated the memory count, new value: "
1198                   + nonSequentialWriteInMemory.get());
1199             }
1200           }
1201         }
1202       }
1203 
1204       if (!writeCtx.getReplied()) {
1205         if (stableHow != WriteStableHow.UNSTABLE) {
1206           LOG.info("Do sync for stable write: " + writeCtx);
1207           try {
1208             if (stableHow == WriteStableHow.DATA_SYNC) {
1209               fos.hsync();
1210             } else {
1211               Preconditions.checkState(stableHow == WriteStableHow.FILE_SYNC,
1212                   "Unknown WriteStableHow: " + stableHow);
1213               // Sync file data and length
1214               fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
1215             }
1216           } catch (IOException e) {
1217             LOG.error("hsync failed with writeCtx: " + writeCtx, e);
1218             throw e;
1219           }
1220         }
1221 
1222         WccAttr preOpAttr = latestAttr.getWccAttr();
1223         WccData fileWcc = new WccData(preOpAttr, latestAttr);
1224         if (writeCtx.getOriginalCount() != WriteCtx.INVALID_ORIGINAL_COUNT) {
1225           LOG.warn("Return original count: " + writeCtx.getOriginalCount()
1226               + " instead of real data count: " + count);
1227           count = writeCtx.getOriginalCount();
1228         }
1229         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK,
1230             fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
1231         RpcProgramNfs3.metrics.addWrite(Nfs3Utils.getElapsedTime(writeCtx.startTime));
1232         Nfs3Utils.writeChannel(channel, response.serialize(
1233             new XDR(), xid, new VerifierNone()), xid);
1234       }
1235 
1236       // Handle the waiting commits without holding any lock
1237       processCommits(writeCtx.getOffset() + writeCtx.getCount());
1238 
1239     } catch (IOException e) {
1240       LOG.error("Error writing to fileId " + handle.getFileId() + " at offset "
1241           + offset + " and length " + count, e);
1242       if (!writeCtx.getReplied()) {
1243         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO);
1244         Nfs3Utils.writeChannel(channel, response.serialize(
1245             new XDR(), xid, new VerifierNone()), xid);
1246         // Keep the stream open. Either the client retries or the StreamMonitor closes it.
1247       }
1248 
1249       LOG.info("Clean up open file context for fileId: "
1250           + latestAttr.getFileId());
1251       cleanup();
1252     }
1253   }
1254 
1255   synchronized void cleanup() {
1256     if (!activeState) {
1257       LOG.info("Current OpenFileCtx is already inactive, no need to cleanup.");
1258       return;
1259     }
1260     activeState = false;
1261 
1262     // stop the dump thread
1263     if (dumpThread != null && dumpThread.isAlive()) {
1264       dumpThread.interrupt();
1265       try {
1266         dumpThread.join(3000);
1267       } catch (InterruptedException ignored) {
1268       }
1269     }
1270 
1271     // Close stream
1272     try {
1273       if (fos != null) {
1274         fos.close();
1275       }
1276     } catch (IOException e) {
1277       LOG.info("Can't close stream for fileId: " + latestAttr.getFileId()
1278           + ", error: " + e);
1279     }
1280 
1281     // Reply error for pending writes
1282     LOG.info("There are " + pendingWrites.size() + " pending writes.");
1283     WccAttr preOpAttr = latestAttr.getWccAttr();
1284     while (!pendingWrites.isEmpty()) {
1285       OffsetRange key = pendingWrites.firstKey();
1286       LOG.info("Fail pending write: (" + key.getMin() + ", " + key.getMax()
1287           + "), nextOffset=" + nextOffset.get());
1288 
1289       WriteCtx writeCtx = pendingWrites.remove(key);
1290       if (!writeCtx.getReplied()) {
1291         WccData fileWcc = new WccData(preOpAttr, latestAttr);
1292         WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO,
1293             fileWcc, 0, writeCtx.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
1294         Nfs3Utils.writeChannel(writeCtx.getChannel(), response
1295             .serialize(new XDR(), writeCtx.getXid(),
1296                 new VerifierNone()), writeCtx.getXid());
1297       }
1298     }
1299 
1300     // Cleanup dump file
1301     if (dumpOut != null) {
1302       try {
1303         dumpOut.close();
1304       } catch (IOException e) {
1305         LOG.error("Failed to close output stream of dump file " + dumpFilePath, e);
1306       }
1307       File dumpFile = new File(dumpFilePath);
1308       if (dumpFile.exists() && !dumpFile.delete()) {
1309         LOG.error("Failed to delete dumpfile: " + dumpFile);
1310       }
1311     }
1312     if (raf != null) {
1313       try {
1314         raf.close();
1315       } catch (IOException e) {
1316         LOG.error("Got exception when closing input stream of dump file.", e);
1317       }
1318     }
1319   }
1320 
1321   @VisibleForTesting
1322   ConcurrentNavigableMap<OffsetRange, WriteCtx> getPendingWritesForTest() {
1323     return pendingWrites;
1324   }
1325 
1326   @VisibleForTesting
1327   ConcurrentNavigableMap<Long, CommitCtx> getPendingCommitsForTest() {
1328     return pendingCommits;
1329   }
1330 
1331   @VisibleForTesting
1332   long getNextOffsetForTest() {
1333     return nextOffset.get();
1334   }
1335 
1336   @VisibleForTesting
1337   void setNextOffsetForTest(long newValue) {
1338     nextOffset.set(newValue);
1339   }
1340 
1341   @VisibleForTesting
1342   void setActiveStatusForTest(boolean activeState) {
1343     this.activeState = activeState;
1344   }
1345 
1346   @Override
1347   public String toString() {
1348     return String.format("activeState: %b asyncStatus: %b nextOffset: %d",
1349         activeState, asyncStatus, nextOffset.get());
1350   }
1351 }
1352