1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #ifndef ROCKSDB_LITE
11
12 #include <stdlib.h>
13 #include <algorithm>
14 #include <atomic>
15 #include <cinttypes>
16 #include <functional>
17 #include <future>
18 #include <limits>
19 #include <map>
20 #include <mutex>
21 #include <sstream>
22 #include <string>
23 #include <thread>
24 #include <unordered_map>
25 #include <unordered_set>
26 #include <vector>
27
28 #include "env/composite_env_wrapper.h"
29 #include "file/filename.h"
30 #include "file/sequence_file_reader.h"
31 #include "file/writable_file_writer.h"
32 #include "logging/logging.h"
33 #include "port/port.h"
34 #include "rocksdb/rate_limiter.h"
35 #include "rocksdb/transaction_log.h"
36 #include "rocksdb/utilities/backupable_db.h"
37 #include "test_util/sync_point.h"
38 #include "util/channel.h"
39 #include "util/coding.h"
40 #include "util/crc32c.h"
41 #include "util/string_util.h"
42 #include "utilities/checkpoint/checkpoint_impl.h"
43
44 namespace ROCKSDB_NAMESPACE {
45
IncrementNumberSuccessBackup()46 void BackupStatistics::IncrementNumberSuccessBackup() {
47 number_success_backup++;
48 }
IncrementNumberFailBackup()49 void BackupStatistics::IncrementNumberFailBackup() {
50 number_fail_backup++;
51 }
52
GetNumberSuccessBackup() const53 uint32_t BackupStatistics::GetNumberSuccessBackup() const {
54 return number_success_backup;
55 }
GetNumberFailBackup() const56 uint32_t BackupStatistics::GetNumberFailBackup() const {
57 return number_fail_backup;
58 }
59
ToString() const60 std::string BackupStatistics::ToString() const {
61 char result[50];
62 snprintf(result, sizeof(result), "# success backup: %u, # fail backup: %u",
63 GetNumberSuccessBackup(), GetNumberFailBackup());
64 return result;
65 }
66
Dump(Logger * logger) const67 void BackupableDBOptions::Dump(Logger* logger) const {
68 ROCKS_LOG_INFO(logger, " Options.backup_dir: %s",
69 backup_dir.c_str());
70 ROCKS_LOG_INFO(logger, " Options.backup_env: %p", backup_env);
71 ROCKS_LOG_INFO(logger, " Options.share_table_files: %d",
72 static_cast<int>(share_table_files));
73 ROCKS_LOG_INFO(logger, " Options.info_log: %p", info_log);
74 ROCKS_LOG_INFO(logger, " Options.sync: %d",
75 static_cast<int>(sync));
76 ROCKS_LOG_INFO(logger, " Options.destroy_old_data: %d",
77 static_cast<int>(destroy_old_data));
78 ROCKS_LOG_INFO(logger, " Options.backup_log_files: %d",
79 static_cast<int>(backup_log_files));
80 ROCKS_LOG_INFO(logger, " Options.backup_rate_limit: %" PRIu64,
81 backup_rate_limit);
82 ROCKS_LOG_INFO(logger, " Options.restore_rate_limit: %" PRIu64,
83 restore_rate_limit);
84 ROCKS_LOG_INFO(logger, "Options.max_background_operations: %d",
85 max_background_operations);
86 }
87
88 // -------- BackupEngineImpl class ---------
89 class BackupEngineImpl : public BackupEngine {
90 public:
91 BackupEngineImpl(Env* db_env, const BackupableDBOptions& options,
92 bool read_only = false);
93 ~BackupEngineImpl() override;
94 Status CreateNewBackupWithMetadata(DB* db, const std::string& app_metadata,
95 bool flush_before_backup = false,
96 std::function<void()> progress_callback =
__anonc0806a9f0102() 97 []() {}) override;
98 Status PurgeOldBackups(uint32_t num_backups_to_keep) override;
99 Status DeleteBackup(BackupID backup_id) override;
StopBackup()100 void StopBackup() override {
101 stop_backup_.store(true, std::memory_order_release);
102 }
103 Status GarbageCollect() override;
104
105 // The returned BackupInfos are in chronological order, which means the
106 // latest backup comes last.
107 void GetBackupInfo(std::vector<BackupInfo>* backup_info) override;
108 void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) override;
109 Status RestoreDBFromBackup(
110 BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
111 const RestoreOptions& restore_options = RestoreOptions()) override;
RestoreDBFromLatestBackup(const std::string & db_dir,const std::string & wal_dir,const RestoreOptions & restore_options=RestoreOptions ())112 Status RestoreDBFromLatestBackup(
113 const std::string& db_dir, const std::string& wal_dir,
114 const RestoreOptions& restore_options = RestoreOptions()) override {
115 return RestoreDBFromBackup(latest_valid_backup_id_, db_dir, wal_dir,
116 restore_options);
117 }
118
119 Status VerifyBackup(BackupID backup_id) override;
120
121 Status Initialize();
122
123 private:
124 void DeleteChildren(const std::string& dir, uint32_t file_type_filter = 0);
125 Status DeleteBackupInternal(BackupID backup_id);
126
127 // Extends the "result" map with pathname->size mappings for the contents of
128 // "dir" in "env". Pathnames are prefixed with "dir".
129 Status InsertPathnameToSizeBytes(
130 const std::string& dir, Env* env,
131 std::unordered_map<std::string, uint64_t>* result);
132
133 struct FileInfo {
FileInfoROCKSDB_NAMESPACE::BackupEngineImpl::FileInfo134 FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum)
135 : refs(0), filename(fname), size(sz), checksum_value(checksum) {}
136
137 FileInfo(const FileInfo&) = delete;
138 FileInfo& operator=(const FileInfo&) = delete;
139
140 int refs;
141 const std::string filename;
142 const uint64_t size;
143 const uint32_t checksum_value;
144 };
145
146 class BackupMeta {
147 public:
BackupMeta(const std::string & meta_filename,const std::string & meta_tmp_filename,std::unordered_map<std::string,std::shared_ptr<FileInfo>> * file_infos,Env * env)148 BackupMeta(
149 const std::string& meta_filename, const std::string& meta_tmp_filename,
150 std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos,
151 Env* env)
152 : timestamp_(0),
153 sequence_number_(0),
154 size_(0),
155 meta_filename_(meta_filename),
156 meta_tmp_filename_(meta_tmp_filename),
157 file_infos_(file_infos),
158 env_(env) {}
159
160 BackupMeta(const BackupMeta&) = delete;
161 BackupMeta& operator=(const BackupMeta&) = delete;
162
~BackupMeta()163 ~BackupMeta() {}
164
RecordTimestamp()165 void RecordTimestamp() {
166 env_->GetCurrentTime(×tamp_);
167 }
GetTimestamp() const168 int64_t GetTimestamp() const {
169 return timestamp_;
170 }
GetSize() const171 uint64_t GetSize() const {
172 return size_;
173 }
GetNumberFiles()174 uint32_t GetNumberFiles() { return static_cast<uint32_t>(files_.size()); }
SetSequenceNumber(uint64_t sequence_number)175 void SetSequenceNumber(uint64_t sequence_number) {
176 sequence_number_ = sequence_number;
177 }
GetSequenceNumber()178 uint64_t GetSequenceNumber() {
179 return sequence_number_;
180 }
181
GetAppMetadata() const182 const std::string& GetAppMetadata() const { return app_metadata_; }
183
SetAppMetadata(const std::string & app_metadata)184 void SetAppMetadata(const std::string& app_metadata) {
185 app_metadata_ = app_metadata;
186 }
187
188 Status AddFile(std::shared_ptr<FileInfo> file_info);
189
190 Status Delete(bool delete_meta = true);
191
Empty()192 bool Empty() {
193 return files_.empty();
194 }
195
GetFile(const std::string & filename) const196 std::shared_ptr<FileInfo> GetFile(const std::string& filename) const {
197 auto it = file_infos_->find(filename);
198 if (it == file_infos_->end())
199 return nullptr;
200 return it->second;
201 }
202
GetFiles()203 const std::vector<std::shared_ptr<FileInfo>>& GetFiles() {
204 return files_;
205 }
206
207 // @param abs_path_to_size Pre-fetched file sizes (bytes).
208 Status LoadFromFile(
209 const std::string& backup_dir,
210 const std::unordered_map<std::string, uint64_t>& abs_path_to_size);
211 Status StoreToFile(bool sync);
212
GetInfoString()213 std::string GetInfoString() {
214 std::ostringstream ss;
215 ss << "Timestamp: " << timestamp_ << std::endl;
216 char human_size[16];
217 AppendHumanBytes(size_, human_size, sizeof(human_size));
218 ss << "Size: " << human_size << std::endl;
219 ss << "Files:" << std::endl;
220 for (const auto& file : files_) {
221 AppendHumanBytes(file->size, human_size, sizeof(human_size));
222 ss << file->filename << ", size " << human_size << ", refs "
223 << file->refs << std::endl;
224 }
225 return ss.str();
226 }
227
228 private:
229 int64_t timestamp_;
230 // sequence number is only approximate, should not be used
231 // by clients
232 uint64_t sequence_number_;
233 uint64_t size_;
234 std::string app_metadata_;
235 std::string const meta_filename_;
236 std::string const meta_tmp_filename_;
237 // files with relative paths (without "/" prefix!!)
238 std::vector<std::shared_ptr<FileInfo>> files_;
239 std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos_;
240 Env* env_;
241
242 static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024; // 10MB
243 }; // BackupMeta
244
GetAbsolutePath(const std::string & relative_path="") const245 inline std::string GetAbsolutePath(
246 const std::string &relative_path = "") const {
247 assert(relative_path.size() == 0 || relative_path[0] != '/');
248 return options_.backup_dir + "/" + relative_path;
249 }
GetPrivateDirRel() const250 inline std::string GetPrivateDirRel() const {
251 return "private";
252 }
GetSharedChecksumDirRel() const253 inline std::string GetSharedChecksumDirRel() const {
254 return "shared_checksum";
255 }
GetPrivateFileRel(BackupID backup_id,bool tmp=false,const std::string & file="") const256 inline std::string GetPrivateFileRel(BackupID backup_id,
257 bool tmp = false,
258 const std::string& file = "") const {
259 assert(file.size() == 0 || file[0] != '/');
260 return GetPrivateDirRel() + "/" + ROCKSDB_NAMESPACE::ToString(backup_id) +
261 (tmp ? ".tmp" : "") + "/" + file;
262 }
GetSharedFileRel(const std::string & file="",bool tmp=false) const263 inline std::string GetSharedFileRel(const std::string& file = "",
264 bool tmp = false) const {
265 assert(file.size() == 0 || file[0] != '/');
266 return std::string("shared/") + (tmp ? "." : "") + file +
267 (tmp ? ".tmp" : "");
268 }
GetSharedFileWithChecksumRel(const std::string & file="",bool tmp=false) const269 inline std::string GetSharedFileWithChecksumRel(const std::string& file = "",
270 bool tmp = false) const {
271 assert(file.size() == 0 || file[0] != '/');
272 return GetSharedChecksumDirRel() + "/" + (tmp ? "." : "") + file +
273 (tmp ? ".tmp" : "");
274 }
GetSharedFileWithChecksum(const std::string & file,const uint32_t checksum_value,const uint64_t file_size) const275 inline std::string GetSharedFileWithChecksum(const std::string& file,
276 const uint32_t checksum_value,
277 const uint64_t file_size) const {
278 assert(file.size() == 0 || file[0] != '/');
279 std::string file_copy = file;
280 return file_copy.insert(file_copy.find_last_of('.'),
281 "_" + ROCKSDB_NAMESPACE::ToString(checksum_value) +
282 "_" + ROCKSDB_NAMESPACE::ToString(file_size));
283 }
GetFileFromChecksumFile(const std::string & file) const284 inline std::string GetFileFromChecksumFile(const std::string& file) const {
285 assert(file.size() == 0 || file[0] != '/');
286 std::string file_copy = file;
287 size_t first_underscore = file_copy.find_first_of('_');
288 return file_copy.erase(first_underscore,
289 file_copy.find_last_of('.') - first_underscore);
290 }
GetBackupMetaDir() const291 inline std::string GetBackupMetaDir() const {
292 return GetAbsolutePath("meta");
293 }
GetBackupMetaFile(BackupID backup_id,bool tmp) const294 inline std::string GetBackupMetaFile(BackupID backup_id, bool tmp) const {
295 return GetBackupMetaDir() + "/" + (tmp ? "." : "") +
296 ROCKSDB_NAMESPACE::ToString(backup_id) + (tmp ? ".tmp" : "");
297 }
298
299 // If size_limit == 0, there is no size limit, copy everything.
300 //
301 // Exactly one of src and contents must be non-empty.
302 //
303 // @param src If non-empty, the file is copied from this pathname.
304 // @param contents If non-empty, the file will be created with these contents.
305 Status CopyOrCreateFile(const std::string& src, const std::string& dst,
306 const std::string& contents, Env* src_env,
307 Env* dst_env, const EnvOptions& src_env_options,
308 bool sync, RateLimiter* rate_limiter,
309 uint64_t* size = nullptr,
310 uint32_t* checksum_value = nullptr,
311 uint64_t size_limit = 0,
__anonc0806a9f0202() 312 std::function<void()> progress_callback = []() {});
313
314 Status CalculateChecksum(const std::string& src, Env* src_env,
315 const EnvOptions& src_env_options,
316 uint64_t size_limit, uint32_t* checksum_value);
317
318 struct CopyOrCreateResult {
319 uint64_t size;
320 uint32_t checksum_value;
321 Status status;
322 };
323
324 // Exactly one of src_path and contents must be non-empty. If src_path is
325 // non-empty, the file is copied from this pathname. Otherwise, if contents is
326 // non-empty, the file will be created at dst_path with these contents.
327 struct CopyOrCreateWorkItem {
328 std::string src_path;
329 std::string dst_path;
330 std::string contents;
331 Env* src_env;
332 Env* dst_env;
333 EnvOptions src_env_options;
334 bool sync;
335 RateLimiter* rate_limiter;
336 uint64_t size_limit;
337 std::promise<CopyOrCreateResult> result;
338 std::function<void()> progress_callback;
339
CopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::CopyOrCreateWorkItem340 CopyOrCreateWorkItem()
341 : src_path(""),
342 dst_path(""),
343 contents(""),
344 src_env(nullptr),
345 dst_env(nullptr),
346 src_env_options(),
347 sync(false),
348 rate_limiter(nullptr),
349 size_limit(0) {}
350
351 CopyOrCreateWorkItem(const CopyOrCreateWorkItem&) = delete;
352 CopyOrCreateWorkItem& operator=(const CopyOrCreateWorkItem&) = delete;
353
CopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::CopyOrCreateWorkItem354 CopyOrCreateWorkItem(CopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
355 *this = std::move(o);
356 }
357
operator =ROCKSDB_NAMESPACE::BackupEngineImpl::CopyOrCreateWorkItem358 CopyOrCreateWorkItem& operator=(CopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
359 src_path = std::move(o.src_path);
360 dst_path = std::move(o.dst_path);
361 contents = std::move(o.contents);
362 src_env = o.src_env;
363 dst_env = o.dst_env;
364 src_env_options = std::move(o.src_env_options);
365 sync = o.sync;
366 rate_limiter = o.rate_limiter;
367 size_limit = o.size_limit;
368 result = std::move(o.result);
369 progress_callback = std::move(o.progress_callback);
370 return *this;
371 }
372
CopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::CopyOrCreateWorkItem373 CopyOrCreateWorkItem(std::string _src_path, std::string _dst_path,
374 std::string _contents, Env* _src_env, Env* _dst_env,
375 EnvOptions _src_env_options, bool _sync,
376 RateLimiter* _rate_limiter, uint64_t _size_limit,
377 std::function<void()> _progress_callback = []() {})
378 : src_path(std::move(_src_path)),
379 dst_path(std::move(_dst_path)),
380 contents(std::move(_contents)),
381 src_env(_src_env),
382 dst_env(_dst_env),
383 src_env_options(std::move(_src_env_options)),
384 sync(_sync),
385 rate_limiter(_rate_limiter),
386 size_limit(_size_limit),
387 progress_callback(_progress_callback) {}
388 };
389
390 struct BackupAfterCopyOrCreateWorkItem {
391 std::future<CopyOrCreateResult> result;
392 bool shared;
393 bool needed_to_copy;
394 Env* backup_env;
395 std::string dst_path_tmp;
396 std::string dst_path;
397 std::string dst_relative;
BackupAfterCopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::BackupAfterCopyOrCreateWorkItem398 BackupAfterCopyOrCreateWorkItem()
399 : shared(false),
400 needed_to_copy(false),
401 backup_env(nullptr),
402 dst_path_tmp(""),
403 dst_path(""),
404 dst_relative("") {}
405
BackupAfterCopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::BackupAfterCopyOrCreateWorkItem406 BackupAfterCopyOrCreateWorkItem(BackupAfterCopyOrCreateWorkItem&& o)
407 ROCKSDB_NOEXCEPT {
408 *this = std::move(o);
409 }
410
operator =ROCKSDB_NAMESPACE::BackupEngineImpl::BackupAfterCopyOrCreateWorkItem411 BackupAfterCopyOrCreateWorkItem& operator=(
412 BackupAfterCopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
413 result = std::move(o.result);
414 shared = o.shared;
415 needed_to_copy = o.needed_to_copy;
416 backup_env = o.backup_env;
417 dst_path_tmp = std::move(o.dst_path_tmp);
418 dst_path = std::move(o.dst_path);
419 dst_relative = std::move(o.dst_relative);
420 return *this;
421 }
422
BackupAfterCopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::BackupAfterCopyOrCreateWorkItem423 BackupAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
424 bool _shared, bool _needed_to_copy,
425 Env* _backup_env, std::string _dst_path_tmp,
426 std::string _dst_path,
427 std::string _dst_relative)
428 : result(std::move(_result)),
429 shared(_shared),
430 needed_to_copy(_needed_to_copy),
431 backup_env(_backup_env),
432 dst_path_tmp(std::move(_dst_path_tmp)),
433 dst_path(std::move(_dst_path)),
434 dst_relative(std::move(_dst_relative)) {}
435 };
436
437 struct RestoreAfterCopyOrCreateWorkItem {
438 std::future<CopyOrCreateResult> result;
439 uint32_t checksum_value;
RestoreAfterCopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::RestoreAfterCopyOrCreateWorkItem440 RestoreAfterCopyOrCreateWorkItem()
441 : checksum_value(0) {}
RestoreAfterCopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::RestoreAfterCopyOrCreateWorkItem442 RestoreAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
443 uint32_t _checksum_value)
444 : result(std::move(_result)), checksum_value(_checksum_value) {}
RestoreAfterCopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::RestoreAfterCopyOrCreateWorkItem445 RestoreAfterCopyOrCreateWorkItem(RestoreAfterCopyOrCreateWorkItem&& o)
446 ROCKSDB_NOEXCEPT {
447 *this = std::move(o);
448 }
449
operator =ROCKSDB_NAMESPACE::BackupEngineImpl::RestoreAfterCopyOrCreateWorkItem450 RestoreAfterCopyOrCreateWorkItem& operator=(
451 RestoreAfterCopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
452 result = std::move(o.result);
453 checksum_value = o.checksum_value;
454 return *this;
455 }
456 };
457
458 bool initialized_;
459 std::mutex byte_report_mutex_;
460 channel<CopyOrCreateWorkItem> files_to_copy_or_create_;
461 std::vector<port::Thread> threads_;
462 // Certain operations like PurgeOldBackups and DeleteBackup will trigger
463 // automatic GarbageCollect (true) unless we've already done one in this
464 // session and have not failed to delete backup files since then (false).
465 bool might_need_garbage_collect_ = true;
466
467 // Adds a file to the backup work queue to be copied or created if it doesn't
468 // already exist.
469 //
470 // Exactly one of src_dir and contents must be non-empty.
471 //
472 // @param src_dir If non-empty, the file in this directory named fname will be
473 // copied.
474 // @param fname Name of destination file and, in case of copy, source file.
475 // @param contents If non-empty, the file will be created with these contents.
476 Status AddBackupFileWorkItem(
477 std::unordered_set<std::string>& live_dst_paths,
478 std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
479 BackupID backup_id, bool shared, const std::string& src_dir,
480 const std::string& fname, // starts with "/"
481 const EnvOptions& src_env_options, RateLimiter* rate_limiter,
482 uint64_t size_bytes, uint64_t size_limit = 0,
483 bool shared_checksum = false,
__anonc0806a9f0402() 484 std::function<void()> progress_callback = []() {},
485 const std::string& contents = std::string());
486
487 // backup state data
488 BackupID latest_backup_id_;
489 BackupID latest_valid_backup_id_;
490 std::map<BackupID, std::unique_ptr<BackupMeta>> backups_;
491 std::map<BackupID, std::pair<Status, std::unique_ptr<BackupMeta>>>
492 corrupt_backups_;
493 std::unordered_map<std::string,
494 std::shared_ptr<FileInfo>> backuped_file_infos_;
495 std::atomic<bool> stop_backup_;
496
497 // options data
498 BackupableDBOptions options_;
499 Env* db_env_;
500 Env* backup_env_;
501
502 // directories
503 std::unique_ptr<Directory> backup_directory_;
504 std::unique_ptr<Directory> shared_directory_;
505 std::unique_ptr<Directory> meta_directory_;
506 std::unique_ptr<Directory> private_directory_;
507
508 static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL; // 5MB
509 size_t copy_file_buffer_size_;
510 bool read_only_;
511 BackupStatistics backup_statistics_;
512 static const size_t kMaxAppMetaSize = 1024 * 1024; // 1MB
513 };
514
Open(Env * env,const BackupableDBOptions & options,BackupEngine ** backup_engine_ptr)515 Status BackupEngine::Open(Env* env, const BackupableDBOptions& options,
516 BackupEngine** backup_engine_ptr) {
517 std::unique_ptr<BackupEngineImpl> backup_engine(
518 new BackupEngineImpl(env, options));
519 auto s = backup_engine->Initialize();
520 if (!s.ok()) {
521 *backup_engine_ptr = nullptr;
522 return s;
523 }
524 *backup_engine_ptr = backup_engine.release();
525 return Status::OK();
526 }
527
BackupEngineImpl(Env * db_env,const BackupableDBOptions & options,bool read_only)528 BackupEngineImpl::BackupEngineImpl(Env* db_env,
529 const BackupableDBOptions& options,
530 bool read_only)
531 : initialized_(false),
532 latest_backup_id_(0),
533 latest_valid_backup_id_(0),
534 stop_backup_(false),
535 options_(options),
536 db_env_(db_env),
537 backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_),
538 copy_file_buffer_size_(kDefaultCopyFileBufferSize),
539 read_only_(read_only) {
540 if (options_.backup_rate_limiter == nullptr &&
541 options_.backup_rate_limit > 0) {
542 options_.backup_rate_limiter.reset(
543 NewGenericRateLimiter(options_.backup_rate_limit));
544 }
545 if (options_.restore_rate_limiter == nullptr &&
546 options_.restore_rate_limit > 0) {
547 options_.restore_rate_limiter.reset(
548 NewGenericRateLimiter(options_.restore_rate_limit));
549 }
550 }
551
~BackupEngineImpl()552 BackupEngineImpl::~BackupEngineImpl() {
553 files_to_copy_or_create_.sendEof();
554 for (auto& t : threads_) {
555 t.join();
556 }
557 LogFlush(options_.info_log);
558 }
559
Initialize()560 Status BackupEngineImpl::Initialize() {
561 assert(!initialized_);
562 initialized_ = true;
563 if (read_only_) {
564 ROCKS_LOG_INFO(options_.info_log, "Starting read_only backup engine");
565 }
566 options_.Dump(options_.info_log);
567
568 if (!read_only_) {
569 // we might need to clean up from previous crash or I/O errors
570 might_need_garbage_collect_ = true;
571
572 if (options_.max_valid_backups_to_open != port::kMaxInt32) {
573 options_.max_valid_backups_to_open = port::kMaxInt32;
574 ROCKS_LOG_WARN(
575 options_.info_log,
576 "`max_valid_backups_to_open` is not set to the default value. Ignoring "
577 "its value since BackupEngine is not read-only.");
578 }
579
580 // gather the list of directories that we need to create
581 std::vector<std::pair<std::string, std::unique_ptr<Directory>*>>
582 directories;
583 directories.emplace_back(GetAbsolutePath(), &backup_directory_);
584 if (options_.share_table_files) {
585 if (options_.share_files_with_checksum) {
586 directories.emplace_back(
587 GetAbsolutePath(GetSharedFileWithChecksumRel()),
588 &shared_directory_);
589 } else {
590 directories.emplace_back(GetAbsolutePath(GetSharedFileRel()),
591 &shared_directory_);
592 }
593 }
594 directories.emplace_back(GetAbsolutePath(GetPrivateDirRel()),
595 &private_directory_);
596 directories.emplace_back(GetBackupMetaDir(), &meta_directory_);
597 // create all the dirs we need
598 for (const auto& d : directories) {
599 auto s = backup_env_->CreateDirIfMissing(d.first);
600 if (s.ok()) {
601 s = backup_env_->NewDirectory(d.first, d.second);
602 }
603 if (!s.ok()) {
604 return s;
605 }
606 }
607 }
608
609 std::vector<std::string> backup_meta_files;
610 {
611 auto s = backup_env_->GetChildren(GetBackupMetaDir(), &backup_meta_files);
612 if (s.IsNotFound()) {
613 return Status::NotFound(GetBackupMetaDir() + " is missing");
614 } else if (!s.ok()) {
615 return s;
616 }
617 }
618 // create backups_ structure
619 for (auto& file : backup_meta_files) {
620 if (file == "." || file == "..") {
621 continue;
622 }
623 ROCKS_LOG_INFO(options_.info_log, "Detected backup %s", file.c_str());
624 BackupID backup_id = 0;
625 sscanf(file.c_str(), "%u", &backup_id);
626 if (backup_id == 0 || file != ROCKSDB_NAMESPACE::ToString(backup_id)) {
627 if (!read_only_) {
628 // invalid file name, delete that
629 auto s = backup_env_->DeleteFile(GetBackupMetaDir() + "/" + file);
630 ROCKS_LOG_INFO(options_.info_log,
631 "Unrecognized meta file %s, deleting -- %s",
632 file.c_str(), s.ToString().c_str());
633 }
634 continue;
635 }
636 assert(backups_.find(backup_id) == backups_.end());
637 backups_.insert(std::make_pair(
638 backup_id, std::unique_ptr<BackupMeta>(new BackupMeta(
639 GetBackupMetaFile(backup_id, false /* tmp */),
640 GetBackupMetaFile(backup_id, true /* tmp */),
641 &backuped_file_infos_, backup_env_))));
642 }
643
644 latest_backup_id_ = 0;
645 latest_valid_backup_id_ = 0;
646 if (options_.destroy_old_data) { // Destroy old data
647 assert(!read_only_);
648 ROCKS_LOG_INFO(
649 options_.info_log,
650 "Backup Engine started with destroy_old_data == true, deleting all "
651 "backups");
652 auto s = PurgeOldBackups(0);
653 if (s.ok()) {
654 s = GarbageCollect();
655 }
656 if (!s.ok()) {
657 return s;
658 }
659 } else { // Load data from storage
660 std::unordered_map<std::string, uint64_t> abs_path_to_size;
661 for (const auto& rel_dir :
662 {GetSharedFileRel(), GetSharedFileWithChecksumRel()}) {
663 const auto abs_dir = GetAbsolutePath(rel_dir);
664 InsertPathnameToSizeBytes(abs_dir, backup_env_, &abs_path_to_size);
665 }
666 // load the backups if any, until valid_backups_to_open of the latest
667 // non-corrupted backups have been successfully opened.
668 int valid_backups_to_open = options_.max_valid_backups_to_open;
669 for (auto backup_iter = backups_.rbegin();
670 backup_iter != backups_.rend();
671 ++backup_iter) {
672 assert(latest_backup_id_ == 0 || latest_backup_id_ > backup_iter->first);
673 if (latest_backup_id_ == 0) {
674 latest_backup_id_ = backup_iter->first;
675 }
676 if (valid_backups_to_open == 0) {
677 break;
678 }
679
680 InsertPathnameToSizeBytes(
681 GetAbsolutePath(GetPrivateFileRel(backup_iter->first)), backup_env_,
682 &abs_path_to_size);
683 Status s = backup_iter->second->LoadFromFile(options_.backup_dir,
684 abs_path_to_size);
685 if (s.IsCorruption()) {
686 ROCKS_LOG_INFO(options_.info_log, "Backup %u corrupted -- %s",
687 backup_iter->first, s.ToString().c_str());
688 corrupt_backups_.insert(
689 std::make_pair(backup_iter->first,
690 std::make_pair(s, std::move(backup_iter->second))));
691 } else if (!s.ok()) {
692 // Distinguish corruption errors from errors in the backup Env.
693 // Errors in the backup Env (i.e., this code path) will cause Open() to
694 // fail, whereas corruption errors would not cause Open() failures.
695 return s;
696 } else {
697 ROCKS_LOG_INFO(options_.info_log, "Loading backup %" PRIu32 " OK:\n%s",
698 backup_iter->first,
699 backup_iter->second->GetInfoString().c_str());
700 assert(latest_valid_backup_id_ == 0 ||
701 latest_valid_backup_id_ > backup_iter->first);
702 if (latest_valid_backup_id_ == 0) {
703 latest_valid_backup_id_ = backup_iter->first;
704 }
705 --valid_backups_to_open;
706 }
707 }
708
709 for (const auto& corrupt : corrupt_backups_) {
710 backups_.erase(backups_.find(corrupt.first));
711 }
712 // erase the backups before max_valid_backups_to_open
713 int num_unopened_backups;
714 if (options_.max_valid_backups_to_open == 0) {
715 num_unopened_backups = 0;
716 } else {
717 num_unopened_backups =
718 std::max(0, static_cast<int>(backups_.size()) -
719 options_.max_valid_backups_to_open);
720 }
721 for (int i = 0; i < num_unopened_backups; ++i) {
722 assert(backups_.begin()->second->Empty());
723 backups_.erase(backups_.begin());
724 }
725 }
726
727 ROCKS_LOG_INFO(options_.info_log, "Latest backup is %u", latest_backup_id_);
728 ROCKS_LOG_INFO(options_.info_log, "Latest valid backup is %u",
729 latest_valid_backup_id_);
730
731 // set up threads perform copies from files_to_copy_or_create_ in the
732 // background
733 for (int t = 0; t < options_.max_background_operations; t++) {
734 threads_.emplace_back([this]() {
735 #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
736 #if __GLIBC_PREREQ(2, 12)
737 pthread_setname_np(pthread_self(), "backup_engine");
738 #endif
739 #endif
740 CopyOrCreateWorkItem work_item;
741 while (files_to_copy_or_create_.read(work_item)) {
742 CopyOrCreateResult result;
743 result.status = CopyOrCreateFile(
744 work_item.src_path, work_item.dst_path, work_item.contents,
745 work_item.src_env, work_item.dst_env, work_item.src_env_options,
746 work_item.sync, work_item.rate_limiter, &result.size,
747 &result.checksum_value, work_item.size_limit,
748 work_item.progress_callback);
749 work_item.result.set_value(std::move(result));
750 }
751 });
752 }
753 ROCKS_LOG_INFO(options_.info_log, "Initialized BackupEngine");
754
755 return Status::OK();
756 }
757
CreateNewBackupWithMetadata(DB * db,const std::string & app_metadata,bool flush_before_backup,std::function<void ()> progress_callback)758 Status BackupEngineImpl::CreateNewBackupWithMetadata(
759 DB* db, const std::string& app_metadata, bool flush_before_backup,
760 std::function<void()> progress_callback) {
761 assert(initialized_);
762 assert(!read_only_);
763 if (app_metadata.size() > kMaxAppMetaSize) {
764 return Status::InvalidArgument("App metadata too large");
765 }
766
767 BackupID new_backup_id = latest_backup_id_ + 1;
768
769 assert(backups_.find(new_backup_id) == backups_.end());
770
771 auto private_dir = GetAbsolutePath(GetPrivateFileRel(new_backup_id));
772 Status s = backup_env_->FileExists(private_dir);
773 if (s.ok()) {
774 // maybe last backup failed and left partial state behind, clean it up.
775 // need to do this before updating backups_ such that a private dir
776 // named after new_backup_id will be cleaned up.
777 // (If an incomplete new backup is followed by an incomplete delete
778 // of the latest full backup, then there could be more than one next
779 // id with a private dir, the last thing to be deleted in delete
780 // backup, but all will be cleaned up with a GarbageCollect.)
781 s = GarbageCollect();
782 } else if (s.IsNotFound()) {
783 // normal case, the new backup's private dir doesn't exist yet
784 s = Status::OK();
785 }
786
787 auto ret = backups_.insert(std::make_pair(
788 new_backup_id, std::unique_ptr<BackupMeta>(new BackupMeta(
789 GetBackupMetaFile(new_backup_id, false /* tmp */),
790 GetBackupMetaFile(new_backup_id, true /* tmp */),
791 &backuped_file_infos_, backup_env_))));
792 assert(ret.second == true);
793 auto& new_backup = ret.first->second;
794 new_backup->RecordTimestamp();
795 new_backup->SetAppMetadata(app_metadata);
796
797 auto start_backup = backup_env_->NowMicros();
798
799 ROCKS_LOG_INFO(options_.info_log,
800 "Started the backup process -- creating backup %u",
801 new_backup_id);
802 if (s.ok()) {
803 s = backup_env_->CreateDir(private_dir);
804 }
805
806 RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
807 if (rate_limiter) {
808 copy_file_buffer_size_ = static_cast<size_t>(rate_limiter->GetSingleBurstBytes());
809 }
810
811 // A set into which we will insert the dst_paths that are calculated for live
812 // files and live WAL files.
813 // This is used to check whether a live files shares a dst_path with another
814 // live file.
815 std::unordered_set<std::string> live_dst_paths;
816
817 std::vector<BackupAfterCopyOrCreateWorkItem> backup_items_to_finish;
818 // Add a CopyOrCreateWorkItem to the channel for each live file
819 db->DisableFileDeletions();
820 if (s.ok()) {
821 CheckpointImpl checkpoint(db);
822 uint64_t sequence_number = 0;
823 DBOptions db_options = db->GetDBOptions();
824 EnvOptions src_raw_env_options(db_options);
825 s = checkpoint.CreateCustomCheckpoint(
826 db_options,
827 [&](const std::string& /*src_dirname*/, const std::string& /*fname*/,
828 FileType) {
829 // custom checkpoint will switch to calling copy_file_cb after it sees
830 // NotSupported returned from link_file_cb.
831 return Status::NotSupported();
832 } /* link_file_cb */,
833 [&](const std::string& src_dirname, const std::string& fname,
834 uint64_t size_limit_bytes, FileType type) {
835 if (type == kLogFile && !options_.backup_log_files) {
836 return Status::OK();
837 }
838 Log(options_.info_log, "add file for backup %s", fname.c_str());
839 uint64_t size_bytes = 0;
840 Status st;
841 if (type == kTableFile) {
842 st = db_env_->GetFileSize(src_dirname + fname, &size_bytes);
843 }
844 EnvOptions src_env_options;
845 switch (type) {
846 case kLogFile:
847 src_env_options =
848 db_env_->OptimizeForLogRead(src_raw_env_options);
849 break;
850 case kTableFile:
851 src_env_options = db_env_->OptimizeForCompactionTableRead(
852 src_raw_env_options, ImmutableDBOptions(db_options));
853 break;
854 case kDescriptorFile:
855 src_env_options =
856 db_env_->OptimizeForManifestRead(src_raw_env_options);
857 break;
858 default:
859 // Other backed up files (like options file) are not read by live
860 // DB, so don't need to worry about avoiding mixing buffered and
861 // direct I/O. Just use plain defaults.
862 src_env_options = src_raw_env_options;
863 break;
864 }
865 if (st.ok()) {
866 st = AddBackupFileWorkItem(
867 live_dst_paths, backup_items_to_finish, new_backup_id,
868 options_.share_table_files && type == kTableFile, src_dirname,
869 fname, src_env_options, rate_limiter, size_bytes,
870 size_limit_bytes,
871 options_.share_files_with_checksum && type == kTableFile,
872 progress_callback);
873 }
874 return st;
875 } /* copy_file_cb */,
876 [&](const std::string& fname, const std::string& contents, FileType) {
877 Log(options_.info_log, "add file for backup %s", fname.c_str());
878 return AddBackupFileWorkItem(
879 live_dst_paths, backup_items_to_finish, new_backup_id,
880 false /* shared */, "" /* src_dir */, fname,
881 EnvOptions() /* src_env_options */, rate_limiter, contents.size(),
882 0 /* size_limit */, false /* shared_checksum */,
883 progress_callback, contents);
884 } /* create_file_cb */,
885 &sequence_number, flush_before_backup ? 0 : port::kMaxUint64);
886 if (s.ok()) {
887 new_backup->SetSequenceNumber(sequence_number);
888 }
889 }
890 ROCKS_LOG_INFO(options_.info_log, "add files for backup done, wait finish.");
891 Status item_status;
892 for (auto& item : backup_items_to_finish) {
893 item.result.wait();
894 auto result = item.result.get();
895 item_status = result.status;
896 if (item_status.ok() && item.shared && item.needed_to_copy) {
897 item_status = item.backup_env->RenameFile(item.dst_path_tmp,
898 item.dst_path);
899 }
900 if (item_status.ok()) {
901 item_status = new_backup.get()->AddFile(
902 std::make_shared<FileInfo>(item.dst_relative,
903 result.size,
904 result.checksum_value));
905 }
906 if (!item_status.ok()) {
907 s = item_status;
908 }
909 }
910
911 // we copied all the files, enable file deletions
912 db->EnableFileDeletions(false);
913
914 auto backup_time = backup_env_->NowMicros() - start_backup;
915
916 if (s.ok()) {
917 // persist the backup metadata on the disk
918 s = new_backup->StoreToFile(options_.sync);
919 }
920 if (s.ok() && options_.sync) {
921 std::unique_ptr<Directory> backup_private_directory;
922 backup_env_->NewDirectory(
923 GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)),
924 &backup_private_directory);
925 if (backup_private_directory != nullptr) {
926 s = backup_private_directory->Fsync();
927 }
928 if (s.ok() && private_directory_ != nullptr) {
929 s = private_directory_->Fsync();
930 }
931 if (s.ok() && meta_directory_ != nullptr) {
932 s = meta_directory_->Fsync();
933 }
934 if (s.ok() && shared_directory_ != nullptr) {
935 s = shared_directory_->Fsync();
936 }
937 if (s.ok() && backup_directory_ != nullptr) {
938 s = backup_directory_->Fsync();
939 }
940 }
941
942 if (s.ok()) {
943 backup_statistics_.IncrementNumberSuccessBackup();
944 }
945 if (!s.ok()) {
946 backup_statistics_.IncrementNumberFailBackup();
947 // clean all the files we might have created
948 ROCKS_LOG_INFO(options_.info_log, "Backup failed -- %s",
949 s.ToString().c_str());
950 ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s\n",
951 backup_statistics_.ToString().c_str());
952 // delete files that we might have already written
953 might_need_garbage_collect_ = true;
954 DeleteBackup(new_backup_id);
955 return s;
956 }
957
958 // here we know that we succeeded and installed the new backup
959 // in the LATEST_BACKUP file
960 latest_backup_id_ = new_backup_id;
961 latest_valid_backup_id_ = new_backup_id;
962 ROCKS_LOG_INFO(options_.info_log, "Backup DONE. All is good");
963
964 // backup_speed is in byte/second
965 double backup_speed = new_backup->GetSize() / (1.048576 * backup_time);
966 ROCKS_LOG_INFO(options_.info_log, "Backup number of files: %u",
967 new_backup->GetNumberFiles());
968 char human_size[16];
969 AppendHumanBytes(new_backup->GetSize(), human_size, sizeof(human_size));
970 ROCKS_LOG_INFO(options_.info_log, "Backup size: %s", human_size);
971 ROCKS_LOG_INFO(options_.info_log, "Backup time: %" PRIu64 " microseconds",
972 backup_time);
973 ROCKS_LOG_INFO(options_.info_log, "Backup speed: %.3f MB/s", backup_speed);
974 ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s",
975 backup_statistics_.ToString().c_str());
976 return s;
977 }
978
PurgeOldBackups(uint32_t num_backups_to_keep)979 Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
980 assert(initialized_);
981 assert(!read_only_);
982
983 // Best effort deletion even with errors
984 Status overall_status = Status::OK();
985
986 ROCKS_LOG_INFO(options_.info_log, "Purging old backups, keeping %u",
987 num_backups_to_keep);
988 std::vector<BackupID> to_delete;
989 auto itr = backups_.begin();
990 while ((backups_.size() - to_delete.size()) > num_backups_to_keep) {
991 to_delete.push_back(itr->first);
992 itr++;
993 }
994 for (auto backup_id : to_delete) {
995 auto s = DeleteBackupInternal(backup_id);
996 if (!s.ok()) {
997 overall_status = s;
998 }
999 }
1000 // Clean up after any incomplete backup deletion, potentially from
1001 // earlier session.
1002 if (might_need_garbage_collect_) {
1003 auto s = GarbageCollect();
1004 if (!s.ok() && overall_status.ok()) {
1005 overall_status = s;
1006 }
1007 }
1008 return overall_status;
1009 }
1010
DeleteBackup(BackupID backup_id)1011 Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
1012 auto s1 = DeleteBackupInternal(backup_id);
1013 auto s2 = Status::OK();
1014
1015 // Clean up after any incomplete backup deletion, potentially from
1016 // earlier session.
1017 if (might_need_garbage_collect_) {
1018 s2 = GarbageCollect();
1019 }
1020
1021 if (!s1.ok()) {
1022 return s1;
1023 } else {
1024 return s2;
1025 }
1026 }
1027
1028 // Does not auto-GarbageCollect
DeleteBackupInternal(BackupID backup_id)1029 Status BackupEngineImpl::DeleteBackupInternal(BackupID backup_id) {
1030 assert(initialized_);
1031 assert(!read_only_);
1032
1033 ROCKS_LOG_INFO(options_.info_log, "Deleting backup %u", backup_id);
1034 auto backup = backups_.find(backup_id);
1035 if (backup != backups_.end()) {
1036 auto s = backup->second->Delete();
1037 if (!s.ok()) {
1038 return s;
1039 }
1040 backups_.erase(backup);
1041 } else {
1042 auto corrupt = corrupt_backups_.find(backup_id);
1043 if (corrupt == corrupt_backups_.end()) {
1044 return Status::NotFound("Backup not found");
1045 }
1046 auto s = corrupt->second.second->Delete();
1047 if (!s.ok()) {
1048 return s;
1049 }
1050 corrupt_backups_.erase(corrupt);
1051 }
1052
1053 // After removing meta file, best effort deletion even with errors.
1054 // (Don't delete other files if we can't delete the meta file right
1055 // now.)
1056 std::vector<std::string> to_delete;
1057 for (auto& itr : backuped_file_infos_) {
1058 if (itr.second->refs == 0) {
1059 Status s = backup_env_->DeleteFile(GetAbsolutePath(itr.first));
1060 ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s", itr.first.c_str(),
1061 s.ToString().c_str());
1062 to_delete.push_back(itr.first);
1063 if (!s.ok()) {
1064 // Trying again later might work
1065 might_need_garbage_collect_ = true;
1066 }
1067 }
1068 }
1069 for (auto& td : to_delete) {
1070 backuped_file_infos_.erase(td);
1071 }
1072
1073 // take care of private dirs -- GarbageCollect() will take care of them
1074 // if they are not empty
1075 std::string private_dir = GetPrivateFileRel(backup_id);
1076 Status s = backup_env_->DeleteDir(GetAbsolutePath(private_dir));
1077 ROCKS_LOG_INFO(options_.info_log, "Deleting private dir %s -- %s",
1078 private_dir.c_str(), s.ToString().c_str());
1079 if (!s.ok()) {
1080 // Full gc or trying again later might work
1081 might_need_garbage_collect_ = true;
1082 }
1083 return Status::OK();
1084 }
1085
GetBackupInfo(std::vector<BackupInfo> * backup_info)1086 void BackupEngineImpl::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
1087 assert(initialized_);
1088 backup_info->reserve(backups_.size());
1089 for (auto& backup : backups_) {
1090 if (!backup.second->Empty()) {
1091 backup_info->push_back(BackupInfo(
1092 backup.first, backup.second->GetTimestamp(), backup.second->GetSize(),
1093 backup.second->GetNumberFiles(), backup.second->GetAppMetadata()));
1094 }
1095 }
1096 }
1097
1098 void
GetCorruptedBackups(std::vector<BackupID> * corrupt_backup_ids)1099 BackupEngineImpl::GetCorruptedBackups(
1100 std::vector<BackupID>* corrupt_backup_ids) {
1101 assert(initialized_);
1102 corrupt_backup_ids->reserve(corrupt_backups_.size());
1103 for (auto& backup : corrupt_backups_) {
1104 corrupt_backup_ids->push_back(backup.first);
1105 }
1106 }
1107
// Restores backup `backup_id` into `db_dir` (all non-WAL files) and `wal_dir`
// (files parsed as kLogFile).
//
// Returns:
//  - the stored status if the backup is tracked as corrupt,
//  - NotFound if the backup is unknown or empty,
//  - the rename error if an archived log file cannot be moved to `wal_dir`
//    (only when restore_options.keep_log_files is set),
//  - Corruption if a backed-up file name cannot be parsed or a restored
//    file's checksum differs from the one recorded in the backup,
//  - otherwise the status of the first failing copy, or OK.
//
// Existing files in the target directories are deleted before copying; see
// the keep_log_files handling below for what is preserved.
Status BackupEngineImpl::RestoreDBFromBackup(
    BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
    const RestoreOptions& restore_options) {
  assert(initialized_);
  // A backup known to be corrupt reports the status recorded for it.
  auto corrupt_itr = corrupt_backups_.find(backup_id);
  if (corrupt_itr != corrupt_backups_.end()) {
    return corrupt_itr->second.first;
  }
  auto backup_itr = backups_.find(backup_id);
  if (backup_itr == backups_.end()) {
    return Status::NotFound("Backup not found");
  }
  auto& backup = backup_itr->second;
  // An empty backup is reported the same way as a missing one.
  if (backup->Empty()) {
    return Status::NotFound("Backup not found");
  }

  ROCKS_LOG_INFO(options_.info_log, "Restoring backup id %u\n", backup_id);
  ROCKS_LOG_INFO(options_.info_log, "keep_log_files: %d\n",
                 static_cast<int>(restore_options.keep_log_files));

  // just in case. Ignore errors
  db_env_->CreateDirIfMissing(db_dir);
  db_env_->CreateDirIfMissing(wal_dir);

  if (restore_options.keep_log_files) {
    // delete files in db_dir, but keep all the log files
    // (the second argument is a bit mask of file types that DeleteChildren
    // must NOT delete)
    DeleteChildren(db_dir, 1 << kLogFile);
    // move all the files from archive dir to wal_dir
    std::string archive_dir = ArchivalDirectory(wal_dir);
    std::vector<std::string> archive_files;
    db_env_->GetChildren(archive_dir, &archive_files);  // ignore errors
    for (const auto& f : archive_files) {
      uint64_t number;
      FileType type;
      bool ok = ParseFileName(f, &number, &type);
      // Entries that do not parse as log files are left in the archive dir.
      if (ok && type == kLogFile) {
        ROCKS_LOG_INFO(options_.info_log,
                       "Moving log file from archive/ to wal_dir: %s",
                       f.c_str());
        Status s =
            db_env_->RenameFile(archive_dir + "/" + f, wal_dir + "/" + f);
        if (!s.ok()) {
          // if we can't move log file from archive_dir to wal_dir,
          // we should fail, since it might mean data loss
          return s;
        }
      }
    }
  } else {
    // Not keeping logs: wipe the WAL dir, its archive dir and the DB dir.
    // DeleteChildren ignores errors.
    DeleteChildren(wal_dir);
    DeleteChildren(ArchivalDirectory(wal_dir));
    DeleteChildren(db_dir);
  }

  RateLimiter* rate_limiter = options_.restore_rate_limiter.get();
  if (rate_limiter) {
    // With a rate limiter, the copy buffer size (a member, so this persists
    // beyond this call) is set to the limiter's single burst size.
    copy_file_buffer_size_ = static_cast<size_t>(rate_limiter->GetSingleBurstBytes());
  }
  Status s;
  std::vector<RestoreAfterCopyOrCreateWorkItem> restore_items_to_finish;
  // Phase 1: queue one copy work item per backed-up file.
  for (const auto& file_info : backup->GetFiles()) {
    const std::string &file = file_info->filename;
    std::string dst;
    // 1. extract the filename
    size_t slash = file.find_last_of('/');
    // file will either be shared/<file>, shared_checksum/<file_crc32_size>
    // or private/<number>/<file>
    assert(slash != std::string::npos);
    dst = file.substr(slash + 1);

    // if the file was in shared_checksum, extract the real file name
    // in this case the file is <number>_<checksum>_<size>.<type>
    if (file.substr(0, slash) == GetSharedChecksumDirRel()) {
      dst = GetFileFromChecksumFile(dst);
    }

    // 2. find the filetype
    uint64_t number;
    FileType type;
    bool ok = ParseFileName(dst, &number, &type);
    if (!ok) {
      // NOTE(review): work items queued in earlier iterations are not waited
      // for on this return path -- confirm that abandoning them is intended.
      return Status::Corruption("Backup corrupted");
    }
    // 3. Construct the final path
    // kLogFile lives in wal_dir and all the rest live in db_dir
    dst = ((type == kLogFile) ? wal_dir : db_dir) +
      "/" + dst;

    ROCKS_LOG_INFO(options_.info_log, "Restoring %s to %s\n", file.c_str(),
                   dst.c_str());
    // Copy direction is backup_env_ -> db_env_; empty contents means "copy
    // from the source path" and size_limit 0 means no size limit (see
    // CopyOrCreateFile).
    CopyOrCreateWorkItem copy_or_create_work_item(
        GetAbsolutePath(file), dst, "" /* contents */, backup_env_, db_env_,
        EnvOptions() /* src_env_options */, false, rate_limiter,
        0 /* size_limit */);
    // Take the future before the work item is moved into the channel; pair it
    // with the checksum recorded in the backup for later verification.
    RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
        copy_or_create_work_item.result.get_future(),
        file_info->checksum_value);
    files_to_copy_or_create_.write(std::move(copy_or_create_work_item));
    restore_items_to_finish.push_back(
        std::move(after_copy_or_create_work_item));
  }
  // Phase 2: wait for the copies in queue order and stop at the first
  // failure. Items after the failing one are not waited for.
  Status item_status;
  for (auto& item : restore_items_to_finish) {
    item.result.wait();
    auto result = item.result.get();
    item_status = result.status;
    // Note: It is possible that both of the following bad-status cases occur
    // during copying. But, we only return one status.
    if (!item_status.ok()) {
      s = item_status;
      break;
    } else if (item.checksum_value != result.checksum_value) {
      s = Status::Corruption("Checksum check failed");
      break;
    }
  }

  ROCKS_LOG_INFO(options_.info_log, "Restoring done -- %s\n",
                 s.ToString().c_str());
  return s;
}
1230
VerifyBackup(BackupID backup_id)1231 Status BackupEngineImpl::VerifyBackup(BackupID backup_id) {
1232 assert(initialized_);
1233 auto corrupt_itr = corrupt_backups_.find(backup_id);
1234 if (corrupt_itr != corrupt_backups_.end()) {
1235 return corrupt_itr->second.first;
1236 }
1237
1238 auto backup_itr = backups_.find(backup_id);
1239 if (backup_itr == backups_.end()) {
1240 return Status::NotFound();
1241 }
1242
1243 auto& backup = backup_itr->second;
1244 if (backup->Empty()) {
1245 return Status::NotFound();
1246 }
1247
1248 ROCKS_LOG_INFO(options_.info_log, "Verifying backup id %u\n", backup_id);
1249
1250 std::unordered_map<std::string, uint64_t> curr_abs_path_to_size;
1251 for (const auto& rel_dir : {GetPrivateFileRel(backup_id), GetSharedFileRel(),
1252 GetSharedFileWithChecksumRel()}) {
1253 const auto abs_dir = GetAbsolutePath(rel_dir);
1254 InsertPathnameToSizeBytes(abs_dir, backup_env_, &curr_abs_path_to_size);
1255 }
1256
1257 for (const auto& file_info : backup->GetFiles()) {
1258 const auto abs_path = GetAbsolutePath(file_info->filename);
1259 if (curr_abs_path_to_size.find(abs_path) == curr_abs_path_to_size.end()) {
1260 return Status::NotFound("File missing: " + abs_path);
1261 }
1262 if (file_info->size != curr_abs_path_to_size[abs_path]) {
1263 return Status::Corruption("File corrupted: " + abs_path);
1264 }
1265 }
1266 return Status::OK();
1267 }
1268
// Writes the file `dst` in `dst_env`, either by copying `src` from `src_env`
// or by writing the literal `contents`. Exactly one of `src` / `contents`
// must be non-empty.
//
// Parameters:
//   src_env_options   options used to open `src` for sequential reading.
//   sync              if true, the destination is synced before being closed.
//   rate_limiter      optional; when set, every written chunk is charged to
//                     it as a low-priority write.
//   size              optional out-param; receives the number of bytes
//                     processed.
//   checksum_value    optional out-param; receives the crc32c of the bytes
//                     processed.
//   size_limit        maximum number of bytes to copy from `src`; 0 means
//                     no limit.
//   progress_callback invoked (under byte_report_mutex_) whenever more than
//                     options_.callback_trigger_interval_size bytes have been
//                     requested from the source since the last invocation.
//
// Returns the first error from opening, reading, appending, syncing or
// closing, or Incomplete("Backup stopped") if stop_backup_ is observed set at
// the top of a loop iteration.
Status BackupEngineImpl::CopyOrCreateFile(
    const std::string& src, const std::string& dst, const std::string& contents,
    Env* src_env, Env* dst_env, const EnvOptions& src_env_options, bool sync,
    RateLimiter* rate_limiter, uint64_t* size, uint32_t* checksum_value,
    uint64_t size_limit, std::function<void()> progress_callback) {
  assert(src.empty() != contents.empty());
  Status s;
  std::unique_ptr<WritableFile> dst_file;
  std::unique_ptr<SequentialFile> src_file;
  EnvOptions dst_env_options;
  dst_env_options.use_mmap_writes = false;
  // TODO:(gzh) maybe use direct reads/writes here if possible
  // Reset the out-params up front so they are defined on early error returns.
  if (size != nullptr) {
    *size = 0;
  }
  if (checksum_value != nullptr) {
    *checksum_value = 0;
  }

  // Check if size limit is set. if not, set it to very big number
  if (size_limit == 0) {
    size_limit = std::numeric_limits<uint64_t>::max();
  }

  // The destination is created first; the source is only opened in copy mode.
  s = dst_env->NewWritableFile(dst, &dst_file, dst_env_options);
  if (s.ok() && !src.empty()) {
    s = src_env->NewSequentialFile(src, &src_file, src_env_options);
  }
  if (!s.ok()) {
    return s;
  }

  std::unique_ptr<WritableFileWriter> dest_writer(new WritableFileWriter(
      NewLegacyWritableFileWrapper(std::move(dst_file)), dst, dst_env_options));
  std::unique_ptr<SequentialFileReader> src_reader;
  std::unique_ptr<char[]> buf;
  // Reader and read buffer are only needed in copy mode.
  if (!src.empty()) {
    src_reader.reset(new SequentialFileReader(
        NewLegacySequentialFileWrapper(src_file), src));
    buf.reset(new char[copy_file_buffer_size_]);
  }

  Slice data;
  uint64_t processed_buffer_size = 0;
  do {
    // Checked once per chunk so a stop request interrupts long copies.
    if (stop_backup_.load(std::memory_order_acquire)) {
      return Status::Incomplete("Backup stopped");
    }
    if (!src.empty()) {
      // Read at most one buffer, and never more than the remaining limit.
      size_t buffer_to_read = (copy_file_buffer_size_ < size_limit)
                                  ? copy_file_buffer_size_
                                  : static_cast<size_t>(size_limit);
      s = src_reader->Read(buffer_to_read, &data, buf.get());
      // Note: counts the requested size, not the number of bytes returned.
      processed_buffer_size += buffer_to_read;
    } else {
      // Create mode: the whole contents is written in a single iteration.
      data = contents;
    }
    size_limit -= data.size();

    if (!s.ok()) {
      return s;
    }

    if (size != nullptr) {
      *size += data.size();
    }
    if (checksum_value != nullptr) {
      *checksum_value =
          crc32c::Extend(*checksum_value, data.data(), data.size());
    }
    s = dest_writer->Append(data);
    // The limiter is charged even if the Append above failed; the loop
    // condition below then ends the copy.
    if (rate_limiter != nullptr) {
      rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
                            RateLimiter::OpType::kWrite);
    }
    if (processed_buffer_size > options_.callback_trigger_interval_size) {
      processed_buffer_size -= options_.callback_trigger_interval_size;
      // The callback is serialized through byte_report_mutex_.
      std::lock_guard<std::mutex> lock(byte_report_mutex_);
      progress_callback();
    }
    // Continue only in copy mode (contents empty), while the last read
    // returned data and the size limit has not been used up.
  } while (s.ok() && contents.empty() && data.size() > 0 && size_limit > 0);

  if (s.ok() && sync) {
    s = dest_writer->Sync(false);
  }
  if (s.ok()) {
    s = dest_writer->Close();
  }
  return s;
}
1359
1360 // fname will always start with "/"
AddBackupFileWorkItem(std::unordered_set<std::string> & live_dst_paths,std::vector<BackupAfterCopyOrCreateWorkItem> & backup_items_to_finish,BackupID backup_id,bool shared,const std::string & src_dir,const std::string & fname,const EnvOptions & src_env_options,RateLimiter * rate_limiter,uint64_t size_bytes,uint64_t size_limit,bool shared_checksum,std::function<void ()> progress_callback,const std::string & contents)1361 Status BackupEngineImpl::AddBackupFileWorkItem(
1362 std::unordered_set<std::string>& live_dst_paths,
1363 std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
1364 BackupID backup_id, bool shared, const std::string& src_dir,
1365 const std::string& fname, const EnvOptions& src_env_options,
1366 RateLimiter* rate_limiter, uint64_t size_bytes, uint64_t size_limit,
1367 bool shared_checksum, std::function<void()> progress_callback,
1368 const std::string& contents) {
1369 assert(!fname.empty() && fname[0] == '/');
1370 assert(contents.empty() != src_dir.empty());
1371
1372 std::string dst_relative = fname.substr(1);
1373 std::string dst_relative_tmp;
1374 Status s;
1375 uint32_t checksum_value = 0;
1376
1377 if (shared && shared_checksum) {
1378 // add checksum and file length to the file name
1379 s = CalculateChecksum(src_dir + fname, db_env_, src_env_options, size_limit,
1380 &checksum_value);
1381 if (!s.ok()) {
1382 return s;
1383 }
1384 if (size_bytes == port::kMaxUint64) {
1385 return Status::NotFound("File missing: " + src_dir + fname);
1386 }
1387 dst_relative =
1388 GetSharedFileWithChecksum(dst_relative, checksum_value, size_bytes);
1389 dst_relative_tmp = GetSharedFileWithChecksumRel(dst_relative, true);
1390 dst_relative = GetSharedFileWithChecksumRel(dst_relative, false);
1391 } else if (shared) {
1392 dst_relative_tmp = GetSharedFileRel(dst_relative, true);
1393 dst_relative = GetSharedFileRel(dst_relative, false);
1394 } else {
1395 dst_relative = GetPrivateFileRel(backup_id, false, dst_relative);
1396 }
1397
1398 // We copy into `temp_dest_path` and, once finished, rename it to
1399 // `final_dest_path`. This allows files to atomically appear at
1400 // `final_dest_path`. We can copy directly to the final path when atomicity
1401 // is unnecessary, like for files in private backup directories.
1402 const std::string* copy_dest_path;
1403 std::string temp_dest_path;
1404 std::string final_dest_path = GetAbsolutePath(dst_relative);
1405 if (!dst_relative_tmp.empty()) {
1406 temp_dest_path = GetAbsolutePath(dst_relative_tmp);
1407 copy_dest_path = &temp_dest_path;
1408 } else {
1409 copy_dest_path = &final_dest_path;
1410 }
1411
1412 // if it's shared, we also need to check if it exists -- if it does, no need
1413 // to copy it again.
1414 bool need_to_copy = true;
1415 // true if final_dest_path is the same path as another live file
1416 const bool same_path =
1417 live_dst_paths.find(final_dest_path) != live_dst_paths.end();
1418
1419 bool file_exists = false;
1420 if (shared && !same_path) {
1421 Status exist = backup_env_->FileExists(final_dest_path);
1422 if (exist.ok()) {
1423 file_exists = true;
1424 } else if (exist.IsNotFound()) {
1425 file_exists = false;
1426 } else {
1427 assert(s.IsIOError());
1428 return exist;
1429 }
1430 }
1431
1432 if (!contents.empty()) {
1433 need_to_copy = false;
1434 } else if (shared && (same_path || file_exists)) {
1435 need_to_copy = false;
1436 if (shared_checksum) {
1437 ROCKS_LOG_INFO(options_.info_log,
1438 "%s already present, with checksum %u and size %" PRIu64,
1439 fname.c_str(), checksum_value, size_bytes);
1440 } else if (backuped_file_infos_.find(dst_relative) ==
1441 backuped_file_infos_.end() && !same_path) {
1442 // file already exists, but it's not referenced by any backup. overwrite
1443 // the file
1444 ROCKS_LOG_INFO(
1445 options_.info_log,
1446 "%s already present, but not referenced by any backup. We will "
1447 "overwrite the file.",
1448 fname.c_str());
1449 need_to_copy = true;
1450 backup_env_->DeleteFile(final_dest_path);
1451 } else {
1452 // the file is present and referenced by a backup
1453 ROCKS_LOG_INFO(options_.info_log,
1454 "%s already present, calculate checksum", fname.c_str());
1455 s = CalculateChecksum(src_dir + fname, db_env_, src_env_options,
1456 size_limit, &checksum_value);
1457 }
1458 }
1459 live_dst_paths.insert(final_dest_path);
1460
1461 if (!contents.empty() || need_to_copy) {
1462 ROCKS_LOG_INFO(options_.info_log, "Copying %s to %s", fname.c_str(),
1463 copy_dest_path->c_str());
1464 CopyOrCreateWorkItem copy_or_create_work_item(
1465 src_dir.empty() ? "" : src_dir + fname, *copy_dest_path, contents,
1466 db_env_, backup_env_, src_env_options, options_.sync, rate_limiter,
1467 size_limit, progress_callback);
1468 BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
1469 copy_or_create_work_item.result.get_future(), shared, need_to_copy,
1470 backup_env_, temp_dest_path, final_dest_path, dst_relative);
1471 files_to_copy_or_create_.write(std::move(copy_or_create_work_item));
1472 backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
1473 } else {
1474 std::promise<CopyOrCreateResult> promise_result;
1475 BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
1476 promise_result.get_future(), shared, need_to_copy, backup_env_,
1477 temp_dest_path, final_dest_path, dst_relative);
1478 backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
1479 CopyOrCreateResult result;
1480 result.status = s;
1481 result.size = size_bytes;
1482 result.checksum_value = checksum_value;
1483 promise_result.set_value(std::move(result));
1484 }
1485 return s;
1486 }
1487
CalculateChecksum(const std::string & src,Env * src_env,const EnvOptions & src_env_options,uint64_t size_limit,uint32_t * checksum_value)1488 Status BackupEngineImpl::CalculateChecksum(const std::string& src, Env* src_env,
1489 const EnvOptions& src_env_options,
1490 uint64_t size_limit,
1491 uint32_t* checksum_value) {
1492 *checksum_value = 0;
1493 if (size_limit == 0) {
1494 size_limit = std::numeric_limits<uint64_t>::max();
1495 }
1496
1497 std::unique_ptr<SequentialFile> src_file;
1498 Status s = src_env->NewSequentialFile(src, &src_file, src_env_options);
1499 if (!s.ok()) {
1500 return s;
1501 }
1502
1503 std::unique_ptr<SequentialFileReader> src_reader(
1504 new SequentialFileReader(NewLegacySequentialFileWrapper(src_file), src));
1505 std::unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);
1506 Slice data;
1507
1508 do {
1509 if (stop_backup_.load(std::memory_order_acquire)) {
1510 return Status::Incomplete("Backup stopped");
1511 }
1512 size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ?
1513 copy_file_buffer_size_ : static_cast<size_t>(size_limit);
1514 s = src_reader->Read(buffer_to_read, &data, buf.get());
1515
1516 if (!s.ok()) {
1517 return s;
1518 }
1519
1520 size_limit -= data.size();
1521 *checksum_value = crc32c::Extend(*checksum_value, data.data(), data.size());
1522 } while (data.size() > 0 && size_limit > 0);
1523
1524 return s;
1525 }
1526
DeleteChildren(const std::string & dir,uint32_t file_type_filter)1527 void BackupEngineImpl::DeleteChildren(const std::string& dir,
1528 uint32_t file_type_filter) {
1529 std::vector<std::string> children;
1530 db_env_->GetChildren(dir, &children); // ignore errors
1531
1532 for (const auto& f : children) {
1533 uint64_t number;
1534 FileType type;
1535 bool ok = ParseFileName(f, &number, &type);
1536 if (ok && (file_type_filter & (1 << type))) {
1537 // don't delete this file
1538 continue;
1539 }
1540 db_env_->DeleteFile(dir + "/" + f); // ignore errors
1541 }
1542 }
1543
InsertPathnameToSizeBytes(const std::string & dir,Env * env,std::unordered_map<std::string,uint64_t> * result)1544 Status BackupEngineImpl::InsertPathnameToSizeBytes(
1545 const std::string& dir, Env* env,
1546 std::unordered_map<std::string, uint64_t>* result) {
1547 assert(result != nullptr);
1548 std::vector<Env::FileAttributes> files_attrs;
1549 Status status = env->FileExists(dir);
1550 if (status.ok()) {
1551 status = env->GetChildrenFileAttributes(dir, &files_attrs);
1552 } else if (status.IsNotFound()) {
1553 // Insert no entries can be considered success
1554 status = Status::OK();
1555 }
1556 const bool slash_needed = dir.empty() || dir.back() != '/';
1557 for (const auto& file_attrs : files_attrs) {
1558 result->emplace(dir + (slash_needed ? "/" : "") + file_attrs.name,
1559 file_attrs.size_bytes);
1560 }
1561 return status;
1562 }
1563
// Best-effort removal of files in the backup directory that no backup
// references:
//  1. files in the shared and shared-with-checksum directories that are
//     either unknown to backuped_file_infos_ or have a reference count of 0;
//  2. private directories whose name contains ".tmp", or whose numeric id is
//     non-zero and does not belong to an entry in backups_.
//
// The returned status only reflects failures to list a directory. Failures
// to delete an individual file or directory are logged and merely set
// might_need_garbage_collect_ so that a later call can retry.
Status BackupEngineImpl::GarbageCollect() {
  assert(!read_only_);

  // We will make a best effort to remove all garbage even in the presence
  // of inconsistencies or I/O failures that inhibit finding garbage.
  Status overall_status = Status::OK();
  // If all goes well, we don't need another auto-GC this session
  might_need_garbage_collect_ = false;

  ROCKS_LOG_INFO(options_.info_log, "Starting garbage collection");

  // delete obsolete shared files
  // (two passes: first the plain shared dir, then the one with checksums)
  for (bool with_checksum : {false, true}) {
    std::vector<std::string> shared_children;
    {
      std::string shared_path;
      if (with_checksum) {
        shared_path = GetAbsolutePath(GetSharedFileWithChecksumRel());
      } else {
        shared_path = GetAbsolutePath(GetSharedFileRel());
      }
      // A shared dir that does not exist is not an error; there is simply
      // nothing to collect in it.
      auto s = backup_env_->FileExists(shared_path);
      if (s.ok()) {
        s = backup_env_->GetChildren(shared_path, &shared_children);
      } else if (s.IsNotFound()) {
        s = Status::OK();
      }
      if (!s.ok()) {
        overall_status = s;
        // Trying again later might work
        might_need_garbage_collect_ = true;
      }
    }
    for (auto& child : shared_children) {
      if (child == "." || child == "..") {
        continue;
      }
      // Relative name, i.e. the key form used by backuped_file_infos_.
      std::string rel_fname;
      if (with_checksum) {
        rel_fname = GetSharedFileWithChecksumRel(child);
      } else {
        rel_fname = GetSharedFileRel(child);
      }
      auto child_itr = backuped_file_infos_.find(rel_fname);
      // if it's not refcounted, delete it
      if (child_itr == backuped_file_infos_.end() ||
          child_itr->second->refs == 0) {
        // this might be a directory, but DeleteFile will just fail in that
        // case, so we're good
        Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname));
        ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
                       rel_fname.c_str(), s.ToString().c_str());
        // The bookkeeping entry is dropped even if the delete failed.
        backuped_file_infos_.erase(rel_fname);
        if (!s.ok()) {
          // Trying again later might work
          might_need_garbage_collect_ = true;
        }
      }
    }
  }

  // delete obsolete private files
  std::vector<std::string> private_children;
  {
    auto s = backup_env_->GetChildren(GetAbsolutePath(GetPrivateDirRel()),
                                      &private_children);
    if (!s.ok()) {
      overall_status = s;
      // Trying again later might work
      might_need_garbage_collect_ = true;
    }
  }
  for (auto& child : private_children) {
    if (child == "." || child == "..") {
      continue;
    }

    // backup_id stays 0 when the name does not start with a number.
    BackupID backup_id = 0;
    bool tmp_dir = child.find(".tmp") != std::string::npos;
    sscanf(child.c_str(), "%u", &backup_id);
    if (!tmp_dir &&  // if it's tmp_dir, delete it
        (backup_id == 0 || backups_.find(backup_id) != backups_.end())) {
      // it's either not a number or it's still alive. continue
      continue;
    }
    // here we have to delete the dir and all its children
    // NOTE(review): the path is rebuilt from the parsed numeric id, not from
    // `child` itself. For a name containing ".tmp" this presumably resolves
    // to the non-tmp directory of that id (which may belong to a live
    // backup) -- confirm against GetPrivateFileRel().
    std::string full_private_path =
        GetAbsolutePath(GetPrivateFileRel(backup_id));
    std::vector<std::string> subchildren;
    backup_env_->GetChildren(full_private_path, &subchildren);
    for (auto& subchild : subchildren) {
      if (subchild == "." || subchild == "..") {
        continue;
      }
      // No separator is inserted here; presumably GetPrivateFileRel()
      // returns a path that already ends in '/' -- TODO confirm.
      Status s = backup_env_->DeleteFile(full_private_path + subchild);
      ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
                     (full_private_path + subchild).c_str(),
                     s.ToString().c_str());
      if (!s.ok()) {
        // Trying again later might work
        might_need_garbage_collect_ = true;
      }
    }
    // finally delete the private dir
    Status s = backup_env_->DeleteDir(full_private_path);
    ROCKS_LOG_INFO(options_.info_log, "Deleting dir %s -- %s",
                   full_private_path.c_str(), s.ToString().c_str());
    if (!s.ok()) {
      // Trying again later might work
      might_need_garbage_collect_ = true;
    }
  }

  // Every path that records an error above also requests a retry.
  assert(overall_status.ok() || might_need_garbage_collect_);
  return overall_status;
}
1680
1681 // ------- BackupMeta class --------
1682
AddFile(std::shared_ptr<FileInfo> file_info)1683 Status BackupEngineImpl::BackupMeta::AddFile(
1684 std::shared_ptr<FileInfo> file_info) {
1685 auto itr = file_infos_->find(file_info->filename);
1686 if (itr == file_infos_->end()) {
1687 auto ret = file_infos_->insert({file_info->filename, file_info});
1688 if (ret.second) {
1689 itr = ret.first;
1690 itr->second->refs = 1;
1691 } else {
1692 // if this happens, something is seriously wrong
1693 return Status::Corruption("In memory metadata insertion error");
1694 }
1695 } else {
1696 if (itr->second->checksum_value != file_info->checksum_value) {
1697 return Status::Corruption(
1698 "Checksum mismatch for existing backup file. Delete old backups and "
1699 "try again.");
1700 }
1701 ++itr->second->refs; // increase refcount if already present
1702 }
1703
1704 size_ += file_info->size;
1705 files_.push_back(itr->second);
1706
1707 return Status::OK();
1708 }
1709
Delete(bool delete_meta)1710 Status BackupEngineImpl::BackupMeta::Delete(bool delete_meta) {
1711 Status s;
1712 for (const auto& file : files_) {
1713 --file->refs; // decrease refcount
1714 }
1715 files_.clear();
1716 // delete meta file
1717 if (delete_meta) {
1718 s = env_->FileExists(meta_filename_);
1719 if (s.ok()) {
1720 s = env_->DeleteFile(meta_filename_);
1721 } else if (s.IsNotFound()) {
1722 s = Status::OK(); // nothing to delete
1723 }
1724 }
1725 timestamp_ = 0;
1726 return s;
1727 }
1728
1729 Slice kMetaDataPrefix("metadata ");
1730
1731 // each backup meta file is of the format:
1732 // <timestamp>
1733 // <seq number>
1734 // <metadata(literal string)> <metadata> (optional)
1735 // <number of files>
1736 // <file1> <crc32(literal string)> <crc32_value>
1737 // <file2> <crc32(literal string)> <crc32_value>
1738 // ...
LoadFromFile(const std::string & backup_dir,const std::unordered_map<std::string,uint64_t> & abs_path_to_size)1739 Status BackupEngineImpl::BackupMeta::LoadFromFile(
1740 const std::string& backup_dir,
1741 const std::unordered_map<std::string, uint64_t>& abs_path_to_size) {
1742 assert(Empty());
1743 Status s;
1744 std::unique_ptr<SequentialFile> backup_meta_file;
1745 s = env_->NewSequentialFile(meta_filename_, &backup_meta_file, EnvOptions());
1746 if (!s.ok()) {
1747 return s;
1748 }
1749
1750 std::unique_ptr<SequentialFileReader> backup_meta_reader(
1751 new SequentialFileReader(NewLegacySequentialFileWrapper(backup_meta_file),
1752 meta_filename_));
1753 std::unique_ptr<char[]> buf(new char[max_backup_meta_file_size_ + 1]);
1754 Slice data;
1755 s = backup_meta_reader->Read(max_backup_meta_file_size_, &data, buf.get());
1756
1757 if (!s.ok() || data.size() == max_backup_meta_file_size_) {
1758 return s.ok() ? Status::Corruption("File size too big") : s;
1759 }
1760 buf[data.size()] = 0;
1761
1762 uint32_t num_files = 0;
1763 char *next;
1764 timestamp_ = strtoull(data.data(), &next, 10);
1765 data.remove_prefix(next - data.data() + 1); // +1 for '\n'
1766 sequence_number_ = strtoull(data.data(), &next, 10);
1767 data.remove_prefix(next - data.data() + 1); // +1 for '\n'
1768
1769 if (data.starts_with(kMetaDataPrefix)) {
1770 // app metadata present
1771 data.remove_prefix(kMetaDataPrefix.size());
1772 Slice hex_encoded_metadata = GetSliceUntil(&data, '\n');
1773 bool decode_success = hex_encoded_metadata.DecodeHex(&app_metadata_);
1774 if (!decode_success) {
1775 return Status::Corruption(
1776 "Failed to decode stored hex encoded app metadata");
1777 }
1778 }
1779
1780 num_files = static_cast<uint32_t>(strtoul(data.data(), &next, 10));
1781 data.remove_prefix(next - data.data() + 1); // +1 for '\n'
1782
1783 std::vector<std::shared_ptr<FileInfo>> files;
1784
1785 Slice checksum_prefix("crc32 ");
1786
1787 for (uint32_t i = 0; s.ok() && i < num_files; ++i) {
1788 auto line = GetSliceUntil(&data, '\n');
1789 std::string filename = GetSliceUntil(&line, ' ').ToString();
1790
1791 uint64_t size;
1792 const std::shared_ptr<FileInfo> file_info = GetFile(filename);
1793 if (file_info) {
1794 size = file_info->size;
1795 } else {
1796 std::string abs_path = backup_dir + "/" + filename;
1797 try {
1798 size = abs_path_to_size.at(abs_path);
1799 } catch (std::out_of_range&) {
1800 return Status::Corruption("Size missing for pathname: " + abs_path);
1801 }
1802 }
1803
1804 if (line.empty()) {
1805 return Status::Corruption("File checksum is missing for " + filename +
1806 " in " + meta_filename_);
1807 }
1808
1809 uint32_t checksum_value = 0;
1810 if (line.starts_with(checksum_prefix)) {
1811 line.remove_prefix(checksum_prefix.size());
1812 checksum_value = static_cast<uint32_t>(
1813 strtoul(line.data(), nullptr, 10));
1814 if (line != ROCKSDB_NAMESPACE::ToString(checksum_value)) {
1815 return Status::Corruption("Invalid checksum value for " + filename +
1816 " in " + meta_filename_);
1817 }
1818 } else {
1819 return Status::Corruption("Unknown checksum type for " + filename +
1820 " in " + meta_filename_);
1821 }
1822
1823 files.emplace_back(new FileInfo(filename, size, checksum_value));
1824 }
1825
1826 if (s.ok() && data.size() > 0) {
1827 // file has to be read completely. if not, we count it as corruption
1828 s = Status::Corruption("Tailing data in backup meta file in " +
1829 meta_filename_);
1830 }
1831
1832 if (s.ok()) {
1833 files_.reserve(files.size());
1834 for (const auto& file_info : files) {
1835 s = AddFile(file_info);
1836 if (!s.ok()) {
1837 break;
1838 }
1839 }
1840 }
1841
1842 return s;
1843 }
1844
StoreToFile(bool sync)1845 Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) {
1846 Status s;
1847 std::unique_ptr<WritableFile> backup_meta_file;
1848 EnvOptions env_options;
1849 env_options.use_mmap_writes = false;
1850 env_options.use_direct_writes = false;
1851 s = env_->NewWritableFile(meta_tmp_filename_, &backup_meta_file, env_options);
1852 if (!s.ok()) {
1853 return s;
1854 }
1855
1856 std::unique_ptr<char[]> buf(new char[max_backup_meta_file_size_]);
1857 size_t len = 0, buf_size = max_backup_meta_file_size_;
1858 len += snprintf(buf.get(), buf_size, "%" PRId64 "\n", timestamp_);
1859 len += snprintf(buf.get() + len, buf_size - len, "%" PRIu64 "\n",
1860 sequence_number_);
1861 if (!app_metadata_.empty()) {
1862 std::string hex_encoded_metadata =
1863 Slice(app_metadata_).ToString(/* hex */ true);
1864
1865 // +1 to accommodate newline character
1866 size_t hex_meta_strlen = kMetaDataPrefix.ToString().length() + hex_encoded_metadata.length() + 1;
1867 if (hex_meta_strlen >= buf_size) {
1868 return Status::Corruption("Buffer too small to fit backup metadata");
1869 }
1870 else if (len + hex_meta_strlen >= buf_size) {
1871 backup_meta_file->Append(Slice(buf.get(), len));
1872 buf.reset();
1873 std::unique_ptr<char[]> new_reset_buf(
1874 new char[max_backup_meta_file_size_]);
1875 buf.swap(new_reset_buf);
1876 len = 0;
1877 }
1878 len += snprintf(buf.get() + len, buf_size - len, "%s%s\n",
1879 kMetaDataPrefix.ToString().c_str(),
1880 hex_encoded_metadata.c_str());
1881 }
1882
1883 char writelen_temp[19];
1884 if (len + snprintf(writelen_temp, sizeof(writelen_temp),
1885 "%" ROCKSDB_PRIszt "\n", files_.size()) >= buf_size) {
1886 backup_meta_file->Append(Slice(buf.get(), len));
1887 buf.reset();
1888 std::unique_ptr<char[]> new_reset_buf(new char[max_backup_meta_file_size_]);
1889 buf.swap(new_reset_buf);
1890 len = 0;
1891 }
1892 {
1893 const char *const_write = writelen_temp;
1894 len += snprintf(buf.get() + len, buf_size - len, "%s", const_write);
1895 }
1896
1897 for (const auto& file : files_) {
1898 // use crc32 for now, switch to something else if needed
1899
1900 size_t newlen = len + file->filename.length() + snprintf(writelen_temp,
1901 sizeof(writelen_temp), " crc32 %u\n", file->checksum_value);
1902 const char *const_write = writelen_temp;
1903 if (newlen >= buf_size) {
1904 backup_meta_file->Append(Slice(buf.get(), len));
1905 buf.reset();
1906 std::unique_ptr<char[]> new_reset_buf(
1907 new char[max_backup_meta_file_size_]);
1908 buf.swap(new_reset_buf);
1909 len = 0;
1910 }
1911 len += snprintf(buf.get() + len, buf_size - len, "%s%s",
1912 file->filename.c_str(), const_write);
1913 }
1914
1915 s = backup_meta_file->Append(Slice(buf.get(), len));
1916 if (s.ok() && sync) {
1917 s = backup_meta_file->Sync();
1918 }
1919 if (s.ok()) {
1920 s = backup_meta_file->Close();
1921 }
1922 if (s.ok()) {
1923 s = env_->RenameFile(meta_tmp_filename_, meta_filename_);
1924 }
1925 return s;
1926 }
1927
1928 // -------- BackupEngineReadOnlyImpl ---------
1929 class BackupEngineReadOnlyImpl : public BackupEngineReadOnly {
1930 public:
BackupEngineReadOnlyImpl(Env * db_env,const BackupableDBOptions & options)1931 BackupEngineReadOnlyImpl(Env* db_env, const BackupableDBOptions& options)
1932 : backup_engine_(new BackupEngineImpl(db_env, options, true)) {}
1933
~BackupEngineReadOnlyImpl()1934 ~BackupEngineReadOnlyImpl() override {}
1935
1936 // The returned BackupInfos are in chronological order, which means the
1937 // latest backup comes last.
GetBackupInfo(std::vector<BackupInfo> * backup_info)1938 void GetBackupInfo(std::vector<BackupInfo>* backup_info) override {
1939 backup_engine_->GetBackupInfo(backup_info);
1940 }
1941
GetCorruptedBackups(std::vector<BackupID> * corrupt_backup_ids)1942 void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) override {
1943 backup_engine_->GetCorruptedBackups(corrupt_backup_ids);
1944 }
1945
RestoreDBFromBackup(BackupID backup_id,const std::string & db_dir,const std::string & wal_dir,const RestoreOptions & restore_options=RestoreOptions ())1946 Status RestoreDBFromBackup(
1947 BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
1948 const RestoreOptions& restore_options = RestoreOptions()) override {
1949 return backup_engine_->RestoreDBFromBackup(backup_id, db_dir, wal_dir,
1950 restore_options);
1951 }
1952
RestoreDBFromLatestBackup(const std::string & db_dir,const std::string & wal_dir,const RestoreOptions & restore_options=RestoreOptions ())1953 Status RestoreDBFromLatestBackup(
1954 const std::string& db_dir, const std::string& wal_dir,
1955 const RestoreOptions& restore_options = RestoreOptions()) override {
1956 return backup_engine_->RestoreDBFromLatestBackup(db_dir, wal_dir,
1957 restore_options);
1958 }
1959
VerifyBackup(BackupID backup_id)1960 Status VerifyBackup(BackupID backup_id) override {
1961 return backup_engine_->VerifyBackup(backup_id);
1962 }
1963
Initialize()1964 Status Initialize() { return backup_engine_->Initialize(); }
1965
1966 private:
1967 std::unique_ptr<BackupEngineImpl> backup_engine_;
1968 };
1969
Open(Env * env,const BackupableDBOptions & options,BackupEngineReadOnly ** backup_engine_ptr)1970 Status BackupEngineReadOnly::Open(Env* env, const BackupableDBOptions& options,
1971 BackupEngineReadOnly** backup_engine_ptr) {
1972 if (options.destroy_old_data) {
1973 return Status::InvalidArgument(
1974 "Can't destroy old data with ReadOnly BackupEngine");
1975 }
1976 std::unique_ptr<BackupEngineReadOnlyImpl> backup_engine(
1977 new BackupEngineReadOnlyImpl(env, options));
1978 auto s = backup_engine->Initialize();
1979 if (!s.ok()) {
1980 *backup_engine_ptr = nullptr;
1981 return s;
1982 }
1983 *backup_engine_ptr = backup_engine.release();
1984 return Status::OK();
1985 }
1986
1987 } // namespace ROCKSDB_NAMESPACE
1988
1989 #endif // ROCKSDB_LITE
1990