1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include <cinttypes>
10
11 #include "db/builder.h"
12 #include "db/db_impl/db_impl.h"
13 #include "db/error_handler.h"
14 #include "db/periodic_work_scheduler.h"
15 #include "env/composite_env_wrapper.h"
16 #include "file/filename.h"
17 #include "file/read_write_util.h"
18 #include "file/sst_file_manager_impl.h"
19 #include "file/writable_file_writer.h"
20 #include "logging/logging.h"
21 #include "monitoring/persistent_stats_history.h"
22 #include "options/options_helper.h"
23 #include "rocksdb/table.h"
24 #include "rocksdb/wal_filter.h"
25 #include "test_util/sync_point.h"
26 #include "util/rate_limiter.h"
27
28 namespace ROCKSDB_NAMESPACE {
// Sanitizes a combined Options object: the DB-wide part and the
// column-family part of `src` are sanitized separately and then merged back
// into a single Options value.
Options SanitizeOptions(const std::string& dbname, const Options& src,
                        bool read_only) {
  // DB-wide options first; the column-family sanitizer consumes their
  // immutable view.
  const DBOptions sanitized_db =
      SanitizeOptions(dbname, DBOptions(src), read_only);
  const ImmutableDBOptions immutable_view(sanitized_db);
  const auto sanitized_cf =
      SanitizeOptions(immutable_view, ColumnFamilyOptions(src));
  return Options(sanitized_db, sanitized_cf);
}
37
// Returns a copy of `src` in which unset or mutually inconsistent DB-wide
// options have been replaced by usable values.
//
// dbname:    path of the DB; used as the default db_paths entry / wal_dir and
//            when creating the info logger.
// src:       the user supplied options (not modified).
// read_only: when true, no info logger is created.
//
// Besides computing the returned options this function has side effects: it
// may create a logger, raise the Env's background thread counts, delete
// leftover "*.log.trash" files in the WAL directory and (in non-LITE builds)
// clean up trash files in every db_path.
DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,
                          bool read_only) {
  DBOptions result(src);

  if (result.env == nullptr) {
    result.env = Env::Default();
  }

  // result.max_open_files == -1 means an "infinite" number of open files, so
  // only explicit limits are clipped.
  if (result.max_open_files != -1) {
    int max_max_open_files = port::GetMaxOpenFiles();
    if (max_max_open_files == -1) {
      max_max_open_files = 0x400000;
    }
    ClipToRange(&result.max_open_files, 20, max_max_open_files);
    TEST_SYNC_POINT_CALLBACK("SanitizeOptions::AfterChangeMaxOpenFiles",
                             &result.max_open_files);
  }

  if (result.info_log == nullptr && !read_only) {
    Status s = CreateLoggerFromOptions(dbname, result, &result.info_log);
    if (!s.ok()) {
      // No place suitable for logging
      result.info_log = nullptr;
    }
  }

  if (!result.write_buffer_manager) {
    result.write_buffer_manager.reset(
        new WriteBufferManager(result.db_write_buffer_size));
  }
  auto bg_job_limits = DBImpl::GetBGJobLimits(
      result.max_background_flushes, result.max_background_compactions,
      result.max_background_jobs, true /* parallelize_compactions */);
  result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions,
                                           Env::Priority::LOW);
  result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes,
                                           Env::Priority::HIGH);

  // With a rate limiter configured, default bytes_per_sync to 1MB if unset.
  if (result.rate_limiter.get() != nullptr) {
    if (result.bytes_per_sync == 0) {
      result.bytes_per_sync = 1024 * 1024;
    }
  }

  // An unset delayed_write_rate is taken from the rate limiter when there is
  // one, and otherwise (or if the limiter reports 0) defaults to 16MB/s.
  if (result.delayed_write_rate == 0) {
    if (result.rate_limiter.get() != nullptr) {
      result.delayed_write_rate = result.rate_limiter->GetBytesPerSecond();
    }
    if (result.delayed_write_rate == 0) {
      result.delayed_write_rate = 16 * 1024 * 1024;
    }
  }

  // WAL archival (TTL or size limit) disables WAL recycling.
  if (result.WAL_ttl_seconds > 0 || result.WAL_size_limit_MB > 0) {
    result.recycle_log_file_num = 0;
  }

  if (result.recycle_log_file_num &&
      (result.wal_recovery_mode ==
           WALRecoveryMode::kTolerateCorruptedTailRecords ||
       result.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery ||
       result.wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency)) {
    // - kTolerateCorruptedTailRecords is inconsistent with recycle log file
    //   feature. WAL recycling expects recovery success upon encountering a
    //   corrupt record at the point where new data ends and recycled data
    //   remains at the tail. However, `kTolerateCorruptedTailRecords` must fail
    //   upon encountering any such corrupt record, as it cannot differentiate
    //   between this and a real corruption, which would cause committed updates
    //   to be truncated -- a violation of the recovery guarantee.
    // - kPointInTimeRecovery and kAbsoluteConsistency are incompatible with
    //   recycle log file feature temporarily due to a bug found introducing a
    //   hole in the recovered data
    //   (https://github.com/facebook/rocksdb/pull/7252#issuecomment-673766236).
    //   Besides this bug, we believe the features are fundamentally compatible.
    result.recycle_log_file_num = 0;
  }

  if (result.db_paths.size() == 0) {
    result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
  } else if (result.wal_dir.empty()) {
    // Use dbname as default
    result.wal_dir = dbname;
  }
  if (!result.wal_dir.empty()) {
    // If there is a wal_dir already set, check to see if the wal_dir is the
    // same as the dbname AND the same as the db_path[0] (which must exist from
    // a few lines ago). If the wal_dir matches both of these values, then clear
    // the wal_dir value, which will make wal_dir == dbname. Most likely this
    // condition was the result of reading an old options file where we forced
    // wal_dir to be set (to dbname).
    auto npath = NormalizePath(dbname + "/");
    if (npath == NormalizePath(result.wal_dir + "/") &&
        npath == NormalizePath(result.db_paths[0].path + "/")) {
      result.wal_dir.clear();
    }
  }

  // Drop a single trailing '/' from wal_dir.
  if (!result.wal_dir.empty() && result.wal_dir.back() == '/') {
    result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
  }

  if (result.use_direct_reads && result.compaction_readahead_size == 0) {
    TEST_SYNC_POINT_CALLBACK("SanitizeOptions:direct_io", nullptr);
    result.compaction_readahead_size = 1024 * 1024 * 2;
  }

  if (result.compaction_readahead_size > 0 || result.use_direct_reads) {
    result.new_table_reader_for_compaction_inputs = true;
  }

  // Force flush on DB open if 2PC is enabled, since with 2PC we have no
  // guarantee that consecutive log files have consecutive sequence id, which
  // make recovery complicated.
  if (result.allow_2pc) {
    result.avoid_flush_during_recovery = false;
  }

#ifndef ROCKSDB_LITE
  ImmutableDBOptions immutable_db_options(result);
  if (!immutable_db_options.IsWalDirSameAsDBPath()) {
    // Either the WAL dir and db_paths[0]/db_name are not the same, or we
    // cannot tell for sure. In either case, assume they're different and
    // explicitly cleanup the trash log files (bypass DeleteScheduler)
    // Do this first so even if we end up calling
    // DeleteScheduler::CleanupDirectory on the same dir later, it will be
    // safe
    std::vector<std::string> filenames;
    auto wal_dir = immutable_db_options.GetWalDir();
    Status s = result.env->GetChildren(wal_dir, &filenames);
    s.PermitUncheckedError();  //**TODO: What to do on error?
    const std::string trash_suffix(".log.trash");
    for (const std::string& filename : filenames) {
      // Explicit "ends with" test; names shorter than the suffix never match.
      const bool is_trash_log =
          filename.size() >= trash_suffix.size() &&
          filename.compare(filename.size() - trash_suffix.size(),
                           trash_suffix.size(), trash_suffix) == 0;
      if (is_trash_log) {
        std::string trash_file = wal_dir + "/" + filename;
        result.env->DeleteFile(trash_file).PermitUncheckedError();
      }
    }
  }
  // When the DB is stopped, it's possible that there are some .trash files that
  // were not deleted yet, when we open the DB we will find these .trash files
  // and schedule them to be deleted (or delete immediately if SstFileManager
  // was not used)
  auto sfm = static_cast<SstFileManagerImpl*>(result.sst_file_manager.get());
  for (size_t i = 0; i < result.db_paths.size(); i++) {
    DeleteScheduler::CleanupDirectory(result.env, sfm, result.db_paths[i].path)
        .PermitUncheckedError();
  }

  // Create a default SstFileManager for purposes of tracking compaction size
  // and facilitating recovery from out of space errors.
  if (result.sst_file_manager.get() == nullptr) {
    std::shared_ptr<SstFileManager> sst_file_manager(
        NewSstFileManager(result.env, result.info_log));
    result.sst_file_manager = sst_file_manager;
  }
#endif  // !ROCKSDB_LITE

  if (!result.paranoid_checks) {
    result.skip_checking_sst_file_sizes_on_db_open = true;
    ROCKS_LOG_INFO(result.info_log,
                   "file size check will be skipped during open.");
  }

  return result;
}
205
206 namespace {
ValidateOptionsByTable(const DBOptions & db_opts,const std::vector<ColumnFamilyDescriptor> & column_families)207 Status ValidateOptionsByTable(
208 const DBOptions& db_opts,
209 const std::vector<ColumnFamilyDescriptor>& column_families) {
210 Status s;
211 for (auto cf : column_families) {
212 s = ValidateOptions(db_opts, cf.options);
213 if (!s.ok()) {
214 return s;
215 }
216 }
217 return Status::OK();
218 }
219 } // namespace
220
ValidateOptions(const DBOptions & db_options,const std::vector<ColumnFamilyDescriptor> & column_families)221 Status DBImpl::ValidateOptions(
222 const DBOptions& db_options,
223 const std::vector<ColumnFamilyDescriptor>& column_families) {
224 Status s;
225 for (auto& cfd : column_families) {
226 s = ColumnFamilyData::ValidateOptions(db_options, cfd.options);
227 if (!s.ok()) {
228 return s;
229 }
230 }
231 s = ValidateOptions(db_options);
232 return s;
233 }
234
ValidateOptions(const DBOptions & db_options)235 Status DBImpl::ValidateOptions(const DBOptions& db_options) {
236 if (db_options.db_paths.size() > 4) {
237 return Status::NotSupported(
238 "More than four DB paths are not supported yet. ");
239 }
240
241 if (db_options.allow_mmap_reads && db_options.use_direct_reads) {
242 // Protect against assert in PosixMMapReadableFile constructor
243 return Status::NotSupported(
244 "If memory mapped reads (allow_mmap_reads) are enabled "
245 "then direct I/O reads (use_direct_reads) must be disabled. ");
246 }
247
248 if (db_options.allow_mmap_writes &&
249 db_options.use_direct_io_for_flush_and_compaction) {
250 return Status::NotSupported(
251 "If memory mapped writes (allow_mmap_writes) are enabled "
252 "then direct I/O writes (use_direct_io_for_flush_and_compaction) must "
253 "be disabled. ");
254 }
255
256 if (db_options.keep_log_file_num == 0) {
257 return Status::InvalidArgument("keep_log_file_num must be greater than 0");
258 }
259
260 if (db_options.unordered_write &&
261 !db_options.allow_concurrent_memtable_write) {
262 return Status::InvalidArgument(
263 "unordered_write is incompatible with !allow_concurrent_memtable_write");
264 }
265
266 if (db_options.unordered_write && db_options.enable_pipelined_write) {
267 return Status::InvalidArgument(
268 "unordered_write is incompatible with enable_pipelined_write");
269 }
270
271 if (db_options.atomic_flush && db_options.enable_pipelined_write) {
272 return Status::InvalidArgument(
273 "atomic_flush is incompatible with enable_pipelined_write");
274 }
275
276 // TODO remove this restriction
277 if (db_options.atomic_flush && db_options.best_efforts_recovery) {
278 return Status::InvalidArgument(
279 "atomic_flush is currently incompatible with best-efforts recovery");
280 }
281
282 return Status::OK();
283 }
284
NewDB(std::vector<std::string> * new_filenames)285 Status DBImpl::NewDB(std::vector<std::string>* new_filenames) {
286 VersionEdit new_db;
287 Status s = SetIdentityFile(env_, dbname_);
288 if (!s.ok()) {
289 return s;
290 }
291 if (immutable_db_options_.write_dbid_to_manifest) {
292 std::string temp_db_id;
293 GetDbIdentityFromIdentityFile(&temp_db_id);
294 new_db.SetDBId(temp_db_id);
295 }
296 new_db.SetLogNumber(0);
297 new_db.SetNextFile(2);
298 new_db.SetLastSequence(0);
299
300 ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
301 const std::string manifest = DescriptorFileName(dbname_, 1);
302 {
303 if (fs_->FileExists(manifest, IOOptions(), nullptr).ok()) {
304 fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
305 }
306 std::unique_ptr<FSWritableFile> file;
307 FileOptions file_options = fs_->OptimizeForManifestWrite(file_options_);
308 s = NewWritableFile(fs_.get(), manifest, &file, file_options);
309 if (!s.ok()) {
310 return s;
311 }
312 FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types;
313 file->SetPreallocationBlockSize(
314 immutable_db_options_.manifest_preallocation_size);
315 std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
316 std::move(file), manifest, file_options, immutable_db_options_.clock,
317 io_tracer_, nullptr /* stats */, immutable_db_options_.listeners,
318 nullptr, tmp_set.Contains(FileType::kDescriptorFile),
319 tmp_set.Contains(FileType::kDescriptorFile)));
320 log::Writer log(std::move(file_writer), 0, false);
321 std::string record;
322 new_db.EncodeTo(&record);
323 s = log.AddRecord(record);
324 if (s.ok()) {
325 s = SyncManifest(&immutable_db_options_, log.file());
326 }
327 }
328 if (s.ok()) {
329 // Make "CURRENT" file that points to the new manifest file.
330 s = SetCurrentFile(fs_.get(), dbname_, 1, directories_.GetDbDir());
331 if (new_filenames) {
332 new_filenames->emplace_back(
333 manifest.substr(manifest.find_last_of("/\\") + 1));
334 }
335 } else {
336 fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
337 }
338 return s;
339 }
340
CreateAndNewDirectory(FileSystem * fs,const std::string & dirname,std::unique_ptr<FSDirectory> * directory)341 IOStatus DBImpl::CreateAndNewDirectory(
342 FileSystem* fs, const std::string& dirname,
343 std::unique_ptr<FSDirectory>* directory) {
344 // We call CreateDirIfMissing() as the directory may already exist (if we
345 // are reopening a DB), when this happens we don't want creating the
346 // directory to cause an error. However, we need to check if creating the
347 // directory fails or else we may get an obscure message about the lock
348 // file not existing. One real-world example of this occurring is if
349 // env->CreateDirIfMissing() doesn't create intermediate directories, e.g.
350 // when dbname_ is "dir/db" but when "dir" doesn't exist.
351 IOStatus io_s = fs->CreateDirIfMissing(dirname, IOOptions(), nullptr);
352 if (!io_s.ok()) {
353 return io_s;
354 }
355 return fs->NewDirectory(dirname, IOOptions(), directory, nullptr);
356 }
357
SetDirectories(FileSystem * fs,const std::string & dbname,const std::string & wal_dir,const std::vector<DbPath> & data_paths)358 IOStatus Directories::SetDirectories(FileSystem* fs, const std::string& dbname,
359 const std::string& wal_dir,
360 const std::vector<DbPath>& data_paths) {
361 IOStatus io_s = DBImpl::CreateAndNewDirectory(fs, dbname, &db_dir_);
362 if (!io_s.ok()) {
363 return io_s;
364 }
365 if (!wal_dir.empty() && dbname != wal_dir) {
366 io_s = DBImpl::CreateAndNewDirectory(fs, wal_dir, &wal_dir_);
367 if (!io_s.ok()) {
368 return io_s;
369 }
370 }
371
372 data_dirs_.clear();
373 for (auto& p : data_paths) {
374 const std::string db_path = p.path;
375 if (db_path == dbname) {
376 data_dirs_.emplace_back(nullptr);
377 } else {
378 std::unique_ptr<FSDirectory> path_directory;
379 io_s = DBImpl::CreateAndNewDirectory(fs, db_path, &path_directory);
380 if (!io_s.ok()) {
381 return io_s;
382 }
383 data_dirs_.emplace_back(path_directory.release());
384 }
385 }
386 assert(data_dirs_.size() == data_paths.size());
387 return IOStatus::OK();
388 }
389
Recover(const std::vector<ColumnFamilyDescriptor> & column_families,bool read_only,bool error_if_wal_file_exists,bool error_if_data_exists_in_wals,uint64_t * recovered_seq)390 Status DBImpl::Recover(
391 const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
392 bool error_if_wal_file_exists, bool error_if_data_exists_in_wals,
393 uint64_t* recovered_seq) {
394 mutex_.AssertHeld();
395
396 bool is_new_db = false;
397 assert(db_lock_ == nullptr);
398 std::vector<std::string> files_in_dbname;
399 if (!read_only) {
400 Status s = directories_.SetDirectories(fs_.get(), dbname_,
401 immutable_db_options_.wal_dir,
402 immutable_db_options_.db_paths);
403 if (!s.ok()) {
404 return s;
405 }
406
407 s = env_->LockFile(LockFileName(dbname_), &db_lock_);
408 if (!s.ok()) {
409 return s;
410 }
411
412 std::string current_fname = CurrentFileName(dbname_);
413 // Path to any MANIFEST file in the db dir. It does not matter which one.
414 // Since best-efforts recovery ignores CURRENT file, existence of a
415 // MANIFEST indicates the recovery to recover existing db. If no MANIFEST
416 // can be found, a new db will be created.
417 std::string manifest_path;
418 if (!immutable_db_options_.best_efforts_recovery) {
419 s = env_->FileExists(current_fname);
420 } else {
421 s = Status::NotFound();
422 Status io_s = env_->GetChildren(dbname_, &files_in_dbname);
423 if (!io_s.ok()) {
424 s = io_s;
425 files_in_dbname.clear();
426 }
427 for (const std::string& file : files_in_dbname) {
428 uint64_t number = 0;
429 FileType type = kWalFile; // initialize
430 if (ParseFileName(file, &number, &type) && type == kDescriptorFile) {
431 // Found MANIFEST (descriptor log), thus best-efforts recovery does
432 // not have to treat the db as empty.
433 s = Status::OK();
434 manifest_path = dbname_ + "/" + file;
435 break;
436 }
437 }
438 }
439 if (s.IsNotFound()) {
440 if (immutable_db_options_.create_if_missing) {
441 s = NewDB(&files_in_dbname);
442 is_new_db = true;
443 if (!s.ok()) {
444 return s;
445 }
446 } else {
447 return Status::InvalidArgument(
448 current_fname, "does not exist (create_if_missing is false)");
449 }
450 } else if (s.ok()) {
451 if (immutable_db_options_.error_if_exists) {
452 return Status::InvalidArgument(dbname_,
453 "exists (error_if_exists is true)");
454 }
455 } else {
456 // Unexpected error reading file
457 assert(s.IsIOError());
458 return s;
459 }
460 // Verify compatibility of file_options_ and filesystem
461 {
462 std::unique_ptr<FSRandomAccessFile> idfile;
463 FileOptions customized_fs(file_options_);
464 customized_fs.use_direct_reads |=
465 immutable_db_options_.use_direct_io_for_flush_and_compaction;
466 const std::string& fname =
467 manifest_path.empty() ? current_fname : manifest_path;
468 s = fs_->NewRandomAccessFile(fname, customized_fs, &idfile, nullptr);
469 if (!s.ok()) {
470 std::string error_str = s.ToString();
471 // Check if unsupported Direct I/O is the root cause
472 customized_fs.use_direct_reads = false;
473 s = fs_->NewRandomAccessFile(fname, customized_fs, &idfile, nullptr);
474 if (s.ok()) {
475 return Status::InvalidArgument(
476 "Direct I/O is not supported by the specified DB.");
477 } else {
478 return Status::InvalidArgument(
479 "Found options incompatible with filesystem", error_str.c_str());
480 }
481 }
482 }
483 } else if (immutable_db_options_.best_efforts_recovery) {
484 assert(files_in_dbname.empty());
485 Status s = env_->GetChildren(dbname_, &files_in_dbname);
486 if (s.IsNotFound()) {
487 return Status::InvalidArgument(dbname_,
488 "does not exist (open for read only)");
489 } else if (s.IsIOError()) {
490 return s;
491 }
492 assert(s.ok());
493 }
494 assert(db_id_.empty());
495 Status s;
496 bool missing_table_file = false;
497 if (!immutable_db_options_.best_efforts_recovery) {
498 s = versions_->Recover(column_families, read_only, &db_id_);
499 } else {
500 assert(!files_in_dbname.empty());
501 s = versions_->TryRecover(column_families, read_only, files_in_dbname,
502 &db_id_, &missing_table_file);
503 if (s.ok()) {
504 // TryRecover may delete previous column_family_set_.
505 column_family_memtables_.reset(
506 new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));
507 }
508 }
509 if (!s.ok()) {
510 return s;
511 }
512 s = SetDBId(read_only);
513 if (s.ok() && !read_only) {
514 s = DeleteUnreferencedSstFiles();
515 }
516
517 if (immutable_db_options_.paranoid_checks && s.ok()) {
518 s = CheckConsistency();
519 }
520 if (s.ok() && !read_only) {
521 std::map<std::string, std::shared_ptr<FSDirectory>> created_dirs;
522 for (auto cfd : *versions_->GetColumnFamilySet()) {
523 s = cfd->AddDirectories(&created_dirs);
524 if (!s.ok()) {
525 return s;
526 }
527 }
528 }
529 // DB mutex is already held
530 if (s.ok() && immutable_db_options_.persist_stats_to_disk) {
531 s = InitPersistStatsColumnFamily();
532 }
533
534 std::vector<std::string> files_in_wal_dir;
535 if (s.ok()) {
536 // Initial max_total_in_memory_state_ before recovery wals. Log recovery
537 // may check this value to decide whether to flush.
538 max_total_in_memory_state_ = 0;
539 for (auto cfd : *versions_->GetColumnFamilySet()) {
540 auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
541 max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
542 mutable_cf_options->max_write_buffer_number;
543 }
544
545 SequenceNumber next_sequence(kMaxSequenceNumber);
546 default_cf_handle_ = new ColumnFamilyHandleImpl(
547 versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
548 default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
549 // TODO(Zhongyi): handle single_column_family_mode_ when
550 // persistent_stats is enabled
551 single_column_family_mode_ =
552 versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
553
554 // Recover from all newer log files than the ones named in the
555 // descriptor (new log files may have been added by the previous
556 // incarnation without registering them in the descriptor).
557 //
558 // Note that prev_log_number() is no longer used, but we pay
559 // attention to it in case we are recovering a database
560 // produced by an older version of rocksdb.
561 auto wal_dir = immutable_db_options_.GetWalDir();
562 if (!immutable_db_options_.best_efforts_recovery) {
563 s = env_->GetChildren(wal_dir, &files_in_wal_dir);
564 }
565 if (s.IsNotFound()) {
566 return Status::InvalidArgument("wal_dir not found", wal_dir);
567 } else if (!s.ok()) {
568 return s;
569 }
570
571 std::unordered_map<uint64_t, std::string> wal_files;
572 for (const auto& file : files_in_wal_dir) {
573 uint64_t number;
574 FileType type;
575 if (ParseFileName(file, &number, &type) && type == kWalFile) {
576 if (is_new_db) {
577 return Status::Corruption(
578 "While creating a new Db, wal_dir contains "
579 "existing log file: ",
580 file);
581 } else {
582 wal_files[number] = LogFileName(wal_dir, number);
583 }
584 }
585 }
586
587 if (immutable_db_options_.track_and_verify_wals_in_manifest) {
588 if (!immutable_db_options_.best_efforts_recovery) {
589 // Verify WALs in MANIFEST.
590 s = versions_->GetWalSet().CheckWals(env_, wal_files);
591 } // else since best effort recovery does not recover from WALs, no need
592 // to check WALs.
593 } else if (!versions_->GetWalSet().GetWals().empty()) {
594 // Tracking is disabled, clear previously tracked WALs from MANIFEST,
595 // otherwise, in the future, if WAL tracking is enabled again,
596 // since the WALs deleted when WAL tracking is disabled are not persisted
597 // into MANIFEST, WAL check may fail.
598 VersionEdit edit;
599 WalNumber max_wal_number =
600 versions_->GetWalSet().GetWals().rbegin()->first;
601 edit.DeleteWalsBefore(max_wal_number + 1);
602 s = versions_->LogAndApplyToDefaultColumnFamily(&edit, &mutex_);
603 }
604 if (!s.ok()) {
605 return s;
606 }
607
608 if (!wal_files.empty()) {
609 if (error_if_wal_file_exists) {
610 return Status::Corruption(
611 "The db was opened in readonly mode with error_if_wal_file_exists"
612 "flag but a WAL file already exists");
613 } else if (error_if_data_exists_in_wals) {
614 for (auto& wal_file : wal_files) {
615 uint64_t bytes;
616 s = env_->GetFileSize(wal_file.second, &bytes);
617 if (s.ok()) {
618 if (bytes > 0) {
619 return Status::Corruption(
620 "error_if_data_exists_in_wals is set but there are data "
621 " in WAL files.");
622 }
623 }
624 }
625 }
626 }
627
628 if (!wal_files.empty()) {
629 // Recover in the order in which the wals were generated
630 std::vector<uint64_t> wals;
631 wals.reserve(wal_files.size());
632 for (const auto& wal_file : wal_files) {
633 wals.push_back(wal_file.first);
634 }
635 std::sort(wals.begin(), wals.end());
636
637 bool corrupted_wal_found = false;
638 s = RecoverLogFiles(wals, &next_sequence, read_only,
639 &corrupted_wal_found);
640 if (corrupted_wal_found && recovered_seq != nullptr) {
641 *recovered_seq = next_sequence;
642 }
643 if (!s.ok()) {
644 // Clear memtables if recovery failed
645 for (auto cfd : *versions_->GetColumnFamilySet()) {
646 cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
647 kMaxSequenceNumber);
648 }
649 }
650 }
651 }
652
653 if (read_only) {
654 // If we are opening as read-only, we need to update options_file_number_
655 // to reflect the most recent OPTIONS file. It does not matter for regular
656 // read-write db instance because options_file_number_ will later be
657 // updated to versions_->NewFileNumber() in RenameTempFileToOptionsFile.
658 std::vector<std::string> filenames;
659 if (s.ok()) {
660 const std::string normalized_dbname = NormalizePath(dbname_);
661 const std::string normalized_wal_dir =
662 NormalizePath(immutable_db_options_.GetWalDir());
663 if (immutable_db_options_.best_efforts_recovery) {
664 filenames = std::move(files_in_dbname);
665 } else if (normalized_dbname == normalized_wal_dir) {
666 filenames = std::move(files_in_wal_dir);
667 } else {
668 s = env_->GetChildren(GetName(), &filenames);
669 }
670 }
671 if (s.ok()) {
672 uint64_t number = 0;
673 uint64_t options_file_number = 0;
674 FileType type;
675 for (const auto& fname : filenames) {
676 if (ParseFileName(fname, &number, &type) && type == kOptionsFile) {
677 options_file_number = std::max(number, options_file_number);
678 }
679 }
680 versions_->options_file_number_ = options_file_number;
681 uint64_t options_file_size = 0;
682 if (options_file_number > 0) {
683 s = env_->GetFileSize(OptionsFileName(GetName(), options_file_number),
684 &options_file_size);
685 }
686 versions_->options_file_size_ = options_file_size;
687 }
688 }
689 return s;
690 }
691
PersistentStatsProcessFormatVersion()692 Status DBImpl::PersistentStatsProcessFormatVersion() {
693 mutex_.AssertHeld();
694 Status s;
695 // persist version when stats CF doesn't exist
696 bool should_persist_format_version = !persistent_stats_cfd_exists_;
697 mutex_.Unlock();
698 if (persistent_stats_cfd_exists_) {
699 // Check persistent stats format version compatibility. Drop and recreate
700 // persistent stats CF if format version is incompatible
701 uint64_t format_version_recovered = 0;
702 Status s_format = DecodePersistentStatsVersionNumber(
703 this, StatsVersionKeyType::kFormatVersion, &format_version_recovered);
704 uint64_t compatible_version_recovered = 0;
705 Status s_compatible = DecodePersistentStatsVersionNumber(
706 this, StatsVersionKeyType::kCompatibleVersion,
707 &compatible_version_recovered);
708 // abort reading from existing stats CF if any of following is true:
709 // 1. failed to read format version or compatible version from disk
710 // 2. sst's format version is greater than current format version, meaning
711 // this sst is encoded with a newer RocksDB release, and current compatible
712 // version is below the sst's compatible version
713 if (!s_format.ok() || !s_compatible.ok() ||
714 (kStatsCFCurrentFormatVersion < format_version_recovered &&
715 kStatsCFCompatibleFormatVersion < compatible_version_recovered)) {
716 if (!s_format.ok() || !s_compatible.ok()) {
717 ROCKS_LOG_WARN(
718 immutable_db_options_.info_log,
719 "Recreating persistent stats column family since reading "
720 "persistent stats version key failed. Format key: %s, compatible "
721 "key: %s",
722 s_format.ToString().c_str(), s_compatible.ToString().c_str());
723 } else {
724 ROCKS_LOG_WARN(
725 immutable_db_options_.info_log,
726 "Recreating persistent stats column family due to corrupted or "
727 "incompatible format version. Recovered format: %" PRIu64
728 "; recovered format compatible since: %" PRIu64 "\n",
729 format_version_recovered, compatible_version_recovered);
730 }
731 s = DropColumnFamily(persist_stats_cf_handle_);
732 if (s.ok()) {
733 s = DestroyColumnFamilyHandle(persist_stats_cf_handle_);
734 }
735 ColumnFamilyHandle* handle = nullptr;
736 if (s.ok()) {
737 ColumnFamilyOptions cfo;
738 OptimizeForPersistentStats(&cfo);
739 s = CreateColumnFamily(cfo, kPersistentStatsColumnFamilyName, &handle);
740 }
741 if (s.ok()) {
742 persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
743 // should also persist version here because old stats CF is discarded
744 should_persist_format_version = true;
745 }
746 }
747 }
748 if (should_persist_format_version) {
749 // Persistent stats CF being created for the first time, need to write
750 // format version key
751 WriteBatch batch;
752 if (s.ok()) {
753 s = batch.Put(persist_stats_cf_handle_, kFormatVersionKeyString,
754 ToString(kStatsCFCurrentFormatVersion));
755 }
756 if (s.ok()) {
757 s = batch.Put(persist_stats_cf_handle_, kCompatibleVersionKeyString,
758 ToString(kStatsCFCompatibleFormatVersion));
759 }
760 if (s.ok()) {
761 WriteOptions wo;
762 wo.low_pri = true;
763 wo.no_slowdown = true;
764 wo.sync = false;
765 s = Write(wo, &batch);
766 }
767 }
768 mutex_.Lock();
769 return s;
770 }
771
InitPersistStatsColumnFamily()772 Status DBImpl::InitPersistStatsColumnFamily() {
773 mutex_.AssertHeld();
774 assert(!persist_stats_cf_handle_);
775 ColumnFamilyData* persistent_stats_cfd =
776 versions_->GetColumnFamilySet()->GetColumnFamily(
777 kPersistentStatsColumnFamilyName);
778 persistent_stats_cfd_exists_ = persistent_stats_cfd != nullptr;
779
780 Status s;
781 if (persistent_stats_cfd != nullptr) {
782 // We are recovering from a DB which already contains persistent stats CF,
783 // the CF is already created in VersionSet::ApplyOneVersionEdit, but
784 // column family handle was not. Need to explicitly create handle here.
785 persist_stats_cf_handle_ =
786 new ColumnFamilyHandleImpl(persistent_stats_cfd, this, &mutex_);
787 } else {
788 mutex_.Unlock();
789 ColumnFamilyHandle* handle = nullptr;
790 ColumnFamilyOptions cfo;
791 OptimizeForPersistentStats(&cfo);
792 s = CreateColumnFamily(cfo, kPersistentStatsColumnFamilyName, &handle);
793 persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
794 mutex_.Lock();
795 }
796 return s;
797 }
798
799 // REQUIRES: wal_numbers are sorted in ascending order
RecoverLogFiles(const std::vector<uint64_t> & wal_numbers,SequenceNumber * next_sequence,bool read_only,bool * corrupted_wal_found)800 Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
801 SequenceNumber* next_sequence, bool read_only,
802 bool* corrupted_wal_found) {
803 struct LogReporter : public log::Reader::Reporter {
804 Env* env;
805 Logger* info_log;
806 const char* fname;
807 Status* status; // nullptr if immutable_db_options_.paranoid_checks==false
808 void Corruption(size_t bytes, const Status& s) override {
809 ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
810 (status == nullptr ? "(ignoring error) " : ""), fname,
811 static_cast<int>(bytes), s.ToString().c_str());
812 if (status != nullptr && status->ok()) {
813 *status = s;
814 }
815 }
816 };
817
818 mutex_.AssertHeld();
819 Status status;
820 std::unordered_map<int, VersionEdit> version_edits;
821 // no need to refcount because iteration is under mutex
822 for (auto cfd : *versions_->GetColumnFamilySet()) {
823 VersionEdit edit;
824 edit.SetColumnFamily(cfd->GetID());
825 version_edits.insert({cfd->GetID(), edit});
826 }
827 int job_id = next_job_id_.fetch_add(1);
828 {
829 auto stream = event_logger_.Log();
830 stream << "job" << job_id << "event"
831 << "recovery_started";
832 stream << "wal_files";
833 stream.StartArray();
834 for (auto wal_number : wal_numbers) {
835 stream << wal_number;
836 }
837 stream.EndArray();
838 }
839
840 #ifndef ROCKSDB_LITE
841 if (immutable_db_options_.wal_filter != nullptr) {
842 std::map<std::string, uint32_t> cf_name_id_map;
843 std::map<uint32_t, uint64_t> cf_lognumber_map;
844 for (auto cfd : *versions_->GetColumnFamilySet()) {
845 cf_name_id_map.insert(std::make_pair(cfd->GetName(), cfd->GetID()));
846 cf_lognumber_map.insert(
847 std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
848 }
849
850 immutable_db_options_.wal_filter->ColumnFamilyLogNumberMap(cf_lognumber_map,
851 cf_name_id_map);
852 }
853 #endif
854
855 bool stop_replay_by_wal_filter = false;
856 bool stop_replay_for_corruption = false;
857 bool flushed = false;
858 uint64_t corrupted_wal_number = kMaxSequenceNumber;
859 uint64_t min_wal_number = MinLogNumberToKeep();
860 for (auto wal_number : wal_numbers) {
861 if (wal_number < min_wal_number) {
862 ROCKS_LOG_INFO(immutable_db_options_.info_log,
863 "Skipping log #%" PRIu64
864 " since it is older than min log to keep #%" PRIu64,
865 wal_number, min_wal_number);
866 continue;
867 }
868 // The previous incarnation may not have written any MANIFEST
869 // records after allocating this log number. So we manually
870 // update the file number allocation counter in VersionSet.
871 versions_->MarkFileNumberUsed(wal_number);
872 // Open the log file
873 std::string fname =
874 LogFileName(immutable_db_options_.GetWalDir(), wal_number);
875
876 ROCKS_LOG_INFO(immutable_db_options_.info_log,
877 "Recovering log #%" PRIu64 " mode %d", wal_number,
878 static_cast<int>(immutable_db_options_.wal_recovery_mode));
879 auto logFileDropped = [this, &fname]() {
880 uint64_t bytes;
881 if (env_->GetFileSize(fname, &bytes).ok()) {
882 auto info_log = immutable_db_options_.info_log.get();
883 ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes", fname.c_str(),
884 static_cast<int>(bytes));
885 }
886 };
887 if (stop_replay_by_wal_filter) {
888 logFileDropped();
889 continue;
890 }
891
892 std::unique_ptr<SequentialFileReader> file_reader;
893 {
894 std::unique_ptr<FSSequentialFile> file;
895 status = fs_->NewSequentialFile(fname,
896 fs_->OptimizeForLogRead(file_options_),
897 &file, nullptr);
898 if (!status.ok()) {
899 MaybeIgnoreError(&status);
900 if (!status.ok()) {
901 return status;
902 } else {
903 // Fail with one log file, but that's ok.
904 // Try next one.
905 continue;
906 }
907 }
908 file_reader.reset(new SequentialFileReader(
909 std::move(file), fname, immutable_db_options_.log_readahead_size,
910 io_tracer_));
911 }
912
913 // Create the log reader.
914 LogReporter reporter;
915 reporter.env = env_;
916 reporter.info_log = immutable_db_options_.info_log.get();
917 reporter.fname = fname.c_str();
918 if (!immutable_db_options_.paranoid_checks ||
919 immutable_db_options_.wal_recovery_mode ==
920 WALRecoveryMode::kSkipAnyCorruptedRecords) {
921 reporter.status = nullptr;
922 } else {
923 reporter.status = &status;
924 }
925 // We intentially make log::Reader do checksumming even if
926 // paranoid_checks==false so that corruptions cause entire commits
927 // to be skipped instead of propagating bad information (like overly
928 // large sequence numbers).
929 log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
930 &reporter, true /*checksum*/, wal_number);
931
932 // Determine if we should tolerate incomplete records at the tail end of the
933 // Read all the records and add to a memtable
934 std::string scratch;
935 Slice record;
936 WriteBatch batch;
937
938 TEST_SYNC_POINT_CALLBACK("DBImpl::RecoverLogFiles:BeforeReadWal",
939 /*arg=*/nullptr);
940 while (!stop_replay_by_wal_filter &&
941 reader.ReadRecord(&record, &scratch,
942 immutable_db_options_.wal_recovery_mode) &&
943 status.ok()) {
944 if (record.size() < WriteBatchInternal::kHeader) {
945 reporter.Corruption(record.size(),
946 Status::Corruption("log record too small"));
947 continue;
948 }
949
950 status = WriteBatchInternal::SetContents(&batch, record);
951 if (!status.ok()) {
952 return status;
953 }
954 SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);
955
956 if (immutable_db_options_.wal_recovery_mode ==
957 WALRecoveryMode::kPointInTimeRecovery) {
958 // In point-in-time recovery mode, if sequence id of log files are
959 // consecutive, we continue recovery despite corruption. This could
960 // happen when we open and write to a corrupted DB, where sequence id
961 // will start from the last sequence id we recovered.
962 if (sequence == *next_sequence) {
963 stop_replay_for_corruption = false;
964 }
965 if (stop_replay_for_corruption) {
966 logFileDropped();
967 break;
968 }
969 }
970
971 #ifndef ROCKSDB_LITE
972 if (immutable_db_options_.wal_filter != nullptr) {
973 WriteBatch new_batch;
974 bool batch_changed = false;
975
976 WalFilter::WalProcessingOption wal_processing_option =
977 immutable_db_options_.wal_filter->LogRecordFound(
978 wal_number, fname, batch, &new_batch, &batch_changed);
979
980 switch (wal_processing_option) {
981 case WalFilter::WalProcessingOption::kContinueProcessing:
982 // do nothing, proceeed normally
983 break;
984 case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
985 // skip current record
986 continue;
987 case WalFilter::WalProcessingOption::kStopReplay:
988 // skip current record and stop replay
989 stop_replay_by_wal_filter = true;
990 continue;
991 case WalFilter::WalProcessingOption::kCorruptedRecord: {
992 status =
993 Status::Corruption("Corruption reported by Wal Filter ",
994 immutable_db_options_.wal_filter->Name());
995 MaybeIgnoreError(&status);
996 if (!status.ok()) {
997 reporter.Corruption(record.size(), status);
998 continue;
999 }
1000 break;
1001 }
1002 default: {
1003 assert(false); // unhandled case
1004 status = Status::NotSupported(
1005 "Unknown WalProcessingOption returned"
1006 " by Wal Filter ",
1007 immutable_db_options_.wal_filter->Name());
1008 MaybeIgnoreError(&status);
1009 if (!status.ok()) {
1010 return status;
1011 } else {
1012 // Ignore the error with current record processing.
1013 continue;
1014 }
1015 }
1016 }
1017
1018 if (batch_changed) {
1019 // Make sure that the count in the new batch is
1020 // within the orignal count.
1021 int new_count = WriteBatchInternal::Count(&new_batch);
1022 int original_count = WriteBatchInternal::Count(&batch);
1023 if (new_count > original_count) {
1024 ROCKS_LOG_FATAL(
1025 immutable_db_options_.info_log,
1026 "Recovering log #%" PRIu64
1027 " mode %d log filter %s returned "
1028 "more records (%d) than original (%d) which is not allowed. "
1029 "Aborting recovery.",
1030 wal_number,
1031 static_cast<int>(immutable_db_options_.wal_recovery_mode),
1032 immutable_db_options_.wal_filter->Name(), new_count,
1033 original_count);
1034 status = Status::NotSupported(
1035 "More than original # of records "
1036 "returned by Wal Filter ",
1037 immutable_db_options_.wal_filter->Name());
1038 return status;
1039 }
1040 // Set the same sequence number in the new_batch
1041 // as the original batch.
1042 WriteBatchInternal::SetSequence(&new_batch,
1043 WriteBatchInternal::Sequence(&batch));
1044 batch = new_batch;
1045 }
1046 }
1047 #endif // ROCKSDB_LITE
1048
1049 // If column family was not found, it might mean that the WAL write
1050 // batch references to the column family that was dropped after the
1051 // insert. We don't want to fail the whole write batch in that case --
1052 // we just ignore the update.
1053 // That's why we set ignore missing column families to true
1054 bool has_valid_writes = false;
1055 status = WriteBatchInternal::InsertInto(
1056 &batch, column_family_memtables_.get(), &flush_scheduler_,
1057 &trim_history_scheduler_, true, wal_number, this,
1058 false /* concurrent_memtable_writes */, next_sequence,
1059 &has_valid_writes, seq_per_batch_, batch_per_txn_);
1060 MaybeIgnoreError(&status);
1061 if (!status.ok()) {
1062 // We are treating this as a failure while reading since we read valid
1063 // blocks that do not form coherent data
1064 reporter.Corruption(record.size(), status);
1065 continue;
1066 }
1067
1068 if (has_valid_writes && !read_only) {
1069 // we can do this because this is called before client has access to the
1070 // DB and there is only a single thread operating on DB
1071 ColumnFamilyData* cfd;
1072
1073 while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
1074 cfd->UnrefAndTryDelete();
1075 // If this asserts, it means that InsertInto failed in
1076 // filtering updates to already-flushed column families
1077 assert(cfd->GetLogNumber() <= wal_number);
1078 auto iter = version_edits.find(cfd->GetID());
1079 assert(iter != version_edits.end());
1080 VersionEdit* edit = &iter->second;
1081 status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
1082 if (!status.ok()) {
1083 // Reflect errors immediately so that conditions like full
1084 // file-systems cause the DB::Open() to fail.
1085 return status;
1086 }
1087 flushed = true;
1088
1089 cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
1090 *next_sequence);
1091 }
1092 }
1093 }
1094
1095 if (!status.ok()) {
1096 if (status.IsNotSupported()) {
1097 // We should not treat NotSupported as corruption. It is rather a clear
1098 // sign that we are processing a WAL that is produced by an incompatible
1099 // version of the code.
1100 return status;
1101 }
1102 if (immutable_db_options_.wal_recovery_mode ==
1103 WALRecoveryMode::kSkipAnyCorruptedRecords) {
1104 // We should ignore all errors unconditionally
1105 status = Status::OK();
1106 } else if (immutable_db_options_.wal_recovery_mode ==
1107 WALRecoveryMode::kPointInTimeRecovery) {
1108 if (status.IsIOError()) {
1109 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
1110 "IOError during point-in-time reading log #%" PRIu64
1111 " seq #%" PRIu64
1112 ". %s. This likely mean loss of synced WAL, "
1113 "thus recovery fails.",
1114 wal_number, *next_sequence,
1115 status.ToString().c_str());
1116 return status;
1117 }
1118 // We should ignore the error but not continue replaying
1119 status = Status::OK();
1120 stop_replay_for_corruption = true;
1121 corrupted_wal_number = wal_number;
1122 if (corrupted_wal_found != nullptr) {
1123 *corrupted_wal_found = true;
1124 }
1125 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1126 "Point in time recovered to log #%" PRIu64
1127 " seq #%" PRIu64,
1128 wal_number, *next_sequence);
1129 } else {
1130 assert(immutable_db_options_.wal_recovery_mode ==
1131 WALRecoveryMode::kTolerateCorruptedTailRecords ||
1132 immutable_db_options_.wal_recovery_mode ==
1133 WALRecoveryMode::kAbsoluteConsistency);
1134 return status;
1135 }
1136 }
1137
1138 flush_scheduler_.Clear();
1139 trim_history_scheduler_.Clear();
1140 auto last_sequence = *next_sequence - 1;
1141 if ((*next_sequence != kMaxSequenceNumber) &&
1142 (versions_->LastSequence() <= last_sequence)) {
1143 versions_->SetLastAllocatedSequence(last_sequence);
1144 versions_->SetLastPublishedSequence(last_sequence);
1145 versions_->SetLastSequence(last_sequence);
1146 }
1147 }
1148 // Compare the corrupted log number to all columnfamily's current log number.
1149 // Abort Open() if any column family's log number is greater than
1150 // the corrupted log number, which means CF contains data beyond the point of
1151 // corruption. This could during PIT recovery when the WAL is corrupted and
1152 // some (but not all) CFs are flushed
1153 // Exclude the PIT case where no log is dropped after the corruption point.
1154 // This is to cover the case for empty wals after corrupted log, in which we
1155 // don't reset stop_replay_for_corruption.
1156 if (stop_replay_for_corruption == true &&
1157 (immutable_db_options_.wal_recovery_mode ==
1158 WALRecoveryMode::kPointInTimeRecovery ||
1159 immutable_db_options_.wal_recovery_mode ==
1160 WALRecoveryMode::kTolerateCorruptedTailRecords)) {
1161 for (auto cfd : *versions_->GetColumnFamilySet()) {
1162 // One special case cause cfd->GetLogNumber() > corrupted_wal_number but
1163 // the CF is still consistent: If a new column family is created during
1164 // the flush and the WAL sync fails at the same time, the new CF points to
1165 // the new WAL but the old WAL is curropted. Since the new CF is empty, it
1166 // is still consistent. We add the check of CF sst file size to avoid the
1167 // false positive alert.
1168
1169 // Note that, the check of (cfd->GetLiveSstFilesSize() > 0) may leads to
1170 // the ignorance of a very rare inconsistency case caused in data
1171 // canclation. One CF is empty due to KV deletion. But those operations
1172 // are in the WAL. If the WAL is corrupted, the status of this CF might
1173 // not be consistent with others. However, the consistency check will be
1174 // bypassed due to empty CF.
1175 // TODO: a better and complete implementation is needed to ensure strict
1176 // consistency check in WAL recovery including hanlding the tailing
1177 // issues.
1178 if (cfd->GetLogNumber() > corrupted_wal_number &&
1179 cfd->GetLiveSstFilesSize() > 0) {
1180 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
1181 "Column family inconsistency: SST file contains data"
1182 " beyond the point of corruption.");
1183 return Status::Corruption("SST file is ahead of WALs in CF " +
1184 cfd->GetName());
1185 }
1186 }
1187 }
1188
1189 // True if there's any data in the WALs; if not, we can skip re-processing
1190 // them later
1191 bool data_seen = false;
1192 if (!read_only) {
1193 // no need to refcount since client still doesn't have access
1194 // to the DB and can not drop column families while we iterate
1195 const WalNumber max_wal_number = wal_numbers.back();
1196 for (auto cfd : *versions_->GetColumnFamilySet()) {
1197 auto iter = version_edits.find(cfd->GetID());
1198 assert(iter != version_edits.end());
1199 VersionEdit* edit = &iter->second;
1200
1201 if (cfd->GetLogNumber() > max_wal_number) {
1202 // Column family cfd has already flushed the data
1203 // from all wals. Memtable has to be empty because
1204 // we filter the updates based on wal_number
1205 // (in WriteBatch::InsertInto)
1206 assert(cfd->mem()->GetFirstSequenceNumber() == 0);
1207 assert(edit->NumEntries() == 0);
1208 continue;
1209 }
1210
1211 TEST_SYNC_POINT_CALLBACK(
1212 "DBImpl::RecoverLogFiles:BeforeFlushFinalMemtable", /*arg=*/nullptr);
1213
1214 // flush the final memtable (if non-empty)
1215 if (cfd->mem()->GetFirstSequenceNumber() != 0) {
1216 // If flush happened in the middle of recovery (e.g. due to memtable
1217 // being full), we flush at the end. Otherwise we'll need to record
1218 // where we were on last flush, which make the logic complicated.
1219 if (flushed || !immutable_db_options_.avoid_flush_during_recovery) {
1220 status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
1221 if (!status.ok()) {
1222 // Recovery failed
1223 break;
1224 }
1225 flushed = true;
1226
1227 cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
1228 versions_->LastSequence());
1229 }
1230 data_seen = true;
1231 }
1232
1233 // Update the log number info in the version edit corresponding to this
1234 // column family. Note that the version edits will be written to MANIFEST
1235 // together later.
1236 // writing wal_number in the manifest means that any log file
1237 // with number strongly less than (wal_number + 1) is already
1238 // recovered and should be ignored on next reincarnation.
1239 // Since we already recovered max_wal_number, we want all wals
1240 // with numbers `<= max_wal_number` (includes this one) to be ignored
1241 if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) {
1242 edit->SetLogNumber(max_wal_number + 1);
1243 }
1244 }
1245 if (status.ok()) {
1246 // we must mark the next log number as used, even though it's
1247 // not actually used. that is because VersionSet assumes
1248 // VersionSet::next_file_number_ always to be strictly greater than any
1249 // log number
1250 versions_->MarkFileNumberUsed(max_wal_number + 1);
1251
1252 autovector<ColumnFamilyData*> cfds;
1253 autovector<const MutableCFOptions*> cf_opts;
1254 autovector<autovector<VersionEdit*>> edit_lists;
1255 for (auto* cfd : *versions_->GetColumnFamilySet()) {
1256 cfds.push_back(cfd);
1257 cf_opts.push_back(cfd->GetLatestMutableCFOptions());
1258 auto iter = version_edits.find(cfd->GetID());
1259 assert(iter != version_edits.end());
1260 edit_lists.push_back({&iter->second});
1261 }
1262
1263 std::unique_ptr<VersionEdit> wal_deletion;
1264 if (immutable_db_options_.track_and_verify_wals_in_manifest) {
1265 wal_deletion.reset(new VersionEdit);
1266 wal_deletion->DeleteWalsBefore(max_wal_number + 1);
1267 edit_lists.back().push_back(wal_deletion.get());
1268 }
1269
1270 // write MANIFEST with update
1271 status = versions_->LogAndApply(cfds, cf_opts, edit_lists, &mutex_,
1272 directories_.GetDbDir(),
1273 /*new_descriptor_log=*/true);
1274 }
1275 }
1276
1277 if (status.ok()) {
1278 if (data_seen && !flushed) {
1279 status = RestoreAliveLogFiles(wal_numbers);
1280 } else {
1281 // If there's no data in the WAL, or we flushed all the data, still
1282 // truncate the log file. If the process goes into a crash loop before
1283 // the file is deleted, the preallocated space will never get freed.
1284 const bool truncate = !read_only;
1285 GetLogSizeAndMaybeTruncate(wal_numbers.back(), truncate, nullptr)
1286 .PermitUncheckedError();
1287 }
1288 }
1289
1290 event_logger_.Log() << "job" << job_id << "event"
1291 << "recovery_finished";
1292
1293 return status;
1294 }
1295
GetLogSizeAndMaybeTruncate(uint64_t wal_number,bool truncate,LogFileNumberSize * log_ptr)1296 Status DBImpl::GetLogSizeAndMaybeTruncate(uint64_t wal_number, bool truncate,
1297 LogFileNumberSize* log_ptr) {
1298 LogFileNumberSize log(wal_number);
1299 std::string fname =
1300 LogFileName(immutable_db_options_.GetWalDir(), wal_number);
1301 Status s;
1302 // This gets the appear size of the wals, not including preallocated space.
1303 s = env_->GetFileSize(fname, &log.size);
1304 if (s.ok() && truncate) {
1305 std::unique_ptr<FSWritableFile> last_log;
1306 Status truncate_status = fs_->ReopenWritableFile(
1307 fname,
1308 fs_->OptimizeForLogWrite(
1309 file_options_,
1310 BuildDBOptions(immutable_db_options_, mutable_db_options_)),
1311 &last_log, nullptr);
1312 if (truncate_status.ok()) {
1313 truncate_status = last_log->Truncate(log.size, IOOptions(), nullptr);
1314 }
1315 if (truncate_status.ok()) {
1316 truncate_status = last_log->Close(IOOptions(), nullptr);
1317 }
1318 // Not a critical error if fail to truncate.
1319 if (!truncate_status.ok() && !truncate_status.IsNotSupported()) {
1320 ROCKS_LOG_WARN(immutable_db_options_.info_log,
1321 "Failed to truncate log #%" PRIu64 ": %s", wal_number,
1322 truncate_status.ToString().c_str());
1323 }
1324 }
1325 if (log_ptr) {
1326 *log_ptr = log;
1327 }
1328 return s;
1329 }
1330
RestoreAliveLogFiles(const std::vector<uint64_t> & wal_numbers)1331 Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
1332 if (wal_numbers.empty()) {
1333 return Status::OK();
1334 }
1335 Status s;
1336 mutex_.AssertHeld();
1337 assert(immutable_db_options_.avoid_flush_during_recovery);
1338 if (two_write_queues_) {
1339 log_write_mutex_.Lock();
1340 }
1341 // Mark these as alive so they'll be considered for deletion later by
1342 // FindObsoleteFiles()
1343 total_log_size_ = 0;
1344 log_empty_ = false;
1345 for (auto wal_number : wal_numbers) {
1346 // We preallocate space for wals, but then after a crash and restart, those
1347 // preallocated space are not needed anymore. It is likely only the last
1348 // log has such preallocated space, so we only truncate for the last log.
1349 LogFileNumberSize log;
1350 s = GetLogSizeAndMaybeTruncate(
1351 wal_number, /*truncate=*/(wal_number == wal_numbers.back()), &log);
1352 if (!s.ok()) {
1353 break;
1354 }
1355 total_log_size_ += log.size;
1356 alive_log_files_.push_back(log);
1357 }
1358 if (two_write_queues_) {
1359 log_write_mutex_.Unlock();
1360 }
1361 return s;
1362 }
1363
WriteLevel0TableForRecovery(int job_id,ColumnFamilyData * cfd,MemTable * mem,VersionEdit * edit)1364 Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
1365 MemTable* mem, VersionEdit* edit) {
1366 mutex_.AssertHeld();
1367 const uint64_t start_micros = immutable_db_options_.clock->NowMicros();
1368
1369 FileMetaData meta;
1370 std::vector<BlobFileAddition> blob_file_additions;
1371
1372 std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
1373 new std::list<uint64_t>::iterator(
1374 CaptureCurrentFileNumberInPendingOutputs()));
1375 meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
1376 ReadOptions ro;
1377 ro.total_order_seek = true;
1378 Arena arena;
1379 Status s;
1380 TableProperties table_properties;
1381 {
1382 ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
1383 ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1384 "[%s] [WriteLevel0TableForRecovery]"
1385 " Level-0 table #%" PRIu64 ": started",
1386 cfd->GetName().c_str(), meta.fd.GetNumber());
1387
1388 // Get the latest mutable cf options while the mutex is still locked
1389 const MutableCFOptions mutable_cf_options =
1390 *cfd->GetLatestMutableCFOptions();
1391 bool paranoid_file_checks =
1392 cfd->GetLatestMutableCFOptions()->paranoid_file_checks;
1393
1394 int64_t _current_time = 0;
1395 immutable_db_options_.clock->GetCurrentTime(&_current_time)
1396 .PermitUncheckedError(); // ignore error
1397 const uint64_t current_time = static_cast<uint64_t>(_current_time);
1398 meta.oldest_ancester_time = current_time;
1399
1400 {
1401 auto write_hint = cfd->CalculateSSTWriteHint(0);
1402 mutex_.Unlock();
1403
1404 SequenceNumber earliest_write_conflict_snapshot;
1405 std::vector<SequenceNumber> snapshot_seqs =
1406 snapshots_.GetAll(&earliest_write_conflict_snapshot);
1407 auto snapshot_checker = snapshot_checker_.get();
1408 if (use_custom_gc_ && snapshot_checker == nullptr) {
1409 snapshot_checker = DisableGCSnapshotChecker::Instance();
1410 }
1411 std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
1412 range_del_iters;
1413 auto range_del_iter =
1414 mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
1415 if (range_del_iter != nullptr) {
1416 range_del_iters.emplace_back(range_del_iter);
1417 }
1418
1419 IOStatus io_s;
1420 TableBuilderOptions tboptions(
1421 *cfd->ioptions(), mutable_cf_options, cfd->internal_comparator(),
1422 cfd->int_tbl_prop_collector_factories(),
1423 GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
1424 mutable_cf_options.compression_opts, cfd->GetID(), cfd->GetName(),
1425 0 /* level */, false /* is_bottommost */,
1426 TableFileCreationReason::kRecovery, current_time,
1427 0 /* oldest_key_time */, 0 /* file_creation_time */, db_id_,
1428 db_session_id_, 0 /* target_file_size */, meta.fd.GetNumber());
1429 s = BuildTable(
1430 dbname_, versions_.get(), immutable_db_options_, tboptions,
1431 file_options_for_compaction_, cfd->table_cache(), iter.get(),
1432 std::move(range_del_iters), &meta, &blob_file_additions,
1433 snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
1434 paranoid_file_checks, cfd->internal_stats(), &io_s, io_tracer_,
1435 BlobFileCreationReason::kRecovery, &event_logger_, job_id,
1436 Env::IO_HIGH, nullptr /* table_properties */, write_hint,
1437 nullptr /*full_history_ts_low*/, &blob_callback_);
1438 LogFlush(immutable_db_options_.info_log);
1439 ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1440 "[%s] [WriteLevel0TableForRecovery]"
1441 " Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s",
1442 cfd->GetName().c_str(), meta.fd.GetNumber(),
1443 meta.fd.GetFileSize(), s.ToString().c_str());
1444 mutex_.Lock();
1445
1446 // TODO(AR) is this ok?
1447 if (!io_s.ok() && s.ok()) {
1448 s = io_s;
1449 }
1450 }
1451 }
1452 ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
1453
1454 // Note that if file_size is zero, the file has been deleted and
1455 // should not be added to the manifest.
1456 const bool has_output = meta.fd.GetFileSize() > 0;
1457
1458 constexpr int level = 0;
1459
1460 if (s.ok() && has_output) {
1461 edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
1462 meta.fd.GetFileSize(), meta.smallest, meta.largest,
1463 meta.fd.smallest_seqno, meta.fd.largest_seqno,
1464 meta.marked_for_compaction, meta.oldest_blob_file_number,
1465 meta.oldest_ancester_time, meta.file_creation_time,
1466 meta.file_checksum, meta.file_checksum_func_name);
1467
1468 for (const auto& blob : blob_file_additions) {
1469 edit->AddBlobFile(blob);
1470 }
1471 }
1472
1473 InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
1474 stats.micros = immutable_db_options_.clock->NowMicros() - start_micros;
1475
1476 if (has_output) {
1477 stats.bytes_written = meta.fd.GetFileSize();
1478 stats.num_output_files = 1;
1479 }
1480
1481 const auto& blobs = edit->GetBlobFileAdditions();
1482 for (const auto& blob : blobs) {
1483 stats.bytes_written_blob += blob.GetTotalBlobBytes();
1484 }
1485
1486 stats.num_output_files_blob = static_cast<int>(blobs.size());
1487
1488 cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats);
1489 cfd->internal_stats()->AddCFStats(
1490 InternalStats::BYTES_FLUSHED,
1491 stats.bytes_written + stats.bytes_written_blob);
1492 RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
1493 return s;
1494 }
1495
Open(const Options & options,const std::string & dbname,DB ** dbptr)1496 Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
1497 DBOptions db_options(options);
1498 ColumnFamilyOptions cf_options(options);
1499 std::vector<ColumnFamilyDescriptor> column_families;
1500 column_families.push_back(
1501 ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
1502 if (db_options.persist_stats_to_disk) {
1503 column_families.push_back(
1504 ColumnFamilyDescriptor(kPersistentStatsColumnFamilyName, cf_options));
1505 }
1506 std::vector<ColumnFamilyHandle*> handles;
1507 Status s = DB::Open(db_options, dbname, column_families, &handles, dbptr);
1508 if (s.ok()) {
1509 if (db_options.persist_stats_to_disk) {
1510 assert(handles.size() == 2);
1511 } else {
1512 assert(handles.size() == 1);
1513 }
1514 // i can delete the handle since DBImpl is always holding a reference to
1515 // default column family
1516 if (db_options.persist_stats_to_disk && handles[1] != nullptr) {
1517 delete handles[1];
1518 }
1519 delete handles[0];
1520 }
1521 return s;
1522 }
1523
Open(const DBOptions & db_options,const std::string & dbname,const std::vector<ColumnFamilyDescriptor> & column_families,std::vector<ColumnFamilyHandle * > * handles,DB ** dbptr)1524 Status DB::Open(const DBOptions& db_options, const std::string& dbname,
1525 const std::vector<ColumnFamilyDescriptor>& column_families,
1526 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
1527 const bool kSeqPerBatch = true;
1528 const bool kBatchPerTxn = true;
1529 return DBImpl::Open(db_options, dbname, column_families, handles, dbptr,
1530 !kSeqPerBatch, kBatchPerTxn);
1531 }
1532
CreateWAL(uint64_t log_file_num,uint64_t recycle_log_number,size_t preallocate_block_size,log::Writer ** new_log)1533 IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
1534 size_t preallocate_block_size,
1535 log::Writer** new_log) {
1536 IOStatus io_s;
1537 std::unique_ptr<FSWritableFile> lfile;
1538
1539 DBOptions db_options =
1540 BuildDBOptions(immutable_db_options_, mutable_db_options_);
1541 FileOptions opt_file_options =
1542 fs_->OptimizeForLogWrite(file_options_, db_options);
1543 std::string wal_dir = immutable_db_options_.GetWalDir();
1544 std::string log_fname = LogFileName(wal_dir, log_file_num);
1545
1546 if (recycle_log_number) {
1547 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1548 "reusing log %" PRIu64 " from recycle list\n",
1549 recycle_log_number);
1550 std::string old_log_fname = LogFileName(wal_dir, recycle_log_number);
1551 TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile1");
1552 TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile2");
1553 io_s = fs_->ReuseWritableFile(log_fname, old_log_fname, opt_file_options,
1554 &lfile, /*dbg=*/nullptr);
1555 } else {
1556 io_s = NewWritableFile(fs_.get(), log_fname, &lfile, opt_file_options);
1557 }
1558
1559 if (io_s.ok()) {
1560 lfile->SetWriteLifeTimeHint(CalculateWALWriteHint());
1561 lfile->SetPreallocationBlockSize(preallocate_block_size);
1562
1563 const auto& listeners = immutable_db_options_.listeners;
1564 FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types;
1565 std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
1566 std::move(lfile), log_fname, opt_file_options,
1567 immutable_db_options_.clock, io_tracer_, nullptr /* stats */, listeners,
1568 nullptr, tmp_set.Contains(FileType::kWalFile),
1569 tmp_set.Contains(FileType::kWalFile)));
1570 *new_log = new log::Writer(std::move(file_writer), log_file_num,
1571 immutable_db_options_.recycle_log_file_num > 0,
1572 immutable_db_options_.manual_wal_flush);
1573 }
1574 return io_s;
1575 }
1576
Open(const DBOptions & db_options,const std::string & dbname,const std::vector<ColumnFamilyDescriptor> & column_families,std::vector<ColumnFamilyHandle * > * handles,DB ** dbptr,const bool seq_per_batch,const bool batch_per_txn)1577 Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
1578 const std::vector<ColumnFamilyDescriptor>& column_families,
1579 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
1580 const bool seq_per_batch, const bool batch_per_txn) {
1581 Status s = ValidateOptionsByTable(db_options, column_families);
1582 if (!s.ok()) {
1583 return s;
1584 }
1585
1586 s = ValidateOptions(db_options, column_families);
1587 if (!s.ok()) {
1588 return s;
1589 }
1590
1591 *dbptr = nullptr;
1592 handles->clear();
1593
1594 size_t max_write_buffer_size = 0;
1595 for (auto cf : column_families) {
1596 max_write_buffer_size =
1597 std::max(max_write_buffer_size, cf.options.write_buffer_size);
1598 }
1599
1600 DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch, batch_per_txn);
1601 s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.GetWalDir());
1602 if (s.ok()) {
1603 std::vector<std::string> paths;
1604 for (auto& db_path : impl->immutable_db_options_.db_paths) {
1605 paths.emplace_back(db_path.path);
1606 }
1607 for (auto& cf : column_families) {
1608 for (auto& cf_path : cf.options.cf_paths) {
1609 paths.emplace_back(cf_path.path);
1610 }
1611 }
1612 for (auto& path : paths) {
1613 s = impl->env_->CreateDirIfMissing(path);
1614 if (!s.ok()) {
1615 break;
1616 }
1617 }
1618
1619 // For recovery from NoSpace() error, we can only handle
1620 // the case where the database is stored in a single path
1621 if (paths.size() <= 1) {
1622 impl->error_handler_.EnableAutoRecovery();
1623 }
1624 }
1625 if (s.ok()) {
1626 s = impl->CreateArchivalDirectory();
1627 }
1628 if (!s.ok()) {
1629 delete impl;
1630 return s;
1631 }
1632
1633 impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath();
1634
1635 impl->mutex_.Lock();
1636 // Handles create_if_missing, error_if_exists
1637 uint64_t recovered_seq(kMaxSequenceNumber);
1638 s = impl->Recover(column_families, false, false, false, &recovered_seq);
1639 if (s.ok()) {
1640 uint64_t new_log_number = impl->versions_->NewFileNumber();
1641 log::Writer* new_log = nullptr;
1642 const size_t preallocate_block_size =
1643 impl->GetWalPreallocateBlockSize(max_write_buffer_size);
1644 s = impl->CreateWAL(new_log_number, 0 /*recycle_log_number*/,
1645 preallocate_block_size, &new_log);
1646 if (s.ok()) {
1647 InstrumentedMutexLock wl(&impl->log_write_mutex_);
1648 impl->logfile_number_ = new_log_number;
1649 assert(new_log != nullptr);
1650 assert(impl->logs_.empty());
1651 impl->logs_.emplace_back(new_log_number, new_log);
1652 }
1653
1654 if (s.ok()) {
1655 // set column family handles
1656 for (auto cf : column_families) {
1657 auto cfd =
1658 impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
1659 if (cfd != nullptr) {
1660 handles->push_back(
1661 new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
1662 impl->NewThreadStatusCfInfo(cfd);
1663 } else {
1664 if (db_options.create_missing_column_families) {
1665 // missing column family, create it
1666 ColumnFamilyHandle* handle;
1667 impl->mutex_.Unlock();
1668 s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
1669 impl->mutex_.Lock();
1670 if (s.ok()) {
1671 handles->push_back(handle);
1672 } else {
1673 break;
1674 }
1675 } else {
1676 s = Status::InvalidArgument("Column family not found", cf.name);
1677 break;
1678 }
1679 }
1680 }
1681 }
1682 if (s.ok()) {
1683 SuperVersionContext sv_context(/* create_superversion */ true);
1684 for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
1685 impl->InstallSuperVersionAndScheduleWork(
1686 cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
1687 }
1688 sv_context.Clean();
1689 if (impl->two_write_queues_) {
1690 impl->log_write_mutex_.Lock();
1691 }
1692 impl->alive_log_files_.push_back(
1693 DBImpl::LogFileNumberSize(impl->logfile_number_));
1694 if (impl->two_write_queues_) {
1695 impl->log_write_mutex_.Unlock();
1696 }
1697
1698 impl->DeleteObsoleteFiles();
1699 s = impl->directories_.GetDbDir()->Fsync(IOOptions(), nullptr);
1700 TEST_SYNC_POINT("DBImpl::Open:AfterDeleteFilesAndSyncDir");
1701 }
1702 if (s.ok()) {
1703 // In WritePrepared there could be gap in sequence numbers. This breaks
1704 // the trick we use in kPointInTimeRecovery which assumes the first seq in
1705 // the log right after the corrupted log is one larger than the last seq
1706 // we read from the wals. To let this trick keep working, we add a dummy
1707 // entry with the expected sequence to the first log right after recovery.
1708 // In non-WritePrepared case also the new log after recovery could be
1709 // empty, and thus missing the consecutive seq hint to distinguish
1710 // middle-log corruption from corrupted-log-remained-after-recovery. This
1711 // case also will be addressed by a dummy write.
1712 if (recovered_seq != kMaxSequenceNumber) {
1713 WriteBatch empty_batch;
1714 WriteBatchInternal::SetSequence(&empty_batch, recovered_seq);
1715 WriteOptions write_options;
1716 uint64_t log_used, log_size;
1717 log::Writer* log_writer = impl->logs_.back().writer;
1718 s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size);
1719 if (s.ok()) {
1720 // Need to fsync, otherwise it might get lost after a power reset.
1721 s = impl->FlushWAL(false);
1722 if (s.ok()) {
1723 s = log_writer->file()->Sync(impl->immutable_db_options_.use_fsync);
1724 }
1725 }
1726 }
1727 }
1728 }
1729 if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
1730 // try to read format version
1731 s = impl->PersistentStatsProcessFormatVersion();
1732 }
1733
1734 if (s.ok()) {
1735 for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
1736 if (cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
1737 auto* vstorage = cfd->current()->storage_info();
1738 for (int i = 1; i < vstorage->num_levels(); ++i) {
1739 int num_files = vstorage->NumLevelFiles(i);
1740 if (num_files > 0) {
1741 s = Status::InvalidArgument(
1742 "Not all files are at level 0. Cannot "
1743 "open with FIFO compaction style.");
1744 break;
1745 }
1746 }
1747 }
1748 if (!cfd->mem()->IsSnapshotSupported()) {
1749 impl->is_snapshot_supported_ = false;
1750 }
1751 if (cfd->ioptions()->merge_operator != nullptr &&
1752 !cfd->mem()->IsMergeOperatorSupported()) {
1753 s = Status::InvalidArgument(
1754 "The memtable of column family %s does not support merge operator "
1755 "its options.merge_operator is non-null",
1756 cfd->GetName().c_str());
1757 }
1758 if (!s.ok()) {
1759 break;
1760 }
1761 }
1762 }
1763 TEST_SYNC_POINT("DBImpl::Open:Opened");
1764 Status persist_options_status;
1765 if (s.ok()) {
1766 // Persist RocksDB Options before scheduling the compaction.
1767 // The WriteOptionsFile() will release and lock the mutex internally.
1768 persist_options_status = impl->WriteOptionsFile(
1769 false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
1770
1771 *dbptr = impl;
1772 impl->opened_successfully_ = true;
1773 impl->MaybeScheduleFlushOrCompaction();
1774 } else {
1775 persist_options_status.PermitUncheckedError();
1776 }
1777 impl->mutex_.Unlock();
1778
1779 #ifndef ROCKSDB_LITE
1780 auto sfm = static_cast<SstFileManagerImpl*>(
1781 impl->immutable_db_options_.sst_file_manager.get());
1782 if (s.ok() && sfm) {
1783 // Set Statistics ptr for SstFileManager to dump the stats of
1784 // DeleteScheduler.
1785 sfm->SetStatisticsPtr(impl->immutable_db_options_.statistics);
1786 ROCKS_LOG_INFO(impl->immutable_db_options_.info_log,
1787 "SstFileManager instance %p", sfm);
1788
1789 // Notify SstFileManager about all sst files that already exist in
1790 // db_paths[0] and cf_paths[0] when the DB is opened.
1791
1792 // SstFileManagerImpl needs to know sizes of the files. For files whose size
1793 // we already know (sst files that appear in manifest - typically that's the
1794 // vast majority of all files), we'll pass the size to SstFileManager.
1795 // For all other files SstFileManager will query the size from filesystem.
1796
1797 std::vector<LiveFileMetaData> metadata;
1798
1799 // TODO: Once GetLiveFilesMetaData supports blob files, update the logic
1800 // below to get known_file_sizes for blob files.
1801 impl->mutex_.Lock();
1802 impl->versions_->GetLiveFilesMetaData(&metadata);
1803 impl->mutex_.Unlock();
1804
1805 std::unordered_map<std::string, uint64_t> known_file_sizes;
1806 for (const auto& md : metadata) {
1807 std::string name = md.name;
1808 if (!name.empty() && name[0] == '/') {
1809 name = name.substr(1);
1810 }
1811 known_file_sizes[name] = md.size;
1812 }
1813
1814 std::vector<std::string> paths;
1815 paths.emplace_back(impl->immutable_db_options_.db_paths[0].path);
1816 for (auto& cf : column_families) {
1817 if (!cf.options.cf_paths.empty()) {
1818 paths.emplace_back(cf.options.cf_paths[0].path);
1819 }
1820 }
1821 // Remove duplicate paths.
1822 std::sort(paths.begin(), paths.end());
1823 paths.erase(std::unique(paths.begin(), paths.end()), paths.end());
1824 for (auto& path : paths) {
1825 std::vector<std::string> existing_files;
1826 impl->immutable_db_options_.env->GetChildren(path, &existing_files)
1827 .PermitUncheckedError(); //**TODO: What to do on error?
1828 for (auto& file_name : existing_files) {
1829 uint64_t file_number;
1830 FileType file_type;
1831 std::string file_path = path + "/" + file_name;
1832 if (ParseFileName(file_name, &file_number, &file_type) &&
1833 (file_type == kTableFile || file_type == kBlobFile)) {
1834 // TODO: Check for errors from OnAddFile?
1835 if (known_file_sizes.count(file_name)) {
1836 // We're assuming that each sst file name exists in at most one of
1837 // the paths.
1838 sfm->OnAddFile(file_path, known_file_sizes.at(file_name))
1839 .PermitUncheckedError();
1840 } else {
1841 sfm->OnAddFile(file_path).PermitUncheckedError();
1842 }
1843 }
1844 }
1845 }
1846
1847 // Reserve some disk buffer space. This is a heuristic - when we run out
1848 // of disk space, this ensures that there is at least write_buffer_size
1849 // amount of free space before we resume DB writes. In low disk space
1850 // conditions, we want to avoid a lot of small L0 files due to frequent
1851 // WAL write failures and resultant forced flushes
1852 sfm->ReserveDiskBuffer(max_write_buffer_size,
1853 impl->immutable_db_options_.db_paths[0].path);
1854 }
1855
1856 #endif // !ROCKSDB_LITE
1857
1858 if (s.ok()) {
1859 ROCKS_LOG_HEADER(impl->immutable_db_options_.info_log, "DB pointer %p",
1860 impl);
1861 LogFlush(impl->immutable_db_options_.info_log);
1862 assert(impl->TEST_WALBufferIsEmpty());
1863 // If the assert above fails then we need to FlushWAL before returning
1864 // control back to the user.
1865 if (!persist_options_status.ok()) {
1866 s = Status::IOError(
1867 "DB::Open() failed --- Unable to persist Options file",
1868 persist_options_status.ToString());
1869 }
1870 } else {
1871 ROCKS_LOG_WARN(impl->immutable_db_options_.info_log,
1872 "Persisting Option File error: %s",
1873 persist_options_status.ToString().c_str());
1874 }
1875 if (s.ok()) {
1876 impl->StartPeriodicWorkScheduler();
1877 } else {
1878 for (auto* h : *handles) {
1879 delete h;
1880 }
1881 handles->clear();
1882 delete impl;
1883 *dbptr = nullptr;
1884 }
1885 return s;
1886 }
1887 } // namespace ROCKSDB_NAMESPACE
1888