1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #pragma once
7 
8 #ifndef ROCKSDB_LITE
9 
#include <atomic>
#include <condition_variable>
#include <limits>
#include <list>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
21 
22 #include "db/db_iter.h"
23 #include "rocksdb/compaction_filter.h"
24 #include "rocksdb/db.h"
25 #include "rocksdb/listener.h"
26 #include "rocksdb/options.h"
27 #include "rocksdb/statistics.h"
28 #include "rocksdb/wal_filter.h"
29 #include "util/mutexlock.h"
30 #include "util/timer_queue.h"
31 #include "utilities/blob_db/blob_db.h"
32 #include "utilities/blob_db/blob_file.h"
33 #include "utilities/blob_db/blob_log_format.h"
34 #include "utilities/blob_db/blob_log_reader.h"
35 #include "utilities/blob_db/blob_log_writer.h"
36 
37 namespace ROCKSDB_NAMESPACE {
38 
39 class DBImpl;
40 class ColumnFamilyHandle;
41 class ColumnFamilyData;
42 struct FlushJobInfo;
43 
44 namespace blob_db {
45 
46 struct BlobCompactionContext;
47 struct BlobCompactionContextGC;
48 class BlobDBImpl;
49 class BlobFile;
50 
51 // Comparator to sort "TTL" aware Blob files based on the lower value of
52 // TTL range.
53 struct BlobFileComparatorTTL {
54   bool operator()(const std::shared_ptr<BlobFile>& lhs,
55                   const std::shared_ptr<BlobFile>& rhs) const;
56 };
57 
58 struct BlobFileComparator {
59   bool operator()(const std::shared_ptr<BlobFile>& lhs,
60                   const std::shared_ptr<BlobFile>& rhs) const;
61 };
62 
63 /**
64  * The implementation class for BlobDB. It manages the blob logs, which
65  * are sequentially written files. Blob logs can be of the TTL or non-TTL
66  * varieties; the former are cleaned up when they expire, while the latter
67  * are (optionally) garbage collected.
68  */
69 class BlobDBImpl : public BlobDB {
70   friend class BlobFile;
71   friend class BlobDBIterator;
72   friend class BlobDBListener;
73   friend class BlobDBListenerGC;
74   friend class BlobIndexCompactionFilterGC;
75 
76  public:
77   // deletions check period
78   static constexpr uint32_t kDeleteCheckPeriodMillisecs = 2 * 1000;
79 
80   // sanity check task
81   static constexpr uint32_t kSanityCheckPeriodMillisecs = 20 * 60 * 1000;
82 
83   // how many random access open files can we tolerate
84   static constexpr uint32_t kOpenFilesTrigger = 100;
85 
86   // how often to schedule reclaim open files.
87   static constexpr uint32_t kReclaimOpenFilesPeriodMillisecs = 1 * 1000;
88 
89   // how often to schedule delete obs files periods
90   static constexpr uint32_t kDeleteObsoleteFilesPeriodMillisecs = 10 * 1000;
91 
92   // how often to schedule expired files eviction.
93   static constexpr uint32_t kEvictExpiredFilesPeriodMillisecs = 10 * 1000;
94 
95   // when should oldest file be evicted:
96   // on reaching 90% of blob_dir_size
97   static constexpr double kEvictOldestFileAtSize = 0.9;
98 
99   using BlobDB::Put;
100   Status Put(const WriteOptions& options, const Slice& key,
101              const Slice& value) override;
102 
103   using BlobDB::Get;
104   Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
105              const Slice& key, PinnableSlice* value) override;
106 
107   Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
108              const Slice& key, PinnableSlice* value,
109              uint64_t* expiration) override;
110 
111   using BlobDB::NewIterator;
112   virtual Iterator* NewIterator(const ReadOptions& read_options) override;
113 
114   using BlobDB::NewIterators;
NewIterators(const ReadOptions &,const std::vector<ColumnFamilyHandle * > &,std::vector<Iterator * > *)115   virtual Status NewIterators(
116       const ReadOptions& /*read_options*/,
117       const std::vector<ColumnFamilyHandle*>& /*column_families*/,
118       std::vector<Iterator*>* /*iterators*/) override {
119     return Status::NotSupported("Not implemented");
120   }
121 
122   using BlobDB::MultiGet;
123   virtual std::vector<Status> MultiGet(
124       const ReadOptions& read_options,
125       const std::vector<Slice>& keys,
126       std::vector<std::string>* values) override;
127 
128   virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
129 
130   virtual Status Close() override;
131 
132   using BlobDB::PutWithTTL;
133   Status PutWithTTL(const WriteOptions& options, const Slice& key,
134                     const Slice& value, uint64_t ttl) override;
135 
136   using BlobDB::PutUntil;
137   Status PutUntil(const WriteOptions& options, const Slice& key,
138                   const Slice& value, uint64_t expiration) override;
139 
140   using BlobDB::CompactFiles;
141   Status CompactFiles(
142       const CompactionOptions& compact_options,
143       const std::vector<std::string>& input_file_names, const int output_level,
144       const int output_path_id = -1,
145       std::vector<std::string>* const output_file_names = nullptr,
146       CompactionJobInfo* compaction_job_info = nullptr) override;
147 
148   BlobDBOptions GetBlobDBOptions() const override;
149 
150   BlobDBImpl(const std::string& dbname, const BlobDBOptions& bdb_options,
151              const DBOptions& db_options,
152              const ColumnFamilyOptions& cf_options);
153 
154   virtual Status DisableFileDeletions() override;
155 
156   virtual Status EnableFileDeletions(bool force) override;
157 
158   virtual Status GetLiveFiles(std::vector<std::string>&,
159                               uint64_t* manifest_file_size,
160                               bool flush_memtable = true) override;
161   virtual void GetLiveFilesMetaData(std::vector<LiveFileMetaData>*) override;
162 
163   ~BlobDBImpl();
164 
165   Status Open(std::vector<ColumnFamilyHandle*>* handles);
166 
167   Status SyncBlobFiles() override;
168 
169   // Common part of the two GetCompactionContext methods below.
170   // REQUIRES: read lock on mutex_
171   void GetCompactionContextCommon(BlobCompactionContext* context) const;
172 
173   void GetCompactionContext(BlobCompactionContext* context);
174   void GetCompactionContext(BlobCompactionContext* context,
175                             BlobCompactionContextGC* context_gc);
176 
177 #ifndef NDEBUG
178   Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
179                            PinnableSlice* value);
180 
181   void TEST_AddDummyBlobFile(uint64_t blob_file_number,
182                              SequenceNumber immutable_sequence);
183 
184   std::vector<std::shared_ptr<BlobFile>> TEST_GetBlobFiles() const;
185 
186   std::vector<std::shared_ptr<BlobFile>> TEST_GetLiveImmNonTTLFiles() const;
187 
188   std::vector<std::shared_ptr<BlobFile>> TEST_GetObsoleteFiles() const;
189 
190   Status TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile);
191 
192   void TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile>& blob_file,
193                              SequenceNumber obsolete_seq = 0,
194                              bool update_size = true);
195 
196   void TEST_EvictExpiredFiles();
197 
198   void TEST_DeleteObsoleteFiles();
199 
200   uint64_t TEST_live_sst_size();
201 
TEST_blob_dir()202   const std::string& TEST_blob_dir() const { return blob_dir_; }
203 
204   void TEST_InitializeBlobFileToSstMapping(
205       const std::vector<LiveFileMetaData>& live_files);
206 
207   void TEST_ProcessFlushJobInfo(const FlushJobInfo& info);
208 
209   void TEST_ProcessCompactionJobInfo(const CompactionJobInfo& info);
210 
211 #endif  //  !NDEBUG
212 
213  private:
214   class BlobInserter;
215 
216   // Create a snapshot if there isn't one in read options.
217   // Return true if a snapshot is created.
218   bool SetSnapshotIfNeeded(ReadOptions* read_options);
219 
220   Status GetImpl(const ReadOptions& read_options,
221                  ColumnFamilyHandle* column_family, const Slice& key,
222                  PinnableSlice* value, uint64_t* expiration = nullptr);
223 
224   Status GetBlobValue(const Slice& key, const Slice& index_entry,
225                       PinnableSlice* value, uint64_t* expiration = nullptr);
226 
227   Status GetRawBlobFromFile(const Slice& key, uint64_t file_number,
228                             uint64_t offset, uint64_t size,
229                             PinnableSlice* value,
230                             CompressionType* compression_type);
231 
232   Slice GetCompressedSlice(const Slice& raw,
233                            std::string* compression_output) const;
234 
235   // Close a file by appending a footer, and removes file from open files list.
236   // REQUIRES: lock held on write_mutex_, write lock held on both the db mutex_
237   // and the blob file's mutex_. If called on a blob file which is visible only
238   // to a single thread (like in the case of new files written during GC), the
239   // locks on write_mutex_ and the blob file's mutex_ can be avoided.
240   Status CloseBlobFile(std::shared_ptr<BlobFile> bfile);
241 
242   // Close a file if its size exceeds blob_file_size
243   // REQUIRES: lock held on write_mutex_.
244   Status CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile);
245 
246   // Mark file as obsolete and move the file to obsolete file list.
247   //
248   // REQUIRED: hold write lock of mutex_ or during DB open.
249   void ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,
250                         SequenceNumber obsolete_seq, bool update_size);
251 
252   Status PutBlobValue(const WriteOptions& options, const Slice& key,
253                       const Slice& value, uint64_t expiration,
254                       WriteBatch* batch);
255 
256   Status AppendBlob(const std::shared_ptr<BlobFile>& bfile,
257                     const std::string& headerbuf, const Slice& key,
258                     const Slice& value, uint64_t expiration,
259                     std::string* index_entry);
260 
261   // Create a new blob file and associated writer.
262   Status CreateBlobFileAndWriter(bool has_ttl,
263                                  const ExpirationRange& expiration_range,
264                                  const std::string& reason,
265                                  std::shared_ptr<BlobFile>* blob_file,
266                                  std::shared_ptr<Writer>* writer);
267 
268   // Get the open non-TTL blob log file, or create a new one if no such file
269   // exists.
270   Status SelectBlobFile(std::shared_ptr<BlobFile>* blob_file);
271 
272   // Get the open TTL blob log file for a certain expiration, or create a new
273   // one if no such file exists.
274   Status SelectBlobFileTTL(uint64_t expiration,
275                            std::shared_ptr<BlobFile>* blob_file);
276 
277   std::shared_ptr<BlobFile> FindBlobFileLocked(uint64_t expiration) const;
278 
279   // periodic sanity check. Bunch of checks
280   std::pair<bool, int64_t> SanityCheck(bool aborted);
281 
282   // Delete files that have been marked obsolete (either because of TTL
283   // or GC). Check whether any snapshots exist which refer to the same.
284   std::pair<bool, int64_t> DeleteObsoleteFiles(bool aborted);
285 
286   // periodically check if open blob files and their TTL's has expired
287   // if expired, close the sequential writer and make the file immutable
288   std::pair<bool, int64_t> EvictExpiredFiles(bool aborted);
289 
290   // if the number of open files, approaches ULIMIT's this
291   // task will close random readers, which are kept around for
292   // efficiency
293   std::pair<bool, int64_t> ReclaimOpenFiles(bool aborted);
294 
295   std::pair<bool, int64_t> RemoveTimerQ(TimerQueue* tq, bool aborted);
296 
297   // Adds the background tasks to the timer queue
298   void StartBackgroundTasks();
299 
300   // add a new Blob File
301   std::shared_ptr<BlobFile> NewBlobFile(bool has_ttl,
302                                         const ExpirationRange& expiration_range,
303                                         const std::string& reason);
304 
305   // Register a new blob file.
306   // REQUIRES: write lock on mutex_.
307   void RegisterBlobFile(std::shared_ptr<BlobFile> blob_file);
308 
309   // collect all the blob log files from the blob directory
310   Status GetAllBlobFiles(std::set<uint64_t>* file_numbers);
311 
312   // Open all blob files found in blob_dir.
313   Status OpenAllBlobFiles();
314 
315   // Link an SST to a blob file. Comes in locking and non-locking varieties
316   // (the latter is used during Open).
317   template <typename Linker>
318   void LinkSstToBlobFileImpl(uint64_t sst_file_number,
319                              uint64_t blob_file_number, Linker linker);
320 
321   void LinkSstToBlobFile(uint64_t sst_file_number, uint64_t blob_file_number);
322 
323   void LinkSstToBlobFileNoLock(uint64_t sst_file_number,
324                                uint64_t blob_file_number);
325 
326   // Unlink an SST from a blob file.
327   void UnlinkSstFromBlobFile(uint64_t sst_file_number,
328                              uint64_t blob_file_number);
329 
330   // Initialize the mapping between blob files and SSTs during Open.
331   void InitializeBlobFileToSstMapping(
332       const std::vector<LiveFileMetaData>& live_files);
333 
334   // Update the mapping between blob files and SSTs after a flush and mark
335   // any unneeded blob files obsolete.
336   void ProcessFlushJobInfo(const FlushJobInfo& info);
337 
338   // Update the mapping between blob files and SSTs after a compaction and
339   // mark any unneeded blob files obsolete.
340   void ProcessCompactionJobInfo(const CompactionJobInfo& info);
341 
342   // Mark an immutable non-TTL blob file obsolete assuming it has no more SSTs
343   // linked to it, and all memtables from before the blob file became immutable
344   // have been flushed. Note: should only be called if the condition holds for
345   // all lower-numbered non-TTL blob files as well.
346   bool MarkBlobFileObsoleteIfNeeded(const std::shared_ptr<BlobFile>& blob_file,
347                                     SequenceNumber obsolete_seq);
348 
349   // Mark all immutable non-TTL blob files that aren't needed by any SSTs as
350   // obsolete. Comes in two varieties; the version used during Open need not
351   // worry about locking or snapshots.
352   template <class Functor>
353   void MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed);
354 
355   void MarkUnreferencedBlobFilesObsolete();
356   void MarkUnreferencedBlobFilesObsoleteDuringOpen();
357 
358   void UpdateLiveSSTSize();
359 
360   Status GetBlobFileReader(const std::shared_ptr<BlobFile>& blob_file,
361                            std::shared_ptr<RandomAccessFileReader>* reader);
362 
363   // hold write mutex on file and call.
364   // Close the above Random Access reader
365   void CloseRandomAccessLocked(const std::shared_ptr<BlobFile>& bfile);
366 
367   // hold write mutex on file and call
368   // creates a sequential (append) writer for this blobfile
369   Status CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile);
370 
371   // returns a Writer object for the file. If writer is not
372   // already present, creates one. Needs Write Mutex to be held
373   Status CheckOrCreateWriterLocked(const std::shared_ptr<BlobFile>& blob_file,
374                                    std::shared_ptr<Writer>* writer);
375 
376   // checks if there is no snapshot which is referencing the
377   // blobs
378   bool VisibleToActiveSnapshot(const std::shared_ptr<BlobFile>& file);
379   bool FileDeleteOk_SnapshotCheckLocked(const std::shared_ptr<BlobFile>& bfile);
380 
381   void CopyBlobFiles(std::vector<std::shared_ptr<BlobFile>>* bfiles_copy);
382 
EpochNow()383   uint64_t EpochNow() { return env_->NowMicros() / 1000000; }
384 
385   // Check if inserting a new blob will make DB grow out of space.
386   // If is_fifo = true, FIFO eviction will be triggered to make room for the
387   // new blob. If force_evict = true, FIFO eviction will evict blob files
388   // even eviction will not make enough room for the new blob.
389   Status CheckSizeAndEvictBlobFiles(uint64_t blob_size,
390                                     bool force_evict = false);
391 
392   // name of the database directory
393   std::string dbname_;
394 
395   // the base DB
396   DBImpl* db_impl_;
397   Env* env_;
398 
399   // the options that govern the behavior of Blob Storage
400   BlobDBOptions bdb_options_;
401   DBOptions db_options_;
402   ColumnFamilyOptions cf_options_;
403   EnvOptions env_options_;
404 
405   // Raw pointer of statistic. db_options_ has a std::shared_ptr to hold
406   // ownership.
407   Statistics* statistics_;
408 
409   // by default this is "blob_dir" under dbname_
410   // but can be configured
411   std::string blob_dir_;
412 
413   // pointer to directory
414   std::unique_ptr<Directory> dir_ent_;
415 
416   // Read Write Mutex, which protects all the data structures
417   // HEAVILY TRAFFICKED
418   mutable port::RWMutex mutex_;
419 
420   // Writers has to hold write_mutex_ before writing.
421   mutable port::Mutex write_mutex_;
422 
423   // counter for blob file number
424   std::atomic<uint64_t> next_file_number_;
425 
426   // entire metadata of all the BLOB files memory
427   std::map<uint64_t, std::shared_ptr<BlobFile>> blob_files_;
428 
429   // All live immutable non-TTL blob files.
430   std::map<uint64_t, std::shared_ptr<BlobFile>> live_imm_non_ttl_blob_files_;
431 
432   // The largest sequence number that has been flushed.
433   SequenceNumber flush_sequence_;
434 
435   // opened non-TTL blob file.
436   std::shared_ptr<BlobFile> open_non_ttl_file_;
437 
438   // all the blob files which are currently being appended to based
439   // on variety of incoming TTL's
440   std::set<std::shared_ptr<BlobFile>, BlobFileComparatorTTL> open_ttl_files_;
441 
442   // Flag to check whether Close() has been called on this DB
443   bool closed_;
444 
445   // timer based queue to execute tasks
446   TimerQueue tqueue_;
447 
448   // number of files opened for random access/GET
449   // counter is used to monitor and close excess RA files.
450   std::atomic<uint32_t> open_file_count_;
451 
452   // Total size of all live blob files (i.e. exclude obsolete files).
453   std::atomic<uint64_t> total_blob_size_;
454 
455   // total size of SST files.
456   std::atomic<uint64_t> live_sst_size_;
457 
458   // Latest FIFO eviction timestamp
459   //
460   // REQUIRES: access with metex_ lock held.
461   uint64_t fifo_eviction_seq_;
462 
463   // The expiration up to which latest FIFO eviction evicts.
464   //
465   // REQUIRES: access with metex_ lock held.
466   uint64_t evict_expiration_up_to_;
467 
468   std::list<std::shared_ptr<BlobFile>> obsolete_files_;
469 
470   // DeleteObsoleteFiles, DiableFileDeletions and EnableFileDeletions block
471   // on the mutex to avoid contention.
472   //
473   // While DeleteObsoleteFiles hold both mutex_ and delete_file_mutex_, note
474   // the difference. mutex_ only needs to be held when access the
475   // data-structure, and delete_file_mutex_ needs to be held the whole time
476   // during DeleteObsoleteFiles to avoid being run simultaneously with
477   // DisableFileDeletions.
478   //
479   // If both of mutex_ and delete_file_mutex_ needs to be held, it is adviced
480   // to hold delete_file_mutex_ first to avoid deadlock.
481   mutable port::Mutex delete_file_mutex_;
482 
483   // Each call of DisableFileDeletions will increase disable_file_deletion_
484   // by 1. EnableFileDeletions will either decrease the count by 1 or reset
485   // it to zeor, depending on the force flag.
486   //
487   // REQUIRES: access with delete_file_mutex_ held.
488   int disable_file_deletions_ = 0;
489 
490   uint32_t debug_level_;
491 };
492 
493 }  // namespace blob_db
494 }  // namespace ROCKSDB_NAMESPACE
495 #endif  // ROCKSDB_LITE
496