1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 6 #pragma once 7 8 #ifndef ROCKSDB_LITE 9 10 #include <atomic> 11 #include <condition_variable> 12 #include <limits> 13 #include <list> 14 #include <memory> 15 #include <set> 16 #include <string> 17 #include <thread> 18 #include <unordered_map> 19 #include <utility> 20 #include <vector> 21 22 #include "db/db_iter.h" 23 #include "rocksdb/compaction_filter.h" 24 #include "rocksdb/db.h" 25 #include "rocksdb/listener.h" 26 #include "rocksdb/options.h" 27 #include "rocksdb/statistics.h" 28 #include "rocksdb/wal_filter.h" 29 #include "util/mutexlock.h" 30 #include "util/timer_queue.h" 31 #include "utilities/blob_db/blob_db.h" 32 #include "utilities/blob_db/blob_file.h" 33 #include "utilities/blob_db/blob_log_format.h" 34 #include "utilities/blob_db/blob_log_reader.h" 35 #include "utilities/blob_db/blob_log_writer.h" 36 37 namespace ROCKSDB_NAMESPACE { 38 39 class DBImpl; 40 class ColumnFamilyHandle; 41 class ColumnFamilyData; 42 struct FlushJobInfo; 43 44 namespace blob_db { 45 46 struct BlobCompactionContext; 47 struct BlobCompactionContextGC; 48 class BlobDBImpl; 49 class BlobFile; 50 51 // Comparator to sort "TTL" aware Blob files based on the lower value of 52 // TTL range. 53 struct BlobFileComparatorTTL { 54 bool operator()(const std::shared_ptr<BlobFile>& lhs, 55 const std::shared_ptr<BlobFile>& rhs) const; 56 }; 57 58 struct BlobFileComparator { 59 bool operator()(const std::shared_ptr<BlobFile>& lhs, 60 const std::shared_ptr<BlobFile>& rhs) const; 61 }; 62 63 /** 64 * The implementation class for BlobDB. It manages the blob logs, which 65 * are sequentially written files. Blob logs can be of the TTL or non-TTL 66 * varieties; the former are cleaned up when they expire, while the latter 67 * are (optionally) garbage collected. 68 */ 69 class BlobDBImpl : public BlobDB { 70 friend class BlobFile; 71 friend class BlobDBIterator; 72 friend class BlobDBListener; 73 friend class BlobDBListenerGC; 74 friend class BlobIndexCompactionFilterGC; 75 76 public: 77 // deletions check period 78 static constexpr uint32_t kDeleteCheckPeriodMillisecs = 2 * 1000; 79 80 // sanity check task 81 static constexpr uint32_t kSanityCheckPeriodMillisecs = 20 * 60 * 1000; 82 83 // how many random access open files can we tolerate 84 static constexpr uint32_t kOpenFilesTrigger = 100; 85 86 // how often to schedule reclaim open files. 87 static constexpr uint32_t kReclaimOpenFilesPeriodMillisecs = 1 * 1000; 88 89 // how often to schedule delete obs files periods 90 static constexpr uint32_t kDeleteObsoleteFilesPeriodMillisecs = 10 * 1000; 91 92 // how often to schedule expired files eviction. 93 static constexpr uint32_t kEvictExpiredFilesPeriodMillisecs = 10 * 1000; 94 95 // when should oldest file be evicted: 96 // on reaching 90% of blob_dir_size 97 static constexpr double kEvictOldestFileAtSize = 0.9; 98 99 using BlobDB::Put; 100 Status Put(const WriteOptions& options, const Slice& key, 101 const Slice& value) override; 102 103 using BlobDB::Get; 104 Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, 105 const Slice& key, PinnableSlice* value) override; 106 107 Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, 108 const Slice& key, PinnableSlice* value, 109 uint64_t* expiration) override; 110 111 using BlobDB::NewIterator; 112 virtual Iterator* NewIterator(const ReadOptions& read_options) override; 113 114 using BlobDB::NewIterators; NewIterators(const ReadOptions &,const std::vector<ColumnFamilyHandle * > &,std::vector<Iterator * > *)115 virtual Status NewIterators( 116 const ReadOptions& /*read_options*/, 117 const std::vector<ColumnFamilyHandle*>& /*column_families*/, 118 std::vector<Iterator*>* /*iterators*/) override { 119 return Status::NotSupported("Not implemented"); 120 } 121 122 using BlobDB::MultiGet; 123 virtual std::vector<Status> MultiGet( 124 const ReadOptions& read_options, 125 const std::vector<Slice>& keys, 126 std::vector<std::string>* values) override; 127 128 virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; 129 130 virtual Status Close() override; 131 132 using BlobDB::PutWithTTL; 133 Status PutWithTTL(const WriteOptions& options, const Slice& key, 134 const Slice& value, uint64_t ttl) override; 135 136 using BlobDB::PutUntil; 137 Status PutUntil(const WriteOptions& options, const Slice& key, 138 const Slice& value, uint64_t expiration) override; 139 140 using BlobDB::CompactFiles; 141 Status CompactFiles( 142 const CompactionOptions& compact_options, 143 const std::vector<std::string>& input_file_names, const int output_level, 144 const int output_path_id = -1, 145 std::vector<std::string>* const output_file_names = nullptr, 146 CompactionJobInfo* compaction_job_info = nullptr) override; 147 148 BlobDBOptions GetBlobDBOptions() const override; 149 150 BlobDBImpl(const std::string& dbname, const BlobDBOptions& bdb_options, 151 const DBOptions& db_options, 152 const ColumnFamilyOptions& cf_options); 153 154 virtual Status DisableFileDeletions() override; 155 156 virtual Status EnableFileDeletions(bool force) override; 157 158 virtual Status GetLiveFiles(std::vector<std::string>&, 159 uint64_t* manifest_file_size, 160 bool flush_memtable = true) override; 161 virtual void GetLiveFilesMetaData(std::vector<LiveFileMetaData>*) override; 162 163 ~BlobDBImpl(); 164 165 Status Open(std::vector<ColumnFamilyHandle*>* handles); 166 167 Status SyncBlobFiles() override; 168 169 // Common part of the two GetCompactionContext methods below. 170 // REQUIRES: read lock on mutex_ 171 void GetCompactionContextCommon(BlobCompactionContext* context) const; 172 173 void GetCompactionContext(BlobCompactionContext* context); 174 void GetCompactionContext(BlobCompactionContext* context, 175 BlobCompactionContextGC* context_gc); 176 177 #ifndef NDEBUG 178 Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry, 179 PinnableSlice* value); 180 181 void TEST_AddDummyBlobFile(uint64_t blob_file_number, 182 SequenceNumber immutable_sequence); 183 184 std::vector<std::shared_ptr<BlobFile>> TEST_GetBlobFiles() const; 185 186 std::vector<std::shared_ptr<BlobFile>> TEST_GetLiveImmNonTTLFiles() const; 187 188 std::vector<std::shared_ptr<BlobFile>> TEST_GetObsoleteFiles() const; 189 190 Status TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile); 191 192 void TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile>& blob_file, 193 SequenceNumber obsolete_seq = 0, 194 bool update_size = true); 195 196 void TEST_EvictExpiredFiles(); 197 198 void TEST_DeleteObsoleteFiles(); 199 200 uint64_t TEST_live_sst_size(); 201 TEST_blob_dir()202 const std::string& TEST_blob_dir() const { return blob_dir_; } 203 204 void TEST_InitializeBlobFileToSstMapping( 205 const std::vector<LiveFileMetaData>& live_files); 206 207 void TEST_ProcessFlushJobInfo(const FlushJobInfo& info); 208 209 void TEST_ProcessCompactionJobInfo(const CompactionJobInfo& info); 210 211 #endif // !NDEBUG 212 213 private: 214 class BlobInserter; 215 216 // Create a snapshot if there isn't one in read options. 217 // Return true if a snapshot is created. 218 bool SetSnapshotIfNeeded(ReadOptions* read_options); 219 220 Status GetImpl(const ReadOptions& read_options, 221 ColumnFamilyHandle* column_family, const Slice& key, 222 PinnableSlice* value, uint64_t* expiration = nullptr); 223 224 Status GetBlobValue(const Slice& key, const Slice& index_entry, 225 PinnableSlice* value, uint64_t* expiration = nullptr); 226 227 Status GetRawBlobFromFile(const Slice& key, uint64_t file_number, 228 uint64_t offset, uint64_t size, 229 PinnableSlice* value, 230 CompressionType* compression_type); 231 232 Slice GetCompressedSlice(const Slice& raw, 233 std::string* compression_output) const; 234 235 // Close a file by appending a footer, and removes file from open files list. 236 // REQUIRES: lock held on write_mutex_, write lock held on both the db mutex_ 237 // and the blob file's mutex_. If called on a blob file which is visible only 238 // to a single thread (like in the case of new files written during GC), the 239 // locks on write_mutex_ and the blob file's mutex_ can be avoided. 240 Status CloseBlobFile(std::shared_ptr<BlobFile> bfile); 241 242 // Close a file if its size exceeds blob_file_size 243 // REQUIRES: lock held on write_mutex_. 244 Status CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile); 245 246 // Mark file as obsolete and move the file to obsolete file list. 247 // 248 // REQUIRED: hold write lock of mutex_ or during DB open. 249 void ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file, 250 SequenceNumber obsolete_seq, bool update_size); 251 252 Status PutBlobValue(const WriteOptions& options, const Slice& key, 253 const Slice& value, uint64_t expiration, 254 WriteBatch* batch); 255 256 Status AppendBlob(const std::shared_ptr<BlobFile>& bfile, 257 const std::string& headerbuf, const Slice& key, 258 const Slice& value, uint64_t expiration, 259 std::string* index_entry); 260 261 // Create a new blob file and associated writer. 262 Status CreateBlobFileAndWriter(bool has_ttl, 263 const ExpirationRange& expiration_range, 264 const std::string& reason, 265 std::shared_ptr<BlobFile>* blob_file, 266 std::shared_ptr<Writer>* writer); 267 268 // Get the open non-TTL blob log file, or create a new one if no such file 269 // exists. 270 Status SelectBlobFile(std::shared_ptr<BlobFile>* blob_file); 271 272 // Get the open TTL blob log file for a certain expiration, or create a new 273 // one if no such file exists. 274 Status SelectBlobFileTTL(uint64_t expiration, 275 std::shared_ptr<BlobFile>* blob_file); 276 277 std::shared_ptr<BlobFile> FindBlobFileLocked(uint64_t expiration) const; 278 279 // periodic sanity check. Bunch of checks 280 std::pair<bool, int64_t> SanityCheck(bool aborted); 281 282 // Delete files that have been marked obsolete (either because of TTL 283 // or GC). Check whether any snapshots exist which refer to the same. 284 std::pair<bool, int64_t> DeleteObsoleteFiles(bool aborted); 285 286 // periodically check if open blob files and their TTL's has expired 287 // if expired, close the sequential writer and make the file immutable 288 std::pair<bool, int64_t> EvictExpiredFiles(bool aborted); 289 290 // if the number of open files, approaches ULIMIT's this 291 // task will close random readers, which are kept around for 292 // efficiency 293 std::pair<bool, int64_t> ReclaimOpenFiles(bool aborted); 294 295 std::pair<bool, int64_t> RemoveTimerQ(TimerQueue* tq, bool aborted); 296 297 // Adds the background tasks to the timer queue 298 void StartBackgroundTasks(); 299 300 // add a new Blob File 301 std::shared_ptr<BlobFile> NewBlobFile(bool has_ttl, 302 const ExpirationRange& expiration_range, 303 const std::string& reason); 304 305 // Register a new blob file. 306 // REQUIRES: write lock on mutex_. 307 void RegisterBlobFile(std::shared_ptr<BlobFile> blob_file); 308 309 // collect all the blob log files from the blob directory 310 Status GetAllBlobFiles(std::set<uint64_t>* file_numbers); 311 312 // Open all blob files found in blob_dir. 313 Status OpenAllBlobFiles(); 314 315 // Link an SST to a blob file. Comes in locking and non-locking varieties 316 // (the latter is used during Open). 317 template <typename Linker> 318 void LinkSstToBlobFileImpl(uint64_t sst_file_number, 319 uint64_t blob_file_number, Linker linker); 320 321 void LinkSstToBlobFile(uint64_t sst_file_number, uint64_t blob_file_number); 322 323 void LinkSstToBlobFileNoLock(uint64_t sst_file_number, 324 uint64_t blob_file_number); 325 326 // Unlink an SST from a blob file. 327 void UnlinkSstFromBlobFile(uint64_t sst_file_number, 328 uint64_t blob_file_number); 329 330 // Initialize the mapping between blob files and SSTs during Open. 331 void InitializeBlobFileToSstMapping( 332 const std::vector<LiveFileMetaData>& live_files); 333 334 // Update the mapping between blob files and SSTs after a flush and mark 335 // any unneeded blob files obsolete. 336 void ProcessFlushJobInfo(const FlushJobInfo& info); 337 338 // Update the mapping between blob files and SSTs after a compaction and 339 // mark any unneeded blob files obsolete. 340 void ProcessCompactionJobInfo(const CompactionJobInfo& info); 341 342 // Mark an immutable non-TTL blob file obsolete assuming it has no more SSTs 343 // linked to it, and all memtables from before the blob file became immutable 344 // have been flushed. Note: should only be called if the condition holds for 345 // all lower-numbered non-TTL blob files as well. 346 bool MarkBlobFileObsoleteIfNeeded(const std::shared_ptr<BlobFile>& blob_file, 347 SequenceNumber obsolete_seq); 348 349 // Mark all immutable non-TTL blob files that aren't needed by any SSTs as 350 // obsolete. Comes in two varieties; the version used during Open need not 351 // worry about locking or snapshots. 352 template <class Functor> 353 void MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed); 354 355 void MarkUnreferencedBlobFilesObsolete(); 356 void MarkUnreferencedBlobFilesObsoleteDuringOpen(); 357 358 void UpdateLiveSSTSize(); 359 360 Status GetBlobFileReader(const std::shared_ptr<BlobFile>& blob_file, 361 std::shared_ptr<RandomAccessFileReader>* reader); 362 363 // hold write mutex on file and call. 364 // Close the above Random Access reader 365 void CloseRandomAccessLocked(const std::shared_ptr<BlobFile>& bfile); 366 367 // hold write mutex on file and call 368 // creates a sequential (append) writer for this blobfile 369 Status CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile); 370 371 // returns a Writer object for the file. If writer is not 372 // already present, creates one. Needs Write Mutex to be held 373 Status CheckOrCreateWriterLocked(const std::shared_ptr<BlobFile>& blob_file, 374 std::shared_ptr<Writer>* writer); 375 376 // checks if there is no snapshot which is referencing the 377 // blobs 378 bool VisibleToActiveSnapshot(const std::shared_ptr<BlobFile>& file); 379 bool FileDeleteOk_SnapshotCheckLocked(const std::shared_ptr<BlobFile>& bfile); 380 381 void CopyBlobFiles(std::vector<std::shared_ptr<BlobFile>>* bfiles_copy); 382 EpochNow()383 uint64_t EpochNow() { return env_->NowMicros() / 1000000; } 384 385 // Check if inserting a new blob will make DB grow out of space. 386 // If is_fifo = true, FIFO eviction will be triggered to make room for the 387 // new blob. If force_evict = true, FIFO eviction will evict blob files 388 // even eviction will not make enough room for the new blob. 389 Status CheckSizeAndEvictBlobFiles(uint64_t blob_size, 390 bool force_evict = false); 391 392 // name of the database directory 393 std::string dbname_; 394 395 // the base DB 396 DBImpl* db_impl_; 397 Env* env_; 398 399 // the options that govern the behavior of Blob Storage 400 BlobDBOptions bdb_options_; 401 DBOptions db_options_; 402 ColumnFamilyOptions cf_options_; 403 EnvOptions env_options_; 404 405 // Raw pointer of statistic. db_options_ has a std::shared_ptr to hold 406 // ownership. 407 Statistics* statistics_; 408 409 // by default this is "blob_dir" under dbname_ 410 // but can be configured 411 std::string blob_dir_; 412 413 // pointer to directory 414 std::unique_ptr<Directory> dir_ent_; 415 416 // Read Write Mutex, which protects all the data structures 417 // HEAVILY TRAFFICKED 418 mutable port::RWMutex mutex_; 419 420 // Writers has to hold write_mutex_ before writing. 421 mutable port::Mutex write_mutex_; 422 423 // counter for blob file number 424 std::atomic<uint64_t> next_file_number_; 425 426 // entire metadata of all the BLOB files memory 427 std::map<uint64_t, std::shared_ptr<BlobFile>> blob_files_; 428 429 // All live immutable non-TTL blob files. 430 std::map<uint64_t, std::shared_ptr<BlobFile>> live_imm_non_ttl_blob_files_; 431 432 // The largest sequence number that has been flushed. 433 SequenceNumber flush_sequence_; 434 435 // opened non-TTL blob file. 436 std::shared_ptr<BlobFile> open_non_ttl_file_; 437 438 // all the blob files which are currently being appended to based 439 // on variety of incoming TTL's 440 std::set<std::shared_ptr<BlobFile>, BlobFileComparatorTTL> open_ttl_files_; 441 442 // Flag to check whether Close() has been called on this DB 443 bool closed_; 444 445 // timer based queue to execute tasks 446 TimerQueue tqueue_; 447 448 // number of files opened for random access/GET 449 // counter is used to monitor and close excess RA files. 450 std::atomic<uint32_t> open_file_count_; 451 452 // Total size of all live blob files (i.e. exclude obsolete files). 453 std::atomic<uint64_t> total_blob_size_; 454 455 // total size of SST files. 456 std::atomic<uint64_t> live_sst_size_; 457 458 // Latest FIFO eviction timestamp 459 // 460 // REQUIRES: access with metex_ lock held. 461 uint64_t fifo_eviction_seq_; 462 463 // The expiration up to which latest FIFO eviction evicts. 464 // 465 // REQUIRES: access with metex_ lock held. 466 uint64_t evict_expiration_up_to_; 467 468 std::list<std::shared_ptr<BlobFile>> obsolete_files_; 469 470 // DeleteObsoleteFiles, DiableFileDeletions and EnableFileDeletions block 471 // on the mutex to avoid contention. 472 // 473 // While DeleteObsoleteFiles hold both mutex_ and delete_file_mutex_, note 474 // the difference. mutex_ only needs to be held when access the 475 // data-structure, and delete_file_mutex_ needs to be held the whole time 476 // during DeleteObsoleteFiles to avoid being run simultaneously with 477 // DisableFileDeletions. 478 // 479 // If both of mutex_ and delete_file_mutex_ needs to be held, it is adviced 480 // to hold delete_file_mutex_ first to avoid deadlock. 481 mutable port::Mutex delete_file_mutex_; 482 483 // Each call of DisableFileDeletions will increase disable_file_deletion_ 484 // by 1. EnableFileDeletions will either decrease the count by 1 or reset 485 // it to zeor, depending on the force flag. 486 // 487 // REQUIRES: access with delete_file_mutex_ held. 488 int disable_file_deletions_ = 0; 489 490 uint32_t debug_level_; 491 }; 492 493 } // namespace blob_db 494 } // namespace ROCKSDB_NAMESPACE 495 #endif // ROCKSDB_LITE 496