// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/writable_file_writer.h"

#include <algorithm>
#include <mutex>

#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter.h"

namespace rocksdb {
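// Appends the given data to the file. Data is accumulated in buf_ whenever
// possible; the buffer is grown (up to max_buffer_size_) or flushed when it
// cannot hold the incoming chunk. With buffered I/O, chunks larger than the
// buffer capacity bypass the buffer via WriteBuffered().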
Status WritableFileWriter::Append(const Slice& data) {
  const char* src = data.data();
  size_t left = data.size();
  Status s;
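  // Mark that the file now contains data that has not yet been synced.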
  pending_sync_ = true;

  TEST_KILL_RANDOM("WritableFileWriter::Append:0",
                   rocksdb_kill_odds * REDUCE_ODDS2);

  {
    IOSTATS_TIMER_GUARD(prepare_write_nanos);
    TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
    writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left,
                                 IOOptions(), nullptr);
  }

  // See whether we need to enlarge the buffer to avoid a flush
  if (buf_.Capacity() - buf_.CurrentSize() < left) {
    for (size_t cap = buf_.Capacity();
         cap < max_buffer_size_;  // There is still room to increase
         cap *= 2) {
      // See whether the next available size is large enough.
      // Buffer will never be increased to more than max_buffer_size_.
      size_t desired_capacity = std::min(cap * 2, max_buffer_size_);
      if (desired_capacity - buf_.CurrentSize() >= left ||
          (use_direct_io() && desired_capacity == max_buffer_size_)) {
        buf_.AllocateNewBuffer(desired_capacity, true);
        break;
      }
    }
  }

  // Flush only when using buffered I/O
  if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) {
    if (buf_.CurrentSize() > 0) {
      s = Flush();
      if (!s.ok()) {
        return s;
      }
    }
    assert(buf_.CurrentSize() == 0);
  }

  // With direct I/O on we never write directly to disk; otherwise we simply
  // use the buffer for its original purpose: to accumulate many small
  // chunks.
  if (use_direct_io() || (buf_.Capacity() >= left)) {
    while (left > 0) {
      size_t appended = buf_.Append(src, left);
      left -= appended;
      src += appended;

      if (left > 0) {
        s = Flush();
        if (!s.ok()) {
          break;
        }
      }
    }
  } else {
    // Writing directly to the file, bypassing the buffer
    assert(buf_.CurrentSize() == 0);
    s = WriteBuffered(src, left);
  }

  TEST_KILL_RANDOM("WritableFileWriter::Append:1", rocksdb_kill_odds);
  if (s.ok()) {
    filesize_ += data.size();
  }
  return s;
}

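// Appends pad_bytes zero bytes to the file, e.g. to round a block out to a
// fixed size. The padding goes through buf_ and counts toward filesize_.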
Status WritableFileWriter::Pad(const size_t pad_bytes) {
  assert(pad_bytes < kDefaultPageSize);
  size_t left = pad_bytes;
  size_t cap = buf_.Capacity() - buf_.CurrentSize();

  // Assume pad_bytes is small compared to buf_'s capacity, so we always go
  // through buf_ rather than writing directly to the file the way Append()
  // does in certain cases.
  while (left) {
    size_t append_bytes = std::min(cap, left);
    buf_.PadWith(append_bytes, 0);
    left -= append_bytes;
    if (left > 0) {
      Status s = Flush();
      if (!s.ok()) {
        return s;
      }
    }
    cap = buf_.Capacity() - buf_.CurrentSize();
  }
  pending_sync_ = true;
  filesize_ += pad_bytes;
  return Status::OK();
}

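// Flushes any buffered data, truncates and syncs the file as needed, and
// closes the underlying file. The first error encountered is returned, but
// closing proceeds regardless so the file handle is always released.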
Status WritableFileWriter::Close() {
  // Do not quit immediately on failure; the file MUST be closed.
  Status s;

  // It is now possible to close the file twice, since we MUST close it in
  // the destructor and simply flushing is not enough: on Windows,
  // pre-allocation does not fill the file with zeros, and with unbuffered
  // access we also need to set the end of data.
  if (!writable_file_) {
    return s;
  }

  s = Flush();  // flush cache to OS

  Status interim;
  // In direct I/O mode we write whole pages so
  // we need to let the file know where data ends.
  if (use_direct_io()) {
    interim = writable_file_->Truncate(filesize_, IOOptions(), nullptr);
    if (interim.ok()) {
      interim = writable_file_->Fsync(IOOptions(), nullptr);
    }
    if (!interim.ok() && s.ok()) {
      s = interim;
    }
  }

  TEST_KILL_RANDOM("WritableFileWriter::Close:0", rocksdb_kill_odds);
  interim = writable_file_->Close(IOOptions(), nullptr);
  if (!interim.ok() && s.ok()) {
    s = interim;
  }

  writable_file_.reset();
  TEST_KILL_RANDOM("WritableFileWriter::Close:1", rocksdb_kill_odds);

  return s;
}

// Write out the cached data to the OS cache, or to storage if direct I/O
// is enabled.
Status WritableFileWriter::Flush() {
  Status s;
  TEST_KILL_RANDOM("WritableFileWriter::Flush:0",
                   rocksdb_kill_odds * REDUCE_ODDS2);

  if (buf_.CurrentSize() > 0) {
    if (use_direct_io()) {
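      // WriteDirect() is compiled out in ROCKSDB_LITE builds (see below), in
      // which case the buffered data simply stays in buf_.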
#ifndef ROCKSDB_LITE
      if (pending_sync_) {
        s = WriteDirect();
      }
#endif  // !ROCKSDB_LITE
    } else {
      s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize());
    }
    if (!s.ok()) {
      return s;
    }
  }

  s = writable_file_->Flush(IOOptions(), nullptr);

  if (!s.ok()) {
    return s;
  }

  // Sync the OS cache to disk for every bytes_per_sync_.
  // TODO: give log files and SST files different options (log
  // files could potentially be cached in the OS for their whole
  // lifetime, thus we might not want to flush at all).

  // We try to avoid syncing the last 1MB of data, for two reasons:
  // (1) to avoid rewriting the same page that is modified later;
  // (2) on older OS versions, a write can block while writing out
  //     the page.
  // XFS does neighbor-page flushing outside of the specified ranges, so we
  // need to make sure the sync range is far from the write offset.
  if (!use_direct_io() && bytes_per_sync_) {
    const uint64_t kBytesNotSyncRange =
        1024 * 1024;                                // recent 1MB is not synced.
    const uint64_t kBytesAlignWhenSync = 4 * 1024;  // Align 4KB.
    if (filesize_ > kBytesNotSyncRange) {
      uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange;
      offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;
      assert(offset_sync_to >= last_sync_size_);
      if (offset_sync_to > 0 &&
          offset_sync_to - last_sync_size_ >= bytes_per_sync_) {
        s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_);
        last_sync_size_ = offset_sync_to;
      }
    }
  }

  return s;
}

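// Flushes buffered data and then syncs the file to stable storage. The
// SyncInternal() call is only made for buffered I/O with unsynced data;
// with direct I/O, Flush() has already written the data through.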
Status WritableFileWriter::Sync(bool use_fsync) {
  Status s = Flush();
  if (!s.ok()) {
    return s;
  }
  TEST_KILL_RANDOM("WritableFileWriter::Sync:0", rocksdb_kill_odds);
  if (!use_direct_io() && pending_sync_) {
    s = SyncInternal(use_fsync);
    if (!s.ok()) {
      return s;
    }
  }
  TEST_KILL_RANDOM("WritableFileWriter::Sync:1", rocksdb_kill_odds);
  pending_sync_ = false;
  return Status::OK();
}

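// Syncs the file without flushing buffered data first. Because this may run
// concurrently with writers, it requires the underlying file to report
// IsSyncThreadSafe() == true.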
Status WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
  if (!writable_file_->IsSyncThreadSafe()) {
    return Status::NotSupported(
        "Can't WritableFileWriter::SyncWithoutFlush() because "
        "WritableFile::IsSyncThreadSafe() is false");
  }
  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1");
  Status s = SyncInternal(use_fsync);
  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
  return s;
}

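// Issues the actual Fsync() or Sync() call on the underlying file and
// records the time spent under fsync_nanos.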
Status WritableFileWriter::SyncInternal(bool use_fsync) {
  Status s;
  IOSTATS_TIMER_GUARD(fsync_nanos);
  TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
  auto prev_perf_level = GetPerfLevel();
  IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_);
  if (use_fsync) {
    s = writable_file_->Fsync(IOOptions(), nullptr);
  } else {
    s = writable_file_->Sync(IOOptions(), nullptr);
  }
  SetPerfLevel(prev_perf_level);
  return s;
}

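// Syncs the given byte range of the file to disk; used to spread sync work
// out as the file grows (see the bytes_per_sync_ logic in Flush()).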
Status WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
  IOSTATS_TIMER_GUARD(range_sync_nanos);
  TEST_SYNC_POINT("WritableFileWriter::RangeSync:0");
  return writable_file_->RangeSync(offset, nbytes, IOOptions(), nullptr);
}

// This method writes the specified data to disk and makes use of the rate
// limiter if one is available.
Status WritableFileWriter::WriteBuffered(const char* data, size_t size) {
  Status s;
  assert(!use_direct_io());
  const char* src = data;
  size_t left = size;

  while (left > 0) {
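    // Ask the rate limiter how many bytes we may write in this round;
    // without a rate limiter, write everything that is left.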
    size_t allowed;
    if (rate_limiter_ != nullptr) {
      allowed = rate_limiter_->RequestToken(
          left, 0 /* alignment */, writable_file_->GetIOPriority(), stats_,
          RateLimiter::OpType::kWrite);
    } else {
      allowed = left;
    }

    {
      IOSTATS_TIMER_GUARD(write_nanos);
      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");

#ifndef ROCKSDB_LITE
      FileOperationInfo::TimePoint start_ts;
      uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr);
      if (ShouldNotifyListeners()) {
        start_ts = std::chrono::system_clock::now();
        old_size = next_write_offset_;
      }
#endif
      {
        auto prev_perf_level = GetPerfLevel();
        IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_);
        s = writable_file_->Append(Slice(src, allowed), IOOptions(), nullptr);
        SetPerfLevel(prev_perf_level);
      }
#ifndef ROCKSDB_LITE
      if (ShouldNotifyListeners()) {
        auto finish_ts = std::chrono::system_clock::now();
        NotifyOnFileWriteFinish(old_size, allowed, start_ts, finish_ts, s);
      }
#endif
      if (!s.ok()) {
        return s;
      }
    }

    IOSTATS_ADD(bytes_written, allowed);
    TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0", rocksdb_kill_odds);

    left -= allowed;
    src += allowed;
  }
  buf_.Size(0);
  return s;
}

// This flushes the accumulated data in the buffer. We pad the data with
// zeros to a whole page if necessary; during automatic flushes padding is
// not needed. We always use the RateLimiter if available. Because we can
// only write at aligned offsets, any buffer bytes left over beyond a whole
// number of pages are moved (refit) to the front of the buffer so they are
// written again on the next flush.
#ifndef ROCKSDB_LITE
Status WritableFileWriter::WriteDirect() {
  assert(use_direct_io());
  Status s;
  const size_t alignment = buf_.Alignment();
  assert((next_write_offset_ % alignment) == 0);

  // Calculate the whole-page final file advance if all writes succeed
  size_t file_advance = TruncateToPageBoundary(alignment, buf_.CurrentSize());

  // Calculate the leftover tail. We write it here padded with zeros, BUT we
  // will write it again in the future, either on Close() or when the
  // current whole page fills out.
  size_t leftover_tail = buf_.CurrentSize() - file_advance;

  // Round up and pad
  buf_.PadToAlignmentWith(0);

  const char* src = buf_.BufferStart();
  uint64_t write_offset = next_write_offset_;
  size_t left = buf_.CurrentSize();

  while (left > 0) {
    // Check how much is allowed
    size_t size;
    if (rate_limiter_ != nullptr) {
      size = rate_limiter_->RequestToken(left, buf_.Alignment(),
                                         writable_file_->GetIOPriority(),
                                         stats_, RateLimiter::OpType::kWrite);
    } else {
      size = left;
    }

    {
      IOSTATS_TIMER_GUARD(write_nanos);
      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
      FileOperationInfo::TimePoint start_ts;
      if (ShouldNotifyListeners()) {
        start_ts = std::chrono::system_clock::now();
      }
      // direct writes must be positional
      s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
                                           IOOptions(), nullptr);
      if (ShouldNotifyListeners()) {
        auto finish_ts = std::chrono::system_clock::now();
        NotifyOnFileWriteFinish(write_offset, size, start_ts, finish_ts, s);
      }
      if (!s.ok()) {
        buf_.Size(file_advance + leftover_tail);
        return s;
      }
    }

    IOSTATS_ADD(bytes_written, size);
    left -= size;
    src += size;
    write_offset += size;
    assert((next_write_offset_ % alignment) == 0);
  }

  if (s.ok()) {
    // Move the tail to the beginning of the buffer.
    // This never happens during normal Append but rather during an
    // explicit call to Flush()/Sync() or Close().
    buf_.RefitTail(file_advance, leftover_tail);
    // This is where we start writing next time, which may or may not be
    // the actual file size on disk. They match if the buffer size is a
    // multiple of whole pages; otherwise filesize_ is leftover_tail
    // behind.
    next_write_offset_ += file_advance;
  }
  return s;
}
#endif  // !ROCKSDB_LITE
}  // namespace rocksdb