1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "db/db_impl/db_impl_secondary.h"
7
8 #include <cinttypes>
9
10 #include "db/arena_wrapped_db_iter.h"
11 #include "db/merge_context.h"
12 #include "logging/auto_roll_logger.h"
13 #include "logging/logging.h"
14 #include "monitoring/perf_context_imp.h"
15 #include "rocksdb/configurable.h"
16 #include "util/cast_util.h"
17
18 namespace ROCKSDB_NAMESPACE {
19
20 #ifndef ROCKSDB_LITE
DBImplSecondary(const DBOptions & db_options,const std::string & dbname,std::string secondary_path)21 DBImplSecondary::DBImplSecondary(const DBOptions& db_options,
22 const std::string& dbname,
23 std::string secondary_path)
24 : DBImpl(db_options, dbname, false, true, true),
25 secondary_path_(std::move(secondary_path)) {
26 ROCKS_LOG_INFO(immutable_db_options_.info_log,
27 "Opening the db in secondary mode");
28 LogFlush(immutable_db_options_.info_log);
29 }
30
~DBImplSecondary()31 DBImplSecondary::~DBImplSecondary() {}
32
Recover(const std::vector<ColumnFamilyDescriptor> & column_families,bool,bool,bool,uint64_t *)33 Status DBImplSecondary::Recover(
34 const std::vector<ColumnFamilyDescriptor>& column_families,
35 bool /*readonly*/, bool /*error_if_wal_file_exists*/,
36 bool /*error_if_data_exists_in_wals*/, uint64_t*) {
37 mutex_.AssertHeld();
38
39 JobContext job_context(0);
40 Status s;
41 s = static_cast<ReactiveVersionSet*>(versions_.get())
42 ->Recover(column_families, &manifest_reader_, &manifest_reporter_,
43 &manifest_reader_status_);
44 if (!s.ok()) {
45 if (manifest_reader_status_) {
46 manifest_reader_status_->PermitUncheckedError();
47 }
48 return s;
49 }
50 if (immutable_db_options_.paranoid_checks && s.ok()) {
51 s = CheckConsistency();
52 }
53 // Initial max_total_in_memory_state_ before recovery logs.
54 max_total_in_memory_state_ = 0;
55 for (auto cfd : *versions_->GetColumnFamilySet()) {
56 auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
57 max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
58 mutable_cf_options->max_write_buffer_number;
59 }
60 if (s.ok()) {
61 default_cf_handle_ = new ColumnFamilyHandleImpl(
62 versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
63 default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
64 single_column_family_mode_ =
65 versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
66
67 std::unordered_set<ColumnFamilyData*> cfds_changed;
68 s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
69 }
70
71 if (s.IsPathNotFound()) {
72 ROCKS_LOG_INFO(immutable_db_options_.info_log,
73 "Secondary tries to read WAL, but WAL file(s) have already "
74 "been purged by primary.");
75 s = Status::OK();
76 }
77 // TODO: update options_file_number_ needed?
78
79 job_context.Clean();
80 return s;
81 }
82
83 // find new WAL and apply them in order to the secondary instance
FindAndRecoverLogFiles(std::unordered_set<ColumnFamilyData * > * cfds_changed,JobContext * job_context)84 Status DBImplSecondary::FindAndRecoverLogFiles(
85 std::unordered_set<ColumnFamilyData*>* cfds_changed,
86 JobContext* job_context) {
87 assert(nullptr != cfds_changed);
88 assert(nullptr != job_context);
89 Status s;
90 std::vector<uint64_t> logs;
91 s = FindNewLogNumbers(&logs);
92 if (s.ok() && !logs.empty()) {
93 SequenceNumber next_sequence(kMaxSequenceNumber);
94 s = RecoverLogFiles(logs, &next_sequence, cfds_changed, job_context);
95 }
96 return s;
97 }
98
99 // List wal_dir and find all new WALs, return these log numbers
FindNewLogNumbers(std::vector<uint64_t> * logs)100 Status DBImplSecondary::FindNewLogNumbers(std::vector<uint64_t>* logs) {
101 assert(logs != nullptr);
102 std::vector<std::string> filenames;
103 Status s;
104 s = env_->GetChildren(immutable_db_options_.GetWalDir(), &filenames);
105 if (s.IsNotFound()) {
106 return Status::InvalidArgument("Failed to open wal_dir",
107 immutable_db_options_.GetWalDir());
108 } else if (!s.ok()) {
109 return s;
110 }
111
112 // if log_readers_ is non-empty, it means we have applied all logs with log
113 // numbers smaller than the smallest log in log_readers_, so there is no
114 // need to pass these logs to RecoverLogFiles
115 uint64_t log_number_min = 0;
116 if (!log_readers_.empty()) {
117 log_number_min = log_readers_.begin()->first;
118 }
119 for (size_t i = 0; i < filenames.size(); i++) {
120 uint64_t number;
121 FileType type;
122 if (ParseFileName(filenames[i], &number, &type) && type == kWalFile &&
123 number >= log_number_min) {
124 logs->push_back(number);
125 }
126 }
127 // Recover logs in the order that they were generated
128 if (!logs->empty()) {
129 std::sort(logs->begin(), logs->end());
130 }
131 return s;
132 }
133
MaybeInitLogReader(uint64_t log_number,log::FragmentBufferedReader ** log_reader)134 Status DBImplSecondary::MaybeInitLogReader(
135 uint64_t log_number, log::FragmentBufferedReader** log_reader) {
136 auto iter = log_readers_.find(log_number);
137 // make sure the log file is still present
138 if (iter == log_readers_.end() ||
139 iter->second->reader_->GetLogNumber() != log_number) {
140 // delete the obsolete log reader if log number mismatch
141 if (iter != log_readers_.end()) {
142 log_readers_.erase(iter);
143 }
144 // initialize log reader from log_number
145 // TODO: min_log_number_to_keep_2pc check needed?
146 // Open the log file
147 std::string fname =
148 LogFileName(immutable_db_options_.GetWalDir(), log_number);
149 ROCKS_LOG_INFO(immutable_db_options_.info_log,
150 "Recovering log #%" PRIu64 " mode %d", log_number,
151 static_cast<int>(immutable_db_options_.wal_recovery_mode));
152
153 std::unique_ptr<SequentialFileReader> file_reader;
154 {
155 std::unique_ptr<FSSequentialFile> file;
156 Status status = fs_->NewSequentialFile(
157 fname, fs_->OptimizeForLogRead(file_options_), &file,
158 nullptr);
159 if (!status.ok()) {
160 *log_reader = nullptr;
161 return status;
162 }
163 file_reader.reset(new SequentialFileReader(
164 std::move(file), fname, immutable_db_options_.log_readahead_size,
165 io_tracer_));
166 }
167
168 // Create the log reader.
169 LogReaderContainer* log_reader_container = new LogReaderContainer(
170 env_, immutable_db_options_.info_log, std::move(fname),
171 std::move(file_reader), log_number);
172 log_readers_.insert(std::make_pair(
173 log_number, std::unique_ptr<LogReaderContainer>(log_reader_container)));
174 }
175 iter = log_readers_.find(log_number);
176 assert(iter != log_readers_.end());
177 *log_reader = iter->second->reader_;
178 return Status::OK();
179 }
180
181 // After manifest recovery, replay WALs and refresh log_readers_ if necessary
182 // REQUIRES: log_numbers are sorted in ascending order
RecoverLogFiles(const std::vector<uint64_t> & log_numbers,SequenceNumber * next_sequence,std::unordered_set<ColumnFamilyData * > * cfds_changed,JobContext * job_context)183 Status DBImplSecondary::RecoverLogFiles(
184 const std::vector<uint64_t>& log_numbers, SequenceNumber* next_sequence,
185 std::unordered_set<ColumnFamilyData*>* cfds_changed,
186 JobContext* job_context) {
187 assert(nullptr != cfds_changed);
188 assert(nullptr != job_context);
189 mutex_.AssertHeld();
190 Status status;
191 for (auto log_number : log_numbers) {
192 log::FragmentBufferedReader* reader = nullptr;
193 status = MaybeInitLogReader(log_number, &reader);
194 if (!status.ok()) {
195 return status;
196 }
197 assert(reader != nullptr);
198 }
199 for (auto log_number : log_numbers) {
200 auto it = log_readers_.find(log_number);
201 assert(it != log_readers_.end());
202 log::FragmentBufferedReader* reader = it->second->reader_;
203 Status* wal_read_status = it->second->status_;
204 assert(wal_read_status);
205 // Manually update the file number allocation counter in VersionSet.
206 versions_->MarkFileNumberUsed(log_number);
207
208 // Determine if we should tolerate incomplete records at the tail end of the
209 // Read all the records and add to a memtable
210 std::string scratch;
211 Slice record;
212 WriteBatch batch;
213
214 while (reader->ReadRecord(&record, &scratch,
215 immutable_db_options_.wal_recovery_mode) &&
216 wal_read_status->ok() && status.ok()) {
217 if (record.size() < WriteBatchInternal::kHeader) {
218 reader->GetReporter()->Corruption(
219 record.size(), Status::Corruption("log record too small"));
220 continue;
221 }
222 status = WriteBatchInternal::SetContents(&batch, record);
223 if (!status.ok()) {
224 break;
225 }
226 SequenceNumber seq_of_batch = WriteBatchInternal::Sequence(&batch);
227 std::vector<uint32_t> column_family_ids;
228 status = CollectColumnFamilyIdsFromWriteBatch(batch, &column_family_ids);
229 if (status.ok()) {
230 for (const auto id : column_family_ids) {
231 ColumnFamilyData* cfd =
232 versions_->GetColumnFamilySet()->GetColumnFamily(id);
233 if (cfd == nullptr) {
234 continue;
235 }
236 if (cfds_changed->count(cfd) == 0) {
237 cfds_changed->insert(cfd);
238 }
239 const std::vector<FileMetaData*>& l0_files =
240 cfd->current()->storage_info()->LevelFiles(0);
241 SequenceNumber seq =
242 l0_files.empty() ? 0 : l0_files.back()->fd.largest_seqno;
243 // If the write batch's sequence number is smaller than the last
244 // sequence number of the largest sequence persisted for this column
245 // family, then its data must reside in an SST that has already been
246 // added in the prior MANIFEST replay.
247 if (seq_of_batch <= seq) {
248 continue;
249 }
250 auto curr_log_num = port::kMaxUint64;
251 if (cfd_to_current_log_.count(cfd) > 0) {
252 curr_log_num = cfd_to_current_log_[cfd];
253 }
254 // If the active memtable contains records added by replaying an
255 // earlier WAL, then we need to seal the memtable, add it to the
256 // immutable memtable list and create a new active memtable.
257 if (!cfd->mem()->IsEmpty() && (curr_log_num == port::kMaxUint64 ||
258 curr_log_num != log_number)) {
259 const MutableCFOptions mutable_cf_options =
260 *cfd->GetLatestMutableCFOptions();
261 MemTable* new_mem =
262 cfd->ConstructNewMemtable(mutable_cf_options, seq_of_batch);
263 cfd->mem()->SetNextLogNumber(log_number);
264 cfd->imm()->Add(cfd->mem(), &job_context->memtables_to_free);
265 new_mem->Ref();
266 cfd->SetMemtable(new_mem);
267 }
268 }
269 bool has_valid_writes = false;
270 status = WriteBatchInternal::InsertInto(
271 &batch, column_family_memtables_.get(),
272 nullptr /* flush_scheduler */, nullptr /* trim_history_scheduler*/,
273 true, log_number, this, false /* concurrent_memtable_writes */,
274 next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_);
275 }
276 // If column family was not found, it might mean that the WAL write
277 // batch references to the column family that was dropped after the
278 // insert. We don't want to fail the whole write batch in that case --
279 // we just ignore the update.
280 // That's why we set ignore missing column families to true
281 // passing null flush_scheduler will disable memtable flushing which is
282 // needed for secondary instances
283 if (status.ok()) {
284 for (const auto id : column_family_ids) {
285 ColumnFamilyData* cfd =
286 versions_->GetColumnFamilySet()->GetColumnFamily(id);
287 if (cfd == nullptr) {
288 continue;
289 }
290 std::unordered_map<ColumnFamilyData*, uint64_t>::iterator iter =
291 cfd_to_current_log_.find(cfd);
292 if (iter == cfd_to_current_log_.end()) {
293 cfd_to_current_log_.insert({cfd, log_number});
294 } else if (log_number > iter->second) {
295 iter->second = log_number;
296 }
297 }
298 auto last_sequence = *next_sequence - 1;
299 if ((*next_sequence != kMaxSequenceNumber) &&
300 (versions_->LastSequence() <= last_sequence)) {
301 versions_->SetLastAllocatedSequence(last_sequence);
302 versions_->SetLastPublishedSequence(last_sequence);
303 versions_->SetLastSequence(last_sequence);
304 }
305 } else {
306 // We are treating this as a failure while reading since we read valid
307 // blocks that do not form coherent data
308 reader->GetReporter()->Corruption(record.size(), status);
309 }
310 }
311 if (status.ok() && !wal_read_status->ok()) {
312 status = *wal_read_status;
313 }
314 if (!status.ok()) {
315 return status;
316 }
317 }
318 // remove logreaders from map after successfully recovering the WAL
319 if (log_readers_.size() > 1) {
320 auto erase_iter = log_readers_.begin();
321 std::advance(erase_iter, log_readers_.size() - 1);
322 log_readers_.erase(log_readers_.begin(), erase_iter);
323 }
324 return status;
325 }
326
327 // Implementation of the DB interface
Get(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * value)328 Status DBImplSecondary::Get(const ReadOptions& read_options,
329 ColumnFamilyHandle* column_family, const Slice& key,
330 PinnableSlice* value) {
331 return GetImpl(read_options, column_family, key, value);
332 }
333
GetImpl(const ReadOptions & read_options,ColumnFamilyHandle * column_family,const Slice & key,PinnableSlice * pinnable_val)334 Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
335 ColumnFamilyHandle* column_family,
336 const Slice& key, PinnableSlice* pinnable_val) {
337 assert(pinnable_val != nullptr);
338 PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
339 StopWatch sw(immutable_db_options_.clock, stats_, DB_GET);
340 PERF_TIMER_GUARD(get_snapshot_time);
341
342 auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
343 ColumnFamilyData* cfd = cfh->cfd();
344 if (tracer_) {
345 InstrumentedMutexLock lock(&trace_mutex_);
346 if (tracer_) {
347 tracer_->Get(column_family, key);
348 }
349 }
350 // Acquire SuperVersion
351 SuperVersion* super_version = GetAndRefSuperVersion(cfd);
352 SequenceNumber snapshot = versions_->LastSequence();
353 MergeContext merge_context;
354 SequenceNumber max_covering_tombstone_seq = 0;
355 Status s;
356 LookupKey lkey(key, snapshot);
357 PERF_TIMER_STOP(get_snapshot_time);
358
359 bool done = false;
360 if (super_version->mem->Get(lkey, pinnable_val->GetSelf(),
361 /*timestamp=*/nullptr, &s, &merge_context,
362 &max_covering_tombstone_seq, read_options)) {
363 done = true;
364 pinnable_val->PinSelf();
365 RecordTick(stats_, MEMTABLE_HIT);
366 } else if ((s.ok() || s.IsMergeInProgress()) &&
367 super_version->imm->Get(
368 lkey, pinnable_val->GetSelf(), /*timestamp=*/nullptr, &s,
369 &merge_context, &max_covering_tombstone_seq, read_options)) {
370 done = true;
371 pinnable_val->PinSelf();
372 RecordTick(stats_, MEMTABLE_HIT);
373 }
374 if (!done && !s.ok() && !s.IsMergeInProgress()) {
375 ReturnAndCleanupSuperVersion(cfd, super_version);
376 return s;
377 }
378 if (!done) {
379 PERF_TIMER_GUARD(get_from_output_files_time);
380 super_version->current->Get(read_options, lkey, pinnable_val,
381 /*timestamp=*/nullptr, &s, &merge_context,
382 &max_covering_tombstone_seq);
383 RecordTick(stats_, MEMTABLE_MISS);
384 }
385 {
386 PERF_TIMER_GUARD(get_post_process_time);
387 ReturnAndCleanupSuperVersion(cfd, super_version);
388 RecordTick(stats_, NUMBER_KEYS_READ);
389 size_t size = pinnable_val->size();
390 RecordTick(stats_, BYTES_READ, size);
391 RecordTimeToHistogram(stats_, BYTES_PER_READ, size);
392 PERF_COUNTER_ADD(get_read_bytes, size);
393 }
394 return s;
395 }
396
NewIterator(const ReadOptions & read_options,ColumnFamilyHandle * column_family)397 Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options,
398 ColumnFamilyHandle* column_family) {
399 if (read_options.managed) {
400 return NewErrorIterator(
401 Status::NotSupported("Managed iterator is not supported anymore."));
402 }
403 if (read_options.read_tier == kPersistedTier) {
404 return NewErrorIterator(Status::NotSupported(
405 "ReadTier::kPersistedData is not yet supported in iterators."));
406 }
407 Iterator* result = nullptr;
408 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
409 auto cfd = cfh->cfd();
410 ReadCallback* read_callback = nullptr; // No read callback provided.
411 if (read_options.tailing) {
412 return NewErrorIterator(Status::NotSupported(
413 "tailing iterator not supported in secondary mode"));
414 } else if (read_options.snapshot != nullptr) {
415 // TODO (yanqin) support snapshot.
416 return NewErrorIterator(
417 Status::NotSupported("snapshot not supported in secondary mode"));
418 } else {
419 SequenceNumber snapshot(kMaxSequenceNumber);
420 result = NewIteratorImpl(read_options, cfd, snapshot, read_callback);
421 }
422 return result;
423 }
424
NewIteratorImpl(const ReadOptions & read_options,ColumnFamilyData * cfd,SequenceNumber snapshot,ReadCallback * read_callback,bool expose_blob_index,bool allow_refresh)425 ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
426 const ReadOptions& read_options, ColumnFamilyData* cfd,
427 SequenceNumber snapshot, ReadCallback* read_callback,
428 bool expose_blob_index, bool allow_refresh) {
429 assert(nullptr != cfd);
430 SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
431 assert(snapshot == kMaxSequenceNumber);
432 snapshot = versions_->LastSequence();
433 assert(snapshot != kMaxSequenceNumber);
434 auto db_iter = NewArenaWrappedDbIterator(
435 env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
436 super_version->current, snapshot,
437 super_version->mutable_cf_options.max_sequential_skip_in_iterations,
438 super_version->version_number, read_callback, this, cfd,
439 expose_blob_index, read_options.snapshot ? false : allow_refresh);
440 auto internal_iter = NewInternalIterator(
441 db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(),
442 db_iter->GetRangeDelAggregator(), snapshot,
443 /* allow_unprepared_value */ true);
444 db_iter->SetIterUnderDBIter(internal_iter);
445 return db_iter;
446 }
447
NewIterators(const ReadOptions & read_options,const std::vector<ColumnFamilyHandle * > & column_families,std::vector<Iterator * > * iterators)448 Status DBImplSecondary::NewIterators(
449 const ReadOptions& read_options,
450 const std::vector<ColumnFamilyHandle*>& column_families,
451 std::vector<Iterator*>* iterators) {
452 if (read_options.managed) {
453 return Status::NotSupported("Managed iterator is not supported anymore.");
454 }
455 if (read_options.read_tier == kPersistedTier) {
456 return Status::NotSupported(
457 "ReadTier::kPersistedData is not yet supported in iterators.");
458 }
459 ReadCallback* read_callback = nullptr; // No read callback provided.
460 if (iterators == nullptr) {
461 return Status::InvalidArgument("iterators not allowed to be nullptr");
462 }
463 iterators->clear();
464 iterators->reserve(column_families.size());
465 if (read_options.tailing) {
466 return Status::NotSupported(
467 "tailing iterator not supported in secondary mode");
468 } else if (read_options.snapshot != nullptr) {
469 // TODO (yanqin) support snapshot.
470 return Status::NotSupported("snapshot not supported in secondary mode");
471 } else {
472 SequenceNumber read_seq = versions_->LastSequence();
473 for (auto cfh : column_families) {
474 ColumnFamilyData* cfd = static_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
475 iterators->push_back(
476 NewIteratorImpl(read_options, cfd, read_seq, read_callback));
477 }
478 }
479 return Status::OK();
480 }
481
CheckConsistency()482 Status DBImplSecondary::CheckConsistency() {
483 mutex_.AssertHeld();
484 Status s = DBImpl::CheckConsistency();
485 // If DBImpl::CheckConsistency() which is stricter returns success, then we
486 // do not need to give a second chance.
487 if (s.ok()) {
488 return s;
489 }
490 // It's possible that DBImpl::CheckConssitency() can fail because the primary
491 // may have removed certain files, causing the GetFileSize(name) call to
492 // fail and returning a PathNotFound. In this case, we take a best-effort
493 // approach and just proceed.
494 TEST_SYNC_POINT_CALLBACK(
495 "DBImplSecondary::CheckConsistency:AfterFirstAttempt", &s);
496
497 if (immutable_db_options_.skip_checking_sst_file_sizes_on_db_open) {
498 return Status::OK();
499 }
500
501 std::vector<LiveFileMetaData> metadata;
502 versions_->GetLiveFilesMetaData(&metadata);
503
504 std::string corruption_messages;
505 for (const auto& md : metadata) {
506 // md.name has a leading "/".
507 std::string file_path = md.db_path + md.name;
508
509 uint64_t fsize = 0;
510 s = env_->GetFileSize(file_path, &fsize);
511 if (!s.ok() &&
512 (env_->GetFileSize(Rocks2LevelTableFileName(file_path), &fsize).ok() ||
513 s.IsPathNotFound())) {
514 s = Status::OK();
515 }
516 if (!s.ok()) {
517 corruption_messages +=
518 "Can't access " + md.name + ": " + s.ToString() + "\n";
519 }
520 }
521 return corruption_messages.empty() ? Status::OK()
522 : Status::Corruption(corruption_messages);
523 }
524
TryCatchUpWithPrimary()525 Status DBImplSecondary::TryCatchUpWithPrimary() {
526 assert(versions_.get() != nullptr);
527 assert(manifest_reader_.get() != nullptr);
528 Status s;
529 // read the manifest and apply new changes to the secondary instance
530 std::unordered_set<ColumnFamilyData*> cfds_changed;
531 JobContext job_context(0, true /*create_superversion*/);
532 {
533 InstrumentedMutexLock lock_guard(&mutex_);
534 s = static_cast_with_check<ReactiveVersionSet>(versions_.get())
535 ->ReadAndApply(&mutex_, &manifest_reader_,
536 manifest_reader_status_.get(), &cfds_changed);
537
538 ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64,
539 static_cast<uint64_t>(versions_->LastSequence()));
540 for (ColumnFamilyData* cfd : cfds_changed) {
541 if (cfd->IsDropped()) {
542 ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n",
543 cfd->GetName().c_str());
544 continue;
545 }
546 VersionStorageInfo::LevelSummaryStorage tmp;
547 ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
548 "[%s] Level summary: %s\n", cfd->GetName().c_str(),
549 cfd->current()->storage_info()->LevelSummary(&tmp));
550 }
551
552 // list wal_dir to discover new WALs and apply new changes to the secondary
553 // instance
554 if (s.ok()) {
555 s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
556 }
557 if (s.IsPathNotFound()) {
558 ROCKS_LOG_INFO(
559 immutable_db_options_.info_log,
560 "Secondary tries to read WAL, but WAL file(s) have already "
561 "been purged by primary.");
562 s = Status::OK();
563 }
564 if (s.ok()) {
565 for (auto cfd : cfds_changed) {
566 cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(),
567 &job_context.memtables_to_free);
568 auto& sv_context = job_context.superversion_contexts.back();
569 cfd->InstallSuperVersion(&sv_context, &mutex_);
570 sv_context.NewSuperVersion();
571 }
572 }
573 }
574 job_context.Clean();
575
576 // Cleanup unused, obsolete files.
577 JobContext purge_files_job_context(0);
578 {
579 InstrumentedMutexLock lock_guard(&mutex_);
580 // Currently, secondary instance does not own the database files, thus it
581 // is unnecessary for the secondary to force full scan.
582 FindObsoleteFiles(&purge_files_job_context, /*force=*/false);
583 }
584 if (purge_files_job_context.HaveSomethingToDelete()) {
585 PurgeObsoleteFiles(purge_files_job_context);
586 }
587 purge_files_job_context.Clean();
588 return s;
589 }
590
OpenAsSecondary(const Options & options,const std::string & dbname,const std::string & secondary_path,DB ** dbptr)591 Status DB::OpenAsSecondary(const Options& options, const std::string& dbname,
592 const std::string& secondary_path, DB** dbptr) {
593 *dbptr = nullptr;
594
595 DBOptions db_options(options);
596 ColumnFamilyOptions cf_options(options);
597 std::vector<ColumnFamilyDescriptor> column_families;
598 column_families.emplace_back(kDefaultColumnFamilyName, cf_options);
599 std::vector<ColumnFamilyHandle*> handles;
600
601 Status s = DB::OpenAsSecondary(db_options, dbname, secondary_path,
602 column_families, &handles, dbptr);
603 if (s.ok()) {
604 assert(handles.size() == 1);
605 delete handles[0];
606 }
607 return s;
608 }
609
OpenAsSecondary(const DBOptions & db_options,const std::string & dbname,const std::string & secondary_path,const std::vector<ColumnFamilyDescriptor> & column_families,std::vector<ColumnFamilyHandle * > * handles,DB ** dbptr)610 Status DB::OpenAsSecondary(
611 const DBOptions& db_options, const std::string& dbname,
612 const std::string& secondary_path,
613 const std::vector<ColumnFamilyDescriptor>& column_families,
614 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
615 *dbptr = nullptr;
616 if (db_options.max_open_files != -1) {
617 // TODO (yanqin) maybe support max_open_files != -1 by creating hard links
618 // on SST files so that db secondary can still have access to old SSTs
619 // while primary instance may delete original.
620 return Status::InvalidArgument("require max_open_files to be -1");
621 }
622
623 DBOptions tmp_opts(db_options);
624 Status s;
625 if (nullptr == tmp_opts.info_log) {
626 s = CreateLoggerFromOptions(secondary_path, tmp_opts, &tmp_opts.info_log);
627 if (!s.ok()) {
628 tmp_opts.info_log = nullptr;
629 }
630 }
631
632 handles->clear();
633 DBImplSecondary* impl = new DBImplSecondary(tmp_opts, dbname, secondary_path);
634 impl->versions_.reset(new ReactiveVersionSet(
635 dbname, &impl->immutable_db_options_, impl->file_options_,
636 impl->table_cache_.get(), impl->write_buffer_manager_,
637 &impl->write_controller_, impl->io_tracer_));
638 impl->column_family_memtables_.reset(
639 new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet()));
640 impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath();
641
642 impl->mutex_.Lock();
643 s = impl->Recover(column_families, true, false, false);
644 if (s.ok()) {
645 for (auto cf : column_families) {
646 auto cfd =
647 impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
648 if (nullptr == cfd) {
649 s = Status::InvalidArgument("Column family not found", cf.name);
650 break;
651 }
652 handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
653 }
654 }
655 SuperVersionContext sv_context(true /* create_superversion */);
656 if (s.ok()) {
657 for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
658 sv_context.NewSuperVersion();
659 cfd->InstallSuperVersion(&sv_context, &impl->mutex_);
660 }
661 }
662 impl->mutex_.Unlock();
663 sv_context.Clean();
664 if (s.ok()) {
665 *dbptr = impl;
666 for (auto h : *handles) {
667 impl->NewThreadStatusCfInfo(
668 static_cast_with_check<ColumnFamilyHandleImpl>(h)->cfd());
669 }
670 } else {
671 for (auto h : *handles) {
672 delete h;
673 }
674 handles->clear();
675 delete impl;
676 }
677 return s;
678 }
679
CompactWithoutInstallation(ColumnFamilyHandle * cfh,const CompactionServiceInput & input,CompactionServiceResult * result)680 Status DBImplSecondary::CompactWithoutInstallation(
681 ColumnFamilyHandle* cfh, const CompactionServiceInput& input,
682 CompactionServiceResult* result) {
683 InstrumentedMutexLock l(&mutex_);
684 auto cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
685 if (!cfd) {
686 return Status::InvalidArgument("Cannot find column family" +
687 cfh->GetName());
688 }
689
690 std::unordered_set<uint64_t> input_set;
691 for (const auto& file_name : input.input_files) {
692 input_set.insert(TableFileNameToNumber(file_name));
693 }
694
695 auto* version = cfd->current();
696
697 ColumnFamilyMetaData cf_meta;
698 version->GetColumnFamilyMetaData(&cf_meta);
699
700 const MutableCFOptions* mutable_cf_options = cfd->GetLatestMutableCFOptions();
701 ColumnFamilyOptions cf_options = cfd->GetLatestCFOptions();
702 VersionStorageInfo* vstorage = version->storage_info();
703
704 // Use comp_options to reuse some CompactFiles functions
705 CompactionOptions comp_options;
706 comp_options.compression = kDisableCompressionOption;
707 comp_options.output_file_size_limit = MaxFileSizeForLevel(
708 *mutable_cf_options, input.output_level, cf_options.compaction_style,
709 vstorage->base_level(), cf_options.level_compaction_dynamic_level_bytes);
710
711 std::vector<CompactionInputFiles> input_files;
712 Status s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers(
713 &input_files, &input_set, vstorage, comp_options);
714 if (!s.ok()) {
715 return s;
716 }
717
718 std::unique_ptr<Compaction> c;
719 assert(cfd->compaction_picker());
720 c.reset(cfd->compaction_picker()->CompactFiles(
721 comp_options, input_files, input.output_level, vstorage,
722 *mutable_cf_options, mutable_db_options_, 0));
723 assert(c != nullptr);
724
725 c->SetInputVersion(version);
726
727 // Create output directory if it's not existed yet
728 std::unique_ptr<FSDirectory> output_dir;
729 s = CreateAndNewDirectory(fs_.get(), secondary_path_, &output_dir);
730 if (!s.ok()) {
731 return s;
732 }
733
734 LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
735 immutable_db_options_.info_log.get());
736
737 const int job_id = next_job_id_.fetch_add(1);
738
739 CompactionServiceCompactionJob compaction_job(
740 job_id, c.get(), immutable_db_options_, mutable_db_options_,
741 file_options_for_compaction_, versions_.get(), &shutting_down_,
742 &log_buffer, output_dir.get(), stats_, &mutex_, &error_handler_,
743 input.snapshots, table_cache_, &event_logger_, dbname_, io_tracer_,
744 db_id_, db_session_id_, secondary_path_, input, result);
745
746 mutex_.Unlock();
747 s = compaction_job.Run();
748 mutex_.Lock();
749
750 // clean up
751 compaction_job.io_status().PermitUncheckedError();
752 compaction_job.CleanupCompaction();
753 c->ReleaseCompactionFiles(s);
754 c.reset();
755
756 TEST_SYNC_POINT_CALLBACK("DBImplSecondary::CompactWithoutInstallation::End",
757 &s);
758 result->status = s;
759 return s;
760 }
761
OpenAndCompact(const std::string & name,const std::string & output_directory,const std::string & input,std::string * result,const CompactionServiceOptionsOverride & override_options)762 Status DB::OpenAndCompact(
763 const std::string& name, const std::string& output_directory,
764 const std::string& input, std::string* result,
765 const CompactionServiceOptionsOverride& override_options) {
766 CompactionServiceInput compaction_input;
767 Status s = CompactionServiceInput::Read(input, &compaction_input);
768 if (!s.ok()) {
769 return s;
770 }
771
772 compaction_input.db_options.max_open_files = -1;
773 compaction_input.db_options.compaction_service = nullptr;
774 if (compaction_input.db_options.statistics) {
775 compaction_input.db_options.statistics.reset();
776 }
777 compaction_input.db_options.env = override_options.env;
778 compaction_input.db_options.file_checksum_gen_factory =
779 override_options.file_checksum_gen_factory;
780 compaction_input.db_options.statistics = override_options.statistics;
781 compaction_input.column_family.options.comparator =
782 override_options.comparator;
783 compaction_input.column_family.options.merge_operator =
784 override_options.merge_operator;
785 compaction_input.column_family.options.compaction_filter =
786 override_options.compaction_filter;
787 compaction_input.column_family.options.compaction_filter_factory =
788 override_options.compaction_filter_factory;
789 compaction_input.column_family.options.prefix_extractor =
790 override_options.prefix_extractor;
791 compaction_input.column_family.options.table_factory =
792 override_options.table_factory;
793 compaction_input.column_family.options.sst_partitioner_factory =
794 override_options.sst_partitioner_factory;
795
796 std::vector<ColumnFamilyDescriptor> column_families;
797 column_families.push_back(compaction_input.column_family);
798 // TODO: we have to open default CF, because of an implementation limitation,
799 // currently we just use the same CF option from input, which is not collect
800 // and open may fail.
801 if (compaction_input.column_family.name != kDefaultColumnFamilyName) {
802 column_families.emplace_back(kDefaultColumnFamilyName,
803 compaction_input.column_family.options);
804 }
805
806 DB* db;
807 std::vector<ColumnFamilyHandle*> handles;
808
809 s = DB::OpenAsSecondary(compaction_input.db_options, name, output_directory,
810 column_families, &handles, &db);
811 if (!s.ok()) {
812 return s;
813 }
814
815 CompactionServiceResult compaction_result;
816 DBImplSecondary* db_secondary = static_cast_with_check<DBImplSecondary>(db);
817 assert(handles.size() > 0);
818 s = db_secondary->CompactWithoutInstallation(handles[0], compaction_input,
819 &compaction_result);
820
821 Status serialization_status = compaction_result.Write(result);
822
823 for (auto& handle : handles) {
824 delete handle;
825 }
826 delete db;
827 if (s.ok()) {
828 return serialization_status;
829 }
830 return s;
831 }
832
833 #else // !ROCKSDB_LITE
834
835 Status DB::OpenAsSecondary(const Options& /*options*/,
836 const std::string& /*name*/,
837 const std::string& /*secondary_path*/,
838 DB** /*dbptr*/) {
839 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
840 }
841
842 Status DB::OpenAsSecondary(
843 const DBOptions& /*db_options*/, const std::string& /*dbname*/,
844 const std::string& /*secondary_path*/,
845 const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
846 std::vector<ColumnFamilyHandle*>* /*handles*/, DB** /*dbptr*/) {
847 return Status::NotSupported("Not supported in ROCKSDB_LITE.");
848 }
849 #endif // !ROCKSDB_LITE
850
851 } // namespace ROCKSDB_NAMESPACE
852