//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2012 Facebook.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef ROCKSDB_LITE

#include "utilities/checkpoint/checkpoint_impl.h"

#include <algorithm>
#include <cinttypes>
#include <string>
#include <vector>

#include "db/wal_manager.h"
#include "file/file_util.h"
#include "file/filename.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/metadata.h"
#include "rocksdb/transaction_log.h"
#include "rocksdb/utilities/checkpoint.h"
#include "test_util/sync_point.h"

namespace rocksdb {

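// Typical usage (a minimal sketch; error handling and the example path are
// illustrative only):
//
//   Checkpoint* checkpoint = nullptr;
//   Status s = Checkpoint::Create(db, &checkpoint);
//   if (s.ok()) {
//     s = checkpoint->CreateCheckpoint("/path/to/checkpoint_dir",
//                                      0 /* log_size_for_flush */);
//   }
//   delete checkpoint;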
Status Checkpoint::Create(DB* db, Checkpoint** checkpoint_ptr) {
  *checkpoint_ptr = new CheckpointImpl(db);
  return Status::OK();
}

Status Checkpoint::CreateCheckpoint(const std::string& /*checkpoint_dir*/,
                                    uint64_t /*log_size_for_flush*/) {
  return Status::NotSupported("");
}

void CheckpointImpl::CleanStagingDirectory(
    const std::string& full_private_path, Logger* info_log) {
  std::vector<std::string> subchildren;
  Status s = db_->GetEnv()->FileExists(full_private_path);
  if (s.IsNotFound()) {
    return;
  }
  ROCKS_LOG_INFO(info_log, "File exists %s -- %s",
                 full_private_path.c_str(), s.ToString().c_str());
  db_->GetEnv()->GetChildren(full_private_path, &subchildren);
  for (auto& subchild : subchildren) {
    std::string subchild_path = full_private_path + "/" + subchild;
    s = db_->GetEnv()->DeleteFile(subchild_path);
    ROCKS_LOG_INFO(info_log, "Delete file %s -- %s",
                   subchild_path.c_str(), s.ToString().c_str());
  }
  // finally delete the private dir
  s = db_->GetEnv()->DeleteDir(full_private_path);
  ROCKS_LOG_INFO(info_log, "Delete dir %s -- %s",
                 full_private_path.c_str(), s.ToString().c_str());
}

Status Checkpoint::ExportColumnFamily(
    ColumnFamilyHandle* /*handle*/, const std::string& /*export_dir*/,
    ExportImportFilesMetaData** /*metadata*/) {
  return Status::NotSupported("");
}

// Builds an openable snapshot of RocksDB
Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
                                        uint64_t log_size_for_flush) {
  DBOptions db_options = db_->GetDBOptions();

  Status s = db_->GetEnv()->FileExists(checkpoint_dir);
  if (s.ok()) {
    return Status::InvalidArgument("Directory exists");
  } else if (!s.IsNotFound()) {
    assert(s.IsIOError());
    return s;
  }

  ROCKS_LOG_INFO(
      db_options.info_log,
      "Started the snapshot process -- creating snapshot in directory %s",
      checkpoint_dir.c_str());

  size_t final_nonslash_idx = checkpoint_dir.find_last_not_of('/');
  if (final_nonslash_idx == std::string::npos) {
    // npos means the path is empty or consists only of slashes. An all-slash
    // path would be the root directory, which cannot be the case because we
    // verified above that the directory does not exist.
    assert(checkpoint_dir.empty());
    return Status::InvalidArgument("invalid checkpoint directory name");
  }

  std::string full_private_path =
      checkpoint_dir.substr(0, final_nonslash_idx + 1) + ".tmp";
  ROCKS_LOG_INFO(
      db_options.info_log,
      "Snapshot process -- using temporary directory %s",
      full_private_path.c_str());
  CleanStagingDirectory(full_private_path, db_options.info_log.get());
  // create snapshot directory
  s = db_->GetEnv()->CreateDir(full_private_path);
  uint64_t sequence_number = 0;
  if (s.ok()) {
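    // Keep background jobs from deleting the SST and WAL files we are about
    // to hard link or copy; deletions are re-enabled once the callbacks have
    // run.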
    db_->DisableFileDeletions();
    s = CreateCustomCheckpoint(
        db_options,
        [&](const std::string& src_dirname, const std::string& fname,
            FileType) {
          ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s", fname.c_str());
          return db_->GetFileSystem()->LinkFile(src_dirname + fname,
                                                full_private_path + fname,
                                                IOOptions(), nullptr);
        } /* link_file_cb */,
        [&](const std::string& src_dirname, const std::string& fname,
            uint64_t size_limit_bytes, FileType) {
          ROCKS_LOG_INFO(db_options.info_log, "Copying %s", fname.c_str());
          return CopyFile(db_->GetFileSystem(), src_dirname + fname,
                          full_private_path + fname, size_limit_bytes,
                          db_options.use_fsync);
        } /* copy_file_cb */,
        [&](const std::string& fname, const std::string& contents, FileType) {
          ROCKS_LOG_INFO(db_options.info_log, "Creating %s", fname.c_str());
          return CreateFile(db_->GetFileSystem(), full_private_path + fname,
                            contents, db_options.use_fsync);
        } /* create_file_cb */,
        &sequence_number, log_size_for_flush);
    // we copied all the files, enable file deletions
    db_->EnableFileDeletions(false);
  }

  if (s.ok()) {
    // move tmp private backup to real snapshot directory
    s = db_->GetEnv()->RenameFile(full_private_path, checkpoint_dir);
  }
  if (s.ok()) {
    std::unique_ptr<Directory> checkpoint_directory;
    db_->GetEnv()->NewDirectory(checkpoint_dir, &checkpoint_directory);
    if (checkpoint_directory != nullptr) {
      s = checkpoint_directory->Fsync();
    }
  }

  if (s.ok()) {
    // here we know that we succeeded and installed the new snapshot
    ROCKS_LOG_INFO(db_options.info_log, "Snapshot DONE. All is good");
    ROCKS_LOG_INFO(db_options.info_log, "Snapshot sequence number: %" PRIu64,
                   sequence_number);
  } else {
    // clean all the files we might have created
    ROCKS_LOG_INFO(db_options.info_log, "Snapshot failed -- %s",
                   s.ToString().c_str());
    CleanStagingDirectory(full_private_path, db_options.info_log.get());
  }
  return s;
}

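// Walks the DB's live files and hands each one to a caller-supplied callback:
// link_file_cb for files that can be hard linked (table files on the same
// filesystem), copy_file_cb for files that must be byte-copied (MANIFEST,
// WALs, or anything crossing a filesystem boundary), and create_file_cb for
// files whose contents are synthesized here (CURRENT). CreateCheckpoint()
// above is one caller; other utilities can plug in their own callbacks.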
Status CheckpointImpl::CreateCustomCheckpoint(
    const DBOptions& db_options,
    std::function<Status(const std::string& src_dirname,
                         const std::string& src_fname, FileType type)>
        link_file_cb,
    std::function<Status(const std::string& src_dirname,
                         const std::string& src_fname,
                         uint64_t size_limit_bytes, FileType type)>
        copy_file_cb,
    std::function<Status(const std::string& fname, const std::string& contents,
                         FileType type)>
        create_file_cb,
    uint64_t* sequence_number, uint64_t log_size_for_flush) {
  Status s;
  std::vector<std::string> live_files;
  uint64_t manifest_file_size = 0;
  uint64_t min_log_num = port::kMaxUint64;
  *sequence_number = db_->GetLatestSequenceNumber();
  bool same_fs = true;
  VectorLogPtr live_wal_files;

  bool flush_memtable = true;
  if (s.ok()) {
    if (!db_options.allow_2pc) {
      if (log_size_for_flush == port::kMaxUint64) {
        flush_memtable = false;
      } else if (log_size_for_flush > 0) {
        // If outstanding log files are small, we skip the flush.
        s = db_->GetSortedWalFiles(live_wal_files);

        if (!s.ok()) {
          return s;
        }

        // Don't flush column families if total log size is smaller than
        // log_size_for_flush. We copy the log files instead.
        // We may be able to cover 2PC case too.
        uint64_t total_wal_size = 0;
        for (auto& wal : live_wal_files) {
          total_wal_size += wal->SizeFileBytes();
        }
        if (total_wal_size < log_size_for_flush) {
          flush_memtable = false;
        }
        live_wal_files.clear();
      }
    }

    // this will return live_files prefixed with "/"
    s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);

    if (s.ok() && db_options.allow_2pc) {
      // If 2PC is enabled, we need to get minimum log number after the flush.
      // Need to refetch the live files to recapture the snapshot.
      if (!db_->GetIntProperty(DB::Properties::kMinLogNumberToKeep,
                               &min_log_num)) {
        return Status::InvalidArgument(
            "2PC enabled but cannot find the min log number to keep.");
      }
      // We need to refetch live files with flush to handle this case:
      // A previous 000001.log contains the prepare record of transaction txn1.
      // The current log file is 000002.log, and sequence_number points to this
      // file.
      // After calling GetLiveFiles(), 000003.log is created.
      // Then txn1 is committed. The commit record is written to 000003.log.
      // Now we fetch min_log_num, which will be 3.
      // Then only 000002.log and 000003.log will be copied, and 000001.log will
      // be skipped. 000003.log contains the commit message of txn1, but we
      // don't have the respective prepare record for it.
      // In order to avoid this situation, we need to force a flush to make
      // sure all transactions committed before getting min_log_num will be
      // flushed to SST files.
      // We cannot get min_log_num before calling GetLiveFiles() for the
      // first time, because if we do that, all the log files will be included,
      // far more than needed.
      s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);
    }

    TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1");
    TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles2");
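    // Push any WAL entries still buffered in memory out to the log files so
    // the copies below include them; a sync is not required because we read
    // the file contents ourselves.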
    db_->FlushWAL(false /* sync */);
  }
  // if we have more than one column family, we need to also get WAL files
  if (s.ok()) {
    s = db_->GetSortedWalFiles(live_wal_files);
  }
  if (!s.ok()) {
    return s;
  }

  size_t wal_size = live_wal_files.size();

  // copy/hard link live_files
  std::string manifest_fname, current_fname;
  for (size_t i = 0; s.ok() && i < live_files.size(); ++i) {
    uint64_t number;
    FileType type;
    bool ok = ParseFileName(live_files[i], &number, &type);
    if (!ok) {
      s = Status::Corruption("Can't parse file name. This is very bad");
      break;
    }
    // we should only get sst, options, manifest and current files here
    assert(type == kTableFile || type == kDescriptorFile ||
           type == kCurrentFile || type == kOptionsFile);
    assert(live_files[i].size() > 0 && live_files[i][0] == '/');
    if (type == kCurrentFile) {
      // We will craft the current file manually to ensure it's consistent with
      // the manifest number. This is necessary because current's file contents
      // can change during checkpoint creation.
      current_fname = live_files[i];
      continue;
    } else if (type == kDescriptorFile) {
      manifest_fname = live_files[i];
    }
    std::string src_fname = live_files[i];

    // rules:
    // * if it's kTableFile, then it's shared
    // * if it's kDescriptorFile, limit the size to manifest_file_size
    // * always copy if cross-device link
    if ((type == kTableFile) && same_fs) {
      s = link_file_cb(db_->GetName(), src_fname, type);
      if (s.IsNotSupported()) {
        same_fs = false;
        s = Status::OK();
      }
    }
    if ((type != kTableFile) || (!same_fs)) {
      s = copy_file_cb(db_->GetName(), src_fname,
                       (type == kDescriptorFile) ? manifest_file_size : 0,
                       type);
    }
  }
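  // manifest_fname still carries its leading '/', so strip it; the CURRENT
  // file holds just the manifest file name followed by a newline.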
  if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) {
    create_file_cb(current_fname, manifest_fname.substr(1) + "\n",
                   kCurrentFile);
  }
  ROCKS_LOG_INFO(db_options.info_log, "Number of log files %" ROCKSDB_PRIszt,
                 live_wal_files.size());

  // Link WAL files. Copy exact size of last one because it is the only one
  // that has changes after the last flush.
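  // A WAL file is needed when we did not flush the memtables (it may then
  // hold the only copy of recent writes), or when it can contain entries at
  // or past the captured sequence number, or past the 2PC minimum log number.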
  for (size_t i = 0; s.ok() && i < wal_size; ++i) {
    if ((live_wal_files[i]->Type() == kAliveLogFile) &&
        (!flush_memtable ||
         live_wal_files[i]->StartSequence() >= *sequence_number ||
         live_wal_files[i]->LogNumber() >= min_log_num)) {
      if (i + 1 == wal_size) {
        s = copy_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(),
                         live_wal_files[i]->SizeFileBytes(), kLogFile);
        break;
      }
      if (same_fs) {
        // we only care about live log files
        s = link_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(),
                         kLogFile);
        if (s.IsNotSupported()) {
          same_fs = false;
          s = Status::OK();
        }
      }
      if (!same_fs) {
        s = copy_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(), 0,
                         kLogFile);
      }
    }
  }

  return s;
}

// Exports all live SST files of the specified column family into export_dir,
// returning information about the exported files in *metadata.
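// On success the caller takes ownership of *metadata, which is typically fed
// to DB::CreateColumnFamilyWithImport() to re-create the column family
// elsewhere.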
Status CheckpointImpl::ExportColumnFamily(
    ColumnFamilyHandle* handle, const std::string& export_dir,
    ExportImportFilesMetaData** metadata) {
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(handle);
  const auto cf_name = cfh->GetName();
  const auto db_options = db_->GetDBOptions();

  assert(metadata != nullptr);
  assert(*metadata == nullptr);
  auto s = db_->GetEnv()->FileExists(export_dir);
  if (s.ok()) {
    return Status::InvalidArgument("Specified export_dir exists");
  } else if (!s.IsNotFound()) {
    assert(s.IsIOError());
    return s;
  }

  const auto final_nonslash_idx = export_dir.find_last_not_of('/');
  if (final_nonslash_idx == std::string::npos) {
    return Status::InvalidArgument("Specified export_dir invalid");
  }
  ROCKS_LOG_INFO(db_options.info_log,
                 "[%s] export column family onto export directory %s",
                 cf_name.c_str(), export_dir.c_str());

  // Create a temporary export directory.
  const auto tmp_export_dir =
      export_dir.substr(0, final_nonslash_idx + 1) + ".tmp";
  s = db_->GetEnv()->CreateDir(tmp_export_dir);

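  // Flush the column family so that all of its live data is captured in SST
  // files rather than the memtable or WAL.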
  if (s.ok()) {
    s = db_->Flush(rocksdb::FlushOptions(), handle);
  }

  ColumnFamilyMetaData db_metadata;
  if (s.ok()) {
    // Export live sst files with file deletions disabled.
    s = db_->DisableFileDeletions();
    if (s.ok()) {
      db_->GetColumnFamilyMetaData(handle, &db_metadata);

      s = ExportFilesInMetaData(
          db_options, db_metadata,
          [&](const std::string& src_dirname, const std::string& fname) {
            ROCKS_LOG_INFO(db_options.info_log, "[%s] HardLinking %s",
                           cf_name.c_str(), fname.c_str());
            return db_->GetEnv()->LinkFile(src_dirname + fname,
                                           tmp_export_dir + fname);
          } /*link_file_cb*/,
          [&](const std::string& src_dirname, const std::string& fname) {
            ROCKS_LOG_INFO(db_options.info_log, "[%s] Copying %s",
                           cf_name.c_str(), fname.c_str());
            return CopyFile(db_->GetFileSystem(), src_dirname + fname,
                            tmp_export_dir + fname, 0, db_options.use_fsync);
          } /*copy_file_cb*/);

      const auto enable_status = db_->EnableFileDeletions(false /*force*/);
      if (s.ok()) {
        s = enable_status;
      }
    }
  }

  auto moved_to_user_specified_dir = false;
  if (s.ok()) {
    // Move temporary export directory to the actual export directory.
    s = db_->GetEnv()->RenameFile(tmp_export_dir, export_dir);
  }

  if (s.ok()) {
    // Fsync export directory.
    moved_to_user_specified_dir = true;
    std::unique_ptr<Directory> dir_ptr;
    s = db_->GetEnv()->NewDirectory(export_dir, &dir_ptr);
    if (s.ok()) {
      assert(dir_ptr != nullptr);
      s = dir_ptr->Fsync();
    }
  }

  if (s.ok()) {
    // Export of files succeeded. Fill in the metadata information.
    auto result_metadata = new ExportImportFilesMetaData();
    result_metadata->db_comparator_name = handle->GetComparator()->Name();
    for (const auto& level_metadata : db_metadata.levels) {
      for (const auto& file_metadata : level_metadata.files) {
        LiveFileMetaData live_file_metadata;
        live_file_metadata.size = file_metadata.size;
        live_file_metadata.name = std::move(file_metadata.name);
        live_file_metadata.file_number = file_metadata.file_number;
        live_file_metadata.db_path = export_dir;
        live_file_metadata.smallest_seqno = file_metadata.smallest_seqno;
        live_file_metadata.largest_seqno = file_metadata.largest_seqno;
        live_file_metadata.smallestkey = std::move(file_metadata.smallestkey);
        live_file_metadata.largestkey = std::move(file_metadata.largestkey);
        live_file_metadata.oldest_blob_file_number =
            file_metadata.oldest_blob_file_number;
        live_file_metadata.level = level_metadata.level;
        result_metadata->files.push_back(live_file_metadata);
      }
    }
    *metadata = result_metadata;
    ROCKS_LOG_INFO(db_options.info_log, "[%s] Export succeeded.",
                   cf_name.c_str());
  } else {
    // Failure: Clean up all the files/directories created.
    ROCKS_LOG_INFO(db_options.info_log, "[%s] Export failed. %s",
                   cf_name.c_str(), s.ToString().c_str());
    std::vector<std::string> subchildren;
    const auto cleanup_dir =
        moved_to_user_specified_dir ? export_dir : tmp_export_dir;
    db_->GetEnv()->GetChildren(cleanup_dir, &subchildren);
    for (const auto& subchild : subchildren) {
      const auto subchild_path = cleanup_dir + "/" + subchild;
      const auto status = db_->GetEnv()->DeleteFile(subchild_path);
      if (!status.ok()) {
        ROCKS_LOG_WARN(db_options.info_log, "Failed to cleanup file %s: %s",
                       subchild_path.c_str(), status.ToString().c_str());
      }
    }
    const auto status = db_->GetEnv()->DeleteDir(cleanup_dir);
    if (!status.ok()) {
      ROCKS_LOG_WARN(db_options.info_log, "Failed to cleanup dir %s: %s",
                     cleanup_dir.c_str(), status.ToString().c_str());
    }
  }
  return s;
}

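// Helper for ExportColumnFamily(): hands every table file listed in
// `metadata` to link_file_cb, falling back to copy_file_cb if the first hard
// link attempt reports NotSupported (e.g. a cross-device destination).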
Status CheckpointImpl::ExportFilesInMetaData(
    const DBOptions& db_options, const ColumnFamilyMetaData& metadata,
    std::function<Status(const std::string& src_dirname,
                         const std::string& src_fname)>
        link_file_cb,
    std::function<Status(const std::string& src_dirname,
                         const std::string& src_fname)>
        copy_file_cb) {
  Status s;
  auto hardlink_file = true;

  // Copy/hard link files in metadata.
  size_t num_files = 0;
  for (const auto& level_metadata : metadata.levels) {
    for (const auto& file_metadata : level_metadata.files) {
      uint64_t number;
      FileType type;
      const auto ok = ParseFileName(file_metadata.name, &number, &type);
      if (!ok) {
        s = Status::Corruption("Could not parse file name");
        break;
      }

      // We should only get sst files here.
      assert(type == kTableFile);
      assert(file_metadata.size > 0 && file_metadata.name[0] == '/');
      const auto src_fname = file_metadata.name;
      ++num_files;

      if (hardlink_file) {
        s = link_file_cb(db_->GetName(), src_fname);
        if (num_files == 1 && s.IsNotSupported()) {
          // Fallback to copy if link failed due to cross-device directories.
          hardlink_file = false;
          s = Status::OK();
        }
      }
      if (!hardlink_file) {
        s = copy_file_cb(db_->GetName(), src_fname);
      }
      if (!s.ok()) {
        break;
      }
    }
  }
  ROCKS_LOG_INFO(db_options.info_log, "Number of table files %" ROCKSDB_PRIszt,
                 num_files);

  return s;
}
}  // namespace rocksdb

#endif  // ROCKSDB_LITE