1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "db/version_set.h"
11 
12 #include <stdio.h>
13 #include <algorithm>
14 #include <array>
15 #include <cinttypes>
16 #include <list>
17 #include <map>
18 #include <set>
19 #include <string>
20 #include <unordered_map>
21 #include <vector>
22 #include "compaction/compaction.h"
23 #include "db/internal_stats.h"
24 #include "db/log_reader.h"
25 #include "db/log_writer.h"
26 #include "db/memtable.h"
27 #include "db/merge_context.h"
28 #include "db/merge_helper.h"
29 #include "db/pinned_iterators_manager.h"
30 #include "db/table_cache.h"
31 #include "db/version_builder.h"
32 #include "file/filename.h"
33 #include "file/random_access_file_reader.h"
34 #include "file/read_write_util.h"
35 #include "file/writable_file_writer.h"
36 #include "monitoring/file_read_sample.h"
37 #include "monitoring/perf_context_imp.h"
38 #include "monitoring/persistent_stats_history.h"
39 #include "rocksdb/env.h"
40 #include "rocksdb/merge_operator.h"
41 #include "rocksdb/write_buffer_manager.h"
42 #include "table/format.h"
43 #include "table/get_context.h"
44 #include "table/internal_iterator.h"
45 #include "table/merging_iterator.h"
46 #include "table/meta_blocks.h"
47 #include "table/multiget_context.h"
48 #include "table/plain/plain_table_factory.h"
49 #include "table/table_reader.h"
50 #include "table/two_level_iterator.h"
51 #include "test_util/sync_point.h"
52 #include "util/coding.h"
53 #include "util/stop_watch.h"
54 #include "util/string_util.h"
55 #include "util/user_comparator_wrapper.h"
56 
57 namespace ROCKSDB_NAMESPACE {
58 
59 namespace {
60 
61 // Find File in LevelFilesBrief data structure
62 // Within an index range defined by left and right
FindFileInRange(const InternalKeyComparator & icmp,const LevelFilesBrief & file_level,const Slice & key,uint32_t left,uint32_t right)63 int FindFileInRange(const InternalKeyComparator& icmp,
64     const LevelFilesBrief& file_level,
65     const Slice& key,
66     uint32_t left,
67     uint32_t right) {
68   auto cmp = [&](const FdWithKeyRange& f, const Slice& k) -> bool {
69     return icmp.InternalKeyComparator::Compare(f.largest_key, k) < 0;
70   };
71   const auto &b = file_level.files;
72   return static_cast<int>(std::lower_bound(b + left,
73                                            b + right, key, cmp) - b);
74 }
75 
OverlapWithIterator(const Comparator * ucmp,const Slice & smallest_user_key,const Slice & largest_user_key,InternalIterator * iter,bool * overlap)76 Status OverlapWithIterator(const Comparator* ucmp,
77     const Slice& smallest_user_key,
78     const Slice& largest_user_key,
79     InternalIterator* iter,
80     bool* overlap) {
81   InternalKey range_start(smallest_user_key, kMaxSequenceNumber,
82                           kValueTypeForSeek);
83   iter->Seek(range_start.Encode());
84   if (!iter->status().ok()) {
85     return iter->status();
86   }
87 
88   *overlap = false;
89   if (iter->Valid()) {
90     ParsedInternalKey seek_result;
91     if (!ParseInternalKey(iter->key(), &seek_result)) {
92       return Status::Corruption("DB have corrupted keys");
93     }
94 
95     if (ucmp->CompareWithoutTimestamp(seek_result.user_key, largest_user_key) <=
96         0) {
97       *overlap = true;
98     }
99   }
100 
101   return iter->status();
102 }
103 
104 // Class to help choose the next file to search for the particular key.
105 // Searches and returns files level by level.
106 // We can search level-by-level since entries never hop across
107 // levels. Therefore we are guaranteed that if we find data
108 // in a smaller level, later levels are irrelevant (unless we
109 // are MergeInProgress).
110 class FilePicker {
111  public:
FilePicker(std::vector<FileMetaData * > * files,const Slice & user_key,const Slice & ikey,autovector<LevelFilesBrief> * file_levels,unsigned int num_levels,FileIndexer * file_indexer,const Comparator * user_comparator,const InternalKeyComparator * internal_comparator)112   FilePicker(std::vector<FileMetaData*>* files, const Slice& user_key,
113              const Slice& ikey, autovector<LevelFilesBrief>* file_levels,
114              unsigned int num_levels, FileIndexer* file_indexer,
115              const Comparator* user_comparator,
116              const InternalKeyComparator* internal_comparator)
117       : num_levels_(num_levels),
118         curr_level_(static_cast<unsigned int>(-1)),
119         returned_file_level_(static_cast<unsigned int>(-1)),
120         hit_file_level_(static_cast<unsigned int>(-1)),
121         search_left_bound_(0),
122         search_right_bound_(FileIndexer::kLevelMaxIndex),
123 #ifndef NDEBUG
124         files_(files),
125 #endif
126         level_files_brief_(file_levels),
127         is_hit_file_last_in_level_(false),
128         curr_file_level_(nullptr),
129         user_key_(user_key),
130         ikey_(ikey),
131         file_indexer_(file_indexer),
132         user_comparator_(user_comparator),
133         internal_comparator_(internal_comparator) {
134 #ifdef NDEBUG
135     (void)files;
136 #endif
137     // Setup member variables to search first level.
138     search_ended_ = !PrepareNextLevel();
139     if (!search_ended_) {
140       // Prefetch Level 0 table data to avoid cache miss if possible.
141       for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
142         auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
143         if (r) {
144           r->Prepare(ikey);
145         }
146       }
147     }
148   }
149 
GetCurrentLevel() const150   int GetCurrentLevel() const { return curr_level_; }
151 
GetNextFile()152   FdWithKeyRange* GetNextFile() {
153     while (!search_ended_) {  // Loops over different levels.
154       while (curr_index_in_curr_level_ < curr_file_level_->num_files) {
155         // Loops over all files in current level.
156         FdWithKeyRange* f = &curr_file_level_->files[curr_index_in_curr_level_];
157         hit_file_level_ = curr_level_;
158         is_hit_file_last_in_level_ =
159             curr_index_in_curr_level_ == curr_file_level_->num_files - 1;
160         int cmp_largest = -1;
161 
162         // Do key range filtering of files or/and fractional cascading if:
163         // (1) not all the files are in level 0, or
164         // (2) there are more than 3 current level files
165         // If there are only 3 or less current level files in the system, we skip
166         // the key range filtering. In this case, more likely, the system is
167         // highly tuned to minimize number of tables queried by each query,
168         // so it is unlikely that key range filtering is more efficient than
169         // querying the files.
170         if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
171           // Check if key is within a file's range. If search left bound and
172           // right bound point to the same find, we are sure key falls in
173           // range.
174           assert(curr_level_ == 0 ||
175                  curr_index_in_curr_level_ == start_index_in_curr_level_ ||
176                  user_comparator_->CompareWithoutTimestamp(
177                      user_key_, ExtractUserKey(f->smallest_key)) <= 0);
178 
179           int cmp_smallest = user_comparator_->CompareWithoutTimestamp(
180               user_key_, ExtractUserKey(f->smallest_key));
181           if (cmp_smallest >= 0) {
182             cmp_largest = user_comparator_->CompareWithoutTimestamp(
183                 user_key_, ExtractUserKey(f->largest_key));
184           }
185 
186           // Setup file search bound for the next level based on the
187           // comparison results
188           if (curr_level_ > 0) {
189             file_indexer_->GetNextLevelIndex(curr_level_,
190                                             curr_index_in_curr_level_,
191                                             cmp_smallest, cmp_largest,
192                                             &search_left_bound_,
193                                             &search_right_bound_);
194           }
195           // Key falls out of current file's range
196           if (cmp_smallest < 0 || cmp_largest > 0) {
197             if (curr_level_ == 0) {
198               ++curr_index_in_curr_level_;
199               continue;
200             } else {
201               // Search next level.
202               break;
203             }
204           }
205         }
206 #ifndef NDEBUG
207         // Sanity check to make sure that the files are correctly sorted
208         if (prev_file_) {
209           if (curr_level_ != 0) {
210             int comp_sign = internal_comparator_->Compare(
211                 prev_file_->largest_key, f->smallest_key);
212             assert(comp_sign < 0);
213           } else {
214             // level == 0, the current file cannot be newer than the previous
215             // one. Use compressed data structure, has no attribute seqNo
216             assert(curr_index_in_curr_level_ > 0);
217             assert(!NewestFirstBySeqNo(files_[0][curr_index_in_curr_level_],
218                   files_[0][curr_index_in_curr_level_-1]));
219           }
220         }
221         prev_file_ = f;
222 #endif
223         returned_file_level_ = curr_level_;
224         if (curr_level_ > 0 && cmp_largest < 0) {
225           // No more files to search in this level.
226           search_ended_ = !PrepareNextLevel();
227         } else {
228           ++curr_index_in_curr_level_;
229         }
230         return f;
231       }
232       // Start searching next level.
233       search_ended_ = !PrepareNextLevel();
234     }
235     // Search ended.
236     return nullptr;
237   }
238 
239   // getter for current file level
240   // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
GetHitFileLevel()241   unsigned int GetHitFileLevel() { return hit_file_level_; }
242 
243   // Returns true if the most recent "hit file" (i.e., one returned by
244   // GetNextFile()) is at the last index in its level.
IsHitFileLastInLevel()245   bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }
246 
247  private:
248   unsigned int num_levels_;
249   unsigned int curr_level_;
250   unsigned int returned_file_level_;
251   unsigned int hit_file_level_;
252   int32_t search_left_bound_;
253   int32_t search_right_bound_;
254 #ifndef NDEBUG
255   std::vector<FileMetaData*>* files_;
256 #endif
257   autovector<LevelFilesBrief>* level_files_brief_;
258   bool search_ended_;
259   bool is_hit_file_last_in_level_;
260   LevelFilesBrief* curr_file_level_;
261   unsigned int curr_index_in_curr_level_;
262   unsigned int start_index_in_curr_level_;
263   Slice user_key_;
264   Slice ikey_;
265   FileIndexer* file_indexer_;
266   const Comparator* user_comparator_;
267   const InternalKeyComparator* internal_comparator_;
268 #ifndef NDEBUG
269   FdWithKeyRange* prev_file_;
270 #endif
271 
272   // Setup local variables to search next level.
273   // Returns false if there are no more levels to search.
PrepareNextLevel()274   bool PrepareNextLevel() {
275     curr_level_++;
276     while (curr_level_ < num_levels_) {
277       curr_file_level_ = &(*level_files_brief_)[curr_level_];
278       if (curr_file_level_->num_files == 0) {
279         // When current level is empty, the search bound generated from upper
280         // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
281         // also empty.
282         assert(search_left_bound_ == 0);
283         assert(search_right_bound_ == -1 ||
284                search_right_bound_ == FileIndexer::kLevelMaxIndex);
285         // Since current level is empty, it will need to search all files in
286         // the next level
287         search_left_bound_ = 0;
288         search_right_bound_ = FileIndexer::kLevelMaxIndex;
289         curr_level_++;
290         continue;
291       }
292 
293       // Some files may overlap each other. We find
294       // all files that overlap user_key and process them in order from
295       // newest to oldest. In the context of merge-operator, this can occur at
296       // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
297       // are always compacted into a single entry).
298       int32_t start_index;
299       if (curr_level_ == 0) {
300         // On Level-0, we read through all files to check for overlap.
301         start_index = 0;
302       } else {
303         // On Level-n (n>=1), files are sorted. Binary search to find the
304         // earliest file whose largest key >= ikey. Search left bound and
305         // right bound are used to narrow the range.
306         if (search_left_bound_ <= search_right_bound_) {
307           if (search_right_bound_ == FileIndexer::kLevelMaxIndex) {
308             search_right_bound_ =
309                 static_cast<int32_t>(curr_file_level_->num_files) - 1;
310           }
311           // `search_right_bound_` is an inclusive upper-bound, but since it was
312           // determined based on user key, it is still possible the lookup key
313           // falls to the right of `search_right_bound_`'s corresponding file.
314           // So, pass a limit one higher, which allows us to detect this case.
315           start_index =
316               FindFileInRange(*internal_comparator_, *curr_file_level_, ikey_,
317                               static_cast<uint32_t>(search_left_bound_),
318                               static_cast<uint32_t>(search_right_bound_) + 1);
319           if (start_index == search_right_bound_ + 1) {
320             // `ikey_` comes after `search_right_bound_`. The lookup key does
321             // not exist on this level, so let's skip this level and do a full
322             // binary search on the next level.
323             search_left_bound_ = 0;
324             search_right_bound_ = FileIndexer::kLevelMaxIndex;
325             curr_level_++;
326             continue;
327           }
328         } else {
329           // search_left_bound > search_right_bound, key does not exist in
330           // this level. Since no comparison is done in this level, it will
331           // need to search all files in the next level.
332           search_left_bound_ = 0;
333           search_right_bound_ = FileIndexer::kLevelMaxIndex;
334           curr_level_++;
335           continue;
336         }
337       }
338       start_index_in_curr_level_ = start_index;
339       curr_index_in_curr_level_ = start_index;
340 #ifndef NDEBUG
341       prev_file_ = nullptr;
342 #endif
343       return true;
344     }
345     // curr_level_ = num_levels_. So, no more levels to search.
346     return false;
347   }
348 };
349 
350 class FilePickerMultiGet {
351  private:
352   struct FilePickerContext;
353 
354  public:
FilePickerMultiGet(MultiGetRange * range,autovector<LevelFilesBrief> * file_levels,unsigned int num_levels,FileIndexer * file_indexer,const Comparator * user_comparator,const InternalKeyComparator * internal_comparator)355   FilePickerMultiGet(MultiGetRange* range,
356                      autovector<LevelFilesBrief>* file_levels,
357                      unsigned int num_levels, FileIndexer* file_indexer,
358                      const Comparator* user_comparator,
359                      const InternalKeyComparator* internal_comparator)
360       : num_levels_(num_levels),
361         curr_level_(static_cast<unsigned int>(-1)),
362         returned_file_level_(static_cast<unsigned int>(-1)),
363         hit_file_level_(static_cast<unsigned int>(-1)),
364         range_(range),
365         batch_iter_(range->begin()),
366         batch_iter_prev_(range->begin()),
367         maybe_repeat_key_(false),
368         current_level_range_(*range, range->begin(), range->end()),
369         current_file_range_(*range, range->begin(), range->end()),
370         level_files_brief_(file_levels),
371         is_hit_file_last_in_level_(false),
372         curr_file_level_(nullptr),
373         file_indexer_(file_indexer),
374         user_comparator_(user_comparator),
375         internal_comparator_(internal_comparator) {
376     for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
377       fp_ctx_array_[iter.index()] =
378           FilePickerContext(0, FileIndexer::kLevelMaxIndex);
379     }
380 
381     // Setup member variables to search first level.
382     search_ended_ = !PrepareNextLevel();
383     if (!search_ended_) {
384       // REVISIT
385       // Prefetch Level 0 table data to avoid cache miss if possible.
386       // As of now, only PlainTableReader and CuckooTableReader do any
387       // prefetching. This may not be necessary anymore once we implement
388       // batching in those table readers
389       for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
390         auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
391         if (r) {
392           for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
393             r->Prepare(iter->ikey);
394           }
395         }
396       }
397     }
398   }
399 
GetCurrentLevel() const400   int GetCurrentLevel() const { return curr_level_; }
401 
402   // Iterates through files in the current level until it finds a file that
403   // contains atleast one key from the MultiGet batch
GetNextFileInLevelWithKeys(MultiGetRange * next_file_range,size_t * file_index,FdWithKeyRange ** fd,bool * is_last_key_in_file)404   bool GetNextFileInLevelWithKeys(MultiGetRange* next_file_range,
405                                   size_t* file_index, FdWithKeyRange** fd,
406                                   bool* is_last_key_in_file) {
407     size_t curr_file_index = *file_index;
408     FdWithKeyRange* f = nullptr;
409     bool file_hit = false;
410     int cmp_largest = -1;
411     if (curr_file_index >= curr_file_level_->num_files) {
412       // In the unlikely case the next key is a duplicate of the current key,
413       // and the current key is the last in the level and the internal key
414       // was not found, we need to skip lookup for the remaining keys and
415       // reset the search bounds
416       if (batch_iter_ != current_level_range_.end()) {
417         ++batch_iter_;
418         for (; batch_iter_ != current_level_range_.end(); ++batch_iter_) {
419           struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
420           fp_ctx.search_left_bound = 0;
421           fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
422         }
423       }
424       return false;
425     }
426     // Loops over keys in the MultiGet batch until it finds a file with
427     // atleast one of the keys. Then it keeps moving forward until the
428     // last key in the batch that falls in that file
429     while (batch_iter_ != current_level_range_.end() &&
430            (fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level ==
431                 curr_file_index ||
432             !file_hit)) {
433       struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
434       f = &curr_file_level_->files[fp_ctx.curr_index_in_curr_level];
435       Slice& user_key = batch_iter_->ukey;
436 
437       // Do key range filtering of files or/and fractional cascading if:
438       // (1) not all the files are in level 0, or
439       // (2) there are more than 3 current level files
440       // If there are only 3 or less current level files in the system, we
441       // skip the key range filtering. In this case, more likely, the system
442       // is highly tuned to minimize number of tables queried by each query,
443       // so it is unlikely that key range filtering is more efficient than
444       // querying the files.
445       if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
446         // Check if key is within a file's range. If search left bound and
447         // right bound point to the same find, we are sure key falls in
448         // range.
449         assert(curr_level_ == 0 ||
450                fp_ctx.curr_index_in_curr_level ==
451                    fp_ctx.start_index_in_curr_level ||
452                user_comparator_->Compare(user_key,
453                                          ExtractUserKey(f->smallest_key)) <= 0);
454 
455         int cmp_smallest = user_comparator_->Compare(
456             user_key, ExtractUserKey(f->smallest_key));
457         if (cmp_smallest >= 0) {
458           cmp_largest = user_comparator_->Compare(
459               user_key, ExtractUserKey(f->largest_key));
460         } else {
461           cmp_largest = -1;
462         }
463 
464         // Setup file search bound for the next level based on the
465         // comparison results
466         if (curr_level_ > 0) {
467           file_indexer_->GetNextLevelIndex(
468               curr_level_, fp_ctx.curr_index_in_curr_level, cmp_smallest,
469               cmp_largest, &fp_ctx.search_left_bound,
470               &fp_ctx.search_right_bound);
471         }
472         // Key falls out of current file's range
473         if (cmp_smallest < 0 || cmp_largest > 0) {
474           next_file_range->SkipKey(batch_iter_);
475         } else {
476           file_hit = true;
477         }
478       } else {
479         file_hit = true;
480       }
481       if (cmp_largest == 0) {
482         // cmp_largest is 0, which means the next key will not be in this
483         // file, so stop looking further. Also don't increment megt_iter_
484         // as we may have to look for this key in the next file if we don't
485         // find it in this one
486         break;
487       } else {
488         if (curr_level_ == 0) {
489           // We need to look through all files in level 0
490           ++fp_ctx.curr_index_in_curr_level;
491         }
492         ++batch_iter_;
493       }
494       if (!file_hit) {
495         curr_file_index =
496             (batch_iter_ != current_level_range_.end())
497                 ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
498                 : curr_file_level_->num_files;
499       }
500     }
501 
502     *fd = f;
503     *file_index = curr_file_index;
504     *is_last_key_in_file = cmp_largest == 0;
505     return file_hit;
506   }
507 
GetNextFile()508   FdWithKeyRange* GetNextFile() {
509     while (!search_ended_) {
510       // Start searching next level.
511       if (batch_iter_ == current_level_range_.end()) {
512         search_ended_ = !PrepareNextLevel();
513         continue;
514       } else {
515         if (maybe_repeat_key_) {
516           maybe_repeat_key_ = false;
517           // Check if we found the final value for the last key in the
518           // previous lookup range. If we did, then there's no need to look
519           // any further for that key, so advance batch_iter_. Else, keep
520           // batch_iter_ positioned on that key so we look it up again in
521           // the next file
522           // For L0, always advance the key because we will look in the next
523           // file regardless for all keys not found yet
524           if (current_level_range_.CheckKeyDone(batch_iter_) ||
525               curr_level_ == 0) {
526             ++batch_iter_;
527           }
528         }
529         // batch_iter_prev_ will become the start key for the next file
530         // lookup
531         batch_iter_prev_ = batch_iter_;
532       }
533 
534       MultiGetRange next_file_range(current_level_range_, batch_iter_prev_,
535                                     current_level_range_.end());
536       size_t curr_file_index =
537           (batch_iter_ != current_level_range_.end())
538               ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
539               : curr_file_level_->num_files;
540       FdWithKeyRange* f;
541       bool is_last_key_in_file;
542       if (!GetNextFileInLevelWithKeys(&next_file_range, &curr_file_index, &f,
543                                       &is_last_key_in_file)) {
544         search_ended_ = !PrepareNextLevel();
545       } else {
546         MultiGetRange::Iterator upper_key = batch_iter_;
547         if (is_last_key_in_file) {
548           // Since cmp_largest is 0, batch_iter_ still points to the last key
549           // that falls in this file, instead of the next one. Increment
550           // upper_key so we can set the range properly for SST MultiGet
551           ++upper_key;
552           ++(fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level);
553           maybe_repeat_key_ = true;
554         }
555         // Set the range for this file
556         current_file_range_ =
557             MultiGetRange(next_file_range, batch_iter_prev_, upper_key);
558         returned_file_level_ = curr_level_;
559         hit_file_level_ = curr_level_;
560         is_hit_file_last_in_level_ =
561             curr_file_index == curr_file_level_->num_files - 1;
562         return f;
563       }
564     }
565 
566     // Search ended
567     return nullptr;
568   }
569 
570   // getter for current file level
571   // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
GetHitFileLevel()572   unsigned int GetHitFileLevel() { return hit_file_level_; }
573 
574   // Returns true if the most recent "hit file" (i.e., one returned by
575   // GetNextFile()) is at the last index in its level.
IsHitFileLastInLevel()576   bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }
577 
CurrentFileRange()578   const MultiGetRange& CurrentFileRange() { return current_file_range_; }
579 
580  private:
581   unsigned int num_levels_;
582   unsigned int curr_level_;
583   unsigned int returned_file_level_;
584   unsigned int hit_file_level_;
585 
586   struct FilePickerContext {
587     int32_t search_left_bound;
588     int32_t search_right_bound;
589     unsigned int curr_index_in_curr_level;
590     unsigned int start_index_in_curr_level;
591 
FilePickerContextROCKSDB_NAMESPACE::__anonf974f4af0111::FilePickerMultiGet::FilePickerContext592     FilePickerContext(int32_t left, int32_t right)
593         : search_left_bound(left), search_right_bound(right),
594           curr_index_in_curr_level(0), start_index_in_curr_level(0) {}
595 
596     FilePickerContext() = default;
597   };
598   std::array<FilePickerContext, MultiGetContext::MAX_BATCH_SIZE> fp_ctx_array_;
599   MultiGetRange* range_;
600   // Iterator to iterate through the keys in a MultiGet batch, that gets reset
601   // at the beginning of each level. Each call to GetNextFile() will position
602   // batch_iter_ at or right after the last key that was found in the returned
603   // SST file
604   MultiGetRange::Iterator batch_iter_;
605   // An iterator that records the previous position of batch_iter_, i.e last
606   // key found in the previous SST file, in order to serve as the start of
607   // the batch key range for the next SST file
608   MultiGetRange::Iterator batch_iter_prev_;
609   bool maybe_repeat_key_;
610   MultiGetRange current_level_range_;
611   MultiGetRange current_file_range_;
612   autovector<LevelFilesBrief>* level_files_brief_;
613   bool search_ended_;
614   bool is_hit_file_last_in_level_;
615   LevelFilesBrief* curr_file_level_;
616   FileIndexer* file_indexer_;
617   const Comparator* user_comparator_;
618   const InternalKeyComparator* internal_comparator_;
619 
620   // Setup local variables to search next level.
621   // Returns false if there are no more levels to search.
PrepareNextLevel()622   bool PrepareNextLevel() {
623     if (curr_level_ == 0) {
624       MultiGetRange::Iterator mget_iter = current_level_range_.begin();
625       if (fp_ctx_array_[mget_iter.index()].curr_index_in_curr_level <
626           curr_file_level_->num_files) {
627         batch_iter_prev_ = current_level_range_.begin();
628         batch_iter_ = current_level_range_.begin();
629         return true;
630       }
631     }
632 
633     curr_level_++;
634     // Reset key range to saved value
635     while (curr_level_ < num_levels_) {
636       bool level_contains_keys = false;
637       curr_file_level_ = &(*level_files_brief_)[curr_level_];
638       if (curr_file_level_->num_files == 0) {
639         // When current level is empty, the search bound generated from upper
640         // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
641         // also empty.
642 
643         for (auto mget_iter = current_level_range_.begin();
644              mget_iter != current_level_range_.end(); ++mget_iter) {
645           struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];
646 
647           assert(fp_ctx.search_left_bound == 0);
648           assert(fp_ctx.search_right_bound == -1 ||
649                  fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex);
650           // Since current level is empty, it will need to search all files in
651           // the next level
652           fp_ctx.search_left_bound = 0;
653           fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
654         }
655         // Skip all subsequent empty levels
656         do {
657           ++curr_level_;
658         } while ((curr_level_ < num_levels_) &&
659                  (*level_files_brief_)[curr_level_].num_files == 0);
660         continue;
661       }
662 
663       // Some files may overlap each other. We find
664       // all files that overlap user_key and process them in order from
665       // newest to oldest. In the context of merge-operator, this can occur at
666       // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
667       // are always compacted into a single entry).
668       int32_t start_index = -1;
669       current_level_range_ =
670           MultiGetRange(*range_, range_->begin(), range_->end());
671       for (auto mget_iter = current_level_range_.begin();
672            mget_iter != current_level_range_.end(); ++mget_iter) {
673         struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];
674         if (curr_level_ == 0) {
675           // On Level-0, we read through all files to check for overlap.
676           start_index = 0;
677           level_contains_keys = true;
678         } else {
679           // On Level-n (n>=1), files are sorted. Binary search to find the
680           // earliest file whose largest key >= ikey. Search left bound and
681           // right bound are used to narrow the range.
682           if (fp_ctx.search_left_bound <= fp_ctx.search_right_bound) {
683             if (fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex) {
684               fp_ctx.search_right_bound =
685                   static_cast<int32_t>(curr_file_level_->num_files) - 1;
686             }
687             // `search_right_bound_` is an inclusive upper-bound, but since it
688             // was determined based on user key, it is still possible the lookup
689             // key falls to the right of `search_right_bound_`'s corresponding
690             // file. So, pass a limit one higher, which allows us to detect this
691             // case.
692             Slice& ikey = mget_iter->ikey;
693             start_index = FindFileInRange(
694                 *internal_comparator_, *curr_file_level_, ikey,
695                 static_cast<uint32_t>(fp_ctx.search_left_bound),
696                 static_cast<uint32_t>(fp_ctx.search_right_bound) + 1);
697             if (start_index == fp_ctx.search_right_bound + 1) {
698               // `ikey_` comes after `search_right_bound_`. The lookup key does
699               // not exist on this level, so let's skip this level and do a full
700               // binary search on the next level.
701               fp_ctx.search_left_bound = 0;
702               fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
703               current_level_range_.SkipKey(mget_iter);
704               continue;
705             } else {
706               level_contains_keys = true;
707             }
708           } else {
709             // search_left_bound > search_right_bound, key does not exist in
710             // this level. Since no comparison is done in this level, it will
711             // need to search all files in the next level.
712             fp_ctx.search_left_bound = 0;
713             fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
714             current_level_range_.SkipKey(mget_iter);
715             continue;
716           }
717         }
718         fp_ctx.start_index_in_curr_level = start_index;
719         fp_ctx.curr_index_in_curr_level = start_index;
720       }
721       if (level_contains_keys) {
722         batch_iter_prev_ = current_level_range_.begin();
723         batch_iter_ = current_level_range_.begin();
724         return true;
725       }
726       curr_level_++;
727     }
728     // curr_level_ = num_levels_. So, no more levels to search.
729     return false;
730   }
731 };
732 }  // anonymous namespace
733 
~VersionStorageInfo()734 VersionStorageInfo::~VersionStorageInfo() { delete[] files_; }
735 
~Version()736 Version::~Version() {
737   assert(refs_ == 0);
738 
739   // Remove from linked list
740   prev_->next_ = next_;
741   next_->prev_ = prev_;
742 
743   // Drop references to files
744   for (int level = 0; level < storage_info_.num_levels_; level++) {
745     for (size_t i = 0; i < storage_info_.files_[level].size(); i++) {
746       FileMetaData* f = storage_info_.files_[level][i];
747       assert(f->refs > 0);
748       f->refs--;
749       if (f->refs <= 0) {
750         assert(cfd_ != nullptr);
751         uint32_t path_id = f->fd.GetPathId();
752         assert(path_id < cfd_->ioptions()->cf_paths.size());
753         vset_->obsolete_files_.push_back(
754             ObsoleteFileInfo(f, cfd_->ioptions()->cf_paths[path_id].path));
755       }
756     }
757   }
758 }
759 
FindFile(const InternalKeyComparator & icmp,const LevelFilesBrief & file_level,const Slice & key)760 int FindFile(const InternalKeyComparator& icmp,
761              const LevelFilesBrief& file_level,
762              const Slice& key) {
763   return FindFileInRange(icmp, file_level, key, 0,
764                          static_cast<uint32_t>(file_level.num_files));
765 }
766 
DoGenerateLevelFilesBrief(LevelFilesBrief * file_level,const std::vector<FileMetaData * > & files,Arena * arena)767 void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level,
768         const std::vector<FileMetaData*>& files,
769         Arena* arena) {
770   assert(file_level);
771   assert(arena);
772 
773   size_t num = files.size();
774   file_level->num_files = num;
775   char* mem = arena->AllocateAligned(num * sizeof(FdWithKeyRange));
776   file_level->files = new (mem)FdWithKeyRange[num];
777 
778   for (size_t i = 0; i < num; i++) {
779     Slice smallest_key = files[i]->smallest.Encode();
780     Slice largest_key = files[i]->largest.Encode();
781 
782     // Copy key slice to sequential memory
783     size_t smallest_size = smallest_key.size();
784     size_t largest_size = largest_key.size();
785     mem = arena->AllocateAligned(smallest_size + largest_size);
786     memcpy(mem, smallest_key.data(), smallest_size);
787     memcpy(mem + smallest_size, largest_key.data(), largest_size);
788 
789     FdWithKeyRange& f = file_level->files[i];
790     f.fd = files[i]->fd;
791     f.file_metadata = files[i];
792     f.smallest_key = Slice(mem, smallest_size);
793     f.largest_key = Slice(mem + smallest_size, largest_size);
794   }
795 }
796 
AfterFile(const Comparator * ucmp,const Slice * user_key,const FdWithKeyRange * f)797 static bool AfterFile(const Comparator* ucmp,
798                       const Slice* user_key, const FdWithKeyRange* f) {
799   // nullptr user_key occurs before all keys and is therefore never after *f
800   return (user_key != nullptr &&
801           ucmp->CompareWithoutTimestamp(*user_key,
802                                         ExtractUserKey(f->largest_key)) > 0);
803 }
804 
BeforeFile(const Comparator * ucmp,const Slice * user_key,const FdWithKeyRange * f)805 static bool BeforeFile(const Comparator* ucmp,
806                        const Slice* user_key, const FdWithKeyRange* f) {
807   // nullptr user_key occurs after all keys and is therefore never before *f
808   return (user_key != nullptr &&
809           ucmp->CompareWithoutTimestamp(*user_key,
810                                         ExtractUserKey(f->smallest_key)) < 0);
811 }
812 
SomeFileOverlapsRange(const InternalKeyComparator & icmp,bool disjoint_sorted_files,const LevelFilesBrief & file_level,const Slice * smallest_user_key,const Slice * largest_user_key)813 bool SomeFileOverlapsRange(
814     const InternalKeyComparator& icmp,
815     bool disjoint_sorted_files,
816     const LevelFilesBrief& file_level,
817     const Slice* smallest_user_key,
818     const Slice* largest_user_key) {
819   const Comparator* ucmp = icmp.user_comparator();
820   if (!disjoint_sorted_files) {
821     // Need to check against all files
822     for (size_t i = 0; i < file_level.num_files; i++) {
823       const FdWithKeyRange* f = &(file_level.files[i]);
824       if (AfterFile(ucmp, smallest_user_key, f) ||
825           BeforeFile(ucmp, largest_user_key, f)) {
826         // No overlap
827       } else {
828         return true;  // Overlap
829       }
830     }
831     return false;
832   }
833 
834   // Binary search over file list
835   uint32_t index = 0;
836   if (smallest_user_key != nullptr) {
837     // Find the leftmost possible internal key for smallest_user_key
838     InternalKey small;
839     small.SetMinPossibleForUserKey(*smallest_user_key);
840     index = FindFile(icmp, file_level, small.Encode());
841   }
842 
843   if (index >= file_level.num_files) {
844     // beginning of range is after all files, so no overlap.
845     return false;
846   }
847 
848   return !BeforeFile(ucmp, largest_user_key, &file_level.files[index]);
849 }
850 
851 namespace {
852 
853 class LevelIterator final : public InternalIterator {
854  public:
LevelIterator(TableCache * table_cache,const ReadOptions & read_options,const FileOptions & file_options,const InternalKeyComparator & icomparator,const LevelFilesBrief * flevel,const SliceTransform * prefix_extractor,bool should_sample,HistogramImpl * file_read_hist,TableReaderCaller caller,bool skip_filters,int level,RangeDelAggregator * range_del_agg,const std::vector<AtomicCompactionUnitBoundary> * compaction_boundaries=nullptr)855   LevelIterator(TableCache* table_cache, const ReadOptions& read_options,
856                 const FileOptions& file_options,
857                 const InternalKeyComparator& icomparator,
858                 const LevelFilesBrief* flevel,
859                 const SliceTransform* prefix_extractor, bool should_sample,
860                 HistogramImpl* file_read_hist, TableReaderCaller caller,
861                 bool skip_filters, int level, RangeDelAggregator* range_del_agg,
862                 const std::vector<AtomicCompactionUnitBoundary>*
863                     compaction_boundaries = nullptr)
864       : table_cache_(table_cache),
865         read_options_(read_options),
866         file_options_(file_options),
867         icomparator_(icomparator),
868         user_comparator_(icomparator.user_comparator()),
869         flevel_(flevel),
870         prefix_extractor_(prefix_extractor),
871         file_read_hist_(file_read_hist),
872         should_sample_(should_sample),
873         caller_(caller),
874         skip_filters_(skip_filters),
875         file_index_(flevel_->num_files),
876         level_(level),
877         range_del_agg_(range_del_agg),
878         pinned_iters_mgr_(nullptr),
879         compaction_boundaries_(compaction_boundaries) {
880     // Empty level is not supported.
881     assert(flevel_ != nullptr && flevel_->num_files > 0);
882   }
883 
~LevelIterator()884   ~LevelIterator() override { delete file_iter_.Set(nullptr); }
885 
886   void Seek(const Slice& target) override;
887   void SeekForPrev(const Slice& target) override;
888   void SeekToFirst() override;
889   void SeekToLast() override;
890   void Next() final override;
891   bool NextAndGetResult(IterateResult* result) override;
892   void Prev() override;
893 
Valid() const894   bool Valid() const override { return file_iter_.Valid(); }
key() const895   Slice key() const override {
896     assert(Valid());
897     return file_iter_.key();
898   }
899 
value() const900   Slice value() const override {
901     assert(Valid());
902     return file_iter_.value();
903   }
904 
status() const905   Status status() const override {
906     return file_iter_.iter() ? file_iter_.status() : Status::OK();
907   }
908 
MayBeOutOfLowerBound()909   inline bool MayBeOutOfLowerBound() override {
910     assert(Valid());
911     return may_be_out_of_lower_bound_ && file_iter_.MayBeOutOfLowerBound();
912   }
913 
MayBeOutOfUpperBound()914   inline bool MayBeOutOfUpperBound() override {
915     assert(Valid());
916     return file_iter_.MayBeOutOfUpperBound();
917   }
918 
SetPinnedItersMgr(PinnedIteratorsManager * pinned_iters_mgr)919   void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
920     pinned_iters_mgr_ = pinned_iters_mgr;
921     if (file_iter_.iter()) {
922       file_iter_.SetPinnedItersMgr(pinned_iters_mgr);
923     }
924   }
925 
IsKeyPinned() const926   bool IsKeyPinned() const override {
927     return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
928            file_iter_.iter() && file_iter_.IsKeyPinned();
929   }
930 
IsValuePinned() const931   bool IsValuePinned() const override {
932     return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
933            file_iter_.iter() && file_iter_.IsValuePinned();
934   }
935 
936  private:
937   // Return true if at least one invalid file is seen and skipped.
938   bool SkipEmptyFileForward();
939   void SkipEmptyFileBackward();
940   void SetFileIterator(InternalIterator* iter);
941   void InitFileIterator(size_t new_file_index);
942 
943   // Called by both of Next() and NextAndGetResult(). Force inline.
NextImpl()944   void NextImpl() {
945     assert(Valid());
946     file_iter_.Next();
947     SkipEmptyFileForward();
948   }
949 
file_smallest_key(size_t file_index)950   const Slice& file_smallest_key(size_t file_index) {
951     assert(file_index < flevel_->num_files);
952     return flevel_->files[file_index].smallest_key;
953   }
954 
KeyReachedUpperBound(const Slice & internal_key)955   bool KeyReachedUpperBound(const Slice& internal_key) {
956     return read_options_.iterate_upper_bound != nullptr &&
957            user_comparator_.CompareWithoutTimestamp(
958                ExtractUserKey(internal_key),
959                *read_options_.iterate_upper_bound) >= 0;
960   }
961 
NewFileIterator()962   InternalIterator* NewFileIterator() {
963     assert(file_index_ < flevel_->num_files);
964     auto file_meta = flevel_->files[file_index_];
965     if (should_sample_) {
966       sample_file_read_inc(file_meta.file_metadata);
967     }
968 
969     const InternalKey* smallest_compaction_key = nullptr;
970     const InternalKey* largest_compaction_key = nullptr;
971     if (compaction_boundaries_ != nullptr) {
972       smallest_compaction_key = (*compaction_boundaries_)[file_index_].smallest;
973       largest_compaction_key = (*compaction_boundaries_)[file_index_].largest;
974     }
975     CheckMayBeOutOfLowerBound();
976     return table_cache_->NewIterator(
977         read_options_, file_options_, icomparator_, *file_meta.file_metadata,
978         range_del_agg_, prefix_extractor_,
979         nullptr /* don't need reference to table */, file_read_hist_, caller_,
980         /*arena=*/nullptr, skip_filters_, level_, smallest_compaction_key,
981         largest_compaction_key);
982   }
983 
984   // Check if current file being fully within iterate_lower_bound.
985   //
986   // Note MyRocks may update iterate bounds between seek. To workaround it,
987   // we need to check and update may_be_out_of_lower_bound_ accordingly.
CheckMayBeOutOfLowerBound()988   void CheckMayBeOutOfLowerBound() {
989     if (read_options_.iterate_lower_bound != nullptr &&
990         file_index_ < flevel_->num_files) {
991       may_be_out_of_lower_bound_ =
992           user_comparator_.Compare(
993               ExtractUserKey(file_smallest_key(file_index_)),
994               *read_options_.iterate_lower_bound) < 0;
995     }
996   }
997 
998   TableCache* table_cache_;
999   const ReadOptions read_options_;
1000   const FileOptions& file_options_;
1001   const InternalKeyComparator& icomparator_;
1002   const UserComparatorWrapper user_comparator_;
1003   const LevelFilesBrief* flevel_;
1004   mutable FileDescriptor current_value_;
1005   // `prefix_extractor_` may be non-null even for total order seek. Checking
1006   // this variable is not the right way to identify whether prefix iterator
1007   // is used.
1008   const SliceTransform* prefix_extractor_;
1009 
1010   HistogramImpl* file_read_hist_;
1011   bool should_sample_;
1012   TableReaderCaller caller_;
1013   bool skip_filters_;
1014   bool may_be_out_of_lower_bound_ = true;
1015   size_t file_index_;
1016   int level_;
1017   RangeDelAggregator* range_del_agg_;
1018   IteratorWrapper file_iter_;  // May be nullptr
1019   PinnedIteratorsManager* pinned_iters_mgr_;
1020 
1021   // To be propagated to RangeDelAggregator in order to safely truncate range
1022   // tombstones.
1023   const std::vector<AtomicCompactionUnitBoundary>* compaction_boundaries_;
1024 };
1025 
Seek(const Slice & target)1026 void LevelIterator::Seek(const Slice& target) {
1027   // Check whether the seek key fall under the same file
1028   bool need_to_reseek = true;
1029   if (file_iter_.iter() != nullptr && file_index_ < flevel_->num_files) {
1030     const FdWithKeyRange& cur_file = flevel_->files[file_index_];
1031     if (icomparator_.InternalKeyComparator::Compare(
1032             target, cur_file.largest_key) <= 0 &&
1033         icomparator_.InternalKeyComparator::Compare(
1034             target, cur_file.smallest_key) >= 0) {
1035       need_to_reseek = false;
1036       assert(static_cast<size_t>(FindFile(icomparator_, *flevel_, target)) ==
1037              file_index_);
1038     }
1039   }
1040   if (need_to_reseek) {
1041     TEST_SYNC_POINT("LevelIterator::Seek:BeforeFindFile");
1042     size_t new_file_index = FindFile(icomparator_, *flevel_, target);
1043     InitFileIterator(new_file_index);
1044   }
1045 
1046   if (file_iter_.iter() != nullptr) {
1047     file_iter_.Seek(target);
1048   }
1049   if (SkipEmptyFileForward() && prefix_extractor_ != nullptr &&
1050       !read_options_.total_order_seek && !read_options_.auto_prefix_mode &&
1051       file_iter_.iter() != nullptr && file_iter_.Valid()) {
1052     // We've skipped the file we initially positioned to. In the prefix
1053     // seek case, it is likely that the file is skipped because of
1054     // prefix bloom or hash, where more keys are skipped. We then check
1055     // the current key and invalidate the iterator if the prefix is
1056     // already passed.
1057     // When doing prefix iterator seek, when keys for one prefix have
1058     // been exhausted, it can jump to any key that is larger. Here we are
1059     // enforcing a stricter contract than that, in order to make it easier for
1060     // higher layers (merging and DB iterator) to reason the correctness:
1061     // 1. Within the prefix, the result should be accurate.
1062     // 2. If keys for the prefix is exhausted, it is either positioned to the
1063     //    next key after the prefix, or make the iterator invalid.
1064     // A side benefit will be that it invalidates the iterator earlier so that
1065     // the upper level merging iterator can merge fewer child iterators.
1066     Slice target_user_key = ExtractUserKey(target);
1067     Slice file_user_key = ExtractUserKey(file_iter_.key());
1068     if (prefix_extractor_->InDomain(target_user_key) &&
1069         (!prefix_extractor_->InDomain(file_user_key) ||
1070          user_comparator_.Compare(
1071              prefix_extractor_->Transform(target_user_key),
1072              prefix_extractor_->Transform(file_user_key)) != 0)) {
1073       SetFileIterator(nullptr);
1074     }
1075   }
1076   CheckMayBeOutOfLowerBound();
1077 }
1078 
SeekForPrev(const Slice & target)1079 void LevelIterator::SeekForPrev(const Slice& target) {
1080   size_t new_file_index = FindFile(icomparator_, *flevel_, target);
1081   if (new_file_index >= flevel_->num_files) {
1082     new_file_index = flevel_->num_files - 1;
1083   }
1084 
1085   InitFileIterator(new_file_index);
1086   if (file_iter_.iter() != nullptr) {
1087     file_iter_.SeekForPrev(target);
1088     SkipEmptyFileBackward();
1089   }
1090   CheckMayBeOutOfLowerBound();
1091 }
1092 
SeekToFirst()1093 void LevelIterator::SeekToFirst() {
1094   InitFileIterator(0);
1095   if (file_iter_.iter() != nullptr) {
1096     file_iter_.SeekToFirst();
1097   }
1098   SkipEmptyFileForward();
1099   CheckMayBeOutOfLowerBound();
1100 }
1101 
SeekToLast()1102 void LevelIterator::SeekToLast() {
1103   InitFileIterator(flevel_->num_files - 1);
1104   if (file_iter_.iter() != nullptr) {
1105     file_iter_.SeekToLast();
1106   }
1107   SkipEmptyFileBackward();
1108   CheckMayBeOutOfLowerBound();
1109 }
1110 
Next()1111 void LevelIterator::Next() { NextImpl(); }
1112 
NextAndGetResult(IterateResult * result)1113 bool LevelIterator::NextAndGetResult(IterateResult* result) {
1114   NextImpl();
1115   bool is_valid = Valid();
1116   if (is_valid) {
1117     result->key = key();
1118     result->may_be_out_of_upper_bound = MayBeOutOfUpperBound();
1119   }
1120   return is_valid;
1121 }
1122 
Prev()1123 void LevelIterator::Prev() {
1124   assert(Valid());
1125   file_iter_.Prev();
1126   SkipEmptyFileBackward();
1127 }
1128 
SkipEmptyFileForward()1129 bool LevelIterator::SkipEmptyFileForward() {
1130   bool seen_empty_file = false;
1131   while (file_iter_.iter() == nullptr ||
1132          (!file_iter_.Valid() && file_iter_.status().ok() &&
1133           !file_iter_.iter()->IsOutOfBound())) {
1134     seen_empty_file = true;
1135     // Move to next file
1136     if (file_index_ >= flevel_->num_files - 1) {
1137       // Already at the last file
1138       SetFileIterator(nullptr);
1139       break;
1140     }
1141     if (KeyReachedUpperBound(file_smallest_key(file_index_ + 1))) {
1142       SetFileIterator(nullptr);
1143       break;
1144     }
1145     InitFileIterator(file_index_ + 1);
1146     if (file_iter_.iter() != nullptr) {
1147       file_iter_.SeekToFirst();
1148     }
1149   }
1150   return seen_empty_file;
1151 }
1152 
SkipEmptyFileBackward()1153 void LevelIterator::SkipEmptyFileBackward() {
1154   while (file_iter_.iter() == nullptr ||
1155          (!file_iter_.Valid() && file_iter_.status().ok())) {
1156     // Move to previous file
1157     if (file_index_ == 0) {
1158       // Already the first file
1159       SetFileIterator(nullptr);
1160       return;
1161     }
1162     InitFileIterator(file_index_ - 1);
1163     if (file_iter_.iter() != nullptr) {
1164       file_iter_.SeekToLast();
1165     }
1166   }
1167 }
1168 
SetFileIterator(InternalIterator * iter)1169 void LevelIterator::SetFileIterator(InternalIterator* iter) {
1170   if (pinned_iters_mgr_ && iter) {
1171     iter->SetPinnedItersMgr(pinned_iters_mgr_);
1172   }
1173 
1174   InternalIterator* old_iter = file_iter_.Set(iter);
1175   if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
1176     pinned_iters_mgr_->PinIterator(old_iter);
1177   } else {
1178     delete old_iter;
1179   }
1180 }
1181 
InitFileIterator(size_t new_file_index)1182 void LevelIterator::InitFileIterator(size_t new_file_index) {
1183   if (new_file_index >= flevel_->num_files) {
1184     file_index_ = new_file_index;
1185     SetFileIterator(nullptr);
1186     return;
1187   } else {
1188     // If the file iterator shows incomplete, we try it again if users seek
1189     // to the same file, as this time we may go to a different data block
1190     // which is cached in block cache.
1191     //
1192     if (file_iter_.iter() != nullptr && !file_iter_.status().IsIncomplete() &&
1193         new_file_index == file_index_) {
1194       // file_iter_ is already constructed with this iterator, so
1195       // no need to change anything
1196     } else {
1197       file_index_ = new_file_index;
1198       InternalIterator* iter = NewFileIterator();
1199       SetFileIterator(iter);
1200     }
1201   }
1202 }
1203 }  // anonymous namespace
1204 
1205 // A wrapper of version builder which references the current version in
1206 // constructor and unref it in the destructor.
1207 // Both of the constructor and destructor need to be called inside DB Mutex.
1208 class BaseReferencedVersionBuilder {
1209  public:
BaseReferencedVersionBuilder(ColumnFamilyData * cfd)1210   explicit BaseReferencedVersionBuilder(ColumnFamilyData* cfd)
1211       : version_builder_(new VersionBuilder(
1212             cfd->current()->version_set()->file_options(), cfd->table_cache(),
1213             cfd->current()->storage_info(), cfd->ioptions()->info_log)),
1214         version_(cfd->current()) {
1215     version_->Ref();
1216   }
~BaseReferencedVersionBuilder()1217   ~BaseReferencedVersionBuilder() {
1218     version_->Unref();
1219   }
version_builder()1220   VersionBuilder* version_builder() { return version_builder_.get(); }
1221 
1222  private:
1223   std::unique_ptr<VersionBuilder> version_builder_;
1224   Version* version_;
1225 };
1226 
GetTableProperties(std::shared_ptr<const TableProperties> * tp,const FileMetaData * file_meta,const std::string * fname) const1227 Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
1228                                    const FileMetaData* file_meta,
1229                                    const std::string* fname) const {
1230   auto table_cache = cfd_->table_cache();
1231   auto ioptions = cfd_->ioptions();
1232   Status s = table_cache->GetTableProperties(
1233       file_options_, cfd_->internal_comparator(), file_meta->fd, tp,
1234       mutable_cf_options_.prefix_extractor.get(), true /* no io */);
1235   if (s.ok()) {
1236     return s;
1237   }
1238 
1239   // We only ignore error type `Incomplete` since it's by design that we
1240   // disallow table when it's not in table cache.
1241   if (!s.IsIncomplete()) {
1242     return s;
1243   }
1244 
1245   // 2. Table is not present in table cache, we'll read the table properties
1246   // directly from the properties block in the file.
1247   std::unique_ptr<FSRandomAccessFile> file;
1248   std::string file_name;
1249   if (fname != nullptr) {
1250     file_name = *fname;
1251   } else {
1252     file_name =
1253       TableFileName(ioptions->cf_paths, file_meta->fd.GetNumber(),
1254                     file_meta->fd.GetPathId());
1255   }
1256   s = ioptions->fs->NewRandomAccessFile(file_name, file_options_, &file,
1257                                         nullptr);
1258   if (!s.ok()) {
1259     return s;
1260   }
1261 
1262   TableProperties* raw_table_properties;
1263   // By setting the magic number to kInvalidTableMagicNumber, we can by
1264   // pass the magic number check in the footer.
1265   std::unique_ptr<RandomAccessFileReader> file_reader(
1266       new RandomAccessFileReader(
1267           std::move(file), file_name, nullptr /* env */, nullptr /* stats */,
1268           0 /* hist_type */, nullptr /* file_read_hist */,
1269           nullptr /* rate_limiter */, ioptions->listeners));
1270   s = ReadTableProperties(
1271       file_reader.get(), file_meta->fd.GetFileSize(),
1272       Footer::kInvalidTableMagicNumber /* table's magic number */, *ioptions,
1273       &raw_table_properties, false /* compression_type_missing */);
1274   if (!s.ok()) {
1275     return s;
1276   }
1277   RecordTick(ioptions->statistics, NUMBER_DIRECT_LOAD_TABLE_PROPERTIES);
1278 
1279   *tp = std::shared_ptr<const TableProperties>(raw_table_properties);
1280   return s;
1281 }
1282 
GetPropertiesOfAllTables(TablePropertiesCollection * props)1283 Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
1284   Status s;
1285   for (int level = 0; level < storage_info_.num_levels_; level++) {
1286     s = GetPropertiesOfAllTables(props, level);
1287     if (!s.ok()) {
1288       return s;
1289     }
1290   }
1291 
1292   return Status::OK();
1293 }
1294 
TablesRangeTombstoneSummary(int max_entries_to_print,std::string * out_str)1295 Status Version::TablesRangeTombstoneSummary(int max_entries_to_print,
1296                                             std::string* out_str) {
1297   if (max_entries_to_print <= 0) {
1298     return Status::OK();
1299   }
1300   int num_entries_left = max_entries_to_print;
1301 
1302   std::stringstream ss;
1303 
1304   for (int level = 0; level < storage_info_.num_levels_; level++) {
1305     for (const auto& file_meta : storage_info_.files_[level]) {
1306       auto fname =
1307           TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
1308                         file_meta->fd.GetPathId());
1309 
1310       ss << "=== file : " << fname << " ===\n";
1311 
1312       TableCache* table_cache = cfd_->table_cache();
1313       std::unique_ptr<FragmentedRangeTombstoneIterator> tombstone_iter;
1314 
1315       Status s = table_cache->GetRangeTombstoneIterator(
1316           ReadOptions(), cfd_->internal_comparator(), *file_meta,
1317           &tombstone_iter);
1318       if (!s.ok()) {
1319         return s;
1320       }
1321       if (tombstone_iter) {
1322         tombstone_iter->SeekToFirst();
1323 
1324         while (tombstone_iter->Valid() && num_entries_left > 0) {
1325           ss << "start: " << tombstone_iter->start_key().ToString(true)
1326              << " end: " << tombstone_iter->end_key().ToString(true)
1327              << " seq: " << tombstone_iter->seq() << '\n';
1328           tombstone_iter->Next();
1329           num_entries_left--;
1330         }
1331         if (num_entries_left <= 0) {
1332           break;
1333         }
1334       }
1335     }
1336     if (num_entries_left <= 0) {
1337       break;
1338     }
1339   }
1340   assert(num_entries_left >= 0);
1341   if (num_entries_left <= 0) {
1342     ss << "(results may not be complete)\n";
1343   }
1344 
1345   *out_str = ss.str();
1346   return Status::OK();
1347 }
1348 
GetPropertiesOfAllTables(TablePropertiesCollection * props,int level)1349 Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props,
1350                                          int level) {
1351   for (const auto& file_meta : storage_info_.files_[level]) {
1352     auto fname =
1353         TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
1354                       file_meta->fd.GetPathId());
1355     // 1. If the table is already present in table cache, load table
1356     // properties from there.
1357     std::shared_ptr<const TableProperties> table_properties;
1358     Status s = GetTableProperties(&table_properties, file_meta, &fname);
1359     if (s.ok()) {
1360       props->insert({fname, table_properties});
1361     } else {
1362       return s;
1363     }
1364   }
1365 
1366   return Status::OK();
1367 }
1368 
GetPropertiesOfTablesInRange(const Range * range,std::size_t n,TablePropertiesCollection * props) const1369 Status Version::GetPropertiesOfTablesInRange(
1370     const Range* range, std::size_t n, TablePropertiesCollection* props) const {
1371   for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
1372     for (decltype(n) i = 0; i < n; i++) {
1373       // Convert user_key into a corresponding internal key.
1374       InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
1375       InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
1376       std::vector<FileMetaData*> files;
1377       storage_info_.GetOverlappingInputs(level, &k1, &k2, &files, -1, nullptr,
1378                                          false);
1379       for (const auto& file_meta : files) {
1380         auto fname =
1381             TableFileName(cfd_->ioptions()->cf_paths,
1382                           file_meta->fd.GetNumber(), file_meta->fd.GetPathId());
1383         if (props->count(fname) == 0) {
1384           // 1. If the table is already present in table cache, load table
1385           // properties from there.
1386           std::shared_ptr<const TableProperties> table_properties;
1387           Status s = GetTableProperties(&table_properties, file_meta, &fname);
1388           if (s.ok()) {
1389             props->insert({fname, table_properties});
1390           } else {
1391             return s;
1392           }
1393         }
1394       }
1395     }
1396   }
1397 
1398   return Status::OK();
1399 }
1400 
GetAggregatedTableProperties(std::shared_ptr<const TableProperties> * tp,int level)1401 Status Version::GetAggregatedTableProperties(
1402     std::shared_ptr<const TableProperties>* tp, int level) {
1403   TablePropertiesCollection props;
1404   Status s;
1405   if (level < 0) {
1406     s = GetPropertiesOfAllTables(&props);
1407   } else {
1408     s = GetPropertiesOfAllTables(&props, level);
1409   }
1410   if (!s.ok()) {
1411     return s;
1412   }
1413 
1414   auto* new_tp = new TableProperties();
1415   for (const auto& item : props) {
1416     new_tp->Add(*item.second);
1417   }
1418   tp->reset(new_tp);
1419   return Status::OK();
1420 }
1421 
GetMemoryUsageByTableReaders()1422 size_t Version::GetMemoryUsageByTableReaders() {
1423   size_t total_usage = 0;
1424   for (auto& file_level : storage_info_.level_files_brief_) {
1425     for (size_t i = 0; i < file_level.num_files; i++) {
1426       total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader(
1427           file_options_, cfd_->internal_comparator(), file_level.files[i].fd,
1428           mutable_cf_options_.prefix_extractor.get());
1429     }
1430   }
1431   return total_usage;
1432 }
1433 
GetColumnFamilyMetaData(ColumnFamilyMetaData * cf_meta)1434 void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
1435   assert(cf_meta);
1436   assert(cfd_);
1437 
1438   cf_meta->name = cfd_->GetName();
1439   cf_meta->size = 0;
1440   cf_meta->file_count = 0;
1441   cf_meta->levels.clear();
1442 
1443   auto* ioptions = cfd_->ioptions();
1444   auto* vstorage = storage_info();
1445 
1446   for (int level = 0; level < cfd_->NumberLevels(); level++) {
1447     uint64_t level_size = 0;
1448     cf_meta->file_count += vstorage->LevelFiles(level).size();
1449     std::vector<SstFileMetaData> files;
1450     for (const auto& file : vstorage->LevelFiles(level)) {
1451       uint32_t path_id = file->fd.GetPathId();
1452       std::string file_path;
1453       if (path_id < ioptions->cf_paths.size()) {
1454         file_path = ioptions->cf_paths[path_id].path;
1455       } else {
1456         assert(!ioptions->cf_paths.empty());
1457         file_path = ioptions->cf_paths.back().path;
1458       }
1459       const uint64_t file_number = file->fd.GetNumber();
1460       files.emplace_back(SstFileMetaData{
1461           MakeTableFileName("", file_number), file_number, file_path,
1462           static_cast<size_t>(file->fd.GetFileSize()), file->fd.smallest_seqno,
1463           file->fd.largest_seqno, file->smallest.user_key().ToString(),
1464           file->largest.user_key().ToString(),
1465           file->stats.num_reads_sampled.load(std::memory_order_relaxed),
1466           file->being_compacted, file->oldest_blob_file_number,
1467           file->TryGetOldestAncesterTime(), file->TryGetFileCreationTime(),
1468           file->file_checksum, file->file_checksum_func_name});
1469       files.back().num_entries = file->num_entries;
1470       files.back().num_deletions = file->num_deletions;
1471       level_size += file->fd.GetFileSize();
1472     }
1473     cf_meta->levels.emplace_back(
1474         level, level_size, std::move(files));
1475     cf_meta->size += level_size;
1476   }
1477 }
1478 
GetSstFilesSize()1479 uint64_t Version::GetSstFilesSize() {
1480   uint64_t sst_files_size = 0;
1481   for (int level = 0; level < storage_info_.num_levels_; level++) {
1482     for (const auto& file_meta : storage_info_.LevelFiles(level)) {
1483       sst_files_size += file_meta->fd.GetFileSize();
1484     }
1485   }
1486   return sst_files_size;
1487 }
1488 
GetCreationTimeOfOldestFile(uint64_t * creation_time)1489 void Version::GetCreationTimeOfOldestFile(uint64_t* creation_time) {
1490   uint64_t oldest_time = port::kMaxUint64;
1491   for (int level = 0; level < storage_info_.num_non_empty_levels_; level++) {
1492     for (FileMetaData* meta : storage_info_.LevelFiles(level)) {
1493       assert(meta->fd.table_reader != nullptr);
1494       uint64_t file_creation_time = meta->TryGetFileCreationTime();
1495       if (file_creation_time == kUnknownFileCreationTime) {
1496         *creation_time = 0;
1497         return;
1498       }
1499       if (file_creation_time < oldest_time) {
1500         oldest_time = file_creation_time;
1501       }
1502     }
1503   }
1504   *creation_time = oldest_time;
1505 }
1506 
GetEstimatedActiveKeys() const1507 uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const {
1508   // Estimation will be inaccurate when:
1509   // (1) there exist merge keys
1510   // (2) keys are directly overwritten
1511   // (3) deletion on non-existing keys
1512   // (4) low number of samples
1513   if (current_num_samples_ == 0) {
1514     return 0;
1515   }
1516 
1517   if (current_num_non_deletions_ <= current_num_deletions_) {
1518     return 0;
1519   }
1520 
1521   uint64_t est = current_num_non_deletions_ - current_num_deletions_;
1522 
1523   uint64_t file_count = 0;
1524   for (int level = 0; level < num_levels_; ++level) {
1525     file_count += files_[level].size();
1526   }
1527 
1528   if (current_num_samples_ < file_count) {
1529     // casting to avoid overflowing
1530     return
1531       static_cast<uint64_t>(
1532         (est * static_cast<double>(file_count) / current_num_samples_)
1533       );
1534   } else {
1535     return est;
1536   }
1537 }
1538 
GetEstimatedCompressionRatioAtLevel(int level) const1539 double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel(
1540     int level) const {
1541   assert(level < num_levels_);
1542   uint64_t sum_file_size_bytes = 0;
1543   uint64_t sum_data_size_bytes = 0;
1544   for (auto* file_meta : files_[level]) {
1545     sum_file_size_bytes += file_meta->fd.GetFileSize();
1546     sum_data_size_bytes += file_meta->raw_key_size + file_meta->raw_value_size;
1547   }
1548   if (sum_file_size_bytes == 0) {
1549     return -1.0;
1550   }
1551   return static_cast<double>(sum_data_size_bytes) / sum_file_size_bytes;
1552 }
1553 
AddIterators(const ReadOptions & read_options,const FileOptions & soptions,MergeIteratorBuilder * merge_iter_builder,RangeDelAggregator * range_del_agg)1554 void Version::AddIterators(const ReadOptions& read_options,
1555                            const FileOptions& soptions,
1556                            MergeIteratorBuilder* merge_iter_builder,
1557                            RangeDelAggregator* range_del_agg) {
1558   assert(storage_info_.finalized_);
1559 
1560   for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
1561     AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level,
1562                          range_del_agg);
1563   }
1564 }
1565 
AddIteratorsForLevel(const ReadOptions & read_options,const FileOptions & soptions,MergeIteratorBuilder * merge_iter_builder,int level,RangeDelAggregator * range_del_agg)1566 void Version::AddIteratorsForLevel(const ReadOptions& read_options,
1567                                    const FileOptions& soptions,
1568                                    MergeIteratorBuilder* merge_iter_builder,
1569                                    int level,
1570                                    RangeDelAggregator* range_del_agg) {
1571   assert(storage_info_.finalized_);
1572   if (level >= storage_info_.num_non_empty_levels()) {
1573     // This is an empty level
1574     return;
1575   } else if (storage_info_.LevelFilesBrief(level).num_files == 0) {
1576     // No files in this level
1577     return;
1578   }
1579 
1580   bool should_sample = should_sample_file_read();
1581 
1582   auto* arena = merge_iter_builder->GetArena();
1583   if (level == 0) {
1584     // Merge all level zero files together since they may overlap
1585     for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
1586       const auto& file = storage_info_.LevelFilesBrief(0).files[i];
1587       merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
1588           read_options, soptions, cfd_->internal_comparator(),
1589           *file.file_metadata, range_del_agg,
1590           mutable_cf_options_.prefix_extractor.get(), nullptr,
1591           cfd_->internal_stats()->GetFileReadHist(0),
1592           TableReaderCaller::kUserIterator, arena,
1593           /*skip_filters=*/false, /*level=*/0,
1594           /*smallest_compaction_key=*/nullptr,
1595           /*largest_compaction_key=*/nullptr));
1596     }
1597     if (should_sample) {
1598       // Count ones for every L0 files. This is done per iterator creation
1599       // rather than Seek(), while files in other levels are recored per seek.
1600       // If users execute one range query per iterator, there may be some
1601       // discrepancy here.
1602       for (FileMetaData* meta : storage_info_.LevelFiles(0)) {
1603         sample_file_read_inc(meta);
1604       }
1605     }
1606   } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
1607     // For levels > 0, we can use a concatenating iterator that sequentially
1608     // walks through the non-overlapping files in the level, opening them
1609     // lazily.
1610     auto* mem = arena->AllocateAligned(sizeof(LevelIterator));
1611     merge_iter_builder->AddIterator(new (mem) LevelIterator(
1612         cfd_->table_cache(), read_options, soptions,
1613         cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
1614         mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
1615         cfd_->internal_stats()->GetFileReadHist(level),
1616         TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
1617         range_del_agg, /*largest_compaction_key=*/nullptr));
1618   }
1619 }
1620 
OverlapWithLevelIterator(const ReadOptions & read_options,const FileOptions & file_options,const Slice & smallest_user_key,const Slice & largest_user_key,int level,bool * overlap)1621 Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
1622                                          const FileOptions& file_options,
1623                                          const Slice& smallest_user_key,
1624                                          const Slice& largest_user_key,
1625                                          int level, bool* overlap) {
1626   assert(storage_info_.finalized_);
1627 
1628   auto icmp = cfd_->internal_comparator();
1629   auto ucmp = icmp.user_comparator();
1630 
1631   Arena arena;
1632   Status status;
1633   ReadRangeDelAggregator range_del_agg(&icmp,
1634                                        kMaxSequenceNumber /* upper_bound */);
1635 
1636   *overlap = false;
1637 
1638   if (level == 0) {
1639     for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
1640       const auto file = &storage_info_.LevelFilesBrief(0).files[i];
1641       if (AfterFile(ucmp, &smallest_user_key, file) ||
1642           BeforeFile(ucmp, &largest_user_key, file)) {
1643         continue;
1644       }
1645       ScopedArenaIterator iter(cfd_->table_cache()->NewIterator(
1646           read_options, file_options, cfd_->internal_comparator(),
1647           *file->file_metadata, &range_del_agg,
1648           mutable_cf_options_.prefix_extractor.get(), nullptr,
1649           cfd_->internal_stats()->GetFileReadHist(0),
1650           TableReaderCaller::kUserIterator, &arena,
1651           /*skip_filters=*/false, /*level=*/0,
1652           /*smallest_compaction_key=*/nullptr,
1653           /*largest_compaction_key=*/nullptr));
1654       status = OverlapWithIterator(
1655           ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
1656       if (!status.ok() || *overlap) {
1657         break;
1658       }
1659     }
1660   } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
1661     auto mem = arena.AllocateAligned(sizeof(LevelIterator));
1662     ScopedArenaIterator iter(new (mem) LevelIterator(
1663         cfd_->table_cache(), read_options, file_options,
1664         cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
1665         mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
1666         cfd_->internal_stats()->GetFileReadHist(level),
1667         TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
1668         &range_del_agg));
1669     status = OverlapWithIterator(
1670         ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
1671   }
1672 
1673   if (status.ok() && *overlap == false &&
1674       range_del_agg.IsRangeOverlapped(smallest_user_key, largest_user_key)) {
1675     *overlap = true;
1676   }
1677   return status;
1678 }
1679 
VersionStorageInfo(const InternalKeyComparator * internal_comparator,const Comparator * user_comparator,int levels,CompactionStyle compaction_style,VersionStorageInfo * ref_vstorage,bool _force_consistency_checks)1680 VersionStorageInfo::VersionStorageInfo(
1681     const InternalKeyComparator* internal_comparator,
1682     const Comparator* user_comparator, int levels,
1683     CompactionStyle compaction_style, VersionStorageInfo* ref_vstorage,
1684     bool _force_consistency_checks)
1685     : internal_comparator_(internal_comparator),
1686       user_comparator_(user_comparator),
1687       // cfd is nullptr if Version is dummy
1688       num_levels_(levels),
1689       num_non_empty_levels_(0),
1690       file_indexer_(user_comparator),
1691       compaction_style_(compaction_style),
1692       files_(new std::vector<FileMetaData*>[num_levels_]),
1693       base_level_(num_levels_ == 1 ? -1 : 1),
1694       level_multiplier_(0.0),
1695       files_by_compaction_pri_(num_levels_),
1696       level0_non_overlapping_(false),
1697       next_file_to_compact_by_size_(num_levels_),
1698       compaction_score_(num_levels_),
1699       compaction_level_(num_levels_),
1700       l0_delay_trigger_count_(0),
1701       accumulated_file_size_(0),
1702       accumulated_raw_key_size_(0),
1703       accumulated_raw_value_size_(0),
1704       accumulated_num_non_deletions_(0),
1705       accumulated_num_deletions_(0),
1706       current_num_non_deletions_(0),
1707       current_num_deletions_(0),
1708       current_num_samples_(0),
1709       estimated_compaction_needed_bytes_(0),
1710       finalized_(false),
1711       force_consistency_checks_(_force_consistency_checks) {
1712   if (ref_vstorage != nullptr) {
1713     accumulated_file_size_ = ref_vstorage->accumulated_file_size_;
1714     accumulated_raw_key_size_ = ref_vstorage->accumulated_raw_key_size_;
1715     accumulated_raw_value_size_ = ref_vstorage->accumulated_raw_value_size_;
1716     accumulated_num_non_deletions_ =
1717         ref_vstorage->accumulated_num_non_deletions_;
1718     accumulated_num_deletions_ = ref_vstorage->accumulated_num_deletions_;
1719     current_num_non_deletions_ = ref_vstorage->current_num_non_deletions_;
1720     current_num_deletions_ = ref_vstorage->current_num_deletions_;
1721     current_num_samples_ = ref_vstorage->current_num_samples_;
1722     oldest_snapshot_seqnum_ = ref_vstorage->oldest_snapshot_seqnum_;
1723   }
1724 }
1725 
Version(ColumnFamilyData * column_family_data,VersionSet * vset,const FileOptions & file_opt,const MutableCFOptions mutable_cf_options,uint64_t version_number)1726 Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
1727                  const FileOptions& file_opt,
1728                  const MutableCFOptions mutable_cf_options,
1729                  uint64_t version_number)
1730     : env_(vset->env_),
1731       cfd_(column_family_data),
1732       info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->info_log),
1733       db_statistics_((cfd_ == nullptr) ? nullptr
1734                                        : cfd_->ioptions()->statistics),
1735       table_cache_((cfd_ == nullptr) ? nullptr : cfd_->table_cache()),
1736       merge_operator_((cfd_ == nullptr) ? nullptr
1737                                         : cfd_->ioptions()->merge_operator),
1738       storage_info_(
1739           (cfd_ == nullptr) ? nullptr : &cfd_->internal_comparator(),
1740           (cfd_ == nullptr) ? nullptr : cfd_->user_comparator(),
1741           cfd_ == nullptr ? 0 : cfd_->NumberLevels(),
1742           cfd_ == nullptr ? kCompactionStyleLevel
1743                           : cfd_->ioptions()->compaction_style,
1744           (cfd_ == nullptr || cfd_->current() == nullptr)
1745               ? nullptr
1746               : cfd_->current()->storage_info(),
1747           cfd_ == nullptr ? false : cfd_->ioptions()->force_consistency_checks),
1748       vset_(vset),
1749       next_(this),
1750       prev_(this),
1751       refs_(0),
1752       file_options_(file_opt),
1753       mutable_cf_options_(mutable_cf_options),
1754       version_number_(version_number) {}
1755 
Get(const ReadOptions & read_options,const LookupKey & k,PinnableSlice * value,Status * status,MergeContext * merge_context,SequenceNumber * max_covering_tombstone_seq,bool * value_found,bool * key_exists,SequenceNumber * seq,ReadCallback * callback,bool * is_blob,bool do_merge)1756 void Version::Get(const ReadOptions& read_options, const LookupKey& k,
1757                   PinnableSlice* value, Status* status,
1758                   MergeContext* merge_context,
1759                   SequenceNumber* max_covering_tombstone_seq, bool* value_found,
1760                   bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
1761                   bool* is_blob, bool do_merge) {
1762   Slice ikey = k.internal_key();
1763   Slice user_key = k.user_key();
1764 
1765   assert(status->ok() || status->IsMergeInProgress());
1766 
1767   if (key_exists != nullptr) {
1768     // will falsify below if not found
1769     *key_exists = true;
1770   }
1771 
1772   PinnedIteratorsManager pinned_iters_mgr;
1773   uint64_t tracing_get_id = BlockCacheTraceHelper::kReservedGetId;
1774   if (vset_ && vset_->block_cache_tracer_ &&
1775       vset_->block_cache_tracer_->is_tracing_enabled()) {
1776     tracing_get_id = vset_->block_cache_tracer_->NextGetId();
1777   }
1778   GetContext get_context(
1779       user_comparator(), merge_operator_, info_log_, db_statistics_,
1780       status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
1781       do_merge ? value : nullptr, value_found, merge_context, do_merge,
1782       max_covering_tombstone_seq, this->env_, seq,
1783       merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
1784       tracing_get_id);
1785 
1786   // Pin blocks that we read to hold merge operands
1787   if (merge_operator_) {
1788     pinned_iters_mgr.StartPinning();
1789   }
1790 
1791   FilePicker fp(
1792       storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
1793       storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
1794       user_comparator(), internal_comparator());
1795   FdWithKeyRange* f = fp.GetNextFile();
1796 
1797   while (f != nullptr) {
1798     if (*max_covering_tombstone_seq > 0) {
1799       // The remaining files we look at will only contain covered keys, so we
1800       // stop here.
1801       break;
1802     }
1803     if (get_context.sample()) {
1804       sample_file_read_inc(f->file_metadata);
1805     }
1806 
1807     bool timer_enabled =
1808         GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
1809         get_perf_context()->per_level_perf_context_enabled;
1810     StopWatchNano timer(env_, timer_enabled /* auto_start */);
1811     *status = table_cache_->Get(
1812         read_options, *internal_comparator(), *f->file_metadata, ikey,
1813         &get_context, mutable_cf_options_.prefix_extractor.get(),
1814         cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
1815         IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
1816                         fp.IsHitFileLastInLevel()),
1817         fp.GetCurrentLevel());
1818     // TODO: examine the behavior for corrupted key
1819     if (timer_enabled) {
1820       PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
1821                                 fp.GetCurrentLevel());
1822     }
1823     if (!status->ok()) {
1824       return;
1825     }
1826 
1827     // report the counters before returning
1828     if (get_context.State() != GetContext::kNotFound &&
1829         get_context.State() != GetContext::kMerge &&
1830         db_statistics_ != nullptr) {
1831       get_context.ReportCounters();
1832     }
1833     switch (get_context.State()) {
1834       case GetContext::kNotFound:
1835         // Keep searching in other files
1836         break;
1837       case GetContext::kMerge:
1838         // TODO: update per-level perfcontext user_key_return_count for kMerge
1839         break;
1840       case GetContext::kFound:
1841         if (fp.GetHitFileLevel() == 0) {
1842           RecordTick(db_statistics_, GET_HIT_L0);
1843         } else if (fp.GetHitFileLevel() == 1) {
1844           RecordTick(db_statistics_, GET_HIT_L1);
1845         } else if (fp.GetHitFileLevel() >= 2) {
1846           RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
1847         }
1848         PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
1849                                   fp.GetHitFileLevel());
1850         return;
1851       case GetContext::kDeleted:
1852         // Use empty error message for speed
1853         *status = Status::NotFound();
1854         return;
1855       case GetContext::kCorrupt:
1856         *status = Status::Corruption("corrupted key for ", user_key);
1857         return;
1858       case GetContext::kBlobIndex:
1859         ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
1860         *status = Status::NotSupported(
1861             "Encounter unexpected blob index. Please open DB with "
1862             "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
1863         return;
1864     }
1865     f = fp.GetNextFile();
1866   }
1867   if (db_statistics_ != nullptr) {
1868     get_context.ReportCounters();
1869   }
1870   if (GetContext::kMerge == get_context.State()) {
1871     if (!do_merge) {
1872       *status = Status::OK();
1873       return;
1874     }
1875     if (!merge_operator_) {
1876       *status =  Status::InvalidArgument(
1877           "merge_operator is not properly initialized.");
1878       return;
1879     }
1880     // merge_operands are in saver and we hit the beginning of the key history
1881     // do a final merge of nullptr and operands;
1882     std::string* str_value = value != nullptr ? value->GetSelf() : nullptr;
1883     *status = MergeHelper::TimedFullMerge(
1884         merge_operator_, user_key, nullptr, merge_context->GetOperands(),
1885         str_value, info_log_, db_statistics_, env_,
1886         nullptr /* result_operand */, true);
1887     if (LIKELY(value != nullptr)) {
1888       value->PinSelf();
1889     }
1890   } else {
1891     if (key_exists != nullptr) {
1892       *key_exists = false;
1893     }
1894     *status = Status::NotFound(); // Use an empty error message for speed
1895   }
1896 }
1897 
MultiGet(const ReadOptions & read_options,MultiGetRange * range,ReadCallback * callback,bool * is_blob)1898 void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
1899                        ReadCallback* callback, bool* is_blob) {
1900   PinnedIteratorsManager pinned_iters_mgr;
1901 
1902   // Pin blocks that we read to hold merge operands
1903   if (merge_operator_) {
1904     pinned_iters_mgr.StartPinning();
1905   }
1906   uint64_t tracing_mget_id = BlockCacheTraceHelper::kReservedGetId;
1907 
1908   if (vset_ && vset_->block_cache_tracer_ &&
1909       vset_->block_cache_tracer_->is_tracing_enabled()) {
1910     tracing_mget_id = vset_->block_cache_tracer_->NextGetId();
1911   }
1912   // Even though we know the batch size won't be > MAX_BATCH_SIZE,
1913   // use autovector in order to avoid unnecessary construction of GetContext
1914   // objects, which is expensive
1915   autovector<GetContext, 16> get_ctx;
1916   for (auto iter = range->begin(); iter != range->end(); ++iter) {
1917     assert(iter->s->ok() || iter->s->IsMergeInProgress());
1918     get_ctx.emplace_back(
1919         user_comparator(), merge_operator_, info_log_, db_statistics_,
1920         iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge, iter->ukey,
1921         iter->value, nullptr, &(iter->merge_context), true,
1922         &iter->max_covering_tombstone_seq, this->env_, nullptr,
1923         merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob,
1924         tracing_mget_id);
1925     // MergeInProgress status, if set, has been transferred to the get_context
1926     // state, so we set status to ok here. From now on, the iter status will
1927     // be used for IO errors, and get_context state will be used for any
1928     // key level errors
1929     *(iter->s) = Status::OK();
1930   }
1931   int get_ctx_index = 0;
1932   for (auto iter = range->begin(); iter != range->end();
1933        ++iter, get_ctx_index++) {
1934     iter->get_context = &(get_ctx[get_ctx_index]);
1935   }
1936 
1937   MultiGetRange file_picker_range(*range, range->begin(), range->end());
1938   FilePickerMultiGet fp(
1939       &file_picker_range,
1940       &storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_,
1941       &storage_info_.file_indexer_, user_comparator(), internal_comparator());
1942   FdWithKeyRange* f = fp.GetNextFile();
1943 
1944   while (f != nullptr) {
1945     MultiGetRange file_range = fp.CurrentFileRange();
1946     bool timer_enabled =
1947         GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
1948         get_perf_context()->per_level_perf_context_enabled;
1949     StopWatchNano timer(env_, timer_enabled /* auto_start */);
1950     Status s = table_cache_->MultiGet(
1951         read_options, *internal_comparator(), *f->file_metadata, &file_range,
1952         mutable_cf_options_.prefix_extractor.get(),
1953         cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
1954         IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
1955                         fp.IsHitFileLastInLevel()),
1956         fp.GetCurrentLevel());
1957     // TODO: examine the behavior for corrupted key
1958     if (timer_enabled) {
1959       PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
1960                                 fp.GetCurrentLevel());
1961     }
1962     if (!s.ok()) {
1963       // TODO: Set status for individual keys appropriately
1964       for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
1965         *iter->s = s;
1966         file_range.MarkKeyDone(iter);
1967       }
1968       return;
1969     }
1970     uint64_t batch_size = 0;
1971     for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
1972       GetContext& get_context = *iter->get_context;
1973       Status* status = iter->s;
1974       // The Status in the KeyContext takes precedence over GetContext state
1975       // Status may be an error if there were any IO errors in the table
1976       // reader. We never expect Status to be NotFound(), as that is
1977       // determined by get_context
1978       assert(!status->IsNotFound());
1979       if (!status->ok()) {
1980         file_range.MarkKeyDone(iter);
1981         continue;
1982       }
1983 
1984       if (get_context.sample()) {
1985         sample_file_read_inc(f->file_metadata);
1986       }
1987       batch_size++;
1988       // report the counters before returning
1989       if (get_context.State() != GetContext::kNotFound &&
1990           get_context.State() != GetContext::kMerge &&
1991           db_statistics_ != nullptr) {
1992         get_context.ReportCounters();
1993       } else {
1994         if (iter->max_covering_tombstone_seq > 0) {
1995           // The remaining files we look at will only contain covered keys, so
1996           // we stop here for this key
1997           file_picker_range.SkipKey(iter);
1998         }
1999       }
2000       switch (get_context.State()) {
2001         case GetContext::kNotFound:
2002           // Keep searching in other files
2003           break;
2004         case GetContext::kMerge:
2005           // TODO: update per-level perfcontext user_key_return_count for kMerge
2006           break;
2007         case GetContext::kFound:
2008           if (fp.GetHitFileLevel() == 0) {
2009             RecordTick(db_statistics_, GET_HIT_L0);
2010           } else if (fp.GetHitFileLevel() == 1) {
2011             RecordTick(db_statistics_, GET_HIT_L1);
2012           } else if (fp.GetHitFileLevel() >= 2) {
2013             RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
2014           }
2015           PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
2016                                     fp.GetHitFileLevel());
2017           file_range.MarkKeyDone(iter);
2018           continue;
2019         case GetContext::kDeleted:
2020           // Use empty error message for speed
2021           *status = Status::NotFound();
2022           file_range.MarkKeyDone(iter);
2023           continue;
2024         case GetContext::kCorrupt:
2025           *status =
2026               Status::Corruption("corrupted key for ", iter->lkey->user_key());
2027           file_range.MarkKeyDone(iter);
2028           continue;
2029         case GetContext::kBlobIndex:
2030           ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
2031           *status = Status::NotSupported(
2032               "Encounter unexpected blob index. Please open DB with "
2033               "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
2034           file_range.MarkKeyDone(iter);
2035           continue;
2036       }
2037     }
2038     RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
2039     if (file_picker_range.empty()) {
2040       break;
2041     }
2042     f = fp.GetNextFile();
2043   }
2044 
2045   // Process any left over keys
2046   for (auto iter = range->begin(); iter != range->end(); ++iter) {
2047     GetContext& get_context = *iter->get_context;
2048     Status* status = iter->s;
2049     Slice user_key = iter->lkey->user_key();
2050 
2051     if (db_statistics_ != nullptr) {
2052       get_context.ReportCounters();
2053     }
2054     if (GetContext::kMerge == get_context.State()) {
2055       if (!merge_operator_) {
2056         *status = Status::InvalidArgument(
2057             "merge_operator is not properly initialized.");
2058         range->MarkKeyDone(iter);
2059         continue;
2060       }
2061       // merge_operands are in saver and we hit the beginning of the key history
2062       // do a final merge of nullptr and operands;
2063       std::string* str_value =
2064           iter->value != nullptr ? iter->value->GetSelf() : nullptr;
2065       *status = MergeHelper::TimedFullMerge(
2066           merge_operator_, user_key, nullptr, iter->merge_context.GetOperands(),
2067           str_value, info_log_, db_statistics_, env_,
2068           nullptr /* result_operand */, true);
2069       if (LIKELY(iter->value != nullptr)) {
2070         iter->value->PinSelf();
2071       }
2072     } else {
2073       range->MarkKeyDone(iter);
2074       *status = Status::NotFound();  // Use an empty error message for speed
2075     }
2076   }
2077 }
2078 
IsFilterSkipped(int level,bool is_file_last_in_level)2079 bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
2080   // Reaching the bottom level implies misses at all upper levels, so we'll
2081   // skip checking the filters when we predict a hit.
2082   return cfd_->ioptions()->optimize_filters_for_hits &&
2083          (level > 0 || is_file_last_in_level) &&
2084          level == storage_info_.num_non_empty_levels() - 1;
2085 }
2086 
GenerateLevelFilesBrief()2087 void VersionStorageInfo::GenerateLevelFilesBrief() {
2088   level_files_brief_.resize(num_non_empty_levels_);
2089   for (int level = 0; level < num_non_empty_levels_; level++) {
2090     DoGenerateLevelFilesBrief(
2091         &level_files_brief_[level], files_[level], &arena_);
2092   }
2093 }
2094 
PrepareApply(const MutableCFOptions & mutable_cf_options,bool update_stats)2095 void Version::PrepareApply(
2096     const MutableCFOptions& mutable_cf_options,
2097     bool update_stats) {
2098   UpdateAccumulatedStats(update_stats);
2099   storage_info_.UpdateNumNonEmptyLevels();
2100   storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
2101   storage_info_.UpdateFilesByCompactionPri(cfd_->ioptions()->compaction_pri);
2102   storage_info_.GenerateFileIndexer();
2103   storage_info_.GenerateLevelFilesBrief();
2104   storage_info_.GenerateLevel0NonOverlapping();
2105   storage_info_.GenerateBottommostFiles();
2106 }
2107 
MaybeInitializeFileMetaData(FileMetaData * file_meta)2108 bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
2109   if (file_meta->init_stats_from_file ||
2110       file_meta->compensated_file_size > 0) {
2111     return false;
2112   }
2113   std::shared_ptr<const TableProperties> tp;
2114   Status s = GetTableProperties(&tp, file_meta);
2115   file_meta->init_stats_from_file = true;
2116   if (!s.ok()) {
2117     ROCKS_LOG_ERROR(vset_->db_options_->info_log,
2118                     "Unable to load table properties for file %" PRIu64
2119                     " --- %s\n",
2120                     file_meta->fd.GetNumber(), s.ToString().c_str());
2121     return false;
2122   }
2123   if (tp.get() == nullptr) return false;
2124   file_meta->num_entries = tp->num_entries;
2125   file_meta->num_deletions = tp->num_deletions;
2126   file_meta->raw_value_size = tp->raw_value_size;
2127   file_meta->raw_key_size = tp->raw_key_size;
2128 
2129   return true;
2130 }
2131 
UpdateAccumulatedStats(FileMetaData * file_meta)2132 void VersionStorageInfo::UpdateAccumulatedStats(FileMetaData* file_meta) {
2133   TEST_SYNC_POINT_CALLBACK("VersionStorageInfo::UpdateAccumulatedStats",
2134                            nullptr);
2135 
2136   assert(file_meta->init_stats_from_file);
2137   accumulated_file_size_ += file_meta->fd.GetFileSize();
2138   accumulated_raw_key_size_ += file_meta->raw_key_size;
2139   accumulated_raw_value_size_ += file_meta->raw_value_size;
2140   accumulated_num_non_deletions_ +=
2141       file_meta->num_entries - file_meta->num_deletions;
2142   accumulated_num_deletions_ += file_meta->num_deletions;
2143 
2144   current_num_non_deletions_ +=
2145       file_meta->num_entries - file_meta->num_deletions;
2146   current_num_deletions_ += file_meta->num_deletions;
2147   current_num_samples_++;
2148 }
2149 
RemoveCurrentStats(FileMetaData * file_meta)2150 void VersionStorageInfo::RemoveCurrentStats(FileMetaData* file_meta) {
2151   if (file_meta->init_stats_from_file) {
2152     current_num_non_deletions_ -=
2153         file_meta->num_entries - file_meta->num_deletions;
2154     current_num_deletions_ -= file_meta->num_deletions;
2155     current_num_samples_--;
2156   }
2157 }
2158 
UpdateAccumulatedStats(bool update_stats)2159 void Version::UpdateAccumulatedStats(bool update_stats) {
2160   if (update_stats) {
2161     // maximum number of table properties loaded from files.
2162     const int kMaxInitCount = 20;
2163     int init_count = 0;
2164     // here only the first kMaxInitCount files which haven't been
2165     // initialized from file will be updated with num_deletions.
2166     // The motivation here is to cap the maximum I/O per Version creation.
2167     // The reason for choosing files from lower-level instead of higher-level
2168     // is that such design is able to propagate the initialization from
2169     // lower-level to higher-level:  When the num_deletions of lower-level
2170     // files are updated, it will make the lower-level files have accurate
2171     // compensated_file_size, making lower-level to higher-level compaction
2172     // will be triggered, which creates higher-level files whose num_deletions
2173     // will be updated here.
2174     for (int level = 0;
2175          level < storage_info_.num_levels_ && init_count < kMaxInitCount;
2176          ++level) {
2177       for (auto* file_meta : storage_info_.files_[level]) {
2178         if (MaybeInitializeFileMetaData(file_meta)) {
2179           // each FileMeta will be initialized only once.
2180           storage_info_.UpdateAccumulatedStats(file_meta);
2181           // when option "max_open_files" is -1, all the file metadata has
2182           // already been read, so MaybeInitializeFileMetaData() won't incur
2183           // any I/O cost. "max_open_files=-1" means that the table cache passed
2184           // to the VersionSet and then to the ColumnFamilySet has a size of
2185           // TableCache::kInfiniteCapacity
2186           if (vset_->GetColumnFamilySet()->get_table_cache()->GetCapacity() ==
2187               TableCache::kInfiniteCapacity) {
2188             continue;
2189           }
2190           if (++init_count >= kMaxInitCount) {
2191             break;
2192           }
2193         }
2194       }
2195     }
2196     // In case all sampled-files contain only deletion entries, then we
2197     // load the table-property of a file in higher-level to initialize
2198     // that value.
2199     for (int level = storage_info_.num_levels_ - 1;
2200          storage_info_.accumulated_raw_value_size_ == 0 && level >= 0;
2201          --level) {
2202       for (int i = static_cast<int>(storage_info_.files_[level].size()) - 1;
2203            storage_info_.accumulated_raw_value_size_ == 0 && i >= 0; --i) {
2204         if (MaybeInitializeFileMetaData(storage_info_.files_[level][i])) {
2205           storage_info_.UpdateAccumulatedStats(storage_info_.files_[level][i]);
2206         }
2207       }
2208     }
2209   }
2210 
2211   storage_info_.ComputeCompensatedSizes();
2212 }
2213 
ComputeCompensatedSizes()2214 void VersionStorageInfo::ComputeCompensatedSizes() {
2215   static const int kDeletionWeightOnCompaction = 2;
2216   uint64_t average_value_size = GetAverageValueSize();
2217 
2218   // compute the compensated size
2219   for (int level = 0; level < num_levels_; level++) {
2220     for (auto* file_meta : files_[level]) {
2221       // Here we only compute compensated_file_size for those file_meta
2222       // which compensated_file_size is uninitialized (== 0). This is true only
2223       // for files that have been created right now and no other thread has
2224       // access to them. That's why we can safely mutate compensated_file_size.
2225       if (file_meta->compensated_file_size == 0) {
2226         file_meta->compensated_file_size = file_meta->fd.GetFileSize();
2227         // Here we only boost the size of deletion entries of a file only
2228         // when the number of deletion entries is greater than the number of
2229         // non-deletion entries in the file.  The motivation here is that in
2230         // a stable workload, the number of deletion entries should be roughly
2231         // equal to the number of non-deletion entries.  If we compensate the
2232         // size of deletion entries in a stable workload, the deletion
2233         // compensation logic might introduce unwanted effet which changes the
2234         // shape of LSM tree.
2235         if (file_meta->num_deletions * 2 >= file_meta->num_entries) {
2236           file_meta->compensated_file_size +=
2237               (file_meta->num_deletions * 2 - file_meta->num_entries) *
2238               average_value_size * kDeletionWeightOnCompaction;
2239         }
2240       }
2241     }
2242   }
2243 }
2244 
MaxInputLevel() const2245 int VersionStorageInfo::MaxInputLevel() const {
2246   if (compaction_style_ == kCompactionStyleLevel) {
2247     return num_levels() - 2;
2248   }
2249   return 0;
2250 }
2251 
MaxOutputLevel(bool allow_ingest_behind) const2252 int VersionStorageInfo::MaxOutputLevel(bool allow_ingest_behind) const {
2253   if (allow_ingest_behind) {
2254     assert(num_levels() > 1);
2255     return num_levels() - 2;
2256   }
2257   return num_levels() - 1;
2258 }
2259 
EstimateCompactionBytesNeeded(const MutableCFOptions & mutable_cf_options)2260 void VersionStorageInfo::EstimateCompactionBytesNeeded(
2261     const MutableCFOptions& mutable_cf_options) {
2262   // Only implemented for level-based compaction
2263   if (compaction_style_ != kCompactionStyleLevel) {
2264     estimated_compaction_needed_bytes_ = 0;
2265     return;
2266   }
2267 
2268   // Start from Level 0, if level 0 qualifies compaction to level 1,
2269   // we estimate the size of compaction.
2270   // Then we move on to the next level and see whether it qualifies compaction
2271   // to the next level. The size of the level is estimated as the actual size
2272   // on the level plus the input bytes from the previous level if there is any.
2273   // If it exceeds, take the exceeded bytes as compaction input and add the size
2274   // of the compaction size to tatal size.
2275   // We keep doing it to Level 2, 3, etc, until the last level and return the
2276   // accumulated bytes.
2277 
2278   uint64_t bytes_compact_to_next_level = 0;
2279   uint64_t level_size = 0;
2280   for (auto* f : files_[0]) {
2281     level_size += f->fd.GetFileSize();
2282   }
2283   // Level 0
2284   bool level0_compact_triggered = false;
2285   if (static_cast<int>(files_[0].size()) >=
2286           mutable_cf_options.level0_file_num_compaction_trigger ||
2287       level_size >= mutable_cf_options.max_bytes_for_level_base) {
2288     level0_compact_triggered = true;
2289     estimated_compaction_needed_bytes_ = level_size;
2290     bytes_compact_to_next_level = level_size;
2291   } else {
2292     estimated_compaction_needed_bytes_ = 0;
2293   }
2294 
2295   // Level 1 and up.
2296   uint64_t bytes_next_level = 0;
2297   for (int level = base_level(); level <= MaxInputLevel(); level++) {
2298     level_size = 0;
2299     if (bytes_next_level > 0) {
2300 #ifndef NDEBUG
2301       uint64_t level_size2 = 0;
2302       for (auto* f : files_[level]) {
2303         level_size2 += f->fd.GetFileSize();
2304       }
2305       assert(level_size2 == bytes_next_level);
2306 #endif
2307       level_size = bytes_next_level;
2308       bytes_next_level = 0;
2309     } else {
2310       for (auto* f : files_[level]) {
2311         level_size += f->fd.GetFileSize();
2312       }
2313     }
2314     if (level == base_level() && level0_compact_triggered) {
2315       // Add base level size to compaction if level0 compaction triggered.
2316       estimated_compaction_needed_bytes_ += level_size;
2317     }
2318     // Add size added by previous compaction
2319     level_size += bytes_compact_to_next_level;
2320     bytes_compact_to_next_level = 0;
2321     uint64_t level_target = MaxBytesForLevel(level);
2322     if (level_size > level_target) {
2323       bytes_compact_to_next_level = level_size - level_target;
2324       // Estimate the actual compaction fan-out ratio as size ratio between
2325       // the two levels.
2326 
2327       assert(bytes_next_level == 0);
2328       if (level + 1 < num_levels_) {
2329         for (auto* f : files_[level + 1]) {
2330           bytes_next_level += f->fd.GetFileSize();
2331         }
2332       }
2333       if (bytes_next_level > 0) {
2334         assert(level_size > 0);
2335         estimated_compaction_needed_bytes_ += static_cast<uint64_t>(
2336             static_cast<double>(bytes_compact_to_next_level) *
2337             (static_cast<double>(bytes_next_level) /
2338                  static_cast<double>(level_size) +
2339              1));
2340       }
2341     }
2342   }
2343 }
2344 
2345 namespace {
GetExpiredTtlFilesCount(const ImmutableCFOptions & ioptions,const MutableCFOptions & mutable_cf_options,const std::vector<FileMetaData * > & files)2346 uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions,
2347                                  const MutableCFOptions& mutable_cf_options,
2348                                  const std::vector<FileMetaData*>& files) {
2349   uint32_t ttl_expired_files_count = 0;
2350 
2351   int64_t _current_time;
2352   auto status = ioptions.env->GetCurrentTime(&_current_time);
2353   if (status.ok()) {
2354     const uint64_t current_time = static_cast<uint64_t>(_current_time);
2355     for (FileMetaData* f : files) {
2356       if (!f->being_compacted) {
2357         uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
2358         if (oldest_ancester_time != 0 &&
2359             oldest_ancester_time < (current_time - mutable_cf_options.ttl)) {
2360           ttl_expired_files_count++;
2361         }
2362       }
2363     }
2364   }
2365   return ttl_expired_files_count;
2366 }
2367 }  // anonymous namespace
2368 
ComputeCompactionScore(const ImmutableCFOptions & immutable_cf_options,const MutableCFOptions & mutable_cf_options)2369 void VersionStorageInfo::ComputeCompactionScore(
2370     const ImmutableCFOptions& immutable_cf_options,
2371     const MutableCFOptions& mutable_cf_options) {
2372   for (int level = 0; level <= MaxInputLevel(); level++) {
2373     double score;
2374     if (level == 0) {
2375       // We treat level-0 specially by bounding the number of files
2376       // instead of number of bytes for two reasons:
2377       //
2378       // (1) With larger write-buffer sizes, it is nice not to do too
2379       // many level-0 compactions.
2380       //
2381       // (2) The files in level-0 are merged on every read and
2382       // therefore we wish to avoid too many files when the individual
2383       // file size is small (perhaps because of a small write-buffer
2384       // setting, or very high compression ratios, or lots of
2385       // overwrites/deletions).
2386       int num_sorted_runs = 0;
2387       uint64_t total_size = 0;
2388       for (auto* f : files_[level]) {
2389         if (!f->being_compacted) {
2390           total_size += f->compensated_file_size;
2391           num_sorted_runs++;
2392         }
2393       }
2394       if (compaction_style_ == kCompactionStyleUniversal) {
2395         // For universal compaction, we use level0 score to indicate
2396         // compaction score for the whole DB. Adding other levels as if
2397         // they are L0 files.
2398         for (int i = 1; i < num_levels(); i++) {
2399           if (!files_[i].empty() && !files_[i][0]->being_compacted) {
2400             num_sorted_runs++;
2401           }
2402         }
2403       }
2404 
2405       if (compaction_style_ == kCompactionStyleFIFO) {
2406         score = static_cast<double>(total_size) /
2407                 mutable_cf_options.compaction_options_fifo.max_table_files_size;
2408         if (mutable_cf_options.compaction_options_fifo.allow_compaction) {
2409           score = std::max(
2410               static_cast<double>(num_sorted_runs) /
2411                   mutable_cf_options.level0_file_num_compaction_trigger,
2412               score);
2413         }
2414         if (mutable_cf_options.ttl > 0) {
2415           score = std::max(
2416               static_cast<double>(GetExpiredTtlFilesCount(
2417                   immutable_cf_options, mutable_cf_options, files_[level])),
2418               score);
2419         }
2420 
2421       } else {
2422         score = static_cast<double>(num_sorted_runs) /
2423                 mutable_cf_options.level0_file_num_compaction_trigger;
2424         if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
2425           // Level-based involves L0->L0 compactions that can lead to oversized
2426           // L0 files. Take into account size as well to avoid later giant
2427           // compactions to the base level.
2428           score = std::max(
2429               score, static_cast<double>(total_size) /
2430                      mutable_cf_options.max_bytes_for_level_base);
2431         }
2432       }
2433     } else {
2434       // Compute the ratio of current size to size limit.
2435       uint64_t level_bytes_no_compacting = 0;
2436       for (auto f : files_[level]) {
2437         if (!f->being_compacted) {
2438           level_bytes_no_compacting += f->compensated_file_size;
2439         }
2440       }
2441       score = static_cast<double>(level_bytes_no_compacting) /
2442               MaxBytesForLevel(level);
2443     }
2444     compaction_level_[level] = level;
2445     compaction_score_[level] = score;
2446   }
2447 
2448   // sort all the levels based on their score. Higher scores get listed
2449   // first. Use bubble sort because the number of entries are small.
2450   for (int i = 0; i < num_levels() - 2; i++) {
2451     for (int j = i + 1; j < num_levels() - 1; j++) {
2452       if (compaction_score_[i] < compaction_score_[j]) {
2453         double score = compaction_score_[i];
2454         int level = compaction_level_[i];
2455         compaction_score_[i] = compaction_score_[j];
2456         compaction_level_[i] = compaction_level_[j];
2457         compaction_score_[j] = score;
2458         compaction_level_[j] = level;
2459       }
2460     }
2461   }
2462   ComputeFilesMarkedForCompaction();
2463   ComputeBottommostFilesMarkedForCompaction();
2464   if (mutable_cf_options.ttl > 0) {
2465     ComputeExpiredTtlFiles(immutable_cf_options, mutable_cf_options.ttl);
2466   }
2467   if (mutable_cf_options.periodic_compaction_seconds > 0) {
2468     ComputeFilesMarkedForPeriodicCompaction(
2469         immutable_cf_options, mutable_cf_options.periodic_compaction_seconds);
2470   }
2471   EstimateCompactionBytesNeeded(mutable_cf_options);
2472 }
2473 
ComputeFilesMarkedForCompaction()2474 void VersionStorageInfo::ComputeFilesMarkedForCompaction() {
2475   files_marked_for_compaction_.clear();
2476   int last_qualify_level = 0;
2477 
2478   // Do not include files from the last level with data
2479   // If table properties collector suggests a file on the last level,
2480   // we should not move it to a new level.
2481   for (int level = num_levels() - 1; level >= 1; level--) {
2482     if (!files_[level].empty()) {
2483       last_qualify_level = level - 1;
2484       break;
2485     }
2486   }
2487 
2488   for (int level = 0; level <= last_qualify_level; level++) {
2489     for (auto* f : files_[level]) {
2490       if (!f->being_compacted && f->marked_for_compaction) {
2491         files_marked_for_compaction_.emplace_back(level, f);
2492       }
2493     }
2494   }
2495 }
2496 
ComputeExpiredTtlFiles(const ImmutableCFOptions & ioptions,const uint64_t ttl)2497 void VersionStorageInfo::ComputeExpiredTtlFiles(
2498     const ImmutableCFOptions& ioptions, const uint64_t ttl) {
2499   assert(ttl > 0);
2500 
2501   expired_ttl_files_.clear();
2502 
2503   int64_t _current_time;
2504   auto status = ioptions.env->GetCurrentTime(&_current_time);
2505   if (!status.ok()) {
2506     return;
2507   }
2508   const uint64_t current_time = static_cast<uint64_t>(_current_time);
2509 
2510   for (int level = 0; level < num_levels() - 1; level++) {
2511     for (FileMetaData* f : files_[level]) {
2512       if (!f->being_compacted) {
2513         uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
2514         if (oldest_ancester_time > 0 &&
2515             oldest_ancester_time < (current_time - ttl)) {
2516           expired_ttl_files_.emplace_back(level, f);
2517         }
2518       }
2519     }
2520   }
2521 }
2522 
ComputeFilesMarkedForPeriodicCompaction(const ImmutableCFOptions & ioptions,const uint64_t periodic_compaction_seconds)2523 void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction(
2524     const ImmutableCFOptions& ioptions,
2525     const uint64_t periodic_compaction_seconds) {
2526   assert(periodic_compaction_seconds > 0);
2527 
2528   files_marked_for_periodic_compaction_.clear();
2529 
2530   int64_t temp_current_time;
2531   auto status = ioptions.env->GetCurrentTime(&temp_current_time);
2532   if (!status.ok()) {
2533     return;
2534   }
2535   const uint64_t current_time = static_cast<uint64_t>(temp_current_time);
2536 
2537   // If periodic_compaction_seconds is larger than current time, periodic
2538   // compaction can't possibly be triggered.
2539   if (periodic_compaction_seconds > current_time) {
2540     return;
2541   }
2542 
2543   const uint64_t allowed_time_limit =
2544       current_time - periodic_compaction_seconds;
2545 
2546   for (int level = 0; level < num_levels(); level++) {
2547     for (auto f : files_[level]) {
2548       if (!f->being_compacted) {
2549         // Compute a file's modification time in the following order:
2550         // 1. Use file_creation_time table property if it is > 0.
2551         // 2. Use creation_time table property if it is > 0.
2552         // 3. Use file's mtime metadata if the above two table properties are 0.
2553         // Don't consider the file at all if the modification time cannot be
2554         // correctly determined based on the above conditions.
2555         uint64_t file_modification_time = f->TryGetFileCreationTime();
2556         if (file_modification_time == kUnknownFileCreationTime) {
2557           file_modification_time = f->TryGetOldestAncesterTime();
2558         }
2559         if (file_modification_time == kUnknownOldestAncesterTime) {
2560           auto file_path = TableFileName(ioptions.cf_paths, f->fd.GetNumber(),
2561                                          f->fd.GetPathId());
2562           status = ioptions.env->GetFileModificationTime(
2563               file_path, &file_modification_time);
2564           if (!status.ok()) {
2565             ROCKS_LOG_WARN(ioptions.info_log,
2566                            "Can't get file modification time: %s: %s",
2567                            file_path.c_str(), status.ToString().c_str());
2568             continue;
2569           }
2570         }
2571         if (file_modification_time > 0 &&
2572             file_modification_time < allowed_time_limit) {
2573           files_marked_for_periodic_compaction_.emplace_back(level, f);
2574         }
2575       }
2576     }
2577   }
2578 }
2579 
2580 namespace {
2581 
2582 // used to sort files by size
2583 struct Fsize {
2584   size_t index;
2585   FileMetaData* file;
2586 };
2587 
2588 // Compator that is used to sort files based on their size
2589 // In normal mode: descending size
CompareCompensatedSizeDescending(const Fsize & first,const Fsize & second)2590 bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) {
2591   return (first.file->compensated_file_size >
2592       second.file->compensated_file_size);
2593 }
2594 } // anonymous namespace
2595 
AddFile(int level,FileMetaData * f,Logger * info_log)2596 void VersionStorageInfo::AddFile(int level, FileMetaData* f, Logger* info_log) {
2597   auto* level_files = &files_[level];
2598   // Must not overlap
2599 #ifndef NDEBUG
2600   if (level > 0 && !level_files->empty() &&
2601       internal_comparator_->Compare(
2602           (*level_files)[level_files->size() - 1]->largest, f->smallest) >= 0) {
2603     auto* f2 = (*level_files)[level_files->size() - 1];
2604     if (info_log != nullptr) {
2605       Error(info_log, "Adding new file %" PRIu64
2606                       " range (%s, %s) to level %d but overlapping "
2607                       "with existing file %" PRIu64 " %s %s",
2608             f->fd.GetNumber(), f->smallest.DebugString(true).c_str(),
2609             f->largest.DebugString(true).c_str(), level, f2->fd.GetNumber(),
2610             f2->smallest.DebugString(true).c_str(),
2611             f2->largest.DebugString(true).c_str());
2612       LogFlush(info_log);
2613     }
2614     assert(false);
2615   }
2616 #else
2617   (void)info_log;
2618 #endif
2619   f->refs++;
2620   level_files->push_back(f);
2621 }
2622 
2623 // Version::PrepareApply() need to be called before calling the function, or
2624 // following functions called:
2625 // 1. UpdateNumNonEmptyLevels();
2626 // 2. CalculateBaseBytes();
2627 // 3. UpdateFilesByCompactionPri();
2628 // 4. GenerateFileIndexer();
2629 // 5. GenerateLevelFilesBrief();
2630 // 6. GenerateLevel0NonOverlapping();
2631 // 7. GenerateBottommostFiles();
SetFinalized()2632 void VersionStorageInfo::SetFinalized() {
2633   finalized_ = true;
2634 #ifndef NDEBUG
2635   if (compaction_style_ != kCompactionStyleLevel) {
2636     // Not level based compaction.
2637     return;
2638   }
2639   assert(base_level_ < 0 || num_levels() == 1 ||
2640          (base_level_ >= 1 && base_level_ < num_levels()));
2641   // Verify all levels newer than base_level are empty except L0
2642   for (int level = 1; level < base_level(); level++) {
2643     assert(NumLevelBytes(level) == 0);
2644   }
2645   uint64_t max_bytes_prev_level = 0;
2646   for (int level = base_level(); level < num_levels() - 1; level++) {
2647     if (LevelFiles(level).size() == 0) {
2648       continue;
2649     }
2650     assert(MaxBytesForLevel(level) >= max_bytes_prev_level);
2651     max_bytes_prev_level = MaxBytesForLevel(level);
2652   }
2653   int num_empty_non_l0_level = 0;
2654   for (int level = 0; level < num_levels(); level++) {
2655     assert(LevelFiles(level).size() == 0 ||
2656            LevelFiles(level).size() == LevelFilesBrief(level).num_files);
2657     if (level > 0 && NumLevelBytes(level) > 0) {
2658       num_empty_non_l0_level++;
2659     }
2660     if (LevelFiles(level).size() > 0) {
2661       assert(level < num_non_empty_levels());
2662     }
2663   }
2664   assert(compaction_level_.size() > 0);
2665   assert(compaction_level_.size() == compaction_score_.size());
2666 #endif
2667 }
2668 
UpdateNumNonEmptyLevels()2669 void VersionStorageInfo::UpdateNumNonEmptyLevels() {
2670   num_non_empty_levels_ = num_levels_;
2671   for (int i = num_levels_ - 1; i >= 0; i--) {
2672     if (files_[i].size() != 0) {
2673       return;
2674     } else {
2675       num_non_empty_levels_ = i;
2676     }
2677   }
2678 }
2679 
2680 namespace {
2681 // Sort `temp` based on ratio of overlapping size over file size
SortFileByOverlappingRatio(const InternalKeyComparator & icmp,const std::vector<FileMetaData * > & files,const std::vector<FileMetaData * > & next_level_files,std::vector<Fsize> * temp)2682 void SortFileByOverlappingRatio(
2683     const InternalKeyComparator& icmp, const std::vector<FileMetaData*>& files,
2684     const std::vector<FileMetaData*>& next_level_files,
2685     std::vector<Fsize>* temp) {
2686   std::unordered_map<uint64_t, uint64_t> file_to_order;
2687   auto next_level_it = next_level_files.begin();
2688 
2689   for (auto& file : files) {
2690     uint64_t overlapping_bytes = 0;
2691     // Skip files in next level that is smaller than current file
2692     while (next_level_it != next_level_files.end() &&
2693            icmp.Compare((*next_level_it)->largest, file->smallest) < 0) {
2694       next_level_it++;
2695     }
2696 
2697     while (next_level_it != next_level_files.end() &&
2698            icmp.Compare((*next_level_it)->smallest, file->largest) < 0) {
2699       overlapping_bytes += (*next_level_it)->fd.file_size;
2700 
2701       if (icmp.Compare((*next_level_it)->largest, file->largest) > 0) {
2702         // next level file cross large boundary of current file.
2703         break;
2704       }
2705       next_level_it++;
2706     }
2707 
2708     assert(file->compensated_file_size != 0);
2709     file_to_order[file->fd.GetNumber()] =
2710         overlapping_bytes * 1024u / file->compensated_file_size;
2711   }
2712 
2713   std::sort(temp->begin(), temp->end(),
2714             [&](const Fsize& f1, const Fsize& f2) -> bool {
2715               return file_to_order[f1.file->fd.GetNumber()] <
2716                      file_to_order[f2.file->fd.GetNumber()];
2717             });
2718 }
2719 }  // namespace
2720 
UpdateFilesByCompactionPri(CompactionPri compaction_pri)2721 void VersionStorageInfo::UpdateFilesByCompactionPri(
2722     CompactionPri compaction_pri) {
2723   if (compaction_style_ == kCompactionStyleNone ||
2724       compaction_style_ == kCompactionStyleFIFO ||
2725       compaction_style_ == kCompactionStyleUniversal) {
2726     // don't need this
2727     return;
2728   }
2729   // No need to sort the highest level because it is never compacted.
2730   for (int level = 0; level < num_levels() - 1; level++) {
2731     const std::vector<FileMetaData*>& files = files_[level];
2732     auto& files_by_compaction_pri = files_by_compaction_pri_[level];
2733     assert(files_by_compaction_pri.size() == 0);
2734 
2735     // populate a temp vector for sorting based on size
2736     std::vector<Fsize> temp(files.size());
2737     for (size_t i = 0; i < files.size(); i++) {
2738       temp[i].index = i;
2739       temp[i].file = files[i];
2740     }
2741 
2742     // sort the top number_of_files_to_sort_ based on file size
2743     size_t num = VersionStorageInfo::kNumberFilesToSort;
2744     if (num > temp.size()) {
2745       num = temp.size();
2746     }
2747     switch (compaction_pri) {
2748       case kByCompensatedSize:
2749         std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
2750                           CompareCompensatedSizeDescending);
2751         break;
2752       case kOldestLargestSeqFirst:
2753         std::sort(temp.begin(), temp.end(),
2754                   [](const Fsize& f1, const Fsize& f2) -> bool {
2755                     return f1.file->fd.largest_seqno <
2756                            f2.file->fd.largest_seqno;
2757                   });
2758         break;
2759       case kOldestSmallestSeqFirst:
2760         std::sort(temp.begin(), temp.end(),
2761                   [](const Fsize& f1, const Fsize& f2) -> bool {
2762                     return f1.file->fd.smallest_seqno <
2763                            f2.file->fd.smallest_seqno;
2764                   });
2765         break;
2766       case kMinOverlappingRatio:
2767         SortFileByOverlappingRatio(*internal_comparator_, files_[level],
2768                                    files_[level + 1], &temp);
2769         break;
2770       default:
2771         assert(false);
2772     }
2773     assert(temp.size() == files.size());
2774 
2775     // initialize files_by_compaction_pri_
2776     for (size_t i = 0; i < temp.size(); i++) {
2777       files_by_compaction_pri.push_back(static_cast<int>(temp[i].index));
2778     }
2779     next_file_to_compact_by_size_[level] = 0;
2780     assert(files_[level].size() == files_by_compaction_pri_[level].size());
2781   }
2782 }
2783 
GenerateLevel0NonOverlapping()2784 void VersionStorageInfo::GenerateLevel0NonOverlapping() {
2785   assert(!finalized_);
2786   level0_non_overlapping_ = true;
2787   if (level_files_brief_.size() == 0) {
2788     return;
2789   }
2790 
2791   // A copy of L0 files sorted by smallest key
2792   std::vector<FdWithKeyRange> level0_sorted_file(
2793       level_files_brief_[0].files,
2794       level_files_brief_[0].files + level_files_brief_[0].num_files);
2795   std::sort(level0_sorted_file.begin(), level0_sorted_file.end(),
2796             [this](const FdWithKeyRange& f1, const FdWithKeyRange& f2) -> bool {
2797               return (internal_comparator_->Compare(f1.smallest_key,
2798                                                     f2.smallest_key) < 0);
2799             });
2800 
2801   for (size_t i = 1; i < level0_sorted_file.size(); ++i) {
2802     FdWithKeyRange& f = level0_sorted_file[i];
2803     FdWithKeyRange& prev = level0_sorted_file[i - 1];
2804     if (internal_comparator_->Compare(prev.largest_key, f.smallest_key) >= 0) {
2805       level0_non_overlapping_ = false;
2806       break;
2807     }
2808   }
2809 }
2810 
GenerateBottommostFiles()2811 void VersionStorageInfo::GenerateBottommostFiles() {
2812   assert(!finalized_);
2813   assert(bottommost_files_.empty());
2814   for (size_t level = 0; level < level_files_brief_.size(); ++level) {
2815     for (size_t file_idx = 0; file_idx < level_files_brief_[level].num_files;
2816          ++file_idx) {
2817       const FdWithKeyRange& f = level_files_brief_[level].files[file_idx];
2818       int l0_file_idx;
2819       if (level == 0) {
2820         l0_file_idx = static_cast<int>(file_idx);
2821       } else {
2822         l0_file_idx = -1;
2823       }
2824       Slice smallest_user_key = ExtractUserKey(f.smallest_key);
2825       Slice largest_user_key = ExtractUserKey(f.largest_key);
2826       if (!RangeMightExistAfterSortedRun(smallest_user_key, largest_user_key,
2827                                          static_cast<int>(level),
2828                                          l0_file_idx)) {
2829         bottommost_files_.emplace_back(static_cast<int>(level),
2830                                        f.file_metadata);
2831       }
2832     }
2833   }
2834 }
2835 
UpdateOldestSnapshot(SequenceNumber seqnum)2836 void VersionStorageInfo::UpdateOldestSnapshot(SequenceNumber seqnum) {
2837   assert(seqnum >= oldest_snapshot_seqnum_);
2838   oldest_snapshot_seqnum_ = seqnum;
2839   if (oldest_snapshot_seqnum_ > bottommost_files_mark_threshold_) {
2840     ComputeBottommostFilesMarkedForCompaction();
2841   }
2842 }
2843 
ComputeBottommostFilesMarkedForCompaction()2844 void VersionStorageInfo::ComputeBottommostFilesMarkedForCompaction() {
2845   bottommost_files_marked_for_compaction_.clear();
2846   bottommost_files_mark_threshold_ = kMaxSequenceNumber;
2847   for (auto& level_and_file : bottommost_files_) {
2848     if (!level_and_file.second->being_compacted &&
2849         level_and_file.second->fd.largest_seqno != 0 &&
2850         level_and_file.second->num_deletions > 1) {
2851       // largest_seqno might be nonzero due to containing the final key in an
2852       // earlier compaction, whose seqnum we didn't zero out. Multiple deletions
2853       // ensures the file really contains deleted or overwritten keys.
2854       if (level_and_file.second->fd.largest_seqno < oldest_snapshot_seqnum_) {
2855         bottommost_files_marked_for_compaction_.push_back(level_and_file);
2856       } else {
2857         bottommost_files_mark_threshold_ =
2858             std::min(bottommost_files_mark_threshold_,
2859                      level_and_file.second->fd.largest_seqno);
2860       }
2861     }
2862   }
2863 }
2864 
Ref()2865 void Version::Ref() {
2866   ++refs_;
2867 }
2868 
Unref()2869 bool Version::Unref() {
2870   assert(refs_ >= 1);
2871   --refs_;
2872   if (refs_ == 0) {
2873     delete this;
2874     return true;
2875   }
2876   return false;
2877 }
2878 
OverlapInLevel(int level,const Slice * smallest_user_key,const Slice * largest_user_key)2879 bool VersionStorageInfo::OverlapInLevel(int level,
2880                                         const Slice* smallest_user_key,
2881                                         const Slice* largest_user_key) {
2882   if (level >= num_non_empty_levels_) {
2883     // empty level, no overlap
2884     return false;
2885   }
2886   return SomeFileOverlapsRange(*internal_comparator_, (level > 0),
2887                                level_files_brief_[level], smallest_user_key,
2888                                largest_user_key);
2889 }
2890 
2891 // Store in "*inputs" all files in "level" that overlap [begin,end]
2892 // If hint_index is specified, then it points to a file in the
2893 // overlapping range.
2894 // The file_index returns a pointer to any file in an overlapping range.
GetOverlappingInputs(int level,const InternalKey * begin,const InternalKey * end,std::vector<FileMetaData * > * inputs,int hint_index,int * file_index,bool expand_range,InternalKey ** next_smallest) const2895 void VersionStorageInfo::GetOverlappingInputs(
2896     int level, const InternalKey* begin, const InternalKey* end,
2897     std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
2898     bool expand_range, InternalKey** next_smallest) const {
2899   if (level >= num_non_empty_levels_) {
2900     // this level is empty, no overlapping inputs
2901     return;
2902   }
2903 
2904   inputs->clear();
2905   if (file_index) {
2906     *file_index = -1;
2907   }
2908   const Comparator* user_cmp = user_comparator_;
2909   if (level > 0) {
2910     GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs, hint_index,
2911                                           file_index, false, next_smallest);
2912     return;
2913   }
2914 
2915   if (next_smallest) {
2916     // next_smallest key only makes sense for non-level 0, where files are
2917     // non-overlapping
2918     *next_smallest = nullptr;
2919   }
2920 
2921   Slice user_begin, user_end;
2922   if (begin != nullptr) {
2923     user_begin = begin->user_key();
2924   }
2925   if (end != nullptr) {
2926     user_end = end->user_key();
2927   }
2928 
2929   // index stores the file index need to check.
2930   std::list<size_t> index;
2931   for (size_t i = 0; i < level_files_brief_[level].num_files; i++) {
2932     index.emplace_back(i);
2933   }
2934 
2935   while (!index.empty()) {
2936     bool found_overlapping_file = false;
2937     auto iter = index.begin();
2938     while (iter != index.end()) {
2939       FdWithKeyRange* f = &(level_files_brief_[level].files[*iter]);
2940       const Slice file_start = ExtractUserKey(f->smallest_key);
2941       const Slice file_limit = ExtractUserKey(f->largest_key);
2942       if (begin != nullptr &&
2943           user_cmp->CompareWithoutTimestamp(file_limit, user_begin) < 0) {
2944         // "f" is completely before specified range; skip it
2945         iter++;
2946       } else if (end != nullptr &&
2947                  user_cmp->CompareWithoutTimestamp(file_start, user_end) > 0) {
2948         // "f" is completely after specified range; skip it
2949         iter++;
2950       } else {
2951         // if overlap
2952         inputs->emplace_back(files_[level][*iter]);
2953         found_overlapping_file = true;
2954         // record the first file index.
2955         if (file_index && *file_index == -1) {
2956           *file_index = static_cast<int>(*iter);
2957         }
2958         // the related file is overlap, erase to avoid checking again.
2959         iter = index.erase(iter);
2960         if (expand_range) {
2961           if (begin != nullptr &&
2962               user_cmp->CompareWithoutTimestamp(file_start, user_begin) < 0) {
2963             user_begin = file_start;
2964           }
2965           if (end != nullptr &&
2966               user_cmp->CompareWithoutTimestamp(file_limit, user_end) > 0) {
2967             user_end = file_limit;
2968           }
2969         }
2970       }
2971     }
2972     // if all the files left are not overlap, break
2973     if (!found_overlapping_file) {
2974       break;
2975     }
2976   }
2977 }
2978 
2979 // Store in "*inputs" files in "level" that within range [begin,end]
2980 // Guarantee a "clean cut" boundary between the files in inputs
2981 // and the surrounding files and the maxinum number of files.
2982 // This will ensure that no parts of a key are lost during compaction.
2983 // If hint_index is specified, then it points to a file in the range.
2984 // The file_index returns a pointer to any file in an overlapping range.
GetCleanInputsWithinInterval(int level,const InternalKey * begin,const InternalKey * end,std::vector<FileMetaData * > * inputs,int hint_index,int * file_index) const2985 void VersionStorageInfo::GetCleanInputsWithinInterval(
2986     int level, const InternalKey* begin, const InternalKey* end,
2987     std::vector<FileMetaData*>* inputs, int hint_index, int* file_index) const {
2988   inputs->clear();
2989   if (file_index) {
2990     *file_index = -1;
2991   }
2992   if (level >= num_non_empty_levels_ || level == 0 ||
2993       level_files_brief_[level].num_files == 0) {
2994     // this level is empty, no inputs within range
2995     // also don't support clean input interval within L0
2996     return;
2997   }
2998 
2999   GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs,
3000                                         hint_index, file_index,
3001                                         true /* within_interval */);
3002 }
3003 
3004 // Store in "*inputs" all files in "level" that overlap [begin,end]
3005 // Employ binary search to find at least one file that overlaps the
3006 // specified range. From that file, iterate backwards and
3007 // forwards to find all overlapping files.
3008 // if within_range is set, then only store the maximum clean inputs
3009 // within range [begin, end]. "clean" means there is a boudnary
3010 // between the files in "*inputs" and the surrounding files
GetOverlappingInputsRangeBinarySearch(int level,const InternalKey * begin,const InternalKey * end,std::vector<FileMetaData * > * inputs,int hint_index,int * file_index,bool within_interval,InternalKey ** next_smallest) const3011 void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
3012     int level, const InternalKey* begin, const InternalKey* end,
3013     std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
3014     bool within_interval, InternalKey** next_smallest) const {
3015   assert(level > 0);
3016 
3017   auto user_cmp = user_comparator_;
3018   const FdWithKeyRange* files = level_files_brief_[level].files;
3019   const int num_files = static_cast<int>(level_files_brief_[level].num_files);
3020 
3021   // begin to use binary search to find lower bound
3022   // and upper bound.
3023   int start_index = 0;
3024   int end_index = num_files;
3025 
3026   if (begin != nullptr) {
3027     // if within_interval is true, with file_key would find
3028     // not overlapping ranges in std::lower_bound.
3029     auto cmp = [&user_cmp, &within_interval](const FdWithKeyRange& f,
3030                                              const InternalKey* k) {
3031       auto& file_key = within_interval ? f.file_metadata->smallest
3032                                        : f.file_metadata->largest;
3033       return sstableKeyCompare(user_cmp, file_key, *k) < 0;
3034     };
3035 
3036     start_index = static_cast<int>(
3037         std::lower_bound(files,
3038                          files + (hint_index == -1 ? num_files : hint_index),
3039                          begin, cmp) -
3040         files);
3041 
3042     if (start_index > 0 && within_interval) {
3043       bool is_overlapping = true;
3044       while (is_overlapping && start_index < num_files) {
3045         auto& pre_limit = files[start_index - 1].file_metadata->largest;
3046         auto& cur_start = files[start_index].file_metadata->smallest;
3047         is_overlapping = sstableKeyCompare(user_cmp, pre_limit, cur_start) == 0;
3048         start_index += is_overlapping;
3049       }
3050     }
3051   }
3052 
3053   if (end != nullptr) {
3054     // if within_interval is true, with file_key would find
3055     // not overlapping ranges in std::upper_bound.
3056     auto cmp = [&user_cmp, &within_interval](const InternalKey* k,
3057                                              const FdWithKeyRange& f) {
3058       auto& file_key = within_interval ? f.file_metadata->largest
3059                                        : f.file_metadata->smallest;
3060       return sstableKeyCompare(user_cmp, *k, file_key) < 0;
3061     };
3062 
3063     end_index = static_cast<int>(
3064         std::upper_bound(files + start_index, files + num_files, end, cmp) -
3065         files);
3066 
3067     if (end_index < num_files && within_interval) {
3068       bool is_overlapping = true;
3069       while (is_overlapping && end_index > start_index) {
3070         auto& next_start = files[end_index].file_metadata->smallest;
3071         auto& cur_limit = files[end_index - 1].file_metadata->largest;
3072         is_overlapping =
3073             sstableKeyCompare(user_cmp, cur_limit, next_start) == 0;
3074         end_index -= is_overlapping;
3075       }
3076     }
3077   }
3078 
3079   assert(start_index <= end_index);
3080 
3081   // If there were no overlapping files, return immediately.
3082   if (start_index == end_index) {
3083     if (next_smallest) {
3084       *next_smallest = nullptr;
3085     }
3086     return;
3087   }
3088 
3089   assert(start_index < end_index);
3090 
3091   // returns the index where an overlap is found
3092   if (file_index) {
3093     *file_index = start_index;
3094   }
3095 
3096   // insert overlapping files into vector
3097   for (int i = start_index; i < end_index; i++) {
3098     inputs->push_back(files_[level][i]);
3099   }
3100 
3101   if (next_smallest != nullptr) {
3102     // Provide the next key outside the range covered by inputs
3103     if (end_index < static_cast<int>(files_[level].size())) {
3104       **next_smallest = files_[level][end_index]->smallest;
3105     } else {
3106       *next_smallest = nullptr;
3107     }
3108   }
3109 }
3110 
NumLevelBytes(int level) const3111 uint64_t VersionStorageInfo::NumLevelBytes(int level) const {
3112   assert(level >= 0);
3113   assert(level < num_levels());
3114   return TotalFileSize(files_[level]);
3115 }
3116 
LevelSummary(LevelSummaryStorage * scratch) const3117 const char* VersionStorageInfo::LevelSummary(
3118     LevelSummaryStorage* scratch) const {
3119   int len = 0;
3120   if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
3121     assert(base_level_ < static_cast<int>(level_max_bytes_.size()));
3122     if (level_multiplier_ != 0.0) {
3123       len = snprintf(
3124           scratch->buffer, sizeof(scratch->buffer),
3125           "base level %d level multiplier %.2f max bytes base %" PRIu64 " ",
3126           base_level_, level_multiplier_, level_max_bytes_[base_level_]);
3127     }
3128   }
3129   len +=
3130       snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "files[");
3131   for (int i = 0; i < num_levels(); i++) {
3132     int sz = sizeof(scratch->buffer) - len;
3133     int ret = snprintf(scratch->buffer + len, sz, "%d ", int(files_[i].size()));
3134     if (ret < 0 || ret >= sz) break;
3135     len += ret;
3136   }
3137   if (len > 0) {
3138     // overwrite the last space
3139     --len;
3140   }
3141   len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
3142                   "] max score %.2f", compaction_score_[0]);
3143 
3144   if (!files_marked_for_compaction_.empty()) {
3145     snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
3146              " (%" ROCKSDB_PRIszt " files need compaction)",
3147              files_marked_for_compaction_.size());
3148   }
3149 
3150   return scratch->buffer;
3151 }
3152 
LevelFileSummary(FileSummaryStorage * scratch,int level) const3153 const char* VersionStorageInfo::LevelFileSummary(FileSummaryStorage* scratch,
3154                                                  int level) const {
3155   int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size[");
3156   for (const auto& f : files_[level]) {
3157     int sz = sizeof(scratch->buffer) - len;
3158     char sztxt[16];
3159     AppendHumanBytes(f->fd.GetFileSize(), sztxt, sizeof(sztxt));
3160     int ret = snprintf(scratch->buffer + len, sz,
3161                        "#%" PRIu64 "(seq=%" PRIu64 ",sz=%s,%d) ",
3162                        f->fd.GetNumber(), f->fd.smallest_seqno, sztxt,
3163                        static_cast<int>(f->being_compacted));
3164     if (ret < 0 || ret >= sz)
3165       break;
3166     len += ret;
3167   }
3168   // overwrite the last space (only if files_[level].size() is non-zero)
3169   if (files_[level].size() && len > 0) {
3170     --len;
3171   }
3172   snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]");
3173   return scratch->buffer;
3174 }
3175 
MaxNextLevelOverlappingBytes()3176 int64_t VersionStorageInfo::MaxNextLevelOverlappingBytes() {
3177   uint64_t result = 0;
3178   std::vector<FileMetaData*> overlaps;
3179   for (int level = 1; level < num_levels() - 1; level++) {
3180     for (const auto& f : files_[level]) {
3181       GetOverlappingInputs(level + 1, &f->smallest, &f->largest, &overlaps);
3182       const uint64_t sum = TotalFileSize(overlaps);
3183       if (sum > result) {
3184         result = sum;
3185       }
3186     }
3187   }
3188   return result;
3189 }
3190 
MaxBytesForLevel(int level) const3191 uint64_t VersionStorageInfo::MaxBytesForLevel(int level) const {
3192   // Note: the result for level zero is not really used since we set
3193   // the level-0 compaction threshold based on number of files.
3194   assert(level >= 0);
3195   assert(level < static_cast<int>(level_max_bytes_.size()));
3196   return level_max_bytes_[level];
3197 }
3198 
CalculateBaseBytes(const ImmutableCFOptions & ioptions,const MutableCFOptions & options)3199 void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions& ioptions,
3200                                             const MutableCFOptions& options) {
3201   // Special logic to set number of sorted runs.
3202   // It is to match the previous behavior when all files are in L0.
3203   int num_l0_count = static_cast<int>(files_[0].size());
3204   if (compaction_style_ == kCompactionStyleUniversal) {
3205     // For universal compaction, we use level0 score to indicate
3206     // compaction score for the whole DB. Adding other levels as if
3207     // they are L0 files.
3208     for (int i = 1; i < num_levels(); i++) {
3209       if (!files_[i].empty()) {
3210         num_l0_count++;
3211       }
3212     }
3213   }
3214   set_l0_delay_trigger_count(num_l0_count);
3215 
3216   level_max_bytes_.resize(ioptions.num_levels);
3217   if (!ioptions.level_compaction_dynamic_level_bytes) {
3218     base_level_ = (ioptions.compaction_style == kCompactionStyleLevel) ? 1 : -1;
3219 
3220     // Calculate for static bytes base case
3221     for (int i = 0; i < ioptions.num_levels; ++i) {
3222       if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) {
3223         level_max_bytes_[i] = options.max_bytes_for_level_base;
3224       } else if (i > 1) {
3225         level_max_bytes_[i] = MultiplyCheckOverflow(
3226             MultiplyCheckOverflow(level_max_bytes_[i - 1],
3227                                   options.max_bytes_for_level_multiplier),
3228             options.MaxBytesMultiplerAdditional(i - 1));
3229       } else {
3230         level_max_bytes_[i] = options.max_bytes_for_level_base;
3231       }
3232     }
3233   } else {
3234     uint64_t max_level_size = 0;
3235 
3236     int first_non_empty_level = -1;
3237     // Find size of non-L0 level of most data.
3238     // Cannot use the size of the last level because it can be empty or less
3239     // than previous levels after compaction.
3240     for (int i = 1; i < num_levels_; i++) {
3241       uint64_t total_size = 0;
3242       for (const auto& f : files_[i]) {
3243         total_size += f->fd.GetFileSize();
3244       }
3245       if (total_size > 0 && first_non_empty_level == -1) {
3246         first_non_empty_level = i;
3247       }
3248       if (total_size > max_level_size) {
3249         max_level_size = total_size;
3250       }
3251     }
3252 
3253     // Prefill every level's max bytes to disallow compaction from there.
3254     for (int i = 0; i < num_levels_; i++) {
3255       level_max_bytes_[i] = std::numeric_limits<uint64_t>::max();
3256     }
3257 
3258     if (max_level_size == 0) {
3259       // No data for L1 and up. L0 compacts to last level directly.
3260       // No compaction from L1+ needs to be scheduled.
3261       base_level_ = num_levels_ - 1;
3262     } else {
3263       uint64_t l0_size = 0;
3264       for (const auto& f : files_[0]) {
3265         l0_size += f->fd.GetFileSize();
3266       }
3267 
3268       uint64_t base_bytes_max =
3269           std::max(options.max_bytes_for_level_base, l0_size);
3270       uint64_t base_bytes_min = static_cast<uint64_t>(
3271           base_bytes_max / options.max_bytes_for_level_multiplier);
3272 
3273       // Try whether we can make last level's target size to be max_level_size
3274       uint64_t cur_level_size = max_level_size;
3275       for (int i = num_levels_ - 2; i >= first_non_empty_level; i--) {
3276         // Round up after dividing
3277         cur_level_size = static_cast<uint64_t>(
3278             cur_level_size / options.max_bytes_for_level_multiplier);
3279       }
3280 
3281       // Calculate base level and its size.
3282       uint64_t base_level_size;
3283       if (cur_level_size <= base_bytes_min) {
3284         // Case 1. If we make target size of last level to be max_level_size,
3285         // target size of the first non-empty level would be smaller than
3286         // base_bytes_min. We set it be base_bytes_min.
3287         base_level_size = base_bytes_min + 1U;
3288         base_level_ = first_non_empty_level;
3289         ROCKS_LOG_INFO(ioptions.info_log,
3290                        "More existing levels in DB than needed. "
3291                        "max_bytes_for_level_multiplier may not be guaranteed.");
3292       } else {
3293         // Find base level (where L0 data is compacted to).
3294         base_level_ = first_non_empty_level;
3295         while (base_level_ > 1 && cur_level_size > base_bytes_max) {
3296           --base_level_;
3297           cur_level_size = static_cast<uint64_t>(
3298               cur_level_size / options.max_bytes_for_level_multiplier);
3299         }
3300         if (cur_level_size > base_bytes_max) {
3301           // Even L1 will be too large
3302           assert(base_level_ == 1);
3303           base_level_size = base_bytes_max;
3304         } else {
3305           base_level_size = cur_level_size;
3306         }
3307       }
3308 
3309       level_multiplier_ = options.max_bytes_for_level_multiplier;
3310       assert(base_level_size > 0);
3311       if (l0_size > base_level_size &&
3312           (l0_size > options.max_bytes_for_level_base ||
3313            static_cast<int>(files_[0].size() / 2) >=
3314                options.level0_file_num_compaction_trigger)) {
3315         // We adjust the base level according to actual L0 size, and adjust
3316         // the level multiplier accordingly, when:
3317         //   1. the L0 size is larger than level size base, or
3318         //   2. number of L0 files reaches twice the L0->L1 compaction trigger
3319         // We don't do this otherwise to keep the LSM-tree structure stable
3320         // unless the L0 compation is backlogged.
3321         base_level_size = l0_size;
3322         if (base_level_ == num_levels_ - 1) {
3323           level_multiplier_ = 1.0;
3324         } else {
3325           level_multiplier_ = std::pow(
3326               static_cast<double>(max_level_size) /
3327                   static_cast<double>(base_level_size),
3328               1.0 / static_cast<double>(num_levels_ - base_level_ - 1));
3329         }
3330       }
3331 
3332       uint64_t level_size = base_level_size;
3333       for (int i = base_level_; i < num_levels_; i++) {
3334         if (i > base_level_) {
3335           level_size = MultiplyCheckOverflow(level_size, level_multiplier_);
3336         }
3337         // Don't set any level below base_bytes_max. Otherwise, the LSM can
3338         // assume an hourglass shape where L1+ sizes are smaller than L0. This
3339         // causes compaction scoring, which depends on level sizes, to favor L1+
3340         // at the expense of L0, which may fill up and stall.
3341         level_max_bytes_[i] = std::max(level_size, base_bytes_max);
3342       }
3343     }
3344   }
3345 }
3346 
EstimateLiveDataSize() const3347 uint64_t VersionStorageInfo::EstimateLiveDataSize() const {
3348   // Estimate the live data size by adding up the size of the last level for all
3349   // key ranges. Note: Estimate depends on the ordering of files in level 0
3350   // because files in level 0 can be overlapping.
3351   uint64_t size = 0;
3352 
3353   auto ikey_lt = [this](InternalKey* x, InternalKey* y) {
3354     return internal_comparator_->Compare(*x, *y) < 0;
3355   };
3356   // (Ordered) map of largest keys in non-overlapping files
3357   std::map<InternalKey*, FileMetaData*, decltype(ikey_lt)> ranges(ikey_lt);
3358 
3359   for (int l = num_levels_ - 1; l >= 0; l--) {
3360     bool found_end = false;
3361     for (auto file : files_[l]) {
3362       // Find the first file where the largest key is larger than the smallest
3363       // key of the current file. If this file does not overlap with the
3364       // current file, none of the files in the map does. If there is
3365       // no potential overlap, we can safely insert the rest of this level
3366       // (if the level is not 0) into the map without checking again because
3367       // the elements in the level are sorted and non-overlapping.
3368       auto lb = (found_end && l != 0) ?
3369         ranges.end() : ranges.lower_bound(&file->smallest);
3370       found_end = (lb == ranges.end());
3371       if (found_end || internal_comparator_->Compare(
3372             file->largest, (*lb).second->smallest) < 0) {
3373           ranges.emplace_hint(lb, &file->largest, file);
3374           size += file->fd.file_size;
3375       }
3376     }
3377   }
3378   return size;
3379 }
3380 
RangeMightExistAfterSortedRun(const Slice & smallest_user_key,const Slice & largest_user_key,int last_level,int last_l0_idx)3381 bool VersionStorageInfo::RangeMightExistAfterSortedRun(
3382     const Slice& smallest_user_key, const Slice& largest_user_key,
3383     int last_level, int last_l0_idx) {
3384   assert((last_l0_idx != -1) == (last_level == 0));
3385   // TODO(ajkr): this preserves earlier behavior where we considered an L0 file
3386   // bottommost only if it's the oldest L0 file and there are no files on older
3387   // levels. It'd be better to consider it bottommost if there's no overlap in
3388   // older levels/files.
3389   if (last_level == 0 &&
3390       last_l0_idx != static_cast<int>(LevelFiles(0).size() - 1)) {
3391     return true;
3392   }
3393 
3394   // Checks whether there are files living beyond the `last_level`. If lower
3395   // levels have files, it checks for overlap between [`smallest_key`,
3396   // `largest_key`] and those files. Bottomlevel optimizations can be made if
3397   // there are no files in lower levels or if there is no overlap with the files
3398   // in the lower levels.
3399   for (int level = last_level + 1; level < num_levels(); level++) {
3400     // The range is not in the bottommost level if there are files in lower
3401     // levels when the `last_level` is 0 or if there are files in lower levels
3402     // which overlap with [`smallest_key`, `largest_key`].
3403     if (files_[level].size() > 0 &&
3404         (last_level == 0 ||
3405          OverlapInLevel(level, &smallest_user_key, &largest_user_key))) {
3406       return true;
3407     }
3408   }
3409   return false;
3410 }
3411 
AddLiveFiles(std::vector<FileDescriptor> * live)3412 void Version::AddLiveFiles(std::vector<FileDescriptor>* live) {
3413   for (int level = 0; level < storage_info_.num_levels(); level++) {
3414     const std::vector<FileMetaData*>& files = storage_info_.files_[level];
3415     for (const auto& file : files) {
3416       live->push_back(file->fd);
3417     }
3418   }
3419 }
3420 
DebugString(bool hex,bool print_stats) const3421 std::string Version::DebugString(bool hex, bool print_stats) const {
3422   std::string r;
3423   for (int level = 0; level < storage_info_.num_levels_; level++) {
3424     // E.g.,
3425     //   --- level 1 ---
3426     //   17:123[1 .. 124]['a' .. 'd']
3427     //   20:43[124 .. 128]['e' .. 'g']
3428     //
3429     // if print_stats=true:
3430     //   17:123[1 .. 124]['a' .. 'd'](4096)
3431     r.append("--- level ");
3432     AppendNumberTo(&r, level);
3433     r.append(" --- version# ");
3434     AppendNumberTo(&r, version_number_);
3435     r.append(" ---\n");
3436     const std::vector<FileMetaData*>& files = storage_info_.files_[level];
3437     for (size_t i = 0; i < files.size(); i++) {
3438       r.push_back(' ');
3439       AppendNumberTo(&r, files[i]->fd.GetNumber());
3440       r.push_back(':');
3441       AppendNumberTo(&r, files[i]->fd.GetFileSize());
3442       r.append("[");
3443       AppendNumberTo(&r, files[i]->fd.smallest_seqno);
3444       r.append(" .. ");
3445       AppendNumberTo(&r, files[i]->fd.largest_seqno);
3446       r.append("]");
3447       r.append("[");
3448       r.append(files[i]->smallest.DebugString(hex));
3449       r.append(" .. ");
3450       r.append(files[i]->largest.DebugString(hex));
3451       r.append("]");
3452       if (files[i]->oldest_blob_file_number != kInvalidBlobFileNumber) {
3453         r.append(" blob_file:");
3454         AppendNumberTo(&r, files[i]->oldest_blob_file_number);
3455       }
3456       if (print_stats) {
3457         r.append("(");
3458         r.append(ToString(
3459             files[i]->stats.num_reads_sampled.load(std::memory_order_relaxed)));
3460         r.append(")");
3461       }
3462       r.append("\n");
3463     }
3464   }
3465   return r;
3466 }
3467 
3468 // this is used to batch writes to the manifest file
3469 struct VersionSet::ManifestWriter {
3470   Status status;
3471   bool done;
3472   InstrumentedCondVar cv;
3473   ColumnFamilyData* cfd;
3474   const MutableCFOptions mutable_cf_options;
3475   const autovector<VersionEdit*>& edit_list;
3476 
ManifestWriterROCKSDB_NAMESPACE::VersionSet::ManifestWriter3477   explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd,
3478                           const MutableCFOptions& cf_options,
3479                           const autovector<VersionEdit*>& e)
3480       : done(false),
3481         cv(mu),
3482         cfd(_cfd),
3483         mutable_cf_options(cf_options),
3484         edit_list(e) {}
3485 };
3486 
AddEdit(VersionEdit * edit)3487 Status AtomicGroupReadBuffer::AddEdit(VersionEdit* edit) {
3488   assert(edit);
3489   if (edit->is_in_atomic_group_) {
3490     TEST_SYNC_POINT("AtomicGroupReadBuffer::AddEdit:AtomicGroup");
3491     if (replay_buffer_.empty()) {
3492       replay_buffer_.resize(edit->remaining_entries_ + 1);
3493       TEST_SYNC_POINT_CALLBACK(
3494           "AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", edit);
3495     }
3496     read_edits_in_atomic_group_++;
3497     if (read_edits_in_atomic_group_ + edit->remaining_entries_ !=
3498         static_cast<uint32_t>(replay_buffer_.size())) {
3499       TEST_SYNC_POINT_CALLBACK(
3500           "AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize", edit);
3501       return Status::Corruption("corrupted atomic group");
3502     }
3503     replay_buffer_[read_edits_in_atomic_group_ - 1] = *edit;
3504     if (read_edits_in_atomic_group_ == replay_buffer_.size()) {
3505       TEST_SYNC_POINT_CALLBACK(
3506           "AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", edit);
3507       return Status::OK();
3508     }
3509     return Status::OK();
3510   }
3511 
3512   // A normal edit.
3513   if (!replay_buffer().empty()) {
3514     TEST_SYNC_POINT_CALLBACK(
3515         "AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits", edit);
3516     return Status::Corruption("corrupted atomic group");
3517   }
3518   return Status::OK();
3519 }
3520 
IsFull() const3521 bool AtomicGroupReadBuffer::IsFull() const {
3522   return read_edits_in_atomic_group_ == replay_buffer_.size();
3523 }
3524 
IsEmpty() const3525 bool AtomicGroupReadBuffer::IsEmpty() const { return replay_buffer_.empty(); }
3526 
Clear()3527 void AtomicGroupReadBuffer::Clear() {
3528   read_edits_in_atomic_group_ = 0;
3529   replay_buffer_.clear();
3530 }
3531 
VersionSet(const std::string & dbname,const ImmutableDBOptions * _db_options,const FileOptions & storage_options,Cache * table_cache,WriteBufferManager * write_buffer_manager,WriteController * write_controller,BlockCacheTracer * const block_cache_tracer)3532 VersionSet::VersionSet(const std::string& dbname,
3533                        const ImmutableDBOptions* _db_options,
3534                        const FileOptions& storage_options, Cache* table_cache,
3535                        WriteBufferManager* write_buffer_manager,
3536                        WriteController* write_controller,
3537                        BlockCacheTracer* const block_cache_tracer)
3538     : column_family_set_(new ColumnFamilySet(
3539           dbname, _db_options, storage_options, table_cache,
3540           write_buffer_manager, write_controller, block_cache_tracer)),
3541       env_(_db_options->env),
3542       fs_(_db_options->fs.get()),
3543       dbname_(dbname),
3544       db_options_(_db_options),
3545       next_file_number_(2),
3546       manifest_file_number_(0),  // Filled by Recover()
3547       options_file_number_(0),
3548       pending_manifest_file_number_(0),
3549       last_sequence_(0),
3550       last_allocated_sequence_(0),
3551       last_published_sequence_(0),
3552       prev_log_number_(0),
3553       current_version_number_(0),
3554       manifest_file_size_(0),
3555       file_options_(storage_options),
3556       block_cache_tracer_(block_cache_tracer) {}
3557 
~VersionSet()3558 VersionSet::~VersionSet() {
3559   // we need to delete column_family_set_ because its destructor depends on
3560   // VersionSet
3561   Cache* table_cache = column_family_set_->get_table_cache();
3562   column_family_set_.reset();
3563   for (auto& file : obsolete_files_) {
3564     if (file.metadata->table_reader_handle) {
3565       table_cache->Release(file.metadata->table_reader_handle);
3566       TableCache::Evict(table_cache, file.metadata->fd.GetNumber());
3567     }
3568     file.DeleteMetadata();
3569   }
3570   obsolete_files_.clear();
3571 }
3572 
AppendVersion(ColumnFamilyData * column_family_data,Version * v)3573 void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
3574                                Version* v) {
3575   // compute new compaction score
3576   v->storage_info()->ComputeCompactionScore(
3577       *column_family_data->ioptions(),
3578       *column_family_data->GetLatestMutableCFOptions());
3579 
3580   // Mark v finalized
3581   v->storage_info_.SetFinalized();
3582 
3583   // Make "v" current
3584   assert(v->refs_ == 0);
3585   Version* current = column_family_data->current();
3586   assert(v != current);
3587   if (current != nullptr) {
3588     assert(current->refs_ > 0);
3589     current->Unref();
3590   }
3591   column_family_data->SetCurrent(v);
3592   v->Ref();
3593 
3594   // Append to linked list
3595   v->prev_ = column_family_data->dummy_versions()->prev_;
3596   v->next_ = column_family_data->dummy_versions();
3597   v->prev_->next_ = v;
3598   v->next_->prev_ = v;
3599 }
3600 
ProcessManifestWrites(std::deque<ManifestWriter> & writers,InstrumentedMutex * mu,Directory * db_directory,bool new_descriptor_log,const ColumnFamilyOptions * new_cf_options)3601 Status VersionSet::ProcessManifestWrites(
3602     std::deque<ManifestWriter>& writers, InstrumentedMutex* mu,
3603     Directory* db_directory, bool new_descriptor_log,
3604     const ColumnFamilyOptions* new_cf_options) {
3605   assert(!writers.empty());
3606   ManifestWriter& first_writer = writers.front();
3607   ManifestWriter* last_writer = &first_writer;
3608 
3609   assert(!manifest_writers_.empty());
3610   assert(manifest_writers_.front() == &first_writer);
3611 
3612   autovector<VersionEdit*> batch_edits;
3613   autovector<Version*> versions;
3614   autovector<const MutableCFOptions*> mutable_cf_options_ptrs;
3615   std::vector<std::unique_ptr<BaseReferencedVersionBuilder>> builder_guards;
3616 
3617   if (first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
3618     // No group commits for column family add or drop
3619     LogAndApplyCFHelper(first_writer.edit_list.front());
3620     batch_edits.push_back(first_writer.edit_list.front());
3621   } else {
3622     auto it = manifest_writers_.cbegin();
3623     size_t group_start = std::numeric_limits<size_t>::max();
3624     while (it != manifest_writers_.cend()) {
3625       if ((*it)->edit_list.front()->IsColumnFamilyManipulation()) {
3626         // no group commits for column family add or drop
3627         break;
3628       }
3629       last_writer = *(it++);
3630       assert(last_writer != nullptr);
3631       assert(last_writer->cfd != nullptr);
3632       if (last_writer->cfd->IsDropped()) {
3633         // If we detect a dropped CF at this point, and the corresponding
3634         // version edits belong to an atomic group, then we need to find out
3635         // the preceding version edits in the same atomic group, and update
3636         // their `remaining_entries_` member variable because we are NOT going
3637         // to write the version edits' of dropped CF to the MANIFEST. If we
3638         // don't update, then Recover can report corrupted atomic group because
3639         // the `remaining_entries_` do not match.
3640         if (!batch_edits.empty()) {
3641           if (batch_edits.back()->is_in_atomic_group_ &&
3642               batch_edits.back()->remaining_entries_ > 0) {
3643             assert(group_start < batch_edits.size());
3644             const auto& edit_list = last_writer->edit_list;
3645             size_t k = 0;
3646             while (k < edit_list.size()) {
3647               if (!edit_list[k]->is_in_atomic_group_) {
3648                 break;
3649               } else if (edit_list[k]->remaining_entries_ == 0) {
3650                 ++k;
3651                 break;
3652               }
3653               ++k;
3654             }
3655             for (auto i = group_start; i < batch_edits.size(); ++i) {
3656               assert(static_cast<uint32_t>(k) <=
3657                      batch_edits.back()->remaining_entries_);
3658               batch_edits[i]->remaining_entries_ -= static_cast<uint32_t>(k);
3659             }
3660           }
3661         }
3662         continue;
3663       }
3664       // We do a linear search on versions because versions is small.
3665       // TODO(yanqin) maybe consider unordered_map
3666       Version* version = nullptr;
3667       VersionBuilder* builder = nullptr;
3668       for (int i = 0; i != static_cast<int>(versions.size()); ++i) {
3669         uint32_t cf_id = last_writer->cfd->GetID();
3670         if (versions[i]->cfd()->GetID() == cf_id) {
3671           version = versions[i];
3672           assert(!builder_guards.empty() &&
3673                  builder_guards.size() == versions.size());
3674           builder = builder_guards[i]->version_builder();
3675           TEST_SYNC_POINT_CALLBACK(
3676               "VersionSet::ProcessManifestWrites:SameColumnFamily", &cf_id);
3677           break;
3678         }
3679       }
3680       if (version == nullptr) {
3681         version = new Version(last_writer->cfd, this, file_options_,
3682                               last_writer->mutable_cf_options,
3683                               current_version_number_++);
3684         versions.push_back(version);
3685         mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options);
3686         builder_guards.emplace_back(
3687             new BaseReferencedVersionBuilder(last_writer->cfd));
3688         builder = builder_guards.back()->version_builder();
3689       }
3690       assert(builder != nullptr);  // make checker happy
3691       for (const auto& e : last_writer->edit_list) {
3692         if (e->is_in_atomic_group_) {
3693           if (batch_edits.empty() || !batch_edits.back()->is_in_atomic_group_ ||
3694               (batch_edits.back()->is_in_atomic_group_ &&
3695                batch_edits.back()->remaining_entries_ == 0)) {
3696             group_start = batch_edits.size();
3697           }
3698         } else if (group_start != std::numeric_limits<size_t>::max()) {
3699           group_start = std::numeric_limits<size_t>::max();
3700         }
3701         Status s = LogAndApplyHelper(last_writer->cfd, builder, e, mu);
3702         if (!s.ok()) {
3703           // free up the allocated memory
3704           for (auto v : versions) {
3705             delete v;
3706           }
3707           return s;
3708         }
3709         batch_edits.push_back(e);
3710       }
3711     }
3712     for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
3713       assert(!builder_guards.empty() &&
3714              builder_guards.size() == versions.size());
3715       auto* builder = builder_guards[i]->version_builder();
3716       Status s = builder->SaveTo(versions[i]->storage_info());
3717       if (!s.ok()) {
3718         // free up the allocated memory
3719         for (auto v : versions) {
3720           delete v;
3721         }
3722         return s;
3723       }
3724     }
3725   }
3726 
3727 #ifndef NDEBUG
3728   // Verify that version edits of atomic groups have correct
3729   // remaining_entries_.
3730   size_t k = 0;
3731   while (k < batch_edits.size()) {
3732     while (k < batch_edits.size() && !batch_edits[k]->is_in_atomic_group_) {
3733       ++k;
3734     }
3735     if (k == batch_edits.size()) {
3736       break;
3737     }
3738     size_t i = k;
3739     while (i < batch_edits.size()) {
3740       if (!batch_edits[i]->is_in_atomic_group_) {
3741         break;
3742       }
3743       assert(i - k + batch_edits[i]->remaining_entries_ ==
3744              batch_edits[k]->remaining_entries_);
3745       if (batch_edits[i]->remaining_entries_ == 0) {
3746         ++i;
3747         break;
3748       }
3749       ++i;
3750     }
3751     assert(batch_edits[i - 1]->is_in_atomic_group_);
3752     assert(0 == batch_edits[i - 1]->remaining_entries_);
3753     std::vector<VersionEdit*> tmp;
3754     for (size_t j = k; j != i; ++j) {
3755       tmp.emplace_back(batch_edits[j]);
3756     }
3757     TEST_SYNC_POINT_CALLBACK(
3758         "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", &tmp);
3759     k = i;
3760   }
3761 #endif  // NDEBUG
3762 
3763   uint64_t new_manifest_file_size = 0;
3764   Status s;
3765 
3766   assert(pending_manifest_file_number_ == 0);
3767   if (!descriptor_log_ ||
3768       manifest_file_size_ > db_options_->max_manifest_file_size) {
3769     TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:BeforeNewManifest");
3770     new_descriptor_log = true;
3771   } else {
3772     pending_manifest_file_number_ = manifest_file_number_;
3773   }
3774 
3775   // Local cached copy of state variable(s). WriteCurrentStateToManifest()
3776   // reads its content after releasing db mutex to avoid race with
3777   // SwitchMemtable().
3778   std::unordered_map<uint32_t, MutableCFState> curr_state;
3779   if (new_descriptor_log) {
3780     pending_manifest_file_number_ = NewFileNumber();
3781     batch_edits.back()->SetNextFile(next_file_number_.load());
3782 
3783     // if we are writing out new snapshot make sure to persist max column
3784     // family.
3785     if (column_family_set_->GetMaxColumnFamily() > 0) {
3786       first_writer.edit_list.front()->SetMaxColumnFamily(
3787           column_family_set_->GetMaxColumnFamily());
3788     }
3789     for (const auto* cfd : *column_family_set_) {
3790       assert(curr_state.find(cfd->GetID()) == curr_state.end());
3791       curr_state[cfd->GetID()] = {cfd->GetLogNumber()};
3792     }
3793   }
3794 
3795   {
3796     FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
3797     mu->Unlock();
3798 
3799     TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
3800     if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
3801       for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
3802         assert(!builder_guards.empty() &&
3803                builder_guards.size() == versions.size());
3804         assert(!mutable_cf_options_ptrs.empty() &&
3805                builder_guards.size() == versions.size());
3806         ColumnFamilyData* cfd = versions[i]->cfd_;
3807         s = builder_guards[i]->version_builder()->LoadTableHandlers(
3808             cfd->internal_stats(), cfd->ioptions()->optimize_filters_for_hits,
3809             true /* prefetch_index_and_filter_in_cache */,
3810             false /* is_initial_load */,
3811             mutable_cf_options_ptrs[i]->prefix_extractor.get());
3812         if (!s.ok()) {
3813           if (db_options_->paranoid_checks) {
3814             break;
3815           }
3816           s = Status::OK();
3817         }
3818       }
3819     }
3820 
3821     if (s.ok() && new_descriptor_log) {
3822       // This is fine because everything inside of this block is serialized --
3823       // only one thread can be here at the same time
3824       // create new manifest file
3825       ROCKS_LOG_INFO(db_options_->info_log, "Creating manifest %" PRIu64 "\n",
3826                      pending_manifest_file_number_);
3827       std::string descriptor_fname =
3828           DescriptorFileName(dbname_, pending_manifest_file_number_);
3829       std::unique_ptr<FSWritableFile> descriptor_file;
3830       s = NewWritableFile(fs_, descriptor_fname, &descriptor_file,
3831                           opt_file_opts);
3832       if (s.ok()) {
3833         descriptor_file->SetPreallocationBlockSize(
3834             db_options_->manifest_preallocation_size);
3835 
3836         std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
3837             std::move(descriptor_file), descriptor_fname, opt_file_opts, env_,
3838             nullptr, db_options_->listeners));
3839         descriptor_log_.reset(
3840             new log::Writer(std::move(file_writer), 0, false));
3841         s = WriteCurrentStateToManifest(curr_state, descriptor_log_.get());
3842       }
3843     }
3844 
3845     if (s.ok()) {
3846       if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
3847         for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
3848           versions[i]->PrepareApply(*mutable_cf_options_ptrs[i], true);
3849         }
3850       }
3851 
3852       // Write new records to MANIFEST log
3853 #ifndef NDEBUG
3854       size_t idx = 0;
3855 #endif
3856       for (auto& e : batch_edits) {
3857         std::string record;
3858         if (!e->EncodeTo(&record)) {
3859           s = Status::Corruption("Unable to encode VersionEdit:" +
3860                                  e->DebugString(true));
3861           break;
3862         }
3863         TEST_KILL_RANDOM("VersionSet::LogAndApply:BeforeAddRecord",
3864                          rocksdb_kill_odds * REDUCE_ODDS2);
3865 #ifndef NDEBUG
3866         if (batch_edits.size() > 1 && batch_edits.size() - 1 == idx) {
3867           TEST_SYNC_POINT_CALLBACK(
3868               "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
3869               nullptr);
3870           TEST_SYNC_POINT(
3871               "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:1");
3872         }
3873         ++idx;
3874 #endif /* !NDEBUG */
3875         s = descriptor_log_->AddRecord(record);
3876         if (!s.ok()) {
3877           break;
3878         }
3879       }
3880       if (s.ok()) {
3881         s = SyncManifest(env_, db_options_, descriptor_log_->file());
3882       }
3883       if (!s.ok()) {
3884         ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
3885                         s.ToString().c_str());
3886       }
3887     }
3888 
3889     // If we just created a new descriptor file, install it by writing a
3890     // new CURRENT file that points to it.
3891     if (s.ok() && new_descriptor_log) {
3892       s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_,
3893                          db_directory);
3894       TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest");
3895     }
3896 
3897     if (s.ok()) {
3898       // find offset in manifest file where this version is stored.
3899       new_manifest_file_size = descriptor_log_->file()->GetFileSize();
3900     }
3901 
3902     if (first_writer.edit_list.front()->is_column_family_drop_) {
3903       TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:0");
3904       TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1");
3905       TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2");
3906     }
3907 
3908     LogFlush(db_options_->info_log);
3909     TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifestDone");
3910     mu->Lock();
3911   }
3912 
3913   // Append the old manifest file to the obsolete_manifest_ list to be deleted
3914   // by PurgeObsoleteFiles later.
3915   if (s.ok() && new_descriptor_log) {
3916     obsolete_manifests_.emplace_back(
3917         DescriptorFileName("", manifest_file_number_));
3918   }
3919 
3920   // Install the new versions
3921   if (s.ok()) {
3922     if (first_writer.edit_list.front()->is_column_family_add_) {
3923       assert(batch_edits.size() == 1);
3924       assert(new_cf_options != nullptr);
3925       CreateColumnFamily(*new_cf_options, first_writer.edit_list.front());
3926     } else if (first_writer.edit_list.front()->is_column_family_drop_) {
3927       assert(batch_edits.size() == 1);
3928       first_writer.cfd->SetDropped();
3929       first_writer.cfd->UnrefAndTryDelete();
3930     } else {
3931       // Each version in versions corresponds to a column family.
3932       // For each column family, update its log number indicating that logs
3933       // with number smaller than this should be ignored.
3934       for (const auto version : versions) {
3935         uint64_t max_log_number_in_batch = 0;
3936         uint32_t cf_id = version->cfd_->GetID();
3937         for (const auto& e : batch_edits) {
3938           if (e->has_log_number_ && e->column_family_ == cf_id) {
3939             max_log_number_in_batch =
3940                 std::max(max_log_number_in_batch, e->log_number_);
3941           }
3942         }
3943         if (max_log_number_in_batch != 0) {
3944           assert(version->cfd_->GetLogNumber() <= max_log_number_in_batch);
3945           version->cfd_->SetLogNumber(max_log_number_in_batch);
3946         }
3947       }
3948 
3949       uint64_t last_min_log_number_to_keep = 0;
3950       for (auto& e : batch_edits) {
3951         if (e->has_min_log_number_to_keep_) {
3952           last_min_log_number_to_keep =
3953               std::max(last_min_log_number_to_keep, e->min_log_number_to_keep_);
3954         }
3955       }
3956 
3957       if (last_min_log_number_to_keep != 0) {
3958         // Should only be set in 2PC mode.
3959         MarkMinLogNumberToKeep2PC(last_min_log_number_to_keep);
3960       }
3961 
3962       for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
3963         ColumnFamilyData* cfd = versions[i]->cfd_;
3964         AppendVersion(cfd, versions[i]);
3965       }
3966     }
3967     manifest_file_number_ = pending_manifest_file_number_;
3968     manifest_file_size_ = new_manifest_file_size;
3969     prev_log_number_ = first_writer.edit_list.front()->prev_log_number_;
3970   } else {
3971     std::string version_edits;
3972     for (auto& e : batch_edits) {
3973       version_edits += ("\n" + e->DebugString(true));
3974     }
3975     ROCKS_LOG_ERROR(db_options_->info_log,
3976                     "Error in committing version edit to MANIFEST: %s",
3977                     version_edits.c_str());
3978     for (auto v : versions) {
3979       delete v;
3980     }
3981     // If manifest append failed for whatever reason, the file could be
3982     // corrupted. So we need to force the next version update to start a
3983     // new manifest file.
3984     descriptor_log_.reset();
3985     if (new_descriptor_log) {
3986       ROCKS_LOG_INFO(db_options_->info_log,
3987                      "Deleting manifest %" PRIu64 " current manifest %" PRIu64
3988                      "\n",
3989                      manifest_file_number_, pending_manifest_file_number_);
3990       env_->DeleteFile(
3991           DescriptorFileName(dbname_, pending_manifest_file_number_));
3992     }
3993   }
3994 
3995   pending_manifest_file_number_ = 0;
3996 
3997   // wake up all the waiting writers
3998   while (true) {
3999     ManifestWriter* ready = manifest_writers_.front();
4000     manifest_writers_.pop_front();
4001     bool need_signal = true;
4002     for (const auto& w : writers) {
4003       if (&w == ready) {
4004         need_signal = false;
4005         break;
4006       }
4007     }
4008     ready->status = s;
4009     ready->done = true;
4010     if (need_signal) {
4011       ready->cv.Signal();
4012     }
4013     if (ready == last_writer) {
4014       break;
4015     }
4016   }
4017   if (!manifest_writers_.empty()) {
4018     manifest_writers_.front()->cv.Signal();
4019   }
4020   return s;
4021 }
4022 
4023 // 'datas' is gramatically incorrect. We still use this notation to indicate
4024 // that this variable represents a collection of column_family_data.
LogAndApply(const autovector<ColumnFamilyData * > & column_family_datas,const autovector<const MutableCFOptions * > & mutable_cf_options_list,const autovector<autovector<VersionEdit * >> & edit_lists,InstrumentedMutex * mu,Directory * db_directory,bool new_descriptor_log,const ColumnFamilyOptions * new_cf_options)4025 Status VersionSet::LogAndApply(
4026     const autovector<ColumnFamilyData*>& column_family_datas,
4027     const autovector<const MutableCFOptions*>& mutable_cf_options_list,
4028     const autovector<autovector<VersionEdit*>>& edit_lists,
4029     InstrumentedMutex* mu, Directory* db_directory, bool new_descriptor_log,
4030     const ColumnFamilyOptions* new_cf_options) {
4031   mu->AssertHeld();
4032   int num_edits = 0;
4033   for (const auto& elist : edit_lists) {
4034     num_edits += static_cast<int>(elist.size());
4035   }
4036   if (num_edits == 0) {
4037     return Status::OK();
4038   } else if (num_edits > 1) {
4039 #ifndef NDEBUG
4040     for (const auto& edit_list : edit_lists) {
4041       for (const auto& edit : edit_list) {
4042         assert(!edit->IsColumnFamilyManipulation());
4043       }
4044     }
4045 #endif /* ! NDEBUG */
4046   }
4047 
4048   int num_cfds = static_cast<int>(column_family_datas.size());
4049   if (num_cfds == 1 && column_family_datas[0] == nullptr) {
4050     assert(edit_lists.size() == 1 && edit_lists[0].size() == 1);
4051     assert(edit_lists[0][0]->is_column_family_add_);
4052     assert(new_cf_options != nullptr);
4053   }
4054   std::deque<ManifestWriter> writers;
4055   if (num_cfds > 0) {
4056     assert(static_cast<size_t>(num_cfds) == mutable_cf_options_list.size());
4057     assert(static_cast<size_t>(num_cfds) == edit_lists.size());
4058   }
4059   for (int i = 0; i < num_cfds; ++i) {
4060     writers.emplace_back(mu, column_family_datas[i],
4061                          *mutable_cf_options_list[i], edit_lists[i]);
4062     manifest_writers_.push_back(&writers[i]);
4063   }
4064   assert(!writers.empty());
4065   ManifestWriter& first_writer = writers.front();
4066   while (!first_writer.done && &first_writer != manifest_writers_.front()) {
4067     first_writer.cv.Wait();
4068   }
4069   if (first_writer.done) {
4070     // All non-CF-manipulation operations can be grouped together and committed
4071     // to MANIFEST. They should all have finished. The status code is stored in
4072     // the first manifest writer.
4073 #ifndef NDEBUG
4074     for (const auto& writer : writers) {
4075       assert(writer.done);
4076     }
4077 #endif /* !NDEBUG */
4078     return first_writer.status;
4079   }
4080 
4081   int num_undropped_cfds = 0;
4082   for (auto cfd : column_family_datas) {
4083     // if cfd == nullptr, it is a column family add.
4084     if (cfd == nullptr || !cfd->IsDropped()) {
4085       ++num_undropped_cfds;
4086     }
4087   }
4088   if (0 == num_undropped_cfds) {
4089     for (int i = 0; i != num_cfds; ++i) {
4090       manifest_writers_.pop_front();
4091     }
4092     // Notify new head of manifest write queue.
4093     if (!manifest_writers_.empty()) {
4094       manifest_writers_.front()->cv.Signal();
4095     }
4096     return Status::ColumnFamilyDropped();
4097   }
4098 
4099   return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log,
4100                                new_cf_options);
4101 }
4102 
LogAndApplyCFHelper(VersionEdit * edit)4103 void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
4104   assert(edit->IsColumnFamilyManipulation());
4105   edit->SetNextFile(next_file_number_.load());
4106   // The log might have data that is not visible to memtbale and hence have not
4107   // updated the last_sequence_ yet. It is also possible that the log has is
4108   // expecting some new data that is not written yet. Since LastSequence is an
4109   // upper bound on the sequence, it is ok to record
4110   // last_allocated_sequence_ as the last sequence.
4111   edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
4112                                                       : last_sequence_);
4113   if (edit->is_column_family_drop_) {
4114     // if we drop column family, we have to make sure to save max column family,
4115     // so that we don't reuse existing ID
4116     edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
4117   }
4118 }
4119 
LogAndApplyHelper(ColumnFamilyData * cfd,VersionBuilder * builder,VersionEdit * edit,InstrumentedMutex * mu)4120 Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
4121                                      VersionBuilder* builder, VersionEdit* edit,
4122                                      InstrumentedMutex* mu) {
4123 #ifdef NDEBUG
4124   (void)cfd;
4125 #endif
4126   mu->AssertHeld();
4127   assert(!edit->IsColumnFamilyManipulation());
4128 
4129   if (edit->has_log_number_) {
4130     assert(edit->log_number_ >= cfd->GetLogNumber());
4131     assert(edit->log_number_ < next_file_number_.load());
4132   }
4133 
4134   if (!edit->has_prev_log_number_) {
4135     edit->SetPrevLogNumber(prev_log_number_);
4136   }
4137   edit->SetNextFile(next_file_number_.load());
4138   // The log might have data that is not visible to memtbale and hence have not
4139   // updated the last_sequence_ yet. It is also possible that the log has is
4140   // expecting some new data that is not written yet. Since LastSequence is an
4141   // upper bound on the sequence, it is ok to record
4142   // last_allocated_sequence_ as the last sequence.
4143   edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
4144                                                       : last_sequence_);
4145 
4146   Status s = builder->Apply(edit);
4147 
4148   return s;
4149 }
4150 
ApplyOneVersionEditToBuilder(VersionEdit & edit,const std::unordered_map<std::string,ColumnFamilyOptions> & name_to_options,std::unordered_map<int,std::string> & column_families_not_found,std::unordered_map<uint32_t,std::unique_ptr<BaseReferencedVersionBuilder>> & builders,VersionEditParams * version_edit_params)4151 Status VersionSet::ApplyOneVersionEditToBuilder(
4152     VersionEdit& edit,
4153     const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
4154     std::unordered_map<int, std::string>& column_families_not_found,
4155     std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
4156         builders,
4157     VersionEditParams* version_edit_params) {
4158   // Not found means that user didn't supply that column
4159   // family option AND we encountered column family add
4160   // record. Once we encounter column family drop record,
4161   // we will delete the column family from
4162   // column_families_not_found.
4163   bool cf_in_not_found = (column_families_not_found.find(edit.column_family_) !=
4164                           column_families_not_found.end());
4165   // in builders means that user supplied that column family
4166   // option AND that we encountered column family add record
4167   bool cf_in_builders = builders.find(edit.column_family_) != builders.end();
4168 
4169   // they can't both be true
4170   assert(!(cf_in_not_found && cf_in_builders));
4171 
4172   ColumnFamilyData* cfd = nullptr;
4173 
4174   if (edit.is_column_family_add_) {
4175     if (cf_in_builders || cf_in_not_found) {
4176       return Status::Corruption(
4177           "Manifest adding the same column family twice: " +
4178           edit.column_family_name_);
4179     }
4180     auto cf_options = name_to_options.find(edit.column_family_name_);
4181     // implicitly add persistent_stats column family without requiring user
4182     // to specify
4183     bool is_persistent_stats_column_family =
4184         edit.column_family_name_.compare(kPersistentStatsColumnFamilyName) == 0;
4185     if (cf_options == name_to_options.end() &&
4186         !is_persistent_stats_column_family) {
4187       column_families_not_found.insert(
4188           {edit.column_family_, edit.column_family_name_});
4189     } else {
4190       // recover persistent_stats CF from a DB that already contains it
4191       if (is_persistent_stats_column_family) {
4192         ColumnFamilyOptions cfo;
4193         OptimizeForPersistentStats(&cfo);
4194         cfd = CreateColumnFamily(cfo, &edit);
4195       } else {
4196         cfd = CreateColumnFamily(cf_options->second, &edit);
4197       }
4198       cfd->set_initialized();
4199       builders.insert(std::make_pair(
4200           edit.column_family_, std::unique_ptr<BaseReferencedVersionBuilder>(
4201                                    new BaseReferencedVersionBuilder(cfd))));
4202     }
4203   } else if (edit.is_column_family_drop_) {
4204     if (cf_in_builders) {
4205       auto builder = builders.find(edit.column_family_);
4206       assert(builder != builders.end());
4207       builders.erase(builder);
4208       cfd = column_family_set_->GetColumnFamily(edit.column_family_);
4209       assert(cfd != nullptr);
4210       if (cfd->UnrefAndTryDelete()) {
4211         cfd = nullptr;
4212       } else {
4213         // who else can have reference to cfd!?
4214         assert(false);
4215       }
4216     } else if (cf_in_not_found) {
4217       column_families_not_found.erase(edit.column_family_);
4218     } else {
4219       return Status::Corruption(
4220           "Manifest - dropping non-existing column family");
4221     }
4222   } else if (!cf_in_not_found) {
4223     if (!cf_in_builders) {
4224       return Status::Corruption(
4225           "Manifest record referencing unknown column family");
4226     }
4227 
4228     cfd = column_family_set_->GetColumnFamily(edit.column_family_);
4229     // this should never happen since cf_in_builders is true
4230     assert(cfd != nullptr);
4231 
4232     // if it is not column family add or column family drop,
4233     // then it's a file add/delete, which should be forwarded
4234     // to builder
4235     auto builder = builders.find(edit.column_family_);
4236     assert(builder != builders.end());
4237     Status s = builder->second->version_builder()->Apply(&edit);
4238     if (!s.ok()) {
4239       return s;
4240     }
4241   }
4242   return ExtractInfoFromVersionEdit(cfd, edit, version_edit_params);
4243 }
4244 
ExtractInfoFromVersionEdit(ColumnFamilyData * cfd,const VersionEdit & from_edit,VersionEditParams * version_edit_params)4245 Status VersionSet::ExtractInfoFromVersionEdit(
4246     ColumnFamilyData* cfd, const VersionEdit& from_edit,
4247     VersionEditParams* version_edit_params) {
4248   if (cfd != nullptr) {
4249     if (from_edit.has_db_id_) {
4250       version_edit_params->SetDBId(from_edit.db_id_);
4251     }
4252     if (from_edit.has_log_number_) {
4253       if (cfd->GetLogNumber() > from_edit.log_number_) {
4254         ROCKS_LOG_WARN(
4255             db_options_->info_log,
4256             "MANIFEST corruption detected, but ignored - Log numbers in "
4257             "records NOT monotonically increasing");
4258       } else {
4259         cfd->SetLogNumber(from_edit.log_number_);
4260         version_edit_params->SetLogNumber(from_edit.log_number_);
4261       }
4262     }
4263     if (from_edit.has_comparator_ &&
4264         from_edit.comparator_ != cfd->user_comparator()->Name()) {
4265       return Status::InvalidArgument(
4266           cfd->user_comparator()->Name(),
4267           "does not match existing comparator " + from_edit.comparator_);
4268     }
4269   }
4270 
4271   if (from_edit.has_prev_log_number_) {
4272     version_edit_params->SetPrevLogNumber(from_edit.prev_log_number_);
4273   }
4274 
4275   if (from_edit.has_next_file_number_) {
4276     version_edit_params->SetNextFile(from_edit.next_file_number_);
4277   }
4278 
4279   if (from_edit.has_max_column_family_) {
4280     version_edit_params->SetMaxColumnFamily(from_edit.max_column_family_);
4281   }
4282 
4283   if (from_edit.has_min_log_number_to_keep_) {
4284     version_edit_params->min_log_number_to_keep_ =
4285         std::max(version_edit_params->min_log_number_to_keep_,
4286                  from_edit.min_log_number_to_keep_);
4287   }
4288 
4289   if (from_edit.has_last_sequence_) {
4290     version_edit_params->SetLastSequence(from_edit.last_sequence_);
4291   }
4292   return Status::OK();
4293 }
4294 
GetCurrentManifestPath(const std::string & dbname,FileSystem * fs,std::string * manifest_path,uint64_t * manifest_file_number)4295 Status VersionSet::GetCurrentManifestPath(const std::string& dbname,
4296                                           FileSystem* fs,
4297                                           std::string* manifest_path,
4298                                           uint64_t* manifest_file_number) {
4299   assert(fs != nullptr);
4300   assert(manifest_path != nullptr);
4301   assert(manifest_file_number != nullptr);
4302 
4303   std::string fname;
4304   Status s = ReadFileToString(fs, CurrentFileName(dbname), &fname);
4305   if (!s.ok()) {
4306     return s;
4307   }
4308   if (fname.empty() || fname.back() != '\n') {
4309     return Status::Corruption("CURRENT file does not end with newline");
4310   }
4311   // remove the trailing '\n'
4312   fname.resize(fname.size() - 1);
4313   FileType type;
4314   bool parse_ok = ParseFileName(fname, manifest_file_number, &type);
4315   if (!parse_ok || type != kDescriptorFile) {
4316     return Status::Corruption("CURRENT file corrupted");
4317   }
4318   *manifest_path = dbname;
4319   if (dbname.back() != '/') {
4320     manifest_path->push_back('/');
4321   }
4322   *manifest_path += fname;
4323   return Status::OK();
4324 }
4325 
ReadAndRecover(log::Reader * reader,AtomicGroupReadBuffer * read_buffer,const std::unordered_map<std::string,ColumnFamilyOptions> & name_to_options,std::unordered_map<int,std::string> & column_families_not_found,std::unordered_map<uint32_t,std::unique_ptr<BaseReferencedVersionBuilder>> & builders,VersionEditParams * version_edit_params,std::string * db_id)4326 Status VersionSet::ReadAndRecover(
4327     log::Reader* reader, AtomicGroupReadBuffer* read_buffer,
4328     const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
4329     std::unordered_map<int, std::string>& column_families_not_found,
4330     std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
4331         builders,
4332     VersionEditParams* version_edit_params, std::string* db_id) {
4333   assert(reader != nullptr);
4334   assert(read_buffer != nullptr);
4335   Status s;
4336   Slice record;
4337   std::string scratch;
4338   size_t recovered_edits = 0;
4339   while (reader->ReadRecord(&record, &scratch) && s.ok()) {
4340     VersionEdit edit;
4341     s = edit.DecodeFrom(record);
4342     if (!s.ok()) {
4343       break;
4344     }
4345     if (edit.has_db_id_) {
4346       db_id_ = edit.GetDbId();
4347       if (db_id != nullptr) {
4348         db_id->assign(edit.GetDbId());
4349       }
4350     }
4351     s = read_buffer->AddEdit(&edit);
4352     if (!s.ok()) {
4353       break;
4354     }
4355     if (edit.is_in_atomic_group_) {
4356       if (read_buffer->IsFull()) {
4357         // Apply edits in an atomic group when we have read all edits in the
4358         // group.
4359         for (auto& e : read_buffer->replay_buffer()) {
4360           s = ApplyOneVersionEditToBuilder(e, name_to_options,
4361                                            column_families_not_found, builders,
4362                                            version_edit_params);
4363           if (!s.ok()) {
4364             break;
4365           }
4366           recovered_edits++;
4367         }
4368         if (!s.ok()) {
4369           break;
4370         }
4371         read_buffer->Clear();
4372       }
4373     } else {
4374       // Apply a normal edit immediately.
4375       s = ApplyOneVersionEditToBuilder(edit, name_to_options,
4376                                        column_families_not_found, builders,
4377                                        version_edit_params);
4378       if (s.ok()) {
4379         recovered_edits++;
4380       }
4381     }
4382   }
4383   if (!s.ok()) {
4384     // Clear the buffer if we fail to decode/apply an edit.
4385     read_buffer->Clear();
4386   }
4387   TEST_SYNC_POINT_CALLBACK("VersionSet::ReadAndRecover:RecoveredEdits",
4388                            &recovered_edits);
4389   return s;
4390 }
4391 
Recover(const std::vector<ColumnFamilyDescriptor> & column_families,bool read_only,std::string * db_id)4392 Status VersionSet::Recover(
4393     const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
4394     std::string* db_id) {
4395   std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
4396   for (const auto& cf : column_families) {
4397     cf_name_to_options.emplace(cf.name, cf.options);
4398   }
4399   // keeps track of column families in manifest that were not found in
4400   // column families parameters. if those column families are not dropped
4401   // by subsequent manifest records, Recover() will return failure status
4402   std::unordered_map<int, std::string> column_families_not_found;
4403 
4404   // Read "CURRENT" file, which contains a pointer to the current manifest file
4405   std::string manifest_path;
4406   Status s = GetCurrentManifestPath(dbname_, fs_, &manifest_path,
4407                                     &manifest_file_number_);
4408   if (!s.ok()) {
4409     return s;
4410   }
4411 
4412   ROCKS_LOG_INFO(db_options_->info_log, "Recovering from manifest file: %s\n",
4413                  manifest_path.c_str());
4414 
4415   std::unique_ptr<SequentialFileReader> manifest_file_reader;
4416   {
4417     std::unique_ptr<FSSequentialFile> manifest_file;
4418     s = fs_->NewSequentialFile(manifest_path,
4419                                fs_->OptimizeForManifestRead(file_options_),
4420                                &manifest_file, nullptr);
4421     if (!s.ok()) {
4422       return s;
4423     }
4424     manifest_file_reader.reset(
4425         new SequentialFileReader(std::move(manifest_file), manifest_path,
4426                                  db_options_->log_readahead_size));
4427   }
4428 
4429   std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
4430       builders;
4431 
4432   // add default column family
4433   auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
4434   if (default_cf_iter == cf_name_to_options.end()) {
4435     return Status::InvalidArgument("Default column family not specified");
4436   }
4437   VersionEdit default_cf_edit;
4438   default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
4439   default_cf_edit.SetColumnFamily(0);
4440   ColumnFamilyData* default_cfd =
4441       CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
4442   // In recovery, nobody else can access it, so it's fine to set it to be
4443   // initialized earlier.
4444   default_cfd->set_initialized();
4445   builders.insert(
4446       std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
4447                             new BaseReferencedVersionBuilder(default_cfd))));
4448   uint64_t current_manifest_file_size = 0;
4449   VersionEditParams version_edit_params;
4450   {
4451     VersionSet::LogReporter reporter;
4452     reporter.status = &s;
4453     log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
4454                        true /* checksum */, 0 /* log_number */);
4455     Slice record;
4456     std::string scratch;
4457     AtomicGroupReadBuffer read_buffer;
4458     s = ReadAndRecover(&reader, &read_buffer, cf_name_to_options,
4459                        column_families_not_found, builders,
4460                        &version_edit_params, db_id);
4461     current_manifest_file_size = reader.GetReadOffset();
4462     assert(current_manifest_file_size != 0);
4463   }
4464 
4465   if (s.ok()) {
4466     if (!version_edit_params.has_next_file_number_) {
4467       s = Status::Corruption("no meta-nextfile entry in descriptor");
4468     } else if (!version_edit_params.has_log_number_) {
4469       s = Status::Corruption("no meta-lognumber entry in descriptor");
4470     } else if (!version_edit_params.has_last_sequence_) {
4471       s = Status::Corruption("no last-sequence-number entry in descriptor");
4472     }
4473 
4474     if (!version_edit_params.has_prev_log_number_) {
4475       version_edit_params.SetPrevLogNumber(0);
4476     }
4477 
4478     column_family_set_->UpdateMaxColumnFamily(
4479         version_edit_params.max_column_family_);
4480 
4481     // When reading DB generated using old release, min_log_number_to_keep=0.
4482     // All log files will be scanned for potential prepare entries.
4483     MarkMinLogNumberToKeep2PC(version_edit_params.min_log_number_to_keep_);
4484     MarkFileNumberUsed(version_edit_params.prev_log_number_);
4485     MarkFileNumberUsed(version_edit_params.log_number_);
4486   }
4487 
4488   // there were some column families in the MANIFEST that weren't specified
4489   // in the argument. This is OK in read_only mode
4490   if (read_only == false && !column_families_not_found.empty()) {
4491     std::string list_of_not_found;
4492     for (const auto& cf : column_families_not_found) {
4493       list_of_not_found += ", " + cf.second;
4494     }
4495     list_of_not_found = list_of_not_found.substr(2);
4496     s = Status::InvalidArgument(
4497         "You have to open all column families. Column families not opened: " +
4498         list_of_not_found);
4499   }
4500 
4501   if (s.ok()) {
4502     for (auto cfd : *column_family_set_) {
4503       assert(builders.count(cfd->GetID()) > 0);
4504       auto* builder = builders[cfd->GetID()]->version_builder();
4505       if (!builder->CheckConsistencyForNumLevels()) {
4506         s = Status::InvalidArgument(
4507             "db has more levels than options.num_levels");
4508         break;
4509       }
4510     }
4511   }
4512 
4513   if (s.ok()) {
4514     for (auto cfd : *column_family_set_) {
4515       if (cfd->IsDropped()) {
4516         continue;
4517       }
4518       if (read_only) {
4519         cfd->table_cache()->SetTablesAreImmortal();
4520       }
4521       assert(cfd->initialized());
4522       auto builders_iter = builders.find(cfd->GetID());
4523       assert(builders_iter != builders.end());
4524       auto builder = builders_iter->second->version_builder();
4525 
4526       // unlimited table cache. Pre-load table handle now.
4527       // Need to do it out of the mutex.
4528       s = builder->LoadTableHandlers(
4529           cfd->internal_stats(), db_options_->max_file_opening_threads,
4530           false /* prefetch_index_and_filter_in_cache */,
4531           true /* is_initial_load */,
4532           cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
4533       if (!s.ok()) {
4534         if (db_options_->paranoid_checks) {
4535           return s;
4536         }
4537         s = Status::OK();
4538       }
4539 
4540       Version* v = new Version(cfd, this, file_options_,
4541                                *cfd->GetLatestMutableCFOptions(),
4542                                current_version_number_++);
4543       builder->SaveTo(v->storage_info());
4544 
4545       // Install recovered version
4546       v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
4547           !(db_options_->skip_stats_update_on_db_open));
4548       AppendVersion(cfd, v);
4549     }
4550 
4551     manifest_file_size_ = current_manifest_file_size;
4552     next_file_number_.store(version_edit_params.next_file_number_ + 1);
4553     last_allocated_sequence_ = version_edit_params.last_sequence_;
4554     last_published_sequence_ = version_edit_params.last_sequence_;
4555     last_sequence_ = version_edit_params.last_sequence_;
4556     prev_log_number_ = version_edit_params.prev_log_number_;
4557 
4558     ROCKS_LOG_INFO(
4559         db_options_->info_log,
4560         "Recovered from manifest file:%s succeeded,"
4561         "manifest_file_number is %" PRIu64 ", next_file_number is %" PRIu64
4562         ", last_sequence is %" PRIu64 ", log_number is %" PRIu64
4563         ",prev_log_number is %" PRIu64 ",max_column_family is %" PRIu32
4564         ",min_log_number_to_keep is %" PRIu64 "\n",
4565         manifest_path.c_str(), manifest_file_number_, next_file_number_.load(),
4566         last_sequence_.load(), version_edit_params.log_number_,
4567         prev_log_number_, column_family_set_->GetMaxColumnFamily(),
4568         min_log_number_to_keep_2pc());
4569 
4570     for (auto cfd : *column_family_set_) {
4571       if (cfd->IsDropped()) {
4572         continue;
4573       }
4574       ROCKS_LOG_INFO(db_options_->info_log,
4575                      "Column family [%s] (ID %" PRIu32
4576                      "), log number is %" PRIu64 "\n",
4577                      cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
4578     }
4579   }
4580 
4581   return s;
4582 }
4583 
ListColumnFamilies(std::vector<std::string> * column_families,const std::string & dbname,FileSystem * fs)4584 Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
4585                                       const std::string& dbname,
4586                                       FileSystem* fs) {
4587   // these are just for performance reasons, not correcntes,
4588   // so we're fine using the defaults
4589   FileOptions soptions;
4590   // Read "CURRENT" file, which contains a pointer to the current manifest file
4591   std::string manifest_path;
4592   uint64_t manifest_file_number;
4593   Status s =
4594       GetCurrentManifestPath(dbname, fs, &manifest_path, &manifest_file_number);
4595   if (!s.ok()) {
4596     return s;
4597   }
4598 
4599   std::unique_ptr<SequentialFileReader> file_reader;
4600   {
4601     std::unique_ptr<FSSequentialFile> file;
4602     s = fs->NewSequentialFile(manifest_path, soptions, &file, nullptr);
4603     if (!s.ok()) {
4604       return s;
4605   }
4606   file_reader.reset(new SequentialFileReader(std::move(file), manifest_path));
4607   }
4608 
4609   std::map<uint32_t, std::string> column_family_names;
4610   // default column family is always implicitly there
4611   column_family_names.insert({0, kDefaultColumnFamilyName});
4612   VersionSet::LogReporter reporter;
4613   reporter.status = &s;
4614   log::Reader reader(nullptr, std::move(file_reader), &reporter,
4615                      true /* checksum */, 0 /* log_number */);
4616   Slice record;
4617   std::string scratch;
4618   while (reader.ReadRecord(&record, &scratch) && s.ok()) {
4619     VersionEdit edit;
4620     s = edit.DecodeFrom(record);
4621     if (!s.ok()) {
4622       break;
4623     }
4624     if (edit.is_column_family_add_) {
4625       if (column_family_names.find(edit.column_family_) !=
4626           column_family_names.end()) {
4627         s = Status::Corruption("Manifest adding the same column family twice");
4628         break;
4629       }
4630       column_family_names.insert(
4631           {edit.column_family_, edit.column_family_name_});
4632     } else if (edit.is_column_family_drop_) {
4633       if (column_family_names.find(edit.column_family_) ==
4634           column_family_names.end()) {
4635         s = Status::Corruption(
4636             "Manifest - dropping non-existing column family");
4637         break;
4638       }
4639       column_family_names.erase(edit.column_family_);
4640     }
4641   }
4642 
4643   column_families->clear();
4644   if (s.ok()) {
4645     for (const auto& iter : column_family_names) {
4646       column_families->push_back(iter.second);
4647     }
4648   }
4649 
4650   return s;
4651 }
4652 
4653 #ifndef ROCKSDB_LITE
ReduceNumberOfLevels(const std::string & dbname,const Options * options,const FileOptions & file_options,int new_levels)4654 Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
4655                                         const Options* options,
4656                                         const FileOptions& file_options,
4657                                         int new_levels) {
4658   if (new_levels <= 1) {
4659     return Status::InvalidArgument(
4660         "Number of levels needs to be bigger than 1");
4661   }
4662 
4663   ImmutableDBOptions db_options(*options);
4664   ColumnFamilyOptions cf_options(*options);
4665   std::shared_ptr<Cache> tc(NewLRUCache(options->max_open_files - 10,
4666                                         options->table_cache_numshardbits));
4667   WriteController wc(options->delayed_write_rate);
4668   WriteBufferManager wb(options->db_write_buffer_size);
4669   VersionSet versions(dbname, &db_options, file_options, tc.get(), &wb, &wc,
4670                       /*block_cache_tracer=*/nullptr);
4671   Status status;
4672 
4673   std::vector<ColumnFamilyDescriptor> dummy;
4674   ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName,
4675                                           ColumnFamilyOptions(*options));
4676   dummy.push_back(dummy_descriptor);
4677   status = versions.Recover(dummy);
4678   if (!status.ok()) {
4679     return status;
4680   }
4681 
4682   Version* current_version =
4683       versions.GetColumnFamilySet()->GetDefault()->current();
4684   auto* vstorage = current_version->storage_info();
4685   int current_levels = vstorage->num_levels();
4686 
4687   if (current_levels <= new_levels) {
4688     return Status::OK();
4689   }
4690 
4691   // Make sure there are file only on one level from
4692   // (new_levels-1) to (current_levels-1)
4693   int first_nonempty_level = -1;
4694   int first_nonempty_level_filenum = 0;
4695   for (int i = new_levels - 1; i < current_levels; i++) {
4696     int file_num = vstorage->NumLevelFiles(i);
4697     if (file_num != 0) {
4698       if (first_nonempty_level < 0) {
4699         first_nonempty_level = i;
4700         first_nonempty_level_filenum = file_num;
4701       } else {
4702         char msg[255];
4703         snprintf(msg, sizeof(msg),
4704                  "Found at least two levels containing files: "
4705                  "[%d:%d],[%d:%d].\n",
4706                  first_nonempty_level, first_nonempty_level_filenum, i,
4707                  file_num);
4708         return Status::InvalidArgument(msg);
4709       }
4710     }
4711   }
4712 
4713   // we need to allocate an array with the old number of levels size to
4714   // avoid SIGSEGV in WriteCurrentStatetoManifest()
4715   // however, all levels bigger or equal to new_levels will be empty
4716   std::vector<FileMetaData*>* new_files_list =
4717       new std::vector<FileMetaData*>[current_levels];
4718   for (int i = 0; i < new_levels - 1; i++) {
4719     new_files_list[i] = vstorage->LevelFiles(i);
4720   }
4721 
4722   if (first_nonempty_level > 0) {
4723     new_files_list[new_levels - 1] = vstorage->LevelFiles(first_nonempty_level);
4724   }
4725 
4726   delete[] vstorage -> files_;
4727   vstorage->files_ = new_files_list;
4728   vstorage->num_levels_ = new_levels;
4729 
4730   MutableCFOptions mutable_cf_options(*options);
4731   VersionEdit ve;
4732   InstrumentedMutex dummy_mutex;
4733   InstrumentedMutexLock l(&dummy_mutex);
4734   return versions.LogAndApply(
4735       versions.GetColumnFamilySet()->GetDefault(),
4736       mutable_cf_options, &ve, &dummy_mutex, nullptr, true);
4737 }
4738 
4739 // Get the checksum information including the checksum and checksum function
4740 // name of all SST files in VersionSet. Store the information in
4741 // FileChecksumList which contains a map from file number to its checksum info.
4742 // If DB is not running, make sure call VersionSet::Recover() to load the file
4743 // metadata from Manifest to VersionSet before calling this function.
GetLiveFilesChecksumInfo(FileChecksumList * checksum_list)4744 Status VersionSet::GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) {
4745   // Clean the previously stored checksum information if any.
4746   if (checksum_list == nullptr) {
4747     return Status::InvalidArgument("checksum_list is nullptr");
4748   }
4749   checksum_list->reset();
4750 
4751   for (auto cfd : *column_family_set_) {
4752     if (cfd->IsDropped() || !cfd->initialized()) {
4753       continue;
4754     }
4755     for (int level = 0; level < cfd->NumberLevels(); level++) {
4756       for (const auto& file :
4757            cfd->current()->storage_info()->LevelFiles(level)) {
4758         checksum_list->InsertOneFileChecksum(file->fd.GetNumber(),
4759                                              file->file_checksum,
4760                                              file->file_checksum_func_name);
4761       }
4762     }
4763   }
4764   return Status::OK();
4765 }
4766 
DumpManifest(Options & options,std::string & dscname,bool verbose,bool hex,bool json)4767 Status VersionSet::DumpManifest(Options& options, std::string& dscname,
4768                                 bool verbose, bool hex, bool json) {
4769   // Open the specified manifest file.
4770   std::unique_ptr<SequentialFileReader> file_reader;
4771   Status s;
4772   {
4773     std::unique_ptr<FSSequentialFile> file;
4774     s = options.file_system->NewSequentialFile(
4775         dscname,
4776         options.file_system->OptimizeForManifestRead(file_options_), &file,
4777         nullptr);
4778     if (!s.ok()) {
4779       return s;
4780     }
4781     file_reader.reset(new SequentialFileReader(
4782         std::move(file), dscname, db_options_->log_readahead_size));
4783   }
4784 
4785   bool have_prev_log_number = false;
4786   bool have_next_file = false;
4787   bool have_last_sequence = false;
4788   uint64_t next_file = 0;
4789   uint64_t last_sequence = 0;
4790   uint64_t previous_log_number = 0;
4791   int count = 0;
4792   std::unordered_map<uint32_t, std::string> comparators;
4793   std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
4794       builders;
4795 
4796   // add default column family
4797   VersionEdit default_cf_edit;
4798   default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
4799   default_cf_edit.SetColumnFamily(0);
4800   ColumnFamilyData* default_cfd =
4801       CreateColumnFamily(ColumnFamilyOptions(options), &default_cf_edit);
4802   builders.insert(
4803       std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
4804                             new BaseReferencedVersionBuilder(default_cfd))));
4805 
4806   {
4807     VersionSet::LogReporter reporter;
4808     reporter.status = &s;
4809     log::Reader reader(nullptr, std::move(file_reader), &reporter,
4810                        true /* checksum */, 0 /* log_number */);
4811     Slice record;
4812     std::string scratch;
4813     while (reader.ReadRecord(&record, &scratch) && s.ok()) {
4814       VersionEdit edit;
4815       s = edit.DecodeFrom(record);
4816       if (!s.ok()) {
4817         break;
4818       }
4819 
4820       // Write out each individual edit
4821       if (verbose && !json) {
4822         printf("%s\n", edit.DebugString(hex).c_str());
4823       } else if (json) {
4824         printf("%s\n", edit.DebugJSON(count, hex).c_str());
4825       }
4826       count++;
4827 
4828       bool cf_in_builders =
4829           builders.find(edit.column_family_) != builders.end();
4830 
4831       if (edit.has_comparator_) {
4832         comparators.insert({edit.column_family_, edit.comparator_});
4833       }
4834 
4835       ColumnFamilyData* cfd = nullptr;
4836 
4837       if (edit.is_column_family_add_) {
4838         if (cf_in_builders) {
4839           s = Status::Corruption(
4840               "Manifest adding the same column family twice");
4841           break;
4842         }
4843         cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit);
4844         cfd->set_initialized();
4845         builders.insert(std::make_pair(
4846             edit.column_family_, std::unique_ptr<BaseReferencedVersionBuilder>(
4847                                      new BaseReferencedVersionBuilder(cfd))));
4848       } else if (edit.is_column_family_drop_) {
4849         if (!cf_in_builders) {
4850           s = Status::Corruption(
4851               "Manifest - dropping non-existing column family");
4852           break;
4853         }
4854         auto builder_iter = builders.find(edit.column_family_);
4855         builders.erase(builder_iter);
4856         comparators.erase(edit.column_family_);
4857         cfd = column_family_set_->GetColumnFamily(edit.column_family_);
4858         assert(cfd != nullptr);
4859         cfd->UnrefAndTryDelete();
4860         cfd = nullptr;
4861       } else {
4862         if (!cf_in_builders) {
4863           s = Status::Corruption(
4864               "Manifest record referencing unknown column family");
4865           break;
4866         }
4867 
4868         cfd = column_family_set_->GetColumnFamily(edit.column_family_);
4869         // this should never happen since cf_in_builders is true
4870         assert(cfd != nullptr);
4871 
4872         // if it is not column family add or column family drop,
4873         // then it's a file add/delete, which should be forwarded
4874         // to builder
4875         auto builder = builders.find(edit.column_family_);
4876         assert(builder != builders.end());
4877         s = builder->second->version_builder()->Apply(&edit);
4878         if (!s.ok()) {
4879           break;
4880         }
4881       }
4882 
4883       if (cfd != nullptr && edit.has_log_number_) {
4884         cfd->SetLogNumber(edit.log_number_);
4885       }
4886 
4887 
4888       if (edit.has_prev_log_number_) {
4889         previous_log_number = edit.prev_log_number_;
4890         have_prev_log_number = true;
4891       }
4892 
4893       if (edit.has_next_file_number_) {
4894         next_file = edit.next_file_number_;
4895         have_next_file = true;
4896       }
4897 
4898       if (edit.has_last_sequence_) {
4899         last_sequence = edit.last_sequence_;
4900         have_last_sequence = true;
4901       }
4902 
4903       if (edit.has_max_column_family_) {
4904         column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_);
4905       }
4906 
4907       if (edit.has_min_log_number_to_keep_) {
4908         MarkMinLogNumberToKeep2PC(edit.min_log_number_to_keep_);
4909       }
4910     }
4911   }
4912   file_reader.reset();
4913 
4914   if (s.ok()) {
4915     if (!have_next_file) {
4916       s = Status::Corruption("no meta-nextfile entry in descriptor");
4917       printf("no meta-nextfile entry in descriptor");
4918     } else if (!have_last_sequence) {
4919       printf("no last-sequence-number entry in descriptor");
4920       s = Status::Corruption("no last-sequence-number entry in descriptor");
4921     }
4922 
4923     if (!have_prev_log_number) {
4924       previous_log_number = 0;
4925     }
4926   }
4927 
4928   if (s.ok()) {
4929     for (auto cfd : *column_family_set_) {
4930       if (cfd->IsDropped()) {
4931         continue;
4932       }
4933       auto builders_iter = builders.find(cfd->GetID());
4934       assert(builders_iter != builders.end());
4935       auto builder = builders_iter->second->version_builder();
4936 
4937       Version* v = new Version(cfd, this, file_options_,
4938                                *cfd->GetLatestMutableCFOptions(),
4939                                current_version_number_++);
4940       builder->SaveTo(v->storage_info());
4941       v->PrepareApply(*cfd->GetLatestMutableCFOptions(), false);
4942 
4943       printf("--------------- Column family \"%s\"  (ID %" PRIu32
4944              ") --------------\n",
4945              cfd->GetName().c_str(), cfd->GetID());
4946       printf("log number: %" PRIu64 "\n", cfd->GetLogNumber());
4947       auto comparator = comparators.find(cfd->GetID());
4948       if (comparator != comparators.end()) {
4949         printf("comparator: %s\n", comparator->second.c_str());
4950       } else {
4951         printf("comparator: <NO COMPARATOR>\n");
4952       }
4953       printf("%s \n", v->DebugString(hex).c_str());
4954       delete v;
4955     }
4956 
4957     next_file_number_.store(next_file + 1);
4958     last_allocated_sequence_ = last_sequence;
4959     last_published_sequence_ = last_sequence;
4960     last_sequence_ = last_sequence;
4961     prev_log_number_ = previous_log_number;
4962 
4963     printf("next_file_number %" PRIu64 " last_sequence %" PRIu64
4964            "  prev_log_number %" PRIu64 " max_column_family %" PRIu32
4965            " min_log_number_to_keep "
4966            "%" PRIu64 "\n",
4967            next_file_number_.load(), last_sequence, previous_log_number,
4968            column_family_set_->GetMaxColumnFamily(),
4969            min_log_number_to_keep_2pc());
4970   }
4971 
4972   return s;
4973 }
4974 #endif  // ROCKSDB_LITE
4975 
MarkFileNumberUsed(uint64_t number)4976 void VersionSet::MarkFileNumberUsed(uint64_t number) {
4977   // only called during recovery and repair which are single threaded, so this
4978   // works because there can't be concurrent calls
4979   if (next_file_number_.load(std::memory_order_relaxed) <= number) {
4980     next_file_number_.store(number + 1, std::memory_order_relaxed);
4981   }
4982 }
4983 // Called only either from ::LogAndApply which is protected by mutex or during
4984 // recovery which is single-threaded.
MarkMinLogNumberToKeep2PC(uint64_t number)4985 void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
4986   if (min_log_number_to_keep_2pc_.load(std::memory_order_relaxed) < number) {
4987     min_log_number_to_keep_2pc_.store(number, std::memory_order_relaxed);
4988   }
4989 }
4990 
WriteCurrentStateToManifest(const std::unordered_map<uint32_t,MutableCFState> & curr_state,log::Writer * log)4991 Status VersionSet::WriteCurrentStateToManifest(
4992     const std::unordered_map<uint32_t, MutableCFState>& curr_state,
4993     log::Writer* log) {
4994   // TODO: Break up into multiple records to reduce memory usage on recovery?
4995 
4996   // WARNING: This method doesn't hold a mutex!!
4997 
4998   // This is done without DB mutex lock held, but only within single-threaded
4999   // LogAndApply. Column family manipulations can only happen within LogAndApply
5000   // (the same single thread), so we're safe to iterate.
5001 
5002   if (db_options_->write_dbid_to_manifest) {
5003     VersionEdit edit_for_db_id;
5004     assert(!db_id_.empty());
5005     edit_for_db_id.SetDBId(db_id_);
5006     std::string db_id_record;
5007     if (!edit_for_db_id.EncodeTo(&db_id_record)) {
5008       return Status::Corruption("Unable to Encode VersionEdit:" +
5009                                 edit_for_db_id.DebugString(true));
5010     }
5011     Status add_record = log->AddRecord(db_id_record);
5012     if (!add_record.ok()) {
5013       return add_record;
5014     }
5015   }
5016 
5017   for (auto cfd : *column_family_set_) {
5018     if (cfd->IsDropped()) {
5019       continue;
5020     }
5021     assert(cfd->initialized());
5022     {
5023       // Store column family info
5024       VersionEdit edit;
5025       if (cfd->GetID() != 0) {
5026         // default column family is always there,
5027         // no need to explicitly write it
5028         edit.AddColumnFamily(cfd->GetName());
5029         edit.SetColumnFamily(cfd->GetID());
5030       }
5031       edit.SetComparatorName(
5032           cfd->internal_comparator().user_comparator()->Name());
5033       std::string record;
5034       if (!edit.EncodeTo(&record)) {
5035         return Status::Corruption(
5036             "Unable to Encode VersionEdit:" + edit.DebugString(true));
5037       }
5038       Status s = log->AddRecord(record);
5039       if (!s.ok()) {
5040         return s;
5041       }
5042     }
5043 
5044     {
5045       // Save files
5046       VersionEdit edit;
5047       edit.SetColumnFamily(cfd->GetID());
5048 
5049       for (int level = 0; level < cfd->NumberLevels(); level++) {
5050         for (const auto& f :
5051              cfd->current()->storage_info()->LevelFiles(level)) {
5052           edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
5053                        f->fd.GetFileSize(), f->smallest, f->largest,
5054                        f->fd.smallest_seqno, f->fd.largest_seqno,
5055                        f->marked_for_compaction, f->oldest_blob_file_number,
5056                        f->oldest_ancester_time, f->file_creation_time,
5057                        f->file_checksum, f->file_checksum_func_name);
5058         }
5059       }
5060       const auto iter = curr_state.find(cfd->GetID());
5061       assert(iter != curr_state.end());
5062       uint64_t log_number = iter->second.log_number;
5063       edit.SetLogNumber(log_number);
5064       std::string record;
5065       if (!edit.EncodeTo(&record)) {
5066         return Status::Corruption(
5067             "Unable to Encode VersionEdit:" + edit.DebugString(true));
5068       }
5069       Status s = log->AddRecord(record);
5070       if (!s.ok()) {
5071         return s;
5072       }
5073     }
5074   }
5075   return Status::OK();
5076 }
5077 
5078 // TODO(aekmekji): in CompactionJob::GenSubcompactionBoundaries(), this
5079 // function is called repeatedly with consecutive pairs of slices. For example
5080 // if the slice list is [a, b, c, d] this function is called with arguments
5081 // (a,b) then (b,c) then (c,d). Knowing this, an optimization is possible where
5082 // we avoid doing binary search for the keys b and c twice and instead somehow
5083 // maintain state of where they first appear in the files.
ApproximateSize(const SizeApproximationOptions & options,Version * v,const Slice & start,const Slice & end,int start_level,int end_level,TableReaderCaller caller)5084 uint64_t VersionSet::ApproximateSize(const SizeApproximationOptions& options,
5085                                      Version* v, const Slice& start,
5086                                      const Slice& end, int start_level,
5087                                      int end_level, TableReaderCaller caller) {
5088   const auto& icmp = v->cfd_->internal_comparator();
5089 
5090   // pre-condition
5091   assert(icmp.Compare(start, end) <= 0);
5092 
5093   uint64_t total_full_size = 0;
5094   const auto* vstorage = v->storage_info();
5095   const int num_non_empty_levels = vstorage->num_non_empty_levels();
5096   end_level = (end_level == -1) ? num_non_empty_levels
5097                                 : std::min(end_level, num_non_empty_levels);
5098 
5099   assert(start_level <= end_level);
5100 
5101   // Outline of the optimization that uses options.files_size_error_margin.
5102   // When approximating the files total size that is used to store a keys range,
5103   // we first sum up the sizes of the files that fully fall into the range.
5104   // Then we sum up the sizes of all the files that may intersect with the range
5105   // (this includes all files in L0 as well). Then, if total_intersecting_size
5106   // is smaller than total_full_size * options.files_size_error_margin - we can
5107   // infer that the intersecting files have a sufficiently negligible
5108   // contribution to the total size, and we can approximate the storage required
5109   // for the keys in range as just half of the intersecting_files_size.
5110   // E.g., if the value of files_size_error_margin is 0.1, then the error of the
5111   // approximation is limited to only ~10% of the total size of files that fully
5112   // fall into the keys range. In such case, this helps to avoid a costly
5113   // process of binary searching the intersecting files that is required only
5114   // for a more precise calculation of the total size.
5115 
5116   autovector<FdWithKeyRange*, 32> first_files;
5117   autovector<FdWithKeyRange*, 16> last_files;
5118 
5119   // scan all the levels
5120   for (int level = start_level; level < end_level; ++level) {
5121     const LevelFilesBrief& files_brief = vstorage->LevelFilesBrief(level);
5122     if (files_brief.num_files == 0) {
5123       // empty level, skip exploration
5124       continue;
5125     }
5126 
5127     if (level == 0) {
5128       // level 0 files are not in sorted order, we need to iterate through
5129       // the list to compute the total bytes that require scanning,
5130       // so handle the case explicitly (similarly to first_files case)
5131       for (size_t i = 0; i < files_brief.num_files; i++) {
5132         first_files.push_back(&files_brief.files[i]);
5133       }
5134       continue;
5135     }
5136 
5137     assert(level > 0);
5138     assert(files_brief.num_files > 0);
5139 
5140     // identify the file position for start key
5141     const int idx_start =
5142         FindFileInRange(icmp, files_brief, start, 0,
5143                         static_cast<uint32_t>(files_brief.num_files - 1));
5144     assert(static_cast<size_t>(idx_start) < files_brief.num_files);
5145 
5146     // identify the file position for end key
5147     int idx_end = idx_start;
5148     if (icmp.Compare(files_brief.files[idx_end].largest_key, end) < 0) {
5149       idx_end =
5150           FindFileInRange(icmp, files_brief, end, idx_start,
5151                           static_cast<uint32_t>(files_brief.num_files - 1));
5152     }
5153     assert(idx_end >= idx_start &&
5154            static_cast<size_t>(idx_end) < files_brief.num_files);
5155 
5156     // scan all files from the starting index to the ending index
5157     // (inferred from the sorted order)
5158 
5159     // first scan all the intermediate full files (excluding first and last)
5160     for (int i = idx_start + 1; i < idx_end; ++i) {
5161       uint64_t file_size = files_brief.files[i].fd.GetFileSize();
5162       // The entire file falls into the range, so we can just take its size.
5163       assert(file_size ==
5164              ApproximateSize(v, files_brief.files[i], start, end, caller));
5165       total_full_size += file_size;
5166     }
5167 
5168     // save the first and the last files (which may be the same file), so we
5169     // can scan them later.
5170     first_files.push_back(&files_brief.files[idx_start]);
5171     if (idx_start != idx_end) {
5172       // we need to estimate size for both files, only if they are different
5173       last_files.push_back(&files_brief.files[idx_end]);
5174     }
5175   }
5176 
5177   // The sum of all file sizes that intersect the [start, end] keys range.
5178   uint64_t total_intersecting_size = 0;
5179   for (const auto* file_ptr : first_files) {
5180     total_intersecting_size += file_ptr->fd.GetFileSize();
5181   }
5182   for (const auto* file_ptr : last_files) {
5183     total_intersecting_size += file_ptr->fd.GetFileSize();
5184   }
5185 
5186   // Now scan all the first & last files at each level, and estimate their size.
5187   // If the total_intersecting_size is less than X% of the total_full_size - we
5188   // want to approximate the result in order to avoid the costly binary search
5189   // inside ApproximateSize. We use half of file size as an approximation below.
5190 
5191   const double margin = options.files_size_error_margin;
5192   if (margin > 0 && total_intersecting_size <
5193                         static_cast<uint64_t>(total_full_size * margin)) {
5194     total_full_size += total_intersecting_size / 2;
5195   } else {
5196     // Estimate for all the first files, at each level
5197     for (const auto file_ptr : first_files) {
5198       total_full_size += ApproximateSize(v, *file_ptr, start, end, caller);
5199     }
5200 
5201     // Estimate for all the last files, at each level
5202     for (const auto file_ptr : last_files) {
5203       // We could use ApproximateSize here, but calling ApproximateOffsetOf
5204       // directly is just more efficient.
5205       total_full_size += ApproximateOffsetOf(v, *file_ptr, end, caller);
5206     }
5207   }
5208 
5209   return total_full_size;
5210 }
5211 
ApproximateOffsetOf(Version * v,const FdWithKeyRange & f,const Slice & key,TableReaderCaller caller)5212 uint64_t VersionSet::ApproximateOffsetOf(Version* v, const FdWithKeyRange& f,
5213                                          const Slice& key,
5214                                          TableReaderCaller caller) {
5215   // pre-condition
5216   assert(v);
5217   const auto& icmp = v->cfd_->internal_comparator();
5218 
5219   uint64_t result = 0;
5220   if (icmp.Compare(f.largest_key, key) <= 0) {
5221     // Entire file is before "key", so just add the file size
5222     result = f.fd.GetFileSize();
5223   } else if (icmp.Compare(f.smallest_key, key) > 0) {
5224     // Entire file is after "key", so ignore
5225     result = 0;
5226   } else {
5227     // "key" falls in the range for this table.  Add the
5228     // approximate offset of "key" within the table.
5229     TableCache* table_cache = v->cfd_->table_cache();
5230     if (table_cache != nullptr) {
5231       result = table_cache->ApproximateOffsetOf(
5232           key, f.file_metadata->fd, caller, icmp,
5233           v->GetMutableCFOptions().prefix_extractor.get());
5234     }
5235   }
5236   return result;
5237 }
5238 
ApproximateSize(Version * v,const FdWithKeyRange & f,const Slice & start,const Slice & end,TableReaderCaller caller)5239 uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
5240                                      const Slice& start, const Slice& end,
5241                                      TableReaderCaller caller) {
5242   // pre-condition
5243   assert(v);
5244   const auto& icmp = v->cfd_->internal_comparator();
5245   assert(icmp.Compare(start, end) <= 0);
5246 
5247   if (icmp.Compare(f.largest_key, start) <= 0 ||
5248       icmp.Compare(f.smallest_key, end) > 0) {
5249     // Entire file is before or after the start/end keys range
5250     return 0;
5251   }
5252 
5253   if (icmp.Compare(f.smallest_key, start) >= 0) {
5254     // Start of the range is before the file start - approximate by end offset
5255     return ApproximateOffsetOf(v, f, end, caller);
5256   }
5257 
5258   if (icmp.Compare(f.largest_key, end) < 0) {
5259     // End of the range is after the file end - approximate by subtracting
5260     // start offset from the file size
5261     uint64_t start_offset = ApproximateOffsetOf(v, f, start, caller);
5262     assert(f.fd.GetFileSize() >= start_offset);
5263     return f.fd.GetFileSize() - start_offset;
5264   }
5265 
5266   // The interval falls entirely in the range for this file.
5267   TableCache* table_cache = v->cfd_->table_cache();
5268   if (table_cache == nullptr) {
5269     return 0;
5270   }
5271   return table_cache->ApproximateSize(
5272       start, end, f.file_metadata->fd, caller, icmp,
5273       v->GetMutableCFOptions().prefix_extractor.get());
5274 }
5275 
AddLiveFiles(std::vector<FileDescriptor> * live_list)5276 void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
5277   // pre-calculate space requirement
5278   int64_t total_files = 0;
5279   for (auto cfd : *column_family_set_) {
5280     if (!cfd->initialized()) {
5281       continue;
5282     }
5283     Version* dummy_versions = cfd->dummy_versions();
5284     for (Version* v = dummy_versions->next_; v != dummy_versions;
5285          v = v->next_) {
5286       const auto* vstorage = v->storage_info();
5287       for (int level = 0; level < vstorage->num_levels(); level++) {
5288         total_files += vstorage->LevelFiles(level).size();
5289       }
5290     }
5291   }
5292 
5293   // just one time extension to the right size
5294   live_list->reserve(live_list->size() + static_cast<size_t>(total_files));
5295 
5296   for (auto cfd : *column_family_set_) {
5297     if (!cfd->initialized()) {
5298       continue;
5299     }
5300     auto* current = cfd->current();
5301     bool found_current = false;
5302     Version* dummy_versions = cfd->dummy_versions();
5303     for (Version* v = dummy_versions->next_; v != dummy_versions;
5304          v = v->next_) {
5305       v->AddLiveFiles(live_list);
5306       if (v == current) {
5307         found_current = true;
5308       }
5309     }
5310     if (!found_current && current != nullptr) {
5311       // Should never happen unless it is a bug.
5312       assert(false);
5313       current->AddLiveFiles(live_list);
5314     }
5315   }
5316 }
5317 
MakeInputIterator(const Compaction * c,RangeDelAggregator * range_del_agg,const FileOptions & file_options_compactions)5318 InternalIterator* VersionSet::MakeInputIterator(
5319     const Compaction* c, RangeDelAggregator* range_del_agg,
5320     const FileOptions& file_options_compactions) {
5321   auto cfd = c->column_family_data();
5322   ReadOptions read_options;
5323   read_options.verify_checksums = true;
5324   read_options.fill_cache = false;
5325   // Compaction iterators shouldn't be confined to a single prefix.
5326   // Compactions use Seek() for
5327   // (a) concurrent compactions,
5328   // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
5329   read_options.total_order_seek = true;
5330 
5331   // Level-0 files have to be merged together.  For other levels,
5332   // we will make a concatenating iterator per level.
5333   // TODO(opt): use concatenating iterator for level-0 if there is no overlap
5334   const size_t space = (c->level() == 0 ? c->input_levels(0)->num_files +
5335                                               c->num_input_levels() - 1
5336                                         : c->num_input_levels());
5337   InternalIterator** list = new InternalIterator* [space];
5338   size_t num = 0;
5339   for (size_t which = 0; which < c->num_input_levels(); which++) {
5340     if (c->input_levels(which)->num_files != 0) {
5341       if (c->level(which) == 0) {
5342         const LevelFilesBrief* flevel = c->input_levels(which);
5343         for (size_t i = 0; i < flevel->num_files; i++) {
5344           list[num++] = cfd->table_cache()->NewIterator(
5345               read_options, file_options_compactions,
5346               cfd->internal_comparator(),
5347               *flevel->files[i].file_metadata, range_del_agg,
5348               c->mutable_cf_options()->prefix_extractor.get(),
5349               /*table_reader_ptr=*/nullptr,
5350               /*file_read_hist=*/nullptr, TableReaderCaller::kCompaction,
5351               /*arena=*/nullptr,
5352               /*skip_filters=*/false, /*level=*/static_cast<int>(which),
5353               /*smallest_compaction_key=*/nullptr,
5354               /*largest_compaction_key=*/nullptr);
5355         }
5356       } else {
5357         // Create concatenating iterator for the files from this level
5358         list[num++] = new LevelIterator(
5359             cfd->table_cache(), read_options, file_options_compactions,
5360             cfd->internal_comparator(), c->input_levels(which),
5361             c->mutable_cf_options()->prefix_extractor.get(),
5362             /*should_sample=*/false,
5363             /*no per level latency histogram=*/nullptr,
5364             TableReaderCaller::kCompaction, /*skip_filters=*/false,
5365             /*level=*/static_cast<int>(which), range_del_agg,
5366             c->boundaries(which));
5367       }
5368     }
5369   }
5370   assert(num <= space);
5371   InternalIterator* result =
5372       NewMergingIterator(&c->column_family_data()->internal_comparator(), list,
5373                          static_cast<int>(num));
5374   delete[] list;
5375   return result;
5376 }
5377 
5378 // verify that the files listed in this compaction are present
5379 // in the current version
VerifyCompactionFileConsistency(Compaction * c)5380 bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
5381 #ifndef NDEBUG
5382   Version* version = c->column_family_data()->current();
5383   const VersionStorageInfo* vstorage = version->storage_info();
5384   if (c->input_version() != version) {
5385     ROCKS_LOG_INFO(
5386         db_options_->info_log,
5387         "[%s] compaction output being applied to a different base version from"
5388         " input version",
5389         c->column_family_data()->GetName().c_str());
5390 
5391     if (vstorage->compaction_style_ == kCompactionStyleLevel &&
5392         c->start_level() == 0 && c->num_input_levels() > 2U) {
5393       // We are doing a L0->base_level compaction. The assumption is if
5394       // base level is not L1, levels from L1 to base_level - 1 is empty.
5395       // This is ensured by having one compaction from L0 going on at the
5396       // same time in level-based compaction. So that during the time, no
5397       // compaction/flush can put files to those levels.
5398       for (int l = c->start_level() + 1; l < c->output_level(); l++) {
5399         if (vstorage->NumLevelFiles(l) != 0) {
5400           return false;
5401         }
5402       }
5403     }
5404   }
5405 
5406   for (size_t input = 0; input < c->num_input_levels(); ++input) {
5407     int level = c->level(input);
5408     for (size_t i = 0; i < c->num_input_files(input); ++i) {
5409       uint64_t number = c->input(input, i)->fd.GetNumber();
5410       bool found = false;
5411       for (size_t j = 0; j < vstorage->files_[level].size(); j++) {
5412         FileMetaData* f = vstorage->files_[level][j];
5413         if (f->fd.GetNumber() == number) {
5414           found = true;
5415           break;
5416         }
5417       }
5418       if (!found) {
5419         return false;  // input files non existent in current version
5420       }
5421     }
5422   }
5423 #else
5424   (void)c;
5425 #endif
5426   return true;     // everything good
5427 }
5428 
GetMetadataForFile(uint64_t number,int * filelevel,FileMetaData ** meta,ColumnFamilyData ** cfd)5429 Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
5430                                       FileMetaData** meta,
5431                                       ColumnFamilyData** cfd) {
5432   for (auto cfd_iter : *column_family_set_) {
5433     if (!cfd_iter->initialized()) {
5434       continue;
5435     }
5436     Version* version = cfd_iter->current();
5437     const auto* vstorage = version->storage_info();
5438     for (int level = 0; level < vstorage->num_levels(); level++) {
5439       for (const auto& file : vstorage->LevelFiles(level)) {
5440         if (file->fd.GetNumber() == number) {
5441           *meta = file;
5442           *filelevel = level;
5443           *cfd = cfd_iter;
5444           return Status::OK();
5445         }
5446       }
5447     }
5448   }
5449   return Status::NotFound("File not present in any level");
5450 }
5451 
GetLiveFilesMetaData(std::vector<LiveFileMetaData> * metadata)5452 void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
5453   for (auto cfd : *column_family_set_) {
5454     if (cfd->IsDropped() || !cfd->initialized()) {
5455       continue;
5456     }
5457     for (int level = 0; level < cfd->NumberLevels(); level++) {
5458       for (const auto& file :
5459            cfd->current()->storage_info()->LevelFiles(level)) {
5460         LiveFileMetaData filemetadata;
5461         filemetadata.column_family_name = cfd->GetName();
5462         uint32_t path_id = file->fd.GetPathId();
5463         if (path_id < cfd->ioptions()->cf_paths.size()) {
5464           filemetadata.db_path = cfd->ioptions()->cf_paths[path_id].path;
5465         } else {
5466           assert(!cfd->ioptions()->cf_paths.empty());
5467           filemetadata.db_path = cfd->ioptions()->cf_paths.back().path;
5468         }
5469         const uint64_t file_number = file->fd.GetNumber();
5470         filemetadata.name = MakeTableFileName("", file_number);
5471         filemetadata.file_number = file_number;
5472         filemetadata.level = level;
5473         filemetadata.size = static_cast<size_t>(file->fd.GetFileSize());
5474         filemetadata.smallestkey = file->smallest.user_key().ToString();
5475         filemetadata.largestkey = file->largest.user_key().ToString();
5476         filemetadata.smallest_seqno = file->fd.smallest_seqno;
5477         filemetadata.largest_seqno = file->fd.largest_seqno;
5478         filemetadata.num_reads_sampled = file->stats.num_reads_sampled.load(
5479             std::memory_order_relaxed);
5480         filemetadata.being_compacted = file->being_compacted;
5481         filemetadata.num_entries = file->num_entries;
5482         filemetadata.num_deletions = file->num_deletions;
5483         filemetadata.oldest_blob_file_number = file->oldest_blob_file_number;
5484         filemetadata.file_checksum = file->file_checksum;
5485         filemetadata.file_checksum_func_name = file->file_checksum_func_name;
5486         metadata->push_back(filemetadata);
5487       }
5488     }
5489   }
5490 }
5491 
GetObsoleteFiles(std::vector<ObsoleteFileInfo> * files,std::vector<std::string> * manifest_filenames,uint64_t min_pending_output)5492 void VersionSet::GetObsoleteFiles(std::vector<ObsoleteFileInfo>* files,
5493                                   std::vector<std::string>* manifest_filenames,
5494                                   uint64_t min_pending_output) {
5495   assert(manifest_filenames->empty());
5496   obsolete_manifests_.swap(*manifest_filenames);
5497   std::vector<ObsoleteFileInfo> pending_files;
5498   for (auto& f : obsolete_files_) {
5499     if (f.metadata->fd.GetNumber() < min_pending_output) {
5500       files->push_back(std::move(f));
5501     } else {
5502       pending_files.push_back(std::move(f));
5503     }
5504   }
5505   obsolete_files_.swap(pending_files);
5506 }
5507 
CreateColumnFamily(const ColumnFamilyOptions & cf_options,VersionEdit * edit)5508 ColumnFamilyData* VersionSet::CreateColumnFamily(
5509     const ColumnFamilyOptions& cf_options, VersionEdit* edit) {
5510   assert(edit->is_column_family_add_);
5511 
5512   MutableCFOptions dummy_cf_options;
5513   Version* dummy_versions =
5514       new Version(nullptr, this, file_options_, dummy_cf_options);
5515   // Ref() dummy version once so that later we can call Unref() to delete it
5516   // by avoiding calling "delete" explicitly (~Version is private)
5517   dummy_versions->Ref();
5518   auto new_cfd = column_family_set_->CreateColumnFamily(
5519       edit->column_family_name_, edit->column_family_, dummy_versions,
5520       cf_options);
5521 
5522   Version* v = new Version(new_cfd, this, file_options_,
5523                            *new_cfd->GetLatestMutableCFOptions(),
5524                            current_version_number_++);
5525 
5526   // Fill level target base information.
5527   v->storage_info()->CalculateBaseBytes(*new_cfd->ioptions(),
5528                                         *new_cfd->GetLatestMutableCFOptions());
5529   AppendVersion(new_cfd, v);
5530   // GetLatestMutableCFOptions() is safe here without mutex since the
5531   // cfd is not available to client
5532   new_cfd->CreateNewMemtable(*new_cfd->GetLatestMutableCFOptions(),
5533                              LastSequence());
5534   new_cfd->SetLogNumber(edit->log_number_);
5535   return new_cfd;
5536 }
5537 
GetNumLiveVersions(Version * dummy_versions)5538 uint64_t VersionSet::GetNumLiveVersions(Version* dummy_versions) {
5539   uint64_t count = 0;
5540   for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
5541     count++;
5542   }
5543   return count;
5544 }
5545 
GetTotalSstFilesSize(Version * dummy_versions)5546 uint64_t VersionSet::GetTotalSstFilesSize(Version* dummy_versions) {
5547   std::unordered_set<uint64_t> unique_files;
5548   uint64_t total_files_size = 0;
5549   for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
5550     VersionStorageInfo* storage_info = v->storage_info();
5551     for (int level = 0; level < storage_info->num_levels_; level++) {
5552       for (const auto& file_meta : storage_info->LevelFiles(level)) {
5553         if (unique_files.find(file_meta->fd.packed_number_and_path_id) ==
5554             unique_files.end()) {
5555           unique_files.insert(file_meta->fd.packed_number_and_path_id);
5556           total_files_size += file_meta->fd.GetFileSize();
5557         }
5558       }
5559     }
5560   }
5561   return total_files_size;
5562 }
5563 
ReactiveVersionSet(const std::string & dbname,const ImmutableDBOptions * _db_options,const FileOptions & _file_options,Cache * table_cache,WriteBufferManager * write_buffer_manager,WriteController * write_controller)5564 ReactiveVersionSet::ReactiveVersionSet(const std::string& dbname,
5565                                        const ImmutableDBOptions* _db_options,
5566                                        const FileOptions& _file_options,
5567                                        Cache* table_cache,
5568                                        WriteBufferManager* write_buffer_manager,
5569                                        WriteController* write_controller)
5570     : VersionSet(dbname, _db_options, _file_options, table_cache,
5571                  write_buffer_manager, write_controller,
5572                  /*block_cache_tracer=*/nullptr),
5573       number_of_edits_to_skip_(0) {}
5574 
~ReactiveVersionSet()5575 ReactiveVersionSet::~ReactiveVersionSet() {}
5576 
Recover(const std::vector<ColumnFamilyDescriptor> & column_families,std::unique_ptr<log::FragmentBufferedReader> * manifest_reader,std::unique_ptr<log::Reader::Reporter> * manifest_reporter,std::unique_ptr<Status> * manifest_reader_status)5577 Status ReactiveVersionSet::Recover(
5578     const std::vector<ColumnFamilyDescriptor>& column_families,
5579     std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
5580     std::unique_ptr<log::Reader::Reporter>* manifest_reporter,
5581     std::unique_ptr<Status>* manifest_reader_status) {
5582   assert(manifest_reader != nullptr);
5583   assert(manifest_reporter != nullptr);
5584   assert(manifest_reader_status != nullptr);
5585 
5586   std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
5587   for (const auto& cf : column_families) {
5588     cf_name_to_options.insert({cf.name, cf.options});
5589   }
5590 
5591   // add default column family
5592   auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
5593   if (default_cf_iter == cf_name_to_options.end()) {
5594     return Status::InvalidArgument("Default column family not specified");
5595   }
5596   VersionEdit default_cf_edit;
5597   default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
5598   default_cf_edit.SetColumnFamily(0);
5599   ColumnFamilyData* default_cfd =
5600       CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
5601   // In recovery, nobody else can access it, so it's fine to set it to be
5602   // initialized earlier.
5603   default_cfd->set_initialized();
5604   std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
5605       builders;
5606   std::unordered_map<int, std::string> column_families_not_found;
5607   builders.insert(
5608       std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
5609                             new BaseReferencedVersionBuilder(default_cfd))));
5610 
5611   manifest_reader_status->reset(new Status());
5612   manifest_reporter->reset(new LogReporter());
5613   static_cast<LogReporter*>(manifest_reporter->get())->status =
5614       manifest_reader_status->get();
5615   Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader);
5616   log::Reader* reader = manifest_reader->get();
5617 
5618   int retry = 0;
5619   VersionEdit version_edit;
5620   while (s.ok() && retry < 1) {
5621     assert(reader != nullptr);
5622     Slice record;
5623     std::string scratch;
5624     s = ReadAndRecover(reader, &read_buffer_, cf_name_to_options,
5625                        column_families_not_found, builders, &version_edit);
5626     if (s.ok()) {
5627       bool enough = version_edit.has_next_file_number_ &&
5628                     version_edit.has_log_number_ &&
5629                     version_edit.has_last_sequence_;
5630       if (enough) {
5631         for (const auto& cf : column_families) {
5632           auto cfd = column_family_set_->GetColumnFamily(cf.name);
5633           if (cfd == nullptr) {
5634             enough = false;
5635             break;
5636           }
5637         }
5638       }
5639       if (enough) {
5640         for (const auto& cf : column_families) {
5641           auto cfd = column_family_set_->GetColumnFamily(cf.name);
5642           assert(cfd != nullptr);
5643           if (!cfd->IsDropped()) {
5644             auto builder_iter = builders.find(cfd->GetID());
5645             assert(builder_iter != builders.end());
5646             auto builder = builder_iter->second->version_builder();
5647             assert(builder != nullptr);
5648             s = builder->LoadTableHandlers(
5649                 cfd->internal_stats(), db_options_->max_file_opening_threads,
5650                 false /* prefetch_index_and_filter_in_cache */,
5651                 true /* is_initial_load */,
5652                 cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
5653             if (!s.ok()) {
5654               enough = false;
5655               if (s.IsPathNotFound()) {
5656                 s = Status::OK();
5657               }
5658               break;
5659             }
5660           }
5661         }
5662       }
5663       if (enough) {
5664         break;
5665       }
5666     }
5667     ++retry;
5668   }
5669 
5670   if (s.ok()) {
5671     if (!version_edit.has_prev_log_number_) {
5672       version_edit.prev_log_number_ = 0;
5673     }
5674     column_family_set_->UpdateMaxColumnFamily(version_edit.max_column_family_);
5675 
5676     MarkMinLogNumberToKeep2PC(version_edit.min_log_number_to_keep_);
5677     MarkFileNumberUsed(version_edit.prev_log_number_);
5678     MarkFileNumberUsed(version_edit.log_number_);
5679 
5680     for (auto cfd : *column_family_set_) {
5681       assert(builders.count(cfd->GetID()) > 0);
5682       auto builder = builders[cfd->GetID()]->version_builder();
5683       if (!builder->CheckConsistencyForNumLevels()) {
5684         s = Status::InvalidArgument(
5685             "db has more levels than options.num_levels");
5686         break;
5687       }
5688     }
5689   }
5690 
5691   if (s.ok()) {
5692     for (auto cfd : *column_family_set_) {
5693       if (cfd->IsDropped()) {
5694         continue;
5695       }
5696       assert(cfd->initialized());
5697       auto builders_iter = builders.find(cfd->GetID());
5698       assert(builders_iter != builders.end());
5699       auto* builder = builders_iter->second->version_builder();
5700 
5701       Version* v = new Version(cfd, this, file_options_,
5702                                *cfd->GetLatestMutableCFOptions(),
5703                                current_version_number_++);
5704       builder->SaveTo(v->storage_info());
5705 
5706       // Install recovered version
5707       v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
5708                       !(db_options_->skip_stats_update_on_db_open));
5709       AppendVersion(cfd, v);
5710     }
5711     next_file_number_.store(version_edit.next_file_number_ + 1);
5712     last_allocated_sequence_ = version_edit.last_sequence_;
5713     last_published_sequence_ = version_edit.last_sequence_;
5714     last_sequence_ = version_edit.last_sequence_;
5715     prev_log_number_ = version_edit.prev_log_number_;
5716     for (auto cfd : *column_family_set_) {
5717       if (cfd->IsDropped()) {
5718         continue;
5719       }
5720       ROCKS_LOG_INFO(db_options_->info_log,
5721                      "Column family [%s] (ID %u), log number is %" PRIu64 "\n",
5722                      cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
5723     }
5724   }
5725   return s;
5726 }
5727 
ReadAndApply(InstrumentedMutex * mu,std::unique_ptr<log::FragmentBufferedReader> * manifest_reader,std::unordered_set<ColumnFamilyData * > * cfds_changed)5728 Status ReactiveVersionSet::ReadAndApply(
5729     InstrumentedMutex* mu,
5730     std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
5731     std::unordered_set<ColumnFamilyData*>* cfds_changed) {
5732   assert(manifest_reader != nullptr);
5733   assert(cfds_changed != nullptr);
5734   mu->AssertHeld();
5735 
5736   Status s;
5737   uint64_t applied_edits = 0;
5738   while (s.ok()) {
5739     Slice record;
5740     std::string scratch;
5741     log::Reader* reader = manifest_reader->get();
5742     std::string old_manifest_path = reader->file()->file_name();
5743     while (reader->ReadRecord(&record, &scratch)) {
5744       VersionEdit edit;
5745       s = edit.DecodeFrom(record);
5746       if (!s.ok()) {
5747         break;
5748       }
5749 
5750       // Skip the first VersionEdits of each MANIFEST generated by
5751       // VersionSet::WriteCurrentStatetoManifest.
5752       if (number_of_edits_to_skip_ > 0) {
5753         ColumnFamilyData* cfd =
5754             column_family_set_->GetColumnFamily(edit.column_family_);
5755         if (cfd != nullptr && !cfd->IsDropped()) {
5756           --number_of_edits_to_skip_;
5757         }
5758         continue;
5759       }
5760 
5761       s = read_buffer_.AddEdit(&edit);
5762       if (!s.ok()) {
5763         break;
5764       }
5765       VersionEdit temp_edit;
5766       if (edit.is_in_atomic_group_) {
5767         if (read_buffer_.IsFull()) {
5768           // Apply edits in an atomic group when we have read all edits in the
5769           // group.
5770           for (auto& e : read_buffer_.replay_buffer()) {
5771             s = ApplyOneVersionEditToBuilder(e, cfds_changed, &temp_edit);
5772             if (!s.ok()) {
5773               break;
5774             }
5775             applied_edits++;
5776           }
5777           if (!s.ok()) {
5778             break;
5779           }
5780           read_buffer_.Clear();
5781         }
5782       } else {
5783         // Apply a normal edit immediately.
5784         s = ApplyOneVersionEditToBuilder(edit, cfds_changed, &temp_edit);
5785         if (s.ok()) {
5786           applied_edits++;
5787         }
5788       }
5789     }
5790     if (!s.ok()) {
5791       // Clear the buffer if we fail to decode/apply an edit.
5792       read_buffer_.Clear();
5793     }
5794     // It's possible that:
5795     // 1) s.IsCorruption(), indicating the current MANIFEST is corrupted.
5796     // 2) we have finished reading the current MANIFEST.
5797     // 3) we have encountered an IOError reading the current MANIFEST.
5798     // We need to look for the next MANIFEST and start from there. If we cannot
5799     // find the next MANIFEST, we should exit the loop.
5800     s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
5801     reader = manifest_reader->get();
5802     if (s.ok()) {
5803       if (reader->file()->file_name() == old_manifest_path) {
5804         // Still processing the same MANIFEST, thus no need to continue this
5805         // loop since no record is available if we have reached here.
5806         break;
5807       } else {
5808         // We have switched to a new MANIFEST whose first records have been
5809         // generated by VersionSet::WriteCurrentStatetoManifest. Since the
5810         // secondary instance has already finished recovering upon start, there
5811         // is no need for the secondary to process these records. Actually, if
5812         // the secondary were to replay these records, the secondary may end up
5813         // adding the same SST files AGAIN to each column family, causing
5814         // consistency checks done by VersionBuilder to fail. Therefore, we
5815         // record the number of records to skip at the beginning of the new
5816         // MANIFEST and ignore them.
5817         number_of_edits_to_skip_ = 0;
5818         for (auto* cfd : *column_family_set_) {
5819           if (cfd->IsDropped()) {
5820             continue;
5821           }
5822           // Increase number_of_edits_to_skip by 2 because
5823           // WriteCurrentStatetoManifest() writes 2 version edits for each
5824           // column family at the beginning of the newly-generated MANIFEST.
5825           // TODO(yanqin) remove hard-coded value.
5826           if (db_options_->write_dbid_to_manifest) {
5827             number_of_edits_to_skip_ += 3;
5828           } else {
5829             number_of_edits_to_skip_ += 2;
5830           }
5831         }
5832       }
5833     }
5834   }
5835 
5836   if (s.ok()) {
5837     for (auto cfd : *column_family_set_) {
5838       auto builder_iter = active_version_builders_.find(cfd->GetID());
5839       if (builder_iter == active_version_builders_.end()) {
5840         continue;
5841       }
5842       auto builder = builder_iter->second->version_builder();
5843       if (!builder->CheckConsistencyForNumLevels()) {
5844         s = Status::InvalidArgument(
5845             "db has more levels than options.num_levels");
5846         break;
5847       }
5848     }
5849   }
5850   TEST_SYNC_POINT_CALLBACK("ReactiveVersionSet::ReadAndApply:AppliedEdits",
5851                            &applied_edits);
5852   return s;
5853 }
5854 
ApplyOneVersionEditToBuilder(VersionEdit & edit,std::unordered_set<ColumnFamilyData * > * cfds_changed,VersionEdit * version_edit)5855 Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
5856     VersionEdit& edit, std::unordered_set<ColumnFamilyData*>* cfds_changed,
5857     VersionEdit* version_edit) {
5858   ColumnFamilyData* cfd =
5859       column_family_set_->GetColumnFamily(edit.column_family_);
5860 
5861   // If we cannot find this column family in our column family set, then it
5862   // may be a new column family created by the primary after the secondary
5863   // starts. It is also possible that the secondary instance opens only a subset
5864   // of column families. Ignore it for now.
5865   if (nullptr == cfd) {
5866     return Status::OK();
5867   }
5868   if (active_version_builders_.find(edit.column_family_) ==
5869           active_version_builders_.end() &&
5870       !cfd->IsDropped()) {
5871     std::unique_ptr<BaseReferencedVersionBuilder> builder_guard(
5872         new BaseReferencedVersionBuilder(cfd));
5873     active_version_builders_.insert(
5874         std::make_pair(edit.column_family_, std::move(builder_guard)));
5875   }
5876 
5877   auto builder_iter = active_version_builders_.find(edit.column_family_);
5878   assert(builder_iter != active_version_builders_.end());
5879   auto builder = builder_iter->second->version_builder();
5880   assert(builder != nullptr);
5881 
5882   if (edit.is_column_family_add_) {
5883     // TODO (yanqin) for now the secondary ignores column families created
5884     // after Open. This also simplifies handling of switching to a new MANIFEST
5885     // and processing the snapshot of the system at the beginning of the
5886     // MANIFEST.
5887   } else if (edit.is_column_family_drop_) {
5888     // Drop the column family by setting it to be 'dropped' without destroying
5889     // the column family handle.
5890     // TODO (haoyu) figure out how to handle column faimly drop for
5891     // secondary instance. (Is it possible that the ref count for cfd is 0 but
5892     // the ref count for its versions is higher than 0?)
5893     cfd->SetDropped();
5894     if (cfd->UnrefAndTryDelete()) {
5895       cfd = nullptr;
5896     }
5897     active_version_builders_.erase(builder_iter);
5898   } else {
5899     Status s = builder->Apply(&edit);
5900     if (!s.ok()) {
5901       return s;
5902     }
5903   }
5904   Status s = ExtractInfoFromVersionEdit(cfd, edit, version_edit);
5905   if (!s.ok()) {
5906     return s;
5907   }
5908 
5909   if (cfd != nullptr && !cfd->IsDropped()) {
5910     s = builder->LoadTableHandlers(
5911         cfd->internal_stats(), db_options_->max_file_opening_threads,
5912         false /* prefetch_index_and_filter_in_cache */,
5913         false /* is_initial_load */,
5914         cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
5915     TEST_SYNC_POINT_CALLBACK(
5916         "ReactiveVersionSet::ApplyOneVersionEditToBuilder:"
5917         "AfterLoadTableHandlers",
5918         &s);
5919 
5920     if (s.ok()) {
5921       auto version = new Version(cfd, this, file_options_,
5922                                  *cfd->GetLatestMutableCFOptions(),
5923                                  current_version_number_++);
5924       builder->SaveTo(version->storage_info());
5925       version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
5926       AppendVersion(cfd, version);
5927       active_version_builders_.erase(builder_iter);
5928       if (cfds_changed->count(cfd) == 0) {
5929         cfds_changed->insert(cfd);
5930       }
5931     } else if (s.IsPathNotFound()) {
5932       s = Status::OK();
5933     }
5934     // Some other error has occurred during LoadTableHandlers.
5935   }
5936 
5937   if (version_edit->HasNextFile()) {
5938     next_file_number_.store(version_edit->next_file_number_ + 1);
5939   }
5940   if (version_edit->has_last_sequence_) {
5941     last_allocated_sequence_ = version_edit->last_sequence_;
5942     last_published_sequence_ = version_edit->last_sequence_;
5943     last_sequence_ = version_edit->last_sequence_;
5944   }
5945   if (version_edit->has_prev_log_number_) {
5946     prev_log_number_ = version_edit->prev_log_number_;
5947     MarkFileNumberUsed(version_edit->prev_log_number_);
5948   }
5949   if (version_edit->has_log_number_) {
5950     MarkFileNumberUsed(version_edit->log_number_);
5951   }
5952   column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_);
5953   MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_);
5954   return s;
5955 }
5956 
MaybeSwitchManifest(log::Reader::Reporter * reporter,std::unique_ptr<log::FragmentBufferedReader> * manifest_reader)5957 Status ReactiveVersionSet::MaybeSwitchManifest(
5958     log::Reader::Reporter* reporter,
5959     std::unique_ptr<log::FragmentBufferedReader>* manifest_reader) {
5960   assert(manifest_reader != nullptr);
5961   Status s;
5962   do {
5963     std::string manifest_path;
5964     s = GetCurrentManifestPath(dbname_, fs_, &manifest_path,
5965                                &manifest_file_number_);
5966     std::unique_ptr<FSSequentialFile> manifest_file;
5967     if (s.ok()) {
5968       if (nullptr == manifest_reader->get() ||
5969           manifest_reader->get()->file()->file_name() != manifest_path) {
5970         TEST_SYNC_POINT(
5971             "ReactiveVersionSet::MaybeSwitchManifest:"
5972             "AfterGetCurrentManifestPath:0");
5973         TEST_SYNC_POINT(
5974             "ReactiveVersionSet::MaybeSwitchManifest:"
5975             "AfterGetCurrentManifestPath:1");
5976         s = fs_->NewSequentialFile(manifest_path,
5977                                    env_->OptimizeForManifestRead(file_options_),
5978                                    &manifest_file, nullptr);
5979       } else {
5980         // No need to switch manifest.
5981         break;
5982       }
5983     }
5984     std::unique_ptr<SequentialFileReader> manifest_file_reader;
5985     if (s.ok()) {
5986       manifest_file_reader.reset(
5987           new SequentialFileReader(std::move(manifest_file), manifest_path,
5988                                    db_options_->log_readahead_size));
5989       manifest_reader->reset(new log::FragmentBufferedReader(
5990           nullptr, std::move(manifest_file_reader), reporter,
5991           true /* checksum */, 0 /* log_number */));
5992       ROCKS_LOG_INFO(db_options_->info_log, "Switched to new manifest: %s\n",
5993                      manifest_path.c_str());
5994       // TODO (yanqin) every time we switch to a new MANIFEST, we clear the
5995       // active_version_builders_ map because we choose to construct the
5996       // versions from scratch, thanks to the first part of each MANIFEST
5997       // written by VersionSet::WriteCurrentStatetoManifest. This is not
5998       // necessary, but we choose this at present for the sake of simplicity.
5999       active_version_builders_.clear();
6000     }
6001   } while (s.IsPathNotFound());
6002   return s;
6003 }
6004 
6005 }  // namespace ROCKSDB_NAMESPACE
6006