1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include <cinttypes>
10 
11 #include "db/builder.h"
12 #include "db/db_impl/db_impl.h"
13 #include "db/error_handler.h"
14 #include "db/periodic_work_scheduler.h"
15 #include "env/composite_env_wrapper.h"
16 #include "file/read_write_util.h"
17 #include "file/sst_file_manager_impl.h"
18 #include "file/writable_file_writer.h"
19 #include "monitoring/persistent_stats_history.h"
20 #include "options/options_helper.h"
21 #include "rocksdb/table.h"
22 #include "rocksdb/wal_filter.h"
23 #include "test_util/sync_point.h"
24 #include "util/rate_limiter.h"
25 
26 namespace ROCKSDB_NAMESPACE {
Options SanitizeOptions(const std::string& dbname, const Options& src,
                        bool read_only) {
  // Sanitize the two halves of `src` independently — the DB-wide options and
  // the column-family options — then stitch them back into a full Options.
  const DBOptions sanitized_db_opts =
      SanitizeOptions(dbname, DBOptions(src), read_only);
  const ImmutableDBOptions immutable_db_opts(sanitized_db_opts);
  const ColumnFamilyOptions sanitized_cf_opts =
      SanitizeOptions(immutable_db_opts, ColumnFamilyOptions(src));
  return Options(sanitized_db_opts, sanitized_cf_opts);
}
35 
// Fill in defaults and fix up inconsistent settings in `src`, returning a
// DBOptions suitable for opening the DB at `dbname`. `read_only` suppresses
// side effects (such as creating an info log) that a read-only instance
// should not perform.
DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,
                          bool read_only) {
  DBOptions result(src);

  // Default to the process-wide Env if the caller did not supply one.
  if (result.env == nullptr) {
    result.env = Env::Default();
  }

  // result.max_open_files means an "infinite" open files.
  if (result.max_open_files != -1) {
    int max_max_open_files = port::GetMaxOpenFiles();
    if (max_max_open_files == -1) {
      // Platform cannot report a limit; fall back to a generous cap.
      max_max_open_files = 0x400000;
    }
    ClipToRange(&result.max_open_files, 20, max_max_open_files);
    TEST_SYNC_POINT_CALLBACK("SanitizeOptions::AfterChangeMaxOpenFiles",
                             &result.max_open_files);
  }

  // Create an info log unless opening read-only; on failure just run without
  // logging rather than failing the open.
  if (result.info_log == nullptr && !read_only) {
    Status s = CreateLoggerFromOptions(dbname, result, &result.info_log);
    if (!s.ok()) {
      // No place suitable for logging
      result.info_log = nullptr;
    }
  }

  // Every DB needs a write buffer manager; create a default one sized by
  // db_write_buffer_size when the caller did not provide one.
  if (!result.write_buffer_manager) {
    result.write_buffer_manager.reset(
        new WriteBufferManager(result.db_write_buffer_size));
  }
  // Grow the Env's background thread pools, if needed, to match the
  // configured background job limits.
  auto bg_job_limits = DBImpl::GetBGJobLimits(
      result.max_background_flushes, result.max_background_compactions,
      result.max_background_jobs, true /* parallelize_compactions */);
  result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions,
                                           Env::Priority::LOW);
  result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes,
                                           Env::Priority::HIGH);

  if (result.rate_limiter.get() != nullptr) {
    if (result.bytes_per_sync == 0) {
      // With a rate limiter present, sync incrementally (1MB at a time).
      result.bytes_per_sync = 1024 * 1024;
    }
  }

  // Derive a delayed-write rate: prefer the rate limiter's bytes/second,
  // otherwise fall back to 16MB/s.
  if (result.delayed_write_rate == 0) {
    if (result.rate_limiter.get() != nullptr) {
      result.delayed_write_rate = result.rate_limiter->GetBytesPerSecond();
    }
    if (result.delayed_write_rate == 0) {
      result.delayed_write_rate = 16 * 1024 * 1024;
    }
  }

  // WAL archival (TTL / size limit) keeps old logs around, which conflicts
  // with recycling them. Note recycle_log_file_num is numeric; `false`
  // converts to 0 here.
  if (result.WAL_ttl_seconds > 0 || result.WAL_size_limit_MB > 0) {
    result.recycle_log_file_num = false;
  }

  if (result.recycle_log_file_num &&
      (result.wal_recovery_mode ==
           WALRecoveryMode::kTolerateCorruptedTailRecords ||
       result.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery ||
       result.wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency)) {
    // - kTolerateCorruptedTailRecords is inconsistent with recycle log file
    //   feature. WAL recycling expects recovery success upon encountering a
    //   corrupt record at the point where new data ends and recycled data
    //   remains at the tail. However, `kTolerateCorruptedTailRecords` must fail
    //   upon encountering any such corrupt record, as it cannot differentiate
    //   between this and a real corruption, which would cause committed updates
    //   to be truncated -- a violation of the recovery guarantee.
    // - kPointInTimeRecovery and kAbsoluteConsistency are incompatible with
    //   recycle log file feature temporarily due to a bug found introducing a
    //   hole in the recovered data
    //   (https://github.com/facebook/rocksdb/pull/7252#issuecomment-673766236).
    //   Besides this bug, we believe the features are fundamentally compatible.
    result.recycle_log_file_num = 0;
  }

  if (result.wal_dir.empty()) {
    // Use dbname as default
    result.wal_dir = dbname;
  }
  // Strip a single trailing '/' so later path concatenation stays uniform.
  if (result.wal_dir.back() == '/') {
    result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
  }

  // Default to a single, unbounded data path at the DB directory itself.
  if (result.db_paths.size() == 0) {
    result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
  }

  // Direct I/O bypasses OS readahead, so give compaction an explicit 2MB
  // readahead buffer when none was configured.
  if (result.use_direct_reads && result.compaction_readahead_size == 0) {
    TEST_SYNC_POINT_CALLBACK("SanitizeOptions:direct_io", nullptr);
    result.compaction_readahead_size = 1024 * 1024 * 2;
  }

  if (result.compaction_readahead_size > 0 || result.use_direct_reads) {
    result.new_table_reader_for_compaction_inputs = true;
  }

  // Force flush on DB open if 2PC is enabled, since with 2PC we have no
  // guarantee that consecutive log files have consecutive sequence id, which
  // make recovery complicated.
  if (result.allow_2pc) {
    result.avoid_flush_during_recovery = false;
  }

#ifndef ROCKSDB_LITE
  ImmutableDBOptions immutable_db_options(result);
  if (!IsWalDirSameAsDBPath(&immutable_db_options)) {
    // Either the WAL dir and db_paths[0]/db_name are not the same, or we
    // cannot tell for sure. In either case, assume they're different and
    // explicitly cleanup the trash log files (bypass DeleteScheduler)
    // Do this first so even if we end up calling
    // DeleteScheduler::CleanupDirectory on the same dir later, it will be
    // safe
    std::vector<std::string> filenames;
    Status s = result.env->GetChildren(result.wal_dir, &filenames);
    s.PermitUncheckedError();  //**TODO: What to do on error?
    for (std::string& filename : filenames) {
      // Only files whose names end in ".log.trash" are deleted here.
      if (filename.find(".log.trash", filename.length() -
                                          std::string(".log.trash").length()) !=
          std::string::npos) {
        std::string trash_file = result.wal_dir + "/" + filename;
        result.env->DeleteFile(trash_file).PermitUncheckedError();
      }
    }
  }
  // When the DB is stopped, it's possible that there are some .trash files that
  // were not deleted yet, when we open the DB we will find these .trash files
  // and schedule them to be deleted (or delete immediately if SstFileManager
  // was not used)
  auto sfm = static_cast<SstFileManagerImpl*>(result.sst_file_manager.get());
  for (size_t i = 0; i < result.db_paths.size(); i++) {
    DeleteScheduler::CleanupDirectory(result.env, sfm, result.db_paths[i].path)
        .PermitUncheckedError();
  }

  // Create a default SstFileManager for purposes of tracking compaction size
  // and facilitating recovery from out of space errors.
  if (result.sst_file_manager.get() == nullptr) {
    std::shared_ptr<SstFileManager> sst_file_manager(
        NewSstFileManager(result.env, result.info_log));
    result.sst_file_manager = sst_file_manager;
  }
#endif  // !ROCKSDB_LITE

  // Without paranoid checks, skip the (possibly slow) SST file size check
  // during open.
  if (!result.paranoid_checks) {
    result.skip_checking_sst_file_sizes_on_db_open = true;
    ROCKS_LOG_INFO(result.info_log,
                   "file size check will be skipped during open.");
  }

  return result;
}
190 
191 namespace {
ValidateOptionsByTable(const DBOptions & db_opts,const std::vector<ColumnFamilyDescriptor> & column_families)192 Status ValidateOptionsByTable(
193     const DBOptions& db_opts,
194     const std::vector<ColumnFamilyDescriptor>& column_families) {
195   Status s;
196   for (auto cf : column_families) {
197     s = ValidateOptions(db_opts, cf.options);
198     if (!s.ok()) {
199       return s;
200     }
201   }
202   return Status::OK();
203 }
204 }  // namespace
205 
ValidateOptions(const DBOptions & db_options,const std::vector<ColumnFamilyDescriptor> & column_families)206 Status DBImpl::ValidateOptions(
207     const DBOptions& db_options,
208     const std::vector<ColumnFamilyDescriptor>& column_families) {
209   Status s;
210   for (auto& cfd : column_families) {
211     s = ColumnFamilyData::ValidateOptions(db_options, cfd.options);
212     if (!s.ok()) {
213       return s;
214     }
215   }
216   s = ValidateOptions(db_options);
217   return s;
218 }
219 
ValidateOptions(const DBOptions & db_options)220 Status DBImpl::ValidateOptions(const DBOptions& db_options) {
221   if (db_options.db_paths.size() > 4) {
222     return Status::NotSupported(
223         "More than four DB paths are not supported yet. ");
224   }
225 
226   if (db_options.allow_mmap_reads && db_options.use_direct_reads) {
227     // Protect against assert in PosixMMapReadableFile constructor
228     return Status::NotSupported(
229         "If memory mapped reads (allow_mmap_reads) are enabled "
230         "then direct I/O reads (use_direct_reads) must be disabled. ");
231   }
232 
233   if (db_options.allow_mmap_writes &&
234       db_options.use_direct_io_for_flush_and_compaction) {
235     return Status::NotSupported(
236         "If memory mapped writes (allow_mmap_writes) are enabled "
237         "then direct I/O writes (use_direct_io_for_flush_and_compaction) must "
238         "be disabled. ");
239   }
240 
241   if (db_options.keep_log_file_num == 0) {
242     return Status::InvalidArgument("keep_log_file_num must be greater than 0");
243   }
244 
245   if (db_options.unordered_write &&
246       !db_options.allow_concurrent_memtable_write) {
247     return Status::InvalidArgument(
248         "unordered_write is incompatible with !allow_concurrent_memtable_write");
249   }
250 
251   if (db_options.unordered_write && db_options.enable_pipelined_write) {
252     return Status::InvalidArgument(
253         "unordered_write is incompatible with enable_pipelined_write");
254   }
255 
256   if (db_options.atomic_flush && db_options.enable_pipelined_write) {
257     return Status::InvalidArgument(
258         "atomic_flush is incompatible with enable_pipelined_write");
259   }
260 
261   // TODO remove this restriction
262   if (db_options.atomic_flush && db_options.best_efforts_recovery) {
263     return Status::InvalidArgument(
264         "atomic_flush is currently incompatible with best-efforts recovery");
265   }
266 
267   return Status::OK();
268 }
269 
// Create a brand-new DB: write the IDENTITY file, then MANIFEST-000001
// holding an initial VersionEdit, and finally point CURRENT at it. On
// success, `new_filenames` (if non-null) receives the manifest's base name
// so the caller can track files created during open.
Status DBImpl::NewDB(std::vector<std::string>* new_filenames) {
  VersionEdit new_db;
  Status s = SetIdentityFile(env_, dbname_);
  if (!s.ok()) {
    return s;
  }
  if (immutable_db_options_.write_dbid_to_manifest) {
    // Record the DB id inside the manifest in addition to the IDENTITY file.
    std::string temp_db_id;
    GetDbIdentityFromIdentityFile(&temp_db_id);
    new_db.SetDBId(temp_db_id);
  }
  new_db.SetLogNumber(0);
  new_db.SetNextFile(2);  // file number 1 is the manifest created below
  new_db.SetLastSequence(0);

  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
  const std::string manifest = DescriptorFileName(dbname_, 1);
  {
    // Remove any stale manifest left over from a previous failed attempt.
    if (fs_->FileExists(manifest, IOOptions(), nullptr).ok()) {
      fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
    }
    std::unique_ptr<FSWritableFile> file;
    FileOptions file_options = fs_->OptimizeForManifestWrite(file_options_);
    s = NewWritableFile(fs_.get(), manifest, &file, file_options);
    if (!s.ok()) {
      return s;
    }
    // Enable checksum handoff for the manifest only if configured for
    // descriptor files.
    FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types;
    file->SetPreallocationBlockSize(
        immutable_db_options_.manifest_preallocation_size);
    std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
        std::move(file), manifest, file_options, immutable_db_options_.clock,
        io_tracer_, nullptr /* stats */, immutable_db_options_.listeners,
        nullptr, tmp_set.Contains(FileType::kDescriptorFile)));
    // Serialize the initial VersionEdit as the manifest's first log record
    // and sync it to durable storage.
    log::Writer log(std::move(file_writer), 0, false);
    std::string record;
    new_db.EncodeTo(&record);
    s = log.AddRecord(record);
    if (s.ok()) {
      s = SyncManifest(&immutable_db_options_, log.file());
    }
  }
  if (s.ok()) {
    // Make "CURRENT" file that points to the new manifest file.
    s = SetCurrentFile(fs_.get(), dbname_, 1, directories_.GetDbDir());
    if (new_filenames) {
      // Report just the base name (strip any directory components).
      new_filenames->emplace_back(
          manifest.substr(manifest.find_last_of("/\\") + 1));
    }
  } else {
    // Best-effort cleanup of the partially written manifest on failure.
    fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
  }
  return s;
}
324 
CreateAndNewDirectory(FileSystem * fs,const std::string & dirname,std::unique_ptr<FSDirectory> * directory)325 IOStatus DBImpl::CreateAndNewDirectory(
326     FileSystem* fs, const std::string& dirname,
327     std::unique_ptr<FSDirectory>* directory) {
328   // We call CreateDirIfMissing() as the directory may already exist (if we
329   // are reopening a DB), when this happens we don't want creating the
330   // directory to cause an error. However, we need to check if creating the
331   // directory fails or else we may get an obscure message about the lock
332   // file not existing. One real-world example of this occurring is if
333   // env->CreateDirIfMissing() doesn't create intermediate directories, e.g.
334   // when dbname_ is "dir/db" but when "dir" doesn't exist.
335   IOStatus io_s = fs->CreateDirIfMissing(dirname, IOOptions(), nullptr);
336   if (!io_s.ok()) {
337     return io_s;
338   }
339   return fs->NewDirectory(dirname, IOOptions(), directory, nullptr);
340 }
341 
SetDirectories(FileSystem * fs,const std::string & dbname,const std::string & wal_dir,const std::vector<DbPath> & data_paths)342 IOStatus Directories::SetDirectories(FileSystem* fs, const std::string& dbname,
343                                      const std::string& wal_dir,
344                                      const std::vector<DbPath>& data_paths) {
345   IOStatus io_s = DBImpl::CreateAndNewDirectory(fs, dbname, &db_dir_);
346   if (!io_s.ok()) {
347     return io_s;
348   }
349   if (!wal_dir.empty() && dbname != wal_dir) {
350     io_s = DBImpl::CreateAndNewDirectory(fs, wal_dir, &wal_dir_);
351     if (!io_s.ok()) {
352       return io_s;
353     }
354   }
355 
356   data_dirs_.clear();
357   for (auto& p : data_paths) {
358     const std::string db_path = p.path;
359     if (db_path == dbname) {
360       data_dirs_.emplace_back(nullptr);
361     } else {
362       std::unique_ptr<FSDirectory> path_directory;
363       io_s = DBImpl::CreateAndNewDirectory(fs, db_path, &path_directory);
364       if (!io_s.ok()) {
365         return io_s;
366       }
367       data_dirs_.emplace_back(path_directory.release());
368     }
369   }
370   assert(data_dirs_.size() == data_paths.size());
371   return IOStatus::OK();
372 }
373 
// Recover DB state during open: set up directories and the file lock, create
// a new DB if needed, recover the MANIFEST (or, for best-efforts recovery,
// whatever MANIFEST can be found), then replay WAL files. On success,
// `recovered_seq` (if non-null) receives the next sequence number when a
// corrupted WAL was encountered and tolerated.
// REQUIRES: mutex_ held.
Status DBImpl::Recover(
    const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
    bool error_if_wal_file_exists, bool error_if_data_exists_in_wals,
    uint64_t* recovered_seq) {
  mutex_.AssertHeld();

  bool is_new_db = false;
  assert(db_lock_ == nullptr);
  std::vector<std::string> files_in_dbname;
  if (!read_only) {
    Status s = directories_.SetDirectories(fs_.get(), dbname_,
                                           immutable_db_options_.wal_dir,
                                           immutable_db_options_.db_paths);
    if (!s.ok()) {
      return s;
    }

    // Take the single-process file lock before touching any DB state.
    s = env_->LockFile(LockFileName(dbname_), &db_lock_);
    if (!s.ok()) {
      return s;
    }

    std::string current_fname = CurrentFileName(dbname_);
    // Path to any MANIFEST file in the db dir. It does not matter which one.
    // Since best-efforts recovery ignores CURRENT file, existence of a
    // MANIFEST indicates the recovery to recover existing db. If no MANIFEST
    // can be found, a new db will be created.
    std::string manifest_path;
    if (!immutable_db_options_.best_efforts_recovery) {
      s = env_->FileExists(current_fname);
    } else {
      // Best-efforts recovery: scan the db dir for any MANIFEST instead of
      // trusting CURRENT.
      s = Status::NotFound();
      Status io_s = env_->GetChildren(dbname_, &files_in_dbname);
      if (!io_s.ok()) {
        s = io_s;
        files_in_dbname.clear();
      }
      for (const std::string& file : files_in_dbname) {
        uint64_t number = 0;
        FileType type = kWalFile;  // initialize
        if (ParseFileName(file, &number, &type) && type == kDescriptorFile) {
          // Found MANIFEST (descriptor log), thus best-efforts recovery does
          // not have to treat the db as empty.
          s = Status::OK();
          manifest_path = dbname_ + "/" + file;
          break;
        }
      }
    }
    if (s.IsNotFound()) {
      // No existing DB found; create one if allowed.
      if (immutable_db_options_.create_if_missing) {
        s = NewDB(&files_in_dbname);
        is_new_db = true;
        if (!s.ok()) {
          return s;
        }
      } else {
        return Status::InvalidArgument(
            current_fname, "does not exist (create_if_missing is false)");
      }
    } else if (s.ok()) {
      if (immutable_db_options_.error_if_exists) {
        return Status::InvalidArgument(dbname_,
                                       "exists (error_if_exists is true)");
      }
    } else {
      // Unexpected error reading file
      assert(s.IsIOError());
      return s;
    }
    // Verify compatibility of file_options_ and filesystem
    {
      std::unique_ptr<FSRandomAccessFile> idfile;
      FileOptions customized_fs(file_options_);
      customized_fs.use_direct_reads |=
          immutable_db_options_.use_direct_io_for_flush_and_compaction;
      const std::string& fname =
          manifest_path.empty() ? current_fname : manifest_path;
      s = fs_->NewRandomAccessFile(fname, customized_fs, &idfile, nullptr);
      if (!s.ok()) {
        std::string error_str = s.ToString();
        // Check if unsupported Direct I/O is the root cause
        customized_fs.use_direct_reads = false;
        s = fs_->NewRandomAccessFile(fname, customized_fs, &idfile, nullptr);
        if (s.ok()) {
          // Retry without direct reads succeeded, so direct I/O was the
          // problem.
          return Status::InvalidArgument(
              "Direct I/O is not supported by the specified DB.");
        } else {
          return Status::InvalidArgument(
              "Found options incompatible with filesystem", error_str.c_str());
        }
      }
    }
  } else if (immutable_db_options_.best_efforts_recovery) {
    // Read-only open with best-efforts recovery still needs the dir listing.
    assert(files_in_dbname.empty());
    Status s = env_->GetChildren(dbname_, &files_in_dbname);
    if (s.IsNotFound()) {
      return Status::InvalidArgument(dbname_,
                                     "does not exist (open for read only)");
    } else if (s.IsIOError()) {
      return s;
    }
    assert(s.ok());
  }
  assert(db_id_.empty());
  Status s;
  bool missing_table_file = false;
  if (!immutable_db_options_.best_efforts_recovery) {
    s = versions_->Recover(column_families, read_only, &db_id_);
  } else {
    assert(!files_in_dbname.empty());
    s = versions_->TryRecover(column_families, read_only, files_in_dbname,
                              &db_id_, &missing_table_file);
    if (s.ok()) {
      // TryRecover may delete previous column_family_set_.
      column_family_memtables_.reset(
          new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));
    }
  }
  if (!s.ok()) {
    return s;
  }
  s = SetDBId(read_only);
  if (s.ok() && !read_only) {
    // Drop SST files no longer referenced by any Version.
    s = DeleteUnreferencedSstFiles();
  }

  if (immutable_db_options_.paranoid_checks && s.ok()) {
    s = CheckConsistency();
  }
  if (s.ok() && !read_only) {
    // Open directory handles for every column family's data paths; reuse
    // already-created ones via `created_dirs`.
    std::map<std::string, std::shared_ptr<FSDirectory>> created_dirs;
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      s = cfd->AddDirectories(&created_dirs);
      if (!s.ok()) {
        return s;
      }
    }
  }
  // DB mutex is already held
  if (s.ok() && immutable_db_options_.persist_stats_to_disk) {
    s = InitPersistStatsColumnFamily();
  }

  std::vector<std::string> files_in_wal_dir;
  if (s.ok()) {
    // Initial max_total_in_memory_state_ before recovery wals. Log recovery
    // may check this value to decide whether to flush.
    max_total_in_memory_state_ = 0;
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
      max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
                                    mutable_cf_options->max_write_buffer_number;
    }

    SequenceNumber next_sequence(kMaxSequenceNumber);
    default_cf_handle_ = new ColumnFamilyHandleImpl(
        versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
    default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
    // TODO(Zhongyi): handle single_column_family_mode_ when
    // persistent_stats is enabled
    single_column_family_mode_ =
        versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;

    // Recover from all newer log files than the ones named in the
    // descriptor (new log files may have been added by the previous
    // incarnation without registering them in the descriptor).
    //
    // Note that prev_log_number() is no longer used, but we pay
    // attention to it in case we are recovering a database
    // produced by an older version of rocksdb.
    if (!immutable_db_options_.best_efforts_recovery) {
      s = env_->GetChildren(immutable_db_options_.wal_dir, &files_in_wal_dir);
    }
    if (s.IsNotFound()) {
      return Status::InvalidArgument("wal_dir not found",
                                     immutable_db_options_.wal_dir);
    } else if (!s.ok()) {
      return s;
    }

    // Map WAL number -> full path for every WAL file found in wal_dir.
    std::unordered_map<uint64_t, std::string> wal_files;
    for (const auto& file : files_in_wal_dir) {
      uint64_t number;
      FileType type;
      if (ParseFileName(file, &number, &type) && type == kWalFile) {
        if (is_new_db) {
          // A freshly created DB must not see pre-existing WALs.
          return Status::Corruption(
              "While creating a new Db, wal_dir contains "
              "existing log file: ",
              file);
        } else {
          wal_files[number] =
              LogFileName(immutable_db_options_.wal_dir, number);
        }
      }
    }

    if (immutable_db_options_.track_and_verify_wals_in_manifest) {
      if (!immutable_db_options_.best_efforts_recovery) {
        // Verify WALs in MANIFEST.
        s = versions_->GetWalSet().CheckWals(env_, wal_files);
      }  // else since best effort recovery does not recover from WALs, no need
         // to check WALs.
    } else if (!versions_->GetWalSet().GetWals().empty()) {
      // Tracking is disabled, clear previously tracked WALs from MANIFEST,
      // otherwise, in the future, if WAL tracking is enabled again,
      // since the WALs deleted when WAL tracking is disabled are not persisted
      // into MANIFEST, WAL check may fail.
      VersionEdit edit;
      WalNumber max_wal_number =
          versions_->GetWalSet().GetWals().rbegin()->first;
      edit.DeleteWalsBefore(max_wal_number + 1);
      s = versions_->LogAndApplyToDefaultColumnFamily(&edit, &mutex_);
    }
    if (!s.ok()) {
      return s;
    }

    if (!wal_files.empty()) {
      if (error_if_wal_file_exists) {
        return Status::Corruption(
            "The db was opened in readonly mode with error_if_wal_file_exists"
            "flag but a WAL file already exists");
      } else if (error_if_data_exists_in_wals) {
        // Empty WAL files are tolerated; only non-empty ones are an error.
        for (auto& wal_file : wal_files) {
          uint64_t bytes;
          s = env_->GetFileSize(wal_file.second, &bytes);
          if (s.ok()) {
            if (bytes > 0) {
              return Status::Corruption(
                  "error_if_data_exists_in_wals is set but there are data "
                  " in WAL files.");
            }
          }
        }
      }
    }

    if (!wal_files.empty()) {
      // Recover in the order in which the wals were generated
      std::vector<uint64_t> wals;
      wals.reserve(wal_files.size());
      for (const auto& wal_file : wal_files) {
        wals.push_back(wal_file.first);
      }
      std::sort(wals.begin(), wals.end());

      bool corrupted_wal_found = false;
      s = RecoverLogFiles(wals, &next_sequence, read_only,
                          &corrupted_wal_found);
      if (corrupted_wal_found && recovered_seq != nullptr) {
        *recovered_seq = next_sequence;
      }
      if (!s.ok()) {
        // Clear memtables if recovery failed
        for (auto cfd : *versions_->GetColumnFamilySet()) {
          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
                                 kMaxSequenceNumber);
        }
      }
    }
  }

  if (read_only) {
    // If we are opening as read-only, we need to update options_file_number_
    // to reflect the most recent OPTIONS file. It does not matter for regular
    // read-write db instance because options_file_number_ will later be
    // updated to versions_->NewFileNumber() in RenameTempFileToOptionsFile.
    std::vector<std::string> filenames;
    if (s.ok()) {
      // Reuse an already-fetched directory listing when possible to avoid an
      // extra GetChildren call.
      const std::string normalized_dbname = NormalizePath(dbname_);
      const std::string normalized_wal_dir =
          NormalizePath(immutable_db_options_.wal_dir);
      if (immutable_db_options_.best_efforts_recovery) {
        filenames = std::move(files_in_dbname);
      } else if (normalized_dbname == normalized_wal_dir) {
        filenames = std::move(files_in_wal_dir);
      } else {
        s = env_->GetChildren(GetName(), &filenames);
      }
    }
    if (s.ok()) {
      // Pick the OPTIONS file with the highest number (most recent).
      uint64_t number = 0;
      uint64_t options_file_number = 0;
      FileType type;
      for (const auto& fname : filenames) {
        if (ParseFileName(fname, &number, &type) && type == kOptionsFile) {
          options_file_number = std::max(number, options_file_number);
        }
      }
      versions_->options_file_number_ = options_file_number;
    }
  }
  return s;
}
670 
// Check the persistent stats column family's on-disk format version. If the
// version keys cannot be read, or the data was written by a newer,
// incompatible release, drop and recreate the stats CF. Writes the current
// format/compatible version keys whenever the CF is newly created (or
// recreated). REQUIRES: mutex_ held on entry; it is released for the I/O and
// re-acquired before returning.
Status DBImpl::PersistentStatsProcessFormatVersion() {
  mutex_.AssertHeld();
  Status s;
  // persist version when stats CF doesn't exist
  bool should_persist_format_version = !persistent_stats_cfd_exists_;
  mutex_.Unlock();
  if (persistent_stats_cfd_exists_) {
    // Check persistent stats format version compatibility. Drop and recreate
    // persistent stats CF if format version is incompatible
    uint64_t format_version_recovered = 0;
    Status s_format = DecodePersistentStatsVersionNumber(
        this, StatsVersionKeyType::kFormatVersion, &format_version_recovered);
    uint64_t compatible_version_recovered = 0;
    Status s_compatible = DecodePersistentStatsVersionNumber(
        this, StatsVersionKeyType::kCompatibleVersion,
        &compatible_version_recovered);
    // abort reading from existing stats CF if any of following is true:
    // 1. failed to read format version or compatible version from disk
    // 2. sst's format version is greater than current format version, meaning
    // this sst is encoded with a newer RocksDB release, and current compatible
    // version is below the sst's compatible version
    if (!s_format.ok() || !s_compatible.ok() ||
        (kStatsCFCurrentFormatVersion < format_version_recovered &&
         kStatsCFCompatibleFormatVersion < compatible_version_recovered)) {
      if (!s_format.ok() || !s_compatible.ok()) {
        ROCKS_LOG_WARN(
            immutable_db_options_.info_log,
            "Recreating persistent stats column family since reading "
            "persistent stats version key failed. Format key: %s, compatible "
            "key: %s",
            s_format.ToString().c_str(), s_compatible.ToString().c_str());
      } else {
        ROCKS_LOG_WARN(
            immutable_db_options_.info_log,
            "Recreating persistent stats column family due to corrupted or "
            "incompatible format version. Recovered format: %" PRIu64
            "; recovered format compatible since: %" PRIu64 "\n",
            format_version_recovered, compatible_version_recovered);
      }
      // Drop and destroy the old stats CF, then create a fresh one.
      s = DropColumnFamily(persist_stats_cf_handle_);
      if (s.ok()) {
        s = DestroyColumnFamilyHandle(persist_stats_cf_handle_);
      }
      ColumnFamilyHandle* handle = nullptr;
      if (s.ok()) {
        ColumnFamilyOptions cfo;
        OptimizeForPersistentStats(&cfo);
        s = CreateColumnFamily(cfo, kPersistentStatsColumnFamilyName, &handle);
      }
      if (s.ok()) {
        persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
        // should also persist version here because old stats CF is discarded
        should_persist_format_version = true;
      }
    }
  }
  if (should_persist_format_version) {
    // Persistent stats CF being created for the first time, need to write
    // format version key
    WriteBatch batch;
    if (s.ok()) {
      s = batch.Put(persist_stats_cf_handle_, kFormatVersionKeyString,
                    ToString(kStatsCFCurrentFormatVersion));
    }
    if (s.ok()) {
      s = batch.Put(persist_stats_cf_handle_, kCompatibleVersionKeyString,
                    ToString(kStatsCFCompatibleFormatVersion));
    }
    if (s.ok()) {
      // Best-effort, non-blocking write: stats metadata is not worth
      // stalling or syncing for.
      WriteOptions wo;
      wo.low_pri = true;
      wo.no_slowdown = true;
      wo.sync = false;
      s = Write(wo, &batch);
    }
  }
  mutex_.Lock();
  return s;
}
750 
// Creates (or re-attaches a handle to) the hidden column family used for
// persistent stats, populating persist_stats_cf_handle_.
//
// REQUIRES: mutex_ held on entry; held again on return. The lock is
// temporarily released around CreateColumnFamily() — presumably because it
// acquires the DB mutex internally (same pattern as the stats-CF recreate
// path earlier in this file) — TODO confirm.
Status DBImpl::InitPersistStatsColumnFamily() {
  mutex_.AssertHeld();
  assert(!persist_stats_cf_handle_);
  ColumnFamilyData* persistent_stats_cfd =
      versions_->GetColumnFamilySet()->GetColumnFamily(
          kPersistentStatsColumnFamilyName);
  // Record whether the stats CF already existed before this open.
  persistent_stats_cfd_exists_ = persistent_stats_cfd != nullptr;

  Status s;
  if (persistent_stats_cfd != nullptr) {
    // We are recovering from a DB which already contains persistent stats CF,
    // the CF is already created in VersionSet::ApplyOneVersionEdit, but
    // column family handle was not. Need to explicitly create handle here.
    persist_stats_cf_handle_ =
        new ColumnFamilyHandleImpl(persistent_stats_cfd, this, &mutex_);
  } else {
    // Stats CF does not exist yet: create it now (with options tuned for
    // stats storage). On failure `handle` stays nullptr, leaving
    // persist_stats_cf_handle_ null.
    mutex_.Unlock();
    ColumnFamilyHandle* handle = nullptr;
    ColumnFamilyOptions cfo;
    OptimizeForPersistentStats(&cfo);
    s = CreateColumnFamily(cfo, kPersistentStatsColumnFamilyName, &handle);
    persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
    mutex_.Lock();
  }
  return s;
}
777 
// REQUIRES: wal_numbers are sorted in ascending order
//
// Replays the writes recorded in the given WALs (oldest first) into the
// memtables, flushing to Level-0 SST files where needed, and — unless
// read_only — records the recovery outcome in the MANIFEST.
//   wal_numbers         - WAL file numbers to recover, ascending.
//   next_sequence       - in/out: advanced past every sequence number seen.
//   read_only           - if true, skip memtable flushes and MANIFEST writes.
//   corrupted_wal_found - optional out-param; set to true when point-in-time
//                         recovery stopped at a corrupted WAL.
// REQUIRES: mutex_ held (asserted below).
Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
                               SequenceNumber* next_sequence, bool read_only,
                               bool* corrupted_wal_found) {
  // Forwards log::Reader corruption reports to the info log and, when
  // paranoid checks are on, into `status` (first error wins).
  struct LogReporter : public log::Reader::Reporter {
    Env* env;
    Logger* info_log;
    const char* fname;
    Status* status;  // nullptr if immutable_db_options_.paranoid_checks==false
    void Corruption(size_t bytes, const Status& s) override {
      ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
                     (status == nullptr ? "(ignoring error) " : ""), fname,
                     static_cast<int>(bytes), s.ToString().c_str());
      // Keep only the first error reported.
      if (status != nullptr && status->ok()) {
        *status = s;
      }
    }
  };

  mutex_.AssertHeld();
  Status status;
  // One VersionEdit per column family (keyed by CF ID), accumulated during
  // replay and applied to the MANIFEST in a single LogAndApply at the end.
  std::unordered_map<int, VersionEdit> version_edits;
  // no need to refcount because iteration is under mutex
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    VersionEdit edit;
    edit.SetColumnFamily(cfd->GetID());
    version_edits.insert({cfd->GetID(), edit});
  }
  int job_id = next_job_id_.fetch_add(1);
  {
    auto stream = event_logger_.Log();
    stream << "job" << job_id << "event"
           << "recovery_started";
    stream << "wal_files";
    stream.StartArray();
    for (auto wal_number : wal_numbers) {
      stream << wal_number;
    }
    stream.EndArray();
  }

#ifndef ROCKSDB_LITE
  // Give a user-supplied WAL filter the CF name->ID and CF ID->log-number
  // maps before replay begins.
  if (immutable_db_options_.wal_filter != nullptr) {
    std::map<std::string, uint32_t> cf_name_id_map;
    std::map<uint32_t, uint64_t> cf_lognumber_map;
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      cf_name_id_map.insert(std::make_pair(cfd->GetName(), cfd->GetID()));
      cf_lognumber_map.insert(
          std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
    }

    immutable_db_options_.wal_filter->ColumnFamilyLogNumberMap(cf_lognumber_map,
                                                               cf_name_id_map);
  }
#endif

  bool stop_replay_by_wal_filter = false;
  bool stop_replay_for_corruption = false;
  bool flushed = false;
  uint64_t corrupted_wal_number = kMaxSequenceNumber;
  uint64_t min_wal_number = MinLogNumberToKeep();
  for (auto wal_number : wal_numbers) {
    if (wal_number < min_wal_number) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Skipping log #%" PRIu64
                     " since it is older than min log to keep #%" PRIu64,
                     wal_number, min_wal_number);
      continue;
    }
    // The previous incarnation may not have written any MANIFEST
    // records after allocating this log number.  So we manually
    // update the file number allocation counter in VersionSet.
    versions_->MarkFileNumberUsed(wal_number);
    // Open the log file
    std::string fname = LogFileName(immutable_db_options_.wal_dir, wal_number);

    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "Recovering log #%" PRIu64 " mode %d", wal_number,
                   static_cast<int>(immutable_db_options_.wal_recovery_mode));
    // Logs (best-effort) how many bytes of this WAL are being skipped.
    auto logFileDropped = [this, &fname]() {
      uint64_t bytes;
      if (env_->GetFileSize(fname, &bytes).ok()) {
        auto info_log = immutable_db_options_.info_log.get();
        ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes", fname.c_str(),
                       static_cast<int>(bytes));
      }
    };
    if (stop_replay_by_wal_filter) {
      logFileDropped();
      continue;
    }

    std::unique_ptr<SequentialFileReader> file_reader;
    {
      std::unique_ptr<FSSequentialFile> file;
      status = fs_->NewSequentialFile(fname,
                                      fs_->OptimizeForLogRead(file_options_),
                                      &file, nullptr);
      if (!status.ok()) {
        MaybeIgnoreError(&status);
        if (!status.ok()) {
          return status;
        } else {
          // Fail with one log file, but that's ok.
          // Try next one.
          continue;
        }
      }
      file_reader.reset(new SequentialFileReader(
          std::move(file), fname, immutable_db_options_.log_readahead_size,
          io_tracer_));
    }

    // Create the log reader.
    LogReporter reporter;
    reporter.env = env_;
    reporter.info_log = immutable_db_options_.info_log.get();
    reporter.fname = fname.c_str();
    if (!immutable_db_options_.paranoid_checks ||
        immutable_db_options_.wal_recovery_mode ==
            WALRecoveryMode::kSkipAnyCorruptedRecords) {
      reporter.status = nullptr;
    } else {
      reporter.status = &status;
    }
    // We intentionally make log::Reader do checksumming even if
    // paranoid_checks==false so that corruptions cause entire commits
    // to be skipped instead of propagating bad information (like overly
    // large sequence numbers).
    log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
                       &reporter, true /*checksum*/, wal_number);

    // Read all the records and add to a memtable. Whether incomplete records
    // at the tail end of the file are tolerated is governed by
    // wal_recovery_mode (handled after the read loop below).
    std::string scratch;
    Slice record;
    WriteBatch batch;

    TEST_SYNC_POINT_CALLBACK("DBImpl::RecoverLogFiles:BeforeReadWal",
                             /*arg=*/nullptr);
    while (!stop_replay_by_wal_filter &&
           reader.ReadRecord(&record, &scratch,
                             immutable_db_options_.wal_recovery_mode) &&
           status.ok()) {
      if (record.size() < WriteBatchInternal::kHeader) {
        reporter.Corruption(record.size(),
                            Status::Corruption("log record too small"));
        continue;
      }

      status = WriteBatchInternal::SetContents(&batch, record);
      if (!status.ok()) {
        return status;
      }
      SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);

      if (immutable_db_options_.wal_recovery_mode ==
          WALRecoveryMode::kPointInTimeRecovery) {
        // In point-in-time recovery mode, if sequence id of log files are
        // consecutive, we continue recovery despite corruption. This could
        // happen when we open and write to a corrupted DB, where sequence id
        // will start from the last sequence id we recovered.
        if (sequence == *next_sequence) {
          stop_replay_for_corruption = false;
        }
        if (stop_replay_for_corruption) {
          logFileDropped();
          break;
        }
      }

#ifndef ROCKSDB_LITE
      // Let a user-supplied WAL filter inspect/replace/drop this record.
      if (immutable_db_options_.wal_filter != nullptr) {
        WriteBatch new_batch;
        bool batch_changed = false;

        WalFilter::WalProcessingOption wal_processing_option =
            immutable_db_options_.wal_filter->LogRecordFound(
                wal_number, fname, batch, &new_batch, &batch_changed);

        switch (wal_processing_option) {
          case WalFilter::WalProcessingOption::kContinueProcessing:
            // do nothing, proceed normally
            break;
          case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
            // skip current record
            continue;
          case WalFilter::WalProcessingOption::kStopReplay:
            // skip current record and stop replay
            stop_replay_by_wal_filter = true;
            continue;
          case WalFilter::WalProcessingOption::kCorruptedRecord: {
            status =
                Status::Corruption("Corruption reported by Wal Filter ",
                                   immutable_db_options_.wal_filter->Name());
            MaybeIgnoreError(&status);
            if (!status.ok()) {
              reporter.Corruption(record.size(), status);
              continue;
            }
            break;
          }
          default: {
            assert(false);  // unhandled case
            status = Status::NotSupported(
                "Unknown WalProcessingOption returned"
                " by Wal Filter ",
                immutable_db_options_.wal_filter->Name());
            MaybeIgnoreError(&status);
            if (!status.ok()) {
              return status;
            } else {
              // Ignore the error with current record processing.
              continue;
            }
          }
        }

        if (batch_changed) {
          // Make sure that the count in the new batch is
          // within the original count.
          int new_count = WriteBatchInternal::Count(&new_batch);
          int original_count = WriteBatchInternal::Count(&batch);
          if (new_count > original_count) {
            ROCKS_LOG_FATAL(
                immutable_db_options_.info_log,
                "Recovering log #%" PRIu64
                " mode %d log filter %s returned "
                "more records (%d) than original (%d) which is not allowed. "
                "Aborting recovery.",
                wal_number,
                static_cast<int>(immutable_db_options_.wal_recovery_mode),
                immutable_db_options_.wal_filter->Name(), new_count,
                original_count);
            status = Status::NotSupported(
                "More than original # of records "
                "returned by Wal Filter ",
                immutable_db_options_.wal_filter->Name());
            return status;
          }
          // Set the same sequence number in the new_batch
          // as the original batch.
          WriteBatchInternal::SetSequence(&new_batch,
                                          WriteBatchInternal::Sequence(&batch));
          batch = new_batch;
        }
      }
#endif  // ROCKSDB_LITE

      // If column family was not found, it might mean that the WAL write
      // batch references to the column family that was dropped after the
      // insert. We don't want to fail the whole write batch in that case --
      // we just ignore the update.
      // That's why we set ignore missing column families to true
      bool has_valid_writes = false;
      status = WriteBatchInternal::InsertInto(
          &batch, column_family_memtables_.get(), &flush_scheduler_,
          &trim_history_scheduler_, true, wal_number, this,
          false /* concurrent_memtable_writes */, next_sequence,
          &has_valid_writes, seq_per_batch_, batch_per_txn_);
      MaybeIgnoreError(&status);
      if (!status.ok()) {
        // We are treating this as a failure while reading since we read valid
        // blocks that do not form coherent data
        reporter.Corruption(record.size(), status);
        continue;
      }

      // The flush scheduler records CFs whose memtables filled up during
      // InsertInto; flush them to L0 now so replay can continue.
      if (has_valid_writes && !read_only) {
        // we can do this because this is called before client has access to the
        // DB and there is only a single thread operating on DB
        ColumnFamilyData* cfd;

        while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
          // NOTE(review): cfd is used after UnrefAndTryDelete(); this appears
          // to rely on the ColumnFamilySet still holding a reference so the
          // CFD cannot actually be deleted here — confirm.
          cfd->UnrefAndTryDelete();
          // If this asserts, it means that InsertInto failed in
          // filtering updates to already-flushed column families
          assert(cfd->GetLogNumber() <= wal_number);
          auto iter = version_edits.find(cfd->GetID());
          assert(iter != version_edits.end());
          VersionEdit* edit = &iter->second;
          status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
          if (!status.ok()) {
            // Reflect errors immediately so that conditions like full
            // file-systems cause the DB::Open() to fail.
            return status;
          }
          flushed = true;

          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
                                 *next_sequence);
        }
      }
    }

    // Per-WAL error handling, driven by wal_recovery_mode.
    if (!status.ok()) {
      if (status.IsNotSupported()) {
        // We should not treat NotSupported as corruption. It is rather a clear
        // sign that we are processing a WAL that is produced by an incompatible
        // version of the code.
        return status;
      }
      if (immutable_db_options_.wal_recovery_mode ==
          WALRecoveryMode::kSkipAnyCorruptedRecords) {
        // We should ignore all errors unconditionally
        status = Status::OK();
      } else if (immutable_db_options_.wal_recovery_mode ==
                 WALRecoveryMode::kPointInTimeRecovery) {
        if (status.IsIOError()) {
          ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                          "IOError during point-in-time reading log #%" PRIu64
                          " seq #%" PRIu64
                          ". %s. This likely mean loss of synced WAL, "
                          "thus recovery fails.",
                          wal_number, *next_sequence,
                          status.ToString().c_str());
          return status;
        }
        // We should ignore the error but not continue replaying
        status = Status::OK();
        stop_replay_for_corruption = true;
        corrupted_wal_number = wal_number;
        if (corrupted_wal_found != nullptr) {
          *corrupted_wal_found = true;
        }
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "Point in time recovered to log #%" PRIu64
                       " seq #%" PRIu64,
                       wal_number, *next_sequence);
      } else {
        assert(immutable_db_options_.wal_recovery_mode ==
                   WALRecoveryMode::kTolerateCorruptedTailRecords ||
               immutable_db_options_.wal_recovery_mode ==
                   WALRecoveryMode::kAbsoluteConsistency);
        return status;
      }
    }

    flush_scheduler_.Clear();
    trim_history_scheduler_.Clear();
    // Publish the highest sequence number recovered so far.
    auto last_sequence = *next_sequence - 1;
    if ((*next_sequence != kMaxSequenceNumber) &&
        (versions_->LastSequence() <= last_sequence)) {
      versions_->SetLastAllocatedSequence(last_sequence);
      versions_->SetLastPublishedSequence(last_sequence);
      versions_->SetLastSequence(last_sequence);
    }
  }
  // Compare the corrupted log number to all columnfamily's current log number.
  // Abort Open() if any column family's log number is greater than
  // the corrupted log number, which means CF contains data beyond the point of
  // corruption. This could happen during PIT recovery when the WAL is
  // corrupted and some (but not all) CFs are flushed.
  // Exclude the PIT case where no log is dropped after the corruption point.
  // This is to cover the case for empty wals after corrupted log, in which we
  // don't reset stop_replay_for_corruption.
  if (stop_replay_for_corruption == true &&
      (immutable_db_options_.wal_recovery_mode ==
           WALRecoveryMode::kPointInTimeRecovery ||
       immutable_db_options_.wal_recovery_mode ==
           WALRecoveryMode::kTolerateCorruptedTailRecords)) {
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      // One special case causes cfd->GetLogNumber() > corrupted_wal_number but
      // the CF is still consistent: If a new column family is created during
      // the flush and the WAL sync fails at the same time, the new CF points to
      // the new WAL but the old WAL is corrupted. Since the new CF is empty, it
      // is still consistent. We add the check of CF sst file size to avoid the
      // false positive alert.

      // Note that, the check of (cfd->GetLiveSstFilesSize() > 0) may lead to
      // ignoring a very rare inconsistency case caused by data
      // cancellation. One CF is empty due to KV deletion. But those operations
      // are in the WAL. If the WAL is corrupted, the status of this CF might
      // not be consistent with others. However, the consistency check will be
      // bypassed due to empty CF.
      // TODO: a better and complete implementation is needed to ensure strict
      // consistency check in WAL recovery including handling the tail-record
      // issues.
      if (cfd->GetLogNumber() > corrupted_wal_number &&
          cfd->GetLiveSstFilesSize() > 0) {
        ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                        "Column family inconsistency: SST file contains data"
                        " beyond the point of corruption.");
        return Status::Corruption("SST file is ahead of WALs in CF " +
                                  cfd->GetName());
      }
    }
  }

  // True if there's any data in the WALs; if not, we can skip re-processing
  // them later
  bool data_seen = false;
  if (!read_only) {
    // no need to refcount since client still doesn't have access
    // to the DB and can not drop column families while we iterate
    const WalNumber max_wal_number = wal_numbers.back();
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      auto iter = version_edits.find(cfd->GetID());
      assert(iter != version_edits.end());
      VersionEdit* edit = &iter->second;

      if (cfd->GetLogNumber() > max_wal_number) {
        // Column family cfd has already flushed the data
        // from all wals. Memtable has to be empty because
        // we filter the updates based on wal_number
        // (in WriteBatch::InsertInto)
        assert(cfd->mem()->GetFirstSequenceNumber() == 0);
        assert(edit->NumEntries() == 0);
        continue;
      }

      TEST_SYNC_POINT_CALLBACK(
          "DBImpl::RecoverLogFiles:BeforeFlushFinalMemtable", /*arg=*/nullptr);

      // flush the final memtable (if non-empty)
      if (cfd->mem()->GetFirstSequenceNumber() != 0) {
        // If flush happened in the middle of recovery (e.g. due to memtable
        // being full), we flush at the end. Otherwise we'll need to record
        // where we were on last flush, which make the logic complicated.
        if (flushed || !immutable_db_options_.avoid_flush_during_recovery) {
          status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
          if (!status.ok()) {
            // Recovery failed
            break;
          }
          flushed = true;

          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
                                 versions_->LastSequence());
        }
        data_seen = true;
      }

      // Update the log number info in the version edit corresponding to this
      // column family. Note that the version edits will be written to MANIFEST
      // together later.
      // writing wal_number in the manifest means that any log file
      // with number strictly less than (wal_number + 1) is already
      // recovered and should be ignored on next reincarnation.
      // Since we already recovered max_wal_number, we want all wals
      // with numbers `<= max_wal_number` (includes this one) to be ignored
      if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) {
        edit->SetLogNumber(max_wal_number + 1);
      }
    }
    if (status.ok()) {
      // we must mark the next log number as used, even though it's
      // not actually used. that is because VersionSet assumes
      // VersionSet::next_file_number_ always to be strictly greater than any
      // log number
      versions_->MarkFileNumberUsed(max_wal_number + 1);

      // Gather one edit list per CF for the single atomic MANIFEST write.
      autovector<ColumnFamilyData*> cfds;
      autovector<const MutableCFOptions*> cf_opts;
      autovector<autovector<VersionEdit*>> edit_lists;
      for (auto* cfd : *versions_->GetColumnFamilySet()) {
        cfds.push_back(cfd);
        cf_opts.push_back(cfd->GetLatestMutableCFOptions());
        auto iter = version_edits.find(cfd->GetID());
        assert(iter != version_edits.end());
        edit_lists.push_back({&iter->second});
      }

      // When WAL tracking is on, also record in the MANIFEST that all
      // recovered WALs are now obsolete.
      std::unique_ptr<VersionEdit> wal_deletion;
      if (immutable_db_options_.track_and_verify_wals_in_manifest) {
        wal_deletion.reset(new VersionEdit);
        wal_deletion->DeleteWalsBefore(max_wal_number + 1);
        edit_lists.back().push_back(wal_deletion.get());
      }

      // write MANIFEST with update
      status = versions_->LogAndApply(cfds, cf_opts, edit_lists, &mutex_,
                                      directories_.GetDbDir(),
                                      /*new_descriptor_log=*/true);
    }
  }

  if (status.ok()) {
    if (data_seen && !flushed) {
      status = RestoreAliveLogFiles(wal_numbers);
    } else {
      // If there's no data in the WAL, or we flushed all the data, still
      // truncate the log file. If the process goes into a crash loop before
      // the file is deleted, the preallocated space will never get freed.
      GetLogSizeAndMaybeTruncate(wal_numbers.back(), true, nullptr)
          .PermitUncheckedError();
    }
  }

  event_logger_.Log() << "job" << job_id << "event"
                      << "recovery_finished";

  return status;
}
1272 
GetLogSizeAndMaybeTruncate(uint64_t wal_number,bool truncate,LogFileNumberSize * log_ptr)1273 Status DBImpl::GetLogSizeAndMaybeTruncate(uint64_t wal_number, bool truncate,
1274                                           LogFileNumberSize* log_ptr) {
1275   LogFileNumberSize log(wal_number);
1276   std::string fname = LogFileName(immutable_db_options_.wal_dir, wal_number);
1277   Status s;
1278   // This gets the appear size of the wals, not including preallocated space.
1279   s = env_->GetFileSize(fname, &log.size);
1280   if (s.ok() && truncate) {
1281     std::unique_ptr<FSWritableFile> last_log;
1282     Status truncate_status = fs_->ReopenWritableFile(
1283         fname,
1284         fs_->OptimizeForLogWrite(
1285             file_options_,
1286             BuildDBOptions(immutable_db_options_, mutable_db_options_)),
1287         &last_log, nullptr);
1288     if (truncate_status.ok()) {
1289       truncate_status = last_log->Truncate(log.size, IOOptions(), nullptr);
1290     }
1291     if (truncate_status.ok()) {
1292       truncate_status = last_log->Close(IOOptions(), nullptr);
1293     }
1294     // Not a critical error if fail to truncate.
1295     if (!truncate_status.ok()) {
1296       ROCKS_LOG_WARN(immutable_db_options_.info_log,
1297                      "Failed to truncate log #%" PRIu64 ": %s", wal_number,
1298                      truncate_status.ToString().c_str());
1299     }
1300   }
1301   if (log_ptr) {
1302     *log_ptr = log;
1303   }
1304   return s;
1305 }
1306 
RestoreAliveLogFiles(const std::vector<uint64_t> & wal_numbers)1307 Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
1308   if (wal_numbers.empty()) {
1309     return Status::OK();
1310   }
1311   Status s;
1312   mutex_.AssertHeld();
1313   assert(immutable_db_options_.avoid_flush_during_recovery);
1314   if (two_write_queues_) {
1315     log_write_mutex_.Lock();
1316   }
1317   // Mark these as alive so they'll be considered for deletion later by
1318   // FindObsoleteFiles()
1319   total_log_size_ = 0;
1320   log_empty_ = false;
1321   for (auto wal_number : wal_numbers) {
1322     // We preallocate space for wals, but then after a crash and restart, those
1323     // preallocated space are not needed anymore. It is likely only the last
1324     // log has such preallocated space, so we only truncate for the last log.
1325     LogFileNumberSize log;
1326     s = GetLogSizeAndMaybeTruncate(
1327         wal_number, /*truncate=*/(wal_number == wal_numbers.back()), &log);
1328     if (!s.ok()) {
1329       break;
1330     }
1331     total_log_size_ += log.size;
1332     alive_log_files_.push_back(log);
1333   }
1334   if (two_write_queues_) {
1335     log_write_mutex_.Unlock();
1336   }
1337   return s;
1338 }
1339 
// Writes the contents of memtable `mem` out as a Level-0 SST file (plus any
// blob files), records the new file(s) in `edit`, and updates the column
// family's flush statistics. Used only on the recovery path.
//   job_id - recovery job id, used for event logging.
//   cfd    - column family being flushed.
//   mem    - memtable to persist; it is not switched or freed here.
//   edit   - receives AddFile/AddBlobFile entries on success.
// REQUIRES: mutex_ held on entry; temporarily released around the table
// build and reacquired before returning.
Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
                                           MemTable* mem, VersionEdit* edit) {
  mutex_.AssertHeld();
  const uint64_t start_micros = immutable_db_options_.clock->NowMicros();

  FileMetaData meta;
  std::vector<BlobFileAddition> blob_file_additions;

  // Register the new file number as a pending output so obsolete-file
  // collection does not delete the SST while it is being built.
  std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
      new std::list<uint64_t>::iterator(
          CaptureCurrentFileNumberInPendingOutputs()));
  meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
  ReadOptions ro;
  ro.total_order_seek = true;
  Arena arena;
  Status s;
  TableProperties table_properties;
  {
    ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                    "[%s] [WriteLevel0TableForRecovery]"
                    " Level-0 table #%" PRIu64 ": started",
                    cfd->GetName().c_str(), meta.fd.GetNumber());

    // Get the latest mutable cf options while the mutex is still locked
    const MutableCFOptions mutable_cf_options =
        *cfd->GetLatestMutableCFOptions();
    bool paranoid_file_checks =
        cfd->GetLatestMutableCFOptions()->paranoid_file_checks;

    int64_t _current_time = 0;
    immutable_db_options_.clock->GetCurrentTime(&_current_time)
        .PermitUncheckedError();  // ignore error
    const uint64_t current_time = static_cast<uint64_t>(_current_time);
    meta.oldest_ancester_time = current_time;

    {
      auto write_hint = cfd->CalculateSSTWriteHint(0);
      // Release the DB mutex for the (potentially long) table build; safe
      // because recovery is single-threaded and clients have no DB access yet.
      mutex_.Unlock();

      SequenceNumber earliest_write_conflict_snapshot;
      std::vector<SequenceNumber> snapshot_seqs =
          snapshots_.GetAll(&earliest_write_conflict_snapshot);
      auto snapshot_checker = snapshot_checker_.get();
      if (use_custom_gc_ && snapshot_checker == nullptr) {
        snapshot_checker = DisableGCSnapshotChecker::Instance();
      }
      // Range tombstones in the memtable are written alongside point data.
      std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
          range_del_iters;
      auto range_del_iter =
          mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
      if (range_del_iter != nullptr) {
        range_del_iters.emplace_back(range_del_iter);
      }

      IOStatus io_s;
      TableBuilderOptions tboptions(
          *cfd->ioptions(), mutable_cf_options, cfd->internal_comparator(),
          cfd->int_tbl_prop_collector_factories(),
          GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
          mutable_cf_options.compression_opts, cfd->GetID(), cfd->GetName(),
          0 /* level */, false /* is_bottommost */,
          TableFileCreationReason::kRecovery, current_time,
          0 /* oldest_key_time */, 0 /* file_creation_time */, db_id_,
          db_session_id_, 0 /* target_file_size */);
      s = BuildTable(
          dbname_, versions_.get(), immutable_db_options_, tboptions,
          file_options_for_compaction_, cfd->table_cache(), iter.get(),
          std::move(range_del_iters), &meta, &blob_file_additions,
          snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
          paranoid_file_checks, cfd->internal_stats(), &io_s, io_tracer_,
          &event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */,
          write_hint, nullptr /*full_history_ts_low*/, &blob_callback_);
      LogFlush(immutable_db_options_.info_log);
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                      "[%s] [WriteLevel0TableForRecovery]"
                      " Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s",
                      cfd->GetName().c_str(), meta.fd.GetNumber(),
                      meta.fd.GetFileSize(), s.ToString().c_str());
      mutex_.Lock();

      io_s.PermitUncheckedError();  // TODO(AR) is this correct, or should we
                                    // return io_s if not ok()?
    }
  }
  ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  const bool has_output = meta.fd.GetFileSize() > 0;

  constexpr int level = 0;

  if (s.ok() && has_output) {
    edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
                  meta.fd.GetFileSize(), meta.smallest, meta.largest,
                  meta.fd.smallest_seqno, meta.fd.largest_seqno,
                  meta.marked_for_compaction, meta.oldest_blob_file_number,
                  meta.oldest_ancester_time, meta.file_creation_time,
                  meta.file_checksum, meta.file_checksum_func_name);

    for (const auto& blob : blob_file_additions) {
      edit->AddBlobFile(blob);
    }
  }

  // Account the work as a flush in the CF's compaction/flush statistics.
  InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
  stats.micros = immutable_db_options_.clock->NowMicros() - start_micros;

  if (has_output) {
    stats.bytes_written = meta.fd.GetFileSize();
    stats.num_output_files = 1;
  }

  const auto& blobs = edit->GetBlobFileAdditions();
  for (const auto& blob : blobs) {
    stats.bytes_written_blob += blob.GetTotalBlobBytes();
  }

  stats.num_output_files_blob = static_cast<int>(blobs.size());

  cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats);
  cfd->internal_stats()->AddCFStats(
      InternalStats::BYTES_FLUSHED,
      stats.bytes_written + stats.bytes_written_blob);
  RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
  return s;
}
1468 
Open(const Options & options,const std::string & dbname,DB ** dbptr)1469 Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
1470   DBOptions db_options(options);
1471   ColumnFamilyOptions cf_options(options);
1472   std::vector<ColumnFamilyDescriptor> column_families;
1473   column_families.push_back(
1474       ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
1475   if (db_options.persist_stats_to_disk) {
1476     column_families.push_back(
1477         ColumnFamilyDescriptor(kPersistentStatsColumnFamilyName, cf_options));
1478   }
1479   std::vector<ColumnFamilyHandle*> handles;
1480   Status s = DB::Open(db_options, dbname, column_families, &handles, dbptr);
1481   if (s.ok()) {
1482     if (db_options.persist_stats_to_disk) {
1483       assert(handles.size() == 2);
1484     } else {
1485       assert(handles.size() == 1);
1486     }
1487     // i can delete the handle since DBImpl is always holding a reference to
1488     // default column family
1489     if (db_options.persist_stats_to_disk && handles[1] != nullptr) {
1490       delete handles[1];
1491     }
1492     delete handles[0];
1493   }
1494   return s;
1495 }
1496 
Open(const DBOptions & db_options,const std::string & dbname,const std::vector<ColumnFamilyDescriptor> & column_families,std::vector<ColumnFamilyHandle * > * handles,DB ** dbptr)1497 Status DB::Open(const DBOptions& db_options, const std::string& dbname,
1498                 const std::vector<ColumnFamilyDescriptor>& column_families,
1499                 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
1500   const bool kSeqPerBatch = true;
1501   const bool kBatchPerTxn = true;
1502   return DBImpl::Open(db_options, dbname, column_families, handles, dbptr,
1503                       !kSeqPerBatch, kBatchPerTxn);
1504 }
1505 
// Creates a new write-ahead-log file with number `log_file_num` (or, when
// `recycle_log_number` is non-zero, reuses that old WAL file via
// FileSystem::ReuseWritableFile) and, on success, stores a heap-allocated
// log::Writer in `*new_log`. The caller takes ownership of `*new_log`.
// `preallocate_block_size` is passed to the file as a preallocation hint.
// Returns the IOStatus of the file creation/reuse; `*new_log` is only set
// when the returned status is ok.
IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
                           size_t preallocate_block_size,
                           log::Writer** new_log) {
  IOStatus io_s;
  std::unique_ptr<FSWritableFile> lfile;

  // Let the FileSystem tune the file options for WAL-style append writes.
  DBOptions db_options =
      BuildDBOptions(immutable_db_options_, mutable_db_options_);
  FileOptions opt_file_options =
      fs_->OptimizeForLogWrite(file_options_, db_options);
  std::string log_fname =
      LogFileName(immutable_db_options_.wal_dir, log_file_num);

  if (recycle_log_number) {
    // Reuse an old WAL from the recycle list under the new file name instead
    // of creating a brand-new file.
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "reusing log %" PRIu64 " from recycle list\n",
                   recycle_log_number);
    std::string old_log_fname =
        LogFileName(immutable_db_options_.wal_dir, recycle_log_number);
    TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile1");
    TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile2");
    io_s = fs_->ReuseWritableFile(log_fname, old_log_fname, opt_file_options,
                                  &lfile, /*dbg=*/nullptr);
  } else {
    io_s = NewWritableFile(fs_.get(), log_fname, &lfile, opt_file_options);
  }

  if (io_s.ok()) {
    lfile->SetWriteLifeTimeHint(CalculateWALWriteHint());
    lfile->SetPreallocationBlockSize(preallocate_block_size);

    // Wrap the raw file in a WritableFileWriter; checksum handoff is enabled
    // for the WAL only if kWalFile is in the configured handoff file types.
    const auto& listeners = immutable_db_options_.listeners;
    FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types;
    std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
        std::move(lfile), log_fname, opt_file_options,
        immutable_db_options_.clock, io_tracer_, nullptr /* stats */, listeners,
        nullptr, tmp_set.Contains(FileType::kWalFile)));
    *new_log = new log::Writer(std::move(file_writer), log_file_num,
                               immutable_db_options_.recycle_log_file_num > 0,
                               immutable_db_options_.manual_wal_flush);
  }
  return io_s;
}
1549 
// Core implementation behind all public DB::Open variants. Validates the
// options, creates/recovers the database at `dbname`, opens the requested
// column families (one handle per descriptor is returned via `handles`),
// starts a fresh WAL, persists the options file, and returns the DBImpl
// through `dbptr`. On failure, any partially-created handles and the impl
// are deleted, `*dbptr` is left null, and a non-OK status is returned.
// `seq_per_batch` / `batch_per_txn` configure write-batch sequencing for
// transaction-DB variants (plain DB::Open passes false / true).
Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
                    const std::vector<ColumnFamilyDescriptor>& column_families,
                    std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
                    const bool seq_per_batch, const bool batch_per_txn) {
  // Fail fast on invalid option combinations before touching the filesystem.
  Status s = ValidateOptionsByTable(db_options, column_families);
  if (!s.ok()) {
    return s;
  }

  s = ValidateOptions(db_options, column_families);
  if (!s.ok()) {
    return s;
  }

  *dbptr = nullptr;
  handles->clear();

  // Largest write buffer across all CFs; used below both for WAL
  // preallocation sizing and the SstFileManager disk-buffer reservation.
  size_t max_write_buffer_size = 0;
  for (auto cf : column_families) {
    max_write_buffer_size =
        std::max(max_write_buffer_size, cf.options.write_buffer_size);
  }

  DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch, batch_per_txn);
  s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.wal_dir);
  if (s.ok()) {
    // Ensure every configured db_path / cf_path directory exists.
    std::vector<std::string> paths;
    for (auto& db_path : impl->immutable_db_options_.db_paths) {
      paths.emplace_back(db_path.path);
    }
    for (auto& cf : column_families) {
      for (auto& cf_path : cf.options.cf_paths) {
        paths.emplace_back(cf_path.path);
      }
    }
    for (auto& path : paths) {
      s = impl->env_->CreateDirIfMissing(path);
      if (!s.ok()) {
        break;
      }
    }

    // For recovery from NoSpace() error, we can only handle
    // the case where the database is stored in a single path
    if (paths.size() <= 1) {
      impl->error_handler_.EnableAutoRecovery();
    }
  }
  if (s.ok()) {
    s = impl->CreateArchivalDirectory();
  }
  if (!s.ok()) {
    delete impl;
    return s;
  }

  impl->wal_in_db_path_ = IsWalDirSameAsDBPath(&impl->immutable_db_options_);

  impl->mutex_.Lock();
  // Handles create_if_missing, error_if_exists
  uint64_t recovered_seq(kMaxSequenceNumber);
  s = impl->Recover(column_families, false, false, false, &recovered_seq);
  if (s.ok()) {
    // Recovery succeeded: create a brand-new WAL for subsequent writes and
    // register it as the sole entry in logs_.
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    log::Writer* new_log = nullptr;
    const size_t preallocate_block_size =
        impl->GetWalPreallocateBlockSize(max_write_buffer_size);
    s = impl->CreateWAL(new_log_number, 0 /*recycle_log_number*/,
                        preallocate_block_size, &new_log);
    if (s.ok()) {
      InstrumentedMutexLock wl(&impl->log_write_mutex_);
      impl->logfile_number_ = new_log_number;
      assert(new_log != nullptr);
      assert(impl->logs_.empty());
      impl->logs_.emplace_back(new_log_number, new_log);
    }

    if (s.ok()) {
      // set column family handles
      for (auto cf : column_families) {
        auto cfd =
            impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
        if (cfd != nullptr) {
          handles->push_back(
              new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
          impl->NewThreadStatusCfInfo(cfd);
        } else {
          if (db_options.create_missing_column_families) {
            // missing column family, create it
            ColumnFamilyHandle* handle;
            // CreateColumnFamily takes the DB mutex itself, so release it
            // around the call and re-acquire afterwards.
            impl->mutex_.Unlock();
            s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
            impl->mutex_.Lock();
            if (s.ok()) {
              handles->push_back(handle);
            } else {
              break;
            }
          } else {
            s = Status::InvalidArgument("Column family not found", cf.name);
            break;
          }
        }
      }
    }
    if (s.ok()) {
      // Install an initial SuperVersion for every CF and record the new WAL
      // in alive_log_files_ so it is tracked for lifecycle purposes.
      SuperVersionContext sv_context(/* create_superversion */ true);
      for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
        impl->InstallSuperVersionAndScheduleWork(
            cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
      }
      sv_context.Clean();
      // With two write queues, alive_log_files_ is protected by
      // log_write_mutex_ rather than the DB mutex.
      if (impl->two_write_queues_) {
        impl->log_write_mutex_.Lock();
      }
      impl->alive_log_files_.push_back(
          DBImpl::LogFileNumberSize(impl->logfile_number_));
      if (impl->two_write_queues_) {
        impl->log_write_mutex_.Unlock();
      }

      impl->DeleteObsoleteFiles();
      s = impl->directories_.GetDbDir()->Fsync(IOOptions(), nullptr);
    }
    if (s.ok()) {
      // In WritePrepared there could be gap in sequence numbers. This breaks
      // the trick we use in kPointInTimeRecovery which assumes the first seq in
      // the log right after the corrupted log is one larger than the last seq
      // we read from the wals. To let this trick keep working, we add a dummy
      // entry with the expected sequence to the first log right after recovery.
      // In non-WritePrepared case also the new log after recovery could be
      // empty, and thus missing the consecutive seq hint to distinguish
      // middle-log corruption to corrupted-log-remained-after-recovery. This
      // case also will be addressed by a dummy write.
      if (recovered_seq != kMaxSequenceNumber) {
        WriteBatch empty_batch;
        WriteBatchInternal::SetSequence(&empty_batch, recovered_seq);
        WriteOptions write_options;
        uint64_t log_used, log_size;
        log::Writer* log_writer = impl->logs_.back().writer;
        s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size);
        if (s.ok()) {
          // Need to fsync, otherwise it might get lost after a power reset.
          s = impl->FlushWAL(false);
          if (s.ok()) {
            s = log_writer->file()->Sync(impl->immutable_db_options_.use_fsync);
          }
        }
      }
    }
  }
  if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
    // try to read format version
    s = impl->PersistentStatsProcessFormatVersion();
  }

  if (s.ok()) {
    // Post-open per-CF sanity checks.
    for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
      if (cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
        // FIFO compaction requires all files to live on level 0.
        auto* vstorage = cfd->current()->storage_info();
        for (int i = 1; i < vstorage->num_levels(); ++i) {
          int num_files = vstorage->NumLevelFiles(i);
          if (num_files > 0) {
            s = Status::InvalidArgument(
                "Not all files are at level 0. Cannot "
                "open with FIFO compaction style.");
            break;
          }
        }
      }
      if (!cfd->mem()->IsSnapshotSupported()) {
        impl->is_snapshot_supported_ = false;
      }
      if (cfd->ioptions()->merge_operator != nullptr &&
          !cfd->mem()->IsMergeOperatorSupported()) {
        s = Status::InvalidArgument(
            "The memtable of column family %s does not support merge operator "
            "its options.merge_operator is non-null",
            cfd->GetName().c_str());
      }
      if (!s.ok()) {
        break;
      }
    }
  }
  TEST_SYNC_POINT("DBImpl::Open:Opened");
  Status persist_options_status;
  if (s.ok()) {
    // Persist RocksDB Options before scheduling the compaction.
    // The WriteOptionsFile() will release and lock the mutex internally.
    persist_options_status = impl->WriteOptionsFile(
        false /*need_mutex_lock*/, false /*need_enter_write_thread*/);

    *dbptr = impl;
    impl->opened_successfully_ = true;
    impl->MaybeScheduleFlushOrCompaction();
  } else {
    persist_options_status.PermitUncheckedError();
  }
  impl->mutex_.Unlock();

#ifndef ROCKSDB_LITE
  auto sfm = static_cast<SstFileManagerImpl*>(
      impl->immutable_db_options_.sst_file_manager.get());
  if (s.ok() && sfm) {
    // Set Statistics ptr for SstFileManager to dump the stats of
    // DeleteScheduler.
    sfm->SetStatisticsPtr(impl->immutable_db_options_.statistics);
    ROCKS_LOG_INFO(impl->immutable_db_options_.info_log,
                   "SstFileManager instance %p", sfm);

    // Notify SstFileManager about all sst files that already exist in
    // db_paths[0] and cf_paths[0] when the DB is opened.

    // SstFileManagerImpl needs to know sizes of the files. For files whose size
    // we already know (sst files that appear in manifest - typically that's the
    // vast majority of all files), we'll pass the size to SstFileManager.
    // For all other files SstFileManager will query the size from filesystem.

    std::vector<LiveFileMetaData> metadata;

    // TODO: Once GetLiveFilesMetaData supports blob files, update the logic
    // below to get known_file_sizes for blob files.
    impl->mutex_.Lock();
    impl->versions_->GetLiveFilesMetaData(&metadata);
    impl->mutex_.Unlock();

    std::unordered_map<std::string, uint64_t> known_file_sizes;
    for (const auto& md : metadata) {
      // Strip the leading '/' so names match GetChildren() output below.
      std::string name = md.name;
      if (!name.empty() && name[0] == '/') {
        name = name.substr(1);
      }
      known_file_sizes[name] = md.size;
    }

    std::vector<std::string> paths;
    paths.emplace_back(impl->immutable_db_options_.db_paths[0].path);
    for (auto& cf : column_families) {
      if (!cf.options.cf_paths.empty()) {
        paths.emplace_back(cf.options.cf_paths[0].path);
      }
    }
    // Remove duplicate paths.
    std::sort(paths.begin(), paths.end());
    paths.erase(std::unique(paths.begin(), paths.end()), paths.end());
    for (auto& path : paths) {
      std::vector<std::string> existing_files;
      impl->immutable_db_options_.env->GetChildren(path, &existing_files)
          .PermitUncheckedError();  //**TODO: What do to on error?
      for (auto& file_name : existing_files) {
        uint64_t file_number;
        FileType file_type;
        std::string file_path = path + "/" + file_name;
        if (ParseFileName(file_name, &file_number, &file_type) &&
            (file_type == kTableFile || file_type == kBlobFile)) {
          // TODO: Check for errors from OnAddFile?
          if (known_file_sizes.count(file_name)) {
            // We're assuming that each sst file name exists in at most one of
            // the paths.
            sfm->OnAddFile(file_path, known_file_sizes.at(file_name))
                .PermitUncheckedError();
          } else {
            sfm->OnAddFile(file_path).PermitUncheckedError();
          }
        }
      }
    }

    // Reserve some disk buffer space. This is a heuristic - when we run out
    // of disk space, this ensures that there is atleast write_buffer_size
    // amount of free space before we resume DB writes. In low disk space
    // conditions, we want to avoid a lot of small L0 files due to frequent
    // WAL write failures and resultant forced flushes
    sfm->ReserveDiskBuffer(max_write_buffer_size,
                           impl->immutable_db_options_.db_paths[0].path);
  }

#endif  // !ROCKSDB_LITE

  if (s.ok()) {
    ROCKS_LOG_HEADER(impl->immutable_db_options_.info_log, "DB pointer %p",
                     impl);
    LogFlush(impl->immutable_db_options_.info_log);
    assert(impl->TEST_WALBufferIsEmpty());
    // If the assert above fails then we need to FlushWAL before returning
    // control back to the user.
    if (!persist_options_status.ok()) {
      s = Status::IOError(
          "DB::Open() failed --- Unable to persist Options file",
          persist_options_status.ToString());
    }
  } else {
    ROCKS_LOG_WARN(impl->immutable_db_options_.info_log,
                   "Persisting Option File error: %s",
                   persist_options_status.ToString().c_str());
  }
  if (s.ok()) {
    impl->StartPeriodicWorkScheduler();
  } else {
    // Open failed: tear down any handles created so far plus the impl, and
    // make sure the caller sees a null DB pointer.
    for (auto* h : *handles) {
      delete h;
    }
    handles->clear();
    delete impl;
    *dbptr = nullptr;
  }
  return s;
}
1859 }  // namespace ROCKSDB_NAMESPACE
1860