1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include "db/db_impl/db_impl.h"
10 
11 #include <stdint.h>
12 #ifdef OS_SOLARIS
13 #include <alloca.h>
14 #endif
15 
16 #include <algorithm>
17 #include <cinttypes>
18 #include <cstdio>
19 #include <map>
20 #include <set>
21 #include <stdexcept>
22 #include <string>
23 #include <unordered_map>
24 #include <unordered_set>
25 #include <utility>
26 #include <vector>
27 
28 #include "db/arena_wrapped_db_iter.h"
29 #include "db/builder.h"
30 #include "db/compaction/compaction_job.h"
31 #include "db/db_info_dumper.h"
32 #include "db/db_iter.h"
33 #include "db/dbformat.h"
34 #include "db/error_handler.h"
35 #include "db/event_helpers.h"
36 #include "db/external_sst_file_ingestion_job.h"
37 #include "db/flush_job.h"
38 #include "db/forward_iterator.h"
39 #include "db/import_column_family_job.h"
40 #include "db/job_context.h"
41 #include "db/log_reader.h"
42 #include "db/log_writer.h"
43 #include "db/malloc_stats.h"
44 #include "db/memtable.h"
45 #include "db/memtable_list.h"
46 #include "db/merge_context.h"
47 #include "db/merge_helper.h"
48 #include "db/range_tombstone_fragmenter.h"
49 #include "db/table_cache.h"
50 #include "db/table_properties_collector.h"
51 #include "db/transaction_log_impl.h"
52 #include "db/version_set.h"
53 #include "db/write_batch_internal.h"
54 #include "db/write_callback.h"
55 #include "env/composite_env_wrapper.h"
56 #include "file/file_util.h"
57 #include "file/filename.h"
58 #include "file/random_access_file_reader.h"
59 #include "file/sst_file_manager_impl.h"
60 #include "logging/auto_roll_logger.h"
61 #include "logging/log_buffer.h"
62 #include "logging/logging.h"
63 #include "memtable/hash_linklist_rep.h"
64 #include "memtable/hash_skiplist_rep.h"
65 #include "monitoring/in_memory_stats_history.h"
66 #include "monitoring/iostats_context_imp.h"
67 #include "monitoring/perf_context_imp.h"
68 #include "monitoring/persistent_stats_history.h"
69 #include "monitoring/thread_status_updater.h"
70 #include "monitoring/thread_status_util.h"
71 #include "options/cf_options.h"
72 #include "options/options_helper.h"
73 #include "options/options_parser.h"
74 #include "port/port.h"
75 #include "rocksdb/cache.h"
76 #include "rocksdb/compaction_filter.h"
77 #include "rocksdb/convenience.h"
78 #include "rocksdb/db.h"
79 #include "rocksdb/env.h"
80 #include "rocksdb/merge_operator.h"
81 #include "rocksdb/statistics.h"
82 #include "rocksdb/stats_history.h"
83 #include "rocksdb/status.h"
84 #include "rocksdb/table.h"
85 #include "rocksdb/write_buffer_manager.h"
86 #include "table/block_based/block.h"
87 #include "table/block_based/block_based_table_factory.h"
88 #include "table/get_context.h"
89 #include "table/merging_iterator.h"
90 #include "table/multiget_context.h"
91 #include "table/table_builder.h"
92 #include "table/two_level_iterator.h"
93 #include "test_util/sync_point.h"
94 #include "tools/sst_dump_tool_imp.h"
95 #include "util/autovector.h"
96 #include "util/build_version.h"
97 #include "util/cast_util.h"
98 #include "util/coding.h"
99 #include "util/compression.h"
100 #include "util/crc32c.h"
101 #include "util/mutexlock.h"
102 #include "util/stop_watch.h"
103 #include "util/string_util.h"
104 
105 namespace rocksdb {
106 
// Name of the default column family.
const std::string kDefaultColumnFamilyName("default");
// Column family name reserved for stats history (judging by the identifier
// and the value; the code that uses it is outside this part of the file).
const std::string kPersistentStatsColumnFamilyName(
    "___rocksdb_stats_history___");
// Forward declaration only; called from the DBImpl constructor below to write
// build version information to the given logger.
void DumpRocksDBBuildVersion(Logger* log);
111 
GetCompressionFlush(const ImmutableCFOptions & ioptions,const MutableCFOptions & mutable_cf_options)112 CompressionType GetCompressionFlush(
113     const ImmutableCFOptions& ioptions,
114     const MutableCFOptions& mutable_cf_options) {
115   // Compressing memtable flushes might not help unless the sequential load
116   // optimization is used for leveled compaction. Otherwise the CPU and
117   // latency overhead is not offset by saving much space.
118   if (ioptions.compaction_style == kCompactionStyleUniversal) {
119     if (mutable_cf_options.compaction_options_universal
120             .compression_size_percent < 0) {
121       return mutable_cf_options.compression;
122     } else {
123       return kNoCompression;
124     }
125   } else if (!ioptions.compression_per_level.empty()) {
126     // For leveled compress when min_level_to_compress != 0.
127     return ioptions.compression_per_level[0];
128   } else {
129     return mutable_cf_options.compression;
130   }
131 }
132 
133 namespace {
DumpSupportInfo(Logger * logger)134 void DumpSupportInfo(Logger* logger) {
135   ROCKS_LOG_HEADER(logger, "Compression algorithms supported:");
136   for (auto& compression : OptionsHelper::compression_type_string_map) {
137     if (compression.second != kNoCompression &&
138         compression.second != kDisableCompressionOption) {
139       ROCKS_LOG_HEADER(logger, "\t%s supported: %d", compression.first.c_str(),
140                        CompressionTypeSupported(compression.second));
141     }
142   }
143   ROCKS_LOG_HEADER(logger, "Fast CRC32 supported: %s",
144                    crc32c::IsFastCrc32Supported().c_str());
145 }
146 }  // namespace
147 
// Constructs a DBImpl for the database named `dbname`.
//
// The supplied `options` are passed through SanitizeOptions() and the result
// is used to initialize the immutable and mutable DB option members. The
// constructor body then resolves the absolute DB path, creates the table
// cache and the VersionSet, and dumps build/version, file summary and option
// information to the info log.
//
// Parameters:
//   options       - user-supplied DB options (sanitized before use).
//   dbname        - name/path of the database.
//   seq_per_batch - stored in seq_per_batch_; also determines use_custom_gc_
//                   and, together with options.two_write_queues,
//                   last_seq_same_as_publish_seq_.
//   batch_per_txn - stored in batch_per_txn_; must be true unless
//                   seq_per_batch is true (asserted below).
DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
               const bool seq_per_batch, const bool batch_per_txn)
    : dbname_(dbname),
      // We own the info log only if the caller did not provide one.
      own_info_log_(options.info_log == nullptr),
      initial_db_options_(SanitizeOptions(dbname, options)),
      env_(initial_db_options_.env),
      fs_(initial_db_options_.file_system),
      immutable_db_options_(initial_db_options_),
      mutable_db_options_(initial_db_options_),
      stats_(immutable_db_options_.statistics.get()),
      mutex_(stats_, env_, DB_MUTEX_WAIT_MICROS,
             immutable_db_options_.use_adaptive_mutex),
      default_cf_handle_(nullptr),
      max_total_in_memory_state_(0),
      file_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
      file_options_for_compaction_(fs_->OptimizeForCompactionTableWrite(
          file_options_, immutable_db_options_)),
      seq_per_batch_(seq_per_batch),
      batch_per_txn_(batch_per_txn),
      db_lock_(nullptr),
      shutting_down_(false),
      manual_compaction_paused_(false),
      bg_cv_(&mutex_),
      logfile_number_(0),
      log_dir_synced_(false),
      log_empty_(true),
      persist_stats_cf_handle_(nullptr),
      log_sync_cv_(&mutex_),
      total_log_size_(0),
      is_snapshot_supported_(true),
      write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
      write_thread_(immutable_db_options_),
      nonmem_write_thread_(immutable_db_options_),
      write_controller_(mutable_db_options_.delayed_write_rate),
      last_batch_group_size_(0),
      unscheduled_flushes_(0),
      unscheduled_compactions_(0),
      bg_bottom_compaction_scheduled_(0),
      bg_compaction_scheduled_(0),
      num_running_compactions_(0),
      bg_flush_scheduled_(0),
      num_running_flushes_(0),
      bg_purge_scheduled_(0),
      disable_delete_obsolete_files_(0),
      pending_purge_obsolete_files_(0),
      delete_obsolete_files_last_run_(env_->NowMicros()),
      last_stats_dump_time_microsec_(0),
      next_job_id_(1),
      has_unpersisted_data_(false),
      unable_to_release_oldest_log_(false),
      num_running_ingest_file_(0),
#ifndef ROCKSDB_LITE
      wal_manager_(immutable_db_options_, file_options_, seq_per_batch),
#endif  // ROCKSDB_LITE
      event_logger_(immutable_db_options_.info_log.get()),
      bg_work_paused_(0),
      bg_compaction_paused_(0),
      refitting_level_(false),
      opened_successfully_(false),
      two_write_queues_(options.two_write_queues),
      manual_wal_flush_(options.manual_wal_flush),
      // last_sequence_ is always maintained by the main queue that also writes
      // to the memtable. When two_write_queues_ is disabled last seq in
      // memtable is the same as last seq published to the readers. When it is
      // enabled but seq_per_batch_ is disabled, last seq in memtable still
      // indicates last published seq since wal-only writes that go to the 2nd
      // queue do not consume a sequence number. Otherwise writes performed by
      // the 2nd queue could change what is visible to the readers. In this
      // case, last_seq_same_as_publish_seq_==false, the 2nd queue maintains a
      // separate variable to indicate the last published sequence.
      last_seq_same_as_publish_seq_(
          !(seq_per_batch && options.two_write_queues)),
      // Since seq_per_batch_ is currently set only by WritePreparedTxn which
      // requires a custom gc for compaction, we use that to set use_custom_gc_
      // as well.
      use_custom_gc_(seq_per_batch),
      shutdown_initiated_(false),
      // We own the SstFileManager only if the caller did not provide one.
      own_sfm_(options.sst_file_manager == nullptr),
      preserve_deletes_(options.preserve_deletes),
      closed_(false),
      error_handler_(this, immutable_db_options_, &mutex_),
      atomic_flush_install_cv_(&mutex_) {
  // !batch_per_txn_ implies seq_per_batch_ because it is only unset for
  // WriteUnprepared, which should use seq_per_batch_.
  assert(batch_per_txn_ || seq_per_batch_);
  env_->GetAbsolutePath(dbname, &db_absolute_path_);

  // Reserve ten files or so for other uses and give the rest to TableCache.
  // Give a large number for setting of "infinite" open files.
  const int table_cache_size = (mutable_db_options_.max_open_files == -1)
                                   ? TableCache::kInfiniteCapacity
                                   : mutable_db_options_.max_open_files - 10;
  LRUCacheOptions co;
  co.capacity = table_cache_size;
  co.num_shard_bits = immutable_db_options_.table_cache_numshardbits;
  co.metadata_charge_policy = kDontChargeCacheMetadata;
  table_cache_ = NewLRUCache(co);

  // The VersionSet shares the table cache, write buffer manager and write
  // controller created/initialized above.
  versions_.reset(new VersionSet(dbname_, &immutable_db_options_, file_options_,
                                 table_cache_.get(), write_buffer_manager_,
                                 &write_controller_, &block_cache_tracer_));
  column_family_memtables_.reset(
      new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));

  // Record build, file and option information in the info log.
  DumpRocksDBBuildVersion(immutable_db_options_.info_log.get());
  DumpDBFileSummary(immutable_db_options_, dbname_);
  immutable_db_options_.Dump(immutable_db_options_.info_log.get());
  mutable_db_options_.Dump(immutable_db_options_.info_log.get());
  DumpSupportInfo(immutable_db_options_.info_log.get());

  // always open the DB with 0 here, which means if preserve_deletes_==true
  // we won't drop any deletion markers until SetPreserveDeletesSequenceNumber()
  // is called by client and this seqnum is advanced.
  preserve_deletes_seqnum_.store(0);
}
263 
Resume()264 Status DBImpl::Resume() {
265   ROCKS_LOG_INFO(immutable_db_options_.info_log, "Resuming DB");
266 
267   InstrumentedMutexLock db_mutex(&mutex_);
268 
269   if (!error_handler_.IsDBStopped() && !error_handler_.IsBGWorkStopped()) {
270     // Nothing to do
271     return Status::OK();
272   }
273 
274   if (error_handler_.IsRecoveryInProgress()) {
275     // Don't allow a mix of manual and automatic recovery
276     return Status::Busy();
277   }
278 
279   mutex_.Unlock();
280   Status s = error_handler_.RecoverFromBGError(true);
281   mutex_.Lock();
282   return s;
283 }
284 
// This function implements the guts of recovery from a background error. It
// is eventually called for both manual as well as automatic recovery. It does
// the following -
// 1. Wait for currently scheduled background flush/compaction to exit, in
//    order to avoid inadvertently causing an error and thinking recovery
//    failed
// 2. Flush memtables if there's any data for all the CFs. This may result in
//    another error, which will be saved by error_handler_ and reported later
//    as the recovery status
// 3. Find and delete any obsolete files
// 4. Schedule compactions if needed for all the CFs. This is needed as the
//    flush in the prior step might have been a no-op for some CFs, which
//    means a new super version wouldn't have been installed
//
// REQUIRES: mutex_ is held on entry (asserted). The mutex is released and
// re-acquired several times inside, and is held again on return.
//
// Returns ShutdownInProgress if shutdown was initiated, the background error
// itself if its severity is above kHardError, the flush failure status if a
// flush failed, or otherwise the result of clearing the background error.
Status DBImpl::ResumeImpl() {
  mutex_.AssertHeld();
  WaitForBackgroundWork();

  Status bg_error = error_handler_.GetBGError();
  Status s;
  if (shutdown_initiated_) {
    // Returning shutdown status to SFM during auto recovery will cause it
    // to abort the recovery and allow the shutdown to progress
    s = Status::ShutdownInProgress();
  }
  if (s.ok() && bg_error.severity() > Status::Severity::kHardError) {
    ROCKS_LOG_INFO(
        immutable_db_options_.info_log,
        "DB resume requested but failed due to Fatal/Unrecoverable error");
    s = bg_error;
  }

  // We cannot guarantee consistency of the WAL. So force flush Memtables of
  // all the column families
  if (s.ok()) {
    FlushOptions flush_opts;
    // We allow flush to stall write since we are trying to resume from error.
    flush_opts.allow_write_stall = true;
    if (immutable_db_options_.atomic_flush) {
      autovector<ColumnFamilyData*> cfds;
      SelectColumnFamiliesForAtomicFlush(&cfds);
      // The flush itself runs without the DB mutex.
      mutex_.Unlock();
      s = AtomicFlushMemTables(cfds, flush_opts, FlushReason::kErrorRecovery);
      mutex_.Lock();
    } else {
      for (auto cfd : *versions_->GetColumnFamilySet()) {
        if (cfd->IsDropped()) {
          continue;
        }
        // Hold a reference on the column family while the mutex is released
        // for the flush.
        cfd->Ref();
        mutex_.Unlock();
        s = FlushMemTable(cfd, flush_opts, FlushReason::kErrorRecovery);
        mutex_.Lock();
        cfd->UnrefAndTryDelete();
        if (!s.ok()) {
          // Stop at the first failing column family.
          break;
        }
      }
    }
    if (!s.ok()) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "DB resume requested but failed due to Flush failure [%s]",
                     s.ToString().c_str());
    }
  }

  // Obsolete files are collected regardless of whether the steps above
  // succeeded; the background error is cleared only on success.
  JobContext job_context(0);
  FindObsoleteFiles(&job_context, true);
  if (s.ok()) {
    s = error_handler_.ClearBGError();
  }
  mutex_.Unlock();

  // Purging runs without the DB mutex.
  job_context.manifest_file_number = 1;
  if (job_context.HaveSomethingToDelete()) {
    PurgeObsoleteFiles(job_context);
  }
  job_context.Clean();

  if (s.ok()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successfully resumed DB");
  }
  mutex_.Lock();
  // Check for shutdown again before scheduling further compactions,
  // since we released and re-acquired the lock above
  if (shutdown_initiated_) {
    s = Status::ShutdownInProgress();
  }
  if (s.ok()) {
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      SchedulePendingCompaction(cfd);
    }
    MaybeScheduleFlushOrCompaction();
  }

  // Wake up any waiters - in this case, it could be the shutdown thread
  bg_cv_.SignalAll();

  // No need to check BGError again. If something happened, event listener would
  // be notified and the operation causing it would have failed
  return s;
}
385 
WaitForBackgroundWork()386 void DBImpl::WaitForBackgroundWork() {
387   // Wait for background work to finish
388   while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
389          bg_flush_scheduled_) {
390     bg_cv_.Wait();
391   }
392 }
393 
394 // Will lock the mutex_,  will wait for completion if wait is true
CancelAllBackgroundWork(bool wait)395 void DBImpl::CancelAllBackgroundWork(bool wait) {
396   ROCKS_LOG_INFO(immutable_db_options_.info_log,
397                  "Shutdown: canceling all background work");
398 
399   if (thread_dump_stats_ != nullptr) {
400     thread_dump_stats_->cancel();
401     thread_dump_stats_.reset();
402   }
403   if (thread_persist_stats_ != nullptr) {
404     thread_persist_stats_->cancel();
405     thread_persist_stats_.reset();
406   }
407   InstrumentedMutexLock l(&mutex_);
408   if (!shutting_down_.load(std::memory_order_acquire) &&
409       has_unpersisted_data_.load(std::memory_order_relaxed) &&
410       !mutable_db_options_.avoid_flush_during_shutdown) {
411     if (immutable_db_options_.atomic_flush) {
412       autovector<ColumnFamilyData*> cfds;
413       SelectColumnFamiliesForAtomicFlush(&cfds);
414       mutex_.Unlock();
415       AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown);
416       mutex_.Lock();
417     } else {
418       for (auto cfd : *versions_->GetColumnFamilySet()) {
419         if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
420           cfd->Ref();
421           mutex_.Unlock();
422           FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown);
423           mutex_.Lock();
424           cfd->UnrefAndTryDelete();
425         }
426       }
427     }
428     versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
429   }
430 
431   shutting_down_.store(true, std::memory_order_release);
432   bg_cv_.SignalAll();
433   if (!wait) {
434     return;
435   }
436   WaitForBackgroundWork();
437 }
438 
// Performs the actual shutdown of the DB; called from both CloseImpl() and
// the destructor.
//
// In order: cancels error recovery and waits for it to stop, flags shutdown
// and unschedules queued background jobs, waits for running background work,
// drains the flush/compaction queues, deletes the default and persistent-stats
// column family handles, purges obsolete files (only if the DB was opened
// successfully), closes WAL writers, empties the table cache of unreferenced
// entries, destroys the VersionSet, releases the DB lock file, and finally
// closes the SstFileManager and info log when they are owned by this DB.
//
// Returns the first error seen while closing WAL writers, or else the status
// of closing the info log (when owned). An Aborted status is converted to
// Incomplete before being returned.
Status DBImpl::CloseHelper() {
  // Guarantee that there is no background error recovery in progress before
  // continuing with the shutdown
  mutex_.Lock();
  shutdown_initiated_ = true;
  error_handler_.CancelErrorRecovery();
  while (error_handler_.IsRecoveryInProgress()) {
    bg_cv_.Wait();
  }
  mutex_.Unlock();

  // CancelAllBackgroundWork called with false means we just set the shutdown
  // marker. After this we do a variant of the waiting and unschedule work
  // (to consider: moving all the waiting into CancelAllBackgroundWork(true))
  CancelAllBackgroundWork(false);
  int bottom_compactions_unscheduled =
      env_->UnSchedule(this, Env::Priority::BOTTOM);
  int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW);
  int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH);
  Status ret;
  mutex_.Lock();
  // Jobs removed from the thread pools will never run, so they must not be
  // counted as scheduled in the wait loop below.
  bg_bottom_compaction_scheduled_ -= bottom_compactions_unscheduled;
  bg_compaction_scheduled_ -= compactions_unscheduled;
  bg_flush_scheduled_ -= flushes_unscheduled;

  // Wait for background work to finish
  while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
         bg_flush_scheduled_ || bg_purge_scheduled_ ||
         pending_purge_obsolete_files_ ||
         error_handler_.IsRecoveryInProgress()) {
    TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob");
    bg_cv_.Wait();
  }
  TEST_SYNC_POINT_CALLBACK("DBImpl::CloseHelper:PendingPurgeFinished",
                           &files_grabbed_for_purge_);
  EraseThreadStatusDbInfo();
  flush_scheduler_.Clear();
  trim_history_scheduler_.Clear();

  // Drop the column family references held by requests still sitting in the
  // flush and compaction queues.
  while (!flush_queue_.empty()) {
    const FlushRequest& flush_req = PopFirstFromFlushQueue();
    for (const auto& iter : flush_req) {
      iter.first->UnrefAndTryDelete();
    }
  }
  while (!compaction_queue_.empty()) {
    auto cfd = PopFirstFromCompactionQueue();
    cfd->UnrefAndTryDelete();
  }

  if (default_cf_handle_ != nullptr || persist_stats_cf_handle_ != nullptr) {
    // we need to delete handle outside of lock because it does its own locking
    mutex_.Unlock();
    if (default_cf_handle_) {
      delete default_cf_handle_;
      default_cf_handle_ = nullptr;
    }
    if (persist_stats_cf_handle_) {
      delete persist_stats_cf_handle_;
      persist_stats_cf_handle_ = nullptr;
    }
    mutex_.Lock();
  }

  // Clean up obsolete files due to SuperVersion release.
  // (1) Need to delete to obsolete files before closing because RepairDB()
  // scans all existing files in the file system and builds manifest file.
  // Keeping obsolete files confuses the repair process.
  // (2) Need to check if we Open()/Recover() the DB successfully before
  // deleting because if VersionSet recover fails (may be due to corrupted
  // manifest file), it is not able to identify live files correctly. As a
  // result, all "live" files can get deleted by accident. However, corrupted
  // manifest is recoverable by RepairDB().
  if (opened_successfully_) {
    JobContext job_context(next_job_id_.fetch_add(1));
    FindObsoleteFiles(&job_context, true);

    // Purging runs without the DB mutex.
    mutex_.Unlock();
    // manifest number starting from 2
    job_context.manifest_file_number = 1;
    if (job_context.HaveSomethingToDelete()) {
      PurgeObsoleteFiles(job_context);
    }
    job_context.Clean();
    mutex_.Lock();
  }

  for (auto l : logs_to_free_) {
    delete l;
  }
  for (auto& log : logs_) {
    // Read the log number before ClearWriter() so it is still available for
    // the warning message below.
    uint64_t log_number = log.writer->get_log_number();
    Status s = log.ClearWriter();
    if (!s.ok()) {
      ROCKS_LOG_WARN(
          immutable_db_options_.info_log,
          "Unable to Sync WAL file %s with error -- %s",
          LogFileName(immutable_db_options_.wal_dir, log_number).c_str(),
          s.ToString().c_str());
      // Retain the first error
      if (ret.ok()) {
        ret = s;
      }
    }
  }
  logs_.clear();

  // Table cache may have table handles holding blocks from the block cache.
  // We need to release them before the block cache is destroyed. The block
  // cache may be destroyed inside versions_.reset(), when column family data
  // list is destroyed, so leaving handles in table cache after
  // versions_.reset() may cause issues.
  // Here we clean all unreferenced handles in table cache.
  // Now we assume all user queries have finished, so only version set itself
  // can possibly hold the blocks from block cache. After releasing unreferenced
  // handles here, only handles held by version set left and inside
  // versions_.reset(), we will release them. There, we need to make sure every
  // time a handle is released, we erase it from the cache too. By doing that,
  // we can guarantee that after versions_.reset(), table cache is empty
  // so the cache can be safely destroyed.
  table_cache_->EraseUnRefEntries();

  for (auto& txn_entry : recovered_transactions_) {
    delete txn_entry.second;
  }

  // versions need to be destroyed before table_cache since it can hold
  // references to table_cache.
  versions_.reset();
  mutex_.Unlock();
  if (db_lock_ != nullptr) {
    env_->UnlockFile(db_lock_);
  }

  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown complete");
  LogFlush(immutable_db_options_.info_log);

#ifndef ROCKSDB_LITE
  // If the sst_file_manager was allocated by us during DB::Open(), call
  // Close() on it before closing the info_log. Otherwise, background thread
  // in SstFileManagerImpl might try to log something
  if (immutable_db_options_.sst_file_manager && own_sfm_) {
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    sfm->Close();
  }
#endif  // ROCKSDB_LITE

  if (immutable_db_options_.info_log && own_info_log_) {
    Status s = immutable_db_options_.info_log->Close();
    // An earlier WAL error takes precedence over the info log close status.
    if (ret.ok()) {
      ret = s;
    }
  }

  if (ret.IsAborted()) {
    // Reserve IsAborted() error for those where users didn't release
    // certain resource and they can release them and come back and
    // retry. In this case, we wrap this exception to something else.
    return Status::Incomplete(ret.ToString());
  }
  return ret;
}
602 
CloseImpl()603 Status DBImpl::CloseImpl() { return CloseHelper(); }
604 
~DBImpl()605 DBImpl::~DBImpl() {
606   if (!closed_) {
607     closed_ = true;
608     CloseHelper();
609   }
610 }
611 
MaybeIgnoreError(Status * s) const612 void DBImpl::MaybeIgnoreError(Status* s) const {
613   if (s->ok() || immutable_db_options_.paranoid_checks) {
614     // No change needed
615   } else {
616     ROCKS_LOG_WARN(immutable_db_options_.info_log, "Ignoring error %s",
617                    s->ToString().c_str());
618     *s = Status::OK();
619   }
620 }
621 
CreateArchivalDirectory()622 const Status DBImpl::CreateArchivalDirectory() {
623   if (immutable_db_options_.wal_ttl_seconds > 0 ||
624       immutable_db_options_.wal_size_limit_mb > 0) {
625     std::string archivalPath = ArchivalDirectory(immutable_db_options_.wal_dir);
626     return env_->CreateDirIfMissing(archivalPath);
627   }
628   return Status::OK();
629 }
630 
PrintStatistics()631 void DBImpl::PrintStatistics() {
632   auto dbstats = immutable_db_options_.statistics.get();
633   if (dbstats) {
634     ROCKS_LOG_INFO(immutable_db_options_.info_log, "STATISTICS:\n %s",
635                    dbstats->ToString().c_str());
636   }
637 }
638 
StartTimedTasks()639 void DBImpl::StartTimedTasks() {
640   unsigned int stats_dump_period_sec = 0;
641   unsigned int stats_persist_period_sec = 0;
642   {
643     InstrumentedMutexLock l(&mutex_);
644     stats_dump_period_sec = mutable_db_options_.stats_dump_period_sec;
645     if (stats_dump_period_sec > 0) {
646       if (!thread_dump_stats_) {
647         thread_dump_stats_.reset(new rocksdb::RepeatableThread(
648             [this]() { DBImpl::DumpStats(); }, "dump_st", env_,
649             static_cast<uint64_t>(stats_dump_period_sec) * kMicrosInSecond));
650       }
651     }
652     stats_persist_period_sec = mutable_db_options_.stats_persist_period_sec;
653     if (stats_persist_period_sec > 0) {
654       if (!thread_persist_stats_) {
655         thread_persist_stats_.reset(new rocksdb::RepeatableThread(
656             [this]() { DBImpl::PersistStats(); }, "pst_st", env_,
657             static_cast<uint64_t>(stats_persist_period_sec) * kMicrosInSecond));
658       }
659     }
660   }
661 }
662 
663 // esitmate the total size of stats_history_
EstimateInMemoryStatsHistorySize() const664 size_t DBImpl::EstimateInMemoryStatsHistorySize() const {
665   size_t size_total =
666       sizeof(std::map<uint64_t, std::map<std::string, uint64_t>>);
667   if (stats_history_.size() == 0) return size_total;
668   size_t size_per_slice =
669       sizeof(uint64_t) + sizeof(std::map<std::string, uint64_t>);
670   // non-empty map, stats_history_.begin() guaranteed to exist
671   std::map<std::string, uint64_t> sample_slice(stats_history_.begin()->second);
672   for (const auto& pairs : sample_slice) {
673     size_per_slice +=
674         pairs.first.capacity() + sizeof(pairs.first) + sizeof(pairs.second);
675   }
676   size_total = size_per_slice * stats_history_.size();
677   return size_total;
678 }
679 
PersistStats()680 void DBImpl::PersistStats() {
681   TEST_SYNC_POINT("DBImpl::PersistStats:Entry");
682 #ifndef ROCKSDB_LITE
683   if (shutdown_initiated_) {
684     return;
685   }
686   uint64_t now_seconds = env_->NowMicros() / kMicrosInSecond;
687   Statistics* statistics = immutable_db_options_.statistics.get();
688   if (!statistics) {
689     return;
690   }
691   size_t stats_history_size_limit = 0;
692   {
693     InstrumentedMutexLock l(&mutex_);
694     stats_history_size_limit = mutable_db_options_.stats_history_buffer_size;
695   }
696 
697   std::map<std::string, uint64_t> stats_map;
698   if (!statistics->getTickerMap(&stats_map)) {
699     return;
700   }
701   ROCKS_LOG_INFO(immutable_db_options_.info_log,
702                  "------- PERSISTING STATS -------");
703 
704   if (immutable_db_options_.persist_stats_to_disk) {
705     WriteBatch batch;
706     if (stats_slice_initialized_) {
707       ROCKS_LOG_INFO(immutable_db_options_.info_log,
708                      "Reading %" ROCKSDB_PRIszt " stats from statistics\n",
709                      stats_slice_.size());
710       for (const auto& stat : stats_map) {
711         char key[100];
712         int length =
713             EncodePersistentStatsKey(now_seconds, stat.first, 100, key);
714         // calculate the delta from last time
715         if (stats_slice_.find(stat.first) != stats_slice_.end()) {
716           uint64_t delta = stat.second - stats_slice_[stat.first];
717           batch.Put(persist_stats_cf_handle_, Slice(key, std::min(100, length)),
718                     ToString(delta));
719         }
720       }
721     }
722     stats_slice_initialized_ = true;
723     std::swap(stats_slice_, stats_map);
724     WriteOptions wo;
725     wo.low_pri = true;
726     wo.no_slowdown = true;
727     wo.sync = false;
728     Status s = Write(wo, &batch);
729     if (!s.ok()) {
730       ROCKS_LOG_INFO(immutable_db_options_.info_log,
731                      "Writing to persistent stats CF failed -- %s",
732                      s.ToString().c_str());
733     } else {
734       ROCKS_LOG_INFO(immutable_db_options_.info_log,
735                      "Writing %" ROCKSDB_PRIszt " stats with timestamp %" PRIu64
736                      " to persistent stats CF succeeded",
737                      stats_slice_.size(), now_seconds);
738     }
739     // TODO(Zhongyi): add purging for persisted data
740   } else {
741     InstrumentedMutexLock l(&stats_history_mutex_);
742     // calculate the delta from last time
743     if (stats_slice_initialized_) {
744       std::map<std::string, uint64_t> stats_delta;
745       for (const auto& stat : stats_map) {
746         if (stats_slice_.find(stat.first) != stats_slice_.end()) {
747           stats_delta[stat.first] = stat.second - stats_slice_[stat.first];
748         }
749       }
750       ROCKS_LOG_INFO(immutable_db_options_.info_log,
751                      "Storing %" ROCKSDB_PRIszt " stats with timestamp %" PRIu64
752                      " to in-memory stats history",
753                      stats_slice_.size(), now_seconds);
754       stats_history_[now_seconds] = stats_delta;
755     }
756     stats_slice_initialized_ = true;
757     std::swap(stats_slice_, stats_map);
758     TEST_SYNC_POINT("DBImpl::PersistStats:StatsCopied");
759 
760     // delete older stats snapshots to control memory consumption
761     size_t stats_history_size = EstimateInMemoryStatsHistorySize();
762     bool purge_needed = stats_history_size > stats_history_size_limit;
763     ROCKS_LOG_INFO(immutable_db_options_.info_log,
764                    "[Pre-GC] In-memory stats history size: %" ROCKSDB_PRIszt
765                    " bytes, slice count: %" ROCKSDB_PRIszt,
766                    stats_history_size, stats_history_.size());
767     while (purge_needed && !stats_history_.empty()) {
768       stats_history_.erase(stats_history_.begin());
769       purge_needed =
770           EstimateInMemoryStatsHistorySize() > stats_history_size_limit;
771     }
772     ROCKS_LOG_INFO(immutable_db_options_.info_log,
773                    "[Post-GC] In-memory stats history size: %" ROCKSDB_PRIszt
774                    " bytes, slice count: %" ROCKSDB_PRIszt,
775                    stats_history_size, stats_history_.size());
776   }
777 #endif  // !ROCKSDB_LITE
778 }
779 
FindStatsByTime(uint64_t start_time,uint64_t end_time,uint64_t * new_time,std::map<std::string,uint64_t> * stats_map)780 bool DBImpl::FindStatsByTime(uint64_t start_time, uint64_t end_time,
781                              uint64_t* new_time,
782                              std::map<std::string, uint64_t>* stats_map) {
783   assert(new_time);
784   assert(stats_map);
785   if (!new_time || !stats_map) return false;
786   // lock when search for start_time
787   {
788     InstrumentedMutexLock l(&stats_history_mutex_);
789     auto it = stats_history_.lower_bound(start_time);
790     if (it != stats_history_.end() && it->first < end_time) {
791       // make a copy for timestamp and stats_map
792       *new_time = it->first;
793       *stats_map = it->second;
794       return true;
795     } else {
796       return false;
797     }
798   }
799 }
800 
GetStatsHistory(uint64_t start_time,uint64_t end_time,std::unique_ptr<StatsHistoryIterator> * stats_iterator)801 Status DBImpl::GetStatsHistory(
802     uint64_t start_time, uint64_t end_time,
803     std::unique_ptr<StatsHistoryIterator>* stats_iterator) {
804   if (!stats_iterator) {
805     return Status::InvalidArgument("stats_iterator not preallocated.");
806   }
807   if (immutable_db_options_.persist_stats_to_disk) {
808     stats_iterator->reset(
809         new PersistentStatsHistoryIterator(start_time, end_time, this));
810   } else {
811     stats_iterator->reset(
812         new InMemoryStatsHistoryIterator(start_time, end_time, this));
813   }
814   return (*stats_iterator)->status();
815 }
816 
DumpStats()817 void DBImpl::DumpStats() {
818   TEST_SYNC_POINT("DBImpl::DumpStats:1");
819 #ifndef ROCKSDB_LITE
820   const DBPropertyInfo* cf_property_info =
821       GetPropertyInfo(DB::Properties::kCFStats);
822   assert(cf_property_info != nullptr);
823   const DBPropertyInfo* db_property_info =
824       GetPropertyInfo(DB::Properties::kDBStats);
825   assert(db_property_info != nullptr);
826 
827   std::string stats;
828   if (shutdown_initiated_) {
829     return;
830   }
831   {
832     InstrumentedMutexLock l(&mutex_);
833     default_cf_internal_stats_->GetStringProperty(
834         *db_property_info, DB::Properties::kDBStats, &stats);
835     for (auto cfd : *versions_->GetColumnFamilySet()) {
836       if (cfd->initialized()) {
837         cfd->internal_stats()->GetStringProperty(
838             *cf_property_info, DB::Properties::kCFStatsNoFileHistogram, &stats);
839       }
840     }
841     for (auto cfd : *versions_->GetColumnFamilySet()) {
842       if (cfd->initialized()) {
843         cfd->internal_stats()->GetStringProperty(
844             *cf_property_info, DB::Properties::kCFFileHistogram, &stats);
845       }
846     }
847   }
848   TEST_SYNC_POINT("DBImpl::DumpStats:2");
849   ROCKS_LOG_INFO(immutable_db_options_.info_log,
850                  "------- DUMPING STATS -------");
851   ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", stats.c_str());
852   if (immutable_db_options_.dump_malloc_stats) {
853     stats.clear();
854     DumpMallocStats(&stats);
855     if (!stats.empty()) {
856       ROCKS_LOG_INFO(immutable_db_options_.info_log,
857                      "------- Malloc STATS -------");
858       ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", stats.c_str());
859     }
860   }
861 #endif  // !ROCKSDB_LITE
862 
863   PrintStatistics();
864 }
865 
TablesRangeTombstoneSummary(ColumnFamilyHandle * column_family,int max_entries_to_print,std::string * out_str)866 Status DBImpl::TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family,
867                                            int max_entries_to_print,
868                                            std::string* out_str) {
869   auto* cfh =
870       static_cast_with_check<ColumnFamilyHandleImpl, ColumnFamilyHandle>(
871           column_family);
872   ColumnFamilyData* cfd = cfh->cfd();
873 
874   SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
875   Version* version = super_version->current;
876 
877   Status s =
878       version->TablesRangeTombstoneSummary(max_entries_to_print, out_str);
879 
880   CleanupSuperVersion(super_version);
881   return s;
882 }
883 
ScheduleBgLogWriterClose(JobContext * job_context)884 void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
885   if (!job_context->logs_to_free.empty()) {
886     for (auto l : job_context->logs_to_free) {
887       AddToLogsToFreeQueue(l);
888     }
889     job_context->logs_to_free.clear();
890   }
891 }
892 
GetDataDir(ColumnFamilyData * cfd,size_t path_id) const893 Directory* DBImpl::GetDataDir(ColumnFamilyData* cfd, size_t path_id) const {
894   assert(cfd);
895   Directory* ret_dir = cfd->GetDataDir(path_id);
896   if (ret_dir == nullptr) {
897     return directories_.GetDataDir(path_id);
898   }
899   return ret_dir;
900 }
901 
SetOptions(ColumnFamilyHandle * column_family,const std::unordered_map<std::string,std::string> & options_map)902 Status DBImpl::SetOptions(
903     ColumnFamilyHandle* column_family,
904     const std::unordered_map<std::string, std::string>& options_map) {
905 #ifdef ROCKSDB_LITE
906   (void)column_family;
907   (void)options_map;
908   return Status::NotSupported("Not supported in ROCKSDB LITE");
909 #else
910   auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
911   if (options_map.empty()) {
912     ROCKS_LOG_WARN(immutable_db_options_.info_log,
913                    "SetOptions() on column family [%s], empty input",
914                    cfd->GetName().c_str());
915     return Status::InvalidArgument("empty input");
916   }
917 
918   MutableCFOptions new_options;
919   Status s;
920   Status persist_options_status;
921   SuperVersionContext sv_context(/* create_superversion */ true);
922   {
923     auto db_options = GetDBOptions();
924     InstrumentedMutexLock l(&mutex_);
925     s = cfd->SetOptions(db_options, options_map);
926     if (s.ok()) {
927       new_options = *cfd->GetLatestMutableCFOptions();
928       // Append new version to recompute compaction score.
929       VersionEdit dummy_edit;
930       versions_->LogAndApply(cfd, new_options, &dummy_edit, &mutex_,
931                              directories_.GetDbDir());
932       // Trigger possible flush/compactions. This has to be before we persist
933       // options to file, otherwise there will be a deadlock with writer
934       // thread.
935       InstallSuperVersionAndScheduleWork(cfd, &sv_context, new_options);
936 
937       persist_options_status = WriteOptionsFile(
938           false /*need_mutex_lock*/, true /*need_enter_write_thread*/);
939       bg_cv_.SignalAll();
940     }
941   }
942   sv_context.Clean();
943 
944   ROCKS_LOG_INFO(
945       immutable_db_options_.info_log,
946       "SetOptions() on column family [%s], inputs:", cfd->GetName().c_str());
947   for (const auto& o : options_map) {
948     ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n", o.first.c_str(),
949                    o.second.c_str());
950   }
951   if (s.ok()) {
952     ROCKS_LOG_INFO(immutable_db_options_.info_log,
953                    "[%s] SetOptions() succeeded", cfd->GetName().c_str());
954     new_options.Dump(immutable_db_options_.info_log.get());
955     if (!persist_options_status.ok()) {
956       s = persist_options_status;
957     }
958   } else {
959     ROCKS_LOG_WARN(immutable_db_options_.info_log, "[%s] SetOptions() failed",
960                    cfd->GetName().c_str());
961   }
962   LogFlush(immutable_db_options_.info_log);
963   return s;
964 #endif  // ROCKSDB_LITE
965 }
966 
// Applies the DB-wide option changes in `options_map`.
//
// Returns NotSupported in ROCKSDB_LITE builds and InvalidArgument when
// `options_map` is empty. Otherwise the new values are parsed on top of the
// current mutable_db_options_, validated on their own and against the options
// of every non-dropped column family, and, if everything checks out, applied
// under mutex_: background thread pools are grown, the stats dump/persist
// threads are restarted if their period changed, write-rate and table-cache
// limits are updated, compaction file options are rebuilt, the WAL is switched
// when required, and the options file is rewritten.
//
// A failure to persist the options file only turns into an error return when
// fail_if_options_file_error is set; otherwise it is just logged.
Status DBImpl::SetDBOptions(
    const std::unordered_map<std::string, std::string>& options_map) {
#ifdef ROCKSDB_LITE
  (void)options_map;
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  if (options_map.empty()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "SetDBOptions(), empty input.");
    return Status::InvalidArgument("empty input");
  }

  MutableDBOptions new_options;
  Status s;
  Status persist_options_status;
  bool wal_changed = false;
  WriteContext write_context;
  {
    InstrumentedMutexLock l(&mutex_);
    s = GetMutableDBOptionsFromStrings(mutable_db_options_, options_map,
                                       &new_options);
    // A bytes_per_sync of 0 is replaced with 1MB (done even if parsing
    // failed; new_options is only applied when s is OK).
    if (new_options.bytes_per_sync == 0) {
      new_options.bytes_per_sync = 1024 * 1024;
    }
    DBOptions new_db_options =
        BuildDBOptions(immutable_db_options_, new_options);
    if (s.ok()) {
      s = ValidateOptions(new_db_options);
    }
    if (s.ok()) {
      // The new DB options must be compatible with every live column family;
      // stop at the first one that rejects them.
      for (auto c : *versions_->GetColumnFamilySet()) {
        if (!c->IsDropped()) {
          auto cf_options = c->GetLatestCFOptions();
          s = ColumnFamilyData::ValidateOptions(new_db_options, cf_options);
          if (!s.ok()) {
            break;
          }
        }
      }
    }
    if (s.ok()) {
      // Compare background job limits before and after the change.
      const BGJobLimits current_bg_job_limits =
          GetBGJobLimits(immutable_db_options_.max_background_flushes,
                         mutable_db_options_.max_background_compactions,
                         mutable_db_options_.max_background_jobs,
                         /* parallelize_compactions */ true);
      const BGJobLimits new_bg_job_limits = GetBGJobLimits(
          immutable_db_options_.max_background_flushes,
          new_options.max_background_compactions,
          new_options.max_background_jobs, /* parallelize_compactions */ true);

      const bool max_flushes_increased =
          new_bg_job_limits.max_flushes > current_bg_job_limits.max_flushes;
      const bool max_compactions_increased =
          new_bg_job_limits.max_compactions >
          current_bg_job_limits.max_compactions;

      // Only growth is acted upon here: raise the HIGH (flush) / LOW
      // (compaction) pool sizes and try to schedule more work right away.
      if (max_flushes_increased || max_compactions_increased) {
        if (max_flushes_increased) {
          env_->IncBackgroundThreadsIfNeeded(new_bg_job_limits.max_flushes,
                                             Env::Priority::HIGH);
        }

        if (max_compactions_increased) {
          env_->IncBackgroundThreadsIfNeeded(new_bg_job_limits.max_compactions,
                                             Env::Priority::LOW);
        }

        MaybeScheduleFlushOrCompaction();
      }

      // Restart (or stop, when the new period is 0) the periodic stats dump
      // thread if its period changed.
      if (new_options.stats_dump_period_sec !=
          mutable_db_options_.stats_dump_period_sec) {
        if (thread_dump_stats_) {
          // mutex_ is dropped around cancel().
          // NOTE(review): presumably because cancel() waits for the worker,
          // and DumpStats() itself acquires mutex_ -- confirm.
          mutex_.Unlock();
          thread_dump_stats_->cancel();
          mutex_.Lock();
        }
        if (new_options.stats_dump_period_sec > 0) {
          thread_dump_stats_.reset(new rocksdb::RepeatableThread(
              [this]() { DBImpl::DumpStats(); }, "dump_st", env_,
              static_cast<uint64_t>(new_options.stats_dump_period_sec) *
                  kMicrosInSecond));
        } else {
          thread_dump_stats_.reset();
        }
      }
      // Same treatment for the periodic stats persistence thread.
      if (new_options.stats_persist_period_sec !=
          mutable_db_options_.stats_persist_period_sec) {
        if (thread_persist_stats_) {
          mutex_.Unlock();
          thread_persist_stats_->cancel();
          mutex_.Lock();
        }
        if (new_options.stats_persist_period_sec > 0) {
          thread_persist_stats_.reset(new rocksdb::RepeatableThread(
              [this]() { DBImpl::PersistStats(); }, "pst_st", env_,
              static_cast<uint64_t>(new_options.stats_persist_period_sec) *
                  kMicrosInSecond));
        } else {
          thread_persist_stats_.reset();
        }
      }
      write_controller_.set_max_delayed_write_rate(
          new_options.delayed_write_rate);
      // max_open_files == -1 means an unbounded table cache; otherwise the
      // capacity is max_open_files minus 10.
      // NOTE(review): the 10 presumably reserves descriptors for non-table
      // files -- confirm.
      table_cache_.get()->SetCapacity(new_options.max_open_files == -1
                                          ? TableCache::kInfiniteCapacity
                                          : new_options.max_open_files - 10);
      // Must be computed before mutable_db_options_ is overwritten below.
      wal_changed = mutable_db_options_.wal_bytes_per_sync !=
                    new_options.wal_bytes_per_sync;
      mutable_db_options_ = new_options;
      // Rebuild the compaction file options from the new DB options.
      file_options_for_compaction_ = FileOptions(new_db_options);
      file_options_for_compaction_ = fs_->OptimizeForCompactionTableWrite(
          file_options_for_compaction_, immutable_db_options_);
      versions_->ChangeFileOptions(mutable_db_options_);
      //TODO(xiez): clarify why apply optimize for read to write options
      file_options_for_compaction_ = fs_->OptimizeForCompactionTableRead(
          file_options_for_compaction_, immutable_db_options_);
      file_options_for_compaction_.compaction_readahead_size =
          mutable_db_options_.compaction_readahead_size;
      // Enter the write thread unbatched for the WAL switch and options-file
      // write (WriteOptionsFile is told not to enter it again).
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      if (total_log_size_ > GetMaxTotalWalSize() || wal_changed) {
        // Failure to switch the WAL is logged but does not fail the call.
        Status purge_wal_status = SwitchWAL(&write_context);
        if (!purge_wal_status.ok()) {
          ROCKS_LOG_WARN(immutable_db_options_.info_log,
                         "Unable to purge WAL files in SetDBOptions() -- %s",
                         purge_wal_status.ToString().c_str());
        }
      }
      persist_options_status = WriteOptionsFile(
          false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
      write_thread_.ExitUnbatched(&w);
    }
  }
  // Logging happens after mutex_ has been released.
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions(), inputs:");
  for (const auto& o : options_map) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n", o.first.c_str(),
                   o.second.c_str());
  }
  if (s.ok()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions() succeeded");
    new_options.Dump(immutable_db_options_.info_log.get());
    if (!persist_options_status.ok()) {
      // The in-memory options were changed; only report the persistence
      // failure as an error if the DB is configured to treat it as one.
      if (immutable_db_options_.fail_if_options_file_error) {
        s = Status::IOError(
            "SetDBOptions() succeeded, but unable to persist options",
            persist_options_status.ToString());
      }
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "Unable to persist options in SetDBOptions() -- %s",
                     persist_options_status.ToString().c_str());
    }
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "SetDBOptions failed");
  }
  LogFlush(immutable_db_options_.info_log);
  return s;
#endif  // ROCKSDB_LITE
}
1127 
1128 // return the same level if it cannot be moved
FindMinimumEmptyLevelFitting(ColumnFamilyData * cfd,const MutableCFOptions &,int level)1129 int DBImpl::FindMinimumEmptyLevelFitting(
1130     ColumnFamilyData* cfd, const MutableCFOptions& /*mutable_cf_options*/,
1131     int level) {
1132   mutex_.AssertHeld();
1133   const auto* vstorage = cfd->current()->storage_info();
1134   int minimum_level = level;
1135   for (int i = level - 1; i > 0; --i) {
1136     // stop if level i is not empty
1137     if (vstorage->NumLevelFiles(i) > 0) break;
1138     // stop if level i is too small (cannot fit the level files)
1139     if (vstorage->MaxBytesForLevel(i) < vstorage->NumLevelBytes(level)) {
1140       break;
1141     }
1142 
1143     minimum_level = i;
1144   }
1145   return minimum_level;
1146 }
1147 
FlushWAL(bool sync)1148 Status DBImpl::FlushWAL(bool sync) {
1149   if (manual_wal_flush_) {
1150     Status s;
1151     {
1152       // We need to lock log_write_mutex_ since logs_ might change concurrently
1153       InstrumentedMutexLock wl(&log_write_mutex_);
1154       log::Writer* cur_log_writer = logs_.back().writer;
1155       s = cur_log_writer->WriteBuffer();
1156     }
1157     if (!s.ok()) {
1158       ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
1159                       s.ToString().c_str());
1160       // In case there is a fs error we should set it globally to prevent the
1161       // future writes
1162       WriteStatusCheck(s);
1163       // whether sync or not, we should abort the rest of function upon error
1164       return s;
1165     }
1166     if (!sync) {
1167       ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=false");
1168       return s;
1169     }
1170   }
1171   if (!sync) {
1172     return Status::OK();
1173   }
1174   // sync = true
1175   ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=true");
1176   return SyncWAL();
1177 }
1178 
SyncWAL()1179 Status DBImpl::SyncWAL() {
1180   autovector<log::Writer*, 1> logs_to_sync;
1181   bool need_log_dir_sync;
1182   uint64_t current_log_number;
1183 
1184   {
1185     InstrumentedMutexLock l(&mutex_);
1186     assert(!logs_.empty());
1187 
1188     // This SyncWAL() call only cares about logs up to this number.
1189     current_log_number = logfile_number_;
1190 
1191     while (logs_.front().number <= current_log_number &&
1192            logs_.front().getting_synced) {
1193       log_sync_cv_.Wait();
1194     }
1195     // First check that logs are safe to sync in background.
1196     for (auto it = logs_.begin();
1197          it != logs_.end() && it->number <= current_log_number; ++it) {
1198       if (!it->writer->file()->writable_file()->IsSyncThreadSafe()) {
1199         return Status::NotSupported(
1200             "SyncWAL() is not supported for this implementation of WAL file",
1201             immutable_db_options_.allow_mmap_writes
1202                 ? "try setting Options::allow_mmap_writes to false"
1203                 : Slice());
1204       }
1205     }
1206     for (auto it = logs_.begin();
1207          it != logs_.end() && it->number <= current_log_number; ++it) {
1208       auto& log = *it;
1209       assert(!log.getting_synced);
1210       log.getting_synced = true;
1211       logs_to_sync.push_back(log.writer);
1212     }
1213 
1214     need_log_dir_sync = !log_dir_synced_;
1215   }
1216 
1217   TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1");
1218   RecordTick(stats_, WAL_FILE_SYNCED);
1219   Status status;
1220   for (log::Writer* log : logs_to_sync) {
1221     status = log->file()->SyncWithoutFlush(immutable_db_options_.use_fsync);
1222     if (!status.ok()) {
1223       break;
1224     }
1225   }
1226   if (status.ok() && need_log_dir_sync) {
1227     status = directories_.GetWalDir()->Fsync();
1228   }
1229   TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");
1230 
1231   TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
1232   {
1233     InstrumentedMutexLock l(&mutex_);
1234     MarkLogsSynced(current_log_number, need_log_dir_sync, status);
1235   }
1236   TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");
1237 
1238   return status;
1239 }
1240 
LockWAL()1241 Status DBImpl::LockWAL() {
1242   log_write_mutex_.Lock();
1243   auto cur_log_writer = logs_.back().writer;
1244   auto status = cur_log_writer->WriteBuffer();
1245   if (!status.ok()) {
1246     ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
1247                     status.ToString().c_str());
1248     // In case there is a fs error we should set it globally to prevent the
1249     // future writes
1250     WriteStatusCheck(status);
1251   }
1252   return status;
1253 }
1254 
UnlockWAL()1255 Status DBImpl::UnlockWAL() {
1256   log_write_mutex_.Unlock();
1257   return Status::OK();
1258 }
1259 
MarkLogsSynced(uint64_t up_to,bool synced_dir,const Status & status)1260 void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
1261                             const Status& status) {
1262   mutex_.AssertHeld();
1263   if (synced_dir && logfile_number_ == up_to && status.ok()) {
1264     log_dir_synced_ = true;
1265   }
1266   for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
1267     auto& log = *it;
1268     assert(log.getting_synced);
1269     if (status.ok() && logs_.size() > 1) {
1270       logs_to_free_.push_back(log.ReleaseWriter());
1271       // To modify logs_ both mutex_ and log_write_mutex_ must be held
1272       InstrumentedMutexLock l(&log_write_mutex_);
1273       it = logs_.erase(it);
1274     } else {
1275       log.getting_synced = false;
1276       ++it;
1277     }
1278   }
1279   assert(!status.ok() || logs_.empty() || logs_[0].number > up_to ||
1280          (logs_.size() == 1 && !logs_[0].getting_synced));
1281   log_sync_cv_.SignalAll();
1282 }
1283 
GetLatestSequenceNumber() const1284 SequenceNumber DBImpl::GetLatestSequenceNumber() const {
1285   return versions_->LastSequence();
1286 }
1287 
SetLastPublishedSequence(SequenceNumber seq)1288 void DBImpl::SetLastPublishedSequence(SequenceNumber seq) {
1289   versions_->SetLastPublishedSequence(seq);
1290 }
1291 
SetPreserveDeletesSequenceNumber(SequenceNumber seqnum)1292 bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) {
1293   if (seqnum > preserve_deletes_seqnum_.load()) {
1294     preserve_deletes_seqnum_.store(seqnum);
1295     return true;
1296   } else {
1297     return false;
1298   }
1299 }
1300 
NewInternalIterator(Arena * arena,RangeDelAggregator * range_del_agg,SequenceNumber sequence,ColumnFamilyHandle * column_family)1301 InternalIterator* DBImpl::NewInternalIterator(
1302     Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence,
1303     ColumnFamilyHandle* column_family) {
1304   ColumnFamilyData* cfd;
1305   if (column_family == nullptr) {
1306     cfd = default_cf_handle_->cfd();
1307   } else {
1308     auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
1309     cfd = cfh->cfd();
1310   }
1311 
1312   mutex_.Lock();
1313   SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
1314   mutex_.Unlock();
1315   ReadOptions roptions;
1316   return NewInternalIterator(roptions, cfd, super_version, arena, range_del_agg,
1317                              sequence);
1318 }
1319 
SchedulePurge()1320 void DBImpl::SchedulePurge() {
1321   mutex_.AssertHeld();
1322   assert(opened_successfully_);
1323 
1324   // Purge operations are put into High priority queue
1325   bg_purge_scheduled_++;
1326   env_->Schedule(&DBImpl::BGWorkPurge, this, Env::Priority::HIGH, nullptr);
1327 }
1328 
BackgroundCallPurge()1329 void DBImpl::BackgroundCallPurge() {
1330   mutex_.Lock();
1331 
1332   while (!logs_to_free_queue_.empty()) {
1333     assert(!logs_to_free_queue_.empty());
1334     log::Writer* log_writer = *(logs_to_free_queue_.begin());
1335     logs_to_free_queue_.pop_front();
1336     mutex_.Unlock();
1337     delete log_writer;
1338     mutex_.Lock();
1339   }
1340   while (!superversions_to_free_queue_.empty()) {
1341     assert(!superversions_to_free_queue_.empty());
1342     SuperVersion* sv = superversions_to_free_queue_.front();
1343     superversions_to_free_queue_.pop_front();
1344     mutex_.Unlock();
1345     delete sv;
1346     mutex_.Lock();
1347   }
1348 
1349   // Can't use iterator to go over purge_files_ because inside the loop we're
1350   // unlocking the mutex that protects purge_files_.
1351   while (!purge_files_.empty()) {
1352     auto it = purge_files_.begin();
1353     // Need to make a copy of the PurgeFilesInfo before unlocking the mutex.
1354     PurgeFileInfo purge_file = it->second;
1355 
1356     const std::string& fname = purge_file.fname;
1357     const std::string& dir_to_sync = purge_file.dir_to_sync;
1358     FileType type = purge_file.type;
1359     uint64_t number = purge_file.number;
1360     int job_id = purge_file.job_id;
1361 
1362     purge_files_.erase(it);
1363 
1364     mutex_.Unlock();
1365     DeleteObsoleteFileImpl(job_id, fname, dir_to_sync, type, number);
1366     mutex_.Lock();
1367   }
1368 
1369   bg_purge_scheduled_--;
1370 
1371   bg_cv_.SignalAll();
1372   // IMPORTANT:there should be no code after calling SignalAll. This call may
1373   // signal the DB destructor that it's OK to proceed with destruction. In
1374   // that case, all DB variables will be dealloacated and referencing them
1375   // will cause trouble.
1376   mutex_.Unlock();
1377 }
1378 
1379 namespace {
1380 struct IterState {
IterStaterocksdb::__anon86dd08d00411::IterState1381   IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version,
1382             bool _background_purge)
1383       : db(_db),
1384         mu(_mu),
1385         super_version(_super_version),
1386         background_purge(_background_purge) {}
1387 
1388   DBImpl* db;
1389   InstrumentedMutex* mu;
1390   SuperVersion* super_version;
1391   bool background_purge;
1392 };
1393 
CleanupIteratorState(void * arg1,void *)1394 static void CleanupIteratorState(void* arg1, void* /*arg2*/) {
1395   IterState* state = reinterpret_cast<IterState*>(arg1);
1396 
1397   if (state->super_version->Unref()) {
1398     // Job id == 0 means that this is not our background process, but rather
1399     // user thread
1400     JobContext job_context(0);
1401 
1402     state->mu->Lock();
1403     state->super_version->Cleanup();
1404     state->db->FindObsoleteFiles(&job_context, false, true);
1405     if (state->background_purge) {
1406       state->db->ScheduleBgLogWriterClose(&job_context);
1407       state->db->AddSuperVersionsToFreeQueue(state->super_version);
1408       state->db->SchedulePurge();
1409     }
1410     state->mu->Unlock();
1411 
1412     if (!state->background_purge) {
1413       delete state->super_version;
1414     }
1415     if (job_context.HaveSomethingToDelete()) {
1416       if (state->background_purge) {
1417         // PurgeObsoleteFiles here does not delete files. Instead, it adds the
1418         // files to be deleted to a job queue, and deletes it in a separate
1419         // background thread.
1420         state->db->PurgeObsoleteFiles(job_context, true /* schedule only */);
1421         state->mu->Lock();
1422         state->db->SchedulePurge();
1423         state->mu->Unlock();
1424       } else {
1425         state->db->PurgeObsoleteFiles(job_context);
1426       }
1427     }
1428     job_context.Clean();
1429   }
1430 
1431   delete state;
1432 }
1433 }  // namespace
1434 
NewInternalIterator(const ReadOptions & read_options,ColumnFamilyData * cfd,SuperVersion * super_version,Arena * arena,RangeDelAggregator * range_del_agg,SequenceNumber sequence)1435 InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
1436                                               ColumnFamilyData* cfd,
1437                                               SuperVersion* super_version,
1438                                               Arena* arena,
1439                                               RangeDelAggregator* range_del_agg,
1440                                               SequenceNumber sequence) {
1441   InternalIterator* internal_iter;
1442   assert(arena != nullptr);
1443   assert(range_del_agg != nullptr);
1444   // Need to create internal iterator from the arena.
1445   MergeIteratorBuilder merge_iter_builder(
1446       &cfd->internal_comparator(), arena,
1447       !read_options.total_order_seek &&
1448           super_version->mutable_cf_options.prefix_extractor != nullptr);
1449   // Collect iterator for mutable mem
1450   merge_iter_builder.AddIterator(
1451       super_version->mem->NewIterator(read_options, arena));
1452   std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter;
1453   Status s;
1454   if (!read_options.ignore_range_deletions) {
1455     range_del_iter.reset(
1456         super_version->mem->NewRangeTombstoneIterator(read_options, sequence));
1457     range_del_agg->AddTombstones(std::move(range_del_iter));
1458   }
1459   // Collect all needed child iterators for immutable memtables
1460   if (s.ok()) {
1461     super_version->imm->AddIterators(read_options, &merge_iter_builder);
1462     if (!read_options.ignore_range_deletions) {
1463       s = super_version->imm->AddRangeTombstoneIterators(read_options, arena,
1464                                                          range_del_agg);
1465     }
1466   }
1467   TEST_SYNC_POINT_CALLBACK("DBImpl::NewInternalIterator:StatusCallback", &s);
1468   if (s.ok()) {
1469     // Collect iterators for files in L0 - Ln
1470     if (read_options.read_tier != kMemtableTier) {
1471       super_version->current->AddIterators(read_options, file_options_,
1472                                            &merge_iter_builder, range_del_agg);
1473     }
1474     internal_iter = merge_iter_builder.Finish();
1475     IterState* cleanup =
1476         new IterState(this, &mutex_, super_version,
1477                       read_options.background_purge_on_iterator_cleanup ||
1478                       immutable_db_options_.avoid_unnecessary_blocking_io);
1479     internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
1480 
1481     return internal_iter;
1482   } else {
1483     CleanupSuperVersion(super_version);
1484   }
1485   return NewErrorInternalIterator<Slice>(s, arena);
1486 }
1487 
DefaultColumnFamily() const1488 ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const {
1489   return default_cf_handle_;
1490 }
1491 
PersistentStatsColumnFamily() const1492 ColumnFamilyHandle* DBImpl::PersistentStatsColumnFamily() const {
1493   return persist_stats_cf_handle_;
1494 }
1495 
Get(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * value)1496 Status DBImpl::Get(const ReadOptions& read_options,
1497                    ColumnFamilyHandle* column_family, const Slice& key,
1498                    PinnableSlice* value) {
1499   GetImplOptions get_impl_options;
1500   get_impl_options.column_family = column_family;
1501   get_impl_options.value = value;
1502   return GetImpl(read_options, key, get_impl_options);
1503 }
1504 
GetImpl(const ReadOptions & read_options,const Slice & key,GetImplOptions get_impl_options)1505 Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
1506                        GetImplOptions get_impl_options) {
1507   assert(get_impl_options.value != nullptr ||
1508          get_impl_options.merge_operands != nullptr);
1509   PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
1510   StopWatch sw(env_, stats_, DB_GET);
1511   PERF_TIMER_GUARD(get_snapshot_time);
1512 
1513   auto cfh =
1514       reinterpret_cast<ColumnFamilyHandleImpl*>(get_impl_options.column_family);
1515   auto cfd = cfh->cfd();
1516 
1517   if (tracer_) {
1518     // TODO: This mutex should be removed later, to improve performance when
1519     // tracing is enabled.
1520     InstrumentedMutexLock lock(&trace_mutex_);
1521     if (tracer_) {
1522       tracer_->Get(get_impl_options.column_family, key);
1523     }
1524   }
1525 
1526   // Acquire SuperVersion
1527   SuperVersion* sv = GetAndRefSuperVersion(cfd);
1528 
1529   TEST_SYNC_POINT("DBImpl::GetImpl:1");
1530   TEST_SYNC_POINT("DBImpl::GetImpl:2");
1531 
1532   SequenceNumber snapshot;
1533   if (read_options.snapshot != nullptr) {
1534     if (get_impl_options.callback) {
1535       // Already calculated based on read_options.snapshot
1536       snapshot = get_impl_options.callback->max_visible_seq();
1537     } else {
1538       snapshot =
1539           reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
1540     }
1541   } else {
1542     // Note that the snapshot is assigned AFTER referencing the super
1543     // version because otherwise a flush happening in between may compact away
1544     // data for the snapshot, so the reader would see neither data that was be
1545     // visible to the snapshot before compaction nor the newer data inserted
1546     // afterwards.
1547     snapshot = last_seq_same_as_publish_seq_
1548                    ? versions_->LastSequence()
1549                    : versions_->LastPublishedSequence();
1550     if (get_impl_options.callback) {
1551       // The unprep_seqs are not published for write unprepared, so it could be
1552       // that max_visible_seq is larger. Seek to the std::max of the two.
1553       // However, we still want our callback to contain the actual snapshot so
1554       // that it can do the correct visibility filtering.
1555       get_impl_options.callback->Refresh(snapshot);
1556 
1557       // Internally, WriteUnpreparedTxnReadCallback::Refresh would set
1558       // max_visible_seq = max(max_visible_seq, snapshot)
1559       //
1560       // Currently, the commented out assert is broken by
1561       // InvalidSnapshotReadCallback, but if write unprepared recovery followed
1562       // the regular transaction flow, then this special read callback would not
1563       // be needed.
1564       //
1565       // assert(callback->max_visible_seq() >= snapshot);
1566       snapshot = get_impl_options.callback->max_visible_seq();
1567     }
1568   }
1569   TEST_SYNC_POINT("DBImpl::GetImpl:3");
1570   TEST_SYNC_POINT("DBImpl::GetImpl:4");
1571 
1572   // Prepare to store a list of merge operations if merge occurs.
1573   MergeContext merge_context;
1574   SequenceNumber max_covering_tombstone_seq = 0;
1575 
1576   Status s;
1577   // First look in the memtable, then in the immutable memtable (if any).
1578   // s is both in/out. When in, s could either be OK or MergeInProgress.
1579   // merge_operands will contain the sequence of merges in the latter case.
1580   LookupKey lkey(key, snapshot, read_options.timestamp);
1581   PERF_TIMER_STOP(get_snapshot_time);
1582 
1583   bool skip_memtable = (read_options.read_tier == kPersistedTier &&
1584                         has_unpersisted_data_.load(std::memory_order_relaxed));
1585   bool done = false;
1586   if (!skip_memtable) {
1587     // Get value associated with key
1588     if (get_impl_options.get_value) {
1589       if (sv->mem->Get(lkey, get_impl_options.value->GetSelf(), &s,
1590                        &merge_context, &max_covering_tombstone_seq,
1591                        read_options, get_impl_options.callback,
1592                        get_impl_options.is_blob_index)) {
1593         done = true;
1594         get_impl_options.value->PinSelf();
1595         RecordTick(stats_, MEMTABLE_HIT);
1596       } else if ((s.ok() || s.IsMergeInProgress()) &&
1597                  sv->imm->Get(lkey, get_impl_options.value->GetSelf(), &s,
1598                               &merge_context, &max_covering_tombstone_seq,
1599                               read_options, get_impl_options.callback,
1600                               get_impl_options.is_blob_index)) {
1601         done = true;
1602         get_impl_options.value->PinSelf();
1603         RecordTick(stats_, MEMTABLE_HIT);
1604       }
1605     } else {
1606       // Get Merge Operands associated with key, Merge Operands should not be
1607       // merged and raw values should be returned to the user.
1608       if (sv->mem->Get(lkey, nullptr, &s, &merge_context,
1609                        &max_covering_tombstone_seq, read_options, nullptr,
1610                        nullptr, false)) {
1611         done = true;
1612         RecordTick(stats_, MEMTABLE_HIT);
1613       } else if ((s.ok() || s.IsMergeInProgress()) &&
1614                  sv->imm->GetMergeOperands(lkey, &s, &merge_context,
1615                                            &max_covering_tombstone_seq,
1616                                            read_options)) {
1617         done = true;
1618         RecordTick(stats_, MEMTABLE_HIT);
1619       }
1620     }
1621     if (!done && !s.ok() && !s.IsMergeInProgress()) {
1622       ReturnAndCleanupSuperVersion(cfd, sv);
1623       return s;
1624     }
1625   }
1626   if (!done) {
1627     PERF_TIMER_GUARD(get_from_output_files_time);
1628     sv->current->Get(
1629         read_options, lkey, get_impl_options.value, &s, &merge_context,
1630         &max_covering_tombstone_seq,
1631         get_impl_options.get_value ? get_impl_options.value_found : nullptr,
1632         nullptr, nullptr,
1633         get_impl_options.get_value ? get_impl_options.callback : nullptr,
1634         get_impl_options.get_value ? get_impl_options.is_blob_index : nullptr,
1635         get_impl_options.get_value);
1636     RecordTick(stats_, MEMTABLE_MISS);
1637   }
1638 
1639   {
1640     PERF_TIMER_GUARD(get_post_process_time);
1641 
1642     ReturnAndCleanupSuperVersion(cfd, sv);
1643 
1644     RecordTick(stats_, NUMBER_KEYS_READ);
1645     size_t size = 0;
1646     if (s.ok()) {
1647       if (get_impl_options.get_value) {
1648         size = get_impl_options.value->size();
1649       } else {
1650         // Return all merge operands for get_impl_options.key
1651         *get_impl_options.number_of_operands =
1652             static_cast<int>(merge_context.GetNumOperands());
1653         if (*get_impl_options.number_of_operands >
1654             get_impl_options.get_merge_operands_options
1655                 ->expected_max_number_of_operands) {
1656           s = Status::Incomplete(
1657               Status::SubCode::KMergeOperandsInsufficientCapacity);
1658         } else {
1659           for (const Slice& sl : merge_context.GetOperands()) {
1660             size += sl.size();
1661             get_impl_options.merge_operands->PinSelf(sl);
1662             get_impl_options.merge_operands++;
1663           }
1664         }
1665       }
1666       RecordTick(stats_, BYTES_READ, size);
1667       PERF_COUNTER_ADD(get_read_bytes, size);
1668     }
1669     RecordInHistogram(stats_, BYTES_PER_READ, size);
1670   }
1671   return s;
1672 }
1673 
MultiGet(const ReadOptions & read_options,const std::vector<ColumnFamilyHandle * > & column_family,const std::vector<Slice> & keys,std::vector<std::string> * values)1674 std::vector<Status> DBImpl::MultiGet(
1675     const ReadOptions& read_options,
1676     const std::vector<ColumnFamilyHandle*>& column_family,
1677     const std::vector<Slice>& keys, std::vector<std::string>* values) {
1678   PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
1679   StopWatch sw(env_, stats_, DB_MULTIGET);
1680   PERF_TIMER_GUARD(get_snapshot_time);
1681 
1682   SequenceNumber consistent_seqnum;
1683   ;
1684 
1685   std::unordered_map<uint32_t, MultiGetColumnFamilyData> multiget_cf_data(
1686       column_family.size());
1687   for (auto cf : column_family) {
1688     auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(cf);
1689     auto cfd = cfh->cfd();
1690     if (multiget_cf_data.find(cfd->GetID()) == multiget_cf_data.end()) {
1691       multiget_cf_data.emplace(cfd->GetID(),
1692                                MultiGetColumnFamilyData(cfh, nullptr));
1693     }
1694   }
1695 
1696   std::function<MultiGetColumnFamilyData*(
1697       std::unordered_map<uint32_t, MultiGetColumnFamilyData>::iterator&)>
1698       iter_deref_lambda =
1699           [](std::unordered_map<uint32_t, MultiGetColumnFamilyData>::iterator&
1700                  cf_iter) { return &cf_iter->second; };
1701 
1702   bool unref_only =
1703       MultiCFSnapshot<std::unordered_map<uint32_t, MultiGetColumnFamilyData>>(
1704           read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
1705           &consistent_seqnum);
1706 
1707   // Contain a list of merge operations if merge occurs.
1708   MergeContext merge_context;
1709 
1710   // Note: this always resizes the values array
1711   size_t num_keys = keys.size();
1712   std::vector<Status> stat_list(num_keys);
1713   values->resize(num_keys);
1714 
1715   // Keep track of bytes that we read for statistics-recording later
1716   uint64_t bytes_read = 0;
1717   PERF_TIMER_STOP(get_snapshot_time);
1718 
1719   // For each of the given keys, apply the entire "get" process as follows:
1720   // First look in the memtable, then in the immutable memtable (if any).
1721   // s is both in/out. When in, s could either be OK or MergeInProgress.
1722   // merge_operands will contain the sequence of merges in the latter case.
1723   size_t num_found = 0;
1724   for (size_t i = 0; i < num_keys; ++i) {
1725     merge_context.Clear();
1726     Status& s = stat_list[i];
1727     std::string* value = &(*values)[i];
1728 
1729     LookupKey lkey(keys[i], consistent_seqnum);
1730     auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family[i]);
1731     SequenceNumber max_covering_tombstone_seq = 0;
1732     auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID());
1733     assert(mgd_iter != multiget_cf_data.end());
1734     auto mgd = mgd_iter->second;
1735     auto super_version = mgd.super_version;
1736     bool skip_memtable =
1737         (read_options.read_tier == kPersistedTier &&
1738          has_unpersisted_data_.load(std::memory_order_relaxed));
1739     bool done = false;
1740     if (!skip_memtable) {
1741       if (super_version->mem->Get(lkey, value, &s, &merge_context,
1742                                   &max_covering_tombstone_seq, read_options)) {
1743         done = true;
1744         RecordTick(stats_, MEMTABLE_HIT);
1745       } else if (super_version->imm->Get(lkey, value, &s, &merge_context,
1746                                          &max_covering_tombstone_seq,
1747                                          read_options)) {
1748         done = true;
1749         RecordTick(stats_, MEMTABLE_HIT);
1750       }
1751     }
1752     if (!done) {
1753       PinnableSlice pinnable_val;
1754       PERF_TIMER_GUARD(get_from_output_files_time);
1755       super_version->current->Get(read_options, lkey, &pinnable_val, &s,
1756                                   &merge_context, &max_covering_tombstone_seq);
1757       value->assign(pinnable_val.data(), pinnable_val.size());
1758       RecordTick(stats_, MEMTABLE_MISS);
1759     }
1760 
1761     if (s.ok()) {
1762       bytes_read += value->size();
1763       num_found++;
1764     }
1765   }
1766 
1767   // Post processing (decrement reference counts and record statistics)
1768   PERF_TIMER_GUARD(get_post_process_time);
1769   autovector<SuperVersion*> superversions_to_delete;
1770 
1771   for (auto mgd_iter : multiget_cf_data) {
1772     auto mgd = mgd_iter.second;
1773     if (!unref_only) {
1774       ReturnAndCleanupSuperVersion(mgd.cfd, mgd.super_version);
1775     } else {
1776       mgd.cfd->GetSuperVersion()->Unref();
1777     }
1778   }
1779   RecordTick(stats_, NUMBER_MULTIGET_CALLS);
1780   RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
1781   RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found);
1782   RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
1783   RecordInHistogram(stats_, BYTES_PER_MULTIGET, bytes_read);
1784   PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
1785   PERF_TIMER_STOP(get_post_process_time);
1786 
1787   return stat_list;
1788 }
1789 
1790 template <class T>
MultiCFSnapshot(const ReadOptions & read_options,ReadCallback * callback,std::function<MultiGetColumnFamilyData * (typename T::iterator &)> & iter_deref_func,T * cf_list,SequenceNumber * snapshot)1791 bool DBImpl::MultiCFSnapshot(
1792     const ReadOptions& read_options, ReadCallback* callback,
1793     std::function<MultiGetColumnFamilyData*(typename T::iterator&)>&
1794         iter_deref_func,
1795     T* cf_list, SequenceNumber* snapshot) {
1796   PERF_TIMER_GUARD(get_snapshot_time);
1797 
1798   bool last_try = false;
1799   if (cf_list->size() == 1) {
1800     // Fast path for a single column family. We can simply get the thread loca
1801     // super version
1802     auto cf_iter = cf_list->begin();
1803     auto node = iter_deref_func(cf_iter);
1804     node->super_version = GetAndRefSuperVersion(node->cfd);
1805     if (read_options.snapshot != nullptr) {
1806       // Note: In WritePrepared txns this is not necessary but not harmful
1807       // either.  Because prep_seq > snapshot => commit_seq > snapshot so if
1808       // a snapshot is specified we should be fine with skipping seq numbers
1809       // that are greater than that.
1810       //
1811       // In WriteUnprepared, we cannot set snapshot in the lookup key because we
1812       // may skip uncommitted data that should be visible to the transaction for
1813       // reading own writes.
1814       *snapshot =
1815           static_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
1816       if (callback) {
1817         *snapshot = std::max(*snapshot, callback->max_visible_seq());
1818       }
1819     } else {
1820       // Since we get and reference the super version before getting
1821       // the snapshot number, without a mutex protection, it is possible
1822       // that a memtable switch happened in the middle and not all the
1823       // data for this snapshot is available. But it will contain all
1824       // the data available in the super version we have, which is also
1825       // a valid snapshot to read from.
1826       // We shouldn't get snapshot before finding and referencing the super
1827       // version because a flush happening in between may compact away data for
1828       // the snapshot, but the snapshot is earlier than the data overwriting it,
1829       // so users may see wrong results.
1830       *snapshot = last_seq_same_as_publish_seq_
1831                       ? versions_->LastSequence()
1832                       : versions_->LastPublishedSequence();
1833     }
1834   } else {
1835     // If we end up with the same issue of memtable geting sealed during 2
1836     // consecutive retries, it means the write rate is very high. In that case
1837     // its probably ok to take the mutex on the 3rd try so we can succeed for
1838     // sure
1839     static const int num_retries = 3;
1840     for (int i = 0; i < num_retries; ++i) {
1841       last_try = (i == num_retries - 1);
1842       bool retry = false;
1843 
1844       if (i > 0) {
1845         for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
1846              ++cf_iter) {
1847           auto node = iter_deref_func(cf_iter);
1848           SuperVersion* super_version = node->super_version;
1849           ColumnFamilyData* cfd = node->cfd;
1850           if (super_version != nullptr) {
1851             ReturnAndCleanupSuperVersion(cfd, super_version);
1852           }
1853           node->super_version = nullptr;
1854         }
1855       }
1856       if (read_options.snapshot == nullptr) {
1857         if (last_try) {
1858           TEST_SYNC_POINT("DBImpl::MultiGet::LastTry");
1859           // We're close to max number of retries. For the last retry,
1860           // acquire the lock so we're sure to succeed
1861           mutex_.Lock();
1862         }
1863         *snapshot = last_seq_same_as_publish_seq_
1864                         ? versions_->LastSequence()
1865                         : versions_->LastPublishedSequence();
1866       } else {
1867         *snapshot = reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
1868                         ->number_;
1869       }
1870       for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
1871            ++cf_iter) {
1872         auto node = iter_deref_func(cf_iter);
1873         if (!last_try) {
1874           node->super_version = GetAndRefSuperVersion(node->cfd);
1875         } else {
1876           node->super_version = node->cfd->GetSuperVersion()->Ref();
1877         }
1878         TEST_SYNC_POINT("DBImpl::MultiGet::AfterRefSV");
1879         if (read_options.snapshot != nullptr || last_try) {
1880           // If user passed a snapshot, then we don't care if a memtable is
1881           // sealed or compaction happens because the snapshot would ensure
1882           // that older key versions are kept around. If this is the last
1883           // retry, then we have the lock so nothing bad can happen
1884           continue;
1885         }
1886         // We could get the earliest sequence number for the whole list of
1887         // memtables, which will include immutable memtables as well, but that
1888         // might be tricky to maintain in case we decide, in future, to do
1889         // memtable compaction.
1890         if (!last_try) {
1891           SequenceNumber seq =
1892               node->super_version->mem->GetEarliestSequenceNumber();
1893           if (seq > *snapshot) {
1894             retry = true;
1895             break;
1896           }
1897         }
1898       }
1899       if (!retry) {
1900         if (last_try) {
1901           mutex_.Unlock();
1902         }
1903         break;
1904       }
1905     }
1906   }
1907 
1908   // Keep track of bytes that we read for statistics-recording later
1909   PERF_TIMER_STOP(get_snapshot_time);
1910 
1911   return last_try;
1912 }
1913 
MultiGet(const ReadOptions & read_options,const size_t num_keys,ColumnFamilyHandle ** column_families,const Slice * keys,PinnableSlice * values,Status * statuses,const bool sorted_input)1914 void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
1915                       ColumnFamilyHandle** column_families, const Slice* keys,
1916                       PinnableSlice* values, Status* statuses,
1917                       const bool sorted_input) {
1918   if (num_keys == 0) {
1919     return;
1920   }
1921   autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
1922   autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
1923   sorted_keys.resize(num_keys);
1924   for (size_t i = 0; i < num_keys; ++i) {
1925     key_context.emplace_back(column_families[i], keys[i], &values[i],
1926                              &statuses[i]);
1927   }
1928   for (size_t i = 0; i < num_keys; ++i) {
1929     sorted_keys[i] = &key_context[i];
1930   }
1931   PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys);
1932 
1933   autovector<MultiGetColumnFamilyData, MultiGetContext::MAX_BATCH_SIZE>
1934       multiget_cf_data;
1935   size_t cf_start = 0;
1936   ColumnFamilyHandle* cf = sorted_keys[0]->column_family;
1937   for (size_t i = 0; i < num_keys; ++i) {
1938     KeyContext* key_ctx = sorted_keys[i];
1939     if (key_ctx->column_family != cf) {
1940       multiget_cf_data.emplace_back(
1941           MultiGetColumnFamilyData(cf, cf_start, i - cf_start, nullptr));
1942       cf_start = i;
1943       cf = key_ctx->column_family;
1944     }
1945   }
1946   {
1947     // multiget_cf_data.emplace_back(
1948     // MultiGetColumnFamilyData(cf, cf_start, num_keys - cf_start, nullptr));
1949     multiget_cf_data.emplace_back(cf, cf_start, num_keys - cf_start, nullptr);
1950   }
1951   std::function<MultiGetColumnFamilyData*(
1952       autovector<MultiGetColumnFamilyData,
1953                  MultiGetContext::MAX_BATCH_SIZE>::iterator&)>
1954       iter_deref_lambda =
1955           [](autovector<MultiGetColumnFamilyData,
1956                         MultiGetContext::MAX_BATCH_SIZE>::iterator& cf_iter) {
1957             return &(*cf_iter);
1958           };
1959 
1960   SequenceNumber consistent_seqnum;
1961   bool unref_only = MultiCFSnapshot<
1962       autovector<MultiGetColumnFamilyData, MultiGetContext::MAX_BATCH_SIZE>>(
1963       read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
1964       &consistent_seqnum);
1965 
1966   for (auto cf_iter = multiget_cf_data.begin();
1967        cf_iter != multiget_cf_data.end(); ++cf_iter) {
1968     MultiGetImpl(read_options, cf_iter->start, cf_iter->num_keys, &sorted_keys,
1969                  cf_iter->super_version, consistent_seqnum, nullptr, nullptr);
1970     if (!unref_only) {
1971       ReturnAndCleanupSuperVersion(cf_iter->cfd, cf_iter->super_version);
1972     } else {
1973       cf_iter->cfd->GetSuperVersion()->Unref();
1974     }
1975   }
1976 }
1977 
1978 namespace {
1979 // Order keys by CF ID, followed by key contents
1980 struct CompareKeyContext {
operator ()rocksdb::__anon86dd08d00711::CompareKeyContext1981   inline bool operator()(const KeyContext* lhs, const KeyContext* rhs) {
1982     ColumnFamilyHandleImpl* cfh =
1983         static_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
1984     uint32_t cfd_id1 = cfh->cfd()->GetID();
1985     const Comparator* comparator = cfh->cfd()->user_comparator();
1986     cfh = static_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
1987     uint32_t cfd_id2 = cfh->cfd()->GetID();
1988 
1989     if (cfd_id1 < cfd_id2) {
1990       return true;
1991     } else if (cfd_id1 > cfd_id2) {
1992       return false;
1993     }
1994 
1995     // Both keys are from the same column family
1996     int cmp = comparator->Compare(*(lhs->key), *(rhs->key));
1997     if (cmp < 0) {
1998       return true;
1999     }
2000     return false;
2001   }
2002 };
2003 
2004 }  // anonymous namespace
2005 
PrepareMultiGetKeys(size_t num_keys,bool sorted_input,autovector<KeyContext *,MultiGetContext::MAX_BATCH_SIZE> * sorted_keys)2006 void DBImpl::PrepareMultiGetKeys(
2007     size_t num_keys, bool sorted_input,
2008     autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
2009 #ifndef NDEBUG
2010   if (sorted_input) {
2011     for (size_t index = 0; index < sorted_keys->size(); ++index) {
2012       if (index > 0) {
2013         KeyContext* lhs = (*sorted_keys)[index - 1];
2014         KeyContext* rhs = (*sorted_keys)[index];
2015         ColumnFamilyHandleImpl* cfh =
2016             reinterpret_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
2017         uint32_t cfd_id1 = cfh->cfd()->GetID();
2018         const Comparator* comparator = cfh->cfd()->user_comparator();
2019         cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
2020         uint32_t cfd_id2 = cfh->cfd()->GetID();
2021 
2022         assert(cfd_id1 <= cfd_id2);
2023         if (cfd_id1 < cfd_id2) {
2024           continue;
2025         }
2026 
2027         // Both keys are from the same column family
2028         int cmp = comparator->Compare(*(lhs->key), *(rhs->key));
2029         assert(cmp <= 0);
2030       }
2031       index++;
2032     }
2033   }
2034 #endif
2035   if (!sorted_input) {
2036     CompareKeyContext sort_comparator;
2037     std::sort(sorted_keys->begin(), sorted_keys->begin() + num_keys,
2038               sort_comparator);
2039   }
2040 }
2041 
MultiGet(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const size_t num_keys,const Slice * keys,PinnableSlice * values,Status * statuses,const bool sorted_input)2042 void DBImpl::MultiGet(const ReadOptions& read_options,
2043                       ColumnFamilyHandle* column_family, const size_t num_keys,
2044                       const Slice* keys, PinnableSlice* values,
2045                       Status* statuses, const bool sorted_input) {
2046   autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
2047   autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
2048   sorted_keys.resize(num_keys);
2049   for (size_t i = 0; i < num_keys; ++i) {
2050     key_context.emplace_back(column_family, keys[i], &values[i], &statuses[i]);
2051   }
2052   for (size_t i = 0; i < num_keys; ++i) {
2053     sorted_keys[i] = &key_context[i];
2054   }
2055   PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys);
2056   MultiGetWithCallback(read_options, column_family, nullptr, &sorted_keys);
2057 }
2058 
MultiGetWithCallback(const ReadOptions & read_options,ColumnFamilyHandle * column_family,ReadCallback * callback,autovector<KeyContext *,MultiGetContext::MAX_BATCH_SIZE> * sorted_keys)2059 void DBImpl::MultiGetWithCallback(
2060     const ReadOptions& read_options, ColumnFamilyHandle* column_family,
2061     ReadCallback* callback,
2062     autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
2063   std::array<MultiGetColumnFamilyData, 1> multiget_cf_data;
2064   multiget_cf_data[0] = MultiGetColumnFamilyData(column_family, nullptr);
2065   std::function<MultiGetColumnFamilyData*(
2066       std::array<MultiGetColumnFamilyData, 1>::iterator&)>
2067       iter_deref_lambda =
2068           [](std::array<MultiGetColumnFamilyData, 1>::iterator& cf_iter) {
2069             return &(*cf_iter);
2070           };
2071 
2072   size_t num_keys = sorted_keys->size();
2073   SequenceNumber consistent_seqnum;
2074   bool unref_only = MultiCFSnapshot<std::array<MultiGetColumnFamilyData, 1>>(
2075       read_options, callback, iter_deref_lambda, &multiget_cf_data,
2076       &consistent_seqnum);
2077 #ifndef NDEBUG
2078   assert(!unref_only);
2079 #else
2080   // Silence unused variable warning
2081   (void)unref_only;
2082 #endif  // NDEBUG
2083 
2084   if (callback && read_options.snapshot == nullptr) {
2085     // The unprep_seqs are not published for write unprepared, so it could be
2086     // that max_visible_seq is larger. Seek to the std::max of the two.
2087     // However, we still want our callback to contain the actual snapshot so
2088     // that it can do the correct visibility filtering.
2089     callback->Refresh(consistent_seqnum);
2090 
2091     // Internally, WriteUnpreparedTxnReadCallback::Refresh would set
2092     // max_visible_seq = max(max_visible_seq, snapshot)
2093     //
2094     // Currently, the commented out assert is broken by
2095     // InvalidSnapshotReadCallback, but if write unprepared recovery followed
2096     // the regular transaction flow, then this special read callback would not
2097     // be needed.
2098     //
2099     // assert(callback->max_visible_seq() >= snapshot);
2100     consistent_seqnum = callback->max_visible_seq();
2101   }
2102 
2103   MultiGetImpl(read_options, 0, num_keys, sorted_keys,
2104                multiget_cf_data[0].super_version, consistent_seqnum, nullptr,
2105                nullptr);
2106   ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd,
2107                                multiget_cf_data[0].super_version);
2108 }
2109 
MultiGetImpl(const ReadOptions & read_options,size_t start_key,size_t num_keys,autovector<KeyContext *,MultiGetContext::MAX_BATCH_SIZE> * sorted_keys,SuperVersion * super_version,SequenceNumber snapshot,ReadCallback * callback,bool * is_blob_index)2110 void DBImpl::MultiGetImpl(
2111     const ReadOptions& read_options, size_t start_key, size_t num_keys,
2112     autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys,
2113     SuperVersion* super_version, SequenceNumber snapshot,
2114     ReadCallback* callback, bool* is_blob_index) {
2115   PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
2116   StopWatch sw(env_, stats_, DB_MULTIGET);
2117 
2118   // For each of the given keys, apply the entire "get" process as follows:
2119   // First look in the memtable, then in the immutable memtable (if any).
2120   // s is both in/out. When in, s could either be OK or MergeInProgress.
2121   // merge_operands will contain the sequence of merges in the latter case.
2122   size_t keys_left = num_keys;
2123   while (keys_left) {
2124     size_t batch_size = (keys_left > MultiGetContext::MAX_BATCH_SIZE)
2125                             ? MultiGetContext::MAX_BATCH_SIZE
2126                             : keys_left;
2127     MultiGetContext ctx(sorted_keys, start_key + num_keys - keys_left,
2128                         batch_size, snapshot);
2129     MultiGetRange range = ctx.GetMultiGetRange();
2130     bool lookup_current = false;
2131 
2132     keys_left -= batch_size;
2133     for (auto mget_iter = range.begin(); mget_iter != range.end();
2134          ++mget_iter) {
2135       mget_iter->merge_context.Clear();
2136       *mget_iter->s = Status::OK();
2137     }
2138 
2139     bool skip_memtable =
2140         (read_options.read_tier == kPersistedTier &&
2141          has_unpersisted_data_.load(std::memory_order_relaxed));
2142     if (!skip_memtable) {
2143       super_version->mem->MultiGet(read_options, &range, callback,
2144                                    is_blob_index);
2145       if (!range.empty()) {
2146         super_version->imm->MultiGet(read_options, &range, callback,
2147                                      is_blob_index);
2148       }
2149       if (!range.empty()) {
2150         lookup_current = true;
2151         uint64_t left = range.KeysLeft();
2152         RecordTick(stats_, MEMTABLE_MISS, left);
2153       }
2154     }
2155     if (lookup_current) {
2156       PERF_TIMER_GUARD(get_from_output_files_time);
2157       super_version->current->MultiGet(read_options, &range, callback,
2158                                        is_blob_index);
2159     }
2160   }
2161 
2162   // Post processing (decrement reference counts and record statistics)
2163   PERF_TIMER_GUARD(get_post_process_time);
2164   size_t num_found = 0;
2165   uint64_t bytes_read = 0;
2166   for (size_t i = start_key; i < start_key + num_keys; ++i) {
2167     KeyContext* key = (*sorted_keys)[i];
2168     if (key->s->ok()) {
2169       bytes_read += key->value->size();
2170       num_found++;
2171     }
2172   }
2173 
2174   RecordTick(stats_, NUMBER_MULTIGET_CALLS);
2175   RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
2176   RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found);
2177   RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
2178   RecordInHistogram(stats_, BYTES_PER_MULTIGET, bytes_read);
2179   PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
2180   PERF_TIMER_STOP(get_post_process_time);
2181 }
2182 
CreateColumnFamily(const ColumnFamilyOptions & cf_options,const std::string & column_family,ColumnFamilyHandle ** handle)2183 Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
2184                                   const std::string& column_family,
2185                                   ColumnFamilyHandle** handle) {
2186   assert(handle != nullptr);
2187   Status s = CreateColumnFamilyImpl(cf_options, column_family, handle);
2188   if (s.ok()) {
2189     s = WriteOptionsFile(true /*need_mutex_lock*/,
2190                          true /*need_enter_write_thread*/);
2191   }
2192   return s;
2193 }
2194 
CreateColumnFamilies(const ColumnFamilyOptions & cf_options,const std::vector<std::string> & column_family_names,std::vector<ColumnFamilyHandle * > * handles)2195 Status DBImpl::CreateColumnFamilies(
2196     const ColumnFamilyOptions& cf_options,
2197     const std::vector<std::string>& column_family_names,
2198     std::vector<ColumnFamilyHandle*>* handles) {
2199   assert(handles != nullptr);
2200   handles->clear();
2201   size_t num_cf = column_family_names.size();
2202   Status s;
2203   bool success_once = false;
2204   for (size_t i = 0; i < num_cf; i++) {
2205     ColumnFamilyHandle* handle;
2206     s = CreateColumnFamilyImpl(cf_options, column_family_names[i], &handle);
2207     if (!s.ok()) {
2208       break;
2209     }
2210     handles->push_back(handle);
2211     success_once = true;
2212   }
2213   if (success_once) {
2214     Status persist_options_status = WriteOptionsFile(
2215         true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
2216     if (s.ok() && !persist_options_status.ok()) {
2217       s = persist_options_status;
2218     }
2219   }
2220   return s;
2221 }
2222 
CreateColumnFamilies(const std::vector<ColumnFamilyDescriptor> & column_families,std::vector<ColumnFamilyHandle * > * handles)2223 Status DBImpl::CreateColumnFamilies(
2224     const std::vector<ColumnFamilyDescriptor>& column_families,
2225     std::vector<ColumnFamilyHandle*>* handles) {
2226   assert(handles != nullptr);
2227   handles->clear();
2228   size_t num_cf = column_families.size();
2229   Status s;
2230   bool success_once = false;
2231   for (size_t i = 0; i < num_cf; i++) {
2232     ColumnFamilyHandle* handle;
2233     s = CreateColumnFamilyImpl(column_families[i].options,
2234                                column_families[i].name, &handle);
2235     if (!s.ok()) {
2236       break;
2237     }
2238     handles->push_back(handle);
2239     success_once = true;
2240   }
2241   if (success_once) {
2242     Status persist_options_status = WriteOptionsFile(
2243         true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
2244     if (s.ok() && !persist_options_status.ok()) {
2245       s = persist_options_status;
2246     }
2247   }
2248   return s;
2249 }
2250 
CreateColumnFamilyImpl(const ColumnFamilyOptions & cf_options,const std::string & column_family_name,ColumnFamilyHandle ** handle)2251 Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
2252                                       const std::string& column_family_name,
2253                                       ColumnFamilyHandle** handle) {
2254   Status s;
2255   Status persist_options_status;
2256   *handle = nullptr;
2257 
2258   DBOptions db_options =
2259       BuildDBOptions(immutable_db_options_, mutable_db_options_);
2260   s = ColumnFamilyData::ValidateOptions(db_options, cf_options);
2261   if (s.ok()) {
2262     for (auto& cf_path : cf_options.cf_paths) {
2263       s = env_->CreateDirIfMissing(cf_path.path);
2264       if (!s.ok()) {
2265         break;
2266       }
2267     }
2268   }
2269   if (!s.ok()) {
2270     return s;
2271   }
2272 
2273   SuperVersionContext sv_context(/* create_superversion */ true);
2274   {
2275     InstrumentedMutexLock l(&mutex_);
2276 
2277     if (versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name) !=
2278         nullptr) {
2279       return Status::InvalidArgument("Column family already exists");
2280     }
2281     VersionEdit edit;
2282     edit.AddColumnFamily(column_family_name);
2283     uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
2284     edit.SetColumnFamily(new_id);
2285     edit.SetLogNumber(logfile_number_);
2286     edit.SetComparatorName(cf_options.comparator->Name());
2287 
2288     // LogAndApply will both write the creation in MANIFEST and create
2289     // ColumnFamilyData object
2290     {  // write thread
2291       WriteThread::Writer w;
2292       write_thread_.EnterUnbatched(&w, &mutex_);
2293       // LogAndApply will both write the creation in MANIFEST and create
2294       // ColumnFamilyData object
2295       s = versions_->LogAndApply(nullptr, MutableCFOptions(cf_options), &edit,
2296                                  &mutex_, directories_.GetDbDir(), false,
2297                                  &cf_options);
2298       write_thread_.ExitUnbatched(&w);
2299     }
2300     if (s.ok()) {
2301       auto* cfd =
2302           versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
2303       assert(cfd != nullptr);
2304       s = cfd->AddDirectories();
2305     }
2306     if (s.ok()) {
2307       single_column_family_mode_ = false;
2308       auto* cfd =
2309           versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
2310       assert(cfd != nullptr);
2311       InstallSuperVersionAndScheduleWork(cfd, &sv_context,
2312                                          *cfd->GetLatestMutableCFOptions());
2313 
2314       if (!cfd->mem()->IsSnapshotSupported()) {
2315         is_snapshot_supported_ = false;
2316       }
2317 
2318       cfd->set_initialized();
2319 
2320       *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
2321       ROCKS_LOG_INFO(immutable_db_options_.info_log,
2322                      "Created column family [%s] (ID %u)",
2323                      column_family_name.c_str(), (unsigned)cfd->GetID());
2324     } else {
2325       ROCKS_LOG_ERROR(immutable_db_options_.info_log,
2326                       "Creating column family [%s] FAILED -- %s",
2327                       column_family_name.c_str(), s.ToString().c_str());
2328     }
2329   }  // InstrumentedMutexLock l(&mutex_)
2330 
2331   sv_context.Clean();
2332   // this is outside the mutex
2333   if (s.ok()) {
2334     NewThreadStatusCfInfo(
2335         reinterpret_cast<ColumnFamilyHandleImpl*>(*handle)->cfd());
2336   }
2337   return s;
2338 }
2339 
DropColumnFamily(ColumnFamilyHandle * column_family)2340 Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
2341   assert(column_family != nullptr);
2342   Status s = DropColumnFamilyImpl(column_family);
2343   if (s.ok()) {
2344     s = WriteOptionsFile(true /*need_mutex_lock*/,
2345                          true /*need_enter_write_thread*/);
2346   }
2347   return s;
2348 }
2349 
DropColumnFamilies(const std::vector<ColumnFamilyHandle * > & column_families)2350 Status DBImpl::DropColumnFamilies(
2351     const std::vector<ColumnFamilyHandle*>& column_families) {
2352   Status s;
2353   bool success_once = false;
2354   for (auto* handle : column_families) {
2355     s = DropColumnFamilyImpl(handle);
2356     if (!s.ok()) {
2357       break;
2358     }
2359     success_once = true;
2360   }
2361   if (success_once) {
2362     Status persist_options_status = WriteOptionsFile(
2363         true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
2364     if (s.ok() && !persist_options_status.ok()) {
2365       s = persist_options_status;
2366     }
2367   }
2368   return s;
2369 }
2370 
// Drops the column family referenced by `column_family`.
//
// Returns InvalidArgument for the column family with ID 0 (the default one)
// and for a column family that is already dropped; otherwise returns the
// status of recording the drop through LogAndApply. This function does not
// rewrite the options file; its callers do.
Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();
  if (cfd->GetID() == 0) {
    return Status::InvalidArgument("Can't drop default column family");
  }

  // Remember whether this column family supports snapshots; note that this
  // is read before mutex_ is acquired below.
  bool cf_support_snapshot = cfd->mem()->IsSnapshotSupported();

  VersionEdit edit;
  edit.DropColumnFamily();
  edit.SetColumnFamily(cfd->GetID());

  Status s;
  {
    InstrumentedMutexLock l(&mutex_);
    if (cfd->IsDropped()) {
      s = Status::InvalidArgument("Column family already dropped!\n");
    }
    if (s.ok()) {
      // we drop column family from a single write thread
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit,
                                 &mutex_);
      write_thread_.ExitUnbatched(&w);
    }
    if (s.ok()) {
      // The dropped column family's write buffers no longer count towards
      // the total in-memory state.
      auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
      max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size *
                                    mutable_cf_options->max_write_buffer_number;
    }

    // Note: this block and the SignalAll() below run whether or not the drop
    // succeeded.
    if (!cf_support_snapshot) {
      // Dropped Column Family doesn't support snapshot. Need to recalculate
      // is_snapshot_supported_.
      bool new_is_snapshot_supported = true;
      for (auto c : *versions_->GetColumnFamilySet()) {
        if (!c->IsDropped() && !c->mem()->IsSnapshotSupported()) {
          new_is_snapshot_supported = false;
          break;
        }
      }
      is_snapshot_supported_ = new_is_snapshot_supported;
    }
    bg_cv_.SignalAll();
  }

  // From here on mutex_ is no longer held.
  if (s.ok()) {
    // Note that here we erase the associated cf_info of the to-be-dropped
    // cfd before its ref-count goes to zero to avoid having to erase cf_info
    // later inside db_mutex.
    EraseThreadStatusCfInfo(cfd);
    assert(cfd->IsDropped());
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "Dropped column family with id %u\n", cfd->GetID());
  } else {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                    "Dropping column family with id %u FAILED -- %s\n",
                    cfd->GetID(), s.ToString().c_str());
  }

  return s;
}
2435 
KeyMayExist(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,std::string * value,bool * value_found)2436 bool DBImpl::KeyMayExist(const ReadOptions& read_options,
2437                          ColumnFamilyHandle* column_family, const Slice& key,
2438                          std::string* value, bool* value_found) {
2439   assert(value != nullptr);
2440   if (value_found != nullptr) {
2441     // falsify later if key-may-exist but can't fetch value
2442     *value_found = true;
2443   }
2444   ReadOptions roptions = read_options;
2445   roptions.read_tier = kBlockCacheTier;  // read from block cache only
2446   PinnableSlice pinnable_val;
2447   GetImplOptions get_impl_options;
2448   get_impl_options.column_family = column_family;
2449   get_impl_options.value = &pinnable_val;
2450   get_impl_options.value_found = value_found;
2451   auto s = GetImpl(roptions, key, get_impl_options);
2452   value->assign(pinnable_val.data(), pinnable_val.size());
2453 
2454   // If block_cache is enabled and the index block of the table didn't
2455   // not present in block_cache, the return value will be Status::Incomplete.
2456   // In this case, key may still exist in the table.
2457   return s.ok() || s.IsIncomplete();
2458 }
2459 
NewIterator(const ReadOptions & read_options,ColumnFamilyHandle * column_family)2460 Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
2461                               ColumnFamilyHandle* column_family) {
2462   if (read_options.managed) {
2463     return NewErrorIterator(
2464         Status::NotSupported("Managed iterator is not supported anymore."));
2465   }
2466   Iterator* result = nullptr;
2467   if (read_options.read_tier == kPersistedTier) {
2468     return NewErrorIterator(Status::NotSupported(
2469         "ReadTier::kPersistedData is not yet supported in iterators."));
2470   }
2471   // if iterator wants internal keys, we can only proceed if
2472   // we can guarantee the deletes haven't been processed yet
2473   if (immutable_db_options_.preserve_deletes &&
2474       read_options.iter_start_seqnum > 0 &&
2475       read_options.iter_start_seqnum < preserve_deletes_seqnum_.load()) {
2476     return NewErrorIterator(Status::InvalidArgument(
2477         "Iterator requested internal keys which are too old and are not"
2478         " guaranteed to be preserved, try larger iter_start_seqnum opt."));
2479   }
2480   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
2481   auto cfd = cfh->cfd();
2482   ReadCallback* read_callback = nullptr;  // No read callback provided.
2483   if (read_options.tailing) {
2484 #ifdef ROCKSDB_LITE
2485     // not supported in lite version
2486     result = nullptr;
2487 
2488 #else
2489     SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
2490     auto iter = new ForwardIterator(this, read_options, cfd, sv);
2491     result = NewDBIterator(
2492         env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
2493         cfd->user_comparator(), iter, kMaxSequenceNumber,
2494         sv->mutable_cf_options.max_sequential_skip_in_iterations, read_callback,
2495         this, cfd);
2496 #endif
2497   } else {
2498     // Note: no need to consider the special case of
2499     // last_seq_same_as_publish_seq_==false since NewIterator is overridden in
2500     // WritePreparedTxnDB
2501     auto snapshot = read_options.snapshot != nullptr
2502                         ? read_options.snapshot->GetSequenceNumber()
2503                         : versions_->LastSequence();
2504     result = NewIteratorImpl(read_options, cfd, snapshot, read_callback);
2505   }
2506   return result;
2507 }
2508 
NewIteratorImpl(const ReadOptions & read_options,ColumnFamilyData * cfd,SequenceNumber snapshot,ReadCallback * read_callback,bool allow_blob,bool allow_refresh)2509 ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
2510                                             ColumnFamilyData* cfd,
2511                                             SequenceNumber snapshot,
2512                                             ReadCallback* read_callback,
2513                                             bool allow_blob,
2514                                             bool allow_refresh) {
2515   SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
2516 
2517   // Try to generate a DB iterator tree in continuous memory area to be
2518   // cache friendly. Here is an example of result:
2519   // +-------------------------------+
2520   // |                               |
2521   // | ArenaWrappedDBIter            |
2522   // |  +                            |
2523   // |  +---> Inner Iterator   ------------+
2524   // |  |                            |     |
2525   // |  |    +-- -- -- -- -- -- -- --+     |
2526   // |  +--- | Arena                 |     |
2527   // |       |                       |     |
2528   // |          Allocated Memory:    |     |
2529   // |       |   +-------------------+     |
2530   // |       |   | DBIter            | <---+
2531   // |           |  +                |
2532   // |       |   |  +-> iter_  ------------+
2533   // |       |   |                   |     |
2534   // |       |   +-------------------+     |
2535   // |       |   | MergingIterator   | <---+
2536   // |           |  +                |
2537   // |       |   |  +->child iter1  ------------+
2538   // |       |   |  |                |          |
2539   // |           |  +->child iter2  ----------+ |
2540   // |       |   |  |                |        | |
2541   // |       |   |  +->child iter3  --------+ | |
2542   // |           |                   |      | | |
2543   // |       |   +-------------------+      | | |
2544   // |       |   | Iterator1         | <--------+
2545   // |       |   +-------------------+      | |
2546   // |       |   | Iterator2         | <------+
2547   // |       |   +-------------------+      |
2548   // |       |   | Iterator3         | <----+
2549   // |       |   +-------------------+
2550   // |       |                       |
2551   // +-------+-----------------------+
2552   //
2553   // ArenaWrappedDBIter inlines an arena area where all the iterators in
2554   // the iterator tree are allocated in the order of being accessed when
2555   // querying.
2556   // Laying out the iterators in the order of being accessed makes it more
2557   // likely that any iterator pointer is close to the iterator it points to so
2558   // that they are likely to be in the same cache line and/or page.
2559   ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
2560       env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, snapshot,
2561       sv->mutable_cf_options.max_sequential_skip_in_iterations,
2562       sv->version_number, read_callback, this, cfd, allow_blob,
2563       ((read_options.snapshot != nullptr) ? false : allow_refresh));
2564 
2565   InternalIterator* internal_iter =
2566       NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
2567                           db_iter->GetRangeDelAggregator(), snapshot);
2568   db_iter->SetIterUnderDBIter(internal_iter);
2569 
2570   return db_iter;
2571 }
2572 
NewIterators(const ReadOptions & read_options,const std::vector<ColumnFamilyHandle * > & column_families,std::vector<Iterator * > * iterators)2573 Status DBImpl::NewIterators(
2574     const ReadOptions& read_options,
2575     const std::vector<ColumnFamilyHandle*>& column_families,
2576     std::vector<Iterator*>* iterators) {
2577   if (read_options.managed) {
2578     return Status::NotSupported("Managed iterator is not supported anymore.");
2579   }
2580   if (read_options.read_tier == kPersistedTier) {
2581     return Status::NotSupported(
2582         "ReadTier::kPersistedData is not yet supported in iterators.");
2583   }
2584   ReadCallback* read_callback = nullptr;  // No read callback provided.
2585   iterators->clear();
2586   iterators->reserve(column_families.size());
2587   if (read_options.tailing) {
2588 #ifdef ROCKSDB_LITE
2589     return Status::InvalidArgument(
2590         "Tailing iterator not supported in RocksDB lite");
2591 #else
2592     for (auto cfh : column_families) {
2593       auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
2594       SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
2595       auto iter = new ForwardIterator(this, read_options, cfd, sv);
2596       iterators->push_back(NewDBIterator(
2597           env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
2598           cfd->user_comparator(), iter, kMaxSequenceNumber,
2599           sv->mutable_cf_options.max_sequential_skip_in_iterations,
2600           read_callback, this, cfd));
2601     }
2602 #endif
2603   } else {
2604     // Note: no need to consider the special case of
2605     // last_seq_same_as_publish_seq_==false since NewIterators is overridden in
2606     // WritePreparedTxnDB
2607     auto snapshot = read_options.snapshot != nullptr
2608                         ? read_options.snapshot->GetSequenceNumber()
2609                         : versions_->LastSequence();
2610     for (size_t i = 0; i < column_families.size(); ++i) {
2611       auto* cfd =
2612           reinterpret_cast<ColumnFamilyHandleImpl*>(column_families[i])->cfd();
2613       iterators->push_back(
2614           NewIteratorImpl(read_options, cfd, snapshot, read_callback));
2615     }
2616   }
2617 
2618   return Status::OK();
2619 }
2620 
GetSnapshot()2621 const Snapshot* DBImpl::GetSnapshot() { return GetSnapshotImpl(false); }
2622 
2623 #ifndef ROCKSDB_LITE
GetSnapshotForWriteConflictBoundary()2624 const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() {
2625   return GetSnapshotImpl(true);
2626 }
2627 #endif  // ROCKSDB_LITE
2628 
GetSnapshotImpl(bool is_write_conflict_boundary,bool lock)2629 SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
2630                                       bool lock) {
2631   int64_t unix_time = 0;
2632   env_->GetCurrentTime(&unix_time);  // Ignore error
2633   SnapshotImpl* s = new SnapshotImpl;
2634 
2635   if (lock) {
2636     mutex_.Lock();
2637   }
2638   // returns null if the underlying memtable does not support snapshot.
2639   if (!is_snapshot_supported_) {
2640     if (lock) {
2641       mutex_.Unlock();
2642     }
2643     delete s;
2644     return nullptr;
2645   }
2646   auto snapshot_seq = last_seq_same_as_publish_seq_
2647                           ? versions_->LastSequence()
2648                           : versions_->LastPublishedSequence();
2649   SnapshotImpl* snapshot =
2650       snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary);
2651   if (lock) {
2652     mutex_.Unlock();
2653   }
2654   return snapshot;
2655 }
2656 
2657 namespace {
2658 typedef autovector<ColumnFamilyData*, 2> CfdList;
CfdListContains(const CfdList & list,ColumnFamilyData * cfd)2659 bool CfdListContains(const CfdList& list, ColumnFamilyData* cfd) {
2660   for (const ColumnFamilyData* t : list) {
2661     if (t == cfd) {
2662       return true;
2663     }
2664   }
2665   return false;
2666 }
2667 }  //  namespace
2668 
// Removes snapshot `s` from the snapshot list and frees it.
//
// While holding mutex_, it also computes the oldest sequence number that is
// still pinned by a snapshot (or the latest sequence number when no snapshot
// is left). If that value exceeds bottommost_files_mark_threshold_, every
// column family's storage info is told about it, compactions are scheduled
// for column families that end up with bottommost files marked for
// compaction, and the global threshold is recomputed.
void DBImpl::ReleaseSnapshot(const Snapshot* s) {
  const SnapshotImpl* casted_s = reinterpret_cast<const SnapshotImpl*>(s);
  {
    InstrumentedMutexLock l(&mutex_);
    snapshots_.Delete(casted_s);
    uint64_t oldest_snapshot;
    if (snapshots_.empty()) {
      // No snapshot left: use the latest (published) sequence number.
      oldest_snapshot = last_seq_same_as_publish_seq_
                            ? versions_->LastSequence()
                            : versions_->LastPublishedSequence();
    } else {
      oldest_snapshot = snapshots_.oldest()->number_;
    }
    // Avoid going through every column family by checking a global threshold
    // first.
    if (oldest_snapshot > bottommost_files_mark_threshold_) {
      // Column families for which a compaction was scheduled in this call.
      CfdList cf_scheduled;
      for (auto* cfd : *versions_->GetColumnFamilySet()) {
        cfd->current()->storage_info()->UpdateOldestSnapshot(oldest_snapshot);
        if (!cfd->current()
                 ->storage_info()
                 ->BottommostFilesMarkedForCompaction()
                 .empty()) {
          SchedulePendingCompaction(cfd);
          MaybeScheduleFlushOrCompaction();
          cf_scheduled.push_back(cfd);
        }
      }

      // Calculate a new threshold, skipping those CFs where compactions are
      // scheduled. We do not do the same pass as the previous loop because
      // mutex might be unlocked during the loop, making the result inaccurate.
      SequenceNumber new_bottommost_files_mark_threshold = kMaxSequenceNumber;
      for (auto* cfd : *versions_->GetColumnFamilySet()) {
        if (CfdListContains(cf_scheduled, cfd)) {
          continue;
        }
        new_bottommost_files_mark_threshold = std::min(
            new_bottommost_files_mark_threshold,
            cfd->current()->storage_info()->bottommost_files_mark_threshold());
      }
      bottommost_files_mark_threshold_ = new_bottommost_files_mark_threshold;
    }
  }
  // The snapshot object itself is freed after mutex_ has been released.
  delete casted_s;
}
2715 
2716 #ifndef ROCKSDB_LITE
GetPropertiesOfAllTables(ColumnFamilyHandle * column_family,TablePropertiesCollection * props)2717 Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
2718                                         TablePropertiesCollection* props) {
2719   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
2720   auto cfd = cfh->cfd();
2721 
2722   // Increment the ref count
2723   mutex_.Lock();
2724   auto version = cfd->current();
2725   version->Ref();
2726   mutex_.Unlock();
2727 
2728   auto s = version->GetPropertiesOfAllTables(props);
2729 
2730   // Decrement the ref count
2731   mutex_.Lock();
2732   version->Unref();
2733   mutex_.Unlock();
2734 
2735   return s;
2736 }
2737 
GetPropertiesOfTablesInRange(ColumnFamilyHandle * column_family,const Range * range,std::size_t n,TablePropertiesCollection * props)2738 Status DBImpl::GetPropertiesOfTablesInRange(ColumnFamilyHandle* column_family,
2739                                             const Range* range, std::size_t n,
2740                                             TablePropertiesCollection* props) {
2741   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
2742   auto cfd = cfh->cfd();
2743 
2744   // Increment the ref count
2745   mutex_.Lock();
2746   auto version = cfd->current();
2747   version->Ref();
2748   mutex_.Unlock();
2749 
2750   auto s = version->GetPropertiesOfTablesInRange(range, n, props);
2751 
2752   // Decrement the ref count
2753   mutex_.Lock();
2754   version->Unref();
2755   mutex_.Unlock();
2756 
2757   return s;
2758 }
2759 
2760 #endif  // ROCKSDB_LITE
2761 
GetName() const2762 const std::string& DBImpl::GetName() const { return dbname_; }
2763 
GetEnv() const2764 Env* DBImpl::GetEnv() const { return env_; }
2765 
GetFileSystem() const2766 FileSystem* DB::GetFileSystem() const {
2767   static LegacyFileSystemWrapper fs_wrap(GetEnv());
2768   return &fs_wrap;
2769 }
2770 
GetFileSystem() const2771 FileSystem* DBImpl::GetFileSystem() const {
2772   return immutable_db_options_.fs.get();
2773 }
2774 
GetOptions(ColumnFamilyHandle * column_family) const2775 Options DBImpl::GetOptions(ColumnFamilyHandle* column_family) const {
2776   InstrumentedMutexLock l(&mutex_);
2777   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
2778   return Options(BuildDBOptions(immutable_db_options_, mutable_db_options_),
2779                  cfh->cfd()->GetLatestCFOptions());
2780 }
2781 
GetDBOptions() const2782 DBOptions DBImpl::GetDBOptions() const {
2783   InstrumentedMutexLock l(&mutex_);
2784   return BuildDBOptions(immutable_db_options_, mutable_db_options_);
2785 }
2786 
GetProperty(ColumnFamilyHandle * column_family,const Slice & property,std::string * value)2787 bool DBImpl::GetProperty(ColumnFamilyHandle* column_family,
2788                          const Slice& property, std::string* value) {
2789   const DBPropertyInfo* property_info = GetPropertyInfo(property);
2790   value->clear();
2791   auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
2792   if (property_info == nullptr) {
2793     return false;
2794   } else if (property_info->handle_int) {
2795     uint64_t int_value;
2796     bool ret_value =
2797         GetIntPropertyInternal(cfd, *property_info, false, &int_value);
2798     if (ret_value) {
2799       *value = ToString(int_value);
2800     }
2801     return ret_value;
2802   } else if (property_info->handle_string) {
2803     InstrumentedMutexLock l(&mutex_);
2804     return cfd->internal_stats()->GetStringProperty(*property_info, property,
2805                                                     value);
2806   } else if (property_info->handle_string_dbimpl) {
2807     std::string tmp_value;
2808     bool ret_value = (this->*(property_info->handle_string_dbimpl))(&tmp_value);
2809     if (ret_value) {
2810       *value = tmp_value;
2811     }
2812     return ret_value;
2813   }
2814   // Shouldn't reach here since exactly one of handle_string and handle_int
2815   // should be non-nullptr.
2816   assert(false);
2817   return false;
2818 }
2819 
GetMapProperty(ColumnFamilyHandle * column_family,const Slice & property,std::map<std::string,std::string> * value)2820 bool DBImpl::GetMapProperty(ColumnFamilyHandle* column_family,
2821                             const Slice& property,
2822                             std::map<std::string, std::string>* value) {
2823   const DBPropertyInfo* property_info = GetPropertyInfo(property);
2824   value->clear();
2825   auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
2826   if (property_info == nullptr) {
2827     return false;
2828   } else if (property_info->handle_map) {
2829     InstrumentedMutexLock l(&mutex_);
2830     return cfd->internal_stats()->GetMapProperty(*property_info, property,
2831                                                  value);
2832   }
2833   // If we reach this point it means that handle_map is not provided for the
2834   // requested property
2835   return false;
2836 }
2837 
GetIntProperty(ColumnFamilyHandle * column_family,const Slice & property,uint64_t * value)2838 bool DBImpl::GetIntProperty(ColumnFamilyHandle* column_family,
2839                             const Slice& property, uint64_t* value) {
2840   const DBPropertyInfo* property_info = GetPropertyInfo(property);
2841   if (property_info == nullptr || property_info->handle_int == nullptr) {
2842     return false;
2843   }
2844   auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
2845   return GetIntPropertyInternal(cfd, *property_info, false, value);
2846 }
2847 
GetIntPropertyInternal(ColumnFamilyData * cfd,const DBPropertyInfo & property_info,bool is_locked,uint64_t * value)2848 bool DBImpl::GetIntPropertyInternal(ColumnFamilyData* cfd,
2849                                     const DBPropertyInfo& property_info,
2850                                     bool is_locked, uint64_t* value) {
2851   assert(property_info.handle_int != nullptr);
2852   if (!property_info.need_out_of_mutex) {
2853     if (is_locked) {
2854       mutex_.AssertHeld();
2855       return cfd->internal_stats()->GetIntProperty(property_info, value, this);
2856     } else {
2857       InstrumentedMutexLock l(&mutex_);
2858       return cfd->internal_stats()->GetIntProperty(property_info, value, this);
2859     }
2860   } else {
2861     SuperVersion* sv = nullptr;
2862     if (!is_locked) {
2863       sv = GetAndRefSuperVersion(cfd);
2864     } else {
2865       sv = cfd->GetSuperVersion();
2866     }
2867 
2868     bool ret = cfd->internal_stats()->GetIntPropertyOutOfMutex(
2869         property_info, sv->current, value);
2870 
2871     if (!is_locked) {
2872       ReturnAndCleanupSuperVersion(cfd, sv);
2873     }
2874 
2875     return ret;
2876   }
2877 }
2878 
GetPropertyHandleOptionsStatistics(std::string * value)2879 bool DBImpl::GetPropertyHandleOptionsStatistics(std::string* value) {
2880   assert(value != nullptr);
2881   Statistics* statistics = immutable_db_options_.statistics.get();
2882   if (!statistics) {
2883     return false;
2884   }
2885   *value = statistics->ToString();
2886   return true;
2887 }
2888 
2889 #ifndef ROCKSDB_LITE
ResetStats()2890 Status DBImpl::ResetStats() {
2891   InstrumentedMutexLock l(&mutex_);
2892   for (auto* cfd : *versions_->GetColumnFamilySet()) {
2893     if (cfd->initialized()) {
2894       cfd->internal_stats()->Clear();
2895     }
2896   }
2897   return Status::OK();
2898 }
2899 #endif  // ROCKSDB_LITE
2900 
GetAggregatedIntProperty(const Slice & property,uint64_t * aggregated_value)2901 bool DBImpl::GetAggregatedIntProperty(const Slice& property,
2902                                       uint64_t* aggregated_value) {
2903   const DBPropertyInfo* property_info = GetPropertyInfo(property);
2904   if (property_info == nullptr || property_info->handle_int == nullptr) {
2905     return false;
2906   }
2907 
2908   uint64_t sum = 0;
2909   {
2910     // Needs mutex to protect the list of column families.
2911     InstrumentedMutexLock l(&mutex_);
2912     uint64_t value;
2913     for (auto* cfd : *versions_->GetColumnFamilySet()) {
2914       if (!cfd->initialized()) {
2915         continue;
2916       }
2917       if (GetIntPropertyInternal(cfd, *property_info, true, &value)) {
2918         sum += value;
2919       } else {
2920         return false;
2921       }
2922     }
2923   }
2924   *aggregated_value = sum;
2925   return true;
2926 }
2927 
GetAndRefSuperVersion(ColumnFamilyData * cfd)2928 SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) {
2929   // TODO(ljin): consider using GetReferencedSuperVersion() directly
2930   return cfd->GetThreadLocalSuperVersion(this);
2931 }
2932 
2933 // REQUIRED: this function should only be called on the write thread or if the
2934 // mutex is held.
GetAndRefSuperVersion(uint32_t column_family_id)2935 SuperVersion* DBImpl::GetAndRefSuperVersion(uint32_t column_family_id) {
2936   auto column_family_set = versions_->GetColumnFamilySet();
2937   auto cfd = column_family_set->GetColumnFamily(column_family_id);
2938   if (!cfd) {
2939     return nullptr;
2940   }
2941 
2942   return GetAndRefSuperVersion(cfd);
2943 }
2944 
CleanupSuperVersion(SuperVersion * sv)2945 void DBImpl::CleanupSuperVersion(SuperVersion* sv) {
2946   // Release SuperVersion
2947   if (sv->Unref()) {
2948     bool defer_purge =
2949             immutable_db_options().avoid_unnecessary_blocking_io;
2950     {
2951       InstrumentedMutexLock l(&mutex_);
2952       sv->Cleanup();
2953       if (defer_purge) {
2954         AddSuperVersionsToFreeQueue(sv);
2955         SchedulePurge();
2956       }
2957     }
2958     if (!defer_purge) {
2959       delete sv;
2960     }
2961     RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS);
2962   }
2963   RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES);
2964 }
2965 
ReturnAndCleanupSuperVersion(ColumnFamilyData * cfd,SuperVersion * sv)2966 void DBImpl::ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd,
2967                                           SuperVersion* sv) {
2968   if (!cfd->ReturnThreadLocalSuperVersion(sv)) {
2969     CleanupSuperVersion(sv);
2970   }
2971 }
2972 
2973 // REQUIRED: this function should only be called on the write thread.
ReturnAndCleanupSuperVersion(uint32_t column_family_id,SuperVersion * sv)2974 void DBImpl::ReturnAndCleanupSuperVersion(uint32_t column_family_id,
2975                                           SuperVersion* sv) {
2976   auto column_family_set = versions_->GetColumnFamilySet();
2977   auto cfd = column_family_set->GetColumnFamily(column_family_id);
2978 
2979   // If SuperVersion is held, and we successfully fetched a cfd using
2980   // GetAndRefSuperVersion(), it must still exist.
2981   assert(cfd != nullptr);
2982   ReturnAndCleanupSuperVersion(cfd, sv);
2983 }
2984 
2985 // REQUIRED: this function should only be called on the write thread or if the
2986 // mutex is held.
GetColumnFamilyHandle(uint32_t column_family_id)2987 ColumnFamilyHandle* DBImpl::GetColumnFamilyHandle(uint32_t column_family_id) {
2988   ColumnFamilyMemTables* cf_memtables = column_family_memtables_.get();
2989 
2990   if (!cf_memtables->Seek(column_family_id)) {
2991     return nullptr;
2992   }
2993 
2994   return cf_memtables->GetColumnFamilyHandle();
2995 }
2996 
2997 // REQUIRED: mutex is NOT held.
GetColumnFamilyHandleUnlocked(uint32_t column_family_id)2998 std::unique_ptr<ColumnFamilyHandle> DBImpl::GetColumnFamilyHandleUnlocked(
2999     uint32_t column_family_id) {
3000   InstrumentedMutexLock l(&mutex_);
3001 
3002   auto* cfd =
3003       versions_->GetColumnFamilySet()->GetColumnFamily(column_family_id);
3004   if (cfd == nullptr) {
3005     return nullptr;
3006   }
3007 
3008   return std::unique_ptr<ColumnFamilyHandleImpl>(
3009       new ColumnFamilyHandleImpl(cfd, this, &mutex_));
3010 }
3011 
GetApproximateMemTableStats(ColumnFamilyHandle * column_family,const Range & range,uint64_t * const count,uint64_t * const size)3012 void DBImpl::GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
3013                                          const Range& range,
3014                                          uint64_t* const count,
3015                                          uint64_t* const size) {
3016   ColumnFamilyHandleImpl* cfh =
3017       reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
3018   ColumnFamilyData* cfd = cfh->cfd();
3019   SuperVersion* sv = GetAndRefSuperVersion(cfd);
3020 
3021   // Convert user_key into a corresponding internal key.
3022   InternalKey k1(range.start, kMaxSequenceNumber, kValueTypeForSeek);
3023   InternalKey k2(range.limit, kMaxSequenceNumber, kValueTypeForSeek);
3024   MemTable::MemTableStats memStats =
3025       sv->mem->ApproximateStats(k1.Encode(), k2.Encode());
3026   MemTable::MemTableStats immStats =
3027       sv->imm->ApproximateStats(k1.Encode(), k2.Encode());
3028   *count = memStats.count + immStats.count;
3029   *size = memStats.size + immStats.size;
3030 
3031   ReturnAndCleanupSuperVersion(cfd, sv);
3032 }
3033 
GetApproximateSizes(const SizeApproximationOptions & options,ColumnFamilyHandle * column_family,const Range * range,int n,uint64_t * sizes)3034 Status DBImpl::GetApproximateSizes(const SizeApproximationOptions& options,
3035                                    ColumnFamilyHandle* column_family,
3036                                    const Range* range, int n, uint64_t* sizes) {
3037   if (!options.include_memtabtles && !options.include_files) {
3038     return Status::InvalidArgument("Invalid options");
3039   }
3040 
3041   Version* v;
3042   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
3043   auto cfd = cfh->cfd();
3044   SuperVersion* sv = GetAndRefSuperVersion(cfd);
3045   v = sv->current;
3046 
3047   for (int i = 0; i < n; i++) {
3048     // Convert user_key into a corresponding internal key.
3049     InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
3050     InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
3051     sizes[i] = 0;
3052     if (options.include_files) {
3053       sizes[i] += versions_->ApproximateSize(
3054           options, v, k1.Encode(), k2.Encode(), /*start_level=*/0,
3055           /*end_level=*/-1, TableReaderCaller::kUserApproximateSize);
3056     }
3057     if (options.include_memtabtles) {
3058       sizes[i] += sv->mem->ApproximateStats(k1.Encode(), k2.Encode()).size;
3059       sizes[i] += sv->imm->ApproximateStats(k1.Encode(), k2.Encode()).size;
3060     }
3061   }
3062 
3063   ReturnAndCleanupSuperVersion(cfd, sv);
3064   return Status::OK();
3065 }
3066 
3067 std::list<uint64_t>::iterator
CaptureCurrentFileNumberInPendingOutputs()3068 DBImpl::CaptureCurrentFileNumberInPendingOutputs() {
3069   // We need to remember the iterator of our insert, because after the
3070   // background job is done, we need to remove that element from
3071   // pending_outputs_.
3072   pending_outputs_.push_back(versions_->current_next_file_number());
3073   auto pending_outputs_inserted_elem = pending_outputs_.end();
3074   --pending_outputs_inserted_elem;
3075   return pending_outputs_inserted_elem;
3076 }
3077 
ReleaseFileNumberFromPendingOutputs(std::unique_ptr<std::list<uint64_t>::iterator> & v)3078 void DBImpl::ReleaseFileNumberFromPendingOutputs(
3079     std::unique_ptr<std::list<uint64_t>::iterator>& v) {
3080   if (v.get() != nullptr) {
3081     pending_outputs_.erase(*v.get());
3082     v.reset();
3083   }
3084 }
3085 
3086 #ifndef ROCKSDB_LITE
GetUpdatesSince(SequenceNumber seq,std::unique_ptr<TransactionLogIterator> * iter,const TransactionLogIterator::ReadOptions & read_options)3087 Status DBImpl::GetUpdatesSince(
3088     SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
3089     const TransactionLogIterator::ReadOptions& read_options) {
3090   RecordTick(stats_, GET_UPDATES_SINCE_CALLS);
3091   if (seq > versions_->LastSequence()) {
3092     return Status::NotFound("Requested sequence not yet written in the db");
3093   }
3094   return wal_manager_.GetUpdatesSince(seq, iter, read_options, versions_.get());
3095 }
3096 
DeleteFile(std::string name)3097 Status DBImpl::DeleteFile(std::string name) {
3098   uint64_t number;
3099   FileType type;
3100   WalFileType log_type;
3101   if (!ParseFileName(name, &number, &type, &log_type) ||
3102       (type != kTableFile && type != kLogFile)) {
3103     ROCKS_LOG_ERROR(immutable_db_options_.info_log, "DeleteFile %s failed.\n",
3104                     name.c_str());
3105     return Status::InvalidArgument("Invalid file name");
3106   }
3107 
3108   Status status;
3109   if (type == kLogFile) {
3110     // Only allow deleting archived log files
3111     if (log_type != kArchivedLogFile) {
3112       ROCKS_LOG_ERROR(immutable_db_options_.info_log,
3113                       "DeleteFile %s failed - not archived log.\n",
3114                       name.c_str());
3115       return Status::NotSupported("Delete only supported for archived logs");
3116     }
3117     status = wal_manager_.DeleteFile(name, number);
3118     if (!status.ok()) {
3119       ROCKS_LOG_ERROR(immutable_db_options_.info_log,
3120                       "DeleteFile %s failed -- %s.\n", name.c_str(),
3121                       status.ToString().c_str());
3122     }
3123     return status;
3124   }
3125 
3126   int level;
3127   FileMetaData* metadata;
3128   ColumnFamilyData* cfd;
3129   VersionEdit edit;
3130   JobContext job_context(next_job_id_.fetch_add(1), true);
3131   {
3132     InstrumentedMutexLock l(&mutex_);
3133     status = versions_->GetMetadataForFile(number, &level, &metadata, &cfd);
3134     if (!status.ok()) {
3135       ROCKS_LOG_WARN(immutable_db_options_.info_log,
3136                      "DeleteFile %s failed. File not found\n", name.c_str());
3137       job_context.Clean();
3138       return Status::InvalidArgument("File not found");
3139     }
3140     assert(level < cfd->NumberLevels());
3141 
3142     // If the file is being compacted no need to delete.
3143     if (metadata->being_compacted) {
3144       ROCKS_LOG_INFO(immutable_db_options_.info_log,
3145                      "DeleteFile %s Skipped. File about to be compacted\n",
3146                      name.c_str());
3147       job_context.Clean();
3148       return Status::OK();
3149     }
3150 
3151     // Only the files in the last level can be deleted externally.
3152     // This is to make sure that any deletion tombstones are not
3153     // lost. Check that the level passed is the last level.
3154     auto* vstoreage = cfd->current()->storage_info();
3155     for (int i = level + 1; i < cfd->NumberLevels(); i++) {
3156       if (vstoreage->NumLevelFiles(i) != 0) {
3157         ROCKS_LOG_WARN(immutable_db_options_.info_log,
3158                        "DeleteFile %s FAILED. File not in last level\n",
3159                        name.c_str());
3160         job_context.Clean();
3161         return Status::InvalidArgument("File not in last level");
3162       }
3163     }
3164     // if level == 0, it has to be the oldest file
3165     if (level == 0 &&
3166         vstoreage->LevelFiles(0).back()->fd.GetNumber() != number) {
3167       ROCKS_LOG_WARN(immutable_db_options_.info_log,
3168                      "DeleteFile %s failed ---"
3169                      " target file in level 0 must be the oldest.",
3170                      name.c_str());
3171       job_context.Clean();
3172       return Status::InvalidArgument("File in level 0, but not oldest");
3173     }
3174     edit.SetColumnFamily(cfd->GetID());
3175     edit.DeleteFile(level, number);
3176     status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
3177                                     &edit, &mutex_, directories_.GetDbDir());
3178     if (status.ok()) {
3179       InstallSuperVersionAndScheduleWork(cfd,
3180                                          &job_context.superversion_contexts[0],
3181                                          *cfd->GetLatestMutableCFOptions());
3182     }
3183     FindObsoleteFiles(&job_context, false);
3184   }  // lock released here
3185 
3186   LogFlush(immutable_db_options_.info_log);
3187   // remove files outside the db-lock
3188   if (job_context.HaveSomethingToDelete()) {
3189     // Call PurgeObsoleteFiles() without holding mutex.
3190     PurgeObsoleteFiles(job_context);
3191   }
3192   job_context.Clean();
3193   return status;
3194 }
3195 
DeleteFilesInRanges(ColumnFamilyHandle * column_family,const RangePtr * ranges,size_t n,bool include_end)3196 Status DBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
3197                                    const RangePtr* ranges, size_t n,
3198                                    bool include_end) {
3199   Status status;
3200   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
3201   ColumnFamilyData* cfd = cfh->cfd();
3202   VersionEdit edit;
3203   std::set<FileMetaData*> deleted_files;
3204   JobContext job_context(next_job_id_.fetch_add(1), true);
3205   {
3206     InstrumentedMutexLock l(&mutex_);
3207     Version* input_version = cfd->current();
3208 
3209     auto* vstorage = input_version->storage_info();
3210     for (size_t r = 0; r < n; r++) {
3211       auto begin = ranges[r].start, end = ranges[r].limit;
3212       for (int i = 1; i < cfd->NumberLevels(); i++) {
3213         if (vstorage->LevelFiles(i).empty() ||
3214             !vstorage->OverlapInLevel(i, begin, end)) {
3215           continue;
3216         }
3217         std::vector<FileMetaData*> level_files;
3218         InternalKey begin_storage, end_storage, *begin_key, *end_key;
3219         if (begin == nullptr) {
3220           begin_key = nullptr;
3221         } else {
3222           begin_storage.SetMinPossibleForUserKey(*begin);
3223           begin_key = &begin_storage;
3224         }
3225         if (end == nullptr) {
3226           end_key = nullptr;
3227         } else {
3228           end_storage.SetMaxPossibleForUserKey(*end);
3229           end_key = &end_storage;
3230         }
3231 
3232         vstorage->GetCleanInputsWithinInterval(
3233             i, begin_key, end_key, &level_files, -1 /* hint_index */,
3234             nullptr /* file_index */);
3235         FileMetaData* level_file;
3236         for (uint32_t j = 0; j < level_files.size(); j++) {
3237           level_file = level_files[j];
3238           if (level_file->being_compacted) {
3239             continue;
3240           }
3241           if (deleted_files.find(level_file) != deleted_files.end()) {
3242             continue;
3243           }
3244           if (!include_end && end != nullptr &&
3245               cfd->user_comparator()->Compare(level_file->largest.user_key(),
3246                                               *end) == 0) {
3247             continue;
3248           }
3249           edit.SetColumnFamily(cfd->GetID());
3250           edit.DeleteFile(i, level_file->fd.GetNumber());
3251           deleted_files.insert(level_file);
3252           level_file->being_compacted = true;
3253         }
3254       }
3255     }
3256     if (edit.GetDeletedFiles().empty()) {
3257       job_context.Clean();
3258       return Status::OK();
3259     }
3260     input_version->Ref();
3261     status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
3262                                     &edit, &mutex_, directories_.GetDbDir());
3263     if (status.ok()) {
3264       InstallSuperVersionAndScheduleWork(cfd,
3265                                          &job_context.superversion_contexts[0],
3266                                          *cfd->GetLatestMutableCFOptions());
3267     }
3268     for (auto* deleted_file : deleted_files) {
3269       deleted_file->being_compacted = false;
3270     }
3271     input_version->Unref();
3272     FindObsoleteFiles(&job_context, false);
3273   }  // lock released here
3274 
3275   LogFlush(immutable_db_options_.info_log);
3276   // remove files outside the db-lock
3277   if (job_context.HaveSomethingToDelete()) {
3278     // Call PurgeObsoleteFiles() without holding mutex.
3279     PurgeObsoleteFiles(job_context);
3280   }
3281   job_context.Clean();
3282   return status;
3283 }
3284 
GetLiveFilesMetaData(std::vector<LiveFileMetaData> * metadata)3285 void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
3286   InstrumentedMutexLock l(&mutex_);
3287   versions_->GetLiveFilesMetaData(metadata);
3288 }
3289 
GetColumnFamilyMetaData(ColumnFamilyHandle * column_family,ColumnFamilyMetaData * cf_meta)3290 void DBImpl::GetColumnFamilyMetaData(ColumnFamilyHandle* column_family,
3291                                      ColumnFamilyMetaData* cf_meta) {
3292   assert(column_family);
3293   auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
3294   auto* sv = GetAndRefSuperVersion(cfd);
3295   {
3296     // Without mutex, Version::GetColumnFamilyMetaData will have data race with
3297     // Compaction::MarkFilesBeingCompacted. One solution is to use mutex, but
3298     // this may cause regression. An alternative is to make
3299     // FileMetaData::being_compacted atomic, but it will make FileMetaData
3300     // non-copy-able. Another option is to separate these variables from
3301     // original FileMetaData struct, and this requires re-organization of data
3302     // structures. For now, we take the easy approach. If
3303     // DB::GetColumnFamilyMetaData is not called frequently, the regression
3304     // should not be big. We still need to keep an eye on it.
3305     InstrumentedMutexLock l(&mutex_);
3306     sv->current->GetColumnFamilyMetaData(cf_meta);
3307   }
3308   ReturnAndCleanupSuperVersion(cfd, sv);
3309 }
3310 
3311 #endif  // ROCKSDB_LITE
3312 
CheckConsistency()3313 Status DBImpl::CheckConsistency() {
3314   mutex_.AssertHeld();
3315   std::vector<LiveFileMetaData> metadata;
3316   versions_->GetLiveFilesMetaData(&metadata);
3317   TEST_SYNC_POINT("DBImpl::CheckConsistency:AfterGetLiveFilesMetaData");
3318 
3319   std::string corruption_messages;
3320   for (const auto& md : metadata) {
3321     // md.name has a leading "/".
3322     std::string file_path = md.db_path + md.name;
3323 
3324     uint64_t fsize = 0;
3325     TEST_SYNC_POINT("DBImpl::CheckConsistency:BeforeGetFileSize");
3326     Status s = env_->GetFileSize(file_path, &fsize);
3327     if (!s.ok() &&
3328         env_->GetFileSize(Rocks2LevelTableFileName(file_path), &fsize).ok()) {
3329       s = Status::OK();
3330     }
3331     if (!s.ok()) {
3332       corruption_messages +=
3333           "Can't access " + md.name + ": " + s.ToString() + "\n";
3334     } else if (fsize != md.size) {
3335       corruption_messages += "Sst file size mismatch: " + file_path +
3336                              ". Size recorded in manifest " +
3337                              ToString(md.size) + ", actual size " +
3338                              ToString(fsize) + "\n";
3339     }
3340   }
3341   if (corruption_messages.size() == 0) {
3342     return Status::OK();
3343   } else {
3344     return Status::Corruption(corruption_messages);
3345   }
3346 }
3347 
GetDbIdentity(std::string & identity) const3348 Status DBImpl::GetDbIdentity(std::string& identity) const {
3349   identity.assign(db_id_);
3350   return Status::OK();
3351 }
3352 
GetDbIdentityFromIdentityFile(std::string * identity) const3353 Status DBImpl::GetDbIdentityFromIdentityFile(std::string* identity) const {
3354   std::string idfilename = IdentityFileName(dbname_);
3355   const FileOptions soptions;
3356   std::unique_ptr<SequentialFileReader> id_file_reader;
3357   Status s;
3358   {
3359     std::unique_ptr<FSSequentialFile> idfile;
3360     s = fs_->NewSequentialFile(idfilename, soptions, &idfile, nullptr);
3361     if (!s.ok()) {
3362       return s;
3363     }
3364     id_file_reader.reset(
3365         new SequentialFileReader(std::move(idfile), idfilename));
3366   }
3367 
3368   uint64_t file_size;
3369   s = fs_->GetFileSize(idfilename, IOOptions(), &file_size, nullptr);
3370   if (!s.ok()) {
3371     return s;
3372   }
3373   char* buffer =
3374       reinterpret_cast<char*>(alloca(static_cast<size_t>(file_size)));
3375   Slice id;
3376   s = id_file_reader->Read(static_cast<size_t>(file_size), &id, buffer);
3377   if (!s.ok()) {
3378     return s;
3379   }
3380   identity->assign(id.ToString());
3381   // If last character is '\n' remove it from identity
3382   if (identity->size() > 0 && identity->back() == '\n') {
3383     identity->pop_back();
3384   }
3385   return s;
3386 }
3387 
// Default implementation -- ignores every argument and returns a
// NotSupported status with an empty message.
// NOTE(review): presumably concrete DB implementations override this --
// confirm against the class declaration.
Status DB::CreateColumnFamily(const ColumnFamilyOptions& /*cf_options*/,
                              const std::string& /*column_family_name*/,
                              ColumnFamilyHandle** /*handle*/) {
  return Status::NotSupported("");
}
3394 
// Default implementation of the "one options object, many names" bulk
// creation call: ignores every argument and returns a NotSupported status
// with an empty message.
Status DB::CreateColumnFamilies(
    const ColumnFamilyOptions& /*cf_options*/,
    const std::vector<std::string>& /*column_family_names*/,
    std::vector<ColumnFamilyHandle*>* /*handles*/) {
  return Status::NotSupported("");
}
3401 
// Default implementation of the descriptor-based bulk creation call: ignores
// every argument and returns a NotSupported status with an empty message.
Status DB::CreateColumnFamilies(
    const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
    std::vector<ColumnFamilyHandle*>* /*handles*/) {
  return Status::NotSupported("");
}
3407 
// Default implementation: ignores the handle and returns a NotSupported
// status with an empty message.
Status DB::DropColumnFamily(ColumnFamilyHandle* /*column_family*/) {
  return Status::NotSupported("");
}
3411 
// Default implementation: ignores the handles and returns a NotSupported
// status with an empty message.
Status DB::DropColumnFamilies(
    const std::vector<ColumnFamilyHandle*>& /*column_families*/) {
  return Status::NotSupported("");
}
3416 
// Deletes `column_family` and returns OK. The pointer must not be used by the
// caller afterwards.
Status DB::DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) {
  delete column_family;
  return Status::OK();
}
3421 
~DB()3422 DB::~DB() {}
3423 
Close()3424 Status DBImpl::Close() {
3425   if (!closed_) {
3426     {
3427       InstrumentedMutexLock l(&mutex_);
3428       // If there is unreleased snapshot, fail the close call
3429       if (!snapshots_.empty()) {
3430         return Status::Aborted("Cannot close DB with unreleased snapshot.");
3431       }
3432     }
3433 
3434     closed_ = true;
3435     return CloseImpl();
3436   }
3437   return Status::OK();
3438 }
3439 
ListColumnFamilies(const DBOptions & db_options,const std::string & name,std::vector<std::string> * column_families)3440 Status DB::ListColumnFamilies(const DBOptions& db_options,
3441                               const std::string& name,
3442                               std::vector<std::string>* column_families) {
3443   FileSystem* fs = db_options.file_system.get();
3444   LegacyFileSystemWrapper legacy_fs(db_options.env);
3445   if (!fs) {
3446     fs = &legacy_fs;
3447   }
3448   return VersionSet::ListColumnFamilies(column_families, name, fs);
3449 }
3450 
~Snapshot()3451 Snapshot::~Snapshot() {}
3452 
DestroyDB(const std::string & dbname,const Options & options,const std::vector<ColumnFamilyDescriptor> & column_families)3453 Status DestroyDB(const std::string& dbname, const Options& options,
3454                  const std::vector<ColumnFamilyDescriptor>& column_families) {
3455   ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
3456   Env* env = soptions.env;
3457   std::vector<std::string> filenames;
3458   bool wal_in_db_path = IsWalDirSameAsDBPath(&soptions);
3459 
3460   // Reset the logger because it holds a handle to the
3461   // log file and prevents cleanup and directory removal
3462   soptions.info_log.reset();
3463   // Ignore error in case directory does not exist
3464   env->GetChildren(dbname, &filenames);
3465 
3466   FileLock* lock;
3467   const std::string lockname = LockFileName(dbname);
3468   Status result = env->LockFile(lockname, &lock);
3469   if (result.ok()) {
3470     uint64_t number;
3471     FileType type;
3472     InfoLogPrefix info_log_prefix(!soptions.db_log_dir.empty(), dbname);
3473     for (const auto& fname : filenames) {
3474       if (ParseFileName(fname, &number, info_log_prefix.prefix, &type) &&
3475           type != kDBLockFile) {  // Lock file will be deleted at end
3476         Status del;
3477         std::string path_to_delete = dbname + "/" + fname;
3478         if (type == kMetaDatabase) {
3479           del = DestroyDB(path_to_delete, options);
3480         } else if (type == kTableFile || type == kLogFile) {
3481           del = DeleteDBFile(&soptions, path_to_delete, dbname,
3482                              /*force_bg=*/false, /*force_fg=*/!wal_in_db_path);
3483         } else {
3484           del = env->DeleteFile(path_to_delete);
3485         }
3486         if (result.ok() && !del.ok()) {
3487           result = del;
3488         }
3489       }
3490     }
3491 
3492     std::vector<std::string> paths;
3493 
3494     for (const auto& path : options.db_paths) {
3495       paths.emplace_back(path.path);
3496     }
3497     for (const auto& cf : column_families) {
3498       for (const auto& path : cf.options.cf_paths) {
3499         paths.emplace_back(path.path);
3500       }
3501     }
3502 
3503     // Remove duplicate paths.
3504     // Note that we compare only the actual paths but not path ids.
3505     // This reason is that same path can appear at different path_ids
3506     // for different column families.
3507     std::sort(paths.begin(), paths.end());
3508     paths.erase(std::unique(paths.begin(), paths.end()), paths.end());
3509 
3510     for (const auto& path : paths) {
3511       if (env->GetChildren(path, &filenames).ok()) {
3512         for (const auto& fname : filenames) {
3513           if (ParseFileName(fname, &number, &type) &&
3514               type == kTableFile) {  // Lock file will be deleted at end
3515             std::string table_path = path + "/" + fname;
3516             Status del = DeleteDBFile(&soptions, table_path, dbname,
3517                                       /*force_bg=*/false, /*force_fg=*/false);
3518             if (result.ok() && !del.ok()) {
3519               result = del;
3520             }
3521           }
3522         }
3523         env->DeleteDir(path);
3524       }
3525     }
3526 
3527     std::vector<std::string> walDirFiles;
3528     std::string archivedir = ArchivalDirectory(dbname);
3529     bool wal_dir_exists = false;
3530     if (dbname != soptions.wal_dir) {
3531       wal_dir_exists = env->GetChildren(soptions.wal_dir, &walDirFiles).ok();
3532       archivedir = ArchivalDirectory(soptions.wal_dir);
3533     }
3534 
3535     // Archive dir may be inside wal dir or dbname and should be
3536     // processed and removed before those otherwise we have issues
3537     // removing them
3538     std::vector<std::string> archiveFiles;
3539     if (env->GetChildren(archivedir, &archiveFiles).ok()) {
3540       // Delete archival files.
3541       for (const auto& file : archiveFiles) {
3542         if (ParseFileName(file, &number, &type) && type == kLogFile) {
3543           Status del =
3544               DeleteDBFile(&soptions, archivedir + "/" + file, archivedir,
3545                            /*force_bg=*/false, /*force_fg=*/!wal_in_db_path);
3546           if (result.ok() && !del.ok()) {
3547             result = del;
3548           }
3549         }
3550       }
3551       env->DeleteDir(archivedir);
3552     }
3553 
3554     // Delete log files in the WAL dir
3555     if (wal_dir_exists) {
3556       for (const auto& file : walDirFiles) {
3557         if (ParseFileName(file, &number, &type) && type == kLogFile) {
3558           Status del =
3559               DeleteDBFile(&soptions, LogFileName(soptions.wal_dir, number),
3560                            soptions.wal_dir, /*force_bg=*/false,
3561                            /*force_fg=*/!wal_in_db_path);
3562           if (result.ok() && !del.ok()) {
3563             result = del;
3564           }
3565         }
3566       }
3567       env->DeleteDir(soptions.wal_dir);
3568     }
3569 
3570     env->UnlockFile(lock);  // Ignore error since state is already gone
3571     env->DeleteFile(lockname);
3572     env->DeleteDir(dbname);  // Ignore error in case dir contains other files
3573   }
3574   return result;
3575 }
3576 
WriteOptionsFile(bool need_mutex_lock,bool need_enter_write_thread)3577 Status DBImpl::WriteOptionsFile(bool need_mutex_lock,
3578                                 bool need_enter_write_thread) {
3579 #ifndef ROCKSDB_LITE
3580   WriteThread::Writer w;
3581   if (need_mutex_lock) {
3582     mutex_.Lock();
3583   } else {
3584     mutex_.AssertHeld();
3585   }
3586   if (need_enter_write_thread) {
3587     write_thread_.EnterUnbatched(&w, &mutex_);
3588   }
3589 
3590   std::vector<std::string> cf_names;
3591   std::vector<ColumnFamilyOptions> cf_opts;
3592 
3593   // This part requires mutex to protect the column family options
3594   for (auto cfd : *versions_->GetColumnFamilySet()) {
3595     if (cfd->IsDropped()) {
3596       continue;
3597     }
3598     cf_names.push_back(cfd->GetName());
3599     cf_opts.push_back(cfd->GetLatestCFOptions());
3600   }
3601 
3602   // Unlock during expensive operations.  New writes cannot get here
3603   // because the single write thread ensures all new writes get queued.
3604   DBOptions db_options =
3605       BuildDBOptions(immutable_db_options_, mutable_db_options_);
3606   mutex_.Unlock();
3607 
3608   TEST_SYNC_POINT("DBImpl::WriteOptionsFile:1");
3609   TEST_SYNC_POINT("DBImpl::WriteOptionsFile:2");
3610 
3611   std::string file_name =
3612       TempOptionsFileName(GetName(), versions_->NewFileNumber());
3613   Status s = PersistRocksDBOptions(db_options, cf_names, cf_opts, file_name,
3614                                    GetFileSystem());
3615 
3616   if (s.ok()) {
3617     s = RenameTempFileToOptionsFile(file_name);
3618   }
3619   // restore lock
3620   if (!need_mutex_lock) {
3621     mutex_.Lock();
3622   }
3623   if (need_enter_write_thread) {
3624     write_thread_.ExitUnbatched(&w);
3625   }
3626   if (!s.ok()) {
3627     ROCKS_LOG_WARN(immutable_db_options_.info_log,
3628                    "Unnable to persist options -- %s", s.ToString().c_str());
3629     if (immutable_db_options_.fail_if_options_file_error) {
3630       return Status::IOError("Unable to persist options.",
3631                              s.ToString().c_str());
3632     }
3633   }
3634 #else
3635   (void)need_mutex_lock;
3636   (void)need_enter_write_thread;
3637 #endif  // !ROCKSDB_LITE
3638   return Status::OK();
3639 }
3640 
3641 #ifndef ROCKSDB_LITE
3642 namespace {
DeleteOptionsFilesHelper(const std::map<uint64_t,std::string> & filenames,const size_t num_files_to_keep,const std::shared_ptr<Logger> & info_log,Env * env)3643 void DeleteOptionsFilesHelper(const std::map<uint64_t, std::string>& filenames,
3644                               const size_t num_files_to_keep,
3645                               const std::shared_ptr<Logger>& info_log,
3646                               Env* env) {
3647   if (filenames.size() <= num_files_to_keep) {
3648     return;
3649   }
3650   for (auto iter = std::next(filenames.begin(), num_files_to_keep);
3651        iter != filenames.end(); ++iter) {
3652     if (!env->DeleteFile(iter->second).ok()) {
3653       ROCKS_LOG_WARN(info_log, "Unable to delete options file %s",
3654                      iter->second.c_str());
3655     }
3656   }
3657 }
3658 }  // namespace
3659 #endif  // !ROCKSDB_LITE
3660 
DeleteObsoleteOptionsFiles()3661 Status DBImpl::DeleteObsoleteOptionsFiles() {
3662 #ifndef ROCKSDB_LITE
3663   std::vector<std::string> filenames;
3664   // use ordered map to store keep the filenames sorted from the newest
3665   // to the oldest.
3666   std::map<uint64_t, std::string> options_filenames;
3667   Status s;
3668   s = GetEnv()->GetChildren(GetName(), &filenames);
3669   if (!s.ok()) {
3670     return s;
3671   }
3672   for (auto& filename : filenames) {
3673     uint64_t file_number;
3674     FileType type;
3675     if (ParseFileName(filename, &file_number, &type) && type == kOptionsFile) {
3676       options_filenames.insert(
3677           {std::numeric_limits<uint64_t>::max() - file_number,
3678            GetName() + "/" + filename});
3679     }
3680   }
3681 
3682   // Keeps the latest 2 Options file
3683   const size_t kNumOptionsFilesKept = 2;
3684   DeleteOptionsFilesHelper(options_filenames, kNumOptionsFilesKept,
3685                            immutable_db_options_.info_log, GetEnv());
3686   return Status::OK();
3687 #else
3688   return Status::OK();
3689 #endif  // !ROCKSDB_LITE
3690 }
3691 
RenameTempFileToOptionsFile(const std::string & file_name)3692 Status DBImpl::RenameTempFileToOptionsFile(const std::string& file_name) {
3693 #ifndef ROCKSDB_LITE
3694   Status s;
3695 
3696   uint64_t options_file_number = versions_->NewFileNumber();
3697   std::string options_file_name =
3698       OptionsFileName(GetName(), options_file_number);
3699   // Retry if the file name happen to conflict with an existing one.
3700   s = GetEnv()->RenameFile(file_name, options_file_name);
3701   if (s.ok()) {
3702     InstrumentedMutexLock l(&mutex_);
3703     versions_->options_file_number_ = options_file_number;
3704   }
3705 
3706   if (0 == disable_delete_obsolete_files_) {
3707     DeleteObsoleteOptionsFiles();
3708   }
3709   return s;
3710 #else
3711   (void)file_name;
3712   return Status::OK();
3713 #endif  // !ROCKSDB_LITE
3714 }
3715 
3716 #ifdef ROCKSDB_USING_THREAD_STATUS
3717 
NewThreadStatusCfInfo(ColumnFamilyData * cfd) const3718 void DBImpl::NewThreadStatusCfInfo(ColumnFamilyData* cfd) const {
3719   if (immutable_db_options_.enable_thread_tracking) {
3720     ThreadStatusUtil::NewColumnFamilyInfo(this, cfd, cfd->GetName(),
3721                                           cfd->ioptions()->env);
3722   }
3723 }
3724 
EraseThreadStatusCfInfo(ColumnFamilyData * cfd) const3725 void DBImpl::EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const {
3726   if (immutable_db_options_.enable_thread_tracking) {
3727     ThreadStatusUtil::EraseColumnFamilyInfo(cfd);
3728   }
3729 }
3730 
EraseThreadStatusDbInfo() const3731 void DBImpl::EraseThreadStatusDbInfo() const {
3732   if (immutable_db_options_.enable_thread_tracking) {
3733     ThreadStatusUtil::EraseDatabaseInfo(this);
3734   }
3735 }
3736 
3737 #else
NewThreadStatusCfInfo(ColumnFamilyData *) const3738 void DBImpl::NewThreadStatusCfInfo(ColumnFamilyData* /*cfd*/) const {}
3739 
EraseThreadStatusCfInfo(ColumnFamilyData *) const3740 void DBImpl::EraseThreadStatusCfInfo(ColumnFamilyData* /*cfd*/) const {}
3741 
EraseThreadStatusDbInfo() const3742 void DBImpl::EraseThreadStatusDbInfo() const {}
3743 #endif  // ROCKSDB_USING_THREAD_STATUS
3744 
3745 //
3746 // A global method that can dump out the build version
DumpRocksDBBuildVersion(Logger * log)3747 void DumpRocksDBBuildVersion(Logger* log) {
3748 #if !defined(IOS_CROSS_COMPILE)
3749   // if we compile with Xcode, we don't run build_detect_version, so we don't
3750   // generate util/build_version.cc
3751   ROCKS_LOG_HEADER(log, "RocksDB version: %d.%d.%d\n", ROCKSDB_MAJOR,
3752                    ROCKSDB_MINOR, ROCKSDB_PATCH);
3753   ROCKS_LOG_HEADER(log, "Git sha %s", rocksdb_build_git_sha);
3754   ROCKS_LOG_HEADER(log, "Compile date %s", rocksdb_build_compile_date);
3755 #else
3756   (void)log;  // ignore "-Wunused-parameter"
3757 #endif
3758 }
3759 
3760 #ifndef ROCKSDB_LITE
GetEarliestMemTableSequenceNumber(SuperVersion * sv,bool include_history)3761 SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv,
3762                                                          bool include_history) {
3763   // Find the earliest sequence number that we know we can rely on reading
3764   // from the memtable without needing to check sst files.
3765   SequenceNumber earliest_seq =
3766       sv->imm->GetEarliestSequenceNumber(include_history);
3767   if (earliest_seq == kMaxSequenceNumber) {
3768     earliest_seq = sv->mem->GetEarliestSequenceNumber();
3769   }
3770   assert(sv->mem->GetEarliestSequenceNumber() >= earliest_seq);
3771 
3772   return earliest_seq;
3773 }
3774 #endif  // ROCKSDB_LITE
3775 
3776 #ifndef ROCKSDB_LITE
GetLatestSequenceForKey(SuperVersion * sv,const Slice & key,bool cache_only,SequenceNumber lower_bound_seq,SequenceNumber * seq,bool * found_record_for_key,bool * is_blob_index)3777 Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
3778                                        bool cache_only,
3779                                        SequenceNumber lower_bound_seq,
3780                                        SequenceNumber* seq,
3781                                        bool* found_record_for_key,
3782                                        bool* is_blob_index) {
3783   Status s;
3784   MergeContext merge_context;
3785   SequenceNumber max_covering_tombstone_seq = 0;
3786 
3787   ReadOptions read_options;
3788   SequenceNumber current_seq = versions_->LastSequence();
3789   LookupKey lkey(key, current_seq);
3790 
3791   *seq = kMaxSequenceNumber;
3792   *found_record_for_key = false;
3793 
3794   // Check if there is a record for this key in the latest memtable
3795   sv->mem->Get(lkey, nullptr, &s, &merge_context, &max_covering_tombstone_seq,
3796                seq, read_options, nullptr /*read_callback*/, is_blob_index);
3797 
3798   if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
3799     // unexpected error reading memtable.
3800     ROCKS_LOG_ERROR(immutable_db_options_.info_log,
3801                     "Unexpected status returned from MemTable::Get: %s\n",
3802                     s.ToString().c_str());
3803 
3804     return s;
3805   }
3806 
3807   if (*seq != kMaxSequenceNumber) {
3808     // Found a sequence number, no need to check immutable memtables
3809     *found_record_for_key = true;
3810     return Status::OK();
3811   }
3812 
3813   SequenceNumber lower_bound_in_mem = sv->mem->GetEarliestSequenceNumber();
3814   if (lower_bound_in_mem != kMaxSequenceNumber &&
3815       lower_bound_in_mem < lower_bound_seq) {
3816     *found_record_for_key = false;
3817     return Status::OK();
3818   }
3819 
3820   // Check if there is a record for this key in the immutable memtables
3821   sv->imm->Get(lkey, nullptr, &s, &merge_context, &max_covering_tombstone_seq,
3822                seq, read_options, nullptr /*read_callback*/, is_blob_index);
3823 
3824   if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
3825     // unexpected error reading memtable.
3826     ROCKS_LOG_ERROR(immutable_db_options_.info_log,
3827                     "Unexpected status returned from MemTableList::Get: %s\n",
3828                     s.ToString().c_str());
3829 
3830     return s;
3831   }
3832 
3833   if (*seq != kMaxSequenceNumber) {
3834     // Found a sequence number, no need to check memtable history
3835     *found_record_for_key = true;
3836     return Status::OK();
3837   }
3838 
3839   SequenceNumber lower_bound_in_imm = sv->imm->GetEarliestSequenceNumber();
3840   if (lower_bound_in_imm != kMaxSequenceNumber &&
3841       lower_bound_in_imm < lower_bound_seq) {
3842     *found_record_for_key = false;
3843     return Status::OK();
3844   }
3845 
3846   // Check if there is a record for this key in the immutable memtables
3847   sv->imm->GetFromHistory(lkey, nullptr, &s, &merge_context,
3848                           &max_covering_tombstone_seq, seq, read_options,
3849                           is_blob_index);
3850 
3851   if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
3852     // unexpected error reading memtable.
3853     ROCKS_LOG_ERROR(
3854         immutable_db_options_.info_log,
3855         "Unexpected status returned from MemTableList::GetFromHistory: %s\n",
3856         s.ToString().c_str());
3857 
3858     return s;
3859   }
3860 
3861   if (*seq != kMaxSequenceNumber) {
3862     // Found a sequence number, no need to check SST files
3863     *found_record_for_key = true;
3864     return Status::OK();
3865   }
3866 
3867   // We could do a sv->imm->GetEarliestSequenceNumber(/*include_history*/ true)
3868   // check here to skip the history if possible. But currently the caller
3869   // already does that. Maybe we should move the logic here later.
3870 
3871   // TODO(agiardullo): possible optimization: consider checking cached
3872   // SST files if cache_only=true?
3873   if (!cache_only) {
3874     // Check tables
3875     sv->current->Get(read_options, lkey, nullptr, &s, &merge_context,
3876                      &max_covering_tombstone_seq, nullptr /* value_found */,
3877                      found_record_for_key, seq, nullptr /*read_callback*/,
3878                      is_blob_index);
3879 
3880     if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
3881       // unexpected error reading SST files
3882       ROCKS_LOG_ERROR(immutable_db_options_.info_log,
3883                       "Unexpected status returned from Version::Get: %s\n",
3884                       s.ToString().c_str());
3885     }
3886   }
3887 
3888   return s;
3889 }
3890 
IngestExternalFile(ColumnFamilyHandle * column_family,const std::vector<std::string> & external_files,const IngestExternalFileOptions & ingestion_options)3891 Status DBImpl::IngestExternalFile(
3892     ColumnFamilyHandle* column_family,
3893     const std::vector<std::string>& external_files,
3894     const IngestExternalFileOptions& ingestion_options) {
3895   IngestExternalFileArg arg;
3896   arg.column_family = column_family;
3897   arg.external_files = external_files;
3898   arg.options = ingestion_options;
3899   return IngestExternalFiles({arg});
3900 }
3901 
IngestExternalFiles(const std::vector<IngestExternalFileArg> & args)3902 Status DBImpl::IngestExternalFiles(
3903     const std::vector<IngestExternalFileArg>& args) {
3904   if (args.empty()) {
3905     return Status::InvalidArgument("ingestion arg list is empty");
3906   }
3907   {
3908     std::unordered_set<ColumnFamilyHandle*> unique_cfhs;
3909     for (const auto& arg : args) {
3910       if (arg.column_family == nullptr) {
3911         return Status::InvalidArgument("column family handle is null");
3912       } else if (unique_cfhs.count(arg.column_family) > 0) {
3913         return Status::InvalidArgument(
3914             "ingestion args have duplicate column families");
3915       }
3916       unique_cfhs.insert(arg.column_family);
3917     }
3918   }
3919   // Ingest multiple external SST files atomically.
3920   size_t num_cfs = args.size();
3921   for (size_t i = 0; i != num_cfs; ++i) {
3922     if (args[i].external_files.empty()) {
3923       char err_msg[128] = {0};
3924       snprintf(err_msg, 128, "external_files[%zu] is empty", i);
3925       return Status::InvalidArgument(err_msg);
3926     }
3927   }
3928   for (const auto& arg : args) {
3929     const IngestExternalFileOptions& ingest_opts = arg.options;
3930     if (ingest_opts.ingest_behind &&
3931         !immutable_db_options_.allow_ingest_behind) {
3932       return Status::InvalidArgument(
3933           "can't ingest_behind file in DB with allow_ingest_behind=false");
3934     }
3935   }
3936 
3937   // TODO (yanqin) maybe handle the case in which column_families have
3938   // duplicates
3939   std::unique_ptr<std::list<uint64_t>::iterator> pending_output_elem;
3940   size_t total = 0;
3941   for (const auto& arg : args) {
3942     total += arg.external_files.size();
3943   }
3944   uint64_t next_file_number = 0;
3945   Status status = ReserveFileNumbersBeforeIngestion(
3946       static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd(), total,
3947       pending_output_elem, &next_file_number);
3948   if (!status.ok()) {
3949     InstrumentedMutexLock l(&mutex_);
3950     ReleaseFileNumberFromPendingOutputs(pending_output_elem);
3951     return status;
3952   }
3953 
3954   std::vector<ExternalSstFileIngestionJob> ingestion_jobs;
3955   for (const auto& arg : args) {
3956     auto* cfd = static_cast<ColumnFamilyHandleImpl*>(arg.column_family)->cfd();
3957     ingestion_jobs.emplace_back(
3958         env_, versions_.get(), cfd, immutable_db_options_, file_options_,
3959         &snapshots_, arg.options, &directories_, &event_logger_);
3960   }
3961   std::vector<std::pair<bool, Status>> exec_results;
3962   for (size_t i = 0; i != num_cfs; ++i) {
3963     exec_results.emplace_back(false, Status::OK());
3964   }
3965   // TODO(yanqin) maybe make jobs run in parallel
3966   uint64_t start_file_number = next_file_number;
3967   for (size_t i = 1; i != num_cfs; ++i) {
3968     start_file_number += args[i - 1].external_files.size();
3969     auto* cfd =
3970         static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
3971     SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
3972     exec_results[i].second = ingestion_jobs[i].Prepare(
3973         args[i].external_files, start_file_number, super_version);
3974     exec_results[i].first = true;
3975     CleanupSuperVersion(super_version);
3976   }
3977   TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:0");
3978   TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1");
3979   {
3980     auto* cfd =
3981         static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd();
3982     SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
3983     exec_results[0].second = ingestion_jobs[0].Prepare(
3984         args[0].external_files, next_file_number, super_version);
3985     exec_results[0].first = true;
3986     CleanupSuperVersion(super_version);
3987   }
3988   for (const auto& exec_result : exec_results) {
3989     if (!exec_result.second.ok()) {
3990       status = exec_result.second;
3991       break;
3992     }
3993   }
3994   if (!status.ok()) {
3995     for (size_t i = 0; i != num_cfs; ++i) {
3996       if (exec_results[i].first) {
3997         ingestion_jobs[i].Cleanup(status);
3998       }
3999     }
4000     InstrumentedMutexLock l(&mutex_);
4001     ReleaseFileNumberFromPendingOutputs(pending_output_elem);
4002     return status;
4003   }
4004 
4005   std::vector<SuperVersionContext> sv_ctxs;
4006   for (size_t i = 0; i != num_cfs; ++i) {
4007     sv_ctxs.emplace_back(true /* create_superversion */);
4008   }
4009   TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeJobsRun:0");
4010   TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeJobsRun:1");
4011   TEST_SYNC_POINT("DBImpl::AddFile:Start");
4012   {
4013     InstrumentedMutexLock l(&mutex_);
4014     TEST_SYNC_POINT("DBImpl::AddFile:MutexLock");
4015 
4016     // Stop writes to the DB by entering both write threads
4017     WriteThread::Writer w;
4018     write_thread_.EnterUnbatched(&w, &mutex_);
4019     WriteThread::Writer nonmem_w;
4020     if (two_write_queues_) {
4021       nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
4022     }
4023 
4024     // When unordered_write is enabled, the keys are writing to memtable in an
4025     // unordered way. If the ingestion job checks memtable key range before the
4026     // key landing in memtable, the ingestion job may skip the necessary
4027     // memtable flush.
4028     // So wait here to ensure there is no pending write to memtable.
4029     WaitForPendingWrites();
4030 
4031     num_running_ingest_file_ += static_cast<int>(num_cfs);
4032     TEST_SYNC_POINT("DBImpl::IngestExternalFile:AfterIncIngestFileCounter");
4033 
4034     bool at_least_one_cf_need_flush = false;
4035     std::vector<bool> need_flush(num_cfs, false);
4036     for (size_t i = 0; i != num_cfs; ++i) {
4037       auto* cfd =
4038           static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4039       if (cfd->IsDropped()) {
4040         // TODO (yanqin) investigate whether we should abort ingestion or
4041         // proceed with other non-dropped column families.
4042         status = Status::InvalidArgument(
4043             "cannot ingest an external file into a dropped CF");
4044         break;
4045       }
4046       bool tmp = false;
4047       status = ingestion_jobs[i].NeedsFlush(&tmp, cfd->GetSuperVersion());
4048       need_flush[i] = tmp;
4049       at_least_one_cf_need_flush = (at_least_one_cf_need_flush || tmp);
4050       if (!status.ok()) {
4051         break;
4052       }
4053     }
4054     TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush",
4055                              &at_least_one_cf_need_flush);
4056 
4057     if (status.ok() && at_least_one_cf_need_flush) {
4058       FlushOptions flush_opts;
4059       flush_opts.allow_write_stall = true;
4060       if (immutable_db_options_.atomic_flush) {
4061         autovector<ColumnFamilyData*> cfds_to_flush;
4062         SelectColumnFamiliesForAtomicFlush(&cfds_to_flush);
4063         mutex_.Unlock();
4064         status = AtomicFlushMemTables(cfds_to_flush, flush_opts,
4065                                       FlushReason::kExternalFileIngestion,
4066                                       true /* writes_stopped */);
4067         mutex_.Lock();
4068       } else {
4069         for (size_t i = 0; i != num_cfs; ++i) {
4070           if (need_flush[i]) {
4071             mutex_.Unlock();
4072             auto* cfd =
4073                 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)
4074                     ->cfd();
4075             status = FlushMemTable(cfd, flush_opts,
4076                                    FlushReason::kExternalFileIngestion,
4077                                    true /* writes_stopped */);
4078             mutex_.Lock();
4079             if (!status.ok()) {
4080               break;
4081             }
4082           }
4083         }
4084       }
4085     }
4086     // Run ingestion jobs.
4087     if (status.ok()) {
4088       for (size_t i = 0; i != num_cfs; ++i) {
4089         status = ingestion_jobs[i].Run();
4090         if (!status.ok()) {
4091           break;
4092         }
4093       }
4094     }
4095     if (status.ok()) {
4096       int consumed_seqno_count =
4097           ingestion_jobs[0].ConsumedSequenceNumbersCount();
4098 #ifndef NDEBUG
4099       for (size_t i = 1; i != num_cfs; ++i) {
4100         assert(!!consumed_seqno_count ==
4101                !!ingestion_jobs[i].ConsumedSequenceNumbersCount());
4102         consumed_seqno_count +=
4103             ingestion_jobs[i].ConsumedSequenceNumbersCount();
4104       }
4105 #endif
4106       if (consumed_seqno_count > 0) {
4107         const SequenceNumber last_seqno = versions_->LastSequence();
4108         versions_->SetLastAllocatedSequence(last_seqno + consumed_seqno_count);
4109         versions_->SetLastPublishedSequence(last_seqno + consumed_seqno_count);
4110         versions_->SetLastSequence(last_seqno + consumed_seqno_count);
4111       }
4112       autovector<ColumnFamilyData*> cfds_to_commit;
4113       autovector<const MutableCFOptions*> mutable_cf_options_list;
4114       autovector<autovector<VersionEdit*>> edit_lists;
4115       uint32_t num_entries = 0;
4116       for (size_t i = 0; i != num_cfs; ++i) {
4117         auto* cfd =
4118             static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4119         if (cfd->IsDropped()) {
4120           continue;
4121         }
4122         cfds_to_commit.push_back(cfd);
4123         mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
4124         autovector<VersionEdit*> edit_list;
4125         edit_list.push_back(ingestion_jobs[i].edit());
4126         edit_lists.push_back(edit_list);
4127         ++num_entries;
4128       }
4129       // Mark the version edits as an atomic group if the number of version
4130       // edits exceeds 1.
4131       if (cfds_to_commit.size() > 1) {
4132         for (auto& edits : edit_lists) {
4133           assert(edits.size() == 1);
4134           edits[0]->MarkAtomicGroup(--num_entries);
4135         }
4136         assert(0 == num_entries);
4137       }
4138       status =
4139           versions_->LogAndApply(cfds_to_commit, mutable_cf_options_list,
4140                                  edit_lists, &mutex_, directories_.GetDbDir());
4141     }
4142 
4143     if (status.ok()) {
4144       for (size_t i = 0; i != num_cfs; ++i) {
4145         auto* cfd =
4146             static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4147         if (!cfd->IsDropped()) {
4148           InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i],
4149                                              *cfd->GetLatestMutableCFOptions());
4150 #ifndef NDEBUG
4151           if (0 == i && num_cfs > 1) {
4152             TEST_SYNC_POINT(
4153                 "DBImpl::IngestExternalFiles:InstallSVForFirstCF:0");
4154             TEST_SYNC_POINT(
4155                 "DBImpl::IngestExternalFiles:InstallSVForFirstCF:1");
4156           }
4157 #endif  // !NDEBUG
4158         }
4159       }
4160     }
4161 
4162     // Resume writes to the DB
4163     if (two_write_queues_) {
4164       nonmem_write_thread_.ExitUnbatched(&nonmem_w);
4165     }
4166     write_thread_.ExitUnbatched(&w);
4167 
4168     if (status.ok()) {
4169       for (auto& job : ingestion_jobs) {
4170         job.UpdateStats();
4171       }
4172     }
4173     ReleaseFileNumberFromPendingOutputs(pending_output_elem);
4174     num_running_ingest_file_ -= static_cast<int>(num_cfs);
4175     if (0 == num_running_ingest_file_) {
4176       bg_cv_.SignalAll();
4177     }
4178     TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock");
4179   }
4180   // mutex_ is unlocked here
4181 
4182   // Cleanup
4183   for (size_t i = 0; i != num_cfs; ++i) {
4184     sv_ctxs[i].Clean();
4185     // This may rollback jobs that have completed successfully. This is
4186     // intended for atomicity.
4187     ingestion_jobs[i].Cleanup(status);
4188   }
4189   if (status.ok()) {
4190     for (size_t i = 0; i != num_cfs; ++i) {
4191       auto* cfd =
4192           static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4193       if (!cfd->IsDropped()) {
4194         NotifyOnExternalFileIngested(cfd, ingestion_jobs[i]);
4195       }
4196     }
4197   }
4198   return status;
4199 }
4200 
CreateColumnFamilyWithImport(const ColumnFamilyOptions & options,const std::string & column_family_name,const ImportColumnFamilyOptions & import_options,const ExportImportFilesMetaData & metadata,ColumnFamilyHandle ** handle)4201 Status DBImpl::CreateColumnFamilyWithImport(
4202     const ColumnFamilyOptions& options, const std::string& column_family_name,
4203     const ImportColumnFamilyOptions& import_options,
4204     const ExportImportFilesMetaData& metadata, ColumnFamilyHandle** handle) {
4205   assert(handle != nullptr);
4206   assert(*handle == nullptr);
4207   std::string cf_comparator_name = options.comparator->Name();
4208   if (cf_comparator_name != metadata.db_comparator_name) {
4209     return Status::InvalidArgument("Comparator name mismatch");
4210   }
4211 
4212   // Create column family.
4213   auto status = CreateColumnFamily(options, column_family_name, handle);
4214   if (!status.ok()) {
4215     return status;
4216   }
4217 
4218   // Import sst files from metadata.
4219   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(*handle);
4220   auto cfd = cfh->cfd();
4221   ImportColumnFamilyJob import_job(env_, versions_.get(), cfd,
4222                                    immutable_db_options_, file_options_,
4223                                    import_options, metadata.files);
4224 
4225   SuperVersionContext dummy_sv_ctx(/* create_superversion */ true);
4226   VersionEdit dummy_edit;
4227   uint64_t next_file_number = 0;
4228   std::unique_ptr<std::list<uint64_t>::iterator> pending_output_elem;
4229   {
4230     // Lock db mutex
4231     InstrumentedMutexLock l(&mutex_);
4232     if (error_handler_.IsDBStopped()) {
4233       // Don't import files when there is a bg_error
4234       status = error_handler_.GetBGError();
4235     }
4236 
4237     // Make sure that bg cleanup wont delete the files that we are importing
4238     pending_output_elem.reset(new std::list<uint64_t>::iterator(
4239         CaptureCurrentFileNumberInPendingOutputs()));
4240 
4241     if (status.ok()) {
4242       // If crash happen after a hard link established, Recover function may
4243       // reuse the file number that has already assigned to the internal file,
4244       // and this will overwrite the external file. To protect the external
4245       // file, we have to make sure the file number will never being reused.
4246       next_file_number = versions_->FetchAddFileNumber(metadata.files.size());
4247       auto cf_options = cfd->GetLatestMutableCFOptions();
4248       status = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
4249                                       directories_.GetDbDir());
4250       if (status.ok()) {
4251         InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
4252       }
4253     }
4254   }
4255   dummy_sv_ctx.Clean();
4256 
4257   if (status.ok()) {
4258     SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
4259     status = import_job.Prepare(next_file_number, sv);
4260     CleanupSuperVersion(sv);
4261   }
4262 
4263   if (status.ok()) {
4264     SuperVersionContext sv_context(true /*create_superversion*/);
4265     {
4266       // Lock db mutex
4267       InstrumentedMutexLock l(&mutex_);
4268 
4269       // Stop writes to the DB by entering both write threads
4270       WriteThread::Writer w;
4271       write_thread_.EnterUnbatched(&w, &mutex_);
4272       WriteThread::Writer nonmem_w;
4273       if (two_write_queues_) {
4274         nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
4275       }
4276 
4277       num_running_ingest_file_++;
4278       assert(!cfd->IsDropped());
4279       status = import_job.Run();
4280 
4281       // Install job edit [Mutex will be unlocked here]
4282       if (status.ok()) {
4283         auto cf_options = cfd->GetLatestMutableCFOptions();
4284         status = versions_->LogAndApply(cfd, *cf_options, import_job.edit(),
4285                                         &mutex_, directories_.GetDbDir());
4286         if (status.ok()) {
4287           InstallSuperVersionAndScheduleWork(cfd, &sv_context, *cf_options);
4288         }
4289       }
4290 
4291       // Resume writes to the DB
4292       if (two_write_queues_) {
4293         nonmem_write_thread_.ExitUnbatched(&nonmem_w);
4294       }
4295       write_thread_.ExitUnbatched(&w);
4296 
4297       num_running_ingest_file_--;
4298       if (num_running_ingest_file_ == 0) {
4299         bg_cv_.SignalAll();
4300       }
4301     }
4302     // mutex_ is unlocked here
4303 
4304     sv_context.Clean();
4305   }
4306 
4307   {
4308     InstrumentedMutexLock l(&mutex_);
4309     ReleaseFileNumberFromPendingOutputs(pending_output_elem);
4310   }
4311 
4312   import_job.Cleanup(status);
4313   if (!status.ok()) {
4314     DropColumnFamily(*handle);
4315     DestroyColumnFamilyHandle(*handle);
4316     *handle = nullptr;
4317   }
4318   return status;
4319 }
4320 
VerifyChecksum(const ReadOptions & read_options)4321 Status DBImpl::VerifyChecksum(const ReadOptions& read_options) {
4322   Status s;
4323   std::vector<ColumnFamilyData*> cfd_list;
4324   {
4325     InstrumentedMutexLock l(&mutex_);
4326     for (auto cfd : *versions_->GetColumnFamilySet()) {
4327       if (!cfd->IsDropped() && cfd->initialized()) {
4328         cfd->Ref();
4329         cfd_list.push_back(cfd);
4330       }
4331     }
4332   }
4333   std::vector<SuperVersion*> sv_list;
4334   for (auto cfd : cfd_list) {
4335     sv_list.push_back(cfd->GetReferencedSuperVersion(this));
4336   }
4337   for (auto& sv : sv_list) {
4338     VersionStorageInfo* vstorage = sv->current->storage_info();
4339     ColumnFamilyData* cfd = sv->current->cfd();
4340     Options opts;
4341     {
4342       InstrumentedMutexLock l(&mutex_);
4343       opts = Options(BuildDBOptions(immutable_db_options_, mutable_db_options_),
4344                      cfd->GetLatestCFOptions());
4345     }
4346     for (int i = 0; i < vstorage->num_non_empty_levels() && s.ok(); i++) {
4347       for (size_t j = 0; j < vstorage->LevelFilesBrief(i).num_files && s.ok();
4348            j++) {
4349         const auto& fd = vstorage->LevelFilesBrief(i).files[j].fd;
4350         std::string fname = TableFileName(cfd->ioptions()->cf_paths,
4351                                           fd.GetNumber(), fd.GetPathId());
4352         s = rocksdb::VerifySstFileChecksum(opts, file_options_, read_options,
4353                                            fname);
4354       }
4355     }
4356     if (!s.ok()) {
4357       break;
4358     }
4359   }
4360   bool defer_purge =
4361           immutable_db_options().avoid_unnecessary_blocking_io;
4362   {
4363     InstrumentedMutexLock l(&mutex_);
4364     for (auto sv : sv_list) {
4365       if (sv && sv->Unref()) {
4366         sv->Cleanup();
4367         if (defer_purge) {
4368           AddSuperVersionsToFreeQueue(sv);
4369         } else {
4370           delete sv;
4371         }
4372       }
4373     }
4374     if (defer_purge) {
4375       SchedulePurge();
4376     }
4377     for (auto cfd : cfd_list) {
4378       cfd->UnrefAndTryDelete();
4379     }
4380   }
4381   return s;
4382 }
4383 
NotifyOnExternalFileIngested(ColumnFamilyData * cfd,const ExternalSstFileIngestionJob & ingestion_job)4384 void DBImpl::NotifyOnExternalFileIngested(
4385     ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job) {
4386   if (immutable_db_options_.listeners.empty()) {
4387     return;
4388   }
4389 
4390   for (const IngestedFileInfo& f : ingestion_job.files_to_ingest()) {
4391     ExternalFileIngestionInfo info;
4392     info.cf_name = cfd->GetName();
4393     info.external_file_path = f.external_file_path;
4394     info.internal_file_path = f.internal_file_path;
4395     info.global_seqno = f.assigned_seqno;
4396     info.table_properties = f.table_properties;
4397     for (auto listener : immutable_db_options_.listeners) {
4398       listener->OnExternalFileIngested(this, info);
4399     }
4400   }
4401 }
4402 
WaitForIngestFile()4403 void DBImpl::WaitForIngestFile() {
4404   mutex_.AssertHeld();
4405   while (num_running_ingest_file_ > 0) {
4406     bg_cv_.Wait();
4407   }
4408 }
4409 
StartTrace(const TraceOptions & trace_options,std::unique_ptr<TraceWriter> && trace_writer)4410 Status DBImpl::StartTrace(const TraceOptions& trace_options,
4411                           std::unique_ptr<TraceWriter>&& trace_writer) {
4412   InstrumentedMutexLock lock(&trace_mutex_);
4413   tracer_.reset(new Tracer(env_, trace_options, std::move(trace_writer)));
4414   return Status::OK();
4415 }
4416 
EndTrace()4417 Status DBImpl::EndTrace() {
4418   InstrumentedMutexLock lock(&trace_mutex_);
4419   Status s;
4420   if (tracer_ != nullptr) {
4421     s = tracer_->Close();
4422     tracer_.reset();
4423   } else {
4424     return Status::IOError("No trace file to close");
4425   }
4426   return s;
4427 }
4428 
StartBlockCacheTrace(const TraceOptions & trace_options,std::unique_ptr<TraceWriter> && trace_writer)4429 Status DBImpl::StartBlockCacheTrace(
4430     const TraceOptions& trace_options,
4431     std::unique_ptr<TraceWriter>&& trace_writer) {
4432   return block_cache_tracer_.StartTrace(env_, trace_options,
4433                                         std::move(trace_writer));
4434 }
4435 
EndBlockCacheTrace()4436 Status DBImpl::EndBlockCacheTrace() {
4437   block_cache_tracer_.EndTrace();
4438   return Status::OK();
4439 }
4440 
TraceIteratorSeek(const uint32_t & cf_id,const Slice & key)4441 Status DBImpl::TraceIteratorSeek(const uint32_t& cf_id, const Slice& key) {
4442   Status s;
4443   if (tracer_) {
4444     InstrumentedMutexLock lock(&trace_mutex_);
4445     if (tracer_) {
4446       s = tracer_->IteratorSeek(cf_id, key);
4447     }
4448   }
4449   return s;
4450 }
4451 
TraceIteratorSeekForPrev(const uint32_t & cf_id,const Slice & key)4452 Status DBImpl::TraceIteratorSeekForPrev(const uint32_t& cf_id,
4453                                         const Slice& key) {
4454   Status s;
4455   if (tracer_) {
4456     InstrumentedMutexLock lock(&trace_mutex_);
4457     if (tracer_) {
4458       s = tracer_->IteratorSeekForPrev(cf_id, key);
4459     }
4460   }
4461   return s;
4462 }
4463 
ReserveFileNumbersBeforeIngestion(ColumnFamilyData * cfd,uint64_t num,std::unique_ptr<std::list<uint64_t>::iterator> & pending_output_elem,uint64_t * next_file_number)4464 Status DBImpl::ReserveFileNumbersBeforeIngestion(
4465     ColumnFamilyData* cfd, uint64_t num,
4466     std::unique_ptr<std::list<uint64_t>::iterator>& pending_output_elem,
4467     uint64_t* next_file_number) {
4468   Status s;
4469   SuperVersionContext dummy_sv_ctx(true /* create_superversion */);
4470   assert(nullptr != next_file_number);
4471   InstrumentedMutexLock l(&mutex_);
4472   if (error_handler_.IsDBStopped()) {
4473     // Do not ingest files when there is a bg_error
4474     return error_handler_.GetBGError();
4475   }
4476   pending_output_elem.reset(new std::list<uint64_t>::iterator(
4477       CaptureCurrentFileNumberInPendingOutputs()));
4478   *next_file_number = versions_->FetchAddFileNumber(static_cast<uint64_t>(num));
4479   auto cf_options = cfd->GetLatestMutableCFOptions();
4480   VersionEdit dummy_edit;
4481   // If crash happen after a hard link established, Recover function may
4482   // reuse the file number that has already assigned to the internal file,
4483   // and this will overwrite the external file. To protect the external
4484   // file, we have to make sure the file number will never being reused.
4485   s = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
4486                              directories_.GetDbDir());
4487   if (s.ok()) {
4488     InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
4489   }
4490   dummy_sv_ctx.Clean();
4491   return s;
4492 }
4493 
GetCreationTimeOfOldestFile(uint64_t * creation_time)4494 Status DBImpl::GetCreationTimeOfOldestFile(uint64_t* creation_time) {
4495   if (mutable_db_options_.max_open_files == -1) {
4496     uint64_t oldest_time = port::kMaxUint64;
4497     for (auto cfd : *versions_->GetColumnFamilySet()) {
4498       if (!cfd->IsDropped()) {
4499         uint64_t ctime;
4500         {
4501           SuperVersion* sv = GetAndRefSuperVersion(cfd);
4502           Version* version = sv->current;
4503           version->GetCreationTimeOfOldestFile(&ctime);
4504           ReturnAndCleanupSuperVersion(cfd, sv);
4505         }
4506 
4507         if (ctime < oldest_time) {
4508           oldest_time = ctime;
4509         }
4510         if (oldest_time == 0) {
4511           break;
4512         }
4513       }
4514     }
4515     *creation_time = oldest_time;
4516     return Status::OK();
4517   } else {
4518     return Status::NotSupported("This API only works if max_open_files = -1");
4519   }
4520 }
4521 #endif  // ROCKSDB_LITE
4522 
4523 }  // namespace rocksdb
4524