1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 #pragma once 6 #ifndef ROCKSDB_LITE 7 8 #include <unordered_set> 9 10 #include "db/blob/blob_index.h" 11 #include "monitoring/statistics.h" 12 #include "rocksdb/compaction_filter.h" 13 #include "utilities/blob_db/blob_db_gc_stats.h" 14 #include "utilities/blob_db/blob_db_impl.h" 15 #include "utilities/compaction_filters/layered_compaction_filter_base.h" 16 17 namespace ROCKSDB_NAMESPACE { 18 class SystemClock; 19 namespace blob_db { 20 21 struct BlobCompactionContext { 22 BlobDBImpl* blob_db_impl = nullptr; 23 uint64_t next_file_number = 0; 24 std::unordered_set<uint64_t> current_blob_files; 25 SequenceNumber fifo_eviction_seq = 0; 26 uint64_t evict_expiration_up_to = 0; 27 }; 28 29 struct BlobCompactionContextGC { 30 uint64_t cutoff_file_number = 0; 31 }; 32 33 // Compaction filter that deletes expired blob indexes from the base DB. 34 // Comes into two varieties, one for the non-GC case and one for the GC case. 35 class BlobIndexCompactionFilterBase : public LayeredCompactionFilterBase { 36 public: BlobIndexCompactionFilterBase(BlobCompactionContext && _context,const CompactionFilter * _user_comp_filter,std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,uint64_t current_time,Statistics * stats)37 BlobIndexCompactionFilterBase( 38 BlobCompactionContext&& _context, 39 const CompactionFilter* _user_comp_filter, 40 std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory, 41 uint64_t current_time, Statistics* stats) 42 : LayeredCompactionFilterBase(_user_comp_filter, 43 std::move(_user_comp_filter_from_factory)), 44 context_(std::move(_context)), 45 current_time_(current_time), 46 statistics_(stats) {} 47 48 ~BlobIndexCompactionFilterBase() override; 49 50 // Filter expired blob indexes regardless of snapshots. IgnoreSnapshots()51 bool IgnoreSnapshots() const override { return true; } 52 53 Decision FilterV2(int level, const Slice& key, ValueType value_type, 54 const Slice& value, std::string* new_value, 55 std::string* skip_until) const override; 56 IsStackedBlobDbInternalCompactionFilter()57 bool IsStackedBlobDbInternalCompactionFilter() const override { return true; } 58 59 protected: 60 bool IsBlobFileOpened() const; 61 virtual bool OpenNewBlobFileIfNeeded() const; 62 bool ReadBlobFromOldFile(const Slice& key, const BlobIndex& blob_index, 63 PinnableSlice* blob, bool need_decompress, 64 CompressionType* compression_type) const; 65 bool WriteBlobToNewFile(const Slice& key, const Slice& blob, 66 uint64_t* new_blob_file_number, 67 uint64_t* new_blob_offset) const; 68 bool CloseAndRegisterNewBlobFileIfNeeded() const; 69 bool CloseAndRegisterNewBlobFile() const; 70 statistics()71 Statistics* statistics() const { return statistics_; } context()72 const BlobCompactionContext& context() const { return context_; } 73 74 private: 75 Decision HandleValueChange(const Slice& key, std::string* new_value) const; 76 77 private: 78 BlobCompactionContext context_; 79 const uint64_t current_time_; 80 Statistics* statistics_; 81 82 mutable std::shared_ptr<BlobFile> blob_file_; 83 mutable std::shared_ptr<BlobLogWriter> writer_; 84 85 // It is safe to not using std::atomic since the compaction filter, created 86 // from a compaction filter factroy, will not be called from multiple threads. 87 mutable uint64_t expired_count_ = 0; 88 mutable uint64_t expired_size_ = 0; 89 mutable uint64_t evicted_count_ = 0; 90 mutable uint64_t evicted_size_ = 0; 91 }; 92 93 class BlobIndexCompactionFilter : public BlobIndexCompactionFilterBase { 94 public: BlobIndexCompactionFilter(BlobCompactionContext && _context,const CompactionFilter * _user_comp_filter,std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,uint64_t current_time,Statistics * stats)95 BlobIndexCompactionFilter( 96 BlobCompactionContext&& _context, 97 const CompactionFilter* _user_comp_filter, 98 std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory, 99 uint64_t current_time, Statistics* stats) 100 : BlobIndexCompactionFilterBase(std::move(_context), _user_comp_filter, 101 std::move(_user_comp_filter_from_factory), 102 current_time, stats) {} 103 Name()104 const char* Name() const override { return "BlobIndexCompactionFilter"; } 105 }; 106 107 class BlobIndexCompactionFilterGC : public BlobIndexCompactionFilterBase { 108 public: BlobIndexCompactionFilterGC(BlobCompactionContext && _context,BlobCompactionContextGC && context_gc,const CompactionFilter * _user_comp_filter,std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,uint64_t current_time,Statistics * stats)109 BlobIndexCompactionFilterGC( 110 BlobCompactionContext&& _context, BlobCompactionContextGC&& context_gc, 111 const CompactionFilter* _user_comp_filter, 112 std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory, 113 uint64_t current_time, Statistics* stats) 114 : BlobIndexCompactionFilterBase(std::move(_context), _user_comp_filter, 115 std::move(_user_comp_filter_from_factory), 116 current_time, stats), 117 context_gc_(std::move(context_gc)) {} 118 119 ~BlobIndexCompactionFilterGC() override; 120 Name()121 const char* Name() const override { return "BlobIndexCompactionFilterGC"; } 122 123 BlobDecision PrepareBlobOutput(const Slice& key, const Slice& existing_value, 124 std::string* new_value) const override; 125 126 private: 127 bool OpenNewBlobFileIfNeeded() const override; 128 129 private: 130 BlobCompactionContextGC context_gc_; 131 mutable BlobDBGarbageCollectionStats gc_stats_; 132 }; 133 134 // Compaction filter factory; similarly to the filters above, it comes 135 // in two flavors, one that creates filters that support GC, and one 136 // that creates non-GC filters. 137 class BlobIndexCompactionFilterFactoryBase : public CompactionFilterFactory { 138 public: BlobIndexCompactionFilterFactoryBase(BlobDBImpl * _blob_db_impl,SystemClock * _clock,const ColumnFamilyOptions & _cf_options,Statistics * _statistics)139 BlobIndexCompactionFilterFactoryBase(BlobDBImpl* _blob_db_impl, 140 SystemClock* _clock, 141 const ColumnFamilyOptions& _cf_options, 142 Statistics* _statistics) 143 : blob_db_impl_(_blob_db_impl), 144 clock_(_clock), 145 statistics_(_statistics), 146 user_comp_filter_(_cf_options.compaction_filter), 147 user_comp_filter_factory_(_cf_options.compaction_filter_factory) {} 148 149 protected: 150 std::unique_ptr<CompactionFilter> CreateUserCompactionFilterFromFactory( 151 const CompactionFilter::Context& context) const; 152 blob_db_impl()153 BlobDBImpl* blob_db_impl() const { return blob_db_impl_; } clock()154 SystemClock* clock() const { return clock_; } statistics()155 Statistics* statistics() const { return statistics_; } user_comp_filter()156 const CompactionFilter* user_comp_filter() const { return user_comp_filter_; } 157 158 private: 159 BlobDBImpl* blob_db_impl_; 160 SystemClock* clock_; 161 Statistics* statistics_; 162 const CompactionFilter* user_comp_filter_; 163 std::shared_ptr<CompactionFilterFactory> user_comp_filter_factory_; 164 }; 165 166 class BlobIndexCompactionFilterFactory 167 : public BlobIndexCompactionFilterFactoryBase { 168 public: BlobIndexCompactionFilterFactory(BlobDBImpl * _blob_db_impl,SystemClock * _clock,const ColumnFamilyOptions & _cf_options,Statistics * _statistics)169 BlobIndexCompactionFilterFactory(BlobDBImpl* _blob_db_impl, 170 SystemClock* _clock, 171 const ColumnFamilyOptions& _cf_options, 172 Statistics* _statistics) 173 : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _clock, _cf_options, 174 _statistics) {} 175 Name()176 const char* Name() const override { 177 return "BlobIndexCompactionFilterFactory"; 178 } 179 180 std::unique_ptr<CompactionFilter> CreateCompactionFilter( 181 const CompactionFilter::Context& context) override; 182 }; 183 184 class BlobIndexCompactionFilterFactoryGC 185 : public BlobIndexCompactionFilterFactoryBase { 186 public: BlobIndexCompactionFilterFactoryGC(BlobDBImpl * _blob_db_impl,SystemClock * _clock,const ColumnFamilyOptions & _cf_options,Statistics * _statistics)187 BlobIndexCompactionFilterFactoryGC(BlobDBImpl* _blob_db_impl, 188 SystemClock* _clock, 189 const ColumnFamilyOptions& _cf_options, 190 Statistics* _statistics) 191 : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _clock, _cf_options, 192 _statistics) {} 193 Name()194 const char* Name() const override { 195 return "BlobIndexCompactionFilterFactoryGC"; 196 } 197 198 std::unique_ptr<CompactionFilter> CreateCompactionFilter( 199 const CompactionFilter::Context& context) override; 200 }; 201 202 } // namespace blob_db 203 } // namespace ROCKSDB_NAMESPACE 204 #endif // ROCKSDB_LITE 205