//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/writable_file_writer.h"

#include <algorithm>
#include <mutex>

#include "db/version_edit.h"
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {
Status WritableFileWriter::Append(const Slice& data) {
  const char* src = data.data();
  size_t left = data.size();
  Status s;
  pending_sync_ = true;

  TEST_KILL_RANDOM("WritableFileWriter::Append:0",
                   rocksdb_kill_odds * REDUCE_ODDS2);

  {
    IOSTATS_TIMER_GUARD(prepare_write_nanos);
    TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
    writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left,
                                 IOOptions(), nullptr);
  }

  // See whether we need to enlarge the buffer to avoid the flush
  if (buf_.Capacity() - buf_.CurrentSize() < left) {
    for (size_t cap = buf_.Capacity();
         cap < max_buffer_size_;  // There is still room to increase
         cap *= 2) {
      // See whether the next available size is large enough.
      // Buffer will never be increased to more than max_buffer_size_.
      size_t desired_capacity = std::min(cap * 2, max_buffer_size_);
      if (desired_capacity - buf_.CurrentSize() >= left ||
          (use_direct_io() && desired_capacity == max_buffer_size_)) {
        buf_.AllocateNewBuffer(desired_capacity, true);
        break;
      }
    }
  }
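  // Illustration with hypothetical numbers: if max_buffer_size_ is 1 MB, the
  // capacity is 64 KB with 10 KB already buffered, and `left` is 200 KB, the
  // loop above considers 128 KB (too small: 128 - 10 < 200) and then 256 KB
  // (large enough: 256 - 10 >= 200), so it reallocates a 256 KB buffer,
  // asking AllocateNewBuffer to keep the 10 KB already buffered.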

  // Flush only when using buffered I/O
  if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) {
    if (buf_.CurrentSize() > 0) {
      s = Flush();
      if (!s.ok()) {
        return s;
      }
    }
    assert(buf_.CurrentSize() == 0);
  }

  // With direct I/O enabled we never bypass the buffer here; otherwise we
  // use the buffer for its original purpose: to accumulate many small
  // chunks when they fit.
  if (use_direct_io() || (buf_.Capacity() >= left)) {
    while (left > 0) {
      size_t appended = buf_.Append(src, left);
      left -= appended;
      src += appended;

      if (left > 0) {
        s = Flush();
        if (!s.ok()) {
          break;
        }
      }
    }
  } else {
    // Writing directly to the file, bypassing the buffer
    assert(buf_.CurrentSize() == 0);
    s = WriteBuffered(src, left);
  }

  TEST_KILL_RANDOM("WritableFileWriter::Append:1", rocksdb_kill_odds);
  if (s.ok()) {
    filesize_ += data.size();
    CalculateFileChecksum(data);
  }
  return s;
}

Status WritableFileWriter::Pad(const size_t pad_bytes) {
  assert(pad_bytes < kDefaultPageSize);
  size_t left = pad_bytes;
  size_t cap = buf_.Capacity() - buf_.CurrentSize();

  // Assume pad_bytes is small compared to buf_'s capacity, so we always go
  // through buf_ rather than writing directly to the file as Append() does
  // in certain cases.
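  // For illustration (hypothetical numbers): padding 100 bytes when only 40
  // bytes of headroom remain pads 40 bytes, flushes the buffer, and then
  // pads the remaining 60 bytes on the next iteration.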
  while (left) {
    size_t append_bytes = std::min(cap, left);
    buf_.PadWith(append_bytes, 0);
    left -= append_bytes;
    if (left > 0) {
      Status s = Flush();
      if (!s.ok()) {
        return s;
      }
    }
    cap = buf_.Capacity() - buf_.CurrentSize();
  }
  pending_sync_ = true;
  filesize_ += pad_bytes;
  return Status::OK();
}

Status WritableFileWriter::Close() {
  // Do not quit immediately on failure; the file MUST be closed.
  Status s;

  // It is possible to close the file twice now, as we MUST close it in the
  // destructor; simply flushing is not enough. On Windows pre-allocation
  // does not fill the file with zeros, and with unbuffered access we also
  // need to set the end of the data.
  if (!writable_file_) {
    return s;
  }

  s = Flush();  // flush cache to OS

  Status interim;
  // In direct I/O mode we write whole pages so
  // we need to let the file know where data ends.
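  // For illustration (hypothetical sizes): if filesize_ is 10,000 bytes but
  // direct I/O has written 12,288 bytes of zero-padded pages, the Truncate()
  // below trims the padding so the file ends at the logical data size.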
  if (use_direct_io()) {
    interim = writable_file_->Truncate(filesize_, IOOptions(), nullptr);
    if (interim.ok()) {
      interim = writable_file_->Fsync(IOOptions(), nullptr);
    }
    if (!interim.ok() && s.ok()) {
      s = interim;
    }
  }

  TEST_KILL_RANDOM("WritableFileWriter::Close:0", rocksdb_kill_odds);
  interim = writable_file_->Close(IOOptions(), nullptr);
  if (!interim.ok() && s.ok()) {
    s = interim;
  }

  writable_file_.reset();
  TEST_KILL_RANDOM("WritableFileWriter::Close:1", rocksdb_kill_odds);

  return s;
}

// Write out the cached data to the OS page cache, or to storage if direct
// I/O is enabled.
Status WritableFileWriter::Flush() {
  Status s;
  TEST_KILL_RANDOM("WritableFileWriter::Flush:0",
                   rocksdb_kill_odds * REDUCE_ODDS2);

  if (buf_.CurrentSize() > 0) {
    if (use_direct_io()) {
#ifndef ROCKSDB_LITE
      if (pending_sync_) {
        s = WriteDirect();
      }
#endif  // !ROCKSDB_LITE
    } else {
      s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize());
    }
    if (!s.ok()) {
      return s;
    }
  }

  s = writable_file_->Flush(IOOptions(), nullptr);

  if (!s.ok()) {
    return s;
  }

  // Sync the OS cache to disk every bytes_per_sync_.
  // TODO: give log files and sst files different options (log
  // files could potentially be cached in the OS for their whole
  // lifetime, thus we might not want to flush at all).

  // We try to avoid syncing the last 1MB of data, for two reasons:
  // (1) avoid rewriting the same page if it is modified later;
  // (2) on older OS versions, a write can block while the page is being
  //     written out.
  // XFS flushes neighboring pages outside of the specified range, so we
  // need to make sure the sync range is far from the write offset.
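  // For illustration (hypothetical numbers): with bytes_per_sync_ = 512 KB,
  // last_sync_size_ = 1 MB, and filesize_ = 2.5 MB + 1 KB, offset_sync_to
  // starts at 1.5 MB + 1 KB, is rounded down to the 4 KB boundary at 1.5 MB,
  // and since that is 512 KB past last_sync_size_ we RangeSync the bytes in
  // [1 MB, 1.5 MB) and remember 1.5 MB as the new last_sync_size_.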
  if (!use_direct_io() && bytes_per_sync_) {
    const uint64_t kBytesNotSyncRange =
        1024 * 1024;                                // recent 1MB is not synced.
    const uint64_t kBytesAlignWhenSync = 4 * 1024;  // Align 4KB.
    if (filesize_ > kBytesNotSyncRange) {
      uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange;
      offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;
      assert(offset_sync_to >= last_sync_size_);
      if (offset_sync_to > 0 &&
          offset_sync_to - last_sync_size_ >= bytes_per_sync_) {
        s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_);
        last_sync_size_ = offset_sync_to;
      }
    }
  }

  return s;
}

const char* WritableFileWriter::GetFileChecksumFuncName() const {
  if (checksum_func_ != nullptr) {
    return checksum_func_->Name();
  } else {
    return kUnknownFileChecksumFuncName.c_str();
  }
}

Status WritableFileWriter::Sync(bool use_fsync) {
  Status s = Flush();
  if (!s.ok()) {
    return s;
  }
  TEST_KILL_RANDOM("WritableFileWriter::Sync:0", rocksdb_kill_odds);
  if (!use_direct_io() && pending_sync_) {
    s = SyncInternal(use_fsync);
    if (!s.ok()) {
      return s;
    }
  }
  TEST_KILL_RANDOM("WritableFileWriter::Sync:1", rocksdb_kill_odds);
  pending_sync_ = false;
  return Status::OK();
}

Status WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
  if (!writable_file_->IsSyncThreadSafe()) {
    return Status::NotSupported(
        "Can't WritableFileWriter::SyncWithoutFlush() because "
        "WritableFile::IsSyncThreadSafe() is false");
  }
  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1");
  Status s = SyncInternal(use_fsync);
  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
  return s;
}

Status WritableFileWriter::SyncInternal(bool use_fsync) {
  Status s;
  IOSTATS_TIMER_GUARD(fsync_nanos);
  TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
  auto prev_perf_level = GetPerfLevel();
  IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_);
  if (use_fsync) {
    s = writable_file_->Fsync(IOOptions(), nullptr);
  } else {
    s = writable_file_->Sync(IOOptions(), nullptr);
  }
  SetPerfLevel(prev_perf_level);
  return s;
}

Status WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
  IOSTATS_TIMER_GUARD(range_sync_nanos);
  TEST_SYNC_POINT("WritableFileWriter::RangeSync:0");
  return writable_file_->RangeSync(offset, nbytes, IOOptions(), nullptr);
}

// This method writes the specified data to disk and uses the rate limiter
// if one is available.
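// For illustration (hypothetical sizes): if the rate limiter grants only
// 1 MB of a 4 MB request, the loop below issues several smaller Append()
// calls, timing and accounting for each chunk separately, until all of the
// data has been handed to the file.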
Status WritableFileWriter::WriteBuffered(const char* data, size_t size) {
  Status s;
  assert(!use_direct_io());
  const char* src = data;
  size_t left = size;

  while (left > 0) {
    size_t allowed;
    if (rate_limiter_ != nullptr) {
      allowed = rate_limiter_->RequestToken(
          left, 0 /* alignment */, writable_file_->GetIOPriority(), stats_,
          RateLimiter::OpType::kWrite);
    } else {
      allowed = left;
    }

    {
      IOSTATS_TIMER_GUARD(write_nanos);
      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");

#ifndef ROCKSDB_LITE
      FileOperationInfo::TimePoint start_ts;
      uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr);
      if (ShouldNotifyListeners()) {
        start_ts = std::chrono::system_clock::now();
        old_size = next_write_offset_;
      }
#endif
      {
        auto prev_perf_level = GetPerfLevel();
        IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_);
        s = writable_file_->Append(Slice(src, allowed), IOOptions(), nullptr);
        SetPerfLevel(prev_perf_level);
      }
#ifndef ROCKSDB_LITE
      if (ShouldNotifyListeners()) {
        auto finish_ts = std::chrono::system_clock::now();
        NotifyOnFileWriteFinish(old_size, allowed, start_ts, finish_ts, s);
      }
#endif
      if (!s.ok()) {
        return s;
      }
    }

    IOSTATS_ADD(bytes_written, allowed);
    TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0", rocksdb_kill_odds);

    left -= allowed;
    src += allowed;
  }
  buf_.Size(0);
  return s;
}

void WritableFileWriter::CalculateFileChecksum(const Slice& data) {
  if (checksum_func_ != nullptr) {
    if (is_first_checksum_) {
      file_checksum_ = checksum_func_->Value(data.data(), data.size());
      is_first_checksum_ = false;
    } else {
      file_checksum_ =
          checksum_func_->Extend(file_checksum_, data.data(), data.size());
    }
  }
}

// This flushes the accumulated data in the buffer, padding it with zeros to
// a whole page if necessary. However, during automatic flushes padding is
// typically not necessary. We always use the RateLimiter if one is
// available. Any buffer bytes beyond a whole number of pages are moved
// (refit) back to the start of the buffer to be written again on the next
// flush, because we can only write at aligned offsets.
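// For illustration (hypothetical numbers): with a 4 KB alignment and 10,000
// bytes buffered, file_advance is 8,192 bytes and leftover_tail is 1,808
// bytes; the buffer is zero-padded to 12,288 bytes and written out, and then
// RefitTail() moves the 1,808 tail bytes to the front of the buffer so they
// are rewritten on the next aligned write.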
#ifndef ROCKSDB_LITE
Status WritableFileWriter::WriteDirect() {
  assert(use_direct_io());
  Status s;
  const size_t alignment = buf_.Alignment();
  assert((next_write_offset_ % alignment) == 0);

  // Calculate whole page final file advance if all writes succeed
  size_t file_advance = TruncateToPageBoundary(alignment, buf_.CurrentSize());

  // Calculate the leftover tail. We write it here padded with zeros BUT we
  // will write it again in the future, either on Close() OR when the
  // current whole page fills out.
  size_t leftover_tail = buf_.CurrentSize() - file_advance;

  // Round up and pad
  buf_.PadToAlignmentWith(0);

  const char* src = buf_.BufferStart();
  uint64_t write_offset = next_write_offset_;
  size_t left = buf_.CurrentSize();

  while (left > 0) {
    // Check how much is allowed
    size_t size;
    if (rate_limiter_ != nullptr) {
      size = rate_limiter_->RequestToken(left, buf_.Alignment(),
                                         writable_file_->GetIOPriority(),
                                         stats_, RateLimiter::OpType::kWrite);
    } else {
      size = left;
    }

    {
      IOSTATS_TIMER_GUARD(write_nanos);
      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
      FileOperationInfo::TimePoint start_ts;
      if (ShouldNotifyListeners()) {
        start_ts = std::chrono::system_clock::now();
      }
      // direct writes must be positional
      s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
                                           IOOptions(), nullptr);
      if (ShouldNotifyListeners()) {
        auto finish_ts = std::chrono::system_clock::now();
        NotifyOnFileWriteFinish(write_offset, size, start_ts, finish_ts, s);
      }
      if (!s.ok()) {
        buf_.Size(file_advance + leftover_tail);
        return s;
      }
    }

    IOSTATS_ADD(bytes_written, size);
    left -= size;
    src += size;
    write_offset += size;
    assert((next_write_offset_ % alignment) == 0);
  }

  if (s.ok()) {
    // Move the tail to the beginning of the buffer.
    // This never happens during normal Append but rather during an
    // explicit call to Flush()/Sync() or Close().
    buf_.RefitTail(file_advance, leftover_tail);
    // This is where we start writing next time, which may or may not be
    // the actual file size on disk. They match if the buffer size is a
    // multiple of whole pages; otherwise filesize_ is leftover_tail behind.
    next_write_offset_ += file_advance;
  }
  return s;
}
#endif  // !ROCKSDB_LITE
}  // namespace ROCKSDB_NAMESPACE