1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #ifndef ROCKSDB_LITE
7 
8 #include "file/delete_scheduler.h"
9 
10 #include <cinttypes>
11 #include <thread>
12 #include <vector>
13 
14 #include "file/sst_file_manager_impl.h"
15 #include "logging/logging.h"
16 #include "port/port.h"
17 #include "rocksdb/env.h"
18 #include "rocksdb/file_system.h"
19 #include "rocksdb/system_clock.h"
20 #include "test_util/sync_point.h"
21 #include "util/mutexlock.h"
22 
23 namespace ROCKSDB_NAMESPACE {
24 
DeleteScheduler(SystemClock * clock,FileSystem * fs,int64_t rate_bytes_per_sec,Logger * info_log,SstFileManagerImpl * sst_file_manager,double max_trash_db_ratio,uint64_t bytes_max_delete_chunk)25 DeleteScheduler::DeleteScheduler(SystemClock* clock, FileSystem* fs,
26                                  int64_t rate_bytes_per_sec, Logger* info_log,
27                                  SstFileManagerImpl* sst_file_manager,
28                                  double max_trash_db_ratio,
29                                  uint64_t bytes_max_delete_chunk)
30     : clock_(clock),
31       fs_(fs),
32       total_trash_size_(0),
33       rate_bytes_per_sec_(rate_bytes_per_sec),
34       pending_files_(0),
35       bytes_max_delete_chunk_(bytes_max_delete_chunk),
36       closing_(false),
37       cv_(&mu_),
38       bg_thread_(nullptr),
39       info_log_(info_log),
40       sst_file_manager_(sst_file_manager),
41       max_trash_db_ratio_(max_trash_db_ratio) {
42   assert(sst_file_manager != nullptr);
43   assert(max_trash_db_ratio >= 0);
44   MaybeCreateBackgroundThread();
45 }
46 
~DeleteScheduler()47 DeleteScheduler::~DeleteScheduler() {
48   {
49     InstrumentedMutexLock l(&mu_);
50     closing_ = true;
51     cv_.SignalAll();
52   }
53   if (bg_thread_) {
54     bg_thread_->join();
55   }
56   for (const auto& it : bg_errors_) {
57     it.second.PermitUncheckedError();
58   }
59 }
60 
DeleteFile(const std::string & file_path,const std::string & dir_to_sync,const bool force_bg)61 Status DeleteScheduler::DeleteFile(const std::string& file_path,
62                                    const std::string& dir_to_sync,
63                                    const bool force_bg) {
64   if (rate_bytes_per_sec_.load() <= 0 || (!force_bg &&
65       total_trash_size_.load() >
66           sst_file_manager_->GetTotalSize() * max_trash_db_ratio_.load())) {
67     // Rate limiting is disabled or trash size makes up more than
68     // max_trash_db_ratio_ (default 25%) of the total DB size
69     TEST_SYNC_POINT("DeleteScheduler::DeleteFile");
70     Status s = fs_->DeleteFile(file_path, IOOptions(), nullptr);
71     if (s.ok()) {
72       s = sst_file_manager_->OnDeleteFile(file_path);
73       ROCKS_LOG_INFO(info_log_,
74                      "Deleted file %s immediately, rate_bytes_per_sec %" PRIi64
75                      ", total_trash_size %" PRIu64 " max_trash_db_ratio %lf",
76                      file_path.c_str(), rate_bytes_per_sec_.load(),
77                      total_trash_size_.load(), max_trash_db_ratio_.load());
78       InstrumentedMutexLock l(&mu_);
79       RecordTick(stats_.get(), FILES_DELETED_IMMEDIATELY);
80     }
81     return s;
82   }
83 
84   // Move file to trash
85   std::string trash_file;
86   Status s = MarkAsTrash(file_path, &trash_file);
87   ROCKS_LOG_INFO(info_log_, "Mark file: %s as trash -- %s", trash_file.c_str(),
88                  s.ToString().c_str());
89 
90   if (!s.ok()) {
91     ROCKS_LOG_ERROR(info_log_, "Failed to mark %s as trash -- %s",
92                     file_path.c_str(), s.ToString().c_str());
93     s = fs_->DeleteFile(file_path, IOOptions(), nullptr);
94     if (s.ok()) {
95       s = sst_file_manager_->OnDeleteFile(file_path);
96       ROCKS_LOG_INFO(info_log_, "Deleted file %s immediately",
97                      trash_file.c_str());
98       InstrumentedMutexLock l(&mu_);
99       RecordTick(stats_.get(), FILES_DELETED_IMMEDIATELY);
100     }
101     return s;
102   }
103 
104   // Update the total trash size
105   uint64_t trash_file_size = 0;
106   IOStatus io_s =
107       fs_->GetFileSize(trash_file, IOOptions(), &trash_file_size, nullptr);
108   if (io_s.ok()) {
109     total_trash_size_.fetch_add(trash_file_size);
110   }
111   //**TODO: What should we do if we failed to
112   // get the file size?
113 
114   // Add file to delete queue
115   {
116     InstrumentedMutexLock l(&mu_);
117     RecordTick(stats_.get(), FILES_MARKED_TRASH);
118     queue_.emplace(trash_file, dir_to_sync);
119     pending_files_++;
120     if (pending_files_ == 1) {
121       cv_.SignalAll();
122     }
123   }
124   return s;
125 }
126 
GetBackgroundErrors()127 std::map<std::string, Status> DeleteScheduler::GetBackgroundErrors() {
128   InstrumentedMutexLock l(&mu_);
129   return bg_errors_;
130 }
131 
132 const std::string DeleteScheduler::kTrashExtension = ".trash";
IsTrashFile(const std::string & file_path)133 bool DeleteScheduler::IsTrashFile(const std::string& file_path) {
134   return (file_path.size() >= kTrashExtension.size() &&
135           file_path.rfind(kTrashExtension) ==
136               file_path.size() - kTrashExtension.size());
137 }
138 
CleanupDirectory(Env * env,SstFileManagerImpl * sfm,const std::string & path)139 Status DeleteScheduler::CleanupDirectory(Env* env, SstFileManagerImpl* sfm,
140                                          const std::string& path) {
141   Status s;
142   // Check if there are any files marked as trash in this path
143   std::vector<std::string> files_in_path;
144   s = env->GetChildren(path, &files_in_path);
145   if (!s.ok()) {
146     return s;
147   }
148   for (const std::string& current_file : files_in_path) {
149     if (!DeleteScheduler::IsTrashFile(current_file)) {
150       // not a trash file, skip
151       continue;
152     }
153 
154     Status file_delete;
155     std::string trash_file = path + "/" + current_file;
156     if (sfm) {
157       // We have an SstFileManager that will schedule the file delete
158       s = sfm->OnAddFile(trash_file);
159       file_delete = sfm->ScheduleFileDeletion(trash_file, path);
160     } else {
161       // Delete the file immediately
162       file_delete = env->DeleteFile(trash_file);
163     }
164 
165     if (s.ok() && !file_delete.ok()) {
166       s = file_delete;
167     }
168   }
169 
170   return s;
171 }
172 
MarkAsTrash(const std::string & file_path,std::string * trash_file)173 Status DeleteScheduler::MarkAsTrash(const std::string& file_path,
174                                     std::string* trash_file) {
175   // Sanity check of the path
176   size_t idx = file_path.rfind("/");
177   if (idx == std::string::npos || idx == file_path.size() - 1) {
178     return Status::InvalidArgument("file_path is corrupted");
179   }
180 
181   if (DeleteScheduler::IsTrashFile(file_path)) {
182     // This is already a trash file
183     *trash_file = file_path;
184     return Status::OK();
185   }
186 
187   *trash_file = file_path + kTrashExtension;
188   // TODO(tec) : Implement Env::RenameFileIfNotExist and remove
189   //             file_move_mu mutex.
190   int cnt = 0;
191   Status s;
192   InstrumentedMutexLock l(&file_move_mu_);
193   while (true) {
194     s = fs_->FileExists(*trash_file, IOOptions(), nullptr);
195     if (s.IsNotFound()) {
196       // We found a path for our file in trash
197       s = fs_->RenameFile(file_path, *trash_file, IOOptions(), nullptr);
198       break;
199     } else if (s.ok()) {
200       // Name conflict, generate new random suffix
201       *trash_file = file_path + std::to_string(cnt) + kTrashExtension;
202     } else {
203       // Error during FileExists call, we cannot continue
204       break;
205     }
206     cnt++;
207   }
208   if (s.ok()) {
209     s = sst_file_manager_->OnMoveFile(file_path, *trash_file);
210   }
211   return s;
212 }
213 
BackgroundEmptyTrash()214 void DeleteScheduler::BackgroundEmptyTrash() {
215   TEST_SYNC_POINT("DeleteScheduler::BackgroundEmptyTrash");
216 
217   while (true) {
218     InstrumentedMutexLock l(&mu_);
219     while (queue_.empty() && !closing_) {
220       cv_.Wait();
221     }
222 
223     if (closing_) {
224       return;
225     }
226 
227     // Delete all files in queue_
228     uint64_t start_time = clock_->NowMicros();
229     uint64_t total_deleted_bytes = 0;
230     int64_t current_delete_rate = rate_bytes_per_sec_.load();
231     while (!queue_.empty() && !closing_) {
232       if (current_delete_rate != rate_bytes_per_sec_.load()) {
233         // User changed the delete rate
234         current_delete_rate = rate_bytes_per_sec_.load();
235         start_time = clock_->NowMicros();
236         total_deleted_bytes = 0;
237         ROCKS_LOG_INFO(info_log_, "rate_bytes_per_sec is changed to %" PRIi64,
238                        current_delete_rate);
239       }
240 
241       // Get new file to delete
242       const FileAndDir& fad = queue_.front();
243       std::string path_in_trash = fad.fname;
244 
245       // We don't need to hold the lock while deleting the file
246       mu_.Unlock();
247       uint64_t deleted_bytes = 0;
248       bool is_complete = true;
249       // Delete file from trash and update total_penlty value
250       Status s =
251           DeleteTrashFile(path_in_trash, fad.dir, &deleted_bytes, &is_complete);
252       total_deleted_bytes += deleted_bytes;
253       mu_.Lock();
254       if (is_complete) {
255         queue_.pop();
256       }
257 
258       if (!s.ok()) {
259         bg_errors_[path_in_trash] = s;
260       }
261 
262       // Apply penalty if necessary
263       uint64_t total_penalty;
264       if (current_delete_rate > 0) {
265         // rate limiting is enabled
266         total_penalty =
267             ((total_deleted_bytes * kMicrosInSecond) / current_delete_rate);
268         ROCKS_LOG_INFO(info_log_,
269                        "Rate limiting is enabled with penalty %" PRIu64
270                        " after deleting file %s",
271                        total_penalty, path_in_trash.c_str());
272         while (!closing_ && !cv_.TimedWait(start_time + total_penalty)) {
273         }
274       } else {
275         // rate limiting is disabled
276         total_penalty = 0;
277         ROCKS_LOG_INFO(info_log_,
278                        "Rate limiting is disabled after deleting file %s",
279                        path_in_trash.c_str());
280       }
281       TEST_SYNC_POINT_CALLBACK("DeleteScheduler::BackgroundEmptyTrash:Wait",
282                                &total_penalty);
283 
284       if (is_complete) {
285         pending_files_--;
286       }
287       if (pending_files_ == 0) {
288         // Unblock WaitForEmptyTrash since there are no more files waiting
289         // to be deleted
290         cv_.SignalAll();
291       }
292     }
293   }
294 }
295 
DeleteTrashFile(const std::string & path_in_trash,const std::string & dir_to_sync,uint64_t * deleted_bytes,bool * is_complete)296 Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash,
297                                         const std::string& dir_to_sync,
298                                         uint64_t* deleted_bytes,
299                                         bool* is_complete) {
300   uint64_t file_size;
301   Status s = fs_->GetFileSize(path_in_trash, IOOptions(), &file_size, nullptr);
302   *is_complete = true;
303   TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:DeleteFile");
304   if (s.ok()) {
305     bool need_full_delete = true;
306     if (bytes_max_delete_chunk_ != 0 && file_size > bytes_max_delete_chunk_) {
307       uint64_t num_hard_links = 2;
308       // We don't have to worry aobut data race between linking a new
309       // file after the number of file link check and ftruncte because
310       // the file is now in trash and no hardlink is supposed to create
311       // to trash files by RocksDB.
312       Status my_status = fs_->NumFileLinks(path_in_trash, IOOptions(),
313                                            &num_hard_links, nullptr);
314       if (my_status.ok()) {
315         if (num_hard_links == 1) {
316           std::unique_ptr<FSWritableFile> wf;
317           my_status = fs_->ReopenWritableFile(path_in_trash, FileOptions(),
318                                               &wf, nullptr);
319           if (my_status.ok()) {
320             my_status = wf->Truncate(file_size - bytes_max_delete_chunk_,
321                                      IOOptions(), nullptr);
322             if (my_status.ok()) {
323               TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:Fsync");
324               my_status = wf->Fsync(IOOptions(), nullptr);
325             }
326           }
327           if (my_status.ok()) {
328             *deleted_bytes = bytes_max_delete_chunk_;
329             need_full_delete = false;
330             *is_complete = false;
331           } else {
332             ROCKS_LOG_WARN(info_log_,
333                            "Failed to partially delete %s from trash -- %s",
334                            path_in_trash.c_str(), my_status.ToString().c_str());
335           }
336         } else {
337           ROCKS_LOG_INFO(info_log_,
338                          "Cannot delete %s slowly through ftruncate from trash "
339                          "as it has other links",
340                          path_in_trash.c_str());
341         }
342       } else if (!num_link_error_printed_) {
343         ROCKS_LOG_INFO(
344             info_log_,
345             "Cannot delete files slowly through ftruncate from trash "
346             "as Env::NumFileLinks() returns error: %s",
347             my_status.ToString().c_str());
348         num_link_error_printed_ = true;
349       }
350     }
351 
352     if (need_full_delete) {
353       s = fs_->DeleteFile(path_in_trash, IOOptions(), nullptr);
354       if (!dir_to_sync.empty()) {
355         std::unique_ptr<FSDirectory> dir_obj;
356         if (s.ok()) {
357           s = fs_->NewDirectory(dir_to_sync, IOOptions(), &dir_obj, nullptr);
358         }
359         if (s.ok()) {
360           s = dir_obj->Fsync(IOOptions(), nullptr);
361           TEST_SYNC_POINT_CALLBACK(
362               "DeleteScheduler::DeleteTrashFile::AfterSyncDir",
363               reinterpret_cast<void*>(const_cast<std::string*>(&dir_to_sync)));
364         }
365       }
366       if (s.ok()) {
367         *deleted_bytes = file_size;
368         s = sst_file_manager_->OnDeleteFile(path_in_trash);
369       }
370     }
371   }
372   if (!s.ok()) {
373     // Error while getting file size or while deleting
374     ROCKS_LOG_ERROR(info_log_, "Failed to delete %s from trash -- %s",
375                     path_in_trash.c_str(), s.ToString().c_str());
376     *deleted_bytes = 0;
377   } else {
378     total_trash_size_.fetch_sub(*deleted_bytes);
379   }
380 
381   return s;
382 }
383 
WaitForEmptyTrash()384 void DeleteScheduler::WaitForEmptyTrash() {
385   InstrumentedMutexLock l(&mu_);
386   while (pending_files_ > 0 && !closing_) {
387     cv_.Wait();
388   }
389 }
390 
MaybeCreateBackgroundThread()391 void DeleteScheduler::MaybeCreateBackgroundThread() {
392   if (bg_thread_ == nullptr && rate_bytes_per_sec_.load() > 0) {
393     bg_thread_.reset(
394         new port::Thread(&DeleteScheduler::BackgroundEmptyTrash, this));
395     ROCKS_LOG_INFO(info_log_,
396                    "Created background thread for deletion scheduler with "
397                    "rate_bytes_per_sec: %" PRIi64,
398                    rate_bytes_per_sec_.load());
399   }
400 }
401 
402 }  // namespace ROCKSDB_NAMESPACE
403 
404 #endif  // ROCKSDB_LITE
405