1 
2 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
3 //  This source code is licensed under both the GPLv2 (found in the
4 //  COPYING file in the root directory) and Apache 2.0 License
5 //  (found in the LICENSE.Apache file in the root directory).
6 #ifndef ROCKSDB_LITE
7 
8 #include "utilities/blob_db/blob_db_impl.h"
9 #include <algorithm>
10 #include <cinttypes>
11 #include <iomanip>
12 #include <memory>
13 #include <sstream>
14 
15 #include "db/blob/blob_index.h"
16 #include "db/db_impl/db_impl.h"
17 #include "db/write_batch_internal.h"
18 #include "file/file_util.h"
19 #include "file/filename.h"
20 #include "file/random_access_file_reader.h"
21 #include "file/sst_file_manager_impl.h"
22 #include "file/writable_file_writer.h"
23 #include "logging/logging.h"
24 #include "monitoring/instrumented_mutex.h"
25 #include "monitoring/statistics.h"
26 #include "rocksdb/convenience.h"
27 #include "rocksdb/env.h"
28 #include "rocksdb/iterator.h"
29 #include "rocksdb/utilities/stackable_db.h"
30 #include "rocksdb/utilities/transaction.h"
31 #include "table/block_based/block.h"
32 #include "table/block_based/block_based_table_builder.h"
33 #include "table/block_based/block_builder.h"
34 #include "table/meta_blocks.h"
35 #include "test_util/sync_point.h"
36 #include "util/cast_util.h"
37 #include "util/crc32c.h"
38 #include "util/mutexlock.h"
39 #include "util/random.h"
40 #include "util/stop_watch.h"
41 #include "util/timer_queue.h"
42 #include "utilities/blob_db/blob_compaction_filter.h"
43 #include "utilities/blob_db/blob_db_iterator.h"
44 #include "utilities/blob_db/blob_db_listener.h"
45 
namespace {
// File-local constant (internal linkage via the anonymous namespace).
// Declared constexpr so it cannot be modified at runtime and can be used in
// constant expressions. Its use sites are not visible in this part of the
// file; presumably it is a block-based table format version -- confirm.
constexpr int kBlockBasedTableVersionFormat = 2;
}  // end namespace
49 
50 namespace ROCKSDB_NAMESPACE {
51 namespace blob_db {
52 
operator ()(const std::shared_ptr<BlobFile> & lhs,const std::shared_ptr<BlobFile> & rhs) const53 bool BlobFileComparator::operator()(
54     const std::shared_ptr<BlobFile>& lhs,
55     const std::shared_ptr<BlobFile>& rhs) const {
56   return lhs->BlobFileNumber() > rhs->BlobFileNumber();
57 }
58 
operator ()(const std::shared_ptr<BlobFile> & lhs,const std::shared_ptr<BlobFile> & rhs) const59 bool BlobFileComparatorTTL::operator()(
60     const std::shared_ptr<BlobFile>& lhs,
61     const std::shared_ptr<BlobFile>& rhs) const {
62   assert(lhs->HasTTL() && rhs->HasTTL());
63   if (lhs->expiration_range_.first < rhs->expiration_range_.first) {
64     return true;
65   }
66   if (lhs->expiration_range_.first > rhs->expiration_range_.first) {
67     return false;
68   }
69   return lhs->BlobFileNumber() < rhs->BlobFileNumber();
70 }
71 
BlobDBImpl(const std::string & dbname,const BlobDBOptions & blob_db_options,const DBOptions & db_options,const ColumnFamilyOptions & cf_options)72 BlobDBImpl::BlobDBImpl(const std::string& dbname,
73                        const BlobDBOptions& blob_db_options,
74                        const DBOptions& db_options,
75                        const ColumnFamilyOptions& cf_options)
76     : BlobDB(),
77       dbname_(dbname),
78       db_impl_(nullptr),
79       env_(db_options.env),
80       bdb_options_(blob_db_options),
81       db_options_(db_options),
82       cf_options_(cf_options),
83       file_options_(db_options),
84       statistics_(db_options_.statistics.get()),
85       next_file_number_(1),
86       flush_sequence_(0),
87       closed_(true),
88       open_file_count_(0),
89       total_blob_size_(0),
90       live_sst_size_(0),
91       fifo_eviction_seq_(0),
92       evict_expiration_up_to_(0),
93       debug_level_(0) {
94   clock_ = env_->GetSystemClock().get();
95   blob_dir_ = (bdb_options_.path_relative)
96                   ? dbname + "/" + bdb_options_.blob_dir
97                   : bdb_options_.blob_dir;
98   file_options_.bytes_per_sync = blob_db_options.bytes_per_sync;
99 }
100 
~BlobDBImpl()101 BlobDBImpl::~BlobDBImpl() {
102   tqueue_.shutdown();
103   // CancelAllBackgroundWork(db_, true);
104   Status s __attribute__((__unused__)) = Close();
105   assert(s.ok());
106 }
107 
Close()108 Status BlobDBImpl::Close() {
109   if (closed_) {
110     return Status::OK();
111   }
112   closed_ = true;
113 
114   // Close base DB before BlobDBImpl destructs to stop event listener and
115   // compaction filter call.
116   Status s = db_->Close();
117   // delete db_ anyway even if close failed.
118   delete db_;
119   // Reset pointers to avoid StackableDB delete the pointer again.
120   db_ = nullptr;
121   db_impl_ = nullptr;
122   if (!s.ok()) {
123     return s;
124   }
125 
126   s = SyncBlobFiles();
127   return s;
128 }
129 
GetBlobDBOptions() const130 BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; }
131 
Open(std::vector<ColumnFamilyHandle * > * handles)132 Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
133   assert(handles != nullptr);
134   assert(db_ == nullptr);
135 
136   if (blob_dir_.empty()) {
137     return Status::NotSupported("No blob directory in options");
138   }
139 
140   if (bdb_options_.garbage_collection_cutoff < 0.0 ||
141       bdb_options_.garbage_collection_cutoff > 1.0) {
142     return Status::InvalidArgument(
143         "Garbage collection cutoff must be in the interval [0.0, 1.0]");
144   }
145 
146   // Temporarily disable compactions in the base DB during open; save the user
147   // defined value beforehand so we can restore it once BlobDB is initialized.
148   // Note: this is only needed if garbage collection is enabled.
149   const bool disable_auto_compactions = cf_options_.disable_auto_compactions;
150 
151   if (bdb_options_.enable_garbage_collection) {
152     cf_options_.disable_auto_compactions = true;
153   }
154 
155   Status s;
156 
157   // Create info log.
158   if (db_options_.info_log == nullptr) {
159     s = CreateLoggerFromOptions(dbname_, db_options_, &db_options_.info_log);
160     if (!s.ok()) {
161       return s;
162     }
163   }
164 
165   ROCKS_LOG_INFO(db_options_.info_log, "Opening BlobDB...");
166 
167   if ((cf_options_.compaction_filter != nullptr ||
168        cf_options_.compaction_filter_factory != nullptr)) {
169     ROCKS_LOG_INFO(db_options_.info_log,
170                    "BlobDB only support compaction filter on non-TTL values.");
171   }
172 
173   // Open blob directory.
174   s = env_->CreateDirIfMissing(blob_dir_);
175   if (!s.ok()) {
176     ROCKS_LOG_ERROR(db_options_.info_log,
177                     "Failed to create blob_dir %s, status: %s",
178                     blob_dir_.c_str(), s.ToString().c_str());
179   }
180   s = env_->NewDirectory(blob_dir_, &dir_ent_);
181   if (!s.ok()) {
182     ROCKS_LOG_ERROR(db_options_.info_log,
183                     "Failed to open blob_dir %s, status: %s", blob_dir_.c_str(),
184                     s.ToString().c_str());
185     return s;
186   }
187 
188   // Open blob files.
189   s = OpenAllBlobFiles();
190   if (!s.ok()) {
191     return s;
192   }
193 
194   // Update options
195   if (bdb_options_.enable_garbage_collection) {
196     db_options_.listeners.push_back(std::make_shared<BlobDBListenerGC>(this));
197     cf_options_.compaction_filter_factory =
198         std::make_shared<BlobIndexCompactionFilterFactoryGC>(
199             this, clock_, cf_options_, statistics_);
200   } else {
201     db_options_.listeners.push_back(std::make_shared<BlobDBListener>(this));
202     cf_options_.compaction_filter_factory =
203         std::make_shared<BlobIndexCompactionFilterFactory>(
204             this, clock_, cf_options_, statistics_);
205   }
206 
207   // Reset user compaction filter after building into compaction factory.
208   cf_options_.compaction_filter = nullptr;
209 
210   // Open base db.
211   ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options_);
212   s = DB::Open(db_options_, dbname_, {cf_descriptor}, handles, &db_);
213   if (!s.ok()) {
214     return s;
215   }
216   db_impl_ = static_cast_with_check<DBImpl>(db_->GetRootDB());
217 
218   // Sanitize the blob_dir provided. Using a directory where the
219   // base DB stores its files for the default CF is not supported.
220   const ColumnFamilyData* const cfd =
221       static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->cfd();
222   assert(cfd);
223 
224   const ImmutableCFOptions* const ioptions = cfd->ioptions();
225   assert(ioptions);
226 
227   assert(env_);
228 
229   for (const auto& cf_path : ioptions->cf_paths) {
230     bool blob_dir_same_as_cf_dir = false;
231     s = env_->AreFilesSame(blob_dir_, cf_path.path, &blob_dir_same_as_cf_dir);
232     if (!s.ok()) {
233       ROCKS_LOG_ERROR(db_options_.info_log,
234                       "Error while sanitizing blob_dir %s, status: %s",
235                       blob_dir_.c_str(), s.ToString().c_str());
236       return s;
237     }
238 
239     if (blob_dir_same_as_cf_dir) {
240       return Status::NotSupported(
241           "Using the base DB's storage directories for BlobDB files is not "
242           "supported.");
243     }
244   }
245 
246   // Initialize SST file <-> oldest blob file mapping if garbage collection
247   // is enabled.
248   if (bdb_options_.enable_garbage_collection) {
249     std::vector<LiveFileMetaData> live_files;
250     db_->GetLiveFilesMetaData(&live_files);
251 
252     InitializeBlobFileToSstMapping(live_files);
253 
254     MarkUnreferencedBlobFilesObsoleteDuringOpen();
255 
256     if (!disable_auto_compactions) {
257       s = db_->EnableAutoCompaction(*handles);
258       if (!s.ok()) {
259         ROCKS_LOG_ERROR(
260             db_options_.info_log,
261             "Failed to enable automatic compactions during open, status: %s",
262             s.ToString().c_str());
263         return s;
264       }
265     }
266   }
267 
268   // Add trash files in blob dir to file delete scheduler.
269   SstFileManagerImpl* sfm = static_cast<SstFileManagerImpl*>(
270       db_impl_->immutable_db_options().sst_file_manager.get());
271   DeleteScheduler::CleanupDirectory(env_, sfm, blob_dir_);
272 
273   UpdateLiveSSTSize();
274 
275   // Start background jobs.
276   if (!bdb_options_.disable_background_tasks) {
277     StartBackgroundTasks();
278   }
279 
280   ROCKS_LOG_INFO(db_options_.info_log, "BlobDB pointer %p", this);
281   bdb_options_.Dump(db_options_.info_log.get());
282   closed_ = false;
283   return s;
284 }
285 
StartBackgroundTasks()286 void BlobDBImpl::StartBackgroundTasks() {
287   // store a call to a member function and object
288   tqueue_.add(
289       kReclaimOpenFilesPeriodMillisecs,
290       std::bind(&BlobDBImpl::ReclaimOpenFiles, this, std::placeholders::_1));
291   tqueue_.add(
292       kDeleteObsoleteFilesPeriodMillisecs,
293       std::bind(&BlobDBImpl::DeleteObsoleteFiles, this, std::placeholders::_1));
294   tqueue_.add(kSanityCheckPeriodMillisecs,
295               std::bind(&BlobDBImpl::SanityCheck, this, std::placeholders::_1));
296   tqueue_.add(
297       kEvictExpiredFilesPeriodMillisecs,
298       std::bind(&BlobDBImpl::EvictExpiredFiles, this, std::placeholders::_1));
299 }
300 
GetAllBlobFiles(std::set<uint64_t> * file_numbers)301 Status BlobDBImpl::GetAllBlobFiles(std::set<uint64_t>* file_numbers) {
302   assert(file_numbers != nullptr);
303   std::vector<std::string> all_files;
304   Status s = env_->GetChildren(blob_dir_, &all_files);
305   if (!s.ok()) {
306     ROCKS_LOG_ERROR(db_options_.info_log,
307                     "Failed to get list of blob files, status: %s",
308                     s.ToString().c_str());
309     return s;
310   }
311 
312   for (const auto& file_name : all_files) {
313     uint64_t file_number;
314     FileType type;
315     bool success = ParseFileName(file_name, &file_number, &type);
316     if (success && type == kBlobFile) {
317       file_numbers->insert(file_number);
318     } else {
319       ROCKS_LOG_WARN(db_options_.info_log,
320                      "Skipping file in blob directory: %s", file_name.c_str());
321     }
322   }
323 
324   return s;
325 }
326 
OpenAllBlobFiles()327 Status BlobDBImpl::OpenAllBlobFiles() {
328   std::set<uint64_t> file_numbers;
329   Status s = GetAllBlobFiles(&file_numbers);
330   if (!s.ok()) {
331     return s;
332   }
333 
334   if (!file_numbers.empty()) {
335     next_file_number_.store(*file_numbers.rbegin() + 1);
336   }
337 
338   std::ostringstream blob_file_oss;
339   std::ostringstream live_imm_oss;
340   std::ostringstream obsolete_file_oss;
341 
342   for (auto& file_number : file_numbers) {
343     std::shared_ptr<BlobFile> blob_file = std::make_shared<BlobFile>(
344         this, blob_dir_, file_number, db_options_.info_log.get());
345     blob_file->MarkImmutable(/* sequence */ 0);
346 
347     // Read file header and footer
348     Status read_metadata_status =
349         blob_file->ReadMetadata(env_->GetFileSystem(), file_options_);
350     if (read_metadata_status.IsCorruption()) {
351       // Remove incomplete file.
352       if (!obsolete_files_.empty()) {
353         obsolete_file_oss << ", ";
354       }
355       obsolete_file_oss << file_number;
356 
357       ObsoleteBlobFile(blob_file, 0 /*obsolete_seq*/, false /*update_size*/);
358       continue;
359     } else if (!read_metadata_status.ok()) {
360       ROCKS_LOG_ERROR(db_options_.info_log,
361                       "Unable to read metadata of blob file %" PRIu64
362                       ", status: '%s'",
363                       file_number, read_metadata_status.ToString().c_str());
364       return read_metadata_status;
365     }
366 
367     total_blob_size_ += blob_file->GetFileSize();
368 
369     if (!blob_files_.empty()) {
370       blob_file_oss << ", ";
371     }
372     blob_file_oss << file_number;
373 
374     blob_files_[file_number] = blob_file;
375 
376     if (!blob_file->HasTTL()) {
377       if (!live_imm_non_ttl_blob_files_.empty()) {
378         live_imm_oss << ", ";
379       }
380       live_imm_oss << file_number;
381 
382       live_imm_non_ttl_blob_files_[file_number] = blob_file;
383     }
384   }
385 
386   ROCKS_LOG_INFO(db_options_.info_log,
387                  "Found %" ROCKSDB_PRIszt " blob files: %s", blob_files_.size(),
388                  blob_file_oss.str().c_str());
389   ROCKS_LOG_INFO(
390       db_options_.info_log, "Found %" ROCKSDB_PRIszt " non-TTL blob files: %s",
391       live_imm_non_ttl_blob_files_.size(), live_imm_oss.str().c_str());
392   ROCKS_LOG_INFO(db_options_.info_log,
393                  "Found %" ROCKSDB_PRIszt
394                  " incomplete or corrupted blob files: %s",
395                  obsolete_files_.size(), obsolete_file_oss.str().c_str());
396   return s;
397 }
398 
399 template <typename Linker>
LinkSstToBlobFileImpl(uint64_t sst_file_number,uint64_t blob_file_number,Linker linker)400 void BlobDBImpl::LinkSstToBlobFileImpl(uint64_t sst_file_number,
401                                        uint64_t blob_file_number,
402                                        Linker linker) {
403   assert(bdb_options_.enable_garbage_collection);
404   assert(blob_file_number != kInvalidBlobFileNumber);
405 
406   auto it = blob_files_.find(blob_file_number);
407   if (it == blob_files_.end()) {
408     ROCKS_LOG_WARN(db_options_.info_log,
409                    "Blob file %" PRIu64
410                    " not found while trying to link "
411                    "SST file %" PRIu64,
412                    blob_file_number, sst_file_number);
413     return;
414   }
415 
416   BlobFile* const blob_file = it->second.get();
417   assert(blob_file);
418 
419   linker(blob_file, sst_file_number);
420 
421   ROCKS_LOG_INFO(db_options_.info_log,
422                  "Blob file %" PRIu64 " linked to SST file %" PRIu64,
423                  blob_file_number, sst_file_number);
424 }
425 
LinkSstToBlobFile(uint64_t sst_file_number,uint64_t blob_file_number)426 void BlobDBImpl::LinkSstToBlobFile(uint64_t sst_file_number,
427                                    uint64_t blob_file_number) {
428   auto linker = [](BlobFile* blob_file, uint64_t sst_file) {
429     WriteLock file_lock(&blob_file->mutex_);
430     blob_file->LinkSstFile(sst_file);
431   };
432 
433   LinkSstToBlobFileImpl(sst_file_number, blob_file_number, linker);
434 }
435 
LinkSstToBlobFileNoLock(uint64_t sst_file_number,uint64_t blob_file_number)436 void BlobDBImpl::LinkSstToBlobFileNoLock(uint64_t sst_file_number,
437                                          uint64_t blob_file_number) {
438   auto linker = [](BlobFile* blob_file, uint64_t sst_file) {
439     blob_file->LinkSstFile(sst_file);
440   };
441 
442   LinkSstToBlobFileImpl(sst_file_number, blob_file_number, linker);
443 }
444 
UnlinkSstFromBlobFile(uint64_t sst_file_number,uint64_t blob_file_number)445 void BlobDBImpl::UnlinkSstFromBlobFile(uint64_t sst_file_number,
446                                        uint64_t blob_file_number) {
447   assert(bdb_options_.enable_garbage_collection);
448   assert(blob_file_number != kInvalidBlobFileNumber);
449 
450   auto it = blob_files_.find(blob_file_number);
451   if (it == blob_files_.end()) {
452     ROCKS_LOG_WARN(db_options_.info_log,
453                    "Blob file %" PRIu64
454                    " not found while trying to unlink "
455                    "SST file %" PRIu64,
456                    blob_file_number, sst_file_number);
457     return;
458   }
459 
460   BlobFile* const blob_file = it->second.get();
461   assert(blob_file);
462 
463   {
464     WriteLock file_lock(&blob_file->mutex_);
465     blob_file->UnlinkSstFile(sst_file_number);
466   }
467 
468   ROCKS_LOG_INFO(db_options_.info_log,
469                  "Blob file %" PRIu64 " unlinked from SST file %" PRIu64,
470                  blob_file_number, sst_file_number);
471 }
472 
InitializeBlobFileToSstMapping(const std::vector<LiveFileMetaData> & live_files)473 void BlobDBImpl::InitializeBlobFileToSstMapping(
474     const std::vector<LiveFileMetaData>& live_files) {
475   assert(bdb_options_.enable_garbage_collection);
476 
477   for (const auto& live_file : live_files) {
478     const uint64_t sst_file_number = live_file.file_number;
479     const uint64_t blob_file_number = live_file.oldest_blob_file_number;
480 
481     if (blob_file_number == kInvalidBlobFileNumber) {
482       continue;
483     }
484 
485     LinkSstToBlobFileNoLock(sst_file_number, blob_file_number);
486   }
487 }
488 
ProcessFlushJobInfo(const FlushJobInfo & info)489 void BlobDBImpl::ProcessFlushJobInfo(const FlushJobInfo& info) {
490   assert(bdb_options_.enable_garbage_collection);
491 
492   WriteLock lock(&mutex_);
493 
494   if (info.oldest_blob_file_number != kInvalidBlobFileNumber) {
495     LinkSstToBlobFile(info.file_number, info.oldest_blob_file_number);
496   }
497 
498   assert(flush_sequence_ < info.largest_seqno);
499   flush_sequence_ = info.largest_seqno;
500 
501   MarkUnreferencedBlobFilesObsolete();
502 }
503 
ProcessCompactionJobInfo(const CompactionJobInfo & info)504 void BlobDBImpl::ProcessCompactionJobInfo(const CompactionJobInfo& info) {
505   assert(bdb_options_.enable_garbage_collection);
506 
507   if (!info.status.ok()) {
508     return;
509   }
510 
511   // Note: the same SST file may appear in both the input and the output
512   // file list in case of a trivial move. We walk through the two lists
513   // below in a fashion that's similar to merge sort to detect this.
514 
515   auto cmp = [](const CompactionFileInfo& lhs, const CompactionFileInfo& rhs) {
516     return lhs.file_number < rhs.file_number;
517   };
518 
519   auto inputs = info.input_file_infos;
520   auto iit = inputs.begin();
521   const auto iit_end = inputs.end();
522 
523   std::sort(iit, iit_end, cmp);
524 
525   auto outputs = info.output_file_infos;
526   auto oit = outputs.begin();
527   const auto oit_end = outputs.end();
528 
529   std::sort(oit, oit_end, cmp);
530 
531   WriteLock lock(&mutex_);
532 
533   while (iit != iit_end && oit != oit_end) {
534     const auto& input = *iit;
535     const auto& output = *oit;
536 
537     if (input.file_number == output.file_number) {
538       ++iit;
539       ++oit;
540     } else if (input.file_number < output.file_number) {
541       if (input.oldest_blob_file_number != kInvalidBlobFileNumber) {
542         UnlinkSstFromBlobFile(input.file_number, input.oldest_blob_file_number);
543       }
544 
545       ++iit;
546     } else {
547       assert(output.file_number < input.file_number);
548 
549       if (output.oldest_blob_file_number != kInvalidBlobFileNumber) {
550         LinkSstToBlobFile(output.file_number, output.oldest_blob_file_number);
551       }
552 
553       ++oit;
554     }
555   }
556 
557   while (iit != iit_end) {
558     const auto& input = *iit;
559 
560     if (input.oldest_blob_file_number != kInvalidBlobFileNumber) {
561       UnlinkSstFromBlobFile(input.file_number, input.oldest_blob_file_number);
562     }
563 
564     ++iit;
565   }
566 
567   while (oit != oit_end) {
568     const auto& output = *oit;
569 
570     if (output.oldest_blob_file_number != kInvalidBlobFileNumber) {
571       LinkSstToBlobFile(output.file_number, output.oldest_blob_file_number);
572     }
573 
574     ++oit;
575   }
576 
577   MarkUnreferencedBlobFilesObsolete();
578 }
579 
MarkBlobFileObsoleteIfNeeded(const std::shared_ptr<BlobFile> & blob_file,SequenceNumber obsolete_seq)580 bool BlobDBImpl::MarkBlobFileObsoleteIfNeeded(
581     const std::shared_ptr<BlobFile>& blob_file, SequenceNumber obsolete_seq) {
582   assert(blob_file);
583   assert(!blob_file->HasTTL());
584   assert(blob_file->Immutable());
585   assert(bdb_options_.enable_garbage_collection);
586 
587   // Note: FIFO eviction could have marked this file obsolete already.
588   if (blob_file->Obsolete()) {
589     return true;
590   }
591 
592   // We cannot mark this file (or any higher-numbered files for that matter)
593   // obsolete if it is referenced by any memtables or SSTs. We keep track of
594   // the SSTs explicitly. To account for memtables, we keep track of the highest
595   // sequence number received in flush notifications, and we do not mark the
596   // blob file obsolete if there are still unflushed memtables from before
597   // the time the blob file was closed.
598   if (blob_file->GetImmutableSequence() > flush_sequence_ ||
599       !blob_file->GetLinkedSstFiles().empty()) {
600     return false;
601   }
602 
603   ROCKS_LOG_INFO(db_options_.info_log,
604                  "Blob file %" PRIu64 " is no longer needed, marking obsolete",
605                  blob_file->BlobFileNumber());
606 
607   ObsoleteBlobFile(blob_file, obsolete_seq, /* update_size */ true);
608   return true;
609 }
610 
611 template <class Functor>
MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed)612 void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed) {
613   assert(bdb_options_.enable_garbage_collection);
614 
615   // Iterate through all live immutable non-TTL blob files, and mark them
616   // obsolete assuming no SST files or memtables rely on the blobs in them.
617   // Note: we need to stop as soon as we find a blob file that has any
618   // linked SSTs (or one potentially referenced by memtables).
619 
620   uint64_t obsoleted_files = 0;
621 
622   auto it = live_imm_non_ttl_blob_files_.begin();
623   while (it != live_imm_non_ttl_blob_files_.end()) {
624     const auto& blob_file = it->second;
625     assert(blob_file);
626     assert(blob_file->BlobFileNumber() == it->first);
627     assert(!blob_file->HasTTL());
628     assert(blob_file->Immutable());
629 
630     // Small optimization: Obsolete() does an atomic read, so we can do
631     // this check without taking a lock on the blob file's mutex.
632     if (blob_file->Obsolete()) {
633       it = live_imm_non_ttl_blob_files_.erase(it);
634       continue;
635     }
636 
637     if (!mark_if_needed(blob_file)) {
638       break;
639     }
640 
641     it = live_imm_non_ttl_blob_files_.erase(it);
642 
643     ++obsoleted_files;
644   }
645 
646   if (obsoleted_files > 0) {
647     ROCKS_LOG_INFO(db_options_.info_log,
648                    "%" PRIu64 " blob file(s) marked obsolete by GC",
649                    obsoleted_files);
650     RecordTick(statistics_, BLOB_DB_GC_NUM_FILES, obsoleted_files);
651   }
652 }
653 
MarkUnreferencedBlobFilesObsolete()654 void BlobDBImpl::MarkUnreferencedBlobFilesObsolete() {
655   const SequenceNumber obsolete_seq = GetLatestSequenceNumber();
656 
657   MarkUnreferencedBlobFilesObsoleteImpl(
658       [this, obsolete_seq](const std::shared_ptr<BlobFile>& blob_file) {
659         WriteLock file_lock(&blob_file->mutex_);
660         return MarkBlobFileObsoleteIfNeeded(blob_file, obsolete_seq);
661       });
662 }
663 
MarkUnreferencedBlobFilesObsoleteDuringOpen()664 void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteDuringOpen() {
665   MarkUnreferencedBlobFilesObsoleteImpl(
666       [this](const std::shared_ptr<BlobFile>& blob_file) {
667         return MarkBlobFileObsoleteIfNeeded(blob_file, /* obsolete_seq */ 0);
668       });
669 }
670 
CloseRandomAccessLocked(const std::shared_ptr<BlobFile> & bfile)671 void BlobDBImpl::CloseRandomAccessLocked(
672     const std::shared_ptr<BlobFile>& bfile) {
673   bfile->CloseRandomAccessLocked();
674   open_file_count_--;
675 }
676 
GetBlobFileReader(const std::shared_ptr<BlobFile> & blob_file,std::shared_ptr<RandomAccessFileReader> * reader)677 Status BlobDBImpl::GetBlobFileReader(
678     const std::shared_ptr<BlobFile>& blob_file,
679     std::shared_ptr<RandomAccessFileReader>* reader) {
680   assert(reader != nullptr);
681   bool fresh_open = false;
682   Status s = blob_file->GetReader(env_, file_options_, reader, &fresh_open);
683   if (s.ok() && fresh_open) {
684     assert(*reader != nullptr);
685     open_file_count_++;
686   }
687   return s;
688 }
689 
NewBlobFile(bool has_ttl,const ExpirationRange & expiration_range,const std::string & reason)690 std::shared_ptr<BlobFile> BlobDBImpl::NewBlobFile(
691     bool has_ttl, const ExpirationRange& expiration_range,
692     const std::string& reason) {
693   assert(has_ttl == (expiration_range.first || expiration_range.second));
694 
695   uint64_t file_num = next_file_number_++;
696 
697   const uint32_t column_family_id =
698       static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
699   auto blob_file = std::make_shared<BlobFile>(
700       this, blob_dir_, file_num, db_options_.info_log.get(), column_family_id,
701       bdb_options_.compression, has_ttl, expiration_range);
702 
703   ROCKS_LOG_DEBUG(db_options_.info_log, "New blob file created: %s reason='%s'",
704                   blob_file->PathName().c_str(), reason.c_str());
705   LogFlush(db_options_.info_log);
706 
707   return blob_file;
708 }
709 
RegisterBlobFile(std::shared_ptr<BlobFile> blob_file)710 void BlobDBImpl::RegisterBlobFile(std::shared_ptr<BlobFile> blob_file) {
711   const uint64_t blob_file_number = blob_file->BlobFileNumber();
712 
713   auto it = blob_files_.lower_bound(blob_file_number);
714   assert(it == blob_files_.end() || it->first != blob_file_number);
715 
716   blob_files_.insert(it,
717                      std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
718                          blob_file_number, std::move(blob_file)));
719 }
720 
CreateWriterLocked(const std::shared_ptr<BlobFile> & bfile)721 Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
722   std::string fpath(bfile->PathName());
723   std::unique_ptr<FSWritableFile> wfile;
724   const auto& fs = env_->GetFileSystem();
725 
726   Status s = fs->ReopenWritableFile(fpath, file_options_, &wfile, nullptr);
727   if (!s.ok()) {
728     ROCKS_LOG_ERROR(db_options_.info_log,
729                     "Failed to open blob file for write: %s status: '%s'"
730                     " exists: '%s'",
731                     fpath.c_str(), s.ToString().c_str(),
732                     fs->FileExists(fpath, file_options_.io_options, nullptr)
733                         .ToString()
734                         .c_str());
735     return s;
736   }
737 
738   std::unique_ptr<WritableFileWriter> fwriter;
739   fwriter.reset(new WritableFileWriter(std::move(wfile), fpath, file_options_));
740 
741   uint64_t boffset = bfile->GetFileSize();
742   if (debug_level_ >= 2 && boffset) {
743     ROCKS_LOG_DEBUG(db_options_.info_log,
744                     "Open blob file: %s with offset: %" PRIu64, fpath.c_str(),
745                     boffset);
746   }
747 
748   BlobLogWriter::ElemType et = BlobLogWriter::kEtNone;
749   if (bfile->file_size_ == BlobLogHeader::kSize) {
750     et = BlobLogWriter::kEtFileHdr;
751   } else if (bfile->file_size_ > BlobLogHeader::kSize) {
752     et = BlobLogWriter::kEtRecord;
753   } else if (bfile->file_size_) {
754     ROCKS_LOG_WARN(db_options_.info_log,
755                    "Open blob file: %s with wrong size: %" PRIu64,
756                    fpath.c_str(), boffset);
757     return Status::Corruption("Invalid blob file size");
758   }
759 
760   constexpr bool do_flush = true;
761 
762   bfile->log_writer_ = std::make_shared<BlobLogWriter>(
763       std::move(fwriter), clock_, statistics_, bfile->file_number_,
764       db_options_.use_fsync, do_flush, boffset);
765   bfile->log_writer_->last_elem_type_ = et;
766 
767   return s;
768 }
769 
FindBlobFileLocked(uint64_t expiration) const770 std::shared_ptr<BlobFile> BlobDBImpl::FindBlobFileLocked(
771     uint64_t expiration) const {
772   if (open_ttl_files_.empty()) {
773     return nullptr;
774   }
775 
776   std::shared_ptr<BlobFile> tmp = std::make_shared<BlobFile>();
777   tmp->SetHasTTL(true);
778   tmp->expiration_range_ = std::make_pair(expiration, 0);
779   tmp->file_number_ = std::numeric_limits<uint64_t>::max();
780 
781   auto citr = open_ttl_files_.equal_range(tmp);
782   if (citr.first == open_ttl_files_.end()) {
783     assert(citr.second == open_ttl_files_.end());
784 
785     std::shared_ptr<BlobFile> check = *(open_ttl_files_.rbegin());
786     return (check->expiration_range_.second <= expiration) ? nullptr : check;
787   }
788 
789   if (citr.first != citr.second) {
790     return *(citr.first);
791   }
792 
793   auto finditr = citr.second;
794   if (finditr != open_ttl_files_.begin()) {
795     --finditr;
796   }
797 
798   bool b2 = (*finditr)->expiration_range_.second <= expiration;
799   bool b1 = (*finditr)->expiration_range_.first > expiration;
800 
801   return (b1 || b2) ? nullptr : (*finditr);
802 }
803 
CheckOrCreateWriterLocked(const std::shared_ptr<BlobFile> & blob_file,std::shared_ptr<BlobLogWriter> * writer)804 Status BlobDBImpl::CheckOrCreateWriterLocked(
805     const std::shared_ptr<BlobFile>& blob_file,
806     std::shared_ptr<BlobLogWriter>* writer) {
807   assert(writer != nullptr);
808   *writer = blob_file->GetWriter();
809   if (*writer != nullptr) {
810     return Status::OK();
811   }
812   Status s = CreateWriterLocked(blob_file);
813   if (s.ok()) {
814     *writer = blob_file->GetWriter();
815   }
816   return s;
817 }
818 
CreateBlobFileAndWriter(bool has_ttl,const ExpirationRange & expiration_range,const std::string & reason,std::shared_ptr<BlobFile> * blob_file,std::shared_ptr<BlobLogWriter> * writer)819 Status BlobDBImpl::CreateBlobFileAndWriter(
820     bool has_ttl, const ExpirationRange& expiration_range,
821     const std::string& reason, std::shared_ptr<BlobFile>* blob_file,
822     std::shared_ptr<BlobLogWriter>* writer) {
823   TEST_SYNC_POINT("BlobDBImpl::CreateBlobFileAndWriter");
824   assert(has_ttl == (expiration_range.first || expiration_range.second));
825   assert(blob_file);
826   assert(writer);
827 
828   *blob_file = NewBlobFile(has_ttl, expiration_range, reason);
829   assert(*blob_file);
830 
831   // file not visible, hence no lock
832   Status s = CheckOrCreateWriterLocked(*blob_file, writer);
833   if (!s.ok()) {
834     ROCKS_LOG_ERROR(db_options_.info_log,
835                     "Failed to get writer for blob file: %s, error: %s",
836                     (*blob_file)->PathName().c_str(), s.ToString().c_str());
837     return s;
838   }
839 
840   assert(*writer);
841 
842   s = (*writer)->WriteHeader((*blob_file)->header_);
843   if (!s.ok()) {
844     ROCKS_LOG_ERROR(db_options_.info_log,
845                     "Failed to write header to new blob file: %s"
846                     " status: '%s'",
847                     (*blob_file)->PathName().c_str(), s.ToString().c_str());
848     return s;
849   }
850 
851   (*blob_file)->SetFileSize(BlobLogHeader::kSize);
852   total_blob_size_ += BlobLogHeader::kSize;
853 
854   return s;
855 }
856 
SelectBlobFile(std::shared_ptr<BlobFile> * blob_file)857 Status BlobDBImpl::SelectBlobFile(std::shared_ptr<BlobFile>* blob_file) {
858   assert(blob_file);
859 
860   {
861     ReadLock rl(&mutex_);
862 
863     if (open_non_ttl_file_) {
864       assert(!open_non_ttl_file_->Immutable());
865       *blob_file = open_non_ttl_file_;
866       return Status::OK();
867     }
868   }
869 
870   // Check again
871   WriteLock wl(&mutex_);
872 
873   if (open_non_ttl_file_) {
874     assert(!open_non_ttl_file_->Immutable());
875     *blob_file = open_non_ttl_file_;
876     return Status::OK();
877   }
878 
879   std::shared_ptr<BlobLogWriter> writer;
880   const Status s = CreateBlobFileAndWriter(
881       /* has_ttl */ false, ExpirationRange(),
882       /* reason */ "SelectBlobFile", blob_file, &writer);
883   if (!s.ok()) {
884     return s;
885   }
886 
887   RegisterBlobFile(*blob_file);
888   open_non_ttl_file_ = *blob_file;
889 
890   return s;
891 }
892 
SelectBlobFileTTL(uint64_t expiration,std::shared_ptr<BlobFile> * blob_file)893 Status BlobDBImpl::SelectBlobFileTTL(uint64_t expiration,
894                                      std::shared_ptr<BlobFile>* blob_file) {
895   assert(blob_file);
896   assert(expiration != kNoExpiration);
897 
898   {
899     ReadLock rl(&mutex_);
900 
901     *blob_file = FindBlobFileLocked(expiration);
902     if (*blob_file != nullptr) {
903       assert(!(*blob_file)->Immutable());
904       return Status::OK();
905     }
906   }
907 
908   // Check again
909   WriteLock wl(&mutex_);
910 
911   *blob_file = FindBlobFileLocked(expiration);
912   if (*blob_file != nullptr) {
913     assert(!(*blob_file)->Immutable());
914     return Status::OK();
915   }
916 
917   const uint64_t exp_low =
918       (expiration / bdb_options_.ttl_range_secs) * bdb_options_.ttl_range_secs;
919   const uint64_t exp_high = exp_low + bdb_options_.ttl_range_secs;
920   const ExpirationRange expiration_range(exp_low, exp_high);
921 
922   std::ostringstream oss;
923   oss << "SelectBlobFileTTL range: [" << exp_low << ',' << exp_high << ')';
924 
925   std::shared_ptr<BlobLogWriter> writer;
926   const Status s =
927       CreateBlobFileAndWriter(/* has_ttl */ true, expiration_range,
928                               /* reason */ oss.str(), blob_file, &writer);
929   if (!s.ok()) {
930     return s;
931   }
932 
933   RegisterBlobFile(*blob_file);
934   open_ttl_files_.insert(*blob_file);
935 
936   return s;
937 }
938 
939 class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
940  private:
941   const WriteOptions& options_;
942   BlobDBImpl* blob_db_impl_;
943   uint32_t default_cf_id_;
944   WriteBatch batch_;
945 
946  public:
BlobInserter(const WriteOptions & options,BlobDBImpl * blob_db_impl,uint32_t default_cf_id)947   BlobInserter(const WriteOptions& options, BlobDBImpl* blob_db_impl,
948                uint32_t default_cf_id)
949       : options_(options),
950         blob_db_impl_(blob_db_impl),
951         default_cf_id_(default_cf_id) {}
952 
batch()953   WriteBatch* batch() { return &batch_; }
954 
PutCF(uint32_t column_family_id,const Slice & key,const Slice & value)955   Status PutCF(uint32_t column_family_id, const Slice& key,
956                const Slice& value) override {
957     if (column_family_id != default_cf_id_) {
958       return Status::NotSupported(
959           "Blob DB doesn't support non-default column family.");
960     }
961     Status s = blob_db_impl_->PutBlobValue(options_, key, value, kNoExpiration,
962                                            &batch_);
963     return s;
964   }
965 
DeleteCF(uint32_t column_family_id,const Slice & key)966   Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
967     if (column_family_id != default_cf_id_) {
968       return Status::NotSupported(
969           "Blob DB doesn't support non-default column family.");
970     }
971     Status s = WriteBatchInternal::Delete(&batch_, column_family_id, key);
972     return s;
973   }
974 
DeleteRange(uint32_t column_family_id,const Slice & begin_key,const Slice & end_key)975   virtual Status DeleteRange(uint32_t column_family_id, const Slice& begin_key,
976                              const Slice& end_key) {
977     if (column_family_id != default_cf_id_) {
978       return Status::NotSupported(
979           "Blob DB doesn't support non-default column family.");
980     }
981     Status s = WriteBatchInternal::DeleteRange(&batch_, column_family_id,
982                                                begin_key, end_key);
983     return s;
984   }
985 
SingleDeleteCF(uint32_t,const Slice &)986   Status SingleDeleteCF(uint32_t /*column_family_id*/,
987                         const Slice& /*key*/) override {
988     return Status::NotSupported("Not supported operation in blob db.");
989   }
990 
MergeCF(uint32_t,const Slice &,const Slice &)991   Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
992                  const Slice& /*value*/) override {
993     return Status::NotSupported("Not supported operation in blob db.");
994   }
995 
LogData(const Slice & blob)996   void LogData(const Slice& blob) override { batch_.PutLogData(blob); }
997 };
998 
Write(const WriteOptions & options,WriteBatch * updates)999 Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
1000   StopWatch write_sw(clock_, statistics_, BLOB_DB_WRITE_MICROS);
1001   RecordTick(statistics_, BLOB_DB_NUM_WRITE);
1002   uint32_t default_cf_id =
1003       static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
1004           ->GetID();
1005   Status s;
1006   BlobInserter blob_inserter(options, this, default_cf_id);
1007   {
1008     // Release write_mutex_ before DB write to avoid race condition with
1009     // flush begin listener, which also require write_mutex_ to sync
1010     // blob files.
1011     MutexLock l(&write_mutex_);
1012     s = updates->Iterate(&blob_inserter);
1013   }
1014   if (!s.ok()) {
1015     return s;
1016   }
1017   return db_->Write(options, blob_inserter.batch());
1018 }
1019 
Put(const WriteOptions & options,const Slice & key,const Slice & value)1020 Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key,
1021                        const Slice& value) {
1022   return PutUntil(options, key, value, kNoExpiration);
1023 }
1024 
PutWithTTL(const WriteOptions & options,const Slice & key,const Slice & value,uint64_t ttl)1025 Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
1026                               const Slice& key, const Slice& value,
1027                               uint64_t ttl) {
1028   uint64_t now = EpochNow();
1029   uint64_t expiration = kNoExpiration - now > ttl ? now + ttl : kNoExpiration;
1030   return PutUntil(options, key, value, expiration);
1031 }
1032 
PutUntil(const WriteOptions & options,const Slice & key,const Slice & value,uint64_t expiration)1033 Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
1034                             const Slice& value, uint64_t expiration) {
1035   StopWatch write_sw(clock_, statistics_, BLOB_DB_WRITE_MICROS);
1036   RecordTick(statistics_, BLOB_DB_NUM_PUT);
1037   Status s;
1038   WriteBatch batch;
1039   {
1040     // Release write_mutex_ before DB write to avoid race condition with
1041     // flush begin listener, which also require write_mutex_ to sync
1042     // blob files.
1043     MutexLock l(&write_mutex_);
1044     s = PutBlobValue(options, key, value, expiration, &batch);
1045   }
1046   if (s.ok()) {
1047     s = db_->Write(options, &batch);
1048   }
1049   return s;
1050 }
1051 
PutBlobValue(const WriteOptions &,const Slice & key,const Slice & value,uint64_t expiration,WriteBatch * batch)1052 Status BlobDBImpl::PutBlobValue(const WriteOptions& /*options*/,
1053                                 const Slice& key, const Slice& value,
1054                                 uint64_t expiration, WriteBatch* batch) {
1055   write_mutex_.AssertHeld();
1056   Status s;
1057   std::string index_entry;
1058   uint32_t column_family_id =
1059       static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
1060           ->GetID();
1061   if (value.size() < bdb_options_.min_blob_size) {
1062     if (expiration == kNoExpiration) {
1063       // Put as normal value
1064       s = batch->Put(key, value);
1065       RecordTick(statistics_, BLOB_DB_WRITE_INLINED);
1066     } else {
1067       // Inlined with TTL
1068       BlobIndex::EncodeInlinedTTL(&index_entry, expiration, value);
1069       s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
1070                                            index_entry);
1071       RecordTick(statistics_, BLOB_DB_WRITE_INLINED_TTL);
1072     }
1073   } else {
1074     std::string compression_output;
1075     Slice value_compressed = GetCompressedSlice(value, &compression_output);
1076 
1077     std::string headerbuf;
1078     BlobLogWriter::ConstructBlobHeader(&headerbuf, key, value_compressed,
1079                                        expiration);
1080 
1081     // Check DB size limit before selecting blob file to
1082     // Since CheckSizeAndEvictBlobFiles() can close blob files, it needs to be
1083     // done before calling SelectBlobFile().
1084     s = CheckSizeAndEvictBlobFiles(headerbuf.size() + key.size() +
1085                                    value_compressed.size());
1086     if (!s.ok()) {
1087       return s;
1088     }
1089 
1090     std::shared_ptr<BlobFile> blob_file;
1091     if (expiration != kNoExpiration) {
1092       s = SelectBlobFileTTL(expiration, &blob_file);
1093     } else {
1094       s = SelectBlobFile(&blob_file);
1095     }
1096     if (s.ok()) {
1097       assert(blob_file != nullptr);
1098       assert(blob_file->GetCompressionType() == bdb_options_.compression);
1099       s = AppendBlob(blob_file, headerbuf, key, value_compressed, expiration,
1100                      &index_entry);
1101     }
1102     if (s.ok()) {
1103       if (expiration != kNoExpiration) {
1104         WriteLock file_lock(&blob_file->mutex_);
1105         blob_file->ExtendExpirationRange(expiration);
1106       }
1107       s = CloseBlobFileIfNeeded(blob_file);
1108     }
1109     if (s.ok()) {
1110       s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
1111                                            index_entry);
1112     }
1113     if (s.ok()) {
1114       if (expiration == kNoExpiration) {
1115         RecordTick(statistics_, BLOB_DB_WRITE_BLOB);
1116       } else {
1117         RecordTick(statistics_, BLOB_DB_WRITE_BLOB_TTL);
1118       }
1119     } else {
1120       ROCKS_LOG_ERROR(
1121           db_options_.info_log,
1122           "Failed to append blob to FILE: %s: KEY: %s VALSZ: %" ROCKSDB_PRIszt
1123           " status: '%s' blob_file: '%s'",
1124           blob_file->PathName().c_str(), key.ToString().c_str(), value.size(),
1125           s.ToString().c_str(), blob_file->DumpState().c_str());
1126     }
1127   }
1128 
1129   RecordTick(statistics_, BLOB_DB_NUM_KEYS_WRITTEN);
1130   RecordTick(statistics_, BLOB_DB_BYTES_WRITTEN, key.size() + value.size());
1131   RecordInHistogram(statistics_, BLOB_DB_KEY_SIZE, key.size());
1132   RecordInHistogram(statistics_, BLOB_DB_VALUE_SIZE, value.size());
1133 
1134   return s;
1135 }
1136 
// Compresses `raw` with the configured blob compression type.
//
// Returns `raw` itself when compression is disabled. Otherwise CompressBlock()
// is run with `compression_output` as its destination and the returned slice
// refers to `*compression_output`, which must therefore outlive the result.
Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
                                     std::string* compression_output) const {
  CompressionType type = bdb_options_.compression;
  if (type == kNoCompression) {
    return raw;
  }

  StopWatch compression_sw(clock_, statistics_, BLOB_DB_COMPRESSION_MICROS);
  CompressionOptions opts;
  CompressionContext context(type);
  CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), type,
                       0 /* sample_for_compression */);
  CompressBlock(raw, info, &type, kBlockBasedTableVersionFormat, false,
                compression_output, nullptr, nullptr);
  return Slice(*compression_output);
}
1152 
DecompressSlice(const Slice & compressed_value,CompressionType compression_type,PinnableSlice * value_output) const1153 Status BlobDBImpl::DecompressSlice(const Slice& compressed_value,
1154                                    CompressionType compression_type,
1155                                    PinnableSlice* value_output) const {
1156   assert(compression_type != kNoCompression);
1157 
1158   BlockContents contents;
1159   auto cfh = static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
1160 
1161   {
1162     StopWatch decompression_sw(clock_, statistics_,
1163                                BLOB_DB_DECOMPRESSION_MICROS);
1164     UncompressionContext context(compression_type);
1165     UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
1166                            compression_type);
1167     Status s = UncompressBlockContentsForCompressionType(
1168         info, compressed_value.data(), compressed_value.size(), &contents,
1169         kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
1170     if (!s.ok()) {
1171       return Status::Corruption("Unable to decompress blob.");
1172     }
1173   }
1174 
1175   value_output->PinSelf(contents.data);
1176 
1177   return Status::OK();
1178 }
1179 
CompactFiles(const CompactionOptions & compact_options,const std::vector<std::string> & input_file_names,const int output_level,const int output_path_id,std::vector<std::string> * const output_file_names,CompactionJobInfo * compaction_job_info)1180 Status BlobDBImpl::CompactFiles(
1181     const CompactionOptions& compact_options,
1182     const std::vector<std::string>& input_file_names, const int output_level,
1183     const int output_path_id, std::vector<std::string>* const output_file_names,
1184     CompactionJobInfo* compaction_job_info) {
1185   // Note: we need CompactionJobInfo to be able to track updates to the
1186   // blob file <-> SST mappings, so we provide one if the user hasn't,
1187   // assuming that GC is enabled.
1188   CompactionJobInfo info{};
1189   if (bdb_options_.enable_garbage_collection && !compaction_job_info) {
1190     compaction_job_info = &info;
1191   }
1192 
1193   const Status s =
1194       db_->CompactFiles(compact_options, input_file_names, output_level,
1195                         output_path_id, output_file_names, compaction_job_info);
1196   if (!s.ok()) {
1197     return s;
1198   }
1199 
1200   if (bdb_options_.enable_garbage_collection) {
1201     assert(compaction_job_info);
1202     ProcessCompactionJobInfo(*compaction_job_info);
1203   }
1204 
1205   return s;
1206 }
1207 
GetCompactionContextCommon(BlobCompactionContext * context)1208 void BlobDBImpl::GetCompactionContextCommon(BlobCompactionContext* context) {
1209   assert(context);
1210 
1211   context->blob_db_impl = this;
1212   context->next_file_number = next_file_number_.load();
1213   context->current_blob_files.clear();
1214   for (auto& p : blob_files_) {
1215     context->current_blob_files.insert(p.first);
1216   }
1217   context->fifo_eviction_seq = fifo_eviction_seq_;
1218   context->evict_expiration_up_to = evict_expiration_up_to_;
1219 }
1220 
GetCompactionContext(BlobCompactionContext * context)1221 void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context) {
1222   assert(context);
1223 
1224   ReadLock l(&mutex_);
1225   GetCompactionContextCommon(context);
1226 }
1227 
GetCompactionContext(BlobCompactionContext * context,BlobCompactionContextGC * context_gc)1228 void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context,
1229                                       BlobCompactionContextGC* context_gc) {
1230   assert(context);
1231   assert(context_gc);
1232 
1233   ReadLock l(&mutex_);
1234   GetCompactionContextCommon(context);
1235 
1236   if (!live_imm_non_ttl_blob_files_.empty()) {
1237     auto it = live_imm_non_ttl_blob_files_.begin();
1238     std::advance(it, bdb_options_.garbage_collection_cutoff *
1239                          live_imm_non_ttl_blob_files_.size());
1240     context_gc->cutoff_file_number = it != live_imm_non_ttl_blob_files_.end()
1241                                          ? it->first
1242                                          : std::numeric_limits<uint64_t>::max();
1243   }
1244 }
1245 
UpdateLiveSSTSize()1246 void BlobDBImpl::UpdateLiveSSTSize() {
1247   uint64_t live_sst_size = 0;
1248   bool ok = GetIntProperty(DB::Properties::kLiveSstFilesSize, &live_sst_size);
1249   if (ok) {
1250     live_sst_size_.store(live_sst_size);
1251     ROCKS_LOG_INFO(db_options_.info_log,
1252                    "Updated total SST file size: %" PRIu64 " bytes.",
1253                    live_sst_size);
1254   } else {
1255     ROCKS_LOG_ERROR(
1256         db_options_.info_log,
1257         "Failed to update total SST file size after flush or compaction.");
1258   }
1259   {
1260     // Trigger FIFO eviction if needed.
1261     MutexLock l(&write_mutex_);
1262     Status s = CheckSizeAndEvictBlobFiles(0, true /*force*/);
1263     if (s.IsNoSpace()) {
1264       ROCKS_LOG_WARN(db_options_.info_log,
1265                      "DB grow out-of-space after SST size updated. Current live"
1266                      " SST size: %" PRIu64
1267                      " , current blob files size: %" PRIu64 ".",
1268                      live_sst_size_.load(), total_blob_size_.load());
1269     }
1270   }
1271 }
1272 
// Enforces bdb_options_.max_db_size for a pending write of `blob_size` bytes.
//
// Returns OK when no limit is configured (max_db_size == 0) or when live SST
// size + total blob size + blob_size fits within the limit. Otherwise, if
// FIFO eviction is enabled, blob files are marked obsolete one at a time until
// the write fits or no candidates remain.
//
// Returns NoSpace when FIFO is disabled, when (without `force_evict`) the SST
// files plus the new blob alone already exceed the limit, or when eviction did
// not free enough space. A failure to close an open blob file is returned
// as-is.
//
// Requires write_mutex_ to be held; takes mutex_ exclusively while evicting.
Status BlobDBImpl::CheckSizeAndEvictBlobFiles(uint64_t blob_size,
                                              bool force_evict) {
  write_mutex_.AssertHeld();

  // Snapshot of the SST size; the same value is used for every check below.
  uint64_t live_sst_size = live_sst_size_.load();
  if (bdb_options_.max_db_size == 0 ||
      live_sst_size + total_blob_size_.load() + blob_size <=
          bdb_options_.max_db_size) {
    return Status::OK();
  }

  if (bdb_options_.is_fifo == false ||
      (!force_evict && live_sst_size + blob_size > bdb_options_.max_db_size)) {
    // FIFO eviction is disabled, or no space to insert new blob even we evict
    // all blob files.
    return Status::NoSpace(
        "Write failed, as writing it would exceed max_db_size limit.");
  }

  // Candidates are consumed from the back of the sorted vector.
  // NOTE(review): presumably BlobFileComparator places the oldest file last
  // (the log message below says "oldest") -- confirm against the comparator.
  std::vector<std::shared_ptr<BlobFile>> candidate_files;
  CopyBlobFiles(&candidate_files);
  std::sort(candidate_files.begin(), candidate_files.end(),
            BlobFileComparator());
  // Sequence number recorded for this eviction round; it is passed to
  // ObsoleteBlobFile() for every file evicted below.
  fifo_eviction_seq_ = GetLatestSequenceNumber();

  WriteLock l(&mutex_);

  // Evict until the pending write fits or there is nothing left to evict.
  while (!candidate_files.empty() &&
         live_sst_size + total_blob_size_.load() + blob_size >
             bdb_options_.max_db_size) {
    std::shared_ptr<BlobFile> blob_file = candidate_files.back();
    candidate_files.pop_back();
    // Per-file lock, held for the rest of this iteration.
    WriteLock file_lock(&blob_file->mutex_);
    if (blob_file->Obsolete()) {
      // File already obsoleted by someone else.
      assert(blob_file->Immutable());
      continue;
    }
    // FIFO eviction can evict open blob files.
    if (!blob_file->Immutable()) {
      Status s = CloseBlobFile(blob_file);
      if (!s.ok()) {
        return s;
      }
    }
    assert(blob_file->Immutable());
    auto expiration_range = blob_file->GetExpirationRange();
    ROCKS_LOG_INFO(db_options_.info_log,
                   "Evict oldest blob file since DB out of space. Current "
                   "live SST file size: %" PRIu64 ", total blob size: %" PRIu64
                   ", max db size: %" PRIu64 ", evicted blob file #%" PRIu64
                   ".",
                   live_sst_size, total_blob_size_.load(),
                   bdb_options_.max_db_size, blob_file->BlobFileNumber());
    ObsoleteBlobFile(blob_file, fifo_eviction_seq_, true /*update_size*/);
    // Remember the lower bound of the evicted file's expiration range.
    evict_expiration_up_to_ = expiration_range.first;
    RecordTick(statistics_, BLOB_DB_FIFO_NUM_FILES_EVICTED);
    RecordTick(statistics_, BLOB_DB_FIFO_NUM_KEYS_EVICTED,
               blob_file->BlobCount());
    RecordTick(statistics_, BLOB_DB_FIFO_BYTES_EVICTED,
               blob_file->GetFileSize());
    TEST_SYNC_POINT("BlobDBImpl::EvictOldestBlobFile:Evicted");
  }
  // Eviction may have run out of candidates before enough space was freed.
  if (live_sst_size + total_blob_size_.load() + blob_size >
      bdb_options_.max_db_size) {
    return Status::NoSpace(
        "Write failed, as writing it would exceed max_db_size limit.");
  }
  return Status::OK();
}
1343 
AppendBlob(const std::shared_ptr<BlobFile> & bfile,const std::string & headerbuf,const Slice & key,const Slice & value,uint64_t expiration,std::string * index_entry)1344 Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
1345                               const std::string& headerbuf, const Slice& key,
1346                               const Slice& value, uint64_t expiration,
1347                               std::string* index_entry) {
1348   Status s;
1349   uint64_t blob_offset = 0;
1350   uint64_t key_offset = 0;
1351   {
1352     WriteLock lockbfile_w(&bfile->mutex_);
1353     std::shared_ptr<BlobLogWriter> writer;
1354     s = CheckOrCreateWriterLocked(bfile, &writer);
1355     if (!s.ok()) {
1356       return s;
1357     }
1358 
1359     // write the blob to the blob log.
1360     s = writer->EmitPhysicalRecord(headerbuf, key, value, &key_offset,
1361                                    &blob_offset);
1362   }
1363 
1364   if (!s.ok()) {
1365     ROCKS_LOG_ERROR(db_options_.info_log,
1366                     "Invalid status in AppendBlob: %s status: '%s'",
1367                     bfile->PathName().c_str(), s.ToString().c_str());
1368     return s;
1369   }
1370 
1371   uint64_t size_put = headerbuf.size() + key.size() + value.size();
1372   bfile->BlobRecordAdded(size_put);
1373   total_blob_size_ += size_put;
1374 
1375   if (expiration == kNoExpiration) {
1376     BlobIndex::EncodeBlob(index_entry, bfile->BlobFileNumber(), blob_offset,
1377                           value.size(), bdb_options_.compression);
1378   } else {
1379     BlobIndex::EncodeBlobTTL(index_entry, expiration, bfile->BlobFileNumber(),
1380                              blob_offset, value.size(),
1381                              bdb_options_.compression);
1382   }
1383 
1384   return s;
1385 }
1386 
MultiGet(const ReadOptions & read_options,const std::vector<Slice> & keys,std::vector<std::string> * values)1387 std::vector<Status> BlobDBImpl::MultiGet(
1388     const ReadOptions& read_options,
1389     const std::vector<Slice>& keys, std::vector<std::string>* values) {
1390   StopWatch multiget_sw(clock_, statistics_, BLOB_DB_MULTIGET_MICROS);
1391   RecordTick(statistics_, BLOB_DB_NUM_MULTIGET);
1392   // Get a snapshot to avoid blob file get deleted between we
1393   // fetch and index entry and reading from the file.
1394   ReadOptions ro(read_options);
1395   bool snapshot_created = SetSnapshotIfNeeded(&ro);
1396 
1397   std::vector<Status> statuses;
1398   statuses.reserve(keys.size());
1399   values->clear();
1400   values->reserve(keys.size());
1401   PinnableSlice value;
1402   for (size_t i = 0; i < keys.size(); i++) {
1403     statuses.push_back(Get(ro, DefaultColumnFamily(), keys[i], &value));
1404     values->push_back(value.ToString());
1405     value.Reset();
1406   }
1407   if (snapshot_created) {
1408     db_->ReleaseSnapshot(ro.snapshot);
1409   }
1410   return statuses;
1411 }
1412 
SetSnapshotIfNeeded(ReadOptions * read_options)1413 bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) {
1414   assert(read_options != nullptr);
1415   if (read_options->snapshot != nullptr) {
1416     return false;
1417   }
1418   read_options->snapshot = db_->GetSnapshot();
1419   return true;
1420 }
1421 
GetBlobValue(const Slice & key,const Slice & index_entry,PinnableSlice * value,uint64_t * expiration)1422 Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
1423                                 PinnableSlice* value, uint64_t* expiration) {
1424   assert(value);
1425 
1426   BlobIndex blob_index;
1427   Status s = blob_index.DecodeFrom(index_entry);
1428   if (!s.ok()) {
1429     return s;
1430   }
1431 
1432   if (blob_index.HasTTL() && blob_index.expiration() <= EpochNow()) {
1433     return Status::NotFound("Key expired");
1434   }
1435 
1436   if (expiration != nullptr) {
1437     if (blob_index.HasTTL()) {
1438       *expiration = blob_index.expiration();
1439     } else {
1440       *expiration = kNoExpiration;
1441     }
1442   }
1443 
1444   if (blob_index.IsInlined()) {
1445     // TODO(yiwu): If index_entry is a PinnableSlice, we can also pin the same
1446     // memory buffer to avoid extra copy.
1447     value->PinSelf(blob_index.value());
1448     return Status::OK();
1449   }
1450 
1451   CompressionType compression_type = kNoCompression;
1452   s = GetRawBlobFromFile(key, blob_index.file_number(), blob_index.offset(),
1453                          blob_index.size(), value, &compression_type);
1454   if (!s.ok()) {
1455     return s;
1456   }
1457 
1458   if (compression_type != kNoCompression) {
1459     s = DecompressSlice(*value, compression_type, value);
1460     if (!s.ok()) {
1461       if (debug_level_ >= 2) {
1462         ROCKS_LOG_ERROR(
1463             db_options_.info_log,
1464             "Uncompression error during blob read from file: %" PRIu64
1465             " blob_offset: %" PRIu64 " blob_size: %" PRIu64
1466             " key: %s status: '%s'",
1467             blob_index.file_number(), blob_index.offset(), blob_index.size(),
1468             key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
1469       }
1470       return s;
1471     }
1472   }
1473 
1474   return Status::OK();
1475 }
1476 
GetRawBlobFromFile(const Slice & key,uint64_t file_number,uint64_t offset,uint64_t size,PinnableSlice * value,CompressionType * compression_type)1477 Status BlobDBImpl::GetRawBlobFromFile(const Slice& key, uint64_t file_number,
1478                                       uint64_t offset, uint64_t size,
1479                                       PinnableSlice* value,
1480                                       CompressionType* compression_type) {
1481   assert(value);
1482   assert(compression_type);
1483   assert(*compression_type == kNoCompression);
1484 
1485   if (!size) {
1486     value->PinSelf("");
1487     return Status::OK();
1488   }
1489 
1490   // offset has to have certain min, as we will read CRC
1491   // later from the Blob Header, which needs to be also a
1492   // valid offset.
1493   if (offset <
1494       (BlobLogHeader::kSize + BlobLogRecord::kHeaderSize + key.size())) {
1495     if (debug_level_ >= 2) {
1496       ROCKS_LOG_ERROR(db_options_.info_log,
1497                       "Invalid blob index file_number: %" PRIu64
1498                       " blob_offset: %" PRIu64 " blob_size: %" PRIu64
1499                       " key: %s",
1500                       file_number, offset, size,
1501                       key.ToString(/* output_hex */ true).c_str());
1502     }
1503 
1504     return Status::NotFound("Invalid blob offset");
1505   }
1506 
1507   std::shared_ptr<BlobFile> blob_file;
1508 
1509   {
1510     ReadLock rl(&mutex_);
1511     auto it = blob_files_.find(file_number);
1512 
1513     // file was deleted
1514     if (it == blob_files_.end()) {
1515       return Status::NotFound("Blob Not Found as blob file missing");
1516     }
1517 
1518     blob_file = it->second;
1519   }
1520 
1521   *compression_type = blob_file->GetCompressionType();
1522 
1523   // takes locks when called
1524   std::shared_ptr<RandomAccessFileReader> reader;
1525   Status s = GetBlobFileReader(blob_file, &reader);
1526   if (!s.ok()) {
1527     return s;
1528   }
1529 
1530   assert(offset >= key.size() + sizeof(uint32_t));
1531   const uint64_t record_offset = offset - key.size() - sizeof(uint32_t);
1532   const uint64_t record_size = sizeof(uint32_t) + key.size() + size;
1533 
1534   // Allocate the buffer. This is safe in C++11
1535   std::string buf;
1536   AlignedBuf aligned_buf;
1537 
1538   // A partial blob record contain checksum, key and value.
1539   Slice blob_record;
1540 
1541   {
1542     StopWatch read_sw(clock_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
1543     if (reader->use_direct_io()) {
1544       s = reader->Read(IOOptions(), record_offset,
1545                        static_cast<size_t>(record_size), &blob_record, nullptr,
1546                        &aligned_buf);
1547     } else {
1548       buf.reserve(static_cast<size_t>(record_size));
1549       s = reader->Read(IOOptions(), record_offset,
1550                        static_cast<size_t>(record_size), &blob_record, &buf[0],
1551                        nullptr);
1552     }
1553     RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size());
1554   }
1555 
1556   if (!s.ok()) {
1557     ROCKS_LOG_DEBUG(
1558         db_options_.info_log,
1559         "Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64
1560         ", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt ", status: '%s'",
1561         file_number, offset, size, key.size(), s.ToString().c_str());
1562     return s;
1563   }
1564 
1565   if (blob_record.size() != record_size) {
1566     ROCKS_LOG_DEBUG(
1567         db_options_.info_log,
1568         "Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64
1569         ", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt
1570         ", read %" ROCKSDB_PRIszt " bytes, expected %" PRIu64 " bytes",
1571         file_number, offset, size, key.size(), blob_record.size(), record_size);
1572 
1573     return Status::Corruption("Failed to retrieve blob from blob index.");
1574   }
1575 
1576   Slice crc_slice(blob_record.data(), sizeof(uint32_t));
1577   Slice blob_value(blob_record.data() + sizeof(uint32_t) + key.size(),
1578                    static_cast<size_t>(size));
1579 
1580   uint32_t crc_exp = 0;
1581   if (!GetFixed32(&crc_slice, &crc_exp)) {
1582     ROCKS_LOG_DEBUG(
1583         db_options_.info_log,
1584         "Unable to decode CRC from blob file %" PRIu64 ", blob_offset: %" PRIu64
1585         ", blob_size: %" PRIu64 ", key size: %" ROCKSDB_PRIszt ", status: '%s'",
1586         file_number, offset, size, key.size(), s.ToString().c_str());
1587     return Status::Corruption("Unable to decode checksum.");
1588   }
1589 
1590   uint32_t crc = crc32c::Value(blob_record.data() + sizeof(uint32_t),
1591                                blob_record.size() - sizeof(uint32_t));
1592   crc = crc32c::Mask(crc);  // Adjust for storage
1593   if (crc != crc_exp) {
1594     if (debug_level_ >= 2) {
1595       ROCKS_LOG_ERROR(
1596           db_options_.info_log,
1597           "Blob crc mismatch file: %" PRIu64 " blob_offset: %" PRIu64
1598           " blob_size: %" PRIu64 " key: %s status: '%s'",
1599           file_number, offset, size,
1600           key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
1601     }
1602 
1603     return Status::Corruption("Corruption. Blob CRC mismatch");
1604   }
1605 
1606   value->PinSelf(blob_value);
1607 
1608   return Status::OK();
1609 }
1610 
Get(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * value)1611 Status BlobDBImpl::Get(const ReadOptions& read_options,
1612                        ColumnFamilyHandle* column_family, const Slice& key,
1613                        PinnableSlice* value) {
1614   return Get(read_options, column_family, key, value,
1615              static_cast<uint64_t*>(nullptr) /*expiration*/);
1616 }
1617 
Get(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * value,uint64_t * expiration)1618 Status BlobDBImpl::Get(const ReadOptions& read_options,
1619                        ColumnFamilyHandle* column_family, const Slice& key,
1620                        PinnableSlice* value, uint64_t* expiration) {
1621   StopWatch get_sw(clock_, statistics_, BLOB_DB_GET_MICROS);
1622   RecordTick(statistics_, BLOB_DB_NUM_GET);
1623   return GetImpl(read_options, column_family, key, value, expiration);
1624 }
1625 
GetImpl(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * value,uint64_t * expiration)1626 Status BlobDBImpl::GetImpl(const ReadOptions& read_options,
1627                            ColumnFamilyHandle* column_family, const Slice& key,
1628                            PinnableSlice* value, uint64_t* expiration) {
1629   if (column_family->GetID() != DefaultColumnFamily()->GetID()) {
1630     return Status::NotSupported(
1631         "Blob DB doesn't support non-default column family.");
1632   }
1633   // Get a snapshot to avoid blob file get deleted between we
1634   // fetch and index entry and reading from the file.
1635   // TODO(yiwu): For Get() retry if file not found would be a simpler strategy.
1636   ReadOptions ro(read_options);
1637   bool snapshot_created = SetSnapshotIfNeeded(&ro);
1638 
1639   PinnableSlice index_entry;
1640   Status s;
1641   bool is_blob_index = false;
1642   DBImpl::GetImplOptions get_impl_options;
1643   get_impl_options.column_family = column_family;
1644   get_impl_options.value = &index_entry;
1645   get_impl_options.is_blob_index = &is_blob_index;
1646   s = db_impl_->GetImpl(ro, key, get_impl_options);
1647   if (expiration != nullptr) {
1648     *expiration = kNoExpiration;
1649   }
1650   RecordTick(statistics_, BLOB_DB_NUM_KEYS_READ);
1651   if (s.ok()) {
1652     if (is_blob_index) {
1653       s = GetBlobValue(key, index_entry, value, expiration);
1654     } else {
1655       // The index entry is the value itself in this case.
1656       value->PinSelf(index_entry);
1657     }
1658     RecordTick(statistics_, BLOB_DB_BYTES_READ, value->size());
1659   }
1660   if (snapshot_created) {
1661     db_->ReleaseSnapshot(ro.snapshot);
1662   }
1663   return s;
1664 }
1665 
SanityCheck(bool aborted)1666 std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
1667   if (aborted) {
1668     return std::make_pair(false, -1);
1669   }
1670 
1671   ReadLock rl(&mutex_);
1672 
1673   ROCKS_LOG_INFO(db_options_.info_log, "Starting Sanity Check");
1674   ROCKS_LOG_INFO(db_options_.info_log, "Number of files %" ROCKSDB_PRIszt,
1675                  blob_files_.size());
1676   ROCKS_LOG_INFO(db_options_.info_log, "Number of open files %" ROCKSDB_PRIszt,
1677                  open_ttl_files_.size());
1678 
1679   for (const auto& blob_file : open_ttl_files_) {
1680     (void)blob_file;
1681     assert(!blob_file->Immutable());
1682   }
1683 
1684   for (const auto& pair : live_imm_non_ttl_blob_files_) {
1685     const auto& blob_file = pair.second;
1686     (void)blob_file;
1687     assert(!blob_file->HasTTL());
1688     assert(blob_file->Immutable());
1689   }
1690 
1691   uint64_t now = EpochNow();
1692 
1693   for (auto blob_file_pair : blob_files_) {
1694     auto blob_file = blob_file_pair.second;
1695     std::ostringstream buf;
1696 
1697     buf << "Blob file " << blob_file->BlobFileNumber() << ", size "
1698         << blob_file->GetFileSize() << ", blob count " << blob_file->BlobCount()
1699         << ", immutable " << blob_file->Immutable();
1700 
1701     if (blob_file->HasTTL()) {
1702       ExpirationRange expiration_range;
1703       {
1704         ReadLock file_lock(&blob_file->mutex_);
1705         expiration_range = blob_file->GetExpirationRange();
1706       }
1707       buf << ", expiration range (" << expiration_range.first << ", "
1708           << expiration_range.second << ")";
1709 
1710       if (!blob_file->Obsolete()) {
1711         buf << ", expire in " << (expiration_range.second - now) << "seconds";
1712       }
1713     }
1714     if (blob_file->Obsolete()) {
1715       buf << ", obsolete at " << blob_file->GetObsoleteSequence();
1716     }
1717     buf << ".";
1718     ROCKS_LOG_INFO(db_options_.info_log, "%s", buf.str().c_str());
1719   }
1720 
1721   // reschedule
1722   return std::make_pair(true, -1);
1723 }
1724 
CloseBlobFile(std::shared_ptr<BlobFile> bfile)1725 Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile) {
1726   TEST_SYNC_POINT("BlobDBImpl::CloseBlobFile");
1727   assert(bfile);
1728   assert(!bfile->Immutable());
1729   assert(!bfile->Obsolete());
1730 
1731   if (bfile->HasTTL() || bfile == open_non_ttl_file_) {
1732     write_mutex_.AssertHeld();
1733   }
1734 
1735   ROCKS_LOG_INFO(db_options_.info_log,
1736                  "Closing blob file %" PRIu64 ". Path: %s",
1737                  bfile->BlobFileNumber(), bfile->PathName().c_str());
1738 
1739   const SequenceNumber sequence = GetLatestSequenceNumber();
1740 
1741   const Status s = bfile->WriteFooterAndCloseLocked(sequence);
1742 
1743   if (s.ok()) {
1744     total_blob_size_ += BlobLogFooter::kSize;
1745   } else {
1746     bfile->MarkImmutable(sequence);
1747 
1748     ROCKS_LOG_ERROR(db_options_.info_log,
1749                     "Failed to close blob file %" PRIu64 "with error: %s",
1750                     bfile->BlobFileNumber(), s.ToString().c_str());
1751   }
1752 
1753   if (bfile->HasTTL()) {
1754     size_t erased __attribute__((__unused__));
1755     erased = open_ttl_files_.erase(bfile);
1756   } else {
1757     if (bfile == open_non_ttl_file_) {
1758       open_non_ttl_file_ = nullptr;
1759     }
1760 
1761     const uint64_t blob_file_number = bfile->BlobFileNumber();
1762     auto it = live_imm_non_ttl_blob_files_.lower_bound(blob_file_number);
1763     assert(it == live_imm_non_ttl_blob_files_.end() ||
1764            it->first != blob_file_number);
1765     live_imm_non_ttl_blob_files_.insert(
1766         it, std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
1767                 blob_file_number, bfile));
1768   }
1769 
1770   return s;
1771 }
1772 
CloseBlobFileIfNeeded(std::shared_ptr<BlobFile> & bfile)1773 Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile) {
1774   write_mutex_.AssertHeld();
1775 
1776   // atomic read
1777   if (bfile->GetFileSize() < bdb_options_.blob_file_size) {
1778     return Status::OK();
1779   }
1780 
1781   WriteLock lock(&mutex_);
1782   WriteLock file_lock(&bfile->mutex_);
1783 
1784   assert(!bfile->Obsolete() || bfile->Immutable());
1785   if (bfile->Immutable()) {
1786     return Status::OK();
1787   }
1788 
1789   return CloseBlobFile(bfile);
1790 }
1791 
ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,SequenceNumber obsolete_seq,bool update_size)1792 void BlobDBImpl::ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,
1793                                   SequenceNumber obsolete_seq,
1794                                   bool update_size) {
1795   assert(blob_file->Immutable());
1796   assert(!blob_file->Obsolete());
1797 
1798   // Should hold write lock of mutex_ or during DB open.
1799   blob_file->MarkObsolete(obsolete_seq);
1800   obsolete_files_.push_back(blob_file);
1801   assert(total_blob_size_.load() >= blob_file->GetFileSize());
1802   if (update_size) {
1803     total_blob_size_ -= blob_file->GetFileSize();
1804   }
1805 }
1806 
VisibleToActiveSnapshot(const std::shared_ptr<BlobFile> & bfile)1807 bool BlobDBImpl::VisibleToActiveSnapshot(
1808     const std::shared_ptr<BlobFile>& bfile) {
1809   assert(bfile->Obsolete());
1810 
1811   // We check whether the oldest snapshot is no less than the last sequence
1812   // by the time the blob file become obsolete. If so, the blob file is not
1813   // visible to all existing snapshots.
1814   //
1815   // If we keep track of the earliest sequence of the keys in the blob file,
1816   // we could instead check if there's a snapshot falls in range
1817   // [earliest_sequence, obsolete_sequence). But doing so will make the
1818   // implementation more complicated.
1819   SequenceNumber obsolete_sequence = bfile->GetObsoleteSequence();
1820   SequenceNumber oldest_snapshot = kMaxSequenceNumber;
1821   {
1822     // Need to lock DBImpl mutex before access snapshot list.
1823     InstrumentedMutexLock l(db_impl_->mutex());
1824     auto& snapshots = db_impl_->snapshots();
1825     if (!snapshots.empty()) {
1826       oldest_snapshot = snapshots.oldest()->GetSequenceNumber();
1827     }
1828   }
1829   bool visible = oldest_snapshot < obsolete_sequence;
1830   if (visible) {
1831     ROCKS_LOG_INFO(db_options_.info_log,
1832                    "Obsolete blob file %" PRIu64 " (obsolete at %" PRIu64
1833                    ") visible to oldest snapshot %" PRIu64 ".",
1834                    bfile->BlobFileNumber(), obsolete_sequence, oldest_snapshot);
1835   }
1836   return visible;
1837 }
1838 
EvictExpiredFiles(bool aborted)1839 std::pair<bool, int64_t> BlobDBImpl::EvictExpiredFiles(bool aborted) {
1840   if (aborted) {
1841     return std::make_pair(false, -1);
1842   }
1843 
1844   TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:0");
1845   TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:1");
1846 
1847   std::vector<std::shared_ptr<BlobFile>> process_files;
1848   uint64_t now = EpochNow();
1849   {
1850     ReadLock rl(&mutex_);
1851     for (auto p : blob_files_) {
1852       auto& blob_file = p.second;
1853       ReadLock file_lock(&blob_file->mutex_);
1854       if (blob_file->HasTTL() && !blob_file->Obsolete() &&
1855           blob_file->GetExpirationRange().second <= now) {
1856         process_files.push_back(blob_file);
1857       }
1858     }
1859   }
1860 
1861   TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:2");
1862   TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:3");
1863   TEST_SYNC_POINT_CALLBACK("BlobDBImpl::EvictExpiredFiles:cb", nullptr);
1864 
1865   SequenceNumber seq = GetLatestSequenceNumber();
1866   {
1867     MutexLock l(&write_mutex_);
1868     WriteLock lock(&mutex_);
1869     for (auto& blob_file : process_files) {
1870       WriteLock file_lock(&blob_file->mutex_);
1871 
1872       // Need to double check if the file is obsolete.
1873       if (blob_file->Obsolete()) {
1874         assert(blob_file->Immutable());
1875         continue;
1876       }
1877 
1878       if (!blob_file->Immutable()) {
1879         CloseBlobFile(blob_file);
1880       }
1881 
1882       assert(blob_file->Immutable());
1883 
1884       ObsoleteBlobFile(blob_file, seq, true /*update_size*/);
1885     }
1886   }
1887 
1888   return std::make_pair(true, -1);
1889 }
1890 
SyncBlobFiles()1891 Status BlobDBImpl::SyncBlobFiles() {
1892   MutexLock l(&write_mutex_);
1893 
1894   std::vector<std::shared_ptr<BlobFile>> process_files;
1895   {
1896     ReadLock rl(&mutex_);
1897     for (auto fitr : open_ttl_files_) {
1898       process_files.push_back(fitr);
1899     }
1900     if (open_non_ttl_file_ != nullptr) {
1901       process_files.push_back(open_non_ttl_file_);
1902     }
1903   }
1904 
1905   Status s;
1906   for (auto& blob_file : process_files) {
1907     s = blob_file->Fsync();
1908     if (!s.ok()) {
1909       ROCKS_LOG_ERROR(db_options_.info_log,
1910                       "Failed to sync blob file %" PRIu64 ", status: %s",
1911                       blob_file->BlobFileNumber(), s.ToString().c_str());
1912       return s;
1913     }
1914   }
1915 
1916   s = dir_ent_->Fsync();
1917   if (!s.ok()) {
1918     ROCKS_LOG_ERROR(db_options_.info_log,
1919                     "Failed to sync blob directory, status: %s",
1920                     s.ToString().c_str());
1921   }
1922   return s;
1923 }
1924 
ReclaimOpenFiles(bool aborted)1925 std::pair<bool, int64_t> BlobDBImpl::ReclaimOpenFiles(bool aborted) {
1926   if (aborted) return std::make_pair(false, -1);
1927 
1928   if (open_file_count_.load() < kOpenFilesTrigger) {
1929     return std::make_pair(true, -1);
1930   }
1931 
1932   // in the future, we should sort by last_access_
1933   // instead of closing every file
1934   ReadLock rl(&mutex_);
1935   for (auto const& ent : blob_files_) {
1936     auto bfile = ent.second;
1937     if (bfile->last_access_.load() == -1) continue;
1938 
1939     WriteLock lockbfile_w(&bfile->mutex_);
1940     CloseRandomAccessLocked(bfile);
1941   }
1942 
1943   return std::make_pair(true, -1);
1944 }
1945 
DeleteObsoleteFiles(bool aborted)1946 std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
1947   if (aborted) {
1948     return std::make_pair(false, -1);
1949   }
1950 
1951   MutexLock delete_file_lock(&delete_file_mutex_);
1952   if (disable_file_deletions_ > 0) {
1953     return std::make_pair(true, -1);
1954   }
1955 
1956   std::list<std::shared_ptr<BlobFile>> tobsolete;
1957   {
1958     WriteLock wl(&mutex_);
1959     if (obsolete_files_.empty()) {
1960       return std::make_pair(true, -1);
1961     }
1962     tobsolete.swap(obsolete_files_);
1963   }
1964 
1965   bool file_deleted = false;
1966   for (auto iter = tobsolete.begin(); iter != tobsolete.end();) {
1967     auto bfile = *iter;
1968     {
1969       ReadLock lockbfile_r(&bfile->mutex_);
1970       if (VisibleToActiveSnapshot(bfile)) {
1971         ROCKS_LOG_INFO(db_options_.info_log,
1972                        "Could not delete file due to snapshot failure %s",
1973                        bfile->PathName().c_str());
1974         ++iter;
1975         continue;
1976       }
1977     }
1978     ROCKS_LOG_INFO(db_options_.info_log,
1979                    "Will delete file due to snapshot success %s",
1980                    bfile->PathName().c_str());
1981 
1982     {
1983       WriteLock wl(&mutex_);
1984       blob_files_.erase(bfile->BlobFileNumber());
1985     }
1986 
1987     Status s = DeleteDBFile(&(db_impl_->immutable_db_options()),
1988                             bfile->PathName(), blob_dir_, true,
1989                             /*force_fg=*/false);
1990     if (!s.ok()) {
1991       ROCKS_LOG_ERROR(db_options_.info_log,
1992                       "File failed to be deleted as obsolete %s",
1993                       bfile->PathName().c_str());
1994       ++iter;
1995       continue;
1996     }
1997 
1998     file_deleted = true;
1999     ROCKS_LOG_INFO(db_options_.info_log,
2000                    "File deleted as obsolete from blob dir %s",
2001                    bfile->PathName().c_str());
2002 
2003     iter = tobsolete.erase(iter);
2004   }
2005 
2006   // directory change. Fsync
2007   if (file_deleted) {
2008     Status s = dir_ent_->Fsync();
2009     if (!s.ok()) {
2010       ROCKS_LOG_ERROR(db_options_.info_log, "Failed to sync dir %s: %s",
2011                       blob_dir_.c_str(), s.ToString().c_str());
2012     }
2013   }
2014 
2015   // put files back into obsolete if for some reason, delete failed
2016   if (!tobsolete.empty()) {
2017     WriteLock wl(&mutex_);
2018     for (auto bfile : tobsolete) {
2019       blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile));
2020       obsolete_files_.push_front(bfile);
2021     }
2022   }
2023 
2024   return std::make_pair(!aborted, -1);
2025 }
2026 
CopyBlobFiles(std::vector<std::shared_ptr<BlobFile>> * bfiles_copy)2027 void BlobDBImpl::CopyBlobFiles(
2028     std::vector<std::shared_ptr<BlobFile>>* bfiles_copy) {
2029   ReadLock rl(&mutex_);
2030   for (auto const& p : blob_files_) {
2031     bfiles_copy->push_back(p.second);
2032   }
2033 }
2034 
NewIterator(const ReadOptions & read_options)2035 Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) {
2036   auto* cfd =
2037       static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
2038           ->cfd();
2039   // Get a snapshot to avoid blob file get deleted between we
2040   // fetch and index entry and reading from the file.
2041   ManagedSnapshot* own_snapshot = nullptr;
2042   const Snapshot* snapshot = read_options.snapshot;
2043   if (snapshot == nullptr) {
2044     own_snapshot = new ManagedSnapshot(db_);
2045     snapshot = own_snapshot->snapshot();
2046   }
2047   auto* iter = db_impl_->NewIteratorImpl(
2048       read_options, cfd, snapshot->GetSequenceNumber(),
2049       nullptr /*read_callback*/, true /*expose_blob_index*/);
2050   return new BlobDBIterator(own_snapshot, iter, this, clock_, statistics_);
2051 }
2052 
DestroyBlobDB(const std::string & dbname,const Options & options,const BlobDBOptions & bdb_options)2053 Status DestroyBlobDB(const std::string& dbname, const Options& options,
2054                      const BlobDBOptions& bdb_options) {
2055   const ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
2056   Env* env = soptions.env;
2057 
2058   Status status;
2059   std::string blobdir;
2060   blobdir = (bdb_options.path_relative) ? dbname + "/" + bdb_options.blob_dir
2061                                         : bdb_options.blob_dir;
2062 
2063   std::vector<std::string> filenames;
2064   if (env->GetChildren(blobdir, &filenames).ok()) {
2065     for (const auto& f : filenames) {
2066       uint64_t number;
2067       FileType type;
2068       if (ParseFileName(f, &number, &type) && type == kBlobFile) {
2069         Status del = DeleteDBFile(&soptions, blobdir + "/" + f, blobdir, true,
2070                                   /*force_fg=*/false);
2071         if (status.ok() && !del.ok()) {
2072           status = del;
2073         }
2074       }
2075     }
2076     // TODO: What to do if we cannot delete the directory?
2077     env->DeleteDir(blobdir).PermitUncheckedError();
2078   }
2079   Status destroy = DestroyDB(dbname, options);
2080   if (status.ok() && !destroy.ok()) {
2081     status = destroy;
2082   }
2083 
2084   return status;
2085 }
2086 
2087 #ifndef NDEBUG
TEST_GetBlobValue(const Slice & key,const Slice & index_entry,PinnableSlice * value)2088 Status BlobDBImpl::TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
2089                                      PinnableSlice* value) {
2090   return GetBlobValue(key, index_entry, value);
2091 }
2092 
TEST_AddDummyBlobFile(uint64_t blob_file_number,SequenceNumber immutable_sequence)2093 void BlobDBImpl::TEST_AddDummyBlobFile(uint64_t blob_file_number,
2094                                        SequenceNumber immutable_sequence) {
2095   auto blob_file = std::make_shared<BlobFile>(this, blob_dir_, blob_file_number,
2096                                               db_options_.info_log.get());
2097   blob_file->MarkImmutable(immutable_sequence);
2098 
2099   blob_files_[blob_file_number] = blob_file;
2100   live_imm_non_ttl_blob_files_[blob_file_number] = blob_file;
2101 }
2102 
TEST_GetBlobFiles() const2103 std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
2104   ReadLock l(&mutex_);
2105   std::vector<std::shared_ptr<BlobFile>> blob_files;
2106   for (auto& p : blob_files_) {
2107     blob_files.emplace_back(p.second);
2108   }
2109   return blob_files;
2110 }
2111 
TEST_GetLiveImmNonTTLFiles() const2112 std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetLiveImmNonTTLFiles()
2113     const {
2114   ReadLock l(&mutex_);
2115   std::vector<std::shared_ptr<BlobFile>> live_imm_non_ttl_files;
2116   for (const auto& pair : live_imm_non_ttl_blob_files_) {
2117     live_imm_non_ttl_files.emplace_back(pair.second);
2118   }
2119   return live_imm_non_ttl_files;
2120 }
2121 
TEST_GetObsoleteFiles() const2122 std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetObsoleteFiles()
2123     const {
2124   ReadLock l(&mutex_);
2125   std::vector<std::shared_ptr<BlobFile>> obsolete_files;
2126   for (auto& bfile : obsolete_files_) {
2127     obsolete_files.emplace_back(bfile);
2128   }
2129   return obsolete_files;
2130 }
2131 
TEST_DeleteObsoleteFiles()2132 void BlobDBImpl::TEST_DeleteObsoleteFiles() {
2133   DeleteObsoleteFiles(false /*abort*/);
2134 }
2135 
TEST_CloseBlobFile(std::shared_ptr<BlobFile> & bfile)2136 Status BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile) {
2137   MutexLock l(&write_mutex_);
2138   WriteLock lock(&mutex_);
2139   WriteLock file_lock(&bfile->mutex_);
2140 
2141   return CloseBlobFile(bfile);
2142 }
2143 
TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile> & blob_file,SequenceNumber obsolete_seq,bool update_size)2144 void BlobDBImpl::TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile>& blob_file,
2145                                        SequenceNumber obsolete_seq,
2146                                        bool update_size) {
2147   return ObsoleteBlobFile(blob_file, obsolete_seq, update_size);
2148 }
2149 
TEST_EvictExpiredFiles()2150 void BlobDBImpl::TEST_EvictExpiredFiles() {
2151   EvictExpiredFiles(false /*abort*/);
2152 }
2153 
TEST_live_sst_size()2154 uint64_t BlobDBImpl::TEST_live_sst_size() { return live_sst_size_.load(); }
2155 
TEST_InitializeBlobFileToSstMapping(const std::vector<LiveFileMetaData> & live_files)2156 void BlobDBImpl::TEST_InitializeBlobFileToSstMapping(
2157     const std::vector<LiveFileMetaData>& live_files) {
2158   InitializeBlobFileToSstMapping(live_files);
2159 }
2160 
TEST_ProcessFlushJobInfo(const FlushJobInfo & info)2161 void BlobDBImpl::TEST_ProcessFlushJobInfo(const FlushJobInfo& info) {
2162   ProcessFlushJobInfo(info);
2163 }
2164 
TEST_ProcessCompactionJobInfo(const CompactionJobInfo & info)2165 void BlobDBImpl::TEST_ProcessCompactionJobInfo(const CompactionJobInfo& info) {
2166   ProcessCompactionJobInfo(info);
2167 }
2168 
2169 #endif  //  !NDEBUG
2170 
2171 }  // namespace blob_db
2172 }  // namespace ROCKSDB_NAMESPACE
2173 #endif  // ROCKSDB_LITE
2174