1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "db/version_builder.h"
11 
12 #include <algorithm>
13 #include <atomic>
14 #include <cinttypes>
15 #include <functional>
16 #include <map>
17 #include <memory>
18 #include <set>
19 #include <sstream>
20 #include <thread>
21 #include <unordered_map>
22 #include <unordered_set>
23 #include <utility>
24 #include <vector>
25 
26 #include "db/blob/blob_file_meta.h"
27 #include "db/dbformat.h"
28 #include "db/internal_stats.h"
29 #include "db/table_cache.h"
30 #include "db/version_set.h"
31 #include "port/port.h"
32 #include "table/table_reader.h"
33 #include "util/string_util.h"
34 
35 namespace ROCKSDB_NAMESPACE {
36 
NewestFirstBySeqNo(FileMetaData * a,FileMetaData * b)37 bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) {
38   if (a->fd.largest_seqno != b->fd.largest_seqno) {
39     return a->fd.largest_seqno > b->fd.largest_seqno;
40   }
41   if (a->fd.smallest_seqno != b->fd.smallest_seqno) {
42     return a->fd.smallest_seqno > b->fd.smallest_seqno;
43   }
44   // Break ties by file number
45   return a->fd.GetNumber() > b->fd.GetNumber();
46 }
47 
48 namespace {
BySmallestKey(FileMetaData * a,FileMetaData * b,const InternalKeyComparator * cmp)49 bool BySmallestKey(FileMetaData* a, FileMetaData* b,
50                    const InternalKeyComparator* cmp) {
51   int r = cmp->Compare(a->smallest, b->smallest);
52   if (r != 0) {
53     return (r < 0);
54   }
55   // Break ties by file number
56   return (a->fd.GetNumber() < b->fd.GetNumber());
57 }
58 }  // namespace
59 
60 class VersionBuilder::Rep {
61  private:
62   // Helper to sort files_ in v
63   // kLevel0 -- NewestFirstBySeqNo
64   // kLevelNon0 -- BySmallestKey
65   struct FileComparator {
66     enum SortMethod { kLevel0 = 0, kLevelNon0 = 1, } sort_method;
67     const InternalKeyComparator* internal_comparator;
68 
FileComparatorROCKSDB_NAMESPACE::VersionBuilder::Rep::FileComparator69     FileComparator() : internal_comparator(nullptr) {}
70 
operator ()ROCKSDB_NAMESPACE::VersionBuilder::Rep::FileComparator71     bool operator()(FileMetaData* f1, FileMetaData* f2) const {
72       switch (sort_method) {
73         case kLevel0:
74           return NewestFirstBySeqNo(f1, f2);
75         case kLevelNon0:
76           return BySmallestKey(f1, f2, internal_comparator);
77       }
78       assert(false);
79       return false;
80     }
81   };
82 
83   struct LevelState {
84     std::unordered_set<uint64_t> deleted_files;
85     // Map from file number to file meta data.
86     std::unordered_map<uint64_t, FileMetaData*> added_files;
87   };
88 
89   class BlobFileMetaDataDelta {
90    public:
IsEmpty() const91     bool IsEmpty() const {
92       return !shared_meta_ && !additional_garbage_count_ &&
93              !additional_garbage_bytes_ && newly_linked_ssts_.empty() &&
94              newly_unlinked_ssts_.empty();
95     }
96 
GetSharedMeta() const97     std::shared_ptr<SharedBlobFileMetaData> GetSharedMeta() const {
98       return shared_meta_;
99     }
100 
GetAdditionalGarbageCount() const101     uint64_t GetAdditionalGarbageCount() const {
102       return additional_garbage_count_;
103     }
104 
GetAdditionalGarbageBytes() const105     uint64_t GetAdditionalGarbageBytes() const {
106       return additional_garbage_bytes_;
107     }
108 
GetNewlyLinkedSsts() const109     const std::unordered_set<uint64_t>& GetNewlyLinkedSsts() const {
110       return newly_linked_ssts_;
111     }
112 
GetNewlyUnlinkedSsts() const113     const std::unordered_set<uint64_t>& GetNewlyUnlinkedSsts() const {
114       return newly_unlinked_ssts_;
115     }
116 
SetSharedMeta(std::shared_ptr<SharedBlobFileMetaData> shared_meta)117     void SetSharedMeta(std::shared_ptr<SharedBlobFileMetaData> shared_meta) {
118       assert(!shared_meta_);
119       assert(shared_meta);
120 
121       shared_meta_ = std::move(shared_meta);
122     }
123 
AddGarbage(uint64_t count,uint64_t bytes)124     void AddGarbage(uint64_t count, uint64_t bytes) {
125       additional_garbage_count_ += count;
126       additional_garbage_bytes_ += bytes;
127     }
128 
LinkSst(uint64_t sst_file_number)129     void LinkSst(uint64_t sst_file_number) {
130       assert(newly_linked_ssts_.find(sst_file_number) ==
131              newly_linked_ssts_.end());
132 
133       // Reconcile with newly unlinked SSTs on the fly. (Note: an SST can be
134       // linked to and unlinked from the same blob file in the case of a trivial
135       // move.)
136       auto it = newly_unlinked_ssts_.find(sst_file_number);
137 
138       if (it != newly_unlinked_ssts_.end()) {
139         newly_unlinked_ssts_.erase(it);
140       } else {
141         newly_linked_ssts_.emplace(sst_file_number);
142       }
143     }
144 
UnlinkSst(uint64_t sst_file_number)145     void UnlinkSst(uint64_t sst_file_number) {
146       assert(newly_unlinked_ssts_.find(sst_file_number) ==
147              newly_unlinked_ssts_.end());
148 
149       // Reconcile with newly linked SSTs on the fly. (Note: an SST can be
150       // linked to and unlinked from the same blob file in the case of a trivial
151       // move.)
152       auto it = newly_linked_ssts_.find(sst_file_number);
153 
154       if (it != newly_linked_ssts_.end()) {
155         newly_linked_ssts_.erase(it);
156       } else {
157         newly_unlinked_ssts_.emplace(sst_file_number);
158       }
159     }
160 
161    private:
162     std::shared_ptr<SharedBlobFileMetaData> shared_meta_;
163     uint64_t additional_garbage_count_ = 0;
164     uint64_t additional_garbage_bytes_ = 0;
165     std::unordered_set<uint64_t> newly_linked_ssts_;
166     std::unordered_set<uint64_t> newly_unlinked_ssts_;
167   };
168 
169   const FileOptions& file_options_;
170   const ImmutableCFOptions* const ioptions_;
171   TableCache* table_cache_;
172   VersionStorageInfo* base_vstorage_;
173   VersionSet* version_set_;
174   int num_levels_;
175   LevelState* levels_;
176   // Store sizes of levels larger than num_levels_. We do this instead of
177   // storing them in levels_ to avoid regression in case there are no files
178   // on invalid levels. The version is not consistent if in the end the files
179   // on invalid levels don't cancel out.
180   std::unordered_map<int, size_t> invalid_level_sizes_;
181   // Whether there are invalid new files or invalid deletion on levels larger
182   // than num_levels_.
183   bool has_invalid_levels_;
184   // Current levels of table files affected by additions/deletions.
185   std::unordered_map<uint64_t, int> table_file_levels_;
186   FileComparator level_zero_cmp_;
187   FileComparator level_nonzero_cmp_;
188 
189   // Metadata delta for all blob files affected by the series of version edits.
190   std::map<uint64_t, BlobFileMetaDataDelta> blob_file_meta_deltas_;
191 
192  public:
Rep(const FileOptions & file_options,const ImmutableCFOptions * ioptions,TableCache * table_cache,VersionStorageInfo * base_vstorage,VersionSet * version_set)193   Rep(const FileOptions& file_options, const ImmutableCFOptions* ioptions,
194       TableCache* table_cache, VersionStorageInfo* base_vstorage,
195       VersionSet* version_set)
196       : file_options_(file_options),
197         ioptions_(ioptions),
198         table_cache_(table_cache),
199         base_vstorage_(base_vstorage),
200         version_set_(version_set),
201         num_levels_(base_vstorage->num_levels()),
202         has_invalid_levels_(false) {
203     assert(ioptions_);
204 
205     levels_ = new LevelState[num_levels_];
206     level_zero_cmp_.sort_method = FileComparator::kLevel0;
207     level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0;
208     level_nonzero_cmp_.internal_comparator =
209         base_vstorage_->InternalComparator();
210   }
211 
~Rep()212   ~Rep() {
213     for (int level = 0; level < num_levels_; level++) {
214       const auto& added = levels_[level].added_files;
215       for (auto& pair : added) {
216         UnrefFile(pair.second);
217       }
218     }
219 
220     delete[] levels_;
221   }
222 
UnrefFile(FileMetaData * f)223   void UnrefFile(FileMetaData* f) {
224     f->refs--;
225     if (f->refs <= 0) {
226       if (f->table_reader_handle) {
227         assert(table_cache_ != nullptr);
228         table_cache_->ReleaseHandle(f->table_reader_handle);
229         f->table_reader_handle = nullptr;
230       }
231       delete f;
232     }
233   }
234 
IsBlobFileInVersion(uint64_t blob_file_number) const235   bool IsBlobFileInVersion(uint64_t blob_file_number) const {
236     auto delta_it = blob_file_meta_deltas_.find(blob_file_number);
237     if (delta_it != blob_file_meta_deltas_.end()) {
238       if (delta_it->second.GetSharedMeta()) {
239         return true;
240       }
241     }
242 
243     assert(base_vstorage_);
244 
245     const auto& base_blob_files = base_vstorage_->GetBlobFiles();
246 
247     auto base_it = base_blob_files.find(blob_file_number);
248     if (base_it != base_blob_files.end()) {
249       assert(base_it->second);
250       assert(base_it->second->GetSharedMeta());
251 
252       return true;
253     }
254 
255     return false;
256   }
257 
258   using ExpectedLinkedSsts =
259       std::unordered_map<uint64_t, BlobFileMetaData::LinkedSsts>;
260 
UpdateExpectedLinkedSsts(uint64_t table_file_number,uint64_t blob_file_number,ExpectedLinkedSsts * expected_linked_ssts)261   static void UpdateExpectedLinkedSsts(
262       uint64_t table_file_number, uint64_t blob_file_number,
263       ExpectedLinkedSsts* expected_linked_ssts) {
264     assert(expected_linked_ssts);
265 
266     if (blob_file_number == kInvalidBlobFileNumber) {
267       return;
268     }
269 
270     (*expected_linked_ssts)[blob_file_number].emplace(table_file_number);
271   }
272 
CheckConsistencyDetails(VersionStorageInfo * vstorage)273   Status CheckConsistencyDetails(VersionStorageInfo* vstorage) {
274     // Make sure the files are sorted correctly and that the links between
275     // table files and blob files are consistent. The latter is checked using
276     // the following mapping, which is built using the forward links
277     // (table file -> blob file), and is subsequently compared with the inverse
278     // mapping stored in the BlobFileMetaData objects.
279     ExpectedLinkedSsts expected_linked_ssts;
280 
281     for (int level = 0; level < num_levels_; level++) {
282       auto& level_files = vstorage->LevelFiles(level);
283 
284       if (level_files.empty()) {
285         continue;
286       }
287 
288       assert(level_files[0]);
289       UpdateExpectedLinkedSsts(level_files[0]->fd.GetNumber(),
290                                level_files[0]->oldest_blob_file_number,
291                                &expected_linked_ssts);
292       for (size_t i = 1; i < level_files.size(); i++) {
293         assert(level_files[i]);
294         UpdateExpectedLinkedSsts(level_files[i]->fd.GetNumber(),
295                                  level_files[i]->oldest_blob_file_number,
296                                  &expected_linked_ssts);
297 
298         auto f1 = level_files[i - 1];
299         auto f2 = level_files[i];
300         if (level == 0) {
301 #ifndef NDEBUG
302           auto pair = std::make_pair(&f1, &f2);
303           TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistency0", &pair);
304 #endif
305           if (!level_zero_cmp_(f1, f2)) {
306             return Status::Corruption("L0 files are not sorted properly");
307           }
308 
309           if (f2->fd.smallest_seqno == f2->fd.largest_seqno) {
310             // This is an external file that we ingested
311             SequenceNumber external_file_seqno = f2->fd.smallest_seqno;
312             if (!(external_file_seqno < f1->fd.largest_seqno ||
313                   external_file_seqno == 0)) {
314               return Status::Corruption(
315                   "L0 file with seqno " +
316                   NumberToString(f1->fd.smallest_seqno) + " " +
317                   NumberToString(f1->fd.largest_seqno) +
318                   " vs. file with global_seqno" +
319                   NumberToString(external_file_seqno) + " with fileNumber " +
320                   NumberToString(f1->fd.GetNumber()));
321             }
322           } else if (f1->fd.smallest_seqno <= f2->fd.smallest_seqno) {
323             return Status::Corruption(
324                 "L0 files seqno " + NumberToString(f1->fd.smallest_seqno) +
325                 " " + NumberToString(f1->fd.largest_seqno) + " " +
326                 NumberToString(f1->fd.GetNumber()) + " vs. " +
327                 NumberToString(f2->fd.smallest_seqno) + " " +
328                 NumberToString(f2->fd.largest_seqno) + " " +
329                 NumberToString(f2->fd.GetNumber()));
330           }
331         } else {
332 #ifndef NDEBUG
333           auto pair = std::make_pair(&f1, &f2);
334           TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistency1", &pair);
335 #endif
336           if (!level_nonzero_cmp_(f1, f2)) {
337             return Status::Corruption(
338                 "L" + NumberToString(level) +
339                 " files are not sorted properly: files #" +
340                 NumberToString(f1->fd.GetNumber()) + ", #" +
341                 NumberToString(f2->fd.GetNumber()));
342           }
343 
344           // Make sure there is no overlap in levels > 0
345           if (vstorage->InternalComparator()->Compare(f1->largest,
346                                                       f2->smallest) >= 0) {
347             return Status::Corruption(
348                 "L" + NumberToString(level) +
349                 " have overlapping ranges: file #" +
350                 NumberToString(f1->fd.GetNumber()) +
351                 " largest key: " + (f1->largest).DebugString(true) +
352                 " vs. file #" + NumberToString(f2->fd.GetNumber()) +
353                 " smallest key: " + (f2->smallest).DebugString(true));
354           }
355         }
356       }
357     }
358 
359     // Make sure that all blob files in the version have non-garbage data.
360     const auto& blob_files = vstorage->GetBlobFiles();
361     for (const auto& pair : blob_files) {
362       const uint64_t blob_file_number = pair.first;
363       const auto& blob_file_meta = pair.second;
364       assert(blob_file_meta);
365 
366       if (blob_file_meta->GetGarbageBlobCount() >=
367           blob_file_meta->GetTotalBlobCount()) {
368         std::ostringstream oss;
369         oss << "Blob file #" << blob_file_number
370             << " consists entirely of garbage";
371 
372         return Status::Corruption("VersionBuilder", oss.str());
373       }
374 
375       if (blob_file_meta->GetLinkedSsts() !=
376           expected_linked_ssts[blob_file_number]) {
377         std::ostringstream oss;
378         oss << "Links are inconsistent between table files and blob file #"
379             << blob_file_number;
380 
381         return Status::Corruption("VersionBuilder", oss.str());
382       }
383     }
384 
385     Status ret_s;
386     TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistencyBeforeReturn",
387                              &ret_s);
388     return ret_s;
389   }
390 
CheckConsistency(VersionStorageInfo * vstorage)391   Status CheckConsistency(VersionStorageInfo* vstorage) {
392     // Always run consistency checks in debug build
393 #ifdef NDEBUG
394     if (!vstorage->force_consistency_checks()) {
395       return Status::OK();
396     }
397 #endif
398     Status s = CheckConsistencyDetails(vstorage);
399     if (s.IsCorruption() && s.getState()) {
400       // Make it clear the error is due to force_consistency_checks = 1 or
401       // debug build
402 #ifdef NDEBUG
403       auto prefix = "force_consistency_checks";
404 #else
405       auto prefix = "force_consistency_checks(DEBUG)";
406 #endif
407       s = Status::Corruption(prefix, s.getState());
408     } else {
409       // was only expecting corruption with message, or OK
410       assert(s.ok());
411     }
412     return s;
413   }
414 
CheckConsistencyForNumLevels() const415   bool CheckConsistencyForNumLevels() const {
416     // Make sure there are no files on or beyond num_levels().
417     if (has_invalid_levels_) {
418       return false;
419     }
420 
421     for (const auto& pair : invalid_level_sizes_) {
422       const size_t level_size = pair.second;
423       if (level_size != 0) {
424         return false;
425       }
426     }
427 
428     return true;
429   }
430 
ApplyBlobFileAddition(const BlobFileAddition & blob_file_addition)431   Status ApplyBlobFileAddition(const BlobFileAddition& blob_file_addition) {
432     const uint64_t blob_file_number = blob_file_addition.GetBlobFileNumber();
433 
434     if (IsBlobFileInVersion(blob_file_number)) {
435       std::ostringstream oss;
436       oss << "Blob file #" << blob_file_number << " already added";
437 
438       return Status::Corruption("VersionBuilder", oss.str());
439     }
440 
441     // Note: we use C++11 for now but in C++14, this could be done in a more
442     // elegant way using generalized lambda capture.
443     VersionSet* const vs = version_set_;
444     const ImmutableCFOptions* const ioptions = ioptions_;
445 
446     auto deleter = [vs, ioptions](SharedBlobFileMetaData* shared_meta) {
447       if (vs) {
448         assert(ioptions);
449         assert(!ioptions->cf_paths.empty());
450         assert(shared_meta);
451 
452         vs->AddObsoleteBlobFile(shared_meta->GetBlobFileNumber(),
453                                 ioptions->cf_paths.front().path);
454       }
455 
456       delete shared_meta;
457     };
458 
459     auto shared_meta = SharedBlobFileMetaData::Create(
460         blob_file_number, blob_file_addition.GetTotalBlobCount(),
461         blob_file_addition.GetTotalBlobBytes(),
462         blob_file_addition.GetChecksumMethod(),
463         blob_file_addition.GetChecksumValue(), deleter);
464 
465     blob_file_meta_deltas_[blob_file_number].SetSharedMeta(
466         std::move(shared_meta));
467 
468     return Status::OK();
469   }
470 
ApplyBlobFileGarbage(const BlobFileGarbage & blob_file_garbage)471   Status ApplyBlobFileGarbage(const BlobFileGarbage& blob_file_garbage) {
472     const uint64_t blob_file_number = blob_file_garbage.GetBlobFileNumber();
473 
474     if (!IsBlobFileInVersion(blob_file_number)) {
475       std::ostringstream oss;
476       oss << "Blob file #" << blob_file_number << " not found";
477 
478       return Status::Corruption("VersionBuilder", oss.str());
479     }
480 
481     blob_file_meta_deltas_[blob_file_number].AddGarbage(
482         blob_file_garbage.GetGarbageBlobCount(),
483         blob_file_garbage.GetGarbageBlobBytes());
484 
485     return Status::OK();
486   }
487 
GetCurrentLevelForTableFile(uint64_t file_number) const488   int GetCurrentLevelForTableFile(uint64_t file_number) const {
489     auto it = table_file_levels_.find(file_number);
490     if (it != table_file_levels_.end()) {
491       return it->second;
492     }
493 
494     assert(base_vstorage_);
495     return base_vstorage_->GetFileLocation(file_number).GetLevel();
496   }
497 
GetOldestBlobFileNumberForTableFile(int level,uint64_t file_number) const498   uint64_t GetOldestBlobFileNumberForTableFile(int level,
499                                                uint64_t file_number) const {
500     assert(level < num_levels_);
501 
502     const auto& added_files = levels_[level].added_files;
503 
504     auto it = added_files.find(file_number);
505     if (it != added_files.end()) {
506       const FileMetaData* const meta = it->second;
507       assert(meta);
508 
509       return meta->oldest_blob_file_number;
510     }
511 
512     assert(base_vstorage_);
513     const FileMetaData* const meta =
514         base_vstorage_->GetFileMetaDataByNumber(file_number);
515     assert(meta);
516 
517     return meta->oldest_blob_file_number;
518   }
519 
GetMinOldestBlobFileNumber() const520   uint64_t GetMinOldestBlobFileNumber() const {
521     uint64_t min_oldest_blob_file_num = std::numeric_limits<uint64_t>::max();
522     for (int level = 0; level < num_levels_; ++level) {
523       const auto& base_files = base_vstorage_->LevelFiles(level);
524       for (const auto* fmeta : base_files) {
525         assert(fmeta);
526         min_oldest_blob_file_num =
527             std::min(min_oldest_blob_file_num, fmeta->oldest_blob_file_number);
528       }
529       const auto& added_files = levels_[level].added_files;
530       for (const auto& elem : added_files) {
531         assert(elem.second);
532         min_oldest_blob_file_num = std::min(
533             min_oldest_blob_file_num, elem.second->oldest_blob_file_number);
534       }
535     }
536     if (min_oldest_blob_file_num == std::numeric_limits<uint64_t>::max()) {
537       min_oldest_blob_file_num = kInvalidBlobFileNumber;
538     }
539     return min_oldest_blob_file_num;
540   }
541 
ApplyFileDeletion(int level,uint64_t file_number)542   Status ApplyFileDeletion(int level, uint64_t file_number) {
543     assert(level != VersionStorageInfo::FileLocation::Invalid().GetLevel());
544 
545     const int current_level = GetCurrentLevelForTableFile(file_number);
546 
547     if (level != current_level) {
548       if (level >= num_levels_) {
549         has_invalid_levels_ = true;
550       }
551 
552       std::ostringstream oss;
553       oss << "Cannot delete table file #" << file_number << " from level "
554           << level << " since it is ";
555       if (current_level ==
556           VersionStorageInfo::FileLocation::Invalid().GetLevel()) {
557         oss << "not in the LSM tree";
558       } else {
559         oss << "on level " << current_level;
560       }
561 
562       return Status::Corruption("VersionBuilder", oss.str());
563     }
564 
565     if (level >= num_levels_) {
566       assert(invalid_level_sizes_[level] > 0);
567       --invalid_level_sizes_[level];
568 
569       table_file_levels_[file_number] =
570           VersionStorageInfo::FileLocation::Invalid().GetLevel();
571 
572       return Status::OK();
573     }
574 
575     const uint64_t blob_file_number =
576         GetOldestBlobFileNumberForTableFile(level, file_number);
577 
578     if (blob_file_number != kInvalidBlobFileNumber &&
579         IsBlobFileInVersion(blob_file_number)) {
580       blob_file_meta_deltas_[blob_file_number].UnlinkSst(file_number);
581     }
582 
583     auto& level_state = levels_[level];
584 
585     auto& add_files = level_state.added_files;
586     auto add_it = add_files.find(file_number);
587     if (add_it != add_files.end()) {
588       UnrefFile(add_it->second);
589       add_files.erase(add_it);
590     }
591 
592     auto& del_files = level_state.deleted_files;
593     assert(del_files.find(file_number) == del_files.end());
594     del_files.emplace(file_number);
595 
596     table_file_levels_[file_number] =
597         VersionStorageInfo::FileLocation::Invalid().GetLevel();
598 
599     return Status::OK();
600   }
601 
ApplyFileAddition(int level,const FileMetaData & meta)602   Status ApplyFileAddition(int level, const FileMetaData& meta) {
603     assert(level != VersionStorageInfo::FileLocation::Invalid().GetLevel());
604 
605     const uint64_t file_number = meta.fd.GetNumber();
606 
607     const int current_level = GetCurrentLevelForTableFile(file_number);
608 
609     if (current_level !=
610         VersionStorageInfo::FileLocation::Invalid().GetLevel()) {
611       if (level >= num_levels_) {
612         has_invalid_levels_ = true;
613       }
614 
615       std::ostringstream oss;
616       oss << "Cannot add table file #" << file_number << " to level " << level
617           << " since it is already in the LSM tree on level " << current_level;
618       return Status::Corruption("VersionBuilder", oss.str());
619     }
620 
621     if (level >= num_levels_) {
622       ++invalid_level_sizes_[level];
623       table_file_levels_[file_number] = level;
624 
625       return Status::OK();
626     }
627 
628     auto& level_state = levels_[level];
629 
630     auto& del_files = level_state.deleted_files;
631     auto del_it = del_files.find(file_number);
632     if (del_it != del_files.end()) {
633       del_files.erase(del_it);
634     }
635 
636     FileMetaData* const f = new FileMetaData(meta);
637     f->refs = 1;
638 
639     auto& add_files = level_state.added_files;
640     assert(add_files.find(file_number) == add_files.end());
641     add_files.emplace(file_number, f);
642 
643     const uint64_t blob_file_number = f->oldest_blob_file_number;
644 
645     if (blob_file_number != kInvalidBlobFileNumber &&
646         IsBlobFileInVersion(blob_file_number)) {
647       blob_file_meta_deltas_[blob_file_number].LinkSst(file_number);
648     }
649 
650     table_file_levels_[file_number] = level;
651 
652     return Status::OK();
653   }
654 
655   // Apply all of the edits in *edit to the current state.
Apply(VersionEdit * edit)656   Status Apply(VersionEdit* edit) {
657     {
658       const Status s = CheckConsistency(base_vstorage_);
659       if (!s.ok()) {
660         return s;
661       }
662     }
663 
664     // Note: we process the blob file related changes first because the
665     // table file addition/deletion logic depends on the blob files
666     // already being there.
667 
668     // Add new blob files
669     for (const auto& blob_file_addition : edit->GetBlobFileAdditions()) {
670       const Status s = ApplyBlobFileAddition(blob_file_addition);
671       if (!s.ok()) {
672         return s;
673       }
674     }
675 
676     // Increase the amount of garbage for blob files affected by GC
677     for (const auto& blob_file_garbage : edit->GetBlobFileGarbages()) {
678       const Status s = ApplyBlobFileGarbage(blob_file_garbage);
679       if (!s.ok()) {
680         return s;
681       }
682     }
683 
684     // Delete table files
685     for (const auto& deleted_file : edit->GetDeletedFiles()) {
686       const int level = deleted_file.first;
687       const uint64_t file_number = deleted_file.second;
688 
689       const Status s = ApplyFileDeletion(level, file_number);
690       if (!s.ok()) {
691         return s;
692       }
693     }
694 
695     // Add new table files
696     for (const auto& new_file : edit->GetNewFiles()) {
697       const int level = new_file.first;
698       const FileMetaData& meta = new_file.second;
699 
700       const Status s = ApplyFileAddition(level, meta);
701       if (!s.ok()) {
702         return s;
703       }
704     }
705 
706     return Status::OK();
707   }
708 
ApplyLinkedSstChanges(const BlobFileMetaData::LinkedSsts & base,const std::unordered_set<uint64_t> & newly_linked,const std::unordered_set<uint64_t> & newly_unlinked)709   static BlobFileMetaData::LinkedSsts ApplyLinkedSstChanges(
710       const BlobFileMetaData::LinkedSsts& base,
711       const std::unordered_set<uint64_t>& newly_linked,
712       const std::unordered_set<uint64_t>& newly_unlinked) {
713     BlobFileMetaData::LinkedSsts result(base);
714 
715     for (uint64_t sst_file_number : newly_unlinked) {
716       assert(result.find(sst_file_number) != result.end());
717 
718       result.erase(sst_file_number);
719     }
720 
721     for (uint64_t sst_file_number : newly_linked) {
722       assert(result.find(sst_file_number) == result.end());
723 
724       result.emplace(sst_file_number);
725     }
726 
727     return result;
728   }
729 
CreateMetaDataForNewBlobFile(const BlobFileMetaDataDelta & delta)730   static std::shared_ptr<BlobFileMetaData> CreateMetaDataForNewBlobFile(
731       const BlobFileMetaDataDelta& delta) {
732     auto shared_meta = delta.GetSharedMeta();
733     assert(shared_meta);
734 
735     assert(delta.GetNewlyUnlinkedSsts().empty());
736 
737     auto meta = BlobFileMetaData::Create(
738         std::move(shared_meta), delta.GetNewlyLinkedSsts(),
739         delta.GetAdditionalGarbageCount(), delta.GetAdditionalGarbageBytes());
740 
741     return meta;
742   }
743 
744   static std::shared_ptr<BlobFileMetaData>
GetOrCreateMetaDataForExistingBlobFile(const std::shared_ptr<BlobFileMetaData> & base_meta,const BlobFileMetaDataDelta & delta)745   GetOrCreateMetaDataForExistingBlobFile(
746       const std::shared_ptr<BlobFileMetaData>& base_meta,
747       const BlobFileMetaDataDelta& delta) {
748     assert(base_meta);
749     assert(!delta.GetSharedMeta());
750 
751     if (delta.IsEmpty()) {
752       return base_meta;
753     }
754 
755     auto shared_meta = base_meta->GetSharedMeta();
756     assert(shared_meta);
757 
758     auto linked_ssts = ApplyLinkedSstChanges(base_meta->GetLinkedSsts(),
759                                              delta.GetNewlyLinkedSsts(),
760                                              delta.GetNewlyUnlinkedSsts());
761 
762     auto meta = BlobFileMetaData::Create(
763         std::move(shared_meta), std::move(linked_ssts),
764         base_meta->GetGarbageBlobCount() + delta.GetAdditionalGarbageCount(),
765         base_meta->GetGarbageBlobBytes() + delta.GetAdditionalGarbageBytes());
766 
767     return meta;
768   }
769 
770   // Add the blob file specified by meta to *vstorage if it is determined to
771   // contain valid data (blobs). We make this decision based on the amount
772   // of garbage in the file, and whether the file or any lower-numbered blob
773   // files have any linked SSTs. The latter condition is tracked using the
774   // flag *found_first_non_empty.
AddBlobFileIfNeeded(VersionStorageInfo * vstorage,const std::shared_ptr<BlobFileMetaData> & meta,bool * found_first_non_empty) const775   void AddBlobFileIfNeeded(VersionStorageInfo* vstorage,
776                            const std::shared_ptr<BlobFileMetaData>& meta,
777                            bool* found_first_non_empty) const {
778     assert(vstorage);
779     assert(meta);
780     assert(found_first_non_empty);
781 
782     if (!meta->GetLinkedSsts().empty()) {
783       (*found_first_non_empty) = true;
784     } else if (!(*found_first_non_empty) ||
785                meta->GetGarbageBlobCount() >= meta->GetTotalBlobCount()) {
786       return;
787     }
788 
789     vstorage->AddBlobFile(meta);
790   }
791 
792   // Merge the blob file metadata from the base version with the changes (edits)
793   // applied, and save the result into *vstorage.
SaveBlobFilesTo(VersionStorageInfo * vstorage) const794   void SaveBlobFilesTo(VersionStorageInfo* vstorage) const {
795     assert(base_vstorage_);
796     assert(vstorage);
797 
798     bool found_first_non_empty = false;
799 
800     const auto& base_blob_files = base_vstorage_->GetBlobFiles();
801     auto base_it = base_blob_files.begin();
802     const auto base_it_end = base_blob_files.end();
803 
804     auto delta_it = blob_file_meta_deltas_.begin();
805     const auto delta_it_end = blob_file_meta_deltas_.end();
806 
807     while (base_it != base_it_end && delta_it != delta_it_end) {
808       const uint64_t base_blob_file_number = base_it->first;
809       const uint64_t delta_blob_file_number = delta_it->first;
810 
811       if (base_blob_file_number < delta_blob_file_number) {
812         const auto& base_meta = base_it->second;
813 
814         AddBlobFileIfNeeded(vstorage, base_meta, &found_first_non_empty);
815 
816         ++base_it;
817       } else if (delta_blob_file_number < base_blob_file_number) {
818         const auto& delta = delta_it->second;
819 
820         auto meta = CreateMetaDataForNewBlobFile(delta);
821 
822         AddBlobFileIfNeeded(vstorage, meta, &found_first_non_empty);
823 
824         ++delta_it;
825       } else {
826         assert(base_blob_file_number == delta_blob_file_number);
827 
828         const auto& base_meta = base_it->second;
829         const auto& delta = delta_it->second;
830 
831         auto meta = GetOrCreateMetaDataForExistingBlobFile(base_meta, delta);
832 
833         AddBlobFileIfNeeded(vstorage, meta, &found_first_non_empty);
834 
835         ++base_it;
836         ++delta_it;
837       }
838     }
839 
840     while (base_it != base_it_end) {
841       const auto& base_meta = base_it->second;
842 
843       AddBlobFileIfNeeded(vstorage, base_meta, &found_first_non_empty);
844 
845       ++base_it;
846     }
847 
848     while (delta_it != delta_it_end) {
849       const auto& delta = delta_it->second;
850 
851       auto meta = CreateMetaDataForNewBlobFile(delta);
852 
853       AddBlobFileIfNeeded(vstorage, meta, &found_first_non_empty);
854 
855       ++delta_it;
856     }
857   }
858 
859   // Save the current state in *vstorage.
SaveTo(VersionStorageInfo * vstorage)860   Status SaveTo(VersionStorageInfo* vstorage) {
861     Status s = CheckConsistency(base_vstorage_);
862     if (!s.ok()) {
863       return s;
864     }
865 
866     s = CheckConsistency(vstorage);
867     if (!s.ok()) {
868       return s;
869     }
870 
871     for (int level = 0; level < num_levels_; level++) {
872       const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_;
873       // Merge the set of added files with the set of pre-existing files.
874       // Drop any deleted files.  Store the result in *v.
875       const auto& base_files = base_vstorage_->LevelFiles(level);
876       const auto& unordered_added_files = levels_[level].added_files;
877       vstorage->Reserve(level,
878                         base_files.size() + unordered_added_files.size());
879 
880       // Sort added files for the level.
881       std::vector<FileMetaData*> added_files;
882       added_files.reserve(unordered_added_files.size());
883       for (const auto& pair : unordered_added_files) {
884         added_files.push_back(pair.second);
885       }
886       std::sort(added_files.begin(), added_files.end(), cmp);
887 
888 #ifndef NDEBUG
889       FileMetaData* prev_added_file = nullptr;
890       for (const auto& added : added_files) {
891         if (level > 0 && prev_added_file != nullptr) {
892           assert(base_vstorage_->InternalComparator()->Compare(
893                      prev_added_file->smallest, added->smallest) <= 0);
894         }
895         prev_added_file = added;
896       }
897 #endif
898 
899       auto base_iter = base_files.begin();
900       auto base_end = base_files.end();
901       auto added_iter = added_files.begin();
902       auto added_end = added_files.end();
903       while (added_iter != added_end || base_iter != base_end) {
904         if (base_iter == base_end ||
905                 (added_iter != added_end && cmp(*added_iter, *base_iter))) {
906           MaybeAddFile(vstorage, level, *added_iter++);
907         } else {
908           MaybeAddFile(vstorage, level, *base_iter++);
909         }
910       }
911     }
912 
913     SaveBlobFilesTo(vstorage);
914 
915     s = CheckConsistency(vstorage);
916     return s;
917   }
918 
LoadTableHandlers(InternalStats * internal_stats,int max_threads,bool prefetch_index_and_filter_in_cache,bool is_initial_load,const SliceTransform * prefix_extractor,size_t max_file_size_for_l0_meta_pin)919   Status LoadTableHandlers(InternalStats* internal_stats, int max_threads,
920                            bool prefetch_index_and_filter_in_cache,
921                            bool is_initial_load,
922                            const SliceTransform* prefix_extractor,
923                            size_t max_file_size_for_l0_meta_pin) {
924     assert(table_cache_ != nullptr);
925 
926     size_t table_cache_capacity = table_cache_->get_cache()->GetCapacity();
927     bool always_load = (table_cache_capacity == TableCache::kInfiniteCapacity);
928     size_t max_load = port::kMaxSizet;
929 
930     if (!always_load) {
931       // If it is initial loading and not set to always loading all the
932       // files, we only load up to kInitialLoadLimit files, to limit the
933       // time reopening the DB.
934       const size_t kInitialLoadLimit = 16;
935       size_t load_limit;
936       // If the table cache is not 1/4 full, we pin the table handle to
937       // file metadata to avoid the cache read costs when reading the file.
938       // The downside of pinning those files is that LRU won't be followed
939       // for those files. This doesn't matter much because if number of files
940       // of the DB excceeds table cache capacity, eventually no table reader
941       // will be pinned and LRU will be followed.
942       if (is_initial_load) {
943         load_limit = std::min(kInitialLoadLimit, table_cache_capacity / 4);
944       } else {
945         load_limit = table_cache_capacity / 4;
946       }
947 
948       size_t table_cache_usage = table_cache_->get_cache()->GetUsage();
949       if (table_cache_usage >= load_limit) {
950         // TODO (yanqin) find a suitable status code.
951         return Status::OK();
952       } else {
953         max_load = load_limit - table_cache_usage;
954       }
955     }
956 
957     // <file metadata, level>
958     std::vector<std::pair<FileMetaData*, int>> files_meta;
959     std::vector<Status> statuses;
960     for (int level = 0; level < num_levels_; level++) {
961       for (auto& file_meta_pair : levels_[level].added_files) {
962         auto* file_meta = file_meta_pair.second;
963         // If the file has been opened before, just skip it.
964         if (!file_meta->table_reader_handle) {
965           files_meta.emplace_back(file_meta, level);
966           statuses.emplace_back(Status::OK());
967         }
968         if (files_meta.size() >= max_load) {
969           break;
970         }
971       }
972       if (files_meta.size() >= max_load) {
973         break;
974       }
975     }
976 
977     std::atomic<size_t> next_file_meta_idx(0);
978     std::function<void()> load_handlers_func([&]() {
979       while (true) {
980         size_t file_idx = next_file_meta_idx.fetch_add(1);
981         if (file_idx >= files_meta.size()) {
982           break;
983         }
984 
985         auto* file_meta = files_meta[file_idx].first;
986         int level = files_meta[file_idx].second;
987         statuses[file_idx] = table_cache_->FindTable(
988             ReadOptions(), file_options_,
989             *(base_vstorage_->InternalComparator()), file_meta->fd,
990             &file_meta->table_reader_handle, prefix_extractor, false /*no_io */,
991             true /* record_read_stats */,
992             internal_stats->GetFileReadHist(level), false, level,
993             prefetch_index_and_filter_in_cache, max_file_size_for_l0_meta_pin);
994         if (file_meta->table_reader_handle != nullptr) {
995           // Load table_reader
996           file_meta->fd.table_reader = table_cache_->GetTableReaderFromHandle(
997               file_meta->table_reader_handle);
998         }
999       }
1000     });
1001 
1002     std::vector<port::Thread> threads;
1003     for (int i = 1; i < max_threads; i++) {
1004       threads.emplace_back(load_handlers_func);
1005     }
1006     load_handlers_func();
1007     for (auto& t : threads) {
1008       t.join();
1009     }
1010     Status ret;
1011     for (const auto& s : statuses) {
1012       if (!s.ok()) {
1013         if (ret.ok()) {
1014           ret = s;
1015         }
1016       }
1017     }
1018     return ret;
1019   }
1020 
MaybeAddFile(VersionStorageInfo * vstorage,int level,FileMetaData * f)1021   void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) {
1022     const uint64_t file_number = f->fd.GetNumber();
1023 
1024     const auto& level_state = levels_[level];
1025 
1026     const auto& del_files = level_state.deleted_files;
1027     const auto del_it = del_files.find(file_number);
1028 
1029     if (del_it != del_files.end()) {
1030       // f is to-be-deleted table file
1031       vstorage->RemoveCurrentStats(f);
1032     } else {
1033       const auto& add_files = level_state.added_files;
1034       const auto add_it = add_files.find(file_number);
1035 
1036       // Note: if the file appears both in the base version and in the added
1037       // list, the added FileMetaData supersedes the one in the base version.
1038       if (add_it != add_files.end() && add_it->second != f) {
1039         vstorage->RemoveCurrentStats(f);
1040       } else {
1041         vstorage->AddFile(level, f);
1042       }
1043     }
1044   }
1045 };
1046 
VersionBuilder(const FileOptions & file_options,const ImmutableCFOptions * ioptions,TableCache * table_cache,VersionStorageInfo * base_vstorage,VersionSet * version_set)1047 VersionBuilder::VersionBuilder(const FileOptions& file_options,
1048                                const ImmutableCFOptions* ioptions,
1049                                TableCache* table_cache,
1050                                VersionStorageInfo* base_vstorage,
1051                                VersionSet* version_set)
1052     : rep_(new Rep(file_options, ioptions, table_cache, base_vstorage,
1053                    version_set)) {}
1054 
1055 VersionBuilder::~VersionBuilder() = default;
1056 
CheckConsistencyForNumLevels()1057 bool VersionBuilder::CheckConsistencyForNumLevels() {
1058   return rep_->CheckConsistencyForNumLevels();
1059 }
1060 
Apply(VersionEdit * edit)1061 Status VersionBuilder::Apply(VersionEdit* edit) { return rep_->Apply(edit); }
1062 
SaveTo(VersionStorageInfo * vstorage)1063 Status VersionBuilder::SaveTo(VersionStorageInfo* vstorage) {
1064   return rep_->SaveTo(vstorage);
1065 }
1066 
LoadTableHandlers(InternalStats * internal_stats,int max_threads,bool prefetch_index_and_filter_in_cache,bool is_initial_load,const SliceTransform * prefix_extractor,size_t max_file_size_for_l0_meta_pin)1067 Status VersionBuilder::LoadTableHandlers(
1068     InternalStats* internal_stats, int max_threads,
1069     bool prefetch_index_and_filter_in_cache, bool is_initial_load,
1070     const SliceTransform* prefix_extractor,
1071     size_t max_file_size_for_l0_meta_pin) {
1072   return rep_->LoadTableHandlers(
1073       internal_stats, max_threads, prefetch_index_and_filter_in_cache,
1074       is_initial_load, prefix_extractor, max_file_size_for_l0_meta_pin);
1075 }
1076 
GetMinOldestBlobFileNumber() const1077 uint64_t VersionBuilder::GetMinOldestBlobFileNumber() const {
1078   return rep_->GetMinOldestBlobFileNumber();
1079 }
1080 
BaseReferencedVersionBuilder(ColumnFamilyData * cfd)1081 BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
1082     ColumnFamilyData* cfd)
1083     : version_builder_(new VersionBuilder(
1084           cfd->current()->version_set()->file_options(), cfd->ioptions(),
1085           cfd->table_cache(), cfd->current()->storage_info(),
1086           cfd->current()->version_set())),
1087       version_(cfd->current()) {
1088   version_->Ref();
1089 }
1090 
BaseReferencedVersionBuilder(ColumnFamilyData * cfd,Version * v)1091 BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
1092     ColumnFamilyData* cfd, Version* v)
1093     : version_builder_(new VersionBuilder(
1094           cfd->current()->version_set()->file_options(), cfd->ioptions(),
1095           cfd->table_cache(), v->storage_info(), v->version_set())),
1096       version_(v) {
1097   assert(version_ != cfd->current());
1098 }
1099 
~BaseReferencedVersionBuilder()1100 BaseReferencedVersionBuilder::~BaseReferencedVersionBuilder() {
1101   version_->Unref();
1102 }
1103 
1104 }  // namespace ROCKSDB_NAMESPACE
1105