1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/version_builder.h"
11
#include <algorithm>
#include <atomic>
#include <cinttypes>
#include <functional>
#include <limits>
#include <map>
#include <memory>
#include <set>
#include <sstream>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
25
26 #include "db/blob/blob_file_meta.h"
27 #include "db/dbformat.h"
28 #include "db/internal_stats.h"
29 #include "db/table_cache.h"
30 #include "db/version_set.h"
31 #include "port/port.h"
32 #include "table/table_reader.h"
33 #include "util/string_util.h"
34
35 namespace ROCKSDB_NAMESPACE {
36
NewestFirstBySeqNo(FileMetaData * a,FileMetaData * b)37 bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) {
38 if (a->fd.largest_seqno != b->fd.largest_seqno) {
39 return a->fd.largest_seqno > b->fd.largest_seqno;
40 }
41 if (a->fd.smallest_seqno != b->fd.smallest_seqno) {
42 return a->fd.smallest_seqno > b->fd.smallest_seqno;
43 }
44 // Break ties by file number
45 return a->fd.GetNumber() > b->fd.GetNumber();
46 }
47
48 namespace {
BySmallestKey(FileMetaData * a,FileMetaData * b,const InternalKeyComparator * cmp)49 bool BySmallestKey(FileMetaData* a, FileMetaData* b,
50 const InternalKeyComparator* cmp) {
51 int r = cmp->Compare(a->smallest, b->smallest);
52 if (r != 0) {
53 return (r < 0);
54 }
55 // Break ties by file number
56 return (a->fd.GetNumber() < b->fd.GetNumber());
57 }
58 } // namespace
59
60 class VersionBuilder::Rep {
61 private:
62 // Helper to sort files_ in v
63 // kLevel0 -- NewestFirstBySeqNo
64 // kLevelNon0 -- BySmallestKey
65 struct FileComparator {
66 enum SortMethod { kLevel0 = 0, kLevelNon0 = 1, } sort_method;
67 const InternalKeyComparator* internal_comparator;
68
FileComparatorROCKSDB_NAMESPACE::VersionBuilder::Rep::FileComparator69 FileComparator() : internal_comparator(nullptr) {}
70
operator ()ROCKSDB_NAMESPACE::VersionBuilder::Rep::FileComparator71 bool operator()(FileMetaData* f1, FileMetaData* f2) const {
72 switch (sort_method) {
73 case kLevel0:
74 return NewestFirstBySeqNo(f1, f2);
75 case kLevelNon0:
76 return BySmallestKey(f1, f2, internal_comparator);
77 }
78 assert(false);
79 return false;
80 }
81 };
82
  // Per-level record of the table file changes applied to the builder.
  struct LevelState {
    // Numbers of table files deleted from this level.
    std::unordered_set<uint64_t> deleted_files;
    // Map from file number to file meta data. Each entry is a heap copy
    // created with refs = 1 by ApplyFileAddition and released via UnrefFile.
    std::unordered_map<uint64_t, FileMetaData*> added_files;
  };
88
89 class BlobFileMetaDataDelta {
90 public:
IsEmpty() const91 bool IsEmpty() const {
92 return !shared_meta_ && !additional_garbage_count_ &&
93 !additional_garbage_bytes_ && newly_linked_ssts_.empty() &&
94 newly_unlinked_ssts_.empty();
95 }
96
GetSharedMeta() const97 std::shared_ptr<SharedBlobFileMetaData> GetSharedMeta() const {
98 return shared_meta_;
99 }
100
GetAdditionalGarbageCount() const101 uint64_t GetAdditionalGarbageCount() const {
102 return additional_garbage_count_;
103 }
104
GetAdditionalGarbageBytes() const105 uint64_t GetAdditionalGarbageBytes() const {
106 return additional_garbage_bytes_;
107 }
108
GetNewlyLinkedSsts() const109 const std::unordered_set<uint64_t>& GetNewlyLinkedSsts() const {
110 return newly_linked_ssts_;
111 }
112
GetNewlyUnlinkedSsts() const113 const std::unordered_set<uint64_t>& GetNewlyUnlinkedSsts() const {
114 return newly_unlinked_ssts_;
115 }
116
SetSharedMeta(std::shared_ptr<SharedBlobFileMetaData> shared_meta)117 void SetSharedMeta(std::shared_ptr<SharedBlobFileMetaData> shared_meta) {
118 assert(!shared_meta_);
119 assert(shared_meta);
120
121 shared_meta_ = std::move(shared_meta);
122 }
123
AddGarbage(uint64_t count,uint64_t bytes)124 void AddGarbage(uint64_t count, uint64_t bytes) {
125 additional_garbage_count_ += count;
126 additional_garbage_bytes_ += bytes;
127 }
128
LinkSst(uint64_t sst_file_number)129 void LinkSst(uint64_t sst_file_number) {
130 assert(newly_linked_ssts_.find(sst_file_number) ==
131 newly_linked_ssts_.end());
132
133 // Reconcile with newly unlinked SSTs on the fly. (Note: an SST can be
134 // linked to and unlinked from the same blob file in the case of a trivial
135 // move.)
136 auto it = newly_unlinked_ssts_.find(sst_file_number);
137
138 if (it != newly_unlinked_ssts_.end()) {
139 newly_unlinked_ssts_.erase(it);
140 } else {
141 newly_linked_ssts_.emplace(sst_file_number);
142 }
143 }
144
UnlinkSst(uint64_t sst_file_number)145 void UnlinkSst(uint64_t sst_file_number) {
146 assert(newly_unlinked_ssts_.find(sst_file_number) ==
147 newly_unlinked_ssts_.end());
148
149 // Reconcile with newly linked SSTs on the fly. (Note: an SST can be
150 // linked to and unlinked from the same blob file in the case of a trivial
151 // move.)
152 auto it = newly_linked_ssts_.find(sst_file_number);
153
154 if (it != newly_linked_ssts_.end()) {
155 newly_linked_ssts_.erase(it);
156 } else {
157 newly_unlinked_ssts_.emplace(sst_file_number);
158 }
159 }
160
161 private:
162 std::shared_ptr<SharedBlobFileMetaData> shared_meta_;
163 uint64_t additional_garbage_count_ = 0;
164 uint64_t additional_garbage_bytes_ = 0;
165 std::unordered_set<uint64_t> newly_linked_ssts_;
166 std::unordered_set<uint64_t> newly_unlinked_ssts_;
167 };
168
  // Passed to TableCache::FindTable when table handlers are loaded.
  const FileOptions& file_options_;
  // Used by the blob file deleter to locate the column family's first path.
  const ImmutableCFOptions* const ioptions_;
  // May be null; must be non-null whenever table reader handles are loaded
  // or released (asserted at those sites).
  TableCache* table_cache_;
  // The version that the edits are applied on top of.
  VersionStorageInfo* base_vstorage_;
  // May be null; when set, obsolete blob files are reported to it.
  VersionSet* version_set_;
  // Number of levels, taken from base_vstorage_->num_levels().
  int num_levels_;
  // Array of num_levels_ entries allocated with new[] in the constructor and
  // released with delete[] in the destructor.
  LevelState* levels_;
  // Store sizes of levels larger than num_levels_. We do this instead of
  // storing them in levels_ to avoid regression in case there are no files
  // on invalid levels. The version is not consistent if in the end the files
  // on invalid levels don't cancel out.
  std::unordered_map<int, size_t> invalid_level_sizes_;
  // Whether there are invalid new files or invalid deletion on levels larger
  // than num_levels_.
  bool has_invalid_levels_;
  // Current levels of table files affected by additions/deletions.
  std::unordered_map<uint64_t, int> table_file_levels_;
  // Comparators used to sort L0 and non-L0 files, respectively.
  FileComparator level_zero_cmp_;
  FileComparator level_nonzero_cmp_;

  // Metadata delta for all blob files affected by the series of version edits.
  // Kept ordered by blob file number (std::map); SaveBlobFilesTo merges it
  // with the base version's blob files in that order.
  std::map<uint64_t, BlobFileMetaDataDelta> blob_file_meta_deltas_;
191
192 public:
Rep(const FileOptions & file_options,const ImmutableCFOptions * ioptions,TableCache * table_cache,VersionStorageInfo * base_vstorage,VersionSet * version_set)193 Rep(const FileOptions& file_options, const ImmutableCFOptions* ioptions,
194 TableCache* table_cache, VersionStorageInfo* base_vstorage,
195 VersionSet* version_set)
196 : file_options_(file_options),
197 ioptions_(ioptions),
198 table_cache_(table_cache),
199 base_vstorage_(base_vstorage),
200 version_set_(version_set),
201 num_levels_(base_vstorage->num_levels()),
202 has_invalid_levels_(false) {
203 assert(ioptions_);
204
205 levels_ = new LevelState[num_levels_];
206 level_zero_cmp_.sort_method = FileComparator::kLevel0;
207 level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0;
208 level_nonzero_cmp_.internal_comparator =
209 base_vstorage_->InternalComparator();
210 }
211
~Rep()212 ~Rep() {
213 for (int level = 0; level < num_levels_; level++) {
214 const auto& added = levels_[level].added_files;
215 for (auto& pair : added) {
216 UnrefFile(pair.second);
217 }
218 }
219
220 delete[] levels_;
221 }
222
UnrefFile(FileMetaData * f)223 void UnrefFile(FileMetaData* f) {
224 f->refs--;
225 if (f->refs <= 0) {
226 if (f->table_reader_handle) {
227 assert(table_cache_ != nullptr);
228 table_cache_->ReleaseHandle(f->table_reader_handle);
229 f->table_reader_handle = nullptr;
230 }
231 delete f;
232 }
233 }
234
IsBlobFileInVersion(uint64_t blob_file_number) const235 bool IsBlobFileInVersion(uint64_t blob_file_number) const {
236 auto delta_it = blob_file_meta_deltas_.find(blob_file_number);
237 if (delta_it != blob_file_meta_deltas_.end()) {
238 if (delta_it->second.GetSharedMeta()) {
239 return true;
240 }
241 }
242
243 assert(base_vstorage_);
244
245 const auto& base_blob_files = base_vstorage_->GetBlobFiles();
246
247 auto base_it = base_blob_files.find(blob_file_number);
248 if (base_it != base_blob_files.end()) {
249 assert(base_it->second);
250 assert(base_it->second->GetSharedMeta());
251
252 return true;
253 }
254
255 return false;
256 }
257
258 using ExpectedLinkedSsts =
259 std::unordered_map<uint64_t, BlobFileMetaData::LinkedSsts>;
260
UpdateExpectedLinkedSsts(uint64_t table_file_number,uint64_t blob_file_number,ExpectedLinkedSsts * expected_linked_ssts)261 static void UpdateExpectedLinkedSsts(
262 uint64_t table_file_number, uint64_t blob_file_number,
263 ExpectedLinkedSsts* expected_linked_ssts) {
264 assert(expected_linked_ssts);
265
266 if (blob_file_number == kInvalidBlobFileNumber) {
267 return;
268 }
269
270 (*expected_linked_ssts)[blob_file_number].emplace(table_file_number);
271 }
272
CheckConsistencyDetails(VersionStorageInfo * vstorage)273 Status CheckConsistencyDetails(VersionStorageInfo* vstorage) {
274 // Make sure the files are sorted correctly and that the links between
275 // table files and blob files are consistent. The latter is checked using
276 // the following mapping, which is built using the forward links
277 // (table file -> blob file), and is subsequently compared with the inverse
278 // mapping stored in the BlobFileMetaData objects.
279 ExpectedLinkedSsts expected_linked_ssts;
280
281 for (int level = 0; level < num_levels_; level++) {
282 auto& level_files = vstorage->LevelFiles(level);
283
284 if (level_files.empty()) {
285 continue;
286 }
287
288 assert(level_files[0]);
289 UpdateExpectedLinkedSsts(level_files[0]->fd.GetNumber(),
290 level_files[0]->oldest_blob_file_number,
291 &expected_linked_ssts);
292 for (size_t i = 1; i < level_files.size(); i++) {
293 assert(level_files[i]);
294 UpdateExpectedLinkedSsts(level_files[i]->fd.GetNumber(),
295 level_files[i]->oldest_blob_file_number,
296 &expected_linked_ssts);
297
298 auto f1 = level_files[i - 1];
299 auto f2 = level_files[i];
300 if (level == 0) {
301 #ifndef NDEBUG
302 auto pair = std::make_pair(&f1, &f2);
303 TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistency0", &pair);
304 #endif
305 if (!level_zero_cmp_(f1, f2)) {
306 return Status::Corruption("L0 files are not sorted properly");
307 }
308
309 if (f2->fd.smallest_seqno == f2->fd.largest_seqno) {
310 // This is an external file that we ingested
311 SequenceNumber external_file_seqno = f2->fd.smallest_seqno;
312 if (!(external_file_seqno < f1->fd.largest_seqno ||
313 external_file_seqno == 0)) {
314 return Status::Corruption(
315 "L0 file with seqno " +
316 NumberToString(f1->fd.smallest_seqno) + " " +
317 NumberToString(f1->fd.largest_seqno) +
318 " vs. file with global_seqno" +
319 NumberToString(external_file_seqno) + " with fileNumber " +
320 NumberToString(f1->fd.GetNumber()));
321 }
322 } else if (f1->fd.smallest_seqno <= f2->fd.smallest_seqno) {
323 return Status::Corruption(
324 "L0 files seqno " + NumberToString(f1->fd.smallest_seqno) +
325 " " + NumberToString(f1->fd.largest_seqno) + " " +
326 NumberToString(f1->fd.GetNumber()) + " vs. " +
327 NumberToString(f2->fd.smallest_seqno) + " " +
328 NumberToString(f2->fd.largest_seqno) + " " +
329 NumberToString(f2->fd.GetNumber()));
330 }
331 } else {
332 #ifndef NDEBUG
333 auto pair = std::make_pair(&f1, &f2);
334 TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistency1", &pair);
335 #endif
336 if (!level_nonzero_cmp_(f1, f2)) {
337 return Status::Corruption(
338 "L" + NumberToString(level) +
339 " files are not sorted properly: files #" +
340 NumberToString(f1->fd.GetNumber()) + ", #" +
341 NumberToString(f2->fd.GetNumber()));
342 }
343
344 // Make sure there is no overlap in levels > 0
345 if (vstorage->InternalComparator()->Compare(f1->largest,
346 f2->smallest) >= 0) {
347 return Status::Corruption(
348 "L" + NumberToString(level) +
349 " have overlapping ranges: file #" +
350 NumberToString(f1->fd.GetNumber()) +
351 " largest key: " + (f1->largest).DebugString(true) +
352 " vs. file #" + NumberToString(f2->fd.GetNumber()) +
353 " smallest key: " + (f2->smallest).DebugString(true));
354 }
355 }
356 }
357 }
358
359 // Make sure that all blob files in the version have non-garbage data.
360 const auto& blob_files = vstorage->GetBlobFiles();
361 for (const auto& pair : blob_files) {
362 const uint64_t blob_file_number = pair.first;
363 const auto& blob_file_meta = pair.second;
364 assert(blob_file_meta);
365
366 if (blob_file_meta->GetGarbageBlobCount() >=
367 blob_file_meta->GetTotalBlobCount()) {
368 std::ostringstream oss;
369 oss << "Blob file #" << blob_file_number
370 << " consists entirely of garbage";
371
372 return Status::Corruption("VersionBuilder", oss.str());
373 }
374
375 if (blob_file_meta->GetLinkedSsts() !=
376 expected_linked_ssts[blob_file_number]) {
377 std::ostringstream oss;
378 oss << "Links are inconsistent between table files and blob file #"
379 << blob_file_number;
380
381 return Status::Corruption("VersionBuilder", oss.str());
382 }
383 }
384
385 Status ret_s;
386 TEST_SYNC_POINT_CALLBACK("VersionBuilder::CheckConsistencyBeforeReturn",
387 &ret_s);
388 return ret_s;
389 }
390
CheckConsistency(VersionStorageInfo * vstorage)391 Status CheckConsistency(VersionStorageInfo* vstorage) {
392 // Always run consistency checks in debug build
393 #ifdef NDEBUG
394 if (!vstorage->force_consistency_checks()) {
395 return Status::OK();
396 }
397 #endif
398 Status s = CheckConsistencyDetails(vstorage);
399 if (s.IsCorruption() && s.getState()) {
400 // Make it clear the error is due to force_consistency_checks = 1 or
401 // debug build
402 #ifdef NDEBUG
403 auto prefix = "force_consistency_checks";
404 #else
405 auto prefix = "force_consistency_checks(DEBUG)";
406 #endif
407 s = Status::Corruption(prefix, s.getState());
408 } else {
409 // was only expecting corruption with message, or OK
410 assert(s.ok());
411 }
412 return s;
413 }
414
CheckConsistencyForNumLevels() const415 bool CheckConsistencyForNumLevels() const {
416 // Make sure there are no files on or beyond num_levels().
417 if (has_invalid_levels_) {
418 return false;
419 }
420
421 for (const auto& pair : invalid_level_sizes_) {
422 const size_t level_size = pair.second;
423 if (level_size != 0) {
424 return false;
425 }
426 }
427
428 return true;
429 }
430
ApplyBlobFileAddition(const BlobFileAddition & blob_file_addition)431 Status ApplyBlobFileAddition(const BlobFileAddition& blob_file_addition) {
432 const uint64_t blob_file_number = blob_file_addition.GetBlobFileNumber();
433
434 if (IsBlobFileInVersion(blob_file_number)) {
435 std::ostringstream oss;
436 oss << "Blob file #" << blob_file_number << " already added";
437
438 return Status::Corruption("VersionBuilder", oss.str());
439 }
440
441 // Note: we use C++11 for now but in C++14, this could be done in a more
442 // elegant way using generalized lambda capture.
443 VersionSet* const vs = version_set_;
444 const ImmutableCFOptions* const ioptions = ioptions_;
445
446 auto deleter = [vs, ioptions](SharedBlobFileMetaData* shared_meta) {
447 if (vs) {
448 assert(ioptions);
449 assert(!ioptions->cf_paths.empty());
450 assert(shared_meta);
451
452 vs->AddObsoleteBlobFile(shared_meta->GetBlobFileNumber(),
453 ioptions->cf_paths.front().path);
454 }
455
456 delete shared_meta;
457 };
458
459 auto shared_meta = SharedBlobFileMetaData::Create(
460 blob_file_number, blob_file_addition.GetTotalBlobCount(),
461 blob_file_addition.GetTotalBlobBytes(),
462 blob_file_addition.GetChecksumMethod(),
463 blob_file_addition.GetChecksumValue(), deleter);
464
465 blob_file_meta_deltas_[blob_file_number].SetSharedMeta(
466 std::move(shared_meta));
467
468 return Status::OK();
469 }
470
ApplyBlobFileGarbage(const BlobFileGarbage & blob_file_garbage)471 Status ApplyBlobFileGarbage(const BlobFileGarbage& blob_file_garbage) {
472 const uint64_t blob_file_number = blob_file_garbage.GetBlobFileNumber();
473
474 if (!IsBlobFileInVersion(blob_file_number)) {
475 std::ostringstream oss;
476 oss << "Blob file #" << blob_file_number << " not found";
477
478 return Status::Corruption("VersionBuilder", oss.str());
479 }
480
481 blob_file_meta_deltas_[blob_file_number].AddGarbage(
482 blob_file_garbage.GetGarbageBlobCount(),
483 blob_file_garbage.GetGarbageBlobBytes());
484
485 return Status::OK();
486 }
487
GetCurrentLevelForTableFile(uint64_t file_number) const488 int GetCurrentLevelForTableFile(uint64_t file_number) const {
489 auto it = table_file_levels_.find(file_number);
490 if (it != table_file_levels_.end()) {
491 return it->second;
492 }
493
494 assert(base_vstorage_);
495 return base_vstorage_->GetFileLocation(file_number).GetLevel();
496 }
497
GetOldestBlobFileNumberForTableFile(int level,uint64_t file_number) const498 uint64_t GetOldestBlobFileNumberForTableFile(int level,
499 uint64_t file_number) const {
500 assert(level < num_levels_);
501
502 const auto& added_files = levels_[level].added_files;
503
504 auto it = added_files.find(file_number);
505 if (it != added_files.end()) {
506 const FileMetaData* const meta = it->second;
507 assert(meta);
508
509 return meta->oldest_blob_file_number;
510 }
511
512 assert(base_vstorage_);
513 const FileMetaData* const meta =
514 base_vstorage_->GetFileMetaDataByNumber(file_number);
515 assert(meta);
516
517 return meta->oldest_blob_file_number;
518 }
519
GetMinOldestBlobFileNumber() const520 uint64_t GetMinOldestBlobFileNumber() const {
521 uint64_t min_oldest_blob_file_num = std::numeric_limits<uint64_t>::max();
522 for (int level = 0; level < num_levels_; ++level) {
523 const auto& base_files = base_vstorage_->LevelFiles(level);
524 for (const auto* fmeta : base_files) {
525 assert(fmeta);
526 min_oldest_blob_file_num =
527 std::min(min_oldest_blob_file_num, fmeta->oldest_blob_file_number);
528 }
529 const auto& added_files = levels_[level].added_files;
530 for (const auto& elem : added_files) {
531 assert(elem.second);
532 min_oldest_blob_file_num = std::min(
533 min_oldest_blob_file_num, elem.second->oldest_blob_file_number);
534 }
535 }
536 if (min_oldest_blob_file_num == std::numeric_limits<uint64_t>::max()) {
537 min_oldest_blob_file_num = kInvalidBlobFileNumber;
538 }
539 return min_oldest_blob_file_num;
540 }
541
ApplyFileDeletion(int level,uint64_t file_number)542 Status ApplyFileDeletion(int level, uint64_t file_number) {
543 assert(level != VersionStorageInfo::FileLocation::Invalid().GetLevel());
544
545 const int current_level = GetCurrentLevelForTableFile(file_number);
546
547 if (level != current_level) {
548 if (level >= num_levels_) {
549 has_invalid_levels_ = true;
550 }
551
552 std::ostringstream oss;
553 oss << "Cannot delete table file #" << file_number << " from level "
554 << level << " since it is ";
555 if (current_level ==
556 VersionStorageInfo::FileLocation::Invalid().GetLevel()) {
557 oss << "not in the LSM tree";
558 } else {
559 oss << "on level " << current_level;
560 }
561
562 return Status::Corruption("VersionBuilder", oss.str());
563 }
564
565 if (level >= num_levels_) {
566 assert(invalid_level_sizes_[level] > 0);
567 --invalid_level_sizes_[level];
568
569 table_file_levels_[file_number] =
570 VersionStorageInfo::FileLocation::Invalid().GetLevel();
571
572 return Status::OK();
573 }
574
575 const uint64_t blob_file_number =
576 GetOldestBlobFileNumberForTableFile(level, file_number);
577
578 if (blob_file_number != kInvalidBlobFileNumber &&
579 IsBlobFileInVersion(blob_file_number)) {
580 blob_file_meta_deltas_[blob_file_number].UnlinkSst(file_number);
581 }
582
583 auto& level_state = levels_[level];
584
585 auto& add_files = level_state.added_files;
586 auto add_it = add_files.find(file_number);
587 if (add_it != add_files.end()) {
588 UnrefFile(add_it->second);
589 add_files.erase(add_it);
590 }
591
592 auto& del_files = level_state.deleted_files;
593 assert(del_files.find(file_number) == del_files.end());
594 del_files.emplace(file_number);
595
596 table_file_levels_[file_number] =
597 VersionStorageInfo::FileLocation::Invalid().GetLevel();
598
599 return Status::OK();
600 }
601
ApplyFileAddition(int level,const FileMetaData & meta)602 Status ApplyFileAddition(int level, const FileMetaData& meta) {
603 assert(level != VersionStorageInfo::FileLocation::Invalid().GetLevel());
604
605 const uint64_t file_number = meta.fd.GetNumber();
606
607 const int current_level = GetCurrentLevelForTableFile(file_number);
608
609 if (current_level !=
610 VersionStorageInfo::FileLocation::Invalid().GetLevel()) {
611 if (level >= num_levels_) {
612 has_invalid_levels_ = true;
613 }
614
615 std::ostringstream oss;
616 oss << "Cannot add table file #" << file_number << " to level " << level
617 << " since it is already in the LSM tree on level " << current_level;
618 return Status::Corruption("VersionBuilder", oss.str());
619 }
620
621 if (level >= num_levels_) {
622 ++invalid_level_sizes_[level];
623 table_file_levels_[file_number] = level;
624
625 return Status::OK();
626 }
627
628 auto& level_state = levels_[level];
629
630 auto& del_files = level_state.deleted_files;
631 auto del_it = del_files.find(file_number);
632 if (del_it != del_files.end()) {
633 del_files.erase(del_it);
634 }
635
636 FileMetaData* const f = new FileMetaData(meta);
637 f->refs = 1;
638
639 auto& add_files = level_state.added_files;
640 assert(add_files.find(file_number) == add_files.end());
641 add_files.emplace(file_number, f);
642
643 const uint64_t blob_file_number = f->oldest_blob_file_number;
644
645 if (blob_file_number != kInvalidBlobFileNumber &&
646 IsBlobFileInVersion(blob_file_number)) {
647 blob_file_meta_deltas_[blob_file_number].LinkSst(file_number);
648 }
649
650 table_file_levels_[file_number] = level;
651
652 return Status::OK();
653 }
654
655 // Apply all of the edits in *edit to the current state.
Apply(VersionEdit * edit)656 Status Apply(VersionEdit* edit) {
657 {
658 const Status s = CheckConsistency(base_vstorage_);
659 if (!s.ok()) {
660 return s;
661 }
662 }
663
664 // Note: we process the blob file related changes first because the
665 // table file addition/deletion logic depends on the blob files
666 // already being there.
667
668 // Add new blob files
669 for (const auto& blob_file_addition : edit->GetBlobFileAdditions()) {
670 const Status s = ApplyBlobFileAddition(blob_file_addition);
671 if (!s.ok()) {
672 return s;
673 }
674 }
675
676 // Increase the amount of garbage for blob files affected by GC
677 for (const auto& blob_file_garbage : edit->GetBlobFileGarbages()) {
678 const Status s = ApplyBlobFileGarbage(blob_file_garbage);
679 if (!s.ok()) {
680 return s;
681 }
682 }
683
684 // Delete table files
685 for (const auto& deleted_file : edit->GetDeletedFiles()) {
686 const int level = deleted_file.first;
687 const uint64_t file_number = deleted_file.second;
688
689 const Status s = ApplyFileDeletion(level, file_number);
690 if (!s.ok()) {
691 return s;
692 }
693 }
694
695 // Add new table files
696 for (const auto& new_file : edit->GetNewFiles()) {
697 const int level = new_file.first;
698 const FileMetaData& meta = new_file.second;
699
700 const Status s = ApplyFileAddition(level, meta);
701 if (!s.ok()) {
702 return s;
703 }
704 }
705
706 return Status::OK();
707 }
708
ApplyLinkedSstChanges(const BlobFileMetaData::LinkedSsts & base,const std::unordered_set<uint64_t> & newly_linked,const std::unordered_set<uint64_t> & newly_unlinked)709 static BlobFileMetaData::LinkedSsts ApplyLinkedSstChanges(
710 const BlobFileMetaData::LinkedSsts& base,
711 const std::unordered_set<uint64_t>& newly_linked,
712 const std::unordered_set<uint64_t>& newly_unlinked) {
713 BlobFileMetaData::LinkedSsts result(base);
714
715 for (uint64_t sst_file_number : newly_unlinked) {
716 assert(result.find(sst_file_number) != result.end());
717
718 result.erase(sst_file_number);
719 }
720
721 for (uint64_t sst_file_number : newly_linked) {
722 assert(result.find(sst_file_number) == result.end());
723
724 result.emplace(sst_file_number);
725 }
726
727 return result;
728 }
729
CreateMetaDataForNewBlobFile(const BlobFileMetaDataDelta & delta)730 static std::shared_ptr<BlobFileMetaData> CreateMetaDataForNewBlobFile(
731 const BlobFileMetaDataDelta& delta) {
732 auto shared_meta = delta.GetSharedMeta();
733 assert(shared_meta);
734
735 assert(delta.GetNewlyUnlinkedSsts().empty());
736
737 auto meta = BlobFileMetaData::Create(
738 std::move(shared_meta), delta.GetNewlyLinkedSsts(),
739 delta.GetAdditionalGarbageCount(), delta.GetAdditionalGarbageBytes());
740
741 return meta;
742 }
743
744 static std::shared_ptr<BlobFileMetaData>
GetOrCreateMetaDataForExistingBlobFile(const std::shared_ptr<BlobFileMetaData> & base_meta,const BlobFileMetaDataDelta & delta)745 GetOrCreateMetaDataForExistingBlobFile(
746 const std::shared_ptr<BlobFileMetaData>& base_meta,
747 const BlobFileMetaDataDelta& delta) {
748 assert(base_meta);
749 assert(!delta.GetSharedMeta());
750
751 if (delta.IsEmpty()) {
752 return base_meta;
753 }
754
755 auto shared_meta = base_meta->GetSharedMeta();
756 assert(shared_meta);
757
758 auto linked_ssts = ApplyLinkedSstChanges(base_meta->GetLinkedSsts(),
759 delta.GetNewlyLinkedSsts(),
760 delta.GetNewlyUnlinkedSsts());
761
762 auto meta = BlobFileMetaData::Create(
763 std::move(shared_meta), std::move(linked_ssts),
764 base_meta->GetGarbageBlobCount() + delta.GetAdditionalGarbageCount(),
765 base_meta->GetGarbageBlobBytes() + delta.GetAdditionalGarbageBytes());
766
767 return meta;
768 }
769
770 // Add the blob file specified by meta to *vstorage if it is determined to
771 // contain valid data (blobs). We make this decision based on the amount
772 // of garbage in the file, and whether the file or any lower-numbered blob
773 // files have any linked SSTs. The latter condition is tracked using the
774 // flag *found_first_non_empty.
AddBlobFileIfNeeded(VersionStorageInfo * vstorage,const std::shared_ptr<BlobFileMetaData> & meta,bool * found_first_non_empty) const775 void AddBlobFileIfNeeded(VersionStorageInfo* vstorage,
776 const std::shared_ptr<BlobFileMetaData>& meta,
777 bool* found_first_non_empty) const {
778 assert(vstorage);
779 assert(meta);
780 assert(found_first_non_empty);
781
782 if (!meta->GetLinkedSsts().empty()) {
783 (*found_first_non_empty) = true;
784 } else if (!(*found_first_non_empty) ||
785 meta->GetGarbageBlobCount() >= meta->GetTotalBlobCount()) {
786 return;
787 }
788
789 vstorage->AddBlobFile(meta);
790 }
791
792 // Merge the blob file metadata from the base version with the changes (edits)
793 // applied, and save the result into *vstorage.
SaveBlobFilesTo(VersionStorageInfo * vstorage) const794 void SaveBlobFilesTo(VersionStorageInfo* vstorage) const {
795 assert(base_vstorage_);
796 assert(vstorage);
797
798 bool found_first_non_empty = false;
799
800 const auto& base_blob_files = base_vstorage_->GetBlobFiles();
801 auto base_it = base_blob_files.begin();
802 const auto base_it_end = base_blob_files.end();
803
804 auto delta_it = blob_file_meta_deltas_.begin();
805 const auto delta_it_end = blob_file_meta_deltas_.end();
806
807 while (base_it != base_it_end && delta_it != delta_it_end) {
808 const uint64_t base_blob_file_number = base_it->first;
809 const uint64_t delta_blob_file_number = delta_it->first;
810
811 if (base_blob_file_number < delta_blob_file_number) {
812 const auto& base_meta = base_it->second;
813
814 AddBlobFileIfNeeded(vstorage, base_meta, &found_first_non_empty);
815
816 ++base_it;
817 } else if (delta_blob_file_number < base_blob_file_number) {
818 const auto& delta = delta_it->second;
819
820 auto meta = CreateMetaDataForNewBlobFile(delta);
821
822 AddBlobFileIfNeeded(vstorage, meta, &found_first_non_empty);
823
824 ++delta_it;
825 } else {
826 assert(base_blob_file_number == delta_blob_file_number);
827
828 const auto& base_meta = base_it->second;
829 const auto& delta = delta_it->second;
830
831 auto meta = GetOrCreateMetaDataForExistingBlobFile(base_meta, delta);
832
833 AddBlobFileIfNeeded(vstorage, meta, &found_first_non_empty);
834
835 ++base_it;
836 ++delta_it;
837 }
838 }
839
840 while (base_it != base_it_end) {
841 const auto& base_meta = base_it->second;
842
843 AddBlobFileIfNeeded(vstorage, base_meta, &found_first_non_empty);
844
845 ++base_it;
846 }
847
848 while (delta_it != delta_it_end) {
849 const auto& delta = delta_it->second;
850
851 auto meta = CreateMetaDataForNewBlobFile(delta);
852
853 AddBlobFileIfNeeded(vstorage, meta, &found_first_non_empty);
854
855 ++delta_it;
856 }
857 }
858
859 // Save the current state in *vstorage.
SaveTo(VersionStorageInfo * vstorage)860 Status SaveTo(VersionStorageInfo* vstorage) {
861 Status s = CheckConsistency(base_vstorage_);
862 if (!s.ok()) {
863 return s;
864 }
865
866 s = CheckConsistency(vstorage);
867 if (!s.ok()) {
868 return s;
869 }
870
871 for (int level = 0; level < num_levels_; level++) {
872 const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_;
873 // Merge the set of added files with the set of pre-existing files.
874 // Drop any deleted files. Store the result in *v.
875 const auto& base_files = base_vstorage_->LevelFiles(level);
876 const auto& unordered_added_files = levels_[level].added_files;
877 vstorage->Reserve(level,
878 base_files.size() + unordered_added_files.size());
879
880 // Sort added files for the level.
881 std::vector<FileMetaData*> added_files;
882 added_files.reserve(unordered_added_files.size());
883 for (const auto& pair : unordered_added_files) {
884 added_files.push_back(pair.second);
885 }
886 std::sort(added_files.begin(), added_files.end(), cmp);
887
888 #ifndef NDEBUG
889 FileMetaData* prev_added_file = nullptr;
890 for (const auto& added : added_files) {
891 if (level > 0 && prev_added_file != nullptr) {
892 assert(base_vstorage_->InternalComparator()->Compare(
893 prev_added_file->smallest, added->smallest) <= 0);
894 }
895 prev_added_file = added;
896 }
897 #endif
898
899 auto base_iter = base_files.begin();
900 auto base_end = base_files.end();
901 auto added_iter = added_files.begin();
902 auto added_end = added_files.end();
903 while (added_iter != added_end || base_iter != base_end) {
904 if (base_iter == base_end ||
905 (added_iter != added_end && cmp(*added_iter, *base_iter))) {
906 MaybeAddFile(vstorage, level, *added_iter++);
907 } else {
908 MaybeAddFile(vstorage, level, *base_iter++);
909 }
910 }
911 }
912
913 SaveBlobFilesTo(vstorage);
914
915 s = CheckConsistency(vstorage);
916 return s;
917 }
918
LoadTableHandlers(InternalStats * internal_stats,int max_threads,bool prefetch_index_and_filter_in_cache,bool is_initial_load,const SliceTransform * prefix_extractor,size_t max_file_size_for_l0_meta_pin)919 Status LoadTableHandlers(InternalStats* internal_stats, int max_threads,
920 bool prefetch_index_and_filter_in_cache,
921 bool is_initial_load,
922 const SliceTransform* prefix_extractor,
923 size_t max_file_size_for_l0_meta_pin) {
924 assert(table_cache_ != nullptr);
925
926 size_t table_cache_capacity = table_cache_->get_cache()->GetCapacity();
927 bool always_load = (table_cache_capacity == TableCache::kInfiniteCapacity);
928 size_t max_load = port::kMaxSizet;
929
930 if (!always_load) {
931 // If it is initial loading and not set to always loading all the
932 // files, we only load up to kInitialLoadLimit files, to limit the
933 // time reopening the DB.
934 const size_t kInitialLoadLimit = 16;
935 size_t load_limit;
936 // If the table cache is not 1/4 full, we pin the table handle to
937 // file metadata to avoid the cache read costs when reading the file.
938 // The downside of pinning those files is that LRU won't be followed
939 // for those files. This doesn't matter much because if number of files
940 // of the DB excceeds table cache capacity, eventually no table reader
941 // will be pinned and LRU will be followed.
942 if (is_initial_load) {
943 load_limit = std::min(kInitialLoadLimit, table_cache_capacity / 4);
944 } else {
945 load_limit = table_cache_capacity / 4;
946 }
947
948 size_t table_cache_usage = table_cache_->get_cache()->GetUsage();
949 if (table_cache_usage >= load_limit) {
950 // TODO (yanqin) find a suitable status code.
951 return Status::OK();
952 } else {
953 max_load = load_limit - table_cache_usage;
954 }
955 }
956
957 // <file metadata, level>
958 std::vector<std::pair<FileMetaData*, int>> files_meta;
959 std::vector<Status> statuses;
960 for (int level = 0; level < num_levels_; level++) {
961 for (auto& file_meta_pair : levels_[level].added_files) {
962 auto* file_meta = file_meta_pair.second;
963 // If the file has been opened before, just skip it.
964 if (!file_meta->table_reader_handle) {
965 files_meta.emplace_back(file_meta, level);
966 statuses.emplace_back(Status::OK());
967 }
968 if (files_meta.size() >= max_load) {
969 break;
970 }
971 }
972 if (files_meta.size() >= max_load) {
973 break;
974 }
975 }
976
977 std::atomic<size_t> next_file_meta_idx(0);
978 std::function<void()> load_handlers_func([&]() {
979 while (true) {
980 size_t file_idx = next_file_meta_idx.fetch_add(1);
981 if (file_idx >= files_meta.size()) {
982 break;
983 }
984
985 auto* file_meta = files_meta[file_idx].first;
986 int level = files_meta[file_idx].second;
987 statuses[file_idx] = table_cache_->FindTable(
988 ReadOptions(), file_options_,
989 *(base_vstorage_->InternalComparator()), file_meta->fd,
990 &file_meta->table_reader_handle, prefix_extractor, false /*no_io */,
991 true /* record_read_stats */,
992 internal_stats->GetFileReadHist(level), false, level,
993 prefetch_index_and_filter_in_cache, max_file_size_for_l0_meta_pin);
994 if (file_meta->table_reader_handle != nullptr) {
995 // Load table_reader
996 file_meta->fd.table_reader = table_cache_->GetTableReaderFromHandle(
997 file_meta->table_reader_handle);
998 }
999 }
1000 });
1001
1002 std::vector<port::Thread> threads;
1003 for (int i = 1; i < max_threads; i++) {
1004 threads.emplace_back(load_handlers_func);
1005 }
1006 load_handlers_func();
1007 for (auto& t : threads) {
1008 t.join();
1009 }
1010 Status ret;
1011 for (const auto& s : statuses) {
1012 if (!s.ok()) {
1013 if (ret.ok()) {
1014 ret = s;
1015 }
1016 }
1017 }
1018 return ret;
1019 }
1020
MaybeAddFile(VersionStorageInfo * vstorage,int level,FileMetaData * f)1021 void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) {
1022 const uint64_t file_number = f->fd.GetNumber();
1023
1024 const auto& level_state = levels_[level];
1025
1026 const auto& del_files = level_state.deleted_files;
1027 const auto del_it = del_files.find(file_number);
1028
1029 if (del_it != del_files.end()) {
1030 // f is to-be-deleted table file
1031 vstorage->RemoveCurrentStats(f);
1032 } else {
1033 const auto& add_files = level_state.added_files;
1034 const auto add_it = add_files.find(file_number);
1035
1036 // Note: if the file appears both in the base version and in the added
1037 // list, the added FileMetaData supersedes the one in the base version.
1038 if (add_it != add_files.end() && add_it->second != f) {
1039 vstorage->RemoveCurrentStats(f);
1040 } else {
1041 vstorage->AddFile(level, f);
1042 }
1043 }
1044 }
1045 };
1046
VersionBuilder(const FileOptions & file_options,const ImmutableCFOptions * ioptions,TableCache * table_cache,VersionStorageInfo * base_vstorage,VersionSet * version_set)1047 VersionBuilder::VersionBuilder(const FileOptions& file_options,
1048 const ImmutableCFOptions* ioptions,
1049 TableCache* table_cache,
1050 VersionStorageInfo* base_vstorage,
1051 VersionSet* version_set)
1052 : rep_(new Rep(file_options, ioptions, table_cache, base_vstorage,
1053 version_set)) {}
1054
1055 VersionBuilder::~VersionBuilder() = default;
1056
CheckConsistencyForNumLevels()1057 bool VersionBuilder::CheckConsistencyForNumLevels() {
1058 return rep_->CheckConsistencyForNumLevels();
1059 }
1060
Apply(VersionEdit * edit)1061 Status VersionBuilder::Apply(VersionEdit* edit) { return rep_->Apply(edit); }
1062
SaveTo(VersionStorageInfo * vstorage)1063 Status VersionBuilder::SaveTo(VersionStorageInfo* vstorage) {
1064 return rep_->SaveTo(vstorage);
1065 }
1066
LoadTableHandlers(InternalStats * internal_stats,int max_threads,bool prefetch_index_and_filter_in_cache,bool is_initial_load,const SliceTransform * prefix_extractor,size_t max_file_size_for_l0_meta_pin)1067 Status VersionBuilder::LoadTableHandlers(
1068 InternalStats* internal_stats, int max_threads,
1069 bool prefetch_index_and_filter_in_cache, bool is_initial_load,
1070 const SliceTransform* prefix_extractor,
1071 size_t max_file_size_for_l0_meta_pin) {
1072 return rep_->LoadTableHandlers(
1073 internal_stats, max_threads, prefetch_index_and_filter_in_cache,
1074 is_initial_load, prefix_extractor, max_file_size_for_l0_meta_pin);
1075 }
1076
GetMinOldestBlobFileNumber() const1077 uint64_t VersionBuilder::GetMinOldestBlobFileNumber() const {
1078 return rep_->GetMinOldestBlobFileNumber();
1079 }
1080
BaseReferencedVersionBuilder(ColumnFamilyData * cfd)1081 BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
1082 ColumnFamilyData* cfd)
1083 : version_builder_(new VersionBuilder(
1084 cfd->current()->version_set()->file_options(), cfd->ioptions(),
1085 cfd->table_cache(), cfd->current()->storage_info(),
1086 cfd->current()->version_set())),
1087 version_(cfd->current()) {
1088 version_->Ref();
1089 }
1090
BaseReferencedVersionBuilder(ColumnFamilyData * cfd,Version * v)1091 BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
1092 ColumnFamilyData* cfd, Version* v)
1093 : version_builder_(new VersionBuilder(
1094 cfd->current()->version_set()->file_options(), cfd->ioptions(),
1095 cfd->table_cache(), v->storage_info(), v->version_set())),
1096 version_(v) {
1097 assert(version_ != cfd->current());
1098 }
1099
~BaseReferencedVersionBuilder()1100 BaseReferencedVersionBuilder::~BaseReferencedVersionBuilder() {
1101 version_->Unref();
1102 }
1103
1104 } // namespace ROCKSDB_NAMESPACE
1105