1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.hdfs.nfs.nfs3; 19 20 import java.io.File; 21 import java.io.FileNotFoundException; 22 import java.io.FileOutputStream; 23 import java.io.IOException; 24 import java.io.RandomAccessFile; 25 import java.nio.ByteBuffer; 26 import java.nio.channels.ClosedChannelException; 27 import java.util.EnumSet; 28 import java.util.Iterator; 29 import java.util.Map.Entry; 30 import java.util.concurrent.ConcurrentNavigableMap; 31 import java.util.concurrent.ConcurrentSkipListMap; 32 import java.util.concurrent.atomic.AtomicLong; 33 34 import org.apache.commons.logging.Log; 35 import org.apache.commons.logging.LogFactory; 36 import org.apache.hadoop.fs.FSDataInputStream; 37 import org.apache.hadoop.hdfs.DFSClient; 38 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; 39 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag; 40 import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys; 41 import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration; 42 import org.apache.hadoop.hdfs.nfs.nfs3.WriteCtx.DataState; 43 import org.apache.hadoop.io.BytesWritable.Comparator; 44 import org.apache.hadoop.io.IOUtils; 45 import 
org.apache.hadoop.nfs.nfs3.FileHandle; 46 import org.apache.hadoop.nfs.nfs3.Nfs3Constant; 47 import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow; 48 import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes; 49 import org.apache.hadoop.nfs.nfs3.Nfs3Status; 50 import org.apache.hadoop.nfs.nfs3.request.WRITE3Request; 51 import org.apache.hadoop.nfs.nfs3.response.COMMIT3Response; 52 import org.apache.hadoop.nfs.nfs3.response.WRITE3Response; 53 import org.apache.hadoop.nfs.nfs3.response.WccAttr; 54 import org.apache.hadoop.nfs.nfs3.response.WccData; 55 import org.apache.hadoop.oncrpc.XDR; 56 import org.apache.hadoop.oncrpc.security.VerifierNone; 57 import org.apache.hadoop.security.IdMappingServiceProvider; 58 import org.apache.hadoop.util.Daemon; 59 import org.apache.hadoop.util.Time; 60 import org.jboss.netty.channel.Channel; 61 62 import com.google.common.annotations.VisibleForTesting; 63 import com.google.common.base.Preconditions; 64 65 /** 66 * OpenFileCtx saves the context of one HDFS file output stream. Access to it is 67 * synchronized by its member lock. 68 */ 69 class OpenFileCtx { 70 public static final Log LOG = LogFactory.getLog(OpenFileCtx.class); 71 72 // Pending writes water mark for dump, 1MB 73 private static long DUMP_WRITE_WATER_MARK = 1024 * 1024; 74 75 static enum COMMIT_STATUS { 76 COMMIT_FINISHED, 77 COMMIT_WAIT, 78 COMMIT_INACTIVE_CTX, 79 COMMIT_INACTIVE_WITH_PENDING_WRITE, 80 COMMIT_ERROR, 81 COMMIT_DO_SYNC, 82 /** 83 * Deferred COMMIT response could fail file uploading. The following two 84 * status are introduced as a solution. 1. if client asks to commit 85 * non-sequential trunk of data, NFS gateway return success with the hope 86 * that client will send the prerequisite writes. 2. if client asks to 87 * commit a sequential trunk(means it can be flushed to HDFS), NFS gateway 88 * return a special error NFS3ERR_JUKEBOX indicating the client needs to 89 * retry. 
Meanwhile, NFS gateway keeps flush data to HDFS and do sync 90 * eventually. 91 * 92 * The reason to let client wait is that, we want the client to wait for the 93 * last commit. Otherwise, client thinks file upload finished (e.g., cp 94 * command returns success) but NFS could be still flushing staged data to 95 * HDFS. However, we don't know which one is the last commit. We make the 96 * assumption that a commit after sequential writes may be the last. 97 * Referring HDFS-7259 for more details. 98 * */ 99 COMMIT_SPECIAL_WAIT, // scoped pending writes is sequential 100 COMMIT_SPECIAL_SUCCESS;// scoped pending writes is not sequential 101 } 102 103 private final DFSClient client; 104 private final IdMappingServiceProvider iug; 105 106 // The stream status. False means the stream is closed. 107 private volatile boolean activeState; 108 // The stream write-back status. True means one thread is doing write back. 109 private volatile boolean asyncStatus; 110 private volatile long asyncWriteBackStartOffset; 111 112 /** 113 * The current offset of the file in HDFS. All the content before this offset 114 * has been written back to HDFS. 
*/
  private AtomicLong nextOffset;
  private final HdfsDataOutputStream fos;
  // Enables the AIX client compatibility rules in COMMIT handling (HDFS-6549).
  private final boolean aixCompatMode;

  // It's updated after each sync to HDFS
  private Nfs3FileAttributes latestAttr;

  // Writes received but not yet written back to HDFS, keyed by byte range.
  private final ConcurrentNavigableMap<OffsetRange, WriteCtx> pendingWrites;

  // Deferred COMMIT requests, keyed by commit offset.
  private final ConcurrentNavigableMap<Long, CommitCtx> pendingCommits;

  /**
   * A COMMIT request recorded for later processing: the offset to commit, the
   * channel and xid needed to send the reply, and the file attributes captured
   * before the operation.
   */
  static class CommitCtx {
    private final long offset;
    private final Channel channel;
    private final int xid;
    private final Nfs3FileAttributes preOpAttr;

    /** Creation time of this context, taken from {@link System#nanoTime()}. */
    public final long startTime;

    long getOffset() {
      return offset;
    }

    Channel getChannel() {
      return channel;
    }

    int getXid() {
      return xid;
    }

    Nfs3FileAttributes getPreOpAttr() {
      return preOpAttr;
    }

    long getStartTime() {
      return startTime;
    }

    CommitCtx(long offset, Channel channel, int xid,
        Nfs3FileAttributes preOpAttr) {
      this.offset = offset;
      this.channel = channel;
      this.xid = xid;
      this.preOpAttr = preOpAttr;
      this.startTime = System.nanoTime();
    }

    @Override
    public String toString() {
      return String.format("offset: %d xid: %d startTime: %d", offset, xid,
          startTime);
    }
  }

  // The last write, commit request or write-back event. Updating time to keep
  // output stream alive.
173 private long lastAccessTime; 174 175 private volatile boolean enabledDump; 176 private FileOutputStream dumpOut; 177 178 /** Tracks the data buffered in memory related to non sequential writes */ 179 private AtomicLong nonSequentialWriteInMemory; 180 181 private RandomAccessFile raf; 182 private final String dumpFilePath; 183 private Daemon dumpThread; 184 private final boolean uploadLargeFile; 185 updateLastAccessTime()186 private void updateLastAccessTime() { 187 lastAccessTime = Time.monotonicNow(); 188 } 189 checkStreamTimeout(long streamTimeout)190 private boolean checkStreamTimeout(long streamTimeout) { 191 return Time.monotonicNow() - lastAccessTime > streamTimeout; 192 } 193 getLastAccessTime()194 long getLastAccessTime() { 195 return lastAccessTime; 196 } 197 getNextOffset()198 public long getNextOffset() { 199 return nextOffset.get(); 200 } 201 getActiveState()202 boolean getActiveState() { 203 return this.activeState; 204 } 205 hasPendingWork()206 boolean hasPendingWork() { 207 return (pendingWrites.size() != 0 || pendingCommits.size() != 0); 208 } 209 210 /** Increase or decrease the memory occupation of non-sequential writes */ updateNonSequentialWriteInMemory(long count)211 private long updateNonSequentialWriteInMemory(long count) { 212 long newValue = nonSequentialWriteInMemory.addAndGet(count); 213 if (LOG.isDebugEnabled()) { 214 LOG.debug("Update nonSequentialWriteInMemory by " + count + " new value: " 215 + newValue); 216 } 217 218 Preconditions.checkState(newValue >= 0, 219 "nonSequentialWriteInMemory is negative " + newValue 220 + " after update with count " + count); 221 return newValue; 222 } 223 OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr, String dumpFilePath, DFSClient client, IdMappingServiceProvider iug)224 OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr, 225 String dumpFilePath, DFSClient client, IdMappingServiceProvider iug) { 226 this(fos, latestAttr, dumpFilePath, client, iug, false, 227 
new NfsConfiguration()); 228 } 229 OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr, String dumpFilePath, DFSClient client, IdMappingServiceProvider iug, boolean aixCompatMode, NfsConfiguration config)230 OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr, 231 String dumpFilePath, DFSClient client, IdMappingServiceProvider iug, 232 boolean aixCompatMode, NfsConfiguration config) { 233 this.fos = fos; 234 this.latestAttr = latestAttr; 235 this.aixCompatMode = aixCompatMode; 236 // We use the ReverseComparatorOnMin as the comparator of the map. In this 237 // way, we first dump the data with larger offset. In the meanwhile, we 238 // retrieve the last element to write back to HDFS. 239 pendingWrites = new ConcurrentSkipListMap<OffsetRange, WriteCtx>( 240 OffsetRange.ReverseComparatorOnMin); 241 242 pendingCommits = new ConcurrentSkipListMap<Long, CommitCtx>(); 243 244 updateLastAccessTime(); 245 activeState = true; 246 asyncStatus = false; 247 asyncWriteBackStartOffset = 0; 248 dumpOut = null; 249 raf = null; 250 nonSequentialWriteInMemory = new AtomicLong(0); 251 252 this.dumpFilePath = dumpFilePath; 253 enabledDump = dumpFilePath != null; 254 nextOffset = new AtomicLong(); 255 nextOffset.set(latestAttr.getSize()); 256 try { 257 assert(nextOffset.get() == this.fos.getPos()); 258 } catch (IOException e) {} 259 dumpThread = null; 260 this.client = client; 261 this.iug = iug; 262 this.uploadLargeFile = config.getBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, 263 NfsConfigKeys.LARGE_FILE_UPLOAD_DEFAULT); 264 } 265 getLatestAttr()266 public Nfs3FileAttributes getLatestAttr() { 267 return latestAttr; 268 } 269 270 // Get flushed offset. Note that flushed data may not be persisted. 
getFlushedOffset()271 private long getFlushedOffset() throws IOException { 272 return fos.getPos(); 273 } 274 275 // Check if need to dump the new writes waitForDump()276 private void waitForDump() { 277 if (!enabledDump) { 278 if (LOG.isDebugEnabled()) { 279 LOG.debug("Do nothing, dump is disabled."); 280 } 281 return; 282 } 283 284 if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) { 285 return; 286 } 287 288 // wake up the dumper thread to dump the data 289 synchronized (this) { 290 if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) { 291 if (LOG.isDebugEnabled()) { 292 LOG.debug("Asking dumper to dump..."); 293 } 294 if (dumpThread == null) { 295 dumpThread = new Daemon(new Dumper()); 296 dumpThread.start(); 297 } else { 298 this.notifyAll(); 299 } 300 } 301 302 while (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) { 303 try { 304 this.wait(); 305 } catch (InterruptedException ignored) { 306 } 307 } 308 309 } 310 } 311 312 class Dumper implements Runnable { 313 /** Dump data into a file */ dump()314 private void dump() { 315 // Create dump outputstream for the first time 316 if (dumpOut == null) { 317 LOG.info("Create dump file: " + dumpFilePath); 318 File dumpFile = new File(dumpFilePath); 319 try { 320 synchronized (this) { 321 // check if alive again 322 Preconditions.checkState(dumpFile.createNewFile(), 323 "The dump file should not exist: %s", dumpFilePath); 324 dumpOut = new FileOutputStream(dumpFile); 325 } 326 } catch (IOException e) { 327 LOG.error("Got failure when creating dump stream " + dumpFilePath, e); 328 enabledDump = false; 329 if (dumpOut != null) { 330 try { 331 dumpOut.close(); 332 } catch (IOException e1) { 333 LOG.error("Can't close dump stream " + dumpFilePath, e); 334 } 335 } 336 return; 337 } 338 } 339 340 // Get raf for the first dump 341 if (raf == null) { 342 try { 343 raf = new RandomAccessFile(dumpFilePath, "r"); 344 } catch (FileNotFoundException e) { 345 LOG.error("Can't get random access to 
file " + dumpFilePath); 346 // Disable dump 347 enabledDump = false; 348 return; 349 } 350 } 351 352 if (LOG.isDebugEnabled()) { 353 LOG.debug("Start dump. Before dump, nonSequentialWriteInMemory == " 354 + nonSequentialWriteInMemory.get()); 355 } 356 357 Iterator<OffsetRange> it = pendingWrites.keySet().iterator(); 358 while (activeState && it.hasNext() 359 && nonSequentialWriteInMemory.get() > 0) { 360 OffsetRange key = it.next(); 361 WriteCtx writeCtx = pendingWrites.get(key); 362 if (writeCtx == null) { 363 // This write was just deleted 364 continue; 365 } 366 try { 367 long dumpedDataSize = writeCtx.dumpData(dumpOut, raf); 368 if (dumpedDataSize > 0) { 369 updateNonSequentialWriteInMemory(-dumpedDataSize); 370 } 371 } catch (IOException e) { 372 LOG.error("Dump data failed: " + writeCtx + " with error: " + e 373 + " OpenFileCtx state: " + activeState); 374 // Disable dump 375 enabledDump = false; 376 return; 377 } 378 } 379 380 if (LOG.isDebugEnabled()) { 381 LOG.debug("After dump, nonSequentialWriteInMemory == " 382 + nonSequentialWriteInMemory.get()); 383 } 384 } 385 386 @Override run()387 public void run() { 388 while (activeState && enabledDump) { 389 try { 390 if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) { 391 dump(); 392 } 393 synchronized (OpenFileCtx.this) { 394 if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) { 395 OpenFileCtx.this.notifyAll(); 396 try { 397 OpenFileCtx.this.wait(); 398 if (LOG.isDebugEnabled()) { 399 LOG.debug("Dumper woke up"); 400 } 401 } catch (InterruptedException e) { 402 LOG.info("Dumper is interrupted, dumpFilePath= " 403 + OpenFileCtx.this.dumpFilePath); 404 } 405 } 406 } 407 if (LOG.isDebugEnabled()) { 408 LOG.debug("Dumper checking OpenFileCtx activeState: " + activeState 409 + " enabledDump: " + enabledDump); 410 } 411 } catch (Throwable t) { 412 // unblock threads with new request 413 synchronized (OpenFileCtx.this) { 414 OpenFileCtx.this.notifyAll(); 415 } 416 LOG.info("Dumper get 
Throwable: " + t + ". dumpFilePath: " 417 + OpenFileCtx.this.dumpFilePath, t); 418 activeState = false; 419 } 420 } 421 } 422 } 423 checkRepeatedWriteRequest(WRITE3Request request, Channel channel, int xid)424 private WriteCtx checkRepeatedWriteRequest(WRITE3Request request, 425 Channel channel, int xid) { 426 OffsetRange range = new OffsetRange(request.getOffset(), 427 request.getOffset() + request.getCount()); 428 WriteCtx writeCtx = pendingWrites.get(range); 429 if (writeCtx== null) { 430 return null; 431 } else { 432 if (xid != writeCtx.getXid()) { 433 LOG.warn("Got a repeated request, same range, with a different xid: " 434 + xid + " xid in old request: " + writeCtx.getXid()); 435 //TODO: better handling. 436 } 437 return writeCtx; 438 } 439 } 440 receivedNewWrite(DFSClient dfsClient, WRITE3Request request, Channel channel, int xid, AsyncDataService asyncDataService, IdMappingServiceProvider iug)441 public void receivedNewWrite(DFSClient dfsClient, WRITE3Request request, 442 Channel channel, int xid, AsyncDataService asyncDataService, 443 IdMappingServiceProvider iug) { 444 445 if (!activeState) { 446 LOG.info("OpenFileCtx is inactive, fileId: " 447 + request.getHandle().getFileId()); 448 WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr); 449 WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, 450 fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF); 451 Nfs3Utils.writeChannel(channel, 452 response.serialize(new XDR(), xid, new VerifierNone()), 453 xid); 454 } else { 455 // Update the write time first 456 updateLastAccessTime(); 457 458 // Handle repeated write requests (same xid or not). 459 // If already replied, send reply again. If not replied, drop the 460 // repeated request. 
461 WriteCtx existantWriteCtx = checkRepeatedWriteRequest(request, channel, 462 xid); 463 if (existantWriteCtx != null) { 464 if (!existantWriteCtx.getReplied()) { 465 if (LOG.isDebugEnabled()) { 466 LOG.debug("Repeated write request which hasn't been served: xid=" 467 + xid + ", drop it."); 468 } 469 } else { 470 if (LOG.isDebugEnabled()) { 471 LOG.debug("Repeated write request which is already served: xid=" 472 + xid + ", resend response."); 473 } 474 WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr); 475 WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK, 476 fileWcc, request.getCount(), request.getStableHow(), 477 Nfs3Constant.WRITE_COMMIT_VERF); 478 Nfs3Utils.writeChannel(channel, response.serialize( 479 new XDR(), xid, new VerifierNone()), xid); 480 } 481 } else { 482 // not a repeated write request 483 receivedNewWriteInternal(dfsClient, request, channel, xid, 484 asyncDataService, iug); 485 } 486 } 487 } 488 489 @VisibleForTesting alterWriteRequest(WRITE3Request request, long cachedOffset)490 public static void alterWriteRequest(WRITE3Request request, long cachedOffset) { 491 long offset = request.getOffset(); 492 int count = request.getCount(); 493 long smallerCount = offset + count - cachedOffset; 494 if (LOG.isDebugEnabled()) { 495 LOG.debug(String.format("Got overwrite with appended data (%d-%d)," 496 + " current offset %d," + " drop the overlapped section (%d-%d)" 497 + " and append new data (%d-%d).", offset, (offset + count - 1), 498 cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset 499 + count - 1))); 500 } 501 502 ByteBuffer data = request.getData(); 503 Preconditions.checkState(data.position() == 0, 504 "The write request data has non-zero position"); 505 data.position((int) (cachedOffset - offset)); 506 Preconditions.checkState(data.limit() - data.position() == smallerCount, 507 "The write request buffer has wrong limit/position regarding count"); 508 509 request.setOffset(cachedOffset); 510 
request.setCount((int) smallerCount); 511 } 512 513 /** 514 * Creates and adds a WriteCtx into the pendingWrites map. This is a 515 * synchronized method to handle concurrent writes. 516 * 517 * @return A non-null {@link WriteCtx} instance if the incoming write 518 * request's offset >= nextOffset. Otherwise null. 519 */ addWritesToCache(WRITE3Request request, Channel channel, int xid)520 private synchronized WriteCtx addWritesToCache(WRITE3Request request, 521 Channel channel, int xid) { 522 long offset = request.getOffset(); 523 int count = request.getCount(); 524 long cachedOffset = nextOffset.get(); 525 int originalCount = WriteCtx.INVALID_ORIGINAL_COUNT; 526 527 if (LOG.isDebugEnabled()) { 528 LOG.debug("requested offset=" + offset + " and current offset=" 529 + cachedOffset); 530 } 531 532 // Handle a special case first 533 if ((offset < cachedOffset) && (offset + count > cachedOffset)) { 534 // One Linux client behavior: after a file is closed and reopened to 535 // write, the client sometimes combines previous written data(could still 536 // be in kernel buffer) with newly appended data in one write. This is 537 // usually the first write after file reopened. In this 538 // case, we log the event and drop the overlapped section. 
539 LOG.warn(String.format("Got overwrite with appended data (%d-%d)," 540 + " current offset %d," + " drop the overlapped section (%d-%d)" 541 + " and append new data (%d-%d).", offset, (offset + count - 1), 542 cachedOffset, offset, (cachedOffset - 1), cachedOffset, (offset 543 + count - 1))); 544 545 if (!pendingWrites.isEmpty()) { 546 LOG.warn("There are other pending writes, fail this jumbo write"); 547 return null; 548 } 549 550 LOG.warn("Modify this write to write only the appended data"); 551 alterWriteRequest(request, cachedOffset); 552 553 // Update local variable 554 originalCount = count; 555 offset = request.getOffset(); 556 count = request.getCount(); 557 } 558 559 // Fail non-append call 560 if (offset < cachedOffset) { 561 LOG.warn("(offset,count,nextOffset): " + "(" + offset + "," + count + "," 562 + nextOffset + ")"); 563 return null; 564 } else { 565 DataState dataState = offset == cachedOffset ? WriteCtx.DataState.NO_DUMP 566 : WriteCtx.DataState.ALLOW_DUMP; 567 WriteCtx writeCtx = new WriteCtx(request.getHandle(), 568 request.getOffset(), request.getCount(), originalCount, 569 request.getStableHow(), request.getData(), channel, xid, false, 570 dataState); 571 if (LOG.isDebugEnabled()) { 572 LOG.debug("Add new write to the list with nextOffset " + cachedOffset 573 + " and requested offset=" + offset); 574 } 575 if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) { 576 // update the memory size 577 updateNonSequentialWriteInMemory(count); 578 } 579 // check if there is a WriteCtx with the same range in pendingWrites 580 WriteCtx oldWriteCtx = checkRepeatedWriteRequest(request, channel, xid); 581 if (oldWriteCtx == null) { 582 pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx); 583 if (LOG.isDebugEnabled()) { 584 LOG.debug("New write buffered with xid " + xid + " nextOffset " 585 + cachedOffset + " req offset=" + offset + " mapsize=" 586 + pendingWrites.size()); 587 } 588 } else { 589 LOG.warn("Got a repeated request, 
same range, with xid: " + xid 590 + " nextOffset " + +cachedOffset + " req offset=" + offset); 591 } 592 return writeCtx; 593 } 594 } 595 596 /** Process an overwrite write request */ processOverWrite(DFSClient dfsClient, WRITE3Request request, Channel channel, int xid, IdMappingServiceProvider iug)597 private void processOverWrite(DFSClient dfsClient, WRITE3Request request, 598 Channel channel, int xid, IdMappingServiceProvider iug) { 599 WccData wccData = new WccData(latestAttr.getWccAttr(), null); 600 long offset = request.getOffset(); 601 int count = request.getCount(); 602 WriteStableHow stableHow = request.getStableHow(); 603 WRITE3Response response; 604 long cachedOffset = nextOffset.get(); 605 if (offset + count > cachedOffset) { 606 LOG.warn("Treat this jumbo write as a real random write, no support."); 607 response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0, 608 WriteStableHow.UNSTABLE, Nfs3Constant.WRITE_COMMIT_VERF); 609 } else { 610 if (LOG.isDebugEnabled()) { 611 LOG.debug("Process perfectOverWrite"); 612 } 613 // TODO: let executor handle perfect overwrite 614 response = processPerfectOverWrite(dfsClient, offset, count, stableHow, 615 request.getData().array(), 616 Nfs3Utils.getFileIdPath(request.getHandle()), wccData, iug); 617 } 618 updateLastAccessTime(); 619 Nfs3Utils.writeChannel(channel, 620 response.serialize(new XDR(), xid, new VerifierNone()), 621 xid); 622 } 623 624 /** 625 * Check if we can start the write (back to HDFS) now. If there is no hole for 626 * writing, and there is no other threads writing (i.e., asyncStatus is 627 * false), start the writing and set asyncStatus to true. 628 * 629 * @return True if the new write is sequential and we can start writing 630 * (including the case that there is already a thread writing). 
631 */ checkAndStartWrite( AsyncDataService asyncDataService, WriteCtx writeCtx)632 private synchronized boolean checkAndStartWrite( 633 AsyncDataService asyncDataService, WriteCtx writeCtx) { 634 635 if (writeCtx.getOffset() == nextOffset.get()) { 636 if (!asyncStatus) { 637 if (LOG.isDebugEnabled()) { 638 LOG.debug("Trigger the write back task. Current nextOffset: " 639 + nextOffset.get()); 640 } 641 asyncStatus = true; 642 asyncWriteBackStartOffset = writeCtx.getOffset(); 643 asyncDataService.execute(new AsyncDataService.WriteBackTask(this)); 644 } else { 645 if (LOG.isDebugEnabled()) { 646 LOG.debug("The write back thread is working."); 647 } 648 } 649 return true; 650 } else { 651 return false; 652 } 653 } 654 receivedNewWriteInternal(DFSClient dfsClient, WRITE3Request request, Channel channel, int xid, AsyncDataService asyncDataService, IdMappingServiceProvider iug)655 private void receivedNewWriteInternal(DFSClient dfsClient, 656 WRITE3Request request, Channel channel, int xid, 657 AsyncDataService asyncDataService, IdMappingServiceProvider iug) { 658 WriteStableHow stableHow = request.getStableHow(); 659 WccAttr preOpAttr = latestAttr.getWccAttr(); 660 int count = request.getCount(); 661 662 WriteCtx writeCtx = addWritesToCache(request, channel, xid); 663 if (writeCtx == null) { 664 // offset < nextOffset 665 processOverWrite(dfsClient, request, channel, xid, iug); 666 } else { 667 // The write is added to pendingWrites. 668 // Check and start writing back if necessary 669 boolean startWriting = checkAndStartWrite(asyncDataService, writeCtx); 670 if (!startWriting) { 671 // offset > nextOffset. check if we need to dump data 672 waitForDump(); 673 674 // In test, noticed some Linux client sends a batch (e.g., 1MB) 675 // of reordered writes and won't send more writes until it gets 676 // responses of the previous batch. 
So here send response immediately 677 // for unstable non-sequential write 678 if (stableHow != WriteStableHow.UNSTABLE) { 679 LOG.info("Have to change stable write to unstable write: " 680 + request.getStableHow()); 681 stableHow = WriteStableHow.UNSTABLE; 682 } 683 684 if (LOG.isDebugEnabled()) { 685 LOG.debug("UNSTABLE write request, send response for offset: " 686 + writeCtx.getOffset()); 687 } 688 WccData fileWcc = new WccData(preOpAttr, latestAttr); 689 WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK, 690 fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF); 691 RpcProgramNfs3.metrics.addWrite(Nfs3Utils 692 .getElapsedTime(writeCtx.startTime)); 693 Nfs3Utils 694 .writeChannel(channel, response.serialize(new XDR(), 695 xid, new VerifierNone()), xid); 696 writeCtx.setReplied(true); 697 } 698 } 699 } 700 701 /** 702 * Honor 2 kinds of overwrites: 1). support some application like touch(write 703 * the same content back to change mtime), 2) client somehow sends the same 704 * write again in a different RPC. 705 */ processPerfectOverWrite(DFSClient dfsClient, long offset, int count, WriteStableHow stableHow, byte[] data, String path, WccData wccData, IdMappingServiceProvider iug)706 private WRITE3Response processPerfectOverWrite(DFSClient dfsClient, 707 long offset, int count, WriteStableHow stableHow, byte[] data, 708 String path, WccData wccData, IdMappingServiceProvider iug) { 709 WRITE3Response response; 710 711 // Read the content back 712 byte[] readbuffer = new byte[count]; 713 714 int readCount = 0; 715 FSDataInputStream fis = null; 716 try { 717 // Sync file data and length to avoid partial read failure 718 fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); 719 } catch (ClosedChannelException closedException) { 720 LOG.info("The FSDataOutputStream has been closed. 
" 721 + "Continue processing the perfect overwrite."); 722 } catch (IOException e) { 723 LOG.info("hsync failed when processing possible perfect overwrite, path=" 724 + path + " error: " + e); 725 return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow, 726 Nfs3Constant.WRITE_COMMIT_VERF); 727 } 728 729 try { 730 fis = dfsClient.createWrappedInputStream(dfsClient.open(path)); 731 readCount = fis.read(offset, readbuffer, 0, count); 732 if (readCount < count) { 733 LOG.error("Can't read back " + count + " bytes, partial read size: " 734 + readCount); 735 return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow, 736 Nfs3Constant.WRITE_COMMIT_VERF); 737 } 738 } catch (IOException e) { 739 LOG.info("Read failed when processing possible perfect overwrite, path=" 740 + path, e); 741 return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow, 742 Nfs3Constant.WRITE_COMMIT_VERF); 743 } finally { 744 IOUtils.cleanup(LOG, fis); 745 } 746 747 // Compare with the request 748 Comparator comparator = new Comparator(); 749 if (comparator.compare(readbuffer, 0, readCount, data, 0, count) != 0) { 750 LOG.info("Perfect overwrite has different content"); 751 response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0, 752 stableHow, Nfs3Constant.WRITE_COMMIT_VERF); 753 } else { 754 LOG.info("Perfect overwrite has same content," 755 + " updating the mtime, then return success"); 756 Nfs3FileAttributes postOpAttr = null; 757 try { 758 dfsClient.setTimes(path, Time.monotonicNow(), -1); 759 postOpAttr = Nfs3Utils.getFileAttr(dfsClient, path, iug); 760 } catch (IOException e) { 761 LOG.info("Got error when processing perfect overwrite, path=" + path 762 + " error: " + e); 763 return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow, 764 Nfs3Constant.WRITE_COMMIT_VERF); 765 } 766 767 wccData.setPostOpAttr(postOpAttr); 768 response = new WRITE3Response(Nfs3Status.NFS3_OK, wccData, count, 769 stableHow, 
Nfs3Constant.WRITE_COMMIT_VERF); 770 } 771 return response; 772 } 773 774 /** 775 * Check the commit status with the given offset 776 * @param commitOffset the offset to commit 777 * @param channel the channel to return response 778 * @param xid the xid of the commit request 779 * @param preOpAttr the preOp attribute 780 * @param fromRead whether the commit is triggered from read request 781 * @return one commit status: COMMIT_FINISHED, COMMIT_WAIT, 782 * COMMIT_INACTIVE_CTX, COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR 783 */ checkCommit(DFSClient dfsClient, long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr, boolean fromRead)784 public COMMIT_STATUS checkCommit(DFSClient dfsClient, long commitOffset, 785 Channel channel, int xid, Nfs3FileAttributes preOpAttr, boolean fromRead) { 786 if (!fromRead) { 787 Preconditions.checkState(channel != null && preOpAttr != null); 788 // Keep stream active 789 updateLastAccessTime(); 790 } 791 Preconditions.checkState(commitOffset >= 0); 792 793 COMMIT_STATUS ret = checkCommitInternal(commitOffset, channel, xid, 794 preOpAttr, fromRead); 795 if (LOG.isDebugEnabled()) { 796 LOG.debug("Got commit status: " + ret.name()); 797 } 798 // Do the sync outside the lock 799 if (ret == COMMIT_STATUS.COMMIT_DO_SYNC 800 || ret == COMMIT_STATUS.COMMIT_FINISHED) { 801 try { 802 // Sync file data and length 803 fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); 804 ret = COMMIT_STATUS.COMMIT_FINISHED; // Remove COMMIT_DO_SYNC status 805 // Nothing to do for metadata since attr related change is pass-through 806 } catch (ClosedChannelException cce) { 807 if (pendingWrites.isEmpty()) { 808 ret = COMMIT_STATUS.COMMIT_FINISHED; 809 } else { 810 ret = COMMIT_STATUS.COMMIT_ERROR; 811 } 812 } catch (IOException e) { 813 LOG.error("Got stream error during data sync: " + e); 814 // Do nothing. Stream will be closed eventually by StreamMonitor. 
815 // status = Nfs3Status.NFS3ERR_IO; 816 ret = COMMIT_STATUS.COMMIT_ERROR; 817 } 818 } 819 return ret; 820 } 821 822 // Check if the to-commit range is sequential 823 @VisibleForTesting checkSequential(final long commitOffset, final long nextOffset)824 synchronized boolean checkSequential(final long commitOffset, 825 final long nextOffset) { 826 Preconditions.checkState(commitOffset >= nextOffset, "commitOffset " 827 + commitOffset + " less than nextOffset " + nextOffset); 828 long offset = nextOffset; 829 Iterator<OffsetRange> it = pendingWrites.descendingKeySet().iterator(); 830 while (it.hasNext()) { 831 OffsetRange range = it.next(); 832 if (range.getMin() != offset) { 833 // got a hole 834 return false; 835 } 836 offset = range.getMax(); 837 if (offset > commitOffset) { 838 return true; 839 } 840 } 841 // there is gap between the last pending write and commitOffset 842 return false; 843 } 844 handleSpecialWait(boolean fromRead, long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr)845 private COMMIT_STATUS handleSpecialWait(boolean fromRead, long commitOffset, 846 Channel channel, int xid, Nfs3FileAttributes preOpAttr) { 847 if (!fromRead) { 848 // let client retry the same request, add pending commit to sync later 849 CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid, preOpAttr); 850 pendingCommits.put(commitOffset, commitCtx); 851 } 852 if (LOG.isDebugEnabled()) { 853 LOG.debug("return COMMIT_SPECIAL_WAIT"); 854 } 855 return COMMIT_STATUS.COMMIT_SPECIAL_WAIT; 856 } 857 858 @VisibleForTesting checkCommitInternal(long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr, boolean fromRead)859 synchronized COMMIT_STATUS checkCommitInternal(long commitOffset, 860 Channel channel, int xid, Nfs3FileAttributes preOpAttr, boolean fromRead) { 861 if (!activeState) { 862 if (pendingWrites.isEmpty()) { 863 return COMMIT_STATUS.COMMIT_INACTIVE_CTX; 864 } else { 865 // TODO: return success if already committed 866 return 
COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE; 867 } 868 } 869 870 long flushed = 0; 871 try { 872 flushed = getFlushedOffset(); 873 } catch (IOException e) { 874 LOG.error("Can't get flushed offset, error:" + e); 875 return COMMIT_STATUS.COMMIT_ERROR; 876 } 877 878 if (LOG.isDebugEnabled()) { 879 LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset 880 + "nextOffset=" + nextOffset.get()); 881 } 882 883 if (pendingWrites.isEmpty()) { 884 if (aixCompatMode) { 885 // Note that, there is no guarantee data is synced. Caller should still 886 // do a sync here though the output stream might be closed. 887 return COMMIT_STATUS.COMMIT_FINISHED; 888 } else { 889 if (flushed < nextOffset.get()) { 890 if (LOG.isDebugEnabled()) { 891 LOG.debug("get commit while still writing to the requested offset," 892 + " with empty queue"); 893 } 894 return handleSpecialWait(fromRead, nextOffset.get(), channel, xid, 895 preOpAttr); 896 } else { 897 return COMMIT_STATUS.COMMIT_FINISHED; 898 } 899 } 900 } 901 902 Preconditions.checkState(flushed <= nextOffset.get(), "flushed " + flushed 903 + " is larger than nextOffset " + nextOffset.get()); 904 // Handle large file upload 905 if (uploadLargeFile && !aixCompatMode) { 906 long co = (commitOffset > 0) ? 
commitOffset : pendingWrites.firstEntry() 907 .getKey().getMax() - 1; 908 909 if (co <= flushed) { 910 return COMMIT_STATUS.COMMIT_DO_SYNC; 911 } else if (co < nextOffset.get()) { 912 if (LOG.isDebugEnabled()) { 913 LOG.debug("get commit while still writing to the requested offset"); 914 } 915 return handleSpecialWait(fromRead, co, channel, xid, preOpAttr); 916 } else { 917 // co >= nextOffset 918 if (checkSequential(co, nextOffset.get())) { 919 return handleSpecialWait(fromRead, co, channel, xid, preOpAttr); 920 } else { 921 if (LOG.isDebugEnabled()) { 922 LOG.debug("return COMMIT_SPECIAL_SUCCESS"); 923 } 924 return COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS; 925 } 926 } 927 } 928 929 if (commitOffset > 0) { 930 if (aixCompatMode) { 931 // The AIX NFS client misinterprets RFC-1813 and will always send 4096 932 // for the commitOffset even if fewer bytes than that have ever (or will 933 // ever) be sent by the client. So, if in AIX compatibility mode, we 934 // will always DO_SYNC if the number of bytes to commit have already all 935 // been flushed, else we will fall through to the logic below which 936 // checks for pending writes in the case that we're being asked to 937 // commit more bytes than have so far been flushed. See HDFS-6549 for 938 // more info. 
939 if (commitOffset <= flushed) { 940 return COMMIT_STATUS.COMMIT_DO_SYNC; 941 } 942 } else { 943 if (commitOffset > flushed) { 944 if (!fromRead) { 945 CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid, 946 preOpAttr); 947 pendingCommits.put(commitOffset, commitCtx); 948 } 949 return COMMIT_STATUS.COMMIT_WAIT; 950 } else { 951 return COMMIT_STATUS.COMMIT_DO_SYNC; 952 } 953 } 954 } 955 956 Entry<OffsetRange, WriteCtx> key = pendingWrites.firstEntry(); 957 958 // Commit whole file, commitOffset == 0 959 if (!fromRead) { 960 // Insert commit 961 long maxOffset = key.getKey().getMax() - 1; 962 Preconditions.checkState(maxOffset > 0); 963 CommitCtx commitCtx = new CommitCtx(maxOffset, channel, xid, preOpAttr); 964 pendingCommits.put(maxOffset, commitCtx); 965 } 966 return COMMIT_STATUS.COMMIT_WAIT; 967 } 968 969 /** 970 * Check stream status to decide if it should be closed 971 * @return true, remove stream; false, keep stream 972 */ streamCleanup(long fileId, long streamTimeout)973 public synchronized boolean streamCleanup(long fileId, long streamTimeout) { 974 Preconditions 975 .checkState(streamTimeout >= NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT); 976 if (!activeState) { 977 return true; 978 } 979 980 boolean flag = false; 981 // Check the stream timeout 982 if (checkStreamTimeout(streamTimeout)) { 983 if (LOG.isDebugEnabled()) { 984 LOG.debug("stream can be closed for fileId: " + fileId); 985 } 986 flag = true; 987 } 988 return flag; 989 } 990 991 /** 992 * Get (and remove) the next WriteCtx from {@link #pendingWrites} if possible. 993 * 994 * @return Null if {@link #pendingWrites} is null, or the next WriteCtx's 995 * offset is larger than nextOffSet. 
996 */ offerNextToWrite()997 private synchronized WriteCtx offerNextToWrite() { 998 if (pendingWrites.isEmpty()) { 999 if (LOG.isDebugEnabled()) { 1000 LOG.debug("The async write task has no pending writes, fileId: " 1001 + latestAttr.getFileId()); 1002 } 1003 // process pending commit again to handle this race: a commit is added 1004 // to pendingCommits map just after the last doSingleWrite returns. 1005 // There is no pending write and the commit should be handled by the 1006 // last doSingleWrite. Due to the race, the commit is left along and 1007 // can't be processed until cleanup. Therefore, we should do another 1008 // processCommits to fix the race issue. 1009 processCommits(nextOffset.get()); // nextOffset has same value as 1010 // flushedOffset 1011 this.asyncStatus = false; 1012 return null; 1013 } 1014 1015 Entry<OffsetRange, WriteCtx> lastEntry = pendingWrites.lastEntry(); 1016 OffsetRange range = lastEntry.getKey(); 1017 WriteCtx toWrite = lastEntry.getValue(); 1018 1019 if (LOG.isTraceEnabled()) { 1020 LOG.trace("range.getMin()=" + range.getMin() + " nextOffset=" 1021 + nextOffset); 1022 } 1023 1024 long offset = nextOffset.get(); 1025 if (range.getMin() > offset) { 1026 if (LOG.isDebugEnabled()) { 1027 LOG.debug("The next sequential write has not arrived yet"); 1028 } 1029 processCommits(nextOffset.get()); // handle race 1030 this.asyncStatus = false; 1031 } else if (range.getMin() < offset && range.getMax() > offset) { 1032 // shouldn't happen since we do sync for overlapped concurrent writers 1033 LOG.warn("Got an overlapping write (" + range.getMin() + ", " 1034 + range.getMax() + "), nextOffset=" + offset 1035 + ". 
Silently drop it now"); 1036 pendingWrites.remove(range); 1037 processCommits(nextOffset.get()); // handle race 1038 } else { 1039 if (LOG.isDebugEnabled()) { 1040 LOG.debug("Remove write(" + range.getMin() + "-" + range.getMax() 1041 + ") from the list"); 1042 } 1043 // after writing, remove the WriteCtx from cache 1044 pendingWrites.remove(range); 1045 // update nextOffset 1046 nextOffset.addAndGet(toWrite.getCount()); 1047 if (LOG.isDebugEnabled()) { 1048 LOG.debug("Change nextOffset to " + nextOffset.get()); 1049 } 1050 return toWrite; 1051 } 1052 1053 return null; 1054 } 1055 1056 /** Invoked by AsyncDataService to write back to HDFS */ executeWriteBack()1057 void executeWriteBack() { 1058 Preconditions.checkState(asyncStatus, 1059 "openFileCtx has false asyncStatus, fileId: " + latestAttr.getFileId()); 1060 final long startOffset = asyncWriteBackStartOffset; 1061 try { 1062 while (activeState) { 1063 // asyncStatus could be changed to false in offerNextToWrite() 1064 WriteCtx toWrite = offerNextToWrite(); 1065 if (toWrite != null) { 1066 // Do the write 1067 doSingleWrite(toWrite); 1068 updateLastAccessTime(); 1069 } else { 1070 break; 1071 } 1072 } 1073 1074 if (!activeState && LOG.isDebugEnabled()) { 1075 LOG.debug("The openFileCtx is not active anymore, fileId: " 1076 + latestAttr.getFileId()); 1077 } 1078 } finally { 1079 // Make sure to reset asyncStatus to false unless a race happens 1080 synchronized (this) { 1081 if (startOffset == asyncWriteBackStartOffset) { 1082 asyncStatus = false; 1083 } else { 1084 LOG.info("Another async task is already started before this one" 1085 + " is finalized. fileId: " + latestAttr.getFileId() 1086 + " asyncStatus: " + asyncStatus + " original startOffset: " 1087 + startOffset + " new startOffset: " + asyncWriteBackStartOffset 1088 + ". 
Won't change asyncStatus here."); 1089 } 1090 } 1091 } 1092 } 1093 processCommits(long offset)1094 private void processCommits(long offset) { 1095 Preconditions.checkState(offset > 0); 1096 long flushedOffset = 0; 1097 Entry<Long, CommitCtx> entry = null; 1098 1099 int status = Nfs3Status.NFS3ERR_IO; 1100 try { 1101 flushedOffset = getFlushedOffset(); 1102 entry = pendingCommits.firstEntry(); 1103 if (entry == null || entry.getValue().offset > flushedOffset) { 1104 return; 1105 } 1106 1107 // Now do sync for the ready commits 1108 // Sync file data and length 1109 fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); 1110 status = Nfs3Status.NFS3_OK; 1111 } catch (ClosedChannelException cce) { 1112 if (!pendingWrites.isEmpty()) { 1113 LOG.error("Can't sync for fileId: " + latestAttr.getFileId() 1114 + ". Channel closed with writes pending.", cce); 1115 } 1116 status = Nfs3Status.NFS3ERR_IO; 1117 } catch (IOException e) { 1118 LOG.error("Got stream error during data sync: ", e); 1119 // Do nothing. Stream will be closed eventually by StreamMonitor. 
1120 status = Nfs3Status.NFS3ERR_IO; 1121 } 1122 1123 // Update latestAttr 1124 try { 1125 latestAttr = Nfs3Utils.getFileAttr(client, 1126 Nfs3Utils.getFileIdPath(latestAttr.getFileId()), iug); 1127 } catch (IOException e) { 1128 LOG.error("Can't get new file attr, fileId: " + latestAttr.getFileId(), e); 1129 status = Nfs3Status.NFS3ERR_IO; 1130 } 1131 1132 if (latestAttr.getSize() != offset) { 1133 LOG.error("After sync, the expect file size: " + offset 1134 + ", however actual file size is: " + latestAttr.getSize()); 1135 status = Nfs3Status.NFS3ERR_IO; 1136 } 1137 WccData wccData = new WccData(Nfs3Utils.getWccAttr(latestAttr), latestAttr); 1138 1139 // Send response for the ready commits 1140 while (entry != null && entry.getValue().offset <= flushedOffset) { 1141 pendingCommits.remove(entry.getKey()); 1142 CommitCtx commit = entry.getValue(); 1143 1144 COMMIT3Response response = new COMMIT3Response(status, wccData, 1145 Nfs3Constant.WRITE_COMMIT_VERF); 1146 RpcProgramNfs3.metrics.addCommit(Nfs3Utils 1147 .getElapsedTime(commit.startTime)); 1148 Nfs3Utils.writeChannelCommit(commit.getChannel(), response 1149 .serialize(new XDR(), commit.getXid(), 1150 new VerifierNone()), commit.getXid()); 1151 1152 if (LOG.isDebugEnabled()) { 1153 LOG.debug("FileId: " + latestAttr.getFileId() + " Service time: " 1154 + Nfs3Utils.getElapsedTime(commit.startTime) 1155 + "ns. 
Sent response for commit: " + commit); 1156 } 1157 entry = pendingCommits.firstEntry(); 1158 } 1159 } 1160 doSingleWrite(final WriteCtx writeCtx)1161 private void doSingleWrite(final WriteCtx writeCtx) { 1162 Channel channel = writeCtx.getChannel(); 1163 int xid = writeCtx.getXid(); 1164 1165 long offset = writeCtx.getOffset(); 1166 int count = writeCtx.getCount(); 1167 WriteStableHow stableHow = writeCtx.getStableHow(); 1168 1169 FileHandle handle = writeCtx.getHandle(); 1170 if (LOG.isDebugEnabled()) { 1171 LOG.debug("do write, fileId: " + handle.getFileId() + " offset: " 1172 + offset + " length: " + count + " stableHow: " + stableHow.name()); 1173 } 1174 1175 try { 1176 // The write is not protected by lock. asyncState is used to make sure 1177 // there is one thread doing write back at any time 1178 writeCtx.writeData(fos); 1179 RpcProgramNfs3.metrics.incrBytesWritten(writeCtx.getCount()); 1180 1181 long flushedOffset = getFlushedOffset(); 1182 if (flushedOffset != (offset + count)) { 1183 throw new IOException("output stream is out of sync, pos=" 1184 + flushedOffset + " and nextOffset should be" 1185 + (offset + count)); 1186 } 1187 1188 1189 // Reduce memory occupation size if request was allowed dumped 1190 if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) { 1191 synchronized (writeCtx) { 1192 if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) { 1193 writeCtx.setDataState(WriteCtx.DataState.NO_DUMP); 1194 updateNonSequentialWriteInMemory(-count); 1195 if (LOG.isDebugEnabled()) { 1196 LOG.debug("After writing " + handle.getFileId() + " at offset " 1197 + offset + ", updated the memory count, new value: " 1198 + nonSequentialWriteInMemory.get()); 1199 } 1200 } 1201 } 1202 } 1203 1204 if (!writeCtx.getReplied()) { 1205 if (stableHow != WriteStableHow.UNSTABLE) { 1206 LOG.info("Do sync for stable write: " + writeCtx); 1207 try { 1208 if (stableHow == WriteStableHow.DATA_SYNC) { 1209 fos.hsync(); 1210 } else { 1211 
Preconditions.checkState(stableHow == WriteStableHow.FILE_SYNC, 1212 "Unknown WriteStableHow: " + stableHow); 1213 // Sync file data and length 1214 fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); 1215 } 1216 } catch (IOException e) { 1217 LOG.error("hsync failed with writeCtx: " + writeCtx, e); 1218 throw e; 1219 } 1220 } 1221 1222 WccAttr preOpAttr = latestAttr.getWccAttr(); 1223 WccData fileWcc = new WccData(preOpAttr, latestAttr); 1224 if (writeCtx.getOriginalCount() != WriteCtx.INVALID_ORIGINAL_COUNT) { 1225 LOG.warn("Return original count: " + writeCtx.getOriginalCount() 1226 + " instead of real data count: " + count); 1227 count = writeCtx.getOriginalCount(); 1228 } 1229 WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK, 1230 fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF); 1231 RpcProgramNfs3.metrics.addWrite(Nfs3Utils.getElapsedTime(writeCtx.startTime)); 1232 Nfs3Utils.writeChannel(channel, response.serialize( 1233 new XDR(), xid, new VerifierNone()), xid); 1234 } 1235 1236 // Handle the waiting commits without holding any lock 1237 processCommits(writeCtx.getOffset() + writeCtx.getCount()); 1238 1239 } catch (IOException e) { 1240 LOG.error("Error writing to fileId " + handle.getFileId() + " at offset " 1241 + offset + " and length " + count, e); 1242 if (!writeCtx.getReplied()) { 1243 WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO); 1244 Nfs3Utils.writeChannel(channel, response.serialize( 1245 new XDR(), xid, new VerifierNone()), xid); 1246 // Keep stream open. Either client retries or SteamMonitor closes it. 
1247 } 1248 1249 LOG.info("Clean up open file context for fileId: " 1250 + latestAttr.getFileId()); 1251 cleanup(); 1252 } 1253 } 1254 cleanup()1255 synchronized void cleanup() { 1256 if (!activeState) { 1257 LOG.info("Current OpenFileCtx is already inactive, no need to cleanup."); 1258 return; 1259 } 1260 activeState = false; 1261 1262 // stop the dump thread 1263 if (dumpThread != null && dumpThread.isAlive()) { 1264 dumpThread.interrupt(); 1265 try { 1266 dumpThread.join(3000); 1267 } catch (InterruptedException ignored) { 1268 } 1269 } 1270 1271 // Close stream 1272 try { 1273 if (fos != null) { 1274 fos.close(); 1275 } 1276 } catch (IOException e) { 1277 LOG.info("Can't close stream for fileId: " + latestAttr.getFileId() 1278 + ", error: " + e); 1279 } 1280 1281 // Reply error for pending writes 1282 LOG.info("There are " + pendingWrites.size() + " pending writes."); 1283 WccAttr preOpAttr = latestAttr.getWccAttr(); 1284 while (!pendingWrites.isEmpty()) { 1285 OffsetRange key = pendingWrites.firstKey(); 1286 LOG.info("Fail pending write: (" + key.getMin() + ", " + key.getMax() 1287 + "), nextOffset=" + nextOffset.get()); 1288 1289 WriteCtx writeCtx = pendingWrites.remove(key); 1290 if (!writeCtx.getReplied()) { 1291 WccData fileWcc = new WccData(preOpAttr, latestAttr); 1292 WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, 1293 fileWcc, 0, writeCtx.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF); 1294 Nfs3Utils.writeChannel(writeCtx.getChannel(), response 1295 .serialize(new XDR(), writeCtx.getXid(), 1296 new VerifierNone()), writeCtx.getXid()); 1297 } 1298 } 1299 1300 // Cleanup dump file 1301 if (dumpOut != null) { 1302 try { 1303 dumpOut.close(); 1304 } catch (IOException e) { 1305 LOG.error("Failed to close outputstream of dump file" + dumpFilePath, e); 1306 } 1307 File dumpFile = new File(dumpFilePath); 1308 if (dumpFile.exists() && !dumpFile.delete()) { 1309 LOG.error("Failed to delete dumpfile: " + dumpFile); 1310 } 1311 } 1312 if (raf 
!= null) { 1313 try { 1314 raf.close(); 1315 } catch (IOException e) { 1316 LOG.error("Got exception when closing input stream of dump file.", e); 1317 } 1318 } 1319 } 1320 1321 @VisibleForTesting getPendingWritesForTest()1322 ConcurrentNavigableMap<OffsetRange, WriteCtx> getPendingWritesForTest(){ 1323 return pendingWrites; 1324 } 1325 1326 @VisibleForTesting getPendingCommitsForTest()1327 ConcurrentNavigableMap<Long, CommitCtx> getPendingCommitsForTest(){ 1328 return pendingCommits; 1329 } 1330 1331 @VisibleForTesting getNextOffsetForTest()1332 long getNextOffsetForTest() { 1333 return nextOffset.get(); 1334 } 1335 1336 @VisibleForTesting setNextOffsetForTest(long newValue)1337 void setNextOffsetForTest(long newValue) { 1338 nextOffset.set(newValue); 1339 } 1340 1341 @VisibleForTesting setActiveStatusForTest(boolean activeState)1342 void setActiveStatusForTest(boolean activeState) { 1343 this.activeState = activeState; 1344 } 1345 1346 @Override toString()1347 public String toString() { 1348 return String.format("activeState: %b asyncStatus: %b nextOffset: %d", 1349 activeState, asyncStatus, nextOffset.get()); 1350 } 1351 } 1352