1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include <cinttypes>
10
11 #include "db/builder.h"
12 #include "db/db_impl/db_impl.h"
13 #include "db/error_handler.h"
14 #include "db/periodic_work_scheduler.h"
15 #include "env/composite_env_wrapper.h"
16 #include "file/read_write_util.h"
17 #include "file/sst_file_manager_impl.h"
18 #include "file/writable_file_writer.h"
19 #include "monitoring/persistent_stats_history.h"
20 #include "options/options_helper.h"
21 #include "rocksdb/table.h"
22 #include "rocksdb/wal_filter.h"
23 #include "test_util/sync_point.h"
24 #include "util/rate_limiter.h"
25
26 namespace ROCKSDB_NAMESPACE {
// Sanitize a combined Options object by sanitizing its DB half and its
// column-family half independently, then recombining the two results.
Options SanitizeOptions(const std::string& dbname, const Options& src,
                        bool read_only) {
  const DBOptions sanitized_db_opts =
      SanitizeOptions(dbname, DBOptions(src), read_only);
  const ImmutableDBOptions immutable_db_opts(sanitized_db_opts);
  const ColumnFamilyOptions sanitized_cf_opts =
      SanitizeOptions(immutable_db_opts, ColumnFamilyOptions(src));
  return Options(sanitized_db_opts, sanitized_cf_opts);
}
35
// Fill in defaults and clamp out-of-range values in `src` so the returned
// DBOptions are safe to use for opening the DB at `dbname`. `read_only`
// suppresses creation of an info log, since a read-only open must not write
// into the DB directory.
DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,
                          bool read_only) {
  DBOptions result(src);

  if (result.env == nullptr) {
    result.env = Env::Default();
  }

  // result.max_open_files means an "infinite" open files.
  if (result.max_open_files != -1) {
    int max_max_open_files = port::GetMaxOpenFiles();
    if (max_max_open_files == -1) {
      // Platform cannot report a limit; fall back to a generous cap.
      max_max_open_files = 0x400000;
    }
    ClipToRange(&result.max_open_files, 20, max_max_open_files);
    TEST_SYNC_POINT_CALLBACK("SanitizeOptions::AfterChangeMaxOpenFiles",
                             &result.max_open_files);
  }

  if (result.info_log == nullptr && !read_only) {
    Status s = CreateLoggerFromOptions(dbname, result, &result.info_log);
    if (!s.ok()) {
      // No place suitable for logging
      result.info_log = nullptr;
    }
  }

  if (!result.write_buffer_manager) {
    result.write_buffer_manager.reset(
        new WriteBufferManager(result.db_write_buffer_size));
  }
  // Make sure the env's thread pools are large enough for the configured
  // background work.
  auto bg_job_limits = DBImpl::GetBGJobLimits(
      result.max_background_flushes, result.max_background_compactions,
      result.max_background_jobs, true /* parallelize_compactions */);
  result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions,
                                           Env::Priority::LOW);
  result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes,
                                           Env::Priority::HIGH);

  if (result.rate_limiter.get() != nullptr) {
    if (result.bytes_per_sync == 0) {
      result.bytes_per_sync = 1024 * 1024;
    }
  }

  if (result.delayed_write_rate == 0) {
    if (result.rate_limiter.get() != nullptr) {
      result.delayed_write_rate = result.rate_limiter->GetBytesPerSecond();
    }
    if (result.delayed_write_rate == 0) {
      result.delayed_write_rate = 16 * 1024 * 1024;
    }
  }

  if (result.WAL_ttl_seconds > 0 || result.WAL_size_limit_MB > 0) {
    // WAL archiving is incompatible with log recycling. Note: previously this
    // assigned `false` to the size_t field; use 0 for type consistency with
    // the assignment below (identical value, clearer intent).
    result.recycle_log_file_num = 0;
  }

  if (result.recycle_log_file_num &&
      (result.wal_recovery_mode ==
           WALRecoveryMode::kTolerateCorruptedTailRecords ||
       result.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery ||
       result.wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency)) {
    // - kTolerateCorruptedTailRecords is inconsistent with recycle log file
    //   feature. WAL recycling expects recovery success upon encountering a
    //   corrupt record at the point where new data ends and recycled data
    //   remains at the tail. However, `kTolerateCorruptedTailRecords` must fail
    //   upon encountering any such corrupt record, as it cannot differentiate
    //   between this and a real corruption, which would cause committed updates
    //   to be truncated -- a violation of the recovery guarantee.
    // - kPointInTimeRecovery and kAbsoluteConsistency are incompatible with
    //   recycle log file feature temporarily due to a bug found introducing a
    //   hole in the recovered data
    //   (https://github.com/facebook/rocksdb/pull/7252#issuecomment-673766236).
    //   Besides this bug, we believe the features are fundamentally compatible.
    result.recycle_log_file_num = 0;
  }

  if (result.wal_dir.empty()) {
    // Use dbname as default
    result.wal_dir = dbname;
  }
  // NOTE(review): if both wal_dir and dbname are empty, back() below is
  // undefined behavior -- presumably callers always pass a non-empty dbname;
  // confirm upstream.
  if (result.wal_dir.back() == '/') {
    // Strip a single trailing slash so path concatenation stays uniform.
    result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
  }

  if (result.db_paths.size() == 0) {
    result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
  }

  if (result.use_direct_reads && result.compaction_readahead_size == 0) {
    TEST_SYNC_POINT_CALLBACK("SanitizeOptions:direct_io", nullptr);
    result.compaction_readahead_size = 1024 * 1024 * 2;
  }

  if (result.compaction_readahead_size > 0 || result.use_direct_reads) {
    result.new_table_reader_for_compaction_inputs = true;
  }

  // Force flush on DB open if 2PC is enabled, since with 2PC we have no
  // guarantee that consecutive log files have consecutive sequence id, which
  // make recovery complicated.
  if (result.allow_2pc) {
    result.avoid_flush_during_recovery = false;
  }

#ifndef ROCKSDB_LITE
  ImmutableDBOptions immutable_db_options(result);
  if (!IsWalDirSameAsDBPath(&immutable_db_options)) {
    // Either the WAL dir and db_paths[0]/db_name are not the same, or we
    // cannot tell for sure. In either case, assume they're different and
    // explicitly cleanup the trash log files (bypass DeleteScheduler)
    // Do this first so even if we end up calling
    // DeleteScheduler::CleanupDirectory on the same dir later, it will be
    // safe
    std::vector<std::string> filenames;
    Status s = result.env->GetChildren(result.wal_dir, &filenames);
    s.PermitUncheckedError();  //**TODO: What to do on error?
    for (std::string& filename : filenames) {
      // Only delete files whose name ends in ".log.trash".
      if (filename.find(".log.trash", filename.length() -
                                          std::string(".log.trash").length()) !=
          std::string::npos) {
        std::string trash_file = result.wal_dir + "/" + filename;
        result.env->DeleteFile(trash_file).PermitUncheckedError();
      }
    }
  }
  // When the DB is stopped, it's possible that there are some .trash files that
  // were not deleted yet, when we open the DB we will find these .trash files
  // and schedule them to be deleted (or delete immediately if SstFileManager
  // was not used)
  auto sfm = static_cast<SstFileManagerImpl*>(result.sst_file_manager.get());
  for (size_t i = 0; i < result.db_paths.size(); i++) {
    DeleteScheduler::CleanupDirectory(result.env, sfm, result.db_paths[i].path)
        .PermitUncheckedError();
  }

  // Create a default SstFileManager for purposes of tracking compaction size
  // and facilitating recovery from out of space errors.
  if (result.sst_file_manager.get() == nullptr) {
    std::shared_ptr<SstFileManager> sst_file_manager(
        NewSstFileManager(result.env, result.info_log));
    result.sst_file_manager = sst_file_manager;
  }
#endif  // !ROCKSDB_LITE

  if (!result.paranoid_checks) {
    result.skip_checking_sst_file_sizes_on_db_open = true;
    ROCKS_LOG_INFO(result.info_log,
                   "file size check will be skipped during open.");
  }

  return result;
}
190
191 namespace {
ValidateOptionsByTable(const DBOptions & db_opts,const std::vector<ColumnFamilyDescriptor> & column_families)192 Status ValidateOptionsByTable(
193 const DBOptions& db_opts,
194 const std::vector<ColumnFamilyDescriptor>& column_families) {
195 Status s;
196 for (auto cf : column_families) {
197 s = ValidateOptions(db_opts, cf.options);
198 if (!s.ok()) {
199 return s;
200 }
201 }
202 return Status::OK();
203 }
204 } // namespace
205
ValidateOptions(const DBOptions & db_options,const std::vector<ColumnFamilyDescriptor> & column_families)206 Status DBImpl::ValidateOptions(
207 const DBOptions& db_options,
208 const std::vector<ColumnFamilyDescriptor>& column_families) {
209 Status s;
210 for (auto& cfd : column_families) {
211 s = ColumnFamilyData::ValidateOptions(db_options, cfd.options);
212 if (!s.ok()) {
213 return s;
214 }
215 }
216 s = ValidateOptions(db_options);
217 return s;
218 }
219
ValidateOptions(const DBOptions & db_options)220 Status DBImpl::ValidateOptions(const DBOptions& db_options) {
221 if (db_options.db_paths.size() > 4) {
222 return Status::NotSupported(
223 "More than four DB paths are not supported yet. ");
224 }
225
226 if (db_options.allow_mmap_reads && db_options.use_direct_reads) {
227 // Protect against assert in PosixMMapReadableFile constructor
228 return Status::NotSupported(
229 "If memory mapped reads (allow_mmap_reads) are enabled "
230 "then direct I/O reads (use_direct_reads) must be disabled. ");
231 }
232
233 if (db_options.allow_mmap_writes &&
234 db_options.use_direct_io_for_flush_and_compaction) {
235 return Status::NotSupported(
236 "If memory mapped writes (allow_mmap_writes) are enabled "
237 "then direct I/O writes (use_direct_io_for_flush_and_compaction) must "
238 "be disabled. ");
239 }
240
241 if (db_options.keep_log_file_num == 0) {
242 return Status::InvalidArgument("keep_log_file_num must be greater than 0");
243 }
244
245 if (db_options.unordered_write &&
246 !db_options.allow_concurrent_memtable_write) {
247 return Status::InvalidArgument(
248 "unordered_write is incompatible with !allow_concurrent_memtable_write");
249 }
250
251 if (db_options.unordered_write && db_options.enable_pipelined_write) {
252 return Status::InvalidArgument(
253 "unordered_write is incompatible with enable_pipelined_write");
254 }
255
256 if (db_options.atomic_flush && db_options.enable_pipelined_write) {
257 return Status::InvalidArgument(
258 "atomic_flush is incompatible with enable_pipelined_write");
259 }
260
261 // TODO remove this restriction
262 if (db_options.atomic_flush && db_options.best_efforts_recovery) {
263 return Status::InvalidArgument(
264 "atomic_flush is currently incompatible with best-efforts recovery");
265 }
266
267 return Status::OK();
268 }
269
NewDB(std::vector<std::string> * new_filenames)270 Status DBImpl::NewDB(std::vector<std::string>* new_filenames) {
271 VersionEdit new_db;
272 Status s = SetIdentityFile(env_, dbname_);
273 if (!s.ok()) {
274 return s;
275 }
276 if (immutable_db_options_.write_dbid_to_manifest) {
277 std::string temp_db_id;
278 GetDbIdentityFromIdentityFile(&temp_db_id);
279 new_db.SetDBId(temp_db_id);
280 }
281 new_db.SetLogNumber(0);
282 new_db.SetNextFile(2);
283 new_db.SetLastSequence(0);
284
285 ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
286 const std::string manifest = DescriptorFileName(dbname_, 1);
287 {
288 if (fs_->FileExists(manifest, IOOptions(), nullptr).ok()) {
289 fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
290 }
291 std::unique_ptr<FSWritableFile> file;
292 FileOptions file_options = fs_->OptimizeForManifestWrite(file_options_);
293 s = NewWritableFile(fs_.get(), manifest, &file, file_options);
294 if (!s.ok()) {
295 return s;
296 }
297 FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types;
298 file->SetPreallocationBlockSize(
299 immutable_db_options_.manifest_preallocation_size);
300 std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
301 std::move(file), manifest, file_options, immutable_db_options_.clock,
302 io_tracer_, nullptr /* stats */, immutable_db_options_.listeners,
303 nullptr, tmp_set.Contains(FileType::kDescriptorFile)));
304 log::Writer log(std::move(file_writer), 0, false);
305 std::string record;
306 new_db.EncodeTo(&record);
307 s = log.AddRecord(record);
308 if (s.ok()) {
309 s = SyncManifest(&immutable_db_options_, log.file());
310 }
311 }
312 if (s.ok()) {
313 // Make "CURRENT" file that points to the new manifest file.
314 s = SetCurrentFile(fs_.get(), dbname_, 1, directories_.GetDbDir());
315 if (new_filenames) {
316 new_filenames->emplace_back(
317 manifest.substr(manifest.find_last_of("/\\") + 1));
318 }
319 } else {
320 fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
321 }
322 return s;
323 }
324
CreateAndNewDirectory(FileSystem * fs,const std::string & dirname,std::unique_ptr<FSDirectory> * directory)325 IOStatus DBImpl::CreateAndNewDirectory(
326 FileSystem* fs, const std::string& dirname,
327 std::unique_ptr<FSDirectory>* directory) {
328 // We call CreateDirIfMissing() as the directory may already exist (if we
329 // are reopening a DB), when this happens we don't want creating the
330 // directory to cause an error. However, we need to check if creating the
331 // directory fails or else we may get an obscure message about the lock
332 // file not existing. One real-world example of this occurring is if
333 // env->CreateDirIfMissing() doesn't create intermediate directories, e.g.
334 // when dbname_ is "dir/db" but when "dir" doesn't exist.
335 IOStatus io_s = fs->CreateDirIfMissing(dirname, IOOptions(), nullptr);
336 if (!io_s.ok()) {
337 return io_s;
338 }
339 return fs->NewDirectory(dirname, IOOptions(), directory, nullptr);
340 }
341
SetDirectories(FileSystem * fs,const std::string & dbname,const std::string & wal_dir,const std::vector<DbPath> & data_paths)342 IOStatus Directories::SetDirectories(FileSystem* fs, const std::string& dbname,
343 const std::string& wal_dir,
344 const std::vector<DbPath>& data_paths) {
345 IOStatus io_s = DBImpl::CreateAndNewDirectory(fs, dbname, &db_dir_);
346 if (!io_s.ok()) {
347 return io_s;
348 }
349 if (!wal_dir.empty() && dbname != wal_dir) {
350 io_s = DBImpl::CreateAndNewDirectory(fs, wal_dir, &wal_dir_);
351 if (!io_s.ok()) {
352 return io_s;
353 }
354 }
355
356 data_dirs_.clear();
357 for (auto& p : data_paths) {
358 const std::string db_path = p.path;
359 if (db_path == dbname) {
360 data_dirs_.emplace_back(nullptr);
361 } else {
362 std::unique_ptr<FSDirectory> path_directory;
363 io_s = DBImpl::CreateAndNewDirectory(fs, db_path, &path_directory);
364 if (!io_s.ok()) {
365 return io_s;
366 }
367 data_dirs_.emplace_back(path_directory.release());
368 }
369 }
370 assert(data_dirs_.size() == data_paths.size());
371 return IOStatus::OK();
372 }
373
// Recover the DB state during open. Steps, in order: set up directories and
// the file lock (read-write opens only); decide between opening an existing
// DB and creating a new one; verify file_options_ work on this filesystem;
// recover the MANIFEST via VersionSet; then discover and replay WAL files.
// Requires mutex_ held. `recovered_seq` (may be null) receives the next
// sequence number when a corrupted WAL was tolerated during replay.
Status DBImpl::Recover(
    const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
    bool error_if_wal_file_exists, bool error_if_data_exists_in_wals,
    uint64_t* recovered_seq) {
  mutex_.AssertHeld();

  bool is_new_db = false;
  assert(db_lock_ == nullptr);
  std::vector<std::string> files_in_dbname;
  if (!read_only) {
    Status s = directories_.SetDirectories(fs_.get(), dbname_,
                                           immutable_db_options_.wal_dir,
                                           immutable_db_options_.db_paths);
    if (!s.ok()) {
      return s;
    }

    // Take the single-process file lock before touching any DB state.
    s = env_->LockFile(LockFileName(dbname_), &db_lock_);
    if (!s.ok()) {
      return s;
    }

    std::string current_fname = CurrentFileName(dbname_);
    // Path to any MANIFEST file in the db dir. It does not matter which one.
    // Since best-efforts recovery ignores CURRENT file, existence of a
    // MANIFEST indicates the recovery to recover existing db. If no MANIFEST
    // can be found, a new db will be created.
    std::string manifest_path;
    if (!immutable_db_options_.best_efforts_recovery) {
      s = env_->FileExists(current_fname);
    } else {
      // Best-efforts mode: scan the db dir for any descriptor file instead of
      // trusting CURRENT.
      s = Status::NotFound();
      Status io_s = env_->GetChildren(dbname_, &files_in_dbname);
      if (!io_s.ok()) {
        s = io_s;
        files_in_dbname.clear();
      }
      for (const std::string& file : files_in_dbname) {
        uint64_t number = 0;
        FileType type = kWalFile;  // initialize
        if (ParseFileName(file, &number, &type) && type == kDescriptorFile) {
          // Found MANIFEST (descriptor log), thus best-efforts recovery does
          // not have to treat the db as empty.
          s = Status::OK();
          manifest_path = dbname_ + "/" + file;
          break;
        }
      }
    }
    if (s.IsNotFound()) {
      // No existing DB found: create one if allowed, otherwise fail.
      if (immutable_db_options_.create_if_missing) {
        s = NewDB(&files_in_dbname);
        is_new_db = true;
        if (!s.ok()) {
          return s;
        }
      } else {
        return Status::InvalidArgument(
            current_fname, "does not exist (create_if_missing is false)");
      }
    } else if (s.ok()) {
      if (immutable_db_options_.error_if_exists) {
        return Status::InvalidArgument(dbname_,
                                       "exists (error_if_exists is true)");
      }
    } else {
      // Unexpected error reading file
      assert(s.IsIOError());
      return s;
    }
    // Verify compatibility of file_options_ and filesystem
    {
      std::unique_ptr<FSRandomAccessFile> idfile;
      FileOptions customized_fs(file_options_);
      customized_fs.use_direct_reads |=
          immutable_db_options_.use_direct_io_for_flush_and_compaction;
      const std::string& fname =
          manifest_path.empty() ? current_fname : manifest_path;
      s = fs_->NewRandomAccessFile(fname, customized_fs, &idfile, nullptr);
      if (!s.ok()) {
        std::string error_str = s.ToString();
        // Check if unsupported Direct I/O is the root cause
        customized_fs.use_direct_reads = false;
        s = fs_->NewRandomAccessFile(fname, customized_fs, &idfile, nullptr);
        if (s.ok()) {
          // Opening succeeded without direct reads, so direct I/O itself is
          // what the filesystem rejects.
          return Status::InvalidArgument(
              "Direct I/O is not supported by the specified DB.");
        } else {
          return Status::InvalidArgument(
              "Found options incompatible with filesystem", error_str.c_str());
        }
      }
    }
  } else if (immutable_db_options_.best_efforts_recovery) {
    // Read-only + best-efforts: just list the db dir for TryRecover below.
    assert(files_in_dbname.empty());
    Status s = env_->GetChildren(dbname_, &files_in_dbname);
    if (s.IsNotFound()) {
      return Status::InvalidArgument(dbname_,
                                     "does not exist (open for read only)");
    } else if (s.IsIOError()) {
      return s;
    }
    assert(s.ok());
  }
  assert(db_id_.empty());
  Status s;
  bool missing_table_file = false;
  if (!immutable_db_options_.best_efforts_recovery) {
    s = versions_->Recover(column_families, read_only, &db_id_);
  } else {
    assert(!files_in_dbname.empty());
    s = versions_->TryRecover(column_families, read_only, files_in_dbname,
                              &db_id_, &missing_table_file);
    if (s.ok()) {
      // TryRecover may delete previous column_family_set_.
      column_family_memtables_.reset(
          new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));
    }
  }
  if (!s.ok()) {
    return s;
  }
  s = SetDBId(read_only);
  if (s.ok() && !read_only) {
    s = DeleteUnreferencedSstFiles();
  }

  if (immutable_db_options_.paranoid_checks && s.ok()) {
    s = CheckConsistency();
  }
  if (s.ok() && !read_only) {
    // Open directory handles for every column family's cf_paths.
    std::map<std::string, std::shared_ptr<FSDirectory>> created_dirs;
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      s = cfd->AddDirectories(&created_dirs);
      if (!s.ok()) {
        return s;
      }
    }
  }
  // DB mutex is already held
  if (s.ok() && immutable_db_options_.persist_stats_to_disk) {
    s = InitPersistStatsColumnFamily();
  }

  std::vector<std::string> files_in_wal_dir;
  if (s.ok()) {
    // Initial max_total_in_memory_state_ before recovery wals. Log recovery
    // may check this value to decide whether to flush.
    max_total_in_memory_state_ = 0;
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
      max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
                                    mutable_cf_options->max_write_buffer_number;
    }

    SequenceNumber next_sequence(kMaxSequenceNumber);
    default_cf_handle_ = new ColumnFamilyHandleImpl(
        versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
    default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
    // TODO(Zhongyi): handle single_column_family_mode_ when
    // persistent_stats is enabled
    single_column_family_mode_ =
        versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;

    // Recover from all newer log files than the ones named in the
    // descriptor (new log files may have been added by the previous
    // incarnation without registering them in the descriptor).
    //
    // Note that prev_log_number() is no longer used, but we pay
    // attention to it in case we are recovering a database
    // produced by an older version of rocksdb.
    if (!immutable_db_options_.best_efforts_recovery) {
      s = env_->GetChildren(immutable_db_options_.wal_dir, &files_in_wal_dir);
    }
    if (s.IsNotFound()) {
      return Status::InvalidArgument("wal_dir not found",
                                     immutable_db_options_.wal_dir);
    } else if (!s.ok()) {
      return s;
    }

    // Map WAL number -> full path for every WAL found in wal_dir.
    std::unordered_map<uint64_t, std::string> wal_files;
    for (const auto& file : files_in_wal_dir) {
      uint64_t number;
      FileType type;
      if (ParseFileName(file, &number, &type) && type == kWalFile) {
        if (is_new_db) {
          return Status::Corruption(
              "While creating a new Db, wal_dir contains "
              "existing log file: ",
              file);
        } else {
          wal_files[number] =
              LogFileName(immutable_db_options_.wal_dir, number);
        }
      }
    }

    if (immutable_db_options_.track_and_verify_wals_in_manifest) {
      if (!immutable_db_options_.best_efforts_recovery) {
        // Verify WALs in MANIFEST.
        s = versions_->GetWalSet().CheckWals(env_, wal_files);
      }  // else since best effort recovery does not recover from WALs, no need
         // to check WALs.
    } else if (!versions_->GetWalSet().GetWals().empty()) {
      // Tracking is disabled, clear previously tracked WALs from MANIFEST,
      // otherwise, in the future, if WAL tracking is enabled again,
      // since the WALs deleted when WAL tracking is disabled are not persisted
      // into MANIFEST, WAL check may fail.
      VersionEdit edit;
      WalNumber max_wal_number =
          versions_->GetWalSet().GetWals().rbegin()->first;
      edit.DeleteWalsBefore(max_wal_number + 1);
      s = versions_->LogAndApplyToDefaultColumnFamily(&edit, &mutex_);
    }
    if (!s.ok()) {
      return s;
    }

    if (!wal_files.empty()) {
      if (error_if_wal_file_exists) {
        return Status::Corruption(
            "The db was opened in readonly mode with error_if_wal_file_exists"
            "flag but a WAL file already exists");
      } else if (error_if_data_exists_in_wals) {
        // WAL files may exist but must all be empty.
        for (auto& wal_file : wal_files) {
          uint64_t bytes;
          s = env_->GetFileSize(wal_file.second, &bytes);
          if (s.ok()) {
            if (bytes > 0) {
              return Status::Corruption(
                  "error_if_data_exists_in_wals is set but there are data "
                  " in WAL files.");
            }
          }
        }
      }
    }

    if (!wal_files.empty()) {
      // Recover in the order in which the wals were generated
      std::vector<uint64_t> wals;
      wals.reserve(wal_files.size());
      for (const auto& wal_file : wal_files) {
        wals.push_back(wal_file.first);
      }
      std::sort(wals.begin(), wals.end());

      bool corrupted_wal_found = false;
      s = RecoverLogFiles(wals, &next_sequence, read_only,
                          &corrupted_wal_found);
      if (corrupted_wal_found && recovered_seq != nullptr) {
        *recovered_seq = next_sequence;
      }
      if (!s.ok()) {
        // Clear memtables if recovery failed
        for (auto cfd : *versions_->GetColumnFamilySet()) {
          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
                                 kMaxSequenceNumber);
        }
      }
    }
  }

  if (read_only) {
    // If we are opening as read-only, we need to update options_file_number_
    // to reflect the most recent OPTIONS file. It does not matter for regular
    // read-write db instance because options_file_number_ will later be
    // updated to versions_->NewFileNumber() in RenameTempFileToOptionsFile.
    std::vector<std::string> filenames;
    if (s.ok()) {
      const std::string normalized_dbname = NormalizePath(dbname_);
      const std::string normalized_wal_dir =
          NormalizePath(immutable_db_options_.wal_dir);
      if (immutable_db_options_.best_efforts_recovery) {
        filenames = std::move(files_in_dbname);
      } else if (normalized_dbname == normalized_wal_dir) {
        filenames = std::move(files_in_wal_dir);
      } else {
        s = env_->GetChildren(GetName(), &filenames);
      }
    }
    if (s.ok()) {
      uint64_t number = 0;
      uint64_t options_file_number = 0;
      FileType type;
      for (const auto& fname : filenames) {
        if (ParseFileName(fname, &number, &type) && type == kOptionsFile) {
          options_file_number = std::max(number, options_file_number);
        }
      }
      versions_->options_file_number_ = options_file_number;
    }
  }
  return s;
}
670
// Reconcile the persistent stats column family with the format version this
// binary writes. If the recovered version keys are unreadable, or the on-disk
// format is newer than both this binary's format and its compatibility floor,
// the stats CF is dropped and re-created. The format/compatible version keys
// are (re)written whenever the CF is freshly created. Requires mutex_ held on
// entry; the mutex is released for the duration of the reads/CF ops and
// re-acquired before returning.
Status DBImpl::PersistentStatsProcessFormatVersion() {
  mutex_.AssertHeld();
  Status s;
  // persist version when stats CF doesn't exist
  bool should_persist_format_version = !persistent_stats_cfd_exists_;
  // The version probes, DropColumnFamily and CreateColumnFamily below must
  // not run under the DB mutex.
  mutex_.Unlock();
  if (persistent_stats_cfd_exists_) {
    // Check persistent stats format version compatibility. Drop and recreate
    // persistent stats CF if format version is incompatible
    uint64_t format_version_recovered = 0;
    Status s_format = DecodePersistentStatsVersionNumber(
        this, StatsVersionKeyType::kFormatVersion, &format_version_recovered);
    uint64_t compatible_version_recovered = 0;
    Status s_compatible = DecodePersistentStatsVersionNumber(
        this, StatsVersionKeyType::kCompatibleVersion,
        &compatible_version_recovered);
    // abort reading from existing stats CF if any of following is true:
    // 1. failed to read format version or compatible version from disk
    // 2. sst's format version is greater than current format version, meaning
    // this sst is encoded with a newer RocksDB release, and current compatible
    // version is below the sst's compatible version
    if (!s_format.ok() || !s_compatible.ok() ||
        (kStatsCFCurrentFormatVersion < format_version_recovered &&
         kStatsCFCompatibleFormatVersion < compatible_version_recovered)) {
      if (!s_format.ok() || !s_compatible.ok()) {
        ROCKS_LOG_WARN(
            immutable_db_options_.info_log,
            "Recreating persistent stats column family since reading "
            "persistent stats version key failed. Format key: %s, compatible "
            "key: %s",
            s_format.ToString().c_str(), s_compatible.ToString().c_str());
      } else {
        ROCKS_LOG_WARN(
            immutable_db_options_.info_log,
            "Recreating persistent stats column family due to corrupted or "
            "incompatible format version. Recovered format: %" PRIu64
            "; recovered format compatible since: %" PRIu64 "\n",
            format_version_recovered, compatible_version_recovered);
      }
      // Drop the incompatible CF and create a fresh one; each step runs only
      // if the previous one succeeded.
      s = DropColumnFamily(persist_stats_cf_handle_);
      if (s.ok()) {
        s = DestroyColumnFamilyHandle(persist_stats_cf_handle_);
      }
      ColumnFamilyHandle* handle = nullptr;
      if (s.ok()) {
        ColumnFamilyOptions cfo;
        OptimizeForPersistentStats(&cfo);
        s = CreateColumnFamily(cfo, kPersistentStatsColumnFamilyName, &handle);
      }
      if (s.ok()) {
        persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
        // should also persist version here because old stats CF is discarded
        should_persist_format_version = true;
      }
    }
  }
  if (should_persist_format_version) {
    // Persistent stats CF being created for the first time, need to write
    // format version key
    WriteBatch batch;
    if (s.ok()) {
      s = batch.Put(persist_stats_cf_handle_, kFormatVersionKeyString,
                    ToString(kStatsCFCurrentFormatVersion));
    }
    if (s.ok()) {
      s = batch.Put(persist_stats_cf_handle_, kCompatibleVersionKeyString,
                    ToString(kStatsCFCompatibleFormatVersion));
    }
    if (s.ok()) {
      // Low-priority, non-blocking, unsynced write for the version keys.
      WriteOptions wo;
      wo.low_pri = true;
      wo.no_slowdown = true;
      wo.sync = false;
      s = Write(wo, &batch);
    }
  }
  // Re-acquire the DB mutex before returning to the caller, which holds it.
  mutex_.Lock();
  return s;
}
750
InitPersistStatsColumnFamily()751 Status DBImpl::InitPersistStatsColumnFamily() {
752 mutex_.AssertHeld();
753 assert(!persist_stats_cf_handle_);
754 ColumnFamilyData* persistent_stats_cfd =
755 versions_->GetColumnFamilySet()->GetColumnFamily(
756 kPersistentStatsColumnFamilyName);
757 persistent_stats_cfd_exists_ = persistent_stats_cfd != nullptr;
758
759 Status s;
760 if (persistent_stats_cfd != nullptr) {
761 // We are recovering from a DB which already contains persistent stats CF,
762 // the CF is already created in VersionSet::ApplyOneVersionEdit, but
763 // column family handle was not. Need to explicitly create handle here.
764 persist_stats_cf_handle_ =
765 new ColumnFamilyHandleImpl(persistent_stats_cfd, this, &mutex_);
766 } else {
767 mutex_.Unlock();
768 ColumnFamilyHandle* handle = nullptr;
769 ColumnFamilyOptions cfo;
770 OptimizeForPersistentStats(&cfo);
771 s = CreateColumnFamily(cfo, kPersistentStatsColumnFamilyName, &handle);
772 persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
773 mutex_.Lock();
774 }
775 return s;
776 }
777
778 // REQUIRES: wal_numbers are sorted in ascending order
RecoverLogFiles(const std::vector<uint64_t> & wal_numbers,SequenceNumber * next_sequence,bool read_only,bool * corrupted_wal_found)779 Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
780 SequenceNumber* next_sequence, bool read_only,
781 bool* corrupted_wal_found) {
782 struct LogReporter : public log::Reader::Reporter {
783 Env* env;
784 Logger* info_log;
785 const char* fname;
786 Status* status; // nullptr if immutable_db_options_.paranoid_checks==false
787 void Corruption(size_t bytes, const Status& s) override {
788 ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
789 (status == nullptr ? "(ignoring error) " : ""), fname,
790 static_cast<int>(bytes), s.ToString().c_str());
791 if (status != nullptr && status->ok()) {
792 *status = s;
793 }
794 }
795 };
796
797 mutex_.AssertHeld();
798 Status status;
799 std::unordered_map<int, VersionEdit> version_edits;
800 // no need to refcount because iteration is under mutex
801 for (auto cfd : *versions_->GetColumnFamilySet()) {
802 VersionEdit edit;
803 edit.SetColumnFamily(cfd->GetID());
804 version_edits.insert({cfd->GetID(), edit});
805 }
806 int job_id = next_job_id_.fetch_add(1);
807 {
808 auto stream = event_logger_.Log();
809 stream << "job" << job_id << "event"
810 << "recovery_started";
811 stream << "wal_files";
812 stream.StartArray();
813 for (auto wal_number : wal_numbers) {
814 stream << wal_number;
815 }
816 stream.EndArray();
817 }
818
819 #ifndef ROCKSDB_LITE
820 if (immutable_db_options_.wal_filter != nullptr) {
821 std::map<std::string, uint32_t> cf_name_id_map;
822 std::map<uint32_t, uint64_t> cf_lognumber_map;
823 for (auto cfd : *versions_->GetColumnFamilySet()) {
824 cf_name_id_map.insert(std::make_pair(cfd->GetName(), cfd->GetID()));
825 cf_lognumber_map.insert(
826 std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
827 }
828
829 immutable_db_options_.wal_filter->ColumnFamilyLogNumberMap(cf_lognumber_map,
830 cf_name_id_map);
831 }
832 #endif
833
834 bool stop_replay_by_wal_filter = false;
835 bool stop_replay_for_corruption = false;
836 bool flushed = false;
837 uint64_t corrupted_wal_number = kMaxSequenceNumber;
838 uint64_t min_wal_number = MinLogNumberToKeep();
839 for (auto wal_number : wal_numbers) {
840 if (wal_number < min_wal_number) {
841 ROCKS_LOG_INFO(immutable_db_options_.info_log,
842 "Skipping log #%" PRIu64
843 " since it is older than min log to keep #%" PRIu64,
844 wal_number, min_wal_number);
845 continue;
846 }
847 // The previous incarnation may not have written any MANIFEST
848 // records after allocating this log number. So we manually
849 // update the file number allocation counter in VersionSet.
850 versions_->MarkFileNumberUsed(wal_number);
851 // Open the log file
852 std::string fname = LogFileName(immutable_db_options_.wal_dir, wal_number);
853
854 ROCKS_LOG_INFO(immutable_db_options_.info_log,
855 "Recovering log #%" PRIu64 " mode %d", wal_number,
856 static_cast<int>(immutable_db_options_.wal_recovery_mode));
857 auto logFileDropped = [this, &fname]() {
858 uint64_t bytes;
859 if (env_->GetFileSize(fname, &bytes).ok()) {
860 auto info_log = immutable_db_options_.info_log.get();
861 ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes", fname.c_str(),
862 static_cast<int>(bytes));
863 }
864 };
865 if (stop_replay_by_wal_filter) {
866 logFileDropped();
867 continue;
868 }
869
870 std::unique_ptr<SequentialFileReader> file_reader;
871 {
872 std::unique_ptr<FSSequentialFile> file;
873 status = fs_->NewSequentialFile(fname,
874 fs_->OptimizeForLogRead(file_options_),
875 &file, nullptr);
876 if (!status.ok()) {
877 MaybeIgnoreError(&status);
878 if (!status.ok()) {
879 return status;
880 } else {
881 // Fail with one log file, but that's ok.
882 // Try next one.
883 continue;
884 }
885 }
886 file_reader.reset(new SequentialFileReader(
887 std::move(file), fname, immutable_db_options_.log_readahead_size,
888 io_tracer_));
889 }
890
891 // Create the log reader.
892 LogReporter reporter;
893 reporter.env = env_;
894 reporter.info_log = immutable_db_options_.info_log.get();
895 reporter.fname = fname.c_str();
896 if (!immutable_db_options_.paranoid_checks ||
897 immutable_db_options_.wal_recovery_mode ==
898 WALRecoveryMode::kSkipAnyCorruptedRecords) {
899 reporter.status = nullptr;
900 } else {
901 reporter.status = &status;
902 }
    // We intentionally make log::Reader do checksumming even if
904 // paranoid_checks==false so that corruptions cause entire commits
905 // to be skipped instead of propagating bad information (like overly
906 // large sequence numbers).
907 log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
908 &reporter, true /*checksum*/, wal_number);
909
910 // Determine if we should tolerate incomplete records at the tail end of the
911 // Read all the records and add to a memtable
912 std::string scratch;
913 Slice record;
914 WriteBatch batch;
915
916 TEST_SYNC_POINT_CALLBACK("DBImpl::RecoverLogFiles:BeforeReadWal",
917 /*arg=*/nullptr);
918 while (!stop_replay_by_wal_filter &&
919 reader.ReadRecord(&record, &scratch,
920 immutable_db_options_.wal_recovery_mode) &&
921 status.ok()) {
922 if (record.size() < WriteBatchInternal::kHeader) {
923 reporter.Corruption(record.size(),
924 Status::Corruption("log record too small"));
925 continue;
926 }
927
928 status = WriteBatchInternal::SetContents(&batch, record);
929 if (!status.ok()) {
930 return status;
931 }
932 SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);
933
934 if (immutable_db_options_.wal_recovery_mode ==
935 WALRecoveryMode::kPointInTimeRecovery) {
936 // In point-in-time recovery mode, if sequence id of log files are
937 // consecutive, we continue recovery despite corruption. This could
938 // happen when we open and write to a corrupted DB, where sequence id
939 // will start from the last sequence id we recovered.
940 if (sequence == *next_sequence) {
941 stop_replay_for_corruption = false;
942 }
943 if (stop_replay_for_corruption) {
944 logFileDropped();
945 break;
946 }
947 }
948
949 #ifndef ROCKSDB_LITE
950 if (immutable_db_options_.wal_filter != nullptr) {
951 WriteBatch new_batch;
952 bool batch_changed = false;
953
954 WalFilter::WalProcessingOption wal_processing_option =
955 immutable_db_options_.wal_filter->LogRecordFound(
956 wal_number, fname, batch, &new_batch, &batch_changed);
957
958 switch (wal_processing_option) {
959 case WalFilter::WalProcessingOption::kContinueProcessing:
960 // do nothing, proceeed normally
961 break;
962 case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
963 // skip current record
964 continue;
965 case WalFilter::WalProcessingOption::kStopReplay:
966 // skip current record and stop replay
967 stop_replay_by_wal_filter = true;
968 continue;
969 case WalFilter::WalProcessingOption::kCorruptedRecord: {
970 status =
971 Status::Corruption("Corruption reported by Wal Filter ",
972 immutable_db_options_.wal_filter->Name());
973 MaybeIgnoreError(&status);
974 if (!status.ok()) {
975 reporter.Corruption(record.size(), status);
976 continue;
977 }
978 break;
979 }
980 default: {
981 assert(false); // unhandled case
982 status = Status::NotSupported(
983 "Unknown WalProcessingOption returned"
984 " by Wal Filter ",
985 immutable_db_options_.wal_filter->Name());
986 MaybeIgnoreError(&status);
987 if (!status.ok()) {
988 return status;
989 } else {
990 // Ignore the error with current record processing.
991 continue;
992 }
993 }
994 }
995
996 if (batch_changed) {
997 // Make sure that the count in the new batch is
998 // within the orignal count.
999 int new_count = WriteBatchInternal::Count(&new_batch);
1000 int original_count = WriteBatchInternal::Count(&batch);
1001 if (new_count > original_count) {
1002 ROCKS_LOG_FATAL(
1003 immutable_db_options_.info_log,
1004 "Recovering log #%" PRIu64
1005 " mode %d log filter %s returned "
1006 "more records (%d) than original (%d) which is not allowed. "
1007 "Aborting recovery.",
1008 wal_number,
1009 static_cast<int>(immutable_db_options_.wal_recovery_mode),
1010 immutable_db_options_.wal_filter->Name(), new_count,
1011 original_count);
1012 status = Status::NotSupported(
1013 "More than original # of records "
1014 "returned by Wal Filter ",
1015 immutable_db_options_.wal_filter->Name());
1016 return status;
1017 }
1018 // Set the same sequence number in the new_batch
1019 // as the original batch.
1020 WriteBatchInternal::SetSequence(&new_batch,
1021 WriteBatchInternal::Sequence(&batch));
1022 batch = new_batch;
1023 }
1024 }
1025 #endif // ROCKSDB_LITE
1026
1027 // If column family was not found, it might mean that the WAL write
1028 // batch references to the column family that was dropped after the
1029 // insert. We don't want to fail the whole write batch in that case --
1030 // we just ignore the update.
1031 // That's why we set ignore missing column families to true
1032 bool has_valid_writes = false;
1033 status = WriteBatchInternal::InsertInto(
1034 &batch, column_family_memtables_.get(), &flush_scheduler_,
1035 &trim_history_scheduler_, true, wal_number, this,
1036 false /* concurrent_memtable_writes */, next_sequence,
1037 &has_valid_writes, seq_per_batch_, batch_per_txn_);
1038 MaybeIgnoreError(&status);
1039 if (!status.ok()) {
1040 // We are treating this as a failure while reading since we read valid
1041 // blocks that do not form coherent data
1042 reporter.Corruption(record.size(), status);
1043 continue;
1044 }
1045
1046 if (has_valid_writes && !read_only) {
1047 // we can do this because this is called before client has access to the
1048 // DB and there is only a single thread operating on DB
1049 ColumnFamilyData* cfd;
1050
1051 while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
1052 cfd->UnrefAndTryDelete();
1053 // If this asserts, it means that InsertInto failed in
1054 // filtering updates to already-flushed column families
1055 assert(cfd->GetLogNumber() <= wal_number);
1056 auto iter = version_edits.find(cfd->GetID());
1057 assert(iter != version_edits.end());
1058 VersionEdit* edit = &iter->second;
1059 status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
1060 if (!status.ok()) {
1061 // Reflect errors immediately so that conditions like full
1062 // file-systems cause the DB::Open() to fail.
1063 return status;
1064 }
1065 flushed = true;
1066
1067 cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
1068 *next_sequence);
1069 }
1070 }
1071 }
1072
1073 if (!status.ok()) {
1074 if (status.IsNotSupported()) {
1075 // We should not treat NotSupported as corruption. It is rather a clear
1076 // sign that we are processing a WAL that is produced by an incompatible
1077 // version of the code.
1078 return status;
1079 }
1080 if (immutable_db_options_.wal_recovery_mode ==
1081 WALRecoveryMode::kSkipAnyCorruptedRecords) {
1082 // We should ignore all errors unconditionally
1083 status = Status::OK();
1084 } else if (immutable_db_options_.wal_recovery_mode ==
1085 WALRecoveryMode::kPointInTimeRecovery) {
1086 if (status.IsIOError()) {
1087 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
1088 "IOError during point-in-time reading log #%" PRIu64
1089 " seq #%" PRIu64
1090 ". %s. This likely mean loss of synced WAL, "
1091 "thus recovery fails.",
1092 wal_number, *next_sequence,
1093 status.ToString().c_str());
1094 return status;
1095 }
1096 // We should ignore the error but not continue replaying
1097 status = Status::OK();
1098 stop_replay_for_corruption = true;
1099 corrupted_wal_number = wal_number;
1100 if (corrupted_wal_found != nullptr) {
1101 *corrupted_wal_found = true;
1102 }
1103 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1104 "Point in time recovered to log #%" PRIu64
1105 " seq #%" PRIu64,
1106 wal_number, *next_sequence);
1107 } else {
1108 assert(immutable_db_options_.wal_recovery_mode ==
1109 WALRecoveryMode::kTolerateCorruptedTailRecords ||
1110 immutable_db_options_.wal_recovery_mode ==
1111 WALRecoveryMode::kAbsoluteConsistency);
1112 return status;
1113 }
1114 }
1115
1116 flush_scheduler_.Clear();
1117 trim_history_scheduler_.Clear();
1118 auto last_sequence = *next_sequence - 1;
1119 if ((*next_sequence != kMaxSequenceNumber) &&
1120 (versions_->LastSequence() <= last_sequence)) {
1121 versions_->SetLastAllocatedSequence(last_sequence);
1122 versions_->SetLastPublishedSequence(last_sequence);
1123 versions_->SetLastSequence(last_sequence);
1124 }
1125 }
1126 // Compare the corrupted log number to all columnfamily's current log number.
1127 // Abort Open() if any column family's log number is greater than
1128 // the corrupted log number, which means CF contains data beyond the point of
  // corruption. This could happen during PIT recovery when the WAL is
  // corrupted and
1130 // some (but not all) CFs are flushed
1131 // Exclude the PIT case where no log is dropped after the corruption point.
1132 // This is to cover the case for empty wals after corrupted log, in which we
1133 // don't reset stop_replay_for_corruption.
1134 if (stop_replay_for_corruption == true &&
1135 (immutable_db_options_.wal_recovery_mode ==
1136 WALRecoveryMode::kPointInTimeRecovery ||
1137 immutable_db_options_.wal_recovery_mode ==
1138 WALRecoveryMode::kTolerateCorruptedTailRecords)) {
1139 for (auto cfd : *versions_->GetColumnFamilySet()) {
1140 // One special case cause cfd->GetLogNumber() > corrupted_wal_number but
1141 // the CF is still consistent: If a new column family is created during
1142 // the flush and the WAL sync fails at the same time, the new CF points to
      // the new WAL but the old WAL is corrupted. Since the new CF is empty, it
1144 // is still consistent. We add the check of CF sst file size to avoid the
1145 // false positive alert.
1146
      // Note that the check of (cfd->GetLiveSstFilesSize() > 0) may lead to
      // a very rare inconsistency case being ignored: one CF is emptied by
      // KV deletions, but those deletions are only in the WAL. If the WAL is
      // corrupted, the state of this CF might not be consistent with the
      // others, yet the consistency check is bypassed because the CF is
      // empty.
      // TODO: a better and complete implementation is needed to ensure a
      // strict consistency check in WAL recovery, including handling the
      // trailing issues.
1156 if (cfd->GetLogNumber() > corrupted_wal_number &&
1157 cfd->GetLiveSstFilesSize() > 0) {
1158 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
1159 "Column family inconsistency: SST file contains data"
1160 " beyond the point of corruption.");
1161 return Status::Corruption("SST file is ahead of WALs in CF " +
1162 cfd->GetName());
1163 }
1164 }
1165 }
1166
1167 // True if there's any data in the WALs; if not, we can skip re-processing
1168 // them later
1169 bool data_seen = false;
1170 if (!read_only) {
1171 // no need to refcount since client still doesn't have access
1172 // to the DB and can not drop column families while we iterate
1173 const WalNumber max_wal_number = wal_numbers.back();
1174 for (auto cfd : *versions_->GetColumnFamilySet()) {
1175 auto iter = version_edits.find(cfd->GetID());
1176 assert(iter != version_edits.end());
1177 VersionEdit* edit = &iter->second;
1178
1179 if (cfd->GetLogNumber() > max_wal_number) {
1180 // Column family cfd has already flushed the data
1181 // from all wals. Memtable has to be empty because
1182 // we filter the updates based on wal_number
1183 // (in WriteBatch::InsertInto)
1184 assert(cfd->mem()->GetFirstSequenceNumber() == 0);
1185 assert(edit->NumEntries() == 0);
1186 continue;
1187 }
1188
1189 TEST_SYNC_POINT_CALLBACK(
1190 "DBImpl::RecoverLogFiles:BeforeFlushFinalMemtable", /*arg=*/nullptr);
1191
1192 // flush the final memtable (if non-empty)
1193 if (cfd->mem()->GetFirstSequenceNumber() != 0) {
1194 // If flush happened in the middle of recovery (e.g. due to memtable
1195 // being full), we flush at the end. Otherwise we'll need to record
1196 // where we were on last flush, which make the logic complicated.
1197 if (flushed || !immutable_db_options_.avoid_flush_during_recovery) {
1198 status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
1199 if (!status.ok()) {
1200 // Recovery failed
1201 break;
1202 }
1203 flushed = true;
1204
1205 cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
1206 versions_->LastSequence());
1207 }
1208 data_seen = true;
1209 }
1210
1211 // Update the log number info in the version edit corresponding to this
1212 // column family. Note that the version edits will be written to MANIFEST
1213 // together later.
1214 // writing wal_number in the manifest means that any log file
1215 // with number strongly less than (wal_number + 1) is already
1216 // recovered and should be ignored on next reincarnation.
1217 // Since we already recovered max_wal_number, we want all wals
1218 // with numbers `<= max_wal_number` (includes this one) to be ignored
1219 if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) {
1220 edit->SetLogNumber(max_wal_number + 1);
1221 }
1222 }
1223 if (status.ok()) {
1224 // we must mark the next log number as used, even though it's
1225 // not actually used. that is because VersionSet assumes
1226 // VersionSet::next_file_number_ always to be strictly greater than any
1227 // log number
1228 versions_->MarkFileNumberUsed(max_wal_number + 1);
1229
1230 autovector<ColumnFamilyData*> cfds;
1231 autovector<const MutableCFOptions*> cf_opts;
1232 autovector<autovector<VersionEdit*>> edit_lists;
1233 for (auto* cfd : *versions_->GetColumnFamilySet()) {
1234 cfds.push_back(cfd);
1235 cf_opts.push_back(cfd->GetLatestMutableCFOptions());
1236 auto iter = version_edits.find(cfd->GetID());
1237 assert(iter != version_edits.end());
1238 edit_lists.push_back({&iter->second});
1239 }
1240
1241 std::unique_ptr<VersionEdit> wal_deletion;
1242 if (immutable_db_options_.track_and_verify_wals_in_manifest) {
1243 wal_deletion.reset(new VersionEdit);
1244 wal_deletion->DeleteWalsBefore(max_wal_number + 1);
1245 edit_lists.back().push_back(wal_deletion.get());
1246 }
1247
1248 // write MANIFEST with update
1249 status = versions_->LogAndApply(cfds, cf_opts, edit_lists, &mutex_,
1250 directories_.GetDbDir(),
1251 /*new_descriptor_log=*/true);
1252 }
1253 }
1254
1255 if (status.ok()) {
1256 if (data_seen && !flushed) {
1257 status = RestoreAliveLogFiles(wal_numbers);
1258 } else {
1259 // If there's no data in the WAL, or we flushed all the data, still
1260 // truncate the log file. If the process goes into a crash loop before
1261 // the file is deleted, the preallocated space will never get freed.
1262 GetLogSizeAndMaybeTruncate(wal_numbers.back(), true, nullptr)
1263 .PermitUncheckedError();
1264 }
1265 }
1266
1267 event_logger_.Log() << "job" << job_id << "event"
1268 << "recovery_finished";
1269
1270 return status;
1271 }
1272
// Looks up the on-disk size of WAL file `wal_number` and, if `truncate` is
// true, truncates the file back to that size so that any
// preallocated-but-unused tail space is released. When `log_ptr` is non-null,
// the resulting (number, size) record is copied into it. Returns the status
// of the size lookup only; a failed truncation is logged as a warning but is
// not treated as an error.
Status DBImpl::GetLogSizeAndMaybeTruncate(uint64_t wal_number, bool truncate,
                                          LogFileNumberSize* log_ptr) {
  LogFileNumberSize log(wal_number);
  std::string fname = LogFileName(immutable_db_options_.wal_dir, wal_number);
  Status s;
  // This gets the apparent size of the wals, not including preallocated
  // space.
  s = env_->GetFileSize(fname, &log.size);
  if (s.ok() && truncate) {
    std::unique_ptr<FSWritableFile> last_log;
    // Reopen for write so the file can be truncated in place.
    Status truncate_status = fs_->ReopenWritableFile(
        fname,
        fs_->OptimizeForLogWrite(
            file_options_,
            BuildDBOptions(immutable_db_options_, mutable_db_options_)),
        &last_log, nullptr);
    if (truncate_status.ok()) {
      truncate_status = last_log->Truncate(log.size, IOOptions(), nullptr);
    }
    if (truncate_status.ok()) {
      truncate_status = last_log->Close(IOOptions(), nullptr);
    }
    // Not a critical error if fail to truncate.
    if (!truncate_status.ok()) {
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "Failed to truncate log #%" PRIu64 ": %s", wal_number,
                     truncate_status.ToString().c_str());
    }
  }
  if (log_ptr) {
    *log_ptr = log;
  }
  return s;
}
1306
RestoreAliveLogFiles(const std::vector<uint64_t> & wal_numbers)1307 Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
1308 if (wal_numbers.empty()) {
1309 return Status::OK();
1310 }
1311 Status s;
1312 mutex_.AssertHeld();
1313 assert(immutable_db_options_.avoid_flush_during_recovery);
1314 if (two_write_queues_) {
1315 log_write_mutex_.Lock();
1316 }
1317 // Mark these as alive so they'll be considered for deletion later by
1318 // FindObsoleteFiles()
1319 total_log_size_ = 0;
1320 log_empty_ = false;
1321 for (auto wal_number : wal_numbers) {
1322 // We preallocate space for wals, but then after a crash and restart, those
1323 // preallocated space are not needed anymore. It is likely only the last
1324 // log has such preallocated space, so we only truncate for the last log.
1325 LogFileNumberSize log;
1326 s = GetLogSizeAndMaybeTruncate(
1327 wal_number, /*truncate=*/(wal_number == wal_numbers.back()), &log);
1328 if (!s.ok()) {
1329 break;
1330 }
1331 total_log_size_ += log.size;
1332 alive_log_files_.push_back(log);
1333 }
1334 if (two_write_queues_) {
1335 log_write_mutex_.Unlock();
1336 }
1337 return s;
1338 }
1339
// Flushes memtable `mem` of column family `cfd` into a new level-0 SST file
// (and possibly blob files) and records the new file(s) in `edit`. Used only
// during WAL recovery; `job_id` tags the event-log entries. The DB mutex must
// be held on entry; it is temporarily released around the table build.
// Returns the status of the build.
Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
                                           MemTable* mem, VersionEdit* edit) {
  mutex_.AssertHeld();
  const uint64_t start_micros = immutable_db_options_.clock->NowMicros();

  FileMetaData meta;
  std::vector<BlobFileAddition> blob_file_additions;

  // Register the new file number as a pending output so that concurrent
  // FindObsoleteFiles() will not delete the partially written file.
  std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
      new std::list<uint64_t>::iterator(
          CaptureCurrentFileNumberInPendingOutputs()));
  meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
  ReadOptions ro;
  ro.total_order_seek = true;
  Arena arena;
  Status s;
  TableProperties table_properties;
  {
    ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                    "[%s] [WriteLevel0TableForRecovery]"
                    " Level-0 table #%" PRIu64 ": started",
                    cfd->GetName().c_str(), meta.fd.GetNumber());

    // Get the latest mutable cf options while the mutex is still locked
    const MutableCFOptions mutable_cf_options =
        *cfd->GetLatestMutableCFOptions();
    bool paranoid_file_checks =
        cfd->GetLatestMutableCFOptions()->paranoid_file_checks;

    int64_t _current_time = 0;
    immutable_db_options_.clock->GetCurrentTime(&_current_time)
        .PermitUncheckedError();  // ignore error
    const uint64_t current_time = static_cast<uint64_t>(_current_time);
    meta.oldest_ancester_time = current_time;

    {
      auto write_hint = cfd->CalculateSSTWriteHint(0);
      // Release the DB mutex for the duration of the (possibly long) table
      // build; re-acquired after BuildTable() below.
      mutex_.Unlock();

      SequenceNumber earliest_write_conflict_snapshot;
      std::vector<SequenceNumber> snapshot_seqs =
          snapshots_.GetAll(&earliest_write_conflict_snapshot);
      auto snapshot_checker = snapshot_checker_.get();
      if (use_custom_gc_ && snapshot_checker == nullptr) {
        snapshot_checker = DisableGCSnapshotChecker::Instance();
      }
      std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
          range_del_iters;
      auto range_del_iter =
          mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
      if (range_del_iter != nullptr) {
        range_del_iters.emplace_back(range_del_iter);
      }

      IOStatus io_s;
      TableBuilderOptions tboptions(
          *cfd->ioptions(), mutable_cf_options, cfd->internal_comparator(),
          cfd->int_tbl_prop_collector_factories(),
          GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
          mutable_cf_options.compression_opts, cfd->GetID(), cfd->GetName(),
          0 /* level */, false /* is_bottommost */,
          TableFileCreationReason::kRecovery, current_time,
          0 /* oldest_key_time */, 0 /* file_creation_time */, db_id_,
          db_session_id_, 0 /* target_file_size */);
      s = BuildTable(
          dbname_, versions_.get(), immutable_db_options_, tboptions,
          file_options_for_compaction_, cfd->table_cache(), iter.get(),
          std::move(range_del_iters), &meta, &blob_file_additions,
          snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
          paranoid_file_checks, cfd->internal_stats(), &io_s, io_tracer_,
          &event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */,
          write_hint, nullptr /*full_history_ts_low*/, &blob_callback_);
      LogFlush(immutable_db_options_.info_log);
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                      "[%s] [WriteLevel0TableForRecovery]"
                      " Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s",
                      cfd->GetName().c_str(), meta.fd.GetNumber(),
                      meta.fd.GetFileSize(), s.ToString().c_str());
      mutex_.Lock();

      io_s.PermitUncheckedError();  // TODO(AR) is this correct, or should we
                                    // return io_s if not ok()?
    }
  }
  ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  const bool has_output = meta.fd.GetFileSize() > 0;

  constexpr int level = 0;

  if (s.ok() && has_output) {
    edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
                  meta.fd.GetFileSize(), meta.smallest, meta.largest,
                  meta.fd.smallest_seqno, meta.fd.largest_seqno,
                  meta.marked_for_compaction, meta.oldest_blob_file_number,
                  meta.oldest_ancester_time, meta.file_creation_time,
                  meta.file_checksum, meta.file_checksum_func_name);

    for (const auto& blob : blob_file_additions) {
      edit->AddBlobFile(blob);
    }
  }

  // Record flush statistics: bytes and file counts for both the SST output
  // and any blob files added to the edit.
  InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
  stats.micros = immutable_db_options_.clock->NowMicros() - start_micros;

  if (has_output) {
    stats.bytes_written = meta.fd.GetFileSize();
    stats.num_output_files = 1;
  }

  const auto& blobs = edit->GetBlobFileAdditions();
  for (const auto& blob : blobs) {
    stats.bytes_written_blob += blob.GetTotalBlobBytes();
  }

  stats.num_output_files_blob = static_cast<int>(blobs.size());

  cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats);
  cfd->internal_stats()->AddCFStats(
      InternalStats::BYTES_FLUSHED,
      stats.bytes_written + stats.bytes_written_blob);
  RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
  return s;
}
1468
Open(const Options & options,const std::string & dbname,DB ** dbptr)1469 Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
1470 DBOptions db_options(options);
1471 ColumnFamilyOptions cf_options(options);
1472 std::vector<ColumnFamilyDescriptor> column_families;
1473 column_families.push_back(
1474 ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
1475 if (db_options.persist_stats_to_disk) {
1476 column_families.push_back(
1477 ColumnFamilyDescriptor(kPersistentStatsColumnFamilyName, cf_options));
1478 }
1479 std::vector<ColumnFamilyHandle*> handles;
1480 Status s = DB::Open(db_options, dbname, column_families, &handles, dbptr);
1481 if (s.ok()) {
1482 if (db_options.persist_stats_to_disk) {
1483 assert(handles.size() == 2);
1484 } else {
1485 assert(handles.size() == 1);
1486 }
1487 // i can delete the handle since DBImpl is always holding a reference to
1488 // default column family
1489 if (db_options.persist_stats_to_disk && handles[1] != nullptr) {
1490 delete handles[1];
1491 }
1492 delete handles[0];
1493 }
1494 return s;
1495 }
1496
Open(const DBOptions & db_options,const std::string & dbname,const std::vector<ColumnFamilyDescriptor> & column_families,std::vector<ColumnFamilyHandle * > * handles,DB ** dbptr)1497 Status DB::Open(const DBOptions& db_options, const std::string& dbname,
1498 const std::vector<ColumnFamilyDescriptor>& column_families,
1499 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
1500 const bool kSeqPerBatch = true;
1501 const bool kBatchPerTxn = true;
1502 return DBImpl::Open(db_options, dbname, column_families, handles, dbptr,
1503 !kSeqPerBatch, kBatchPerTxn);
1504 }
1505
CreateWAL(uint64_t log_file_num,uint64_t recycle_log_number,size_t preallocate_block_size,log::Writer ** new_log)1506 IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
1507 size_t preallocate_block_size,
1508 log::Writer** new_log) {
1509 IOStatus io_s;
1510 std::unique_ptr<FSWritableFile> lfile;
1511
1512 DBOptions db_options =
1513 BuildDBOptions(immutable_db_options_, mutable_db_options_);
1514 FileOptions opt_file_options =
1515 fs_->OptimizeForLogWrite(file_options_, db_options);
1516 std::string log_fname =
1517 LogFileName(immutable_db_options_.wal_dir, log_file_num);
1518
1519 if (recycle_log_number) {
1520 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1521 "reusing log %" PRIu64 " from recycle list\n",
1522 recycle_log_number);
1523 std::string old_log_fname =
1524 LogFileName(immutable_db_options_.wal_dir, recycle_log_number);
1525 TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile1");
1526 TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile2");
1527 io_s = fs_->ReuseWritableFile(log_fname, old_log_fname, opt_file_options,
1528 &lfile, /*dbg=*/nullptr);
1529 } else {
1530 io_s = NewWritableFile(fs_.get(), log_fname, &lfile, opt_file_options);
1531 }
1532
1533 if (io_s.ok()) {
1534 lfile->SetWriteLifeTimeHint(CalculateWALWriteHint());
1535 lfile->SetPreallocationBlockSize(preallocate_block_size);
1536
1537 const auto& listeners = immutable_db_options_.listeners;
1538 FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types;
1539 std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
1540 std::move(lfile), log_fname, opt_file_options,
1541 immutable_db_options_.clock, io_tracer_, nullptr /* stats */, listeners,
1542 nullptr, tmp_set.Contains(FileType::kWalFile)));
1543 *new_log = new log::Writer(std::move(file_writer), log_file_num,
1544 immutable_db_options_.recycle_log_file_num > 0,
1545 immutable_db_options_.manual_wal_flush);
1546 }
1547 return io_s;
1548 }
1549
Open(const DBOptions & db_options,const std::string & dbname,const std::vector<ColumnFamilyDescriptor> & column_families,std::vector<ColumnFamilyHandle * > * handles,DB ** dbptr,const bool seq_per_batch,const bool batch_per_txn)1550 Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
1551 const std::vector<ColumnFamilyDescriptor>& column_families,
1552 std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
1553 const bool seq_per_batch, const bool batch_per_txn) {
1554 Status s = ValidateOptionsByTable(db_options, column_families);
1555 if (!s.ok()) {
1556 return s;
1557 }
1558
1559 s = ValidateOptions(db_options, column_families);
1560 if (!s.ok()) {
1561 return s;
1562 }
1563
1564 *dbptr = nullptr;
1565 handles->clear();
1566
1567 size_t max_write_buffer_size = 0;
1568 for (auto cf : column_families) {
1569 max_write_buffer_size =
1570 std::max(max_write_buffer_size, cf.options.write_buffer_size);
1571 }
1572
1573 DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch, batch_per_txn);
1574 s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.wal_dir);
1575 if (s.ok()) {
1576 std::vector<std::string> paths;
1577 for (auto& db_path : impl->immutable_db_options_.db_paths) {
1578 paths.emplace_back(db_path.path);
1579 }
1580 for (auto& cf : column_families) {
1581 for (auto& cf_path : cf.options.cf_paths) {
1582 paths.emplace_back(cf_path.path);
1583 }
1584 }
1585 for (auto& path : paths) {
1586 s = impl->env_->CreateDirIfMissing(path);
1587 if (!s.ok()) {
1588 break;
1589 }
1590 }
1591
1592 // For recovery from NoSpace() error, we can only handle
1593 // the case where the database is stored in a single path
1594 if (paths.size() <= 1) {
1595 impl->error_handler_.EnableAutoRecovery();
1596 }
1597 }
1598 if (s.ok()) {
1599 s = impl->CreateArchivalDirectory();
1600 }
1601 if (!s.ok()) {
1602 delete impl;
1603 return s;
1604 }
1605
1606 impl->wal_in_db_path_ = IsWalDirSameAsDBPath(&impl->immutable_db_options_);
1607
1608 impl->mutex_.Lock();
1609 // Handles create_if_missing, error_if_exists
1610 uint64_t recovered_seq(kMaxSequenceNumber);
1611 s = impl->Recover(column_families, false, false, false, &recovered_seq);
1612 if (s.ok()) {
1613 uint64_t new_log_number = impl->versions_->NewFileNumber();
1614 log::Writer* new_log = nullptr;
1615 const size_t preallocate_block_size =
1616 impl->GetWalPreallocateBlockSize(max_write_buffer_size);
1617 s = impl->CreateWAL(new_log_number, 0 /*recycle_log_number*/,
1618 preallocate_block_size, &new_log);
1619 if (s.ok()) {
1620 InstrumentedMutexLock wl(&impl->log_write_mutex_);
1621 impl->logfile_number_ = new_log_number;
1622 assert(new_log != nullptr);
1623 assert(impl->logs_.empty());
1624 impl->logs_.emplace_back(new_log_number, new_log);
1625 }
1626
1627 if (s.ok()) {
1628 // set column family handles
1629 for (auto cf : column_families) {
1630 auto cfd =
1631 impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
1632 if (cfd != nullptr) {
1633 handles->push_back(
1634 new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
1635 impl->NewThreadStatusCfInfo(cfd);
1636 } else {
1637 if (db_options.create_missing_column_families) {
1638 // missing column family, create it
1639 ColumnFamilyHandle* handle;
1640 impl->mutex_.Unlock();
1641 s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
1642 impl->mutex_.Lock();
1643 if (s.ok()) {
1644 handles->push_back(handle);
1645 } else {
1646 break;
1647 }
1648 } else {
1649 s = Status::InvalidArgument("Column family not found", cf.name);
1650 break;
1651 }
1652 }
1653 }
1654 }
1655 if (s.ok()) {
1656 SuperVersionContext sv_context(/* create_superversion */ true);
1657 for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
1658 impl->InstallSuperVersionAndScheduleWork(
1659 cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
1660 }
1661 sv_context.Clean();
1662 if (impl->two_write_queues_) {
1663 impl->log_write_mutex_.Lock();
1664 }
1665 impl->alive_log_files_.push_back(
1666 DBImpl::LogFileNumberSize(impl->logfile_number_));
1667 if (impl->two_write_queues_) {
1668 impl->log_write_mutex_.Unlock();
1669 }
1670
1671 impl->DeleteObsoleteFiles();
1672 s = impl->directories_.GetDbDir()->Fsync(IOOptions(), nullptr);
1673 }
1674 if (s.ok()) {
1675 // In WritePrepared there could be gap in sequence numbers. This breaks
1676 // the trick we use in kPointInTimeRecovery which assumes the first seq in
1677 // the log right after the corrupted log is one larger than the last seq
1678 // we read from the wals. To let this trick keep working, we add a dummy
1679 // entry with the expected sequence to the first log right after recovery.
1680 // In non-WritePrepared case also the new log after recovery could be
1681 // empty, and thus missing the consecutive seq hint to distinguish
1682 // middle-log corruption to corrupted-log-remained-after-recovery. This
1683 // case also will be addressed by a dummy write.
1684 if (recovered_seq != kMaxSequenceNumber) {
1685 WriteBatch empty_batch;
1686 WriteBatchInternal::SetSequence(&empty_batch, recovered_seq);
1687 WriteOptions write_options;
1688 uint64_t log_used, log_size;
1689 log::Writer* log_writer = impl->logs_.back().writer;
1690 s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size);
1691 if (s.ok()) {
1692 // Need to fsync, otherwise it might get lost after a power reset.
1693 s = impl->FlushWAL(false);
1694 if (s.ok()) {
1695 s = log_writer->file()->Sync(impl->immutable_db_options_.use_fsync);
1696 }
1697 }
1698 }
1699 }
1700 }
1701 if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
1702 // try to read format version
1703 s = impl->PersistentStatsProcessFormatVersion();
1704 }
1705
1706 if (s.ok()) {
1707 for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
1708 if (cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
1709 auto* vstorage = cfd->current()->storage_info();
1710 for (int i = 1; i < vstorage->num_levels(); ++i) {
1711 int num_files = vstorage->NumLevelFiles(i);
1712 if (num_files > 0) {
1713 s = Status::InvalidArgument(
1714 "Not all files are at level 0. Cannot "
1715 "open with FIFO compaction style.");
1716 break;
1717 }
1718 }
1719 }
1720 if (!cfd->mem()->IsSnapshotSupported()) {
1721 impl->is_snapshot_supported_ = false;
1722 }
1723 if (cfd->ioptions()->merge_operator != nullptr &&
1724 !cfd->mem()->IsMergeOperatorSupported()) {
1725 s = Status::InvalidArgument(
1726 "The memtable of column family %s does not support merge operator "
1727 "its options.merge_operator is non-null",
1728 cfd->GetName().c_str());
1729 }
1730 if (!s.ok()) {
1731 break;
1732 }
1733 }
1734 }
1735 TEST_SYNC_POINT("DBImpl::Open:Opened");
1736 Status persist_options_status;
1737 if (s.ok()) {
1738 // Persist RocksDB Options before scheduling the compaction.
1739 // The WriteOptionsFile() will release and lock the mutex internally.
1740 persist_options_status = impl->WriteOptionsFile(
1741 false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
1742
1743 *dbptr = impl;
1744 impl->opened_successfully_ = true;
1745 impl->MaybeScheduleFlushOrCompaction();
1746 } else {
1747 persist_options_status.PermitUncheckedError();
1748 }
1749 impl->mutex_.Unlock();
1750
1751 #ifndef ROCKSDB_LITE
1752 auto sfm = static_cast<SstFileManagerImpl*>(
1753 impl->immutable_db_options_.sst_file_manager.get());
1754 if (s.ok() && sfm) {
1755 // Set Statistics ptr for SstFileManager to dump the stats of
1756 // DeleteScheduler.
1757 sfm->SetStatisticsPtr(impl->immutable_db_options_.statistics);
1758 ROCKS_LOG_INFO(impl->immutable_db_options_.info_log,
1759 "SstFileManager instance %p", sfm);
1760
1761 // Notify SstFileManager about all sst files that already exist in
1762 // db_paths[0] and cf_paths[0] when the DB is opened.
1763
1764 // SstFileManagerImpl needs to know sizes of the files. For files whose size
1765 // we already know (sst files that appear in manifest - typically that's the
1766 // vast majority of all files), we'll pass the size to SstFileManager.
1767 // For all other files SstFileManager will query the size from filesystem.
1768
1769 std::vector<LiveFileMetaData> metadata;
1770
1771 // TODO: Once GetLiveFilesMetaData supports blob files, update the logic
1772 // below to get known_file_sizes for blob files.
1773 impl->mutex_.Lock();
1774 impl->versions_->GetLiveFilesMetaData(&metadata);
1775 impl->mutex_.Unlock();
1776
1777 std::unordered_map<std::string, uint64_t> known_file_sizes;
1778 for (const auto& md : metadata) {
1779 std::string name = md.name;
1780 if (!name.empty() && name[0] == '/') {
1781 name = name.substr(1);
1782 }
1783 known_file_sizes[name] = md.size;
1784 }
1785
1786 std::vector<std::string> paths;
1787 paths.emplace_back(impl->immutable_db_options_.db_paths[0].path);
1788 for (auto& cf : column_families) {
1789 if (!cf.options.cf_paths.empty()) {
1790 paths.emplace_back(cf.options.cf_paths[0].path);
1791 }
1792 }
1793 // Remove duplicate paths.
1794 std::sort(paths.begin(), paths.end());
1795 paths.erase(std::unique(paths.begin(), paths.end()), paths.end());
1796 for (auto& path : paths) {
1797 std::vector<std::string> existing_files;
1798 impl->immutable_db_options_.env->GetChildren(path, &existing_files)
1799 .PermitUncheckedError(); //**TODO: What do to on error?
1800 for (auto& file_name : existing_files) {
1801 uint64_t file_number;
1802 FileType file_type;
1803 std::string file_path = path + "/" + file_name;
1804 if (ParseFileName(file_name, &file_number, &file_type) &&
1805 (file_type == kTableFile || file_type == kBlobFile)) {
1806 // TODO: Check for errors from OnAddFile?
1807 if (known_file_sizes.count(file_name)) {
1808 // We're assuming that each sst file name exists in at most one of
1809 // the paths.
1810 sfm->OnAddFile(file_path, known_file_sizes.at(file_name))
1811 .PermitUncheckedError();
1812 } else {
1813 sfm->OnAddFile(file_path).PermitUncheckedError();
1814 }
1815 }
1816 }
1817 }
1818
1819 // Reserve some disk buffer space. This is a heuristic - when we run out
1820 // of disk space, this ensures that there is atleast write_buffer_size
1821 // amount of free space before we resume DB writes. In low disk space
1822 // conditions, we want to avoid a lot of small L0 files due to frequent
1823 // WAL write failures and resultant forced flushes
1824 sfm->ReserveDiskBuffer(max_write_buffer_size,
1825 impl->immutable_db_options_.db_paths[0].path);
1826 }
1827
1828 #endif // !ROCKSDB_LITE
1829
1830 if (s.ok()) {
1831 ROCKS_LOG_HEADER(impl->immutable_db_options_.info_log, "DB pointer %p",
1832 impl);
1833 LogFlush(impl->immutable_db_options_.info_log);
1834 assert(impl->TEST_WALBufferIsEmpty());
1835 // If the assert above fails then we need to FlushWAL before returning
1836 // control back to the user.
1837 if (!persist_options_status.ok()) {
1838 s = Status::IOError(
1839 "DB::Open() failed --- Unable to persist Options file",
1840 persist_options_status.ToString());
1841 }
1842 } else {
1843 ROCKS_LOG_WARN(impl->immutable_db_options_.info_log,
1844 "Persisting Option File error: %s",
1845 persist_options_status.ToString().c_str());
1846 }
1847 if (s.ok()) {
1848 impl->StartPeriodicWorkScheduler();
1849 } else {
1850 for (auto* h : *handles) {
1851 delete h;
1852 }
1853 handles->clear();
1854 delete impl;
1855 *dbptr = nullptr;
1856 }
1857 return s;
1858 }
1859 } // namespace ROCKSDB_NAMESPACE
1860