1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include "db/db_impl/db_impl.h"
10
11 #include <stdint.h>
12 #ifdef OS_SOLARIS
13 #include <alloca.h>
14 #endif
15
16 #include <algorithm>
17 #include <cinttypes>
18 #include <cstdio>
19 #include <map>
20 #include <set>
21 #include <sstream>
22 #include <stdexcept>
23 #include <string>
24 #include <unordered_map>
25 #include <utility>
26 #include <vector>
27
28 #include "db/arena_wrapped_db_iter.h"
29 #include "db/builder.h"
30 #include "db/compaction/compaction_job.h"
31 #include "db/db_info_dumper.h"
32 #include "db/db_iter.h"
33 #include "db/dbformat.h"
34 #include "db/error_handler.h"
35 #include "db/event_helpers.h"
36 #include "db/external_sst_file_ingestion_job.h"
37 #include "db/flush_job.h"
38 #include "db/forward_iterator.h"
39 #include "db/import_column_family_job.h"
40 #include "db/job_context.h"
41 #include "db/log_reader.h"
42 #include "db/log_writer.h"
43 #include "db/malloc_stats.h"
44 #include "db/memtable.h"
45 #include "db/memtable_list.h"
46 #include "db/merge_context.h"
47 #include "db/merge_helper.h"
48 #include "db/periodic_work_scheduler.h"
49 #include "db/range_tombstone_fragmenter.h"
50 #include "db/table_cache.h"
51 #include "db/table_properties_collector.h"
52 #include "db/transaction_log_impl.h"
53 #include "db/version_set.h"
54 #include "db/write_batch_internal.h"
55 #include "db/write_callback.h"
56 #include "file/file_util.h"
57 #include "file/filename.h"
58 #include "file/random_access_file_reader.h"
59 #include "file/sst_file_manager_impl.h"
60 #include "logging/auto_roll_logger.h"
61 #include "logging/log_buffer.h"
62 #include "logging/logging.h"
63 #include "memtable/hash_linklist_rep.h"
64 #include "memtable/hash_skiplist_rep.h"
65 #include "monitoring/in_memory_stats_history.h"
66 #include "monitoring/iostats_context_imp.h"
67 #include "monitoring/perf_context_imp.h"
68 #include "monitoring/persistent_stats_history.h"
69 #include "monitoring/thread_status_updater.h"
70 #include "monitoring/thread_status_util.h"
71 #include "options/cf_options.h"
72 #include "options/options_helper.h"
73 #include "options/options_parser.h"
74 #include "port/port.h"
75 #include "rocksdb/cache.h"
76 #include "rocksdb/compaction_filter.h"
77 #include "rocksdb/convenience.h"
78 #include "rocksdb/db.h"
79 #include "rocksdb/env.h"
80 #include "rocksdb/merge_operator.h"
81 #include "rocksdb/statistics.h"
82 #include "rocksdb/stats_history.h"
83 #include "rocksdb/status.h"
84 #include "rocksdb/table.h"
85 #include "rocksdb/version.h"
86 #include "rocksdb/write_buffer_manager.h"
87 #include "table/block_based/block.h"
88 #include "table/block_based/block_based_table_factory.h"
89 #include "table/get_context.h"
90 #include "table/merging_iterator.h"
91 #include "table/multiget_context.h"
92 #include "table/sst_file_dumper.h"
93 #include "table/table_builder.h"
94 #include "table/two_level_iterator.h"
95 #include "test_util/sync_point.h"
96 #include "util/autovector.h"
97 #include "util/cast_util.h"
98 #include "util/coding.h"
99 #include "util/compression.h"
100 #include "util/crc32c.h"
101 #include "util/mutexlock.h"
102 #include "util/stop_watch.h"
103 #include "util/string_util.h"
104
105 namespace ROCKSDB_NAMESPACE {
106
// Name of the default column family.
const std::string kDefaultColumnFamilyName("default");
// Name of the internal column family that holds persisted statistics
// history. NOTE(review): presumably the CF behind persist_stats_cf_handle_,
// which PersistStats() writes to when persist_stats_to_disk is set.
const std::string kPersistentStatsColumnFamilyName(
    "___rocksdb_stats_history___");
// Writes the RocksDB build version to `log` (declared here; defined
// elsewhere). Called from the DBImpl constructor with the info log.
void DumpRocksDBBuildVersion(Logger* log);
111
GetCompressionFlush(const ImmutableCFOptions & ioptions,const MutableCFOptions & mutable_cf_options)112 CompressionType GetCompressionFlush(
113 const ImmutableCFOptions& ioptions,
114 const MutableCFOptions& mutable_cf_options) {
115 // Compressing memtable flushes might not help unless the sequential load
116 // optimization is used for leveled compaction. Otherwise the CPU and
117 // latency overhead is not offset by saving much space.
118 if (ioptions.compaction_style == kCompactionStyleUniversal) {
119 if (mutable_cf_options.compaction_options_universal
120 .compression_size_percent < 0) {
121 return mutable_cf_options.compression;
122 } else {
123 return kNoCompression;
124 }
125 } else if (!ioptions.compression_per_level.empty()) {
126 // For leveled compress when min_level_to_compress != 0.
127 return ioptions.compression_per_level[0];
128 } else {
129 return mutable_cf_options.compression;
130 }
131 }
132
133 namespace {
DumpSupportInfo(Logger * logger)134 void DumpSupportInfo(Logger* logger) {
135 ROCKS_LOG_HEADER(logger, "Compression algorithms supported:");
136 for (auto& compression : OptionsHelper::compression_type_string_map) {
137 if (compression.second != kNoCompression &&
138 compression.second != kDisableCompressionOption) {
139 ROCKS_LOG_HEADER(logger, "\t%s supported: %d", compression.first.c_str(),
140 CompressionTypeSupported(compression.second));
141 }
142 }
143 ROCKS_LOG_HEADER(logger, "Fast CRC32 supported: %s",
144 crc32c::IsFastCrc32Supported().c_str());
145 }
146 } // namespace
147
// Constructs the DBImpl object for the database at `dbname`.
//
// `options` is first passed through SanitizeOptions() (with `read_only`);
// the sanitized copy seeds both the immutable and mutable DB options. The
// body then creates the table cache and VersionSet, assigns a DB session id,
// and dumps build version, file summary, options and compression/CRC support
// info to the info log. Recovery/opening is not performed here.
//
// seq_per_batch: forwarded to the WAL manager and also used to derive
//   last_seq_same_as_publish_seq_ and use_custom_gc_.
// batch_per_txn: must be true unless seq_per_batch is true (asserted).
// read_only: forwarded to SanitizeOptions().
DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
               const bool seq_per_batch, const bool batch_per_txn,
               bool read_only)
    : dbname_(dbname),
      // Decided from the caller's unsanitized options: the DB owns the info
      // log only if the caller did not supply one. NOTE(review): presumably
      // SanitizeOptions() creates a logger in that case.
      own_info_log_(options.info_log == nullptr),
      initial_db_options_(SanitizeOptions(dbname, options, read_only)),
      env_(initial_db_options_.env),
      io_tracer_(std::make_shared<IOTracer>()),
      immutable_db_options_(initial_db_options_),
      fs_(immutable_db_options_.fs, io_tracer_),
      mutable_db_options_(initial_db_options_),
      stats_(immutable_db_options_.stats),
      mutex_(stats_, immutable_db_options_.clock, DB_MUTEX_WAIT_MICROS,
             immutable_db_options_.use_adaptive_mutex),
      default_cf_handle_(nullptr),
      error_handler_(this, immutable_db_options_, &mutex_),
      event_logger_(immutable_db_options_.info_log.get()),
      max_total_in_memory_state_(0),
      file_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
      file_options_for_compaction_(fs_->OptimizeForCompactionTableWrite(
          file_options_, immutable_db_options_)),
      seq_per_batch_(seq_per_batch),
      batch_per_txn_(batch_per_txn),
      next_job_id_(1),
      shutting_down_(false),
      db_lock_(nullptr),
      manual_compaction_paused_(false),
      bg_cv_(&mutex_),
      logfile_number_(0),
      log_dir_synced_(false),
      log_empty_(true),
      persist_stats_cf_handle_(nullptr),
      log_sync_cv_(&mutex_),
      total_log_size_(0),
      is_snapshot_supported_(true),
      write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
      write_thread_(immutable_db_options_),
      nonmem_write_thread_(immutable_db_options_),
      write_controller_(mutable_db_options_.delayed_write_rate),
      last_batch_group_size_(0),
      unscheduled_flushes_(0),
      unscheduled_compactions_(0),
      bg_bottom_compaction_scheduled_(0),
      bg_compaction_scheduled_(0),
      num_running_compactions_(0),
      bg_flush_scheduled_(0),
      num_running_flushes_(0),
      bg_purge_scheduled_(0),
      disable_delete_obsolete_files_(0),
      pending_purge_obsolete_files_(0),
      delete_obsolete_files_last_run_(immutable_db_options_.clock->NowMicros()),
      last_stats_dump_time_microsec_(0),
      has_unpersisted_data_(false),
      unable_to_release_oldest_log_(false),
      num_running_ingest_file_(0),
#ifndef ROCKSDB_LITE
      wal_manager_(immutable_db_options_, file_options_, io_tracer_,
                   seq_per_batch),
#endif  // ROCKSDB_LITE
      bg_work_paused_(0),
      bg_compaction_paused_(0),
      refitting_level_(false),
      opened_successfully_(false),
#ifndef ROCKSDB_LITE
      periodic_work_scheduler_(nullptr),
#endif  // ROCKSDB_LITE
      two_write_queues_(options.two_write_queues),
      manual_wal_flush_(options.manual_wal_flush),
      // last_sequence_ is always maintained by the main queue that also writes
      // to the memtable. When two_write_queues_ is disabled last seq in
      // memtable is the same as last seq published to the readers. When it is
      // enabled but seq_per_batch_ is disabled, last seq in memtable still
      // indicates last published seq since wal-only writes that go to the 2nd
      // queue do not consume a sequence number. Otherwise writes performed by
      // the 2nd queue could change what is visible to the readers. In this
      // case, last_seq_same_as_publish_seq_==false, the 2nd queue maintains a
      // separate variable to indicate the last published sequence.
      last_seq_same_as_publish_seq_(
          !(seq_per_batch && options.two_write_queues)),
      // Since seq_per_batch_ is currently set only by WritePreparedTxn which
      // requires a custom gc for compaction, we use that to set use_custom_gc_
      // as well.
      use_custom_gc_(seq_per_batch),
      shutdown_initiated_(false),
      // Same ownership rule as own_info_log_: the DB owns the SstFileManager
      // only if the caller did not supply one.
      own_sfm_(options.sst_file_manager == nullptr),
      preserve_deletes_(options.preserve_deletes),
      closed_(false),
      atomic_flush_install_cv_(&mutex_),
      blob_callback_(immutable_db_options_.sst_file_manager.get(), &mutex_,
                     &error_handler_) {
  // !batch_per_txn_ implies seq_per_batch_ because it is only unset for
  // WriteUnprepared, which should use seq_per_batch_.
  assert(batch_per_txn_ || seq_per_batch_);
  // TODO: Check for an error here
  env_->GetAbsolutePath(dbname, &db_absolute_path_).PermitUncheckedError();

  // Reserve ten files or so for other uses and give the rest to TableCache.
  // Give a large number for setting of "infinite" open files.
  const int table_cache_size = (mutable_db_options_.max_open_files == -1)
                                   ? TableCache::kInfiniteCapacity
                                   : mutable_db_options_.max_open_files - 10;
  LRUCacheOptions co;
  co.capacity = table_cache_size;
  co.num_shard_bits = immutable_db_options_.table_cache_numshardbits;
  co.metadata_charge_policy = kDontChargeCacheMetadata;
  table_cache_ = NewLRUCache(co);

  versions_.reset(new VersionSet(dbname_, &immutable_db_options_, file_options_,
                                 table_cache_.get(), write_buffer_manager_,
                                 &write_controller_, &block_cache_tracer_,
                                 io_tracer_));
  column_family_memtables_.reset(
      new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));

  // Log build/session/option details; the session id is assigned before
  // DumpDBFileSummary() because it is passed to it.
  DumpRocksDBBuildVersion(immutable_db_options_.info_log.get());
  SetDbSessionId();
  DumpDBFileSummary(immutable_db_options_, dbname_, db_session_id_);
  immutable_db_options_.Dump(immutable_db_options_.info_log.get());
  mutable_db_options_.Dump(immutable_db_options_.info_log.get());
  DumpSupportInfo(immutable_db_options_.info_log.get());

  // always open the DB with 0 here, which means if preserve_deletes_==true
  // we won't drop any deletion markers until SetPreserveDeletesSequenceNumber()
  // is called by client and this seqnum is advanced.
  preserve_deletes_seqnum_.store(0);

  // The stall interface is only needed when a WriteBufferManager is in use;
  // CloseHelper() removes it from the manager's queue.
  if (write_buffer_manager_) {
    wbm_stall_.reset(new WBMStallInterface());
  }
}
278
Resume()279 Status DBImpl::Resume() {
280 ROCKS_LOG_INFO(immutable_db_options_.info_log, "Resuming DB");
281
282 InstrumentedMutexLock db_mutex(&mutex_);
283
284 if (!error_handler_.IsDBStopped() && !error_handler_.IsBGWorkStopped()) {
285 // Nothing to do
286 return Status::OK();
287 }
288
289 if (error_handler_.IsRecoveryInProgress()) {
290 // Don't allow a mix of manual and automatic recovery
291 return Status::Busy();
292 }
293
294 mutex_.Unlock();
295 Status s = error_handler_.RecoverFromBGError(true);
296 mutex_.Lock();
297 return s;
298 }
299
// This function implements the guts of recovery from a background error. It
// is eventually called for both manual as well as automatic recovery. It does
// the following -
// 1. Wait for currently scheduled background flush/compaction to exit, in
//    order to avoid inadvertently causing an error and thinking recovery
//    failed
// 2. Flush memtables if there's any data for all the CFs. This may result in
//    another error, which will be saved by error_handler_ and reported later
//    as the recovery status
// 3. Find and delete any obsolete files
// 4. Schedule compactions if needed for all the CFs. This is needed as the
//    flush in the prior step might have been a no-op for some CFs, which
//    means a new super version wouldn't have been installed
//
// Locking: mutex_ must be held on entry (asserted). It is released around
// the flushes and the obsolete-file purge, and is held again on return.
//
// Returns ShutdownInProgress if shutdown was initiated (checked on entry and
// again before scheduling compactions). It returns the stored background
// error if its severity is above kHardError, or the first failure from the
// MANIFEST rewrite, flush, ClearBGError() or EnableFileDeletions() steps.
Status DBImpl::ResumeImpl(DBRecoverContext context) {
  mutex_.AssertHeld();
  WaitForBackgroundWork();

  Status s;
  if (shutdown_initiated_) {
    // Returning shutdown status to SFM during auto recovery will cause it
    // to abort the recovery and allow the shutdown to progress
    s = Status::ShutdownInProgress();
  }

  if (s.ok()) {
    // Fatal/unrecoverable errors cannot be resumed from.
    Status bg_error = error_handler_.GetBGError();
    if (bg_error.severity() > Status::Severity::kHardError) {
      ROCKS_LOG_INFO(
          immutable_db_options_.info_log,
          "DB resume requested but failed due to Fatal/Unrecoverable error");
      s = bg_error;
    }
  }

  // Make sure the IO Status stored in version set is set to OK.
  // Captured now so file deletions can be re-enabled below after a
  // successful resume.
  bool file_deletion_disabled = !IsFileDeletionsEnabled();
  if (s.ok()) {
    IOStatus io_s = versions_->io_status();
    if (io_s.IsIOError()) {
      // If resuming from IOError resulted from MANIFEST write, then assert
      // that we must have already set the MANIFEST writer to nullptr during
      // clean-up phase MANIFEST writing. We must have also disabled file
      // deletions.
      assert(!versions_->descriptor_log_);
      assert(file_deletion_disabled);
      // Since we are trying to recover from MANIFEST write error, we need to
      // switch to a new MANIFEST anyway. The old MANIFEST can be corrupted.
      // Therefore, force writing a dummy version edit because we do not know
      // whether there are flush jobs with non-empty data to flush, triggering
      // appends to MANIFEST.
      VersionEdit edit;
      auto cfh =
          static_cast_with_check<ColumnFamilyHandleImpl>(default_cf_handle_);
      assert(cfh);
      ColumnFamilyData* cfd = cfh->cfd();
      const MutableCFOptions& cf_opts = *cfd->GetLatestMutableCFOptions();
      s = versions_->LogAndApply(cfd, cf_opts, &edit, &mutex_,
                                 directories_.GetDbDir());
      if (!s.ok()) {
        // Record a MANIFEST write failure as a new background error.
        io_s = versions_->io_status();
        if (!io_s.ok()) {
          s = error_handler_.SetBGError(io_s,
                                        BackgroundErrorReason::kManifestWrite);
        }
      }
    }
  }

  // We cannot guarantee consistency of the WAL. So force flush Memtables of
  // all the column families
  if (s.ok()) {
    FlushOptions flush_opts;
    // We allow flush to stall write since we are trying to resume from error.
    flush_opts.allow_write_stall = true;
    if (immutable_db_options_.atomic_flush) {
      autovector<ColumnFamilyData*> cfds;
      SelectColumnFamiliesForAtomicFlush(&cfds);
      mutex_.Unlock();
      s = AtomicFlushMemTables(cfds, flush_opts, context.flush_reason);
      mutex_.Lock();
    } else {
      for (auto cfd : *versions_->GetColumnFamilySet()) {
        if (cfd->IsDropped()) {
          continue;
        }
        // Pin the CF while mutex_ is released for the flush.
        cfd->Ref();
        mutex_.Unlock();
        s = FlushMemTable(cfd, flush_opts, context.flush_reason);
        mutex_.Lock();
        cfd->UnrefAndTryDelete();
        if (!s.ok()) {
          break;
        }
      }
    }
    if (!s.ok()) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "DB resume requested but failed due to Flush failure [%s]",
                     s.ToString().c_str());
    }
  }

  JobContext job_context(0);
  FindObsoleteFiles(&job_context, true);
  if (s.ok()) {
    s = error_handler_.ClearBGError();
  } else {
    // NOTE: this is needed to pass ASSERT_STATUS_CHECKED
    // in the DBSSTTest.DBWithMaxSpaceAllowedRandomized test.
    // See https://github.com/facebook/rocksdb/pull/7715#issuecomment-754947952
    error_handler_.GetRecoveryError().PermitUncheckedError();
  }
  mutex_.Unlock();

  // Manifest file numbers start from 2 (as also noted in CloseHelper()).
  job_context.manifest_file_number = 1;
  if (job_context.HaveSomethingToDelete()) {
    PurgeObsoleteFiles(job_context);
  }
  job_context.Clean();

  if (s.ok()) {
    assert(versions_->io_status().ok());
    // If we reach here, we should re-enable file deletions if it was disabled
    // during previous error handling.
    if (file_deletion_disabled) {
      // Always return ok
      s = EnableFileDeletions(/*force=*/true);
      if (!s.ok()) {
        ROCKS_LOG_INFO(
            immutable_db_options_.info_log,
            "DB resume requested but could not enable file deletions [%s]",
            s.ToString().c_str());
      }
    }
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successfully resumed DB");
  }
  mutex_.Lock();
  // Check for shutdown again before scheduling further compactions,
  // since we released and re-acquired the lock above
  if (shutdown_initiated_) {
    s = Status::ShutdownInProgress();
  }
  if (s.ok()) {
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      SchedulePendingCompaction(cfd);
    }
    MaybeScheduleFlushOrCompaction();
  }

  // Wake up any waiters - in this case, it could be the shutdown thread
  bg_cv_.SignalAll();

  // No need to check BGError again. If something happened, event listener would
  // be notified and the operation causing it would have failed
  return s;
}
455
WaitForBackgroundWork()456 void DBImpl::WaitForBackgroundWork() {
457 // Wait for background work to finish
458 while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
459 bg_flush_scheduled_) {
460 bg_cv_.Wait();
461 }
462 }
463
// Will lock the mutex_, will wait for completion if wait is true
//
// Begins shutdown of background activity:
//  1. (non-LITE) unregisters this DB from the periodic work scheduler;
//  2. under mutex_, if shutdown has not been signalled yet, there is
//     unpersisted data and avoid_flush_during_shutdown is false, flushes
//     memtables (atomically or per CF); flush errors are currently ignored;
//  3. sets shutting_down_ and wakes all bg_cv_ waiters;
//  4. if `wait` is true, blocks until scheduled flushes/compactions finish.
// Since it acquires mutex_ itself, the caller must not already hold it.
void DBImpl::CancelAllBackgroundWork(bool wait) {
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "Shutdown: canceling all background work");

#ifndef ROCKSDB_LITE
  // Done before taking mutex_. NOTE(review): presumably because periodic
  // tasks such as PersistStats() acquire mutex_ themselves — confirm.
  if (periodic_work_scheduler_ != nullptr) {
    periodic_work_scheduler_->Unregister(this);
  }
#endif  // !ROCKSDB_LITE

  InstrumentedMutexLock l(&mutex_);
  if (!shutting_down_.load(std::memory_order_acquire) &&
      has_unpersisted_data_.load(std::memory_order_relaxed) &&
      !mutable_db_options_.avoid_flush_during_shutdown) {
    if (immutable_db_options_.atomic_flush) {
      autovector<ColumnFamilyData*> cfds;
      SelectColumnFamiliesForAtomicFlush(&cfds);
      mutex_.Unlock();
      Status s =
          AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown);
      s.PermitUncheckedError();  //**TODO: What to do on error?
      mutex_.Lock();
    } else {
      for (auto cfd : *versions_->GetColumnFamilySet()) {
        if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
          // Keep the CF alive while mutex_ is released for the flush.
          cfd->Ref();
          mutex_.Unlock();
          Status s = FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown);
          s.PermitUncheckedError();  //**TODO: What to do on error?
          mutex_.Lock();
          cfd->UnrefAndTryDelete();
        }
      }
    }
    versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
  }

  shutting_down_.store(true, std::memory_order_release);
  bg_cv_.SignalAll();
  if (!wait) {
    return;
  }
  WaitForBackgroundWork();
}
509
// Shuts the DB down and releases its resources. In order, it:
//  - sets shutdown_initiated_ and cancels/waits out any background error
//    recovery;
//  - signals shutdown via CancelAllBackgroundWork(false), unschedules queued
//    BOTTOM/LOW/HIGH jobs and waits for running jobs, purges and recovery;
//  - drains the flush and compaction queues and deletes the default and
//    persistent-stats column family handles;
//  - if the DB was opened successfully, finds and purges obsolete files;
//  - closes WAL writers, frees table cache entries and recovered
//    transactions, destroys the VersionSet and unlocks the DB lock file;
//  - closes the SstFileManager and info log if this DB created them, and
//    removes the write-buffer-manager stall interface.
// Returns the first WAL-close or info-log-close error. An Aborted status is
// reported as Incomplete (see the comment at the end).
Status DBImpl::CloseHelper() {
  // Guarantee that there is no background error recovery in progress before
  // continuing with the shutdown
  mutex_.Lock();
  shutdown_initiated_ = true;
  error_handler_.CancelErrorRecovery();
  while (error_handler_.IsRecoveryInProgress()) {
    bg_cv_.Wait();
  }
  mutex_.Unlock();

  // Below check is added as recovery_error_ is not checked and it causes crash
  // in DBSSTTest.DBWithMaxSpaceAllowedWithBlobFiles when space limit is
  // reached.
  error_handler_.GetRecoveryError().PermitUncheckedError();

  // CancelAllBackgroundWork called with false means we just set the shutdown
  // marker. After this we do a variant of the waiting and unschedule work
  // (to consider: moving all the waiting into CancelAllBackgroundWork(true))
  CancelAllBackgroundWork(false);
  mutex_.Lock();
  env_->UnSchedule(this, Env::Priority::BOTTOM);
  env_->UnSchedule(this, Env::Priority::LOW);
  env_->UnSchedule(this, Env::Priority::HIGH);
  Status ret = Status::OK();

  // Wait for background work to finish
  while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
         bg_flush_scheduled_ || bg_purge_scheduled_ ||
         pending_purge_obsolete_files_ ||
         error_handler_.IsRecoveryInProgress()) {
    TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob");
    bg_cv_.Wait();
  }
  TEST_SYNC_POINT_CALLBACK("DBImpl::CloseHelper:PendingPurgeFinished",
                           &files_grabbed_for_purge_);
  EraseThreadStatusDbInfo();
  flush_scheduler_.Clear();
  trim_history_scheduler_.Clear();

  // Drop the CF references held by queued-but-never-run flush/compaction
  // requests.
  while (!flush_queue_.empty()) {
    const FlushRequest& flush_req = PopFirstFromFlushQueue();
    for (const auto& iter : flush_req) {
      iter.first->UnrefAndTryDelete();
    }
  }
  while (!compaction_queue_.empty()) {
    auto cfd = PopFirstFromCompactionQueue();
    cfd->UnrefAndTryDelete();
  }

  if (default_cf_handle_ != nullptr || persist_stats_cf_handle_ != nullptr) {
    // we need to delete handle outside of lock because it does its own locking
    mutex_.Unlock();
    if (default_cf_handle_) {
      delete default_cf_handle_;
      default_cf_handle_ = nullptr;
    }
    if (persist_stats_cf_handle_) {
      delete persist_stats_cf_handle_;
      persist_stats_cf_handle_ = nullptr;
    }
    mutex_.Lock();
  }

  // Clean up obsolete files due to SuperVersion release.
  // (1) Need to delete to obsolete files before closing because RepairDB()
  // scans all existing files in the file system and builds manifest file.
  // Keeping obsolete files confuses the repair process.
  // (2) Need to check if we Open()/Recover() the DB successfully before
  // deleting because if VersionSet recover fails (may be due to corrupted
  // manifest file), it is not able to identify live files correctly. As a
  // result, all "live" files can get deleted by accident. However, corrupted
  // manifest is recoverable by RepairDB().
  if (opened_successfully_) {
    JobContext job_context(next_job_id_.fetch_add(1));
    FindObsoleteFiles(&job_context, true);

    mutex_.Unlock();
    // manifest number starting from 2
    job_context.manifest_file_number = 1;
    if (job_context.HaveSomethingToDelete()) {
      PurgeObsoleteFiles(job_context);
    }
    job_context.Clean();
    mutex_.Lock();
  }

  for (auto l : logs_to_free_) {
    delete l;
  }
  for (auto& log : logs_) {
    uint64_t log_number = log.writer->get_log_number();
    Status s = log.ClearWriter();
    if (!s.ok()) {
      ROCKS_LOG_WARN(
          immutable_db_options_.info_log,
          "Unable to Sync WAL file %s with error -- %s",
          LogFileName(immutable_db_options_.wal_dir, log_number).c_str(),
          s.ToString().c_str());
      // Retain the first error
      if (ret.ok()) {
        ret = s;
      }
    }
  }
  logs_.clear();

  // Table cache may have table handles holding blocks from the block cache.
  // We need to release them before the block cache is destroyed. The block
  // cache may be destroyed inside versions_.reset(), when column family data
  // list is destroyed, so leaving handles in table cache after
  // versions_.reset() may cause issues.
  // Here we clean all unreferenced handles in table cache.
  // Now we assume all user queries have finished, so only version set itself
  // can possibly hold the blocks from block cache. After releasing unreferenced
  // handles here, only handles held by version set left and inside
  // versions_.reset(), we will release them. There, we need to make sure every
  // time a handle is released, we erase it from the cache too. By doing that,
  // we can guarantee that after versions_.reset(), table cache is empty
  // so the cache can be safely destroyed.
  table_cache_->EraseUnRefEntries();

  for (auto& txn_entry : recovered_transactions_) {
    delete txn_entry.second;
  }

  // versions need to be destroyed before table_cache since it can hold
  // references to table_cache.
  versions_.reset();
  mutex_.Unlock();
  if (db_lock_ != nullptr) {
    // TODO: Check for unlock error
    env_->UnlockFile(db_lock_).PermitUncheckedError();
  }

  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown complete");
  LogFlush(immutable_db_options_.info_log);

#ifndef ROCKSDB_LITE
  // If the sst_file_manager was allocated by us during DB::Open(), call
  // Close() on it before closing the info_log. Otherwise, background thread
  // in SstFileManagerImpl might try to log something
  if (immutable_db_options_.sst_file_manager && own_sfm_) {
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    sfm->Close();
  }
#endif  // ROCKSDB_LITE

  // Only close a logger this DB owns; NotSupported from Close() is ignored.
  if (immutable_db_options_.info_log && own_info_log_) {
    Status s = immutable_db_options_.info_log->Close();
    if (!s.ok() && !s.IsNotSupported() && ret.ok()) {
      ret = s;
    }
  }

  if (write_buffer_manager_ && wbm_stall_) {
    write_buffer_manager_->RemoveDBFromQueue(wbm_stall_.get());
  }

  if (ret.IsAborted()) {
    // Reserve IsAborted() error for those where users didn't release
    // certain resource and they can release them and come back and
    // retry. In this case, we wrap this exception to something else.
    return Status::Incomplete(ret.ToString());
  }
  return ret;
}
679
CloseImpl()680 Status DBImpl::CloseImpl() { return CloseHelper(); }
681
~DBImpl()682 DBImpl::~DBImpl() {
683 if (!closed_) {
684 closed_ = true;
685 CloseHelper().PermitUncheckedError();
686 }
687 }
688
MaybeIgnoreError(Status * s) const689 void DBImpl::MaybeIgnoreError(Status* s) const {
690 if (s->ok() || immutable_db_options_.paranoid_checks) {
691 // No change needed
692 } else {
693 ROCKS_LOG_WARN(immutable_db_options_.info_log, "Ignoring error %s",
694 s->ToString().c_str());
695 *s = Status::OK();
696 }
697 }
698
CreateArchivalDirectory()699 const Status DBImpl::CreateArchivalDirectory() {
700 if (immutable_db_options_.WAL_ttl_seconds > 0 ||
701 immutable_db_options_.WAL_size_limit_MB > 0) {
702 std::string archivalPath = ArchivalDirectory(immutable_db_options_.wal_dir);
703 return env_->CreateDirIfMissing(archivalPath);
704 }
705 return Status::OK();
706 }
707
PrintStatistics()708 void DBImpl::PrintStatistics() {
709 auto dbstats = immutable_db_options_.stats;
710 if (dbstats) {
711 ROCKS_LOG_INFO(immutable_db_options_.info_log, "STATISTICS:\n %s",
712 dbstats->ToString().c_str());
713 }
714 }
715
// Registers this DB with the scheduler returned by
// PeriodicWorkScheduler::Default(), using the current
// stats_dump_period_sec and stats_persist_period_sec. No-op in ROCKSDB_LITE
// builds. In debug builds, tests can skip registration via the
// "DisableScheduler" sync point, or substitute the scheduler via the "Init"
// sync point.
void DBImpl::StartPeriodicWorkScheduler() {
#ifndef ROCKSDB_LITE

#ifndef NDEBUG
  // Only used by tests to disable the scheduler
  bool disable_scheduler = false;
  TEST_SYNC_POINT_CALLBACK(
      "DBImpl::StartPeriodicWorkScheduler:DisableScheduler",
      &disable_scheduler);
  if (disable_scheduler) {
    return;
  }
#endif  // !NDEBUG

  {
    // The scheduler pointer is published under mutex_.
    InstrumentedMutexLock l(&mutex_);
    periodic_work_scheduler_ = PeriodicWorkScheduler::Default();
    TEST_SYNC_POINT_CALLBACK("DBImpl::StartPeriodicWorkScheduler:Init",
                             &periodic_work_scheduler_);
  }

  // Registration happens outside mutex_; the periods are read from
  // mutable_db_options_ without the lock here.
  periodic_work_scheduler_->Register(
      this, mutable_db_options_.stats_dump_period_sec,
      mutable_db_options_.stats_persist_period_sec);
#endif  // !ROCKSDB_LITE
}
742
743 // esitmate the total size of stats_history_
EstimateInMemoryStatsHistorySize() const744 size_t DBImpl::EstimateInMemoryStatsHistorySize() const {
745 size_t size_total =
746 sizeof(std::map<uint64_t, std::map<std::string, uint64_t>>);
747 if (stats_history_.size() == 0) return size_total;
748 size_t size_per_slice =
749 sizeof(uint64_t) + sizeof(std::map<std::string, uint64_t>);
750 // non-empty map, stats_history_.begin() guaranteed to exist
751 std::map<std::string, uint64_t> sample_slice(stats_history_.begin()->second);
752 for (const auto& pairs : sample_slice) {
753 size_per_slice +=
754 pairs.first.capacity() + sizeof(pairs.first) + sizeof(pairs.second);
755 }
756 size_total = size_per_slice * stats_history_.size();
757 return size_total;
758 }
759
PersistStats()760 void DBImpl::PersistStats() {
761 TEST_SYNC_POINT("DBImpl::PersistStats:Entry");
762 #ifndef ROCKSDB_LITE
763 if (shutdown_initiated_) {
764 return;
765 }
766 TEST_SYNC_POINT("DBImpl::PersistStats:StartRunning");
767 uint64_t now_seconds =
768 immutable_db_options_.clock->NowMicros() / kMicrosInSecond;
769
770 Statistics* statistics = immutable_db_options_.stats;
771 if (!statistics) {
772 return;
773 }
774 size_t stats_history_size_limit = 0;
775 {
776 InstrumentedMutexLock l(&mutex_);
777 stats_history_size_limit = mutable_db_options_.stats_history_buffer_size;
778 }
779
780 std::map<std::string, uint64_t> stats_map;
781 if (!statistics->getTickerMap(&stats_map)) {
782 return;
783 }
784 ROCKS_LOG_INFO(immutable_db_options_.info_log,
785 "------- PERSISTING STATS -------");
786
787 if (immutable_db_options_.persist_stats_to_disk) {
788 WriteBatch batch;
789 Status s = Status::OK();
790 if (stats_slice_initialized_) {
791 ROCKS_LOG_INFO(immutable_db_options_.info_log,
792 "Reading %" ROCKSDB_PRIszt " stats from statistics\n",
793 stats_slice_.size());
794 for (const auto& stat : stats_map) {
795 if (s.ok()) {
796 char key[100];
797 int length =
798 EncodePersistentStatsKey(now_seconds, stat.first, 100, key);
799 // calculate the delta from last time
800 if (stats_slice_.find(stat.first) != stats_slice_.end()) {
801 uint64_t delta = stat.second - stats_slice_[stat.first];
802 s = batch.Put(persist_stats_cf_handle_,
803 Slice(key, std::min(100, length)), ToString(delta));
804 }
805 }
806 }
807 }
808 stats_slice_initialized_ = true;
809 std::swap(stats_slice_, stats_map);
810 if (s.ok()) {
811 WriteOptions wo;
812 wo.low_pri = true;
813 wo.no_slowdown = true;
814 wo.sync = false;
815 s = Write(wo, &batch);
816 }
817 if (!s.ok()) {
818 ROCKS_LOG_INFO(immutable_db_options_.info_log,
819 "Writing to persistent stats CF failed -- %s",
820 s.ToString().c_str());
821 } else {
822 ROCKS_LOG_INFO(immutable_db_options_.info_log,
823 "Writing %" ROCKSDB_PRIszt " stats with timestamp %" PRIu64
824 " to persistent stats CF succeeded",
825 stats_slice_.size(), now_seconds);
826 }
827 // TODO(Zhongyi): add purging for persisted data
828 } else {
829 InstrumentedMutexLock l(&stats_history_mutex_);
830 // calculate the delta from last time
831 if (stats_slice_initialized_) {
832 std::map<std::string, uint64_t> stats_delta;
833 for (const auto& stat : stats_map) {
834 if (stats_slice_.find(stat.first) != stats_slice_.end()) {
835 stats_delta[stat.first] = stat.second - stats_slice_[stat.first];
836 }
837 }
838 ROCKS_LOG_INFO(immutable_db_options_.info_log,
839 "Storing %" ROCKSDB_PRIszt " stats with timestamp %" PRIu64
840 " to in-memory stats history",
841 stats_slice_.size(), now_seconds);
842 stats_history_[now_seconds] = stats_delta;
843 }
844 stats_slice_initialized_ = true;
845 std::swap(stats_slice_, stats_map);
846 TEST_SYNC_POINT("DBImpl::PersistStats:StatsCopied");
847
848 // delete older stats snapshots to control memory consumption
849 size_t stats_history_size = EstimateInMemoryStatsHistorySize();
850 bool purge_needed = stats_history_size > stats_history_size_limit;
851 ROCKS_LOG_INFO(immutable_db_options_.info_log,
852 "[Pre-GC] In-memory stats history size: %" ROCKSDB_PRIszt
853 " bytes, slice count: %" ROCKSDB_PRIszt,
854 stats_history_size, stats_history_.size());
855 while (purge_needed && !stats_history_.empty()) {
856 stats_history_.erase(stats_history_.begin());
857 purge_needed =
858 EstimateInMemoryStatsHistorySize() > stats_history_size_limit;
859 }
860 ROCKS_LOG_INFO(immutable_db_options_.info_log,
861 "[Post-GC] In-memory stats history size: %" ROCKSDB_PRIszt
862 " bytes, slice count: %" ROCKSDB_PRIszt,
863 stats_history_size, stats_history_.size());
864 }
865 TEST_SYNC_POINT("DBImpl::PersistStats:End");
866 #endif // !ROCKSDB_LITE
867 }
868
FindStatsByTime(uint64_t start_time,uint64_t end_time,uint64_t * new_time,std::map<std::string,uint64_t> * stats_map)869 bool DBImpl::FindStatsByTime(uint64_t start_time, uint64_t end_time,
870 uint64_t* new_time,
871 std::map<std::string, uint64_t>* stats_map) {
872 assert(new_time);
873 assert(stats_map);
874 if (!new_time || !stats_map) return false;
875 // lock when search for start_time
876 {
877 InstrumentedMutexLock l(&stats_history_mutex_);
878 auto it = stats_history_.lower_bound(start_time);
879 if (it != stats_history_.end() && it->first < end_time) {
880 // make a copy for timestamp and stats_map
881 *new_time = it->first;
882 *stats_map = it->second;
883 return true;
884 } else {
885 return false;
886 }
887 }
888 }
889
GetStatsHistory(uint64_t start_time,uint64_t end_time,std::unique_ptr<StatsHistoryIterator> * stats_iterator)890 Status DBImpl::GetStatsHistory(
891 uint64_t start_time, uint64_t end_time,
892 std::unique_ptr<StatsHistoryIterator>* stats_iterator) {
893 if (!stats_iterator) {
894 return Status::InvalidArgument("stats_iterator not preallocated.");
895 }
896 if (immutable_db_options_.persist_stats_to_disk) {
897 stats_iterator->reset(
898 new PersistentStatsHistoryIterator(start_time, end_time, this));
899 } else {
900 stats_iterator->reset(
901 new InMemoryStatsHistoryIterator(start_time, end_time, this));
902 }
903 return (*stats_iterator)->status();
904 }
905
DumpStats()906 void DBImpl::DumpStats() {
907 TEST_SYNC_POINT("DBImpl::DumpStats:1");
908 #ifndef ROCKSDB_LITE
909 const DBPropertyInfo* cf_property_info =
910 GetPropertyInfo(DB::Properties::kCFStats);
911 assert(cf_property_info != nullptr);
912 const DBPropertyInfo* db_property_info =
913 GetPropertyInfo(DB::Properties::kDBStats);
914 assert(db_property_info != nullptr);
915
916 std::string stats;
917 if (shutdown_initiated_) {
918 return;
919 }
920 TEST_SYNC_POINT("DBImpl::DumpStats:StartRunning");
921 {
922 InstrumentedMutexLock l(&mutex_);
923 default_cf_internal_stats_->GetStringProperty(
924 *db_property_info, DB::Properties::kDBStats, &stats);
925 for (auto cfd : *versions_->GetColumnFamilySet()) {
926 if (cfd->initialized()) {
927 cfd->internal_stats()->GetStringProperty(
928 *cf_property_info, DB::Properties::kCFStatsNoFileHistogram, &stats);
929 }
930 }
931 for (auto cfd : *versions_->GetColumnFamilySet()) {
932 if (cfd->initialized()) {
933 cfd->internal_stats()->GetStringProperty(
934 *cf_property_info, DB::Properties::kCFFileHistogram, &stats);
935 }
936 }
937 }
938 TEST_SYNC_POINT("DBImpl::DumpStats:2");
939 ROCKS_LOG_INFO(immutable_db_options_.info_log,
940 "------- DUMPING STATS -------");
941 ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", stats.c_str());
942 if (immutable_db_options_.dump_malloc_stats) {
943 stats.clear();
944 DumpMallocStats(&stats);
945 if (!stats.empty()) {
946 ROCKS_LOG_INFO(immutable_db_options_.info_log,
947 "------- Malloc STATS -------");
948 ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", stats.c_str());
949 }
950 }
951 #endif // !ROCKSDB_LITE
952
953 PrintStatistics();
954 }
955
FlushInfoLog()956 void DBImpl::FlushInfoLog() {
957 if (shutdown_initiated_) {
958 return;
959 }
960 TEST_SYNC_POINT("DBImpl::FlushInfoLog:StartRunning");
961 LogFlush(immutable_db_options_.info_log);
962 }
963
TablesRangeTombstoneSummary(ColumnFamilyHandle * column_family,int max_entries_to_print,std::string * out_str)964 Status DBImpl::TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family,
965 int max_entries_to_print,
966 std::string* out_str) {
967 auto* cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
968 ColumnFamilyData* cfd = cfh->cfd();
969
970 SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
971 Version* version = super_version->current;
972
973 Status s =
974 version->TablesRangeTombstoneSummary(max_entries_to_print, out_str);
975
976 CleanupSuperVersion(super_version);
977 return s;
978 }
979
ScheduleBgLogWriterClose(JobContext * job_context)980 void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
981 if (!job_context->logs_to_free.empty()) {
982 for (auto l : job_context->logs_to_free) {
983 AddToLogsToFreeQueue(l);
984 }
985 job_context->logs_to_free.clear();
986 }
987 }
988
GetDataDir(ColumnFamilyData * cfd,size_t path_id) const989 FSDirectory* DBImpl::GetDataDir(ColumnFamilyData* cfd, size_t path_id) const {
990 assert(cfd);
991 FSDirectory* ret_dir = cfd->GetDataDir(path_id);
992 if (ret_dir == nullptr) {
993 return directories_.GetDataDir(path_id);
994 }
995 return ret_dir;
996 }
997
// Dynamically changes mutable options of one column family.
//
// Under mutex_, this function does the following:
//   - applies `options_map` to the column family;
//   - logs a dummy VersionEdit so compaction scores are recomputed;
//   - installs a new SuperVersion, which may schedule flush/compaction;
//   - rewrites the OPTIONS file.
// The inputs and the outcome are written to the info log.
//
// Returns one of:
//   - NotSupported in ROCKSDB_LITE;
//   - InvalidArgument for an empty `options_map`;
//   - the error from applying or logging the options;
//   - when those succeed, the status of writing the OPTIONS file.
Status DBImpl::SetOptions(
    ColumnFamilyHandle* column_family,
    const std::unordered_map<std::string, std::string>& options_map) {
#ifdef ROCKSDB_LITE
  (void)column_family;
  (void)options_map;
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  auto* cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
  if (options_map.empty()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "SetOptions() on column family [%s], empty input",
                   cfd->GetName().c_str());
    return Status::InvalidArgument("empty input");
  }

  MutableCFOptions new_options;
  Status s;
  Status persist_options_status;
  persist_options_status.PermitUncheckedError();  // Allow uninitialized access
  SuperVersionContext sv_context(/* create_superversion */ true);
  {
    // DB options are fetched before mutex_ is acquired (presumably because
    // GetDBOptions() takes mutex_ itself -- TODO confirm).
    auto db_options = GetDBOptions();
    InstrumentedMutexLock l(&mutex_);
    s = cfd->SetOptions(db_options, options_map);
    if (s.ok()) {
      new_options = *cfd->GetLatestMutableCFOptions();
      // Append new version to recompute compaction score.
      VersionEdit dummy_edit;
      s = versions_->LogAndApply(cfd, new_options, &dummy_edit, &mutex_,
                                 directories_.GetDbDir());
      // NOTE(review): the steps below run even if LogAndApply() failed; that
      // failure is still reported to the caller through `s`.
      // Trigger possible flush/compactions. This has to be before we persist
      // options to file, otherwise there will be a deadlock with writer
      // thread.
      InstallSuperVersionAndScheduleWork(cfd, &sv_context, new_options);

      persist_options_status = WriteOptionsFile(
          false /*need_mutex_lock*/, true /*need_enter_write_thread*/);
      bg_cv_.SignalAll();
    }
  }
  // Clean up the SuperVersionContext only after mutex_ has been released.
  sv_context.Clean();

  ROCKS_LOG_INFO(
      immutable_db_options_.info_log,
      "SetOptions() on column family [%s], inputs:", cfd->GetName().c_str());
  for (const auto& o : options_map) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n", o.first.c_str(),
                   o.second.c_str());
  }
  if (s.ok()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[%s] SetOptions() succeeded", cfd->GetName().c_str());
    new_options.Dump(immutable_db_options_.info_log.get());
    // The options were applied; surface a failure to persist them.
    if (!persist_options_status.ok()) {
      s = persist_options_status;
    }
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "[%s] SetOptions() failed",
                   cfd->GetName().c_str());
  }
  LogFlush(immutable_db_options_.info_log);
  return s;
#endif  // ROCKSDB_LITE
}
1064
// Dynamically changes mutable DB-wide options.
//
// Under mutex_, the new options are parsed and then validated against the DB
// and every live column family. On success, this function may:
//   - grow the background thread pools;
//   - re-register the periodic stats tasks;
//   - update the delayed-write rate and the table-cache capacity;
//   - refresh the compaction file options;
//   - switch the WAL;
//   - rewrite the OPTIONS file.
// The inputs and the outcome are written to the info log.
//
// Returns one of:
//   - NotSupported in ROCKSDB_LITE;
//   - InvalidArgument for an empty `options_map`;
//   - a parse or validation error;
//   - IOError, when the options were applied, the OPTIONS file could not be
//     written, and fail_if_options_file_error is set.
// Otherwise returns OK. A failure to persist is only logged when
// fail_if_options_file_error is unset.
Status DBImpl::SetDBOptions(
    const std::unordered_map<std::string, std::string>& options_map) {
#ifdef ROCKSDB_LITE
  (void)options_map;
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  if (options_map.empty()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "SetDBOptions(), empty input.");
    return Status::InvalidArgument("empty input");
  }

  MutableDBOptions new_options;
  Status s;
  Status persist_options_status = Status::OK();
  bool wal_changed = false;
  WriteContext write_context;
  {
    InstrumentedMutexLock l(&mutex_);
    // Start from the current mutable options and overlay the parsed inputs.
    s = GetMutableDBOptionsFromStrings(mutable_db_options_, options_map,
                                       &new_options);
    // A bytes_per_sync of 0 is replaced with 1MB.
    if (new_options.bytes_per_sync == 0) {
      new_options.bytes_per_sync = 1024 * 1024;
    }
    DBOptions new_db_options =
        BuildDBOptions(immutable_db_options_, new_options);
    if (s.ok()) {
      s = ValidateOptions(new_db_options);
    }
    if (s.ok()) {
      // The new DB options must also be compatible with every live CF.
      for (auto c : *versions_->GetColumnFamilySet()) {
        if (!c->IsDropped()) {
          auto cf_options = c->GetLatestCFOptions();
          s = ColumnFamilyData::ValidateOptions(new_db_options, cf_options);
          if (!s.ok()) {
            break;
          }
        }
      }
    }
    if (s.ok()) {
      const BGJobLimits current_bg_job_limits =
          GetBGJobLimits(mutable_db_options_.max_background_flushes,
                         mutable_db_options_.max_background_compactions,
                         mutable_db_options_.max_background_jobs,
                         /* parallelize_compactions */ true);
      const BGJobLimits new_bg_job_limits = GetBGJobLimits(
          new_options.max_background_flushes,
          new_options.max_background_compactions,
          new_options.max_background_jobs, /* parallelize_compactions */ true);

      const bool max_flushes_increased =
          new_bg_job_limits.max_flushes > current_bg_job_limits.max_flushes;
      const bool max_compactions_increased =
          new_bg_job_limits.max_compactions >
          current_bg_job_limits.max_compactions;

      // This function only ever grows the thread pools; it never shrinks them.
      if (max_flushes_increased || max_compactions_increased) {
        if (max_flushes_increased) {
          env_->IncBackgroundThreadsIfNeeded(new_bg_job_limits.max_flushes,
                                             Env::Priority::HIGH);
        }

        if (max_compactions_increased) {
          env_->IncBackgroundThreadsIfNeeded(new_bg_job_limits.max_compactions,
                                             Env::Priority::LOW);
        }

        MaybeScheduleFlushOrCompaction();
      }

      // Re-register the periodic stats tasks with the new periods. mutex_ is
      // released around the (un)registration (presumably to avoid lock-order
      // problems with the scheduler -- TODO confirm).
      if (new_options.stats_dump_period_sec !=
              mutable_db_options_.stats_dump_period_sec ||
          new_options.stats_persist_period_sec !=
              mutable_db_options_.stats_persist_period_sec) {
        mutex_.Unlock();
        periodic_work_scheduler_->Unregister(this);
        periodic_work_scheduler_->Register(
            this, new_options.stats_dump_period_sec,
            new_options.stats_persist_period_sec);
        mutex_.Lock();
      }
      write_controller_.set_max_delayed_write_rate(
          new_options.delayed_write_rate);
      // max_open_files == -1 means unbounded. Otherwise the capacity is
      // max_open_files - 10 (presumably leaving descriptors for non-table
      // files).
      // NOTE(review): a value below 10 would make this negative -- confirm
      // that validation rejects it.
      table_cache_.get()->SetCapacity(new_options.max_open_files == -1
                                          ? TableCache::kInfiniteCapacity
                                          : new_options.max_open_files - 10);
      // A changed wal_bytes_per_sync forces the WAL switch below.
      wal_changed = mutable_db_options_.wal_bytes_per_sync !=
                    new_options.wal_bytes_per_sync;
      mutable_db_options_ = new_options;
      file_options_for_compaction_ = FileOptions(new_db_options);
      file_options_for_compaction_ = fs_->OptimizeForCompactionTableWrite(
          file_options_for_compaction_, immutable_db_options_);
      versions_->ChangeFileOptions(mutable_db_options_);
      //TODO(xiez): clarify why apply optimize for read to write options
      file_options_for_compaction_ = fs_->OptimizeForCompactionTableRead(
          file_options_for_compaction_, immutable_db_options_);
      file_options_for_compaction_.compaction_readahead_size =
          mutable_db_options_.compaction_readahead_size;
      // Enter the write thread unbatched so the WAL switch and OPTIONS-file
      // write happen with the write thread held.
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      if (total_log_size_ > GetMaxTotalWalSize() || wal_changed) {
        // Best effort: a failed switch is logged but does not fail the call.
        Status purge_wal_status = SwitchWAL(&write_context);
        if (!purge_wal_status.ok()) {
          ROCKS_LOG_WARN(immutable_db_options_.info_log,
                         "Unable to purge WAL files in SetDBOptions() -- %s",
                         purge_wal_status.ToString().c_str());
        }
      }
      persist_options_status = WriteOptionsFile(
          false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
      write_thread_.ExitUnbatched(&w);
    } else {
      // To get here, we must have had invalid options and will not attempt to
      // persist the options, which means the status is "OK/Uninitialized".
      persist_options_status.PermitUncheckedError();
    }
  }
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions(), inputs:");
  for (const auto& o : options_map) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n", o.first.c_str(),
                   o.second.c_str());
  }
  if (s.ok()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions() succeeded");
    new_options.Dump(immutable_db_options_.info_log.get());
    if (!persist_options_status.ok()) {
      // Only escalate a persistence failure when the user asked for it.
      if (immutable_db_options_.fail_if_options_file_error) {
        s = Status::IOError(
            "SetDBOptions() succeeded, but unable to persist options",
            persist_options_status.ToString());
      }
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "Unable to persist options in SetDBOptions() -- %s",
                     persist_options_status.ToString().c_str());
    }
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "SetDBOptions failed");
  }
  LogFlush(immutable_db_options_.info_log);
  return s;
#endif  // ROCKSDB_LITE
}
1208
1209 // return the same level if it cannot be moved
FindMinimumEmptyLevelFitting(ColumnFamilyData * cfd,const MutableCFOptions &,int level)1210 int DBImpl::FindMinimumEmptyLevelFitting(
1211 ColumnFamilyData* cfd, const MutableCFOptions& /*mutable_cf_options*/,
1212 int level) {
1213 mutex_.AssertHeld();
1214 const auto* vstorage = cfd->current()->storage_info();
1215 int minimum_level = level;
1216 for (int i = level - 1; i > 0; --i) {
1217 // stop if level i is not empty
1218 if (vstorage->NumLevelFiles(i) > 0) break;
1219 // stop if level i is too small (cannot fit the level files)
1220 if (vstorage->MaxBytesForLevel(i) < vstorage->NumLevelBytes(level)) {
1221 break;
1222 }
1223
1224 minimum_level = i;
1225 }
1226 return minimum_level;
1227 }
1228
// Writes out buffered WAL data and, if `sync` is true, syncs the WAL.
//
// The explicit buffer write only happens when manual_wal_flush_ is set. It is
// done on the current (last) WAL writer under log_write_mutex_. A write error
// is passed to IOStatusCheck() and returned without syncing. When `sync` is
// true, the remaining work is delegated to SyncWAL().
Status DBImpl::FlushWAL(bool sync) {
  if (manual_wal_flush_) {
    IOStatus io_s;
    {
      // We need to lock log_write_mutex_ since logs_ might change concurrently
      InstrumentedMutexLock wl(&log_write_mutex_);
      log::Writer* cur_log_writer = logs_.back().writer;
      io_s = cur_log_writer->WriteBuffer();
    }
    if (!io_s.ok()) {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
                      io_s.ToString().c_str());
      // In case there is a fs error we should set it globally to prevent the
      // future writes
      IOStatusCheck(io_s);
      // whether sync or not, we should abort the rest of function upon error
      // (std::move: before C++20 an IOStatus returned as Status would be
      // copied rather than moved)
      return std::move(io_s);
    }
    if (!sync) {
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=false");
      return std::move(io_s);
    }
  }
  if (!sync) {
    return Status::OK();
  }
  // sync = true
  ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=true");
  return SyncWAL();
}
1259
// Syncs every WAL whose number is <= logfile_number_ (as read at the start of
// the call). Also syncs the WAL directory if it has not been synced yet.
//
// The work is split into three phases:
//   1. Under mutex_: wait while the oldest WAL is being synced by another
//      caller; return NotSupported if any selected WAL file is not
//      sync-thread-safe; flag the selected WALs as getting_synced.
//   2. Without mutex_: perform the file syncs and, if needed, the directory
//      fsync.
//   3. Under mutex_ again: mark the WALs synced (MarkLogsSynced) or, on
//      failure, clear their getting_synced flags (MarkLogsNotSynced).
Status DBImpl::SyncWAL() {
  autovector<log::Writer*, 1> logs_to_sync;
  bool need_log_dir_sync;
  uint64_t current_log_number;

  {
    InstrumentedMutexLock l(&mutex_);
    assert(!logs_.empty());

    // This SyncWAL() call only cares about logs up to this number.
    current_log_number = logfile_number_;

    // Wait for a concurrent sync of the oldest relevant WAL to finish;
    // MarkLogsSynced()/MarkLogsNotSynced() signal log_sync_cv_.
    while (logs_.front().number <= current_log_number &&
           logs_.front().getting_synced) {
      log_sync_cv_.Wait();
    }
    // First check that logs are safe to sync in background.
    for (auto it = logs_.begin();
         it != logs_.end() && it->number <= current_log_number; ++it) {
      if (!it->writer->file()->writable_file()->IsSyncThreadSafe()) {
        return Status::NotSupported(
            "SyncWAL() is not supported for this implementation of WAL file",
            immutable_db_options_.allow_mmap_writes
                ? "try setting Options::allow_mmap_writes to false"
                : Slice());
      }
    }
    // Claim the WALs so other callers wait instead of syncing them too.
    for (auto it = logs_.begin();
         it != logs_.end() && it->number <= current_log_number; ++it) {
      auto& log = *it;
      assert(!log.getting_synced);
      log.getting_synced = true;
      logs_to_sync.push_back(log.writer);
    }

    need_log_dir_sync = !log_dir_synced_;
  }

  TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1");
  RecordTick(stats_, WAL_FILE_SYNCED);
  Status status;
  IOStatus io_s;
  // Sync outside mutex_; stop at the first failure.
  for (log::Writer* log : logs_to_sync) {
    io_s = log->file()->SyncWithoutFlush(immutable_db_options_.use_fsync);
    if (!io_s.ok()) {
      status = io_s;
      break;
    }
  }
  if (!io_s.ok()) {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL Sync error %s",
                    io_s.ToString().c_str());
    // In case there is a fs error we should set it globally to prevent the
    // future writes
    IOStatusCheck(io_s);
  }
  if (status.ok() && need_log_dir_sync) {
    status = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
  }
  TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");

  TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
  {
    InstrumentedMutexLock l(&mutex_);
    if (status.ok()) {
      status = MarkLogsSynced(current_log_number, need_log_dir_sync);
    } else {
      MarkLogsNotSynced(current_log_number);
    }
  }
  TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");

  return status;
}
1334
// Acquires log_write_mutex_ and writes out the current WAL writer's buffer.
//
// The mutex remains held when this returns, including on error, until
// UnlockWAL() is called. A write error is passed to WriteStatusCheck() and
// returned.
Status DBImpl::LockWAL() {
  log_write_mutex_.Lock();
  auto cur_log_writer = logs_.back().writer;
  auto status = cur_log_writer->WriteBuffer();
  if (!status.ok()) {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
                    status.ToString().c_str());
    // In case there is a fs error we should set it globally to prevent the
    // future writes
    WriteStatusCheck(status);
  }
  return std::move(status);
}
1348
// Releases log_write_mutex_ acquired by LockWAL(). Always returns OK.
Status DBImpl::UnlockWAL() {
  log_write_mutex_.Unlock();
  return Status::OK();
}
1353
// Completes a successful WAL sync of all WALs numbered <= up_to. Requires
// mutex_ to be held.
//
// If the directory was synced and up_to is still the current log number,
// log_dir_synced_ is set. Each WAL <= up_to is then handled as follows:
//   - While more than one WAL remains, the WAL is retired. Its writer is
//     queued on logs_to_free_ and the entry is erased from logs_. Its synced
//     size is also recorded in a VersionEdit when
//     track_and_verify_wals_in_manifest is set and the file is non-empty.
//   - The last remaining WAL stays in logs_ and only has getting_synced
//     cleared.
// Any recorded WAL additions are written to the MANIFEST; a MANIFEST I/O
// error is escalated through error_handler_.SetBGError(). log_sync_cv_ is
// always signalled.
Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) {
  mutex_.AssertHeld();
  if (synced_dir && logfile_number_ == up_to) {
    log_dir_synced_ = true;
  }
  VersionEdit synced_wals;
  for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
    auto& wal = *it;
    assert(wal.getting_synced);
    if (logs_.size() > 1) {
      if (immutable_db_options_.track_and_verify_wals_in_manifest &&
          wal.writer->file()->GetFileSize() > 0) {
        synced_wals.AddWal(wal.number,
                           WalMetadata(wal.writer->file()->GetFileSize()));
      }
      logs_to_free_.push_back(wal.ReleaseWriter());
      // To modify logs_ both mutex_ and log_write_mutex_ must be held
      InstrumentedMutexLock l(&log_write_mutex_);
      it = logs_.erase(it);
    } else {
      wal.getting_synced = false;
      ++it;
    }
  }
  assert(logs_.empty() || logs_[0].number > up_to ||
         (logs_.size() == 1 && !logs_[0].getting_synced));

  Status s;
  if (synced_wals.IsWalAddition()) {
    // not empty, write to MANIFEST.
    s = versions_->LogAndApplyToDefaultColumnFamily(&synced_wals, &mutex_);
    if (!s.ok() && versions_->io_status().IsIOError()) {
      s = error_handler_.SetBGError(versions_->io_status(),
                                    BackgroundErrorReason::kManifestWrite);
    }
  }
  // Wake SyncWAL() callers waiting for getting_synced to clear.
  log_sync_cv_.SignalAll();
  return s;
}
1393
MarkLogsNotSynced(uint64_t up_to)1394 void DBImpl::MarkLogsNotSynced(uint64_t up_to) {
1395 mutex_.AssertHeld();
1396 for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;
1397 ++it) {
1398 auto& wal = *it;
1399 assert(wal.getting_synced);
1400 wal.getting_synced = false;
1401 }
1402 log_sync_cv_.SignalAll();
1403 }
1404
// Returns the last sequence number recorded by the VersionSet.
SequenceNumber DBImpl::GetLatestSequenceNumber() const {
  return versions_->LastSequence();
}
1408
// Forwards `seq` to VersionSet::SetLastPublishedSequence().
void DBImpl::SetLastPublishedSequence(SequenceNumber seq) {
  versions_->SetLastPublishedSequence(seq);
}
1412
SetPreserveDeletesSequenceNumber(SequenceNumber seqnum)1413 bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) {
1414 if (seqnum > preserve_deletes_seqnum_.load()) {
1415 preserve_deletes_seqnum_.store(seqnum);
1416 return true;
1417 } else {
1418 return false;
1419 }
1420 }
1421
NewInternalIterator(const ReadOptions & read_options,Arena * arena,RangeDelAggregator * range_del_agg,SequenceNumber sequence,ColumnFamilyHandle * column_family,bool allow_unprepared_value)1422 InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
1423 Arena* arena,
1424 RangeDelAggregator* range_del_agg,
1425 SequenceNumber sequence,
1426 ColumnFamilyHandle* column_family,
1427 bool allow_unprepared_value) {
1428 ColumnFamilyData* cfd;
1429 if (column_family == nullptr) {
1430 cfd = default_cf_handle_->cfd();
1431 } else {
1432 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
1433 cfd = cfh->cfd();
1434 }
1435
1436 mutex_.Lock();
1437 SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
1438 mutex_.Unlock();
1439 return NewInternalIterator(read_options, cfd, super_version, arena,
1440 range_del_agg, sequence, allow_unprepared_value);
1441 }
1442
SchedulePurge()1443 void DBImpl::SchedulePurge() {
1444 mutex_.AssertHeld();
1445 assert(opened_successfully_);
1446
1447 // Purge operations are put into High priority queue
1448 bg_purge_scheduled_++;
1449 env_->Schedule(&DBImpl::BGWorkPurge, this, Env::Priority::HIGH, nullptr);
1450 }
1451
// Body of a background purge job scheduled by SchedulePurge().
//
// Drains, in order, the queued log writers, the queued SuperVersions and the
// pending obsolete files. mutex_ protects the queues, but it is released
// around each delete or DeleteObsoleteFileImpl() call. Finally the job
// decrements bg_purge_scheduled_ and signals bg_cv_.
void DBImpl::BackgroundCallPurge() {
  mutex_.Lock();

  while (!logs_to_free_queue_.empty()) {
    assert(!logs_to_free_queue_.empty());
    log::Writer* log_writer = *(logs_to_free_queue_.begin());
    logs_to_free_queue_.pop_front();
    mutex_.Unlock();
    delete log_writer;
    mutex_.Lock();
  }
  while (!superversions_to_free_queue_.empty()) {
    assert(!superversions_to_free_queue_.empty());
    SuperVersion* sv = superversions_to_free_queue_.front();
    superversions_to_free_queue_.pop_front();
    mutex_.Unlock();
    delete sv;
    mutex_.Lock();
  }

  // Can't use iterator to go over purge_files_ because inside the loop we're
  // unlocking the mutex that protects purge_files_.
  while (!purge_files_.empty()) {
    auto it = purge_files_.begin();
    // Need to make a copy of the PurgeFilesInfo before unlocking the mutex.
    PurgeFileInfo purge_file = it->second;

    const std::string& fname = purge_file.fname;
    const std::string& dir_to_sync = purge_file.dir_to_sync;
    FileType type = purge_file.type;
    uint64_t number = purge_file.number;
    int job_id = purge_file.job_id;

    purge_files_.erase(it);

    mutex_.Unlock();
    DeleteObsoleteFileImpl(job_id, fname, dir_to_sync, type, number);
    mutex_.Lock();
  }

  bg_purge_scheduled_--;

  bg_cv_.SignalAll();
  // IMPORTANT: there should be no code after calling SignalAll. This call may
  // signal the DB destructor that it's OK to proceed with destruction. In
  // that case, all DB variables will be deallocated and referencing them
  // will cause trouble.
  mutex_.Unlock();
}
1501
namespace {
// Cleanup state that NewInternalIterator() attaches to an internal iterator.
// It holds the SuperVersion the iterator references plus what is needed to
// release it.
struct IterState {
  IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version,
            bool _background_purge)
      : db(_db),
        mu(_mu),
        super_version(_super_version),
        background_purge(_background_purge) {}

  DBImpl* db;
  InstrumentedMutex* mu;        // DB mutex; held while releasing the SV.
  SuperVersion* super_version;  // SuperVersion referenced by the iterator.
  // When true, the SuperVersion and obsolete files are handed to the
  // background purge job instead of being freed on the calling thread.
  bool background_purge;
};

// Cleanup callback registered on internal iterators. `arg1` is a heap
// IterState, which is always deleted here.
//
// Drops the iterator's SuperVersion reference. If Unref() reports that the
// SuperVersion is no longer referenced (presumably the last reference --
// TODO confirm), it does the following:
//   - cleans up the SuperVersion;
//   - collects obsolete files;
//   - frees both inline, or queues them for the background purge job,
//     depending on state->background_purge.
static void CleanupIteratorState(void* arg1, void* /*arg2*/) {
  IterState* state = reinterpret_cast<IterState*>(arg1);

  if (state->super_version->Unref()) {
    // Job id == 0 means that this is not our background process, but rather
    // user thread
    JobContext job_context(0);

    state->mu->Lock();
    state->super_version->Cleanup();
    state->db->FindObsoleteFiles(&job_context, false, true);
    if (state->background_purge) {
      // Defer closing log writers and deleting the SuperVersion to the
      // background purge job.
      state->db->ScheduleBgLogWriterClose(&job_context);
      state->db->AddSuperVersionsToFreeQueue(state->super_version);
      state->db->SchedulePurge();
    }
    state->mu->Unlock();

    if (!state->background_purge) {
      delete state->super_version;
    }
    if (job_context.HaveSomethingToDelete()) {
      if (state->background_purge) {
        // PurgeObsoleteFiles here does not delete files. Instead, it adds the
        // files to be deleted to a job queue, and deletes it in a separate
        // background thread.
        state->db->PurgeObsoleteFiles(job_context, true /* schedule only */);
        state->mu->Lock();
        state->db->SchedulePurge();
        state->mu->Unlock();
      } else {
        state->db->PurgeObsoleteFiles(job_context);
      }
    }
    job_context.Clean();
  }

  delete state;
}
}  // namespace
1557
// Builds a merged internal iterator, allocated from `arena`, over three
// sources of `super_version`:
//   - the mutable memtable;
//   - the immutable memtables;
//   - the SST files, unless read_tier == kMemtableTier.
// Range tombstones are added to `range_del_agg` unless ignore_range_deletions
// is set.
//
// Takes over one reference on `super_version`. On success it is released by
// CleanupIteratorState when the iterator is cleaned up. On failure it is
// released here via CleanupSuperVersion() and an error iterator is returned.
InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
                                              ColumnFamilyData* cfd,
                                              SuperVersion* super_version,
                                              Arena* arena,
                                              RangeDelAggregator* range_del_agg,
                                              SequenceNumber sequence,
                                              bool allow_unprepared_value) {
  InternalIterator* internal_iter;
  assert(arena != nullptr);
  assert(range_del_agg != nullptr);
  // Need to create internal iterator from the arena.
  MergeIteratorBuilder merge_iter_builder(
      &cfd->internal_comparator(), arena,
      !read_options.total_order_seek &&
          super_version->mutable_cf_options.prefix_extractor != nullptr);
  // Collect iterator for mutable mem
  merge_iter_builder.AddIterator(
      super_version->mem->NewIterator(read_options, arena));
  std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter;
  Status s;
  if (!read_options.ignore_range_deletions) {
    range_del_iter.reset(
        super_version->mem->NewRangeTombstoneIterator(read_options, sequence));
    range_del_agg->AddTombstones(std::move(range_del_iter));
  }
  // Collect all needed child iterators for immutable memtables
  // (s has not been assigned yet, so this branch is always taken.)
  if (s.ok()) {
    super_version->imm->AddIterators(read_options, &merge_iter_builder);
    if (!read_options.ignore_range_deletions) {
      s = super_version->imm->AddRangeTombstoneIterators(read_options, arena,
                                                         range_del_agg);
    }
  }
  TEST_SYNC_POINT_CALLBACK("DBImpl::NewInternalIterator:StatusCallback", &s);
  if (s.ok()) {
    // Collect iterators for files in L0 - Ln
    if (read_options.read_tier != kMemtableTier) {
      super_version->current->AddIterators(read_options, file_options_,
                                           &merge_iter_builder, range_del_agg,
                                           allow_unprepared_value);
    }
    internal_iter = merge_iter_builder.Finish();
    // The cleanup callback owns `cleanup` and the SuperVersion reference.
    IterState* cleanup =
        new IterState(this, &mutex_, super_version,
                      read_options.background_purge_on_iterator_cleanup ||
                      immutable_db_options_.avoid_unnecessary_blocking_io);
    internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);

    return internal_iter;
  } else {
    CleanupSuperVersion(super_version);
  }
  return NewErrorInternalIterator<Slice>(s, arena);
}
1612
// Returns the handle of the default column family.
ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const {
  return default_cf_handle_;
}
1616
// Returns the handle of the column family used for persisted stats.
ColumnFamilyHandle* DBImpl::PersistentStatsColumnFamily() const {
  return persist_stats_cf_handle_;
}
1620
// Point lookup that does not return a timestamp; forwards to the timestamp
// overload with timestamp == nullptr.
Status DBImpl::Get(const ReadOptions& read_options,
                   ColumnFamilyHandle* column_family, const Slice& key,
                   PinnableSlice* value) {
  return Get(read_options, column_family, key, value, /*timestamp=*/nullptr);
}
1626
Get(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * value,std::string * timestamp)1627 Status DBImpl::Get(const ReadOptions& read_options,
1628 ColumnFamilyHandle* column_family, const Slice& key,
1629 PinnableSlice* value, std::string* timestamp) {
1630 GetImplOptions get_impl_options;
1631 get_impl_options.column_family = column_family;
1632 get_impl_options.value = value;
1633 get_impl_options.timestamp = timestamp;
1634 Status s = GetImpl(read_options, key, get_impl_options);
1635 return s;
1636 }
1637
namespace {
// ReadCallback that GetImpl() installs when the comparator has a non-zero
// timestamp size. An entry is visible only if its sequence number is
// <= max_visible_seq_.
class GetWithTimestampReadCallback : public ReadCallback {
 public:
  explicit GetWithTimestampReadCallback(SequenceNumber seq)
      : ReadCallback(seq) {}
  bool IsVisibleFullCheck(SequenceNumber seq) override {
    return seq <= max_visible_seq_;
  }
};
}  // namespace
1648
GetImpl(const ReadOptions & read_options,const Slice & key,GetImplOptions & get_impl_options)1649 Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
1650 GetImplOptions& get_impl_options) {
1651 assert(get_impl_options.value != nullptr ||
1652 get_impl_options.merge_operands != nullptr);
1653
1654 assert(get_impl_options.column_family);
1655 const Comparator* ucmp = get_impl_options.column_family->GetComparator();
1656 assert(ucmp);
1657 size_t ts_sz = ucmp->timestamp_size();
1658 GetWithTimestampReadCallback read_cb(0); // Will call Refresh
1659
1660 #ifndef NDEBUG
1661 if (ts_sz > 0) {
1662 assert(read_options.timestamp);
1663 assert(read_options.timestamp->size() == ts_sz);
1664 } else {
1665 assert(!read_options.timestamp);
1666 }
1667 #endif // NDEBUG
1668
1669 PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
1670 StopWatch sw(immutable_db_options_.clock, stats_, DB_GET);
1671 PERF_TIMER_GUARD(get_snapshot_time);
1672
1673 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(
1674 get_impl_options.column_family);
1675 auto cfd = cfh->cfd();
1676
1677 if (tracer_) {
1678 // TODO: This mutex should be removed later, to improve performance when
1679 // tracing is enabled.
1680 InstrumentedMutexLock lock(&trace_mutex_);
1681 if (tracer_) {
1682 // TODO: maybe handle the tracing status?
1683 tracer_->Get(get_impl_options.column_family, key).PermitUncheckedError();
1684 }
1685 }
1686
1687 // Acquire SuperVersion
1688 SuperVersion* sv = GetAndRefSuperVersion(cfd);
1689
1690 TEST_SYNC_POINT("DBImpl::GetImpl:1");
1691 TEST_SYNC_POINT("DBImpl::GetImpl:2");
1692
1693 SequenceNumber snapshot;
1694 if (read_options.snapshot != nullptr) {
1695 if (get_impl_options.callback) {
1696 // Already calculated based on read_options.snapshot
1697 snapshot = get_impl_options.callback->max_visible_seq();
1698 } else {
1699 snapshot =
1700 reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
1701 }
1702 } else {
1703 // Note that the snapshot is assigned AFTER referencing the super
1704 // version because otherwise a flush happening in between may compact away
1705 // data for the snapshot, so the reader would see neither data that was be
1706 // visible to the snapshot before compaction nor the newer data inserted
1707 // afterwards.
1708 if (last_seq_same_as_publish_seq_) {
1709 snapshot = versions_->LastSequence();
1710 } else {
1711 snapshot = versions_->LastPublishedSequence();
1712 }
1713 if (get_impl_options.callback) {
1714 // The unprep_seqs are not published for write unprepared, so it could be
1715 // that max_visible_seq is larger. Seek to the std::max of the two.
1716 // However, we still want our callback to contain the actual snapshot so
1717 // that it can do the correct visibility filtering.
1718 get_impl_options.callback->Refresh(snapshot);
1719
1720 // Internally, WriteUnpreparedTxnReadCallback::Refresh would set
1721 // max_visible_seq = max(max_visible_seq, snapshot)
1722 //
1723 // Currently, the commented out assert is broken by
1724 // InvalidSnapshotReadCallback, but if write unprepared recovery followed
1725 // the regular transaction flow, then this special read callback would not
1726 // be needed.
1727 //
1728 // assert(callback->max_visible_seq() >= snapshot);
1729 snapshot = get_impl_options.callback->max_visible_seq();
1730 }
1731 }
1732 // If timestamp is used, we use read callback to ensure <key,t,s> is returned
1733 // only if t <= read_opts.timestamp and s <= snapshot.
1734 if (ts_sz > 0) {
1735 assert(!get_impl_options
1736 .callback); // timestamp with callback is not supported
1737 read_cb.Refresh(snapshot);
1738 get_impl_options.callback = &read_cb;
1739 }
1740 TEST_SYNC_POINT("DBImpl::GetImpl:3");
1741 TEST_SYNC_POINT("DBImpl::GetImpl:4");
1742
1743 // Prepare to store a list of merge operations if merge occurs.
1744 MergeContext merge_context;
1745 SequenceNumber max_covering_tombstone_seq = 0;
1746
1747 Status s;
1748 // First look in the memtable, then in the immutable memtable (if any).
1749 // s is both in/out. When in, s could either be OK or MergeInProgress.
1750 // merge_operands will contain the sequence of merges in the latter case.
1751 LookupKey lkey(key, snapshot, read_options.timestamp);
1752 PERF_TIMER_STOP(get_snapshot_time);
1753
1754 bool skip_memtable = (read_options.read_tier == kPersistedTier &&
1755 has_unpersisted_data_.load(std::memory_order_relaxed));
1756 bool done = false;
1757 std::string* timestamp = ts_sz > 0 ? get_impl_options.timestamp : nullptr;
1758 if (!skip_memtable) {
1759 // Get value associated with key
1760 if (get_impl_options.get_value) {
1761 if (sv->mem->Get(lkey, get_impl_options.value->GetSelf(), timestamp, &s,
1762 &merge_context, &max_covering_tombstone_seq,
1763 read_options, get_impl_options.callback,
1764 get_impl_options.is_blob_index)) {
1765 done = true;
1766 get_impl_options.value->PinSelf();
1767 RecordTick(stats_, MEMTABLE_HIT);
1768 } else if ((s.ok() || s.IsMergeInProgress()) &&
1769 sv->imm->Get(lkey, get_impl_options.value->GetSelf(),
1770 timestamp, &s, &merge_context,
1771 &max_covering_tombstone_seq, read_options,
1772 get_impl_options.callback,
1773 get_impl_options.is_blob_index)) {
1774 done = true;
1775 get_impl_options.value->PinSelf();
1776 RecordTick(stats_, MEMTABLE_HIT);
1777 }
1778 } else {
1779 // Get Merge Operands associated with key, Merge Operands should not be
1780 // merged and raw values should be returned to the user.
1781 if (sv->mem->Get(lkey, /*value*/ nullptr, /*timestamp=*/nullptr, &s,
1782 &merge_context, &max_covering_tombstone_seq,
1783 read_options, nullptr, nullptr, false)) {
1784 done = true;
1785 RecordTick(stats_, MEMTABLE_HIT);
1786 } else if ((s.ok() || s.IsMergeInProgress()) &&
1787 sv->imm->GetMergeOperands(lkey, &s, &merge_context,
1788 &max_covering_tombstone_seq,
1789 read_options)) {
1790 done = true;
1791 RecordTick(stats_, MEMTABLE_HIT);
1792 }
1793 }
1794 if (!done && !s.ok() && !s.IsMergeInProgress()) {
1795 ReturnAndCleanupSuperVersion(cfd, sv);
1796 return s;
1797 }
1798 }
1799 if (!done) {
1800 PERF_TIMER_GUARD(get_from_output_files_time);
1801 sv->current->Get(
1802 read_options, lkey, get_impl_options.value, timestamp, &s,
1803 &merge_context, &max_covering_tombstone_seq,
1804 get_impl_options.get_value ? get_impl_options.value_found : nullptr,
1805 nullptr, nullptr,
1806 get_impl_options.get_value ? get_impl_options.callback : nullptr,
1807 get_impl_options.get_value ? get_impl_options.is_blob_index : nullptr,
1808 get_impl_options.get_value);
1809 RecordTick(stats_, MEMTABLE_MISS);
1810 }
1811
1812 {
1813 PERF_TIMER_GUARD(get_post_process_time);
1814
1815 ReturnAndCleanupSuperVersion(cfd, sv);
1816
1817 RecordTick(stats_, NUMBER_KEYS_READ);
1818 size_t size = 0;
1819 if (s.ok()) {
1820 if (get_impl_options.get_value) {
1821 size = get_impl_options.value->size();
1822 } else {
1823 // Return all merge operands for get_impl_options.key
1824 *get_impl_options.number_of_operands =
1825 static_cast<int>(merge_context.GetNumOperands());
1826 if (*get_impl_options.number_of_operands >
1827 get_impl_options.get_merge_operands_options
1828 ->expected_max_number_of_operands) {
1829 s = Status::Incomplete(
1830 Status::SubCode::KMergeOperandsInsufficientCapacity);
1831 } else {
1832 for (const Slice& sl : merge_context.GetOperands()) {
1833 size += sl.size();
1834 get_impl_options.merge_operands->PinSelf(sl);
1835 get_impl_options.merge_operands++;
1836 }
1837 }
1838 }
1839 RecordTick(stats_, BYTES_READ, size);
1840 PERF_COUNTER_ADD(get_read_bytes, size);
1841 }
1842 RecordInHistogram(stats_, BYTES_PER_READ, size);
1843 }
1844 return s;
1845 }
1846
MultiGet(const ReadOptions & read_options,const std::vector<ColumnFamilyHandle * > & column_family,const std::vector<Slice> & keys,std::vector<std::string> * values)1847 std::vector<Status> DBImpl::MultiGet(
1848 const ReadOptions& read_options,
1849 const std::vector<ColumnFamilyHandle*>& column_family,
1850 const std::vector<Slice>& keys, std::vector<std::string>* values) {
1851 return MultiGet(read_options, column_family, keys, values,
1852 /*timestamps=*/nullptr);
1853 }
1854
MultiGet(const ReadOptions & read_options,const std::vector<ColumnFamilyHandle * > & column_family,const std::vector<Slice> & keys,std::vector<std::string> * values,std::vector<std::string> * timestamps)1855 std::vector<Status> DBImpl::MultiGet(
1856 const ReadOptions& read_options,
1857 const std::vector<ColumnFamilyHandle*>& column_family,
1858 const std::vector<Slice>& keys, std::vector<std::string>* values,
1859 std::vector<std::string>* timestamps) {
1860 PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
1861 StopWatch sw(immutable_db_options_.clock, stats_, DB_MULTIGET);
1862 PERF_TIMER_GUARD(get_snapshot_time);
1863
1864 #ifndef NDEBUG
1865 for (const auto* cfh : column_family) {
1866 assert(cfh);
1867 const Comparator* const ucmp = cfh->GetComparator();
1868 assert(ucmp);
1869 if (ucmp->timestamp_size() > 0) {
1870 assert(read_options.timestamp);
1871 assert(ucmp->timestamp_size() == read_options.timestamp->size());
1872 } else {
1873 assert(!read_options.timestamp);
1874 }
1875 }
1876 #endif // NDEBUG
1877
1878 SequenceNumber consistent_seqnum;
1879
1880 std::unordered_map<uint32_t, MultiGetColumnFamilyData> multiget_cf_data(
1881 column_family.size());
1882 for (auto cf : column_family) {
1883 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(cf);
1884 auto cfd = cfh->cfd();
1885 if (multiget_cf_data.find(cfd->GetID()) == multiget_cf_data.end()) {
1886 multiget_cf_data.emplace(cfd->GetID(),
1887 MultiGetColumnFamilyData(cfh, nullptr));
1888 }
1889 }
1890
1891 std::function<MultiGetColumnFamilyData*(
1892 std::unordered_map<uint32_t, MultiGetColumnFamilyData>::iterator&)>
1893 iter_deref_lambda =
1894 [](std::unordered_map<uint32_t, MultiGetColumnFamilyData>::iterator&
1895 cf_iter) { return &cf_iter->second; };
1896
1897 bool unref_only =
1898 MultiCFSnapshot<std::unordered_map<uint32_t, MultiGetColumnFamilyData>>(
1899 read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
1900 &consistent_seqnum);
1901
1902 TEST_SYNC_POINT("DBImpl::MultiGet:AfterGetSeqNum1");
1903 TEST_SYNC_POINT("DBImpl::MultiGet:AfterGetSeqNum2");
1904
1905 // Contain a list of merge operations if merge occurs.
1906 MergeContext merge_context;
1907
1908 // Note: this always resizes the values array
1909 size_t num_keys = keys.size();
1910 std::vector<Status> stat_list(num_keys);
1911 values->resize(num_keys);
1912 if (timestamps) {
1913 timestamps->resize(num_keys);
1914 }
1915
1916 // Keep track of bytes that we read for statistics-recording later
1917 uint64_t bytes_read = 0;
1918 PERF_TIMER_STOP(get_snapshot_time);
1919
1920 // For each of the given keys, apply the entire "get" process as follows:
1921 // First look in the memtable, then in the immutable memtable (if any).
1922 // s is both in/out. When in, s could either be OK or MergeInProgress.
1923 // merge_operands will contain the sequence of merges in the latter case.
1924 size_t num_found = 0;
1925 size_t keys_read;
1926 uint64_t curr_value_size = 0;
1927
1928 GetWithTimestampReadCallback timestamp_read_callback(0);
1929 ReadCallback* read_callback = nullptr;
1930 if (read_options.timestamp && read_options.timestamp->size() > 0) {
1931 timestamp_read_callback.Refresh(consistent_seqnum);
1932 read_callback = ×tamp_read_callback;
1933 }
1934
1935 for (keys_read = 0; keys_read < num_keys; ++keys_read) {
1936 merge_context.Clear();
1937 Status& s = stat_list[keys_read];
1938 std::string* value = &(*values)[keys_read];
1939 std::string* timestamp = timestamps ? &(*timestamps)[keys_read] : nullptr;
1940
1941 LookupKey lkey(keys[keys_read], consistent_seqnum, read_options.timestamp);
1942 auto cfh =
1943 static_cast_with_check<ColumnFamilyHandleImpl>(column_family[keys_read]);
1944 SequenceNumber max_covering_tombstone_seq = 0;
1945 auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID());
1946 assert(mgd_iter != multiget_cf_data.end());
1947 auto mgd = mgd_iter->second;
1948 auto super_version = mgd.super_version;
1949 bool skip_memtable =
1950 (read_options.read_tier == kPersistedTier &&
1951 has_unpersisted_data_.load(std::memory_order_relaxed));
1952 bool done = false;
1953 if (!skip_memtable) {
1954 if (super_version->mem->Get(lkey, value, timestamp, &s, &merge_context,
1955 &max_covering_tombstone_seq, read_options,
1956 read_callback)) {
1957 done = true;
1958 RecordTick(stats_, MEMTABLE_HIT);
1959 } else if (super_version->imm->Get(lkey, value, timestamp, &s,
1960 &merge_context,
1961 &max_covering_tombstone_seq,
1962 read_options, read_callback)) {
1963 done = true;
1964 RecordTick(stats_, MEMTABLE_HIT);
1965 }
1966 }
1967 if (!done) {
1968 PinnableSlice pinnable_val;
1969 PERF_TIMER_GUARD(get_from_output_files_time);
1970 super_version->current->Get(
1971 read_options, lkey, &pinnable_val, timestamp, &s, &merge_context,
1972 &max_covering_tombstone_seq, /*value_found=*/nullptr,
1973 /*key_exists=*/nullptr,
1974 /*seq=*/nullptr, read_callback);
1975 value->assign(pinnable_val.data(), pinnable_val.size());
1976 RecordTick(stats_, MEMTABLE_MISS);
1977 }
1978
1979 if (s.ok()) {
1980 bytes_read += value->size();
1981 num_found++;
1982 curr_value_size += value->size();
1983 if (curr_value_size > read_options.value_size_soft_limit) {
1984 while (++keys_read < num_keys) {
1985 stat_list[keys_read] = Status::Aborted();
1986 }
1987 break;
1988 }
1989 }
1990 if (read_options.deadline.count() &&
1991 immutable_db_options_.clock->NowMicros() >
1992 static_cast<uint64_t>(read_options.deadline.count())) {
1993 break;
1994 }
1995 }
1996
1997 if (keys_read < num_keys) {
1998 // The only reason to break out of the loop is when the deadline is
1999 // exceeded
2000 assert(immutable_db_options_.clock->NowMicros() >
2001 static_cast<uint64_t>(read_options.deadline.count()));
2002 for (++keys_read; keys_read < num_keys; ++keys_read) {
2003 stat_list[keys_read] = Status::TimedOut();
2004 }
2005 }
2006
2007 // Post processing (decrement reference counts and record statistics)
2008 PERF_TIMER_GUARD(get_post_process_time);
2009 autovector<SuperVersion*> superversions_to_delete;
2010
2011 for (auto mgd_iter : multiget_cf_data) {
2012 auto mgd = mgd_iter.second;
2013 if (!unref_only) {
2014 ReturnAndCleanupSuperVersion(mgd.cfd, mgd.super_version);
2015 } else {
2016 mgd.cfd->GetSuperVersion()->Unref();
2017 }
2018 }
2019 RecordTick(stats_, NUMBER_MULTIGET_CALLS);
2020 RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
2021 RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found);
2022 RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
2023 RecordInHistogram(stats_, BYTES_PER_MULTIGET, bytes_read);
2024 PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
2025 PERF_TIMER_STOP(get_post_process_time);
2026
2027 return stat_list;
2028 }
2029
// Acquires a referenced SuperVersion for every column family in `cf_list` and
// stores in `*snapshot` a sequence number that is consistent across all of
// them.
//
// T is the container holding the per-CF MultiGetColumnFamilyData entries.
// `iter_deref_func` maps a T::iterator to its MultiGetColumnFamilyData.
// `callback`, if non-null, is consulted only on the single-column-family path
// when read_options.snapshot is set: *snapshot is raised to
// callback->max_visible_seq() if that is larger.
//
// Returns true iff the final retry of the multi-CF path was reached. In that
// case the SuperVersions were obtained with cfd->GetSuperVersion()->Ref()
// while holding mutex_, instead of with GetAndRefSuperVersion(). The callers
// in this file use the result (named `unref_only`) to choose how to release
// them.
template <class T>
bool DBImpl::MultiCFSnapshot(
    const ReadOptions& read_options, ReadCallback* callback,
    std::function<MultiGetColumnFamilyData*(typename T::iterator&)>&
        iter_deref_func,
    T* cf_list, SequenceNumber* snapshot) {
  PERF_TIMER_GUARD(get_snapshot_time);

  bool last_try = false;
  if (cf_list->size() == 1) {
    // Fast path for a single column family. We can simply get the thread local
    // super version
    auto cf_iter = cf_list->begin();
    auto node = iter_deref_func(cf_iter);
    node->super_version = GetAndRefSuperVersion(node->cfd);
    if (read_options.snapshot != nullptr) {
      // Note: In WritePrepared txns this is not necessary but not harmful
      // either. Because prep_seq > snapshot => commit_seq > snapshot so if
      // a snapshot is specified we should be fine with skipping seq numbers
      // that are greater than that.
      //
      // In WriteUnprepared, we cannot set snapshot in the lookup key because we
      // may skip uncommitted data that should be visible to the transaction for
      // reading own writes.
      *snapshot =
          static_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
      if (callback) {
        *snapshot = std::max(*snapshot, callback->max_visible_seq());
      }
    } else {
      // Since we get and reference the super version before getting
      // the snapshot number, without a mutex protection, it is possible
      // that a memtable switch happened in the middle and not all the
      // data for this snapshot is available. But it will contain all
      // the data available in the super version we have, which is also
      // a valid snapshot to read from.
      // We shouldn't get snapshot before finding and referencing the super
      // version because a flush happening in between may compact away data for
      // the snapshot, but the snapshot is earlier than the data overwriting it,
      // so users may see wrong results.
      if (last_seq_same_as_publish_seq_) {
        *snapshot = versions_->LastSequence();
      } else {
        *snapshot = versions_->LastPublishedSequence();
      }
    }
  } else {
    // If we end up with the same issue of memtable getting sealed during 2
    // consecutive retries, it means the write rate is very high. In that case
    // it's probably ok to take the mutex on the 3rd try so we can succeed for
    // sure
    static const int num_retries = 3;
    for (int i = 0; i < num_retries; ++i) {
      last_try = (i == num_retries - 1);
      bool retry = false;

      if (i > 0) {
        // Release the SuperVersions acquired by the previous (non-final) try.
        // A previous try may have stopped early, leaving some entries null.
        for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
             ++cf_iter) {
          auto node = iter_deref_func(cf_iter);
          SuperVersion* super_version = node->super_version;
          ColumnFamilyData* cfd = node->cfd;
          if (super_version != nullptr) {
            ReturnAndCleanupSuperVersion(cfd, super_version);
          }
          node->super_version = nullptr;
        }
      }
      if (read_options.snapshot == nullptr) {
        if (last_try) {
          TEST_SYNC_POINT("DBImpl::MultiGet::LastTry");
          // We're close to max number of retries. For the last retry,
          // acquire the lock so we're sure to succeed
          mutex_.Lock();
        }
        if (last_seq_same_as_publish_seq_) {
          *snapshot = versions_->LastSequence();
        } else {
          *snapshot = versions_->LastPublishedSequence();
        }
      } else {
        // With a user-provided snapshot no retry is ever requested (see the
        // `continue` below), so this branch never reaches the locked last try.
        *snapshot = reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
                        ->number_;
      }
      for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
           ++cf_iter) {
        auto node = iter_deref_func(cf_iter);
        if (!last_try) {
          node->super_version = GetAndRefSuperVersion(node->cfd);
        } else {
          // mutex_ is held here; reference the current SuperVersion directly.
          node->super_version = node->cfd->GetSuperVersion()->Ref();
        }
        TEST_SYNC_POINT("DBImpl::MultiGet::AfterRefSV");
        if (read_options.snapshot != nullptr || last_try) {
          // If user passed a snapshot, then we don't care if a memtable is
          // sealed or compaction happens because the snapshot would ensure
          // that older key versions are kept around. If this is the last
          // retry, then we have the lock so nothing bad can happen
          continue;
        }
        // We could get the earliest sequence number for the whole list of
        // memtables, which will include immutable memtables as well, but that
        // might be tricky to maintain in case we decide, in future, to do
        // memtable compaction.
        if (!last_try) {
          // A mutable memtable whose earliest sequence number is newer than
          // *snapshot indicates the memtable was switched after *snapshot was
          // read, so start over.
          SequenceNumber seq =
              node->super_version->mem->GetEarliestSequenceNumber();
          if (seq > *snapshot) {
            retry = true;
            break;
          }
        }
      }
      if (!retry) {
        if (last_try) {
          // Pairs with the mutex_.Lock() taken for the last try above.
          mutex_.Unlock();
        }
        break;
      }
    }
  }

  // Stop timing snapshot acquisition.
  PERF_TIMER_STOP(get_snapshot_time);

  return last_try;
}
2157
MultiGet(const ReadOptions & read_options,const size_t num_keys,ColumnFamilyHandle ** column_families,const Slice * keys,PinnableSlice * values,Status * statuses,const bool sorted_input)2158 void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
2159 ColumnFamilyHandle** column_families, const Slice* keys,
2160 PinnableSlice* values, Status* statuses,
2161 const bool sorted_input) {
2162 return MultiGet(read_options, num_keys, column_families, keys, values,
2163 /*timestamps=*/nullptr, statuses, sorted_input);
2164 }
2165
MultiGet(const ReadOptions & read_options,const size_t num_keys,ColumnFamilyHandle ** column_families,const Slice * keys,PinnableSlice * values,std::string * timestamps,Status * statuses,const bool sorted_input)2166 void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
2167 ColumnFamilyHandle** column_families, const Slice* keys,
2168 PinnableSlice* values, std::string* timestamps,
2169 Status* statuses, const bool sorted_input) {
2170 if (num_keys == 0) {
2171 return;
2172 }
2173
2174 #ifndef NDEBUG
2175 for (size_t i = 0; i < num_keys; ++i) {
2176 ColumnFamilyHandle* cfh = column_families[i];
2177 assert(cfh);
2178 const Comparator* const ucmp = cfh->GetComparator();
2179 assert(ucmp);
2180 if (ucmp->timestamp_size() > 0) {
2181 assert(read_options.timestamp);
2182 assert(read_options.timestamp->size() == ucmp->timestamp_size());
2183 } else {
2184 assert(!read_options.timestamp);
2185 }
2186 }
2187 #endif // NDEBUG
2188
2189 autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
2190 autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
2191 sorted_keys.resize(num_keys);
2192 for (size_t i = 0; i < num_keys; ++i) {
2193 key_context.emplace_back(column_families[i], keys[i], &values[i],
2194 timestamps ? ×tamps[i] : nullptr,
2195 &statuses[i]);
2196 }
2197 for (size_t i = 0; i < num_keys; ++i) {
2198 sorted_keys[i] = &key_context[i];
2199 }
2200 PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys);
2201
2202 autovector<MultiGetColumnFamilyData, MultiGetContext::MAX_BATCH_SIZE>
2203 multiget_cf_data;
2204 size_t cf_start = 0;
2205 ColumnFamilyHandle* cf = sorted_keys[0]->column_family;
2206 for (size_t i = 0; i < num_keys; ++i) {
2207 KeyContext* key_ctx = sorted_keys[i];
2208 if (key_ctx->column_family != cf) {
2209 multiget_cf_data.emplace_back(
2210 MultiGetColumnFamilyData(cf, cf_start, i - cf_start, nullptr));
2211 cf_start = i;
2212 cf = key_ctx->column_family;
2213 }
2214 }
2215 {
2216 // multiget_cf_data.emplace_back(
2217 // MultiGetColumnFamilyData(cf, cf_start, num_keys - cf_start, nullptr));
2218 multiget_cf_data.emplace_back(cf, cf_start, num_keys - cf_start, nullptr);
2219 }
2220 std::function<MultiGetColumnFamilyData*(
2221 autovector<MultiGetColumnFamilyData,
2222 MultiGetContext::MAX_BATCH_SIZE>::iterator&)>
2223 iter_deref_lambda =
2224 [](autovector<MultiGetColumnFamilyData,
2225 MultiGetContext::MAX_BATCH_SIZE>::iterator& cf_iter) {
2226 return &(*cf_iter);
2227 };
2228
2229 SequenceNumber consistent_seqnum;
2230 bool unref_only = MultiCFSnapshot<
2231 autovector<MultiGetColumnFamilyData, MultiGetContext::MAX_BATCH_SIZE>>(
2232 read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
2233 &consistent_seqnum);
2234
2235 GetWithTimestampReadCallback timestamp_read_callback(0);
2236 ReadCallback* read_callback = nullptr;
2237 if (read_options.timestamp && read_options.timestamp->size() > 0) {
2238 timestamp_read_callback.Refresh(consistent_seqnum);
2239 read_callback = ×tamp_read_callback;
2240 }
2241
2242 Status s;
2243 auto cf_iter = multiget_cf_data.begin();
2244 for (; cf_iter != multiget_cf_data.end(); ++cf_iter) {
2245 s = MultiGetImpl(read_options, cf_iter->start, cf_iter->num_keys,
2246 &sorted_keys, cf_iter->super_version, consistent_seqnum,
2247 read_callback);
2248 if (!s.ok()) {
2249 break;
2250 }
2251 }
2252 if (!s.ok()) {
2253 assert(s.IsTimedOut() || s.IsAborted());
2254 for (++cf_iter; cf_iter != multiget_cf_data.end(); ++cf_iter) {
2255 for (size_t i = cf_iter->start; i < cf_iter->start + cf_iter->num_keys;
2256 ++i) {
2257 *sorted_keys[i]->s = s;
2258 }
2259 }
2260 }
2261
2262 for (const auto& iter : multiget_cf_data) {
2263 if (!unref_only) {
2264 ReturnAndCleanupSuperVersion(iter.cfd, iter.super_version);
2265 } else {
2266 iter.cfd->GetSuperVersion()->Unref();
2267 }
2268 }
2269 }
2270
2271 namespace {
2272 // Order keys by CF ID, followed by key contents
2273 struct CompareKeyContext {
operator ()ROCKSDB_NAMESPACE::__anonf5eb00d20611::CompareKeyContext2274 inline bool operator()(const KeyContext* lhs, const KeyContext* rhs) {
2275 ColumnFamilyHandleImpl* cfh =
2276 static_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
2277 uint32_t cfd_id1 = cfh->cfd()->GetID();
2278 const Comparator* comparator = cfh->cfd()->user_comparator();
2279 cfh = static_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
2280 uint32_t cfd_id2 = cfh->cfd()->GetID();
2281
2282 if (cfd_id1 < cfd_id2) {
2283 return true;
2284 } else if (cfd_id1 > cfd_id2) {
2285 return false;
2286 }
2287
2288 // Both keys are from the same column family
2289 int cmp = comparator->CompareWithoutTimestamp(
2290 *(lhs->key), /*a_has_ts=*/false, *(rhs->key), /*b_has_ts=*/false);
2291 if (cmp < 0) {
2292 return true;
2293 }
2294 return false;
2295 }
2296 };
2297
2298 } // anonymous namespace
2299
PrepareMultiGetKeys(size_t num_keys,bool sorted_input,autovector<KeyContext *,MultiGetContext::MAX_BATCH_SIZE> * sorted_keys)2300 void DBImpl::PrepareMultiGetKeys(
2301 size_t num_keys, bool sorted_input,
2302 autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
2303 #ifndef NDEBUG
2304 if (sorted_input) {
2305 for (size_t index = 0; index < sorted_keys->size(); ++index) {
2306 if (index > 0) {
2307 KeyContext* lhs = (*sorted_keys)[index - 1];
2308 KeyContext* rhs = (*sorted_keys)[index];
2309 ColumnFamilyHandleImpl* cfh =
2310 static_cast_with_check<ColumnFamilyHandleImpl>(lhs->column_family);
2311 uint32_t cfd_id1 = cfh->cfd()->GetID();
2312 const Comparator* comparator = cfh->cfd()->user_comparator();
2313 cfh =
2314 static_cast_with_check<ColumnFamilyHandleImpl>(lhs->column_family);
2315 uint32_t cfd_id2 = cfh->cfd()->GetID();
2316
2317 assert(cfd_id1 <= cfd_id2);
2318 if (cfd_id1 < cfd_id2) {
2319 continue;
2320 }
2321
2322 // Both keys are from the same column family
2323 int cmp = comparator->CompareWithoutTimestamp(
2324 *(lhs->key), /*a_has_ts=*/false, *(rhs->key), /*b_has_ts=*/false);
2325 assert(cmp <= 0);
2326 }
2327 index++;
2328 }
2329 }
2330 #endif
2331 if (!sorted_input) {
2332 CompareKeyContext sort_comparator;
2333 std::sort(sorted_keys->begin(), sorted_keys->begin() + num_keys,
2334 sort_comparator);
2335 }
2336 }
2337
MultiGet(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const size_t num_keys,const Slice * keys,PinnableSlice * values,Status * statuses,const bool sorted_input)2338 void DBImpl::MultiGet(const ReadOptions& read_options,
2339 ColumnFamilyHandle* column_family, const size_t num_keys,
2340 const Slice* keys, PinnableSlice* values,
2341 Status* statuses, const bool sorted_input) {
2342 return MultiGet(read_options, column_family, num_keys, keys, values,
2343 /*timestamp=*/nullptr, statuses, sorted_input);
2344 }
2345
MultiGet(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const size_t num_keys,const Slice * keys,PinnableSlice * values,std::string * timestamps,Status * statuses,const bool sorted_input)2346 void DBImpl::MultiGet(const ReadOptions& read_options,
2347 ColumnFamilyHandle* column_family, const size_t num_keys,
2348 const Slice* keys, PinnableSlice* values,
2349 std::string* timestamps, Status* statuses,
2350 const bool sorted_input) {
2351 autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
2352 autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
2353 sorted_keys.resize(num_keys);
2354 for (size_t i = 0; i < num_keys; ++i) {
2355 key_context.emplace_back(column_family, keys[i], &values[i],
2356 timestamps ? ×tamps[i] : nullptr,
2357 &statuses[i]);
2358 }
2359 for (size_t i = 0; i < num_keys; ++i) {
2360 sorted_keys[i] = &key_context[i];
2361 }
2362 PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys);
2363 MultiGetWithCallback(read_options, column_family, nullptr, &sorted_keys);
2364 }
2365
MultiGetWithCallback(const ReadOptions & read_options,ColumnFamilyHandle * column_family,ReadCallback * callback,autovector<KeyContext *,MultiGetContext::MAX_BATCH_SIZE> * sorted_keys)2366 void DBImpl::MultiGetWithCallback(
2367 const ReadOptions& read_options, ColumnFamilyHandle* column_family,
2368 ReadCallback* callback,
2369 autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
2370 std::array<MultiGetColumnFamilyData, 1> multiget_cf_data;
2371 multiget_cf_data[0] = MultiGetColumnFamilyData(column_family, nullptr);
2372 std::function<MultiGetColumnFamilyData*(
2373 std::array<MultiGetColumnFamilyData, 1>::iterator&)>
2374 iter_deref_lambda =
2375 [](std::array<MultiGetColumnFamilyData, 1>::iterator& cf_iter) {
2376 return &(*cf_iter);
2377 };
2378
2379 size_t num_keys = sorted_keys->size();
2380 SequenceNumber consistent_seqnum;
2381 bool unref_only = MultiCFSnapshot<std::array<MultiGetColumnFamilyData, 1>>(
2382 read_options, callback, iter_deref_lambda, &multiget_cf_data,
2383 &consistent_seqnum);
2384 #ifndef NDEBUG
2385 assert(!unref_only);
2386 #else
2387 // Silence unused variable warning
2388 (void)unref_only;
2389 #endif // NDEBUG
2390
2391 if (callback && read_options.snapshot == nullptr) {
2392 // The unprep_seqs are not published for write unprepared, so it could be
2393 // that max_visible_seq is larger. Seek to the std::max of the two.
2394 // However, we still want our callback to contain the actual snapshot so
2395 // that it can do the correct visibility filtering.
2396 callback->Refresh(consistent_seqnum);
2397
2398 // Internally, WriteUnpreparedTxnReadCallback::Refresh would set
2399 // max_visible_seq = max(max_visible_seq, snapshot)
2400 //
2401 // Currently, the commented out assert is broken by
2402 // InvalidSnapshotReadCallback, but if write unprepared recovery followed
2403 // the regular transaction flow, then this special read callback would not
2404 // be needed.
2405 //
2406 // assert(callback->max_visible_seq() >= snapshot);
2407 consistent_seqnum = callback->max_visible_seq();
2408 }
2409
2410 GetWithTimestampReadCallback timestamp_read_callback(0);
2411 ReadCallback* read_callback = callback;
2412 if (read_options.timestamp && read_options.timestamp->size() > 0) {
2413 assert(!read_callback); // timestamp with callback is not supported
2414 timestamp_read_callback.Refresh(consistent_seqnum);
2415 read_callback = ×tamp_read_callback;
2416 }
2417
2418 Status s = MultiGetImpl(read_options, 0, num_keys, sorted_keys,
2419 multiget_cf_data[0].super_version, consistent_seqnum,
2420 read_callback);
2421 assert(s.ok() || s.IsTimedOut() || s.IsAborted());
2422 ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd,
2423 multiget_cf_data[0].super_version);
2424 }
2425
2426 // The actual implementation of batched MultiGet. Parameters -
2427 // start_key - Index in the sorted_keys vector to start processing from
2428 // num_keys - Number of keys to lookup, starting with sorted_keys[start_key]
2429 // sorted_keys - The entire batch of sorted keys for this CF
2430 //
2431 // The per key status is returned in the KeyContext structures pointed to by
2432 // sorted_keys. An overall Status is also returned, with the only possible
2433 // values being Status::OK() and Status::TimedOut(). The latter indicates
2434 // that the call exceeded read_options.deadline
MultiGetImpl(const ReadOptions & read_options,size_t start_key,size_t num_keys,autovector<KeyContext *,MultiGetContext::MAX_BATCH_SIZE> * sorted_keys,SuperVersion * super_version,SequenceNumber snapshot,ReadCallback * callback)2435 Status DBImpl::MultiGetImpl(
2436 const ReadOptions& read_options, size_t start_key, size_t num_keys,
2437 autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys,
2438 SuperVersion* super_version, SequenceNumber snapshot,
2439 ReadCallback* callback) {
2440 PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
2441 StopWatch sw(immutable_db_options_.clock, stats_, DB_MULTIGET);
2442
2443 // For each of the given keys, apply the entire "get" process as follows:
2444 // First look in the memtable, then in the immutable memtable (if any).
2445 // s is both in/out. When in, s could either be OK or MergeInProgress.
2446 // merge_operands will contain the sequence of merges in the latter case.
2447 size_t keys_left = num_keys;
2448 Status s;
2449 uint64_t curr_value_size = 0;
2450 while (keys_left) {
2451 if (read_options.deadline.count() &&
2452 immutable_db_options_.clock->NowMicros() >
2453 static_cast<uint64_t>(read_options.deadline.count())) {
2454 s = Status::TimedOut();
2455 break;
2456 }
2457
2458 size_t batch_size = (keys_left > MultiGetContext::MAX_BATCH_SIZE)
2459 ? MultiGetContext::MAX_BATCH_SIZE
2460 : keys_left;
2461 MultiGetContext ctx(sorted_keys, start_key + num_keys - keys_left,
2462 batch_size, snapshot, read_options);
2463 MultiGetRange range = ctx.GetMultiGetRange();
2464 range.AddValueSize(curr_value_size);
2465 bool lookup_current = false;
2466
2467 keys_left -= batch_size;
2468 for (auto mget_iter = range.begin(); mget_iter != range.end();
2469 ++mget_iter) {
2470 mget_iter->merge_context.Clear();
2471 *mget_iter->s = Status::OK();
2472 }
2473
2474 bool skip_memtable =
2475 (read_options.read_tier == kPersistedTier &&
2476 has_unpersisted_data_.load(std::memory_order_relaxed));
2477 if (!skip_memtable) {
2478 super_version->mem->MultiGet(read_options, &range, callback);
2479 if (!range.empty()) {
2480 super_version->imm->MultiGet(read_options, &range, callback);
2481 }
2482 if (!range.empty()) {
2483 lookup_current = true;
2484 uint64_t left = range.KeysLeft();
2485 RecordTick(stats_, MEMTABLE_MISS, left);
2486 }
2487 }
2488 if (lookup_current) {
2489 PERF_TIMER_GUARD(get_from_output_files_time);
2490 super_version->current->MultiGet(read_options, &range, callback);
2491 }
2492 curr_value_size = range.GetValueSize();
2493 if (curr_value_size > read_options.value_size_soft_limit) {
2494 s = Status::Aborted();
2495 break;
2496 }
2497 }
2498
2499 // Post processing (decrement reference counts and record statistics)
2500 PERF_TIMER_GUARD(get_post_process_time);
2501 size_t num_found = 0;
2502 uint64_t bytes_read = 0;
2503 for (size_t i = start_key; i < start_key + num_keys - keys_left; ++i) {
2504 KeyContext* key = (*sorted_keys)[i];
2505 if (key->s->ok()) {
2506 bytes_read += key->value->size();
2507 num_found++;
2508 }
2509 }
2510 if (keys_left) {
2511 assert(s.IsTimedOut() || s.IsAborted());
2512 for (size_t i = start_key + num_keys - keys_left; i < start_key + num_keys;
2513 ++i) {
2514 KeyContext* key = (*sorted_keys)[i];
2515 *key->s = s;
2516 }
2517 }
2518
2519 RecordTick(stats_, NUMBER_MULTIGET_CALLS);
2520 RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
2521 RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found);
2522 RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
2523 RecordInHistogram(stats_, BYTES_PER_MULTIGET, bytes_read);
2524 PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
2525 PERF_TIMER_STOP(get_post_process_time);
2526
2527 return s;
2528 }
2529
CreateColumnFamily(const ColumnFamilyOptions & cf_options,const std::string & column_family,ColumnFamilyHandle ** handle)2530 Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
2531 const std::string& column_family,
2532 ColumnFamilyHandle** handle) {
2533 assert(handle != nullptr);
2534 Status s = CreateColumnFamilyImpl(cf_options, column_family, handle);
2535 if (s.ok()) {
2536 s = WriteOptionsFile(true /*need_mutex_lock*/,
2537 true /*need_enter_write_thread*/);
2538 }
2539 return s;
2540 }
2541
CreateColumnFamilies(const ColumnFamilyOptions & cf_options,const std::vector<std::string> & column_family_names,std::vector<ColumnFamilyHandle * > * handles)2542 Status DBImpl::CreateColumnFamilies(
2543 const ColumnFamilyOptions& cf_options,
2544 const std::vector<std::string>& column_family_names,
2545 std::vector<ColumnFamilyHandle*>* handles) {
2546 assert(handles != nullptr);
2547 handles->clear();
2548 size_t num_cf = column_family_names.size();
2549 Status s;
2550 bool success_once = false;
2551 for (size_t i = 0; i < num_cf; i++) {
2552 ColumnFamilyHandle* handle;
2553 s = CreateColumnFamilyImpl(cf_options, column_family_names[i], &handle);
2554 if (!s.ok()) {
2555 break;
2556 }
2557 handles->push_back(handle);
2558 success_once = true;
2559 }
2560 if (success_once) {
2561 Status persist_options_status = WriteOptionsFile(
2562 true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
2563 if (s.ok() && !persist_options_status.ok()) {
2564 s = persist_options_status;
2565 }
2566 }
2567 return s;
2568 }
2569
CreateColumnFamilies(const std::vector<ColumnFamilyDescriptor> & column_families,std::vector<ColumnFamilyHandle * > * handles)2570 Status DBImpl::CreateColumnFamilies(
2571 const std::vector<ColumnFamilyDescriptor>& column_families,
2572 std::vector<ColumnFamilyHandle*>* handles) {
2573 assert(handles != nullptr);
2574 handles->clear();
2575 size_t num_cf = column_families.size();
2576 Status s;
2577 bool success_once = false;
2578 for (size_t i = 0; i < num_cf; i++) {
2579 ColumnFamilyHandle* handle;
2580 s = CreateColumnFamilyImpl(column_families[i].options,
2581 column_families[i].name, &handle);
2582 if (!s.ok()) {
2583 break;
2584 }
2585 handles->push_back(handle);
2586 success_once = true;
2587 }
2588 if (success_once) {
2589 Status persist_options_status = WriteOptionsFile(
2590 true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
2591 if (s.ok() && !persist_options_status.ok()) {
2592 s = persist_options_status;
2593 }
2594 }
2595 return s;
2596 }
2597
// Creates a single column family named `column_family_name` with
// `cf_options`. It does not rewrite the options file; the public
// CreateColumnFamily()/CreateColumnFamilies() wrappers do that.
//
// `*handle` is reset to nullptr up front. On success it receives a newly
// allocated ColumnFamilyHandleImpl.
//
// Returns a non-OK status in these cases:
// - option validation fails;
// - a cf_paths directory cannot be created;
// - the name is already in use;
// - the MANIFEST update (LogAndApply) fails;
// - adding the column family's directories fails.
Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
                                      const std::string& column_family_name,
                                      ColumnFamilyHandle** handle) {
  Status s;
  *handle = nullptr;

  // Validate the new column family's options against the current DB options
  // and make sure every configured cf_path exists. This happens before the
  // DB mutex is taken.
  DBOptions db_options =
      BuildDBOptions(immutable_db_options_, mutable_db_options_);
  s = ColumnFamilyData::ValidateOptions(db_options, cf_options);
  if (s.ok()) {
    for (auto& cf_path : cf_options.cf_paths) {
      s = env_->CreateDirIfMissing(cf_path.path);
      if (!s.ok()) {
        break;
      }
    }
  }
  if (!s.ok()) {
    return s;
  }

  // Filled in by InstallSuperVersionAndScheduleWork() below. Clean() is
  // called after the mutex is released.
  SuperVersionContext sv_context(/* create_superversion */ true);
  {
    InstrumentedMutexLock l(&mutex_);

    if (versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name) !=
        nullptr) {
      return Status::InvalidArgument("Column family already exists");
    }
    // Describe the new column family: name, next free ID, the current WAL
    // number and the comparator name.
    VersionEdit edit;
    edit.AddColumnFamily(column_family_name);
    uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
    edit.SetColumnFamily(new_id);
    edit.SetLogNumber(logfile_number_);
    edit.SetComparatorName(cf_options.comparator->Name());

    {  // write thread
      // Enter the write thread unbatched so the MANIFEST update is not
      // interleaved with a write group.
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      // LogAndApply will both write the creation in MANIFEST and create
      // ColumnFamilyData object
      s = versions_->LogAndApply(nullptr, MutableCFOptions(cf_options), &edit,
                                 &mutex_, directories_.GetDbDir(), false,
                                 &cf_options);
      write_thread_.ExitUnbatched(&w);
    }
    if (s.ok()) {
      // The ColumnFamilyData now exists; register its data directories.
      auto* cfd =
          versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
      assert(cfd != nullptr);
      std::map<std::string, std::shared_ptr<FSDirectory>> dummy_created_dirs;
      s = cfd->AddDirectories(&dummy_created_dirs);
    }
    if (s.ok()) {
      single_column_family_mode_ = false;
      auto* cfd =
          versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
      assert(cfd != nullptr);
      InstallSuperVersionAndScheduleWork(cfd, &sv_context,
                                         *cfd->GetLatestMutableCFOptions());

      // A single memtable without snapshot support disables snapshots for
      // the whole DB (GetSnapshotImpl() checks is_snapshot_supported_).
      if (!cfd->mem()->IsSnapshotSupported()) {
        is_snapshot_supported_ = false;
      }

      cfd->set_initialized();

      *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Created column family [%s] (ID %u)",
                     column_family_name.c_str(), (unsigned)cfd->GetID());
    } else {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "Creating column family [%s] FAILED -- %s",
                      column_family_name.c_str(), s.ToString().c_str());
    }
  }  // InstrumentedMutexLock l(&mutex_)

  sv_context.Clean();
  // this is outside the mutex
  if (s.ok()) {
    NewThreadStatusCfInfo(
        static_cast_with_check<ColumnFamilyHandleImpl>(*handle)->cfd());
  }
  return s;
}
2686
DropColumnFamily(ColumnFamilyHandle * column_family)2687 Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
2688 assert(column_family != nullptr);
2689 Status s = DropColumnFamilyImpl(column_family);
2690 if (s.ok()) {
2691 s = WriteOptionsFile(true /*need_mutex_lock*/,
2692 true /*need_enter_write_thread*/);
2693 }
2694 return s;
2695 }
2696
DropColumnFamilies(const std::vector<ColumnFamilyHandle * > & column_families)2697 Status DBImpl::DropColumnFamilies(
2698 const std::vector<ColumnFamilyHandle*>& column_families) {
2699 Status s;
2700 bool success_once = false;
2701 for (auto* handle : column_families) {
2702 s = DropColumnFamilyImpl(handle);
2703 if (!s.ok()) {
2704 break;
2705 }
2706 success_once = true;
2707 }
2708 if (success_once) {
2709 Status persist_options_status = WriteOptionsFile(
2710 true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
2711 if (s.ok() && !persist_options_status.ok()) {
2712 s = persist_options_status;
2713 }
2714 }
2715 return s;
2716 }
2717
// Drops the column family behind `column_family` by recording a drop edit in
// the MANIFEST. It does not rewrite the options file.
//
// Returns InvalidArgument in two cases:
// - for the default column family (ID 0);
// - if the column family was already dropped.
// Otherwise returns the LogAndApply() status. On success the memtable budget
// (max_total_in_memory_state_) is reduced by this column family's
// write_buffer_size * max_write_buffer_number.
Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) {
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  auto cfd = cfh->cfd();
  if (cfd->GetID() == 0) {
    return Status::InvalidArgument("Can't drop default column family");
  }

  // Sampled before the drop: if this column family was the one preventing
  // snapshots, is_snapshot_supported_ must be recomputed afterwards.
  bool cf_support_snapshot = cfd->mem()->IsSnapshotSupported();

  VersionEdit edit;
  edit.DropColumnFamily();
  edit.SetColumnFamily(cfd->GetID());

  Status s;
  {
    InstrumentedMutexLock l(&mutex_);
    if (cfd->IsDropped()) {
      s = Status::InvalidArgument("Column family already dropped!\n");
    }
    if (s.ok()) {
      // we drop column family from a single write thread
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit,
                                 &mutex_);
      write_thread_.ExitUnbatched(&w);
    }
    if (s.ok()) {
      // Return this column family's share of the in-memory budget.
      auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
      max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size *
                                    mutable_cf_options->max_write_buffer_number;
    }

    if (!cf_support_snapshot) {
      // Dropped Column Family doesn't support snapshot. Need to recalculate
      // is_snapshot_supported_.
      bool new_is_snapshot_supported = true;
      for (auto c : *versions_->GetColumnFamilySet()) {
        if (!c->IsDropped() && !c->mem()->IsSnapshotSupported()) {
          new_is_snapshot_supported = false;
          break;
        }
      }
      is_snapshot_supported_ = new_is_snapshot_supported;
    }
    // Wake every waiter on bg_cv_ so it can observe the updated state.
    bg_cv_.SignalAll();
  }

  if (s.ok()) {
    // Note that here we erase the associated cf_info of the to-be-dropped
    // cfd before its ref-count goes to zero to avoid having to erase cf_info
    // later inside db_mutex.
    EraseThreadStatusCfInfo(cfd);
    assert(cfd->IsDropped());
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "Dropped column family with id %u\n", cfd->GetID());
  } else {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                    "Dropping column family with id %u FAILED -- %s\n",
                    cfd->GetID(), s.ToString().c_str());
  }

  return s;
}
2782
KeyMayExist(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,std::string * value,std::string * timestamp,bool * value_found)2783 bool DBImpl::KeyMayExist(const ReadOptions& read_options,
2784 ColumnFamilyHandle* column_family, const Slice& key,
2785 std::string* value, std::string* timestamp,
2786 bool* value_found) {
2787 assert(value != nullptr);
2788 if (value_found != nullptr) {
2789 // falsify later if key-may-exist but can't fetch value
2790 *value_found = true;
2791 }
2792 ReadOptions roptions = read_options;
2793 roptions.read_tier = kBlockCacheTier; // read from block cache only
2794 PinnableSlice pinnable_val;
2795 GetImplOptions get_impl_options;
2796 get_impl_options.column_family = column_family;
2797 get_impl_options.value = &pinnable_val;
2798 get_impl_options.value_found = value_found;
2799 get_impl_options.timestamp = timestamp;
2800 auto s = GetImpl(roptions, key, get_impl_options);
2801 value->assign(pinnable_val.data(), pinnable_val.size());
2802
2803 // If block_cache is enabled and the index block of the table didn't
2804 // not present in block_cache, the return value will be Status::Incomplete.
2805 // In this case, key may still exist in the table.
2806 return s.ok() || s.IsIncomplete();
2807 }
2808
NewIterator(const ReadOptions & read_options,ColumnFamilyHandle * column_family)2809 Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
2810 ColumnFamilyHandle* column_family) {
2811 if (read_options.managed) {
2812 return NewErrorIterator(
2813 Status::NotSupported("Managed iterator is not supported anymore."));
2814 }
2815 Iterator* result = nullptr;
2816 if (read_options.read_tier == kPersistedTier) {
2817 return NewErrorIterator(Status::NotSupported(
2818 "ReadTier::kPersistedData is not yet supported in iterators."));
2819 }
2820 // if iterator wants internal keys, we can only proceed if
2821 // we can guarantee the deletes haven't been processed yet
2822 if (immutable_db_options_.preserve_deletes &&
2823 read_options.iter_start_seqnum > 0 &&
2824 read_options.iter_start_seqnum < preserve_deletes_seqnum_.load()) {
2825 return NewErrorIterator(Status::InvalidArgument(
2826 "Iterator requested internal keys which are too old and are not"
2827 " guaranteed to be preserved, try larger iter_start_seqnum opt."));
2828 }
2829 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
2830 ColumnFamilyData* cfd = cfh->cfd();
2831 assert(cfd != nullptr);
2832 ReadCallback* read_callback = nullptr; // No read callback provided.
2833 if (read_options.tailing) {
2834 #ifdef ROCKSDB_LITE
2835 // not supported in lite version
2836 result = nullptr;
2837
2838 #else
2839 SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
2840 auto iter = new ForwardIterator(this, read_options, cfd, sv,
2841 /* allow_unprepared_value */ true);
2842 result = NewDBIterator(
2843 env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
2844 cfd->user_comparator(), iter, sv->current, kMaxSequenceNumber,
2845 sv->mutable_cf_options.max_sequential_skip_in_iterations, read_callback,
2846 this, cfd);
2847 #endif
2848 } else {
2849 // Note: no need to consider the special case of
2850 // last_seq_same_as_publish_seq_==false since NewIterator is overridden in
2851 // WritePreparedTxnDB
2852 result = NewIteratorImpl(read_options, cfd,
2853 (read_options.snapshot != nullptr)
2854 ? read_options.snapshot->GetSequenceNumber()
2855 : kMaxSequenceNumber,
2856 read_callback);
2857 }
2858 return result;
2859 }
2860
// Builds an ArenaWrappedDBIter over `cfd` that reads at sequence number
// `snapshot`. kMaxSequenceNumber means "use versions_->LastSequence() at call
// time".
//
// The column family's current SuperVersion is referenced here and handed to
// NewInternalIterator(). Refresh is disabled whenever read_options carries an
// explicit snapshot; otherwise `allow_refresh` is honored.
ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
                                            ColumnFamilyData* cfd,
                                            SequenceNumber snapshot,
                                            ReadCallback* read_callback,
                                            bool expose_blob_index,
                                            bool allow_refresh) {
  SuperVersion* sv = cfd->GetReferencedSuperVersion(this);

  TEST_SYNC_POINT("DBImpl::NewIterator:1");
  TEST_SYNC_POINT("DBImpl::NewIterator:2");

  if (snapshot == kMaxSequenceNumber) {
    // Note that the snapshot is assigned AFTER referencing the super
    // version because otherwise a flush happening in between may compact away
    // data for the snapshot, so the reader would see neither data that was
    // visible to the snapshot before compaction nor the newer data inserted
    // afterwards.
    // Note that the super version might not contain all the data available
    // to this snapshot, but in that case it can see all the data in the
    // super version, which is a valid consistent state after the user
    // calls NewIterator().
    snapshot = versions_->LastSequence();
    TEST_SYNC_POINT("DBImpl::NewIterator:3");
    TEST_SYNC_POINT("DBImpl::NewIterator:4");
  }

  // Try to generate a DB iterator tree in continuous memory area to be
  // cache friendly. Here is an example of result:
  // +-------------------------------+
  // |                               |
  // | ArenaWrappedDBIter            |
  // |  +                            |
  // |  +---> Inner Iterator   ------------+
  // |  |                            |     |
  // |  |    +-- -- -- -- -- -- -- --+     |
  // |  +--- | Arena                 |     |
  // |       |                       |     |
  // |         Allocated Memory:     |     |
  // |       |   +-------------------+     |
  // |       |   | DBIter            | <---+
  // |           |  +                |
  // |       |   |  +-> iter_  ------------+
  // |       |   |                   |     |
  // |       |   +-------------------+     |
  // |       |   | MergingIterator   | <---+
  // |           |  +                |
  // |       |   |  +->child iter1  ------------+
  // |       |   |  |                |          |
  // |           |  +->child iter2  ----------+ |
  // |       |   |  |                |        | |
  // |       |   |  +->child iter3  --------+ | |
  // |           |                   |      | | |
  // |       |   +-------------------+      | | |
  // |       |   | Iterator1         | <--------+
  // |       |   +-------------------+      | |
  // |       |   | Iterator2         | <------+
  // |       |   +-------------------+      |
  // |       |   | Iterator3         | <----+
  // |       |   +-------------------+
  // |       |                       |
  // +-------+-----------------------+
  //
  // ArenaWrappedDBIter inlines an arena area where all the iterators in
  // the iterator tree are allocated in the order of being accessed when
  // querying.
  // Laying out the iterators in the order of being accessed makes it more
  // likely that any iterator pointer is close to the iterator it points to so
  // that they are likely to be in the same cache line and/or page.
  ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
      env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, sv->current,
      snapshot, sv->mutable_cf_options.max_sequential_skip_in_iterations,
      sv->version_number, read_callback, this, cfd, expose_blob_index,
      read_options.snapshot != nullptr ? false : allow_refresh);

  // The internal iterator tree is allocated from db_iter's own arena and
  // installed beneath the DBIter.
  InternalIterator* internal_iter = NewInternalIterator(
      db_iter->GetReadOptions(), cfd, sv, db_iter->GetArena(),
      db_iter->GetRangeDelAggregator(), snapshot,
      /* allow_unprepared_value */ true);
  db_iter->SetIterUnderDBIter(internal_iter);

  return db_iter;
}
2943
NewIterators(const ReadOptions & read_options,const std::vector<ColumnFamilyHandle * > & column_families,std::vector<Iterator * > * iterators)2944 Status DBImpl::NewIterators(
2945 const ReadOptions& read_options,
2946 const std::vector<ColumnFamilyHandle*>& column_families,
2947 std::vector<Iterator*>* iterators) {
2948 if (read_options.managed) {
2949 return Status::NotSupported("Managed iterator is not supported anymore.");
2950 }
2951 if (read_options.read_tier == kPersistedTier) {
2952 return Status::NotSupported(
2953 "ReadTier::kPersistedData is not yet supported in iterators.");
2954 }
2955 ReadCallback* read_callback = nullptr; // No read callback provided.
2956 iterators->clear();
2957 iterators->reserve(column_families.size());
2958 if (read_options.tailing) {
2959 #ifdef ROCKSDB_LITE
2960 return Status::InvalidArgument(
2961 "Tailing iterator not supported in RocksDB lite");
2962 #else
2963 for (auto cfh : column_families) {
2964 auto cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
2965 SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
2966 auto iter = new ForwardIterator(this, read_options, cfd, sv,
2967 /* allow_unprepared_value */ true);
2968 iterators->push_back(NewDBIterator(
2969 env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
2970 cfd->user_comparator(), iter, sv->current, kMaxSequenceNumber,
2971 sv->mutable_cf_options.max_sequential_skip_in_iterations,
2972 read_callback, this, cfd));
2973 }
2974 #endif
2975 } else {
2976 // Note: no need to consider the special case of
2977 // last_seq_same_as_publish_seq_==false since NewIterators is overridden in
2978 // WritePreparedTxnDB
2979 auto snapshot = read_options.snapshot != nullptr
2980 ? read_options.snapshot->GetSequenceNumber()
2981 : versions_->LastSequence();
2982 for (size_t i = 0; i < column_families.size(); ++i) {
2983 auto* cfd =
2984 static_cast_with_check<ColumnFamilyHandleImpl>(column_families[i])
2985 ->cfd();
2986 iterators->push_back(
2987 NewIteratorImpl(read_options, cfd, snapshot, read_callback));
2988 }
2989 }
2990
2991 return Status::OK();
2992 }
2993
GetSnapshot()2994 const Snapshot* DBImpl::GetSnapshot() { return GetSnapshotImpl(false); }
2995
2996 #ifndef ROCKSDB_LITE
GetSnapshotForWriteConflictBoundary()2997 const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() {
2998 return GetSnapshotImpl(true);
2999 }
3000 #endif // ROCKSDB_LITE
3001
GetSnapshotImpl(bool is_write_conflict_boundary,bool lock)3002 SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
3003 bool lock) {
3004 int64_t unix_time = 0;
3005 immutable_db_options_.clock->GetCurrentTime(&unix_time)
3006 .PermitUncheckedError(); // Ignore error
3007 SnapshotImpl* s = new SnapshotImpl;
3008
3009 if (lock) {
3010 mutex_.Lock();
3011 }
3012 // returns null if the underlying memtable does not support snapshot.
3013 if (!is_snapshot_supported_) {
3014 if (lock) {
3015 mutex_.Unlock();
3016 }
3017 delete s;
3018 return nullptr;
3019 }
3020 auto snapshot_seq = last_seq_same_as_publish_seq_
3021 ? versions_->LastSequence()
3022 : versions_->LastPublishedSequence();
3023 SnapshotImpl* snapshot =
3024 snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary);
3025 if (lock) {
3026 mutex_.Unlock();
3027 }
3028 return snapshot;
3029 }
3030
3031 namespace {
3032 typedef autovector<ColumnFamilyData*, 2> CfdList;
CfdListContains(const CfdList & list,ColumnFamilyData * cfd)3033 bool CfdListContains(const CfdList& list, ColumnFamilyData* cfd) {
3034 for (const ColumnFamilyData* t : list) {
3035 if (t == cfd) {
3036 return true;
3037 }
3038 }
3039 return false;
3040 }
3041 } // namespace
3042
ReleaseSnapshot(const Snapshot * s)3043 void DBImpl::ReleaseSnapshot(const Snapshot* s) {
3044 const SnapshotImpl* casted_s = reinterpret_cast<const SnapshotImpl*>(s);
3045 {
3046 InstrumentedMutexLock l(&mutex_);
3047 snapshots_.Delete(casted_s);
3048 uint64_t oldest_snapshot;
3049 if (snapshots_.empty()) {
3050 if (last_seq_same_as_publish_seq_) {
3051 oldest_snapshot = versions_->LastSequence();
3052 } else {
3053 oldest_snapshot = versions_->LastPublishedSequence();
3054 }
3055 } else {
3056 oldest_snapshot = snapshots_.oldest()->number_;
3057 }
3058 // Avoid to go through every column family by checking a global threshold
3059 // first.
3060 if (oldest_snapshot > bottommost_files_mark_threshold_) {
3061 CfdList cf_scheduled;
3062 for (auto* cfd : *versions_->GetColumnFamilySet()) {
3063 cfd->current()->storage_info()->UpdateOldestSnapshot(oldest_snapshot);
3064 if (!cfd->current()
3065 ->storage_info()
3066 ->BottommostFilesMarkedForCompaction()
3067 .empty()) {
3068 SchedulePendingCompaction(cfd);
3069 MaybeScheduleFlushOrCompaction();
3070 cf_scheduled.push_back(cfd);
3071 }
3072 }
3073
3074 // Calculate a new threshold, skipping those CFs where compactions are
3075 // scheduled. We do not do the same pass as the previous loop because
3076 // mutex might be unlocked during the loop, making the result inaccurate.
3077 SequenceNumber new_bottommost_files_mark_threshold = kMaxSequenceNumber;
3078 for (auto* cfd : *versions_->GetColumnFamilySet()) {
3079 if (CfdListContains(cf_scheduled, cfd)) {
3080 continue;
3081 }
3082 new_bottommost_files_mark_threshold = std::min(
3083 new_bottommost_files_mark_threshold,
3084 cfd->current()->storage_info()->bottommost_files_mark_threshold());
3085 }
3086 bottommost_files_mark_threshold_ = new_bottommost_files_mark_threshold;
3087 }
3088 }
3089 delete casted_s;
3090 }
3091
3092 #ifndef ROCKSDB_LITE
GetPropertiesOfAllTables(ColumnFamilyHandle * column_family,TablePropertiesCollection * props)3093 Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
3094 TablePropertiesCollection* props) {
3095 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
3096 auto cfd = cfh->cfd();
3097
3098 // Increment the ref count
3099 mutex_.Lock();
3100 auto version = cfd->current();
3101 version->Ref();
3102 mutex_.Unlock();
3103
3104 auto s = version->GetPropertiesOfAllTables(props);
3105
3106 // Decrement the ref count
3107 mutex_.Lock();
3108 version->Unref();
3109 mutex_.Unlock();
3110
3111 return s;
3112 }
3113
GetPropertiesOfTablesInRange(ColumnFamilyHandle * column_family,const Range * range,std::size_t n,TablePropertiesCollection * props)3114 Status DBImpl::GetPropertiesOfTablesInRange(ColumnFamilyHandle* column_family,
3115 const Range* range, std::size_t n,
3116 TablePropertiesCollection* props) {
3117 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
3118 auto cfd = cfh->cfd();
3119
3120 // Increment the ref count
3121 mutex_.Lock();
3122 auto version = cfd->current();
3123 version->Ref();
3124 mutex_.Unlock();
3125
3126 auto s = version->GetPropertiesOfTablesInRange(range, n, props);
3127
3128 // Decrement the ref count
3129 mutex_.Lock();
3130 version->Unref();
3131 mutex_.Unlock();
3132
3133 return s;
3134 }
3135
3136 #endif // ROCKSDB_LITE
3137
GetName() const3138 const std::string& DBImpl::GetName() const { return dbname_; }
3139
GetEnv() const3140 Env* DBImpl::GetEnv() const { return env_; }
3141
GetFileSystem() const3142 FileSystem* DB::GetFileSystem() const {
3143 const auto& fs = GetEnv()->GetFileSystem();
3144 return fs.get();
3145 }
3146
GetFileSystem() const3147 FileSystem* DBImpl::GetFileSystem() const {
3148 return immutable_db_options_.fs.get();
3149 }
3150
GetSystemClock() const3151 SystemClock* DBImpl::GetSystemClock() const {
3152 return immutable_db_options_.clock;
3153 }
3154
3155 #ifndef ROCKSDB_LITE
3156
StartIOTrace(const TraceOptions & trace_options,std::unique_ptr<TraceWriter> && trace_writer)3157 Status DBImpl::StartIOTrace(const TraceOptions& trace_options,
3158 std::unique_ptr<TraceWriter>&& trace_writer) {
3159 assert(trace_writer != nullptr);
3160 return io_tracer_->StartIOTrace(GetSystemClock(), trace_options,
3161 std::move(trace_writer));
3162 }
3163
EndIOTrace()3164 Status DBImpl::EndIOTrace() {
3165 io_tracer_->EndIOTrace();
3166 return Status::OK();
3167 }
3168
3169 #endif // ROCKSDB_LITE
3170
GetOptions(ColumnFamilyHandle * column_family) const3171 Options DBImpl::GetOptions(ColumnFamilyHandle* column_family) const {
3172 InstrumentedMutexLock l(&mutex_);
3173 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
3174 return Options(BuildDBOptions(immutable_db_options_, mutable_db_options_),
3175 cfh->cfd()->GetLatestCFOptions());
3176 }
3177
GetDBOptions() const3178 DBOptions DBImpl::GetDBOptions() const {
3179 InstrumentedMutexLock l(&mutex_);
3180 return BuildDBOptions(immutable_db_options_, mutable_db_options_);
3181 }
3182
GetProperty(ColumnFamilyHandle * column_family,const Slice & property,std::string * value)3183 bool DBImpl::GetProperty(ColumnFamilyHandle* column_family,
3184 const Slice& property, std::string* value) {
3185 const DBPropertyInfo* property_info = GetPropertyInfo(property);
3186 value->clear();
3187 auto cfd =
3188 static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
3189 if (property_info == nullptr) {
3190 return false;
3191 } else if (property_info->handle_int) {
3192 uint64_t int_value;
3193 bool ret_value =
3194 GetIntPropertyInternal(cfd, *property_info, false, &int_value);
3195 if (ret_value) {
3196 *value = ToString(int_value);
3197 }
3198 return ret_value;
3199 } else if (property_info->handle_string) {
3200 InstrumentedMutexLock l(&mutex_);
3201 return cfd->internal_stats()->GetStringProperty(*property_info, property,
3202 value);
3203 } else if (property_info->handle_string_dbimpl) {
3204 std::string tmp_value;
3205 bool ret_value = (this->*(property_info->handle_string_dbimpl))(&tmp_value);
3206 if (ret_value) {
3207 *value = tmp_value;
3208 }
3209 return ret_value;
3210 }
3211 // Shouldn't reach here since exactly one of handle_string and handle_int
3212 // should be non-nullptr.
3213 assert(false);
3214 return false;
3215 }
3216
GetMapProperty(ColumnFamilyHandle * column_family,const Slice & property,std::map<std::string,std::string> * value)3217 bool DBImpl::GetMapProperty(ColumnFamilyHandle* column_family,
3218 const Slice& property,
3219 std::map<std::string, std::string>* value) {
3220 const DBPropertyInfo* property_info = GetPropertyInfo(property);
3221 value->clear();
3222 auto cfd =
3223 static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
3224 if (property_info == nullptr) {
3225 return false;
3226 } else if (property_info->handle_map) {
3227 InstrumentedMutexLock l(&mutex_);
3228 return cfd->internal_stats()->GetMapProperty(*property_info, property,
3229 value);
3230 }
3231 // If we reach this point it means that handle_map is not provided for the
3232 // requested property
3233 return false;
3234 }
3235
GetIntProperty(ColumnFamilyHandle * column_family,const Slice & property,uint64_t * value)3236 bool DBImpl::GetIntProperty(ColumnFamilyHandle* column_family,
3237 const Slice& property, uint64_t* value) {
3238 const DBPropertyInfo* property_info = GetPropertyInfo(property);
3239 if (property_info == nullptr || property_info->handle_int == nullptr) {
3240 return false;
3241 }
3242 auto cfd =
3243 static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
3244 return GetIntPropertyInternal(cfd, *property_info, false, value);
3245 }
3246
GetIntPropertyInternal(ColumnFamilyData * cfd,const DBPropertyInfo & property_info,bool is_locked,uint64_t * value)3247 bool DBImpl::GetIntPropertyInternal(ColumnFamilyData* cfd,
3248 const DBPropertyInfo& property_info,
3249 bool is_locked, uint64_t* value) {
3250 assert(property_info.handle_int != nullptr);
3251 if (!property_info.need_out_of_mutex) {
3252 if (is_locked) {
3253 mutex_.AssertHeld();
3254 return cfd->internal_stats()->GetIntProperty(property_info, value, this);
3255 } else {
3256 InstrumentedMutexLock l(&mutex_);
3257 return cfd->internal_stats()->GetIntProperty(property_info, value, this);
3258 }
3259 } else {
3260 SuperVersion* sv = nullptr;
3261 if (is_locked) {
3262 mutex_.Unlock();
3263 }
3264 sv = GetAndRefSuperVersion(cfd);
3265
3266 bool ret = cfd->internal_stats()->GetIntPropertyOutOfMutex(
3267 property_info, sv->current, value);
3268
3269 ReturnAndCleanupSuperVersion(cfd, sv);
3270 if (is_locked) {
3271 mutex_.Lock();
3272 }
3273
3274 return ret;
3275 }
3276 }
3277
GetPropertyHandleOptionsStatistics(std::string * value)3278 bool DBImpl::GetPropertyHandleOptionsStatistics(std::string* value) {
3279 assert(value != nullptr);
3280 Statistics* statistics = immutable_db_options_.stats;
3281 if (!statistics) {
3282 return false;
3283 }
3284 *value = statistics->ToString();
3285 return true;
3286 }
3287
3288 #ifndef ROCKSDB_LITE
ResetStats()3289 Status DBImpl::ResetStats() {
3290 InstrumentedMutexLock l(&mutex_);
3291 for (auto* cfd : *versions_->GetColumnFamilySet()) {
3292 if (cfd->initialized()) {
3293 cfd->internal_stats()->Clear();
3294 }
3295 }
3296 return Status::OK();
3297 }
3298 #endif // ROCKSDB_LITE
3299
GetAggregatedIntProperty(const Slice & property,uint64_t * aggregated_value)3300 bool DBImpl::GetAggregatedIntProperty(const Slice& property,
3301 uint64_t* aggregated_value) {
3302 const DBPropertyInfo* property_info = GetPropertyInfo(property);
3303 if (property_info == nullptr || property_info->handle_int == nullptr) {
3304 return false;
3305 }
3306
3307 uint64_t sum = 0;
3308 bool ret = true;
3309 {
3310 // Needs mutex to protect the list of column families.
3311 InstrumentedMutexLock l(&mutex_);
3312 uint64_t value;
3313 for (auto* cfd : *versions_->GetColumnFamilySet()) {
3314 if (!cfd->initialized()) {
3315 continue;
3316 }
3317 cfd->Ref();
3318 ret = GetIntPropertyInternal(cfd, *property_info, true, &value);
3319 // GetIntPropertyInternal may release db mutex and re-acquire it.
3320 mutex_.AssertHeld();
3321 cfd->UnrefAndTryDelete();
3322 if (ret) {
3323 sum += value;
3324 } else {
3325 ret = false;
3326 break;
3327 }
3328 }
3329 }
3330 *aggregated_value = sum;
3331 return ret;
3332 }
3333
GetAndRefSuperVersion(ColumnFamilyData * cfd)3334 SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) {
3335 // TODO(ljin): consider using GetReferencedSuperVersion() directly
3336 return cfd->GetThreadLocalSuperVersion(this);
3337 }
3338
3339 // REQUIRED: this function should only be called on the write thread or if the
3340 // mutex is held.
GetAndRefSuperVersion(uint32_t column_family_id)3341 SuperVersion* DBImpl::GetAndRefSuperVersion(uint32_t column_family_id) {
3342 auto column_family_set = versions_->GetColumnFamilySet();
3343 auto cfd = column_family_set->GetColumnFamily(column_family_id);
3344 if (!cfd) {
3345 return nullptr;
3346 }
3347
3348 return GetAndRefSuperVersion(cfd);
3349 }
3350
CleanupSuperVersion(SuperVersion * sv)3351 void DBImpl::CleanupSuperVersion(SuperVersion* sv) {
3352 // Release SuperVersion
3353 if (sv->Unref()) {
3354 bool defer_purge =
3355 immutable_db_options().avoid_unnecessary_blocking_io;
3356 {
3357 InstrumentedMutexLock l(&mutex_);
3358 sv->Cleanup();
3359 if (defer_purge) {
3360 AddSuperVersionsToFreeQueue(sv);
3361 SchedulePurge();
3362 }
3363 }
3364 if (!defer_purge) {
3365 delete sv;
3366 }
3367 RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS);
3368 }
3369 RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES);
3370 }
3371
ReturnAndCleanupSuperVersion(ColumnFamilyData * cfd,SuperVersion * sv)3372 void DBImpl::ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd,
3373 SuperVersion* sv) {
3374 if (!cfd->ReturnThreadLocalSuperVersion(sv)) {
3375 CleanupSuperVersion(sv);
3376 }
3377 }
3378
3379 // REQUIRED: this function should only be called on the write thread.
ReturnAndCleanupSuperVersion(uint32_t column_family_id,SuperVersion * sv)3380 void DBImpl::ReturnAndCleanupSuperVersion(uint32_t column_family_id,
3381 SuperVersion* sv) {
3382 auto column_family_set = versions_->GetColumnFamilySet();
3383 auto cfd = column_family_set->GetColumnFamily(column_family_id);
3384
3385 // If SuperVersion is held, and we successfully fetched a cfd using
3386 // GetAndRefSuperVersion(), it must still exist.
3387 assert(cfd != nullptr);
3388 ReturnAndCleanupSuperVersion(cfd, sv);
3389 }
3390
3391 // REQUIRED: this function should only be called on the write thread or if the
3392 // mutex is held.
GetColumnFamilyHandle(uint32_t column_family_id)3393 ColumnFamilyHandle* DBImpl::GetColumnFamilyHandle(uint32_t column_family_id) {
3394 ColumnFamilyMemTables* cf_memtables = column_family_memtables_.get();
3395
3396 if (!cf_memtables->Seek(column_family_id)) {
3397 return nullptr;
3398 }
3399
3400 return cf_memtables->GetColumnFamilyHandle();
3401 }
3402
3403 // REQUIRED: mutex is NOT held.
GetColumnFamilyHandleUnlocked(uint32_t column_family_id)3404 std::unique_ptr<ColumnFamilyHandle> DBImpl::GetColumnFamilyHandleUnlocked(
3405 uint32_t column_family_id) {
3406 InstrumentedMutexLock l(&mutex_);
3407
3408 auto* cfd =
3409 versions_->GetColumnFamilySet()->GetColumnFamily(column_family_id);
3410 if (cfd == nullptr) {
3411 return nullptr;
3412 }
3413
3414 return std::unique_ptr<ColumnFamilyHandleImpl>(
3415 new ColumnFamilyHandleImpl(cfd, this, &mutex_));
3416 }
3417
GetApproximateMemTableStats(ColumnFamilyHandle * column_family,const Range & range,uint64_t * const count,uint64_t * const size)3418 void DBImpl::GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
3419 const Range& range,
3420 uint64_t* const count,
3421 uint64_t* const size) {
3422 ColumnFamilyHandleImpl* cfh =
3423 static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
3424 ColumnFamilyData* cfd = cfh->cfd();
3425 SuperVersion* sv = GetAndRefSuperVersion(cfd);
3426
3427 // Convert user_key into a corresponding internal key.
3428 InternalKey k1(range.start, kMaxSequenceNumber, kValueTypeForSeek);
3429 InternalKey k2(range.limit, kMaxSequenceNumber, kValueTypeForSeek);
3430 MemTable::MemTableStats memStats =
3431 sv->mem->ApproximateStats(k1.Encode(), k2.Encode());
3432 MemTable::MemTableStats immStats =
3433 sv->imm->ApproximateStats(k1.Encode(), k2.Encode());
3434 *count = memStats.count + immStats.count;
3435 *size = memStats.size + immStats.size;
3436
3437 ReturnAndCleanupSuperVersion(cfd, sv);
3438 }
3439
GetApproximateSizes(const SizeApproximationOptions & options,ColumnFamilyHandle * column_family,const Range * range,int n,uint64_t * sizes)3440 Status DBImpl::GetApproximateSizes(const SizeApproximationOptions& options,
3441 ColumnFamilyHandle* column_family,
3442 const Range* range, int n, uint64_t* sizes) {
3443 if (!options.include_memtabtles && !options.include_files) {
3444 return Status::InvalidArgument("Invalid options");
3445 }
3446
3447 const Comparator* const ucmp = column_family->GetComparator();
3448 assert(ucmp);
3449 size_t ts_sz = ucmp->timestamp_size();
3450
3451 Version* v;
3452 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
3453 auto cfd = cfh->cfd();
3454 SuperVersion* sv = GetAndRefSuperVersion(cfd);
3455 v = sv->current;
3456
3457 for (int i = 0; i < n; i++) {
3458 Slice start = range[i].start;
3459 Slice limit = range[i].limit;
3460
3461 // Add timestamp if needed
3462 std::string start_with_ts, limit_with_ts;
3463 if (ts_sz > 0) {
3464 // Maximum timestamp means including all key with any timestamp
3465 AppendKeyWithMaxTimestamp(&start_with_ts, start, ts_sz);
3466 // Append a maximum timestamp as the range limit is exclusive:
3467 // [start, limit)
3468 AppendKeyWithMaxTimestamp(&limit_with_ts, limit, ts_sz);
3469 start = start_with_ts;
3470 limit = limit_with_ts;
3471 }
3472 // Convert user_key into a corresponding internal key.
3473 InternalKey k1(start, kMaxSequenceNumber, kValueTypeForSeek);
3474 InternalKey k2(limit, kMaxSequenceNumber, kValueTypeForSeek);
3475 sizes[i] = 0;
3476 if (options.include_files) {
3477 sizes[i] += versions_->ApproximateSize(
3478 options, v, k1.Encode(), k2.Encode(), /*start_level=*/0,
3479 /*end_level=*/-1, TableReaderCaller::kUserApproximateSize);
3480 }
3481 if (options.include_memtabtles) {
3482 sizes[i] += sv->mem->ApproximateStats(k1.Encode(), k2.Encode()).size;
3483 sizes[i] += sv->imm->ApproximateStats(k1.Encode(), k2.Encode()).size;
3484 }
3485 }
3486
3487 ReturnAndCleanupSuperVersion(cfd, sv);
3488 return Status::OK();
3489 }
3490
3491 std::list<uint64_t>::iterator
CaptureCurrentFileNumberInPendingOutputs()3492 DBImpl::CaptureCurrentFileNumberInPendingOutputs() {
3493 // We need to remember the iterator of our insert, because after the
3494 // background job is done, we need to remove that element from
3495 // pending_outputs_.
3496 pending_outputs_.push_back(versions_->current_next_file_number());
3497 auto pending_outputs_inserted_elem = pending_outputs_.end();
3498 --pending_outputs_inserted_elem;
3499 return pending_outputs_inserted_elem;
3500 }
3501
ReleaseFileNumberFromPendingOutputs(std::unique_ptr<std::list<uint64_t>::iterator> & v)3502 void DBImpl::ReleaseFileNumberFromPendingOutputs(
3503 std::unique_ptr<std::list<uint64_t>::iterator>& v) {
3504 if (v.get() != nullptr) {
3505 pending_outputs_.erase(*v.get());
3506 v.reset();
3507 }
3508 }
3509
3510 #ifndef ROCKSDB_LITE
GetUpdatesSince(SequenceNumber seq,std::unique_ptr<TransactionLogIterator> * iter,const TransactionLogIterator::ReadOptions & read_options)3511 Status DBImpl::GetUpdatesSince(
3512 SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
3513 const TransactionLogIterator::ReadOptions& read_options) {
3514 RecordTick(stats_, GET_UPDATES_SINCE_CALLS);
3515 if (seq > versions_->LastSequence()) {
3516 return Status::NotFound("Requested sequence not yet written in the db");
3517 }
3518 return wal_manager_.GetUpdatesSince(seq, iter, read_options, versions_.get());
3519 }
3520
DeleteFile(std::string name)3521 Status DBImpl::DeleteFile(std::string name) {
3522 uint64_t number;
3523 FileType type;
3524 WalFileType log_type;
3525 if (!ParseFileName(name, &number, &type, &log_type) ||
3526 (type != kTableFile && type != kWalFile)) {
3527 ROCKS_LOG_ERROR(immutable_db_options_.info_log, "DeleteFile %s failed.\n",
3528 name.c_str());
3529 return Status::InvalidArgument("Invalid file name");
3530 }
3531
3532 if (type == kWalFile) {
3533 // Only allow deleting archived log files
3534 if (log_type != kArchivedLogFile) {
3535 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
3536 "DeleteFile %s failed - not archived log.\n",
3537 name.c_str());
3538 return Status::NotSupported("Delete only supported for archived logs");
3539 }
3540 Status status = wal_manager_.DeleteFile(name, number);
3541 if (!status.ok()) {
3542 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
3543 "DeleteFile %s failed -- %s.\n", name.c_str(),
3544 status.ToString().c_str());
3545 }
3546 return status;
3547 }
3548
3549 Status status;
3550 int level;
3551 FileMetaData* metadata;
3552 ColumnFamilyData* cfd;
3553 VersionEdit edit;
3554 JobContext job_context(next_job_id_.fetch_add(1), true);
3555 {
3556 InstrumentedMutexLock l(&mutex_);
3557 status = versions_->GetMetadataForFile(number, &level, &metadata, &cfd);
3558 if (!status.ok()) {
3559 ROCKS_LOG_WARN(immutable_db_options_.info_log,
3560 "DeleteFile %s failed. File not found\n", name.c_str());
3561 job_context.Clean();
3562 return Status::InvalidArgument("File not found");
3563 }
3564 assert(level < cfd->NumberLevels());
3565
3566 // If the file is being compacted no need to delete.
3567 if (metadata->being_compacted) {
3568 ROCKS_LOG_INFO(immutable_db_options_.info_log,
3569 "DeleteFile %s Skipped. File about to be compacted\n",
3570 name.c_str());
3571 job_context.Clean();
3572 return Status::OK();
3573 }
3574
3575 // Only the files in the last level can be deleted externally.
3576 // This is to make sure that any deletion tombstones are not
3577 // lost. Check that the level passed is the last level.
3578 auto* vstoreage = cfd->current()->storage_info();
3579 for (int i = level + 1; i < cfd->NumberLevels(); i++) {
3580 if (vstoreage->NumLevelFiles(i) != 0) {
3581 ROCKS_LOG_WARN(immutable_db_options_.info_log,
3582 "DeleteFile %s FAILED. File not in last level\n",
3583 name.c_str());
3584 job_context.Clean();
3585 return Status::InvalidArgument("File not in last level");
3586 }
3587 }
3588 // if level == 0, it has to be the oldest file
3589 if (level == 0 &&
3590 vstoreage->LevelFiles(0).back()->fd.GetNumber() != number) {
3591 ROCKS_LOG_WARN(immutable_db_options_.info_log,
3592 "DeleteFile %s failed ---"
3593 " target file in level 0 must be the oldest.",
3594 name.c_str());
3595 job_context.Clean();
3596 return Status::InvalidArgument("File in level 0, but not oldest");
3597 }
3598 edit.SetColumnFamily(cfd->GetID());
3599 edit.DeleteFile(level, number);
3600 status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
3601 &edit, &mutex_, directories_.GetDbDir());
3602 if (status.ok()) {
3603 InstallSuperVersionAndScheduleWork(cfd,
3604 &job_context.superversion_contexts[0],
3605 *cfd->GetLatestMutableCFOptions());
3606 }
3607 FindObsoleteFiles(&job_context, false);
3608 } // lock released here
3609
3610 LogFlush(immutable_db_options_.info_log);
3611 // remove files outside the db-lock
3612 if (job_context.HaveSomethingToDelete()) {
3613 // Call PurgeObsoleteFiles() without holding mutex.
3614 PurgeObsoleteFiles(job_context);
3615 }
3616 job_context.Clean();
3617 return status;
3618 }
3619
DeleteFilesInRanges(ColumnFamilyHandle * column_family,const RangePtr * ranges,size_t n,bool include_end)3620 Status DBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
3621 const RangePtr* ranges, size_t n,
3622 bool include_end) {
3623 Status status = Status::OK();
3624 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
3625 ColumnFamilyData* cfd = cfh->cfd();
3626 VersionEdit edit;
3627 std::set<FileMetaData*> deleted_files;
3628 JobContext job_context(next_job_id_.fetch_add(1), true);
3629 {
3630 InstrumentedMutexLock l(&mutex_);
3631 Version* input_version = cfd->current();
3632
3633 auto* vstorage = input_version->storage_info();
3634 for (size_t r = 0; r < n; r++) {
3635 auto begin = ranges[r].start, end = ranges[r].limit;
3636 for (int i = 1; i < cfd->NumberLevels(); i++) {
3637 if (vstorage->LevelFiles(i).empty() ||
3638 !vstorage->OverlapInLevel(i, begin, end)) {
3639 continue;
3640 }
3641 std::vector<FileMetaData*> level_files;
3642 InternalKey begin_storage, end_storage, *begin_key, *end_key;
3643 if (begin == nullptr) {
3644 begin_key = nullptr;
3645 } else {
3646 begin_storage.SetMinPossibleForUserKey(*begin);
3647 begin_key = &begin_storage;
3648 }
3649 if (end == nullptr) {
3650 end_key = nullptr;
3651 } else {
3652 end_storage.SetMaxPossibleForUserKey(*end);
3653 end_key = &end_storage;
3654 }
3655
3656 vstorage->GetCleanInputsWithinInterval(
3657 i, begin_key, end_key, &level_files, -1 /* hint_index */,
3658 nullptr /* file_index */);
3659 FileMetaData* level_file;
3660 for (uint32_t j = 0; j < level_files.size(); j++) {
3661 level_file = level_files[j];
3662 if (level_file->being_compacted) {
3663 continue;
3664 }
3665 if (deleted_files.find(level_file) != deleted_files.end()) {
3666 continue;
3667 }
3668 if (!include_end && end != nullptr &&
3669 cfd->user_comparator()->Compare(level_file->largest.user_key(),
3670 *end) == 0) {
3671 continue;
3672 }
3673 edit.SetColumnFamily(cfd->GetID());
3674 edit.DeleteFile(i, level_file->fd.GetNumber());
3675 deleted_files.insert(level_file);
3676 level_file->being_compacted = true;
3677 }
3678 }
3679 }
3680 if (edit.GetDeletedFiles().empty()) {
3681 job_context.Clean();
3682 return status;
3683 }
3684 input_version->Ref();
3685 status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
3686 &edit, &mutex_, directories_.GetDbDir());
3687 if (status.ok()) {
3688 InstallSuperVersionAndScheduleWork(cfd,
3689 &job_context.superversion_contexts[0],
3690 *cfd->GetLatestMutableCFOptions());
3691 }
3692 for (auto* deleted_file : deleted_files) {
3693 deleted_file->being_compacted = false;
3694 }
3695 input_version->Unref();
3696 FindObsoleteFiles(&job_context, false);
3697 } // lock released here
3698
3699 LogFlush(immutable_db_options_.info_log);
3700 // remove files outside the db-lock
3701 if (job_context.HaveSomethingToDelete()) {
3702 // Call PurgeObsoleteFiles() without holding mutex.
3703 PurgeObsoleteFiles(job_context);
3704 }
3705 job_context.Clean();
3706 return status;
3707 }
3708
GetLiveFilesMetaData(std::vector<LiveFileMetaData> * metadata)3709 void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
3710 InstrumentedMutexLock l(&mutex_);
3711 versions_->GetLiveFilesMetaData(metadata);
3712 }
3713
GetLiveFilesChecksumInfo(FileChecksumList * checksum_list)3714 Status DBImpl::GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) {
3715 InstrumentedMutexLock l(&mutex_);
3716 return versions_->GetLiveFilesChecksumInfo(checksum_list);
3717 }
3718
GetColumnFamilyMetaData(ColumnFamilyHandle * column_family,ColumnFamilyMetaData * cf_meta)3719 void DBImpl::GetColumnFamilyMetaData(ColumnFamilyHandle* column_family,
3720 ColumnFamilyMetaData* cf_meta) {
3721 assert(column_family);
3722 auto* cfd =
3723 static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
3724 auto* sv = GetAndRefSuperVersion(cfd);
3725 {
3726 // Without mutex, Version::GetColumnFamilyMetaData will have data race with
3727 // Compaction::MarkFilesBeingCompacted. One solution is to use mutex, but
3728 // this may cause regression. An alternative is to make
3729 // FileMetaData::being_compacted atomic, but it will make FileMetaData
3730 // non-copy-able. Another option is to separate these variables from
3731 // original FileMetaData struct, and this requires re-organization of data
3732 // structures. For now, we take the easy approach. If
3733 // DB::GetColumnFamilyMetaData is not called frequently, the regression
3734 // should not be big. We still need to keep an eye on it.
3735 InstrumentedMutexLock l(&mutex_);
3736 sv->current->GetColumnFamilyMetaData(cf_meta);
3737 }
3738 ReturnAndCleanupSuperVersion(cfd, sv);
3739 }
3740
3741 #endif // ROCKSDB_LITE
3742
CheckConsistency()3743 Status DBImpl::CheckConsistency() {
3744 mutex_.AssertHeld();
3745 std::vector<LiveFileMetaData> metadata;
3746 versions_->GetLiveFilesMetaData(&metadata);
3747 TEST_SYNC_POINT("DBImpl::CheckConsistency:AfterGetLiveFilesMetaData");
3748
3749 std::string corruption_messages;
3750
3751 if (immutable_db_options_.skip_checking_sst_file_sizes_on_db_open) {
3752 // Instead of calling GetFileSize() for each expected file, call
3753 // GetChildren() for the DB directory and check that all expected files
3754 // are listed, without checking their sizes.
3755 // Since sst files might be in different directories, do it for each
3756 // directory separately.
3757 std::map<std::string, std::vector<std::string>> files_by_directory;
3758 for (const auto& md : metadata) {
3759 // md.name has a leading "/". Remove it.
3760 std::string fname = md.name;
3761 if (!fname.empty() && fname[0] == '/') {
3762 fname = fname.substr(1);
3763 }
3764 files_by_directory[md.db_path].push_back(fname);
3765 }
3766 for (const auto& dir_files : files_by_directory) {
3767 std::string directory = dir_files.first;
3768 std::vector<std::string> existing_files;
3769 Status s = env_->GetChildren(directory, &existing_files);
3770 if (!s.ok()) {
3771 corruption_messages +=
3772 "Can't list files in " + directory + ": " + s.ToString() + "\n";
3773 continue;
3774 }
3775 std::sort(existing_files.begin(), existing_files.end());
3776
3777 for (const std::string& fname : dir_files.second) {
3778 if (!std::binary_search(existing_files.begin(), existing_files.end(),
3779 fname) &&
3780 !std::binary_search(existing_files.begin(), existing_files.end(),
3781 Rocks2LevelTableFileName(fname))) {
3782 corruption_messages +=
3783 "Missing sst file " + fname + " in " + directory + "\n";
3784 }
3785 }
3786 }
3787 } else {
3788 for (const auto& md : metadata) {
3789 // md.name has a leading "/".
3790 std::string file_path = md.db_path + md.name;
3791
3792 uint64_t fsize = 0;
3793 TEST_SYNC_POINT("DBImpl::CheckConsistency:BeforeGetFileSize");
3794 Status s = env_->GetFileSize(file_path, &fsize);
3795 if (!s.ok() &&
3796 env_->GetFileSize(Rocks2LevelTableFileName(file_path), &fsize).ok()) {
3797 s = Status::OK();
3798 }
3799 if (!s.ok()) {
3800 corruption_messages +=
3801 "Can't access " + md.name + ": " + s.ToString() + "\n";
3802 } else if (fsize != md.size) {
3803 corruption_messages += "Sst file size mismatch: " + file_path +
3804 ". Size recorded in manifest " +
3805 ToString(md.size) + ", actual size " +
3806 ToString(fsize) + "\n";
3807 }
3808 }
3809 }
3810
3811 if (corruption_messages.size() == 0) {
3812 return Status::OK();
3813 } else {
3814 return Status::Corruption(corruption_messages);
3815 }
3816 }
3817
GetDbIdentity(std::string & identity) const3818 Status DBImpl::GetDbIdentity(std::string& identity) const {
3819 identity.assign(db_id_);
3820 return Status::OK();
3821 }
3822
GetDbIdentityFromIdentityFile(std::string * identity) const3823 Status DBImpl::GetDbIdentityFromIdentityFile(std::string* identity) const {
3824 std::string idfilename = IdentityFileName(dbname_);
3825 const FileOptions soptions;
3826
3827 Status s = ReadFileToString(fs_.get(), idfilename, identity);
3828 if (!s.ok()) {
3829 return s;
3830 }
3831
3832 // If last character is '\n' remove it from identity
3833 if (identity->size() > 0 && identity->back() == '\n') {
3834 identity->pop_back();
3835 }
3836 return s;
3837 }
3838
GetDbSessionId(std::string & session_id) const3839 Status DBImpl::GetDbSessionId(std::string& session_id) const {
3840 session_id.assign(db_session_id_);
3841 return Status::OK();
3842 }
3843
SetDbSessionId()3844 void DBImpl::SetDbSessionId() {
3845 // GenerateUniqueId() generates an identifier that has a negligible
3846 // probability of being duplicated, ~128 bits of entropy
3847 std::string uuid = env_->GenerateUniqueId();
3848
3849 // Hash and reformat that down to a more compact format, 20 characters
3850 // in base-36 ([0-9A-Z]), which is ~103 bits of entropy, which is enough
3851 // to expect no collisions across a billion servers each opening DBs
3852 // a million times (~2^50). Benefits vs. raw unique id:
3853 // * Save ~ dozen bytes per SST file
3854 // * Shorter shared backup file names (some platforms have low limits)
3855 // * Visually distinct from DB id format
3856 uint64_t a = NPHash64(uuid.data(), uuid.size(), 1234U);
3857 uint64_t b = NPHash64(uuid.data(), uuid.size(), 5678U);
3858 db_session_id_.resize(20);
3859 static const char* const base36 = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
3860 size_t i = 0;
3861 for (; i < 10U; ++i, a /= 36U) {
3862 db_session_id_[i] = base36[a % 36];
3863 }
3864 for (; i < 20U; ++i, b /= 36U) {
3865 db_session_id_[i] = base36[b % 36];
3866 }
3867 TEST_SYNC_POINT_CALLBACK("DBImpl::SetDbSessionId", &db_session_id_);
3868 }
3869
3870 // Default implementation -- returns not supported status
CreateColumnFamily(const ColumnFamilyOptions &,const std::string &,ColumnFamilyHandle **)3871 Status DB::CreateColumnFamily(const ColumnFamilyOptions& /*cf_options*/,
3872 const std::string& /*column_family_name*/,
3873 ColumnFamilyHandle** /*handle*/) {
3874 return Status::NotSupported("");
3875 }
3876
CreateColumnFamilies(const ColumnFamilyOptions &,const std::vector<std::string> &,std::vector<ColumnFamilyHandle * > *)3877 Status DB::CreateColumnFamilies(
3878 const ColumnFamilyOptions& /*cf_options*/,
3879 const std::vector<std::string>& /*column_family_names*/,
3880 std::vector<ColumnFamilyHandle*>* /*handles*/) {
3881 return Status::NotSupported("");
3882 }
3883
CreateColumnFamilies(const std::vector<ColumnFamilyDescriptor> &,std::vector<ColumnFamilyHandle * > *)3884 Status DB::CreateColumnFamilies(
3885 const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
3886 std::vector<ColumnFamilyHandle*>* /*handles*/) {
3887 return Status::NotSupported("");
3888 }
3889
DropColumnFamily(ColumnFamilyHandle *)3890 Status DB::DropColumnFamily(ColumnFamilyHandle* /*column_family*/) {
3891 return Status::NotSupported("");
3892 }
3893
DropColumnFamilies(const std::vector<ColumnFamilyHandle * > &)3894 Status DB::DropColumnFamilies(
3895 const std::vector<ColumnFamilyHandle*>& /*column_families*/) {
3896 return Status::NotSupported("");
3897 }
3898
DestroyColumnFamilyHandle(ColumnFamilyHandle * column_family)3899 Status DB::DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) {
3900 delete column_family;
3901 return Status::OK();
3902 }
3903
~DB()3904 DB::~DB() {}
3905
Close()3906 Status DBImpl::Close() {
3907 if (!closed_) {
3908 {
3909 InstrumentedMutexLock l(&mutex_);
3910 // If there is unreleased snapshot, fail the close call
3911 if (!snapshots_.empty()) {
3912 return Status::Aborted("Cannot close DB with unreleased snapshot.");
3913 }
3914 }
3915
3916 closed_ = true;
3917 return CloseImpl();
3918 }
3919 return Status::OK();
3920 }
3921
ListColumnFamilies(const DBOptions & db_options,const std::string & name,std::vector<std::string> * column_families)3922 Status DB::ListColumnFamilies(const DBOptions& db_options,
3923 const std::string& name,
3924 std::vector<std::string>* column_families) {
3925 const std::shared_ptr<FileSystem>& fs = db_options.env->GetFileSystem();
3926 return VersionSet::ListColumnFamilies(column_families, name, fs.get());
3927 }
3928
~Snapshot()3929 Snapshot::~Snapshot() {}
3930
DestroyDB(const std::string & dbname,const Options & options,const std::vector<ColumnFamilyDescriptor> & column_families)3931 Status DestroyDB(const std::string& dbname, const Options& options,
3932 const std::vector<ColumnFamilyDescriptor>& column_families) {
3933 ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
3934 Env* env = soptions.env;
3935 std::vector<std::string> filenames;
3936 bool wal_in_db_path = IsWalDirSameAsDBPath(&soptions);
3937
3938 // Reset the logger because it holds a handle to the
3939 // log file and prevents cleanup and directory removal
3940 soptions.info_log.reset();
3941 // Ignore error in case directory does not exist
3942 env->GetChildren(dbname, &filenames).PermitUncheckedError();
3943
3944 FileLock* lock;
3945 const std::string lockname = LockFileName(dbname);
3946 Status result = env->LockFile(lockname, &lock);
3947 if (result.ok()) {
3948 uint64_t number;
3949 FileType type;
3950 InfoLogPrefix info_log_prefix(!soptions.db_log_dir.empty(), dbname);
3951 for (const auto& fname : filenames) {
3952 if (ParseFileName(fname, &number, info_log_prefix.prefix, &type) &&
3953 type != kDBLockFile) { // Lock file will be deleted at end
3954 Status del;
3955 std::string path_to_delete = dbname + "/" + fname;
3956 if (type == kMetaDatabase) {
3957 del = DestroyDB(path_to_delete, options);
3958 } else if (type == kTableFile || type == kWalFile ||
3959 type == kBlobFile) {
3960 del = DeleteDBFile(&soptions, path_to_delete, dbname,
3961 /*force_bg=*/false, /*force_fg=*/!wal_in_db_path);
3962 } else {
3963 del = env->DeleteFile(path_to_delete);
3964 }
3965 if (!del.ok() && result.ok()) {
3966 result = del;
3967 }
3968 }
3969 }
3970
3971 std::set<std::string> paths;
3972 for (const DbPath& db_path : options.db_paths) {
3973 paths.insert(db_path.path);
3974 }
3975 for (const ColumnFamilyDescriptor& cf : column_families) {
3976 for (const DbPath& cf_path : cf.options.cf_paths) {
3977 paths.insert(cf_path.path);
3978 }
3979 }
3980 for (const auto& path : paths) {
3981 if (env->GetChildren(path, &filenames).ok()) {
3982 for (const auto& fname : filenames) {
3983 if (ParseFileName(fname, &number, &type) &&
3984 (type == kTableFile ||
3985 type == kBlobFile)) { // Lock file will be deleted at end
3986 std::string file_path = path + "/" + fname;
3987 Status del = DeleteDBFile(&soptions, file_path, dbname,
3988 /*force_bg=*/false, /*force_fg=*/false);
3989 if (!del.ok() && result.ok()) {
3990 result = del;
3991 }
3992 }
3993 }
3994 // TODO: Should we return an error if we cannot delete the directory?
3995 env->DeleteDir(path).PermitUncheckedError();
3996 }
3997 }
3998
3999 std::vector<std::string> walDirFiles;
4000 std::string archivedir = ArchivalDirectory(dbname);
4001 bool wal_dir_exists = false;
4002 if (dbname != soptions.wal_dir) {
4003 wal_dir_exists = env->GetChildren(soptions.wal_dir, &walDirFiles).ok();
4004 archivedir = ArchivalDirectory(soptions.wal_dir);
4005 }
4006
4007 // Archive dir may be inside wal dir or dbname and should be
4008 // processed and removed before those otherwise we have issues
4009 // removing them
4010 std::vector<std::string> archiveFiles;
4011 if (env->GetChildren(archivedir, &archiveFiles).ok()) {
4012 // Delete archival files.
4013 for (const auto& file : archiveFiles) {
4014 if (ParseFileName(file, &number, &type) && type == kWalFile) {
4015 Status del =
4016 DeleteDBFile(&soptions, archivedir + "/" + file, archivedir,
4017 /*force_bg=*/false, /*force_fg=*/!wal_in_db_path);
4018 if (!del.ok() && result.ok()) {
4019 result = del;
4020 }
4021 }
4022 }
4023 // Ignore error in case dir contains other files
4024 env->DeleteDir(archivedir).PermitUncheckedError();
4025 }
4026
4027 // Delete log files in the WAL dir
4028 if (wal_dir_exists) {
4029 for (const auto& file : walDirFiles) {
4030 if (ParseFileName(file, &number, &type) && type == kWalFile) {
4031 Status del =
4032 DeleteDBFile(&soptions, LogFileName(soptions.wal_dir, number),
4033 soptions.wal_dir, /*force_bg=*/false,
4034 /*force_fg=*/!wal_in_db_path);
4035 if (!del.ok() && result.ok()) {
4036 result = del;
4037 }
4038 }
4039 }
4040 // Ignore error in case dir contains other files
4041 env->DeleteDir(soptions.wal_dir).PermitUncheckedError();
4042 }
4043
4044 // Ignore error since state is already gone
4045 env->UnlockFile(lock).PermitUncheckedError();
4046 env->DeleteFile(lockname).PermitUncheckedError();
4047
4048 // sst_file_manager holds a ref to the logger. Make sure the logger is
4049 // gone before trying to remove the directory.
4050 soptions.sst_file_manager.reset();
4051
4052 // Ignore error in case dir contains other files
4053 env->DeleteDir(dbname).PermitUncheckedError();
4054 ;
4055 }
4056 return result;
4057 }
4058
WriteOptionsFile(bool need_mutex_lock,bool need_enter_write_thread)4059 Status DBImpl::WriteOptionsFile(bool need_mutex_lock,
4060 bool need_enter_write_thread) {
4061 #ifndef ROCKSDB_LITE
4062 WriteThread::Writer w;
4063 if (need_mutex_lock) {
4064 mutex_.Lock();
4065 } else {
4066 mutex_.AssertHeld();
4067 }
4068 if (need_enter_write_thread) {
4069 write_thread_.EnterUnbatched(&w, &mutex_);
4070 }
4071
4072 std::vector<std::string> cf_names;
4073 std::vector<ColumnFamilyOptions> cf_opts;
4074
4075 // This part requires mutex to protect the column family options
4076 for (auto cfd : *versions_->GetColumnFamilySet()) {
4077 if (cfd->IsDropped()) {
4078 continue;
4079 }
4080 cf_names.push_back(cfd->GetName());
4081 cf_opts.push_back(cfd->GetLatestCFOptions());
4082 }
4083
4084 // Unlock during expensive operations. New writes cannot get here
4085 // because the single write thread ensures all new writes get queued.
4086 DBOptions db_options =
4087 BuildDBOptions(immutable_db_options_, mutable_db_options_);
4088 mutex_.Unlock();
4089
4090 TEST_SYNC_POINT("DBImpl::WriteOptionsFile:1");
4091 TEST_SYNC_POINT("DBImpl::WriteOptionsFile:2");
4092
4093 std::string file_name =
4094 TempOptionsFileName(GetName(), versions_->NewFileNumber());
4095 Status s = PersistRocksDBOptions(db_options, cf_names, cf_opts, file_name,
4096 fs_.get());
4097
4098 if (s.ok()) {
4099 s = RenameTempFileToOptionsFile(file_name);
4100 }
4101 // restore lock
4102 if (!need_mutex_lock) {
4103 mutex_.Lock();
4104 }
4105 if (need_enter_write_thread) {
4106 write_thread_.ExitUnbatched(&w);
4107 }
4108 if (!s.ok()) {
4109 ROCKS_LOG_WARN(immutable_db_options_.info_log,
4110 "Unnable to persist options -- %s", s.ToString().c_str());
4111 if (immutable_db_options_.fail_if_options_file_error) {
4112 return Status::IOError("Unable to persist options.",
4113 s.ToString().c_str());
4114 }
4115 }
4116 #else
4117 (void)need_mutex_lock;
4118 (void)need_enter_write_thread;
4119 #endif // !ROCKSDB_LITE
4120 return Status::OK();
4121 }
4122
4123 #ifndef ROCKSDB_LITE
4124 namespace {
DeleteOptionsFilesHelper(const std::map<uint64_t,std::string> & filenames,const size_t num_files_to_keep,const std::shared_ptr<Logger> & info_log,Env * env)4125 void DeleteOptionsFilesHelper(const std::map<uint64_t, std::string>& filenames,
4126 const size_t num_files_to_keep,
4127 const std::shared_ptr<Logger>& info_log,
4128 Env* env) {
4129 if (filenames.size() <= num_files_to_keep) {
4130 return;
4131 }
4132 for (auto iter = std::next(filenames.begin(), num_files_to_keep);
4133 iter != filenames.end(); ++iter) {
4134 if (!env->DeleteFile(iter->second).ok()) {
4135 ROCKS_LOG_WARN(info_log, "Unable to delete options file %s",
4136 iter->second.c_str());
4137 }
4138 }
4139 }
4140 } // namespace
4141 #endif // !ROCKSDB_LITE
4142
DeleteObsoleteOptionsFiles()4143 Status DBImpl::DeleteObsoleteOptionsFiles() {
4144 #ifndef ROCKSDB_LITE
4145 std::vector<std::string> filenames;
4146 // use ordered map to store keep the filenames sorted from the newest
4147 // to the oldest.
4148 std::map<uint64_t, std::string> options_filenames;
4149 Status s;
4150 s = GetEnv()->GetChildren(GetName(), &filenames);
4151 if (!s.ok()) {
4152 return s;
4153 }
4154 for (auto& filename : filenames) {
4155 uint64_t file_number;
4156 FileType type;
4157 if (ParseFileName(filename, &file_number, &type) && type == kOptionsFile) {
4158 options_filenames.insert(
4159 {std::numeric_limits<uint64_t>::max() - file_number,
4160 GetName() + "/" + filename});
4161 }
4162 }
4163
4164 // Keeps the latest 2 Options file
4165 const size_t kNumOptionsFilesKept = 2;
4166 DeleteOptionsFilesHelper(options_filenames, kNumOptionsFilesKept,
4167 immutable_db_options_.info_log, GetEnv());
4168 return Status::OK();
4169 #else
4170 return Status::OK();
4171 #endif // !ROCKSDB_LITE
4172 }
4173
RenameTempFileToOptionsFile(const std::string & file_name)4174 Status DBImpl::RenameTempFileToOptionsFile(const std::string& file_name) {
4175 #ifndef ROCKSDB_LITE
4176 Status s;
4177
4178 uint64_t options_file_number = versions_->NewFileNumber();
4179 std::string options_file_name =
4180 OptionsFileName(GetName(), options_file_number);
4181 // Retry if the file name happen to conflict with an existing one.
4182 s = GetEnv()->RenameFile(file_name, options_file_name);
4183 if (s.ok()) {
4184 InstrumentedMutexLock l(&mutex_);
4185 versions_->options_file_number_ = options_file_number;
4186 }
4187
4188 if (0 == disable_delete_obsolete_files_) {
4189 // TODO: Should we check for errors here?
4190 DeleteObsoleteOptionsFiles().PermitUncheckedError();
4191 }
4192 return s;
4193 #else
4194 (void)file_name;
4195 return Status::OK();
4196 #endif // !ROCKSDB_LITE
4197 }
4198
4199 #ifdef ROCKSDB_USING_THREAD_STATUS
4200
NewThreadStatusCfInfo(ColumnFamilyData * cfd) const4201 void DBImpl::NewThreadStatusCfInfo(ColumnFamilyData* cfd) const {
4202 if (immutable_db_options_.enable_thread_tracking) {
4203 ThreadStatusUtil::NewColumnFamilyInfo(this, cfd, cfd->GetName(),
4204 cfd->ioptions()->env);
4205 }
4206 }
4207
EraseThreadStatusCfInfo(ColumnFamilyData * cfd) const4208 void DBImpl::EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const {
4209 if (immutable_db_options_.enable_thread_tracking) {
4210 ThreadStatusUtil::EraseColumnFamilyInfo(cfd);
4211 }
4212 }
4213
EraseThreadStatusDbInfo() const4214 void DBImpl::EraseThreadStatusDbInfo() const {
4215 if (immutable_db_options_.enable_thread_tracking) {
4216 ThreadStatusUtil::EraseDatabaseInfo(this);
4217 }
4218 }
4219
4220 #else
NewThreadStatusCfInfo(ColumnFamilyData *) const4221 void DBImpl::NewThreadStatusCfInfo(ColumnFamilyData* /*cfd*/) const {}
4222
EraseThreadStatusCfInfo(ColumnFamilyData *) const4223 void DBImpl::EraseThreadStatusCfInfo(ColumnFamilyData* /*cfd*/) const {}
4224
EraseThreadStatusDbInfo() const4225 void DBImpl::EraseThreadStatusDbInfo() const {}
4226 #endif // ROCKSDB_USING_THREAD_STATUS
4227
4228 //
4229 // A global method that can dump out the build version
DumpRocksDBBuildVersion(Logger * log)4230 void DumpRocksDBBuildVersion(Logger* log) {
4231 ROCKS_LOG_HEADER(log, "RocksDB version: %s\n",
4232 GetRocksVersionAsString().c_str());
4233 const auto& props = GetRocksBuildProperties();
4234 const auto& sha = props.find("rocksdb_build_git_sha");
4235 if (sha != props.end()) {
4236 ROCKS_LOG_HEADER(log, "Git sha %s", sha->second.c_str());
4237 }
4238 const auto date = props.find("rocksdb_build_date");
4239 if (date != props.end()) {
4240 ROCKS_LOG_HEADER(log, "Compile date %s", date->second.c_str());
4241 }
4242 }
4243
4244 #ifndef ROCKSDB_LITE
GetEarliestMemTableSequenceNumber(SuperVersion * sv,bool include_history)4245 SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv,
4246 bool include_history) {
4247 // Find the earliest sequence number that we know we can rely on reading
4248 // from the memtable without needing to check sst files.
4249 SequenceNumber earliest_seq =
4250 sv->imm->GetEarliestSequenceNumber(include_history);
4251 if (earliest_seq == kMaxSequenceNumber) {
4252 earliest_seq = sv->mem->GetEarliestSequenceNumber();
4253 }
4254 assert(sv->mem->GetEarliestSequenceNumber() >= earliest_seq);
4255
4256 return earliest_seq;
4257 }
4258 #endif // ROCKSDB_LITE
4259
4260 #ifndef ROCKSDB_LITE
// Finds the most recent sequence number written for `key` as of
// versions_->LastSequence(). Sources are searched from newest to oldest:
// 1. the mutable memtable (sv->mem);
// 2. the immutable memtables (sv->imm);
// 3. the memtable history (sv->imm->GetFromHistory);
// 4. unless `cache_only` is true, the SST files of sv->current.
//
// The search stops as soon as a lookup reports a sequence number. It also
// stops (with *found_record_for_key == false and OK) when the source just
// searched has a known earliest sequence number smaller than
// `lower_bound_seq`. This applies to the mutable memtable and to the
// immutable memtables.
//
// Parameters:
//   sv                   - SuperVersion whose memtables/version are searched.
//   key                  - user key to look up.
//   cache_only           - if true, SST files are not consulted.
//   lower_bound_seq      - cutoff used for the early stop described above.
//   seq                  - [out] reset to kMaxSequenceNumber, then filled in
//                          by whichever lookup finds the key.
//   found_record_for_key - [out] reset to false; set to true when a lookup
//                          reports a sequence number.
//   is_blob_index        - [out] forwarded to every lookup.
//
// Returns:
// - OK when a record is found or the search is cut off early.
// - Any memtable status other than OK/NotFound/MergeInProgress is logged and
//   returned immediately.
// - Otherwise, the status of the last lookup performed. An unexpected status
//   from Version::Get is logged before being returned.
Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
                                       bool cache_only,
                                       SequenceNumber lower_bound_seq,
                                       SequenceNumber* seq,
                                       bool* found_record_for_key,
                                       bool* is_blob_index) {
  Status s;
  MergeContext merge_context;
  SequenceNumber max_covering_tombstone_seq = 0;

  ReadOptions read_options;
  // Look the key up as of the latest sequence number in the DB.
  SequenceNumber current_seq = versions_->LastSequence();
  LookupKey lkey(key, current_seq);

  *seq = kMaxSequenceNumber;
  *found_record_for_key = false;

  // Check if there is a record for this key in the latest memtable
  sv->mem->Get(lkey, nullptr, nullptr, &s, &merge_context,
               &max_covering_tombstone_seq, seq, read_options,
               nullptr /*read_callback*/, is_blob_index);

  if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
    // unexpected error reading memtable.
    ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                    "Unexpected status returned from MemTable::Get: %s\n",
                    s.ToString().c_str());

    return s;
  }

  if (*seq != kMaxSequenceNumber) {
    // Found a sequence number, no need to check immutable memtables
    *found_record_for_key = true;
    return Status::OK();
  }

  // Early stop: the mutable memtable's earliest seqno is below
  // lower_bound_seq. Presumably the memtable then covers every write at or
  // after lower_bound_seq, so older sources need not be searched.
  // NOTE(review): confirm this invariant against the callers.
  SequenceNumber lower_bound_in_mem = sv->mem->GetEarliestSequenceNumber();
  if (lower_bound_in_mem != kMaxSequenceNumber &&
      lower_bound_in_mem < lower_bound_seq) {
    *found_record_for_key = false;
    return Status::OK();
  }

  // Check if there is a record for this key in the immutable memtables
  sv->imm->Get(lkey, nullptr, nullptr, &s, &merge_context,
               &max_covering_tombstone_seq, seq, read_options,
               nullptr /*read_callback*/, is_blob_index);

  if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
    // unexpected error reading memtable.
    ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                    "Unexpected status returned from MemTableList::Get: %s\n",
                    s.ToString().c_str());

    return s;
  }

  if (*seq != kMaxSequenceNumber) {
    // Found a sequence number, no need to check memtable history
    *found_record_for_key = true;
    return Status::OK();
  }

  // Same early-stop rule, applied to the immutable memtables (without
  // history) before consulting the memtable history.
  SequenceNumber lower_bound_in_imm = sv->imm->GetEarliestSequenceNumber();
  if (lower_bound_in_imm != kMaxSequenceNumber &&
      lower_bound_in_imm < lower_bound_seq) {
    *found_record_for_key = false;
    return Status::OK();
  }

  // Check if there is a record for this key in the memtable history kept by
  // the immutable memtable list (presumably memtables that were already
  // flushed but are still retained).
  sv->imm->GetFromHistory(lkey, nullptr, nullptr, &s, &merge_context,
                          &max_covering_tombstone_seq, seq, read_options,
                          is_blob_index);

  if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
    // unexpected error reading memtable.
    ROCKS_LOG_ERROR(
        immutable_db_options_.info_log,
        "Unexpected status returned from MemTableList::GetFromHistory: %s\n",
        s.ToString().c_str());

    return s;
  }

  if (*seq != kMaxSequenceNumber) {
    // Found a sequence number, no need to check SST files
    *found_record_for_key = true;
    return Status::OK();
  }

  // We could do a sv->imm->GetEarliestSequenceNumber(/*include_history*/ true)
  // check here to skip the history if possible. But currently the caller
  // already does that. Maybe we should move the logic here later.

  // TODO(agiardullo): possible optimization: consider checking cached
  // SST files if cache_only=true?
  if (!cache_only) {
    // Check tables. Version::Get fills *seq and *found_record_for_key itself.
    sv->current->Get(read_options, lkey, nullptr, nullptr, &s, &merge_context,
                     &max_covering_tombstone_seq, nullptr /* value_found */,
                     found_record_for_key, seq, nullptr /*read_callback*/,
                     is_blob_index);

    if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
      // unexpected error reading SST files
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "Unexpected status returned from Version::Get: %s\n",
                      s.ToString().c_str());
    }
  }

  // Not found anywhere (or SST error): return the last lookup's status.
  return s;
}
4376
IngestExternalFile(ColumnFamilyHandle * column_family,const std::vector<std::string> & external_files,const IngestExternalFileOptions & ingestion_options)4377 Status DBImpl::IngestExternalFile(
4378 ColumnFamilyHandle* column_family,
4379 const std::vector<std::string>& external_files,
4380 const IngestExternalFileOptions& ingestion_options) {
4381 IngestExternalFileArg arg;
4382 arg.column_family = column_family;
4383 arg.external_files = external_files;
4384 arg.options = ingestion_options;
4385 return IngestExternalFiles({arg});
4386 }
4387
IngestExternalFiles(const std::vector<IngestExternalFileArg> & args)4388 Status DBImpl::IngestExternalFiles(
4389 const std::vector<IngestExternalFileArg>& args) {
4390 if (args.empty()) {
4391 return Status::InvalidArgument("ingestion arg list is empty");
4392 }
4393 {
4394 std::unordered_set<ColumnFamilyHandle*> unique_cfhs;
4395 for (const auto& arg : args) {
4396 if (arg.column_family == nullptr) {
4397 return Status::InvalidArgument("column family handle is null");
4398 } else if (unique_cfhs.count(arg.column_family) > 0) {
4399 return Status::InvalidArgument(
4400 "ingestion args have duplicate column families");
4401 }
4402 unique_cfhs.insert(arg.column_family);
4403 }
4404 }
4405 // Ingest multiple external SST files atomically.
4406 const size_t num_cfs = args.size();
4407 for (size_t i = 0; i != num_cfs; ++i) {
4408 if (args[i].external_files.empty()) {
4409 char err_msg[128] = {0};
4410 snprintf(err_msg, 128, "external_files[%zu] is empty", i);
4411 return Status::InvalidArgument(err_msg);
4412 }
4413 }
4414 for (const auto& arg : args) {
4415 const IngestExternalFileOptions& ingest_opts = arg.options;
4416 if (ingest_opts.ingest_behind &&
4417 !immutable_db_options_.allow_ingest_behind) {
4418 return Status::InvalidArgument(
4419 "can't ingest_behind file in DB with allow_ingest_behind=false");
4420 }
4421 }
4422
4423 // TODO (yanqin) maybe handle the case in which column_families have
4424 // duplicates
4425 std::unique_ptr<std::list<uint64_t>::iterator> pending_output_elem;
4426 size_t total = 0;
4427 for (const auto& arg : args) {
4428 total += arg.external_files.size();
4429 }
4430 uint64_t next_file_number = 0;
4431 Status status = ReserveFileNumbersBeforeIngestion(
4432 static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd(), total,
4433 pending_output_elem, &next_file_number);
4434 if (!status.ok()) {
4435 InstrumentedMutexLock l(&mutex_);
4436 ReleaseFileNumberFromPendingOutputs(pending_output_elem);
4437 return status;
4438 }
4439
4440 std::vector<ExternalSstFileIngestionJob> ingestion_jobs;
4441 for (const auto& arg : args) {
4442 auto* cfd = static_cast<ColumnFamilyHandleImpl*>(arg.column_family)->cfd();
4443 ingestion_jobs.emplace_back(versions_.get(), cfd, immutable_db_options_,
4444 file_options_, &snapshots_, arg.options,
4445 &directories_, &event_logger_, io_tracer_);
4446 }
4447
4448 // TODO(yanqin) maybe make jobs run in parallel
4449 uint64_t start_file_number = next_file_number;
4450 for (size_t i = 1; i != num_cfs; ++i) {
4451 start_file_number += args[i - 1].external_files.size();
4452 auto* cfd =
4453 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4454 SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
4455 Status es = ingestion_jobs[i].Prepare(
4456 args[i].external_files, args[i].files_checksums,
4457 args[i].files_checksum_func_names, start_file_number, super_version);
4458 // capture first error only
4459 if (!es.ok() && status.ok()) {
4460 status = es;
4461 }
4462 CleanupSuperVersion(super_version);
4463 }
4464 TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:0");
4465 TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1");
4466 {
4467 auto* cfd =
4468 static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd();
4469 SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
4470 Status es = ingestion_jobs[0].Prepare(
4471 args[0].external_files, args[0].files_checksums,
4472 args[0].files_checksum_func_names, next_file_number, super_version);
4473 if (!es.ok()) {
4474 status = es;
4475 }
4476 CleanupSuperVersion(super_version);
4477 }
4478 if (!status.ok()) {
4479 for (size_t i = 0; i != num_cfs; ++i) {
4480 ingestion_jobs[i].Cleanup(status);
4481 }
4482 InstrumentedMutexLock l(&mutex_);
4483 ReleaseFileNumberFromPendingOutputs(pending_output_elem);
4484 return status;
4485 }
4486
4487 std::vector<SuperVersionContext> sv_ctxs;
4488 for (size_t i = 0; i != num_cfs; ++i) {
4489 sv_ctxs.emplace_back(true /* create_superversion */);
4490 }
4491 TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeJobsRun:0");
4492 TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeJobsRun:1");
4493 TEST_SYNC_POINT("DBImpl::AddFile:Start");
4494 {
4495 InstrumentedMutexLock l(&mutex_);
4496 TEST_SYNC_POINT("DBImpl::AddFile:MutexLock");
4497
4498 // Stop writes to the DB by entering both write threads
4499 WriteThread::Writer w;
4500 write_thread_.EnterUnbatched(&w, &mutex_);
4501 WriteThread::Writer nonmem_w;
4502 if (two_write_queues_) {
4503 nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
4504 }
4505
4506 // When unordered_write is enabled, the keys are writing to memtable in an
4507 // unordered way. If the ingestion job checks memtable key range before the
4508 // key landing in memtable, the ingestion job may skip the necessary
4509 // memtable flush.
4510 // So wait here to ensure there is no pending write to memtable.
4511 WaitForPendingWrites();
4512
4513 num_running_ingest_file_ += static_cast<int>(num_cfs);
4514 TEST_SYNC_POINT("DBImpl::IngestExternalFile:AfterIncIngestFileCounter");
4515
4516 bool at_least_one_cf_need_flush = false;
4517 std::vector<bool> need_flush(num_cfs, false);
4518 for (size_t i = 0; i != num_cfs; ++i) {
4519 auto* cfd =
4520 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4521 if (cfd->IsDropped()) {
4522 // TODO (yanqin) investigate whether we should abort ingestion or
4523 // proceed with other non-dropped column families.
4524 status = Status::InvalidArgument(
4525 "cannot ingest an external file into a dropped CF");
4526 break;
4527 }
4528 bool tmp = false;
4529 status = ingestion_jobs[i].NeedsFlush(&tmp, cfd->GetSuperVersion());
4530 need_flush[i] = tmp;
4531 at_least_one_cf_need_flush = (at_least_one_cf_need_flush || tmp);
4532 if (!status.ok()) {
4533 break;
4534 }
4535 }
4536 TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush",
4537 &at_least_one_cf_need_flush);
4538
4539 if (status.ok() && at_least_one_cf_need_flush) {
4540 FlushOptions flush_opts;
4541 flush_opts.allow_write_stall = true;
4542 if (immutable_db_options_.atomic_flush) {
4543 autovector<ColumnFamilyData*> cfds_to_flush;
4544 SelectColumnFamiliesForAtomicFlush(&cfds_to_flush);
4545 mutex_.Unlock();
4546 status = AtomicFlushMemTables(cfds_to_flush, flush_opts,
4547 FlushReason::kExternalFileIngestion,
4548 true /* writes_stopped */);
4549 mutex_.Lock();
4550 } else {
4551 for (size_t i = 0; i != num_cfs; ++i) {
4552 if (need_flush[i]) {
4553 mutex_.Unlock();
4554 auto* cfd =
4555 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)
4556 ->cfd();
4557 status = FlushMemTable(cfd, flush_opts,
4558 FlushReason::kExternalFileIngestion,
4559 true /* writes_stopped */);
4560 mutex_.Lock();
4561 if (!status.ok()) {
4562 break;
4563 }
4564 }
4565 }
4566 }
4567 }
4568 // Run ingestion jobs.
4569 if (status.ok()) {
4570 for (size_t i = 0; i != num_cfs; ++i) {
4571 status = ingestion_jobs[i].Run();
4572 if (!status.ok()) {
4573 break;
4574 }
4575 }
4576 }
4577 if (status.ok()) {
4578 int consumed_seqno_count =
4579 ingestion_jobs[0].ConsumedSequenceNumbersCount();
4580 #ifndef NDEBUG
4581 for (size_t i = 1; i != num_cfs; ++i) {
4582 assert(!!consumed_seqno_count ==
4583 !!ingestion_jobs[i].ConsumedSequenceNumbersCount());
4584 consumed_seqno_count +=
4585 ingestion_jobs[i].ConsumedSequenceNumbersCount();
4586 }
4587 #endif
4588 if (consumed_seqno_count > 0) {
4589 const SequenceNumber last_seqno = versions_->LastSequence();
4590 versions_->SetLastAllocatedSequence(last_seqno + consumed_seqno_count);
4591 versions_->SetLastPublishedSequence(last_seqno + consumed_seqno_count);
4592 versions_->SetLastSequence(last_seqno + consumed_seqno_count);
4593 }
4594 autovector<ColumnFamilyData*> cfds_to_commit;
4595 autovector<const MutableCFOptions*> mutable_cf_options_list;
4596 autovector<autovector<VersionEdit*>> edit_lists;
4597 uint32_t num_entries = 0;
4598 for (size_t i = 0; i != num_cfs; ++i) {
4599 auto* cfd =
4600 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4601 if (cfd->IsDropped()) {
4602 continue;
4603 }
4604 cfds_to_commit.push_back(cfd);
4605 mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
4606 autovector<VersionEdit*> edit_list;
4607 edit_list.push_back(ingestion_jobs[i].edit());
4608 edit_lists.push_back(edit_list);
4609 ++num_entries;
4610 }
4611 // Mark the version edits as an atomic group if the number of version
4612 // edits exceeds 1.
4613 if (cfds_to_commit.size() > 1) {
4614 for (auto& edits : edit_lists) {
4615 assert(edits.size() == 1);
4616 edits[0]->MarkAtomicGroup(--num_entries);
4617 }
4618 assert(0 == num_entries);
4619 }
4620 status =
4621 versions_->LogAndApply(cfds_to_commit, mutable_cf_options_list,
4622 edit_lists, &mutex_, directories_.GetDbDir());
4623 }
4624
4625 if (status.ok()) {
4626 for (size_t i = 0; i != num_cfs; ++i) {
4627 auto* cfd =
4628 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4629 if (!cfd->IsDropped()) {
4630 InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i],
4631 *cfd->GetLatestMutableCFOptions());
4632 #ifndef NDEBUG
4633 if (0 == i && num_cfs > 1) {
4634 TEST_SYNC_POINT(
4635 "DBImpl::IngestExternalFiles:InstallSVForFirstCF:0");
4636 TEST_SYNC_POINT(
4637 "DBImpl::IngestExternalFiles:InstallSVForFirstCF:1");
4638 }
4639 #endif // !NDEBUG
4640 }
4641 }
4642 } else if (versions_->io_status().IsIOError()) {
4643 // Error while writing to MANIFEST.
4644 // In fact, versions_->io_status() can also be the result of renaming
4645 // CURRENT file. With current code, it's just difficult to tell. So just
4646 // be pessimistic and try write to a new MANIFEST.
4647 // TODO: distinguish between MANIFEST write and CURRENT renaming
4648 const IOStatus& io_s = versions_->io_status();
4649 // Should handle return error?
4650 error_handler_.SetBGError(io_s, BackgroundErrorReason::kManifestWrite);
4651 }
4652
4653 // Resume writes to the DB
4654 if (two_write_queues_) {
4655 nonmem_write_thread_.ExitUnbatched(&nonmem_w);
4656 }
4657 write_thread_.ExitUnbatched(&w);
4658
4659 if (status.ok()) {
4660 for (auto& job : ingestion_jobs) {
4661 job.UpdateStats();
4662 }
4663 }
4664 ReleaseFileNumberFromPendingOutputs(pending_output_elem);
4665 num_running_ingest_file_ -= static_cast<int>(num_cfs);
4666 if (0 == num_running_ingest_file_) {
4667 bg_cv_.SignalAll();
4668 }
4669 TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock");
4670 }
4671 // mutex_ is unlocked here
4672
4673 // Cleanup
4674 for (size_t i = 0; i != num_cfs; ++i) {
4675 sv_ctxs[i].Clean();
4676 // This may rollback jobs that have completed successfully. This is
4677 // intended for atomicity.
4678 ingestion_jobs[i].Cleanup(status);
4679 }
4680 if (status.ok()) {
4681 for (size_t i = 0; i != num_cfs; ++i) {
4682 auto* cfd =
4683 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4684 if (!cfd->IsDropped()) {
4685 NotifyOnExternalFileIngested(cfd, ingestion_jobs[i]);
4686 }
4687 }
4688 }
4689 return status;
4690 }
4691
// Creates column family `column_family_name` with `options` and populates it
// with the SST files described by `metadata`.
//
// Steps:
//   1. Reject a comparator whose name differs from
//      metadata.db_comparator_name.
//   2. Create the column family.
//   3. Reserve one file number per imported file, persisted by logging an
//      empty version edit.
//   4. Prepare the ImportColumnFamilyJob.
//   5. With all writes stopped, run the job and commit its version edit.
//
// If anything fails after the column family was created, it is dropped, its
// handle is destroyed, and *handle is reset to nullptr.
//
// `handle` must be non-null and *handle must be nullptr on entry (asserted).
// Returns:
// - InvalidArgument on a comparator mismatch;
// - the background error if the DB is stopped;
// - otherwise the first error from creation, preparation, run or commit.
Status DBImpl::CreateColumnFamilyWithImport(
    const ColumnFamilyOptions& options, const std::string& column_family_name,
    const ImportColumnFamilyOptions& import_options,
    const ExportImportFilesMetaData& metadata, ColumnFamilyHandle** handle) {
  assert(handle != nullptr);
  assert(*handle == nullptr);
  std::string cf_comparator_name = options.comparator->Name();
  if (cf_comparator_name != metadata.db_comparator_name) {
    return Status::InvalidArgument("Comparator name mismatch");
  }

  // Create column family.
  auto status = CreateColumnFamily(options, column_family_name, handle);
  if (!status.ok()) {
    return status;
  }

  // Import sst files from metadata.
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(*handle);
  auto cfd = cfh->cfd();
  ImportColumnFamilyJob import_job(versions_.get(), cfd, immutable_db_options_,
                                   file_options_, import_options,
                                   metadata.files, io_tracer_);

  SuperVersionContext dummy_sv_ctx(/* create_superversion */ true);
  VersionEdit dummy_edit;
  uint64_t next_file_number = 0;
  std::unique_ptr<std::list<uint64_t>::iterator> pending_output_elem;
  {
    // Lock db mutex
    InstrumentedMutexLock l(&mutex_);
    if (error_handler_.IsDBStopped()) {
      // Don't import files when there is a bg_error
      status = error_handler_.GetBGError();
    }

    // Make sure that bg cleanup wont delete the files that we are importing.
    // This is captured even on the bg-error path above, and is always
    // released further below.
    pending_output_elem.reset(new std::list<uint64_t>::iterator(
        CaptureCurrentFileNumberInPendingOutputs()));

    if (status.ok()) {
      // If crash happen after a hard link established, Recover function may
      // reuse the file number that has already assigned to the internal file,
      // and this will overwrite the external file. To protect the external
      // file, we have to make sure the file number will never being reused.
      next_file_number = versions_->FetchAddFileNumber(metadata.files.size());
      auto cf_options = cfd->GetLatestMutableCFOptions();
      status = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
                                      directories_.GetDbDir());
      if (status.ok()) {
        InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
      }
    }
  }
  dummy_sv_ctx.Clean();

  if (status.ok()) {
    // Prepare outside the DB mutex, against a referenced SuperVersion.
    SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
    status = import_job.Prepare(next_file_number, sv);
    CleanupSuperVersion(sv);
  }

  if (status.ok()) {
    SuperVersionContext sv_context(true /*create_superversion*/);
    {
      // Lock db mutex
      InstrumentedMutexLock l(&mutex_);

      // Stop writes to the DB by entering both write threads
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      WriteThread::Writer nonmem_w;
      if (two_write_queues_) {
        nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
      }

      // Counted like an ingestion so that WaitForIngestFile() waits for it.
      num_running_ingest_file_++;
      assert(!cfd->IsDropped());
      status = import_job.Run();

      // Install job edit [Mutex will be unlocked here]
      // (mutex_ is passed to LogAndApply, which may release it while the
      // edit is being written.)
      if (status.ok()) {
        auto cf_options = cfd->GetLatestMutableCFOptions();
        status = versions_->LogAndApply(cfd, *cf_options, import_job.edit(),
                                        &mutex_, directories_.GetDbDir());
        if (status.ok()) {
          InstallSuperVersionAndScheduleWork(cfd, &sv_context, *cf_options);
        }
      }

      // Resume writes to the DB
      if (two_write_queues_) {
        nonmem_write_thread_.ExitUnbatched(&nonmem_w);
      }
      write_thread_.ExitUnbatched(&w);

      num_running_ingest_file_--;
      if (num_running_ingest_file_ == 0) {
        bg_cv_.SignalAll();
      }
    }
    // mutex_ is unlocked here

    sv_context.Clean();
  }

  {
    InstrumentedMutexLock l(&mutex_);
    ReleaseFileNumberFromPendingOutputs(pending_output_elem);
  }

  // Cleanup receives the final status, success or failure.
  import_job.Cleanup(status);
  if (!status.ok()) {
    // Undo the column family creation; a failure to drop is only logged.
    Status temp_s = DropColumnFamily(*handle);
    if (!temp_s.ok()) {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "DropColumnFamily failed with error %s",
                      temp_s.ToString().c_str());
    }
    // Always returns Status::OK()
    temp_s = DestroyColumnFamilyHandle(*handle);
    assert(temp_s.ok());
    *handle = nullptr;
  }
  return status;
}
4818
VerifyFileChecksums(const ReadOptions & read_options)4819 Status DBImpl::VerifyFileChecksums(const ReadOptions& read_options) {
4820 return VerifyChecksumInternal(read_options, /*use_file_checksum=*/true);
4821 }
4822
VerifyChecksum(const ReadOptions & read_options)4823 Status DBImpl::VerifyChecksum(const ReadOptions& read_options) {
4824 return VerifyChecksumInternal(read_options, /*use_file_checksum=*/false);
4825 }
4826
VerifyChecksumInternal(const ReadOptions & read_options,bool use_file_checksum)4827 Status DBImpl::VerifyChecksumInternal(const ReadOptions& read_options,
4828 bool use_file_checksum) {
4829 Status s;
4830
4831 if (use_file_checksum) {
4832 FileChecksumGenFactory* const file_checksum_gen_factory =
4833 immutable_db_options_.file_checksum_gen_factory.get();
4834 if (!file_checksum_gen_factory) {
4835 s = Status::InvalidArgument(
4836 "Cannot verify file checksum if options.file_checksum_gen_factory is "
4837 "null");
4838 return s;
4839 }
4840 }
4841
4842 std::vector<ColumnFamilyData*> cfd_list;
4843 {
4844 InstrumentedMutexLock l(&mutex_);
4845 for (auto cfd : *versions_->GetColumnFamilySet()) {
4846 if (!cfd->IsDropped() && cfd->initialized()) {
4847 cfd->Ref();
4848 cfd_list.push_back(cfd);
4849 }
4850 }
4851 }
4852 std::vector<SuperVersion*> sv_list;
4853 for (auto cfd : cfd_list) {
4854 sv_list.push_back(cfd->GetReferencedSuperVersion(this));
4855 }
4856
4857 for (auto& sv : sv_list) {
4858 VersionStorageInfo* vstorage = sv->current->storage_info();
4859 ColumnFamilyData* cfd = sv->current->cfd();
4860 Options opts;
4861 if (!use_file_checksum) {
4862 InstrumentedMutexLock l(&mutex_);
4863 opts = Options(BuildDBOptions(immutable_db_options_, mutable_db_options_),
4864 cfd->GetLatestCFOptions());
4865 }
4866 for (int i = 0; i < vstorage->num_non_empty_levels() && s.ok(); i++) {
4867 for (size_t j = 0; j < vstorage->LevelFilesBrief(i).num_files && s.ok();
4868 j++) {
4869 const auto& fd_with_krange = vstorage->LevelFilesBrief(i).files[j];
4870 const auto& fd = fd_with_krange.fd;
4871 const FileMetaData* fmeta = fd_with_krange.file_metadata;
4872 assert(fmeta);
4873 std::string fname = TableFileName(cfd->ioptions()->cf_paths,
4874 fd.GetNumber(), fd.GetPathId());
4875 if (use_file_checksum) {
4876 s = VerifyFullFileChecksum(fmeta->file_checksum,
4877 fmeta->file_checksum_func_name, fname,
4878 read_options);
4879 } else {
4880 s = ROCKSDB_NAMESPACE::VerifySstFileChecksum(opts, file_options_,
4881 read_options, fname);
4882 }
4883 }
4884 }
4885
4886 if (s.ok() && use_file_checksum) {
4887 const auto& blob_files = vstorage->GetBlobFiles();
4888 for (const auto& pair : blob_files) {
4889 const uint64_t blob_file_number = pair.first;
4890 const auto& meta = pair.second;
4891 assert(meta);
4892 const std::string blob_file_name = BlobFileName(
4893 cfd->ioptions()->cf_paths.front().path, blob_file_number);
4894 s = VerifyFullFileChecksum(meta->GetChecksumValue(),
4895 meta->GetChecksumMethod(), blob_file_name,
4896 read_options);
4897 if (!s.ok()) {
4898 break;
4899 }
4900 }
4901 }
4902 if (!s.ok()) {
4903 break;
4904 }
4905 }
4906
4907 bool defer_purge =
4908 immutable_db_options().avoid_unnecessary_blocking_io;
4909 {
4910 InstrumentedMutexLock l(&mutex_);
4911 for (auto sv : sv_list) {
4912 if (sv && sv->Unref()) {
4913 sv->Cleanup();
4914 if (defer_purge) {
4915 AddSuperVersionsToFreeQueue(sv);
4916 } else {
4917 delete sv;
4918 }
4919 }
4920 }
4921 if (defer_purge) {
4922 SchedulePurge();
4923 }
4924 for (auto cfd : cfd_list) {
4925 cfd->UnrefAndTryDelete();
4926 }
4927 }
4928 return s;
4929 }
4930
VerifyFullFileChecksum(const std::string & file_checksum_expected,const std::string & func_name_expected,const std::string & fname,const ReadOptions & read_options)4931 Status DBImpl::VerifyFullFileChecksum(const std::string& file_checksum_expected,
4932 const std::string& func_name_expected,
4933 const std::string& fname,
4934 const ReadOptions& read_options) {
4935 Status s;
4936 if (file_checksum_expected == kUnknownFileChecksum) {
4937 return s;
4938 }
4939 std::string file_checksum;
4940 std::string func_name;
4941 s = ROCKSDB_NAMESPACE::GenerateOneFileChecksum(
4942 fs_.get(), fname, immutable_db_options_.file_checksum_gen_factory.get(),
4943 func_name_expected, &file_checksum, &func_name,
4944 read_options.readahead_size, immutable_db_options_.allow_mmap_reads,
4945 io_tracer_, immutable_db_options_.rate_limiter.get());
4946 if (s.ok()) {
4947 assert(func_name_expected == func_name);
4948 if (file_checksum != file_checksum_expected) {
4949 std::ostringstream oss;
4950 oss << fname << " file checksum mismatch, ";
4951 oss << "expecting "
4952 << Slice(file_checksum_expected).ToString(/*hex=*/true);
4953 oss << ", but actual " << Slice(file_checksum).ToString(/*hex=*/true);
4954 s = Status::Corruption(oss.str());
4955 TEST_SYNC_POINT_CALLBACK("DBImpl::VerifyFullFileChecksum:mismatch", &s);
4956 }
4957 }
4958 return s;
4959 }
4960
NotifyOnExternalFileIngested(ColumnFamilyData * cfd,const ExternalSstFileIngestionJob & ingestion_job)4961 void DBImpl::NotifyOnExternalFileIngested(
4962 ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job) {
4963 if (immutable_db_options_.listeners.empty()) {
4964 return;
4965 }
4966
4967 for (const IngestedFileInfo& f : ingestion_job.files_to_ingest()) {
4968 ExternalFileIngestionInfo info;
4969 info.cf_name = cfd->GetName();
4970 info.external_file_path = f.external_file_path;
4971 info.internal_file_path = f.internal_file_path;
4972 info.global_seqno = f.assigned_seqno;
4973 info.table_properties = f.table_properties;
4974 for (auto listener : immutable_db_options_.listeners) {
4975 listener->OnExternalFileIngested(this, info);
4976 }
4977 }
4978 }
4979
WaitForIngestFile()4980 void DBImpl::WaitForIngestFile() {
4981 mutex_.AssertHeld();
4982 while (num_running_ingest_file_ > 0) {
4983 bg_cv_.Wait();
4984 }
4985 }
4986
StartTrace(const TraceOptions & trace_options,std::unique_ptr<TraceWriter> && trace_writer)4987 Status DBImpl::StartTrace(const TraceOptions& trace_options,
4988 std::unique_ptr<TraceWriter>&& trace_writer) {
4989 InstrumentedMutexLock lock(&trace_mutex_);
4990 tracer_.reset(new Tracer(immutable_db_options_.clock, trace_options,
4991 std::move(trace_writer)));
4992 return Status::OK();
4993 }
4994
EndTrace()4995 Status DBImpl::EndTrace() {
4996 InstrumentedMutexLock lock(&trace_mutex_);
4997 Status s;
4998 if (tracer_ != nullptr) {
4999 s = tracer_->Close();
5000 tracer_.reset();
5001 } else {
5002 return Status::IOError("No trace file to close");
5003 }
5004 return s;
5005 }
5006
StartBlockCacheTrace(const TraceOptions & trace_options,std::unique_ptr<TraceWriter> && trace_writer)5007 Status DBImpl::StartBlockCacheTrace(
5008 const TraceOptions& trace_options,
5009 std::unique_ptr<TraceWriter>&& trace_writer) {
5010 return block_cache_tracer_.StartTrace(immutable_db_options_.clock,
5011 trace_options, std::move(trace_writer));
5012 }
5013
EndBlockCacheTrace()5014 Status DBImpl::EndBlockCacheTrace() {
5015 block_cache_tracer_.EndTrace();
5016 return Status::OK();
5017 }
5018
TraceIteratorSeek(const uint32_t & cf_id,const Slice & key,const Slice & lower_bound,const Slice upper_bound)5019 Status DBImpl::TraceIteratorSeek(const uint32_t& cf_id, const Slice& key,
5020 const Slice& lower_bound,
5021 const Slice upper_bound) {
5022 Status s;
5023 if (tracer_) {
5024 InstrumentedMutexLock lock(&trace_mutex_);
5025 if (tracer_) {
5026 s = tracer_->IteratorSeek(cf_id, key, lower_bound, upper_bound);
5027 }
5028 }
5029 return s;
5030 }
5031
TraceIteratorSeekForPrev(const uint32_t & cf_id,const Slice & key,const Slice & lower_bound,const Slice upper_bound)5032 Status DBImpl::TraceIteratorSeekForPrev(const uint32_t& cf_id, const Slice& key,
5033 const Slice& lower_bound,
5034 const Slice upper_bound) {
5035 Status s;
5036 if (tracer_) {
5037 InstrumentedMutexLock lock(&trace_mutex_);
5038 if (tracer_) {
5039 s = tracer_->IteratorSeekForPrev(cf_id, key, lower_bound, upper_bound);
5040 }
5041 }
5042 return s;
5043 }
5044
ReserveFileNumbersBeforeIngestion(ColumnFamilyData * cfd,uint64_t num,std::unique_ptr<std::list<uint64_t>::iterator> & pending_output_elem,uint64_t * next_file_number)5045 Status DBImpl::ReserveFileNumbersBeforeIngestion(
5046 ColumnFamilyData* cfd, uint64_t num,
5047 std::unique_ptr<std::list<uint64_t>::iterator>& pending_output_elem,
5048 uint64_t* next_file_number) {
5049 Status s;
5050 SuperVersionContext dummy_sv_ctx(true /* create_superversion */);
5051 assert(nullptr != next_file_number);
5052 InstrumentedMutexLock l(&mutex_);
5053 if (error_handler_.IsDBStopped()) {
5054 // Do not ingest files when there is a bg_error
5055 return error_handler_.GetBGError();
5056 }
5057 pending_output_elem.reset(new std::list<uint64_t>::iterator(
5058 CaptureCurrentFileNumberInPendingOutputs()));
5059 *next_file_number = versions_->FetchAddFileNumber(static_cast<uint64_t>(num));
5060 auto cf_options = cfd->GetLatestMutableCFOptions();
5061 VersionEdit dummy_edit;
5062 // If crash happen after a hard link established, Recover function may
5063 // reuse the file number that has already assigned to the internal file,
5064 // and this will overwrite the external file. To protect the external
5065 // file, we have to make sure the file number will never being reused.
5066 s = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
5067 directories_.GetDbDir());
5068 if (s.ok()) {
5069 InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
5070 }
5071 dummy_sv_ctx.Clean();
5072 return s;
5073 }
5074
GetCreationTimeOfOldestFile(uint64_t * creation_time)5075 Status DBImpl::GetCreationTimeOfOldestFile(uint64_t* creation_time) {
5076 if (mutable_db_options_.max_open_files == -1) {
5077 uint64_t oldest_time = port::kMaxUint64;
5078 for (auto cfd : *versions_->GetColumnFamilySet()) {
5079 if (!cfd->IsDropped()) {
5080 uint64_t ctime;
5081 {
5082 SuperVersion* sv = GetAndRefSuperVersion(cfd);
5083 Version* version = sv->current;
5084 version->GetCreationTimeOfOldestFile(&ctime);
5085 ReturnAndCleanupSuperVersion(cfd, sv);
5086 }
5087
5088 if (ctime < oldest_time) {
5089 oldest_time = ctime;
5090 }
5091 if (oldest_time == 0) {
5092 break;
5093 }
5094 }
5095 }
5096 *creation_time = oldest_time;
5097 return Status::OK();
5098 } else {
5099 return Status::NotSupported("This API only works if max_open_files = -1");
5100 }
5101 }
5102 #endif // ROCKSDB_LITE
5103
5104 } // namespace ROCKSDB_NAMESPACE
5105