1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "db/version_set.h"
11 
12 #include <algorithm>
13 #include <array>
14 #include <cinttypes>
15 #include <cstdio>
16 #include <list>
17 #include <map>
18 #include <set>
19 #include <string>
20 #include <unordered_map>
21 #include <vector>
22 
23 #include "compaction/compaction.h"
24 #include "db/blob/blob_file_cache.h"
25 #include "db/blob/blob_file_reader.h"
26 #include "db/blob/blob_index.h"
27 #include "db/internal_stats.h"
28 #include "db/log_reader.h"
29 #include "db/log_writer.h"
30 #include "db/memtable.h"
31 #include "db/merge_context.h"
32 #include "db/merge_helper.h"
33 #include "db/pinned_iterators_manager.h"
34 #include "db/table_cache.h"
35 #include "db/version_builder.h"
36 #include "db/version_edit_handler.h"
37 #include "file/filename.h"
38 #include "file/random_access_file_reader.h"
39 #include "file/read_write_util.h"
40 #include "file/writable_file_writer.h"
41 #include "monitoring/file_read_sample.h"
42 #include "monitoring/perf_context_imp.h"
43 #include "monitoring/persistent_stats_history.h"
44 #include "options/options_helper.h"
45 #include "rocksdb/env.h"
46 #include "rocksdb/merge_operator.h"
47 #include "rocksdb/write_buffer_manager.h"
48 #include "table/format.h"
49 #include "table/get_context.h"
50 #include "table/internal_iterator.h"
51 #include "table/merging_iterator.h"
52 #include "table/meta_blocks.h"
53 #include "table/multiget_context.h"
54 #include "table/plain/plain_table_factory.h"
55 #include "table/table_reader.h"
56 #include "table/two_level_iterator.h"
57 #include "test_util/sync_point.h"
58 #include "util/cast_util.h"
59 #include "util/coding.h"
60 #include "util/stop_watch.h"
61 #include "util/string_util.h"
62 #include "util/user_comparator_wrapper.h"
63 
64 namespace ROCKSDB_NAMESPACE {
65 
66 namespace {
67 
68 // Find File in LevelFilesBrief data structure
69 // Within an index range defined by left and right
FindFileInRange(const InternalKeyComparator & icmp,const LevelFilesBrief & file_level,const Slice & key,uint32_t left,uint32_t right)70 int FindFileInRange(const InternalKeyComparator& icmp,
71     const LevelFilesBrief& file_level,
72     const Slice& key,
73     uint32_t left,
74     uint32_t right) {
75   auto cmp = [&](const FdWithKeyRange& f, const Slice& k) -> bool {
76     return icmp.InternalKeyComparator::Compare(f.largest_key, k) < 0;
77   };
78   const auto &b = file_level.files;
79   return static_cast<int>(std::lower_bound(b + left,
80                                            b + right, key, cmp) - b);
81 }
82 
OverlapWithIterator(const Comparator * ucmp,const Slice & smallest_user_key,const Slice & largest_user_key,InternalIterator * iter,bool * overlap)83 Status OverlapWithIterator(const Comparator* ucmp,
84     const Slice& smallest_user_key,
85     const Slice& largest_user_key,
86     InternalIterator* iter,
87     bool* overlap) {
88   InternalKey range_start(smallest_user_key, kMaxSequenceNumber,
89                           kValueTypeForSeek);
90   iter->Seek(range_start.Encode());
91   if (!iter->status().ok()) {
92     return iter->status();
93   }
94 
95   *overlap = false;
96   if (iter->Valid()) {
97     ParsedInternalKey seek_result;
98     Status s = ParseInternalKey(iter->key(), &seek_result,
99                                 false /* log_err_key */);  // TODO
100     if (!s.ok()) return s;
101 
102     if (ucmp->CompareWithoutTimestamp(seek_result.user_key, largest_user_key) <=
103         0) {
104       *overlap = true;
105     }
106   }
107 
108   return iter->status();
109 }
110 
111 // Class to help choose the next file to search for the particular key.
112 // Searches and returns files level by level.
113 // We can search level-by-level since entries never hop across
114 // levels. Therefore we are guaranteed that if we find data
115 // in a smaller level, later levels are irrelevant (unless we
116 // are MergeInProgress).
117 class FilePicker {
118  public:
FilePicker(std::vector<FileMetaData * > * files,const Slice & user_key,const Slice & ikey,autovector<LevelFilesBrief> * file_levels,unsigned int num_levels,FileIndexer * file_indexer,const Comparator * user_comparator,const InternalKeyComparator * internal_comparator)119   FilePicker(std::vector<FileMetaData*>* files, const Slice& user_key,
120              const Slice& ikey, autovector<LevelFilesBrief>* file_levels,
121              unsigned int num_levels, FileIndexer* file_indexer,
122              const Comparator* user_comparator,
123              const InternalKeyComparator* internal_comparator)
124       : num_levels_(num_levels),
125         curr_level_(static_cast<unsigned int>(-1)),
126         returned_file_level_(static_cast<unsigned int>(-1)),
127         hit_file_level_(static_cast<unsigned int>(-1)),
128         search_left_bound_(0),
129         search_right_bound_(FileIndexer::kLevelMaxIndex),
130 #ifndef NDEBUG
131         files_(files),
132 #endif
133         level_files_brief_(file_levels),
134         is_hit_file_last_in_level_(false),
135         curr_file_level_(nullptr),
136         user_key_(user_key),
137         ikey_(ikey),
138         file_indexer_(file_indexer),
139         user_comparator_(user_comparator),
140         internal_comparator_(internal_comparator) {
141 #ifdef NDEBUG
142     (void)files;
143 #endif
144     // Setup member variables to search first level.
145     search_ended_ = !PrepareNextLevel();
146     if (!search_ended_) {
147       // Prefetch Level 0 table data to avoid cache miss if possible.
148       for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
149         auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
150         if (r) {
151           r->Prepare(ikey);
152         }
153       }
154     }
155   }
156 
GetCurrentLevel() const157   int GetCurrentLevel() const { return curr_level_; }
158 
GetNextFile()159   FdWithKeyRange* GetNextFile() {
160     while (!search_ended_) {  // Loops over different levels.
161       while (curr_index_in_curr_level_ < curr_file_level_->num_files) {
162         // Loops over all files in current level.
163         FdWithKeyRange* f = &curr_file_level_->files[curr_index_in_curr_level_];
164         hit_file_level_ = curr_level_;
165         is_hit_file_last_in_level_ =
166             curr_index_in_curr_level_ == curr_file_level_->num_files - 1;
167         int cmp_largest = -1;
168 
169         // Do key range filtering of files or/and fractional cascading if:
170         // (1) not all the files are in level 0, or
171         // (2) there are more than 3 current level files
172         // If there are only 3 or less current level files in the system, we skip
173         // the key range filtering. In this case, more likely, the system is
174         // highly tuned to minimize number of tables queried by each query,
175         // so it is unlikely that key range filtering is more efficient than
176         // querying the files.
177         if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
178           // Check if key is within a file's range. If search left bound and
179           // right bound point to the same find, we are sure key falls in
180           // range.
181           assert(curr_level_ == 0 ||
182                  curr_index_in_curr_level_ == start_index_in_curr_level_ ||
183                  user_comparator_->CompareWithoutTimestamp(
184                      user_key_, ExtractUserKey(f->smallest_key)) <= 0);
185 
186           int cmp_smallest = user_comparator_->CompareWithoutTimestamp(
187               user_key_, ExtractUserKey(f->smallest_key));
188           if (cmp_smallest >= 0) {
189             cmp_largest = user_comparator_->CompareWithoutTimestamp(
190                 user_key_, ExtractUserKey(f->largest_key));
191           }
192 
193           // Setup file search bound for the next level based on the
194           // comparison results
195           if (curr_level_ > 0) {
196             file_indexer_->GetNextLevelIndex(curr_level_,
197                                             curr_index_in_curr_level_,
198                                             cmp_smallest, cmp_largest,
199                                             &search_left_bound_,
200                                             &search_right_bound_);
201           }
202           // Key falls out of current file's range
203           if (cmp_smallest < 0 || cmp_largest > 0) {
204             if (curr_level_ == 0) {
205               ++curr_index_in_curr_level_;
206               continue;
207             } else {
208               // Search next level.
209               break;
210             }
211           }
212         }
213 #ifndef NDEBUG
214         // Sanity check to make sure that the files are correctly sorted
215         if (prev_file_) {
216           if (curr_level_ != 0) {
217             int comp_sign = internal_comparator_->Compare(
218                 prev_file_->largest_key, f->smallest_key);
219             assert(comp_sign < 0);
220           } else {
221             // level == 0, the current file cannot be newer than the previous
222             // one. Use compressed data structure, has no attribute seqNo
223             assert(curr_index_in_curr_level_ > 0);
224             assert(!NewestFirstBySeqNo(files_[0][curr_index_in_curr_level_],
225                   files_[0][curr_index_in_curr_level_-1]));
226           }
227         }
228         prev_file_ = f;
229 #endif
230         returned_file_level_ = curr_level_;
231         if (curr_level_ > 0 && cmp_largest < 0) {
232           // No more files to search in this level.
233           search_ended_ = !PrepareNextLevel();
234         } else {
235           ++curr_index_in_curr_level_;
236         }
237         return f;
238       }
239       // Start searching next level.
240       search_ended_ = !PrepareNextLevel();
241     }
242     // Search ended.
243     return nullptr;
244   }
245 
246   // getter for current file level
247   // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
GetHitFileLevel()248   unsigned int GetHitFileLevel() { return hit_file_level_; }
249 
250   // Returns true if the most recent "hit file" (i.e., one returned by
251   // GetNextFile()) is at the last index in its level.
IsHitFileLastInLevel()252   bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }
253 
254  private:
255   unsigned int num_levels_;
256   unsigned int curr_level_;
257   unsigned int returned_file_level_;
258   unsigned int hit_file_level_;
259   int32_t search_left_bound_;
260   int32_t search_right_bound_;
261 #ifndef NDEBUG
262   std::vector<FileMetaData*>* files_;
263 #endif
264   autovector<LevelFilesBrief>* level_files_brief_;
265   bool search_ended_;
266   bool is_hit_file_last_in_level_;
267   LevelFilesBrief* curr_file_level_;
268   unsigned int curr_index_in_curr_level_;
269   unsigned int start_index_in_curr_level_;
270   Slice user_key_;
271   Slice ikey_;
272   FileIndexer* file_indexer_;
273   const Comparator* user_comparator_;
274   const InternalKeyComparator* internal_comparator_;
275 #ifndef NDEBUG
276   FdWithKeyRange* prev_file_;
277 #endif
278 
279   // Setup local variables to search next level.
280   // Returns false if there are no more levels to search.
PrepareNextLevel()281   bool PrepareNextLevel() {
282     curr_level_++;
283     while (curr_level_ < num_levels_) {
284       curr_file_level_ = &(*level_files_brief_)[curr_level_];
285       if (curr_file_level_->num_files == 0) {
286         // When current level is empty, the search bound generated from upper
287         // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
288         // also empty.
289         assert(search_left_bound_ == 0);
290         assert(search_right_bound_ == -1 ||
291                search_right_bound_ == FileIndexer::kLevelMaxIndex);
292         // Since current level is empty, it will need to search all files in
293         // the next level
294         search_left_bound_ = 0;
295         search_right_bound_ = FileIndexer::kLevelMaxIndex;
296         curr_level_++;
297         continue;
298       }
299 
300       // Some files may overlap each other. We find
301       // all files that overlap user_key and process them in order from
302       // newest to oldest. In the context of merge-operator, this can occur at
303       // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
304       // are always compacted into a single entry).
305       int32_t start_index;
306       if (curr_level_ == 0) {
307         // On Level-0, we read through all files to check for overlap.
308         start_index = 0;
309       } else {
310         // On Level-n (n>=1), files are sorted. Binary search to find the
311         // earliest file whose largest key >= ikey. Search left bound and
312         // right bound are used to narrow the range.
313         if (search_left_bound_ <= search_right_bound_) {
314           if (search_right_bound_ == FileIndexer::kLevelMaxIndex) {
315             search_right_bound_ =
316                 static_cast<int32_t>(curr_file_level_->num_files) - 1;
317           }
318           // `search_right_bound_` is an inclusive upper-bound, but since it was
319           // determined based on user key, it is still possible the lookup key
320           // falls to the right of `search_right_bound_`'s corresponding file.
321           // So, pass a limit one higher, which allows us to detect this case.
322           start_index =
323               FindFileInRange(*internal_comparator_, *curr_file_level_, ikey_,
324                               static_cast<uint32_t>(search_left_bound_),
325                               static_cast<uint32_t>(search_right_bound_) + 1);
326           if (start_index == search_right_bound_ + 1) {
327             // `ikey_` comes after `search_right_bound_`. The lookup key does
328             // not exist on this level, so let's skip this level and do a full
329             // binary search on the next level.
330             search_left_bound_ = 0;
331             search_right_bound_ = FileIndexer::kLevelMaxIndex;
332             curr_level_++;
333             continue;
334           }
335         } else {
336           // search_left_bound > search_right_bound, key does not exist in
337           // this level. Since no comparison is done in this level, it will
338           // need to search all files in the next level.
339           search_left_bound_ = 0;
340           search_right_bound_ = FileIndexer::kLevelMaxIndex;
341           curr_level_++;
342           continue;
343         }
344       }
345       start_index_in_curr_level_ = start_index;
346       curr_index_in_curr_level_ = start_index;
347 #ifndef NDEBUG
348       prev_file_ = nullptr;
349 #endif
350       return true;
351     }
352     // curr_level_ = num_levels_. So, no more levels to search.
353     return false;
354   }
355 };
356 
357 class FilePickerMultiGet {
358  private:
359   struct FilePickerContext;
360 
361  public:
FilePickerMultiGet(MultiGetRange * range,autovector<LevelFilesBrief> * file_levels,unsigned int num_levels,FileIndexer * file_indexer,const Comparator * user_comparator,const InternalKeyComparator * internal_comparator)362   FilePickerMultiGet(MultiGetRange* range,
363                      autovector<LevelFilesBrief>* file_levels,
364                      unsigned int num_levels, FileIndexer* file_indexer,
365                      const Comparator* user_comparator,
366                      const InternalKeyComparator* internal_comparator)
367       : num_levels_(num_levels),
368         curr_level_(static_cast<unsigned int>(-1)),
369         returned_file_level_(static_cast<unsigned int>(-1)),
370         hit_file_level_(static_cast<unsigned int>(-1)),
371         range_(range),
372         batch_iter_(range->begin()),
373         batch_iter_prev_(range->begin()),
374         upper_key_(range->begin()),
375         maybe_repeat_key_(false),
376         current_level_range_(*range, range->begin(), range->end()),
377         current_file_range_(*range, range->begin(), range->end()),
378         level_files_brief_(file_levels),
379         is_hit_file_last_in_level_(false),
380         curr_file_level_(nullptr),
381         file_indexer_(file_indexer),
382         user_comparator_(user_comparator),
383         internal_comparator_(internal_comparator) {
384     for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
385       fp_ctx_array_[iter.index()] =
386           FilePickerContext(0, FileIndexer::kLevelMaxIndex);
387     }
388 
389     // Setup member variables to search first level.
390     search_ended_ = !PrepareNextLevel();
391     if (!search_ended_) {
392       // REVISIT
393       // Prefetch Level 0 table data to avoid cache miss if possible.
394       // As of now, only PlainTableReader and CuckooTableReader do any
395       // prefetching. This may not be necessary anymore once we implement
396       // batching in those table readers
397       for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
398         auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
399         if (r) {
400           for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
401             r->Prepare(iter->ikey);
402           }
403         }
404       }
405     }
406   }
407 
GetCurrentLevel() const408   int GetCurrentLevel() const { return curr_level_; }
409 
410   // Iterates through files in the current level until it finds a file that
411   // contains at least one key from the MultiGet batch
GetNextFileInLevelWithKeys(MultiGetRange * next_file_range,size_t * file_index,FdWithKeyRange ** fd,bool * is_last_key_in_file)412   bool GetNextFileInLevelWithKeys(MultiGetRange* next_file_range,
413                                   size_t* file_index, FdWithKeyRange** fd,
414                                   bool* is_last_key_in_file) {
415     size_t curr_file_index = *file_index;
416     FdWithKeyRange* f = nullptr;
417     bool file_hit = false;
418     int cmp_largest = -1;
419     if (curr_file_index >= curr_file_level_->num_files) {
420       // In the unlikely case the next key is a duplicate of the current key,
421       // and the current key is the last in the level and the internal key
422       // was not found, we need to skip lookup for the remaining keys and
423       // reset the search bounds
424       if (batch_iter_ != current_level_range_.end()) {
425         ++batch_iter_;
426         for (; batch_iter_ != current_level_range_.end(); ++batch_iter_) {
427           struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
428           fp_ctx.search_left_bound = 0;
429           fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
430         }
431       }
432       return false;
433     }
434     // Loops over keys in the MultiGet batch until it finds a file with
435     // atleast one of the keys. Then it keeps moving forward until the
436     // last key in the batch that falls in that file
437     while (batch_iter_ != current_level_range_.end() &&
438            (fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level ==
439                 curr_file_index ||
440             !file_hit)) {
441       struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
442       f = &curr_file_level_->files[fp_ctx.curr_index_in_curr_level];
443       Slice& user_key = batch_iter_->ukey_without_ts;
444 
445       // Do key range filtering of files or/and fractional cascading if:
446       // (1) not all the files are in level 0, or
447       // (2) there are more than 3 current level files
448       // If there are only 3 or less current level files in the system, we
449       // skip the key range filtering. In this case, more likely, the system
450       // is highly tuned to minimize number of tables queried by each query,
451       // so it is unlikely that key range filtering is more efficient than
452       // querying the files.
453       if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
454         // Check if key is within a file's range. If search left bound and
455         // right bound point to the same find, we are sure key falls in
456         // range.
457         int cmp_smallest = user_comparator_->CompareWithoutTimestamp(
458             user_key, false, ExtractUserKey(f->smallest_key), true);
459 
460         assert(curr_level_ == 0 ||
461                fp_ctx.curr_index_in_curr_level ==
462                    fp_ctx.start_index_in_curr_level ||
463                cmp_smallest <= 0);
464 
465         if (cmp_smallest >= 0) {
466           cmp_largest = user_comparator_->CompareWithoutTimestamp(
467               user_key, false, ExtractUserKey(f->largest_key), true);
468         } else {
469           cmp_largest = -1;
470         }
471 
472         // Setup file search bound for the next level based on the
473         // comparison results
474         if (curr_level_ > 0) {
475           file_indexer_->GetNextLevelIndex(
476               curr_level_, fp_ctx.curr_index_in_curr_level, cmp_smallest,
477               cmp_largest, &fp_ctx.search_left_bound,
478               &fp_ctx.search_right_bound);
479         }
480         // Key falls out of current file's range
481         if (cmp_smallest < 0 || cmp_largest > 0) {
482           next_file_range->SkipKey(batch_iter_);
483         } else {
484           file_hit = true;
485         }
486       } else {
487         file_hit = true;
488       }
489       if (cmp_largest == 0) {
490         // cmp_largest is 0, which means the next key will not be in this
491         // file, so stop looking further. However, its possible there are
492         // duplicates in the batch, so find the upper bound for the batch
493         // in this file (upper_key_) by skipping past the duplicates. We
494         // leave batch_iter_ as is since we may have to pick up from there
495         // for the next file, if this file has a merge value rather than
496         // final value
497         upper_key_ = batch_iter_;
498         ++upper_key_;
499         while (upper_key_ != current_level_range_.end() &&
500                user_comparator_->CompareWithoutTimestamp(
501                    batch_iter_->ukey_without_ts, false,
502                    upper_key_->ukey_without_ts, false) == 0) {
503           ++upper_key_;
504         }
505         break;
506       } else {
507         if (curr_level_ == 0) {
508           // We need to look through all files in level 0
509           ++fp_ctx.curr_index_in_curr_level;
510         }
511         ++batch_iter_;
512       }
513       if (!file_hit) {
514         curr_file_index =
515             (batch_iter_ != current_level_range_.end())
516                 ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
517                 : curr_file_level_->num_files;
518       }
519     }
520 
521     *fd = f;
522     *file_index = curr_file_index;
523     *is_last_key_in_file = cmp_largest == 0;
524     if (!*is_last_key_in_file) {
525       // If the largest key in the batch overlapping the file is not the
526       // largest key in the file, upper_ley_ would not have been updated so
527       // update it here
528       upper_key_ = batch_iter_;
529     }
530     return file_hit;
531   }
532 
GetNextFile()533   FdWithKeyRange* GetNextFile() {
534     while (!search_ended_) {
535       // Start searching next level.
536       if (batch_iter_ == current_level_range_.end()) {
537         search_ended_ = !PrepareNextLevel();
538         continue;
539       } else {
540         if (maybe_repeat_key_) {
541           maybe_repeat_key_ = false;
542           // Check if we found the final value for the last key in the
543           // previous lookup range. If we did, then there's no need to look
544           // any further for that key, so advance batch_iter_. Else, keep
545           // batch_iter_ positioned on that key so we look it up again in
546           // the next file
547           // For L0, always advance the key because we will look in the next
548           // file regardless for all keys not found yet
549           if (current_level_range_.CheckKeyDone(batch_iter_) ||
550               curr_level_ == 0) {
551             batch_iter_ = upper_key_;
552           }
553         }
554         // batch_iter_prev_ will become the start key for the next file
555         // lookup
556         batch_iter_prev_ = batch_iter_;
557       }
558 
559       MultiGetRange next_file_range(current_level_range_, batch_iter_prev_,
560                                     current_level_range_.end());
561       size_t curr_file_index =
562           (batch_iter_ != current_level_range_.end())
563               ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
564               : curr_file_level_->num_files;
565       FdWithKeyRange* f;
566       bool is_last_key_in_file;
567       if (!GetNextFileInLevelWithKeys(&next_file_range, &curr_file_index, &f,
568                                       &is_last_key_in_file)) {
569         search_ended_ = !PrepareNextLevel();
570       } else {
571         if (is_last_key_in_file) {
572           // Since cmp_largest is 0, batch_iter_ still points to the last key
573           // that falls in this file, instead of the next one. Increment
574           // the file index for all keys between batch_iter_ and upper_key_
575           auto tmp_iter = batch_iter_;
576           while (tmp_iter != upper_key_) {
577             ++(fp_ctx_array_[tmp_iter.index()].curr_index_in_curr_level);
578             ++tmp_iter;
579           }
580           maybe_repeat_key_ = true;
581         }
582         // Set the range for this file
583         current_file_range_ =
584             MultiGetRange(next_file_range, batch_iter_prev_, upper_key_);
585         returned_file_level_ = curr_level_;
586         hit_file_level_ = curr_level_;
587         is_hit_file_last_in_level_ =
588             curr_file_index == curr_file_level_->num_files - 1;
589         return f;
590       }
591     }
592 
593     // Search ended
594     return nullptr;
595   }
596 
597   // getter for current file level
598   // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
GetHitFileLevel()599   unsigned int GetHitFileLevel() { return hit_file_level_; }
600 
601   // Returns true if the most recent "hit file" (i.e., one returned by
602   // GetNextFile()) is at the last index in its level.
IsHitFileLastInLevel()603   bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }
604 
CurrentFileRange()605   const MultiGetRange& CurrentFileRange() { return current_file_range_; }
606 
607  private:
608   unsigned int num_levels_;
609   unsigned int curr_level_;
610   unsigned int returned_file_level_;
611   unsigned int hit_file_level_;
612 
613   struct FilePickerContext {
614     int32_t search_left_bound;
615     int32_t search_right_bound;
616     unsigned int curr_index_in_curr_level;
617     unsigned int start_index_in_curr_level;
618 
FilePickerContextROCKSDB_NAMESPACE::__anonc022178e0111::FilePickerMultiGet::FilePickerContext619     FilePickerContext(int32_t left, int32_t right)
620         : search_left_bound(left), search_right_bound(right),
621           curr_index_in_curr_level(0), start_index_in_curr_level(0) {}
622 
623     FilePickerContext() = default;
624   };
625   std::array<FilePickerContext, MultiGetContext::MAX_BATCH_SIZE> fp_ctx_array_;
626   MultiGetRange* range_;
627   // Iterator to iterate through the keys in a MultiGet batch, that gets reset
628   // at the beginning of each level. Each call to GetNextFile() will position
629   // batch_iter_ at or right after the last key that was found in the returned
630   // SST file
631   MultiGetRange::Iterator batch_iter_;
632   // An iterator that records the previous position of batch_iter_, i.e last
633   // key found in the previous SST file, in order to serve as the start of
634   // the batch key range for the next SST file
635   MultiGetRange::Iterator batch_iter_prev_;
636   MultiGetRange::Iterator upper_key_;
637   bool maybe_repeat_key_;
638   MultiGetRange current_level_range_;
639   MultiGetRange current_file_range_;
640   autovector<LevelFilesBrief>* level_files_brief_;
641   bool search_ended_;
642   bool is_hit_file_last_in_level_;
643   LevelFilesBrief* curr_file_level_;
644   FileIndexer* file_indexer_;
645   const Comparator* user_comparator_;
646   const InternalKeyComparator* internal_comparator_;
647 
648   // Setup local variables to search next level.
649   // Returns false if there are no more levels to search.
PrepareNextLevel()650   bool PrepareNextLevel() {
651     if (curr_level_ == 0) {
652       MultiGetRange::Iterator mget_iter = current_level_range_.begin();
653       if (fp_ctx_array_[mget_iter.index()].curr_index_in_curr_level <
654           curr_file_level_->num_files) {
655         batch_iter_prev_ = current_level_range_.begin();
656         upper_key_ = batch_iter_ = current_level_range_.begin();
657         return true;
658       }
659     }
660 
661     curr_level_++;
662     // Reset key range to saved value
663     while (curr_level_ < num_levels_) {
664       bool level_contains_keys = false;
665       curr_file_level_ = &(*level_files_brief_)[curr_level_];
666       if (curr_file_level_->num_files == 0) {
667         // When current level is empty, the search bound generated from upper
668         // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
669         // also empty.
670 
671         for (auto mget_iter = current_level_range_.begin();
672              mget_iter != current_level_range_.end(); ++mget_iter) {
673           struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];
674 
675           assert(fp_ctx.search_left_bound == 0);
676           assert(fp_ctx.search_right_bound == -1 ||
677                  fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex);
678           // Since current level is empty, it will need to search all files in
679           // the next level
680           fp_ctx.search_left_bound = 0;
681           fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
682         }
683         // Skip all subsequent empty levels
684         do {
685           ++curr_level_;
686         } while ((curr_level_ < num_levels_) &&
687                  (*level_files_brief_)[curr_level_].num_files == 0);
688         continue;
689       }
690 
691       // Some files may overlap each other. We find
692       // all files that overlap user_key and process them in order from
693       // newest to oldest. In the context of merge-operator, this can occur at
694       // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
695       // are always compacted into a single entry).
696       int32_t start_index = -1;
697       current_level_range_ =
698           MultiGetRange(*range_, range_->begin(), range_->end());
699       for (auto mget_iter = current_level_range_.begin();
700            mget_iter != current_level_range_.end(); ++mget_iter) {
701         struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];
702         if (curr_level_ == 0) {
703           // On Level-0, we read through all files to check for overlap.
704           start_index = 0;
705           level_contains_keys = true;
706         } else {
707           // On Level-n (n>=1), files are sorted. Binary search to find the
708           // earliest file whose largest key >= ikey. Search left bound and
709           // right bound are used to narrow the range.
710           if (fp_ctx.search_left_bound <= fp_ctx.search_right_bound) {
711             if (fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex) {
712               fp_ctx.search_right_bound =
713                   static_cast<int32_t>(curr_file_level_->num_files) - 1;
714             }
715             // `search_right_bound_` is an inclusive upper-bound, but since it
716             // was determined based on user key, it is still possible the lookup
717             // key falls to the right of `search_right_bound_`'s corresponding
718             // file. So, pass a limit one higher, which allows us to detect this
719             // case.
720             Slice& ikey = mget_iter->ikey;
721             start_index = FindFileInRange(
722                 *internal_comparator_, *curr_file_level_, ikey,
723                 static_cast<uint32_t>(fp_ctx.search_left_bound),
724                 static_cast<uint32_t>(fp_ctx.search_right_bound) + 1);
725             if (start_index == fp_ctx.search_right_bound + 1) {
726               // `ikey_` comes after `search_right_bound_`. The lookup key does
727               // not exist on this level, so let's skip this level and do a full
728               // binary search on the next level.
729               fp_ctx.search_left_bound = 0;
730               fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
731               current_level_range_.SkipKey(mget_iter);
732               continue;
733             } else {
734               level_contains_keys = true;
735             }
736           } else {
737             // search_left_bound > search_right_bound, key does not exist in
738             // this level. Since no comparison is done in this level, it will
739             // need to search all files in the next level.
740             fp_ctx.search_left_bound = 0;
741             fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
742             current_level_range_.SkipKey(mget_iter);
743             continue;
744           }
745         }
746         fp_ctx.start_index_in_curr_level = start_index;
747         fp_ctx.curr_index_in_curr_level = start_index;
748       }
749       if (level_contains_keys) {
750         batch_iter_prev_ = current_level_range_.begin();
751         upper_key_ = batch_iter_ = current_level_range_.begin();
752         return true;
753       }
754       curr_level_++;
755     }
756     // curr_level_ = num_levels_. So, no more levels to search.
757     return false;
758   }
759 };
760 }  // anonymous namespace
761 
~VersionStorageInfo()762 VersionStorageInfo::~VersionStorageInfo() { delete[] files_; }
763 
~Version()764 Version::~Version() {
765   assert(refs_ == 0);
766 
767   // Remove from linked list
768   prev_->next_ = next_;
769   next_->prev_ = prev_;
770 
771   // Drop references to files
772   for (int level = 0; level < storage_info_.num_levels_; level++) {
773     for (size_t i = 0; i < storage_info_.files_[level].size(); i++) {
774       FileMetaData* f = storage_info_.files_[level][i];
775       assert(f->refs > 0);
776       f->refs--;
777       if (f->refs <= 0) {
778         assert(cfd_ != nullptr);
779         uint32_t path_id = f->fd.GetPathId();
780         assert(path_id < cfd_->ioptions()->cf_paths.size());
781         vset_->obsolete_files_.push_back(
782             ObsoleteFileInfo(f, cfd_->ioptions()->cf_paths[path_id].path));
783       }
784     }
785   }
786 }
787 
FindFile(const InternalKeyComparator & icmp,const LevelFilesBrief & file_level,const Slice & key)788 int FindFile(const InternalKeyComparator& icmp,
789              const LevelFilesBrief& file_level,
790              const Slice& key) {
791   return FindFileInRange(icmp, file_level, key, 0,
792                          static_cast<uint32_t>(file_level.num_files));
793 }
794 
DoGenerateLevelFilesBrief(LevelFilesBrief * file_level,const std::vector<FileMetaData * > & files,Arena * arena)795 void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level,
796         const std::vector<FileMetaData*>& files,
797         Arena* arena) {
798   assert(file_level);
799   assert(arena);
800 
801   size_t num = files.size();
802   file_level->num_files = num;
803   char* mem = arena->AllocateAligned(num * sizeof(FdWithKeyRange));
804   file_level->files = new (mem)FdWithKeyRange[num];
805 
806   for (size_t i = 0; i < num; i++) {
807     Slice smallest_key = files[i]->smallest.Encode();
808     Slice largest_key = files[i]->largest.Encode();
809 
810     // Copy key slice to sequential memory
811     size_t smallest_size = smallest_key.size();
812     size_t largest_size = largest_key.size();
813     mem = arena->AllocateAligned(smallest_size + largest_size);
814     memcpy(mem, smallest_key.data(), smallest_size);
815     memcpy(mem + smallest_size, largest_key.data(), largest_size);
816 
817     FdWithKeyRange& f = file_level->files[i];
818     f.fd = files[i]->fd;
819     f.file_metadata = files[i];
820     f.smallest_key = Slice(mem, smallest_size);
821     f.largest_key = Slice(mem + smallest_size, largest_size);
822   }
823 }
824 
AfterFile(const Comparator * ucmp,const Slice * user_key,const FdWithKeyRange * f)825 static bool AfterFile(const Comparator* ucmp,
826                       const Slice* user_key, const FdWithKeyRange* f) {
827   // nullptr user_key occurs before all keys and is therefore never after *f
828   return (user_key != nullptr &&
829           ucmp->CompareWithoutTimestamp(*user_key,
830                                         ExtractUserKey(f->largest_key)) > 0);
831 }
832 
BeforeFile(const Comparator * ucmp,const Slice * user_key,const FdWithKeyRange * f)833 static bool BeforeFile(const Comparator* ucmp,
834                        const Slice* user_key, const FdWithKeyRange* f) {
835   // nullptr user_key occurs after all keys and is therefore never before *f
836   return (user_key != nullptr &&
837           ucmp->CompareWithoutTimestamp(*user_key,
838                                         ExtractUserKey(f->smallest_key)) < 0);
839 }
840 
SomeFileOverlapsRange(const InternalKeyComparator & icmp,bool disjoint_sorted_files,const LevelFilesBrief & file_level,const Slice * smallest_user_key,const Slice * largest_user_key)841 bool SomeFileOverlapsRange(
842     const InternalKeyComparator& icmp,
843     bool disjoint_sorted_files,
844     const LevelFilesBrief& file_level,
845     const Slice* smallest_user_key,
846     const Slice* largest_user_key) {
847   const Comparator* ucmp = icmp.user_comparator();
848   if (!disjoint_sorted_files) {
849     // Need to check against all files
850     for (size_t i = 0; i < file_level.num_files; i++) {
851       const FdWithKeyRange* f = &(file_level.files[i]);
852       if (AfterFile(ucmp, smallest_user_key, f) ||
853           BeforeFile(ucmp, largest_user_key, f)) {
854         // No overlap
855       } else {
856         return true;  // Overlap
857       }
858     }
859     return false;
860   }
861 
862   // Binary search over file list
863   uint32_t index = 0;
864   if (smallest_user_key != nullptr) {
865     // Find the leftmost possible internal key for smallest_user_key
866     InternalKey small;
867     small.SetMinPossibleForUserKey(*smallest_user_key);
868     index = FindFile(icmp, file_level, small.Encode());
869   }
870 
871   if (index >= file_level.num_files) {
872     // beginning of range is after all files, so no overlap.
873     return false;
874   }
875 
876   return !BeforeFile(ucmp, largest_user_key, &file_level.files[index]);
877 }
878 
879 namespace {
880 
881 class LevelIterator final : public InternalIterator {
882  public:
883   // @param read_options Must outlive this iterator.
LevelIterator(TableCache * table_cache,const ReadOptions & read_options,const FileOptions & file_options,const InternalKeyComparator & icomparator,const LevelFilesBrief * flevel,const SliceTransform * prefix_extractor,bool should_sample,HistogramImpl * file_read_hist,TableReaderCaller caller,bool skip_filters,int level,RangeDelAggregator * range_del_agg,const std::vector<AtomicCompactionUnitBoundary> * compaction_boundaries=nullptr,bool allow_unprepared_value=false)884   LevelIterator(TableCache* table_cache, const ReadOptions& read_options,
885                 const FileOptions& file_options,
886                 const InternalKeyComparator& icomparator,
887                 const LevelFilesBrief* flevel,
888                 const SliceTransform* prefix_extractor, bool should_sample,
889                 HistogramImpl* file_read_hist, TableReaderCaller caller,
890                 bool skip_filters, int level, RangeDelAggregator* range_del_agg,
891                 const std::vector<AtomicCompactionUnitBoundary>*
892                     compaction_boundaries = nullptr,
893                 bool allow_unprepared_value = false)
894       : table_cache_(table_cache),
895         read_options_(read_options),
896         file_options_(file_options),
897         icomparator_(icomparator),
898         user_comparator_(icomparator.user_comparator()),
899         flevel_(flevel),
900         prefix_extractor_(prefix_extractor),
901         file_read_hist_(file_read_hist),
902         should_sample_(should_sample),
903         caller_(caller),
904         skip_filters_(skip_filters),
905         allow_unprepared_value_(allow_unprepared_value),
906         file_index_(flevel_->num_files),
907         level_(level),
908         range_del_agg_(range_del_agg),
909         pinned_iters_mgr_(nullptr),
910         compaction_boundaries_(compaction_boundaries) {
911     // Empty level is not supported.
912     assert(flevel_ != nullptr && flevel_->num_files > 0);
913   }
914 
~LevelIterator()915   ~LevelIterator() override { delete file_iter_.Set(nullptr); }
916 
917   void Seek(const Slice& target) override;
918   void SeekForPrev(const Slice& target) override;
919   void SeekToFirst() override;
920   void SeekToLast() override;
921   void Next() final override;
922   bool NextAndGetResult(IterateResult* result) override;
923   void Prev() override;
924 
Valid() const925   bool Valid() const override { return file_iter_.Valid(); }
key() const926   Slice key() const override {
927     assert(Valid());
928     return file_iter_.key();
929   }
930 
value() const931   Slice value() const override {
932     assert(Valid());
933     return file_iter_.value();
934   }
935 
status() const936   Status status() const override {
937     return file_iter_.iter() ? file_iter_.status() : Status::OK();
938   }
939 
PrepareValue()940   bool PrepareValue() override {
941     return file_iter_.PrepareValue();
942   }
943 
MayBeOutOfLowerBound()944   inline bool MayBeOutOfLowerBound() override {
945     assert(Valid());
946     return may_be_out_of_lower_bound_ && file_iter_.MayBeOutOfLowerBound();
947   }
948 
UpperBoundCheckResult()949   inline IterBoundCheck UpperBoundCheckResult() override {
950     if (Valid()) {
951       return file_iter_.UpperBoundCheckResult();
952     } else {
953       return IterBoundCheck::kUnknown;
954     }
955   }
956 
SetPinnedItersMgr(PinnedIteratorsManager * pinned_iters_mgr)957   void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
958     pinned_iters_mgr_ = pinned_iters_mgr;
959     if (file_iter_.iter()) {
960       file_iter_.SetPinnedItersMgr(pinned_iters_mgr);
961     }
962   }
963 
IsKeyPinned() const964   bool IsKeyPinned() const override {
965     return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
966            file_iter_.iter() && file_iter_.IsKeyPinned();
967   }
968 
IsValuePinned() const969   bool IsValuePinned() const override {
970     return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
971            file_iter_.iter() && file_iter_.IsValuePinned();
972   }
973 
974  private:
975   // Return true if at least one invalid file is seen and skipped.
976   bool SkipEmptyFileForward();
977   void SkipEmptyFileBackward();
978   void SetFileIterator(InternalIterator* iter);
979   void InitFileIterator(size_t new_file_index);
980 
file_smallest_key(size_t file_index)981   const Slice& file_smallest_key(size_t file_index) {
982     assert(file_index < flevel_->num_files);
983     return flevel_->files[file_index].smallest_key;
984   }
985 
KeyReachedUpperBound(const Slice & internal_key)986   bool KeyReachedUpperBound(const Slice& internal_key) {
987     return read_options_.iterate_upper_bound != nullptr &&
988            user_comparator_.CompareWithoutTimestamp(
989                ExtractUserKey(internal_key), /*a_has_ts=*/true,
990                *read_options_.iterate_upper_bound, /*b_has_ts=*/false) >= 0;
991   }
992 
NewFileIterator()993   InternalIterator* NewFileIterator() {
994     assert(file_index_ < flevel_->num_files);
995     auto file_meta = flevel_->files[file_index_];
996     if (should_sample_) {
997       sample_file_read_inc(file_meta.file_metadata);
998     }
999 
1000     const InternalKey* smallest_compaction_key = nullptr;
1001     const InternalKey* largest_compaction_key = nullptr;
1002     if (compaction_boundaries_ != nullptr) {
1003       smallest_compaction_key = (*compaction_boundaries_)[file_index_].smallest;
1004       largest_compaction_key = (*compaction_boundaries_)[file_index_].largest;
1005     }
1006     CheckMayBeOutOfLowerBound();
1007     return table_cache_->NewIterator(
1008         read_options_, file_options_, icomparator_, *file_meta.file_metadata,
1009         range_del_agg_, prefix_extractor_,
1010         nullptr /* don't need reference to table */, file_read_hist_, caller_,
1011         /*arena=*/nullptr, skip_filters_, level_,
1012         /*max_file_size_for_l0_meta_pin=*/0, smallest_compaction_key,
1013         largest_compaction_key, allow_unprepared_value_);
1014   }
1015 
1016   // Check if current file being fully within iterate_lower_bound.
1017   //
1018   // Note MyRocks may update iterate bounds between seek. To workaround it,
1019   // we need to check and update may_be_out_of_lower_bound_ accordingly.
CheckMayBeOutOfLowerBound()1020   void CheckMayBeOutOfLowerBound() {
1021     if (read_options_.iterate_lower_bound != nullptr &&
1022         file_index_ < flevel_->num_files) {
1023       may_be_out_of_lower_bound_ =
1024           user_comparator_.CompareWithoutTimestamp(
1025               ExtractUserKey(file_smallest_key(file_index_)), /*a_has_ts=*/true,
1026               *read_options_.iterate_lower_bound, /*b_has_ts=*/false) < 0;
1027     }
1028   }
1029 
1030   TableCache* table_cache_;
1031   const ReadOptions& read_options_;
1032   const FileOptions& file_options_;
1033   const InternalKeyComparator& icomparator_;
1034   const UserComparatorWrapper user_comparator_;
1035   const LevelFilesBrief* flevel_;
1036   mutable FileDescriptor current_value_;
1037   // `prefix_extractor_` may be non-null even for total order seek. Checking
1038   // this variable is not the right way to identify whether prefix iterator
1039   // is used.
1040   const SliceTransform* prefix_extractor_;
1041 
1042   HistogramImpl* file_read_hist_;
1043   bool should_sample_;
1044   TableReaderCaller caller_;
1045   bool skip_filters_;
1046   bool allow_unprepared_value_;
1047   bool may_be_out_of_lower_bound_ = true;
1048   size_t file_index_;
1049   int level_;
1050   RangeDelAggregator* range_del_agg_;
1051   IteratorWrapper file_iter_;  // May be nullptr
1052   PinnedIteratorsManager* pinned_iters_mgr_;
1053 
1054   // To be propagated to RangeDelAggregator in order to safely truncate range
1055   // tombstones.
1056   const std::vector<AtomicCompactionUnitBoundary>* compaction_boundaries_;
1057 };
1058 
Seek(const Slice & target)1059 void LevelIterator::Seek(const Slice& target) {
1060   // Check whether the seek key fall under the same file
1061   bool need_to_reseek = true;
1062   if (file_iter_.iter() != nullptr && file_index_ < flevel_->num_files) {
1063     const FdWithKeyRange& cur_file = flevel_->files[file_index_];
1064     if (icomparator_.InternalKeyComparator::Compare(
1065             target, cur_file.largest_key) <= 0 &&
1066         icomparator_.InternalKeyComparator::Compare(
1067             target, cur_file.smallest_key) >= 0) {
1068       need_to_reseek = false;
1069       assert(static_cast<size_t>(FindFile(icomparator_, *flevel_, target)) ==
1070              file_index_);
1071     }
1072   }
1073   if (need_to_reseek) {
1074     TEST_SYNC_POINT("LevelIterator::Seek:BeforeFindFile");
1075     size_t new_file_index = FindFile(icomparator_, *flevel_, target);
1076     InitFileIterator(new_file_index);
1077   }
1078 
1079   if (file_iter_.iter() != nullptr) {
1080     file_iter_.Seek(target);
1081   }
1082   if (SkipEmptyFileForward() && prefix_extractor_ != nullptr &&
1083       !read_options_.total_order_seek && !read_options_.auto_prefix_mode &&
1084       file_iter_.iter() != nullptr && file_iter_.Valid()) {
1085     // We've skipped the file we initially positioned to. In the prefix
1086     // seek case, it is likely that the file is skipped because of
1087     // prefix bloom or hash, where more keys are skipped. We then check
1088     // the current key and invalidate the iterator if the prefix is
1089     // already passed.
1090     // When doing prefix iterator seek, when keys for one prefix have
1091     // been exhausted, it can jump to any key that is larger. Here we are
1092     // enforcing a stricter contract than that, in order to make it easier for
1093     // higher layers (merging and DB iterator) to reason the correctness:
1094     // 1. Within the prefix, the result should be accurate.
1095     // 2. If keys for the prefix is exhausted, it is either positioned to the
1096     //    next key after the prefix, or make the iterator invalid.
1097     // A side benefit will be that it invalidates the iterator earlier so that
1098     // the upper level merging iterator can merge fewer child iterators.
1099     size_t ts_sz = user_comparator_.timestamp_size();
1100     Slice target_user_key_without_ts =
1101         ExtractUserKeyAndStripTimestamp(target, ts_sz);
1102     Slice file_user_key_without_ts =
1103         ExtractUserKeyAndStripTimestamp(file_iter_.key(), ts_sz);
1104     if (prefix_extractor_->InDomain(target_user_key_without_ts) &&
1105         (!prefix_extractor_->InDomain(file_user_key_without_ts) ||
1106          user_comparator_.CompareWithoutTimestamp(
1107              prefix_extractor_->Transform(target_user_key_without_ts), false,
1108              prefix_extractor_->Transform(file_user_key_without_ts),
1109              false) != 0)) {
1110       SetFileIterator(nullptr);
1111     }
1112   }
1113   CheckMayBeOutOfLowerBound();
1114 }
1115 
SeekForPrev(const Slice & target)1116 void LevelIterator::SeekForPrev(const Slice& target) {
1117   size_t new_file_index = FindFile(icomparator_, *flevel_, target);
1118   if (new_file_index >= flevel_->num_files) {
1119     new_file_index = flevel_->num_files - 1;
1120   }
1121 
1122   InitFileIterator(new_file_index);
1123   if (file_iter_.iter() != nullptr) {
1124     file_iter_.SeekForPrev(target);
1125     SkipEmptyFileBackward();
1126   }
1127   CheckMayBeOutOfLowerBound();
1128 }
1129 
SeekToFirst()1130 void LevelIterator::SeekToFirst() {
1131   InitFileIterator(0);
1132   if (file_iter_.iter() != nullptr) {
1133     file_iter_.SeekToFirst();
1134   }
1135   SkipEmptyFileForward();
1136   CheckMayBeOutOfLowerBound();
1137 }
1138 
SeekToLast()1139 void LevelIterator::SeekToLast() {
1140   InitFileIterator(flevel_->num_files - 1);
1141   if (file_iter_.iter() != nullptr) {
1142     file_iter_.SeekToLast();
1143   }
1144   SkipEmptyFileBackward();
1145   CheckMayBeOutOfLowerBound();
1146 }
1147 
Next()1148 void LevelIterator::Next() {
1149   assert(Valid());
1150   file_iter_.Next();
1151   SkipEmptyFileForward();
1152 }
1153 
NextAndGetResult(IterateResult * result)1154 bool LevelIterator::NextAndGetResult(IterateResult* result) {
1155   assert(Valid());
1156   bool is_valid = file_iter_.NextAndGetResult(result);
1157   if (!is_valid) {
1158     SkipEmptyFileForward();
1159     is_valid = Valid();
1160     if (is_valid) {
1161       result->key = key();
1162       result->bound_check_result = file_iter_.UpperBoundCheckResult();
1163       // Ideally, we should return the real file_iter_.value_prepared but the
1164       // information is not here. It would casue an extra PrepareValue()
1165       // for the first key of a file.
1166       result->value_prepared = !allow_unprepared_value_;
1167     }
1168   }
1169   return is_valid;
1170 }
1171 
Prev()1172 void LevelIterator::Prev() {
1173   assert(Valid());
1174   file_iter_.Prev();
1175   SkipEmptyFileBackward();
1176 }
1177 
SkipEmptyFileForward()1178 bool LevelIterator::SkipEmptyFileForward() {
1179   bool seen_empty_file = false;
1180   while (file_iter_.iter() == nullptr ||
1181          (!file_iter_.Valid() && file_iter_.status().ok() &&
1182           file_iter_.iter()->UpperBoundCheckResult() !=
1183               IterBoundCheck::kOutOfBound)) {
1184     seen_empty_file = true;
1185     // Move to next file
1186     if (file_index_ >= flevel_->num_files - 1) {
1187       // Already at the last file
1188       SetFileIterator(nullptr);
1189       break;
1190     }
1191     if (KeyReachedUpperBound(file_smallest_key(file_index_ + 1))) {
1192       SetFileIterator(nullptr);
1193       break;
1194     }
1195     InitFileIterator(file_index_ + 1);
1196     if (file_iter_.iter() != nullptr) {
1197       file_iter_.SeekToFirst();
1198     }
1199   }
1200   return seen_empty_file;
1201 }
1202 
SkipEmptyFileBackward()1203 void LevelIterator::SkipEmptyFileBackward() {
1204   while (file_iter_.iter() == nullptr ||
1205          (!file_iter_.Valid() && file_iter_.status().ok())) {
1206     // Move to previous file
1207     if (file_index_ == 0) {
1208       // Already the first file
1209       SetFileIterator(nullptr);
1210       return;
1211     }
1212     InitFileIterator(file_index_ - 1);
1213     if (file_iter_.iter() != nullptr) {
1214       file_iter_.SeekToLast();
1215     }
1216   }
1217 }
1218 
SetFileIterator(InternalIterator * iter)1219 void LevelIterator::SetFileIterator(InternalIterator* iter) {
1220   if (pinned_iters_mgr_ && iter) {
1221     iter->SetPinnedItersMgr(pinned_iters_mgr_);
1222   }
1223 
1224   InternalIterator* old_iter = file_iter_.Set(iter);
1225   if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
1226     pinned_iters_mgr_->PinIterator(old_iter);
1227   } else {
1228     delete old_iter;
1229   }
1230 }
1231 
InitFileIterator(size_t new_file_index)1232 void LevelIterator::InitFileIterator(size_t new_file_index) {
1233   if (new_file_index >= flevel_->num_files) {
1234     file_index_ = new_file_index;
1235     SetFileIterator(nullptr);
1236     return;
1237   } else {
1238     // If the file iterator shows incomplete, we try it again if users seek
1239     // to the same file, as this time we may go to a different data block
1240     // which is cached in block cache.
1241     //
1242     if (file_iter_.iter() != nullptr && !file_iter_.status().IsIncomplete() &&
1243         new_file_index == file_index_) {
1244       // file_iter_ is already constructed with this iterator, so
1245       // no need to change anything
1246     } else {
1247       file_index_ = new_file_index;
1248       InternalIterator* iter = NewFileIterator();
1249       SetFileIterator(iter);
1250     }
1251   }
1252 }
1253 }  // anonymous namespace
1254 
GetTableProperties(std::shared_ptr<const TableProperties> * tp,const FileMetaData * file_meta,const std::string * fname) const1255 Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
1256                                    const FileMetaData* file_meta,
1257                                    const std::string* fname) const {
1258   auto table_cache = cfd_->table_cache();
1259   auto ioptions = cfd_->ioptions();
1260   Status s = table_cache->GetTableProperties(
1261       file_options_, cfd_->internal_comparator(), file_meta->fd, tp,
1262       mutable_cf_options_.prefix_extractor.get(), true /* no io */);
1263   if (s.ok()) {
1264     return s;
1265   }
1266 
1267   // We only ignore error type `Incomplete` since it's by design that we
1268   // disallow table when it's not in table cache.
1269   if (!s.IsIncomplete()) {
1270     return s;
1271   }
1272 
1273   // 2. Table is not present in table cache, we'll read the table properties
1274   // directly from the properties block in the file.
1275   std::unique_ptr<FSRandomAccessFile> file;
1276   std::string file_name;
1277   if (fname != nullptr) {
1278     file_name = *fname;
1279   } else {
1280     file_name =
1281       TableFileName(ioptions->cf_paths, file_meta->fd.GetNumber(),
1282                     file_meta->fd.GetPathId());
1283   }
1284   s = ioptions->fs->NewRandomAccessFile(file_name, file_options_, &file,
1285                                         nullptr);
1286   if (!s.ok()) {
1287     return s;
1288   }
1289 
1290   TableProperties* raw_table_properties;
1291   // By setting the magic number to kInvalidTableMagicNumber, we can by
1292   // pass the magic number check in the footer.
1293   std::unique_ptr<RandomAccessFileReader> file_reader(
1294       new RandomAccessFileReader(
1295           std::move(file), file_name, nullptr /* env */, io_tracer_,
1296           nullptr /* stats */, 0 /* hist_type */, nullptr /* file_read_hist */,
1297           nullptr /* rate_limiter */, ioptions->listeners));
1298   s = ReadTableProperties(
1299       file_reader.get(), file_meta->fd.GetFileSize(),
1300       Footer::kInvalidTableMagicNumber /* table's magic number */, *ioptions,
1301       &raw_table_properties, false /* compression_type_missing */);
1302   if (!s.ok()) {
1303     return s;
1304   }
1305   RecordTick(ioptions->stats, NUMBER_DIRECT_LOAD_TABLE_PROPERTIES);
1306 
1307   *tp = std::shared_ptr<const TableProperties>(raw_table_properties);
1308   return s;
1309 }
1310 
GetPropertiesOfAllTables(TablePropertiesCollection * props)1311 Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
1312   Status s;
1313   for (int level = 0; level < storage_info_.num_levels_; level++) {
1314     s = GetPropertiesOfAllTables(props, level);
1315     if (!s.ok()) {
1316       return s;
1317     }
1318   }
1319 
1320   return Status::OK();
1321 }
1322 
TablesRangeTombstoneSummary(int max_entries_to_print,std::string * out_str)1323 Status Version::TablesRangeTombstoneSummary(int max_entries_to_print,
1324                                             std::string* out_str) {
1325   if (max_entries_to_print <= 0) {
1326     return Status::OK();
1327   }
1328   int num_entries_left = max_entries_to_print;
1329 
1330   std::stringstream ss;
1331 
1332   for (int level = 0; level < storage_info_.num_levels_; level++) {
1333     for (const auto& file_meta : storage_info_.files_[level]) {
1334       auto fname =
1335           TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
1336                         file_meta->fd.GetPathId());
1337 
1338       ss << "=== file : " << fname << " ===\n";
1339 
1340       TableCache* table_cache = cfd_->table_cache();
1341       std::unique_ptr<FragmentedRangeTombstoneIterator> tombstone_iter;
1342 
1343       Status s = table_cache->GetRangeTombstoneIterator(
1344           ReadOptions(), cfd_->internal_comparator(), *file_meta,
1345           &tombstone_iter);
1346       if (!s.ok()) {
1347         return s;
1348       }
1349       if (tombstone_iter) {
1350         tombstone_iter->SeekToFirst();
1351 
1352         while (tombstone_iter->Valid() && num_entries_left > 0) {
1353           ss << "start: " << tombstone_iter->start_key().ToString(true)
1354              << " end: " << tombstone_iter->end_key().ToString(true)
1355              << " seq: " << tombstone_iter->seq() << '\n';
1356           tombstone_iter->Next();
1357           num_entries_left--;
1358         }
1359         if (num_entries_left <= 0) {
1360           break;
1361         }
1362       }
1363     }
1364     if (num_entries_left <= 0) {
1365       break;
1366     }
1367   }
1368   assert(num_entries_left >= 0);
1369   if (num_entries_left <= 0) {
1370     ss << "(results may not be complete)\n";
1371   }
1372 
1373   *out_str = ss.str();
1374   return Status::OK();
1375 }
1376 
GetPropertiesOfAllTables(TablePropertiesCollection * props,int level)1377 Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props,
1378                                          int level) {
1379   for (const auto& file_meta : storage_info_.files_[level]) {
1380     auto fname =
1381         TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
1382                       file_meta->fd.GetPathId());
1383     // 1. If the table is already present in table cache, load table
1384     // properties from there.
1385     std::shared_ptr<const TableProperties> table_properties;
1386     Status s = GetTableProperties(&table_properties, file_meta, &fname);
1387     if (s.ok()) {
1388       props->insert({fname, table_properties});
1389     } else {
1390       return s;
1391     }
1392   }
1393 
1394   return Status::OK();
1395 }
1396 
GetPropertiesOfTablesInRange(const Range * range,std::size_t n,TablePropertiesCollection * props) const1397 Status Version::GetPropertiesOfTablesInRange(
1398     const Range* range, std::size_t n, TablePropertiesCollection* props) const {
1399   for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
1400     for (decltype(n) i = 0; i < n; i++) {
1401       // Convert user_key into a corresponding internal key.
1402       InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
1403       InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
1404       std::vector<FileMetaData*> files;
1405       storage_info_.GetOverlappingInputs(level, &k1, &k2, &files, -1, nullptr,
1406                                          false);
1407       for (const auto& file_meta : files) {
1408         auto fname =
1409             TableFileName(cfd_->ioptions()->cf_paths,
1410                           file_meta->fd.GetNumber(), file_meta->fd.GetPathId());
1411         if (props->count(fname) == 0) {
1412           // 1. If the table is already present in table cache, load table
1413           // properties from there.
1414           std::shared_ptr<const TableProperties> table_properties;
1415           Status s = GetTableProperties(&table_properties, file_meta, &fname);
1416           if (s.ok()) {
1417             props->insert({fname, table_properties});
1418           } else {
1419             return s;
1420           }
1421         }
1422       }
1423     }
1424   }
1425 
1426   return Status::OK();
1427 }
1428 
GetAggregatedTableProperties(std::shared_ptr<const TableProperties> * tp,int level)1429 Status Version::GetAggregatedTableProperties(
1430     std::shared_ptr<const TableProperties>* tp, int level) {
1431   TablePropertiesCollection props;
1432   Status s;
1433   if (level < 0) {
1434     s = GetPropertiesOfAllTables(&props);
1435   } else {
1436     s = GetPropertiesOfAllTables(&props, level);
1437   }
1438   if (!s.ok()) {
1439     return s;
1440   }
1441 
1442   auto* new_tp = new TableProperties();
1443   for (const auto& item : props) {
1444     new_tp->Add(*item.second);
1445   }
1446   tp->reset(new_tp);
1447   return Status::OK();
1448 }
1449 
GetMemoryUsageByTableReaders()1450 size_t Version::GetMemoryUsageByTableReaders() {
1451   size_t total_usage = 0;
1452   for (auto& file_level : storage_info_.level_files_brief_) {
1453     for (size_t i = 0; i < file_level.num_files; i++) {
1454       total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader(
1455           file_options_, cfd_->internal_comparator(), file_level.files[i].fd,
1456           mutable_cf_options_.prefix_extractor.get());
1457     }
1458   }
1459   return total_usage;
1460 }
1461 
GetColumnFamilyMetaData(ColumnFamilyMetaData * cf_meta)1462 void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
1463   assert(cf_meta);
1464   assert(cfd_);
1465 
1466   cf_meta->name = cfd_->GetName();
1467   cf_meta->size = 0;
1468   cf_meta->file_count = 0;
1469   cf_meta->levels.clear();
1470 
1471   auto* ioptions = cfd_->ioptions();
1472   auto* vstorage = storage_info();
1473 
1474   for (int level = 0; level < cfd_->NumberLevels(); level++) {
1475     uint64_t level_size = 0;
1476     cf_meta->file_count += vstorage->LevelFiles(level).size();
1477     std::vector<SstFileMetaData> files;
1478     for (const auto& file : vstorage->LevelFiles(level)) {
1479       uint32_t path_id = file->fd.GetPathId();
1480       std::string file_path;
1481       if (path_id < ioptions->cf_paths.size()) {
1482         file_path = ioptions->cf_paths[path_id].path;
1483       } else {
1484         assert(!ioptions->cf_paths.empty());
1485         file_path = ioptions->cf_paths.back().path;
1486       }
1487       const uint64_t file_number = file->fd.GetNumber();
1488       files.emplace_back(
1489           MakeTableFileName("", file_number), file_number, file_path,
1490           static_cast<size_t>(file->fd.GetFileSize()), file->fd.smallest_seqno,
1491           file->fd.largest_seqno, file->smallest.user_key().ToString(),
1492           file->largest.user_key().ToString(),
1493           file->stats.num_reads_sampled.load(std::memory_order_relaxed),
1494           file->being_compacted, file->temperature,
1495           file->oldest_blob_file_number, file->TryGetOldestAncesterTime(),
1496           file->TryGetFileCreationTime(), file->file_checksum,
1497           file->file_checksum_func_name);
1498       files.back().num_entries = file->num_entries;
1499       files.back().num_deletions = file->num_deletions;
1500       level_size += file->fd.GetFileSize();
1501     }
1502     cf_meta->levels.emplace_back(
1503         level, level_size, std::move(files));
1504     cf_meta->size += level_size;
1505   }
1506 }
1507 
GetSstFilesSize()1508 uint64_t Version::GetSstFilesSize() {
1509   uint64_t sst_files_size = 0;
1510   for (int level = 0; level < storage_info_.num_levels_; level++) {
1511     for (const auto& file_meta : storage_info_.LevelFiles(level)) {
1512       sst_files_size += file_meta->fd.GetFileSize();
1513     }
1514   }
1515   return sst_files_size;
1516 }
1517 
GetCreationTimeOfOldestFile(uint64_t * creation_time)1518 void Version::GetCreationTimeOfOldestFile(uint64_t* creation_time) {
1519   uint64_t oldest_time = port::kMaxUint64;
1520   for (int level = 0; level < storage_info_.num_non_empty_levels_; level++) {
1521     for (FileMetaData* meta : storage_info_.LevelFiles(level)) {
1522       assert(meta->fd.table_reader != nullptr);
1523       uint64_t file_creation_time = meta->TryGetFileCreationTime();
1524       if (file_creation_time == kUnknownFileCreationTime) {
1525         *creation_time = 0;
1526         return;
1527       }
1528       if (file_creation_time < oldest_time) {
1529         oldest_time = file_creation_time;
1530       }
1531     }
1532   }
1533   *creation_time = oldest_time;
1534 }
1535 
GetEstimatedActiveKeys() const1536 uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const {
1537   // Estimation will be inaccurate when:
1538   // (1) there exist merge keys
1539   // (2) keys are directly overwritten
1540   // (3) deletion on non-existing keys
1541   // (4) low number of samples
1542   if (current_num_samples_ == 0) {
1543     return 0;
1544   }
1545 
1546   if (current_num_non_deletions_ <= current_num_deletions_) {
1547     return 0;
1548   }
1549 
1550   uint64_t est = current_num_non_deletions_ - current_num_deletions_;
1551 
1552   uint64_t file_count = 0;
1553   for (int level = 0; level < num_levels_; ++level) {
1554     file_count += files_[level].size();
1555   }
1556 
1557   if (current_num_samples_ < file_count) {
1558     // casting to avoid overflowing
1559     return
1560       static_cast<uint64_t>(
1561         (est * static_cast<double>(file_count) / current_num_samples_)
1562       );
1563   } else {
1564     return est;
1565   }
1566 }
1567 
GetEstimatedCompressionRatioAtLevel(int level) const1568 double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel(
1569     int level) const {
1570   assert(level < num_levels_);
1571   uint64_t sum_file_size_bytes = 0;
1572   uint64_t sum_data_size_bytes = 0;
1573   for (auto* file_meta : files_[level]) {
1574     sum_file_size_bytes += file_meta->fd.GetFileSize();
1575     sum_data_size_bytes += file_meta->raw_key_size + file_meta->raw_value_size;
1576   }
1577   if (sum_file_size_bytes == 0) {
1578     return -1.0;
1579   }
1580   return static_cast<double>(sum_data_size_bytes) / sum_file_size_bytes;
1581 }
1582 
AddIterators(const ReadOptions & read_options,const FileOptions & soptions,MergeIteratorBuilder * merge_iter_builder,RangeDelAggregator * range_del_agg,bool allow_unprepared_value)1583 void Version::AddIterators(const ReadOptions& read_options,
1584                            const FileOptions& soptions,
1585                            MergeIteratorBuilder* merge_iter_builder,
1586                            RangeDelAggregator* range_del_agg,
1587                            bool allow_unprepared_value) {
1588   assert(storage_info_.finalized_);
1589 
1590   for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
1591     AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level,
1592                          range_del_agg, allow_unprepared_value);
1593   }
1594 }
1595 
AddIteratorsForLevel(const ReadOptions & read_options,const FileOptions & soptions,MergeIteratorBuilder * merge_iter_builder,int level,RangeDelAggregator * range_del_agg,bool allow_unprepared_value)1596 void Version::AddIteratorsForLevel(const ReadOptions& read_options,
1597                                    const FileOptions& soptions,
1598                                    MergeIteratorBuilder* merge_iter_builder,
1599                                    int level,
1600                                    RangeDelAggregator* range_del_agg,
1601                                    bool allow_unprepared_value) {
1602   assert(storage_info_.finalized_);
1603   if (level >= storage_info_.num_non_empty_levels()) {
1604     // This is an empty level
1605     return;
1606   } else if (storage_info_.LevelFilesBrief(level).num_files == 0) {
1607     // No files in this level
1608     return;
1609   }
1610 
1611   bool should_sample = should_sample_file_read();
1612 
1613   auto* arena = merge_iter_builder->GetArena();
1614   if (level == 0) {
1615     // Merge all level zero files together since they may overlap
1616     for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
1617       const auto& file = storage_info_.LevelFilesBrief(0).files[i];
1618       merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
1619           read_options, soptions, cfd_->internal_comparator(),
1620           *file.file_metadata, range_del_agg,
1621           mutable_cf_options_.prefix_extractor.get(), nullptr,
1622           cfd_->internal_stats()->GetFileReadHist(0),
1623           TableReaderCaller::kUserIterator, arena,
1624           /*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_,
1625           /*smallest_compaction_key=*/nullptr,
1626           /*largest_compaction_key=*/nullptr, allow_unprepared_value));
1627     }
1628     if (should_sample) {
1629       // Count ones for every L0 files. This is done per iterator creation
1630       // rather than Seek(), while files in other levels are recored per seek.
1631       // If users execute one range query per iterator, there may be some
1632       // discrepancy here.
1633       for (FileMetaData* meta : storage_info_.LevelFiles(0)) {
1634         sample_file_read_inc(meta);
1635       }
1636     }
1637   } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
1638     // For levels > 0, we can use a concatenating iterator that sequentially
1639     // walks through the non-overlapping files in the level, opening them
1640     // lazily.
1641     auto* mem = arena->AllocateAligned(sizeof(LevelIterator));
1642     merge_iter_builder->AddIterator(new (mem) LevelIterator(
1643         cfd_->table_cache(), read_options, soptions,
1644         cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
1645         mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
1646         cfd_->internal_stats()->GetFileReadHist(level),
1647         TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
1648         range_del_agg,
1649         /*compaction_boundaries=*/nullptr, allow_unprepared_value));
1650   }
1651 }
1652 
OverlapWithLevelIterator(const ReadOptions & read_options,const FileOptions & file_options,const Slice & smallest_user_key,const Slice & largest_user_key,int level,bool * overlap)1653 Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
1654                                          const FileOptions& file_options,
1655                                          const Slice& smallest_user_key,
1656                                          const Slice& largest_user_key,
1657                                          int level, bool* overlap) {
1658   assert(storage_info_.finalized_);
1659 
1660   auto icmp = cfd_->internal_comparator();
1661   auto ucmp = icmp.user_comparator();
1662 
1663   Arena arena;
1664   Status status;
1665   ReadRangeDelAggregator range_del_agg(&icmp,
1666                                        kMaxSequenceNumber /* upper_bound */);
1667 
1668   *overlap = false;
1669 
1670   if (level == 0) {
1671     for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
1672       const auto file = &storage_info_.LevelFilesBrief(0).files[i];
1673       if (AfterFile(ucmp, &smallest_user_key, file) ||
1674           BeforeFile(ucmp, &largest_user_key, file)) {
1675         continue;
1676       }
1677       ScopedArenaIterator iter(cfd_->table_cache()->NewIterator(
1678           read_options, file_options, cfd_->internal_comparator(),
1679           *file->file_metadata, &range_del_agg,
1680           mutable_cf_options_.prefix_extractor.get(), nullptr,
1681           cfd_->internal_stats()->GetFileReadHist(0),
1682           TableReaderCaller::kUserIterator, &arena,
1683           /*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_,
1684           /*smallest_compaction_key=*/nullptr,
1685           /*largest_compaction_key=*/nullptr,
1686           /*allow_unprepared_value=*/false));
1687       status = OverlapWithIterator(
1688           ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
1689       if (!status.ok() || *overlap) {
1690         break;
1691       }
1692     }
1693   } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
1694     auto mem = arena.AllocateAligned(sizeof(LevelIterator));
1695     ScopedArenaIterator iter(new (mem) LevelIterator(
1696         cfd_->table_cache(), read_options, file_options,
1697         cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
1698         mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
1699         cfd_->internal_stats()->GetFileReadHist(level),
1700         TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
1701         &range_del_agg));
1702     status = OverlapWithIterator(
1703         ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
1704   }
1705 
1706   if (status.ok() && *overlap == false &&
1707       range_del_agg.IsRangeOverlapped(smallest_user_key, largest_user_key)) {
1708     *overlap = true;
1709   }
1710   return status;
1711 }
1712 
VersionStorageInfo(const InternalKeyComparator * internal_comparator,const Comparator * user_comparator,int levels,CompactionStyle compaction_style,VersionStorageInfo * ref_vstorage,bool _force_consistency_checks)1713 VersionStorageInfo::VersionStorageInfo(
1714     const InternalKeyComparator* internal_comparator,
1715     const Comparator* user_comparator, int levels,
1716     CompactionStyle compaction_style, VersionStorageInfo* ref_vstorage,
1717     bool _force_consistency_checks)
1718     : internal_comparator_(internal_comparator),
1719       user_comparator_(user_comparator),
1720       // cfd is nullptr if Version is dummy
1721       num_levels_(levels),
1722       num_non_empty_levels_(0),
1723       file_indexer_(user_comparator),
1724       compaction_style_(compaction_style),
1725       files_(new std::vector<FileMetaData*>[num_levels_]),
1726       base_level_(num_levels_ == 1 ? -1 : 1),
1727       level_multiplier_(0.0),
1728       files_by_compaction_pri_(num_levels_),
1729       level0_non_overlapping_(false),
1730       next_file_to_compact_by_size_(num_levels_),
1731       compaction_score_(num_levels_),
1732       compaction_level_(num_levels_),
1733       l0_delay_trigger_count_(0),
1734       accumulated_file_size_(0),
1735       accumulated_raw_key_size_(0),
1736       accumulated_raw_value_size_(0),
1737       accumulated_num_non_deletions_(0),
1738       accumulated_num_deletions_(0),
1739       current_num_non_deletions_(0),
1740       current_num_deletions_(0),
1741       current_num_samples_(0),
1742       estimated_compaction_needed_bytes_(0),
1743       finalized_(false),
1744       force_consistency_checks_(_force_consistency_checks) {
1745   if (ref_vstorage != nullptr) {
1746     accumulated_file_size_ = ref_vstorage->accumulated_file_size_;
1747     accumulated_raw_key_size_ = ref_vstorage->accumulated_raw_key_size_;
1748     accumulated_raw_value_size_ = ref_vstorage->accumulated_raw_value_size_;
1749     accumulated_num_non_deletions_ =
1750         ref_vstorage->accumulated_num_non_deletions_;
1751     accumulated_num_deletions_ = ref_vstorage->accumulated_num_deletions_;
1752     current_num_non_deletions_ = ref_vstorage->current_num_non_deletions_;
1753     current_num_deletions_ = ref_vstorage->current_num_deletions_;
1754     current_num_samples_ = ref_vstorage->current_num_samples_;
1755     oldest_snapshot_seqnum_ = ref_vstorage->oldest_snapshot_seqnum_;
1756   }
1757 }
1758 
Version(ColumnFamilyData * column_family_data,VersionSet * vset,const FileOptions & file_opt,const MutableCFOptions mutable_cf_options,const std::shared_ptr<IOTracer> & io_tracer,uint64_t version_number)1759 Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
1760                  const FileOptions& file_opt,
1761                  const MutableCFOptions mutable_cf_options,
1762                  const std::shared_ptr<IOTracer>& io_tracer,
1763                  uint64_t version_number)
1764     : env_(vset->env_),
1765       clock_(vset->clock_),
1766       cfd_(column_family_data),
1767       info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->logger),
1768       db_statistics_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->stats),
1769       table_cache_((cfd_ == nullptr) ? nullptr : cfd_->table_cache()),
1770       blob_file_cache_(cfd_ ? cfd_->blob_file_cache() : nullptr),
1771       merge_operator_(
1772           (cfd_ == nullptr) ? nullptr : cfd_->ioptions()->merge_operator.get()),
1773       storage_info_(
1774           (cfd_ == nullptr) ? nullptr : &cfd_->internal_comparator(),
1775           (cfd_ == nullptr) ? nullptr : cfd_->user_comparator(),
1776           cfd_ == nullptr ? 0 : cfd_->NumberLevels(),
1777           cfd_ == nullptr ? kCompactionStyleLevel
1778                           : cfd_->ioptions()->compaction_style,
1779           (cfd_ == nullptr || cfd_->current() == nullptr)
1780               ? nullptr
1781               : cfd_->current()->storage_info(),
1782           cfd_ == nullptr ? false : cfd_->ioptions()->force_consistency_checks),
1783       vset_(vset),
1784       next_(this),
1785       prev_(this),
1786       refs_(0),
1787       file_options_(file_opt),
1788       mutable_cf_options_(mutable_cf_options),
1789       max_file_size_for_l0_meta_pin_(
1790           MaxFileSizeForL0MetaPin(mutable_cf_options_)),
1791       version_number_(version_number),
1792       io_tracer_(io_tracer) {}
1793 
GetBlob(const ReadOptions & read_options,const Slice & user_key,const Slice & blob_index_slice,PinnableSlice * value,uint64_t * bytes_read) const1794 Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
1795                         const Slice& blob_index_slice, PinnableSlice* value,
1796                         uint64_t* bytes_read) const {
1797   if (read_options.read_tier == kBlockCacheTier) {
1798     return Status::Incomplete("Cannot read blob: no disk I/O allowed");
1799   }
1800 
1801   BlobIndex blob_index;
1802 
1803   {
1804     Status s = blob_index.DecodeFrom(blob_index_slice);
1805     if (!s.ok()) {
1806       return s;
1807     }
1808   }
1809 
1810   return GetBlob(read_options, user_key, blob_index, value, bytes_read);
1811 }
1812 
GetBlob(const ReadOptions & read_options,const Slice & user_key,const BlobIndex & blob_index,PinnableSlice * value,uint64_t * bytes_read) const1813 Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
1814                         const BlobIndex& blob_index, PinnableSlice* value,
1815                         uint64_t* bytes_read) const {
1816   assert(value);
1817 
1818   if (blob_index.HasTTL() || blob_index.IsInlined()) {
1819     return Status::Corruption("Unexpected TTL/inlined blob index");
1820   }
1821 
1822   const auto& blob_files = storage_info_.GetBlobFiles();
1823 
1824   const uint64_t blob_file_number = blob_index.file_number();
1825 
1826   const auto it = blob_files.find(blob_file_number);
1827   if (it == blob_files.end()) {
1828     return Status::Corruption("Invalid blob file number");
1829   }
1830 
1831   CacheHandleGuard<BlobFileReader> blob_file_reader;
1832 
1833   {
1834     assert(blob_file_cache_);
1835     const Status s = blob_file_cache_->GetBlobFileReader(blob_file_number,
1836                                                          &blob_file_reader);
1837     if (!s.ok()) {
1838       return s;
1839     }
1840   }
1841 
1842   assert(blob_file_reader.GetValue());
1843   const Status s = blob_file_reader.GetValue()->GetBlob(
1844       read_options, user_key, blob_index.offset(), blob_index.size(),
1845       blob_index.compression(), value, bytes_read);
1846 
1847   return s;
1848 }
1849 
Get(const ReadOptions & read_options,const LookupKey & k,PinnableSlice * value,std::string * timestamp,Status * status,MergeContext * merge_context,SequenceNumber * max_covering_tombstone_seq,bool * value_found,bool * key_exists,SequenceNumber * seq,ReadCallback * callback,bool * is_blob,bool do_merge)1850 void Version::Get(const ReadOptions& read_options, const LookupKey& k,
1851                   PinnableSlice* value, std::string* timestamp, Status* status,
1852                   MergeContext* merge_context,
1853                   SequenceNumber* max_covering_tombstone_seq, bool* value_found,
1854                   bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
1855                   bool* is_blob, bool do_merge) {
1856   Slice ikey = k.internal_key();
1857   Slice user_key = k.user_key();
1858 
1859   assert(status->ok() || status->IsMergeInProgress());
1860 
1861   if (key_exists != nullptr) {
1862     // will falsify below if not found
1863     *key_exists = true;
1864   }
1865 
1866   PinnedIteratorsManager pinned_iters_mgr;
1867   uint64_t tracing_get_id = BlockCacheTraceHelper::kReservedGetId;
1868   if (vset_ && vset_->block_cache_tracer_ &&
1869       vset_->block_cache_tracer_->is_tracing_enabled()) {
1870     tracing_get_id = vset_->block_cache_tracer_->NextGetId();
1871   }
1872 
1873   // Note: the old StackableDB-based BlobDB passes in
1874   // GetImplOptions::is_blob_index; for the integrated BlobDB implementation, we
1875   // need to provide it here.
1876   bool is_blob_index = false;
1877   bool* const is_blob_to_use = is_blob ? is_blob : &is_blob_index;
1878 
1879   GetContext get_context(
1880       user_comparator(), merge_operator_, info_log_, db_statistics_,
1881       status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
1882       do_merge ? value : nullptr, do_merge ? timestamp : nullptr, value_found,
1883       merge_context, do_merge, max_covering_tombstone_seq, clock_, seq,
1884       merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob_to_use,
1885       tracing_get_id);
1886 
1887   // Pin blocks that we read to hold merge operands
1888   if (merge_operator_) {
1889     pinned_iters_mgr.StartPinning();
1890   }
1891 
1892   FilePicker fp(
1893       storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
1894       storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
1895       user_comparator(), internal_comparator());
1896   FdWithKeyRange* f = fp.GetNextFile();
1897 
1898   while (f != nullptr) {
1899     if (*max_covering_tombstone_seq > 0) {
1900       // The remaining files we look at will only contain covered keys, so we
1901       // stop here.
1902       break;
1903     }
1904     if (get_context.sample()) {
1905       sample_file_read_inc(f->file_metadata);
1906     }
1907 
1908     bool timer_enabled =
1909         GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
1910         get_perf_context()->per_level_perf_context_enabled;
1911     StopWatchNano timer(clock_, timer_enabled /* auto_start */);
1912     *status = table_cache_->Get(
1913         read_options, *internal_comparator(), *f->file_metadata, ikey,
1914         &get_context, mutable_cf_options_.prefix_extractor.get(),
1915         cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
1916         IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
1917                         fp.IsHitFileLastInLevel()),
1918         fp.GetHitFileLevel(), max_file_size_for_l0_meta_pin_);
1919     // TODO: examine the behavior for corrupted key
1920     if (timer_enabled) {
1921       PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
1922                                 fp.GetHitFileLevel());
1923     }
1924     if (!status->ok()) {
1925       return;
1926     }
1927 
1928     // report the counters before returning
1929     if (get_context.State() != GetContext::kNotFound &&
1930         get_context.State() != GetContext::kMerge &&
1931         db_statistics_ != nullptr) {
1932       get_context.ReportCounters();
1933     }
1934     switch (get_context.State()) {
1935       case GetContext::kNotFound:
1936         // Keep searching in other files
1937         break;
1938       case GetContext::kMerge:
1939         // TODO: update per-level perfcontext user_key_return_count for kMerge
1940         break;
1941       case GetContext::kFound:
1942         if (fp.GetHitFileLevel() == 0) {
1943           RecordTick(db_statistics_, GET_HIT_L0);
1944         } else if (fp.GetHitFileLevel() == 1) {
1945           RecordTick(db_statistics_, GET_HIT_L1);
1946         } else if (fp.GetHitFileLevel() >= 2) {
1947           RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
1948         }
1949 
1950         PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
1951                                   fp.GetHitFileLevel());
1952 
1953         if (is_blob_index) {
1954           if (do_merge && value) {
1955             constexpr uint64_t* bytes_read = nullptr;
1956 
1957             *status =
1958                 GetBlob(read_options, user_key, *value, value, bytes_read);
1959             if (!status->ok()) {
1960               if (status->IsIncomplete()) {
1961                 get_context.MarkKeyMayExist();
1962               }
1963               return;
1964             }
1965           }
1966         }
1967 
1968         return;
1969       case GetContext::kDeleted:
1970         // Use empty error message for speed
1971         *status = Status::NotFound();
1972         return;
1973       case GetContext::kCorrupt:
1974         *status = Status::Corruption("corrupted key for ", user_key);
1975         return;
1976       case GetContext::kUnexpectedBlobIndex:
1977         ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
1978         *status = Status::NotSupported(
1979             "Encounter unexpected blob index. Please open DB with "
1980             "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
1981         return;
1982     }
1983     f = fp.GetNextFile();
1984   }
1985   if (db_statistics_ != nullptr) {
1986     get_context.ReportCounters();
1987   }
1988   if (GetContext::kMerge == get_context.State()) {
1989     if (!do_merge) {
1990       *status = Status::OK();
1991       return;
1992     }
1993     if (!merge_operator_) {
1994       *status =  Status::InvalidArgument(
1995           "merge_operator is not properly initialized.");
1996       return;
1997     }
1998     // merge_operands are in saver and we hit the beginning of the key history
1999     // do a final merge of nullptr and operands;
2000     std::string* str_value = value != nullptr ? value->GetSelf() : nullptr;
2001     *status = MergeHelper::TimedFullMerge(
2002         merge_operator_, user_key, nullptr, merge_context->GetOperands(),
2003         str_value, info_log_, db_statistics_, clock_,
2004         nullptr /* result_operand */, true);
2005     if (LIKELY(value != nullptr)) {
2006       value->PinSelf();
2007     }
2008   } else {
2009     if (key_exists != nullptr) {
2010       *key_exists = false;
2011     }
2012     *status = Status::NotFound(); // Use an empty error message for speed
2013   }
2014 }
2015 
MultiGet(const ReadOptions & read_options,MultiGetRange * range,ReadCallback * callback)2016 void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
2017                        ReadCallback* callback) {
2018   PinnedIteratorsManager pinned_iters_mgr;
2019 
2020   // Pin blocks that we read to hold merge operands
2021   if (merge_operator_) {
2022     pinned_iters_mgr.StartPinning();
2023   }
2024   uint64_t tracing_mget_id = BlockCacheTraceHelper::kReservedGetId;
2025 
2026   if (vset_ && vset_->block_cache_tracer_ &&
2027       vset_->block_cache_tracer_->is_tracing_enabled()) {
2028     tracing_mget_id = vset_->block_cache_tracer_->NextGetId();
2029   }
2030   // Even though we know the batch size won't be > MAX_BATCH_SIZE,
2031   // use autovector in order to avoid unnecessary construction of GetContext
2032   // objects, which is expensive
2033   autovector<GetContext, 16> get_ctx;
2034   for (auto iter = range->begin(); iter != range->end(); ++iter) {
2035     assert(iter->s->ok() || iter->s->IsMergeInProgress());
2036     get_ctx.emplace_back(
2037         user_comparator(), merge_operator_, info_log_, db_statistics_,
2038         iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge,
2039         iter->ukey_with_ts, iter->value, iter->timestamp, nullptr,
2040         &(iter->merge_context), true, &iter->max_covering_tombstone_seq, clock_,
2041         nullptr, merge_operator_ ? &pinned_iters_mgr : nullptr, callback,
2042         &iter->is_blob_index, tracing_mget_id);
2043     // MergeInProgress status, if set, has been transferred to the get_context
2044     // state, so we set status to ok here. From now on, the iter status will
2045     // be used for IO errors, and get_context state will be used for any
2046     // key level errors
2047     *(iter->s) = Status::OK();
2048   }
2049   int get_ctx_index = 0;
2050   for (auto iter = range->begin(); iter != range->end();
2051        ++iter, get_ctx_index++) {
2052     iter->get_context = &(get_ctx[get_ctx_index]);
2053   }
2054 
2055   MultiGetRange file_picker_range(*range, range->begin(), range->end());
2056   FilePickerMultiGet fp(
2057       &file_picker_range,
2058       &storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_,
2059       &storage_info_.file_indexer_, user_comparator(), internal_comparator());
2060   FdWithKeyRange* f = fp.GetNextFile();
2061   Status s;
2062   uint64_t num_index_read = 0;
2063   uint64_t num_filter_read = 0;
2064   uint64_t num_data_read = 0;
2065   uint64_t num_sst_read = 0;
2066 
2067   while (f != nullptr) {
2068     MultiGetRange file_range = fp.CurrentFileRange();
2069     bool timer_enabled =
2070         GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
2071         get_perf_context()->per_level_perf_context_enabled;
2072     StopWatchNano timer(clock_, timer_enabled /* auto_start */);
2073     s = table_cache_->MultiGet(
2074         read_options, *internal_comparator(), *f->file_metadata, &file_range,
2075         mutable_cf_options_.prefix_extractor.get(),
2076         cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
2077         IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
2078                         fp.IsHitFileLastInLevel()),
2079         fp.GetHitFileLevel());
2080     // TODO: examine the behavior for corrupted key
2081     if (timer_enabled) {
2082       PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
2083                                 fp.GetHitFileLevel());
2084     }
2085     if (!s.ok()) {
2086       // TODO: Set status for individual keys appropriately
2087       for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
2088         *iter->s = s;
2089         file_range.MarkKeyDone(iter);
2090       }
2091       return;
2092     }
2093     uint64_t batch_size = 0;
2094     for (auto iter = file_range.begin(); s.ok() && iter != file_range.end();
2095          ++iter) {
2096       GetContext& get_context = *iter->get_context;
2097       Status* status = iter->s;
2098       // The Status in the KeyContext takes precedence over GetContext state
2099       // Status may be an error if there were any IO errors in the table
2100       // reader. We never expect Status to be NotFound(), as that is
2101       // determined by get_context
2102       assert(!status->IsNotFound());
2103       if (!status->ok()) {
2104         file_range.MarkKeyDone(iter);
2105         continue;
2106       }
2107 
2108       if (get_context.sample()) {
2109         sample_file_read_inc(f->file_metadata);
2110       }
2111       batch_size++;
2112       num_index_read += get_context.get_context_stats_.num_index_read;
2113       num_filter_read += get_context.get_context_stats_.num_filter_read;
2114       num_data_read += get_context.get_context_stats_.num_data_read;
2115       num_sst_read += get_context.get_context_stats_.num_sst_read;
2116 
2117       // report the counters before returning
2118       if (get_context.State() != GetContext::kNotFound &&
2119           get_context.State() != GetContext::kMerge &&
2120           db_statistics_ != nullptr) {
2121         get_context.ReportCounters();
2122       } else {
2123         if (iter->max_covering_tombstone_seq > 0) {
2124           // The remaining files we look at will only contain covered keys, so
2125           // we stop here for this key
2126           file_picker_range.SkipKey(iter);
2127         }
2128       }
2129       switch (get_context.State()) {
2130         case GetContext::kNotFound:
2131           // Keep searching in other files
2132           break;
2133         case GetContext::kMerge:
2134           // TODO: update per-level perfcontext user_key_return_count for kMerge
2135           break;
2136         case GetContext::kFound:
2137           if (fp.GetHitFileLevel() == 0) {
2138             RecordTick(db_statistics_, GET_HIT_L0);
2139           } else if (fp.GetHitFileLevel() == 1) {
2140             RecordTick(db_statistics_, GET_HIT_L1);
2141           } else if (fp.GetHitFileLevel() >= 2) {
2142             RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
2143           }
2144 
2145           PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
2146                                     fp.GetHitFileLevel());
2147 
2148           file_range.MarkKeyDone(iter);
2149 
2150           if (iter->is_blob_index) {
2151             if (iter->value) {
2152               constexpr uint64_t* bytes_read = nullptr;
2153 
2154               *status = GetBlob(read_options, iter->ukey_with_ts, *iter->value,
2155                                 iter->value, bytes_read);
2156               if (!status->ok()) {
2157                 if (status->IsIncomplete()) {
2158                   get_context.MarkKeyMayExist();
2159                 }
2160 
2161                 continue;
2162               }
2163             }
2164           }
2165 
2166           file_range.AddValueSize(iter->value->size());
2167           if (file_range.GetValueSize() > read_options.value_size_soft_limit) {
2168             s = Status::Aborted();
2169             break;
2170           }
2171           continue;
2172         case GetContext::kDeleted:
2173           // Use empty error message for speed
2174           *status = Status::NotFound();
2175           file_range.MarkKeyDone(iter);
2176           continue;
2177         case GetContext::kCorrupt:
2178           *status =
2179               Status::Corruption("corrupted key for ", iter->lkey->user_key());
2180           file_range.MarkKeyDone(iter);
2181           continue;
2182         case GetContext::kUnexpectedBlobIndex:
2183           ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
2184           *status = Status::NotSupported(
2185               "Encounter unexpected blob index. Please open DB with "
2186               "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
2187           file_range.MarkKeyDone(iter);
2188           continue;
2189       }
2190     }
2191 
2192     // Report MultiGet stats per level.
2193     if (fp.IsHitFileLastInLevel()) {
2194       // Dump the stats if this is the last file of this level and reset for
2195       // next level.
2196       RecordInHistogram(db_statistics_,
2197                         NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL,
2198                         num_index_read + num_filter_read);
2199       RecordInHistogram(db_statistics_, NUM_DATA_BLOCKS_READ_PER_LEVEL,
2200                         num_data_read);
2201       RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL, num_sst_read);
2202       num_filter_read = 0;
2203       num_index_read = 0;
2204       num_data_read = 0;
2205       num_sst_read = 0;
2206     }
2207 
2208     RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
2209     if (!s.ok() || file_picker_range.empty()) {
2210       break;
2211     }
2212     f = fp.GetNextFile();
2213   }
2214 
2215   // Process any left over keys
2216   for (auto iter = range->begin(); s.ok() && iter != range->end(); ++iter) {
2217     GetContext& get_context = *iter->get_context;
2218     Status* status = iter->s;
2219     Slice user_key = iter->lkey->user_key();
2220 
2221     if (db_statistics_ != nullptr) {
2222       get_context.ReportCounters();
2223     }
2224     if (GetContext::kMerge == get_context.State()) {
2225       if (!merge_operator_) {
2226         *status = Status::InvalidArgument(
2227             "merge_operator is not properly initialized.");
2228         range->MarkKeyDone(iter);
2229         continue;
2230       }
2231       // merge_operands are in saver and we hit the beginning of the key history
2232       // do a final merge of nullptr and operands;
2233       std::string* str_value =
2234           iter->value != nullptr ? iter->value->GetSelf() : nullptr;
2235       *status = MergeHelper::TimedFullMerge(
2236           merge_operator_, user_key, nullptr, iter->merge_context.GetOperands(),
2237           str_value, info_log_, db_statistics_, clock_,
2238           nullptr /* result_operand */, true);
2239       if (LIKELY(iter->value != nullptr)) {
2240         iter->value->PinSelf();
2241         range->AddValueSize(iter->value->size());
2242         range->MarkKeyDone(iter);
2243         if (range->GetValueSize() > read_options.value_size_soft_limit) {
2244           s = Status::Aborted();
2245           break;
2246         }
2247       }
2248     } else {
2249       range->MarkKeyDone(iter);
2250       *status = Status::NotFound();  // Use an empty error message for speed
2251     }
2252   }
2253 
2254   for (auto iter = range->begin(); iter != range->end(); ++iter) {
2255     range->MarkKeyDone(iter);
2256     *(iter->s) = s;
2257   }
2258 }
2259 
IsFilterSkipped(int level,bool is_file_last_in_level)2260 bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
2261   // Reaching the bottom level implies misses at all upper levels, so we'll
2262   // skip checking the filters when we predict a hit.
2263   return cfd_->ioptions()->optimize_filters_for_hits &&
2264          (level > 0 || is_file_last_in_level) &&
2265          level == storage_info_.num_non_empty_levels() - 1;
2266 }
2267 
GenerateLevelFilesBrief()2268 void VersionStorageInfo::GenerateLevelFilesBrief() {
2269   level_files_brief_.resize(num_non_empty_levels_);
2270   for (int level = 0; level < num_non_empty_levels_; level++) {
2271     DoGenerateLevelFilesBrief(
2272         &level_files_brief_[level], files_[level], &arena_);
2273   }
2274 }
2275 
PrepareApply(const MutableCFOptions & mutable_cf_options,bool update_stats)2276 void Version::PrepareApply(
2277     const MutableCFOptions& mutable_cf_options,
2278     bool update_stats) {
2279   TEST_SYNC_POINT_CALLBACK(
2280       "Version::PrepareApply:forced_check",
2281       reinterpret_cast<void*>(&storage_info_.force_consistency_checks_));
2282   UpdateAccumulatedStats(update_stats);
2283   storage_info_.UpdateNumNonEmptyLevels();
2284   storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
2285   storage_info_.UpdateFilesByCompactionPri(cfd_->ioptions()->compaction_pri);
2286   storage_info_.GenerateFileIndexer();
2287   storage_info_.GenerateLevelFilesBrief();
2288   storage_info_.GenerateLevel0NonOverlapping();
2289   storage_info_.GenerateBottommostFiles();
2290 }
2291 
MaybeInitializeFileMetaData(FileMetaData * file_meta)2292 bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
2293   if (file_meta->init_stats_from_file ||
2294       file_meta->compensated_file_size > 0) {
2295     return false;
2296   }
2297   std::shared_ptr<const TableProperties> tp;
2298   Status s = GetTableProperties(&tp, file_meta);
2299   file_meta->init_stats_from_file = true;
2300   if (!s.ok()) {
2301     ROCKS_LOG_ERROR(vset_->db_options_->info_log,
2302                     "Unable to load table properties for file %" PRIu64
2303                     " --- %s\n",
2304                     file_meta->fd.GetNumber(), s.ToString().c_str());
2305     return false;
2306   }
2307   if (tp.get() == nullptr) return false;
2308   file_meta->num_entries = tp->num_entries;
2309   file_meta->num_deletions = tp->num_deletions;
2310   file_meta->raw_value_size = tp->raw_value_size;
2311   file_meta->raw_key_size = tp->raw_key_size;
2312 
2313   return true;
2314 }
2315 
UpdateAccumulatedStats(FileMetaData * file_meta)2316 void VersionStorageInfo::UpdateAccumulatedStats(FileMetaData* file_meta) {
2317   TEST_SYNC_POINT_CALLBACK("VersionStorageInfo::UpdateAccumulatedStats",
2318                            nullptr);
2319 
2320   assert(file_meta->init_stats_from_file);
2321   accumulated_file_size_ += file_meta->fd.GetFileSize();
2322   accumulated_raw_key_size_ += file_meta->raw_key_size;
2323   accumulated_raw_value_size_ += file_meta->raw_value_size;
2324   accumulated_num_non_deletions_ +=
2325       file_meta->num_entries - file_meta->num_deletions;
2326   accumulated_num_deletions_ += file_meta->num_deletions;
2327 
2328   current_num_non_deletions_ +=
2329       file_meta->num_entries - file_meta->num_deletions;
2330   current_num_deletions_ += file_meta->num_deletions;
2331   current_num_samples_++;
2332 }
2333 
RemoveCurrentStats(FileMetaData * file_meta)2334 void VersionStorageInfo::RemoveCurrentStats(FileMetaData* file_meta) {
2335   if (file_meta->init_stats_from_file) {
2336     current_num_non_deletions_ -=
2337         file_meta->num_entries - file_meta->num_deletions;
2338     current_num_deletions_ -= file_meta->num_deletions;
2339     current_num_samples_--;
2340   }
2341 }
2342 
UpdateAccumulatedStats(bool update_stats)2343 void Version::UpdateAccumulatedStats(bool update_stats) {
2344   if (update_stats) {
2345     // maximum number of table properties loaded from files.
2346     const int kMaxInitCount = 20;
2347     int init_count = 0;
2348     // here only the first kMaxInitCount files which haven't been
2349     // initialized from file will be updated with num_deletions.
2350     // The motivation here is to cap the maximum I/O per Version creation.
2351     // The reason for choosing files from lower-level instead of higher-level
2352     // is that such design is able to propagate the initialization from
2353     // lower-level to higher-level:  When the num_deletions of lower-level
2354     // files are updated, it will make the lower-level files have accurate
2355     // compensated_file_size, making lower-level to higher-level compaction
2356     // will be triggered, which creates higher-level files whose num_deletions
2357     // will be updated here.
2358     for (int level = 0;
2359          level < storage_info_.num_levels_ && init_count < kMaxInitCount;
2360          ++level) {
2361       for (auto* file_meta : storage_info_.files_[level]) {
2362         if (MaybeInitializeFileMetaData(file_meta)) {
2363           // each FileMeta will be initialized only once.
2364           storage_info_.UpdateAccumulatedStats(file_meta);
2365           // when option "max_open_files" is -1, all the file metadata has
2366           // already been read, so MaybeInitializeFileMetaData() won't incur
2367           // any I/O cost. "max_open_files=-1" means that the table cache passed
2368           // to the VersionSet and then to the ColumnFamilySet has a size of
2369           // TableCache::kInfiniteCapacity
2370           if (vset_->GetColumnFamilySet()->get_table_cache()->GetCapacity() ==
2371               TableCache::kInfiniteCapacity) {
2372             continue;
2373           }
2374           if (++init_count >= kMaxInitCount) {
2375             break;
2376           }
2377         }
2378       }
2379     }
2380     // In case all sampled-files contain only deletion entries, then we
2381     // load the table-property of a file in higher-level to initialize
2382     // that value.
2383     for (int level = storage_info_.num_levels_ - 1;
2384          storage_info_.accumulated_raw_value_size_ == 0 && level >= 0;
2385          --level) {
2386       for (int i = static_cast<int>(storage_info_.files_[level].size()) - 1;
2387            storage_info_.accumulated_raw_value_size_ == 0 && i >= 0; --i) {
2388         if (MaybeInitializeFileMetaData(storage_info_.files_[level][i])) {
2389           storage_info_.UpdateAccumulatedStats(storage_info_.files_[level][i]);
2390         }
2391       }
2392     }
2393   }
2394 
2395   storage_info_.ComputeCompensatedSizes();
2396 }
2397 
ComputeCompensatedSizes()2398 void VersionStorageInfo::ComputeCompensatedSizes() {
2399   static const int kDeletionWeightOnCompaction = 2;
2400   uint64_t average_value_size = GetAverageValueSize();
2401 
2402   // compute the compensated size
2403   for (int level = 0; level < num_levels_; level++) {
2404     for (auto* file_meta : files_[level]) {
2405       // Here we only compute compensated_file_size for those file_meta
2406       // which compensated_file_size is uninitialized (== 0). This is true only
2407       // for files that have been created right now and no other thread has
2408       // access to them. That's why we can safely mutate compensated_file_size.
2409       if (file_meta->compensated_file_size == 0) {
2410         file_meta->compensated_file_size = file_meta->fd.GetFileSize();
2411         // Here we only boost the size of deletion entries of a file only
2412         // when the number of deletion entries is greater than the number of
2413         // non-deletion entries in the file.  The motivation here is that in
2414         // a stable workload, the number of deletion entries should be roughly
2415         // equal to the number of non-deletion entries.  If we compensate the
2416         // size of deletion entries in a stable workload, the deletion
2417         // compensation logic might introduce unwanted effet which changes the
2418         // shape of LSM tree.
2419         if (file_meta->num_deletions * 2 >= file_meta->num_entries) {
2420           file_meta->compensated_file_size +=
2421               (file_meta->num_deletions * 2 - file_meta->num_entries) *
2422               average_value_size * kDeletionWeightOnCompaction;
2423         }
2424       }
2425     }
2426   }
2427 }
2428 
MaxInputLevel() const2429 int VersionStorageInfo::MaxInputLevel() const {
2430   if (compaction_style_ == kCompactionStyleLevel) {
2431     return num_levels() - 2;
2432   }
2433   return 0;
2434 }
2435 
MaxOutputLevel(bool allow_ingest_behind) const2436 int VersionStorageInfo::MaxOutputLevel(bool allow_ingest_behind) const {
2437   if (allow_ingest_behind) {
2438     assert(num_levels() > 1);
2439     return num_levels() - 2;
2440   }
2441   return num_levels() - 1;
2442 }
2443 
EstimateCompactionBytesNeeded(const MutableCFOptions & mutable_cf_options)2444 void VersionStorageInfo::EstimateCompactionBytesNeeded(
2445     const MutableCFOptions& mutable_cf_options) {
2446   // Only implemented for level-based compaction
2447   if (compaction_style_ != kCompactionStyleLevel) {
2448     estimated_compaction_needed_bytes_ = 0;
2449     return;
2450   }
2451 
2452   // Start from Level 0, if level 0 qualifies compaction to level 1,
2453   // we estimate the size of compaction.
2454   // Then we move on to the next level and see whether it qualifies compaction
2455   // to the next level. The size of the level is estimated as the actual size
2456   // on the level plus the input bytes from the previous level if there is any.
2457   // If it exceeds, take the exceeded bytes as compaction input and add the size
2458   // of the compaction size to tatal size.
2459   // We keep doing it to Level 2, 3, etc, until the last level and return the
2460   // accumulated bytes.
2461 
2462   uint64_t bytes_compact_to_next_level = 0;
2463   uint64_t level_size = 0;
2464   for (auto* f : files_[0]) {
2465     level_size += f->fd.GetFileSize();
2466   }
2467   // Level 0
2468   bool level0_compact_triggered = false;
2469   if (static_cast<int>(files_[0].size()) >=
2470           mutable_cf_options.level0_file_num_compaction_trigger ||
2471       level_size >= mutable_cf_options.max_bytes_for_level_base) {
2472     level0_compact_triggered = true;
2473     estimated_compaction_needed_bytes_ = level_size;
2474     bytes_compact_to_next_level = level_size;
2475   } else {
2476     estimated_compaction_needed_bytes_ = 0;
2477   }
2478 
2479   // Level 1 and up.
2480   uint64_t bytes_next_level = 0;
2481   for (int level = base_level(); level <= MaxInputLevel(); level++) {
2482     level_size = 0;
2483     if (bytes_next_level > 0) {
2484 #ifndef NDEBUG
2485       uint64_t level_size2 = 0;
2486       for (auto* f : files_[level]) {
2487         level_size2 += f->fd.GetFileSize();
2488       }
2489       assert(level_size2 == bytes_next_level);
2490 #endif
2491       level_size = bytes_next_level;
2492       bytes_next_level = 0;
2493     } else {
2494       for (auto* f : files_[level]) {
2495         level_size += f->fd.GetFileSize();
2496       }
2497     }
2498     if (level == base_level() && level0_compact_triggered) {
2499       // Add base level size to compaction if level0 compaction triggered.
2500       estimated_compaction_needed_bytes_ += level_size;
2501     }
2502     // Add size added by previous compaction
2503     level_size += bytes_compact_to_next_level;
2504     bytes_compact_to_next_level = 0;
2505     uint64_t level_target = MaxBytesForLevel(level);
2506     if (level_size > level_target) {
2507       bytes_compact_to_next_level = level_size - level_target;
2508       // Estimate the actual compaction fan-out ratio as size ratio between
2509       // the two levels.
2510 
2511       assert(bytes_next_level == 0);
2512       if (level + 1 < num_levels_) {
2513         for (auto* f : files_[level + 1]) {
2514           bytes_next_level += f->fd.GetFileSize();
2515         }
2516       }
2517       if (bytes_next_level > 0) {
2518         assert(level_size > 0);
2519         estimated_compaction_needed_bytes_ += static_cast<uint64_t>(
2520             static_cast<double>(bytes_compact_to_next_level) *
2521             (static_cast<double>(bytes_next_level) /
2522                  static_cast<double>(level_size) +
2523              1));
2524       }
2525     }
2526   }
2527 }
2528 
2529 namespace {
GetExpiredTtlFilesCount(const ImmutableOptions & ioptions,const MutableCFOptions & mutable_cf_options,const std::vector<FileMetaData * > & files)2530 uint32_t GetExpiredTtlFilesCount(const ImmutableOptions& ioptions,
2531                                  const MutableCFOptions& mutable_cf_options,
2532                                  const std::vector<FileMetaData*>& files) {
2533   uint32_t ttl_expired_files_count = 0;
2534 
2535   int64_t _current_time;
2536   auto status = ioptions.clock->GetCurrentTime(&_current_time);
2537   if (status.ok()) {
2538     const uint64_t current_time = static_cast<uint64_t>(_current_time);
2539     for (FileMetaData* f : files) {
2540       if (!f->being_compacted) {
2541         uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
2542         if (oldest_ancester_time != 0 &&
2543             oldest_ancester_time < (current_time - mutable_cf_options.ttl)) {
2544           ttl_expired_files_count++;
2545         }
2546       }
2547     }
2548   }
2549   return ttl_expired_files_count;
2550 }
2551 }  // anonymous namespace
2552 
ComputeCompactionScore(const ImmutableOptions & immutable_cf_options,const MutableCFOptions & mutable_cf_options)2553 void VersionStorageInfo::ComputeCompactionScore(
2554     const ImmutableOptions& immutable_cf_options,
2555     const MutableCFOptions& mutable_cf_options) {
2556   for (int level = 0; level <= MaxInputLevel(); level++) {
2557     double score;
2558     if (level == 0) {
2559       // We treat level-0 specially by bounding the number of files
2560       // instead of number of bytes for two reasons:
2561       //
2562       // (1) With larger write-buffer sizes, it is nice not to do too
2563       // many level-0 compactions.
2564       //
2565       // (2) The files in level-0 are merged on every read and
2566       // therefore we wish to avoid too many files when the individual
2567       // file size is small (perhaps because of a small write-buffer
2568       // setting, or very high compression ratios, or lots of
2569       // overwrites/deletions).
2570       int num_sorted_runs = 0;
2571       uint64_t total_size = 0;
2572       for (auto* f : files_[level]) {
2573         if (!f->being_compacted) {
2574           total_size += f->compensated_file_size;
2575           num_sorted_runs++;
2576         }
2577       }
2578       if (compaction_style_ == kCompactionStyleUniversal) {
2579         // For universal compaction, we use level0 score to indicate
2580         // compaction score for the whole DB. Adding other levels as if
2581         // they are L0 files.
2582         for (int i = 1; i < num_levels(); i++) {
2583           // Its possible that a subset of the files in a level may be in a
2584           // compaction, due to delete triggered compaction or trivial move.
2585           // In that case, the below check may not catch a level being
2586           // compacted as it only checks the first file. The worst that can
2587           // happen is a scheduled compaction thread will find nothing to do.
2588           if (!files_[i].empty() && !files_[i][0]->being_compacted) {
2589             num_sorted_runs++;
2590           }
2591         }
2592       }
2593 
2594       if (compaction_style_ == kCompactionStyleFIFO) {
2595         score = static_cast<double>(total_size) /
2596                 mutable_cf_options.compaction_options_fifo.max_table_files_size;
2597         if (mutable_cf_options.compaction_options_fifo.allow_compaction) {
2598           score = std::max(
2599               static_cast<double>(num_sorted_runs) /
2600                   mutable_cf_options.level0_file_num_compaction_trigger,
2601               score);
2602         }
2603         if (mutable_cf_options.ttl > 0) {
2604           score = std::max(
2605               static_cast<double>(GetExpiredTtlFilesCount(
2606                   immutable_cf_options, mutable_cf_options, files_[level])),
2607               score);
2608         }
2609 
2610       } else {
2611         score = static_cast<double>(num_sorted_runs) /
2612                 mutable_cf_options.level0_file_num_compaction_trigger;
2613         if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
2614           // Level-based involves L0->L0 compactions that can lead to oversized
2615           // L0 files. Take into account size as well to avoid later giant
2616           // compactions to the base level.
2617           uint64_t l0_target_size = mutable_cf_options.max_bytes_for_level_base;
2618           if (immutable_cf_options.level_compaction_dynamic_level_bytes &&
2619               level_multiplier_ != 0.0) {
2620             // Prevent L0 to Lbase fanout from growing larger than
2621             // `level_multiplier_`. This prevents us from getting stuck picking
2622             // L0 forever even when it is hurting write-amp. That could happen
2623             // in dynamic level compaction's write-burst mode where the base
2624             // level's target size can grow to be enormous.
2625             l0_target_size =
2626                 std::max(l0_target_size,
2627                          static_cast<uint64_t>(level_max_bytes_[base_level_] /
2628                                                level_multiplier_));
2629           }
2630           score =
2631               std::max(score, static_cast<double>(total_size) / l0_target_size);
2632         }
2633       }
2634     } else {
2635       // Compute the ratio of current size to size limit.
2636       uint64_t level_bytes_no_compacting = 0;
2637       for (auto f : files_[level]) {
2638         if (!f->being_compacted) {
2639           level_bytes_no_compacting += f->compensated_file_size;
2640         }
2641       }
2642       score = static_cast<double>(level_bytes_no_compacting) /
2643               MaxBytesForLevel(level);
2644     }
2645     compaction_level_[level] = level;
2646     compaction_score_[level] = score;
2647   }
2648 
2649   // sort all the levels based on their score. Higher scores get listed
2650   // first. Use bubble sort because the number of entries are small.
2651   for (int i = 0; i < num_levels() - 2; i++) {
2652     for (int j = i + 1; j < num_levels() - 1; j++) {
2653       if (compaction_score_[i] < compaction_score_[j]) {
2654         double score = compaction_score_[i];
2655         int level = compaction_level_[i];
2656         compaction_score_[i] = compaction_score_[j];
2657         compaction_level_[i] = compaction_level_[j];
2658         compaction_score_[j] = score;
2659         compaction_level_[j] = level;
2660       }
2661     }
2662   }
2663   ComputeFilesMarkedForCompaction();
2664   ComputeBottommostFilesMarkedForCompaction();
2665   if (mutable_cf_options.ttl > 0) {
2666     ComputeExpiredTtlFiles(immutable_cf_options, mutable_cf_options.ttl);
2667   }
2668   if (mutable_cf_options.periodic_compaction_seconds > 0) {
2669     ComputeFilesMarkedForPeriodicCompaction(
2670         immutable_cf_options, mutable_cf_options.periodic_compaction_seconds);
2671   }
2672   EstimateCompactionBytesNeeded(mutable_cf_options);
2673 }
2674 
ComputeFilesMarkedForCompaction()2675 void VersionStorageInfo::ComputeFilesMarkedForCompaction() {
2676   files_marked_for_compaction_.clear();
2677   int last_qualify_level = 0;
2678 
2679   // Do not include files from the last level with data
2680   // If table properties collector suggests a file on the last level,
2681   // we should not move it to a new level.
2682   for (int level = num_levels() - 1; level >= 1; level--) {
2683     if (!files_[level].empty()) {
2684       last_qualify_level = level - 1;
2685       break;
2686     }
2687   }
2688 
2689   for (int level = 0; level <= last_qualify_level; level++) {
2690     for (auto* f : files_[level]) {
2691       if (!f->being_compacted && f->marked_for_compaction) {
2692         files_marked_for_compaction_.emplace_back(level, f);
2693       }
2694     }
2695   }
2696 }
2697 
ComputeExpiredTtlFiles(const ImmutableOptions & ioptions,const uint64_t ttl)2698 void VersionStorageInfo::ComputeExpiredTtlFiles(
2699     const ImmutableOptions& ioptions, const uint64_t ttl) {
2700   assert(ttl > 0);
2701 
2702   expired_ttl_files_.clear();
2703 
2704   int64_t _current_time;
2705   auto status = ioptions.clock->GetCurrentTime(&_current_time);
2706   if (!status.ok()) {
2707     return;
2708   }
2709   const uint64_t current_time = static_cast<uint64_t>(_current_time);
2710 
2711   for (int level = 0; level < num_levels() - 1; level++) {
2712     for (FileMetaData* f : files_[level]) {
2713       if (!f->being_compacted) {
2714         uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
2715         if (oldest_ancester_time > 0 &&
2716             oldest_ancester_time < (current_time - ttl)) {
2717           expired_ttl_files_.emplace_back(level, f);
2718         }
2719       }
2720     }
2721   }
2722 }
2723 
ComputeFilesMarkedForPeriodicCompaction(const ImmutableOptions & ioptions,const uint64_t periodic_compaction_seconds)2724 void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction(
2725     const ImmutableOptions& ioptions,
2726     const uint64_t periodic_compaction_seconds) {
2727   assert(periodic_compaction_seconds > 0);
2728 
2729   files_marked_for_periodic_compaction_.clear();
2730 
2731   int64_t temp_current_time;
2732   auto status = ioptions.clock->GetCurrentTime(&temp_current_time);
2733   if (!status.ok()) {
2734     return;
2735   }
2736   const uint64_t current_time = static_cast<uint64_t>(temp_current_time);
2737 
2738   // If periodic_compaction_seconds is larger than current time, periodic
2739   // compaction can't possibly be triggered.
2740   if (periodic_compaction_seconds > current_time) {
2741     return;
2742   }
2743 
2744   const uint64_t allowed_time_limit =
2745       current_time - periodic_compaction_seconds;
2746 
2747   for (int level = 0; level < num_levels(); level++) {
2748     for (auto f : files_[level]) {
2749       if (!f->being_compacted) {
2750         // Compute a file's modification time in the following order:
2751         // 1. Use file_creation_time table property if it is > 0.
2752         // 2. Use creation_time table property if it is > 0.
2753         // 3. Use file's mtime metadata if the above two table properties are 0.
2754         // Don't consider the file at all if the modification time cannot be
2755         // correctly determined based on the above conditions.
2756         uint64_t file_modification_time = f->TryGetFileCreationTime();
2757         if (file_modification_time == kUnknownFileCreationTime) {
2758           file_modification_time = f->TryGetOldestAncesterTime();
2759         }
2760         if (file_modification_time == kUnknownOldestAncesterTime) {
2761           auto file_path = TableFileName(ioptions.cf_paths, f->fd.GetNumber(),
2762                                          f->fd.GetPathId());
2763           status = ioptions.env->GetFileModificationTime(
2764               file_path, &file_modification_time);
2765           if (!status.ok()) {
2766             ROCKS_LOG_WARN(ioptions.logger,
2767                            "Can't get file modification time: %s: %s",
2768                            file_path.c_str(), status.ToString().c_str());
2769             continue;
2770           }
2771         }
2772         if (file_modification_time > 0 &&
2773             file_modification_time < allowed_time_limit) {
2774           files_marked_for_periodic_compaction_.emplace_back(level, f);
2775         }
2776       }
2777     }
2778   }
2779 }
2780 
2781 namespace {
2782 
2783 // used to sort files by size
2784 struct Fsize {
2785   size_t index;
2786   FileMetaData* file;
2787 };
2788 
2789 // Comparator that is used to sort files based on their size
2790 // In normal mode: descending size
CompareCompensatedSizeDescending(const Fsize & first,const Fsize & second)2791 bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) {
2792   return (first.file->compensated_file_size >
2793       second.file->compensated_file_size);
2794 }
2795 } // anonymous namespace
2796 
AddFile(int level,FileMetaData * f)2797 void VersionStorageInfo::AddFile(int level, FileMetaData* f) {
2798   auto& level_files = files_[level];
2799   level_files.push_back(f);
2800 
2801   f->refs++;
2802 
2803   const uint64_t file_number = f->fd.GetNumber();
2804 
2805   assert(file_locations_.find(file_number) == file_locations_.end());
2806   file_locations_.emplace(file_number,
2807                           FileLocation(level, level_files.size() - 1));
2808 }
2809 
AddBlobFile(std::shared_ptr<BlobFileMetaData> blob_file_meta)2810 void VersionStorageInfo::AddBlobFile(
2811     std::shared_ptr<BlobFileMetaData> blob_file_meta) {
2812   assert(blob_file_meta);
2813 
2814   const uint64_t blob_file_number = blob_file_meta->GetBlobFileNumber();
2815 
2816   auto it = blob_files_.lower_bound(blob_file_number);
2817   assert(it == blob_files_.end() || it->first != blob_file_number);
2818 
2819   blob_files_.insert(
2820       it, BlobFiles::value_type(blob_file_number, std::move(blob_file_meta)));
2821 }
2822 
2823 // Version::PrepareApply() need to be called before calling the function, or
2824 // following functions called:
2825 // 1. UpdateNumNonEmptyLevels();
2826 // 2. CalculateBaseBytes();
2827 // 3. UpdateFilesByCompactionPri();
2828 // 4. GenerateFileIndexer();
2829 // 5. GenerateLevelFilesBrief();
2830 // 6. GenerateLevel0NonOverlapping();
2831 // 7. GenerateBottommostFiles();
SetFinalized()2832 void VersionStorageInfo::SetFinalized() {
2833   finalized_ = true;
2834 #ifndef NDEBUG
2835   if (compaction_style_ != kCompactionStyleLevel) {
2836     // Not level based compaction.
2837     return;
2838   }
2839   assert(base_level_ < 0 || num_levels() == 1 ||
2840          (base_level_ >= 1 && base_level_ < num_levels()));
2841   // Verify all levels newer than base_level are empty except L0
2842   for (int level = 1; level < base_level(); level++) {
2843     assert(NumLevelBytes(level) == 0);
2844   }
2845   uint64_t max_bytes_prev_level = 0;
2846   for (int level = base_level(); level < num_levels() - 1; level++) {
2847     if (LevelFiles(level).size() == 0) {
2848       continue;
2849     }
2850     assert(MaxBytesForLevel(level) >= max_bytes_prev_level);
2851     max_bytes_prev_level = MaxBytesForLevel(level);
2852   }
2853   int num_empty_non_l0_level = 0;
2854   for (int level = 0; level < num_levels(); level++) {
2855     assert(LevelFiles(level).size() == 0 ||
2856            LevelFiles(level).size() == LevelFilesBrief(level).num_files);
2857     if (level > 0 && NumLevelBytes(level) > 0) {
2858       num_empty_non_l0_level++;
2859     }
2860     if (LevelFiles(level).size() > 0) {
2861       assert(level < num_non_empty_levels());
2862     }
2863   }
2864   assert(compaction_level_.size() > 0);
2865   assert(compaction_level_.size() == compaction_score_.size());
2866 #endif
2867 }
2868 
UpdateNumNonEmptyLevels()2869 void VersionStorageInfo::UpdateNumNonEmptyLevels() {
2870   num_non_empty_levels_ = num_levels_;
2871   for (int i = num_levels_ - 1; i >= 0; i--) {
2872     if (files_[i].size() != 0) {
2873       return;
2874     } else {
2875       num_non_empty_levels_ = i;
2876     }
2877   }
2878 }
2879 
2880 namespace {
2881 // Sort `temp` based on ratio of overlapping size over file size
SortFileByOverlappingRatio(const InternalKeyComparator & icmp,const std::vector<FileMetaData * > & files,const std::vector<FileMetaData * > & next_level_files,std::vector<Fsize> * temp)2882 void SortFileByOverlappingRatio(
2883     const InternalKeyComparator& icmp, const std::vector<FileMetaData*>& files,
2884     const std::vector<FileMetaData*>& next_level_files,
2885     std::vector<Fsize>* temp) {
2886   std::unordered_map<uint64_t, uint64_t> file_to_order;
2887   auto next_level_it = next_level_files.begin();
2888 
2889   for (auto& file : files) {
2890     uint64_t overlapping_bytes = 0;
2891     // Skip files in next level that is smaller than current file
2892     while (next_level_it != next_level_files.end() &&
2893            icmp.Compare((*next_level_it)->largest, file->smallest) < 0) {
2894       next_level_it++;
2895     }
2896 
2897     while (next_level_it != next_level_files.end() &&
2898            icmp.Compare((*next_level_it)->smallest, file->largest) < 0) {
2899       overlapping_bytes += (*next_level_it)->fd.file_size;
2900 
2901       if (icmp.Compare((*next_level_it)->largest, file->largest) > 0) {
2902         // next level file cross large boundary of current file.
2903         break;
2904       }
2905       next_level_it++;
2906     }
2907 
2908     assert(file->compensated_file_size != 0);
2909     file_to_order[file->fd.GetNumber()] =
2910         overlapping_bytes * 1024u / file->compensated_file_size;
2911   }
2912 
2913   std::sort(temp->begin(), temp->end(),
2914             [&](const Fsize& f1, const Fsize& f2) -> bool {
2915               return file_to_order[f1.file->fd.GetNumber()] <
2916                      file_to_order[f2.file->fd.GetNumber()];
2917             });
2918 }
2919 }  // namespace
2920 
UpdateFilesByCompactionPri(CompactionPri compaction_pri)2921 void VersionStorageInfo::UpdateFilesByCompactionPri(
2922     CompactionPri compaction_pri) {
2923   if (compaction_style_ == kCompactionStyleNone ||
2924       compaction_style_ == kCompactionStyleFIFO ||
2925       compaction_style_ == kCompactionStyleUniversal) {
2926     // don't need this
2927     return;
2928   }
2929   // No need to sort the highest level because it is never compacted.
2930   for (int level = 0; level < num_levels() - 1; level++) {
2931     const std::vector<FileMetaData*>& files = files_[level];
2932     auto& files_by_compaction_pri = files_by_compaction_pri_[level];
2933     assert(files_by_compaction_pri.size() == 0);
2934 
2935     // populate a temp vector for sorting based on size
2936     std::vector<Fsize> temp(files.size());
2937     for (size_t i = 0; i < files.size(); i++) {
2938       temp[i].index = i;
2939       temp[i].file = files[i];
2940     }
2941 
2942     // sort the top number_of_files_to_sort_ based on file size
2943     size_t num = VersionStorageInfo::kNumberFilesToSort;
2944     if (num > temp.size()) {
2945       num = temp.size();
2946     }
2947     switch (compaction_pri) {
2948       case kByCompensatedSize:
2949         std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
2950                           CompareCompensatedSizeDescending);
2951         break;
2952       case kOldestLargestSeqFirst:
2953         std::sort(temp.begin(), temp.end(),
2954                   [](const Fsize& f1, const Fsize& f2) -> bool {
2955                     return f1.file->fd.largest_seqno <
2956                            f2.file->fd.largest_seqno;
2957                   });
2958         break;
2959       case kOldestSmallestSeqFirst:
2960         std::sort(temp.begin(), temp.end(),
2961                   [](const Fsize& f1, const Fsize& f2) -> bool {
2962                     return f1.file->fd.smallest_seqno <
2963                            f2.file->fd.smallest_seqno;
2964                   });
2965         break;
2966       case kMinOverlappingRatio:
2967         SortFileByOverlappingRatio(*internal_comparator_, files_[level],
2968                                    files_[level + 1], &temp);
2969         break;
2970       default:
2971         assert(false);
2972     }
2973     assert(temp.size() == files.size());
2974 
2975     // initialize files_by_compaction_pri_
2976     for (size_t i = 0; i < temp.size(); i++) {
2977       files_by_compaction_pri.push_back(static_cast<int>(temp[i].index));
2978     }
2979     next_file_to_compact_by_size_[level] = 0;
2980     assert(files_[level].size() == files_by_compaction_pri_[level].size());
2981   }
2982 }
2983 
GenerateLevel0NonOverlapping()2984 void VersionStorageInfo::GenerateLevel0NonOverlapping() {
2985   assert(!finalized_);
2986   level0_non_overlapping_ = true;
2987   if (level_files_brief_.size() == 0) {
2988     return;
2989   }
2990 
2991   // A copy of L0 files sorted by smallest key
2992   std::vector<FdWithKeyRange> level0_sorted_file(
2993       level_files_brief_[0].files,
2994       level_files_brief_[0].files + level_files_brief_[0].num_files);
2995   std::sort(level0_sorted_file.begin(), level0_sorted_file.end(),
2996             [this](const FdWithKeyRange& f1, const FdWithKeyRange& f2) -> bool {
2997               return (internal_comparator_->Compare(f1.smallest_key,
2998                                                     f2.smallest_key) < 0);
2999             });
3000 
3001   for (size_t i = 1; i < level0_sorted_file.size(); ++i) {
3002     FdWithKeyRange& f = level0_sorted_file[i];
3003     FdWithKeyRange& prev = level0_sorted_file[i - 1];
3004     if (internal_comparator_->Compare(prev.largest_key, f.smallest_key) >= 0) {
3005       level0_non_overlapping_ = false;
3006       break;
3007     }
3008   }
3009 }
3010 
GenerateBottommostFiles()3011 void VersionStorageInfo::GenerateBottommostFiles() {
3012   assert(!finalized_);
3013   assert(bottommost_files_.empty());
3014   for (size_t level = 0; level < level_files_brief_.size(); ++level) {
3015     for (size_t file_idx = 0; file_idx < level_files_brief_[level].num_files;
3016          ++file_idx) {
3017       const FdWithKeyRange& f = level_files_brief_[level].files[file_idx];
3018       int l0_file_idx;
3019       if (level == 0) {
3020         l0_file_idx = static_cast<int>(file_idx);
3021       } else {
3022         l0_file_idx = -1;
3023       }
3024       Slice smallest_user_key = ExtractUserKey(f.smallest_key);
3025       Slice largest_user_key = ExtractUserKey(f.largest_key);
3026       if (!RangeMightExistAfterSortedRun(smallest_user_key, largest_user_key,
3027                                          static_cast<int>(level),
3028                                          l0_file_idx)) {
3029         bottommost_files_.emplace_back(static_cast<int>(level),
3030                                        f.file_metadata);
3031       }
3032     }
3033   }
3034 }
3035 
UpdateOldestSnapshot(SequenceNumber seqnum)3036 void VersionStorageInfo::UpdateOldestSnapshot(SequenceNumber seqnum) {
3037   assert(seqnum >= oldest_snapshot_seqnum_);
3038   oldest_snapshot_seqnum_ = seqnum;
3039   if (oldest_snapshot_seqnum_ > bottommost_files_mark_threshold_) {
3040     ComputeBottommostFilesMarkedForCompaction();
3041   }
3042 }
3043 
ComputeBottommostFilesMarkedForCompaction()3044 void VersionStorageInfo::ComputeBottommostFilesMarkedForCompaction() {
3045   bottommost_files_marked_for_compaction_.clear();
3046   bottommost_files_mark_threshold_ = kMaxSequenceNumber;
3047   for (auto& level_and_file : bottommost_files_) {
3048     if (!level_and_file.second->being_compacted &&
3049         level_and_file.second->fd.largest_seqno != 0 &&
3050         level_and_file.second->num_deletions > 1) {
3051       // largest_seqno might be nonzero due to containing the final key in an
3052       // earlier compaction, whose seqnum we didn't zero out. Multiple deletions
3053       // ensures the file really contains deleted or overwritten keys.
3054       if (level_and_file.second->fd.largest_seqno < oldest_snapshot_seqnum_) {
3055         bottommost_files_marked_for_compaction_.push_back(level_and_file);
3056       } else {
3057         bottommost_files_mark_threshold_ =
3058             std::min(bottommost_files_mark_threshold_,
3059                      level_and_file.second->fd.largest_seqno);
3060       }
3061     }
3062   }
3063 }
3064 
Ref()3065 void Version::Ref() {
3066   ++refs_;
3067 }
3068 
Unref()3069 bool Version::Unref() {
3070   assert(refs_ >= 1);
3071   --refs_;
3072   if (refs_ == 0) {
3073     delete this;
3074     return true;
3075   }
3076   return false;
3077 }
3078 
OverlapInLevel(int level,const Slice * smallest_user_key,const Slice * largest_user_key)3079 bool VersionStorageInfo::OverlapInLevel(int level,
3080                                         const Slice* smallest_user_key,
3081                                         const Slice* largest_user_key) {
3082   if (level >= num_non_empty_levels_) {
3083     // empty level, no overlap
3084     return false;
3085   }
3086   return SomeFileOverlapsRange(*internal_comparator_, (level > 0),
3087                                level_files_brief_[level], smallest_user_key,
3088                                largest_user_key);
3089 }
3090 
3091 // Store in "*inputs" all files in "level" that overlap [begin,end]
3092 // If hint_index is specified, then it points to a file in the
3093 // overlapping range.
3094 // The file_index returns a pointer to any file in an overlapping range.
GetOverlappingInputs(int level,const InternalKey * begin,const InternalKey * end,std::vector<FileMetaData * > * inputs,int hint_index,int * file_index,bool expand_range,InternalKey ** next_smallest) const3095 void VersionStorageInfo::GetOverlappingInputs(
3096     int level, const InternalKey* begin, const InternalKey* end,
3097     std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
3098     bool expand_range, InternalKey** next_smallest) const {
3099   if (level >= num_non_empty_levels_) {
3100     // this level is empty, no overlapping inputs
3101     return;
3102   }
3103 
3104   inputs->clear();
3105   if (file_index) {
3106     *file_index = -1;
3107   }
3108   const Comparator* user_cmp = user_comparator_;
3109   if (level > 0) {
3110     GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs, hint_index,
3111                                           file_index, false, next_smallest);
3112     return;
3113   }
3114 
3115   if (next_smallest) {
3116     // next_smallest key only makes sense for non-level 0, where files are
3117     // non-overlapping
3118     *next_smallest = nullptr;
3119   }
3120 
3121   Slice user_begin, user_end;
3122   if (begin != nullptr) {
3123     user_begin = begin->user_key();
3124   }
3125   if (end != nullptr) {
3126     user_end = end->user_key();
3127   }
3128 
3129   // index stores the file index need to check.
3130   std::list<size_t> index;
3131   for (size_t i = 0; i < level_files_brief_[level].num_files; i++) {
3132     index.emplace_back(i);
3133   }
3134 
3135   while (!index.empty()) {
3136     bool found_overlapping_file = false;
3137     auto iter = index.begin();
3138     while (iter != index.end()) {
3139       FdWithKeyRange* f = &(level_files_brief_[level].files[*iter]);
3140       const Slice file_start = ExtractUserKey(f->smallest_key);
3141       const Slice file_limit = ExtractUserKey(f->largest_key);
3142       if (begin != nullptr &&
3143           user_cmp->CompareWithoutTimestamp(file_limit, user_begin) < 0) {
3144         // "f" is completely before specified range; skip it
3145         iter++;
3146       } else if (end != nullptr &&
3147                  user_cmp->CompareWithoutTimestamp(file_start, user_end) > 0) {
3148         // "f" is completely after specified range; skip it
3149         iter++;
3150       } else {
3151         // if overlap
3152         inputs->emplace_back(files_[level][*iter]);
3153         found_overlapping_file = true;
3154         // record the first file index.
3155         if (file_index && *file_index == -1) {
3156           *file_index = static_cast<int>(*iter);
3157         }
3158         // the related file is overlap, erase to avoid checking again.
3159         iter = index.erase(iter);
3160         if (expand_range) {
3161           if (begin != nullptr &&
3162               user_cmp->CompareWithoutTimestamp(file_start, user_begin) < 0) {
3163             user_begin = file_start;
3164           }
3165           if (end != nullptr &&
3166               user_cmp->CompareWithoutTimestamp(file_limit, user_end) > 0) {
3167             user_end = file_limit;
3168           }
3169         }
3170       }
3171     }
3172     // if all the files left are not overlap, break
3173     if (!found_overlapping_file) {
3174       break;
3175     }
3176   }
3177 }
3178 
3179 // Store in "*inputs" files in "level" that within range [begin,end]
3180 // Guarantee a "clean cut" boundary between the files in inputs
3181 // and the surrounding files and the maxinum number of files.
3182 // This will ensure that no parts of a key are lost during compaction.
3183 // If hint_index is specified, then it points to a file in the range.
3184 // The file_index returns a pointer to any file in an overlapping range.
GetCleanInputsWithinInterval(int level,const InternalKey * begin,const InternalKey * end,std::vector<FileMetaData * > * inputs,int hint_index,int * file_index) const3185 void VersionStorageInfo::GetCleanInputsWithinInterval(
3186     int level, const InternalKey* begin, const InternalKey* end,
3187     std::vector<FileMetaData*>* inputs, int hint_index, int* file_index) const {
3188   inputs->clear();
3189   if (file_index) {
3190     *file_index = -1;
3191   }
3192   if (level >= num_non_empty_levels_ || level == 0 ||
3193       level_files_brief_[level].num_files == 0) {
3194     // this level is empty, no inputs within range
3195     // also don't support clean input interval within L0
3196     return;
3197   }
3198 
3199   GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs,
3200                                         hint_index, file_index,
3201                                         true /* within_interval */);
3202 }
3203 
3204 // Store in "*inputs" all files in "level" that overlap [begin,end]
3205 // Employ binary search to find at least one file that overlaps the
3206 // specified range. From that file, iterate backwards and
3207 // forwards to find all overlapping files.
3208 // if within_range is set, then only store the maximum clean inputs
3209 // within range [begin, end]. "clean" means there is a boundary
3210 // between the files in "*inputs" and the surrounding files
GetOverlappingInputsRangeBinarySearch(int level,const InternalKey * begin,const InternalKey * end,std::vector<FileMetaData * > * inputs,int hint_index,int * file_index,bool within_interval,InternalKey ** next_smallest) const3211 void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
3212     int level, const InternalKey* begin, const InternalKey* end,
3213     std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
3214     bool within_interval, InternalKey** next_smallest) const {
3215   assert(level > 0);
3216 
3217   auto user_cmp = user_comparator_;
3218   const FdWithKeyRange* files = level_files_brief_[level].files;
3219   const int num_files = static_cast<int>(level_files_brief_[level].num_files);
3220 
3221   // begin to use binary search to find lower bound
3222   // and upper bound.
3223   int start_index = 0;
3224   int end_index = num_files;
3225 
3226   if (begin != nullptr) {
3227     // if within_interval is true, with file_key would find
3228     // not overlapping ranges in std::lower_bound.
3229     auto cmp = [&user_cmp, &within_interval](const FdWithKeyRange& f,
3230                                              const InternalKey* k) {
3231       auto& file_key = within_interval ? f.file_metadata->smallest
3232                                        : f.file_metadata->largest;
3233       return sstableKeyCompare(user_cmp, file_key, *k) < 0;
3234     };
3235 
3236     start_index = static_cast<int>(
3237         std::lower_bound(files,
3238                          files + (hint_index == -1 ? num_files : hint_index),
3239                          begin, cmp) -
3240         files);
3241 
3242     if (start_index > 0 && within_interval) {
3243       bool is_overlapping = true;
3244       while (is_overlapping && start_index < num_files) {
3245         auto& pre_limit = files[start_index - 1].file_metadata->largest;
3246         auto& cur_start = files[start_index].file_metadata->smallest;
3247         is_overlapping = sstableKeyCompare(user_cmp, pre_limit, cur_start) == 0;
3248         start_index += is_overlapping;
3249       }
3250     }
3251   }
3252 
3253   if (end != nullptr) {
3254     // if within_interval is true, with file_key would find
3255     // not overlapping ranges in std::upper_bound.
3256     auto cmp = [&user_cmp, &within_interval](const InternalKey* k,
3257                                              const FdWithKeyRange& f) {
3258       auto& file_key = within_interval ? f.file_metadata->largest
3259                                        : f.file_metadata->smallest;
3260       return sstableKeyCompare(user_cmp, *k, file_key) < 0;
3261     };
3262 
3263     end_index = static_cast<int>(
3264         std::upper_bound(files + start_index, files + num_files, end, cmp) -
3265         files);
3266 
3267     if (end_index < num_files && within_interval) {
3268       bool is_overlapping = true;
3269       while (is_overlapping && end_index > start_index) {
3270         auto& next_start = files[end_index].file_metadata->smallest;
3271         auto& cur_limit = files[end_index - 1].file_metadata->largest;
3272         is_overlapping =
3273             sstableKeyCompare(user_cmp, cur_limit, next_start) == 0;
3274         end_index -= is_overlapping;
3275       }
3276     }
3277   }
3278 
3279   assert(start_index <= end_index);
3280 
3281   // If there were no overlapping files, return immediately.
3282   if (start_index == end_index) {
3283     if (next_smallest) {
3284       *next_smallest = nullptr;
3285     }
3286     return;
3287   }
3288 
3289   assert(start_index < end_index);
3290 
3291   // returns the index where an overlap is found
3292   if (file_index) {
3293     *file_index = start_index;
3294   }
3295 
3296   // insert overlapping files into vector
3297   for (int i = start_index; i < end_index; i++) {
3298     inputs->push_back(files_[level][i]);
3299   }
3300 
3301   if (next_smallest != nullptr) {
3302     // Provide the next key outside the range covered by inputs
3303     if (end_index < static_cast<int>(files_[level].size())) {
3304       **next_smallest = files_[level][end_index]->smallest;
3305     } else {
3306       *next_smallest = nullptr;
3307     }
3308   }
3309 }
3310 
NumLevelBytes(int level) const3311 uint64_t VersionStorageInfo::NumLevelBytes(int level) const {
3312   assert(level >= 0);
3313   assert(level < num_levels());
3314   return TotalFileSize(files_[level]);
3315 }
3316 
LevelSummary(LevelSummaryStorage * scratch) const3317 const char* VersionStorageInfo::LevelSummary(
3318     LevelSummaryStorage* scratch) const {
3319   int len = 0;
3320   if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
3321     assert(base_level_ < static_cast<int>(level_max_bytes_.size()));
3322     if (level_multiplier_ != 0.0) {
3323       len = snprintf(
3324           scratch->buffer, sizeof(scratch->buffer),
3325           "base level %d level multiplier %.2f max bytes base %" PRIu64 " ",
3326           base_level_, level_multiplier_, level_max_bytes_[base_level_]);
3327     }
3328   }
3329   len +=
3330       snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "files[");
3331   for (int i = 0; i < num_levels(); i++) {
3332     int sz = sizeof(scratch->buffer) - len;
3333     int ret = snprintf(scratch->buffer + len, sz, "%d ", int(files_[i].size()));
3334     if (ret < 0 || ret >= sz) break;
3335     len += ret;
3336   }
3337   if (len > 0) {
3338     // overwrite the last space
3339     --len;
3340   }
3341   len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
3342                   "] max score %.2f", compaction_score_[0]);
3343 
3344   if (!files_marked_for_compaction_.empty()) {
3345     snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
3346              " (%" ROCKSDB_PRIszt " files need compaction)",
3347              files_marked_for_compaction_.size());
3348   }
3349 
3350   return scratch->buffer;
3351 }
3352 
LevelFileSummary(FileSummaryStorage * scratch,int level) const3353 const char* VersionStorageInfo::LevelFileSummary(FileSummaryStorage* scratch,
3354                                                  int level) const {
3355   int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size[");
3356   for (const auto& f : files_[level]) {
3357     int sz = sizeof(scratch->buffer) - len;
3358     char sztxt[16];
3359     AppendHumanBytes(f->fd.GetFileSize(), sztxt, sizeof(sztxt));
3360     int ret = snprintf(scratch->buffer + len, sz,
3361                        "#%" PRIu64 "(seq=%" PRIu64 ",sz=%s,%d) ",
3362                        f->fd.GetNumber(), f->fd.smallest_seqno, sztxt,
3363                        static_cast<int>(f->being_compacted));
3364     if (ret < 0 || ret >= sz)
3365       break;
3366     len += ret;
3367   }
3368   // overwrite the last space (only if files_[level].size() is non-zero)
3369   if (files_[level].size() && len > 0) {
3370     --len;
3371   }
3372   snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]");
3373   return scratch->buffer;
3374 }
3375 
MaxNextLevelOverlappingBytes()3376 int64_t VersionStorageInfo::MaxNextLevelOverlappingBytes() {
3377   uint64_t result = 0;
3378   std::vector<FileMetaData*> overlaps;
3379   for (int level = 1; level < num_levels() - 1; level++) {
3380     for (const auto& f : files_[level]) {
3381       GetOverlappingInputs(level + 1, &f->smallest, &f->largest, &overlaps);
3382       const uint64_t sum = TotalFileSize(overlaps);
3383       if (sum > result) {
3384         result = sum;
3385       }
3386     }
3387   }
3388   return result;
3389 }
3390 
MaxBytesForLevel(int level) const3391 uint64_t VersionStorageInfo::MaxBytesForLevel(int level) const {
3392   // Note: the result for level zero is not really used since we set
3393   // the level-0 compaction threshold based on number of files.
3394   assert(level >= 0);
3395   assert(level < static_cast<int>(level_max_bytes_.size()));
3396   return level_max_bytes_[level];
3397 }
3398 
CalculateBaseBytes(const ImmutableOptions & ioptions,const MutableCFOptions & options)3399 void VersionStorageInfo::CalculateBaseBytes(const ImmutableOptions& ioptions,
3400                                             const MutableCFOptions& options) {
3401   // Special logic to set number of sorted runs.
3402   // It is to match the previous behavior when all files are in L0.
3403   int num_l0_count = static_cast<int>(files_[0].size());
3404   if (compaction_style_ == kCompactionStyleUniversal) {
3405     // For universal compaction, we use level0 score to indicate
3406     // compaction score for the whole DB. Adding other levels as if
3407     // they are L0 files.
3408     for (int i = 1; i < num_levels(); i++) {
3409       if (!files_[i].empty()) {
3410         num_l0_count++;
3411       }
3412     }
3413   }
3414   set_l0_delay_trigger_count(num_l0_count);
3415 
3416   level_max_bytes_.resize(ioptions.num_levels);
3417   if (!ioptions.level_compaction_dynamic_level_bytes) {
3418     base_level_ = (ioptions.compaction_style == kCompactionStyleLevel) ? 1 : -1;
3419 
3420     // Calculate for static bytes base case
3421     for (int i = 0; i < ioptions.num_levels; ++i) {
3422       if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) {
3423         level_max_bytes_[i] = options.max_bytes_for_level_base;
3424       } else if (i > 1) {
3425         level_max_bytes_[i] = MultiplyCheckOverflow(
3426             MultiplyCheckOverflow(level_max_bytes_[i - 1],
3427                                   options.max_bytes_for_level_multiplier),
3428             options.MaxBytesMultiplerAdditional(i - 1));
3429       } else {
3430         level_max_bytes_[i] = options.max_bytes_for_level_base;
3431       }
3432     }
3433   } else {
3434     uint64_t max_level_size = 0;
3435 
3436     int first_non_empty_level = -1;
3437     // Find size of non-L0 level of most data.
3438     // Cannot use the size of the last level because it can be empty or less
3439     // than previous levels after compaction.
3440     for (int i = 1; i < num_levels_; i++) {
3441       uint64_t total_size = 0;
3442       for (const auto& f : files_[i]) {
3443         total_size += f->fd.GetFileSize();
3444       }
3445       if (total_size > 0 && first_non_empty_level == -1) {
3446         first_non_empty_level = i;
3447       }
3448       if (total_size > max_level_size) {
3449         max_level_size = total_size;
3450       }
3451     }
3452 
3453     // Prefill every level's max bytes to disallow compaction from there.
3454     for (int i = 0; i < num_levels_; i++) {
3455       level_max_bytes_[i] = std::numeric_limits<uint64_t>::max();
3456     }
3457 
3458     if (max_level_size == 0) {
3459       // No data for L1 and up. L0 compacts to last level directly.
3460       // No compaction from L1+ needs to be scheduled.
3461       base_level_ = num_levels_ - 1;
3462     } else {
3463       uint64_t l0_size = 0;
3464       for (const auto& f : files_[0]) {
3465         l0_size += f->fd.GetFileSize();
3466       }
3467 
3468       uint64_t base_bytes_max =
3469           std::max(options.max_bytes_for_level_base, l0_size);
3470       uint64_t base_bytes_min = static_cast<uint64_t>(
3471           base_bytes_max / options.max_bytes_for_level_multiplier);
3472 
3473       // Try whether we can make last level's target size to be max_level_size
3474       uint64_t cur_level_size = max_level_size;
3475       for (int i = num_levels_ - 2; i >= first_non_empty_level; i--) {
3476         // Round up after dividing
3477         cur_level_size = static_cast<uint64_t>(
3478             cur_level_size / options.max_bytes_for_level_multiplier);
3479       }
3480 
3481       // Calculate base level and its size.
3482       uint64_t base_level_size;
3483       if (cur_level_size <= base_bytes_min) {
3484         // Case 1. If we make target size of last level to be max_level_size,
3485         // target size of the first non-empty level would be smaller than
3486         // base_bytes_min. We set it be base_bytes_min.
3487         base_level_size = base_bytes_min + 1U;
3488         base_level_ = first_non_empty_level;
3489         ROCKS_LOG_INFO(ioptions.logger,
3490                        "More existing levels in DB than needed. "
3491                        "max_bytes_for_level_multiplier may not be guaranteed.");
3492       } else {
3493         // Find base level (where L0 data is compacted to).
3494         base_level_ = first_non_empty_level;
3495         while (base_level_ > 1 && cur_level_size > base_bytes_max) {
3496           --base_level_;
3497           cur_level_size = static_cast<uint64_t>(
3498               cur_level_size / options.max_bytes_for_level_multiplier);
3499         }
3500         if (cur_level_size > base_bytes_max) {
3501           // Even L1 will be too large
3502           assert(base_level_ == 1);
3503           base_level_size = base_bytes_max;
3504         } else {
3505           base_level_size = cur_level_size;
3506         }
3507       }
3508 
3509       level_multiplier_ = options.max_bytes_for_level_multiplier;
3510       assert(base_level_size > 0);
3511       if (l0_size > base_level_size &&
3512           (l0_size > options.max_bytes_for_level_base ||
3513            static_cast<int>(files_[0].size() / 2) >=
3514                options.level0_file_num_compaction_trigger)) {
3515         // We adjust the base level according to actual L0 size, and adjust
3516         // the level multiplier accordingly, when:
3517         //   1. the L0 size is larger than level size base, or
3518         //   2. number of L0 files reaches twice the L0->L1 compaction trigger
3519         // We don't do this otherwise to keep the LSM-tree structure stable
3520         // unless the L0 compaction is backlogged.
3521         base_level_size = l0_size;
3522         if (base_level_ == num_levels_ - 1) {
3523           level_multiplier_ = 1.0;
3524         } else {
3525           level_multiplier_ = std::pow(
3526               static_cast<double>(max_level_size) /
3527                   static_cast<double>(base_level_size),
3528               1.0 / static_cast<double>(num_levels_ - base_level_ - 1));
3529         }
3530       }
3531 
3532       uint64_t level_size = base_level_size;
3533       for (int i = base_level_; i < num_levels_; i++) {
3534         if (i > base_level_) {
3535           level_size = MultiplyCheckOverflow(level_size, level_multiplier_);
3536         }
3537         // Don't set any level below base_bytes_max. Otherwise, the LSM can
3538         // assume an hourglass shape where L1+ sizes are smaller than L0. This
3539         // causes compaction scoring, which depends on level sizes, to favor L1+
3540         // at the expense of L0, which may fill up and stall.
3541         level_max_bytes_[i] = std::max(level_size, base_bytes_max);
3542       }
3543     }
3544   }
3545 }
3546 
EstimateLiveDataSize() const3547 uint64_t VersionStorageInfo::EstimateLiveDataSize() const {
3548   // Estimate the live data size by adding up the size of a maximal set of
3549   // sst files with no range overlap in same or higher level. The less
3550   // compacted, the more optimistic (smaller) this estimate is. Also,
3551   // for multiple sorted runs within a level, file order will matter.
3552   uint64_t size = 0;
3553 
3554   auto ikey_lt = [this](InternalKey* x, InternalKey* y) {
3555     return internal_comparator_->Compare(*x, *y) < 0;
3556   };
3557   // (Ordered) map of largest keys in files being included in size estimate
3558   std::map<InternalKey*, FileMetaData*, decltype(ikey_lt)> ranges(ikey_lt);
3559 
3560   for (int l = num_levels_ - 1; l >= 0; l--) {
3561     bool found_end = false;
3562     for (auto file : files_[l]) {
3563       // Find the first file already included with largest key is larger than
3564       // the smallest key of `file`. If that file does not overlap with the
3565       // current file, none of the files in the map does. If there is
3566       // no potential overlap, we can safely insert the rest of this level
3567       // (if the level is not 0) into the map without checking again because
3568       // the elements in the level are sorted and non-overlapping.
3569       auto lb = (found_end && l != 0) ?
3570         ranges.end() : ranges.lower_bound(&file->smallest);
3571       found_end = (lb == ranges.end());
3572       if (found_end || internal_comparator_->Compare(
3573             file->largest, (*lb).second->smallest) < 0) {
3574           ranges.emplace_hint(lb, &file->largest, file);
3575           size += file->fd.file_size;
3576       }
3577     }
3578   }
3579   return size;
3580 }
3581 
RangeMightExistAfterSortedRun(const Slice & smallest_user_key,const Slice & largest_user_key,int last_level,int last_l0_idx)3582 bool VersionStorageInfo::RangeMightExistAfterSortedRun(
3583     const Slice& smallest_user_key, const Slice& largest_user_key,
3584     int last_level, int last_l0_idx) {
3585   assert((last_l0_idx != -1) == (last_level == 0));
3586   // TODO(ajkr): this preserves earlier behavior where we considered an L0 file
3587   // bottommost only if it's the oldest L0 file and there are no files on older
3588   // levels. It'd be better to consider it bottommost if there's no overlap in
3589   // older levels/files.
3590   if (last_level == 0 &&
3591       last_l0_idx != static_cast<int>(LevelFiles(0).size() - 1)) {
3592     return true;
3593   }
3594 
3595   // Checks whether there are files living beyond the `last_level`. If lower
3596   // levels have files, it checks for overlap between [`smallest_key`,
3597   // `largest_key`] and those files. Bottomlevel optimizations can be made if
3598   // there are no files in lower levels or if there is no overlap with the files
3599   // in the lower levels.
3600   for (int level = last_level + 1; level < num_levels(); level++) {
3601     // The range is not in the bottommost level if there are files in lower
3602     // levels when the `last_level` is 0 or if there are files in lower levels
3603     // which overlap with [`smallest_key`, `largest_key`].
3604     if (files_[level].size() > 0 &&
3605         (last_level == 0 ||
3606          OverlapInLevel(level, &smallest_user_key, &largest_user_key))) {
3607       return true;
3608     }
3609   }
3610   return false;
3611 }
3612 
AddLiveFiles(std::vector<uint64_t> * live_table_files,std::vector<uint64_t> * live_blob_files) const3613 void Version::AddLiveFiles(std::vector<uint64_t>* live_table_files,
3614                            std::vector<uint64_t>* live_blob_files) const {
3615   assert(live_table_files);
3616   assert(live_blob_files);
3617 
3618   for (int level = 0; level < storage_info_.num_levels(); ++level) {
3619     const auto& level_files = storage_info_.LevelFiles(level);
3620     for (const auto& meta : level_files) {
3621       assert(meta);
3622 
3623       live_table_files->emplace_back(meta->fd.GetNumber());
3624     }
3625   }
3626 
3627   const auto& blob_files = storage_info_.GetBlobFiles();
3628   for (const auto& pair : blob_files) {
3629     const auto& meta = pair.second;
3630     assert(meta);
3631 
3632     live_blob_files->emplace_back(meta->GetBlobFileNumber());
3633   }
3634 }
3635 
DebugString(bool hex,bool print_stats) const3636 std::string Version::DebugString(bool hex, bool print_stats) const {
3637   std::string r;
3638   for (int level = 0; level < storage_info_.num_levels_; level++) {
3639     // E.g.,
3640     //   --- level 1 ---
3641     //   17:123[1 .. 124]['a' .. 'd']
3642     //   20:43[124 .. 128]['e' .. 'g']
3643     //
3644     // if print_stats=true:
3645     //   17:123[1 .. 124]['a' .. 'd'](4096)
3646     r.append("--- level ");
3647     AppendNumberTo(&r, level);
3648     r.append(" --- version# ");
3649     AppendNumberTo(&r, version_number_);
3650     r.append(" ---\n");
3651     const std::vector<FileMetaData*>& files = storage_info_.files_[level];
3652     for (size_t i = 0; i < files.size(); i++) {
3653       r.push_back(' ');
3654       AppendNumberTo(&r, files[i]->fd.GetNumber());
3655       r.push_back(':');
3656       AppendNumberTo(&r, files[i]->fd.GetFileSize());
3657       r.append("[");
3658       AppendNumberTo(&r, files[i]->fd.smallest_seqno);
3659       r.append(" .. ");
3660       AppendNumberTo(&r, files[i]->fd.largest_seqno);
3661       r.append("]");
3662       r.append("[");
3663       r.append(files[i]->smallest.DebugString(hex));
3664       r.append(" .. ");
3665       r.append(files[i]->largest.DebugString(hex));
3666       r.append("]");
3667       if (files[i]->oldest_blob_file_number != kInvalidBlobFileNumber) {
3668         r.append(" blob_file:");
3669         AppendNumberTo(&r, files[i]->oldest_blob_file_number);
3670       }
3671       if (print_stats) {
3672         r.append("(");
3673         r.append(ToString(
3674             files[i]->stats.num_reads_sampled.load(std::memory_order_relaxed)));
3675         r.append(")");
3676       }
3677       r.append("\n");
3678     }
3679   }
3680 
3681   const auto& blob_files = storage_info_.GetBlobFiles();
3682   if (!blob_files.empty()) {
3683     r.append("--- blob files --- version# ");
3684     AppendNumberTo(&r, version_number_);
3685     r.append(" ---\n");
3686     for (const auto& pair : blob_files) {
3687       const auto& blob_file_meta = pair.second;
3688       assert(blob_file_meta);
3689 
3690       r.append(blob_file_meta->DebugString());
3691       r.push_back('\n');
3692     }
3693   }
3694 
3695   return r;
3696 }
3697 
3698 // this is used to batch writes to the manifest file
3699 struct VersionSet::ManifestWriter {
3700   Status status;
3701   bool done;
3702   InstrumentedCondVar cv;
3703   ColumnFamilyData* cfd;
3704   const MutableCFOptions mutable_cf_options;
3705   const autovector<VersionEdit*>& edit_list;
3706   const std::function<void(const Status&)> manifest_write_callback;
3707 
ManifestWriterROCKSDB_NAMESPACE::VersionSet::ManifestWriter3708   explicit ManifestWriter(
3709       InstrumentedMutex* mu, ColumnFamilyData* _cfd,
3710       const MutableCFOptions& cf_options, const autovector<VersionEdit*>& e,
3711       const std::function<void(const Status&)>& manifest_wcb)
3712       : done(false),
3713         cv(mu),
3714         cfd(_cfd),
3715         mutable_cf_options(cf_options),
3716         edit_list(e),
3717         manifest_write_callback(manifest_wcb) {}
~ManifestWriterROCKSDB_NAMESPACE::VersionSet::ManifestWriter3718   ~ManifestWriter() { status.PermitUncheckedError(); }
3719 
IsAllWalEditsROCKSDB_NAMESPACE::VersionSet::ManifestWriter3720   bool IsAllWalEdits() const {
3721     bool all_wal_edits = true;
3722     for (const auto& e : edit_list) {
3723       if (!e->IsWalManipulation()) {
3724         all_wal_edits = false;
3725         break;
3726       }
3727     }
3728     return all_wal_edits;
3729   }
3730 };
3731 
AddEdit(VersionEdit * edit)3732 Status AtomicGroupReadBuffer::AddEdit(VersionEdit* edit) {
3733   assert(edit);
3734   if (edit->is_in_atomic_group_) {
3735     TEST_SYNC_POINT("AtomicGroupReadBuffer::AddEdit:AtomicGroup");
3736     if (replay_buffer_.empty()) {
3737       replay_buffer_.resize(edit->remaining_entries_ + 1);
3738       TEST_SYNC_POINT_CALLBACK(
3739           "AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", edit);
3740     }
3741     read_edits_in_atomic_group_++;
3742     if (read_edits_in_atomic_group_ + edit->remaining_entries_ !=
3743         static_cast<uint32_t>(replay_buffer_.size())) {
3744       TEST_SYNC_POINT_CALLBACK(
3745           "AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize", edit);
3746       return Status::Corruption("corrupted atomic group");
3747     }
3748     replay_buffer_[read_edits_in_atomic_group_ - 1] = *edit;
3749     if (read_edits_in_atomic_group_ == replay_buffer_.size()) {
3750       TEST_SYNC_POINT_CALLBACK(
3751           "AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", edit);
3752       return Status::OK();
3753     }
3754     return Status::OK();
3755   }
3756 
3757   // A normal edit.
3758   if (!replay_buffer().empty()) {
3759     TEST_SYNC_POINT_CALLBACK(
3760         "AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits", edit);
3761     return Status::Corruption("corrupted atomic group");
3762   }
3763   return Status::OK();
3764 }
3765 
IsFull() const3766 bool AtomicGroupReadBuffer::IsFull() const {
3767   return read_edits_in_atomic_group_ == replay_buffer_.size();
3768 }
3769 
IsEmpty() const3770 bool AtomicGroupReadBuffer::IsEmpty() const { return replay_buffer_.empty(); }
3771 
Clear()3772 void AtomicGroupReadBuffer::Clear() {
3773   read_edits_in_atomic_group_ = 0;
3774   replay_buffer_.clear();
3775 }
3776 
VersionSet(const std::string & dbname,const ImmutableDBOptions * _db_options,const FileOptions & storage_options,Cache * table_cache,WriteBufferManager * write_buffer_manager,WriteController * write_controller,BlockCacheTracer * const block_cache_tracer,const std::shared_ptr<IOTracer> & io_tracer)3777 VersionSet::VersionSet(const std::string& dbname,
3778                        const ImmutableDBOptions* _db_options,
3779                        const FileOptions& storage_options, Cache* table_cache,
3780                        WriteBufferManager* write_buffer_manager,
3781                        WriteController* write_controller,
3782                        BlockCacheTracer* const block_cache_tracer,
3783                        const std::shared_ptr<IOTracer>& io_tracer)
3784     : column_family_set_(
3785           new ColumnFamilySet(dbname, _db_options, storage_options, table_cache,
3786                               write_buffer_manager, write_controller,
3787                               block_cache_tracer, io_tracer)),
3788       table_cache_(table_cache),
3789       env_(_db_options->env),
3790       fs_(_db_options->fs, io_tracer),
3791       clock_(_db_options->clock),
3792       dbname_(dbname),
3793       db_options_(_db_options),
3794       next_file_number_(2),
3795       manifest_file_number_(0),  // Filled by Recover()
3796       options_file_number_(0),
3797       pending_manifest_file_number_(0),
3798       last_sequence_(0),
3799       last_allocated_sequence_(0),
3800       last_published_sequence_(0),
3801       prev_log_number_(0),
3802       current_version_number_(0),
3803       manifest_file_size_(0),
3804       file_options_(storage_options),
3805       block_cache_tracer_(block_cache_tracer),
3806       io_tracer_(io_tracer) {}
3807 
~VersionSet()3808 VersionSet::~VersionSet() {
3809   // we need to delete column_family_set_ because its destructor depends on
3810   // VersionSet
3811   column_family_set_.reset();
3812   for (auto& file : obsolete_files_) {
3813     if (file.metadata->table_reader_handle) {
3814       table_cache_->Release(file.metadata->table_reader_handle);
3815       TableCache::Evict(table_cache_, file.metadata->fd.GetNumber());
3816     }
3817     file.DeleteMetadata();
3818   }
3819   obsolete_files_.clear();
3820   io_status_.PermitUncheckedError();
3821 }
3822 
Reset()3823 void VersionSet::Reset() {
3824   if (column_family_set_) {
3825     WriteBufferManager* wbm = column_family_set_->write_buffer_manager();
3826     WriteController* wc = column_family_set_->write_controller();
3827     column_family_set_.reset(
3828         new ColumnFamilySet(dbname_, db_options_, file_options_, table_cache_,
3829                             wbm, wc, block_cache_tracer_, io_tracer_));
3830   }
3831   db_id_.clear();
3832   next_file_number_.store(2);
3833   min_log_number_to_keep_2pc_.store(0);
3834   manifest_file_number_ = 0;
3835   options_file_number_ = 0;
3836   pending_manifest_file_number_ = 0;
3837   last_sequence_.store(0);
3838   last_allocated_sequence_.store(0);
3839   last_published_sequence_.store(0);
3840   prev_log_number_ = 0;
3841   descriptor_log_.reset();
3842   current_version_number_ = 0;
3843   manifest_writers_.clear();
3844   manifest_file_size_ = 0;
3845   obsolete_files_.clear();
3846   obsolete_manifests_.clear();
3847   wals_.Reset();
3848 }
3849 
AppendVersion(ColumnFamilyData * column_family_data,Version * v)3850 void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
3851                                Version* v) {
3852   // compute new compaction score
3853   v->storage_info()->ComputeCompactionScore(
3854       *column_family_data->ioptions(),
3855       *column_family_data->GetLatestMutableCFOptions());
3856 
3857   // Mark v finalized
3858   v->storage_info_.SetFinalized();
3859 
3860   // Make "v" current
3861   assert(v->refs_ == 0);
3862   Version* current = column_family_data->current();
3863   assert(v != current);
3864   if (current != nullptr) {
3865     assert(current->refs_ > 0);
3866     current->Unref();
3867   }
3868   column_family_data->SetCurrent(v);
3869   v->Ref();
3870 
3871   // Append to linked list
3872   v->prev_ = column_family_data->dummy_versions()->prev_;
3873   v->next_ = column_family_data->dummy_versions();
3874   v->prev_->next_ = v;
3875   v->next_->prev_ = v;
3876 }
3877 
ProcessManifestWrites(std::deque<ManifestWriter> & writers,InstrumentedMutex * mu,FSDirectory * db_directory,bool new_descriptor_log,const ColumnFamilyOptions * new_cf_options)3878 Status VersionSet::ProcessManifestWrites(
3879     std::deque<ManifestWriter>& writers, InstrumentedMutex* mu,
3880     FSDirectory* db_directory, bool new_descriptor_log,
3881     const ColumnFamilyOptions* new_cf_options) {
3882   mu->AssertHeld();
3883   assert(!writers.empty());
3884   ManifestWriter& first_writer = writers.front();
3885   ManifestWriter* last_writer = &first_writer;
3886 
3887   assert(!manifest_writers_.empty());
3888   assert(manifest_writers_.front() == &first_writer);
3889 
3890   autovector<VersionEdit*> batch_edits;
3891   autovector<Version*> versions;
3892   autovector<const MutableCFOptions*> mutable_cf_options_ptrs;
3893   std::vector<std::unique_ptr<BaseReferencedVersionBuilder>> builder_guards;
3894 
3895   if (first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
3896     // No group commits for column family add or drop
3897     LogAndApplyCFHelper(first_writer.edit_list.front());
3898     batch_edits.push_back(first_writer.edit_list.front());
3899   } else {
3900     auto it = manifest_writers_.cbegin();
3901     size_t group_start = std::numeric_limits<size_t>::max();
3902     while (it != manifest_writers_.cend()) {
3903       if ((*it)->edit_list.front()->IsColumnFamilyManipulation()) {
3904         // no group commits for column family add or drop
3905         break;
3906       }
3907       last_writer = *(it++);
3908       assert(last_writer != nullptr);
3909       assert(last_writer->cfd != nullptr);
3910       if (last_writer->cfd->IsDropped()) {
3911         // If we detect a dropped CF at this point, and the corresponding
3912         // version edits belong to an atomic group, then we need to find out
3913         // the preceding version edits in the same atomic group, and update
3914         // their `remaining_entries_` member variable because we are NOT going
3915         // to write the version edits' of dropped CF to the MANIFEST. If we
3916         // don't update, then Recover can report corrupted atomic group because
3917         // the `remaining_entries_` do not match.
3918         if (!batch_edits.empty()) {
3919           if (batch_edits.back()->is_in_atomic_group_ &&
3920               batch_edits.back()->remaining_entries_ > 0) {
3921             assert(group_start < batch_edits.size());
3922             const auto& edit_list = last_writer->edit_list;
3923             size_t k = 0;
3924             while (k < edit_list.size()) {
3925               if (!edit_list[k]->is_in_atomic_group_) {
3926                 break;
3927               } else if (edit_list[k]->remaining_entries_ == 0) {
3928                 ++k;
3929                 break;
3930               }
3931               ++k;
3932             }
3933             for (auto i = group_start; i < batch_edits.size(); ++i) {
3934               assert(static_cast<uint32_t>(k) <=
3935                      batch_edits.back()->remaining_entries_);
3936               batch_edits[i]->remaining_entries_ -= static_cast<uint32_t>(k);
3937             }
3938           }
3939         }
3940         continue;
3941       }
3942       // We do a linear search on versions because versions is small.
3943       // TODO(yanqin) maybe consider unordered_map
3944       Version* version = nullptr;
3945       VersionBuilder* builder = nullptr;
3946       for (int i = 0; i != static_cast<int>(versions.size()); ++i) {
3947         uint32_t cf_id = last_writer->cfd->GetID();
3948         if (versions[i]->cfd()->GetID() == cf_id) {
3949           version = versions[i];
3950           assert(!builder_guards.empty() &&
3951                  builder_guards.size() == versions.size());
3952           builder = builder_guards[i]->version_builder();
3953           TEST_SYNC_POINT_CALLBACK(
3954               "VersionSet::ProcessManifestWrites:SameColumnFamily", &cf_id);
3955           break;
3956         }
3957       }
3958       if (version == nullptr) {
3959         // WAL manipulations do not need to be applied to versions.
3960         if (!last_writer->IsAllWalEdits()) {
3961           version = new Version(last_writer->cfd, this, file_options_,
3962                                 last_writer->mutable_cf_options, io_tracer_,
3963                                 current_version_number_++);
3964           versions.push_back(version);
3965           mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options);
3966           builder_guards.emplace_back(
3967               new BaseReferencedVersionBuilder(last_writer->cfd));
3968           builder = builder_guards.back()->version_builder();
3969         }
3970         assert(last_writer->IsAllWalEdits() || builder);
3971         assert(last_writer->IsAllWalEdits() || version);
3972         TEST_SYNC_POINT_CALLBACK("VersionSet::ProcessManifestWrites:NewVersion",
3973                                  version);
3974       }
3975       for (const auto& e : last_writer->edit_list) {
3976         if (e->is_in_atomic_group_) {
3977           if (batch_edits.empty() || !batch_edits.back()->is_in_atomic_group_ ||
3978               (batch_edits.back()->is_in_atomic_group_ &&
3979                batch_edits.back()->remaining_entries_ == 0)) {
3980             group_start = batch_edits.size();
3981           }
3982         } else if (group_start != std::numeric_limits<size_t>::max()) {
3983           group_start = std::numeric_limits<size_t>::max();
3984         }
3985         Status s = LogAndApplyHelper(last_writer->cfd, builder, e, mu);
3986         if (!s.ok()) {
3987           // free up the allocated memory
3988           for (auto v : versions) {
3989             delete v;
3990           }
3991           return s;
3992         }
3993         batch_edits.push_back(e);
3994       }
3995     }
3996     for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
3997       assert(!builder_guards.empty() &&
3998              builder_guards.size() == versions.size());
3999       auto* builder = builder_guards[i]->version_builder();
4000       Status s = builder->SaveTo(versions[i]->storage_info());
4001       if (!s.ok()) {
4002         // free up the allocated memory
4003         for (auto v : versions) {
4004           delete v;
4005         }
4006         return s;
4007       }
4008     }
4009   }
4010 
4011 #ifndef NDEBUG
4012   // Verify that version edits of atomic groups have correct
4013   // remaining_entries_.
4014   size_t k = 0;
4015   while (k < batch_edits.size()) {
4016     while (k < batch_edits.size() && !batch_edits[k]->is_in_atomic_group_) {
4017       ++k;
4018     }
4019     if (k == batch_edits.size()) {
4020       break;
4021     }
4022     size_t i = k;
4023     while (i < batch_edits.size()) {
4024       if (!batch_edits[i]->is_in_atomic_group_) {
4025         break;
4026       }
4027       assert(i - k + batch_edits[i]->remaining_entries_ ==
4028              batch_edits[k]->remaining_entries_);
4029       if (batch_edits[i]->remaining_entries_ == 0) {
4030         ++i;
4031         break;
4032       }
4033       ++i;
4034     }
4035     assert(batch_edits[i - 1]->is_in_atomic_group_);
4036     assert(0 == batch_edits[i - 1]->remaining_entries_);
4037     std::vector<VersionEdit*> tmp;
4038     for (size_t j = k; j != i; ++j) {
4039       tmp.emplace_back(batch_edits[j]);
4040     }
4041     TEST_SYNC_POINT_CALLBACK(
4042         "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", &tmp);
4043     k = i;
4044   }
4045 #endif  // NDEBUG
4046 
4047   assert(pending_manifest_file_number_ == 0);
4048   if (!descriptor_log_ ||
4049       manifest_file_size_ > db_options_->max_manifest_file_size) {
4050     TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:BeforeNewManifest");
4051     new_descriptor_log = true;
4052   } else {
4053     pending_manifest_file_number_ = manifest_file_number_;
4054   }
4055 
4056   // Local cached copy of state variable(s). WriteCurrentStateToManifest()
4057   // reads its content after releasing db mutex to avoid race with
4058   // SwitchMemtable().
4059   std::unordered_map<uint32_t, MutableCFState> curr_state;
4060   VersionEdit wal_additions;
4061   if (new_descriptor_log) {
4062     pending_manifest_file_number_ = NewFileNumber();
4063     batch_edits.back()->SetNextFile(next_file_number_.load());
4064 
4065     // if we are writing out new snapshot make sure to persist max column
4066     // family.
4067     if (column_family_set_->GetMaxColumnFamily() > 0) {
4068       first_writer.edit_list.front()->SetMaxColumnFamily(
4069           column_family_set_->GetMaxColumnFamily());
4070     }
4071     for (const auto* cfd : *column_family_set_) {
4072       assert(curr_state.find(cfd->GetID()) == curr_state.end());
4073       curr_state.emplace(std::make_pair(
4074           cfd->GetID(),
4075           MutableCFState(cfd->GetLogNumber(), cfd->GetFullHistoryTsLow())));
4076     }
4077 
4078     for (const auto& wal : wals_.GetWals()) {
4079       wal_additions.AddWal(wal.first, wal.second);
4080     }
4081   }
4082 
4083   uint64_t new_manifest_file_size = 0;
4084   Status s;
4085   IOStatus io_s;
4086   IOStatus manifest_io_status;
4087   {
4088     FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
4089     mu->Unlock();
4090 
4091     TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WriteManifest", nullptr);
4092     if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
4093       for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
4094         assert(!builder_guards.empty() &&
4095                builder_guards.size() == versions.size());
4096         assert(!mutable_cf_options_ptrs.empty() &&
4097                builder_guards.size() == versions.size());
4098         ColumnFamilyData* cfd = versions[i]->cfd_;
4099         s = builder_guards[i]->version_builder()->LoadTableHandlers(
4100             cfd->internal_stats(), 1 /* max_threads */,
4101             true /* prefetch_index_and_filter_in_cache */,
4102             false /* is_initial_load */,
4103             mutable_cf_options_ptrs[i]->prefix_extractor.get(),
4104             MaxFileSizeForL0MetaPin(*mutable_cf_options_ptrs[i]));
4105         if (!s.ok()) {
4106           if (db_options_->paranoid_checks) {
4107             break;
4108           }
4109           s = Status::OK();
4110         }
4111       }
4112     }
4113 
4114     if (s.ok() && new_descriptor_log) {
4115       // This is fine because everything inside of this block is serialized --
4116       // only one thread can be here at the same time
4117       // create new manifest file
4118       ROCKS_LOG_INFO(db_options_->info_log, "Creating manifest %" PRIu64 "\n",
4119                      pending_manifest_file_number_);
4120       std::string descriptor_fname =
4121           DescriptorFileName(dbname_, pending_manifest_file_number_);
4122       std::unique_ptr<FSWritableFile> descriptor_file;
4123       io_s = NewWritableFile(fs_.get(), descriptor_fname, &descriptor_file,
4124                              opt_file_opts);
4125       if (io_s.ok()) {
4126         descriptor_file->SetPreallocationBlockSize(
4127             db_options_->manifest_preallocation_size);
4128         FileTypeSet tmp_set = db_options_->checksum_handoff_file_types;
4129         std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
4130             std::move(descriptor_file), descriptor_fname, opt_file_opts, clock_,
4131             io_tracer_, nullptr, db_options_->listeners, nullptr,
4132             tmp_set.Contains(FileType::kDescriptorFile)));
4133         descriptor_log_.reset(
4134             new log::Writer(std::move(file_writer), 0, false));
4135         s = WriteCurrentStateToManifest(curr_state, wal_additions,
4136                                         descriptor_log_.get(), io_s);
4137       } else {
4138         manifest_io_status = io_s;
4139         s = io_s;
4140       }
4141     }
4142 
4143     if (s.ok()) {
4144       if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
4145         for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
4146           versions[i]->PrepareApply(*mutable_cf_options_ptrs[i], true);
4147         }
4148       }
4149 
4150       // Write new records to MANIFEST log
4151 #ifndef NDEBUG
4152       size_t idx = 0;
4153 #endif
4154       for (auto& e : batch_edits) {
4155         std::string record;
4156         if (!e->EncodeTo(&record)) {
4157           s = Status::Corruption("Unable to encode VersionEdit:" +
4158                                  e->DebugString(true));
4159           break;
4160         }
4161         TEST_KILL_RANDOM_WITH_WEIGHT("VersionSet::LogAndApply:BeforeAddRecord",
4162                                      REDUCE_ODDS2);
4163 #ifndef NDEBUG
4164         if (batch_edits.size() > 1 && batch_edits.size() - 1 == idx) {
4165           TEST_SYNC_POINT_CALLBACK(
4166               "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
4167               nullptr);
4168           TEST_SYNC_POINT(
4169               "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:1");
4170         }
4171         ++idx;
4172 #endif /* !NDEBUG */
4173         io_s = descriptor_log_->AddRecord(record);
4174         if (!io_s.ok()) {
4175           s = io_s;
4176           manifest_io_status = io_s;
4177           break;
4178         }
4179       }
4180       if (s.ok()) {
4181         io_s = SyncManifest(db_options_, descriptor_log_->file());
4182         manifest_io_status = io_s;
4183         TEST_SYNC_POINT_CALLBACK(
4184             "VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s);
4185       }
4186       if (!io_s.ok()) {
4187         s = io_s;
4188         ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
4189                         s.ToString().c_str());
4190       }
4191     }
4192 
4193     // If we just created a new descriptor file, install it by writing a
4194     // new CURRENT file that points to it.
4195     if (s.ok()) {
4196       assert(manifest_io_status.ok());
4197     }
4198     if (s.ok() && new_descriptor_log) {
4199       io_s = SetCurrentFile(fs_.get(), dbname_, pending_manifest_file_number_,
4200                             db_directory);
4201       if (!io_s.ok()) {
4202         s = io_s;
4203       }
4204       TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest");
4205     }
4206 
4207     if (s.ok()) {
4208       // find offset in manifest file where this version is stored.
4209       new_manifest_file_size = descriptor_log_->file()->GetFileSize();
4210     }
4211 
4212     if (first_writer.edit_list.front()->is_column_family_drop_) {
4213       TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:0");
4214       TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1");
4215       TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2");
4216     }
4217 
4218     LogFlush(db_options_->info_log);
4219     TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifestDone");
4220     mu->Lock();
4221   }
4222 
4223   if (s.ok()) {
4224     // Apply WAL edits, DB mutex must be held.
4225     for (auto& e : batch_edits) {
4226       if (e->IsWalAddition()) {
4227         s = wals_.AddWals(e->GetWalAdditions());
4228       } else if (e->IsWalDeletion()) {
4229         s = wals_.DeleteWalsBefore(e->GetWalDeletion().GetLogNumber());
4230       }
4231       if (!s.ok()) {
4232         break;
4233       }
4234     }
4235   }
4236 
4237   if (!io_s.ok()) {
4238     if (io_status_.ok()) {
4239       io_status_ = io_s;
4240     }
4241   } else if (!io_status_.ok()) {
4242     io_status_ = io_s;
4243   }
4244 
4245   // Append the old manifest file to the obsolete_manifest_ list to be deleted
4246   // by PurgeObsoleteFiles later.
4247   if (s.ok() && new_descriptor_log) {
4248     obsolete_manifests_.emplace_back(
4249         DescriptorFileName("", manifest_file_number_));
4250   }
4251 
4252   // Install the new versions
4253   if (s.ok()) {
4254     if (first_writer.edit_list.front()->is_column_family_add_) {
4255       assert(batch_edits.size() == 1);
4256       assert(new_cf_options != nullptr);
4257       CreateColumnFamily(*new_cf_options, first_writer.edit_list.front());
4258     } else if (first_writer.edit_list.front()->is_column_family_drop_) {
4259       assert(batch_edits.size() == 1);
4260       first_writer.cfd->SetDropped();
4261       first_writer.cfd->UnrefAndTryDelete();
4262     } else {
4263       // Each version in versions corresponds to a column family.
4264       // For each column family, update its log number indicating that logs
4265       // with number smaller than this should be ignored.
4266       uint64_t last_min_log_number_to_keep = 0;
4267       for (const auto& e : batch_edits) {
4268         ColumnFamilyData* cfd = nullptr;
4269         if (!e->IsColumnFamilyManipulation()) {
4270           cfd = column_family_set_->GetColumnFamily(e->column_family_);
4271           // e would not have been added to batch_edits if its corresponding
4272           // column family is dropped.
4273           assert(cfd);
4274         }
4275         if (cfd) {
4276           if (e->has_log_number_ && e->log_number_ > cfd->GetLogNumber()) {
4277             cfd->SetLogNumber(e->log_number_);
4278           }
4279           if (e->HasFullHistoryTsLow()) {
4280             cfd->SetFullHistoryTsLow(e->GetFullHistoryTsLow());
4281           }
4282         }
4283         if (e->has_min_log_number_to_keep_) {
4284           last_min_log_number_to_keep =
4285               std::max(last_min_log_number_to_keep, e->min_log_number_to_keep_);
4286         }
4287       }
4288 
4289       if (last_min_log_number_to_keep != 0) {
4290         // Should only be set in 2PC mode.
4291         MarkMinLogNumberToKeep2PC(last_min_log_number_to_keep);
4292       }
4293 
4294       for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
4295         ColumnFamilyData* cfd = versions[i]->cfd_;
4296         AppendVersion(cfd, versions[i]);
4297       }
4298     }
4299     manifest_file_number_ = pending_manifest_file_number_;
4300     manifest_file_size_ = new_manifest_file_size;
4301     prev_log_number_ = first_writer.edit_list.front()->prev_log_number_;
4302   } else {
4303     std::string version_edits;
4304     for (auto& e : batch_edits) {
4305       version_edits += ("\n" + e->DebugString(true));
4306     }
4307     ROCKS_LOG_ERROR(db_options_->info_log,
4308                     "Error in committing version edit to MANIFEST: %s",
4309                     version_edits.c_str());
4310     for (auto v : versions) {
4311       delete v;
4312     }
4313     if (manifest_io_status.ok()) {
4314       manifest_file_number_ = pending_manifest_file_number_;
4315       manifest_file_size_ = new_manifest_file_size;
4316     }
4317     // If manifest append failed for whatever reason, the file could be
4318     // corrupted. So we need to force the next version update to start a
4319     // new manifest file.
4320     descriptor_log_.reset();
4321     // If manifest operations failed, then we know the CURRENT file still
4322     // points to the original MANIFEST. Therefore, we can safely delete the
4323     // new MANIFEST.
4324     // If manifest operations succeeded, and we are here, then it is possible
4325     // that renaming tmp file to CURRENT failed.
4326     //
4327     // On local POSIX-compliant FS, the CURRENT must point to the original
4328     // MANIFEST. We can delete the new MANIFEST for simplicity, but we can also
4329     // keep it. Future recovery will ignore this MANIFEST. It's also ok for the
4330     // process not to crash and continue using the db. Any future LogAndApply()
4331     // call will switch to a new MANIFEST and update CURRENT, still ignoring
4332     // this one.
4333     //
4334     // On non-local FS, it is
4335     // possible that the rename operation succeeded on the server (remote)
4336     // side, but the client somehow returns a non-ok status to RocksDB. Note
4337     // that this does not violate atomicity. Should we delete the new MANIFEST
4338     // successfully, a subsequent recovery attempt will likely see the CURRENT
4339     // pointing to the new MANIFEST, thus fail. We will not be able to open the
4340     // DB again. Therefore, if manifest operations succeed, we should keep the
4341     // the new MANIFEST. If the process proceeds, any future LogAndApply() call
4342     // will switch to a new MANIFEST and update CURRENT. If user tries to
4343     // re-open the DB,
4344     // a) CURRENT points to the new MANIFEST, and the new MANIFEST is present.
4345     // b) CURRENT points to the original MANIFEST, and the original MANIFEST
4346     //    also exists.
4347     if (new_descriptor_log && !manifest_io_status.ok()) {
4348       ROCKS_LOG_INFO(db_options_->info_log,
4349                      "Deleting manifest %" PRIu64 " current manifest %" PRIu64
4350                      "\n",
4351                      pending_manifest_file_number_, manifest_file_number_);
4352       Status manifest_del_status = env_->DeleteFile(
4353           DescriptorFileName(dbname_, pending_manifest_file_number_));
4354       if (!manifest_del_status.ok()) {
4355         ROCKS_LOG_WARN(db_options_->info_log,
4356                        "Failed to delete manifest %" PRIu64 ": %s",
4357                        pending_manifest_file_number_,
4358                        manifest_del_status.ToString().c_str());
4359       }
4360     }
4361   }
4362 
4363   pending_manifest_file_number_ = 0;
4364 
4365   // wake up all the waiting writers
4366   while (true) {
4367     ManifestWriter* ready = manifest_writers_.front();
4368     manifest_writers_.pop_front();
4369     bool need_signal = true;
4370     for (const auto& w : writers) {
4371       if (&w == ready) {
4372         need_signal = false;
4373         break;
4374       }
4375     }
4376     ready->status = s;
4377     ready->done = true;
4378     if (ready->manifest_write_callback) {
4379       (ready->manifest_write_callback)(s);
4380     }
4381     if (need_signal) {
4382       ready->cv.Signal();
4383     }
4384     if (ready == last_writer) {
4385       break;
4386     }
4387   }
4388   if (!manifest_writers_.empty()) {
4389     manifest_writers_.front()->cv.Signal();
4390   }
4391   return s;
4392 }
4393 
4394 // 'datas' is grammatically incorrect. We still use this notation to indicate
4395 // that this variable represents a collection of column_family_data.
LogAndApply(const autovector<ColumnFamilyData * > & column_family_datas,const autovector<const MutableCFOptions * > & mutable_cf_options_list,const autovector<autovector<VersionEdit * >> & edit_lists,InstrumentedMutex * mu,FSDirectory * db_directory,bool new_descriptor_log,const ColumnFamilyOptions * new_cf_options,const std::vector<std::function<void (const Status &)>> & manifest_wcbs)4396 Status VersionSet::LogAndApply(
4397     const autovector<ColumnFamilyData*>& column_family_datas,
4398     const autovector<const MutableCFOptions*>& mutable_cf_options_list,
4399     const autovector<autovector<VersionEdit*>>& edit_lists,
4400     InstrumentedMutex* mu, FSDirectory* db_directory, bool new_descriptor_log,
4401     const ColumnFamilyOptions* new_cf_options,
4402     const std::vector<std::function<void(const Status&)>>& manifest_wcbs) {
4403   mu->AssertHeld();
4404   int num_edits = 0;
4405   for (const auto& elist : edit_lists) {
4406     num_edits += static_cast<int>(elist.size());
4407   }
4408   if (num_edits == 0) {
4409     return Status::OK();
4410   } else if (num_edits > 1) {
4411 #ifndef NDEBUG
4412     for (const auto& edit_list : edit_lists) {
4413       for (const auto& edit : edit_list) {
4414         assert(!edit->IsColumnFamilyManipulation());
4415       }
4416     }
4417 #endif /* ! NDEBUG */
4418   }
4419 
4420   int num_cfds = static_cast<int>(column_family_datas.size());
4421   if (num_cfds == 1 && column_family_datas[0] == nullptr) {
4422     assert(edit_lists.size() == 1 && edit_lists[0].size() == 1);
4423     assert(edit_lists[0][0]->is_column_family_add_);
4424     assert(new_cf_options != nullptr);
4425   }
4426   std::deque<ManifestWriter> writers;
4427   if (num_cfds > 0) {
4428     assert(static_cast<size_t>(num_cfds) == mutable_cf_options_list.size());
4429     assert(static_cast<size_t>(num_cfds) == edit_lists.size());
4430   }
4431   for (int i = 0; i < num_cfds; ++i) {
4432     const auto wcb =
4433         manifest_wcbs.empty() ? [](const Status&) {} : manifest_wcbs[i];
4434     writers.emplace_back(mu, column_family_datas[i],
4435                          *mutable_cf_options_list[i], edit_lists[i], wcb);
4436     manifest_writers_.push_back(&writers[i]);
4437   }
4438   assert(!writers.empty());
4439   ManifestWriter& first_writer = writers.front();
4440   TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:BeforeWriterWaiting",
4441                            nullptr);
4442   while (!first_writer.done && &first_writer != manifest_writers_.front()) {
4443     first_writer.cv.Wait();
4444   }
4445   if (first_writer.done) {
4446     // All non-CF-manipulation operations can be grouped together and committed
4447     // to MANIFEST. They should all have finished. The status code is stored in
4448     // the first manifest writer.
4449 #ifndef NDEBUG
4450     for (const auto& writer : writers) {
4451       assert(writer.done);
4452     }
4453     TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WakeUpAndDone", mu);
4454 #endif /* !NDEBUG */
4455     return first_writer.status;
4456   }
4457 
4458   int num_undropped_cfds = 0;
4459   for (auto cfd : column_family_datas) {
4460     // if cfd == nullptr, it is a column family add.
4461     if (cfd == nullptr || !cfd->IsDropped()) {
4462       ++num_undropped_cfds;
4463     }
4464   }
4465   if (0 == num_undropped_cfds) {
4466     for (int i = 0; i != num_cfds; ++i) {
4467       manifest_writers_.pop_front();
4468     }
4469     // Notify new head of manifest write queue.
4470     if (!manifest_writers_.empty()) {
4471       manifest_writers_.front()->cv.Signal();
4472     }
4473     return Status::ColumnFamilyDropped();
4474   }
4475 
4476   return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log,
4477                                new_cf_options);
4478 }
4479 
LogAndApplyCFHelper(VersionEdit * edit)4480 void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
4481   assert(edit->IsColumnFamilyManipulation());
4482   edit->SetNextFile(next_file_number_.load());
4483   // The log might have data that is not visible to memtbale and hence have not
4484   // updated the last_sequence_ yet. It is also possible that the log has is
4485   // expecting some new data that is not written yet. Since LastSequence is an
4486   // upper bound on the sequence, it is ok to record
4487   // last_allocated_sequence_ as the last sequence.
4488   edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
4489                                                       : last_sequence_);
4490   if (edit->is_column_family_drop_) {
4491     // if we drop column family, we have to make sure to save max column family,
4492     // so that we don't reuse existing ID
4493     edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
4494   }
4495 }
4496 
LogAndApplyHelper(ColumnFamilyData * cfd,VersionBuilder * builder,VersionEdit * edit,InstrumentedMutex * mu)4497 Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
4498                                      VersionBuilder* builder, VersionEdit* edit,
4499                                      InstrumentedMutex* mu) {
4500 #ifdef NDEBUG
4501   (void)cfd;
4502 #endif
4503   mu->AssertHeld();
4504   assert(!edit->IsColumnFamilyManipulation());
4505 
4506   if (edit->has_log_number_) {
4507     assert(edit->log_number_ >= cfd->GetLogNumber());
4508     assert(edit->log_number_ < next_file_number_.load());
4509   }
4510 
4511   if (!edit->has_prev_log_number_) {
4512     edit->SetPrevLogNumber(prev_log_number_);
4513   }
4514   edit->SetNextFile(next_file_number_.load());
4515   // The log might have data that is not visible to memtbale and hence have not
4516   // updated the last_sequence_ yet. It is also possible that the log has is
4517   // expecting some new data that is not written yet. Since LastSequence is an
4518   // upper bound on the sequence, it is ok to record
4519   // last_allocated_sequence_ as the last sequence.
4520   edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
4521                                                       : last_sequence_);
4522 
4523   // The builder can be nullptr only if edit is WAL manipulation,
4524   // because WAL edits do not need to be applied to versions,
4525   // we return Status::OK() in this case.
4526   assert(builder || edit->IsWalManipulation());
4527   return builder ? builder->Apply(edit) : Status::OK();
4528 }
4529 
GetCurrentManifestPath(const std::string & dbname,FileSystem * fs,std::string * manifest_path,uint64_t * manifest_file_number)4530 Status VersionSet::GetCurrentManifestPath(const std::string& dbname,
4531                                           FileSystem* fs,
4532                                           std::string* manifest_path,
4533                                           uint64_t* manifest_file_number) {
4534   assert(fs != nullptr);
4535   assert(manifest_path != nullptr);
4536   assert(manifest_file_number != nullptr);
4537 
4538   std::string fname;
4539   Status s = ReadFileToString(fs, CurrentFileName(dbname), &fname);
4540   if (!s.ok()) {
4541     return s;
4542   }
4543   if (fname.empty() || fname.back() != '\n') {
4544     return Status::Corruption("CURRENT file does not end with newline");
4545   }
4546   // remove the trailing '\n'
4547   fname.resize(fname.size() - 1);
4548   FileType type;
4549   bool parse_ok = ParseFileName(fname, manifest_file_number, &type);
4550   if (!parse_ok || type != kDescriptorFile) {
4551     return Status::Corruption("CURRENT file corrupted");
4552   }
4553   *manifest_path = dbname;
4554   if (dbname.back() != '/') {
4555     manifest_path->push_back('/');
4556   }
4557   manifest_path->append(fname);
4558   return Status::OK();
4559 }
4560 
Recover(const std::vector<ColumnFamilyDescriptor> & column_families,bool read_only,std::string * db_id)4561 Status VersionSet::Recover(
4562     const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
4563     std::string* db_id) {
4564   // Read "CURRENT" file, which contains a pointer to the current manifest file
4565   std::string manifest_path;
4566   Status s = GetCurrentManifestPath(dbname_, fs_.get(), &manifest_path,
4567                                     &manifest_file_number_);
4568   if (!s.ok()) {
4569     return s;
4570   }
4571 
4572   ROCKS_LOG_INFO(db_options_->info_log, "Recovering from manifest file: %s\n",
4573                  manifest_path.c_str());
4574 
4575   std::unique_ptr<SequentialFileReader> manifest_file_reader;
4576   {
4577     std::unique_ptr<FSSequentialFile> manifest_file;
4578     s = fs_->NewSequentialFile(manifest_path,
4579                                fs_->OptimizeForManifestRead(file_options_),
4580                                &manifest_file, nullptr);
4581     if (!s.ok()) {
4582       return s;
4583     }
4584     manifest_file_reader.reset(
4585         new SequentialFileReader(std::move(manifest_file), manifest_path,
4586                                  db_options_->log_readahead_size, io_tracer_));
4587   }
4588   uint64_t current_manifest_file_size = 0;
4589   uint64_t log_number = 0;
4590   {
4591     VersionSet::LogReporter reporter;
4592     Status log_read_status;
4593     reporter.status = &log_read_status;
4594     log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
4595                        true /* checksum */, 0 /* log_number */);
4596     VersionEditHandler handler(read_only, column_families,
4597                                const_cast<VersionSet*>(this),
4598                                /*track_missing_files=*/false,
4599                                /*no_error_if_files_missing=*/false, io_tracer_);
4600     handler.Iterate(reader, &log_read_status);
4601     s = handler.status();
4602     if (s.ok()) {
4603       log_number = handler.GetVersionEditParams().log_number_;
4604       current_manifest_file_size = reader.GetReadOffset();
4605       assert(current_manifest_file_size != 0);
4606       handler.GetDbId(db_id);
4607     }
4608   }
4609 
4610   if (s.ok()) {
4611     manifest_file_size_ = current_manifest_file_size;
4612     ROCKS_LOG_INFO(
4613         db_options_->info_log,
4614         "Recovered from manifest file:%s succeeded,"
4615         "manifest_file_number is %" PRIu64 ", next_file_number is %" PRIu64
4616         ", last_sequence is %" PRIu64 ", log_number is %" PRIu64
4617         ",prev_log_number is %" PRIu64 ",max_column_family is %" PRIu32
4618         ",min_log_number_to_keep is %" PRIu64 "\n",
4619         manifest_path.c_str(), manifest_file_number_, next_file_number_.load(),
4620         last_sequence_.load(), log_number, prev_log_number_,
4621         column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc());
4622 
4623     for (auto cfd : *column_family_set_) {
4624       if (cfd->IsDropped()) {
4625         continue;
4626       }
4627       ROCKS_LOG_INFO(db_options_->info_log,
4628                      "Column family [%s] (ID %" PRIu32
4629                      "), log number is %" PRIu64 "\n",
4630                      cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
4631     }
4632   }
4633 
4634   return s;
4635 }
4636 
4637 namespace {
4638 class ManifestPicker {
4639  public:
4640   explicit ManifestPicker(const std::string& dbname,
4641                           const std::vector<std::string>& files_in_dbname);
4642   // REQUIRES Valid() == true
4643   std::string GetNextManifest(uint64_t* file_number, std::string* file_name);
Valid() const4644   bool Valid() const { return manifest_file_iter_ != manifest_files_.end(); }
4645 
4646  private:
4647   const std::string& dbname_;
4648   // MANIFEST file names(s)
4649   std::vector<std::string> manifest_files_;
4650   std::vector<std::string>::const_iterator manifest_file_iter_;
4651 };
4652 
ManifestPicker(const std::string & dbname,const std::vector<std::string> & files_in_dbname)4653 ManifestPicker::ManifestPicker(const std::string& dbname,
4654                                const std::vector<std::string>& files_in_dbname)
4655     : dbname_(dbname) {
4656   // populate manifest files
4657   assert(!files_in_dbname.empty());
4658   for (const auto& fname : files_in_dbname) {
4659     uint64_t file_num = 0;
4660     FileType file_type;
4661     bool parse_ok = ParseFileName(fname, &file_num, &file_type);
4662     if (parse_ok && file_type == kDescriptorFile) {
4663       manifest_files_.push_back(fname);
4664     }
4665   }
4666   // seek to first manifest
4667   std::sort(manifest_files_.begin(), manifest_files_.end(),
4668             [](const std::string& lhs, const std::string& rhs) {
4669               uint64_t num1 = 0;
4670               uint64_t num2 = 0;
4671               FileType type1;
4672               FileType type2;
4673               bool parse_ok1 = ParseFileName(lhs, &num1, &type1);
4674               bool parse_ok2 = ParseFileName(rhs, &num2, &type2);
4675 #ifndef NDEBUG
4676               assert(parse_ok1);
4677               assert(parse_ok2);
4678 #else
4679               (void)parse_ok1;
4680               (void)parse_ok2;
4681 #endif
4682               return num1 > num2;
4683             });
4684   manifest_file_iter_ = manifest_files_.begin();
4685 }
4686 
GetNextManifest(uint64_t * number,std::string * file_name)4687 std::string ManifestPicker::GetNextManifest(uint64_t* number,
4688                                             std::string* file_name) {
4689   assert(Valid());
4690   std::string ret;
4691   if (manifest_file_iter_ != manifest_files_.end()) {
4692     ret.assign(dbname_);
4693     if (ret.back() != kFilePathSeparator) {
4694       ret.push_back(kFilePathSeparator);
4695     }
4696     ret.append(*manifest_file_iter_);
4697     if (number) {
4698       FileType type;
4699       bool parse = ParseFileName(*manifest_file_iter_, number, &type);
4700       assert(type == kDescriptorFile);
4701 #ifndef NDEBUG
4702       assert(parse);
4703 #else
4704       (void)parse;
4705 #endif
4706     }
4707     if (file_name) {
4708       *file_name = *manifest_file_iter_;
4709     }
4710     ++manifest_file_iter_;
4711   }
4712   return ret;
4713 }
4714 }  // namespace
4715 
TryRecover(const std::vector<ColumnFamilyDescriptor> & column_families,bool read_only,const std::vector<std::string> & files_in_dbname,std::string * db_id,bool * has_missing_table_file)4716 Status VersionSet::TryRecover(
4717     const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
4718     const std::vector<std::string>& files_in_dbname, std::string* db_id,
4719     bool* has_missing_table_file) {
4720   ManifestPicker manifest_picker(dbname_, files_in_dbname);
4721   if (!manifest_picker.Valid()) {
4722     return Status::Corruption("Cannot locate MANIFEST file in " + dbname_);
4723   }
4724   Status s;
4725   std::string manifest_path =
4726       manifest_picker.GetNextManifest(&manifest_file_number_, nullptr);
4727   while (!manifest_path.empty()) {
4728     s = TryRecoverFromOneManifest(manifest_path, column_families, read_only,
4729                                   db_id, has_missing_table_file);
4730     if (s.ok() || !manifest_picker.Valid()) {
4731       break;
4732     }
4733     Reset();
4734     manifest_path =
4735         manifest_picker.GetNextManifest(&manifest_file_number_, nullptr);
4736   }
4737   return s;
4738 }
4739 
TryRecoverFromOneManifest(const std::string & manifest_path,const std::vector<ColumnFamilyDescriptor> & column_families,bool read_only,std::string * db_id,bool * has_missing_table_file)4740 Status VersionSet::TryRecoverFromOneManifest(
4741     const std::string& manifest_path,
4742     const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
4743     std::string* db_id, bool* has_missing_table_file) {
4744   ROCKS_LOG_INFO(db_options_->info_log, "Trying to recover from manifest: %s\n",
4745                  manifest_path.c_str());
4746   std::unique_ptr<SequentialFileReader> manifest_file_reader;
4747   Status s;
4748   {
4749     std::unique_ptr<FSSequentialFile> manifest_file;
4750     s = fs_->NewSequentialFile(manifest_path,
4751                                fs_->OptimizeForManifestRead(file_options_),
4752                                &manifest_file, nullptr);
4753     if (!s.ok()) {
4754       return s;
4755     }
4756     manifest_file_reader.reset(
4757         new SequentialFileReader(std::move(manifest_file), manifest_path,
4758                                  db_options_->log_readahead_size, io_tracer_));
4759   }
4760 
4761   assert(s.ok());
4762   VersionSet::LogReporter reporter;
4763   reporter.status = &s;
4764   log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
4765                      /*checksum=*/true, /*log_num=*/0);
4766   VersionEditHandlerPointInTime handler_pit(
4767       read_only, column_families, const_cast<VersionSet*>(this), io_tracer_);
4768 
4769   handler_pit.Iterate(reader, &s);
4770 
4771   handler_pit.GetDbId(db_id);
4772 
4773   assert(nullptr != has_missing_table_file);
4774   *has_missing_table_file = handler_pit.HasMissingFiles();
4775 
4776   return handler_pit.status();
4777 }
4778 
ListColumnFamilies(std::vector<std::string> * column_families,const std::string & dbname,FileSystem * fs)4779 Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
4780                                       const std::string& dbname,
4781                                       FileSystem* fs) {
4782   // these are just for performance reasons, not correctness,
4783   // so we're fine using the defaults
4784   FileOptions soptions;
4785   // Read "CURRENT" file, which contains a pointer to the current manifest file
4786   std::string manifest_path;
4787   uint64_t manifest_file_number;
4788   Status s =
4789       GetCurrentManifestPath(dbname, fs, &manifest_path, &manifest_file_number);
4790   if (!s.ok()) {
4791     return s;
4792   }
4793 
4794   std::unique_ptr<SequentialFileReader> file_reader;
4795   {
4796     std::unique_ptr<FSSequentialFile> file;
4797     s = fs->NewSequentialFile(manifest_path, soptions, &file, nullptr);
4798     if (!s.ok()) {
4799       return s;
4800   }
4801   file_reader.reset(new SequentialFileReader(std::move(file), manifest_path,
4802                                              nullptr /*IOTracer*/));
4803   }
4804 
4805   VersionSet::LogReporter reporter;
4806   reporter.status = &s;
4807   log::Reader reader(nullptr, std::move(file_reader), &reporter,
4808                      true /* checksum */, 0 /* log_number */);
4809 
4810   ListColumnFamiliesHandler handler;
4811   handler.Iterate(reader, &s);
4812 
4813   assert(column_families);
4814   column_families->clear();
4815   if (handler.status().ok()) {
4816     for (const auto& iter : handler.GetColumnFamilyNames()) {
4817       column_families->push_back(iter.second);
4818     }
4819   }
4820 
4821   return handler.status();
4822 }
4823 
4824 #ifndef ROCKSDB_LITE
ReduceNumberOfLevels(const std::string & dbname,const Options * options,const FileOptions & file_options,int new_levels)4825 Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
4826                                         const Options* options,
4827                                         const FileOptions& file_options,
4828                                         int new_levels) {
4829   if (new_levels <= 1) {
4830     return Status::InvalidArgument(
4831         "Number of levels needs to be bigger than 1");
4832   }
4833 
4834   ImmutableDBOptions db_options(*options);
4835   ColumnFamilyOptions cf_options(*options);
4836   std::shared_ptr<Cache> tc(NewLRUCache(options->max_open_files - 10,
4837                                         options->table_cache_numshardbits));
4838   WriteController wc(options->delayed_write_rate);
4839   WriteBufferManager wb(options->db_write_buffer_size);
4840   VersionSet versions(dbname, &db_options, file_options, tc.get(), &wb, &wc,
4841                       nullptr /*BlockCacheTracer*/, nullptr /*IOTracer*/);
4842   Status status;
4843 
4844   std::vector<ColumnFamilyDescriptor> dummy;
4845   ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName,
4846                                           ColumnFamilyOptions(*options));
4847   dummy.push_back(dummy_descriptor);
4848   status = versions.Recover(dummy);
4849   if (!status.ok()) {
4850     return status;
4851   }
4852 
4853   Version* current_version =
4854       versions.GetColumnFamilySet()->GetDefault()->current();
4855   auto* vstorage = current_version->storage_info();
4856   int current_levels = vstorage->num_levels();
4857 
4858   if (current_levels <= new_levels) {
4859     return Status::OK();
4860   }
4861 
4862   // Make sure there are file only on one level from
4863   // (new_levels-1) to (current_levels-1)
4864   int first_nonempty_level = -1;
4865   int first_nonempty_level_filenum = 0;
4866   for (int i = new_levels - 1; i < current_levels; i++) {
4867     int file_num = vstorage->NumLevelFiles(i);
4868     if (file_num != 0) {
4869       if (first_nonempty_level < 0) {
4870         first_nonempty_level = i;
4871         first_nonempty_level_filenum = file_num;
4872       } else {
4873         char msg[255];
4874         snprintf(msg, sizeof(msg),
4875                  "Found at least two levels containing files: "
4876                  "[%d:%d],[%d:%d].\n",
4877                  first_nonempty_level, first_nonempty_level_filenum, i,
4878                  file_num);
4879         return Status::InvalidArgument(msg);
4880       }
4881     }
4882   }
4883 
4884   // we need to allocate an array with the old number of levels size to
4885   // avoid SIGSEGV in WriteCurrentStatetoManifest()
4886   // however, all levels bigger or equal to new_levels will be empty
4887   std::vector<FileMetaData*>* new_files_list =
4888       new std::vector<FileMetaData*>[current_levels];
4889   for (int i = 0; i < new_levels - 1; i++) {
4890     new_files_list[i] = vstorage->LevelFiles(i);
4891   }
4892 
4893   if (first_nonempty_level > 0) {
4894     auto& new_last_level = new_files_list[new_levels - 1];
4895 
4896     new_last_level = vstorage->LevelFiles(first_nonempty_level);
4897 
4898     for (size_t i = 0; i < new_last_level.size(); ++i) {
4899       const FileMetaData* const meta = new_last_level[i];
4900       assert(meta);
4901 
4902       const uint64_t file_number = meta->fd.GetNumber();
4903 
4904       vstorage->file_locations_[file_number] =
4905           VersionStorageInfo::FileLocation(new_levels - 1, i);
4906     }
4907   }
4908 
4909   delete[] vstorage -> files_;
4910   vstorage->files_ = new_files_list;
4911   vstorage->num_levels_ = new_levels;
4912 
4913   MutableCFOptions mutable_cf_options(*options);
4914   VersionEdit ve;
4915   InstrumentedMutex dummy_mutex;
4916   InstrumentedMutexLock l(&dummy_mutex);
4917   return versions.LogAndApply(
4918       versions.GetColumnFamilySet()->GetDefault(),
4919       mutable_cf_options, &ve, &dummy_mutex, nullptr, true);
4920 }
4921 
4922 // Get the checksum information including the checksum and checksum function
4923 // name of all SST and blob files in VersionSet. Store the information in
4924 // FileChecksumList which contains a map from file number to its checksum info.
4925 // If DB is not running, make sure call VersionSet::Recover() to load the file
4926 // metadata from Manifest to VersionSet before calling this function.
GetLiveFilesChecksumInfo(FileChecksumList * checksum_list)4927 Status VersionSet::GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) {
4928   // Clean the previously stored checksum information if any.
4929   Status s;
4930   if (checksum_list == nullptr) {
4931     s = Status::InvalidArgument("checksum_list is nullptr");
4932     return s;
4933   }
4934   checksum_list->reset();
4935 
4936   for (auto cfd : *column_family_set_) {
4937     if (cfd->IsDropped() || !cfd->initialized()) {
4938       continue;
4939     }
4940     /* SST files */
4941     for (int level = 0; level < cfd->NumberLevels(); level++) {
4942       for (const auto& file :
4943            cfd->current()->storage_info()->LevelFiles(level)) {
4944         s = checksum_list->InsertOneFileChecksum(file->fd.GetNumber(),
4945                                                  file->file_checksum,
4946                                                  file->file_checksum_func_name);
4947         if (!s.ok()) {
4948           return s;
4949         }
4950       }
4951     }
4952 
4953     /* Blob files */
4954     const auto& blob_files = cfd->current()->storage_info()->GetBlobFiles();
4955     for (const auto& pair : blob_files) {
4956       const uint64_t blob_file_number = pair.first;
4957       const auto& meta = pair.second;
4958 
4959       assert(meta);
4960       assert(blob_file_number == meta->GetBlobFileNumber());
4961 
4962       std::string checksum_value = meta->GetChecksumValue();
4963       std::string checksum_method = meta->GetChecksumMethod();
4964       assert(checksum_value.empty() == checksum_method.empty());
4965       if (meta->GetChecksumMethod().empty()) {
4966         checksum_value = kUnknownFileChecksum;
4967         checksum_method = kUnknownFileChecksumFuncName;
4968       }
4969 
4970       s = checksum_list->InsertOneFileChecksum(blob_file_number, checksum_value,
4971                                                checksum_method);
4972       if (!s.ok()) {
4973         return s;
4974       }
4975     }
4976   }
4977 
4978   return s;
4979 }
4980 
DumpManifest(Options & options,std::string & dscname,bool verbose,bool hex,bool json)4981 Status VersionSet::DumpManifest(Options& options, std::string& dscname,
4982                                 bool verbose, bool hex, bool json) {
4983   // Open the specified manifest file.
4984   std::unique_ptr<SequentialFileReader> file_reader;
4985   Status s;
4986   {
4987     std::unique_ptr<FSSequentialFile> file;
4988     const std::shared_ptr<FileSystem>& fs = options.env->GetFileSystem();
4989     s = fs->NewSequentialFile(
4990         dscname,
4991         fs->OptimizeForManifestRead(file_options_), &file,
4992         nullptr);
4993     if (!s.ok()) {
4994       return s;
4995     }
4996     file_reader.reset(new SequentialFileReader(
4997         std::move(file), dscname, db_options_->log_readahead_size, io_tracer_));
4998   }
4999 
5000   std::vector<ColumnFamilyDescriptor> column_families(
5001       1, ColumnFamilyDescriptor(kDefaultColumnFamilyName, options));
5002   DumpManifestHandler handler(column_families, this, io_tracer_, verbose, hex,
5003                               json);
5004   {
5005     VersionSet::LogReporter reporter;
5006     reporter.status = &s;
5007     log::Reader reader(nullptr, std::move(file_reader), &reporter,
5008                        true /* checksum */, 0 /* log_number */);
5009     handler.Iterate(reader, &s);
5010   }
5011 
5012   return handler.status();
5013 }
5014 #endif  // ROCKSDB_LITE
5015 
MarkFileNumberUsed(uint64_t number)5016 void VersionSet::MarkFileNumberUsed(uint64_t number) {
5017   // only called during recovery and repair which are single threaded, so this
5018   // works because there can't be concurrent calls
5019   if (next_file_number_.load(std::memory_order_relaxed) <= number) {
5020     next_file_number_.store(number + 1, std::memory_order_relaxed);
5021   }
5022 }
5023 // Called only either from ::LogAndApply which is protected by mutex or during
5024 // recovery which is single-threaded.
MarkMinLogNumberToKeep2PC(uint64_t number)5025 void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
5026   if (min_log_number_to_keep_2pc_.load(std::memory_order_relaxed) < number) {
5027     min_log_number_to_keep_2pc_.store(number, std::memory_order_relaxed);
5028   }
5029 }
5030 
WriteCurrentStateToManifest(const std::unordered_map<uint32_t,MutableCFState> & curr_state,const VersionEdit & wal_additions,log::Writer * log,IOStatus & io_s)5031 Status VersionSet::WriteCurrentStateToManifest(
5032     const std::unordered_map<uint32_t, MutableCFState>& curr_state,
5033     const VersionEdit& wal_additions, log::Writer* log, IOStatus& io_s) {
5034   // TODO: Break up into multiple records to reduce memory usage on recovery?
5035 
5036   // WARNING: This method doesn't hold a mutex!!
5037 
5038   // This is done without DB mutex lock held, but only within single-threaded
5039   // LogAndApply. Column family manipulations can only happen within LogAndApply
5040   // (the same single thread), so we're safe to iterate.
5041 
5042   assert(io_s.ok());
5043   if (db_options_->write_dbid_to_manifest) {
5044     VersionEdit edit_for_db_id;
5045     assert(!db_id_.empty());
5046     edit_for_db_id.SetDBId(db_id_);
5047     std::string db_id_record;
5048     if (!edit_for_db_id.EncodeTo(&db_id_record)) {
5049       return Status::Corruption("Unable to Encode VersionEdit:" +
5050                                 edit_for_db_id.DebugString(true));
5051     }
5052     io_s = log->AddRecord(db_id_record);
5053     if (!io_s.ok()) {
5054       return io_s;
5055     }
5056   }
5057 
5058   // Save WALs.
5059   if (!wal_additions.GetWalAdditions().empty()) {
5060     TEST_SYNC_POINT_CALLBACK("VersionSet::WriteCurrentStateToManifest:SaveWal",
5061                              const_cast<VersionEdit*>(&wal_additions));
5062     std::string record;
5063     if (!wal_additions.EncodeTo(&record)) {
5064       return Status::Corruption("Unable to Encode VersionEdit: " +
5065                                 wal_additions.DebugString(true));
5066     }
5067     io_s = log->AddRecord(record);
5068     if (!io_s.ok()) {
5069       return io_s;
5070     }
5071   }
5072 
5073   for (auto cfd : *column_family_set_) {
5074     assert(cfd);
5075 
5076     if (cfd->IsDropped()) {
5077       continue;
5078     }
5079     assert(cfd->initialized());
5080     {
5081       // Store column family info
5082       VersionEdit edit;
5083       if (cfd->GetID() != 0) {
5084         // default column family is always there,
5085         // no need to explicitly write it
5086         edit.AddColumnFamily(cfd->GetName());
5087         edit.SetColumnFamily(cfd->GetID());
5088       }
5089       edit.SetComparatorName(
5090           cfd->internal_comparator().user_comparator()->Name());
5091       std::string record;
5092       if (!edit.EncodeTo(&record)) {
5093         return Status::Corruption(
5094             "Unable to Encode VersionEdit:" + edit.DebugString(true));
5095       }
5096       io_s = log->AddRecord(record);
5097       if (!io_s.ok()) {
5098         return io_s;
5099       }
5100     }
5101 
5102     {
5103       // Save files
5104       VersionEdit edit;
5105       edit.SetColumnFamily(cfd->GetID());
5106 
5107       assert(cfd->current());
5108       assert(cfd->current()->storage_info());
5109 
5110       for (int level = 0; level < cfd->NumberLevels(); level++) {
5111         for (const auto& f :
5112              cfd->current()->storage_info()->LevelFiles(level)) {
5113           edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
5114                        f->fd.GetFileSize(), f->smallest, f->largest,
5115                        f->fd.smallest_seqno, f->fd.largest_seqno,
5116                        f->marked_for_compaction, f->oldest_blob_file_number,
5117                        f->oldest_ancester_time, f->file_creation_time,
5118                        f->file_checksum, f->file_checksum_func_name);
5119         }
5120       }
5121 
5122       const auto& blob_files = cfd->current()->storage_info()->GetBlobFiles();
5123       for (const auto& pair : blob_files) {
5124         const uint64_t blob_file_number = pair.first;
5125         const auto& meta = pair.second;
5126 
5127         assert(meta);
5128         assert(blob_file_number == meta->GetBlobFileNumber());
5129 
5130         edit.AddBlobFile(blob_file_number, meta->GetTotalBlobCount(),
5131                          meta->GetTotalBlobBytes(), meta->GetChecksumMethod(),
5132                          meta->GetChecksumValue());
5133         if (meta->GetGarbageBlobCount() > 0) {
5134           edit.AddBlobFileGarbage(blob_file_number, meta->GetGarbageBlobCount(),
5135                                   meta->GetGarbageBlobBytes());
5136         }
5137       }
5138 
5139       const auto iter = curr_state.find(cfd->GetID());
5140       assert(iter != curr_state.end());
5141       uint64_t log_number = iter->second.log_number;
5142       edit.SetLogNumber(log_number);
5143 
5144       if (cfd->GetID() == 0) {
5145         // min_log_number_to_keep is for the whole db, not for specific column family.
5146         // So it does not need to be set for every column family, just need to be set once.
5147         // Since default CF can never be dropped, we set the min_log to the default CF here.
5148         uint64_t min_log = min_log_number_to_keep_2pc();
5149         if (min_log != 0) {
5150           edit.SetMinLogNumberToKeep(min_log);
5151         }
5152       }
5153 
5154       const std::string& full_history_ts_low = iter->second.full_history_ts_low;
5155       if (!full_history_ts_low.empty()) {
5156         edit.SetFullHistoryTsLow(full_history_ts_low);
5157       }
5158       std::string record;
5159       if (!edit.EncodeTo(&record)) {
5160         return Status::Corruption(
5161             "Unable to Encode VersionEdit:" + edit.DebugString(true));
5162       }
5163       io_s = log->AddRecord(record);
5164       if (!io_s.ok()) {
5165         return io_s;
5166       }
5167     }
5168   }
5169   return Status::OK();
5170 }
5171 
5172 // TODO(aekmekji): in CompactionJob::GenSubcompactionBoundaries(), this
5173 // function is called repeatedly with consecutive pairs of slices. For example
5174 // if the slice list is [a, b, c, d] this function is called with arguments
5175 // (a,b) then (b,c) then (c,d). Knowing this, an optimization is possible where
5176 // we avoid doing binary search for the keys b and c twice and instead somehow
5177 // maintain state of where they first appear in the files.
ApproximateSize(const SizeApproximationOptions & options,Version * v,const Slice & start,const Slice & end,int start_level,int end_level,TableReaderCaller caller)5178 uint64_t VersionSet::ApproximateSize(const SizeApproximationOptions& options,
5179                                      Version* v, const Slice& start,
5180                                      const Slice& end, int start_level,
5181                                      int end_level, TableReaderCaller caller) {
5182   const auto& icmp = v->cfd_->internal_comparator();
5183 
5184   // pre-condition
5185   assert(icmp.Compare(start, end) <= 0);
5186 
5187   uint64_t total_full_size = 0;
5188   const auto* vstorage = v->storage_info();
5189   const int num_non_empty_levels = vstorage->num_non_empty_levels();
5190   end_level = (end_level == -1) ? num_non_empty_levels
5191                                 : std::min(end_level, num_non_empty_levels);
5192 
5193   assert(start_level <= end_level);
5194 
5195   // Outline of the optimization that uses options.files_size_error_margin.
5196   // When approximating the files total size that is used to store a keys range,
5197   // we first sum up the sizes of the files that fully fall into the range.
5198   // Then we sum up the sizes of all the files that may intersect with the range
5199   // (this includes all files in L0 as well). Then, if total_intersecting_size
5200   // is smaller than total_full_size * options.files_size_error_margin - we can
5201   // infer that the intersecting files have a sufficiently negligible
5202   // contribution to the total size, and we can approximate the storage required
5203   // for the keys in range as just half of the intersecting_files_size.
5204   // E.g., if the value of files_size_error_margin is 0.1, then the error of the
5205   // approximation is limited to only ~10% of the total size of files that fully
5206   // fall into the keys range. In such case, this helps to avoid a costly
5207   // process of binary searching the intersecting files that is required only
5208   // for a more precise calculation of the total size.
5209 
5210   autovector<FdWithKeyRange*, 32> first_files;
5211   autovector<FdWithKeyRange*, 16> last_files;
5212 
5213   // scan all the levels
5214   for (int level = start_level; level < end_level; ++level) {
5215     const LevelFilesBrief& files_brief = vstorage->LevelFilesBrief(level);
5216     if (files_brief.num_files == 0) {
5217       // empty level, skip exploration
5218       continue;
5219     }
5220 
5221     if (level == 0) {
5222       // level 0 files are not in sorted order, we need to iterate through
5223       // the list to compute the total bytes that require scanning,
5224       // so handle the case explicitly (similarly to first_files case)
5225       for (size_t i = 0; i < files_brief.num_files; i++) {
5226         first_files.push_back(&files_brief.files[i]);
5227       }
5228       continue;
5229     }
5230 
5231     assert(level > 0);
5232     assert(files_brief.num_files > 0);
5233 
5234     // identify the file position for start key
5235     const int idx_start =
5236         FindFileInRange(icmp, files_brief, start, 0,
5237                         static_cast<uint32_t>(files_brief.num_files - 1));
5238     assert(static_cast<size_t>(idx_start) < files_brief.num_files);
5239 
5240     // identify the file position for end key
5241     int idx_end = idx_start;
5242     if (icmp.Compare(files_brief.files[idx_end].largest_key, end) < 0) {
5243       idx_end =
5244           FindFileInRange(icmp, files_brief, end, idx_start,
5245                           static_cast<uint32_t>(files_brief.num_files - 1));
5246     }
5247     assert(idx_end >= idx_start &&
5248            static_cast<size_t>(idx_end) < files_brief.num_files);
5249 
5250     // scan all files from the starting index to the ending index
5251     // (inferred from the sorted order)
5252 
5253     // first scan all the intermediate full files (excluding first and last)
5254     for (int i = idx_start + 1; i < idx_end; ++i) {
5255       uint64_t file_size = files_brief.files[i].fd.GetFileSize();
5256       // The entire file falls into the range, so we can just take its size.
5257       assert(file_size ==
5258              ApproximateSize(v, files_brief.files[i], start, end, caller));
5259       total_full_size += file_size;
5260     }
5261 
5262     // save the first and the last files (which may be the same file), so we
5263     // can scan them later.
5264     first_files.push_back(&files_brief.files[idx_start]);
5265     if (idx_start != idx_end) {
5266       // we need to estimate size for both files, only if they are different
5267       last_files.push_back(&files_brief.files[idx_end]);
5268     }
5269   }
5270 
5271   // The sum of all file sizes that intersect the [start, end] keys range.
5272   uint64_t total_intersecting_size = 0;
5273   for (const auto* file_ptr : first_files) {
5274     total_intersecting_size += file_ptr->fd.GetFileSize();
5275   }
5276   for (const auto* file_ptr : last_files) {
5277     total_intersecting_size += file_ptr->fd.GetFileSize();
5278   }
5279 
5280   // Now scan all the first & last files at each level, and estimate their size.
5281   // If the total_intersecting_size is less than X% of the total_full_size - we
5282   // want to approximate the result in order to avoid the costly binary search
5283   // inside ApproximateSize. We use half of file size as an approximation below.
5284 
5285   const double margin = options.files_size_error_margin;
5286   if (margin > 0 && total_intersecting_size <
5287                         static_cast<uint64_t>(total_full_size * margin)) {
5288     total_full_size += total_intersecting_size / 2;
5289   } else {
5290     // Estimate for all the first files (might also be last files), at each
5291     // level
5292     for (const auto file_ptr : first_files) {
5293       total_full_size += ApproximateSize(v, *file_ptr, start, end, caller);
5294     }
5295 
5296     // Estimate for all the last files, at each level
5297     for (const auto file_ptr : last_files) {
5298       // We could use ApproximateSize here, but calling ApproximateOffsetOf
5299       // directly is just more efficient.
5300       total_full_size += ApproximateOffsetOf(v, *file_ptr, end, caller);
5301     }
5302   }
5303 
5304   return total_full_size;
5305 }
5306 
ApproximateOffsetOf(Version * v,const FdWithKeyRange & f,const Slice & key,TableReaderCaller caller)5307 uint64_t VersionSet::ApproximateOffsetOf(Version* v, const FdWithKeyRange& f,
5308                                          const Slice& key,
5309                                          TableReaderCaller caller) {
5310   // pre-condition
5311   assert(v);
5312   const auto& icmp = v->cfd_->internal_comparator();
5313 
5314   uint64_t result = 0;
5315   if (icmp.Compare(f.largest_key, key) <= 0) {
5316     // Entire file is before "key", so just add the file size
5317     result = f.fd.GetFileSize();
5318   } else if (icmp.Compare(f.smallest_key, key) > 0) {
5319     // Entire file is after "key", so ignore
5320     result = 0;
5321   } else {
5322     // "key" falls in the range for this table.  Add the
5323     // approximate offset of "key" within the table.
5324     TableCache* table_cache = v->cfd_->table_cache();
5325     if (table_cache != nullptr) {
5326       result = table_cache->ApproximateOffsetOf(
5327           key, f.file_metadata->fd, caller, icmp,
5328           v->GetMutableCFOptions().prefix_extractor.get());
5329     }
5330   }
5331   return result;
5332 }
5333 
ApproximateSize(Version * v,const FdWithKeyRange & f,const Slice & start,const Slice & end,TableReaderCaller caller)5334 uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
5335                                      const Slice& start, const Slice& end,
5336                                      TableReaderCaller caller) {
5337   // pre-condition
5338   assert(v);
5339   const auto& icmp = v->cfd_->internal_comparator();
5340   assert(icmp.Compare(start, end) <= 0);
5341 
5342   if (icmp.Compare(f.largest_key, start) <= 0 ||
5343       icmp.Compare(f.smallest_key, end) > 0) {
5344     // Entire file is before or after the start/end keys range
5345     return 0;
5346   }
5347 
5348   if (icmp.Compare(f.smallest_key, start) >= 0) {
5349     // Start of the range is before the file start - approximate by end offset
5350     return ApproximateOffsetOf(v, f, end, caller);
5351   }
5352 
5353   if (icmp.Compare(f.largest_key, end) < 0) {
5354     // End of the range is after the file end - approximate by subtracting
5355     // start offset from the file size
5356     uint64_t start_offset = ApproximateOffsetOf(v, f, start, caller);
5357     assert(f.fd.GetFileSize() >= start_offset);
5358     return f.fd.GetFileSize() - start_offset;
5359   }
5360 
5361   // The interval falls entirely in the range for this file.
5362   TableCache* table_cache = v->cfd_->table_cache();
5363   if (table_cache == nullptr) {
5364     return 0;
5365   }
5366   return table_cache->ApproximateSize(
5367       start, end, f.file_metadata->fd, caller, icmp,
5368       v->GetMutableCFOptions().prefix_extractor.get());
5369 }
5370 
AddLiveFiles(std::vector<uint64_t> * live_table_files,std::vector<uint64_t> * live_blob_files) const5371 void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_table_files,
5372                               std::vector<uint64_t>* live_blob_files) const {
5373   assert(live_table_files);
5374   assert(live_blob_files);
5375 
5376   // pre-calculate space requirement
5377   size_t total_table_files = 0;
5378   size_t total_blob_files = 0;
5379 
5380   assert(column_family_set_);
5381   for (auto cfd : *column_family_set_) {
5382     assert(cfd);
5383 
5384     if (!cfd->initialized()) {
5385       continue;
5386     }
5387 
5388     Version* const dummy_versions = cfd->dummy_versions();
5389     assert(dummy_versions);
5390 
5391     for (Version* v = dummy_versions->next_; v != dummy_versions;
5392          v = v->next_) {
5393       assert(v);
5394 
5395       const auto* vstorage = v->storage_info();
5396       assert(vstorage);
5397 
5398       for (int level = 0; level < vstorage->num_levels(); ++level) {
5399         total_table_files += vstorage->LevelFiles(level).size();
5400       }
5401 
5402       total_blob_files += vstorage->GetBlobFiles().size();
5403     }
5404   }
5405 
5406   // just one time extension to the right size
5407   live_table_files->reserve(live_table_files->size() + total_table_files);
5408   live_blob_files->reserve(live_blob_files->size() + total_blob_files);
5409 
5410   assert(column_family_set_);
5411   for (auto cfd : *column_family_set_) {
5412     assert(cfd);
5413     if (!cfd->initialized()) {
5414       continue;
5415     }
5416 
5417     auto* current = cfd->current();
5418     bool found_current = false;
5419 
5420     Version* const dummy_versions = cfd->dummy_versions();
5421     assert(dummy_versions);
5422 
5423     for (Version* v = dummy_versions->next_; v != dummy_versions;
5424          v = v->next_) {
5425       v->AddLiveFiles(live_table_files, live_blob_files);
5426       if (v == current) {
5427         found_current = true;
5428       }
5429     }
5430 
5431     if (!found_current && current != nullptr) {
5432       // Should never happen unless it is a bug.
5433       assert(false);
5434       current->AddLiveFiles(live_table_files, live_blob_files);
5435     }
5436   }
5437 }
5438 
MakeInputIterator(const ReadOptions & read_options,const Compaction * c,RangeDelAggregator * range_del_agg,const FileOptions & file_options_compactions)5439 InternalIterator* VersionSet::MakeInputIterator(
5440     const ReadOptions& read_options, const Compaction* c,
5441     RangeDelAggregator* range_del_agg,
5442     const FileOptions& file_options_compactions) {
5443   auto cfd = c->column_family_data();
5444   // Level-0 files have to be merged together.  For other levels,
5445   // we will make a concatenating iterator per level.
5446   // TODO(opt): use concatenating iterator for level-0 if there is no overlap
5447   const size_t space = (c->level() == 0 ? c->input_levels(0)->num_files +
5448                                               c->num_input_levels() - 1
5449                                         : c->num_input_levels());
5450   InternalIterator** list = new InternalIterator* [space];
5451   size_t num = 0;
5452   for (size_t which = 0; which < c->num_input_levels(); which++) {
5453     if (c->input_levels(which)->num_files != 0) {
5454       if (c->level(which) == 0) {
5455         const LevelFilesBrief* flevel = c->input_levels(which);
5456         for (size_t i = 0; i < flevel->num_files; i++) {
5457           list[num++] = cfd->table_cache()->NewIterator(
5458               read_options, file_options_compactions,
5459               cfd->internal_comparator(), *flevel->files[i].file_metadata,
5460               range_del_agg, c->mutable_cf_options()->prefix_extractor.get(),
5461               /*table_reader_ptr=*/nullptr,
5462               /*file_read_hist=*/nullptr, TableReaderCaller::kCompaction,
5463               /*arena=*/nullptr,
5464               /*skip_filters=*/false,
5465               /*level=*/static_cast<int>(c->level(which)),
5466               MaxFileSizeForL0MetaPin(*c->mutable_cf_options()),
5467               /*smallest_compaction_key=*/nullptr,
5468               /*largest_compaction_key=*/nullptr,
5469               /*allow_unprepared_value=*/false);
5470         }
5471       } else {
5472         // Create concatenating iterator for the files from this level
5473         list[num++] = new LevelIterator(
5474             cfd->table_cache(), read_options, file_options_compactions,
5475             cfd->internal_comparator(), c->input_levels(which),
5476             c->mutable_cf_options()->prefix_extractor.get(),
5477             /*should_sample=*/false,
5478             /*no per level latency histogram=*/nullptr,
5479             TableReaderCaller::kCompaction, /*skip_filters=*/false,
5480             /*level=*/static_cast<int>(c->level(which)), range_del_agg,
5481             c->boundaries(which));
5482       }
5483     }
5484   }
5485   assert(num <= space);
5486   InternalIterator* result =
5487       NewMergingIterator(&c->column_family_data()->internal_comparator(), list,
5488                          static_cast<int>(num));
5489   delete[] list;
5490   return result;
5491 }
5492 
5493 // verify that the files listed in this compaction are present
5494 // in the current version
VerifyCompactionFileConsistency(Compaction * c)5495 bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
5496 #ifndef NDEBUG
5497   Version* version = c->column_family_data()->current();
5498   const VersionStorageInfo* vstorage = version->storage_info();
5499   if (c->input_version() != version) {
5500     ROCKS_LOG_INFO(
5501         db_options_->info_log,
5502         "[%s] compaction output being applied to a different base version from"
5503         " input version",
5504         c->column_family_data()->GetName().c_str());
5505   }
5506 
5507   for (size_t input = 0; input < c->num_input_levels(); ++input) {
5508     int level = c->level(input);
5509     for (size_t i = 0; i < c->num_input_files(input); ++i) {
5510       uint64_t number = c->input(input, i)->fd.GetNumber();
5511       bool found = false;
5512       for (size_t j = 0; j < vstorage->files_[level].size(); j++) {
5513         FileMetaData* f = vstorage->files_[level][j];
5514         if (f->fd.GetNumber() == number) {
5515           found = true;
5516           break;
5517         }
5518       }
5519       if (!found) {
5520         return false;  // input files non existent in current version
5521       }
5522     }
5523   }
5524 #else
5525   (void)c;
5526 #endif
5527   return true;     // everything good
5528 }
5529 
GetMetadataForFile(uint64_t number,int * filelevel,FileMetaData ** meta,ColumnFamilyData ** cfd)5530 Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
5531                                       FileMetaData** meta,
5532                                       ColumnFamilyData** cfd) {
5533   for (auto cfd_iter : *column_family_set_) {
5534     if (!cfd_iter->initialized()) {
5535       continue;
5536     }
5537     Version* version = cfd_iter->current();
5538     const auto* vstorage = version->storage_info();
5539     for (int level = 0; level < vstorage->num_levels(); level++) {
5540       for (const auto& file : vstorage->LevelFiles(level)) {
5541         if (file->fd.GetNumber() == number) {
5542           *meta = file;
5543           *filelevel = level;
5544           *cfd = cfd_iter;
5545           return Status::OK();
5546         }
5547       }
5548     }
5549   }
5550   return Status::NotFound("File not present in any level");
5551 }
5552 
GetLiveFilesMetaData(std::vector<LiveFileMetaData> * metadata)5553 void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
5554   for (auto cfd : *column_family_set_) {
5555     if (cfd->IsDropped() || !cfd->initialized()) {
5556       continue;
5557     }
5558     for (int level = 0; level < cfd->NumberLevels(); level++) {
5559       for (const auto& file :
5560            cfd->current()->storage_info()->LevelFiles(level)) {
5561         LiveFileMetaData filemetadata;
5562         filemetadata.column_family_name = cfd->GetName();
5563         uint32_t path_id = file->fd.GetPathId();
5564         if (path_id < cfd->ioptions()->cf_paths.size()) {
5565           filemetadata.db_path = cfd->ioptions()->cf_paths[path_id].path;
5566         } else {
5567           assert(!cfd->ioptions()->cf_paths.empty());
5568           filemetadata.db_path = cfd->ioptions()->cf_paths.back().path;
5569         }
5570         const uint64_t file_number = file->fd.GetNumber();
5571         filemetadata.name = MakeTableFileName("", file_number);
5572         filemetadata.file_number = file_number;
5573         filemetadata.level = level;
5574         filemetadata.size = static_cast<size_t>(file->fd.GetFileSize());
5575         filemetadata.smallestkey = file->smallest.user_key().ToString();
5576         filemetadata.largestkey = file->largest.user_key().ToString();
5577         filemetadata.smallest_seqno = file->fd.smallest_seqno;
5578         filemetadata.largest_seqno = file->fd.largest_seqno;
5579         filemetadata.num_reads_sampled = file->stats.num_reads_sampled.load(
5580             std::memory_order_relaxed);
5581         filemetadata.being_compacted = file->being_compacted;
5582         filemetadata.num_entries = file->num_entries;
5583         filemetadata.num_deletions = file->num_deletions;
5584         filemetadata.oldest_blob_file_number = file->oldest_blob_file_number;
5585         filemetadata.file_checksum = file->file_checksum;
5586         filemetadata.file_checksum_func_name = file->file_checksum_func_name;
5587         metadata->push_back(filemetadata);
5588       }
5589     }
5590   }
5591 }
5592 
GetObsoleteFiles(std::vector<ObsoleteFileInfo> * files,std::vector<ObsoleteBlobFileInfo> * blob_files,std::vector<std::string> * manifest_filenames,uint64_t min_pending_output)5593 void VersionSet::GetObsoleteFiles(std::vector<ObsoleteFileInfo>* files,
5594                                   std::vector<ObsoleteBlobFileInfo>* blob_files,
5595                                   std::vector<std::string>* manifest_filenames,
5596                                   uint64_t min_pending_output) {
5597   assert(files);
5598   assert(blob_files);
5599   assert(manifest_filenames);
5600   assert(files->empty());
5601   assert(blob_files->empty());
5602   assert(manifest_filenames->empty());
5603 
5604   std::vector<ObsoleteFileInfo> pending_files;
5605   for (auto& f : obsolete_files_) {
5606     if (f.metadata->fd.GetNumber() < min_pending_output) {
5607       files->emplace_back(std::move(f));
5608     } else {
5609       pending_files.emplace_back(std::move(f));
5610     }
5611   }
5612   obsolete_files_.swap(pending_files);
5613 
5614   std::vector<ObsoleteBlobFileInfo> pending_blob_files;
5615   for (auto& blob_file : obsolete_blob_files_) {
5616     if (blob_file.GetBlobFileNumber() < min_pending_output) {
5617       blob_files->emplace_back(std::move(blob_file));
5618     } else {
5619       pending_blob_files.emplace_back(std::move(blob_file));
5620     }
5621   }
5622   obsolete_blob_files_.swap(pending_blob_files);
5623 
5624   obsolete_manifests_.swap(*manifest_filenames);
5625 }
5626 
CreateColumnFamily(const ColumnFamilyOptions & cf_options,const VersionEdit * edit)5627 ColumnFamilyData* VersionSet::CreateColumnFamily(
5628     const ColumnFamilyOptions& cf_options, const VersionEdit* edit) {
5629   assert(edit->is_column_family_add_);
5630 
5631   MutableCFOptions dummy_cf_options;
5632   Version* dummy_versions =
5633       new Version(nullptr, this, file_options_, dummy_cf_options, io_tracer_);
5634   // Ref() dummy version once so that later we can call Unref() to delete it
5635   // by avoiding calling "delete" explicitly (~Version is private)
5636   dummy_versions->Ref();
5637   auto new_cfd = column_family_set_->CreateColumnFamily(
5638       edit->column_family_name_, edit->column_family_, dummy_versions,
5639       cf_options);
5640 
5641   Version* v = new Version(new_cfd, this, file_options_,
5642                            *new_cfd->GetLatestMutableCFOptions(), io_tracer_,
5643                            current_version_number_++);
5644 
5645   // Fill level target base information.
5646   v->storage_info()->CalculateBaseBytes(*new_cfd->ioptions(),
5647                                         *new_cfd->GetLatestMutableCFOptions());
5648   AppendVersion(new_cfd, v);
5649   // GetLatestMutableCFOptions() is safe here without mutex since the
5650   // cfd is not available to client
5651   new_cfd->CreateNewMemtable(*new_cfd->GetLatestMutableCFOptions(),
5652                              LastSequence());
5653   new_cfd->SetLogNumber(edit->log_number_);
5654   return new_cfd;
5655 }
5656 
GetNumLiveVersions(Version * dummy_versions)5657 uint64_t VersionSet::GetNumLiveVersions(Version* dummy_versions) {
5658   uint64_t count = 0;
5659   for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
5660     count++;
5661   }
5662   return count;
5663 }
5664 
GetTotalSstFilesSize(Version * dummy_versions)5665 uint64_t VersionSet::GetTotalSstFilesSize(Version* dummy_versions) {
5666   std::unordered_set<uint64_t> unique_files;
5667   uint64_t total_files_size = 0;
5668   for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
5669     VersionStorageInfo* storage_info = v->storage_info();
5670     for (int level = 0; level < storage_info->num_levels_; level++) {
5671       for (const auto& file_meta : storage_info->LevelFiles(level)) {
5672         if (unique_files.find(file_meta->fd.packed_number_and_path_id) ==
5673             unique_files.end()) {
5674           unique_files.insert(file_meta->fd.packed_number_and_path_id);
5675           total_files_size += file_meta->fd.GetFileSize();
5676         }
5677       }
5678     }
5679   }
5680   return total_files_size;
5681 }
5682 
VerifyFileMetadata(const std::string & fpath,const FileMetaData & meta) const5683 Status VersionSet::VerifyFileMetadata(const std::string& fpath,
5684                                       const FileMetaData& meta) const {
5685   uint64_t fsize = 0;
5686   Status status = fs_->GetFileSize(fpath, IOOptions(), &fsize, nullptr);
5687   if (status.ok()) {
5688     if (fsize != meta.fd.GetFileSize()) {
5689       status = Status::Corruption("File size mismatch: " + fpath);
5690     }
5691   }
5692   return status;
5693 }
5694 
ReactiveVersionSet(const std::string & dbname,const ImmutableDBOptions * _db_options,const FileOptions & _file_options,Cache * table_cache,WriteBufferManager * write_buffer_manager,WriteController * write_controller,const std::shared_ptr<IOTracer> & io_tracer)5695 ReactiveVersionSet::ReactiveVersionSet(
5696     const std::string& dbname, const ImmutableDBOptions* _db_options,
5697     const FileOptions& _file_options, Cache* table_cache,
5698     WriteBufferManager* write_buffer_manager, WriteController* write_controller,
5699     const std::shared_ptr<IOTracer>& io_tracer)
5700     : VersionSet(dbname, _db_options, _file_options, table_cache,
5701                  write_buffer_manager, write_controller,
5702                  /*block_cache_tracer=*/nullptr, io_tracer) {}
5703 
~ReactiveVersionSet()5704 ReactiveVersionSet::~ReactiveVersionSet() {}
5705 
Recover(const std::vector<ColumnFamilyDescriptor> & column_families,std::unique_ptr<log::FragmentBufferedReader> * manifest_reader,std::unique_ptr<log::Reader::Reporter> * manifest_reporter,std::unique_ptr<Status> * manifest_reader_status)5706 Status ReactiveVersionSet::Recover(
5707     const std::vector<ColumnFamilyDescriptor>& column_families,
5708     std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
5709     std::unique_ptr<log::Reader::Reporter>* manifest_reporter,
5710     std::unique_ptr<Status>* manifest_reader_status) {
5711   assert(manifest_reader != nullptr);
5712   assert(manifest_reporter != nullptr);
5713   assert(manifest_reader_status != nullptr);
5714 
5715   manifest_reader_status->reset(new Status());
5716   manifest_reporter->reset(new LogReporter());
5717   static_cast_with_check<LogReporter>(manifest_reporter->get())->status =
5718       manifest_reader_status->get();
5719   Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader);
5720   log::Reader* reader = manifest_reader->get();
5721   assert(reader);
5722 
5723   manifest_tailer_.reset(new ManifestTailer(
5724       column_families, const_cast<ReactiveVersionSet*>(this), io_tracer_));
5725 
5726   manifest_tailer_->Iterate(*reader, manifest_reader_status->get());
5727 
5728   return manifest_tailer_->status();
5729 }
5730 
ReadAndApply(InstrumentedMutex * mu,std::unique_ptr<log::FragmentBufferedReader> * manifest_reader,Status * manifest_read_status,std::unordered_set<ColumnFamilyData * > * cfds_changed)5731 Status ReactiveVersionSet::ReadAndApply(
5732     InstrumentedMutex* mu,
5733     std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
5734     Status* manifest_read_status,
5735     std::unordered_set<ColumnFamilyData*>* cfds_changed) {
5736   assert(manifest_reader != nullptr);
5737   assert(cfds_changed != nullptr);
5738   mu->AssertHeld();
5739 
5740   Status s;
5741   log::Reader* reader = manifest_reader->get();
5742   assert(reader);
5743   s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
5744   if (!s.ok()) {
5745     return s;
5746   }
5747   manifest_tailer_->Iterate(*(manifest_reader->get()), manifest_read_status);
5748   s = manifest_tailer_->status();
5749   if (s.ok()) {
5750     *cfds_changed = std::move(manifest_tailer_->GetUpdatedColumnFamilies());
5751   }
5752 
5753   return s;
5754 }
5755 
MaybeSwitchManifest(log::Reader::Reporter * reporter,std::unique_ptr<log::FragmentBufferedReader> * manifest_reader)5756 Status ReactiveVersionSet::MaybeSwitchManifest(
5757     log::Reader::Reporter* reporter,
5758     std::unique_ptr<log::FragmentBufferedReader>* manifest_reader) {
5759   assert(manifest_reader != nullptr);
5760   Status s;
5761   do {
5762     std::string manifest_path;
5763     s = GetCurrentManifestPath(dbname_, fs_.get(), &manifest_path,
5764                                &manifest_file_number_);
5765     std::unique_ptr<FSSequentialFile> manifest_file;
5766     if (s.ok()) {
5767       if (nullptr == manifest_reader->get() ||
5768           manifest_reader->get()->file()->file_name() != manifest_path) {
5769         TEST_SYNC_POINT(
5770             "ReactiveVersionSet::MaybeSwitchManifest:"
5771             "AfterGetCurrentManifestPath:0");
5772         TEST_SYNC_POINT(
5773             "ReactiveVersionSet::MaybeSwitchManifest:"
5774             "AfterGetCurrentManifestPath:1");
5775         s = fs_->NewSequentialFile(manifest_path,
5776                                    fs_->OptimizeForManifestRead(file_options_),
5777                                    &manifest_file, nullptr);
5778       } else {
5779         // No need to switch manifest.
5780         break;
5781       }
5782     }
5783     std::unique_ptr<SequentialFileReader> manifest_file_reader;
5784     if (s.ok()) {
5785       manifest_file_reader.reset(new SequentialFileReader(
5786           std::move(manifest_file), manifest_path,
5787           db_options_->log_readahead_size, io_tracer_));
5788       manifest_reader->reset(new log::FragmentBufferedReader(
5789           nullptr, std::move(manifest_file_reader), reporter,
5790           true /* checksum */, 0 /* log_number */));
5791       ROCKS_LOG_INFO(db_options_->info_log, "Switched to new manifest: %s\n",
5792                      manifest_path.c_str());
5793       if (manifest_tailer_) {
5794         manifest_tailer_->PrepareToReadNewManifest();
5795       }
5796     }
5797   } while (s.IsPathNotFound());
5798   return s;
5799 }
5800 
5801 #ifndef NDEBUG
TEST_read_edits_in_atomic_group() const5802 uint64_t ReactiveVersionSet::TEST_read_edits_in_atomic_group() const {
5803   assert(manifest_tailer_);
5804   return manifest_tailer_->GetReadBuffer().TEST_read_edits_in_atomic_group();
5805 }
5806 #endif  // !NDEBUG
5807 
replay_buffer()5808 std::vector<VersionEdit>& ReactiveVersionSet::replay_buffer() {
5809   assert(manifest_tailer_);
5810   return manifest_tailer_->GetReadBuffer().replay_buffer();
5811 }
5812 
5813 }  // namespace ROCKSDB_NAMESPACE
5814