1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include "file/delete_scheduler.h"
9
10 #include <cinttypes>
11 #include <thread>
12 #include <vector>
13
14 #include "file/sst_file_manager_impl.h"
15 #include "logging/logging.h"
16 #include "port/port.h"
17 #include "rocksdb/env.h"
18 #include "rocksdb/file_system.h"
19 #include "rocksdb/system_clock.h"
20 #include "test_util/sync_point.h"
21 #include "util/mutexlock.h"
22
23 namespace ROCKSDB_NAMESPACE {
24
DeleteScheduler(SystemClock * clock,FileSystem * fs,int64_t rate_bytes_per_sec,Logger * info_log,SstFileManagerImpl * sst_file_manager,double max_trash_db_ratio,uint64_t bytes_max_delete_chunk)25 DeleteScheduler::DeleteScheduler(SystemClock* clock, FileSystem* fs,
26 int64_t rate_bytes_per_sec, Logger* info_log,
27 SstFileManagerImpl* sst_file_manager,
28 double max_trash_db_ratio,
29 uint64_t bytes_max_delete_chunk)
30 : clock_(clock),
31 fs_(fs),
32 total_trash_size_(0),
33 rate_bytes_per_sec_(rate_bytes_per_sec),
34 pending_files_(0),
35 bytes_max_delete_chunk_(bytes_max_delete_chunk),
36 closing_(false),
37 cv_(&mu_),
38 bg_thread_(nullptr),
39 info_log_(info_log),
40 sst_file_manager_(sst_file_manager),
41 max_trash_db_ratio_(max_trash_db_ratio) {
42 assert(sst_file_manager != nullptr);
43 assert(max_trash_db_ratio >= 0);
44 MaybeCreateBackgroundThread();
45 }
46
~DeleteScheduler()47 DeleteScheduler::~DeleteScheduler() {
48 {
49 InstrumentedMutexLock l(&mu_);
50 closing_ = true;
51 cv_.SignalAll();
52 }
53 if (bg_thread_) {
54 bg_thread_->join();
55 }
56 for (const auto& it : bg_errors_) {
57 it.second.PermitUncheckedError();
58 }
59 }
60
DeleteFile(const std::string & file_path,const std::string & dir_to_sync,const bool force_bg)61 Status DeleteScheduler::DeleteFile(const std::string& file_path,
62 const std::string& dir_to_sync,
63 const bool force_bg) {
64 if (rate_bytes_per_sec_.load() <= 0 || (!force_bg &&
65 total_trash_size_.load() >
66 sst_file_manager_->GetTotalSize() * max_trash_db_ratio_.load())) {
67 // Rate limiting is disabled or trash size makes up more than
68 // max_trash_db_ratio_ (default 25%) of the total DB size
69 TEST_SYNC_POINT("DeleteScheduler::DeleteFile");
70 Status s = fs_->DeleteFile(file_path, IOOptions(), nullptr);
71 if (s.ok()) {
72 s = sst_file_manager_->OnDeleteFile(file_path);
73 ROCKS_LOG_INFO(info_log_,
74 "Deleted file %s immediately, rate_bytes_per_sec %" PRIi64
75 ", total_trash_size %" PRIu64 " max_trash_db_ratio %lf",
76 file_path.c_str(), rate_bytes_per_sec_.load(),
77 total_trash_size_.load(), max_trash_db_ratio_.load());
78 InstrumentedMutexLock l(&mu_);
79 RecordTick(stats_.get(), FILES_DELETED_IMMEDIATELY);
80 }
81 return s;
82 }
83
84 // Move file to trash
85 std::string trash_file;
86 Status s = MarkAsTrash(file_path, &trash_file);
87 ROCKS_LOG_INFO(info_log_, "Mark file: %s as trash -- %s", trash_file.c_str(),
88 s.ToString().c_str());
89
90 if (!s.ok()) {
91 ROCKS_LOG_ERROR(info_log_, "Failed to mark %s as trash -- %s",
92 file_path.c_str(), s.ToString().c_str());
93 s = fs_->DeleteFile(file_path, IOOptions(), nullptr);
94 if (s.ok()) {
95 s = sst_file_manager_->OnDeleteFile(file_path);
96 ROCKS_LOG_INFO(info_log_, "Deleted file %s immediately",
97 trash_file.c_str());
98 InstrumentedMutexLock l(&mu_);
99 RecordTick(stats_.get(), FILES_DELETED_IMMEDIATELY);
100 }
101 return s;
102 }
103
104 // Update the total trash size
105 uint64_t trash_file_size = 0;
106 IOStatus io_s =
107 fs_->GetFileSize(trash_file, IOOptions(), &trash_file_size, nullptr);
108 if (io_s.ok()) {
109 total_trash_size_.fetch_add(trash_file_size);
110 }
111 //**TODO: What should we do if we failed to
112 // get the file size?
113
114 // Add file to delete queue
115 {
116 InstrumentedMutexLock l(&mu_);
117 RecordTick(stats_.get(), FILES_MARKED_TRASH);
118 queue_.emplace(trash_file, dir_to_sync);
119 pending_files_++;
120 if (pending_files_ == 1) {
121 cv_.SignalAll();
122 }
123 }
124 return s;
125 }
126
GetBackgroundErrors()127 std::map<std::string, Status> DeleteScheduler::GetBackgroundErrors() {
128 InstrumentedMutexLock l(&mu_);
129 return bg_errors_;
130 }
131
132 const std::string DeleteScheduler::kTrashExtension = ".trash";
IsTrashFile(const std::string & file_path)133 bool DeleteScheduler::IsTrashFile(const std::string& file_path) {
134 return (file_path.size() >= kTrashExtension.size() &&
135 file_path.rfind(kTrashExtension) ==
136 file_path.size() - kTrashExtension.size());
137 }
138
CleanupDirectory(Env * env,SstFileManagerImpl * sfm,const std::string & path)139 Status DeleteScheduler::CleanupDirectory(Env* env, SstFileManagerImpl* sfm,
140 const std::string& path) {
141 Status s;
142 // Check if there are any files marked as trash in this path
143 std::vector<std::string> files_in_path;
144 s = env->GetChildren(path, &files_in_path);
145 if (!s.ok()) {
146 return s;
147 }
148 for (const std::string& current_file : files_in_path) {
149 if (!DeleteScheduler::IsTrashFile(current_file)) {
150 // not a trash file, skip
151 continue;
152 }
153
154 Status file_delete;
155 std::string trash_file = path + "/" + current_file;
156 if (sfm) {
157 // We have an SstFileManager that will schedule the file delete
158 s = sfm->OnAddFile(trash_file);
159 file_delete = sfm->ScheduleFileDeletion(trash_file, path);
160 } else {
161 // Delete the file immediately
162 file_delete = env->DeleteFile(trash_file);
163 }
164
165 if (s.ok() && !file_delete.ok()) {
166 s = file_delete;
167 }
168 }
169
170 return s;
171 }
172
MarkAsTrash(const std::string & file_path,std::string * trash_file)173 Status DeleteScheduler::MarkAsTrash(const std::string& file_path,
174 std::string* trash_file) {
175 // Sanity check of the path
176 size_t idx = file_path.rfind("/");
177 if (idx == std::string::npos || idx == file_path.size() - 1) {
178 return Status::InvalidArgument("file_path is corrupted");
179 }
180
181 if (DeleteScheduler::IsTrashFile(file_path)) {
182 // This is already a trash file
183 *trash_file = file_path;
184 return Status::OK();
185 }
186
187 *trash_file = file_path + kTrashExtension;
188 // TODO(tec) : Implement Env::RenameFileIfNotExist and remove
189 // file_move_mu mutex.
190 int cnt = 0;
191 Status s;
192 InstrumentedMutexLock l(&file_move_mu_);
193 while (true) {
194 s = fs_->FileExists(*trash_file, IOOptions(), nullptr);
195 if (s.IsNotFound()) {
196 // We found a path for our file in trash
197 s = fs_->RenameFile(file_path, *trash_file, IOOptions(), nullptr);
198 break;
199 } else if (s.ok()) {
200 // Name conflict, generate new random suffix
201 *trash_file = file_path + std::to_string(cnt) + kTrashExtension;
202 } else {
203 // Error during FileExists call, we cannot continue
204 break;
205 }
206 cnt++;
207 }
208 if (s.ok()) {
209 s = sst_file_manager_->OnMoveFile(file_path, *trash_file);
210 }
211 return s;
212 }
213
BackgroundEmptyTrash()214 void DeleteScheduler::BackgroundEmptyTrash() {
215 TEST_SYNC_POINT("DeleteScheduler::BackgroundEmptyTrash");
216
217 while (true) {
218 InstrumentedMutexLock l(&mu_);
219 while (queue_.empty() && !closing_) {
220 cv_.Wait();
221 }
222
223 if (closing_) {
224 return;
225 }
226
227 // Delete all files in queue_
228 uint64_t start_time = clock_->NowMicros();
229 uint64_t total_deleted_bytes = 0;
230 int64_t current_delete_rate = rate_bytes_per_sec_.load();
231 while (!queue_.empty() && !closing_) {
232 if (current_delete_rate != rate_bytes_per_sec_.load()) {
233 // User changed the delete rate
234 current_delete_rate = rate_bytes_per_sec_.load();
235 start_time = clock_->NowMicros();
236 total_deleted_bytes = 0;
237 ROCKS_LOG_INFO(info_log_, "rate_bytes_per_sec is changed to %" PRIi64,
238 current_delete_rate);
239 }
240
241 // Get new file to delete
242 const FileAndDir& fad = queue_.front();
243 std::string path_in_trash = fad.fname;
244
245 // We don't need to hold the lock while deleting the file
246 mu_.Unlock();
247 uint64_t deleted_bytes = 0;
248 bool is_complete = true;
249 // Delete file from trash and update total_penlty value
250 Status s =
251 DeleteTrashFile(path_in_trash, fad.dir, &deleted_bytes, &is_complete);
252 total_deleted_bytes += deleted_bytes;
253 mu_.Lock();
254 if (is_complete) {
255 queue_.pop();
256 }
257
258 if (!s.ok()) {
259 bg_errors_[path_in_trash] = s;
260 }
261
262 // Apply penalty if necessary
263 uint64_t total_penalty;
264 if (current_delete_rate > 0) {
265 // rate limiting is enabled
266 total_penalty =
267 ((total_deleted_bytes * kMicrosInSecond) / current_delete_rate);
268 ROCKS_LOG_INFO(info_log_,
269 "Rate limiting is enabled with penalty %" PRIu64
270 " after deleting file %s",
271 total_penalty, path_in_trash.c_str());
272 while (!closing_ && !cv_.TimedWait(start_time + total_penalty)) {
273 }
274 } else {
275 // rate limiting is disabled
276 total_penalty = 0;
277 ROCKS_LOG_INFO(info_log_,
278 "Rate limiting is disabled after deleting file %s",
279 path_in_trash.c_str());
280 }
281 TEST_SYNC_POINT_CALLBACK("DeleteScheduler::BackgroundEmptyTrash:Wait",
282 &total_penalty);
283
284 if (is_complete) {
285 pending_files_--;
286 }
287 if (pending_files_ == 0) {
288 // Unblock WaitForEmptyTrash since there are no more files waiting
289 // to be deleted
290 cv_.SignalAll();
291 }
292 }
293 }
294 }
295
DeleteTrashFile(const std::string & path_in_trash,const std::string & dir_to_sync,uint64_t * deleted_bytes,bool * is_complete)296 Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash,
297 const std::string& dir_to_sync,
298 uint64_t* deleted_bytes,
299 bool* is_complete) {
300 uint64_t file_size;
301 Status s = fs_->GetFileSize(path_in_trash, IOOptions(), &file_size, nullptr);
302 *is_complete = true;
303 TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:DeleteFile");
304 if (s.ok()) {
305 bool need_full_delete = true;
306 if (bytes_max_delete_chunk_ != 0 && file_size > bytes_max_delete_chunk_) {
307 uint64_t num_hard_links = 2;
308 // We don't have to worry aobut data race between linking a new
309 // file after the number of file link check and ftruncte because
310 // the file is now in trash and no hardlink is supposed to create
311 // to trash files by RocksDB.
312 Status my_status = fs_->NumFileLinks(path_in_trash, IOOptions(),
313 &num_hard_links, nullptr);
314 if (my_status.ok()) {
315 if (num_hard_links == 1) {
316 std::unique_ptr<FSWritableFile> wf;
317 my_status = fs_->ReopenWritableFile(path_in_trash, FileOptions(),
318 &wf, nullptr);
319 if (my_status.ok()) {
320 my_status = wf->Truncate(file_size - bytes_max_delete_chunk_,
321 IOOptions(), nullptr);
322 if (my_status.ok()) {
323 TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:Fsync");
324 my_status = wf->Fsync(IOOptions(), nullptr);
325 }
326 }
327 if (my_status.ok()) {
328 *deleted_bytes = bytes_max_delete_chunk_;
329 need_full_delete = false;
330 *is_complete = false;
331 } else {
332 ROCKS_LOG_WARN(info_log_,
333 "Failed to partially delete %s from trash -- %s",
334 path_in_trash.c_str(), my_status.ToString().c_str());
335 }
336 } else {
337 ROCKS_LOG_INFO(info_log_,
338 "Cannot delete %s slowly through ftruncate from trash "
339 "as it has other links",
340 path_in_trash.c_str());
341 }
342 } else if (!num_link_error_printed_) {
343 ROCKS_LOG_INFO(
344 info_log_,
345 "Cannot delete files slowly through ftruncate from trash "
346 "as Env::NumFileLinks() returns error: %s",
347 my_status.ToString().c_str());
348 num_link_error_printed_ = true;
349 }
350 }
351
352 if (need_full_delete) {
353 s = fs_->DeleteFile(path_in_trash, IOOptions(), nullptr);
354 if (!dir_to_sync.empty()) {
355 std::unique_ptr<FSDirectory> dir_obj;
356 if (s.ok()) {
357 s = fs_->NewDirectory(dir_to_sync, IOOptions(), &dir_obj, nullptr);
358 }
359 if (s.ok()) {
360 s = dir_obj->Fsync(IOOptions(), nullptr);
361 TEST_SYNC_POINT_CALLBACK(
362 "DeleteScheduler::DeleteTrashFile::AfterSyncDir",
363 reinterpret_cast<void*>(const_cast<std::string*>(&dir_to_sync)));
364 }
365 }
366 if (s.ok()) {
367 *deleted_bytes = file_size;
368 s = sst_file_manager_->OnDeleteFile(path_in_trash);
369 }
370 }
371 }
372 if (!s.ok()) {
373 // Error while getting file size or while deleting
374 ROCKS_LOG_ERROR(info_log_, "Failed to delete %s from trash -- %s",
375 path_in_trash.c_str(), s.ToString().c_str());
376 *deleted_bytes = 0;
377 } else {
378 total_trash_size_.fetch_sub(*deleted_bytes);
379 }
380
381 return s;
382 }
383
WaitForEmptyTrash()384 void DeleteScheduler::WaitForEmptyTrash() {
385 InstrumentedMutexLock l(&mu_);
386 while (pending_files_ > 0 && !closing_) {
387 cv_.Wait();
388 }
389 }
390
MaybeCreateBackgroundThread()391 void DeleteScheduler::MaybeCreateBackgroundThread() {
392 if (bg_thread_ == nullptr && rate_bytes_per_sec_.load() > 0) {
393 bg_thread_.reset(
394 new port::Thread(&DeleteScheduler::BackgroundEmptyTrash, this));
395 ROCKS_LOG_INFO(info_log_,
396 "Created background thread for deletion scheduler with "
397 "rate_bytes_per_sec: %" PRIi64,
398 rate_bytes_per_sec_.load());
399 }
400 }
401
402 } // namespace ROCKSDB_NAMESPACE
403
404 #endif // ROCKSDB_LITE
405