// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_impl/db_impl.h"

#include <stdint.h>
#ifdef OS_SOLARIS
#include <alloca.h>
#endif

#include <algorithm>
#include <cinttypes>
#include <cstdio>
#include <map>
#include <set>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "db/arena_wrapped_db_iter.h"
#include "db/builder.h"
#include "db/compaction/compaction_job.h"
#include "db/db_info_dumper.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "db/external_sst_file_ingestion_job.h"
#include "db/flush_job.h"
#include "db/forward_iterator.h"
#include "db/import_column_family_job.h"
#include "db/job_context.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/malloc_stats.h"
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/table_cache.h"
#include "db/table_properties_collector.h"
#include "db/transaction_log_impl.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "db/write_callback.h"
#include "env/composite_env_wrapper.h"
#include "file/file_util.h"
#include "file/filename.h"
#include "file/random_access_file_reader.h"
#include "file/sst_file_manager_impl.h"
#include "logging/auto_roll_logger.h"
#include "logging/log_buffer.h"
#include "logging/logging.h"
#include "memtable/hash_linklist_rep.h"
#include "memtable/hash_skiplist_rep.h"
#include "monitoring/in_memory_stats_history.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/persistent_stats_history.h"
#include "monitoring/thread_status_updater.h"
#include "monitoring/thread_status_util.h"
#include "options/cf_options.h"
#include "options/options_helper.h"
#include "options/options_parser.h"
#include "port/port.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/statistics.h"
#include "rocksdb/stats_history.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/get_context.h"
#include "table/merging_iterator.h"
#include "table/multiget_context.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "test_util/sync_point.h"
#include "tools/sst_dump_tool_imp.h"
#include "util/autovector.h"
#include "util/build_version.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
#include "util/string_util.h"

namespace rocksdb {

const std::string kDefaultColumnFamilyName("default");
const std::string kPersistentStatsColumnFamilyName(
    "___rocksdb_stats_history___");
void DumpRocksDBBuildVersion(Logger* log);

CompressionType GetCompressionFlush(
    const ImmutableCFOptions& ioptions,
    const MutableCFOptions& mutable_cf_options) {
  // Compressing memtable flushes might not help unless the sequential load
  // optimization is used for leveled compaction. Otherwise the CPU and
  // latency overhead is not offset by saving much space.
  if (ioptions.compaction_style == kCompactionStyleUniversal) {
    if (mutable_cf_options.compaction_options_universal
            .compression_size_percent < 0) {
      return mutable_cf_options.compression;
    } else {
      return kNoCompression;
    }
  } else if (!ioptions.compression_per_level.empty()) {
    // For leveled compaction, compress when min_level_to_compress != 0.
    return ioptions.compression_per_level[0];
  } else {
    return mutable_cf_options.compression;
  }
}
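
// Illustrative sketch (not part of the implementation): the decision above
// reduces to, e.g.,
//
//   ColumnFamilyOptions cf_opts;
//   cf_opts.compaction_style = kCompactionStyleUniversal;
//   cf_opts.compaction_options_universal.compression_size_percent = -1;
//   // => flushes use cf_opts.compression; any value >= 0 instead disables
//   //    compression at flush time and defers it to compaction.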

namespace {
void DumpSupportInfo(Logger* logger) {
  ROCKS_LOG_HEADER(logger, "Compression algorithms supported:");
  for (auto& compression : OptionsHelper::compression_type_string_map) {
    if (compression.second != kNoCompression &&
        compression.second != kDisableCompressionOption) {
      ROCKS_LOG_HEADER(logger, "\t%s supported: %d", compression.first.c_str(),
                       CompressionTypeSupported(compression.second));
    }
  }
  ROCKS_LOG_HEADER(logger, "Fast CRC32 supported: %s",
                   crc32c::IsFastCrc32Supported().c_str());
}
}  // namespace

DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
               const bool seq_per_batch, const bool batch_per_txn)
    : dbname_(dbname),
      own_info_log_(options.info_log == nullptr),
      initial_db_options_(SanitizeOptions(dbname, options)),
      env_(initial_db_options_.env),
      fs_(initial_db_options_.file_system),
      immutable_db_options_(initial_db_options_),
      mutable_db_options_(initial_db_options_),
      stats_(immutable_db_options_.statistics.get()),
      mutex_(stats_, env_, DB_MUTEX_WAIT_MICROS,
             immutable_db_options_.use_adaptive_mutex),
      default_cf_handle_(nullptr),
      max_total_in_memory_state_(0),
      file_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
      file_options_for_compaction_(fs_->OptimizeForCompactionTableWrite(
          file_options_, immutable_db_options_)),
      seq_per_batch_(seq_per_batch),
      batch_per_txn_(batch_per_txn),
      db_lock_(nullptr),
      shutting_down_(false),
      manual_compaction_paused_(false),
      bg_cv_(&mutex_),
      logfile_number_(0),
      log_dir_synced_(false),
      log_empty_(true),
      persist_stats_cf_handle_(nullptr),
      log_sync_cv_(&mutex_),
      total_log_size_(0),
      is_snapshot_supported_(true),
      write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
      write_thread_(immutable_db_options_),
      nonmem_write_thread_(immutable_db_options_),
      write_controller_(mutable_db_options_.delayed_write_rate),
      last_batch_group_size_(0),
      unscheduled_flushes_(0),
      unscheduled_compactions_(0),
      bg_bottom_compaction_scheduled_(0),
      bg_compaction_scheduled_(0),
      num_running_compactions_(0),
      bg_flush_scheduled_(0),
      num_running_flushes_(0),
      bg_purge_scheduled_(0),
      disable_delete_obsolete_files_(0),
      pending_purge_obsolete_files_(0),
      delete_obsolete_files_last_run_(env_->NowMicros()),
      last_stats_dump_time_microsec_(0),
      next_job_id_(1),
      has_unpersisted_data_(false),
      unable_to_release_oldest_log_(false),
      num_running_ingest_file_(0),
#ifndef ROCKSDB_LITE
      wal_manager_(immutable_db_options_, file_options_, seq_per_batch),
#endif  // ROCKSDB_LITE
      event_logger_(immutable_db_options_.info_log.get()),
      bg_work_paused_(0),
      bg_compaction_paused_(0),
      refitting_level_(false),
      opened_successfully_(false),
      two_write_queues_(options.two_write_queues),
      manual_wal_flush_(options.manual_wal_flush),
      // last_sequence_ is always maintained by the main queue that also writes
      // to the memtable. When two_write_queues_ is disabled, the last seq in
      // the memtable is the same as the last seq published to the readers.
      // When it is enabled but seq_per_batch_ is disabled, the last seq in the
      // memtable still indicates the last published seq, since wal-only writes
      // that go to the 2nd queue do not consume a sequence number. Otherwise,
      // writes performed by the 2nd queue could change what is visible to the
      // readers. In that case (last_seq_same_as_publish_seq_ == false), the
      // 2nd queue maintains a separate variable to indicate the last published
      // sequence.
      last_seq_same_as_publish_seq_(
          !(seq_per_batch && options.two_write_queues)),
      // Since seq_per_batch_ is currently set only by WritePreparedTxn which
      // requires a custom gc for compaction, we use that to set use_custom_gc_
      // as well.
      use_custom_gc_(seq_per_batch),
      shutdown_initiated_(false),
      own_sfm_(options.sst_file_manager == nullptr),
      preserve_deletes_(options.preserve_deletes),
      closed_(false),
      error_handler_(this, immutable_db_options_, &mutex_),
      atomic_flush_install_cv_(&mutex_) {
  // !batch_per_txn_ implies seq_per_batch_ because it is only unset for
  // WriteUnprepared, which should use seq_per_batch_.
  assert(batch_per_txn_ || seq_per_batch_);
  env_->GetAbsolutePath(dbname, &db_absolute_path_);

  // Reserve ten files or so for other uses and give the rest to TableCache.
  // Use a large number for the "infinite" open files setting.
  const int table_cache_size = (mutable_db_options_.max_open_files == -1)
                                   ? TableCache::kInfiniteCapacity
                                   : mutable_db_options_.max_open_files - 10;
  LRUCacheOptions co;
  co.capacity = table_cache_size;
  co.num_shard_bits = immutable_db_options_.table_cache_numshardbits;
  co.metadata_charge_policy = kDontChargeCacheMetadata;
  table_cache_ = NewLRUCache(co);

  versions_.reset(new VersionSet(dbname_, &immutable_db_options_, file_options_,
                                 table_cache_.get(), write_buffer_manager_,
                                 &write_controller_, &block_cache_tracer_));
  column_family_memtables_.reset(
      new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));

  DumpRocksDBBuildVersion(immutable_db_options_.info_log.get());
  DumpDBFileSummary(immutable_db_options_, dbname_);
  immutable_db_options_.Dump(immutable_db_options_.info_log.get());
  mutable_db_options_.Dump(immutable_db_options_.info_log.get());
  DumpSupportInfo(immutable_db_options_.info_log.get());

  // Always open the DB with 0 here, which means if preserve_deletes_ == true
  // we won't drop any deletion markers until SetPreserveDeletesSequenceNumber()
  // is called by the client and this seqnum is advanced.
  preserve_deletes_seqnum_.store(0);
}

Status DBImpl::Resume() {
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Resuming DB");

  InstrumentedMutexLock db_mutex(&mutex_);

  if (!error_handler_.IsDBStopped() && !error_handler_.IsBGWorkStopped()) {
    // Nothing to do
    return Status::OK();
  }

  if (error_handler_.IsRecoveryInProgress()) {
    // Don't allow a mix of manual and automatic recovery
    return Status::Busy();
  }

  mutex_.Unlock();
  Status s = error_handler_.RecoverFromBGError(true);
  mutex_.Lock();
  return s;
}
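
// Caller-side sketch (hypothetical; not part of this file): after a write
// fails with a background error, manual recovery goes through the public
// DB::Resume() entry point, which forwards here:
//
//   Status s = db->Put(WriteOptions(), "k", "v");
//   if (!s.ok() && s.severity() <= Status::Severity::kHardError) {
//     s = db->Resume();  // returns Busy() if auto recovery is in progress
//   }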

// This function implements the guts of recovery from a background error. It
// is eventually called for both manual as well as automatic recovery. It does
// the following -
// 1. Wait for currently scheduled background flush/compaction to exit, in
//    order to avoid inadvertently causing an error and thinking recovery
//    failed
// 2. Flush memtables if there's any data for all the CFs. This may result in
//    another error, which will be saved by error_handler_ and reported later
//    as the recovery status
// 3. Find and delete any obsolete files
// 4. Schedule compactions if needed for all the CFs. This is needed as the
//    flush in the prior step might have been a no-op for some CFs, which
//    means a new super version wouldn't have been installed
Status DBImpl::ResumeImpl() {
  mutex_.AssertHeld();
  WaitForBackgroundWork();

  Status bg_error = error_handler_.GetBGError();
  Status s;
  if (shutdown_initiated_) {
    // Returning shutdown status to SFM during auto recovery will cause it
    // to abort the recovery and allow the shutdown to progress
    s = Status::ShutdownInProgress();
  }
  if (s.ok() && bg_error.severity() > Status::Severity::kHardError) {
    ROCKS_LOG_INFO(
        immutable_db_options_.info_log,
        "DB resume requested but failed due to Fatal/Unrecoverable error");
    s = bg_error;
  }

  // We cannot guarantee consistency of the WAL. So force flush memtables of
  // all the column families
  if (s.ok()) {
    FlushOptions flush_opts;
    // We allow flush to stall write since we are trying to resume from error.
    flush_opts.allow_write_stall = true;
    if (immutable_db_options_.atomic_flush) {
      autovector<ColumnFamilyData*> cfds;
      SelectColumnFamiliesForAtomicFlush(&cfds);
      mutex_.Unlock();
      s = AtomicFlushMemTables(cfds, flush_opts, FlushReason::kErrorRecovery);
      mutex_.Lock();
    } else {
      for (auto cfd : *versions_->GetColumnFamilySet()) {
        if (cfd->IsDropped()) {
          continue;
        }
        cfd->Ref();
        mutex_.Unlock();
        s = FlushMemTable(cfd, flush_opts, FlushReason::kErrorRecovery);
        mutex_.Lock();
        cfd->UnrefAndTryDelete();
        if (!s.ok()) {
          break;
        }
      }
    }
    if (!s.ok()) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "DB resume requested but failed due to Flush failure [%s]",
                     s.ToString().c_str());
    }
  }

  JobContext job_context(0);
  FindObsoleteFiles(&job_context, true);
  if (s.ok()) {
    s = error_handler_.ClearBGError();
  }
  mutex_.Unlock();

  job_context.manifest_file_number = 1;
  if (job_context.HaveSomethingToDelete()) {
    PurgeObsoleteFiles(job_context);
  }
  job_context.Clean();

  if (s.ok()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successfully resumed DB");
  }
  mutex_.Lock();
  // Check for shutdown again before scheduling further compactions,
  // since we released and re-acquired the lock above
  if (shutdown_initiated_) {
    s = Status::ShutdownInProgress();
  }
  if (s.ok()) {
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      SchedulePendingCompaction(cfd);
    }
    MaybeScheduleFlushOrCompaction();
  }

  // Wake up any waiters - in this case, it could be the shutdown thread
  bg_cv_.SignalAll();

  // No need to check BGError again. If something happened, the event listener
  // would be notified and the operation causing it would have failed
  return s;
}

void DBImpl::WaitForBackgroundWork() {
  // Wait for background work to finish
  while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
         bg_flush_scheduled_) {
    bg_cv_.Wait();
  }
}

// Will lock the mutex_, will wait for completion if wait is true
void DBImpl::CancelAllBackgroundWork(bool wait) {
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "Shutdown: canceling all background work");

  if (thread_dump_stats_ != nullptr) {
    thread_dump_stats_->cancel();
    thread_dump_stats_.reset();
  }
  if (thread_persist_stats_ != nullptr) {
    thread_persist_stats_->cancel();
    thread_persist_stats_.reset();
  }
  InstrumentedMutexLock l(&mutex_);
  if (!shutting_down_.load(std::memory_order_acquire) &&
      has_unpersisted_data_.load(std::memory_order_relaxed) &&
      !mutable_db_options_.avoid_flush_during_shutdown) {
    if (immutable_db_options_.atomic_flush) {
      autovector<ColumnFamilyData*> cfds;
      SelectColumnFamiliesForAtomicFlush(&cfds);
      mutex_.Unlock();
      AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown);
      mutex_.Lock();
    } else {
      for (auto cfd : *versions_->GetColumnFamilySet()) {
        if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
          cfd->Ref();
          mutex_.Unlock();
          FlushMemTable(cfd, FlushOptions(), FlushReason::kShutDown);
          mutex_.Lock();
          cfd->UnrefAndTryDelete();
        }
      }
    }
    versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
  }

  shutting_down_.store(true, std::memory_order_release);
  bg_cv_.SignalAll();
  if (!wait) {
    return;
  }
  WaitForBackgroundWork();
}

Status DBImpl::CloseHelper() {
  // Guarantee that there is no background error recovery in progress before
  // continuing with the shutdown
  mutex_.Lock();
  shutdown_initiated_ = true;
  error_handler_.CancelErrorRecovery();
  while (error_handler_.IsRecoveryInProgress()) {
    bg_cv_.Wait();
  }
  mutex_.Unlock();

  // CancelAllBackgroundWork called with false means we just set the shutdown
  // marker. After this we do a variant of the waiting and unschedule work
  // (to consider: moving all the waiting into CancelAllBackgroundWork(true))
  CancelAllBackgroundWork(false);
  int bottom_compactions_unscheduled =
      env_->UnSchedule(this, Env::Priority::BOTTOM);
  int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW);
  int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH);
  Status ret;
  mutex_.Lock();
  bg_bottom_compaction_scheduled_ -= bottom_compactions_unscheduled;
  bg_compaction_scheduled_ -= compactions_unscheduled;
  bg_flush_scheduled_ -= flushes_unscheduled;

  // Wait for background work to finish
  while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
         bg_flush_scheduled_ || bg_purge_scheduled_ ||
         pending_purge_obsolete_files_ ||
         error_handler_.IsRecoveryInProgress()) {
    TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob");
    bg_cv_.Wait();
  }
  TEST_SYNC_POINT_CALLBACK("DBImpl::CloseHelper:PendingPurgeFinished",
                           &files_grabbed_for_purge_);
  EraseThreadStatusDbInfo();
  flush_scheduler_.Clear();
  trim_history_scheduler_.Clear();

  while (!flush_queue_.empty()) {
    const FlushRequest& flush_req = PopFirstFromFlushQueue();
    for (const auto& iter : flush_req) {
      iter.first->UnrefAndTryDelete();
    }
  }
  while (!compaction_queue_.empty()) {
    auto cfd = PopFirstFromCompactionQueue();
    cfd->UnrefAndTryDelete();
  }

  if (default_cf_handle_ != nullptr || persist_stats_cf_handle_ != nullptr) {
    // we need to delete handle outside of lock because it does its own locking
    mutex_.Unlock();
    if (default_cf_handle_) {
      delete default_cf_handle_;
      default_cf_handle_ = nullptr;
    }
    if (persist_stats_cf_handle_) {
      delete persist_stats_cf_handle_;
      persist_stats_cf_handle_ = nullptr;
    }
    mutex_.Lock();
  }

  // Clean up obsolete files due to SuperVersion release.
  // (1) Need to delete obsolete files before closing because RepairDB()
  // scans all existing files in the file system and builds manifest file.
  // Keeping obsolete files confuses the repair process.
  // (2) Need to check if we Open()/Recover() the DB successfully before
  // deleting because if VersionSet recover fails (may be due to corrupted
  // manifest file), it is not able to identify live files correctly. As a
  // result, all "live" files can get deleted by accident. However, corrupted
  // manifest is recoverable by RepairDB().
  if (opened_successfully_) {
    JobContext job_context(next_job_id_.fetch_add(1));
    FindObsoleteFiles(&job_context, true);

    mutex_.Unlock();
    // manifest number starting from 2
    job_context.manifest_file_number = 1;
    if (job_context.HaveSomethingToDelete()) {
      PurgeObsoleteFiles(job_context);
    }
    job_context.Clean();
    mutex_.Lock();
  }

  for (auto l : logs_to_free_) {
    delete l;
  }
  for (auto& log : logs_) {
    uint64_t log_number = log.writer->get_log_number();
    Status s = log.ClearWriter();
    if (!s.ok()) {
      ROCKS_LOG_WARN(
          immutable_db_options_.info_log,
          "Unable to Sync WAL file %s with error -- %s",
          LogFileName(immutable_db_options_.wal_dir, log_number).c_str(),
          s.ToString().c_str());
      // Retain the first error
      if (ret.ok()) {
        ret = s;
      }
    }
  }
  logs_.clear();

  // Table cache may have table handles holding blocks from the block cache.
  // We need to release them before the block cache is destroyed. The block
  // cache may be destroyed inside versions_.reset(), when column family data
  // list is destroyed, so leaving handles in table cache after
  // versions_.reset() may cause issues.
  // Here we clean all unreferenced handles in table cache.
  // Now we assume all user queries have finished, so only the version set
  // itself can possibly hold blocks from the block cache. After releasing
  // unreferenced handles here, only handles held by the version set are left,
  // and inside versions_.reset() we will release them. There, we need to make
  // sure every time a handle is released, we erase it from the cache too. By
  // doing that, we can guarantee that after versions_.reset(), the table cache
  // is empty so the cache can be safely destroyed.
  table_cache_->EraseUnRefEntries();

  for (auto& txn_entry : recovered_transactions_) {
    delete txn_entry.second;
  }

  // versions need to be destroyed before table_cache since it can hold
  // references to table_cache.
  versions_.reset();
  mutex_.Unlock();
  if (db_lock_ != nullptr) {
    env_->UnlockFile(db_lock_);
  }

  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown complete");
  LogFlush(immutable_db_options_.info_log);

#ifndef ROCKSDB_LITE
  // If the sst_file_manager was allocated by us during DB::Open(), call
  // Close() on it before closing the info_log. Otherwise, a background thread
  // in SstFileManagerImpl might try to log something
  if (immutable_db_options_.sst_file_manager && own_sfm_) {
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    sfm->Close();
  }
#endif  // ROCKSDB_LITE

  if (immutable_db_options_.info_log && own_info_log_) {
    Status s = immutable_db_options_.info_log->Close();
    if (ret.ok()) {
      ret = s;
    }
  }

  if (ret.IsAborted()) {
    // Reserve IsAborted() errors for cases where users didn't release
    // certain resources; they can release them and come back and retry.
    // In this case, we wrap this exception as something else.
    return Status::Incomplete(ret.ToString());
  }
  return ret;
}

Status DBImpl::CloseImpl() { return CloseHelper(); }

DBImpl::~DBImpl() {
  if (!closed_) {
    closed_ = true;
    CloseHelper();
  }
}

void DBImpl::MaybeIgnoreError(Status* s) const {
  if (s->ok() || immutable_db_options_.paranoid_checks) {
    // No change needed
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "Ignoring error %s",
                   s->ToString().c_str());
    *s = Status::OK();
  }
}

const Status DBImpl::CreateArchivalDirectory() {
  if (immutable_db_options_.wal_ttl_seconds > 0 ||
      immutable_db_options_.wal_size_limit_mb > 0) {
    std::string archivalPath = ArchivalDirectory(immutable_db_options_.wal_dir);
    return env_->CreateDirIfMissing(archivalPath);
  }
  return Status::OK();
}

void DBImpl::PrintStatistics() {
  auto dbstats = immutable_db_options_.statistics.get();
  if (dbstats) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "STATISTICS:\n %s",
                   dbstats->ToString().c_str());
  }
}

void DBImpl::StartTimedTasks() {
  unsigned int stats_dump_period_sec = 0;
  unsigned int stats_persist_period_sec = 0;
  {
    InstrumentedMutexLock l(&mutex_);
    stats_dump_period_sec = mutable_db_options_.stats_dump_period_sec;
    if (stats_dump_period_sec > 0) {
      if (!thread_dump_stats_) {
        thread_dump_stats_.reset(new rocksdb::RepeatableThread(
            [this]() { DBImpl::DumpStats(); }, "dump_st", env_,
            static_cast<uint64_t>(stats_dump_period_sec) * kMicrosInSecond));
      }
    }
    stats_persist_period_sec = mutable_db_options_.stats_persist_period_sec;
    if (stats_persist_period_sec > 0) {
      if (!thread_persist_stats_) {
        thread_persist_stats_.reset(new rocksdb::RepeatableThread(
            [this]() { DBImpl::PersistStats(); }, "pst_st", env_,
            static_cast<uint64_t>(stats_persist_period_sec) * kMicrosInSecond));
      }
    }
  }
}

// Estimate the total size of stats_history_
size_t DBImpl::EstimateInMemoryStatsHistorySize() const {
  size_t size_total =
      sizeof(std::map<uint64_t, std::map<std::string, uint64_t>>);
  if (stats_history_.size() == 0) return size_total;
  size_t size_per_slice =
      sizeof(uint64_t) + sizeof(std::map<std::string, uint64_t>);
  // non-empty map, stats_history_.begin() guaranteed to exist
  std::map<std::string, uint64_t> sample_slice(stats_history_.begin()->second);
  for (const auto& pairs : sample_slice) {
    size_per_slice +=
        pairs.first.capacity() + sizeof(pairs.first) + sizeof(pairs.second);
  }
  size_total = size_per_slice * stats_history_.size();
  return size_total;
}
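
// Rough worked example (illustrative numbers, typical 64-bit build): with
// 100 tickers whose names average 32 bytes, one slice costs about
//   sizeof(map) + 100 * (32 /* name */ + sizeof(std::string) + 8 /* value */)
// bytes, i.e. roughly 7 KB, so ~1000 retained slices are estimated at ~7 MB.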

void DBImpl::PersistStats() {
  TEST_SYNC_POINT("DBImpl::PersistStats:Entry");
#ifndef ROCKSDB_LITE
  if (shutdown_initiated_) {
    return;
  }
  uint64_t now_seconds = env_->NowMicros() / kMicrosInSecond;
  Statistics* statistics = immutable_db_options_.statistics.get();
  if (!statistics) {
    return;
  }
  size_t stats_history_size_limit = 0;
  {
    InstrumentedMutexLock l(&mutex_);
    stats_history_size_limit = mutable_db_options_.stats_history_buffer_size;
  }

  std::map<std::string, uint64_t> stats_map;
  if (!statistics->getTickerMap(&stats_map)) {
    return;
  }
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "------- PERSISTING STATS -------");

  if (immutable_db_options_.persist_stats_to_disk) {
    WriteBatch batch;
    if (stats_slice_initialized_) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Reading %" ROCKSDB_PRIszt " stats from statistics\n",
                     stats_slice_.size());
      for (const auto& stat : stats_map) {
        char key[100];
        int length =
            EncodePersistentStatsKey(now_seconds, stat.first, 100, key);
        // calculate the delta from last time
        if (stats_slice_.find(stat.first) != stats_slice_.end()) {
          uint64_t delta = stat.second - stats_slice_[stat.first];
          batch.Put(persist_stats_cf_handle_, Slice(key, std::min(100, length)),
                    ToString(delta));
        }
      }
    }
    stats_slice_initialized_ = true;
    std::swap(stats_slice_, stats_map);
    WriteOptions wo;
    wo.low_pri = true;
    wo.no_slowdown = true;
    wo.sync = false;
    Status s = Write(wo, &batch);
    if (!s.ok()) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Writing to persistent stats CF failed -- %s",
                     s.ToString().c_str());
    } else {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Writing %" ROCKSDB_PRIszt " stats with timestamp %" PRIu64
                     " to persistent stats CF succeeded",
                     stats_slice_.size(), now_seconds);
    }
    // TODO(Zhongyi): add purging for persisted data
  } else {
    InstrumentedMutexLock l(&stats_history_mutex_);
    // calculate the delta from last time
    if (stats_slice_initialized_) {
      std::map<std::string, uint64_t> stats_delta;
      for (const auto& stat : stats_map) {
        if (stats_slice_.find(stat.first) != stats_slice_.end()) {
          stats_delta[stat.first] = stat.second - stats_slice_[stat.first];
        }
      }
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Storing %" ROCKSDB_PRIszt " stats with timestamp %" PRIu64
                     " to in-memory stats history",
                     stats_slice_.size(), now_seconds);
      stats_history_[now_seconds] = stats_delta;
    }
    stats_slice_initialized_ = true;
    std::swap(stats_slice_, stats_map);
    TEST_SYNC_POINT("DBImpl::PersistStats:StatsCopied");

    // delete older stats snapshots to control memory consumption
    size_t stats_history_size = EstimateInMemoryStatsHistorySize();
    bool purge_needed = stats_history_size > stats_history_size_limit;
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[Pre-GC] In-memory stats history size: %" ROCKSDB_PRIszt
                   " bytes, slice count: %" ROCKSDB_PRIszt,
                   stats_history_size, stats_history_.size());
    while (purge_needed && !stats_history_.empty()) {
      stats_history_.erase(stats_history_.begin());
      purge_needed =
          EstimateInMemoryStatsHistorySize() > stats_history_size_limit;
    }
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[Post-GC] In-memory stats history size: %" ROCKSDB_PRIszt
                   " bytes, slice count: %" ROCKSDB_PRIszt,
                   stats_history_size, stats_history_.size());
  }
#endif  // !ROCKSDB_LITE
}

bool DBImpl::FindStatsByTime(uint64_t start_time, uint64_t end_time,
                             uint64_t* new_time,
                             std::map<std::string, uint64_t>* stats_map) {
  assert(new_time);
  assert(stats_map);
  if (!new_time || !stats_map) return false;
  // lock while searching for start_time
  {
    InstrumentedMutexLock l(&stats_history_mutex_);
    auto it = stats_history_.lower_bound(start_time);
    if (it != stats_history_.end() && it->first < end_time) {
      // make a copy of the timestamp and stats_map
      *new_time = it->first;
      *stats_map = it->second;
      return true;
    } else {
      return false;
    }
  }
}

Status DBImpl::GetStatsHistory(
    uint64_t start_time, uint64_t end_time,
    std::unique_ptr<StatsHistoryIterator>* stats_iterator) {
  if (!stats_iterator) {
    return Status::InvalidArgument("stats_iterator not preallocated.");
  }
  if (immutable_db_options_.persist_stats_to_disk) {
    stats_iterator->reset(
        new PersistentStatsHistoryIterator(start_time, end_time, this));
  } else {
    stats_iterator->reset(
        new InMemoryStatsHistoryIterator(start_time, end_time, this));
  }
  return (*stats_iterator)->status();
}
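
// Hypothetical caller-side sketch: consuming the history returned above.
//
//   std::unique_ptr<StatsHistoryIterator> it;
//   Status s = db->GetStatsHistory(0 /* start_time */,
//                                  port::kMaxUint64 /* end_time */, &it);
//   for (; s.ok() && it->Valid(); it->Next()) {
//     uint64_t time = it->GetStatsTime();
//     const std::map<std::string, uint64_t>& stats = it->GetStatsMap();
//     // consume (time, stats) ...
//   }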

void DBImpl::DumpStats() {
  TEST_SYNC_POINT("DBImpl::DumpStats:1");
#ifndef ROCKSDB_LITE
  const DBPropertyInfo* cf_property_info =
      GetPropertyInfo(DB::Properties::kCFStats);
  assert(cf_property_info != nullptr);
  const DBPropertyInfo* db_property_info =
      GetPropertyInfo(DB::Properties::kDBStats);
  assert(db_property_info != nullptr);

  std::string stats;
  if (shutdown_initiated_) {
    return;
  }
  {
    InstrumentedMutexLock l(&mutex_);
    default_cf_internal_stats_->GetStringProperty(
        *db_property_info, DB::Properties::kDBStats, &stats);
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->initialized()) {
        cfd->internal_stats()->GetStringProperty(
            *cf_property_info, DB::Properties::kCFStatsNoFileHistogram, &stats);
      }
    }
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->initialized()) {
        cfd->internal_stats()->GetStringProperty(
            *cf_property_info, DB::Properties::kCFFileHistogram, &stats);
      }
    }
  }
  TEST_SYNC_POINT("DBImpl::DumpStats:2");
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "------- DUMPING STATS -------");
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", stats.c_str());
  if (immutable_db_options_.dump_malloc_stats) {
    stats.clear();
    DumpMallocStats(&stats);
    if (!stats.empty()) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "------- Malloc STATS -------");
      ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s", stats.c_str());
    }
  }
#endif  // !ROCKSDB_LITE

  PrintStatistics();
}

Status DBImpl::TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family,
                                           int max_entries_to_print,
                                           std::string* out_str) {
  auto* cfh =
      static_cast_with_check<ColumnFamilyHandleImpl, ColumnFamilyHandle>(
          column_family);
  ColumnFamilyData* cfd = cfh->cfd();

  SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
  Version* version = super_version->current;

  Status s =
      version->TablesRangeTombstoneSummary(max_entries_to_print, out_str);

  CleanupSuperVersion(super_version);
  return s;
}

void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
  if (!job_context->logs_to_free.empty()) {
    for (auto l : job_context->logs_to_free) {
      AddToLogsToFreeQueue(l);
    }
    job_context->logs_to_free.clear();
  }
}

Directory* DBImpl::GetDataDir(ColumnFamilyData* cfd, size_t path_id) const {
  assert(cfd);
  Directory* ret_dir = cfd->GetDataDir(path_id);
  if (ret_dir == nullptr) {
    return directories_.GetDataDir(path_id);
  }
  return ret_dir;
}
Status DBImpl::SetOptions(
    ColumnFamilyHandle* column_family,
    const std::unordered_map<std::string, std::string>& options_map) {
#ifdef ROCKSDB_LITE
  (void)column_family;
  (void)options_map;
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  if (options_map.empty()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "SetOptions() on column family [%s], empty input",
                   cfd->GetName().c_str());
    return Status::InvalidArgument("empty input");
  }

  MutableCFOptions new_options;
  Status s;
  Status persist_options_status;
  SuperVersionContext sv_context(/* create_superversion */ true);
  {
    auto db_options = GetDBOptions();
    InstrumentedMutexLock l(&mutex_);
    s = cfd->SetOptions(db_options, options_map);
    if (s.ok()) {
      new_options = *cfd->GetLatestMutableCFOptions();
      // Append new version to recompute compaction score.
      VersionEdit dummy_edit;
      versions_->LogAndApply(cfd, new_options, &dummy_edit, &mutex_,
                             directories_.GetDbDir());
      // Trigger possible flush/compactions. This has to be before we persist
      // options to file, otherwise there will be a deadlock with writer
      // thread.
      InstallSuperVersionAndScheduleWork(cfd, &sv_context, new_options);

      persist_options_status = WriteOptionsFile(
          false /*need_mutex_lock*/, true /*need_enter_write_thread*/);
      bg_cv_.SignalAll();
    }
  }
  sv_context.Clean();

  ROCKS_LOG_INFO(
      immutable_db_options_.info_log,
      "SetOptions() on column family [%s], inputs:", cfd->GetName().c_str());
  for (const auto& o : options_map) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n", o.first.c_str(),
                   o.second.c_str());
  }
  if (s.ok()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[%s] SetOptions() succeeded", cfd->GetName().c_str());
    new_options.Dump(immutable_db_options_.info_log.get());
    if (!persist_options_status.ok()) {
      s = persist_options_status;
    }
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "[%s] SetOptions() failed",
                   cfd->GetName().c_str());
  }
  LogFlush(immutable_db_options_.info_log);
  return s;
#endif  // ROCKSDB_LITE
}
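
// Hypothetical usage sketch: SetOptions() tunes mutable CF options without
// reopening the DB, e.g.
//
//   Status s = db->SetOptions(cf_handle,
//                             {{"write_buffer_size", "131072"},
//                              {"level0_file_num_compaction_trigger", "2"}});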

Status DBImpl::SetDBOptions(
    const std::unordered_map<std::string, std::string>& options_map) {
#ifdef ROCKSDB_LITE
  (void)options_map;
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  if (options_map.empty()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "SetDBOptions(), empty input.");
    return Status::InvalidArgument("empty input");
  }

  MutableDBOptions new_options;
  Status s;
  Status persist_options_status;
  bool wal_changed = false;
  WriteContext write_context;
  {
    InstrumentedMutexLock l(&mutex_);
    s = GetMutableDBOptionsFromStrings(mutable_db_options_, options_map,
                                       &new_options);
    if (new_options.bytes_per_sync == 0) {
      new_options.bytes_per_sync = 1024 * 1024;
    }
    DBOptions new_db_options =
        BuildDBOptions(immutable_db_options_, new_options);
    if (s.ok()) {
      s = ValidateOptions(new_db_options);
    }
    if (s.ok()) {
      for (auto c : *versions_->GetColumnFamilySet()) {
        if (!c->IsDropped()) {
          auto cf_options = c->GetLatestCFOptions();
          s = ColumnFamilyData::ValidateOptions(new_db_options, cf_options);
          if (!s.ok()) {
            break;
          }
        }
      }
    }
    if (s.ok()) {
      const BGJobLimits current_bg_job_limits =
          GetBGJobLimits(immutable_db_options_.max_background_flushes,
                         mutable_db_options_.max_background_compactions,
                         mutable_db_options_.max_background_jobs,
                         /* parallelize_compactions */ true);
      const BGJobLimits new_bg_job_limits = GetBGJobLimits(
          immutable_db_options_.max_background_flushes,
          new_options.max_background_compactions,
          new_options.max_background_jobs, /* parallelize_compactions */ true);

      const bool max_flushes_increased =
          new_bg_job_limits.max_flushes > current_bg_job_limits.max_flushes;
      const bool max_compactions_increased =
          new_bg_job_limits.max_compactions >
          current_bg_job_limits.max_compactions;

      if (max_flushes_increased || max_compactions_increased) {
        if (max_flushes_increased) {
          env_->IncBackgroundThreadsIfNeeded(new_bg_job_limits.max_flushes,
                                             Env::Priority::HIGH);
        }

        if (max_compactions_increased) {
          env_->IncBackgroundThreadsIfNeeded(new_bg_job_limits.max_compactions,
                                             Env::Priority::LOW);
        }

        MaybeScheduleFlushOrCompaction();
      }

      if (new_options.stats_dump_period_sec !=
          mutable_db_options_.stats_dump_period_sec) {
        if (thread_dump_stats_) {
          mutex_.Unlock();
          thread_dump_stats_->cancel();
          mutex_.Lock();
        }
        if (new_options.stats_dump_period_sec > 0) {
          thread_dump_stats_.reset(new rocksdb::RepeatableThread(
              [this]() { DBImpl::DumpStats(); }, "dump_st", env_,
              static_cast<uint64_t>(new_options.stats_dump_period_sec) *
                  kMicrosInSecond));
        } else {
          thread_dump_stats_.reset();
        }
      }
      if (new_options.stats_persist_period_sec !=
          mutable_db_options_.stats_persist_period_sec) {
        if (thread_persist_stats_) {
          mutex_.Unlock();
          thread_persist_stats_->cancel();
          mutex_.Lock();
        }
        if (new_options.stats_persist_period_sec > 0) {
          thread_persist_stats_.reset(new rocksdb::RepeatableThread(
              [this]() { DBImpl::PersistStats(); }, "pst_st", env_,
              static_cast<uint64_t>(new_options.stats_persist_period_sec) *
                  kMicrosInSecond));
        } else {
          thread_persist_stats_.reset();
        }
      }
      write_controller_.set_max_delayed_write_rate(
          new_options.delayed_write_rate);
      table_cache_.get()->SetCapacity(new_options.max_open_files == -1
                                          ? TableCache::kInfiniteCapacity
                                          : new_options.max_open_files - 10);
      wal_changed = mutable_db_options_.wal_bytes_per_sync !=
                    new_options.wal_bytes_per_sync;
      mutable_db_options_ = new_options;
      file_options_for_compaction_ = FileOptions(new_db_options);
      file_options_for_compaction_ = fs_->OptimizeForCompactionTableWrite(
          file_options_for_compaction_, immutable_db_options_);
      versions_->ChangeFileOptions(mutable_db_options_);
      // TODO(xiez): clarify why the optimize-for-read settings are applied to
      // the compaction write options here
      file_options_for_compaction_ = fs_->OptimizeForCompactionTableRead(
          file_options_for_compaction_, immutable_db_options_);
      file_options_for_compaction_.compaction_readahead_size =
          mutable_db_options_.compaction_readahead_size;
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      if (total_log_size_ > GetMaxTotalWalSize() || wal_changed) {
        Status purge_wal_status = SwitchWAL(&write_context);
        if (!purge_wal_status.ok()) {
          ROCKS_LOG_WARN(immutable_db_options_.info_log,
                         "Unable to purge WAL files in SetDBOptions() -- %s",
                         purge_wal_status.ToString().c_str());
        }
      }
      persist_options_status = WriteOptionsFile(
          false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
      write_thread_.ExitUnbatched(&w);
    }
  }
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions(), inputs:");
  for (const auto& o : options_map) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n", o.first.c_str(),
                   o.second.c_str());
  }
  if (s.ok()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions() succeeded");
    new_options.Dump(immutable_db_options_.info_log.get());
    if (!persist_options_status.ok()) {
      if (immutable_db_options_.fail_if_options_file_error) {
        s = Status::IOError(
            "SetDBOptions() succeeded, but unable to persist options",
            persist_options_status.ToString());
      }
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "Unable to persist options in SetDBOptions() -- %s",
                     persist_options_status.ToString().c_str());
    }
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "SetDBOptions failed");
  }
  LogFlush(immutable_db_options_.info_log);
  return s;
#endif  // ROCKSDB_LITE
}
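
// Hypothetical usage sketch: mutable DB options follow the same pattern,
// e.g.
//
//   Status s = db->SetDBOptions({{"max_background_jobs", "8"},
//                                {"stats_dump_period_sec", "0"}});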

// return the same level if it cannot be moved
int DBImpl::FindMinimumEmptyLevelFitting(
    ColumnFamilyData* cfd, const MutableCFOptions& /*mutable_cf_options*/,
    int level) {
  mutex_.AssertHeld();
  const auto* vstorage = cfd->current()->storage_info();
  int minimum_level = level;
  for (int i = level - 1; i > 0; --i) {
    // stop if level i is not empty
    if (vstorage->NumLevelFiles(i) > 0) break;
    // stop if level i is too small (cannot fit the level's files)
    if (vstorage->MaxBytesForLevel(i) < vstorage->NumLevelBytes(level)) {
      break;
    }

    minimum_level = i;
  }
  return minimum_level;
}

Status DBImpl::FlushWAL(bool sync) {
  if (manual_wal_flush_) {
    Status s;
    {
      // We need to lock log_write_mutex_ since logs_ might change concurrently
      InstrumentedMutexLock wl(&log_write_mutex_);
      log::Writer* cur_log_writer = logs_.back().writer;
      s = cur_log_writer->WriteBuffer();
    }
    if (!s.ok()) {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
                      s.ToString().c_str());
      // In case there is a fs error we should set it globally to prevent
      // future writes
      WriteStatusCheck(s);
      // whether syncing or not, we should abort the rest of the function upon
      // error
      return s;
    }
    if (!sync) {
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=false");
      return s;
    }
  }
  if (!sync) {
    return Status::OK();
  }
  // sync = true
  ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=true");
  return SyncWAL();
}
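
// Caller-side sketch (hypothetical): with Options::manual_wal_flush == true,
// writes only append to an in-memory WAL buffer; a periodic
//
//   db->FlushWAL(false /* sync */);
//
// pushes that buffer to the file system, while FlushWAL(true) additionally
// syncs the file (roughly FlushWAL(false) followed by SyncWAL()).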

Status DBImpl::SyncWAL() {
  autovector<log::Writer*, 1> logs_to_sync;
  bool need_log_dir_sync;
  uint64_t current_log_number;

  {
    InstrumentedMutexLock l(&mutex_);
    assert(!logs_.empty());

    // This SyncWAL() call only cares about logs up to this number.
    current_log_number = logfile_number_;

    while (logs_.front().number <= current_log_number &&
           logs_.front().getting_synced) {
      log_sync_cv_.Wait();
    }
    // First check that logs are safe to sync in background.
    for (auto it = logs_.begin();
         it != logs_.end() && it->number <= current_log_number; ++it) {
      if (!it->writer->file()->writable_file()->IsSyncThreadSafe()) {
        return Status::NotSupported(
            "SyncWAL() is not supported for this implementation of WAL file",
            immutable_db_options_.allow_mmap_writes
                ? "try setting Options::allow_mmap_writes to false"
                : Slice());
      }
    }
    for (auto it = logs_.begin();
         it != logs_.end() && it->number <= current_log_number; ++it) {
      auto& log = *it;
      assert(!log.getting_synced);
      log.getting_synced = true;
      logs_to_sync.push_back(log.writer);
    }

    need_log_dir_sync = !log_dir_synced_;
  }

  TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1");
  RecordTick(stats_, WAL_FILE_SYNCED);
  Status status;
  for (log::Writer* log : logs_to_sync) {
    status = log->file()->SyncWithoutFlush(immutable_db_options_.use_fsync);
    if (!status.ok()) {
      break;
    }
  }
  if (status.ok() && need_log_dir_sync) {
    status = directories_.GetWalDir()->Fsync();
  }
  TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");

  TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
  {
    InstrumentedMutexLock l(&mutex_);
    MarkLogsSynced(current_log_number, need_log_dir_sync, status);
  }
  TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");

  return status;
}

Status DBImpl::LockWAL() {
  log_write_mutex_.Lock();
  auto cur_log_writer = logs_.back().writer;
  auto status = cur_log_writer->WriteBuffer();
  if (!status.ok()) {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
                    status.ToString().c_str());
    // In case there is a fs error we should set it globally to prevent
    // future writes
    WriteStatusCheck(status);
  }
  return status;
}

Status DBImpl::UnlockWAL() {
  log_write_mutex_.Unlock();
  return Status::OK();
}
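
// Hypothetical pairing sketch: LockWAL()/UnlockWAL() bracket a section where
// the caller needs the WAL frozen, e.g. while copying live WAL files:
//
//   Status s = db->LockWAL();   // flushes the buffer, holds log_write_mutex_
//   // ... inspect or copy WAL files ...
//   if (s.ok()) { s = db->UnlockWAL(); }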

void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
                            const Status& status) {
  mutex_.AssertHeld();
  if (synced_dir && logfile_number_ == up_to && status.ok()) {
    log_dir_synced_ = true;
  }
  for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
    auto& log = *it;
    assert(log.getting_synced);
    if (status.ok() && logs_.size() > 1) {
      logs_to_free_.push_back(log.ReleaseWriter());
      // To modify logs_ both mutex_ and log_write_mutex_ must be held
      InstrumentedMutexLock l(&log_write_mutex_);
      it = logs_.erase(it);
    } else {
      log.getting_synced = false;
      ++it;
    }
  }
  assert(!status.ok() || logs_.empty() || logs_[0].number > up_to ||
         (logs_.size() == 1 && !logs_[0].getting_synced));
  log_sync_cv_.SignalAll();
}

SequenceNumber DBImpl::GetLatestSequenceNumber() const {
  return versions_->LastSequence();
}

void DBImpl::SetLastPublishedSequence(SequenceNumber seq) {
  versions_->SetLastPublishedSequence(seq);
}

bool DBImpl::SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) {
  if (seqnum > preserve_deletes_seqnum_.load()) {
    preserve_deletes_seqnum_.store(seqnum);
    return true;
  } else {
    return false;
  }
}
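
// Hypothetical usage sketch: with Options::preserve_deletes == true, a
// client periodically advances the cutoff below which compaction may drop
// deletion markers:
//
//   SequenceNumber seq = db->GetLatestSequenceNumber();
//   bool advanced = db->SetPreserveDeletesSequenceNumber(seq);
//   // false means seq was not greater than the current cutoff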

InternalIterator* DBImpl::NewInternalIterator(
    Arena* arena, RangeDelAggregator* range_del_agg, SequenceNumber sequence,
    ColumnFamilyHandle* column_family) {
  ColumnFamilyData* cfd;
  if (column_family == nullptr) {
    cfd = default_cf_handle_->cfd();
  } else {
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
    cfd = cfh->cfd();
  }

  mutex_.Lock();
  SuperVersion* super_version = cfd->GetSuperVersion()->Ref();
  mutex_.Unlock();
  ReadOptions roptions;
  return NewInternalIterator(roptions, cfd, super_version, arena, range_del_agg,
                             sequence);
}

void DBImpl::SchedulePurge() {
  mutex_.AssertHeld();
  assert(opened_successfully_);

  // Purge operations are put into the high priority queue
  bg_purge_scheduled_++;
  env_->Schedule(&DBImpl::BGWorkPurge, this, Env::Priority::HIGH, nullptr);
}

void DBImpl::BackgroundCallPurge() {
  mutex_.Lock();

  while (!logs_to_free_queue_.empty()) {
    assert(!logs_to_free_queue_.empty());
    log::Writer* log_writer = *(logs_to_free_queue_.begin());
    logs_to_free_queue_.pop_front();
    mutex_.Unlock();
    delete log_writer;
    mutex_.Lock();
  }
  while (!superversions_to_free_queue_.empty()) {
    assert(!superversions_to_free_queue_.empty());
    SuperVersion* sv = superversions_to_free_queue_.front();
    superversions_to_free_queue_.pop_front();
    mutex_.Unlock();
    delete sv;
    mutex_.Lock();
  }

  // Can't use an iterator to go over purge_files_ because inside the loop
  // we're unlocking the mutex that protects purge_files_.
  while (!purge_files_.empty()) {
    auto it = purge_files_.begin();
    // Need to make a copy of the PurgeFileInfo before unlocking the mutex.
    PurgeFileInfo purge_file = it->second;

    const std::string& fname = purge_file.fname;
    const std::string& dir_to_sync = purge_file.dir_to_sync;
    FileType type = purge_file.type;
    uint64_t number = purge_file.number;
    int job_id = purge_file.job_id;

    purge_files_.erase(it);

    mutex_.Unlock();
    DeleteObsoleteFileImpl(job_id, fname, dir_to_sync, type, number);
    mutex_.Lock();
  }

  bg_purge_scheduled_--;

  bg_cv_.SignalAll();
  // IMPORTANT: there should be no code after calling SignalAll. This call may
  // signal the DB destructor that it's OK to proceed with destruction. In
  // that case, all DB variables will be deallocated and referencing them
  // will cause trouble.
  mutex_.Unlock();
}

namespace {
struct IterState {
  IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version,
            bool _background_purge)
      : db(_db),
        mu(_mu),
        super_version(_super_version),
        background_purge(_background_purge) {}

  DBImpl* db;
  InstrumentedMutex* mu;
  SuperVersion* super_version;
  bool background_purge;
};

static void CleanupIteratorState(void* arg1, void* /*arg2*/) {
  IterState* state = reinterpret_cast<IterState*>(arg1);

  if (state->super_version->Unref()) {
    // Job id == 0 means that this is not our background process, but rather
    // a user thread
    JobContext job_context(0);

    state->mu->Lock();
    state->super_version->Cleanup();
    state->db->FindObsoleteFiles(&job_context, false, true);
    if (state->background_purge) {
      state->db->ScheduleBgLogWriterClose(&job_context);
      state->db->AddSuperVersionsToFreeQueue(state->super_version);
      state->db->SchedulePurge();
    }
    state->mu->Unlock();

    if (!state->background_purge) {
      delete state->super_version;
    }
    if (job_context.HaveSomethingToDelete()) {
      if (state->background_purge) {
        // PurgeObsoleteFiles here does not delete files. Instead, it adds the
        // files to be deleted to a job queue, and deletes them in a separate
        // background thread.
        state->db->PurgeObsoleteFiles(job_context, true /* schedule only */);
        state->mu->Lock();
        state->db->SchedulePurge();
        state->mu->Unlock();
      } else {
        state->db->PurgeObsoleteFiles(job_context);
      }
    }
    job_context.Clean();
  }

  delete state;
}
}  // namespace

InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
                                              ColumnFamilyData* cfd,
                                              SuperVersion* super_version,
                                              Arena* arena,
                                              RangeDelAggregator* range_del_agg,
                                              SequenceNumber sequence) {
  InternalIterator* internal_iter;
  assert(arena != nullptr);
  assert(range_del_agg != nullptr);
  // Need to create internal iterator from the arena.
  MergeIteratorBuilder merge_iter_builder(
      &cfd->internal_comparator(), arena,
      !read_options.total_order_seek &&
          super_version->mutable_cf_options.prefix_extractor != nullptr);
  // Collect iterator for mutable mem
  merge_iter_builder.AddIterator(
      super_version->mem->NewIterator(read_options, arena));
  std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter;
  Status s;
  if (!read_options.ignore_range_deletions) {
    range_del_iter.reset(
        super_version->mem->NewRangeTombstoneIterator(read_options, sequence));
    range_del_agg->AddTombstones(std::move(range_del_iter));
  }
  // Collect all needed child iterators for immutable memtables
  if (s.ok()) {
    super_version->imm->AddIterators(read_options, &merge_iter_builder);
    if (!read_options.ignore_range_deletions) {
      s = super_version->imm->AddRangeTombstoneIterators(read_options, arena,
                                                         range_del_agg);
    }
  }
  TEST_SYNC_POINT_CALLBACK("DBImpl::NewInternalIterator:StatusCallback", &s);
  if (s.ok()) {
    // Collect iterators for files in L0 - Ln
    if (read_options.read_tier != kMemtableTier) {
      super_version->current->AddIterators(read_options, file_options_,
                                           &merge_iter_builder, range_del_agg);
    }
    internal_iter = merge_iter_builder.Finish();
    IterState* cleanup =
        new IterState(this, &mutex_, super_version,
                      read_options.background_purge_on_iterator_cleanup ||
                          immutable_db_options_.avoid_unnecessary_blocking_io);
    internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);

    return internal_iter;
  } else {
    CleanupSuperVersion(super_version);
  }
  return NewErrorInternalIterator<Slice>(s, arena);
}

ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const {
  return default_cf_handle_;
}

ColumnFamilyHandle* DBImpl::PersistentStatsColumnFamily() const {
  return persist_stats_cf_handle_;
}

Status DBImpl::Get(const ReadOptions& read_options,
                   ColumnFamilyHandle* column_family, const Slice& key,
                   PinnableSlice* value) {
  GetImplOptions get_impl_options;
  get_impl_options.column_family = column_family;
  get_impl_options.value = value;
  return GetImpl(read_options, key, get_impl_options);
}
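
// Hypothetical usage sketch: PinnableSlice lets Get() avoid copying the
// value when it can be pinned (e.g. in the block cache):
//
//   PinnableSlice value;
//   Status s = db->Get(ReadOptions(), db->DefaultColumnFamily(), "key",
//                      &value);
//   if (s.ok()) { /* use value.data(), value.size() */ }
//   value.Reset();  // release any pinned resources before reusing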
1504
GetImpl(const ReadOptions & read_options,const Slice & key,GetImplOptions get_impl_options)1505 Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
1506 GetImplOptions get_impl_options) {
1507 assert(get_impl_options.value != nullptr ||
1508 get_impl_options.merge_operands != nullptr);
1509 PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
1510 StopWatch sw(env_, stats_, DB_GET);
1511 PERF_TIMER_GUARD(get_snapshot_time);
1512
1513 auto cfh =
1514 reinterpret_cast<ColumnFamilyHandleImpl*>(get_impl_options.column_family);
1515 auto cfd = cfh->cfd();
1516
1517 if (tracer_) {
1518 // TODO: This mutex should be removed later, to improve performance when
1519 // tracing is enabled.
1520 InstrumentedMutexLock lock(&trace_mutex_);
1521 if (tracer_) {
1522 tracer_->Get(get_impl_options.column_family, key);
1523 }
1524 }
1525
1526 // Acquire SuperVersion
1527 SuperVersion* sv = GetAndRefSuperVersion(cfd);
1528
1529 TEST_SYNC_POINT("DBImpl::GetImpl:1");
1530 TEST_SYNC_POINT("DBImpl::GetImpl:2");
1531
1532 SequenceNumber snapshot;
1533 if (read_options.snapshot != nullptr) {
1534 if (get_impl_options.callback) {
1535 // Already calculated based on read_options.snapshot
1536 snapshot = get_impl_options.callback->max_visible_seq();
1537 } else {
1538 snapshot =
1539 reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
1540 }
1541 } else {
1542 // Note that the snapshot is assigned AFTER referencing the super
1543 // version because otherwise a flush happening in between may compact away
1544 // data for the snapshot, so the reader would see neither the data that
1545 // was visible to the snapshot before compaction nor the newer data
1546 // inserted afterwards.
1547 snapshot = last_seq_same_as_publish_seq_
1548 ? versions_->LastSequence()
1549 : versions_->LastPublishedSequence();
1550 if (get_impl_options.callback) {
1551 // The unprep_seqs are not published for write unprepared, so it could be
1552 // that max_visible_seq is larger. Seek to the std::max of the two.
1553 // However, we still want our callback to contain the actual snapshot so
1554 // that it can do the correct visibility filtering.
1555 get_impl_options.callback->Refresh(snapshot);
1556
1557 // Internally, WriteUnpreparedTxnReadCallback::Refresh would set
1558 // max_visible_seq = max(max_visible_seq, snapshot)
1559 //
1560 // Currently, the commented out assert is broken by
1561 // InvalidSnapshotReadCallback, but if write unprepared recovery followed
1562 // the regular transaction flow, then this special read callback would not
1563 // be needed.
1564 //
1565 // assert(callback->max_visible_seq() >= snapshot);
1566 snapshot = get_impl_options.callback->max_visible_seq();
1567 }
1568 }
1569 TEST_SYNC_POINT("DBImpl::GetImpl:3");
1570 TEST_SYNC_POINT("DBImpl::GetImpl:4");
1571
1572 // Prepare to store a list of merge operations if merge occurs.
1573 MergeContext merge_context;
1574 SequenceNumber max_covering_tombstone_seq = 0;
1575
1576 Status s;
1577 // First look in the memtable, then in the immutable memtable (if any).
1578 // s is both in/out. When in, s could either be OK or MergeInProgress.
1579 // merge_operands will contain the sequence of merges in the latter case.
1580 LookupKey lkey(key, snapshot, read_options.timestamp);
1581 PERF_TIMER_STOP(get_snapshot_time);
1582
1583 bool skip_memtable = (read_options.read_tier == kPersistedTier &&
1584 has_unpersisted_data_.load(std::memory_order_relaxed));
1585 bool done = false;
1586 if (!skip_memtable) {
1587 // Get value associated with key
1588 if (get_impl_options.get_value) {
1589 if (sv->mem->Get(lkey, get_impl_options.value->GetSelf(), &s,
1590 &merge_context, &max_covering_tombstone_seq,
1591 read_options, get_impl_options.callback,
1592 get_impl_options.is_blob_index)) {
1593 done = true;
1594 get_impl_options.value->PinSelf();
1595 RecordTick(stats_, MEMTABLE_HIT);
1596 } else if ((s.ok() || s.IsMergeInProgress()) &&
1597 sv->imm->Get(lkey, get_impl_options.value->GetSelf(), &s,
1598 &merge_context, &max_covering_tombstone_seq,
1599 read_options, get_impl_options.callback,
1600 get_impl_options.is_blob_index)) {
1601 done = true;
1602 get_impl_options.value->PinSelf();
1603 RecordTick(stats_, MEMTABLE_HIT);
1604 }
1605 } else {
1606 // Get the merge operands associated with the key. The operands should
1607 // not be merged, and the raw values should be returned to the user.
1608 if (sv->mem->Get(lkey, nullptr, &s, &merge_context,
1609 &max_covering_tombstone_seq, read_options, nullptr,
1610 nullptr, false)) {
1611 done = true;
1612 RecordTick(stats_, MEMTABLE_HIT);
1613 } else if ((s.ok() || s.IsMergeInProgress()) &&
1614 sv->imm->GetMergeOperands(lkey, &s, &merge_context,
1615 &max_covering_tombstone_seq,
1616 read_options)) {
1617 done = true;
1618 RecordTick(stats_, MEMTABLE_HIT);
1619 }
1620 }
1621 if (!done && !s.ok() && !s.IsMergeInProgress()) {
1622 ReturnAndCleanupSuperVersion(cfd, sv);
1623 return s;
1624 }
1625 }
1626 if (!done) {
1627 PERF_TIMER_GUARD(get_from_output_files_time);
1628 sv->current->Get(
1629 read_options, lkey, get_impl_options.value, &s, &merge_context,
1630 &max_covering_tombstone_seq,
1631 get_impl_options.get_value ? get_impl_options.value_found : nullptr,
1632 nullptr, nullptr,
1633 get_impl_options.get_value ? get_impl_options.callback : nullptr,
1634 get_impl_options.get_value ? get_impl_options.is_blob_index : nullptr,
1635 get_impl_options.get_value);
1636 RecordTick(stats_, MEMTABLE_MISS);
1637 }
1638
1639 {
1640 PERF_TIMER_GUARD(get_post_process_time);
1641
1642 ReturnAndCleanupSuperVersion(cfd, sv);
1643
1644 RecordTick(stats_, NUMBER_KEYS_READ);
1645 size_t size = 0;
1646 if (s.ok()) {
1647 if (get_impl_options.get_value) {
1648 size = get_impl_options.value->size();
1649 } else {
1650 // Return all merge operands for get_impl_options.key
1651 *get_impl_options.number_of_operands =
1652 static_cast<int>(merge_context.GetNumOperands());
1653 if (*get_impl_options.number_of_operands >
1654 get_impl_options.get_merge_operands_options
1655 ->expected_max_number_of_operands) {
1656 s = Status::Incomplete(
1657 Status::SubCode::KMergeOperandsInsufficientCapacity);
1658 } else {
1659 for (const Slice& sl : merge_context.GetOperands()) {
1660 size += sl.size();
1661 get_impl_options.merge_operands->PinSelf(sl);
1662 get_impl_options.merge_operands++;
1663 }
1664 }
1665 }
1666 RecordTick(stats_, BYTES_READ, size);
1667 PERF_COUNTER_ADD(get_read_bytes, size);
1668 }
1669 RecordInHistogram(stats_, BYTES_PER_READ, size);
1670 }
1671 return s;
1672 }
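
// The merge-operand path of GetImpl() (get_value == false) backs the public
// DB::GetMergeOperands() API. An illustrative sketch, assuming a DB* `db`
// with a merge operator configured (names here are hypothetical):
//
//   GetMergeOperandsOptions mo_opts;
//   mo_opts.expected_max_number_of_operands = 10;
//   std::vector<PinnableSlice> operands(10);
//   int num_operands = 0;
//   Status s = db->GetMergeOperands(ReadOptions(), db->DefaultColumnFamily(),
//                                   "counter", operands.data(), &mo_opts,
//                                   &num_operands);
//   // On success the raw, unmerged operands are returned. If more operands
//   // exist than expected_max_number_of_operands, s is Incomplete with
//   // subcode KMergeOperandsInsufficientCapacity, as implemented above.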
1673
1674 std::vector<Status> DBImpl::MultiGet(
1675 const ReadOptions& read_options,
1676 const std::vector<ColumnFamilyHandle*>& column_family,
1677 const std::vector<Slice>& keys, std::vector<std::string>* values) {
1678 PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
1679 StopWatch sw(env_, stats_, DB_MULTIGET);
1680 PERF_TIMER_GUARD(get_snapshot_time);
1681
1682 SequenceNumber consistent_seqnum;
1684
1685 std::unordered_map<uint32_t, MultiGetColumnFamilyData> multiget_cf_data(
1686 column_family.size());
1687 for (auto cf : column_family) {
1688 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(cf);
1689 auto cfd = cfh->cfd();
1690 if (multiget_cf_data.find(cfd->GetID()) == multiget_cf_data.end()) {
1691 multiget_cf_data.emplace(cfd->GetID(),
1692 MultiGetColumnFamilyData(cfh, nullptr));
1693 }
1694 }
1695
1696 std::function<MultiGetColumnFamilyData*(
1697 std::unordered_map<uint32_t, MultiGetColumnFamilyData>::iterator&)>
1698 iter_deref_lambda =
1699 [](std::unordered_map<uint32_t, MultiGetColumnFamilyData>::iterator&
1700 cf_iter) { return &cf_iter->second; };
1701
1702 bool unref_only =
1703 MultiCFSnapshot<std::unordered_map<uint32_t, MultiGetColumnFamilyData>>(
1704 read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
1705 &consistent_seqnum);
1706
1707 // Contains a list of merge operations if merge occurs.
1708 MergeContext merge_context;
1709
1710 // Note: this always resizes the values array
1711 size_t num_keys = keys.size();
1712 std::vector<Status> stat_list(num_keys);
1713 values->resize(num_keys);
1714
1715 // Keep track of bytes that we read for statistics-recording later
1716 uint64_t bytes_read = 0;
1717 PERF_TIMER_STOP(get_snapshot_time);
1718
1719 // For each of the given keys, apply the entire "get" process as follows:
1720 // First look in the memtable, then in the immutable memtable (if any).
1721 // s is both in/out. When in, s could either be OK or MergeInProgress.
1722 // merge_operands will contain the sequence of merges in the latter case.
1723 size_t num_found = 0;
1724 for (size_t i = 0; i < num_keys; ++i) {
1725 merge_context.Clear();
1726 Status& s = stat_list[i];
1727 std::string* value = &(*values)[i];
1728
1729 LookupKey lkey(keys[i], consistent_seqnum);
1730 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family[i]);
1731 SequenceNumber max_covering_tombstone_seq = 0;
1732 auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID());
1733 assert(mgd_iter != multiget_cf_data.end());
1734 auto mgd = mgd_iter->second;
1735 auto super_version = mgd.super_version;
1736 bool skip_memtable =
1737 (read_options.read_tier == kPersistedTier &&
1738 has_unpersisted_data_.load(std::memory_order_relaxed));
1739 bool done = false;
1740 if (!skip_memtable) {
1741 if (super_version->mem->Get(lkey, value, &s, &merge_context,
1742 &max_covering_tombstone_seq, read_options)) {
1743 done = true;
1744 RecordTick(stats_, MEMTABLE_HIT);
1745 } else if (super_version->imm->Get(lkey, value, &s, &merge_context,
1746 &max_covering_tombstone_seq,
1747 read_options)) {
1748 done = true;
1749 RecordTick(stats_, MEMTABLE_HIT);
1750 }
1751 }
1752 if (!done) {
1753 PinnableSlice pinnable_val;
1754 PERF_TIMER_GUARD(get_from_output_files_time);
1755 super_version->current->Get(read_options, lkey, &pinnable_val, &s,
1756 &merge_context, &max_covering_tombstone_seq);
1757 value->assign(pinnable_val.data(), pinnable_val.size());
1758 RecordTick(stats_, MEMTABLE_MISS);
1759 }
1760
1761 if (s.ok()) {
1762 bytes_read += value->size();
1763 num_found++;
1764 }
1765 }
1766
1767 // Post processing (decrement reference counts and record statistics)
1768 PERF_TIMER_GUARD(get_post_process_time);
1769 autovector<SuperVersion*> superversions_to_delete;
1770
1771 for (auto mgd_iter : multiget_cf_data) {
1772 auto mgd = mgd_iter.second;
1773 if (!unref_only) {
1774 ReturnAndCleanupSuperVersion(mgd.cfd, mgd.super_version);
1775 } else {
1776 mgd.cfd->GetSuperVersion()->Unref();
1777 }
1778 }
1779 RecordTick(stats_, NUMBER_MULTIGET_CALLS);
1780 RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
1781 RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found);
1782 RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
1783 RecordInHistogram(stats_, BYTES_PER_MULTIGET, bytes_read);
1784 PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
1785 PERF_TIMER_STOP(get_post_process_time);
1786
1787 return stat_list;
1788 }
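
// Example usage of the vector-based MultiGet() above (illustrative sketch;
// assumes a DB* `db`):
//
//   std::vector<ColumnFamilyHandle*> cfs(2, db->DefaultColumnFamily());
//   std::vector<Slice> keys = {"k1", "k2"};
//   std::vector<std::string> values;
//   std::vector<Status> statuses =
//       db->MultiGet(ReadOptions(), cfs, keys, &values);
//   // statuses[i] and values[i] correspond to keys[i]; all lookups use the
//   // single consistent sequence number chosen by MultiCFSnapshot().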
1789
1790 template <class T>
1791 bool DBImpl::MultiCFSnapshot(
1792 const ReadOptions& read_options, ReadCallback* callback,
1793 std::function<MultiGetColumnFamilyData*(typename T::iterator&)>&
1794 iter_deref_func,
1795 T* cf_list, SequenceNumber* snapshot) {
1796 PERF_TIMER_GUARD(get_snapshot_time);
1797
1798 bool last_try = false;
1799 if (cf_list->size() == 1) {
1800 // Fast path for a single column family. We can simply get the thread
1801 // local super version.
1802 auto cf_iter = cf_list->begin();
1803 auto node = iter_deref_func(cf_iter);
1804 node->super_version = GetAndRefSuperVersion(node->cfd);
1805 if (read_options.snapshot != nullptr) {
1806 // Note: In WritePrepared txns this is not necessary but not harmful
1807 // either. Because prep_seq > snapshot => commit_seq > snapshot so if
1808 // a snapshot is specified we should be fine with skipping seq numbers
1809 // that are greater than that.
1810 //
1811 // In WriteUnprepared, we cannot set snapshot in the lookup key because we
1812 // may skip uncommitted data that should be visible to the transaction for
1813 // reading own writes.
1814 *snapshot =
1815 static_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
1816 if (callback) {
1817 *snapshot = std::max(*snapshot, callback->max_visible_seq());
1818 }
1819 } else {
1820 // Since we get and reference the super version before getting
1821 // the snapshot number, without a mutex protection, it is possible
1822 // that a memtable switch happened in the middle and not all the
1823 // data for this snapshot is available. But it will contain all
1824 // the data available in the super version we have, which is also
1825 // a valid snapshot to read from.
1826 // We shouldn't get snapshot before finding and referencing the super
1827 // version because a flush happening in between may compact away data for
1828 // the snapshot, but the snapshot is earlier than the data overwriting it,
1829 // so users may see wrong results.
1830 *snapshot = last_seq_same_as_publish_seq_
1831 ? versions_->LastSequence()
1832 : versions_->LastPublishedSequence();
1833 }
1834 } else {
1835 // If we end up with the same issue of a memtable getting sealed during 2
1836 // consecutive retries, it means the write rate is very high. In that case
1837 // it's probably OK to take the mutex on the 3rd try so we can succeed for
1838 // sure.
1839 static const int num_retries = 3;
1840 for (int i = 0; i < num_retries; ++i) {
1841 last_try = (i == num_retries - 1);
1842 bool retry = false;
1843
1844 if (i > 0) {
1845 for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
1846 ++cf_iter) {
1847 auto node = iter_deref_func(cf_iter);
1848 SuperVersion* super_version = node->super_version;
1849 ColumnFamilyData* cfd = node->cfd;
1850 if (super_version != nullptr) {
1851 ReturnAndCleanupSuperVersion(cfd, super_version);
1852 }
1853 node->super_version = nullptr;
1854 }
1855 }
1856 if (read_options.snapshot == nullptr) {
1857 if (last_try) {
1858 TEST_SYNC_POINT("DBImpl::MultiGet::LastTry");
1859 // We're close to max number of retries. For the last retry,
1860 // acquire the lock so we're sure to succeed
1861 mutex_.Lock();
1862 }
1863 *snapshot = last_seq_same_as_publish_seq_
1864 ? versions_->LastSequence()
1865 : versions_->LastPublishedSequence();
1866 } else {
1867 *snapshot = reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)
1868 ->number_;
1869 }
1870 for (auto cf_iter = cf_list->begin(); cf_iter != cf_list->end();
1871 ++cf_iter) {
1872 auto node = iter_deref_func(cf_iter);
1873 if (!last_try) {
1874 node->super_version = GetAndRefSuperVersion(node->cfd);
1875 } else {
1876 node->super_version = node->cfd->GetSuperVersion()->Ref();
1877 }
1878 TEST_SYNC_POINT("DBImpl::MultiGet::AfterRefSV");
1879 if (read_options.snapshot != nullptr || last_try) {
1880 // If user passed a snapshot, then we don't care if a memtable is
1881 // sealed or compaction happens because the snapshot would ensure
1882 // that older key versions are kept around. If this is the last
1883 // retry, then we have the lock so nothing bad can happen
1884 continue;
1885 }
1886 // We could get the earliest sequence number for the whole list of
1887 // memtables, which will include immutable memtables as well, but that
1888 // might be tricky to maintain in case we decide, in the future, to do
1889 // memtable compaction.
1890 if (!last_try) {
1891 SequenceNumber seq =
1892 node->super_version->mem->GetEarliestSequenceNumber();
1893 if (seq > *snapshot) {
1894 retry = true;
1895 break;
1896 }
1897 }
1898 }
1899 if (!retry) {
1900 if (last_try) {
1901 mutex_.Unlock();
1902 }
1903 break;
1904 }
1905 }
1906 }
1907
1908 // Keep track of bytes that we read for statistics-recording later
1909 PERF_TIMER_STOP(get_snapshot_time);
1910
1911 return last_try;
1912 }
1913
1914 void DBImpl::MultiGet(const ReadOptions& read_options, const size_t num_keys,
1915 ColumnFamilyHandle** column_families, const Slice* keys,
1916 PinnableSlice* values, Status* statuses,
1917 const bool sorted_input) {
1918 if (num_keys == 0) {
1919 return;
1920 }
1921 autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
1922 autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
1923 sorted_keys.resize(num_keys);
1924 for (size_t i = 0; i < num_keys; ++i) {
1925 key_context.emplace_back(column_families[i], keys[i], &values[i],
1926 &statuses[i]);
1927 }
1928 for (size_t i = 0; i < num_keys; ++i) {
1929 sorted_keys[i] = &key_context[i];
1930 }
1931 PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys);
1932
1933 autovector<MultiGetColumnFamilyData, MultiGetContext::MAX_BATCH_SIZE>
1934 multiget_cf_data;
1935 size_t cf_start = 0;
1936 ColumnFamilyHandle* cf = sorted_keys[0]->column_family;
1937 for (size_t i = 0; i < num_keys; ++i) {
1938 KeyContext* key_ctx = sorted_keys[i];
1939 if (key_ctx->column_family != cf) {
1940 multiget_cf_data.emplace_back(
1941 MultiGetColumnFamilyData(cf, cf_start, i - cf_start, nullptr));
1942 cf_start = i;
1943 cf = key_ctx->column_family;
1944 }
1945 }
1949 multiget_cf_data.emplace_back(cf, cf_start, num_keys - cf_start, nullptr);
1951 std::function<MultiGetColumnFamilyData*(
1952 autovector<MultiGetColumnFamilyData,
1953 MultiGetContext::MAX_BATCH_SIZE>::iterator&)>
1954 iter_deref_lambda =
1955 [](autovector<MultiGetColumnFamilyData,
1956 MultiGetContext::MAX_BATCH_SIZE>::iterator& cf_iter) {
1957 return &(*cf_iter);
1958 };
1959
1960 SequenceNumber consistent_seqnum;
1961 bool unref_only = MultiCFSnapshot<
1962 autovector<MultiGetColumnFamilyData, MultiGetContext::MAX_BATCH_SIZE>>(
1963 read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
1964 &consistent_seqnum);
1965
1966 for (auto cf_iter = multiget_cf_data.begin();
1967 cf_iter != multiget_cf_data.end(); ++cf_iter) {
1968 MultiGetImpl(read_options, cf_iter->start, cf_iter->num_keys, &sorted_keys,
1969 cf_iter->super_version, consistent_seqnum, nullptr, nullptr);
1970 if (!unref_only) {
1971 ReturnAndCleanupSuperVersion(cf_iter->cfd, cf_iter->super_version);
1972 } else {
1973 cf_iter->cfd->GetSuperVersion()->Unref();
1974 }
1975 }
1976 }
1977
1978 namespace {
1979 // Order keys by CF ID, followed by key contents
1980 struct CompareKeyContext {
1981 inline bool operator()(const KeyContext* lhs, const KeyContext* rhs) {
1982 ColumnFamilyHandleImpl* cfh =
1983 static_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
1984 uint32_t cfd_id1 = cfh->cfd()->GetID();
1985 const Comparator* comparator = cfh->cfd()->user_comparator();
1986 cfh = static_cast<ColumnFamilyHandleImpl*>(rhs->column_family);
1987 uint32_t cfd_id2 = cfh->cfd()->GetID();
1988
1989 if (cfd_id1 < cfd_id2) {
1990 return true;
1991 } else if (cfd_id1 > cfd_id2) {
1992 return false;
1993 }
1994
1995 // Both keys are from the same column family
1996 int cmp = comparator->Compare(*(lhs->key), *(rhs->key));
1997 if (cmp < 0) {
1998 return true;
1999 }
2000 return false;
2001 }
2002 };
2003
2004 } // anonymous namespace
2005
2006 void DBImpl::PrepareMultiGetKeys(
2007 size_t num_keys, bool sorted_input,
2008 autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
2009 #ifndef NDEBUG
2010 if (sorted_input) {
2011 for (size_t index = 0; index < sorted_keys->size(); ++index) {
2012 if (index > 0) {
2013 KeyContext* lhs = (*sorted_keys)[index - 1];
2014 KeyContext* rhs = (*sorted_keys)[index];
2015 ColumnFamilyHandleImpl* cfh =
2016 reinterpret_cast<ColumnFamilyHandleImpl*>(lhs->column_family);
2017 uint32_t cfd_id1 = cfh->cfd()->GetID();
2018 const Comparator* comparator = cfh->cfd()->user_comparator();
2019 cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(rhs->column_family);
2020 uint32_t cfd_id2 = cfh->cfd()->GetID();
2021
2022 assert(cfd_id1 <= cfd_id2);
2023 if (cfd_id1 < cfd_id2) {
2024 continue;
2025 }
2026
2027 // Both keys are from the same column family
2028 int cmp = comparator->Compare(*(lhs->key), *(rhs->key));
2029 assert(cmp <= 0);
2030 }
2032 }
2033 }
2034 #endif
2035 if (!sorted_input) {
2036 CompareKeyContext sort_comparator;
2037 std::sort(sorted_keys->begin(), sorted_keys->begin() + num_keys,
2038 sort_comparator);
2039 }
2040 }
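
// A sketch of the ordering contract checked above: sorted_input == true is
// only safe when keys are grouped by column family ID and ordered by each
// column family's user comparator. The helper below is hypothetical, not
// part of this file, and merely restates the check in code form:
//
//   bool IsSortedForMultiGet(
//       const autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>& keys) {
//     CompareKeyContext cmp;
//     for (size_t i = 1; i < keys.size(); ++i) {
//       if (cmp(keys[i], keys[i - 1])) {
//         return false;  // keys[i] sorts before its predecessor
//       }
//     }
//     return true;
//   }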
2041
2042 void DBImpl::MultiGet(const ReadOptions& read_options,
2043 ColumnFamilyHandle* column_family, const size_t num_keys,
2044 const Slice* keys, PinnableSlice* values,
2045 Status* statuses, const bool sorted_input) {
2046 autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
2047 autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
2048 sorted_keys.resize(num_keys);
2049 for (size_t i = 0; i < num_keys; ++i) {
2050 key_context.emplace_back(column_family, keys[i], &values[i], &statuses[i]);
2051 }
2052 for (size_t i = 0; i < num_keys; ++i) {
2053 sorted_keys[i] = &key_context[i];
2054 }
2055 PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys);
2056 MultiGetWithCallback(read_options, column_family, nullptr, &sorted_keys);
2057 }
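
// Example usage of the batched MultiGet() overload above (illustrative
// sketch; assumes a DB* `db`):
//
//   constexpr size_t kNumKeys = 2;
//   Slice keys[kNumKeys] = {"k1", "k2"};
//   PinnableSlice values[kNumKeys];
//   Status statuses[kNumKeys];
//   db->MultiGet(ReadOptions(), db->DefaultColumnFamily(), kNumKeys, keys,
//                values, statuses, /*sorted_input=*/false);
//   // With sorted_input == false, PrepareMultiGetKeys() sorts the keys
//   // internally before the batched lookup.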
2058
2059 void DBImpl::MultiGetWithCallback(
2060 const ReadOptions& read_options, ColumnFamilyHandle* column_family,
2061 ReadCallback* callback,
2062 autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
2063 std::array<MultiGetColumnFamilyData, 1> multiget_cf_data;
2064 multiget_cf_data[0] = MultiGetColumnFamilyData(column_family, nullptr);
2065 std::function<MultiGetColumnFamilyData*(
2066 std::array<MultiGetColumnFamilyData, 1>::iterator&)>
2067 iter_deref_lambda =
2068 [](std::array<MultiGetColumnFamilyData, 1>::iterator& cf_iter) {
2069 return &(*cf_iter);
2070 };
2071
2072 size_t num_keys = sorted_keys->size();
2073 SequenceNumber consistent_seqnum;
2074 bool unref_only = MultiCFSnapshot<std::array<MultiGetColumnFamilyData, 1>>(
2075 read_options, callback, iter_deref_lambda, &multiget_cf_data,
2076 &consistent_seqnum);
2077 #ifndef NDEBUG
2078 assert(!unref_only);
2079 #else
2080 // Silence unused variable warning
2081 (void)unref_only;
2082 #endif // NDEBUG
2083
2084 if (callback && read_options.snapshot == nullptr) {
2085 // The unprep_seqs are not published for write unprepared, so it could be
2086 // that max_visible_seq is larger. Seek to the std::max of the two.
2087 // However, we still want our callback to contain the actual snapshot so
2088 // that it can do the correct visibility filtering.
2089 callback->Refresh(consistent_seqnum);
2090
2091 // Internally, WriteUnpreparedTxnReadCallback::Refresh would set
2092 // max_visible_seq = max(max_visible_seq, snapshot)
2093 //
2094 // Currently, the commented out assert is broken by
2095 // InvalidSnapshotReadCallback, but if write unprepared recovery followed
2096 // the regular transaction flow, then this special read callback would not
2097 // be needed.
2098 //
2099 // assert(callback->max_visible_seq() >= snapshot);
2100 consistent_seqnum = callback->max_visible_seq();
2101 }
2102
2103 MultiGetImpl(read_options, 0, num_keys, sorted_keys,
2104 multiget_cf_data[0].super_version, consistent_seqnum, nullptr,
2105 nullptr);
2106 ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd,
2107 multiget_cf_data[0].super_version);
2108 }
2109
2110 void DBImpl::MultiGetImpl(
2111 const ReadOptions& read_options, size_t start_key, size_t num_keys,
2112 autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys,
2113 SuperVersion* super_version, SequenceNumber snapshot,
2114 ReadCallback* callback, bool* is_blob_index) {
2115 PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
2116 StopWatch sw(env_, stats_, DB_MULTIGET);
2117
2118 // For each of the given keys, apply the entire "get" process as follows:
2119 // First look in the memtable, then in the immutable memtable (if any).
2120 // s is both in/out. When in, s could either be OK or MergeInProgress.
2121 // merge_operands will contain the sequence of merges in the latter case.
2122 size_t keys_left = num_keys;
2123 while (keys_left) {
2124 size_t batch_size = (keys_left > MultiGetContext::MAX_BATCH_SIZE)
2125 ? MultiGetContext::MAX_BATCH_SIZE
2126 : keys_left;
2127 MultiGetContext ctx(sorted_keys, start_key + num_keys - keys_left,
2128 batch_size, snapshot);
2129 MultiGetRange range = ctx.GetMultiGetRange();
2130 bool lookup_current = false;
2131
2132 keys_left -= batch_size;
2133 for (auto mget_iter = range.begin(); mget_iter != range.end();
2134 ++mget_iter) {
2135 mget_iter->merge_context.Clear();
2136 *mget_iter->s = Status::OK();
2137 }
2138
2139 bool skip_memtable =
2140 (read_options.read_tier == kPersistedTier &&
2141 has_unpersisted_data_.load(std::memory_order_relaxed));
2142 if (!skip_memtable) {
2143 super_version->mem->MultiGet(read_options, &range, callback,
2144 is_blob_index);
2145 if (!range.empty()) {
2146 super_version->imm->MultiGet(read_options, &range, callback,
2147 is_blob_index);
2148 }
2149 if (!range.empty()) {
2150 lookup_current = true;
2151 uint64_t left = range.KeysLeft();
2152 RecordTick(stats_, MEMTABLE_MISS, left);
2153 }
2154 }
2155 if (lookup_current) {
2156 PERF_TIMER_GUARD(get_from_output_files_time);
2157 super_version->current->MultiGet(read_options, &range, callback,
2158 is_blob_index);
2159 }
2160 }
2161
2162 // Post processing (decrement reference counts and record statistics)
2163 PERF_TIMER_GUARD(get_post_process_time);
2164 size_t num_found = 0;
2165 uint64_t bytes_read = 0;
2166 for (size_t i = start_key; i < start_key + num_keys; ++i) {
2167 KeyContext* key = (*sorted_keys)[i];
2168 if (key->s->ok()) {
2169 bytes_read += key->value->size();
2170 num_found++;
2171 }
2172 }
2173
2174 RecordTick(stats_, NUMBER_MULTIGET_CALLS);
2175 RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
2176 RecordTick(stats_, NUMBER_MULTIGET_KEYS_FOUND, num_found);
2177 RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
2178 RecordInHistogram(stats_, BYTES_PER_MULTIGET, bytes_read);
2179 PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
2180 PERF_TIMER_STOP(get_post_process_time);
2181 }
2182
2183 Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
2184 const std::string& column_family,
2185 ColumnFamilyHandle** handle) {
2186 assert(handle != nullptr);
2187 Status s = CreateColumnFamilyImpl(cf_options, column_family, handle);
2188 if (s.ok()) {
2189 s = WriteOptionsFile(true /*need_mutex_lock*/,
2190 true /*need_enter_write_thread*/);
2191 }
2192 return s;
2193 }
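
// Example usage of CreateColumnFamily() (illustrative sketch; assumes a DB*
// `db`):
//
//   ColumnFamilyHandle* cf = nullptr;
//   Status s = db->CreateColumnFamily(ColumnFamilyOptions(), "new_cf", &cf);
//   // On success the creation is recorded in the MANIFEST and the OPTIONS
//   // file is rewritten. The caller owns the handle and should eventually
//   // release it with db->DestroyColumnFamilyHandle(cf).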
2194
2195 Status DBImpl::CreateColumnFamilies(
2196 const ColumnFamilyOptions& cf_options,
2197 const std::vector<std::string>& column_family_names,
2198 std::vector<ColumnFamilyHandle*>* handles) {
2199 assert(handles != nullptr);
2200 handles->clear();
2201 size_t num_cf = column_family_names.size();
2202 Status s;
2203 bool success_once = false;
2204 for (size_t i = 0; i < num_cf; i++) {
2205 ColumnFamilyHandle* handle;
2206 s = CreateColumnFamilyImpl(cf_options, column_family_names[i], &handle);
2207 if (!s.ok()) {
2208 break;
2209 }
2210 handles->push_back(handle);
2211 success_once = true;
2212 }
2213 if (success_once) {
2214 Status persist_options_status = WriteOptionsFile(
2215 true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
2216 if (s.ok() && !persist_options_status.ok()) {
2217 s = persist_options_status;
2218 }
2219 }
2220 return s;
2221 }
2222
2223 Status DBImpl::CreateColumnFamilies(
2224 const std::vector<ColumnFamilyDescriptor>& column_families,
2225 std::vector<ColumnFamilyHandle*>* handles) {
2226 assert(handles != nullptr);
2227 handles->clear();
2228 size_t num_cf = column_families.size();
2229 Status s;
2230 bool success_once = false;
2231 for (size_t i = 0; i < num_cf; i++) {
2232 ColumnFamilyHandle* handle;
2233 s = CreateColumnFamilyImpl(column_families[i].options,
2234 column_families[i].name, &handle);
2235 if (!s.ok()) {
2236 break;
2237 }
2238 handles->push_back(handle);
2239 success_once = true;
2240 }
2241 if (success_once) {
2242 Status persist_options_status = WriteOptionsFile(
2243 true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
2244 if (s.ok() && !persist_options_status.ok()) {
2245 s = persist_options_status;
2246 }
2247 }
2248 return s;
2249 }
2250
2251 Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
2252 const std::string& column_family_name,
2253 ColumnFamilyHandle** handle) {
2254 Status s;
2255 Status persist_options_status;
2256 *handle = nullptr;
2257
2258 DBOptions db_options =
2259 BuildDBOptions(immutable_db_options_, mutable_db_options_);
2260 s = ColumnFamilyData::ValidateOptions(db_options, cf_options);
2261 if (s.ok()) {
2262 for (auto& cf_path : cf_options.cf_paths) {
2263 s = env_->CreateDirIfMissing(cf_path.path);
2264 if (!s.ok()) {
2265 break;
2266 }
2267 }
2268 }
2269 if (!s.ok()) {
2270 return s;
2271 }
2272
2273 SuperVersionContext sv_context(/* create_superversion */ true);
2274 {
2275 InstrumentedMutexLock l(&mutex_);
2276
2277 if (versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name) !=
2278 nullptr) {
2279 return Status::InvalidArgument("Column family already exists");
2280 }
2281 VersionEdit edit;
2282 edit.AddColumnFamily(column_family_name);
2283 uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
2284 edit.SetColumnFamily(new_id);
2285 edit.SetLogNumber(logfile_number_);
2286 edit.SetComparatorName(cf_options.comparator->Name());
2287
2288 // LogAndApply will both write the creation in MANIFEST and create
2289 // ColumnFamilyData object
2290 { // write thread
2291 WriteThread::Writer w;
2292 write_thread_.EnterUnbatched(&w, &mutex_);
2293 // LogAndApply will both write the creation in MANIFEST and create
2294 // ColumnFamilyData object
2295 s = versions_->LogAndApply(nullptr, MutableCFOptions(cf_options), &edit,
2296 &mutex_, directories_.GetDbDir(), false,
2297 &cf_options);
2298 write_thread_.ExitUnbatched(&w);
2299 }
2300 if (s.ok()) {
2301 auto* cfd =
2302 versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
2303 assert(cfd != nullptr);
2304 s = cfd->AddDirectories();
2305 }
2306 if (s.ok()) {
2307 single_column_family_mode_ = false;
2308 auto* cfd =
2309 versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
2310 assert(cfd != nullptr);
2311 InstallSuperVersionAndScheduleWork(cfd, &sv_context,
2312 *cfd->GetLatestMutableCFOptions());
2313
2314 if (!cfd->mem()->IsSnapshotSupported()) {
2315 is_snapshot_supported_ = false;
2316 }
2317
2318 cfd->set_initialized();
2319
2320 *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
2321 ROCKS_LOG_INFO(immutable_db_options_.info_log,
2322 "Created column family [%s] (ID %u)",
2323 column_family_name.c_str(), (unsigned)cfd->GetID());
2324 } else {
2325 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
2326 "Creating column family [%s] FAILED -- %s",
2327 column_family_name.c_str(), s.ToString().c_str());
2328 }
2329 } // InstrumentedMutexLock l(&mutex_)
2330
2331 sv_context.Clean();
2332 // this is outside the mutex
2333 if (s.ok()) {
2334 NewThreadStatusCfInfo(
2335 reinterpret_cast<ColumnFamilyHandleImpl*>(*handle)->cfd());
2336 }
2337 return s;
2338 }
2339
2340 Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
2341 assert(column_family != nullptr);
2342 Status s = DropColumnFamilyImpl(column_family);
2343 if (s.ok()) {
2344 s = WriteOptionsFile(true /*need_mutex_lock*/,
2345 true /*need_enter_write_thread*/);
2346 }
2347 return s;
2348 }
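
// Example usage of DropColumnFamily() (illustrative sketch; `cf` is a handle
// previously returned by CreateColumnFamily()):
//
//   Status s = db->DropColumnFamily(cf);
//   if (s.ok()) {
//     // The drop is durable once recorded in the MANIFEST, but the handle
//     // must still be destroyed to release its ColumnFamilyData reference.
//     s = db->DestroyColumnFamilyHandle(cf);
//   }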
2349
2350 Status DBImpl::DropColumnFamilies(
2351 const std::vector<ColumnFamilyHandle*>& column_families) {
2352 Status s;
2353 bool success_once = false;
2354 for (auto* handle : column_families) {
2355 s = DropColumnFamilyImpl(handle);
2356 if (!s.ok()) {
2357 break;
2358 }
2359 success_once = true;
2360 }
2361 if (success_once) {
2362 Status persist_options_status = WriteOptionsFile(
2363 true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
2364 if (s.ok() && !persist_options_status.ok()) {
2365 s = persist_options_status;
2366 }
2367 }
2368 return s;
2369 }
2370
2371 Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) {
2372 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
2373 auto cfd = cfh->cfd();
2374 if (cfd->GetID() == 0) {
2375 return Status::InvalidArgument("Can't drop default column family");
2376 }
2377
2378 bool cf_support_snapshot = cfd->mem()->IsSnapshotSupported();
2379
2380 VersionEdit edit;
2381 edit.DropColumnFamily();
2382 edit.SetColumnFamily(cfd->GetID());
2383
2384 Status s;
2385 {
2386 InstrumentedMutexLock l(&mutex_);
2387 if (cfd->IsDropped()) {
2388 s = Status::InvalidArgument("Column family already dropped!\n");
2389 }
2390 if (s.ok()) {
2391 // we drop column family from a single write thread
2392 WriteThread::Writer w;
2393 write_thread_.EnterUnbatched(&w, &mutex_);
2394 s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit,
2395 &mutex_);
2396 write_thread_.ExitUnbatched(&w);
2397 }
2398 if (s.ok()) {
2399 auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
2400 max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size *
2401 mutable_cf_options->max_write_buffer_number;
2402 }
2403
2404 if (!cf_support_snapshot) {
2405 // The dropped column family didn't support snapshots. Need to recalculate
2406 // is_snapshot_supported_.
2407 bool new_is_snapshot_supported = true;
2408 for (auto c : *versions_->GetColumnFamilySet()) {
2409 if (!c->IsDropped() && !c->mem()->IsSnapshotSupported()) {
2410 new_is_snapshot_supported = false;
2411 break;
2412 }
2413 }
2414 is_snapshot_supported_ = new_is_snapshot_supported;
2415 }
2416 bg_cv_.SignalAll();
2417 }
2418
2419 if (s.ok()) {
2420 // Note that here we erase the associated cf_info of the to-be-dropped
2421 // cfd before its ref-count goes to zero to avoid having to erase cf_info
2422 // later inside db_mutex.
2423 EraseThreadStatusCfInfo(cfd);
2424 assert(cfd->IsDropped());
2425 ROCKS_LOG_INFO(immutable_db_options_.info_log,
2426 "Dropped column family with id %u\n", cfd->GetID());
2427 } else {
2428 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
2429 "Dropping column family with id %u FAILED -- %s\n",
2430 cfd->GetID(), s.ToString().c_str());
2431 }
2432
2433 return s;
2434 }
2435
2436 bool DBImpl::KeyMayExist(const ReadOptions& read_options,
2437 ColumnFamilyHandle* column_family, const Slice& key,
2438 std::string* value, bool* value_found) {
2439 assert(value != nullptr);
2440 if (value_found != nullptr) {
2441 // falsify later if key-may-exist but can't fetch value
2442 *value_found = true;
2443 }
2444 ReadOptions roptions = read_options;
2445 roptions.read_tier = kBlockCacheTier; // read from block cache only
2446 PinnableSlice pinnable_val;
2447 GetImplOptions get_impl_options;
2448 get_impl_options.column_family = column_family;
2449 get_impl_options.value = &pinnable_val;
2450 get_impl_options.value_found = value_found;
2451 auto s = GetImpl(roptions, key, get_impl_options);
2452 value->assign(pinnable_val.data(), pinnable_val.size());
2453
2454 // If block_cache is enabled and the index block of the table is not
2455 // present in block_cache, the return value will be Status::Incomplete.
2456 // In this case, the key may still exist in the table.
2457 return s.ok() || s.IsIncomplete();
2458 }
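
// Example usage of KeyMayExist() (illustrative sketch; assumes a DB* `db`).
// A false return definitively rules the key out; a true return can be a
// false positive because the read is limited to memtables and block cache:
//
//   std::string value;
//   bool value_found = false;
//   if (db->KeyMayExist(ReadOptions(), db->DefaultColumnFamily(), "key",
//                       &value, &value_found)) {
//     // If value_found is true, `value` was fetched cheaply; otherwise a
//     // full Get() is needed to confirm existence and read the value.
//   }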
2459
2460 Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
2461 ColumnFamilyHandle* column_family) {
2462 if (read_options.managed) {
2463 return NewErrorIterator(
2464 Status::NotSupported("Managed iterator is not supported anymore."));
2465 }
2466 Iterator* result = nullptr;
2467 if (read_options.read_tier == kPersistedTier) {
2468 return NewErrorIterator(Status::NotSupported(
2469 "ReadTier::kPersistedData is not yet supported in iterators."));
2470 }
2471 // if iterator wants internal keys, we can only proceed if
2472 // we can guarantee the deletes haven't been processed yet
2473 if (immutable_db_options_.preserve_deletes &&
2474 read_options.iter_start_seqnum > 0 &&
2475 read_options.iter_start_seqnum < preserve_deletes_seqnum_.load()) {
2476 return NewErrorIterator(Status::InvalidArgument(
2477 "Iterator requested internal keys which are too old and are not"
2478 " guaranteed to be preserved, try larger iter_start_seqnum opt."));
2479 }
2480 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
2481 auto cfd = cfh->cfd();
2482 ReadCallback* read_callback = nullptr; // No read callback provided.
2483 if (read_options.tailing) {
2484 #ifdef ROCKSDB_LITE
2485 // not supported in lite version
2486 result = nullptr;
2487
2488 #else
2489 SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
2490 auto iter = new ForwardIterator(this, read_options, cfd, sv);
2491 result = NewDBIterator(
2492 env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
2493 cfd->user_comparator(), iter, kMaxSequenceNumber,
2494 sv->mutable_cf_options.max_sequential_skip_in_iterations, read_callback,
2495 this, cfd);
2496 #endif
2497 } else {
2498 // Note: no need to consider the special case of
2499 // last_seq_same_as_publish_seq_==false since NewIterator is overridden in
2500 // WritePreparedTxnDB
2501 auto snapshot = read_options.snapshot != nullptr
2502 ? read_options.snapshot->GetSequenceNumber()
2503 : versions_->LastSequence();
2504 result = NewIteratorImpl(read_options, cfd, snapshot, read_callback);
2505 }
2506 return result;
2507 }
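
// Example usage of NewIterator() (illustrative sketch; assumes a DB* `db`):
//
//   std::unique_ptr<Iterator> it(
//       db->NewIterator(ReadOptions(), db->DefaultColumnFamily()));
//   for (it->SeekToFirst(); it->Valid(); it->Next()) {
//     // it->key() and it->value() remain valid until the iterator is moved.
//   }
//   Status s = it->status();  // check for iteration errors afterwards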
2508
2509 ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
2510 ColumnFamilyData* cfd,
2511 SequenceNumber snapshot,
2512 ReadCallback* read_callback,
2513 bool allow_blob,
2514 bool allow_refresh) {
2515 SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
2516
2517 // Try to generate a DB iterator tree in continuous memory area to be
2518 // cache friendly. Here is an example of result:
2519 // +-------------------------------+
2520 // | |
2521 // | ArenaWrappedDBIter |
2522 // | + |
2523 // | +---> Inner Iterator ------------+
2524 // | | | |
2525 // | | +-- -- -- -- -- -- -- --+ |
2526 // | +--- | Arena | |
2527 // | | | |
2528 // | Allocated Memory: | |
2529 // | | +-------------------+ |
2530 // | | | DBIter | <---+
2531 // | | + |
2532 // | | | +-> iter_ ------------+
2533 // | | | | |
2534 // | | +-------------------+ |
2535 // | | | MergingIterator | <---+
2536 // | | + |
2537 // | | | +->child iter1 ------------+
2538 // | | | | | |
2539 // | | +->child iter2 ----------+ |
2540 // | | | | | | |
2541 // | | | +->child iter3 --------+ | |
2542 // | | | | | |
2543 // | | +-------------------+ | | |
2544 // | | | Iterator1 | <--------+
2545 // | | +-------------------+ | |
2546 // | | | Iterator2 | <------+
2547 // | | +-------------------+ |
2548 // | | | Iterator3 | <----+
2549 // | | +-------------------+
2550 // | | |
2551 // +-------+-----------------------+
2552 //
2553 // ArenaWrappedDBIter inlines an arena area where all the iterators in
2554 // the iterator tree are allocated in the order of being accessed when
2555 // querying.
2556 // Laying out the iterators in the order of being accessed makes it more
2557 // likely that any iterator pointer is close to the iterator it points to so
2558 // that they are likely to be in the same cache line and/or page.
2559 ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
2560 env_, read_options, *cfd->ioptions(), sv->mutable_cf_options, snapshot,
2561 sv->mutable_cf_options.max_sequential_skip_in_iterations,
2562 sv->version_number, read_callback, this, cfd, allow_blob,
2563 ((read_options.snapshot != nullptr) ? false : allow_refresh));
2564
2565 InternalIterator* internal_iter =
2566 NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
2567 db_iter->GetRangeDelAggregator(), snapshot);
2568 db_iter->SetIterUnderDBIter(internal_iter);
2569
2570 return db_iter;
2571 }
2572
2573 Status DBImpl::NewIterators(
2574 const ReadOptions& read_options,
2575 const std::vector<ColumnFamilyHandle*>& column_families,
2576 std::vector<Iterator*>* iterators) {
2577 if (read_options.managed) {
2578 return Status::NotSupported("Managed iterator is not supported anymore.");
2579 }
2580 if (read_options.read_tier == kPersistedTier) {
2581 return Status::NotSupported(
2582 "ReadTier::kPersistedData is not yet supported in iterators.");
2583 }
2584 ReadCallback* read_callback = nullptr; // No read callback provided.
2585 iterators->clear();
2586 iterators->reserve(column_families.size());
2587 if (read_options.tailing) {
2588 #ifdef ROCKSDB_LITE
2589 return Status::InvalidArgument(
2590 "Tailing iterator not supported in RocksDB lite");
2591 #else
2592 for (auto cfh : column_families) {
2593 auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
2594 SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
2595 auto iter = new ForwardIterator(this, read_options, cfd, sv);
2596 iterators->push_back(NewDBIterator(
2597 env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
2598 cfd->user_comparator(), iter, kMaxSequenceNumber,
2599 sv->mutable_cf_options.max_sequential_skip_in_iterations,
2600 read_callback, this, cfd));
2601 }
2602 #endif
2603 } else {
2604 // Note: no need to consider the special case of
2605 // last_seq_same_as_publish_seq_==false since NewIterators is overridden in
2606 // WritePreparedTxnDB
2607 auto snapshot = read_options.snapshot != nullptr
2608 ? read_options.snapshot->GetSequenceNumber()
2609 : versions_->LastSequence();
2610 for (size_t i = 0; i < column_families.size(); ++i) {
2611 auto* cfd =
2612 reinterpret_cast<ColumnFamilyHandleImpl*>(column_families[i])->cfd();
2613 iterators->push_back(
2614 NewIteratorImpl(read_options, cfd, snapshot, read_callback));
2615 }
2616 }
2617
2618 return Status::OK();
2619 }
2620
2621 const Snapshot* DBImpl::GetSnapshot() { return GetSnapshotImpl(false); }
2622
2623 #ifndef ROCKSDB_LITE
2624 const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() {
2625 return GetSnapshotImpl(true);
2626 }
2627 #endif // ROCKSDB_LITE
2628
2629 SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
2630 bool lock) {
2631 int64_t unix_time = 0;
2632 env_->GetCurrentTime(&unix_time); // Ignore error
2633 SnapshotImpl* s = new SnapshotImpl;
2634
2635 if (lock) {
2636 mutex_.Lock();
2637 }
2638 // returns null if the underlying memtable does not support snapshot.
2639 if (!is_snapshot_supported_) {
2640 if (lock) {
2641 mutex_.Unlock();
2642 }
2643 delete s;
2644 return nullptr;
2645 }
2646 auto snapshot_seq = last_seq_same_as_publish_seq_
2647 ? versions_->LastSequence()
2648 : versions_->LastPublishedSequence();
2649 SnapshotImpl* snapshot =
2650 snapshots_.New(s, snapshot_seq, unix_time, is_write_conflict_boundary);
2651 if (lock) {
2652 mutex_.Unlock();
2653 }
2654 return snapshot;
2655 }
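
// The snapshot lifecycle implemented above, in sketch form (assumes a DB*
// `db`; illustrative only):
//
//   const Snapshot* snap = db->GetSnapshot();  // nullptr if the memtable
//                                              // doesn't support snapshots
//   if (snap != nullptr) {
//     ReadOptions ropts;
//     ropts.snapshot = snap;  // reads ignore writes newer than the snapshot
//     PinnableSlice value;
//     Status s = db->Get(ropts, db->DefaultColumnFamily(), "key", &value);
//     db->ReleaseSnapshot(snap);  // lets compaction drop old key versions
//   }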
2656
2657 namespace {
2658 typedef autovector<ColumnFamilyData*, 2> CfdList;
2659 bool CfdListContains(const CfdList& list, ColumnFamilyData* cfd) {
2660 for (const ColumnFamilyData* t : list) {
2661 if (t == cfd) {
2662 return true;
2663 }
2664 }
2665 return false;
2666 }
2667 } // namespace
2668
2669 void DBImpl::ReleaseSnapshot(const Snapshot* s) {
2670 const SnapshotImpl* casted_s = reinterpret_cast<const SnapshotImpl*>(s);
2671 {
2672 InstrumentedMutexLock l(&mutex_);
2673 snapshots_.Delete(casted_s);
2674 uint64_t oldest_snapshot;
2675 if (snapshots_.empty()) {
2676 oldest_snapshot = last_seq_same_as_publish_seq_
2677 ? versions_->LastSequence()
2678 : versions_->LastPublishedSequence();
2679 } else {
2680 oldest_snapshot = snapshots_.oldest()->number_;
2681 }
2682 // Avoid going through every column family by checking a global threshold
2683 // first.
2684 if (oldest_snapshot > bottommost_files_mark_threshold_) {
2685 CfdList cf_scheduled;
2686 for (auto* cfd : *versions_->GetColumnFamilySet()) {
2687 cfd->current()->storage_info()->UpdateOldestSnapshot(oldest_snapshot);
2688 if (!cfd->current()
2689 ->storage_info()
2690 ->BottommostFilesMarkedForCompaction()
2691 .empty()) {
2692 SchedulePendingCompaction(cfd);
2693 MaybeScheduleFlushOrCompaction();
2694 cf_scheduled.push_back(cfd);
2695 }
2696 }
2697
2698 // Calculate a new threshold, skipping those CFs where compactions are
2699 // scheduled. We do not do the same pass as the previous loop because
2700 // mutex might be unlocked during the loop, making the result inaccurate.
2701 SequenceNumber new_bottommost_files_mark_threshold = kMaxSequenceNumber;
2702 for (auto* cfd : *versions_->GetColumnFamilySet()) {
2703 if (CfdListContains(cf_scheduled, cfd)) {
2704 continue;
2705 }
2706 new_bottommost_files_mark_threshold = std::min(
2707 new_bottommost_files_mark_threshold,
2708 cfd->current()->storage_info()->bottommost_files_mark_threshold());
2709 }
2710 bottommost_files_mark_threshold_ = new_bottommost_files_mark_threshold;
2711 }
2712 }
2713 delete casted_s;
2714 }
2715
2716 #ifndef ROCKSDB_LITE
2717 Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
2718 TablePropertiesCollection* props) {
2719 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
2720 auto cfd = cfh->cfd();
2721
2722 // Increment the ref count
2723 mutex_.Lock();
2724 auto version = cfd->current();
2725 version->Ref();
2726 mutex_.Unlock();
2727
2728 auto s = version->GetPropertiesOfAllTables(props);
2729
2730 // Decrement the ref count
2731 mutex_.Lock();
2732 version->Unref();
2733 mutex_.Unlock();
2734
2735 return s;
2736 }
2737
2738 Status DBImpl::GetPropertiesOfTablesInRange(ColumnFamilyHandle* column_family,
2739 const Range* range, std::size_t n,
2740 TablePropertiesCollection* props) {
2741 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
2742 auto cfd = cfh->cfd();
2743
2744 // Increment the ref count
2745 mutex_.Lock();
2746 auto version = cfd->current();
2747 version->Ref();
2748 mutex_.Unlock();
2749
2750 auto s = version->GetPropertiesOfTablesInRange(range, n, props);
2751
2752 // Decrement the ref count
2753 mutex_.Lock();
2754 version->Unref();
2755 mutex_.Unlock();
2756
2757 return s;
2758 }
2759
2760 #endif // ROCKSDB_LITE
2761
2762 const std::string& DBImpl::GetName() const { return dbname_; }
2763
2764 Env* DBImpl::GetEnv() const { return env_; }
2765
2766 FileSystem* DB::GetFileSystem() const {
2767 static LegacyFileSystemWrapper fs_wrap(GetEnv());
2768 return &fs_wrap;
2769 }
2770
2771 FileSystem* DBImpl::GetFileSystem() const {
2772 return immutable_db_options_.fs.get();
2773 }
2774
2775 Options DBImpl::GetOptions(ColumnFamilyHandle* column_family) const {
2776 InstrumentedMutexLock l(&mutex_);
2777 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
2778 return Options(BuildDBOptions(immutable_db_options_, mutable_db_options_),
2779 cfh->cfd()->GetLatestCFOptions());
2780 }
2781
2782 DBOptions DBImpl::GetDBOptions() const {
2783 InstrumentedMutexLock l(&mutex_);
2784 return BuildDBOptions(immutable_db_options_, mutable_db_options_);
2785 }
2786
2787 bool DBImpl::GetProperty(ColumnFamilyHandle* column_family,
2788 const Slice& property, std::string* value) {
2789 const DBPropertyInfo* property_info = GetPropertyInfo(property);
2790 value->clear();
2791 auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
2792 if (property_info == nullptr) {
2793 return false;
2794 } else if (property_info->handle_int) {
2795 uint64_t int_value;
2796 bool ret_value =
2797 GetIntPropertyInternal(cfd, *property_info, false, &int_value);
2798 if (ret_value) {
2799 *value = ToString(int_value);
2800 }
2801 return ret_value;
2802 } else if (property_info->handle_string) {
2803 InstrumentedMutexLock l(&mutex_);
2804 return cfd->internal_stats()->GetStringProperty(*property_info, property,
2805 value);
2806 } else if (property_info->handle_string_dbimpl) {
2807 std::string tmp_value;
2808 bool ret_value = (this->*(property_info->handle_string_dbimpl))(&tmp_value);
2809 if (ret_value) {
2810 *value = tmp_value;
2811 }
2812 return ret_value;
2813 }
2814 // Shouldn't reach here since exactly one of handle_string and handle_int
2815 // should be non-nullptr.
2816 assert(false);
2817 return false;
2818 }
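
// Example usage of the property getters above (illustrative sketch; assumes
// a DB* `db`; both property names are standard RocksDB properties):
//
//   std::string stats;
//   if (db->GetProperty(db->DefaultColumnFamily(), "rocksdb.stats", &stats)) {
//     // stats holds a human-readable statistics dump.
//   }
//   uint64_t num_keys = 0;
//   if (db->GetIntProperty(db->DefaultColumnFamily(),
//                          "rocksdb.estimate-num-keys", &num_keys)) {
//     // num_keys is an estimate; tombstones and duplicates make it inexact.
//   }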
2819
2820 bool DBImpl::GetMapProperty(ColumnFamilyHandle* column_family,
2821 const Slice& property,
2822 std::map<std::string, std::string>* value) {
2823 const DBPropertyInfo* property_info = GetPropertyInfo(property);
2824 value->clear();
2825 auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
2826 if (property_info == nullptr) {
2827 return false;
2828 } else if (property_info->handle_map) {
2829 InstrumentedMutexLock l(&mutex_);
2830 return cfd->internal_stats()->GetMapProperty(*property_info, property,
2831 value);
2832 }
2833 // If we reach this point it means that handle_map is not provided for the
2834 // requested property
2835 return false;
2836 }
2837
2838 bool DBImpl::GetIntProperty(ColumnFamilyHandle* column_family,
2839 const Slice& property, uint64_t* value) {
2840 const DBPropertyInfo* property_info = GetPropertyInfo(property);
2841 if (property_info == nullptr || property_info->handle_int == nullptr) {
2842 return false;
2843 }
2844 auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
2845 return GetIntPropertyInternal(cfd, *property_info, false, value);
2846 }
2847
2848 bool DBImpl::GetIntPropertyInternal(ColumnFamilyData* cfd,
2849 const DBPropertyInfo& property_info,
2850 bool is_locked, uint64_t* value) {
2851 assert(property_info.handle_int != nullptr);
2852 if (!property_info.need_out_of_mutex) {
2853 if (is_locked) {
2854 mutex_.AssertHeld();
2855 return cfd->internal_stats()->GetIntProperty(property_info, value, this);
2856 } else {
2857 InstrumentedMutexLock l(&mutex_);
2858 return cfd->internal_stats()->GetIntProperty(property_info, value, this);
2859 }
2860 } else {
2861 SuperVersion* sv = nullptr;
2862 if (!is_locked) {
2863 sv = GetAndRefSuperVersion(cfd);
2864 } else {
2865 sv = cfd->GetSuperVersion();
2866 }
2867
2868 bool ret = cfd->internal_stats()->GetIntPropertyOutOfMutex(
2869 property_info, sv->current, value);
2870
2871 if (!is_locked) {
2872 ReturnAndCleanupSuperVersion(cfd, sv);
2873 }
2874
2875 return ret;
2876 }
2877 }
2878
2879 bool DBImpl::GetPropertyHandleOptionsStatistics(std::string* value) {
2880 assert(value != nullptr);
2881 Statistics* statistics = immutable_db_options_.statistics.get();
2882 if (!statistics) {
2883 return false;
2884 }
2885 *value = statistics->ToString();
2886 return true;
2887 }
2888
2889 #ifndef ROCKSDB_LITE
2890 Status DBImpl::ResetStats() {
2891 InstrumentedMutexLock l(&mutex_);
2892 for (auto* cfd : *versions_->GetColumnFamilySet()) {
2893 if (cfd->initialized()) {
2894 cfd->internal_stats()->Clear();
2895 }
2896 }
2897 return Status::OK();
2898 }
2899 #endif // ROCKSDB_LITE
2900
2901 bool DBImpl::GetAggregatedIntProperty(const Slice& property,
2902 uint64_t* aggregated_value) {
2903 const DBPropertyInfo* property_info = GetPropertyInfo(property);
2904 if (property_info == nullptr || property_info->handle_int == nullptr) {
2905 return false;
2906 }
2907
2908 uint64_t sum = 0;
2909 {
2910 // Needs mutex to protect the list of column families.
2911 InstrumentedMutexLock l(&mutex_);
2912 uint64_t value;
2913 for (auto* cfd : *versions_->GetColumnFamilySet()) {
2914 if (!cfd->initialized()) {
2915 continue;
2916 }
2917 if (GetIntPropertyInternal(cfd, *property_info, true, &value)) {
2918 sum += value;
2919 } else {
2920 return false;
2921 }
2922 }
2923 }
2924 *aggregated_value = sum;
2925 return true;
2926 }
2927
2928 SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) {
2929 // TODO(ljin): consider using GetReferencedSuperVersion() directly
2930 return cfd->GetThreadLocalSuperVersion(this);
2931 }
2932
2933 // REQUIRED: this function should only be called on the write thread or if the
2934 // mutex is held.
2935 SuperVersion* DBImpl::GetAndRefSuperVersion(uint32_t column_family_id) {
2936 auto column_family_set = versions_->GetColumnFamilySet();
2937 auto cfd = column_family_set->GetColumnFamily(column_family_id);
2938 if (!cfd) {
2939 return nullptr;
2940 }
2941
2942 return GetAndRefSuperVersion(cfd);
2943 }
2944
2945 void DBImpl::CleanupSuperVersion(SuperVersion* sv) {
2946 // Release SuperVersion
2947 if (sv->Unref()) {
2948 bool defer_purge =
2949 immutable_db_options().avoid_unnecessary_blocking_io;
2950 {
2951 InstrumentedMutexLock l(&mutex_);
2952 sv->Cleanup();
2953 if (defer_purge) {
2954 AddSuperVersionsToFreeQueue(sv);
2955 SchedulePurge();
2956 }
2957 }
2958 if (!defer_purge) {
2959 delete sv;
2960 }
2961 RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS);
2962 }
2963 RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES);
2964 }
2965
2966 void DBImpl::ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd,
2967 SuperVersion* sv) {
2968 if (!cfd->ReturnThreadLocalSuperVersion(sv)) {
2969 CleanupSuperVersion(sv);
2970 }
2971 }
2972
2973 // REQUIRED: this function should only be called on the write thread.
2974 void DBImpl::ReturnAndCleanupSuperVersion(uint32_t column_family_id,
2975 SuperVersion* sv) {
2976 auto column_family_set = versions_->GetColumnFamilySet();
2977 auto cfd = column_family_set->GetColumnFamily(column_family_id);
2978
2979 // If SuperVersion is held, and we successfully fetched a cfd using
2980 // GetAndRefSuperVersion(), it must still exist.
2981 assert(cfd != nullptr);
2982 ReturnAndCleanupSuperVersion(cfd, sv);
2983 }
2984
2985 // REQUIRED: this function should only be called on the write thread or if the
2986 // mutex is held.
2987 ColumnFamilyHandle* DBImpl::GetColumnFamilyHandle(uint32_t column_family_id) {
2988 ColumnFamilyMemTables* cf_memtables = column_family_memtables_.get();
2989
2990 if (!cf_memtables->Seek(column_family_id)) {
2991 return nullptr;
2992 }
2993
2994 return cf_memtables->GetColumnFamilyHandle();
2995 }
2996
2997 // REQUIRED: mutex is NOT held.
2998 std::unique_ptr<ColumnFamilyHandle> DBImpl::GetColumnFamilyHandleUnlocked(
2999 uint32_t column_family_id) {
3000 InstrumentedMutexLock l(&mutex_);
3001
3002 auto* cfd =
3003 versions_->GetColumnFamilySet()->GetColumnFamily(column_family_id);
3004 if (cfd == nullptr) {
3005 return nullptr;
3006 }
3007
3008 return std::unique_ptr<ColumnFamilyHandleImpl>(
3009 new ColumnFamilyHandleImpl(cfd, this, &mutex_));
3010 }
3011
3012 void DBImpl::GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
3013 const Range& range,
3014 uint64_t* const count,
3015 uint64_t* const size) {
3016 ColumnFamilyHandleImpl* cfh =
3017 reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
3018 ColumnFamilyData* cfd = cfh->cfd();
3019 SuperVersion* sv = GetAndRefSuperVersion(cfd);
3020
3021 // Convert user_key into a corresponding internal key.
3022 InternalKey k1(range.start, kMaxSequenceNumber, kValueTypeForSeek);
3023 InternalKey k2(range.limit, kMaxSequenceNumber, kValueTypeForSeek);
3024 MemTable::MemTableStats memStats =
3025 sv->mem->ApproximateStats(k1.Encode(), k2.Encode());
3026 MemTable::MemTableStats immStats =
3027 sv->imm->ApproximateStats(k1.Encode(), k2.Encode());
3028 *count = memStats.count + immStats.count;
3029 *size = memStats.size + immStats.size;
3030
3031 ReturnAndCleanupSuperVersion(cfd, sv);
3032 }
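
// Illustrative caller-side sketch (an assumption, not part of the original
// source): estimating how many keys and bytes of a range still live in the
// memtables through GetApproximateMemTableStats() above, given an open
// rocksdb::DB* db.
//
//   rocksdb::Range r("user_0000", "user_9999");
//   uint64_t count = 0, size = 0;
//   db->GetApproximateMemTableStats(db->DefaultColumnFamily(), r,
//                                   &count, &size);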
3033
3034 Status DBImpl::GetApproximateSizes(const SizeApproximationOptions& options,
3035 ColumnFamilyHandle* column_family,
3036 const Range* range, int n, uint64_t* sizes) {
3037 if (!options.include_memtabtles && !options.include_files) {
3038 return Status::InvalidArgument("Invalid options");
3039 }
3040
3041 Version* v;
3042 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
3043 auto cfd = cfh->cfd();
3044 SuperVersion* sv = GetAndRefSuperVersion(cfd);
3045 v = sv->current;
3046
3047 for (int i = 0; i < n; i++) {
3048 // Convert user_key into a corresponding internal key.
3049 InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
3050 InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
3051 sizes[i] = 0;
3052 if (options.include_files) {
3053 sizes[i] += versions_->ApproximateSize(
3054 options, v, k1.Encode(), k2.Encode(), /*start_level=*/0,
3055 /*end_level=*/-1, TableReaderCaller::kUserApproximateSize);
3056 }
3057 if (options.include_memtabtles) {
3058 sizes[i] += sv->mem->ApproximateStats(k1.Encode(), k2.Encode()).size;
3059 sizes[i] += sv->imm->ApproximateStats(k1.Encode(), k2.Encode()).size;
3060 }
3061 }
3062
3063 ReturnAndCleanupSuperVersion(cfd, sv);
3064 return Status::OK();
3065 }
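
// A minimal usage sketch (assuming an open rocksdb::DB* db): asking for the
// approximate size of one key range, counting both SST files and memtables
// via the option flags checked above. The include_memtabtles spelling
// matches the field name used by this version of SizeApproximationOptions.
//
//   rocksdb::SizeApproximationOptions approx_opts;
//   approx_opts.include_files = true;
//   approx_opts.include_memtabtles = true;
//   rocksdb::Range r("a", "z");
//   uint64_t range_size = 0;
//   rocksdb::Status s = db->GetApproximateSizes(
//       approx_opts, db->DefaultColumnFamily(), &r, 1, &range_size);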
3066
3067 std::list<uint64_t>::iterator
3068 DBImpl::CaptureCurrentFileNumberInPendingOutputs() {
3069 // We need to remember the iterator of our insert, because after the
3070 // background job is done, we need to remove that element from
3071 // pending_outputs_.
3072 pending_outputs_.push_back(versions_->current_next_file_number());
3073 auto pending_outputs_inserted_elem = pending_outputs_.end();
3074 --pending_outputs_inserted_elem;
3075 return pending_outputs_inserted_elem;
3076 }
3077
3078 void DBImpl::ReleaseFileNumberFromPendingOutputs(
3079 std::unique_ptr<std::list<uint64_t>::iterator>& v) {
3080 if (v.get() != nullptr) {
3081 pending_outputs_.erase(*v.get());
3082 v.reset();
3083 }
3084 }
3085
3086 #ifndef ROCKSDB_LITE
3087 Status DBImpl::GetUpdatesSince(
3088 SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
3089 const TransactionLogIterator::ReadOptions& read_options) {
3090 RecordTick(stats_, GET_UPDATES_SINCE_CALLS);
3091 if (seq > versions_->LastSequence()) {
3092 return Status::NotFound("Requested sequence not yet written in the db");
3093 }
3094 return wal_manager_.GetUpdatesSince(seq, iter, read_options, versions_.get());
3095 }
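
// Hedged usage sketch (assumes an open rocksdb::DB* db and a previously
// captured rocksdb::SequenceNumber start_seq): tailing the WAL with the
// iterator returned by GetUpdatesSince() above.
//
//   std::unique_ptr<rocksdb::TransactionLogIterator> wal_iter;
//   rocksdb::Status s = db->GetUpdatesSince(start_seq, &wal_iter);
//   while (s.ok() && wal_iter->Valid()) {
//     rocksdb::BatchResult batch = wal_iter->GetBatch();
//     // batch.sequence is the first sequence number of batch.writeBatchPtr.
//     wal_iter->Next();
//   }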
3096
3097 Status DBImpl::DeleteFile(std::string name) {
3098 uint64_t number;
3099 FileType type;
3100 WalFileType log_type;
3101 if (!ParseFileName(name, &number, &type, &log_type) ||
3102 (type != kTableFile && type != kLogFile)) {
3103 ROCKS_LOG_ERROR(immutable_db_options_.info_log, "DeleteFile %s failed.\n",
3104 name.c_str());
3105 return Status::InvalidArgument("Invalid file name");
3106 }
3107
3108 Status status;
3109 if (type == kLogFile) {
3110 // Only allow deleting archived log files
3111 if (log_type != kArchivedLogFile) {
3112 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
3113 "DeleteFile %s failed - not archived log.\n",
3114 name.c_str());
3115 return Status::NotSupported("Delete only supported for archived logs");
3116 }
3117 status = wal_manager_.DeleteFile(name, number);
3118 if (!status.ok()) {
3119 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
3120 "DeleteFile %s failed -- %s.\n", name.c_str(),
3121 status.ToString().c_str());
3122 }
3123 return status;
3124 }
3125
3126 int level;
3127 FileMetaData* metadata;
3128 ColumnFamilyData* cfd;
3129 VersionEdit edit;
3130 JobContext job_context(next_job_id_.fetch_add(1), true);
3131 {
3132 InstrumentedMutexLock l(&mutex_);
3133 status = versions_->GetMetadataForFile(number, &level, &metadata, &cfd);
3134 if (!status.ok()) {
3135 ROCKS_LOG_WARN(immutable_db_options_.info_log,
3136 "DeleteFile %s failed. File not found\n", name.c_str());
3137 job_context.Clean();
3138 return Status::InvalidArgument("File not found");
3139 }
3140 assert(level < cfd->NumberLevels());
3141
3142 // If the file is being compacted, there is no need to delete it.
3143 if (metadata->being_compacted) {
3144 ROCKS_LOG_INFO(immutable_db_options_.info_log,
3145 "DeleteFile %s Skipped. File about to be compacted\n",
3146 name.c_str());
3147 job_context.Clean();
3148 return Status::OK();
3149 }
3150
3151 // Only the files in the last level can be deleted externally.
3152 // This is to make sure that any deletion tombstones are not
3153 // lost. Check that the level passed is the last level.
3154 auto* vstorage = cfd->current()->storage_info();
3155 for (int i = level + 1; i < cfd->NumberLevels(); i++) {
3156 if (vstorage->NumLevelFiles(i) != 0) {
3157 ROCKS_LOG_WARN(immutable_db_options_.info_log,
3158 "DeleteFile %s FAILED. File not in last level\n",
3159 name.c_str());
3160 job_context.Clean();
3161 return Status::InvalidArgument("File not in last level");
3162 }
3163 }
3164 // if level == 0, it has to be the oldest file
3165 if (level == 0 &&
3166 vstorage->LevelFiles(0).back()->fd.GetNumber() != number) {
3167 ROCKS_LOG_WARN(immutable_db_options_.info_log,
3168 "DeleteFile %s failed ---"
3169 " target file in level 0 must be the oldest.",
3170 name.c_str());
3171 job_context.Clean();
3172 return Status::InvalidArgument("File in level 0, but not oldest");
3173 }
3174 edit.SetColumnFamily(cfd->GetID());
3175 edit.DeleteFile(level, number);
3176 status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
3177 &edit, &mutex_, directories_.GetDbDir());
3178 if (status.ok()) {
3179 InstallSuperVersionAndScheduleWork(cfd,
3180 &job_context.superversion_contexts[0],
3181 *cfd->GetLatestMutableCFOptions());
3182 }
3183 FindObsoleteFiles(&job_context, false);
3184 } // lock released here
3185
3186 LogFlush(immutable_db_options_.info_log);
3187 // remove files outside the db-lock
3188 if (job_context.HaveSomethingToDelete()) {
3189 // Call PurgeObsoleteFiles() without holding mutex.
3190 PurgeObsoleteFiles(job_context);
3191 }
3192 job_context.Clean();
3193 return status;
3194 }
3195
3196 Status DBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family,
3197 const RangePtr* ranges, size_t n,
3198 bool include_end) {
3199 Status status;
3200 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
3201 ColumnFamilyData* cfd = cfh->cfd();
3202 VersionEdit edit;
3203 std::set<FileMetaData*> deleted_files;
3204 JobContext job_context(next_job_id_.fetch_add(1), true);
3205 {
3206 InstrumentedMutexLock l(&mutex_);
3207 Version* input_version = cfd->current();
3208
3209 auto* vstorage = input_version->storage_info();
3210 for (size_t r = 0; r < n; r++) {
3211 auto begin = ranges[r].start, end = ranges[r].limit;
3212 for (int i = 1; i < cfd->NumberLevels(); i++) {
3213 if (vstorage->LevelFiles(i).empty() ||
3214 !vstorage->OverlapInLevel(i, begin, end)) {
3215 continue;
3216 }
3217 std::vector<FileMetaData*> level_files;
3218 InternalKey begin_storage, end_storage, *begin_key, *end_key;
3219 if (begin == nullptr) {
3220 begin_key = nullptr;
3221 } else {
3222 begin_storage.SetMinPossibleForUserKey(*begin);
3223 begin_key = &begin_storage;
3224 }
3225 if (end == nullptr) {
3226 end_key = nullptr;
3227 } else {
3228 end_storage.SetMaxPossibleForUserKey(*end);
3229 end_key = &end_storage;
3230 }
3231
3232 vstorage->GetCleanInputsWithinInterval(
3233 i, begin_key, end_key, &level_files, -1 /* hint_index */,
3234 nullptr /* file_index */);
3235 FileMetaData* level_file;
3236 for (uint32_t j = 0; j < level_files.size(); j++) {
3237 level_file = level_files[j];
3238 if (level_file->being_compacted) {
3239 continue;
3240 }
3241 if (deleted_files.find(level_file) != deleted_files.end()) {
3242 continue;
3243 }
3244 if (!include_end && end != nullptr &&
3245 cfd->user_comparator()->Compare(level_file->largest.user_key(),
3246 *end) == 0) {
3247 continue;
3248 }
3249 edit.SetColumnFamily(cfd->GetID());
3250 edit.DeleteFile(i, level_file->fd.GetNumber());
3251 deleted_files.insert(level_file);
3252 level_file->being_compacted = true;
3253 }
3254 }
3255 }
3256 if (edit.GetDeletedFiles().empty()) {
3257 job_context.Clean();
3258 return Status::OK();
3259 }
3260 input_version->Ref();
3261 status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
3262 &edit, &mutex_, directories_.GetDbDir());
3263 if (status.ok()) {
3264 InstallSuperVersionAndScheduleWork(cfd,
3265 &job_context.superversion_contexts[0],
3266 *cfd->GetLatestMutableCFOptions());
3267 }
3268 for (auto* deleted_file : deleted_files) {
3269 deleted_file->being_compacted = false;
3270 }
3271 input_version->Unref();
3272 FindObsoleteFiles(&job_context, false);
3273 } // lock released here
3274
3275 LogFlush(immutable_db_options_.info_log);
3276 // remove files outside the db-lock
3277 if (job_context.HaveSomethingToDelete()) {
3278 // Call PurgeObsoleteFiles() without holding mutex.
3279 PurgeObsoleteFiles(job_context);
3280 }
3281 job_context.Clean();
3282 return status;
3283 }
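
// Caller-side sketch (an assumption; the rocksdb::DeleteFilesInRanges()
// convenience wrapper declared in rocksdb/convenience.h forwards to the
// implementation above): dropping whole SST files contained in ["a", "m"].
//
//   rocksdb::Slice begin("a"), end("m");
//   rocksdb::RangePtr range(&begin, &end);
//   rocksdb::Status s = rocksdb::DeleteFilesInRanges(
//       db, db->DefaultColumnFamily(), &range, 1, /*include_end=*/true);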
3284
3285 void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
3286 InstrumentedMutexLock l(&mutex_);
3287 versions_->GetLiveFilesMetaData(metadata);
3288 }
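
// Minimal sketch (illustrative, assuming an open rocksdb::DB* db):
// enumerating the live SST files of all column families.
//
//   std::vector<rocksdb::LiveFileMetaData> files;
//   db->GetLiveFilesMetaData(&files);
//   for (const auto& f : files) {
//     // f.db_path + f.name is the full path; f.size and f.level are set too.
//   }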
3289
3290 void DBImpl::GetColumnFamilyMetaData(ColumnFamilyHandle* column_family,
3291 ColumnFamilyMetaData* cf_meta) {
3292 assert(column_family);
3293 auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
3294 auto* sv = GetAndRefSuperVersion(cfd);
3295 {
3296 // Without the mutex, Version::GetColumnFamilyMetaData will have a data
3297 // race with Compaction::MarkFilesBeingCompacted. One solution is to use
3298 // the mutex, but this may cause a regression. An alternative is to make
3299 // FileMetaData::being_compacted atomic, but that would make FileMetaData
3300 // non-copyable. Another option is to separate these variables from the
3301 // original FileMetaData struct, but this requires re-organizing the data
3302 // structures. For now, we take the easy approach. If
3303 // DB::GetColumnFamilyMetaData is not called frequently, the regression
3304 // should not be big. We still need to keep an eye on it.
3305 InstrumentedMutexLock l(&mutex_);
3306 sv->current->GetColumnFamilyMetaData(cf_meta);
3307 }
3308 ReturnAndCleanupSuperVersion(cfd, sv);
3309 }
3310
3311 #endif // ROCKSDB_LITE
3312
3313 Status DBImpl::CheckConsistency() {
3314 mutex_.AssertHeld();
3315 std::vector<LiveFileMetaData> metadata;
3316 versions_->GetLiveFilesMetaData(&metadata);
3317 TEST_SYNC_POINT("DBImpl::CheckConsistency:AfterGetLiveFilesMetaData");
3318
3319 std::string corruption_messages;
3320 for (const auto& md : metadata) {
3321 // md.name has a leading "/".
3322 std::string file_path = md.db_path + md.name;
3323
3324 uint64_t fsize = 0;
3325 TEST_SYNC_POINT("DBImpl::CheckConsistency:BeforeGetFileSize");
3326 Status s = env_->GetFileSize(file_path, &fsize);
3327 if (!s.ok() &&
3328 env_->GetFileSize(Rocks2LevelTableFileName(file_path), &fsize).ok()) {
3329 s = Status::OK();
3330 }
3331 if (!s.ok()) {
3332 corruption_messages +=
3333 "Can't access " + md.name + ": " + s.ToString() + "\n";
3334 } else if (fsize != md.size) {
3335 corruption_messages += "Sst file size mismatch: " + file_path +
3336 ". Size recorded in manifest " +
3337 ToString(md.size) + ", actual size " +
3338 ToString(fsize) + "\n";
3339 }
3340 }
3341 if (corruption_messages.size() == 0) {
3342 return Status::OK();
3343 } else {
3344 return Status::Corruption(corruption_messages);
3345 }
3346 }
3347
3348 Status DBImpl::GetDbIdentity(std::string& identity) const {
3349 identity.assign(db_id_);
3350 return Status::OK();
3351 }
3352
3353 Status DBImpl::GetDbIdentityFromIdentityFile(std::string* identity) const {
3354 std::string idfilename = IdentityFileName(dbname_);
3355 const FileOptions soptions;
3356 std::unique_ptr<SequentialFileReader> id_file_reader;
3357 Status s;
3358 {
3359 std::unique_ptr<FSSequentialFile> idfile;
3360 s = fs_->NewSequentialFile(idfilename, soptions, &idfile, nullptr);
3361 if (!s.ok()) {
3362 return s;
3363 }
3364 id_file_reader.reset(
3365 new SequentialFileReader(std::move(idfile), idfilename));
3366 }
3367
3368 uint64_t file_size;
3369 s = fs_->GetFileSize(idfilename, IOOptions(), &file_size, nullptr);
3370 if (!s.ok()) {
3371 return s;
3372 }
3373 char* buffer =
3374 reinterpret_cast<char*>(alloca(static_cast<size_t>(file_size)));
3375 Slice id;
3376 s = id_file_reader->Read(static_cast<size_t>(file_size), &id, buffer);
3377 if (!s.ok()) {
3378 return s;
3379 }
3380 identity->assign(id.ToString());
3381 // If the last character is '\n', remove it from the identity
3382 if (identity->size() > 0 && identity->back() == '\n') {
3383 identity->pop_back();
3384 }
3385 return s;
3386 }
3387
3388 // Default implementation -- returns not supported status
3389 Status DB::CreateColumnFamily(const ColumnFamilyOptions& /*cf_options*/,
3390 const std::string& /*column_family_name*/,
3391 ColumnFamilyHandle** /*handle*/) {
3392 return Status::NotSupported("");
3393 }
3394
3395 Status DB::CreateColumnFamilies(
3396 const ColumnFamilyOptions& /*cf_options*/,
3397 const std::vector<std::string>& /*column_family_names*/,
3398 std::vector<ColumnFamilyHandle*>* /*handles*/) {
3399 return Status::NotSupported("");
3400 }
3401
3402 Status DB::CreateColumnFamilies(
3403 const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
3404 std::vector<ColumnFamilyHandle*>* /*handles*/) {
3405 return Status::NotSupported("");
3406 }
3407
3408 Status DB::DropColumnFamily(ColumnFamilyHandle* /*column_family*/) {
3409 return Status::NotSupported("");
3410 }
3411
3412 Status DB::DropColumnFamilies(
3413 const std::vector<ColumnFamilyHandle*>& /*column_families*/) {
3414 return Status::NotSupported("");
3415 }
3416
3417 Status DB::DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) {
3418 delete column_family;
3419 return Status::OK();
3420 }
3421
3422 DB::~DB() {}
3423
3424 Status DBImpl::Close() {
3425 if (!closed_) {
3426 {
3427 InstrumentedMutexLock l(&mutex_);
3428 // If there is unreleased snapshot, fail the close call
3429 if (!snapshots_.empty()) {
3430 return Status::Aborted("Cannot close DB with unreleased snapshot.");
3431 }
3432 }
3433
3434 closed_ = true;
3435 return CloseImpl();
3436 }
3437 return Status::OK();
3438 }
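
// Usage sketch (assumes snap was obtained earlier from db->GetSnapshot()):
// every snapshot must be released before Close(), otherwise the check above
// returns Status::Aborted.
//
//   db->ReleaseSnapshot(snap);
//   rocksdb::Status s = db->Close();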
3439
3440 Status DB::ListColumnFamilies(const DBOptions& db_options,
3441 const std::string& name,
3442 std::vector<std::string>* column_families) {
3443 FileSystem* fs = db_options.file_system.get();
3444 LegacyFileSystemWrapper legacy_fs(db_options.env);
3445 if (!fs) {
3446 fs = &legacy_fs;
3447 }
3448 return VersionSet::ListColumnFamilies(column_families, name, fs);
3449 }
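
// A short sketch (not part of the original source): listing the column
// families of an on-disk database without opening it.
//
//   std::vector<std::string> cf_names;
//   rocksdb::Status s = rocksdb::DB::ListColumnFamilies(
//       rocksdb::DBOptions(), "/path/to/db", &cf_names);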
3450
3451 Snapshot::~Snapshot() {}
3452
3453 Status DestroyDB(const std::string& dbname, const Options& options,
3454 const std::vector<ColumnFamilyDescriptor>& column_families) {
3455 ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
3456 Env* env = soptions.env;
3457 std::vector<std::string> filenames;
3458 bool wal_in_db_path = IsWalDirSameAsDBPath(&soptions);
3459
3460 // Reset the logger because it holds a handle to the
3461 // log file and prevents cleanup and directory removal
3462 soptions.info_log.reset();
3463 // Ignore error in case directory does not exist
3464 env->GetChildren(dbname, &filenames);
3465
3466 FileLock* lock;
3467 const std::string lockname = LockFileName(dbname);
3468 Status result = env->LockFile(lockname, &lock);
3469 if (result.ok()) {
3470 uint64_t number;
3471 FileType type;
3472 InfoLogPrefix info_log_prefix(!soptions.db_log_dir.empty(), dbname);
3473 for (const auto& fname : filenames) {
3474 if (ParseFileName(fname, &number, info_log_prefix.prefix, &type) &&
3475 type != kDBLockFile) { // Lock file will be deleted at end
3476 Status del;
3477 std::string path_to_delete = dbname + "/" + fname;
3478 if (type == kMetaDatabase) {
3479 del = DestroyDB(path_to_delete, options);
3480 } else if (type == kTableFile || type == kLogFile) {
3481 del = DeleteDBFile(&soptions, path_to_delete, dbname,
3482 /*force_bg=*/false, /*force_fg=*/!wal_in_db_path);
3483 } else {
3484 del = env->DeleteFile(path_to_delete);
3485 }
3486 if (result.ok() && !del.ok()) {
3487 result = del;
3488 }
3489 }
3490 }
3491
3492 std::vector<std::string> paths;
3493
3494 for (const auto& path : options.db_paths) {
3495 paths.emplace_back(path.path);
3496 }
3497 for (const auto& cf : column_families) {
3498 for (const auto& path : cf.options.cf_paths) {
3499 paths.emplace_back(path.path);
3500 }
3501 }
3502
3503 // Remove duplicate paths.
3504 // Note that we compare only the actual paths but not path ids.
3505 // The reason is that the same path can appear with different path_ids
3506 // for different column families.
3507 std::sort(paths.begin(), paths.end());
3508 paths.erase(std::unique(paths.begin(), paths.end()), paths.end());
3509
3510 for (const auto& path : paths) {
3511 if (env->GetChildren(path, &filenames).ok()) {
3512 for (const auto& fname : filenames) {
3513 if (ParseFileName(fname, &number, &type) &&
3514 type == kTableFile) {
3515 std::string table_path = path + "/" + fname;
3516 Status del = DeleteDBFile(&soptions, table_path, dbname,
3517 /*force_bg=*/false, /*force_fg=*/false);
3518 if (result.ok() && !del.ok()) {
3519 result = del;
3520 }
3521 }
3522 }
3523 env->DeleteDir(path);
3524 }
3525 }
3526
3527 std::vector<std::string> walDirFiles;
3528 std::string archivedir = ArchivalDirectory(dbname);
3529 bool wal_dir_exists = false;
3530 if (dbname != soptions.wal_dir) {
3531 wal_dir_exists = env->GetChildren(soptions.wal_dir, &walDirFiles).ok();
3532 archivedir = ArchivalDirectory(soptions.wal_dir);
3533 }
3534
3535 // The archive dir may be inside the wal dir or dbname, and should be
3536 // processed and removed before those; otherwise we would have issues
3537 // removing them.
3538 std::vector<std::string> archiveFiles;
3539 if (env->GetChildren(archivedir, &archiveFiles).ok()) {
3540 // Delete archival files.
3541 for (const auto& file : archiveFiles) {
3542 if (ParseFileName(file, &number, &type) && type == kLogFile) {
3543 Status del =
3544 DeleteDBFile(&soptions, archivedir + "/" + file, archivedir,
3545 /*force_bg=*/false, /*force_fg=*/!wal_in_db_path);
3546 if (result.ok() && !del.ok()) {
3547 result = del;
3548 }
3549 }
3550 }
3551 env->DeleteDir(archivedir);
3552 }
3553
3554 // Delete log files in the WAL dir
3555 if (wal_dir_exists) {
3556 for (const auto& file : walDirFiles) {
3557 if (ParseFileName(file, &number, &type) && type == kLogFile) {
3558 Status del =
3559 DeleteDBFile(&soptions, LogFileName(soptions.wal_dir, number),
3560 soptions.wal_dir, /*force_bg=*/false,
3561 /*force_fg=*/!wal_in_db_path);
3562 if (result.ok() && !del.ok()) {
3563 result = del;
3564 }
3565 }
3566 }
3567 env->DeleteDir(soptions.wal_dir);
3568 }
3569
3570 env->UnlockFile(lock); // Ignore error since state is already gone
3571 env->DeleteFile(lockname);
3572 env->DeleteDir(dbname); // Ignore error in case dir contains other files
3573 }
3574 return result;
3575 }
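
// Illustrative sketch: the column_families argument is defaulted in the
// public declaration, so a plain two-argument call wipes the SSTs, WALs
// (including archived ones), OPTIONS files and directories as implemented
// above.
//
//   rocksdb::Options options;
//   rocksdb::Status s = rocksdb::DestroyDB("/path/to/db", options);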
3576
3577 Status DBImpl::WriteOptionsFile(bool need_mutex_lock,
3578 bool need_enter_write_thread) {
3579 #ifndef ROCKSDB_LITE
3580 WriteThread::Writer w;
3581 if (need_mutex_lock) {
3582 mutex_.Lock();
3583 } else {
3584 mutex_.AssertHeld();
3585 }
3586 if (need_enter_write_thread) {
3587 write_thread_.EnterUnbatched(&w, &mutex_);
3588 }
3589
3590 std::vector<std::string> cf_names;
3591 std::vector<ColumnFamilyOptions> cf_opts;
3592
3593 // This part requires mutex to protect the column family options
3594 for (auto cfd : *versions_->GetColumnFamilySet()) {
3595 if (cfd->IsDropped()) {
3596 continue;
3597 }
3598 cf_names.push_back(cfd->GetName());
3599 cf_opts.push_back(cfd->GetLatestCFOptions());
3600 }
3601
3602 // Unlock during expensive operations. New writes cannot get here
3603 // because the single write thread ensures all new writes get queued.
3604 DBOptions db_options =
3605 BuildDBOptions(immutable_db_options_, mutable_db_options_);
3606 mutex_.Unlock();
3607
3608 TEST_SYNC_POINT("DBImpl::WriteOptionsFile:1");
3609 TEST_SYNC_POINT("DBImpl::WriteOptionsFile:2");
3610
3611 std::string file_name =
3612 TempOptionsFileName(GetName(), versions_->NewFileNumber());
3613 Status s = PersistRocksDBOptions(db_options, cf_names, cf_opts, file_name,
3614 GetFileSystem());
3615
3616 if (s.ok()) {
3617 s = RenameTempFileToOptionsFile(file_name);
3618 }
3619 // restore lock
3620 if (!need_mutex_lock) {
3621 mutex_.Lock();
3622 }
3623 if (need_enter_write_thread) {
3624 write_thread_.ExitUnbatched(&w);
3625 }
3626 if (!s.ok()) {
3627 ROCKS_LOG_WARN(immutable_db_options_.info_log,
3628 "Unable to persist options -- %s", s.ToString().c_str());
3629 if (immutable_db_options_.fail_if_options_file_error) {
3630 return Status::IOError("Unable to persist options.",
3631 s.ToString().c_str());
3632 }
3633 }
3634 #else
3635 (void)need_mutex_lock;
3636 (void)need_enter_write_thread;
3637 #endif // !ROCKSDB_LITE
3638 return Status::OK();
3639 }
3640
3641 #ifndef ROCKSDB_LITE
3642 namespace {
3643 void DeleteOptionsFilesHelper(const std::map<uint64_t, std::string>& filenames,
3644 const size_t num_files_to_keep,
3645 const std::shared_ptr<Logger>& info_log,
3646 Env* env) {
3647 if (filenames.size() <= num_files_to_keep) {
3648 return;
3649 }
3650 for (auto iter = std::next(filenames.begin(), num_files_to_keep);
3651 iter != filenames.end(); ++iter) {
3652 if (!env->DeleteFile(iter->second).ok()) {
3653 ROCKS_LOG_WARN(info_log, "Unable to delete options file %s",
3654 iter->second.c_str());
3655 }
3656 }
3657 }
3658 } // namespace
3659 #endif // !ROCKSDB_LITE
3660
3661 Status DBImpl::DeleteObsoleteOptionsFiles() {
3662 #ifndef ROCKSDB_LITE
3663 std::vector<std::string> filenames;
3664 // Use an ordered map to keep the filenames sorted from the newest
3665 // to the oldest.
3666 std::map<uint64_t, std::string> options_filenames;
3667 Status s;
3668 s = GetEnv()->GetChildren(GetName(), &filenames);
3669 if (!s.ok()) {
3670 return s;
3671 }
3672 for (auto& filename : filenames) {
3673 uint64_t file_number;
3674 FileType type;
3675 if (ParseFileName(filename, &file_number, &type) && type == kOptionsFile) {
3676 options_filenames.insert(
3677 {std::numeric_limits<uint64_t>::max() - file_number,
3678 GetName() + "/" + filename});
3679 }
3680 }
3681
3682 // Keep the latest 2 options files.
3683 const size_t kNumOptionsFilesKept = 2;
3684 DeleteOptionsFilesHelper(options_filenames, kNumOptionsFilesKept,
3685 immutable_db_options_.info_log, GetEnv());
3686 return Status::OK();
3687 #else
3688 return Status::OK();
3689 #endif // !ROCKSDB_LITE
3690 }
3691
3692 Status DBImpl::RenameTempFileToOptionsFile(const std::string& file_name) {
3693 #ifndef ROCKSDB_LITE
3694 Status s;
3695
3696 uint64_t options_file_number = versions_->NewFileNumber();
3697 std::string options_file_name =
3698 OptionsFileName(GetName(), options_file_number);
3699 // Retry if the file name happens to conflict with an existing one.
3700 s = GetEnv()->RenameFile(file_name, options_file_name);
3701 if (s.ok()) {
3702 InstrumentedMutexLock l(&mutex_);
3703 versions_->options_file_number_ = options_file_number;
3704 }
3705
3706 if (0 == disable_delete_obsolete_files_) {
3707 DeleteObsoleteOptionsFiles();
3708 }
3709 return s;
3710 #else
3711 (void)file_name;
3712 return Status::OK();
3713 #endif // !ROCKSDB_LITE
3714 }
3715
3716 #ifdef ROCKSDB_USING_THREAD_STATUS
3717
3718 void DBImpl::NewThreadStatusCfInfo(ColumnFamilyData* cfd) const {
3719 if (immutable_db_options_.enable_thread_tracking) {
3720 ThreadStatusUtil::NewColumnFamilyInfo(this, cfd, cfd->GetName(),
3721 cfd->ioptions()->env);
3722 }
3723 }
3724
3725 void DBImpl::EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const {
3726 if (immutable_db_options_.enable_thread_tracking) {
3727 ThreadStatusUtil::EraseColumnFamilyInfo(cfd);
3728 }
3729 }
3730
3731 void DBImpl::EraseThreadStatusDbInfo() const {
3732 if (immutable_db_options_.enable_thread_tracking) {
3733 ThreadStatusUtil::EraseDatabaseInfo(this);
3734 }
3735 }
3736
3737 #else
3738 void DBImpl::NewThreadStatusCfInfo(ColumnFamilyData* /*cfd*/) const {}
3739
3740 void DBImpl::EraseThreadStatusCfInfo(ColumnFamilyData* /*cfd*/) const {}
3741
3742 void DBImpl::EraseThreadStatusDbInfo() const {}
3743 #endif // ROCKSDB_USING_THREAD_STATUS
3744
3745 //
3746 // A global method that can dump out the build version
3747 void DumpRocksDBBuildVersion(Logger* log) {
3748 #if !defined(IOS_CROSS_COMPILE)
3749 // if we compile with Xcode, we don't run build_detect_version, so we don't
3750 // generate util/build_version.cc
3751 ROCKS_LOG_HEADER(log, "RocksDB version: %d.%d.%d\n", ROCKSDB_MAJOR,
3752 ROCKSDB_MINOR, ROCKSDB_PATCH);
3753 ROCKS_LOG_HEADER(log, "Git sha %s", rocksdb_build_git_sha);
3754 ROCKS_LOG_HEADER(log, "Compile date %s", rocksdb_build_compile_date);
3755 #else
3756 (void)log; // ignore "-Wunused-parameter"
3757 #endif
3758 }
3759
3760 #ifndef ROCKSDB_LITE
3761 SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv,
3762 bool include_history) {
3763 // Find the earliest sequence number that we know we can rely on reading
3764 // from the memtable without needing to check sst files.
3765 SequenceNumber earliest_seq =
3766 sv->imm->GetEarliestSequenceNumber(include_history);
3767 if (earliest_seq == kMaxSequenceNumber) {
3768 earliest_seq = sv->mem->GetEarliestSequenceNumber();
3769 }
3770 assert(sv->mem->GetEarliestSequenceNumber() >= earliest_seq);
3771
3772 return earliest_seq;
3773 }
3774 #endif // ROCKSDB_LITE
3775
3776 #ifndef ROCKSDB_LITE
3777 Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
3778 bool cache_only,
3779 SequenceNumber lower_bound_seq,
3780 SequenceNumber* seq,
3781 bool* found_record_for_key,
3782 bool* is_blob_index) {
3783 Status s;
3784 MergeContext merge_context;
3785 SequenceNumber max_covering_tombstone_seq = 0;
3786
3787 ReadOptions read_options;
3788 SequenceNumber current_seq = versions_->LastSequence();
3789 LookupKey lkey(key, current_seq);
3790
3791 *seq = kMaxSequenceNumber;
3792 *found_record_for_key = false;
3793
3794 // Check if there is a record for this key in the latest memtable
3795 sv->mem->Get(lkey, nullptr, &s, &merge_context, &max_covering_tombstone_seq,
3796 seq, read_options, nullptr /*read_callback*/, is_blob_index);
3797
3798 if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
3799 // unexpected error reading memtable.
3800 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
3801 "Unexpected status returned from MemTable::Get: %s\n",
3802 s.ToString().c_str());
3803
3804 return s;
3805 }
3806
3807 if (*seq != kMaxSequenceNumber) {
3808 // Found a sequence number, no need to check immutable memtables
3809 *found_record_for_key = true;
3810 return Status::OK();
3811 }
3812
3813 SequenceNumber lower_bound_in_mem = sv->mem->GetEarliestSequenceNumber();
3814 if (lower_bound_in_mem != kMaxSequenceNumber &&
3815 lower_bound_in_mem < lower_bound_seq) {
3816 *found_record_for_key = false;
3817 return Status::OK();
3818 }
3819
3820 // Check if there is a record for this key in the immutable memtables
3821 sv->imm->Get(lkey, nullptr, &s, &merge_context, &max_covering_tombstone_seq,
3822 seq, read_options, nullptr /*read_callback*/, is_blob_index);
3823
3824 if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
3825 // unexpected error reading memtable.
3826 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
3827 "Unexpected status returned from MemTableList::Get: %s\n",
3828 s.ToString().c_str());
3829
3830 return s;
3831 }
3832
3833 if (*seq != kMaxSequenceNumber) {
3834 // Found a sequence number, no need to check memtable history
3835 *found_record_for_key = true;
3836 return Status::OK();
3837 }
3838
3839 SequenceNumber lower_bound_in_imm = sv->imm->GetEarliestSequenceNumber();
3840 if (lower_bound_in_imm != kMaxSequenceNumber &&
3841 lower_bound_in_imm < lower_bound_seq) {
3842 *found_record_for_key = false;
3843 return Status::OK();
3844 }
3845
3846 // Check if there is a record for this key in the immutable memtables
3847 sv->imm->GetFromHistory(lkey, nullptr, &s, &merge_context,
3848 &max_covering_tombstone_seq, seq, read_options,
3849 is_blob_index);
3850
3851 if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
3852 // unexpected error reading memtable.
3853 ROCKS_LOG_ERROR(
3854 immutable_db_options_.info_log,
3855 "Unexpected status returned from MemTableList::GetFromHistory: %s\n",
3856 s.ToString().c_str());
3857
3858 return s;
3859 }
3860
3861 if (*seq != kMaxSequenceNumber) {
3862 // Found a sequence number, no need to check SST files
3863 *found_record_for_key = true;
3864 return Status::OK();
3865 }
3866
3867 // We could do a sv->imm->GetEarliestSequenceNumber(/*include_history*/ true)
3868 // check here to skip the history if possible. But currently the caller
3869 // already does that. Maybe we should move the logic here later.
3870
3871 // TODO(agiardullo): possible optimization: consider checking cached
3872 // SST files if cache_only=true?
3873 if (!cache_only) {
3874 // Check tables
3875 sv->current->Get(read_options, lkey, nullptr, &s, &merge_context,
3876 &max_covering_tombstone_seq, nullptr /* value_found */,
3877 found_record_for_key, seq, nullptr /*read_callback*/,
3878 is_blob_index);
3879
3880 if (!(s.ok() || s.IsNotFound() || s.IsMergeInProgress())) {
3881 // unexpected error reading SST files
3882 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
3883 "Unexpected status returned from Version::Get: %s\n",
3884 s.ToString().c_str());
3885 }
3886 }
3887
3888 return s;
3889 }
3890
3891 Status DBImpl::IngestExternalFile(
3892 ColumnFamilyHandle* column_family,
3893 const std::vector<std::string>& external_files,
3894 const IngestExternalFileOptions& ingestion_options) {
3895 IngestExternalFileArg arg;
3896 arg.column_family = column_family;
3897 arg.external_files = external_files;
3898 arg.options = ingestion_options;
3899 return IngestExternalFiles({arg});
3900 }
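
// Caller-side sketch (an assumption, given an open rocksdb::DB* db):
// ingesting one externally built SST file into the default column family
// through the convenience overload above, which wraps IngestExternalFiles().
//
//   rocksdb::IngestExternalFileOptions ifo;
//   ifo.move_files = true;  // hard-link instead of copying when possible
//   rocksdb::Status s = db->IngestExternalFile(
//       db->DefaultColumnFamily(), {"/tmp/batch-0001.sst"}, ifo);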
3901
3902 Status DBImpl::IngestExternalFiles(
3903 const std::vector<IngestExternalFileArg>& args) {
3904 if (args.empty()) {
3905 return Status::InvalidArgument("ingestion arg list is empty");
3906 }
3907 {
3908 std::unordered_set<ColumnFamilyHandle*> unique_cfhs;
3909 for (const auto& arg : args) {
3910 if (arg.column_family == nullptr) {
3911 return Status::InvalidArgument("column family handle is null");
3912 } else if (unique_cfhs.count(arg.column_family) > 0) {
3913 return Status::InvalidArgument(
3914 "ingestion args have duplicate column families");
3915 }
3916 unique_cfhs.insert(arg.column_family);
3917 }
3918 }
3919 // Ingest multiple external SST files atomically.
3920 size_t num_cfs = args.size();
3921 for (size_t i = 0; i != num_cfs; ++i) {
3922 if (args[i].external_files.empty()) {
3923 char err_msg[128] = {0};
3924 snprintf(err_msg, 128, "external_files[%zu] is empty", i);
3925 return Status::InvalidArgument(err_msg);
3926 }
3927 }
3928 for (const auto& arg : args) {
3929 const IngestExternalFileOptions& ingest_opts = arg.options;
3930 if (ingest_opts.ingest_behind &&
3931 !immutable_db_options_.allow_ingest_behind) {
3932 return Status::InvalidArgument(
3933 "can't ingest_behind file in DB with allow_ingest_behind=false");
3934 }
3935 }
3936
3937 // TODO (yanqin) maybe handle the case in which column_families have
3938 // duplicates
3939 std::unique_ptr<std::list<uint64_t>::iterator> pending_output_elem;
3940 size_t total = 0;
3941 for (const auto& arg : args) {
3942 total += arg.external_files.size();
3943 }
3944 uint64_t next_file_number = 0;
3945 Status status = ReserveFileNumbersBeforeIngestion(
3946 static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd(), total,
3947 pending_output_elem, &next_file_number);
3948 if (!status.ok()) {
3949 InstrumentedMutexLock l(&mutex_);
3950 ReleaseFileNumberFromPendingOutputs(pending_output_elem);
3951 return status;
3952 }
3953
3954 std::vector<ExternalSstFileIngestionJob> ingestion_jobs;
3955 for (const auto& arg : args) {
3956 auto* cfd = static_cast<ColumnFamilyHandleImpl*>(arg.column_family)->cfd();
3957 ingestion_jobs.emplace_back(
3958 env_, versions_.get(), cfd, immutable_db_options_, file_options_,
3959 &snapshots_, arg.options, &directories_, &event_logger_);
3960 }
3961 std::vector<std::pair<bool, Status>> exec_results;
3962 for (size_t i = 0; i != num_cfs; ++i) {
3963 exec_results.emplace_back(false, Status::OK());
3964 }
3965 // TODO(yanqin) maybe make jobs run in parallel
3966 uint64_t start_file_number = next_file_number;
3967 for (size_t i = 1; i != num_cfs; ++i) {
3968 start_file_number += args[i - 1].external_files.size();
3969 auto* cfd =
3970 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
3971 SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
3972 exec_results[i].second = ingestion_jobs[i].Prepare(
3973 args[i].external_files, start_file_number, super_version);
3974 exec_results[i].first = true;
3975 CleanupSuperVersion(super_version);
3976 }
3977 TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:0");
3978 TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeLastJobPrepare:1");
3979 {
3980 auto* cfd =
3981 static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd();
3982 SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
3983 exec_results[0].second = ingestion_jobs[0].Prepare(
3984 args[0].external_files, next_file_number, super_version);
3985 exec_results[0].first = true;
3986 CleanupSuperVersion(super_version);
3987 }
3988 for (const auto& exec_result : exec_results) {
3989 if (!exec_result.second.ok()) {
3990 status = exec_result.second;
3991 break;
3992 }
3993 }
3994 if (!status.ok()) {
3995 for (size_t i = 0; i != num_cfs; ++i) {
3996 if (exec_results[i].first) {
3997 ingestion_jobs[i].Cleanup(status);
3998 }
3999 }
4000 InstrumentedMutexLock l(&mutex_);
4001 ReleaseFileNumberFromPendingOutputs(pending_output_elem);
4002 return status;
4003 }
4004
4005 std::vector<SuperVersionContext> sv_ctxs;
4006 for (size_t i = 0; i != num_cfs; ++i) {
4007 sv_ctxs.emplace_back(true /* create_superversion */);
4008 }
4009 TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeJobsRun:0");
4010 TEST_SYNC_POINT("DBImpl::IngestExternalFiles:BeforeJobsRun:1");
4011 TEST_SYNC_POINT("DBImpl::AddFile:Start");
4012 {
4013 InstrumentedMutexLock l(&mutex_);
4014 TEST_SYNC_POINT("DBImpl::AddFile:MutexLock");
4015
4016 // Stop writes to the DB by entering both write threads
4017 WriteThread::Writer w;
4018 write_thread_.EnterUnbatched(&w, &mutex_);
4019 WriteThread::Writer nonmem_w;
4020 if (two_write_queues_) {
4021 nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
4022 }
4023
4024 // When unordered_write is enabled, keys are written to the memtable in
4025 // an unordered way. If the ingestion job checks the memtable key range
4026 // before the keys land in the memtable, the ingestion job may skip the
4027 // necessary memtable flush.
4028 // So wait here to ensure there is no pending write to the memtable.
4029 WaitForPendingWrites();
4030
4031 num_running_ingest_file_ += static_cast<int>(num_cfs);
4032 TEST_SYNC_POINT("DBImpl::IngestExternalFile:AfterIncIngestFileCounter");
4033
4034 bool at_least_one_cf_need_flush = false;
4035 std::vector<bool> need_flush(num_cfs, false);
4036 for (size_t i = 0; i != num_cfs; ++i) {
4037 auto* cfd =
4038 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4039 if (cfd->IsDropped()) {
4040 // TODO (yanqin) investigate whether we should abort ingestion or
4041 // proceed with other non-dropped column families.
4042 status = Status::InvalidArgument(
4043 "cannot ingest an external file into a dropped CF");
4044 break;
4045 }
4046 bool tmp = false;
4047 status = ingestion_jobs[i].NeedsFlush(&tmp, cfd->GetSuperVersion());
4048 need_flush[i] = tmp;
4049 at_least_one_cf_need_flush = (at_least_one_cf_need_flush || tmp);
4050 if (!status.ok()) {
4051 break;
4052 }
4053 }
4054 TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush",
4055 &at_least_one_cf_need_flush);
4056
4057 if (status.ok() && at_least_one_cf_need_flush) {
4058 FlushOptions flush_opts;
4059 flush_opts.allow_write_stall = true;
4060 if (immutable_db_options_.atomic_flush) {
4061 autovector<ColumnFamilyData*> cfds_to_flush;
4062 SelectColumnFamiliesForAtomicFlush(&cfds_to_flush);
4063 mutex_.Unlock();
4064 status = AtomicFlushMemTables(cfds_to_flush, flush_opts,
4065 FlushReason::kExternalFileIngestion,
4066 true /* writes_stopped */);
4067 mutex_.Lock();
4068 } else {
4069 for (size_t i = 0; i != num_cfs; ++i) {
4070 if (need_flush[i]) {
4071 mutex_.Unlock();
4072 auto* cfd =
4073 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)
4074 ->cfd();
4075 status = FlushMemTable(cfd, flush_opts,
4076 FlushReason::kExternalFileIngestion,
4077 true /* writes_stopped */);
4078 mutex_.Lock();
4079 if (!status.ok()) {
4080 break;
4081 }
4082 }
4083 }
4084 }
4085 }
4086 // Run ingestion jobs.
4087 if (status.ok()) {
4088 for (size_t i = 0; i != num_cfs; ++i) {
4089 status = ingestion_jobs[i].Run();
4090 if (!status.ok()) {
4091 break;
4092 }
4093 }
4094 }
4095 if (status.ok()) {
4096 int consumed_seqno_count =
4097 ingestion_jobs[0].ConsumedSequenceNumbersCount();
4098 #ifndef NDEBUG
4099 for (size_t i = 1; i != num_cfs; ++i) {
4100 assert(!!consumed_seqno_count ==
4101 !!ingestion_jobs[i].ConsumedSequenceNumbersCount());
4102 consumed_seqno_count +=
4103 ingestion_jobs[i].ConsumedSequenceNumbersCount();
4104 }
4105 #endif
4106 if (consumed_seqno_count > 0) {
4107 const SequenceNumber last_seqno = versions_->LastSequence();
4108 versions_->SetLastAllocatedSequence(last_seqno + consumed_seqno_count);
4109 versions_->SetLastPublishedSequence(last_seqno + consumed_seqno_count);
4110 versions_->SetLastSequence(last_seqno + consumed_seqno_count);
4111 }
4112 autovector<ColumnFamilyData*> cfds_to_commit;
4113 autovector<const MutableCFOptions*> mutable_cf_options_list;
4114 autovector<autovector<VersionEdit*>> edit_lists;
4115 uint32_t num_entries = 0;
4116 for (size_t i = 0; i != num_cfs; ++i) {
4117 auto* cfd =
4118 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4119 if (cfd->IsDropped()) {
4120 continue;
4121 }
4122 cfds_to_commit.push_back(cfd);
4123 mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
4124 autovector<VersionEdit*> edit_list;
4125 edit_list.push_back(ingestion_jobs[i].edit());
4126 edit_lists.push_back(edit_list);
4127 ++num_entries;
4128 }
4129 // Mark the version edits as an atomic group if the number of version
4130 // edits exceeds 1.
4131 if (cfds_to_commit.size() > 1) {
4132 for (auto& edits : edit_lists) {
4133 assert(edits.size() == 1);
4134 edits[0]->MarkAtomicGroup(--num_entries);
4135 }
4136 assert(0 == num_entries);
4137 }
4138 status =
4139 versions_->LogAndApply(cfds_to_commit, mutable_cf_options_list,
4140 edit_lists, &mutex_, directories_.GetDbDir());
4141 }
4142
4143 if (status.ok()) {
4144 for (size_t i = 0; i != num_cfs; ++i) {
4145 auto* cfd =
4146 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4147 if (!cfd->IsDropped()) {
4148 InstallSuperVersionAndScheduleWork(cfd, &sv_ctxs[i],
4149 *cfd->GetLatestMutableCFOptions());
4150 #ifndef NDEBUG
4151 if (0 == i && num_cfs > 1) {
4152 TEST_SYNC_POINT(
4153 "DBImpl::IngestExternalFiles:InstallSVForFirstCF:0");
4154 TEST_SYNC_POINT(
4155 "DBImpl::IngestExternalFiles:InstallSVForFirstCF:1");
4156 }
4157 #endif // !NDEBUG
4158 }
4159 }
4160 }
4161
4162 // Resume writes to the DB
4163 if (two_write_queues_) {
4164 nonmem_write_thread_.ExitUnbatched(&nonmem_w);
4165 }
4166 write_thread_.ExitUnbatched(&w);
4167
4168 if (status.ok()) {
4169 for (auto& job : ingestion_jobs) {
4170 job.UpdateStats();
4171 }
4172 }
4173 ReleaseFileNumberFromPendingOutputs(pending_output_elem);
4174 num_running_ingest_file_ -= static_cast<int>(num_cfs);
4175 if (0 == num_running_ingest_file_) {
4176 bg_cv_.SignalAll();
4177 }
4178 TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock");
4179 }
4180 // mutex_ is unlocked here
4181
4182 // Cleanup
4183 for (size_t i = 0; i != num_cfs; ++i) {
4184 sv_ctxs[i].Clean();
4185 // This may rollback jobs that have completed successfully. This is
4186 // intended for atomicity.
4187 ingestion_jobs[i].Cleanup(status);
4188 }
4189 if (status.ok()) {
4190 for (size_t i = 0; i != num_cfs; ++i) {
4191 auto* cfd =
4192 static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
4193 if (!cfd->IsDropped()) {
4194 NotifyOnExternalFileIngested(cfd, ingestion_jobs[i]);
4195 }
4196 }
4197 }
4198 return status;
4199 }
4200
4201 Status DBImpl::CreateColumnFamilyWithImport(
4202 const ColumnFamilyOptions& options, const std::string& column_family_name,
4203 const ImportColumnFamilyOptions& import_options,
4204 const ExportImportFilesMetaData& metadata, ColumnFamilyHandle** handle) {
4205 assert(handle != nullptr);
4206 assert(*handle == nullptr);
4207 std::string cf_comparator_name = options.comparator->Name();
4208 if (cf_comparator_name != metadata.db_comparator_name) {
4209 return Status::InvalidArgument("Comparator name mismatch");
4210 }
4211
4212 // Create column family.
4213 auto status = CreateColumnFamily(options, column_family_name, handle);
4214 if (!status.ok()) {
4215 return status;
4216 }
4217
4218 // Import sst files from metadata.
4219 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(*handle);
4220 auto cfd = cfh->cfd();
4221 ImportColumnFamilyJob import_job(env_, versions_.get(), cfd,
4222 immutable_db_options_, file_options_,
4223 import_options, metadata.files);
4224
4225 SuperVersionContext dummy_sv_ctx(/* create_superversion */ true);
4226 VersionEdit dummy_edit;
4227 uint64_t next_file_number = 0;
4228 std::unique_ptr<std::list<uint64_t>::iterator> pending_output_elem;
4229 {
4230 // Lock db mutex
4231 InstrumentedMutexLock l(&mutex_);
4232 if (error_handler_.IsDBStopped()) {
4233 // Don't import files when there is a bg_error
4234 status = error_handler_.GetBGError();
4235 }
4236
4237 // Make sure that bg cleanup won't delete the files that we are importing
4238 pending_output_elem.reset(new std::list<uint64_t>::iterator(
4239 CaptureCurrentFileNumberInPendingOutputs()));
4240
4241 if (status.ok()) {
4242 // If a crash happens after a hard link is established, the Recover
4243 // function may reuse a file number that has already been assigned to the
4244 // internal file, and this would overwrite the external file. To protect
4245 // the external file, we have to make sure the file number is never reused.
4246 next_file_number = versions_->FetchAddFileNumber(metadata.files.size());
4247 auto cf_options = cfd->GetLatestMutableCFOptions();
4248 status = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
4249 directories_.GetDbDir());
4250 if (status.ok()) {
4251 InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
4252 }
4253 }
4254 }
4255 dummy_sv_ctx.Clean();
4256
4257 if (status.ok()) {
4258 SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
4259 status = import_job.Prepare(next_file_number, sv);
4260 CleanupSuperVersion(sv);
4261 }
4262
4263 if (status.ok()) {
4264 SuperVersionContext sv_context(true /*create_superversion*/);
4265 {
4266 // Lock db mutex
4267 InstrumentedMutexLock l(&mutex_);
4268
4269 // Stop writes to the DB by entering both write threads
4270 WriteThread::Writer w;
4271 write_thread_.EnterUnbatched(&w, &mutex_);
4272 WriteThread::Writer nonmem_w;
4273 if (two_write_queues_) {
4274 nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
4275 }
4276
4277 num_running_ingest_file_++;
4278 assert(!cfd->IsDropped());
4279 status = import_job.Run();
4280
4281 // Install job edit [Mutex will be unlocked here]
4282 if (status.ok()) {
4283 auto cf_options = cfd->GetLatestMutableCFOptions();
4284 status = versions_->LogAndApply(cfd, *cf_options, import_job.edit(),
4285 &mutex_, directories_.GetDbDir());
4286 if (status.ok()) {
4287 InstallSuperVersionAndScheduleWork(cfd, &sv_context, *cf_options);
4288 }
4289 }
4290
4291 // Resume writes to the DB
4292 if (two_write_queues_) {
4293 nonmem_write_thread_.ExitUnbatched(&nonmem_w);
4294 }
4295 write_thread_.ExitUnbatched(&w);
4296
4297 num_running_ingest_file_--;
4298 if (num_running_ingest_file_ == 0) {
4299 bg_cv_.SignalAll();
4300 }
4301 }
4302 // mutex_ is unlocked here
4303
4304 sv_context.Clean();
4305 }
4306
4307 {
4308 InstrumentedMutexLock l(&mutex_);
4309 ReleaseFileNumberFromPendingOutputs(pending_output_elem);
4310 }
4311
4312 import_job.Cleanup(status);
4313 if (!status.ok()) {
4314 DropColumnFamily(*handle);
4315 DestroyColumnFamilyHandle(*handle);
4316 *handle = nullptr;
4317 }
4318 return status;
4319 }
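
// Hedged sketch (assumes metadata was produced by
// rocksdb::Checkpoint::ExportColumnFamily(), which yields the
// ExportImportFilesMetaData consumed above):
//
//   rocksdb::ImportColumnFamilyOptions import_opts;
//   import_opts.move_files = true;
//   rocksdb::ColumnFamilyHandle* imported = nullptr;
//   rocksdb::Status s = db->CreateColumnFamilyWithImport(
//       rocksdb::ColumnFamilyOptions(), "imported_cf", import_opts,
//       *metadata, &imported);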
4320
4321 Status DBImpl::VerifyChecksum(const ReadOptions& read_options) {
4322 Status s;
4323 std::vector<ColumnFamilyData*> cfd_list;
4324 {
4325 InstrumentedMutexLock l(&mutex_);
4326 for (auto cfd : *versions_->GetColumnFamilySet()) {
4327 if (!cfd->IsDropped() && cfd->initialized()) {
4328 cfd->Ref();
4329 cfd_list.push_back(cfd);
4330 }
4331 }
4332 }
4333 std::vector<SuperVersion*> sv_list;
4334 for (auto cfd : cfd_list) {
4335 sv_list.push_back(cfd->GetReferencedSuperVersion(this));
4336 }
4337 for (auto& sv : sv_list) {
4338 VersionStorageInfo* vstorage = sv->current->storage_info();
4339 ColumnFamilyData* cfd = sv->current->cfd();
4340 Options opts;
4341 {
4342 InstrumentedMutexLock l(&mutex_);
4343 opts = Options(BuildDBOptions(immutable_db_options_, mutable_db_options_),
4344 cfd->GetLatestCFOptions());
4345 }
4346 for (int i = 0; i < vstorage->num_non_empty_levels() && s.ok(); i++) {
4347 for (size_t j = 0; j < vstorage->LevelFilesBrief(i).num_files && s.ok();
4348 j++) {
4349 const auto& fd = vstorage->LevelFilesBrief(i).files[j].fd;
4350 std::string fname = TableFileName(cfd->ioptions()->cf_paths,
4351 fd.GetNumber(), fd.GetPathId());
4352 s = rocksdb::VerifySstFileChecksum(opts, file_options_, read_options,
4353 fname);
4354 }
4355 }
4356 if (!s.ok()) {
4357 break;
4358 }
4359 }
4360 bool defer_purge =
4361 immutable_db_options().avoid_unnecessary_blocking_io;
4362 {
4363 InstrumentedMutexLock l(&mutex_);
4364 for (auto sv : sv_list) {
4365 if (sv && sv->Unref()) {
4366 sv->Cleanup();
4367 if (defer_purge) {
4368 AddSuperVersionsToFreeQueue(sv);
4369 } else {
4370 delete sv;
4371 }
4372 }
4373 }
4374 if (defer_purge) {
4375 SchedulePurge();
4376 }
4377 for (auto cfd : cfd_list) {
4378 cfd->UnrefAndTryDelete();
4379 }
4380 }
4381 return s;
4382 }
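
// One-line usage sketch (illustrative, given an open rocksdb::DB* db):
// verify the block checksums of every live SST file in every column family.
//
//   rocksdb::Status s = db->VerifyChecksum(rocksdb::ReadOptions());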
4383
4384 void DBImpl::NotifyOnExternalFileIngested(
4385 ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job) {
4386 if (immutable_db_options_.listeners.empty()) {
4387 return;
4388 }
4389
4390 for (const IngestedFileInfo& f : ingestion_job.files_to_ingest()) {
4391 ExternalFileIngestionInfo info;
4392 info.cf_name = cfd->GetName();
4393 info.external_file_path = f.external_file_path;
4394 info.internal_file_path = f.internal_file_path;
4395 info.global_seqno = f.assigned_seqno;
4396 info.table_properties = f.table_properties;
4397 for (auto listener : immutable_db_options_.listeners) {
4398 listener->OnExternalFileIngested(this, info);
4399 }
4400 }
4401 }
4402
4403 void DBImpl::WaitForIngestFile() {
4404 mutex_.AssertHeld();
4405 while (num_running_ingest_file_ > 0) {
4406 bg_cv_.Wait();
4407 }
4408 }
4409
4410 Status DBImpl::StartTrace(const TraceOptions& trace_options,
4411 std::unique_ptr<TraceWriter>&& trace_writer) {
4412 InstrumentedMutexLock lock(&trace_mutex_);
4413 tracer_.reset(new Tracer(env_, trace_options, std::move(trace_writer)));
4414 return Status::OK();
4415 }
4416
4417 Status DBImpl::EndTrace() {
4418 InstrumentedMutexLock lock(&trace_mutex_);
4419 Status s;
4420 if (tracer_ != nullptr) {
4421 s = tracer_->Close();
4422 tracer_.reset();
4423 } else {
4424 return Status::IOError("No trace file to close");
4425 }
4426 return s;
4427 }
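
// Hedged tracing sketch (assumes NewFileTraceWriter() from
// rocksdb/trace_reader_writer.h, which builds a file-backed TraceWriter):
//
//   std::unique_ptr<rocksdb::TraceWriter> trace_writer;
//   rocksdb::Status s = rocksdb::NewFileTraceWriter(
//       rocksdb::Env::Default(), rocksdb::EnvOptions(), "/tmp/db.trace",
//       &trace_writer);
//   if (s.ok()) {
//     s = db->StartTrace(rocksdb::TraceOptions(), std::move(trace_writer));
//   }
//   // ... run the workload to be captured ...
//   if (s.ok()) {
//     s = db->EndTrace();
//   }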
4428
4429 Status DBImpl::StartBlockCacheTrace(
4430 const TraceOptions& trace_options,
4431 std::unique_ptr<TraceWriter>&& trace_writer) {
4432 return block_cache_tracer_.StartTrace(env_, trace_options,
4433 std::move(trace_writer));
4434 }
4435
4436 Status DBImpl::EndBlockCacheTrace() {
4437 block_cache_tracer_.EndTrace();
4438 return Status::OK();
4439 }
4440
4441 Status DBImpl::TraceIteratorSeek(const uint32_t& cf_id, const Slice& key) {
4442 Status s;
4443 if (tracer_) {
4444 InstrumentedMutexLock lock(&trace_mutex_);
4445 if (tracer_) {
4446 s = tracer_->IteratorSeek(cf_id, key);
4447 }
4448 }
4449 return s;
4450 }
4451
4452 Status DBImpl::TraceIteratorSeekForPrev(const uint32_t& cf_id,
4453 const Slice& key) {
4454 Status s;
4455 if (tracer_) {
4456 InstrumentedMutexLock lock(&trace_mutex_);
4457 if (tracer_) {
4458 s = tracer_->IteratorSeekForPrev(cf_id, key);
4459 }
4460 }
4461 return s;
4462 }
4463
4464 Status DBImpl::ReserveFileNumbersBeforeIngestion(
4465 ColumnFamilyData* cfd, uint64_t num,
4466 std::unique_ptr<std::list<uint64_t>::iterator>& pending_output_elem,
4467 uint64_t* next_file_number) {
4468 Status s;
4469 SuperVersionContext dummy_sv_ctx(true /* create_superversion */);
4470 assert(nullptr != next_file_number);
4471 InstrumentedMutexLock l(&mutex_);
4472 if (error_handler_.IsDBStopped()) {
4473 // Do not ingest files when there is a bg_error
4474 return error_handler_.GetBGError();
4475 }
4476 pending_output_elem.reset(new std::list<uint64_t>::iterator(
4477 CaptureCurrentFileNumberInPendingOutputs()));
4478 *next_file_number = versions_->FetchAddFileNumber(static_cast<uint64_t>(num));
4479 auto cf_options = cfd->GetLatestMutableCFOptions();
4480 VersionEdit dummy_edit;
4481 // If a crash happens after a hard link is established, the Recover
4482 // function may reuse a file number that has already been assigned to the
4483 // internal file, and this would overwrite the external file. To protect
4484 // the external file, we have to make sure the file number is never reused.
4485 s = versions_->LogAndApply(cfd, *cf_options, &dummy_edit, &mutex_,
4486 directories_.GetDbDir());
4487 if (s.ok()) {
4488 InstallSuperVersionAndScheduleWork(cfd, &dummy_sv_ctx, *cf_options);
4489 }
4490 dummy_sv_ctx.Clean();
4491 return s;
4492 }
4493
4494 Status DBImpl::GetCreationTimeOfOldestFile(uint64_t* creation_time) {
4495 if (mutable_db_options_.max_open_files == -1) {
4496 uint64_t oldest_time = port::kMaxUint64;
4497 for (auto cfd : *versions_->GetColumnFamilySet()) {
4498 if (!cfd->IsDropped()) {
4499 uint64_t ctime;
4500 {
4501 SuperVersion* sv = GetAndRefSuperVersion(cfd);
4502 Version* version = sv->current;
4503 version->GetCreationTimeOfOldestFile(&ctime);
4504 ReturnAndCleanupSuperVersion(cfd, sv);
4505 }
4506
4507 if (ctime < oldest_time) {
4508 oldest_time = ctime;
4509 }
4510 if (oldest_time == 0) {
4511 break;
4512 }
4513 }
4514 }
4515 *creation_time = oldest_time;
4516 return Status::OK();
4517 } else {
4518 return Status::NotSupported("This API only works if max_open_files = -1");
4519 }
4520 }
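
// Brief usage sketch (illustrative, given an open rocksdb::DB* db): read the
// creation time of the oldest live SST file; per the check above this is
// only supported when max_open_files == -1.
//
//   uint64_t oldest_ctime = 0;
//   rocksdb::Status s = db->GetCreationTimeOfOldestFile(&oldest_ctime);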
4521 #endif // ROCKSDB_LITE
4522
4523 } // namespace rocksdb
4524