//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_impl/db_impl.h"

#include <stdint.h>
#ifdef OS_SOLARIS
#include <alloca.h>
#endif

#include <algorithm>
#include <cinttypes>
#include <cstdio>
#include <map>
#include <set>
#include <sstream>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "db/arena_wrapped_db_iter.h"
#include "db/builder.h"
#include "db/compaction/compaction_job.h"
#include "db/db_info_dumper.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "db/external_sst_file_ingestion_job.h"
#include "db/flush_job.h"
#include "db/forward_iterator.h"
#include "db/import_column_family_job.h"
#include "db/job_context.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/malloc_stats.h"
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/periodic_work_scheduler.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/table_cache.h"
#include "db/table_properties_collector.h"
#include "db/transaction_log_impl.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "db/write_callback.h"
#include "file/file_util.h"
#include "file/filename.h"
#include "file/random_access_file_reader.h"
#include "file/sst_file_manager_impl.h"
#include "logging/auto_roll_logger.h"
#include "logging/log_buffer.h"
#include "logging/logging.h"
#include "memtable/hash_linklist_rep.h"
#include "memtable/hash_skiplist_rep.h"
#include "monitoring/in_memory_stats_history.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/persistent_stats_history.h"
#include "monitoring/thread_status_updater.h"
#include "monitoring/thread_status_util.h"
#include "options/cf_options.h"
#include "options/options_helper.h"
#include "options/options_parser.h"
#include "port/port.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/statistics.h"
#include "rocksdb/stats_history.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "rocksdb/version.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/get_context.h"
#include "table/merging_iterator.h"
#include "table/multiget_context.h"
#include "table/sst_file_dumper.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "test_util/sync_point.h"
#include "util/autovector.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

const std::string kDefaultColumnFamilyName("default");
const std::string kPersistentStatsColumnFamilyName(
    "___rocksdb_stats_history___");
void DumpRocksDBBuildVersion(Logger* log);

CompressionType GetCompressionFlush(
    const ImmutableCFOptions& ioptions,
    const MutableCFOptions& mutable_cf_options) {
  // Compressing memtable flushes might not help unless the sequential load
  // optimization is used for leveled compaction. Otherwise the CPU and
  // latency overhead is not offset by saving much space.
  if (ioptions.compaction_style == kCompactionStyleUniversal) {
    if (mutable_cf_options.compaction_options_universal
            .compression_size_percent < 0) {
      return mutable_cf_options.compression;
    } else {
      return kNoCompression;
    }
  } else if (!ioptions.compression_per_level.empty()) {
    // For leveled compaction, compress flushed files according to
    // compression_per_level[0]; this covers the case where
    // min_level_to_compress != 0.
    return ioptions.compression_per_level[0];
  } else {
    return mutable_cf_options.compression;
  }
}
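
// Illustrative note (not part of the original source): with leveled
// compaction and, for example, compression_per_level = {kNoCompression,
// kSnappyCompression, ...}, GetCompressionFlush() returns
// compression_per_level[0], so flushed L0 files stay uncompressed in that
// configuration. Under universal compaction the decision instead hinges on
// compression_size_percent.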

namespace {
void DumpSupportInfo(Logger* logger) {
  ROCKS_LOG_HEADER(logger, "Compression algorithms supported:");
  for (auto& compression : OptionsHelper::compression_type_string_map) {
    if (compression.second != kNoCompression &&
        compression.second != kDisableCompressionOption) {
      ROCKS_LOG_HEADER(logger, "\t%s supported: %d", compression.first.c_str(),
                       CompressionTypeSupported(compression.second));
    }
  }
  ROCKS_LOG_HEADER(logger, "Fast CRC32 supported: %s",
                   crc32c::IsFastCrc32Supported().c_str());
}
}  // namespace

DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
               const bool seq_per_batch, const bool batch_per_txn,
               bool read_only)
    : dbname_(dbname),
      own_info_log_(options.info_log == nullptr),
      initial_db_options_(SanitizeOptions(dbname, options, read_only)),
      env_(initial_db_options_.env),
      io_tracer_(std::make_shared<IOTracer>()),
      immutable_db_options_(initial_db_options_),
      fs_(immutable_db_options_.fs, io_tracer_),
      mutable_db_options_(initial_db_options_),
      stats_(immutable_db_options_.stats),
      mutex_(stats_, immutable_db_options_.clock, DB_MUTEX_WAIT_MICROS,
             immutable_db_options_.use_adaptive_mutex),
      default_cf_handle_(nullptr),
      error_handler_(this, immutable_db_options_, &mutex_),
      event_logger_(immutable_db_options_.info_log.get()),
      max_total_in_memory_state_(0),
      file_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
      file_options_for_compaction_(fs_->OptimizeForCompactionTableWrite(
          file_options_, immutable_db_options_)),
      seq_per_batch_(seq_per_batch),
      batch_per_txn_(batch_per_txn),
      next_job_id_(1),
      shutting_down_(false),
      db_lock_(nullptr),
      manual_compaction_paused_(false),
      bg_cv_(&mutex_),
      logfile_number_(0),
      log_dir_synced_(false),
      log_empty_(true),
      persist_stats_cf_handle_(nullptr),
      log_sync_cv_(&mutex_),
      total_log_size_(0),
      is_snapshot_supported_(true),
      write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
      write_thread_(immutable_db_options_),
      nonmem_write_thread_(immutable_db_options_),
      write_controller_(mutable_db_options_.delayed_write_rate),
      last_batch_group_size_(0),
      unscheduled_flushes_(0),
      unscheduled_compactions_(0),
      bg_bottom_compaction_scheduled_(0),
      bg_compaction_scheduled_(0),
      num_running_compactions_(0),
      bg_flush_scheduled_(0),
      num_running_flushes_(0),
      bg_purge_scheduled_(0),
      disable_delete_obsolete_files_(0),
      pending_purge_obsolete_files_(0),
      delete_obsolete_files_last_run_(immutable_db_options_.clock->NowMicros()),
      last_stats_dump_time_microsec_(0),
      has_unpersisted_data_(false),
      unable_to_release_oldest_log_(false),
      num_running_ingest_file_(0),
#ifndef ROCKSDB_LITE
      wal_manager_(immutable_db_options_, file_options_, io_tracer_,
                   seq_per_batch),
#endif  // ROCKSDB_LITE
      bg_work_paused_(0),
      bg_compaction_paused_(0),
      refitting_level_(false),
      opened_successfully_(false),
#ifndef ROCKSDB_LITE
      periodic_work_scheduler_(nullptr),
#endif  // ROCKSDB_LITE
      two_write_queues_(options.two_write_queues),
      manual_wal_flush_(options.manual_wal_flush),
      // last_sequence_ is always maintained by the main queue that also writes
      // to the memtable. When two_write_queues_ is disabled, last seq in
      // memtable is the same as last seq published to the readers. When it is
      // enabled but seq_per_batch_ is disabled, last seq in memtable still
      // indicates last published seq since wal-only writes that go to the 2nd
      // queue do not consume a sequence number. Otherwise writes performed by
      // the 2nd queue could change what is visible to the readers. In that
      // case (last_seq_same_as_publish_seq_ == false), the 2nd queue maintains
      // a separate variable to indicate the last published sequence.
      last_seq_same_as_publish_seq_(
          !(seq_per_batch && options.two_write_queues)),
      // Since seq_per_batch_ is currently set only by WritePreparedTxn which
      // requires a custom gc for compaction, we use that to set use_custom_gc_
      // as well.
      use_custom_gc_(seq_per_batch),
      shutdown_initiated_(false),
      own_sfm_(options.sst_file_manager == nullptr),
      preserve_deletes_(options.preserve_deletes),
      closed_(false),
      atomic_flush_install_cv_(&mutex_),
      blob_callback_(immutable_db_options_.sst_file_manager.get(), &mutex_,
                     &error_handler_) {
  // !batch_per_txn_ implies seq_per_batch_ because it is only unset for
  // WriteUnprepared, which should use seq_per_batch_.
  assert(batch_per_txn_ || seq_per_batch_);
  // TODO: Check for an error here
  env_->GetAbsolutePath(dbname, &db_absolute_path_).PermitUncheckedError();

  // Reserve ten files or so for other uses and give the rest to TableCache.
  // Use a large number for the "infinite" open files setting.
  const int table_cache_size = (mutable_db_options_.max_open_files == -1)
                                   ? TableCache::kInfiniteCapacity
                                   : mutable_db_options_.max_open_files - 10;
  LRUCacheOptions co;
  co.capacity = table_cache_size;
  co.num_shard_bits = immutable_db_options_.table_cache_numshardbits;
  co.metadata_charge_policy = kDontChargeCacheMetadata;
  table_cache_ = NewLRUCache(co);

  versions_.reset(new VersionSet(dbname_, &immutable_db_options_, file_options_,
                                 table_cache_.get(), write_buffer_manager_,
                                 &write_controller_, &block_cache_tracer_,
                                 io_tracer_));
  column_family_memtables_.reset(
      new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));

  DumpRocksDBBuildVersion(immutable_db_options_.info_log.get());
  SetDbSessionId();
  DumpDBFileSummary(immutable_db_options_, dbname_, db_session_id_);
  immutable_db_options_.Dump(immutable_db_options_.info_log.get());
  mutable_db_options_.Dump(immutable_db_options_.info_log.get());
  DumpSupportInfo(immutable_db_options_.info_log.get());

  // Always open the DB with 0 here, which means that if preserve_deletes_ ==
  // true, we won't drop any deletion markers until
  // SetPreserveDeletesSequenceNumber() is called by the client and this
  // seqnum is advanced.
  preserve_deletes_seqnum_.store(0);

  if (write_buffer_manager_) {
    wbm_stall_.reset(new WBMStallInterface());
  }
}

Status DBImpl::Resume() {
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Resuming DB");

  InstrumentedMutexLock db_mutex(&mutex_);

  if (!error_handler_.IsDBStopped() && !error_handler_.IsBGWorkStopped()) {
    // Nothing to do
    return Status::OK();
  }

  if (error_handler_.IsRecoveryInProgress()) {
    // Don't allow a mix of manual and automatic recovery
    return Status::Busy();
  }

  mutex_.Unlock();
  Status s = error_handler_.RecoverFromBGError(true);
  mutex_.Lock();
  return s;
}
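
// Usage sketch (illustrative only; assumes a valid DB* named db that has hit
// a recoverable background error):
//
//   Status s = db->Resume();
//   if (s.IsBusy()) {
//     // Automatic recovery is already in progress; retry later.
//   }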

// This function implements the guts of recovery from a background error. It
// is eventually called for both manual as well as automatic recovery. It does
// the following -
// 1. Wait for currently scheduled background flush/compaction to exit, in
//    order to avoid inadvertently causing an error and concluding that
//    recovery failed
// 2. Flush memtables if there's any data for all the CFs. This may result in
//    another error, which will be saved by error_handler_ and reported later
//    as the recovery status
// 3. Find and delete any obsolete files
// 4. Schedule compactions if needed for all the CFs. This is needed as the
//    flush in the prior step might have been a no-op for some CFs, which
//    means a new super version wouldn't have been installed
Status DBImpl::ResumeImpl(DBRecoverContext context) {
  mutex_.AssertHeld();
  WaitForBackgroundWork();

  Status s;
  if (shutdown_initiated_) {
    // Returning shutdown status to SFM during auto recovery will cause it
    // to abort the recovery and allow the shutdown to progress
    s = Status::ShutdownInProgress();
  }

  if (s.ok()) {
    Status bg_error = error_handler_.GetBGError();
    if (bg_error.severity() > Status::Severity::kHardError) {
      ROCKS_LOG_INFO(
          immutable_db_options_.info_log,
          "DB resume requested but failed due to Fatal/Unrecoverable error");
      s = bg_error;
    }
  }

  // Make sure the IO Status stored in version set is set to OK.
  bool file_deletion_disabled = !IsFileDeletionsEnabled();
  if (s.ok()) {
    IOStatus io_s = versions_->io_status();
    if (io_s.IsIOError()) {
      // If resuming from IOError resulted from MANIFEST write, then assert
      // that we must have already set the MANIFEST writer to nullptr during
      // clean-up phase MANIFEST writing. We must have also disabled file
      // deletions.
      assert(!versions_->descriptor_log_);
      assert(file_deletion_disabled);
      // Since we are trying to recover from MANIFEST write error, we need to
      // switch to a new MANIFEST anyway. The old MANIFEST can be corrupted.
      // Therefore, force writing a dummy version edit because we do not know
      // whether there are flush jobs with non-empty data to flush, triggering
      // appends to MANIFEST.
      VersionEdit edit;
      auto cfh =
          static_cast_with_check<ColumnFamilyHandleImpl>(default_cf_handle_);
      assert(cfh);
      ColumnFamilyData* cfd = cfh->cfd();
      const MutableCFOptions& cf_opts = *cfd->GetLatestMutableCFOptions();
      s = versions_->LogAndApply(cfd, cf_opts, &edit, &mutex_,
                                 directories_.GetDbDir());
      if (!s.ok()) {
        io_s = versions_->io_status();
        if (!io_s.ok()) {
          s = error_handler_.SetBGError(io_s,
                                        BackgroundErrorReason::kManifestWrite);
        }
      }
    }
  }

  // We cannot guarantee consistency of the WAL. So force flush Memtables of
  // all the column families
  if (s.ok()) {
    FlushOptions flush_opts;
    // We allow flush to stall write since we are trying to resume from error.
    flush_opts.allow_write_stall = true;
    if (immutable_db_options_.atomic_flush) {
      autovector<ColumnFamilyData*> cfds;
      SelectColumnFamiliesForAtomicFlush(&cfds);
      mutex_.Unlock();
      s = AtomicFlushMemTables(cfds, flush_opts, context.flush_reason);
      mutex_.Lock();
    } else {
      for (auto cfd : *versions_->GetColumnFamilySet()) {
        if (cfd->IsDropped()) {
          continue;
        }
        cfd->Ref();
        mutex_.Unlock();
        s = FlushMemTable(cfd, flush_opts, context.flush_reason);
        mutex_.Lock();
        cfd->UnrefAndTryDelete();
        if (!s.ok()) {
          break;
        }
      }
    }
    if (!s.ok()) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "DB resume requested but failed due to Flush failure [%s]",
                     s.ToString().c_str());
    }
  }

  JobContext job_context(0);
  FindObsoleteFiles(&job_context, true);
  if (s.ok()) {
    s = error_handler_.ClearBGError();
  } else {
    // NOTE: this is needed to pass ASSERT_STATUS_CHECKED
    // in the DBSSTTest.DBWithMaxSpaceAllowedRandomized test.
    // See https://github.com/facebook/rocksdb/pull/7715#issuecomment-754947952
    error_handler_.GetRecoveryError().PermitUncheckedError();
  }
  mutex_.Unlock();

  job_context.manifest_file_number = 1;
  if (job_context.HaveSomethingToDelete()) {
    PurgeObsoleteFiles(job_context);
  }
  job_context.Clean();

  if (s.ok()) {
    assert(versions_->io_status().ok());
    // If we reach here, we should re-enable file deletions if it was disabled
    // during previous error handling.
    if (file_deletion_disabled) {
      // Always return ok
      s = EnableFileDeletions(/*force=*/true);
      if (!s.ok()) {
        ROCKS_LOG_INFO(
            immutable_db_options_.info_log,
            "DB resume requested but could not enable file deletions [%s]",
            s.ToString().c_str());
      }
    }
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successfully resumed DB");
  }
  mutex_.Lock();
  // Check for shutdown again before scheduling further compactions,
  // since we released and re-acquired the lock above
  if (shutdown_initiated_) {
    s = Status::ShutdownInProgress();
  }
  if (s.ok()) {
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      SchedulePendingCompaction(cfd);
    }
    MaybeScheduleFlushOrCompaction();
  }

  // Wake up any waiters - in this case, it could be the shutdown thread
  bg_cv_.SignalAll();

  // No need to check BGError again. If something happened, event listener
  // would be notified and the operation causing it would have failed
  return s;
}

void DBImpl::WaitForBackgroundWork() {
  // Wait for background work to finish
  while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
         bg_flush_scheduled_) {
    bg_cv_.Wait();
  }
}

// Will lock mutex_; will wait for background work to complete if wait is true
void DBImpl::CancelAllBackgroundWork(bool wait) {
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "Shutdown: canceling all background work");

#ifndef ROCKSDB_LITE
  if (periodic_work_scheduler_ != nullptr) {
    periodic_work_scheduler_->Unregister(this);
  }
#endif  // !ROCKSDB_LITE

  InstrumentedMutexLock l(&mutex_);
  if (!shutting_down_.load(std::memory_order_acquire) &&
      has_unpersisted_data_.load(std::memory_order_relaxed) &&
      !mutable_db_options_.avoid_flush_during_shutdown) {
    if (immutable_db_options_.atomic_flush) {
      autovector<ColumnFamilyData*> cfds;
      SelectColumnFamiliesForAtomicFlush(&cfds);
      mutex_.Unlock();
      Status s =
          AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown);
      s.PermitUncheckedError();  //**TODO: What to do on error?
      mutex_.Lock();
    } else {
      for (auto cfd : *versions_->GetColumnFamilySet()) {
        if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
          cfd->Ref();
          mutex_.Unlock();
          Status s = FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown);
          s.PermitUncheckedError();  //**TODO: What to do on error?
          mutex_.Lock();
          cfd->UnrefAndTryDelete();
        }
      }
    }
    versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
  }

  shutting_down_.store(true, std::memory_order_release);
  bg_cv_.SignalAll();
  if (!wait) {
    return;
  }
  WaitForBackgroundWork();
}
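
// Usage sketch (illustrative only; assumes a valid DB* named db): callers who
// want background jobs quiesced before tearing down resources those jobs
// might touch can use the global helper declared in rocksdb/convenience.h:
//
//   ROCKSDB_NAMESPACE::CancelAllBackgroundWork(db, /*wait=*/true);
//   delete db;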

Status DBImpl::CloseHelper() {
  // Guarantee that there is no background error recovery in progress before
  // continuing with the shutdown
  mutex_.Lock();
  shutdown_initiated_ = true;
  error_handler_.CancelErrorRecovery();
  while (error_handler_.IsRecoveryInProgress()) {
    bg_cv_.Wait();
  }
  mutex_.Unlock();

  // Below check is added as recovery_error_ is not checked and it causes crash
  // in DBSSTTest.DBWithMaxSpaceAllowedWithBlobFiles when space limit is
  // reached.
  error_handler_.GetRecoveryError().PermitUncheckedError();

  // CancelAllBackgroundWork called with false means we just set the shutdown
  // marker. After this we do a variant of the waiting and unschedule work
  // (to consider: moving all the waiting into CancelAllBackgroundWork(true))
  CancelAllBackgroundWork(false);
  mutex_.Lock();
  env_->UnSchedule(this, Env::Priority::BOTTOM);
  env_->UnSchedule(this, Env::Priority::LOW);
  env_->UnSchedule(this, Env::Priority::HIGH);
  Status ret = Status::OK();

  // Wait for background work to finish
  while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
         bg_flush_scheduled_ || bg_purge_scheduled_ ||
         pending_purge_obsolete_files_ ||
         error_handler_.IsRecoveryInProgress()) {
    TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob");
    bg_cv_.Wait();
  }
  TEST_SYNC_POINT_CALLBACK("DBImpl::CloseHelper:PendingPurgeFinished",
                           &files_grabbed_for_purge_);
  EraseThreadStatusDbInfo();
  flush_scheduler_.Clear();
  trim_history_scheduler_.Clear();

  while (!flush_queue_.empty()) {
    const FlushRequest& flush_req = PopFirstFromFlushQueue();
    for (const auto& iter : flush_req) {
      iter.first->UnrefAndTryDelete();
    }
  }
  while (!compaction_queue_.empty()) {
    auto cfd = PopFirstFromCompactionQueue();
    cfd->UnrefAndTryDelete();
  }

  if (default_cf_handle_ != nullptr || persist_stats_cf_handle_ != nullptr) {
    // we need to delete handle outside of lock because it does its own locking
    mutex_.Unlock();
    if (default_cf_handle_) {
      delete default_cf_handle_;
      default_cf_handle_ = nullptr;
    }
    if (persist_stats_cf_handle_) {
      delete persist_stats_cf_handle_;
      persist_stats_cf_handle_ = nullptr;
    }
    mutex_.Lock();
  }

  // Clean up obsolete files due to SuperVersion release.
  // (1) Need to delete obsolete files before closing because RepairDB()
  // scans all existing files in the file system and builds manifest file.
  // Keeping obsolete files confuses the repair process.
  // (2) Need to check if we Open()/Recover() the DB successfully before
  // deleting because if VersionSet recover fails (may be due to corrupted
  // manifest file), it is not able to identify live files correctly. As a
  // result, all "live" files can get deleted by accident. However, corrupted
  // manifest is recoverable by RepairDB().
  if (opened_successfully_) {
    JobContext job_context(next_job_id_.fetch_add(1));
    FindObsoleteFiles(&job_context, true);

    mutex_.Unlock();
    // manifest number starting from 2
    job_context.manifest_file_number = 1;
    if (job_context.HaveSomethingToDelete()) {
      PurgeObsoleteFiles(job_context);
    }
    job_context.Clean();
    mutex_.Lock();
  }

  for (auto l : logs_to_free_) {
    delete l;
  }
  for (auto& log : logs_) {
    uint64_t log_number = log.writer->get_log_number();
    Status s = log.ClearWriter();
    if (!s.ok()) {
      ROCKS_LOG_WARN(
          immutable_db_options_.info_log,
          "Unable to Sync WAL file %s with error -- %s",
          LogFileName(immutable_db_options_.wal_dir, log_number).c_str(),
          s.ToString().c_str());
      // Retain the first error
      if (ret.ok()) {
        ret = s;
      }
    }
  }
  logs_.clear();

  // Table cache may have table handles holding blocks from the block cache.
  // We need to release them before the block cache is destroyed. The block
  // cache may be destroyed inside versions_.reset(), when column family data
  // list is destroyed, so leaving handles in table cache after
  // versions_.reset() may cause issues.
  // Here we clean all unreferenced handles in table cache.
  // Now we assume all user queries have finished, so only the version set
  // itself can possibly hold blocks from the block cache. After releasing
  // unreferenced handles here, only handles held by the version set are left;
  // inside versions_.reset(), we will release them. There, we need to make
  // sure every time a handle is released, we erase it from the cache too. By
  // doing that, we can guarantee that after versions_.reset(), the table cache
  // is empty so the cache can be safely destroyed.
  table_cache_->EraseUnRefEntries();

  for (auto& txn_entry : recovered_transactions_) {
    delete txn_entry.second;
  }

  // versions need to be destroyed before table_cache since it can hold
  // references to table_cache.
  versions_.reset();
  mutex_.Unlock();
  if (db_lock_ != nullptr) {
    // TODO: Check for unlock error
    env_->UnlockFile(db_lock_).PermitUncheckedError();
  }

  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown complete");
  LogFlush(immutable_db_options_.info_log);

#ifndef ROCKSDB_LITE
  // If the sst_file_manager was allocated by us during DB::Open(), call
  // Close() on it before closing the info_log. Otherwise, the background
  // thread in SstFileManagerImpl might try to log something.
  if (immutable_db_options_.sst_file_manager && own_sfm_) {
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    sfm->Close();
  }
#endif  // ROCKSDB_LITE

  if (immutable_db_options_.info_log && own_info_log_) {
    Status s = immutable_db_options_.info_log->Close();
    if (!s.ok() && !s.IsNotSupported() && ret.ok()) {
      ret = s;
    }
  }

  if (write_buffer_manager_ && wbm_stall_) {
    write_buffer_manager_->RemoveDBFromQueue(wbm_stall_.get());
  }

  if (ret.IsAborted()) {
    // Reserve IsAborted() for the case where users didn't release certain
    // resources; they can release them, come back, and retry. In this case,
    // we wrap this exception to something else.
    return Status::Incomplete(ret.ToString());
  }
  return ret;
}

Status DBImpl::CloseImpl() { return CloseHelper(); }

DBImpl::~DBImpl() {
  if (!closed_) {
    closed_ = true;
    CloseHelper().PermitUncheckedError();
  }
}

void DBImpl::MaybeIgnoreError(Status* s) const {
  if (s->ok() || immutable_db_options_.paranoid_checks) {
    // No change needed
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "Ignoring error %s",
                   s->ToString().c_str());
    *s = Status::OK();
  }
}

const Status DBImpl::CreateArchivalDirectory() {
  if (immutable_db_options_.WAL_ttl_seconds > 0 ||
      immutable_db_options_.WAL_size_limit_MB > 0) {
    std::string archivalPath = ArchivalDirectory(immutable_db_options_.wal_dir);
    return env_->CreateDirIfMissing(archivalPath);
  }
  return Status::OK();
}
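
// Configuration sketch (illustrative only): the archival directory above is
// created only when WAL archiving is enabled through either option, e.g.:
//
//   DBOptions options;
//   options.WAL_ttl_seconds = 600;   // keep archived WALs for 10 minutes
//   options.WAL_size_limit_MB = 64;  // or cap total archived WAL size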

void DBImpl::PrintStatistics() {
  auto dbstats = immutable_db_options_.stats;
  if (dbstats) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "STATISTICS:\n %s",
                   dbstats->ToString().c_str());
  }
}

void DBImpl::StartPeriodicWorkScheduler() {
#ifndef ROCKSDB_LITE

#ifndef NDEBUG
  // It is only used by tests to disable the scheduler
  bool disable_scheduler = false;
  TEST_SYNC_POINT_CALLBACK(
      "DBImpl::StartPeriodicWorkScheduler:DisableScheduler",
      &disable_scheduler);
  if (disable_scheduler) {
    return;
  }
#endif  // !NDEBUG

  {
    InstrumentedMutexLock l(&mutex_);
    periodic_work_scheduler_ = PeriodicWorkScheduler::Default();
    TEST_SYNC_POINT_CALLBACK("DBImpl::StartPeriodicWorkScheduler:Init",
                             &periodic_work_scheduler_);
  }

  periodic_work_scheduler_->Register(
      this, mutable_db_options_.stats_dump_period_sec,
      mutable_db_options_.stats_persist_period_sec);
#endif  // !ROCKSDB_LITE
}

// Estimate the total size of stats_history_.
size_t DBImpl::EstimateInMemoryStatsHistorySize() const {
  size_t size_total =
      sizeof(std::map<uint64_t, std::map<std::string, uint64_t>>);
  if (stats_history_.size() == 0) return size_total;
  size_t size_per_slice =
      sizeof(uint64_t) + sizeof(std::map<std::string, uint64_t>);
  // non-empty map, stats_history_.begin() guaranteed to exist
  std::map<std::string, uint64_t> sample_slice(stats_history_.begin()->second);
  for (const auto& pairs : sample_slice) {
    size_per_slice +=
        pairs.first.capacity() + sizeof(pairs.first) + sizeof(pairs.second);
  }
  size_total = size_per_slice * stats_history_.size();
  return size_total;
}
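
// Worked example (illustrative only, not from the original source): if
// stats_history_ holds 100 slices and the sampled slice has 200 tickers whose
// names average 40 bytes, size_per_slice is roughly
// 8 + sizeof(map) + 200 * (40 + sizeof(std::string) + 8) bytes, and the
// returned estimate is size_per_slice * 100.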

void DBImpl::PersistStats() {
  TEST_SYNC_POINT("DBImpl::PersistStats:Entry");
#ifndef ROCKSDB_LITE
  if (shutdown_initiated_) {
    return;
  }
  TEST_SYNC_POINT("DBImpl::PersistStats:StartRunning");
  uint64_t now_seconds =
      immutable_db_options_.clock->NowMicros() / kMicrosInSecond;

  Statistics* statistics = immutable_db_options_.stats;
  if (!statistics) {
    return;
  }
  size_t stats_history_size_limit = 0;
  {
    InstrumentedMutexLock l(&mutex_);
    stats_history_size_limit = mutable_db_options_.stats_history_buffer_size;
  }

  std::map<std::string, uint64_t> stats_map;
  if (!statistics->getTickerMap(&stats_map)) {
    return;
  }
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "------- PERSISTING STATS -------");

  if (immutable_db_options_.persist_stats_to_disk) {
    WriteBatch batch;
    Status s = Status::OK();
    if (stats_slice_initialized_) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Reading %" ROCKSDB_PRIszt " stats from statistics\n",
                     stats_slice_.size());
      for (const auto& stat : stats_map) {
        if (s.ok()) {
          char key[100];
          int length =
              EncodePersistentStatsKey(now_seconds, stat.first, 100, key);
          // calculate the delta from last time
          if (stats_slice_.find(stat.first) != stats_slice_.end()) {
            uint64_t delta = stat.second - stats_slice_[stat.first];
            s = batch.Put(persist_stats_cf_handle_,
                          Slice(key, std::min(100, length)), ToString(delta));
          }
        }
      }
    }
    stats_slice_initialized_ = true;
    std::swap(stats_slice_, stats_map);
    if (s.ok()) {
      WriteOptions wo;
      wo.low_pri = true;
      wo.no_slowdown = true;
      wo.sync = false;
      s = Write(wo, &batch);
    }
    if (!s.ok()) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Writing to persistent stats CF failed -- %s",
                     s.ToString().c_str());
    } else {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Writing %" ROCKSDB_PRIszt " stats with timestamp %" PRIu64
                     " to persistent stats CF succeeded",
                     stats_slice_.size(), now_seconds);
    }
    // TODO(Zhongyi): add purging for persisted data
  } else {
    InstrumentedMutexLock l(&stats_history_mutex_);
    // calculate the delta from last time
    if (stats_slice_initialized_) {
      std::map<std::string, uint64_t> stats_delta;
      for (const auto& stat : stats_map) {
        if (stats_slice_.find(stat.first) != stats_slice_.end()) {
          stats_delta[stat.first] = stat.second - stats_slice_[stat.first];
        }
      }
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Storing %" ROCKSDB_PRIszt " stats with timestamp %" PRIu64
                     " to in-memory stats history",
                     stats_slice_.size(), now_seconds);
      stats_history_[now_seconds] = stats_delta;
    }
    stats_slice_initialized_ = true;
    std::swap(stats_slice_, stats_map);
    TEST_SYNC_POINT("DBImpl::PersistStats:StatsCopied");

    // delete older stats snapshots to control memory consumption
    size_t stats_history_size = EstimateInMemoryStatsHistorySize();
    bool purge_needed = stats_history_size > stats_history_size_limit;
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[Pre-GC] In-memory stats history size: %" ROCKSDB_PRIszt
                   " bytes, slice count: %" ROCKSDB_PRIszt,
                   stats_history_size, stats_history_.size());
    while (purge_needed && !stats_history_.empty()) {
      stats_history_.erase(stats_history_.begin());
      purge_needed =
          EstimateInMemoryStatsHistorySize() > stats_history_size_limit;
    }
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[Post-GC] In-memory stats history size: %" ROCKSDB_PRIszt
                   " bytes, slice count: %" ROCKSDB_PRIszt,
                   stats_history_size, stats_history_.size());
  }
  TEST_SYNC_POINT("DBImpl::PersistStats:End");
#endif  // !ROCKSDB_LITE
}

bool DBImpl::FindStatsByTime(uint64_t start_time, uint64_t end_time,
                             uint64_t* new_time,
                             std::map<std::string, uint64_t>* stats_map) {
  assert(new_time);
  assert(stats_map);
  if (!new_time || !stats_map) return false;
  // lock when searching for start_time
  {
    InstrumentedMutexLock l(&stats_history_mutex_);
    auto it = stats_history_.lower_bound(start_time);
    if (it != stats_history_.end() && it->first < end_time) {
      // make a copy for timestamp and stats_map
      *new_time = it->first;
      *stats_map = it->second;
      return true;
    } else {
      return false;
    }
  }
}

Status DBImpl::GetStatsHistory(
    uint64_t start_time, uint64_t end_time,
    std::unique_ptr<StatsHistoryIterator>* stats_iterator) {
  if (!stats_iterator) {
    return Status::InvalidArgument("stats_iterator not preallocated.");
  }
  if (immutable_db_options_.persist_stats_to_disk) {
    stats_iterator->reset(
        new PersistentStatsHistoryIterator(start_time, end_time, this));
  } else {
    stats_iterator->reset(
        new InMemoryStatsHistoryIterator(start_time, end_time, this));
  }
  return (*stats_iterator)->status();
}
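
// Usage sketch (illustrative only; assumes a valid DB* named db with stats
// persistence configured via stats_persist_period_sec):
//
//   std::unique_ptr<StatsHistoryIterator> it;
//   Status s = db->GetStatsHistory(
//       0 /* start_time */, std::numeric_limits<uint64_t>::max(), &it);
//   for (; s.ok() && it->Valid(); it->Next()) {
//     uint64_t time = it->GetStatsTime();
//     const std::map<std::string, uint64_t>& snapshot = it->GetStatsMap();
//     // ... consume one stats snapshot ...
//   }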

void DBImpl::DumpStats() {
  TEST_SYNC_POINT("DBImpl::DumpStats:1");
#ifndef ROCKSDB_LITE
  const DBPropertyInfo* cf_property_info =
      GetPropertyInfo(DB::Properties::kCFStats);
  assert(cf_property_info != nullptr);
  const DBPropertyInfo* db_property_info =
      GetPropertyInfo(DB::Properties::kDBStats);
  assert(db_property_info != nullptr);

  std::string stats;
  if (shutdown_initiated_) {
    return;
  }
  TEST_SYNC_POINT("DBImpl::DumpStats:StartRunning");
  {
    InstrumentedMutexLock l(&mutex_);
    default_cf_internal_stats_->GetStringProperty(
        *db_property_info, DB::Properties::kDBStats, &stats);
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->initialized()) {
        cfd->internal_stats()->GetStringProperty(
            *cf_property_info, DB::Properties::kCFStatsNoFileHistogram, &stats);
      }
    }
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->initialized()) {
        cfd->internal_stats()->GetStringProperty(
            *cf_property_info, DB::Properties::kCFFileHistogram, &stats);
      }
    }
  }
  TEST_SYNC_POINT("DBImpl::DumpStats:2");
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "------- DUMPING STATS -------");
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", stats.c_str());
  if (immutable_db_options_.dump_malloc_stats) {
    stats.clear();
    DumpMallocStats(&stats);
    if (!stats.empty()) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "------- Malloc STATS -------");
      ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", stats.c_str());
    }
  }
#endif  // !ROCKSDB_LITE

  PrintStatistics();
}

void DBImpl::FlushInfoLog() {
  if (shutdown_initiated_) {
    return;
  }
  TEST_SYNC_POINT("DBImpl::FlushInfoLog:StartRunning");
  LogFlush(immutable_db_options_.info_log);
}

Status DBImpl::TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family,
                                           int max_entries_to_print,
                                           std::string* out_str) {
  auto* cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  ColumnFamilyData* cfd = cfh->cfd();

  SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
  Version* version = super_version->current;

  Status s =
      version->TablesRangeTombstoneSummary(max_entries_to_print, out_str);

  CleanupSuperVersion(super_version);
  return s;
}

void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
  if (!job_context->logs_to_free.empty()) {
    for (auto l : job_context->logs_to_free) {
      AddToLogsToFreeQueue(l);
    }
    job_context->logs_to_free.clear();
  }
}

FSDirectory* DBImpl::GetDataDir(ColumnFamilyData* cfd, size_t path_id) const {
  assert(cfd);
  FSDirectory* ret_dir = cfd->GetDataDir(path_id);
  if (ret_dir == nullptr) {
    return directories_.GetDataDir(path_id);
  }
  return ret_dir;
}

Status DBImpl::SetOptions(
    ColumnFamilyHandle* column_family,
    const std::unordered_map<std::string, std::string>& options_map) {
#ifdef ROCKSDB_LITE
  (void)column_family;
  (void)options_map;
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  auto* cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
  if (options_map.empty()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "SetOptions() on column family [%s], empty input",
                   cfd->GetName().c_str());
    return Status::InvalidArgument("empty input");
  }

  MutableCFOptions new_options;
  Status s;
  Status persist_options_status;
  persist_options_status.PermitUncheckedError();  // Allow uninitialized access
  SuperVersionContext sv_context(/* create_superversion */ true);
  {
    auto db_options = GetDBOptions();
    InstrumentedMutexLock l(&mutex_);
    s = cfd->SetOptions(db_options, options_map);
    if (s.ok()) {
      new_options = *cfd->GetLatestMutableCFOptions();
      // Append new version to recompute compaction score.
      VersionEdit dummy_edit;
      s = versions_->LogAndApply(cfd, new_options, &dummy_edit, &mutex_,
                                 directories_.GetDbDir());
      // Trigger possible flush/compactions. This has to be before we persist
      // options to file, otherwise there will be a deadlock with writer
      // thread.
      InstallSuperVersionAndScheduleWork(cfd, &sv_context, new_options);

      persist_options_status = WriteOptionsFile(
          false /*need_mutex_lock*/, true /*need_enter_write_thread*/);
      bg_cv_.SignalAll();
    }
  }
  sv_context.Clean();

  ROCKS_LOG_INFO(
      immutable_db_options_.info_log,
      "SetOptions() on column family [%s], inputs:", cfd->GetName().c_str());
  for (const auto& o : options_map) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n", o.first.c_str(),
                   o.second.c_str());
  }
  if (s.ok()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[%s] SetOptions() succeeded", cfd->GetName().c_str());
    new_options.Dump(immutable_db_options_.info_log.get());
    if (!persist_options_status.ok()) {
      s = persist_options_status;
    }
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "[%s] SetOptions() failed",
                   cfd->GetName().c_str());
  }
  LogFlush(immutable_db_options_.info_log);
  return s;
#endif  // ROCKSDB_LITE
}
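
// Usage sketch (illustrative only; assumes a valid DB* named db and a column
// family handle cf; the option names are standard mutable CF options):
//
//   Status s = db->SetOptions(
//       cf, {{"write_buffer_size", "131072"},
//            {"level0_file_num_compaction_trigger", "2"}});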

Status DBImpl::SetDBOptions(
    const std::unordered_map<std::string, std::string>& options_map) {
#ifdef ROCKSDB_LITE
  (void)options_map;
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  if (options_map.empty()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "SetDBOptions(), empty input.");
    return Status::InvalidArgument("empty input");
  }

  MutableDBOptions new_options;
  Status s;
  Status persist_options_status = Status::OK();
  bool wal_changed = false;
  WriteContext write_context;
  {
    InstrumentedMutexLock l(&mutex_);
    s = GetMutableDBOptionsFromStrings(mutable_db_options_, options_map,
                                       &new_options);
    if (new_options.bytes_per_sync == 0) {
      new_options.bytes_per_sync = 1024 * 1024;
    }
    DBOptions new_db_options =
        BuildDBOptions(immutable_db_options_, new_options);
    if (s.ok()) {
      s = ValidateOptions(new_db_options);
    }
    if (s.ok()) {
      for (auto c : *versions_->GetColumnFamilySet()) {
        if (!c->IsDropped()) {
          auto cf_options = c->GetLatestCFOptions();
          s = ColumnFamilyData::ValidateOptions(new_db_options, cf_options);
          if (!s.ok()) {
            break;
          }
        }
      }
    }
    if (s.ok()) {
      const BGJobLimits current_bg_job_limits =
          GetBGJobLimits(mutable_db_options_.max_background_flushes,
                         mutable_db_options_.max_background_compactions,
                         mutable_db_options_.max_background_jobs,
                         /* parallelize_compactions */ true);
      const BGJobLimits new_bg_job_limits = GetBGJobLimits(
          new_options.max_background_flushes,
          new_options.max_background_compactions,
          new_options.max_background_jobs, /* parallelize_compactions */ true);

      const bool max_flushes_increased =
          new_bg_job_limits.max_flushes > current_bg_job_limits.max_flushes;
      const bool max_compactions_increased =
          new_bg_job_limits.max_compactions >
          current_bg_job_limits.max_compactions;

      if (max_flushes_increased || max_compactions_increased) {
        if (max_flushes_increased) {
          env_->IncBackgroundThreadsIfNeeded(new_bg_job_limits.max_flushes,
                                             Env::Priority::HIGH);
        }

        if (max_compactions_increased) {
          env_->IncBackgroundThreadsIfNeeded(new_bg_job_limits.max_compactions,
                                             Env::Priority::LOW);
        }

        MaybeScheduleFlushOrCompaction();
      }

      if (new_options.stats_dump_period_sec !=
              mutable_db_options_.stats_dump_period_sec ||
          new_options.stats_persist_period_sec !=
              mutable_db_options_.stats_persist_period_sec) {
        mutex_.Unlock();
        periodic_work_scheduler_->Unregister(this);
        periodic_work_scheduler_->Register(
            this, new_options.stats_dump_period_sec,
            new_options.stats_persist_period_sec);
        mutex_.Lock();
      }
      write_controller_.set_max_delayed_write_rate(
          new_options.delayed_write_rate);
      table_cache_.get()->SetCapacity(new_options.max_open_files == -1
                                          ? TableCache::kInfiniteCapacity
                                          : new_options.max_open_files - 10);
      wal_changed = mutable_db_options_.wal_bytes_per_sync !=
                    new_options.wal_bytes_per_sync;
      mutable_db_options_ = new_options;
      file_options_for_compaction_ = FileOptions(new_db_options);
      file_options_for_compaction_ = fs_->OptimizeForCompactionTableWrite(
          file_options_for_compaction_, immutable_db_options_);
      versions_->ChangeFileOptions(mutable_db_options_);
      // TODO(xiez): clarify why the optimize-for-read settings are applied to
      // the write options here
      file_options_for_compaction_ = fs_->OptimizeForCompactionTableRead(
          file_options_for_compaction_, immutable_db_options_);
      file_options_for_compaction_.compaction_readahead_size =
          mutable_db_options_.compaction_readahead_size;
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      if (total_log_size_ > GetMaxTotalWalSize() || wal_changed) {
        Status purge_wal_status = SwitchWAL(&write_context);
        if (!purge_wal_status.ok()) {
          ROCKS_LOG_WARN(immutable_db_options_.info_log,
                         "Unable to purge WAL files in SetDBOptions() -- %s",
                         purge_wal_status.ToString().c_str());
        }
      }
      persist_options_status = WriteOptionsFile(
          false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
      write_thread_.ExitUnbatched(&w);
    } else {
      // To get here, we must have had invalid options and will not attempt to
      // persist the options, which means the status is "OK/Uninitialized".
      persist_options_status.PermitUncheckedError();
    }
  }
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions(), inputs:");
  for (const auto& o : options_map) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n", o.first.c_str(),
                   o.second.c_str());
  }
  if (s.ok()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions() succeeded");
    new_options.Dump(immutable_db_options_.info_log.get());
    if (!persist_options_status.ok()) {
      if (immutable_db_options_.fail_if_options_file_error) {
        s = Status::IOError(
            "SetDBOptions() succeeded, but unable to persist options",
            persist_options_status.ToString());
      }
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "Unable to persist options in SetDBOptions() -- %s",
                     persist_options_status.ToString().c_str());
    }
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "SetDBOptions failed");
  }
  LogFlush(immutable_db_options_.info_log);
  return s;
#endif  // ROCKSDB_LITE
}
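
// Usage sketch (illustrative only; assumes a valid DB* named db; the option
// names are standard mutable DB options):
//
//   Status s = db->SetDBOptions({{"max_background_jobs", "8"},
//                                {"stats_dump_period_sec", "300"}});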

// return the same level if it cannot be moved
int DBImpl::FindMinimumEmptyLevelFitting(
    ColumnFamilyData* cfd, const MutableCFOptions& /*mutable_cf_options*/,
    int level) {
  mutex_.AssertHeld();
  const auto* vstorage = cfd->current()->storage_info();
  int minimum_level = level;
  for (int i = level - 1; i > 0; --i) {
    // stop if level i is not empty
    if (vstorage->NumLevelFiles(i) > 0) break;
    // stop if level i is too small (cannot fit the level files)
    if (vstorage->MaxBytesForLevel(i) < vstorage->NumLevelBytes(level)) {
      break;
    }

    minimum_level = i;
  }
  return minimum_level;
}

Status DBImpl::FlushWAL(bool sync) {
  if (manual_wal_flush_) {
    IOStatus io_s;
    {
      // We need to lock log_write_mutex_ since logs_ might change concurrently
      InstrumentedMutexLock wl(&log_write_mutex_);
      log::Writer* cur_log_writer = logs_.back().writer;
      io_s = cur_log_writer->WriteBuffer();
    }
    if (!io_s.ok()) {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
                      io_s.ToString().c_str());
      // In case there is a fs error, we should set it globally to prevent
      // future writes
      IOStatusCheck(io_s);
      // whether sync or not, we should abort the rest of the function upon
      // error
      return std::move(io_s);
    }
    if (!sync) {
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=false");
      return std::move(io_s);
    }
  }
  if (!sync) {
    return Status::OK();
  }
  // sync = true
  ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=true");
  return SyncWAL();
}
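
// Usage sketch (illustrative only): with Options::manual_wal_flush = true,
// writes accumulate in the WAL write buffer until the application flushes
// them explicitly:
//
//   Status s = db->FlushWAL(false);  // flush the buffer, do not fsync
//   s = db->FlushWAL(true);          // flush the buffer and fsync the WAL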

Status DBImpl::SyncWAL() {
  autovector<log::Writer*, 1> logs_to_sync;
  bool need_log_dir_sync;
  uint64_t current_log_number;

  {
    InstrumentedMutexLock l(&mutex_);
    assert(!logs_.empty());

    // This SyncWAL() call only cares about logs up to this number.
    current_log_number = logfile_number_;

    while (logs_.front().number <= current_log_number &&
           logs_.front().getting_synced) {
      log_sync_cv_.Wait();
    }
    // First check that logs are safe to sync in background.
    for (auto it = logs_.begin();
         it != logs_.end() && it->number <= current_log_number; ++it) {
      if (!it->writer->file()->writable_file()->IsSyncThreadSafe()) {
        return Status::NotSupported(
            "SyncWAL() is not supported for this implementation of WAL file",
            immutable_db_options_.allow_mmap_writes
                ? "try setting Options::allow_mmap_writes to false"
                : Slice());
      }
    }
    for (auto it = logs_.begin();
         it != logs_.end() && it->number <= current_log_number; ++it) {
      auto& log = *it;
      assert(!log.getting_synced);
      log.getting_synced = true;
      logs_to_sync.push_back(log.writer);
    }

    need_log_dir_sync = !log_dir_synced_;
  }

  TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1");
  RecordTick(stats_, WAL_FILE_SYNCED);
  Status status;
  IOStatus io_s;
  for (log::Writer* log : logs_to_sync) {
    io_s = log->file()->SyncWithoutFlush(immutable_db_options_.use_fsync);
    if (!io_s.ok()) {
      status = io_s;
      break;
    }
  }
  if (!io_s.ok()) {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL Sync error %s",
                    io_s.ToString().c_str());
    // In case there is a fs error, we should set it globally to prevent
    // future writes
    IOStatusCheck(io_s);
  }
  if (status.ok() && need_log_dir_sync) {
    status = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
  }
  TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");

  TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
  {
    InstrumentedMutexLock l(&mutex_);
    if (status.ok()) {
      status = MarkLogsSynced(current_log_number, need_log_dir_sync);
    } else {
      MarkLogsNotSynced(current_log_number);
    }
  }
  TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");

  return status;
}

Status DBImpl::LockWAL() {
  log_write_mutex_.Lock();
  auto cur_log_writer = logs_.back().writer;
  auto status = cur_log_writer->WriteBuffer();
  if (!status.ok()) {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
                    status.ToString().c_str());
    // In case there is a fs error, we should set it globally to prevent
    // future writes
    WriteStatusCheck(status);
  }
  return std::move(status);
}

Status DBImpl::UnlockWAL() {
  log_write_mutex_.Unlock();
  return Status::OK();
}
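
// Pairing note (illustrative only): LockWAL() flushes the WAL buffer and then
// holds log_write_mutex_ until the matching UnlockWAL(), so the two calls
// must be strictly paired, e.g.:
//
//   Status s = db->LockWAL();
//   if (s.ok()) {
//     // WAL writes are blocked here (e.g. take a filesystem snapshot).
//     s = db->UnlockWAL();
//   }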
1353 
MarkLogsSynced(uint64_t up_to,bool synced_dir)1354 Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) {
1355   mutex_.AssertHeld();
1356   if (synced_dir && logfile_number_ == up_to) {
1357     log_dir_synced_ = true;
1358   }
1359   VersionEdit synced_wals;
1360   for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
1361     auto& wal = *it;
1362     assert(wal.getting_synced);
1363     if (logs_.size() > 1) {
1364       if (immutable_db_options_.track_and_verify_wals_in_manifest &&
1365           wal.writer->file()->GetFileSize() > 0) {
1366         synced_wals.AddWal(wal.number,
1367                            WalMetadata(wal.writer->file()->GetFileSize()));
1368       }
1369       logs_to_free_.push_back(wal.ReleaseWriter());
1370       // To modify logs_, both mutex_ and log_write_mutex_ must be held
1371       InstrumentedMutexLock l(&log_write_mutex_);
1372       it = logs_.erase(it);
1373     } else {
1374       wal.getting_synced = false;
1375       ++it;
1376     }
1377   }
1378   assert(logs_.empty() || logs_[0].number > up_to ||
1379          (logs_.size() == 1 && !logs_[0].getting_synced));
1380 
1381   Status s;
1382   if (synced_wals.IsWalAddition()) {
1383     // not empty, write to MANIFEST.
1384     s = versions_->LogAndApplyToDefaultColumnFamily(&synced_wals, &mutex_);
1385     if (!s.ok() && versions_->io_status().IsIOError()) {
1386       s = error_handler_.SetBGError(versions_->io_status(),
1387                                     BackgroundErrorReason::kManifestWrite);
1388     }
1389   }
1390   log_sync_cv_.SignalAll();
1391   return s;
1392 }
1393 
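// Config sketch (illustrative, comment-only): the AddWal() bookkeeping above
// only takes effect when WAL tracking is enabled in the DB options, e.g.:
//
//   rocksdb::Options options;
//   options.track_and_verify_wals_in_manifest = true;  // record synced WALs
//                                                      // in the MANIFEST
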
1394 void DBImpl::MarkLogsNotSynced(uint64_t up_to) {
1395   mutex_.AssertHeld();
1396   for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;
1397        ++it) {
1398     auto& wal = *it;
1399     assert(wal.getting_synced);
1400     wal.getting_synced = false;
1401   }
1402   log_sync_cv_.SignalAll();
1403 }
1404 
1405 SequenceNumber DBImpl::GetLatestSequenceNumber() const {
1406   return versions_->LastSequence();
1407 }
1408 
1409 void DBImpl::SetLastPublishedSequence(SequenceNumber seq) {
1410   versions_->SetLastPublishedSequence(seq);
1411 }
1412 
1413 bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) {
1414   if (seqnum > preserve_deletes_seqnum_.load()) {
1415     preserve_deletes_seqnum_.store(seqnum);
1416     return true;
1417   } else {
1418     return false;
1419   }
1420 }
1421 
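// Usage sketch (illustrative, comment-only; assumes a DB* db opened with
// DBOptions::preserve_deletes = true, the only mode in which this API is
// meaningful): callers ratchet the watermark forward; calls with a smaller
// seqnum are no-ops.
//
//   rocksdb::SequenceNumber seq = db->GetLatestSequenceNumber();
//   bool advanced = db->SetPreserveDeletesSequenceNumber(seq);
//   // advanced == false if seq was <= the previously set watermark
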
1422 InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
1423                                               Arena* arena,
1424                                               RangeDelAggregator* range_del_agg,
1425                                               SequenceNumber sequence,
1426                                               ColumnFamilyHandle* column_family,
1427                                               bool allow_unprepared_value) {
1428   ColumnFamilyData* cfd;
1429   if (column_family == nullptr) {
1430     cfd = default_cf_handle_->cfd();
1431   } else {
1432     auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
1433     cfd = cfh->cfd();
1434   }
1435 
1436   mutex_.Lock();
1437   SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
1438   mutex_.Unlock();
1439   return NewInternalIterator(read_options, cfd, super_version, arena,
1440                              range_del_agg, sequence, allow_unprepared_value);
1441 }
1442 
1443 void DBImpl::SchedulePurge() {
1444   mutex_.AssertHeld();
1445   assert(opened_successfully_);
1446 
1447   // Purge operations are put into the high-priority queue
1448   bg_purge_scheduled_++;
1449   env_->Schedule(&DBImpl::BGWorkPurge, this, Env::Priority::HIGH, nullptr);
1450 }
1451 
1452 void DBImpl::BackgroundCallPurge() {
1453   mutex_.Lock();
1454 
1455   while (!logs_to_free_queue_.empty()) {
1456     assert(!logs_to_free_queue_.empty());
1457     log::Writer* log_writer = *(logs_to_free_queue_.begin());
1458     logs_to_free_queue_.pop_front();
1459     mutex_.Unlock();
1460     delete log_writer;
1461     mutex_.Lock();
1462   }
1463   while (!superversions_to_free_queue_.empty()) {
1464     assert(!superversions_to_free_queue_.empty());
1465     SuperVersion* sv = superversions_to_free_queue_.front();
1466     superversions_to_free_queue_.pop_front();
1467     mutex_.Unlock();
1468     delete sv;
1469     mutex_.Lock();
1470   }
1471 
1472   // Can't use iterator to go over purge_files_ because inside the loop we're
1473   // unlocking the mutex that protects purge_files_.
1474   while (!purge_files_.empty()) {
1475     auto it = purge_files_.begin();
1476     // Need to make a copy of the PurgeFileInfo before unlocking the mutex.
1477     PurgeFileInfo purge_file = it->second;
1478 
1479     const std::string& fname = purge_file.fname;
1480     const std::string& dir_to_sync = purge_file.dir_to_sync;
1481     FileType type = purge_file.type;
1482     uint64_t number = purge_file.number;
1483     int job_id = purge_file.job_id;
1484 
1485     purge_files_.erase(it);
1486 
1487     mutex_.Unlock();
1488     DeleteObsoleteFileImpl(job_id, fname, dir_to_sync, type, number);
1489     mutex_.Lock();
1490   }
1491 
1492   bg_purge_scheduled_--;
1493 
1494   bg_cv_.SignalAll();
1495   // IMPORTANT: there should be no code after calling SignalAll. This call may
1496   // signal the DB destructor that it's OK to proceed with destruction. In
1497   // that case, all DB variables will be deallocated and referencing them
1498   // will cause trouble.
1499   mutex_.Unlock();
1500 }
1501 
1502 namespace {
1503 struct IterState {
1504   IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version,
1505             bool _background_purge)
1506       : db(_db),
1507         mu(_mu),
1508         super_version(_super_version),
1509         background_purge(_background_purge) {}
1510 
1511   DBImpl* db;
1512   InstrumentedMutex* mu;
1513   SuperVersion* super_version;
1514   bool background_purge;
1515 };
1516 
1517 static void CleanupIteratorState(void* arg1, void* /*arg2*/) {
1518   IterState* state = reinterpret_cast<IterState*>(arg1);
1519 
1520   if (state->super_version->Unref()) {
1521     // Job id == 0 means that this is not our background process, but rather
1522     // a user thread
1523     JobContext job_context(0);
1524 
1525     state->mu->Lock();
1526     state->super_version->Cleanup();
1527     state->db->FindObsoleteFiles(&job_context, false, true);
1528     if (state->background_purge) {
1529       state->db->ScheduleBgLogWriterClose(&job_context);
1530       state->db->AddSuperVersionsToFreeQueue(state->super_version);
1531       state->db->SchedulePurge();
1532     }
1533     state->mu->Unlock();
1534 
1535     if (!state->background_purge) {
1536       delete state->super_version;
1537     }
1538     if (job_context.HaveSomethingToDelete()) {
1539       if (state->background_purge) {
1540         // PurgeObsoleteFiles here does not delete files. Instead, it adds the
1541         // files to be deleted to a job queue, and deletes them in a separate
1542         // background thread.
1543         state->db->PurgeObsoleteFiles(job_context, true /* schedule only */);
1544         state->mu->Lock();
1545         state->db->SchedulePurge();
1546         state->mu->Unlock();
1547       } else {
1548         state->db->PurgeObsoleteFiles(job_context);
1549       }
1550     }
1551     job_context.Clean();
1552   }
1553 
1554   delete state;
1555 }
1556 }  // namespace
1557 
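// Usage sketch (illustrative, comment-only; assumes an already-open DB* db):
// the background_purge path above is chosen per-iterator via ReadOptions, so
// destroying a long-lived iterator does not pay for file deletion inline.
//
//   rocksdb::ReadOptions ro;
//   ro.background_purge_on_iterator_cleanup = true;  // defer cleanup work
//   std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(ro));
//   // ... iterate ...; on destruction, cleanup is scheduled on the
//   // HIGH-priority background pool instead of running in this thread
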
1558 InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
1559                                               ColumnFamilyData* cfd,
1560                                               SuperVersion* super_version,
1561                                               Arena* arena,
1562                                               RangeDelAggregator* range_del_agg,
1563                                               SequenceNumber sequence,
1564                                               bool allow_unprepared_value) {
1565   InternalIterator* internal_iter;
1566   assert(arena != nullptr);
1567   assert(range_del_agg != nullptr);
1568   // Need to create internal iterator from the arena.
1569   MergeIteratorBuilder merge_iter_builder(
1570       &cfd->internal_comparator(), arena,
1571       !read_options.total_order_seek &&
1572           super_version->mutable_cf_options.prefix_extractor != nullptr);
1573   // Collect iterator for mutable mem
1574   merge_iter_builder.AddIterator(
1575       super_version->mem->NewIterator(read_options, arena));
1576   std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter;
1577   Status s;
1578   if (!read_options.ignore_range_deletions) {
1579     range_del_iter.reset(
1580         super_version->mem->NewRangeTombstoneIterator(read_options, sequence));
1581     range_del_agg->AddTombstones(std::move(range_del_iter));
1582   }
1583   // Collect all needed child iterators for immutable memtables
1584   if (s.ok()) {
1585     super_version->imm->AddIterators(read_options, &merge_iter_builder);
1586     if (!read_options.ignore_range_deletions) {
1587       s = super_version->imm->AddRangeTombstoneIterators(read_options, arena,
1588                                                          range_del_agg);
1589     }
1590   }
1591   TEST_SYNC_POINT_CALLBACK("DBImpl::NewInternalIterator:StatusCallback", &s);
1592   if (s.ok()) {
1593     // Collect iterators for files in L0 - Ln
1594     if (read_options.read_tier != kMemtableTier) {
1595       super_version->current->AddIterators(read_options, file_options_,
1596                                            &merge_iter_builder, range_del_agg,
1597                                            allow_unprepared_value);
1598     }
1599     internal_iter = merge_iter_builder.Finish();
1600     IterState* cleanup =
1601         new IterState(this, &mutex_, super_version,
1602                       read_options.background_purge_on_iterator_cleanup ||
1603                       immutable_db_options_.avoid_unnecessary_blocking_io);
1604     internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
1605 
1606     return internal_iter;
1607   } else {
1608     CleanupSuperVersion(super_version);
1609   }
1610   return NewErrorInternalIterator<Slice>(s, arena);
1611 }
1612 
1613 ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const {
1614   return default_cf_handle_;
1615 }
1616 
1617 ColumnFamilyHandle* DBImpl::PersistentStatsColumnFamily() const {
1618   return persist_stats_cf_handle_;
1619 }
1620 
1621 Status DBImpl::Get(const ReadOptions& read_options,
1622                    ColumnFamilyHandle* column_family, const Slice& key,
1623                    PinnableSlice* value) {
1624   return Get(read_options, column_family, key, value, /*timestamp=*/nullptr);
1625 }
1626 
1627 Status DBImpl::Get(const ReadOptions& read_options,
1628                    ColumnFamilyHandle* column_family, const Slice& key,
1629                    PinnableSlice* value, std::string* timestamp) {
1630   GetImplOptions get_impl_options;
1631   get_impl_options.column_family = column_family;
1632   get_impl_options.value = value;
1633   get_impl_options.timestamp = timestamp;
1634   Status s = GetImpl(read_options, key, get_impl_options);
1635   return s;
1636 }
1637 
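// Usage sketch (illustrative, comment-only; assumes an already-open DB* db):
// the PinnableSlice overload lets the returned value pin a block-cache block
// instead of copying it out.
//
//   rocksdb::PinnableSlice value;
//   rocksdb::Status s = db->Get(rocksdb::ReadOptions(),
//                               db->DefaultColumnFamily(), "key", &value);
//   if (s.ok()) { /* use value.data(), value.size() */ }
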
1638 namespace {
1639 class GetWithTimestampReadCallback : public ReadCallback {
1640  public:
1641   explicit GetWithTimestampReadCallback(SequenceNumber seq)
1642       : ReadCallback(seq) {}
1643   bool IsVisibleFullCheck(SequenceNumber seq) override {
1644     return seq <= max_visible_seq_;
1645   }
1646 };
1647 }  // namespace
1648 
1649 Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
1650                        GetImplOptions& get_impl_options) {
1651   assert(get_impl_options.value != nullptr ||
1652          get_impl_options.merge_operands != nullptr);
1653 
1654   assert(get_impl_options.column_family);
1655   const Comparator* ucmp = get_impl_options.column_family->GetComparator();
1656   assert(ucmp);
1657   size_t ts_sz = ucmp->timestamp_size();
1658   GetWithTimestampReadCallback read_cb(0);  // Will call Refresh
1659 
1660 #ifndef NDEBUG
1661   if (ts_sz > 0) {
1662     assert(read_options.timestamp);
1663     assert(read_options.timestamp->size() == ts_sz);
1664   } else {
1665     assert(!read_options.timestamp);
1666   }
1667 #endif  // NDEBUG
1668 
1669   PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
1670   StopWatch sw(immutable_db_options_.clock, stats_, DB_GET);
1671   PERF_TIMER_GUARD(get_snapshot_time);
1672 
1673   auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(
1674       get_impl_options.column_family);
1675   auto cfd = cfh->cfd();
1676 
1677   if (tracer_) {
1678     // TODO: This mutex should be removed later, to improve performance when
1679     // tracing is enabled.
1680     InstrumentedMutexLock lock(&trace_mutex_);
1681     if (tracer_) {
1682       // TODO: maybe handle the tracing status?
1683       tracer_->Get(get_impl_options.column_family, key).PermitUncheckedError();
1684     }
1685   }
1686 
1687   // Acquire SuperVersion
1688   SuperVersion* sv = GetAndRefSuperVersion(cfd);
1689 
1690   TEST_SYNC_POINT("DBImpl::GetImpl:1");
1691   TEST_SYNC_POINT("DBImpl::GetImpl:2");
1692 
1693   SequenceNumber snapshot;
1694   if (read_options.snapshot != nullptr) {
1695     if (get_impl_options.callback) {
1696       // Already calculated based on read_options.snapshot
1697       snapshot = get_impl_options.callback->max_visible_seq();
1698     } else {
1699       snapshot =
1700           reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
1701     }
1702   } else {
1703     // Note that the snapshot is assigned AFTER referencing the super
1704     // version because otherwise a flush happening in between may compact away
1705     // data for the snapshot, so the reader would see neither data that was
1706     // visible to the snapshot before compaction nor the newer data inserted
1707     // afterwards.
1708     if (last_seq_same_as_publish_seq_) {
1709       snapshot = versions_->LastSequence();
1710     } else {
1711       snapshot = versions_->LastPublishedSequence();
1712     }
1713     if (get_impl_options.callback) {
1714       // The unprep_seqs are not published for write unprepared, so it could be
1715       // that max_visible_seq is larger. Seek to the std::max of the two.
1716       // However, we still want our callback to contain the actual snapshot so
1717       // that it can do the correct visibility filtering.
1718       get_impl_options.callback->Refresh(snapshot);
1719 
1720       // Internally, WriteUnpreparedTxnReadCallback::Refresh would set
1721       // max_visible_seq = max(max_visible_seq, snapshot)
1722       //
1723       // Currently, the commented out assert is broken by
1724       // InvalidSnapshotReadCallback, but if write unprepared recovery followed
1725       // the regular transaction flow, then this special read callback would not
1726       // be needed.
1727       //
1728       // assert(callback->max_visible_seq() >= snapshot);
1729       snapshot = get_impl_options.callback->max_visible_seq();
1730     }
1731   }
1732   // If a timestamp is used, we use a read callback to ensure <key,t,s> is
1733   // returned only if t <= read_options.timestamp and s <= snapshot.
1734   if (ts_sz > 0) {
1735     assert(!get_impl_options
1736                 .callback);  // timestamp with callback is not supported
1737     read_cb.Refresh(snapshot);
1738     get_impl_options.callback = &read_cb;
1739   }
1740   TEST_SYNC_POINT("DBImpl::GetImpl:3");
1741   TEST_SYNC_POINT("DBImpl::GetImpl:4");
1742 
1743   // Prepare to store a list of merge operations if merge occurs.
1744   MergeContext merge_context;
1745   SequenceNumber max_covering_tombstone_seq = 0;
1746 
1747   Status s;
1748   // First look in the memtable, then in the immutable memtable (if any).
1749   // s is both in/out. When in, s could either be OK or MergeInProgress.
1750   // merge_operands will contain the sequence of merges in the latter case.
1751   LookupKey lkey(key, snapshot, read_options.timestamp);
1752   PERF_TIMER_STOP(get_snapshot_time);
1753 
1754   bool skip_memtable = (read_options.read_tier == kPersistedTier &&
1755                         has_unpersisted_data_.load(std::memory_order_relaxed));
1756   bool done = false;
1757   std::string* timestamp = ts_sz > 0 ? get_impl_options.timestamp : nullptr;
1758   if (!skip_memtable) {
1759     // Get value associated with key
1760     if (get_impl_options.get_value) {
1761       if (sv->mem->Get(lkey, get_impl_options.value->GetSelf(), timestamp, &s,
1762                        &merge_context, &max_covering_tombstone_seq,
1763                        read_options, get_impl_options.callback,
1764                        get_impl_options.is_blob_index)) {
1765         done = true;
1766         get_impl_options.value->PinSelf();
1767         RecordTick(stats_, MEMTABLE_HIT);
1768       } else if ((s.ok() || s.IsMergeInProgress()) &&
1769                  sv->imm->Get(lkey, get_impl_options.value->GetSelf(),
1770                               timestamp, &s, &merge_context,
1771                               &max_covering_tombstone_seq, read_options,
1772                               get_impl_options.callback,
1773                               get_impl_options.is_blob_index)) {
1774         done = true;
1775         get_impl_options.value->PinSelf();
1776         RecordTick(stats_, MEMTABLE_HIT);
1777       }
1778     } else {
1779       // Get the merge operands associated with the key. The operands should
1780       // not be merged; the raw values should be returned to the user.
1781       if (sv->mem->Get(lkey, /*value*/ nullptr, /*timestamp=*/nullptr, &s,
1782                        &merge_context, &max_covering_tombstone_seq,
1783                        read_options, nullptr, nullptr, false)) {
1784         done = true;
1785         RecordTick(stats_, MEMTABLE_HIT);
1786       } else if ((s.ok() || s.IsMergeInProgress()) &&
1787                  sv->imm->GetMergeOperands(lkey, &s, &merge_context,
1788                                            &max_covering_tombstone_seq,
1789                                            read_options)) {
1790         done = true;
1791         RecordTick(stats_, MEMTABLE_HIT);
1792       }
1793     }
1794     if (!done && !s.ok() && !s.IsMergeInProgress()) {
1795       ReturnAndCleanupSuperVersion(cfd, sv);
1796       return s;
1797     }
1798   }
1799   if (!done) {
1800     PERF_TIMER_GUARD(get_from_output_files_time);
1801     sv->current->Get(
1802         read_options, lkey, get_impl_options.value, timestamp, &s,
1803         &merge_context, &max_covering_tombstone_seq,
1804         get_impl_options.get_value ? get_impl_options.value_found : nullptr,
1805         nullptr, nullptr,
1806         get_impl_options.get_value ? get_impl_options.callback : nullptr,
1807         get_impl_options.get_value ? get_impl_options.is_blob_index : nullptr,
1808         get_impl_options.get_value);
1809     RecordTick(stats_, MEMTABLE_MISS);
1810   }
1811 
1812   {
1813     PERF_TIMER_GUARD(get_post_process_time);
1814 
1815     ReturnAndCleanupSuperVersion(cfd, sv);
1816 
1817     RecordTick(stats_, NUMBER_KEYS_READ);
1818     size_t size = 0;
1819     if (s.ok()) {
1820       if (get_impl_options.get_value) {
1821         size = get_impl_options.value->size();
1822       } else {
1823         // Return all merge operands for get_impl_options.key
1824         *get_impl_options.number_of_operands =
1825             static_cast<int>(merge_context.GetNumOperands());
1826         if (*get_impl_options.number_of_operands >
1827             get_impl_options.get_merge_operands_options
1828                 ->expected_max_number_of_operands) {
1829           s = Status::Incomplete(
1830               Status::SubCode::KMergeOperandsInsufficientCapacity);
1831         } else {
1832           for (const Slice& sl : merge_context.GetOperands()) {
1833             size += sl.size();
1834             get_impl_options.merge_operands->PinSelf(sl);
1835             get_impl_options.merge_operands++;
1836           }
1837         }
1838       }
1839       RecordTick(stats_, BYTES_READ, size);
1840       PERF_COUNTER_ADD(get_read_bytes, size);
1841     }
1842     RecordInHistogram(stats_, BYTES_PER_READ, size);
1843   }
1844   return s;
1845 }
1846 
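// Usage sketch (illustrative, comment-only; assumes an already-open DB* db
// configured with a merge operator): the get_value == false path in GetImpl()
// backs DB::GetMergeOperands(), which returns raw, unmerged operands.
//
//   rocksdb::GetMergeOperandsOptions gmo;
//   gmo.expected_max_number_of_operands = 16;
//   std::vector<rocksdb::PinnableSlice> operands(16);
//   int num_operands = 0;
//   rocksdb::Status s = db->GetMergeOperands(
//       rocksdb::ReadOptions(), db->DefaultColumnFamily(), "counter",
//       operands.data(), &gmo, &num_operands);
//   // s.IsIncomplete() indicates there were more operands than
//   // expected_max_number_of_operands could hold
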
1847 std::vector<Status> DBImpl::MultiGet(
1848     const ReadOptions& read_options,
1849     const std::vector<ColumnFamilyHandle*>& column_family,
1850     const std::vector<Slice>& keys, std::vector<std::string>* values) {
1851   return MultiGet(read_options, column_family, keys, values,
1852                   /*timestamps=*/nullptr);
1853 }
1854 
1855 std::vector<Status> DBImpl::MultiGet(
1856     const ReadOptions& read_options,
1857     const std::vector<ColumnFamilyHandle*>& column_family,
1858     const std::vector<Slice>& keys, std::vector<std::string>* values,
1859     std::vector<std::string>* timestamps) {
1860   PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
1861   StopWatch sw(immutable_db_options_.clock, stats_, DB_MULTIGET);
1862   PERF_TIMER_GUARD(get_snapshot_time);
1863 
1864 #ifndef NDEBUG
1865   for (const auto* cfh : column_family) {
1866     assert(cfh);
1867     const Comparator* const ucmp = cfh->GetComparator();
1868     assert(ucmp);
1869     if (ucmp->timestamp_size() > 0) {
1870       assert(read_options.timestamp);
1871       assert(ucmp->timestamp_size() == read_options.timestamp->size());
1872     } else {
1873       assert(!read_options.timestamp);
1874     }
1875   }
1876 #endif  // NDEBUG
1877 
1878   SequenceNumber consistent_seqnum;
1879 
1880   std::unordered_map<uint32_t, MultiGetColumnFamilyData> multiget_cf_data(
1881       column_family.size());
1882   for (auto cf : column_family) {
1883     auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(cf);
1884     auto cfd = cfh->cfd();
1885     if (multiget_cf_data.find(cfd->GetID()) == multiget_cf_data.end()) {
1886       multiget_cf_data.emplace(cfd->GetID(),
1887                                MultiGetColumnFamilyData(cfh, nullptr));
1888     }
1889   }
1890 
1891   std::function<MultiGetColumnFamilyData*(
1892       std::unordered_map<uint32_t, MultiGetColumnFamilyData>::iterator&)>
1893       iter_deref_lambda =
1894           [](std::unordered_map<uint32_t, MultiGetColumnFamilyData>::iterator&
1895                  cf_iter) { return &cf_iter->second; };
1896 
1897   bool unref_only =
1898       MultiCFSnapshot<std::unordered_map<uint32_t, MultiGetColumnFamilyData>>(
1899           read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
1900           &consistent_seqnum);
1901 
1902   TEST_SYNC_POINT("DBImpl::MultiGet:AfterGetSeqNum1");
1903   TEST_SYNC_POINT("DBImpl::MultiGet:AfterGetSeqNum2");
1904 
1905   // Contains a list of merge operations if a merge occurs.
1906   MergeContext merge_context;
1907 
1908   // Note: this always resizes the values array
1909   size_t num_keys = keys.size();
1910   std::vector<Status> stat_list(num_keys);
1911   values->resize(num_keys);
1912   if (timestamps) {
1913     timestamps->resize(num_keys);
1914   }
1915 
1916   // Keep track of bytes that we read for statistics-recording later
1917   uint64_t bytes_read = 0;
1918   PERF_TIMER_STOP(get_snapshot_time);
1919 
1920   // For each of the given keys, apply the entire "get" process as follows:
1921   // First look in the memtable, then in the immutable memtable (if any).
1922   // s is both in/out. When in, s could either be OK or MergeInProgress.
1923   // merge_operands will contain the sequence of merges in the latter case.
1924   size_t num_found = 0;
1925   size_t keys_read;
1926   uint64_t curr_value_size = 0;
1927 
1928   GetWithTimestampReadCallback timestamp_read_callback(0);
1929   ReadCallback* read_callback = nullptr;
1930   if (read_options.timestamp && read_options.timestamp->size() > 0) {
1931     timestamp_read_callback.Refresh(consistent_seqnum);
1932     read_callback = &timestamp_read_callback;
1933   }
1934 
1935   for (keys_read = 0; keys_read < num_keys; ++keys_read) {
1936     merge_context.Clear();
1937     Status& s = stat_list[keys_read];
1938     std::string* value = &(*values)[keys_read];
1939     std::string* timestamp = timestamps ? &(*timestamps)[keys_read] : nullptr;
1940 
1941     LookupKey lkey(keys[keys_read], consistent_seqnum, read_options.timestamp);
1942     auto cfh =
1943         static_cast_with_check<ColumnFamilyHandleImpl>(column_family[keys_read]);
1944     SequenceNumber max_covering_tombstone_seq = 0;
1945     auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID());
1946     assert(mgd_iter != multiget_cf_data.end());
1947     auto mgd = mgd_iter->second;
1948     auto super_version = mgd.super_version;
1949     bool skip_memtable =
1950         (read_options.read_tier == kPersistedTier &&
1951          has_unpersisted_data_.load(std::memory_order_relaxed));
1952     bool done = false;
1953     if (!skip_memtable) {
1954       if (super_version->mem->Get(lkey, value, timestamp, &s, &merge_context,
1955                                   &max_covering_tombstone_seq, read_options,
1956                                   read_callback)) {
1957         done = true;
1958         RecordTick(stats_, MEMTABLE_HIT);
1959       } else if (super_version->imm->Get(lkey, value, timestamp, &s,
1960                                          &merge_context,
1961                                          &max_covering_tombstone_seq,
1962                                          read_options, read_callback)) {
1963         done = true;
1964         RecordTick(stats_, MEMTABLE_HIT);
1965       }
1966     }
1967     if (!done) {
1968       PinnableSlice pinnable_val;
1969       PERF_TIMER_GUARD(get_from_output_files_time);
1970       super_version->current->Get(
1971           read_options, lkey, &pinnable_val, timestamp, &s, &merge_context,
1972           &max_covering_tombstone_seq, /*value_found=*/nullptr,
1973           /*key_exists=*/nullptr,
1974           /*seq=*/nullptr, read_callback);
1975       value->assign(pinnable_val.data(), pinnable_val.size());
1976       RecordTick(stats_, MEMTABLE_MISS);
1977     }
1978 
1979     if (s.ok()) {
1980       bytes_read += value->size();
1981       num_found++;
1982       curr_value_size += value->size();
1983       if (curr_value_size > read_options.value_size_soft_limit) {
1984         while (++keys_read < num_keys) {
1985           stat_list[keys_read] = Status::Aborted();
1986         }
1987         break;
1988       }
1989     }
1990     if (read_options.deadline.count() &&
1991         immutable_db_options_.clock->NowMicros() >
1992             static_cast<uint64_t>(read_options.deadline.count())) {
1993       break;
1994     }
1995   }
1996 
1997   if (keys_read < num_keys) {
1998     // The only reason to break out of the loop is when the deadline is
1999     // exceeded
2000     assert(immutable_db_options_.clock->NowMicros() >
2001            static_cast<uint64_t>(read_options.deadline.count()));
2002     for (++keys_read; keys_read < num_keys; ++keys_read) {
2003       stat_list[keys_read] = Status::TimedOut();
2004     }
2005   }
2006 
2007   // Post processing (decrement reference counts and record statistics)
2008   PERF_TIMER_GUARD(get_post_process_time);
2009   autovector<SuperVersion*> superversions_to_delete;
2010 
2011   for (auto mgd_iter : multiget_cf_data) {
2012     auto mgd = mgd_iter.second;
2013     if (!unref_only) {
2014       ReturnAndCleanupSuperVersion(mgd.cfd, mgd.super_version);
2015     } else {
2016       mgd.cfd->GetSuperVersion()->Unref();
2017     }
2018   }
2019   RecordTick(stats_, NUMBER_MULTIGET_CALLS);
2020   RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
2021   RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found);
2022   RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
2023   RecordInHistogram(stats_, BYTES_PER_MULTIGET, bytes_read);
2024   PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
2025   PERF_TIMER_STOP(get_post_process_time);
2026 
2027   return stat_list;
2028 }
2029 
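// Usage sketch (illustrative, comment-only; assumes an already-open DB* db):
// the std::vector-based MultiGet returns one Status per key.
//
//   std::vector<rocksdb::ColumnFamilyHandle*> cfs(2,
//                                                 db->DefaultColumnFamily());
//   std::vector<rocksdb::Slice> keys{"k1", "k2"};
//   std::vector<std::string> values;
//   std::vector<rocksdb::Status> statuses =
//       db->MultiGet(rocksdb::ReadOptions(), cfs, keys, &values);
//   // statuses[i].IsNotFound() marks missing keys; values[i] holds hits
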
2030 template <class T>
2031 bool DBImpl::MultiCFSnapshot(
2032     const ReadOptions& read_options, ReadCallback* callback,
2033     std::function<MultiGetColumnFamilyData*(typename T::iterator&)>&
2034         iter_deref_func,
2035     T* cf_list, SequenceNumber* snapshot) {
2036   PERF_TIMER_GUARD(get_snapshot_time);
2037 
2038   bool last_try = false;
2039   if (cf_list->size() == 1) {
2040     // Fast path for a single column family. We can simply get the
2041     // thread-local super version.
2042     auto cf_iter = cf_list->begin();
2043     auto node = iter_deref_func(cf_iter);
2044     node->super_version = GetAndRefSuperVersion(node->cfd);
2045     if (read_options.snapshot != nullptr) {
2046       // Note: In WritePrepared txns this is not necessary but not harmful
2047       // either, because prep_seq > snapshot => commit_seq > snapshot, so if
2048       // a snapshot is specified we should be fine with skipping seq numbers
2049       // that are greater than that.
2050       //
2051       // In WriteUnprepared, we cannot set snapshot in the lookup key because we
2052       // may skip uncommitted data that should be visible to the transaction for
2053       // reading own writes.
2054       *snapshot =
2055           static_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
2056       if (callback) {
2057         *snapshot = std::max(*snapshot, callback->max_visible_seq());
2058       }
2059     } else {
2060       // Since we get and reference the super version before getting
2061       // the snapshot number, without a mutex protection, it is possible
2062       // that a memtable switch happened in the middle and not all the
2063       // data for this snapshot is available. But it will contain all
2064       // the data available in the super version we have, which is also
2065       // a valid snapshot to read from.
2066       // We shouldn't get snapshot before finding and referencing the super
2067       // version because a flush happening in between may compact away data for
2068       // the snapshot, but the snapshot is earlier than the data overwriting it,
2069       // so users may see wrong results.
2070       if (last_seq_same_as_publish_seq_) {
2071         *snapshot = versions_->LastSequence();
2072       } else {
2073         *snapshot = versions_->LastPublishedSequence();
2074       }
2075     }
2076   } else {
2077     // If we end up with the same issue of the memtable getting sealed during
2078     // two consecutive retries, it means the write rate is very high. In that
2079     // case it's probably OK to take the mutex on the third try so we can
2080     // succeed for sure.
2081     static const int num_retries = 3;
2082     for (int i = 0; i < num_retries; ++i) {
2083       last_try = (i == num_retries - 1);
2084       bool retry = false;
2085 
2086       if (i > 0) {
2087         for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
2088              ++cf_iter) {
2089           auto node = iter_deref_func(cf_iter);
2090           SuperVersion* super_version = node->super_version;
2091           ColumnFamilyData* cfd = node->cfd;
2092           if (super_version != nullptr) {
2093             ReturnAndCleanupSuperVersion(cfd, super_version);
2094           }
2095           node->super_version = nullptr;
2096         }
2097       }
2098       if (read_options.snapshot == nullptr) {
2099         if (last_try) {
2100           TEST_SYNC_POINT("DBImpl::MultiGet::LastTry");
2101           // We're close to max number of retries. For the last retry,
2102           // acquire the lock so we're sure to succeed
2103           mutex_.Lock();
2104         }
2105         if (last_seq_same_as_publish_seq_) {
2106           *snapshot = versions_->LastSequence();
2107         } else {
2108           *snapshot = versions_->LastPublishedSequence();
2109         }
2110       } else {
2111         *snapshot = reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
2112                         ->number_;
2113       }
2114       for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
2115            ++cf_iter) {
2116         auto node = iter_deref_func(cf_iter);
2117         if (!last_try) {
2118           node->super_version = GetAndRefSuperVersion(node->cfd);
2119         } else {
2120           node->super_version = node->cfd->GetSuperVersion()->Ref();
2121         }
2122         TEST_SYNC_POINT("DBImpl::MultiGet::AfterRefSV");
2123         if (read_options.snapshot != nullptr || last_try) {
2124           // If user passed a snapshot, then we don't care if a memtable is
2125           // sealed or compaction happens because the snapshot would ensure
2126           // that older key versions are kept around. If this is the last
2127           // retry, then we have the lock so nothing bad can happen
2128           continue;
2129         }
2130         // We could get the earliest sequence number for the whole list of
2131         // memtables, which will include immutable memtables as well, but that
2132         // might be tricky to maintain in case we decide, in future, to do
2133         // memtable compaction.
2134         if (!last_try) {
2135           SequenceNumber seq =
2136               node->super_version->mem->GetEarliestSequenceNumber();
2137           if (seq > *snapshot) {
2138             retry = true;
2139             break;
2140           }
2141         }
2142       }
2143       if (!retry) {
2144         if (last_try) {
2145           mutex_.Unlock();
2146         }
2147         break;
2148       }
2149     }
2150   }
2151 
2153   PERF_TIMER_STOP(get_snapshot_time);
2154 
2155   return last_try;
2156 }
2157 
2158 void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
2159                       ColumnFamilyHandle** column_families, const Slice* keys,
2160                       PinnableSlice* values, Status* statuses,
2161                       const bool sorted_input) {
2162   return MultiGet(read_options, num_keys, column_families, keys, values,
2163                   /*timestamps=*/nullptr, statuses, sorted_input);
2164 }
2165 
2166 void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
2167                       ColumnFamilyHandle** column_families, const Slice* keys,
2168                       PinnableSlice* values, std::string* timestamps,
2169                       Status* statuses, const bool sorted_input) {
2170   if (num_keys == 0) {
2171     return;
2172   }
2173 
2174 #ifndef NDEBUG
2175   for (size_t i = 0; i < num_keys; ++i) {
2176     ColumnFamilyHandle* cfh = column_families[i];
2177     assert(cfh);
2178     const Comparator* const ucmp = cfh->GetComparator();
2179     assert(ucmp);
2180     if (ucmp->timestamp_size() > 0) {
2181       assert(read_options.timestamp);
2182       assert(read_options.timestamp->size() == ucmp->timestamp_size());
2183     } else {
2184       assert(!read_options.timestamp);
2185     }
2186   }
2187 #endif  // NDEBUG
2188 
2189   autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
2190   autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
2191   sorted_keys.resize(num_keys);
2192   for (size_t i = 0; i < num_keys; ++i) {
2193     key_context.emplace_back(column_families[i], keys[i], &values[i],
2194                              timestamps ? &timestamps[i] : nullptr,
2195                              &statuses[i]);
2196   }
2197   for (size_t i = 0; i < num_keys; ++i) {
2198     sorted_keys[i] = &key_context[i];
2199   }
2200   PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys);
2201 
2202   autovector<MultiGetColumnFamilyData, MultiGetContext::MAX_BATCH_SIZE>
2203       multiget_cf_data;
2204   size_t cf_start = 0;
2205   ColumnFamilyHandle* cf = sorted_keys[0]->column_family;
2206   for (size_t i = 0; i < num_keys; ++i) {
2207     KeyContext* key_ctx = sorted_keys[i];
2208     if (key_ctx->column_family != cf) {
2209       multiget_cf_data.emplace_back(
2210           MultiGetColumnFamilyData(cf, cf_start, i - cf_start, nullptr));
2211       cf_start = i;
2212       cf = key_ctx->column_family;
2213     }
2214   }
2215   multiget_cf_data.emplace_back(cf, cf_start, num_keys - cf_start, nullptr);
2220   std::function<MultiGetColumnFamilyData*(
2221       autovector<MultiGetColumnFamilyData,
2222                  MultiGetContext::MAX_BATCH_SIZE>::iterator&)>
2223       iter_deref_lambda =
2224           [](autovector<MultiGetColumnFamilyData,
2225                         MultiGetContext::MAX_BATCH_SIZE>::iterator& cf_iter) {
2226             return &(*cf_iter);
2227           };
2228 
2229   SequenceNumber consistent_seqnum;
2230   bool unref_only = MultiCFSnapshot<
2231       autovector<MultiGetColumnFamilyData, MultiGetContext::MAX_BATCH_SIZE>>(
2232       read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
2233       &consistent_seqnum);
2234 
2235   GetWithTimestampReadCallback timestamp_read_callback(0);
2236   ReadCallback* read_callback = nullptr;
2237   if (read_options.timestamp && read_options.timestamp->size() > 0) {
2238     timestamp_read_callback.Refresh(consistent_seqnum);
2239     read_callback = &timestamp_read_callback;
2240   }
2241 
2242   Status s;
2243   auto cf_iter = multiget_cf_data.begin();
2244   for (; cf_iter != multiget_cf_data.end(); ++cf_iter) {
2245     s = MultiGetImpl(read_options, cf_iter->start, cf_iter->num_keys,
2246                      &sorted_keys, cf_iter->super_version, consistent_seqnum,
2247                      read_callback);
2248     if (!s.ok()) {
2249       break;
2250     }
2251   }
2252   if (!s.ok()) {
2253     assert(s.IsTimedOut() || s.IsAborted());
2254     for (++cf_iter; cf_iter != multiget_cf_data.end(); ++cf_iter) {
2255       for (size_t i = cf_iter->start; i < cf_iter->start + cf_iter->num_keys;
2256            ++i) {
2257         *sorted_keys[i]->s = s;
2258       }
2259     }
2260   }
2261 
2262   for (const auto& iter : multiget_cf_data) {
2263     if (!unref_only) {
2264       ReturnAndCleanupSuperVersion(iter.cfd, iter.super_version);
2265     } else {
2266       iter.cfd->GetSuperVersion()->Unref();
2267     }
2268   }
2269 }
2270 
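// Usage sketch (illustrative, comment-only): the batched, array-based
// MultiGet avoids per-key heap allocation and can pin values. Assumes an
// already-open DB* db; cf1 and cf2 are hypothetical ColumnFamilyHandle*
// previously created on db.
//
//   constexpr size_t kNum = 2;
//   rocksdb::ColumnFamilyHandle* cfs[kNum] = {cf1, cf2};  // hypothetical
//   rocksdb::Slice keys[kNum] = {"k1", "k2"};
//   rocksdb::PinnableSlice values[kNum];
//   rocksdb::Status statuses[kNum];
//   db->MultiGet(rocksdb::ReadOptions(), kNum, cfs, keys, values, statuses,
//                /*sorted_input=*/false);
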
2271 namespace {
2272 // Order keys by CF ID, followed by key contents
2273 struct CompareKeyContext {
2274   inline bool operator()(const KeyContext* lhs, const KeyContext* rhs) {
2275     ColumnFamilyHandleImpl* cfh =
2276         static_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
2277     uint32_t cfd_id1 = cfh->cfd()->GetID();
2278     const Comparator* comparator = cfh->cfd()->user_comparator();
2279     cfh = static_cast<ColumnFamilyHandleImpl*>(rhs->column_family);
2280     uint32_t cfd_id2 = cfh->cfd()->GetID();
2281 
2282     if (cfd_id1 < cfd_id2) {
2283       return true;
2284     } else if (cfd_id1 > cfd_id2) {
2285       return false;
2286     }
2287 
2288     // Both keys are from the same column family
2289     int cmp = comparator->CompareWithoutTimestamp(
2290         *(lhs->key), /*a_has_ts=*/false, *(rhs->key), /*b_has_ts=*/false);
2291     if (cmp < 0) {
2292       return true;
2293     }
2294     return false;
2295   }
2296 };
2297 
2298 }  // anonymous namespace
2299 
2300 void DBImpl::PrepareMultiGetKeys(
2301     size_t num_keys, bool sorted_input,
2302     autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
2303 #ifndef NDEBUG
2304   if (sorted_input) {
2305     for (size_t index = 0; index < sorted_keys->size(); ++index) {
2306       if (index > 0) {
2307         KeyContext* lhs = (*sorted_keys)[index - 1];
2308         KeyContext* rhs = (*sorted_keys)[index];
2309         ColumnFamilyHandleImpl* cfh =
2310             static_cast_with_check<ColumnFamilyHandleImpl>(lhs->column_family);
2311         uint32_t cfd_id1 = cfh->cfd()->GetID();
2312         const Comparator* comparator = cfh->cfd()->user_comparator();
2313         cfh =
            static_cast_with_check<ColumnFamilyHandleImpl>(rhs->column_family);
2315         uint32_t cfd_id2 = cfh->cfd()->GetID();
2316 
2317         assert(cfd_id1 <= cfd_id2);
2318         if (cfd_id1 < cfd_id2) {
2319           continue;
2320         }
2321 
2322         // Both keys are from the same column family
2323         int cmp = comparator->CompareWithoutTimestamp(
2324             *(lhs->key), /*a_has_ts=*/false, *(rhs->key), /*b_has_ts=*/false);
2325         assert(cmp <= 0);
2326       }
2328     }
2329   }
2330 #endif
2331   if (!sorted_input) {
2332     CompareKeyContext sort_comparator;
2333     std::sort(sorted_keys->begin(), sorted_keys->begin() + num_keys,
2334               sort_comparator);
2335   }
2336 }
2337 
2338 void DBImpl::MultiGet(const ReadOptions& read_options,
2339                       ColumnFamilyHandle* column_family, const size_t num_keys,
2340                       const Slice* keys, PinnableSlice* values,
2341                       Status* statuses, const bool sorted_input) {
2342   return MultiGet(read_options, column_family, num_keys, keys, values,
2343                   /*timestamp=*/nullptr, statuses, sorted_input);
2344 }
2345 
2346 void DBImpl::MultiGet(const ReadOptions& read_options,
2347                       ColumnFamilyHandle* column_family, const size_t num_keys,
2348                       const Slice* keys, PinnableSlice* values,
2349                       std::string* timestamps, Status* statuses,
2350                       const bool sorted_input) {
2351   autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
2352   autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
2353   sorted_keys.resize(num_keys);
2354   for (size_t i = 0; i < num_keys; ++i) {
2355     key_context.emplace_back(column_family, keys[i], &values[i],
2356                              timestamps ? &timestamps[i] : nullptr,
2357                              &statuses[i]);
2358   }
2359   for (size_t i = 0; i < num_keys; ++i) {
2360     sorted_keys[i] = &key_context[i];
2361   }
2362   PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys);
2363   MultiGetWithCallback(read_options, column_family, nullptr, &sorted_keys);
2364 }
2365 
2366 void DBImpl::MultiGetWithCallback(
2367     const ReadOptions& read_options, ColumnFamilyHandle* column_family,
2368     ReadCallback* callback,
2369     autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
2370   std::array<MultiGetColumnFamilyData, 1> multiget_cf_data;
2371   multiget_cf_data[0] = MultiGetColumnFamilyData(column_family, nullptr);
2372   std::function<MultiGetColumnFamilyData*(
2373       std::array<MultiGetColumnFamilyData, 1>::iterator&)>
2374       iter_deref_lambda =
2375           [](std::array<MultiGetColumnFamilyData, 1>::iterator& cf_iter) {
2376             return &(*cf_iter);
2377           };
2378 
2379   size_t num_keys = sorted_keys->size();
2380   SequenceNumber consistent_seqnum;
2381   bool unref_only = MultiCFSnapshot<std::array<MultiGetColumnFamilyData, 1>>(
2382       read_options, callback, iter_deref_lambda, &multiget_cf_data,
2383       &consistent_seqnum);
2384 #ifndef NDEBUG
2385   assert(!unref_only);
2386 #else
2387   // Silence unused variable warning
2388   (void)unref_only;
2389 #endif  // NDEBUG
2390 
2391   if (callback && read_options.snapshot == nullptr) {
2392     // The unprep_seqs are not published for write unprepared, so it could be
2393     // that max_visible_seq is larger. Seek to the std::max of the two.
2394     // However, we still want our callback to contain the actual snapshot so
2395     // that it can do the correct visibility filtering.
2396     callback->Refresh(consistent_seqnum);
2397 
2398     // Internally, WriteUnpreparedTxnReadCallback::Refresh would set
2399     // max_visible_seq = max(max_visible_seq, snapshot)
2400     //
2401     // Currently, the commented out assert is broken by
2402     // InvalidSnapshotReadCallback, but if write unprepared recovery followed
2403     // the regular transaction flow, then this special read callback would not
2404     // be needed.
2405     //
2406     // assert(callback->max_visible_seq() >= snapshot);
2407     consistent_seqnum = callback->max_visible_seq();
2408   }
2409 
2410   GetWithTimestampReadCallback timestamp_read_callback(0);
2411   ReadCallback* read_callback = callback;
2412   if (read_options.timestamp && read_options.timestamp->size() > 0) {
2413     assert(!read_callback);  // timestamp with callback is not supported
2414     timestamp_read_callback.Refresh(consistent_seqnum);
2415     read_callback = &timestamp_read_callback;
2416   }
2417 
2418   Status s = MultiGetImpl(read_options, 0, num_keys, sorted_keys,
2419                           multiget_cf_data[0].super_version, consistent_seqnum,
2420                           read_callback);
2421   assert(s.ok() || s.IsTimedOut() || s.IsAborted());
2422   ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd,
2423                                multiget_cf_data[0].super_version);
2424 }
2425 
2426 // The actual implementation of batched MultiGet. Parameters -
2427 // start_key - Index in the sorted_keys vector to start processing from
2428 // num_keys - Number of keys to lookup, starting with sorted_keys[start_key]
2429 // sorted_keys - The entire batch of sorted keys for this CF
2430 //
2431 // The per key status is returned in the KeyContext structures pointed to by
2432 // sorted_keys. An overall Status is also returned; its possible values are
2433 // Status::OK(), Status::TimedOut() (read_options.deadline exceeded), and
2434 // Status::Aborted() (read_options.value_size_soft_limit exceeded).
2435 Status DBImpl::MultiGetImpl(
2436     const ReadOptions& read_options, size_t start_key, size_t num_keys,
2437     autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys,
2438     SuperVersion* super_version, SequenceNumber snapshot,
2439     ReadCallback* callback) {
2440   PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
2441   StopWatch sw(immutable_db_options_.clock, stats_, DB_MULTIGET);
2442 
2443   // For each of the given keys, apply the entire "get" process as follows:
2444   // First look in the memtable, then in the immutable memtable (if any).
2445   // s is both in/out. When in, s could either be OK or MergeInProgress.
2446   // merge_operands will contain the sequence of merges in the latter case.
2447   size_t keys_left = num_keys;
2448   Status s;
2449   uint64_t curr_value_size = 0;
2450   while (keys_left) {
2451     if (read_options.deadline.count() &&
2452         immutable_db_options_.clock->NowMicros() >
2453             static_cast<uint64_t>(read_options.deadline.count())) {
2454       s = Status::TimedOut();
2455       break;
2456     }
2457 
2458     size_t batch_size = (keys_left > MultiGetContext::MAX_BATCH_SIZE)
2459                             ? MultiGetContext::MAX_BATCH_SIZE
2460                             : keys_left;
2461     MultiGetContext ctx(sorted_keys, start_key + num_keys - keys_left,
2462                         batch_size, snapshot, read_options);
2463     MultiGetRange range = ctx.GetMultiGetRange();
2464     range.AddValueSize(curr_value_size);
2465     bool lookup_current = false;
2466 
2467     keys_left -= batch_size;
2468     for (auto mget_iter = range.begin(); mget_iter != range.end();
2469          ++mget_iter) {
2470       mget_iter->merge_context.Clear();
2471       *mget_iter->s = Status::OK();
2472     }
2473 
2474     bool skip_memtable =
2475         (read_options.read_tier == kPersistedTier &&
2476          has_unpersisted_data_.load(std::memory_order_relaxed));
2477     if (!skip_memtable) {
2478       super_version->mem->MultiGet(read_options, &range, callback);
2479       if (!range.empty()) {
2480         super_version->imm->MultiGet(read_options, &range, callback);
2481       }
2482       if (!range.empty()) {
2483         lookup_current = true;
2484         uint64_t left = range.KeysLeft();
2485         RecordTick(stats_, MEMTABLE_MISS, left);
2486       }
2487     }
2488     if (lookup_current) {
2489       PERF_TIMER_GUARD(get_from_output_files_time);
2490       super_version->current->MultiGet(read_options, &range, callback);
2491     }
2492     curr_value_size = range.GetValueSize();
2493     if (curr_value_size > read_options.value_size_soft_limit) {
2494       s = Status::Aborted();
2495       break;
2496     }
2497   }
2498 
2499   // Post processing (decrement reference counts and record statistics)
2500   PERF_TIMER_GUARD(get_post_process_time);
2501   size_t num_found = 0;
2502   uint64_t bytes_read = 0;
2503   for (size_t i = start_key; i < start_key + num_keys - keys_left; ++i) {
2504     KeyContext* key = (*sorted_keys)[i];
2505     if (key->s->ok()) {
2506       bytes_read += key->value->size();
2507       num_found++;
2508     }
2509   }
2510   if (keys_left) {
2511     assert(s.IsTimedOut() || s.IsAborted());
2512     for (size_t i = start_key + num_keys - keys_left; i < start_key + num_keys;
2513          ++i) {
2514       KeyContext* key = (*sorted_keys)[i];
2515       *key->s = s;
2516     }
2517   }
2518 
2519   RecordTick(stats_, NUMBER_MULTIGET_CALLS);
2520   RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
2521   RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found);
2522   RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
2523   RecordInHistogram(stats_, BYTES_PER_MULTIGET, bytes_read);
2524   PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
2525   PERF_TIMER_STOP(get_post_process_time);
2526 
2527   return s;
2528 }
2529 
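// Config sketch (illustrative, comment-only): the TimedOut/Aborted exits in
// MultiGetImpl() above are driven by two ReadOptions fields.
//
//   rocksdb::ReadOptions ro;
//   ro.deadline = std::chrono::microseconds(
//       rocksdb::Env::Default()->NowMicros() + 10000);  // ~10ms budget
//   ro.value_size_soft_limit = 1 << 20;  // abort the batch past ~1MB of
//                                        // accumulated value bytes
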
2530 Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
2531                                   const std::string& column_family,
2532                                   ColumnFamilyHandle** handle) {
2533   assert(handle != nullptr);
2534   Status s = CreateColumnFamilyImpl(cf_options, column_family, handle);
2535   if (s.ok()) {
2536     s = WriteOptionsFile(true /*need_mutex_lock*/,
2537                          true /*need_enter_write_thread*/);
2538   }
2539   return s;
2540 }
2541 
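// Usage sketch (illustrative, comment-only; assumes an already-open DB* db):
// creating a column family returns a handle the caller owns.
//
//   rocksdb::ColumnFamilyHandle* cf = nullptr;
//   rocksdb::Status s = db->CreateColumnFamily(rocksdb::ColumnFamilyOptions(),
//                                              "new_cf", &cf);
//   // ... use cf with Get/Put/NewIterator/etc. ...
//   if (s.ok()) {
//     s = db->DestroyColumnFamilyHandle(cf);  // release the handle; the
//                                             // column family itself remains
//   }
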
2542 Status DBImpl::CreateColumnFamilies(
2543     const ColumnFamilyOptions& cf_options,
2544     const std::vector<std::string>& column_family_names,
2545     std::vector<ColumnFamilyHandle*>* handles) {
2546   assert(handles != nullptr);
2547   handles->clear();
2548   size_t num_cf = column_family_names.size();
2549   Status s;
2550   bool success_once = false;
2551   for (size_t i = 0; i < num_cf; i++) {
2552     ColumnFamilyHandle* handle;
2553     s = CreateColumnFamilyImpl(cf_options, column_family_names[i], &handle);
2554     if (!s.ok()) {
2555       break;
2556     }
2557     handles->push_back(handle);
2558     success_once = true;
2559   }
2560   if (success_once) {
2561     Status persist_options_status = WriteOptionsFile(
2562         true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
2563     if (s.ok() && !persist_options_status.ok()) {
2564       s = persist_options_status;
2565     }
2566   }
2567   return s;
2568 }
2569 
2570 Status DBImpl::CreateColumnFamilies(
2571     const std::vector<ColumnFamilyDescriptor>& column_families,
2572     std::vector<ColumnFamilyHandle*>* handles) {
2573   assert(handles != nullptr);
2574   handles->clear();
2575   size_t num_cf = column_families.size();
2576   Status s;
2577   bool success_once = false;
2578   for (size_t i = 0; i < num_cf; i++) {
2579     ColumnFamilyHandle* handle;
2580     s = CreateColumnFamilyImpl(column_families[i].options,
2581                                column_families[i].name, &handle);
2582     if (!s.ok()) {
2583       break;
2584     }
2585     handles->push_back(handle);
2586     success_once = true;
2587   }
2588   if (success_once) {
2589     Status persist_options_status = WriteOptionsFile(
2590         true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
2591     if (s.ok() && !persist_options_status.ok()) {
2592       s = persist_options_status;
2593     }
2594   }
2595   return s;
2596 }
2597 
2598 Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
2599                                       const std::string& column_family_name,
2600                                       ColumnFamilyHandle** handle) {
2601   Status s;
2602   *handle = nullptr;
2603 
2604   DBOptions db_options =
2605       BuildDBOptions(immutable_db_options_, mutable_db_options_);
2606   s = ColumnFamilyData::ValidateOptions(db_options, cf_options);
2607   if (s.ok()) {
2608     for (auto& cf_path : cf_options.cf_paths) {
2609       s = env_->CreateDirIfMissing(cf_path.path);
2610       if (!s.ok()) {
2611         break;
2612       }
2613     }
2614   }
2615   if (!s.ok()) {
2616     return s;
2617   }
2618 
2619   SuperVersionContext sv_context(/* create_superversion */ true);
2620   {
2621     InstrumentedMutexLock l(&mutex_);
2622 
2623     if (versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name) !=
2624         nullptr) {
2625       return Status::InvalidArgument("Column family already exists");
2626     }
2627     VersionEdit edit;
2628     edit.AddColumnFamily(column_family_name);
2629     uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
2630     edit.SetColumnFamily(new_id);
2631     edit.SetLogNumber(logfile_number_);
2632     edit.SetComparatorName(cf_options.comparator->Name());
2633 
2634     // LogAndApply will both write the creation in MANIFEST and create
2635     // ColumnFamilyData object
2636     {  // write thread
2637       WriteThread::Writer w;
2638       write_thread_.EnterUnbatched(&w, &mutex_);
2639       // LogAndApply will both write the creation in MANIFEST and create
2640       // ColumnFamilyData object
2641       s = versions_->LogAndApply(nullptr, MutableCFOptions(cf_options), &edit,
2642                                  &mutex_, directories_.GetDbDir(), false,
2643                                  &cf_options);
2644       write_thread_.ExitUnbatched(&w);
2645     }
2646     if (s.ok()) {
2647       auto* cfd =
2648           versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
2649       assert(cfd != nullptr);
2650       std::map<std::string, std::shared_ptr<FSDirectory>> dummy_created_dirs;
2651       s = cfd->AddDirectories(&dummy_created_dirs);
2652     }
2653     if (s.ok()) {
2654       single_column_family_mode_ = false;
2655       auto* cfd =
2656           versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
2657       assert(cfd != nullptr);
2658       InstallSuperVersionAndScheduleWork(cfd, &sv_context,
2659                                          *cfd->GetLatestMutableCFOptions());
2660 
2661       if (!cfd->mem()->IsSnapshotSupported()) {
2662         is_snapshot_supported_ = false;
2663       }
2664 
2665       cfd->set_initialized();
2666 
2667       *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
2668       ROCKS_LOG_INFO(immutable_db_options_.info_log,
2669                      "Created column family [%s] (ID %u)",
2670                      column_family_name.c_str(), (unsigned)cfd->GetID());
2671     } else {
2672       ROCKS_LOG_ERROR(immutable_db_options_.info_log,
2673                       "Creating column family [%s] FAILED -- %s",
2674                       column_family_name.c_str(), s.ToString().c_str());
2675     }
2676   }  // InstrumentedMutexLock l(&mutex_)
2677 
2678   sv_context.Clean();
2679   // this is outside the mutex
2680   if (s.ok()) {
2681     NewThreadStatusCfInfo(
2682         static_cast_with_check<ColumnFamilyHandleImpl>(*handle)->cfd());
2683   }
2684   return s;
2685 }
2686 
DropColumnFamily(ColumnFamilyHandle * column_family)2687 Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
2688   assert(column_family != nullptr);
2689   Status s = DropColumnFamilyImpl(column_family);
2690   if (s.ok()) {
2691     s = WriteOptionsFile(true /*need_mutex_lock*/,
2692                          true /*need_enter_write_thread*/);
2693   }
2694   return s;
2695 }
2696 
DropColumnFamilies(const std::vector<ColumnFamilyHandle * > & column_families)2697 Status DBImpl::DropColumnFamilies(
2698     const std::vector<ColumnFamilyHandle*>& column_families) {
2699   Status s;
2700   bool success_once = false;
2701   for (auto* handle : column_families) {
2702     s = DropColumnFamilyImpl(handle);
2703     if (!s.ok()) {
2704       break;
2705     }
2706     success_once = true;
2707   }
2708   if (success_once) {
2709     Status persist_options_status = WriteOptionsFile(
2710         true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
2711     if (s.ok() && !persist_options_status.ok()) {
2712       s = persist_options_status;
2713     }
2714   }
2715   return s;
2716 }
2717 
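// Usage sketch (illustrative only): dropping a column family through the
// public API. Dropping only marks the family as dropped in the MANIFEST;
// the handle stays valid until destroyed, and data is reclaimed lazily.
//
//   Status s = db->DropColumnFamily(handle);
//   if (s.ok()) {
//     s = db->DestroyColumnFamilyHandle(handle);
//   }
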
Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) {
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  auto cfd = cfh->cfd();
  if (cfd->GetID() == 0) {
    return Status::InvalidArgument("Can't drop default column family");
  }

  bool cf_support_snapshot = cfd->mem()->IsSnapshotSupported();

  VersionEdit edit;
  edit.DropColumnFamily();
  edit.SetColumnFamily(cfd->GetID());

  Status s;
  {
    InstrumentedMutexLock l(&mutex_);
    if (cfd->IsDropped()) {
      s = Status::InvalidArgument("Column family already dropped!\n");
    }
    if (s.ok()) {
      // we drop column family from a single write thread
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit,
                                 &mutex_);
      write_thread_.ExitUnbatched(&w);
    }
    if (s.ok()) {
      auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
      max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size *
                                    mutable_cf_options->max_write_buffer_number;
    }

    if (!cf_support_snapshot) {
      // The dropped column family doesn't support snapshots. Need to
      // recalculate is_snapshot_supported_.
      bool new_is_snapshot_supported = true;
      for (auto c : *versions_->GetColumnFamilySet()) {
        if (!c->IsDropped() && !c->mem()->IsSnapshotSupported()) {
          new_is_snapshot_supported = false;
          break;
        }
      }
      is_snapshot_supported_ = new_is_snapshot_supported;
    }
    bg_cv_.SignalAll();
  }

  if (s.ok()) {
    // Note that here we erase the associated cf_info of the to-be-dropped
    // cfd before its ref-count goes to zero to avoid having to erase cf_info
    // later inside db_mutex.
    EraseThreadStatusCfInfo(cfd);
    assert(cfd->IsDropped());
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "Dropped column family with id %u\n", cfd->GetID());
  } else {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                    "Dropping column family with id %u FAILED -- %s\n",
                    cfd->GetID(), s.ToString().c_str());
  }

  return s;
}

bool DBImpl::KeyMayExist(const ReadOptions& read_options,
                         ColumnFamilyHandle* column_family, const Slice& key,
                         std::string* value, std::string* timestamp,
                         bool* value_found) {
  assert(value != nullptr);
  if (value_found != nullptr) {
    // falsify later if key-may-exist but can't fetch value
    *value_found = true;
  }
  ReadOptions roptions = read_options;
  roptions.read_tier = kBlockCacheTier;  // read from block cache only
  PinnableSlice pinnable_val;
  GetImplOptions get_impl_options;
  get_impl_options.column_family = column_family;
  get_impl_options.value = &pinnable_val;
  get_impl_options.value_found = value_found;
  get_impl_options.timestamp = timestamp;
  auto s = GetImpl(roptions, key, get_impl_options);
  value->assign(pinnable_val.data(), pinnable_val.size());

  // If the block cache is enabled and the index block of the table is not
  // present in the block cache, the return value will be Status::Incomplete.
  // In this case, the key may still exist in the table.
  return s.ok() || s.IsIncomplete();
}

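// Usage sketch (illustrative only): KeyMayExist trades accuracy for speed by
// consulting only memtables and the block cache. A `false` return means the
// key definitely does not exist; `true` may be a false positive.
//
//   std::string value;
//   bool value_found = false;
//   if (db->KeyMayExist(ReadOptions(), handle, key, &value, &value_found)) {
//     if (value_found) {
//       // `value` was fetched cheaply from memory.
//     } else {
//       // The key may exist, but fetching it would require I/O; fall back
//       // to db->Get() if the value is actually needed.
//     }
//   }
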
Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
                              ColumnFamilyHandle* column_family) {
  if (read_options.managed) {
    return NewErrorIterator(
        Status::NotSupported("Managed iterator is not supported anymore."));
  }
  Iterator* result = nullptr;
  if (read_options.read_tier == kPersistedTier) {
    return NewErrorIterator(Status::NotSupported(
        "ReadTier::kPersistedData is not yet supported in iterators."));
  }
  // if iterator wants internal keys, we can only proceed if
  // we can guarantee the deletes haven't been processed yet
  if (immutable_db_options_.preserve_deletes &&
      read_options.iter_start_seqnum > 0 &&
      read_options.iter_start_seqnum < preserve_deletes_seqnum_.load()) {
    return NewErrorIterator(Status::InvalidArgument(
        "Iterator requested internal keys which are too old and are not"
        " guaranteed to be preserved, try larger iter_start_seqnum opt."));
  }
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  ColumnFamilyData* cfd = cfh->cfd();
  assert(cfd != nullptr);
  ReadCallback* read_callback = nullptr;  // No read callback provided.
  if (read_options.tailing) {
#ifdef ROCKSDB_LITE
    // not supported in lite version
    result = nullptr;

#else
    SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
    auto iter = new ForwardIterator(this, read_options, cfd, sv,
                                    /* allow_unprepared_value */ true);
    result = NewDBIterator(
        env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
        cfd->user_comparator(), iter, sv->current, kMaxSequenceNumber,
        sv->mutable_cf_options.max_sequential_skip_in_iterations, read_callback,
        this, cfd);
#endif
  } else {
    // Note: no need to consider the special case of
    // last_seq_same_as_publish_seq_==false since NewIterator is overridden in
    // WritePreparedTxnDB
    result = NewIteratorImpl(read_options, cfd,
                             (read_options.snapshot != nullptr)
                                 ? read_options.snapshot->GetSequenceNumber()
                                 : kMaxSequenceNumber,
                             read_callback);
  }
  return result;
}

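// Usage sketch (illustrative only): a plain forward scan over one column
// family. The iterator pins a SuperVersion, so it should be deleted as soon
// as the scan is done to release resources.
//
//   std::unique_ptr<Iterator> it(db->NewIterator(ReadOptions(), handle));
//   for (it->SeekToFirst(); it->Valid(); it->Next()) {
//     // it->key() and it->value() are valid until the next move.
//   }
//   assert(it->status().ok());  // check for scan errors after the loop
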
ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
                                            ColumnFamilyData* cfd,
                                            SequenceNumber snapshot,
                                            ReadCallback* read_callback,
                                            bool expose_blob_index,
                                            bool allow_refresh) {
  SuperVersion* sv = cfd->GetReferencedSuperVersion(this);

  TEST_SYNC_POINT("DBImpl::NewIterator:1");
  TEST_SYNC_POINT("DBImpl::NewIterator:2");

  if (snapshot == kMaxSequenceNumber) {
    // Note that the snapshot is assigned AFTER referencing the super
    // version because otherwise a flush happening in between may compact away
    // data for the snapshot, so the reader would see neither data that was
    // visible to the snapshot before compaction nor the newer data inserted
    // afterwards.
    // Note that the super version might not contain all the data available
    // to this snapshot, but in that case it can see all the data in the
    // super version, which is a valid consistent state after the user
    // calls NewIterator().
    snapshot = versions_->LastSequence();
    TEST_SYNC_POINT("DBImpl::NewIterator:3");
    TEST_SYNC_POINT("DBImpl::NewIterator:4");
  }

  // Try to generate a DB iterator tree in a continuous memory area to be
  // cache friendly. Here is an example of the result:
  // +-------------------------------+
  // |                               |
  // | ArenaWrappedDBIter            |
  // |  +                            |
  // |  +---> Inner Iterator   ------------+
  // |  |                            |     |
  // |  |    +-- -- -- -- -- -- -- --+     |
  // |  +--- | Arena                 |     |
  // |       |                       |     |
  // |          Allocated Memory:    |     |
  // |       |   +-------------------+     |
  // |       |   | DBIter            | <---+
  // |           |  +                |
  // |       |   |  +-> iter_  ------------+
  // |       |   |                   |     |
  // |       |   +-------------------+     |
  // |       |   | MergingIterator   | <---+
  // |           |  +                |
  // |       |   |  +->child iter1  ------------+
  // |       |   |  |                |          |
  // |           |  +->child iter2  ----------+ |
  // |       |   |  |                |        | |
  // |       |   |  +->child iter3  --------+ | |
  // |           |                   |      | | |
  // |       |   +-------------------+      | | |
  // |       |   | Iterator1         | <--------+
  // |       |   +-------------------+      | |
  // |       |   | Iterator2         | <------+
  // |       |   +-------------------+      |
  // |       |   | Iterator3         | <----+
  // |       |   +-------------------+
  // |       |                       |
  // +-------+-----------------------+
  //
  // ArenaWrappedDBIter inlines an arena area where all the iterators in
  // the iterator tree are allocated in the order of being accessed when
  // querying.
  // Laying out the iterators in the order of being accessed makes it more
  // likely that any iterator pointer is close to the iterator it points to so
  // that they are likely to be in the same cache line and/or page.
  ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
      env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, sv->current,
      snapshot, sv->mutable_cf_options.max_sequential_skip_in_iterations,
      sv->version_number, read_callback, this, cfd, expose_blob_index,
      read_options.snapshot != nullptr ? false : allow_refresh);

  InternalIterator* internal_iter = NewInternalIterator(
      db_iter->GetReadOptions(), cfd, sv, db_iter->GetArena(),
      db_iter->GetRangeDelAggregator(), snapshot,
      /* allow_unprepared_value */ true);
  db_iter->SetIterUnderDBIter(internal_iter);

  return db_iter;
}

Status DBImpl::NewIterators(
    const ReadOptions& read_options,
    const std::vector<ColumnFamilyHandle*>& column_families,
    std::vector<Iterator*>* iterators) {
  if (read_options.managed) {
    return Status::NotSupported("Managed iterator is not supported anymore.");
  }
  if (read_options.read_tier == kPersistedTier) {
    return Status::NotSupported(
        "ReadTier::kPersistedData is not yet supported in iterators.");
  }
  ReadCallback* read_callback = nullptr;  // No read callback provided.
  iterators->clear();
  iterators->reserve(column_families.size());
  if (read_options.tailing) {
#ifdef ROCKSDB_LITE
    return Status::InvalidArgument(
        "Tailing iterator not supported in RocksDB lite");
#else
    for (auto cfh : column_families) {
      auto cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
      SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
      auto iter = new ForwardIterator(this, read_options, cfd, sv,
                                      /* allow_unprepared_value */ true);
      iterators->push_back(NewDBIterator(
          env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
          cfd->user_comparator(), iter, sv->current, kMaxSequenceNumber,
          sv->mutable_cf_options.max_sequential_skip_in_iterations,
          read_callback, this, cfd));
    }
#endif
  } else {
    // Note: no need to consider the special case of
    // last_seq_same_as_publish_seq_==false since NewIterators is overridden in
    // WritePreparedTxnDB
    auto snapshot = read_options.snapshot != nullptr
                        ? read_options.snapshot->GetSequenceNumber()
                        : versions_->LastSequence();
    for (size_t i = 0; i < column_families.size(); ++i) {
      auto* cfd =
          static_cast_with_check<ColumnFamilyHandleImpl>(column_families[i])
              ->cfd();
      iterators->push_back(
          NewIteratorImpl(read_options, cfd, snapshot, read_callback));
    }
  }

  return Status::OK();
}

const Snapshot* DBImpl::GetSnapshot() { return GetSnapshotImpl(false); }

#ifndef ROCKSDB_LITE
const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() {
  return GetSnapshotImpl(true);
}
#endif  // ROCKSDB_LITE

SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
                                      bool lock) {
  int64_t unix_time = 0;
  immutable_db_options_.clock->GetCurrentTime(&unix_time)
      .PermitUncheckedError();  // Ignore error
  SnapshotImpl* s = new SnapshotImpl;

  if (lock) {
    mutex_.Lock();
  }
  // Return nullptr if the underlying memtable does not support snapshots.
  if (!is_snapshot_supported_) {
    if (lock) {
      mutex_.Unlock();
    }
    delete s;
    return nullptr;
  }
  auto snapshot_seq = last_seq_same_as_publish_seq_
                          ? versions_->LastSequence()
                          : versions_->LastPublishedSequence();
  SnapshotImpl* snapshot =
      snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary);
  if (lock) {
    mutex_.Unlock();
  }
  return snapshot;
}

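// Usage sketch (illustrative only): pinning a consistent read view. A
// snapshot freezes the visible sequence number; readers pass it through
// ReadOptions and must release it when done.
//
//   const Snapshot* snap = db->GetSnapshot();
//   ReadOptions ropts;
//   ropts.snapshot = snap;
//   std::string value;
//   Status s = db->Get(ropts, handle, key, &value);  // reads as of `snap`
//   db->ReleaseSnapshot(snap);  // required; Close() fails with live snapshots
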
namespace {
typedef autovector<ColumnFamilyData*, 2> CfdList;
bool CfdListContains(const CfdList& list, ColumnFamilyData* cfd) {
  for (const ColumnFamilyData* t : list) {
    if (t == cfd) {
      return true;
    }
  }
  return false;
}
}  //  namespace

void DBImpl::ReleaseSnapshot(const Snapshot* s) {
  const SnapshotImpl* casted_s = reinterpret_cast<const SnapshotImpl*>(s);
  {
    InstrumentedMutexLock l(&mutex_);
    snapshots_.Delete(casted_s);
    uint64_t oldest_snapshot;
    if (snapshots_.empty()) {
      if (last_seq_same_as_publish_seq_) {
        oldest_snapshot = versions_->LastSequence();
      } else {
        oldest_snapshot = versions_->LastPublishedSequence();
      }
    } else {
      oldest_snapshot = snapshots_.oldest()->number_;
    }
    // Avoid going through every column family by checking a global threshold
    // first.
    if (oldest_snapshot > bottommost_files_mark_threshold_) {
      CfdList cf_scheduled;
      for (auto* cfd : *versions_->GetColumnFamilySet()) {
        cfd->current()->storage_info()->UpdateOldestSnapshot(oldest_snapshot);
        if (!cfd->current()
                 ->storage_info()
                 ->BottommostFilesMarkedForCompaction()
                 .empty()) {
          SchedulePendingCompaction(cfd);
          MaybeScheduleFlushOrCompaction();
          cf_scheduled.push_back(cfd);
        }
      }

      // Calculate a new threshold, skipping those CFs where compactions are
      // scheduled. We do not do the same pass as the previous loop because
      // the mutex might be unlocked during the loop, making the result
      // inaccurate.
      SequenceNumber new_bottommost_files_mark_threshold = kMaxSequenceNumber;
      for (auto* cfd : *versions_->GetColumnFamilySet()) {
        if (CfdListContains(cf_scheduled, cfd)) {
          continue;
        }
        new_bottommost_files_mark_threshold = std::min(
            new_bottommost_files_mark_threshold,
            cfd->current()->storage_info()->bottommost_files_mark_threshold());
      }
      bottommost_files_mark_threshold_ = new_bottommost_files_mark_threshold;
    }
  }
  delete casted_s;
}

#ifndef ROCKSDB_LITE
Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
                                        TablePropertiesCollection* props) {
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  auto cfd = cfh->cfd();

  // Increment the ref count
  mutex_.Lock();
  auto version = cfd->current();
  version->Ref();
  mutex_.Unlock();

  auto s = version->GetPropertiesOfAllTables(props);

  // Decrement the ref count
  mutex_.Lock();
  version->Unref();
  mutex_.Unlock();

  return s;
}

Status DBImpl::GetPropertiesOfTablesInRange(ColumnFamilyHandle* column_family,
                                            const Range* range, std::size_t n,
                                            TablePropertiesCollection* props) {
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  auto cfd = cfh->cfd();

  // Increment the ref count
  mutex_.Lock();
  auto version = cfd->current();
  version->Ref();
  mutex_.Unlock();

  auto s = version->GetPropertiesOfTablesInRange(range, n, props);

  // Decrement the ref count
  mutex_.Lock();
  version->Unref();
  mutex_.Unlock();

  return s;
}

#endif  // ROCKSDB_LITE

const std::string& DBImpl::GetName() const { return dbname_; }

Env* DBImpl::GetEnv() const { return env_; }

FileSystem* DB::GetFileSystem() const {
  const auto& fs = GetEnv()->GetFileSystem();
  return fs.get();
}

FileSystem* DBImpl::GetFileSystem() const {
  return immutable_db_options_.fs.get();
}

SystemClock* DBImpl::GetSystemClock() const {
  return immutable_db_options_.clock;
}

#ifndef ROCKSDB_LITE

Status DBImpl::StartIOTrace(const TraceOptions& trace_options,
                            std::unique_ptr<TraceWriter>&& trace_writer) {
  assert(trace_writer != nullptr);
  return io_tracer_->StartIOTrace(GetSystemClock(), trace_options,
                                  std::move(trace_writer));
}

Status DBImpl::EndIOTrace() {
  io_tracer_->EndIOTrace();
  return Status::OK();
}

#endif  // ROCKSDB_LITE

Options DBImpl::GetOptions(ColumnFamilyHandle* column_family) const {
  InstrumentedMutexLock l(&mutex_);
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  return Options(BuildDBOptions(immutable_db_options_, mutable_db_options_),
                 cfh->cfd()->GetLatestCFOptions());
}

DBOptions DBImpl::GetDBOptions() const {
  InstrumentedMutexLock l(&mutex_);
  return BuildDBOptions(immutable_db_options_, mutable_db_options_);
}

bool DBImpl::GetProperty(ColumnFamilyHandle* column_family,
                         const Slice& property, std::string* value) {
  const DBPropertyInfo* property_info = GetPropertyInfo(property);
  value->clear();
  auto cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
  if (property_info == nullptr) {
    return false;
  } else if (property_info->handle_int) {
    uint64_t int_value;
    bool ret_value =
        GetIntPropertyInternal(cfd, *property_info, false, &int_value);
    if (ret_value) {
      *value = ToString(int_value);
    }
    return ret_value;
  } else if (property_info->handle_string) {
    InstrumentedMutexLock l(&mutex_);
    return cfd->internal_stats()->GetStringProperty(*property_info, property,
                                                    value);
  } else if (property_info->handle_string_dbimpl) {
    std::string tmp_value;
    bool ret_value = (this->*(property_info->handle_string_dbimpl))(&tmp_value);
    if (ret_value) {
      *value = tmp_value;
    }
    return ret_value;
  }
  // Shouldn't reach here since exactly one of handle_string and handle_int
  // should be non-nullptr.
  assert(false);
  return false;
}

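// Usage sketch (illustrative only): querying DB properties. String and
// integer properties go through different accessors; the property names
// below are standard RocksDB property strings.
//
//   std::string stats;
//   if (db->GetProperty(handle, "rocksdb.stats", &stats)) {
//     // human-readable dump of internal statistics
//   }
//   uint64_t num_keys = 0;
//   if (db->GetIntProperty(handle, "rocksdb.estimate-num-keys", &num_keys)) {
//     // approximate key count for this column family
//   }
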
bool DBImpl::GetMapProperty(ColumnFamilyHandle* column_family,
                            const Slice& property,
                            std::map<std::string, std::string>* value) {
  const DBPropertyInfo* property_info = GetPropertyInfo(property);
  value->clear();
  auto cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
  if (property_info == nullptr) {
    return false;
  } else if (property_info->handle_map) {
    InstrumentedMutexLock l(&mutex_);
    return cfd->internal_stats()->GetMapProperty(*property_info, property,
                                                 value);
  }
  // If we reach this point it means that handle_map is not provided for the
  // requested property
  return false;
}

bool DBImpl::GetIntProperty(ColumnFamilyHandle* column_family,
                            const Slice& property, uint64_t* value) {
  const DBPropertyInfo* property_info = GetPropertyInfo(property);
  if (property_info == nullptr || property_info->handle_int == nullptr) {
    return false;
  }
  auto cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
  return GetIntPropertyInternal(cfd, *property_info, false, value);
}

bool DBImpl::GetIntPropertyInternal(ColumnFamilyData* cfd,
                                    const DBPropertyInfo& property_info,
                                    bool is_locked, uint64_t* value) {
  assert(property_info.handle_int != nullptr);
  if (!property_info.need_out_of_mutex) {
    if (is_locked) {
      mutex_.AssertHeld();
      return cfd->internal_stats()->GetIntProperty(property_info, value, this);
    } else {
      InstrumentedMutexLock l(&mutex_);
      return cfd->internal_stats()->GetIntProperty(property_info, value, this);
    }
  } else {
    SuperVersion* sv = nullptr;
    if (is_locked) {
      mutex_.Unlock();
    }
    sv = GetAndRefSuperVersion(cfd);

    bool ret = cfd->internal_stats()->GetIntPropertyOutOfMutex(
        property_info, sv->current, value);

    ReturnAndCleanupSuperVersion(cfd, sv);
    if (is_locked) {
      mutex_.Lock();
    }

    return ret;
  }
}

bool DBImpl::GetPropertyHandleOptionsStatistics(std::string* value) {
  assert(value != nullptr);
  Statistics* statistics = immutable_db_options_.stats;
  if (!statistics) {
    return false;
  }
  *value = statistics->ToString();
  return true;
}

#ifndef ROCKSDB_LITE
Status DBImpl::ResetStats() {
  InstrumentedMutexLock l(&mutex_);
  for (auto* cfd : *versions_->GetColumnFamilySet()) {
    if (cfd->initialized()) {
      cfd->internal_stats()->Clear();
    }
  }
  return Status::OK();
}
#endif  // ROCKSDB_LITE

bool DBImpl::GetAggregatedIntProperty(const Slice& property,
                                      uint64_t* aggregated_value) {
  const DBPropertyInfo* property_info = GetPropertyInfo(property);
  if (property_info == nullptr || property_info->handle_int == nullptr) {
    return false;
  }

  uint64_t sum = 0;
  bool ret = true;
  {
    // Needs mutex to protect the list of column families.
    InstrumentedMutexLock l(&mutex_);
    uint64_t value;
    for (auto* cfd : *versions_->GetColumnFamilySet()) {
      if (!cfd->initialized()) {
        continue;
      }
      cfd->Ref();
      ret = GetIntPropertyInternal(cfd, *property_info, true, &value);
      // GetIntPropertyInternal may release db mutex and re-acquire it.
      mutex_.AssertHeld();
      cfd->UnrefAndTryDelete();
      if (ret) {
        sum += value;
      } else {
        ret = false;
        break;
      }
    }
  }
  *aggregated_value = sum;
  return ret;
}

SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) {
  // TODO(ljin): consider using GetReferencedSuperVersion() directly
  return cfd->GetThreadLocalSuperVersion(this);
}

// REQUIRED: this function should only be called on the write thread or if the
// mutex is held.
SuperVersion* DBImpl::GetAndRefSuperVersion(uint32_t column_family_id) {
  auto column_family_set = versions_->GetColumnFamilySet();
  auto cfd = column_family_set->GetColumnFamily(column_family_id);
  if (!cfd) {
    return nullptr;
  }

  return GetAndRefSuperVersion(cfd);
}

void DBImpl::CleanupSuperVersion(SuperVersion* sv) {
  // Release SuperVersion
  if (sv->Unref()) {
    bool defer_purge =
        immutable_db_options().avoid_unnecessary_blocking_io;
    {
      InstrumentedMutexLock l(&mutex_);
      sv->Cleanup();
      if (defer_purge) {
        AddSuperVersionsToFreeQueue(sv);
        SchedulePurge();
      }
    }
    if (!defer_purge) {
      delete sv;
    }
    RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS);
  }
  RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES);
}

void DBImpl::ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd,
                                          SuperVersion* sv) {
  if (!cfd->ReturnThreadLocalSuperVersion(sv)) {
    CleanupSuperVersion(sv);
  }
}

// REQUIRED: this function should only be called on the write thread.
void DBImpl::ReturnAndCleanupSuperVersion(uint32_t column_family_id,
                                          SuperVersion* sv) {
  auto column_family_set = versions_->GetColumnFamilySet();
  auto cfd = column_family_set->GetColumnFamily(column_family_id);

  // If SuperVersion is held, and we successfully fetched a cfd using
  // GetAndRefSuperVersion(), it must still exist.
  assert(cfd != nullptr);
  ReturnAndCleanupSuperVersion(cfd, sv);
}

// REQUIRED: this function should only be called on the write thread or if the
// mutex is held.
ColumnFamilyHandle* DBImpl::GetColumnFamilyHandle(uint32_t column_family_id) {
  ColumnFamilyMemTables* cf_memtables = column_family_memtables_.get();

  if (!cf_memtables->Seek(column_family_id)) {
    return nullptr;
  }

  return cf_memtables->GetColumnFamilyHandle();
}

// REQUIRED: mutex is NOT held.
std::unique_ptr<ColumnFamilyHandle> DBImpl::GetColumnFamilyHandleUnlocked(
    uint32_t column_family_id) {
  InstrumentedMutexLock l(&mutex_);

  auto* cfd =
      versions_->GetColumnFamilySet()->GetColumnFamily(column_family_id);
  if (cfd == nullptr) {
    return nullptr;
  }

  return std::unique_ptr<ColumnFamilyHandleImpl>(
      new ColumnFamilyHandleImpl(cfd, this, &mutex_));
}

void DBImpl::GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
                                         const Range& range,
                                         uint64_t* const count,
                                         uint64_t* const size) {
  ColumnFamilyHandleImpl* cfh =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  ColumnFamilyData* cfd = cfh->cfd();
  SuperVersion* sv = GetAndRefSuperVersion(cfd);

  // Convert user_key into a corresponding internal key.
  InternalKey k1(range.start, kMaxSequenceNumber, kValueTypeForSeek);
  InternalKey k2(range.limit, kMaxSequenceNumber, kValueTypeForSeek);
  MemTable::MemTableStats memStats =
      sv->mem->ApproximateStats(k1.Encode(), k2.Encode());
  MemTable::MemTableStats immStats =
      sv->imm->ApproximateStats(k1.Encode(), k2.Encode());
  *count = memStats.count + immStats.count;
  *size = memStats.size + immStats.size;

  ReturnAndCleanupSuperVersion(cfd, sv);
}

Status DBImpl::GetApproximateSizes(const SizeApproximationOptions& options,
                                   ColumnFamilyHandle* column_family,
                                   const Range* range, int n, uint64_t* sizes) {
  if (!options.include_memtabtles && !options.include_files) {
    return Status::InvalidArgument("Invalid options");
  }

  const Comparator* const ucmp = column_family->GetComparator();
  assert(ucmp);
  size_t ts_sz = ucmp->timestamp_size();

  Version* v;
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  auto cfd = cfh->cfd();
  SuperVersion* sv = GetAndRefSuperVersion(cfd);
  v = sv->current;

  for (int i = 0; i < n; i++) {
    Slice start = range[i].start;
    Slice limit = range[i].limit;

    // Add timestamp if needed
    std::string start_with_ts, limit_with_ts;
    if (ts_sz > 0) {
      // Maximum timestamp means including all keys with any timestamp
      AppendKeyWithMaxTimestamp(&start_with_ts, start, ts_sz);
      // Append a maximum timestamp as the range limit is exclusive:
      // [start, limit)
      AppendKeyWithMaxTimestamp(&limit_with_ts, limit, ts_sz);
      start = start_with_ts;
      limit = limit_with_ts;
    }
    // Convert user_key into a corresponding internal key.
    InternalKey k1(start, kMaxSequenceNumber, kValueTypeForSeek);
    InternalKey k2(limit, kMaxSequenceNumber, kValueTypeForSeek);
    sizes[i] = 0;
    if (options.include_files) {
      sizes[i] += versions_->ApproximateSize(
          options, v, k1.Encode(), k2.Encode(), /*start_level=*/0,
          /*end_level=*/-1, TableReaderCaller::kUserApproximateSize);
    }
    if (options.include_memtabtles) {
      sizes[i] += sv->mem->ApproximateStats(k1.Encode(), k2.Encode()).size;
      sizes[i] += sv->imm->ApproximateStats(k1.Encode(), k2.Encode()).size;
    }
  }

  ReturnAndCleanupSuperVersion(cfd, sv);
  return Status::OK();
}

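// Usage sketch (illustrative only): estimating the on-disk and in-memory
// footprint of a key range. Note that the memtable flag is spelled
// `include_memtabtles` in this version of the API.
//
//   Range r("a", "z");
//   uint64_t size = 0;
//   SizeApproximationOptions opts;
//   opts.include_files = true;
//   opts.include_memtabtles = true;
//   Status s = db->GetApproximateSizes(opts, handle, &r, 1, &size);
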
std::list<uint64_t>::iterator
DBImpl::CaptureCurrentFileNumberInPendingOutputs() {
  // We need to remember the iterator of our insert, because after the
  // background job is done, we need to remove that element from
  // pending_outputs_.
  pending_outputs_.push_back(versions_->current_next_file_number());
  auto pending_outputs_inserted_elem = pending_outputs_.end();
  --pending_outputs_inserted_elem;
  return pending_outputs_inserted_elem;
}

void DBImpl::ReleaseFileNumberFromPendingOutputs(
    std::unique_ptr<std::list<uint64_t>::iterator>& v) {
  if (v.get() != nullptr) {
    pending_outputs_.erase(*v.get());
    v.reset();
  }
}

#ifndef ROCKSDB_LITE
Status DBImpl::GetUpdatesSince(
    SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
    const TransactionLogIterator::ReadOptions& read_options) {
  RecordTick(stats_, GET_UPDATES_SINCE_CALLS);
  if (seq > versions_->LastSequence()) {
    return Status::NotFound("Requested sequence not yet written in the db");
  }
  return wal_manager_.GetUpdatesSince(seq, iter, read_options, versions_.get());
}

Status DBImpl::DeleteFile(std::string name) {
  uint64_t number;
  FileType type;
  WalFileType log_type;
  if (!ParseFileName(name, &number, &type, &log_type) ||
      (type != kTableFile && type != kWalFile)) {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log, "DeleteFile %s failed.\n",
                    name.c_str());
    return Status::InvalidArgument("Invalid file name");
  }

  if (type == kWalFile) {
    // Only allow deleting archived log files
    if (log_type != kArchivedLogFile) {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "DeleteFile %s failed - not archived log.\n",
                      name.c_str());
      return Status::NotSupported("Delete only supported for archived logs");
    }
    Status status = wal_manager_.DeleteFile(name, number);
    if (!status.ok()) {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "DeleteFile %s failed -- %s.\n", name.c_str(),
                      status.ToString().c_str());
    }
    return status;
  }

  Status status;
  int level;
  FileMetaData* metadata;
  ColumnFamilyData* cfd;
  VersionEdit edit;
  JobContext job_context(next_job_id_.fetch_add(1), true);
  {
    InstrumentedMutexLock l(&mutex_);
    status = versions_->GetMetadataForFile(number, &level, &metadata, &cfd);
    if (!status.ok()) {
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "DeleteFile %s failed. File not found\n", name.c_str());
      job_context.Clean();
      return Status::InvalidArgument("File not found");
    }
    assert(level < cfd->NumberLevels());

    // If the file is being compacted no need to delete.
    if (metadata->being_compacted) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "DeleteFile %s Skipped. File about to be compacted\n",
                     name.c_str());
      job_context.Clean();
      return Status::OK();
    }

    // Only the files in the last level can be deleted externally.
    // This is to make sure that any deletion tombstones are not
    // lost. Check that the level passed is the last level.
    auto* vstorage = cfd->current()->storage_info();
    for (int i = level + 1; i < cfd->NumberLevels(); i++) {
      if (vstorage->NumLevelFiles(i) != 0) {
        ROCKS_LOG_WARN(immutable_db_options_.info_log,
                       "DeleteFile %s FAILED. File not in last level\n",
                       name.c_str());
        job_context.Clean();
        return Status::InvalidArgument("File not in last level");
      }
    }
    // if level == 0, it has to be the oldest file
    if (level == 0 &&
        vstorage->LevelFiles(0).back()->fd.GetNumber() != number) {
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "DeleteFile %s failed ---"
                     " target file in level 0 must be the oldest.",
                     name.c_str());
      job_context.Clean();
      return Status::InvalidArgument("File in level 0, but not oldest");
    }
    edit.SetColumnFamily(cfd->GetID());
    edit.DeleteFile(level, number);
    status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
                                    &edit, &mutex_, directories_.GetDbDir());
    if (status.ok()) {
      InstallSuperVersionAndScheduleWork(cfd,
                                         &job_context.superversion_contexts[0],
                                         *cfd->GetLatestMutableCFOptions());
    }
    FindObsoleteFiles(&job_context, false);
  }  // lock released here

  LogFlush(immutable_db_options_.info_log);
  // remove files outside the db-lock
  if (job_context.HaveSomethingToDelete()) {
    // Call PurgeObsoleteFiles() without holding mutex.
    PurgeObsoleteFiles(job_context);
  }
  job_context.Clean();
  return status;
}

Status DBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
                                   const RangePtr* ranges, size_t n,
                                   bool include_end) {
  Status status = Status::OK();
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  ColumnFamilyData* cfd = cfh->cfd();
  VersionEdit edit;
  std::set<FileMetaData*> deleted_files;
  JobContext job_context(next_job_id_.fetch_add(1), true);
  {
    InstrumentedMutexLock l(&mutex_);
    Version* input_version = cfd->current();

    auto* vstorage = input_version->storage_info();
    for (size_t r = 0; r < n; r++) {
      auto begin = ranges[r].start, end = ranges[r].limit;
      for (int i = 1; i < cfd->NumberLevels(); i++) {
        if (vstorage->LevelFiles(i).empty() ||
            !vstorage->OverlapInLevel(i, begin, end)) {
          continue;
        }
        std::vector<FileMetaData*> level_files;
        InternalKey begin_storage, end_storage, *begin_key, *end_key;
        if (begin == nullptr) {
          begin_key = nullptr;
        } else {
          begin_storage.SetMinPossibleForUserKey(*begin);
          begin_key = &begin_storage;
        }
        if (end == nullptr) {
          end_key = nullptr;
        } else {
          end_storage.SetMaxPossibleForUserKey(*end);
          end_key = &end_storage;
        }

        vstorage->GetCleanInputsWithinInterval(
            i, begin_key, end_key, &level_files, -1 /* hint_index */,
            nullptr /* file_index */);
        FileMetaData* level_file;
        for (uint32_t j = 0; j < level_files.size(); j++) {
          level_file = level_files[j];
          if (level_file->being_compacted) {
            continue;
          }
          if (deleted_files.find(level_file) != deleted_files.end()) {
            continue;
          }
          if (!include_end && end != nullptr &&
              cfd->user_comparator()->Compare(level_file->largest.user_key(),
                                              *end) == 0) {
            continue;
          }
          edit.SetColumnFamily(cfd->GetID());
          edit.DeleteFile(i, level_file->fd.GetNumber());
          deleted_files.insert(level_file);
          level_file->being_compacted = true;
        }
      }
    }
    if (edit.GetDeletedFiles().empty()) {
      job_context.Clean();
      return status;
    }
    input_version->Ref();
    status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
                                    &edit, &mutex_, directories_.GetDbDir());
    if (status.ok()) {
      InstallSuperVersionAndScheduleWork(cfd,
                                         &job_context.superversion_contexts[0],
                                         *cfd->GetLatestMutableCFOptions());
    }
    for (auto* deleted_file : deleted_files) {
      deleted_file->being_compacted = false;
    }
    input_version->Unref();
    FindObsoleteFiles(&job_context, false);
  }  // lock released here

  LogFlush(immutable_db_options_.info_log);
  // remove files outside the db-lock
  if (job_context.HaveSomethingToDelete()) {
    // Call PurgeObsoleteFiles() without holding mutex.
    PurgeObsoleteFiles(job_context);
  }
  job_context.Clean();
  return status;
}

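// Usage sketch (illustrative only): dropping whole SST files whose keys fall
// entirely inside a range, e.g. to quickly reclaim space for expired data.
// This uses the free function from rocksdb/convenience.h; `begin_key` and
// `end_key` are assumed to be Slices the caller owns. Keys in files that
// straddle a range boundary are NOT deleted; a CompactRange() is still
// needed to remove them.
//
//   RangePtr range(&begin_key, &end_key);
//   Status s = DeleteFilesInRanges(db, handle, &range, 1,
//                                  /*include_end=*/true);
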
void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
  InstrumentedMutexLock l(&mutex_);
  versions_->GetLiveFilesMetaData(metadata);
}

Status DBImpl::GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) {
  InstrumentedMutexLock l(&mutex_);
  return versions_->GetLiveFilesChecksumInfo(checksum_list);
}

void DBImpl::GetColumnFamilyMetaData(ColumnFamilyHandle* column_family,
                                     ColumnFamilyMetaData* cf_meta) {
  assert(column_family);
  auto* cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
  auto* sv = GetAndRefSuperVersion(cfd);
  {
    // Without mutex, Version::GetColumnFamilyMetaData will have data race with
    // Compaction::MarkFilesBeingCompacted. One solution is to use mutex, but
    // this may cause regression. An alternative is to make
    // FileMetaData::being_compacted atomic, but it will make FileMetaData
    // non-copy-able. Another option is to separate these variables from
    // original FileMetaData struct, and this requires re-organization of data
    // structures. For now, we take the easy approach. If
    // DB::GetColumnFamilyMetaData is not called frequently, the regression
    // should not be big. We still need to keep an eye on it.
    InstrumentedMutexLock l(&mutex_);
    sv->current->GetColumnFamilyMetaData(cf_meta);
  }
  ReturnAndCleanupSuperVersion(cfd, sv);
}

#endif  // ROCKSDB_LITE

Status DBImpl::CheckConsistency() {
  mutex_.AssertHeld();
  std::vector<LiveFileMetaData> metadata;
  versions_->GetLiveFilesMetaData(&metadata);
  TEST_SYNC_POINT("DBImpl::CheckConsistency:AfterGetLiveFilesMetaData");

  std::string corruption_messages;

  if (immutable_db_options_.skip_checking_sst_file_sizes_on_db_open) {
    // Instead of calling GetFileSize() for each expected file, call
    // GetChildren() for the DB directory and check that all expected files
    // are listed, without checking their sizes.
    // Since sst files might be in different directories, do it for each
    // directory separately.
    std::map<std::string, std::vector<std::string>> files_by_directory;
    for (const auto& md : metadata) {
      // md.name has a leading "/". Remove it.
      std::string fname = md.name;
      if (!fname.empty() && fname[0] == '/') {
        fname = fname.substr(1);
      }
      files_by_directory[md.db_path].push_back(fname);
    }
    for (const auto& dir_files : files_by_directory) {
      std::string directory = dir_files.first;
      std::vector<std::string> existing_files;
      Status s = env_->GetChildren(directory, &existing_files);
      if (!s.ok()) {
        corruption_messages +=
            "Can't list files in " + directory + ": " + s.ToString() + "\n";
        continue;
      }
      std::sort(existing_files.begin(), existing_files.end());

      for (const std::string& fname : dir_files.second) {
        if (!std::binary_search(existing_files.begin(), existing_files.end(),
                                fname) &&
            !std::binary_search(existing_files.begin(), existing_files.end(),
                                Rocks2LevelTableFileName(fname))) {
          corruption_messages +=
              "Missing sst file " + fname + " in " + directory + "\n";
        }
      }
    }
  } else {
    for (const auto& md : metadata) {
      // md.name has a leading "/".
      std::string file_path = md.db_path + md.name;

      uint64_t fsize = 0;
      TEST_SYNC_POINT("DBImpl::CheckConsistency:BeforeGetFileSize");
      Status s = env_->GetFileSize(file_path, &fsize);
      if (!s.ok() &&
          env_->GetFileSize(Rocks2LevelTableFileName(file_path), &fsize).ok()) {
        s = Status::OK();
      }
      if (!s.ok()) {
        corruption_messages +=
            "Can't access " + md.name + ": " + s.ToString() + "\n";
      } else if (fsize != md.size) {
        corruption_messages += "Sst file size mismatch: " + file_path +
                               ". Size recorded in manifest " +
                               ToString(md.size) + ", actual size " +
                               ToString(fsize) + "\n";
      }
    }
  }

  if (corruption_messages.size() == 0) {
    return Status::OK();
  } else {
    return Status::Corruption(corruption_messages);
  }
}

Status DBImpl::GetDbIdentity(std::string& identity) const {
  identity.assign(db_id_);
  return Status::OK();
}

Status DBImpl::GetDbIdentityFromIdentityFile(std::string* identity) const {
  std::string idfilename = IdentityFileName(dbname_);
  const FileOptions soptions;

  Status s = ReadFileToString(fs_.get(), idfilename, identity);
  if (!s.ok()) {
    return s;
  }

  // If the last character is '\n', remove it from identity
  if (identity->size() > 0 && identity->back() == '\n') {
    identity->pop_back();
  }
  return s;
}

Status DBImpl::GetDbSessionId(std::string& session_id) const {
  session_id.assign(db_session_id_);
  return Status::OK();
}

void DBImpl::SetDbSessionId() {
  // GenerateUniqueId() generates an identifier that has a negligible
  // probability of being duplicated, ~128 bits of entropy
  std::string uuid = env_->GenerateUniqueId();

  // Hash and reformat that down to a more compact format, 20 characters
  // in base-36 ([0-9A-Z]), which is ~103 bits of entropy, which is enough
  // to expect no collisions across a billion servers each opening DBs
  // a million times (~2^50). Benefits vs. raw unique id:
  // * Save ~ dozen bytes per SST file
  // * Shorter shared backup file names (some platforms have low limits)
  // * Visually distinct from DB id format
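  // (Arithmetic check: 20 digits * log2(36) ≈ 20 * 5.17 ≈ 103.4 bits.)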
  uint64_t a = NPHash64(uuid.data(), uuid.size(), 1234U);
  uint64_t b = NPHash64(uuid.data(), uuid.size(), 5678U);
  db_session_id_.resize(20);
  static const char* const base36 = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
  size_t i = 0;
  for (; i < 10U; ++i, a /= 36U) {
    db_session_id_[i] = base36[a % 36];
  }
  for (; i < 20U; ++i, b /= 36U) {
    db_session_id_[i] = base36[b % 36];
  }
  TEST_SYNC_POINT_CALLBACK("DBImpl::SetDbSessionId", &db_session_id_);
}

// Default implementation -- returns not supported status
Status DB::CreateColumnFamily(const ColumnFamilyOptions& /*cf_options*/,
                              const std::string& /*column_family_name*/,
                              ColumnFamilyHandle** /*handle*/) {
  return Status::NotSupported("");
}

Status DB::CreateColumnFamilies(
    const ColumnFamilyOptions& /*cf_options*/,
    const std::vector<std::string>& /*column_family_names*/,
    std::vector<ColumnFamilyHandle*>* /*handles*/) {
  return Status::NotSupported("");
}

Status DB::CreateColumnFamilies(
    const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
    std::vector<ColumnFamilyHandle*>* /*handles*/) {
  return Status::NotSupported("");
}

Status DB::DropColumnFamily(ColumnFamilyHandle* /*column_family*/) {
  return Status::NotSupported("");
}

Status DB::DropColumnFamilies(
    const std::vector<ColumnFamilyHandle*>& /*column_families*/) {
  return Status::NotSupported("");
}

Status DB::DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) {
  delete column_family;
  return Status::OK();
}

DB::~DB() {}

Status DBImpl::Close() {
  if (!closed_) {
    {
      InstrumentedMutexLock l(&mutex_);
      // If there is an unreleased snapshot, fail the close call
      if (!snapshots_.empty()) {
        return Status::Aborted("Cannot close DB with unreleased snapshot.");
      }
    }

    closed_ = true;
    return CloseImpl();
  }
  return Status::OK();
}

Status DB::ListColumnFamilies(const DBOptions& db_options,
                              const std::string& name,
                              std::vector<std::string>* column_families) {
  const std::shared_ptr<FileSystem>& fs = db_options.env->GetFileSystem();
  return VersionSet::ListColumnFamilies(column_families, name, fs.get());
}

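// Usage sketch (illustrative only): listing the column families of a closed
// DB before opening it, so that DB::Open can be given the full set (RocksDB
// requires all existing column families to be opened). The path is a
// placeholder.
//
//   std::vector<std::string> cf_names;
//   Status s = DB::ListColumnFamilies(DBOptions(), "/path/to/db", &cf_names);
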
~Snapshot()3929 Snapshot::~Snapshot() {}
3930 
DestroyDB(const std::string & dbname,const Options & options,const std::vector<ColumnFamilyDescriptor> & column_families)3931 Status DestroyDB(const std::string& dbname, const Options& options,
3932                  const std::vector<ColumnFamilyDescriptor>& column_families) {
3933   ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
3934   Env* env = soptions.env;
3935   std::vector<std::string> filenames;
3936   bool wal_in_db_path = IsWalDirSameAsDBPath(&soptions);
3937 
3938   // Reset the logger because it holds a handle to the
3939   // log file and prevents cleanup and directory removal
3940   soptions.info_log.reset();
3941   // Ignore error in case directory does not exist
3942   env->GetChildren(dbname, &filenames).PermitUncheckedError();
3943 
3944   FileLock* lock;
3945   const std::string lockname = LockFileName(dbname);
3946   Status result = env->LockFile(lockname, &lock);
3947   if (result.ok()) {
3948     uint64_t number;
3949     FileType type;
3950     InfoLogPrefix info_log_prefix(!soptions.db_log_dir.empty(), dbname);
3951     for (const auto& fname : filenames) {
3952       if (ParseFileName(fname, &number, info_log_prefix.prefix, &type) &&
3953           type != kDBLockFile) {  // Lock file will be deleted at end
3954         Status del;
3955         std::string path_to_delete = dbname + "/" + fname;
3956         if (type == kMetaDatabase) {
3957           del = DestroyDB(path_to_delete, options);
3958         } else if (type == kTableFile || type == kWalFile ||
3959                    type == kBlobFile) {
3960           del = DeleteDBFile(&soptions, path_to_delete, dbname,
3961                              /*force_bg=*/false, /*force_fg=*/!wal_in_db_path);
3962         } else {
3963           del = env->DeleteFile(path_to_delete);
3964         }
3965         if (!del.ok() && result.ok()) {
3966           result = del;
3967         }
3968       }
3969     }
3970 
3971     std::set<std::string> paths;
3972     for (const DbPath& db_path : options.db_paths) {
3973       paths.insert(db_path.path);
3974     }
3975     for (const ColumnFamilyDescriptor& cf : column_families) {
3976       for (const DbPath& cf_path : cf.options.cf_paths) {
3977         paths.insert(cf_path.path);
3978       }
3979     }
3980     for (const auto& path : paths) {
3981       if (env->GetChildren(path, &filenames).ok()) {
3982         for (const auto& fname : filenames) {
3983           if (ParseFileName(fname, &number, &type) &&
3984               (type == kTableFile ||
3985                type == kBlobFile)) {
3986             std::string file_path = path + "/" + fname;
3987             Status del = DeleteDBFile(&soptions, file_path, dbname,
3988                                       /*force_bg=*/false, /*force_fg=*/false);
3989             if (!del.ok() && result.ok()) {
3990               result = del;
3991             }
3992           }
3993         }
3994         // TODO: Should we return an error if we cannot delete the directory?
3995         env->DeleteDir(path).PermitUncheckedError();
3996       }
3997     }
3998 
3999     std::vector<std::string> walDirFiles;
4000     std::string archivedir = ArchivalDirectory(dbname);
4001     bool wal_dir_exists = false;
4002     if (dbname != soptions.wal_dir) {
4003       wal_dir_exists = env->GetChildren(soptions.wal_dir, &walDirFiles).ok();
4004       archivedir = ArchivalDirectory(soptions.wal_dir);
4005     }
4006 
4007     // The archive dir may be inside the wal dir or dbname, so it must be
4008     // processed and removed before those directories; otherwise we would
4009     // have trouble removing them
4010     std::vector<std::string> archiveFiles;
4011     if (env->GetChildren(archivedir, &archiveFiles).ok()) {
4012       // Delete archival files.
4013       for (const auto& file : archiveFiles) {
4014         if (ParseFileName(file, &number, &type) && type == kWalFile) {
4015           Status del =
4016               DeleteDBFile(&soptions, archivedir + "/" + file, archivedir,
4017                            /*force_bg=*/false, /*force_fg=*/!wal_in_db_path);
4018           if (!del.ok() && result.ok()) {
4019             result = del;
4020           }
4021         }
4022       }
4023       // Ignore error in case dir contains other files
4024       env->DeleteDir(archivedir).PermitUncheckedError();
4025     }
4026 
4027     // Delete log files in the WAL dir
4028     if (wal_dir_exists) {
4029       for (const auto& file : walDirFiles) {
4030         if (ParseFileName(file, &number, &type) && type == kWalFile) {
4031           Status del =
4032               DeleteDBFile(&soptions, LogFileName(soptions.wal_dir, number),
4033                            soptions.wal_dir, /*force_bg=*/false,
4034                            /*force_fg=*/!wal_in_db_path);
4035           if (!del.ok() && result.ok()) {
4036             result = del;
4037           }
4038         }
4039       }
4040       // Ignore error in case dir contains other files
4041       env->DeleteDir(soptions.wal_dir).PermitUncheckedError();
4042     }
4043 
4044     // Ignore error since state is already gone
4045     env->UnlockFile(lock).PermitUncheckedError();
4046     env->DeleteFile(lockname).PermitUncheckedError();
4047 
4048     // sst_file_manager holds a ref to the logger. Make sure the logger is
4049     // gone before trying to remove the directory.
4050     soptions.sst_file_manager.reset();
4051 
4052     // Ignore error in case dir contains other files
4053   env->DeleteDir(dbname).PermitUncheckedError();
4055   }
4056   return result;
4057 }
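
// Usage sketch (illustrative only): in the public API the column_families
// argument defaults to empty; pass descriptors when families store files
// under their own cf_paths so those directories are cleaned as well.
//
//   Options options;
//   Status s = DestroyDB("/path/to/db", options);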
4058 
4059 Status DBImpl::WriteOptionsFile(bool need_mutex_lock,
4060                                 bool need_enter_write_thread) {
4061 #ifndef ROCKSDB_LITE
4062   WriteThread::Writer w;
4063   if (need_mutex_lock) {
4064     mutex_.Lock();
4065   } else {
4066     mutex_.AssertHeld();
4067   }
4068   if (need_enter_write_thread) {
4069     write_thread_.EnterUnbatched(&w, &mutex_);
4070   }
4071 
4072   std::vector<std::string> cf_names;
4073   std::vector<ColumnFamilyOptions> cf_opts;
4074 
4075   // This part requires mutex to protect the column family options
4076   for (auto cfd : *versions_->GetColumnFamilySet()) {
4077     if (cfd->IsDropped()) {
4078       continue;
4079     }
4080     cf_names.push_back(cfd->GetName());
4081     cf_opts.push_back(cfd->GetLatestCFOptions());
4082   }
4083 
4084   // Unlock during expensive operations.  New writes cannot get here
4085   // because the single write thread ensures all new writes get queued.
4086   DBOptions db_options =
4087       BuildDBOptions(immutable_db_options_, mutable_db_options_);
4088   mutex_.Unlock();
4089 
4090   TEST_SYNC_POINT("DBImpl::WriteOptionsFile:1");
4091   TEST_SYNC_POINT("DBImpl::WriteOptionsFile:2");
4092 
4093   std::string file_name =
4094       TempOptionsFileName(GetName(), versions_->NewFileNumber());
4095   Status s = PersistRocksDBOptions(db_options, cf_names, cf_opts, file_name,
4096                                    fs_.get());
4097 
4098   if (s.ok()) {
4099     s = RenameTempFileToOptionsFile(file_name);
4100   }
4101   // restore lock
4102   if (!need_mutex_lock) {
4103     mutex_.Lock();
4104   }
4105   if (need_enter_write_thread) {
4106     write_thread_.ExitUnbatched(&w);
4107   }
4108   if (!s.ok()) {
4109     ROCKS_LOG_WARN(immutable_db_options_.info_log,
4110                    "Unable to persist options -- %s", s.ToString().c_str());
4111     if (immutable_db_options_.fail_if_options_file_error) {
4112       return Status::IOError("Unable to persist options.",
4113                              s.ToString().c_str());
4114     }
4115   }
4116 #else
4117   (void)need_mutex_lock;
4118   (void)need_enter_write_thread;
4119 #endif  // !ROCKSDB_LITE
4120   return Status::OK();
4121 }
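
// Note (added for clarity): the options file is persisted with a
// write-temp-then-rename protocol. PersistRocksDBOptions() writes a complete
// TempOptionsFileName first, and RenameTempFileToOptionsFile() then renames
// it to the final OPTIONS-<number> name, so a crash can never leave a
// truncated OPTIONS file installed.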
4122 
4123 #ifndef ROCKSDB_LITE
4124 namespace {
4125 void DeleteOptionsFilesHelper(const std::map<uint64_t, std::string>& filenames,
4126                               const size_t num_files_to_keep,
4127                               const std::shared_ptr<Logger>& info_log,
4128                               Env* env) {
4129   if (filenames.size() <= num_files_to_keep) {
4130     return;
4131   }
4132   for (auto iter = std::next(filenames.begin(), num_files_to_keep);
4133        iter != filenames.end(); ++iter) {
4134     if (!env->DeleteFile(iter->second).ok()) {
4135       ROCKS_LOG_WARN(info_log, "Unable to delete options file %s",
4136                      iter->second.c_str());
4137     }
4138   }
4139 }
4140 }  // namespace
4141 #endif  // !ROCKSDB_LITE
4142 
4143 Status DBImpl::DeleteObsoleteOptionsFiles() {
4144 #ifndef ROCKSDB_LITE
4145   std::vector<std::string> filenames;
4146   // Use an ordered map keyed by (UINT64_MAX - file_number) so iteration
4147   // visits the filenames from the newest to the oldest.
4148   std::map<uint64_t, std::string> options_filenames;
4149   Status s;
4150   s = GetEnv()->GetChildren(GetName(), &filenames);
4151   if (!s.ok()) {
4152     return s;
4153   }
4154   for (auto& filename : filenames) {
4155     uint64_t file_number;
4156     FileType type;
4157     if (ParseFileName(filename, &file_number, &type) && type == kOptionsFile) {
4158       options_filenames.insert(
4159           {std::numeric_limits<uint64_t>::max() - file_number,
4160            GetName() + "/" + filename});
4161     }
4162   }
4163 
4164   // Keep the latest 2 options files
4165   const size_t kNumOptionsFilesKept = 2;
4166   DeleteOptionsFilesHelper(options_filenames, kNumOptionsFilesKept,
4167                            immutable_db_options_.info_log, GetEnv());
4168   return Status::OK();
4169 #else
4170   return Status::OK();
4171 #endif  // !ROCKSDB_LITE
4172 }
4173 
4174 Status DBImpl::RenameTempFileToOptionsFile(const std::string& file_name) {
4175 #ifndef ROCKSDB_LITE
4176   Status s;
4177 
4178   uint64_t options_file_number = versions_->NewFileNumber();
4179   std::string options_file_name =
4180       OptionsFileName(GetName(), options_file_number);
4181   // Retry if the file name happens to conflict with an existing one.
4182   s = GetEnv()->RenameFile(file_name, options_file_name);
4183   if (s.ok()) {
4184     InstrumentedMutexLock l(&mutex_);
4185     versions_->options_file_number_ = options_file_number;
4186   }
4187 
4188   if (0 == disable_delete_obsolete_files_) {
4189     // TODO: Should we check for errors here?
4190     DeleteObsoleteOptionsFiles().PermitUncheckedError();
4191   }
4192   return s;
4193 #else
4194   (void)file_name;
4195   return Status::OK();
4196 #endif  // !ROCKSDB_LITE
4197 }
4198 
4199 #ifdef ROCKSDB_USING_THREAD_STATUS
4200 
4201 void DBImpl::NewThreadStatusCfInfo(ColumnFamilyData* cfd) const {
4202   if (immutable_db_options_.enable_thread_tracking) {
4203     ThreadStatusUtil::NewColumnFamilyInfo(this, cfd, cfd->GetName(),
4204                                           cfd->ioptions()->env);
4205   }
4206 }
4207 
4208 void DBImpl::EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const {
4209   if (immutable_db_options_.enable_thread_tracking) {
4210     ThreadStatusUtil::EraseColumnFamilyInfo(cfd);
4211   }
4212 }
4213 
4214 void DBImpl::EraseThreadStatusDbInfo() const {
4215   if (immutable_db_options_.enable_thread_tracking) {
4216     ThreadStatusUtil::EraseDatabaseInfo(this);
4217   }
4218 }
4219 
4220 #else
4221 void DBImpl::NewThreadStatusCfInfo(ColumnFamilyData* /*cfd*/) const {}
4222 
4223 void DBImpl::EraseThreadStatusCfInfo(ColumnFamilyData* /*cfd*/) const {}
4224 
4225 void DBImpl::EraseThreadStatusDbInfo() const {}
4226 #endif  // ROCKSDB_USING_THREAD_STATUS
4227 
4228 // A global method that dumps out the build version
4230 void DumpRocksDBBuildVersion(Logger* log) {
4231   ROCKS_LOG_HEADER(log, "RocksDB version: %s\n",
4232                    GetRocksVersionAsString().c_str());
4233   const auto& props = GetRocksBuildProperties();
4234   const auto& sha = props.find("rocksdb_build_git_sha");
4235   if (sha != props.end()) {
4236     ROCKS_LOG_HEADER(log, "Git sha %s", sha->second.c_str());
4237   }
4238   const auto date = props.find("rocksdb_build_date");
4239   if (date != props.end()) {
4240     ROCKS_LOG_HEADER(log, "Compile date %s", date->second.c_str());
4241   }
4242 }
4243 
4244 #ifndef ROCKSDB_LITE
4245 SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv,
4246                                                          bool include_history) {
4247   // Find the earliest sequence number that we know we can rely on reading
4248   // from the memtable without needing to check sst files.
4249   SequenceNumber earliest_seq =
4250       sv->imm->GetEarliestSequenceNumber(include_history);
4251   if (earliest_seq == kMaxSequenceNumber) {
4252     earliest_seq = sv->mem->GetEarliestSequenceNumber();
4253   }
4254   assert(sv->mem->GetEarliestSequenceNumber() >= earliest_seq);
4255 
4256   return earliest_seq;
4257 }
4258 #endif  // ROCKSDB_LITE
4259 
4260 #ifndef ROCKSDB_LITE
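// The lookup below proceeds from newest to oldest data and stops at the
// first record found: the active memtable, the immutable memtables, the
// memtable history, and finally (unless cache_only is set) the SST files.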
4261 Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
4262                                        bool cache_only,
4263                                        SequenceNumber lower_bound_seq,
4264                                        SequenceNumber* seq,
4265                                        bool* found_record_for_key,
4266                                        bool* is_blob_index) {
4267   Status s;
4268   MergeContext merge_context;
4269   SequenceNumber max_covering_tombstone_seq = 0;
4270 
4271   ReadOptions read_options;
4272   SequenceNumber current_seq = versions_->LastSequence();
4273   LookupKey lkey(key, current_seq);
4274 
4275   *seq = kMaxSequenceNumber;
4276   *found_record_for_key = false;
4277 
4278   // Check if there is a record for this key in the latest memtable
4279   sv->mem->Get(lkey, nullptr, nullptr, &s, &merge_context,
4280                &max_covering_tombstone_seq, seq, read_options,
4281                nullptr /*read_callback*/, is_blob_index);
4282 
4283   if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
4284     // unexpected error reading memtable.
4285     ROCKS_LOG_ERROR(immutable_db_options_.info_log,
4286                     "Unexpected status returned from MemTable::Get: %s\n",
4287                     s.ToString().c_str());
4288 
4289     return s;
4290   }
4291 
4292   if (*seq != kMaxSequenceNumber) {
4293     // Found a sequence number, no need to check immutable memtables
4294     *found_record_for_key = true;
4295     return Status::OK();
4296   }
4297 
4298   SequenceNumber lower_bound_in_mem = sv->mem->GetEarliestSequenceNumber();
4299   if (lower_bound_in_mem != kMaxSequenceNumber &&
4300       lower_bound_in_mem < lower_bound_seq) {
4301     *found_record_for_key = false;
4302     return Status::OK();
4303   }
4304 
4305   // Check if there is a record for this key in the immutable memtables
4306   sv->imm->Get(lkey, nullptr, nullptr, &s, &merge_context,
4307                &max_covering_tombstone_seq, seq, read_options,
4308                nullptr /*read_callback*/, is_blob_index);
4309 
4310   if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
4311     // unexpected error reading memtable.
4312     ROCKS_LOG_ERROR(immutable_db_options_.info_log,
4313                     "Unexpected status returned from MemTableList::Get: %s\n",
4314                     s.ToString().c_str());
4315 
4316     return s;
4317   }
4318 
4319   if (*seq != kMaxSequenceNumber) {
4320     // Found a sequence number, no need to check memtable history
4321     *found_record_for_key = true;
4322     return Status::OK();
4323   }
4324 
4325   SequenceNumber lower_bound_in_imm = sv->imm->GetEarliestSequenceNumber();
4326   if (lower_bound_in_imm != kMaxSequenceNumber &&
4327       lower_bound_in_imm < lower_bound_seq) {
4328     *found_record_for_key = false;
4329     return Status::OK();
4330   }
4331 
4332   // Check if there is a record for this key in the memtable history
4333   sv->imm->GetFromHistory(lkey, nullptr, nullptr, &s, &merge_context,
4334                           &max_covering_tombstone_seq, seq, read_options,
4335                           is_blob_index);
4336 
4337   if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
4338     // unexpected error reading memtable.
4339     ROCKS_LOG_ERROR(
4340         immutable_db_options_.info_log,
4341         "Unexpected status returned from MemTableList::GetFromHistory: %s\n",
4342         s.ToString().c_str());
4343 
4344     return s;
4345   }
4346 
4347   if (*seq != kMaxSequenceNumber) {
4348     // Found a sequence number, no need to check SST files
4349     *found_record_for_key = true;
4350     return Status::OK();
4351   }
4352 
4353   // We could do a sv->imm->GetEarliestSequenceNumber(/*include_history*/ true)
4354   // check here to skip the history if possible. But currently the caller
4355   // already does that. Maybe we should move the logic here later.
4356 
4357   // TODO(agiardullo): possible optimization: consider checking cached
4358   // SST files if cache_only=true?
4359   if (!cache_only) {
4360     // Check tables
4361     sv->current->Get(read_options, lkey, nullptr, nullptr, &s, &merge_context,
4362                      &max_covering_tombstone_seq, nullptr /* value_found */,
4363                      found_record_for_key, seq, nullptr /*read_callback*/,
4364                      is_blob_index);
4365 
4366     if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
4367       // unexpected error reading SST files
4368       ROCKS_LOG_ERROR(immutable_db_options_.info_log,
4369                       "Unexpected status returned from Version::Get: %s\n",
4370                       s.ToString().c_str());
4371     }
4372   }
4373 
4374   return s;
4375 }
4376 
4377 Status DBImpl::IngestExternalFile(
4378     ColumnFamilyHandle* column_family,
4379     const std::vector<std::string>& external_files,
4380     const IngestExternalFileOptions& ingestion_options) {
4381   IngestExternalFileArg arg;
4382   arg.column_family = column_family;
4383   arg.external_files = external_files;
4384   arg.options = ingestion_options;
4385   return IngestExternalFiles({arg});
4386 }
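
// Usage sketch (illustrative only; the path is hypothetical):
//
//   IngestExternalFileOptions ifo;
//   ifo.move_files = true;  // hard-link instead of copying when possible
//   Status s = db->IngestExternalFile(db->DefaultColumnFamily(),
//                                     {"/tmp/sst_upload/file1.sst"}, ifo);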
4387 
4388 Status DBImpl::IngestExternalFiles(
4389     const std::vector<IngestExternalFileArg>& args) {
4390   if (args.empty()) {
4391     return Status::InvalidArgument("ingestion arg list is empty");
4392   }
4393   {
4394     std::unordered_set<ColumnFamilyHandle*> unique_cfhs;
4395     for (const auto& arg : args) {
4396       if (arg.column_family == nullptr) {
4397         return Status::InvalidArgument("column family handle is null");
4398       } else if (unique_cfhs.count(arg.column_family) > 0) {
4399         return Status::InvalidArgument(
4400             "ingestion args have duplicate column families");
4401       }
4402       unique_cfhs.insert(arg.column_family);
4403     }
4404   }
4405   // Ingest multiple external SST files atomically.
4406   const size_t num_cfs = args.size();
4407   for (size_t i = 0; i != num_cfs; ++i) {
4408     if (args[i].external_files.empty()) {
4409       char err_msg[128] = {0};
4410       snprintf(err_msg, 128, "external_files[%zu] is empty", i);
4411       return Status::InvalidArgument(err_msg);
4412     }
4413   }
4414   for (const auto& arg : args) {
4415     const IngestExternalFileOptions& ingest_opts = arg.options;
4416     if (ingest_opts.ingest_behind &&
4417         !immutable_db_options_.allow_ingest_behind) {
4418       return Status::InvalidArgument(
4419           "can't ingest_behind file in DB with allow_ingest_behind=false");
4420     }
4421   }
4422 
4423   // TODO (yanqin) maybe handle the case in which column_families have
4424   // duplicates
4425   std::unique_ptr<std::list<uint64_t>::iterator> pending_output_elem;
4426   size_t total = 0;
4427   for (const auto& arg : args) {
4428     total += arg.external_files.size();
4429   }
4430   uint64_t next_file_number = 0;
4431   Status status = ReserveFileNumbersBeforeIngestion(
4432       static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd(), total,
4433       pending_output_elem, &next_file_number);
4434   if (!status.ok()) {
4435     InstrumentedMutexLock l(&mutex_);
4436     ReleaseFileNumberFromPendingOutputs(pending_output_elem);
4437     return status;
4438   }
4439 
4440   std::vector<ExternalSstFileIngestionJob> ingestion_jobs;
4441   for (const auto& arg : args) {
4442     auto* cfd = static_cast<ColumnFamilyHandleImpl*>(arg.column_family)->cfd();
4443     ingestion_jobs.emplace_back(versions_.get(), cfd, immutable_db_options_,
4444                                 file_options_, &snapshots_, arg.options,
4445                                 &directories_, &event_logger_, io_tracer_);
4446   }
4447 
4448   // TODO(yanqin) maybe make jobs run in parallel
4449   uint64_t start_file_number = next_file_number;
4450   for (size_t i = 1; i != num_cfs; ++i) {
4451     start_file_number += args[i - 1].external_files.size();
4452     auto* cfd =
4453         static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4454     SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
4455     Status es = ingestion_jobs[i].Prepare(
4456         args[i].external_files, args[i].files_checksums,
4457         args[i].files_checksum_func_names, start_file_number, super_version);
4458     // capture first error only
4459     if (!es.ok() && status.ok()) {
4460       status = es;
4461     }
4462     CleanupSuperVersion(super_version);
4463   }
4464   TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:0");
4465   TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1");
4466   {
4467     auto* cfd =
4468         static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd();
4469     SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
4470     Status es = ingestion_jobs[0].Prepare(
4471         args[0].external_files, args[0].files_checksums,
4472         args[0].files_checksum_func_names, next_file_number, super_version);
4473     if (!es.ok()) {
4474       status = es;
4475     }
4476     CleanupSuperVersion(super_version);
4477   }
4478   if (!status.ok()) {
4479     for (size_t i = 0; i != num_cfs; ++i) {
4480       ingestion_jobs[i].Cleanup(status);
4481     }
4482     InstrumentedMutexLock l(&mutex_);
4483     ReleaseFileNumberFromPendingOutputs(pending_output_elem);
4484     return status;
4485   }
4486 
4487   std::vector<SuperVersionContext> sv_ctxs;
4488   for (size_t i = 0; i != num_cfs; ++i) {
4489     sv_ctxs.emplace_back(true /* create_superversion */);
4490   }
4491   TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeJobsRun:0");
4492   TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeJobsRun:1");
4493   TEST_SYNC_POINT("DBImpl::AddFile:Start");
4494   {
4495     InstrumentedMutexLock l(&mutex_);
4496     TEST_SYNC_POINT("DBImpl::AddFile:MutexLock");
4497 
4498     // Stop writes to the DB by entering both write threads
4499     WriteThread::Writer w;
4500     write_thread_.EnterUnbatched(&w, &mutex_);
4501     WriteThread::Writer nonmem_w;
4502     if (two_write_queues_) {
4503       nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
4504     }
4505 
4506     // When unordered_write is enabled, keys are written to the memtable in
4507     // an unordered way. If the ingestion job checks the memtable key range
4508     // before the keys land in the memtable, it may skip a necessary
4509     // memtable flush.
4510     // So wait here to ensure there are no pending writes to the memtable.
4511     WaitForPendingWrites();
4512 
4513     num_running_ingest_file_ += static_cast<int>(num_cfs);
4514     TEST_SYNC_POINT("DBImpl::IngestExternalFile:AfterIncIngestFileCounter");
4515 
4516     bool at_least_one_cf_need_flush = false;
4517     std::vector<bool> need_flush(num_cfs, false);
4518     for (size_t i = 0; i != num_cfs; ++i) {
4519       auto* cfd =
4520           static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4521       if (cfd->IsDropped()) {
4522         // TODO (yanqin) investigate whether we should abort ingestion or
4523         // proceed with other non-dropped column families.
4524         status = Status::InvalidArgument(
4525             "cannot ingest an external file into a dropped CF");
4526         break;
4527       }
4528       bool tmp = false;
4529       status = ingestion_jobs[i].NeedsFlush(&tmp, cfd->GetSuperVersion());
4530       need_flush[i] = tmp;
4531       at_least_one_cf_need_flush = (at_least_one_cf_need_flush || tmp);
4532       if (!status.ok()) {
4533         break;
4534       }
4535     }
4536     TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush",
4537                              &at_least_one_cf_need_flush);
4538 
4539     if (status.ok() && at_least_one_cf_need_flush) {
4540       FlushOptions flush_opts;
4541       flush_opts.allow_write_stall = true;
4542       if (immutable_db_options_.atomic_flush) {
4543         autovector<ColumnFamilyData*> cfds_to_flush;
4544         SelectColumnFamiliesForAtomicFlush(&cfds_to_flush);
4545         mutex_.Unlock();
4546         status = AtomicFlushMemTables(cfds_to_flush, flush_opts,
4547                                       FlushReason::kExternalFileIngestion,
4548                                       true /* writes_stopped */);
4549         mutex_.Lock();
4550       } else {
4551         for (size_t i = 0; i != num_cfs; ++i) {
4552           if (need_flush[i]) {
4553             mutex_.Unlock();
4554             auto* cfd =
4555                 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)
4556                     ->cfd();
4557             status = FlushMemTable(cfd, flush_opts,
4558                                    FlushReason::kExternalFileIngestion,
4559                                    true /* writes_stopped */);
4560             mutex_.Lock();
4561             if (!status.ok()) {
4562               break;
4563             }
4564           }
4565         }
4566       }
4567     }
4568     // Run ingestion jobs.
4569     if (status.ok()) {
4570       for (size_t i = 0; i != num_cfs; ++i) {
4571         status = ingestion_jobs[i].Run();
4572         if (!status.ok()) {
4573           break;
4574         }
4575       }
4576     }
4577     if (status.ok()) {
4578       int consumed_seqno_count =
4579           ingestion_jobs[0].ConsumedSequenceNumbersCount();
4580 #ifndef NDEBUG
4581       for (size_t i = 1; i != num_cfs; ++i) {
4582         assert(!!consumed_seqno_count ==
4583                !!ingestion_jobs[i].ConsumedSequenceNumbersCount());
4584         consumed_seqno_count +=
4585             ingestion_jobs[i].ConsumedSequenceNumbersCount();
4586       }
4587 #endif
4588       if (consumed_seqno_count > 0) {
4589         const SequenceNumber last_seqno = versions_->LastSequence();
4590         versions_->SetLastAllocatedSequence(last_seqno + consumed_seqno_count);
4591         versions_->SetLastPublishedSequence(last_seqno + consumed_seqno_count);
4592         versions_->SetLastSequence(last_seqno + consumed_seqno_count);
4593       }
4594       autovector<ColumnFamilyData*> cfds_to_commit;
4595       autovector<const MutableCFOptions*> mutable_cf_options_list;
4596       autovector<autovector<VersionEdit*>> edit_lists;
4597       uint32_t num_entries = 0;
4598       for (size_t i = 0; i != num_cfs; ++i) {
4599         auto* cfd =
4600             static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4601         if (cfd->IsDropped()) {
4602           continue;
4603         }
4604         cfds_to_commit.push_back(cfd);
4605         mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
4606         autovector<VersionEdit*> edit_list;
4607         edit_list.push_back(ingestion_jobs[i].edit());
4608         edit_lists.push_back(edit_list);
4609         ++num_entries;
4610       }
4611       // Mark the version edits as an atomic group if the number of version
4612       // edits exceeds 1.
4613       if (cfds_to_commit.size() > 1) {
4614         for (auto& edits : edit_lists) {
4615           assert(edits.size() == 1);
4616           edits[0]->MarkAtomicGroup(--num_entries);
4617         }
4618         assert(0 == num_entries);
4619       }
4620       status =
4621           versions_->LogAndApply(cfds_to_commit, mutable_cf_options_list,
4622                                  edit_lists, &mutex_, directories_.GetDbDir());
4623     }
4624 
4625     if (status.ok()) {
4626       for (size_t i = 0; i != num_cfs; ++i) {
4627         auto* cfd =
4628             static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4629         if (!cfd->IsDropped()) {
4630           InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i],
4631                                              *cfd->GetLatestMutableCFOptions());
4632 #ifndef NDEBUG
4633           if (0 == i && num_cfs > 1) {
4634             TEST_SYNC_POINT(
4635                 "DBImpl::IngestExternalFiles:InstallSVForFirstCF:0");
4636             TEST_SYNC_POINT(
4637                 "DBImpl::IngestExternalFiles:InstallSVForFirstCF:1");
4638           }
4639 #endif  // !NDEBUG
4640         }
4641       }
4642     } else if (versions_->io_status().IsIOError()) {
4643       // Error while writing to MANIFEST.
4644       // In fact, versions_->io_status() can also be the result of renaming
4645       // CURRENT file. With current code, it's just difficult to tell. So just
4646       // be pessimistic and try write to a new MANIFEST.
4647       // TODO: distinguish between MANIFEST write and CURRENT renaming
4648       const IOStatus& io_s = versions_->io_status();
4649       // TODO: should we handle the returned error?
4650       error_handler_.SetBGError(io_s, BackgroundErrorReason::kManifestWrite);
4651     }
4652 
4653     // Resume writes to the DB
4654     if (two_write_queues_) {
4655       nonmem_write_thread_.ExitUnbatched(&nonmem_w);
4656     }
4657     write_thread_.ExitUnbatched(&w);
4658 
4659     if (status.ok()) {
4660       for (auto& job : ingestion_jobs) {
4661         job.UpdateStats();
4662       }
4663     }
4664     ReleaseFileNumberFromPendingOutputs(pending_output_elem);
4665     num_running_ingest_file_ -= static_cast<int>(num_cfs);
4666     if (0 == num_running_ingest_file_) {
4667       bg_cv_.SignalAll();
4668     }
4669     TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock");
4670   }
4671   // mutex_ is unlocked here
4672 
4673   // Cleanup
4674   for (size_t i = 0; i != num_cfs; ++i) {
4675     sv_ctxs[i].Clean();
4676     // This may rollback jobs that have completed successfully. This is
4677     // intended for atomicity.
4678     ingestion_jobs[i].Cleanup(status);
4679   }
4680   if (status.ok()) {
4681     for (size_t i = 0; i != num_cfs; ++i) {
4682       auto* cfd =
4683           static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4684       if (!cfd->IsDropped()) {
4685         NotifyOnExternalFileIngested(cfd, ingestion_jobs[i]);
4686       }
4687     }
4688   }
4689   return status;
4690 }
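
// Note (added for clarity): when several column families are ingested
// together, their version edits are committed as a single atomic group (see
// MarkAtomicGroup() above), so on recovery either every family sees its
// ingested files or none does.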
4691 
4692 Status DBImpl::CreateColumnFamilyWithImport(
4693     const ColumnFamilyOptions& options, const std::string& column_family_name,
4694     const ImportColumnFamilyOptions& import_options,
4695     const ExportImportFilesMetaData& metadata, ColumnFamilyHandle** handle) {
4696   assert(handle != nullptr);
4697   assert(*handle == nullptr);
4698   std::string cf_comparator_name = options.comparator->Name();
4699   if (cf_comparator_name != metadata.db_comparator_name) {
4700     return Status::InvalidArgument("Comparator name mismatch");
4701   }
4702 
4703   // Create column family.
4704   auto status = CreateColumnFamily(options, column_family_name, handle);
4705   if (!status.ok()) {
4706     return status;
4707   }
4708 
4709   // Import sst files from metadata.
4710   auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(*handle);
4711   auto cfd = cfh->cfd();
4712   ImportColumnFamilyJob import_job(versions_.get(), cfd, immutable_db_options_,
4713                                    file_options_, import_options,
4714                                    metadata.files, io_tracer_);
4715 
4716   SuperVersionContext dummy_sv_ctx(/* create_superversion */ true);
4717   VersionEdit dummy_edit;
4718   uint64_t next_file_number = 0;
4719   std::unique_ptr<std::list<uint64_t>::iterator> pending_output_elem;
4720   {
4721     // Lock db mutex
4722     InstrumentedMutexLock l(&mutex_);
4723     if (error_handler_.IsDBStopped()) {
4724       // Don't import files when there is a bg_error
4725       status = error_handler_.GetBGError();
4726     }
4727 
4728     // Make sure that bg cleanup won't delete the files that we are importing
4729     pending_output_elem.reset(new std::list<uint64_t>::iterator(
4730         CaptureCurrentFileNumberInPendingOutputs()));
4731 
4732     if (status.ok()) {
4733       // If a crash happens after a hard link is established, Recover may
4734       // reuse a file number that was already assigned to the internal
4735       // file, which would overwrite the external file. To protect it, we
4736       // must make sure the file number is never reused.
4737       next_file_number = versions_->FetchAddFileNumber(metadata.files.size());
4738       auto cf_options = cfd->GetLatestMutableCFOptions();
4739       status = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
4740                                       directories_.GetDbDir());
4741       if (status.ok()) {
4742         InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
4743       }
4744     }
4745   }
4746   dummy_sv_ctx.Clean();
4747 
4748   if (status.ok()) {
4749     SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
4750     status = import_job.Prepare(next_file_number, sv);
4751     CleanupSuperVersion(sv);
4752   }
4753 
4754   if (status.ok()) {
4755     SuperVersionContext sv_context(true /*create_superversion*/);
4756     {
4757       // Lock db mutex
4758       InstrumentedMutexLock l(&mutex_);
4759 
4760       // Stop writes to the DB by entering both write threads
4761       WriteThread::Writer w;
4762       write_thread_.EnterUnbatched(&w, &mutex_);
4763       WriteThread::Writer nonmem_w;
4764       if (two_write_queues_) {
4765         nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
4766       }
4767 
4768       num_running_ingest_file_++;
4769       assert(!cfd->IsDropped());
4770       status = import_job.Run();
4771 
4772       // Install job edit [Mutex will be unlocked here]
4773       if (status.ok()) {
4774         auto cf_options = cfd->GetLatestMutableCFOptions();
4775         status = versions_->LogAndApply(cfd, *cf_options, import_job.edit(),
4776                                         &mutex_, directories_.GetDbDir());
4777         if (status.ok()) {
4778           InstallSuperVersionAndScheduleWork(cfd, &sv_context, *cf_options);
4779         }
4780       }
4781 
4782       // Resume writes to the DB
4783       if (two_write_queues_) {
4784         nonmem_write_thread_.ExitUnbatched(&nonmem_w);
4785       }
4786       write_thread_.ExitUnbatched(&w);
4787 
4788       num_running_ingest_file_--;
4789       if (num_running_ingest_file_ == 0) {
4790         bg_cv_.SignalAll();
4791       }
4792     }
4793     // mutex_ is unlocked here
4794 
4795     sv_context.Clean();
4796   }
4797 
4798   {
4799     InstrumentedMutexLock l(&mutex_);
4800     ReleaseFileNumberFromPendingOutputs(pending_output_elem);
4801   }
4802 
4803   import_job.Cleanup(status);
4804   if (!status.ok()) {
4805     Status temp_s = DropColumnFamily(*handle);
4806     if (!temp_s.ok()) {
4807       ROCKS_LOG_ERROR(immutable_db_options_.info_log,
4808                       "DropColumnFamily failed with error %s",
4809                       temp_s.ToString().c_str());
4810     }
4811     // Always returns Status::OK()
4812     temp_s = DestroyColumnFamilyHandle(*handle);
4813     assert(temp_s.ok());
4814     *handle = nullptr;
4815   }
4816   return status;
4817 }
4818 
4819 Status DBImpl::VerifyFileChecksums(const ReadOptions& read_options) {
4820   return VerifyChecksumInternal(read_options, /*use_file_checksum=*/true);
4821 }
4822 
4823 Status DBImpl::VerifyChecksum(const ReadOptions& read_options) {
4824   return VerifyChecksumInternal(read_options, /*use_file_checksum=*/false);
4825 }
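
// Usage sketch (illustrative only): VerifyFileChecksums() compares whole-file
// checksums recorded at creation time (requires a non-null
// options.file_checksum_gen_factory), while VerifyChecksum() re-reads each
// table file and verifies its per-block checksums.
//
//   ReadOptions ro;
//   Status s = db->VerifyChecksum(ro);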
4826 
4827 Status DBImpl::VerifyChecksumInternal(const ReadOptions& read_options,
4828                                       bool use_file_checksum) {
4829   Status s;
4830 
4831   if (use_file_checksum) {
4832     FileChecksumGenFactory* const file_checksum_gen_factory =
4833         immutable_db_options_.file_checksum_gen_factory.get();
4834     if (!file_checksum_gen_factory) {
4835       s = Status::InvalidArgument(
4836           "Cannot verify file checksum if options.file_checksum_gen_factory is "
4837           "null");
4838       return s;
4839     }
4840   }
4841 
4842   std::vector<ColumnFamilyData*> cfd_list;
4843   {
4844     InstrumentedMutexLock l(&mutex_);
4845     for (auto cfd : *versions_->GetColumnFamilySet()) {
4846       if (!cfd->IsDropped() && cfd->initialized()) {
4847         cfd->Ref();
4848         cfd_list.push_back(cfd);
4849       }
4850     }
4851   }
4852   std::vector<SuperVersion*> sv_list;
4853   for (auto cfd : cfd_list) {
4854     sv_list.push_back(cfd->GetReferencedSuperVersion(this));
4855   }
4856 
4857   for (auto& sv : sv_list) {
4858     VersionStorageInfo* vstorage = sv->current->storage_info();
4859     ColumnFamilyData* cfd = sv->current->cfd();
4860     Options opts;
4861     if (!use_file_checksum) {
4862       InstrumentedMutexLock l(&mutex_);
4863       opts = Options(BuildDBOptions(immutable_db_options_, mutable_db_options_),
4864                      cfd->GetLatestCFOptions());
4865     }
4866     for (int i = 0; i < vstorage->num_non_empty_levels() && s.ok(); i++) {
4867       for (size_t j = 0; j < vstorage->LevelFilesBrief(i).num_files && s.ok();
4868            j++) {
4869         const auto& fd_with_krange = vstorage->LevelFilesBrief(i).files[j];
4870         const auto& fd = fd_with_krange.fd;
4871         const FileMetaData* fmeta = fd_with_krange.file_metadata;
4872         assert(fmeta);
4873         std::string fname = TableFileName(cfd->ioptions()->cf_paths,
4874                                           fd.GetNumber(), fd.GetPathId());
4875         if (use_file_checksum) {
4876           s = VerifyFullFileChecksum(fmeta->file_checksum,
4877                                      fmeta->file_checksum_func_name, fname,
4878                                      read_options);
4879         } else {
4880           s = ROCKSDB_NAMESPACE::VerifySstFileChecksum(opts, file_options_,
4881                                                        read_options, fname);
4882         }
4883       }
4884     }
4885 
4886     if (s.ok() && use_file_checksum) {
4887       const auto& blob_files = vstorage->GetBlobFiles();
4888       for (const auto& pair : blob_files) {
4889         const uint64_t blob_file_number = pair.first;
4890         const auto& meta = pair.second;
4891         assert(meta);
4892         const std::string blob_file_name = BlobFileName(
4893             cfd->ioptions()->cf_paths.front().path, blob_file_number);
4894         s = VerifyFullFileChecksum(meta->GetChecksumValue(),
4895                                    meta->GetChecksumMethod(), blob_file_name,
4896                                    read_options);
4897         if (!s.ok()) {
4898           break;
4899         }
4900       }
4901     }
4902     if (!s.ok()) {
4903       break;
4904     }
4905   }
4906 
4907   bool defer_purge = immutable_db_options().avoid_unnecessary_blocking_io;
4909   {
4910     InstrumentedMutexLock l(&mutex_);
4911     for (auto sv : sv_list) {
4912       if (sv && sv->Unref()) {
4913         sv->Cleanup();
4914         if (defer_purge) {
4915           AddSuperVersionsToFreeQueue(sv);
4916         } else {
4917           delete sv;
4918         }
4919       }
4920     }
4921     if (defer_purge) {
4922       SchedulePurge();
4923     }
4924     for (auto cfd : cfd_list) {
4925       cfd->UnrefAndTryDelete();
4926     }
4927   }
4928   return s;
4929 }
4930 
4931 Status DBImpl::VerifyFullFileChecksum(const std::string& file_checksum_expected,
4932                                       const std::string& func_name_expected,
4933                                       const std::string& fname,
4934                                       const ReadOptions& read_options) {
4935   Status s;
4936   if (file_checksum_expected == kUnknownFileChecksum) {
4937     return s;
4938   }
4939   std::string file_checksum;
4940   std::string func_name;
4941   s = ROCKSDB_NAMESPACE::GenerateOneFileChecksum(
4942       fs_.get(), fname, immutable_db_options_.file_checksum_gen_factory.get(),
4943       func_name_expected, &file_checksum, &func_name,
4944       read_options.readahead_size, immutable_db_options_.allow_mmap_reads,
4945       io_tracer_, immutable_db_options_.rate_limiter.get());
4946   if (s.ok()) {
4947     assert(func_name_expected == func_name);
4948     if (file_checksum != file_checksum_expected) {
4949       std::ostringstream oss;
4950       oss << fname << " file checksum mismatch, ";
4951       oss << "expecting "
4952           << Slice(file_checksum_expected).ToString(/*hex=*/true);
4953       oss << ", but actual " << Slice(file_checksum).ToString(/*hex=*/true);
4954       s = Status::Corruption(oss.str());
4955       TEST_SYNC_POINT_CALLBACK("DBImpl::VerifyFullFileChecksum:mismatch", &s);
4956     }
4957   }
4958   return s;
4959 }
4960 
4961 void DBImpl::NotifyOnExternalFileIngested(
4962     ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job) {
4963   if (immutable_db_options_.listeners.empty()) {
4964     return;
4965   }
4966 
4967   for (const IngestedFileInfo& f : ingestion_job.files_to_ingest()) {
4968     ExternalFileIngestionInfo info;
4969     info.cf_name = cfd->GetName();
4970     info.external_file_path = f.external_file_path;
4971     info.internal_file_path = f.internal_file_path;
4972     info.global_seqno = f.assigned_seqno;
4973     info.table_properties = f.table_properties;
4974     for (auto listener : immutable_db_options_.listeners) {
4975       listener->OnExternalFileIngested(this, info);
4976     }
4977   }
4978 }
4979 
4980 void DBImpl::WaitForIngestFile() {
4981   mutex_.AssertHeld();
4982   while (num_running_ingest_file_ > 0) {
4983     bg_cv_.Wait();
4984   }
4985 }
4986 
4987 Status DBImpl::StartTrace(const TraceOptions& trace_options,
4988                           std::unique_ptr<TraceWriter>&& trace_writer) {
4989   InstrumentedMutexLock lock(&trace_mutex_);
4990   tracer_.reset(new Tracer(immutable_db_options_.clock, trace_options,
4991                            std::move(trace_writer)));
4992   return Status::OK();
4993 }
4994 
4995 Status DBImpl::EndTrace() {
4996   InstrumentedMutexLock lock(&trace_mutex_);
4997   Status s;
4998   if (tracer_ != nullptr) {
4999     s = tracer_->Close();
5000     tracer_.reset();
5001   } else {
5002     return Status::IOError("No trace file to close");
5003   }
5004   return s;
5005 }
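
// Usage sketch (illustrative only; assumes NewFileTraceWriter() from
// rocksdb/trace_reader_writer.h):
//
//   std::unique_ptr<TraceWriter> trace_writer;
//   Status s = NewFileTraceWriter(db->GetEnv(), EnvOptions(), "/tmp/db.trace",
//                                 &trace_writer);
//   if (s.ok()) {
//     s = db->StartTrace(TraceOptions(), std::move(trace_writer));
//   }
//   // ... run workload ...
//   s = db->EndTrace();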
5006 
5007 Status DBImpl::StartBlockCacheTrace(
5008     const TraceOptions& trace_options,
5009     std::unique_ptr<TraceWriter>&& trace_writer) {
5010   return block_cache_tracer_.StartTrace(immutable_db_options_.clock,
5011                                         trace_options, std::move(trace_writer));
5012 }
5013 
5014 Status DBImpl::EndBlockCacheTrace() {
5015   block_cache_tracer_.EndTrace();
5016   return Status::OK();
5017 }
5018 
5019 Status DBImpl::TraceIteratorSeek(const uint32_t& cf_id, const Slice& key,
5020                                  const Slice& lower_bound,
5021                                  const Slice upper_bound) {
5022   Status s;
5023   if (tracer_) {
5024     InstrumentedMutexLock lock(&trace_mutex_);
5025     if (tracer_) {
5026       s = tracer_->IteratorSeek(cf_id, key, lower_bound, upper_bound);
5027     }
5028   }
5029   return s;
5030 }
5031 
5032 Status DBImpl::TraceIteratorSeekForPrev(const uint32_t& cf_id, const Slice& key,
5033                                         const Slice& lower_bound,
5034                                         const Slice upper_bound) {
5035   Status s;
5036   if (tracer_) {
5037     InstrumentedMutexLock lock(&trace_mutex_);
5038     if (tracer_) {
5039       s = tracer_->IteratorSeekForPrev(cf_id, key, lower_bound, upper_bound);
5040     }
5041   }
5042   return s;
5043 }
5044 
5045 Status DBImpl::ReserveFileNumbersBeforeIngestion(
5046     ColumnFamilyData* cfd, uint64_t num,
5047     std::unique_ptr<std::list<uint64_t>::iterator>& pending_output_elem,
5048     uint64_t* next_file_number) {
5049   Status s;
5050   SuperVersionContext dummy_sv_ctx(true /* create_superversion */);
5051   assert(nullptr != next_file_number);
5052   InstrumentedMutexLock l(&mutex_);
5053   if (error_handler_.IsDBStopped()) {
5054     // Do not ingest files when there is a bg_error
5055     return error_handler_.GetBGError();
5056   }
5057   pending_output_elem.reset(new std::list<uint64_t>::iterator(
5058       CaptureCurrentFileNumberInPendingOutputs()));
5059   *next_file_number = versions_->FetchAddFileNumber(static_cast<uint64_t>(num));
5060   auto cf_options = cfd->GetLatestMutableCFOptions();
5061   VersionEdit dummy_edit;
5062   // If a crash happens after a hard link is established, Recover may
5063   // reuse a file number that was already assigned to the internal file,
5064   // which would overwrite the external file. To protect it, we must make
5065   // sure the file number is never reused.
5066   s = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
5067                              directories_.GetDbDir());
5068   if (s.ok()) {
5069     InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
5070   }
5071   dummy_sv_ctx.Clean();
5072   return s;
5073 }
5074 
5075 Status DBImpl::GetCreationTimeOfOldestFile(uint64_t* creation_time) {
5076   if (mutable_db_options_.max_open_files == -1) {
5077     uint64_t oldest_time = port::kMaxUint64;
5078     for (auto cfd : *versions_->GetColumnFamilySet()) {
5079       if (!cfd->IsDropped()) {
5080         uint64_t ctime;
5081         {
5082           SuperVersion* sv = GetAndRefSuperVersion(cfd);
5083           Version* version = sv->current;
5084           version->GetCreationTimeOfOldestFile(&ctime);
5085           ReturnAndCleanupSuperVersion(cfd, sv);
5086         }
5087 
5088         if (ctime < oldest_time) {
5089           oldest_time = ctime;
5090         }
5091         if (oldest_time == 0) {
5092           break;
5093         }
5094       }
5095     }
5096     *creation_time = oldest_time;
5097     return Status::OK();
5098   } else {
5099     return Status::NotSupported("This API only works if max_open_files = -1");
5100   }
5101 }
5102 #endif  // ROCKSDB_LITE
5103 
5104 }  // namespace ROCKSDB_NAMESPACE
5105