1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 6 #pragma once 7 8 #ifndef ROCKSDB_LITE 9 10 #include <atomic> 11 #include <condition_variable> 12 #include <limits> 13 #include <list> 14 #include <memory> 15 #include <set> 16 #include <string> 17 #include <thread> 18 #include <unordered_map> 19 #include <utility> 20 #include <vector> 21 22 #include "db/blob/blob_log_format.h" 23 #include "db/blob/blob_log_writer.h" 24 #include "db/db_iter.h" 25 #include "rocksdb/compaction_filter.h" 26 #include "rocksdb/db.h" 27 #include "rocksdb/file_system.h" 28 #include "rocksdb/listener.h" 29 #include "rocksdb/options.h" 30 #include "rocksdb/statistics.h" 31 #include "rocksdb/wal_filter.h" 32 #include "util/mutexlock.h" 33 #include "util/timer_queue.h" 34 #include "utilities/blob_db/blob_db.h" 35 #include "utilities/blob_db/blob_file.h" 36 37 namespace ROCKSDB_NAMESPACE { 38 39 class DBImpl; 40 class ColumnFamilyHandle; 41 class ColumnFamilyData; 42 class SystemClock; 43 44 struct FlushJobInfo; 45 46 namespace blob_db { 47 48 struct BlobCompactionContext; 49 struct BlobCompactionContextGC; 50 class BlobDBImpl; 51 class BlobFile; 52 53 // Comparator to sort "TTL" aware Blob files based on the lower value of 54 // TTL range. 55 struct BlobFileComparatorTTL { 56 bool operator()(const std::shared_ptr<BlobFile>& lhs, 57 const std::shared_ptr<BlobFile>& rhs) const; 58 }; 59 60 struct BlobFileComparator { 61 bool operator()(const std::shared_ptr<BlobFile>& lhs, 62 const std::shared_ptr<BlobFile>& rhs) const; 63 }; 64 65 /** 66 * The implementation class for BlobDB. It manages the blob logs, which 67 * are sequentially written files. Blob logs can be of the TTL or non-TTL 68 * varieties; the former are cleaned up when they expire, while the latter 69 * are (optionally) garbage collected. 70 */ 71 class BlobDBImpl : public BlobDB { 72 friend class BlobFile; 73 friend class BlobDBIterator; 74 friend class BlobDBListener; 75 friend class BlobDBListenerGC; 76 friend class BlobIndexCompactionFilterBase; 77 friend class BlobIndexCompactionFilterGC; 78 79 public: 80 // deletions check period 81 static constexpr uint32_t kDeleteCheckPeriodMillisecs = 2 * 1000; 82 83 // sanity check task 84 static constexpr uint32_t kSanityCheckPeriodMillisecs = 20 * 60 * 1000; 85 86 // how many random access open files can we tolerate 87 static constexpr uint32_t kOpenFilesTrigger = 100; 88 89 // how often to schedule reclaim open files. 90 static constexpr uint32_t kReclaimOpenFilesPeriodMillisecs = 1 * 1000; 91 92 // how often to schedule delete obs files periods 93 static constexpr uint32_t kDeleteObsoleteFilesPeriodMillisecs = 10 * 1000; 94 95 // how often to schedule expired files eviction. 96 static constexpr uint32_t kEvictExpiredFilesPeriodMillisecs = 10 * 1000; 97 98 // when should oldest file be evicted: 99 // on reaching 90% of blob_dir_size 100 static constexpr double kEvictOldestFileAtSize = 0.9; 101 102 using BlobDB::Put; 103 Status Put(const WriteOptions& options, const Slice& key, 104 const Slice& value) override; 105 106 using BlobDB::Get; 107 Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, 108 const Slice& key, PinnableSlice* value) override; 109 110 Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, 111 const Slice& key, PinnableSlice* value, 112 uint64_t* expiration) override; 113 114 using BlobDB::NewIterator; 115 virtual Iterator* NewIterator(const ReadOptions& read_options) override; 116 117 using BlobDB::NewIterators; NewIterators(const ReadOptions &,const std::vector<ColumnFamilyHandle * > &,std::vector<Iterator * > *)118 virtual Status NewIterators( 119 const ReadOptions& /*read_options*/, 120 const std::vector<ColumnFamilyHandle*>& /*column_families*/, 121 std::vector<Iterator*>* /*iterators*/) override { 122 return Status::NotSupported("Not implemented"); 123 } 124 125 using BlobDB::MultiGet; 126 virtual std::vector<Status> MultiGet( 127 const ReadOptions& read_options, 128 const std::vector<Slice>& keys, 129 std::vector<std::string>* values) override; 130 131 virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; 132 133 virtual Status Close() override; 134 135 using BlobDB::PutWithTTL; 136 Status PutWithTTL(const WriteOptions& options, const Slice& key, 137 const Slice& value, uint64_t ttl) override; 138 139 using BlobDB::PutUntil; 140 Status PutUntil(const WriteOptions& options, const Slice& key, 141 const Slice& value, uint64_t expiration) override; 142 143 using BlobDB::CompactFiles; 144 Status CompactFiles( 145 const CompactionOptions& compact_options, 146 const std::vector<std::string>& input_file_names, const int output_level, 147 const int output_path_id = -1, 148 std::vector<std::string>* const output_file_names = nullptr, 149 CompactionJobInfo* compaction_job_info = nullptr) override; 150 151 BlobDBOptions GetBlobDBOptions() const override; 152 153 BlobDBImpl(const std::string& dbname, const BlobDBOptions& bdb_options, 154 const DBOptions& db_options, 155 const ColumnFamilyOptions& cf_options); 156 157 virtual Status DisableFileDeletions() override; 158 159 virtual Status EnableFileDeletions(bool force) override; 160 161 virtual Status GetLiveFiles(std::vector<std::string>&, 162 uint64_t* manifest_file_size, 163 bool flush_memtable = true) override; 164 virtual void GetLiveFilesMetaData(std::vector<LiveFileMetaData>*) override; 165 166 ~BlobDBImpl(); 167 168 Status Open(std::vector<ColumnFamilyHandle*>* handles); 169 170 Status SyncBlobFiles() override; 171 172 // Common part of the two GetCompactionContext methods below. 173 // REQUIRES: read lock on mutex_ 174 void GetCompactionContextCommon(BlobCompactionContext* context); 175 176 void GetCompactionContext(BlobCompactionContext* context); 177 void GetCompactionContext(BlobCompactionContext* context, 178 BlobCompactionContextGC* context_gc); 179 180 #ifndef NDEBUG 181 Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry, 182 PinnableSlice* value); 183 184 void TEST_AddDummyBlobFile(uint64_t blob_file_number, 185 SequenceNumber immutable_sequence); 186 187 std::vector<std::shared_ptr<BlobFile>> TEST_GetBlobFiles() const; 188 189 std::vector<std::shared_ptr<BlobFile>> TEST_GetLiveImmNonTTLFiles() const; 190 191 std::vector<std::shared_ptr<BlobFile>> TEST_GetObsoleteFiles() const; 192 193 Status TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile); 194 195 void TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile>& blob_file, 196 SequenceNumber obsolete_seq = 0, 197 bool update_size = true); 198 199 void TEST_EvictExpiredFiles(); 200 201 void TEST_DeleteObsoleteFiles(); 202 203 uint64_t TEST_live_sst_size(); 204 TEST_blob_dir()205 const std::string& TEST_blob_dir() const { return blob_dir_; } 206 207 void TEST_InitializeBlobFileToSstMapping( 208 const std::vector<LiveFileMetaData>& live_files); 209 210 void TEST_ProcessFlushJobInfo(const FlushJobInfo& info); 211 212 void TEST_ProcessCompactionJobInfo(const CompactionJobInfo& info); 213 214 #endif // !NDEBUG 215 216 private: 217 class BlobInserter; 218 219 // Create a snapshot if there isn't one in read options. 220 // Return true if a snapshot is created. 221 bool SetSnapshotIfNeeded(ReadOptions* read_options); 222 223 Status GetImpl(const ReadOptions& read_options, 224 ColumnFamilyHandle* column_family, const Slice& key, 225 PinnableSlice* value, uint64_t* expiration = nullptr); 226 227 Status GetBlobValue(const Slice& key, const Slice& index_entry, 228 PinnableSlice* value, uint64_t* expiration = nullptr); 229 230 Status GetRawBlobFromFile(const Slice& key, uint64_t file_number, 231 uint64_t offset, uint64_t size, 232 PinnableSlice* value, 233 CompressionType* compression_type); 234 235 Slice GetCompressedSlice(const Slice& raw, 236 std::string* compression_output) const; 237 238 Status DecompressSlice(const Slice& compressed_value, 239 CompressionType compression_type, 240 PinnableSlice* value_output) const; 241 242 // Close a file by appending a footer, and removes file from open files list. 243 // REQUIRES: lock held on write_mutex_, write lock held on both the db mutex_ 244 // and the blob file's mutex_. If called on a blob file which is visible only 245 // to a single thread (like in the case of new files written during 246 // compaction/GC), the locks on write_mutex_ and the blob file's mutex_ can be 247 // avoided. 248 Status CloseBlobFile(std::shared_ptr<BlobFile> bfile); 249 250 // Close a file if its size exceeds blob_file_size 251 // REQUIRES: lock held on write_mutex_. 252 Status CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile); 253 254 // Mark file as obsolete and move the file to obsolete file list. 255 // 256 // REQUIRED: hold write lock of mutex_ or during DB open. 257 void ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file, 258 SequenceNumber obsolete_seq, bool update_size); 259 260 Status PutBlobValue(const WriteOptions& options, const Slice& key, 261 const Slice& value, uint64_t expiration, 262 WriteBatch* batch); 263 264 Status AppendBlob(const std::shared_ptr<BlobFile>& bfile, 265 const std::string& headerbuf, const Slice& key, 266 const Slice& value, uint64_t expiration, 267 std::string* index_entry); 268 269 // Create a new blob file and associated writer. 270 Status CreateBlobFileAndWriter(bool has_ttl, 271 const ExpirationRange& expiration_range, 272 const std::string& reason, 273 std::shared_ptr<BlobFile>* blob_file, 274 std::shared_ptr<BlobLogWriter>* writer); 275 276 // Get the open non-TTL blob log file, or create a new one if no such file 277 // exists. 278 Status SelectBlobFile(std::shared_ptr<BlobFile>* blob_file); 279 280 // Get the open TTL blob log file for a certain expiration, or create a new 281 // one if no such file exists. 282 Status SelectBlobFileTTL(uint64_t expiration, 283 std::shared_ptr<BlobFile>* blob_file); 284 285 std::shared_ptr<BlobFile> FindBlobFileLocked(uint64_t expiration) const; 286 287 // periodic sanity check. Bunch of checks 288 std::pair<bool, int64_t> SanityCheck(bool aborted); 289 290 // Delete files that have been marked obsolete (either because of TTL 291 // or GC). Check whether any snapshots exist which refer to the same. 292 std::pair<bool, int64_t> DeleteObsoleteFiles(bool aborted); 293 294 // periodically check if open blob files and their TTL's has expired 295 // if expired, close the sequential writer and make the file immutable 296 std::pair<bool, int64_t> EvictExpiredFiles(bool aborted); 297 298 // if the number of open files, approaches ULIMIT's this 299 // task will close random readers, which are kept around for 300 // efficiency 301 std::pair<bool, int64_t> ReclaimOpenFiles(bool aborted); 302 303 std::pair<bool, int64_t> RemoveTimerQ(TimerQueue* tq, bool aborted); 304 305 // Adds the background tasks to the timer queue 306 void StartBackgroundTasks(); 307 308 // add a new Blob File 309 std::shared_ptr<BlobFile> NewBlobFile(bool has_ttl, 310 const ExpirationRange& expiration_range, 311 const std::string& reason); 312 313 // Register a new blob file. 314 // REQUIRES: write lock on mutex_. 315 void RegisterBlobFile(std::shared_ptr<BlobFile> blob_file); 316 317 // collect all the blob log files from the blob directory 318 Status GetAllBlobFiles(std::set<uint64_t>* file_numbers); 319 320 // Open all blob files found in blob_dir. 321 Status OpenAllBlobFiles(); 322 323 // Link an SST to a blob file. Comes in locking and non-locking varieties 324 // (the latter is used during Open). 325 template <typename Linker> 326 void LinkSstToBlobFileImpl(uint64_t sst_file_number, 327 uint64_t blob_file_number, Linker linker); 328 329 void LinkSstToBlobFile(uint64_t sst_file_number, uint64_t blob_file_number); 330 331 void LinkSstToBlobFileNoLock(uint64_t sst_file_number, 332 uint64_t blob_file_number); 333 334 // Unlink an SST from a blob file. 335 void UnlinkSstFromBlobFile(uint64_t sst_file_number, 336 uint64_t blob_file_number); 337 338 // Initialize the mapping between blob files and SSTs during Open. 339 void InitializeBlobFileToSstMapping( 340 const std::vector<LiveFileMetaData>& live_files); 341 342 // Update the mapping between blob files and SSTs after a flush and mark 343 // any unneeded blob files obsolete. 344 void ProcessFlushJobInfo(const FlushJobInfo& info); 345 346 // Update the mapping between blob files and SSTs after a compaction and 347 // mark any unneeded blob files obsolete. 348 void ProcessCompactionJobInfo(const CompactionJobInfo& info); 349 350 // Mark an immutable non-TTL blob file obsolete assuming it has no more SSTs 351 // linked to it, and all memtables from before the blob file became immutable 352 // have been flushed. Note: should only be called if the condition holds for 353 // all lower-numbered non-TTL blob files as well. 354 bool MarkBlobFileObsoleteIfNeeded(const std::shared_ptr<BlobFile>& blob_file, 355 SequenceNumber obsolete_seq); 356 357 // Mark all immutable non-TTL blob files that aren't needed by any SSTs as 358 // obsolete. Comes in two varieties; the version used during Open need not 359 // worry about locking or snapshots. 360 template <class Functor> 361 void MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed); 362 363 void MarkUnreferencedBlobFilesObsolete(); 364 void MarkUnreferencedBlobFilesObsoleteDuringOpen(); 365 366 void UpdateLiveSSTSize(); 367 368 Status GetBlobFileReader(const std::shared_ptr<BlobFile>& blob_file, 369 std::shared_ptr<RandomAccessFileReader>* reader); 370 371 // hold write mutex on file and call. 372 // Close the above Random Access reader 373 void CloseRandomAccessLocked(const std::shared_ptr<BlobFile>& bfile); 374 375 // hold write mutex on file and call 376 // creates a sequential (append) writer for this blobfile 377 Status CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile); 378 379 // returns a BlobLogWriter object for the file. If writer is not 380 // already present, creates one. Needs Write Mutex to be held 381 Status CheckOrCreateWriterLocked(const std::shared_ptr<BlobFile>& blob_file, 382 std::shared_ptr<BlobLogWriter>* writer); 383 384 // checks if there is no snapshot which is referencing the 385 // blobs 386 bool VisibleToActiveSnapshot(const std::shared_ptr<BlobFile>& file); 387 bool FileDeleteOk_SnapshotCheckLocked(const std::shared_ptr<BlobFile>& bfile); 388 389 void CopyBlobFiles(std::vector<std::shared_ptr<BlobFile>>* bfiles_copy); 390 EpochNow()391 uint64_t EpochNow() { return clock_->NowMicros() / 1000000; } 392 393 // Check if inserting a new blob will make DB grow out of space. 394 // If is_fifo = true, FIFO eviction will be triggered to make room for the 395 // new blob. If force_evict = true, FIFO eviction will evict blob files 396 // even eviction will not make enough room for the new blob. 397 Status CheckSizeAndEvictBlobFiles(uint64_t blob_size, 398 bool force_evict = false); 399 400 // name of the database directory 401 std::string dbname_; 402 403 // the base DB 404 DBImpl* db_impl_; 405 Env* env_; 406 SystemClock* clock_; 407 // the options that govern the behavior of Blob Storage 408 BlobDBOptions bdb_options_; 409 DBOptions db_options_; 410 ColumnFamilyOptions cf_options_; 411 FileOptions file_options_; 412 413 // Raw pointer of statistic. db_options_ has a std::shared_ptr to hold 414 // ownership. 415 Statistics* statistics_; 416 417 // by default this is "blob_dir" under dbname_ 418 // but can be configured 419 std::string blob_dir_; 420 421 // pointer to directory 422 std::unique_ptr<Directory> dir_ent_; 423 424 // Read Write Mutex, which protects all the data structures 425 // HEAVILY TRAFFICKED 426 mutable port::RWMutex mutex_; 427 428 // Writers has to hold write_mutex_ before writing. 429 mutable port::Mutex write_mutex_; 430 431 // counter for blob file number 432 std::atomic<uint64_t> next_file_number_; 433 434 // entire metadata of all the BLOB files memory 435 std::map<uint64_t, std::shared_ptr<BlobFile>> blob_files_; 436 437 // All live immutable non-TTL blob files. 438 std::map<uint64_t, std::shared_ptr<BlobFile>> live_imm_non_ttl_blob_files_; 439 440 // The largest sequence number that has been flushed. 441 SequenceNumber flush_sequence_; 442 443 // opened non-TTL blob file. 444 std::shared_ptr<BlobFile> open_non_ttl_file_; 445 446 // all the blob files which are currently being appended to based 447 // on variety of incoming TTL's 448 std::set<std::shared_ptr<BlobFile>, BlobFileComparatorTTL> open_ttl_files_; 449 450 // Flag to check whether Close() has been called on this DB 451 bool closed_; 452 453 // timer based queue to execute tasks 454 TimerQueue tqueue_; 455 456 // number of files opened for random access/GET 457 // counter is used to monitor and close excess RA files. 458 std::atomic<uint32_t> open_file_count_; 459 460 // Total size of all live blob files (i.e. exclude obsolete files). 461 std::atomic<uint64_t> total_blob_size_; 462 463 // total size of SST files. 464 std::atomic<uint64_t> live_sst_size_; 465 466 // Latest FIFO eviction timestamp 467 // 468 // REQUIRES: access with metex_ lock held. 469 uint64_t fifo_eviction_seq_; 470 471 // The expiration up to which latest FIFO eviction evicts. 472 // 473 // REQUIRES: access with metex_ lock held. 474 uint64_t evict_expiration_up_to_; 475 476 std::list<std::shared_ptr<BlobFile>> obsolete_files_; 477 478 // DeleteObsoleteFiles, DiableFileDeletions and EnableFileDeletions block 479 // on the mutex to avoid contention. 480 // 481 // While DeleteObsoleteFiles hold both mutex_ and delete_file_mutex_, note 482 // the difference. mutex_ only needs to be held when access the 483 // data-structure, and delete_file_mutex_ needs to be held the whole time 484 // during DeleteObsoleteFiles to avoid being run simultaneously with 485 // DisableFileDeletions. 486 // 487 // If both of mutex_ and delete_file_mutex_ needs to be held, it is adviced 488 // to hold delete_file_mutex_ first to avoid deadlock. 489 mutable port::Mutex delete_file_mutex_; 490 491 // Each call of DisableFileDeletions will increase disable_file_deletion_ 492 // by 1. EnableFileDeletions will either decrease the count by 1 or reset 493 // it to zeor, depending on the force flag. 494 // 495 // REQUIRES: access with delete_file_mutex_ held. 496 int disable_file_deletions_ = 0; 497 498 uint32_t debug_level_; 499 }; 500 501 } // namespace blob_db 502 } // namespace ROCKSDB_NAMESPACE 503 #endif // ROCKSDB_LITE 504