1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #pragma once
7 
8 #ifndef ROCKSDB_LITE
9 
#include <atomic>
#include <condition_variable>
#include <limits>
#include <list>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
21 
22 #include "db/blob/blob_log_format.h"
23 #include "db/blob/blob_log_writer.h"
24 #include "db/db_iter.h"
25 #include "rocksdb/compaction_filter.h"
26 #include "rocksdb/db.h"
27 #include "rocksdb/file_system.h"
28 #include "rocksdb/listener.h"
29 #include "rocksdb/options.h"
30 #include "rocksdb/statistics.h"
31 #include "rocksdb/wal_filter.h"
32 #include "util/mutexlock.h"
33 #include "util/timer_queue.h"
34 #include "utilities/blob_db/blob_db.h"
35 #include "utilities/blob_db/blob_file.h"
36 
37 namespace ROCKSDB_NAMESPACE {
38 
39 class DBImpl;
40 class ColumnFamilyHandle;
41 class ColumnFamilyData;
42 class SystemClock;
43 
44 struct FlushJobInfo;
45 
46 namespace blob_db {
47 
48 struct BlobCompactionContext;
49 struct BlobCompactionContextGC;
50 class BlobDBImpl;
51 class BlobFile;
52 
53 // Comparator to sort "TTL" aware Blob files based on the lower value of
54 // TTL range.
55 struct BlobFileComparatorTTL {
56   bool operator()(const std::shared_ptr<BlobFile>& lhs,
57                   const std::shared_ptr<BlobFile>& rhs) const;
58 };
59 
60 struct BlobFileComparator {
61   bool operator()(const std::shared_ptr<BlobFile>& lhs,
62                   const std::shared_ptr<BlobFile>& rhs) const;
63 };
64 
65 /**
66  * The implementation class for BlobDB. It manages the blob logs, which
67  * are sequentially written files. Blob logs can be of the TTL or non-TTL
68  * varieties; the former are cleaned up when they expire, while the latter
69  * are (optionally) garbage collected.
70  */
71 class BlobDBImpl : public BlobDB {
72   friend class BlobFile;
73   friend class BlobDBIterator;
74   friend class BlobDBListener;
75   friend class BlobDBListenerGC;
76   friend class BlobIndexCompactionFilterBase;
77   friend class BlobIndexCompactionFilterGC;
78 
79  public:
80   // deletions check period
81   static constexpr uint32_t kDeleteCheckPeriodMillisecs = 2 * 1000;
82 
83   // sanity check task
84   static constexpr uint32_t kSanityCheckPeriodMillisecs = 20 * 60 * 1000;
85 
86   // how many random access open files can we tolerate
87   static constexpr uint32_t kOpenFilesTrigger = 100;
88 
89   // how often to schedule reclaim open files.
90   static constexpr uint32_t kReclaimOpenFilesPeriodMillisecs = 1 * 1000;
91 
92   // how often to schedule delete obs files periods
93   static constexpr uint32_t kDeleteObsoleteFilesPeriodMillisecs = 10 * 1000;
94 
95   // how often to schedule expired files eviction.
96   static constexpr uint32_t kEvictExpiredFilesPeriodMillisecs = 10 * 1000;
97 
98   // when should oldest file be evicted:
99   // on reaching 90% of blob_dir_size
100   static constexpr double kEvictOldestFileAtSize = 0.9;
101 
102   using BlobDB::Put;
103   Status Put(const WriteOptions& options, const Slice& key,
104              const Slice& value) override;
105 
106   using BlobDB::Get;
107   Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
108              const Slice& key, PinnableSlice* value) override;
109 
110   Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
111              const Slice& key, PinnableSlice* value,
112              uint64_t* expiration) override;
113 
114   using BlobDB::NewIterator;
115   virtual Iterator* NewIterator(const ReadOptions& read_options) override;
116 
117   using BlobDB::NewIterators;
NewIterators(const ReadOptions &,const std::vector<ColumnFamilyHandle * > &,std::vector<Iterator * > *)118   virtual Status NewIterators(
119       const ReadOptions& /*read_options*/,
120       const std::vector<ColumnFamilyHandle*>& /*column_families*/,
121       std::vector<Iterator*>* /*iterators*/) override {
122     return Status::NotSupported("Not implemented");
123   }
124 
125   using BlobDB::MultiGet;
126   virtual std::vector<Status> MultiGet(
127       const ReadOptions& read_options,
128       const std::vector<Slice>& keys,
129       std::vector<std::string>* values) override;
130 
131   virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
132 
133   virtual Status Close() override;
134 
135   using BlobDB::PutWithTTL;
136   Status PutWithTTL(const WriteOptions& options, const Slice& key,
137                     const Slice& value, uint64_t ttl) override;
138 
139   using BlobDB::PutUntil;
140   Status PutUntil(const WriteOptions& options, const Slice& key,
141                   const Slice& value, uint64_t expiration) override;
142 
143   using BlobDB::CompactFiles;
144   Status CompactFiles(
145       const CompactionOptions& compact_options,
146       const std::vector<std::string>& input_file_names, const int output_level,
147       const int output_path_id = -1,
148       std::vector<std::string>* const output_file_names = nullptr,
149       CompactionJobInfo* compaction_job_info = nullptr) override;
150 
151   BlobDBOptions GetBlobDBOptions() const override;
152 
153   BlobDBImpl(const std::string& dbname, const BlobDBOptions& bdb_options,
154              const DBOptions& db_options,
155              const ColumnFamilyOptions& cf_options);
156 
157   virtual Status DisableFileDeletions() override;
158 
159   virtual Status EnableFileDeletions(bool force) override;
160 
161   virtual Status GetLiveFiles(std::vector<std::string>&,
162                               uint64_t* manifest_file_size,
163                               bool flush_memtable = true) override;
164   virtual void GetLiveFilesMetaData(std::vector<LiveFileMetaData>*) override;
165 
166   ~BlobDBImpl();
167 
168   Status Open(std::vector<ColumnFamilyHandle*>* handles);
169 
170   Status SyncBlobFiles() override;
171 
172   // Common part of the two GetCompactionContext methods below.
173   // REQUIRES: read lock on mutex_
174   void GetCompactionContextCommon(BlobCompactionContext* context);
175 
176   void GetCompactionContext(BlobCompactionContext* context);
177   void GetCompactionContext(BlobCompactionContext* context,
178                             BlobCompactionContextGC* context_gc);
179 
180 #ifndef NDEBUG
181   Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
182                            PinnableSlice* value);
183 
184   void TEST_AddDummyBlobFile(uint64_t blob_file_number,
185                              SequenceNumber immutable_sequence);
186 
187   std::vector<std::shared_ptr<BlobFile>> TEST_GetBlobFiles() const;
188 
189   std::vector<std::shared_ptr<BlobFile>> TEST_GetLiveImmNonTTLFiles() const;
190 
191   std::vector<std::shared_ptr<BlobFile>> TEST_GetObsoleteFiles() const;
192 
193   Status TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile);
194 
195   void TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile>& blob_file,
196                              SequenceNumber obsolete_seq = 0,
197                              bool update_size = true);
198 
199   void TEST_EvictExpiredFiles();
200 
201   void TEST_DeleteObsoleteFiles();
202 
203   uint64_t TEST_live_sst_size();
204 
TEST_blob_dir()205   const std::string& TEST_blob_dir() const { return blob_dir_; }
206 
207   void TEST_InitializeBlobFileToSstMapping(
208       const std::vector<LiveFileMetaData>& live_files);
209 
210   void TEST_ProcessFlushJobInfo(const FlushJobInfo& info);
211 
212   void TEST_ProcessCompactionJobInfo(const CompactionJobInfo& info);
213 
214 #endif  //  !NDEBUG
215 
216  private:
217   class BlobInserter;
218 
219   // Create a snapshot if there isn't one in read options.
220   // Return true if a snapshot is created.
221   bool SetSnapshotIfNeeded(ReadOptions* read_options);
222 
223   Status GetImpl(const ReadOptions& read_options,
224                  ColumnFamilyHandle* column_family, const Slice& key,
225                  PinnableSlice* value, uint64_t* expiration = nullptr);
226 
227   Status GetBlobValue(const Slice& key, const Slice& index_entry,
228                       PinnableSlice* value, uint64_t* expiration = nullptr);
229 
230   Status GetRawBlobFromFile(const Slice& key, uint64_t file_number,
231                             uint64_t offset, uint64_t size,
232                             PinnableSlice* value,
233                             CompressionType* compression_type);
234 
235   Slice GetCompressedSlice(const Slice& raw,
236                            std::string* compression_output) const;
237 
238   Status DecompressSlice(const Slice& compressed_value,
239                          CompressionType compression_type,
240                          PinnableSlice* value_output) const;
241 
242   // Close a file by appending a footer, and removes file from open files list.
243   // REQUIRES: lock held on write_mutex_, write lock held on both the db mutex_
244   // and the blob file's mutex_. If called on a blob file which is visible only
245   // to a single thread (like in the case of new files written during
246   // compaction/GC), the locks on write_mutex_ and the blob file's mutex_ can be
247   // avoided.
248   Status CloseBlobFile(std::shared_ptr<BlobFile> bfile);
249 
250   // Close a file if its size exceeds blob_file_size
251   // REQUIRES: lock held on write_mutex_.
252   Status CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile);
253 
254   // Mark file as obsolete and move the file to obsolete file list.
255   //
256   // REQUIRED: hold write lock of mutex_ or during DB open.
257   void ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,
258                         SequenceNumber obsolete_seq, bool update_size);
259 
260   Status PutBlobValue(const WriteOptions& options, const Slice& key,
261                       const Slice& value, uint64_t expiration,
262                       WriteBatch* batch);
263 
264   Status AppendBlob(const std::shared_ptr<BlobFile>& bfile,
265                     const std::string& headerbuf, const Slice& key,
266                     const Slice& value, uint64_t expiration,
267                     std::string* index_entry);
268 
269   // Create a new blob file and associated writer.
270   Status CreateBlobFileAndWriter(bool has_ttl,
271                                  const ExpirationRange& expiration_range,
272                                  const std::string& reason,
273                                  std::shared_ptr<BlobFile>* blob_file,
274                                  std::shared_ptr<BlobLogWriter>* writer);
275 
276   // Get the open non-TTL blob log file, or create a new one if no such file
277   // exists.
278   Status SelectBlobFile(std::shared_ptr<BlobFile>* blob_file);
279 
280   // Get the open TTL blob log file for a certain expiration, or create a new
281   // one if no such file exists.
282   Status SelectBlobFileTTL(uint64_t expiration,
283                            std::shared_ptr<BlobFile>* blob_file);
284 
285   std::shared_ptr<BlobFile> FindBlobFileLocked(uint64_t expiration) const;
286 
287   // periodic sanity check. Bunch of checks
288   std::pair<bool, int64_t> SanityCheck(bool aborted);
289 
290   // Delete files that have been marked obsolete (either because of TTL
291   // or GC). Check whether any snapshots exist which refer to the same.
292   std::pair<bool, int64_t> DeleteObsoleteFiles(bool aborted);
293 
294   // periodically check if open blob files and their TTL's has expired
295   // if expired, close the sequential writer and make the file immutable
296   std::pair<bool, int64_t> EvictExpiredFiles(bool aborted);
297 
298   // if the number of open files, approaches ULIMIT's this
299   // task will close random readers, which are kept around for
300   // efficiency
301   std::pair<bool, int64_t> ReclaimOpenFiles(bool aborted);
302 
303   std::pair<bool, int64_t> RemoveTimerQ(TimerQueue* tq, bool aborted);
304 
305   // Adds the background tasks to the timer queue
306   void StartBackgroundTasks();
307 
308   // add a new Blob File
309   std::shared_ptr<BlobFile> NewBlobFile(bool has_ttl,
310                                         const ExpirationRange& expiration_range,
311                                         const std::string& reason);
312 
313   // Register a new blob file.
314   // REQUIRES: write lock on mutex_.
315   void RegisterBlobFile(std::shared_ptr<BlobFile> blob_file);
316 
317   // collect all the blob log files from the blob directory
318   Status GetAllBlobFiles(std::set<uint64_t>* file_numbers);
319 
320   // Open all blob files found in blob_dir.
321   Status OpenAllBlobFiles();
322 
323   // Link an SST to a blob file. Comes in locking and non-locking varieties
324   // (the latter is used during Open).
325   template <typename Linker>
326   void LinkSstToBlobFileImpl(uint64_t sst_file_number,
327                              uint64_t blob_file_number, Linker linker);
328 
329   void LinkSstToBlobFile(uint64_t sst_file_number, uint64_t blob_file_number);
330 
331   void LinkSstToBlobFileNoLock(uint64_t sst_file_number,
332                                uint64_t blob_file_number);
333 
334   // Unlink an SST from a blob file.
335   void UnlinkSstFromBlobFile(uint64_t sst_file_number,
336                              uint64_t blob_file_number);
337 
338   // Initialize the mapping between blob files and SSTs during Open.
339   void InitializeBlobFileToSstMapping(
340       const std::vector<LiveFileMetaData>& live_files);
341 
342   // Update the mapping between blob files and SSTs after a flush and mark
343   // any unneeded blob files obsolete.
344   void ProcessFlushJobInfo(const FlushJobInfo& info);
345 
346   // Update the mapping between blob files and SSTs after a compaction and
347   // mark any unneeded blob files obsolete.
348   void ProcessCompactionJobInfo(const CompactionJobInfo& info);
349 
350   // Mark an immutable non-TTL blob file obsolete assuming it has no more SSTs
351   // linked to it, and all memtables from before the blob file became immutable
352   // have been flushed. Note: should only be called if the condition holds for
353   // all lower-numbered non-TTL blob files as well.
354   bool MarkBlobFileObsoleteIfNeeded(const std::shared_ptr<BlobFile>& blob_file,
355                                     SequenceNumber obsolete_seq);
356 
357   // Mark all immutable non-TTL blob files that aren't needed by any SSTs as
358   // obsolete. Comes in two varieties; the version used during Open need not
359   // worry about locking or snapshots.
360   template <class Functor>
361   void MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed);
362 
363   void MarkUnreferencedBlobFilesObsolete();
364   void MarkUnreferencedBlobFilesObsoleteDuringOpen();
365 
366   void UpdateLiveSSTSize();
367 
368   Status GetBlobFileReader(const std::shared_ptr<BlobFile>& blob_file,
369                            std::shared_ptr<RandomAccessFileReader>* reader);
370 
371   // hold write mutex on file and call.
372   // Close the above Random Access reader
373   void CloseRandomAccessLocked(const std::shared_ptr<BlobFile>& bfile);
374 
375   // hold write mutex on file and call
376   // creates a sequential (append) writer for this blobfile
377   Status CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile);
378 
379   // returns a BlobLogWriter object for the file. If writer is not
380   // already present, creates one. Needs Write Mutex to be held
381   Status CheckOrCreateWriterLocked(const std::shared_ptr<BlobFile>& blob_file,
382                                    std::shared_ptr<BlobLogWriter>* writer);
383 
384   // checks if there is no snapshot which is referencing the
385   // blobs
386   bool VisibleToActiveSnapshot(const std::shared_ptr<BlobFile>& file);
387   bool FileDeleteOk_SnapshotCheckLocked(const std::shared_ptr<BlobFile>& bfile);
388 
389   void CopyBlobFiles(std::vector<std::shared_ptr<BlobFile>>* bfiles_copy);
390 
EpochNow()391   uint64_t EpochNow() { return clock_->NowMicros() / 1000000; }
392 
393   // Check if inserting a new blob will make DB grow out of space.
394   // If is_fifo = true, FIFO eviction will be triggered to make room for the
395   // new blob. If force_evict = true, FIFO eviction will evict blob files
396   // even eviction will not make enough room for the new blob.
397   Status CheckSizeAndEvictBlobFiles(uint64_t blob_size,
398                                     bool force_evict = false);
399 
400   // name of the database directory
401   std::string dbname_;
402 
403   // the base DB
404   DBImpl* db_impl_;
405   Env* env_;
406   SystemClock* clock_;
407   // the options that govern the behavior of Blob Storage
408   BlobDBOptions bdb_options_;
409   DBOptions db_options_;
410   ColumnFamilyOptions cf_options_;
411   FileOptions file_options_;
412 
413   // Raw pointer of statistic. db_options_ has a std::shared_ptr to hold
414   // ownership.
415   Statistics* statistics_;
416 
417   // by default this is "blob_dir" under dbname_
418   // but can be configured
419   std::string blob_dir_;
420 
421   // pointer to directory
422   std::unique_ptr<Directory> dir_ent_;
423 
424   // Read Write Mutex, which protects all the data structures
425   // HEAVILY TRAFFICKED
426   mutable port::RWMutex mutex_;
427 
428   // Writers has to hold write_mutex_ before writing.
429   mutable port::Mutex write_mutex_;
430 
431   // counter for blob file number
432   std::atomic<uint64_t> next_file_number_;
433 
434   // entire metadata of all the BLOB files memory
435   std::map<uint64_t, std::shared_ptr<BlobFile>> blob_files_;
436 
437   // All live immutable non-TTL blob files.
438   std::map<uint64_t, std::shared_ptr<BlobFile>> live_imm_non_ttl_blob_files_;
439 
440   // The largest sequence number that has been flushed.
441   SequenceNumber flush_sequence_;
442 
443   // opened non-TTL blob file.
444   std::shared_ptr<BlobFile> open_non_ttl_file_;
445 
446   // all the blob files which are currently being appended to based
447   // on variety of incoming TTL's
448   std::set<std::shared_ptr<BlobFile>, BlobFileComparatorTTL> open_ttl_files_;
449 
450   // Flag to check whether Close() has been called on this DB
451   bool closed_;
452 
453   // timer based queue to execute tasks
454   TimerQueue tqueue_;
455 
456   // number of files opened for random access/GET
457   // counter is used to monitor and close excess RA files.
458   std::atomic<uint32_t> open_file_count_;
459 
460   // Total size of all live blob files (i.e. exclude obsolete files).
461   std::atomic<uint64_t> total_blob_size_;
462 
463   // total size of SST files.
464   std::atomic<uint64_t> live_sst_size_;
465 
466   // Latest FIFO eviction timestamp
467   //
468   // REQUIRES: access with metex_ lock held.
469   uint64_t fifo_eviction_seq_;
470 
471   // The expiration up to which latest FIFO eviction evicts.
472   //
473   // REQUIRES: access with metex_ lock held.
474   uint64_t evict_expiration_up_to_;
475 
476   std::list<std::shared_ptr<BlobFile>> obsolete_files_;
477 
478   // DeleteObsoleteFiles, DiableFileDeletions and EnableFileDeletions block
479   // on the mutex to avoid contention.
480   //
481   // While DeleteObsoleteFiles hold both mutex_ and delete_file_mutex_, note
482   // the difference. mutex_ only needs to be held when access the
483   // data-structure, and delete_file_mutex_ needs to be held the whole time
484   // during DeleteObsoleteFiles to avoid being run simultaneously with
485   // DisableFileDeletions.
486   //
487   // If both of mutex_ and delete_file_mutex_ needs to be held, it is adviced
488   // to hold delete_file_mutex_ first to avoid deadlock.
489   mutable port::Mutex delete_file_mutex_;
490 
491   // Each call of DisableFileDeletions will increase disable_file_deletion_
492   // by 1. EnableFileDeletions will either decrease the count by 1 or reset
493   // it to zeor, depending on the force flag.
494   //
495   // REQUIRES: access with delete_file_mutex_ held.
496   int disable_file_deletions_ = 0;
497 
498   uint32_t debug_level_;
499 };
500 
501 }  // namespace blob_db
502 }  // namespace ROCKSDB_NAMESPACE
503 #endif  // ROCKSDB_LITE
504