//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/version_set.h"

#include <algorithm>
#include <array>
#include <cinttypes>
#include <cstdio>
#include <list>
#include <map>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include "compaction/compaction.h"
#include "db/blob/blob_fetcher.h"
#include "db/blob/blob_file_cache.h"
#include "db/blob/blob_file_reader.h"
#include "db/blob/blob_index.h"
#include "db/blob/blob_log_format.h"
#include "db/internal_stats.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/table_cache.h"
#include "db/version_builder.h"
#include "db/version_edit_handler.h"
#include "file/filename.h"
#include "file/random_access_file_reader.h"
#include "file/read_write_util.h"
#include "file/writable_file_writer.h"
#include "logging/logging.h"
#include "monitoring/file_read_sample.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/persistent_stats_history.h"
#include "options/options_helper.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/format.h"
#include "table/get_context.h"
#include "table/internal_iterator.h"
#include "table/merging_iterator.h"
#include "table/meta_blocks.h"
#include "table/multiget_context.h"
#include "table/plain/plain_table_factory.h"
#include "table/table_reader.h"
#include "table/two_level_iterator.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
#include "util/user_comparator_wrapper.h"

namespace ROCKSDB_NAMESPACE {

namespace {

// Find File in LevelFilesBrief data structure
// Within an index range defined by left and right
int FindFileInRange(const InternalKeyComparator& icmp,
                    const LevelFilesBrief& file_level, const Slice& key,
                    uint32_t left, uint32_t right) {
  auto cmp = [&](const FdWithKeyRange& f, const Slice& k) -> bool {
    return icmp.InternalKeyComparator::Compare(f.largest_key, k) < 0;
  };
  const auto& b = file_level.files;
  return static_cast<int>(std::lower_bound(b + left, b + right, key, cmp) - b);
}
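
// Added commentary (not in the original source): FindFileInRange() is a
// std::lower_bound over largest keys. For illustration, if a level's files
// have largest internal keys equivalent to ["b", "d", "f"], then searching
// the range [0, 3) for key "c" returns index 1 -- the first file whose
// largest key is >= "c" and thus the only file that could contain "c".
// A hypothetical call over a whole level:
//
//   int idx = FindFileInRange(icmp, level_files, ikey, 0,
//                             static_cast<uint32_t>(level_files.num_files));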

Status OverlapWithIterator(const Comparator* ucmp,
                           const Slice& smallest_user_key,
                           const Slice& largest_user_key,
                           InternalIterator* iter, bool* overlap) {
  InternalKey range_start(smallest_user_key, kMaxSequenceNumber,
                          kValueTypeForSeek);
  iter->Seek(range_start.Encode());
  if (!iter->status().ok()) {
    return iter->status();
  }

  *overlap = false;
  if (iter->Valid()) {
    ParsedInternalKey seek_result;
    Status s = ParseInternalKey(iter->key(), &seek_result,
                                false /* log_err_key */);  // TODO
    if (!s.ok()) return s;

    if (ucmp->CompareWithoutTimestamp(seek_result.user_key, largest_user_key) <=
        0) {
      *overlap = true;
    }
  }

  return iter->status();
}
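
// Added commentary (not in the original source): a caller, e.g.
// Version::OverlapWithLevelIterator() elsewhere in this file, builds an
// iterator over one level and asks whether any key in the user-key range
// exists there:
//
//   bool overlap = false;
//   Status s = OverlapWithIterator(ucmp, smallest_user_key, largest_user_key,
//                                  iter, &overlap);
//   if (s.ok() && overlap) {
//     // the range is not free of existing entries
//   }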

// Class to help choose the next file to search for the particular key.
// Searches and returns files level by level.
// We can search level-by-level since entries never hop across
// levels. Therefore we are guaranteed that if we find data
// in a smaller level, later levels are irrelevant (unless we
// are MergeInProgress).
class FilePicker {
 public:
  FilePicker(std::vector<FileMetaData*>* files, const Slice& user_key,
             const Slice& ikey, autovector<LevelFilesBrief>* file_levels,
             unsigned int num_levels, FileIndexer* file_indexer,
             const Comparator* user_comparator,
             const InternalKeyComparator* internal_comparator)
      : num_levels_(num_levels),
        curr_level_(static_cast<unsigned int>(-1)),
        returned_file_level_(static_cast<unsigned int>(-1)),
        hit_file_level_(static_cast<unsigned int>(-1)),
        search_left_bound_(0),
        search_right_bound_(FileIndexer::kLevelMaxIndex),
#ifndef NDEBUG
        files_(files),
#endif
        level_files_brief_(file_levels),
        is_hit_file_last_in_level_(false),
        curr_file_level_(nullptr),
        user_key_(user_key),
        ikey_(ikey),
        file_indexer_(file_indexer),
        user_comparator_(user_comparator),
        internal_comparator_(internal_comparator) {
#ifdef NDEBUG
    (void)files;
#endif
    // Setup member variables to search first level.
    search_ended_ = !PrepareNextLevel();
    if (!search_ended_) {
      // Prefetch Level 0 table data to avoid cache miss if possible.
      for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
        auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
        if (r) {
          r->Prepare(ikey);
        }
      }
    }
  }

  int GetCurrentLevel() const { return curr_level_; }

  FdWithKeyRange* GetNextFile() {
    while (!search_ended_) {  // Loops over different levels.
      while (curr_index_in_curr_level_ < curr_file_level_->num_files) {
        // Loops over all files in current level.
        FdWithKeyRange* f = &curr_file_level_->files[curr_index_in_curr_level_];
        hit_file_level_ = curr_level_;
        is_hit_file_last_in_level_ =
            curr_index_in_curr_level_ == curr_file_level_->num_files - 1;
        int cmp_largest = -1;

        // Do key range filtering of files and/or fractional cascading if:
        // (1) not all the files are in level 0, or
        // (2) there are more than 3 files in the current level.
        // With only 3 or fewer files in the current level, we skip the key
        // range filtering: in that case the system is likely highly tuned
        // to minimize the number of tables queried by each query, so it is
        // unlikely that key range filtering is more efficient than simply
        // querying the files.
        if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
          // Check if key is within a file's range. If the search left bound
          // and right bound point to the same file, we are sure the key
          // falls in range.
          assert(curr_level_ == 0 ||
                 curr_index_in_curr_level_ == start_index_in_curr_level_ ||
                 user_comparator_->CompareWithoutTimestamp(
                     user_key_, ExtractUserKey(f->smallest_key)) <= 0);

          int cmp_smallest = user_comparator_->CompareWithoutTimestamp(
              user_key_, ExtractUserKey(f->smallest_key));
          if (cmp_smallest >= 0) {
            cmp_largest = user_comparator_->CompareWithoutTimestamp(
                user_key_, ExtractUserKey(f->largest_key));
          }

          // Setup file search bound for the next level based on the
          // comparison results
          if (curr_level_ > 0) {
            file_indexer_->GetNextLevelIndex(
                curr_level_, curr_index_in_curr_level_, cmp_smallest,
                cmp_largest, &search_left_bound_, &search_right_bound_);
          }
          // Key falls out of current file's range
          if (cmp_smallest < 0 || cmp_largest > 0) {
            if (curr_level_ == 0) {
              ++curr_index_in_curr_level_;
              continue;
            } else {
              // Search next level.
              break;
            }
          }
        }
#ifndef NDEBUG
        // Sanity check to make sure that the files are correctly sorted
        if (prev_file_) {
          if (curr_level_ != 0) {
            int comp_sign = internal_comparator_->Compare(
                prev_file_->largest_key, f->smallest_key);
            assert(comp_sign < 0);
          } else {
            // level == 0, the current file cannot be newer than the previous
            // one. The compressed LevelFilesBrief structure carries no
            // sequence number, so check against files_ instead.
            assert(curr_index_in_curr_level_ > 0);
            assert(!NewestFirstBySeqNo(
                files_[0][curr_index_in_curr_level_],
                files_[0][curr_index_in_curr_level_ - 1]));
          }
        }
        prev_file_ = f;
#endif
        returned_file_level_ = curr_level_;
        if (curr_level_ > 0 && cmp_largest < 0) {
          // No more files to search in this level.
          search_ended_ = !PrepareNextLevel();
        } else {
          ++curr_index_in_curr_level_;
        }
        return f;
      }
      // Start searching next level.
      search_ended_ = !PrepareNextLevel();
    }
    // Search ended.
    return nullptr;
  }

  // getter for current file level
  // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
  unsigned int GetHitFileLevel() { return hit_file_level_; }

  // Returns true if the most recent "hit file" (i.e., one returned by
  // GetNextFile()) is at the last index in its level.
  bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }

 private:
  unsigned int num_levels_;
  unsigned int curr_level_;
  unsigned int returned_file_level_;
  unsigned int hit_file_level_;
  int32_t search_left_bound_;
  int32_t search_right_bound_;
#ifndef NDEBUG
  std::vector<FileMetaData*>* files_;
#endif
  autovector<LevelFilesBrief>* level_files_brief_;
  bool search_ended_;
  bool is_hit_file_last_in_level_;
  LevelFilesBrief* curr_file_level_;
  unsigned int curr_index_in_curr_level_;
  unsigned int start_index_in_curr_level_;
  Slice user_key_;
  Slice ikey_;
  FileIndexer* file_indexer_;
  const Comparator* user_comparator_;
  const InternalKeyComparator* internal_comparator_;
#ifndef NDEBUG
  FdWithKeyRange* prev_file_;
#endif

  // Setup local variables to search next level.
  // Returns false if there are no more levels to search.
  bool PrepareNextLevel() {
    curr_level_++;
    while (curr_level_ < num_levels_) {
      curr_file_level_ = &(*level_files_brief_)[curr_level_];
      if (curr_file_level_->num_files == 0) {
        // When current level is empty, the search bound generated from upper
        // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
        // also empty.
        assert(search_left_bound_ == 0);
        assert(search_right_bound_ == -1 ||
               search_right_bound_ == FileIndexer::kLevelMaxIndex);
        // Since current level is empty, it will need to search all files in
        // the next level
        search_left_bound_ = 0;
        search_right_bound_ = FileIndexer::kLevelMaxIndex;
        curr_level_++;
        continue;
      }

      // Some files may overlap each other. We find
      // all files that overlap user_key and process them in order from
      // newest to oldest. In the context of merge-operator, this can occur at
      // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
      // are always compacted into a single entry).
      int32_t start_index;
      if (curr_level_ == 0) {
        // On Level-0, we read through all files to check for overlap.
        start_index = 0;
      } else {
        // On Level-n (n>=1), files are sorted. Binary search to find the
        // earliest file whose largest key >= ikey. Search left bound and
        // right bound are used to narrow the range.
        if (search_left_bound_ <= search_right_bound_) {
          if (search_right_bound_ == FileIndexer::kLevelMaxIndex) {
            search_right_bound_ =
                static_cast<int32_t>(curr_file_level_->num_files) - 1;
          }
          // `search_right_bound_` is an inclusive upper-bound, but since it was
          // determined based on user key, it is still possible the lookup key
          // falls to the right of `search_right_bound_`'s corresponding file.
          // So, pass a limit one higher, which allows us to detect this case.
          start_index =
              FindFileInRange(*internal_comparator_, *curr_file_level_, ikey_,
                              static_cast<uint32_t>(search_left_bound_),
                              static_cast<uint32_t>(search_right_bound_) + 1);
          if (start_index == search_right_bound_ + 1) {
            // `ikey_` comes after `search_right_bound_`. The lookup key does
            // not exist on this level, so let's skip this level and do a full
            // binary search on the next level.
            search_left_bound_ = 0;
            search_right_bound_ = FileIndexer::kLevelMaxIndex;
            curr_level_++;
            continue;
          }
        } else {
          // search_left_bound > search_right_bound, key does not exist in
          // this level. Since no comparison is done in this level, it will
          // need to search all files in the next level.
          search_left_bound_ = 0;
          search_right_bound_ = FileIndexer::kLevelMaxIndex;
          curr_level_++;
          continue;
        }
      }
      start_index_in_curr_level_ = start_index;
      curr_index_in_curr_level_ = start_index;
#ifndef NDEBUG
      prev_file_ = nullptr;
#endif
      return true;
    }
    // curr_level_ = num_levels_. So, no more levels to search.
    return false;
  }
};
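
// Added commentary (schematic; the real driver is Version::Get() later in
// this file): the caller pulls candidate files one at a time and probes each
// until the key is resolved.
//
//   FilePicker fp(files, user_key, ikey, &file_levels, num_non_empty_levels,
//                 &file_indexer, ucmp, icmp);
//   for (FdWithKeyRange* f = fp.GetNextFile(); f != nullptr;
//        f = fp.GetNextFile()) {
//     // probe *f via the table cache; stop once a final value or a
//     // deletion for user_key is found
//   }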

class FilePickerMultiGet {
 private:
  struct FilePickerContext;

 public:
  FilePickerMultiGet(MultiGetRange* range,
                     autovector<LevelFilesBrief>* file_levels,
                     unsigned int num_levels, FileIndexer* file_indexer,
                     const Comparator* user_comparator,
                     const InternalKeyComparator* internal_comparator)
      : num_levels_(num_levels),
        curr_level_(static_cast<unsigned int>(-1)),
        returned_file_level_(static_cast<unsigned int>(-1)),
        hit_file_level_(static_cast<unsigned int>(-1)),
        range_(range),
        batch_iter_(range->begin()),
        batch_iter_prev_(range->begin()),
        upper_key_(range->begin()),
        maybe_repeat_key_(false),
        current_level_range_(*range, range->begin(), range->end()),
        current_file_range_(*range, range->begin(), range->end()),
        level_files_brief_(file_levels),
        is_hit_file_last_in_level_(false),
        curr_file_level_(nullptr),
        file_indexer_(file_indexer),
        user_comparator_(user_comparator),
        internal_comparator_(internal_comparator) {
    for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
      fp_ctx_array_[iter.index()] =
          FilePickerContext(0, FileIndexer::kLevelMaxIndex);
    }

    // Setup member variables to search first level.
    search_ended_ = !PrepareNextLevel();
    if (!search_ended_) {
      // REVISIT
      // Prefetch Level 0 table data to avoid cache miss if possible.
      // As of now, only PlainTableReader and CuckooTableReader do any
      // prefetching. This may not be necessary anymore once we implement
      // batching in those table readers
      for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
        auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
        if (r) {
          for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
            r->Prepare(iter->ikey);
          }
        }
      }
    }
  }

  int GetCurrentLevel() const { return curr_level_; }

  // Iterates through files in the current level until it finds a file that
  // contains at least one key from the MultiGet batch
  bool GetNextFileInLevelWithKeys(MultiGetRange* next_file_range,
                                  size_t* file_index, FdWithKeyRange** fd,
                                  bool* is_last_key_in_file) {
    size_t curr_file_index = *file_index;
    FdWithKeyRange* f = nullptr;
    bool file_hit = false;
    int cmp_largest = -1;
    if (curr_file_index >= curr_file_level_->num_files) {
      // In the unlikely case the next key is a duplicate of the current key,
      // and the current key is the last in the level and the internal key
      // was not found, we need to skip lookup for the remaining keys and
      // reset the search bounds
      if (batch_iter_ != current_level_range_.end()) {
        ++batch_iter_;
        for (; batch_iter_ != current_level_range_.end(); ++batch_iter_) {
          struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
          fp_ctx.search_left_bound = 0;
          fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
        }
      }
      return false;
    }
    // Loops over keys in the MultiGet batch until it finds a file with
    // at least one of the keys. Then it keeps moving forward until the
    // last key in the batch that falls in that file
    while (batch_iter_ != current_level_range_.end() &&
           (fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level ==
                curr_file_index ||
            !file_hit)) {
      struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
      f = &curr_file_level_->files[fp_ctx.curr_index_in_curr_level];
      Slice& user_key = batch_iter_->ukey_without_ts;

      // Do key range filtering of files and/or fractional cascading if:
      // (1) not all the files are in level 0, or
      // (2) there are more than 3 files in the current level.
      // With only 3 or fewer files in the current level, we skip the key
      // range filtering: in that case the system is likely highly tuned
      // to minimize the number of tables queried by each query, so it is
      // unlikely that key range filtering is more efficient than simply
      // querying the files.
      if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
        // Check if key is within a file's range. If the search left bound
        // and right bound point to the same file, we are sure the key
        // falls in range.
        int cmp_smallest = user_comparator_->CompareWithoutTimestamp(
            user_key, false, ExtractUserKey(f->smallest_key), true);

        assert(curr_level_ == 0 ||
               fp_ctx.curr_index_in_curr_level ==
                   fp_ctx.start_index_in_curr_level ||
               cmp_smallest <= 0);

        if (cmp_smallest >= 0) {
          cmp_largest = user_comparator_->CompareWithoutTimestamp(
              user_key, false, ExtractUserKey(f->largest_key), true);
        } else {
          cmp_largest = -1;
        }

        // Setup file search bound for the next level based on the
        // comparison results
        if (curr_level_ > 0) {
          file_indexer_->GetNextLevelIndex(
              curr_level_, fp_ctx.curr_index_in_curr_level, cmp_smallest,
              cmp_largest, &fp_ctx.search_left_bound,
              &fp_ctx.search_right_bound);
        }
        // Key falls out of current file's range
        if (cmp_smallest < 0 || cmp_largest > 0) {
          next_file_range->SkipKey(batch_iter_);
        } else {
          file_hit = true;
        }
      } else {
        file_hit = true;
      }
      if (cmp_largest == 0) {
        // cmp_largest is 0, which means the next key will not be in this
        // file, so stop looking further. However, it's possible there are
        // duplicates in the batch, so find the upper bound for the batch
        // in this file (upper_key_) by skipping past the duplicates. We
        // leave batch_iter_ as is since we may have to pick up from there
        // for the next file, if this file has a merge value rather than
        // final value
        upper_key_ = batch_iter_;
        ++upper_key_;
        while (upper_key_ != current_level_range_.end() &&
               user_comparator_->CompareWithoutTimestamp(
                   batch_iter_->ukey_without_ts, false,
                   upper_key_->ukey_without_ts, false) == 0) {
          ++upper_key_;
        }
        break;
      } else {
        if (curr_level_ == 0) {
          // We need to look through all files in level 0
          ++fp_ctx.curr_index_in_curr_level;
        }
        ++batch_iter_;
      }
      if (!file_hit) {
        curr_file_index =
            (batch_iter_ != current_level_range_.end())
                ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
                : curr_file_level_->num_files;
      }
    }

    *fd = f;
    *file_index = curr_file_index;
    *is_last_key_in_file = cmp_largest == 0;
    if (!*is_last_key_in_file) {
      // If the largest key in the batch overlapping the file is not the
      // largest key in the file, upper_key_ would not have been updated, so
      // update it here
      upper_key_ = batch_iter_;
    }
    return file_hit;
  }
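
  // Added commentary (worked example, not in the original): with batch user
  // keys ["k1", "k1", "k2"] and a file whose largest key is "k1",
  // cmp_largest == 0 for the first "k1", so upper_key_ is advanced past the
  // duplicate "k1" while batch_iter_ stays put. If that file only held a
  // merge operand for "k1", the same keys get re-examined in the next file.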

  FdWithKeyRange* GetNextFile() {
    while (!search_ended_) {
      // Start searching next level.
      if (batch_iter_ == current_level_range_.end()) {
        search_ended_ = !PrepareNextLevel();
        continue;
      } else {
        if (maybe_repeat_key_) {
          maybe_repeat_key_ = false;
          // Check if we found the final value for the last key in the
          // previous lookup range. If we did, then there's no need to look
          // any further for that key, so advance batch_iter_. Else, keep
          // batch_iter_ positioned on that key so we look it up again in
          // the next file
          // For L0, always advance the key because we will look in the next
          // file regardless for all keys not found yet
          if (current_level_range_.CheckKeyDone(batch_iter_) ||
              curr_level_ == 0) {
            batch_iter_ = upper_key_;
          }
        }
        // batch_iter_prev_ will become the start key for the next file
        // lookup
        batch_iter_prev_ = batch_iter_;
      }

      MultiGetRange next_file_range(current_level_range_, batch_iter_prev_,
                                    current_level_range_.end());
      size_t curr_file_index =
          (batch_iter_ != current_level_range_.end())
              ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
              : curr_file_level_->num_files;
      FdWithKeyRange* f;
      bool is_last_key_in_file;
      if (!GetNextFileInLevelWithKeys(&next_file_range, &curr_file_index, &f,
                                      &is_last_key_in_file)) {
        search_ended_ = !PrepareNextLevel();
      } else {
        if (is_last_key_in_file) {
          // Since cmp_largest is 0, batch_iter_ still points to the last key
          // that falls in this file, instead of the next one. Increment
          // the file index for all keys between batch_iter_ and upper_key_
          auto tmp_iter = batch_iter_;
          while (tmp_iter != upper_key_) {
            ++(fp_ctx_array_[tmp_iter.index()].curr_index_in_curr_level);
            ++tmp_iter;
          }
          maybe_repeat_key_ = true;
        }
        // Set the range for this file
        current_file_range_ =
            MultiGetRange(next_file_range, batch_iter_prev_, upper_key_);
        returned_file_level_ = curr_level_;
        hit_file_level_ = curr_level_;
        is_hit_file_last_in_level_ =
            curr_file_index == curr_file_level_->num_files - 1;
        return f;
      }
    }

    // Search ended
    return nullptr;
  }

  // getter for current file level
  // for GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counts
  unsigned int GetHitFileLevel() { return hit_file_level_; }

  // Returns true if the most recent "hit file" (i.e., one returned by
  // GetNextFile()) is at the last index in its level.
  bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }

  const MultiGetRange& CurrentFileRange() { return current_file_range_; }

 private:
  unsigned int num_levels_;
  unsigned int curr_level_;
  unsigned int returned_file_level_;
  unsigned int hit_file_level_;

  struct FilePickerContext {
    int32_t search_left_bound;
    int32_t search_right_bound;
    unsigned int curr_index_in_curr_level;
    unsigned int start_index_in_curr_level;

    FilePickerContext(int32_t left, int32_t right)
        : search_left_bound(left),
          search_right_bound(right),
          curr_index_in_curr_level(0),
          start_index_in_curr_level(0) {}

    FilePickerContext() = default;
  };
  std::array<FilePickerContext, MultiGetContext::MAX_BATCH_SIZE> fp_ctx_array_;
  MultiGetRange* range_;
  // Iterator to iterate through the keys in a MultiGet batch, that gets reset
  // at the beginning of each level. Each call to GetNextFile() will position
  // batch_iter_ at or right after the last key that was found in the returned
  // SST file
  MultiGetRange::Iterator batch_iter_;
  // An iterator that records the previous position of batch_iter_, i.e. the
  // last key found in the previous SST file, in order to serve as the start
  // of the batch key range for the next SST file
  MultiGetRange::Iterator batch_iter_prev_;
  MultiGetRange::Iterator upper_key_;
  bool maybe_repeat_key_;
  MultiGetRange current_level_range_;
  MultiGetRange current_file_range_;
  autovector<LevelFilesBrief>* level_files_brief_;
  bool search_ended_;
  bool is_hit_file_last_in_level_;
  LevelFilesBrief* curr_file_level_;
  FileIndexer* file_indexer_;
  const Comparator* user_comparator_;
  const InternalKeyComparator* internal_comparator_;

  // Setup local variables to search next level.
  // Returns false if there are no more levels to search.
  bool PrepareNextLevel() {
    if (curr_level_ == 0) {
      MultiGetRange::Iterator mget_iter = current_level_range_.begin();
      if (fp_ctx_array_[mget_iter.index()].curr_index_in_curr_level <
          curr_file_level_->num_files) {
        batch_iter_prev_ = current_level_range_.begin();
        upper_key_ = batch_iter_ = current_level_range_.begin();
        return true;
      }
    }

    curr_level_++;
    // Reset key range to saved value
    while (curr_level_ < num_levels_) {
      bool level_contains_keys = false;
      curr_file_level_ = &(*level_files_brief_)[curr_level_];
      if (curr_file_level_->num_files == 0) {
        // When current level is empty, the search bound generated from upper
        // level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex] if it is
        // also empty.

        for (auto mget_iter = current_level_range_.begin();
             mget_iter != current_level_range_.end(); ++mget_iter) {
          struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];

          assert(fp_ctx.search_left_bound == 0);
          assert(fp_ctx.search_right_bound == -1 ||
                 fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex);
          // Since current level is empty, it will need to search all files in
          // the next level
          fp_ctx.search_left_bound = 0;
          fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
        }
        // Skip all subsequent empty levels
        do {
          ++curr_level_;
        } while ((curr_level_ < num_levels_) &&
                 (*level_files_brief_)[curr_level_].num_files == 0);
        continue;
      }

      // Some files may overlap each other. We find
      // all files that overlap user_key and process them in order from
      // newest to oldest. In the context of merge-operator, this can occur at
      // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
      // are always compacted into a single entry).
      int32_t start_index = -1;
      current_level_range_ =
          MultiGetRange(*range_, range_->begin(), range_->end());
      for (auto mget_iter = current_level_range_.begin();
           mget_iter != current_level_range_.end(); ++mget_iter) {
        struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];
        if (curr_level_ == 0) {
          // On Level-0, we read through all files to check for overlap.
          start_index = 0;
          level_contains_keys = true;
        } else {
          // On Level-n (n>=1), files are sorted. Binary search to find the
          // earliest file whose largest key >= ikey. Search left bound and
          // right bound are used to narrow the range.
          if (fp_ctx.search_left_bound <= fp_ctx.search_right_bound) {
            if (fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex) {
              fp_ctx.search_right_bound =
                  static_cast<int32_t>(curr_file_level_->num_files) - 1;
            }
            // `search_right_bound` is an inclusive upper-bound, but since it
            // was determined based on user key, it is still possible the lookup
            // key falls to the right of `search_right_bound`'s corresponding
            // file. So, pass a limit one higher, which allows us to detect this
            // case.
            Slice& ikey = mget_iter->ikey;
            start_index = FindFileInRange(
                *internal_comparator_, *curr_file_level_, ikey,
                static_cast<uint32_t>(fp_ctx.search_left_bound),
                static_cast<uint32_t>(fp_ctx.search_right_bound) + 1);
            if (start_index == fp_ctx.search_right_bound + 1) {
              // `ikey` comes after `search_right_bound`. The lookup key does
              // not exist on this level, so let's skip this level and do a full
              // binary search on the next level.
              fp_ctx.search_left_bound = 0;
              fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
              current_level_range_.SkipKey(mget_iter);
              continue;
            } else {
              level_contains_keys = true;
            }
          } else {
            // search_left_bound > search_right_bound, key does not exist in
            // this level. Since no comparison is done in this level, it will
            // need to search all files in the next level.
            fp_ctx.search_left_bound = 0;
            fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
            current_level_range_.SkipKey(mget_iter);
            continue;
          }
        }
        fp_ctx.start_index_in_curr_level = start_index;
        fp_ctx.curr_index_in_curr_level = start_index;
      }
      if (level_contains_keys) {
        batch_iter_prev_ = current_level_range_.begin();
        upper_key_ = batch_iter_ = current_level_range_.begin();
        return true;
      }
      curr_level_++;
    }
    // curr_level_ = num_levels_. So, no more levels to search.
    return false;
  }
};
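
// Added commentary (schematic; the real driver is Version::MultiGet() later
// in this file): one picker walks the whole batch level by level, handing
// out each file together with the subrange of batch keys that may be in it.
//
//   FilePickerMultiGet fp(&range, &file_levels, num_levels, &file_indexer,
//                         ucmp, icmp);
//   for (FdWithKeyRange* f = fp.GetNextFile(); f != nullptr;
//        f = fp.GetNextFile()) {
//     const MultiGetRange& file_range = fp.CurrentFileRange();
//     // probe *f for every key in file_range via the table cache
//   }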
}  // anonymous namespace

VersionStorageInfo::~VersionStorageInfo() { delete[] files_; }

Version::~Version() {
  assert(refs_ == 0);

  // Remove from linked list
  prev_->next_ = next_;
  next_->prev_ = prev_;

  // Drop references to files
  for (int level = 0; level < storage_info_.num_levels_; level++) {
    for (size_t i = 0; i < storage_info_.files_[level].size(); i++) {
      FileMetaData* f = storage_info_.files_[level][i];
      assert(f->refs > 0);
      f->refs--;
      if (f->refs <= 0) {
        assert(cfd_ != nullptr);
        uint32_t path_id = f->fd.GetPathId();
        assert(path_id < cfd_->ioptions()->cf_paths.size());
        vset_->obsolete_files_.push_back(
            ObsoleteFileInfo(f, cfd_->ioptions()->cf_paths[path_id].path));
      }
    }
  }
}

int FindFile(const InternalKeyComparator& icmp,
             const LevelFilesBrief& file_level, const Slice& key) {
  return FindFileInRange(icmp, file_level, key, 0,
                         static_cast<uint32_t>(file_level.num_files));
}

void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level,
                               const std::vector<FileMetaData*>& files,
                               Arena* arena) {
  assert(file_level);
  assert(arena);

  size_t num = files.size();
  file_level->num_files = num;
  char* mem = arena->AllocateAligned(num * sizeof(FdWithKeyRange));
  file_level->files = new (mem) FdWithKeyRange[num];

  for (size_t i = 0; i < num; i++) {
    Slice smallest_key = files[i]->smallest.Encode();
    Slice largest_key = files[i]->largest.Encode();

    // Copy key slice to sequential memory
    size_t smallest_size = smallest_key.size();
    size_t largest_size = largest_key.size();
    mem = arena->AllocateAligned(smallest_size + largest_size);
    memcpy(mem, smallest_key.data(), smallest_size);
    memcpy(mem + smallest_size, largest_key.data(), largest_size);

    FdWithKeyRange& f = file_level->files[i];
    f.fd = files[i]->fd;
    f.file_metadata = files[i];
    f.smallest_key = Slice(mem, smallest_size);
    f.largest_key = Slice(mem + smallest_size, largest_size);
  }
}
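
// Added commentary (not in the original): each file's boundary keys are
// copied into one contiguous arena allocation, so the Slices above remain
// valid for the arena's lifetime without further copies. Schematically, for
// encoded keys of sizes S and L:
//
//   mem: [ smallest_key bytes (S) | largest_key bytes (L) ]
//   f.smallest_key = Slice(mem, S);
//   f.largest_key  = Slice(mem + S, L);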

static bool AfterFile(const Comparator* ucmp, const Slice* user_key,
                      const FdWithKeyRange* f) {
  // nullptr user_key occurs before all keys and is therefore never after *f
  return (user_key != nullptr &&
          ucmp->CompareWithoutTimestamp(*user_key,
                                        ExtractUserKey(f->largest_key)) > 0);
}

static bool BeforeFile(const Comparator* ucmp, const Slice* user_key,
                       const FdWithKeyRange* f) {
  // nullptr user_key occurs after all keys and is therefore never before *f
  return (user_key != nullptr &&
          ucmp->CompareWithoutTimestamp(*user_key,
                                        ExtractUserKey(f->smallest_key)) < 0);
}

bool SomeFileOverlapsRange(const InternalKeyComparator& icmp,
                           bool disjoint_sorted_files,
                           const LevelFilesBrief& file_level,
                           const Slice* smallest_user_key,
                           const Slice* largest_user_key) {
  const Comparator* ucmp = icmp.user_comparator();
  if (!disjoint_sorted_files) {
    // Need to check against all files
    for (size_t i = 0; i < file_level.num_files; i++) {
      const FdWithKeyRange* f = &(file_level.files[i]);
      if (AfterFile(ucmp, smallest_user_key, f) ||
          BeforeFile(ucmp, largest_user_key, f)) {
        // No overlap
      } else {
        return true;  // Overlap
      }
    }
    return false;
  }

  // Binary search over file list
  uint32_t index = 0;
  if (smallest_user_key != nullptr) {
    // Find the leftmost possible internal key for smallest_user_key
    InternalKey small;
    small.SetMinPossibleForUserKey(*smallest_user_key);
    index = FindFile(icmp, file_level, small.Encode());
  }

  if (index >= file_level.num_files) {
    // beginning of range is after all files, so no overlap.
    return false;
  }

  return !BeforeFile(ucmp, largest_user_key, &file_level.files[index]);
}
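
// Added commentary (worked example): with disjoint sorted files
// [a..c] [e..g] and range [d, d], FindFile lands on [e..g] and
// BeforeFile("d", [e..g]) is true, so there is no overlap; range [d, e]
// would overlap [e..g] and return true.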

namespace {

class LevelIterator final : public InternalIterator {
 public:
  // @param read_options Must outlive this iterator.
  LevelIterator(TableCache* table_cache, const ReadOptions& read_options,
                const FileOptions& file_options,
                const InternalKeyComparator& icomparator,
                const LevelFilesBrief* flevel,
                const SliceTransform* prefix_extractor, bool should_sample,
                HistogramImpl* file_read_hist, TableReaderCaller caller,
                bool skip_filters, int level, RangeDelAggregator* range_del_agg,
                const std::vector<AtomicCompactionUnitBoundary>*
                    compaction_boundaries = nullptr,
                bool allow_unprepared_value = false)
      : table_cache_(table_cache),
        read_options_(read_options),
        file_options_(file_options),
        icomparator_(icomparator),
        user_comparator_(icomparator.user_comparator()),
        flevel_(flevel),
        prefix_extractor_(prefix_extractor),
        file_read_hist_(file_read_hist),
        should_sample_(should_sample),
        caller_(caller),
        skip_filters_(skip_filters),
        allow_unprepared_value_(allow_unprepared_value),
        file_index_(flevel_->num_files),
        level_(level),
        range_del_agg_(range_del_agg),
        pinned_iters_mgr_(nullptr),
        compaction_boundaries_(compaction_boundaries) {
    // Empty level is not supported.
    assert(flevel_ != nullptr && flevel_->num_files > 0);
  }

  ~LevelIterator() override { delete file_iter_.Set(nullptr); }

  void Seek(const Slice& target) override;
  void SeekForPrev(const Slice& target) override;
  void SeekToFirst() override;
  void SeekToLast() override;
  void Next() final override;
  bool NextAndGetResult(IterateResult* result) override;
  void Prev() override;

  bool Valid() const override { return file_iter_.Valid(); }
  Slice key() const override {
    assert(Valid());
    return file_iter_.key();
  }

  Slice value() const override {
    assert(Valid());
    return file_iter_.value();
  }

  Status status() const override {
    return file_iter_.iter() ? file_iter_.status() : Status::OK();
  }

  bool PrepareValue() override { return file_iter_.PrepareValue(); }

  inline bool MayBeOutOfLowerBound() override {
    assert(Valid());
    return may_be_out_of_lower_bound_ && file_iter_.MayBeOutOfLowerBound();
  }

  inline IterBoundCheck UpperBoundCheckResult() override {
    if (Valid()) {
      return file_iter_.UpperBoundCheckResult();
    } else {
      return IterBoundCheck::kUnknown;
    }
  }

  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
    if (file_iter_.iter()) {
      file_iter_.SetPinnedItersMgr(pinned_iters_mgr);
    }
  }

  bool IsKeyPinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_.iter() && file_iter_.IsKeyPinned();
  }

  bool IsValuePinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_.iter() && file_iter_.IsValuePinned();
  }

 private:
  // Return true if at least one invalid file is seen and skipped.
  bool SkipEmptyFileForward();
  void SkipEmptyFileBackward();
  void SetFileIterator(InternalIterator* iter);
  void InitFileIterator(size_t new_file_index);

  const Slice& file_smallest_key(size_t file_index) {
    assert(file_index < flevel_->num_files);
    return flevel_->files[file_index].smallest_key;
  }

  bool KeyReachedUpperBound(const Slice& internal_key) {
    return read_options_.iterate_upper_bound != nullptr &&
           user_comparator_.CompareWithoutTimestamp(
               ExtractUserKey(internal_key), /*a_has_ts=*/true,
               *read_options_.iterate_upper_bound, /*b_has_ts=*/false) >= 0;
  }

  InternalIterator* NewFileIterator() {
    assert(file_index_ < flevel_->num_files);
    auto file_meta = flevel_->files[file_index_];
    if (should_sample_) {
      sample_file_read_inc(file_meta.file_metadata);
    }

    const InternalKey* smallest_compaction_key = nullptr;
    const InternalKey* largest_compaction_key = nullptr;
    if (compaction_boundaries_ != nullptr) {
      smallest_compaction_key = (*compaction_boundaries_)[file_index_].smallest;
      largest_compaction_key = (*compaction_boundaries_)[file_index_].largest;
    }
    CheckMayBeOutOfLowerBound();
    return table_cache_->NewIterator(
        read_options_, file_options_, icomparator_, *file_meta.file_metadata,
        range_del_agg_, prefix_extractor_,
        nullptr /* don't need reference to table */, file_read_hist_, caller_,
        /*arena=*/nullptr, skip_filters_, level_,
        /*max_file_size_for_l0_meta_pin=*/0, smallest_compaction_key,
        largest_compaction_key, allow_unprepared_value_);
  }

  // Check whether the current file sits fully within iterate_lower_bound.
  //
  // Note MyRocks may update iterate bounds between seeks. To work around
  // this, we need to check and update may_be_out_of_lower_bound_ accordingly.
  void CheckMayBeOutOfLowerBound() {
    if (read_options_.iterate_lower_bound != nullptr &&
        file_index_ < flevel_->num_files) {
      may_be_out_of_lower_bound_ =
          user_comparator_.CompareWithoutTimestamp(
              ExtractUserKey(file_smallest_key(file_index_)), /*a_has_ts=*/true,
              *read_options_.iterate_lower_bound, /*b_has_ts=*/false) < 0;
    }
  }

  TableCache* table_cache_;
  const ReadOptions& read_options_;
  const FileOptions& file_options_;
  const InternalKeyComparator& icomparator_;
  const UserComparatorWrapper user_comparator_;
  const LevelFilesBrief* flevel_;
  mutable FileDescriptor current_value_;
  // `prefix_extractor_` may be non-null even for total order seek. Checking
  // this variable is not the right way to identify whether prefix iterator
  // is used.
  const SliceTransform* prefix_extractor_;

  HistogramImpl* file_read_hist_;
  bool should_sample_;
  TableReaderCaller caller_;
  bool skip_filters_;
  bool allow_unprepared_value_;
  bool may_be_out_of_lower_bound_ = true;
  size_t file_index_;
  int level_;
  RangeDelAggregator* range_del_agg_;
  IteratorWrapper file_iter_;  // May be nullptr
  PinnedIteratorsManager* pinned_iters_mgr_;

  // To be propagated to RangeDelAggregator in order to safely truncate range
  // tombstones.
  const std::vector<AtomicCompactionUnitBoundary>* compaction_boundaries_;
};
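
// Added commentary (schematic, not in the original): a LevelIterator stands
// in for an entire sorted level inside a merging iterator, lazily opening
// one table iterator at a time, e.g.:
//
//   auto* li = new (arena_mem) LevelIterator(
//       table_cache, read_options, file_options, icomparator, &flevel,
//       prefix_extractor, /*should_sample=*/false,
//       /*file_read_hist=*/nullptr, TableReaderCaller::kUserIterator,
//       /*skip_filters=*/false, level, range_del_agg);
//   merge_iter_builder->AddIterator(li);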

void LevelIterator::Seek(const Slice& target) {
  // Check whether the seek key falls within the current file's range
  bool need_to_reseek = true;
  if (file_iter_.iter() != nullptr && file_index_ < flevel_->num_files) {
    const FdWithKeyRange& cur_file = flevel_->files[file_index_];
    if (icomparator_.InternalKeyComparator::Compare(
            target, cur_file.largest_key) <= 0 &&
        icomparator_.InternalKeyComparator::Compare(
            target, cur_file.smallest_key) >= 0) {
      need_to_reseek = false;
      assert(static_cast<size_t>(FindFile(icomparator_, *flevel_, target)) ==
             file_index_);
    }
  }
  if (need_to_reseek) {
    TEST_SYNC_POINT("LevelIterator::Seek:BeforeFindFile");
    size_t new_file_index = FindFile(icomparator_, *flevel_, target);
    InitFileIterator(new_file_index);
  }

  if (file_iter_.iter() != nullptr) {
    file_iter_.Seek(target);
  }
  if (SkipEmptyFileForward() && prefix_extractor_ != nullptr &&
      !read_options_.total_order_seek && !read_options_.auto_prefix_mode &&
      file_iter_.iter() != nullptr && file_iter_.Valid()) {
    // We've skipped the file we initially positioned to. In the prefix
    // seek case, it is likely that the file was skipped because of
    // prefix bloom or hash, where more keys are skipped. We then check
    // the current key and invalidate the iterator if the prefix has
    // already been passed.
    // In a prefix-mode seek, once the keys for one prefix have been
    // exhausted, the iterator may jump to any key that is larger. Here we
    // enforce a stricter contract than that, in order to make it easier for
    // higher layers (merging and DB iterator) to reason about correctness:
    // 1. Within the prefix, the result should be accurate.
    // 2. If the keys for the prefix are exhausted, the iterator is either
    //    positioned at the next key after the prefix, or invalidated.
    // A side benefit is that the iterator is invalidated earlier, so the
    // upper-level merging iterator can merge fewer child iterators.
    size_t ts_sz = user_comparator_.timestamp_size();
    Slice target_user_key_without_ts =
        ExtractUserKeyAndStripTimestamp(target, ts_sz);
    Slice file_user_key_without_ts =
        ExtractUserKeyAndStripTimestamp(file_iter_.key(), ts_sz);
    if (prefix_extractor_->InDomain(target_user_key_without_ts) &&
        (!prefix_extractor_->InDomain(file_user_key_without_ts) ||
         user_comparator_.CompareWithoutTimestamp(
             prefix_extractor_->Transform(target_user_key_without_ts), false,
             prefix_extractor_->Transform(file_user_key_without_ts),
             false) != 0)) {
      SetFileIterator(nullptr);
    }
  }
  CheckMayBeOutOfLowerBound();
}
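
// Added commentary (hypothetical example): with a fixed-length prefix
// extractor of 4 bytes, after Seek("abcd0001") the prefix bloom filter may
// skip whole files so the iterator lands on a key like "abce...". The
// prefixes "abcd" and "abce" differ, so the check above invalidates the
// iterator instead of surfacing keys from a foreign prefix.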

void LevelIterator::SeekForPrev(const Slice& target) {
  size_t new_file_index = FindFile(icomparator_, *flevel_, target);
  if (new_file_index >= flevel_->num_files) {
    new_file_index = flevel_->num_files - 1;
  }

  InitFileIterator(new_file_index);
  if (file_iter_.iter() != nullptr) {
    file_iter_.SeekForPrev(target);
    SkipEmptyFileBackward();
  }
  CheckMayBeOutOfLowerBound();
}

void LevelIterator::SeekToFirst() {
  InitFileIterator(0);
  if (file_iter_.iter() != nullptr) {
    file_iter_.SeekToFirst();
  }
  SkipEmptyFileForward();
  CheckMayBeOutOfLowerBound();
}

void LevelIterator::SeekToLast() {
  InitFileIterator(flevel_->num_files - 1);
  if (file_iter_.iter() != nullptr) {
    file_iter_.SeekToLast();
  }
  SkipEmptyFileBackward();
  CheckMayBeOutOfLowerBound();
}

void LevelIterator::Next() {
  assert(Valid());
  file_iter_.Next();
  SkipEmptyFileForward();
}

bool LevelIterator::NextAndGetResult(IterateResult* result) {
  assert(Valid());
  bool is_valid = file_iter_.NextAndGetResult(result);
  if (!is_valid) {
    SkipEmptyFileForward();
    is_valid = Valid();
    if (is_valid) {
      result->key = key();
      result->bound_check_result = file_iter_.UpperBoundCheckResult();
      // Ideally, we should return the real file_iter_.value_prepared but the
      // information is not here. It would cause an extra PrepareValue()
      // for the first key of a file.
      result->value_prepared = !allow_unprepared_value_;
    }
  }
  return is_valid;
}

void LevelIterator::Prev() {
  assert(Valid());
  file_iter_.Prev();
  SkipEmptyFileBackward();
}

bool LevelIterator::SkipEmptyFileForward() {
  bool seen_empty_file = false;
  while (file_iter_.iter() == nullptr ||
         (!file_iter_.Valid() && file_iter_.status().ok() &&
          file_iter_.iter()->UpperBoundCheckResult() !=
              IterBoundCheck::kOutOfBound)) {
    seen_empty_file = true;
    // Move to next file
    if (file_index_ >= flevel_->num_files - 1) {
      // Already at the last file
      SetFileIterator(nullptr);
      break;
    }
    if (KeyReachedUpperBound(file_smallest_key(file_index_ + 1))) {
      SetFileIterator(nullptr);
      break;
    }
    InitFileIterator(file_index_ + 1);
    if (file_iter_.iter() != nullptr) {
      file_iter_.SeekToFirst();
    }
  }
  return seen_empty_file;
}

void LevelIterator::SkipEmptyFileBackward() {
  while (file_iter_.iter() == nullptr ||
         (!file_iter_.Valid() && file_iter_.status().ok())) {
    // Move to previous file
    if (file_index_ == 0) {
      // Already the first file
      SetFileIterator(nullptr);
      return;
    }
    InitFileIterator(file_index_ - 1);
    if (file_iter_.iter() != nullptr) {
      file_iter_.SeekToLast();
    }
  }
}

void LevelIterator::SetFileIterator(InternalIterator* iter) {
  if (pinned_iters_mgr_ && iter) {
    iter->SetPinnedItersMgr(pinned_iters_mgr_);
  }

  InternalIterator* old_iter = file_iter_.Set(iter);
  if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
    pinned_iters_mgr_->PinIterator(old_iter);
  } else {
    delete old_iter;
  }
}

void LevelIterator::InitFileIterator(size_t new_file_index) {
  if (new_file_index >= flevel_->num_files) {
    file_index_ = new_file_index;
    SetFileIterator(nullptr);
    return;
  } else {
    // If the file iterator returned Incomplete, retry it when the user seeks
    // to the same file again: this time we may reach a different data block
    // that is already cached in the block cache.
    if (file_iter_.iter() != nullptr && !file_iter_.status().IsIncomplete() &&
        new_file_index == file_index_) {
      // file_iter_ is already constructed with this iterator, so
      // no need to change anything
    } else {
      file_index_ = new_file_index;
      InternalIterator* iter = NewFileIterator();
      SetFileIterator(iter);
    }
  }
}
}  // anonymous namespace
1257 
GetTableProperties(std::shared_ptr<const TableProperties> * tp,const FileMetaData * file_meta,const std::string * fname) const1258 Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
1259                                    const FileMetaData* file_meta,
1260                                    const std::string* fname) const {
1261   auto table_cache = cfd_->table_cache();
1262   auto ioptions = cfd_->ioptions();
1263   Status s = table_cache->GetTableProperties(
1264       file_options_, cfd_->internal_comparator(), file_meta->fd, tp,
1265       mutable_cf_options_.prefix_extractor.get(), true /* no io */);
1266   if (s.ok()) {
1267     return s;
1268   }
1269 
1270   // We only ignore error type `Incomplete` since it's by design that we
1271   // disallow table when it's not in table cache.
1272   if (!s.IsIncomplete()) {
1273     return s;
1274   }
1275 
1276   // 2. Table is not present in table cache, we'll read the table properties
1277   // directly from the properties block in the file.
1278   std::unique_ptr<FSRandomAccessFile> file;
1279   std::string file_name;
1280   if (fname != nullptr) {
1281     file_name = *fname;
1282   } else {
1283     file_name =
1284       TableFileName(ioptions->cf_paths, file_meta->fd.GetNumber(),
1285                     file_meta->fd.GetPathId());
1286   }
1287   s = ioptions->fs->NewRandomAccessFile(file_name, file_options_, &file,
1288                                         nullptr);
1289   if (!s.ok()) {
1290     return s;
1291   }
1292 
1293   TableProperties* raw_table_properties;
1294   // By setting the magic number to kInvalidTableMagicNumber, we can
1295   // bypass the magic number check in the footer.
1296   std::unique_ptr<RandomAccessFileReader> file_reader(
1297       new RandomAccessFileReader(
1298           std::move(file), file_name, nullptr /* env */, io_tracer_,
1299           nullptr /* stats */, 0 /* hist_type */, nullptr /* file_read_hist */,
1300           nullptr /* rate_limiter */, ioptions->listeners));
1301   s = ReadTableProperties(
1302       file_reader.get(), file_meta->fd.GetFileSize(),
1303       Footer::kInvalidTableMagicNumber /* table's magic number */, *ioptions,
1304       &raw_table_properties, false /* compression_type_missing */);
1305   if (!s.ok()) {
1306     return s;
1307   }
1308   RecordTick(ioptions->stats, NUMBER_DIRECT_LOAD_TABLE_PROPERTIES);
1309 
1310   *tp = std::shared_ptr<const TableProperties>(raw_table_properties);
1311   return s;
1312 }
1313 
1314 Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
1315   Status s;
1316   for (int level = 0; level < storage_info_.num_levels_; level++) {
1317     s = GetPropertiesOfAllTables(props, level);
1318     if (!s.ok()) {
1319       return s;
1320     }
1321   }
1322 
1323   return Status::OK();
1324 }
1325 
1326 Status Version::TablesRangeTombstoneSummary(int max_entries_to_print,
1327                                             std::string* out_str) {
1328   if (max_entries_to_print <= 0) {
1329     return Status::OK();
1330   }
1331   int num_entries_left = max_entries_to_print;
1332 
1333   std::stringstream ss;
1334 
1335   for (int level = 0; level < storage_info_.num_levels_; level++) {
1336     for (const auto& file_meta : storage_info_.files_[level]) {
1337       auto fname =
1338           TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
1339                         file_meta->fd.GetPathId());
1340 
1341       ss << "=== file : " << fname << " ===\n";
1342 
1343       TableCache* table_cache = cfd_->table_cache();
1344       std::unique_ptr<FragmentedRangeTombstoneIterator> tombstone_iter;
1345 
1346       Status s = table_cache->GetRangeTombstoneIterator(
1347           ReadOptions(), cfd_->internal_comparator(), *file_meta,
1348           &tombstone_iter);
1349       if (!s.ok()) {
1350         return s;
1351       }
1352       if (tombstone_iter) {
1353         tombstone_iter->SeekToFirst();
1354 
1355         while (tombstone_iter->Valid() && num_entries_left > 0) {
1356           ss << "start: " << tombstone_iter->start_key().ToString(true)
1357              << " end: " << tombstone_iter->end_key().ToString(true)
1358              << " seq: " << tombstone_iter->seq() << '\n';
1359           tombstone_iter->Next();
1360           num_entries_left--;
1361         }
1362         if (num_entries_left <= 0) {
1363           break;
1364         }
1365       }
1366     }
1367     if (num_entries_left <= 0) {
1368       break;
1369     }
1370   }
1371   assert(num_entries_left >= 0);
1372   if (num_entries_left <= 0) {
1373     ss << "(results may not be complete)\n";
1374   }
1375 
1376   *out_str = ss.str();
1377   return Status::OK();
1378 }
1379 
1380 Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props,
1381                                          int level) {
1382   for (const auto& file_meta : storage_info_.files_[level]) {
1383     auto fname =
1384         TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
1385                       file_meta->fd.GetPathId());
1386     // 1. If the table is already present in table cache, load table
1387     // properties from there.
1388     std::shared_ptr<const TableProperties> table_properties;
1389     Status s = GetTableProperties(&table_properties, file_meta, &fname);
1390     if (s.ok()) {
1391       props->insert({fname, table_properties});
1392     } else {
1393       return s;
1394     }
1395   }
1396 
1397   return Status::OK();
1398 }
1399 
1400 Status Version::GetPropertiesOfTablesInRange(
1401     const Range* range, std::size_t n, TablePropertiesCollection* props) const {
1402   for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
1403     for (decltype(n) i = 0; i < n; i++) {
1404       // Convert user_key into a corresponding internal key.
1405       InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
1406       InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
1407       std::vector<FileMetaData*> files;
1408       storage_info_.GetOverlappingInputs(level, &k1, &k2, &files, -1, nullptr,
1409                                          false);
1410       for (const auto& file_meta : files) {
1411         auto fname =
1412             TableFileName(cfd_->ioptions()->cf_paths,
1413                           file_meta->fd.GetNumber(), file_meta->fd.GetPathId());
1414         if (props->count(fname) == 0) {
1415           // 1. If the table is already present in table cache, load table
1416           // properties from there.
1417           std::shared_ptr<const TableProperties> table_properties;
1418           Status s = GetTableProperties(&table_properties, file_meta, &fname);
1419           if (s.ok()) {
1420             props->insert({fname, table_properties});
1421           } else {
1422             return s;
1423           }
1424         }
1425       }
1426     }
1427   }
1428 
1429   return Status::OK();
1430 }
1431 
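// Aggregates the properties of all tables at `level` (or across all levels
// when level < 0) into a single TableProperties object via
// TableProperties::Add.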
1432 Status Version::GetAggregatedTableProperties(
1433     std::shared_ptr<const TableProperties>* tp, int level) {
1434   TablePropertiesCollection props;
1435   Status s;
1436   if (level < 0) {
1437     s = GetPropertiesOfAllTables(&props);
1438   } else {
1439     s = GetPropertiesOfAllTables(&props, level);
1440   }
1441   if (!s.ok()) {
1442     return s;
1443   }
1444 
1445   auto* new_tp = new TableProperties();
1446   for (const auto& item : props) {
1447     new_tp->Add(*item.second);
1448   }
1449   tp->reset(new_tp);
1450   return Status::OK();
1451 }
1452 
1453 size_t Version::GetMemoryUsageByTableReaders() {
1454   size_t total_usage = 0;
1455   for (auto& file_level : storage_info_.level_files_brief_) {
1456     for (size_t i = 0; i < file_level.num_files; i++) {
1457       total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader(
1458           file_options_, cfd_->internal_comparator(), file_level.files[i].fd,
1459           mutable_cf_options_.prefix_extractor.get());
1460     }
1461   }
1462   return total_usage;
1463 }
1464 
1465 void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
1466   assert(cf_meta);
1467   assert(cfd_);
1468 
1469   cf_meta->name = cfd_->GetName();
1470   cf_meta->size = 0;
1471   cf_meta->file_count = 0;
1472   cf_meta->levels.clear();
1473 
1474   cf_meta->blob_file_size = 0;
1475   cf_meta->blob_file_count = 0;
1476   cf_meta->blob_files.clear();
1477 
1478   auto* ioptions = cfd_->ioptions();
1479   auto* vstorage = storage_info();
1480 
1481   for (int level = 0; level < cfd_->NumberLevels(); level++) {
1482     uint64_t level_size = 0;
1483     cf_meta->file_count += vstorage->LevelFiles(level).size();
1484     std::vector<SstFileMetaData> files;
1485     for (const auto& file : vstorage->LevelFiles(level)) {
1486       uint32_t path_id = file->fd.GetPathId();
1487       std::string file_path;
1488       if (path_id < ioptions->cf_paths.size()) {
1489         file_path = ioptions->cf_paths[path_id].path;
1490       } else {
1491         assert(!ioptions->cf_paths.empty());
1492         file_path = ioptions->cf_paths.back().path;
1493       }
1494       const uint64_t file_number = file->fd.GetNumber();
1495       files.emplace_back(
1496           MakeTableFileName("", file_number), file_number, file_path,
1497           static_cast<size_t>(file->fd.GetFileSize()), file->fd.smallest_seqno,
1498           file->fd.largest_seqno, file->smallest.user_key().ToString(),
1499           file->largest.user_key().ToString(),
1500           file->stats.num_reads_sampled.load(std::memory_order_relaxed),
1501           file->being_compacted, file->temperature,
1502           file->oldest_blob_file_number, file->TryGetOldestAncesterTime(),
1503           file->TryGetFileCreationTime(), file->file_checksum,
1504           file->file_checksum_func_name);
1505       files.back().num_entries = file->num_entries;
1506       files.back().num_deletions = file->num_deletions;
1507       level_size += file->fd.GetFileSize();
1508     }
1509     cf_meta->levels.emplace_back(
1510         level, level_size, std::move(files));
1511     cf_meta->size += level_size;
1512   }
1513   for (const auto& iter : vstorage->GetBlobFiles()) {
1514     const auto meta = iter.second.get();
1515     cf_meta->blob_files.emplace_back(
1516         meta->GetBlobFileNumber(), BlobFileName("", meta->GetBlobFileNumber()),
1517         ioptions->cf_paths.front().path, meta->GetBlobFileSize(),
1518         meta->GetTotalBlobCount(), meta->GetTotalBlobBytes(),
1519         meta->GetGarbageBlobCount(), meta->GetGarbageBlobBytes(),
1520         meta->GetChecksumMethod(), meta->GetChecksumValue());
1521     cf_meta->blob_file_count++;
1522     cf_meta->blob_file_size += meta->GetBlobFileSize();
1523   }
1524 }
1525 
1526 uint64_t Version::GetSstFilesSize() {
1527   uint64_t sst_files_size = 0;
1528   for (int level = 0; level < storage_info_.num_levels_; level++) {
1529     for (const auto& file_meta : storage_info_.LevelFiles(level)) {
1530       sst_files_size += file_meta->fd.GetFileSize();
1531     }
1532   }
1533   return sst_files_size;
1534 }
1535 
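// Scans every file's creation time and reports the oldest one; if any file's
// creation time is unknown, conservatively reports 0 (unknown).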
1536 void Version::GetCreationTimeOfOldestFile(uint64_t* creation_time) {
1537   uint64_t oldest_time = port::kMaxUint64;
1538   for (int level = 0; level < storage_info_.num_non_empty_levels_; level++) {
1539     for (FileMetaData* meta : storage_info_.LevelFiles(level)) {
1540       assert(meta->fd.table_reader != nullptr);
1541       uint64_t file_creation_time = meta->TryGetFileCreationTime();
1542       if (file_creation_time == kUnknownFileCreationTime) {
1543         *creation_time = 0;
1544         return;
1545       }
1546       if (file_creation_time < oldest_time) {
1547         oldest_time = file_creation_time;
1548       }
1549     }
1550   }
1551   *creation_time = oldest_time;
1552 }
1553 
1554 uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const {
1555   // Estimation will be inaccurate when:
1556   // (1) there exist merge keys
1557   // (2) keys are directly overwritten
1558   // (3) there are deletions of non-existing keys
1559   // (4) the number of samples is low
1560   if (current_num_samples_ == 0) {
1561     return 0;
1562   }
1563 
1564   if (current_num_non_deletions_ <= current_num_deletions_) {
1565     return 0;
1566   }
1567 
1568   uint64_t est = current_num_non_deletions_ - current_num_deletions_;
1569 
1570   uint64_t file_count = 0;
1571   for (int level = 0; level < num_levels_; ++level) {
1572     file_count += files_[level].size();
1573   }
1574 
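  // When not every file has been sampled, linearly extrapolate from the
  // sampled files to the whole tree. For example, with est = 1000 across
  // current_num_samples_ = 5 sampled files out of file_count = 10 files,
  // the estimate scales to 1000 * 10 / 5 = 2000 active keys.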
1575   if (current_num_samples_ < file_count) {
1576     // cast to double to avoid overflow in the multiplication
1577     return
1578       static_cast<uint64_t>(
1579         (est * static_cast<double>(file_count) / current_num_samples_)
1580       );
1581   } else {
1582     return est;
1583   }
1584 }
1585 
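// Returns the ratio of raw (uncompressed) key and value bytes to on-disk
// file bytes for a level; a ratio greater than 1 indicates effective
// compression. Returns -1.0 when the level contains no file bytes.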
1586 double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel(
1587     int level) const {
1588   assert(level < num_levels_);
1589   uint64_t sum_file_size_bytes = 0;
1590   uint64_t sum_data_size_bytes = 0;
1591   for (auto* file_meta : files_[level]) {
1592     sum_file_size_bytes += file_meta->fd.GetFileSize();
1593     sum_data_size_bytes += file_meta->raw_key_size + file_meta->raw_value_size;
1594   }
1595   if (sum_file_size_bytes == 0) {
1596     return -1.0;
1597   }
1598   return static_cast<double>(sum_data_size_bytes) / sum_file_size_bytes;
1599 }
1600 
1601 void Version::AddIterators(const ReadOptions& read_options,
1602                            const FileOptions& soptions,
1603                            MergeIteratorBuilder* merge_iter_builder,
1604                            RangeDelAggregator* range_del_agg,
1605                            bool allow_unprepared_value) {
1606   assert(storage_info_.finalized_);
1607 
1608   for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
1609     AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level,
1610                          range_del_agg, allow_unprepared_value);
1611   }
1612 }
1613 
1614 void Version::AddIteratorsForLevel(const ReadOptions& read_options,
1615                                    const FileOptions& soptions,
1616                                    MergeIteratorBuilder* merge_iter_builder,
1617                                    int level,
1618                                    RangeDelAggregator* range_del_agg,
1619                                    bool allow_unprepared_value) {
1620   assert(storage_info_.finalized_);
1621   if (level >= storage_info_.num_non_empty_levels()) {
1622     // This is an empty level
1623     return;
1624   } else if (storage_info_.LevelFilesBrief(level).num_files == 0) {
1625     // No files in this level
1626     return;
1627   }
1628 
1629   bool should_sample = should_sample_file_read();
1630 
1631   auto* arena = merge_iter_builder->GetArena();
1632   if (level == 0) {
1633     // Merge all level zero files together since they may overlap
1634     for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
1635       const auto& file = storage_info_.LevelFilesBrief(0).files[i];
1636       merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
1637           read_options, soptions, cfd_->internal_comparator(),
1638           *file.file_metadata, range_del_agg,
1639           mutable_cf_options_.prefix_extractor.get(), nullptr,
1640           cfd_->internal_stats()->GetFileReadHist(0),
1641           TableReaderCaller::kUserIterator, arena,
1642           /*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_,
1643           /*smallest_compaction_key=*/nullptr,
1644           /*largest_compaction_key=*/nullptr, allow_unprepared_value));
1645     }
1646     if (should_sample) {
1647       // Count one read sample for every L0 file. This is done per iterator
1648       // creation rather than per Seek(), while files in other levels are
1649       // recorded per seek. If users execute one range query per iterator,
1650       // there may be some discrepancy here.
1651       for (FileMetaData* meta : storage_info_.LevelFiles(0)) {
1652         sample_file_read_inc(meta);
1653       }
1654     }
1655   } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
1656     // For levels > 0, we can use a concatenating iterator that sequentially
1657     // walks through the non-overlapping files in the level, opening them
1658     // lazily.
1659     auto* mem = arena->AllocateAligned(sizeof(LevelIterator));
1660     merge_iter_builder->AddIterator(new (mem) LevelIterator(
1661         cfd_->table_cache(), read_options, soptions,
1662         cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
1663         mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
1664         cfd_->internal_stats()->GetFileReadHist(level),
1665         TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
1666         range_del_agg,
1667         /*compaction_boundaries=*/nullptr, allow_unprepared_value));
1668   }
1669 }
1670 
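// Checks whether any key in [smallest_user_key, largest_user_key] may exist
// in the given level by probing each overlapping L0 file individually, or a
// single lazily-opening LevelIterator for deeper levels; overlapping range
// deletions also count as overlap.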
1671 Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
1672                                          const FileOptions& file_options,
1673                                          const Slice& smallest_user_key,
1674                                          const Slice& largest_user_key,
1675                                          int level, bool* overlap) {
1676   assert(storage_info_.finalized_);
1677 
1678   auto icmp = cfd_->internal_comparator();
1679   auto ucmp = icmp.user_comparator();
1680 
1681   Arena arena;
1682   Status status;
1683   ReadRangeDelAggregator range_del_agg(&icmp,
1684                                        kMaxSequenceNumber /* upper_bound */);
1685 
1686   *overlap = false;
1687 
1688   if (level == 0) {
1689     for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
1690       const auto file = &storage_info_.LevelFilesBrief(0).files[i];
1691       if (AfterFile(ucmp, &smallest_user_key, file) ||
1692           BeforeFile(ucmp, &largest_user_key, file)) {
1693         continue;
1694       }
1695       ScopedArenaIterator iter(cfd_->table_cache()->NewIterator(
1696           read_options, file_options, cfd_->internal_comparator(),
1697           *file->file_metadata, &range_del_agg,
1698           mutable_cf_options_.prefix_extractor.get(), nullptr,
1699           cfd_->internal_stats()->GetFileReadHist(0),
1700           TableReaderCaller::kUserIterator, &arena,
1701           /*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_,
1702           /*smallest_compaction_key=*/nullptr,
1703           /*largest_compaction_key=*/nullptr,
1704           /*allow_unprepared_value=*/false));
1705       status = OverlapWithIterator(
1706           ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
1707       if (!status.ok() || *overlap) {
1708         break;
1709       }
1710     }
1711   } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
1712     auto mem = arena.AllocateAligned(sizeof(LevelIterator));
1713     ScopedArenaIterator iter(new (mem) LevelIterator(
1714         cfd_->table_cache(), read_options, file_options,
1715         cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
1716         mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
1717         cfd_->internal_stats()->GetFileReadHist(level),
1718         TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
1719         &range_del_agg));
1720     status = OverlapWithIterator(
1721         ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
1722   }
1723 
1724   if (status.ok() && *overlap == false &&
1725       range_del_agg.IsRangeOverlapped(smallest_user_key, largest_user_key)) {
1726     *overlap = true;
1727   }
1728   return status;
1729 }
1730 
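// A newly built VersionStorageInfo inherits the accumulated and current
// sampled statistics from the storage info of the Version it supersedes
// (ref_vstorage), so the statistics survive across Version changes.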
1731 VersionStorageInfo::VersionStorageInfo(
1732     const InternalKeyComparator* internal_comparator,
1733     const Comparator* user_comparator, int levels,
1734     CompactionStyle compaction_style, VersionStorageInfo* ref_vstorage,
1735     bool _force_consistency_checks)
1736     : internal_comparator_(internal_comparator),
1737       user_comparator_(user_comparator),
1738       // cfd is nullptr if Version is dummy
1739       num_levels_(levels),
1740       num_non_empty_levels_(0),
1741       file_indexer_(user_comparator),
1742       compaction_style_(compaction_style),
1743       files_(new std::vector<FileMetaData*>[num_levels_]),
1744       base_level_(num_levels_ == 1 ? -1 : 1),
1745       level_multiplier_(0.0),
1746       files_by_compaction_pri_(num_levels_),
1747       level0_non_overlapping_(false),
1748       next_file_to_compact_by_size_(num_levels_),
1749       compaction_score_(num_levels_),
1750       compaction_level_(num_levels_),
1751       l0_delay_trigger_count_(0),
1752       accumulated_file_size_(0),
1753       accumulated_raw_key_size_(0),
1754       accumulated_raw_value_size_(0),
1755       accumulated_num_non_deletions_(0),
1756       accumulated_num_deletions_(0),
1757       current_num_non_deletions_(0),
1758       current_num_deletions_(0),
1759       current_num_samples_(0),
1760       estimated_compaction_needed_bytes_(0),
1761       finalized_(false),
1762       force_consistency_checks_(_force_consistency_checks) {
1763   if (ref_vstorage != nullptr) {
1764     accumulated_file_size_ = ref_vstorage->accumulated_file_size_;
1765     accumulated_raw_key_size_ = ref_vstorage->accumulated_raw_key_size_;
1766     accumulated_raw_value_size_ = ref_vstorage->accumulated_raw_value_size_;
1767     accumulated_num_non_deletions_ =
1768         ref_vstorage->accumulated_num_non_deletions_;
1769     accumulated_num_deletions_ = ref_vstorage->accumulated_num_deletions_;
1770     current_num_non_deletions_ = ref_vstorage->current_num_non_deletions_;
1771     current_num_deletions_ = ref_vstorage->current_num_deletions_;
1772     current_num_samples_ = ref_vstorage->current_num_samples_;
1773     oldest_snapshot_seqnum_ = ref_vstorage->oldest_snapshot_seqnum_;
1774   }
1775 }
1776 
1777 Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
1778                  const FileOptions& file_opt,
1779                  const MutableCFOptions mutable_cf_options,
1780                  const std::shared_ptr<IOTracer>& io_tracer,
1781                  uint64_t version_number)
1782     : env_(vset->env_),
1783       clock_(vset->clock_),
1784       cfd_(column_family_data),
1785       info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->logger),
1786       db_statistics_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->stats),
1787       table_cache_((cfd_ == nullptr) ? nullptr : cfd_->table_cache()),
1788       blob_file_cache_(cfd_ ? cfd_->blob_file_cache() : nullptr),
1789       merge_operator_(
1790           (cfd_ == nullptr) ? nullptr : cfd_->ioptions()->merge_operator.get()),
1791       storage_info_(
1792           (cfd_ == nullptr) ? nullptr : &cfd_->internal_comparator(),
1793           (cfd_ == nullptr) ? nullptr : cfd_->user_comparator(),
1794           cfd_ == nullptr ? 0 : cfd_->NumberLevels(),
1795           cfd_ == nullptr ? kCompactionStyleLevel
1796                           : cfd_->ioptions()->compaction_style,
1797           (cfd_ == nullptr || cfd_->current() == nullptr)
1798               ? nullptr
1799               : cfd_->current()->storage_info(),
1800           cfd_ == nullptr ? false : cfd_->ioptions()->force_consistency_checks),
1801       vset_(vset),
1802       next_(this),
1803       prev_(this),
1804       refs_(0),
1805       file_options_(file_opt),
1806       mutable_cf_options_(mutable_cf_options),
1807       max_file_size_for_l0_meta_pin_(
1808           MaxFileSizeForL0MetaPin(mutable_cf_options_)),
1809       version_number_(version_number),
1810       io_tracer_(io_tracer) {}
1811 
1812 Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
1813                         const Slice& blob_index_slice, PinnableSlice* value,
1814                         uint64_t* bytes_read) const {
1815   if (read_options.read_tier == kBlockCacheTier) {
1816     return Status::Incomplete("Cannot read blob: no disk I/O allowed");
1817   }
1818 
1819   BlobIndex blob_index;
1820 
1821   {
1822     Status s = blob_index.DecodeFrom(blob_index_slice);
1823     if (!s.ok()) {
1824       return s;
1825     }
1826   }
1827 
1828   return GetBlob(read_options, user_key, blob_index, value, bytes_read);
1829 }
1830 
1831 Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
1832                         const BlobIndex& blob_index, PinnableSlice* value,
1833                         uint64_t* bytes_read) const {
1834   assert(value);
1835 
1836   if (blob_index.HasTTL() || blob_index.IsInlined()) {
1837     return Status::Corruption("Unexpected TTL/inlined blob index");
1838   }
1839 
1840   const auto& blob_files = storage_info_.GetBlobFiles();
1841 
1842   const uint64_t blob_file_number = blob_index.file_number();
1843 
1844   const auto it = blob_files.find(blob_file_number);
1845   if (it == blob_files.end()) {
1846     return Status::Corruption("Invalid blob file number");
1847   }
1848 
1849   CacheHandleGuard<BlobFileReader> blob_file_reader;
1850 
1851   {
1852     assert(blob_file_cache_);
1853     const Status s = blob_file_cache_->GetBlobFileReader(blob_file_number,
1854                                                          &blob_file_reader);
1855     if (!s.ok()) {
1856       return s;
1857     }
1858   }
1859 
1860   assert(blob_file_reader.GetValue());
1861   const Status s = blob_file_reader.GetValue()->GetBlob(
1862       read_options, user_key, blob_index.offset(), blob_index.size(),
1863       blob_index.compression(), value, bytes_read);
1864 
1865   return s;
1866 }
1867 
1868 void Version::MultiGetBlob(
1869     const ReadOptions& read_options, MultiGetRange& range,
1870     std::unordered_map<uint64_t, BlobReadRequests>& blob_rqs) {
1871   if (read_options.read_tier == kBlockCacheTier) {
1872     Status s = Status::Incomplete("Cannot read blob(s): no disk I/O allowed");
1873     for (const auto& elem : blob_rqs) {
1874       for (const auto& blob_rq : elem.second) {
1875         const KeyContext& key_context = blob_rq.second;
1876         assert(key_context.s);
1877         assert(key_context.s->ok());
1878         *(key_context.s) = s;
1879         assert(key_context.get_context);
1880         auto& get_context = *(key_context.get_context);
1881         get_context.MarkKeyMayExist();
1882       }
1883     }
1884     return;
1885   }
1886 
1887   assert(!blob_rqs.empty());
1888   Status status;
1889   const auto& blob_files = storage_info_.GetBlobFiles();
1890   for (auto& elem : blob_rqs) {
1891     uint64_t blob_file_number = elem.first;
1892     if (blob_files.find(blob_file_number) == blob_files.end()) {
1893       auto& blobs_in_file = elem.second;
1894       for (const auto& blob : blobs_in_file) {
1895         const KeyContext& key_context = blob.second;
1896         *(key_context.s) = Status::Corruption("Invalid blob file number");
1897       }
1898       continue;
1899     }
1900     CacheHandleGuard<BlobFileReader> blob_file_reader;
1901     assert(blob_file_cache_);
1902     status = blob_file_cache_->GetBlobFileReader(blob_file_number,
1903                                                  &blob_file_reader);
1904     assert(!status.ok() || blob_file_reader.GetValue());
1905 
1906     auto& blobs_in_file = elem.second;
1907     if (!status.ok()) {
1908       for (const auto& blob : blobs_in_file) {
1909         const KeyContext& key_context = blob.second;
1910         *(key_context.s) = status;
1911       }
1912       continue;
1913     }
1914 
1915     assert(blob_file_reader.GetValue());
1916     const uint64_t file_size = blob_file_reader.GetValue()->GetFileSize();
1917     const CompressionType compression =
1918         blob_file_reader.GetValue()->GetCompressionType();
1919 
1920     // Sort blobs_in_file by file offset so reads proceed sequentially.
1921     std::sort(
1922         blobs_in_file.begin(), blobs_in_file.end(),
1923         [](const BlobReadRequest& lhs, const BlobReadRequest& rhs) -> bool {
1924           assert(lhs.first.file_number() == rhs.first.file_number());
1925           return lhs.first.offset() < rhs.first.offset();
1926         });
1927 
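    // Gather the validated requests into parallel arrays so the blob file
    // reader can service them in a single MultiGetBlob call.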
1928     autovector<std::reference_wrapper<const KeyContext>> blob_read_key_contexts;
1929     autovector<std::reference_wrapper<const Slice>> user_keys;
1930     autovector<uint64_t> offsets;
1931     autovector<uint64_t> value_sizes;
1932     autovector<Status*> statuses;
1933     autovector<PinnableSlice*> values;
1934     for (const auto& blob : blobs_in_file) {
1935       const auto& blob_index = blob.first;
1936       const KeyContext& key_context = blob.second;
1937       if (blob_index.HasTTL() || blob_index.IsInlined()) {
1938         *(key_context.s) =
1939             Status::Corruption("Unexpected TTL/inlined blob index");
1940         continue;
1941       }
1942       const uint64_t key_size = key_context.ukey_with_ts.size();
1943       const uint64_t offset = blob_index.offset();
1944       const uint64_t value_size = blob_index.size();
1945       if (!IsValidBlobOffset(offset, key_size, value_size, file_size)) {
1946         *(key_context.s) = Status::Corruption("Invalid blob offset");
1947         continue;
1948       }
1949       if (blob_index.compression() != compression) {
1950         *(key_context.s) =
1951             Status::Corruption("Compression type mismatch when reading a blob");
1952         continue;
1953       }
1954       blob_read_key_contexts.emplace_back(std::cref(key_context));
1955       user_keys.emplace_back(std::cref(key_context.ukey_with_ts));
1956       offsets.push_back(blob_index.offset());
1957       value_sizes.push_back(blob_index.size());
1958       statuses.push_back(key_context.s);
1959       values.push_back(key_context.value);
1960     }
1961     blob_file_reader.GetValue()->MultiGetBlob(read_options, user_keys, offsets,
1962                                               value_sizes, statuses, values,
1963                                               /*bytes_read=*/nullptr);
1964     size_t num = blob_read_key_contexts.size();
1965     assert(num == user_keys.size());
1966     assert(num == offsets.size());
1967     assert(num == value_sizes.size());
1968     assert(num == statuses.size());
1969     assert(num == values.size());
1970     for (size_t i = 0; i < num; ++i) {
1971       if (statuses[i]->ok()) {
1972         range.AddValueSize(blob_read_key_contexts[i].get().value->size());
1973         if (range.GetValueSize() > read_options.value_size_soft_limit) {
1974           *(blob_read_key_contexts[i].get().s) = Status::Aborted();
1975         }
1976       }
1977     }
1978   }
1979 }
1980 
1981 void Version::Get(const ReadOptions& read_options, const LookupKey& k,
1982                   PinnableSlice* value, std::string* timestamp, Status* status,
1983                   MergeContext* merge_context,
1984                   SequenceNumber* max_covering_tombstone_seq, bool* value_found,
1985                   bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
1986                   bool* is_blob, bool do_merge) {
1987   Slice ikey = k.internal_key();
1988   Slice user_key = k.user_key();
1989 
1990   assert(status->ok() || status->IsMergeInProgress());
1991 
1992   if (key_exists != nullptr) {
1993     // will falsify below if not found
1994     *key_exists = true;
1995   }
1996 
1997   PinnedIteratorsManager pinned_iters_mgr;
1998   uint64_t tracing_get_id = BlockCacheTraceHelper::kReservedGetId;
1999   if (vset_ && vset_->block_cache_tracer_ &&
2000       vset_->block_cache_tracer_->is_tracing_enabled()) {
2001     tracing_get_id = vset_->block_cache_tracer_->NextGetId();
2002   }
2003 
2004   // Note: the old StackableDB-based BlobDB passes in
2005   // GetImplOptions::is_blob_index; for the integrated BlobDB implementation, we
2006   // need to provide it here.
2007   bool is_blob_index = false;
2008   bool* const is_blob_to_use = is_blob ? is_blob : &is_blob_index;
2009   BlobFetcher blob_fetcher(this, read_options);
2010 
2011   GetContext get_context(
2012       user_comparator(), merge_operator_, info_log_, db_statistics_,
2013       status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
2014       do_merge ? value : nullptr, do_merge ? timestamp : nullptr, value_found,
2015       merge_context, do_merge, max_covering_tombstone_seq, clock_, seq,
2016       merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob_to_use,
2017       tracing_get_id, &blob_fetcher);
2018 
2019   // Pin blocks that we read to hold merge operands
2020   if (merge_operator_) {
2021     pinned_iters_mgr.StartPinning();
2022   }
2023 
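  // FilePicker yields candidate files in the order they must be examined:
  // overlapping L0 files from newest to oldest first, then for each deeper
  // level the few candidate files located through the file indexer.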
2024   FilePicker fp(
2025       storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
2026       storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
2027       user_comparator(), internal_comparator());
2028   FdWithKeyRange* f = fp.GetNextFile();
2029 
2030   while (f != nullptr) {
2031     if (*max_covering_tombstone_seq > 0) {
2032       // The remaining files we look at will only contain covered keys, so we
2033       // stop here.
2034       break;
2035     }
2036     if (get_context.sample()) {
2037       sample_file_read_inc(f->file_metadata);
2038     }
2039 
2040     bool timer_enabled =
2041         GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
2042         get_perf_context()->per_level_perf_context_enabled;
2043     StopWatchNano timer(clock_, timer_enabled /* auto_start */);
2044     *status = table_cache_->Get(
2045         read_options, *internal_comparator(), *f->file_metadata, ikey,
2046         &get_context, mutable_cf_options_.prefix_extractor.get(),
2047         cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
2048         IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
2049                         fp.IsHitFileLastInLevel()),
2050         fp.GetHitFileLevel(), max_file_size_for_l0_meta_pin_);
2051     // TODO: examine the behavior for corrupted key
2052     if (timer_enabled) {
2053       PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
2054                                 fp.GetHitFileLevel());
2055     }
2056     if (!status->ok()) {
2057       if (db_statistics_ != nullptr) {
2058         get_context.ReportCounters();
2059       }
2060       return;
2061     }
2062 
2063     // report the counters before returning
2064     if (get_context.State() != GetContext::kNotFound &&
2065         get_context.State() != GetContext::kMerge &&
2066         db_statistics_ != nullptr) {
2067       get_context.ReportCounters();
2068     }
2069     switch (get_context.State()) {
2070       case GetContext::kNotFound:
2071         // Keep searching in other files
2072         break;
2073       case GetContext::kMerge:
2074         // TODO: update per-level perfcontext user_key_return_count for kMerge
2075         break;
2076       case GetContext::kFound:
2077         if (fp.GetHitFileLevel() == 0) {
2078           RecordTick(db_statistics_, GET_HIT_L0);
2079         } else if (fp.GetHitFileLevel() == 1) {
2080           RecordTick(db_statistics_, GET_HIT_L1);
2081         } else if (fp.GetHitFileLevel() >= 2) {
2082           RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
2083         }
2084 
2085         PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
2086                                   fp.GetHitFileLevel());
2087 
2088         if (is_blob_index) {
2089           if (do_merge && value) {
2090             constexpr uint64_t* bytes_read = nullptr;
2091 
2092             *status =
2093                 GetBlob(read_options, user_key, *value, value, bytes_read);
2094             if (!status->ok()) {
2095               if (status->IsIncomplete()) {
2096                 get_context.MarkKeyMayExist();
2097               }
2098               return;
2099             }
2100           }
2101         }
2102 
2103         return;
2104       case GetContext::kDeleted:
2105         // Use empty error message for speed
2106         *status = Status::NotFound();
2107         return;
2108       case GetContext::kCorrupt:
2109         *status = Status::Corruption("corrupted key for ", user_key);
2110         return;
2111       case GetContext::kUnexpectedBlobIndex:
2112         ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
2113         *status = Status::NotSupported(
2114             "Encounter unexpected blob index. Please open DB with "
2115             "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
2116         return;
2117     }
2118     f = fp.GetNextFile();
2119   }
2120   if (db_statistics_ != nullptr) {
2121     get_context.ReportCounters();
2122   }
2123   if (GetContext::kMerge == get_context.State()) {
2124     if (!do_merge) {
2125       *status = Status::OK();
2126       return;
2127     }
2128     if (!merge_operator_) {
2129       *status = Status::InvalidArgument(
2130           "merge_operator is not properly initialized.");
2131       return;
2132     }
2133     // The merge operands are in the saver and we hit the beginning of the
2134     // key history; do a final merge of nullptr and the operands.
2135     std::string* str_value = value != nullptr ? value->GetSelf() : nullptr;
2136     *status = MergeHelper::TimedFullMerge(
2137         merge_operator_, user_key, nullptr, merge_context->GetOperands(),
2138         str_value, info_log_, db_statistics_, clock_,
2139         nullptr /* result_operand */, true);
2140     if (LIKELY(value != nullptr)) {
2141       value->PinSelf();
2142     }
2143   } else {
2144     if (key_exists != nullptr) {
2145       *key_exists = false;
2146     }
2147     *status = Status::NotFound();  // Use an empty error message for speed
2148   }
2149 }
2150 
2151 void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
2152                        ReadCallback* callback) {
2153   PinnedIteratorsManager pinned_iters_mgr;
2154 
2155   // Pin blocks that we read to hold merge operands
2156   if (merge_operator_) {
2157     pinned_iters_mgr.StartPinning();
2158   }
2159   uint64_t tracing_mget_id = BlockCacheTraceHelper::kReservedGetId;
2160 
2161   if (vset_ && vset_->block_cache_tracer_ &&
2162       vset_->block_cache_tracer_->is_tracing_enabled()) {
2163     tracing_mget_id = vset_->block_cache_tracer_->NextGetId();
2164   }
2165   // Even though we know the batch size won't be > MAX_BATCH_SIZE,
2166   // use autovector in order to avoid unnecessary construction of GetContext
2167   // objects, which is expensive
2168   autovector<GetContext, 16> get_ctx;
2169   BlobFetcher blob_fetcher(this, read_options);
2170   for (auto iter = range->begin(); iter != range->end(); ++iter) {
2171     assert(iter->s->ok() || iter->s->IsMergeInProgress());
2172     get_ctx.emplace_back(
2173         user_comparator(), merge_operator_, info_log_, db_statistics_,
2174         iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge,
2175         iter->ukey_with_ts, iter->value, iter->timestamp, nullptr,
2176         &(iter->merge_context), true, &iter->max_covering_tombstone_seq, clock_,
2177         nullptr, merge_operator_ ? &pinned_iters_mgr : nullptr, callback,
2178         &iter->is_blob_index, tracing_mget_id, &blob_fetcher);
2179     // MergeInProgress status, if set, has been transferred to the get_context
2180     // state, so we set status to ok here. From now on, the iter status will
2181     // be used for IO errors, and get_context state will be used for any
2182     // key level errors
2183     *(iter->s) = Status::OK();
2184   }
2185   int get_ctx_index = 0;
2186   for (auto iter = range->begin(); iter != range->end();
2187        ++iter, get_ctx_index++) {
2188     iter->get_context = &(get_ctx[get_ctx_index]);
2189   }
2190 
2191   MultiGetRange file_picker_range(*range, range->begin(), range->end());
2192   FilePickerMultiGet fp(
2193       &file_picker_range,
2194       &storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_,
2195       &storage_info_.file_indexer_, user_comparator(), internal_comparator());
2196   FdWithKeyRange* f = fp.GetNextFile();
2197   Status s;
2198   uint64_t num_index_read = 0;
2199   uint64_t num_filter_read = 0;
2200   uint64_t num_data_read = 0;
2201   uint64_t num_sst_read = 0;
2202 
2203   MultiGetRange keys_with_blobs_range(*range, range->begin(), range->end());
2204   // blob_file => [[blob_idx, it], ...]
2205   std::unordered_map<uint64_t, BlobReadRequests> blob_rqs;
2206 
2207   while (f != nullptr) {
2208     MultiGetRange file_range = fp.CurrentFileRange();
2209     bool timer_enabled =
2210         GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
2211         get_perf_context()->per_level_perf_context_enabled;
2212     StopWatchNano timer(clock_, timer_enabled /* auto_start */);
2213     s = table_cache_->MultiGet(
2214         read_options, *internal_comparator(), *f->file_metadata, &file_range,
2215         mutable_cf_options_.prefix_extractor.get(),
2216         cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
2217         IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
2218                         fp.IsHitFileLastInLevel()),
2219         fp.GetHitFileLevel());
2220     // TODO: examine the behavior for corrupted key
2221     if (timer_enabled) {
2222       PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
2223                                 fp.GetHitFileLevel());
2224     }
2225     if (!s.ok()) {
2226       // TODO: Set status for individual keys appropriately
2227       for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
2228         *iter->s = s;
2229         file_range.MarkKeyDone(iter);
2230       }
2231       return;
2232     }
2233     uint64_t batch_size = 0;
2234     for (auto iter = file_range.begin(); s.ok() && iter != file_range.end();
2235          ++iter) {
2236       GetContext& get_context = *iter->get_context;
2237       Status* status = iter->s;
2238       // The Status in the KeyContext takes precedence over GetContext state
2239       // Status may be an error if there were any IO errors in the table
2240       // reader. We never expect Status to be NotFound(), as that is
2241       // determined by get_context
2242       assert(!status->IsNotFound());
2243       if (!status->ok()) {
2244         file_range.MarkKeyDone(iter);
2245         continue;
2246       }
2247 
2248       if (get_context.sample()) {
2249         sample_file_read_inc(f->file_metadata);
2250       }
2251       batch_size++;
2252       num_index_read += get_context.get_context_stats_.num_index_read;
2253       num_filter_read += get_context.get_context_stats_.num_filter_read;
2254       num_data_read += get_context.get_context_stats_.num_data_read;
2255       num_sst_read += get_context.get_context_stats_.num_sst_read;
2256 
2257       // report the counters before returning
2258       if (get_context.State() != GetContext::kNotFound &&
2259           get_context.State() != GetContext::kMerge &&
2260           db_statistics_ != nullptr) {
2261         get_context.ReportCounters();
2262       } else {
2263         if (iter->max_covering_tombstone_seq > 0) {
2264           // The remaining files we look at will only contain covered keys, so
2265           // we stop here for this key
2266           file_picker_range.SkipKey(iter);
2267         }
2268       }
2269       switch (get_context.State()) {
2270         case GetContext::kNotFound:
2271           // Keep searching in other files
2272           break;
2273         case GetContext::kMerge:
2274           // TODO: update per-level perfcontext user_key_return_count for kMerge
2275           break;
2276         case GetContext::kFound:
2277           if (fp.GetHitFileLevel() == 0) {
2278             RecordTick(db_statistics_, GET_HIT_L0);
2279           } else if (fp.GetHitFileLevel() == 1) {
2280             RecordTick(db_statistics_, GET_HIT_L1);
2281           } else if (fp.GetHitFileLevel() >= 2) {
2282             RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
2283           }
2284 
2285           PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
2286                                     fp.GetHitFileLevel());
2287 
2288           file_range.MarkKeyDone(iter);
2289 
2290           if (iter->is_blob_index) {
2291             if (iter->value) {
2292               const Slice& blob_index_slice = *(iter->value);
2293               BlobIndex blob_index;
2294               Status tmp_s = blob_index.DecodeFrom(blob_index_slice);
2295               if (tmp_s.ok()) {
2296                 const uint64_t blob_file_num = blob_index.file_number();
2297                 blob_rqs[blob_file_num].emplace_back(
2298                     std::make_pair(blob_index, std::cref(*iter)));
2299               } else {
2300                 *(iter->s) = tmp_s;
2301               }
2302             }
2303           } else {
2304             file_range.AddValueSize(iter->value->size());
2305             if (file_range.GetValueSize() >
2306                 read_options.value_size_soft_limit) {
2307               s = Status::Aborted();
2308               break;
2309             }
2310           }
2311           continue;
2312         case GetContext::kDeleted:
2313           // Use empty error message for speed
2314           *status = Status::NotFound();
2315           file_range.MarkKeyDone(iter);
2316           continue;
2317         case GetContext::kCorrupt:
2318           *status =
2319               Status::Corruption("corrupted key for ", iter->lkey->user_key());
2320           file_range.MarkKeyDone(iter);
2321           continue;
2322         case GetContext::kUnexpectedBlobIndex:
2323           ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
2324           *status = Status::NotSupported(
2325               "Encounter unexpected blob index. Please open DB with "
2326               "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
2327           file_range.MarkKeyDone(iter);
2328           continue;
2329       }
2330     }
2331 
2332     // Report MultiGet stats per level.
2333     if (fp.IsHitFileLastInLevel()) {
2334       // Dump the stats if this is the last file of this level and reset for
2335       // next level.
2336       RecordInHistogram(db_statistics_,
2337                         NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL,
2338                         num_index_read + num_filter_read);
2339       RecordInHistogram(db_statistics_, NUM_DATA_BLOCKS_READ_PER_LEVEL,
2340                         num_data_read);
2341       RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL, num_sst_read);
2342       num_filter_read = 0;
2343       num_index_read = 0;
2344       num_data_read = 0;
2345       num_sst_read = 0;
2346     }
2347 
2348     RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
2349     if (!s.ok() || file_picker_range.empty()) {
2350       break;
2351     }
2352     f = fp.GetNextFile();
2353   }
2354 
2355   if (s.ok() && !blob_rqs.empty()) {
2356     MultiGetBlob(read_options, keys_with_blobs_range, blob_rqs);
2357   }
2358 
2359   // Process any left over keys
2360   for (auto iter = range->begin(); s.ok() && iter != range->end(); ++iter) {
2361     GetContext& get_context = *iter->get_context;
2362     Status* status = iter->s;
2363     Slice user_key = iter->lkey->user_key();
2364 
2365     if (db_statistics_ != nullptr) {
2366       get_context.ReportCounters();
2367     }
2368     if (GetContext::kMerge == get_context.State()) {
2369       if (!merge_operator_) {
2370         *status = Status::InvalidArgument(
2371             "merge_operator is not properly initialized.");
2372         range->MarkKeyDone(iter);
2373         continue;
2374       }
2375       // The merge operands are in the saver and we hit the beginning of the
2376       // key history; do a final merge of nullptr and the operands.
2377       std::string* str_value =
2378           iter->value != nullptr ? iter->value->GetSelf() : nullptr;
2379       *status = MergeHelper::TimedFullMerge(
2380           merge_operator_, user_key, nullptr, iter->merge_context.GetOperands(),
2381           str_value, info_log_, db_statistics_, clock_,
2382           nullptr /* result_operand */, true);
2383       if (LIKELY(iter->value != nullptr)) {
2384         iter->value->PinSelf();
2385         range->AddValueSize(iter->value->size());
2386         range->MarkKeyDone(iter);
2387         if (range->GetValueSize() > read_options.value_size_soft_limit) {
2388           s = Status::Aborted();
2389           break;
2390         }
2391       }
2392     } else {
2393       range->MarkKeyDone(iter);
2394       *status = Status::NotFound();  // Use an empty error message for speed
2395     }
2396   }
2397 
2398   for (auto iter = range->begin(); iter != range->end(); ++iter) {
2399     range->MarkKeyDone(iter);
2400     *(iter->s) = s;
2401   }
2402 }
2403 
2404 bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
2405   // Reaching the bottom level implies misses at all upper levels, so we'll
2406   // skip checking the filters when we predict a hit.
2407   return cfd_->ioptions()->optimize_filters_for_hits &&
2408          (level > 0 || is_file_last_in_level) &&
2409          level == storage_info_.num_non_empty_levels() - 1;
2410 }
2411 
2412 void VersionStorageInfo::GenerateLevelFilesBrief() {
2413   level_files_brief_.resize(num_non_empty_levels_);
2414   for (int level = 0; level < num_non_empty_levels_; level++) {
2415     DoGenerateLevelFilesBrief(
2416         &level_files_brief_[level], files_[level], &arena_);
2417   }
2418 }
2419 
2420 void Version::PrepareApply(
2421     const MutableCFOptions& mutable_cf_options,
2422     bool update_stats) {
2423   TEST_SYNC_POINT_CALLBACK(
2424       "Version::PrepareApply:forced_check",
2425       reinterpret_cast<void*>(&storage_info_.force_consistency_checks_));
2426   UpdateAccumulatedStats(update_stats);
2427   storage_info_.UpdateNumNonEmptyLevels();
2428   storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
2429   storage_info_.UpdateFilesByCompactionPri(cfd_->ioptions()->compaction_pri);
2430   storage_info_.GenerateFileIndexer();
2431   storage_info_.GenerateLevelFilesBrief();
2432   storage_info_.GenerateLevel0NonOverlapping();
2433   storage_info_.GenerateBottommostFiles();
2434 }
2435 
2436 bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
2437   if (file_meta->init_stats_from_file ||
2438       file_meta->compensated_file_size > 0) {
2439     return false;
2440   }
2441   std::shared_ptr<const TableProperties> tp;
2442   Status s = GetTableProperties(&tp, file_meta);
2443   file_meta->init_stats_from_file = true;
2444   if (!s.ok()) {
2445     ROCKS_LOG_ERROR(vset_->db_options_->info_log,
2446                     "Unable to load table properties for file %" PRIu64
2447                     " --- %s\n",
2448                     file_meta->fd.GetNumber(), s.ToString().c_str());
2449     return false;
2450   }
2451   if (tp.get() == nullptr) return false;
2452   file_meta->num_entries = tp->num_entries;
2453   file_meta->num_deletions = tp->num_deletions;
2454   file_meta->raw_value_size = tp->raw_value_size;
2455   file_meta->raw_key_size = tp->raw_key_size;
2456 
2457   return true;
2458 }
2459 
2460 void VersionStorageInfo::UpdateAccumulatedStats(FileMetaData* file_meta) {
2461   TEST_SYNC_POINT_CALLBACK("VersionStorageInfo::UpdateAccumulatedStats",
2462                            nullptr);
2463 
2464   assert(file_meta->init_stats_from_file);
2465   accumulated_file_size_ += file_meta->fd.GetFileSize();
2466   accumulated_raw_key_size_ += file_meta->raw_key_size;
2467   accumulated_raw_value_size_ += file_meta->raw_value_size;
2468   accumulated_num_non_deletions_ +=
2469       file_meta->num_entries - file_meta->num_deletions;
2470   accumulated_num_deletions_ += file_meta->num_deletions;
2471 
2472   current_num_non_deletions_ +=
2473       file_meta->num_entries - file_meta->num_deletions;
2474   current_num_deletions_ += file_meta->num_deletions;
2475   current_num_samples_++;
2476 }
2477 
2478 void VersionStorageInfo::RemoveCurrentStats(FileMetaData* file_meta) {
2479   if (file_meta->init_stats_from_file) {
2480     current_num_non_deletions_ -=
2481         file_meta->num_entries - file_meta->num_deletions;
2482     current_num_deletions_ -= file_meta->num_deletions;
2483     current_num_samples_--;
2484   }
2485 }
2486 
2487 void Version::UpdateAccumulatedStats(bool update_stats) {
2488   if (update_stats) {
2489     // maximum number of table properties loaded from files.
2490     const int kMaxInitCount = 20;
2491     int init_count = 0;
2492     // Here, only the first kMaxInitCount files that haven't been
2493     // initialized from file will have their num_deletions updated.
2494     // The motivation is to cap the maximum I/O per Version creation.
2495     // The reason for choosing files from lower levels instead of higher
2496     // levels is that this design propagates the initialization from
2497     // lower levels to higher levels: when the num_deletions of lower-level
2498     // files are updated, those files obtain accurate
2499     // compensated_file_size values, which triggers lower-to-higher-level
2500     // compactions that create higher-level files whose num_deletions
2501     // will in turn be updated here.
2502     for (int level = 0;
2503          level < storage_info_.num_levels_ && init_count < kMaxInitCount;
2504          ++level) {
2505       for (auto* file_meta : storage_info_.files_[level]) {
2506         if (MaybeInitializeFileMetaData(file_meta)) {
2507           // each FileMeta will be initialized only once.
2508           storage_info_.UpdateAccumulatedStats(file_meta);
2509           // when option "max_open_files" is -1, all the file metadata has
2510           // already been read, so MaybeInitializeFileMetaData() won't incur
2511           // any I/O cost. "max_open_files=-1" means that the table cache passed
2512           // to the VersionSet and then to the ColumnFamilySet has a size of
2513           // TableCache::kInfiniteCapacity
2514           if (vset_->GetColumnFamilySet()->get_table_cache()->GetCapacity() ==
2515               TableCache::kInfiniteCapacity) {
2516             continue;
2517           }
2518           if (++init_count >= kMaxInitCount) {
2519             break;
2520           }
2521         }
2522       }
2523     }
2524     // In case all sampled files contain only deletion entries, we
2525     // load the table properties of a file at a higher level to initialize
2526     // that value.
2527     for (int level = storage_info_.num_levels_ - 1;
2528          storage_info_.accumulated_raw_value_size_ == 0 && level >= 0;
2529          --level) {
2530       for (int i = static_cast<int>(storage_info_.files_[level].size()) - 1;
2531            storage_info_.accumulated_raw_value_size_ == 0 && i >= 0; --i) {
2532         if (MaybeInitializeFileMetaData(storage_info_.files_[level][i])) {
2533           storage_info_.UpdateAccumulatedStats(storage_info_.files_[level][i]);
2534         }
2535       }
2536     }
2537   }
2538 
2539   storage_info_.ComputeCompensatedSizes();
2540 }
2541 
2542 void VersionStorageInfo::ComputeCompensatedSizes() {
2543   static const int kDeletionWeightOnCompaction = 2;
2544   uint64_t average_value_size = GetAverageValueSize();
2545 
2546   // compute the compensated size
2547   for (int level = 0; level < num_levels_; level++) {
2548     for (auto* file_meta : files_[level]) {
2549       // Here we only compute compensated_file_size for those file_meta
2550       // whose compensated_file_size is uninitialized (== 0). This is true
2551       // only for files that have just been created and that no other thread
2552       // can access, so we can safely mutate compensated_file_size.
2553       if (file_meta->compensated_file_size == 0) {
2554         file_meta->compensated_file_size = file_meta->fd.GetFileSize();
2555         // Here we boost the size of deletion entries of a file only
2556         // when the number of deletion entries is greater than the number of
2557         // non-deletion entries in the file.  The motivation is that in
2558         // a stable workload, the number of deletion entries should be roughly
2559         // equal to the number of non-deletion entries.  If we compensated the
2560         // size of deletion entries in a stable workload, the deletion
2561         // compensation logic might introduce unwanted effects that change the
2562         // shape of the LSM tree.
2563         if (file_meta->num_deletions * 2 >= file_meta->num_entries) {
2564           file_meta->compensated_file_size +=
2565               (file_meta->num_deletions * 2 - file_meta->num_entries) *
2566               average_value_size * kDeletionWeightOnCompaction;
2567         }
2568       }
2569     }
2570   }
2571 }
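
// Illustrative sketch of the compensation formula above (hypothetical
// numbers, not part of the original source): assume GetAverageValueSize()
// returns 100 bytes and consider a 1 MB file with num_entries = 1000, of
// which num_deletions = 800. Then:
//
//   num_deletions * 2 = 1600 >= num_entries = 1000, so the boost applies:
//     boost = (1600 - 1000) * 100 * kDeletionWeightOnCompaction (2) = 120000
//     compensated_file_size = 1048576 + 120000 = 1168576 bytes
//
// A deletion-heavy file thus looks larger to the compaction picker than its
// on-disk size, making it more likely to be compacted and its tombstones
// dropped.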
2572 
2573 int VersionStorageInfo::MaxInputLevel() const {
2574   if (compaction_style_ == kCompactionStyleLevel) {
2575     return num_levels() - 2;
2576   }
2577   return 0;
2578 }
2579 
2580 int VersionStorageInfo::MaxOutputLevel(bool allow_ingest_behind) const {
2581   if (allow_ingest_behind) {
2582     assert(num_levels() > 1);
2583     return num_levels() - 2;
2584   }
2585   return num_levels() - 1;
2586 }
2587 
2588 void VersionStorageInfo::EstimateCompactionBytesNeeded(
2589     const MutableCFOptions& mutable_cf_options) {
2590   // Only implemented for level-based compaction
2591   if (compaction_style_ != kCompactionStyleLevel) {
2592     estimated_compaction_needed_bytes_ = 0;
2593     return;
2594   }
2595 
2596   // Start from Level 0. If level 0 qualifies for compaction to level 1,
2597   // we estimate the size of that compaction.
2598   // Then we move on to the next level and see whether it qualifies for
2599   // compaction to the level after it. The size of a level is estimated as
2600   // the actual size on the level plus the input bytes from the previous
2601   // level, if any. If that exceeds the level's target, we take the excess
2602   // bytes as compaction input and add the estimated compaction size to the
2603   // total. We keep doing this for Level 2, 3, etc., until the last level,
2604   // and return the accumulated bytes.
2605 
2606   uint64_t bytes_compact_to_next_level = 0;
2607   uint64_t level_size = 0;
2608   for (auto* f : files_[0]) {
2609     level_size += f->fd.GetFileSize();
2610   }
2611   // Level 0
2612   bool level0_compact_triggered = false;
2613   if (static_cast<int>(files_[0].size()) >=
2614           mutable_cf_options.level0_file_num_compaction_trigger ||
2615       level_size >= mutable_cf_options.max_bytes_for_level_base) {
2616     level0_compact_triggered = true;
2617     estimated_compaction_needed_bytes_ = level_size;
2618     bytes_compact_to_next_level = level_size;
2619   } else {
2620     estimated_compaction_needed_bytes_ = 0;
2621   }
2622 
2623   // Level 1 and up.
2624   uint64_t bytes_next_level = 0;
2625   for (int level = base_level(); level <= MaxInputLevel(); level++) {
2626     level_size = 0;
2627     if (bytes_next_level > 0) {
2628 #ifndef NDEBUG
2629       uint64_t level_size2 = 0;
2630       for (auto* f : files_[level]) {
2631         level_size2 += f->fd.GetFileSize();
2632       }
2633       assert(level_size2 == bytes_next_level);
2634 #endif
2635       level_size = bytes_next_level;
2636       bytes_next_level = 0;
2637     } else {
2638       for (auto* f : files_[level]) {
2639         level_size += f->fd.GetFileSize();
2640       }
2641     }
2642     if (level == base_level() && level0_compact_triggered) {
2643       // Add base level size to compaction if level0 compaction triggered.
2644       estimated_compaction_needed_bytes_ += level_size;
2645     }
2646     // Add size added by previous compaction
2647     level_size += bytes_compact_to_next_level;
2648     bytes_compact_to_next_level = 0;
2649     uint64_t level_target = MaxBytesForLevel(level);
2650     if (level_size > level_target) {
2651       bytes_compact_to_next_level = level_size - level_target;
2652       // Estimate the actual compaction fan-out ratio as the size ratio
2653       // between the two levels.
2654 
2655       assert(bytes_next_level == 0);
2656       if (level + 1 < num_levels_) {
2657         for (auto* f : files_[level + 1]) {
2658           bytes_next_level += f->fd.GetFileSize();
2659         }
2660       }
2661       if (bytes_next_level > 0) {
2662         assert(level_size > 0);
2663         estimated_compaction_needed_bytes_ += static_cast<uint64_t>(
2664             static_cast<double>(bytes_compact_to_next_level) *
2665             (static_cast<double>(bytes_next_level) /
2666                  static_cast<double>(level_size) +
2667              1));
2668       }
2669     }
2670   }
2671 }
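
// Illustrative sketch of the estimate above, with hypothetical numbers:
// suppose L0 holds 300 MB and triggers compaction, the base level holds
// 250 MB against a 256 MB target, and the next level holds 1024 MB. Then:
//
//   estimate  = 300 MB (L0 input) + 250 MB (base level, added above)
//   level_size = 250 MB + 300 MB = 550 MB > 256 MB target
//   bytes_compact_to_next_level = 550 - 256 = 294 MB
//   estimate += 294 MB * (1024 / 550 + 1) ~= 841 MB   => ~1391 MB total
//
// i.e. the excess bytes are charged once as compaction input and once per
// unit of estimated fan-out into the next level.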
2672 
2673 namespace {
2674 uint32_t GetExpiredTtlFilesCount(const ImmutableOptions& ioptions,
2675                                  const MutableCFOptions& mutable_cf_options,
2676                                  const std::vector<FileMetaData*>& files) {
2677   uint32_t ttl_expired_files_count = 0;
2678 
2679   int64_t _current_time;
2680   auto status = ioptions.clock->GetCurrentTime(&_current_time);
2681   if (status.ok()) {
2682     const uint64_t current_time = static_cast<uint64_t>(_current_time);
2683     for (FileMetaData* f : files) {
2684       if (!f->being_compacted) {
2685         uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
2686         if (oldest_ancester_time != 0 &&
2687             oldest_ancester_time < (current_time - mutable_cf_options.ttl)) {
2688           ttl_expired_files_count++;
2689         }
2690       }
2691     }
2692   }
2693   return ttl_expired_files_count;
2694 }
2695 }  // anonymous namespace
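
// Illustrative sketch (hypothetical numbers): with mutable_cf_options.ttl =
// 86400 (one day) and current_time = 1700000000, a file whose
// TryGetOldestAncesterTime() returns 1699900000 counts as expired, since
// 1699900000 < 1700000000 - 86400 = 1699913600. Files already being
// compacted, or with an unknown (0) ancestor time, are skipped.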
2696 
2697 void VersionStorageInfo::ComputeCompactionScore(
2698     const ImmutableOptions& immutable_options,
2699     const MutableCFOptions& mutable_cf_options) {
2700   for (int level = 0; level <= MaxInputLevel(); level++) {
2701     double score;
2702     if (level == 0) {
2703       // We treat level-0 specially by bounding the number of files
2704       // instead of number of bytes for two reasons:
2705       //
2706       // (1) With larger write-buffer sizes, it is nice not to do too
2707       // many level-0 compactions.
2708       //
2709       // (2) The files in level-0 are merged on every read and
2710       // therefore we wish to avoid too many files when the individual
2711       // file size is small (perhaps because of a small write-buffer
2712       // setting, or very high compression ratios, or lots of
2713       // overwrites/deletions).
2714       int num_sorted_runs = 0;
2715       uint64_t total_size = 0;
2716       for (auto* f : files_[level]) {
2717         if (!f->being_compacted) {
2718           total_size += f->compensated_file_size;
2719           num_sorted_runs++;
2720         }
2721       }
2722       if (compaction_style_ == kCompactionStyleUniversal) {
2723         // For universal compaction, we use level0 score to indicate
2724         // compaction score for the whole DB. We add other levels as if
2725         // they were L0 files.
2726         for (int i = 1; i < num_levels(); i++) {
2727           // It's possible that a subset of the files in a level may be in a
2728           // compaction, due to delete-triggered compaction or trivial move.
2729           // In that case, the check below may not catch a level being
2730           // compacted, as it only checks the first file. The worst that can
2731           // happen is that a scheduled compaction thread finds nothing to do.
2732           if (!files_[i].empty() && !files_[i][0]->being_compacted) {
2733             num_sorted_runs++;
2734           }
2735         }
2736       }
2737 
2738       if (compaction_style_ == kCompactionStyleFIFO) {
2739         score = static_cast<double>(total_size) /
2740                 mutable_cf_options.compaction_options_fifo.max_table_files_size;
2741         if (mutable_cf_options.compaction_options_fifo.allow_compaction ||
2742             mutable_cf_options.compaction_options_fifo.age_for_warm > 0) {
2743           // A warm-tier move can happen at any time. It's too expensive to
2744           // check every file's timestamp now. For now, just trigger it
2745           // slightly more frequently than FIFO compaction so that this
2746           // happens first.
2747           score = std::max(
2748               static_cast<double>(num_sorted_runs) /
2749                   mutable_cf_options.level0_file_num_compaction_trigger,
2750               score);
2751         }
2752         if (mutable_cf_options.ttl > 0) {
2753           score = std::max(
2754               static_cast<double>(GetExpiredTtlFilesCount(
2755                   immutable_options, mutable_cf_options, files_[level])),
2756               score);
2757         }
2758       } else {
2759         score = static_cast<double>(num_sorted_runs) /
2760                 mutable_cf_options.level0_file_num_compaction_trigger;
2761         if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
2762           // Level-based involves L0->L0 compactions that can lead to oversized
2763           // L0 files. Take into account size as well to avoid later giant
2764           // compactions to the base level.
2765           uint64_t l0_target_size = mutable_cf_options.max_bytes_for_level_base;
2766           if (immutable_options.level_compaction_dynamic_level_bytes &&
2767               level_multiplier_ != 0.0) {
2768             // Prevent L0 to Lbase fanout from growing larger than
2769             // `level_multiplier_`. This prevents us from getting stuck picking
2770             // L0 forever even when it is hurting write-amp. That could happen
2771             // in dynamic level compaction's write-burst mode where the base
2772             // level's target size can grow to be enormous.
2773             l0_target_size =
2774                 std::max(l0_target_size,
2775                          static_cast<uint64_t>(level_max_bytes_[base_level_] /
2776                                                level_multiplier_));
2777           }
2778           score =
2779               std::max(score, static_cast<double>(total_size) / l0_target_size);
2780         }
2781       }
2782     } else {
2783       // Compute the ratio of current size to size limit.
2784       uint64_t level_bytes_no_compacting = 0;
2785       for (auto f : files_[level]) {
2786         if (!f->being_compacted) {
2787           level_bytes_no_compacting += f->compensated_file_size;
2788         }
2789       }
2790       score = static_cast<double>(level_bytes_no_compacting) /
2791               MaxBytesForLevel(level);
2792     }
2793     compaction_level_[level] = level;
2794     compaction_score_[level] = score;
2795   }
2796 
2797   // Sort all the levels based on their score. Higher scores get listed
2798   // first. Use bubble sort because the number of entries is small.
2799   for (int i = 0; i < num_levels() - 2; i++) {
2800     for (int j = i + 1; j < num_levels() - 1; j++) {
2801       if (compaction_score_[i] < compaction_score_[j]) {
2802         double score = compaction_score_[i];
2803         int level = compaction_level_[i];
2804         compaction_score_[i] = compaction_score_[j];
2805         compaction_level_[i] = compaction_level_[j];
2806         compaction_score_[j] = score;
2807         compaction_level_[j] = level;
2808       }
2809     }
2810   }
2811   ComputeFilesMarkedForCompaction();
2812   ComputeBottommostFilesMarkedForCompaction();
2813   if (mutable_cf_options.ttl > 0) {
2814     ComputeExpiredTtlFiles(immutable_options, mutable_cf_options.ttl);
2815   }
2816   if (mutable_cf_options.periodic_compaction_seconds > 0) {
2817     ComputeFilesMarkedForPeriodicCompaction(
2818         immutable_options, mutable_cf_options.periodic_compaction_seconds);
2819   }
2820 
2821   if (mutable_cf_options.enable_blob_garbage_collection &&
2822       mutable_cf_options.blob_garbage_collection_age_cutoff > 0.0 &&
2823       mutable_cf_options.blob_garbage_collection_force_threshold < 1.0) {
2824     ComputeFilesMarkedForForcedBlobGC(
2825         mutable_cf_options.blob_garbage_collection_age_cutoff,
2826         mutable_cf_options.blob_garbage_collection_force_threshold);
2827   }
2828 
2829   EstimateCompactionBytesNeeded(mutable_cf_options);
2830 }
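
// Illustrative sketch (hypothetical numbers): for leveled compaction with
// level0_file_num_compaction_trigger = 4, six non-compacting L0 files yield
// score = 6 / 4 = 1.5, while an L1 holding 500 MB of non-compacting
// compensated bytes against a 256 MB target yields score = 500 / 256 ~= 1.95.
// After the sort above, L1 (1.95) is considered before L0 (1.5); a score
// below 1.0 means the level does not currently need compaction.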
2831 
2832 void VersionStorageInfo::ComputeFilesMarkedForCompaction() {
2833   files_marked_for_compaction_.clear();
2834   int last_qualify_level = 0;
2835 
2836   // Do not include files from the last level that contains data.
2837   // If the table properties collector suggests a file on the last level,
2838   // we should not move it to a new level.
2839   for (int level = num_levels() - 1; level >= 1; level--) {
2840     if (!files_[level].empty()) {
2841       last_qualify_level = level - 1;
2842       break;
2843     }
2844   }
2845 
2846   for (int level = 0; level <= last_qualify_level; level++) {
2847     for (auto* f : files_[level]) {
2848       if (!f->being_compacted && f->marked_for_compaction) {
2849         files_marked_for_compaction_.emplace_back(level, f);
2850       }
2851     }
2852   }
2853 }
2854 
2855 void VersionStorageInfo::ComputeExpiredTtlFiles(
2856     const ImmutableOptions& ioptions, const uint64_t ttl) {
2857   assert(ttl > 0);
2858 
2859   expired_ttl_files_.clear();
2860 
2861   int64_t _current_time;
2862   auto status = ioptions.clock->GetCurrentTime(&_current_time);
2863   if (!status.ok()) {
2864     return;
2865   }
2866   const uint64_t current_time = static_cast<uint64_t>(_current_time);
2867 
2868   for (int level = 0; level < num_levels() - 1; level++) {
2869     for (FileMetaData* f : files_[level]) {
2870       if (!f->being_compacted) {
2871         uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
2872         if (oldest_ancester_time > 0 &&
2873             oldest_ancester_time < (current_time - ttl)) {
2874           expired_ttl_files_.emplace_back(level, f);
2875         }
2876       }
2877     }
2878   }
2879 }
2880 
2881 void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction(
2882     const ImmutableOptions& ioptions,
2883     const uint64_t periodic_compaction_seconds) {
2884   assert(periodic_compaction_seconds > 0);
2885 
2886   files_marked_for_periodic_compaction_.clear();
2887 
2888   int64_t temp_current_time;
2889   auto status = ioptions.clock->GetCurrentTime(&temp_current_time);
2890   if (!status.ok()) {
2891     return;
2892   }
2893   const uint64_t current_time = static_cast<uint64_t>(temp_current_time);
2894 
2895   // If periodic_compaction_seconds is larger than current time, periodic
2896   // compaction can't possibly be triggered.
2897   if (periodic_compaction_seconds > current_time) {
2898     return;
2899   }
2900 
2901   const uint64_t allowed_time_limit =
2902       current_time - periodic_compaction_seconds;
2903 
2904   for (int level = 0; level < num_levels(); level++) {
2905     for (auto f : files_[level]) {
2906       if (!f->being_compacted) {
2907         // Compute a file's modification time in the following order:
2908         // 1. Use file_creation_time table property if it is > 0.
2909         // 2. Use creation_time table property if it is > 0.
2910         // 3. Use file's mtime metadata if the above two table properties are 0.
2911         // Don't consider the file at all if the modification time cannot be
2912         // correctly determined based on the above conditions.
2913         uint64_t file_modification_time = f->TryGetFileCreationTime();
2914         if (file_modification_time == kUnknownFileCreationTime) {
2915           file_modification_time = f->TryGetOldestAncesterTime();
2916         }
2917         if (file_modification_time == kUnknownOldestAncesterTime) {
2918           auto file_path = TableFileName(ioptions.cf_paths, f->fd.GetNumber(),
2919                                          f->fd.GetPathId());
2920           status = ioptions.env->GetFileModificationTime(
2921               file_path, &file_modification_time);
2922           if (!status.ok()) {
2923             ROCKS_LOG_WARN(ioptions.logger,
2924                            "Can't get file modification time: %s: %s",
2925                            file_path.c_str(), status.ToString().c_str());
2926             continue;
2927           }
2928         }
2929         if (file_modification_time > 0 &&
2930             file_modification_time < allowed_time_limit) {
2931           files_marked_for_periodic_compaction_.emplace_back(level, f);
2932         }
2933       }
2934     }
2935   }
2936 }
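
// Illustrative sketch (hypothetical numbers): with
// periodic_compaction_seconds = 2592000 (30 days) and current_time =
// 1700000000, allowed_time_limit = 1697408000. Any non-compacting file whose
// resolved modification time (file creation time, oldest ancestor time, or
// filesystem mtime, tried in that order) is positive and below that limit is
// marked for periodic compaction.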
2937 
2938 void VersionStorageInfo::ComputeFilesMarkedForForcedBlobGC(
2939     double blob_garbage_collection_age_cutoff,
2940     double blob_garbage_collection_force_threshold) {
2941   files_marked_for_forced_blob_gc_.clear();
2942 
2943   if (blob_files_.empty()) {
2944     return;
2945   }
2946 
2947   // Number of blob files eligible for GC based on age
2948   const size_t cutoff_count = static_cast<size_t>(
2949       blob_garbage_collection_age_cutoff * blob_files_.size());
2950   if (!cutoff_count) {
2951     return;
2952   }
2953 
2954   // Compute the sum of total and garbage bytes over the oldest batch of blob
2955   // files. The oldest batch is defined as the set of blob files which are
2956   // kept alive by the same SSTs as the very oldest one. Here is a toy example.
2957   // Let's assume we have three SSTs 1, 2, and 3, and four blob files 10, 11,
2958   // 12, and 13. Also, let's say SSTs 1 and 2 both rely on blob file 10 and
2959   // potentially some higher-numbered ones, while SST 3 relies on blob file 12
2960   // and potentially some higher-numbered ones. Then, the SST to oldest blob
2961   // file mapping is as follows:
2962   //
2963   // SST file number               Oldest blob file number
2964   // 1                             10
2965   // 2                             10
2966   // 3                             12
2967   //
2968   // This is what the same thing looks like from the blob files' POV. (Note that
2969   // the linked SSTs simply denote the inverse mapping of the above.)
2970   //
2971   // Blob file number              Linked SST set
2972   // 10                            {1, 2}
2973   // 11                            {}
2974   // 12                            {3}
2975   // 13                            {}
2976   //
2977   // Then, the oldest batch of blob files consists of blob files 10 and 11,
2978   // and we can get rid of them by forcing the compaction of SSTs 1 and 2.
2979   //
2980   // Note that the overall ratio of garbage computed for the batch has to exceed
2981   // blob_garbage_collection_force_threshold and the entire batch has to be
2982   // eligible for GC according to blob_garbage_collection_age_cutoff in order
2983   // for us to schedule any compactions.
2984   const auto oldest_it = blob_files_.begin();
2985 
2986   const auto& oldest_meta = oldest_it->second;
2987   assert(oldest_meta);
2988 
2989   const auto& linked_ssts = oldest_meta->GetLinkedSsts();
2990   assert(!linked_ssts.empty());
2991 
2992   size_t count = 1;
2993   uint64_t sum_total_blob_bytes = oldest_meta->GetTotalBlobBytes();
2994   uint64_t sum_garbage_blob_bytes = oldest_meta->GetGarbageBlobBytes();
2995 
2996   auto it = oldest_it;
2997   for (++it; it != blob_files_.end(); ++it) {
2998     const auto& meta = it->second;
2999     assert(meta);
3000 
3001     if (!meta->GetLinkedSsts().empty()) {
3002       break;
3003     }
3004 
3005     if (++count > cutoff_count) {
3006       return;
3007     }
3008 
3009     sum_total_blob_bytes += meta->GetTotalBlobBytes();
3010     sum_garbage_blob_bytes += meta->GetGarbageBlobBytes();
3011   }
3012 
3013   if (sum_garbage_blob_bytes <
3014       blob_garbage_collection_force_threshold * sum_total_blob_bytes) {
3015     return;
3016   }
3017 
3018   for (uint64_t sst_file_number : linked_ssts) {
3019     const FileLocation location = GetFileLocation(sst_file_number);
3020     assert(location.IsValid());
3021 
3022     const int level = location.GetLevel();
3023     assert(level >= 0);
3024 
3025     const size_t pos = location.GetPosition();
3026 
3027     FileMetaData* const sst_meta = files_[level][pos];
3028     assert(sst_meta);
3029 
3030     if (sst_meta->being_compacted) {
3031       continue;
3032     }
3033 
3034     files_marked_for_forced_blob_gc_.emplace_back(level, sst_meta);
3035   }
3036 }
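
// Illustrative sketch (hypothetical numbers, continuing the toy example
// above): suppose blob files 10 and 11 form the oldest batch, holding 100 MB
// in total of which 60 MB is garbage. With a 0.5 age cutoff over four blob
// files (cutoff_count = 2) and a 0.5 force threshold, the batch is fully
// eligible (count 2 <= 2) and 60 MB >= 0.5 * 100 MB, so SSTs 1 and 2 are
// marked for forced compaction; rewriting them relocates their live blob
// references, after which blob files 10 and 11 become obsolete.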
3037 
3038 namespace {
3039 
3040 // used to sort files by size
3041 struct Fsize {
3042   size_t index;
3043   FileMetaData* file;
3044 };
3045 
3046 // Comparator that is used to sort files based on their size
3047 // In normal mode: descending size
3048 bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) {
3049   return (first.file->compensated_file_size >
3050       second.file->compensated_file_size);
3051 }
3052 } // anonymous namespace
3053 
3054 void VersionStorageInfo::AddFile(int level, FileMetaData* f) {
3055   auto& level_files = files_[level];
3056   level_files.push_back(f);
3057 
3058   f->refs++;
3059 
3060   const uint64_t file_number = f->fd.GetNumber();
3061 
3062   assert(file_locations_.find(file_number) == file_locations_.end());
3063   file_locations_.emplace(file_number,
3064                           FileLocation(level, level_files.size() - 1));
3065 }
3066 
3067 void VersionStorageInfo::AddBlobFile(
3068     std::shared_ptr<BlobFileMetaData> blob_file_meta) {
3069   assert(blob_file_meta);
3070 
3071   const uint64_t blob_file_number = blob_file_meta->GetBlobFileNumber();
3072 
3073   auto it = blob_files_.lower_bound(blob_file_number);
3074   assert(it == blob_files_.end() || it->first != blob_file_number);
3075 
3076   blob_files_.insert(
3077       it, BlobFiles::value_type(blob_file_number, std::move(blob_file_meta)));
3078 }
3079 
3080 // Version::PrepareApply() needs to be called before calling this function,
3081 // or the following functions need to have been called:
3082 // 1. UpdateNumNonEmptyLevels();
3083 // 2. CalculateBaseBytes();
3084 // 3. UpdateFilesByCompactionPri();
3085 // 4. GenerateFileIndexer();
3086 // 5. GenerateLevelFilesBrief();
3087 // 6. GenerateLevel0NonOverlapping();
3088 // 7. GenerateBottommostFiles();
3089 void VersionStorageInfo::SetFinalized() {
3090   finalized_ = true;
3091 #ifndef NDEBUG
3092   if (compaction_style_ != kCompactionStyleLevel) {
3093     // Not level based compaction.
3094     return;
3095   }
3096   assert(base_level_ < 0 || num_levels() == 1 ||
3097          (base_level_ >= 1 && base_level_ < num_levels()));
3098   // Verify that all levels between L0 and base_level are empty
3099   for (int level = 1; level < base_level(); level++) {
3100     assert(NumLevelBytes(level) == 0);
3101   }
3102   uint64_t max_bytes_prev_level = 0;
3103   for (int level = base_level(); level < num_levels() - 1; level++) {
3104     if (LevelFiles(level).size() == 0) {
3105       continue;
3106     }
3107     assert(MaxBytesForLevel(level) >= max_bytes_prev_level);
3108     max_bytes_prev_level = MaxBytesForLevel(level);
3109   }
3110   int num_empty_non_l0_level = 0;
3111   for (int level = 0; level < num_levels(); level++) {
3112     assert(LevelFiles(level).size() == 0 ||
3113            LevelFiles(level).size() == LevelFilesBrief(level).num_files);
3114     if (level > 0 && NumLevelBytes(level) > 0) {
3115       num_empty_non_l0_level++;
3116     }
3117     if (LevelFiles(level).size() > 0) {
3118       assert(level < num_non_empty_levels());
3119     }
3120   }
3121   assert(compaction_level_.size() > 0);
3122   assert(compaction_level_.size() == compaction_score_.size());
3123 #endif
3124 }
3125 
3126 void VersionStorageInfo::UpdateNumNonEmptyLevels() {
3127   num_non_empty_levels_ = num_levels_;
3128   for (int i = num_levels_ - 1; i >= 0; i--) {
3129     if (files_[i].size() != 0) {
3130       return;
3131     } else {
3132       num_non_empty_levels_ = i;
3133     }
3134   }
3135 }
3136 
3137 namespace {
3138 // Sort `temp` based on ratio of overlapping size over file size
3139 void SortFileByOverlappingRatio(
3140     const InternalKeyComparator& icmp, const std::vector<FileMetaData*>& files,
3141     const std::vector<FileMetaData*>& next_level_files,
3142     std::vector<Fsize>* temp) {
3143   std::unordered_map<uint64_t, uint64_t> file_to_order;
3144   auto next_level_it = next_level_files.begin();
3145 
3146   for (auto& file : files) {
3147     uint64_t overlapping_bytes = 0;
3148     // Skip next-level files that are entirely below the current file's range
3149     while (next_level_it != next_level_files.end() &&
3150            icmp.Compare((*next_level_it)->largest, file->smallest) < 0) {
3151       next_level_it++;
3152     }
3153 
3154     while (next_level_it != next_level_files.end() &&
3155            icmp.Compare((*next_level_it)->smallest, file->largest) < 0) {
3156       overlapping_bytes += (*next_level_it)->fd.file_size;
3157 
3158       if (icmp.Compare((*next_level_it)->largest, file->largest) > 0) {
3159         // The next-level file crosses the current file's largest-key boundary.
3160         break;
3161       }
3162       next_level_it++;
3163     }
3164 
3165     assert(file->compensated_file_size != 0);
3166     file_to_order[file->fd.GetNumber()] =
3167         overlapping_bytes * 1024u / file->compensated_file_size;
3168   }
3169 
3170   std::sort(temp->begin(), temp->end(),
3171             [&](const Fsize& f1, const Fsize& f2) -> bool {
3172               return file_to_order[f1.file->fd.GetNumber()] <
3173                      file_to_order[f2.file->fd.GetNumber()];
3174             });
3175 }
3176 }  // namespace
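
// Illustrative sketch (hypothetical numbers): a 64 MB file overlapping 32 MB
// of the next level gets order 32 * 1024 / 64 = 512, while a 64 MB file
// overlapping 256 MB gets order 4096. The first file sorts earlier and is
// preferred under kMinOverlappingRatio, since compacting it rewrites fewer
// next-level bytes per input byte (lower write amplification). The factor of
// 1024 merely preserves precision in integer arithmetic.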
3177 
3178 void VersionStorageInfo::UpdateFilesByCompactionPri(
3179     CompactionPri compaction_pri) {
3180   if (compaction_style_ == kCompactionStyleNone ||
3181       compaction_style_ == kCompactionStyleFIFO ||
3182       compaction_style_ == kCompactionStyleUniversal) {
3183     // don't need this
3184     return;
3185   }
3186   // No need to sort the highest level because it is never compacted.
3187   for (int level = 0; level < num_levels() - 1; level++) {
3188     const std::vector<FileMetaData*>& files = files_[level];
3189     auto& files_by_compaction_pri = files_by_compaction_pri_[level];
3190     assert(files_by_compaction_pri.size() == 0);
3191 
3192     // populate a temp vector for sorting based on size
3193     std::vector<Fsize> temp(files.size());
3194     for (size_t i = 0; i < files.size(); i++) {
3195       temp[i].index = i;
3196       temp[i].file = files[i];
3197     }
3198 
3199     // sort the top kNumberFilesToSort files based on file size
3200     size_t num = VersionStorageInfo::kNumberFilesToSort;
3201     if (num > temp.size()) {
3202       num = temp.size();
3203     }
3204     switch (compaction_pri) {
3205       case kByCompensatedSize:
3206         std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
3207                           CompareCompensatedSizeDescending);
3208         break;
3209       case kOldestLargestSeqFirst:
3210         std::sort(temp.begin(), temp.end(),
3211                   [](const Fsize& f1, const Fsize& f2) -> bool {
3212                     return f1.file->fd.largest_seqno <
3213                            f2.file->fd.largest_seqno;
3214                   });
3215         break;
3216       case kOldestSmallestSeqFirst:
3217         std::sort(temp.begin(), temp.end(),
3218                   [](const Fsize& f1, const Fsize& f2) -> bool {
3219                     return f1.file->fd.smallest_seqno <
3220                            f2.file->fd.smallest_seqno;
3221                   });
3222         break;
3223       case kMinOverlappingRatio:
3224         SortFileByOverlappingRatio(*internal_comparator_, files_[level],
3225                                    files_[level + 1], &temp);
3226         break;
3227       default:
3228         assert(false);
3229     }
3230     assert(temp.size() == files.size());
3231 
3232     // initialize files_by_compaction_pri_
3233     for (size_t i = 0; i < temp.size(); i++) {
3234       files_by_compaction_pri.push_back(static_cast<int>(temp[i].index));
3235     }
3236     next_file_to_compact_by_size_[level] = 0;
3237     assert(files_[level].size() == files_by_compaction_pri_[level].size());
3238   }
3239 }
3240 
3241 void VersionStorageInfo::GenerateLevel0NonOverlapping() {
3242   assert(!finalized_);
3243   level0_non_overlapping_ = true;
3244   if (level_files_brief_.size() == 0) {
3245     return;
3246   }
3247 
3248   // A copy of L0 files sorted by smallest key
3249   std::vector<FdWithKeyRange> level0_sorted_file(
3250       level_files_brief_[0].files,
3251       level_files_brief_[0].files + level_files_brief_[0].num_files);
3252   std::sort(level0_sorted_file.begin(), level0_sorted_file.end(),
3253             [this](const FdWithKeyRange& f1, const FdWithKeyRange& f2) -> bool {
3254               return (internal_comparator_->Compare(f1.smallest_key,
3255                                                     f2.smallest_key) < 0);
3256             });
3257 
3258   for (size_t i = 1; i < level0_sorted_file.size(); ++i) {
3259     FdWithKeyRange& f = level0_sorted_file[i];
3260     FdWithKeyRange& prev = level0_sorted_file[i - 1];
3261     if (internal_comparator_->Compare(prev.largest_key, f.smallest_key) >= 0) {
3262       level0_non_overlapping_ = false;
3263       break;
3264     }
3265   }
3266 }
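
// Illustrative sketch: given L0 files with user-key ranges [a, c], [d, f],
// and [e, g], sorting by smallest key keeps that order; the loop above then
// compares each prev.largest_key with f.smallest_key, finds c < d but f >= e,
// and so sets level0_non_overlapping_ = false. When the flag stays true, L0
// can be treated as a single sorted run rather than merging every file.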
3267 
3268 void VersionStorageInfo::GenerateBottommostFiles() {
3269   assert(!finalized_);
3270   assert(bottommost_files_.empty());
3271   for (size_t level = 0; level < level_files_brief_.size(); ++level) {
3272     for (size_t file_idx = 0; file_idx < level_files_brief_[level].num_files;
3273          ++file_idx) {
3274       const FdWithKeyRange& f = level_files_brief_[level].files[file_idx];
3275       int l0_file_idx;
3276       if (level == 0) {
3277         l0_file_idx = static_cast<int>(file_idx);
3278       } else {
3279         l0_file_idx = -1;
3280       }
3281       Slice smallest_user_key = ExtractUserKey(f.smallest_key);
3282       Slice largest_user_key = ExtractUserKey(f.largest_key);
3283       if (!RangeMightExistAfterSortedRun(smallest_user_key, largest_user_key,
3284                                          static_cast<int>(level),
3285                                          l0_file_idx)) {
3286         bottommost_files_.emplace_back(static_cast<int>(level),
3287                                        f.file_metadata);
3288       }
3289     }
3290   }
3291 }
3292 
3293 void VersionStorageInfo::UpdateOldestSnapshot(SequenceNumber seqnum) {
3294   assert(seqnum >= oldest_snapshot_seqnum_);
3295   oldest_snapshot_seqnum_ = seqnum;
3296   if (oldest_snapshot_seqnum_ > bottommost_files_mark_threshold_) {
3297     ComputeBottommostFilesMarkedForCompaction();
3298   }
3299 }
3300 
3301 void VersionStorageInfo::ComputeBottommostFilesMarkedForCompaction() {
3302   bottommost_files_marked_for_compaction_.clear();
3303   bottommost_files_mark_threshold_ = kMaxSequenceNumber;
3304   for (auto& level_and_file : bottommost_files_) {
3305     if (!level_and_file.second->being_compacted &&
3306         level_and_file.second->fd.largest_seqno != 0) {
3307       // largest_seqno might be nonzero due to containing the final key in an
3308       // earlier compaction, whose seqnum we didn't zero out. Multiple deletions
3309       // ensure the file really contains deleted or overwritten keys.
3310       if (level_and_file.second->fd.largest_seqno < oldest_snapshot_seqnum_) {
3311         bottommost_files_marked_for_compaction_.push_back(level_and_file);
3312       } else {
3313         bottommost_files_mark_threshold_ =
3314             std::min(bottommost_files_mark_threshold_,
3315                      level_and_file.second->fd.largest_seqno);
3316       }
3317     }
3318   }
3319 }
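
// Illustrative sketch (hypothetical numbers): with oldest_snapshot_seqnum_ =
// 100, a bottommost file whose largest_seqno = 50 is marked for compaction
// (its obsolete keys are no longer visible to any snapshot), while a file
// with largest_seqno = 120 merely lowers bottommost_files_mark_threshold_ to
// 120; once UpdateOldestSnapshot() raises the oldest snapshot past 120, the
// marking is recomputed and that file becomes eligible as well.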
3320 
3321 void Version::Ref() {
3322   ++refs_;
3323 }
3324 
3325 bool Version::Unref() {
3326   assert(refs_ >= 1);
3327   --refs_;
3328   if (refs_ == 0) {
3329     delete this;
3330     return true;
3331   }
3332   return false;
3333 }
3334 
3335 bool VersionStorageInfo::OverlapInLevel(int level,
3336                                         const Slice* smallest_user_key,
3337                                         const Slice* largest_user_key) {
3338   if (level >= num_non_empty_levels_) {
3339     // empty level, no overlap
3340     return false;
3341   }
3342   return SomeFileOverlapsRange(*internal_comparator_, (level > 0),
3343                                level_files_brief_[level], smallest_user_key,
3344                                largest_user_key);
3345 }
3346 
3347 // Store in "*inputs" all files in "level" that overlap [begin,end]
3348 // If hint_index is specified, then it points to a file in the
3349 // overlapping range.
3350 // If file_index is non-null, it is set to the index of one overlapping file.
3351 void VersionStorageInfo::GetOverlappingInputs(
3352     int level, const InternalKey* begin, const InternalKey* end,
3353     std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
3354     bool expand_range, InternalKey** next_smallest) const {
3355   if (level >= num_non_empty_levels_) {
3356     // this level is empty, no overlapping inputs
3357     return;
3358   }
3359 
3360   inputs->clear();
3361   if (file_index) {
3362     *file_index = -1;
3363   }
3364   const Comparator* user_cmp = user_comparator_;
3365   if (level > 0) {
3366     GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs, hint_index,
3367                                           file_index, false, next_smallest);
3368     return;
3369   }
3370 
3371   if (next_smallest) {
3372     // next_smallest key only makes sense for non-level 0, where files are
3373     // non-overlapping
3374     *next_smallest = nullptr;
3375   }
3376 
3377   Slice user_begin, user_end;
3378   if (begin != nullptr) {
3379     user_begin = begin->user_key();
3380   }
3381   if (end != nullptr) {
3382     user_end = end->user_key();
3383   }
3384 
3385   // "index" stores the file indices that still need to be checked.
3386   std::list<size_t> index;
3387   for (size_t i = 0; i < level_files_brief_[level].num_files; i++) {
3388     index.emplace_back(i);
3389   }
3390 
3391   while (!index.empty()) {
3392     bool found_overlapping_file = false;
3393     auto iter = index.begin();
3394     while (iter != index.end()) {
3395       FdWithKeyRange* f = &(level_files_brief_[level].files[*iter]);
3396       const Slice file_start = ExtractUserKey(f->smallest_key);
3397       const Slice file_limit = ExtractUserKey(f->largest_key);
3398       if (begin != nullptr &&
3399           user_cmp->CompareWithoutTimestamp(file_limit, user_begin) < 0) {
3400         // "f" is completely before specified range; skip it
3401         iter++;
3402       } else if (end != nullptr &&
3403                  user_cmp->CompareWithoutTimestamp(file_start, user_end) > 0) {
3404         // "f" is completely after specified range; skip it
3405         iter++;
3406       } else {
3407         // "f" overlaps the specified range
3408         inputs->emplace_back(files_[level][*iter]);
3409         found_overlapping_file = true;
3410         // record the first file index.
3411         if (file_index && *file_index == -1) {
3412           *file_index = static_cast<int>(*iter);
3413         }
3414         // This file overlaps; erase it so we don't check it again.
3415         iter = index.erase(iter);
3416         if (expand_range) {
3417           if (begin != nullptr &&
3418               user_cmp->CompareWithoutTimestamp(file_start, user_begin) < 0) {
3419             user_begin = file_start;
3420           }
3421           if (end != nullptr &&
3422               user_cmp->CompareWithoutTimestamp(file_limit, user_end) > 0) {
3423             user_end = file_limit;
3424           }
3425         }
3426       }
3427     }
3428     // If none of the remaining files overlap, stop.
3429     if (!found_overlapping_file) {
3430       break;
3431     }
3432   }
3433 }
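
// Illustrative sketch of expand_range on L0: with files covering user-key
// ranges [a, e], [d, h], and [j, k], a query for [c, d] first pulls in
// [a, e], widening the search range to [a, e]; [d, h] then overlaps the
// widened range and is pulled in too, extending it to [a, h], while [j, k]
// stays out. The outer loop repeats until a full pass adds no file, so every
// file sharing any user key with the expanded range ends up in *inputs.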
3434 
3435 // Store in "*inputs" the files in "level" that fall within range [begin,end].
3436 // Guarantee a "clean cut" boundary between the files in inputs and the
3437 // surrounding files, while including the maximum number of files.
3438 // This will ensure that no parts of a key are lost during compaction.
3439 // If hint_index is specified, then it points to a file in the range.
3440 // If file_index is non-null, it is set to the index of one file in the range.
3441 void VersionStorageInfo::GetCleanInputsWithinInterval(
3442     int level, const InternalKey* begin, const InternalKey* end,
3443     std::vector<FileMetaData*>* inputs, int hint_index, int* file_index) const {
3444   inputs->clear();
3445   if (file_index) {
3446     *file_index = -1;
3447   }
3448   if (level >= num_non_empty_levels_ || level == 0 ||
3449       level_files_brief_[level].num_files == 0) {
3450     // this level is empty, no inputs within range
3451     // also don't support clean input interval within L0
3452     return;
3453   }
3454 
3455   GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs,
3456                                         hint_index, file_index,
3457                                         true /* within_interval */);
3458 }
3459 
3460 // Store in "*inputs" all files in "level" that overlap [begin,end]
3461 // Employ binary search to find at least one file that overlaps the
3462 // specified range. From that file, iterate backwards and
3463 // forwards to find all overlapping files.
3464 // If within_interval is set, then only store the maximum set of clean inputs
3465 // within range [begin, end]. "clean" means there is a boundary
3466 // between the files in "*inputs" and the surrounding files
3467 void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
3468     int level, const InternalKey* begin, const InternalKey* end,
3469     std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
3470     bool within_interval, InternalKey** next_smallest) const {
3471   assert(level > 0);
3472 
3473   auto user_cmp = user_comparator_;
3474   const FdWithKeyRange* files = level_files_brief_[level].files;
3475   const int num_files = static_cast<int>(level_files_brief_[level].num_files);
3476 
3477   // Use binary search to find the lower bound
3478   // and the upper bound of the overlapping file range.
3479   int start_index = 0;
3480   int end_index = num_files;
3481 
3482   if (begin != nullptr) {
3483     // If within_interval is true, compare against the file's smallest key
3484     // so std::lower_bound only admits files starting at or after *begin.
3485     auto cmp = [&user_cmp, &within_interval](const FdWithKeyRange& f,
3486                                              const InternalKey* k) {
3487       auto& file_key = within_interval ? f.file_metadata->smallest
3488                                        : f.file_metadata->largest;
3489       return sstableKeyCompare(user_cmp, file_key, *k) < 0;
3490     };
3491 
3492     start_index = static_cast<int>(
3493         std::lower_bound(files,
3494                          files + (hint_index == -1 ? num_files : hint_index),
3495                          begin, cmp) -
3496         files);
3497 
3498     if (start_index > 0 && within_interval) {
3499       bool is_overlapping = true;
3500       while (is_overlapping && start_index < num_files) {
3501         auto& pre_limit = files[start_index - 1].file_metadata->largest;
3502         auto& cur_start = files[start_index].file_metadata->smallest;
3503         is_overlapping = sstableKeyCompare(user_cmp, pre_limit, cur_start) == 0;
3504         start_index += is_overlapping;
3505       }
3506     }
3507   }
3508 
3509   if (end != nullptr) {
3510     // If within_interval is true, compare against the file's largest key
3511     // so std::upper_bound only admits files ending at or before *end.
3512     auto cmp = [&user_cmp, &within_interval](const InternalKey* k,
3513                                              const FdWithKeyRange& f) {
3514       auto& file_key = within_interval ? f.file_metadata->largest
3515                                        : f.file_metadata->smallest;
3516       return sstableKeyCompare(user_cmp, *k, file_key) < 0;
3517     };
3518 
3519     end_index = static_cast<int>(
3520         std::upper_bound(files + start_index, files + num_files, end, cmp) -
3521         files);
3522 
3523     if (end_index < num_files && within_interval) {
3524       bool is_overlapping = true;
3525       while (is_overlapping && end_index > start_index) {
3526         auto& next_start = files[end_index].file_metadata->smallest;
3527         auto& cur_limit = files[end_index - 1].file_metadata->largest;
3528         is_overlapping =
3529             sstableKeyCompare(user_cmp, cur_limit, next_start) == 0;
3530         end_index -= is_overlapping;
3531       }
3532     }
3533   }
3534 
3535   assert(start_index <= end_index);
3536 
3537   // If there were no overlapping files, return immediately.
3538   if (start_index == end_index) {
3539     if (next_smallest) {
3540       *next_smallest = nullptr;
3541     }
3542     return;
3543   }
3544 
3545   assert(start_index < end_index);
3546 
3547   // Record in *file_index the index where an overlap was found
3548   if (file_index) {
3549     *file_index = start_index;
3550   }
3551 
3552   // insert overlapping files into vector
3553   for (int i = start_index; i < end_index; i++) {
3554     inputs->push_back(files_[level][i]);
3555   }
3556 
3557   if (next_smallest != nullptr) {
3558     // Provide the next key outside the range covered by inputs
3559     if (end_index < static_cast<int>(files_[level].size())) {
3560       **next_smallest = files_[level][end_index]->smallest;
3561     } else {
3562       *next_smallest = nullptr;
3563     }
3564   }
3565 }
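
// Illustrative sketch: for a level with files F0[a, c], F1[d, f], F2[g, i]
// and query [e, h], the lower_bound over largest keys puts start_index at F1
// (f is the first largest key >= e), and the upper_bound over smallest keys
// puts end_index past F2 (no smallest key exceeds h), so inputs = {F1, F2}.
// With within_interval = true the bounds use smallest/largest keys instead,
// keeping only files that lie entirely inside [e, h] -- here, none.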
3566 
3567 uint64_t VersionStorageInfo::NumLevelBytes(int level) const {
3568   assert(level >= 0);
3569   assert(level < num_levels());
3570   return TotalFileSize(files_[level]);
3571 }
3572 
3573 const char* VersionStorageInfo::LevelSummary(
3574     LevelSummaryStorage* scratch) const {
3575   int len = 0;
3576   if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
3577     assert(base_level_ < static_cast<int>(level_max_bytes_.size()));
3578     if (level_multiplier_ != 0.0) {
3579       len = snprintf(
3580           scratch->buffer, sizeof(scratch->buffer),
3581           "base level %d level multiplier %.2f max bytes base %" PRIu64 " ",
3582           base_level_, level_multiplier_, level_max_bytes_[base_level_]);
3583     }
3584   }
3585   len +=
3586       snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "files[");
3587   for (int i = 0; i < num_levels(); i++) {
3588     int sz = sizeof(scratch->buffer) - len;
3589     int ret = snprintf(scratch->buffer + len, sz, "%d ", int(files_[i].size()));
3590     if (ret < 0 || ret >= sz) break;
3591     len += ret;
3592   }
3593   if (len > 0) {
3594     // overwrite the last space
3595     --len;
3596   }
3597   len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
3598                   "] max score %.2f", compaction_score_[0]);
3599 
3600   if (!files_marked_for_compaction_.empty()) {
3601     snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
3602              " (%" ROCKSDB_PRIszt " files need compaction)",
3603              files_marked_for_compaction_.size());
3604   }
3605 
3606   return scratch->buffer;
3607 }
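
// Illustrative sketch (hypothetical output): for a leveled DB, the summary
// built above might read
//
//   base level 1 level multiplier 10.00 max bytes base 268435456
//   files[4 3 30 0 0 0 0] max score 1.02 (2 files need compaction)
//
// (a single line in practice): per-level file counts in order, the top
// compaction score, and an optional count of files explicitly marked for
// compaction.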
3608 
3609 const char* VersionStorageInfo::LevelFileSummary(FileSummaryStorage* scratch,
3610                                                  int level) const {
3611   int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size[");
3612   for (const auto& f : files_[level]) {
3613     int sz = sizeof(scratch->buffer) - len;
3614     char sztxt[16];
3615     AppendHumanBytes(f->fd.GetFileSize(), sztxt, sizeof(sztxt));
3616     int ret = snprintf(scratch->buffer + len, sz,
3617                        "#%" PRIu64 "(seq=%" PRIu64 ",sz=%s,%d) ",
3618                        f->fd.GetNumber(), f->fd.smallest_seqno, sztxt,
3619                        static_cast<int>(f->being_compacted));
3620     if (ret < 0 || ret >= sz)
3621       break;
3622     len += ret;
3623   }
3624   // overwrite the last space (only if files_[level].size() is non-zero)
3625   if (files_[level].size() && len > 0) {
3626     --len;
3627   }
3628   snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]");
3629   return scratch->buffer;
3630 }
3631 
3632 uint64_t VersionStorageInfo::MaxNextLevelOverlappingBytes() {
3633   uint64_t result = 0;
3634   std::vector<FileMetaData*> overlaps;
3635   for (int level = 1; level < num_levels() - 1; level++) {
3636     for (const auto& f : files_[level]) {
3637       GetOverlappingInputs(level + 1, &f->smallest, &f->largest, &overlaps);
3638       const uint64_t sum = TotalFileSize(overlaps);
3639       if (sum > result) {
3640         result = sum;
3641       }
3642     }
3643   }
3644   return result;
3645 }
3646 
3647 uint64_t VersionStorageInfo::MaxBytesForLevel(int level) const {
3648   // Note: the result for level zero is not really used since we set
3649   // the level-0 compaction threshold based on number of files.
3650   assert(level >= 0);
3651   assert(level < static_cast<int>(level_max_bytes_.size()));
3652   return level_max_bytes_[level];
3653 }
3654 
3655 void VersionStorageInfo::CalculateBaseBytes(const ImmutableOptions& ioptions,
3656                                             const MutableCFOptions& options) {
3657   // Special logic to set the number of sorted runs, matching the previous
3658   // behavior when all files were in L0.
3659   int num_l0_count = static_cast<int>(files_[0].size());
3660   if (compaction_style_ == kCompactionStyleUniversal) {
3661     // For universal compaction, we use level0 score to indicate
3662     // compaction score for the whole DB. We add other levels as if
3663     // they were L0 files.
3664     for (int i = 1; i < num_levels(); i++) {
3665       if (!files_[i].empty()) {
3666         num_l0_count++;
3667       }
3668     }
3669   }
3670   set_l0_delay_trigger_count(num_l0_count);
3671 
3672   level_max_bytes_.resize(ioptions.num_levels);
3673   if (!ioptions.level_compaction_dynamic_level_bytes) {
3674     base_level_ = (ioptions.compaction_style == kCompactionStyleLevel) ? 1 : -1;
3675 
3676     // Calculate for static bytes base case
3677     for (int i = 0; i < ioptions.num_levels; ++i) {
3678       if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) {
3679         level_max_bytes_[i] = options.max_bytes_for_level_base;
3680       } else if (i > 1) {
3681         level_max_bytes_[i] = MultiplyCheckOverflow(
3682             MultiplyCheckOverflow(level_max_bytes_[i - 1],
3683                                   options.max_bytes_for_level_multiplier),
3684             options.MaxBytesMultiplerAdditional(i - 1));
3685       } else {
3686         level_max_bytes_[i] = options.max_bytes_for_level_base;
3687       }
3688     }
3689   } else {
3690     uint64_t max_level_size = 0;
3691 
3692     int first_non_empty_level = -1;
3693     // Find the size of the non-L0 level with the most data.
3694     // We cannot use the size of the last level because it can be empty or
3695     // smaller than previous levels after compaction.
3696     for (int i = 1; i < num_levels_; i++) {
3697       uint64_t total_size = 0;
3698       for (const auto& f : files_[i]) {
3699         total_size += f->fd.GetFileSize();
3700       }
3701       if (total_size > 0 && first_non_empty_level == -1) {
3702         first_non_empty_level = i;
3703       }
3704       if (total_size > max_level_size) {
3705         max_level_size = total_size;
3706       }
3707     }
3708 
3709     // Prefill every level's max bytes to disallow compaction from there.
3710     for (int i = 0; i < num_levels_; i++) {
3711       level_max_bytes_[i] = std::numeric_limits<uint64_t>::max();
3712     }
3713 
3714     if (max_level_size == 0) {
3715       // No data for L1 and up. L0 compacts to last level directly.
3716       // No compaction from L1+ needs to be scheduled.
3717       base_level_ = num_levels_ - 1;
3718     } else {
3719       uint64_t l0_size = 0;
3720       for (const auto& f : files_[0]) {
3721         l0_size += f->fd.GetFileSize();
3722       }
3723 
3724       uint64_t base_bytes_max =
3725           std::max(options.max_bytes_for_level_base, l0_size);
3726       uint64_t base_bytes_min = static_cast<uint64_t>(
3727           base_bytes_max / options.max_bytes_for_level_multiplier);
3728 
3729       // Check whether we can make the last level's target size max_level_size
3730       uint64_t cur_level_size = max_level_size;
3731       for (int i = num_levels_ - 2; i >= first_non_empty_level; i--) {
3732         // Scale down by the level multiplier (the division truncates)
3733         cur_level_size = static_cast<uint64_t>(
3734             cur_level_size / options.max_bytes_for_level_multiplier);
3735       }
3736 
3737       // Calculate base level and its size.
3738       uint64_t base_level_size;
3739       if (cur_level_size <= base_bytes_min) {
3740         // Case 1. If we made the target size of the last level max_level_size,
3741         // the target size of the first non-empty level would be smaller than
3742         // base_bytes_min. We set it to base_bytes_min + 1 instead.
3743         base_level_size = base_bytes_min + 1U;
3744         base_level_ = first_non_empty_level;
3745         ROCKS_LOG_INFO(ioptions.logger,
3746                        "More existing levels in DB than needed. "
3747                        "max_bytes_for_level_multiplier may not be guaranteed.");
3748       } else {
3749         // Find base level (where L0 data is compacted to).
3750         base_level_ = first_non_empty_level;
3751         while (base_level_ > 1 && cur_level_size > base_bytes_max) {
3752           --base_level_;
3753           cur_level_size = static_cast<uint64_t>(
3754               cur_level_size / options.max_bytes_for_level_multiplier);
3755         }
3756         if (cur_level_size > base_bytes_max) {
3757           // Even L1 will be too large
3758           assert(base_level_ == 1);
3759           base_level_size = base_bytes_max;
3760         } else {
3761           base_level_size = cur_level_size;
3762         }
3763       }
3764 
3765       level_multiplier_ = options.max_bytes_for_level_multiplier;
3766       assert(base_level_size > 0);
3767       if (l0_size > base_level_size &&
3768           (l0_size > options.max_bytes_for_level_base ||
3769            static_cast<int>(files_[0].size() / 2) >=
3770                options.level0_file_num_compaction_trigger)) {
3771         // We adjust the base level according to actual L0 size, and adjust
3772         // the level multiplier accordingly, when:
3773         //   1. the L0 size is larger than level size base, or
3774         //   2. number of L0 files reaches twice the L0->L1 compaction trigger
3775         // We don't do this otherwise to keep the LSM-tree structure stable
3776         // unless the L0 compaction is backlogged.
3777         base_level_size = l0_size;
3778         if (base_level_ == num_levels_ - 1) {
3779           level_multiplier_ = 1.0;
3780         } else {
3781           level_multiplier_ = std::pow(
3782               static_cast<double>(max_level_size) /
3783                   static_cast<double>(base_level_size),
3784               1.0 / static_cast<double>(num_levels_ - base_level_ - 1));
3785         }
3786       }
3787 
3788       uint64_t level_size = base_level_size;
3789       for (int i = base_level_; i < num_levels_; i++) {
3790         if (i > base_level_) {
3791           level_size = MultiplyCheckOverflow(level_size, level_multiplier_);
3792         }
3793         // Don't set any level below base_bytes_max. Otherwise, the LSM can
3794         // assume an hourglass shape where L1+ sizes are smaller than L0. This
3795         // causes compaction scoring, which depends on level sizes, to favor L1+
3796         // at the expense of L0, which may fill up and stall.
3797         level_max_bytes_[i] = std::max(level_size, base_bytes_max);
3798       }
3799     }
3800   }
3801 }
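
// Illustrative sketch (hypothetical numbers) of the dynamic-level-bytes path
// above: with num_levels = 7, max_bytes_for_level_multiplier = 10,
// max_bytes_for_level_base = 256 MB, a small L0, and 100 GB in L6 as the
// largest level, the while loop walks 100 GB -> 10 GB -> 1 GB -> 102.4 MB,
// stopping once the size no longer exceeds base_bytes_max (256 MB). That
// yields base_level_ = 3 with base_level_size ~= 102.4 MB, and target sizes
// L3 = 256 MB (floored at base_bytes_max), L4 ~= 1 GB, L5 ~= 10 GB,
// L6 ~= 102 GB, while L1 and L2 keep uint64 max so nothing compacts from
// them.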
3802 
3803 uint64_t VersionStorageInfo::EstimateLiveDataSize() const {
3804   // Estimate the live data size by adding up the sizes of a maximal set of
3805   // sst files with no range overlap in the same or a higher level. The less
3806   // compacted the data, the more optimistic (smaller) this estimate is. Also,
3807   // for multiple sorted runs within a level, file order will matter.
3808   uint64_t size = 0;
3809 
3810   auto ikey_lt = [this](InternalKey* x, InternalKey* y) {
3811     return internal_comparator_->Compare(*x, *y) < 0;
3812   };
3813   // (Ordered) map of largest keys in files being included in size estimate
3814   std::map<InternalKey*, FileMetaData*, decltype(ikey_lt)> ranges(ikey_lt);
3815 
3816   for (int l = num_levels_ - 1; l >= 0; l--) {
3817     bool found_end = false;
3818     for (auto file : files_[l]) {
3819       // Find the first file already included whose largest key is larger
3820       // than the smallest key of `file`. If that file does not overlap with
3821       // the current file, none of the files in the map do. If there is
3822       // no potential overlap, we can safely insert the rest of this level
3823       // (if the level is not 0) into the map without checking again, because
3824       // the elements in the level are sorted and non-overlapping.
3825       auto lb = (found_end && l != 0) ?
3826         ranges.end() : ranges.lower_bound(&file->smallest);
3827       found_end = (lb == ranges.end());
3828       if (found_end || internal_comparator_->Compare(
3829             file->largest, (*lb).second->smallest) < 0) {
3830           ranges.emplace_hint(lb, &file->largest, file);
3831           size += file->fd.file_size;
3832       }
3833     }
3834   }
3835   // For BlobDB, the result also includes the exact value of live bytes in the
3836   // blob files of the version.
3837   const auto& blobFiles = GetBlobFiles();
3838   for (const auto& pair : blobFiles) {
3839     const auto& meta = pair.second;
3840     size += meta->GetTotalBlobBytes();
3841     size -= meta->GetGarbageBlobBytes();
3842   }
3843   return size;
3844 }
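
// Illustrative example of the accumulation above (hypothetical files, not
// part of the implementation). Walking levels bottom-up, a file is counted
// only if its key range does not overlap anything already in `ranges`:
//
//   L2: [a..f] [g..m]   // both inserted; sizes counted
//   L1: [c..e]          // overlaps [a..f] already in the map -> skipped
//   L0: [n..p]          // no overlap -> inserted; size counted
//
// The estimate is the sum of the two L2 files plus the L0 file; the newer,
// not-yet-compacted duplicate range [c..e] is excluded.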
3845 
bool VersionStorageInfo::RangeMightExistAfterSortedRun(
3847     const Slice& smallest_user_key, const Slice& largest_user_key,
3848     int last_level, int last_l0_idx) {
3849   assert((last_l0_idx != -1) == (last_level == 0));
3850   // TODO(ajkr): this preserves earlier behavior where we considered an L0 file
3851   // bottommost only if it's the oldest L0 file and there are no files on older
3852   // levels. It'd be better to consider it bottommost if there's no overlap in
3853   // older levels/files.
3854   if (last_level == 0 &&
3855       last_l0_idx != static_cast<int>(LevelFiles(0).size() - 1)) {
3856     return true;
3857   }
3858 
  // Checks whether there are files living beyond the `last_level`. If lower
  // levels have files, it checks for overlap between [`smallest_key`,
  // `largest_key`] and those files. Bottom-level optimizations can be made if
  // there are no files in lower levels or if there is no overlap with the
  // files in the lower levels.
3864   for (int level = last_level + 1; level < num_levels(); level++) {
3865     // The range is not in the bottommost level if there are files in lower
3866     // levels when the `last_level` is 0 or if there are files in lower levels
3867     // which overlap with [`smallest_key`, `largest_key`].
3868     if (files_[level].size() > 0 &&
3869         (last_level == 0 ||
3870          OverlapInLevel(level, &smallest_user_key, &largest_user_key))) {
3871       return true;
3872     }
3873   }
3874   return false;
3875 }
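
// Example (hypothetical): with last_level == 1 and the user key range
// ['k', 'p'], the loop above returns true iff some file in L2 or below
// overlaps ['k', 'p']. If nothing below L1 overlaps, the range is effectively
// bottommost, and callers may apply bottom-level optimizations such as
// dropping tombstones that cover it.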
3876 
void Version::AddLiveFiles(std::vector<uint64_t>* live_table_files,
3878                            std::vector<uint64_t>* live_blob_files) const {
3879   assert(live_table_files);
3880   assert(live_blob_files);
3881 
3882   for (int level = 0; level < storage_info_.num_levels(); ++level) {
3883     const auto& level_files = storage_info_.LevelFiles(level);
3884     for (const auto& meta : level_files) {
3885       assert(meta);
3886 
3887       live_table_files->emplace_back(meta->fd.GetNumber());
3888     }
3889   }
3890 
3891   const auto& blob_files = storage_info_.GetBlobFiles();
3892   for (const auto& pair : blob_files) {
3893     const auto& meta = pair.second;
3894     assert(meta);
3895 
3896     live_blob_files->emplace_back(meta->GetBlobFileNumber());
3897   }
3898 }
3899 
std::string Version::DebugString(bool hex, bool print_stats) const {
3901   std::string r;
3902   for (int level = 0; level < storage_info_.num_levels_; level++) {
3903     // E.g.,
3904     //   --- level 1 ---
3905     //   17:123[1 .. 124]['a' .. 'd']
3906     //   20:43[124 .. 128]['e' .. 'g']
3907     //
3908     // if print_stats=true:
3909     //   17:123[1 .. 124]['a' .. 'd'](4096)
3910     r.append("--- level ");
3911     AppendNumberTo(&r, level);
3912     r.append(" --- version# ");
3913     AppendNumberTo(&r, version_number_);
3914     r.append(" ---\n");
3915     const std::vector<FileMetaData*>& files = storage_info_.files_[level];
3916     for (size_t i = 0; i < files.size(); i++) {
3917       r.push_back(' ');
3918       AppendNumberTo(&r, files[i]->fd.GetNumber());
3919       r.push_back(':');
3920       AppendNumberTo(&r, files[i]->fd.GetFileSize());
3921       r.append("[");
3922       AppendNumberTo(&r, files[i]->fd.smallest_seqno);
3923       r.append(" .. ");
3924       AppendNumberTo(&r, files[i]->fd.largest_seqno);
3925       r.append("]");
3926       r.append("[");
3927       r.append(files[i]->smallest.DebugString(hex));
3928       r.append(" .. ");
3929       r.append(files[i]->largest.DebugString(hex));
3930       r.append("]");
3931       if (files[i]->oldest_blob_file_number != kInvalidBlobFileNumber) {
3932         r.append(" blob_file:");
3933         AppendNumberTo(&r, files[i]->oldest_blob_file_number);
3934       }
3935       if (print_stats) {
3936         r.append("(");
3937         r.append(ToString(
3938             files[i]->stats.num_reads_sampled.load(std::memory_order_relaxed)));
3939         r.append(")");
3940       }
3941       r.append("\n");
3942     }
3943   }
3944 
3945   const auto& blob_files = storage_info_.GetBlobFiles();
3946   if (!blob_files.empty()) {
3947     r.append("--- blob files --- version# ");
3948     AppendNumberTo(&r, version_number_);
3949     r.append(" ---\n");
3950     for (const auto& pair : blob_files) {
3951       const auto& blob_file_meta = pair.second;
3952       assert(blob_file_meta);
3953 
3954       r.append(blob_file_meta->DebugString());
3955       r.push_back('\n');
3956     }
3957   }
3958 
3959   return r;
3960 }
3961 
3962 // this is used to batch writes to the manifest file
3963 struct VersionSet::ManifestWriter {
3964   Status status;
3965   bool done;
3966   InstrumentedCondVar cv;
3967   ColumnFamilyData* cfd;
3968   const MutableCFOptions mutable_cf_options;
3969   const autovector<VersionEdit*>& edit_list;
3970   const std::function<void(const Status&)> manifest_write_callback;
3971 
  explicit ManifestWriter(
3973       InstrumentedMutex* mu, ColumnFamilyData* _cfd,
3974       const MutableCFOptions& cf_options, const autovector<VersionEdit*>& e,
3975       const std::function<void(const Status&)>& manifest_wcb)
3976       : done(false),
3977         cv(mu),
3978         cfd(_cfd),
3979         mutable_cf_options(cf_options),
3980         edit_list(e),
3981         manifest_write_callback(manifest_wcb) {}
  ~ManifestWriter() { status.PermitUncheckedError(); }
3983 
  bool IsAllWalEdits() const {
3985     bool all_wal_edits = true;
3986     for (const auto& e : edit_list) {
3987       if (!e->IsWalManipulation()) {
3988         all_wal_edits = false;
3989         break;
3990       }
3991     }
3992     return all_wal_edits;
3993   }
3994 };
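
// Sketch of how ManifestWriter supports group commit (simplified and
// illustrative; the real logic is in LogAndApply() and
// ProcessManifestWrites() below). Each caller enqueues a writer and blocks on
// its condition variable; the writer at the head of the queue commits every
// compatible pending writer in a single MANIFEST write:
//
//   ManifestWriter w(mu, cfd, mutable_cf_options, edit_list, callback);
//   manifest_writers_.push_back(&w);
//   while (!w.done && &w != manifest_writers_.front()) {
//     w.cv.Wait();  // another thread may commit this edit on our behalf
//   }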
3995 
Status AtomicGroupReadBuffer::AddEdit(VersionEdit* edit) {
3997   assert(edit);
3998   if (edit->is_in_atomic_group_) {
3999     TEST_SYNC_POINT("AtomicGroupReadBuffer::AddEdit:AtomicGroup");
4000     if (replay_buffer_.empty()) {
4001       replay_buffer_.resize(edit->remaining_entries_ + 1);
4002       TEST_SYNC_POINT_CALLBACK(
4003           "AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", edit);
4004     }
4005     read_edits_in_atomic_group_++;
4006     if (read_edits_in_atomic_group_ + edit->remaining_entries_ !=
4007         static_cast<uint32_t>(replay_buffer_.size())) {
4008       TEST_SYNC_POINT_CALLBACK(
4009           "AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize", edit);
4010       return Status::Corruption("corrupted atomic group");
4011     }
4012     replay_buffer_[read_edits_in_atomic_group_ - 1] = *edit;
4013     if (read_edits_in_atomic_group_ == replay_buffer_.size()) {
4014       TEST_SYNC_POINT_CALLBACK(
4015           "AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", edit);
4016       return Status::OK();
4017     }
4018     return Status::OK();
4019   }
4020 
4021   // A normal edit.
4022   if (!replay_buffer().empty()) {
4023     TEST_SYNC_POINT_CALLBACK(
4024         "AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits", edit);
4025     return Status::Corruption("corrupted atomic group");
4026   }
4027   return Status::OK();
4028 }
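
// Example of a well-formed atomic group (hypothetical edits): three edits
// carrying remaining_entries_ = 2, 1, 0, in that order. AddEdit() sizes the
// replay buffer from the first edit (2 + 1 == 3 slots), and the invariant
//   read_edits_in_atomic_group_ + remaining_entries_ == replay_buffer_.size()
// holds for every edit; any other count is reported as corruption.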
4029 
bool AtomicGroupReadBuffer::IsFull() const {
4031   return read_edits_in_atomic_group_ == replay_buffer_.size();
4032 }
4033 
bool AtomicGroupReadBuffer::IsEmpty() const { return replay_buffer_.empty(); }
4035 
void AtomicGroupReadBuffer::Clear() {
4037   read_edits_in_atomic_group_ = 0;
4038   replay_buffer_.clear();
4039 }
4040 
VersionSet::VersionSet(const std::string& dbname,
4042                        const ImmutableDBOptions* _db_options,
4043                        const FileOptions& storage_options, Cache* table_cache,
4044                        WriteBufferManager* write_buffer_manager,
4045                        WriteController* write_controller,
4046                        BlockCacheTracer* const block_cache_tracer,
4047                        const std::shared_ptr<IOTracer>& io_tracer,
4048                        const std::string& db_session_id)
4049     : column_family_set_(
4050           new ColumnFamilySet(dbname, _db_options, storage_options, table_cache,
4051                               write_buffer_manager, write_controller,
4052                               block_cache_tracer, io_tracer, db_session_id)),
4053       table_cache_(table_cache),
4054       env_(_db_options->env),
4055       fs_(_db_options->fs, io_tracer),
4056       clock_(_db_options->clock),
4057       dbname_(dbname),
4058       db_options_(_db_options),
4059       next_file_number_(2),
4060       manifest_file_number_(0),  // Filled by Recover()
4061       options_file_number_(0),
4062       options_file_size_(0),
4063       pending_manifest_file_number_(0),
4064       last_sequence_(0),
4065       last_allocated_sequence_(0),
4066       last_published_sequence_(0),
4067       prev_log_number_(0),
4068       current_version_number_(0),
4069       manifest_file_size_(0),
4070       file_options_(storage_options),
4071       block_cache_tracer_(block_cache_tracer),
4072       io_tracer_(io_tracer),
4073       db_session_id_(db_session_id) {}
4074 
VersionSet::~VersionSet() {
4076   // we need to delete column_family_set_ because its destructor depends on
4077   // VersionSet
4078   column_family_set_.reset();
4079   for (auto& file : obsolete_files_) {
4080     if (file.metadata->table_reader_handle) {
4081       table_cache_->Release(file.metadata->table_reader_handle);
4082       TableCache::Evict(table_cache_, file.metadata->fd.GetNumber());
4083     }
4084     file.DeleteMetadata();
4085   }
4086   obsolete_files_.clear();
4087   io_status_.PermitUncheckedError();
4088 }
4089 
void VersionSet::Reset() {
4091   if (column_family_set_) {
4092     WriteBufferManager* wbm = column_family_set_->write_buffer_manager();
4093     WriteController* wc = column_family_set_->write_controller();
4094     column_family_set_.reset(new ColumnFamilySet(
4095         dbname_, db_options_, file_options_, table_cache_, wbm, wc,
4096         block_cache_tracer_, io_tracer_, db_session_id_));
4097   }
4098   db_id_.clear();
4099   next_file_number_.store(2);
4100   min_log_number_to_keep_2pc_.store(0);
4101   manifest_file_number_ = 0;
4102   options_file_number_ = 0;
4103   pending_manifest_file_number_ = 0;
4104   last_sequence_.store(0);
4105   last_allocated_sequence_.store(0);
4106   last_published_sequence_.store(0);
4107   prev_log_number_ = 0;
4108   descriptor_log_.reset();
4109   current_version_number_ = 0;
4110   manifest_writers_.clear();
4111   manifest_file_size_ = 0;
4112   obsolete_files_.clear();
4113   obsolete_manifests_.clear();
4114   wals_.Reset();
4115 }
4116 
void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
4118                                Version* v) {
4119   // compute new compaction score
4120   v->storage_info()->ComputeCompactionScore(
4121       *column_family_data->ioptions(),
4122       *column_family_data->GetLatestMutableCFOptions());
4123 
4124   // Mark v finalized
4125   v->storage_info_.SetFinalized();
4126 
4127   // Make "v" current
4128   assert(v->refs_ == 0);
4129   Version* current = column_family_data->current();
4130   assert(v != current);
4131   if (current != nullptr) {
4132     assert(current->refs_ > 0);
4133     current->Unref();
4134   }
4135   column_family_data->SetCurrent(v);
4136   v->Ref();
4137 
4138   // Append to linked list
4139   v->prev_ = column_family_data->dummy_versions()->prev_;
4140   v->next_ = column_family_data->dummy_versions();
4141   v->prev_->next_ = v;
4142   v->next_->prev_ = v;
4143 }
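
// The versions of a column family form a circular doubly-linked list anchored
// at the dummy version, e.g. after appending v3:
//
//   dummy <-> v1 <-> v2 <-> v3 <-> (back to dummy)
//
// so dummy_versions()->prev_ is always the newest (current) version.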
4144 
Status VersionSet::ProcessManifestWrites(
4146     std::deque<ManifestWriter>& writers, InstrumentedMutex* mu,
4147     FSDirectory* db_directory, bool new_descriptor_log,
4148     const ColumnFamilyOptions* new_cf_options) {
4149   mu->AssertHeld();
4150   assert(!writers.empty());
4151   ManifestWriter& first_writer = writers.front();
4152   ManifestWriter* last_writer = &first_writer;
4153 
4154   assert(!manifest_writers_.empty());
4155   assert(manifest_writers_.front() == &first_writer);
4156 
4157   autovector<VersionEdit*> batch_edits;
4158   autovector<Version*> versions;
4159   autovector<const MutableCFOptions*> mutable_cf_options_ptrs;
4160   std::vector<std::unique_ptr<BaseReferencedVersionBuilder>> builder_guards;
4161 
4162   if (first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
4163     // No group commits for column family add or drop
4164     LogAndApplyCFHelper(first_writer.edit_list.front());
4165     batch_edits.push_back(first_writer.edit_list.front());
4166   } else {
4167     auto it = manifest_writers_.cbegin();
4168     size_t group_start = std::numeric_limits<size_t>::max();
4169     while (it != manifest_writers_.cend()) {
4170       if ((*it)->edit_list.front()->IsColumnFamilyManipulation()) {
4171         // no group commits for column family add or drop
4172         break;
4173       }
4174       last_writer = *(it++);
4175       assert(last_writer != nullptr);
4176       assert(last_writer->cfd != nullptr);
4177       if (last_writer->cfd->IsDropped()) {
        // If we detect a dropped CF at this point, and the corresponding
        // version edits belong to an atomic group, then we need to find the
        // preceding version edits in the same atomic group and update their
        // `remaining_entries_` member variable, because we are NOT going to
        // write the dropped CF's version edits to the MANIFEST. If we don't
        // update them, then Recover can report a corrupted atomic group
        // because the `remaining_entries_` counts do not match.
4185         if (!batch_edits.empty()) {
4186           if (batch_edits.back()->is_in_atomic_group_ &&
4187               batch_edits.back()->remaining_entries_ > 0) {
4188             assert(group_start < batch_edits.size());
4189             const auto& edit_list = last_writer->edit_list;
4190             size_t k = 0;
4191             while (k < edit_list.size()) {
4192               if (!edit_list[k]->is_in_atomic_group_) {
4193                 break;
4194               } else if (edit_list[k]->remaining_entries_ == 0) {
4195                 ++k;
4196                 break;
4197               }
4198               ++k;
4199             }
4200             for (auto i = group_start; i < batch_edits.size(); ++i) {
4201               assert(static_cast<uint32_t>(k) <=
4202                      batch_edits.back()->remaining_entries_);
4203               batch_edits[i]->remaining_entries_ -= static_cast<uint32_t>(k);
4204             }
4205           }
4206         }
4207         continue;
4208       }
4209       // We do a linear search on versions because versions is small.
4210       // TODO(yanqin) maybe consider unordered_map
4211       Version* version = nullptr;
4212       VersionBuilder* builder = nullptr;
4213       for (int i = 0; i != static_cast<int>(versions.size()); ++i) {
4214         uint32_t cf_id = last_writer->cfd->GetID();
4215         if (versions[i]->cfd()->GetID() == cf_id) {
4216           version = versions[i];
4217           assert(!builder_guards.empty() &&
4218                  builder_guards.size() == versions.size());
4219           builder = builder_guards[i]->version_builder();
4220           TEST_SYNC_POINT_CALLBACK(
4221               "VersionSet::ProcessManifestWrites:SameColumnFamily", &cf_id);
4222           break;
4223         }
4224       }
4225       if (version == nullptr) {
4226         // WAL manipulations do not need to be applied to versions.
4227         if (!last_writer->IsAllWalEdits()) {
4228           version = new Version(last_writer->cfd, this, file_options_,
4229                                 last_writer->mutable_cf_options, io_tracer_,
4230                                 current_version_number_++);
4231           versions.push_back(version);
4232           mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options);
4233           builder_guards.emplace_back(
4234               new BaseReferencedVersionBuilder(last_writer->cfd));
4235           builder = builder_guards.back()->version_builder();
4236         }
4237         assert(last_writer->IsAllWalEdits() || builder);
4238         assert(last_writer->IsAllWalEdits() || version);
4239         TEST_SYNC_POINT_CALLBACK("VersionSet::ProcessManifestWrites:NewVersion",
4240                                  version);
4241       }
4242       for (const auto& e : last_writer->edit_list) {
4243         if (e->is_in_atomic_group_) {
4244           if (batch_edits.empty() || !batch_edits.back()->is_in_atomic_group_ ||
4245               (batch_edits.back()->is_in_atomic_group_ &&
4246                batch_edits.back()->remaining_entries_ == 0)) {
4247             group_start = batch_edits.size();
4248           }
4249         } else if (group_start != std::numeric_limits<size_t>::max()) {
4250           group_start = std::numeric_limits<size_t>::max();
4251         }
4252         Status s = LogAndApplyHelper(last_writer->cfd, builder, e, mu);
4253         if (!s.ok()) {
4254           // free up the allocated memory
4255           for (auto v : versions) {
4256             delete v;
4257           }
4258           return s;
4259         }
4260         batch_edits.push_back(e);
4261       }
4262     }
4263     for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
4264       assert(!builder_guards.empty() &&
4265              builder_guards.size() == versions.size());
4266       auto* builder = builder_guards[i]->version_builder();
4267       Status s = builder->SaveTo(versions[i]->storage_info());
4268       if (!s.ok()) {
4269         // free up the allocated memory
4270         for (auto v : versions) {
4271           delete v;
4272         }
4273         return s;
4274       }
4275     }
4276   }
4277 
4278 #ifndef NDEBUG
4279   // Verify that version edits of atomic groups have correct
4280   // remaining_entries_.
4281   size_t k = 0;
4282   while (k < batch_edits.size()) {
4283     while (k < batch_edits.size() && !batch_edits[k]->is_in_atomic_group_) {
4284       ++k;
4285     }
4286     if (k == batch_edits.size()) {
4287       break;
4288     }
4289     size_t i = k;
4290     while (i < batch_edits.size()) {
4291       if (!batch_edits[i]->is_in_atomic_group_) {
4292         break;
4293       }
4294       assert(i - k + batch_edits[i]->remaining_entries_ ==
4295              batch_edits[k]->remaining_entries_);
4296       if (batch_edits[i]->remaining_entries_ == 0) {
4297         ++i;
4298         break;
4299       }
4300       ++i;
4301     }
4302     assert(batch_edits[i - 1]->is_in_atomic_group_);
4303     assert(0 == batch_edits[i - 1]->remaining_entries_);
4304     std::vector<VersionEdit*> tmp;
4305     for (size_t j = k; j != i; ++j) {
4306       tmp.emplace_back(batch_edits[j]);
4307     }
4308     TEST_SYNC_POINT_CALLBACK(
4309         "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", &tmp);
4310     k = i;
4311   }
4312 #endif  // NDEBUG
4313 
4314   assert(pending_manifest_file_number_ == 0);
4315   if (!descriptor_log_ ||
4316       manifest_file_size_ > db_options_->max_manifest_file_size) {
4317     TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:BeforeNewManifest");
4318     new_descriptor_log = true;
4319   } else {
4320     pending_manifest_file_number_ = manifest_file_number_;
4321   }
4322 
  // Local cached copy of state variable(s). WriteCurrentStateToManifest()
  // reads its content after releasing the db mutex to avoid a race with
  // SwitchMemtable().
4326   std::unordered_map<uint32_t, MutableCFState> curr_state;
4327   VersionEdit wal_additions;
4328   if (new_descriptor_log) {
4329     pending_manifest_file_number_ = NewFileNumber();
4330     batch_edits.back()->SetNextFile(next_file_number_.load());
4331 
4332     // if we are writing out new snapshot make sure to persist max column
4333     // family.
4334     if (column_family_set_->GetMaxColumnFamily() > 0) {
4335       first_writer.edit_list.front()->SetMaxColumnFamily(
4336           column_family_set_->GetMaxColumnFamily());
4337     }
4338     for (const auto* cfd : *column_family_set_) {
4339       assert(curr_state.find(cfd->GetID()) == curr_state.end());
4340       curr_state.emplace(std::make_pair(
4341           cfd->GetID(),
4342           MutableCFState(cfd->GetLogNumber(), cfd->GetFullHistoryTsLow())));
4343     }
4344 
4345     for (const auto& wal : wals_.GetWals()) {
4346       wal_additions.AddWal(wal.first, wal.second);
4347     }
4348   }
4349 
4350   uint64_t new_manifest_file_size = 0;
4351   Status s;
4352   IOStatus io_s;
4353   IOStatus manifest_io_status;
4354   {
4355     FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
4356     mu->Unlock();
4357     TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifestStart");
4358     TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WriteManifest", nullptr);
4359     if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
4360       for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
4361         assert(!builder_guards.empty() &&
4362                builder_guards.size() == versions.size());
4363         assert(!mutable_cf_options_ptrs.empty() &&
4364                builder_guards.size() == versions.size());
4365         ColumnFamilyData* cfd = versions[i]->cfd_;
4366         s = builder_guards[i]->version_builder()->LoadTableHandlers(
4367             cfd->internal_stats(), 1 /* max_threads */,
4368             true /* prefetch_index_and_filter_in_cache */,
4369             false /* is_initial_load */,
4370             mutable_cf_options_ptrs[i]->prefix_extractor.get(),
4371             MaxFileSizeForL0MetaPin(*mutable_cf_options_ptrs[i]));
4372         if (!s.ok()) {
4373           if (db_options_->paranoid_checks) {
4374             break;
4375           }
4376           s = Status::OK();
4377         }
4378       }
4379     }
4380 
4381     if (s.ok() && new_descriptor_log) {
      // This is fine because everything inside this block is serialized --
      // only one thread can be here at the same time.
      // Create the new manifest file.
4385       ROCKS_LOG_INFO(db_options_->info_log, "Creating manifest %" PRIu64 "\n",
4386                      pending_manifest_file_number_);
4387       std::string descriptor_fname =
4388           DescriptorFileName(dbname_, pending_manifest_file_number_);
4389       std::unique_ptr<FSWritableFile> descriptor_file;
4390       io_s = NewWritableFile(fs_.get(), descriptor_fname, &descriptor_file,
4391                              opt_file_opts);
4392       if (io_s.ok()) {
4393         descriptor_file->SetPreallocationBlockSize(
4394             db_options_->manifest_preallocation_size);
4395         FileTypeSet tmp_set = db_options_->checksum_handoff_file_types;
4396         std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
4397             std::move(descriptor_file), descriptor_fname, opt_file_opts, clock_,
4398             io_tracer_, nullptr, db_options_->listeners, nullptr,
4399             tmp_set.Contains(FileType::kDescriptorFile),
4400             tmp_set.Contains(FileType::kDescriptorFile)));
4401         descriptor_log_.reset(
4402             new log::Writer(std::move(file_writer), 0, false));
4403         s = WriteCurrentStateToManifest(curr_state, wal_additions,
4404                                         descriptor_log_.get(), io_s);
4405       } else {
4406         manifest_io_status = io_s;
4407         s = io_s;
4408       }
4409     }
4410 
4411     if (s.ok()) {
4412       if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
4413         for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
4414           versions[i]->PrepareApply(*mutable_cf_options_ptrs[i], true);
4415         }
4416       }
4417 
4418       // Write new records to MANIFEST log
4419 #ifndef NDEBUG
4420       size_t idx = 0;
4421 #endif
4422       for (auto& e : batch_edits) {
4423         std::string record;
4424         if (!e->EncodeTo(&record)) {
4425           s = Status::Corruption("Unable to encode VersionEdit:" +
4426                                  e->DebugString(true));
4427           break;
4428         }
4429         TEST_KILL_RANDOM_WITH_WEIGHT("VersionSet::LogAndApply:BeforeAddRecord",
4430                                      REDUCE_ODDS2);
4431 #ifndef NDEBUG
4432         if (batch_edits.size() > 1 && batch_edits.size() - 1 == idx) {
4433           TEST_SYNC_POINT_CALLBACK(
4434               "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
4435               nullptr);
4436           TEST_SYNC_POINT(
4437               "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:1");
4438         }
4439         ++idx;
4440 #endif /* !NDEBUG */
4441         io_s = descriptor_log_->AddRecord(record);
4442         if (!io_s.ok()) {
4443           s = io_s;
4444           manifest_io_status = io_s;
4445           break;
4446         }
4447       }
4448       if (s.ok()) {
4449         io_s = SyncManifest(db_options_, descriptor_log_->file());
4450         manifest_io_status = io_s;
4451         TEST_SYNC_POINT_CALLBACK(
4452             "VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s);
4453       }
4454       if (!io_s.ok()) {
4455         s = io_s;
4456         ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
4457                         s.ToString().c_str());
4458       }
4459     }
4460 
4461     // If we just created a new descriptor file, install it by writing a
4462     // new CURRENT file that points to it.
4463     if (s.ok()) {
4464       assert(manifest_io_status.ok());
4465     }
4466     if (s.ok() && new_descriptor_log) {
4467       io_s = SetCurrentFile(fs_.get(), dbname_, pending_manifest_file_number_,
4468                             db_directory);
4469       if (!io_s.ok()) {
4470         s = io_s;
4471       }
4472     }
4473 
4474     if (s.ok()) {
4475       // find offset in manifest file where this version is stored.
4476       new_manifest_file_size = descriptor_log_->file()->GetFileSize();
4477     }
4478 
4479     if (first_writer.edit_list.front()->is_column_family_drop_) {
4480       TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:0");
4481       TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1");
4482       TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2");
4483     }
4484 
4485     LogFlush(db_options_->info_log);
4486     TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifestDone");
4487     mu->Lock();
4488   }
4489 
4490   if (s.ok()) {
4491     // Apply WAL edits, DB mutex must be held.
4492     for (auto& e : batch_edits) {
4493       if (e->IsWalAddition()) {
4494         s = wals_.AddWals(e->GetWalAdditions());
4495       } else if (e->IsWalDeletion()) {
4496         s = wals_.DeleteWalsBefore(e->GetWalDeletion().GetLogNumber());
4497       }
4498       if (!s.ok()) {
4499         break;
4500       }
4501     }
4502   }
4503 
4504   if (!io_s.ok()) {
4505     if (io_status_.ok()) {
4506       io_status_ = io_s;
4507     }
4508   } else if (!io_status_.ok()) {
4509     io_status_ = io_s;
4510   }
4511 
  // Append the old manifest file to the obsolete_manifests_ list to be
  // deleted by PurgeObsoleteFiles later.
4514   if (s.ok() && new_descriptor_log) {
4515     obsolete_manifests_.emplace_back(
4516         DescriptorFileName("", manifest_file_number_));
4517   }
4518 
4519   // Install the new versions
4520   if (s.ok()) {
4521     if (first_writer.edit_list.front()->is_column_family_add_) {
4522       assert(batch_edits.size() == 1);
4523       assert(new_cf_options != nullptr);
4524       CreateColumnFamily(*new_cf_options, first_writer.edit_list.front());
4525     } else if (first_writer.edit_list.front()->is_column_family_drop_) {
4526       assert(batch_edits.size() == 1);
4527       first_writer.cfd->SetDropped();
4528       first_writer.cfd->UnrefAndTryDelete();
4529     } else {
      // Each version in versions corresponds to a column family.
      // For each column family, update its log number, indicating that logs
      // with numbers smaller than this should be ignored.
4533       uint64_t last_min_log_number_to_keep = 0;
4534       for (const auto& e : batch_edits) {
4535         ColumnFamilyData* cfd = nullptr;
4536         if (!e->IsColumnFamilyManipulation()) {
4537           cfd = column_family_set_->GetColumnFamily(e->column_family_);
4538           // e would not have been added to batch_edits if its corresponding
4539           // column family is dropped.
4540           assert(cfd);
4541         }
4542         if (cfd) {
4543           if (e->has_log_number_ && e->log_number_ > cfd->GetLogNumber()) {
4544             cfd->SetLogNumber(e->log_number_);
4545           }
4546           if (e->HasFullHistoryTsLow()) {
4547             cfd->SetFullHistoryTsLow(e->GetFullHistoryTsLow());
4548           }
4549         }
4550         if (e->has_min_log_number_to_keep_) {
4551           last_min_log_number_to_keep =
4552               std::max(last_min_log_number_to_keep, e->min_log_number_to_keep_);
4553         }
4554       }
4555 
4556       if (last_min_log_number_to_keep != 0) {
4557         // Should only be set in 2PC mode.
4558         MarkMinLogNumberToKeep2PC(last_min_log_number_to_keep);
4559       }
4560 
4561       for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
4562         ColumnFamilyData* cfd = versions[i]->cfd_;
4563         AppendVersion(cfd, versions[i]);
4564       }
4565     }
4566     manifest_file_number_ = pending_manifest_file_number_;
4567     manifest_file_size_ = new_manifest_file_size;
4568     prev_log_number_ = first_writer.edit_list.front()->prev_log_number_;
4569   } else {
4570     std::string version_edits;
4571     for (auto& e : batch_edits) {
4572       version_edits += ("\n" + e->DebugString(true));
4573     }
4574     ROCKS_LOG_ERROR(db_options_->info_log,
4575                     "Error in committing version edit to MANIFEST: %s",
4576                     version_edits.c_str());
4577     for (auto v : versions) {
4578       delete v;
4579     }
4580     if (manifest_io_status.ok()) {
4581       manifest_file_number_ = pending_manifest_file_number_;
4582       manifest_file_size_ = new_manifest_file_size;
4583     }
4584     // If manifest append failed for whatever reason, the file could be
4585     // corrupted. So we need to force the next version update to start a
4586     // new manifest file.
4587     descriptor_log_.reset();
4588     // If manifest operations failed, then we know the CURRENT file still
4589     // points to the original MANIFEST. Therefore, we can safely delete the
4590     // new MANIFEST.
4591     // If manifest operations succeeded, and we are here, then it is possible
4592     // that renaming tmp file to CURRENT failed.
4593     //
4594     // On local POSIX-compliant FS, the CURRENT must point to the original
4595     // MANIFEST. We can delete the new MANIFEST for simplicity, but we can also
4596     // keep it. Future recovery will ignore this MANIFEST. It's also ok for the
4597     // process not to crash and continue using the db. Any future LogAndApply()
4598     // call will switch to a new MANIFEST and update CURRENT, still ignoring
4599     // this one.
4600     //
    // On a non-local FS, it is possible that the rename operation succeeded
    // on the server (remote) side, but the client somehow returns a non-ok
    // status to RocksDB. Note that this does not violate atomicity. Should we
    // delete the new MANIFEST successfully, a subsequent recovery attempt
    // will likely see CURRENT pointing to the new MANIFEST, and thus fail. We
    // would not be able to open the DB again. Therefore, if manifest
    // operations succeed, we should keep the new MANIFEST. If the process
    // proceeds, any future LogAndApply() call will switch to a new MANIFEST
    // and update CURRENT. If the user tries to re-open the DB,
    // a) CURRENT points to the new MANIFEST, and the new MANIFEST is present.
    // b) CURRENT points to the original MANIFEST, and the original MANIFEST
    //    also exists.
4614     if (new_descriptor_log && !manifest_io_status.ok()) {
4615       ROCKS_LOG_INFO(db_options_->info_log,
4616                      "Deleting manifest %" PRIu64 " current manifest %" PRIu64
4617                      "\n",
4618                      pending_manifest_file_number_, manifest_file_number_);
4619       Status manifest_del_status = env_->DeleteFile(
4620           DescriptorFileName(dbname_, pending_manifest_file_number_));
4621       if (!manifest_del_status.ok()) {
4622         ROCKS_LOG_WARN(db_options_->info_log,
4623                        "Failed to delete manifest %" PRIu64 ": %s",
4624                        pending_manifest_file_number_,
4625                        manifest_del_status.ToString().c_str());
4626       }
4627     }
4628   }
4629 
4630   pending_manifest_file_number_ = 0;
4631 
4632   // wake up all the waiting writers
4633   while (true) {
4634     ManifestWriter* ready = manifest_writers_.front();
4635     manifest_writers_.pop_front();
4636     bool need_signal = true;
4637     for (const auto& w : writers) {
4638       if (&w == ready) {
4639         need_signal = false;
4640         break;
4641       }
4642     }
4643     ready->status = s;
4644     ready->done = true;
4645     if (ready->manifest_write_callback) {
4646       (ready->manifest_write_callback)(s);
4647     }
4648     if (need_signal) {
4649       ready->cv.Signal();
4650     }
4651     if (ready == last_writer) {
4652       break;
4653     }
4654   }
4655   if (!manifest_writers_.empty()) {
4656     manifest_writers_.front()->cv.Signal();
4657   }
4658   return s;
4659 }
4660 
void VersionSet::WakeUpWaitingManifestWriters() {
  // Notify the new head of the manifest write queue.
4664   if (!manifest_writers_.empty()) {
4665     manifest_writers_.front()->cv.Signal();
4666   }
4667 }
4668 
4669 // 'datas' is grammatically incorrect. We still use this notation to indicate
4670 // that this variable represents a collection of column_family_data.
Status VersionSet::LogAndApply(
4672     const autovector<ColumnFamilyData*>& column_family_datas,
4673     const autovector<const MutableCFOptions*>& mutable_cf_options_list,
4674     const autovector<autovector<VersionEdit*>>& edit_lists,
4675     InstrumentedMutex* mu, FSDirectory* db_directory, bool new_descriptor_log,
4676     const ColumnFamilyOptions* new_cf_options,
4677     const std::vector<std::function<void(const Status&)>>& manifest_wcbs) {
4678   mu->AssertHeld();
4679   int num_edits = 0;
4680   for (const auto& elist : edit_lists) {
4681     num_edits += static_cast<int>(elist.size());
4682   }
4683   if (num_edits == 0) {
4684     return Status::OK();
4685   } else if (num_edits > 1) {
4686 #ifndef NDEBUG
4687     for (const auto& edit_list : edit_lists) {
4688       for (const auto& edit : edit_list) {
4689         assert(!edit->IsColumnFamilyManipulation());
4690       }
4691     }
4692 #endif /* ! NDEBUG */
4693   }
4694 
4695   int num_cfds = static_cast<int>(column_family_datas.size());
4696   if (num_cfds == 1 && column_family_datas[0] == nullptr) {
4697     assert(edit_lists.size() == 1 && edit_lists[0].size() == 1);
4698     assert(edit_lists[0][0]->is_column_family_add_);
4699     assert(new_cf_options != nullptr);
4700   }
4701   std::deque<ManifestWriter> writers;
4702   if (num_cfds > 0) {
4703     assert(static_cast<size_t>(num_cfds) == mutable_cf_options_list.size());
4704     assert(static_cast<size_t>(num_cfds) == edit_lists.size());
4705   }
4706   for (int i = 0; i < num_cfds; ++i) {
4707     const auto wcb =
4708         manifest_wcbs.empty() ? [](const Status&) {} : manifest_wcbs[i];
4709     writers.emplace_back(mu, column_family_datas[i],
4710                          *mutable_cf_options_list[i], edit_lists[i], wcb);
4711     manifest_writers_.push_back(&writers[i]);
4712   }
4713   assert(!writers.empty());
4714   ManifestWriter& first_writer = writers.front();
4715   TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:BeforeWriterWaiting",
4716                            nullptr);
4717   while (!first_writer.done && &first_writer != manifest_writers_.front()) {
4718     first_writer.cv.Wait();
4719   }
4720   if (first_writer.done) {
4721     // All non-CF-manipulation operations can be grouped together and committed
4722     // to MANIFEST. They should all have finished. The status code is stored in
4723     // the first manifest writer.
4724 #ifndef NDEBUG
4725     for (const auto& writer : writers) {
4726       assert(writer.done);
4727     }
4728     TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WakeUpAndDone", mu);
4729 #endif /* !NDEBUG */
4730     return first_writer.status;
4731   }
4732 
4733   int num_undropped_cfds = 0;
4734   for (auto cfd : column_family_datas) {
4735     // if cfd == nullptr, it is a column family add.
4736     if (cfd == nullptr || !cfd->IsDropped()) {
4737       ++num_undropped_cfds;
4738     }
4739   }
4740   if (0 == num_undropped_cfds) {
4741     for (int i = 0; i != num_cfds; ++i) {
4742       manifest_writers_.pop_front();
4743     }
4744     // Notify new head of manifest write queue.
4745     if (!manifest_writers_.empty()) {
4746       manifest_writers_.front()->cv.Signal();
4747     }
4748     return Status::ColumnFamilyDropped();
4749   }
4750 
4751   return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log,
4752                                new_cf_options);
4753 }
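
// Minimal usage sketch for the batch LogAndApply() above (illustrative only;
// it assumes the caller holds the DB mutex `mu` and owns `cfd`,
// `mutable_cf_options`, `edit` and `db_dir`):
//
//   autovector<ColumnFamilyData*> cfds;
//   cfds.emplace_back(cfd);
//   autovector<const MutableCFOptions*> cf_opts;
//   cf_opts.emplace_back(&mutable_cf_options);
//   autovector<VersionEdit*> edit_list;
//   edit_list.emplace_back(&edit);
//   autovector<autovector<VersionEdit*>> edit_lists;
//   edit_lists.emplace_back(edit_list);
//   Status s = versions->LogAndApply(cfds, cf_opts, edit_lists, &mu, db_dir,
//                                    /*new_descriptor_log=*/false,
//                                    /*new_cf_options=*/nullptr, {});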
4754 
void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
4756   assert(edit->IsColumnFamilyManipulation());
4757   edit->SetNextFile(next_file_number_.load());
  // The log might have data that is not visible to the memtable and hence has
  // not updated the last_sequence_ yet. It is also possible that the log is
  // expecting some new data that is not written yet. Since LastSequence is an
  // upper bound on the sequence, it is ok to record
  // last_allocated_sequence_ as the last sequence.
4763   edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
4764                                                       : last_sequence_);
4765   if (edit->is_column_family_drop_) {
    // If we drop a column family, we have to make sure to save the max column
    // family, so that we don't reuse an existing ID.
4768     edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
4769   }
4770 }
4771 
Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
4773                                      VersionBuilder* builder, VersionEdit* edit,
4774                                      InstrumentedMutex* mu) {
4775 #ifdef NDEBUG
4776   (void)cfd;
4777 #endif
4778   mu->AssertHeld();
4779   assert(!edit->IsColumnFamilyManipulation());
4780 
4781   if (edit->has_log_number_) {
4782     assert(edit->log_number_ >= cfd->GetLogNumber());
4783     assert(edit->log_number_ < next_file_number_.load());
4784   }
4785 
4786   if (!edit->has_prev_log_number_) {
4787     edit->SetPrevLogNumber(prev_log_number_);
4788   }
4789   edit->SetNextFile(next_file_number_.load());
  // The log might have data that is not visible to the memtable and hence has
  // not updated the last_sequence_ yet. It is also possible that the log is
  // expecting some new data that is not written yet. Since LastSequence is an
  // upper bound on the sequence, it is ok to record
  // last_allocated_sequence_ as the last sequence.
4795   edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
4796                                                       : last_sequence_);
4797 
4798   // The builder can be nullptr only if edit is WAL manipulation,
4799   // because WAL edits do not need to be applied to versions,
4800   // we return Status::OK() in this case.
4801   assert(builder || edit->IsWalManipulation());
4802   return builder ? builder->Apply(edit) : Status::OK();
4803 }
4804 
Status VersionSet::GetCurrentManifestPath(const std::string& dbname,
4806                                           FileSystem* fs,
4807                                           std::string* manifest_path,
4808                                           uint64_t* manifest_file_number) {
4809   assert(fs != nullptr);
4810   assert(manifest_path != nullptr);
4811   assert(manifest_file_number != nullptr);
4812 
4813   std::string fname;
4814   Status s = ReadFileToString(fs, CurrentFileName(dbname), &fname);
4815   if (!s.ok()) {
4816     return s;
4817   }
4818   if (fname.empty() || fname.back() != '\n') {
4819     return Status::Corruption("CURRENT file does not end with newline");
4820   }
4821   // remove the trailing '\n'
4822   fname.resize(fname.size() - 1);
4823   FileType type;
4824   bool parse_ok = ParseFileName(fname, manifest_file_number, &type);
4825   if (!parse_ok || type != kDescriptorFile) {
4826     return Status::Corruption("CURRENT file corrupted");
4827   }
4828   *manifest_path = dbname;
4829   if (dbname.back() != '/') {
4830     manifest_path->push_back('/');
4831   }
4832   manifest_path->append(fname);
4833   return Status::OK();
4834 }
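
// For example (hypothetical DB): if <dbname>/CURRENT contains the single line
// "MANIFEST-000005\n", this returns *manifest_path ==
// "<dbname>/MANIFEST-000005" and *manifest_file_number == 5. A missing
// trailing newline or an unparsable file name is reported as corruption, as
// above.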
4835 
Status VersionSet::Recover(
4837     const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
4838     std::string* db_id) {
4839   // Read "CURRENT" file, which contains a pointer to the current manifest file
4840   std::string manifest_path;
4841   Status s = GetCurrentManifestPath(dbname_, fs_.get(), &manifest_path,
4842                                     &manifest_file_number_);
4843   if (!s.ok()) {
4844     return s;
4845   }
4846 
4847   ROCKS_LOG_INFO(db_options_->info_log, "Recovering from manifest file: %s\n",
4848                  manifest_path.c_str());
4849 
4850   std::unique_ptr<SequentialFileReader> manifest_file_reader;
4851   {
4852     std::unique_ptr<FSSequentialFile> manifest_file;
4853     s = fs_->NewSequentialFile(manifest_path,
4854                                fs_->OptimizeForManifestRead(file_options_),
4855                                &manifest_file, nullptr);
4856     if (!s.ok()) {
4857       return s;
4858     }
4859     manifest_file_reader.reset(new SequentialFileReader(
4860         std::move(manifest_file), manifest_path,
4861         db_options_->log_readahead_size, io_tracer_, db_options_->listeners));
4862   }
4863   uint64_t current_manifest_file_size = 0;
4864   uint64_t log_number = 0;
4865   {
4866     VersionSet::LogReporter reporter;
4867     Status log_read_status;
4868     reporter.status = &log_read_status;
4869     log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
4870                        true /* checksum */, 0 /* log_number */);
4871     VersionEditHandler handler(read_only, column_families,
4872                                const_cast<VersionSet*>(this),
4873                                /*track_missing_files=*/false,
4874                                /*no_error_if_files_missing=*/false, io_tracer_);
4875     handler.Iterate(reader, &log_read_status);
4876     s = handler.status();
4877     if (s.ok()) {
4878       log_number = handler.GetVersionEditParams().log_number_;
4879       current_manifest_file_size = reader.GetReadOffset();
4880       assert(current_manifest_file_size != 0);
4881       handler.GetDbId(db_id);
4882     }
4883   }
4884 
4885   if (s.ok()) {
4886     manifest_file_size_ = current_manifest_file_size;
4887     ROCKS_LOG_INFO(
4888         db_options_->info_log,
        "Recovery from manifest file %s succeeded, "
        "manifest_file_number is %" PRIu64 ", next_file_number is %" PRIu64
        ", last_sequence is %" PRIu64 ", log_number is %" PRIu64
        ", prev_log_number is %" PRIu64 ", max_column_family is %" PRIu32
        ", min_log_number_to_keep is %" PRIu64 "\n",
4894         manifest_path.c_str(), manifest_file_number_, next_file_number_.load(),
4895         last_sequence_.load(), log_number, prev_log_number_,
4896         column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc());
4897 
4898     for (auto cfd : *column_family_set_) {
4899       if (cfd->IsDropped()) {
4900         continue;
4901       }
4902       ROCKS_LOG_INFO(db_options_->info_log,
4903                      "Column family [%s] (ID %" PRIu32
4904                      "), log number is %" PRIu64 "\n",
4905                      cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
4906     }
4907   }
4908 
4909   return s;
4910 }
4911 
4912 namespace {
4913 class ManifestPicker {
4914  public:
4915   explicit ManifestPicker(const std::string& dbname,
4916                           const std::vector<std::string>& files_in_dbname);
4917   // REQUIRES Valid() == true
4918   std::string GetNextManifest(uint64_t* file_number, std::string* file_name);
  bool Valid() const { return manifest_file_iter_ != manifest_files_.end(); }
4920 
4921  private:
4922   const std::string& dbname_;
  // MANIFEST file name(s)
4924   std::vector<std::string> manifest_files_;
4925   std::vector<std::string>::const_iterator manifest_file_iter_;
4926 };
4927 
ManifestPicker::ManifestPicker(const std::string& dbname,
4929                                const std::vector<std::string>& files_in_dbname)
4930     : dbname_(dbname) {
4931   // populate manifest files
4932   assert(!files_in_dbname.empty());
4933   for (const auto& fname : files_in_dbname) {
4934     uint64_t file_num = 0;
4935     FileType file_type;
4936     bool parse_ok = ParseFileName(fname, &file_num, &file_type);
4937     if (parse_ok && file_type == kDescriptorFile) {
4938       manifest_files_.push_back(fname);
4939     }
4940   }
4941   // seek to first manifest
4942   std::sort(manifest_files_.begin(), manifest_files_.end(),
4943             [](const std::string& lhs, const std::string& rhs) {
4944               uint64_t num1 = 0;
4945               uint64_t num2 = 0;
4946               FileType type1;
4947               FileType type2;
4948               bool parse_ok1 = ParseFileName(lhs, &num1, &type1);
4949               bool parse_ok2 = ParseFileName(rhs, &num2, &type2);
4950 #ifndef NDEBUG
4951               assert(parse_ok1);
4952               assert(parse_ok2);
4953 #else
4954               (void)parse_ok1;
4955               (void)parse_ok2;
4956 #endif
4957               return num1 > num2;
4958             });
4959   manifest_file_iter_ = manifest_files_.begin();
4960 }
4961 
std::string ManifestPicker::GetNextManifest(uint64_t* number,
4963                                             std::string* file_name) {
4964   assert(Valid());
4965   std::string ret;
4966   if (manifest_file_iter_ != manifest_files_.end()) {
4967     ret.assign(dbname_);
4968     if (ret.back() != kFilePathSeparator) {
4969       ret.push_back(kFilePathSeparator);
4970     }
4971     ret.append(*manifest_file_iter_);
4972     if (number) {
4973       FileType type;
4974       bool parse = ParseFileName(*manifest_file_iter_, number, &type);
4975       assert(type == kDescriptorFile);
4976 #ifndef NDEBUG
4977       assert(parse);
4978 #else
4979       (void)parse;
4980 #endif
4981     }
4982     if (file_name) {
4983       *file_name = *manifest_file_iter_;
4984     }
4985     ++manifest_file_iter_;
4986   }
4987   return ret;
4988 }
4989 }  // namespace
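
// Example of the ordering ManifestPicker establishes (hypothetical listing):
// given files {"MANIFEST-000008", "MANIFEST-000005", "000007.sst"}, only the
// two MANIFEST files are kept, sorted by descending number, so the recovery
// loop below tries MANIFEST-000008 first and falls back to MANIFEST-000005.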
4990 
Status VersionSet::TryRecover(
4992     const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
4993     const std::vector<std::string>& files_in_dbname, std::string* db_id,
4994     bool* has_missing_table_file) {
4995   ManifestPicker manifest_picker(dbname_, files_in_dbname);
4996   if (!manifest_picker.Valid()) {
4997     return Status::Corruption("Cannot locate MANIFEST file in " + dbname_);
4998   }
4999   Status s;
5000   std::string manifest_path =
5001       manifest_picker.GetNextManifest(&manifest_file_number_, nullptr);
5002   while (!manifest_path.empty()) {
5003     s = TryRecoverFromOneManifest(manifest_path, column_families, read_only,
5004                                   db_id, has_missing_table_file);
5005     if (s.ok() || !manifest_picker.Valid()) {
5006       break;
5007     }
5008     Reset();
5009     manifest_path =
5010         manifest_picker.GetNextManifest(&manifest_file_number_, nullptr);
5011   }
5012   return s;
5013 }
5014 
Status VersionSet::TryRecoverFromOneManifest(
5016     const std::string& manifest_path,
5017     const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
5018     std::string* db_id, bool* has_missing_table_file) {
5019   ROCKS_LOG_INFO(db_options_->info_log, "Trying to recover from manifest: %s\n",
5020                  manifest_path.c_str());
5021   std::unique_ptr<SequentialFileReader> manifest_file_reader;
5022   Status s;
5023   {
5024     std::unique_ptr<FSSequentialFile> manifest_file;
5025     s = fs_->NewSequentialFile(manifest_path,
5026                                fs_->OptimizeForManifestRead(file_options_),
5027                                &manifest_file, nullptr);
5028     if (!s.ok()) {
5029       return s;
5030     }
5031     manifest_file_reader.reset(new SequentialFileReader(
5032         std::move(manifest_file), manifest_path,
5033         db_options_->log_readahead_size, io_tracer_, db_options_->listeners));
5034   }
5035 
5036   assert(s.ok());
5037   VersionSet::LogReporter reporter;
5038   reporter.status = &s;
5039   log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
5040                      /*checksum=*/true, /*log_num=*/0);
5041   VersionEditHandlerPointInTime handler_pit(
5042       read_only, column_families, const_cast<VersionSet*>(this), io_tracer_);
5043 
5044   handler_pit.Iterate(reader, &s);
5045 
5046   handler_pit.GetDbId(db_id);
5047 
5048   assert(nullptr != has_missing_table_file);
5049   *has_missing_table_file = handler_pit.HasMissingFiles();
5050 
5051   return handler_pit.status();
5052 }
5053 
Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
5055                                       const std::string& dbname,
5056                                       FileSystem* fs) {
5057   // these are just for performance reasons, not correctness,
5058   // so we're fine using the defaults
5059   FileOptions soptions;
5060   // Read "CURRENT" file, which contains a pointer to the current manifest file
5061   std::string manifest_path;
5062   uint64_t manifest_file_number;
5063   Status s =
5064       GetCurrentManifestPath(dbname, fs, &manifest_path, &manifest_file_number);
5065   if (!s.ok()) {
5066     return s;
5067   }
5068 
5069   std::unique_ptr<SequentialFileReader> file_reader;
5070   {
5071     std::unique_ptr<FSSequentialFile> file;
5072     s = fs->NewSequentialFile(manifest_path, soptions, &file, nullptr);
    if (!s.ok()) {
      return s;
    }
    file_reader.reset(new SequentialFileReader(std::move(file), manifest_path,
                                               nullptr /*IOTracer*/));
  }
5079 
5080   VersionSet::LogReporter reporter;
5081   reporter.status = &s;
5082   log::Reader reader(nullptr, std::move(file_reader), &reporter,
5083                      true /* checksum */, 0 /* log_number */);
5084 
5085   ListColumnFamiliesHandler handler;
5086   handler.Iterate(reader, &s);
5087 
5088   assert(column_families);
5089   column_families->clear();
5090   if (handler.status().ok()) {
5091     for (const auto& iter : handler.GetColumnFamilyNames()) {
5092       column_families->push_back(iter.second);
5093     }
5094   }
5095 
5096   return handler.status();
5097 }
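
// Minimal usage sketch (illustrative; `db_path` and `env` are assumed to be
// provided by the caller):
//
//   std::vector<std::string> cf_names;
//   Status s = VersionSet::ListColumnFamilies(
//       &cf_names, db_path, env->GetFileSystem().get());
//   // On success, cf_names holds e.g. {"default", "my_cf"}.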
5098 
5099 #ifndef ROCKSDB_LITE
Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
5101                                         const Options* options,
5102                                         const FileOptions& file_options,
5103                                         int new_levels) {
5104   if (new_levels <= 1) {
5105     return Status::InvalidArgument(
5106         "Number of levels needs to be bigger than 1");
5107   }
5108 
5109   ImmutableDBOptions db_options(*options);
5110   ColumnFamilyOptions cf_options(*options);
5111   std::shared_ptr<Cache> tc(NewLRUCache(options->max_open_files - 10,
5112                                         options->table_cache_numshardbits));
5113   WriteController wc(options->delayed_write_rate);
5114   WriteBufferManager wb(options->db_write_buffer_size);
5115   VersionSet versions(dbname, &db_options, file_options, tc.get(), &wb, &wc,
5116                       nullptr /*BlockCacheTracer*/, nullptr /*IOTracer*/,
5117                       /*db_session_id*/ "");
5118   Status status;
5119 
5120   std::vector<ColumnFamilyDescriptor> dummy;
5121   ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName,
5122                                           ColumnFamilyOptions(*options));
5123   dummy.push_back(dummy_descriptor);
5124   status = versions.Recover(dummy);
5125   if (!status.ok()) {
5126     return status;
5127   }
5128 
5129   Version* current_version =
5130       versions.GetColumnFamilySet()->GetDefault()->current();
5131   auto* vstorage = current_version->storage_info();
5132   int current_levels = vstorage->num_levels();
5133 
5134   if (current_levels <= new_levels) {
5135     return Status::OK();
5136   }
5137 
5138   // Make sure there are files only on one level from
5139   // (new_levels-1) to (current_levels-1)
5140   int first_nonempty_level = -1;
5141   int first_nonempty_level_filenum = 0;
5142   for (int i = new_levels - 1; i < current_levels; i++) {
5143     int file_num = vstorage->NumLevelFiles(i);
5144     if (file_num != 0) {
5145       if (first_nonempty_level < 0) {
5146         first_nonempty_level = i;
5147         first_nonempty_level_filenum = file_num;
5148       } else {
5149         char msg[255];
5150         snprintf(msg, sizeof(msg),
5151                  "Found at least two levels containing files: "
5152                  "[%d:%d],[%d:%d].\n",
5153                  first_nonempty_level, first_nonempty_level_filenum, i,
5154                  file_num);
5155         return Status::InvalidArgument(msg);
5156       }
5157     }
5158   }
5159 
5160   // We need to allocate an array sized for the old number of levels to
5161   // avoid a SIGSEGV in WriteCurrentStateToManifest(); however, all levels
5162   // greater than or equal to new_levels will be empty.
5163   std::vector<FileMetaData*>* new_files_list =
5164       new std::vector<FileMetaData*>[current_levels];
5165   for (int i = 0; i < new_levels - 1; i++) {
5166     new_files_list[i] = vstorage->LevelFiles(i);
5167   }
5168 
5169   if (first_nonempty_level > 0) {
5170     auto& new_last_level = new_files_list[new_levels - 1];
5171 
5172     new_last_level = vstorage->LevelFiles(first_nonempty_level);
5173 
5174     for (size_t i = 0; i < new_last_level.size(); ++i) {
5175       const FileMetaData* const meta = new_last_level[i];
5176       assert(meta);
5177 
5178       const uint64_t file_number = meta->fd.GetNumber();
5179 
5180       vstorage->file_locations_[file_number] =
5181           VersionStorageInfo::FileLocation(new_levels - 1, i);
5182     }
5183   }
5184 
5185   delete[] vstorage->files_;
5186   vstorage->files_ = new_files_list;
5187   vstorage->num_levels_ = new_levels;
5188 
5189   MutableCFOptions mutable_cf_options(*options);
5190   VersionEdit ve;
5191   InstrumentedMutex dummy_mutex;
5192   InstrumentedMutexLock l(&dummy_mutex);
5193   return versions.LogAndApply(
5194       versions.GetColumnFamilySet()->GetDefault(),
5195       mutable_cf_options, &ve, &dummy_mutex, nullptr, true);
5196 }
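
// A minimal usage sketch: this routine is typically reached through the "ldb"
// tool on an offline DB whose upper levels have already been compacted down
// to a single level, along the lines of (flag spelling per the ldb tool):
//
//   ldb --db=/path/to/db reduce_levels --new_levels=4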
5197 
5198 // Get the checksum information, including the checksum value and checksum
5199 // function name, of all SST and blob files in the VersionSet. Store it in a
5200 // FileChecksumList, which contains a map from file number to checksum info.
5201 // If the DB is not running, make sure to call VersionSet::Recover() to load
5202 // the file metadata from the MANIFEST into the VersionSet first.
5203 Status VersionSet::GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) {
5204   // Clean the previously stored checksum information if any.
5205   Status s;
5206   if (checksum_list == nullptr) {
5207     s = Status::InvalidArgument("checksum_list is nullptr");
5208     return s;
5209   }
5210   checksum_list->reset();
5211 
5212   for (auto cfd : *column_family_set_) {
5213     if (cfd->IsDropped() || !cfd->initialized()) {
5214       continue;
5215     }
5216     /* SST files */
5217     for (int level = 0; level < cfd->NumberLevels(); level++) {
5218       for (const auto& file :
5219            cfd->current()->storage_info()->LevelFiles(level)) {
5220         s = checksum_list->InsertOneFileChecksum(file->fd.GetNumber(),
5221                                                  file->file_checksum,
5222                                                  file->file_checksum_func_name);
5223         if (!s.ok()) {
5224           return s;
5225         }
5226       }
5227     }
5228 
5229     /* Blob files */
5230     const auto& blob_files = cfd->current()->storage_info()->GetBlobFiles();
5231     for (const auto& pair : blob_files) {
5232       const uint64_t blob_file_number = pair.first;
5233       const auto& meta = pair.second;
5234 
5235       assert(meta);
5236       assert(blob_file_number == meta->GetBlobFileNumber());
5237 
5238       std::string checksum_value = meta->GetChecksumValue();
5239       std::string checksum_method = meta->GetChecksumMethod();
5240       assert(checksum_value.empty() == checksum_method.empty());
5241       if (meta->GetChecksumMethod().empty()) {
5242         checksum_value = kUnknownFileChecksum;
5243         checksum_method = kUnknownFileChecksumFuncName;
5244       }
5245 
5246       s = checksum_list->InsertOneFileChecksum(blob_file_number, checksum_value,
5247                                                checksum_method);
5248       if (!s.ok()) {
5249         return s;
5250       }
5251     }
5252   }
5253 
5254   return s;
5255 }
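
// A minimal usage sketch, via the public DB::GetLiveFilesChecksumInfo()
// wrapper (assuming checksums were enabled through
// DBOptions::file_checksum_gen_factory when the files were written):
//
//   std::unique_ptr<FileChecksumList> checksum_list(NewFileChecksumList());
//   Status s = db->GetLiveFilesChecksumInfo(checksum_list.get());
//   // checksum_list now maps live file numbers to (checksum, func name).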
5256 
5257 Status VersionSet::DumpManifest(Options& options, std::string& dscname,
5258                                 bool verbose, bool hex, bool json) {
5259   // Open the specified manifest file.
5260   std::unique_ptr<SequentialFileReader> file_reader;
5261   Status s;
5262   {
5263     std::unique_ptr<FSSequentialFile> file;
5264     const std::shared_ptr<FileSystem>& fs = options.env->GetFileSystem();
5265     s = fs->NewSequentialFile(
5266         dscname,
5267         fs->OptimizeForManifestRead(file_options_), &file,
5268         nullptr);
5269     if (!s.ok()) {
5270       return s;
5271     }
5272     file_reader.reset(new SequentialFileReader(
5273         std::move(file), dscname, db_options_->log_readahead_size, io_tracer_));
5274   }
5275 
5276   std::vector<ColumnFamilyDescriptor> column_families(
5277       1, ColumnFamilyDescriptor(kDefaultColumnFamilyName, options));
5278   DumpManifestHandler handler(column_families, this, io_tracer_, verbose, hex,
5279                               json);
5280   {
5281     VersionSet::LogReporter reporter;
5282     reporter.status = &s;
5283     log::Reader reader(nullptr, std::move(file_reader), &reporter,
5284                        true /* checksum */, 0 /* log_number */);
5285     handler.Iterate(reader, &s);
5286   }
5287 
5288   return handler.status();
5289 }
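
// A minimal usage sketch: this is the backend of the "ldb manifest_dump"
// command, e.g. (flag spelling per the ldb tool):
//
//   ldb manifest_dump --path=/path/to/db/MANIFEST-000001 --verbose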
5290 #endif  // ROCKSDB_LITE
5291 
5292 void VersionSet::MarkFileNumberUsed(uint64_t number) {
5293   // Only called during recovery and repair, which are single-threaded, so
5294   // this works because there can't be concurrent calls.
5295   if (next_file_number_.load(std::memory_order_relaxed) <= number) {
5296     next_file_number_.store(number + 1, std::memory_order_relaxed);
5297   }
5298 }
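
// For example, after MarkFileNumberUsed(10), next_file_number_ is at least
// 11, so a subsequent NewFileNumber() call cannot hand out 10 again.
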
5299 // Called only from ::LogAndApply, which is protected by the DB mutex, or
5300 // during recovery, which is single-threaded.
5301 void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
5302   if (min_log_number_to_keep_2pc_.load(std::memory_order_relaxed) < number) {
5303     min_log_number_to_keep_2pc_.store(number, std::memory_order_relaxed);
5304   }
5305 }
5306 
5307 Status VersionSet::WriteCurrentStateToManifest(
5308     const std::unordered_map<uint32_t, MutableCFState>& curr_state,
5309     const VersionEdit& wal_additions, log::Writer* log, IOStatus& io_s) {
5310   // TODO: Break up into multiple records to reduce memory usage on recovery?
5311 
5312   // WARNING: This method doesn't hold a mutex!!
5313 
5314   // This is done without DB mutex lock held, but only within single-threaded
5315   // LogAndApply. Column family manipulations can only happen within LogAndApply
5316   // (the same single thread), so we're safe to iterate.
5317 
5318   assert(io_s.ok());
5319   if (db_options_->write_dbid_to_manifest) {
5320     VersionEdit edit_for_db_id;
5321     assert(!db_id_.empty());
5322     edit_for_db_id.SetDBId(db_id_);
5323     std::string db_id_record;
5324     if (!edit_for_db_id.EncodeTo(&db_id_record)) {
5325       return Status::Corruption("Unable to Encode VersionEdit:" +
5326                                 edit_for_db_id.DebugString(true));
5327     }
5328     io_s = log->AddRecord(db_id_record);
5329     if (!io_s.ok()) {
5330       return io_s;
5331     }
5332   }
5333 
5334   // Save WALs.
5335   if (!wal_additions.GetWalAdditions().empty()) {
5336     TEST_SYNC_POINT_CALLBACK("VersionSet::WriteCurrentStateToManifest:SaveWal",
5337                              const_cast<VersionEdit*>(&wal_additions));
5338     std::string record;
5339     if (!wal_additions.EncodeTo(&record)) {
5340       return Status::Corruption("Unable to Encode VersionEdit: " +
5341                                 wal_additions.DebugString(true));
5342     }
5343     io_s = log->AddRecord(record);
5344     if (!io_s.ok()) {
5345       return io_s;
5346     }
5347   }
5348 
5349   for (auto cfd : *column_family_set_) {
5350     assert(cfd);
5351 
5352     if (cfd->IsDropped()) {
5353       continue;
5354     }
5355     assert(cfd->initialized());
5356     {
5357       // Store column family info
5358       VersionEdit edit;
5359       if (cfd->GetID() != 0) {
5360         // default column family is always there,
5361         // no need to explicitly write it
5362         edit.AddColumnFamily(cfd->GetName());
5363         edit.SetColumnFamily(cfd->GetID());
5364       }
5365       edit.SetComparatorName(
5366           cfd->internal_comparator().user_comparator()->Name());
5367       std::string record;
5368       if (!edit.EncodeTo(&record)) {
5369         return Status::Corruption(
5370             "Unable to Encode VersionEdit:" + edit.DebugString(true));
5371       }
5372       io_s = log->AddRecord(record);
5373       if (!io_s.ok()) {
5374         return io_s;
5375       }
5376     }
5377 
5378     {
5379       // Save files
5380       VersionEdit edit;
5381       edit.SetColumnFamily(cfd->GetID());
5382 
5383       assert(cfd->current());
5384       assert(cfd->current()->storage_info());
5385 
5386       for (int level = 0; level < cfd->NumberLevels(); level++) {
5387         for (const auto& f :
5388              cfd->current()->storage_info()->LevelFiles(level)) {
5389           edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
5390                        f->fd.GetFileSize(), f->smallest, f->largest,
5391                        f->fd.smallest_seqno, f->fd.largest_seqno,
5392                        f->marked_for_compaction, f->oldest_blob_file_number,
5393                        f->oldest_ancester_time, f->file_creation_time,
5394                        f->file_checksum, f->file_checksum_func_name);
5395         }
5396       }
5397 
5398       const auto& blob_files = cfd->current()->storage_info()->GetBlobFiles();
5399       for (const auto& pair : blob_files) {
5400         const uint64_t blob_file_number = pair.first;
5401         const auto& meta = pair.second;
5402 
5403         assert(meta);
5404         assert(blob_file_number == meta->GetBlobFileNumber());
5405 
5406         edit.AddBlobFile(blob_file_number, meta->GetTotalBlobCount(),
5407                          meta->GetTotalBlobBytes(), meta->GetChecksumMethod(),
5408                          meta->GetChecksumValue());
5409         if (meta->GetGarbageBlobCount() > 0) {
5410           edit.AddBlobFileGarbage(blob_file_number, meta->GetGarbageBlobCount(),
5411                                   meta->GetGarbageBlobBytes());
5412         }
5413       }
5414 
5415       const auto iter = curr_state.find(cfd->GetID());
5416       assert(iter != curr_state.end());
5417       uint64_t log_number = iter->second.log_number;
5418       edit.SetLogNumber(log_number);
5419 
5420       if (cfd->GetID() == 0) {
5421         // min_log_number_to_keep is for the whole DB, not for a specific
5422         // column family, so it only needs to be written once. Since the
5423         // default CF can never be dropped, we attach it to the default CF.
5424         uint64_t min_log = min_log_number_to_keep_2pc();
5425         if (min_log != 0) {
5426           edit.SetMinLogNumberToKeep(min_log);
5427         }
5428       }
5429 
5430       const std::string& full_history_ts_low = iter->second.full_history_ts_low;
5431       if (!full_history_ts_low.empty()) {
5432         edit.SetFullHistoryTsLow(full_history_ts_low);
5433       }
5434       std::string record;
5435       if (!edit.EncodeTo(&record)) {
5436         return Status::Corruption(
5437             "Unable to Encode VersionEdit:" + edit.DebugString(true));
5438       }
5439       io_s = log->AddRecord(record);
5440       if (!io_s.ok()) {
5441         return io_s;
5442       }
5443     }
5444   }
5445   return Status::OK();
5446 }
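
// Sketch of the snapshot this writes, record by record (a summary of the
// logic above, not a normative format description):
//
//   [db_id edit]              only if write_dbid_to_manifest is set
//   [WAL additions edit]      only if there are tracked WALs
//   for each live column family:
//     [column family edit]    registers name/ID (skipped for the default CF)
//                             and the comparator name
//     [state edit]            files, blob files, log number, etc.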
5447 
5448 // TODO(aekmekji): in CompactionJob::GenSubcompactionBoundaries(), this
5449 // function is called repeatedly with consecutive pairs of slices. For example
5450 // if the slice list is [a, b, c, d] this function is called with arguments
5451 // (a,b) then (b,c) then (c,d). Knowing this, an optimization is possible where
5452 // we avoid doing binary search for the keys b and c twice and instead somehow
5453 // maintain state of where they first appear in the files.
5454 uint64_t VersionSet::ApproximateSize(const SizeApproximationOptions& options,
5455                                      Version* v, const Slice& start,
5456                                      const Slice& end, int start_level,
5457                                      int end_level, TableReaderCaller caller) {
5458   const auto& icmp = v->cfd_->internal_comparator();
5459 
5460   // pre-condition
5461   assert(icmp.Compare(start, end) <= 0);
5462 
5463   uint64_t total_full_size = 0;
5464   const auto* vstorage = v->storage_info();
5465   const int num_non_empty_levels = vstorage->num_non_empty_levels();
5466   end_level = (end_level == -1) ? num_non_empty_levels
5467                                 : std::min(end_level, num_non_empty_levels);
5468 
5469   assert(start_level <= end_level);
5470 
5471   // Outline of the optimization that uses options.files_size_error_margin.
5472   // When approximating the total size of the files used to store a key
5473   // range, we first sum up the sizes of the files that fall fully within
5474   // the range. Then we sum up the sizes of all the files that may intersect
5475   // with the range (this includes all files in L0 as well). Then, if
5476   // total_intersecting_size is smaller than
5477   // total_full_size * options.files_size_error_margin, we can infer that
5478   // the intersecting files have a sufficiently negligible contribution to
5479   // the total size, and we can approximate their contribution as just half
5480   // of total_intersecting_size. E.g., if files_size_error_margin is 0.1,
5481   // the error of the approximation is limited to ~10% of the total size of
5482   // the files that fall fully within the key range. In such a case, this
5483   // avoids the costly binary search over the intersecting files that would
5484   // be required for a more precise calculation of the total size.
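  //
  // A worked example of the margin check above: suppose the fully-contained
  // files sum to 1000 MB and files_size_error_margin is 0.1. If the
  // first/last (partially overlapping) files sum to 80 MB, then
  // 80 < 1000 * 0.1, so the binary searches are skipped and half of the
  // intersecting size is charged, yielding 1000 + 80 / 2 = 1040 MB.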
5485 
5486   autovector<FdWithKeyRange*, 32> first_files;
5487   autovector<FdWithKeyRange*, 16> last_files;
5488 
5489   // scan all the levels
5490   for (int level = start_level; level < end_level; ++level) {
5491     const LevelFilesBrief& files_brief = vstorage->LevelFilesBrief(level);
5492     if (files_brief.num_files == 0) {
5493       // empty level, skip exploration
5494       continue;
5495     }
5496 
5497     if (level == 0) {
5498       // Level-0 files are not in sorted order, so we need to iterate
5499       // through the list to compute the total bytes that require scanning;
5500       // handle this case explicitly (similarly to the first_files case)
5501       for (size_t i = 0; i < files_brief.num_files; i++) {
5502         first_files.push_back(&files_brief.files[i]);
5503       }
5504       continue;
5505     }
5506 
5507     assert(level > 0);
5508     assert(files_brief.num_files > 0);
5509 
5510     // identify the file position for start key
5511     const int idx_start =
5512         FindFileInRange(icmp, files_brief, start, 0,
5513                         static_cast<uint32_t>(files_brief.num_files - 1));
5514     assert(static_cast<size_t>(idx_start) < files_brief.num_files);
5515 
5516     // identify the file position for end key
5517     int idx_end = idx_start;
5518     if (icmp.Compare(files_brief.files[idx_end].largest_key, end) < 0) {
5519       idx_end =
5520           FindFileInRange(icmp, files_brief, end, idx_start,
5521                           static_cast<uint32_t>(files_brief.num_files - 1));
5522     }
5523     assert(idx_end >= idx_start &&
5524            static_cast<size_t>(idx_end) < files_brief.num_files);
5525 
5526     // scan all files from the starting index to the ending index
5527     // (inferred from the sorted order)
5528 
5529     // first scan all the intermediate full files (excluding first and last)
5530     for (int i = idx_start + 1; i < idx_end; ++i) {
5531       uint64_t file_size = files_brief.files[i].fd.GetFileSize();
5532       // The entire file falls into the range, so we can just take its size.
5533       assert(file_size ==
5534              ApproximateSize(v, files_brief.files[i], start, end, caller));
5535       total_full_size += file_size;
5536     }
5537 
5538     // save the first and the last files (which may be the same file), so we
5539     // can scan them later.
5540     first_files.push_back(&files_brief.files[idx_start]);
5541     if (idx_start != idx_end) {
5542       // we need to estimate size for both files, only if they are different
5543       last_files.push_back(&files_brief.files[idx_end]);
5544     }
5545   }
5546 
5547   // The sum of all file sizes that intersect the [start, end] key range.
5548   uint64_t total_intersecting_size = 0;
5549   for (const auto* file_ptr : first_files) {
5550     total_intersecting_size += file_ptr->fd.GetFileSize();
5551   }
5552   for (const auto* file_ptr : last_files) {
5553     total_intersecting_size += file_ptr->fd.GetFileSize();
5554   }
5555 
5556   // Now scan all the first & last files at each level and estimate their
5557   // size. If total_intersecting_size is less than X% of total_full_size, we
5558   // approximate the result in order to avoid the costly binary search inside
5559   // ApproximateSize. We use half of the file size as the approximation below.
5560 
5561   const double margin = options.files_size_error_margin;
5562   if (margin > 0 && total_intersecting_size <
5563                         static_cast<uint64_t>(total_full_size * margin)) {
5564     total_full_size += total_intersecting_size / 2;
5565   } else {
5566     // Estimate for all the first files (might also be last files), at each
5567     // level
5568     for (const auto file_ptr : first_files) {
5569       total_full_size += ApproximateSize(v, *file_ptr, start, end, caller);
5570     }
5571 
5572     // Estimate for all the last files, at each level
5573     for (const auto file_ptr : last_files) {
5574       // We could use ApproximateSize here, but calling ApproximateOffsetOf
5575       // directly is just more efficient.
5576       total_full_size += ApproximateOffsetOf(v, *file_ptr, end, caller);
5577     }
5578   }
5579 
5580   return total_full_size;
5581 }
5582 
5583 uint64_t VersionSet::ApproximateOffsetOf(Version* v, const FdWithKeyRange& f,
5584                                          const Slice& key,
5585                                          TableReaderCaller caller) {
5586   // pre-condition
5587   assert(v);
5588   const auto& icmp = v->cfd_->internal_comparator();
5589 
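  // For example, for a file spanning internal keys [k10, k50]:
  //   key = k60 -> the whole file is before key -> return the file size
  //   key = k05 -> the whole file is after key  -> return 0
  //   key = k30 -> key falls inside the file    -> ask the table reader for
  //                the approximate offset of k30 within the file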
5590   uint64_t result = 0;
5591   if (icmp.Compare(f.largest_key, key) <= 0) {
5592     // Entire file is before "key", so just add the file size
5593     result = f.fd.GetFileSize();
5594   } else if (icmp.Compare(f.smallest_key, key) > 0) {
5595     // Entire file is after "key", so ignore
5596     result = 0;
5597   } else {
5598     // "key" falls in the range for this table.  Add the
5599     // approximate offset of "key" within the table.
5600     TableCache* table_cache = v->cfd_->table_cache();
5601     if (table_cache != nullptr) {
5602       result = table_cache->ApproximateOffsetOf(
5603           key, f.file_metadata->fd, caller, icmp,
5604           v->GetMutableCFOptions().prefix_extractor.get());
5605     }
5606   }
5607   return result;
5608 }
5609 
5610 uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
5611                                      const Slice& start, const Slice& end,
5612                                      TableReaderCaller caller) {
5613   // pre-condition
5614   assert(v);
5615   const auto& icmp = v->cfd_->internal_comparator();
5616   assert(icmp.Compare(start, end) <= 0);
5617 
5618   if (icmp.Compare(f.largest_key, start) <= 0 ||
5619       icmp.Compare(f.smallest_key, end) > 0) {
5620     // Entire file is before or after the start/end keys range
5621     return 0;
5622   }
5623 
5624   if (icmp.Compare(f.smallest_key, start) >= 0) {
5625     // Start of the range is before the file start - approximate by end offset
5626     return ApproximateOffsetOf(v, f, end, caller);
5627   }
5628 
5629   if (icmp.Compare(f.largest_key, end) < 0) {
5630     // End of the range is after the file end - approximate by subtracting
5631     // start offset from the file size
5632     uint64_t start_offset = ApproximateOffsetOf(v, f, start, caller);
5633     assert(f.fd.GetFileSize() >= start_offset);
5634     return f.fd.GetFileSize() - start_offset;
5635   }
5636 
5637   // The interval falls entirely in the range for this file.
5638   TableCache* table_cache = v->cfd_->table_cache();
5639   if (table_cache == nullptr) {
5640     return 0;
5641   }
5642   return table_cache->ApproximateSize(
5643       start, end, f.file_metadata->fd, caller, icmp,
5644       v->GetMutableCFOptions().prefix_extractor.get());
5645 }
5646 
5647 void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_table_files,
5648                               std::vector<uint64_t>* live_blob_files) const {
5649   assert(live_table_files);
5650   assert(live_blob_files);
5651 
5652   // pre-calculate space requirement
5653   size_t total_table_files = 0;
5654   size_t total_blob_files = 0;
5655 
5656   assert(column_family_set_);
5657   for (auto cfd : *column_family_set_) {
5658     assert(cfd);
5659 
5660     if (!cfd->initialized()) {
5661       continue;
5662     }
5663 
5664     Version* const dummy_versions = cfd->dummy_versions();
5665     assert(dummy_versions);
5666 
5667     for (Version* v = dummy_versions->next_; v != dummy_versions;
5668          v = v->next_) {
5669       assert(v);
5670 
5671       const auto* vstorage = v->storage_info();
5672       assert(vstorage);
5673 
5674       for (int level = 0; level < vstorage->num_levels(); ++level) {
5675         total_table_files += vstorage->LevelFiles(level).size();
5676       }
5677 
5678       total_blob_files += vstorage->GetBlobFiles().size();
5679     }
5680   }
5681 
5682   // just a one-time extension to the right size (avoids repeated growth)
5683   live_table_files->reserve(live_table_files->size() + total_table_files);
5684   live_blob_files->reserve(live_blob_files->size() + total_blob_files);
5685 
5686   assert(column_family_set_);
5687   for (auto cfd : *column_family_set_) {
5688     assert(cfd);
5689     if (!cfd->initialized()) {
5690       continue;
5691     }
5692 
5693     auto* current = cfd->current();
5694     bool found_current = false;
5695 
5696     Version* const dummy_versions = cfd->dummy_versions();
5697     assert(dummy_versions);
5698 
5699     for (Version* v = dummy_versions->next_; v != dummy_versions;
5700          v = v->next_) {
5701       v->AddLiveFiles(live_table_files, live_blob_files);
5702       if (v == current) {
5703         found_current = true;
5704       }
5705     }
5706 
5707     if (!found_current && current != nullptr) {
5708       // Should never happen unless it is a bug.
5709       assert(false);
5710       current->AddLiveFiles(live_table_files, live_blob_files);
5711     }
5712   }
5713 }
5714 
5715 InternalIterator* VersionSet::MakeInputIterator(
5716     const ReadOptions& read_options, const Compaction* c,
5717     RangeDelAggregator* range_del_agg,
5718     const FileOptions& file_options_compactions) {
5719   auto cfd = c->column_family_data();
5720   // Level-0 files have to be merged together.  For other levels,
5721   // we will make a concatenating iterator per level.
5722   // TODO(opt): use concatenating iterator for level-0 if there is no overlap
5723   const size_t space = (c->level() == 0 ? c->input_levels(0)->num_files +
5724                                               c->num_input_levels() - 1
5725                                         : c->num_input_levels());
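  // For example, a compaction with 4 L0 input files plus one other input
  // level needs 4 + (2 - 1) = 5 iterators; a compaction between two non-L0
  // levels needs just 2 (one concatenating iterator per level).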
5726   InternalIterator** list = new InternalIterator*[space];
5727   size_t num = 0;
5728   for (size_t which = 0; which < c->num_input_levels(); which++) {
5729     if (c->input_levels(which)->num_files != 0) {
5730       if (c->level(which) == 0) {
5731         const LevelFilesBrief* flevel = c->input_levels(which);
5732         for (size_t i = 0; i < flevel->num_files; i++) {
5733           list[num++] = cfd->table_cache()->NewIterator(
5734               read_options, file_options_compactions,
5735               cfd->internal_comparator(), *flevel->files[i].file_metadata,
5736               range_del_agg, c->mutable_cf_options()->prefix_extractor.get(),
5737               /*table_reader_ptr=*/nullptr,
5738               /*file_read_hist=*/nullptr, TableReaderCaller::kCompaction,
5739               /*arena=*/nullptr,
5740               /*skip_filters=*/false,
5741               /*level=*/static_cast<int>(c->level(which)),
5742               MaxFileSizeForL0MetaPin(*c->mutable_cf_options()),
5743               /*smallest_compaction_key=*/nullptr,
5744               /*largest_compaction_key=*/nullptr,
5745               /*allow_unprepared_value=*/false);
5746         }
5747       } else {
5748         // Create concatenating iterator for the files from this level
5749         list[num++] = new LevelIterator(
5750             cfd->table_cache(), read_options, file_options_compactions,
5751             cfd->internal_comparator(), c->input_levels(which),
5752             c->mutable_cf_options()->prefix_extractor.get(),
5753             /*should_sample=*/false,
5754             /*no per level latency histogram=*/nullptr,
5755             TableReaderCaller::kCompaction, /*skip_filters=*/false,
5756             /*level=*/static_cast<int>(c->level(which)), range_del_agg,
5757             c->boundaries(which));
5758       }
5759     }
5760   }
5761   assert(num <= space);
5762   InternalIterator* result =
5763       NewMergingIterator(&c->column_family_data()->internal_comparator(), list,
5764                          static_cast<int>(num));
5765   delete[] list;
5766   return result;
5767 }
5768 
5769 Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
5770                                       FileMetaData** meta,
5771                                       ColumnFamilyData** cfd) {
5772   for (auto cfd_iter : *column_family_set_) {
5773     if (!cfd_iter->initialized()) {
5774       continue;
5775     }
5776     Version* version = cfd_iter->current();
5777     const auto* vstorage = version->storage_info();
5778     for (int level = 0; level < vstorage->num_levels(); level++) {
5779       for (const auto& file : vstorage->LevelFiles(level)) {
5780         if (file->fd.GetNumber() == number) {
5781           *meta = file;
5782           *filelevel = level;
5783           *cfd = cfd_iter;
5784           return Status::OK();
5785         }
5786       }
5787     }
5788   }
5789   return Status::NotFound("File not present in any level");
5790 }
5791 
5792 void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
5793   for (auto cfd : *column_family_set_) {
5794     if (cfd->IsDropped() || !cfd->initialized()) {
5795       continue;
5796     }
5797     for (int level = 0; level < cfd->NumberLevels(); level++) {
5798       for (const auto& file :
5799            cfd->current()->storage_info()->LevelFiles(level)) {
5800         LiveFileMetaData filemetadata;
5801         filemetadata.column_family_name = cfd->GetName();
5802         uint32_t path_id = file->fd.GetPathId();
5803         if (path_id < cfd->ioptions()->cf_paths.size()) {
5804           filemetadata.db_path = cfd->ioptions()->cf_paths[path_id].path;
5805         } else {
5806           assert(!cfd->ioptions()->cf_paths.empty());
5807           filemetadata.db_path = cfd->ioptions()->cf_paths.back().path;
5808         }
5809         const uint64_t file_number = file->fd.GetNumber();
5810         filemetadata.name = MakeTableFileName("", file_number);
5811         filemetadata.file_number = file_number;
5812         filemetadata.level = level;
5813         filemetadata.size = static_cast<size_t>(file->fd.GetFileSize());
5814         filemetadata.smallestkey = file->smallest.user_key().ToString();
5815         filemetadata.largestkey = file->largest.user_key().ToString();
5816         filemetadata.smallest_seqno = file->fd.smallest_seqno;
5817         filemetadata.largest_seqno = file->fd.largest_seqno;
5818         filemetadata.num_reads_sampled = file->stats.num_reads_sampled.load(
5819             std::memory_order_relaxed);
5820         filemetadata.being_compacted = file->being_compacted;
5821         filemetadata.num_entries = file->num_entries;
5822         filemetadata.num_deletions = file->num_deletions;
5823         filemetadata.oldest_blob_file_number = file->oldest_blob_file_number;
5824         filemetadata.file_checksum = file->file_checksum;
5825         filemetadata.file_checksum_func_name = file->file_checksum_func_name;
5826         filemetadata.temperature = file->temperature;
5827         filemetadata.oldest_ancester_time = file->TryGetOldestAncesterTime();
5828         filemetadata.file_creation_time = file->TryGetFileCreationTime();
5829         metadata->push_back(filemetadata);
5830       }
5831     }
5832   }
5833 }
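
// A minimal usage sketch, via the public DB::GetLiveFilesMetaData() wrapper:
//
//   std::vector<LiveFileMetaData> metadata;
//   db->GetLiveFilesMetaData(&metadata);
//   for (const auto& f : metadata) {
//     // e.g. f.column_family_name, f.level, f.size, f.smallestkey, ...
//   }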
5834 
5835 void VersionSet::GetObsoleteFiles(std::vector<ObsoleteFileInfo>* files,
5836                                   std::vector<ObsoleteBlobFileInfo>* blob_files,
5837                                   std::vector<std::string>* manifest_filenames,
5838                                   uint64_t min_pending_output) {
5839   assert(files);
5840   assert(blob_files);
5841   assert(manifest_filenames);
5842   assert(files->empty());
5843   assert(blob_files->empty());
5844   assert(manifest_filenames->empty());
5845 
5846   std::vector<ObsoleteFileInfo> pending_files;
5847   for (auto& f : obsolete_files_) {
5848     if (f.metadata->fd.GetNumber() < min_pending_output) {
5849       files->emplace_back(std::move(f));
5850     } else {
5851       pending_files.emplace_back(std::move(f));
5852     }
5853   }
5854   obsolete_files_.swap(pending_files);
5855 
5856   std::vector<ObsoleteBlobFileInfo> pending_blob_files;
5857   for (auto& blob_file : obsolete_blob_files_) {
5858     if (blob_file.GetBlobFileNumber() < min_pending_output) {
5859       blob_files->emplace_back(std::move(blob_file));
5860     } else {
5861       pending_blob_files.emplace_back(std::move(blob_file));
5862     }
5863   }
5864   obsolete_blob_files_.swap(pending_blob_files);
5865 
5866   obsolete_manifests_.swap(*manifest_filenames);
5867 }
5868 
5869 ColumnFamilyData* VersionSet::CreateColumnFamily(
5870     const ColumnFamilyOptions& cf_options, const VersionEdit* edit) {
5871   assert(edit->is_column_family_add_);
5872 
5873   MutableCFOptions dummy_cf_options;
5874   Version* dummy_versions =
5875       new Version(nullptr, this, file_options_, dummy_cf_options, io_tracer_);
5876   // Ref() the dummy version once so that later we can call Unref() to delete
5877   // it, avoiding an explicit "delete" (~Version is private)
5878   dummy_versions->Ref();
5879   auto new_cfd = column_family_set_->CreateColumnFamily(
5880       edit->column_family_name_, edit->column_family_, dummy_versions,
5881       cf_options);
5882 
5883   Version* v = new Version(new_cfd, this, file_options_,
5884                            *new_cfd->GetLatestMutableCFOptions(), io_tracer_,
5885                            current_version_number_++);
5886 
5887   // Fill level target base information.
5888   v->storage_info()->CalculateBaseBytes(*new_cfd->ioptions(),
5889                                         *new_cfd->GetLatestMutableCFOptions());
5890   AppendVersion(new_cfd, v);
5891   // GetLatestMutableCFOptions() is safe here without the mutex since the
5892   // cfd is not yet visible to clients
5893   new_cfd->CreateNewMemtable(*new_cfd->GetLatestMutableCFOptions(),
5894                              LastSequence());
5895   new_cfd->SetLogNumber(edit->log_number_);
5896   return new_cfd;
5897 }
5898 
5899 uint64_t VersionSet::GetNumLiveVersions(Version* dummy_versions) {
5900   uint64_t count = 0;
5901   for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
5902     count++;
5903   }
5904   return count;
5905 }
5906 
5907 uint64_t VersionSet::GetTotalSstFilesSize(Version* dummy_versions) {
5908   std::unordered_set<uint64_t> unique_files;
5909   uint64_t total_files_size = 0;
5910   for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
5911     VersionStorageInfo* storage_info = v->storage_info();
5912     for (int level = 0; level < storage_info->num_levels_; level++) {
5913       for (const auto& file_meta : storage_info->LevelFiles(level)) {
5914         if (unique_files.find(file_meta->fd.packed_number_and_path_id) ==
5915             unique_files.end()) {
5916           unique_files.insert(file_meta->fd.packed_number_and_path_id);
5917           total_files_size += file_meta->fd.GetFileSize();
5918         }
5919       }
5920     }
5921   }
5922   return total_files_size;
5923 }
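
// A minimal usage sketch: this backs the "rocksdb.total-sst-files-size"
// property. Files referenced by multiple live Versions are deduplicated by
// packed number/path ID above, so each file is counted once:
//
//   uint64_t total_sst_size = 0;
//   db->GetIntProperty(DB::Properties::kTotalSstFilesSize, &total_sst_size);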
5924 
5925 uint64_t VersionSet::GetTotalBlobFileSize(Version* dummy_versions) {
5926   std::unordered_set<uint64_t> unique_blob_files;
5927   uint64_t all_v_blob_file_size = 0;
5928   for (auto* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
5929     // iterate all the versions
5930     auto* vstorage = v->storage_info();
5931     const auto& blob_files = vstorage->GetBlobFiles();
5932     for (const auto& pair : blob_files) {
5933       if (unique_blob_files.find(pair.first) == unique_blob_files.end()) {
5934         // found a blob file that has not been counted yet
5935         unique_blob_files.insert(pair.first);
5936         const auto& meta = pair.second;
5937         all_v_blob_file_size += meta->GetBlobFileSize();
5938       }
5939     }
5940   }
5941   return all_v_blob_file_size;
5942 }
5943 
5944 Status VersionSet::VerifyFileMetadata(const std::string& fpath,
5945                                       const FileMetaData& meta) const {
5946   uint64_t fsize = 0;
5947   Status status = fs_->GetFileSize(fpath, IOOptions(), &fsize, nullptr);
5948   if (status.ok()) {
5949     if (fsize != meta.fd.GetFileSize()) {
5950       status = Status::Corruption("File size mismatch: " + fpath);
5951     }
5952   }
5953   return status;
5954 }
5955 
5956 ReactiveVersionSet::ReactiveVersionSet(
5957     const std::string& dbname, const ImmutableDBOptions* _db_options,
5958     const FileOptions& _file_options, Cache* table_cache,
5959     WriteBufferManager* write_buffer_manager, WriteController* write_controller,
5960     const std::shared_ptr<IOTracer>& io_tracer)
5961     : VersionSet(dbname, _db_options, _file_options, table_cache,
5962                  write_buffer_manager, write_controller,
5963                  /*block_cache_tracer=*/nullptr, io_tracer,
5964                  /*db_session_id*/ "") {}
5965 
5966 ReactiveVersionSet::~ReactiveVersionSet() {}
5967 
5968 Status ReactiveVersionSet::Recover(
5969     const std::vector<ColumnFamilyDescriptor>& column_families,
5970     std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
5971     std::unique_ptr<log::Reader::Reporter>* manifest_reporter,
5972     std::unique_ptr<Status>* manifest_reader_status) {
5973   assert(manifest_reader != nullptr);
5974   assert(manifest_reporter != nullptr);
5975   assert(manifest_reader_status != nullptr);
5976 
5977   manifest_reader_status->reset(new Status());
5978   manifest_reporter->reset(new LogReporter());
5979   static_cast_with_check<LogReporter>(manifest_reporter->get())->status =
5980       manifest_reader_status->get();
5981   Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader);
5982   if (!s.ok()) {
5983     return s;
5984   }
5985   log::Reader* reader = manifest_reader->get();
5986   assert(reader);
5987 
5988   manifest_tailer_.reset(new ManifestTailer(
5989       column_families, const_cast<ReactiveVersionSet*>(this), io_tracer_));
5990 
5991   manifest_tailer_->Iterate(*reader, manifest_reader_status->get());
5992 
5993   return manifest_tailer_->status();
5994 }
5995 
5996 Status ReactiveVersionSet::ReadAndApply(
5997     InstrumentedMutex* mu,
5998     std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
5999     Status* manifest_read_status,
6000     std::unordered_set<ColumnFamilyData*>* cfds_changed) {
6001   assert(manifest_reader != nullptr);
6002   assert(cfds_changed != nullptr);
6003   mu->AssertHeld();
6004 
6005   Status s;
6006   log::Reader* reader = manifest_reader->get();
6007   assert(reader);
6008   s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
6009   if (!s.ok()) {
6010     return s;
6011   }
6012   manifest_tailer_->Iterate(*(manifest_reader->get()), manifest_read_status);
6013   s = manifest_tailer_->status();
6014   if (s.ok()) {
6015     *cfds_changed = std::move(manifest_tailer_->GetUpdatedColumnFamilies());
6016   }
6017 
6018   return s;
6019 }
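
// A minimal usage sketch (the variable names are hypothetical, standing in
// for a secondary instance's catch-up loop): MaybeSwitchManifest() can
// surface Status::TryAgain() through this path when the primary swaps
// MANIFESTs, in which case the caller simply retries later:
//
//   Status s = reactive_versions->ReadAndApply(&mu, &manifest_reader,
//                                              &read_status, &cfds_changed);
//   if (s.IsTryAgain()) {
//     // The primary switched MANIFESTs; retry on the next catch-up cycle.
//   }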
6020 
6021 Status ReactiveVersionSet::MaybeSwitchManifest(
6022     log::Reader::Reporter* reporter,
6023     std::unique_ptr<log::FragmentBufferedReader>* manifest_reader) {
6024   assert(manifest_reader != nullptr);
6025   Status s;
6026   std::string manifest_path;
6027   s = GetCurrentManifestPath(dbname_, fs_.get(), &manifest_path,
6028                              &manifest_file_number_);
6029   if (!s.ok()) {
6030     return s;
6031   }
6032   std::unique_ptr<FSSequentialFile> manifest_file;
6033   if (manifest_reader->get() != nullptr &&
6034       manifest_reader->get()->file()->file_name() == manifest_path) {
6035     // CURRENT points to the same MANIFEST as before, no need to switch
6036     // MANIFEST.
6037     return s;
6038   }
6039   assert(nullptr == manifest_reader->get() ||
6040          manifest_reader->get()->file()->file_name() != manifest_path);
6041   s = fs_->FileExists(manifest_path, IOOptions(), nullptr);
6042   if (s.IsNotFound()) {
6043     return Status::TryAgain(
6044         "The primary may have switched to a new MANIFEST and deleted the old "
6045         "one.");
6046   } else if (!s.ok()) {
6047     return s;
6048   }
6049   TEST_SYNC_POINT(
6050       "ReactiveVersionSet::MaybeSwitchManifest:"
6051       "AfterGetCurrentManifestPath:0");
6052   TEST_SYNC_POINT(
6053       "ReactiveVersionSet::MaybeSwitchManifest:"
6054       "AfterGetCurrentManifestPath:1");
6055   // The primary can also delete the MANIFEST while the secondary is reading
6056   // it. This is OK on POSIX. For other file systems, maybe create a hard link
6057   // to MANIFEST. The hard link should be cleaned up later by the secondary.
6058   s = fs_->NewSequentialFile(manifest_path,
6059                              fs_->OptimizeForManifestRead(file_options_),
6060                              &manifest_file, nullptr);
6061   std::unique_ptr<SequentialFileReader> manifest_file_reader;
6062   if (s.ok()) {
6063     manifest_file_reader.reset(new SequentialFileReader(
6064         std::move(manifest_file), manifest_path,
6065         db_options_->log_readahead_size, io_tracer_, db_options_->listeners));
6066     manifest_reader->reset(new log::FragmentBufferedReader(
6067         nullptr, std::move(manifest_file_reader), reporter, true /* checksum */,
6068         0 /* log_number */));
6069     ROCKS_LOG_INFO(db_options_->info_log, "Switched to new manifest: %s\n",
6070                    manifest_path.c_str());
6071     if (manifest_tailer_) {
6072       manifest_tailer_->PrepareToReadNewManifest();
6073     }
6074   } else if (s.IsPathNotFound()) {
6075     // This can happen if the primary switches to a new MANIFEST after the
6076     // secondary reads the CURRENT file but before the secondary actually tries
6077     // to open the MANIFEST.
6078     s = Status::TryAgain(
6079         "The primary may have switched to a new MANIFEST and deleted the old "
6080         "one.");
6081   }
6082   return s;
6083 }
6084 
6085 #ifndef NDEBUG
6086 uint64_t ReactiveVersionSet::TEST_read_edits_in_atomic_group() const {
6087   assert(manifest_tailer_);
6088   return manifest_tailer_->GetReadBuffer().TEST_read_edits_in_atomic_group();
6089 }
6090 #endif  // !NDEBUG
6091 
6092 std::vector<VersionEdit>& ReactiveVersionSet::replay_buffer() {
6093   assert(manifest_tailer_);
6094   return manifest_tailer_->GetReadBuffer().replay_buffer();
6095 }
6096 
6097 }  // namespace ROCKSDB_NAMESPACE
6098