1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #ifndef ROCKSDB_LITE
11 
12 #include <stdlib.h>
13 #include <algorithm>
14 #include <atomic>
15 #include <cinttypes>
16 #include <functional>
17 #include <future>
18 #include <limits>
19 #include <map>
20 #include <mutex>
21 #include <sstream>
22 #include <string>
23 #include <thread>
24 #include <unordered_map>
25 #include <unordered_set>
26 #include <vector>
27 
28 #include "env/composite_env_wrapper.h"
29 #include "file/filename.h"
30 #include "file/sequence_file_reader.h"
31 #include "file/writable_file_writer.h"
32 #include "logging/logging.h"
33 #include "port/port.h"
34 #include "rocksdb/rate_limiter.h"
35 #include "rocksdb/transaction_log.h"
36 #include "rocksdb/utilities/backupable_db.h"
37 #include "test_util/sync_point.h"
38 #include "util/channel.h"
39 #include "util/coding.h"
40 #include "util/crc32c.h"
41 #include "util/string_util.h"
42 #include "utilities/checkpoint/checkpoint_impl.h"
43 
44 namespace ROCKSDB_NAMESPACE {
45 
IncrementNumberSuccessBackup()46 void BackupStatistics::IncrementNumberSuccessBackup() {
47   number_success_backup++;
48 }
IncrementNumberFailBackup()49 void BackupStatistics::IncrementNumberFailBackup() {
50   number_fail_backup++;
51 }
52 
GetNumberSuccessBackup() const53 uint32_t BackupStatistics::GetNumberSuccessBackup() const {
54   return number_success_backup;
55 }
GetNumberFailBackup() const56 uint32_t BackupStatistics::GetNumberFailBackup() const {
57   return number_fail_backup;
58 }
59 
ToString() const60 std::string BackupStatistics::ToString() const {
61   char result[50];
62   snprintf(result, sizeof(result), "# success backup: %u, # fail backup: %u",
63            GetNumberSuccessBackup(), GetNumberFailBackup());
64   return result;
65 }
66 
Dump(Logger * logger) const67 void BackupableDBOptions::Dump(Logger* logger) const {
68   ROCKS_LOG_INFO(logger, "               Options.backup_dir: %s",
69                  backup_dir.c_str());
70   ROCKS_LOG_INFO(logger, "               Options.backup_env: %p", backup_env);
71   ROCKS_LOG_INFO(logger, "        Options.share_table_files: %d",
72                  static_cast<int>(share_table_files));
73   ROCKS_LOG_INFO(logger, "                 Options.info_log: %p", info_log);
74   ROCKS_LOG_INFO(logger, "                     Options.sync: %d",
75                  static_cast<int>(sync));
76   ROCKS_LOG_INFO(logger, "         Options.destroy_old_data: %d",
77                  static_cast<int>(destroy_old_data));
78   ROCKS_LOG_INFO(logger, "         Options.backup_log_files: %d",
79                  static_cast<int>(backup_log_files));
80   ROCKS_LOG_INFO(logger, "        Options.backup_rate_limit: %" PRIu64,
81                  backup_rate_limit);
82   ROCKS_LOG_INFO(logger, "       Options.restore_rate_limit: %" PRIu64,
83                  restore_rate_limit);
84   ROCKS_LOG_INFO(logger, "Options.max_background_operations: %d",
85                  max_background_operations);
86 }
87 
88 // -------- BackupEngineImpl class ---------
89 class BackupEngineImpl : public BackupEngine {
90  public:
91   BackupEngineImpl(Env* db_env, const BackupableDBOptions& options,
92                    bool read_only = false);
93   ~BackupEngineImpl() override;
94   Status CreateNewBackupWithMetadata(DB* db, const std::string& app_metadata,
95                                      bool flush_before_backup = false,
96                                      std::function<void()> progress_callback =
__anonc0806a9f0102() 97                                          []() {}) override;
98   Status PurgeOldBackups(uint32_t num_backups_to_keep) override;
99   Status DeleteBackup(BackupID backup_id) override;
StopBackup()100   void StopBackup() override {
101     stop_backup_.store(true, std::memory_order_release);
102   }
103   Status GarbageCollect() override;
104 
105   // The returned BackupInfos are in chronological order, which means the
106   // latest backup comes last.
107   void GetBackupInfo(std::vector<BackupInfo>* backup_info) override;
108   void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) override;
109   Status RestoreDBFromBackup(
110       BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
111       const RestoreOptions& restore_options = RestoreOptions()) override;
RestoreDBFromLatestBackup(const std::string & db_dir,const std::string & wal_dir,const RestoreOptions & restore_options=RestoreOptions ())112   Status RestoreDBFromLatestBackup(
113       const std::string& db_dir, const std::string& wal_dir,
114       const RestoreOptions& restore_options = RestoreOptions()) override {
115     return RestoreDBFromBackup(latest_valid_backup_id_, db_dir, wal_dir,
116                                restore_options);
117   }
118 
119   Status VerifyBackup(BackupID backup_id) override;
120 
121   Status Initialize();
122 
123  private:
124   void DeleteChildren(const std::string& dir, uint32_t file_type_filter = 0);
125   Status DeleteBackupInternal(BackupID backup_id);
126 
127   // Extends the "result" map with pathname->size mappings for the contents of
128   // "dir" in "env". Pathnames are prefixed with "dir".
129   Status InsertPathnameToSizeBytes(
130       const std::string& dir, Env* env,
131       std::unordered_map<std::string, uint64_t>* result);
132 
133   struct FileInfo {
FileInfoROCKSDB_NAMESPACE::BackupEngineImpl::FileInfo134     FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum)
135       : refs(0), filename(fname), size(sz), checksum_value(checksum) {}
136 
137     FileInfo(const FileInfo&) = delete;
138     FileInfo& operator=(const FileInfo&) = delete;
139 
140     int refs;
141     const std::string filename;
142     const uint64_t size;
143     const uint32_t checksum_value;
144   };
145 
146   class BackupMeta {
147    public:
BackupMeta(const std::string & meta_filename,const std::string & meta_tmp_filename,std::unordered_map<std::string,std::shared_ptr<FileInfo>> * file_infos,Env * env)148     BackupMeta(
149         const std::string& meta_filename, const std::string& meta_tmp_filename,
150         std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos,
151         Env* env)
152         : timestamp_(0),
153           sequence_number_(0),
154           size_(0),
155           meta_filename_(meta_filename),
156           meta_tmp_filename_(meta_tmp_filename),
157           file_infos_(file_infos),
158           env_(env) {}
159 
160     BackupMeta(const BackupMeta&) = delete;
161     BackupMeta& operator=(const BackupMeta&) = delete;
162 
~BackupMeta()163     ~BackupMeta() {}
164 
RecordTimestamp()165     void RecordTimestamp() {
166       env_->GetCurrentTime(&timestamp_);
167     }
GetTimestamp() const168     int64_t GetTimestamp() const {
169       return timestamp_;
170     }
GetSize() const171     uint64_t GetSize() const {
172       return size_;
173     }
GetNumberFiles()174     uint32_t GetNumberFiles() { return static_cast<uint32_t>(files_.size()); }
SetSequenceNumber(uint64_t sequence_number)175     void SetSequenceNumber(uint64_t sequence_number) {
176       sequence_number_ = sequence_number;
177     }
GetSequenceNumber()178     uint64_t GetSequenceNumber() {
179       return sequence_number_;
180     }
181 
GetAppMetadata() const182     const std::string& GetAppMetadata() const { return app_metadata_; }
183 
SetAppMetadata(const std::string & app_metadata)184     void SetAppMetadata(const std::string& app_metadata) {
185       app_metadata_ = app_metadata;
186     }
187 
188     Status AddFile(std::shared_ptr<FileInfo> file_info);
189 
190     Status Delete(bool delete_meta = true);
191 
Empty()192     bool Empty() {
193       return files_.empty();
194     }
195 
GetFile(const std::string & filename) const196     std::shared_ptr<FileInfo> GetFile(const std::string& filename) const {
197       auto it = file_infos_->find(filename);
198       if (it == file_infos_->end())
199         return nullptr;
200       return it->second;
201     }
202 
GetFiles()203     const std::vector<std::shared_ptr<FileInfo>>& GetFiles() {
204       return files_;
205     }
206 
207     // @param abs_path_to_size Pre-fetched file sizes (bytes).
208     Status LoadFromFile(
209         const std::string& backup_dir,
210         const std::unordered_map<std::string, uint64_t>& abs_path_to_size);
211     Status StoreToFile(bool sync);
212 
GetInfoString()213     std::string GetInfoString() {
214       std::ostringstream ss;
215       ss << "Timestamp: " << timestamp_ << std::endl;
216       char human_size[16];
217       AppendHumanBytes(size_, human_size, sizeof(human_size));
218       ss << "Size: " << human_size << std::endl;
219       ss << "Files:" << std::endl;
220       for (const auto& file : files_) {
221         AppendHumanBytes(file->size, human_size, sizeof(human_size));
222         ss << file->filename << ", size " << human_size << ", refs "
223            << file->refs << std::endl;
224       }
225       return ss.str();
226     }
227 
228    private:
229     int64_t timestamp_;
230     // sequence number is only approximate, should not be used
231     // by clients
232     uint64_t sequence_number_;
233     uint64_t size_;
234     std::string app_metadata_;
235     std::string const meta_filename_;
236     std::string const meta_tmp_filename_;
237     // files with relative paths (without "/" prefix!!)
238     std::vector<std::shared_ptr<FileInfo>> files_;
239     std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos_;
240     Env* env_;
241 
242     static const size_t max_backup_meta_file_size_ = 10 * 1024 * 1024;  // 10MB
243   };  // BackupMeta
244 
GetAbsolutePath(const std::string & relative_path="") const245   inline std::string GetAbsolutePath(
246       const std::string &relative_path = "") const {
247     assert(relative_path.size() == 0 || relative_path[0] != '/');
248     return options_.backup_dir + "/" + relative_path;
249   }
GetPrivateDirRel() const250   inline std::string GetPrivateDirRel() const {
251     return "private";
252   }
GetSharedChecksumDirRel() const253   inline std::string GetSharedChecksumDirRel() const {
254     return "shared_checksum";
255   }
GetPrivateFileRel(BackupID backup_id,bool tmp=false,const std::string & file="") const256   inline std::string GetPrivateFileRel(BackupID backup_id,
257                                        bool tmp = false,
258                                        const std::string& file = "") const {
259     assert(file.size() == 0 || file[0] != '/');
260     return GetPrivateDirRel() + "/" + ROCKSDB_NAMESPACE::ToString(backup_id) +
261            (tmp ? ".tmp" : "") + "/" + file;
262   }
GetSharedFileRel(const std::string & file="",bool tmp=false) const263   inline std::string GetSharedFileRel(const std::string& file = "",
264                                       bool tmp = false) const {
265     assert(file.size() == 0 || file[0] != '/');
266     return std::string("shared/") + (tmp ? "." : "") + file +
267            (tmp ? ".tmp" : "");
268   }
GetSharedFileWithChecksumRel(const std::string & file="",bool tmp=false) const269   inline std::string GetSharedFileWithChecksumRel(const std::string& file = "",
270                                                   bool tmp = false) const {
271     assert(file.size() == 0 || file[0] != '/');
272     return GetSharedChecksumDirRel() + "/" + (tmp ? "." : "") + file +
273            (tmp ? ".tmp" : "");
274   }
GetSharedFileWithChecksum(const std::string & file,const uint32_t checksum_value,const uint64_t file_size) const275   inline std::string GetSharedFileWithChecksum(const std::string& file,
276                                                const uint32_t checksum_value,
277                                                const uint64_t file_size) const {
278     assert(file.size() == 0 || file[0] != '/');
279     std::string file_copy = file;
280     return file_copy.insert(file_copy.find_last_of('.'),
281                             "_" + ROCKSDB_NAMESPACE::ToString(checksum_value) +
282                                 "_" + ROCKSDB_NAMESPACE::ToString(file_size));
283   }
GetFileFromChecksumFile(const std::string & file) const284   inline std::string GetFileFromChecksumFile(const std::string& file) const {
285     assert(file.size() == 0 || file[0] != '/');
286     std::string file_copy = file;
287     size_t first_underscore = file_copy.find_first_of('_');
288     return file_copy.erase(first_underscore,
289                            file_copy.find_last_of('.') - first_underscore);
290   }
GetBackupMetaDir() const291   inline std::string GetBackupMetaDir() const {
292     return GetAbsolutePath("meta");
293   }
GetBackupMetaFile(BackupID backup_id,bool tmp) const294   inline std::string GetBackupMetaFile(BackupID backup_id, bool tmp) const {
295     return GetBackupMetaDir() + "/" + (tmp ? "." : "") +
296            ROCKSDB_NAMESPACE::ToString(backup_id) + (tmp ? ".tmp" : "");
297   }
298 
299   // If size_limit == 0, there is no size limit, copy everything.
300   //
301   // Exactly one of src and contents must be non-empty.
302   //
303   // @param src If non-empty, the file is copied from this pathname.
304   // @param contents If non-empty, the file will be created with these contents.
305   Status CopyOrCreateFile(const std::string& src, const std::string& dst,
306                           const std::string& contents, Env* src_env,
307                           Env* dst_env, const EnvOptions& src_env_options,
308                           bool sync, RateLimiter* rate_limiter,
309                           uint64_t* size = nullptr,
310                           uint32_t* checksum_value = nullptr,
311                           uint64_t size_limit = 0,
__anonc0806a9f0202() 312                           std::function<void()> progress_callback = []() {});
313 
314   Status CalculateChecksum(const std::string& src, Env* src_env,
315                            const EnvOptions& src_env_options,
316                            uint64_t size_limit, uint32_t* checksum_value);
317 
318   struct CopyOrCreateResult {
319     uint64_t size;
320     uint32_t checksum_value;
321     Status status;
322   };
323 
324   // Exactly one of src_path and contents must be non-empty. If src_path is
325   // non-empty, the file is copied from this pathname. Otherwise, if contents is
326   // non-empty, the file will be created at dst_path with these contents.
327   struct CopyOrCreateWorkItem {
328     std::string src_path;
329     std::string dst_path;
330     std::string contents;
331     Env* src_env;
332     Env* dst_env;
333     EnvOptions src_env_options;
334     bool sync;
335     RateLimiter* rate_limiter;
336     uint64_t size_limit;
337     std::promise<CopyOrCreateResult> result;
338     std::function<void()> progress_callback;
339 
CopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::CopyOrCreateWorkItem340     CopyOrCreateWorkItem()
341         : src_path(""),
342           dst_path(""),
343           contents(""),
344           src_env(nullptr),
345           dst_env(nullptr),
346           src_env_options(),
347           sync(false),
348           rate_limiter(nullptr),
349           size_limit(0) {}
350 
351     CopyOrCreateWorkItem(const CopyOrCreateWorkItem&) = delete;
352     CopyOrCreateWorkItem& operator=(const CopyOrCreateWorkItem&) = delete;
353 
CopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::CopyOrCreateWorkItem354     CopyOrCreateWorkItem(CopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
355       *this = std::move(o);
356     }
357 
operator =ROCKSDB_NAMESPACE::BackupEngineImpl::CopyOrCreateWorkItem358     CopyOrCreateWorkItem& operator=(CopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
359       src_path = std::move(o.src_path);
360       dst_path = std::move(o.dst_path);
361       contents = std::move(o.contents);
362       src_env = o.src_env;
363       dst_env = o.dst_env;
364       src_env_options = std::move(o.src_env_options);
365       sync = o.sync;
366       rate_limiter = o.rate_limiter;
367       size_limit = o.size_limit;
368       result = std::move(o.result);
369       progress_callback = std::move(o.progress_callback);
370       return *this;
371     }
372 
CopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::CopyOrCreateWorkItem373     CopyOrCreateWorkItem(std::string _src_path, std::string _dst_path,
374                          std::string _contents, Env* _src_env, Env* _dst_env,
375                          EnvOptions _src_env_options, bool _sync,
376                          RateLimiter* _rate_limiter, uint64_t _size_limit,
377                          std::function<void()> _progress_callback = []() {})
378         : src_path(std::move(_src_path)),
379           dst_path(std::move(_dst_path)),
380           contents(std::move(_contents)),
381           src_env(_src_env),
382           dst_env(_dst_env),
383           src_env_options(std::move(_src_env_options)),
384           sync(_sync),
385           rate_limiter(_rate_limiter),
386           size_limit(_size_limit),
387           progress_callback(_progress_callback) {}
388   };
389 
390   struct BackupAfterCopyOrCreateWorkItem {
391     std::future<CopyOrCreateResult> result;
392     bool shared;
393     bool needed_to_copy;
394     Env* backup_env;
395     std::string dst_path_tmp;
396     std::string dst_path;
397     std::string dst_relative;
BackupAfterCopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::BackupAfterCopyOrCreateWorkItem398     BackupAfterCopyOrCreateWorkItem()
399       : shared(false),
400         needed_to_copy(false),
401         backup_env(nullptr),
402         dst_path_tmp(""),
403         dst_path(""),
404         dst_relative("") {}
405 
BackupAfterCopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::BackupAfterCopyOrCreateWorkItem406     BackupAfterCopyOrCreateWorkItem(BackupAfterCopyOrCreateWorkItem&& o)
407         ROCKSDB_NOEXCEPT {
408       *this = std::move(o);
409     }
410 
operator =ROCKSDB_NAMESPACE::BackupEngineImpl::BackupAfterCopyOrCreateWorkItem411     BackupAfterCopyOrCreateWorkItem& operator=(
412         BackupAfterCopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
413       result = std::move(o.result);
414       shared = o.shared;
415       needed_to_copy = o.needed_to_copy;
416       backup_env = o.backup_env;
417       dst_path_tmp = std::move(o.dst_path_tmp);
418       dst_path = std::move(o.dst_path);
419       dst_relative = std::move(o.dst_relative);
420       return *this;
421     }
422 
BackupAfterCopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::BackupAfterCopyOrCreateWorkItem423     BackupAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
424                                     bool _shared, bool _needed_to_copy,
425                                     Env* _backup_env, std::string _dst_path_tmp,
426                                     std::string _dst_path,
427                                     std::string _dst_relative)
428         : result(std::move(_result)),
429           shared(_shared),
430           needed_to_copy(_needed_to_copy),
431           backup_env(_backup_env),
432           dst_path_tmp(std::move(_dst_path_tmp)),
433           dst_path(std::move(_dst_path)),
434           dst_relative(std::move(_dst_relative)) {}
435   };
436 
437   struct RestoreAfterCopyOrCreateWorkItem {
438     std::future<CopyOrCreateResult> result;
439     uint32_t checksum_value;
RestoreAfterCopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::RestoreAfterCopyOrCreateWorkItem440     RestoreAfterCopyOrCreateWorkItem()
441       : checksum_value(0) {}
RestoreAfterCopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::RestoreAfterCopyOrCreateWorkItem442     RestoreAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
443                                      uint32_t _checksum_value)
444         : result(std::move(_result)), checksum_value(_checksum_value) {}
RestoreAfterCopyOrCreateWorkItemROCKSDB_NAMESPACE::BackupEngineImpl::RestoreAfterCopyOrCreateWorkItem445     RestoreAfterCopyOrCreateWorkItem(RestoreAfterCopyOrCreateWorkItem&& o)
446         ROCKSDB_NOEXCEPT {
447       *this = std::move(o);
448     }
449 
operator =ROCKSDB_NAMESPACE::BackupEngineImpl::RestoreAfterCopyOrCreateWorkItem450     RestoreAfterCopyOrCreateWorkItem& operator=(
451         RestoreAfterCopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
452       result = std::move(o.result);
453       checksum_value = o.checksum_value;
454       return *this;
455     }
456   };
457 
458   bool initialized_;
459   std::mutex byte_report_mutex_;
460   channel<CopyOrCreateWorkItem> files_to_copy_or_create_;
461   std::vector<port::Thread> threads_;
462   // Certain operations like PurgeOldBackups and DeleteBackup will trigger
463   // automatic GarbageCollect (true) unless we've already done one in this
464   // session and have not failed to delete backup files since then (false).
465   bool might_need_garbage_collect_ = true;
466 
467   // Adds a file to the backup work queue to be copied or created if it doesn't
468   // already exist.
469   //
470   // Exactly one of src_dir and contents must be non-empty.
471   //
472   // @param src_dir If non-empty, the file in this directory named fname will be
473   //    copied.
474   // @param fname Name of destination file and, in case of copy, source file.
475   // @param contents If non-empty, the file will be created with these contents.
476   Status AddBackupFileWorkItem(
477       std::unordered_set<std::string>& live_dst_paths,
478       std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
479       BackupID backup_id, bool shared, const std::string& src_dir,
480       const std::string& fname,  // starts with "/"
481       const EnvOptions& src_env_options, RateLimiter* rate_limiter,
482       uint64_t size_bytes, uint64_t size_limit = 0,
483       bool shared_checksum = false,
__anonc0806a9f0402() 484       std::function<void()> progress_callback = []() {},
485       const std::string& contents = std::string());
486 
487   // backup state data
488   BackupID latest_backup_id_;
489   BackupID latest_valid_backup_id_;
490   std::map<BackupID, std::unique_ptr<BackupMeta>> backups_;
491   std::map<BackupID, std::pair<Status, std::unique_ptr<BackupMeta>>>
492       corrupt_backups_;
493   std::unordered_map<std::string,
494                      std::shared_ptr<FileInfo>> backuped_file_infos_;
495   std::atomic<bool> stop_backup_;
496 
497   // options data
498   BackupableDBOptions options_;
499   Env* db_env_;
500   Env* backup_env_;
501 
502   // directories
503   std::unique_ptr<Directory> backup_directory_;
504   std::unique_ptr<Directory> shared_directory_;
505   std::unique_ptr<Directory> meta_directory_;
506   std::unique_ptr<Directory> private_directory_;
507 
508   static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL;  // 5MB
509   size_t copy_file_buffer_size_;
510   bool read_only_;
511   BackupStatistics backup_statistics_;
512   static const size_t kMaxAppMetaSize = 1024 * 1024;  // 1MB
513 };
514 
Open(Env * env,const BackupableDBOptions & options,BackupEngine ** backup_engine_ptr)515 Status BackupEngine::Open(Env* env, const BackupableDBOptions& options,
516                           BackupEngine** backup_engine_ptr) {
517   std::unique_ptr<BackupEngineImpl> backup_engine(
518       new BackupEngineImpl(env, options));
519   auto s = backup_engine->Initialize();
520   if (!s.ok()) {
521     *backup_engine_ptr = nullptr;
522     return s;
523   }
524   *backup_engine_ptr = backup_engine.release();
525   return Status::OK();
526 }
527 
BackupEngineImpl(Env * db_env,const BackupableDBOptions & options,bool read_only)528 BackupEngineImpl::BackupEngineImpl(Env* db_env,
529                                    const BackupableDBOptions& options,
530                                    bool read_only)
531     : initialized_(false),
532       latest_backup_id_(0),
533       latest_valid_backup_id_(0),
534       stop_backup_(false),
535       options_(options),
536       db_env_(db_env),
537       backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_),
538       copy_file_buffer_size_(kDefaultCopyFileBufferSize),
539       read_only_(read_only) {
540   if (options_.backup_rate_limiter == nullptr &&
541       options_.backup_rate_limit > 0) {
542     options_.backup_rate_limiter.reset(
543         NewGenericRateLimiter(options_.backup_rate_limit));
544   }
545   if (options_.restore_rate_limiter == nullptr &&
546       options_.restore_rate_limit > 0) {
547     options_.restore_rate_limiter.reset(
548         NewGenericRateLimiter(options_.restore_rate_limit));
549   }
550 }
551 
~BackupEngineImpl()552 BackupEngineImpl::~BackupEngineImpl() {
553   files_to_copy_or_create_.sendEof();
554   for (auto& t : threads_) {
555     t.join();
556   }
557   LogFlush(options_.info_log);
558 }
559 
Initialize()560 Status BackupEngineImpl::Initialize() {
561   assert(!initialized_);
562   initialized_ = true;
563   if (read_only_) {
564     ROCKS_LOG_INFO(options_.info_log, "Starting read_only backup engine");
565   }
566   options_.Dump(options_.info_log);
567 
568   if (!read_only_) {
569     // we might need to clean up from previous crash or I/O errors
570     might_need_garbage_collect_ = true;
571 
572     if (options_.max_valid_backups_to_open != port::kMaxInt32) {
573       options_.max_valid_backups_to_open = port::kMaxInt32;
574       ROCKS_LOG_WARN(
575           options_.info_log,
576           "`max_valid_backups_to_open` is not set to the default value. Ignoring "
577           "its value since BackupEngine is not read-only.");
578     }
579 
580     // gather the list of directories that we need to create
581     std::vector<std::pair<std::string, std::unique_ptr<Directory>*>>
582         directories;
583     directories.emplace_back(GetAbsolutePath(), &backup_directory_);
584     if (options_.share_table_files) {
585       if (options_.share_files_with_checksum) {
586         directories.emplace_back(
587             GetAbsolutePath(GetSharedFileWithChecksumRel()),
588             &shared_directory_);
589       } else {
590         directories.emplace_back(GetAbsolutePath(GetSharedFileRel()),
591                                  &shared_directory_);
592       }
593     }
594     directories.emplace_back(GetAbsolutePath(GetPrivateDirRel()),
595                              &private_directory_);
596     directories.emplace_back(GetBackupMetaDir(), &meta_directory_);
597     // create all the dirs we need
598     for (const auto& d : directories) {
599       auto s = backup_env_->CreateDirIfMissing(d.first);
600       if (s.ok()) {
601         s = backup_env_->NewDirectory(d.first, d.second);
602       }
603       if (!s.ok()) {
604         return s;
605       }
606     }
607   }
608 
609   std::vector<std::string> backup_meta_files;
610   {
611     auto s = backup_env_->GetChildren(GetBackupMetaDir(), &backup_meta_files);
612     if (s.IsNotFound()) {
613       return Status::NotFound(GetBackupMetaDir() + " is missing");
614     } else if (!s.ok()) {
615       return s;
616     }
617   }
618   // create backups_ structure
619   for (auto& file : backup_meta_files) {
620     if (file == "." || file == "..") {
621       continue;
622     }
623     ROCKS_LOG_INFO(options_.info_log, "Detected backup %s", file.c_str());
624     BackupID backup_id = 0;
625     sscanf(file.c_str(), "%u", &backup_id);
626     if (backup_id == 0 || file != ROCKSDB_NAMESPACE::ToString(backup_id)) {
627       if (!read_only_) {
628         // invalid file name, delete that
629         auto s = backup_env_->DeleteFile(GetBackupMetaDir() + "/" + file);
630         ROCKS_LOG_INFO(options_.info_log,
631                        "Unrecognized meta file %s, deleting -- %s",
632                        file.c_str(), s.ToString().c_str());
633       }
634       continue;
635     }
636     assert(backups_.find(backup_id) == backups_.end());
637     backups_.insert(std::make_pair(
638         backup_id, std::unique_ptr<BackupMeta>(new BackupMeta(
639                        GetBackupMetaFile(backup_id, false /* tmp */),
640                        GetBackupMetaFile(backup_id, true /* tmp */),
641                        &backuped_file_infos_, backup_env_))));
642   }
643 
644   latest_backup_id_ = 0;
645   latest_valid_backup_id_ = 0;
646   if (options_.destroy_old_data) {  // Destroy old data
647     assert(!read_only_);
648     ROCKS_LOG_INFO(
649         options_.info_log,
650         "Backup Engine started with destroy_old_data == true, deleting all "
651         "backups");
652     auto s = PurgeOldBackups(0);
653     if (s.ok()) {
654       s = GarbageCollect();
655     }
656     if (!s.ok()) {
657       return s;
658     }
659   } else {  // Load data from storage
660     std::unordered_map<std::string, uint64_t> abs_path_to_size;
661     for (const auto& rel_dir :
662          {GetSharedFileRel(), GetSharedFileWithChecksumRel()}) {
663       const auto abs_dir = GetAbsolutePath(rel_dir);
664       InsertPathnameToSizeBytes(abs_dir, backup_env_, &abs_path_to_size);
665     }
666     // load the backups if any, until valid_backups_to_open of the latest
667     // non-corrupted backups have been successfully opened.
668     int valid_backups_to_open = options_.max_valid_backups_to_open;
669     for (auto backup_iter = backups_.rbegin();
670          backup_iter != backups_.rend();
671          ++backup_iter) {
672       assert(latest_backup_id_ == 0 || latest_backup_id_ > backup_iter->first);
673       if (latest_backup_id_ == 0) {
674         latest_backup_id_ = backup_iter->first;
675       }
676       if (valid_backups_to_open == 0) {
677         break;
678       }
679 
680       InsertPathnameToSizeBytes(
681           GetAbsolutePath(GetPrivateFileRel(backup_iter->first)), backup_env_,
682           &abs_path_to_size);
683       Status s = backup_iter->second->LoadFromFile(options_.backup_dir,
684                                                    abs_path_to_size);
685       if (s.IsCorruption()) {
686         ROCKS_LOG_INFO(options_.info_log, "Backup %u corrupted -- %s",
687                        backup_iter->first, s.ToString().c_str());
688         corrupt_backups_.insert(
689             std::make_pair(backup_iter->first,
690                            std::make_pair(s, std::move(backup_iter->second))));
691       } else if (!s.ok()) {
692         // Distinguish corruption errors from errors in the backup Env.
693         // Errors in the backup Env (i.e., this code path) will cause Open() to
694         // fail, whereas corruption errors would not cause Open() failures.
695         return s;
696       } else {
697         ROCKS_LOG_INFO(options_.info_log, "Loading backup %" PRIu32 " OK:\n%s",
698                        backup_iter->first,
699                        backup_iter->second->GetInfoString().c_str());
700         assert(latest_valid_backup_id_ == 0 ||
701                latest_valid_backup_id_ > backup_iter->first);
702         if (latest_valid_backup_id_ == 0) {
703           latest_valid_backup_id_ = backup_iter->first;
704         }
705         --valid_backups_to_open;
706       }
707     }
708 
709     for (const auto& corrupt : corrupt_backups_) {
710       backups_.erase(backups_.find(corrupt.first));
711     }
712     // erase the backups before max_valid_backups_to_open
713     int num_unopened_backups;
714     if (options_.max_valid_backups_to_open == 0) {
715       num_unopened_backups = 0;
716     } else {
717       num_unopened_backups =
718           std::max(0, static_cast<int>(backups_.size()) -
719                           options_.max_valid_backups_to_open);
720     }
721     for (int i = 0; i < num_unopened_backups; ++i) {
722       assert(backups_.begin()->second->Empty());
723       backups_.erase(backups_.begin());
724     }
725   }
726 
727   ROCKS_LOG_INFO(options_.info_log, "Latest backup is %u", latest_backup_id_);
728   ROCKS_LOG_INFO(options_.info_log, "Latest valid backup is %u",
729                  latest_valid_backup_id_);
730 
731   // set up threads perform copies from files_to_copy_or_create_ in the
732   // background
733   for (int t = 0; t < options_.max_background_operations; t++) {
734     threads_.emplace_back([this]() {
735 #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
736 #if __GLIBC_PREREQ(2, 12)
737       pthread_setname_np(pthread_self(), "backup_engine");
738 #endif
739 #endif
740       CopyOrCreateWorkItem work_item;
741       while (files_to_copy_or_create_.read(work_item)) {
742         CopyOrCreateResult result;
743         result.status = CopyOrCreateFile(
744             work_item.src_path, work_item.dst_path, work_item.contents,
745             work_item.src_env, work_item.dst_env, work_item.src_env_options,
746             work_item.sync, work_item.rate_limiter, &result.size,
747             &result.checksum_value, work_item.size_limit,
748             work_item.progress_callback);
749         work_item.result.set_value(std::move(result));
750       }
751     });
752   }
753   ROCKS_LOG_INFO(options_.info_log, "Initialized BackupEngine");
754 
755   return Status::OK();
756 }
757 
CreateNewBackupWithMetadata(DB * db,const std::string & app_metadata,bool flush_before_backup,std::function<void ()> progress_callback)758 Status BackupEngineImpl::CreateNewBackupWithMetadata(
759     DB* db, const std::string& app_metadata, bool flush_before_backup,
760     std::function<void()> progress_callback) {
761   assert(initialized_);
762   assert(!read_only_);
763   if (app_metadata.size() > kMaxAppMetaSize) {
764     return Status::InvalidArgument("App metadata too large");
765   }
766 
767   BackupID new_backup_id = latest_backup_id_ + 1;
768 
769   assert(backups_.find(new_backup_id) == backups_.end());
770 
771   auto private_dir = GetAbsolutePath(GetPrivateFileRel(new_backup_id));
772   Status s = backup_env_->FileExists(private_dir);
773   if (s.ok()) {
774     // maybe last backup failed and left partial state behind, clean it up.
775     // need to do this before updating backups_ such that a private dir
776     // named after new_backup_id will be cleaned up.
777     // (If an incomplete new backup is followed by an incomplete delete
778     // of the latest full backup, then there could be more than one next
779     // id with a private dir, the last thing to be deleted in delete
780     // backup, but all will be cleaned up with a GarbageCollect.)
781     s = GarbageCollect();
782   } else if (s.IsNotFound()) {
783     // normal case, the new backup's private dir doesn't exist yet
784     s = Status::OK();
785   }
786 
787   auto ret = backups_.insert(std::make_pair(
788       new_backup_id, std::unique_ptr<BackupMeta>(new BackupMeta(
789                          GetBackupMetaFile(new_backup_id, false /* tmp */),
790                          GetBackupMetaFile(new_backup_id, true /* tmp */),
791                          &backuped_file_infos_, backup_env_))));
792   assert(ret.second == true);
793   auto& new_backup = ret.first->second;
794   new_backup->RecordTimestamp();
795   new_backup->SetAppMetadata(app_metadata);
796 
797   auto start_backup = backup_env_->NowMicros();
798 
799   ROCKS_LOG_INFO(options_.info_log,
800                  "Started the backup process -- creating backup %u",
801                  new_backup_id);
802   if (s.ok()) {
803     s = backup_env_->CreateDir(private_dir);
804   }
805 
806   RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
807   if (rate_limiter) {
808     copy_file_buffer_size_ = static_cast<size_t>(rate_limiter->GetSingleBurstBytes());
809   }
810 
811   // A set into which we will insert the dst_paths that are calculated for live
812   // files and live WAL files.
813   // This is used to check whether a live files shares a dst_path with another
814   // live file.
815   std::unordered_set<std::string> live_dst_paths;
816 
817   std::vector<BackupAfterCopyOrCreateWorkItem> backup_items_to_finish;
818   // Add a CopyOrCreateWorkItem to the channel for each live file
819   db->DisableFileDeletions();
820   if (s.ok()) {
821     CheckpointImpl checkpoint(db);
822     uint64_t sequence_number = 0;
823     DBOptions db_options = db->GetDBOptions();
824     EnvOptions src_raw_env_options(db_options);
825     s = checkpoint.CreateCustomCheckpoint(
826         db_options,
827         [&](const std::string& /*src_dirname*/, const std::string& /*fname*/,
828             FileType) {
829           // custom checkpoint will switch to calling copy_file_cb after it sees
830           // NotSupported returned from link_file_cb.
831           return Status::NotSupported();
832         } /* link_file_cb */,
833         [&](const std::string& src_dirname, const std::string& fname,
834             uint64_t size_limit_bytes, FileType type) {
835           if (type == kLogFile && !options_.backup_log_files) {
836             return Status::OK();
837           }
838           Log(options_.info_log, "add file for backup %s", fname.c_str());
839           uint64_t size_bytes = 0;
840           Status st;
841           if (type == kTableFile) {
842             st = db_env_->GetFileSize(src_dirname + fname, &size_bytes);
843           }
844           EnvOptions src_env_options;
845           switch (type) {
846             case kLogFile:
847               src_env_options =
848                   db_env_->OptimizeForLogRead(src_raw_env_options);
849               break;
850             case kTableFile:
851               src_env_options = db_env_->OptimizeForCompactionTableRead(
852                   src_raw_env_options, ImmutableDBOptions(db_options));
853               break;
854             case kDescriptorFile:
855               src_env_options =
856                   db_env_->OptimizeForManifestRead(src_raw_env_options);
857               break;
858             default:
859               // Other backed up files (like options file) are not read by live
860               // DB, so don't need to worry about avoiding mixing buffered and
861               // direct I/O. Just use plain defaults.
862               src_env_options = src_raw_env_options;
863               break;
864           }
865           if (st.ok()) {
866             st = AddBackupFileWorkItem(
867                 live_dst_paths, backup_items_to_finish, new_backup_id,
868                 options_.share_table_files && type == kTableFile, src_dirname,
869                 fname, src_env_options, rate_limiter, size_bytes,
870                 size_limit_bytes,
871                 options_.share_files_with_checksum && type == kTableFile,
872                 progress_callback);
873           }
874           return st;
875         } /* copy_file_cb */,
876         [&](const std::string& fname, const std::string& contents, FileType) {
877           Log(options_.info_log, "add file for backup %s", fname.c_str());
878           return AddBackupFileWorkItem(
879               live_dst_paths, backup_items_to_finish, new_backup_id,
880               false /* shared */, "" /* src_dir */, fname,
881               EnvOptions() /* src_env_options */, rate_limiter, contents.size(),
882               0 /* size_limit */, false /* shared_checksum */,
883               progress_callback, contents);
884         } /* create_file_cb */,
885         &sequence_number, flush_before_backup ? 0 : port::kMaxUint64);
886     if (s.ok()) {
887       new_backup->SetSequenceNumber(sequence_number);
888     }
889   }
890   ROCKS_LOG_INFO(options_.info_log, "add files for backup done, wait finish.");
891   Status item_status;
892   for (auto& item : backup_items_to_finish) {
893     item.result.wait();
894     auto result = item.result.get();
895     item_status = result.status;
896     if (item_status.ok() && item.shared && item.needed_to_copy) {
897       item_status = item.backup_env->RenameFile(item.dst_path_tmp,
898                                                 item.dst_path);
899     }
900     if (item_status.ok()) {
901       item_status = new_backup.get()->AddFile(
902               std::make_shared<FileInfo>(item.dst_relative,
903                                          result.size,
904                                          result.checksum_value));
905     }
906     if (!item_status.ok()) {
907       s = item_status;
908     }
909   }
910 
911   // we copied all the files, enable file deletions
912   db->EnableFileDeletions(false);
913 
914   auto backup_time = backup_env_->NowMicros() - start_backup;
915 
916   if (s.ok()) {
917     // persist the backup metadata on the disk
918     s = new_backup->StoreToFile(options_.sync);
919   }
920   if (s.ok() && options_.sync) {
921     std::unique_ptr<Directory> backup_private_directory;
922     backup_env_->NewDirectory(
923         GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)),
924         &backup_private_directory);
925     if (backup_private_directory != nullptr) {
926       s = backup_private_directory->Fsync();
927     }
928     if (s.ok() && private_directory_ != nullptr) {
929       s = private_directory_->Fsync();
930     }
931     if (s.ok() && meta_directory_ != nullptr) {
932       s = meta_directory_->Fsync();
933     }
934     if (s.ok() && shared_directory_ != nullptr) {
935       s = shared_directory_->Fsync();
936     }
937     if (s.ok() && backup_directory_ != nullptr) {
938       s = backup_directory_->Fsync();
939     }
940   }
941 
942   if (s.ok()) {
943     backup_statistics_.IncrementNumberSuccessBackup();
944   }
945   if (!s.ok()) {
946     backup_statistics_.IncrementNumberFailBackup();
947     // clean all the files we might have created
948     ROCKS_LOG_INFO(options_.info_log, "Backup failed -- %s",
949                    s.ToString().c_str());
950     ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s\n",
951                    backup_statistics_.ToString().c_str());
952     // delete files that we might have already written
953     might_need_garbage_collect_ = true;
954     DeleteBackup(new_backup_id);
955     return s;
956   }
957 
958   // here we know that we succeeded and installed the new backup
959   // in the LATEST_BACKUP file
960   latest_backup_id_ = new_backup_id;
961   latest_valid_backup_id_ = new_backup_id;
962   ROCKS_LOG_INFO(options_.info_log, "Backup DONE. All is good");
963 
964   // backup_speed is in byte/second
965   double backup_speed = new_backup->GetSize() / (1.048576 * backup_time);
966   ROCKS_LOG_INFO(options_.info_log, "Backup number of files: %u",
967                  new_backup->GetNumberFiles());
968   char human_size[16];
969   AppendHumanBytes(new_backup->GetSize(), human_size, sizeof(human_size));
970   ROCKS_LOG_INFO(options_.info_log, "Backup size: %s", human_size);
971   ROCKS_LOG_INFO(options_.info_log, "Backup time: %" PRIu64 " microseconds",
972                  backup_time);
973   ROCKS_LOG_INFO(options_.info_log, "Backup speed: %.3f MB/s", backup_speed);
974   ROCKS_LOG_INFO(options_.info_log, "Backup Statistics %s",
975                  backup_statistics_.ToString().c_str());
976   return s;
977 }
978 
PurgeOldBackups(uint32_t num_backups_to_keep)979 Status BackupEngineImpl::PurgeOldBackups(uint32_t num_backups_to_keep) {
980   assert(initialized_);
981   assert(!read_only_);
982 
983   // Best effort deletion even with errors
984   Status overall_status = Status::OK();
985 
986   ROCKS_LOG_INFO(options_.info_log, "Purging old backups, keeping %u",
987                  num_backups_to_keep);
988   std::vector<BackupID> to_delete;
989   auto itr = backups_.begin();
990   while ((backups_.size() - to_delete.size()) > num_backups_to_keep) {
991     to_delete.push_back(itr->first);
992     itr++;
993   }
994   for (auto backup_id : to_delete) {
995     auto s = DeleteBackupInternal(backup_id);
996     if (!s.ok()) {
997       overall_status = s;
998     }
999   }
1000   // Clean up after any incomplete backup deletion, potentially from
1001   // earlier session.
1002   if (might_need_garbage_collect_) {
1003     auto s = GarbageCollect();
1004     if (!s.ok() && overall_status.ok()) {
1005       overall_status = s;
1006     }
1007   }
1008   return overall_status;
1009 }
1010 
DeleteBackup(BackupID backup_id)1011 Status BackupEngineImpl::DeleteBackup(BackupID backup_id) {
1012   auto s1 = DeleteBackupInternal(backup_id);
1013   auto s2 = Status::OK();
1014 
1015   // Clean up after any incomplete backup deletion, potentially from
1016   // earlier session.
1017   if (might_need_garbage_collect_) {
1018     s2 = GarbageCollect();
1019   }
1020 
1021   if (!s1.ok()) {
1022     return s1;
1023   } else {
1024     return s2;
1025   }
1026 }
1027 
1028 // Does not auto-GarbageCollect
DeleteBackupInternal(BackupID backup_id)1029 Status BackupEngineImpl::DeleteBackupInternal(BackupID backup_id) {
1030   assert(initialized_);
1031   assert(!read_only_);
1032 
1033   ROCKS_LOG_INFO(options_.info_log, "Deleting backup %u", backup_id);
1034   auto backup = backups_.find(backup_id);
1035   if (backup != backups_.end()) {
1036     auto s = backup->second->Delete();
1037     if (!s.ok()) {
1038       return s;
1039     }
1040     backups_.erase(backup);
1041   } else {
1042     auto corrupt = corrupt_backups_.find(backup_id);
1043     if (corrupt == corrupt_backups_.end()) {
1044       return Status::NotFound("Backup not found");
1045     }
1046     auto s = corrupt->second.second->Delete();
1047     if (!s.ok()) {
1048       return s;
1049     }
1050     corrupt_backups_.erase(corrupt);
1051   }
1052 
1053   // After removing meta file, best effort deletion even with errors.
1054   // (Don't delete other files if we can't delete the meta file right
1055   // now.)
1056   std::vector<std::string> to_delete;
1057   for (auto& itr : backuped_file_infos_) {
1058     if (itr.second->refs == 0) {
1059       Status s = backup_env_->DeleteFile(GetAbsolutePath(itr.first));
1060       ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s", itr.first.c_str(),
1061                      s.ToString().c_str());
1062       to_delete.push_back(itr.first);
1063       if (!s.ok()) {
1064         // Trying again later might work
1065         might_need_garbage_collect_ = true;
1066       }
1067     }
1068   }
1069   for (auto& td : to_delete) {
1070     backuped_file_infos_.erase(td);
1071   }
1072 
1073   // take care of private dirs -- GarbageCollect() will take care of them
1074   // if they are not empty
1075   std::string private_dir = GetPrivateFileRel(backup_id);
1076   Status s = backup_env_->DeleteDir(GetAbsolutePath(private_dir));
1077   ROCKS_LOG_INFO(options_.info_log, "Deleting private dir %s -- %s",
1078                  private_dir.c_str(), s.ToString().c_str());
1079   if (!s.ok()) {
1080     // Full gc or trying again later might work
1081     might_need_garbage_collect_ = true;
1082   }
1083   return Status::OK();
1084 }
1085 
GetBackupInfo(std::vector<BackupInfo> * backup_info)1086 void BackupEngineImpl::GetBackupInfo(std::vector<BackupInfo>* backup_info) {
1087   assert(initialized_);
1088   backup_info->reserve(backups_.size());
1089   for (auto& backup : backups_) {
1090     if (!backup.second->Empty()) {
1091       backup_info->push_back(BackupInfo(
1092           backup.first, backup.second->GetTimestamp(), backup.second->GetSize(),
1093           backup.second->GetNumberFiles(), backup.second->GetAppMetadata()));
1094     }
1095   }
1096 }
1097 
1098 void
GetCorruptedBackups(std::vector<BackupID> * corrupt_backup_ids)1099 BackupEngineImpl::GetCorruptedBackups(
1100     std::vector<BackupID>* corrupt_backup_ids) {
1101   assert(initialized_);
1102   corrupt_backup_ids->reserve(corrupt_backups_.size());
1103   for (auto& backup : corrupt_backups_) {
1104     corrupt_backup_ids->push_back(backup.first);
1105   }
1106 }
1107 
RestoreDBFromBackup(BackupID backup_id,const std::string & db_dir,const std::string & wal_dir,const RestoreOptions & restore_options)1108 Status BackupEngineImpl::RestoreDBFromBackup(
1109     BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
1110     const RestoreOptions& restore_options) {
1111   assert(initialized_);
1112   auto corrupt_itr = corrupt_backups_.find(backup_id);
1113   if (corrupt_itr != corrupt_backups_.end()) {
1114     return corrupt_itr->second.first;
1115   }
1116   auto backup_itr = backups_.find(backup_id);
1117   if (backup_itr == backups_.end()) {
1118     return Status::NotFound("Backup not found");
1119   }
1120   auto& backup = backup_itr->second;
1121   if (backup->Empty()) {
1122     return Status::NotFound("Backup not found");
1123   }
1124 
1125   ROCKS_LOG_INFO(options_.info_log, "Restoring backup id %u\n", backup_id);
1126   ROCKS_LOG_INFO(options_.info_log, "keep_log_files: %d\n",
1127                  static_cast<int>(restore_options.keep_log_files));
1128 
1129   // just in case. Ignore errors
1130   db_env_->CreateDirIfMissing(db_dir);
1131   db_env_->CreateDirIfMissing(wal_dir);
1132 
1133   if (restore_options.keep_log_files) {
1134     // delete files in db_dir, but keep all the log files
1135     DeleteChildren(db_dir, 1 << kLogFile);
1136     // move all the files from archive dir to wal_dir
1137     std::string archive_dir = ArchivalDirectory(wal_dir);
1138     std::vector<std::string> archive_files;
1139     db_env_->GetChildren(archive_dir, &archive_files);  // ignore errors
1140     for (const auto& f : archive_files) {
1141       uint64_t number;
1142       FileType type;
1143       bool ok = ParseFileName(f, &number, &type);
1144       if (ok && type == kLogFile) {
1145         ROCKS_LOG_INFO(options_.info_log,
1146                        "Moving log file from archive/ to wal_dir: %s",
1147                        f.c_str());
1148         Status s =
1149             db_env_->RenameFile(archive_dir + "/" + f, wal_dir + "/" + f);
1150         if (!s.ok()) {
1151           // if we can't move log file from archive_dir to wal_dir,
1152           // we should fail, since it might mean data loss
1153           return s;
1154         }
1155       }
1156     }
1157   } else {
1158     DeleteChildren(wal_dir);
1159     DeleteChildren(ArchivalDirectory(wal_dir));
1160     DeleteChildren(db_dir);
1161   }
1162 
1163   RateLimiter* rate_limiter = options_.restore_rate_limiter.get();
1164   if (rate_limiter) {
1165     copy_file_buffer_size_ = static_cast<size_t>(rate_limiter->GetSingleBurstBytes());
1166   }
1167   Status s;
1168   std::vector<RestoreAfterCopyOrCreateWorkItem> restore_items_to_finish;
1169   for (const auto& file_info : backup->GetFiles()) {
1170     const std::string &file = file_info->filename;
1171     std::string dst;
1172     // 1. extract the filename
1173     size_t slash = file.find_last_of('/');
1174     // file will either be shared/<file>, shared_checksum/<file_crc32_size>
1175     // or private/<number>/<file>
1176     assert(slash != std::string::npos);
1177     dst = file.substr(slash + 1);
1178 
1179     // if the file was in shared_checksum, extract the real file name
1180     // in this case the file is <number>_<checksum>_<size>.<type>
1181     if (file.substr(0, slash) == GetSharedChecksumDirRel()) {
1182       dst = GetFileFromChecksumFile(dst);
1183     }
1184 
1185     // 2. find the filetype
1186     uint64_t number;
1187     FileType type;
1188     bool ok = ParseFileName(dst, &number, &type);
1189     if (!ok) {
1190       return Status::Corruption("Backup corrupted");
1191     }
1192     // 3. Construct the final path
1193     // kLogFile lives in wal_dir and all the rest live in db_dir
1194     dst = ((type == kLogFile) ? wal_dir : db_dir) +
1195       "/" + dst;
1196 
1197     ROCKS_LOG_INFO(options_.info_log, "Restoring %s to %s\n", file.c_str(),
1198                    dst.c_str());
1199     CopyOrCreateWorkItem copy_or_create_work_item(
1200         GetAbsolutePath(file), dst, "" /* contents */, backup_env_, db_env_,
1201         EnvOptions() /* src_env_options */, false, rate_limiter,
1202         0 /* size_limit */);
1203     RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
1204         copy_or_create_work_item.result.get_future(),
1205         file_info->checksum_value);
1206     files_to_copy_or_create_.write(std::move(copy_or_create_work_item));
1207     restore_items_to_finish.push_back(
1208         std::move(after_copy_or_create_work_item));
1209   }
1210   Status item_status;
1211   for (auto& item : restore_items_to_finish) {
1212     item.result.wait();
1213     auto result = item.result.get();
1214     item_status = result.status;
1215     // Note: It is possible that both of the following bad-status cases occur
1216     // during copying. But, we only return one status.
1217     if (!item_status.ok()) {
1218       s = item_status;
1219       break;
1220     } else if (item.checksum_value != result.checksum_value) {
1221       s = Status::Corruption("Checksum check failed");
1222       break;
1223     }
1224   }
1225 
1226   ROCKS_LOG_INFO(options_.info_log, "Restoring done -- %s\n",
1227                  s.ToString().c_str());
1228   return s;
1229 }
1230 
VerifyBackup(BackupID backup_id)1231 Status BackupEngineImpl::VerifyBackup(BackupID backup_id) {
1232   assert(initialized_);
1233   auto corrupt_itr = corrupt_backups_.find(backup_id);
1234   if (corrupt_itr != corrupt_backups_.end()) {
1235     return corrupt_itr->second.first;
1236   }
1237 
1238   auto backup_itr = backups_.find(backup_id);
1239   if (backup_itr == backups_.end()) {
1240     return Status::NotFound();
1241   }
1242 
1243   auto& backup = backup_itr->second;
1244   if (backup->Empty()) {
1245     return Status::NotFound();
1246   }
1247 
1248   ROCKS_LOG_INFO(options_.info_log, "Verifying backup id %u\n", backup_id);
1249 
1250   std::unordered_map<std::string, uint64_t> curr_abs_path_to_size;
1251   for (const auto& rel_dir : {GetPrivateFileRel(backup_id), GetSharedFileRel(),
1252                               GetSharedFileWithChecksumRel()}) {
1253     const auto abs_dir = GetAbsolutePath(rel_dir);
1254     InsertPathnameToSizeBytes(abs_dir, backup_env_, &curr_abs_path_to_size);
1255   }
1256 
1257   for (const auto& file_info : backup->GetFiles()) {
1258     const auto abs_path = GetAbsolutePath(file_info->filename);
1259     if (curr_abs_path_to_size.find(abs_path) == curr_abs_path_to_size.end()) {
1260       return Status::NotFound("File missing: " + abs_path);
1261     }
1262     if (file_info->size != curr_abs_path_to_size[abs_path]) {
1263       return Status::Corruption("File corrupted: " + abs_path);
1264     }
1265   }
1266   return Status::OK();
1267 }
1268 
CopyOrCreateFile(const std::string & src,const std::string & dst,const std::string & contents,Env * src_env,Env * dst_env,const EnvOptions & src_env_options,bool sync,RateLimiter * rate_limiter,uint64_t * size,uint32_t * checksum_value,uint64_t size_limit,std::function<void ()> progress_callback)1269 Status BackupEngineImpl::CopyOrCreateFile(
1270     const std::string& src, const std::string& dst, const std::string& contents,
1271     Env* src_env, Env* dst_env, const EnvOptions& src_env_options, bool sync,
1272     RateLimiter* rate_limiter, uint64_t* size, uint32_t* checksum_value,
1273     uint64_t size_limit, std::function<void()> progress_callback) {
1274   assert(src.empty() != contents.empty());
1275   Status s;
1276   std::unique_ptr<WritableFile> dst_file;
1277   std::unique_ptr<SequentialFile> src_file;
1278   EnvOptions dst_env_options;
1279   dst_env_options.use_mmap_writes = false;
1280   // TODO:(gzh) maybe use direct reads/writes here if possible
1281   if (size != nullptr) {
1282     *size = 0;
1283   }
1284   if (checksum_value != nullptr) {
1285     *checksum_value = 0;
1286   }
1287 
1288   // Check if size limit is set. if not, set it to very big number
1289   if (size_limit == 0) {
1290     size_limit = std::numeric_limits<uint64_t>::max();
1291   }
1292 
1293   s = dst_env->NewWritableFile(dst, &dst_file, dst_env_options);
1294   if (s.ok() && !src.empty()) {
1295     s = src_env->NewSequentialFile(src, &src_file, src_env_options);
1296   }
1297   if (!s.ok()) {
1298     return s;
1299   }
1300 
1301   std::unique_ptr<WritableFileWriter> dest_writer(new WritableFileWriter(
1302       NewLegacyWritableFileWrapper(std::move(dst_file)), dst, dst_env_options));
1303   std::unique_ptr<SequentialFileReader> src_reader;
1304   std::unique_ptr<char[]> buf;
1305   if (!src.empty()) {
1306     src_reader.reset(new SequentialFileReader(
1307         NewLegacySequentialFileWrapper(src_file), src));
1308     buf.reset(new char[copy_file_buffer_size_]);
1309   }
1310 
1311   Slice data;
1312   uint64_t processed_buffer_size = 0;
1313   do {
1314     if (stop_backup_.load(std::memory_order_acquire)) {
1315       return Status::Incomplete("Backup stopped");
1316     }
1317     if (!src.empty()) {
1318       size_t buffer_to_read = (copy_file_buffer_size_ < size_limit)
1319                                   ? copy_file_buffer_size_
1320                                   : static_cast<size_t>(size_limit);
1321       s = src_reader->Read(buffer_to_read, &data, buf.get());
1322       processed_buffer_size += buffer_to_read;
1323     } else {
1324       data = contents;
1325     }
1326     size_limit -= data.size();
1327 
1328     if (!s.ok()) {
1329       return s;
1330     }
1331 
1332     if (size != nullptr) {
1333       *size += data.size();
1334     }
1335     if (checksum_value != nullptr) {
1336       *checksum_value =
1337           crc32c::Extend(*checksum_value, data.data(), data.size());
1338     }
1339     s = dest_writer->Append(data);
1340     if (rate_limiter != nullptr) {
1341       rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
1342                             RateLimiter::OpType::kWrite);
1343     }
1344     if (processed_buffer_size > options_.callback_trigger_interval_size) {
1345       processed_buffer_size -= options_.callback_trigger_interval_size;
1346       std::lock_guard<std::mutex> lock(byte_report_mutex_);
1347       progress_callback();
1348     }
1349   } while (s.ok() && contents.empty() && data.size() > 0 && size_limit > 0);
1350 
1351   if (s.ok() && sync) {
1352     s = dest_writer->Sync(false);
1353   }
1354   if (s.ok()) {
1355     s = dest_writer->Close();
1356   }
1357   return s;
1358 }
1359 
1360 // fname will always start with "/"
AddBackupFileWorkItem(std::unordered_set<std::string> & live_dst_paths,std::vector<BackupAfterCopyOrCreateWorkItem> & backup_items_to_finish,BackupID backup_id,bool shared,const std::string & src_dir,const std::string & fname,const EnvOptions & src_env_options,RateLimiter * rate_limiter,uint64_t size_bytes,uint64_t size_limit,bool shared_checksum,std::function<void ()> progress_callback,const std::string & contents)1361 Status BackupEngineImpl::AddBackupFileWorkItem(
1362     std::unordered_set<std::string>& live_dst_paths,
1363     std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
1364     BackupID backup_id, bool shared, const std::string& src_dir,
1365     const std::string& fname, const EnvOptions& src_env_options,
1366     RateLimiter* rate_limiter, uint64_t size_bytes, uint64_t size_limit,
1367     bool shared_checksum, std::function<void()> progress_callback,
1368     const std::string& contents) {
1369   assert(!fname.empty() && fname[0] == '/');
1370   assert(contents.empty() != src_dir.empty());
1371 
1372   std::string dst_relative = fname.substr(1);
1373   std::string dst_relative_tmp;
1374   Status s;
1375   uint32_t checksum_value = 0;
1376 
1377   if (shared && shared_checksum) {
1378     // add checksum and file length to the file name
1379     s = CalculateChecksum(src_dir + fname, db_env_, src_env_options, size_limit,
1380                           &checksum_value);
1381     if (!s.ok()) {
1382       return s;
1383     }
1384     if (size_bytes == port::kMaxUint64) {
1385       return Status::NotFound("File missing: " + src_dir + fname);
1386     }
1387     dst_relative =
1388         GetSharedFileWithChecksum(dst_relative, checksum_value, size_bytes);
1389     dst_relative_tmp = GetSharedFileWithChecksumRel(dst_relative, true);
1390     dst_relative = GetSharedFileWithChecksumRel(dst_relative, false);
1391   } else if (shared) {
1392     dst_relative_tmp = GetSharedFileRel(dst_relative, true);
1393     dst_relative = GetSharedFileRel(dst_relative, false);
1394   } else {
1395     dst_relative = GetPrivateFileRel(backup_id, false, dst_relative);
1396   }
1397 
1398   // We copy into `temp_dest_path` and, once finished, rename it to
1399   // `final_dest_path`. This allows files to atomically appear at
1400   // `final_dest_path`. We can copy directly to the final path when atomicity
1401   // is unnecessary, like for files in private backup directories.
1402   const std::string* copy_dest_path;
1403   std::string temp_dest_path;
1404   std::string final_dest_path = GetAbsolutePath(dst_relative);
1405   if (!dst_relative_tmp.empty()) {
1406     temp_dest_path = GetAbsolutePath(dst_relative_tmp);
1407     copy_dest_path = &temp_dest_path;
1408   } else {
1409     copy_dest_path = &final_dest_path;
1410   }
1411 
1412   // if it's shared, we also need to check if it exists -- if it does, no need
1413   // to copy it again.
1414   bool need_to_copy = true;
1415   // true if final_dest_path is the same path as another live file
1416   const bool same_path =
1417       live_dst_paths.find(final_dest_path) != live_dst_paths.end();
1418 
1419   bool file_exists = false;
1420   if (shared && !same_path) {
1421     Status exist = backup_env_->FileExists(final_dest_path);
1422     if (exist.ok()) {
1423       file_exists = true;
1424     } else if (exist.IsNotFound()) {
1425       file_exists = false;
1426     } else {
1427       assert(s.IsIOError());
1428       return exist;
1429     }
1430   }
1431 
1432   if (!contents.empty()) {
1433     need_to_copy = false;
1434   } else if (shared && (same_path || file_exists)) {
1435     need_to_copy = false;
1436     if (shared_checksum) {
1437       ROCKS_LOG_INFO(options_.info_log,
1438                      "%s already present, with checksum %u and size %" PRIu64,
1439                      fname.c_str(), checksum_value, size_bytes);
1440     } else if (backuped_file_infos_.find(dst_relative) ==
1441                backuped_file_infos_.end() && !same_path) {
1442       // file already exists, but it's not referenced by any backup. overwrite
1443       // the file
1444       ROCKS_LOG_INFO(
1445           options_.info_log,
1446           "%s already present, but not referenced by any backup. We will "
1447           "overwrite the file.",
1448           fname.c_str());
1449       need_to_copy = true;
1450       backup_env_->DeleteFile(final_dest_path);
1451     } else {
1452       // the file is present and referenced by a backup
1453       ROCKS_LOG_INFO(options_.info_log,
1454                      "%s already present, calculate checksum", fname.c_str());
1455       s = CalculateChecksum(src_dir + fname, db_env_, src_env_options,
1456                             size_limit, &checksum_value);
1457     }
1458   }
1459   live_dst_paths.insert(final_dest_path);
1460 
1461   if (!contents.empty() || need_to_copy) {
1462     ROCKS_LOG_INFO(options_.info_log, "Copying %s to %s", fname.c_str(),
1463                    copy_dest_path->c_str());
1464     CopyOrCreateWorkItem copy_or_create_work_item(
1465         src_dir.empty() ? "" : src_dir + fname, *copy_dest_path, contents,
1466         db_env_, backup_env_, src_env_options, options_.sync, rate_limiter,
1467         size_limit, progress_callback);
1468     BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
1469         copy_or_create_work_item.result.get_future(), shared, need_to_copy,
1470         backup_env_, temp_dest_path, final_dest_path, dst_relative);
1471     files_to_copy_or_create_.write(std::move(copy_or_create_work_item));
1472     backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
1473   } else {
1474     std::promise<CopyOrCreateResult> promise_result;
1475     BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
1476         promise_result.get_future(), shared, need_to_copy, backup_env_,
1477         temp_dest_path, final_dest_path, dst_relative);
1478     backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
1479     CopyOrCreateResult result;
1480     result.status = s;
1481     result.size = size_bytes;
1482     result.checksum_value = checksum_value;
1483     promise_result.set_value(std::move(result));
1484   }
1485   return s;
1486 }
1487 
CalculateChecksum(const std::string & src,Env * src_env,const EnvOptions & src_env_options,uint64_t size_limit,uint32_t * checksum_value)1488 Status BackupEngineImpl::CalculateChecksum(const std::string& src, Env* src_env,
1489                                            const EnvOptions& src_env_options,
1490                                            uint64_t size_limit,
1491                                            uint32_t* checksum_value) {
1492   *checksum_value = 0;
1493   if (size_limit == 0) {
1494     size_limit = std::numeric_limits<uint64_t>::max();
1495   }
1496 
1497   std::unique_ptr<SequentialFile> src_file;
1498   Status s = src_env->NewSequentialFile(src, &src_file, src_env_options);
1499   if (!s.ok()) {
1500     return s;
1501   }
1502 
1503   std::unique_ptr<SequentialFileReader> src_reader(
1504       new SequentialFileReader(NewLegacySequentialFileWrapper(src_file), src));
1505   std::unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);
1506   Slice data;
1507 
1508   do {
1509     if (stop_backup_.load(std::memory_order_acquire)) {
1510       return Status::Incomplete("Backup stopped");
1511     }
1512     size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ?
1513       copy_file_buffer_size_ : static_cast<size_t>(size_limit);
1514     s = src_reader->Read(buffer_to_read, &data, buf.get());
1515 
1516     if (!s.ok()) {
1517       return s;
1518     }
1519 
1520     size_limit -= data.size();
1521     *checksum_value = crc32c::Extend(*checksum_value, data.data(), data.size());
1522   } while (data.size() > 0 && size_limit > 0);
1523 
1524   return s;
1525 }
1526 
DeleteChildren(const std::string & dir,uint32_t file_type_filter)1527 void BackupEngineImpl::DeleteChildren(const std::string& dir,
1528                                       uint32_t file_type_filter) {
1529   std::vector<std::string> children;
1530   db_env_->GetChildren(dir, &children);  // ignore errors
1531 
1532   for (const auto& f : children) {
1533     uint64_t number;
1534     FileType type;
1535     bool ok = ParseFileName(f, &number, &type);
1536     if (ok && (file_type_filter & (1 << type))) {
1537       // don't delete this file
1538       continue;
1539     }
1540     db_env_->DeleteFile(dir + "/" + f);  // ignore errors
1541   }
1542 }
1543 
InsertPathnameToSizeBytes(const std::string & dir,Env * env,std::unordered_map<std::string,uint64_t> * result)1544 Status BackupEngineImpl::InsertPathnameToSizeBytes(
1545     const std::string& dir, Env* env,
1546     std::unordered_map<std::string, uint64_t>* result) {
1547   assert(result != nullptr);
1548   std::vector<Env::FileAttributes> files_attrs;
1549   Status status = env->FileExists(dir);
1550   if (status.ok()) {
1551     status = env->GetChildrenFileAttributes(dir, &files_attrs);
1552   } else if (status.IsNotFound()) {
1553     // Insert no entries can be considered success
1554     status = Status::OK();
1555   }
1556   const bool slash_needed = dir.empty() || dir.back() != '/';
1557   for (const auto& file_attrs : files_attrs) {
1558     result->emplace(dir + (slash_needed ? "/" : "") + file_attrs.name,
1559                     file_attrs.size_bytes);
1560   }
1561   return status;
1562 }
1563 
GarbageCollect()1564 Status BackupEngineImpl::GarbageCollect() {
1565   assert(!read_only_);
1566 
1567   // We will make a best effort to remove all garbage even in the presence
1568   // of inconsistencies or I/O failures that inhibit finding garbage.
1569   Status overall_status = Status::OK();
1570   // If all goes well, we don't need another auto-GC this session
1571   might_need_garbage_collect_ = false;
1572 
1573   ROCKS_LOG_INFO(options_.info_log, "Starting garbage collection");
1574 
1575   // delete obsolete shared files
1576   for (bool with_checksum : {false, true}) {
1577     std::vector<std::string> shared_children;
1578     {
1579       std::string shared_path;
1580       if (with_checksum) {
1581         shared_path = GetAbsolutePath(GetSharedFileWithChecksumRel());
1582       } else {
1583         shared_path = GetAbsolutePath(GetSharedFileRel());
1584       }
1585       auto s = backup_env_->FileExists(shared_path);
1586       if (s.ok()) {
1587         s = backup_env_->GetChildren(shared_path, &shared_children);
1588       } else if (s.IsNotFound()) {
1589         s = Status::OK();
1590       }
1591       if (!s.ok()) {
1592         overall_status = s;
1593         // Trying again later might work
1594         might_need_garbage_collect_ = true;
1595       }
1596     }
1597     for (auto& child : shared_children) {
1598       if (child == "." || child == "..") {
1599         continue;
1600       }
1601       std::string rel_fname;
1602       if (with_checksum) {
1603         rel_fname = GetSharedFileWithChecksumRel(child);
1604       } else {
1605         rel_fname = GetSharedFileRel(child);
1606       }
1607       auto child_itr = backuped_file_infos_.find(rel_fname);
1608       // if it's not refcounted, delete it
1609       if (child_itr == backuped_file_infos_.end() ||
1610           child_itr->second->refs == 0) {
1611         // this might be a directory, but DeleteFile will just fail in that
1612         // case, so we're good
1613         Status s = backup_env_->DeleteFile(GetAbsolutePath(rel_fname));
1614         ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
1615                        rel_fname.c_str(), s.ToString().c_str());
1616         backuped_file_infos_.erase(rel_fname);
1617         if (!s.ok()) {
1618           // Trying again later might work
1619           might_need_garbage_collect_ = true;
1620         }
1621       }
1622     }
1623   }
1624 
1625   // delete obsolete private files
1626   std::vector<std::string> private_children;
1627   {
1628     auto s = backup_env_->GetChildren(GetAbsolutePath(GetPrivateDirRel()),
1629                                       &private_children);
1630     if (!s.ok()) {
1631       overall_status = s;
1632       // Trying again later might work
1633       might_need_garbage_collect_ = true;
1634     }
1635   }
1636   for (auto& child : private_children) {
1637     if (child == "." || child == "..") {
1638       continue;
1639     }
1640 
1641     BackupID backup_id = 0;
1642     bool tmp_dir = child.find(".tmp") != std::string::npos;
1643     sscanf(child.c_str(), "%u", &backup_id);
1644     if (!tmp_dir &&  // if it's tmp_dir, delete it
1645         (backup_id == 0 || backups_.find(backup_id) != backups_.end())) {
1646       // it's either not a number or it's still alive. continue
1647       continue;
1648     }
1649     // here we have to delete the dir and all its children
1650     std::string full_private_path =
1651         GetAbsolutePath(GetPrivateFileRel(backup_id));
1652     std::vector<std::string> subchildren;
1653     backup_env_->GetChildren(full_private_path, &subchildren);
1654     for (auto& subchild : subchildren) {
1655       if (subchild == "." || subchild == "..") {
1656         continue;
1657       }
1658       Status s = backup_env_->DeleteFile(full_private_path + subchild);
1659       ROCKS_LOG_INFO(options_.info_log, "Deleting %s -- %s",
1660                      (full_private_path + subchild).c_str(),
1661                      s.ToString().c_str());
1662       if (!s.ok()) {
1663         // Trying again later might work
1664         might_need_garbage_collect_ = true;
1665       }
1666     }
1667     // finally delete the private dir
1668     Status s = backup_env_->DeleteDir(full_private_path);
1669     ROCKS_LOG_INFO(options_.info_log, "Deleting dir %s -- %s",
1670                    full_private_path.c_str(), s.ToString().c_str());
1671     if (!s.ok()) {
1672       // Trying again later might work
1673       might_need_garbage_collect_ = true;
1674     }
1675   }
1676 
1677   assert(overall_status.ok() || might_need_garbage_collect_);
1678   return overall_status;
1679 }
1680 
1681 // ------- BackupMeta class --------
1682 
AddFile(std::shared_ptr<FileInfo> file_info)1683 Status BackupEngineImpl::BackupMeta::AddFile(
1684     std::shared_ptr<FileInfo> file_info) {
1685   auto itr = file_infos_->find(file_info->filename);
1686   if (itr == file_infos_->end()) {
1687     auto ret = file_infos_->insert({file_info->filename, file_info});
1688     if (ret.second) {
1689       itr = ret.first;
1690       itr->second->refs = 1;
1691     } else {
1692       // if this happens, something is seriously wrong
1693       return Status::Corruption("In memory metadata insertion error");
1694     }
1695   } else {
1696     if (itr->second->checksum_value != file_info->checksum_value) {
1697       return Status::Corruption(
1698           "Checksum mismatch for existing backup file. Delete old backups and "
1699           "try again.");
1700     }
1701     ++itr->second->refs;  // increase refcount if already present
1702   }
1703 
1704   size_ += file_info->size;
1705   files_.push_back(itr->second);
1706 
1707   return Status::OK();
1708 }
1709 
Delete(bool delete_meta)1710 Status BackupEngineImpl::BackupMeta::Delete(bool delete_meta) {
1711   Status s;
1712   for (const auto& file : files_) {
1713     --file->refs;  // decrease refcount
1714   }
1715   files_.clear();
1716   // delete meta file
1717   if (delete_meta) {
1718     s = env_->FileExists(meta_filename_);
1719     if (s.ok()) {
1720       s = env_->DeleteFile(meta_filename_);
1721     } else if (s.IsNotFound()) {
1722       s = Status::OK();  // nothing to delete
1723     }
1724   }
1725   timestamp_ = 0;
1726   return s;
1727 }
1728 
// Literal prefix of the optional app-metadata line in a backup meta file:
// LoadFromFile() tests for it and StoreToFile() writes it before the
// hex-encoded metadata.
Slice kMetaDataPrefix("metadata ");
1730 
1731 // each backup meta file is of the format:
1732 // <timestamp>
1733 // <seq number>
1734 // <metadata(literal string)> <metadata> (optional)
1735 // <number of files>
1736 // <file1> <crc32(literal string)> <crc32_value>
1737 // <file2> <crc32(literal string)> <crc32_value>
1738 // ...
LoadFromFile(const std::string & backup_dir,const std::unordered_map<std::string,uint64_t> & abs_path_to_size)1739 Status BackupEngineImpl::BackupMeta::LoadFromFile(
1740     const std::string& backup_dir,
1741     const std::unordered_map<std::string, uint64_t>& abs_path_to_size) {
1742   assert(Empty());
1743   Status s;
1744   std::unique_ptr<SequentialFile> backup_meta_file;
1745   s = env_->NewSequentialFile(meta_filename_, &backup_meta_file, EnvOptions());
1746   if (!s.ok()) {
1747     return s;
1748   }
1749 
1750   std::unique_ptr<SequentialFileReader> backup_meta_reader(
1751       new SequentialFileReader(NewLegacySequentialFileWrapper(backup_meta_file),
1752                                meta_filename_));
1753   std::unique_ptr<char[]> buf(new char[max_backup_meta_file_size_ + 1]);
1754   Slice data;
1755   s = backup_meta_reader->Read(max_backup_meta_file_size_, &data, buf.get());
1756 
1757   if (!s.ok() || data.size() == max_backup_meta_file_size_) {
1758     return s.ok() ? Status::Corruption("File size too big") : s;
1759   }
1760   buf[data.size()] = 0;
1761 
1762   uint32_t num_files = 0;
1763   char *next;
1764   timestamp_ = strtoull(data.data(), &next, 10);
1765   data.remove_prefix(next - data.data() + 1); // +1 for '\n'
1766   sequence_number_ = strtoull(data.data(), &next, 10);
1767   data.remove_prefix(next - data.data() + 1); // +1 for '\n'
1768 
1769   if (data.starts_with(kMetaDataPrefix)) {
1770     // app metadata present
1771     data.remove_prefix(kMetaDataPrefix.size());
1772     Slice hex_encoded_metadata = GetSliceUntil(&data, '\n');
1773     bool decode_success = hex_encoded_metadata.DecodeHex(&app_metadata_);
1774     if (!decode_success) {
1775       return Status::Corruption(
1776           "Failed to decode stored hex encoded app metadata");
1777     }
1778   }
1779 
1780   num_files = static_cast<uint32_t>(strtoul(data.data(), &next, 10));
1781   data.remove_prefix(next - data.data() + 1); // +1 for '\n'
1782 
1783   std::vector<std::shared_ptr<FileInfo>> files;
1784 
1785   Slice checksum_prefix("crc32 ");
1786 
1787   for (uint32_t i = 0; s.ok() && i < num_files; ++i) {
1788     auto line = GetSliceUntil(&data, '\n');
1789     std::string filename = GetSliceUntil(&line, ' ').ToString();
1790 
1791     uint64_t size;
1792     const std::shared_ptr<FileInfo> file_info = GetFile(filename);
1793     if (file_info) {
1794       size = file_info->size;
1795     } else {
1796       std::string abs_path = backup_dir + "/" + filename;
1797       try {
1798         size = abs_path_to_size.at(abs_path);
1799       } catch (std::out_of_range&) {
1800         return Status::Corruption("Size missing for pathname: " + abs_path);
1801       }
1802     }
1803 
1804     if (line.empty()) {
1805       return Status::Corruption("File checksum is missing for " + filename +
1806                                 " in " + meta_filename_);
1807     }
1808 
1809     uint32_t checksum_value = 0;
1810     if (line.starts_with(checksum_prefix)) {
1811       line.remove_prefix(checksum_prefix.size());
1812       checksum_value = static_cast<uint32_t>(
1813           strtoul(line.data(), nullptr, 10));
1814       if (line != ROCKSDB_NAMESPACE::ToString(checksum_value)) {
1815         return Status::Corruption("Invalid checksum value for " + filename +
1816                                   " in " + meta_filename_);
1817       }
1818     } else {
1819       return Status::Corruption("Unknown checksum type for " + filename +
1820                                 " in " + meta_filename_);
1821     }
1822 
1823     files.emplace_back(new FileInfo(filename, size, checksum_value));
1824   }
1825 
1826   if (s.ok() && data.size() > 0) {
1827     // file has to be read completely. if not, we count it as corruption
1828     s = Status::Corruption("Tailing data in backup meta file in " +
1829                            meta_filename_);
1830   }
1831 
1832   if (s.ok()) {
1833     files_.reserve(files.size());
1834     for (const auto& file_info : files) {
1835       s = AddFile(file_info);
1836       if (!s.ok()) {
1837         break;
1838       }
1839     }
1840   }
1841 
1842   return s;
1843 }
1844 
StoreToFile(bool sync)1845 Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) {
1846   Status s;
1847   std::unique_ptr<WritableFile> backup_meta_file;
1848   EnvOptions env_options;
1849   env_options.use_mmap_writes = false;
1850   env_options.use_direct_writes = false;
1851   s = env_->NewWritableFile(meta_tmp_filename_, &backup_meta_file, env_options);
1852   if (!s.ok()) {
1853     return s;
1854   }
1855 
1856   std::unique_ptr<char[]> buf(new char[max_backup_meta_file_size_]);
1857   size_t len = 0, buf_size = max_backup_meta_file_size_;
1858   len += snprintf(buf.get(), buf_size, "%" PRId64 "\n", timestamp_);
1859   len += snprintf(buf.get() + len, buf_size - len, "%" PRIu64 "\n",
1860                   sequence_number_);
1861   if (!app_metadata_.empty()) {
1862     std::string hex_encoded_metadata =
1863         Slice(app_metadata_).ToString(/* hex */ true);
1864 
1865     // +1 to accommodate newline character
1866     size_t hex_meta_strlen = kMetaDataPrefix.ToString().length() + hex_encoded_metadata.length() + 1;
1867     if (hex_meta_strlen >= buf_size) {
1868       return Status::Corruption("Buffer too small to fit backup metadata");
1869     }
1870     else if (len + hex_meta_strlen >= buf_size) {
1871       backup_meta_file->Append(Slice(buf.get(), len));
1872       buf.reset();
1873       std::unique_ptr<char[]> new_reset_buf(
1874           new char[max_backup_meta_file_size_]);
1875       buf.swap(new_reset_buf);
1876       len = 0;
1877     }
1878     len += snprintf(buf.get() + len, buf_size - len, "%s%s\n",
1879                     kMetaDataPrefix.ToString().c_str(),
1880                     hex_encoded_metadata.c_str());
1881   }
1882 
1883   char writelen_temp[19];
1884   if (len + snprintf(writelen_temp, sizeof(writelen_temp),
1885                      "%" ROCKSDB_PRIszt "\n", files_.size()) >= buf_size) {
1886     backup_meta_file->Append(Slice(buf.get(), len));
1887     buf.reset();
1888     std::unique_ptr<char[]> new_reset_buf(new char[max_backup_meta_file_size_]);
1889     buf.swap(new_reset_buf);
1890     len = 0;
1891   }
1892   {
1893     const char *const_write = writelen_temp;
1894     len += snprintf(buf.get() + len, buf_size - len, "%s", const_write);
1895   }
1896 
1897   for (const auto& file : files_) {
1898     // use crc32 for now, switch to something else if needed
1899 
1900     size_t newlen = len + file->filename.length() + snprintf(writelen_temp,
1901       sizeof(writelen_temp), " crc32 %u\n", file->checksum_value);
1902     const char *const_write = writelen_temp;
1903     if (newlen >= buf_size) {
1904       backup_meta_file->Append(Slice(buf.get(), len));
1905       buf.reset();
1906       std::unique_ptr<char[]> new_reset_buf(
1907           new char[max_backup_meta_file_size_]);
1908       buf.swap(new_reset_buf);
1909       len = 0;
1910     }
1911     len += snprintf(buf.get() + len, buf_size - len, "%s%s",
1912                     file->filename.c_str(), const_write);
1913   }
1914 
1915   s = backup_meta_file->Append(Slice(buf.get(), len));
1916   if (s.ok() && sync) {
1917     s = backup_meta_file->Sync();
1918   }
1919   if (s.ok()) {
1920     s = backup_meta_file->Close();
1921   }
1922   if (s.ok()) {
1923     s = env_->RenameFile(meta_tmp_filename_, meta_filename_);
1924   }
1925   return s;
1926 }
1927 
1928 // -------- BackupEngineReadOnlyImpl ---------
1929 class BackupEngineReadOnlyImpl : public BackupEngineReadOnly {
1930  public:
BackupEngineReadOnlyImpl(Env * db_env,const BackupableDBOptions & options)1931   BackupEngineReadOnlyImpl(Env* db_env, const BackupableDBOptions& options)
1932       : backup_engine_(new BackupEngineImpl(db_env, options, true)) {}
1933 
~BackupEngineReadOnlyImpl()1934   ~BackupEngineReadOnlyImpl() override {}
1935 
1936   // The returned BackupInfos are in chronological order, which means the
1937   // latest backup comes last.
GetBackupInfo(std::vector<BackupInfo> * backup_info)1938   void GetBackupInfo(std::vector<BackupInfo>* backup_info) override {
1939     backup_engine_->GetBackupInfo(backup_info);
1940   }
1941 
GetCorruptedBackups(std::vector<BackupID> * corrupt_backup_ids)1942   void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) override {
1943     backup_engine_->GetCorruptedBackups(corrupt_backup_ids);
1944   }
1945 
RestoreDBFromBackup(BackupID backup_id,const std::string & db_dir,const std::string & wal_dir,const RestoreOptions & restore_options=RestoreOptions ())1946   Status RestoreDBFromBackup(
1947       BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
1948       const RestoreOptions& restore_options = RestoreOptions()) override {
1949     return backup_engine_->RestoreDBFromBackup(backup_id, db_dir, wal_dir,
1950                                                restore_options);
1951   }
1952 
RestoreDBFromLatestBackup(const std::string & db_dir,const std::string & wal_dir,const RestoreOptions & restore_options=RestoreOptions ())1953   Status RestoreDBFromLatestBackup(
1954       const std::string& db_dir, const std::string& wal_dir,
1955       const RestoreOptions& restore_options = RestoreOptions()) override {
1956     return backup_engine_->RestoreDBFromLatestBackup(db_dir, wal_dir,
1957                                                      restore_options);
1958   }
1959 
VerifyBackup(BackupID backup_id)1960   Status VerifyBackup(BackupID backup_id) override {
1961     return backup_engine_->VerifyBackup(backup_id);
1962   }
1963 
Initialize()1964   Status Initialize() { return backup_engine_->Initialize(); }
1965 
1966  private:
1967   std::unique_ptr<BackupEngineImpl> backup_engine_;
1968 };
1969 
Open(Env * env,const BackupableDBOptions & options,BackupEngineReadOnly ** backup_engine_ptr)1970 Status BackupEngineReadOnly::Open(Env* env, const BackupableDBOptions& options,
1971                                   BackupEngineReadOnly** backup_engine_ptr) {
1972   if (options.destroy_old_data) {
1973     return Status::InvalidArgument(
1974         "Can't destroy old data with ReadOnly BackupEngine");
1975   }
1976   std::unique_ptr<BackupEngineReadOnlyImpl> backup_engine(
1977       new BackupEngineReadOnlyImpl(env, options));
1978   auto s = backup_engine->Initialize();
1979   if (!s.ok()) {
1980     *backup_engine_ptr = nullptr;
1981     return s;
1982   }
1983   *backup_engine_ptr = backup_engine.release();
1984   return Status::OK();
1985 }
1986 
1987 }  // namespace ROCKSDB_NAMESPACE
1988 
1989 #endif  // ROCKSDB_LITE
1990