//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#include "db/db_impl/db_impl_secondary.h"

#include <cinttypes>

#include "db/arena_wrapped_db_iter.h"
#include "db/merge_context.h"
#include "logging/auto_roll_logger.h"
#include "logging/logging.h"
#include "monitoring/perf_context_imp.h"
#include "rocksdb/configurable.h"
#include "util/cast_util.h"

namespace ROCKSDB_NAMESPACE {

#ifndef ROCKSDB_LITE
DBImplSecondary::DBImplSecondary(const DBOptions& db_options,
                                 const std::string& dbname,
                                 std::string secondary_path)
    : DBImpl(db_options, dbname, false, true, true),
      secondary_path_(std::move(secondary_path)) {
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "Opening the db in secondary mode");
  LogFlush(immutable_db_options_.info_log);
}

DBImplSecondary::~DBImplSecondary() {}

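// Secondary recovery proceeds in two stages: the shared MANIFEST is first
// replayed through the ReactiveVersionSet to rebuild the column family set
// and LSM metadata, and then WALs written by the primary that are not yet
// reflected in SSTs are tailed into the secondary's memtables.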
Status DBImplSecondary::Recover(
    const std::vector<ColumnFamilyDescriptor>& column_families,
    bool /*readonly*/, bool /*error_if_wal_file_exists*/,
    bool /*error_if_data_exists_in_wals*/, uint64_t*) {
  mutex_.AssertHeld();

  JobContext job_context(0);
  Status s;
  s = static_cast<ReactiveVersionSet*>(versions_.get())
          ->Recover(column_families, &manifest_reader_, &manifest_reporter_,
                    &manifest_reader_status_);
  if (!s.ok()) {
    if (manifest_reader_status_) {
      manifest_reader_status_->PermitUncheckedError();
    }
    return s;
  }
  if (immutable_db_options_.paranoid_checks && s.ok()) {
    s = CheckConsistency();
  }
  // Initialize max_total_in_memory_state_ before recovering the WALs.
  max_total_in_memory_state_ = 0;
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
    max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
                                  mutable_cf_options->max_write_buffer_number;
  }
  if (s.ok()) {
    default_cf_handle_ = new ColumnFamilyHandleImpl(
        versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
    default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
    single_column_family_mode_ =
        versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;

    std::unordered_set<ColumnFamilyData*> cfds_changed;
    s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
  }

  if (s.IsPathNotFound()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "Secondary tries to read WAL, but WAL file(s) have already "
                   "been purged by primary.");
    s = Status::OK();
  }
  // TODO: update options_file_number_ needed?

  job_context.Clean();
  return s;
}

// Find new WALs and apply them in order to the secondary instance.
Status DBImplSecondary::FindAndRecoverLogFiles(
    std::unordered_set<ColumnFamilyData*>* cfds_changed,
    JobContext* job_context) {
  assert(nullptr != cfds_changed);
  assert(nullptr != job_context);
  Status s;
  std::vector<uint64_t> logs;
  s = FindNewLogNumbers(&logs);
  if (s.ok() && !logs.empty()) {
    SequenceNumber next_sequence(kMaxSequenceNumber);
    s = RecoverLogFiles(logs, &next_sequence, cfds_changed, job_context);
  }
  return s;
}

// List wal_dir, find all new WALs, and return their log numbers.
Status DBImplSecondary::FindNewLogNumbers(std::vector<uint64_t>* logs) {
  assert(logs != nullptr);
  std::vector<std::string> filenames;
  Status s;
  s = env_->GetChildren(immutable_db_options_.GetWalDir(), &filenames);
  if (s.IsNotFound()) {
    return Status::InvalidArgument("Failed to open wal_dir",
                                   immutable_db_options_.GetWalDir());
  } else if (!s.ok()) {
    return s;
  }

  // If log_readers_ is non-empty, it means we have applied all logs with log
  // numbers smaller than the smallest log in log_readers_, so there is no
  // need to pass these logs to RecoverLogFiles.
  uint64_t log_number_min = 0;
  if (!log_readers_.empty()) {
    log_number_min = log_readers_.begin()->first;
  }
  for (size_t i = 0; i < filenames.size(); i++) {
    uint64_t number;
    FileType type;
    if (ParseFileName(filenames[i], &number, &type) && type == kWalFile &&
        number >= log_number_min) {
      logs->push_back(number);
    }
  }
  // Recover logs in the order that they were generated.
  if (!logs->empty()) {
    std::sort(logs->begin(), logs->end());
  }
  return s;
}

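// MaybeInitLogReader lazily opens the WAL with the given number and caches a
// FragmentBufferedReader for it in log_readers_, so that subsequent catch-up
// calls can keep tailing the same file instead of re-opening it.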
Status DBImplSecondary::MaybeInitLogReader(
    uint64_t log_number, log::FragmentBufferedReader** log_reader) {
  auto iter = log_readers_.find(log_number);
  // Make sure the log file is still present.
  if (iter == log_readers_.end() ||
      iter->second->reader_->GetLogNumber() != log_number) {
    // Delete the obsolete log reader if the log numbers mismatch.
    if (iter != log_readers_.end()) {
      log_readers_.erase(iter);
    }
    // Initialize a log reader from log_number.
    // TODO: min_log_number_to_keep_2pc check needed?
    // Open the log file.
    std::string fname =
        LogFileName(immutable_db_options_.GetWalDir(), log_number);
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "Recovering log #%" PRIu64 " mode %d", log_number,
                   static_cast<int>(immutable_db_options_.wal_recovery_mode));

    std::unique_ptr<SequentialFileReader> file_reader;
    {
      std::unique_ptr<FSSequentialFile> file;
      Status status = fs_->NewSequentialFile(
          fname, fs_->OptimizeForLogRead(file_options_), &file, nullptr);
      if (!status.ok()) {
        *log_reader = nullptr;
        return status;
      }
      file_reader.reset(new SequentialFileReader(
          std::move(file), fname, immutable_db_options_.log_readahead_size,
          io_tracer_));
    }

    // Create the log reader.
    LogReaderContainer* log_reader_container = new LogReaderContainer(
        env_, immutable_db_options_.info_log, std::move(fname),
        std::move(file_reader), log_number);
    log_readers_.insert(std::make_pair(
        log_number, std::unique_ptr<LogReaderContainer>(log_reader_container)));
  }
  iter = log_readers_.find(log_number);
  assert(iter != log_readers_.end());
  *log_reader = iter->second->reader_;
  return Status::OK();
}

// After manifest recovery, replay WALs and refresh log_readers_ if necessary.
// REQUIRES: log_numbers are sorted in ascending order
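// The replay is done in two passes: the first loop makes sure a cached log
// reader exists for every WAL in log_numbers, and the second loop replays the
// records of each WAL into the memtables of the column families they touch,
// sealing and switching a memtable when it still holds entries from an
// earlier WAL.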
Status DBImplSecondary::RecoverLogFiles(
    const std::vector<uint64_t>& log_numbers, SequenceNumber* next_sequence,
    std::unordered_set<ColumnFamilyData*>* cfds_changed,
    JobContext* job_context) {
  assert(nullptr != cfds_changed);
  assert(nullptr != job_context);
  mutex_.AssertHeld();
  Status status;
  for (auto log_number : log_numbers) {
    log::FragmentBufferedReader* reader = nullptr;
    status = MaybeInitLogReader(log_number, &reader);
    if (!status.ok()) {
      return status;
    }
    assert(reader != nullptr);
  }
  for (auto log_number : log_numbers) {
    auto it = log_readers_.find(log_number);
    assert(it != log_readers_.end());
    log::FragmentBufferedReader* reader = it->second->reader_;
    Status* wal_read_status = it->second->status_;
    assert(wal_read_status);
    // Manually update the file number allocation counter in VersionSet.
    versions_->MarkFileNumberUsed(log_number);

    // Determine if we should tolerate incomplete records at the tail end of
    // the log.
    // Read all the records and add to a memtable.
    std::string scratch;
    Slice record;
    WriteBatch batch;

    while (reader->ReadRecord(&record, &scratch,
                              immutable_db_options_.wal_recovery_mode) &&
           wal_read_status->ok() && status.ok()) {
      if (record.size() < WriteBatchInternal::kHeader) {
        reader->GetReporter()->Corruption(
            record.size(), Status::Corruption("log record too small"));
        continue;
      }
      status = WriteBatchInternal::SetContents(&batch, record);
      if (!status.ok()) {
        break;
      }
      SequenceNumber seq_of_batch = WriteBatchInternal::Sequence(&batch);
      std::vector<uint32_t> column_family_ids;
      status = CollectColumnFamilyIdsFromWriteBatch(batch, &column_family_ids);
      if (status.ok()) {
        for (const auto id : column_family_ids) {
          ColumnFamilyData* cfd =
              versions_->GetColumnFamilySet()->GetColumnFamily(id);
          if (cfd == nullptr) {
            continue;
          }
          if (cfds_changed->count(cfd) == 0) {
            cfds_changed->insert(cfd);
          }
          const std::vector<FileMetaData*>& l0_files =
              cfd->current()->storage_info()->LevelFiles(0);
          SequenceNumber seq =
              l0_files.empty() ? 0 : l0_files.back()->fd.largest_seqno;
          // If the write batch's sequence number is no greater than the
          // largest sequence number persisted for this column family, then
          // its data must already reside in an SST added during the prior
          // MANIFEST replay.
          if (seq_of_batch <= seq) {
            continue;
          }
          auto curr_log_num = port::kMaxUint64;
          if (cfd_to_current_log_.count(cfd) > 0) {
            curr_log_num = cfd_to_current_log_[cfd];
          }
          // If the active memtable contains records added by replaying an
          // earlier WAL, then we need to seal the memtable, add it to the
          // immutable memtable list and create a new active memtable.
          if (!cfd->mem()->IsEmpty() && (curr_log_num == port::kMaxUint64 ||
                                         curr_log_num != log_number)) {
            const MutableCFOptions mutable_cf_options =
                *cfd->GetLatestMutableCFOptions();
            MemTable* new_mem =
                cfd->ConstructNewMemtable(mutable_cf_options, seq_of_batch);
            cfd->mem()->SetNextLogNumber(log_number);
            cfd->imm()->Add(cfd->mem(), &job_context->memtables_to_free);
            new_mem->Ref();
            cfd->SetMemtable(new_mem);
          }
        }
        bool has_valid_writes = false;
        status = WriteBatchInternal::InsertInto(
            &batch, column_family_memtables_.get(),
            nullptr /* flush_scheduler */, nullptr /* trim_history_scheduler*/,
            true, log_number, this, false /* concurrent_memtable_writes */,
            next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_);
      }
      // If a column family was not found, it might mean that the WAL write
      // batch references a column family that was dropped after the insert.
      // We don't want to fail the whole write batch in that case -- we just
      // ignore the update. That's why we set ignore_missing_column_families
      // to true above. Passing a null flush_scheduler disables memtable
      // flushing, which is what we need for secondary instances.
      if (status.ok()) {
        for (const auto id : column_family_ids) {
          ColumnFamilyData* cfd =
              versions_->GetColumnFamilySet()->GetColumnFamily(id);
          if (cfd == nullptr) {
            continue;
          }
          std::unordered_map<ColumnFamilyData*, uint64_t>::iterator iter =
              cfd_to_current_log_.find(cfd);
          if (iter == cfd_to_current_log_.end()) {
            cfd_to_current_log_.insert({cfd, log_number});
          } else if (log_number > iter->second) {
            iter->second = log_number;
          }
        }
        auto last_sequence = *next_sequence - 1;
        if ((*next_sequence != kMaxSequenceNumber) &&
            (versions_->LastSequence() <= last_sequence)) {
          versions_->SetLastAllocatedSequence(last_sequence);
          versions_->SetLastPublishedSequence(last_sequence);
          versions_->SetLastSequence(last_sequence);
        }
      } else {
        // We treat this as a read failure since we read valid blocks that do
        // not form coherent data.
        reader->GetReporter()->Corruption(record.size(), status);
      }
    }
    if (status.ok() && !wal_read_status->ok()) {
      status = *wal_read_status;
    }
    if (!status.ok()) {
      return status;
    }
  }
  // Remove log readers from the map after successfully recovering the WALs,
  // keeping only the reader for the most recent WAL so tailing can resume.
  if (log_readers_.size() > 1) {
    auto erase_iter = log_readers_.begin();
    std::advance(erase_iter, log_readers_.size() - 1);
    log_readers_.erase(log_readers_.begin(), erase_iter);
  }
  return status;
}

// Implementation of the DB interface.
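// Reads on a secondary are served from the state established by the most
// recent catch-up: lookups consult the active and immutable memtables built
// from WAL replay, then fall back to the SST files of the replayed Version.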
Status DBImplSecondary::Get(const ReadOptions& read_options,
                            ColumnFamilyHandle* column_family, const Slice& key,
                            PinnableSlice* value) {
  return GetImpl(read_options, column_family, key, value);
}

Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
                                ColumnFamilyHandle* column_family,
                                const Slice& key, PinnableSlice* pinnable_val) {
  assert(pinnable_val != nullptr);
  PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
  StopWatch sw(immutable_db_options_.clock, stats_, DB_GET);
  PERF_TIMER_GUARD(get_snapshot_time);

  auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
  ColumnFamilyData* cfd = cfh->cfd();
  if (tracer_) {
    InstrumentedMutexLock lock(&trace_mutex_);
    if (tracer_) {
      tracer_->Get(column_family, key);
    }
  }
  // Acquire SuperVersion
  SuperVersion* super_version = GetAndRefSuperVersion(cfd);
  SequenceNumber snapshot = versions_->LastSequence();
  MergeContext merge_context;
  SequenceNumber max_covering_tombstone_seq = 0;
  Status s;
  LookupKey lkey(key, snapshot);
  PERF_TIMER_STOP(get_snapshot_time);

  bool done = false;
  if (super_version->mem->Get(lkey, pinnable_val->GetSelf(),
                              /*timestamp=*/nullptr, &s, &merge_context,
                              &max_covering_tombstone_seq, read_options)) {
    done = true;
    pinnable_val->PinSelf();
    RecordTick(stats_, MEMTABLE_HIT);
  } else if ((s.ok() || s.IsMergeInProgress()) &&
             super_version->imm->Get(
                 lkey, pinnable_val->GetSelf(), /*timestamp=*/nullptr, &s,
                 &merge_context, &max_covering_tombstone_seq, read_options)) {
    done = true;
    pinnable_val->PinSelf();
    RecordTick(stats_, MEMTABLE_HIT);
  }
  if (!done && !s.ok() && !s.IsMergeInProgress()) {
    ReturnAndCleanupSuperVersion(cfd, super_version);
    return s;
  }
  if (!done) {
    PERF_TIMER_GUARD(get_from_output_files_time);
    super_version->current->Get(read_options, lkey, pinnable_val,
                                /*timestamp=*/nullptr, &s, &merge_context,
                                &max_covering_tombstone_seq);
    RecordTick(stats_, MEMTABLE_MISS);
  }
  {
    PERF_TIMER_GUARD(get_post_process_time);
    ReturnAndCleanupSuperVersion(cfd, super_version);
    RecordTick(stats_, NUMBER_KEYS_READ);
    size_t size = pinnable_val->size();
    RecordTick(stats_, BYTES_READ, size);
    RecordTimeToHistogram(stats_, BYTES_PER_READ, size);
    PERF_COUNTER_ADD(get_read_bytes, size);
  }
  return s;
}

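// Iterators on a secondary pin a SuperVersion and read at the last sequence
// number known to the secondary at creation time; tailing iterators and
// user-supplied snapshots are rejected below as not supported.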
Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options,
                                       ColumnFamilyHandle* column_family) {
  if (read_options.managed) {
    return NewErrorIterator(
        Status::NotSupported("Managed iterator is not supported anymore."));
  }
  if (read_options.read_tier == kPersistedTier) {
    return NewErrorIterator(Status::NotSupported(
        "ReadTier::kPersistedData is not yet supported in iterators."));
  }
  Iterator* result = nullptr;
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  auto cfd = cfh->cfd();
  ReadCallback* read_callback = nullptr;  // No read callback provided.
  if (read_options.tailing) {
    return NewErrorIterator(Status::NotSupported(
        "tailing iterator not supported in secondary mode"));
  } else if (read_options.snapshot != nullptr) {
    // TODO (yanqin) support snapshot.
    return NewErrorIterator(
        Status::NotSupported("snapshot not supported in secondary mode"));
  } else {
    SequenceNumber snapshot(kMaxSequenceNumber);
    result = NewIteratorImpl(read_options, cfd, snapshot, read_callback);
  }
  return result;
}

ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
    const ReadOptions& read_options, ColumnFamilyData* cfd,
    SequenceNumber snapshot, ReadCallback* read_callback,
    bool expose_blob_index, bool allow_refresh) {
  assert(nullptr != cfd);
  SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
  assert(snapshot == kMaxSequenceNumber);
  snapshot = versions_->LastSequence();
  assert(snapshot != kMaxSequenceNumber);
  auto db_iter = NewArenaWrappedDbIterator(
      env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
      super_version->current, snapshot,
      super_version->mutable_cf_options.max_sequential_skip_in_iterations,
      super_version->version_number, read_callback, this, cfd,
      expose_blob_index, read_options.snapshot ? false : allow_refresh);
  auto internal_iter = NewInternalIterator(
      db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(),
      db_iter->GetRangeDelAggregator(), snapshot,
      /* allow_unprepared_value */ true);
  db_iter->SetIterUnderDBIter(internal_iter);
  return db_iter;
}

Status DBImplSecondary::NewIterators(
    const ReadOptions& read_options,
    const std::vector<ColumnFamilyHandle*>& column_families,
    std::vector<Iterator*>* iterators) {
  if (read_options.managed) {
    return Status::NotSupported("Managed iterator is not supported anymore.");
  }
  if (read_options.read_tier == kPersistedTier) {
    return Status::NotSupported(
        "ReadTier::kPersistedData is not yet supported in iterators.");
  }
  ReadCallback* read_callback = nullptr;  // No read callback provided.
  if (iterators == nullptr) {
    return Status::InvalidArgument("iterators not allowed to be nullptr");
  }
  iterators->clear();
  iterators->reserve(column_families.size());
  if (read_options.tailing) {
    return Status::NotSupported(
        "tailing iterator not supported in secondary mode");
  } else if (read_options.snapshot != nullptr) {
    // TODO (yanqin) support snapshot.
    return Status::NotSupported("snapshot not supported in secondary mode");
  } else {
    SequenceNumber read_seq = versions_->LastSequence();
    for (auto cfh : column_families) {
      ColumnFamilyData* cfd = static_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
      iterators->push_back(
          NewIteratorImpl(read_options, cfd, read_seq, read_callback));
    }
  }
  return Status::OK();
}

Status DBImplSecondary::CheckConsistency() {
  mutex_.AssertHeld();
  Status s = DBImpl::CheckConsistency();
  // If DBImpl::CheckConsistency(), which is stricter, returns success, then
  // we do not need to give a second chance.
  if (s.ok()) {
    return s;
  }
  // It's possible that DBImpl::CheckConsistency() can fail because the primary
  // may have removed certain files, causing the GetFileSize(name) call to
  // fail and return PathNotFound. In this case, we take a best-effort
  // approach and just proceed.
  TEST_SYNC_POINT_CALLBACK(
      "DBImplSecondary::CheckConsistency:AfterFirstAttempt", &s);

  if (immutable_db_options_.skip_checking_sst_file_sizes_on_db_open) {
    return Status::OK();
  }

  std::vector<LiveFileMetaData> metadata;
  versions_->GetLiveFilesMetaData(&metadata);

  std::string corruption_messages;
  for (const auto& md : metadata) {
    // md.name has a leading "/".
    std::string file_path = md.db_path + md.name;

    uint64_t fsize = 0;
    s = env_->GetFileSize(file_path, &fsize);
    if (!s.ok() &&
        (env_->GetFileSize(Rocks2LevelTableFileName(file_path), &fsize).ok() ||
         s.IsPathNotFound())) {
      s = Status::OK();
    }
    if (!s.ok()) {
      corruption_messages +=
          "Can't access " + md.name + ": " + s.ToString() + "\n";
    }
  }
  return corruption_messages.empty() ? Status::OK()
                                     : Status::Corruption(corruption_messages);
}

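// TryCatchUpWithPrimary replays new MANIFEST entries from the primary, then
// tails any new WALs, installs fresh SuperVersions for the changed column
// families, and finally purges obsolete files. A typical pattern (a usage
// sketch, not prescriptive) is to call it periodically, or right before reads
// that must observe recent writes from the primary.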
Status DBImplSecondary::TryCatchUpWithPrimary() {
  assert(versions_.get() != nullptr);
  assert(manifest_reader_.get() != nullptr);
  Status s;
  // Read the manifest and apply new changes to the secondary instance.
  std::unordered_set<ColumnFamilyData*> cfds_changed;
  JobContext job_context(0, true /*create_superversion*/);
  {
    InstrumentedMutexLock lock_guard(&mutex_);
    s = static_cast_with_check<ReactiveVersionSet>(versions_.get())
            ->ReadAndApply(&mutex_, &manifest_reader_,
                           manifest_reader_status_.get(), &cfds_changed);

    ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64,
                   static_cast<uint64_t>(versions_->LastSequence()));
    for (ColumnFamilyData* cfd : cfds_changed) {
      if (cfd->IsDropped()) {
        ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n",
                        cfd->GetName().c_str());
        continue;
      }
      VersionStorageInfo::LevelSummaryStorage tmp;
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                      "[%s] Level summary: %s\n", cfd->GetName().c_str(),
                      cfd->current()->storage_info()->LevelSummary(&tmp));
    }

    // List wal_dir to discover new WALs and apply new changes to the
    // secondary instance.
    if (s.ok()) {
      s = FindAndRecoverLogFiles(&cfds_changed, &job_context);
    }
    if (s.IsPathNotFound()) {
      ROCKS_LOG_INFO(
          immutable_db_options_.info_log,
          "Secondary tries to read WAL, but WAL file(s) have already "
          "been purged by primary.");
      s = Status::OK();
    }
    if (s.ok()) {
      for (auto cfd : cfds_changed) {
        cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(),
                                       &job_context.memtables_to_free);
        auto& sv_context = job_context.superversion_contexts.back();
        cfd->InstallSuperVersion(&sv_context, &mutex_);
        sv_context.NewSuperVersion();
      }
    }
  }
  job_context.Clean();

  // Cleanup unused, obsolete files.
  JobContext purge_files_job_context(0);
  {
    InstrumentedMutexLock lock_guard(&mutex_);
    // Currently, the secondary instance does not own the database files, so
    // there is no need to force a full scan for obsolete files.
    FindObsoleteFiles(&purge_files_job_context, /*force=*/false);
  }
  if (purge_files_job_context.HaveSomethingToDelete()) {
    PurgeObsoleteFiles(purge_files_job_context);
  }
  purge_files_job_context.Clean();
  return s;
}

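// Convenience overload that opens a secondary instance hosting only the
// default column family. A minimal sketch of how it is typically used (paths
// are illustrative):
//
//   Options options;
//   options.max_open_files = -1;  // required by secondary mode, see below
//   DB* db = nullptr;
//   Status s =
//       DB::OpenAsSecondary(options, "/path/to/primary", "/path/to/secondary",
//                           &db);
//   if (s.ok()) {
//     s = db->TryCatchUpWithPrimary();
//   }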
Status DB::OpenAsSecondary(const Options& options, const std::string& dbname,
                           const std::string& secondary_path, DB** dbptr) {
  *dbptr = nullptr;

  DBOptions db_options(options);
  ColumnFamilyOptions cf_options(options);
  std::vector<ColumnFamilyDescriptor> column_families;
  column_families.emplace_back(kDefaultColumnFamilyName, cf_options);
  std::vector<ColumnFamilyHandle*> handles;

  Status s = DB::OpenAsSecondary(db_options, dbname, secondary_path,
                                 column_families, &handles, dbptr);
  if (s.ok()) {
    assert(handles.size() == 1);
    delete handles[0];
  }
  return s;
}

Status DB::OpenAsSecondary(
    const DBOptions& db_options, const std::string& dbname,
    const std::string& secondary_path,
    const std::vector<ColumnFamilyDescriptor>& column_families,
    std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
  *dbptr = nullptr;
  if (db_options.max_open_files != -1) {
    // TODO (yanqin) maybe support max_open_files != -1 by creating hard links
    // on SST files so that the secondary can still access old SSTs while the
    // primary instance may delete the originals.
    return Status::InvalidArgument("require max_open_files to be -1");
  }

  DBOptions tmp_opts(db_options);
  Status s;
  if (nullptr == tmp_opts.info_log) {
    s = CreateLoggerFromOptions(secondary_path, tmp_opts, &tmp_opts.info_log);
    if (!s.ok()) {
      tmp_opts.info_log = nullptr;
    }
  }

  handles->clear();
  DBImplSecondary* impl = new DBImplSecondary(tmp_opts, dbname, secondary_path);
  impl->versions_.reset(new ReactiveVersionSet(
      dbname, &impl->immutable_db_options_, impl->file_options_,
      impl->table_cache_.get(), impl->write_buffer_manager_,
      &impl->write_controller_, impl->io_tracer_));
  impl->column_family_memtables_.reset(
      new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet()));
  impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath();

  impl->mutex_.Lock();
  s = impl->Recover(column_families, true, false, false);
  if (s.ok()) {
    for (auto cf : column_families) {
      auto cfd =
          impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
      if (nullptr == cfd) {
        s = Status::InvalidArgument("Column family not found", cf.name);
        break;
      }
      handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
    }
  }
  SuperVersionContext sv_context(true /* create_superversion */);
  if (s.ok()) {
    for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
      sv_context.NewSuperVersion();
      cfd->InstallSuperVersion(&sv_context, &impl->mutex_);
    }
  }
  impl->mutex_.Unlock();
  sv_context.Clean();
  if (s.ok()) {
    *dbptr = impl;
    for (auto h : *handles) {
      impl->NewThreadStatusCfInfo(
          static_cast_with_check<ColumnFamilyHandleImpl>(h)->cfd());
    }
  } else {
    for (auto h : *handles) {
      delete h;
    }
    handles->clear();
    delete impl;
  }
  return s;
}

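// CompactWithoutInstallation runs a single compaction picked from the given
// input files but does not install the result into the MANIFEST; the output
// files are written under secondary_path_ and described in *result so that
// the primary (via the compaction service) can install them later.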
Status DBImplSecondary::CompactWithoutInstallation(
    ColumnFamilyHandle* cfh, const CompactionServiceInput& input,
    CompactionServiceResult* result) {
  InstrumentedMutexLock l(&mutex_);
  auto cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();
  if (!cfd) {
    return Status::InvalidArgument("Cannot find column family " +
                                   cfh->GetName());
  }

  std::unordered_set<uint64_t> input_set;
  for (const auto& file_name : input.input_files) {
    input_set.insert(TableFileNameToNumber(file_name));
  }

  auto* version = cfd->current();

  ColumnFamilyMetaData cf_meta;
  version->GetColumnFamilyMetaData(&cf_meta);

  const MutableCFOptions* mutable_cf_options = cfd->GetLatestMutableCFOptions();
  ColumnFamilyOptions cf_options = cfd->GetLatestCFOptions();
  VersionStorageInfo* vstorage = version->storage_info();

  // Use comp_options to reuse some CompactFiles functions.
  CompactionOptions comp_options;
  comp_options.compression = kDisableCompressionOption;
  comp_options.output_file_size_limit = MaxFileSizeForLevel(
      *mutable_cf_options, input.output_level, cf_options.compaction_style,
      vstorage->base_level(), cf_options.level_compaction_dynamic_level_bytes);

  std::vector<CompactionInputFiles> input_files;
  Status s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers(
      &input_files, &input_set, vstorage, comp_options);
  if (!s.ok()) {
    return s;
  }

  std::unique_ptr<Compaction> c;
  assert(cfd->compaction_picker());
  c.reset(cfd->compaction_picker()->CompactFiles(
      comp_options, input_files, input.output_level, vstorage,
      *mutable_cf_options, mutable_db_options_, 0));
  assert(c != nullptr);

  c->SetInputVersion(version);

  // Create the output directory if it does not exist yet.
  std::unique_ptr<FSDirectory> output_dir;
  s = CreateAndNewDirectory(fs_.get(), secondary_path_, &output_dir);
  if (!s.ok()) {
    return s;
  }

  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());

  const int job_id = next_job_id_.fetch_add(1);

  CompactionServiceCompactionJob compaction_job(
      job_id, c.get(), immutable_db_options_, mutable_db_options_,
      file_options_for_compaction_, versions_.get(), &shutting_down_,
      &log_buffer, output_dir.get(), stats_, &mutex_, &error_handler_,
      input.snapshots, table_cache_, &event_logger_, dbname_, io_tracer_,
      db_id_, db_session_id_, secondary_path_, input, result);

  mutex_.Unlock();
  s = compaction_job.Run();
  mutex_.Lock();

  // Clean up.
  compaction_job.io_status().PermitUncheckedError();
  compaction_job.CleanupCompaction();
  c->ReleaseCompactionFiles(s);
  c.reset();

  TEST_SYNC_POINT_CALLBACK("DBImplSecondary::CompactWithoutInstallation::End",
                           &s);
  result->status = s;
  return s;
}

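// OpenAndCompact is the entry point for a remote compaction worker: it
// deserializes the CompactionServiceInput, opens the db as a secondary under
// output_directory, runs CompactWithoutInstallation on the requested column
// family, and serializes the CompactionServiceResult back into *result.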
Status DB::OpenAndCompact(
    const std::string& name, const std::string& output_directory,
    const std::string& input, std::string* result,
    const CompactionServiceOptionsOverride& override_options) {
  CompactionServiceInput compaction_input;
  Status s = CompactionServiceInput::Read(input, &compaction_input);
  if (!s.ok()) {
    return s;
  }

  compaction_input.db_options.max_open_files = -1;
  compaction_input.db_options.compaction_service = nullptr;
  if (compaction_input.db_options.statistics) {
    compaction_input.db_options.statistics.reset();
  }
  compaction_input.db_options.env = override_options.env;
  compaction_input.db_options.file_checksum_gen_factory =
      override_options.file_checksum_gen_factory;
  compaction_input.db_options.statistics = override_options.statistics;
  compaction_input.column_family.options.comparator =
      override_options.comparator;
  compaction_input.column_family.options.merge_operator =
      override_options.merge_operator;
  compaction_input.column_family.options.compaction_filter =
      override_options.compaction_filter;
  compaction_input.column_family.options.compaction_filter_factory =
      override_options.compaction_filter_factory;
  compaction_input.column_family.options.prefix_extractor =
      override_options.prefix_extractor;
  compaction_input.column_family.options.table_factory =
      override_options.table_factory;
  compaction_input.column_family.options.sst_partitioner_factory =
      override_options.sst_partitioner_factory;

  std::vector<ColumnFamilyDescriptor> column_families;
  column_families.push_back(compaction_input.column_family);
  // TODO: we have to open the default CF because of an implementation
  // limitation; currently we just reuse the CF options from the input, which
  // is not correct and may cause the open to fail.
  if (compaction_input.column_family.name != kDefaultColumnFamilyName) {
    column_families.emplace_back(kDefaultColumnFamilyName,
                                 compaction_input.column_family.options);
  }

  DB* db;
  std::vector<ColumnFamilyHandle*> handles;

  s = DB::OpenAsSecondary(compaction_input.db_options, name, output_directory,
                          column_families, &handles, &db);
  if (!s.ok()) {
    return s;
  }

  CompactionServiceResult compaction_result;
  DBImplSecondary* db_secondary = static_cast_with_check<DBImplSecondary>(db);
  assert(handles.size() > 0);
  s = db_secondary->CompactWithoutInstallation(handles[0], compaction_input,
                                               &compaction_result);

  Status serialization_status = compaction_result.Write(result);

  for (auto& handle : handles) {
    delete handle;
  }
  delete db;
  if (s.ok()) {
    return serialization_status;
  }
  return s;
}

#else   // !ROCKSDB_LITE

Status DB::OpenAsSecondary(const Options& /*options*/,
                           const std::string& /*name*/,
                           const std::string& /*secondary_path*/,
                           DB** /*dbptr*/) {
  return Status::NotSupported("Not supported in ROCKSDB_LITE.");
}

Status DB::OpenAsSecondary(
    const DBOptions& /*db_options*/, const std::string& /*dbname*/,
    const std::string& /*secondary_path*/,
    const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
    std::vector<ColumnFamilyHandle*>* /*handles*/, DB** /*dbptr*/) {
  return Status::NotSupported("Not supported in ROCKSDB_LITE.");
}
#endif  // !ROCKSDB_LITE

}  // namespace ROCKSDB_NAMESPACE