1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 #pragma once
6 #ifndef ROCKSDB_LITE
7 
8 #include <unordered_set>
9 
10 #include "db/blob/blob_index.h"
11 #include "monitoring/statistics.h"
12 #include "rocksdb/compaction_filter.h"
13 #include "utilities/blob_db/blob_db_gc_stats.h"
14 #include "utilities/blob_db/blob_db_impl.h"
15 #include "utilities/compaction_filters/layered_compaction_filter_base.h"
16 
17 namespace ROCKSDB_NAMESPACE {
18 class SystemClock;
19 namespace blob_db {
20 
21 struct BlobCompactionContext {
22   BlobDBImpl* blob_db_impl = nullptr;
23   uint64_t next_file_number = 0;
24   std::unordered_set<uint64_t> current_blob_files;
25   SequenceNumber fifo_eviction_seq = 0;
26   uint64_t evict_expiration_up_to = 0;
27 };
28 
29 struct BlobCompactionContextGC {
30   uint64_t cutoff_file_number = 0;
31 };
32 
33 // Compaction filter that deletes expired blob indexes from the base DB.
34 // Comes into two varieties, one for the non-GC case and one for the GC case.
35 class BlobIndexCompactionFilterBase : public LayeredCompactionFilterBase {
36  public:
BlobIndexCompactionFilterBase(BlobCompactionContext && _context,const CompactionFilter * _user_comp_filter,std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,uint64_t current_time,Statistics * stats)37   BlobIndexCompactionFilterBase(
38       BlobCompactionContext&& _context,
39       const CompactionFilter* _user_comp_filter,
40       std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,
41       uint64_t current_time, Statistics* stats)
42       : LayeredCompactionFilterBase(_user_comp_filter,
43                                     std::move(_user_comp_filter_from_factory)),
44         context_(std::move(_context)),
45         current_time_(current_time),
46         statistics_(stats) {}
47 
48   ~BlobIndexCompactionFilterBase() override;
49 
50   // Filter expired blob indexes regardless of snapshots.
IgnoreSnapshots()51   bool IgnoreSnapshots() const override { return true; }
52 
53   Decision FilterV2(int level, const Slice& key, ValueType value_type,
54                     const Slice& value, std::string* new_value,
55                     std::string* skip_until) const override;
56 
IsStackedBlobDbInternalCompactionFilter()57   bool IsStackedBlobDbInternalCompactionFilter() const override { return true; }
58 
59  protected:
60   bool IsBlobFileOpened() const;
61   virtual bool OpenNewBlobFileIfNeeded() const;
62   bool ReadBlobFromOldFile(const Slice& key, const BlobIndex& blob_index,
63                            PinnableSlice* blob, bool need_decompress,
64                            CompressionType* compression_type) const;
65   bool WriteBlobToNewFile(const Slice& key, const Slice& blob,
66                           uint64_t* new_blob_file_number,
67                           uint64_t* new_blob_offset) const;
68   bool CloseAndRegisterNewBlobFileIfNeeded() const;
69   bool CloseAndRegisterNewBlobFile() const;
70 
statistics()71   Statistics* statistics() const { return statistics_; }
context()72   const BlobCompactionContext& context() const { return context_; }
73 
74  private:
75   Decision HandleValueChange(const Slice& key, std::string* new_value) const;
76 
77  private:
78   BlobCompactionContext context_;
79   const uint64_t current_time_;
80   Statistics* statistics_;
81 
82   mutable std::shared_ptr<BlobFile> blob_file_;
83   mutable std::shared_ptr<BlobLogWriter> writer_;
84 
85   // It is safe to not using std::atomic since the compaction filter, created
86   // from a compaction filter factroy, will not be called from multiple threads.
87   mutable uint64_t expired_count_ = 0;
88   mutable uint64_t expired_size_ = 0;
89   mutable uint64_t evicted_count_ = 0;
90   mutable uint64_t evicted_size_ = 0;
91 };
92 
93 class BlobIndexCompactionFilter : public BlobIndexCompactionFilterBase {
94  public:
BlobIndexCompactionFilter(BlobCompactionContext && _context,const CompactionFilter * _user_comp_filter,std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,uint64_t current_time,Statistics * stats)95   BlobIndexCompactionFilter(
96       BlobCompactionContext&& _context,
97       const CompactionFilter* _user_comp_filter,
98       std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,
99       uint64_t current_time, Statistics* stats)
100       : BlobIndexCompactionFilterBase(std::move(_context), _user_comp_filter,
101                                       std::move(_user_comp_filter_from_factory),
102                                       current_time, stats) {}
103 
Name()104   const char* Name() const override { return "BlobIndexCompactionFilter"; }
105 };
106 
107 class BlobIndexCompactionFilterGC : public BlobIndexCompactionFilterBase {
108  public:
BlobIndexCompactionFilterGC(BlobCompactionContext && _context,BlobCompactionContextGC && context_gc,const CompactionFilter * _user_comp_filter,std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,uint64_t current_time,Statistics * stats)109   BlobIndexCompactionFilterGC(
110       BlobCompactionContext&& _context, BlobCompactionContextGC&& context_gc,
111       const CompactionFilter* _user_comp_filter,
112       std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,
113       uint64_t current_time, Statistics* stats)
114       : BlobIndexCompactionFilterBase(std::move(_context), _user_comp_filter,
115                                       std::move(_user_comp_filter_from_factory),
116                                       current_time, stats),
117         context_gc_(std::move(context_gc)) {}
118 
119   ~BlobIndexCompactionFilterGC() override;
120 
Name()121   const char* Name() const override { return "BlobIndexCompactionFilterGC"; }
122 
123   BlobDecision PrepareBlobOutput(const Slice& key, const Slice& existing_value,
124                                  std::string* new_value) const override;
125 
126  private:
127   bool OpenNewBlobFileIfNeeded() const override;
128 
129  private:
130   BlobCompactionContextGC context_gc_;
131   mutable BlobDBGarbageCollectionStats gc_stats_;
132 };
133 
134 // Compaction filter factory; similarly to the filters above, it comes
135 // in two flavors, one that creates filters that support GC, and one
136 // that creates non-GC filters.
137 class BlobIndexCompactionFilterFactoryBase : public CompactionFilterFactory {
138  public:
BlobIndexCompactionFilterFactoryBase(BlobDBImpl * _blob_db_impl,SystemClock * _clock,const ColumnFamilyOptions & _cf_options,Statistics * _statistics)139   BlobIndexCompactionFilterFactoryBase(BlobDBImpl* _blob_db_impl,
140                                        SystemClock* _clock,
141                                        const ColumnFamilyOptions& _cf_options,
142                                        Statistics* _statistics)
143       : blob_db_impl_(_blob_db_impl),
144         clock_(_clock),
145         statistics_(_statistics),
146         user_comp_filter_(_cf_options.compaction_filter),
147         user_comp_filter_factory_(_cf_options.compaction_filter_factory) {}
148 
149  protected:
150   std::unique_ptr<CompactionFilter> CreateUserCompactionFilterFromFactory(
151       const CompactionFilter::Context& context) const;
152 
blob_db_impl()153   BlobDBImpl* blob_db_impl() const { return blob_db_impl_; }
clock()154   SystemClock* clock() const { return clock_; }
statistics()155   Statistics* statistics() const { return statistics_; }
user_comp_filter()156   const CompactionFilter* user_comp_filter() const { return user_comp_filter_; }
157 
158  private:
159   BlobDBImpl* blob_db_impl_;
160   SystemClock* clock_;
161   Statistics* statistics_;
162   const CompactionFilter* user_comp_filter_;
163   std::shared_ptr<CompactionFilterFactory> user_comp_filter_factory_;
164 };
165 
166 class BlobIndexCompactionFilterFactory
167     : public BlobIndexCompactionFilterFactoryBase {
168  public:
BlobIndexCompactionFilterFactory(BlobDBImpl * _blob_db_impl,SystemClock * _clock,const ColumnFamilyOptions & _cf_options,Statistics * _statistics)169   BlobIndexCompactionFilterFactory(BlobDBImpl* _blob_db_impl,
170                                    SystemClock* _clock,
171                                    const ColumnFamilyOptions& _cf_options,
172                                    Statistics* _statistics)
173       : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _clock, _cf_options,
174                                              _statistics) {}
175 
Name()176   const char* Name() const override {
177     return "BlobIndexCompactionFilterFactory";
178   }
179 
180   std::unique_ptr<CompactionFilter> CreateCompactionFilter(
181       const CompactionFilter::Context& context) override;
182 };
183 
184 class BlobIndexCompactionFilterFactoryGC
185     : public BlobIndexCompactionFilterFactoryBase {
186  public:
BlobIndexCompactionFilterFactoryGC(BlobDBImpl * _blob_db_impl,SystemClock * _clock,const ColumnFamilyOptions & _cf_options,Statistics * _statistics)187   BlobIndexCompactionFilterFactoryGC(BlobDBImpl* _blob_db_impl,
188                                      SystemClock* _clock,
189                                      const ColumnFamilyOptions& _cf_options,
190                                      Statistics* _statistics)
191       : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _clock, _cf_options,
192                                              _statistics) {}
193 
Name()194   const char* Name() const override {
195     return "BlobIndexCompactionFilterFactoryGC";
196   }
197 
198   std::unique_ptr<CompactionFilter> CreateCompactionFilter(
199       const CompactionFilter::Context& context) override;
200 };
201 
202 }  // namespace blob_db
203 }  // namespace ROCKSDB_NAMESPACE
204 #endif  // ROCKSDB_LITE
205