1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 #pragma once
6 #ifndef ROCKSDB_LITE
7 
8 #include <unordered_set>
9 
10 #include "db/blob_index.h"
11 #include "monitoring/statistics.h"
12 #include "rocksdb/compaction_filter.h"
13 #include "rocksdb/env.h"
14 #include "utilities/blob_db/blob_db_gc_stats.h"
15 #include "utilities/blob_db/blob_db_impl.h"
16 
17 namespace ROCKSDB_NAMESPACE {
18 namespace blob_db {
19 
20 struct BlobCompactionContext {
21   uint64_t next_file_number = 0;
22   std::unordered_set<uint64_t> current_blob_files;
23   SequenceNumber fifo_eviction_seq = 0;
24   uint64_t evict_expiration_up_to = 0;
25 };
26 
27 struct BlobCompactionContextGC {
28   BlobDBImpl* blob_db_impl = nullptr;
29   uint64_t cutoff_file_number = 0;
30 };
31 
32 // Compaction filter that deletes expired blob indexes from the base DB.
33 // Comes into two varieties, one for the non-GC case and one for the GC case.
34 class BlobIndexCompactionFilterBase : public CompactionFilter {
35  public:
BlobIndexCompactionFilterBase(BlobCompactionContext && context,uint64_t current_time,Statistics * stats)36   BlobIndexCompactionFilterBase(BlobCompactionContext&& context,
37                                 uint64_t current_time, Statistics* stats)
38       : context_(std::move(context)),
39         current_time_(current_time),
40         statistics_(stats) {}
41 
~BlobIndexCompactionFilterBase()42   ~BlobIndexCompactionFilterBase() override {
43     RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, expired_count_);
44     RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, expired_size_);
45     RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_COUNT, evicted_count_);
46     RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_SIZE, evicted_size_);
47   }
48 
49   // Filter expired blob indexes regardless of snapshots.
IgnoreSnapshots()50   bool IgnoreSnapshots() const override { return true; }
51 
52   Decision FilterV2(int /*level*/, const Slice& key, ValueType value_type,
53                     const Slice& value, std::string* /*new_value*/,
54                     std::string* /*skip_until*/) const override;
55 
56  protected:
statistics()57   Statistics* statistics() const { return statistics_; }
58 
59  private:
60   BlobCompactionContext context_;
61   const uint64_t current_time_;
62   Statistics* statistics_;
63   // It is safe to not using std::atomic since the compaction filter, created
64   // from a compaction filter factroy, will not be called from multiple threads.
65   mutable uint64_t expired_count_ = 0;
66   mutable uint64_t expired_size_ = 0;
67   mutable uint64_t evicted_count_ = 0;
68   mutable uint64_t evicted_size_ = 0;
69 };
70 
71 class BlobIndexCompactionFilter : public BlobIndexCompactionFilterBase {
72  public:
BlobIndexCompactionFilter(BlobCompactionContext && context,uint64_t current_time,Statistics * stats)73   BlobIndexCompactionFilter(BlobCompactionContext&& context,
74                             uint64_t current_time, Statistics* stats)
75       : BlobIndexCompactionFilterBase(std::move(context), current_time, stats) {
76   }
77 
Name()78   const char* Name() const override { return "BlobIndexCompactionFilter"; }
79 };
80 
81 class BlobIndexCompactionFilterGC : public BlobIndexCompactionFilterBase {
82  public:
BlobIndexCompactionFilterGC(BlobCompactionContext && context,BlobCompactionContextGC && context_gc,uint64_t current_time,Statistics * stats)83   BlobIndexCompactionFilterGC(BlobCompactionContext&& context,
84                               BlobCompactionContextGC&& context_gc,
85                               uint64_t current_time, Statistics* stats)
86       : BlobIndexCompactionFilterBase(std::move(context), current_time, stats),
87         context_gc_(std::move(context_gc)) {}
88 
89   ~BlobIndexCompactionFilterGC() override;
90 
Name()91   const char* Name() const override { return "BlobIndexCompactionFilterGC"; }
92 
93   BlobDecision PrepareBlobOutput(const Slice& key, const Slice& existing_value,
94                                  std::string* new_value) const override;
95 
96  private:
97   bool OpenNewBlobFileIfNeeded() const;
98   bool ReadBlobFromOldFile(const Slice& key, const BlobIndex& blob_index,
99                            PinnableSlice* blob,
100                            CompressionType* compression_type) const;
101   bool WriteBlobToNewFile(const Slice& key, const Slice& blob,
102                           uint64_t* new_blob_file_number,
103                           uint64_t* new_blob_offset) const;
104   bool CloseAndRegisterNewBlobFileIfNeeded() const;
105   bool CloseAndRegisterNewBlobFile() const;
106 
107  private:
108   BlobCompactionContextGC context_gc_;
109   mutable std::shared_ptr<BlobFile> blob_file_;
110   mutable std::shared_ptr<Writer> writer_;
111   mutable BlobDBGarbageCollectionStats gc_stats_;
112 };
113 
114 // Compaction filter factory; similarly to the filters above, it comes
115 // in two flavors, one that creates filters that support GC, and one
116 // that creates non-GC filters.
117 class BlobIndexCompactionFilterFactoryBase : public CompactionFilterFactory {
118  public:
BlobIndexCompactionFilterFactoryBase(BlobDBImpl * _blob_db_impl,Env * _env,Statistics * _statistics)119   BlobIndexCompactionFilterFactoryBase(BlobDBImpl* _blob_db_impl, Env* _env,
120                                        Statistics* _statistics)
121       : blob_db_impl_(_blob_db_impl), env_(_env), statistics_(_statistics) {}
122 
123  protected:
blob_db_impl()124   BlobDBImpl* blob_db_impl() const { return blob_db_impl_; }
env()125   Env* env() const { return env_; }
statistics()126   Statistics* statistics() const { return statistics_; }
127 
128  private:
129   BlobDBImpl* blob_db_impl_;
130   Env* env_;
131   Statistics* statistics_;
132 };
133 
134 class BlobIndexCompactionFilterFactory
135     : public BlobIndexCompactionFilterFactoryBase {
136  public:
BlobIndexCompactionFilterFactory(BlobDBImpl * _blob_db_impl,Env * _env,Statistics * _statistics)137   BlobIndexCompactionFilterFactory(BlobDBImpl* _blob_db_impl, Env* _env,
138                                    Statistics* _statistics)
139       : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _env, _statistics) {
140   }
141 
Name()142   const char* Name() const override {
143     return "BlobIndexCompactionFilterFactory";
144   }
145 
146   std::unique_ptr<CompactionFilter> CreateCompactionFilter(
147       const CompactionFilter::Context& /*context*/) override;
148 };
149 
150 class BlobIndexCompactionFilterFactoryGC
151     : public BlobIndexCompactionFilterFactoryBase {
152  public:
BlobIndexCompactionFilterFactoryGC(BlobDBImpl * _blob_db_impl,Env * _env,Statistics * _statistics)153   BlobIndexCompactionFilterFactoryGC(BlobDBImpl* _blob_db_impl, Env* _env,
154                                      Statistics* _statistics)
155       : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _env, _statistics) {
156   }
157 
Name()158   const char* Name() const override {
159     return "BlobIndexCompactionFilterFactoryGC";
160   }
161 
162   std::unique_ptr<CompactionFilter> CreateCompactionFilter(
163       const CompactionFilter::Context& /*context*/) override;
164 };
165 
166 }  // namespace blob_db
167 }  // namespace ROCKSDB_NAMESPACE
168 #endif  // ROCKSDB_LITE
169