1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include <cinttypes>
10 
11 #include "db/builder.h"
12 #include "db/db_impl/db_impl.h"
13 #include "db/error_handler.h"
14 #include "db/periodic_work_scheduler.h"
15 #include "env/composite_env_wrapper.h"
16 #include "file/filename.h"
17 #include "file/read_write_util.h"
18 #include "file/sst_file_manager_impl.h"
19 #include "file/writable_file_writer.h"
20 #include "logging/logging.h"
21 #include "monitoring/persistent_stats_history.h"
22 #include "options/options_helper.h"
23 #include "rocksdb/table.h"
24 #include "rocksdb/wal_filter.h"
25 #include "test_util/sync_point.h"
26 #include "util/rate_limiter.h"
27 
28 namespace ROCKSDB_NAMESPACE {
// Sanitizes a combined Options object. The DB-wide part is sanitized first
// (via the DBOptions overload), and the column-family part is then sanitized
// against an ImmutableDBOptions built from that sanitized DB-wide part. The
// two sanitized halves are recombined into the returned Options.
Options SanitizeOptions(const std::string& dbname, const Options& src,
                        bool read_only) {
  const DBOptions sanitized_db =
      SanitizeOptions(dbname, DBOptions(src), read_only);
  ImmutableDBOptions immutable_view(sanitized_db);
  const auto sanitized_cf =
      SanitizeOptions(immutable_view, ColumnFamilyOptions(src));
  return Options(sanitized_db, sanitized_cf);
}
37 
SanitizeOptions(const std::string & dbname,const DBOptions & src,bool read_only)38 DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,
39                           bool read_only) {
40   DBOptions result(src);
41 
42   if (result.env == nullptr) {
43     result.env = Env::Default();
44   }
45 
46   // result.max_open_files means an "infinite" open files.
47   if (result.max_open_files != -1) {
48     int max_max_open_files = port::GetMaxOpenFiles();
49     if (max_max_open_files == -1) {
50       max_max_open_files = 0x400000;
51     }
52     ClipToRange(&result.max_open_files, 20, max_max_open_files);
53     TEST_SYNC_POINT_CALLBACK("SanitizeOptions::AfterChangeMaxOpenFiles",
54                              &result.max_open_files);
55   }
56 
57   if (result.info_log == nullptr && !read_only) {
58     Status s = CreateLoggerFromOptions(dbname, result, &result.info_log);
59     if (!s.ok()) {
60       // No place suitable for logging
61       result.info_log = nullptr;
62     }
63   }
64 
65   if (!result.write_buffer_manager) {
66     result.write_buffer_manager.reset(
67         new WriteBufferManager(result.db_write_buffer_size));
68   }
69   auto bg_job_limits = DBImpl::GetBGJobLimits(
70       result.max_background_flushes, result.max_background_compactions,
71       result.max_background_jobs, true /* parallelize_compactions */);
72   result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions,
73                                            Env::Priority::LOW);
74   result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes,
75                                            Env::Priority::HIGH);
76 
77   if (result.rate_limiter.get() != nullptr) {
78     if (result.bytes_per_sync == 0) {
79       result.bytes_per_sync = 1024 * 1024;
80     }
81   }
82 
83   if (result.delayed_write_rate == 0) {
84     if (result.rate_limiter.get() != nullptr) {
85       result.delayed_write_rate = result.rate_limiter->GetBytesPerSecond();
86     }
87     if (result.delayed_write_rate == 0) {
88       result.delayed_write_rate = 16 * 1024 * 1024;
89     }
90   }
91 
92   if (result.WAL_ttl_seconds > 0 || result.WAL_size_limit_MB > 0) {
93     result.recycle_log_file_num = false;
94   }
95 
96   if (result.recycle_log_file_num &&
97       (result.wal_recovery_mode ==
98            WALRecoveryMode::kTolerateCorruptedTailRecords ||
99        result.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery ||
100        result.wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency)) {
101     // - kTolerateCorruptedTailRecords is inconsistent with recycle log file
102     //   feature. WAL recycling expects recovery success upon encountering a
103     //   corrupt record at the point where new data ends and recycled data
104     //   remains at the tail. However, `kTolerateCorruptedTailRecords` must fail
105     //   upon encountering any such corrupt record, as it cannot differentiate
106     //   between this and a real corruption, which would cause committed updates
107     //   to be truncated -- a violation of the recovery guarantee.
108     // - kPointInTimeRecovery and kAbsoluteConsistency are incompatible with
109     //   recycle log file feature temporarily due to a bug found introducing a
110     //   hole in the recovered data
111     //   (https://github.com/facebook/rocksdb/pull/7252#issuecomment-673766236).
112     //   Besides this bug, we believe the features are fundamentally compatible.
113     result.recycle_log_file_num = 0;
114   }
115 
116   if (result.db_paths.size() == 0) {
117     result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
118   } else if (result.wal_dir.empty()) {
119     // Use dbname as default
120     result.wal_dir = dbname;
121   }
122   if (!result.wal_dir.empty()) {
123     // If there is a wal_dir already set, check to see if the wal_dir is the
124     // same as the dbname AND the same as the db_path[0] (which must exist from
125     // a few lines ago). If the wal_dir matches both of these values, then clear
126     // the wal_dir value, which will make wal_dir == dbname.  Most likely this
127     // condition was the result of reading an old options file where we forced
128     // wal_dir to be set (to dbname).
129     auto npath = NormalizePath(dbname + "/");
130     if (npath == NormalizePath(result.wal_dir + "/") &&
131         npath == NormalizePath(result.db_paths[0].path + "/")) {
132       result.wal_dir.clear();
133     }
134   }
135 
136   if (!result.wal_dir.empty() && result.wal_dir.back() == '/') {
137     result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
138   }
139 
140   if (result.use_direct_reads && result.compaction_readahead_size == 0) {
141     TEST_SYNC_POINT_CALLBACK("SanitizeOptions:direct_io", nullptr);
142     result.compaction_readahead_size = 1024 * 1024 * 2;
143   }
144 
145   if (result.compaction_readahead_size > 0 || result.use_direct_reads) {
146     result.new_table_reader_for_compaction_inputs = true;
147   }
148 
149   // Force flush on DB open if 2PC is enabled, since with 2PC we have no
150   // guarantee that consecutive log files have consecutive sequence id, which
151   // make recovery complicated.
152   if (result.allow_2pc) {
153     result.avoid_flush_during_recovery = false;
154   }
155 
156 #ifndef ROCKSDB_LITE
157   ImmutableDBOptions immutable_db_options(result);
158   if (!immutable_db_options.IsWalDirSameAsDBPath()) {
159     // Either the WAL dir and db_paths[0]/db_name are not the same, or we
160     // cannot tell for sure. In either case, assume they're different and
161     // explicitly cleanup the trash log files (bypass DeleteScheduler)
162     // Do this first so even if we end up calling
163     // DeleteScheduler::CleanupDirectory on the same dir later, it will be
164     // safe
165     std::vector<std::string> filenames;
166     auto wal_dir = immutable_db_options.GetWalDir();
167     Status s = result.env->GetChildren(wal_dir, &filenames);
168     s.PermitUncheckedError();  //**TODO: What to do on error?
169     for (std::string& filename : filenames) {
170       if (filename.find(".log.trash", filename.length() -
171                                           std::string(".log.trash").length()) !=
172           std::string::npos) {
173         std::string trash_file = wal_dir + "/" + filename;
174         result.env->DeleteFile(trash_file).PermitUncheckedError();
175       }
176     }
177   }
178   // When the DB is stopped, it's possible that there are some .trash files that
179   // were not deleted yet, when we open the DB we will find these .trash files
180   // and schedule them to be deleted (or delete immediately if SstFileManager
181   // was not used)
182   auto sfm = static_cast<SstFileManagerImpl*>(result.sst_file_manager.get());
183   for (size_t i = 0; i < result.db_paths.size(); i++) {
184     DeleteScheduler::CleanupDirectory(result.env, sfm, result.db_paths[i].path)
185         .PermitUncheckedError();
186   }
187 
188   // Create a default SstFileManager for purposes of tracking compaction size
189   // and facilitating recovery from out of space errors.
190   if (result.sst_file_manager.get() == nullptr) {
191     std::shared_ptr<SstFileManager> sst_file_manager(
192         NewSstFileManager(result.env, result.info_log));
193     result.sst_file_manager = sst_file_manager;
194   }
195 #endif  // !ROCKSDB_LITE
196 
197   if (!result.paranoid_checks) {
198     result.skip_checking_sst_file_sizes_on_db_open = true;
199     ROCKS_LOG_INFO(result.info_log,
200                    "file size check will be skipped during open.");
201   }
202 
203   return result;
204 }
205 
206 namespace {
ValidateOptionsByTable(const DBOptions & db_opts,const std::vector<ColumnFamilyDescriptor> & column_families)207 Status ValidateOptionsByTable(
208     const DBOptions& db_opts,
209     const std::vector<ColumnFamilyDescriptor>& column_families) {
210   Status s;
211   for (auto cf : column_families) {
212     s = ValidateOptions(db_opts, cf.options);
213     if (!s.ok()) {
214       return s;
215     }
216   }
217   return Status::OK();
218 }
219 }  // namespace
220 
ValidateOptions(const DBOptions & db_options,const std::vector<ColumnFamilyDescriptor> & column_families)221 Status DBImpl::ValidateOptions(
222     const DBOptions& db_options,
223     const std::vector<ColumnFamilyDescriptor>& column_families) {
224   Status s;
225   for (auto& cfd : column_families) {
226     s = ColumnFamilyData::ValidateOptions(db_options, cfd.options);
227     if (!s.ok()) {
228       return s;
229     }
230   }
231   s = ValidateOptions(db_options);
232   return s;
233 }
234 
ValidateOptions(const DBOptions & db_options)235 Status DBImpl::ValidateOptions(const DBOptions& db_options) {
236   if (db_options.db_paths.size() > 4) {
237     return Status::NotSupported(
238         "More than four DB paths are not supported yet. ");
239   }
240 
241   if (db_options.allow_mmap_reads && db_options.use_direct_reads) {
242     // Protect against assert in PosixMMapReadableFile constructor
243     return Status::NotSupported(
244         "If memory mapped reads (allow_mmap_reads) are enabled "
245         "then direct I/O reads (use_direct_reads) must be disabled. ");
246   }
247 
248   if (db_options.allow_mmap_writes &&
249       db_options.use_direct_io_for_flush_and_compaction) {
250     return Status::NotSupported(
251         "If memory mapped writes (allow_mmap_writes) are enabled "
252         "then direct I/O writes (use_direct_io_for_flush_and_compaction) must "
253         "be disabled. ");
254   }
255 
256   if (db_options.keep_log_file_num == 0) {
257     return Status::InvalidArgument("keep_log_file_num must be greater than 0");
258   }
259 
260   if (db_options.unordered_write &&
261       !db_options.allow_concurrent_memtable_write) {
262     return Status::InvalidArgument(
263         "unordered_write is incompatible with !allow_concurrent_memtable_write");
264   }
265 
266   if (db_options.unordered_write && db_options.enable_pipelined_write) {
267     return Status::InvalidArgument(
268         "unordered_write is incompatible with enable_pipelined_write");
269   }
270 
271   if (db_options.atomic_flush && db_options.enable_pipelined_write) {
272     return Status::InvalidArgument(
273         "atomic_flush is incompatible with enable_pipelined_write");
274   }
275 
276   // TODO remove this restriction
277   if (db_options.atomic_flush && db_options.best_efforts_recovery) {
278     return Status::InvalidArgument(
279         "atomic_flush is currently incompatible with best-efforts recovery");
280   }
281 
282   return Status::OK();
283 }
284 
NewDB(std::vector<std::string> * new_filenames)285 Status DBImpl::NewDB(std::vector<std::string>* new_filenames) {
286   VersionEdit new_db;
287   Status s = SetIdentityFile(env_, dbname_);
288   if (!s.ok()) {
289     return s;
290   }
291   if (immutable_db_options_.write_dbid_to_manifest) {
292     std::string temp_db_id;
293     GetDbIdentityFromIdentityFile(&temp_db_id);
294     new_db.SetDBId(temp_db_id);
295   }
296   new_db.SetLogNumber(0);
297   new_db.SetNextFile(2);
298   new_db.SetLastSequence(0);
299 
300   ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
301   const std::string manifest = DescriptorFileName(dbname_, 1);
302   {
303     if (fs_->FileExists(manifest, IOOptions(), nullptr).ok()) {
304       fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
305     }
306     std::unique_ptr<FSWritableFile> file;
307     FileOptions file_options = fs_->OptimizeForManifestWrite(file_options_);
308     s = NewWritableFile(fs_.get(), manifest, &file, file_options);
309     if (!s.ok()) {
310       return s;
311     }
312     FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types;
313     file->SetPreallocationBlockSize(
314         immutable_db_options_.manifest_preallocation_size);
315     std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
316         std::move(file), manifest, file_options, immutable_db_options_.clock,
317         io_tracer_, nullptr /* stats */, immutable_db_options_.listeners,
318         nullptr, tmp_set.Contains(FileType::kDescriptorFile),
319         tmp_set.Contains(FileType::kDescriptorFile)));
320     log::Writer log(std::move(file_writer), 0, false);
321     std::string record;
322     new_db.EncodeTo(&record);
323     s = log.AddRecord(record);
324     if (s.ok()) {
325       s = SyncManifest(&immutable_db_options_, log.file());
326     }
327   }
328   if (s.ok()) {
329     // Make "CURRENT" file that points to the new manifest file.
330     s = SetCurrentFile(fs_.get(), dbname_, 1, directories_.GetDbDir());
331     if (new_filenames) {
332       new_filenames->emplace_back(
333           manifest.substr(manifest.find_last_of("/\\") + 1));
334     }
335   } else {
336     fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
337   }
338   return s;
339 }
340 
CreateAndNewDirectory(FileSystem * fs,const std::string & dirname,std::unique_ptr<FSDirectory> * directory)341 IOStatus DBImpl::CreateAndNewDirectory(
342     FileSystem* fs, const std::string& dirname,
343     std::unique_ptr<FSDirectory>* directory) {
344   // We call CreateDirIfMissing() as the directory may already exist (if we
345   // are reopening a DB), when this happens we don't want creating the
346   // directory to cause an error. However, we need to check if creating the
347   // directory fails or else we may get an obscure message about the lock
348   // file not existing. One real-world example of this occurring is if
349   // env->CreateDirIfMissing() doesn't create intermediate directories, e.g.
350   // when dbname_ is "dir/db" but when "dir" doesn't exist.
351   IOStatus io_s = fs->CreateDirIfMissing(dirname, IOOptions(), nullptr);
352   if (!io_s.ok()) {
353     return io_s;
354   }
355   return fs->NewDirectory(dirname, IOOptions(), directory, nullptr);
356 }
357 
SetDirectories(FileSystem * fs,const std::string & dbname,const std::string & wal_dir,const std::vector<DbPath> & data_paths)358 IOStatus Directories::SetDirectories(FileSystem* fs, const std::string& dbname,
359                                      const std::string& wal_dir,
360                                      const std::vector<DbPath>& data_paths) {
361   IOStatus io_s = DBImpl::CreateAndNewDirectory(fs, dbname, &db_dir_);
362   if (!io_s.ok()) {
363     return io_s;
364   }
365   if (!wal_dir.empty() && dbname != wal_dir) {
366     io_s = DBImpl::CreateAndNewDirectory(fs, wal_dir, &wal_dir_);
367     if (!io_s.ok()) {
368       return io_s;
369     }
370   }
371 
372   data_dirs_.clear();
373   for (auto& p : data_paths) {
374     const std::string db_path = p.path;
375     if (db_path == dbname) {
376       data_dirs_.emplace_back(nullptr);
377     } else {
378       std::unique_ptr<FSDirectory> path_directory;
379       io_s = DBImpl::CreateAndNewDirectory(fs, db_path, &path_directory);
380       if (!io_s.ok()) {
381         return io_s;
382       }
383       data_dirs_.emplace_back(path_directory.release());
384     }
385   }
386   assert(data_dirs_.size() == data_paths.size());
387   return IOStatus::OK();
388 }
389 
// Recovers the in-memory DB state from disk as part of opening the DB.
// Requires mutex_ to be held by the caller.
//
// Outline of what the code below does:
//  1. When not read_only: set up the DB/WAL/data directories, take the DB
//     lock file, decide whether the DB already exists (CURRENT file, or any
//     MANIFEST under best-efforts recovery), create a new DB if it is missing
//     and create_if_missing is set, and verify that the file options are
//     usable with the filesystem.
//  2. Recover the VersionSet from the MANIFEST (Recover or TryRecover).
//  3. Set the DB id, delete unreferenced SST files (read-write only), run the
//     optional consistency check, add column family directories, and
//     initialize the persistent stats column family when enabled.
//  4. Collect WAL files from the WAL dir, verify/clear WAL tracking in the
//     MANIFEST, and replay the WALs in increasing log number order.
//  5. When read_only: locate the newest OPTIONS file and record its number
//     and size in versions_.
//
// column_families: descriptors of the column families to open.
// read_only: open for reading only; skips step 1's directory/lock handling.
// error_if_wal_file_exists: return Corruption if any WAL file is found.
// error_if_data_exists_in_wals: return Corruption if any WAL file whose size
//   can be read is non-empty.
// recovered_seq: optional out-param; set to the next sequence number only
//   when RecoverLogFiles reports that a corrupted WAL was found.
//
// Returns the first error encountered, or OK.
Status DBImpl::Recover(
    const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
    bool error_if_wal_file_exists, bool error_if_data_exists_in_wals,
    uint64_t* recovered_seq) {
  mutex_.AssertHeld();

  bool is_new_db = false;
  assert(db_lock_ == nullptr);
  std::vector<std::string> files_in_dbname;
  if (!read_only) {
    Status s = directories_.SetDirectories(fs_.get(), dbname_,
                                           immutable_db_options_.wal_dir,
                                           immutable_db_options_.db_paths);
    if (!s.ok()) {
      return s;
    }

    s = env_->LockFile(LockFileName(dbname_), &db_lock_);
    if (!s.ok()) {
      return s;
    }

    std::string current_fname = CurrentFileName(dbname_);
    // Path to any MANIFEST file in the db dir. It does not matter which one.
    // Since best-efforts recovery ignores CURRENT file, existence of a
    // MANIFEST indicates the recovery to recover existing db. If no MANIFEST
    // can be found, a new db will be created.
    std::string manifest_path;
    if (!immutable_db_options_.best_efforts_recovery) {
      s = env_->FileExists(current_fname);
    } else {
      // Start from NotFound; it is overwritten with OK if a MANIFEST is found,
      // or with the listing error if the directory cannot be read.
      s = Status::NotFound();
      Status io_s = env_->GetChildren(dbname_, &files_in_dbname);
      if (!io_s.ok()) {
        s = io_s;
        files_in_dbname.clear();
      }
      for (const std::string& file : files_in_dbname) {
        uint64_t number = 0;
        FileType type = kWalFile;  // initialize
        if (ParseFileName(file, &number, &type) && type == kDescriptorFile) {
          // Found MANIFEST (descriptor log), thus best-efforts recovery does
          // not have to treat the db as empty.
          s = Status::OK();
          manifest_path = dbname_ + "/" + file;
          break;
        }
      }
    }
    // s now encodes existence: NotFound -> no DB yet, OK -> DB exists,
    // anything else -> unexpected error.
    if (s.IsNotFound()) {
      if (immutable_db_options_.create_if_missing) {
        s = NewDB(&files_in_dbname);
        is_new_db = true;
        if (!s.ok()) {
          return s;
        }
      } else {
        return Status::InvalidArgument(
            current_fname, "does not exist (create_if_missing is false)");
      }
    } else if (s.ok()) {
      if (immutable_db_options_.error_if_exists) {
        return Status::InvalidArgument(dbname_,
                                       "exists (error_if_exists is true)");
      }
    } else {
      // Unexpected error reading file
      assert(s.IsIOError());
      return s;
    }
    // Verify compatibility of file_options_ and filesystem
    {
      std::unique_ptr<FSRandomAccessFile> idfile;
      FileOptions customized_fs(file_options_);
      customized_fs.use_direct_reads |=
          immutable_db_options_.use_direct_io_for_flush_and_compaction;
      const std::string& fname =
          manifest_path.empty() ? current_fname : manifest_path;
      s = fs_->NewRandomAccessFile(fname, customized_fs, &idfile, nullptr);
      if (!s.ok()) {
        std::string error_str = s.ToString();
        // Check if unsupported Direct I/O is the root cause
        customized_fs.use_direct_reads = false;
        s = fs_->NewRandomAccessFile(fname, customized_fs, &idfile, nullptr);
        if (s.ok()) {
          return Status::InvalidArgument(
              "Direct I/O is not supported by the specified DB.");
        } else {
          return Status::InvalidArgument(
              "Found options incompatible with filesystem", error_str.c_str());
        }
      }
    }
  } else if (immutable_db_options_.best_efforts_recovery) {
    // Read-only with best-efforts recovery: the directory listing is still
    // needed because it is passed to TryRecover() below.
    assert(files_in_dbname.empty());
    Status s = env_->GetChildren(dbname_, &files_in_dbname);
    if (s.IsNotFound()) {
      return Status::InvalidArgument(dbname_,
                                     "does not exist (open for read only)");
    } else if (s.IsIOError()) {
      return s;
    }
    assert(s.ok());
  }
  assert(db_id_.empty());
  Status s;
  bool missing_table_file = false;
  if (!immutable_db_options_.best_efforts_recovery) {
    s = versions_->Recover(column_families, read_only, &db_id_);
  } else {
    assert(!files_in_dbname.empty());
    s = versions_->TryRecover(column_families, read_only, files_in_dbname,
                              &db_id_, &missing_table_file);
    if (s.ok()) {
      // TryRecover may delete previous column_family_set_.
      column_family_memtables_.reset(
          new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));
    }
  }
  if (!s.ok()) {
    return s;
  }
  s = SetDBId(read_only);
  if (s.ok() && !read_only) {
    s = DeleteUnreferencedSstFiles();
  }

  if (immutable_db_options_.paranoid_checks && s.ok()) {
    s = CheckConsistency();
  }
  if (s.ok() && !read_only) {
    // The map is shared across all column families in this loop.
    std::map<std::string, std::shared_ptr<FSDirectory>> created_dirs;
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      s = cfd->AddDirectories(&created_dirs);
      if (!s.ok()) {
        return s;
      }
    }
  }
  // DB mutex is already held
  if (s.ok() && immutable_db_options_.persist_stats_to_disk) {
    s = InitPersistStatsColumnFamily();
  }

  std::vector<std::string> files_in_wal_dir;
  if (s.ok()) {
    // Initial max_total_in_memory_state_ before recovery wals. Log recovery
    // may check this value to decide whether to flush.
    max_total_in_memory_state_ = 0;
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
      max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
                                    mutable_cf_options->max_write_buffer_number;
    }

    SequenceNumber next_sequence(kMaxSequenceNumber);
    default_cf_handle_ = new ColumnFamilyHandleImpl(
        versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
    default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
    // TODO(Zhongyi): handle single_column_family_mode_ when
    // persistent_stats is enabled
    single_column_family_mode_ =
        versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;

    // Recover from all newer log files than the ones named in the
    // descriptor (new log files may have been added by the previous
    // incarnation without registering them in the descriptor).
    //
    // Note that prev_log_number() is no longer used, but we pay
    // attention to it in case we are recovering a database
    // produced by an older version of rocksdb.
    auto wal_dir = immutable_db_options_.GetWalDir();
    // Under best-efforts recovery the WAL dir is not listed, so
    // files_in_wal_dir stays empty and no WAL is replayed below.
    if (!immutable_db_options_.best_efforts_recovery) {
      s = env_->GetChildren(wal_dir, &files_in_wal_dir);
    }
    if (s.IsNotFound()) {
      return Status::InvalidArgument("wal_dir not found", wal_dir);
    } else if (!s.ok()) {
      return s;
    }

    // Maps WAL number -> full path of the WAL file.
    std::unordered_map<uint64_t, std::string> wal_files;
    for (const auto& file : files_in_wal_dir) {
      uint64_t number;
      FileType type;
      if (ParseFileName(file, &number, &type) && type == kWalFile) {
        if (is_new_db) {
          return Status::Corruption(
              "While creating a new Db, wal_dir contains "
              "existing log file: ",
              file);
        } else {
          wal_files[number] = LogFileName(wal_dir, number);
        }
      }
    }

    if (immutable_db_options_.track_and_verify_wals_in_manifest) {
      if (!immutable_db_options_.best_efforts_recovery) {
        // Verify WALs in MANIFEST.
        s = versions_->GetWalSet().CheckWals(env_, wal_files);
      }  // else since best effort recovery does not recover from WALs, no need
         // to check WALs.
    } else if (!versions_->GetWalSet().GetWals().empty()) {
      // Tracking is disabled, clear previously tracked WALs from MANIFEST,
      // otherwise, in the future, if WAL tracking is enabled again,
      // since the WALs deleted when WAL tracking is disabled are not persisted
      // into MANIFEST, WAL check may fail.
      VersionEdit edit;
      WalNumber max_wal_number =
          versions_->GetWalSet().GetWals().rbegin()->first;
      edit.DeleteWalsBefore(max_wal_number + 1);
      s = versions_->LogAndApplyToDefaultColumnFamily(&edit, &mutex_);
    }
    if (!s.ok()) {
      return s;
    }

    if (!wal_files.empty()) {
      if (error_if_wal_file_exists) {
        return Status::Corruption(
            "The db was opened in readonly mode with error_if_wal_file_exists"
            "flag but a WAL file already exists");
      } else if (error_if_data_exists_in_wals) {
        for (auto& wal_file : wal_files) {
          uint64_t bytes;
          s = env_->GetFileSize(wal_file.second, &bytes);
          // `bytes` is only inspected when the size query succeeded.
          if (s.ok()) {
            if (bytes > 0) {
              return Status::Corruption(
                  "error_if_data_exists_in_wals is set but there are data "
                  " in WAL files.");
            }
          }
        }
      }
    }

    if (!wal_files.empty()) {
      // Recover in the order in which the wals were generated
      std::vector<uint64_t> wals;
      wals.reserve(wal_files.size());
      for (const auto& wal_file : wal_files) {
        wals.push_back(wal_file.first);
      }
      std::sort(wals.begin(), wals.end());

      bool corrupted_wal_found = false;
      s = RecoverLogFiles(wals, &next_sequence, read_only,
                          &corrupted_wal_found);
      if (corrupted_wal_found && recovered_seq != nullptr) {
        *recovered_seq = next_sequence;
      }
      if (!s.ok()) {
        // Clear memtables if recovery failed
        for (auto cfd : *versions_->GetColumnFamilySet()) {
          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
                                 kMaxSequenceNumber);
        }
      }
    }
  }

  if (read_only) {
    // If we are opening as read-only, we need to update options_file_number_
    // to reflect the most recent OPTIONS file. It does not matter for regular
    // read-write db instance because options_file_number_ will later be
    // updated to versions_->NewFileNumber() in RenameTempFileToOptionsFile.
    std::vector<std::string> filenames;
    if (s.ok()) {
      // Reuse a directory listing obtained earlier when it covers the DB
      // directory; otherwise list the DB directory now.
      const std::string normalized_dbname = NormalizePath(dbname_);
      const std::string normalized_wal_dir =
          NormalizePath(immutable_db_options_.GetWalDir());
      if (immutable_db_options_.best_efforts_recovery) {
        filenames = std::move(files_in_dbname);
      } else if (normalized_dbname == normalized_wal_dir) {
        filenames = std::move(files_in_wal_dir);
      } else {
        s = env_->GetChildren(GetName(), &filenames);
      }
    }
    if (s.ok()) {
      uint64_t number = 0;
      uint64_t options_file_number = 0;
      FileType type;
      // Pick the OPTIONS file with the largest file number.
      for (const auto& fname : filenames) {
        if (ParseFileName(fname, &number, &type) && type == kOptionsFile) {
          options_file_number = std::max(number, options_file_number);
        }
      }
      versions_->options_file_number_ = options_file_number;
      uint64_t options_file_size = 0;
      if (options_file_number > 0) {
        s = env_->GetFileSize(OptionsFileName(GetName(), options_file_number),
                              &options_file_size);
      }
      versions_->options_file_size_ = options_file_size;
    }
  }
  return s;
}
691 
PersistentStatsProcessFormatVersion()692 Status DBImpl::PersistentStatsProcessFormatVersion() {
693   mutex_.AssertHeld();
694   Status s;
695   // persist version when stats CF doesn't exist
696   bool should_persist_format_version = !persistent_stats_cfd_exists_;
697   mutex_.Unlock();
698   if (persistent_stats_cfd_exists_) {
699     // Check persistent stats format version compatibility. Drop and recreate
700     // persistent stats CF if format version is incompatible
701     uint64_t format_version_recovered = 0;
702     Status s_format = DecodePersistentStatsVersionNumber(
703         this, StatsVersionKeyType::kFormatVersion, &format_version_recovered);
704     uint64_t compatible_version_recovered = 0;
705     Status s_compatible = DecodePersistentStatsVersionNumber(
706         this, StatsVersionKeyType::kCompatibleVersion,
707         &compatible_version_recovered);
708     // abort reading from existing stats CF if any of following is true:
709     // 1. failed to read format version or compatible version from disk
710     // 2. sst's format version is greater than current format version, meaning
711     // this sst is encoded with a newer RocksDB release, and current compatible
712     // version is below the sst's compatible version
713     if (!s_format.ok() || !s_compatible.ok() ||
714         (kStatsCFCurrentFormatVersion < format_version_recovered &&
715          kStatsCFCompatibleFormatVersion < compatible_version_recovered)) {
716       if (!s_format.ok() || !s_compatible.ok()) {
717         ROCKS_LOG_WARN(
718             immutable_db_options_.info_log,
719             "Recreating persistent stats column family since reading "
720             "persistent stats version key failed. Format key: %s, compatible "
721             "key: %s",
722             s_format.ToString().c_str(), s_compatible.ToString().c_str());
723       } else {
724         ROCKS_LOG_WARN(
725             immutable_db_options_.info_log,
726             "Recreating persistent stats column family due to corrupted or "
727             "incompatible format version. Recovered format: %" PRIu64
728             "; recovered format compatible since: %" PRIu64 "\n",
729             format_version_recovered, compatible_version_recovered);
730       }
731       s = DropColumnFamily(persist_stats_cf_handle_);
732       if (s.ok()) {
733         s = DestroyColumnFamilyHandle(persist_stats_cf_handle_);
734       }
735       ColumnFamilyHandle* handle = nullptr;
736       if (s.ok()) {
737         ColumnFamilyOptions cfo;
738         OptimizeForPersistentStats(&cfo);
739         s = CreateColumnFamily(cfo, kPersistentStatsColumnFamilyName, &handle);
740       }
741       if (s.ok()) {
742         persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
743         // should also persist version here because old stats CF is discarded
744         should_persist_format_version = true;
745       }
746     }
747   }
748   if (should_persist_format_version) {
749     // Persistent stats CF being created for the first time, need to write
750     // format version key
751     WriteBatch batch;
752     if (s.ok()) {
753       s = batch.Put(persist_stats_cf_handle_, kFormatVersionKeyString,
754                     ToString(kStatsCFCurrentFormatVersion));
755     }
756     if (s.ok()) {
757       s = batch.Put(persist_stats_cf_handle_, kCompatibleVersionKeyString,
758                     ToString(kStatsCFCompatibleFormatVersion));
759     }
760     if (s.ok()) {
761       WriteOptions wo;
762       wo.low_pri = true;
763       wo.no_slowdown = true;
764       wo.sync = false;
765       s = Write(wo, &batch);
766     }
767   }
768   mutex_.Lock();
769   return s;
770 }
771 
InitPersistStatsColumnFamily()772 Status DBImpl::InitPersistStatsColumnFamily() {
773   mutex_.AssertHeld();
774   assert(!persist_stats_cf_handle_);
775   ColumnFamilyData* persistent_stats_cfd =
776       versions_->GetColumnFamilySet()->GetColumnFamily(
777           kPersistentStatsColumnFamilyName);
778   persistent_stats_cfd_exists_ = persistent_stats_cfd != nullptr;
779 
780   Status s;
781   if (persistent_stats_cfd != nullptr) {
782     // We are recovering from a DB which already contains persistent stats CF,
783     // the CF is already created in VersionSet::ApplyOneVersionEdit, but
784     // column family handle was not. Need to explicitly create handle here.
785     persist_stats_cf_handle_ =
786         new ColumnFamilyHandleImpl(persistent_stats_cfd, this, &mutex_);
787   } else {
788     mutex_.Unlock();
789     ColumnFamilyHandle* handle = nullptr;
790     ColumnFamilyOptions cfo;
791     OptimizeForPersistentStats(&cfo);
792     s = CreateColumnFamily(cfo, kPersistentStatsColumnFamilyName, &handle);
793     persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
794     mutex_.Lock();
795   }
796   return s;
797 }
798 
799 // REQUIRES: wal_numbers are sorted in ascending order
RecoverLogFiles(const std::vector<uint64_t> & wal_numbers,SequenceNumber * next_sequence,bool read_only,bool * corrupted_wal_found)800 Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
801                                SequenceNumber* next_sequence, bool read_only,
802                                bool* corrupted_wal_found) {
803   struct LogReporter : public log::Reader::Reporter {
804     Env* env;
805     Logger* info_log;
806     const char* fname;
807     Status* status;  // nullptr if immutable_db_options_.paranoid_checks==false
808     void Corruption(size_t bytes, const Status& s) override {
809       ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
810                      (status == nullptr ? "(ignoring error) " : ""), fname,
811                      static_cast<int>(bytes), s.ToString().c_str());
812       if (status != nullptr && status->ok()) {
813         *status = s;
814       }
815     }
816   };
817 
818   mutex_.AssertHeld();
819   Status status;
820   std::unordered_map<int, VersionEdit> version_edits;
821   // no need to refcount because iteration is under mutex
822   for (auto cfd : *versions_->GetColumnFamilySet()) {
823     VersionEdit edit;
824     edit.SetColumnFamily(cfd->GetID());
825     version_edits.insert({cfd->GetID(), edit});
826   }
827   int job_id = next_job_id_.fetch_add(1);
828   {
829     auto stream = event_logger_.Log();
830     stream << "job" << job_id << "event"
831            << "recovery_started";
832     stream << "wal_files";
833     stream.StartArray();
834     for (auto wal_number : wal_numbers) {
835       stream << wal_number;
836     }
837     stream.EndArray();
838   }
839 
840 #ifndef ROCKSDB_LITE
841   if (immutable_db_options_.wal_filter != nullptr) {
842     std::map<std::string, uint32_t> cf_name_id_map;
843     std::map<uint32_t, uint64_t> cf_lognumber_map;
844     for (auto cfd : *versions_->GetColumnFamilySet()) {
845       cf_name_id_map.insert(std::make_pair(cfd->GetName(), cfd->GetID()));
846       cf_lognumber_map.insert(
847           std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
848     }
849 
850     immutable_db_options_.wal_filter->ColumnFamilyLogNumberMap(cf_lognumber_map,
851                                                                cf_name_id_map);
852   }
853 #endif
854 
855   bool stop_replay_by_wal_filter = false;
856   bool stop_replay_for_corruption = false;
857   bool flushed = false;
858   uint64_t corrupted_wal_number = kMaxSequenceNumber;
859   uint64_t min_wal_number = MinLogNumberToKeep();
860   for (auto wal_number : wal_numbers) {
861     if (wal_number < min_wal_number) {
862       ROCKS_LOG_INFO(immutable_db_options_.info_log,
863                      "Skipping log #%" PRIu64
864                      " since it is older than min log to keep #%" PRIu64,
865                      wal_number, min_wal_number);
866       continue;
867     }
868     // The previous incarnation may not have written any MANIFEST
869     // records after allocating this log number.  So we manually
870     // update the file number allocation counter in VersionSet.
871     versions_->MarkFileNumberUsed(wal_number);
872     // Open the log file
873     std::string fname =
874         LogFileName(immutable_db_options_.GetWalDir(), wal_number);
875 
876     ROCKS_LOG_INFO(immutable_db_options_.info_log,
877                    "Recovering log #%" PRIu64 " mode %d", wal_number,
878                    static_cast<int>(immutable_db_options_.wal_recovery_mode));
879     auto logFileDropped = [this, &fname]() {
880       uint64_t bytes;
881       if (env_->GetFileSize(fname, &bytes).ok()) {
882         auto info_log = immutable_db_options_.info_log.get();
883         ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes", fname.c_str(),
884                        static_cast<int>(bytes));
885       }
886     };
887     if (stop_replay_by_wal_filter) {
888       logFileDropped();
889       continue;
890     }
891 
892     std::unique_ptr<SequentialFileReader> file_reader;
893     {
894       std::unique_ptr<FSSequentialFile> file;
895       status = fs_->NewSequentialFile(fname,
896                                       fs_->OptimizeForLogRead(file_options_),
897                                       &file, nullptr);
898       if (!status.ok()) {
899         MaybeIgnoreError(&status);
900         if (!status.ok()) {
901           return status;
902         } else {
903           // Fail with one log file, but that's ok.
904           // Try next one.
905           continue;
906         }
907       }
908       file_reader.reset(new SequentialFileReader(
909           std::move(file), fname, immutable_db_options_.log_readahead_size,
910           io_tracer_));
911     }
912 
913     // Create the log reader.
914     LogReporter reporter;
915     reporter.env = env_;
916     reporter.info_log = immutable_db_options_.info_log.get();
917     reporter.fname = fname.c_str();
918     if (!immutable_db_options_.paranoid_checks ||
919         immutable_db_options_.wal_recovery_mode ==
920             WALRecoveryMode::kSkipAnyCorruptedRecords) {
921       reporter.status = nullptr;
922     } else {
923       reporter.status = &status;
924     }
925     // We intentially make log::Reader do checksumming even if
926     // paranoid_checks==false so that corruptions cause entire commits
927     // to be skipped instead of propagating bad information (like overly
928     // large sequence numbers).
929     log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
930                        &reporter, true /*checksum*/, wal_number);
931 
932     // Determine if we should tolerate incomplete records at the tail end of the
933     // Read all the records and add to a memtable
934     std::string scratch;
935     Slice record;
936     WriteBatch batch;
937 
938     TEST_SYNC_POINT_CALLBACK("DBImpl::RecoverLogFiles:BeforeReadWal",
939                              /*arg=*/nullptr);
940     while (!stop_replay_by_wal_filter &&
941            reader.ReadRecord(&record, &scratch,
942                              immutable_db_options_.wal_recovery_mode) &&
943            status.ok()) {
944       if (record.size() < WriteBatchInternal::kHeader) {
945         reporter.Corruption(record.size(),
946                             Status::Corruption("log record too small"));
947         continue;
948       }
949 
950       status = WriteBatchInternal::SetContents(&batch, record);
951       if (!status.ok()) {
952         return status;
953       }
954       SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);
955 
956       if (immutable_db_options_.wal_recovery_mode ==
957           WALRecoveryMode::kPointInTimeRecovery) {
958         // In point-in-time recovery mode, if sequence id of log files are
959         // consecutive, we continue recovery despite corruption. This could
960         // happen when we open and write to a corrupted DB, where sequence id
961         // will start from the last sequence id we recovered.
962         if (sequence == *next_sequence) {
963           stop_replay_for_corruption = false;
964         }
965         if (stop_replay_for_corruption) {
966           logFileDropped();
967           break;
968         }
969       }
970 
971 #ifndef ROCKSDB_LITE
972       if (immutable_db_options_.wal_filter != nullptr) {
973         WriteBatch new_batch;
974         bool batch_changed = false;
975 
976         WalFilter::WalProcessingOption wal_processing_option =
977             immutable_db_options_.wal_filter->LogRecordFound(
978                 wal_number, fname, batch, &new_batch, &batch_changed);
979 
980         switch (wal_processing_option) {
981           case WalFilter::WalProcessingOption::kContinueProcessing:
982             // do nothing, proceeed normally
983             break;
984           case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
985             // skip current record
986             continue;
987           case WalFilter::WalProcessingOption::kStopReplay:
988             // skip current record and stop replay
989             stop_replay_by_wal_filter = true;
990             continue;
991           case WalFilter::WalProcessingOption::kCorruptedRecord: {
992             status =
993                 Status::Corruption("Corruption reported by Wal Filter ",
994                                    immutable_db_options_.wal_filter->Name());
995             MaybeIgnoreError(&status);
996             if (!status.ok()) {
997               reporter.Corruption(record.size(), status);
998               continue;
999             }
1000             break;
1001           }
1002           default: {
1003             assert(false);  // unhandled case
1004             status = Status::NotSupported(
1005                 "Unknown WalProcessingOption returned"
1006                 " by Wal Filter ",
1007                 immutable_db_options_.wal_filter->Name());
1008             MaybeIgnoreError(&status);
1009             if (!status.ok()) {
1010               return status;
1011             } else {
1012               // Ignore the error with current record processing.
1013               continue;
1014             }
1015           }
1016         }
1017 
1018         if (batch_changed) {
1019           // Make sure that the count in the new batch is
1020           // within the orignal count.
1021           int new_count = WriteBatchInternal::Count(&new_batch);
1022           int original_count = WriteBatchInternal::Count(&batch);
1023           if (new_count > original_count) {
1024             ROCKS_LOG_FATAL(
1025                 immutable_db_options_.info_log,
1026                 "Recovering log #%" PRIu64
1027                 " mode %d log filter %s returned "
1028                 "more records (%d) than original (%d) which is not allowed. "
1029                 "Aborting recovery.",
1030                 wal_number,
1031                 static_cast<int>(immutable_db_options_.wal_recovery_mode),
1032                 immutable_db_options_.wal_filter->Name(), new_count,
1033                 original_count);
1034             status = Status::NotSupported(
1035                 "More than original # of records "
1036                 "returned by Wal Filter ",
1037                 immutable_db_options_.wal_filter->Name());
1038             return status;
1039           }
1040           // Set the same sequence number in the new_batch
1041           // as the original batch.
1042           WriteBatchInternal::SetSequence(&new_batch,
1043                                           WriteBatchInternal::Sequence(&batch));
1044           batch = new_batch;
1045         }
1046       }
1047 #endif  // ROCKSDB_LITE
1048 
1049       // If column family was not found, it might mean that the WAL write
1050       // batch references to the column family that was dropped after the
1051       // insert. We don't want to fail the whole write batch in that case --
1052       // we just ignore the update.
1053       // That's why we set ignore missing column families to true
1054       bool has_valid_writes = false;
1055       status = WriteBatchInternal::InsertInto(
1056           &batch, column_family_memtables_.get(), &flush_scheduler_,
1057           &trim_history_scheduler_, true, wal_number, this,
1058           false /* concurrent_memtable_writes */, next_sequence,
1059           &has_valid_writes, seq_per_batch_, batch_per_txn_);
1060       MaybeIgnoreError(&status);
1061       if (!status.ok()) {
1062         // We are treating this as a failure while reading since we read valid
1063         // blocks that do not form coherent data
1064         reporter.Corruption(record.size(), status);
1065         continue;
1066       }
1067 
1068       if (has_valid_writes && !read_only) {
1069         // we can do this because this is called before client has access to the
1070         // DB and there is only a single thread operating on DB
1071         ColumnFamilyData* cfd;
1072 
1073         while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
1074           cfd->UnrefAndTryDelete();
1075           // If this asserts, it means that InsertInto failed in
1076           // filtering updates to already-flushed column families
1077           assert(cfd->GetLogNumber() <= wal_number);
1078           auto iter = version_edits.find(cfd->GetID());
1079           assert(iter != version_edits.end());
1080           VersionEdit* edit = &iter->second;
1081           status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
1082           if (!status.ok()) {
1083             // Reflect errors immediately so that conditions like full
1084             // file-systems cause the DB::Open() to fail.
1085             return status;
1086           }
1087           flushed = true;
1088 
1089           cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
1090                                  *next_sequence);
1091         }
1092       }
1093     }
1094 
1095     if (!status.ok()) {
1096       if (status.IsNotSupported()) {
1097         // We should not treat NotSupported as corruption. It is rather a clear
1098         // sign that we are processing a WAL that is produced by an incompatible
1099         // version of the code.
1100         return status;
1101       }
1102       if (immutable_db_options_.wal_recovery_mode ==
1103           WALRecoveryMode::kSkipAnyCorruptedRecords) {
1104         // We should ignore all errors unconditionally
1105         status = Status::OK();
1106       } else if (immutable_db_options_.wal_recovery_mode ==
1107                  WALRecoveryMode::kPointInTimeRecovery) {
1108         if (status.IsIOError()) {
1109           ROCKS_LOG_ERROR(immutable_db_options_.info_log,
1110                           "IOError during point-in-time reading log #%" PRIu64
1111                           " seq #%" PRIu64
1112                           ". %s. This likely mean loss of synced WAL, "
1113                           "thus recovery fails.",
1114                           wal_number, *next_sequence,
1115                           status.ToString().c_str());
1116           return status;
1117         }
1118         // We should ignore the error but not continue replaying
1119         status = Status::OK();
1120         stop_replay_for_corruption = true;
1121         corrupted_wal_number = wal_number;
1122         if (corrupted_wal_found != nullptr) {
1123           *corrupted_wal_found = true;
1124         }
1125         ROCKS_LOG_INFO(immutable_db_options_.info_log,
1126                        "Point in time recovered to log #%" PRIu64
1127                        " seq #%" PRIu64,
1128                        wal_number, *next_sequence);
1129       } else {
1130         assert(immutable_db_options_.wal_recovery_mode ==
1131                    WALRecoveryMode::kTolerateCorruptedTailRecords ||
1132                immutable_db_options_.wal_recovery_mode ==
1133                    WALRecoveryMode::kAbsoluteConsistency);
1134         return status;
1135       }
1136     }
1137 
1138     flush_scheduler_.Clear();
1139     trim_history_scheduler_.Clear();
1140     auto last_sequence = *next_sequence - 1;
1141     if ((*next_sequence != kMaxSequenceNumber) &&
1142         (versions_->LastSequence() <= last_sequence)) {
1143       versions_->SetLastAllocatedSequence(last_sequence);
1144       versions_->SetLastPublishedSequence(last_sequence);
1145       versions_->SetLastSequence(last_sequence);
1146     }
1147   }
1148   // Compare the corrupted log number to all columnfamily's current log number.
1149   // Abort Open() if any column family's log number is greater than
1150   // the corrupted log number, which means CF contains data beyond the point of
1151   // corruption. This could during PIT recovery when the WAL is corrupted and
1152   // some (but not all) CFs are flushed
1153   // Exclude the PIT case where no log is dropped after the corruption point.
1154   // This is to cover the case for empty wals after corrupted log, in which we
1155   // don't reset stop_replay_for_corruption.
1156   if (stop_replay_for_corruption == true &&
1157       (immutable_db_options_.wal_recovery_mode ==
1158            WALRecoveryMode::kPointInTimeRecovery ||
1159        immutable_db_options_.wal_recovery_mode ==
1160            WALRecoveryMode::kTolerateCorruptedTailRecords)) {
1161     for (auto cfd : *versions_->GetColumnFamilySet()) {
1162       // One special case cause cfd->GetLogNumber() > corrupted_wal_number but
1163       // the CF is still consistent: If a new column family is created during
1164       // the flush and the WAL sync fails at the same time, the new CF points to
1165       // the new WAL but the old WAL is curropted. Since the new CF is empty, it
1166       // is still consistent. We add the check of CF sst file size to avoid the
1167       // false positive alert.
1168 
1169       // Note that, the check of (cfd->GetLiveSstFilesSize() > 0) may leads to
1170       // the ignorance of a very rare inconsistency case caused in data
1171       // canclation. One CF is empty due to KV deletion. But those operations
1172       // are in the WAL. If the WAL is corrupted, the status of this CF might
1173       // not be consistent with others. However, the consistency check will be
1174       // bypassed due to empty CF.
1175       // TODO: a better and complete implementation is needed to ensure strict
1176       // consistency check in WAL recovery including hanlding the tailing
1177       // issues.
1178       if (cfd->GetLogNumber() > corrupted_wal_number &&
1179           cfd->GetLiveSstFilesSize() > 0) {
1180         ROCKS_LOG_ERROR(immutable_db_options_.info_log,
1181                         "Column family inconsistency: SST file contains data"
1182                         " beyond the point of corruption.");
1183         return Status::Corruption("SST file is ahead of WALs in CF " +
1184                                   cfd->GetName());
1185       }
1186     }
1187   }
1188 
1189   // True if there's any data in the WALs; if not, we can skip re-processing
1190   // them later
1191   bool data_seen = false;
1192   if (!read_only) {
1193     // no need to refcount since client still doesn't have access
1194     // to the DB and can not drop column families while we iterate
1195     const WalNumber max_wal_number = wal_numbers.back();
1196     for (auto cfd : *versions_->GetColumnFamilySet()) {
1197       auto iter = version_edits.find(cfd->GetID());
1198       assert(iter != version_edits.end());
1199       VersionEdit* edit = &iter->second;
1200 
1201       if (cfd->GetLogNumber() > max_wal_number) {
1202         // Column family cfd has already flushed the data
1203         // from all wals. Memtable has to be empty because
1204         // we filter the updates based on wal_number
1205         // (in WriteBatch::InsertInto)
1206         assert(cfd->mem()->GetFirstSequenceNumber() == 0);
1207         assert(edit->NumEntries() == 0);
1208         continue;
1209       }
1210 
1211       TEST_SYNC_POINT_CALLBACK(
1212           "DBImpl::RecoverLogFiles:BeforeFlushFinalMemtable", /*arg=*/nullptr);
1213 
1214       // flush the final memtable (if non-empty)
1215       if (cfd->mem()->GetFirstSequenceNumber() != 0) {
1216         // If flush happened in the middle of recovery (e.g. due to memtable
1217         // being full), we flush at the end. Otherwise we'll need to record
1218         // where we were on last flush, which make the logic complicated.
1219         if (flushed || !immutable_db_options_.avoid_flush_during_recovery) {
1220           status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
1221           if (!status.ok()) {
1222             // Recovery failed
1223             break;
1224           }
1225           flushed = true;
1226 
1227           cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
1228                                  versions_->LastSequence());
1229         }
1230         data_seen = true;
1231       }
1232 
1233       // Update the log number info in the version edit corresponding to this
1234       // column family. Note that the version edits will be written to MANIFEST
1235       // together later.
1236       // writing wal_number in the manifest means that any log file
1237       // with number strongly less than (wal_number + 1) is already
1238       // recovered and should be ignored on next reincarnation.
1239       // Since we already recovered max_wal_number, we want all wals
1240       // with numbers `<= max_wal_number` (includes this one) to be ignored
1241       if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) {
1242         edit->SetLogNumber(max_wal_number + 1);
1243       }
1244     }
1245     if (status.ok()) {
1246       // we must mark the next log number as used, even though it's
1247       // not actually used. that is because VersionSet assumes
1248       // VersionSet::next_file_number_ always to be strictly greater than any
1249       // log number
1250       versions_->MarkFileNumberUsed(max_wal_number + 1);
1251 
1252       autovector<ColumnFamilyData*> cfds;
1253       autovector<const MutableCFOptions*> cf_opts;
1254       autovector<autovector<VersionEdit*>> edit_lists;
1255       for (auto* cfd : *versions_->GetColumnFamilySet()) {
1256         cfds.push_back(cfd);
1257         cf_opts.push_back(cfd->GetLatestMutableCFOptions());
1258         auto iter = version_edits.find(cfd->GetID());
1259         assert(iter != version_edits.end());
1260         edit_lists.push_back({&iter->second});
1261       }
1262 
1263       std::unique_ptr<VersionEdit> wal_deletion;
1264       if (immutable_db_options_.track_and_verify_wals_in_manifest) {
1265         wal_deletion.reset(new VersionEdit);
1266         wal_deletion->DeleteWalsBefore(max_wal_number + 1);
1267         edit_lists.back().push_back(wal_deletion.get());
1268       }
1269 
1270       // write MANIFEST with update
1271       status = versions_->LogAndApply(cfds, cf_opts, edit_lists, &mutex_,
1272                                       directories_.GetDbDir(),
1273                                       /*new_descriptor_log=*/true);
1274     }
1275   }
1276 
1277   if (status.ok()) {
1278     if (data_seen && !flushed) {
1279       status = RestoreAliveLogFiles(wal_numbers);
1280     } else {
1281       // If there's no data in the WAL, or we flushed all the data, still
1282       // truncate the log file. If the process goes into a crash loop before
1283       // the file is deleted, the preallocated space will never get freed.
1284       const bool truncate = !read_only;
1285       GetLogSizeAndMaybeTruncate(wal_numbers.back(), truncate, nullptr)
1286           .PermitUncheckedError();
1287     }
1288   }
1289 
1290   event_logger_.Log() << "job" << job_id << "event"
1291                       << "recovery_finished";
1292 
1293   return status;
1294 }
1295 
GetLogSizeAndMaybeTruncate(uint64_t wal_number,bool truncate,LogFileNumberSize * log_ptr)1296 Status DBImpl::GetLogSizeAndMaybeTruncate(uint64_t wal_number, bool truncate,
1297                                           LogFileNumberSize* log_ptr) {
1298   LogFileNumberSize log(wal_number);
1299   std::string fname =
1300       LogFileName(immutable_db_options_.GetWalDir(), wal_number);
1301   Status s;
1302   // This gets the appear size of the wals, not including preallocated space.
1303   s = env_->GetFileSize(fname, &log.size);
1304   if (s.ok() && truncate) {
1305     std::unique_ptr<FSWritableFile> last_log;
1306     Status truncate_status = fs_->ReopenWritableFile(
1307         fname,
1308         fs_->OptimizeForLogWrite(
1309             file_options_,
1310             BuildDBOptions(immutable_db_options_, mutable_db_options_)),
1311         &last_log, nullptr);
1312     if (truncate_status.ok()) {
1313       truncate_status = last_log->Truncate(log.size, IOOptions(), nullptr);
1314     }
1315     if (truncate_status.ok()) {
1316       truncate_status = last_log->Close(IOOptions(), nullptr);
1317     }
1318     // Not a critical error if fail to truncate.
1319     if (!truncate_status.ok() && !truncate_status.IsNotSupported()) {
1320       ROCKS_LOG_WARN(immutable_db_options_.info_log,
1321                      "Failed to truncate log #%" PRIu64 ": %s", wal_number,
1322                      truncate_status.ToString().c_str());
1323     }
1324   }
1325   if (log_ptr) {
1326     *log_ptr = log;
1327   }
1328   return s;
1329 }
1330 
RestoreAliveLogFiles(const std::vector<uint64_t> & wal_numbers)1331 Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
1332   if (wal_numbers.empty()) {
1333     return Status::OK();
1334   }
1335   Status s;
1336   mutex_.AssertHeld();
1337   assert(immutable_db_options_.avoid_flush_during_recovery);
1338   if (two_write_queues_) {
1339     log_write_mutex_.Lock();
1340   }
1341   // Mark these as alive so they'll be considered for deletion later by
1342   // FindObsoleteFiles()
1343   total_log_size_ = 0;
1344   log_empty_ = false;
1345   for (auto wal_number : wal_numbers) {
1346     // We preallocate space for wals, but then after a crash and restart, those
1347     // preallocated space are not needed anymore. It is likely only the last
1348     // log has such preallocated space, so we only truncate for the last log.
1349     LogFileNumberSize log;
1350     s = GetLogSizeAndMaybeTruncate(
1351         wal_number, /*truncate=*/(wal_number == wal_numbers.back()), &log);
1352     if (!s.ok()) {
1353       break;
1354     }
1355     total_log_size_ += log.size;
1356     alive_log_files_.push_back(log);
1357   }
1358   if (two_write_queues_) {
1359     log_write_mutex_.Unlock();
1360   }
1361   return s;
1362 }
1363 
WriteLevel0TableForRecovery(int job_id,ColumnFamilyData * cfd,MemTable * mem,VersionEdit * edit)1364 Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
1365                                            MemTable* mem, VersionEdit* edit) {
1366   mutex_.AssertHeld();
1367   const uint64_t start_micros = immutable_db_options_.clock->NowMicros();
1368 
1369   FileMetaData meta;
1370   std::vector<BlobFileAddition> blob_file_additions;
1371 
1372   std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
1373       new std::list<uint64_t>::iterator(
1374           CaptureCurrentFileNumberInPendingOutputs()));
1375   meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
1376   ReadOptions ro;
1377   ro.total_order_seek = true;
1378   Arena arena;
1379   Status s;
1380   TableProperties table_properties;
1381   {
1382     ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
1383     ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1384                     "[%s] [WriteLevel0TableForRecovery]"
1385                     " Level-0 table #%" PRIu64 ": started",
1386                     cfd->GetName().c_str(), meta.fd.GetNumber());
1387 
1388     // Get the latest mutable cf options while the mutex is still locked
1389     const MutableCFOptions mutable_cf_options =
1390         *cfd->GetLatestMutableCFOptions();
1391     bool paranoid_file_checks =
1392         cfd->GetLatestMutableCFOptions()->paranoid_file_checks;
1393 
1394     int64_t _current_time = 0;
1395     immutable_db_options_.clock->GetCurrentTime(&_current_time)
1396         .PermitUncheckedError();  // ignore error
1397     const uint64_t current_time = static_cast<uint64_t>(_current_time);
1398     meta.oldest_ancester_time = current_time;
1399 
1400     {
1401       auto write_hint = cfd->CalculateSSTWriteHint(0);
1402       mutex_.Unlock();
1403 
1404       SequenceNumber earliest_write_conflict_snapshot;
1405       std::vector<SequenceNumber> snapshot_seqs =
1406           snapshots_.GetAll(&earliest_write_conflict_snapshot);
1407       auto snapshot_checker = snapshot_checker_.get();
1408       if (use_custom_gc_ && snapshot_checker == nullptr) {
1409         snapshot_checker = DisableGCSnapshotChecker::Instance();
1410       }
1411       std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
1412           range_del_iters;
1413       auto range_del_iter =
1414           mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
1415       if (range_del_iter != nullptr) {
1416         range_del_iters.emplace_back(range_del_iter);
1417       }
1418 
1419       IOStatus io_s;
1420       TableBuilderOptions tboptions(
1421           *cfd->ioptions(), mutable_cf_options, cfd->internal_comparator(),
1422           cfd->int_tbl_prop_collector_factories(),
1423           GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
1424           mutable_cf_options.compression_opts, cfd->GetID(), cfd->GetName(),
1425           0 /* level */, false /* is_bottommost */,
1426           TableFileCreationReason::kRecovery, current_time,
1427           0 /* oldest_key_time */, 0 /* file_creation_time */, db_id_,
1428           db_session_id_, 0 /* target_file_size */, meta.fd.GetNumber());
1429       s = BuildTable(
1430           dbname_, versions_.get(), immutable_db_options_, tboptions,
1431           file_options_for_compaction_, cfd->table_cache(), iter.get(),
1432           std::move(range_del_iters), &meta, &blob_file_additions,
1433           snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
1434           paranoid_file_checks, cfd->internal_stats(), &io_s, io_tracer_,
1435           BlobFileCreationReason::kRecovery, &event_logger_, job_id,
1436           Env::IO_HIGH, nullptr /* table_properties */, write_hint,
1437           nullptr /*full_history_ts_low*/, &blob_callback_);
1438       LogFlush(immutable_db_options_.info_log);
1439       ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1440                       "[%s] [WriteLevel0TableForRecovery]"
1441                       " Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s",
1442                       cfd->GetName().c_str(), meta.fd.GetNumber(),
1443                       meta.fd.GetFileSize(), s.ToString().c_str());
1444       mutex_.Lock();
1445 
1446       // TODO(AR) is this ok?
1447       if (!io_s.ok() && s.ok()) {
1448         s = io_s;
1449       }
1450     }
1451   }
1452   ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
1453 
1454   // Note that if file_size is zero, the file has been deleted and
1455   // should not be added to the manifest.
1456   const bool has_output = meta.fd.GetFileSize() > 0;
1457 
1458   constexpr int level = 0;
1459 
1460   if (s.ok() && has_output) {
1461     edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
1462                   meta.fd.GetFileSize(), meta.smallest, meta.largest,
1463                   meta.fd.smallest_seqno, meta.fd.largest_seqno,
1464                   meta.marked_for_compaction, meta.oldest_blob_file_number,
1465                   meta.oldest_ancester_time, meta.file_creation_time,
1466                   meta.file_checksum, meta.file_checksum_func_name);
1467 
1468     for (const auto& blob : blob_file_additions) {
1469       edit->AddBlobFile(blob);
1470     }
1471   }
1472 
1473   InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
1474   stats.micros = immutable_db_options_.clock->NowMicros() - start_micros;
1475 
1476   if (has_output) {
1477     stats.bytes_written = meta.fd.GetFileSize();
1478     stats.num_output_files = 1;
1479   }
1480 
1481   const auto& blobs = edit->GetBlobFileAdditions();
1482   for (const auto& blob : blobs) {
1483     stats.bytes_written_blob += blob.GetTotalBlobBytes();
1484   }
1485 
1486   stats.num_output_files_blob = static_cast<int>(blobs.size());
1487 
1488   cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats);
1489   cfd->internal_stats()->AddCFStats(
1490       InternalStats::BYTES_FLUSHED,
1491       stats.bytes_written + stats.bytes_written_blob);
1492   RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
1493   return s;
1494 }
1495 
Open(const Options & options,const std::string & dbname,DB ** dbptr)1496 Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
1497   DBOptions db_options(options);
1498   ColumnFamilyOptions cf_options(options);
1499   std::vector<ColumnFamilyDescriptor> column_families;
1500   column_families.push_back(
1501       ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
1502   if (db_options.persist_stats_to_disk) {
1503     column_families.push_back(
1504         ColumnFamilyDescriptor(kPersistentStatsColumnFamilyName, cf_options));
1505   }
1506   std::vector<ColumnFamilyHandle*> handles;
1507   Status s = DB::Open(db_options, dbname, column_families, &handles, dbptr);
1508   if (s.ok()) {
1509     if (db_options.persist_stats_to_disk) {
1510       assert(handles.size() == 2);
1511     } else {
1512       assert(handles.size() == 1);
1513     }
1514     // i can delete the handle since DBImpl is always holding a reference to
1515     // default column family
1516     if (db_options.persist_stats_to_disk && handles[1] != nullptr) {
1517       delete handles[1];
1518     }
1519     delete handles[0];
1520   }
1521   return s;
1522 }
1523 
Open(const DBOptions & db_options,const std::string & dbname,const std::vector<ColumnFamilyDescriptor> & column_families,std::vector<ColumnFamilyHandle * > * handles,DB ** dbptr)1524 Status DB::Open(const DBOptions& db_options, const std::string& dbname,
1525                 const std::vector<ColumnFamilyDescriptor>& column_families,
1526                 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
1527   const bool kSeqPerBatch = true;
1528   const bool kBatchPerTxn = true;
1529   return DBImpl::Open(db_options, dbname, column_families, handles, dbptr,
1530                       !kSeqPerBatch, kBatchPerTxn);
1531 }
1532 
CreateWAL(uint64_t log_file_num,uint64_t recycle_log_number,size_t preallocate_block_size,log::Writer ** new_log)1533 IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
1534                            size_t preallocate_block_size,
1535                            log::Writer** new_log) {
1536   IOStatus io_s;
1537   std::unique_ptr<FSWritableFile> lfile;
1538 
1539   DBOptions db_options =
1540       BuildDBOptions(immutable_db_options_, mutable_db_options_);
1541   FileOptions opt_file_options =
1542       fs_->OptimizeForLogWrite(file_options_, db_options);
1543   std::string wal_dir = immutable_db_options_.GetWalDir();
1544   std::string log_fname = LogFileName(wal_dir, log_file_num);
1545 
1546   if (recycle_log_number) {
1547     ROCKS_LOG_INFO(immutable_db_options_.info_log,
1548                    "reusing log %" PRIu64 " from recycle list\n",
1549                    recycle_log_number);
1550     std::string old_log_fname = LogFileName(wal_dir, recycle_log_number);
1551     TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile1");
1552     TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile2");
1553     io_s = fs_->ReuseWritableFile(log_fname, old_log_fname, opt_file_options,
1554                                   &lfile, /*dbg=*/nullptr);
1555   } else {
1556     io_s = NewWritableFile(fs_.get(), log_fname, &lfile, opt_file_options);
1557   }
1558 
1559   if (io_s.ok()) {
1560     lfile->SetWriteLifeTimeHint(CalculateWALWriteHint());
1561     lfile->SetPreallocationBlockSize(preallocate_block_size);
1562 
1563     const auto& listeners = immutable_db_options_.listeners;
1564     FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types;
1565     std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
1566         std::move(lfile), log_fname, opt_file_options,
1567         immutable_db_options_.clock, io_tracer_, nullptr /* stats */, listeners,
1568         nullptr, tmp_set.Contains(FileType::kWalFile),
1569         tmp_set.Contains(FileType::kWalFile)));
1570     *new_log = new log::Writer(std::move(file_writer), log_file_num,
1571                                immutable_db_options_.recycle_log_file_num > 0,
1572                                immutable_db_options_.manual_wal_flush);
1573   }
1574   return io_s;
1575 }
1576 
Open(const DBOptions & db_options,const std::string & dbname,const std::vector<ColumnFamilyDescriptor> & column_families,std::vector<ColumnFamilyHandle * > * handles,DB ** dbptr,const bool seq_per_batch,const bool batch_per_txn)1577 Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
1578                     const std::vector<ColumnFamilyDescriptor>& column_families,
1579                     std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
1580                     const bool seq_per_batch, const bool batch_per_txn) {
1581   Status s = ValidateOptionsByTable(db_options, column_families);
1582   if (!s.ok()) {
1583     return s;
1584   }
1585 
1586   s = ValidateOptions(db_options, column_families);
1587   if (!s.ok()) {
1588     return s;
1589   }
1590 
1591   *dbptr = nullptr;
1592   handles->clear();
1593 
1594   size_t max_write_buffer_size = 0;
1595   for (auto cf : column_families) {
1596     max_write_buffer_size =
1597         std::max(max_write_buffer_size, cf.options.write_buffer_size);
1598   }
1599 
1600   DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch, batch_per_txn);
1601   s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.GetWalDir());
1602   if (s.ok()) {
1603     std::vector<std::string> paths;
1604     for (auto& db_path : impl->immutable_db_options_.db_paths) {
1605       paths.emplace_back(db_path.path);
1606     }
1607     for (auto& cf : column_families) {
1608       for (auto& cf_path : cf.options.cf_paths) {
1609         paths.emplace_back(cf_path.path);
1610       }
1611     }
1612     for (auto& path : paths) {
1613       s = impl->env_->CreateDirIfMissing(path);
1614       if (!s.ok()) {
1615         break;
1616       }
1617     }
1618 
1619     // For recovery from NoSpace() error, we can only handle
1620     // the case where the database is stored in a single path
1621     if (paths.size() <= 1) {
1622       impl->error_handler_.EnableAutoRecovery();
1623     }
1624   }
1625   if (s.ok()) {
1626     s = impl->CreateArchivalDirectory();
1627   }
1628   if (!s.ok()) {
1629     delete impl;
1630     return s;
1631   }
1632 
1633   impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath();
1634 
1635   impl->mutex_.Lock();
1636   // Handles create_if_missing, error_if_exists
1637   uint64_t recovered_seq(kMaxSequenceNumber);
1638   s = impl->Recover(column_families, false, false, false, &recovered_seq);
1639   if (s.ok()) {
1640     uint64_t new_log_number = impl->versions_->NewFileNumber();
1641     log::Writer* new_log = nullptr;
1642     const size_t preallocate_block_size =
1643         impl->GetWalPreallocateBlockSize(max_write_buffer_size);
1644     s = impl->CreateWAL(new_log_number, 0 /*recycle_log_number*/,
1645                         preallocate_block_size, &new_log);
1646     if (s.ok()) {
1647       InstrumentedMutexLock wl(&impl->log_write_mutex_);
1648       impl->logfile_number_ = new_log_number;
1649       assert(new_log != nullptr);
1650       assert(impl->logs_.empty());
1651       impl->logs_.emplace_back(new_log_number, new_log);
1652     }
1653 
1654     if (s.ok()) {
1655       // set column family handles
1656       for (auto cf : column_families) {
1657         auto cfd =
1658             impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
1659         if (cfd != nullptr) {
1660           handles->push_back(
1661               new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
1662           impl->NewThreadStatusCfInfo(cfd);
1663         } else {
1664           if (db_options.create_missing_column_families) {
1665             // missing column family, create it
1666             ColumnFamilyHandle* handle;
1667             impl->mutex_.Unlock();
1668             s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
1669             impl->mutex_.Lock();
1670             if (s.ok()) {
1671               handles->push_back(handle);
1672             } else {
1673               break;
1674             }
1675           } else {
1676             s = Status::InvalidArgument("Column family not found", cf.name);
1677             break;
1678           }
1679         }
1680       }
1681     }
1682     if (s.ok()) {
1683       SuperVersionContext sv_context(/* create_superversion */ true);
1684       for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
1685         impl->InstallSuperVersionAndScheduleWork(
1686             cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
1687       }
1688       sv_context.Clean();
1689       if (impl->two_write_queues_) {
1690         impl->log_write_mutex_.Lock();
1691       }
1692       impl->alive_log_files_.push_back(
1693           DBImpl::LogFileNumberSize(impl->logfile_number_));
1694       if (impl->two_write_queues_) {
1695         impl->log_write_mutex_.Unlock();
1696       }
1697 
1698       impl->DeleteObsoleteFiles();
1699       s = impl->directories_.GetDbDir()->Fsync(IOOptions(), nullptr);
1700       TEST_SYNC_POINT("DBImpl::Open:AfterDeleteFilesAndSyncDir");
1701     }
1702     if (s.ok()) {
1703       // In WritePrepared there could be gap in sequence numbers. This breaks
1704       // the trick we use in kPointInTimeRecovery which assumes the first seq in
1705       // the log right after the corrupted log is one larger than the last seq
1706       // we read from the wals. To let this trick keep working, we add a dummy
1707       // entry with the expected sequence to the first log right after recovery.
1708       // In non-WritePrepared case also the new log after recovery could be
1709       // empty, and thus missing the consecutive seq hint to distinguish
1710       // middle-log corruption to corrupted-log-remained-after-recovery. This
1711       // case also will be addressed by a dummy write.
1712       if (recovered_seq != kMaxSequenceNumber) {
1713         WriteBatch empty_batch;
1714         WriteBatchInternal::SetSequence(&empty_batch, recovered_seq);
1715         WriteOptions write_options;
1716         uint64_t log_used, log_size;
1717         log::Writer* log_writer = impl->logs_.back().writer;
1718         s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size);
1719         if (s.ok()) {
1720           // Need to fsync, otherwise it might get lost after a power reset.
1721           s = impl->FlushWAL(false);
1722           if (s.ok()) {
1723             s = log_writer->file()->Sync(impl->immutable_db_options_.use_fsync);
1724           }
1725         }
1726       }
1727     }
1728   }
1729   if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
1730     // try to read format version
1731     s = impl->PersistentStatsProcessFormatVersion();
1732   }
1733 
1734   if (s.ok()) {
1735     for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
1736       if (cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
1737         auto* vstorage = cfd->current()->storage_info();
1738         for (int i = 1; i < vstorage->num_levels(); ++i) {
1739           int num_files = vstorage->NumLevelFiles(i);
1740           if (num_files > 0) {
1741             s = Status::InvalidArgument(
1742                 "Not all files are at level 0. Cannot "
1743                 "open with FIFO compaction style.");
1744             break;
1745           }
1746         }
1747       }
1748       if (!cfd->mem()->IsSnapshotSupported()) {
1749         impl->is_snapshot_supported_ = false;
1750       }
1751       if (cfd->ioptions()->merge_operator != nullptr &&
1752           !cfd->mem()->IsMergeOperatorSupported()) {
1753         s = Status::InvalidArgument(
1754             "The memtable of column family %s does not support merge operator "
1755             "its options.merge_operator is non-null",
1756             cfd->GetName().c_str());
1757       }
1758       if (!s.ok()) {
1759         break;
1760       }
1761     }
1762   }
1763   TEST_SYNC_POINT("DBImpl::Open:Opened");
1764   Status persist_options_status;
1765   if (s.ok()) {
1766     // Persist RocksDB Options before scheduling the compaction.
1767     // The WriteOptionsFile() will release and lock the mutex internally.
1768     persist_options_status = impl->WriteOptionsFile(
1769         false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
1770 
1771     *dbptr = impl;
1772     impl->opened_successfully_ = true;
1773     impl->MaybeScheduleFlushOrCompaction();
1774   } else {
1775     persist_options_status.PermitUncheckedError();
1776   }
1777   impl->mutex_.Unlock();
1778 
1779 #ifndef ROCKSDB_LITE
1780   auto sfm = static_cast<SstFileManagerImpl*>(
1781       impl->immutable_db_options_.sst_file_manager.get());
1782   if (s.ok() && sfm) {
1783     // Set Statistics ptr for SstFileManager to dump the stats of
1784     // DeleteScheduler.
1785     sfm->SetStatisticsPtr(impl->immutable_db_options_.statistics);
1786     ROCKS_LOG_INFO(impl->immutable_db_options_.info_log,
1787                    "SstFileManager instance %p", sfm);
1788 
1789     // Notify SstFileManager about all sst files that already exist in
1790     // db_paths[0] and cf_paths[0] when the DB is opened.
1791 
1792     // SstFileManagerImpl needs to know sizes of the files. For files whose size
1793     // we already know (sst files that appear in manifest - typically that's the
1794     // vast majority of all files), we'll pass the size to SstFileManager.
1795     // For all other files SstFileManager will query the size from filesystem.
1796 
1797     std::vector<LiveFileMetaData> metadata;
1798 
1799     // TODO: Once GetLiveFilesMetaData supports blob files, update the logic
1800     // below to get known_file_sizes for blob files.
1801     impl->mutex_.Lock();
1802     impl->versions_->GetLiveFilesMetaData(&metadata);
1803     impl->mutex_.Unlock();
1804 
1805     std::unordered_map<std::string, uint64_t> known_file_sizes;
1806     for (const auto& md : metadata) {
1807       std::string name = md.name;
1808       if (!name.empty() && name[0] == '/') {
1809         name = name.substr(1);
1810       }
1811       known_file_sizes[name] = md.size;
1812     }
1813 
1814     std::vector<std::string> paths;
1815     paths.emplace_back(impl->immutable_db_options_.db_paths[0].path);
1816     for (auto& cf : column_families) {
1817       if (!cf.options.cf_paths.empty()) {
1818         paths.emplace_back(cf.options.cf_paths[0].path);
1819       }
1820     }
1821     // Remove duplicate paths.
1822     std::sort(paths.begin(), paths.end());
1823     paths.erase(std::unique(paths.begin(), paths.end()), paths.end());
1824     for (auto& path : paths) {
1825       std::vector<std::string> existing_files;
1826       impl->immutable_db_options_.env->GetChildren(path, &existing_files)
1827           .PermitUncheckedError();  //**TODO: What do to on error?
1828       for (auto& file_name : existing_files) {
1829         uint64_t file_number;
1830         FileType file_type;
1831         std::string file_path = path + "/" + file_name;
1832         if (ParseFileName(file_name, &file_number, &file_type) &&
1833             (file_type == kTableFile || file_type == kBlobFile)) {
1834           // TODO: Check for errors from OnAddFile?
1835           if (known_file_sizes.count(file_name)) {
1836             // We're assuming that each sst file name exists in at most one of
1837             // the paths.
1838             sfm->OnAddFile(file_path, known_file_sizes.at(file_name))
1839                 .PermitUncheckedError();
1840           } else {
1841             sfm->OnAddFile(file_path).PermitUncheckedError();
1842           }
1843         }
1844       }
1845     }
1846 
1847     // Reserve some disk buffer space. This is a heuristic - when we run out
1848     // of disk space, this ensures that there is atleast write_buffer_size
1849     // amount of free space before we resume DB writes. In low disk space
1850     // conditions, we want to avoid a lot of small L0 files due to frequent
1851     // WAL write failures and resultant forced flushes
1852     sfm->ReserveDiskBuffer(max_write_buffer_size,
1853                            impl->immutable_db_options_.db_paths[0].path);
1854   }
1855 
1856 #endif  // !ROCKSDB_LITE
1857 
1858   if (s.ok()) {
1859     ROCKS_LOG_HEADER(impl->immutable_db_options_.info_log, "DB pointer %p",
1860                      impl);
1861     LogFlush(impl->immutable_db_options_.info_log);
1862     assert(impl->TEST_WALBufferIsEmpty());
1863     // If the assert above fails then we need to FlushWAL before returning
1864     // control back to the user.
1865     if (!persist_options_status.ok()) {
1866       s = Status::IOError(
1867           "DB::Open() failed --- Unable to persist Options file",
1868           persist_options_status.ToString());
1869     }
1870   } else {
1871     ROCKS_LOG_WARN(impl->immutable_db_options_.info_log,
1872                    "Persisting Option File error: %s",
1873                    persist_options_status.ToString().c_str());
1874   }
1875   if (s.ok()) {
1876     impl->StartPeriodicWorkScheduler();
1877   } else {
1878     for (auto* h : *handles) {
1879       delete h;
1880     }
1881     handles->clear();
1882     delete impl;
1883     *dbptr = nullptr;
1884   }
1885   return s;
1886 }
1887 }  // namespace ROCKSDB_NAMESPACE
1888