1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include "file/sst_file_manager_impl.h"
7 
8 #include <cinttypes>
9 #include <vector>
10 
11 #include "db/db_impl/db_impl.h"
12 #include "env/composite_env_wrapper.h"
13 #include "port/port.h"
14 #include "rocksdb/env.h"
15 #include "rocksdb/sst_file_manager.h"
16 #include "test_util/sync_point.h"
17 #include "util/mutexlock.h"
18 
19 namespace ROCKSDB_NAMESPACE {
20 
21 #ifndef ROCKSDB_LITE
// Constructs the manager. All size counters start at zero, no space limit is
// set (max_allowed_space_ == 0), the manager is not closing, and no background
// error-recovery thread exists yet (bg_thread_ == nullptr).
//
// `rate_bytes_per_sec`, `max_trash_db_ratio` and `bytes_max_delete_chunk` are
// forwarded unchanged to the embedded DeleteScheduler, which additionally
// receives `env`, the raw FileSystem/Logger pointers and `this`.
SstFileManagerImpl::SstFileManagerImpl(Env* env, std::shared_ptr<FileSystem> fs,
                                       std::shared_ptr<Logger> logger,
                                       int64_t rate_bytes_per_sec,
                                       double max_trash_db_ratio,
                                       uint64_t bytes_max_delete_chunk)
    : env_(env),
      fs_(fs),
      logger_(logger),
      total_files_size_(0),
      in_progress_files_size_(0),
      compaction_buffer_size_(0),
      cur_compactions_reserved_size_(0),
      max_allowed_space_(0),
      // Reads fs_.get(): this assumes fs_ is declared before
      // delete_scheduler_ in the class, because members are initialized in
      // declaration order, not initializer-list order — TODO confirm against
      // the header.
      delete_scheduler_(env, fs_.get(), rate_bytes_per_sec, logger.get(), this,
                        max_trash_db_ratio, bytes_max_delete_chunk),
      // cv_ is tied to mu_, the mutex that guards this manager's state.
      cv_(&mu_),
      closing_(false),
      bg_thread_(nullptr),
      reserved_disk_buffer_(0),
      free_space_trigger_(0),
      cur_instance_(nullptr) {}
43 
~SstFileManagerImpl()44 SstFileManagerImpl::~SstFileManagerImpl() {
45   Close();
46 }
47 
Close()48 void SstFileManagerImpl::Close() {
49   {
50     MutexLock l(&mu_);
51     if (closing_) {
52       return;
53     }
54     closing_ = true;
55     cv_.SignalAll();
56   }
57   if (bg_thread_) {
58     bg_thread_->join();
59   }
60 }
61 
OnAddFile(const std::string & file_path,bool compaction)62 Status SstFileManagerImpl::OnAddFile(const std::string& file_path,
63                                      bool compaction) {
64   uint64_t file_size;
65   Status s = fs_->GetFileSize(file_path, IOOptions(), &file_size, nullptr);
66   if (s.ok()) {
67     MutexLock l(&mu_);
68     OnAddFileImpl(file_path, file_size, compaction);
69   }
70   TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile");
71   return s;
72 }
73 
OnAddFile(const std::string & file_path,uint64_t file_size,bool compaction)74 Status SstFileManagerImpl::OnAddFile(const std::string& file_path,
75                                      uint64_t file_size, bool compaction) {
76   MutexLock l(&mu_);
77   OnAddFileImpl(file_path, file_size, compaction);
78   TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile");
79   return Status::OK();
80 }
81 
OnDeleteFile(const std::string & file_path)82 Status SstFileManagerImpl::OnDeleteFile(const std::string& file_path) {
83   {
84     MutexLock l(&mu_);
85     OnDeleteFileImpl(file_path);
86   }
87   TEST_SYNC_POINT("SstFileManagerImpl::OnDeleteFile");
88   return Status::OK();
89 }
90 
OnCompactionCompletion(Compaction * c)91 void SstFileManagerImpl::OnCompactionCompletion(Compaction* c) {
92   MutexLock l(&mu_);
93   uint64_t size_added_by_compaction = 0;
94   for (size_t i = 0; i < c->num_input_levels(); i++) {
95     for (size_t j = 0; j < c->num_input_files(i); j++) {
96       FileMetaData* filemeta = c->input(i, j);
97       size_added_by_compaction += filemeta->fd.GetFileSize();
98     }
99   }
100   cur_compactions_reserved_size_ -= size_added_by_compaction;
101 
102   auto new_files = c->edit()->GetNewFiles();
103   for (auto& new_file : new_files) {
104     auto fn = TableFileName(c->immutable_cf_options()->cf_paths,
105                             new_file.second.fd.GetNumber(),
106                             new_file.second.fd.GetPathId());
107     if (in_progress_files_.find(fn) != in_progress_files_.end()) {
108       auto tracked_file = tracked_files_.find(fn);
109       assert(tracked_file != tracked_files_.end());
110       in_progress_files_size_ -= tracked_file->second;
111       in_progress_files_.erase(fn);
112     }
113   }
114 }
115 
OnMoveFile(const std::string & old_path,const std::string & new_path,uint64_t * file_size)116 Status SstFileManagerImpl::OnMoveFile(const std::string& old_path,
117                                       const std::string& new_path,
118                                       uint64_t* file_size) {
119   {
120     MutexLock l(&mu_);
121     if (file_size != nullptr) {
122       *file_size = tracked_files_[old_path];
123     }
124     OnAddFileImpl(new_path, tracked_files_[old_path], false);
125     OnDeleteFileImpl(old_path);
126   }
127   TEST_SYNC_POINT("SstFileManagerImpl::OnMoveFile");
128   return Status::OK();
129 }
130 
SetMaxAllowedSpaceUsage(uint64_t max_allowed_space)131 void SstFileManagerImpl::SetMaxAllowedSpaceUsage(uint64_t max_allowed_space) {
132   MutexLock l(&mu_);
133   max_allowed_space_ = max_allowed_space;
134 }
135 
SetCompactionBufferSize(uint64_t compaction_buffer_size)136 void SstFileManagerImpl::SetCompactionBufferSize(
137     uint64_t compaction_buffer_size) {
138   MutexLock l(&mu_);
139   compaction_buffer_size_ = compaction_buffer_size;
140 }
141 
IsMaxAllowedSpaceReached()142 bool SstFileManagerImpl::IsMaxAllowedSpaceReached() {
143   MutexLock l(&mu_);
144   if (max_allowed_space_ <= 0) {
145     return false;
146   }
147   return total_files_size_ >= max_allowed_space_;
148 }
149 
IsMaxAllowedSpaceReachedIncludingCompactions()150 bool SstFileManagerImpl::IsMaxAllowedSpaceReachedIncludingCompactions() {
151   MutexLock l(&mu_);
152   if (max_allowed_space_ <= 0) {
153     return false;
154   }
155   return total_files_size_ + cur_compactions_reserved_size_ >=
156          max_allowed_space_;
157 }
158 
EnoughRoomForCompaction(ColumnFamilyData * cfd,const std::vector<CompactionInputFiles> & inputs,Status bg_error)159 bool SstFileManagerImpl::EnoughRoomForCompaction(
160     ColumnFamilyData* cfd, const std::vector<CompactionInputFiles>& inputs,
161     Status bg_error) {
162   MutexLock l(&mu_);
163   uint64_t size_added_by_compaction = 0;
164   // First check if we even have the space to do the compaction
165   for (size_t i = 0; i < inputs.size(); i++) {
166     for (size_t j = 0; j < inputs[i].size(); j++) {
167       FileMetaData* filemeta = inputs[i][j];
168       size_added_by_compaction += filemeta->fd.GetFileSize();
169     }
170   }
171 
172   // Update cur_compactions_reserved_size_ so concurrent compaction
173   // don't max out space
174   size_t needed_headroom =
175       cur_compactions_reserved_size_ + size_added_by_compaction +
176       compaction_buffer_size_;
177   if (max_allowed_space_ != 0 &&
178       (needed_headroom + total_files_size_ > max_allowed_space_)) {
179     return false;
180   }
181 
182   // Implement more aggressive checks only if this DB instance has already
183   // seen a NoSpace() error. This is tin order to contain a single potentially
184   // misbehaving DB instance and prevent it from slowing down compactions of
185   // other DB instances
186   if (CheckFreeSpace() && bg_error == Status::NoSpace()) {
187     auto fn =
188         TableFileName(cfd->ioptions()->cf_paths, inputs[0][0]->fd.GetNumber(),
189                       inputs[0][0]->fd.GetPathId());
190     uint64_t free_space = 0;
191     fs_->GetFreeSpace(fn, IOOptions(), &free_space, nullptr);
192     // needed_headroom is based on current size reserved by compactions,
193     // minus any files created by running compactions as they would count
194     // against the reserved size. If user didn't specify any compaction
195     // buffer, add reserved_disk_buffer_ that's calculated by default so the
196     // compaction doesn't end up leaving nothing for logs and flush SSTs
197     if (compaction_buffer_size_ == 0) {
198       needed_headroom += reserved_disk_buffer_;
199     }
200     needed_headroom -= in_progress_files_size_;
201     if (free_space < needed_headroom + size_added_by_compaction) {
202       // We hit the condition of not enough disk space
203       ROCKS_LOG_ERROR(logger_,
204                       "free space [%" PRIu64
205                       " bytes] is less than "
206                       "needed headroom [%" ROCKSDB_PRIszt " bytes]\n",
207                       free_space, needed_headroom);
208       return false;
209     }
210   }
211 
212   cur_compactions_reserved_size_ += size_added_by_compaction;
213   // Take a snapshot of cur_compactions_reserved_size_ for when we encounter
214   // a NoSpace error.
215   free_space_trigger_ = cur_compactions_reserved_size_;
216   return true;
217 }
218 
GetCompactionsReservedSize()219 uint64_t SstFileManagerImpl::GetCompactionsReservedSize() {
220   MutexLock l(&mu_);
221   return cur_compactions_reserved_size_;
222 }
223 
GetTotalSize()224 uint64_t SstFileManagerImpl::GetTotalSize() {
225   MutexLock l(&mu_);
226   return total_files_size_;
227 }
228 
229 std::unordered_map<std::string, uint64_t>
GetTrackedFiles()230 SstFileManagerImpl::GetTrackedFiles() {
231   MutexLock l(&mu_);
232   return tracked_files_;
233 }
234 
GetDeleteRateBytesPerSecond()235 int64_t SstFileManagerImpl::GetDeleteRateBytesPerSecond() {
236   return delete_scheduler_.GetRateBytesPerSecond();
237 }
238 
SetDeleteRateBytesPerSecond(int64_t delete_rate)239 void SstFileManagerImpl::SetDeleteRateBytesPerSecond(int64_t delete_rate) {
240   return delete_scheduler_.SetRateBytesPerSecond(delete_rate);
241 }
242 
GetMaxTrashDBRatio()243 double SstFileManagerImpl::GetMaxTrashDBRatio() {
244   return delete_scheduler_.GetMaxTrashDBRatio();
245 }
246 
SetMaxTrashDBRatio(double r)247 void SstFileManagerImpl::SetMaxTrashDBRatio(double r) {
248   return delete_scheduler_.SetMaxTrashDBRatio(r);
249 }
250 
GetTotalTrashSize()251 uint64_t SstFileManagerImpl::GetTotalTrashSize() {
252   return delete_scheduler_.GetTotalTrashSize();
253 }
254 
ReserveDiskBuffer(uint64_t size,const std::string & path)255 void SstFileManagerImpl::ReserveDiskBuffer(uint64_t size,
256                                            const std::string& path) {
257   MutexLock l(&mu_);
258 
259   reserved_disk_buffer_ += size;
260   if (path_.empty()) {
261     path_ = path;
262   }
263 }
264 
ClearError()265 void SstFileManagerImpl::ClearError() {
266   while (true) {
267     MutexLock l(&mu_);
268 
269     if (closing_) {
270       return;
271     }
272 
273     uint64_t free_space = 0;
274     Status s = fs_->GetFreeSpace(path_, IOOptions(), &free_space, nullptr);
275     free_space = max_allowed_space_ > 0
276                      ? std::min(max_allowed_space_, free_space)
277                      : free_space;
278     if (s.ok()) {
279       // In case of multi-DB instances, some of them may have experienced a
280       // soft error and some a hard error. In the SstFileManagerImpl, a hard
281       // error will basically override previously reported soft errors. Once
282       // we clear the hard error, we don't keep track of previous errors for
283       // now
284       if (bg_err_.severity() == Status::Severity::kHardError) {
285         if (free_space < reserved_disk_buffer_) {
286           ROCKS_LOG_ERROR(logger_,
287                           "free space [%" PRIu64
288                           " bytes] is less than "
289                           "required disk buffer [%" PRIu64 " bytes]\n",
290                           free_space, reserved_disk_buffer_);
291           ROCKS_LOG_ERROR(logger_, "Cannot clear hard error\n");
292           s = Status::NoSpace();
293         }
294       } else if (bg_err_.severity() == Status::Severity::kSoftError) {
295         if (free_space < free_space_trigger_) {
296           ROCKS_LOG_WARN(logger_,
297                          "free space [%" PRIu64
298                          " bytes] is less than "
299                          "free space for compaction trigger [%" PRIu64
300                          " bytes]\n",
301                          free_space, free_space_trigger_);
302           ROCKS_LOG_WARN(logger_, "Cannot clear soft error\n");
303           s = Status::NoSpace();
304         }
305       }
306     }
307 
308     // Someone could have called CancelErrorRecovery() and the list could have
309     // become empty, so check again here
310     if (s.ok() && !error_handler_list_.empty()) {
311       auto error_handler = error_handler_list_.front();
312       // Since we will release the mutex, set cur_instance_ to signal to the
313       // shutdown thread, if it calls // CancelErrorRecovery() the meantime,
314       // to indicate that this DB instance is busy. The DB instance is
315       // guaranteed to not be deleted before RecoverFromBGError() returns,
316       // since the ErrorHandler::recovery_in_prog_ flag would be true
317       cur_instance_ = error_handler;
318       mu_.Unlock();
319       s = error_handler->RecoverFromBGError();
320       TEST_SYNC_POINT("SstFileManagerImpl::ErrorCleared");
321       mu_.Lock();
322       // The DB instance might have been deleted while we were
323       // waiting for the mutex, so check cur_instance_ to make sure its
324       // still non-null
325       if (cur_instance_) {
326         // Check for error again, since the instance may have recovered but
327         // immediately got another error. If that's the case, and the new
328         // error is also a NoSpace() non-fatal error, leave the instance in
329         // the list
330         Status err = cur_instance_->GetBGError();
331         if (s.ok() && err == Status::NoSpace() &&
332             err.severity() < Status::Severity::kFatalError) {
333           s = err;
334         }
335         cur_instance_ = nullptr;
336       }
337 
338       if (s.ok() || s.IsShutdownInProgress() ||
339           (!s.ok() && s.severity() >= Status::Severity::kFatalError)) {
340         // If shutdown is in progress, abandon this handler instance
341         // and continue with the others
342         error_handler_list_.pop_front();
343       }
344     }
345 
346     if (!error_handler_list_.empty()) {
347       // If there are more instances to be recovered, reschedule after 5
348       // seconds
349       int64_t wait_until = env_->NowMicros() + 5000000;
350       cv_.TimedWait(wait_until);
351     }
352 
353     // Check again for error_handler_list_ empty, as a DB instance shutdown
354     // could have removed it from the queue while we were in timed wait
355     if (error_handler_list_.empty()) {
356       ROCKS_LOG_INFO(logger_, "Clearing error\n");
357       bg_err_ = Status::OK();
358       return;
359     }
360   }
361 }
362 
StartErrorRecovery(ErrorHandler * handler,Status bg_error)363 void SstFileManagerImpl::StartErrorRecovery(ErrorHandler* handler,
364                                             Status bg_error) {
365   MutexLock l(&mu_);
366   if (bg_error.severity() == Status::Severity::kSoftError) {
367     if (bg_err_.ok()) {
368       // Setting bg_err_ basically means we're in degraded mode
369       // Assume that all pending compactions will fail similarly. The trigger
370       // for clearing this condition is set to current compaction reserved
371       // size, so we stop checking disk space available in
372       // EnoughRoomForCompaction once this much free space is available
373       bg_err_ = bg_error;
374     }
375   } else if (bg_error.severity() == Status::Severity::kHardError) {
376     bg_err_ = bg_error;
377   } else {
378     assert(false);
379   }
380 
381   // If this is the first instance of this error, kick of a thread to poll
382   // and recover from this condition
383   if (error_handler_list_.empty()) {
384     error_handler_list_.push_back(handler);
385     // Release lock before calling join. Its ok to do so because
386     // error_handler_list_ is now non-empty, so no other invocation of this
387     // function will execute this piece of code
388     mu_.Unlock();
389     if (bg_thread_) {
390       bg_thread_->join();
391     }
392     // Start a new thread. The previous one would have exited.
393     bg_thread_.reset(new port::Thread(&SstFileManagerImpl::ClearError, this));
394     mu_.Lock();
395   } else {
396     // Check if this DB instance is already in the list
397     for (auto iter = error_handler_list_.begin();
398          iter != error_handler_list_.end(); ++iter) {
399       if ((*iter) == handler) {
400         return;
401       }
402     }
403     error_handler_list_.push_back(handler);
404   }
405 }
406 
CancelErrorRecovery(ErrorHandler * handler)407 bool SstFileManagerImpl::CancelErrorRecovery(ErrorHandler* handler) {
408   MutexLock l(&mu_);
409 
410   if (cur_instance_ == handler) {
411     // This instance is currently busy attempting to recover
412     // Nullify it so the recovery thread doesn't attempt to access it again
413     cur_instance_ = nullptr;
414     return false;
415   }
416 
417   for (auto iter = error_handler_list_.begin();
418        iter != error_handler_list_.end(); ++iter) {
419     if ((*iter) == handler) {
420       error_handler_list_.erase(iter);
421       return true;
422     }
423   }
424   return false;
425 }
426 
ScheduleFileDeletion(const std::string & file_path,const std::string & path_to_sync,const bool force_bg)427 Status SstFileManagerImpl::ScheduleFileDeletion(
428     const std::string& file_path, const std::string& path_to_sync,
429     const bool force_bg) {
430   TEST_SYNC_POINT_CALLBACK("SstFileManagerImpl::ScheduleFileDeletion",
431                            const_cast<std::string*>(&file_path));
432   return delete_scheduler_.DeleteFile(file_path, path_to_sync,
433                                       force_bg);
434 }
435 
WaitForEmptyTrash()436 void SstFileManagerImpl::WaitForEmptyTrash() {
437   delete_scheduler_.WaitForEmptyTrash();
438 }
439 
OnAddFileImpl(const std::string & file_path,uint64_t file_size,bool compaction)440 void SstFileManagerImpl::OnAddFileImpl(const std::string& file_path,
441                                        uint64_t file_size, bool compaction) {
442   auto tracked_file = tracked_files_.find(file_path);
443   if (tracked_file != tracked_files_.end()) {
444     // File was added before, we will just update the size
445     assert(!compaction);
446     total_files_size_ -= tracked_file->second;
447     total_files_size_ += file_size;
448     cur_compactions_reserved_size_ -= file_size;
449   } else {
450     total_files_size_ += file_size;
451     if (compaction) {
452       // Keep track of the size of files created by in-progress compactions.
453       // When calculating whether there's enough headroom for new compactions,
454       // this will be subtracted from cur_compactions_reserved_size_.
455       // Otherwise, compactions will be double counted.
456       in_progress_files_size_ += file_size;
457       in_progress_files_.insert(file_path);
458     }
459   }
460   tracked_files_[file_path] = file_size;
461 }
462 
OnDeleteFileImpl(const std::string & file_path)463 void SstFileManagerImpl::OnDeleteFileImpl(const std::string& file_path) {
464   auto tracked_file = tracked_files_.find(file_path);
465   if (tracked_file == tracked_files_.end()) {
466     // File is not tracked
467     assert(in_progress_files_.find(file_path) == in_progress_files_.end());
468     return;
469   }
470 
471   total_files_size_ -= tracked_file->second;
472   // Check if it belonged to an in-progress compaction
473   if (in_progress_files_.find(file_path) != in_progress_files_.end()) {
474     in_progress_files_size_ -= tracked_file->second;
475     in_progress_files_.erase(file_path);
476   }
477   tracked_files_.erase(tracked_file);
478 }
479 
NewSstFileManager(Env * env,std::shared_ptr<Logger> info_log,std::string trash_dir,int64_t rate_bytes_per_sec,bool delete_existing_trash,Status * status,double max_trash_db_ratio,uint64_t bytes_max_delete_chunk)480 SstFileManager* NewSstFileManager(Env* env, std::shared_ptr<Logger> info_log,
481                                   std::string trash_dir,
482                                   int64_t rate_bytes_per_sec,
483                                   bool delete_existing_trash, Status* status,
484                                   double max_trash_db_ratio,
485                                   uint64_t bytes_max_delete_chunk) {
486   std::shared_ptr<FileSystem> fs;
487 
488   if (env == Env::Default()) {
489     fs = FileSystem::Default();
490   } else {
491     fs.reset(new LegacyFileSystemWrapper(env));
492   }
493 
494   return NewSstFileManager(env, fs, info_log, trash_dir, rate_bytes_per_sec,
495                            delete_existing_trash, status, max_trash_db_ratio,
496                            bytes_max_delete_chunk);
497 }
498 
NewSstFileManager(Env * env,std::shared_ptr<FileSystem> fs,std::shared_ptr<Logger> info_log,const std::string & trash_dir,int64_t rate_bytes_per_sec,bool delete_existing_trash,Status * status,double max_trash_db_ratio,uint64_t bytes_max_delete_chunk)499 SstFileManager* NewSstFileManager(Env* env, std::shared_ptr<FileSystem> fs,
500                                   std::shared_ptr<Logger> info_log,
501                                   const std::string& trash_dir,
502                                   int64_t rate_bytes_per_sec,
503                                   bool delete_existing_trash, Status* status,
504                                   double max_trash_db_ratio,
505                                   uint64_t bytes_max_delete_chunk) {
506   SstFileManagerImpl* res =
507       new SstFileManagerImpl(env, fs, info_log, rate_bytes_per_sec,
508                              max_trash_db_ratio, bytes_max_delete_chunk);
509 
510   // trash_dir is deprecated and not needed anymore, but if user passed it
511   // we will still remove files in it.
512   Status s;
513   if (delete_existing_trash && trash_dir != "") {
514     std::vector<std::string> files_in_trash;
515     s = fs->GetChildren(trash_dir, IOOptions(), &files_in_trash, nullptr);
516     if (s.ok()) {
517       for (const std::string& trash_file : files_in_trash) {
518         if (trash_file == "." || trash_file == "..") {
519           continue;
520         }
521 
522         std::string path_in_trash = trash_dir + "/" + trash_file;
523         res->OnAddFile(path_in_trash);
524         Status file_delete =
525             res->ScheduleFileDeletion(path_in_trash, trash_dir);
526         if (s.ok() && !file_delete.ok()) {
527           s = file_delete;
528         }
529       }
530     }
531   }
532 
533   if (status) {
534     *status = s;
535   }
536 
537   return res;
538 }
539 
540 #else
541 
542 SstFileManager* NewSstFileManager(Env* /*env*/,
543                                   std::shared_ptr<Logger> /*info_log*/,
544                                   std::string /*trash_dir*/,
545                                   int64_t /*rate_bytes_per_sec*/,
546                                   bool /*delete_existing_trash*/,
547                                   Status* status, double /*max_trash_db_ratio*/,
548                                   uint64_t /*bytes_max_delete_chunk*/) {
549   if (status) {
550     *status =
551         Status::NotSupported("SstFileManager is not supported in ROCKSDB_LITE");
552   }
553   return nullptr;
554 }
555 
556 #endif  // ROCKSDB_LITE
557 
558 }  // namespace ROCKSDB_NAMESPACE
559