1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "file/writable_file_writer.h"
11
12 #include <algorithm>
13 #include <mutex>
14
15 #include "db/version_edit.h"
16 #include "monitoring/histogram.h"
17 #include "monitoring/iostats_context_imp.h"
18 #include "port/port.h"
19 #include "rocksdb/system_clock.h"
20 #include "test_util/sync_point.h"
21 #include "util/crc32c.h"
22 #include "util/random.h"
23 #include "util/rate_limiter.h"
24
25 namespace ROCKSDB_NAMESPACE {
Create(const std::shared_ptr<FileSystem> & fs,const std::string & fname,const FileOptions & file_opts,std::unique_ptr<WritableFileWriter> * writer,IODebugContext * dbg)26 Status WritableFileWriter::Create(const std::shared_ptr<FileSystem>& fs,
27 const std::string& fname,
28 const FileOptions& file_opts,
29 std::unique_ptr<WritableFileWriter>* writer,
30 IODebugContext* dbg) {
31 std::unique_ptr<FSWritableFile> file;
32 Status s = fs->NewWritableFile(fname, file_opts, &file, dbg);
33 if (s.ok()) {
34 writer->reset(new WritableFileWriter(std::move(file), fname, file_opts));
35 }
36 return s;
37 }
38
// Appends `data` to the file, staging it in buf_ whenever possible.
//
// Order of operations:
//   1. feed the bytes to the file checksum generator (UpdateFileChecksum);
//   2. call PrepareWrite() on the file with the current logical size and the
//      number of bytes about to be written;
//   3. if buf_ lacks room, try to grow it by doubling, never past
//      max_buffer_size_;
//   4. for buffered I/O, flush buf_ first if the data still does not fit;
//   5. copy the data into buf_ (flushing whenever it fills), or, for buffered
//      I/O when the data exceeds buf_'s capacity, write it straight to the
//      file with WriteBuffered().
//
// filesize_ is advanced by data.size() only if every step succeeded.
// NOTE(review): the checksum is updated before any write happens, so after a
// failed Append() it already includes bytes that may not be in the file.
IOStatus WritableFileWriter::Append(const Slice& data) {
  const char* src = data.data();
  size_t left = data.size();
  IOStatus s;
  // There is now data that has not been synced; Sync() clears this flag.
  pending_sync_ = true;

  TEST_KILL_RANDOM_WITH_WEIGHT("WritableFileWriter::Append:0", REDUCE_ODDS2);

  // Calculate the checksum of appended data
  UpdateFileChecksum(data);

  {
    IOSTATS_TIMER_GUARD(prepare_write_nanos);
    TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
    // Tell the file that `left` more bytes are coming after the current
    // logical file size.
    writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left,
                                 IOOptions(), nullptr);
  }

  // See whether we need to enlarge the buffer to avoid the flush
  if (buf_.Capacity() - buf_.CurrentSize() < left) {
    for (size_t cap = buf_.Capacity();
         cap < max_buffer_size_;  // There is still room to increase
         cap *= 2) {
      // See whether the next available size is large enough.
      // Buffer will never be increased to more than max_buffer_size_.
      // With direct I/O the buffer is grown to the maximum even when the data
      // still won't fit, because direct I/O always goes through the buffer.
      size_t desired_capacity = std::min(cap * 2, max_buffer_size_);
      if (desired_capacity - buf_.CurrentSize() >= left ||
          (use_direct_io() && desired_capacity == max_buffer_size_)) {
        // NOTE(review): the `true` argument presumably keeps the bytes already
        // buffered — confirm against AlignedBuffer::AllocateNewBuffer.
        buf_.AllocateNewBuffer(desired_capacity, true);
        break;
      }
    }
  }

  // Flush only when buffered I/O
  if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) {
    if (buf_.CurrentSize() > 0) {
      s = Flush();
      if (!s.ok()) {
        return s;
      }
    }
    assert(buf_.CurrentSize() == 0);
  }

  // With direct I/O the data always goes through the buffer. With buffered
  // I/O the buffer is used when the data fits in it, for its original purpose
  // of accumulating many small chunks.
  if (use_direct_io() || (buf_.Capacity() >= left)) {
    while (left > 0) {
      size_t appended = buf_.Append(src, left);
      left -= appended;
      src += appended;

      if (left > 0) {
        // Buffer is full but data remains: write the buffer out and continue.
        s = Flush();
        if (!s.ok()) {
          break;
        }
      }
    }
  } else {
    // Writing directly to file bypassing the buffer
    assert(buf_.CurrentSize() == 0);
    s = WriteBuffered(src, left);
  }

  TEST_KILL_RANDOM("WritableFileWriter::Append:1");
  if (s.ok()) {
    filesize_ += data.size();
  }
  return s;
}
112
Pad(const size_t pad_bytes)113 IOStatus WritableFileWriter::Pad(const size_t pad_bytes) {
114 assert(pad_bytes < kDefaultPageSize);
115 size_t left = pad_bytes;
116 size_t cap = buf_.Capacity() - buf_.CurrentSize();
117
118 // Assume pad_bytes is small compared to buf_ capacity. So we always
119 // use buf_ rather than write directly to file in certain cases like
120 // Append() does.
121 while (left) {
122 size_t append_bytes = std::min(cap, left);
123 buf_.PadWith(append_bytes, 0);
124 left -= append_bytes;
125 if (left > 0) {
126 IOStatus s = Flush();
127 if (!s.ok()) {
128 return s;
129 }
130 }
131 cap = buf_.Capacity() - buf_.CurrentSize();
132 }
133 pending_sync_ = true;
134 filesize_ += pad_bytes;
135 return IOStatus::OK();
136 }
137
Close()138 IOStatus WritableFileWriter::Close() {
139 // Do not quit immediately on failure the file MUST be closed
140 IOStatus s;
141
142 // Possible to close it twice now as we MUST close
143 // in __dtor, simply flushing is not enough
144 // Windows when pre-allocating does not fill with zeros
145 // also with unbuffered access we also set the end of data.
146 if (writable_file_.get() == nullptr) {
147 return s;
148 }
149
150 s = Flush(); // flush cache to OS
151
152 IOStatus interim;
153 // In direct I/O mode we write whole pages so
154 // we need to let the file know where data ends.
155 if (use_direct_io()) {
156 {
157 #ifndef ROCKSDB_LITE
158 FileOperationInfo::StartTimePoint start_ts;
159 if (ShouldNotifyListeners()) {
160 start_ts = FileOperationInfo::StartNow();
161 }
162 #endif
163 interim = writable_file_->Truncate(filesize_, IOOptions(), nullptr);
164 #ifndef ROCKSDB_LITE
165 if (ShouldNotifyListeners()) {
166 auto finish_ts = FileOperationInfo::FinishNow();
167 NotifyOnFileTruncateFinish(start_ts, finish_ts, s);
168 }
169 #endif
170 }
171 if (interim.ok()) {
172 {
173 #ifndef ROCKSDB_LITE
174 FileOperationInfo::StartTimePoint start_ts;
175 if (ShouldNotifyListeners()) {
176 start_ts = FileOperationInfo::StartNow();
177 }
178 #endif
179 interim = writable_file_->Fsync(IOOptions(), nullptr);
180 #ifndef ROCKSDB_LITE
181 if (ShouldNotifyListeners()) {
182 auto finish_ts = FileOperationInfo::FinishNow();
183 NotifyOnFileSyncFinish(start_ts, finish_ts, s,
184 FileOperationType::kFsync);
185 }
186 #endif
187 }
188 }
189 if (!interim.ok() && s.ok()) {
190 s = interim;
191 }
192 }
193
194 TEST_KILL_RANDOM("WritableFileWriter::Close:0");
195 {
196 #ifndef ROCKSDB_LITE
197 FileOperationInfo::StartTimePoint start_ts;
198 if (ShouldNotifyListeners()) {
199 start_ts = FileOperationInfo::StartNow();
200 }
201 #endif
202 interim = writable_file_->Close(IOOptions(), nullptr);
203 #ifndef ROCKSDB_LITE
204 if (ShouldNotifyListeners()) {
205 auto finish_ts = FileOperationInfo::FinishNow();
206 NotifyOnFileCloseFinish(start_ts, finish_ts, s);
207 }
208 #endif
209 }
210 if (!interim.ok() && s.ok()) {
211 s = interim;
212 }
213
214 writable_file_.reset();
215 TEST_KILL_RANDOM("WritableFileWriter::Close:1");
216
217 if (s.ok() && checksum_generator_ != nullptr && !checksum_finalized_) {
218 checksum_generator_->Finalize();
219 checksum_finalized_ = true;
220 }
221
222 return s;
223 }
224
225 // write out the cached data to the OS cache or storage if direct I/O
226 // enabled
Flush()227 IOStatus WritableFileWriter::Flush() {
228 IOStatus s;
229 TEST_KILL_RANDOM_WITH_WEIGHT("WritableFileWriter::Flush:0", REDUCE_ODDS2);
230
231 if (buf_.CurrentSize() > 0) {
232 if (use_direct_io()) {
233 #ifndef ROCKSDB_LITE
234 if (pending_sync_) {
235 s = WriteDirect();
236 }
237 #endif // !ROCKSDB_LITE
238 } else {
239 s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize());
240 }
241 if (!s.ok()) {
242 return s;
243 }
244 }
245
246 {
247 #ifndef ROCKSDB_LITE
248 FileOperationInfo::StartTimePoint start_ts;
249 if (ShouldNotifyListeners()) {
250 start_ts = FileOperationInfo::StartNow();
251 }
252 #endif
253 s = writable_file_->Flush(IOOptions(), nullptr);
254 #ifndef ROCKSDB_LITE
255 if (ShouldNotifyListeners()) {
256 auto finish_ts = std::chrono::steady_clock::now();
257 NotifyOnFileFlushFinish(start_ts, finish_ts, s);
258 }
259 #endif
260 }
261
262 if (!s.ok()) {
263 return s;
264 }
265
266 // sync OS cache to disk for every bytes_per_sync_
267 // TODO: give log file and sst file different options (log
268 // files could be potentially cached in OS for their whole
269 // life time, thus we might not want to flush at all).
270
271 // We try to avoid sync to the last 1MB of data. For two reasons:
272 // (1) avoid rewrite the same page that is modified later.
273 // (2) for older version of OS, write can block while writing out
274 // the page.
275 // Xfs does neighbor page flushing outside of the specified ranges. We
276 // need to make sure sync range is far from the write offset.
277 if (!use_direct_io() && bytes_per_sync_) {
278 const uint64_t kBytesNotSyncRange =
279 1024 * 1024; // recent 1MB is not synced.
280 const uint64_t kBytesAlignWhenSync = 4 * 1024; // Align 4KB.
281 if (filesize_ > kBytesNotSyncRange) {
282 uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange;
283 offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;
284 assert(offset_sync_to >= last_sync_size_);
285 if (offset_sync_to > 0 &&
286 offset_sync_to - last_sync_size_ >= bytes_per_sync_) {
287 s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_);
288 last_sync_size_ = offset_sync_to;
289 }
290 }
291 }
292
293 return s;
294 }
295
GetFileChecksum()296 std::string WritableFileWriter::GetFileChecksum() {
297 if (checksum_generator_ != nullptr) {
298 assert(checksum_finalized_);
299 return checksum_generator_->GetChecksum();
300 } else {
301 return kUnknownFileChecksum;
302 }
303 }
304
GetFileChecksumFuncName() const305 const char* WritableFileWriter::GetFileChecksumFuncName() const {
306 if (checksum_generator_ != nullptr) {
307 return checksum_generator_->Name();
308 } else {
309 return kUnknownFileChecksumFuncName;
310 }
311 }
312
Sync(bool use_fsync)313 IOStatus WritableFileWriter::Sync(bool use_fsync) {
314 IOStatus s = Flush();
315 if (!s.ok()) {
316 return s;
317 }
318 TEST_KILL_RANDOM("WritableFileWriter::Sync:0");
319 if (!use_direct_io() && pending_sync_) {
320 s = SyncInternal(use_fsync);
321 if (!s.ok()) {
322 return s;
323 }
324 }
325 TEST_KILL_RANDOM("WritableFileWriter::Sync:1");
326 pending_sync_ = false;
327 return IOStatus::OK();
328 }
329
SyncWithoutFlush(bool use_fsync)330 IOStatus WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
331 if (!writable_file_->IsSyncThreadSafe()) {
332 return IOStatus::NotSupported(
333 "Can't WritableFileWriter::SyncWithoutFlush() because "
334 "WritableFile::IsSyncThreadSafe() is false");
335 }
336 TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1");
337 IOStatus s = SyncInternal(use_fsync);
338 TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
339 return s;
340 }
341
SyncInternal(bool use_fsync)342 IOStatus WritableFileWriter::SyncInternal(bool use_fsync) {
343 IOStatus s;
344 IOSTATS_TIMER_GUARD(fsync_nanos);
345 TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
346 auto prev_perf_level = GetPerfLevel();
347 IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, clock_);
348 #ifndef ROCKSDB_LITE
349 FileOperationInfo::StartTimePoint start_ts;
350 if (ShouldNotifyListeners()) {
351 start_ts = FileOperationInfo::StartNow();
352 }
353 #endif
354 if (use_fsync) {
355 s = writable_file_->Fsync(IOOptions(), nullptr);
356 } else {
357 s = writable_file_->Sync(IOOptions(), nullptr);
358 }
359 #ifndef ROCKSDB_LITE
360 if (ShouldNotifyListeners()) {
361 auto finish_ts = std::chrono::steady_clock::now();
362 NotifyOnFileSyncFinish(
363 start_ts, finish_ts, s,
364 use_fsync ? FileOperationType::kFsync : FileOperationType::kSync);
365 }
366 #endif
367 SetPerfLevel(prev_perf_level);
368 return s;
369 }
370
RangeSync(uint64_t offset,uint64_t nbytes)371 IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
372 IOSTATS_TIMER_GUARD(range_sync_nanos);
373 TEST_SYNC_POINT("WritableFileWriter::RangeSync:0");
374 #ifndef ROCKSDB_LITE
375 FileOperationInfo::StartTimePoint start_ts;
376 if (ShouldNotifyListeners()) {
377 start_ts = FileOperationInfo::StartNow();
378 }
379 #endif
380 IOStatus s = writable_file_->RangeSync(offset, nbytes, IOOptions(), nullptr);
381 #ifndef ROCKSDB_LITE
382 if (ShouldNotifyListeners()) {
383 auto finish_ts = std::chrono::steady_clock::now();
384 NotifyOnFileRangeSyncFinish(offset, nbytes, start_ts, finish_ts, s);
385 }
386 #endif
387 return s;
388 }
389
390 // This method writes to disk the specified data and makes use of the rate
391 // limiter if available
WriteBuffered(const char * data,size_t size)392 IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) {
393 IOStatus s;
394 assert(!use_direct_io());
395 const char* src = data;
396 size_t left = size;
397 DataVerificationInfo v_info;
398 char checksum_buf[sizeof(uint32_t)];
399
400 while (left > 0) {
401 size_t allowed;
402 if (rate_limiter_ != nullptr) {
403 allowed = rate_limiter_->RequestToken(
404 left, 0 /* alignment */, writable_file_->GetIOPriority(), stats_,
405 RateLimiter::OpType::kWrite);
406 } else {
407 allowed = left;
408 }
409
410 {
411 IOSTATS_TIMER_GUARD(write_nanos);
412 TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
413
414 #ifndef ROCKSDB_LITE
415 FileOperationInfo::StartTimePoint start_ts;
416 uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr);
417 if (ShouldNotifyListeners()) {
418 start_ts = FileOperationInfo::StartNow();
419 old_size = next_write_offset_;
420 }
421 #endif
422 {
423 auto prev_perf_level = GetPerfLevel();
424
425 IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, clock_);
426 if (perform_data_verification_) {
427 Crc32cHandoffChecksumCalculation(src, allowed, checksum_buf);
428 v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
429 s = writable_file_->Append(Slice(src, allowed), IOOptions(), v_info,
430 nullptr);
431 } else {
432 s = writable_file_->Append(Slice(src, allowed), IOOptions(), nullptr);
433 }
434 SetPerfLevel(prev_perf_level);
435 }
436 #ifndef ROCKSDB_LITE
437 if (ShouldNotifyListeners()) {
438 auto finish_ts = std::chrono::steady_clock::now();
439 NotifyOnFileWriteFinish(old_size, allowed, start_ts, finish_ts, s);
440 }
441 #endif
442 if (!s.ok()) {
443 return s;
444 }
445 }
446
447 IOSTATS_ADD(bytes_written, allowed);
448 TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0");
449
450 left -= allowed;
451 src += allowed;
452 }
453 buf_.Size(0);
454 return s;
455 }
456
UpdateFileChecksum(const Slice & data)457 void WritableFileWriter::UpdateFileChecksum(const Slice& data) {
458 if (checksum_generator_ != nullptr) {
459 checksum_generator_->Update(data.data(), data.size());
460 }
461 }
462
// Currently, a crc32c checksum is used to calculate the checksum value of the
// content in the input buffer for handoff. In the future, the checksum might be
// calculated from the existing crc32c checksums of the WAL and Manifest
// records, or even SST file blocks.
// TODO: effectively use the existing checksum of the data being written to
// generate the crc32c checksum instead of a raw calculation.
Crc32cHandoffChecksumCalculation(const char * data,size_t size,char * buf)469 void WritableFileWriter::Crc32cHandoffChecksumCalculation(const char* data,
470 size_t size,
471 char* buf) {
472 uint32_t v_crc32c = crc32c::Extend(0, data, size);
473 EncodeFixed32(buf, v_crc32c);
474 }
475
// This flushes the accumulated data in the buffer. The data is padded with
// zeros up to a whole page if necessary, although padding would not be
// necessary during automatic flushes. The RateLimiter is always used when one
// is available. Any buffered bytes beyond the last whole page are moved
// (refit) to the start of the buffer so they are written again on the next
// flush, because direct writes can only start at aligned offsets.
484 #ifndef ROCKSDB_LITE
WriteDirect()485 IOStatus WritableFileWriter::WriteDirect() {
486 assert(use_direct_io());
487 IOStatus s;
488 const size_t alignment = buf_.Alignment();
489 assert((next_write_offset_ % alignment) == 0);
490
491 // Calculate whole page final file advance if all writes succeed
492 size_t file_advance = TruncateToPageBoundary(alignment, buf_.CurrentSize());
493
494 // Calculate the leftover tail, we write it here padded with zeros BUT we
495 // will write
496 // it again in the future either on Close() OR when the current whole page
497 // fills out
498 size_t leftover_tail = buf_.CurrentSize() - file_advance;
499
500 // Round up and pad
501 buf_.PadToAlignmentWith(0);
502
503 const char* src = buf_.BufferStart();
504 uint64_t write_offset = next_write_offset_;
505 size_t left = buf_.CurrentSize();
506 DataVerificationInfo v_info;
507 char checksum_buf[sizeof(uint32_t)];
508
509 while (left > 0) {
510 // Check how much is allowed
511 size_t size;
512 if (rate_limiter_ != nullptr) {
513 size = rate_limiter_->RequestToken(left, buf_.Alignment(),
514 writable_file_->GetIOPriority(),
515 stats_, RateLimiter::OpType::kWrite);
516 } else {
517 size = left;
518 }
519
520 {
521 IOSTATS_TIMER_GUARD(write_nanos);
522 TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
523 FileOperationInfo::StartTimePoint start_ts;
524 if (ShouldNotifyListeners()) {
525 start_ts = FileOperationInfo::StartNow();
526 }
527 // direct writes must be positional
528 if (perform_data_verification_) {
529 Crc32cHandoffChecksumCalculation(src, size, checksum_buf);
530 v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
531 s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
532 IOOptions(), v_info, nullptr);
533 } else {
534 s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
535 IOOptions(), nullptr);
536 }
537
538 if (ShouldNotifyListeners()) {
539 auto finish_ts = std::chrono::steady_clock::now();
540 NotifyOnFileWriteFinish(write_offset, size, start_ts, finish_ts, s);
541 }
542 if (!s.ok()) {
543 buf_.Size(file_advance + leftover_tail);
544 return s;
545 }
546 }
547
548 IOSTATS_ADD(bytes_written, size);
549 left -= size;
550 src += size;
551 write_offset += size;
552 assert((next_write_offset_ % alignment) == 0);
553 }
554
555 if (s.ok()) {
556 // Move the tail to the beginning of the buffer
557 // This never happens during normal Append but rather during
558 // explicit call to Flush()/Sync() or Close()
559 buf_.RefitTail(file_advance, leftover_tail);
560 // This is where we start writing next time which may or not be
561 // the actual file size on disk. They match if the buffer size
562 // is a multiple of whole pages otherwise filesize_ is leftover_tail
563 // behind
564 next_write_offset_ += file_advance;
565 }
566 return s;
567 }
568 #endif // !ROCKSDB_LITE
569 } // namespace ROCKSDB_NAMESPACE
570