1
2 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
3 // This source code is licensed under both the GPLv2 (found in the
4 // COPYING file in the root directory) and Apache 2.0 License
5 // (found in the LICENSE.Apache file in the root directory).
6 #ifndef ROCKSDB_LITE
7
8 #include "utilities/blob_db/blob_db_impl.h"
9 #include <algorithm>
10 #include <cinttypes>
11 #include <iomanip>
12 #include <memory>
13 #include <sstream>
14
15 #include "db/blob/blob_index.h"
16 #include "db/db_impl/db_impl.h"
17 #include "db/write_batch_internal.h"
18 #include "file/file_util.h"
19 #include "file/filename.h"
20 #include "file/random_access_file_reader.h"
21 #include "file/sst_file_manager_impl.h"
22 #include "file/writable_file_writer.h"
23 #include "logging/logging.h"
24 #include "monitoring/instrumented_mutex.h"
25 #include "monitoring/statistics.h"
26 #include "rocksdb/convenience.h"
27 #include "rocksdb/env.h"
28 #include "rocksdb/iterator.h"
29 #include "rocksdb/utilities/stackable_db.h"
30 #include "rocksdb/utilities/transaction.h"
31 #include "table/block_based/block.h"
32 #include "table/block_based/block_based_table_builder.h"
33 #include "table/block_based/block_builder.h"
34 #include "table/meta_blocks.h"
35 #include "test_util/sync_point.h"
36 #include "util/cast_util.h"
37 #include "util/crc32c.h"
38 #include "util/mutexlock.h"
39 #include "util/random.h"
40 #include "util/stop_watch.h"
41 #include "util/timer_queue.h"
42 #include "utilities/blob_db/blob_compaction_filter.h"
43 #include "utilities/blob_db/blob_db_iterator.h"
44 #include "utilities/blob_db/blob_db_listener.h"
45
namespace {
// File-local table format version value.
// NOTE(review): not referenced in the portion of this file reviewed here, and
// declared non-const despite the k-prefix naming; confirm whether it is still
// used further down before removing it or making it constexpr.
int kBlockBasedTableVersionFormat = 2;
}  // end namespace
49
50 namespace ROCKSDB_NAMESPACE {
51 namespace blob_db {
52
operator ()(const std::shared_ptr<BlobFile> & lhs,const std::shared_ptr<BlobFile> & rhs) const53 bool BlobFileComparator::operator()(
54 const std::shared_ptr<BlobFile>& lhs,
55 const std::shared_ptr<BlobFile>& rhs) const {
56 return lhs->BlobFileNumber() > rhs->BlobFileNumber();
57 }
58
operator ()(const std::shared_ptr<BlobFile> & lhs,const std::shared_ptr<BlobFile> & rhs) const59 bool BlobFileComparatorTTL::operator()(
60 const std::shared_ptr<BlobFile>& lhs,
61 const std::shared_ptr<BlobFile>& rhs) const {
62 assert(lhs->HasTTL() && rhs->HasTTL());
63 if (lhs->expiration_range_.first < rhs->expiration_range_.first) {
64 return true;
65 }
66 if (lhs->expiration_range_.first > rhs->expiration_range_.first) {
67 return false;
68 }
69 return lhs->BlobFileNumber() < rhs->BlobFileNumber();
70 }
71
BlobDBImpl(const std::string & dbname,const BlobDBOptions & blob_db_options,const DBOptions & db_options,const ColumnFamilyOptions & cf_options)72 BlobDBImpl::BlobDBImpl(const std::string& dbname,
73 const BlobDBOptions& blob_db_options,
74 const DBOptions& db_options,
75 const ColumnFamilyOptions& cf_options)
76 : BlobDB(),
77 dbname_(dbname),
78 db_impl_(nullptr),
79 env_(db_options.env),
80 bdb_options_(blob_db_options),
81 db_options_(db_options),
82 cf_options_(cf_options),
83 file_options_(db_options),
84 statistics_(db_options_.statistics.get()),
85 next_file_number_(1),
86 flush_sequence_(0),
87 closed_(true),
88 open_file_count_(0),
89 total_blob_size_(0),
90 live_sst_size_(0),
91 fifo_eviction_seq_(0),
92 evict_expiration_up_to_(0),
93 debug_level_(0) {
94 clock_ = env_->GetSystemClock().get();
95 blob_dir_ = (bdb_options_.path_relative)
96 ? dbname + "/" + bdb_options_.blob_dir
97 : bdb_options_.blob_dir;
98 file_options_.bytes_per_sync = blob_db_options.bytes_per_sync;
99 }
100
~BlobDBImpl()101 BlobDBImpl::~BlobDBImpl() {
102 tqueue_.shutdown();
103 // CancelAllBackgroundWork(db_, true);
104 Status s __attribute__((__unused__)) = Close();
105 assert(s.ok());
106 }
107
Close()108 Status BlobDBImpl::Close() {
109 if (closed_) {
110 return Status::OK();
111 }
112 closed_ = true;
113
114 // Close base DB before BlobDBImpl destructs to stop event listener and
115 // compaction filter call.
116 Status s = db_->Close();
117 // delete db_ anyway even if close failed.
118 delete db_;
119 // Reset pointers to avoid StackableDB delete the pointer again.
120 db_ = nullptr;
121 db_impl_ = nullptr;
122 if (!s.ok()) {
123 return s;
124 }
125
126 s = SyncBlobFiles();
127 return s;
128 }
129
GetBlobDBOptions() const130 BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; }
131
// Opens BlobDB on top of a freshly opened base DB. In order, it:
//  - validates the blob directory and the GC cutoff option;
//  - creates an info log if none was supplied;
//  - opens the blob directory and loads the existing blob files;
//  - installs BlobDB's event listener and compaction filter factory (the GC
//    variants when garbage collection is enabled);
//  - opens the base DB with only the default column family;
//  - rejects a blob directory that coincides with one of the default CF's
//    SST directories;
//  - when GC is enabled, rebuilds the SST <-> blob file mapping, marks
//    unreferenced blob files obsolete and re-enables auto compactions if the
//    user had them on;
//  - schedules cleanup of trash files and starts background tasks.
//
// @param handles Output; receives the column family handles from DB::Open.
//                Must not be null.
// @return OK on success, otherwise the first error encountered. closed_ is
//         only cleared on the success path. On some failure paths db_ has
//         already been opened.
Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
  assert(handles != nullptr);
  assert(db_ == nullptr);

  if (blob_dir_.empty()) {
    return Status::NotSupported("No blob directory in options");
  }

  if (bdb_options_.garbage_collection_cutoff < 0.0 ||
      bdb_options_.garbage_collection_cutoff > 1.0) {
    return Status::InvalidArgument(
        "Garbage collection cutoff must be in the interval [0.0, 1.0]");
  }

  // Temporarily disable compactions in the base DB during open; save the user
  // defined value beforehand so we can restore it once BlobDB is initialized.
  // Note: this is only needed if garbage collection is enabled.
  const bool disable_auto_compactions = cf_options_.disable_auto_compactions;

  if (bdb_options_.enable_garbage_collection) {
    cf_options_.disable_auto_compactions = true;
  }

  Status s;

  // Create info log.
  if (db_options_.info_log == nullptr) {
    s = CreateLoggerFromOptions(dbname_, db_options_, &db_options_.info_log);
    if (!s.ok()) {
      return s;
    }
  }

  ROCKS_LOG_INFO(db_options_.info_log, "Opening BlobDB...");

  // A user compaction filter (or factory) is handed to BlobDB's own
  // compaction filter factory below, which receives cf_options_ before
  // compaction_filter is reset.
  if ((cf_options_.compaction_filter != nullptr ||
       cf_options_.compaction_filter_factory != nullptr)) {
    ROCKS_LOG_INFO(db_options_.info_log,
                   "BlobDB only support compaction filter on non-TTL values.");
  }

  // Open blob directory. A CreateDirIfMissing failure is only logged; the
  // NewDirectory call right after it returns the error if the directory
  // cannot be opened.
  s = env_->CreateDirIfMissing(blob_dir_);
  if (!s.ok()) {
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Failed to create blob_dir %s, status: %s",
                    blob_dir_.c_str(), s.ToString().c_str());
  }
  s = env_->NewDirectory(blob_dir_, &dir_ent_);
  if (!s.ok()) {
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Failed to open blob_dir %s, status: %s", blob_dir_.c_str(),
                    s.ToString().c_str());
    return s;
  }

  // Open blob files.
  s = OpenAllBlobFiles();
  if (!s.ok()) {
    return s;
  }

  // Update options: the listener and compaction filter factory must be
  // installed before the base DB is opened so they see its first events.
  if (bdb_options_.enable_garbage_collection) {
    db_options_.listeners.push_back(std::make_shared<BlobDBListenerGC>(this));
    cf_options_.compaction_filter_factory =
        std::make_shared<BlobIndexCompactionFilterFactoryGC>(
            this, clock_, cf_options_, statistics_);
  } else {
    db_options_.listeners.push_back(std::make_shared<BlobDBListener>(this));
    cf_options_.compaction_filter_factory =
        std::make_shared<BlobIndexCompactionFilterFactory>(
            this, clock_, cf_options_, statistics_);
  }

  // Reset user compaction filter after building into compaction factory.
  cf_options_.compaction_filter = nullptr;

  // Open base db (default column family only).
  ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options_);
  s = DB::Open(db_options_, dbname_, {cf_descriptor}, handles, &db_);
  if (!s.ok()) {
    return s;
  }
  db_impl_ = static_cast_with_check<DBImpl>(db_->GetRootDB());

  // Sanitize the blob_dir provided. Using a directory where the
  // base DB stores its files for the default CF is not supported.
  const ColumnFamilyData* const cfd =
      static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->cfd();
  assert(cfd);

  const ImmutableCFOptions* const ioptions = cfd->ioptions();
  assert(ioptions);

  assert(env_);

  for (const auto& cf_path : ioptions->cf_paths) {
    bool blob_dir_same_as_cf_dir = false;
    s = env_->AreFilesSame(blob_dir_, cf_path.path, &blob_dir_same_as_cf_dir);
    if (!s.ok()) {
      ROCKS_LOG_ERROR(db_options_.info_log,
                      "Error while sanitizing blob_dir %s, status: %s",
                      blob_dir_.c_str(), s.ToString().c_str());
      return s;
    }

    if (blob_dir_same_as_cf_dir) {
      return Status::NotSupported(
          "Using the base DB's storage directories for BlobDB files is not "
          "supported.");
    }
  }

  // Initialize SST file <-> oldest blob file mapping if garbage collection
  // is enabled.
  if (bdb_options_.enable_garbage_collection) {
    std::vector<LiveFileMetaData> live_files;
    db_->GetLiveFilesMetaData(&live_files);

    InitializeBlobFileToSstMapping(live_files);

    MarkUnreferencedBlobFilesObsoleteDuringOpen();

    // Restore the user's setting saved at the top of this function.
    if (!disable_auto_compactions) {
      s = db_->EnableAutoCompaction(*handles);
      if (!s.ok()) {
        ROCKS_LOG_ERROR(
            db_options_.info_log,
            "Failed to enable automatic compactions during open, status: %s",
            s.ToString().c_str());
        return s;
      }
    }
  }

  // Add trash files in blob dir to file delete scheduler.
  // NOTE(review): sfm may be null when no SstFileManager is configured;
  // presumably CleanupDirectory tolerates that — confirm.
  SstFileManagerImpl* sfm = static_cast<SstFileManagerImpl*>(
      db_impl_->immutable_db_options().sst_file_manager.get());
  DeleteScheduler::CleanupDirectory(env_, sfm, blob_dir_);

  UpdateLiveSSTSize();

  // Start background jobs.
  if (!bdb_options_.disable_background_tasks) {
    StartBackgroundTasks();
  }

  ROCKS_LOG_INFO(db_options_.info_log, "BlobDB pointer %p", this);
  bdb_options_.Dump(db_options_.info_log.get());
  closed_ = false;
  return s;
}
285
StartBackgroundTasks()286 void BlobDBImpl::StartBackgroundTasks() {
287 // store a call to a member function and object
288 tqueue_.add(
289 kReclaimOpenFilesPeriodMillisecs,
290 std::bind(&BlobDBImpl::ReclaimOpenFiles, this, std::placeholders::_1));
291 tqueue_.add(
292 kDeleteObsoleteFilesPeriodMillisecs,
293 std::bind(&BlobDBImpl::DeleteObsoleteFiles, this, std::placeholders::_1));
294 tqueue_.add(kSanityCheckPeriodMillisecs,
295 std::bind(&BlobDBImpl::SanityCheck, this, std::placeholders::_1));
296 tqueue_.add(
297 kEvictExpiredFilesPeriodMillisecs,
298 std::bind(&BlobDBImpl::EvictExpiredFiles, this, std::placeholders::_1));
299 }
300
GetAllBlobFiles(std::set<uint64_t> * file_numbers)301 Status BlobDBImpl::GetAllBlobFiles(std::set<uint64_t>* file_numbers) {
302 assert(file_numbers != nullptr);
303 std::vector<std::string> all_files;
304 Status s = env_->GetChildren(blob_dir_, &all_files);
305 if (!s.ok()) {
306 ROCKS_LOG_ERROR(db_options_.info_log,
307 "Failed to get list of blob files, status: %s",
308 s.ToString().c_str());
309 return s;
310 }
311
312 for (const auto& file_name : all_files) {
313 uint64_t file_number;
314 FileType type;
315 bool success = ParseFileName(file_name, &file_number, &type);
316 if (success && type == kBlobFile) {
317 file_numbers->insert(file_number);
318 } else {
319 ROCKS_LOG_WARN(db_options_.info_log,
320 "Skipping file in blob directory: %s", file_name.c_str());
321 }
322 }
323
324 return s;
325 }
326
OpenAllBlobFiles()327 Status BlobDBImpl::OpenAllBlobFiles() {
328 std::set<uint64_t> file_numbers;
329 Status s = GetAllBlobFiles(&file_numbers);
330 if (!s.ok()) {
331 return s;
332 }
333
334 if (!file_numbers.empty()) {
335 next_file_number_.store(*file_numbers.rbegin() + 1);
336 }
337
338 std::ostringstream blob_file_oss;
339 std::ostringstream live_imm_oss;
340 std::ostringstream obsolete_file_oss;
341
342 for (auto& file_number : file_numbers) {
343 std::shared_ptr<BlobFile> blob_file = std::make_shared<BlobFile>(
344 this, blob_dir_, file_number, db_options_.info_log.get());
345 blob_file->MarkImmutable(/* sequence */ 0);
346
347 // Read file header and footer
348 Status read_metadata_status =
349 blob_file->ReadMetadata(env_->GetFileSystem(), file_options_);
350 if (read_metadata_status.IsCorruption()) {
351 // Remove incomplete file.
352 if (!obsolete_files_.empty()) {
353 obsolete_file_oss << ", ";
354 }
355 obsolete_file_oss << file_number;
356
357 ObsoleteBlobFile(blob_file, 0 /*obsolete_seq*/, false /*update_size*/);
358 continue;
359 } else if (!read_metadata_status.ok()) {
360 ROCKS_LOG_ERROR(db_options_.info_log,
361 "Unable to read metadata of blob file %" PRIu64
362 ", status: '%s'",
363 file_number, read_metadata_status.ToString().c_str());
364 return read_metadata_status;
365 }
366
367 total_blob_size_ += blob_file->GetFileSize();
368
369 if (!blob_files_.empty()) {
370 blob_file_oss << ", ";
371 }
372 blob_file_oss << file_number;
373
374 blob_files_[file_number] = blob_file;
375
376 if (!blob_file->HasTTL()) {
377 if (!live_imm_non_ttl_blob_files_.empty()) {
378 live_imm_oss << ", ";
379 }
380 live_imm_oss << file_number;
381
382 live_imm_non_ttl_blob_files_[file_number] = blob_file;
383 }
384 }
385
386 ROCKS_LOG_INFO(db_options_.info_log,
387 "Found %" ROCKSDB_PRIszt " blob files: %s", blob_files_.size(),
388 blob_file_oss.str().c_str());
389 ROCKS_LOG_INFO(
390 db_options_.info_log, "Found %" ROCKSDB_PRIszt " non-TTL blob files: %s",
391 live_imm_non_ttl_blob_files_.size(), live_imm_oss.str().c_str());
392 ROCKS_LOG_INFO(db_options_.info_log,
393 "Found %" ROCKSDB_PRIszt
394 " incomplete or corrupted blob files: %s",
395 obsolete_files_.size(), obsolete_file_oss.str().c_str());
396 return s;
397 }
398
399 template <typename Linker>
LinkSstToBlobFileImpl(uint64_t sst_file_number,uint64_t blob_file_number,Linker linker)400 void BlobDBImpl::LinkSstToBlobFileImpl(uint64_t sst_file_number,
401 uint64_t blob_file_number,
402 Linker linker) {
403 assert(bdb_options_.enable_garbage_collection);
404 assert(blob_file_number != kInvalidBlobFileNumber);
405
406 auto it = blob_files_.find(blob_file_number);
407 if (it == blob_files_.end()) {
408 ROCKS_LOG_WARN(db_options_.info_log,
409 "Blob file %" PRIu64
410 " not found while trying to link "
411 "SST file %" PRIu64,
412 blob_file_number, sst_file_number);
413 return;
414 }
415
416 BlobFile* const blob_file = it->second.get();
417 assert(blob_file);
418
419 linker(blob_file, sst_file_number);
420
421 ROCKS_LOG_INFO(db_options_.info_log,
422 "Blob file %" PRIu64 " linked to SST file %" PRIu64,
423 blob_file_number, sst_file_number);
424 }
425
LinkSstToBlobFile(uint64_t sst_file_number,uint64_t blob_file_number)426 void BlobDBImpl::LinkSstToBlobFile(uint64_t sst_file_number,
427 uint64_t blob_file_number) {
428 auto linker = [](BlobFile* blob_file, uint64_t sst_file) {
429 WriteLock file_lock(&blob_file->mutex_);
430 blob_file->LinkSstFile(sst_file);
431 };
432
433 LinkSstToBlobFileImpl(sst_file_number, blob_file_number, linker);
434 }
435
LinkSstToBlobFileNoLock(uint64_t sst_file_number,uint64_t blob_file_number)436 void BlobDBImpl::LinkSstToBlobFileNoLock(uint64_t sst_file_number,
437 uint64_t blob_file_number) {
438 auto linker = [](BlobFile* blob_file, uint64_t sst_file) {
439 blob_file->LinkSstFile(sst_file);
440 };
441
442 LinkSstToBlobFileImpl(sst_file_number, blob_file_number, linker);
443 }
444
UnlinkSstFromBlobFile(uint64_t sst_file_number,uint64_t blob_file_number)445 void BlobDBImpl::UnlinkSstFromBlobFile(uint64_t sst_file_number,
446 uint64_t blob_file_number) {
447 assert(bdb_options_.enable_garbage_collection);
448 assert(blob_file_number != kInvalidBlobFileNumber);
449
450 auto it = blob_files_.find(blob_file_number);
451 if (it == blob_files_.end()) {
452 ROCKS_LOG_WARN(db_options_.info_log,
453 "Blob file %" PRIu64
454 " not found while trying to unlink "
455 "SST file %" PRIu64,
456 blob_file_number, sst_file_number);
457 return;
458 }
459
460 BlobFile* const blob_file = it->second.get();
461 assert(blob_file);
462
463 {
464 WriteLock file_lock(&blob_file->mutex_);
465 blob_file->UnlinkSstFile(sst_file_number);
466 }
467
468 ROCKS_LOG_INFO(db_options_.info_log,
469 "Blob file %" PRIu64 " unlinked from SST file %" PRIu64,
470 blob_file_number, sst_file_number);
471 }
472
InitializeBlobFileToSstMapping(const std::vector<LiveFileMetaData> & live_files)473 void BlobDBImpl::InitializeBlobFileToSstMapping(
474 const std::vector<LiveFileMetaData>& live_files) {
475 assert(bdb_options_.enable_garbage_collection);
476
477 for (const auto& live_file : live_files) {
478 const uint64_t sst_file_number = live_file.file_number;
479 const uint64_t blob_file_number = live_file.oldest_blob_file_number;
480
481 if (blob_file_number == kInvalidBlobFileNumber) {
482 continue;
483 }
484
485 LinkSstToBlobFileNoLock(sst_file_number, blob_file_number);
486 }
487 }
488
ProcessFlushJobInfo(const FlushJobInfo & info)489 void BlobDBImpl::ProcessFlushJobInfo(const FlushJobInfo& info) {
490 assert(bdb_options_.enable_garbage_collection);
491
492 WriteLock lock(&mutex_);
493
494 if (info.oldest_blob_file_number != kInvalidBlobFileNumber) {
495 LinkSstToBlobFile(info.file_number, info.oldest_blob_file_number);
496 }
497
498 assert(flush_sequence_ < info.largest_seqno);
499 flush_sequence_ = info.largest_seqno;
500
501 MarkUnreferencedBlobFilesObsolete();
502 }
503
ProcessCompactionJobInfo(const CompactionJobInfo & info)504 void BlobDBImpl::ProcessCompactionJobInfo(const CompactionJobInfo& info) {
505 assert(bdb_options_.enable_garbage_collection);
506
507 if (!info.status.ok()) {
508 return;
509 }
510
511 // Note: the same SST file may appear in both the input and the output
512 // file list in case of a trivial move. We walk through the two lists
513 // below in a fashion that's similar to merge sort to detect this.
514
515 auto cmp = [](const CompactionFileInfo& lhs, const CompactionFileInfo& rhs) {
516 return lhs.file_number < rhs.file_number;
517 };
518
519 auto inputs = info.input_file_infos;
520 auto iit = inputs.begin();
521 const auto iit_end = inputs.end();
522
523 std::sort(iit, iit_end, cmp);
524
525 auto outputs = info.output_file_infos;
526 auto oit = outputs.begin();
527 const auto oit_end = outputs.end();
528
529 std::sort(oit, oit_end, cmp);
530
531 WriteLock lock(&mutex_);
532
533 while (iit != iit_end && oit != oit_end) {
534 const auto& input = *iit;
535 const auto& output = *oit;
536
537 if (input.file_number == output.file_number) {
538 ++iit;
539 ++oit;
540 } else if (input.file_number < output.file_number) {
541 if (input.oldest_blob_file_number != kInvalidBlobFileNumber) {
542 UnlinkSstFromBlobFile(input.file_number, input.oldest_blob_file_number);
543 }
544
545 ++iit;
546 } else {
547 assert(output.file_number < input.file_number);
548
549 if (output.oldest_blob_file_number != kInvalidBlobFileNumber) {
550 LinkSstToBlobFile(output.file_number, output.oldest_blob_file_number);
551 }
552
553 ++oit;
554 }
555 }
556
557 while (iit != iit_end) {
558 const auto& input = *iit;
559
560 if (input.oldest_blob_file_number != kInvalidBlobFileNumber) {
561 UnlinkSstFromBlobFile(input.file_number, input.oldest_blob_file_number);
562 }
563
564 ++iit;
565 }
566
567 while (oit != oit_end) {
568 const auto& output = *oit;
569
570 if (output.oldest_blob_file_number != kInvalidBlobFileNumber) {
571 LinkSstToBlobFile(output.file_number, output.oldest_blob_file_number);
572 }
573
574 ++oit;
575 }
576
577 MarkUnreferencedBlobFilesObsolete();
578 }
579
MarkBlobFileObsoleteIfNeeded(const std::shared_ptr<BlobFile> & blob_file,SequenceNumber obsolete_seq)580 bool BlobDBImpl::MarkBlobFileObsoleteIfNeeded(
581 const std::shared_ptr<BlobFile>& blob_file, SequenceNumber obsolete_seq) {
582 assert(blob_file);
583 assert(!blob_file->HasTTL());
584 assert(blob_file->Immutable());
585 assert(bdb_options_.enable_garbage_collection);
586
587 // Note: FIFO eviction could have marked this file obsolete already.
588 if (blob_file->Obsolete()) {
589 return true;
590 }
591
592 // We cannot mark this file (or any higher-numbered files for that matter)
593 // obsolete if it is referenced by any memtables or SSTs. We keep track of
594 // the SSTs explicitly. To account for memtables, we keep track of the highest
595 // sequence number received in flush notifications, and we do not mark the
596 // blob file obsolete if there are still unflushed memtables from before
597 // the time the blob file was closed.
598 if (blob_file->GetImmutableSequence() > flush_sequence_ ||
599 !blob_file->GetLinkedSstFiles().empty()) {
600 return false;
601 }
602
603 ROCKS_LOG_INFO(db_options_.info_log,
604 "Blob file %" PRIu64 " is no longer needed, marking obsolete",
605 blob_file->BlobFileNumber());
606
607 ObsoleteBlobFile(blob_file, obsolete_seq, /* update_size */ true);
608 return true;
609 }
610
611 template <class Functor>
MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed)612 void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed) {
613 assert(bdb_options_.enable_garbage_collection);
614
615 // Iterate through all live immutable non-TTL blob files, and mark them
616 // obsolete assuming no SST files or memtables rely on the blobs in them.
617 // Note: we need to stop as soon as we find a blob file that has any
618 // linked SSTs (or one potentially referenced by memtables).
619
620 uint64_t obsoleted_files = 0;
621
622 auto it = live_imm_non_ttl_blob_files_.begin();
623 while (it != live_imm_non_ttl_blob_files_.end()) {
624 const auto& blob_file = it->second;
625 assert(blob_file);
626 assert(blob_file->BlobFileNumber() == it->first);
627 assert(!blob_file->HasTTL());
628 assert(blob_file->Immutable());
629
630 // Small optimization: Obsolete() does an atomic read, so we can do
631 // this check without taking a lock on the blob file's mutex.
632 if (blob_file->Obsolete()) {
633 it = live_imm_non_ttl_blob_files_.erase(it);
634 continue;
635 }
636
637 if (!mark_if_needed(blob_file)) {
638 break;
639 }
640
641 it = live_imm_non_ttl_blob_files_.erase(it);
642
643 ++obsoleted_files;
644 }
645
646 if (obsoleted_files > 0) {
647 ROCKS_LOG_INFO(db_options_.info_log,
648 "%" PRIu64 " blob file(s) marked obsolete by GC",
649 obsoleted_files);
650 RecordTick(statistics_, BLOB_DB_GC_NUM_FILES, obsoleted_files);
651 }
652 }
653
MarkUnreferencedBlobFilesObsolete()654 void BlobDBImpl::MarkUnreferencedBlobFilesObsolete() {
655 const SequenceNumber obsolete_seq = GetLatestSequenceNumber();
656
657 MarkUnreferencedBlobFilesObsoleteImpl(
658 [this, obsolete_seq](const std::shared_ptr<BlobFile>& blob_file) {
659 WriteLock file_lock(&blob_file->mutex_);
660 return MarkBlobFileObsoleteIfNeeded(blob_file, obsolete_seq);
661 });
662 }
663
MarkUnreferencedBlobFilesObsoleteDuringOpen()664 void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteDuringOpen() {
665 MarkUnreferencedBlobFilesObsoleteImpl(
666 [this](const std::shared_ptr<BlobFile>& blob_file) {
667 return MarkBlobFileObsoleteIfNeeded(blob_file, /* obsolete_seq */ 0);
668 });
669 }
670
CloseRandomAccessLocked(const std::shared_ptr<BlobFile> & bfile)671 void BlobDBImpl::CloseRandomAccessLocked(
672 const std::shared_ptr<BlobFile>& bfile) {
673 bfile->CloseRandomAccessLocked();
674 open_file_count_--;
675 }
676
GetBlobFileReader(const std::shared_ptr<BlobFile> & blob_file,std::shared_ptr<RandomAccessFileReader> * reader)677 Status BlobDBImpl::GetBlobFileReader(
678 const std::shared_ptr<BlobFile>& blob_file,
679 std::shared_ptr<RandomAccessFileReader>* reader) {
680 assert(reader != nullptr);
681 bool fresh_open = false;
682 Status s = blob_file->GetReader(env_, file_options_, reader, &fresh_open);
683 if (s.ok() && fresh_open) {
684 assert(*reader != nullptr);
685 open_file_count_++;
686 }
687 return s;
688 }
689
NewBlobFile(bool has_ttl,const ExpirationRange & expiration_range,const std::string & reason)690 std::shared_ptr<BlobFile> BlobDBImpl::NewBlobFile(
691 bool has_ttl, const ExpirationRange& expiration_range,
692 const std::string& reason) {
693 assert(has_ttl == (expiration_range.first || expiration_range.second));
694
695 uint64_t file_num = next_file_number_++;
696
697 const uint32_t column_family_id =
698 static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
699 auto blob_file = std::make_shared<BlobFile>(
700 this, blob_dir_, file_num, db_options_.info_log.get(), column_family_id,
701 bdb_options_.compression, has_ttl, expiration_range);
702
703 ROCKS_LOG_DEBUG(db_options_.info_log, "New blob file created: %s reason='%s'",
704 blob_file->PathName().c_str(), reason.c_str());
705 LogFlush(db_options_.info_log);
706
707 return blob_file;
708 }
709
RegisterBlobFile(std::shared_ptr<BlobFile> blob_file)710 void BlobDBImpl::RegisterBlobFile(std::shared_ptr<BlobFile> blob_file) {
711 const uint64_t blob_file_number = blob_file->BlobFileNumber();
712
713 auto it = blob_files_.lower_bound(blob_file_number);
714 assert(it == blob_files_.end() || it->first != blob_file_number);
715
716 blob_files_.insert(it,
717 std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
718 blob_file_number, std::move(blob_file)));
719 }
720
CreateWriterLocked(const std::shared_ptr<BlobFile> & bfile)721 Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
722 std::string fpath(bfile->PathName());
723 std::unique_ptr<FSWritableFile> wfile;
724 const auto& fs = env_->GetFileSystem();
725
726 Status s = fs->ReopenWritableFile(fpath, file_options_, &wfile, nullptr);
727 if (!s.ok()) {
728 ROCKS_LOG_ERROR(db_options_.info_log,
729 "Failed to open blob file for write: %s status: '%s'"
730 " exists: '%s'",
731 fpath.c_str(), s.ToString().c_str(),
732 fs->FileExists(fpath, file_options_.io_options, nullptr)
733 .ToString()
734 .c_str());
735 return s;
736 }
737
738 std::unique_ptr<WritableFileWriter> fwriter;
739 fwriter.reset(new WritableFileWriter(std::move(wfile), fpath, file_options_));
740
741 uint64_t boffset = bfile->GetFileSize();
742 if (debug_level_ >= 2 && boffset) {
743 ROCKS_LOG_DEBUG(db_options_.info_log,
744 "Open blob file: %s with offset: %" PRIu64, fpath.c_str(),
745 boffset);
746 }
747
748 BlobLogWriter::ElemType et = BlobLogWriter::kEtNone;
749 if (bfile->file_size_ == BlobLogHeader::kSize) {
750 et = BlobLogWriter::kEtFileHdr;
751 } else if (bfile->file_size_ > BlobLogHeader::kSize) {
752 et = BlobLogWriter::kEtRecord;
753 } else if (bfile->file_size_) {
754 ROCKS_LOG_WARN(db_options_.info_log,
755 "Open blob file: %s with wrong size: %" PRIu64,
756 fpath.c_str(), boffset);
757 return Status::Corruption("Invalid blob file size");
758 }
759
760 constexpr bool do_flush = true;
761
762 bfile->log_writer_ = std::make_shared<BlobLogWriter>(
763 std::move(fwriter), clock_, statistics_, bfile->file_number_,
764 db_options_.use_fsync, do_flush, boffset);
765 bfile->log_writer_->last_elem_type_ = et;
766
767 return s;
768 }
769
// Returns the open TTL blob file whose expiration range [first, second)
// contains `expiration`, or nullptr if there is none. Callers in this file
// hold mutex_ (read or write) while calling it.
//
// NOTE(review): assumes open_ttl_files_ is ordered by BlobFileComparatorTTL
// (range start, then file number) and that ranges do not overlap — confirm
// in the header. Under that ordering, a probe with start == expiration and
// the maximum file number sorts after every file whose range starts at or
// before `expiration`, so equal_range lands on the first file that starts
// after it.
std::shared_ptr<BlobFile> BlobDBImpl::FindBlobFileLocked(
    uint64_t expiration) const {
  if (open_ttl_files_.empty()) {
    return nullptr;
  }

  // Probe key; HasTTL must be set because the TTL comparator asserts it.
  std::shared_ptr<BlobFile> tmp = std::make_shared<BlobFile>();
  tmp->SetHasTTL(true);
  tmp->expiration_range_ = std::make_pair(expiration, 0);
  tmp->file_number_ = std::numeric_limits<uint64_t>::max();

  auto citr = open_ttl_files_.equal_range(tmp);
  if (citr.first == open_ttl_files_.end()) {
    assert(citr.second == open_ttl_files_.end());

    // Every file starts at or before `expiration`; only the last one (the
    // latest start) can still contain it.
    std::shared_ptr<BlobFile> check = *(open_ttl_files_.rbegin());
    return (check->expiration_range_.second <= expiration) ? nullptr : check;
  }

  // Exact match on the probe key; practically unreachable given the probe's
  // maximum file number.
  if (citr.first != citr.second) {
    return *(citr.first);
  }

  // Step back to the last file starting at or before `expiration`. If there
  // is none, finditr stays at begin(), which starts after `expiration` and is
  // rejected by b1 below.
  auto finditr = citr.second;
  if (finditr != open_ttl_files_.begin()) {
    --finditr;
  }

  bool b2 = (*finditr)->expiration_range_.second <= expiration;
  bool b1 = (*finditr)->expiration_range_.first > expiration;

  return (b1 || b2) ? nullptr : (*finditr);
}
803
CheckOrCreateWriterLocked(const std::shared_ptr<BlobFile> & blob_file,std::shared_ptr<BlobLogWriter> * writer)804 Status BlobDBImpl::CheckOrCreateWriterLocked(
805 const std::shared_ptr<BlobFile>& blob_file,
806 std::shared_ptr<BlobLogWriter>* writer) {
807 assert(writer != nullptr);
808 *writer = blob_file->GetWriter();
809 if (*writer != nullptr) {
810 return Status::OK();
811 }
812 Status s = CreateWriterLocked(blob_file);
813 if (s.ok()) {
814 *writer = blob_file->GetWriter();
815 }
816 return s;
817 }
818
CreateBlobFileAndWriter(bool has_ttl,const ExpirationRange & expiration_range,const std::string & reason,std::shared_ptr<BlobFile> * blob_file,std::shared_ptr<BlobLogWriter> * writer)819 Status BlobDBImpl::CreateBlobFileAndWriter(
820 bool has_ttl, const ExpirationRange& expiration_range,
821 const std::string& reason, std::shared_ptr<BlobFile>* blob_file,
822 std::shared_ptr<BlobLogWriter>* writer) {
823 TEST_SYNC_POINT("BlobDBImpl::CreateBlobFileAndWriter");
824 assert(has_ttl == (expiration_range.first || expiration_range.second));
825 assert(blob_file);
826 assert(writer);
827
828 *blob_file = NewBlobFile(has_ttl, expiration_range, reason);
829 assert(*blob_file);
830
831 // file not visible, hence no lock
832 Status s = CheckOrCreateWriterLocked(*blob_file, writer);
833 if (!s.ok()) {
834 ROCKS_LOG_ERROR(db_options_.info_log,
835 "Failed to get writer for blob file: %s, error: %s",
836 (*blob_file)->PathName().c_str(), s.ToString().c_str());
837 return s;
838 }
839
840 assert(*writer);
841
842 s = (*writer)->WriteHeader((*blob_file)->header_);
843 if (!s.ok()) {
844 ROCKS_LOG_ERROR(db_options_.info_log,
845 "Failed to write header to new blob file: %s"
846 " status: '%s'",
847 (*blob_file)->PathName().c_str(), s.ToString().c_str());
848 return s;
849 }
850
851 (*blob_file)->SetFileSize(BlobLogHeader::kSize);
852 total_blob_size_ += BlobLogHeader::kSize;
853
854 return s;
855 }
856
SelectBlobFile(std::shared_ptr<BlobFile> * blob_file)857 Status BlobDBImpl::SelectBlobFile(std::shared_ptr<BlobFile>* blob_file) {
858 assert(blob_file);
859
860 {
861 ReadLock rl(&mutex_);
862
863 if (open_non_ttl_file_) {
864 assert(!open_non_ttl_file_->Immutable());
865 *blob_file = open_non_ttl_file_;
866 return Status::OK();
867 }
868 }
869
870 // Check again
871 WriteLock wl(&mutex_);
872
873 if (open_non_ttl_file_) {
874 assert(!open_non_ttl_file_->Immutable());
875 *blob_file = open_non_ttl_file_;
876 return Status::OK();
877 }
878
879 std::shared_ptr<BlobLogWriter> writer;
880 const Status s = CreateBlobFileAndWriter(
881 /* has_ttl */ false, ExpirationRange(),
882 /* reason */ "SelectBlobFile", blob_file, &writer);
883 if (!s.ok()) {
884 return s;
885 }
886
887 RegisterBlobFile(*blob_file);
888 open_non_ttl_file_ = *blob_file;
889
890 return s;
891 }
892
SelectBlobFileTTL(uint64_t expiration,std::shared_ptr<BlobFile> * blob_file)893 Status BlobDBImpl::SelectBlobFileTTL(uint64_t expiration,
894 std::shared_ptr<BlobFile>* blob_file) {
895 assert(blob_file);
896 assert(expiration != kNoExpiration);
897
898 {
899 ReadLock rl(&mutex_);
900
901 *blob_file = FindBlobFileLocked(expiration);
902 if (*blob_file != nullptr) {
903 assert(!(*blob_file)->Immutable());
904 return Status::OK();
905 }
906 }
907
908 // Check again
909 WriteLock wl(&mutex_);
910
911 *blob_file = FindBlobFileLocked(expiration);
912 if (*blob_file != nullptr) {
913 assert(!(*blob_file)->Immutable());
914 return Status::OK();
915 }
916
917 const uint64_t exp_low =
918 (expiration / bdb_options_.ttl_range_secs) * bdb_options_.ttl_range_secs;
919 const uint64_t exp_high = exp_low + bdb_options_.ttl_range_secs;
920 const ExpirationRange expiration_range(exp_low, exp_high);
921
922 std::ostringstream oss;
923 oss << "SelectBlobFileTTL range: [" << exp_low << ',' << exp_high << ')';
924
925 std::shared_ptr<BlobLogWriter> writer;
926 const Status s =
927 CreateBlobFileAndWriter(/* has_ttl */ true, expiration_range,
928 /* reason */ oss.str(), blob_file, &writer);
929 if (!s.ok()) {
930 return s;
931 }
932
933 RegisterBlobFile(*blob_file);
934 open_ttl_files_.insert(*blob_file);
935
936 return s;
937 }
938
939 class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
940 private:
941 const WriteOptions& options_;
942 BlobDBImpl* blob_db_impl_;
943 uint32_t default_cf_id_;
944 WriteBatch batch_;
945
946 public:
BlobInserter(const WriteOptions & options,BlobDBImpl * blob_db_impl,uint32_t default_cf_id)947 BlobInserter(const WriteOptions& options, BlobDBImpl* blob_db_impl,
948 uint32_t default_cf_id)
949 : options_(options),
950 blob_db_impl_(blob_db_impl),
951 default_cf_id_(default_cf_id) {}
952
batch()953 WriteBatch* batch() { return &batch_; }
954
PutCF(uint32_t column_family_id,const Slice & key,const Slice & value)955 Status PutCF(uint32_t column_family_id, const Slice& key,
956 const Slice& value) override {
957 if (column_family_id != default_cf_id_) {
958 return Status::NotSupported(
959 "Blob DB doesn't support non-default column family.");
960 }
961 Status s = blob_db_impl_->PutBlobValue(options_, key, value, kNoExpiration,
962 &batch_);
963 return s;
964 }
965
DeleteCF(uint32_t column_family_id,const Slice & key)966 Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
967 if (column_family_id != default_cf_id_) {
968 return Status::NotSupported(
969 "Blob DB doesn't support non-default column family.");
970 }
971 Status s = WriteBatchInternal::Delete(&batch_, column_family_id, key);
972 return s;
973 }
974
DeleteRange(uint32_t column_family_id,const Slice & begin_key,const Slice & end_key)975 virtual Status DeleteRange(uint32_t column_family_id, const Slice& begin_key,
976 const Slice& end_key) {
977 if (column_family_id != default_cf_id_) {
978 return Status::NotSupported(
979 "Blob DB doesn't support non-default column family.");
980 }
981 Status s = WriteBatchInternal::DeleteRange(&batch_, column_family_id,
982 begin_key, end_key);
983 return s;
984 }
985
SingleDeleteCF(uint32_t,const Slice &)986 Status SingleDeleteCF(uint32_t /*column_family_id*/,
987 const Slice& /*key*/) override {
988 return Status::NotSupported("Not supported operation in blob db.");
989 }
990
MergeCF(uint32_t,const Slice &,const Slice &)991 Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
992 const Slice& /*value*/) override {
993 return Status::NotSupported("Not supported operation in blob db.");
994 }
995
LogData(const Slice & blob)996 void LogData(const Slice& blob) override { batch_.PutLogData(blob); }
997 };
998
Write(const WriteOptions & options,WriteBatch * updates)999 Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
1000 StopWatch write_sw(clock_, statistics_, BLOB_DB_WRITE_MICROS);
1001 RecordTick(statistics_, BLOB_DB_NUM_WRITE);
1002 uint32_t default_cf_id =
1003 static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
1004 ->GetID();
1005 Status s;
1006 BlobInserter blob_inserter(options, this, default_cf_id);
1007 {
1008 // Release write_mutex_ before DB write to avoid race condition with
1009 // flush begin listener, which also require write_mutex_ to sync
1010 // blob files.
1011 MutexLock l(&write_mutex_);
1012 s = updates->Iterate(&blob_inserter);
1013 }
1014 if (!s.ok()) {
1015 return s;
1016 }
1017 return db_->Write(options, blob_inserter.batch());
1018 }
1019
Put(const WriteOptions & options,const Slice & key,const Slice & value)1020 Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key,
1021 const Slice& value) {
1022 return PutUntil(options, key, value, kNoExpiration);
1023 }
1024
PutWithTTL(const WriteOptions & options,const Slice & key,const Slice & value,uint64_t ttl)1025 Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
1026 const Slice& key, const Slice& value,
1027 uint64_t ttl) {
1028 uint64_t now = EpochNow();
1029 uint64_t expiration = kNoExpiration - now > ttl ? now + ttl : kNoExpiration;
1030 return PutUntil(options, key, value, expiration);
1031 }
1032
PutUntil(const WriteOptions & options,const Slice & key,const Slice & value,uint64_t expiration)1033 Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
1034 const Slice& value, uint64_t expiration) {
1035 StopWatch write_sw(clock_, statistics_, BLOB_DB_WRITE_MICROS);
1036 RecordTick(statistics_, BLOB_DB_NUM_PUT);
1037 Status s;
1038 WriteBatch batch;
1039 {
1040 // Release write_mutex_ before DB write to avoid race condition with
1041 // flush begin listener, which also require write_mutex_ to sync
1042 // blob files.
1043 MutexLock l(&write_mutex_);
1044 s = PutBlobValue(options, key, value, expiration, &batch);
1045 }
1046 if (s.ok()) {
1047 s = db_->Write(options, &batch);
1048 }
1049 return s;
1050 }
1051
PutBlobValue(const WriteOptions &,const Slice & key,const Slice & value,uint64_t expiration,WriteBatch * batch)1052 Status BlobDBImpl::PutBlobValue(const WriteOptions& /*options*/,
1053 const Slice& key, const Slice& value,
1054 uint64_t expiration, WriteBatch* batch) {
1055 write_mutex_.AssertHeld();
1056 Status s;
1057 std::string index_entry;
1058 uint32_t column_family_id =
1059 static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
1060 ->GetID();
1061 if (value.size() < bdb_options_.min_blob_size) {
1062 if (expiration == kNoExpiration) {
1063 // Put as normal value
1064 s = batch->Put(key, value);
1065 RecordTick(statistics_, BLOB_DB_WRITE_INLINED);
1066 } else {
1067 // Inlined with TTL
1068 BlobIndex::EncodeInlinedTTL(&index_entry, expiration, value);
1069 s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
1070 index_entry);
1071 RecordTick(statistics_, BLOB_DB_WRITE_INLINED_TTL);
1072 }
1073 } else {
1074 std::string compression_output;
1075 Slice value_compressed = GetCompressedSlice(value, &compression_output);
1076
1077 std::string headerbuf;
1078 BlobLogWriter::ConstructBlobHeader(&headerbuf, key, value_compressed,
1079 expiration);
1080
1081 // Check DB size limit before selecting blob file to
1082 // Since CheckSizeAndEvictBlobFiles() can close blob files, it needs to be
1083 // done before calling SelectBlobFile().
1084 s = CheckSizeAndEvictBlobFiles(headerbuf.size() + key.size() +
1085 value_compressed.size());
1086 if (!s.ok()) {
1087 return s;
1088 }
1089
1090 std::shared_ptr<BlobFile> blob_file;
1091 if (expiration != kNoExpiration) {
1092 s = SelectBlobFileTTL(expiration, &blob_file);
1093 } else {
1094 s = SelectBlobFile(&blob_file);
1095 }
1096 if (s.ok()) {
1097 assert(blob_file != nullptr);
1098 assert(blob_file->GetCompressionType() == bdb_options_.compression);
1099 s = AppendBlob(blob_file, headerbuf, key, value_compressed, expiration,
1100 &index_entry);
1101 }
1102 if (s.ok()) {
1103 if (expiration != kNoExpiration) {
1104 WriteLock file_lock(&blob_file->mutex_);
1105 blob_file->ExtendExpirationRange(expiration);
1106 }
1107 s = CloseBlobFileIfNeeded(blob_file);
1108 }
1109 if (s.ok()) {
1110 s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
1111 index_entry);
1112 }
1113 if (s.ok()) {
1114 if (expiration == kNoExpiration) {
1115 RecordTick(statistics_, BLOB_DB_WRITE_BLOB);
1116 } else {
1117 RecordTick(statistics_, BLOB_DB_WRITE_BLOB_TTL);
1118 }
1119 } else {
1120 ROCKS_LOG_ERROR(
1121 db_options_.info_log,
1122 "Failed to append blob to FILE: %s: KEY: %s VALSZ: %" ROCKSDB_PRIszt
1123 " status: '%s' blob_file: '%s'",
1124 blob_file->PathName().c_str(), key.ToString().c_str(), value.size(),
1125 s.ToString().c_str(), blob_file->DumpState().c_str());
1126 }
1127 }
1128
1129 RecordTick(statistics_, BLOB_DB_NUM_KEYS_WRITTEN);
1130 RecordTick(statistics_, BLOB_DB_BYTES_WRITTEN, key.size() + value.size());
1131 RecordInHistogram(statistics_, BLOB_DB_KEY_SIZE, key.size());
1132 RecordInHistogram(statistics_, BLOB_DB_VALUE_SIZE, value.size());
1133
1134 return s;
1135 }
1136
// Returns `raw` compressed with bdb_options_.compression, using
// `compression_output` as the backing buffer. Returns `raw` unchanged when
// compression is disabled.
// NOTE(review): the CompressionType written back by CompressBlock() and its
// returned Slice are ignored; *compression_output is returned regardless.
// Readers decode with the blob file's configured compression type, so confirm
// CompressBlock() always fills compression_output for the configured type.
Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
                                     std::string* compression_output) const {
  const CompressionType configured = bdb_options_.compression;
  if (configured == kNoCompression) {
    return raw;
  }

  StopWatch compression_sw(clock_, statistics_, BLOB_DB_COMPRESSION_MICROS);
  CompressionType type = configured;
  CompressionOptions opts;
  CompressionContext context(type);
  CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), type,
                       0 /* sample_for_compression */);
  CompressBlock(raw, info, &type, kBlockBasedTableVersionFormat, false,
                compression_output, nullptr, nullptr);
  return Slice(*compression_output);
}
1152
DecompressSlice(const Slice & compressed_value,CompressionType compression_type,PinnableSlice * value_output) const1153 Status BlobDBImpl::DecompressSlice(const Slice& compressed_value,
1154 CompressionType compression_type,
1155 PinnableSlice* value_output) const {
1156 assert(compression_type != kNoCompression);
1157
1158 BlockContents contents;
1159 auto cfh = static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
1160
1161 {
1162 StopWatch decompression_sw(clock_, statistics_,
1163 BLOB_DB_DECOMPRESSION_MICROS);
1164 UncompressionContext context(compression_type);
1165 UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
1166 compression_type);
1167 Status s = UncompressBlockContentsForCompressionType(
1168 info, compressed_value.data(), compressed_value.size(), &contents,
1169 kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
1170 if (!s.ok()) {
1171 return Status::Corruption("Unable to decompress blob.");
1172 }
1173 }
1174
1175 value_output->PinSelf(contents.data);
1176
1177 return Status::OK();
1178 }
1179
CompactFiles(const CompactionOptions & compact_options,const std::vector<std::string> & input_file_names,const int output_level,const int output_path_id,std::vector<std::string> * const output_file_names,CompactionJobInfo * compaction_job_info)1180 Status BlobDBImpl::CompactFiles(
1181 const CompactionOptions& compact_options,
1182 const std::vector<std::string>& input_file_names, const int output_level,
1183 const int output_path_id, std::vector<std::string>* const output_file_names,
1184 CompactionJobInfo* compaction_job_info) {
1185 // Note: we need CompactionJobInfo to be able to track updates to the
1186 // blob file <-> SST mappings, so we provide one if the user hasn't,
1187 // assuming that GC is enabled.
1188 CompactionJobInfo info{};
1189 if (bdb_options_.enable_garbage_collection && !compaction_job_info) {
1190 compaction_job_info = &info;
1191 }
1192
1193 const Status s =
1194 db_->CompactFiles(compact_options, input_file_names, output_level,
1195 output_path_id, output_file_names, compaction_job_info);
1196 if (!s.ok()) {
1197 return s;
1198 }
1199
1200 if (bdb_options_.enable_garbage_collection) {
1201 assert(compaction_job_info);
1202 ProcessCompactionJobInfo(*compaction_job_info);
1203 }
1204
1205 return s;
1206 }
1207
GetCompactionContextCommon(BlobCompactionContext * context)1208 void BlobDBImpl::GetCompactionContextCommon(BlobCompactionContext* context) {
1209 assert(context);
1210
1211 context->blob_db_impl = this;
1212 context->next_file_number = next_file_number_.load();
1213 context->current_blob_files.clear();
1214 for (auto& p : blob_files_) {
1215 context->current_blob_files.insert(p.first);
1216 }
1217 context->fifo_eviction_seq = fifo_eviction_seq_;
1218 context->evict_expiration_up_to = evict_expiration_up_to_;
1219 }
1220
GetCompactionContext(BlobCompactionContext * context)1221 void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context) {
1222 assert(context);
1223
1224 ReadLock l(&mutex_);
1225 GetCompactionContextCommon(context);
1226 }
1227
GetCompactionContext(BlobCompactionContext * context,BlobCompactionContextGC * context_gc)1228 void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context,
1229 BlobCompactionContextGC* context_gc) {
1230 assert(context);
1231 assert(context_gc);
1232
1233 ReadLock l(&mutex_);
1234 GetCompactionContextCommon(context);
1235
1236 if (!live_imm_non_ttl_blob_files_.empty()) {
1237 auto it = live_imm_non_ttl_blob_files_.begin();
1238 std::advance(it, bdb_options_.garbage_collection_cutoff *
1239 live_imm_non_ttl_blob_files_.size());
1240 context_gc->cutoff_file_number = it != live_imm_non_ttl_blob_files_.end()
1241 ? it->first
1242 : std::numeric_limits<uint64_t>::max();
1243 }
1244 }
1245
UpdateLiveSSTSize()1246 void BlobDBImpl::UpdateLiveSSTSize() {
1247 uint64_t live_sst_size = 0;
1248 bool ok = GetIntProperty(DB::Properties::kLiveSstFilesSize, &live_sst_size);
1249 if (ok) {
1250 live_sst_size_.store(live_sst_size);
1251 ROCKS_LOG_INFO(db_options_.info_log,
1252 "Updated total SST file size: %" PRIu64 " bytes.",
1253 live_sst_size);
1254 } else {
1255 ROCKS_LOG_ERROR(
1256 db_options_.info_log,
1257 "Failed to update total SST file size after flush or compaction.");
1258 }
1259 {
1260 // Trigger FIFO eviction if needed.
1261 MutexLock l(&write_mutex_);
1262 Status s = CheckSizeAndEvictBlobFiles(0, true /*force*/);
1263 if (s.IsNoSpace()) {
1264 ROCKS_LOG_WARN(db_options_.info_log,
1265 "DB grow out-of-space after SST size updated. Current live"
1266 " SST size: %" PRIu64
1267 " , current blob files size: %" PRIu64 ".",
1268 live_sst_size_.load(), total_blob_size_.load());
1269 }
1270 }
1271 }
1272
CheckSizeAndEvictBlobFiles(uint64_t blob_size,bool force_evict)1273 Status BlobDBImpl::CheckSizeAndEvictBlobFiles(uint64_t blob_size,
1274 bool force_evict) {
1275 write_mutex_.AssertHeld();
1276
1277 uint64_t live_sst_size = live_sst_size_.load();
1278 if (bdb_options_.max_db_size == 0 ||
1279 live_sst_size + total_blob_size_.load() + blob_size <=
1280 bdb_options_.max_db_size) {
1281 return Status::OK();
1282 }
1283
1284 if (bdb_options_.is_fifo == false ||
1285 (!force_evict && live_sst_size + blob_size > bdb_options_.max_db_size)) {
1286 // FIFO eviction is disabled, or no space to insert new blob even we evict
1287 // all blob files.
1288 return Status::NoSpace(
1289 "Write failed, as writing it would exceed max_db_size limit.");
1290 }
1291
1292 std::vector<std::shared_ptr<BlobFile>> candidate_files;
1293 CopyBlobFiles(&candidate_files);
1294 std::sort(candidate_files.begin(), candidate_files.end(),
1295 BlobFileComparator());
1296 fifo_eviction_seq_ = GetLatestSequenceNumber();
1297
1298 WriteLock l(&mutex_);
1299
1300 while (!candidate_files.empty() &&
1301 live_sst_size + total_blob_size_.load() + blob_size >
1302 bdb_options_.max_db_size) {
1303 std::shared_ptr<BlobFile> blob_file = candidate_files.back();
1304 candidate_files.pop_back();
1305 WriteLock file_lock(&blob_file->mutex_);
1306 if (blob_file->Obsolete()) {
1307 // File already obsoleted by someone else.
1308 assert(blob_file->Immutable());
1309 continue;
1310 }
1311 // FIFO eviction can evict open blob files.
1312 if (!blob_file->Immutable()) {
1313 Status s = CloseBlobFile(blob_file);
1314 if (!s.ok()) {
1315 return s;
1316 }
1317 }
1318 assert(blob_file->Immutable());
1319 auto expiration_range = blob_file->GetExpirationRange();
1320 ROCKS_LOG_INFO(db_options_.info_log,
1321 "Evict oldest blob file since DB out of space. Current "
1322 "live SST file size: %" PRIu64 ", total blob size: %" PRIu64
1323 ", max db size: %" PRIu64 ", evicted blob file #%" PRIu64
1324 ".",
1325 live_sst_size, total_blob_size_.load(),
1326 bdb_options_.max_db_size, blob_file->BlobFileNumber());
1327 ObsoleteBlobFile(blob_file, fifo_eviction_seq_, true /*update_size*/);
1328 evict_expiration_up_to_ = expiration_range.first;
1329 RecordTick(statistics_, BLOB_DB_FIFO_NUM_FILES_EVICTED);
1330 RecordTick(statistics_, BLOB_DB_FIFO_NUM_KEYS_EVICTED,
1331 blob_file->BlobCount());
1332 RecordTick(statistics_, BLOB_DB_FIFO_BYTES_EVICTED,
1333 blob_file->GetFileSize());
1334 TEST_SYNC_POINT("BlobDBImpl::EvictOldestBlobFile:Evicted");
1335 }
1336 if (live_sst_size + total_blob_size_.load() + blob_size >
1337 bdb_options_.max_db_size) {
1338 return Status::NoSpace(
1339 "Write failed, as writing it would exceed max_db_size limit.");
1340 }
1341 return Status::OK();
1342 }
1343
AppendBlob(const std::shared_ptr<BlobFile> & bfile,const std::string & headerbuf,const Slice & key,const Slice & value,uint64_t expiration,std::string * index_entry)1344 Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
1345 const std::string& headerbuf, const Slice& key,
1346 const Slice& value, uint64_t expiration,
1347 std::string* index_entry) {
1348 Status s;
1349 uint64_t blob_offset = 0;
1350 uint64_t key_offset = 0;
1351 {
1352 WriteLock lockbfile_w(&bfile->mutex_);
1353 std::shared_ptr<BlobLogWriter> writer;
1354 s = CheckOrCreateWriterLocked(bfile, &writer);
1355 if (!s.ok()) {
1356 return s;
1357 }
1358
1359 // write the blob to the blob log.
1360 s = writer->EmitPhysicalRecord(headerbuf, key, value, &key_offset,
1361 &blob_offset);
1362 }
1363
1364 if (!s.ok()) {
1365 ROCKS_LOG_ERROR(db_options_.info_log,
1366 "Invalid status in AppendBlob: %s status: '%s'",
1367 bfile->PathName().c_str(), s.ToString().c_str());
1368 return s;
1369 }
1370
1371 uint64_t size_put = headerbuf.size() + key.size() + value.size();
1372 bfile->BlobRecordAdded(size_put);
1373 total_blob_size_ += size_put;
1374
1375 if (expiration == kNoExpiration) {
1376 BlobIndex::EncodeBlob(index_entry, bfile->BlobFileNumber(), blob_offset,
1377 value.size(), bdb_options_.compression);
1378 } else {
1379 BlobIndex::EncodeBlobTTL(index_entry, expiration, bfile->BlobFileNumber(),
1380 blob_offset, value.size(),
1381 bdb_options_.compression);
1382 }
1383
1384 return s;
1385 }
1386
MultiGet(const ReadOptions & read_options,const std::vector<Slice> & keys,std::vector<std::string> * values)1387 std::vector<Status> BlobDBImpl::MultiGet(
1388 const ReadOptions& read_options,
1389 const std::vector<Slice>& keys, std::vector<std::string>* values) {
1390 StopWatch multiget_sw(clock_, statistics_, BLOB_DB_MULTIGET_MICROS);
1391 RecordTick(statistics_, BLOB_DB_NUM_MULTIGET);
1392 // Get a snapshot to avoid blob file get deleted between we
1393 // fetch and index entry and reading from the file.
1394 ReadOptions ro(read_options);
1395 bool snapshot_created = SetSnapshotIfNeeded(&ro);
1396
1397 std::vector<Status> statuses;
1398 statuses.reserve(keys.size());
1399 values->clear();
1400 values->reserve(keys.size());
1401 PinnableSlice value;
1402 for (size_t i = 0; i < keys.size(); i++) {
1403 statuses.push_back(Get(ro, DefaultColumnFamily(), keys[i], &value));
1404 values->push_back(value.ToString());
1405 value.Reset();
1406 }
1407 if (snapshot_created) {
1408 db_->ReleaseSnapshot(ro.snapshot);
1409 }
1410 return statuses;
1411 }
1412
SetSnapshotIfNeeded(ReadOptions * read_options)1413 bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) {
1414 assert(read_options != nullptr);
1415 if (read_options->snapshot != nullptr) {
1416 return false;
1417 }
1418 read_options->snapshot = db_->GetSnapshot();
1419 return true;
1420 }
1421
GetBlobValue(const Slice & key,const Slice & index_entry,PinnableSlice * value,uint64_t * expiration)1422 Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
1423 PinnableSlice* value, uint64_t* expiration) {
1424 assert(value);
1425
1426 BlobIndex blob_index;
1427 Status s = blob_index.DecodeFrom(index_entry);
1428 if (!s.ok()) {
1429 return s;
1430 }
1431
1432 if (blob_index.HasTTL() && blob_index.expiration() <= EpochNow()) {
1433 return Status::NotFound("Key expired");
1434 }
1435
1436 if (expiration != nullptr) {
1437 if (blob_index.HasTTL()) {
1438 *expiration = blob_index.expiration();
1439 } else {
1440 *expiration = kNoExpiration;
1441 }
1442 }
1443
1444 if (blob_index.IsInlined()) {
1445 // TODO(yiwu): If index_entry is a PinnableSlice, we can also pin the same
1446 // memory buffer to avoid extra copy.
1447 value->PinSelf(blob_index.value());
1448 return Status::OK();
1449 }
1450
1451 CompressionType compression_type = kNoCompression;
1452 s = GetRawBlobFromFile(key, blob_index.file_number(), blob_index.offset(),
1453 blob_index.size(), value, &compression_type);
1454 if (!s.ok()) {
1455 return s;
1456 }
1457
1458 if (compression_type != kNoCompression) {
1459 s = DecompressSlice(*value, compression_type, value);
1460 if (!s.ok()) {
1461 if (debug_level_ >= 2) {
1462 ROCKS_LOG_ERROR(
1463 db_options_.info_log,
1464 "Uncompression error during blob read from file: %" PRIu64
1465 " blob_offset: %" PRIu64 " blob_size: %" PRIu64
1466 " key: %s status: '%s'",
1467 blob_index.file_number(), blob_index.offset(), blob_index.size(),
1468 key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
1469 }
1470 return s;
1471 }
1472 }
1473
1474 return Status::OK();
1475 }
1476
GetRawBlobFromFile(const Slice & key,uint64_t file_number,uint64_t offset,uint64_t size,PinnableSlice * value,CompressionType * compression_type)1477 Status BlobDBImpl::GetRawBlobFromFile(const Slice& key, uint64_t file_number,
1478 uint64_t offset, uint64_t size,
1479 PinnableSlice* value,
1480 CompressionType* compression_type) {
1481 assert(value);
1482 assert(compression_type);
1483 assert(*compression_type == kNoCompression);
1484
1485 if (!size) {
1486 value->PinSelf("");
1487 return Status::OK();
1488 }
1489
1490 // offset has to have certain min, as we will read CRC
1491 // later from the Blob Header, which needs to be also a
1492 // valid offset.
1493 if (offset <
1494 (BlobLogHeader::kSize + BlobLogRecord::kHeaderSize + key.size())) {
1495 if (debug_level_ >= 2) {
1496 ROCKS_LOG_ERROR(db_options_.info_log,
1497 "Invalid blob index file_number: %" PRIu64
1498 " blob_offset: %" PRIu64 " blob_size: %" PRIu64
1499 " key: %s",
1500 file_number, offset, size,
1501 key.ToString(/* output_hex */ true).c_str());
1502 }
1503
1504 return Status::NotFound("Invalid blob offset");
1505 }
1506
1507 std::shared_ptr<BlobFile> blob_file;
1508
1509 {
1510 ReadLock rl(&mutex_);
1511 auto it = blob_files_.find(file_number);
1512
1513 // file was deleted
1514 if (it == blob_files_.end()) {
1515 return Status::NotFound("Blob Not Found as blob file missing");
1516 }
1517
1518 blob_file = it->second;
1519 }
1520
1521 *compression_type = blob_file->GetCompressionType();
1522
1523 // takes locks when called
1524 std::shared_ptr<RandomAccessFileReader> reader;
1525 Status s = GetBlobFileReader(blob_file, &reader);
1526 if (!s.ok()) {
1527 return s;
1528 }
1529
1530 assert(offset >= key.size() + sizeof(uint32_t));
1531 const uint64_t record_offset = offset - key.size() - sizeof(uint32_t);
1532 const uint64_t record_size = sizeof(uint32_t) + key.size() + size;
1533
1534 // Allocate the buffer. This is safe in C++11
1535 std::string buf;
1536 AlignedBuf aligned_buf;
1537
1538 // A partial blob record contain checksum, key and value.
1539 Slice blob_record;
1540
1541 {
1542 StopWatch read_sw(clock_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
1543 if (reader->use_direct_io()) {
1544 s = reader->Read(IOOptions(), record_offset,
1545 static_cast<size_t>(record_size), &blob_record, nullptr,
1546 &aligned_buf);
1547 } else {
1548 buf.reserve(static_cast<size_t>(record_size));
1549 s = reader->Read(IOOptions(), record_offset,
1550 static_cast<size_t>(record_size), &blob_record, &buf[0],
1551 nullptr);
1552 }
1553 RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size());
1554 }
1555
1556 if (!s.ok()) {
1557 ROCKS_LOG_DEBUG(
1558 db_options_.info_log,
1559 "Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64
1560 ", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt ", status: '%s'",
1561 file_number, offset, size, key.size(), s.ToString().c_str());
1562 return s;
1563 }
1564
1565 if (blob_record.size() != record_size) {
1566 ROCKS_LOG_DEBUG(
1567 db_options_.info_log,
1568 "Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64
1569 ", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt
1570 ", read %" ROCKSDB_PRIszt " bytes, expected %" PRIu64 " bytes",
1571 file_number, offset, size, key.size(), blob_record.size(), record_size);
1572
1573 return Status::Corruption("Failed to retrieve blob from blob index.");
1574 }
1575
1576 Slice crc_slice(blob_record.data(), sizeof(uint32_t));
1577 Slice blob_value(blob_record.data() + sizeof(uint32_t) + key.size(),
1578 static_cast<size_t>(size));
1579
1580 uint32_t crc_exp = 0;
1581 if (!GetFixed32(&crc_slice, &crc_exp)) {
1582 ROCKS_LOG_DEBUG(
1583 db_options_.info_log,
1584 "Unable to decode CRC from blob file %" PRIu64 ", blob_offset: %" PRIu64
1585 ", blob_size: %" PRIu64 ", key size: %" ROCKSDB_PRIszt ", status: '%s'",
1586 file_number, offset, size, key.size(), s.ToString().c_str());
1587 return Status::Corruption("Unable to decode checksum.");
1588 }
1589
1590 uint32_t crc = crc32c::Value(blob_record.data() + sizeof(uint32_t),
1591 blob_record.size() - sizeof(uint32_t));
1592 crc = crc32c::Mask(crc); // Adjust for storage
1593 if (crc != crc_exp) {
1594 if (debug_level_ >= 2) {
1595 ROCKS_LOG_ERROR(
1596 db_options_.info_log,
1597 "Blob crc mismatch file: %" PRIu64 " blob_offset: %" PRIu64
1598 " blob_size: %" PRIu64 " key: %s status: '%s'",
1599 file_number, offset, size,
1600 key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
1601 }
1602
1603 return Status::Corruption("Corruption. Blob CRC mismatch");
1604 }
1605
1606 value->PinSelf(blob_value);
1607
1608 return Status::OK();
1609 }
1610
Get(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * value)1611 Status BlobDBImpl::Get(const ReadOptions& read_options,
1612 ColumnFamilyHandle* column_family, const Slice& key,
1613 PinnableSlice* value) {
1614 return Get(read_options, column_family, key, value,
1615 static_cast<uint64_t*>(nullptr) /*expiration*/);
1616 }
1617
Get(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * value,uint64_t * expiration)1618 Status BlobDBImpl::Get(const ReadOptions& read_options,
1619 ColumnFamilyHandle* column_family, const Slice& key,
1620 PinnableSlice* value, uint64_t* expiration) {
1621 StopWatch get_sw(clock_, statistics_, BLOB_DB_GET_MICROS);
1622 RecordTick(statistics_, BLOB_DB_NUM_GET);
1623 return GetImpl(read_options, column_family, key, value, expiration);
1624 }
1625
GetImpl(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * value,uint64_t * expiration)1626 Status BlobDBImpl::GetImpl(const ReadOptions& read_options,
1627 ColumnFamilyHandle* column_family, const Slice& key,
1628 PinnableSlice* value, uint64_t* expiration) {
1629 if (column_family->GetID() != DefaultColumnFamily()->GetID()) {
1630 return Status::NotSupported(
1631 "Blob DB doesn't support non-default column family.");
1632 }
1633 // Get a snapshot to avoid blob file get deleted between we
1634 // fetch and index entry and reading from the file.
1635 // TODO(yiwu): For Get() retry if file not found would be a simpler strategy.
1636 ReadOptions ro(read_options);
1637 bool snapshot_created = SetSnapshotIfNeeded(&ro);
1638
1639 PinnableSlice index_entry;
1640 Status s;
1641 bool is_blob_index = false;
1642 DBImpl::GetImplOptions get_impl_options;
1643 get_impl_options.column_family = column_family;
1644 get_impl_options.value = &index_entry;
1645 get_impl_options.is_blob_index = &is_blob_index;
1646 s = db_impl_->GetImpl(ro, key, get_impl_options);
1647 if (expiration != nullptr) {
1648 *expiration = kNoExpiration;
1649 }
1650 RecordTick(statistics_, BLOB_DB_NUM_KEYS_READ);
1651 if (s.ok()) {
1652 if (is_blob_index) {
1653 s = GetBlobValue(key, index_entry, value, expiration);
1654 } else {
1655 // The index entry is the value itself in this case.
1656 value->PinSelf(index_entry);
1657 }
1658 RecordTick(statistics_, BLOB_DB_BYTES_READ, value->size());
1659 }
1660 if (snapshot_created) {
1661 db_->ReleaseSnapshot(ro.snapshot);
1662 }
1663 return s;
1664 }
1665
SanityCheck(bool aborted)1666 std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
1667 if (aborted) {
1668 return std::make_pair(false, -1);
1669 }
1670
1671 ReadLock rl(&mutex_);
1672
1673 ROCKS_LOG_INFO(db_options_.info_log, "Starting Sanity Check");
1674 ROCKS_LOG_INFO(db_options_.info_log, "Number of files %" ROCKSDB_PRIszt,
1675 blob_files_.size());
1676 ROCKS_LOG_INFO(db_options_.info_log, "Number of open files %" ROCKSDB_PRIszt,
1677 open_ttl_files_.size());
1678
1679 for (const auto& blob_file : open_ttl_files_) {
1680 (void)blob_file;
1681 assert(!blob_file->Immutable());
1682 }
1683
1684 for (const auto& pair : live_imm_non_ttl_blob_files_) {
1685 const auto& blob_file = pair.second;
1686 (void)blob_file;
1687 assert(!blob_file->HasTTL());
1688 assert(blob_file->Immutable());
1689 }
1690
1691 uint64_t now = EpochNow();
1692
1693 for (auto blob_file_pair : blob_files_) {
1694 auto blob_file = blob_file_pair.second;
1695 std::ostringstream buf;
1696
1697 buf << "Blob file " << blob_file->BlobFileNumber() << ", size "
1698 << blob_file->GetFileSize() << ", blob count " << blob_file->BlobCount()
1699 << ", immutable " << blob_file->Immutable();
1700
1701 if (blob_file->HasTTL()) {
1702 ExpirationRange expiration_range;
1703 {
1704 ReadLock file_lock(&blob_file->mutex_);
1705 expiration_range = blob_file->GetExpirationRange();
1706 }
1707 buf << ", expiration range (" << expiration_range.first << ", "
1708 << expiration_range.second << ")";
1709
1710 if (!blob_file->Obsolete()) {
1711 buf << ", expire in " << (expiration_range.second - now) << "seconds";
1712 }
1713 }
1714 if (blob_file->Obsolete()) {
1715 buf << ", obsolete at " << blob_file->GetObsoleteSequence();
1716 }
1717 buf << ".";
1718 ROCKS_LOG_INFO(db_options_.info_log, "%s", buf.str().c_str());
1719 }
1720
1721 // reschedule
1722 return std::make_pair(true, -1);
1723 }
1724
CloseBlobFile(std::shared_ptr<BlobFile> bfile)1725 Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile) {
1726 TEST_SYNC_POINT("BlobDBImpl::CloseBlobFile");
1727 assert(bfile);
1728 assert(!bfile->Immutable());
1729 assert(!bfile->Obsolete());
1730
1731 if (bfile->HasTTL() || bfile == open_non_ttl_file_) {
1732 write_mutex_.AssertHeld();
1733 }
1734
1735 ROCKS_LOG_INFO(db_options_.info_log,
1736 "Closing blob file %" PRIu64 ". Path: %s",
1737 bfile->BlobFileNumber(), bfile->PathName().c_str());
1738
1739 const SequenceNumber sequence = GetLatestSequenceNumber();
1740
1741 const Status s = bfile->WriteFooterAndCloseLocked(sequence);
1742
1743 if (s.ok()) {
1744 total_blob_size_ += BlobLogFooter::kSize;
1745 } else {
1746 bfile->MarkImmutable(sequence);
1747
1748 ROCKS_LOG_ERROR(db_options_.info_log,
1749 "Failed to close blob file %" PRIu64 "with error: %s",
1750 bfile->BlobFileNumber(), s.ToString().c_str());
1751 }
1752
1753 if (bfile->HasTTL()) {
1754 size_t erased __attribute__((__unused__));
1755 erased = open_ttl_files_.erase(bfile);
1756 } else {
1757 if (bfile == open_non_ttl_file_) {
1758 open_non_ttl_file_ = nullptr;
1759 }
1760
1761 const uint64_t blob_file_number = bfile->BlobFileNumber();
1762 auto it = live_imm_non_ttl_blob_files_.lower_bound(blob_file_number);
1763 assert(it == live_imm_non_ttl_blob_files_.end() ||
1764 it->first != blob_file_number);
1765 live_imm_non_ttl_blob_files_.insert(
1766 it, std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
1767 blob_file_number, bfile));
1768 }
1769
1770 return s;
1771 }
1772
CloseBlobFileIfNeeded(std::shared_ptr<BlobFile> & bfile)1773 Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile) {
1774 write_mutex_.AssertHeld();
1775
1776 // atomic read
1777 if (bfile->GetFileSize() < bdb_options_.blob_file_size) {
1778 return Status::OK();
1779 }
1780
1781 WriteLock lock(&mutex_);
1782 WriteLock file_lock(&bfile->mutex_);
1783
1784 assert(!bfile->Obsolete() || bfile->Immutable());
1785 if (bfile->Immutable()) {
1786 return Status::OK();
1787 }
1788
1789 return CloseBlobFile(bfile);
1790 }
1791
ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,SequenceNumber obsolete_seq,bool update_size)1792 void BlobDBImpl::ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,
1793 SequenceNumber obsolete_seq,
1794 bool update_size) {
1795 assert(blob_file->Immutable());
1796 assert(!blob_file->Obsolete());
1797
1798 // Should hold write lock of mutex_ or during DB open.
1799 blob_file->MarkObsolete(obsolete_seq);
1800 obsolete_files_.push_back(blob_file);
1801 assert(total_blob_size_.load() >= blob_file->GetFileSize());
1802 if (update_size) {
1803 total_blob_size_ -= blob_file->GetFileSize();
1804 }
1805 }
1806
VisibleToActiveSnapshot(const std::shared_ptr<BlobFile> & bfile)1807 bool BlobDBImpl::VisibleToActiveSnapshot(
1808 const std::shared_ptr<BlobFile>& bfile) {
1809 assert(bfile->Obsolete());
1810
1811 // We check whether the oldest snapshot is no less than the last sequence
1812 // by the time the blob file become obsolete. If so, the blob file is not
1813 // visible to all existing snapshots.
1814 //
1815 // If we keep track of the earliest sequence of the keys in the blob file,
1816 // we could instead check if there's a snapshot falls in range
1817 // [earliest_sequence, obsolete_sequence). But doing so will make the
1818 // implementation more complicated.
1819 SequenceNumber obsolete_sequence = bfile->GetObsoleteSequence();
1820 SequenceNumber oldest_snapshot = kMaxSequenceNumber;
1821 {
1822 // Need to lock DBImpl mutex before access snapshot list.
1823 InstrumentedMutexLock l(db_impl_->mutex());
1824 auto& snapshots = db_impl_->snapshots();
1825 if (!snapshots.empty()) {
1826 oldest_snapshot = snapshots.oldest()->GetSequenceNumber();
1827 }
1828 }
1829 bool visible = oldest_snapshot < obsolete_sequence;
1830 if (visible) {
1831 ROCKS_LOG_INFO(db_options_.info_log,
1832 "Obsolete blob file %" PRIu64 " (obsolete at %" PRIu64
1833 ") visible to oldest snapshot %" PRIu64 ".",
1834 bfile->BlobFileNumber(), obsolete_sequence, oldest_snapshot);
1835 }
1836 return visible;
1837 }
1838
EvictExpiredFiles(bool aborted)1839 std::pair<bool, int64_t> BlobDBImpl::EvictExpiredFiles(bool aborted) {
1840 if (aborted) {
1841 return std::make_pair(false, -1);
1842 }
1843
1844 TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:0");
1845 TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:1");
1846
1847 std::vector<std::shared_ptr<BlobFile>> process_files;
1848 uint64_t now = EpochNow();
1849 {
1850 ReadLock rl(&mutex_);
1851 for (auto p : blob_files_) {
1852 auto& blob_file = p.second;
1853 ReadLock file_lock(&blob_file->mutex_);
1854 if (blob_file->HasTTL() && !blob_file->Obsolete() &&
1855 blob_file->GetExpirationRange().second <= now) {
1856 process_files.push_back(blob_file);
1857 }
1858 }
1859 }
1860
1861 TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:2");
1862 TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:3");
1863 TEST_SYNC_POINT_CALLBACK("BlobDBImpl::EvictExpiredFiles:cb", nullptr);
1864
1865 SequenceNumber seq = GetLatestSequenceNumber();
1866 {
1867 MutexLock l(&write_mutex_);
1868 WriteLock lock(&mutex_);
1869 for (auto& blob_file : process_files) {
1870 WriteLock file_lock(&blob_file->mutex_);
1871
1872 // Need to double check if the file is obsolete.
1873 if (blob_file->Obsolete()) {
1874 assert(blob_file->Immutable());
1875 continue;
1876 }
1877
1878 if (!blob_file->Immutable()) {
1879 CloseBlobFile(blob_file);
1880 }
1881
1882 assert(blob_file->Immutable());
1883
1884 ObsoleteBlobFile(blob_file, seq, true /*update_size*/);
1885 }
1886 }
1887
1888 return std::make_pair(true, -1);
1889 }
1890
SyncBlobFiles()1891 Status BlobDBImpl::SyncBlobFiles() {
1892 MutexLock l(&write_mutex_);
1893
1894 std::vector<std::shared_ptr<BlobFile>> process_files;
1895 {
1896 ReadLock rl(&mutex_);
1897 for (auto fitr : open_ttl_files_) {
1898 process_files.push_back(fitr);
1899 }
1900 if (open_non_ttl_file_ != nullptr) {
1901 process_files.push_back(open_non_ttl_file_);
1902 }
1903 }
1904
1905 Status s;
1906 for (auto& blob_file : process_files) {
1907 s = blob_file->Fsync();
1908 if (!s.ok()) {
1909 ROCKS_LOG_ERROR(db_options_.info_log,
1910 "Failed to sync blob file %" PRIu64 ", status: %s",
1911 blob_file->BlobFileNumber(), s.ToString().c_str());
1912 return s;
1913 }
1914 }
1915
1916 s = dir_ent_->Fsync();
1917 if (!s.ok()) {
1918 ROCKS_LOG_ERROR(db_options_.info_log,
1919 "Failed to sync blob directory, status: %s",
1920 s.ToString().c_str());
1921 }
1922 return s;
1923 }
1924
ReclaimOpenFiles(bool aborted)1925 std::pair<bool, int64_t> BlobDBImpl::ReclaimOpenFiles(bool aborted) {
1926 if (aborted) return std::make_pair(false, -1);
1927
1928 if (open_file_count_.load() < kOpenFilesTrigger) {
1929 return std::make_pair(true, -1);
1930 }
1931
1932 // in the future, we should sort by last_access_
1933 // instead of closing every file
1934 ReadLock rl(&mutex_);
1935 for (auto const& ent : blob_files_) {
1936 auto bfile = ent.second;
1937 if (bfile->last_access_.load() == -1) continue;
1938
1939 WriteLock lockbfile_w(&bfile->mutex_);
1940 CloseRandomAccessLocked(bfile);
1941 }
1942
1943 return std::make_pair(true, -1);
1944 }
1945
DeleteObsoleteFiles(bool aborted)1946 std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
1947 if (aborted) {
1948 return std::make_pair(false, -1);
1949 }
1950
1951 MutexLock delete_file_lock(&delete_file_mutex_);
1952 if (disable_file_deletions_ > 0) {
1953 return std::make_pair(true, -1);
1954 }
1955
1956 std::list<std::shared_ptr<BlobFile>> tobsolete;
1957 {
1958 WriteLock wl(&mutex_);
1959 if (obsolete_files_.empty()) {
1960 return std::make_pair(true, -1);
1961 }
1962 tobsolete.swap(obsolete_files_);
1963 }
1964
1965 bool file_deleted = false;
1966 for (auto iter = tobsolete.begin(); iter != tobsolete.end();) {
1967 auto bfile = *iter;
1968 {
1969 ReadLock lockbfile_r(&bfile->mutex_);
1970 if (VisibleToActiveSnapshot(bfile)) {
1971 ROCKS_LOG_INFO(db_options_.info_log,
1972 "Could not delete file due to snapshot failure %s",
1973 bfile->PathName().c_str());
1974 ++iter;
1975 continue;
1976 }
1977 }
1978 ROCKS_LOG_INFO(db_options_.info_log,
1979 "Will delete file due to snapshot success %s",
1980 bfile->PathName().c_str());
1981
1982 {
1983 WriteLock wl(&mutex_);
1984 blob_files_.erase(bfile->BlobFileNumber());
1985 }
1986
1987 Status s = DeleteDBFile(&(db_impl_->immutable_db_options()),
1988 bfile->PathName(), blob_dir_, true,
1989 /*force_fg=*/false);
1990 if (!s.ok()) {
1991 ROCKS_LOG_ERROR(db_options_.info_log,
1992 "File failed to be deleted as obsolete %s",
1993 bfile->PathName().c_str());
1994 ++iter;
1995 continue;
1996 }
1997
1998 file_deleted = true;
1999 ROCKS_LOG_INFO(db_options_.info_log,
2000 "File deleted as obsolete from blob dir %s",
2001 bfile->PathName().c_str());
2002
2003 iter = tobsolete.erase(iter);
2004 }
2005
2006 // directory change. Fsync
2007 if (file_deleted) {
2008 Status s = dir_ent_->Fsync();
2009 if (!s.ok()) {
2010 ROCKS_LOG_ERROR(db_options_.info_log, "Failed to sync dir %s: %s",
2011 blob_dir_.c_str(), s.ToString().c_str());
2012 }
2013 }
2014
2015 // put files back into obsolete if for some reason, delete failed
2016 if (!tobsolete.empty()) {
2017 WriteLock wl(&mutex_);
2018 for (auto bfile : tobsolete) {
2019 blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile));
2020 obsolete_files_.push_front(bfile);
2021 }
2022 }
2023
2024 return std::make_pair(!aborted, -1);
2025 }
2026
CopyBlobFiles(std::vector<std::shared_ptr<BlobFile>> * bfiles_copy)2027 void BlobDBImpl::CopyBlobFiles(
2028 std::vector<std::shared_ptr<BlobFile>>* bfiles_copy) {
2029 ReadLock rl(&mutex_);
2030 for (auto const& p : blob_files_) {
2031 bfiles_copy->push_back(p.second);
2032 }
2033 }
2034
NewIterator(const ReadOptions & read_options)2035 Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) {
2036 auto* cfd =
2037 static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
2038 ->cfd();
2039 // Get a snapshot to avoid blob file get deleted between we
2040 // fetch and index entry and reading from the file.
2041 ManagedSnapshot* own_snapshot = nullptr;
2042 const Snapshot* snapshot = read_options.snapshot;
2043 if (snapshot == nullptr) {
2044 own_snapshot = new ManagedSnapshot(db_);
2045 snapshot = own_snapshot->snapshot();
2046 }
2047 auto* iter = db_impl_->NewIteratorImpl(
2048 read_options, cfd, snapshot->GetSequenceNumber(),
2049 nullptr /*read_callback*/, true /*expose_blob_index*/);
2050 return new BlobDBIterator(own_snapshot, iter, this, clock_, statistics_);
2051 }
2052
DestroyBlobDB(const std::string & dbname,const Options & options,const BlobDBOptions & bdb_options)2053 Status DestroyBlobDB(const std::string& dbname, const Options& options,
2054 const BlobDBOptions& bdb_options) {
2055 const ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
2056 Env* env = soptions.env;
2057
2058 Status status;
2059 std::string blobdir;
2060 blobdir = (bdb_options.path_relative) ? dbname + "/" + bdb_options.blob_dir
2061 : bdb_options.blob_dir;
2062
2063 std::vector<std::string> filenames;
2064 if (env->GetChildren(blobdir, &filenames).ok()) {
2065 for (const auto& f : filenames) {
2066 uint64_t number;
2067 FileType type;
2068 if (ParseFileName(f, &number, &type) && type == kBlobFile) {
2069 Status del = DeleteDBFile(&soptions, blobdir + "/" + f, blobdir, true,
2070 /*force_fg=*/false);
2071 if (status.ok() && !del.ok()) {
2072 status = del;
2073 }
2074 }
2075 }
2076 // TODO: What to do if we cannot delete the directory?
2077 env->DeleteDir(blobdir).PermitUncheckedError();
2078 }
2079 Status destroy = DestroyDB(dbname, options);
2080 if (status.ok() && !destroy.ok()) {
2081 status = destroy;
2082 }
2083
2084 return status;
2085 }
2086
2087 #ifndef NDEBUG
TEST_GetBlobValue(const Slice & key,const Slice & index_entry,PinnableSlice * value)2088 Status BlobDBImpl::TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
2089 PinnableSlice* value) {
2090 return GetBlobValue(key, index_entry, value);
2091 }
2092
TEST_AddDummyBlobFile(uint64_t blob_file_number,SequenceNumber immutable_sequence)2093 void BlobDBImpl::TEST_AddDummyBlobFile(uint64_t blob_file_number,
2094 SequenceNumber immutable_sequence) {
2095 auto blob_file = std::make_shared<BlobFile>(this, blob_dir_, blob_file_number,
2096 db_options_.info_log.get());
2097 blob_file->MarkImmutable(immutable_sequence);
2098
2099 blob_files_[blob_file_number] = blob_file;
2100 live_imm_non_ttl_blob_files_[blob_file_number] = blob_file;
2101 }
2102
TEST_GetBlobFiles() const2103 std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
2104 ReadLock l(&mutex_);
2105 std::vector<std::shared_ptr<BlobFile>> blob_files;
2106 for (auto& p : blob_files_) {
2107 blob_files.emplace_back(p.second);
2108 }
2109 return blob_files;
2110 }
2111
TEST_GetLiveImmNonTTLFiles() const2112 std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetLiveImmNonTTLFiles()
2113 const {
2114 ReadLock l(&mutex_);
2115 std::vector<std::shared_ptr<BlobFile>> live_imm_non_ttl_files;
2116 for (const auto& pair : live_imm_non_ttl_blob_files_) {
2117 live_imm_non_ttl_files.emplace_back(pair.second);
2118 }
2119 return live_imm_non_ttl_files;
2120 }
2121
TEST_GetObsoleteFiles() const2122 std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetObsoleteFiles()
2123 const {
2124 ReadLock l(&mutex_);
2125 std::vector<std::shared_ptr<BlobFile>> obsolete_files;
2126 for (auto& bfile : obsolete_files_) {
2127 obsolete_files.emplace_back(bfile);
2128 }
2129 return obsolete_files;
2130 }
2131
TEST_DeleteObsoleteFiles()2132 void BlobDBImpl::TEST_DeleteObsoleteFiles() {
2133 DeleteObsoleteFiles(false /*abort*/);
2134 }
2135
TEST_CloseBlobFile(std::shared_ptr<BlobFile> & bfile)2136 Status BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile) {
2137 MutexLock l(&write_mutex_);
2138 WriteLock lock(&mutex_);
2139 WriteLock file_lock(&bfile->mutex_);
2140
2141 return CloseBlobFile(bfile);
2142 }
2143
TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile> & blob_file,SequenceNumber obsolete_seq,bool update_size)2144 void BlobDBImpl::TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile>& blob_file,
2145 SequenceNumber obsolete_seq,
2146 bool update_size) {
2147 return ObsoleteBlobFile(blob_file, obsolete_seq, update_size);
2148 }
2149
TEST_EvictExpiredFiles()2150 void BlobDBImpl::TEST_EvictExpiredFiles() {
2151 EvictExpiredFiles(false /*abort*/);
2152 }
2153
TEST_live_sst_size()2154 uint64_t BlobDBImpl::TEST_live_sst_size() { return live_sst_size_.load(); }
2155
TEST_InitializeBlobFileToSstMapping(const std::vector<LiveFileMetaData> & live_files)2156 void BlobDBImpl::TEST_InitializeBlobFileToSstMapping(
2157 const std::vector<LiveFileMetaData>& live_files) {
2158 InitializeBlobFileToSstMapping(live_files);
2159 }
2160
TEST_ProcessFlushJobInfo(const FlushJobInfo & info)2161 void BlobDBImpl::TEST_ProcessFlushJobInfo(const FlushJobInfo& info) {
2162 ProcessFlushJobInfo(info);
2163 }
2164
TEST_ProcessCompactionJobInfo(const CompactionJobInfo & info)2165 void BlobDBImpl::TEST_ProcessCompactionJobInfo(const CompactionJobInfo& info) {
2166 ProcessCompactionJobInfo(info);
2167 }
2168
2169 #endif // !NDEBUG
2170
2171 } // namespace blob_db
2172 } // namespace ROCKSDB_NAMESPACE
2173 #endif // ROCKSDB_LITE
2174