//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/writable_file_writer.h"

#include <algorithm>
#include <mutex>

#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter.h"

namespace rocksdb {
Status WritableFileWriter::Append(const Slice& data) {
  const char* src = data.data();
  size_t left = data.size();
  Status s;
  pending_sync_ = true;

  TEST_KILL_RANDOM("WritableFileWriter::Append:0",
                   rocksdb_kill_odds * REDUCE_ODDS2);

  {
    IOSTATS_TIMER_GUARD(prepare_write_nanos);
    TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
    writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left,
                                 IOOptions(), nullptr);
  }

  // See whether we need to enlarge the buffer to avoid the flush
  if (buf_.Capacity() - buf_.CurrentSize() < left) {
    for (size_t cap = buf_.Capacity();
         cap < max_buffer_size_;  // There is still room to increase
         cap *= 2) {
      // See whether the next available size is large enough.
      // Buffer will never be increased to more than max_buffer_size_.
      size_t desired_capacity = std::min(cap * 2, max_buffer_size_);
      if (desired_capacity - buf_.CurrentSize() >= left ||
          (use_direct_io() && desired_capacity == max_buffer_size_)) {
        buf_.AllocateNewBuffer(desired_capacity, true);
        break;
      }
    }
  }
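  // Illustrative example of the growth loop above (the sizes are assumptions):
  // with buf_.Capacity() = 64KB, buf_.CurrentSize() = 60KB, left = 100KB and
  // max_buffer_size_ = 1MB, the loop first tries 128KB (128KB - 60KB < 100KB,
  // too small), then 256KB (256KB - 60KB >= 100KB), allocates 256KB and stops.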

  // Flush only when buffered I/O
  if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) {
    if (buf_.CurrentSize() > 0) {
      s = Flush();
      if (!s.ok()) {
        return s;
      }
    }
    assert(buf_.CurrentSize() == 0);
  }

  // With direct I/O on we never write directly to disk here; otherwise the
  // buffer simply serves its original purpose of accumulating many small
  // chunks.
  if (use_direct_io() || (buf_.Capacity() >= left)) {
    while (left > 0) {
      size_t appended = buf_.Append(src, left);
      left -= appended;
      src += appended;

      if (left > 0) {
        s = Flush();
        if (!s.ok()) {
          break;
        }
      }
    }
  } else {
    // Writing directly to file bypassing the buffer
    assert(buf_.CurrentSize() == 0);
    s = WriteBuffered(src, left);
  }

  TEST_KILL_RANDOM("WritableFileWriter::Append:1", rocksdb_kill_odds);
  if (s.ok()) {
    filesize_ += data.size();
  }
  return s;
}
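
// Illustrative call sequence from a caller's point of view (a sketch only;
// how the writer is constructed is elided and the `record` variable is an
// assumption, not something defined in this file):
//
//   std::unique_ptr<WritableFileWriter> writer = ...;
//   Status s = writer->Append(Slice(record));
//   if (s.ok()) s = writer->Flush();
//   if (s.ok()) s = writer->Sync(/*use_fsync=*/false);
//   if (s.ok()) s = writer->Close();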

Status WritableFileWriter::Pad(const size_t pad_bytes) {
  assert(pad_bytes < kDefaultPageSize);
  size_t left = pad_bytes;
  size_t cap = buf_.Capacity() - buf_.CurrentSize();

  // Assume pad_bytes is small compared to buf_ capacity. So we always
  // go through buf_ rather than writing directly to the file, as Append()
  // does in certain cases.
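  // Illustrative example (the sizes are assumptions): Pad(512) with only 300
  // free bytes left in buf_ first pads 300 zero bytes, flushes, and then pads
  // the remaining 212 bytes into the now-empty buffer.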
  while (left) {
    size_t append_bytes = std::min(cap, left);
    buf_.PadWith(append_bytes, 0);
    left -= append_bytes;
    if (left > 0) {
      Status s = Flush();
      if (!s.ok()) {
        return s;
      }
    }
    cap = buf_.Capacity() - buf_.CurrentSize();
  }
  pending_sync_ = true;
  filesize_ += pad_bytes;
  return Status::OK();
}

Status WritableFileWriter::Close() {
  // Do not quit immediately on failure; the file MUST be closed.
  Status s;

  // It is possible to close the file twice now, as we MUST close it in the
  // destructor; simply flushing is not enough. On Windows, pre-allocation
  // does not fill with zeros, and with unbuffered access we also set the
  // end of data.
  if (!writable_file_) {
    return s;
  }

  s = Flush();  // flush cache to OS

  Status interim;
  // In direct I/O mode we write whole pages so
  // we need to let the file know where data ends.
  if (use_direct_io()) {
    interim = writable_file_->Truncate(filesize_, IOOptions(), nullptr);
    if (interim.ok()) {
      interim = writable_file_->Fsync(IOOptions(), nullptr);
    }
    if (!interim.ok() && s.ok()) {
      s = interim;
    }
  }

  TEST_KILL_RANDOM("WritableFileWriter::Close:0", rocksdb_kill_odds);
  interim = writable_file_->Close(IOOptions(), nullptr);
  if (!interim.ok() && s.ok()) {
    s = interim;
  }

  writable_file_.reset();
  TEST_KILL_RANDOM("WritableFileWriter::Close:1", rocksdb_kill_odds);

  return s;
}

// Write out the cached data to the OS cache, or to storage if direct I/O
// is enabled.
Status WritableFileWriter::Flush() {
  Status s;
  TEST_KILL_RANDOM("WritableFileWriter::Flush:0",
                   rocksdb_kill_odds * REDUCE_ODDS2);

  if (buf_.CurrentSize() > 0) {
    if (use_direct_io()) {
#ifndef ROCKSDB_LITE
      if (pending_sync_) {
        s = WriteDirect();
      }
#endif  // !ROCKSDB_LITE
    } else {
      s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize());
    }
    if (!s.ok()) {
      return s;
    }
  }

  s = writable_file_->Flush(IOOptions(), nullptr);

  if (!s.ok()) {
    return s;
  }

  // sync OS cache to disk for every bytes_per_sync_
  // TODO: give log file and sst file different options (log
  // files could be potentially cached in OS for their whole
  // life time, thus we might not want to flush at all).

  // We try to avoid syncing the last 1MB of data. For two reasons:
  // (1) avoid rewriting the same page that is modified later.
  // (2) on older versions of the OS, a write can block while writing out
  //     the page.
  // Xfs does neighbor page flushing outside of the specified ranges. We
  // need to make sure the sync range is far from the write offset.
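  // Illustrative example (the numbers are assumptions): with bytes_per_sync_
  // = 1MB, last_sync_size_ = 3MB and filesize_ = 5MB + 123KB, offset_sync_to
  // starts as 4MB + 123KB, gets rounded down to the 4KB boundary 4MB + 120KB,
  // and RangeSync() below then covers [3MB, 4MB + 120KB).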
  if (!use_direct_io() && bytes_per_sync_) {
    const uint64_t kBytesNotSyncRange =
        1024 * 1024;                                // recent 1MB is not synced.
    const uint64_t kBytesAlignWhenSync = 4 * 1024;  // Align 4KB.
    if (filesize_ > kBytesNotSyncRange) {
      uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange;
      offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;
      assert(offset_sync_to >= last_sync_size_);
      if (offset_sync_to > 0 &&
          offset_sync_to - last_sync_size_ >= bytes_per_sync_) {
        s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_);
        last_sync_size_ = offset_sync_to;
      }
    }
  }

  return s;
}

Status WritableFileWriter::Sync(bool use_fsync) {
  Status s = Flush();
  if (!s.ok()) {
    return s;
  }
  TEST_KILL_RANDOM("WritableFileWriter::Sync:0", rocksdb_kill_odds);
  if (!use_direct_io() && pending_sync_) {
    s = SyncInternal(use_fsync);
    if (!s.ok()) {
      return s;
    }
  }
  TEST_KILL_RANDOM("WritableFileWriter::Sync:1", rocksdb_kill_odds);
  pending_sync_ = false;
  return Status::OK();
}

Status WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
  if (!writable_file_->IsSyncThreadSafe()) {
    return Status::NotSupported(
        "Can't WritableFileWriter::SyncWithoutFlush() because "
        "WritableFile::IsSyncThreadSafe() is false");
  }
  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1");
  Status s = SyncInternal(use_fsync);
  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
  return s;
}

Status WritableFileWriter::SyncInternal(bool use_fsync) {
  Status s;
  IOSTATS_TIMER_GUARD(fsync_nanos);
  TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
  auto prev_perf_level = GetPerfLevel();
  IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_);
  if (use_fsync) {
    s = writable_file_->Fsync(IOOptions(), nullptr);
  } else {
    s = writable_file_->Sync(IOOptions(), nullptr);
  }
  SetPerfLevel(prev_perf_level);
  return s;
}
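// Note: whether Sync() maps to fdatasync() and Fsync() to fsync() is up to the
// underlying WritableFile implementation (the POSIX one typically makes that
// distinction); this file only chooses between the two calls.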

Status WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
  IOSTATS_TIMER_GUARD(range_sync_nanos);
  TEST_SYNC_POINT("WritableFileWriter::RangeSync:0");
  return writable_file_->RangeSync(offset, nbytes, IOOptions(), nullptr);
}

// This method writes the specified data to disk and makes use of the rate
// limiter, if available.
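// Illustrative behavior (the rate is an assumption): with a rate limiter
// configured around 1MB/s, a single multi-megabyte call may be split into
// several smaller writable_file_->Append() calls, each gated by
// RequestToken() granting only part of the remaining bytes.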
Status WritableFileWriter::WriteBuffered(const char* data, size_t size) {
  Status s;
  assert(!use_direct_io());
  const char* src = data;
  size_t left = size;

  while (left > 0) {
    size_t allowed;
    if (rate_limiter_ != nullptr) {
      allowed = rate_limiter_->RequestToken(
          left, 0 /* alignment */, writable_file_->GetIOPriority(), stats_,
          RateLimiter::OpType::kWrite);
    } else {
      allowed = left;
    }

    {
      IOSTATS_TIMER_GUARD(write_nanos);
      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");

#ifndef ROCKSDB_LITE
      FileOperationInfo::TimePoint start_ts;
      uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr);
      if (ShouldNotifyListeners()) {
        start_ts = std::chrono::system_clock::now();
        old_size = next_write_offset_;
      }
#endif
      {
        auto prev_perf_level = GetPerfLevel();
        IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_);
        s = writable_file_->Append(Slice(src, allowed), IOOptions(), nullptr);
        SetPerfLevel(prev_perf_level);
      }
#ifndef ROCKSDB_LITE
      if (ShouldNotifyListeners()) {
        auto finish_ts = std::chrono::system_clock::now();
        NotifyOnFileWriteFinish(old_size, allowed, start_ts, finish_ts, s);
      }
#endif
      if (!s.ok()) {
        return s;
      }
    }

    IOSTATS_ADD(bytes_written, allowed);
    TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0", rocksdb_kill_odds);

    left -= allowed;
    src += allowed;
  }
  buf_.Size(0);
  return s;
}

// This flushes the accumulated data in the buffer. We pad the data with zeros
// if necessary to fill out a whole page.
// During automatic flushes, however, padding would not be necessary.
// We always use the RateLimiter if available. Any buffer bytes left over
// beyond the last whole number of pages are moved (Refit) to the front of the
// buffer to be written again on the next flush, because we can only write at
// aligned offsets.
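// Illustrative example (the alignment is an assumption): with a 4KB alignment
// and buf_.CurrentSize() = 10KB, file_advance is 8KB and leftover_tail is 2KB;
// the last partial page is zero-padded for this write and the 2KB tail is then
// refit to the start of the buffer so it is rewritten on the next flush.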
#ifndef ROCKSDB_LITE
Status WritableFileWriter::WriteDirect() {
  assert(use_direct_io());
  Status s;
  const size_t alignment = buf_.Alignment();
  assert((next_write_offset_ % alignment) == 0);

  // Calculate the whole-page final file advance if all writes succeed
  size_t file_advance = TruncateToPageBoundary(alignment, buf_.CurrentSize());

  // Calculate the leftover tail. We write it here padded with zeros BUT we
  // will write it again in the future, either on Close() OR when the current
  // whole page fills out.
  size_t leftover_tail = buf_.CurrentSize() - file_advance;

  // Round up and pad
  buf_.PadToAlignmentWith(0);

  const char* src = buf_.BufferStart();
  uint64_t write_offset = next_write_offset_;
  size_t left = buf_.CurrentSize();

  while (left > 0) {
    // Check how much is allowed
    size_t size;
    if (rate_limiter_ != nullptr) {
      size = rate_limiter_->RequestToken(left, buf_.Alignment(),
                                         writable_file_->GetIOPriority(),
                                         stats_, RateLimiter::OpType::kWrite);
    } else {
      size = left;
    }

    {
      IOSTATS_TIMER_GUARD(write_nanos);
      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
      FileOperationInfo::TimePoint start_ts;
      if (ShouldNotifyListeners()) {
        start_ts = std::chrono::system_clock::now();
      }
      // direct writes must be positional
      s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
                                           IOOptions(), nullptr);
      if (ShouldNotifyListeners()) {
        auto finish_ts = std::chrono::system_clock::now();
        NotifyOnFileWriteFinish(write_offset, size, start_ts, finish_ts, s);
      }
      if (!s.ok()) {
        buf_.Size(file_advance + leftover_tail);
        return s;
      }
    }

    IOSTATS_ADD(bytes_written, size);
    left -= size;
    src += size;
    write_offset += size;
    assert((next_write_offset_ % alignment) == 0);
  }

  if (s.ok()) {
    // Move the tail to the beginning of the buffer.
    // This never happens during normal Append but rather during an
    // explicit call to Flush()/Sync() or Close().
    buf_.RefitTail(file_advance, leftover_tail);
    // This is where we start writing next time, which may or may not be
    // the actual file size on disk. They match if the buffer size
    // is a multiple of whole pages; otherwise filesize_ is leftover_tail
    // behind.
    next_write_offset_ += file_advance;
  }
  return s;
}
#endif  // !ROCKSDB_LITE
}  // namespace rocksdb