// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/version_set.h"

#include <stdio.h>
#include <algorithm>
#include <array>
#include <cinttypes>
#include <list>
#include <map>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>
#include "compaction/compaction.h"
#include "db/internal_stats.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/table_cache.h"
#include "db/version_builder.h"
#include "file/filename.h"
#include "file/random_access_file_reader.h"
#include "file/read_write_util.h"
#include "file/writable_file_writer.h"
#include "monitoring/file_read_sample.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/persistent_stats_history.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/format.h"
#include "table/get_context.h"
#include "table/internal_iterator.h"
#include "table/merging_iterator.h"
#include "table/meta_blocks.h"
#include "table/multiget_context.h"
#include "table/plain/plain_table_factory.h"
#include "table/table_reader.h"
#include "table/two_level_iterator.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
#include "util/user_comparator_wrapper.h"

namespace ROCKSDB_NAMESPACE {

namespace {

// Find File in LevelFilesBrief data structure
// Within an index range defined by left and right
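// Returns the index of the earliest file whose largest key is >= `key`
// (the first file in [left, right) that could contain `key`), or `right`
// if no such file exists. For example, if the files' largest keys are
// {"b", "d", "f"} and the search key is "c", index 1 is returned.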
int FindFileInRange(const InternalKeyComparator& icmp,
                    const LevelFilesBrief& file_level,
                    const Slice& key,
                    uint32_t left,
                    uint32_t right) {
  auto cmp = [&](const FdWithKeyRange& f, const Slice& k) -> bool {
    return icmp.InternalKeyComparator::Compare(f.largest_key, k) < 0;
  };
  const auto& b = file_level.files;
  return static_cast<int>(std::lower_bound(b + left,
                                           b + right, key, cmp) - b);
}

Status OverlapWithIterator(const Comparator* ucmp,
                           const Slice& smallest_user_key,
                           const Slice& largest_user_key,
                           InternalIterator* iter,
                           bool* overlap) {
  InternalKey range_start(smallest_user_key, kMaxSequenceNumber,
                          kValueTypeForSeek);
  iter->Seek(range_start.Encode());
  if (!iter->status().ok()) {
    return iter->status();
  }

  *overlap = false;
  if (iter->Valid()) {
    ParsedInternalKey seek_result;
    if (!ParseInternalKey(iter->key(), &seek_result)) {
      return Status::Corruption("DB have corrupted keys");
    }

    if (ucmp->CompareWithoutTimestamp(seek_result.user_key, largest_user_key) <=
        0) {
      *overlap = true;
    }
  }

  return iter->status();
}

// Class to help choose the next file to search for the particular key.
// Searches and returns files level by level.
// We can search level-by-level since entries never hop across
// levels. Therefore we are guaranteed that if we find data
// in a smaller level, later levels are irrelevant (unless we
// are MergeInProgress).
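// The FileIndexer enables fractional cascading: the result of comparing the
// lookup key against a file's boundaries on one level is reused to narrow
// the binary-search bounds (search_left_bound_ / search_right_bound_) for
// the next level, so deeper levels rarely need a full binary search.
//
// Typical usage (a sketch of how Version::Get drives this class):
//   FilePicker fp(files, user_key, ikey, &file_levels, num_levels,
//                 &file_indexer, user_comparator, internal_comparator);
//   for (FdWithKeyRange* f = fp.GetNextFile(); f != nullptr;
//        f = fp.GetNextFile()) {
//     // Probe *f via the table cache; stop once the key is resolved.
//   }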
class FilePicker {
 public:
  FilePicker(std::vector<FileMetaData*>* files, const Slice& user_key,
             const Slice& ikey, autovector<LevelFilesBrief>* file_levels,
             unsigned int num_levels, FileIndexer* file_indexer,
             const Comparator* user_comparator,
             const InternalKeyComparator* internal_comparator)
      : num_levels_(num_levels),
        curr_level_(static_cast<unsigned int>(-1)),
        returned_file_level_(static_cast<unsigned int>(-1)),
        hit_file_level_(static_cast<unsigned int>(-1)),
        search_left_bound_(0),
        search_right_bound_(FileIndexer::kLevelMaxIndex),
#ifndef NDEBUG
        files_(files),
#endif
        level_files_brief_(file_levels),
        is_hit_file_last_in_level_(false),
        curr_file_level_(nullptr),
        user_key_(user_key),
        ikey_(ikey),
        file_indexer_(file_indexer),
        user_comparator_(user_comparator),
        internal_comparator_(internal_comparator) {
#ifdef NDEBUG
    (void)files;
#endif
    // Setup member variables to search first level.
    search_ended_ = !PrepareNextLevel();
    if (!search_ended_) {
      // Prefetch Level 0 table data to avoid cache miss if possible.
      for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
        auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
        if (r) {
          r->Prepare(ikey);
        }
      }
    }
  }

  int GetCurrentLevel() const { return curr_level_; }

  FdWithKeyRange* GetNextFile() {
    while (!search_ended_) {  // Loops over different levels.
      while (curr_index_in_curr_level_ < curr_file_level_->num_files) {
        // Loops over all files in current level.
        FdWithKeyRange* f = &curr_file_level_->files[curr_index_in_curr_level_];
        hit_file_level_ = curr_level_;
        is_hit_file_last_in_level_ =
            curr_index_in_curr_level_ == curr_file_level_->num_files - 1;
        int cmp_largest = -1;

        // Do key range filtering of files and/or fractional cascading if:
        // (1) not all the files are in level 0, or
        // (2) there are more than 3 current level files
        // If there are only 3 or fewer current level files in the system, we
        // skip the key range filtering. In that case the system is most
        // likely tuned to minimize the number of tables queried by each read,
        // so it is unlikely that key range filtering is more efficient than
        // simply querying the files.
        if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
          // Check if key is within a file's range. If search left bound and
          // right bound point to the same file, we are sure the key falls in
          // its range.
          assert(curr_level_ == 0 ||
                 curr_index_in_curr_level_ == start_index_in_curr_level_ ||
                 user_comparator_->CompareWithoutTimestamp(
                     user_key_, ExtractUserKey(f->smallest_key)) <= 0);

          int cmp_smallest = user_comparator_->CompareWithoutTimestamp(
              user_key_, ExtractUserKey(f->smallest_key));
          if (cmp_smallest >= 0) {
            cmp_largest = user_comparator_->CompareWithoutTimestamp(
                user_key_, ExtractUserKey(f->largest_key));
          }

          // Setup file search bound for the next level based on the
          // comparison results
          if (curr_level_ > 0) {
            file_indexer_->GetNextLevelIndex(curr_level_,
                                             curr_index_in_curr_level_,
                                             cmp_smallest, cmp_largest,
                                             &search_left_bound_,
                                             &search_right_bound_);
          }
          // Key falls out of current file's range
          if (cmp_smallest < 0 || cmp_largest > 0) {
            if (curr_level_ == 0) {
              ++curr_index_in_curr_level_;
              continue;
            } else {
              // Search next level.
              break;
            }
          }
        }
#ifndef NDEBUG
        // Sanity check to make sure that the files are correctly sorted
        if (prev_file_) {
          if (curr_level_ != 0) {
            int comp_sign = internal_comparator_->Compare(
                prev_file_->largest_key, f->smallest_key);
            assert(comp_sign < 0);
          } else {
            // level == 0, the current file cannot be newer than the previous
            // one. The compressed data structure (FdWithKeyRange) has no
            // seqno attribute, so check the FileMetaData entries instead.
            assert(curr_index_in_curr_level_ > 0);
            assert(!NewestFirstBySeqNo(files_[0][curr_index_in_curr_level_],
                                       files_[0][curr_index_in_curr_level_ - 1]));
          }
        }
        prev_file_ = f;
#endif
        returned_file_level_ = curr_level_;
        if (curr_level_ > 0 && cmp_largest < 0) {
          // No more files to search in this level.
          search_ended_ = !PrepareNextLevel();
        } else {
          ++curr_index_in_curr_level_;
        }
        return f;
      }
      // Start searching next level.
      search_ended_ = !PrepareNextLevel();
    }
    // Search ended.
    return nullptr;
  }

  // getter for current file level
  // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
  unsigned int GetHitFileLevel() { return hit_file_level_; }

  // Returns true if the most recent "hit file" (i.e., one returned by
  // GetNextFile()) is at the last index in its level.
  bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }

 private:
  unsigned int num_levels_;
  unsigned int curr_level_;
  unsigned int returned_file_level_;
  unsigned int hit_file_level_;
  int32_t search_left_bound_;
  int32_t search_right_bound_;
#ifndef NDEBUG
  std::vector<FileMetaData*>* files_;
#endif
  autovector<LevelFilesBrief>* level_files_brief_;
  bool search_ended_;
  bool is_hit_file_last_in_level_;
  LevelFilesBrief* curr_file_level_;
  unsigned int curr_index_in_curr_level_;
  unsigned int start_index_in_curr_level_;
  Slice user_key_;
  Slice ikey_;
  FileIndexer* file_indexer_;
  const Comparator* user_comparator_;
  const InternalKeyComparator* internal_comparator_;
#ifndef NDEBUG
  FdWithKeyRange* prev_file_;
#endif

  // Setup local variables to search next level.
  // Returns false if there are no more levels to search.
  bool PrepareNextLevel() {
    curr_level_++;
    while (curr_level_ < num_levels_) {
      curr_file_level_ = &(*level_files_brief_)[curr_level_];
      if (curr_file_level_->num_files == 0) {
        // When current level is empty, the search bound generated from upper
        // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if the
        // upper level is also empty.
        assert(search_left_bound_ == 0);
        assert(search_right_bound_ == -1 ||
               search_right_bound_ == FileIndexer::kLevelMaxIndex);
        // Since current level is empty, it will need to search all files in
        // the next level
        search_left_bound_ = 0;
        search_right_bound_ = FileIndexer::kLevelMaxIndex;
        curr_level_++;
        continue;
      }

      // Some files may overlap each other. We find
      // all files that overlap user_key and process them in order from
      // newest to oldest. In the context of merge-operator, this can occur at
      // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
      // are always compacted into a single entry).
      int32_t start_index;
      if (curr_level_ == 0) {
        // On Level-0, we read through all files to check for overlap.
        start_index = 0;
      } else {
        // On Level-n (n>=1), files are sorted. Binary search to find the
        // earliest file whose largest key >= ikey. Search left bound and
        // right bound are used to narrow the range.
        if (search_left_bound_ <= search_right_bound_) {
          if (search_right_bound_ == FileIndexer::kLevelMaxIndex) {
            search_right_bound_ =
                static_cast<int32_t>(curr_file_level_->num_files) - 1;
          }
          // `search_right_bound_` is an inclusive upper-bound, but since it was
          // determined based on user key, it is still possible the lookup key
          // falls to the right of `search_right_bound_`'s corresponding file.
          // So, pass a limit one higher, which allows us to detect this case.
          start_index =
              FindFileInRange(*internal_comparator_, *curr_file_level_, ikey_,
                              static_cast<uint32_t>(search_left_bound_),
                              static_cast<uint32_t>(search_right_bound_) + 1);
          if (start_index == search_right_bound_ + 1) {
            // `ikey_` comes after `search_right_bound_`. The lookup key does
            // not exist on this level, so let's skip this level and do a full
            // binary search on the next level.
            search_left_bound_ = 0;
            search_right_bound_ = FileIndexer::kLevelMaxIndex;
            curr_level_++;
            continue;
          }
        } else {
          // search_left_bound > search_right_bound, key does not exist in
          // this level. Since no comparison is done in this level, it will
          // need to search all files in the next level.
          search_left_bound_ = 0;
          search_right_bound_ = FileIndexer::kLevelMaxIndex;
          curr_level_++;
          continue;
        }
      }
      start_index_in_curr_level_ = start_index;
      curr_index_in_curr_level_ = start_index;
#ifndef NDEBUG
      prev_file_ = nullptr;
#endif
      return true;
    }
    // curr_level_ == num_levels_, so there are no more levels to search.
    return false;
  }
};

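// FilePickerMultiGet plays the same role as FilePicker, but for a whole batch
// of keys (a MultiGetRange). Per-key search state (binary-search bounds and
// the current file index) is tracked in FilePickerContext, and on each level
// the keys that fall into the same SST file are grouped into
// CurrentFileRange() so the caller can probe that file once for the group.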
class FilePickerMultiGet {
 private:
  struct FilePickerContext;

 public:
  FilePickerMultiGet(MultiGetRange* range,
                     autovector<LevelFilesBrief>* file_levels,
                     unsigned int num_levels, FileIndexer* file_indexer,
                     const Comparator* user_comparator,
                     const InternalKeyComparator* internal_comparator)
      : num_levels_(num_levels),
        curr_level_(static_cast<unsigned int>(-1)),
        returned_file_level_(static_cast<unsigned int>(-1)),
        hit_file_level_(static_cast<unsigned int>(-1)),
        range_(range),
        batch_iter_(range->begin()),
        batch_iter_prev_(range->begin()),
        maybe_repeat_key_(false),
        current_level_range_(*range, range->begin(), range->end()),
        current_file_range_(*range, range->begin(), range->end()),
        level_files_brief_(file_levels),
        is_hit_file_last_in_level_(false),
        curr_file_level_(nullptr),
        file_indexer_(file_indexer),
        user_comparator_(user_comparator),
        internal_comparator_(internal_comparator) {
    for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
      fp_ctx_array_[iter.index()] =
          FilePickerContext(0, FileIndexer::kLevelMaxIndex);
    }

    // Setup member variables to search first level.
    search_ended_ = !PrepareNextLevel();
    if (!search_ended_) {
      // REVISIT
      // Prefetch Level 0 table data to avoid cache miss if possible.
      // As of now, only PlainTableReader and CuckooTableReader do any
      // prefetching. This may not be necessary anymore once we implement
      // batching in those table readers
      for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
        auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
        if (r) {
          for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
            r->Prepare(iter->ikey);
          }
        }
      }
    }
  }

  int GetCurrentLevel() const { return curr_level_; }

  // Iterates through files in the current level until it finds a file that
  // contains at least one key from the MultiGet batch
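  // Returns true and sets *fd when such a file is found; *file_index is the
  // file's position within the level. *is_last_key_in_file is set when the
  // batch key under consideration equals the file's largest key, in which
  // case the same key may have to be looked up again in the next file.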
  bool GetNextFileInLevelWithKeys(MultiGetRange* next_file_range,
                                  size_t* file_index, FdWithKeyRange** fd,
                                  bool* is_last_key_in_file) {
    size_t curr_file_index = *file_index;
    FdWithKeyRange* f = nullptr;
    bool file_hit = false;
    int cmp_largest = -1;
    if (curr_file_index >= curr_file_level_->num_files) {
      // In the unlikely case the next key is a duplicate of the current key,
      // and the current key is the last in the level and the internal key
      // was not found, we need to skip lookup for the remaining keys and
      // reset the search bounds
      if (batch_iter_ != current_level_range_.end()) {
        ++batch_iter_;
        for (; batch_iter_ != current_level_range_.end(); ++batch_iter_) {
          struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
          fp_ctx.search_left_bound = 0;
          fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
        }
      }
      return false;
    }
    // Loops over keys in the MultiGet batch until it finds a file with
    // at least one of the keys. Then it keeps moving forward until the
    // last key in the batch that falls in that file
    while (batch_iter_ != current_level_range_.end() &&
           (fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level ==
                curr_file_index ||
            !file_hit)) {
      struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
      f = &curr_file_level_->files[fp_ctx.curr_index_in_curr_level];
      Slice& user_key = batch_iter_->ukey;

      // Do key range filtering of files and/or fractional cascading if:
      // (1) not all the files are in level 0, or
      // (2) there are more than 3 current level files
      // If there are only 3 or fewer current level files in the system, we
      // skip the key range filtering. In that case the system is most likely
      // tuned to minimize the number of tables queried by each read, so it
      // is unlikely that key range filtering is more efficient than simply
      // querying the files.
      if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
        // Check if key is within a file's range. If search left bound and
        // right bound point to the same file, we are sure the key falls in
        // its range.
        assert(curr_level_ == 0 ||
               fp_ctx.curr_index_in_curr_level ==
                   fp_ctx.start_index_in_curr_level ||
               user_comparator_->Compare(user_key,
                                         ExtractUserKey(f->smallest_key)) <= 0);

        int cmp_smallest = user_comparator_->Compare(
            user_key, ExtractUserKey(f->smallest_key));
        if (cmp_smallest >= 0) {
          cmp_largest = user_comparator_->Compare(
              user_key, ExtractUserKey(f->largest_key));
        } else {
          cmp_largest = -1;
        }

        // Setup file search bound for the next level based on the
        // comparison results
        if (curr_level_ > 0) {
          file_indexer_->GetNextLevelIndex(
              curr_level_, fp_ctx.curr_index_in_curr_level, cmp_smallest,
              cmp_largest, &fp_ctx.search_left_bound,
              &fp_ctx.search_right_bound);
        }
        // Key falls out of current file's range
        if (cmp_smallest < 0 || cmp_largest > 0) {
          next_file_range->SkipKey(batch_iter_);
        } else {
          file_hit = true;
        }
      } else {
        file_hit = true;
      }
      if (cmp_largest == 0) {
        // cmp_largest is 0, which means the next key will not be in this
        // file, so stop looking further. Also don't increment batch_iter_
        // as we may have to look for this key in the next file if we don't
        // find it in this one
        break;
      } else {
        if (curr_level_ == 0) {
          // We need to look through all files in level 0
          ++fp_ctx.curr_index_in_curr_level;
        }
        ++batch_iter_;
      }
      if (!file_hit) {
        curr_file_index =
            (batch_iter_ != current_level_range_.end())
                ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
                : curr_file_level_->num_files;
      }
    }

    *fd = f;
    *file_index = curr_file_index;
    *is_last_key_in_file = cmp_largest == 0;
    return file_hit;
  }

  FdWithKeyRange* GetNextFile() {
    while (!search_ended_) {
      // Start searching next level.
      if (batch_iter_ == current_level_range_.end()) {
        search_ended_ = !PrepareNextLevel();
        continue;
      } else {
        if (maybe_repeat_key_) {
          maybe_repeat_key_ = false;
          // Check if we found the final value for the last key in the
          // previous lookup range. If we did, then there's no need to look
          // any further for that key, so advance batch_iter_. Else, keep
          // batch_iter_ positioned on that key so we look it up again in
          // the next file
          // For L0, always advance the key because we will look in the next
          // file regardless for all keys not found yet
          if (current_level_range_.CheckKeyDone(batch_iter_) ||
              curr_level_ == 0) {
            ++batch_iter_;
          }
        }
        // batch_iter_prev_ will become the start key for the next file
        // lookup
        batch_iter_prev_ = batch_iter_;
      }

      MultiGetRange next_file_range(current_level_range_, batch_iter_prev_,
                                    current_level_range_.end());
      size_t curr_file_index =
          (batch_iter_ != current_level_range_.end())
              ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
              : curr_file_level_->num_files;
      FdWithKeyRange* f;
      bool is_last_key_in_file;
      if (!GetNextFileInLevelWithKeys(&next_file_range, &curr_file_index, &f,
                                      &is_last_key_in_file)) {
        search_ended_ = !PrepareNextLevel();
      } else {
        MultiGetRange::Iterator upper_key = batch_iter_;
        if (is_last_key_in_file) {
          // Since cmp_largest is 0, batch_iter_ still points to the last key
          // that falls in this file, instead of the next one. Increment
          // upper_key so we can set the range properly for SST MultiGet
          ++upper_key;
          ++(fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level);
          maybe_repeat_key_ = true;
        }
        // Set the range for this file
        current_file_range_ =
            MultiGetRange(next_file_range, batch_iter_prev_, upper_key);
        returned_file_level_ = curr_level_;
        hit_file_level_ = curr_level_;
        is_hit_file_last_in_level_ =
            curr_file_index == curr_file_level_->num_files - 1;
        return f;
      }
    }

    // Search ended
    return nullptr;
  }

  // getter for current file level
  // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
  unsigned int GetHitFileLevel() { return hit_file_level_; }

  // Returns true if the most recent "hit file" (i.e., one returned by
  // GetNextFile()) is at the last index in its level.
  bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }

  const MultiGetRange& CurrentFileRange() { return current_file_range_; }

 private:
  unsigned int num_levels_;
  unsigned int curr_level_;
  unsigned int returned_file_level_;
  unsigned int hit_file_level_;

  struct FilePickerContext {
    int32_t search_left_bound;
    int32_t search_right_bound;
    unsigned int curr_index_in_curr_level;
    unsigned int start_index_in_curr_level;

    FilePickerContext(int32_t left, int32_t right)
        : search_left_bound(left), search_right_bound(right),
          curr_index_in_curr_level(0), start_index_in_curr_level(0) {}

    FilePickerContext() = default;
  };
  std::array<FilePickerContext, MultiGetContext::MAX_BATCH_SIZE> fp_ctx_array_;
  MultiGetRange* range_;
  // Iterator to iterate through the keys in a MultiGet batch, that gets reset
  // at the beginning of each level. Each call to GetNextFile() will position
  // batch_iter_ at or right after the last key that was found in the returned
  // SST file
  MultiGetRange::Iterator batch_iter_;
  // An iterator that records the previous position of batch_iter_, i.e. the
  // last key found in the previous SST file, in order to serve as the start
  // of the batch key range for the next SST file
  MultiGetRange::Iterator batch_iter_prev_;
  bool maybe_repeat_key_;
  MultiGetRange current_level_range_;
  MultiGetRange current_file_range_;
  autovector<LevelFilesBrief>* level_files_brief_;
  bool search_ended_;
  bool is_hit_file_last_in_level_;
  LevelFilesBrief* curr_file_level_;
  FileIndexer* file_indexer_;
  const Comparator* user_comparator_;
  const InternalKeyComparator* internal_comparator_;

  // Setup local variables to search next level.
  // Returns false if there are no more levels to search.
  bool PrepareNextLevel() {
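    // On L0 every key must be checked against every file, so stay on L0 for
    // another pass as long as the first key in the batch still has L0 files
    // left to examine.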
    if (curr_level_ == 0) {
      MultiGetRange::Iterator mget_iter = current_level_range_.begin();
      if (fp_ctx_array_[mget_iter.index()].curr_index_in_curr_level <
          curr_file_level_->num_files) {
        batch_iter_prev_ = current_level_range_.begin();
        batch_iter_ = current_level_range_.begin();
        return true;
      }
    }

    curr_level_++;
    // Reset key range to saved value
    while (curr_level_ < num_levels_) {
      bool level_contains_keys = false;
      curr_file_level_ = &(*level_files_brief_)[curr_level_];
      if (curr_file_level_->num_files == 0) {
        // When current level is empty, the search bound generated from upper
        // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if the
        // upper level is also empty.

        for (auto mget_iter = current_level_range_.begin();
             mget_iter != current_level_range_.end(); ++mget_iter) {
          struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];

          assert(fp_ctx.search_left_bound == 0);
          assert(fp_ctx.search_right_bound == -1 ||
                 fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex);
          // Since current level is empty, it will need to search all files in
          // the next level
          fp_ctx.search_left_bound = 0;
          fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
        }
        // Skip all subsequent empty levels
        do {
          ++curr_level_;
        } while ((curr_level_ < num_levels_) &&
                 (*level_files_brief_)[curr_level_].num_files == 0);
        continue;
      }

      // Some files may overlap each other. We find
      // all files that overlap user_key and process them in order from
      // newest to oldest. In the context of merge-operator, this can occur at
      // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
      // are always compacted into a single entry).
      int32_t start_index = -1;
      current_level_range_ =
          MultiGetRange(*range_, range_->begin(), range_->end());
      for (auto mget_iter = current_level_range_.begin();
           mget_iter != current_level_range_.end(); ++mget_iter) {
        struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];
        if (curr_level_ == 0) {
          // On Level-0, we read through all files to check for overlap.
          start_index = 0;
          level_contains_keys = true;
        } else {
          // On Level-n (n>=1), files are sorted. Binary search to find the
          // earliest file whose largest key >= ikey. Search left bound and
          // right bound are used to narrow the range.
          if (fp_ctx.search_left_bound <= fp_ctx.search_right_bound) {
            if (fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex) {
              fp_ctx.search_right_bound =
                  static_cast<int32_t>(curr_file_level_->num_files) - 1;
            }
            // `search_right_bound_` is an inclusive upper-bound, but since it
            // was determined based on user key, it is still possible the lookup
            // key falls to the right of `search_right_bound_`'s corresponding
            // file. So, pass a limit one higher, which allows us to detect this
            // case.
            Slice& ikey = mget_iter->ikey;
            start_index = FindFileInRange(
                *internal_comparator_, *curr_file_level_, ikey,
                static_cast<uint32_t>(fp_ctx.search_left_bound),
                static_cast<uint32_t>(fp_ctx.search_right_bound) + 1);
            if (start_index == fp_ctx.search_right_bound + 1) {
              // `ikey_` comes after `search_right_bound_`. The lookup key does
              // not exist on this level, so let's skip this level and do a full
              // binary search on the next level.
              fp_ctx.search_left_bound = 0;
              fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
              current_level_range_.SkipKey(mget_iter);
              continue;
            } else {
              level_contains_keys = true;
            }
          } else {
            // search_left_bound > search_right_bound, key does not exist in
            // this level. Since no comparison is done in this level, it will
            // need to search all files in the next level.
            fp_ctx.search_left_bound = 0;
            fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
            current_level_range_.SkipKey(mget_iter);
            continue;
          }
        }
        fp_ctx.start_index_in_curr_level = start_index;
        fp_ctx.curr_index_in_curr_level = start_index;
      }
      if (level_contains_keys) {
        batch_iter_prev_ = current_level_range_.begin();
        batch_iter_ = current_level_range_.begin();
        return true;
      }
      curr_level_++;
    }
    // curr_level_ == num_levels_, so there are no more levels to search.
    return false;
  }
};
}  // anonymous namespace

VersionStorageInfo::~VersionStorageInfo() { delete[] files_; }

Version::~Version() {
  assert(refs_ == 0);

  // Remove from linked list
  prev_->next_ = next_;
  next_->prev_ = prev_;

  // Drop references to files
  for (int level = 0; level < storage_info_.num_levels_; level++) {
    for (size_t i = 0; i < storage_info_.files_[level].size(); i++) {
      FileMetaData* f = storage_info_.files_[level][i];
      assert(f->refs > 0);
      f->refs--;
      if (f->refs <= 0) {
        assert(cfd_ != nullptr);
        uint32_t path_id = f->fd.GetPathId();
        assert(path_id < cfd_->ioptions()->cf_paths.size());
        vset_->obsolete_files_.push_back(
            ObsoleteFileInfo(f, cfd_->ioptions()->cf_paths[path_id].path));
      }
    }
  }
}

int FindFile(const InternalKeyComparator& icmp,
             const LevelFilesBrief& file_level,
             const Slice& key) {
  return FindFileInRange(icmp, file_level, key, 0,
                         static_cast<uint32_t>(file_level.num_files));
}

void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level,
                               const std::vector<FileMetaData*>& files,
                               Arena* arena) {
  assert(file_level);
  assert(arena);

  size_t num = files.size();
  file_level->num_files = num;
  char* mem = arena->AllocateAligned(num * sizeof(FdWithKeyRange));
  file_level->files = new (mem) FdWithKeyRange[num];

  for (size_t i = 0; i < num; i++) {
    Slice smallest_key = files[i]->smallest.Encode();
    Slice largest_key = files[i]->largest.Encode();

    // Copy key slice to sequential memory
    size_t smallest_size = smallest_key.size();
    size_t largest_size = largest_key.size();
    mem = arena->AllocateAligned(smallest_size + largest_size);
    memcpy(mem, smallest_key.data(), smallest_size);
    memcpy(mem + smallest_size, largest_key.data(), largest_size);

    FdWithKeyRange& f = file_level->files[i];
    f.fd = files[i]->fd;
    f.file_metadata = files[i];
    f.smallest_key = Slice(mem, smallest_size);
    f.largest_key = Slice(mem + smallest_size, largest_size);
  }
}

static bool AfterFile(const Comparator* ucmp,
                      const Slice* user_key, const FdWithKeyRange* f) {
  // nullptr user_key occurs before all keys and is therefore never after *f
  return (user_key != nullptr &&
          ucmp->CompareWithoutTimestamp(*user_key,
                                        ExtractUserKey(f->largest_key)) > 0);
}

static bool BeforeFile(const Comparator* ucmp,
                       const Slice* user_key, const FdWithKeyRange* f) {
  // nullptr user_key occurs after all keys and is therefore never before *f
  return (user_key != nullptr &&
          ucmp->CompareWithoutTimestamp(*user_key,
                                        ExtractUserKey(f->smallest_key)) < 0);
}

bool SomeFileOverlapsRange(
    const InternalKeyComparator& icmp,
    bool disjoint_sorted_files,
    const LevelFilesBrief& file_level,
    const Slice* smallest_user_key,
    const Slice* largest_user_key) {
  const Comparator* ucmp = icmp.user_comparator();
  if (!disjoint_sorted_files) {
    // Need to check against all files
    for (size_t i = 0; i < file_level.num_files; i++) {
      const FdWithKeyRange* f = &(file_level.files[i]);
      if (AfterFile(ucmp, smallest_user_key, f) ||
          BeforeFile(ucmp, largest_user_key, f)) {
        // No overlap
      } else {
        return true;  // Overlap
      }
    }
    return false;
  }

  // Binary search over file list
  uint32_t index = 0;
  if (smallest_user_key != nullptr) {
    // Find the leftmost possible internal key for smallest_user_key
    InternalKey small;
    small.SetMinPossibleForUserKey(*smallest_user_key);
    index = FindFile(icmp, file_level, small.Encode());
  }

  if (index >= file_level.num_files) {
    // beginning of range is after all files, so no overlap.
    return false;
  }

  return !BeforeFile(ucmp, largest_user_key, &file_level.files[index]);
}

namespace {

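// LevelIterator iterates over all keys of a single sorted run (the files in
// `flevel`, typically one level >= 1 where files do not overlap). It selects
// the file covering the current position using the per-file key ranges and
// lazily opens a table iterator (file_iter_) for that file, moving to the
// neighboring file once the current one is exhausted.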
class LevelIterator final : public InternalIterator {
 public:
  LevelIterator(TableCache* table_cache, const ReadOptions& read_options,
                const FileOptions& file_options,
                const InternalKeyComparator& icomparator,
                const LevelFilesBrief* flevel,
                const SliceTransform* prefix_extractor, bool should_sample,
                HistogramImpl* file_read_hist, TableReaderCaller caller,
                bool skip_filters, int level, RangeDelAggregator* range_del_agg,
                const std::vector<AtomicCompactionUnitBoundary>*
                    compaction_boundaries = nullptr)
      : table_cache_(table_cache),
        read_options_(read_options),
        file_options_(file_options),
        icomparator_(icomparator),
        user_comparator_(icomparator.user_comparator()),
        flevel_(flevel),
        prefix_extractor_(prefix_extractor),
        file_read_hist_(file_read_hist),
        should_sample_(should_sample),
        caller_(caller),
        skip_filters_(skip_filters),
        file_index_(flevel_->num_files),
        level_(level),
        range_del_agg_(range_del_agg),
        pinned_iters_mgr_(nullptr),
        compaction_boundaries_(compaction_boundaries) {
    // Empty level is not supported.
    assert(flevel_ != nullptr && flevel_->num_files > 0);
  }

  ~LevelIterator() override { delete file_iter_.Set(nullptr); }

  void Seek(const Slice& target) override;
  void SeekForPrev(const Slice& target) override;
  void SeekToFirst() override;
  void SeekToLast() override;
  void Next() final override;
  bool NextAndGetResult(IterateResult* result) override;
  void Prev() override;

  bool Valid() const override { return file_iter_.Valid(); }
  Slice key() const override {
    assert(Valid());
    return file_iter_.key();
  }

  Slice value() const override {
    assert(Valid());
    return file_iter_.value();
  }

  Status status() const override {
    return file_iter_.iter() ? file_iter_.status() : Status::OK();
  }

  inline bool MayBeOutOfLowerBound() override {
    assert(Valid());
    return may_be_out_of_lower_bound_ && file_iter_.MayBeOutOfLowerBound();
  }

  inline bool MayBeOutOfUpperBound() override {
    assert(Valid());
    return file_iter_.MayBeOutOfUpperBound();
  }

  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
    if (file_iter_.iter()) {
      file_iter_.SetPinnedItersMgr(pinned_iters_mgr);
    }
  }

  bool IsKeyPinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_.iter() && file_iter_.IsKeyPinned();
  }

  bool IsValuePinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_.iter() && file_iter_.IsValuePinned();
  }

 private:
  // Return true if at least one invalid file is seen and skipped.
  bool SkipEmptyFileForward();
  void SkipEmptyFileBackward();
  void SetFileIterator(InternalIterator* iter);
  void InitFileIterator(size_t new_file_index);

  // Called by both Next() and NextAndGetResult(). Force inline.
  void NextImpl() {
    assert(Valid());
    file_iter_.Next();
    SkipEmptyFileForward();
  }

  const Slice& file_smallest_key(size_t file_index) {
    assert(file_index < flevel_->num_files);
    return flevel_->files[file_index].smallest_key;
  }

  bool KeyReachedUpperBound(const Slice& internal_key) {
    return read_options_.iterate_upper_bound != nullptr &&
           user_comparator_.CompareWithoutTimestamp(
               ExtractUserKey(internal_key),
               *read_options_.iterate_upper_bound) >= 0;
  }

  InternalIterator* NewFileIterator() {
    assert(file_index_ < flevel_->num_files);
    auto file_meta = flevel_->files[file_index_];
    if (should_sample_) {
      sample_file_read_inc(file_meta.file_metadata);
    }

    const InternalKey* smallest_compaction_key = nullptr;
    const InternalKey* largest_compaction_key = nullptr;
    if (compaction_boundaries_ != nullptr) {
      smallest_compaction_key = (*compaction_boundaries_)[file_index_].smallest;
      largest_compaction_key = (*compaction_boundaries_)[file_index_].largest;
    }
    CheckMayBeOutOfLowerBound();
    return table_cache_->NewIterator(
        read_options_, file_options_, icomparator_, *file_meta.file_metadata,
        range_del_agg_, prefix_extractor_,
        nullptr /* don't need reference to table */, file_read_hist_, caller_,
        /*arena=*/nullptr, skip_filters_, level_, smallest_compaction_key,
        largest_compaction_key);
  }

  // Check whether the current file is fully within iterate_lower_bound.
  //
  // Note MyRocks may update iterate bounds between seeks. To work around
  // that, we need to check and update may_be_out_of_lower_bound_ accordingly.
  void CheckMayBeOutOfLowerBound() {
    if (read_options_.iterate_lower_bound != nullptr &&
        file_index_ < flevel_->num_files) {
      may_be_out_of_lower_bound_ =
          user_comparator_.Compare(
              ExtractUserKey(file_smallest_key(file_index_)),
              *read_options_.iterate_lower_bound) < 0;
    }
  }

  TableCache* table_cache_;
  const ReadOptions read_options_;
  const FileOptions& file_options_;
  const InternalKeyComparator& icomparator_;
  const UserComparatorWrapper user_comparator_;
  const LevelFilesBrief* flevel_;
  mutable FileDescriptor current_value_;
  // `prefix_extractor_` may be non-null even for total order seek. Checking
  // this variable is not the right way to identify whether prefix iterator
  // is used.
  const SliceTransform* prefix_extractor_;

  HistogramImpl* file_read_hist_;
  bool should_sample_;
  TableReaderCaller caller_;
  bool skip_filters_;
  bool may_be_out_of_lower_bound_ = true;
  size_t file_index_;
  int level_;
  RangeDelAggregator* range_del_agg_;
  IteratorWrapper file_iter_;  // May be nullptr
  PinnedIteratorsManager* pinned_iters_mgr_;

  // To be propagated to RangeDelAggregator in order to safely truncate range
  // tombstones.
  const std::vector<AtomicCompactionUnitBoundary>* compaction_boundaries_;
};

void LevelIterator::Seek(const Slice& target) {
  // Check whether the seek key falls within the current file's range
  bool need_to_reseek = true;
  if (file_iter_.iter() != nullptr && file_index_ < flevel_->num_files) {
    const FdWithKeyRange& cur_file = flevel_->files[file_index_];
    if (icomparator_.InternalKeyComparator::Compare(
            target, cur_file.largest_key) <= 0 &&
        icomparator_.InternalKeyComparator::Compare(
            target, cur_file.smallest_key) >= 0) {
      need_to_reseek = false;
      assert(static_cast<size_t>(FindFile(icomparator_, *flevel_, target)) ==
             file_index_);
    }
  }
  if (need_to_reseek) {
    TEST_SYNC_POINT("LevelIterator::Seek:BeforeFindFile");
    size_t new_file_index = FindFile(icomparator_, *flevel_, target);
    InitFileIterator(new_file_index);
  }

  if (file_iter_.iter() != nullptr) {
    file_iter_.Seek(target);
  }
  if (SkipEmptyFileForward() && prefix_extractor_ != nullptr &&
      !read_options_.total_order_seek && !read_options_.auto_prefix_mode &&
      file_iter_.iter() != nullptr && file_iter_.Valid()) {
    // We've skipped the file we initially positioned to. In the prefix
    // seek case, it is likely that the file is skipped because of
    // prefix bloom or hash, where more keys are skipped. We then check
    // the current key and invalidate the iterator if the prefix is
    // already passed.
    // When doing prefix iterator seek, once the keys for one prefix have
    // been exhausted, it can jump to any key that is larger. Here we are
    // enforcing a stricter contract than that, in order to make it easier for
    // higher layers (merging and DB iterator) to reason about correctness:
    // 1. Within the prefix, the result should be accurate.
    // 2. If keys for the prefix are exhausted, the iterator is either
    //    positioned to the next key after the prefix, or made invalid.
    // A side benefit is that the iterator is invalidated earlier, so that
    // the upper level merging iterator can merge fewer child iterators.
    Slice target_user_key = ExtractUserKey(target);
    Slice file_user_key = ExtractUserKey(file_iter_.key());
    if (prefix_extractor_->InDomain(target_user_key) &&
        (!prefix_extractor_->InDomain(file_user_key) ||
         user_comparator_.Compare(
             prefix_extractor_->Transform(target_user_key),
             prefix_extractor_->Transform(file_user_key)) != 0)) {
      SetFileIterator(nullptr);
    }
  }
  CheckMayBeOutOfLowerBound();
}

void LevelIterator::SeekForPrev(const Slice& target) {
  size_t new_file_index = FindFile(icomparator_, *flevel_, target);
  if (new_file_index >= flevel_->num_files) {
    new_file_index = flevel_->num_files - 1;
  }

  InitFileIterator(new_file_index);
  if (file_iter_.iter() != nullptr) {
    file_iter_.SeekForPrev(target);
    SkipEmptyFileBackward();
  }
  CheckMayBeOutOfLowerBound();
}

void LevelIterator::SeekToFirst() {
  InitFileIterator(0);
  if (file_iter_.iter() != nullptr) {
    file_iter_.SeekToFirst();
  }
  SkipEmptyFileForward();
  CheckMayBeOutOfLowerBound();
}

void LevelIterator::SeekToLast() {
  InitFileIterator(flevel_->num_files - 1);
  if (file_iter_.iter() != nullptr) {
    file_iter_.SeekToLast();
  }
  SkipEmptyFileBackward();
  CheckMayBeOutOfLowerBound();
}

void LevelIterator::Next() { NextImpl(); }

bool LevelIterator::NextAndGetResult(IterateResult* result) {
  NextImpl();
  bool is_valid = Valid();
  if (is_valid) {
    result->key = key();
    result->may_be_out_of_upper_bound = MayBeOutOfUpperBound();
  }
  return is_valid;
}

void LevelIterator::Prev() {
  assert(Valid());
  file_iter_.Prev();
  SkipEmptyFileBackward();
}

bool LevelIterator::SkipEmptyFileForward() {
  bool seen_empty_file = false;
  while (file_iter_.iter() == nullptr ||
         (!file_iter_.Valid() && file_iter_.status().ok() &&
          !file_iter_.iter()->IsOutOfBound())) {
    seen_empty_file = true;
    // Move to next file
    if (file_index_ >= flevel_->num_files - 1) {
      // Already at the last file
      SetFileIterator(nullptr);
      break;
    }
    if (KeyReachedUpperBound(file_smallest_key(file_index_ + 1))) {
      SetFileIterator(nullptr);
      break;
    }
    InitFileIterator(file_index_ + 1);
    if (file_iter_.iter() != nullptr) {
      file_iter_.SeekToFirst();
    }
  }
  return seen_empty_file;
}

void LevelIterator::SkipEmptyFileBackward() {
  while (file_iter_.iter() == nullptr ||
         (!file_iter_.Valid() && file_iter_.status().ok())) {
    // Move to previous file
    if (file_index_ == 0) {
      // Already the first file
      SetFileIterator(nullptr);
      return;
    }
    InitFileIterator(file_index_ - 1);
    if (file_iter_.iter() != nullptr) {
      file_iter_.SeekToLast();
    }
  }
}

void LevelIterator::SetFileIterator(InternalIterator* iter) {
  if (pinned_iters_mgr_ && iter) {
    iter->SetPinnedItersMgr(pinned_iters_mgr_);
  }

  InternalIterator* old_iter = file_iter_.Set(iter);
  if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
    pinned_iters_mgr_->PinIterator(old_iter);
  } else {
    delete old_iter;
  }
}

void LevelIterator::InitFileIterator(size_t new_file_index) {
  if (new_file_index >= flevel_->num_files) {
    file_index_ = new_file_index;
    SetFileIterator(nullptr);
    return;
  } else {
    // If the file iterator shows incomplete, we try it again if users seek
    // to the same file, as this time we may go to a different data block
    // which is cached in block cache.
    //
    if (file_iter_.iter() != nullptr && !file_iter_.status().IsIncomplete() &&
        new_file_index == file_index_) {
      // file_iter_ is already constructed with this iterator, so
      // no need to change anything
    } else {
      file_index_ = new_file_index;
      InternalIterator* iter = NewFileIterator();
      SetFileIterator(iter);
    }
  }
}
}  // anonymous namespace

// A wrapper of version builder which references the current version in the
// constructor and unreferences it in the destructor.
// Both the constructor and the destructor need to be called inside the DB
// mutex.
class BaseReferencedVersionBuilder {
 public:
  explicit BaseReferencedVersionBuilder(ColumnFamilyData* cfd)
      : version_builder_(new VersionBuilder(
            cfd->current()->version_set()->file_options(), cfd->table_cache(),
            cfd->current()->storage_info(), cfd->ioptions()->info_log)),
        version_(cfd->current()) {
    version_->Ref();
  }
  ~BaseReferencedVersionBuilder() {
    version_->Unref();
  }
  VersionBuilder* version_builder() { return version_builder_.get(); }

 private:
  std::unique_ptr<VersionBuilder> version_builder_;
  Version* version_;
};

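// Tries the table cache first with no I/O allowed; if the table reader is not
// already open (the lookup returns Status::Incomplete), falls back to reading
// the properties block directly from the SST file.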
Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
                                   const FileMetaData* file_meta,
                                   const std::string* fname) const {
  auto table_cache = cfd_->table_cache();
  auto ioptions = cfd_->ioptions();
  Status s = table_cache->GetTableProperties(
      file_options_, cfd_->internal_comparator(), file_meta->fd, tp,
      mutable_cf_options_.prefix_extractor.get(), true /* no io */);
  if (s.ok()) {
    return s;
  }

  // We only ignore error type `Incomplete` since it's by design that we
  // disallow opening the table when it's not already in the table cache.
  if (!s.IsIncomplete()) {
    return s;
  }

  // 2. Table is not present in table cache, we'll read the table properties
  // directly from the properties block in the file.
  std::unique_ptr<FSRandomAccessFile> file;
  std::string file_name;
  if (fname != nullptr) {
    file_name = *fname;
  } else {
    file_name =
        TableFileName(ioptions->cf_paths, file_meta->fd.GetNumber(),
                      file_meta->fd.GetPathId());
  }
  s = ioptions->fs->NewRandomAccessFile(file_name, file_options_, &file,
                                        nullptr);
  if (!s.ok()) {
    return s;
  }

  TableProperties* raw_table_properties;
  // By setting the magic number to kInvalidTableMagicNumber, we can bypass
  // the magic number check in the footer.
  std::unique_ptr<RandomAccessFileReader> file_reader(
      new RandomAccessFileReader(
          std::move(file), file_name, nullptr /* env */, nullptr /* stats */,
          0 /* hist_type */, nullptr /* file_read_hist */,
          nullptr /* rate_limiter */, ioptions->listeners));
  s = ReadTableProperties(
      file_reader.get(), file_meta->fd.GetFileSize(),
      Footer::kInvalidTableMagicNumber /* table's magic number */, *ioptions,
      &raw_table_properties, false /* compression_type_missing */);
  if (!s.ok()) {
    return s;
  }
  RecordTick(ioptions->statistics, NUMBER_DIRECT_LOAD_TABLE_PROPERTIES);

  *tp = std::shared_ptr<const TableProperties>(raw_table_properties);
  return s;
}

Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
  Status s;
  for (int level = 0; level < storage_info_.num_levels_; level++) {
    s = GetPropertiesOfAllTables(props, level);
    if (!s.ok()) {
      return s;
    }
  }

  return Status::OK();
}

Status Version::TablesRangeTombstoneSummary(int max_entries_to_print,
                                            std::string* out_str) {
  if (max_entries_to_print <= 0) {
    return Status::OK();
  }
  int num_entries_left = max_entries_to_print;

  std::stringstream ss;

  for (int level = 0; level < storage_info_.num_levels_; level++) {
    for (const auto& file_meta : storage_info_.files_[level]) {
      auto fname =
          TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
                        file_meta->fd.GetPathId());

      ss << "=== file : " << fname << " ===\n";

      TableCache* table_cache = cfd_->table_cache();
      std::unique_ptr<FragmentedRangeTombstoneIterator> tombstone_iter;

      Status s = table_cache->GetRangeTombstoneIterator(
          ReadOptions(), cfd_->internal_comparator(), *file_meta,
          &tombstone_iter);
      if (!s.ok()) {
        return s;
      }
      if (tombstone_iter) {
        tombstone_iter->SeekToFirst();

        while (tombstone_iter->Valid() && num_entries_left > 0) {
          ss << "start: " << tombstone_iter->start_key().ToString(true)
             << " end: " << tombstone_iter->end_key().ToString(true)
             << " seq: " << tombstone_iter->seq() << '\n';
          tombstone_iter->Next();
          num_entries_left--;
        }
        if (num_entries_left <= 0) {
          break;
        }
      }
    }
    if (num_entries_left <= 0) {
      break;
    }
  }
  assert(num_entries_left >= 0);
  if (num_entries_left <= 0) {
    ss << "(results may not be complete)\n";
  }

  *out_str = ss.str();
  return Status::OK();
}

Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props,
                                         int level) {
  for (const auto& file_meta : storage_info_.files_[level]) {
    auto fname =
        TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
                      file_meta->fd.GetPathId());
    // 1. If the table is already present in table cache, load table
    // properties from there.
    std::shared_ptr<const TableProperties> table_properties;
    Status s = GetTableProperties(&table_properties, file_meta, &fname);
    if (s.ok()) {
      props->insert({fname, table_properties});
    } else {
      return s;
    }
  }

  return Status::OK();
}

Status Version::GetPropertiesOfTablesInRange(
    const Range* range, std::size_t n, TablePropertiesCollection* props) const {
  for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
    for (decltype(n) i = 0; i < n; i++) {
      // Convert user_key into a corresponding internal key.
      InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
      InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
      std::vector<FileMetaData*> files;
      storage_info_.GetOverlappingInputs(level, &k1, &k2, &files, -1, nullptr,
                                         false);
      for (const auto& file_meta : files) {
        auto fname =
            TableFileName(cfd_->ioptions()->cf_paths,
                          file_meta->fd.GetNumber(), file_meta->fd.GetPathId());
        if (props->count(fname) == 0) {
          // 1. If the table is already present in table cache, load table
          // properties from there.
          std::shared_ptr<const TableProperties> table_properties;
          Status s = GetTableProperties(&table_properties, file_meta, &fname);
          if (s.ok()) {
            props->insert({fname, table_properties});
          } else {
            return s;
          }
        }
      }
    }
  }

  return Status::OK();
}

Status Version::GetAggregatedTableProperties(
    std::shared_ptr<const TableProperties>* tp, int level) {
  TablePropertiesCollection props;
  Status s;
  if (level < 0) {
    s = GetPropertiesOfAllTables(&props);
  } else {
    s = GetPropertiesOfAllTables(&props, level);
  }
  if (!s.ok()) {
    return s;
  }

  auto* new_tp = new TableProperties();
  for (const auto& item : props) {
    new_tp->Add(*item.second);
  }
  tp->reset(new_tp);
  return Status::OK();
}

size_t Version::GetMemoryUsageByTableReaders() {
  size_t total_usage = 0;
  for (auto& file_level : storage_info_.level_files_brief_) {
    for (size_t i = 0; i < file_level.num_files; i++) {
      total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader(
          file_options_, cfd_->internal_comparator(), file_level.files[i].fd,
          mutable_cf_options_.prefix_extractor.get());
    }
  }
  return total_usage;
}

void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
  assert(cf_meta);
  assert(cfd_);

  cf_meta->name = cfd_->GetName();
  cf_meta->size = 0;
  cf_meta->file_count = 0;
  cf_meta->levels.clear();

  auto* ioptions = cfd_->ioptions();
  auto* vstorage = storage_info();

  for (int level = 0; level < cfd_->NumberLevels(); level++) {
    uint64_t level_size = 0;
    cf_meta->file_count += vstorage->LevelFiles(level).size();
    std::vector<SstFileMetaData> files;
    for (const auto& file : vstorage->LevelFiles(level)) {
      uint32_t path_id = file->fd.GetPathId();
      std::string file_path;
      if (path_id < ioptions->cf_paths.size()) {
        file_path = ioptions->cf_paths[path_id].path;
      } else {
        assert(!ioptions->cf_paths.empty());
        file_path = ioptions->cf_paths.back().path;
      }
      const uint64_t file_number = file->fd.GetNumber();
      files.emplace_back(SstFileMetaData{
          MakeTableFileName("", file_number), file_number, file_path,
          static_cast<size_t>(file->fd.GetFileSize()), file->fd.smallest_seqno,
          file->fd.largest_seqno, file->smallest.user_key().ToString(),
          file->largest.user_key().ToString(),
          file->stats.num_reads_sampled.load(std::memory_order_relaxed),
          file->being_compacted, file->oldest_blob_file_number,
          file->TryGetOldestAncesterTime(), file->TryGetFileCreationTime(),
          file->file_checksum, file->file_checksum_func_name});
      files.back().num_entries = file->num_entries;
      files.back().num_deletions = file->num_deletions;
      level_size += file->fd.GetFileSize();
    }
    cf_meta->levels.emplace_back(
        level, level_size, std::move(files));
    cf_meta->size += level_size;
  }
}

uint64_t Version::GetSstFilesSize() {
  uint64_t sst_files_size = 0;
  for (int level = 0; level < storage_info_.num_levels_; level++) {
    for (const auto& file_meta : storage_info_.LevelFiles(level)) {
      sst_files_size += file_meta->fd.GetFileSize();
    }
  }
  return sst_files_size;
}

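// Sets *creation_time to the oldest file creation time among all live files,
// or to 0 if any file's creation time is unknown.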
GetCreationTimeOfOldestFile(uint64_t * creation_time)1489 void Version::GetCreationTimeOfOldestFile(uint64_t* creation_time) {
1490 uint64_t oldest_time = port::kMaxUint64;
1491 for (int level = 0; level < storage_info_.num_non_empty_levels_; level++) {
1492 for (FileMetaData* meta : storage_info_.LevelFiles(level)) {
1493 assert(meta->fd.table_reader != nullptr);
1494 uint64_t file_creation_time = meta->TryGetFileCreationTime();
1495 if (file_creation_time == kUnknownFileCreationTime) {
1496 *creation_time = 0;
1497 return;
1498 }
1499 if (file_creation_time < oldest_time) {
1500 oldest_time = file_creation_time;
1501 }
1502 }
1503 }
1504 *creation_time = oldest_time;
1505 }
1506
1507 uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const {
1508 // Estimation will be inaccurate when:
1509 // (1) there exist merge keys
1510 // (2) keys are directly overwritten
1511   // (3) deletions of non-existing keys
1512 // (4) low number of samples
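  // Illustrative extrapolation (hypothetical numbers): if the 5 sampled files
  // contribute est = 1,000 net (non-deletion minus deletion) entries and the
  // version holds 20 files in total, the estimate below scales this to
  // 1,000 * 20 / 5 = 4,000 active keys.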
1513 if (current_num_samples_ == 0) {
1514 return 0;
1515 }
1516
1517 if (current_num_non_deletions_ <= current_num_deletions_) {
1518 return 0;
1519 }
1520
1521 uint64_t est = current_num_non_deletions_ - current_num_deletions_;
1522
1523 uint64_t file_count = 0;
1524 for (int level = 0; level < num_levels_; ++level) {
1525 file_count += files_[level].size();
1526 }
1527
1528 if (current_num_samples_ < file_count) {
1529 // casting to avoid overflowing
1530 return
1531 static_cast<uint64_t>(
1532 (est * static_cast<double>(file_count) / current_num_samples_)
1533 );
1534 } else {
1535 return est;
1536 }
1537 }
1538
1539 double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel(
1540 int level) const {
1541 assert(level < num_levels_);
1542 uint64_t sum_file_size_bytes = 0;
1543 uint64_t sum_data_size_bytes = 0;
1544 for (auto* file_meta : files_[level]) {
1545 sum_file_size_bytes += file_meta->fd.GetFileSize();
1546 sum_data_size_bytes += file_meta->raw_key_size + file_meta->raw_value_size;
1547 }
1548 if (sum_file_size_bytes == 0) {
1549 return -1.0;
1550 }
1551 return static_cast<double>(sum_data_size_bytes) / sum_file_size_bytes;
1552 }
1553
1554 void Version::AddIterators(const ReadOptions& read_options,
1555 const FileOptions& soptions,
1556 MergeIteratorBuilder* merge_iter_builder,
1557 RangeDelAggregator* range_del_agg) {
1558 assert(storage_info_.finalized_);
1559
1560 for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
1561 AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level,
1562 range_del_agg);
1563 }
1564 }
1565
1566 void Version::AddIteratorsForLevel(const ReadOptions& read_options,
1567 const FileOptions& soptions,
1568 MergeIteratorBuilder* merge_iter_builder,
1569 int level,
1570 RangeDelAggregator* range_del_agg) {
1571 assert(storage_info_.finalized_);
1572 if (level >= storage_info_.num_non_empty_levels()) {
1573 // This is an empty level
1574 return;
1575 } else if (storage_info_.LevelFilesBrief(level).num_files == 0) {
1576 // No files in this level
1577 return;
1578 }
1579
1580 bool should_sample = should_sample_file_read();
1581
1582 auto* arena = merge_iter_builder->GetArena();
1583 if (level == 0) {
1584 // Merge all level zero files together since they may overlap
1585 for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
1586 const auto& file = storage_info_.LevelFilesBrief(0).files[i];
1587 merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
1588 read_options, soptions, cfd_->internal_comparator(),
1589 *file.file_metadata, range_del_agg,
1590 mutable_cf_options_.prefix_extractor.get(), nullptr,
1591 cfd_->internal_stats()->GetFileReadHist(0),
1592 TableReaderCaller::kUserIterator, arena,
1593 /*skip_filters=*/false, /*level=*/0,
1594 /*smallest_compaction_key=*/nullptr,
1595 /*largest_compaction_key=*/nullptr));
1596 }
1597 if (should_sample) {
1598       // Count one sample for every L0 file. This is done once per iterator
1599       // creation rather than per Seek(), while files in other levels are
1600       // recorded per seek. If users execute one range query per iterator,
1601       // there may be some discrepancy here.
1602 for (FileMetaData* meta : storage_info_.LevelFiles(0)) {
1603 sample_file_read_inc(meta);
1604 }
1605 }
1606 } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
1607 // For levels > 0, we can use a concatenating iterator that sequentially
1608 // walks through the non-overlapping files in the level, opening them
1609 // lazily.
1610 auto* mem = arena->AllocateAligned(sizeof(LevelIterator));
1611 merge_iter_builder->AddIterator(new (mem) LevelIterator(
1612 cfd_->table_cache(), read_options, soptions,
1613 cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
1614 mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
1615 cfd_->internal_stats()->GetFileReadHist(level),
1616 TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
1617 range_del_agg, /*largest_compaction_key=*/nullptr));
1618 }
1619 }
1620
1621 Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
1622 const FileOptions& file_options,
1623 const Slice& smallest_user_key,
1624 const Slice& largest_user_key,
1625 int level, bool* overlap) {
1626 assert(storage_info_.finalized_);
1627
1628 auto icmp = cfd_->internal_comparator();
1629 auto ucmp = icmp.user_comparator();
1630
1631 Arena arena;
1632 Status status;
1633 ReadRangeDelAggregator range_del_agg(&icmp,
1634 kMaxSequenceNumber /* upper_bound */);
1635
1636 *overlap = false;
1637
1638 if (level == 0) {
1639 for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
1640 const auto file = &storage_info_.LevelFilesBrief(0).files[i];
1641 if (AfterFile(ucmp, &smallest_user_key, file) ||
1642 BeforeFile(ucmp, &largest_user_key, file)) {
1643 continue;
1644 }
1645 ScopedArenaIterator iter(cfd_->table_cache()->NewIterator(
1646 read_options, file_options, cfd_->internal_comparator(),
1647 *file->file_metadata, &range_del_agg,
1648 mutable_cf_options_.prefix_extractor.get(), nullptr,
1649 cfd_->internal_stats()->GetFileReadHist(0),
1650 TableReaderCaller::kUserIterator, &arena,
1651 /*skip_filters=*/false, /*level=*/0,
1652 /*smallest_compaction_key=*/nullptr,
1653 /*largest_compaction_key=*/nullptr));
1654 status = OverlapWithIterator(
1655 ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
1656 if (!status.ok() || *overlap) {
1657 break;
1658 }
1659 }
1660 } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
1661 auto mem = arena.AllocateAligned(sizeof(LevelIterator));
1662 ScopedArenaIterator iter(new (mem) LevelIterator(
1663 cfd_->table_cache(), read_options, file_options,
1664 cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
1665 mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
1666 cfd_->internal_stats()->GetFileReadHist(level),
1667 TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
1668 &range_del_agg));
1669 status = OverlapWithIterator(
1670 ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
1671 }
1672
1673 if (status.ok() && *overlap == false &&
1674 range_del_agg.IsRangeOverlapped(smallest_user_key, largest_user_key)) {
1675 *overlap = true;
1676 }
1677 return status;
1678 }
1679
1680 VersionStorageInfo::VersionStorageInfo(
1681 const InternalKeyComparator* internal_comparator,
1682 const Comparator* user_comparator, int levels,
1683 CompactionStyle compaction_style, VersionStorageInfo* ref_vstorage,
1684 bool _force_consistency_checks)
1685 : internal_comparator_(internal_comparator),
1686 user_comparator_(user_comparator),
1687 // cfd is nullptr if Version is dummy
1688 num_levels_(levels),
1689 num_non_empty_levels_(0),
1690 file_indexer_(user_comparator),
1691 compaction_style_(compaction_style),
1692 files_(new std::vector<FileMetaData*>[num_levels_]),
1693 base_level_(num_levels_ == 1 ? -1 : 1),
1694 level_multiplier_(0.0),
1695 files_by_compaction_pri_(num_levels_),
1696 level0_non_overlapping_(false),
1697 next_file_to_compact_by_size_(num_levels_),
1698 compaction_score_(num_levels_),
1699 compaction_level_(num_levels_),
1700 l0_delay_trigger_count_(0),
1701 accumulated_file_size_(0),
1702 accumulated_raw_key_size_(0),
1703 accumulated_raw_value_size_(0),
1704 accumulated_num_non_deletions_(0),
1705 accumulated_num_deletions_(0),
1706 current_num_non_deletions_(0),
1707 current_num_deletions_(0),
1708 current_num_samples_(0),
1709 estimated_compaction_needed_bytes_(0),
1710 finalized_(false),
1711 force_consistency_checks_(_force_consistency_checks) {
1712 if (ref_vstorage != nullptr) {
1713 accumulated_file_size_ = ref_vstorage->accumulated_file_size_;
1714 accumulated_raw_key_size_ = ref_vstorage->accumulated_raw_key_size_;
1715 accumulated_raw_value_size_ = ref_vstorage->accumulated_raw_value_size_;
1716 accumulated_num_non_deletions_ =
1717 ref_vstorage->accumulated_num_non_deletions_;
1718 accumulated_num_deletions_ = ref_vstorage->accumulated_num_deletions_;
1719 current_num_non_deletions_ = ref_vstorage->current_num_non_deletions_;
1720 current_num_deletions_ = ref_vstorage->current_num_deletions_;
1721 current_num_samples_ = ref_vstorage->current_num_samples_;
1722 oldest_snapshot_seqnum_ = ref_vstorage->oldest_snapshot_seqnum_;
1723 }
1724 }
1725
1726 Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
1727 const FileOptions& file_opt,
1728 const MutableCFOptions mutable_cf_options,
1729 uint64_t version_number)
1730 : env_(vset->env_),
1731 cfd_(column_family_data),
1732 info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->info_log),
1733 db_statistics_((cfd_ == nullptr) ? nullptr
1734 : cfd_->ioptions()->statistics),
1735 table_cache_((cfd_ == nullptr) ? nullptr : cfd_->table_cache()),
1736 merge_operator_((cfd_ == nullptr) ? nullptr
1737 : cfd_->ioptions()->merge_operator),
1738 storage_info_(
1739 (cfd_ == nullptr) ? nullptr : &cfd_->internal_comparator(),
1740 (cfd_ == nullptr) ? nullptr : cfd_->user_comparator(),
1741 cfd_ == nullptr ? 0 : cfd_->NumberLevels(),
1742 cfd_ == nullptr ? kCompactionStyleLevel
1743 : cfd_->ioptions()->compaction_style,
1744 (cfd_ == nullptr || cfd_->current() == nullptr)
1745 ? nullptr
1746 : cfd_->current()->storage_info(),
1747 cfd_ == nullptr ? false : cfd_->ioptions()->force_consistency_checks),
1748 vset_(vset),
1749 next_(this),
1750 prev_(this),
1751 refs_(0),
1752 file_options_(file_opt),
1753 mutable_cf_options_(mutable_cf_options),
1754 version_number_(version_number) {}
1755
1756 void Version::Get(const ReadOptions& read_options, const LookupKey& k,
1757 PinnableSlice* value, Status* status,
1758 MergeContext* merge_context,
1759 SequenceNumber* max_covering_tombstone_seq, bool* value_found,
1760 bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
1761 bool* is_blob, bool do_merge) {
1762 Slice ikey = k.internal_key();
1763 Slice user_key = k.user_key();
1764
1765 assert(status->ok() || status->IsMergeInProgress());
1766
1767 if (key_exists != nullptr) {
1768     // will be set to false below if the key is not found
1769 *key_exists = true;
1770 }
1771
1772 PinnedIteratorsManager pinned_iters_mgr;
1773 uint64_t tracing_get_id = BlockCacheTraceHelper::kReservedGetId;
1774 if (vset_ && vset_->block_cache_tracer_ &&
1775 vset_->block_cache_tracer_->is_tracing_enabled()) {
1776 tracing_get_id = vset_->block_cache_tracer_->NextGetId();
1777 }
1778 GetContext get_context(
1779 user_comparator(), merge_operator_, info_log_, db_statistics_,
1780 status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
1781 do_merge ? value : nullptr, value_found, merge_context, do_merge,
1782 max_covering_tombstone_seq, this->env_, seq,
1783 merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
1784 tracing_get_id);
1785
1786 // Pin blocks that we read to hold merge operands
1787 if (merge_operator_) {
1788 pinned_iters_mgr.StartPinning();
1789 }
1790
1791 FilePicker fp(
1792 storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
1793 storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
1794 user_comparator(), internal_comparator());
1795 FdWithKeyRange* f = fp.GetNextFile();
1796
1797 while (f != nullptr) {
1798 if (*max_covering_tombstone_seq > 0) {
1799 // The remaining files we look at will only contain covered keys, so we
1800 // stop here.
1801 break;
1802 }
1803 if (get_context.sample()) {
1804 sample_file_read_inc(f->file_metadata);
1805 }
1806
1807 bool timer_enabled =
1808 GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
1809 get_perf_context()->per_level_perf_context_enabled;
1810 StopWatchNano timer(env_, timer_enabled /* auto_start */);
1811 *status = table_cache_->Get(
1812 read_options, *internal_comparator(), *f->file_metadata, ikey,
1813 &get_context, mutable_cf_options_.prefix_extractor.get(),
1814 cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
1815 IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
1816 fp.IsHitFileLastInLevel()),
1817 fp.GetCurrentLevel());
1818 // TODO: examine the behavior for corrupted key
1819 if (timer_enabled) {
1820 PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
1821 fp.GetCurrentLevel());
1822 }
1823 if (!status->ok()) {
1824 return;
1825 }
1826
1827 // report the counters before returning
1828 if (get_context.State() != GetContext::kNotFound &&
1829 get_context.State() != GetContext::kMerge &&
1830 db_statistics_ != nullptr) {
1831 get_context.ReportCounters();
1832 }
1833 switch (get_context.State()) {
1834 case GetContext::kNotFound:
1835 // Keep searching in other files
1836 break;
1837 case GetContext::kMerge:
1838 // TODO: update per-level perfcontext user_key_return_count for kMerge
1839 break;
1840 case GetContext::kFound:
1841 if (fp.GetHitFileLevel() == 0) {
1842 RecordTick(db_statistics_, GET_HIT_L0);
1843 } else if (fp.GetHitFileLevel() == 1) {
1844 RecordTick(db_statistics_, GET_HIT_L1);
1845 } else if (fp.GetHitFileLevel() >= 2) {
1846 RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
1847 }
1848 PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
1849 fp.GetHitFileLevel());
1850 return;
1851 case GetContext::kDeleted:
1852 // Use empty error message for speed
1853 *status = Status::NotFound();
1854 return;
1855 case GetContext::kCorrupt:
1856 *status = Status::Corruption("corrupted key for ", user_key);
1857 return;
1858 case GetContext::kBlobIndex:
1859 ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
1860 *status = Status::NotSupported(
1861 "Encounter unexpected blob index. Please open DB with "
1862 "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
1863 return;
1864 }
1865 f = fp.GetNextFile();
1866 }
1867 if (db_statistics_ != nullptr) {
1868 get_context.ReportCounters();
1869 }
1870 if (GetContext::kMerge == get_context.State()) {
1871 if (!do_merge) {
1872 *status = Status::OK();
1873 return;
1874 }
1875 if (!merge_operator_) {
1876 *status = Status::InvalidArgument(
1877 "merge_operator is not properly initialized.");
1878 return;
1879 }
1880     // The merge operands are in the saver and we hit the beginning of the
1881     // key's history; do a final merge of nullptr and the operands.
1882 std::string* str_value = value != nullptr ? value->GetSelf() : nullptr;
1883 *status = MergeHelper::TimedFullMerge(
1884 merge_operator_, user_key, nullptr, merge_context->GetOperands(),
1885 str_value, info_log_, db_statistics_, env_,
1886 nullptr /* result_operand */, true);
1887 if (LIKELY(value != nullptr)) {
1888 value->PinSelf();
1889 }
1890 } else {
1891 if (key_exists != nullptr) {
1892 *key_exists = false;
1893 }
1894 *status = Status::NotFound(); // Use an empty error message for speed
1895 }
1896 }
1897
1898 void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
1899 ReadCallback* callback, bool* is_blob) {
1900 PinnedIteratorsManager pinned_iters_mgr;
1901
1902 // Pin blocks that we read to hold merge operands
1903 if (merge_operator_) {
1904 pinned_iters_mgr.StartPinning();
1905 }
1906 uint64_t tracing_mget_id = BlockCacheTraceHelper::kReservedGetId;
1907
1908 if (vset_ && vset_->block_cache_tracer_ &&
1909 vset_->block_cache_tracer_->is_tracing_enabled()) {
1910 tracing_mget_id = vset_->block_cache_tracer_->NextGetId();
1911 }
1912 // Even though we know the batch size won't be > MAX_BATCH_SIZE,
1913 // use autovector in order to avoid unnecessary construction of GetContext
1914 // objects, which is expensive
1915 autovector<GetContext, 16> get_ctx;
1916 for (auto iter = range->begin(); iter != range->end(); ++iter) {
1917 assert(iter->s->ok() || iter->s->IsMergeInProgress());
1918 get_ctx.emplace_back(
1919 user_comparator(), merge_operator_, info_log_, db_statistics_,
1920 iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->ukey,
1921 iter->value, nullptr, &(iter->merge_context), true,
1922 &iter->max_covering_tombstone_seq, this->env_, nullptr,
1923 merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
1924 tracing_mget_id);
1925 // MergeInProgress status, if set, has been transferred to the get_context
1926 // state, so we set status to ok here. From now on, the iter status will
1927 // be used for IO errors, and get_context state will be used for any
1928 // key level errors
1929 *(iter->s) = Status::OK();
1930 }
1931 int get_ctx_index = 0;
1932 for (auto iter = range->begin(); iter != range->end();
1933 ++iter, get_ctx_index++) {
1934 iter->get_context = &(get_ctx[get_ctx_index]);
1935 }
1936
1937 MultiGetRange file_picker_range(*range, range->begin(), range->end());
1938 FilePickerMultiGet fp(
1939 &file_picker_range,
1940 &storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_,
1941 &storage_info_.file_indexer_, user_comparator(), internal_comparator());
1942 FdWithKeyRange* f = fp.GetNextFile();
1943
1944 while (f != nullptr) {
1945 MultiGetRange file_range = fp.CurrentFileRange();
1946 bool timer_enabled =
1947 GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
1948 get_perf_context()->per_level_perf_context_enabled;
1949 StopWatchNano timer(env_, timer_enabled /* auto_start */);
1950 Status s = table_cache_->MultiGet(
1951 read_options, *internal_comparator(), *f->file_metadata, &file_range,
1952 mutable_cf_options_.prefix_extractor.get(),
1953 cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
1954 IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
1955 fp.IsHitFileLastInLevel()),
1956 fp.GetCurrentLevel());
1957 // TODO: examine the behavior for corrupted key
1958 if (timer_enabled) {
1959 PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
1960 fp.GetCurrentLevel());
1961 }
1962 if (!s.ok()) {
1963 // TODO: Set status for individual keys appropriately
1964 for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
1965 *iter->s = s;
1966 file_range.MarkKeyDone(iter);
1967 }
1968 return;
1969 }
1970 uint64_t batch_size = 0;
1971 for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
1972 GetContext& get_context = *iter->get_context;
1973 Status* status = iter->s;
1974 // The Status in the KeyContext takes precedence over GetContext state
1975 // Status may be an error if there were any IO errors in the table
1976 // reader. We never expect Status to be NotFound(), as that is
1977 // determined by get_context
1978 assert(!status->IsNotFound());
1979 if (!status->ok()) {
1980 file_range.MarkKeyDone(iter);
1981 continue;
1982 }
1983
1984 if (get_context.sample()) {
1985 sample_file_read_inc(f->file_metadata);
1986 }
1987 batch_size++;
1988 // report the counters before returning
1989 if (get_context.State() != GetContext::kNotFound &&
1990 get_context.State() != GetContext::kMerge &&
1991 db_statistics_ != nullptr) {
1992 get_context.ReportCounters();
1993 } else {
1994 if (iter->max_covering_tombstone_seq > 0) {
1995 // The remaining files we look at will only contain covered keys, so
1996 // we stop here for this key
1997 file_picker_range.SkipKey(iter);
1998 }
1999 }
2000 switch (get_context.State()) {
2001 case GetContext::kNotFound:
2002 // Keep searching in other files
2003 break;
2004 case GetContext::kMerge:
2005 // TODO: update per-level perfcontext user_key_return_count for kMerge
2006 break;
2007 case GetContext::kFound:
2008 if (fp.GetHitFileLevel() == 0) {
2009 RecordTick(db_statistics_, GET_HIT_L0);
2010 } else if (fp.GetHitFileLevel() == 1) {
2011 RecordTick(db_statistics_, GET_HIT_L1);
2012 } else if (fp.GetHitFileLevel() >= 2) {
2013 RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
2014 }
2015 PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
2016 fp.GetHitFileLevel());
2017 file_range.MarkKeyDone(iter);
2018 continue;
2019 case GetContext::kDeleted:
2020 // Use empty error message for speed
2021 *status = Status::NotFound();
2022 file_range.MarkKeyDone(iter);
2023 continue;
2024 case GetContext::kCorrupt:
2025 *status =
2026 Status::Corruption("corrupted key for ", iter->lkey->user_key());
2027 file_range.MarkKeyDone(iter);
2028 continue;
2029 case GetContext::kBlobIndex:
2030 ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
2031 *status = Status::NotSupported(
2032 "Encounter unexpected blob index. Please open DB with "
2033 "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
2034 file_range.MarkKeyDone(iter);
2035 continue;
2036 }
2037 }
2038 RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
2039 if (file_picker_range.empty()) {
2040 break;
2041 }
2042 f = fp.GetNextFile();
2043 }
2044
2045 // Process any left over keys
2046 for (auto iter = range->begin(); iter != range->end(); ++iter) {
2047 GetContext& get_context = *iter->get_context;
2048 Status* status = iter->s;
2049 Slice user_key = iter->lkey->user_key();
2050
2051 if (db_statistics_ != nullptr) {
2052 get_context.ReportCounters();
2053 }
2054 if (GetContext::kMerge == get_context.State()) {
2055 if (!merge_operator_) {
2056 *status = Status::InvalidArgument(
2057 "merge_operator is not properly initialized.");
2058 range->MarkKeyDone(iter);
2059 continue;
2060 }
2061       // The merge operands are in the saver and we hit the beginning of the
2062       // key's history; do a final merge of nullptr and the operands.
2063 std::string* str_value =
2064 iter->value != nullptr ? iter->value->GetSelf() : nullptr;
2065 *status = MergeHelper::TimedFullMerge(
2066 merge_operator_, user_key, nullptr, iter->merge_context.GetOperands(),
2067 str_value, info_log_, db_statistics_, env_,
2068 nullptr /* result_operand */, true);
2069 if (LIKELY(iter->value != nullptr)) {
2070 iter->value->PinSelf();
2071 }
2072 } else {
2073 range->MarkKeyDone(iter);
2074 *status = Status::NotFound(); // Use an empty error message for speed
2075 }
2076 }
2077 }
2078
2079 bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
2080 // Reaching the bottom level implies misses at all upper levels, so we'll
2081 // skip checking the filters when we predict a hit.
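  // For example (illustrative): with optimize_filters_for_hits set and five
  // non-empty levels, only files in level 4 skip the filter check; if L0 were
  // the only non-empty level, only its last file would skip it.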
2082 return cfd_->ioptions()->optimize_filters_for_hits &&
2083 (level > 0 || is_file_last_in_level) &&
2084 level == storage_info_.num_non_empty_levels() - 1;
2085 }
2086
2087 void VersionStorageInfo::GenerateLevelFilesBrief() {
2088 level_files_brief_.resize(num_non_empty_levels_);
2089 for (int level = 0; level < num_non_empty_levels_; level++) {
2090 DoGenerateLevelFilesBrief(
2091 &level_files_brief_[level], files_[level], &arena_);
2092 }
2093 }
2094
2095 void Version::PrepareApply(
2096 const MutableCFOptions& mutable_cf_options,
2097 bool update_stats) {
2098 UpdateAccumulatedStats(update_stats);
2099 storage_info_.UpdateNumNonEmptyLevels();
2100 storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
2101 storage_info_.UpdateFilesByCompactionPri(cfd_->ioptions()->compaction_pri);
2102 storage_info_.GenerateFileIndexer();
2103 storage_info_.GenerateLevelFilesBrief();
2104 storage_info_.GenerateLevel0NonOverlapping();
2105 storage_info_.GenerateBottommostFiles();
2106 }
2107
2108 bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
2109 if (file_meta->init_stats_from_file ||
2110 file_meta->compensated_file_size > 0) {
2111 return false;
2112 }
2113 std::shared_ptr<const TableProperties> tp;
2114 Status s = GetTableProperties(&tp, file_meta);
2115 file_meta->init_stats_from_file = true;
2116 if (!s.ok()) {
2117 ROCKS_LOG_ERROR(vset_->db_options_->info_log,
2118 "Unable to load table properties for file %" PRIu64
2119 " --- %s\n",
2120 file_meta->fd.GetNumber(), s.ToString().c_str());
2121 return false;
2122 }
2123 if (tp.get() == nullptr) return false;
2124 file_meta->num_entries = tp->num_entries;
2125 file_meta->num_deletions = tp->num_deletions;
2126 file_meta->raw_value_size = tp->raw_value_size;
2127 file_meta->raw_key_size = tp->raw_key_size;
2128
2129 return true;
2130 }
2131
2132 void VersionStorageInfo::UpdateAccumulatedStats(FileMetaData* file_meta) {
2133 TEST_SYNC_POINT_CALLBACK("VersionStorageInfo::UpdateAccumulatedStats",
2134 nullptr);
2135
2136 assert(file_meta->init_stats_from_file);
2137 accumulated_file_size_ += file_meta->fd.GetFileSize();
2138 accumulated_raw_key_size_ += file_meta->raw_key_size;
2139 accumulated_raw_value_size_ += file_meta->raw_value_size;
2140 accumulated_num_non_deletions_ +=
2141 file_meta->num_entries - file_meta->num_deletions;
2142 accumulated_num_deletions_ += file_meta->num_deletions;
2143
2144 current_num_non_deletions_ +=
2145 file_meta->num_entries - file_meta->num_deletions;
2146 current_num_deletions_ += file_meta->num_deletions;
2147 current_num_samples_++;
2148 }
2149
2150 void VersionStorageInfo::RemoveCurrentStats(FileMetaData* file_meta) {
2151 if (file_meta->init_stats_from_file) {
2152 current_num_non_deletions_ -=
2153 file_meta->num_entries - file_meta->num_deletions;
2154 current_num_deletions_ -= file_meta->num_deletions;
2155 current_num_samples_--;
2156 }
2157 }
2158
2159 void Version::UpdateAccumulatedStats(bool update_stats) {
2160 if (update_stats) {
2161 // maximum number of table properties loaded from files.
2162 const int kMaxInitCount = 20;
2163 int init_count = 0;
2164     // Here only the first kMaxInitCount files that haven't yet been
2165     // initialized from file will have their num_deletions updated.
2166     // The motivation is to cap the maximum I/O per Version creation.
2167     // The reason for choosing files from lower levels instead of higher
2168     // levels is that this design propagates the initialization upward:
2169     // once the num_deletions of lower-level files are updated, those
2170     // files have an accurate compensated_file_size, which triggers
2171     // lower-to-higher-level compactions, which in turn create
2172     // higher-level files whose num_deletions will be updated here as
2173     // well.
2174 for (int level = 0;
2175 level < storage_info_.num_levels_ && init_count < kMaxInitCount;
2176 ++level) {
2177 for (auto* file_meta : storage_info_.files_[level]) {
2178 if (MaybeInitializeFileMetaData(file_meta)) {
2179 // each FileMeta will be initialized only once.
2180 storage_info_.UpdateAccumulatedStats(file_meta);
2181 // when option "max_open_files" is -1, all the file metadata has
2182 // already been read, so MaybeInitializeFileMetaData() won't incur
2183 // any I/O cost. "max_open_files=-1" means that the table cache passed
2184 // to the VersionSet and then to the ColumnFamilySet has a size of
2185 // TableCache::kInfiniteCapacity
2186 if (vset_->GetColumnFamilySet()->get_table_cache()->GetCapacity() ==
2187 TableCache::kInfiniteCapacity) {
2188 continue;
2189 }
2190 if (++init_count >= kMaxInitCount) {
2191 break;
2192 }
2193 }
2194 }
2195 }
2196     // If all sampled files contain only deletion entries, load the
2197     // table properties of a file from a higher level to initialize
2198     // that value.
2199 for (int level = storage_info_.num_levels_ - 1;
2200 storage_info_.accumulated_raw_value_size_ == 0 && level >= 0;
2201 --level) {
2202 for (int i = static_cast<int>(storage_info_.files_[level].size()) - 1;
2203 storage_info_.accumulated_raw_value_size_ == 0 && i >= 0; --i) {
2204 if (MaybeInitializeFileMetaData(storage_info_.files_[level][i])) {
2205 storage_info_.UpdateAccumulatedStats(storage_info_.files_[level][i]);
2206 }
2207 }
2208 }
2209 }
2210
2211 storage_info_.ComputeCompensatedSizes();
2212 }
2213
2214 void VersionStorageInfo::ComputeCompensatedSizes() {
2215 static const int kDeletionWeightOnCompaction = 2;
2216 uint64_t average_value_size = GetAverageValueSize();
2217
2218 // compute the compensated size
2219 for (int level = 0; level < num_levels_; level++) {
2220 for (auto* file_meta : files_[level]) {
2221       // Here we only compute compensated_file_size for those file_meta
2222       // whose compensated_file_size is uninitialized (== 0). This is true
2223       // only for files that have just been created and that no other thread
2224       // can access yet. That's why we can safely mutate compensated_file_size.
2225 if (file_meta->compensated_file_size == 0) {
2226 file_meta->compensated_file_size = file_meta->fd.GetFileSize();
2227         // We boost the size of deletion entries of a file only when the
2228         // number of deletion entries is greater than the number of
2229         // non-deletion entries in the file. The motivation is that in
2230         // a stable workload, the number of deletion entries should be
2231         // roughly equal to the number of non-deletion entries. If we
2232         // compensated the size of deletion entries in a stable workload,
2233         // the compensation logic might introduce unwanted effects that
2234         // change the shape of the LSM tree.
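        // Illustrative example (hypothetical numbers): a file with 100
        // entries, 80 of them deletions, and an average value size of 100
        // bytes gets (80 * 2 - 100) * 100 * 2 = 12,000 bytes added to its
        // compensated size.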
2235 if (file_meta->num_deletions * 2 >= file_meta->num_entries) {
2236 file_meta->compensated_file_size +=
2237 (file_meta->num_deletions * 2 - file_meta->num_entries) *
2238 average_value_size * kDeletionWeightOnCompaction;
2239 }
2240 }
2241 }
2242 }
2243 }
2244
2245 int VersionStorageInfo::MaxInputLevel() const {
2246 if (compaction_style_ == kCompactionStyleLevel) {
2247 return num_levels() - 2;
2248 }
2249 return 0;
2250 }
2251
2252 int VersionStorageInfo::MaxOutputLevel(bool allow_ingest_behind) const {
2253 if (allow_ingest_behind) {
2254 assert(num_levels() > 1);
2255 return num_levels() - 2;
2256 }
2257 return num_levels() - 1;
2258 }
2259
2260 void VersionStorageInfo::EstimateCompactionBytesNeeded(
2261 const MutableCFOptions& mutable_cf_options) {
2262 // Only implemented for level-based compaction
2263 if (compaction_style_ != kCompactionStyleLevel) {
2264 estimated_compaction_needed_bytes_ = 0;
2265 return;
2266 }
2267
2268   // Start from level 0: if level 0 qualifies for compaction into level 1,
2269   // we estimate the size of that compaction.
2270   // Then we move on to the next level and see whether it qualifies for
2271   // compaction into the level after it. The size of a level is estimated
2272   // as the actual size on the level plus the input bytes carried in from
2273   // the previous level, if any. If that exceeds the level's target, take
2274   // the excess bytes as compaction input and add the estimated compaction
2275   // size to the total. We keep doing this for level 2, 3, etc., until the
2276   // last level, and return the accumulated bytes.
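  // Rough illustration (hypothetical sizes): if L0 holds 300 MB and triggers
  // a compaction, those 300 MB are counted and carried into the base level.
  // If the base level holds 200 MB with a 256 MB target, its 200 MB is also
  // counted; with the carried-in 300 MB it notionally reaches 500 MB, so the
  // 244 MB excess cascades to the next level, scaled by the estimated fan-out
  // between the two levels.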
2277
2278 uint64_t bytes_compact_to_next_level = 0;
2279 uint64_t level_size = 0;
2280 for (auto* f : files_[0]) {
2281 level_size += f->fd.GetFileSize();
2282 }
2283 // Level 0
2284 bool level0_compact_triggered = false;
2285 if (static_cast<int>(files_[0].size()) >=
2286 mutable_cf_options.level0_file_num_compaction_trigger ||
2287 level_size >= mutable_cf_options.max_bytes_for_level_base) {
2288 level0_compact_triggered = true;
2289 estimated_compaction_needed_bytes_ = level_size;
2290 bytes_compact_to_next_level = level_size;
2291 } else {
2292 estimated_compaction_needed_bytes_ = 0;
2293 }
2294
2295 // Level 1 and up.
2296 uint64_t bytes_next_level = 0;
2297 for (int level = base_level(); level <= MaxInputLevel(); level++) {
2298 level_size = 0;
2299 if (bytes_next_level > 0) {
2300 #ifndef NDEBUG
2301 uint64_t level_size2 = 0;
2302 for (auto* f : files_[level]) {
2303 level_size2 += f->fd.GetFileSize();
2304 }
2305 assert(level_size2 == bytes_next_level);
2306 #endif
2307 level_size = bytes_next_level;
2308 bytes_next_level = 0;
2309 } else {
2310 for (auto* f : files_[level]) {
2311 level_size += f->fd.GetFileSize();
2312 }
2313 }
2314 if (level == base_level() && level0_compact_triggered) {
2315 // Add base level size to compaction if level0 compaction triggered.
2316 estimated_compaction_needed_bytes_ += level_size;
2317 }
2318 // Add size added by previous compaction
2319 level_size += bytes_compact_to_next_level;
2320 bytes_compact_to_next_level = 0;
2321 uint64_t level_target = MaxBytesForLevel(level);
2322 if (level_size > level_target) {
2323 bytes_compact_to_next_level = level_size - level_target;
2324 // Estimate the actual compaction fan-out ratio as size ratio between
2325 // the two levels.
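      // For illustration (hypothetical sizes): if 10 GB must move out of a
      // level that notionally holds 50 GB and the next level holds 150 GB,
      // the added estimate is 10 * (150 / 50 + 1) = 40 GB.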
2326
2327 assert(bytes_next_level == 0);
2328 if (level + 1 < num_levels_) {
2329 for (auto* f : files_[level + 1]) {
2330 bytes_next_level += f->fd.GetFileSize();
2331 }
2332 }
2333 if (bytes_next_level > 0) {
2334 assert(level_size > 0);
2335 estimated_compaction_needed_bytes_ += static_cast<uint64_t>(
2336 static_cast<double>(bytes_compact_to_next_level) *
2337 (static_cast<double>(bytes_next_level) /
2338 static_cast<double>(level_size) +
2339 1));
2340 }
2341 }
2342 }
2343 }
2344
2345 namespace {
2346 uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions,
2347 const MutableCFOptions& mutable_cf_options,
2348 const std::vector<FileMetaData*>& files) {
2349 uint32_t ttl_expired_files_count = 0;
2350
2351 int64_t _current_time;
2352 auto status = ioptions.env->GetCurrentTime(&_current_time);
2353 if (status.ok()) {
2354 const uint64_t current_time = static_cast<uint64_t>(_current_time);
2355 for (FileMetaData* f : files) {
2356 if (!f->being_compacted) {
2357 uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
2358 if (oldest_ancester_time != 0 &&
2359 oldest_ancester_time < (current_time - mutable_cf_options.ttl)) {
2360 ttl_expired_files_count++;
2361 }
2362 }
2363 }
2364 }
2365 return ttl_expired_files_count;
2366 }
2367 } // anonymous namespace
2368
2369 void VersionStorageInfo::ComputeCompactionScore(
2370 const ImmutableCFOptions& immutable_cf_options,
2371 const MutableCFOptions& mutable_cf_options) {
2372 for (int level = 0; level <= MaxInputLevel(); level++) {
2373 double score;
2374 if (level == 0) {
2375 // We treat level-0 specially by bounding the number of files
2376 // instead of number of bytes for two reasons:
2377 //
2378 // (1) With larger write-buffer sizes, it is nice not to do too
2379 // many level-0 compactions.
2380 //
2381 // (2) The files in level-0 are merged on every read and
2382 // therefore we wish to avoid too many files when the individual
2383 // file size is small (perhaps because of a small write-buffer
2384 // setting, or very high compression ratios, or lots of
2385 // overwrites/deletions).
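      // Rough illustration (hypothetical numbers): with
      // level0_file_num_compaction_trigger = 4 and 8 sorted runs not being
      // compacted, the level-0 score below is 8 / 4 = 2.0; level-based
      // compaction additionally takes the max with
      // total_size / max_bytes_for_level_base.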
2386 int num_sorted_runs = 0;
2387 uint64_t total_size = 0;
2388 for (auto* f : files_[level]) {
2389 if (!f->being_compacted) {
2390 total_size += f->compensated_file_size;
2391 num_sorted_runs++;
2392 }
2393 }
2394 if (compaction_style_ == kCompactionStyleUniversal) {
2395         // For universal compaction, we use the level0 score to indicate
2396         // the compaction score for the whole DB. Add the other levels'
2397         // sorted runs as if they were L0 files.
2398 for (int i = 1; i < num_levels(); i++) {
2399 if (!files_[i].empty() && !files_[i][0]->being_compacted) {
2400 num_sorted_runs++;
2401 }
2402 }
2403 }
2404
2405 if (compaction_style_ == kCompactionStyleFIFO) {
2406 score = static_cast<double>(total_size) /
2407 mutable_cf_options.compaction_options_fifo.max_table_files_size;
2408 if (mutable_cf_options.compaction_options_fifo.allow_compaction) {
2409 score = std::max(
2410 static_cast<double>(num_sorted_runs) /
2411 mutable_cf_options.level0_file_num_compaction_trigger,
2412 score);
2413 }
2414 if (mutable_cf_options.ttl > 0) {
2415 score = std::max(
2416 static_cast<double>(GetExpiredTtlFilesCount(
2417 immutable_cf_options, mutable_cf_options, files_[level])),
2418 score);
2419 }
2420
2421 } else {
2422 score = static_cast<double>(num_sorted_runs) /
2423 mutable_cf_options.level0_file_num_compaction_trigger;
2424 if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
2425 // Level-based involves L0->L0 compactions that can lead to oversized
2426 // L0 files. Take into account size as well to avoid later giant
2427 // compactions to the base level.
2428 score = std::max(
2429 score, static_cast<double>(total_size) /
2430 mutable_cf_options.max_bytes_for_level_base);
2431 }
2432 }
2433 } else {
2434 // Compute the ratio of current size to size limit.
2435 uint64_t level_bytes_no_compacting = 0;
2436 for (auto f : files_[level]) {
2437 if (!f->being_compacted) {
2438 level_bytes_no_compacting += f->compensated_file_size;
2439 }
2440 }
2441 score = static_cast<double>(level_bytes_no_compacting) /
2442 MaxBytesForLevel(level);
2443 }
2444 compaction_level_[level] = level;
2445 compaction_score_[level] = score;
2446 }
2447
2448   // Sort all the levels based on their score. Higher scores get listed
2449   // first. Use bubble sort because the number of entries is small.
2450 for (int i = 0; i < num_levels() - 2; i++) {
2451 for (int j = i + 1; j < num_levels() - 1; j++) {
2452 if (compaction_score_[i] < compaction_score_[j]) {
2453 double score = compaction_score_[i];
2454 int level = compaction_level_[i];
2455 compaction_score_[i] = compaction_score_[j];
2456 compaction_level_[i] = compaction_level_[j];
2457 compaction_score_[j] = score;
2458 compaction_level_[j] = level;
2459 }
2460 }
2461 }
2462 ComputeFilesMarkedForCompaction();
2463 ComputeBottommostFilesMarkedForCompaction();
2464 if (mutable_cf_options.ttl > 0) {
2465 ComputeExpiredTtlFiles(immutable_cf_options, mutable_cf_options.ttl);
2466 }
2467 if (mutable_cf_options.periodic_compaction_seconds > 0) {
2468 ComputeFilesMarkedForPeriodicCompaction(
2469 immutable_cf_options, mutable_cf_options.periodic_compaction_seconds);
2470 }
2471 EstimateCompactionBytesNeeded(mutable_cf_options);
2472 }
2473
2474 void VersionStorageInfo::ComputeFilesMarkedForCompaction() {
2475 files_marked_for_compaction_.clear();
2476 int last_qualify_level = 0;
2477
2478   // Do not include files from the last level that contains data.
2479   // If the table properties collector suggests a file on the last level,
2480   // we should not move it to a new level.
2481 for (int level = num_levels() - 1; level >= 1; level--) {
2482 if (!files_[level].empty()) {
2483 last_qualify_level = level - 1;
2484 break;
2485 }
2486 }
2487
2488 for (int level = 0; level <= last_qualify_level; level++) {
2489 for (auto* f : files_[level]) {
2490 if (!f->being_compacted && f->marked_for_compaction) {
2491 files_marked_for_compaction_.emplace_back(level, f);
2492 }
2493 }
2494 }
2495 }
2496
2497 void VersionStorageInfo::ComputeExpiredTtlFiles(
2498 const ImmutableCFOptions& ioptions, const uint64_t ttl) {
2499 assert(ttl > 0);
2500
2501 expired_ttl_files_.clear();
2502
2503 int64_t _current_time;
2504 auto status = ioptions.env->GetCurrentTime(&_current_time);
2505 if (!status.ok()) {
2506 return;
2507 }
2508 const uint64_t current_time = static_cast<uint64_t>(_current_time);
2509
2510 for (int level = 0; level < num_levels() - 1; level++) {
2511 for (FileMetaData* f : files_[level]) {
2512 if (!f->being_compacted) {
2513 uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
2514 if (oldest_ancester_time > 0 &&
2515 oldest_ancester_time < (current_time - ttl)) {
2516 expired_ttl_files_.emplace_back(level, f);
2517 }
2518 }
2519 }
2520 }
2521 }
2522
2523 void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction(
2524 const ImmutableCFOptions& ioptions,
2525 const uint64_t periodic_compaction_seconds) {
2526 assert(periodic_compaction_seconds > 0);
2527
2528 files_marked_for_periodic_compaction_.clear();
2529
2530 int64_t temp_current_time;
2531 auto status = ioptions.env->GetCurrentTime(&temp_current_time);
2532 if (!status.ok()) {
2533 return;
2534 }
2535 const uint64_t current_time = static_cast<uint64_t>(temp_current_time);
2536
2537 // If periodic_compaction_seconds is larger than current time, periodic
2538 // compaction can't possibly be triggered.
2539 if (periodic_compaction_seconds > current_time) {
2540 return;
2541 }
2542
2543 const uint64_t allowed_time_limit =
2544 current_time - periodic_compaction_seconds;
2545
2546 for (int level = 0; level < num_levels(); level++) {
2547 for (auto f : files_[level]) {
2548 if (!f->being_compacted) {
2549 // Compute a file's modification time in the following order:
2550 // 1. Use file_creation_time table property if it is > 0.
2551 // 2. Use creation_time table property if it is > 0.
2552 // 3. Use file's mtime metadata if the above two table properties are 0.
2553 // Don't consider the file at all if the modification time cannot be
2554 // correctly determined based on the above conditions.
2555 uint64_t file_modification_time = f->TryGetFileCreationTime();
2556 if (file_modification_time == kUnknownFileCreationTime) {
2557 file_modification_time = f->TryGetOldestAncesterTime();
2558 }
2559 if (file_modification_time == kUnknownOldestAncesterTime) {
2560 auto file_path = TableFileName(ioptions.cf_paths, f->fd.GetNumber(),
2561 f->fd.GetPathId());
2562 status = ioptions.env->GetFileModificationTime(
2563 file_path, &file_modification_time);
2564 if (!status.ok()) {
2565 ROCKS_LOG_WARN(ioptions.info_log,
2566 "Can't get file modification time: %s: %s",
2567 file_path.c_str(), status.ToString().c_str());
2568 continue;
2569 }
2570 }
2571 if (file_modification_time > 0 &&
2572 file_modification_time < allowed_time_limit) {
2573 files_marked_for_periodic_compaction_.emplace_back(level, f);
2574 }
2575 }
2576 }
2577 }
2578 }
2579
2580 namespace {
2581
2582 // used to sort files by size
2583 struct Fsize {
2584 size_t index;
2585 FileMetaData* file;
2586 };
2587
2588 // Comparator that is used to sort files based on their size
2589 // In normal mode: descending size
2590 bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) {
2591 return (first.file->compensated_file_size >
2592 second.file->compensated_file_size);
2593 }
2594 } // anonymous namespace
2595
2596 void VersionStorageInfo::AddFile(int level, FileMetaData* f, Logger* info_log) {
2597 auto* level_files = &files_[level];
2598 // Must not overlap
2599 #ifndef NDEBUG
2600 if (level > 0 && !level_files->empty() &&
2601 internal_comparator_->Compare(
2602 (*level_files)[level_files->size() - 1]->largest, f->smallest) >= 0) {
2603 auto* f2 = (*level_files)[level_files->size() - 1];
2604 if (info_log != nullptr) {
2605 Error(info_log, "Adding new file %" PRIu64
2606 " range (%s, %s) to level %d but overlapping "
2607 "with existing file %" PRIu64 " %s %s",
2608 f->fd.GetNumber(), f->smallest.DebugString(true).c_str(),
2609 f->largest.DebugString(true).c_str(), level, f2->fd.GetNumber(),
2610 f2->smallest.DebugString(true).c_str(),
2611 f2->largest.DebugString(true).c_str());
2612 LogFlush(info_log);
2613 }
2614 assert(false);
2615 }
2616 #else
2617 (void)info_log;
2618 #endif
2619 f->refs++;
2620 level_files->push_back(f);
2621 }
2622
2623 // Version::PrepareApply() needs to be called before calling this function,
2624 // or the following functions must have been called:
2625 // 1. UpdateNumNonEmptyLevels();
2626 // 2. CalculateBaseBytes();
2627 // 3. UpdateFilesByCompactionPri();
2628 // 4. GenerateFileIndexer();
2629 // 5. GenerateLevelFilesBrief();
2630 // 6. GenerateLevel0NonOverlapping();
2631 // 7. GenerateBottommostFiles();
2632 void VersionStorageInfo::SetFinalized() {
2633 finalized_ = true;
2634 #ifndef NDEBUG
2635 if (compaction_style_ != kCompactionStyleLevel) {
2636 // Not level based compaction.
2637 return;
2638 }
2639 assert(base_level_ < 0 || num_levels() == 1 ||
2640 (base_level_ >= 1 && base_level_ < num_levels()));
2641 // Verify all levels newer than base_level are empty except L0
2642 for (int level = 1; level < base_level(); level++) {
2643 assert(NumLevelBytes(level) == 0);
2644 }
2645 uint64_t max_bytes_prev_level = 0;
2646 for (int level = base_level(); level < num_levels() - 1; level++) {
2647 if (LevelFiles(level).size() == 0) {
2648 continue;
2649 }
2650 assert(MaxBytesForLevel(level) >= max_bytes_prev_level);
2651 max_bytes_prev_level = MaxBytesForLevel(level);
2652 }
2653 int num_empty_non_l0_level = 0;
2654 for (int level = 0; level < num_levels(); level++) {
2655 assert(LevelFiles(level).size() == 0 ||
2656 LevelFiles(level).size() == LevelFilesBrief(level).num_files);
2657 if (level > 0 && NumLevelBytes(level) > 0) {
2658 num_empty_non_l0_level++;
2659 }
2660 if (LevelFiles(level).size() > 0) {
2661 assert(level < num_non_empty_levels());
2662 }
2663 }
2664 assert(compaction_level_.size() > 0);
2665 assert(compaction_level_.size() == compaction_score_.size());
2666 #endif
2667 }
2668
2669 void VersionStorageInfo::UpdateNumNonEmptyLevels() {
2670 num_non_empty_levels_ = num_levels_;
2671 for (int i = num_levels_ - 1; i >= 0; i--) {
2672 if (files_[i].size() != 0) {
2673 return;
2674 } else {
2675 num_non_empty_levels_ = i;
2676 }
2677 }
2678 }
2679
2680 namespace {
2681 // Sort `temp` based on ratio of overlapping size over file size
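// Illustrative example (hypothetical numbers): a file with a 64 MB
// compensated size that overlaps 32 MB of data in the next level gets the
// ordering key 32 MB * 1024 / 64 MB = 512; files with smaller keys (lower
// relative overlap) sort first and are picked for compaction earlier.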
2682 void SortFileByOverlappingRatio(
2683 const InternalKeyComparator& icmp, const std::vector<FileMetaData*>& files,
2684 const std::vector<FileMetaData*>& next_level_files,
2685 std::vector<Fsize>* temp) {
2686 std::unordered_map<uint64_t, uint64_t> file_to_order;
2687 auto next_level_it = next_level_files.begin();
2688
2689 for (auto& file : files) {
2690 uint64_t overlapping_bytes = 0;
2691     // Skip files in the next level that end before the current file starts
2692 while (next_level_it != next_level_files.end() &&
2693 icmp.Compare((*next_level_it)->largest, file->smallest) < 0) {
2694 next_level_it++;
2695 }
2696
2697 while (next_level_it != next_level_files.end() &&
2698 icmp.Compare((*next_level_it)->smallest, file->largest) < 0) {
2699 overlapping_bytes += (*next_level_it)->fd.file_size;
2700
2701 if (icmp.Compare((*next_level_it)->largest, file->largest) > 0) {
2702         // The next-level file crosses the largest-key boundary of the current file.
2703 break;
2704 }
2705 next_level_it++;
2706 }
2707
2708 assert(file->compensated_file_size != 0);
2709 file_to_order[file->fd.GetNumber()] =
2710 overlapping_bytes * 1024u / file->compensated_file_size;
2711 }
2712
2713 std::sort(temp->begin(), temp->end(),
2714 [&](const Fsize& f1, const Fsize& f2) -> bool {
2715 return file_to_order[f1.file->fd.GetNumber()] <
2716 file_to_order[f2.file->fd.GetNumber()];
2717 });
2718 }
2719 } // namespace
2720
2721 void VersionStorageInfo::UpdateFilesByCompactionPri(
2722 CompactionPri compaction_pri) {
2723 if (compaction_style_ == kCompactionStyleNone ||
2724 compaction_style_ == kCompactionStyleFIFO ||
2725 compaction_style_ == kCompactionStyleUniversal) {
2726 // don't need this
2727 return;
2728 }
2729 // No need to sort the highest level because it is never compacted.
2730 for (int level = 0; level < num_levels() - 1; level++) {
2731 const std::vector<FileMetaData*>& files = files_[level];
2732 auto& files_by_compaction_pri = files_by_compaction_pri_[level];
2733 assert(files_by_compaction_pri.size() == 0);
2734
2735 // populate a temp vector for sorting based on size
2736 std::vector<Fsize> temp(files.size());
2737 for (size_t i = 0; i < files.size(); i++) {
2738 temp[i].index = i;
2739 temp[i].file = files[i];
2740 }
2741
2742     // sort the top kNumberFilesToSort files based on file size
2743 size_t num = VersionStorageInfo::kNumberFilesToSort;
2744 if (num > temp.size()) {
2745 num = temp.size();
2746 }
2747 switch (compaction_pri) {
2748 case kByCompensatedSize:
2749 std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
2750 CompareCompensatedSizeDescending);
2751 break;
2752 case kOldestLargestSeqFirst:
2753 std::sort(temp.begin(), temp.end(),
2754 [](const Fsize& f1, const Fsize& f2) -> bool {
2755 return f1.file->fd.largest_seqno <
2756 f2.file->fd.largest_seqno;
2757 });
2758 break;
2759 case kOldestSmallestSeqFirst:
2760 std::sort(temp.begin(), temp.end(),
2761 [](const Fsize& f1, const Fsize& f2) -> bool {
2762 return f1.file->fd.smallest_seqno <
2763 f2.file->fd.smallest_seqno;
2764 });
2765 break;
2766 case kMinOverlappingRatio:
2767 SortFileByOverlappingRatio(*internal_comparator_, files_[level],
2768 files_[level + 1], &temp);
2769 break;
2770 default:
2771 assert(false);
2772 }
2773 assert(temp.size() == files.size());
2774
2775 // initialize files_by_compaction_pri_
2776 for (size_t i = 0; i < temp.size(); i++) {
2777 files_by_compaction_pri.push_back(static_cast<int>(temp[i].index));
2778 }
2779 next_file_to_compact_by_size_[level] = 0;
2780 assert(files_[level].size() == files_by_compaction_pri_[level].size());
2781 }
2782 }
2783
2784 void VersionStorageInfo::GenerateLevel0NonOverlapping() {
2785 assert(!finalized_);
2786 level0_non_overlapping_ = true;
2787 if (level_files_brief_.size() == 0) {
2788 return;
2789 }
2790
2791 // A copy of L0 files sorted by smallest key
2792 std::vector<FdWithKeyRange> level0_sorted_file(
2793 level_files_brief_[0].files,
2794 level_files_brief_[0].files + level_files_brief_[0].num_files);
2795 std::sort(level0_sorted_file.begin(), level0_sorted_file.end(),
2796 [this](const FdWithKeyRange& f1, const FdWithKeyRange& f2) -> bool {
2797 return (internal_comparator_->Compare(f1.smallest_key,
2798 f2.smallest_key) < 0);
2799 });
2800
2801 for (size_t i = 1; i < level0_sorted_file.size(); ++i) {
2802 FdWithKeyRange& f = level0_sorted_file[i];
2803 FdWithKeyRange& prev = level0_sorted_file[i - 1];
2804 if (internal_comparator_->Compare(prev.largest_key, f.smallest_key) >= 0) {
2805 level0_non_overlapping_ = false;
2806 break;
2807 }
2808 }
2809 }
2810
2811 void VersionStorageInfo::GenerateBottommostFiles() {
2812 assert(!finalized_);
2813 assert(bottommost_files_.empty());
2814 for (size_t level = 0; level < level_files_brief_.size(); ++level) {
2815 for (size_t file_idx = 0; file_idx < level_files_brief_[level].num_files;
2816 ++file_idx) {
2817 const FdWithKeyRange& f = level_files_brief_[level].files[file_idx];
2818 int l0_file_idx;
2819 if (level == 0) {
2820 l0_file_idx = static_cast<int>(file_idx);
2821 } else {
2822 l0_file_idx = -1;
2823 }
2824 Slice smallest_user_key = ExtractUserKey(f.smallest_key);
2825 Slice largest_user_key = ExtractUserKey(f.largest_key);
2826 if (!RangeMightExistAfterSortedRun(smallest_user_key, largest_user_key,
2827 static_cast<int>(level),
2828 l0_file_idx)) {
2829 bottommost_files_.emplace_back(static_cast<int>(level),
2830 f.file_metadata);
2831 }
2832 }
2833 }
2834 }
2835
2836 void VersionStorageInfo::UpdateOldestSnapshot(SequenceNumber seqnum) {
2837 assert(seqnum >= oldest_snapshot_seqnum_);
2838 oldest_snapshot_seqnum_ = seqnum;
2839 if (oldest_snapshot_seqnum_ > bottommost_files_mark_threshold_) {
2840 ComputeBottommostFilesMarkedForCompaction();
2841 }
2842 }
2843
2844 void VersionStorageInfo::ComputeBottommostFilesMarkedForCompaction() {
2845 bottommost_files_marked_for_compaction_.clear();
2846 bottommost_files_mark_threshold_ = kMaxSequenceNumber;
2847 for (auto& level_and_file : bottommost_files_) {
2848 if (!level_and_file.second->being_compacted &&
2849 level_and_file.second->fd.largest_seqno != 0 &&
2850 level_and_file.second->num_deletions > 1) {
2851       // largest_seqno might be nonzero due to containing the final key in an
2852       // earlier compaction, whose seqnum we didn't zero out. Multiple deletions
2853       // ensure the file really contains deleted or overwritten keys.
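      // For example (illustrative): a bottommost file with several deletions
      // and largest_seqno == 100 is marked once the oldest snapshot sequence
      // number exceeds 100; until then, 100 lowers
      // bottommost_files_mark_threshold_ so the check reruns when the oldest
      // snapshot advances past it.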
2854 if (level_and_file.second->fd.largest_seqno < oldest_snapshot_seqnum_) {
2855 bottommost_files_marked_for_compaction_.push_back(level_and_file);
2856 } else {
2857 bottommost_files_mark_threshold_ =
2858 std::min(bottommost_files_mark_threshold_,
2859 level_and_file.second->fd.largest_seqno);
2860 }
2861 }
2862 }
2863 }
2864
2865 void Version::Ref() {
2866 ++refs_;
2867 }
2868
2869 bool Version::Unref() {
2870 assert(refs_ >= 1);
2871 --refs_;
2872 if (refs_ == 0) {
2873 delete this;
2874 return true;
2875 }
2876 return false;
2877 }
2878
2879 bool VersionStorageInfo::OverlapInLevel(int level,
2880 const Slice* smallest_user_key,
2881 const Slice* largest_user_key) {
2882 if (level >= num_non_empty_levels_) {
2883 // empty level, no overlap
2884 return false;
2885 }
2886 return SomeFileOverlapsRange(*internal_comparator_, (level > 0),
2887 level_files_brief_[level], smallest_user_key,
2888 largest_user_key);
2889 }
2890
2891 // Store in "*inputs" all files in "level" that overlap [begin,end]
2892 // If hint_index is specified, then it points to a file in the
2893 // overlapping range.
2894 // If file_index is non-null, it receives the index of one overlapping file.
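// If expand_range is true (only meaningful for level 0), the search range is
// grown to cover each overlapping file found, so that further files pulled in
// are also returned. Illustrative example (hypothetical user keys): a request
// for [b, d] first picks up a file spanning [a, c]; the range then widens to
// [a, d], so a later pass also picks up a file that ends before b but at or
// after a, which the original range would have missed.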
2895 void VersionStorageInfo::GetOverlappingInputs(
2896 int level, const InternalKey* begin, const InternalKey* end,
2897 std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
2898 bool expand_range, InternalKey** next_smallest) const {
2899 if (level >= num_non_empty_levels_) {
2900 // this level is empty, no overlapping inputs
2901 return;
2902 }
2903
2904 inputs->clear();
2905 if (file_index) {
2906 *file_index = -1;
2907 }
2908 const Comparator* user_cmp = user_comparator_;
2909 if (level > 0) {
2910 GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs, hint_index,
2911 file_index, false, next_smallest);
2912 return;
2913 }
2914
2915 if (next_smallest) {
2916 // next_smallest key only makes sense for non-level 0, where files are
2917 // non-overlapping
2918 *next_smallest = nullptr;
2919 }
2920
2921 Slice user_begin, user_end;
2922 if (begin != nullptr) {
2923 user_begin = begin->user_key();
2924 }
2925 if (end != nullptr) {
2926 user_end = end->user_key();
2927 }
2928
2929   // `index` stores the indexes of the files that still need to be checked.
2930 std::list<size_t> index;
2931 for (size_t i = 0; i < level_files_brief_[level].num_files; i++) {
2932 index.emplace_back(i);
2933 }
2934
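// For L0 (the only level that reaches this point), files may overlap each
// other, so when expand_range is true the scan below runs to a fixed point:
// every file pulled into "inputs" may widen [user_begin, user_end], which can
// make previously skipped files overlap, so the remaining files are
// re-scanned until a pass adds nothing new.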
2935 while (!index.empty()) {
2936 bool found_overlapping_file = false;
2937 auto iter = index.begin();
2938 while (iter != index.end()) {
2939 FdWithKeyRange* f = &(level_files_brief_[level].files[*iter]);
2940 const Slice file_start = ExtractUserKey(f->smallest_key);
2941 const Slice file_limit = ExtractUserKey(f->largest_key);
2942 if (begin != nullptr &&
2943 user_cmp->CompareWithoutTimestamp(file_limit, user_begin) < 0) {
2944 // "f" is completely before specified range; skip it
2945 iter++;
2946 } else if (end != nullptr &&
2947 user_cmp->CompareWithoutTimestamp(file_start, user_end) > 0) {
2948 // "f" is completely after specified range; skip it
2949 iter++;
2950 } else {
2951 // "f" overlaps the specified range
2952 inputs->emplace_back(files_[level][*iter]);
2953 found_overlapping_file = true;
2954 // record the first file index.
2955 if (file_index && *file_index == -1) {
2956 *file_index = static_cast<int>(*iter);
2957 }
2958 // this file overlaps; erase it so it is not checked again.
2959 iter = index.erase(iter);
2960 if (expand_range) {
2961 if (begin != nullptr &&
2962 user_cmp->CompareWithoutTimestamp(file_start, user_begin) < 0) {
2963 user_begin = file_start;
2964 }
2965 if (end != nullptr &&
2966 user_cmp->CompareWithoutTimestamp(file_limit, user_end) > 0) {
2967 user_end = file_limit;
2968 }
2969 }
2970 }
2971 }
2972 // if none of the remaining files overlap, stop
2973 if (!found_overlapping_file) {
2974 break;
2975 }
2976 }
2977 }
2978
2979 // Store in "*inputs" the files in "level" that are within range [begin,end]
2980 // Guarantee a "clean cut" boundary between the files in inputs
2981 // and the surrounding files, while including the maximum number of files.
2982 // This will ensure that no parts of a key are lost during compaction.
2983 // If hint_index is specified, then it points to a file in the range.
2984 // If file_index is non-null, it is set to the index of one file in the range.
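// Example (hypothetical user keys): suppose the level holds files
// f1=[a..e], f2=[e..g], f3=[h..k] and the requested interval is [a, f].
// f2 is not fully contained in [a, f], and cutting between f1 and f2 would
// split the user key "e" across the boundary, so the result is empty: there
// is no non-empty clean subset inside the interval.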
2985 void VersionStorageInfo::GetCleanInputsWithinInterval(
2986 int level, const InternalKey* begin, const InternalKey* end,
2987 std::vector<FileMetaData*>* inputs, int hint_index, int* file_index) const {
2988 inputs->clear();
2989 if (file_index) {
2990 *file_index = -1;
2991 }
2992 if (level >= num_non_empty_levels_ || level == 0 ||
2993 level_files_brief_[level].num_files == 0) {
2994 // this level is empty, no inputs within range
2995 // also don't support clean input interval within L0
2996 return;
2997 }
2998
2999 GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs,
3000 hint_index, file_index,
3001 true /* within_interval */);
3002 }
3003
3004 // Store in "*inputs" all files in "level" that overlap [begin,end]
3005 // Employ binary search to find at least one file that overlaps the
3006 // specified range. From that file, iterate backwards and
3007 // forwards to find all overlapping files.
3008 // If within_interval is set, then only store the maximum set of clean inputs
3009 // within the interval [begin, end]. "Clean" means there is a boundary
3010 // between the files in "*inputs" and the surrounding files.
3011 void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
3012 int level, const InternalKey* begin, const InternalKey* end,
3013 std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
3014 bool within_interval, InternalKey** next_smallest) const {
3015 assert(level > 0);
3016
3017 auto user_cmp = user_comparator_;
3018 const FdWithKeyRange* files = level_files_brief_[level].files;
3019 const int num_files = static_cast<int>(level_files_brief_[level].num_files);
3020
3021 // Use binary search to find the lower bound
3022 // and upper bound of the overlapping file range.
3023 int start_index = 0;
3024 int end_index = num_files;
3025
3026 if (begin != nullptr) {
3027 // When within_interval is true, comparing "begin" against the file's smallest
3028 // key makes std::lower_bound skip files that straddle "begin".
3029 auto cmp = [&user_cmp, &within_interval](const FdWithKeyRange& f,
3030 const InternalKey* k) {
3031 auto& file_key = within_interval ? f.file_metadata->smallest
3032 : f.file_metadata->largest;
3033 return sstableKeyCompare(user_cmp, file_key, *k) < 0;
3034 };
3035
3036 start_index = static_cast<int>(
3037 std::lower_bound(files,
3038 files + (hint_index == -1 ? num_files : hint_index),
3039 begin, cmp) -
3040 files);
3041
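// For a clean cut, the first selected file must not share a boundary user key
// with the unselected file just before it; while it does, advance start_index
// past it.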
3042 if (start_index > 0 && within_interval) {
3043 bool is_overlapping = true;
3044 while (is_overlapping && start_index < num_files) {
3045 auto& pre_limit = files[start_index - 1].file_metadata->largest;
3046 auto& cur_start = files[start_index].file_metadata->smallest;
3047 is_overlapping = sstableKeyCompare(user_cmp, pre_limit, cur_start) == 0;
3048 start_index += is_overlapping;
3049 }
3050 }
3051 }
3052
3053 if (end != nullptr) {
3054 // When within_interval is true, comparing "end" against the file's largest
3055 // key makes std::upper_bound exclude files that straddle "end".
3056 auto cmp = [&user_cmp, &within_interval](const InternalKey* k,
3057 const FdWithKeyRange& f) {
3058 auto& file_key = within_interval ? f.file_metadata->largest
3059 : f.file_metadata->smallest;
3060 return sstableKeyCompare(user_cmp, *k, file_key) < 0;
3061 };
3062
3063 end_index = static_cast<int>(
3064 std::upper_bound(files + start_index, files + num_files, end, cmp) -
3065 files);
3066
3067 if (end_index < num_files && within_interval) {
3068 bool is_overlapping = true;
3069 while (is_overlapping && end_index > start_index) {
3070 auto& next_start = files[end_index].file_metadata->smallest;
3071 auto& cur_limit = files[end_index - 1].file_metadata->largest;
3072 is_overlapping =
3073 sstableKeyCompare(user_cmp, cur_limit, next_start) == 0;
3074 end_index -= is_overlapping;
3075 }
3076 }
3077 }
3078
3079 assert(start_index <= end_index);
3080
3081 // If there were no overlapping files, return immediately.
3082 if (start_index == end_index) {
3083 if (next_smallest) {
3084 *next_smallest = nullptr;
3085 }
3086 return;
3087 }
3088
3089 assert(start_index < end_index);
3090
3091 // returns the index where an overlap is found
3092 if (file_index) {
3093 *file_index = start_index;
3094 }
3095
3096 // insert overlapping files into vector
3097 for (int i = start_index; i < end_index; i++) {
3098 inputs->push_back(files_[level][i]);
3099 }
3100
3101 if (next_smallest != nullptr) {
3102 // Provide the next key outside the range covered by inputs
3103 if (end_index < static_cast<int>(files_[level].size())) {
3104 **next_smallest = files_[level][end_index]->smallest;
3105 } else {
3106 *next_smallest = nullptr;
3107 }
3108 }
3109 }
3110
3111 uint64_t VersionStorageInfo::NumLevelBytes(int level) const {
3112 assert(level >= 0);
3113 assert(level < num_levels());
3114 return TotalFileSize(files_[level]);
3115 }
3116
3117 const char* VersionStorageInfo::LevelSummary(
3118 LevelSummaryStorage* scratch) const {
3119 int len = 0;
3120 if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
3121 assert(base_level_ < static_cast<int>(level_max_bytes_.size()));
3122 if (level_multiplier_ != 0.0) {
3123 len = snprintf(
3124 scratch->buffer, sizeof(scratch->buffer),
3125 "base level %d level multiplier %.2f max bytes base %" PRIu64 " ",
3126 base_level_, level_multiplier_, level_max_bytes_[base_level_]);
3127 }
3128 }
3129 len +=
3130 snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "files[");
3131 for (int i = 0; i < num_levels(); i++) {
3132 int sz = sizeof(scratch->buffer) - len;
3133 int ret = snprintf(scratch->buffer + len, sz, "%d ", int(files_[i].size()));
3134 if (ret < 0 || ret >= sz) break;
3135 len += ret;
3136 }
3137 if (len > 0) {
3138 // overwrite the last space
3139 --len;
3140 }
3141 len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
3142 "] max score %.2f", compaction_score_[0]);
3143
3144 if (!files_marked_for_compaction_.empty()) {
3145 snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
3146 " (%" ROCKSDB_PRIszt " files need compaction)",
3147 files_marked_for_compaction_.size());
3148 }
3149
3150 return scratch->buffer;
3151 }
3152
3153 const char* VersionStorageInfo::LevelFileSummary(FileSummaryStorage* scratch,
3154 int level) const {
3155 int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size[");
3156 for (const auto& f : files_[level]) {
3157 int sz = sizeof(scratch->buffer) - len;
3158 char sztxt[16];
3159 AppendHumanBytes(f->fd.GetFileSize(), sztxt, sizeof(sztxt));
3160 int ret = snprintf(scratch->buffer + len, sz,
3161 "#%" PRIu64 "(seq=%" PRIu64 ",sz=%s,%d) ",
3162 f->fd.GetNumber(), f->fd.smallest_seqno, sztxt,
3163 static_cast<int>(f->being_compacted));
3164 if (ret < 0 || ret >= sz)
3165 break;
3166 len += ret;
3167 }
3168 // overwrite the last space (only if files_[level].size() is non-zero)
3169 if (files_[level].size() && len > 0) {
3170 --len;
3171 }
3172 snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]");
3173 return scratch->buffer;
3174 }
3175
3176 int64_t VersionStorageInfo::MaxNextLevelOverlappingBytes() {
3177 uint64_t result = 0;
3178 std::vector<FileMetaData*> overlaps;
3179 for (int level = 1; level < num_levels() - 1; level++) {
3180 for (const auto& f : files_[level]) {
3181 GetOverlappingInputs(level + 1, &f->smallest, &f->largest, &overlaps);
3182 const uint64_t sum = TotalFileSize(overlaps);
3183 if (sum > result) {
3184 result = sum;
3185 }
3186 }
3187 }
3188 return result;
3189 }
3190
3191 uint64_t VersionStorageInfo::MaxBytesForLevel(int level) const {
3192 // Note: the result for level zero is not really used since we set
3193 // the level-0 compaction threshold based on number of files.
3194 assert(level >= 0);
3195 assert(level < static_cast<int>(level_max_bytes_.size()));
3196 return level_max_bytes_[level];
3197 }
3198
3199 void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions& ioptions,
3200 const MutableCFOptions& options) {
3201 // Special logic to set number of sorted runs.
3202 // It is to match the previous behavior when all files are in L0.
3203 int num_l0_count = static_cast<int>(files_[0].size());
3204 if (compaction_style_ == kCompactionStyleUniversal) {
3205 // For universal compaction, we use level0 score to indicate
3206 // compaction score for the whole DB. Adding other levels as if
3207 // they are L0 files.
3208 for (int i = 1; i < num_levels(); i++) {
3209 if (!files_[i].empty()) {
3210 num_l0_count++;
3211 }
3212 }
3213 }
3214 set_l0_delay_trigger_count(num_l0_count);
3215
3216 level_max_bytes_.resize(ioptions.num_levels);
3217 if (!ioptions.level_compaction_dynamic_level_bytes) {
3218 base_level_ = (ioptions.compaction_style == kCompactionStyleLevel) ? 1 : -1;
3219
3220 // Calculate for static bytes base case
3221 for (int i = 0; i < ioptions.num_levels; ++i) {
3222 if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) {
3223 level_max_bytes_[i] = options.max_bytes_for_level_base;
3224 } else if (i > 1) {
3225 level_max_bytes_[i] = MultiplyCheckOverflow(
3226 MultiplyCheckOverflow(level_max_bytes_[i - 1],
3227 options.max_bytes_for_level_multiplier),
3228 options.MaxBytesMultiplerAdditional(i - 1));
3229 } else {
3230 level_max_bytes_[i] = options.max_bytes_for_level_base;
3231 }
3232 }
3233 } else {
3234 uint64_t max_level_size = 0;
3235
3236 int first_non_empty_level = -1;
3237 // Find the size of the non-L0 level with the most data.
3238 // Cannot use the size of the last level because it can be empty or less
3239 // than previous levels after compaction.
3240 for (int i = 1; i < num_levels_; i++) {
3241 uint64_t total_size = 0;
3242 for (const auto& f : files_[i]) {
3243 total_size += f->fd.GetFileSize();
3244 }
3245 if (total_size > 0 && first_non_empty_level == -1) {
3246 first_non_empty_level = i;
3247 }
3248 if (total_size > max_level_size) {
3249 max_level_size = total_size;
3250 }
3251 }
3252
3253 // Prefill every level's max bytes to disallow compaction from there.
3254 for (int i = 0; i < num_levels_; i++) {
3255 level_max_bytes_[i] = std::numeric_limits<uint64_t>::max();
3256 }
3257
3258 if (max_level_size == 0) {
3259 // No data for L1 and up. L0 compacts to last level directly.
3260 // No compaction from L1+ needs to be scheduled.
3261 base_level_ = num_levels_ - 1;
3262 } else {
3263 uint64_t l0_size = 0;
3264 for (const auto& f : files_[0]) {
3265 l0_size += f->fd.GetFileSize();
3266 }
3267
3268 uint64_t base_bytes_max =
3269 std::max(options.max_bytes_for_level_base, l0_size);
3270 uint64_t base_bytes_min = static_cast<uint64_t>(
3271 base_bytes_max / options.max_bytes_for_level_multiplier);
3272
3273 // See whether we can make the last level's target size be max_level_size.
3274 uint64_t cur_level_size = max_level_size;
3275 for (int i = num_levels_ - 2; i >= first_non_empty_level; i--) {
3276 // Round up after dividing
3277 cur_level_size = static_cast<uint64_t>(
3278 cur_level_size / options.max_bytes_for_level_multiplier);
3279 }
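// Worked example (hypothetical numbers): with num_levels_ = 7,
// first_non_empty_level = 4, max_level_size = 300GB and
// max_bytes_for_level_multiplier = 10, the loop above divides twice
// (i = 5, 4), leaving cur_level_size = 3GB: the target the first non-empty
// level would get if the last level were sized at max_level_size.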
3280
3281 // Calculate base level and its size.
3282 uint64_t base_level_size;
3283 if (cur_level_size <= base_bytes_min) {
3284 // Case 1. If we make target size of last level to be max_level_size,
3285 // target size of the first non-empty level would be smaller than
3286 // base_bytes_min. We set it to be base_bytes_min + 1 instead.
3287 base_level_size = base_bytes_min + 1U;
3288 base_level_ = first_non_empty_level;
3289 ROCKS_LOG_INFO(ioptions.info_log,
3290 "More existing levels in DB than needed. "
3291 "max_bytes_for_level_multiplier may not be guaranteed.");
3292 } else {
3293 // Find base level (where L0 data is compacted to).
3294 base_level_ = first_non_empty_level;
3295 while (base_level_ > 1 && cur_level_size > base_bytes_max) {
3296 --base_level_;
3297 cur_level_size = static_cast<uint64_t>(
3298 cur_level_size / options.max_bytes_for_level_multiplier);
3299 }
3300 if (cur_level_size > base_bytes_max) {
3301 // Even L1 will be too large
3302 assert(base_level_ == 1);
3303 base_level_size = base_bytes_max;
3304 } else {
3305 base_level_size = cur_level_size;
3306 }
3307 }
3308
3309 level_multiplier_ = options.max_bytes_for_level_multiplier;
3310 assert(base_level_size > 0);
3311 if (l0_size > base_level_size &&
3312 (l0_size > options.max_bytes_for_level_base ||
3313 static_cast<int>(files_[0].size() / 2) >=
3314 options.level0_file_num_compaction_trigger)) {
3315 // We adjust the base level according to actual L0 size, and adjust
3316 // the level multiplier accordingly, when:
3317 // 1. the L0 size is larger than level size base, or
3318 // 2. number of L0 files reaches twice the L0->L1 compaction trigger
3319 // We don't do this otherwise to keep the LSM-tree structure stable
3320 // unless the L0 compaction is backlogged.
3321 base_level_size = l0_size;
3322 if (base_level_ == num_levels_ - 1) {
3323 level_multiplier_ = 1.0;
3324 } else {
3325 level_multiplier_ = std::pow(
3326 static_cast<double>(max_level_size) /
3327 static_cast<double>(base_level_size),
3328 1.0 / static_cast<double>(num_levels_ - base_level_ - 1));
3329 }
3330 }
3331
3332 uint64_t level_size = base_level_size;
3333 for (int i = base_level_; i < num_levels_; i++) {
3334 if (i > base_level_) {
3335 level_size = MultiplyCheckOverflow(level_size, level_multiplier_);
3336 }
3337 // Don't set any level below base_bytes_max. Otherwise, the LSM can
3338 // assume an hourglass shape where L1+ sizes are smaller than L0. This
3339 // causes compaction scoring, which depends on level sizes, to favor L1+
3340 // at the expense of L0, which may fill up and stall.
3341 level_max_bytes_[i] = std::max(level_size, base_bytes_max);
3342 }
3343 }
3344 }
3345 }
3346
3347 uint64_t VersionStorageInfo::EstimateLiveDataSize() const {
3348 // Estimate the live data size by adding up, for each key range, the size of
3349 // the bottommost file covering it. Note: the estimate depends on the ordering
3350 // of files in level 0 because files in level 0 can be overlapping.
3351 uint64_t size = 0;
3352
3353 auto ikey_lt = [this](InternalKey* x, InternalKey* y) {
3354 return internal_comparator_->Compare(*x, *y) < 0;
3355 };
3356 // (Ordered) map of largest keys in non-overlapping files
3357 std::map<InternalKey*, FileMetaData*, decltype(ikey_lt)> ranges(ikey_lt);
3358
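// Walk levels from the bottom up. "ranges" records, keyed by largest key, the
// key ranges already covered by files counted so far. A file from a higher
// level is counted only if it does not overlap an already covered range;
// overlapping data in higher levels is treated as updates to keys whose size
// was already counted, which keeps the estimate from double counting.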
3359 for (int l = num_levels_ - 1; l >= 0; l--) {
3360 bool found_end = false;
3361 for (auto file : files_[l]) {
3362 // Find the first file where the largest key is larger than the smallest
3363 // key of the current file. If this file does not overlap with the
3364 // current file, none of the files in the map does. If there is
3365 // no potential overlap, we can safely insert the rest of this level
3366 // (if the level is not 0) into the map without checking again because
3367 // the elements in the level are sorted and non-overlapping.
3368 auto lb = (found_end && l != 0) ?
3369 ranges.end() : ranges.lower_bound(&file->smallest);
3370 found_end = (lb == ranges.end());
3371 if (found_end || internal_comparator_->Compare(
3372 file->largest, (*lb).second->smallest) < 0) {
3373 ranges.emplace_hint(lb, &file->largest, file);
3374 size += file->fd.file_size;
3375 }
3376 }
3377 }
3378 return size;
3379 }
3380
3381 bool VersionStorageInfo::RangeMightExistAfterSortedRun(
3382 const Slice& smallest_user_key, const Slice& largest_user_key,
3383 int last_level, int last_l0_idx) {
3384 assert((last_l0_idx != -1) == (last_level == 0));
3385 // TODO(ajkr): this preserves earlier behavior where we considered an L0 file
3386 // bottommost only if it's the oldest L0 file and there are no files on older
3387 // levels. It'd be better to consider it bottommost if there's no overlap in
3388 // older levels/files.
3389 if (last_level == 0 &&
3390 last_l0_idx != static_cast<int>(LevelFiles(0).size() - 1)) {
3391 return true;
3392 }
3393
3394 // Checks whether there are files living beyond the `last_level`. If lower
3395 // levels have files, it checks for overlap between [`smallest_key`,
3396 // `largest_key`] and those files. Bottomlevel optimizations can be made if
3397 // there are no files in lower levels or if there is no overlap with the files
3398 // in the lower levels.
3399 for (int level = last_level + 1; level < num_levels(); level++) {
3400 // The range is not in the bottommost level if there are files in lower
3401 // levels when the `last_level` is 0 or if there are files in lower levels
3402 // which overlap with [`smallest_key`, `largest_key`].
3403 if (files_[level].size() > 0 &&
3404 (last_level == 0 ||
3405 OverlapInLevel(level, &smallest_user_key, &largest_user_key))) {
3406 return true;
3407 }
3408 }
3409 return false;
3410 }
3411
3412 void Version::AddLiveFiles(std::vector<FileDescriptor>* live) {
3413 for (int level = 0; level < storage_info_.num_levels(); level++) {
3414 const std::vector<FileMetaData*>& files = storage_info_.files_[level];
3415 for (const auto& file : files) {
3416 live->push_back(file->fd);
3417 }
3418 }
3419 }
3420
3421 std::string Version::DebugString(bool hex, bool print_stats) const {
3422 std::string r;
3423 for (int level = 0; level < storage_info_.num_levels_; level++) {
3424 // E.g.,
3425 // --- level 1 ---
3426 // 17:123[1 .. 124]['a' .. 'd']
3427 // 20:43[124 .. 128]['e' .. 'g']
3428 //
3429 // if print_stats=true:
3430 // 17:123[1 .. 124]['a' .. 'd'](4096)
3431 r.append("--- level ");
3432 AppendNumberTo(&r, level);
3433 r.append(" --- version# ");
3434 AppendNumberTo(&r, version_number_);
3435 r.append(" ---\n");
3436 const std::vector<FileMetaData*>& files = storage_info_.files_[level];
3437 for (size_t i = 0; i < files.size(); i++) {
3438 r.push_back(' ');
3439 AppendNumberTo(&r, files[i]->fd.GetNumber());
3440 r.push_back(':');
3441 AppendNumberTo(&r, files[i]->fd.GetFileSize());
3442 r.append("[");
3443 AppendNumberTo(&r, files[i]->fd.smallest_seqno);
3444 r.append(" .. ");
3445 AppendNumberTo(&r, files[i]->fd.largest_seqno);
3446 r.append("]");
3447 r.append("[");
3448 r.append(files[i]->smallest.DebugString(hex));
3449 r.append(" .. ");
3450 r.append(files[i]->largest.DebugString(hex));
3451 r.append("]");
3452 if (files[i]->oldest_blob_file_number != kInvalidBlobFileNumber) {
3453 r.append(" blob_file:");
3454 AppendNumberTo(&r, files[i]->oldest_blob_file_number);
3455 }
3456 if (print_stats) {
3457 r.append("(");
3458 r.append(ToString(
3459 files[i]->stats.num_reads_sampled.load(std::memory_order_relaxed)));
3460 r.append(")");
3461 }
3462 r.append("\n");
3463 }
3464 }
3465 return r;
3466 }
3467
3468 // this is used to batch writes to the manifest file
3469 struct VersionSet::ManifestWriter {
3470 Status status;
3471 bool done;
3472 InstrumentedCondVar cv;
3473 ColumnFamilyData* cfd;
3474 const MutableCFOptions mutable_cf_options;
3475 const autovector<VersionEdit*>& edit_list;
3476
3477 explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd,
3478 const MutableCFOptions& cf_options,
3479 const autovector<VersionEdit*>& e)
3480 : done(false),
3481 cv(mu),
3482 cfd(_cfd),
3483 mutable_cf_options(cf_options),
3484 edit_list(e) {}
3485 };
3486
3487 Status AtomicGroupReadBuffer::AddEdit(VersionEdit* edit) {
3488 assert(edit);
3489 if (edit->is_in_atomic_group_) {
3490 TEST_SYNC_POINT("AtomicGroupReadBuffer::AddEdit:AtomicGroup");
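// The first edit of an atomic group reports remaining_entries_ as the number
// of edits still to come, so the whole group occupies remaining_entries_ + 1
// slots in replay_buffer_.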
3491 if (replay_buffer_.empty()) {
3492 replay_buffer_.resize(edit->remaining_entries_ + 1);
3493 TEST_SYNC_POINT_CALLBACK(
3494 "AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", edit);
3495 }
3496 read_edits_in_atomic_group_++;
3497 if (read_edits_in_atomic_group_ + edit->remaining_entries_ !=
3498 static_cast<uint32_t>(replay_buffer_.size())) {
3499 TEST_SYNC_POINT_CALLBACK(
3500 "AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize", edit);
3501 return Status::Corruption("corrupted atomic group");
3502 }
3503 replay_buffer_[read_edits_in_atomic_group_ - 1] = *edit;
3504 if (read_edits_in_atomic_group_ == replay_buffer_.size()) {
3505 TEST_SYNC_POINT_CALLBACK(
3506 "AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", edit);
3507 return Status::OK();
3508 }
3509 return Status::OK();
3510 }
3511
3512 // A normal edit.
3513 if (!replay_buffer().empty()) {
3514 TEST_SYNC_POINT_CALLBACK(
3515 "AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits", edit);
3516 return Status::Corruption("corrupted atomic group");
3517 }
3518 return Status::OK();
3519 }
3520
3521 bool AtomicGroupReadBuffer::IsFull() const {
3522 return read_edits_in_atomic_group_ == replay_buffer_.size();
3523 }
3524
3525 bool AtomicGroupReadBuffer::IsEmpty() const { return replay_buffer_.empty(); }
3526
3527 void AtomicGroupReadBuffer::Clear() {
3528 read_edits_in_atomic_group_ = 0;
3529 replay_buffer_.clear();
3530 }
3531
3532 VersionSet::VersionSet(const std::string& dbname,
3533 const ImmutableDBOptions* _db_options,
3534 const FileOptions& storage_options, Cache* table_cache,
3535 WriteBufferManager* write_buffer_manager,
3536 WriteController* write_controller,
3537 BlockCacheTracer* const block_cache_tracer)
3538 : column_family_set_(new ColumnFamilySet(
3539 dbname, _db_options, storage_options, table_cache,
3540 write_buffer_manager, write_controller, block_cache_tracer)),
3541 env_(_db_options->env),
3542 fs_(_db_options->fs.get()),
3543 dbname_(dbname),
3544 db_options_(_db_options),
3545 next_file_number_(2),
3546 manifest_file_number_(0), // Filled by Recover()
3547 options_file_number_(0),
3548 pending_manifest_file_number_(0),
3549 last_sequence_(0),
3550 last_allocated_sequence_(0),
3551 last_published_sequence_(0),
3552 prev_log_number_(0),
3553 current_version_number_(0),
3554 manifest_file_size_(0),
3555 file_options_(storage_options),
3556 block_cache_tracer_(block_cache_tracer) {}
3557
3558 VersionSet::~VersionSet() {
3559 // we need to delete column_family_set_ because its destructor depends on
3560 // VersionSet
3561 Cache* table_cache = column_family_set_->get_table_cache();
3562 column_family_set_.reset();
3563 for (auto& file : obsolete_files_) {
3564 if (file.metadata->table_reader_handle) {
3565 table_cache->Release(file.metadata->table_reader_handle);
3566 TableCache::Evict(table_cache, file.metadata->fd.GetNumber());
3567 }
3568 file.DeleteMetadata();
3569 }
3570 obsolete_files_.clear();
3571 }
3572
3573 void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
3574 Version* v) {
3575 // compute new compaction score
3576 v->storage_info()->ComputeCompactionScore(
3577 *column_family_data->ioptions(),
3578 *column_family_data->GetLatestMutableCFOptions());
3579
3580 // Mark v finalized
3581 v->storage_info_.SetFinalized();
3582
3583 // Make "v" current
3584 assert(v->refs_ == 0);
3585 Version* current = column_family_data->current();
3586 assert(v != current);
3587 if (current != nullptr) {
3588 assert(current->refs_ > 0);
3589 current->Unref();
3590 }
3591 column_family_data->SetCurrent(v);
3592 v->Ref();
3593
3594 // Append to linked list
3595 v->prev_ = column_family_data->dummy_versions()->prev_;
3596 v->next_ = column_family_data->dummy_versions();
3597 v->prev_->next_ = v;
3598 v->next_->prev_ = v;
3599 }
3600
3601 Status VersionSet::ProcessManifestWrites(
3602 std::deque<ManifestWriter>& writers, InstrumentedMutex* mu,
3603 Directory* db_directory, bool new_descriptor_log,
3604 const ColumnFamilyOptions* new_cf_options) {
3605 assert(!writers.empty());
3606 ManifestWriter& first_writer = writers.front();
3607 ManifestWriter* last_writer = &first_writer;
3608
3609 assert(!manifest_writers_.empty());
3610 assert(manifest_writers_.front() == &first_writer);
3611
3612 autovector<VersionEdit*> batch_edits;
3613 autovector<Version*> versions;
3614 autovector<const MutableCFOptions*> mutable_cf_options_ptrs;
3615 std::vector<std::unique_ptr<BaseReferencedVersionBuilder>> builder_guards;
3616
3617 if (first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
3618 // No group commits for column family add or drop
3619 LogAndApplyCFHelper(first_writer.edit_list.front());
3620 batch_edits.push_back(first_writer.edit_list.front());
3621 } else {
3622 auto it = manifest_writers_.cbegin();
3623 size_t group_start = std::numeric_limits<size_t>::max();
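// group_start remembers where the current atomic group begins in batch_edits,
// so that if a later writer in the same group belongs to a dropped column
// family, the remaining_entries_ of the edits already batched can be patched
// down (see the dropped-CF handling below).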
3624 while (it != manifest_writers_.cend()) {
3625 if ((*it)->edit_list.front()->IsColumnFamilyManipulation()) {
3626 // no group commits for column family add or drop
3627 break;
3628 }
3629 last_writer = *(it++);
3630 assert(last_writer != nullptr);
3631 assert(last_writer->cfd != nullptr);
3632 if (last_writer->cfd->IsDropped()) {
3633 // If we detect a dropped CF at this point, and the corresponding
3634 // version edits belong to an atomic group, then we need to find out
3635 // the preceding version edits in the same atomic group, and update
3636 // their `remaining_entries_` member variable because we are NOT going
3637 // to write the version edits of the dropped CF to the MANIFEST. If we
3638 // don't update, then Recover can report corrupted atomic group because
3639 // the `remaining_entries_` do not match.
3640 if (!batch_edits.empty()) {
3641 if (batch_edits.back()->is_in_atomic_group_ &&
3642 batch_edits.back()->remaining_entries_ > 0) {
3643 assert(group_start < batch_edits.size());
3644 const auto& edit_list = last_writer->edit_list;
3645 size_t k = 0;
3646 while (k < edit_list.size()) {
3647 if (!edit_list[k]->is_in_atomic_group_) {
3648 break;
3649 } else if (edit_list[k]->remaining_entries_ == 0) {
3650 ++k;
3651 break;
3652 }
3653 ++k;
3654 }
3655 for (auto i = group_start; i < batch_edits.size(); ++i) {
3656 assert(static_cast<uint32_t>(k) <=
3657 batch_edits.back()->remaining_entries_);
3658 batch_edits[i]->remaining_entries_ -= static_cast<uint32_t>(k);
3659 }
3660 }
3661 }
3662 continue;
3663 }
3664 // We do a linear search on versions because versions is small.
3665 // TODO(yanqin) maybe consider unordered_map
3666 Version* version = nullptr;
3667 VersionBuilder* builder = nullptr;
3668 for (int i = 0; i != static_cast<int>(versions.size()); ++i) {
3669 uint32_t cf_id = last_writer->cfd->GetID();
3670 if (versions[i]->cfd()->GetID() == cf_id) {
3671 version = versions[i];
3672 assert(!builder_guards.empty() &&
3673 builder_guards.size() == versions.size());
3674 builder = builder_guards[i]->version_builder();
3675 TEST_SYNC_POINT_CALLBACK(
3676 "VersionSet::ProcessManifestWrites:SameColumnFamily", &cf_id);
3677 break;
3678 }
3679 }
3680 if (version == nullptr) {
3681 version = new Version(last_writer->cfd, this, file_options_,
3682 last_writer->mutable_cf_options,
3683 current_version_number_++);
3684 versions.push_back(version);
3685 mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options);
3686 builder_guards.emplace_back(
3687 new BaseReferencedVersionBuilder(last_writer->cfd));
3688 builder = builder_guards.back()->version_builder();
3689 }
3690 assert(builder != nullptr); // make checker happy
3691 for (const auto& e : last_writer->edit_list) {
3692 if (e->is_in_atomic_group_) {
3693 if (batch_edits.empty() || !batch_edits.back()->is_in_atomic_group_ ||
3694 (batch_edits.back()->is_in_atomic_group_ &&
3695 batch_edits.back()->remaining_entries_ == 0)) {
3696 group_start = batch_edits.size();
3697 }
3698 } else if (group_start != std::numeric_limits<size_t>::max()) {
3699 group_start = std::numeric_limits<size_t>::max();
3700 }
3701 Status s = LogAndApplyHelper(last_writer->cfd, builder, e, mu);
3702 if (!s.ok()) {
3703 // free up the allocated memory
3704 for (auto v : versions) {
3705 delete v;
3706 }
3707 return s;
3708 }
3709 batch_edits.push_back(e);
3710 }
3711 }
3712 for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
3713 assert(!builder_guards.empty() &&
3714 builder_guards.size() == versions.size());
3715 auto* builder = builder_guards[i]->version_builder();
3716 Status s = builder->SaveTo(versions[i]->storage_info());
3717 if (!s.ok()) {
3718 // free up the allocated memory
3719 for (auto v : versions) {
3720 delete v;
3721 }
3722 return s;
3723 }
3724 }
3725 }
3726
3727 #ifndef NDEBUG
3728 // Verify that version edits of atomic groups have correct
3729 // remaining_entries_.
3730 size_t k = 0;
3731 while (k < batch_edits.size()) {
3732 while (k < batch_edits.size() && !batch_edits[k]->is_in_atomic_group_) {
3733 ++k;
3734 }
3735 if (k == batch_edits.size()) {
3736 break;
3737 }
3738 size_t i = k;
3739 while (i < batch_edits.size()) {
3740 if (!batch_edits[i]->is_in_atomic_group_) {
3741 break;
3742 }
3743 assert(i - k + batch_edits[i]->remaining_entries_ ==
3744 batch_edits[k]->remaining_entries_);
3745 if (batch_edits[i]->remaining_entries_ == 0) {
3746 ++i;
3747 break;
3748 }
3749 ++i;
3750 }
3751 assert(batch_edits[i - 1]->is_in_atomic_group_);
3752 assert(0 == batch_edits[i - 1]->remaining_entries_);
3753 std::vector<VersionEdit*> tmp;
3754 for (size_t j = k; j != i; ++j) {
3755 tmp.emplace_back(batch_edits[j]);
3756 }
3757 TEST_SYNC_POINT_CALLBACK(
3758 "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", &tmp);
3759 k = i;
3760 }
3761 #endif // NDEBUG
3762
3763 uint64_t new_manifest_file_size = 0;
3764 Status s;
3765
3766 assert(pending_manifest_file_number_ == 0);
3767 if (!descriptor_log_ ||
3768 manifest_file_size_ > db_options_->max_manifest_file_size) {
3769 TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:BeforeNewManifest");
3770 new_descriptor_log = true;
3771 } else {
3772 pending_manifest_file_number_ = manifest_file_number_;
3773 }
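// A new MANIFEST is started either because none exists yet (first write after
// recovery) or because the current one exceeded max_manifest_file_size;
// otherwise new version edits keep being appended to the current file.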
3774
3775 // Local cached copy of state variable(s). WriteCurrentStateToManifest()
3776 // reads its content after releasing db mutex to avoid race with
3777 // SwitchMemtable().
3778 std::unordered_map<uint32_t, MutableCFState> curr_state;
3779 if (new_descriptor_log) {
3780 pending_manifest_file_number_ = NewFileNumber();
3781 batch_edits.back()->SetNextFile(next_file_number_.load());
3782
3783 // if we are writing out a new snapshot, make sure to persist the max column
3784 // family.
3785 if (column_family_set_->GetMaxColumnFamily() > 0) {
3786 first_writer.edit_list.front()->SetMaxColumnFamily(
3787 column_family_set_->GetMaxColumnFamily());
3788 }
3789 for (const auto* cfd : *column_family_set_) {
3790 assert(curr_state.find(cfd->GetID()) == curr_state.end());
3791 curr_state[cfd->GetID()] = {cfd->GetLogNumber()};
3792 }
3793 }
3794
3795 {
3796 FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
3797 mu->Unlock();
3798
3799 TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
3800 if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
3801 for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
3802 assert(!builder_guards.empty() &&
3803 builder_guards.size() == versions.size());
3804 assert(!mutable_cf_options_ptrs.empty() &&
3805 builder_guards.size() == versions.size());
3806 ColumnFamilyData* cfd = versions[i]->cfd_;
3807 s = builder_guards[i]->version_builder()->LoadTableHandlers(
3808 cfd->internal_stats(), cfd->ioptions()->optimize_filters_for_hits,
3809 true /* prefetch_index_and_filter_in_cache */,
3810 false /* is_initial_load */,
3811 mutable_cf_options_ptrs[i]->prefix_extractor.get());
3812 if (!s.ok()) {
3813 if (db_options_->paranoid_checks) {
3814 break;
3815 }
3816 s = Status::OK();
3817 }
3818 }
3819 }
3820
3821 if (s.ok() && new_descriptor_log) {
3822 // This is fine because everything inside of this block is serialized --
3823 // only one thread can be here at a time
3824 // create new manifest file
3825 ROCKS_LOG_INFO(db_options_->info_log, "Creating manifest %" PRIu64 "\n",
3826 pending_manifest_file_number_);
3827 std::string descriptor_fname =
3828 DescriptorFileName(dbname_, pending_manifest_file_number_);
3829 std::unique_ptr<FSWritableFile> descriptor_file;
3830 s = NewWritableFile(fs_, descriptor_fname, &descriptor_file,
3831 opt_file_opts);
3832 if (s.ok()) {
3833 descriptor_file->SetPreallocationBlockSize(
3834 db_options_->manifest_preallocation_size);
3835
3836 std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
3837 std::move(descriptor_file), descriptor_fname, opt_file_opts, env_,
3838 nullptr, db_options_->listeners));
3839 descriptor_log_.reset(
3840 new log::Writer(std::move(file_writer), 0, false));
3841 s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get());
3842 }
3843 }
3844
3845 if (s.ok()) {
3846 if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
3847 for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
3848 versions[i]->PrepareApply(*mutable_cf_options_ptrs[i], true);
3849 }
3850 }
3851
3852 // Write new records to MANIFEST log
3853 #ifndef NDEBUG
3854 size_t idx = 0;
3855 #endif
3856 for (auto& e : batch_edits) {
3857 std::string record;
3858 if (!e->EncodeTo(&record)) {
3859 s = Status::Corruption("Unable to encode VersionEdit:" +
3860 e->DebugString(true));
3861 break;
3862 }
3863 TEST_KILL_RANDOM("VersionSet::LogAndApply:BeforeAddRecord",
3864 rocksdb_kill_odds * REDUCE_ODDS2);
3865 #ifndef NDEBUG
3866 if (batch_edits.size() > 1 && batch_edits.size() - 1 == idx) {
3867 TEST_SYNC_POINT_CALLBACK(
3868 "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
3869 nullptr);
3870 TEST_SYNC_POINT(
3871 "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:1");
3872 }
3873 ++idx;
3874 #endif /* !NDEBUG */
3875 s = descriptor_log_->AddRecord(record);
3876 if (!s.ok()) {
3877 break;
3878 }
3879 }
3880 if (s.ok()) {
3881 s = SyncManifest(env_, db_options_, descriptor_log_->file());
3882 }
3883 if (!s.ok()) {
3884 ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
3885 s.ToString().c_str());
3886 }
3887 }
3888
3889 // If we just created a new descriptor file, install it by writing a
3890 // new CURRENT file that points to it.
3891 if (s.ok() && new_descriptor_log) {
3892 s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_,
3893 db_directory);
3894 TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest");
3895 }
3896
3897 if (s.ok()) {
3898 // find offset in manifest file where this version is stored.
3899 new_manifest_file_size = descriptor_log_->file()->GetFileSize();
3900 }
3901
3902 if (first_writer.edit_list.front()->is_column_family_drop_) {
3903 TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:0");
3904 TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1");
3905 TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2");
3906 }
3907
3908 LogFlush(db_options_->info_log);
3909 TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifestDone");
3910 mu->Lock();
3911 }
3912
3913 // Append the old manifest file to the obsolete_manifests_ list to be deleted
3914 // by PurgeObsoleteFiles later.
3915 if (s.ok() && new_descriptor_log) {
3916 obsolete_manifests_.emplace_back(
3917 DescriptorFileName("", manifest_file_number_));
3918 }
3919
3920 // Install the new versions
3921 if (s.ok()) {
3922 if (first_writer.edit_list.front()->is_column_family_add_) {
3923 assert(batch_edits.size() == 1);
3924 assert(new_cf_options != nullptr);
3925 CreateColumnFamily(*new_cf_options, first_writer.edit_list.front());
3926 } else if (first_writer.edit_list.front()->is_column_family_drop_) {
3927 assert(batch_edits.size() == 1);
3928 first_writer.cfd->SetDropped();
3929 first_writer.cfd->UnrefAndTryDelete();
3930 } else {
3931 // Each version in versions corresponds to a column family.
3932 // For each column family, update its log number indicating that logs
3933 // with number smaller than this should be ignored.
3934 for (const auto version : versions) {
3935 uint64_t max_log_number_in_batch = 0;
3936 uint32_t cf_id = version->cfd_->GetID();
3937 for (const auto& e : batch_edits) {
3938 if (e->has_log_number_ && e->column_family_ == cf_id) {
3939 max_log_number_in_batch =
3940 std::max(max_log_number_in_batch, e->log_number_);
3941 }
3942 }
3943 if (max_log_number_in_batch != 0) {
3944 assert(version->cfd_->GetLogNumber() <= max_log_number_in_batch);
3945 version->cfd_->SetLogNumber(max_log_number_in_batch);
3946 }
3947 }
3948
3949 uint64_t last_min_log_number_to_keep = 0;
3950 for (auto& e : batch_edits) {
3951 if (e->has_min_log_number_to_keep_) {
3952 last_min_log_number_to_keep =
3953 std::max(last_min_log_number_to_keep, e->min_log_number_to_keep_);
3954 }
3955 }
3956
3957 if (last_min_log_number_to_keep != 0) {
3958 // Should only be set in 2PC mode.
3959 MarkMinLogNumberToKeep2PC(last_min_log_number_to_keep);
3960 }
3961
3962 for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
3963 ColumnFamilyData* cfd = versions[i]->cfd_;
3964 AppendVersion(cfd, versions[i]);
3965 }
3966 }
3967 manifest_file_number_ = pending_manifest_file_number_;
3968 manifest_file_size_ = new_manifest_file_size;
3969 prev_log_number_ = first_writer.edit_list.front()->prev_log_number_;
3970 } else {
3971 std::string version_edits;
3972 for (auto& e : batch_edits) {
3973 version_edits += ("\n" + e->DebugString(true));
3974 }
3975 ROCKS_LOG_ERROR(db_options_->info_log,
3976 "Error in committing version edit to MANIFEST: %s",
3977 version_edits.c_str());
3978 for (auto v : versions) {
3979 delete v;
3980 }
3981 // If manifest append failed for whatever reason, the file could be
3982 // corrupted. So we need to force the next version update to start a
3983 // new manifest file.
3984 descriptor_log_.reset();
3985 if (new_descriptor_log) {
3986 ROCKS_LOG_INFO(db_options_->info_log,
3987 "Deleting manifest %" PRIu64 " current manifest %" PRIu64
3988 "\n",
3989 manifest_file_number_, pending_manifest_file_number_);
3990 env_->DeleteFile(
3991 DescriptorFileName(dbname_, pending_manifest_file_number_));
3992 }
3993 }
3994
3995 pending_manifest_file_number_ = 0;
3996
3997 // wake up all the waiting writers
3998 while (true) {
3999 ManifestWriter* ready = manifest_writers_.front();
4000 manifest_writers_.pop_front();
4001 bool need_signal = true;
4002 for (const auto& w : writers) {
4003 if (&w == ready) {
4004 need_signal = false;
4005 break;
4006 }
4007 }
4008 ready->status = s;
4009 ready->done = true;
4010 if (need_signal) {
4011 ready->cv.Signal();
4012 }
4013 if (ready == last_writer) {
4014 break;
4015 }
4016 }
4017 if (!manifest_writers_.empty()) {
4018 manifest_writers_.front()->cv.Signal();
4019 }
4020 return s;
4021 }
4022
4023 // 'datas' is grammatically incorrect. We still use this notation to indicate
4024 // that this variable represents a collection of column_family_data.
4025 Status VersionSet::LogAndApply(
4026 const autovector<ColumnFamilyData*>& column_family_datas,
4027 const autovector<const MutableCFOptions*>& mutable_cf_options_list,
4028 const autovector<autovector<VersionEdit*>>& edit_lists,
4029 InstrumentedMutex* mu, Directory* db_directory, bool new_descriptor_log,
4030 const ColumnFamilyOptions* new_cf_options) {
4031 mu->AssertHeld();
4032 int num_edits = 0;
4033 for (const auto& elist : edit_lists) {
4034 num_edits += static_cast<int>(elist.size());
4035 }
4036 if (num_edits == 0) {
4037 return Status::OK();
4038 } else if (num_edits > 1) {
4039 #ifndef NDEBUG
4040 for (const auto& edit_list : edit_lists) {
4041 for (const auto& edit : edit_list) {
4042 assert(!edit->IsColumnFamilyManipulation());
4043 }
4044 }
4045 #endif /* ! NDEBUG */
4046 }
4047
4048 int num_cfds = static_cast<int>(column_family_datas.size());
4049 if (num_cfds == 1 && column_family_datas[0] == nullptr) {
4050 assert(edit_lists.size() == 1 && edit_lists[0].size() == 1);
4051 assert(edit_lists[0][0]->is_column_family_add_);
4052 assert(new_cf_options != nullptr);
4053 }
4054 std::deque<ManifestWriter> writers;
4055 if (num_cfds > 0) {
4056 assert(static_cast<size_t>(num_cfds) == mutable_cf_options_list.size());
4057 assert(static_cast<size_t>(num_cfds) == edit_lists.size());
4058 }
4059 for (int i = 0; i < num_cfds; ++i) {
4060 writers.emplace_back(mu, column_family_datas[i],
4061 *mutable_cf_options_list[i], edit_lists[i]);
4062 manifest_writers_.push_back(&writers[i]);
4063 }
4064 assert(!writers.empty());
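// Manifest writes are group-committed: every caller queues its ManifestWriter
// objects on manifest_writers_ and waits; only the writer at the head of the
// queue proceeds into ProcessManifestWrites(), which commits its own edits
// plus those of compatible writers queued behind it and then signals them.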
4065 ManifestWriter& first_writer = writers.front();
4066 while (!first_writer.done && &first_writer != manifest_writers_.front()) {
4067 first_writer.cv.Wait();
4068 }
4069 if (first_writer.done) {
4070 // All non-CF-manipulation operations can be grouped together and committed
4071 // to MANIFEST. They should all have finished. The status code is stored in
4072 // the first manifest writer.
4073 #ifndef NDEBUG
4074 for (const auto& writer : writers) {
4075 assert(writer.done);
4076 }
4077 #endif /* !NDEBUG */
4078 return first_writer.status;
4079 }
4080
4081 int num_undropped_cfds = 0;
4082 for (auto cfd : column_family_datas) {
4083 // if cfd == nullptr, it is a column family add.
4084 if (cfd == nullptr || !cfd->IsDropped()) {
4085 ++num_undropped_cfds;
4086 }
4087 }
4088 if (0 == num_undropped_cfds) {
4089 for (int i = 0; i != num_cfds; ++i) {
4090 manifest_writers_.pop_front();
4091 }
4092 // Notify new head of manifest write queue.
4093 if (!manifest_writers_.empty()) {
4094 manifest_writers_.front()->cv.Signal();
4095 }
4096 return Status::ColumnFamilyDropped();
4097 }
4098
4099 return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log,
4100 new_cf_options);
4101 }
4102
4103 void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
4104 assert(edit->IsColumnFamilyManipulation());
4105 edit->SetNextFile(next_file_number_.load());
4106 // The log might have data that is not visible to the memtable and hence has
4107 // not updated the last_sequence_ yet. It is also possible that the log is
4108 // expecting some new data that is not written yet. Since LastSequence is an
4109 // upper bound on the sequence, it is ok to record
4110 // last_allocated_sequence_ as the last sequence.
4111 edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
4112 : last_sequence_);
4113 if (edit->is_column_family_drop_) {
4114 // if we drop a column family, we have to make sure to save the max column
4115 // family, so that we don't reuse an existing ID
4116 edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
4117 }
4118 }
4119
4120 Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
4121 VersionBuilder* builder, VersionEdit* edit,
4122 InstrumentedMutex* mu) {
4123 #ifdef NDEBUG
4124 (void)cfd;
4125 #endif
4126 mu->AssertHeld();
4127 assert(!edit->IsColumnFamilyManipulation());
4128
4129 if (edit->has_log_number_) {
4130 assert(edit->log_number_ >= cfd->GetLogNumber());
4131 assert(edit->log_number_ < next_file_number_.load());
4132 }
4133
4134 if (!edit->has_prev_log_number_) {
4135 edit->SetPrevLogNumber(prev_log_number_);
4136 }
4137 edit->SetNextFile(next_file_number_.load());
4138 // The log might have data that is not visible to the memtable and hence has
4139 // not updated the last_sequence_ yet. It is also possible that the log is
4140 // expecting some new data that is not written yet. Since LastSequence is an
4141 // upper bound on the sequence, it is ok to record
4142 // last_allocated_sequence_ as the last sequence.
4143 edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
4144 : last_sequence_);
4145
4146 Status s = builder->Apply(edit);
4147
4148 return s;
4149 }
4150
4151 Status VersionSet::ApplyOneVersionEditToBuilder(
4152 VersionEdit& edit,
4153 const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
4154 std::unordered_map<int, std::string>& column_families_not_found,
4155 std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
4156 builders,
4157 VersionEditParams* version_edit_params) {
4158 // Not found means that the user didn't supply that column
4159 // family option AND we encountered a column family add
4160 // record. Once we encounter a column family drop record,
4161 // we will delete the column family from
4162 // column_families_not_found.
4163 bool cf_in_not_found = (column_families_not_found.find(edit.column_family_) !=
4164 column_families_not_found.end());
4165 // in builders means that the user supplied that column family
4166 // option AND that we encountered a column family add record
4167 bool cf_in_builders = builders.find(edit.column_family_) != builders.end();
4168
4169 // they can't both be true
4170 assert(!(cf_in_not_found && cf_in_builders));
4171
4172 ColumnFamilyData* cfd = nullptr;
4173
4174 if (edit.is_column_family_add_) {
4175 if (cf_in_builders || cf_in_not_found) {
4176 return Status::Corruption(
4177 "Manifest adding the same column family twice: " +
4178 edit.column_family_name_);
4179 }
4180 auto cf_options = name_to_options.find(edit.column_family_name_);
4181 // implicitly add persistent_stats column family without requiring user
4182 // to specify
4183 bool is_persistent_stats_column_family =
4184 edit.column_family_name_.compare(kPersistentStatsColumnFamilyName) == 0;
4185 if (cf_options == name_to_options.end() &&
4186 !is_persistent_stats_column_family) {
4187 column_families_not_found.insert(
4188 {edit.column_family_, edit.column_family_name_});
4189 } else {
4190 // recover persistent_stats CF from a DB that already contains it
4191 if (is_persistent_stats_column_family) {
4192 ColumnFamilyOptions cfo;
4193 OptimizeForPersistentStats(&cfo);
4194 cfd = CreateColumnFamily(cfo, &edit);
4195 } else {
4196 cfd = CreateColumnFamily(cf_options->second, &edit);
4197 }
4198 cfd->set_initialized();
4199 builders.insert(std::make_pair(
4200 edit.column_family_, std::unique_ptr<BaseReferencedVersionBuilder>(
4201 new BaseReferencedVersionBuilder(cfd))));
4202 }
4203 } else if (edit.is_column_family_drop_) {
4204 if (cf_in_builders) {
4205 auto builder = builders.find(edit.column_family_);
4206 assert(builder != builders.end());
4207 builders.erase(builder);
4208 cfd = column_family_set_->GetColumnFamily(edit.column_family_);
4209 assert(cfd != nullptr);
4210 if (cfd->UnrefAndTryDelete()) {
4211 cfd = nullptr;
4212 } else {
4213 // who else can have reference to cfd!?
4214 assert(false);
4215 }
4216 } else if (cf_in_not_found) {
4217 column_families_not_found.erase(edit.column_family_);
4218 } else {
4219 return Status::Corruption(
4220 "Manifest - dropping non-existing column family");
4221 }
4222 } else if (!cf_in_not_found) {
4223 if (!cf_in_builders) {
4224 return Status::Corruption(
4225 "Manifest record referencing unknown column family");
4226 }
4227
4228 cfd = column_family_set_->GetColumnFamily(edit.column_family_);
4229 // this should never happen since cf_in_builders is true
4230 assert(cfd != nullptr);
4231
4232 // if it is not column family add or column family drop,
4233 // then it's a file add/delete, which should be forwarded
4234 // to builder
4235 auto builder = builders.find(edit.column_family_);
4236 assert(builder != builders.end());
4237 Status s = builder->second->version_builder()->Apply(&edit);
4238 if (!s.ok()) {
4239 return s;
4240 }
4241 }
4242 return ExtractInfoFromVersionEdit(cfd, edit, version_edit_params);
4243 }
4244
4245 Status VersionSet::ExtractInfoFromVersionEdit(
4246 ColumnFamilyData* cfd, const VersionEdit& from_edit,
4247 VersionEditParams* version_edit_params) {
4248 if (cfd != nullptr) {
4249 if (from_edit.has_db_id_) {
4250 version_edit_params->SetDBId(from_edit.db_id_);
4251 }
4252 if (from_edit.has_log_number_) {
4253 if (cfd->GetLogNumber() > from_edit.log_number_) {
4254 ROCKS_LOG_WARN(
4255 db_options_->info_log,
4256 "MANIFEST corruption detected, but ignored - Log numbers in "
4257 "records NOT monotonically increasing");
4258 } else {
4259 cfd->SetLogNumber(from_edit.log_number_);
4260 version_edit_params->SetLogNumber(from_edit.log_number_);
4261 }
4262 }
4263 if (from_edit.has_comparator_ &&
4264 from_edit.comparator_ != cfd->user_comparator()->Name()) {
4265 return Status::InvalidArgument(
4266 cfd->user_comparator()->Name(),
4267 "does not match existing comparator " + from_edit.comparator_);
4268 }
4269 }
4270
4271 if (from_edit.has_prev_log_number_) {
4272 version_edit_params->SetPrevLogNumber(from_edit.prev_log_number_);
4273 }
4274
4275 if (from_edit.has_next_file_number_) {
4276 version_edit_params->SetNextFile(from_edit.next_file_number_);
4277 }
4278
4279 if (from_edit.has_max_column_family_) {
4280 version_edit_params->SetMaxColumnFamily(from_edit.max_column_family_);
4281 }
4282
4283 if (from_edit.has_min_log_number_to_keep_) {
4284 version_edit_params->min_log_number_to_keep_ =
4285 std::max(version_edit_params->min_log_number_to_keep_,
4286 from_edit.min_log_number_to_keep_);
4287 }
4288
4289 if (from_edit.has_last_sequence_) {
4290 version_edit_params->SetLastSequence(from_edit.last_sequence_);
4291 }
4292 return Status::OK();
4293 }
4294
4295 Status VersionSet::GetCurrentManifestPath(const std::string& dbname,
4296 FileSystem* fs,
4297 std::string* manifest_path,
4298 uint64_t* manifest_file_number) {
4299 assert(fs != nullptr);
4300 assert(manifest_path != nullptr);
4301 assert(manifest_file_number != nullptr);
4302
4303 std::string fname;
4304 Status s = ReadFileToString(fs, CurrentFileName(dbname), &fname);
4305 if (!s.ok()) {
4306 return s;
4307 }
4308 if (fname.empty() || fname.back() != '\n') {
4309 return Status::Corruption("CURRENT file does not end with newline");
4310 }
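// A well-formed CURRENT file contains a single line naming the live manifest,
// e.g. "MANIFEST-000004\n" (the file number shown here is only illustrative).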
4311 // remove the trailing '\n'
4312 fname.resize(fname.size() - 1);
4313 FileType type;
4314 bool parse_ok = ParseFileName(fname, manifest_file_number, &type);
4315 if (!parse_ok || type != kDescriptorFile) {
4316 return Status::Corruption("CURRENT file corrupted");
4317 }
4318 *manifest_path = dbname;
4319 if (dbname.back() != '/') {
4320 manifest_path->push_back('/');
4321 }
4322 *manifest_path += fname;
4323 return Status::OK();
4324 }
4325
4326 Status VersionSet::ReadAndRecover(
4327 log::Reader* reader, AtomicGroupReadBuffer* read_buffer,
4328 const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
4329 std::unordered_map<int, std::string>& column_families_not_found,
4330 std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
4331 builders,
4332 VersionEditParams* version_edit_params, std::string* db_id) {
4333 assert(reader != nullptr);
4334 assert(read_buffer != nullptr);
4335 Status s;
4336 Slice record;
4337 std::string scratch;
4338 size_t recovered_edits = 0;
4339 while (reader->ReadRecord(&record, &scratch) && s.ok()) {
4340 VersionEdit edit;
4341 s = edit.DecodeFrom(record);
4342 if (!s.ok()) {
4343 break;
4344 }
4345 if (edit.has_db_id_) {
4346 db_id_ = edit.GetDbId();
4347 if (db_id != nullptr) {
4348 db_id->assign(edit.GetDbId());
4349 }
4350 }
4351 s = read_buffer->AddEdit(&edit);
4352 if (!s.ok()) {
4353 break;
4354 }
4355 if (edit.is_in_atomic_group_) {
4356 if (read_buffer->IsFull()) {
4357 // Apply edits in an atomic group when we have read all edits in the
4358 // group.
4359 for (auto& e : read_buffer->replay_buffer()) {
4360 s = ApplyOneVersionEditToBuilder(e, name_to_options,
4361 column_families_not_found, builders,
4362 version_edit_params);
4363 if (!s.ok()) {
4364 break;
4365 }
4366 recovered_edits++;
4367 }
4368 if (!s.ok()) {
4369 break;
4370 }
4371 read_buffer->Clear();
4372 }
4373 } else {
4374 // Apply a normal edit immediately.
4375 s = ApplyOneVersionEditToBuilder(edit, name_to_options,
4376 column_families_not_found, builders,
4377 version_edit_params);
4378 if (s.ok()) {
4379 recovered_edits++;
4380 }
4381 }
4382 }
4383 if (!s.ok()) {
4384 // Clear the buffer if we fail to decode/apply an edit.
4385 read_buffer->Clear();
4386 }
4387 TEST_SYNC_POINT_CALLBACK("VersionSet::ReadAndRecover:RecoveredEdits",
4388 &recovered_edits);
4389 return s;
4390 }
4391
4392 Status VersionSet::Recover(
4393 const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
4394 std::string* db_id) {
4395 std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
4396 for (const auto& cf : column_families) {
4397 cf_name_to_options.emplace(cf.name, cf.options);
4398 }
4399 // keeps track of column families in the manifest that were not found in the
4400 // column_families parameter. If those column families are not dropped by
4401 // subsequent manifest records, Recover() will return a failure status.
4402 std::unordered_map<int, std::string> column_families_not_found;
4403
4404 // Read "CURRENT" file, which contains a pointer to the current manifest file
4405 std::string manifest_path;
4406 Status s = GetCurrentManifestPath(dbname_, fs_, &manifest_path,
4407 &manifest_file_number_);
4408 if (!s.ok()) {
4409 return s;
4410 }
4411
4412 ROCKS_LOG_INFO(db_options_->info_log, "Recovering from manifest file: %s\n",
4413 manifest_path.c_str());
4414
4415 std::unique_ptr<SequentialFileReader> manifest_file_reader;
4416 {
4417 std::unique_ptr<FSSequentialFile> manifest_file;
4418 s = fs_->NewSequentialFile(manifest_path,
4419 fs_->OptimizeForManifestRead(file_options_),
4420 &manifest_file, nullptr);
4421 if (!s.ok()) {
4422 return s;
4423 }
4424 manifest_file_reader.reset(
4425 new SequentialFileReader(std::move(manifest_file), manifest_path,
4426 db_options_->log_readahead_size));
4427 }
4428
4429 std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
4430 builders;
4431
4432 // add default column family
4433 auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
4434 if (default_cf_iter == cf_name_to_options.end()) {
4435 return Status::InvalidArgument("Default column family not specified");
4436 }
4437 VersionEdit default_cf_edit;
4438 default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
4439 default_cf_edit.SetColumnFamily(0);
4440 ColumnFamilyData* default_cfd =
4441 CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
4442 // In recovery, nobody else can access it, so it's fine to set it to be
4443 // initialized earlier.
4444 default_cfd->set_initialized();
4445 builders.insert(
4446 std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
4447 new BaseReferencedVersionBuilder(default_cfd))));
4448 uint64_t current_manifest_file_size = 0;
4449 VersionEditParams version_edit_params;
4450 {
4451 VersionSet::LogReporter reporter;
4452 reporter.status = &s;
4453 log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
4454 true /* checksum */, 0 /* log_number */);
4455 Slice record;
4456 std::string scratch;
4457 AtomicGroupReadBuffer read_buffer;
4458 s = ReadAndRecover(&reader, &read_buffer, cf_name_to_options,
4459 column_families_not_found, builders,
4460 &version_edit_params, db_id);
4461 current_manifest_file_size = reader.GetReadOffset();
4462 assert(current_manifest_file_size != 0);
4463 }
4464
4465 if (s.ok()) {
4466 if (!version_edit_params.has_next_file_number_) {
4467 s = Status::Corruption("no meta-nextfile entry in descriptor");
4468 } else if (!version_edit_params.has_log_number_) {
4469 s = Status::Corruption("no meta-lognumber entry in descriptor");
4470 } else if (!version_edit_params.has_last_sequence_) {
4471 s = Status::Corruption("no last-sequence-number entry in descriptor");
4472 }
4473
4474 if (!version_edit_params.has_prev_log_number_) {
4475 version_edit_params.SetPrevLogNumber(0);
4476 }
4477
4478 column_family_set_->UpdateMaxColumnFamily(
4479 version_edit_params.max_column_family_);
4480
4481 // When reading a DB generated by an old release, min_log_number_to_keep=0.
4482 // All log files will be scanned for potential prepare entries.
4483 MarkMinLogNumberToKeep2PC(version_edit_params.min_log_number_to_keep_);
4484 MarkFileNumberUsed(version_edit_params.prev_log_number_);
4485 MarkFileNumberUsed(version_edit_params.log_number_);
4486 }
4487
4488 // there were some column families in the MANIFEST that weren't specified
4489 // in the argument. This is OK in read_only mode
4490 if (read_only == false && !column_families_not_found.empty()) {
4491 std::string list_of_not_found;
4492 for (const auto& cf : column_families_not_found) {
4493 list_of_not_found += ", " + cf.second;
4494 }
4495 list_of_not_found = list_of_not_found.substr(2);
4496 s = Status::InvalidArgument(
4497 "You have to open all column families. Column families not opened: " +
4498 list_of_not_found);
4499 }
4500
4501 if (s.ok()) {
4502 for (auto cfd : *column_family_set_) {
4503 assert(builders.count(cfd->GetID()) > 0);
4504 auto* builder = builders[cfd->GetID()]->version_builder();
4505 if (!builder->CheckConsistencyForNumLevels()) {
4506 s = Status::InvalidArgument(
4507 "db has more levels than options.num_levels");
4508 break;
4509 }
4510 }
4511 }
4512
4513 if (s.ok()) {
4514 for (auto cfd : *column_family_set_) {
4515 if (cfd->IsDropped()) {
4516 continue;
4517 }
4518 if (read_only) {
4519 cfd->table_cache()->SetTablesAreImmortal();
4520 }
4521 assert(cfd->initialized());
4522 auto builders_iter = builders.find(cfd->GetID());
4523 assert(builders_iter != builders.end());
4524 auto builder = builders_iter->second->version_builder();
4525
4526 // unlimited table cache. Pre-load table handle now.
4527 // Need to do it out of the mutex.
4528 s = builder->LoadTableHandlers(
4529 cfd->internal_stats(), db_options_->max_file_opening_threads,
4530 false /* prefetch_index_and_filter_in_cache */,
4531 true /* is_initial_load */,
4532 cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
4533 if (!s.ok()) {
4534 if (db_options_->paranoid_checks) {
4535 return s;
4536 }
4537 s = Status::OK();
4538 }
4539
4540 Version* v = new Version(cfd, this, file_options_,
4541 *cfd->GetLatestMutableCFOptions(),
4542 current_version_number_++);
4543 builder->SaveTo(v->storage_info());
4544
4545 // Install recovered version
4546 v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
4547 !(db_options_->skip_stats_update_on_db_open));
4548 AppendVersion(cfd, v);
4549 }
4550
4551 manifest_file_size_ = current_manifest_file_size;
4552 next_file_number_.store(version_edit_params.next_file_number_ + 1);
4553 last_allocated_sequence_ = version_edit_params.last_sequence_;
4554 last_published_sequence_ = version_edit_params.last_sequence_;
4555 last_sequence_ = version_edit_params.last_sequence_;
4556 prev_log_number_ = version_edit_params.prev_log_number_;
4557
4558 ROCKS_LOG_INFO(
4559 db_options_->info_log,
4560 "Recovered from manifest file:%s succeeded,"
4561 "manifest_file_number is %" PRIu64 ", next_file_number is %" PRIu64
4562 ", last_sequence is %" PRIu64 ", log_number is %" PRIu64
4563 ",prev_log_number is %" PRIu64 ",max_column_family is %" PRIu32
4564 ",min_log_number_to_keep is %" PRIu64 "\n",
4565 manifest_path.c_str(), manifest_file_number_, next_file_number_.load(),
4566 last_sequence_.load(), version_edit_params.log_number_,
4567 prev_log_number_, column_family_set_->GetMaxColumnFamily(),
4568 min_log_number_to_keep_2pc());
4569
4570 for (auto cfd : *column_family_set_) {
4571 if (cfd->IsDropped()) {
4572 continue;
4573 }
4574 ROCKS_LOG_INFO(db_options_->info_log,
4575 "Column family [%s] (ID %" PRIu32
4576 "), log number is %" PRIu64 "\n",
4577 cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
4578 }
4579 }
4580
4581 return s;
4582 }
4583
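// Lists the column families recorded in the current MANIFEST of `dbname`
// without opening the DB: the default column family is always included, and
// column family add/drop edits found in the MANIFEST are replayed to produce
// the final list.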
4584 Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
4585 const std::string& dbname,
4586 FileSystem* fs) {
4587 // these are just for performance reasons, not correctness,
4588 // so we're fine using the defaults
4589 FileOptions soptions;
4590 // Read "CURRENT" file, which contains a pointer to the current manifest file
4591 std::string manifest_path;
4592 uint64_t manifest_file_number;
4593 Status s =
4594 GetCurrentManifestPath(dbname, fs, &manifest_path, &manifest_file_number);
4595 if (!s.ok()) {
4596 return s;
4597 }
4598
4599 std::unique_ptr<SequentialFileReader> file_reader;
4600 {
4601 std::unique_ptr<FSSequentialFile> file;
4602 s = fs->NewSequentialFile(manifest_path, soptions, &file, nullptr);
4603 if (!s.ok()) {
4604 return s;
4605 }
4606 file_reader.reset(new SequentialFileReader(std::move(file), manifest_path));
4607 }
4608
4609 std::map<uint32_t, std::string> column_family_names;
4610 // default column family is always implicitly there
4611 column_family_names.insert({0, kDefaultColumnFamilyName});
4612 VersionSet::LogReporter reporter;
4613 reporter.status = &s;
4614 log::Reader reader(nullptr, std::move(file_reader), &reporter,
4615 true /* checksum */, 0 /* log_number */);
4616 Slice record;
4617 std::string scratch;
4618 while (reader.ReadRecord(&record, &scratch) && s.ok()) {
4619 VersionEdit edit;
4620 s = edit.DecodeFrom(record);
4621 if (!s.ok()) {
4622 break;
4623 }
4624 if (edit.is_column_family_add_) {
4625 if (column_family_names.find(edit.column_family_) !=
4626 column_family_names.end()) {
4627 s = Status::Corruption("Manifest adding the same column family twice");
4628 break;
4629 }
4630 column_family_names.insert(
4631 {edit.column_family_, edit.column_family_name_});
4632 } else if (edit.is_column_family_drop_) {
4633 if (column_family_names.find(edit.column_family_) ==
4634 column_family_names.end()) {
4635 s = Status::Corruption(
4636 "Manifest - dropping non-existing column family");
4637 break;
4638 }
4639 column_family_names.erase(edit.column_family_);
4640 }
4641 }
4642
4643 column_families->clear();
4644 if (s.ok()) {
4645 for (const auto& iter : column_family_names) {
4646 column_families->push_back(iter.second);
4647 }
4648 }
4649
4650 return s;
4651 }
4652
4653 #ifndef ROCKSDB_LITE
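// Collapses the LSM tree of the default column family so that it fits into
// `new_levels` levels and logs the result to the MANIFEST. Requires that at
// most one level in [new_levels - 1, current_levels - 1] contains files; that
// level's files become the new bottom level. Typically driven by offline
// tooling (e.g. ldb's reduce_levels command) rather than by a running DB.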
4654 Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
4655 const Options* options,
4656 const FileOptions& file_options,
4657 int new_levels) {
4658 if (new_levels <= 1) {
4659 return Status::InvalidArgument(
4660 "Number of levels needs to be bigger than 1");
4661 }
4662
4663 ImmutableDBOptions db_options(*options);
4664 ColumnFamilyOptions cf_options(*options);
4665 std::shared_ptr<Cache> tc(NewLRUCache(options->max_open_files - 10,
4666 options->table_cache_numshardbits));
4667 WriteController wc(options->delayed_write_rate);
4668 WriteBufferManager wb(options->db_write_buffer_size);
4669 VersionSet versions(dbname, &db_options, file_options, tc.get(), &wb, &wc,
4670 /*block_cache_tracer=*/nullptr);
4671 Status status;
4672
4673 std::vector<ColumnFamilyDescriptor> dummy;
4674 ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName,
4675 ColumnFamilyOptions(*options));
4676 dummy.push_back(dummy_descriptor);
4677 status = versions.Recover(dummy);
4678 if (!status.ok()) {
4679 return status;
4680 }
4681
4682 Version* current_version =
4683 versions.GetColumnFamilySet()->GetDefault()->current();
4684 auto* vstorage = current_version->storage_info();
4685 int current_levels = vstorage->num_levels();
4686
4687 if (current_levels <= new_levels) {
4688 return Status::OK();
4689 }
4690
4691 // Make sure there are files on only one level from
4692 // (new_levels-1) to (current_levels-1)
4693 int first_nonempty_level = -1;
4694 int first_nonempty_level_filenum = 0;
4695 for (int i = new_levels - 1; i < current_levels; i++) {
4696 int file_num = vstorage->NumLevelFiles(i);
4697 if (file_num != 0) {
4698 if (first_nonempty_level < 0) {
4699 first_nonempty_level = i;
4700 first_nonempty_level_filenum = file_num;
4701 } else {
4702 char msg[255];
4703 snprintf(msg, sizeof(msg),
4704 "Found at least two levels containing files: "
4705 "[%d:%d],[%d:%d].\n",
4706 first_nonempty_level, first_nonempty_level_filenum, i,
4707 file_num);
4708 return Status::InvalidArgument(msg);
4709 }
4710 }
4711 }
4712
4713 // we need to allocate an array sized to the old number of levels to
4714 // avoid a SIGSEGV in WriteCurrentStateToManifest();
4715 // however, all levels greater than or equal to new_levels will be empty
4716 std::vector<FileMetaData*>* new_files_list =
4717 new std::vector<FileMetaData*>[current_levels];
4718 for (int i = 0; i < new_levels - 1; i++) {
4719 new_files_list[i] = vstorage->LevelFiles(i);
4720 }
4721
4722 if (first_nonempty_level > 0) {
4723 new_files_list[new_levels - 1] = vstorage->LevelFiles(first_nonempty_level);
4724 }
4725
4726 delete[] vstorage->files_;
4727 vstorage->files_ = new_files_list;
4728 vstorage->num_levels_ = new_levels;
4729
4730 MutableCFOptions mutable_cf_options(*options);
4731 VersionEdit ve;
4732 InstrumentedMutex dummy_mutex;
4733 InstrumentedMutexLock l(&dummy_mutex);
4734 return versions.LogAndApply(
4735 versions.GetColumnFamilySet()->GetDefault(),
4736 mutable_cf_options, &ve, &dummy_mutex, nullptr, true);
4737 }
4738
4739 // Get the checksum information including the checksum and checksum function
4740 // name of all SST files in VersionSet. Store the information in
4741 // FileChecksumList which contains a map from file number to its checksum info.
4742 // If DB is not running, make sure to call VersionSet::Recover() to load the file
4743 // metadata from Manifest to VersionSet before calling this function.
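//
// Illustrative usage (a sketch only; assumes the FileChecksumList factory and
// accessors declared in rocksdb/file_checksum.h):
//
//   std::unique_ptr<FileChecksumList> list(NewFileChecksumList());
//   Status s = versions->GetLiveFilesChecksumInfo(list.get());
//   std::vector<uint64_t> file_numbers;
//   std::vector<std::string> checksums, func_names;
//   if (s.ok()) {
//     s = list->GetAllFileChecksums(&file_numbers, &checksums, &func_names);
//   }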
4744 Status VersionSet::GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) {
4745 // Clean the previously stored checksum information if any.
4746 if (checksum_list == nullptr) {
4747 return Status::InvalidArgument("checksum_list is nullptr");
4748 }
4749 checksum_list->reset();
4750
4751 for (auto cfd : *column_family_set_) {
4752 if (cfd->IsDropped() || !cfd->initialized()) {
4753 continue;
4754 }
4755 for (int level = 0; level < cfd->NumberLevels(); level++) {
4756 for (const auto& file :
4757 cfd->current()->storage_info()->LevelFiles(level)) {
4758 checksum_list->InsertOneFileChecksum(file->fd.GetNumber(),
4759 file->file_checksum,
4760 file->file_checksum_func_name);
4761 }
4762 }
4763 }
4764 return Status::OK();
4765 }
4766
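// Reads the MANIFEST file `dscname` and prints its contents to stdout: each
// version edit (optionally verbose and/or as JSON), followed by the recovered
// state of every column family. Intended for offline inspection (e.g. via the
// ldb manifest_dump command), not for use on a live DB.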
4767 Status VersionSet::DumpManifest(Options& options, std::string& dscname,
4768 bool verbose, bool hex, bool json) {
4769 // Open the specified manifest file.
4770 std::unique_ptr<SequentialFileReader> file_reader;
4771 Status s;
4772 {
4773 std::unique_ptr<FSSequentialFile> file;
4774 s = options.file_system->NewSequentialFile(
4775 dscname,
4776 options.file_system->OptimizeForManifestRead(file_options_), &file,
4777 nullptr);
4778 if (!s.ok()) {
4779 return s;
4780 }
4781 file_reader.reset(new SequentialFileReader(
4782 std::move(file), dscname, db_options_->log_readahead_size));
4783 }
4784
4785 bool have_prev_log_number = false;
4786 bool have_next_file = false;
4787 bool have_last_sequence = false;
4788 uint64_t next_file = 0;
4789 uint64_t last_sequence = 0;
4790 uint64_t previous_log_number = 0;
4791 int count = 0;
4792 std::unordered_map<uint32_t, std::string> comparators;
4793 std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
4794 builders;
4795
4796 // add default column family
4797 VersionEdit default_cf_edit;
4798 default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
4799 default_cf_edit.SetColumnFamily(0);
4800 ColumnFamilyData* default_cfd =
4801 CreateColumnFamily(ColumnFamilyOptions(options), &default_cf_edit);
4802 builders.insert(
4803 std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
4804 new BaseReferencedVersionBuilder(default_cfd))));
4805
4806 {
4807 VersionSet::LogReporter reporter;
4808 reporter.status = &s;
4809 log::Reader reader(nullptr, std::move(file_reader), &reporter,
4810 true /* checksum */, 0 /* log_number */);
4811 Slice record;
4812 std::string scratch;
4813 while (reader.ReadRecord(&record, &scratch) && s.ok()) {
4814 VersionEdit edit;
4815 s = edit.DecodeFrom(record);
4816 if (!s.ok()) {
4817 break;
4818 }
4819
4820 // Write out each individual edit
4821 if (verbose && !json) {
4822 printf("%s\n", edit.DebugString(hex).c_str());
4823 } else if (json) {
4824 printf("%s\n", edit.DebugJSON(count, hex).c_str());
4825 }
4826 count++;
4827
4828 bool cf_in_builders =
4829 builders.find(edit.column_family_) != builders.end();
4830
4831 if (edit.has_comparator_) {
4832 comparators.insert({edit.column_family_, edit.comparator_});
4833 }
4834
4835 ColumnFamilyData* cfd = nullptr;
4836
4837 if (edit.is_column_family_add_) {
4838 if (cf_in_builders) {
4839 s = Status::Corruption(
4840 "Manifest adding the same column family twice");
4841 break;
4842 }
4843 cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit);
4844 cfd->set_initialized();
4845 builders.insert(std::make_pair(
4846 edit.column_family_, std::unique_ptr<BaseReferencedVersionBuilder>(
4847 new BaseReferencedVersionBuilder(cfd))));
4848 } else if (edit.is_column_family_drop_) {
4849 if (!cf_in_builders) {
4850 s = Status::Corruption(
4851 "Manifest - dropping non-existing column family");
4852 break;
4853 }
4854 auto builder_iter = builders.find(edit.column_family_);
4855 builders.erase(builder_iter);
4856 comparators.erase(edit.column_family_);
4857 cfd = column_family_set_->GetColumnFamily(edit.column_family_);
4858 assert(cfd != nullptr);
4859 cfd->UnrefAndTryDelete();
4860 cfd = nullptr;
4861 } else {
4862 if (!cf_in_builders) {
4863 s = Status::Corruption(
4864 "Manifest record referencing unknown column family");
4865 break;
4866 }
4867
4868 cfd = column_family_set_->GetColumnFamily(edit.column_family_);
4869 // this should never happen since cf_in_builders is true
4870 assert(cfd != nullptr);
4871
4872 // if it is not column family add or column family drop,
4873 // then it's a file add/delete, which should be forwarded
4874 // to builder
4875 auto builder = builders.find(edit.column_family_);
4876 assert(builder != builders.end());
4877 s = builder->second->version_builder()->Apply(&edit);
4878 if (!s.ok()) {
4879 break;
4880 }
4881 }
4882
4883 if (cfd != nullptr && edit.has_log_number_) {
4884 cfd->SetLogNumber(edit.log_number_);
4885 }
4886
4887
4888 if (edit.has_prev_log_number_) {
4889 previous_log_number = edit.prev_log_number_;
4890 have_prev_log_number = true;
4891 }
4892
4893 if (edit.has_next_file_number_) {
4894 next_file = edit.next_file_number_;
4895 have_next_file = true;
4896 }
4897
4898 if (edit.has_last_sequence_) {
4899 last_sequence = edit.last_sequence_;
4900 have_last_sequence = true;
4901 }
4902
4903 if (edit.has_max_column_family_) {
4904 column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_);
4905 }
4906
4907 if (edit.has_min_log_number_to_keep_) {
4908 MarkMinLogNumberToKeep2PC(edit.min_log_number_to_keep_);
4909 }
4910 }
4911 }
4912 file_reader.reset();
4913
4914 if (s.ok()) {
4915 if (!have_next_file) {
4916 s = Status::Corruption("no meta-nextfile entry in descriptor");
4917 printf("no meta-nextfile entry in descriptor");
4918 } else if (!have_last_sequence) {
4919 printf("no last-sequence-number entry in descriptor");
4920 s = Status::Corruption("no last-sequence-number entry in descriptor");
4921 }
4922
4923 if (!have_prev_log_number) {
4924 previous_log_number = 0;
4925 }
4926 }
4927
4928 if (s.ok()) {
4929 for (auto cfd : *column_family_set_) {
4930 if (cfd->IsDropped()) {
4931 continue;
4932 }
4933 auto builders_iter = builders.find(cfd->GetID());
4934 assert(builders_iter != builders.end());
4935 auto builder = builders_iter->second->version_builder();
4936
4937 Version* v = new Version(cfd, this, file_options_,
4938 *cfd->GetLatestMutableCFOptions(),
4939 current_version_number_++);
4940 builder->SaveTo(v->storage_info());
4941 v->PrepareApply(*cfd->GetLatestMutableCFOptions(), false);
4942
4943 printf("--------------- Column family \"%s\" (ID %" PRIu32
4944 ") --------------\n",
4945 cfd->GetName().c_str(), cfd->GetID());
4946 printf("log number: %" PRIu64 "\n", cfd->GetLogNumber());
4947 auto comparator = comparators.find(cfd->GetID());
4948 if (comparator != comparators.end()) {
4949 printf("comparator: %s\n", comparator->second.c_str());
4950 } else {
4951 printf("comparator: <NO COMPARATOR>\n");
4952 }
4953 printf("%s \n", v->DebugString(hex).c_str());
4954 delete v;
4955 }
4956
4957 next_file_number_.store(next_file + 1);
4958 last_allocated_sequence_ = last_sequence;
4959 last_published_sequence_ = last_sequence;
4960 last_sequence_ = last_sequence;
4961 prev_log_number_ = previous_log_number;
4962
4963 printf("next_file_number %" PRIu64 " last_sequence %" PRIu64
4964 " prev_log_number %" PRIu64 " max_column_family %" PRIu32
4965 " min_log_number_to_keep "
4966 "%" PRIu64 "\n",
4967 next_file_number_.load(), last_sequence, previous_log_number,
4968 column_family_set_->GetMaxColumnFamily(),
4969 min_log_number_to_keep_2pc());
4970 }
4971
4972 return s;
4973 }
4974 #endif // ROCKSDB_LITE
4975
4976 void VersionSet::MarkFileNumberUsed(uint64_t number) {
4977 // only called during recovery and repair which are single threaded, so this
4978 // works because there can't be concurrent calls
4979 if (next_file_number_.load(std::memory_order_relaxed) <= number) {
4980 next_file_number_.store(number + 1, std::memory_order_relaxed);
4981 }
4982 }
4983 // Called only either from ::LogAndApply which is protected by mutex or during
4984 // recovery which is single-threaded.
4985 void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
4986 if (min_log_number_to_keep_2pc_.load(std::memory_order_relaxed) < number) {
4987 min_log_number_to_keep_2pc_.store(number, std::memory_order_relaxed);
4988 }
4989 }
4990
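// Writes a snapshot of the current state into a freshly created MANIFEST
// `log`: optionally a DB-id record, then for each live column family one
// record describing the family (name, ID, comparator) and one record listing
// its files and log number. ReactiveVersionSet::ReadAndApply relies on this
// layout when it skips the leading records of a newly rolled MANIFEST.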
4991 Status VersionSet::WriteCurrentStateToManifest(
4992 const std::unordered_map<uint32_t, MutableCFState>& curr_state,
4993 log::Writer* log) {
4994 // TODO: Break up into multiple records to reduce memory usage on recovery?
4995
4996 // WARNING: This method doesn't hold a mutex!!
4997
4998 // This is done without DB mutex lock held, but only within single-threaded
4999 // LogAndApply. Column family manipulations can only happen within LogAndApply
5000 // (the same single thread), so we're safe to iterate.
5001
5002 if (db_options_->write_dbid_to_manifest) {
5003 VersionEdit edit_for_db_id;
5004 assert(!db_id_.empty());
5005 edit_for_db_id.SetDBId(db_id_);
5006 std::string db_id_record;
5007 if (!edit_for_db_id.EncodeTo(&db_id_record)) {
5008 return Status::Corruption("Unable to Encode VersionEdit:" +
5009 edit_for_db_id.DebugString(true));
5010 }
5011 Status add_record = log->AddRecord(db_id_record);
5012 if (!add_record.ok()) {
5013 return add_record;
5014 }
5015 }
5016
5017 for (auto cfd : *column_family_set_) {
5018 if (cfd->IsDropped()) {
5019 continue;
5020 }
5021 assert(cfd->initialized());
5022 {
5023 // Store column family info
5024 VersionEdit edit;
5025 if (cfd->GetID() != 0) {
5026 // default column family is always there,
5027 // no need to explicitly write it
5028 edit.AddColumnFamily(cfd->GetName());
5029 edit.SetColumnFamily(cfd->GetID());
5030 }
5031 edit.SetComparatorName(
5032 cfd->internal_comparator().user_comparator()->Name());
5033 std::string record;
5034 if (!edit.EncodeTo(&record)) {
5035 return Status::Corruption(
5036 "Unable to Encode VersionEdit:" + edit.DebugString(true));
5037 }
5038 Status s = log->AddRecord(record);
5039 if (!s.ok()) {
5040 return s;
5041 }
5042 }
5043
5044 {
5045 // Save files
5046 VersionEdit edit;
5047 edit.SetColumnFamily(cfd->GetID());
5048
5049 for (int level = 0; level < cfd->NumberLevels(); level++) {
5050 for (const auto& f :
5051 cfd->current()->storage_info()->LevelFiles(level)) {
5052 edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
5053 f->fd.GetFileSize(), f->smallest, f->largest,
5054 f->fd.smallest_seqno, f->fd.largest_seqno,
5055 f->marked_for_compaction, f->oldest_blob_file_number,
5056 f->oldest_ancester_time, f->file_creation_time,
5057 f->file_checksum, f->file_checksum_func_name);
5058 }
5059 }
5060 const auto iter = curr_state.find(cfd->GetID());
5061 assert(iter != curr_state.end());
5062 uint64_t log_number = iter->second.log_number;
5063 edit.SetLogNumber(log_number);
5064 std::string record;
5065 if (!edit.EncodeTo(&record)) {
5066 return Status::Corruption(
5067 "Unable to Encode VersionEdit:" + edit.DebugString(true));
5068 }
5069 Status s = log->AddRecord(record);
5070 if (!s.ok()) {
5071 return s;
5072 }
5073 }
5074 }
5075 return Status::OK();
5076 }
5077
5078 // TODO(aekmekji): in CompactionJob::GenSubcompactionBoundaries(), this
5079 // function is called repeatedly with consecutive pairs of slices. For example
5080 // if the slice list is [a, b, c, d] this function is called with arguments
5081 // (a,b) then (b,c) then (c,d). Knowing this, an optimization is possible where
5082 // we avoid doing binary search for the keys b and c twice and instead somehow
5083 // maintain state of where they first appear in the files.
5084 uint64_t VersionSet::ApproximateSize(const SizeApproximationOptions& options,
5085 Version* v, const Slice& start,
5086 const Slice& end, int start_level,
5087 int end_level, TableReaderCaller caller) {
5088 const auto& icmp = v->cfd_->internal_comparator();
5089
5090 // pre-condition
5091 assert(icmp.Compare(start, end) <= 0);
5092
5093 uint64_t total_full_size = 0;
5094 const auto* vstorage = v->storage_info();
5095 const int num_non_empty_levels = vstorage->num_non_empty_levels();
5096 end_level = (end_level == -1) ? num_non_empty_levels
5097 : std::min(end_level, num_non_empty_levels);
5098
5099 assert(start_level <= end_level);
5100
5101 // Outline of the optimization that uses options.files_size_error_margin.
5102 // When approximating the total size of the files used to store a key range,
5103 // we first sum up the sizes of the files that fully fall into the range.
5104 // Then we sum up the sizes of all the files that may intersect with the range
5105 // (this includes all files in L0 as well). Then, if total_intersecting_size
5106 // is smaller than total_full_size * options.files_size_error_margin - we can
5107 // infer that the intersecting files have a sufficiently negligible
5108 // contribution to the total size, and we can approximate the storage required
5109 // for the keys in range as just half of the intersecting_files_size.
5110 // E.g., if the value of files_size_error_margin is 0.1, then the error of the
5111 // approximation is limited to only ~10% of the total size of files that fully
5112 // fall into the keys range. In such case, this helps to avoid a costly
5113 // process of binary searching the intersecting files that is required only
5114 // for a more precise calculation of the total size.
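// Example with illustrative numbers: with files_size_error_margin = 0.1,
// total_full_size = 10 GB and total_intersecting_size = 500 MB, the
// intersecting files fall below the 1 GB margin, so we simply add 250 MB
// (half of 500 MB) instead of binary-searching inside the boundary files.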
5115
5116 autovector<FdWithKeyRange*, 32> first_files;
5117 autovector<FdWithKeyRange*, 16> last_files;
5118
5119 // scan all the levels
5120 for (int level = start_level; level < end_level; ++level) {
5121 const LevelFilesBrief& files_brief = vstorage->LevelFilesBrief(level);
5122 if (files_brief.num_files == 0) {
5123 // empty level, skip exploration
5124 continue;
5125 }
5126
5127 if (level == 0) {
5128 // level 0 files are not in sorted order, we need to iterate through
5129 // the list to compute the total bytes that require scanning,
5130 // so handle the case explicitly (similarly to first_files case)
5131 for (size_t i = 0; i < files_brief.num_files; i++) {
5132 first_files.push_back(&files_brief.files[i]);
5133 }
5134 continue;
5135 }
5136
5137 assert(level > 0);
5138 assert(files_brief.num_files > 0);
5139
5140 // identify the file position for start key
5141 const int idx_start =
5142 FindFileInRange(icmp, files_brief, start, 0,
5143 static_cast<uint32_t>(files_brief.num_files - 1));
5144 assert(static_cast<size_t>(idx_start) < files_brief.num_files);
5145
5146 // identify the file position for end key
5147 int idx_end = idx_start;
5148 if (icmp.Compare(files_brief.files[idx_end].largest_key, end) < 0) {
5149 idx_end =
5150 FindFileInRange(icmp, files_brief, end, idx_start,
5151 static_cast<uint32_t>(files_brief.num_files - 1));
5152 }
5153 assert(idx_end >= idx_start &&
5154 static_cast<size_t>(idx_end) < files_brief.num_files);
5155
5156 // scan all files from the starting index to the ending index
5157 // (inferred from the sorted order)
5158
5159 // first scan all the intermediate full files (excluding first and last)
5160 for (int i = idx_start + 1; i < idx_end; ++i) {
5161 uint64_t file_size = files_brief.files[i].fd.GetFileSize();
5162 // The entire file falls into the range, so we can just take its size.
5163 assert(file_size ==
5164 ApproximateSize(v, files_brief.files[i], start, end, caller));
5165 total_full_size += file_size;
5166 }
5167
5168 // save the first and the last files (which may be the same file), so we
5169 // can scan them later.
5170 first_files.push_back(&files_brief.files[idx_start]);
5171 if (idx_start != idx_end) {
5172 // we need to estimate size for both files, only if they are different
5173 last_files.push_back(&files_brief.files[idx_end]);
5174 }
5175 }
5176
5177 // The sum of all file sizes that intersect the [start, end] keys range.
5178 uint64_t total_intersecting_size = 0;
5179 for (const auto* file_ptr : first_files) {
5180 total_intersecting_size += file_ptr->fd.GetFileSize();
5181 }
5182 for (const auto* file_ptr : last_files) {
5183 total_intersecting_size += file_ptr->fd.GetFileSize();
5184 }
5185
5186 // Now scan all the first & last files at each level, and estimate their size.
5187 // If the total_intersecting_size is less than X% of the total_full_size - we
5188 // want to approximate the result in order to avoid the costly binary search
5189 // inside ApproximateSize. We use half of file size as an approximation below.
5190
5191 const double margin = options.files_size_error_margin;
5192 if (margin > 0 && total_intersecting_size <
5193 static_cast<uint64_t>(total_full_size * margin)) {
5194 total_full_size += total_intersecting_size / 2;
5195 } else {
5196 // Estimate for all the first files, at each level
5197 for (const auto file_ptr : first_files) {
5198 total_full_size += ApproximateSize(v, *file_ptr, start, end, caller);
5199 }
5200
5201 // Estimate for all the last files, at each level
5202 for (const auto file_ptr : last_files) {
5203 // We could use ApproximateSize here, but calling ApproximateOffsetOf
5204 // directly is just more efficient.
5205 total_full_size += ApproximateOffsetOf(v, *file_ptr, end, caller);
5206 }
5207 }
5208
5209 return total_full_size;
5210 }
5211
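// Returns the approximate offset of `key` within file `f`, i.e. roughly how
// many bytes of the file are used by keys ordered before `key`. A file
// entirely before `key` contributes its full size; a file entirely after
// `key` contributes zero.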
5212 uint64_t VersionSet::ApproximateOffsetOf(Version* v, const FdWithKeyRange& f,
5213 const Slice& key,
5214 TableReaderCaller caller) {
5215 // pre-condition
5216 assert(v);
5217 const auto& icmp = v->cfd_->internal_comparator();
5218
5219 uint64_t result = 0;
5220 if (icmp.Compare(f.largest_key, key) <= 0) {
5221 // Entire file is before "key", so just add the file size
5222 result = f.fd.GetFileSize();
5223 } else if (icmp.Compare(f.smallest_key, key) > 0) {
5224 // Entire file is after "key", so ignore
5225 result = 0;
5226 } else {
5227 // "key" falls in the range for this table. Add the
5228 // approximate offset of "key" within the table.
5229 TableCache* table_cache = v->cfd_->table_cache();
5230 if (table_cache != nullptr) {
5231 result = table_cache->ApproximateOffsetOf(
5232 key, f.file_metadata->fd, caller, icmp,
5233 v->GetMutableCFOptions().prefix_extractor.get());
5234 }
5235 }
5236 return result;
5237 }
5238
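// Returns the approximate number of bytes in file `f` used by keys in the
// range [start, end]. Depending on how the file overlaps the range, this is
// zero, an estimate derived from a single boundary offset, or a TableCache
// size estimate between the two boundary keys.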
5239 uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
5240 const Slice& start, const Slice& end,
5241 TableReaderCaller caller) {
5242 // pre-condition
5243 assert(v);
5244 const auto& icmp = v->cfd_->internal_comparator();
5245 assert(icmp.Compare(start, end) <= 0);
5246
5247 if (icmp.Compare(f.largest_key, start) <= 0 ||
5248 icmp.Compare(f.smallest_key, end) > 0) {
5249 // Entire file is before or after the start/end keys range
5250 return 0;
5251 }
5252
5253 if (icmp.Compare(f.smallest_key, start) >= 0) {
5254 // Start of the range is before the file start - approximate by end offset
5255 return ApproximateOffsetOf(v, f, end, caller);
5256 }
5257
5258 if (icmp.Compare(f.largest_key, end) < 0) {
5259 // End of the range is after the file end - approximate by subtracting
5260 // start offset from the file size
5261 uint64_t start_offset = ApproximateOffsetOf(v, f, start, caller);
5262 assert(f.fd.GetFileSize() >= start_offset);
5263 return f.fd.GetFileSize() - start_offset;
5264 }
5265
5266 // The interval falls entirely in the range for this file.
5267 TableCache* table_cache = v->cfd_->table_cache();
5268 if (table_cache == nullptr) {
5269 return 0;
5270 }
5271 return table_cache->ApproximateSize(
5272 start, end, f.file_metadata->fd, caller, icmp,
5273 v->GetMutableCFOptions().prefix_extractor.get());
5274 }
5275
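// Appends the descriptor of every SST file referenced by any live Version of
// any initialized column family to `live_list`, reserving the needed space up
// front so the vector grows at most once.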
5276 void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
5277 // pre-calculate space requirement
5278 int64_t total_files = 0;
5279 for (auto cfd : *column_family_set_) {
5280 if (!cfd->initialized()) {
5281 continue;
5282 }
5283 Version* dummy_versions = cfd->dummy_versions();
5284 for (Version* v = dummy_versions->next_; v != dummy_versions;
5285 v = v->next_) {
5286 const auto* vstorage = v->storage_info();
5287 for (int level = 0; level < vstorage->num_levels(); level++) {
5288 total_files += vstorage->LevelFiles(level).size();
5289 }
5290 }
5291 }
5292
5293 // just a one-time extension to the right size
5294 live_list->reserve(live_list->size() + static_cast<size_t>(total_files));
5295
5296 for (auto cfd : *column_family_set_) {
5297 if (!cfd->initialized()) {
5298 continue;
5299 }
5300 auto* current = cfd->current();
5301 bool found_current = false;
5302 Version* dummy_versions = cfd->dummy_versions();
5303 for (Version* v = dummy_versions->next_; v != dummy_versions;
5304 v = v->next_) {
5305 v->AddLiveFiles(live_list);
5306 if (v == current) {
5307 found_current = true;
5308 }
5309 }
5310 if (!found_current && current != nullptr) {
5311 // Should never happen unless it is a bug.
5312 assert(false);
5313 current->AddLiveFiles(live_list);
5314 }
5315 }
5316 }
5317
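// Builds the input iterator for compaction `c`: one table iterator per L0
// input file plus one concatenating LevelIterator per non-L0 input level, all
// combined into a single merging InternalIterator. Checksums are verified and
// the block cache is not polluted (fill_cache is false).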
5318 InternalIterator* VersionSet::MakeInputIterator(
5319 const Compaction* c, RangeDelAggregator* range_del_agg,
5320 const FileOptions& file_options_compactions) {
5321 auto cfd = c->column_family_data();
5322 ReadOptions read_options;
5323 read_options.verify_checksums = true;
5324 read_options.fill_cache = false;
5325 // Compaction iterators shouldn't be confined to a single prefix.
5326 // Compactions use Seek() for
5327 // (a) concurrent compactions,
5328 // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
5329 read_options.total_order_seek = true;
5330
5331 // Level-0 files have to be merged together. For other levels,
5332 // we will make a concatenating iterator per level.
5333 // TODO(opt): use concatenating iterator for level-0 if there is no overlap
5334 const size_t space = (c->level() == 0 ? c->input_levels(0)->num_files +
5335 c->num_input_levels() - 1
5336 : c->num_input_levels());
5337 InternalIterator** list = new InternalIterator* [space];
5338 size_t num = 0;
5339 for (size_t which = 0; which < c->num_input_levels(); which++) {
5340 if (c->input_levels(which)->num_files != 0) {
5341 if (c->level(which) == 0) {
5342 const LevelFilesBrief* flevel = c->input_levels(which);
5343 for (size_t i = 0; i < flevel->num_files; i++) {
5344 list[num++] = cfd->table_cache()->NewIterator(
5345 read_options, file_options_compactions,
5346 cfd->internal_comparator(),
5347 *flevel->files[i].file_metadata, range_del_agg,
5348 c->mutable_cf_options()->prefix_extractor.get(),
5349 /*table_reader_ptr=*/nullptr,
5350 /*file_read_hist=*/nullptr, TableReaderCaller::kCompaction,
5351 /*arena=*/nullptr,
5352 /*skip_filters=*/false, /*level=*/static_cast<int>(which),
5353 /*smallest_compaction_key=*/nullptr,
5354 /*largest_compaction_key=*/nullptr);
5355 }
5356 } else {
5357 // Create concatenating iterator for the files from this level
5358 list[num++] = new LevelIterator(
5359 cfd->table_cache(), read_options, file_options_compactions,
5360 cfd->internal_comparator(), c->input_levels(which),
5361 c->mutable_cf_options()->prefix_extractor.get(),
5362 /*should_sample=*/false,
5363 /*no per level latency histogram=*/nullptr,
5364 TableReaderCaller::kCompaction, /*skip_filters=*/false,
5365 /*level=*/static_cast<int>(which), range_del_agg,
5366 c->boundaries(which));
5367 }
5368 }
5369 }
5370 assert(num <= space);
5371 InternalIterator* result =
5372 NewMergingIterator(&c->column_family_data()->internal_comparator(), list,
5373 static_cast<int>(num));
5374 delete[] list;
5375 return result;
5376 }
5377
5378 // verify that the files listed in this compaction are present
5379 // in the current version
5380 bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
5381 #ifndef NDEBUG
5382 Version* version = c->column_family_data()->current();
5383 const VersionStorageInfo* vstorage = version->storage_info();
5384 if (c->input_version() != version) {
5385 ROCKS_LOG_INFO(
5386 db_options_->info_log,
5387 "[%s] compaction output being applied to a different base version from"
5388 " input version",
5389 c->column_family_data()->GetName().c_str());
5390
5391 if (vstorage->compaction_style_ == kCompactionStyleLevel &&
5392 c->start_level() == 0 && c->num_input_levels() > 2U) {
5393 // We are doing a L0->base_level compaction. The assumption is if
5394 // base level is not L1, levels from L1 to base_level - 1 is empty.
5395 // This is ensured by having one compaction from L0 going on at the
5396 // same time in level-based compaction. So that during the time, no
5397 // compaction/flush can put files to those levels.
5398 for (int l = c->start_level() + 1; l < c->output_level(); l++) {
5399 if (vstorage->NumLevelFiles(l) != 0) {
5400 return false;
5401 }
5402 }
5403 }
5404 }
5405
5406 for (size_t input = 0; input < c->num_input_levels(); ++input) {
5407 int level = c->level(input);
5408 for (size_t i = 0; i < c->num_input_files(input); ++i) {
5409 uint64_t number = c->input(input, i)->fd.GetNumber();
5410 bool found = false;
5411 for (size_t j = 0; j < vstorage->files_[level].size(); j++) {
5412 FileMetaData* f = vstorage->files_[level][j];
5413 if (f->fd.GetNumber() == number) {
5414 found = true;
5415 break;
5416 }
5417 }
5418 if (!found) {
5419 return false; // input files not present in the current version
5420 }
5421 }
5422 }
5423 #else
5424 (void)c;
5425 #endif
5426 return true; // everything good
5427 }
5428
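// Looks up the SST file with the given `number` across all initialized column
// families; on success returns its metadata, level and owning column family,
// otherwise NotFound.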
5429 Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
5430 FileMetaData** meta,
5431 ColumnFamilyData** cfd) {
5432 for (auto cfd_iter : *column_family_set_) {
5433 if (!cfd_iter->initialized()) {
5434 continue;
5435 }
5436 Version* version = cfd_iter->current();
5437 const auto* vstorage = version->storage_info();
5438 for (int level = 0; level < vstorage->num_levels(); level++) {
5439 for (const auto& file : vstorage->LevelFiles(level)) {
5440 if (file->fd.GetNumber() == number) {
5441 *meta = file;
5442 *filelevel = level;
5443 *cfd = cfd_iter;
5444 return Status::OK();
5445 }
5446 }
5447 }
5448 }
5449 return Status::NotFound("File not present in any level");
5450 }
5451
5452 void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
5453 for (auto cfd : *column_family_set_) {
5454 if (cfd->IsDropped() || !cfd->initialized()) {
5455 continue;
5456 }
5457 for (int level = 0; level < cfd->NumberLevels(); level++) {
5458 for (const auto& file :
5459 cfd->current()->storage_info()->LevelFiles(level)) {
5460 LiveFileMetaData filemetadata;
5461 filemetadata.column_family_name = cfd->GetName();
5462 uint32_t path_id = file->fd.GetPathId();
5463 if (path_id < cfd->ioptions()->cf_paths.size()) {
5464 filemetadata.db_path = cfd->ioptions()->cf_paths[path_id].path;
5465 } else {
5466 assert(!cfd->ioptions()->cf_paths.empty());
5467 filemetadata.db_path = cfd->ioptions()->cf_paths.back().path;
5468 }
5469 const uint64_t file_number = file->fd.GetNumber();
5470 filemetadata.name = MakeTableFileName("", file_number);
5471 filemetadata.file_number = file_number;
5472 filemetadata.level = level;
5473 filemetadata.size = static_cast<size_t>(file->fd.GetFileSize());
5474 filemetadata.smallestkey = file->smallest.user_key().ToString();
5475 filemetadata.largestkey = file->largest.user_key().ToString();
5476 filemetadata.smallest_seqno = file->fd.smallest_seqno;
5477 filemetadata.largest_seqno = file->fd.largest_seqno;
5478 filemetadata.num_reads_sampled = file->stats.num_reads_sampled.load(
5479 std::memory_order_relaxed);
5480 filemetadata.being_compacted = file->being_compacted;
5481 filemetadata.num_entries = file->num_entries;
5482 filemetadata.num_deletions = file->num_deletions;
5483 filemetadata.oldest_blob_file_number = file->oldest_blob_file_number;
5484 filemetadata.file_checksum = file->file_checksum;
5485 filemetadata.file_checksum_func_name = file->file_checksum_func_name;
5486 metadata->push_back(filemetadata);
5487 }
5488 }
5489 }
5490 }
5491
5492 void VersionSet::GetObsoleteFiles(std::vector<ObsoleteFileInfo>* files,
5493 std::vector<std::string>* manifest_filenames,
5494 uint64_t min_pending_output) {
5495 assert(manifest_filenames->empty());
5496 obsolete_manifests_.swap(*manifest_filenames);
5497 std::vector<ObsoleteFileInfo> pending_files;
5498 for (auto& f : obsolete_files_) {
5499 if (f.metadata->fd.GetNumber() < min_pending_output) {
5500 files->push_back(std::move(f));
5501 } else {
5502 pending_files.push_back(std::move(f));
5503 }
5504 }
5505 obsolete_files_.swap(pending_files);
5506 }
5507
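// Creates the in-memory state for the new column family described by `edit`:
// a dummy Version list head, the ColumnFamilyData itself, an initial Version
// with level target sizes computed, and a fresh memtable.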
5508 ColumnFamilyData* VersionSet::CreateColumnFamily(
5509 const ColumnFamilyOptions& cf_options, VersionEdit* edit) {
5510 assert(edit->is_column_family_add_);
5511
5512 MutableCFOptions dummy_cf_options;
5513 Version* dummy_versions =
5514 new Version(nullptr, this, file_options_, dummy_cf_options);
5515 // Ref() dummy version once so that later we can call Unref() to delete it
5516 // by avoiding calling "delete" explicitly (~Version is private)
5517 dummy_versions->Ref();
5518 auto new_cfd = column_family_set_->CreateColumnFamily(
5519 edit->column_family_name_, edit->column_family_, dummy_versions,
5520 cf_options);
5521
5522 Version* v = new Version(new_cfd, this, file_options_,
5523 *new_cfd->GetLatestMutableCFOptions(),
5524 current_version_number_++);
5525
5526 // Fill level target base information.
5527 v->storage_info()->CalculateBaseBytes(*new_cfd->ioptions(),
5528 *new_cfd->GetLatestMutableCFOptions());
5529 AppendVersion(new_cfd, v);
5530 // GetLatestMutableCFOptions() is safe here without mutex since the
5531 // cfd is not available to client
5532 new_cfd->CreateNewMemtable(*new_cfd->GetLatestMutableCFOptions(),
5533 LastSequence());
5534 new_cfd->SetLogNumber(edit->log_number_);
5535 return new_cfd;
5536 }
5537
5538 uint64_t VersionSet::GetNumLiveVersions(Version* dummy_versions) {
5539 uint64_t count = 0;
5540 for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
5541 count++;
5542 }
5543 return count;
5544 }
5545
5546 uint64_t VersionSet::GetTotalSstFilesSize(Version* dummy_versions) {
5547 std::unordered_set<uint64_t> unique_files;
5548 uint64_t total_files_size = 0;
5549 for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
5550 VersionStorageInfo* storage_info = v->storage_info();
5551 for (int level = 0; level < storage_info->num_levels_; level++) {
5552 for (const auto& file_meta : storage_info->LevelFiles(level)) {
5553 if (unique_files.find(file_meta->fd.packed_number_and_path_id) ==
5554 unique_files.end()) {
5555 unique_files.insert(file_meta->fd.packed_number_and_path_id);
5556 total_files_size += file_meta->fd.GetFileSize();
5557 }
5558 }
5559 }
5560 }
5561 return total_files_size;
5562 }
5563
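// ReactiveVersionSet is the VersionSet variant used by a secondary instance
// that tails the primary's MANIFEST: it only reads and applies version edits
// (see ReadAndApply) and never writes them, switching to a new MANIFEST when
// the primary rolls one.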
5564 ReactiveVersionSet::ReactiveVersionSet(const std::string& dbname,
5565 const ImmutableDBOptions* _db_options,
5566 const FileOptions& _file_options,
5567 Cache* table_cache,
5568 WriteBufferManager* write_buffer_manager,
5569 WriteController* write_controller)
5570 : VersionSet(dbname, _db_options, _file_options, table_cache,
5571 write_buffer_manager, write_controller,
5572 /*block_cache_tracer=*/nullptr),
5573 number_of_edits_to_skip_(0) {}
5574
5575 ReactiveVersionSet::~ReactiveVersionSet() {}
5576
5577 Status ReactiveVersionSet::Recover(
5578 const std::vector<ColumnFamilyDescriptor>& column_families,
5579 std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
5580 std::unique_ptr<log::Reader::Reporter>* manifest_reporter,
5581 std::unique_ptr<Status>* manifest_reader_status) {
5582 assert(manifest_reader != nullptr);
5583 assert(manifest_reporter != nullptr);
5584 assert(manifest_reader_status != nullptr);
5585
5586 std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
5587 for (const auto& cf : column_families) {
5588 cf_name_to_options.insert({cf.name, cf.options});
5589 }
5590
5591 // add default column family
5592 auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
5593 if (default_cf_iter == cf_name_to_options.end()) {
5594 return Status::InvalidArgument("Default column family not specified");
5595 }
5596 VersionEdit default_cf_edit;
5597 default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
5598 default_cf_edit.SetColumnFamily(0);
5599 ColumnFamilyData* default_cfd =
5600 CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
5601 // In recovery, nobody else can access it, so it's fine to set it to be
5602 // initialized earlier.
5603 default_cfd->set_initialized();
5604 std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
5605 builders;
5606 std::unordered_map<int, std::string> column_families_not_found;
5607 builders.insert(
5608 std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
5609 new BaseReferencedVersionBuilder(default_cfd))));
5610
5611 manifest_reader_status->reset(new Status());
5612 manifest_reporter->reset(new LogReporter());
5613 static_cast<LogReporter*>(manifest_reporter->get())->status =
5614 manifest_reader_status->get();
5615 Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader);
5616 log::Reader* reader = manifest_reader->get();
5617
5618 int retry = 0;
5619 VersionEdit version_edit;
5620 while (s.ok() && retry < 1) {
5621 assert(reader != nullptr);
5622 Slice record;
5623 std::string scratch;
5624 s = ReadAndRecover(reader, &read_buffer_, cf_name_to_options,
5625 column_families_not_found, builders, &version_edit);
5626 if (s.ok()) {
5627 bool enough = version_edit.has_next_file_number_ &&
5628 version_edit.has_log_number_ &&
5629 version_edit.has_last_sequence_;
5630 if (enough) {
5631 for (const auto& cf : column_families) {
5632 auto cfd = column_family_set_->GetColumnFamily(cf.name);
5633 if (cfd == nullptr) {
5634 enough = false;
5635 break;
5636 }
5637 }
5638 }
5639 if (enough) {
5640 for (const auto& cf : column_families) {
5641 auto cfd = column_family_set_->GetColumnFamily(cf.name);
5642 assert(cfd != nullptr);
5643 if (!cfd->IsDropped()) {
5644 auto builder_iter = builders.find(cfd->GetID());
5645 assert(builder_iter != builders.end());
5646 auto builder = builder_iter->second->version_builder();
5647 assert(builder != nullptr);
5648 s = builder->LoadTableHandlers(
5649 cfd->internal_stats(), db_options_->max_file_opening_threads,
5650 false /* prefetch_index_and_filter_in_cache */,
5651 true /* is_initial_load */,
5652 cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
5653 if (!s.ok()) {
5654 enough = false;
5655 if (s.IsPathNotFound()) {
5656 s = Status::OK();
5657 }
5658 break;
5659 }
5660 }
5661 }
5662 }
5663 if (enough) {
5664 break;
5665 }
5666 }
5667 ++retry;
5668 }
5669
5670 if (s.ok()) {
5671 if (!version_edit.has_prev_log_number_) {
5672 version_edit.prev_log_number_ = 0;
5673 }
5674 column_family_set_->UpdateMaxColumnFamily(version_edit.max_column_family_);
5675
5676 MarkMinLogNumberToKeep2PC(version_edit.min_log_number_to_keep_);
5677 MarkFileNumberUsed(version_edit.prev_log_number_);
5678 MarkFileNumberUsed(version_edit.log_number_);
5679
5680 for (auto cfd : *column_family_set_) {
5681 assert(builders.count(cfd->GetID()) > 0);
5682 auto builder = builders[cfd->GetID()]->version_builder();
5683 if (!builder->CheckConsistencyForNumLevels()) {
5684 s = Status::InvalidArgument(
5685 "db has more levels than options.num_levels");
5686 break;
5687 }
5688 }
5689 }
5690
5691 if (s.ok()) {
5692 for (auto cfd : *column_family_set_) {
5693 if (cfd->IsDropped()) {
5694 continue;
5695 }
5696 assert(cfd->initialized());
5697 auto builders_iter = builders.find(cfd->GetID());
5698 assert(builders_iter != builders.end());
5699 auto* builder = builders_iter->second->version_builder();
5700
5701 Version* v = new Version(cfd, this, file_options_,
5702 *cfd->GetLatestMutableCFOptions(),
5703 current_version_number_++);
5704 builder->SaveTo(v->storage_info());
5705
5706 // Install recovered version
5707 v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
5708 !(db_options_->skip_stats_update_on_db_open));
5709 AppendVersion(cfd, v);
5710 }
5711 next_file_number_.store(version_edit.next_file_number_ + 1);
5712 last_allocated_sequence_ = version_edit.last_sequence_;
5713 last_published_sequence_ = version_edit.last_sequence_;
5714 last_sequence_ = version_edit.last_sequence_;
5715 prev_log_number_ = version_edit.prev_log_number_;
5716 for (auto cfd : *column_family_set_) {
5717 if (cfd->IsDropped()) {
5718 continue;
5719 }
5720 ROCKS_LOG_INFO(db_options_->info_log,
5721 "Column family [%s] (ID %u), log number is %" PRIu64 "\n",
5722 cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
5723 }
5724 }
5725 return s;
5726 }
5727
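// Tails the current MANIFEST and applies any newly appended version edits to
// the secondary's column families, recording the families that changed in
// `cfds_changed`. If the primary has switched to a new MANIFEST, the reader is
// moved to the new file and the leading snapshot records written by
// WriteCurrentStateToManifest are skipped.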
5728 Status ReactiveVersionSet::ReadAndApply(
5729 InstrumentedMutex* mu,
5730 std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
5731 std::unordered_set<ColumnFamilyData*>* cfds_changed) {
5732 assert(manifest_reader != nullptr);
5733 assert(cfds_changed != nullptr);
5734 mu->AssertHeld();
5735
5736 Status s;
5737 uint64_t applied_edits = 0;
5738 while (s.ok()) {
5739 Slice record;
5740 std::string scratch;
5741 log::Reader* reader = manifest_reader->get();
5742 std::string old_manifest_path = reader->file()->file_name();
5743 while (reader->ReadRecord(&record, &scratch)) {
5744 VersionEdit edit;
5745 s = edit.DecodeFrom(record);
5746 if (!s.ok()) {
5747 break;
5748 }
5749
5750 // Skip the first VersionEdits of each MANIFEST generated by
5751 // VersionSet::WriteCurrentStateToManifest().
5752 if (number_of_edits_to_skip_ > 0) {
5753 ColumnFamilyData* cfd =
5754 column_family_set_->GetColumnFamily(edit.column_family_);
5755 if (cfd != nullptr && !cfd->IsDropped()) {
5756 --number_of_edits_to_skip_;
5757 }
5758 continue;
5759 }
5760
5761 s = read_buffer_.AddEdit(&edit);
5762 if (!s.ok()) {
5763 break;
5764 }
5765 VersionEdit temp_edit;
5766 if (edit.is_in_atomic_group_) {
5767 if (read_buffer_.IsFull()) {
5768 // Apply edits in an atomic group when we have read all edits in the
5769 // group.
5770 for (auto& e : read_buffer_.replay_buffer()) {
5771 s = ApplyOneVersionEditToBuilder(e, cfds_changed, &temp_edit);
5772 if (!s.ok()) {
5773 break;
5774 }
5775 applied_edits++;
5776 }
5777 if (!s.ok()) {
5778 break;
5779 }
5780 read_buffer_.Clear();
5781 }
5782 } else {
5783 // Apply a normal edit immediately.
5784 s = ApplyOneVersionEditToBuilder(edit, cfds_changed, &temp_edit);
5785 if (s.ok()) {
5786 applied_edits++;
5787 }
5788 }
5789 }
5790 if (!s.ok()) {
5791 // Clear the buffer if we fail to decode/apply an edit.
5792 read_buffer_.Clear();
5793 }
5794 // It's possible that:
5795 // 1) s.IsCorruption(), indicating the current MANIFEST is corrupted.
5796 // 2) we have finished reading the current MANIFEST.
5797 // 3) we have encountered an IOError reading the current MANIFEST.
5798 // We need to look for the next MANIFEST and start from there. If we cannot
5799 // find the next MANIFEST, we should exit the loop.
5800 s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
5801 reader = manifest_reader->get();
5802 if (s.ok()) {
5803 if (reader->file()->file_name() == old_manifest_path) {
5804 // Still processing the same MANIFEST, thus no need to continue this
5805 // loop since no record is available if we have reached here.
5806 break;
5807 } else {
5808 // We have switched to a new MANIFEST whose first records have been
5809 // generated by VersionSet::WriteCurrentStateToManifest(). Since the
5810 // secondary instance has already finished recovering upon start, there
5811 // is no need for the secondary to process these records. Actually, if
5812 // the secondary were to replay these records, the secondary may end up
5813 // adding the same SST files AGAIN to each column family, causing
5814 // consistency checks done by VersionBuilder to fail. Therefore, we
5815 // record the number of records to skip at the beginning of the new
5816 // MANIFEST and ignore them.
5817 number_of_edits_to_skip_ = 0;
5818 for (auto* cfd : *column_family_set_) {
5819 if (cfd->IsDropped()) {
5820 continue;
5821 }
5822 // Increase number_of_edits_to_skip by 2 because
5823 // WriteCurrentStateToManifest() writes 2 version edits for each
5824 // column family at the beginning of the newly-generated MANIFEST.
5825 // TODO(yanqin) remove hard-coded value.
5826 if (db_options_->write_dbid_to_manifest) {
5827 number_of_edits_to_skip_ += 3;
5828 } else {
5829 number_of_edits_to_skip_ += 2;
5830 }
5831 }
5832 }
5833 }
5834 }
5835
5836 if (s.ok()) {
5837 for (auto cfd : *column_family_set_) {
5838 auto builder_iter = active_version_builders_.find(cfd->GetID());
5839 if (builder_iter == active_version_builders_.end()) {
5840 continue;
5841 }
5842 auto builder = builder_iter->second->version_builder();
5843 if (!builder->CheckConsistencyForNumLevels()) {
5844 s = Status::InvalidArgument(
5845 "db has more levels than options.num_levels");
5846 break;
5847 }
5848 }
5849 }
5850 TEST_SYNC_POINT_CALLBACK("ReactiveVersionSet::ReadAndApply:AppliedEdits",
5851 &applied_edits);
5852 return s;
5853 }
5854
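// Applies a single version edit on the secondary: edits for unknown or newly
// created column families are ignored, drops are honored, file changes are
// forwarded to the per-family version builder, table handlers are loaded, and
// a new Version is installed when everything succeeds.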
5855 Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
5856 VersionEdit& edit, std::unordered_set<ColumnFamilyData*>* cfds_changed,
5857 VersionEdit* version_edit) {
5858 ColumnFamilyData* cfd =
5859 column_family_set_->GetColumnFamily(edit.column_family_);
5860
5861 // If we cannot find this column family in our column family set, then it
5862 // may be a new column family created by the primary after the secondary
5863 // starts. It is also possible that the secondary instance opens only a subset
5864 // of column families. Ignore it for now.
5865 if (nullptr == cfd) {
5866 return Status::OK();
5867 }
5868 if (active_version_builders_.find(edit.column_family_) ==
5869 active_version_builders_.end() &&
5870 !cfd->IsDropped()) {
5871 std::unique_ptr<BaseReferencedVersionBuilder> builder_guard(
5872 new BaseReferencedVersionBuilder(cfd));
5873 active_version_builders_.insert(
5874 std::make_pair(edit.column_family_, std::move(builder_guard)));
5875 }
5876
5877 auto builder_iter = active_version_builders_.find(edit.column_family_);
5878 assert(builder_iter != active_version_builders_.end());
5879 auto builder = builder_iter->second->version_builder();
5880 assert(builder != nullptr);
5881
5882 if (edit.is_column_family_add_) {
5883 // TODO (yanqin) for now the secondary ignores column families created
5884 // after Open. This also simplifies handling of switching to a new MANIFEST
5885 // and processing the snapshot of the system at the beginning of the
5886 // MANIFEST.
5887 } else if (edit.is_column_family_drop_) {
5888 // Drop the column family by setting it to be 'dropped' without destroying
5889 // the column family handle.
5890 // TODO (haoyu) figure out how to handle column family drop for the
5891 // secondary instance. (Is it possible that the ref count for cfd is 0 but
5892 // the ref count for its versions is higher than 0?)
5893 cfd->SetDropped();
5894 if (cfd->UnrefAndTryDelete()) {
5895 cfd = nullptr;
5896 }
5897 active_version_builders_.erase(builder_iter);
5898 } else {
5899 Status s = builder->Apply(&edit);
5900 if (!s.ok()) {
5901 return s;
5902 }
5903 }
5904 Status s = ExtractInfoFromVersionEdit(cfd, edit, version_edit);
5905 if (!s.ok()) {
5906 return s;
5907 }
5908
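// Open the table files referenced by the rebuilt state so the new Version can
// serve reads. A PathNotFound error is tolerated below, presumably because the
// secondary may temporarily lag behind the primary's view of live files.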
5909 if (cfd != nullptr && !cfd->IsDropped()) {
5910 s = builder->LoadTableHandlers(
5911 cfd->internal_stats(), db_options_->max_file_opening_threads,
5912 false /* prefetch_index_and_filter_in_cache */,
5913 false /* is_initial_load */,
5914 cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
5915 TEST_SYNC_POINT_CALLBACK(
5916 "ReactiveVersionSet::ApplyOneVersionEditToBuilder:"
5917 "AfterLoadTableHandlers",
5918 &s);
5919
5920 if (s.ok()) {
5921 auto version = new Version(cfd, this, file_options_,
5922 *cfd->GetLatestMutableCFOptions(),
5923 current_version_number_++);
5924 builder->SaveTo(version->storage_info());
5925 version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
5926 AppendVersion(cfd, version);
5927 active_version_builders_.erase(builder_iter);
5928 if (cfds_changed->count(cfd) == 0) {
5929 cfds_changed->insert(cfd);
5930 }
5931 } else if (s.IsPathNotFound()) {
5932 s = Status::OK();
5933 }
5934 // Any other error from LoadTableHandlers is propagated to the caller.
5935 }
5936
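// Propagate the bookkeeping carried by the accumulated version_edit: the next
// file number, the sequence numbers, the (previous) log numbers, the maximum
// column family ID, and the minimum log number to keep for 2PC.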
5937 if (version_edit->HasNextFile()) {
5938 next_file_number_.store(version_edit->next_file_number_ + 1);
5939 }
5940 if (version_edit->has_last_sequence_) {
5941 last_allocated_sequence_ = version_edit->last_sequence_;
5942 last_published_sequence_ = version_edit->last_sequence_;
5943 last_sequence_ = version_edit->last_sequence_;
5944 }
5945 if (version_edit->has_prev_log_number_) {
5946 prev_log_number_ = version_edit->prev_log_number_;
5947 MarkFileNumberUsed(version_edit->prev_log_number_);
5948 }
5949 if (version_edit->has_log_number_) {
5950 MarkFileNumberUsed(version_edit->log_number_);
5951 }
5952 column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_);
5953 MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_);
5954 return s;
5955 }
5956
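// Checks whether the CURRENT file now points to a different MANIFEST. If it
// does, the new MANIFEST is opened for sequential reads, the fragment-buffered
// reader is reset to it, and all active version builders are cleared so that
// versions are rebuilt from the snapshot at the head of the new MANIFEST. The
// loop retries on PathNotFound, presumably because the primary may switch to
// yet another MANIFEST (and delete the old one) while the secondary is opening it.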
5957 Status ReactiveVersionSet::MaybeSwitchManifest(
5958 log::Reader::Reporter* reporter,
5959 std::unique_ptr<log::FragmentBufferedReader>* manifest_reader) {
5960 assert(manifest_reader != nullptr);
5961 Status s;
5962 do {
5963 std::string manifest_path;
5964 s = GetCurrentManifestPath(dbname_, fs_, &manifest_path,
5965 &manifest_file_number_);
5966 std::unique_ptr<FSSequentialFile> manifest_file;
5967 if (s.ok()) {
5968 if (nullptr == manifest_reader->get() ||
5969 manifest_reader->get()->file()->file_name() != manifest_path) {
5970 TEST_SYNC_POINT(
5971 "ReactiveVersionSet::MaybeSwitchManifest:"
5972 "AfterGetCurrentManifestPath:0");
5973 TEST_SYNC_POINT(
5974 "ReactiveVersionSet::MaybeSwitchManifest:"
5975 "AfterGetCurrentManifestPath:1");
5976 s = fs_->NewSequentialFile(manifest_path,
5977 env_->OptimizeForManifestRead(file_options_),
5978 &manifest_file, nullptr);
5979 } else {
5980 // No need to switch manifest.
5981 break;
5982 }
5983 }
5984 std::unique_ptr<SequentialFileReader> manifest_file_reader;
5985 if (s.ok()) {
5986 manifest_file_reader.reset(
5987 new SequentialFileReader(std::move(manifest_file), manifest_path,
5988 db_options_->log_readahead_size));
5989 manifest_reader->reset(new log::FragmentBufferedReader(
5990 nullptr, std::move(manifest_file_reader), reporter,
5991 true /* checksum */, 0 /* log_number */));
5992 ROCKS_LOG_INFO(db_options_->info_log, "Switched to new manifest: %s\n",
5993 manifest_path.c_str());
5994 // TODO (yanqin) every time we switch to a new MANIFEST, we clear the
5995 // active_version_builders_ map because we choose to construct the
5996 // versions from scratch, thanks to the first part of each MANIFEST
5997 // written by VersionSet::WriteCurrentStateToManifest. This is not
5998 // strictly necessary, but we choose it at present for the sake of simplicity.
5999 active_version_builders_.clear();
6000 }
6001 } while (s.IsPathNotFound());
6002 return s;
6003 }
6004
6005 } // namespace ROCKSDB_NAMESPACE
6006