1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "db/wal_manager.h"
11 
12 #include <algorithm>
13 #include <cinttypes>
14 #include <memory>
15 #include <vector>
16 
17 #include "db/log_reader.h"
18 #include "db/log_writer.h"
19 #include "db/transaction_log_impl.h"
20 #include "db/write_batch_internal.h"
21 #include "file/file_util.h"
22 #include "file/filename.h"
23 #include "file/sequence_file_reader.h"
24 #include "logging/logging.h"
25 #include "port/port.h"
26 #include "rocksdb/env.h"
27 #include "rocksdb/options.h"
28 #include "rocksdb/write_batch.h"
29 #include "test_util/sync_point.h"
30 #include "util/cast_util.h"
31 #include "util/coding.h"
32 #include "util/mutexlock.h"
33 #include "util/string_util.h"
34 
35 namespace ROCKSDB_NAMESPACE {
36 
37 #ifndef ROCKSDB_LITE
38 
DeleteFile(const std::string & fname,uint64_t number)39 Status WalManager::DeleteFile(const std::string& fname, uint64_t number) {
40   auto s = env_->DeleteFile(db_options_.wal_dir + "/" + fname);
41   if (s.ok()) {
42     MutexLock l(&read_first_record_cache_mutex_);
43     read_first_record_cache_.erase(number);
44   }
45   return s;
46 }
47 
GetSortedWalFiles(VectorLogPtr & files)48 Status WalManager::GetSortedWalFiles(VectorLogPtr& files) {
49   // First get sorted files in db dir, then get sorted files from archived
50   // dir, to avoid a race condition where a log file is moved to archived
51   // dir in between.
52   Status s;
53   // list wal files in main db dir.
54   VectorLogPtr logs;
55   s = GetSortedWalsOfType(db_options_.wal_dir, logs, kAliveLogFile);
56   if (!s.ok()) {
57     return s;
58   }
59 
60   // Reproduce the race condition where a log file is moved
61   // to archived dir, between these two sync points, used in
62   // (DBTest,TransactionLogIteratorRace)
63   TEST_SYNC_POINT("WalManager::GetSortedWalFiles:1");
64   TEST_SYNC_POINT("WalManager::GetSortedWalFiles:2");
65 
66   files.clear();
67   // list wal files in archive dir.
68   std::string archivedir = ArchivalDirectory(db_options_.wal_dir);
69   Status exists = env_->FileExists(archivedir);
70   if (exists.ok()) {
71     s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile);
72     if (!s.ok()) {
73       return s;
74     }
75   } else if (!exists.IsNotFound()) {
76     assert(s.IsIOError());
77     return s;
78   }
79 
80   uint64_t latest_archived_log_number = 0;
81   if (!files.empty()) {
82     latest_archived_log_number = files.back()->LogNumber();
83     ROCKS_LOG_INFO(db_options_.info_log, "Latest Archived log: %" PRIu64,
84                    latest_archived_log_number);
85   }
86 
87   files.reserve(files.size() + logs.size());
88   for (auto& log : logs) {
89     if (log->LogNumber() > latest_archived_log_number) {
90       files.push_back(std::move(log));
91     } else {
92       // When the race condition happens, we could see the
93       // same log in both db dir and archived dir. Simply
94       // ignore the one in db dir. Note that, if we read
95       // archived dir first, we would have missed the log file.
96       ROCKS_LOG_WARN(db_options_.info_log, "%s already moved to archive",
97                      log->PathName().c_str());
98     }
99   }
100 
101   return s;
102 }
103 
GetUpdatesSince(SequenceNumber seq,std::unique_ptr<TransactionLogIterator> * iter,const TransactionLogIterator::ReadOptions & read_options,VersionSet * version_set)104 Status WalManager::GetUpdatesSince(
105     SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
106     const TransactionLogIterator::ReadOptions& read_options,
107     VersionSet* version_set) {
108 
109   //  Get all sorted Wal Files.
110   //  Do binary search and open files and find the seq number.
111 
112   std::unique_ptr<VectorLogPtr> wal_files(new VectorLogPtr);
113   Status s = GetSortedWalFiles(*wal_files);
114   if (!s.ok()) {
115     return s;
116   }
117 
118   s = RetainProbableWalFiles(*wal_files, seq);
119   if (!s.ok()) {
120     return s;
121   }
122   iter->reset(new TransactionLogIteratorImpl(
123       db_options_.wal_dir, &db_options_, read_options, file_options_, seq,
124       std::move(wal_files), version_set, seq_per_batch_));
125   return (*iter)->status();
126 }
127 
128 // 1. Go through all archived files and
129 //    a. if ttl is enabled, delete outdated files
130 //    b. if archive size limit is enabled, delete empty files,
131 //        compute file number and size.
132 // 2. If size limit is enabled:
133 //    a. compute how many files should be deleted
134 //    b. get sorted non-empty archived logs
135 //    c. delete what should be deleted
PurgeObsoleteWALFiles()136 void WalManager::PurgeObsoleteWALFiles() {
137   bool const ttl_enabled = db_options_.wal_ttl_seconds > 0;
138   bool const size_limit_enabled = db_options_.wal_size_limit_mb > 0;
139   if (!ttl_enabled && !size_limit_enabled) {
140     return;
141   }
142 
143   int64_t current_time;
144   Status s = env_->GetCurrentTime(&current_time);
145   if (!s.ok()) {
146     ROCKS_LOG_ERROR(db_options_.info_log, "Can't get current time: %s",
147                     s.ToString().c_str());
148     assert(false);
149     return;
150   }
151   uint64_t const now_seconds = static_cast<uint64_t>(current_time);
152   uint64_t const time_to_check = (ttl_enabled && !size_limit_enabled)
153                                      ? db_options_.wal_ttl_seconds / 2
154                                      : kDefaultIntervalToDeleteObsoleteWAL;
155 
156   if (purge_wal_files_last_run_ + time_to_check > now_seconds) {
157     return;
158   }
159 
160   purge_wal_files_last_run_ = now_seconds;
161 
162   std::string archival_dir = ArchivalDirectory(db_options_.wal_dir);
163   std::vector<std::string> files;
164   s = env_->GetChildren(archival_dir, &files);
165   if (!s.ok()) {
166     ROCKS_LOG_ERROR(db_options_.info_log, "Can't get archive files: %s",
167                     s.ToString().c_str());
168     assert(false);
169     return;
170   }
171 
172   size_t log_files_num = 0;
173   uint64_t log_file_size = 0;
174 
175   for (auto& f : files) {
176     uint64_t number;
177     FileType type;
178     if (ParseFileName(f, &number, &type) && type == kLogFile) {
179       std::string const file_path = archival_dir + "/" + f;
180       if (ttl_enabled) {
181         uint64_t file_m_time;
182         s = env_->GetFileModificationTime(file_path, &file_m_time);
183         if (!s.ok()) {
184           ROCKS_LOG_WARN(db_options_.info_log,
185                          "Can't get file mod time: %s: %s", file_path.c_str(),
186                          s.ToString().c_str());
187           continue;
188         }
189         if (now_seconds - file_m_time > db_options_.wal_ttl_seconds) {
190           s = DeleteDBFile(&db_options_, file_path, archival_dir, false,
191                            /*force_fg=*/!wal_in_db_path_);
192           if (!s.ok()) {
193             ROCKS_LOG_WARN(db_options_.info_log, "Can't delete file: %s: %s",
194                            file_path.c_str(), s.ToString().c_str());
195             continue;
196           } else {
197             MutexLock l(&read_first_record_cache_mutex_);
198             read_first_record_cache_.erase(number);
199           }
200           continue;
201         }
202       }
203 
204       if (size_limit_enabled) {
205         uint64_t file_size;
206         s = env_->GetFileSize(file_path, &file_size);
207         if (!s.ok()) {
208           ROCKS_LOG_ERROR(db_options_.info_log,
209                           "Unable to get file size: %s: %s", file_path.c_str(),
210                           s.ToString().c_str());
211           return;
212         } else {
213           if (file_size > 0) {
214             log_file_size = std::max(log_file_size, file_size);
215             ++log_files_num;
216           } else {
217             s = DeleteDBFile(&db_options_, file_path, archival_dir, false,
218                              /*force_fg=*/!wal_in_db_path_);
219             if (!s.ok()) {
220               ROCKS_LOG_WARN(db_options_.info_log,
221                              "Unable to delete file: %s: %s", file_path.c_str(),
222                              s.ToString().c_str());
223               continue;
224             } else {
225               MutexLock l(&read_first_record_cache_mutex_);
226               read_first_record_cache_.erase(number);
227             }
228           }
229         }
230       }
231     }
232   }
233 
234   if (0 == log_files_num || !size_limit_enabled) {
235     return;
236   }
237 
238   size_t const files_keep_num =
239       static_cast<size_t>(db_options_.wal_size_limit_mb * 1024 * 1024 / log_file_size);
240   if (log_files_num <= files_keep_num) {
241     return;
242   }
243 
244   size_t files_del_num = log_files_num - files_keep_num;
245   VectorLogPtr archived_logs;
246   GetSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile);
247 
248   if (files_del_num > archived_logs.size()) {
249     ROCKS_LOG_WARN(db_options_.info_log,
250                    "Trying to delete more archived log files than "
251                    "exist. Deleting all");
252     files_del_num = archived_logs.size();
253   }
254 
255   for (size_t i = 0; i < files_del_num; ++i) {
256     std::string const file_path = archived_logs[i]->PathName();
257     s = DeleteDBFile(&db_options_, db_options_.wal_dir + "/" + file_path,
258                      db_options_.wal_dir, false,
259                      /*force_fg=*/!wal_in_db_path_);
260     if (!s.ok()) {
261       ROCKS_LOG_WARN(db_options_.info_log, "Unable to delete file: %s: %s",
262                      file_path.c_str(), s.ToString().c_str());
263       continue;
264     } else {
265       MutexLock l(&read_first_record_cache_mutex_);
266       read_first_record_cache_.erase(archived_logs[i]->LogNumber());
267     }
268   }
269 }
270 
ArchiveWALFile(const std::string & fname,uint64_t number)271 void WalManager::ArchiveWALFile(const std::string& fname, uint64_t number) {
272   auto archived_log_name = ArchivedLogFileName(db_options_.wal_dir, number);
273   // The sync point below is used in (DBTest,TransactionLogIteratorRace)
274   TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:1");
275   Status s = env_->RenameFile(fname, archived_log_name);
276   // The sync point below is used in (DBTest,TransactionLogIteratorRace)
277   TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:2");
278   ROCKS_LOG_INFO(db_options_.info_log, "Move log file %s to %s -- %s\n",
279                  fname.c_str(), archived_log_name.c_str(),
280                  s.ToString().c_str());
281 }
282 
// Lists the WAL files directly under `path` and appends one LogFileImpl of
// type `log_type` to `log_files` for each non-empty WAL. Then the whole of
// `log_files` is sorted with LogFileImpl's operator<.
//
// - A WAL whose first-record sequence number is 0 (reported as empty or
//   already deleted by ReadFirstRecord) is skipped. Note that
//   ReadFirstRecord locates the file from `log_type` and wal_dir, not from
//   `path`.
// - For alive WALs, if GetFileSize fails under `path`, the size is looked up
//   again in the archive subdirectory of `path`, because the file may have
//   been archived concurrently. If it then vanishes from the archive too,
//   the WAL is skipped.
//
// Returns the first error encountered. On error the function returns before
// sorting, so `log_files` may already hold some entries and is then NOT
// sorted; callers must not use its contents in that case.
Status WalManager::GetSortedWalsOfType(const std::string& path,
                                       VectorLogPtr& log_files,
                                       WalFileType log_type) {
  std::vector<std::string> all_files;
  const Status status = env_->GetChildren(path, &all_files);
  if (!status.ok()) {
    return status;
  }
  log_files.reserve(all_files.size());
  for (const auto& f : all_files) {
    uint64_t number;
    FileType type;
    if (ParseFileName(f, &number, &type) && type == kLogFile) {
      SequenceNumber sequence;
      Status s = ReadFirstRecord(log_type, number, &sequence);
      if (!s.ok()) {
        return s;
      }
      if (sequence == 0) {
        // empty file
        continue;
      }

      // Reproduce the race condition where a log file is moved
      // to archived dir, between these two sync points, used in
      // (DBTest,TransactionLogIteratorRace)
      TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:1");
      TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:2");

      uint64_t size_bytes;
      s = env_->GetFileSize(LogFileName(path, number), &size_bytes);
      // re-try in case the alive log file has been moved to archive.
      if (!s.ok() && log_type == kAliveLogFile) {
        std::string archived_file = ArchivedLogFileName(path, number);
        if (env_->FileExists(archived_file).ok()) {
          s = env_->GetFileSize(archived_file, &size_bytes);
          if (!s.ok() && env_->FileExists(archived_file).IsNotFound()) {
            // oops, the file just got deleted from archived dir! move on
            s = Status::OK();
            continue;
          }
        }
      }
      // Still failing: the file was not found in the archive either, or
      // reading its size failed for another reason.
      if (!s.ok()) {
        return s;
      }

      // Note: the entry keeps `log_type` even if its size was read from the
      // archive copy above.
      log_files.push_back(std::unique_ptr<LogFile>(
          new LogFileImpl(number, log_type, sequence, size_bytes)));
    }
  }
  std::sort(
      log_files.begin(), log_files.end(),
      [](const std::unique_ptr<LogFile>& a, const std::unique_ptr<LogFile>& b) {
        LogFileImpl* a_impl =
            static_cast_with_check<LogFileImpl, LogFile>(a.get());
        LogFileImpl* b_impl =
            static_cast_with_check<LogFileImpl, LogFile>(b.get());
        return *a_impl < *b_impl;
      });
  // `status` is the (OK) result of GetChildren above.
  return status;
}
345 
RetainProbableWalFiles(VectorLogPtr & all_logs,const SequenceNumber target)346 Status WalManager::RetainProbableWalFiles(VectorLogPtr& all_logs,
347                                           const SequenceNumber target) {
348   int64_t start = 0;  // signed to avoid overflow when target is < first file.
349   int64_t end = static_cast<int64_t>(all_logs.size()) - 1;
350   // Binary Search. avoid opening all files.
351   while (end >= start) {
352     int64_t mid = start + (end - start) / 2;  // Avoid overflow.
353     SequenceNumber current_seq_num = all_logs.at(static_cast<size_t>(mid))->StartSequence();
354     if (current_seq_num == target) {
355       end = mid;
356       break;
357     } else if (current_seq_num < target) {
358       start = mid + 1;
359     } else {
360       end = mid - 1;
361     }
362   }
363   // end could be -ve.
364   size_t start_index = static_cast<size_t>(std::max(static_cast<int64_t>(0), end));
365   // The last wal file is always included
366   all_logs.erase(all_logs.begin(), all_logs.begin() + start_index);
367   return Status::OK();
368 }
369 
// Sets *sequence to the sequence number of the first record of WAL `number`.
//
// Lookup order:
//  1. read_first_record_cache_ (guarded by read_first_record_cache_mutex_).
//     Only non-zero results are ever inserted into it.
//  2. For kAliveLogFile, the file in wal_dir. A read error is returned as-is
//     while that file still exists. Otherwise the archive copy is tried,
//     since the file may have been archived concurrently.
//  3. For kArchivedLogFile (or the fallback above), the archived file name.
//
// Returns NotSupported for any other WalFileType. Returns OK with
// *sequence == 0 if the file is empty or has been deleted from the archive
// directory; callers treat that as an empty file.
Status WalManager::ReadFirstRecord(const WalFileType type,
                                   const uint64_t number,
                                   SequenceNumber* sequence) {
  *sequence = 0;
  if (type != kAliveLogFile && type != kArchivedLogFile) {
    ROCKS_LOG_ERROR(db_options_.info_log, "[WalManger] Unknown file type %s",
                    ToString(type).c_str());
    return Status::NotSupported(
        "File Type Not Known " + ToString(type));
  }
  {
    MutexLock l(&read_first_record_cache_mutex_);
    auto itr = read_first_record_cache_.find(number);
    if (itr != read_first_record_cache_.end()) {
      *sequence = itr->second;
      return Status::OK();
    }
  }
  Status s;
  if (type == kAliveLogFile) {
    std::string fname = LogFileName(db_options_.wal_dir, number);
    s = ReadFirstLine(fname, number, sequence);
    if (!s.ok() && env_->FileExists(fname).ok()) {
      // return any error that is not caused by non-existing file
      return s;
    }
  }

  if (type == kArchivedLogFile || !s.ok()) {
    //  check if the file got moved to archive.
    std::string archived_file =
        ArchivedLogFileName(db_options_.wal_dir, number);
    s = ReadFirstLine(archived_file, number, sequence);
    // maybe the file was deleted from archive dir. If that's the case, return
    // Status::OK(). The caller will identify this as an empty file because
    // *sequence == 0
    if (!s.ok() && env_->FileExists(archived_file).IsNotFound()) {
      return Status::OK();
    }
  }

  // Cache only real sequence numbers; 0 means "empty" and may change later.
  if (s.ok() && *sequence != 0) {
    MutexLock l(&read_first_record_cache_mutex_);
    read_first_record_cache_.insert({number, *sequence});
  }
  return s;
}
417 
GetLiveWalFile(uint64_t number,std::unique_ptr<LogFile> * log_file)418 Status WalManager::GetLiveWalFile(uint64_t number,
419                                   std::unique_ptr<LogFile>* log_file) {
420   if (!log_file) {
421     return Status::InvalidArgument("log_file not preallocated.");
422   }
423 
424   if (!number) {
425     return Status::PathNotFound("log file not available");
426   }
427 
428   Status s;
429 
430   uint64_t size_bytes;
431   s = env_->GetFileSize(LogFileName(db_options_.wal_dir, number), &size_bytes);
432 
433   if (!s.ok()) {
434     return s;
435   }
436 
437   log_file->reset(new LogFileImpl(number, kAliveLogFile,
438                                   0,  // SequenceNumber
439                                   size_bytes));
440 
441   return Status::OK();
442 }
443 
444 // the function returns status.ok() and sequence == 0 if the file exists, but is
445 // empty
ReadFirstLine(const std::string & fname,const uint64_t number,SequenceNumber * sequence)446 Status WalManager::ReadFirstLine(const std::string& fname,
447                                  const uint64_t number,
448                                  SequenceNumber* sequence) {
449   struct LogReporter : public log::Reader::Reporter {
450     Env* env;
451     Logger* info_log;
452     const char* fname;
453 
454     Status* status;
455     bool ignore_error;  // true if db_options_.paranoid_checks==false
456     void Corruption(size_t bytes, const Status& s) override {
457       ROCKS_LOG_WARN(info_log, "[WalManager] %s%s: dropping %d bytes; %s",
458                      (this->ignore_error ? "(ignoring error) " : ""), fname,
459                      static_cast<int>(bytes), s.ToString().c_str());
460       if (this->status->ok()) {
461         // only keep the first error
462         *this->status = s;
463       }
464     }
465   };
466 
467   std::unique_ptr<FSSequentialFile> file;
468   Status status = fs_->NewSequentialFile(fname,
469                                          fs_->OptimizeForLogRead(file_options_),
470                                          &file, nullptr);
471   std::unique_ptr<SequentialFileReader> file_reader(
472       new SequentialFileReader(std::move(file), fname));
473 
474   if (!status.ok()) {
475     return status;
476   }
477 
478   LogReporter reporter;
479   reporter.env = env_;
480   reporter.info_log = db_options_.info_log.get();
481   reporter.fname = fname.c_str();
482   reporter.status = &status;
483   reporter.ignore_error = !db_options_.paranoid_checks;
484   log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
485                      true /*checksum*/, number);
486   std::string scratch;
487   Slice record;
488 
489   if (reader.ReadRecord(&record, &scratch) &&
490       (status.ok() || !db_options_.paranoid_checks)) {
491     if (record.size() < WriteBatchInternal::kHeader) {
492       reporter.Corruption(record.size(),
493                           Status::Corruption("log record too small"));
494       // TODO read record's till the first no corrupt entry?
495     } else {
496       WriteBatch batch;
497       WriteBatchInternal::SetContents(&batch, record);
498       *sequence = WriteBatchInternal::Sequence(&batch);
499       return Status::OK();
500     }
501   }
502 
503   // ReadRecord returns false on EOF, which means that the log file is empty. we
504   // return status.ok() in that case and set sequence number to 0
505   *sequence = 0;
506   return status;
507 }
508 
509 #endif  // ROCKSDB_LITE
510 }  // namespace ROCKSDB_NAMESPACE
511