1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "file/writable_file_writer.h"
11 
12 #include <algorithm>
13 #include <mutex>
14 
15 #include "db/version_edit.h"
16 #include "monitoring/histogram.h"
17 #include "monitoring/iostats_context_imp.h"
18 #include "port/port.h"
19 #include "rocksdb/system_clock.h"
20 #include "test_util/sync_point.h"
21 #include "util/crc32c.h"
22 #include "util/random.h"
23 #include "util/rate_limiter.h"
24 
25 namespace ROCKSDB_NAMESPACE {
Create(const std::shared_ptr<FileSystem> & fs,const std::string & fname,const FileOptions & file_opts,std::unique_ptr<WritableFileWriter> * writer,IODebugContext * dbg)26 Status WritableFileWriter::Create(const std::shared_ptr<FileSystem>& fs,
27                                   const std::string& fname,
28                                   const FileOptions& file_opts,
29                                   std::unique_ptr<WritableFileWriter>* writer,
30                                   IODebugContext* dbg) {
31   std::unique_ptr<FSWritableFile> file;
32   Status s = fs->NewWritableFile(fname, file_opts, &file, dbg);
33   if (s.ok()) {
34     writer->reset(new WritableFileWriter(std::move(file), fname, file_opts));
35   }
36   return s;
37 }
38 
Append(const Slice & data)39 IOStatus WritableFileWriter::Append(const Slice& data) {
40   const char* src = data.data();
41   size_t left = data.size();
42   IOStatus s;
43   pending_sync_ = true;
44 
45   TEST_KILL_RANDOM_WITH_WEIGHT("WritableFileWriter::Append:0", REDUCE_ODDS2);
46 
47   // Calculate the checksum of appended data
48   UpdateFileChecksum(data);
49 
50   {
51     IOSTATS_TIMER_GUARD(prepare_write_nanos);
52     TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
53     writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left,
54                                  IOOptions(), nullptr);
55   }
56 
57   // See whether we need to enlarge the buffer to avoid the flush
58   if (buf_.Capacity() - buf_.CurrentSize() < left) {
59     for (size_t cap = buf_.Capacity();
60          cap < max_buffer_size_;  // There is still room to increase
61          cap *= 2) {
62       // See whether the next available size is large enough.
63       // Buffer will never be increased to more than max_buffer_size_.
64       size_t desired_capacity = std::min(cap * 2, max_buffer_size_);
65       if (desired_capacity - buf_.CurrentSize() >= left ||
66           (use_direct_io() && desired_capacity == max_buffer_size_)) {
67         buf_.AllocateNewBuffer(desired_capacity, true);
68         break;
69       }
70     }
71   }
72 
73   // Flush only when buffered I/O
74   if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) {
75     if (buf_.CurrentSize() > 0) {
76       s = Flush();
77       if (!s.ok()) {
78         return s;
79       }
80     }
81     assert(buf_.CurrentSize() == 0);
82   }
83 
84   // We never write directly to disk with direct I/O on.
85   // or we simply use it for its original purpose to accumulate many small
86   // chunks
87   if (use_direct_io() || (buf_.Capacity() >= left)) {
88     while (left > 0) {
89       size_t appended = buf_.Append(src, left);
90       left -= appended;
91       src += appended;
92 
93       if (left > 0) {
94         s = Flush();
95         if (!s.ok()) {
96           break;
97         }
98       }
99     }
100   } else {
101     // Writing directly to file bypassing the buffer
102     assert(buf_.CurrentSize() == 0);
103     s = WriteBuffered(src, left);
104   }
105 
106   TEST_KILL_RANDOM("WritableFileWriter::Append:1");
107   if (s.ok()) {
108     filesize_ += data.size();
109   }
110   return s;
111 }
112 
Pad(const size_t pad_bytes)113 IOStatus WritableFileWriter::Pad(const size_t pad_bytes) {
114   assert(pad_bytes < kDefaultPageSize);
115   size_t left = pad_bytes;
116   size_t cap = buf_.Capacity() - buf_.CurrentSize();
117 
118   // Assume pad_bytes is small compared to buf_ capacity. So we always
119   // use buf_ rather than write directly to file in certain cases like
120   // Append() does.
121   while (left) {
122     size_t append_bytes = std::min(cap, left);
123     buf_.PadWith(append_bytes, 0);
124     left -= append_bytes;
125     if (left > 0) {
126       IOStatus s = Flush();
127       if (!s.ok()) {
128         return s;
129       }
130     }
131     cap = buf_.Capacity() - buf_.CurrentSize();
132   }
133   pending_sync_ = true;
134   filesize_ += pad_bytes;
135   return IOStatus::OK();
136 }
137 
Close()138 IOStatus WritableFileWriter::Close() {
139   // Do not quit immediately on failure the file MUST be closed
140   IOStatus s;
141 
142   // Possible to close it twice now as we MUST close
143   // in __dtor, simply flushing is not enough
144   // Windows when pre-allocating does not fill with zeros
145   // also with unbuffered access we also set the end of data.
146   if (writable_file_.get() == nullptr) {
147     return s;
148   }
149 
150   s = Flush();  // flush cache to OS
151 
152   IOStatus interim;
153   // In direct I/O mode we write whole pages so
154   // we need to let the file know where data ends.
155   if (use_direct_io()) {
156     {
157 #ifndef ROCKSDB_LITE
158       FileOperationInfo::StartTimePoint start_ts;
159       if (ShouldNotifyListeners()) {
160         start_ts = FileOperationInfo::StartNow();
161       }
162 #endif
163       interim = writable_file_->Truncate(filesize_, IOOptions(), nullptr);
164 #ifndef ROCKSDB_LITE
165       if (ShouldNotifyListeners()) {
166         auto finish_ts = FileOperationInfo::FinishNow();
167         NotifyOnFileTruncateFinish(start_ts, finish_ts, s);
168       }
169 #endif
170     }
171     if (interim.ok()) {
172       {
173 #ifndef ROCKSDB_LITE
174         FileOperationInfo::StartTimePoint start_ts;
175         if (ShouldNotifyListeners()) {
176           start_ts = FileOperationInfo::StartNow();
177         }
178 #endif
179         interim = writable_file_->Fsync(IOOptions(), nullptr);
180 #ifndef ROCKSDB_LITE
181         if (ShouldNotifyListeners()) {
182           auto finish_ts = FileOperationInfo::FinishNow();
183           NotifyOnFileSyncFinish(start_ts, finish_ts, s,
184                                  FileOperationType::kFsync);
185         }
186 #endif
187       }
188     }
189     if (!interim.ok() && s.ok()) {
190       s = interim;
191     }
192   }
193 
194   TEST_KILL_RANDOM("WritableFileWriter::Close:0");
195   {
196 #ifndef ROCKSDB_LITE
197     FileOperationInfo::StartTimePoint start_ts;
198     if (ShouldNotifyListeners()) {
199       start_ts = FileOperationInfo::StartNow();
200     }
201 #endif
202     interim = writable_file_->Close(IOOptions(), nullptr);
203 #ifndef ROCKSDB_LITE
204     if (ShouldNotifyListeners()) {
205       auto finish_ts = FileOperationInfo::FinishNow();
206       NotifyOnFileCloseFinish(start_ts, finish_ts, s);
207     }
208 #endif
209   }
210   if (!interim.ok() && s.ok()) {
211     s = interim;
212   }
213 
214   writable_file_.reset();
215   TEST_KILL_RANDOM("WritableFileWriter::Close:1");
216 
217   if (s.ok() && checksum_generator_ != nullptr && !checksum_finalized_) {
218     checksum_generator_->Finalize();
219     checksum_finalized_ = true;
220   }
221 
222   return s;
223 }
224 
225 // write out the cached data to the OS cache or storage if direct I/O
226 // enabled
Flush()227 IOStatus WritableFileWriter::Flush() {
228   IOStatus s;
229   TEST_KILL_RANDOM_WITH_WEIGHT("WritableFileWriter::Flush:0", REDUCE_ODDS2);
230 
231   if (buf_.CurrentSize() > 0) {
232     if (use_direct_io()) {
233 #ifndef ROCKSDB_LITE
234       if (pending_sync_) {
235         s = WriteDirect();
236       }
237 #endif  // !ROCKSDB_LITE
238     } else {
239       s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize());
240     }
241     if (!s.ok()) {
242       return s;
243     }
244   }
245 
246   {
247 #ifndef ROCKSDB_LITE
248     FileOperationInfo::StartTimePoint start_ts;
249     if (ShouldNotifyListeners()) {
250       start_ts = FileOperationInfo::StartNow();
251     }
252 #endif
253     s = writable_file_->Flush(IOOptions(), nullptr);
254 #ifndef ROCKSDB_LITE
255     if (ShouldNotifyListeners()) {
256       auto finish_ts = std::chrono::steady_clock::now();
257       NotifyOnFileFlushFinish(start_ts, finish_ts, s);
258     }
259 #endif
260   }
261 
262   if (!s.ok()) {
263     return s;
264   }
265 
266   // sync OS cache to disk for every bytes_per_sync_
267   // TODO: give log file and sst file different options (log
268   // files could be potentially cached in OS for their whole
269   // life time, thus we might not want to flush at all).
270 
271   // We try to avoid sync to the last 1MB of data. For two reasons:
272   // (1) avoid rewrite the same page that is modified later.
273   // (2) for older version of OS, write can block while writing out
274   //     the page.
275   // Xfs does neighbor page flushing outside of the specified ranges. We
276   // need to make sure sync range is far from the write offset.
277   if (!use_direct_io() && bytes_per_sync_) {
278     const uint64_t kBytesNotSyncRange =
279         1024 * 1024;                                // recent 1MB is not synced.
280     const uint64_t kBytesAlignWhenSync = 4 * 1024;  // Align 4KB.
281     if (filesize_ > kBytesNotSyncRange) {
282       uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange;
283       offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;
284       assert(offset_sync_to >= last_sync_size_);
285       if (offset_sync_to > 0 &&
286           offset_sync_to - last_sync_size_ >= bytes_per_sync_) {
287         s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_);
288         last_sync_size_ = offset_sync_to;
289       }
290     }
291   }
292 
293   return s;
294 }
295 
GetFileChecksum()296 std::string WritableFileWriter::GetFileChecksum() {
297   if (checksum_generator_ != nullptr) {
298     assert(checksum_finalized_);
299     return checksum_generator_->GetChecksum();
300   } else {
301     return kUnknownFileChecksum;
302   }
303 }
304 
GetFileChecksumFuncName() const305 const char* WritableFileWriter::GetFileChecksumFuncName() const {
306   if (checksum_generator_ != nullptr) {
307     return checksum_generator_->Name();
308   } else {
309     return kUnknownFileChecksumFuncName;
310   }
311 }
312 
Sync(bool use_fsync)313 IOStatus WritableFileWriter::Sync(bool use_fsync) {
314   IOStatus s = Flush();
315   if (!s.ok()) {
316     return s;
317   }
318   TEST_KILL_RANDOM("WritableFileWriter::Sync:0");
319   if (!use_direct_io() && pending_sync_) {
320     s = SyncInternal(use_fsync);
321     if (!s.ok()) {
322       return s;
323     }
324   }
325   TEST_KILL_RANDOM("WritableFileWriter::Sync:1");
326   pending_sync_ = false;
327   return IOStatus::OK();
328 }
329 
SyncWithoutFlush(bool use_fsync)330 IOStatus WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
331   if (!writable_file_->IsSyncThreadSafe()) {
332     return IOStatus::NotSupported(
333         "Can't WritableFileWriter::SyncWithoutFlush() because "
334         "WritableFile::IsSyncThreadSafe() is false");
335   }
336   TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1");
337   IOStatus s = SyncInternal(use_fsync);
338   TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
339   return s;
340 }
341 
SyncInternal(bool use_fsync)342 IOStatus WritableFileWriter::SyncInternal(bool use_fsync) {
343   IOStatus s;
344   IOSTATS_TIMER_GUARD(fsync_nanos);
345   TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
346   auto prev_perf_level = GetPerfLevel();
347   IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, clock_);
348 #ifndef ROCKSDB_LITE
349   FileOperationInfo::StartTimePoint start_ts;
350   if (ShouldNotifyListeners()) {
351     start_ts = FileOperationInfo::StartNow();
352   }
353 #endif
354   if (use_fsync) {
355     s = writable_file_->Fsync(IOOptions(), nullptr);
356   } else {
357     s = writable_file_->Sync(IOOptions(), nullptr);
358   }
359 #ifndef ROCKSDB_LITE
360   if (ShouldNotifyListeners()) {
361     auto finish_ts = std::chrono::steady_clock::now();
362     NotifyOnFileSyncFinish(
363         start_ts, finish_ts, s,
364         use_fsync ? FileOperationType::kFsync : FileOperationType::kSync);
365   }
366 #endif
367   SetPerfLevel(prev_perf_level);
368   return s;
369 }
370 
RangeSync(uint64_t offset,uint64_t nbytes)371 IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
372   IOSTATS_TIMER_GUARD(range_sync_nanos);
373   TEST_SYNC_POINT("WritableFileWriter::RangeSync:0");
374 #ifndef ROCKSDB_LITE
375   FileOperationInfo::StartTimePoint start_ts;
376   if (ShouldNotifyListeners()) {
377     start_ts = FileOperationInfo::StartNow();
378   }
379 #endif
380   IOStatus s = writable_file_->RangeSync(offset, nbytes, IOOptions(), nullptr);
381 #ifndef ROCKSDB_LITE
382   if (ShouldNotifyListeners()) {
383     auto finish_ts = std::chrono::steady_clock::now();
384     NotifyOnFileRangeSyncFinish(offset, nbytes, start_ts, finish_ts, s);
385   }
386 #endif
387   return s;
388 }
389 
390 // This method writes to disk the specified data and makes use of the rate
391 // limiter if available
WriteBuffered(const char * data,size_t size)392 IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) {
393   IOStatus s;
394   assert(!use_direct_io());
395   const char* src = data;
396   size_t left = size;
397   DataVerificationInfo v_info;
398   char checksum_buf[sizeof(uint32_t)];
399 
400   while (left > 0) {
401     size_t allowed;
402     if (rate_limiter_ != nullptr) {
403       allowed = rate_limiter_->RequestToken(
404           left, 0 /* alignment */, writable_file_->GetIOPriority(), stats_,
405           RateLimiter::OpType::kWrite);
406     } else {
407       allowed = left;
408     }
409 
410     {
411       IOSTATS_TIMER_GUARD(write_nanos);
412       TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
413 
414 #ifndef ROCKSDB_LITE
415       FileOperationInfo::StartTimePoint start_ts;
416       uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr);
417       if (ShouldNotifyListeners()) {
418         start_ts = FileOperationInfo::StartNow();
419         old_size = next_write_offset_;
420       }
421 #endif
422       {
423         auto prev_perf_level = GetPerfLevel();
424 
425         IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, clock_);
426         if (perform_data_verification_) {
427           Crc32cHandoffChecksumCalculation(src, allowed, checksum_buf);
428           v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
429           s = writable_file_->Append(Slice(src, allowed), IOOptions(), v_info,
430                                      nullptr);
431         } else {
432           s = writable_file_->Append(Slice(src, allowed), IOOptions(), nullptr);
433         }
434         SetPerfLevel(prev_perf_level);
435       }
436 #ifndef ROCKSDB_LITE
437       if (ShouldNotifyListeners()) {
438         auto finish_ts = std::chrono::steady_clock::now();
439         NotifyOnFileWriteFinish(old_size, allowed, start_ts, finish_ts, s);
440       }
441 #endif
442       if (!s.ok()) {
443         return s;
444       }
445     }
446 
447     IOSTATS_ADD(bytes_written, allowed);
448     TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0");
449 
450     left -= allowed;
451     src += allowed;
452   }
453   buf_.Size(0);
454   return s;
455 }
456 
UpdateFileChecksum(const Slice & data)457 void WritableFileWriter::UpdateFileChecksum(const Slice& data) {
458   if (checksum_generator_ != nullptr) {
459     checksum_generator_->Update(data.data(), data.size());
460   }
461 }
462 
463 // Currently, crc32c checksum is used to calculate the checksum value of the
464 // content in the input buffer for handoff. In the future, the checksum might be
465 // calculated from the existing crc32c checksums of the in WAl and Manifest
466 // records, or even SST file blocks.
467 // TODO: effectively use the existing checksum of the data being writing to
468 // generate the crc32c checksum instead of a raw calculation.
Crc32cHandoffChecksumCalculation(const char * data,size_t size,char * buf)469 void WritableFileWriter::Crc32cHandoffChecksumCalculation(const char* data,
470                                                           size_t size,
471                                                           char* buf) {
472   uint32_t v_crc32c = crc32c::Extend(0, data, size);
473   EncodeFixed32(buf, v_crc32c);
474 }
475 
// Flushes the data accumulated in buf_ using direct I/O. The buffer is
// padded with zeros up to a whole page when necessary (during automatic
// flushes the padding would not be needed). The RateLimiter is always used
// if available. Any bytes left over past the last whole page are moved
// (Refit) to the front of the buffer so they are written again on the next
// flush, because we can only write at aligned offsets.
484 #ifndef ROCKSDB_LITE
WriteDirect()485 IOStatus WritableFileWriter::WriteDirect() {
486   assert(use_direct_io());
487   IOStatus s;
488   const size_t alignment = buf_.Alignment();
489   assert((next_write_offset_ % alignment) == 0);
490 
491   // Calculate whole page final file advance if all writes succeed
492   size_t file_advance = TruncateToPageBoundary(alignment, buf_.CurrentSize());
493 
494   // Calculate the leftover tail, we write it here padded with zeros BUT we
495   // will write
496   // it again in the future either on Close() OR when the current whole page
497   // fills out
498   size_t leftover_tail = buf_.CurrentSize() - file_advance;
499 
500   // Round up and pad
501   buf_.PadToAlignmentWith(0);
502 
503   const char* src = buf_.BufferStart();
504   uint64_t write_offset = next_write_offset_;
505   size_t left = buf_.CurrentSize();
506   DataVerificationInfo v_info;
507   char checksum_buf[sizeof(uint32_t)];
508 
509   while (left > 0) {
510     // Check how much is allowed
511     size_t size;
512     if (rate_limiter_ != nullptr) {
513       size = rate_limiter_->RequestToken(left, buf_.Alignment(),
514                                          writable_file_->GetIOPriority(),
515                                          stats_, RateLimiter::OpType::kWrite);
516     } else {
517       size = left;
518     }
519 
520     {
521       IOSTATS_TIMER_GUARD(write_nanos);
522       TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
523       FileOperationInfo::StartTimePoint start_ts;
524       if (ShouldNotifyListeners()) {
525         start_ts = FileOperationInfo::StartNow();
526       }
527       // direct writes must be positional
528       if (perform_data_verification_) {
529         Crc32cHandoffChecksumCalculation(src, size, checksum_buf);
530         v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
531         s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
532                                              IOOptions(), v_info, nullptr);
533       } else {
534         s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
535                                              IOOptions(), nullptr);
536       }
537 
538       if (ShouldNotifyListeners()) {
539         auto finish_ts = std::chrono::steady_clock::now();
540         NotifyOnFileWriteFinish(write_offset, size, start_ts, finish_ts, s);
541       }
542       if (!s.ok()) {
543         buf_.Size(file_advance + leftover_tail);
544         return s;
545       }
546     }
547 
548     IOSTATS_ADD(bytes_written, size);
549     left -= size;
550     src += size;
551     write_offset += size;
552     assert((next_write_offset_ % alignment) == 0);
553   }
554 
555   if (s.ok()) {
556     // Move the tail to the beginning of the buffer
557     // This never happens during normal Append but rather during
558     // explicit call to Flush()/Sync() or Close()
559     buf_.RefitTail(file_advance, leftover_tail);
560     // This is where we start writing next time which may or not be
561     // the actual file size on disk. They match if the buffer size
562     // is a multiple of whole pages otherwise filesize_ is leftover_tail
563     // behind
564     next_write_offset_ += file_advance;
565   }
566   return s;
567 }
568 #endif  // !ROCKSDB_LITE
569 }  // namespace ROCKSDB_NAMESPACE
570