1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/version_set.h"
11
12 #include <algorithm>
13 #include <array>
14 #include <cinttypes>
15 #include <cstdio>
16 #include <list>
17 #include <map>
18 #include <set>
19 #include <string>
20 #include <unordered_map>
21 #include <vector>
22
23 #include "compaction/compaction.h"
24 #include "db/blob/blob_fetcher.h"
25 #include "db/blob/blob_file_cache.h"
26 #include "db/blob/blob_file_reader.h"
27 #include "db/blob/blob_index.h"
28 #include "db/blob/blob_log_format.h"
29 #include "db/internal_stats.h"
30 #include "db/log_reader.h"
31 #include "db/log_writer.h"
32 #include "db/memtable.h"
33 #include "db/merge_context.h"
34 #include "db/merge_helper.h"
35 #include "db/pinned_iterators_manager.h"
36 #include "db/table_cache.h"
37 #include "db/version_builder.h"
38 #include "db/version_edit_handler.h"
39 #include "file/filename.h"
40 #include "file/random_access_file_reader.h"
41 #include "file/read_write_util.h"
42 #include "file/writable_file_writer.h"
43 #include "logging/logging.h"
44 #include "monitoring/file_read_sample.h"
45 #include "monitoring/perf_context_imp.h"
46 #include "monitoring/persistent_stats_history.h"
47 #include "options/options_helper.h"
48 #include "rocksdb/env.h"
49 #include "rocksdb/merge_operator.h"
50 #include "rocksdb/write_buffer_manager.h"
51 #include "table/format.h"
52 #include "table/get_context.h"
53 #include "table/internal_iterator.h"
54 #include "table/merging_iterator.h"
55 #include "table/meta_blocks.h"
56 #include "table/multiget_context.h"
57 #include "table/plain/plain_table_factory.h"
58 #include "table/table_reader.h"
59 #include "table/two_level_iterator.h"
60 #include "test_util/sync_point.h"
61 #include "util/cast_util.h"
62 #include "util/coding.h"
63 #include "util/stop_watch.h"
64 #include "util/string_util.h"
65 #include "util/user_comparator_wrapper.h"
66
67 namespace ROCKSDB_NAMESPACE {
68
69 namespace {
70
71 // Find File in LevelFilesBrief data structure
72 // Within an index range defined by left and right
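// Returns the index of the earliest file in [left, right) whose largest key
// is >= `key` (i.e. the first file that could contain `key`), or `right` if
// no such file exists. Illustrative sketch with hypothetical files: given
// (smallest, largest) ranges F0=[a, c], F1=[e, g], F2=[k, m], searching for
// "f" returns index 1 and searching for "z" returns 3.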
73 int FindFileInRange(const InternalKeyComparator& icmp,
74 const LevelFilesBrief& file_level,
75 const Slice& key,
76 uint32_t left,
77 uint32_t right) {
78 auto cmp = [&](const FdWithKeyRange& f, const Slice& k) -> bool {
79 return icmp.InternalKeyComparator::Compare(f.largest_key, k) < 0;
80 };
81 const auto &b = file_level.files;
82 return static_cast<int>(std::lower_bound(b + left,
83 b + right, key, cmp) - b);
84 }
85
86 Status OverlapWithIterator(const Comparator* ucmp,
87 const Slice& smallest_user_key,
88 const Slice& largest_user_key,
89 InternalIterator* iter,
90 bool* overlap) {
91 InternalKey range_start(smallest_user_key, kMaxSequenceNumber,
92 kValueTypeForSeek);
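  // Seeking to (smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek)
  // positions the iterator at the first internal key whose user key is
  // >= smallest_user_key, because internal keys sort by ascending user key
  // and then by descending sequence number.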
93 iter->Seek(range_start.Encode());
94 if (!iter->status().ok()) {
95 return iter->status();
96 }
97
98 *overlap = false;
99 if (iter->Valid()) {
100 ParsedInternalKey seek_result;
101 Status s = ParseInternalKey(iter->key(), &seek_result,
102 false /* log_err_key */); // TODO
103 if (!s.ok()) return s;
104
105 if (ucmp->CompareWithoutTimestamp(seek_result.user_key, largest_user_key) <=
106 0) {
107 *overlap = true;
108 }
109 }
110
111 return iter->status();
112 }
113
114 // Class to help choose the next file to search for the particular key.
115 // Searches and returns files level by level.
116 // We can search level-by-level since entries never hop across
117 // levels. Therefore we are guaranteed that if we find data
118 // in a smaller level, later levels are irrelevant (unless we
119 // are MergeInProgress).
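// For levels above L0 the per-level search range is narrowed with
// fractional cascading: FileIndexer::GetNextLevelIndex() translates the
// position of the key relative to the current file into a [left, right]
// index range for the next level, so only a sub-range of the next level
// needs to be binary searched.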
120 class FilePicker {
121 public:
122   FilePicker(std::vector<FileMetaData*>* files, const Slice& user_key,
123 const Slice& ikey, autovector<LevelFilesBrief>* file_levels,
124 unsigned int num_levels, FileIndexer* file_indexer,
125 const Comparator* user_comparator,
126 const InternalKeyComparator* internal_comparator)
127 : num_levels_(num_levels),
128 curr_level_(static_cast<unsigned int>(-1)),
129 returned_file_level_(static_cast<unsigned int>(-1)),
130 hit_file_level_(static_cast<unsigned int>(-1)),
131 search_left_bound_(0),
132 search_right_bound_(FileIndexer::kLevelMaxIndex),
133 #ifndef NDEBUG
134 files_(files),
135 #endif
136 level_files_brief_(file_levels),
137 is_hit_file_last_in_level_(false),
138 curr_file_level_(nullptr),
139 user_key_(user_key),
140 ikey_(ikey),
141 file_indexer_(file_indexer),
142 user_comparator_(user_comparator),
143 internal_comparator_(internal_comparator) {
144 #ifdef NDEBUG
145 (void)files;
146 #endif
147 // Setup member variables to search first level.
148 search_ended_ = !PrepareNextLevel();
149 if (!search_ended_) {
150 // Prefetch Level 0 table data to avoid cache miss if possible.
151 for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
152 auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
153 if (r) {
154 r->Prepare(ikey);
155 }
156 }
157 }
158 }
159
160   int GetCurrentLevel() const { return curr_level_; }
161
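  // Returns the next candidate file for the key, or nullptr once all levels
  // have been searched. A minimal usage sketch (hypothetical caller code):
  //
  //   FilePicker fp(...);
  //   for (FdWithKeyRange* f = fp.GetNextFile(); f != nullptr;
  //        f = fp.GetNextFile()) {
  //     // probe the table referenced by f->fd for the key
  //   }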
162   FdWithKeyRange* GetNextFile() {
163 while (!search_ended_) { // Loops over different levels.
164 while (curr_index_in_curr_level_ < curr_file_level_->num_files) {
165 // Loops over all files in current level.
166 FdWithKeyRange* f = &curr_file_level_->files[curr_index_in_curr_level_];
167 hit_file_level_ = curr_level_;
168 is_hit_file_last_in_level_ =
169 curr_index_in_curr_level_ == curr_file_level_->num_files - 1;
170 int cmp_largest = -1;
171
172         // Do key range filtering of files and/or fractional cascading if:
173         // (1) not all the files are in level 0, or
174         // (2) there are more than 3 current level files
175         // If there are only 3 or fewer current level files in the system, we skip
176 // the key range filtering. In this case, more likely, the system is
177 // highly tuned to minimize number of tables queried by each query,
178 // so it is unlikely that key range filtering is more efficient than
179 // querying the files.
180 if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
181 // Check if key is within a file's range. If search left bound and
182           // right bound point to the same file, we are sure the key falls in
183 // range.
184 assert(curr_level_ == 0 ||
185 curr_index_in_curr_level_ == start_index_in_curr_level_ ||
186 user_comparator_->CompareWithoutTimestamp(
187 user_key_, ExtractUserKey(f->smallest_key)) <= 0);
188
189 int cmp_smallest = user_comparator_->CompareWithoutTimestamp(
190 user_key_, ExtractUserKey(f->smallest_key));
191 if (cmp_smallest >= 0) {
192 cmp_largest = user_comparator_->CompareWithoutTimestamp(
193 user_key_, ExtractUserKey(f->largest_key));
194 }
195
196 // Setup file search bound for the next level based on the
197 // comparison results
198 if (curr_level_ > 0) {
199 file_indexer_->GetNextLevelIndex(curr_level_,
200 curr_index_in_curr_level_,
201 cmp_smallest, cmp_largest,
202 &search_left_bound_,
203 &search_right_bound_);
204 }
205 // Key falls out of current file's range
206 if (cmp_smallest < 0 || cmp_largest > 0) {
207 if (curr_level_ == 0) {
208 ++curr_index_in_curr_level_;
209 continue;
210 } else {
211 // Search next level.
212 break;
213 }
214 }
215 }
216 #ifndef NDEBUG
217 // Sanity check to make sure that the files are correctly sorted
218 if (prev_file_) {
219 if (curr_level_ != 0) {
220 int comp_sign = internal_comparator_->Compare(
221 prev_file_->largest_key, f->smallest_key);
222 assert(comp_sign < 0);
223 } else {
224 // level == 0, the current file cannot be newer than the previous
225             // one. The compressed data structure has no seqNo attribute, so use files_ for the check.
226 assert(curr_index_in_curr_level_ > 0);
227 assert(!NewestFirstBySeqNo(files_[0][curr_index_in_curr_level_],
228 files_[0][curr_index_in_curr_level_-1]));
229 }
230 }
231 prev_file_ = f;
232 #endif
233 returned_file_level_ = curr_level_;
234 if (curr_level_ > 0 && cmp_largest < 0) {
235 // No more files to search in this level.
236 search_ended_ = !PrepareNextLevel();
237 } else {
238 ++curr_index_in_curr_level_;
239 }
240 return f;
241 }
242 // Start searching next level.
243 search_ended_ = !PrepareNextLevel();
244 }
245 // Search ended.
246 return nullptr;
247 }
248
249 // getter for current file level
250 // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
251   unsigned int GetHitFileLevel() { return hit_file_level_; }
252
253 // Returns true if the most recent "hit file" (i.e., one returned by
254 // GetNextFile()) is at the last index in its level.
255   bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }
256
257 private:
258 unsigned int num_levels_;
259 unsigned int curr_level_;
260 unsigned int returned_file_level_;
261 unsigned int hit_file_level_;
262 int32_t search_left_bound_;
263 int32_t search_right_bound_;
264 #ifndef NDEBUG
265 std::vector<FileMetaData*>* files_;
266 #endif
267 autovector<LevelFilesBrief>* level_files_brief_;
268 bool search_ended_;
269 bool is_hit_file_last_in_level_;
270 LevelFilesBrief* curr_file_level_;
271 unsigned int curr_index_in_curr_level_;
272 unsigned int start_index_in_curr_level_;
273 Slice user_key_;
274 Slice ikey_;
275 FileIndexer* file_indexer_;
276 const Comparator* user_comparator_;
277 const InternalKeyComparator* internal_comparator_;
278 #ifndef NDEBUG
279 FdWithKeyRange* prev_file_;
280 #endif
281
282 // Setup local variables to search next level.
283 // Returns false if there are no more levels to search.
284   bool PrepareNextLevel() {
285 curr_level_++;
286 while (curr_level_ < num_levels_) {
287 curr_file_level_ = &(*level_files_brief_)[curr_level_];
288 if (curr_file_level_->num_files == 0) {
289 // When current level is empty, the search bound generated from upper
290 // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
291 // also empty.
292 assert(search_left_bound_ == 0);
293 assert(search_right_bound_ == -1 ||
294 search_right_bound_ == FileIndexer::kLevelMaxIndex);
295 // Since current level is empty, it will need to search all files in
296 // the next level
297 search_left_bound_ = 0;
298 search_right_bound_ = FileIndexer::kLevelMaxIndex;
299 curr_level_++;
300 continue;
301 }
302
303 // Some files may overlap each other. We find
304 // all files that overlap user_key and process them in order from
305 // newest to oldest. In the context of merge-operator, this can occur at
306 // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
307 // are always compacted into a single entry).
308 int32_t start_index;
309 if (curr_level_ == 0) {
310 // On Level-0, we read through all files to check for overlap.
311 start_index = 0;
312 } else {
313 // On Level-n (n>=1), files are sorted. Binary search to find the
314 // earliest file whose largest key >= ikey. Search left bound and
315 // right bound are used to narrow the range.
316 if (search_left_bound_ <= search_right_bound_) {
317 if (search_right_bound_ == FileIndexer::kLevelMaxIndex) {
318 search_right_bound_ =
319 static_cast<int32_t>(curr_file_level_->num_files) - 1;
320 }
321 // `search_right_bound_` is an inclusive upper-bound, but since it was
322 // determined based on user key, it is still possible the lookup key
323 // falls to the right of `search_right_bound_`'s corresponding file.
324 // So, pass a limit one higher, which allows us to detect this case.
325 start_index =
326 FindFileInRange(*internal_comparator_, *curr_file_level_, ikey_,
327 static_cast<uint32_t>(search_left_bound_),
328 static_cast<uint32_t>(search_right_bound_) + 1);
329 if (start_index == search_right_bound_ + 1) {
330 // `ikey_` comes after `search_right_bound_`. The lookup key does
331 // not exist on this level, so let's skip this level and do a full
332 // binary search on the next level.
333 search_left_bound_ = 0;
334 search_right_bound_ = FileIndexer::kLevelMaxIndex;
335 curr_level_++;
336 continue;
337 }
338 } else {
339 // search_left_bound > search_right_bound, key does not exist in
340 // this level. Since no comparison is done in this level, it will
341 // need to search all files in the next level.
342 search_left_bound_ = 0;
343 search_right_bound_ = FileIndexer::kLevelMaxIndex;
344 curr_level_++;
345 continue;
346 }
347 }
348 start_index_in_curr_level_ = start_index;
349 curr_index_in_curr_level_ = start_index;
350 #ifndef NDEBUG
351 prev_file_ = nullptr;
352 #endif
353 return true;
354 }
355 // curr_level_ = num_levels_. So, no more levels to search.
356 return false;
357 }
358 };
359
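// FilePickerMultiGet is the batched counterpart of FilePicker used by
// MultiGet. It walks the levels once for a whole MultiGetRange, keeping
// per-key search state (current file index and fractional cascading bounds)
// in a FilePickerContext so each key can advance through files independently
// within the shared level-by-level traversal.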
360 class FilePickerMultiGet {
361 private:
362 struct FilePickerContext;
363
364 public:
365   FilePickerMultiGet(MultiGetRange* range,
366 autovector<LevelFilesBrief>* file_levels,
367 unsigned int num_levels, FileIndexer* file_indexer,
368 const Comparator* user_comparator,
369 const InternalKeyComparator* internal_comparator)
370 : num_levels_(num_levels),
371 curr_level_(static_cast<unsigned int>(-1)),
372 returned_file_level_(static_cast<unsigned int>(-1)),
373 hit_file_level_(static_cast<unsigned int>(-1)),
374 range_(range),
375 batch_iter_(range->begin()),
376 batch_iter_prev_(range->begin()),
377 upper_key_(range->begin()),
378 maybe_repeat_key_(false),
379 current_level_range_(*range, range->begin(), range->end()),
380 current_file_range_(*range, range->begin(), range->end()),
381 level_files_brief_(file_levels),
382 is_hit_file_last_in_level_(false),
383 curr_file_level_(nullptr),
384 file_indexer_(file_indexer),
385 user_comparator_(user_comparator),
386 internal_comparator_(internal_comparator) {
387 for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
388 fp_ctx_array_[iter.index()] =
389 FilePickerContext(0, FileIndexer::kLevelMaxIndex);
390 }
391
392 // Setup member variables to search first level.
393 search_ended_ = !PrepareNextLevel();
394 if (!search_ended_) {
395 // REVISIT
396 // Prefetch Level 0 table data to avoid cache miss if possible.
397 // As of now, only PlainTableReader and CuckooTableReader do any
398 // prefetching. This may not be necessary anymore once we implement
399 // batching in those table readers
400 for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
401 auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
402 if (r) {
403 for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
404 r->Prepare(iter->ikey);
405 }
406 }
407 }
408 }
409 }
410
411   int GetCurrentLevel() const { return curr_level_; }
412
413 // Iterates through files in the current level until it finds a file that
414 // contains at least one key from the MultiGet batch
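  // Returns true and sets *fd and *file_index when such a file is found;
  // *is_last_key_in_file is set when the largest batch key overlapping the
  // file equals the file's largest key, in which case the same key may need
  // to be looked up again in the next file. Returns false when the level is
  // exhausted for all remaining keys.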
415   bool GetNextFileInLevelWithKeys(MultiGetRange* next_file_range,
416 size_t* file_index, FdWithKeyRange** fd,
417 bool* is_last_key_in_file) {
418 size_t curr_file_index = *file_index;
419 FdWithKeyRange* f = nullptr;
420 bool file_hit = false;
421 int cmp_largest = -1;
422 if (curr_file_index >= curr_file_level_->num_files) {
423 // In the unlikely case the next key is a duplicate of the current key,
424 // and the current key is the last in the level and the internal key
425 // was not found, we need to skip lookup for the remaining keys and
426 // reset the search bounds
427 if (batch_iter_ != current_level_range_.end()) {
428 ++batch_iter_;
429 for (; batch_iter_ != current_level_range_.end(); ++batch_iter_) {
430 struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
431 fp_ctx.search_left_bound = 0;
432 fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
433 }
434 }
435 return false;
436 }
437 // Loops over keys in the MultiGet batch until it finds a file with
438     // at least one of the keys. Then it keeps moving forward until the
439 // last key in the batch that falls in that file
440 while (batch_iter_ != current_level_range_.end() &&
441 (fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level ==
442 curr_file_index ||
443 !file_hit)) {
444 struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
445 f = &curr_file_level_->files[fp_ctx.curr_index_in_curr_level];
446 Slice& user_key = batch_iter_->ukey_without_ts;
447
448       // Do key range filtering of files and/or fractional cascading if:
449       // (1) not all the files are in level 0, or
450       // (2) there are more than 3 current level files
451       // If there are only 3 or fewer current level files in the system, we
452 // skip the key range filtering. In this case, more likely, the system
453 // is highly tuned to minimize number of tables queried by each query,
454 // so it is unlikely that key range filtering is more efficient than
455 // querying the files.
456 if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
457 // Check if key is within a file's range. If search left bound and
458         // right bound point to the same file, we are sure the key falls in
459 // range.
460 int cmp_smallest = user_comparator_->CompareWithoutTimestamp(
461 user_key, false, ExtractUserKey(f->smallest_key), true);
462
463 assert(curr_level_ == 0 ||
464 fp_ctx.curr_index_in_curr_level ==
465 fp_ctx.start_index_in_curr_level ||
466 cmp_smallest <= 0);
467
468 if (cmp_smallest >= 0) {
469 cmp_largest = user_comparator_->CompareWithoutTimestamp(
470 user_key, false, ExtractUserKey(f->largest_key), true);
471 } else {
472 cmp_largest = -1;
473 }
474
475 // Setup file search bound for the next level based on the
476 // comparison results
477 if (curr_level_ > 0) {
478 file_indexer_->GetNextLevelIndex(
479 curr_level_, fp_ctx.curr_index_in_curr_level, cmp_smallest,
480 cmp_largest, &fp_ctx.search_left_bound,
481 &fp_ctx.search_right_bound);
482 }
483 // Key falls out of current file's range
484 if (cmp_smallest < 0 || cmp_largest > 0) {
485 next_file_range->SkipKey(batch_iter_);
486 } else {
487 file_hit = true;
488 }
489 } else {
490 file_hit = true;
491 }
492 if (cmp_largest == 0) {
493 // cmp_largest is 0, which means the next key will not be in this
494         // file, so stop looking further. However, it's possible there are
495 // duplicates in the batch, so find the upper bound for the batch
496 // in this file (upper_key_) by skipping past the duplicates. We
497 // leave batch_iter_ as is since we may have to pick up from there
498 // for the next file, if this file has a merge value rather than
499 // final value
500 upper_key_ = batch_iter_;
501 ++upper_key_;
502 while (upper_key_ != current_level_range_.end() &&
503 user_comparator_->CompareWithoutTimestamp(
504 batch_iter_->ukey_without_ts, false,
505 upper_key_->ukey_without_ts, false) == 0) {
506 ++upper_key_;
507 }
508 break;
509 } else {
510 if (curr_level_ == 0) {
511 // We need to look through all files in level 0
512 ++fp_ctx.curr_index_in_curr_level;
513 }
514 ++batch_iter_;
515 }
516 if (!file_hit) {
517 curr_file_index =
518 (batch_iter_ != current_level_range_.end())
519 ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
520 : curr_file_level_->num_files;
521 }
522 }
523
524 *fd = f;
525 *file_index = curr_file_index;
526 *is_last_key_in_file = cmp_largest == 0;
527 if (!*is_last_key_in_file) {
528 // If the largest key in the batch overlapping the file is not the
529       // largest key in the file, upper_key_ would not have been updated, so
530 // update it here
531 upper_key_ = batch_iter_;
532 }
533 return file_hit;
534 }
535
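  // Returns the next file that overlaps at least one not-yet-finished key in
  // the batch, or nullptr when all levels are exhausted. After a non-null
  // return, CurrentFileRange() gives the sub-range of keys that should be
  // probed in that file.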
536   FdWithKeyRange* GetNextFile() {
537 while (!search_ended_) {
538 // Start searching next level.
539 if (batch_iter_ == current_level_range_.end()) {
540 search_ended_ = !PrepareNextLevel();
541 continue;
542 } else {
543 if (maybe_repeat_key_) {
544 maybe_repeat_key_ = false;
545 // Check if we found the final value for the last key in the
546 // previous lookup range. If we did, then there's no need to look
547 // any further for that key, so advance batch_iter_. Else, keep
548 // batch_iter_ positioned on that key so we look it up again in
549 // the next file
550 // For L0, always advance the key because we will look in the next
551 // file regardless for all keys not found yet
552 if (current_level_range_.CheckKeyDone(batch_iter_) ||
553 curr_level_ == 0) {
554 batch_iter_ = upper_key_;
555 }
556 }
557 // batch_iter_prev_ will become the start key for the next file
558 // lookup
559 batch_iter_prev_ = batch_iter_;
560 }
561
562 MultiGetRange next_file_range(current_level_range_, batch_iter_prev_,
563 current_level_range_.end());
564 size_t curr_file_index =
565 (batch_iter_ != current_level_range_.end())
566 ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
567 : curr_file_level_->num_files;
568 FdWithKeyRange* f;
569 bool is_last_key_in_file;
570 if (!GetNextFileInLevelWithKeys(&next_file_range, &curr_file_index, &f,
571 &is_last_key_in_file)) {
572 search_ended_ = !PrepareNextLevel();
573 } else {
574 if (is_last_key_in_file) {
575 // Since cmp_largest is 0, batch_iter_ still points to the last key
576 // that falls in this file, instead of the next one. Increment
577 // the file index for all keys between batch_iter_ and upper_key_
578 auto tmp_iter = batch_iter_;
579 while (tmp_iter != upper_key_) {
580 ++(fp_ctx_array_[tmp_iter.index()].curr_index_in_curr_level);
581 ++tmp_iter;
582 }
583 maybe_repeat_key_ = true;
584 }
585 // Set the range for this file
586 current_file_range_ =
587 MultiGetRange(next_file_range, batch_iter_prev_, upper_key_);
588 returned_file_level_ = curr_level_;
589 hit_file_level_ = curr_level_;
590 is_hit_file_last_in_level_ =
591 curr_file_index == curr_file_level_->num_files - 1;
592 return f;
593 }
594 }
595
596 // Search ended
597 return nullptr;
598 }
599
600 // getter for current file level
601 // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
602   unsigned int GetHitFileLevel() { return hit_file_level_; }
603
604 // Returns true if the most recent "hit file" (i.e., one returned by
605 // GetNextFile()) is at the last index in its level.
606   bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }
607
608   const MultiGetRange& CurrentFileRange() { return current_file_range_; }
609
610 private:
611 unsigned int num_levels_;
612 unsigned int curr_level_;
613 unsigned int returned_file_level_;
614 unsigned int hit_file_level_;
615
616 struct FilePickerContext {
617 int32_t search_left_bound;
618 int32_t search_right_bound;
619 unsigned int curr_index_in_curr_level;
620 unsigned int start_index_in_curr_level;
621
622     FilePickerContext(int32_t left, int32_t right)
623 : search_left_bound(left), search_right_bound(right),
624 curr_index_in_curr_level(0), start_index_in_curr_level(0) {}
625
626 FilePickerContext() = default;
627 };
628 std::array<FilePickerContext, MultiGetContext::MAX_BATCH_SIZE> fp_ctx_array_;
629 MultiGetRange* range_;
630 // Iterator to iterate through the keys in a MultiGet batch, that gets reset
631 // at the beginning of each level. Each call to GetNextFile() will position
632 // batch_iter_ at or right after the last key that was found in the returned
633 // SST file
634 MultiGetRange::Iterator batch_iter_;
635   // An iterator that records the previous position of batch_iter_, i.e. the last
636 // key found in the previous SST file, in order to serve as the start of
637 // the batch key range for the next SST file
638 MultiGetRange::Iterator batch_iter_prev_;
639 MultiGetRange::Iterator upper_key_;
640 bool maybe_repeat_key_;
641 MultiGetRange current_level_range_;
642 MultiGetRange current_file_range_;
643 autovector<LevelFilesBrief>* level_files_brief_;
644 bool search_ended_;
645 bool is_hit_file_last_in_level_;
646 LevelFilesBrief* curr_file_level_;
647 FileIndexer* file_indexer_;
648 const Comparator* user_comparator_;
649 const InternalKeyComparator* internal_comparator_;
650
651 // Setup local variables to search next level.
652 // Returns false if there are no more levels to search.
653   bool PrepareNextLevel() {
654 if (curr_level_ == 0) {
655 MultiGetRange::Iterator mget_iter = current_level_range_.begin();
656 if (fp_ctx_array_[mget_iter.index()].curr_index_in_curr_level <
657 curr_file_level_->num_files) {
658 batch_iter_prev_ = current_level_range_.begin();
659 upper_key_ = batch_iter_ = current_level_range_.begin();
660 return true;
661 }
662 }
663
664 curr_level_++;
665 // Reset key range to saved value
666 while (curr_level_ < num_levels_) {
667 bool level_contains_keys = false;
668 curr_file_level_ = &(*level_files_brief_)[curr_level_];
669 if (curr_file_level_->num_files == 0) {
670 // When current level is empty, the search bound generated from upper
671 // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
672 // also empty.
673
674 for (auto mget_iter = current_level_range_.begin();
675 mget_iter != current_level_range_.end(); ++mget_iter) {
676 struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];
677
678 assert(fp_ctx.search_left_bound == 0);
679 assert(fp_ctx.search_right_bound == -1 ||
680 fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex);
681 // Since current level is empty, it will need to search all files in
682 // the next level
683 fp_ctx.search_left_bound = 0;
684 fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
685 }
686 // Skip all subsequent empty levels
687 do {
688 ++curr_level_;
689 } while ((curr_level_ < num_levels_) &&
690 (*level_files_brief_)[curr_level_].num_files == 0);
691 continue;
692 }
693
694 // Some files may overlap each other. We find
695 // all files that overlap user_key and process them in order from
696 // newest to oldest. In the context of merge-operator, this can occur at
697 // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
698 // are always compacted into a single entry).
699 int32_t start_index = -1;
700 current_level_range_ =
701 MultiGetRange(*range_, range_->begin(), range_->end());
702 for (auto mget_iter = current_level_range_.begin();
703 mget_iter != current_level_range_.end(); ++mget_iter) {
704 struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];
705 if (curr_level_ == 0) {
706 // On Level-0, we read through all files to check for overlap.
707 start_index = 0;
708 level_contains_keys = true;
709 } else {
710 // On Level-n (n>=1), files are sorted. Binary search to find the
711 // earliest file whose largest key >= ikey. Search left bound and
712 // right bound are used to narrow the range.
713 if (fp_ctx.search_left_bound <= fp_ctx.search_right_bound) {
714 if (fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex) {
715 fp_ctx.search_right_bound =
716 static_cast<int32_t>(curr_file_level_->num_files) - 1;
717 }
718 // `search_right_bound_` is an inclusive upper-bound, but since it
719 // was determined based on user key, it is still possible the lookup
720 // key falls to the right of `search_right_bound_`'s corresponding
721 // file. So, pass a limit one higher, which allows us to detect this
722 // case.
723 Slice& ikey = mget_iter->ikey;
724 start_index = FindFileInRange(
725 *internal_comparator_, *curr_file_level_, ikey,
726 static_cast<uint32_t>(fp_ctx.search_left_bound),
727 static_cast<uint32_t>(fp_ctx.search_right_bound) + 1);
728 if (start_index == fp_ctx.search_right_bound + 1) {
729 // `ikey_` comes after `search_right_bound_`. The lookup key does
730 // not exist on this level, so let's skip this level and do a full
731 // binary search on the next level.
732 fp_ctx.search_left_bound = 0;
733 fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
734 current_level_range_.SkipKey(mget_iter);
735 continue;
736 } else {
737 level_contains_keys = true;
738 }
739 } else {
740 // search_left_bound > search_right_bound, key does not exist in
741 // this level. Since no comparison is done in this level, it will
742 // need to search all files in the next level.
743 fp_ctx.search_left_bound = 0;
744 fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
745 current_level_range_.SkipKey(mget_iter);
746 continue;
747 }
748 }
749 fp_ctx.start_index_in_curr_level = start_index;
750 fp_ctx.curr_index_in_curr_level = start_index;
751 }
752 if (level_contains_keys) {
753 batch_iter_prev_ = current_level_range_.begin();
754 upper_key_ = batch_iter_ = current_level_range_.begin();
755 return true;
756 }
757 curr_level_++;
758 }
759 // curr_level_ = num_levels_. So, no more levels to search.
760 return false;
761 }
762 };
763 } // anonymous namespace
764
765 VersionStorageInfo::~VersionStorageInfo() { delete[] files_; }
766
767 Version::~Version() {
768 assert(refs_ == 0);
769
770 // Remove from linked list
771 prev_->next_ = next_;
772 next_->prev_ = prev_;
773
774 // Drop references to files
775 for (int level = 0; level < storage_info_.num_levels_; level++) {
776 for (size_t i = 0; i < storage_info_.files_[level].size(); i++) {
777 FileMetaData* f = storage_info_.files_[level][i];
778 assert(f->refs > 0);
779 f->refs--;
780 if (f->refs <= 0) {
781 assert(cfd_ != nullptr);
782 uint32_t path_id = f->fd.GetPathId();
783 assert(path_id < cfd_->ioptions()->cf_paths.size());
784 vset_->obsolete_files_.push_back(
785 ObsoleteFileInfo(f, cfd_->ioptions()->cf_paths[path_id].path));
786 }
787 }
788 }
789 }
790
791 int FindFile(const InternalKeyComparator& icmp,
792 const LevelFilesBrief& file_level,
793 const Slice& key) {
794 return FindFileInRange(icmp, file_level, key, 0,
795 static_cast<uint32_t>(file_level.num_files));
796 }
797
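// Builds a LevelFilesBrief for `files`: the FdWithKeyRange array and copies
// of each file's smallest/largest internal keys are allocated from `arena`,
// so the per-level metadata lives in contiguous memory for the binary
// searches performed on the read path.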
798 void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level,
799 const std::vector<FileMetaData*>& files,
800 Arena* arena) {
801 assert(file_level);
802 assert(arena);
803
804 size_t num = files.size();
805 file_level->num_files = num;
806 char* mem = arena->AllocateAligned(num * sizeof(FdWithKeyRange));
807 file_level->files = new (mem)FdWithKeyRange[num];
808
809 for (size_t i = 0; i < num; i++) {
810 Slice smallest_key = files[i]->smallest.Encode();
811 Slice largest_key = files[i]->largest.Encode();
812
813 // Copy key slice to sequential memory
814 size_t smallest_size = smallest_key.size();
815 size_t largest_size = largest_key.size();
816 mem = arena->AllocateAligned(smallest_size + largest_size);
817 memcpy(mem, smallest_key.data(), smallest_size);
818 memcpy(mem + smallest_size, largest_key.data(), largest_size);
819
820 FdWithKeyRange& f = file_level->files[i];
821 f.fd = files[i]->fd;
822 f.file_metadata = files[i];
823 f.smallest_key = Slice(mem, smallest_size);
824 f.largest_key = Slice(mem + smallest_size, largest_size);
825 }
826 }
827
828 static bool AfterFile(const Comparator* ucmp,
829 const Slice* user_key, const FdWithKeyRange* f) {
830 // nullptr user_key occurs before all keys and is therefore never after *f
831 return (user_key != nullptr &&
832 ucmp->CompareWithoutTimestamp(*user_key,
833 ExtractUserKey(f->largest_key)) > 0);
834 }
835
836 static bool BeforeFile(const Comparator* ucmp,
837 const Slice* user_key, const FdWithKeyRange* f) {
838 // nullptr user_key occurs after all keys and is therefore never before *f
839 return (user_key != nullptr &&
840 ucmp->CompareWithoutTimestamp(*user_key,
841 ExtractUserKey(f->smallest_key)) < 0);
842 }
843
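// Two strategies are used below: if the files may overlap each other
// (e.g. level 0), every file is checked linearly; if the files are disjoint
// and sorted, a single binary search with FindFile() locates the first file
// that could contain smallest_user_key, and only that file is checked
// against largest_user_key.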
844 bool SomeFileOverlapsRange(
845 const InternalKeyComparator& icmp,
846 bool disjoint_sorted_files,
847 const LevelFilesBrief& file_level,
848 const Slice* smallest_user_key,
849 const Slice* largest_user_key) {
850 const Comparator* ucmp = icmp.user_comparator();
851 if (!disjoint_sorted_files) {
852 // Need to check against all files
853 for (size_t i = 0; i < file_level.num_files; i++) {
854 const FdWithKeyRange* f = &(file_level.files[i]);
855 if (AfterFile(ucmp, smallest_user_key, f) ||
856 BeforeFile(ucmp, largest_user_key, f)) {
857 // No overlap
858 } else {
859 return true; // Overlap
860 }
861 }
862 return false;
863 }
864
865 // Binary search over file list
866 uint32_t index = 0;
867 if (smallest_user_key != nullptr) {
868 // Find the leftmost possible internal key for smallest_user_key
869 InternalKey small;
870 small.SetMinPossibleForUserKey(*smallest_user_key);
871 index = FindFile(icmp, file_level, small.Encode());
872 }
873
874 if (index >= file_level.num_files) {
875 // beginning of range is after all files, so no overlap.
876 return false;
877 }
878
879 return !BeforeFile(ucmp, largest_user_key, &file_level.files[index]);
880 }
881
882 namespace {
883
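// An iterator over the keys of all files in one sorted level. It keeps a
// table iterator open for one file at a time (created lazily through
// TableCache::NewIterator in NewFileIterator()) and moves to the adjacent
// file when the current one is exhausted (see SkipEmptyFileForward and
// SkipEmptyFileBackward). Empty levels are not supported (asserted in the
// constructor).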
884 class LevelIterator final : public InternalIterator {
885 public:
886 // @param read_options Must outlive this iterator.
887   LevelIterator(TableCache* table_cache, const ReadOptions& read_options,
888 const FileOptions& file_options,
889 const InternalKeyComparator& icomparator,
890 const LevelFilesBrief* flevel,
891 const SliceTransform* prefix_extractor, bool should_sample,
892 HistogramImpl* file_read_hist, TableReaderCaller caller,
893 bool skip_filters, int level, RangeDelAggregator* range_del_agg,
894 const std::vector<AtomicCompactionUnitBoundary>*
895 compaction_boundaries = nullptr,
896 bool allow_unprepared_value = false)
897 : table_cache_(table_cache),
898 read_options_(read_options),
899 file_options_(file_options),
900 icomparator_(icomparator),
901 user_comparator_(icomparator.user_comparator()),
902 flevel_(flevel),
903 prefix_extractor_(prefix_extractor),
904 file_read_hist_(file_read_hist),
905 should_sample_(should_sample),
906 caller_(caller),
907 skip_filters_(skip_filters),
908 allow_unprepared_value_(allow_unprepared_value),
909 file_index_(flevel_->num_files),
910 level_(level),
911 range_del_agg_(range_del_agg),
912 pinned_iters_mgr_(nullptr),
913 compaction_boundaries_(compaction_boundaries) {
914 // Empty level is not supported.
915 assert(flevel_ != nullptr && flevel_->num_files > 0);
916 }
917
918   ~LevelIterator() override { delete file_iter_.Set(nullptr); }
919
920 void Seek(const Slice& target) override;
921 void SeekForPrev(const Slice& target) override;
922 void SeekToFirst() override;
923 void SeekToLast() override;
924 void Next() final override;
925 bool NextAndGetResult(IterateResult* result) override;
926 void Prev() override;
927
928   bool Valid() const override { return file_iter_.Valid(); }
929   Slice key() const override {
930 assert(Valid());
931 return file_iter_.key();
932 }
933
934   Slice value() const override {
935 assert(Valid());
936 return file_iter_.value();
937 }
938
939   Status status() const override {
940 return file_iter_.iter() ? file_iter_.status() : Status::OK();
941 }
942
943   bool PrepareValue() override {
944 return file_iter_.PrepareValue();
945 }
946
947   inline bool MayBeOutOfLowerBound() override {
948 assert(Valid());
949 return may_be_out_of_lower_bound_ && file_iter_.MayBeOutOfLowerBound();
950 }
951
952   inline IterBoundCheck UpperBoundCheckResult() override {
953 if (Valid()) {
954 return file_iter_.UpperBoundCheckResult();
955 } else {
956 return IterBoundCheck::kUnknown;
957 }
958 }
959
960   void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
961 pinned_iters_mgr_ = pinned_iters_mgr;
962 if (file_iter_.iter()) {
963 file_iter_.SetPinnedItersMgr(pinned_iters_mgr);
964 }
965 }
966
967   bool IsKeyPinned() const override {
968 return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
969 file_iter_.iter() && file_iter_.IsKeyPinned();
970 }
971
972   bool IsValuePinned() const override {
973 return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
974 file_iter_.iter() && file_iter_.IsValuePinned();
975 }
976
977 private:
978 // Return true if at least one invalid file is seen and skipped.
979 bool SkipEmptyFileForward();
980 void SkipEmptyFileBackward();
981 void SetFileIterator(InternalIterator* iter);
982 void InitFileIterator(size_t new_file_index);
983
984   const Slice& file_smallest_key(size_t file_index) {
985 assert(file_index < flevel_->num_files);
986 return flevel_->files[file_index].smallest_key;
987 }
988
989   bool KeyReachedUpperBound(const Slice& internal_key) {
990 return read_options_.iterate_upper_bound != nullptr &&
991 user_comparator_.CompareWithoutTimestamp(
992 ExtractUserKey(internal_key), /*a_has_ts=*/true,
993 *read_options_.iterate_upper_bound, /*b_has_ts=*/false) >= 0;
994 }
995
996   InternalIterator* NewFileIterator() {
997 assert(file_index_ < flevel_->num_files);
998 auto file_meta = flevel_->files[file_index_];
999 if (should_sample_) {
1000 sample_file_read_inc(file_meta.file_metadata);
1001 }
1002
1003 const InternalKey* smallest_compaction_key = nullptr;
1004 const InternalKey* largest_compaction_key = nullptr;
1005 if (compaction_boundaries_ != nullptr) {
1006 smallest_compaction_key = (*compaction_boundaries_)[file_index_].smallest;
1007 largest_compaction_key = (*compaction_boundaries_)[file_index_].largest;
1008 }
1009 CheckMayBeOutOfLowerBound();
1010 return table_cache_->NewIterator(
1011 read_options_, file_options_, icomparator_, *file_meta.file_metadata,
1012 range_del_agg_, prefix_extractor_,
1013 nullptr /* don't need reference to table */, file_read_hist_, caller_,
1014 /*arena=*/nullptr, skip_filters_, level_,
1015 /*max_file_size_for_l0_meta_pin=*/0, smallest_compaction_key,
1016 largest_compaction_key, allow_unprepared_value_);
1017 }
1018
1019   // Check whether the current file is fully within iterate_lower_bound.
1020   //
1021   // Note MyRocks may update iterate bounds between seeks. To work around
1022   // this, we need to check and update may_be_out_of_lower_bound_ accordingly.
1023   void CheckMayBeOutOfLowerBound() {
1024 if (read_options_.iterate_lower_bound != nullptr &&
1025 file_index_ < flevel_->num_files) {
1026 may_be_out_of_lower_bound_ =
1027 user_comparator_.CompareWithoutTimestamp(
1028 ExtractUserKey(file_smallest_key(file_index_)), /*a_has_ts=*/true,
1029 *read_options_.iterate_lower_bound, /*b_has_ts=*/false) < 0;
1030 }
1031 }
1032
1033 TableCache* table_cache_;
1034 const ReadOptions& read_options_;
1035 const FileOptions& file_options_;
1036 const InternalKeyComparator& icomparator_;
1037 const UserComparatorWrapper user_comparator_;
1038 const LevelFilesBrief* flevel_;
1039 mutable FileDescriptor current_value_;
1040 // `prefix_extractor_` may be non-null even for total order seek. Checking
1041 // this variable is not the right way to identify whether prefix iterator
1042 // is used.
1043 const SliceTransform* prefix_extractor_;
1044
1045 HistogramImpl* file_read_hist_;
1046 bool should_sample_;
1047 TableReaderCaller caller_;
1048 bool skip_filters_;
1049 bool allow_unprepared_value_;
1050 bool may_be_out_of_lower_bound_ = true;
1051 size_t file_index_;
1052 int level_;
1053 RangeDelAggregator* range_del_agg_;
1054 IteratorWrapper file_iter_; // May be nullptr
1055 PinnedIteratorsManager* pinned_iters_mgr_;
1056
1057 // To be propagated to RangeDelAggregator in order to safely truncate range
1058 // tombstones.
1059 const std::vector<AtomicCompactionUnitBoundary>* compaction_boundaries_;
1060 };
1061
1062 void LevelIterator::Seek(const Slice& target) {
1063   // Check whether the seek key falls within the current file
1064 bool need_to_reseek = true;
1065 if (file_iter_.iter() != nullptr && file_index_ < flevel_->num_files) {
1066 const FdWithKeyRange& cur_file = flevel_->files[file_index_];
1067 if (icomparator_.InternalKeyComparator::Compare(
1068 target, cur_file.largest_key) <= 0 &&
1069 icomparator_.InternalKeyComparator::Compare(
1070 target, cur_file.smallest_key) >= 0) {
1071 need_to_reseek = false;
1072 assert(static_cast<size_t>(FindFile(icomparator_, *flevel_, target)) ==
1073 file_index_);
1074 }
1075 }
1076 if (need_to_reseek) {
1077 TEST_SYNC_POINT("LevelIterator::Seek:BeforeFindFile");
1078 size_t new_file_index = FindFile(icomparator_, *flevel_, target);
1079 InitFileIterator(new_file_index);
1080 }
1081
1082 if (file_iter_.iter() != nullptr) {
1083 file_iter_.Seek(target);
1084 }
1085 if (SkipEmptyFileForward() && prefix_extractor_ != nullptr &&
1086 !read_options_.total_order_seek && !read_options_.auto_prefix_mode &&
1087 file_iter_.iter() != nullptr && file_iter_.Valid()) {
1088 // We've skipped the file we initially positioned to. In the prefix
1089 // seek case, it is likely that the file is skipped because of
1090 // prefix bloom or hash, where more keys are skipped. We then check
1091 // the current key and invalidate the iterator if the prefix is
1092 // already passed.
1093     // When doing a prefix iterator seek, once keys for one prefix have
1094     // been exhausted, the iterator can jump to any key that is larger. Here we
1095     // are enforcing a stricter contract than that, in order to make it easier
1096     // for higher layers (merging and DB iterator) to reason about correctness:
1097     // 1. Within the prefix, the result should be accurate.
1098     // 2. If keys for the prefix are exhausted, the iterator is either
1099     //    positioned at the next key after the prefix, or made invalid.
1100 // A side benefit will be that it invalidates the iterator earlier so that
1101 // the upper level merging iterator can merge fewer child iterators.
1102 size_t ts_sz = user_comparator_.timestamp_size();
1103 Slice target_user_key_without_ts =
1104 ExtractUserKeyAndStripTimestamp(target, ts_sz);
1105 Slice file_user_key_without_ts =
1106 ExtractUserKeyAndStripTimestamp(file_iter_.key(), ts_sz);
1107 if (prefix_extractor_->InDomain(target_user_key_without_ts) &&
1108 (!prefix_extractor_->InDomain(file_user_key_without_ts) ||
1109 user_comparator_.CompareWithoutTimestamp(
1110 prefix_extractor_->Transform(target_user_key_without_ts), false,
1111 prefix_extractor_->Transform(file_user_key_without_ts),
1112 false) != 0)) {
1113 SetFileIterator(nullptr);
1114 }
1115 }
1116 CheckMayBeOutOfLowerBound();
1117 }
1118
1119 void LevelIterator::SeekForPrev(const Slice& target) {
1120 size_t new_file_index = FindFile(icomparator_, *flevel_, target);
1121 if (new_file_index >= flevel_->num_files) {
1122 new_file_index = flevel_->num_files - 1;
1123 }
1124
1125 InitFileIterator(new_file_index);
1126 if (file_iter_.iter() != nullptr) {
1127 file_iter_.SeekForPrev(target);
1128 SkipEmptyFileBackward();
1129 }
1130 CheckMayBeOutOfLowerBound();
1131 }
1132
1133 void LevelIterator::SeekToFirst() {
1134 InitFileIterator(0);
1135 if (file_iter_.iter() != nullptr) {
1136 file_iter_.SeekToFirst();
1137 }
1138 SkipEmptyFileForward();
1139 CheckMayBeOutOfLowerBound();
1140 }
1141
1142 void LevelIterator::SeekToLast() {
1143 InitFileIterator(flevel_->num_files - 1);
1144 if (file_iter_.iter() != nullptr) {
1145 file_iter_.SeekToLast();
1146 }
1147 SkipEmptyFileBackward();
1148 CheckMayBeOutOfLowerBound();
1149 }
1150
1151 void LevelIterator::Next() {
1152 assert(Valid());
1153 file_iter_.Next();
1154 SkipEmptyFileForward();
1155 }
1156
1157 bool LevelIterator::NextAndGetResult(IterateResult* result) {
1158 assert(Valid());
1159 bool is_valid = file_iter_.NextAndGetResult(result);
1160 if (!is_valid) {
1161 SkipEmptyFileForward();
1162 is_valid = Valid();
1163 if (is_valid) {
1164 result->key = key();
1165 result->bound_check_result = file_iter_.UpperBoundCheckResult();
1166 // Ideally, we should return the real file_iter_.value_prepared but the
1167       // information is not here. It would cause an extra PrepareValue()
1168 // for the first key of a file.
1169 result->value_prepared = !allow_unprepared_value_;
1170 }
1171 }
1172 return is_valid;
1173 }
1174
1175 void LevelIterator::Prev() {
1176 assert(Valid());
1177 file_iter_.Prev();
1178 SkipEmptyFileBackward();
1179 }
1180
1181 bool LevelIterator::SkipEmptyFileForward() {
1182 bool seen_empty_file = false;
1183 while (file_iter_.iter() == nullptr ||
1184 (!file_iter_.Valid() && file_iter_.status().ok() &&
1185 file_iter_.iter()->UpperBoundCheckResult() !=
1186 IterBoundCheck::kOutOfBound)) {
1187 seen_empty_file = true;
1188 // Move to next file
1189 if (file_index_ >= flevel_->num_files - 1) {
1190 // Already at the last file
1191 SetFileIterator(nullptr);
1192 break;
1193 }
1194 if (KeyReachedUpperBound(file_smallest_key(file_index_ + 1))) {
1195 SetFileIterator(nullptr);
1196 break;
1197 }
1198 InitFileIterator(file_index_ + 1);
1199 if (file_iter_.iter() != nullptr) {
1200 file_iter_.SeekToFirst();
1201 }
1202 }
1203 return seen_empty_file;
1204 }
1205
1206 void LevelIterator::SkipEmptyFileBackward() {
1207 while (file_iter_.iter() == nullptr ||
1208 (!file_iter_.Valid() && file_iter_.status().ok())) {
1209 // Move to previous file
1210 if (file_index_ == 0) {
1211 // Already the first file
1212 SetFileIterator(nullptr);
1213 return;
1214 }
1215 InitFileIterator(file_index_ - 1);
1216 if (file_iter_.iter() != nullptr) {
1217 file_iter_.SeekToLast();
1218 }
1219 }
1220 }
1221
1222 void LevelIterator::SetFileIterator(InternalIterator* iter) {
1223 if (pinned_iters_mgr_ && iter) {
1224 iter->SetPinnedItersMgr(pinned_iters_mgr_);
1225 }
1226
1227 InternalIterator* old_iter = file_iter_.Set(iter);
1228 if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
1229 pinned_iters_mgr_->PinIterator(old_iter);
1230 } else {
1231 delete old_iter;
1232 }
1233 }
1234
1235 void LevelIterator::InitFileIterator(size_t new_file_index) {
1236 if (new_file_index >= flevel_->num_files) {
1237 file_index_ = new_file_index;
1238 SetFileIterator(nullptr);
1239 return;
1240 } else {
1241     // If the file iterator returned an Incomplete status, we retry it when the user seeks
1242 // to the same file, as this time we may go to a different data block
1243 // which is cached in block cache.
1244 //
1245 if (file_iter_.iter() != nullptr && !file_iter_.status().IsIncomplete() &&
1246 new_file_index == file_index_) {
1247 // file_iter_ is already constructed with this iterator, so
1248 // no need to change anything
1249 } else {
1250 file_index_ = new_file_index;
1251 InternalIterator* iter = NewFileIterator();
1252 SetFileIterator(iter);
1253 }
1254 }
1255 }
1256 } // anonymous namespace
1257
1258 Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
1259 const FileMetaData* file_meta,
1260 const std::string* fname) const {
1261 auto table_cache = cfd_->table_cache();
1262 auto ioptions = cfd_->ioptions();
1263 Status s = table_cache->GetTableProperties(
1264 file_options_, cfd_->internal_comparator(), file_meta->fd, tp,
1265 mutable_cf_options_.prefix_extractor.get(), true /* no io */);
1266 if (s.ok()) {
1267 return s;
1268 }
1269
1270   // We only ignore the `Incomplete` error type since it is expected here:
1271   // the no-IO lookup above fails when the table is not in the table cache.
1272 if (!s.IsIncomplete()) {
1273 return s;
1274 }
1275
1276 // 2. Table is not present in table cache, we'll read the table properties
1277 // directly from the properties block in the file.
1278 std::unique_ptr<FSRandomAccessFile> file;
1279 std::string file_name;
1280 if (fname != nullptr) {
1281 file_name = *fname;
1282 } else {
1283 file_name =
1284 TableFileName(ioptions->cf_paths, file_meta->fd.GetNumber(),
1285 file_meta->fd.GetPathId());
1286 }
1287 s = ioptions->fs->NewRandomAccessFile(file_name, file_options_, &file,
1288 nullptr);
1289 if (!s.ok()) {
1290 return s;
1291 }
1292
1293 TableProperties* raw_table_properties;
1294   // By setting the magic number to kInvalidTableMagicNumber, we can
1295   // bypass the magic number check in the footer.
1296 std::unique_ptr<RandomAccessFileReader> file_reader(
1297 new RandomAccessFileReader(
1298 std::move(file), file_name, nullptr /* env */, io_tracer_,
1299 nullptr /* stats */, 0 /* hist_type */, nullptr /* file_read_hist */,
1300 nullptr /* rate_limiter */, ioptions->listeners));
1301 s = ReadTableProperties(
1302 file_reader.get(), file_meta->fd.GetFileSize(),
1303 Footer::kInvalidTableMagicNumber /* table's magic number */, *ioptions,
1304 &raw_table_properties, false /* compression_type_missing */);
1305 if (!s.ok()) {
1306 return s;
1307 }
1308 RecordTick(ioptions->stats, NUMBER_DIRECT_LOAD_TABLE_PROPERTIES);
1309
1310 *tp = std::shared_ptr<const TableProperties>(raw_table_properties);
1311 return s;
1312 }
1313
1314 Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
1315 Status s;
1316 for (int level = 0; level < storage_info_.num_levels_; level++) {
1317 s = GetPropertiesOfAllTables(props, level);
1318 if (!s.ok()) {
1319 return s;
1320 }
1321 }
1322
1323 return Status::OK();
1324 }
1325
1326 Status Version::TablesRangeTombstoneSummary(int max_entries_to_print,
1327 std::string* out_str) {
1328 if (max_entries_to_print <= 0) {
1329 return Status::OK();
1330 }
1331 int num_entries_left = max_entries_to_print;
1332
1333 std::stringstream ss;
1334
1335 for (int level = 0; level < storage_info_.num_levels_; level++) {
1336 for (const auto& file_meta : storage_info_.files_[level]) {
1337 auto fname =
1338 TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
1339 file_meta->fd.GetPathId());
1340
1341 ss << "=== file : " << fname << " ===\n";
1342
1343 TableCache* table_cache = cfd_->table_cache();
1344 std::unique_ptr<FragmentedRangeTombstoneIterator> tombstone_iter;
1345
1346 Status s = table_cache->GetRangeTombstoneIterator(
1347 ReadOptions(), cfd_->internal_comparator(), *file_meta,
1348 &tombstone_iter);
1349 if (!s.ok()) {
1350 return s;
1351 }
1352 if (tombstone_iter) {
1353 tombstone_iter->SeekToFirst();
1354
1355 while (tombstone_iter->Valid() && num_entries_left > 0) {
1356 ss << "start: " << tombstone_iter->start_key().ToString(true)
1357 << " end: " << tombstone_iter->end_key().ToString(true)
1358 << " seq: " << tombstone_iter->seq() << '\n';
1359 tombstone_iter->Next();
1360 num_entries_left--;
1361 }
1362 if (num_entries_left <= 0) {
1363 break;
1364 }
1365 }
1366 }
1367 if (num_entries_left <= 0) {
1368 break;
1369 }
1370 }
1371 assert(num_entries_left >= 0);
1372 if (num_entries_left <= 0) {
1373 ss << "(results may not be complete)\n";
1374 }
1375
1376 *out_str = ss.str();
1377 return Status::OK();
1378 }
1379
1380 Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props,
1381 int level) {
1382 for (const auto& file_meta : storage_info_.files_[level]) {
1383 auto fname =
1384 TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
1385 file_meta->fd.GetPathId());
1386 // 1. If the table is already present in table cache, load table
1387 // properties from there.
1388 std::shared_ptr<const TableProperties> table_properties;
1389 Status s = GetTableProperties(&table_properties, file_meta, &fname);
1390 if (s.ok()) {
1391 props->insert({fname, table_properties});
1392 } else {
1393 return s;
1394 }
1395 }
1396
1397 return Status::OK();
1398 }
1399
1400 Status Version::GetPropertiesOfTablesInRange(
1401 const Range* range, std::size_t n, TablePropertiesCollection* props) const {
1402 for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
1403 for (decltype(n) i = 0; i < n; i++) {
1404 // Convert user_key into a corresponding internal key.
1405 InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
1406 InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
1407 std::vector<FileMetaData*> files;
1408 storage_info_.GetOverlappingInputs(level, &k1, &k2, &files, -1, nullptr,
1409 false);
1410 for (const auto& file_meta : files) {
1411 auto fname =
1412 TableFileName(cfd_->ioptions()->cf_paths,
1413 file_meta->fd.GetNumber(), file_meta->fd.GetPathId());
1414 if (props->count(fname) == 0) {
1415 // 1. If the table is already present in table cache, load table
1416 // properties from there.
1417 std::shared_ptr<const TableProperties> table_properties;
1418 Status s = GetTableProperties(&table_properties, file_meta, &fname);
1419 if (s.ok()) {
1420 props->insert({fname, table_properties});
1421 } else {
1422 return s;
1423 }
1424 }
1425 }
1426 }
1427 }
1428
1429 return Status::OK();
1430 }
1431
1432 Status Version::GetAggregatedTableProperties(
1433 std::shared_ptr<const TableProperties>* tp, int level) {
1434 TablePropertiesCollection props;
1435 Status s;
1436 if (level < 0) {
1437 s = GetPropertiesOfAllTables(&props);
1438 } else {
1439 s = GetPropertiesOfAllTables(&props, level);
1440 }
1441 if (!s.ok()) {
1442 return s;
1443 }
1444
1445 auto* new_tp = new TableProperties();
1446 for (const auto& item : props) {
1447 new_tp->Add(*item.second);
1448 }
1449 tp->reset(new_tp);
1450 return Status::OK();
1451 }
1452
1453 size_t Version::GetMemoryUsageByTableReaders() {
1454 size_t total_usage = 0;
1455 for (auto& file_level : storage_info_.level_files_brief_) {
1456 for (size_t i = 0; i < file_level.num_files; i++) {
1457 total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader(
1458 file_options_, cfd_->internal_comparator(), file_level.files[i].fd,
1459 mutable_cf_options_.prefix_extractor.get());
1460 }
1461 }
1462 return total_usage;
1463 }
1464
1465 void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
1466 assert(cf_meta);
1467 assert(cfd_);
1468
1469 cf_meta->name = cfd_->GetName();
1470 cf_meta->size = 0;
1471 cf_meta->file_count = 0;
1472 cf_meta->levels.clear();
1473
1474 cf_meta->blob_file_size = 0;
1475 cf_meta->blob_file_count = 0;
1476 cf_meta->blob_files.clear();
1477
1478 auto* ioptions = cfd_->ioptions();
1479 auto* vstorage = storage_info();
1480
1481 for (int level = 0; level < cfd_->NumberLevels(); level++) {
1482 uint64_t level_size = 0;
1483 cf_meta->file_count += vstorage->LevelFiles(level).size();
1484 std::vector<SstFileMetaData> files;
1485 for (const auto& file : vstorage->LevelFiles(level)) {
1486 uint32_t path_id = file->fd.GetPathId();
1487 std::string file_path;
1488 if (path_id < ioptions->cf_paths.size()) {
1489 file_path = ioptions->cf_paths[path_id].path;
1490 } else {
1491 assert(!ioptions->cf_paths.empty());
1492 file_path = ioptions->cf_paths.back().path;
1493 }
1494 const uint64_t file_number = file->fd.GetNumber();
1495 files.emplace_back(
1496 MakeTableFileName("", file_number), file_number, file_path,
1497 static_cast<size_t>(file->fd.GetFileSize()), file->fd.smallest_seqno,
1498 file->fd.largest_seqno, file->smallest.user_key().ToString(),
1499 file->largest.user_key().ToString(),
1500 file->stats.num_reads_sampled.load(std::memory_order_relaxed),
1501 file->being_compacted, file->temperature,
1502 file->oldest_blob_file_number, file->TryGetOldestAncesterTime(),
1503 file->TryGetFileCreationTime(), file->file_checksum,
1504 file->file_checksum_func_name);
1505 files.back().num_entries = file->num_entries;
1506 files.back().num_deletions = file->num_deletions;
1507 level_size += file->fd.GetFileSize();
1508 }
1509 cf_meta->levels.emplace_back(
1510 level, level_size, std::move(files));
1511 cf_meta->size += level_size;
1512 }
1513 for (const auto& iter : vstorage->GetBlobFiles()) {
1514 const auto meta = iter.second.get();
1515 cf_meta->blob_files.emplace_back(
1516 meta->GetBlobFileNumber(), BlobFileName("", meta->GetBlobFileNumber()),
1517 ioptions->cf_paths.front().path, meta->GetBlobFileSize(),
1518 meta->GetTotalBlobCount(), meta->GetTotalBlobBytes(),
1519 meta->GetGarbageBlobCount(), meta->GetGarbageBlobBytes(),
1520 meta->GetChecksumMethod(), meta->GetChecksumValue());
1521 cf_meta->blob_file_count++;
1522 cf_meta->blob_file_size += meta->GetBlobFileSize();
1523 }
1524 }
1525
1526 uint64_t Version::GetSstFilesSize() {
1527 uint64_t sst_files_size = 0;
1528 for (int level = 0; level < storage_info_.num_levels_; level++) {
1529 for (const auto& file_meta : storage_info_.LevelFiles(level)) {
1530 sst_files_size += file_meta->fd.GetFileSize();
1531 }
1532 }
1533 return sst_files_size;
1534 }
1535
1536 void Version::GetCreationTimeOfOldestFile(uint64_t* creation_time) {
1537 uint64_t oldest_time = port::kMaxUint64;
1538 for (int level = 0; level < storage_info_.num_non_empty_levels_; level++) {
1539 for (FileMetaData* meta : storage_info_.LevelFiles(level)) {
1540 assert(meta->fd.table_reader != nullptr);
1541 uint64_t file_creation_time = meta->TryGetFileCreationTime();
1542 if (file_creation_time == kUnknownFileCreationTime) {
1543 *creation_time = 0;
1544 return;
1545 }
1546 if (file_creation_time < oldest_time) {
1547 oldest_time = file_creation_time;
1548 }
1549 }
1550 }
1551 *creation_time = oldest_time;
1552 }
1553
1554 uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const {
1555   // Estimation will be inaccurate when:
1556   // (1) there exist merge keys
1557   // (2) keys are directly overwritten
1558   // (3) there are deletions of non-existing keys
1559   // (4) the number of samples is low
1560 if (current_num_samples_ == 0) {
1561 return 0;
1562 }
1563
1564 if (current_num_non_deletions_ <= current_num_deletions_) {
1565 return 0;
1566 }
1567
1568 uint64_t est = current_num_non_deletions_ - current_num_deletions_;
1569
1570 uint64_t file_count = 0;
1571 for (int level = 0; level < num_levels_; ++level) {
1572 file_count += files_[level].size();
1573 }
1574
1575 if (current_num_samples_ < file_count) {
1576 // casting to avoid overflowing
1577 return
1578 static_cast<uint64_t>(
1579 (est * static_cast<double>(file_count) / current_num_samples_)
1580 );
1581 } else {
1582 return est;
1583 }
1584 }
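// Illustrative note (not from the original source): a hypothetical example of
// the scaling above. With est = 1000 (non-deletions minus deletions),
// file_count = 10, and current_num_samples_ = 4, fewer files were sampled than
// exist, so the estimate is extrapolated to 1000 * 10 / 4 = 2500 active keys,
// assuming the sampled files are representative of the rest.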
1585
1586 double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel(
1587 int level) const {
1588 assert(level < num_levels_);
1589 uint64_t sum_file_size_bytes = 0;
1590 uint64_t sum_data_size_bytes = 0;
1591 for (auto* file_meta : files_[level]) {
1592 sum_file_size_bytes += file_meta->fd.GetFileSize();
1593 sum_data_size_bytes += file_meta->raw_key_size + file_meta->raw_value_size;
1594 }
1595 if (sum_file_size_bytes == 0) {
1596 return -1.0;
1597 }
1598 return static_cast<double>(sum_data_size_bytes) / sum_file_size_bytes;
1599 }
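// Illustrative note (hypothetical numbers): if the files on a level total
// 40 MB on disk while their raw key plus value payload sums to 100 MB, the
// function above returns an estimated compression ratio of 100 / 40 = 2.5.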
1600
1601 void Version::AddIterators(const ReadOptions& read_options,
1602 const FileOptions& soptions,
1603 MergeIteratorBuilder* merge_iter_builder,
1604 RangeDelAggregator* range_del_agg,
1605 bool allow_unprepared_value) {
1606 assert(storage_info_.finalized_);
1607
1608 for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
1609 AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level,
1610 range_del_agg, allow_unprepared_value);
1611 }
1612 }
1613
1614 void Version::AddIteratorsForLevel(const ReadOptions& read_options,
1615 const FileOptions& soptions,
1616 MergeIteratorBuilder* merge_iter_builder,
1617 int level,
1618 RangeDelAggregator* range_del_agg,
1619 bool allow_unprepared_value) {
1620 assert(storage_info_.finalized_);
1621 if (level >= storage_info_.num_non_empty_levels()) {
1622 // This is an empty level
1623 return;
1624 } else if (storage_info_.LevelFilesBrief(level).num_files == 0) {
1625 // No files in this level
1626 return;
1627 }
1628
1629 bool should_sample = should_sample_file_read();
1630
1631 auto* arena = merge_iter_builder->GetArena();
1632 if (level == 0) {
1633 // Merge all level zero files together since they may overlap
1634 for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
1635 const auto& file = storage_info_.LevelFilesBrief(0).files[i];
1636 merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
1637 read_options, soptions, cfd_->internal_comparator(),
1638 *file.file_metadata, range_del_agg,
1639 mutable_cf_options_.prefix_extractor.get(), nullptr,
1640 cfd_->internal_stats()->GetFileReadHist(0),
1641 TableReaderCaller::kUserIterator, arena,
1642 /*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_,
1643 /*smallest_compaction_key=*/nullptr,
1644 /*largest_compaction_key=*/nullptr, allow_unprepared_value));
1645 }
1646 if (should_sample) {
1647       // Count one sample for each L0 file, done per iterator creation rather
1648       // than per Seek(), while files in other levels are recorded per seek.
1649 // If users execute one range query per iterator, there may be some
1650 // discrepancy here.
1651 for (FileMetaData* meta : storage_info_.LevelFiles(0)) {
1652 sample_file_read_inc(meta);
1653 }
1654 }
1655 } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
1656 // For levels > 0, we can use a concatenating iterator that sequentially
1657 // walks through the non-overlapping files in the level, opening them
1658 // lazily.
1659 auto* mem = arena->AllocateAligned(sizeof(LevelIterator));
1660 merge_iter_builder->AddIterator(new (mem) LevelIterator(
1661 cfd_->table_cache(), read_options, soptions,
1662 cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
1663 mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
1664 cfd_->internal_stats()->GetFileReadHist(level),
1665 TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
1666 range_del_agg,
1667 /*compaction_boundaries=*/nullptr, allow_unprepared_value));
1668 }
1669 }
1670
1671 Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
1672 const FileOptions& file_options,
1673 const Slice& smallest_user_key,
1674 const Slice& largest_user_key,
1675 int level, bool* overlap) {
1676 assert(storage_info_.finalized_);
1677
1678 auto icmp = cfd_->internal_comparator();
1679 auto ucmp = icmp.user_comparator();
1680
1681 Arena arena;
1682 Status status;
1683 ReadRangeDelAggregator range_del_agg(&icmp,
1684 kMaxSequenceNumber /* upper_bound */);
1685
1686 *overlap = false;
1687
1688 if (level == 0) {
1689 for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
1690 const auto file = &storage_info_.LevelFilesBrief(0).files[i];
1691 if (AfterFile(ucmp, &smallest_user_key, file) ||
1692 BeforeFile(ucmp, &largest_user_key, file)) {
1693 continue;
1694 }
1695 ScopedArenaIterator iter(cfd_->table_cache()->NewIterator(
1696 read_options, file_options, cfd_->internal_comparator(),
1697 *file->file_metadata, &range_del_agg,
1698 mutable_cf_options_.prefix_extractor.get(), nullptr,
1699 cfd_->internal_stats()->GetFileReadHist(0),
1700 TableReaderCaller::kUserIterator, &arena,
1701 /*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_,
1702 /*smallest_compaction_key=*/nullptr,
1703 /*largest_compaction_key=*/nullptr,
1704 /*allow_unprepared_value=*/false));
1705 status = OverlapWithIterator(
1706 ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
1707 if (!status.ok() || *overlap) {
1708 break;
1709 }
1710 }
1711 } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
1712 auto mem = arena.AllocateAligned(sizeof(LevelIterator));
1713 ScopedArenaIterator iter(new (mem) LevelIterator(
1714 cfd_->table_cache(), read_options, file_options,
1715 cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
1716 mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
1717 cfd_->internal_stats()->GetFileReadHist(level),
1718 TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
1719 &range_del_agg));
1720 status = OverlapWithIterator(
1721 ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
1722 }
1723
1724 if (status.ok() && *overlap == false &&
1725 range_del_agg.IsRangeOverlapped(smallest_user_key, largest_user_key)) {
1726 *overlap = true;
1727 }
1728 return status;
1729 }
1730
1731 VersionStorageInfo::VersionStorageInfo(
1732 const InternalKeyComparator* internal_comparator,
1733 const Comparator* user_comparator, int levels,
1734 CompactionStyle compaction_style, VersionStorageInfo* ref_vstorage,
1735 bool _force_consistency_checks)
1736 : internal_comparator_(internal_comparator),
1737 user_comparator_(user_comparator),
1738 // cfd is nullptr if Version is dummy
1739 num_levels_(levels),
1740 num_non_empty_levels_(0),
1741 file_indexer_(user_comparator),
1742 compaction_style_(compaction_style),
1743 files_(new std::vector<FileMetaData*>[num_levels_]),
1744 base_level_(num_levels_ == 1 ? -1 : 1),
1745 level_multiplier_(0.0),
1746 files_by_compaction_pri_(num_levels_),
1747 level0_non_overlapping_(false),
1748 next_file_to_compact_by_size_(num_levels_),
1749 compaction_score_(num_levels_),
1750 compaction_level_(num_levels_),
1751 l0_delay_trigger_count_(0),
1752 accumulated_file_size_(0),
1753 accumulated_raw_key_size_(0),
1754 accumulated_raw_value_size_(0),
1755 accumulated_num_non_deletions_(0),
1756 accumulated_num_deletions_(0),
1757 current_num_non_deletions_(0),
1758 current_num_deletions_(0),
1759 current_num_samples_(0),
1760 estimated_compaction_needed_bytes_(0),
1761 finalized_(false),
1762 force_consistency_checks_(_force_consistency_checks) {
1763 if (ref_vstorage != nullptr) {
1764 accumulated_file_size_ = ref_vstorage->accumulated_file_size_;
1765 accumulated_raw_key_size_ = ref_vstorage->accumulated_raw_key_size_;
1766 accumulated_raw_value_size_ = ref_vstorage->accumulated_raw_value_size_;
1767 accumulated_num_non_deletions_ =
1768 ref_vstorage->accumulated_num_non_deletions_;
1769 accumulated_num_deletions_ = ref_vstorage->accumulated_num_deletions_;
1770 current_num_non_deletions_ = ref_vstorage->current_num_non_deletions_;
1771 current_num_deletions_ = ref_vstorage->current_num_deletions_;
1772 current_num_samples_ = ref_vstorage->current_num_samples_;
1773 oldest_snapshot_seqnum_ = ref_vstorage->oldest_snapshot_seqnum_;
1774 }
1775 }
1776
1777 Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
1778 const FileOptions& file_opt,
1779 const MutableCFOptions mutable_cf_options,
1780 const std::shared_ptr<IOTracer>& io_tracer,
1781 uint64_t version_number)
1782 : env_(vset->env_),
1783 clock_(vset->clock_),
1784 cfd_(column_family_data),
1785 info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->logger),
1786 db_statistics_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->stats),
1787 table_cache_((cfd_ == nullptr) ? nullptr : cfd_->table_cache()),
1788 blob_file_cache_(cfd_ ? cfd_->blob_file_cache() : nullptr),
1789 merge_operator_(
1790 (cfd_ == nullptr) ? nullptr : cfd_->ioptions()->merge_operator.get()),
1791 storage_info_(
1792 (cfd_ == nullptr) ? nullptr : &cfd_->internal_comparator(),
1793 (cfd_ == nullptr) ? nullptr : cfd_->user_comparator(),
1794 cfd_ == nullptr ? 0 : cfd_->NumberLevels(),
1795 cfd_ == nullptr ? kCompactionStyleLevel
1796 : cfd_->ioptions()->compaction_style,
1797 (cfd_ == nullptr || cfd_->current() == nullptr)
1798 ? nullptr
1799 : cfd_->current()->storage_info(),
1800 cfd_ == nullptr ? false : cfd_->ioptions()->force_consistency_checks),
1801 vset_(vset),
1802 next_(this),
1803 prev_(this),
1804 refs_(0),
1805 file_options_(file_opt),
1806 mutable_cf_options_(mutable_cf_options),
1807 max_file_size_for_l0_meta_pin_(
1808 MaxFileSizeForL0MetaPin(mutable_cf_options_)),
1809 version_number_(version_number),
1810 io_tracer_(io_tracer) {}
1811
1812 Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
1813 const Slice& blob_index_slice, PinnableSlice* value,
1814 uint64_t* bytes_read) const {
1815 if (read_options.read_tier == kBlockCacheTier) {
1816 return Status::Incomplete("Cannot read blob: no disk I/O allowed");
1817 }
1818
1819 BlobIndex blob_index;
1820
1821 {
1822 Status s = blob_index.DecodeFrom(blob_index_slice);
1823 if (!s.ok()) {
1824 return s;
1825 }
1826 }
1827
1828 return GetBlob(read_options, user_key, blob_index, value, bytes_read);
1829 }
1830
1831 Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
1832 const BlobIndex& blob_index, PinnableSlice* value,
1833 uint64_t* bytes_read) const {
1834 assert(value);
1835
1836 if (blob_index.HasTTL() || blob_index.IsInlined()) {
1837 return Status::Corruption("Unexpected TTL/inlined blob index");
1838 }
1839
1840 const auto& blob_files = storage_info_.GetBlobFiles();
1841
1842 const uint64_t blob_file_number = blob_index.file_number();
1843
1844 const auto it = blob_files.find(blob_file_number);
1845 if (it == blob_files.end()) {
1846 return Status::Corruption("Invalid blob file number");
1847 }
1848
1849 CacheHandleGuard<BlobFileReader> blob_file_reader;
1850
1851 {
1852 assert(blob_file_cache_);
1853 const Status s = blob_file_cache_->GetBlobFileReader(blob_file_number,
1854 &blob_file_reader);
1855 if (!s.ok()) {
1856 return s;
1857 }
1858 }
1859
1860 assert(blob_file_reader.GetValue());
1861 const Status s = blob_file_reader.GetValue()->GetBlob(
1862 read_options, user_key, blob_index.offset(), blob_index.size(),
1863 blob_index.compression(), value, bytes_read);
1864
1865 return s;
1866 }
1867
1868 void Version::MultiGetBlob(
1869 const ReadOptions& read_options, MultiGetRange& range,
1870 std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs) {
1871 if (read_options.read_tier == kBlockCacheTier) {
1872 Status s = Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
1873 for (const auto& elem : blob_rqs) {
1874 for (const auto& blob_rq : elem.second) {
1875 const KeyContext& key_context = blob_rq.second;
1876 assert(key_context.s);
1877 assert(key_context.s->ok());
1878 *(key_context.s) = s;
1879 assert(key_context.get_context);
1880 auto& get_context = *(key_context.get_context);
1881 get_context.MarkKeyMayExist();
1882 }
1883 }
1884 return;
1885 }
1886
1887 assert(!blob_rqs.empty());
1888 Status status;
1889 const auto& blob_files = storage_info_.GetBlobFiles();
1890 for (auto& elem : blob_rqs) {
1891 uint64_t blob_file_number = elem.first;
1892 if (blob_files.find(blob_file_number) == blob_files.end()) {
1893 auto& blobs_in_file = elem.second;
1894 for (const auto& blob : blobs_in_file) {
1895 const KeyContext& key_context = blob.second;
1896 *(key_context.s) = Status::Corruption("Invalid blob file number");
1897 }
1898 continue;
1899 }
1900 CacheHandleGuard<BlobFileReader> blob_file_reader;
1901 assert(blob_file_cache_);
1902 status = blob_file_cache_->GetBlobFileReader(blob_file_number,
1903 &blob_file_reader);
1904 assert(!status.ok() || blob_file_reader.GetValue());
1905
1906 auto& blobs_in_file = elem.second;
1907 if (!status.ok()) {
1908 for (const auto& blob : blobs_in_file) {
1909 const KeyContext& key_context = blob.second;
1910 *(key_context.s) = status;
1911 }
1912 continue;
1913 }
1914
1915 assert(blob_file_reader.GetValue());
1916 const uint64_t file_size = blob_file_reader.GetValue()->GetFileSize();
1917 const CompressionType compression =
1918 blob_file_reader.GetValue()->GetCompressionType();
1919
1920 // sort blobs_in_file by file offset.
1921 std::sort(
1922 blobs_in_file.begin(), blobs_in_file.end(),
1923 [](const BlobReadRequest& lhs, const BlobReadRequest& rhs) -> bool {
1924 assert(lhs.first.file_number() == rhs.first.file_number());
1925 return lhs.first.offset() < rhs.first.offset();
1926 });
1927
1928 autovector<std::reference_wrapper<const KeyContext>> blob_read_key_contexts;
1929 autovector<std::reference_wrapper<const Slice>> user_keys;
1930 autovector<uint64_t> offsets;
1931 autovector<uint64_t> value_sizes;
1932 autovector<Status*> statuses;
1933 autovector<PinnableSlice*> values;
1934 for (const auto& blob : blobs_in_file) {
1935 const auto& blob_index = blob.first;
1936 const KeyContext& key_context = blob.second;
1937 if (blob_index.HasTTL() || blob_index.IsInlined()) {
1938 *(key_context.s) =
1939 Status::Corruption("Unexpected TTL/inlined blob index");
1940 continue;
1941 }
1942 const uint64_t key_size = key_context.ukey_with_ts.size();
1943 const uint64_t offset = blob_index.offset();
1944 const uint64_t value_size = blob_index.size();
1945 if (!IsValidBlobOffset(offset, key_size, value_size, file_size)) {
1946 *(key_context.s) = Status::Corruption("Invalid blob offset");
1947 continue;
1948 }
1949 if (blob_index.compression() != compression) {
1950 *(key_context.s) =
1951 Status::Corruption("Compression type mismatch when reading a blob");
1952 continue;
1953 }
1954 blob_read_key_contexts.emplace_back(std::cref(key_context));
1955 user_keys.emplace_back(std::cref(key_context.ukey_with_ts));
1956 offsets.push_back(blob_index.offset());
1957 value_sizes.push_back(blob_index.size());
1958 statuses.push_back(key_context.s);
1959 values.push_back(key_context.value);
1960 }
1961 blob_file_reader.GetValue()->MultiGetBlob(read_options, user_keys, offsets,
1962 value_sizes, statuses, values,
1963 /*bytes_read=*/nullptr);
1964 size_t num = blob_read_key_contexts.size();
1965 assert(num == user_keys.size());
1966 assert(num == offsets.size());
1967 assert(num == value_sizes.size());
1968 assert(num == statuses.size());
1969 assert(num == values.size());
1970 for (size_t i = 0; i < num; ++i) {
1971 if (statuses[i]->ok()) {
1972 range.AddValueSize(blob_read_key_contexts[i].get().value->size());
1973 if (range.GetValueSize() > read_options.value_size_soft_limit) {
1974 *(blob_read_key_contexts[i].get().s) = Status::Aborted();
1975 }
1976 }
1977 }
1978 }
1979 }
1980
1981 void Version::Get(const ReadOptions& read_options, const LookupKey& k,
1982 PinnableSlice* value, std::string* timestamp, Status* status,
1983 MergeContext* merge_context,
1984 SequenceNumber* max_covering_tombstone_seq, bool* value_found,
1985 bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
1986 bool* is_blob, bool do_merge) {
1987 Slice ikey = k.internal_key();
1988 Slice user_key = k.user_key();
1989
1990 assert(status->ok() || status->IsMergeInProgress());
1991
1992 if (key_exists != nullptr) {
1993 // will falsify below if not found
1994 *key_exists = true;
1995 }
1996
1997 PinnedIteratorsManager pinned_iters_mgr;
1998 uint64_t tracing_get_id = BlockCacheTraceHelper::kReservedGetId;
1999 if (vset_ && vset_->block_cache_tracer_ &&
2000 vset_->block_cache_tracer_->is_tracing_enabled()) {
2001 tracing_get_id = vset_->block_cache_tracer_->NextGetId();
2002 }
2003
2004 // Note: the old StackableDB-based BlobDB passes in
2005 // GetImplOptions::is_blob_index; for the integrated BlobDB implementation, we
2006 // need to provide it here.
2007 bool is_blob_index = false;
2008 bool* const is_blob_to_use = is_blob ? is_blob : &is_blob_index;
2009 BlobFetcher blob_fetcher(this, read_options);
2010
2011 GetContext get_context(
2012 user_comparator(), merge_operator_, info_log_, db_statistics_,
2013 status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
2014 do_merge ? value : nullptr, do_merge ? timestamp : nullptr, value_found,
2015 merge_context, do_merge, max_covering_tombstone_seq, clock_, seq,
2016 merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob_to_use,
2017 tracing_get_id, &blob_fetcher);
2018
2019 // Pin blocks that we read to hold merge operands
2020 if (merge_operator_) {
2021 pinned_iters_mgr.StartPinning();
2022 }
2023
2024 FilePicker fp(
2025 storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
2026 storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
2027 user_comparator(), internal_comparator());
2028 FdWithKeyRange* f = fp.GetNextFile();
2029
2030 while (f != nullptr) {
2031 if (*max_covering_tombstone_seq > 0) {
2032 // The remaining files we look at will only contain covered keys, so we
2033 // stop here.
2034 break;
2035 }
2036 if (get_context.sample()) {
2037 sample_file_read_inc(f->file_metadata);
2038 }
2039
2040 bool timer_enabled =
2041 GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
2042 get_perf_context()->per_level_perf_context_enabled;
2043 StopWatchNano timer(clock_, timer_enabled /* auto_start */);
2044 *status = table_cache_->Get(
2045 read_options, *internal_comparator(), *f->file_metadata, ikey,
2046 &get_context, mutable_cf_options_.prefix_extractor.get(),
2047 cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
2048 IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
2049 fp.IsHitFileLastInLevel()),
2050 fp.GetHitFileLevel(), max_file_size_for_l0_meta_pin_);
2051 // TODO: examine the behavior for corrupted key
2052 if (timer_enabled) {
2053 PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
2054 fp.GetHitFileLevel());
2055 }
2056 if (!status->ok()) {
2057 if (db_statistics_ != nullptr) {
2058 get_context.ReportCounters();
2059 }
2060 return;
2061 }
2062
2063 // report the counters before returning
2064 if (get_context.State() != GetContext::kNotFound &&
2065 get_context.State() != GetContext::kMerge &&
2066 db_statistics_ != nullptr) {
2067 get_context.ReportCounters();
2068 }
2069 switch (get_context.State()) {
2070 case GetContext::kNotFound:
2071 // Keep searching in other files
2072 break;
2073 case GetContext::kMerge:
2074 // TODO: update per-level perfcontext user_key_return_count for kMerge
2075 break;
2076 case GetContext::kFound:
2077 if (fp.GetHitFileLevel() == 0) {
2078 RecordTick(db_statistics_, GET_HIT_L0);
2079 } else if (fp.GetHitFileLevel() == 1) {
2080 RecordTick(db_statistics_, GET_HIT_L1);
2081 } else if (fp.GetHitFileLevel() >= 2) {
2082 RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
2083 }
2084
2085 PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
2086 fp.GetHitFileLevel());
2087
2088 if (is_blob_index) {
2089 if (do_merge && value) {
2090 constexpr uint64_t* bytes_read = nullptr;
2091
2092 *status =
2093 GetBlob(read_options, user_key, *value, value, bytes_read);
2094 if (!status->ok()) {
2095 if (status->IsIncomplete()) {
2096 get_context.MarkKeyMayExist();
2097 }
2098 return;
2099 }
2100 }
2101 }
2102
2103 return;
2104 case GetContext::kDeleted:
2105 // Use empty error message for speed
2106 *status = Status::NotFound();
2107 return;
2108 case GetContext::kCorrupt:
2109 *status = Status::Corruption("corrupted key for ", user_key);
2110 return;
2111 case GetContext::kUnexpectedBlobIndex:
2112 ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
2113 *status = Status::NotSupported(
2114 "Encounter unexpected blob index. Please open DB with "
2115 "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
2116 return;
2117 }
2118 f = fp.GetNextFile();
2119 }
2120 if (db_statistics_ != nullptr) {
2121 get_context.ReportCounters();
2122 }
2123 if (GetContext::kMerge == get_context.State()) {
2124 if (!do_merge) {
2125 *status = Status::OK();
2126 return;
2127 }
2128 if (!merge_operator_) {
2129 *status = Status::InvalidArgument(
2130 "merge_operator is not properly initialized.");
2131 return;
2132 }
2133     // merge_operands are in saver and we hit the beginning of the key history;
2134     // do a final merge of nullptr and the operands.
2135 std::string* str_value = value != nullptr ? value->GetSelf() : nullptr;
2136 *status = MergeHelper::TimedFullMerge(
2137 merge_operator_, user_key, nullptr, merge_context->GetOperands(),
2138 str_value, info_log_, db_statistics_, clock_,
2139 nullptr /* result_operand */, true);
2140 if (LIKELY(value != nullptr)) {
2141 value->PinSelf();
2142 }
2143 } else {
2144 if (key_exists != nullptr) {
2145 *key_exists = false;
2146 }
2147 *status = Status::NotFound(); // Use an empty error message for speed
2148 }
2149 }
2150
2151 void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
2152 ReadCallback* callback) {
2153 PinnedIteratorsManager pinned_iters_mgr;
2154
2155 // Pin blocks that we read to hold merge operands
2156 if (merge_operator_) {
2157 pinned_iters_mgr.StartPinning();
2158 }
2159 uint64_t tracing_mget_id = BlockCacheTraceHelper::kReservedGetId;
2160
2161 if (vset_ && vset_->block_cache_tracer_ &&
2162 vset_->block_cache_tracer_->is_tracing_enabled()) {
2163 tracing_mget_id = vset_->block_cache_tracer_->NextGetId();
2164 }
2165 // Even though we know the batch size won't be > MAX_BATCH_SIZE,
2166 // use autovector in order to avoid unnecessary construction of GetContext
2167 // objects, which is expensive
2168 autovector<GetContext, 16> get_ctx;
2169 BlobFetcher blob_fetcher(this, read_options);
2170 for (auto iter = range->begin(); iter != range->end(); ++iter) {
2171 assert(iter->s->ok() || iter->s->IsMergeInProgress());
2172 get_ctx.emplace_back(
2173 user_comparator(), merge_operator_, info_log_, db_statistics_,
2174 iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge,
2175 iter->ukey_with_ts, iter->value, iter->timestamp, nullptr,
2176 &(iter->merge_context), true, &iter->max_covering_tombstone_seq, clock_,
2177 nullptr, merge_operator_ ? &pinned_iters_mgr : nullptr, callback,
2178 &iter->is_blob_index, tracing_mget_id, &blob_fetcher);
2179 // MergeInProgress status, if set, has been transferred to the get_context
2180 // state, so we set status to ok here. From now on, the iter status will
2181 // be used for IO errors, and get_context state will be used for any
2182 // key level errors
2183 *(iter->s) = Status::OK();
2184 }
2185 int get_ctx_index = 0;
2186 for (auto iter = range->begin(); iter != range->end();
2187 ++iter, get_ctx_index++) {
2188 iter->get_context = &(get_ctx[get_ctx_index]);
2189 }
2190
2191 MultiGetRange file_picker_range(*range, range->begin(), range->end());
2192 FilePickerMultiGet fp(
2193 &file_picker_range,
2194 &storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_,
2195 &storage_info_.file_indexer_, user_comparator(), internal_comparator());
2196 FdWithKeyRange* f = fp.GetNextFile();
2197 Status s;
2198 uint64_t num_index_read = 0;
2199 uint64_t num_filter_read = 0;
2200 uint64_t num_data_read = 0;
2201 uint64_t num_sst_read = 0;
2202
2203 MultiGetRange keys_with_blobs_range(*range, range->begin(), range->end());
2204 // blob_file => [[blob_idx, it], ...]
2205 std::unordered_map<uint64_t, BlobReadRequests> blob_rqs;
2206
2207 while (f != nullptr) {
2208 MultiGetRange file_range = fp.CurrentFileRange();
2209 bool timer_enabled =
2210 GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
2211 get_perf_context()->per_level_perf_context_enabled;
2212 StopWatchNano timer(clock_, timer_enabled /* auto_start */);
2213 s = table_cache_->MultiGet(
2214 read_options, *internal_comparator(), *f->file_metadata, &file_range,
2215 mutable_cf_options_.prefix_extractor.get(),
2216 cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
2217 IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
2218 fp.IsHitFileLastInLevel()),
2219 fp.GetHitFileLevel());
2220 // TODO: examine the behavior for corrupted key
2221 if (timer_enabled) {
2222 PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
2223 fp.GetHitFileLevel());
2224 }
2225 if (!s.ok()) {
2226 // TODO: Set status for individual keys appropriately
2227 for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
2228 *iter->s = s;
2229 file_range.MarkKeyDone(iter);
2230 }
2231 return;
2232 }
2233 uint64_t batch_size = 0;
2234 for (auto iter = file_range.begin(); s.ok() && iter != file_range.end();
2235 ++iter) {
2236 GetContext& get_context = *iter->get_context;
2237 Status* status = iter->s;
2238       // The Status in the KeyContext takes precedence over the GetContext
2239       // state. Status may be an error if there were any IO errors in the
2240       // table reader. We never expect Status to be NotFound(), as that is
2241       // determined by get_context.
2242 assert(!status->IsNotFound());
2243 if (!status->ok()) {
2244 file_range.MarkKeyDone(iter);
2245 continue;
2246 }
2247
2248 if (get_context.sample()) {
2249 sample_file_read_inc(f->file_metadata);
2250 }
2251 batch_size++;
2252 num_index_read += get_context.get_context_stats_.num_index_read;
2253 num_filter_read += get_context.get_context_stats_.num_filter_read;
2254 num_data_read += get_context.get_context_stats_.num_data_read;
2255 num_sst_read += get_context.get_context_stats_.num_sst_read;
2256
2257 // report the counters before returning
2258 if (get_context.State() != GetContext::kNotFound &&
2259 get_context.State() != GetContext::kMerge &&
2260 db_statistics_ != nullptr) {
2261 get_context.ReportCounters();
2262 } else {
2263 if (iter->max_covering_tombstone_seq > 0) {
2264 // The remaining files we look at will only contain covered keys, so
2265 // we stop here for this key
2266 file_picker_range.SkipKey(iter);
2267 }
2268 }
2269 switch (get_context.State()) {
2270 case GetContext::kNotFound:
2271 // Keep searching in other files
2272 break;
2273 case GetContext::kMerge:
2274 // TODO: update per-level perfcontext user_key_return_count for kMerge
2275 break;
2276 case GetContext::kFound:
2277 if (fp.GetHitFileLevel() == 0) {
2278 RecordTick(db_statistics_, GET_HIT_L0);
2279 } else if (fp.GetHitFileLevel() == 1) {
2280 RecordTick(db_statistics_, GET_HIT_L1);
2281 } else if (fp.GetHitFileLevel() >= 2) {
2282 RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
2283 }
2284
2285 PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
2286 fp.GetHitFileLevel());
2287
2288 file_range.MarkKeyDone(iter);
2289
2290 if (iter->is_blob_index) {
2291 if (iter->value) {
2292 const Slice& blob_index_slice = *(iter->value);
2293 BlobIndex blob_index;
2294 Status tmp_s = blob_index.DecodeFrom(blob_index_slice);
2295 if (tmp_s.ok()) {
2296 const uint64_t blob_file_num = blob_index.file_number();
2297 blob_rqs[blob_file_num].emplace_back(
2298 std::make_pair(blob_index, std::cref(*iter)));
2299 } else {
2300 *(iter->s) = tmp_s;
2301 }
2302 }
2303 } else {
2304 file_range.AddValueSize(iter->value->size());
2305 if (file_range.GetValueSize() >
2306 read_options.value_size_soft_limit) {
2307 s = Status::Aborted();
2308 break;
2309 }
2310 }
2311 continue;
2312 case GetContext::kDeleted:
2313 // Use empty error message for speed
2314 *status = Status::NotFound();
2315 file_range.MarkKeyDone(iter);
2316 continue;
2317 case GetContext::kCorrupt:
2318 *status =
2319 Status::Corruption("corrupted key for ", iter->lkey->user_key());
2320 file_range.MarkKeyDone(iter);
2321 continue;
2322 case GetContext::kUnexpectedBlobIndex:
2323 ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
2324 *status = Status::NotSupported(
2325 "Encounter unexpected blob index. Please open DB with "
2326 "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
2327 file_range.MarkKeyDone(iter);
2328 continue;
2329 }
2330 }
2331
2332 // Report MultiGet stats per level.
2333 if (fp.IsHitFileLastInLevel()) {
2334 // Dump the stats if this is the last file of this level and reset for
2335 // next level.
2336 RecordInHistogram(db_statistics_,
2337 NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL,
2338 num_index_read + num_filter_read);
2339 RecordInHistogram(db_statistics_, NUM_DATA_BLOCKS_READ_PER_LEVEL,
2340 num_data_read);
2341 RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL, num_sst_read);
2342 num_filter_read = 0;
2343 num_index_read = 0;
2344 num_data_read = 0;
2345 num_sst_read = 0;
2346 }
2347
2348 RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
2349 if (!s.ok() || file_picker_range.empty()) {
2350 break;
2351 }
2352 f = fp.GetNextFile();
2353 }
2354
2355 if (s.ok() && !blob_rqs.empty()) {
2356 MultiGetBlob(read_options, keys_with_blobs_range, blob_rqs);
2357 }
2358
2359 // Process any left over keys
2360 for (auto iter = range->begin(); s.ok() && iter != range->end(); ++iter) {
2361 GetContext& get_context = *iter->get_context;
2362 Status* status = iter->s;
2363 Slice user_key = iter->lkey->user_key();
2364
2365 if (db_statistics_ != nullptr) {
2366 get_context.ReportCounters();
2367 }
2368 if (GetContext::kMerge == get_context.State()) {
2369 if (!merge_operator_) {
2370 *status = Status::InvalidArgument(
2371 "merge_operator is not properly initialized.");
2372 range->MarkKeyDone(iter);
2373 continue;
2374 }
2375       // merge_operands are in saver and we hit the beginning of the key history;
2376       // do a final merge of nullptr and the operands.
2377 std::string* str_value =
2378 iter->value != nullptr ? iter->value->GetSelf() : nullptr;
2379 *status = MergeHelper::TimedFullMerge(
2380 merge_operator_, user_key, nullptr, iter->merge_context.GetOperands(),
2381 str_value, info_log_, db_statistics_, clock_,
2382 nullptr /* result_operand */, true);
2383 if (LIKELY(iter->value != nullptr)) {
2384 iter->value->PinSelf();
2385 range->AddValueSize(iter->value->size());
2386 range->MarkKeyDone(iter);
2387 if (range->GetValueSize() > read_options.value_size_soft_limit) {
2388 s = Status::Aborted();
2389 break;
2390 }
2391 }
2392 } else {
2393 range->MarkKeyDone(iter);
2394 *status = Status::NotFound(); // Use an empty error message for speed
2395 }
2396 }
2397
2398 for (auto iter = range->begin(); iter != range->end(); ++iter) {
2399 range->MarkKeyDone(iter);
2400 *(iter->s) = s;
2401 }
2402 }
2403
2404 bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
2405 // Reaching the bottom level implies misses at all upper levels, so we'll
2406 // skip checking the filters when we predict a hit.
2407 return cfd_->ioptions()->optimize_filters_for_hits &&
2408 (level > 0 || is_file_last_in_level) &&
2409 level == storage_info_.num_non_empty_levels() - 1;
2410 }
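// Illustrative note (hypothetical setup): with optimize_filters_for_hits
// enabled and five non-empty levels, the predicate above skips filter checks
// only at level 4, the last non-empty level; if only L0 held data, filters
// would be skipped solely for the last file in L0, per the
// (level > 0 || is_file_last_in_level) condition.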
2411
2412 void VersionStorageInfo::GenerateLevelFilesBrief() {
2413 level_files_brief_.resize(num_non_empty_levels_);
2414 for (int level = 0; level < num_non_empty_levels_; level++) {
2415 DoGenerateLevelFilesBrief(
2416 &level_files_brief_[level], files_[level], &arena_);
2417 }
2418 }
2419
2420 void Version::PrepareApply(
2421 const MutableCFOptions& mutable_cf_options,
2422 bool update_stats) {
2423 TEST_SYNC_POINT_CALLBACK(
2424 "Version::PrepareApply:forced_check",
2425 reinterpret_cast<void*>(&storage_info_.force_consistency_checks_));
2426 UpdateAccumulatedStats(update_stats);
2427 storage_info_.UpdateNumNonEmptyLevels();
2428 storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
2429 storage_info_.UpdateFilesByCompactionPri(cfd_->ioptions()->compaction_pri);
2430 storage_info_.GenerateFileIndexer();
2431 storage_info_.GenerateLevelFilesBrief();
2432 storage_info_.GenerateLevel0NonOverlapping();
2433 storage_info_.GenerateBottommostFiles();
2434 }
2435
2436 bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
2437 if (file_meta->init_stats_from_file ||
2438 file_meta->compensated_file_size > 0) {
2439 return false;
2440 }
2441 std::shared_ptr<const TableProperties> tp;
2442 Status s = GetTableProperties(&tp, file_meta);
2443 file_meta->init_stats_from_file = true;
2444 if (!s.ok()) {
2445 ROCKS_LOG_ERROR(vset_->db_options_->info_log,
2446 "Unable to load table properties for file %" PRIu64
2447 " --- %s\n",
2448 file_meta->fd.GetNumber(), s.ToString().c_str());
2449 return false;
2450 }
2451 if (tp.get() == nullptr) return false;
2452 file_meta->num_entries = tp->num_entries;
2453 file_meta->num_deletions = tp->num_deletions;
2454 file_meta->raw_value_size = tp->raw_value_size;
2455 file_meta->raw_key_size = tp->raw_key_size;
2456
2457 return true;
2458 }
2459
2460 void VersionStorageInfo::UpdateAccumulatedStats(FileMetaData* file_meta) {
2461 TEST_SYNC_POINT_CALLBACK("VersionStorageInfo::UpdateAccumulatedStats",
2462 nullptr);
2463
2464 assert(file_meta->init_stats_from_file);
2465 accumulated_file_size_ += file_meta->fd.GetFileSize();
2466 accumulated_raw_key_size_ += file_meta->raw_key_size;
2467 accumulated_raw_value_size_ += file_meta->raw_value_size;
2468 accumulated_num_non_deletions_ +=
2469 file_meta->num_entries - file_meta->num_deletions;
2470 accumulated_num_deletions_ += file_meta->num_deletions;
2471
2472 current_num_non_deletions_ +=
2473 file_meta->num_entries - file_meta->num_deletions;
2474 current_num_deletions_ += file_meta->num_deletions;
2475 current_num_samples_++;
2476 }
2477
2478 void VersionStorageInfo::RemoveCurrentStats(FileMetaData* file_meta) {
2479 if (file_meta->init_stats_from_file) {
2480 current_num_non_deletions_ -=
2481 file_meta->num_entries - file_meta->num_deletions;
2482 current_num_deletions_ -= file_meta->num_deletions;
2483 current_num_samples_--;
2484 }
2485 }
2486
2487 void Version::UpdateAccumulatedStats(bool update_stats) {
2488 if (update_stats) {
2489 // maximum number of table properties loaded from files.
2490 const int kMaxInitCount = 20;
2491 int init_count = 0;
2492     // Here, only the first kMaxInitCount files that haven't been
2493     // initialized from file will be updated with num_deletions.
2494     // The motivation is to cap the maximum I/O per Version creation.
2495     // The reason for choosing files from lower levels instead of higher
2496     // levels is that this design propagates the initialization from
2497     // lower levels to higher levels: when the num_deletions of lower-level
2498     // files are updated, those files get accurate compensated_file_size
2499     // values, so lower-level to higher-level compactions are triggered,
2500     // which in turn create higher-level files whose num_deletions will be
2501     // updated here.
2502 for (int level = 0;
2503 level < storage_info_.num_levels_ && init_count < kMaxInitCount;
2504 ++level) {
2505 for (auto* file_meta : storage_info_.files_[level]) {
2506 if (MaybeInitializeFileMetaData(file_meta)) {
2507 // each FileMeta will be initialized only once.
2508 storage_info_.UpdateAccumulatedStats(file_meta);
2509 // when option "max_open_files" is -1, all the file metadata has
2510 // already been read, so MaybeInitializeFileMetaData() won't incur
2511 // any I/O cost. "max_open_files=-1" means that the table cache passed
2512 // to the VersionSet and then to the ColumnFamilySet has a size of
2513 // TableCache::kInfiniteCapacity
2514 if (vset_->GetColumnFamilySet()->get_table_cache()->GetCapacity() ==
2515 TableCache::kInfiniteCapacity) {
2516 continue;
2517 }
2518 if (++init_count >= kMaxInitCount) {
2519 break;
2520 }
2521 }
2522 }
2523 }
2524 // In case all sampled-files contain only deletion entries, then we
2525 // load the table-property of a file in higher-level to initialize
2526 // that value.
2527 for (int level = storage_info_.num_levels_ - 1;
2528 storage_info_.accumulated_raw_value_size_ == 0 && level >= 0;
2529 --level) {
2530 for (int i = static_cast<int>(storage_info_.files_[level].size()) - 1;
2531 storage_info_.accumulated_raw_value_size_ == 0 && i >= 0; --i) {
2532 if (MaybeInitializeFileMetaData(storage_info_.files_[level][i])) {
2533 storage_info_.UpdateAccumulatedStats(storage_info_.files_[level][i]);
2534 }
2535 }
2536 }
2537 }
2538
2539 storage_info_.ComputeCompensatedSizes();
2540 }
2541
2542 void VersionStorageInfo::ComputeCompensatedSizes() {
2543 static const int kDeletionWeightOnCompaction = 2;
2544 uint64_t average_value_size = GetAverageValueSize();
2545
2546 // compute the compensated size
2547 for (int level = 0; level < num_levels_; level++) {
2548 for (auto* file_meta : files_[level]) {
2549       // Here we only compute compensated_file_size for those file_meta
2550       // whose compensated_file_size is uninitialized (== 0). This is true only
2551       // for files that have just been created and that no other thread has
2552       // access to yet. That's why we can safely mutate compensated_file_size.
2553 if (file_meta->compensated_file_size == 0) {
2554 file_meta->compensated_file_size = file_meta->fd.GetFileSize();
2555         // Here we boost the size of deletion entries of a file only
2556         // when the number of deletion entries is greater than the number of
2557         // non-deletion entries in the file. The motivation is that in
2558         // a stable workload, the number of deletion entries should be roughly
2559         // equal to the number of non-deletion entries. If we compensated the
2560         // size of deletion entries in a stable workload, the deletion
2561         // compensation logic might introduce unwanted effects that change the
2562         // shape of the LSM tree.
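        // Illustrative note (hypothetical numbers): for a file with
        // num_entries = 100, num_deletions = 70, and an average value size of
        // 1024 bytes, the branch below adds (70 * 2 - 100) * 1024 *
        // kDeletionWeightOnCompaction (2) = 81920 bytes to
        // compensated_file_size, making deletion-heavy files look larger to
        // the compaction picker.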
2563 if (file_meta->num_deletions * 2 >= file_meta->num_entries) {
2564 file_meta->compensated_file_size +=
2565 (file_meta->num_deletions * 2 - file_meta->num_entries) *
2566 average_value_size * kDeletionWeightOnCompaction;
2567 }
2568 }
2569 }
2570 }
2571 }
2572
2573 int VersionStorageInfo::MaxInputLevel() const {
2574 if (compaction_style_ == kCompactionStyleLevel) {
2575 return num_levels() - 2;
2576 }
2577 return 0;
2578 }
2579
2580 int VersionStorageInfo::MaxOutputLevel(bool allow_ingest_behind) const {
2581 if (allow_ingest_behind) {
2582 assert(num_levels() > 1);
2583 return num_levels() - 2;
2584 }
2585 return num_levels() - 1;
2586 }
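// Illustrative note: with num_levels() == 7 and level-based compaction,
// MaxInputLevel() above returns 5 (the last level can only receive output),
// while MaxOutputLevel() returns 6, or 5 when allow_ingest_behind reserves the
// last level for ingested files.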
2587
2588 void VersionStorageInfo::EstimateCompactionBytesNeeded(
2589 const MutableCFOptions& mutable_cf_options) {
2590 // Only implemented for level-based compaction
2591 if (compaction_style_ != kCompactionStyleLevel) {
2592 estimated_compaction_needed_bytes_ = 0;
2593 return;
2594 }
2595
2596   // Start from level 0: if level 0 qualifies for compaction to level 1,
2597   // we estimate the size of that compaction.
2598   // Then we move on to the next level and check whether it qualifies for
2599   // compaction to the level after it. The size of a level is estimated as the
2600   // actual size of the level plus the input bytes from the previous level, if
2601   // any. If that exceeds the level's target, take the excess bytes as
2602   // compaction input and add the estimated compaction size to the total.
2603   // We keep doing this for level 2, 3, etc., until the last level, and return
2604   // the accumulated bytes.
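  // Illustrative note (hypothetical numbers): if L0 holds 300 MB and has hit
  // its compaction trigger, 300 MB is counted and carried into the base level.
  // If the base level holds 400 MB with a 256 MB target, another 400 MB is
  // counted, the combined 700 MB exceeds the target by 444 MB, and that excess
  // is carried to the next level, scaled by the size ratio between the two
  // levels, until the last level is reached.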
2605
2606 uint64_t bytes_compact_to_next_level = 0;
2607 uint64_t level_size = 0;
2608 for (auto* f : files_[0]) {
2609 level_size += f->fd.GetFileSize();
2610 }
2611 // Level 0
2612 bool level0_compact_triggered = false;
2613 if (static_cast<int>(files_[0].size()) >=
2614 mutable_cf_options.level0_file_num_compaction_trigger ||
2615 level_size >= mutable_cf_options.max_bytes_for_level_base) {
2616 level0_compact_triggered = true;
2617 estimated_compaction_needed_bytes_ = level_size;
2618 bytes_compact_to_next_level = level_size;
2619 } else {
2620 estimated_compaction_needed_bytes_ = 0;
2621 }
2622
2623 // Level 1 and up.
2624 uint64_t bytes_next_level = 0;
2625 for (int level = base_level(); level <= MaxInputLevel(); level++) {
2626 level_size = 0;
2627 if (bytes_next_level > 0) {
2628 #ifndef NDEBUG
2629 uint64_t level_size2 = 0;
2630 for (auto* f : files_[level]) {
2631 level_size2 += f->fd.GetFileSize();
2632 }
2633 assert(level_size2 == bytes_next_level);
2634 #endif
2635 level_size = bytes_next_level;
2636 bytes_next_level = 0;
2637 } else {
2638 for (auto* f : files_[level]) {
2639 level_size += f->fd.GetFileSize();
2640 }
2641 }
2642 if (level == base_level() && level0_compact_triggered) {
2643 // Add base level size to compaction if level0 compaction triggered.
2644 estimated_compaction_needed_bytes_ += level_size;
2645 }
2646 // Add size added by previous compaction
2647 level_size += bytes_compact_to_next_level;
2648 bytes_compact_to_next_level = 0;
2649 uint64_t level_target = MaxBytesForLevel(level);
2650 if (level_size > level_target) {
2651 bytes_compact_to_next_level = level_size - level_target;
2652 // Estimate the actual compaction fan-out ratio as size ratio between
2653 // the two levels.
2654
2655 assert(bytes_next_level == 0);
2656 if (level + 1 < num_levels_) {
2657 for (auto* f : files_[level + 1]) {
2658 bytes_next_level += f->fd.GetFileSize();
2659 }
2660 }
2661 if (bytes_next_level > 0) {
2662 assert(level_size > 0);
2663 estimated_compaction_needed_bytes_ += static_cast<uint64_t>(
2664 static_cast<double>(bytes_compact_to_next_level) *
2665 (static_cast<double>(bytes_next_level) /
2666 static_cast<double>(level_size) +
2667 1));
2668 }
2669 }
2670 }
2671 }
2672
2673 namespace {
2674 uint32_t GetExpiredTtlFilesCount(const ImmutableOptions& ioptions,
2675 const MutableCFOptions& mutable_cf_options,
2676 const std::vector<FileMetaData*>& files) {
2677 uint32_t ttl_expired_files_count = 0;
2678
2679 int64_t _current_time;
2680 auto status = ioptions.clock->GetCurrentTime(&_current_time);
2681 if (status.ok()) {
2682 const uint64_t current_time = static_cast<uint64_t>(_current_time);
2683 for (FileMetaData* f : files) {
2684 if (!f->being_compacted) {
2685 uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
2686 if (oldest_ancester_time != 0 &&
2687 oldest_ancester_time < (current_time - mutable_cf_options.ttl)) {
2688 ttl_expired_files_count++;
2689 }
2690 }
2691 }
2692 }
2693 return ttl_expired_files_count;
2694 }
2695 } // anonymous namespace
2696
2697 void VersionStorageInfo::ComputeCompactionScore(
2698 const ImmutableOptions& immutable_options,
2699 const MutableCFOptions& mutable_cf_options) {
2700 for (int level = 0; level <= MaxInputLevel(); level++) {
2701 double score;
2702 if (level == 0) {
2703 // We treat level-0 specially by bounding the number of files
2704 // instead of number of bytes for two reasons:
2705 //
2706 // (1) With larger write-buffer sizes, it is nice not to do too
2707 // many level-0 compactions.
2708 //
2709 // (2) The files in level-0 are merged on every read and
2710 // therefore we wish to avoid too many files when the individual
2711 // file size is small (perhaps because of a small write-buffer
2712 // setting, or very high compression ratios, or lots of
2713 // overwrites/deletions).
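      // Illustrative note (hypothetical numbers): for level-style compaction,
      // the code below scores L0 as num_sorted_runs divided by
      // level0_file_num_compaction_trigger, e.g. 8 eligible L0 files with a
      // trigger of 4 yield a score of 2.0; total L0 size is also compared
      // against the base level's target so an oversized L0 still scores high.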
2714 int num_sorted_runs = 0;
2715 uint64_t total_size = 0;
2716 for (auto* f : files_[level]) {
2717 if (!f->being_compacted) {
2718 total_size += f->compensated_file_size;
2719 num_sorted_runs++;
2720 }
2721 }
2722 if (compaction_style_ == kCompactionStyleUniversal) {
2723 // For universal compaction, we use level0 score to indicate
2724 // compaction score for the whole DB. Adding other levels as if
2725 // they are L0 files.
2726 for (int i = 1; i < num_levels(); i++) {
2727           // It's possible that a subset of the files in a level may be in a
2728           // compaction, due to delete-triggered compaction or a trivial move.
2729           // In that case, the check below may not catch a level being
2730           // compacted, as it only checks the first file. The worst that can
2731           // happen is that a scheduled compaction thread finds nothing to do.
2732 if (!files_[i].empty() && !files_[i][0]->being_compacted) {
2733 num_sorted_runs++;
2734 }
2735 }
2736 }
2737
2738 if (compaction_style_ == kCompactionStyleFIFO) {
2739 score = static_cast<double>(total_size) /
2740 mutable_cf_options.compaction_options_fifo.max_table_files_size;
2741 if (mutable_cf_options.compaction_options_fifo.allow_compaction ||
2742 mutable_cf_options.compaction_options_fifo.age_for_warm > 0) {
2743           // Warm tier moves can happen at any time. It's too expensive to
2744           // check every file's timestamp now. For now, just trigger it
2745           // slightly more frequently than FIFO compaction so that this
2746           // happens first.
2747 score = std::max(
2748 static_cast<double>(num_sorted_runs) /
2749 mutable_cf_options.level0_file_num_compaction_trigger,
2750 score);
2751 }
2752 if (mutable_cf_options.ttl > 0) {
2753 score = std::max(
2754 static_cast<double>(GetExpiredTtlFilesCount(
2755 immutable_options, mutable_cf_options, files_[level])),
2756 score);
2757 }
2758 } else {
2759 score = static_cast<double>(num_sorted_runs) /
2760 mutable_cf_options.level0_file_num_compaction_trigger;
2761 if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
2762 // Level-based involves L0->L0 compactions that can lead to oversized
2763 // L0 files. Take into account size as well to avoid later giant
2764 // compactions to the base level.
2765 uint64_t l0_target_size = mutable_cf_options.max_bytes_for_level_base;
2766 if (immutable_options.level_compaction_dynamic_level_bytes &&
2767 level_multiplier_ != 0.0) {
2768 // Prevent L0 to Lbase fanout from growing larger than
2769 // `level_multiplier_`. This prevents us from getting stuck picking
2770 // L0 forever even when it is hurting write-amp. That could happen
2771 // in dynamic level compaction's write-burst mode where the base
2772 // level's target size can grow to be enormous.
2773 l0_target_size =
2774 std::max(l0_target_size,
2775 static_cast<uint64_t>(level_max_bytes_[base_level_] /
2776 level_multiplier_));
2777 }
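        // Illustrative note (hypothetical numbers): if the base level's
        // dynamic target has grown to 100 GB with level_multiplier_ = 10, the
        // clamp above raises l0_target_size to at least 10 GB, capping the
        // L0 -> Lbase fanout at roughly the configured multiplier.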
2778 score =
2779 std::max(score, static_cast<double>(total_size) / l0_target_size);
2780 }
2781 }
2782 } else {
2783 // Compute the ratio of current size to size limit.
2784 uint64_t level_bytes_no_compacting = 0;
2785 for (auto f : files_[level]) {
2786 if (!f->being_compacted) {
2787 level_bytes_no_compacting += f->compensated_file_size;
2788 }
2789 }
2790 score = static_cast<double>(level_bytes_no_compacting) /
2791 MaxBytesForLevel(level);
2792 }
2793 compaction_level_[level] = level;
2794 compaction_score_[level] = score;
2795 }
2796
2797   // Sort all the levels based on their score. Higher scores get listed
2798   // first. Use bubble sort because the number of entries is small.
2799 for (int i = 0; i < num_levels() - 2; i++) {
2800 for (int j = i + 1; j < num_levels() - 1; j++) {
2801 if (compaction_score_[i] < compaction_score_[j]) {
2802 double score = compaction_score_[i];
2803 int level = compaction_level_[i];
2804 compaction_score_[i] = compaction_score_[j];
2805 compaction_level_[i] = compaction_level_[j];
2806 compaction_score_[j] = score;
2807 compaction_level_[j] = level;
2808 }
2809 }
2810 }
2811 ComputeFilesMarkedForCompaction();
2812 ComputeBottommostFilesMarkedForCompaction();
2813 if (mutable_cf_options.ttl > 0) {
2814 ComputeExpiredTtlFiles(immutable_options, mutable_cf_options.ttl);
2815 }
2816 if (mutable_cf_options.periodic_compaction_seconds > 0) {
2817 ComputeFilesMarkedForPeriodicCompaction(
2818 immutable_options, mutable_cf_options.periodic_compaction_seconds);
2819 }
2820
2821 if (mutable_cf_options.enable_blob_garbage_collection &&
2822 mutable_cf_options.blob_garbage_collection_age_cutoff > 0.0 &&
2823 mutable_cf_options.blob_garbage_collection_force_threshold < 1.0) {
2824 ComputeFilesMarkedForForcedBlobGC(
2825 mutable_cf_options.blob_garbage_collection_age_cutoff,
2826 mutable_cf_options.blob_garbage_collection_force_threshold);
2827 }
2828
2829 EstimateCompactionBytesNeeded(mutable_cf_options);
2830 }
2831
2832 void VersionStorageInfo::ComputeFilesMarkedForCompaction() {
2833 files_marked_for_compaction_.clear();
2834 int last_qualify_level = 0;
2835
2836   // Do not include files from the last level that contains data.
2837   // If a table properties collector suggests a file on the last level,
2838   // we should not move it to a new level.
2839 for (int level = num_levels() - 1; level >= 1; level--) {
2840 if (!files_[level].empty()) {
2841 last_qualify_level = level - 1;
2842 break;
2843 }
2844 }
2845
2846 for (int level = 0; level <= last_qualify_level; level++) {
2847 for (auto* f : files_[level]) {
2848 if (!f->being_compacted && f->marked_for_compaction) {
2849 files_marked_for_compaction_.emplace_back(level, f);
2850 }
2851 }
2852 }
2853 }
2854
2855 void VersionStorageInfo::ComputeExpiredTtlFiles(
2856 const ImmutableOptions& ioptions, const uint64_t ttl) {
2857 assert(ttl > 0);
2858
2859 expired_ttl_files_.clear();
2860
2861 int64_t _current_time;
2862 auto status = ioptions.clock->GetCurrentTime(&_current_time);
2863 if (!status.ok()) {
2864 return;
2865 }
2866 const uint64_t current_time = static_cast<uint64_t>(_current_time);
2867
2868 for (int level = 0; level < num_levels() - 1; level++) {
2869 for (FileMetaData* f : files_[level]) {
2870 if (!f->being_compacted) {
2871 uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
2872 if (oldest_ancester_time > 0 &&
2873 oldest_ancester_time < (current_time - ttl)) {
2874 expired_ttl_files_.emplace_back(level, f);
2875 }
2876 }
2877 }
2878 }
2879 }
2880
2881 void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction(
2882 const ImmutableOptions& ioptions,
2883 const uint64_t periodic_compaction_seconds) {
2884 assert(periodic_compaction_seconds > 0);
2885
2886 files_marked_for_periodic_compaction_.clear();
2887
2888 int64_t temp_current_time;
2889 auto status = ioptions.clock->GetCurrentTime(&temp_current_time);
2890 if (!status.ok()) {
2891 return;
2892 }
2893 const uint64_t current_time = static_cast<uint64_t>(temp_current_time);
2894
2895 // If periodic_compaction_seconds is larger than current time, periodic
2896 // compaction can't possibly be triggered.
2897 if (periodic_compaction_seconds > current_time) {
2898 return;
2899 }
2900
2901 const uint64_t allowed_time_limit =
2902 current_time - periodic_compaction_seconds;
2903
2904 for (int level = 0; level < num_levels(); level++) {
2905 for (auto f : files_[level]) {
2906 if (!f->being_compacted) {
2907 // Compute a file's modification time in the following order:
2908 // 1. Use file_creation_time table property if it is > 0.
2909 // 2. Use creation_time table property if it is > 0.
2910 // 3. Use file's mtime metadata if the above two table properties are 0.
2911 // Don't consider the file at all if the modification time cannot be
2912 // correctly determined based on the above conditions.
2913 uint64_t file_modification_time = f->TryGetFileCreationTime();
2914 if (file_modification_time == kUnknownFileCreationTime) {
2915 file_modification_time = f->TryGetOldestAncesterTime();
2916 }
2917 if (file_modification_time == kUnknownOldestAncesterTime) {
2918 auto file_path = TableFileName(ioptions.cf_paths, f->fd.GetNumber(),
2919 f->fd.GetPathId());
2920 status = ioptions.env->GetFileModificationTime(
2921 file_path, &file_modification_time);
2922 if (!status.ok()) {
2923 ROCKS_LOG_WARN(ioptions.logger,
2924 "Can't get file modification time: %s: %s",
2925 file_path.c_str(), status.ToString().c_str());
2926 continue;
2927 }
2928 }
2929 if (file_modification_time > 0 &&
2930 file_modification_time < allowed_time_limit) {
2931 files_marked_for_periodic_compaction_.emplace_back(level, f);
2932 }
2933 }
2934 }
2935 }
2936 }
2937
2938 void VersionStorageInfo::ComputeFilesMarkedForForcedBlobGC(
2939 double blob_garbage_collection_age_cutoff,
2940 double blob_garbage_collection_force_threshold) {
2941 files_marked_for_forced_blob_gc_.clear();
2942
2943 if (blob_files_.empty()) {
2944 return;
2945 }
2946
2947 // Number of blob files eligible for GC based on age
2948 const size_t cutoff_count = static_cast<size_t>(
2949 blob_garbage_collection_age_cutoff * blob_files_.size());
2950 if (!cutoff_count) {
2951 return;
2952 }
2953
2954 // Compute the sum of total and garbage bytes over the oldest batch of blob
2955 // files. The oldest batch is defined as the set of blob files which are
2956 // kept alive by the same SSTs as the very oldest one. Here is a toy example.
2957 // Let's assume we have three SSTs 1, 2, and 3, and four blob files 10, 11,
2958 // 12, and 13. Also, let's say SSTs 1 and 2 both rely on blob file 10 and
2959 // potentially some higher-numbered ones, while SST 3 relies on blob file 12
2960 // and potentially some higher-numbered ones. Then, the SST to oldest blob
2961 // file mapping is as follows:
2962 //
2963 // SST file number Oldest blob file number
2964 // 1 10
2965 // 2 10
2966 // 3 12
2967 //
2968 // This is what the same thing looks like from the blob files' POV. (Note that
2969 // the linked SSTs simply denote the inverse mapping of the above.)
2970 //
2971 // Blob file number Linked SST set
2972 // 10 {1, 2}
2973 // 11 {}
2974 // 12 {3}
2975 // 13 {}
2976 //
2977 // Then, the oldest batch of blob files consists of blob files 10 and 11,
2978 // and we can get rid of them by forcing the compaction of SSTs 1 and 2.
2979 //
2980 // Note that the overall ratio of garbage computed for the batch has to exceed
2981 // blob_garbage_collection_force_threshold and the entire batch has to be
2982 // eligible for GC according to blob_garbage_collection_age_cutoff in order
2983 // for us to schedule any compactions.
2984 const auto oldest_it = blob_files_.begin();
2985
2986 const auto& oldest_meta = oldest_it->second;
2987 assert(oldest_meta);
2988
2989 const auto& linked_ssts = oldest_meta->GetLinkedSsts();
2990 assert(!linked_ssts.empty());
2991
2992 size_t count = 1;
2993 uint64_t sum_total_blob_bytes = oldest_meta->GetTotalBlobBytes();
2994 uint64_t sum_garbage_blob_bytes = oldest_meta->GetGarbageBlobBytes();
2995
2996 auto it = oldest_it;
2997 for (++it; it != blob_files_.end(); ++it) {
2998 const auto& meta = it->second;
2999 assert(meta);
3000
3001 if (!meta->GetLinkedSsts().empty()) {
3002 break;
3003 }
3004
3005 if (++count > cutoff_count) {
3006 return;
3007 }
3008
3009 sum_total_blob_bytes += meta->GetTotalBlobBytes();
3010 sum_garbage_blob_bytes += meta->GetGarbageBlobBytes();
3011 }
3012
3013 if (sum_garbage_blob_bytes <
3014 blob_garbage_collection_force_threshold * sum_total_blob_bytes) {
3015 return;
3016 }
3017
3018 for (uint64_t sst_file_number : linked_ssts) {
3019 const FileLocation location = GetFileLocation(sst_file_number);
3020 assert(location.IsValid());
3021
3022 const int level = location.GetLevel();
3023 assert(level >= 0);
3024
3025 const size_t pos = location.GetPosition();
3026
3027 FileMetaData* const sst_meta = files_[level][pos];
3028 assert(sst_meta);
3029
3030 if (sst_meta->being_compacted) {
3031 continue;
3032 }
3033
3034 files_marked_for_forced_blob_gc_.emplace_back(level, sst_meta);
3035 }
3036 }
3037
3038 namespace {
3039
3040 // used to sort files by size
3041 struct Fsize {
3042 size_t index;
3043 FileMetaData* file;
3044 };
3045
3046 // Comparator that is used to sort files based on their size
3047 // In normal mode: descending size
3048 bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) {
3049 return (first.file->compensated_file_size >
3050 second.file->compensated_file_size);
3051 }
3052 } // anonymous namespace
3053
3054 void VersionStorageInfo::AddFile(int level, FileMetaData* f) {
3055 auto& level_files = files_[level];
3056 level_files.push_back(f);
3057
3058 f->refs++;
3059
3060 const uint64_t file_number = f->fd.GetNumber();
3061
3062 assert(file_locations_.find(file_number) == file_locations_.end());
3063 file_locations_.emplace(file_number,
3064 FileLocation(level, level_files.size() - 1));
3065 }
3066
3067 void VersionStorageInfo::AddBlobFile(
3068 std::shared_ptr<BlobFileMetaData> blob_file_meta) {
3069 assert(blob_file_meta);
3070
3071 const uint64_t blob_file_number = blob_file_meta->GetBlobFileNumber();
3072
3073 auto it = blob_files_.lower_bound(blob_file_number);
3074 assert(it == blob_files_.end() || it->first != blob_file_number);
3075
3076 blob_files_.insert(
3077 it, BlobFiles::value_type(blob_file_number, std::move(blob_file_meta)));
3078 }
3079
3080 // Version::PrepareApply() needs to be called before calling this function,
3081 // or the following functions need to have been called:
3082 // 1. UpdateNumNonEmptyLevels();
3083 // 2. CalculateBaseBytes();
3084 // 3. UpdateFilesByCompactionPri();
3085 // 4. GenerateFileIndexer();
3086 // 5. GenerateLevelFilesBrief();
3087 // 6. GenerateLevel0NonOverlapping();
3088 // 7. GenerateBottommostFiles();
3089 void VersionStorageInfo::SetFinalized() {
3090 finalized_ = true;
3091 #ifndef NDEBUG
3092 if (compaction_style_ != kCompactionStyleLevel) {
3093 // Not level based compaction.
3094 return;
3095 }
3096 assert(base_level_ < 0 || num_levels() == 1 ||
3097 (base_level_ >= 1 && base_level_ < num_levels()));
3098 // Verify all levels newer than base_level are empty except L0
3099 for (int level = 1; level < base_level(); level++) {
3100 assert(NumLevelBytes(level) == 0);
3101 }
3102 uint64_t max_bytes_prev_level = 0;
3103 for (int level = base_level(); level < num_levels() - 1; level++) {
3104 if (LevelFiles(level).size() == 0) {
3105 continue;
3106 }
3107 assert(MaxBytesForLevel(level) >= max_bytes_prev_level);
3108 max_bytes_prev_level = MaxBytesForLevel(level);
3109 }
3110 int num_empty_non_l0_level = 0;
3111 for (int level = 0; level < num_levels(); level++) {
3112 assert(LevelFiles(level).size() == 0 ||
3113 LevelFiles(level).size() == LevelFilesBrief(level).num_files);
3114 if (level > 0 && NumLevelBytes(level) > 0) {
3115 num_empty_non_l0_level++;
3116 }
3117 if (LevelFiles(level).size() > 0) {
3118 assert(level < num_non_empty_levels());
3119 }
3120 }
3121 assert(compaction_level_.size() > 0);
3122 assert(compaction_level_.size() == compaction_score_.size());
3123 #endif
3124 }
3125
3126 void VersionStorageInfo::UpdateNumNonEmptyLevels() {
3127 num_non_empty_levels_ = num_levels_;
3128 for (int i = num_levels_ - 1; i >= 0; i--) {
3129 if (files_[i].size() != 0) {
3130 return;
3131 } else {
3132 num_non_empty_levels_ = i;
3133 }
3134 }
3135 }
3136
3137 namespace {
3138 // Sort `temp` based on ratio of overlapping size over file size
3139 void SortFileByOverlappingRatio(
3140 const InternalKeyComparator& icmp, const std::vector<FileMetaData*>& files,
3141 const std::vector<FileMetaData*>& next_level_files,
3142 std::vector<Fsize>* temp) {
3143 std::unordered_map<uint64_t, uint64_t> file_to_order;
3144 auto next_level_it = next_level_files.begin();
3145
3146 for (auto& file : files) {
3147 uint64_t overlapping_bytes = 0;
3148     // Skip files in the next level whose key ranges are entirely before
3149     // the current file
3149 while (next_level_it != next_level_files.end() &&
3150 icmp.Compare((*next_level_it)->largest, file->smallest) < 0) {
3151 next_level_it++;
3152 }
3153
3154 while (next_level_it != next_level_files.end() &&
3155 icmp.Compare((*next_level_it)->smallest, file->largest) < 0) {
3156 overlapping_bytes += (*next_level_it)->fd.file_size;
3157
3158 if (icmp.Compare((*next_level_it)->largest, file->largest) > 0) {
3159         // The next-level file extends past the largest key of the current file.
3160 break;
3161 }
3162 next_level_it++;
3163 }
3164
3165 assert(file->compensated_file_size != 0);
3166 file_to_order[file->fd.GetNumber()] =
3167 overlapping_bytes * 1024u / file->compensated_file_size;
3168 }
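  // Illustrative example (sizes hypothetical): a 64 MB file overlapping 32 MB
  // in the next level gets order 32 * 1024 / 64 = 512, while a 64 MB file
  // overlapping 128 MB gets 2048, so the sort below prefers the file with the
  // smaller relative overlap.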
3169
3170 std::sort(temp->begin(), temp->end(),
3171 [&](const Fsize& f1, const Fsize& f2) -> bool {
3172 return file_to_order[f1.file->fd.GetNumber()] <
3173 file_to_order[f2.file->fd.GetNumber()];
3174 });
3175 }
3176 } // namespace
3177
3178 void VersionStorageInfo::UpdateFilesByCompactionPri(
3179 CompactionPri compaction_pri) {
3180 if (compaction_style_ == kCompactionStyleNone ||
3181 compaction_style_ == kCompactionStyleFIFO ||
3182 compaction_style_ == kCompactionStyleUniversal) {
3183 // don't need this
3184 return;
3185 }
3186 // No need to sort the highest level because it is never compacted.
3187 for (int level = 0; level < num_levels() - 1; level++) {
3188 const std::vector<FileMetaData*>& files = files_[level];
3189 auto& files_by_compaction_pri = files_by_compaction_pri_[level];
3190 assert(files_by_compaction_pri.size() == 0);
3191
3192 // populate a temp vector for sorting based on size
3193 std::vector<Fsize> temp(files.size());
3194 for (size_t i = 0; i < files.size(); i++) {
3195 temp[i].index = i;
3196 temp[i].file = files[i];
3197 }
3198
3199     // Sort the top kNumberFilesToSort files based on file size
3200 size_t num = VersionStorageInfo::kNumberFilesToSort;
3201 if (num > temp.size()) {
3202 num = temp.size();
3203 }
3204 switch (compaction_pri) {
3205 case kByCompensatedSize:
3206 std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
3207 CompareCompensatedSizeDescending);
3208 break;
3209 case kOldestLargestSeqFirst:
3210 std::sort(temp.begin(), temp.end(),
3211 [](const Fsize& f1, const Fsize& f2) -> bool {
3212 return f1.file->fd.largest_seqno <
3213 f2.file->fd.largest_seqno;
3214 });
3215 break;
3216 case kOldestSmallestSeqFirst:
3217 std::sort(temp.begin(), temp.end(),
3218 [](const Fsize& f1, const Fsize& f2) -> bool {
3219 return f1.file->fd.smallest_seqno <
3220 f2.file->fd.smallest_seqno;
3221 });
3222 break;
3223 case kMinOverlappingRatio:
3224 SortFileByOverlappingRatio(*internal_comparator_, files_[level],
3225 files_[level + 1], &temp);
3226 break;
3227 default:
3228 assert(false);
3229 }
3230 assert(temp.size() == files.size());
3231
3232 // initialize files_by_compaction_pri_
3233 for (size_t i = 0; i < temp.size(); i++) {
3234 files_by_compaction_pri.push_back(static_cast<int>(temp[i].index));
3235 }
3236 next_file_to_compact_by_size_[level] = 0;
3237 assert(files_[level].size() == files_by_compaction_pri_[level].size());
3238 }
3239 }
3240
3241 void VersionStorageInfo::GenerateLevel0NonOverlapping() {
3242 assert(!finalized_);
3243 level0_non_overlapping_ = true;
3244 if (level_files_brief_.size() == 0) {
3245 return;
3246 }
3247
3248 // A copy of L0 files sorted by smallest key
3249 std::vector<FdWithKeyRange> level0_sorted_file(
3250 level_files_brief_[0].files,
3251 level_files_brief_[0].files + level_files_brief_[0].num_files);
3252 std::sort(level0_sorted_file.begin(), level0_sorted_file.end(),
3253 [this](const FdWithKeyRange& f1, const FdWithKeyRange& f2) -> bool {
3254 return (internal_comparator_->Compare(f1.smallest_key,
3255 f2.smallest_key) < 0);
3256 });
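  // Illustrative example (keys hypothetical): L0 files with user-key ranges
  // [a, c] and [b, d], once sorted by smallest key, fail the check below
  // because prev.largest 'c' >= f.smallest 'b', so L0 is treated as
  // overlapping.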
3257
3258 for (size_t i = 1; i < level0_sorted_file.size(); ++i) {
3259 FdWithKeyRange& f = level0_sorted_file[i];
3260 FdWithKeyRange& prev = level0_sorted_file[i - 1];
3261 if (internal_comparator_->Compare(prev.largest_key, f.smallest_key) >= 0) {
3262 level0_non_overlapping_ = false;
3263 break;
3264 }
3265 }
3266 }
3267
3268 void VersionStorageInfo::GenerateBottommostFiles() {
3269 assert(!finalized_);
3270 assert(bottommost_files_.empty());
3271 for (size_t level = 0; level < level_files_brief_.size(); ++level) {
3272 for (size_t file_idx = 0; file_idx < level_files_brief_[level].num_files;
3273 ++file_idx) {
3274 const FdWithKeyRange& f = level_files_brief_[level].files[file_idx];
3275 int l0_file_idx;
3276 if (level == 0) {
3277 l0_file_idx = static_cast<int>(file_idx);
3278 } else {
3279 l0_file_idx = -1;
3280 }
3281 Slice smallest_user_key = ExtractUserKey(f.smallest_key);
3282 Slice largest_user_key = ExtractUserKey(f.largest_key);
3283 if (!RangeMightExistAfterSortedRun(smallest_user_key, largest_user_key,
3284 static_cast<int>(level),
3285 l0_file_idx)) {
3286 bottommost_files_.emplace_back(static_cast<int>(level),
3287 f.file_metadata);
3288 }
3289 }
3290 }
3291 }
3292
3293 void VersionStorageInfo::UpdateOldestSnapshot(SequenceNumber seqnum) {
3294 assert(seqnum >= oldest_snapshot_seqnum_);
3295 oldest_snapshot_seqnum_ = seqnum;
3296 if (oldest_snapshot_seqnum_ > bottommost_files_mark_threshold_) {
3297 ComputeBottommostFilesMarkedForCompaction();
3298 }
3299 }
3300
3301 void VersionStorageInfo::ComputeBottommostFilesMarkedForCompaction() {
3302 bottommost_files_marked_for_compaction_.clear();
3303 bottommost_files_mark_threshold_ = kMaxSequenceNumber;
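  // Illustrative example (sequence numbers hypothetical): with
  // oldest_snapshot_seqnum_ == 100, a bottommost file whose largest_seqno is
  // 42 is marked below, while one whose largest_seqno is 150 only lowers
  // bottommost_files_mark_threshold_ to 150, so a later
  // UpdateOldestSnapshot() call past that point recomputes this.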
3304 for (auto& level_and_file : bottommost_files_) {
3305 if (!level_and_file.second->being_compacted &&
3306 level_and_file.second->fd.largest_seqno != 0) {
3307       // largest_seqno might be nonzero due to containing the final key in an
3308       // earlier compaction, whose seqnum we didn't zero out. Multiple deletions
3309       // ensure the file really contains deleted or overwritten keys.
3310 if (level_and_file.second->fd.largest_seqno < oldest_snapshot_seqnum_) {
3311 bottommost_files_marked_for_compaction_.push_back(level_and_file);
3312 } else {
3313 bottommost_files_mark_threshold_ =
3314 std::min(bottommost_files_mark_threshold_,
3315 level_and_file.second->fd.largest_seqno);
3316 }
3317 }
3318 }
3319 }
3320
3321 void Version::Ref() {
3322 ++refs_;
3323 }
3324
3325 bool Version::Unref() {
3326 assert(refs_ >= 1);
3327 --refs_;
3328 if (refs_ == 0) {
3329 delete this;
3330 return true;
3331 }
3332 return false;
3333 }
3334
3335 bool VersionStorageInfo::OverlapInLevel(int level,
3336 const Slice* smallest_user_key,
3337 const Slice* largest_user_key) {
3338 if (level >= num_non_empty_levels_) {
3339 // empty level, no overlap
3340 return false;
3341 }
3342 return SomeFileOverlapsRange(*internal_comparator_, (level > 0),
3343 level_files_brief_[level], smallest_user_key,
3344 largest_user_key);
3345 }
3346
3347 // Store in "*inputs" all files in "level" that overlap [begin,end]
3348 // If hint_index is specified, then it points to a file in the
3349 // overlapping range.
3350 // If file_index is non-nullptr, it is set to the index of one file in the overlapping range.
3351 void VersionStorageInfo::GetOverlappingInputs(
3352 int level, const InternalKey* begin, const InternalKey* end,
3353 std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
3354 bool expand_range, InternalKey** next_smallest) const {
3355 if (level >= num_non_empty_levels_) {
3356 // this level is empty, no overlapping inputs
3357 return;
3358 }
3359
3360 inputs->clear();
3361 if (file_index) {
3362 *file_index = -1;
3363 }
3364 const Comparator* user_cmp = user_comparator_;
3365 if (level > 0) {
3366 GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs, hint_index,
3367 file_index, false, next_smallest);
3368 return;
3369 }
3370
3371 if (next_smallest) {
3372 // next_smallest key only makes sense for non-level 0, where files are
3373 // non-overlapping
3374 *next_smallest = nullptr;
3375 }
3376
3377 Slice user_begin, user_end;
3378 if (begin != nullptr) {
3379 user_begin = begin->user_key();
3380 }
3381 if (end != nullptr) {
3382 user_end = end->user_key();
3383 }
3384
3385   // `index` stores the indices of the files that still need to be checked.
3386 std::list<size_t> index;
3387 for (size_t i = 0; i < level_files_brief_[level].num_files; i++) {
3388 index.emplace_back(i);
3389 }
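  // Illustrative example (keys hypothetical): with L0 files [a, c], [b, e],
  // [f, g] and a requested range [d, d], the first pass only picks [b, e] and,
  // with expand_range, widens the range to [b, e]; the second pass then also
  // picks [a, c], while [f, g] is never included.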
3390
3391 while (!index.empty()) {
3392 bool found_overlapping_file = false;
3393 auto iter = index.begin();
3394 while (iter != index.end()) {
3395 FdWithKeyRange* f = &(level_files_brief_[level].files[*iter]);
3396 const Slice file_start = ExtractUserKey(f->smallest_key);
3397 const Slice file_limit = ExtractUserKey(f->largest_key);
3398 if (begin != nullptr &&
3399 user_cmp->CompareWithoutTimestamp(file_limit, user_begin) < 0) {
3400 // "f" is completely before specified range; skip it
3401 iter++;
3402 } else if (end != nullptr &&
3403 user_cmp->CompareWithoutTimestamp(file_start, user_end) > 0) {
3404 // "f" is completely after specified range; skip it
3405 iter++;
3406 } else {
3407 // if overlap
3408 inputs->emplace_back(files_[level][*iter]);
3409 found_overlapping_file = true;
3410 // record the first file index.
3411 if (file_index && *file_index == -1) {
3412 *file_index = static_cast<int>(*iter);
3413 }
3414         // This file overlaps; erase it so it is not checked again.
3415 iter = index.erase(iter);
3416 if (expand_range) {
3417 if (begin != nullptr &&
3418 user_cmp->CompareWithoutTimestamp(file_start, user_begin) < 0) {
3419 user_begin = file_start;
3420 }
3421 if (end != nullptr &&
3422 user_cmp->CompareWithoutTimestamp(file_limit, user_end) > 0) {
3423 user_end = file_limit;
3424 }
3425 }
3426 }
3427 }
3428     // If none of the remaining files overlap, stop.
3429 if (!found_overlapping_file) {
3430 break;
3431 }
3432 }
3433 }
3434
3435 // Store in "*inputs" the files in "level" that are within range [begin,end],
3436 // selecting the maximum number of files while guaranteeing a "clean cut"
3437 // boundary between the files in inputs and the surrounding files.
3438 // This ensures that no parts of a key are lost during compaction.
3439 // If hint_index is specified, then it points to a file in the range.
3440 // If file_index is non-nullptr, it is set to the index of one file in the range.
3441 void VersionStorageInfo::GetCleanInputsWithinInterval(
3442 int level, const InternalKey* begin, const InternalKey* end,
3443 std::vector<FileMetaData*>* inputs, int hint_index, int* file_index) const {
3444 inputs->clear();
3445 if (file_index) {
3446 *file_index = -1;
3447 }
3448 if (level >= num_non_empty_levels_ || level == 0 ||
3449 level_files_brief_[level].num_files == 0) {
3450 // this level is empty, no inputs within range
3451 // also don't support clean input interval within L0
3452 return;
3453 }
3454
3455 GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs,
3456 hint_index, file_index,
3457 true /* within_interval */);
3458 }
3459
3460 // Store in "*inputs" all files in "level" that overlap [begin,end]
3461 // Employ binary search to find at least one file that overlaps the
3462 // specified range. From that file, iterate backwards and
3463 // forwards to find all overlapping files.
3464 // If within_interval is set, then only store the maximal set of "clean"
3465 // inputs within range [begin, end]. "Clean" means there is a boundary
3466 // between the files in "*inputs" and the surrounding files.
3467 void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
3468 int level, const InternalKey* begin, const InternalKey* end,
3469 std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
3470 bool within_interval, InternalKey** next_smallest) const {
3471 assert(level > 0);
3472
3473 auto user_cmp = user_comparator_;
3474 const FdWithKeyRange* files = level_files_brief_[level].files;
3475 const int num_files = static_cast<int>(level_files_brief_[level].num_files);
3476
3477 // begin to use binary search to find lower bound
3478 // and upper bound.
3479 int start_index = 0;
3480 int end_index = num_files;
3481
3482 if (begin != nullptr) {
3483     // If within_interval is true, use the file's smallest key so that
3484     // std::lower_bound finds the first file that starts at or after `begin`
3485     // (i.e., does not straddle it).
3485 auto cmp = [&user_cmp, &within_interval](const FdWithKeyRange& f,
3486 const InternalKey* k) {
3487 auto& file_key = within_interval ? f.file_metadata->smallest
3488 : f.file_metadata->largest;
3489 return sstableKeyCompare(user_cmp, file_key, *k) < 0;
3490 };
3491
3492 start_index = static_cast<int>(
3493 std::lower_bound(files,
3494 files + (hint_index == -1 ? num_files : hint_index),
3495 begin, cmp) -
3496 files);
3497
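    // If the file just before start_index ends with the same user key that
    // files[start_index] starts with (a user key split across adjacent files),
    // start_index is advanced below so a clean-cut compaction does not
    // separate that key's entries.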
3498 if (start_index > 0 && within_interval) {
3499 bool is_overlapping = true;
3500 while (is_overlapping && start_index < num_files) {
3501 auto& pre_limit = files[start_index - 1].file_metadata->largest;
3502 auto& cur_start = files[start_index].file_metadata->smallest;
3503 is_overlapping = sstableKeyCompare(user_cmp, pre_limit, cur_start) == 0;
3504 start_index += is_overlapping;
3505 }
3506 }
3507 }
3508
3509 if (end != nullptr) {
3510     // If within_interval is true, use the file's largest key so that
3511     // std::upper_bound stops at the first file that extends past `end`.
3512 auto cmp = [&user_cmp, &within_interval](const InternalKey* k,
3513 const FdWithKeyRange& f) {
3514 auto& file_key = within_interval ? f.file_metadata->largest
3515 : f.file_metadata->smallest;
3516 return sstableKeyCompare(user_cmp, *k, file_key) < 0;
3517 };
3518
3519 end_index = static_cast<int>(
3520 std::upper_bound(files + start_index, files + num_files, end, cmp) -
3521 files);
3522
3523 if (end_index < num_files && within_interval) {
3524 bool is_overlapping = true;
3525 while (is_overlapping && end_index > start_index) {
3526 auto& next_start = files[end_index].file_metadata->smallest;
3527 auto& cur_limit = files[end_index - 1].file_metadata->largest;
3528 is_overlapping =
3529 sstableKeyCompare(user_cmp, cur_limit, next_start) == 0;
3530 end_index -= is_overlapping;
3531 }
3532 }
3533 }
3534
3535 assert(start_index <= end_index);
3536
3537 // If there were no overlapping files, return immediately.
3538 if (start_index == end_index) {
3539 if (next_smallest) {
3540 *next_smallest = nullptr;
3541 }
3542 return;
3543 }
3544
3545 assert(start_index < end_index);
3546
3547 // returns the index where an overlap is found
3548 if (file_index) {
3549 *file_index = start_index;
3550 }
3551
3552 // insert overlapping files into vector
3553 for (int i = start_index; i < end_index; i++) {
3554 inputs->push_back(files_[level][i]);
3555 }
3556
3557 if (next_smallest != nullptr) {
3558 // Provide the next key outside the range covered by inputs
3559 if (end_index < static_cast<int>(files_[level].size())) {
3560 **next_smallest = files_[level][end_index]->smallest;
3561 } else {
3562 *next_smallest = nullptr;
3563 }
3564 }
3565 }
3566
3567 uint64_t VersionStorageInfo::NumLevelBytes(int level) const {
3568 assert(level >= 0);
3569 assert(level < num_levels());
3570 return TotalFileSize(files_[level]);
3571 }
3572
3573 const char* VersionStorageInfo::LevelSummary(
3574 LevelSummaryStorage* scratch) const {
3575 int len = 0;
3576 if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
3577 assert(base_level_ < static_cast<int>(level_max_bytes_.size()));
3578 if (level_multiplier_ != 0.0) {
3579 len = snprintf(
3580 scratch->buffer, sizeof(scratch->buffer),
3581 "base level %d level multiplier %.2f max bytes base %" PRIu64 " ",
3582 base_level_, level_multiplier_, level_max_bytes_[base_level_]);
3583 }
3584 }
3585 len +=
3586 snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "files[");
3587 for (int i = 0; i < num_levels(); i++) {
3588 int sz = sizeof(scratch->buffer) - len;
3589 int ret = snprintf(scratch->buffer + len, sz, "%d ", int(files_[i].size()));
3590 if (ret < 0 || ret >= sz) break;
3591 len += ret;
3592 }
3593 if (len > 0) {
3594 // overwrite the last space
3595 --len;
3596 }
3597 len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
3598 "] max score %.2f", compaction_score_[0]);
3599
3600 if (!files_marked_for_compaction_.empty()) {
3601 snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
3602 " (%" ROCKSDB_PRIszt " files need compaction)",
3603 files_marked_for_compaction_.size());
3604 }
3605
3606 return scratch->buffer;
3607 }
3608
3609 const char* VersionStorageInfo::LevelFileSummary(FileSummaryStorage* scratch,
3610 int level) const {
3611 int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size[");
3612 for (const auto& f : files_[level]) {
3613 int sz = sizeof(scratch->buffer) - len;
3614 char sztxt[16];
3615 AppendHumanBytes(f->fd.GetFileSize(), sztxt, sizeof(sztxt));
3616 int ret = snprintf(scratch->buffer + len, sz,
3617 "#%" PRIu64 "(seq=%" PRIu64 ",sz=%s,%d) ",
3618 f->fd.GetNumber(), f->fd.smallest_seqno, sztxt,
3619 static_cast<int>(f->being_compacted));
3620 if (ret < 0 || ret >= sz)
3621 break;
3622 len += ret;
3623 }
3624 // overwrite the last space (only if files_[level].size() is non-zero)
3625 if (files_[level].size() && len > 0) {
3626 --len;
3627 }
3628 snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]");
3629 return scratch->buffer;
3630 }
3631
3632 uint64_t VersionStorageInfo::MaxNextLevelOverlappingBytes() {
3633 uint64_t result = 0;
3634 std::vector<FileMetaData*> overlaps;
3635 for (int level = 1; level < num_levels() - 1; level++) {
3636 for (const auto& f : files_[level]) {
3637 GetOverlappingInputs(level + 1, &f->smallest, &f->largest, &overlaps);
3638 const uint64_t sum = TotalFileSize(overlaps);
3639 if (sum > result) {
3640 result = sum;
3641 }
3642 }
3643 }
3644 return result;
3645 }
3646
3647 uint64_t VersionStorageInfo::MaxBytesForLevel(int level) const {
3648 // Note: the result for level zero is not really used since we set
3649 // the level-0 compaction threshold based on number of files.
3650 assert(level >= 0);
3651 assert(level < static_cast<int>(level_max_bytes_.size()));
3652 return level_max_bytes_[level];
3653 }
3654
3655 void VersionStorageInfo::CalculateBaseBytes(const ImmutableOptions& ioptions,
3656 const MutableCFOptions& options) {
3657 // Special logic to set number of sorted runs.
3658 // It is to match the previous behavior when all files are in L0.
3659 int num_l0_count = static_cast<int>(files_[0].size());
3660 if (compaction_style_ == kCompactionStyleUniversal) {
3661 // For universal compaction, we use level0 score to indicate
3662 // compaction score for the whole DB. Adding other levels as if
3663 // they are L0 files.
3664 for (int i = 1; i < num_levels(); i++) {
3665 if (!files_[i].empty()) {
3666 num_l0_count++;
3667 }
3668 }
3669 }
3670 set_l0_delay_trigger_count(num_l0_count);
3671
3672 level_max_bytes_.resize(ioptions.num_levels);
3673 if (!ioptions.level_compaction_dynamic_level_bytes) {
3674 base_level_ = (ioptions.compaction_style == kCompactionStyleLevel) ? 1 : -1;
3675
3676 // Calculate for static bytes base case
3677 for (int i = 0; i < ioptions.num_levels; ++i) {
3678 if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) {
3679 level_max_bytes_[i] = options.max_bytes_for_level_base;
3680 } else if (i > 1) {
3681 level_max_bytes_[i] = MultiplyCheckOverflow(
3682 MultiplyCheckOverflow(level_max_bytes_[i - 1],
3683 options.max_bytes_for_level_multiplier),
3684 options.MaxBytesMultiplerAdditional(i - 1));
3685 } else {
3686 level_max_bytes_[i] = options.max_bytes_for_level_base;
3687 }
3688 }
3689 } else {
3690 uint64_t max_level_size = 0;
3691
3692 int first_non_empty_level = -1;
3693     // Find the size of the non-L0 level holding the most data.
3694     // Cannot use the size of the last level because it can be empty or
3695     // smaller than previous levels after compaction.
3696 for (int i = 1; i < num_levels_; i++) {
3697 uint64_t total_size = 0;
3698 for (const auto& f : files_[i]) {
3699 total_size += f->fd.GetFileSize();
3700 }
3701 if (total_size > 0 && first_non_empty_level == -1) {
3702 first_non_empty_level = i;
3703 }
3704 if (total_size > max_level_size) {
3705 max_level_size = total_size;
3706 }
3707 }
3708
3709 // Prefill every level's max bytes to disallow compaction from there.
3710 for (int i = 0; i < num_levels_; i++) {
3711 level_max_bytes_[i] = std::numeric_limits<uint64_t>::max();
3712 }
3713
3714 if (max_level_size == 0) {
3715 // No data for L1 and up. L0 compacts to last level directly.
3716 // No compaction from L1+ needs to be scheduled.
3717 base_level_ = num_levels_ - 1;
3718 } else {
3719 uint64_t l0_size = 0;
3720 for (const auto& f : files_[0]) {
3721 l0_size += f->fd.GetFileSize();
3722 }
3723
3724 uint64_t base_bytes_max =
3725 std::max(options.max_bytes_for_level_base, l0_size);
3726 uint64_t base_bytes_min = static_cast<uint64_t>(
3727 base_bytes_max / options.max_bytes_for_level_multiplier);
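      // Illustrative example (values hypothetical): with
      // max_bytes_for_level_base = 256 MB, l0_size = 64 MB and
      // max_bytes_for_level_multiplier = 10, base_bytes_max is 256 MB and
      // base_bytes_min is 25.6 MB.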
3728
3729       // Try to make the last level's target size be max_level_size.
3730 uint64_t cur_level_size = max_level_size;
3731 for (int i = num_levels_ - 2; i >= first_non_empty_level; i--) {
3732 // Round up after dividing
3733 cur_level_size = static_cast<uint64_t>(
3734 cur_level_size / options.max_bytes_for_level_multiplier);
3735 }
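      // Illustrative example (values hypothetical): with num_levels_ == 7,
      // first_non_empty_level == 4, a multiplier of 10 and max_level_size ==
      // 800 GB, the loop above divides twice (i == 5, 4), leaving
      // cur_level_size == 8 GB as the tentative target for the first
      // non-empty level.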
3736
3737 // Calculate base level and its size.
3738 uint64_t base_level_size;
3739 if (cur_level_size <= base_bytes_min) {
3740 // Case 1. If we make target size of last level to be max_level_size,
3741 // target size of the first non-empty level would be smaller than
3742 // base_bytes_min. We set it be base_bytes_min.
3743 base_level_size = base_bytes_min + 1U;
3744 base_level_ = first_non_empty_level;
3745 ROCKS_LOG_INFO(ioptions.logger,
3746 "More existing levels in DB than needed. "
3747 "max_bytes_for_level_multiplier may not be guaranteed.");
3748 } else {
3749 // Find base level (where L0 data is compacted to).
3750 base_level_ = first_non_empty_level;
3751 while (base_level_ > 1 && cur_level_size > base_bytes_max) {
3752 --base_level_;
3753 cur_level_size = static_cast<uint64_t>(
3754 cur_level_size / options.max_bytes_for_level_multiplier);
3755 }
3756 if (cur_level_size > base_bytes_max) {
3757 // Even L1 will be too large
3758 assert(base_level_ == 1);
3759 base_level_size = base_bytes_max;
3760 } else {
3761 base_level_size = cur_level_size;
3762 }
3763 }
3764
3765 level_multiplier_ = options.max_bytes_for_level_multiplier;
3766 assert(base_level_size > 0);
3767 if (l0_size > base_level_size &&
3768 (l0_size > options.max_bytes_for_level_base ||
3769 static_cast<int>(files_[0].size() / 2) >=
3770 options.level0_file_num_compaction_trigger)) {
3771 // We adjust the base level according to actual L0 size, and adjust
3772 // the level multiplier accordingly, when:
3773 // 1. the L0 size is larger than level size base, or
3774 // 2. number of L0 files reaches twice the L0->L1 compaction trigger
3775 // We don't do this otherwise to keep the LSM-tree structure stable
3776 // unless the L0 compaction is backlogged.
3777 base_level_size = l0_size;
3778 if (base_level_ == num_levels_ - 1) {
3779 level_multiplier_ = 1.0;
3780 } else {
3781 level_multiplier_ = std::pow(
3782 static_cast<double>(max_level_size) /
3783 static_cast<double>(base_level_size),
3784 1.0 / static_cast<double>(num_levels_ - base_level_ - 1));
3785 }
3786 }
3787
3788 uint64_t level_size = base_level_size;
3789 for (int i = base_level_; i < num_levels_; i++) {
3790 if (i > base_level_) {
3791 level_size = MultiplyCheckOverflow(level_size, level_multiplier_);
3792 }
3793 // Don't set any level below base_bytes_max. Otherwise, the LSM can
3794 // assume an hourglass shape where L1+ sizes are smaller than L0. This
3795 // causes compaction scoring, which depends on level sizes, to favor L1+
3796 // at the expense of L0, which may fill up and stall.
3797 level_max_bytes_[i] = std::max(level_size, base_bytes_max);
3798 }
3799 }
3800 }
3801 }
3802
3803 uint64_t VersionStorageInfo::EstimateLiveDataSize() const {
3804 // Estimate the live data size by adding up the size of a maximal set of
3805 // sst files with no range overlap in same or higher level. The less
3806 // compacted, the more optimistic (smaller) this estimate is. Also,
3807 // for multiple sorted runs within a level, file order will matter.
3808 uint64_t size = 0;
3809
3810 auto ikey_lt = [this](InternalKey* x, InternalKey* y) {
3811 return internal_comparator_->Compare(*x, *y) < 0;
3812 };
3813 // (Ordered) map of largest keys in files being included in size estimate
3814 std::map<InternalKey*, FileMetaData*, decltype(ikey_lt)> ranges(ikey_lt);
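  // Illustrative example (keys hypothetical): if a bottom-level file with
  // range [a, c] is already in `ranges`, an L1 file with range [b, d] overlaps
  // it and is skipped, while an L1 file with range [e, g] has no overlap and
  // is added to both `ranges` and the size estimate.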
3815
3816 for (int l = num_levels_ - 1; l >= 0; l--) {
3817 bool found_end = false;
3818 for (auto file : files_[l]) {
3819       // Find the first already-included file whose largest key is larger than
3820       // the smallest key of `file`. If that file does not overlap with the
3821 // current file, none of the files in the map does. If there is
3822 // no potential overlap, we can safely insert the rest of this level
3823 // (if the level is not 0) into the map without checking again because
3824 // the elements in the level are sorted and non-overlapping.
3825 auto lb = (found_end && l != 0) ?
3826 ranges.end() : ranges.lower_bound(&file->smallest);
3827 found_end = (lb == ranges.end());
3828 if (found_end || internal_comparator_->Compare(
3829 file->largest, (*lb).second->smallest) < 0) {
3830 ranges.emplace_hint(lb, &file->largest, file);
3831 size += file->fd.file_size;
3832 }
3833 }
3834 }
3835 // For BlobDB, the result also includes the exact value of live bytes in the
3836 // blob files of the version.
3837 const auto& blobFiles = GetBlobFiles();
3838 for (const auto& pair : blobFiles) {
3839 const auto& meta = pair.second;
3840 size += meta->GetTotalBlobBytes();
3841 size -= meta->GetGarbageBlobBytes();
3842 }
3843 return size;
3844 }
3845
3846 bool VersionStorageInfo::RangeMightExistAfterSortedRun(
3847 const Slice& smallest_user_key, const Slice& largest_user_key,
3848 int last_level, int last_l0_idx) {
3849 assert((last_l0_idx != -1) == (last_level == 0));
3850 // TODO(ajkr): this preserves earlier behavior where we considered an L0 file
3851 // bottommost only if it's the oldest L0 file and there are no files on older
3852 // levels. It'd be better to consider it bottommost if there's no overlap in
3853 // older levels/files.
3854 if (last_level == 0 &&
3855 last_l0_idx != static_cast<int>(LevelFiles(0).size() - 1)) {
3856 return true;
3857 }
3858
3859 // Checks whether there are files living beyond the `last_level`. If lower
3860 // levels have files, it checks for overlap between [`smallest_key`,
3861   // `largest_key`] and those files. Bottom-level optimizations can be made if
3862 // there are no files in lower levels or if there is no overlap with the files
3863 // in the lower levels.
3864 for (int level = last_level + 1; level < num_levels(); level++) {
3865 // The range is not in the bottommost level if there are files in lower
3866 // levels when the `last_level` is 0 or if there are files in lower levels
3867 // which overlap with [`smallest_key`, `largest_key`].
3868 if (files_[level].size() > 0 &&
3869 (last_level == 0 ||
3870 OverlapInLevel(level, &smallest_user_key, &largest_user_key))) {
3871 return true;
3872 }
3873 }
3874 return false;
3875 }
3876
3877 void Version::AddLiveFiles(std::vector<uint64_t>* live_table_files,
3878 std::vector<uint64_t>* live_blob_files) const {
3879 assert(live_table_files);
3880 assert(live_blob_files);
3881
3882 for (int level = 0; level < storage_info_.num_levels(); ++level) {
3883 const auto& level_files = storage_info_.LevelFiles(level);
3884 for (const auto& meta : level_files) {
3885 assert(meta);
3886
3887 live_table_files->emplace_back(meta->fd.GetNumber());
3888 }
3889 }
3890
3891 const auto& blob_files = storage_info_.GetBlobFiles();
3892 for (const auto& pair : blob_files) {
3893 const auto& meta = pair.second;
3894 assert(meta);
3895
3896 live_blob_files->emplace_back(meta->GetBlobFileNumber());
3897 }
3898 }
3899
3900 std::string Version::DebugString(bool hex, bool print_stats) const {
3901 std::string r;
3902 for (int level = 0; level < storage_info_.num_levels_; level++) {
3903 // E.g.,
3904 // --- level 1 ---
3905 // 17:123[1 .. 124]['a' .. 'd']
3906 // 20:43[124 .. 128]['e' .. 'g']
3907 //
3908 // if print_stats=true:
3909 // 17:123[1 .. 124]['a' .. 'd'](4096)
3910 r.append("--- level ");
3911 AppendNumberTo(&r, level);
3912 r.append(" --- version# ");
3913 AppendNumberTo(&r, version_number_);
3914 r.append(" ---\n");
3915 const std::vector<FileMetaData*>& files = storage_info_.files_[level];
3916 for (size_t i = 0; i < files.size(); i++) {
3917 r.push_back(' ');
3918 AppendNumberTo(&r, files[i]->fd.GetNumber());
3919 r.push_back(':');
3920 AppendNumberTo(&r, files[i]->fd.GetFileSize());
3921 r.append("[");
3922 AppendNumberTo(&r, files[i]->fd.smallest_seqno);
3923 r.append(" .. ");
3924 AppendNumberTo(&r, files[i]->fd.largest_seqno);
3925 r.append("]");
3926 r.append("[");
3927 r.append(files[i]->smallest.DebugString(hex));
3928 r.append(" .. ");
3929 r.append(files[i]->largest.DebugString(hex));
3930 r.append("]");
3931 if (files[i]->oldest_blob_file_number != kInvalidBlobFileNumber) {
3932 r.append(" blob_file:");
3933 AppendNumberTo(&r, files[i]->oldest_blob_file_number);
3934 }
3935 if (print_stats) {
3936 r.append("(");
3937 r.append(ToString(
3938 files[i]->stats.num_reads_sampled.load(std::memory_order_relaxed)));
3939 r.append(")");
3940 }
3941 r.append("\n");
3942 }
3943 }
3944
3945 const auto& blob_files = storage_info_.GetBlobFiles();
3946 if (!blob_files.empty()) {
3947 r.append("--- blob files --- version# ");
3948 AppendNumberTo(&r, version_number_);
3949 r.append(" ---\n");
3950 for (const auto& pair : blob_files) {
3951 const auto& blob_file_meta = pair.second;
3952 assert(blob_file_meta);
3953
3954 r.append(blob_file_meta->DebugString());
3955 r.push_back('\n');
3956 }
3957 }
3958
3959 return r;
3960 }
3961
3962 // this is used to batch writes to the manifest file
3963 struct VersionSet::ManifestWriter {
3964 Status status;
3965 bool done;
3966 InstrumentedCondVar cv;
3967 ColumnFamilyData* cfd;
3968 const MutableCFOptions mutable_cf_options;
3969 const autovector<VersionEdit*>& edit_list;
3970 const std::function<void(const Status&)> manifest_write_callback;
3971
3972   explicit ManifestWriter(
3973 InstrumentedMutex* mu, ColumnFamilyData* _cfd,
3974 const MutableCFOptions& cf_options, const autovector<VersionEdit*>& e,
3975 const std::function<void(const Status&)>& manifest_wcb)
3976 : done(false),
3977 cv(mu),
3978 cfd(_cfd),
3979 mutable_cf_options(cf_options),
3980 edit_list(e),
3981 manifest_write_callback(manifest_wcb) {}
3982   ~ManifestWriter() { status.PermitUncheckedError(); }
3983
3984   bool IsAllWalEdits() const {
3985 bool all_wal_edits = true;
3986 for (const auto& e : edit_list) {
3987 if (!e->IsWalManipulation()) {
3988 all_wal_edits = false;
3989 break;
3990 }
3991 }
3992 return all_wal_edits;
3993 }
3994 };
3995
3996 Status AtomicGroupReadBuffer::AddEdit(VersionEdit* edit) {
3997 assert(edit);
3998 if (edit->is_in_atomic_group_) {
3999 TEST_SYNC_POINT("AtomicGroupReadBuffer::AddEdit:AtomicGroup");
4000 if (replay_buffer_.empty()) {
4001 replay_buffer_.resize(edit->remaining_entries_ + 1);
4002 TEST_SYNC_POINT_CALLBACK(
4003 "AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", edit);
4004 }
4005 read_edits_in_atomic_group_++;
4006 if (read_edits_in_atomic_group_ + edit->remaining_entries_ !=
4007 static_cast<uint32_t>(replay_buffer_.size())) {
4008 TEST_SYNC_POINT_CALLBACK(
4009 "AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize", edit);
4010 return Status::Corruption("corrupted atomic group");
4011 }
4012 replay_buffer_[read_edits_in_atomic_group_ - 1] = *edit;
4013 if (read_edits_in_atomic_group_ == replay_buffer_.size()) {
4014 TEST_SYNC_POINT_CALLBACK(
4015 "AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", edit);
4016 return Status::OK();
4017 }
4018 return Status::OK();
4019 }
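  // Illustrative example (counts hypothetical): the first edit of a three-edit
  // atomic group carries remaining_entries_ == 2, so the branch above resizes
  // replay_buffer_ to 3; the group is complete once
  // read_edits_in_atomic_group_ reaches 3.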
4020
4021 // A normal edit.
4022 if (!replay_buffer().empty()) {
4023 TEST_SYNC_POINT_CALLBACK(
4024 "AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits", edit);
4025 return Status::Corruption("corrupted atomic group");
4026 }
4027 return Status::OK();
4028 }
4029
4030 bool AtomicGroupReadBuffer::IsFull() const {
4031 return read_edits_in_atomic_group_ == replay_buffer_.size();
4032 }
4033
4034 bool AtomicGroupReadBuffer::IsEmpty() const { return replay_buffer_.empty(); }
4035
4036 void AtomicGroupReadBuffer::Clear() {
4037 read_edits_in_atomic_group_ = 0;
4038 replay_buffer_.clear();
4039 }
4040
4041 VersionSet::VersionSet(const std::string& dbname,
4042 const ImmutableDBOptions* _db_options,
4043 const FileOptions& storage_options, Cache* table_cache,
4044 WriteBufferManager* write_buffer_manager,
4045 WriteController* write_controller,
4046 BlockCacheTracer* const block_cache_tracer,
4047 const std::shared_ptr<IOTracer>& io_tracer,
4048 const std::string& db_session_id)
4049 : column_family_set_(
4050 new ColumnFamilySet(dbname, _db_options, storage_options, table_cache,
4051 write_buffer_manager, write_controller,
4052 block_cache_tracer, io_tracer, db_session_id)),
4053 table_cache_(table_cache),
4054 env_(_db_options->env),
4055 fs_(_db_options->fs, io_tracer),
4056 clock_(_db_options->clock),
4057 dbname_(dbname),
4058 db_options_(_db_options),
4059 next_file_number_(2),
4060 manifest_file_number_(0), // Filled by Recover()
4061 options_file_number_(0),
4062 options_file_size_(0),
4063 pending_manifest_file_number_(0),
4064 last_sequence_(0),
4065 last_allocated_sequence_(0),
4066 last_published_sequence_(0),
4067 prev_log_number_(0),
4068 current_version_number_(0),
4069 manifest_file_size_(0),
4070 file_options_(storage_options),
4071 block_cache_tracer_(block_cache_tracer),
4072 io_tracer_(io_tracer),
4073 db_session_id_(db_session_id) {}
4074
4075 VersionSet::~VersionSet() {
4076 // we need to delete column_family_set_ because its destructor depends on
4077 // VersionSet
4078 column_family_set_.reset();
4079 for (auto& file : obsolete_files_) {
4080 if (file.metadata->table_reader_handle) {
4081 table_cache_->Release(file.metadata->table_reader_handle);
4082 TableCache::Evict(table_cache_, file.metadata->fd.GetNumber());
4083 }
4084 file.DeleteMetadata();
4085 }
4086 obsolete_files_.clear();
4087 io_status_.PermitUncheckedError();
4088 }
4089
4090 void VersionSet::Reset() {
4091 if (column_family_set_) {
4092 WriteBufferManager* wbm = column_family_set_->write_buffer_manager();
4093 WriteController* wc = column_family_set_->write_controller();
4094 column_family_set_.reset(new ColumnFamilySet(
4095 dbname_, db_options_, file_options_, table_cache_, wbm, wc,
4096 block_cache_tracer_, io_tracer_, db_session_id_));
4097 }
4098 db_id_.clear();
4099 next_file_number_.store(2);
4100 min_log_number_to_keep_2pc_.store(0);
4101 manifest_file_number_ = 0;
4102 options_file_number_ = 0;
4103 pending_manifest_file_number_ = 0;
4104 last_sequence_.store(0);
4105 last_allocated_sequence_.store(0);
4106 last_published_sequence_.store(0);
4107 prev_log_number_ = 0;
4108 descriptor_log_.reset();
4109 current_version_number_ = 0;
4110 manifest_writers_.clear();
4111 manifest_file_size_ = 0;
4112 obsolete_files_.clear();
4113 obsolete_manifests_.clear();
4114 wals_.Reset();
4115 }
4116
4117 void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
4118 Version* v) {
4119 // compute new compaction score
4120 v->storage_info()->ComputeCompactionScore(
4121 *column_family_data->ioptions(),
4122 *column_family_data->GetLatestMutableCFOptions());
4123
4124 // Mark v finalized
4125 v->storage_info_.SetFinalized();
4126
4127 // Make "v" current
4128 assert(v->refs_ == 0);
4129 Version* current = column_family_data->current();
4130 assert(v != current);
4131 if (current != nullptr) {
4132 assert(current->refs_ > 0);
4133 current->Unref();
4134 }
4135 column_family_data->SetCurrent(v);
4136 v->Ref();
4137
4138 // Append to linked list
4139 v->prev_ = column_family_data->dummy_versions()->prev_;
4140 v->next_ = column_family_data->dummy_versions();
4141 v->prev_->next_ = v;
4142 v->next_->prev_ = v;
4143 }
4144
4145 Status VersionSet::ProcessManifestWrites(
4146 std::deque<ManifestWriter>& writers, InstrumentedMutex* mu,
4147 FSDirectory* db_directory, bool new_descriptor_log,
4148 const ColumnFamilyOptions* new_cf_options) {
4149 mu->AssertHeld();
4150 assert(!writers.empty());
4151 ManifestWriter& first_writer = writers.front();
4152 ManifestWriter* last_writer = &first_writer;
4153
4154 assert(!manifest_writers_.empty());
4155 assert(manifest_writers_.front() == &first_writer);
4156
4157 autovector<VersionEdit*> batch_edits;
4158 autovector<Version*> versions;
4159 autovector<const MutableCFOptions*> mutable_cf_options_ptrs;
4160 std::vector<std::unique_ptr<BaseReferencedVersionBuilder>> builder_guards;
4161
4162 if (first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
4163 // No group commits for column family add or drop
4164 LogAndApplyCFHelper(first_writer.edit_list.front());
4165 batch_edits.push_back(first_writer.edit_list.front());
4166 } else {
4167 auto it = manifest_writers_.cbegin();
4168 size_t group_start = std::numeric_limits<size_t>::max();
4169 while (it != manifest_writers_.cend()) {
4170 if ((*it)->edit_list.front()->IsColumnFamilyManipulation()) {
4171 // no group commits for column family add or drop
4172 break;
4173 }
4174 last_writer = *(it++);
4175 assert(last_writer != nullptr);
4176 assert(last_writer->cfd != nullptr);
4177 if (last_writer->cfd->IsDropped()) {
4178 // If we detect a dropped CF at this point, and the corresponding
4179 // version edits belong to an atomic group, then we need to find out
4180 // the preceding version edits in the same atomic group, and update
4181 // their `remaining_entries_` member variable because we are NOT going
4182         // to write the dropped CF's version edits to the MANIFEST. If we
4183         // don't update them, Recover can report a corrupted atomic group
4184         // because the `remaining_entries_` values do not match.
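        // Illustrative example (counts hypothetical): for an atomic group of
        // four edits where the first two are already in batch_edits
        // (remaining_entries_ 3 and 2) and the dropped CF owns the last two
        // (remaining_entries_ 1 and 0), k is computed as 2 below and the two
        // batched edits are rewritten to remaining_entries_ 1 and 0, keeping
        // the group consistent without the dropped CF's edits.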
4185 if (!batch_edits.empty()) {
4186 if (batch_edits.back()->is_in_atomic_group_ &&
4187 batch_edits.back()->remaining_entries_ > 0) {
4188 assert(group_start < batch_edits.size());
4189 const auto& edit_list = last_writer->edit_list;
4190 size_t k = 0;
4191 while (k < edit_list.size()) {
4192 if (!edit_list[k]->is_in_atomic_group_) {
4193 break;
4194 } else if (edit_list[k]->remaining_entries_ == 0) {
4195 ++k;
4196 break;
4197 }
4198 ++k;
4199 }
4200 for (auto i = group_start; i < batch_edits.size(); ++i) {
4201 assert(static_cast<uint32_t>(k) <=
4202 batch_edits.back()->remaining_entries_);
4203 batch_edits[i]->remaining_entries_ -= static_cast<uint32_t>(k);
4204 }
4205 }
4206 }
4207 continue;
4208 }
4209 // We do a linear search on versions because versions is small.
4210 // TODO(yanqin) maybe consider unordered_map
4211 Version* version = nullptr;
4212 VersionBuilder* builder = nullptr;
4213 for (int i = 0; i != static_cast<int>(versions.size()); ++i) {
4214 uint32_t cf_id = last_writer->cfd->GetID();
4215 if (versions[i]->cfd()->GetID() == cf_id) {
4216 version = versions[i];
4217 assert(!builder_guards.empty() &&
4218 builder_guards.size() == versions.size());
4219 builder = builder_guards[i]->version_builder();
4220 TEST_SYNC_POINT_CALLBACK(
4221 "VersionSet::ProcessManifestWrites:SameColumnFamily", &cf_id);
4222 break;
4223 }
4224 }
4225 if (version == nullptr) {
4226 // WAL manipulations do not need to be applied to versions.
4227 if (!last_writer->IsAllWalEdits()) {
4228 version = new Version(last_writer->cfd, this, file_options_,
4229 last_writer->mutable_cf_options, io_tracer_,
4230 current_version_number_++);
4231 versions.push_back(version);
4232 mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options);
4233 builder_guards.emplace_back(
4234 new BaseReferencedVersionBuilder(last_writer->cfd));
4235 builder = builder_guards.back()->version_builder();
4236 }
4237 assert(last_writer->IsAllWalEdits() || builder);
4238 assert(last_writer->IsAllWalEdits() || version);
4239 TEST_SYNC_POINT_CALLBACK("VersionSet::ProcessManifestWrites:NewVersion",
4240 version);
4241 }
4242 for (const auto& e : last_writer->edit_list) {
4243 if (e->is_in_atomic_group_) {
4244 if (batch_edits.empty() || !batch_edits.back()->is_in_atomic_group_ ||
4245 (batch_edits.back()->is_in_atomic_group_ &&
4246 batch_edits.back()->remaining_entries_ == 0)) {
4247 group_start = batch_edits.size();
4248 }
4249 } else if (group_start != std::numeric_limits<size_t>::max()) {
4250 group_start = std::numeric_limits<size_t>::max();
4251 }
4252 Status s = LogAndApplyHelper(last_writer->cfd, builder, e, mu);
4253 if (!s.ok()) {
4254 // free up the allocated memory
4255 for (auto v : versions) {
4256 delete v;
4257 }
4258 return s;
4259 }
4260 batch_edits.push_back(e);
4261 }
4262 }
4263 for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
4264 assert(!builder_guards.empty() &&
4265 builder_guards.size() == versions.size());
4266 auto* builder = builder_guards[i]->version_builder();
4267 Status s = builder->SaveTo(versions[i]->storage_info());
4268 if (!s.ok()) {
4269 // free up the allocated memory
4270 for (auto v : versions) {
4271 delete v;
4272 }
4273 return s;
4274 }
4275 }
4276 }
4277
4278 #ifndef NDEBUG
4279 // Verify that version edits of atomic groups have correct
4280 // remaining_entries_.
4281 size_t k = 0;
4282 while (k < batch_edits.size()) {
4283 while (k < batch_edits.size() && !batch_edits[k]->is_in_atomic_group_) {
4284 ++k;
4285 }
4286 if (k == batch_edits.size()) {
4287 break;
4288 }
4289 size_t i = k;
4290 while (i < batch_edits.size()) {
4291 if (!batch_edits[i]->is_in_atomic_group_) {
4292 break;
4293 }
4294 assert(i - k + batch_edits[i]->remaining_entries_ ==
4295 batch_edits[k]->remaining_entries_);
4296 if (batch_edits[i]->remaining_entries_ == 0) {
4297 ++i;
4298 break;
4299 }
4300 ++i;
4301 }
4302 assert(batch_edits[i - 1]->is_in_atomic_group_);
4303 assert(0 == batch_edits[i - 1]->remaining_entries_);
4304 std::vector<VersionEdit*> tmp;
4305 for (size_t j = k; j != i; ++j) {
4306 tmp.emplace_back(batch_edits[j]);
4307 }
4308 TEST_SYNC_POINT_CALLBACK(
4309 "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", &tmp);
4310 k = i;
4311 }
4312 #endif // NDEBUG
4313
4314 assert(pending_manifest_file_number_ == 0);
4315 if (!descriptor_log_ ||
4316 manifest_file_size_ > db_options_->max_manifest_file_size) {
4317 TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:BeforeNewManifest");
4318 new_descriptor_log = true;
4319 } else {
4320 pending_manifest_file_number_ = manifest_file_number_;
4321 }
4322
4323 // Local cached copy of state variable(s). WriteCurrentStateToManifest()
4324 // reads its content after releasing db mutex to avoid race with
4325 // SwitchMemtable().
4326 std::unordered_map<uint32_t, MutableCFState> curr_state;
4327 VersionEdit wal_additions;
4328 if (new_descriptor_log) {
4329 pending_manifest_file_number_ = NewFileNumber();
4330 batch_edits.back()->SetNextFile(next_file_number_.load());
4331
4332     // If we are writing out a new snapshot, make sure to persist the max
4333     // column family.
4334 if (column_family_set_->GetMaxColumnFamily() > 0) {
4335 first_writer.edit_list.front()->SetMaxColumnFamily(
4336 column_family_set_->GetMaxColumnFamily());
4337 }
4338 for (const auto* cfd : *column_family_set_) {
4339 assert(curr_state.find(cfd->GetID()) == curr_state.end());
4340 curr_state.emplace(std::make_pair(
4341 cfd->GetID(),
4342 MutableCFState(cfd->GetLogNumber(), cfd->GetFullHistoryTsLow())));
4343 }
4344
4345 for (const auto& wal : wals_.GetWals()) {
4346 wal_additions.AddWal(wal.first, wal.second);
4347 }
4348 }
4349
4350 uint64_t new_manifest_file_size = 0;
4351 Status s;
4352 IOStatus io_s;
4353 IOStatus manifest_io_status;
4354 {
4355 FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
4356 mu->Unlock();
4357 TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifestStart");
4358 TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WriteManifest", nullptr);
4359 if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
4360 for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
4361 assert(!builder_guards.empty() &&
4362 builder_guards.size() == versions.size());
4363 assert(!mutable_cf_options_ptrs.empty() &&
4364 builder_guards.size() == versions.size());
4365 ColumnFamilyData* cfd = versions[i]->cfd_;
4366 s = builder_guards[i]->version_builder()->LoadTableHandlers(
4367 cfd->internal_stats(), 1 /* max_threads */,
4368 true /* prefetch_index_and_filter_in_cache */,
4369 false /* is_initial_load */,
4370 mutable_cf_options_ptrs[i]->prefix_extractor.get(),
4371 MaxFileSizeForL0MetaPin(*mutable_cf_options_ptrs[i]));
4372 if (!s.ok()) {
4373 if (db_options_->paranoid_checks) {
4374 break;
4375 }
4376 s = Status::OK();
4377 }
4378 }
4379 }
4380
4381 if (s.ok() && new_descriptor_log) {
4382 // This is fine because everything inside of this block is serialized --
4383 // only one thread can be here at the same time
4384 // create new manifest file
4385 ROCKS_LOG_INFO(db_options_->info_log, "Creating manifest %" PRIu64 "\n",
4386 pending_manifest_file_number_);
4387 std::string descriptor_fname =
4388 DescriptorFileName(dbname_, pending_manifest_file_number_);
4389 std::unique_ptr<FSWritableFile> descriptor_file;
4390 io_s = NewWritableFile(fs_.get(), descriptor_fname, &descriptor_file,
4391 opt_file_opts);
4392 if (io_s.ok()) {
4393 descriptor_file->SetPreallocationBlockSize(
4394 db_options_->manifest_preallocation_size);
4395 FileTypeSet tmp_set = db_options_->checksum_handoff_file_types;
4396 std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
4397 std::move(descriptor_file), descriptor_fname, opt_file_opts, clock_,
4398 io_tracer_, nullptr, db_options_->listeners, nullptr,
4399 tmp_set.Contains(FileType::kDescriptorFile),
4400 tmp_set.Contains(FileType::kDescriptorFile)));
4401 descriptor_log_.reset(
4402 new log::Writer(std::move(file_writer), 0, false));
4403 s = WriteCurrentStateToManifest(curr_state, wal_additions,
4404 descriptor_log_.get(), io_s);
4405 } else {
4406 manifest_io_status = io_s;
4407 s = io_s;
4408 }
4409 }
4410
4411 if (s.ok()) {
4412 if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
4413 for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
4414 versions[i]->PrepareApply(*mutable_cf_options_ptrs[i], true);
4415 }
4416 }
4417
4418 // Write new records to MANIFEST log
4419 #ifndef NDEBUG
4420 size_t idx = 0;
4421 #endif
4422 for (auto& e : batch_edits) {
4423 std::string record;
4424 if (!e->EncodeTo(&record)) {
4425 s = Status::Corruption("Unable to encode VersionEdit:" +
4426 e->DebugString(true));
4427 break;
4428 }
4429 TEST_KILL_RANDOM_WITH_WEIGHT("VersionSet::LogAndApply:BeforeAddRecord",
4430 REDUCE_ODDS2);
4431 #ifndef NDEBUG
4432 if (batch_edits.size() > 1 && batch_edits.size() - 1 == idx) {
4433 TEST_SYNC_POINT_CALLBACK(
4434 "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
4435 nullptr);
4436 TEST_SYNC_POINT(
4437 "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:1");
4438 }
4439 ++idx;
4440 #endif /* !NDEBUG */
4441 io_s = descriptor_log_->AddRecord(record);
4442 if (!io_s.ok()) {
4443 s = io_s;
4444 manifest_io_status = io_s;
4445 break;
4446 }
4447 }
4448 if (s.ok()) {
4449 io_s = SyncManifest(db_options_, descriptor_log_->file());
4450 manifest_io_status = io_s;
4451 TEST_SYNC_POINT_CALLBACK(
4452 "VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s);
4453 }
4454 if (!io_s.ok()) {
4455 s = io_s;
4456 ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
4457 s.ToString().c_str());
4458 }
4459 }
4460
4461 // If we just created a new descriptor file, install it by writing a
4462 // new CURRENT file that points to it.
4463 if (s.ok()) {
4464 assert(manifest_io_status.ok());
4465 }
4466 if (s.ok() && new_descriptor_log) {
4467 io_s = SetCurrentFile(fs_.get(), dbname_, pending_manifest_file_number_,
4468 db_directory);
4469 if (!io_s.ok()) {
4470 s = io_s;
4471 }
4472 }
4473
4474 if (s.ok()) {
4475 // find offset in manifest file where this version is stored.
4476 new_manifest_file_size = descriptor_log_->file()->GetFileSize();
4477 }
4478
4479 if (first_writer.edit_list.front()->is_column_family_drop_) {
4480 TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:0");
4481 TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1");
4482 TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2");
4483 }
4484
4485 LogFlush(db_options_->info_log);
4486 TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifestDone");
4487 mu->Lock();
4488 }
4489
4490 if (s.ok()) {
4491 // Apply WAL edits, DB mutex must be held.
4492 for (auto& e : batch_edits) {
4493 if (e->IsWalAddition()) {
4494 s = wals_.AddWals(e->GetWalAdditions());
4495 } else if (e->IsWalDeletion()) {
4496 s = wals_.DeleteWalsBefore(e->GetWalDeletion().GetLogNumber());
4497 }
4498 if (!s.ok()) {
4499 break;
4500 }
4501 }
4502 }
4503
4504 if (!io_s.ok()) {
4505 if (io_status_.ok()) {
4506 io_status_ = io_s;
4507 }
4508 } else if (!io_status_.ok()) {
4509 io_status_ = io_s;
4510 }
4511
4512   // Append the old manifest file to the obsolete_manifests_ list so it can be
4513   // deleted by PurgeObsoleteFiles later.
4514 if (s.ok() && new_descriptor_log) {
4515 obsolete_manifests_.emplace_back(
4516 DescriptorFileName("", manifest_file_number_));
4517 }
4518
4519 // Install the new versions
4520 if (s.ok()) {
4521 if (first_writer.edit_list.front()->is_column_family_add_) {
4522 assert(batch_edits.size() == 1);
4523 assert(new_cf_options != nullptr);
4524 CreateColumnFamily(*new_cf_options, first_writer.edit_list.front());
4525 } else if (first_writer.edit_list.front()->is_column_family_drop_) {
4526 assert(batch_edits.size() == 1);
4527 first_writer.cfd->SetDropped();
4528 first_writer.cfd->UnrefAndTryDelete();
4529 } else {
4530 // Each version in versions corresponds to a column family.
4531 // For each column family, update its log number indicating that logs
4532 // with number smaller than this should be ignored.
4533 uint64_t last_min_log_number_to_keep = 0;
4534 for (const auto& e : batch_edits) {
4535 ColumnFamilyData* cfd = nullptr;
4536 if (!e->IsColumnFamilyManipulation()) {
4537 cfd = column_family_set_->GetColumnFamily(e->column_family_);
4538 // e would not have been added to batch_edits if its corresponding
4539 // column family is dropped.
4540 assert(cfd);
4541 }
4542 if (cfd) {
4543 if (e->has_log_number_ && e->log_number_ > cfd->GetLogNumber()) {
4544 cfd->SetLogNumber(e->log_number_);
4545 }
4546 if (e->HasFullHistoryTsLow()) {
4547 cfd->SetFullHistoryTsLow(e->GetFullHistoryTsLow());
4548 }
4549 }
4550 if (e->has_min_log_number_to_keep_) {
4551 last_min_log_number_to_keep =
4552 std::max(last_min_log_number_to_keep, e->min_log_number_to_keep_);
4553 }
4554 }
4555
4556 if (last_min_log_number_to_keep != 0) {
4557 // Should only be set in 2PC mode.
4558 MarkMinLogNumberToKeep2PC(last_min_log_number_to_keep);
4559 }
4560
4561 for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
4562 ColumnFamilyData* cfd = versions[i]->cfd_;
4563 AppendVersion(cfd, versions[i]);
4564 }
4565 }
4566 manifest_file_number_ = pending_manifest_file_number_;
4567 manifest_file_size_ = new_manifest_file_size;
4568 prev_log_number_ = first_writer.edit_list.front()->prev_log_number_;
4569 } else {
4570 std::string version_edits;
4571 for (auto& e : batch_edits) {
4572 version_edits += ("\n" + e->DebugString(true));
4573 }
4574 ROCKS_LOG_ERROR(db_options_->info_log,
4575 "Error in committing version edit to MANIFEST: %s",
4576 version_edits.c_str());
4577 for (auto v : versions) {
4578 delete v;
4579 }
4580 if (manifest_io_status.ok()) {
4581 manifest_file_number_ = pending_manifest_file_number_;
4582 manifest_file_size_ = new_manifest_file_size;
4583 }
4584 // If manifest append failed for whatever reason, the file could be
4585 // corrupted. So we need to force the next version update to start a
4586 // new manifest file.
4587 descriptor_log_.reset();
4588 // If manifest operations failed, then we know the CURRENT file still
4589 // points to the original MANIFEST. Therefore, we can safely delete the
4590 // new MANIFEST.
4591 // If manifest operations succeeded, and we are here, then it is possible
4592 // that renaming tmp file to CURRENT failed.
4593 //
4594 // On local POSIX-compliant FS, the CURRENT must point to the original
4595 // MANIFEST. We can delete the new MANIFEST for simplicity, but we can also
4596 // keep it. Future recovery will ignore this MANIFEST. It's also ok for the
4597 // process not to crash and continue using the db. Any future LogAndApply()
4598 // call will switch to a new MANIFEST and update CURRENT, still ignoring
4599 // this one.
4600 //
4601 // On non-local FS, it is
4602 // possible that the rename operation succeeded on the server (remote)
4603 // side, but the client somehow returns a non-ok status to RocksDB. Note
4604 // that this does not violate atomicity. Should we delete the new MANIFEST
4605 // successfully, a subsequent recovery attempt will likely see the CURRENT
4606 // pointing to the new MANIFEST, thus fail. We will not be able to open the
4607     // DB again. Therefore, if manifest operations succeed, we should keep the
4608     // new MANIFEST. If the process proceeds, any future LogAndApply() call
4609     // will switch to a new MANIFEST and update CURRENT. If the user tries to
4610     // re-open the DB,
4611 // a) CURRENT points to the new MANIFEST, and the new MANIFEST is present.
4612 // b) CURRENT points to the original MANIFEST, and the original MANIFEST
4613 // also exists.
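    // Illustrative scenario (hypothetical file numbers, not exhaustive):
    // suppose CURRENT names MANIFEST-000007 and this write created
    // MANIFEST-000012. If AddRecord/Sync on MANIFEST-000012 failed,
    // manifest_io_status is not ok and CURRENT still names MANIFEST-000007,
    // so deleting MANIFEST-000012 below is safe. If the MANIFEST writes
    // succeeded but updating CURRENT reported an error, CURRENT may name
    // either file depending on the file system, so MANIFEST-000012 is kept.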
4614 if (new_descriptor_log && !manifest_io_status.ok()) {
4615 ROCKS_LOG_INFO(db_options_->info_log,
4616 "Deleting manifest %" PRIu64 " current manifest %" PRIu64
4617 "\n",
4618 pending_manifest_file_number_, manifest_file_number_);
4619 Status manifest_del_status = env_->DeleteFile(
4620 DescriptorFileName(dbname_, pending_manifest_file_number_));
4621 if (!manifest_del_status.ok()) {
4622 ROCKS_LOG_WARN(db_options_->info_log,
4623 "Failed to delete manifest %" PRIu64 ": %s",
4624 pending_manifest_file_number_,
4625 manifest_del_status.ToString().c_str());
4626 }
4627 }
4628 }
4629
4630 pending_manifest_file_number_ = 0;
4631
4632 // wake up all the waiting writers
4633 while (true) {
4634 ManifestWriter* ready = manifest_writers_.front();
4635 manifest_writers_.pop_front();
4636 bool need_signal = true;
4637 for (const auto& w : writers) {
4638 if (&w == ready) {
4639 need_signal = false;
4640 break;
4641 }
4642 }
4643 ready->status = s;
4644 ready->done = true;
4645 if (ready->manifest_write_callback) {
4646 (ready->manifest_write_callback)(s);
4647 }
4648 if (need_signal) {
4649 ready->cv.Signal();
4650 }
4651 if (ready == last_writer) {
4652 break;
4653 }
4654 }
4655 if (!manifest_writers_.empty()) {
4656 manifest_writers_.front()->cv.Signal();
4657 }
4658 return s;
4659 }
4660
4661 void VersionSet::WakeUpWaitingManifestWriters() {
4662   // Wake up the writer at the head of the manifest write queue (if any);
4663   // it will process its group and notify the rest when its write is done.
4664 if (!manifest_writers_.empty()) {
4665 manifest_writers_.front()->cv.Signal();
4666 }
4667 }
4668
4669 // 'datas' is grammatically incorrect. We still use this notation to indicate
4670 // that this variable represents a collection of column_family_data.
4671 Status VersionSet::LogAndApply(
4672 const autovector<ColumnFamilyData*>& column_family_datas,
4673 const autovector<const MutableCFOptions*>& mutable_cf_options_list,
4674 const autovector<autovector<VersionEdit*>>& edit_lists,
4675 InstrumentedMutex* mu, FSDirectory* db_directory, bool new_descriptor_log,
4676 const ColumnFamilyOptions* new_cf_options,
4677 const std::vector<std::function<void(const Status&)>>& manifest_wcbs) {
4678 mu->AssertHeld();
4679 int num_edits = 0;
4680 for (const auto& elist : edit_lists) {
4681 num_edits += static_cast<int>(elist.size());
4682 }
4683 if (num_edits == 0) {
4684 return Status::OK();
4685 } else if (num_edits > 1) {
4686 #ifndef NDEBUG
4687 for (const auto& edit_list : edit_lists) {
4688 for (const auto& edit : edit_list) {
4689 assert(!edit->IsColumnFamilyManipulation());
4690 }
4691 }
4692 #endif /* ! NDEBUG */
4693 }
4694
4695 int num_cfds = static_cast<int>(column_family_datas.size());
4696 if (num_cfds == 1 && column_family_datas[0] == nullptr) {
4697 assert(edit_lists.size() == 1 && edit_lists[0].size() == 1);
4698 assert(edit_lists[0][0]->is_column_family_add_);
4699 assert(new_cf_options != nullptr);
4700 }
4701 std::deque<ManifestWriter> writers;
4702 if (num_cfds > 0) {
4703 assert(static_cast<size_t>(num_cfds) == mutable_cf_options_list.size());
4704 assert(static_cast<size_t>(num_cfds) == edit_lists.size());
4705 }
4706 for (int i = 0; i < num_cfds; ++i) {
4707 const auto wcb =
4708 manifest_wcbs.empty() ? [](const Status&) {} : manifest_wcbs[i];
4709 writers.emplace_back(mu, column_family_datas[i],
4710 *mutable_cf_options_list[i], edit_lists[i], wcb);
4711 manifest_writers_.push_back(&writers[i]);
4712 }
4713 assert(!writers.empty());
4714 ManifestWriter& first_writer = writers.front();
4715 TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:BeforeWriterWaiting",
4716 nullptr);
4717 while (!first_writer.done && &first_writer != manifest_writers_.front()) {
4718 first_writer.cv.Wait();
4719 }
4720 if (first_writer.done) {
4721 // All non-CF-manipulation operations can be grouped together and committed
4722 // to MANIFEST. They should all have finished. The status code is stored in
4723 // the first manifest writer.
4724 #ifndef NDEBUG
4725 for (const auto& writer : writers) {
4726 assert(writer.done);
4727 }
4728 TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WakeUpAndDone", mu);
4729 #endif /* !NDEBUG */
4730 return first_writer.status;
4731 }
4732
4733 int num_undropped_cfds = 0;
4734 for (auto cfd : column_family_datas) {
4735 // if cfd == nullptr, it is a column family add.
4736 if (cfd == nullptr || !cfd->IsDropped()) {
4737 ++num_undropped_cfds;
4738 }
4739 }
4740 if (0 == num_undropped_cfds) {
4741 for (int i = 0; i != num_cfds; ++i) {
4742 manifest_writers_.pop_front();
4743 }
4744 // Notify new head of manifest write queue.
4745 if (!manifest_writers_.empty()) {
4746 manifest_writers_.front()->cv.Signal();
4747 }
4748 return Status::ColumnFamilyDropped();
4749 }
4750
4751 return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log,
4752 new_cf_options);
4753 }
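// A minimal, illustrative sketch of how a caller might drive the overload
// above for one column family; `cfd`, `mutable_opts`, `edit`, `db_dir` and
// `versions` are placeholder names, not identifiers from this file:
//
//   autovector<ColumnFamilyData*> cfds;
//   cfds.push_back(cfd);
//   autovector<const MutableCFOptions*> opts;
//   opts.push_back(&mutable_opts);
//   autovector<VersionEdit*> edit_list;
//   edit_list.push_back(&edit);
//   autovector<autovector<VersionEdit*>> edit_lists;
//   edit_lists.push_back(edit_list);
//   mu->Lock();
//   Status s = versions->LogAndApply(cfds, opts, edit_lists, mu, db_dir,
//                                    /*new_descriptor_log=*/false,
//                                    /*new_cf_options=*/nullptr,
//                                    /*manifest_wcbs=*/{});
//   mu->Unlock();
//
// The call returns once this writer's edits have been committed to the
// MANIFEST, either by this thread or by the thread at the head of the
// manifest writer group.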
4754
4755 void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
4756 assert(edit->IsColumnFamilyManipulation());
4757 edit->SetNextFile(next_file_number_.load());
4758   // The log might have data that is not visible to the memtable and hence has
4759   // not updated last_sequence_ yet. It is also possible that the log is
4760   // expecting some new data that has not been written yet. Since LastSequence
4761   // is an upper bound on the sequence, it is ok to record
4762   // last_allocated_sequence_ as the last sequence.
4763 edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
4764 : last_sequence_);
4765 if (edit->is_column_family_drop_) {
4766     // If we drop a column family, we have to make sure to save the max column
4767     // family, so that we don't reuse an existing ID.
4768 edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
4769 }
4770 }
4771
4772 Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
4773 VersionBuilder* builder, VersionEdit* edit,
4774 InstrumentedMutex* mu) {
4775 #ifdef NDEBUG
4776 (void)cfd;
4777 #endif
4778 mu->AssertHeld();
4779 assert(!edit->IsColumnFamilyManipulation());
4780
4781 if (edit->has_log_number_) {
4782 assert(edit->log_number_ >= cfd->GetLogNumber());
4783 assert(edit->log_number_ < next_file_number_.load());
4784 }
4785
4786 if (!edit->has_prev_log_number_) {
4787 edit->SetPrevLogNumber(prev_log_number_);
4788 }
4789 edit->SetNextFile(next_file_number_.load());
4790   // The log might have data that is not visible to the memtable and hence has
4791   // not updated last_sequence_ yet. It is also possible that the log is
4792   // expecting some new data that has not been written yet. Since LastSequence
4793   // is an upper bound on the sequence, it is ok to record
4794   // last_allocated_sequence_ as the last sequence.
4795 edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
4796 : last_sequence_);
4797
4798   // The builder can be nullptr only if the edit is a WAL manipulation.
4799   // Because WAL edits do not need to be applied to versions,
4800   // we return Status::OK() in this case.
4801 assert(builder || edit->IsWalManipulation());
4802 return builder ? builder->Apply(edit) : Status::OK();
4803 }
4804
4805 Status VersionSet::GetCurrentManifestPath(const std::string& dbname,
4806 FileSystem* fs,
4807 std::string* manifest_path,
4808 uint64_t* manifest_file_number) {
4809 assert(fs != nullptr);
4810 assert(manifest_path != nullptr);
4811 assert(manifest_file_number != nullptr);
4812
4813 std::string fname;
4814 Status s = ReadFileToString(fs, CurrentFileName(dbname), &fname);
4815 if (!s.ok()) {
4816 return s;
4817 }
4818 if (fname.empty() || fname.back() != '\n') {
4819 return Status::Corruption("CURRENT file does not end with newline");
4820 }
4821 // remove the trailing '\n'
4822 fname.resize(fname.size() - 1);
4823 FileType type;
4824 bool parse_ok = ParseFileName(fname, manifest_file_number, &type);
4825 if (!parse_ok || type != kDescriptorFile) {
4826 return Status::Corruption("CURRENT file corrupted");
4827 }
4828 *manifest_path = dbname;
4829 if (dbname.back() != '/') {
4830 manifest_path->push_back('/');
4831 }
4832 manifest_path->append(fname);
4833 return Status::OK();
4834 }
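// For illustration (hypothetical values): if dbname is "/tmp/db" and the
// CURRENT file contains the single line "MANIFEST-000005\n", the function
// above yields *manifest_path == "/tmp/db/MANIFEST-000005" and
// *manifest_file_number == 5.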
4835
4836 Status VersionSet::Recover(
4837 const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
4838 std::string* db_id) {
4839 // Read "CURRENT" file, which contains a pointer to the current manifest file
4840 std::string manifest_path;
4841 Status s = GetCurrentManifestPath(dbname_, fs_.get(), &manifest_path,
4842 &manifest_file_number_);
4843 if (!s.ok()) {
4844 return s;
4845 }
4846
4847 ROCKS_LOG_INFO(db_options_->info_log, "Recovering from manifest file: %s\n",
4848 manifest_path.c_str());
4849
4850 std::unique_ptr<SequentialFileReader> manifest_file_reader;
4851 {
4852 std::unique_ptr<FSSequentialFile> manifest_file;
4853 s = fs_->NewSequentialFile(manifest_path,
4854 fs_->OptimizeForManifestRead(file_options_),
4855 &manifest_file, nullptr);
4856 if (!s.ok()) {
4857 return s;
4858 }
4859 manifest_file_reader.reset(new SequentialFileReader(
4860 std::move(manifest_file), manifest_path,
4861 db_options_->log_readahead_size, io_tracer_, db_options_->listeners));
4862 }
4863 uint64_t current_manifest_file_size = 0;
4864 uint64_t log_number = 0;
4865 {
4866 VersionSet::LogReporter reporter;
4867 Status log_read_status;
4868 reporter.status = &log_read_status;
4869 log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
4870 true /* checksum */, 0 /* log_number */);
4871 VersionEditHandler handler(read_only, column_families,
4872 const_cast<VersionSet*>(this),
4873 /*track_missing_files=*/false,
4874 /*no_error_if_files_missing=*/false, io_tracer_);
4875 handler.Iterate(reader, &log_read_status);
4876 s = handler.status();
4877 if (s.ok()) {
4878 log_number = handler.GetVersionEditParams().log_number_;
4879 current_manifest_file_size = reader.GetReadOffset();
4880 assert(current_manifest_file_size != 0);
4881 handler.GetDbId(db_id);
4882 }
4883 }
4884
4885 if (s.ok()) {
4886 manifest_file_size_ = current_manifest_file_size;
4887 ROCKS_LOG_INFO(
4888 db_options_->info_log,
4889 "Recovered from manifest file:%s succeeded,"
4890 "manifest_file_number is %" PRIu64 ", next_file_number is %" PRIu64
4891 ", last_sequence is %" PRIu64 ", log_number is %" PRIu64
4892 ",prev_log_number is %" PRIu64 ",max_column_family is %" PRIu32
4893 ",min_log_number_to_keep is %" PRIu64 "\n",
4894 manifest_path.c_str(), manifest_file_number_, next_file_number_.load(),
4895 last_sequence_.load(), log_number, prev_log_number_,
4896 column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc());
4897
4898 for (auto cfd : *column_family_set_) {
4899 if (cfd->IsDropped()) {
4900 continue;
4901 }
4902 ROCKS_LOG_INFO(db_options_->info_log,
4903 "Column family [%s] (ID %" PRIu32
4904 "), log number is %" PRIu64 "\n",
4905 cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
4906 }
4907 }
4908
4909 return s;
4910 }
4911
4912 namespace {
4913 class ManifestPicker {
4914 public:
4915 explicit ManifestPicker(const std::string& dbname,
4916 const std::vector<std::string>& files_in_dbname);
4917 // REQUIRES Valid() == true
4918 std::string GetNextManifest(uint64_t* file_number, std::string* file_name);
4919   bool Valid() const { return manifest_file_iter_ != manifest_files_.end(); }
4920
4921 private:
4922 const std::string& dbname_;
4923   // MANIFEST file name(s)
4924 std::vector<std::string> manifest_files_;
4925 std::vector<std::string>::const_iterator manifest_file_iter_;
4926 };
4927
4928 ManifestPicker::ManifestPicker(const std::string& dbname,
4929 const std::vector<std::string>& files_in_dbname)
4930 : dbname_(dbname) {
4931 // populate manifest files
4932 assert(!files_in_dbname.empty());
4933 for (const auto& fname : files_in_dbname) {
4934 uint64_t file_num = 0;
4935 FileType file_type;
4936 bool parse_ok = ParseFileName(fname, &file_num, &file_type);
4937 if (parse_ok && file_type == kDescriptorFile) {
4938 manifest_files_.push_back(fname);
4939 }
4940 }
4941 // seek to first manifest
4942 std::sort(manifest_files_.begin(), manifest_files_.end(),
4943 [](const std::string& lhs, const std::string& rhs) {
4944 uint64_t num1 = 0;
4945 uint64_t num2 = 0;
4946 FileType type1;
4947 FileType type2;
4948 bool parse_ok1 = ParseFileName(lhs, &num1, &type1);
4949 bool parse_ok2 = ParseFileName(rhs, &num2, &type2);
4950 #ifndef NDEBUG
4951 assert(parse_ok1);
4952 assert(parse_ok2);
4953 #else
4954 (void)parse_ok1;
4955 (void)parse_ok2;
4956 #endif
4957 return num1 > num2;
4958 });
4959 manifest_file_iter_ = manifest_files_.begin();
4960 }
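// For example (hypothetical file names): given {"MANIFEST-000005",
// "MANIFEST-000012", "OPTIONS-000009"}, only the MANIFEST files are kept and
// they are visited newest first: "MANIFEST-000012", then "MANIFEST-000005".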
4961
4962 std::string ManifestPicker::GetNextManifest(uint64_t* number,
4963 std::string* file_name) {
4964 assert(Valid());
4965 std::string ret;
4966 if (manifest_file_iter_ != manifest_files_.end()) {
4967 ret.assign(dbname_);
4968 if (ret.back() != kFilePathSeparator) {
4969 ret.push_back(kFilePathSeparator);
4970 }
4971 ret.append(*manifest_file_iter_);
4972 if (number) {
4973 FileType type;
4974 bool parse = ParseFileName(*manifest_file_iter_, number, &type);
4975 assert(type == kDescriptorFile);
4976 #ifndef NDEBUG
4977 assert(parse);
4978 #else
4979 (void)parse;
4980 #endif
4981 }
4982 if (file_name) {
4983 *file_name = *manifest_file_iter_;
4984 }
4985 ++manifest_file_iter_;
4986 }
4987 return ret;
4988 }
4989 } // namespace
4990
4991 Status VersionSet::TryRecover(
4992 const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
4993 const std::vector<std::string>& files_in_dbname, std::string* db_id,
4994 bool* has_missing_table_file) {
4995 ManifestPicker manifest_picker(dbname_, files_in_dbname);
4996 if (!manifest_picker.Valid()) {
4997 return Status::Corruption("Cannot locate MANIFEST file in " + dbname_);
4998 }
4999 Status s;
5000 std::string manifest_path =
5001 manifest_picker.GetNextManifest(&manifest_file_number_, nullptr);
5002 while (!manifest_path.empty()) {
5003 s = TryRecoverFromOneManifest(manifest_path, column_families, read_only,
5004 db_id, has_missing_table_file);
5005 if (s.ok() || !manifest_picker.Valid()) {
5006 break;
5007 }
5008 Reset();
5009 manifest_path =
5010 manifest_picker.GetNextManifest(&manifest_file_number_, nullptr);
5011 }
5012 return s;
5013 }
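// For illustration (hypothetical file numbers): if recovery from
// MANIFEST-000012 fails, the in-memory state is Reset() and recovery is
// retried from the next-newest manifest, e.g. MANIFEST-000005, until one
// succeeds or no candidates remain.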
5014
5015 Status VersionSet::TryRecoverFromOneManifest(
5016 const std::string& manifest_path,
5017 const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
5018 std::string* db_id, bool* has_missing_table_file) {
5019 ROCKS_LOG_INFO(db_options_->info_log, "Trying to recover from manifest: %s\n",
5020 manifest_path.c_str());
5021 std::unique_ptr<SequentialFileReader> manifest_file_reader;
5022 Status s;
5023 {
5024 std::unique_ptr<FSSequentialFile> manifest_file;
5025 s = fs_->NewSequentialFile(manifest_path,
5026 fs_->OptimizeForManifestRead(file_options_),
5027 &manifest_file, nullptr);
5028 if (!s.ok()) {
5029 return s;
5030 }
5031 manifest_file_reader.reset(new SequentialFileReader(
5032 std::move(manifest_file), manifest_path,
5033 db_options_->log_readahead_size, io_tracer_, db_options_->listeners));
5034 }
5035
5036 assert(s.ok());
5037 VersionSet::LogReporter reporter;
5038 reporter.status = &s;
5039 log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
5040 /*checksum=*/true, /*log_num=*/0);
5041 VersionEditHandlerPointInTime handler_pit(
5042 read_only, column_families, const_cast<VersionSet*>(this), io_tracer_);
5043
5044 handler_pit.Iterate(reader, &s);
5045
5046 handler_pit.GetDbId(db_id);
5047
5048 assert(nullptr != has_missing_table_file);
5049 *has_missing_table_file = handler_pit.HasMissingFiles();
5050
5051 return handler_pit.status();
5052 }
5053
5054 Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
5055 const std::string& dbname,
5056 FileSystem* fs) {
5057 // these are just for performance reasons, not correctness,
5058 // so we're fine using the defaults
5059 FileOptions soptions;
5060 // Read "CURRENT" file, which contains a pointer to the current manifest file
5061 std::string manifest_path;
5062 uint64_t manifest_file_number;
5063 Status s =
5064 GetCurrentManifestPath(dbname, fs, &manifest_path, &manifest_file_number);
5065 if (!s.ok()) {
5066 return s;
5067 }
5068
5069 std::unique_ptr<SequentialFileReader> file_reader;
5070 {
5071 std::unique_ptr<FSSequentialFile> file;
5072 s = fs->NewSequentialFile(manifest_path, soptions, &file, nullptr);
5073 if (!s.ok()) {
5074 return s;
5075 }
5076 file_reader.reset(new SequentialFileReader(std::move(file), manifest_path,
5077 nullptr /*IOTracer*/));
5078 }
5079
5080 VersionSet::LogReporter reporter;
5081 reporter.status = &s;
5082 log::Reader reader(nullptr, std::move(file_reader), &reporter,
5083 true /* checksum */, 0 /* log_number */);
5084
5085 ListColumnFamiliesHandler handler;
5086 handler.Iterate(reader, &s);
5087
5088 assert(column_families);
5089 column_families->clear();
5090 if (handler.status().ok()) {
5091 for (const auto& iter : handler.GetColumnFamilyNames()) {
5092 column_families->push_back(iter.second);
5093 }
5094 }
5095
5096 return handler.status();
5097 }
5098
5099 #ifndef ROCKSDB_LITE
5100 Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
5101 const Options* options,
5102 const FileOptions& file_options,
5103 int new_levels) {
5104 if (new_levels <= 1) {
5105 return Status::InvalidArgument(
5106 "Number of levels needs to be bigger than 1");
5107 }
5108
5109 ImmutableDBOptions db_options(*options);
5110 ColumnFamilyOptions cf_options(*options);
5111 std::shared_ptr<Cache> tc(NewLRUCache(options->max_open_files - 10,
5112 options->table_cache_numshardbits));
5113 WriteController wc(options->delayed_write_rate);
5114 WriteBufferManager wb(options->db_write_buffer_size);
5115 VersionSet versions(dbname, &db_options, file_options, tc.get(), &wb, &wc,
5116 nullptr /*BlockCacheTracer*/, nullptr /*IOTracer*/,
5117 /*db_session_id*/ "");
5118 Status status;
5119
5120 std::vector<ColumnFamilyDescriptor> dummy;
5121 ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName,
5122 ColumnFamilyOptions(*options));
5123 dummy.push_back(dummy_descriptor);
5124 status = versions.Recover(dummy);
5125 if (!status.ok()) {
5126 return status;
5127 }
5128
5129 Version* current_version =
5130 versions.GetColumnFamilySet()->GetDefault()->current();
5131 auto* vstorage = current_version->storage_info();
5132 int current_levels = vstorage->num_levels();
5133
5134 if (current_levels <= new_levels) {
5135 return Status::OK();
5136 }
5137
5138   // Make sure there are files on only one level from
5139   // (new_levels-1) to (current_levels-1).
5140 int first_nonempty_level = -1;
5141 int first_nonempty_level_filenum = 0;
5142 for (int i = new_levels - 1; i < current_levels; i++) {
5143 int file_num = vstorage->NumLevelFiles(i);
5144 if (file_num != 0) {
5145 if (first_nonempty_level < 0) {
5146 first_nonempty_level = i;
5147 first_nonempty_level_filenum = file_num;
5148 } else {
5149 char msg[255];
5150 snprintf(msg, sizeof(msg),
5151 "Found at least two levels containing files: "
5152 "[%d:%d],[%d:%d].\n",
5153 first_nonempty_level, first_nonempty_level_filenum, i,
5154 file_num);
5155 return Status::InvalidArgument(msg);
5156 }
5157 }
5158 }
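  // A worked example (hypothetical numbers): with current_levels == 7 and
  // new_levels == 3, levels 2..6 may contain files on at most one level, say
  // level 5. Levels 0 and 1 are copied as-is below, the level-5 files become
  // the new level 2 (new_levels - 1), and all higher levels of the new array
  // stay empty.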
5159
5160   // We need to allocate an array sized to the old number of levels to
5161   // avoid a SIGSEGV in WriteCurrentStateToManifest();
5162   // however, all levels greater than or equal to new_levels will be empty.
5163 std::vector<FileMetaData*>* new_files_list =
5164 new std::vector<FileMetaData*>[current_levels];
5165 for (int i = 0; i < new_levels - 1; i++) {
5166 new_files_list[i] = vstorage->LevelFiles(i);
5167 }
5168
5169 if (first_nonempty_level > 0) {
5170 auto& new_last_level = new_files_list[new_levels - 1];
5171
5172 new_last_level = vstorage->LevelFiles(first_nonempty_level);
5173
5174 for (size_t i = 0; i < new_last_level.size(); ++i) {
5175 const FileMetaData* const meta = new_last_level[i];
5176 assert(meta);
5177
5178 const uint64_t file_number = meta->fd.GetNumber();
5179
5180 vstorage->file_locations_[file_number] =
5181 VersionStorageInfo::FileLocation(new_levels - 1, i);
5182 }
5183 }
5184
5185   delete[] vstorage->files_;
5186 vstorage->files_ = new_files_list;
5187 vstorage->num_levels_ = new_levels;
5188
5189 MutableCFOptions mutable_cf_options(*options);
5190 VersionEdit ve;
5191 InstrumentedMutex dummy_mutex;
5192 InstrumentedMutexLock l(&dummy_mutex);
5193 return versions.LogAndApply(
5194 versions.GetColumnFamilySet()->GetDefault(),
5195 mutable_cf_options, &ve, &dummy_mutex, nullptr, true);
5196 }
5197
5198 // Get the checksum information including the checksum and checksum function
5199 // name of all SST and blob files in VersionSet. Store the information in
5200 // FileChecksumList which contains a map from file number to its checksum info.
5201 // If the DB is not running, make sure to call VersionSet::Recover() to load
5202 // the file metadata from the MANIFEST into the VersionSet before calling this function.
5203 Status VersionSet::GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) {
5204 // Clean the previously stored checksum information if any.
5205 Status s;
5206 if (checksum_list == nullptr) {
5207 s = Status::InvalidArgument("checksum_list is nullptr");
5208 return s;
5209 }
5210 checksum_list->reset();
5211
5212 for (auto cfd : *column_family_set_) {
5213 if (cfd->IsDropped() || !cfd->initialized()) {
5214 continue;
5215 }
5216 /* SST files */
5217 for (int level = 0; level < cfd->NumberLevels(); level++) {
5218 for (const auto& file :
5219 cfd->current()->storage_info()->LevelFiles(level)) {
5220 s = checksum_list->InsertOneFileChecksum(file->fd.GetNumber(),
5221 file->file_checksum,
5222 file->file_checksum_func_name);
5223 if (!s.ok()) {
5224 return s;
5225 }
5226 }
5227 }
5228
5229 /* Blob files */
5230 const auto& blob_files = cfd->current()->storage_info()->GetBlobFiles();
5231 for (const auto& pair : blob_files) {
5232 const uint64_t blob_file_number = pair.first;
5233 const auto& meta = pair.second;
5234
5235 assert(meta);
5236 assert(blob_file_number == meta->GetBlobFileNumber());
5237
5238 std::string checksum_value = meta->GetChecksumValue();
5239 std::string checksum_method = meta->GetChecksumMethod();
5240 assert(checksum_value.empty() == checksum_method.empty());
5241 if (meta->GetChecksumMethod().empty()) {
5242 checksum_value = kUnknownFileChecksum;
5243 checksum_method = kUnknownFileChecksumFuncName;
5244 }
5245
5246 s = checksum_list->InsertOneFileChecksum(blob_file_number, checksum_value,
5247 checksum_method);
5248 if (!s.ok()) {
5249 return s;
5250 }
5251 }
5252 }
5253
5254 return s;
5255 }
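// A minimal usage sketch (illustrative only; `version_set` is assumed to be a
// VersionSet that has already been recovered):
//
//   std::unique_ptr<FileChecksumList> checksum_list(NewFileChecksumList());
//   Status s = version_set->GetLiveFilesChecksumInfo(checksum_list.get());
//   if (s.ok()) {
//     // checksum_list now maps every live SST and blob file number to its
//     // checksum value and checksum function name.
//   }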
5256
5257 Status VersionSet::DumpManifest(Options& options, std::string& dscname,
5258 bool verbose, bool hex, bool json) {
5259 // Open the specified manifest file.
5260 std::unique_ptr<SequentialFileReader> file_reader;
5261 Status s;
5262 {
5263 std::unique_ptr<FSSequentialFile> file;
5264 const std::shared_ptr<FileSystem>& fs = options.env->GetFileSystem();
5265 s = fs->NewSequentialFile(
5266 dscname,
5267 fs->OptimizeForManifestRead(file_options_), &file,
5268 nullptr);
5269 if (!s.ok()) {
5270 return s;
5271 }
5272 file_reader.reset(new SequentialFileReader(
5273 std::move(file), dscname, db_options_->log_readahead_size, io_tracer_));
5274 }
5275
5276 std::vector<ColumnFamilyDescriptor> column_families(
5277 1, ColumnFamilyDescriptor(kDefaultColumnFamilyName, options));
5278 DumpManifestHandler handler(column_families, this, io_tracer_, verbose, hex,
5279 json);
5280 {
5281 VersionSet::LogReporter reporter;
5282 reporter.status = &s;
5283 log::Reader reader(nullptr, std::move(file_reader), &reporter,
5284 true /* checksum */, 0 /* log_number */);
5285 handler.Iterate(reader, &s);
5286 }
5287
5288 return handler.status();
5289 }
5290 #endif // ROCKSDB_LITE
5291
5292 void VersionSet::MarkFileNumberUsed(uint64_t number) {
5293 // only called during recovery and repair which are single threaded, so this
5294 // works because there can't be concurrent calls
5295 if (next_file_number_.load(std::memory_order_relaxed) <= number) {
5296 next_file_number_.store(number + 1, std::memory_order_relaxed);
5297 }
5298 }
5299 // Called either from ::LogAndApply, which is protected by the DB mutex, or
5300 // during recovery, which is single-threaded.
5301 void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
5302 if (min_log_number_to_keep_2pc_.load(std::memory_order_relaxed) < number) {
5303 min_log_number_to_keep_2pc_.store(number, std::memory_order_relaxed);
5304 }
5305 }
5306
5307 Status VersionSet::WriteCurrentStateToManifest(
5308 const std::unordered_map<uint32_t, MutableCFState>& curr_state,
5309 const VersionEdit& wal_additions, log::Writer* log, IOStatus& io_s) {
5310 // TODO: Break up into multiple records to reduce memory usage on recovery?
5311
5312 // WARNING: This method doesn't hold a mutex!!
5313
5314 // This is done without DB mutex lock held, but only within single-threaded
5315 // LogAndApply. Column family manipulations can only happen within LogAndApply
5316 // (the same single thread), so we're safe to iterate.
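  // Informally, the snapshot written below is a sequence of VersionEdit
  // records of roughly the following shape:
  //
  //   [db_id]                  (only if write_dbid_to_manifest is set)
  //   [WAL additions]          (only if any WALs are tracked)
  //   for each live column family:
  //     [CF registration (non-default CFs) + comparator name]
  //     [all SST and blob files, log number, full_history_ts_low, and, for
  //      the default CF only, min_log_number_to_keep]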
5317
5318 assert(io_s.ok());
5319 if (db_options_->write_dbid_to_manifest) {
5320 VersionEdit edit_for_db_id;
5321 assert(!db_id_.empty());
5322 edit_for_db_id.SetDBId(db_id_);
5323 std::string db_id_record;
5324 if (!edit_for_db_id.EncodeTo(&db_id_record)) {
5325 return Status::Corruption("Unable to Encode VersionEdit:" +
5326 edit_for_db_id.DebugString(true));
5327 }
5328 io_s = log->AddRecord(db_id_record);
5329 if (!io_s.ok()) {
5330 return io_s;
5331 }
5332 }
5333
5334 // Save WALs.
5335 if (!wal_additions.GetWalAdditions().empty()) {
5336 TEST_SYNC_POINT_CALLBACK("VersionSet::WriteCurrentStateToManifest:SaveWal",
5337 const_cast<VersionEdit*>(&wal_additions));
5338 std::string record;
5339 if (!wal_additions.EncodeTo(&record)) {
5340 return Status::Corruption("Unable to Encode VersionEdit: " +
5341 wal_additions.DebugString(true));
5342 }
5343 io_s = log->AddRecord(record);
5344 if (!io_s.ok()) {
5345 return io_s;
5346 }
5347 }
5348
5349 for (auto cfd : *column_family_set_) {
5350 assert(cfd);
5351
5352 if (cfd->IsDropped()) {
5353 continue;
5354 }
5355 assert(cfd->initialized());
5356 {
5357 // Store column family info
5358 VersionEdit edit;
5359 if (cfd->GetID() != 0) {
5360 // default column family is always there,
5361 // no need to explicitly write it
5362 edit.AddColumnFamily(cfd->GetName());
5363 edit.SetColumnFamily(cfd->GetID());
5364 }
5365 edit.SetComparatorName(
5366 cfd->internal_comparator().user_comparator()->Name());
5367 std::string record;
5368 if (!edit.EncodeTo(&record)) {
5369 return Status::Corruption(
5370 "Unable to Encode VersionEdit:" + edit.DebugString(true));
5371 }
5372 io_s = log->AddRecord(record);
5373 if (!io_s.ok()) {
5374 return io_s;
5375 }
5376 }
5377
5378 {
5379 // Save files
5380 VersionEdit edit;
5381 edit.SetColumnFamily(cfd->GetID());
5382
5383 assert(cfd->current());
5384 assert(cfd->current()->storage_info());
5385
5386 for (int level = 0; level < cfd->NumberLevels(); level++) {
5387 for (const auto& f :
5388 cfd->current()->storage_info()->LevelFiles(level)) {
5389 edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
5390 f->fd.GetFileSize(), f->smallest, f->largest,
5391 f->fd.smallest_seqno, f->fd.largest_seqno,
5392 f->marked_for_compaction, f->oldest_blob_file_number,
5393 f->oldest_ancester_time, f->file_creation_time,
5394 f->file_checksum, f->file_checksum_func_name);
5395 }
5396 }
5397
5398 const auto& blob_files = cfd->current()->storage_info()->GetBlobFiles();
5399 for (const auto& pair : blob_files) {
5400 const uint64_t blob_file_number = pair.first;
5401 const auto& meta = pair.second;
5402
5403 assert(meta);
5404 assert(blob_file_number == meta->GetBlobFileNumber());
5405
5406 edit.AddBlobFile(blob_file_number, meta->GetTotalBlobCount(),
5407 meta->GetTotalBlobBytes(), meta->GetChecksumMethod(),
5408 meta->GetChecksumValue());
5409 if (meta->GetGarbageBlobCount() > 0) {
5410 edit.AddBlobFileGarbage(blob_file_number, meta->GetGarbageBlobCount(),
5411 meta->GetGarbageBlobBytes());
5412 }
5413 }
5414
5415 const auto iter = curr_state.find(cfd->GetID());
5416 assert(iter != curr_state.end());
5417 uint64_t log_number = iter->second.log_number;
5418 edit.SetLogNumber(log_number);
5419
5420 if (cfd->GetID() == 0) {
5421         // min_log_number_to_keep applies to the whole DB, not to a specific
5422         // column family, so it only needs to be written once. Since the
5423         // default CF can never be dropped, we attach it to the default CF here.
5424 uint64_t min_log = min_log_number_to_keep_2pc();
5425 if (min_log != 0) {
5426 edit.SetMinLogNumberToKeep(min_log);
5427 }
5428 }
5429
5430 const std::string& full_history_ts_low = iter->second.full_history_ts_low;
5431 if (!full_history_ts_low.empty()) {
5432 edit.SetFullHistoryTsLow(full_history_ts_low);
5433 }
5434 std::string record;
5435 if (!edit.EncodeTo(&record)) {
5436 return Status::Corruption(
5437 "Unable to Encode VersionEdit:" + edit.DebugString(true));
5438 }
5439 io_s = log->AddRecord(record);
5440 if (!io_s.ok()) {
5441 return io_s;
5442 }
5443 }
5444 }
5445 return Status::OK();
5446 }
5447
5448 // TODO(aekmekji): in CompactionJob::GenSubcompactionBoundaries(), this
5449 // function is called repeatedly with consecutive pairs of slices. For example
5450 // if the slice list is [a, b, c, d] this function is called with arguments
5451 // (a,b) then (b,c) then (c,d). Knowing this, an optimization is possible where
5452 // we avoid doing binary search for the keys b and c twice and instead somehow
5453 // maintain state of where they first appear in the files.
5454 uint64_t VersionSet::ApproximateSize(const SizeApproximationOptions& options,
5455 Version* v, const Slice& start,
5456 const Slice& end, int start_level,
5457 int end_level, TableReaderCaller caller) {
5458 const auto& icmp = v->cfd_->internal_comparator();
5459
5460 // pre-condition
5461 assert(icmp.Compare(start, end) <= 0);
5462
5463 uint64_t total_full_size = 0;
5464 const auto* vstorage = v->storage_info();
5465 const int num_non_empty_levels = vstorage->num_non_empty_levels();
5466 end_level = (end_level == -1) ? num_non_empty_levels
5467 : std::min(end_level, num_non_empty_levels);
5468
5469 assert(start_level <= end_level);
5470
5471   // Outline of the optimization that uses options.files_size_error_margin.
5472   // When approximating the total size of the files used to store a key range,
5473   // we first sum up the sizes of the files that fall fully inside the range.
5474   // Then we sum up the sizes of all the files that may intersect with the range
5475   // (this includes all files in L0 as well). If total_intersecting_size
5476   // is smaller than total_full_size * options.files_size_error_margin, we can
5477   // infer that the intersecting files have a sufficiently negligible
5478   // contribution to the total size, and we account for them by simply adding
5479   // half of their total size to the result.
5480   // E.g., if the value of files_size_error_margin is 0.1, then the error of the
5481   // approximation is limited to only ~10% of the total size of the files that
5482   // fall fully inside the key range. In such a case, this avoids the costly
5483   // process of binary searching the intersecting files, which is required only
5484   // for a more precise calculation of the total size.
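  // A small numeric illustration (hypothetical sizes): if the files fully
  // inside the range total 10 GB (total_full_size) and the first/last/L0
  // files that merely intersect it total 500 MB (total_intersecting_size),
  // then with files_size_error_margin == 0.1 we have 500 MB < 1 GB, so the
  // result is approximated as 10 GB + 250 MB instead of binary-searching
  // inside the intersecting files.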
5485
5486 autovector<FdWithKeyRange*, 32> first_files;
5487 autovector<FdWithKeyRange*, 16> last_files;
5488
5489 // scan all the levels
5490 for (int level = start_level; level < end_level; ++level) {
5491 const LevelFilesBrief& files_brief = vstorage->LevelFilesBrief(level);
5492 if (files_brief.num_files == 0) {
5493 // empty level, skip exploration
5494 continue;
5495 }
5496
5497 if (level == 0) {
5498 // level 0 files are not in sorted order, we need to iterate through
5499 // the list to compute the total bytes that require scanning,
5500 // so handle the case explicitly (similarly to first_files case)
5501 for (size_t i = 0; i < files_brief.num_files; i++) {
5502 first_files.push_back(&files_brief.files[i]);
5503 }
5504 continue;
5505 }
5506
5507 assert(level > 0);
5508 assert(files_brief.num_files > 0);
5509
5510 // identify the file position for start key
5511 const int idx_start =
5512 FindFileInRange(icmp, files_brief, start, 0,
5513 static_cast<uint32_t>(files_brief.num_files - 1));
5514 assert(static_cast<size_t>(idx_start) < files_brief.num_files);
5515
5516 // identify the file position for end key
5517 int idx_end = idx_start;
5518 if (icmp.Compare(files_brief.files[idx_end].largest_key, end) < 0) {
5519 idx_end =
5520 FindFileInRange(icmp, files_brief, end, idx_start,
5521 static_cast<uint32_t>(files_brief.num_files - 1));
5522 }
5523 assert(idx_end >= idx_start &&
5524 static_cast<size_t>(idx_end) < files_brief.num_files);
5525
5526 // scan all files from the starting index to the ending index
5527 // (inferred from the sorted order)
5528
5529 // first scan all the intermediate full files (excluding first and last)
5530 for (int i = idx_start + 1; i < idx_end; ++i) {
5531 uint64_t file_size = files_brief.files[i].fd.GetFileSize();
5532 // The entire file falls into the range, so we can just take its size.
5533 assert(file_size ==
5534 ApproximateSize(v, files_brief.files[i], start, end, caller));
5535 total_full_size += file_size;
5536 }
5537
5538 // save the first and the last files (which may be the same file), so we
5539 // can scan them later.
5540 first_files.push_back(&files_brief.files[idx_start]);
5541 if (idx_start != idx_end) {
5542 // we need to estimate size for both files, only if they are different
5543 last_files.push_back(&files_brief.files[idx_end]);
5544 }
5545 }
5546
5547 // The sum of all file sizes that intersect the [start, end] keys range.
5548 uint64_t total_intersecting_size = 0;
5549 for (const auto* file_ptr : first_files) {
5550 total_intersecting_size += file_ptr->fd.GetFileSize();
5551 }
5552 for (const auto* file_ptr : last_files) {
5553 total_intersecting_size += file_ptr->fd.GetFileSize();
5554 }
5555
5556 // Now scan all the first & last files at each level, and estimate their size.
5557 // If the total_intersecting_size is less than X% of the total_full_size - we
5558 // want to approximate the result in order to avoid the costly binary search
5559 // inside ApproximateSize. We use half of file size as an approximation below.
5560
5561 const double margin = options.files_size_error_margin;
5562 if (margin > 0 && total_intersecting_size <
5563 static_cast<uint64_t>(total_full_size * margin)) {
5564 total_full_size += total_intersecting_size / 2;
5565 } else {
5566 // Estimate for all the first files (might also be last files), at each
5567 // level
5568 for (const auto file_ptr : first_files) {
5569 total_full_size += ApproximateSize(v, *file_ptr, start, end, caller);
5570 }
5571
5572 // Estimate for all the last files, at each level
5573 for (const auto file_ptr : last_files) {
5574 // We could use ApproximateSize here, but calling ApproximateOffsetOf
5575 // directly is just more efficient.
5576 total_full_size += ApproximateOffsetOf(v, *file_ptr, end, caller);
5577 }
5578 }
5579
5580 return total_full_size;
5581 }
5582
5583 uint64_t VersionSet::ApproximateOffsetOf(Version* v, const FdWithKeyRange& f,
5584 const Slice& key,
5585 TableReaderCaller caller) {
5586 // pre-condition
5587 assert(v);
5588 const auto& icmp = v->cfd_->internal_comparator();
5589
5590 uint64_t result = 0;
5591 if (icmp.Compare(f.largest_key, key) <= 0) {
5592 // Entire file is before "key", so just add the file size
5593 result = f.fd.GetFileSize();
5594 } else if (icmp.Compare(f.smallest_key, key) > 0) {
5595 // Entire file is after "key", so ignore
5596 result = 0;
5597 } else {
5598 // "key" falls in the range for this table. Add the
5599 // approximate offset of "key" within the table.
5600 TableCache* table_cache = v->cfd_->table_cache();
5601 if (table_cache != nullptr) {
5602 result = table_cache->ApproximateOffsetOf(
5603 key, f.file_metadata->fd, caller, icmp,
5604 v->GetMutableCFOptions().prefix_extractor.get());
5605 }
5606 }
5607 return result;
5608 }
5609
5610 uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
5611 const Slice& start, const Slice& end,
5612 TableReaderCaller caller) {
5613 // pre-condition
5614 assert(v);
5615 const auto& icmp = v->cfd_->internal_comparator();
5616 assert(icmp.Compare(start, end) <= 0);
5617
5618 if (icmp.Compare(f.largest_key, start) <= 0 ||
5619 icmp.Compare(f.smallest_key, end) > 0) {
5620 // Entire file is before or after the start/end keys range
5621 return 0;
5622 }
5623
5624 if (icmp.Compare(f.smallest_key, start) >= 0) {
5625 // Start of the range is before the file start - approximate by end offset
5626 return ApproximateOffsetOf(v, f, end, caller);
5627 }
5628
5629 if (icmp.Compare(f.largest_key, end) < 0) {
5630 // End of the range is after the file end - approximate by subtracting
5631 // start offset from the file size
5632 uint64_t start_offset = ApproximateOffsetOf(v, f, start, caller);
5633 assert(f.fd.GetFileSize() >= start_offset);
5634 return f.fd.GetFileSize() - start_offset;
5635 }
5636
5637 // The interval falls entirely in the range for this file.
5638 TableCache* table_cache = v->cfd_->table_cache();
5639 if (table_cache == nullptr) {
5640 return 0;
5641 }
5642 return table_cache->ApproximateSize(
5643 start, end, f.file_metadata->fd, caller, icmp,
5644 v->GetMutableCFOptions().prefix_extractor.get());
5645 }
5646
5647 void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_table_files,
5648 std::vector<uint64_t>* live_blob_files) const {
5649 assert(live_table_files);
5650 assert(live_blob_files);
5651
5652 // pre-calculate space requirement
5653 size_t total_table_files = 0;
5654 size_t total_blob_files = 0;
5655
5656 assert(column_family_set_);
5657 for (auto cfd : *column_family_set_) {
5658 assert(cfd);
5659
5660 if (!cfd->initialized()) {
5661 continue;
5662 }
5663
5664 Version* const dummy_versions = cfd->dummy_versions();
5665 assert(dummy_versions);
5666
5667 for (Version* v = dummy_versions->next_; v != dummy_versions;
5668 v = v->next_) {
5669 assert(v);
5670
5671 const auto* vstorage = v->storage_info();
5672 assert(vstorage);
5673
5674 for (int level = 0; level < vstorage->num_levels(); ++level) {
5675 total_table_files += vstorage->LevelFiles(level).size();
5676 }
5677
5678 total_blob_files += vstorage->GetBlobFiles().size();
5679 }
5680 }
5681
5682   // just a one-time extension to the right size
5683 live_table_files->reserve(live_table_files->size() + total_table_files);
5684 live_blob_files->reserve(live_blob_files->size() + total_blob_files);
5685
5686 assert(column_family_set_);
5687 for (auto cfd : *column_family_set_) {
5688 assert(cfd);
5689 if (!cfd->initialized()) {
5690 continue;
5691 }
5692
5693 auto* current = cfd->current();
5694 bool found_current = false;
5695
5696 Version* const dummy_versions = cfd->dummy_versions();
5697 assert(dummy_versions);
5698
5699 for (Version* v = dummy_versions->next_; v != dummy_versions;
5700 v = v->next_) {
5701 v->AddLiveFiles(live_table_files, live_blob_files);
5702 if (v == current) {
5703 found_current = true;
5704 }
5705 }
5706
5707 if (!found_current && current != nullptr) {
5708 // Should never happen unless it is a bug.
5709 assert(false);
5710 current->AddLiveFiles(live_table_files, live_blob_files);
5711 }
5712 }
5713 }
5714
5715 InternalIterator* VersionSet::MakeInputIterator(
5716 const ReadOptions& read_options, const Compaction* c,
5717 RangeDelAggregator* range_del_agg,
5718 const FileOptions& file_options_compactions) {
5719 auto cfd = c->column_family_data();
5720 // Level-0 files have to be merged together. For other levels,
5721 // we will make a concatenating iterator per level.
5722 // TODO(opt): use concatenating iterator for level-0 if there is no overlap
5723 const size_t space = (c->level() == 0 ? c->input_levels(0)->num_files +
5724 c->num_input_levels() - 1
5725 : c->num_input_levels());
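  // For example (hypothetical shape): an L0 compaction with 4 L0 input files
  // and one additional input level needs space = 4 + 2 - 1 = 5 iterator
  // slots -- one per L0 file plus one concatenating iterator for the other
  // level.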
5726 InternalIterator** list = new InternalIterator* [space];
5727 size_t num = 0;
5728 for (size_t which = 0; which < c->num_input_levels(); which++) {
5729 if (c->input_levels(which)->num_files != 0) {
5730 if (c->level(which) == 0) {
5731 const LevelFilesBrief* flevel = c->input_levels(which);
5732 for (size_t i = 0; i < flevel->num_files; i++) {
5733 list[num++] = cfd->table_cache()->NewIterator(
5734 read_options, file_options_compactions,
5735 cfd->internal_comparator(), *flevel->files[i].file_metadata,
5736 range_del_agg, c->mutable_cf_options()->prefix_extractor.get(),
5737 /*table_reader_ptr=*/nullptr,
5738 /*file_read_hist=*/nullptr, TableReaderCaller::kCompaction,
5739 /*arena=*/nullptr,
5740 /*skip_filters=*/false,
5741 /*level=*/static_cast<int>(c->level(which)),
5742 MaxFileSizeForL0MetaPin(*c->mutable_cf_options()),
5743 /*smallest_compaction_key=*/nullptr,
5744 /*largest_compaction_key=*/nullptr,
5745 /*allow_unprepared_value=*/false);
5746 }
5747 } else {
5748 // Create concatenating iterator for the files from this level
5749 list[num++] = new LevelIterator(
5750 cfd->table_cache(), read_options, file_options_compactions,
5751 cfd->internal_comparator(), c->input_levels(which),
5752 c->mutable_cf_options()->prefix_extractor.get(),
5753 /*should_sample=*/false,
5754 /*no per level latency histogram=*/nullptr,
5755 TableReaderCaller::kCompaction, /*skip_filters=*/false,
5756 /*level=*/static_cast<int>(c->level(which)), range_del_agg,
5757 c->boundaries(which));
5758 }
5759 }
5760 }
5761 assert(num <= space);
5762 InternalIterator* result =
5763 NewMergingIterator(&c->column_family_data()->internal_comparator(), list,
5764 static_cast<int>(num));
5765 delete[] list;
5766 return result;
5767 }
5768
5769 Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
5770 FileMetaData** meta,
5771 ColumnFamilyData** cfd) {
5772 for (auto cfd_iter : *column_family_set_) {
5773 if (!cfd_iter->initialized()) {
5774 continue;
5775 }
5776 Version* version = cfd_iter->current();
5777 const auto* vstorage = version->storage_info();
5778 for (int level = 0; level < vstorage->num_levels(); level++) {
5779 for (const auto& file : vstorage->LevelFiles(level)) {
5780 if (file->fd.GetNumber() == number) {
5781 *meta = file;
5782 *filelevel = level;
5783 *cfd = cfd_iter;
5784 return Status::OK();
5785 }
5786 }
5787 }
5788 }
5789 return Status::NotFound("File not present in any level");
5790 }
5791
5792 void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
5793 for (auto cfd : *column_family_set_) {
5794 if (cfd->IsDropped() || !cfd->initialized()) {
5795 continue;
5796 }
5797 for (int level = 0; level < cfd->NumberLevels(); level++) {
5798 for (const auto& file :
5799 cfd->current()->storage_info()->LevelFiles(level)) {
5800 LiveFileMetaData filemetadata;
5801 filemetadata.column_family_name = cfd->GetName();
5802 uint32_t path_id = file->fd.GetPathId();
5803 if (path_id < cfd->ioptions()->cf_paths.size()) {
5804 filemetadata.db_path = cfd->ioptions()->cf_paths[path_id].path;
5805 } else {
5806 assert(!cfd->ioptions()->cf_paths.empty());
5807 filemetadata.db_path = cfd->ioptions()->cf_paths.back().path;
5808 }
5809 const uint64_t file_number = file->fd.GetNumber();
5810 filemetadata.name = MakeTableFileName("", file_number);
5811 filemetadata.file_number = file_number;
5812 filemetadata.level = level;
5813 filemetadata.size = static_cast<size_t>(file->fd.GetFileSize());
5814 filemetadata.smallestkey = file->smallest.user_key().ToString();
5815 filemetadata.largestkey = file->largest.user_key().ToString();
5816 filemetadata.smallest_seqno = file->fd.smallest_seqno;
5817 filemetadata.largest_seqno = file->fd.largest_seqno;
5818 filemetadata.num_reads_sampled = file->stats.num_reads_sampled.load(
5819 std::memory_order_relaxed);
5820 filemetadata.being_compacted = file->being_compacted;
5821 filemetadata.num_entries = file->num_entries;
5822 filemetadata.num_deletions = file->num_deletions;
5823 filemetadata.oldest_blob_file_number = file->oldest_blob_file_number;
5824 filemetadata.file_checksum = file->file_checksum;
5825 filemetadata.file_checksum_func_name = file->file_checksum_func_name;
5826 filemetadata.temperature = file->temperature;
5827 filemetadata.oldest_ancester_time = file->TryGetOldestAncesterTime();
5828 filemetadata.file_creation_time = file->TryGetFileCreationTime();
5829 metadata->push_back(filemetadata);
5830 }
5831 }
5832 }
5833 }
5834
5835 void VersionSet::GetObsoleteFiles(std::vector<ObsoleteFileInfo>* files,
5836 std::vector<ObsoleteBlobFileInfo>* blob_files,
5837 std::vector<std::string>* manifest_filenames,
5838 uint64_t min_pending_output) {
5839 assert(files);
5840 assert(blob_files);
5841 assert(manifest_filenames);
5842 assert(files->empty());
5843 assert(blob_files->empty());
5844 assert(manifest_filenames->empty());
5845
5846 std::vector<ObsoleteFileInfo> pending_files;
5847 for (auto& f : obsolete_files_) {
5848 if (f.metadata->fd.GetNumber() < min_pending_output) {
5849 files->emplace_back(std::move(f));
5850 } else {
5851 pending_files.emplace_back(std::move(f));
5852 }
5853 }
5854 obsolete_files_.swap(pending_files);
5855
5856 std::vector<ObsoleteBlobFileInfo> pending_blob_files;
5857 for (auto& blob_file : obsolete_blob_files_) {
5858 if (blob_file.GetBlobFileNumber() < min_pending_output) {
5859 blob_files->emplace_back(std::move(blob_file));
5860 } else {
5861 pending_blob_files.emplace_back(std::move(blob_file));
5862 }
5863 }
5864 obsolete_blob_files_.swap(pending_blob_files);
5865
5866 obsolete_manifests_.swap(*manifest_filenames);
5867 }
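// For illustration (hypothetical numbers): with min_pending_output == 90, an
// obsolete SST numbered 85 is handed back for deletion, while one numbered 95
// stays in obsolete_files_ because a still-running flush or compaction may be
// about to install a file with that number.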
5868
5869 ColumnFamilyData* VersionSet::CreateColumnFamily(
5870 const ColumnFamilyOptions& cf_options, const VersionEdit* edit) {
5871 assert(edit->is_column_family_add_);
5872
5873 MutableCFOptions dummy_cf_options;
5874 Version* dummy_versions =
5875 new Version(nullptr, this, file_options_, dummy_cf_options, io_tracer_);
5876   // Ref() the dummy version once so that later we can call Unref() to delete
5877   // it, avoiding an explicit "delete" (~Version is private).
5878 dummy_versions->Ref();
5879 auto new_cfd = column_family_set_->CreateColumnFamily(
5880 edit->column_family_name_, edit->column_family_, dummy_versions,
5881 cf_options);
5882
5883 Version* v = new Version(new_cfd, this, file_options_,
5884 *new_cfd->GetLatestMutableCFOptions(), io_tracer_,
5885 current_version_number_++);
5886
5887 // Fill level target base information.
5888 v->storage_info()->CalculateBaseBytes(*new_cfd->ioptions(),
5889 *new_cfd->GetLatestMutableCFOptions());
5890 AppendVersion(new_cfd, v);
5891 // GetLatestMutableCFOptions() is safe here without mutex since the
5892 // cfd is not available to client
5893 new_cfd->CreateNewMemtable(*new_cfd->GetLatestMutableCFOptions(),
5894 LastSequence());
5895 new_cfd->SetLogNumber(edit->log_number_);
5896 return new_cfd;
5897 }
5898
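// Count live Versions by walking the circular, doubly linked list headed by
// the dummy version.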
uint64_t VersionSet::GetNumLiveVersions(Version* dummy_versions) {
  uint64_t count = 0;
  for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
    count++;
  }
  return count;
}

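// Total size of all SST files referenced by any live Version; each file is
// counted once, deduplicated by its packed file number / path id.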
uint64_t VersionSet::GetTotalSstFilesSize(Version* dummy_versions) {
  std::unordered_set<uint64_t> unique_files;
  uint64_t total_files_size = 0;
  for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
    VersionStorageInfo* storage_info = v->storage_info();
    for (int level = 0; level < storage_info->num_levels_; level++) {
      for (const auto& file_meta : storage_info->LevelFiles(level)) {
        if (unique_files.find(file_meta->fd.packed_number_and_path_id) ==
            unique_files.end()) {
          unique_files.insert(file_meta->fd.packed_number_and_path_id);
          total_files_size += file_meta->fd.GetFileSize();
        }
      }
    }
  }
  return total_files_size;
}

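// Total size of all blob files referenced by any live Version; each blob
// file number is counted once across versions.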
uint64_t VersionSet::GetTotalBlobFileSize(Version* dummy_versions) {
  std::unordered_set<uint64_t> unique_blob_files;
  uint64_t all_v_blob_file_size = 0;
  for (auto* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
    // iterate all the versions
    auto* vstorage = v->storage_info();
    const auto& blob_files = vstorage->GetBlobFiles();
    for (const auto& pair : blob_files) {
      if (unique_blob_files.find(pair.first) == unique_blob_files.end()) {
        // find Blob file that has not been counted
        unique_blob_files.insert(pair.first);
        const auto& meta = pair.second;
        all_v_blob_file_size += meta->GetBlobFileSize();
      }
    }
  }
  return all_v_blob_file_size;
}

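// Compare the size reported by the file system for fpath against the size
// recorded in the file's metadata; any mismatch is surfaced as Corruption.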
Status VersionSet::VerifyFileMetadata(const std::string& fpath,
                                      const FileMetaData& meta) const {
  uint64_t fsize = 0;
  Status status = fs_->GetFileSize(fpath, IOOptions(), &fsize, nullptr);
  if (status.ok()) {
    if (fsize != meta.fd.GetFileSize()) {
      status = Status::Corruption("File size mismatch: " + fpath);
    }
  }
  return status;
}

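// ReactiveVersionSet is the VersionSet variant used by a secondary instance
// that follows (tails) the primary's MANIFEST instead of writing its own; it
// therefore passes no block cache tracer and an empty db_session_id to the
// base class.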
ReactiveVersionSet::ReactiveVersionSet(
    const std::string& dbname, const ImmutableDBOptions* _db_options,
    const FileOptions& _file_options, Cache* table_cache,
    WriteBufferManager* write_buffer_manager, WriteController* write_controller,
    const std::shared_ptr<IOTracer>& io_tracer)
    : VersionSet(dbname, _db_options, _file_options, table_cache,
                 write_buffer_manager, write_controller,
                 /*block_cache_tracer=*/nullptr, io_tracer,
                 /*db_session_id*/ "") {}

ReactiveVersionSet::~ReactiveVersionSet() {}

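// Initial recovery for the secondary: open the MANIFEST that CURRENT points
// to and let the ManifestTailer replay its edits to build the starting
// Versions for the requested column families. Errors hit while reading the
// MANIFEST are reported through *manifest_reader_status.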
Status ReactiveVersionSet::Recover(
    const std::vector<ColumnFamilyDescriptor>& column_families,
    std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
    std::unique_ptr<log::Reader::Reporter>* manifest_reporter,
    std::unique_ptr<Status>* manifest_reader_status) {
  assert(manifest_reader != nullptr);
  assert(manifest_reporter != nullptr);
  assert(manifest_reader_status != nullptr);

  manifest_reader_status->reset(new Status());
  manifest_reporter->reset(new LogReporter());
  static_cast_with_check<LogReporter>(manifest_reporter->get())->status =
      manifest_reader_status->get();
  Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader);
  if (!s.ok()) {
    return s;
  }
  log::Reader* reader = manifest_reader->get();
  assert(reader);

  manifest_tailer_.reset(new ManifestTailer(
      column_families, const_cast<ReactiveVersionSet*>(this), io_tracer_));

  manifest_tailer_->Iterate(*reader, manifest_reader_status->get());

  return manifest_tailer_->status();
}

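// Catch-up step for the secondary: switch to a newer MANIFEST if the primary
// rolled it, tail any newly appended edits, and report the set of column
// families whose state changed. The mutex passed in must already be held.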
Status ReactiveVersionSet::ReadAndApply(
    InstrumentedMutex* mu,
    std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
    Status* manifest_read_status,
    std::unordered_set<ColumnFamilyData*>* cfds_changed) {
  assert(manifest_reader != nullptr);
  assert(cfds_changed != nullptr);
  mu->AssertHeld();

  Status s;
  log::Reader* reader = manifest_reader->get();
  assert(reader);
  s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
  if (!s.ok()) {
    return s;
  }
  manifest_tailer_->Iterate(*(manifest_reader->get()), manifest_read_status);
  s = manifest_tailer_->status();
  if (s.ok()) {
    *cfds_changed = std::move(manifest_tailer_->GetUpdatedColumnFamilies());
  }

  return s;
}

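// Re-read CURRENT and, if it now names a different MANIFEST than the one the
// reader has open, open the new file and replace the reader. Returns
// Status::TryAgain when the primary has already deleted the MANIFEST that
// CURRENT referred to, so the caller can retry.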
Status ReactiveVersionSet::MaybeSwitchManifest(
    log::Reader::Reporter* reporter,
    std::unique_ptr<log::FragmentBufferedReader>* manifest_reader) {
  assert(manifest_reader != nullptr);
  Status s;
  std::string manifest_path;
  s = GetCurrentManifestPath(dbname_, fs_.get(), &manifest_path,
                             &manifest_file_number_);
  if (!s.ok()) {
    return s;
  }
  std::unique_ptr<FSSequentialFile> manifest_file;
  if (manifest_reader->get() != nullptr &&
      manifest_reader->get()->file()->file_name() == manifest_path) {
    // CURRENT points to the same MANIFEST as before, no need to switch
    // MANIFEST.
    return s;
  }
  assert(nullptr == manifest_reader->get() ||
         manifest_reader->get()->file()->file_name() != manifest_path);
  s = fs_->FileExists(manifest_path, IOOptions(), nullptr);
  if (s.IsNotFound()) {
    return Status::TryAgain(
        "The primary may have switched to a new MANIFEST and deleted the old "
        "one.");
  } else if (!s.ok()) {
    return s;
  }
  TEST_SYNC_POINT(
      "ReactiveVersionSet::MaybeSwitchManifest:"
      "AfterGetCurrentManifestPath:0");
  TEST_SYNC_POINT(
      "ReactiveVersionSet::MaybeSwitchManifest:"
      "AfterGetCurrentManifestPath:1");
  // The primary can also delete the MANIFEST while the secondary is reading
  // it. This is OK on POSIX. For other file systems, maybe create a hard link
  // to MANIFEST. The hard link should be cleaned up later by the secondary.
  s = fs_->NewSequentialFile(manifest_path,
                             fs_->OptimizeForManifestRead(file_options_),
                             &manifest_file, nullptr);
  std::unique_ptr<SequentialFileReader> manifest_file_reader;
  if (s.ok()) {
    manifest_file_reader.reset(new SequentialFileReader(
        std::move(manifest_file), manifest_path,
        db_options_->log_readahead_size, io_tracer_, db_options_->listeners));
    manifest_reader->reset(new log::FragmentBufferedReader(
        nullptr, std::move(manifest_file_reader), reporter, true /* checksum */,
        0 /* log_number */));
    ROCKS_LOG_INFO(db_options_->info_log, "Switched to new manifest: %s\n",
                   manifest_path.c_str());
    if (manifest_tailer_) {
      manifest_tailer_->PrepareToReadNewManifest();
    }
  } else if (s.IsPathNotFound()) {
    // This can happen if the primary switches to a new MANIFEST after the
    // secondary reads the CURRENT file but before the secondary actually tries
    // to open the MANIFEST.
    s = Status::TryAgain(
        "The primary may have switched to a new MANIFEST and deleted the old "
        "one.");
  }
  return s;
}

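// Test-only accessor for how many edits of the current atomic group have
// been buffered by the manifest tailer.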
#ifndef NDEBUG
uint64_t ReactiveVersionSet::TEST_read_edits_in_atomic_group() const {
  assert(manifest_tailer_);
  return manifest_tailer_->GetReadBuffer().TEST_read_edits_in_atomic_group();
}
#endif  // !NDEBUG

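// Expose the manifest tailer's buffer of version edits accumulated while
// reading an atomic group.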
std::vector<VersionEdit>& ReactiveVersionSet::replay_buffer() {
  assert(manifest_tailer_);
  return manifest_tailer_->GetReadBuffer().replay_buffer();
}

}  // namespace ROCKSDB_NAMESPACE