1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 #pragma once 6 #ifndef ROCKSDB_LITE 7 8 #include <unordered_set> 9 10 #include "db/blob_index.h" 11 #include "monitoring/statistics.h" 12 #include "rocksdb/compaction_filter.h" 13 #include "rocksdb/env.h" 14 #include "utilities/blob_db/blob_db_gc_stats.h" 15 #include "utilities/blob_db/blob_db_impl.h" 16 17 namespace ROCKSDB_NAMESPACE { 18 namespace blob_db { 19 20 struct BlobCompactionContext { 21 uint64_t next_file_number = 0; 22 std::unordered_set<uint64_t> current_blob_files; 23 SequenceNumber fifo_eviction_seq = 0; 24 uint64_t evict_expiration_up_to = 0; 25 }; 26 27 struct BlobCompactionContextGC { 28 BlobDBImpl* blob_db_impl = nullptr; 29 uint64_t cutoff_file_number = 0; 30 }; 31 32 // Compaction filter that deletes expired blob indexes from the base DB. 33 // Comes into two varieties, one for the non-GC case and one for the GC case. 34 class BlobIndexCompactionFilterBase : public CompactionFilter { 35 public: BlobIndexCompactionFilterBase(BlobCompactionContext && context,uint64_t current_time,Statistics * stats)36 BlobIndexCompactionFilterBase(BlobCompactionContext&& context, 37 uint64_t current_time, Statistics* stats) 38 : context_(std::move(context)), 39 current_time_(current_time), 40 statistics_(stats) {} 41 ~BlobIndexCompactionFilterBase()42 ~BlobIndexCompactionFilterBase() override { 43 RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, expired_count_); 44 RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, expired_size_); 45 RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_COUNT, evicted_count_); 46 RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_SIZE, evicted_size_); 47 } 48 49 // Filter expired blob indexes regardless of snapshots. IgnoreSnapshots()50 bool IgnoreSnapshots() const override { return true; } 51 52 Decision FilterV2(int /*level*/, const Slice& key, ValueType value_type, 53 const Slice& value, std::string* /*new_value*/, 54 std::string* /*skip_until*/) const override; 55 56 protected: statistics()57 Statistics* statistics() const { return statistics_; } 58 59 private: 60 BlobCompactionContext context_; 61 const uint64_t current_time_; 62 Statistics* statistics_; 63 // It is safe to not using std::atomic since the compaction filter, created 64 // from a compaction filter factroy, will not be called from multiple threads. 65 mutable uint64_t expired_count_ = 0; 66 mutable uint64_t expired_size_ = 0; 67 mutable uint64_t evicted_count_ = 0; 68 mutable uint64_t evicted_size_ = 0; 69 }; 70 71 class BlobIndexCompactionFilter : public BlobIndexCompactionFilterBase { 72 public: BlobIndexCompactionFilter(BlobCompactionContext && context,uint64_t current_time,Statistics * stats)73 BlobIndexCompactionFilter(BlobCompactionContext&& context, 74 uint64_t current_time, Statistics* stats) 75 : BlobIndexCompactionFilterBase(std::move(context), current_time, stats) { 76 } 77 Name()78 const char* Name() const override { return "BlobIndexCompactionFilter"; } 79 }; 80 81 class BlobIndexCompactionFilterGC : public BlobIndexCompactionFilterBase { 82 public: BlobIndexCompactionFilterGC(BlobCompactionContext && context,BlobCompactionContextGC && context_gc,uint64_t current_time,Statistics * stats)83 BlobIndexCompactionFilterGC(BlobCompactionContext&& context, 84 BlobCompactionContextGC&& context_gc, 85 uint64_t current_time, Statistics* stats) 86 : BlobIndexCompactionFilterBase(std::move(context), current_time, stats), 87 context_gc_(std::move(context_gc)) {} 88 89 ~BlobIndexCompactionFilterGC() override; 90 Name()91 const char* Name() const override { return "BlobIndexCompactionFilterGC"; } 92 93 BlobDecision PrepareBlobOutput(const Slice& key, const Slice& existing_value, 94 std::string* new_value) const override; 95 96 private: 97 bool OpenNewBlobFileIfNeeded() const; 98 bool ReadBlobFromOldFile(const Slice& key, const BlobIndex& blob_index, 99 PinnableSlice* blob, 100 CompressionType* compression_type) const; 101 bool WriteBlobToNewFile(const Slice& key, const Slice& blob, 102 uint64_t* new_blob_file_number, 103 uint64_t* new_blob_offset) const; 104 bool CloseAndRegisterNewBlobFileIfNeeded() const; 105 bool CloseAndRegisterNewBlobFile() const; 106 107 private: 108 BlobCompactionContextGC context_gc_; 109 mutable std::shared_ptr<BlobFile> blob_file_; 110 mutable std::shared_ptr<Writer> writer_; 111 mutable BlobDBGarbageCollectionStats gc_stats_; 112 }; 113 114 // Compaction filter factory; similarly to the filters above, it comes 115 // in two flavors, one that creates filters that support GC, and one 116 // that creates non-GC filters. 117 class BlobIndexCompactionFilterFactoryBase : public CompactionFilterFactory { 118 public: BlobIndexCompactionFilterFactoryBase(BlobDBImpl * _blob_db_impl,Env * _env,Statistics * _statistics)119 BlobIndexCompactionFilterFactoryBase(BlobDBImpl* _blob_db_impl, Env* _env, 120 Statistics* _statistics) 121 : blob_db_impl_(_blob_db_impl), env_(_env), statistics_(_statistics) {} 122 123 protected: blob_db_impl()124 BlobDBImpl* blob_db_impl() const { return blob_db_impl_; } env()125 Env* env() const { return env_; } statistics()126 Statistics* statistics() const { return statistics_; } 127 128 private: 129 BlobDBImpl* blob_db_impl_; 130 Env* env_; 131 Statistics* statistics_; 132 }; 133 134 class BlobIndexCompactionFilterFactory 135 : public BlobIndexCompactionFilterFactoryBase { 136 public: BlobIndexCompactionFilterFactory(BlobDBImpl * _blob_db_impl,Env * _env,Statistics * _statistics)137 BlobIndexCompactionFilterFactory(BlobDBImpl* _blob_db_impl, Env* _env, 138 Statistics* _statistics) 139 : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _env, _statistics) { 140 } 141 Name()142 const char* Name() const override { 143 return "BlobIndexCompactionFilterFactory"; 144 } 145 146 std::unique_ptr<CompactionFilter> CreateCompactionFilter( 147 const CompactionFilter::Context& /*context*/) override; 148 }; 149 150 class BlobIndexCompactionFilterFactoryGC 151 : public BlobIndexCompactionFilterFactoryBase { 152 public: BlobIndexCompactionFilterFactoryGC(BlobDBImpl * _blob_db_impl,Env * _env,Statistics * _statistics)153 BlobIndexCompactionFilterFactoryGC(BlobDBImpl* _blob_db_impl, Env* _env, 154 Statistics* _statistics) 155 : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _env, _statistics) { 156 } 157 Name()158 const char* Name() const override { 159 return "BlobIndexCompactionFilterFactoryGC"; 160 } 161 162 std::unique_ptr<CompactionFilter> CreateCompactionFilter( 163 const CompactionFilter::Context& /*context*/) override; 164 }; 165 166 } // namespace blob_db 167 } // namespace ROCKSDB_NAMESPACE 168 #endif // ROCKSDB_LITE 169