1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include <cinttypes>
10 
11 #include "db/builder.h"
12 #include "db/db_impl/db_impl.h"
13 #include "db/error_handler.h"
14 #include "db/event_helpers.h"
15 #include "file/sst_file_manager_impl.h"
16 #include "monitoring/iostats_context_imp.h"
17 #include "monitoring/perf_context_imp.h"
18 #include "monitoring/thread_status_updater.h"
19 #include "monitoring/thread_status_util.h"
20 #include "test_util/sync_point.h"
21 #include "util/cast_util.h"
22 #include "util/concurrent_task_limiter_impl.h"
23 
24 namespace ROCKSDB_NAMESPACE {
25 
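// Checks with the SstFileManager (when one is configured) whether there is
// enough free disk space to run the compaction described by `inputs`. If the
// check passes, *sfm_reserved_compact_space is set to true so the caller can
// later notify the SFM when the compaction finishes. If the check fails, the
// cancellation is logged and COMPACTION_CANCELLED is recorded. In
// ROCKSDB_LITE builds the check is skipped and the function returns true.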
26 bool DBImpl::EnoughRoomForCompaction(
27     ColumnFamilyData* cfd, const std::vector<CompactionInputFiles>& inputs,
28     bool* sfm_reserved_compact_space, LogBuffer* log_buffer) {
29   // Check if we have enough room to do the compaction
30   bool enough_room = true;
31 #ifndef ROCKSDB_LITE
32   auto sfm = static_cast<SstFileManagerImpl*>(
33       immutable_db_options_.sst_file_manager.get());
34   if (sfm) {
35     // Pass the current bg_error_ to SFM so it can decide what checks to
36     // perform. If this DB instance hasn't seen any error yet, the SFM can be
37     // optimistic and not do disk space checks
38     Status bg_error = error_handler_.GetBGError();
39     enough_room = sfm->EnoughRoomForCompaction(cfd, inputs, bg_error);
40     bg_error.PermitUncheckedError();  // bg_error is just a copy of the Status
41                                       // from the error_handler_
42     if (enough_room) {
43       *sfm_reserved_compact_space = true;
44     }
45   }
46 #else
47   (void)cfd;
48   (void)inputs;
49   (void)sfm_reserved_compact_space;
50 #endif  // ROCKSDB_LITE
51   if (!enough_room) {
52     // Just in case tests want to change the value of enough_room
53     TEST_SYNC_POINT_CALLBACK(
54         "DBImpl::BackgroundCompaction():CancelledCompaction", &enough_room);
55     ROCKS_LOG_BUFFER(log_buffer,
56                      "Cancelled compaction because not enough room");
57     RecordTick(stats_, COMPACTION_CANCELLED, 1);
58   }
59   return enough_room;
60 }
61 
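// Requests a token from the column family's compaction thread limiter.
// Returns true with *token left null if no limiter is configured, or true
// with a valid token if the limiter granted one (possibly because `force`
// was set); returns false if the request was rejected.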
62 bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force,
63                                     std::unique_ptr<TaskLimiterToken>* token,
64                                     LogBuffer* log_buffer) {
65   assert(*token == nullptr);
66   auto limiter = static_cast<ConcurrentTaskLimiterImpl*>(
67       cfd->ioptions()->compaction_thread_limiter.get());
68   if (limiter == nullptr) {
69     return true;
70   }
71   *token = limiter->GetToken(force);
72   if (*token != nullptr) {
73     ROCKS_LOG_BUFFER(log_buffer,
74                      "Thread limiter [%s] increase [%s] compaction task, "
75                      "force: %s, tasks after: %d",
76                      limiter->GetName().c_str(), cfd->GetName().c_str(),
77                      force ? "true" : "false", limiter->GetOutstandingTask());
78     return true;
79   }
80   return false;
81 }
82 
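// Syncs every WAL that has already been closed, i.e. every log whose number
// is smaller than the current logfile_number_. Waits for any in-flight sync
// of those logs, marks them as getting_synced, releases the db mutex while
// syncing, then re-acquires it and records the outcome via MarkLogsSynced()
// or MarkLogsNotSynced().
// REQUIRES: mutex_ held on entry; it may be released and re-acquired.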
83 IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
84   TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
85   mutex_.AssertHeld();
86   autovector<log::Writer*, 1> logs_to_sync;
87   uint64_t current_log_number = logfile_number_;
88   while (logs_.front().number < current_log_number &&
89          logs_.front().getting_synced) {
90     log_sync_cv_.Wait();
91   }
92   for (auto it = logs_.begin();
93        it != logs_.end() && it->number < current_log_number; ++it) {
94     auto& log = *it;
95     assert(!log.getting_synced);
96     log.getting_synced = true;
97     logs_to_sync.push_back(log.writer);
98   }
99 
100   IOStatus io_s;
101   if (!logs_to_sync.empty()) {
102     mutex_.Unlock();
103 
104     for (log::Writer* log : logs_to_sync) {
105       ROCKS_LOG_INFO(immutable_db_options_.info_log,
106                      "[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
107                      log->get_log_number());
108       io_s = log->file()->Sync(immutable_db_options_.use_fsync);
109       if (!io_s.ok()) {
110         break;
111       }
112 
113       if (immutable_db_options_.recycle_log_file_num > 0) {
114         io_s = log->Close();
115         if (!io_s.ok()) {
116           break;
117         }
118       }
119     }
120     if (io_s.ok()) {
121       io_s = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
122     }
123 
124     mutex_.Lock();
125 
126     // "number <= current_log_number - 1" is equivalent to
127     // "number < current_log_number".
128     if (io_s.ok()) {
129       io_s = status_to_io_status(MarkLogsSynced(current_log_number - 1, true));
130     } else {
131       MarkLogsNotSynced(current_log_number - 1);
132     }
133     if (!io_s.ok()) {
134       TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
135       return io_s;
136     }
137   }
138   TEST_SYNC_POINT("DBImpl::SyncClosedLogs:end");
139   return io_s;
140 }
141 
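// Flushes the immutable memtables of a single column family to an SST file.
// When the DB has more than one column family, all closed WALs are synced
// first (see the comment inside for the crash-consistency rationale). On
// success a new SuperVersion is installed and *made_progress is set; on
// failure the error is reported to the error handler with a reason that
// distinguishes WAL, MANIFEST and other SST write errors.
// REQUIRES: mutex_ held; FlushJob::Run() may release and re-acquire it.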
142 Status DBImpl::FlushMemTableToOutputFile(
143     ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
144     bool* made_progress, JobContext* job_context,
145     SuperVersionContext* superversion_context,
146     std::vector<SequenceNumber>& snapshot_seqs,
147     SequenceNumber earliest_write_conflict_snapshot,
148     SnapshotChecker* snapshot_checker, LogBuffer* log_buffer,
149     Env::Priority thread_pri) {
150   mutex_.AssertHeld();
151   assert(cfd);
152   assert(cfd->imm()->NumNotFlushed() != 0);
153   assert(cfd->imm()->IsFlushPending());
154 
155   FlushJob flush_job(
156       dbname_, cfd, immutable_db_options_, mutable_cf_options,
157       port::kMaxUint64 /* memtable_id */, file_options_for_compaction_,
158       versions_.get(), &mutex_, &shutting_down_, snapshot_seqs,
159       earliest_write_conflict_snapshot, snapshot_checker, job_context,
160       log_buffer, directories_.GetDbDir(), GetDataDir(cfd, 0U),
161       GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
162       &event_logger_, mutable_cf_options.report_bg_io_stats,
163       true /* sync_output_directory */, true /* write_manifest */, thread_pri,
164       io_tracer_, db_id_, db_session_id_, cfd->GetFullHistoryTsLow(),
165       &blob_callback_);
166   FileMetaData file_meta;
167 
168 #ifndef ROCKSDB_LITE
169   // may temporarily unlock and lock the mutex.
170   NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id);
171 #endif  // ROCKSDB_LITE
172 
173   Status s;
174   bool need_cancel = false;
175   IOStatus log_io_s = IOStatus::OK();
176   if (logfile_number_ > 0 &&
177       versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1) {
178     // If there is more than one column family, we need to make sure that
179     // all the log files except the most recent one are synced. Otherwise, if
180     // the host crashes after flushing and before the WAL is persisted, the
181     // flushed SST may contain data from write batches whose updates to
182     // other column families are missing.
183     // SyncClosedLogs() may unlock and re-lock the db_mutex.
184     log_io_s = SyncClosedLogs(job_context);
185     if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() &&
186         !log_io_s.IsColumnFamilyDropped()) {
187       error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlush);
188     }
189   } else {
190     TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Skip");
191   }
192   s = log_io_s;
193 
194   // If the log sync failed, we do not need to pick a memtable. Otherwise,
195   // num_flush_not_started_ would need to be rolled back.
196   TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables");
197   if (s.ok()) {
198     flush_job.PickMemTable();
199     need_cancel = true;
200   }
201   TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:AfterPickMemtables");
202 
203   // Within flush_job.Run, RocksDB may call the event listener to notify
204   // it of file creation and deletion.
205   //
206   // Note that flush_job.Run will unlock and lock the db_mutex,
207   // and EventListener callback will be called when the db_mutex
208   // is unlocked by the current thread.
209   if (s.ok()) {
210     s = flush_job.Run(&logs_with_prep_tracker_, &file_meta);
211     need_cancel = false;
212   }
213 
214   if (!s.ok() && need_cancel) {
215     flush_job.Cancel();
216   }
217   IOStatus io_s = IOStatus::OK();
218   io_s = flush_job.io_status();
219   if (s.ok()) {
220     s = io_s;
221   }
222 
223   if (s.ok()) {
224     InstallSuperVersionAndScheduleWork(cfd, superversion_context,
225                                        mutable_cf_options);
226     if (made_progress) {
227       *made_progress = true;
228     }
229 
230     const std::string& column_family_name = cfd->GetName();
231 
232     Version* const current = cfd->current();
233     assert(current);
234 
235     const VersionStorageInfo* const storage_info = current->storage_info();
236     assert(storage_info);
237 
238     VersionStorageInfo::LevelSummaryStorage tmp;
239     ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
240                      column_family_name.c_str(),
241                      storage_info->LevelSummary(&tmp));
242 
243     const auto& blob_files = storage_info->GetBlobFiles();
244     if (!blob_files.empty()) {
245       ROCKS_LOG_BUFFER(log_buffer,
246                        "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64
247                        "\n",
248                        column_family_name.c_str(), blob_files.begin()->first,
249                        blob_files.rbegin()->first);
250     }
251   }
252 
253   if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
254     if (!io_s.ok() && !io_s.IsShutdownInProgress() &&
255         !io_s.IsColumnFamilyDropped()) {
256       assert(log_io_s.ok());
257       // Error while writing to MANIFEST.
258       // In fact, versions_->io_status() can also be the result of renaming
259       // CURRENT file. With current code, it's just difficult to tell. So just
260       // be pessimistic and try write to a new MANIFEST.
261       // TODO: distinguish between MANIFEST write and CURRENT renaming
262       if (!versions_->io_status().ok()) {
263         // If the WAL sync succeeded (either the WAL size is 0 or there was
264         // no IO error), any MANIFEST write error is mapped to a soft error.
265         // TODO: kManifestWriteNoWAL and kFlushNoWAL are misleading. Refactor is
266         // needed.
267         error_handler_.SetBGError(io_s,
268                                   BackgroundErrorReason::kManifestWriteNoWAL);
269       } else {
270         // If the WAL sync succeeded (either the WAL size is 0 or there was
271         // no IO error), any other SST file write error is reported as
272         // kFlushNoWAL.
273         error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL);
274       }
275     } else {
276       if (log_io_s.ok()) {
277         Status new_bg_error = s;
278         error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
279       }
280     }
281   } else {
282     // If we got here, then we decided not to care about the io_s status
283     // (either we never needed it, or we are ignoring the flush job status).
284     io_s.PermitUncheckedError();
285   }
286   if (s.ok()) {
287 #ifndef ROCKSDB_LITE
288     // may temporarily unlock and lock the mutex.
289     NotifyOnFlushCompleted(cfd, mutable_cf_options,
290                            flush_job.GetCommittedFlushJobsInfo());
291     auto sfm = static_cast<SstFileManagerImpl*>(
292         immutable_db_options_.sst_file_manager.get());
293     if (sfm) {
294       // Notify sst_file_manager that a new file was added
295       std::string file_path = MakeTableFileName(
296           cfd->ioptions()->cf_paths[0].path, file_meta.fd.GetNumber());
297       // TODO (PR7798).  We should only add the file to the FileManager if it
298       // exists. Otherwise, some tests may fail.  Ignore the error in the
299       // interim.
300       sfm->OnAddFile(file_path).PermitUncheckedError();
301       if (sfm->IsMaxAllowedSpaceReached()) {
302         Status new_bg_error =
303             Status::SpaceLimit("Max allowed space was reached");
304         TEST_SYNC_POINT_CALLBACK(
305             "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
306             &new_bg_error);
307         error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
308       }
309     }
310 #endif  // ROCKSDB_LITE
311   }
312   TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:Finish");
313   return s;
314 }
315 
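// Background-flush entry point. With atomic_flush enabled, delegates to
// AtomicFlushMemTablesToOutputFiles() for all requested column families;
// otherwise exactly one BGFlushArg is expected and its column family is
// flushed via FlushMemTableToOutputFile().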
316 Status DBImpl::FlushMemTablesToOutputFiles(
317     const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
318     JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
319   if (immutable_db_options_.atomic_flush) {
320     return AtomicFlushMemTablesToOutputFiles(
321         bg_flush_args, made_progress, job_context, log_buffer, thread_pri);
322   }
323   assert(bg_flush_args.size() == 1);
324   std::vector<SequenceNumber> snapshot_seqs;
325   SequenceNumber earliest_write_conflict_snapshot;
326   SnapshotChecker* snapshot_checker;
327   GetSnapshotContext(job_context, &snapshot_seqs,
328                      &earliest_write_conflict_snapshot, &snapshot_checker);
329   const auto& bg_flush_arg = bg_flush_args[0];
330   ColumnFamilyData* cfd = bg_flush_arg.cfd_;
331   MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
332   SuperVersionContext* superversion_context =
333       bg_flush_arg.superversion_context_;
334   Status s = FlushMemTableToOutputFile(
335       cfd, mutable_cf_options, made_progress, job_context, superversion_context,
336       snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
337       log_buffer, thread_pri);
338   return s;
339 }
340 
341 /*
342  * Atomically flushes multiple column families.
343  *
344  * For each column family, all memtables with ID smaller than or equal to the
345  * ID specified in bg_flush_args will be flushed. Only after all column
346  * families finish flush will this function commit to MANIFEST. If any of the
347  * column families are not flushed successfully, this function does not have
348  * any side-effect on the state of the database.
349  */
350 Status DBImpl::AtomicFlushMemTablesToOutputFiles(
351     const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
352     JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
353   mutex_.AssertHeld();
354 
355   autovector<ColumnFamilyData*> cfds;
356   for (const auto& arg : bg_flush_args) {
357     cfds.emplace_back(arg.cfd_);
358   }
359 
360 #ifndef NDEBUG
361   for (const auto cfd : cfds) {
362     assert(cfd->imm()->NumNotFlushed() != 0);
363     assert(cfd->imm()->IsFlushPending());
364   }
365 #endif /* !NDEBUG */
366 
367   std::vector<SequenceNumber> snapshot_seqs;
368   SequenceNumber earliest_write_conflict_snapshot;
369   SnapshotChecker* snapshot_checker;
370   GetSnapshotContext(job_context, &snapshot_seqs,
371                      &earliest_write_conflict_snapshot, &snapshot_checker);
372 
373   autovector<FSDirectory*> distinct_output_dirs;
374   autovector<std::string> distinct_output_dir_paths;
375   std::vector<std::unique_ptr<FlushJob>> jobs;
376   std::vector<MutableCFOptions> all_mutable_cf_options;
377   int num_cfs = static_cast<int>(cfds.size());
378   all_mutable_cf_options.reserve(num_cfs);
379   for (int i = 0; i < num_cfs; ++i) {
380     auto cfd = cfds[i];
381     FSDirectory* data_dir = GetDataDir(cfd, 0U);
382     const std::string& curr_path = cfd->ioptions()->cf_paths[0].path;
383 
384     // Add to distinct output directories if eligible. Use linear search. Since
385     // the number of elements in the vector is not large, performance should be
386     // tolerable.
387     bool found = false;
388     for (const auto& path : distinct_output_dir_paths) {
389       if (path == curr_path) {
390         found = true;
391         break;
392       }
393     }
394     if (!found) {
395       distinct_output_dir_paths.emplace_back(curr_path);
396       distinct_output_dirs.emplace_back(data_dir);
397     }
398 
399     all_mutable_cf_options.emplace_back(*cfd->GetLatestMutableCFOptions());
400     const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back();
401     uint64_t max_memtable_id = bg_flush_args[i].max_memtable_id_;
402     jobs.emplace_back(new FlushJob(
403         dbname_, cfd, immutable_db_options_, mutable_cf_options,
404         max_memtable_id, file_options_for_compaction_, versions_.get(), &mutex_,
405         &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
406         snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
407         data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
408         stats_, &event_logger_, mutable_cf_options.report_bg_io_stats,
409         false /* sync_output_directory */, false /* write_manifest */,
410         thread_pri, io_tracer_, db_id_, db_session_id_,
411         cfd->GetFullHistoryTsLow()));
412   }
413 
414   std::vector<FileMetaData> file_meta(num_cfs);
415   Status s;
416   IOStatus log_io_s = IOStatus::OK();
417   assert(num_cfs == static_cast<int>(jobs.size()));
418 
419 #ifndef ROCKSDB_LITE
420   for (int i = 0; i != num_cfs; ++i) {
421     const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i);
422     // may temporarily unlock and lock the mutex.
423     NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options,
424                        job_context->job_id);
425   }
426 #endif /* !ROCKSDB_LITE */
427 
428   if (logfile_number_ > 0) {
429     // TODO (yanqin) investigate whether we should sync the closed logs for
430     // single column family case.
431     log_io_s = SyncClosedLogs(job_context);
432     if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() &&
433         !log_io_s.IsColumnFamilyDropped()) {
434       if (total_log_size_ > 0) {
435         error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlush);
436       } else {
437         // If the WAL is empty, we use a different error reason.
438         error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlushNoWAL);
439       }
440     }
441   }
442   s = log_io_s;
443 
444   // exec_status stores the execution status of flush_jobs as
445   // <bool /* executed */, Status /* status code */>
446   autovector<std::pair<bool, Status>> exec_status;
447   autovector<IOStatus> io_status;
448   std::vector<bool> pick_status;
449   for (int i = 0; i != num_cfs; ++i) {
450     // Initially all jobs are not executed, with status OK.
451     exec_status.emplace_back(false, Status::OK());
452     io_status.emplace_back(IOStatus::OK());
453     pick_status.push_back(false);
454   }
455 
456   if (s.ok()) {
457     for (int i = 0; i != num_cfs; ++i) {
458       jobs[i]->PickMemTable();
459       pick_status[i] = true;
460     }
461   }
462 
463   if (s.ok()) {
464     // TODO (yanqin): parallelize jobs with threads.
465     for (int i = 1; i != num_cfs; ++i) {
466       exec_status[i].second =
467           jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i]);
468       exec_status[i].first = true;
469       io_status[i] = jobs[i]->io_status();
470     }
471     if (num_cfs > 1) {
472       TEST_SYNC_POINT(
473           "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1");
474       TEST_SYNC_POINT(
475           "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2");
476     }
477     assert(exec_status.size() > 0);
478     assert(!file_meta.empty());
479     exec_status[0].second =
480         jobs[0]->Run(&logs_with_prep_tracker_, &file_meta[0]);
481     exec_status[0].first = true;
482     io_status[0] = jobs[0]->io_status();
483 
484     Status error_status;
485     for (const auto& e : exec_status) {
486       if (!e.second.ok()) {
487         s = e.second;
488         if (!e.second.IsShutdownInProgress() &&
489             !e.second.IsColumnFamilyDropped()) {
490           // If a flush job did not return OK, and the CF is not dropped, and
491           // the DB is not shutting down, then we have to return this result
492           // to the caller later.
493           error_status = e.second;
494         }
495       }
496     }
497 
498     s = error_status.ok() ? s : error_status;
499   }
500 
501   IOStatus io_s = IOStatus::OK();
502   if (io_s.ok()) {
503     IOStatus io_error = IOStatus::OK();
504     for (int i = 0; i != static_cast<int>(io_status.size()); i++) {
505       if (!io_status[i].ok() && !io_status[i].IsShutdownInProgress() &&
506           !io_status[i].IsColumnFamilyDropped()) {
507         io_error = io_status[i];
508       }
509     }
510     io_s = io_error;
511     if (s.ok() && !io_s.ok()) {
512       s = io_s;
513     }
514   }
515 
516   if (s.IsColumnFamilyDropped()) {
517     s = Status::OK();
518   }
519 
520   if (s.ok() || s.IsShutdownInProgress()) {
521     // Sync on all distinct output directories.
522     for (auto dir : distinct_output_dirs) {
523       if (dir != nullptr) {
524         Status error_status = dir->Fsync(IOOptions(), nullptr);
525         if (!error_status.ok()) {
526           s = error_status;
527           break;
528         }
529       }
530     }
531   } else {
532     // Need to undo atomic flush if something went wrong, i.e. s is not OK and
533     // it is not because of CF drop.
534     // Have to cancel the flush jobs that have NOT executed because we need to
535     // unref the versions.
536     for (int i = 0; i != num_cfs; ++i) {
537       if (pick_status[i] && !exec_status[i].first) {
538         jobs[i]->Cancel();
539       }
540     }
541     for (int i = 0; i != num_cfs; ++i) {
542       if (exec_status[i].second.ok() && exec_status[i].first) {
543         auto& mems = jobs[i]->GetMemTables();
544         cfds[i]->imm()->RollbackMemtableFlush(mems,
545                                               file_meta[i].fd.GetNumber());
546       }
547     }
548   }
549 
550   if (s.ok()) {
551     auto wait_to_install_func = [&]() {
552       bool ready = true;
553       for (size_t i = 0; i != cfds.size(); ++i) {
554         const auto& mems = jobs[i]->GetMemTables();
555         if (cfds[i]->IsDropped()) {
556           // If the column family is dropped, then do not wait.
557           continue;
558         } else if (!mems.empty() &&
559                    cfds[i]->imm()->GetEarliestMemTableID() < mems[0]->GetID()) {
560           // If a flush job needs to install the flush result for mems and
561           // mems[0] is not the earliest memtable, another thread must be
562           // installing flush results for the same column family, so the
563           // current thread needs to wait.
564           ready = false;
565           break;
566         } else if (mems.empty() && cfds[i]->imm()->GetEarliestMemTableID() <=
567                                        bg_flush_args[i].max_memtable_id_) {
568           // If a flush job does not need to install flush results, then it has
569           // to wait until all memtables up to max_memtable_id_ (inclusive) are
570           // installed.
571           ready = false;
572           break;
573         }
574       }
575       return ready;
576     };
577 
578     bool resuming_from_bg_err = error_handler_.IsDBStopped();
579     while ((!error_handler_.IsDBStopped() ||
580             error_handler_.GetRecoveryError().ok()) &&
581            !wait_to_install_func()) {
582       atomic_flush_install_cv_.Wait();
583     }
584 
585     s = resuming_from_bg_err ? error_handler_.GetRecoveryError()
586                              : error_handler_.GetBGError();
587   }
588 
589   if (s.ok()) {
590     autovector<ColumnFamilyData*> tmp_cfds;
591     autovector<const autovector<MemTable*>*> mems_list;
592     autovector<const MutableCFOptions*> mutable_cf_options_list;
593     autovector<FileMetaData*> tmp_file_meta;
594     for (int i = 0; i != num_cfs; ++i) {
595       const auto& mems = jobs[i]->GetMemTables();
596       if (!cfds[i]->IsDropped() && !mems.empty()) {
597         tmp_cfds.emplace_back(cfds[i]);
598         mems_list.emplace_back(&mems);
599         mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]);
600         tmp_file_meta.emplace_back(&file_meta[i]);
601       }
602     }
603 
604     s = InstallMemtableAtomicFlushResults(
605         nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
606         versions_.get(), &logs_with_prep_tracker_, &mutex_, tmp_file_meta,
607         &job_context->memtables_to_free, directories_.GetDbDir(), log_buffer);
608   }
609 
610   if (s.ok()) {
611     assert(num_cfs ==
612            static_cast<int>(job_context->superversion_contexts.size()));
613     for (int i = 0; i != num_cfs; ++i) {
614       assert(cfds[i]);
615 
616       if (cfds[i]->IsDropped()) {
617         continue;
618       }
619       InstallSuperVersionAndScheduleWork(cfds[i],
620                                          &job_context->superversion_contexts[i],
621                                          all_mutable_cf_options[i]);
622 
623       const std::string& column_family_name = cfds[i]->GetName();
624 
625       Version* const current = cfds[i]->current();
626       assert(current);
627 
628       const VersionStorageInfo* const storage_info = current->storage_info();
629       assert(storage_info);
630 
631       VersionStorageInfo::LevelSummaryStorage tmp;
632       ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
633                        column_family_name.c_str(),
634                        storage_info->LevelSummary(&tmp));
635 
636       const auto& blob_files = storage_info->GetBlobFiles();
637       if (!blob_files.empty()) {
638         ROCKS_LOG_BUFFER(log_buffer,
639                          "[%s] Blob file summary: head=%" PRIu64
640                          ", tail=%" PRIu64 "\n",
641                          column_family_name.c_str(), blob_files.begin()->first,
642                          blob_files.rbegin()->first);
643       }
644     }
645     if (made_progress) {
646       *made_progress = true;
647     }
648 #ifndef ROCKSDB_LITE
649     auto sfm = static_cast<SstFileManagerImpl*>(
650         immutable_db_options_.sst_file_manager.get());
651     assert(all_mutable_cf_options.size() == static_cast<size_t>(num_cfs));
652     for (int i = 0; s.ok() && i != num_cfs; ++i) {
653       if (cfds[i]->IsDropped()) {
654         continue;
655       }
656       NotifyOnFlushCompleted(cfds[i], all_mutable_cf_options[i],
657                              jobs[i]->GetCommittedFlushJobsInfo());
658       if (sfm) {
659         std::string file_path = MakeTableFileName(
660             cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber());
661         // TODO (PR7798).  We should only add the file to the FileManager if it
662         // exists. Otherwise, some tests may fail.  Ignore the error in the
663         // interim.
664         sfm->OnAddFile(file_path).PermitUncheckedError();
665         if (sfm->IsMaxAllowedSpaceReached() &&
666             error_handler_.GetBGError().ok()) {
667           Status new_bg_error =
668               Status::SpaceLimit("Max allowed space was reached");
669           error_handler_.SetBGError(new_bg_error,
670                                     BackgroundErrorReason::kFlush);
671         }
672       }
673     }
674 #endif  // ROCKSDB_LITE
675   }
676 
677   // Need to undo atomic flush if something went wrong, i.e. s is not OK and
678   // it is not because of CF drop.
679   if (!s.ok() && !s.IsColumnFamilyDropped()) {
680     if (!io_s.ok() && !io_s.IsColumnFamilyDropped()) {
681       assert(log_io_s.ok());
682       // Error while writing to MANIFEST.
683       // In fact, versions_->io_status() can also be the result of renaming
684       // CURRENT file. With current code, it's just difficult to tell. So just
685       // be pessimistic and try write to a new MANIFEST.
686       // TODO: distinguish between MANIFEST write and CURRENT renaming
687       if (!versions_->io_status().ok()) {
688         // If the WAL sync succeeded (either the WAL size is 0 or there was
689         // no IO error), any MANIFEST write error is mapped to a soft error.
690         // TODO: kManifestWriteNoWAL and kFlushNoWAL are misleading. Refactor
691         // is needed.
692         error_handler_.SetBGError(io_s,
693                                   BackgroundErrorReason::kManifestWriteNoWAL);
694       } else {
695         // If the WAL sync succeeded (either the WAL size is 0 or there was
696         // no IO error), any other SST file write error is reported as
697         // kFlushNoWAL.
698         error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL);
699       }
700     } else {
701       if (log_io_s.ok()) {
702         Status new_bg_error = s;
703         error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
704       }
705     }
706   }
707 
708   return s;
709 }
710 
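// Invokes EventListener::OnFlushBegin() on every registered listener. The db
// mutex is released while the listeners run and re-acquired afterwards.
// No-op if there are no listeners, if the DB is shutting down, or in
// ROCKSDB_LITE builds.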
711 void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
712                                 const MutableCFOptions& mutable_cf_options,
713                                 int job_id) {
714 #ifndef ROCKSDB_LITE
715   if (immutable_db_options_.listeners.size() == 0U) {
716     return;
717   }
718   mutex_.AssertHeld();
719   if (shutting_down_.load(std::memory_order_acquire)) {
720     return;
721   }
722   bool triggered_writes_slowdown =
723       (cfd->current()->storage_info()->NumLevelFiles(0) >=
724        mutable_cf_options.level0_slowdown_writes_trigger);
725   bool triggered_writes_stop =
726       (cfd->current()->storage_info()->NumLevelFiles(0) >=
727        mutable_cf_options.level0_stop_writes_trigger);
728   // release lock while notifying events
729   mutex_.Unlock();
730   {
731     FlushJobInfo info{};
732     info.cf_id = cfd->GetID();
733     info.cf_name = cfd->GetName();
734     // TODO(yhchiang): make db_paths dynamic in case flush does not
735     //                 go to L0 in the future.
736     const uint64_t file_number = file_meta->fd.GetNumber();
737     info.file_path =
738         MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_number);
739     info.file_number = file_number;
740     info.thread_id = env_->GetThreadID();
741     info.job_id = job_id;
742     info.triggered_writes_slowdown = triggered_writes_slowdown;
743     info.triggered_writes_stop = triggered_writes_stop;
744     info.smallest_seqno = file_meta->fd.smallest_seqno;
745     info.largest_seqno = file_meta->fd.largest_seqno;
746     info.flush_reason = cfd->GetFlushReason();
747     for (auto listener : immutable_db_options_.listeners) {
748       listener->OnFlushBegin(this, info);
749     }
750   }
751   mutex_.Lock();
752 // no need to signal bg_cv_ as it will be signaled at the end of the
753 // flush process.
754 #else
755   (void)cfd;
756   (void)file_meta;
757   (void)mutable_cf_options;
758   (void)job_id;
759 #endif  // ROCKSDB_LITE
760 }
761 
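// Invokes EventListener::OnFlushCompleted() for each committed flush job
// info after filling in the write-slowdown/stop flags, then clears the list.
// Like NotifyOnFlushBegin(), the db mutex is released around the callbacks.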
762 void DBImpl::NotifyOnFlushCompleted(
763     ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
764     std::list<std::unique_ptr<FlushJobInfo>>* flush_jobs_info) {
765 #ifndef ROCKSDB_LITE
766   assert(flush_jobs_info != nullptr);
767   if (immutable_db_options_.listeners.size() == 0U) {
768     return;
769   }
770   mutex_.AssertHeld();
771   if (shutting_down_.load(std::memory_order_acquire)) {
772     return;
773   }
774   bool triggered_writes_slowdown =
775       (cfd->current()->storage_info()->NumLevelFiles(0) >=
776        mutable_cf_options.level0_slowdown_writes_trigger);
777   bool triggered_writes_stop =
778       (cfd->current()->storage_info()->NumLevelFiles(0) >=
779        mutable_cf_options.level0_stop_writes_trigger);
780   // release lock while notifying events
781   mutex_.Unlock();
782   {
783     for (auto& info : *flush_jobs_info) {
784       info->triggered_writes_slowdown = triggered_writes_slowdown;
785       info->triggered_writes_stop = triggered_writes_stop;
786       for (auto listener : immutable_db_options_.listeners) {
787         listener->OnFlushCompleted(this, *info);
788       }
789     }
790     flush_jobs_info->clear();
791   }
792   mutex_.Lock();
793   // no need to signal bg_cv_ as it will be signaled at the end of the
794   // flush process.
795 #else
796   (void)cfd;
797   (void)mutable_cf_options;
798   (void)flush_jobs_info;
799 #endif  // ROCKSDB_LITE
800 }
801 
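// Public CompactRange entry point. For column families with user-defined
// timestamps, callers pass keys without timestamps; the range is widened by
// appending the maximum timestamp to `begin` and the minimum timestamp to
// `end` so every user key in [begin, end] is covered, then the work is
// delegated to CompactRangeInternal().
//
// Illustrative usage sketch (assumes an open DB* `db` and a column family
// handle `cfh`; not taken from this file):
//
//   CompactRangeOptions cro;
//   cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
//   Slice begin("a"), end("z");
//   Status s = db->CompactRange(cro, cfh, &begin, &end);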
802 Status DBImpl::CompactRange(const CompactRangeOptions& options,
803                             ColumnFamilyHandle* column_family,
804                             const Slice* begin_without_ts,
805                             const Slice* end_without_ts) {
806   if (manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
807     return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
808   }
809 
810   const Comparator* const ucmp = column_family->GetComparator();
811   assert(ucmp);
812   size_t ts_sz = ucmp->timestamp_size();
813   if (ts_sz == 0) {
814     return CompactRangeInternal(options, column_family, begin_without_ts,
815                                 end_without_ts);
816   }
817 
818   std::string begin_str;
819   std::string end_str;
820 
821   // CompactRange compacts all keys in [begin, end] inclusively. Append the
822   // maximum timestamp to include all `begin` keys, and the minimum timestamp
823   // to include all `end` keys.
824   if (begin_without_ts != nullptr) {
825     AppendKeyWithMaxTimestamp(&begin_str, *begin_without_ts, ts_sz);
826   }
827   if (end_without_ts != nullptr) {
828     AppendKeyWithMinTimestamp(&end_str, *end_without_ts, ts_sz);
829   }
830   Slice begin(begin_str);
831   Slice end(end_str);
832 
833   Slice* begin_with_ts = begin_without_ts ? &begin : nullptr;
834   Slice* end_with_ts = end_without_ts ? &end : nullptr;
835 
836   return CompactRangeInternal(options, column_family, begin_with_ts,
837                               end_with_ts);
838 }
839 
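// Persists a larger full_history_ts_low for the column family through a
// VersionEdit. Returns InvalidArgument if the new value would move the
// timestamp backwards relative to the currently recorded one.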
840 Status DBImpl::IncreaseFullHistoryTsLow(ColumnFamilyData* cfd,
841                                         std::string ts_low) {
842   VersionEdit edit;
843   edit.SetColumnFamily(cfd->GetID());
844   edit.SetFullHistoryTsLow(ts_low);
845 
846   InstrumentedMutexLock l(&mutex_);
847   std::string current_ts_low = cfd->GetFullHistoryTsLow();
848   const Comparator* ucmp = cfd->user_comparator();
849   if (!current_ts_low.empty() &&
850       ucmp->CompareTimestamp(ts_low, current_ts_low) < 0) {
851     return Status::InvalidArgument(
852         "Cannot decrease full_history_timestamp_low");
853   }
854 
855   return versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit,
856                                 &mutex_);
857 }
858 
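// Does the actual work of CompactRange(): validates the target path, applies
// an optional full_history_ts_low update, flushes memtables that overlap the
// requested range, then runs manual compactions level by level from the
// first to the last overlapping level (honoring the bottommost_level_compaction
// policy), and finally re-fits the result to options.target_level when
// change_level is set.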
859 Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
860                                     ColumnFamilyHandle* column_family,
861                                     const Slice* begin, const Slice* end) {
862   auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
863   auto cfd = cfh->cfd();
864 
865   if (options.target_path_id >= cfd->ioptions()->cf_paths.size()) {
866     return Status::InvalidArgument("Invalid target path ID");
867   }
868 
869   bool flush_needed = true;
870 
871   // Update full_history_ts_low if it's set
872   if (options.full_history_ts_low != nullptr &&
873       !options.full_history_ts_low->empty()) {
874     std::string ts_low = options.full_history_ts_low->ToString();
875     if (begin != nullptr || end != nullptr) {
876       return Status::InvalidArgument(
877           "Cannot specify compaction range with full_history_ts_low");
878     }
879     Status s = IncreaseFullHistoryTsLow(cfd, ts_low);
880     if (!s.ok()) {
881       LogFlush(immutable_db_options_.info_log);
882       return s;
883     }
884   }
885 
886   Status s;
887   if (begin != nullptr && end != nullptr) {
888     // TODO(ajkr): We could also optimize away the flush in certain cases where
889     // one/both sides of the interval are unbounded. But it requires more
890     // changes to RangesOverlapWithMemtables.
891     Range range(*begin, *end);
892     SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
893     s = cfd->RangesOverlapWithMemtables(
894         {range}, super_version, immutable_db_options_.allow_data_in_errors,
895         &flush_needed);
896     CleanupSuperVersion(super_version);
897   }
898 
899   if (s.ok() && flush_needed) {
900     FlushOptions fo;
901     fo.allow_write_stall = options.allow_write_stall;
902     if (immutable_db_options_.atomic_flush) {
903       autovector<ColumnFamilyData*> cfds;
904       mutex_.Lock();
905       SelectColumnFamiliesForAtomicFlush(&cfds);
906       mutex_.Unlock();
907       s = AtomicFlushMemTables(cfds, fo, FlushReason::kManualCompaction,
908                                false /* writes_stopped */);
909     } else {
910       s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction,
911                         false /* writes_stopped*/);
912     }
913     if (!s.ok()) {
914       LogFlush(immutable_db_options_.info_log);
915       return s;
916     }
917   }
918 
919   constexpr int kInvalidLevel = -1;
920   int final_output_level = kInvalidLevel;
921   bool exclusive = options.exclusive_manual_compaction;
922   if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
923       cfd->NumberLevels() > 1) {
924     // Always compact all files together.
925     final_output_level = cfd->NumberLevels() - 1;
926     // if the bottommost level is reserved
927     if (immutable_db_options_.allow_ingest_behind) {
928       final_output_level--;
929     }
930     s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
931                             final_output_level, options, begin, end, exclusive,
932                             false, port::kMaxUint64);
933   } else {
934     int first_overlapped_level = kInvalidLevel;
935     int max_overlapped_level = kInvalidLevel;
936     {
937       SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
938       Version* current_version = super_version->current;
939       ReadOptions ro;
940       ro.total_order_seek = true;
941       bool overlap;
942       for (int level = 0;
943            level < current_version->storage_info()->num_non_empty_levels();
944            level++) {
945         overlap = true;
946         if (begin != nullptr && end != nullptr) {
947           Status status = current_version->OverlapWithLevelIterator(
948               ro, file_options_, *begin, *end, level, &overlap);
949           if (!status.ok()) {
950             overlap = current_version->storage_info()->OverlapInLevel(
951                 level, begin, end);
952           }
953         } else {
954           overlap = current_version->storage_info()->OverlapInLevel(level,
955                                                                     begin, end);
956         }
957         if (overlap) {
958           if (first_overlapped_level == kInvalidLevel) {
959             first_overlapped_level = level;
960           }
961           max_overlapped_level = level;
962         }
963       }
964       CleanupSuperVersion(super_version);
965     }
966     if (s.ok() && first_overlapped_level != kInvalidLevel) {
967       // max_file_num_to_ignore can be used to filter out newly created SST
968       // files, useful for bottom level compaction in a manual compaction
969       uint64_t max_file_num_to_ignore = port::kMaxUint64;
970       uint64_t next_file_number = versions_->current_next_file_number();
971       final_output_level = max_overlapped_level;
972       int output_level;
973       for (int level = first_overlapped_level; level <= max_overlapped_level;
974            level++) {
975         bool disallow_trivial_move = false;
976         // If the compaction is universal or if we're compacting the
977         // bottommost level, the output level will be the same as the input one.
978         // Level 0 can never be the bottommost level (i.e. if all files are in
979         // level 0, we will compact to level 1).
980         if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
981             cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
982           output_level = level;
983         } else if (level == max_overlapped_level && level > 0) {
984           if (options.bottommost_level_compaction ==
985               BottommostLevelCompaction::kSkip) {
986             // Skip bottommost level compaction
987             continue;
988           } else if (options.bottommost_level_compaction ==
989                          BottommostLevelCompaction::kIfHaveCompactionFilter &&
990                      cfd->ioptions()->compaction_filter == nullptr &&
991                      cfd->ioptions()->compaction_filter_factory == nullptr) {
992             // Skip bottommost level compaction since we don't have a compaction
993             // filter
994             continue;
995           }
996           output_level = level;
997           // update max_file_num_to_ignore only for bottom level compaction
998           // because data in newly compacted files in middle levels may still
999           // need to be pushed down
1000           max_file_num_to_ignore = next_file_number;
1001         } else {
1002           output_level = level + 1;
1003           if (cfd->ioptions()->compaction_style == kCompactionStyleLevel &&
1004               cfd->ioptions()->level_compaction_dynamic_level_bytes &&
1005               level == 0) {
1006             output_level = ColumnFamilyData::kCompactToBaseLevel;
1007           }
1008           // if it's a BottommostLevel compaction and `kForce*` compaction is
1009           // set, disallow trivial move
1010           if (level == max_overlapped_level &&
1011               (options.bottommost_level_compaction ==
1012                    BottommostLevelCompaction::kForce ||
1013                options.bottommost_level_compaction ==
1014                    BottommostLevelCompaction::kForceOptimized)) {
1015             disallow_trivial_move = true;
1016           }
1017         }
1018         s = RunManualCompaction(cfd, level, output_level, options, begin, end,
1019                                 exclusive, disallow_trivial_move,
1020                                 max_file_num_to_ignore);
1021         if (!s.ok()) {
1022           break;
1023         }
1024         if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
1025           final_output_level = cfd->NumberLevels() - 1;
1026         } else if (output_level > final_output_level) {
1027           final_output_level = output_level;
1028         }
1029         TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1");
1030         TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2");
1031       }
1032     }
1033   }
1034   if (!s.ok() || final_output_level == kInvalidLevel) {
1035     LogFlush(immutable_db_options_.info_log);
1036     return s;
1037   }
1038 
1039   if (options.change_level) {
1040     TEST_SYNC_POINT("DBImpl::CompactRange:BeforeRefit:1");
1041     TEST_SYNC_POINT("DBImpl::CompactRange:BeforeRefit:2");
1042 
1043     ROCKS_LOG_INFO(immutable_db_options_.info_log,
1044                    "[RefitLevel] waiting for background threads to stop");
1045     DisableManualCompaction();
1046     s = PauseBackgroundWork();
1047     if (s.ok()) {
1048       TEST_SYNC_POINT("DBImpl::CompactRange:PreRefitLevel");
1049       s = ReFitLevel(cfd, final_output_level, options.target_level);
1050       TEST_SYNC_POINT("DBImpl::CompactRange:PostRefitLevel");
1051       // ContinueBackgroundWork always returns Status::OK().
1052       Status temp_s = ContinueBackgroundWork();
1053       assert(temp_s.ok());
1054     }
1055     EnableManualCompaction();
1056   }
1057   LogFlush(immutable_db_options_.info_log);
1058 
1059   {
1060     InstrumentedMutexLock l(&mutex_);
1061     // an automatic compaction that has been scheduled might have been
1062     // preempted by the manual compactions. Need to schedule it back.
1063     MaybeScheduleFlushOrCompaction();
1064   }
1065 
1066   return s;
1067 }
1068 
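// Public CompactFiles entry point (not supported in ROCKSDB_LITE). Takes the
// db mutex, waits for pending external file ingestions, runs
// CompactFilesImpl() against the current Version, and then finds and purges
// obsolete files (forcing a full scan if the compaction failed).
//
// Illustrative usage sketch (assumes `db`, a column family handle `cfh`, and
// SST file names obtained from GetColumnFamilyMetaData(); not from this
// file):
//
//   CompactionOptions copts;
//   std::vector<std::string> inputs = {"000012.sst", "000015.sst"};
//   Status s = db->CompactFiles(copts, cfh, inputs, /*output_level=*/1);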
1069 Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
1070                             ColumnFamilyHandle* column_family,
1071                             const std::vector<std::string>& input_file_names,
1072                             const int output_level, const int output_path_id,
1073                             std::vector<std::string>* const output_file_names,
1074                             CompactionJobInfo* compaction_job_info) {
1075 #ifdef ROCKSDB_LITE
1076   (void)compact_options;
1077   (void)column_family;
1078   (void)input_file_names;
1079   (void)output_level;
1080   (void)output_path_id;
1081   (void)output_file_names;
1082   (void)compaction_job_info;
1083   // not supported in lite version
1084   return Status::NotSupported("Not supported in ROCKSDB LITE");
1085 #else
1086   if (column_family == nullptr) {
1087     return Status::InvalidArgument("ColumnFamilyHandle must be non-null.");
1088   }
1089 
1090   auto cfd =
1091       static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
1092   assert(cfd);
1093 
1094   Status s;
1095   JobContext job_context(next_job_id_.fetch_add(1), true);
1096   LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
1097                        immutable_db_options_.info_log.get());
1098 
1099   // Perform CompactFiles
1100   TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile2");
1101   {
1102     InstrumentedMutexLock l(&mutex_);
1103 
1104     // This call will unlock/lock the mutex to wait for current running
1105     // IngestExternalFile() calls to finish.
1106     WaitForIngestFile();
1107 
1108     // We need to get current after `WaitForIngestFile`, because
1109     // `IngestExternalFile` may add files that overlap with `input_file_names`
1110     auto* current = cfd->current();
1111     current->Ref();
1112 
1113     s = CompactFilesImpl(compact_options, cfd, current, input_file_names,
1114                          output_file_names, output_level, output_path_id,
1115                          &job_context, &log_buffer, compaction_job_info);
1116 
1117     current->Unref();
1118   }
1119 
1120   // Find and delete obsolete files
1121   {
1122     InstrumentedMutexLock l(&mutex_);
1123     // If !s.ok(), this means that Compaction failed. In that case, we want
1124     // to delete all obsolete files we might have created and we force
1125     // FindObsoleteFiles(). This is because job_context does not
1126     // catch all created files if compaction failed.
1127     FindObsoleteFiles(&job_context, !s.ok());
1128   }  // release the mutex
1129 
1130   // delete unnecessary files if any, this is done outside the mutex
1131   if (job_context.HaveSomethingToClean() ||
1132       job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
1133     // Have to flush the info logs before bg_compaction_scheduled_--
1134     // because if bg_flush_scheduled_ becomes 0 and the lock is
1135     // released, the destructor of DB can kick in and destroy all the
1136     // state of the DB, so info_log might not be available after that point.
1137     // The same applies to accessing other state that the DB owns.
1138     log_buffer.FlushBufferToLog();
1139     if (job_context.HaveSomethingToDelete()) {
1140       // no mutex is locked here.  No need to Unlock() and Lock() here.
1141       PurgeObsoleteFiles(job_context);
1142     }
1143     job_context.Clean();
1144   }
1145 
1146   return s;
1147 #endif  // ROCKSDB_LITE
1148 }
1149 
1150 #ifndef ROCKSDB_LITE
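// Core implementation of CompactFiles(). Resolves the input file names to
// file numbers, sanitizes them through the compaction picker, verifies that
// none of them is already being compacted and that there is enough disk
// space, then builds and runs a CompactionJob with the db mutex released,
// and finally installs the result and updates background-error state as
// needed.
// REQUIRES: mutex_ held.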
1151 Status DBImpl::CompactFilesImpl(
1152     const CompactionOptions& compact_options, ColumnFamilyData* cfd,
1153     Version* version, const std::vector<std::string>& input_file_names,
1154     std::vector<std::string>* const output_file_names, const int output_level,
1155     int output_path_id, JobContext* job_context, LogBuffer* log_buffer,
1156     CompactionJobInfo* compaction_job_info) {
1157   mutex_.AssertHeld();
1158 
1159   if (shutting_down_.load(std::memory_order_acquire)) {
1160     return Status::ShutdownInProgress();
1161   }
1162   if (manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
1163     return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
1164   }
1165 
1166   std::unordered_set<uint64_t> input_set;
1167   for (const auto& file_name : input_file_names) {
1168     input_set.insert(TableFileNameToNumber(file_name));
1169   }
1170 
1171   ColumnFamilyMetaData cf_meta;
1172   // TODO(yhchiang): can directly use version here if none of the
1173   // following function calls is pluggable by external developers.
1174   version->GetColumnFamilyMetaData(&cf_meta);
1175 
1176   if (output_path_id < 0) {
1177     if (cfd->ioptions()->cf_paths.size() == 1U) {
1178       output_path_id = 0;
1179     } else {
1180       return Status::NotSupported(
1181           "Automatic output path selection is not "
1182           "yet supported in CompactFiles()");
1183     }
1184   }
1185 
1186   Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles(
1187       &input_set, cf_meta, output_level);
1188   if (!s.ok()) {
1189     return s;
1190   }
1191 
1192   std::vector<CompactionInputFiles> input_files;
1193   s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers(
1194       &input_files, &input_set, version->storage_info(), compact_options);
1195   if (!s.ok()) {
1196     return s;
1197   }
1198 
1199   for (const auto& inputs : input_files) {
1200     if (cfd->compaction_picker()->AreFilesInCompaction(inputs.files)) {
1201       return Status::Aborted(
1202           "Some of the necessary compaction input "
1203           "files are already being compacted");
1204     }
1205   }
1206   bool sfm_reserved_compact_space = false;
1207   // First check if we have enough room to do the compaction
1208   bool enough_room = EnoughRoomForCompaction(
1209       cfd, input_files, &sfm_reserved_compact_space, log_buffer);
1210 
1211   if (!enough_room) {
1212     // m's vars will get set properly at the end of this function,
1213     // as long as status == CompactionTooLarge
1214     return Status::CompactionTooLarge();
1215   }
1216 
1217   // At this point, CompactFiles will be run.
1218   bg_compaction_scheduled_++;
1219 
1220   std::unique_ptr<Compaction> c;
1221   assert(cfd->compaction_picker());
1222   c.reset(cfd->compaction_picker()->CompactFiles(
1223       compact_options, input_files, output_level, version->storage_info(),
1224       *cfd->GetLatestMutableCFOptions(), mutable_db_options_, output_path_id));
1225   // we already sanitized the set of input files and checked for conflicts
1226   // without releasing the lock, so we're guaranteed a compaction can be formed.
1227   assert(c != nullptr);
1228 
1229   c->SetInputVersion(version);
1230   // deletion compaction currently not allowed in CompactFiles.
1231   assert(!c->deletion_compaction());
1232 
1233   std::vector<SequenceNumber> snapshot_seqs;
1234   SequenceNumber earliest_write_conflict_snapshot;
1235   SnapshotChecker* snapshot_checker;
1236   GetSnapshotContext(job_context, &snapshot_seqs,
1237                      &earliest_write_conflict_snapshot, &snapshot_checker);
1238 
1239   std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
1240       new std::list<uint64_t>::iterator(
1241           CaptureCurrentFileNumberInPendingOutputs()));
1242 
1243   assert(is_snapshot_supported_ || snapshots_.empty());
1244   CompactionJobStats compaction_job_stats;
1245   CompactionJob compaction_job(
1246       job_context->job_id, c.get(), immutable_db_options_, mutable_db_options_,
1247       file_options_for_compaction_, versions_.get(), &shutting_down_,
1248       preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
1249       GetDataDir(c->column_family_data(), c->output_path_id()),
1250       GetDataDir(c->column_family_data(), 0), stats_, &mutex_, &error_handler_,
1251       snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
1252       table_cache_, &event_logger_,
1253       c->mutable_cf_options()->paranoid_file_checks,
1254       c->mutable_cf_options()->report_bg_io_stats, dbname_,
1255       &compaction_job_stats, Env::Priority::USER, io_tracer_,
1256       &manual_compaction_paused_, db_id_, db_session_id_,
1257       c->column_family_data()->GetFullHistoryTsLow());
1258 
1259   // Creating a compaction influences the compaction score because the score
1260   // takes running compactions into account (by skipping files that are already
1261   // being compacted). Since we just changed compaction score, we recalculate it
1262   // here.
1263   version->storage_info()->ComputeCompactionScore(*cfd->ioptions(),
1264                                                   *c->mutable_cf_options());
1265 
1266   compaction_job.Prepare();
1267 
1268   mutex_.Unlock();
1269   TEST_SYNC_POINT("CompactFilesImpl:0");
1270   TEST_SYNC_POINT("CompactFilesImpl:1");
1271   // Ignore the status here, as it will be checked in the Install down below...
1272   compaction_job.Run().PermitUncheckedError();
1273   TEST_SYNC_POINT("CompactFilesImpl:2");
1274   TEST_SYNC_POINT("CompactFilesImpl:3");
1275   mutex_.Lock();
1276 
1277   Status status = compaction_job.Install(*c->mutable_cf_options());
1278   if (status.ok()) {
1279     assert(compaction_job.io_status().ok());
1280     InstallSuperVersionAndScheduleWork(c->column_family_data(),
1281                                        &job_context->superversion_contexts[0],
1282                                        *c->mutable_cf_options());
1283   }
1284   // status above captures any error during compaction_job.Install, so it's ok
1285   // not to check compaction_job.io_status() explicitly if we're not calling
1286   // SetBGError.
1287   compaction_job.io_status().PermitUncheckedError();
1288   c->ReleaseCompactionFiles(s);
1289 #ifndef ROCKSDB_LITE
1290   // Need to make sure SstFileManager does its bookkeeping
1291   auto sfm = static_cast<SstFileManagerImpl*>(
1292       immutable_db_options_.sst_file_manager.get());
1293   if (sfm && sfm_reserved_compact_space) {
1294     sfm->OnCompactionCompletion(c.get());
1295   }
1296 #endif  // ROCKSDB_LITE
1297 
1298   ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
1299 
1300   if (compaction_job_info != nullptr) {
1301     BuildCompactionJobInfo(cfd, c.get(), s, compaction_job_stats,
1302                            job_context->job_id, version, compaction_job_info);
1303   }
1304 
1305   if (status.ok()) {
1306     // Done
1307   } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) {
1308     // Ignore compaction errors found during shutting down
1309   } else if (status.IsManualCompactionPaused()) {
1310     // Don't report stopping manual compaction as error
1311     ROCKS_LOG_INFO(immutable_db_options_.info_log,
1312                    "[%s] [JOB %d] Stopping manual compaction",
1313                    c->column_family_data()->GetName().c_str(),
1314                    job_context->job_id);
1315   } else {
1316     ROCKS_LOG_WARN(immutable_db_options_.info_log,
1317                    "[%s] [JOB %d] Compaction error: %s",
1318                    c->column_family_data()->GetName().c_str(),
1319                    job_context->job_id, status.ToString().c_str());
1320     IOStatus io_s = compaction_job.io_status();
1321     if (!io_s.ok()) {
1322       error_handler_.SetBGError(io_s, BackgroundErrorReason::kCompaction);
1323     } else {
1324       error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
1325     }
1326   }
1327 
1328   if (output_file_names != nullptr) {
1329     for (const auto& newf : c->edit()->GetNewFiles()) {
1330       (*output_file_names)
1331           .push_back(TableFileName(c->immutable_cf_options()->cf_paths,
1332                                    newf.second.fd.GetNumber(),
1333                                    newf.second.fd.GetPathId()));
1334     }
1335   }
1336 
1337   c.reset();
1338 
1339   bg_compaction_scheduled_--;
1340   if (bg_compaction_scheduled_ == 0) {
1341     bg_cv_.SignalAll();
1342   }
1343   MaybeScheduleFlushOrCompaction();
1344   TEST_SYNC_POINT("CompactFilesImpl:End");
1345 
1346   return status;
1347 }
1348 #endif  // ROCKSDB_LITE
1349 
1350 Status DBImpl::PauseBackgroundWork() {
1351   InstrumentedMutexLock guard_lock(&mutex_);
1352   bg_compaction_paused_++;
1353   while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0 ||
1354          bg_flush_scheduled_ > 0) {
1355     bg_cv_.Wait();
1356   }
1357   bg_work_paused_++;
1358   return Status::OK();
1359 }
1360 
1361 Status DBImpl::ContinueBackgroundWork() {
1362   InstrumentedMutexLock guard_lock(&mutex_);
1363   if (bg_work_paused_ == 0) {
1364     return Status::InvalidArgument();
1365   }
1366   assert(bg_work_paused_ > 0);
1367   assert(bg_compaction_paused_ > 0);
1368   bg_compaction_paused_--;
1369   bg_work_paused_--;
1370   // It's sufficient to check just bg_work_paused_ here since
1371   // bg_work_paused_ is always no greater than bg_compaction_paused_
1372   if (bg_work_paused_ == 0) {
1373     MaybeScheduleFlushOrCompaction();
1374   }
1375   return Status::OK();
1376 }
1377 
1378 void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
1379                                      const Status& st,
1380                                      const CompactionJobStats& job_stats,
1381                                      int job_id) {
1382 #ifndef ROCKSDB_LITE
1383   if (immutable_db_options_.listeners.empty()) {
1384     return;
1385   }
1386   mutex_.AssertHeld();
1387   if (shutting_down_.load(std::memory_order_acquire)) {
1388     return;
1389   }
1390   if (c->is_manual_compaction() &&
1391       manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
1392     return;
1393   }
1394   Version* current = cfd->current();
1395   current->Ref();
1396   // release lock while notifying events
1397   mutex_.Unlock();
1398   TEST_SYNC_POINT("DBImpl::NotifyOnCompactionBegin::UnlockMutex");
1399   {
1400     CompactionJobInfo info{};
1401     BuildCompactionJobInfo(cfd, c, st, job_stats, job_id, current, &info);
1402     for (auto listener : immutable_db_options_.listeners) {
1403       listener->OnCompactionBegin(this, info);
1404     }
1405     info.status.PermitUncheckedError();
1406   }
1407   mutex_.Lock();
1408   current->Unref();
1409 #else
1410   (void)cfd;
1411   (void)c;
1412   (void)st;
1413   (void)job_stats;
1414   (void)job_id;
1415 #endif  // ROCKSDB_LITE
1416 }
1417 
1418 void DBImpl::NotifyOnCompactionCompleted(
1419     ColumnFamilyData* cfd, Compaction* c, const Status& st,
1420     const CompactionJobStats& compaction_job_stats, const int job_id) {
1421 #ifndef ROCKSDB_LITE
1422   if (immutable_db_options_.listeners.empty()) {
1423     return;
1424   }
1425   mutex_.AssertHeld();
1426   if (shutting_down_.load(std::memory_order_acquire)) {
1427     return;
1428   }
1429   if (c->is_manual_compaction() &&
1430       manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
1431     return;
1432   }
1433   Version* current = cfd->current();
1434   current->Ref();
1435   // release lock while notifying events
1436   mutex_.Unlock();
1437   TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex");
1438   {
1439     CompactionJobInfo info{};
1440     BuildCompactionJobInfo(cfd, c, st, compaction_job_stats, job_id, current,
1441                            &info);
1442     for (auto listener : immutable_db_options_.listeners) {
1443       listener->OnCompactionCompleted(this, info);
1444     }
1445   }
1446   mutex_.Lock();
1447   current->Unref();
1448   // no need to signal bg_cv_ as it will be signaled at the end of the
1449   // compaction process.
1450 #else
1451   (void)cfd;
1452   (void)c;
1453   (void)st;
1454   (void)compaction_job_stats;
1455   (void)job_id;
1456 #endif  // ROCKSDB_LITE
1457 }
1458 
1459 // REQUIREMENT: block all background work by calling PauseBackgroundWork()
1460 // before calling this function
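// A minimal caller-side sketch (assuming an existing DBImpl* db and a
// ColumnFamilyData* cfd):
//   db->PauseBackgroundWork();
//   Status s = db->ReFitLevel(cfd, /*level=*/1, /*target_level=*/2);
//   db->ContinueBackgroundWork();
// Pausing background work first is what the requirement above refers to.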
1461 Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
1462   assert(level < cfd->NumberLevels());
1463   if (target_level >= cfd->NumberLevels()) {
1464     return Status::InvalidArgument("Target level exceeds number of levels");
1465   }
1466 
1467   SuperVersionContext sv_context(/* create_superversion */ true);
1468 
1469   InstrumentedMutexLock guard_lock(&mutex_);
1470 
1471   // only allow one thread refitting
1472   if (refitting_level_) {
1473     ROCKS_LOG_INFO(immutable_db_options_.info_log,
1474                    "[ReFitLevel] another thread is refitting");
1475     return Status::NotSupported("another thread is refitting");
1476   }
1477   refitting_level_ = true;
1478 
1479   const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
1480   // move to a smaller level
1481   int to_level = target_level;
1482   if (target_level < 0) {
1483     to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level);
1484   }
1485 
1486   auto* vstorage = cfd->current()->storage_info();
1487   if (to_level != level) {
1488     if (to_level > level) {
1489       if (level == 0) {
1490         refitting_level_ = false;
1491         return Status::NotSupported(
1492             "Cannot change from level 0 to other levels.");
1493       }
1494       // Check levels are empty for a trivial move
1495       for (int l = level + 1; l <= to_level; l++) {
1496         if (vstorage->NumLevelFiles(l) > 0) {
1497           refitting_level_ = false;
1498           return Status::NotSupported(
1499               "Levels between source and target are not empty for a move.");
1500         }
1501       }
1502     } else {
1503       // to_level < level
1504       // Check levels are empty for a trivial move
1505       for (int l = to_level; l < level; l++) {
1506         if (vstorage->NumLevelFiles(l) > 0) {
1507           refitting_level_ = false;
1508           return Status::NotSupported(
1509               "Levels between source and target are not empty for a move.");
1510         }
1511       }
1512     }
1513     ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1514                     "[%s] Before refitting:\n%s", cfd->GetName().c_str(),
1515                     cfd->current()->DebugString().data());
1516 
1517     VersionEdit edit;
1518     edit.SetColumnFamily(cfd->GetID());
1519     for (const auto& f : vstorage->LevelFiles(level)) {
1520       edit.DeleteFile(level, f->fd.GetNumber());
1521       edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
1522                    f->fd.GetFileSize(), f->smallest, f->largest,
1523                    f->fd.smallest_seqno, f->fd.largest_seqno,
1524                    f->marked_for_compaction, f->oldest_blob_file_number,
1525                    f->oldest_ancester_time, f->file_creation_time,
1526                    f->file_checksum, f->file_checksum_func_name);
1527     }
1528     ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1529                     "[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
1530                     edit.DebugString().data());
1531 
1532     Status status = versions_->LogAndApply(cfd, mutable_cf_options, &edit,
1533                                            &mutex_, directories_.GetDbDir());
1534 
1535     InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options);
1536 
1537     ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n",
1538                     cfd->GetName().c_str(), status.ToString().data());
1539 
1540     if (status.ok()) {
1541       ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1542                       "[%s] After refitting:\n%s", cfd->GetName().c_str(),
1543                       cfd->current()->DebugString().data());
1544     }
1545     sv_context.Clean();
1546     refitting_level_ = false;
1547 
1548     return status;
1549   }
1550 
1551   refitting_level_ = false;
1552   return Status::OK();
1553 }
1554 
1555 int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) {
1556   auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
1557   return cfh->cfd()->NumberLevels();
1558 }
1559 
1560 int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* /*column_family*/) {
1561   return 0;
1562 }
1563 
1564 int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
1565   auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
1566   InstrumentedMutexLock l(&mutex_);
1567   return cfh->cfd()
1568       ->GetSuperVersion()
1569       ->mutable_cf_options.level0_stop_writes_trigger;
1570 }
1571 
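// DB::Flush() entry point for a single column family. A minimal client-side
// usage sketch (assuming an open DB* db):
//   db->Flush(FlushOptions(), db->DefaultColumnFamily());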
1572 Status DBImpl::Flush(const FlushOptions& flush_options,
1573                      ColumnFamilyHandle* column_family) {
1574   auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
1575   ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.",
1576                  cfh->GetName().c_str());
1577   Status s;
1578   if (immutable_db_options_.atomic_flush) {
1579     s = AtomicFlushMemTables({cfh->cfd()}, flush_options,
1580                              FlushReason::kManualFlush);
1581   } else {
1582     s = FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush);
1583   }
1584 
1585   ROCKS_LOG_INFO(immutable_db_options_.info_log,
1586                  "[%s] Manual flush finished, status: %s\n",
1587                  cfh->GetName().c_str(), s.ToString().c_str());
1588   return s;
1589 }
1590 
1591 Status DBImpl::Flush(const FlushOptions& flush_options,
1592                      const std::vector<ColumnFamilyHandle*>& column_families) {
1593   Status s;
1594   if (!immutable_db_options_.atomic_flush) {
1595     for (auto cfh : column_families) {
1596       s = Flush(flush_options, cfh);
1597       if (!s.ok()) {
1598         break;
1599       }
1600     }
1601   } else {
1602     ROCKS_LOG_INFO(immutable_db_options_.info_log,
1603                    "Manual atomic flush start.\n"
1604                    "=====Column families:=====");
1605     for (auto cfh : column_families) {
1606       auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
1607       ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
1608                      cfhi->GetName().c_str());
1609     }
1610     ROCKS_LOG_INFO(immutable_db_options_.info_log,
1611                    "=====End of column families list=====");
1612     autovector<ColumnFamilyData*> cfds;
1613     std::for_each(column_families.begin(), column_families.end(),
1614                   [&cfds](ColumnFamilyHandle* elem) {
1615                     auto cfh = static_cast<ColumnFamilyHandleImpl*>(elem);
1616                     cfds.emplace_back(cfh->cfd());
1617                   });
1618     s = AtomicFlushMemTables(cfds, flush_options, FlushReason::kManualFlush);
1619     ROCKS_LOG_INFO(immutable_db_options_.info_log,
1620                    "Manual atomic flush finished, status: %s\n"
1621                    "=====Column families:=====",
1622                    s.ToString().c_str());
1623     for (auto cfh : column_families) {
1624       auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
1625       ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
1626                      cfhi->GetName().c_str());
1627     }
1628     ROCKS_LOG_INFO(immutable_db_options_.info_log,
1629                    "=====End of column families list=====");
1630   }
1631   return s;
1632 }
1633 
1634 Status DBImpl::RunManualCompaction(
1635     ColumnFamilyData* cfd, int input_level, int output_level,
1636     const CompactRangeOptions& compact_range_options, const Slice* begin,
1637     const Slice* end, bool exclusive, bool disallow_trivial_move,
1638     uint64_t max_file_num_to_ignore) {
1639   assert(input_level == ColumnFamilyData::kCompactAllLevels ||
1640          input_level >= 0);
1641 
1642   InternalKey begin_storage, end_storage;
1643   CompactionArg* ca;
1644 
1645   bool scheduled = false;
1646   bool manual_conflict = false;
1647   ManualCompactionState manual;
1648   manual.cfd = cfd;
1649   manual.input_level = input_level;
1650   manual.output_level = output_level;
1651   manual.output_path_id = compact_range_options.target_path_id;
1652   manual.done = false;
1653   manual.in_progress = false;
1654   manual.incomplete = false;
1655   manual.exclusive = exclusive;
1656   manual.disallow_trivial_move = disallow_trivial_move;
1657   // For universal compaction, we enforce every manual compaction to compact
1658   // all files.
1659   if (begin == nullptr ||
1660       cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
1661       cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
1662     manual.begin = nullptr;
1663   } else {
1664     begin_storage.SetMinPossibleForUserKey(*begin);
1665     manual.begin = &begin_storage;
1666   }
1667   if (end == nullptr ||
1668       cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
1669       cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
1670     manual.end = nullptr;
1671   } else {
1672     end_storage.SetMaxPossibleForUserKey(*end);
1673     manual.end = &end_storage;
1674   }
1675 
1676   TEST_SYNC_POINT("DBImpl::RunManualCompaction:0");
1677   TEST_SYNC_POINT("DBImpl::RunManualCompaction:1");
1678   InstrumentedMutexLock l(&mutex_);
1679 
1680   // When a manual compaction arrives, temporarily disable scheduling of
1681   // non-manual compactions and wait until the number of scheduled compaction
1682   // jobs drops to zero. This is needed to ensure that this manual compaction
1683   // can compact any range of keys/files.
1684   //
1685   // HasPendingManualCompaction() is true when at least one thread is inside
1686   // RunManualCompaction(), i.e. during that time no other compaction will
1687   // get scheduled (see MaybeScheduleFlushOrCompaction).
1688   //
1689   // Note that the following loop doesn't stop more than one thread calling
1690   // RunManualCompaction() from getting to the second while loop below.
1691   // However, only one of them will actually schedule compaction, while
1692   // others will wait on a condition variable until it completes.
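  // Put differently: an exclusive manual compaction first drains all already
  // scheduled background compactions (the loop below), while a non-exclusive
  // one proceeds and relies on the manual_conflict detection inside
  // CompactRange() to serialize against overlapping work.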
1693 
1694   AddManualCompaction(&manual);
1695   TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_);
1696   if (exclusive) {
1697     while (bg_bottom_compaction_scheduled_ > 0 ||
1698            bg_compaction_scheduled_ > 0) {
1699       TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled");
1700       ROCKS_LOG_INFO(
1701           immutable_db_options_.info_log,
1702           "[%s] Manual compaction waiting for all other scheduled background "
1703           "compactions to finish",
1704           cfd->GetName().c_str());
1705       bg_cv_.Wait();
1706     }
1707   }
1708 
1709   ROCKS_LOG_INFO(immutable_db_options_.info_log,
1710                  "[%s] Manual compaction starting", cfd->GetName().c_str());
1711 
1712   LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
1713                        immutable_db_options_.info_log.get());
1714   // We don't check bg_error_ here, because if we get the error in compaction,
1715   // the compaction will set manual.status to bg_error_ and set manual.done to
1716   // true.
1717   while (!manual.done) {
1718     assert(HasPendingManualCompaction());
1719     manual_conflict = false;
1720     Compaction* compaction = nullptr;
1721     if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) ||
1722         scheduled ||
1723         (((manual.manual_end = &manual.tmp_storage1) != nullptr) &&
1724          ((compaction = manual.cfd->CompactRange(
1725                *manual.cfd->GetLatestMutableCFOptions(), mutable_db_options_,
1726                manual.input_level, manual.output_level, compact_range_options,
1727                manual.begin, manual.end, &manual.manual_end, &manual_conflict,
1728                max_file_num_to_ignore)) == nullptr &&
1729           manual_conflict))) {
1730       // exclusive manual compactions should not see a conflict during
1731       // CompactRange
1732       assert(!exclusive || !manual_conflict);
1733       // Running either this or some other manual compaction
1734       bg_cv_.Wait();
1735       if (scheduled && manual.incomplete == true) {
1736         assert(!manual.in_progress);
1737         scheduled = false;
1738         manual.incomplete = false;
1739       }
1740     } else if (!scheduled) {
1741       if (compaction == nullptr) {
1742         manual.done = true;
1743         bg_cv_.SignalAll();
1744         continue;
1745       }
1746       ca = new CompactionArg;
1747       ca->db = this;
1748       ca->compaction_pri_ = Env::Priority::LOW;
1749       ca->prepicked_compaction = new PrepickedCompaction;
1750       ca->prepicked_compaction->manual_compaction_state = &manual;
1751       ca->prepicked_compaction->compaction = compaction;
1752       if (!RequestCompactionToken(
1753               cfd, true, &ca->prepicked_compaction->task_token, &log_buffer)) {
1754         // Don't throttle manual compaction, only count outstanding tasks.
1755         assert(false);
1756       }
1757       manual.incomplete = false;
1758       bg_compaction_scheduled_++;
1759       Env::Priority thread_pool_pri = Env::Priority::LOW;
1760       if (compaction->bottommost_level() &&
1761           env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
1762         thread_pool_pri = Env::Priority::BOTTOM;
1763       }
1764       env_->Schedule(&DBImpl::BGWorkCompaction, ca, thread_pool_pri, this,
1765                      &DBImpl::UnscheduleCompactionCallback);
1766       scheduled = true;
1767     }
1768   }
1769 
1770   log_buffer.FlushBufferToLog();
1771   assert(!manual.in_progress);
1772   assert(HasPendingManualCompaction());
1773   RemoveManualCompaction(&manual);
1774   bg_cv_.SignalAll();
1775   return manual.status;
1776 }
1777 
1778 void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
1779                                   FlushRequest* req) {
1780   assert(req != nullptr);
1781   req->reserve(cfds.size());
1782   for (const auto cfd : cfds) {
1783     if (nullptr == cfd) {
1784       // cfd may be null, see DBImpl::ScheduleFlushes
1785       continue;
1786     }
1787     uint64_t max_memtable_id = cfd->imm()->GetLatestMemTableID();
1788     req->emplace_back(cfd, max_memtable_id);
1789   }
1790 }
1791 
1792 Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
1793                              const FlushOptions& flush_options,
1794                              FlushReason flush_reason, bool writes_stopped) {
1795   // This method should not be called if atomic_flush is true.
1796   assert(!immutable_db_options_.atomic_flush);
1797   Status s;
1798   if (!flush_options.allow_write_stall) {
1799     bool flush_needed = true;
1800     s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
1801     TEST_SYNC_POINT("DBImpl::FlushMemTable:StallWaitDone");
1802     if (!s.ok() || !flush_needed) {
1803       return s;
1804     }
1805   }
1806 
1807   autovector<FlushRequest> flush_reqs;
1808   autovector<uint64_t> memtable_ids_to_wait;
1809   {
1810     WriteContext context;
1811     InstrumentedMutexLock guard_lock(&mutex_);
1812 
1813     WriteThread::Writer w;
1814     WriteThread::Writer nonmem_w;
1815     if (!writes_stopped) {
1816       write_thread_.EnterUnbatched(&w, &mutex_);
1817       if (two_write_queues_) {
1818         nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
1819       }
1820     }
1821     WaitForPendingWrites();
1822 
1823     if (flush_reason != FlushReason::kErrorRecoveryRetryFlush &&
1824         (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load())) {
1825       // Note that, when flush reason is kErrorRecoveryRetryFlush, during the
1826       // auto retry resume, we want to avoid creating new small memtables.
1827       // Therefore, SwitchMemtable will not be called. Also, since ResumeImpl
1828       // will iterate through all the CFs and call FlushMemTable during auto
1829       // retry resume, it is possible that in some CFs
1830       // cfd->imm()->NumNotFlushed() == 0. In that case, no flush request will
1831       // be created or scheduled, and Status::OK() will be returned.
1832       s = SwitchMemtable(cfd, &context);
1833     }
1834     const uint64_t flush_memtable_id = port::kMaxUint64;
1835     if (s.ok()) {
1836       if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
1837           !cached_recoverable_state_empty_.load()) {
1838         FlushRequest req{{cfd, flush_memtable_id}};
1839         flush_reqs.emplace_back(std::move(req));
1840         memtable_ids_to_wait.emplace_back(cfd->imm()->GetLatestMemTableID());
1841       }
1842       if (immutable_db_options_.persist_stats_to_disk &&
1843           flush_reason != FlushReason::kErrorRecoveryRetryFlush) {
1844         ColumnFamilyData* cfd_stats =
1845             versions_->GetColumnFamilySet()->GetColumnFamily(
1846                 kPersistentStatsColumnFamilyName);
1847         if (cfd_stats != nullptr && cfd_stats != cfd &&
1848             !cfd_stats->mem()->IsEmpty()) {
1849           // only force flush stats CF when it will be the only CF lagging
1850           // behind after the current flush
1851           bool stats_cf_flush_needed = true;
1852           for (auto* loop_cfd : *versions_->GetColumnFamilySet()) {
1853             if (loop_cfd == cfd_stats || loop_cfd == cfd) {
1854               continue;
1855             }
1856             if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) {
1857               stats_cf_flush_needed = false;
1858             }
1859           }
1860           if (stats_cf_flush_needed) {
1861             ROCKS_LOG_INFO(immutable_db_options_.info_log,
1862                            "Force flushing stats CF with manual flush of %s "
1863                            "to avoid holding old logs",
1864                            cfd->GetName().c_str());
1865             s = SwitchMemtable(cfd_stats, &context);
1866             FlushRequest req{{cfd_stats, flush_memtable_id}};
1867             flush_reqs.emplace_back(std::move(req));
1868             memtable_ids_to_wait.emplace_back(
1869                 cfd_stats->imm()->GetLatestMemTableID());
1870           }
1871         }
1872       }
1873     }
1874 
1875     if (s.ok() && !flush_reqs.empty()) {
1876       for (const auto& req : flush_reqs) {
1877         assert(req.size() == 1);
1878         ColumnFamilyData* loop_cfd = req[0].first;
1879         loop_cfd->imm()->FlushRequested();
1880       }
1881       // If the caller wants to wait for this flush to complete, it indicates
1882       // that the caller expects the ColumnFamilyData not to be freed by
1883       // other threads which may drop the column family concurrently.
1884       // Therefore, we increase the cfd's ref count.
1885       if (flush_options.wait) {
1886         for (const auto& req : flush_reqs) {
1887           assert(req.size() == 1);
1888           ColumnFamilyData* loop_cfd = req[0].first;
1889           loop_cfd->Ref();
1890         }
1891       }
1892       for (const auto& req : flush_reqs) {
1893         SchedulePendingFlush(req, flush_reason);
1894       }
1895       MaybeScheduleFlushOrCompaction();
1896     }
1897 
1898     if (!writes_stopped) {
1899       write_thread_.ExitUnbatched(&w);
1900       if (two_write_queues_) {
1901         nonmem_write_thread_.ExitUnbatched(&nonmem_w);
1902       }
1903     }
1904   }
1905   TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleFlush");
1906   TEST_SYNC_POINT("DBImpl::FlushMemTable:BeforeWaitForBgFlush");
1907   if (s.ok() && flush_options.wait) {
1908     autovector<ColumnFamilyData*> cfds;
1909     autovector<const uint64_t*> flush_memtable_ids;
1910     assert(flush_reqs.size() == memtable_ids_to_wait.size());
1911     for (size_t i = 0; i < flush_reqs.size(); ++i) {
1912       assert(flush_reqs[i].size() == 1);
1913       cfds.push_back(flush_reqs[i][0].first);
1914       flush_memtable_ids.push_back(&(memtable_ids_to_wait[i]));
1915     }
1916     s = WaitForFlushMemTables(
1917         cfds, flush_memtable_ids,
1918         (flush_reason == FlushReason::kErrorRecovery ||
1919          flush_reason == FlushReason::kErrorRecoveryRetryFlush));
1920     InstrumentedMutexLock lock_guard(&mutex_);
1921     for (auto* tmp_cfd : cfds) {
1922       tmp_cfd->UnrefAndTryDelete();
1923     }
1924   }
1925   TEST_SYNC_POINT("DBImpl::FlushMemTable:FlushMemTableFinished");
1926   return s;
1927 }
1928 
1929 // Flush all elements in 'column_family_datas'
1930 // and atomically record the result to the MANIFEST.
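// This is the path taken by DBImpl::Flush() when atomic_flush is enabled; the
// Flush() overloads above forward to this method in that case.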
1931 Status DBImpl::AtomicFlushMemTables(
1932     const autovector<ColumnFamilyData*>& column_family_datas,
1933     const FlushOptions& flush_options, FlushReason flush_reason,
1934     bool writes_stopped) {
1935   Status s;
1936   if (!flush_options.allow_write_stall) {
1937     int num_cfs_to_flush = 0;
1938     for (auto cfd : column_family_datas) {
1939       bool flush_needed = true;
1940       s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
1941       if (!s.ok()) {
1942         return s;
1943       } else if (flush_needed) {
1944         ++num_cfs_to_flush;
1945       }
1946     }
1947     if (0 == num_cfs_to_flush) {
1948       return s;
1949     }
1950   }
1951   FlushRequest flush_req;
1952   autovector<ColumnFamilyData*> cfds;
1953   {
1954     WriteContext context;
1955     InstrumentedMutexLock guard_lock(&mutex_);
1956 
1957     WriteThread::Writer w;
1958     WriteThread::Writer nonmem_w;
1959     if (!writes_stopped) {
1960       write_thread_.EnterUnbatched(&w, &mutex_);
1961       if (two_write_queues_) {
1962         nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
1963       }
1964     }
1965     WaitForPendingWrites();
1966 
1967     for (auto cfd : column_family_datas) {
1968       if (cfd->IsDropped()) {
1969         continue;
1970       }
1971       if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
1972           !cached_recoverable_state_empty_.load()) {
1973         cfds.emplace_back(cfd);
1974       }
1975     }
1976     for (auto cfd : cfds) {
1977       if ((cfd->mem()->IsEmpty() && cached_recoverable_state_empty_.load()) ||
1978           flush_reason == FlushReason::kErrorRecoveryRetryFlush) {
1979         continue;
1980       }
1981       cfd->Ref();
1982       s = SwitchMemtable(cfd, &context);
1983       cfd->UnrefAndTryDelete();
1984       if (!s.ok()) {
1985         break;
1986       }
1987     }
1988     if (s.ok()) {
1989       AssignAtomicFlushSeq(cfds);
1990       for (auto cfd : cfds) {
1991         cfd->imm()->FlushRequested();
1992       }
1993       // If the caller wants to wait for this flush to complete, it indicates
1994       // that the caller expects the ColumnFamilyData not to be freed by
1995       // other threads which may drop the column family concurrently.
1996       // Therefore, we increase the cfd's ref count.
1997       if (flush_options.wait) {
1998         for (auto cfd : cfds) {
1999           cfd->Ref();
2000         }
2001       }
2002       GenerateFlushRequest(cfds, &flush_req);
2003       SchedulePendingFlush(flush_req, flush_reason);
2004       MaybeScheduleFlushOrCompaction();
2005     }
2006 
2007     if (!writes_stopped) {
2008       write_thread_.ExitUnbatched(&w);
2009       if (two_write_queues_) {
2010         nonmem_write_thread_.ExitUnbatched(&nonmem_w);
2011       }
2012     }
2013   }
2014   TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush");
2015   TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush");
2016   if (s.ok() && flush_options.wait) {
2017     autovector<const uint64_t*> flush_memtable_ids;
2018     for (auto& iter : flush_req) {
2019       flush_memtable_ids.push_back(&(iter.second));
2020     }
2021     s = WaitForFlushMemTables(
2022         cfds, flush_memtable_ids,
2023         (flush_reason == FlushReason::kErrorRecovery ||
2024          flush_reason == FlushReason::kErrorRecoveryRetryFlush));
2025     InstrumentedMutexLock lock_guard(&mutex_);
2026     for (auto* cfd : cfds) {
2027       cfd->UnrefAndTryDelete();
2028     }
2029   }
2030   return s;
2031 }
2032 
2033 // Calling FlushMemTable(), whether from DB::Flush() or from Backup Engine, can
2034 // cause a write stall, for example if one memtable is already being flushed.
2035 // This method tries to avoid such a stall (similar to CompactRange() behavior):
2036 // it emulates how the SuperVersion / LSM would change if the flush happened,
2037 // checks that against various constraints, and delays the flush if it would
2038 // stall writes. The caller should check status and flush_needed to see whether the flush already happened.
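// Roughly, the loop below does: if adding one more immutable memtable and one
// more L0 file would put the column family into a stall/stop condition, wait
// on bg_cv_; otherwise stop waiting and let the flush proceed.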
2039 Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
2040                                                  bool* flush_needed) {
2041   {
2042     *flush_needed = true;
2043     InstrumentedMutexLock l(&mutex_);
2044     uint64_t orig_active_memtable_id = cfd->mem()->GetID();
2045     WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
2046     do {
2047       if (write_stall_condition != WriteStallCondition::kNormal) {
2048         // Same error handling as user writes: Don't wait if there's a
2049         // background error, even if it's a soft error. We might wait here
2050         // indefinitely as the pending flushes/compactions may never finish
2051         // successfully, resulting in the stall condition lasting indefinitely
2052         if (error_handler_.IsBGWorkStopped()) {
2053           return error_handler_.GetBGError();
2054         }
2055 
2056         TEST_SYNC_POINT("DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait");
2057         ROCKS_LOG_INFO(immutable_db_options_.info_log,
2058                        "[%s] WaitUntilFlushWouldNotStallWrites"
2059                        " waiting on stall conditions to clear",
2060                        cfd->GetName().c_str());
2061         bg_cv_.Wait();
2062       }
2063       if (cfd->IsDropped()) {
2064         return Status::ColumnFamilyDropped();
2065       }
2066       if (shutting_down_.load(std::memory_order_acquire)) {
2067         return Status::ShutdownInProgress();
2068       }
2069 
2070       uint64_t earliest_memtable_id =
2071           std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
2072       if (earliest_memtable_id > orig_active_memtable_id) {
2073         // We waited so long that the memtable we were originally waiting on was
2074         // flushed.
2075         *flush_needed = false;
2076         return Status::OK();
2077       }
2078 
2079       const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
2080       const auto* vstorage = cfd->current()->storage_info();
2081 
2082       // Skip stalling check if we're below auto-flush and auto-compaction
2083       // triggers. If it stalled in these conditions, that'd mean the stall
2084       // triggers are so low that stalling is needed for any background work. In
2085       // that case we shouldn't wait since background work won't be scheduled.
2086       if (cfd->imm()->NumNotFlushed() <
2087               cfd->ioptions()->min_write_buffer_number_to_merge &&
2088           vstorage->l0_delay_trigger_count() <
2089               mutable_cf_options.level0_file_num_compaction_trigger) {
2090         break;
2091       }
2092 
2093       // check whether one extra immutable memtable or an extra L0 file would
2094       // cause write stalling mode to be entered. It could still enter stall
2095       // mode due to pending compaction bytes, but that's less common
2096       write_stall_condition = ColumnFamilyData::GetWriteStallConditionAndCause(
2097                                   cfd->imm()->NumNotFlushed() + 1,
2098                                   vstorage->l0_delay_trigger_count() + 1,
2099                                   vstorage->estimated_compaction_needed_bytes(),
2100                                   mutable_cf_options, *cfd->ioptions())
2101                                   .first;
2102     } while (write_stall_condition != WriteStallCondition::kNormal);
2103   }
2104   return Status::OK();
2105 }
2106 
2107 // Wait for memtables to be flushed for multiple column families.
2108 // let N = cfds.size()
2109 // for i in [0, N),
2110 //  1) if flush_memtable_ids[i] is not null, then the memtables with lower IDs
2111 //     have to be flushed for THIS column family;
2112 //  2) if flush_memtable_ids[i] is null, then all memtables in THIS column
2113 //     family have to be flushed.
2114 // Finish waiting when ALL column families finish flushing memtables.
2115 // resuming_from_bg_err indicates whether the caller is trying to resume from
2116 // background error or in normal processing.
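// Example with hypothetical values: cfds = {cf_a, cf_b} and
// flush_memtable_ids = {&5, nullptr} waits until cf_a has flushed all
// immutable memtables with ID <= 5 and cf_b has flushed all of its immutable
// memtables (or until the column families are dropped).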
2117 Status DBImpl::WaitForFlushMemTables(
2118     const autovector<ColumnFamilyData*>& cfds,
2119     const autovector<const uint64_t*>& flush_memtable_ids,
2120     bool resuming_from_bg_err) {
2121   int num = static_cast<int>(cfds.size());
2122   // Wait until the flush completes
2123   InstrumentedMutexLock l(&mutex_);
2124   // If the caller is trying to resume from bg error, then
2125   // error_handler_.IsDBStopped() is true.
2126   while (resuming_from_bg_err || !error_handler_.IsDBStopped()) {
2127     if (shutting_down_.load(std::memory_order_acquire)) {
2128       return Status::ShutdownInProgress();
2129     }
2130     // If an error has occurred during resumption, then no need to wait.
2131     if (!error_handler_.GetRecoveryError().ok()) {
2132       break;
2133     }
2134     // If BG work is stopped, there is a background error and either 1) it is
2135     // a soft error that requires no BG work, or 2) it is not in auto_recovery_
2136     if (!resuming_from_bg_err && error_handler_.IsBGWorkStopped() &&
2137         error_handler_.GetBGError().severity() < Status::Severity::kHardError) {
2138       return error_handler_.GetBGError();
2139     }
2140 
2141     // Number of column families that have been dropped.
2142     int num_dropped = 0;
2143     // Number of column families that have finished flush.
2144     int num_finished = 0;
2145     for (int i = 0; i < num; ++i) {
2146       if (cfds[i]->IsDropped()) {
2147         ++num_dropped;
2148       } else if (cfds[i]->imm()->NumNotFlushed() == 0 ||
2149                  (flush_memtable_ids[i] != nullptr &&
2150                   cfds[i]->imm()->GetEarliestMemTableID() >
2151                       *flush_memtable_ids[i])) {
2152         ++num_finished;
2153       }
2154     }
2155     if (1 == num_dropped && 1 == num) {
2156       return Status::ColumnFamilyDropped();
2157     }
2158     // Column families involved in this flush request have either been dropped
2159     // or finished flushing, so it's time to stop waiting.
2160     if (num_dropped + num_finished == num) {
2161       break;
2162     }
2163     bg_cv_.Wait();
2164   }
2165   Status s;
2166   // If not resuming from bg error, and an error has caused the DB to stop,
2167   // then report the bg error to caller.
2168   if (!resuming_from_bg_err && error_handler_.IsDBStopped()) {
2169     s = error_handler_.GetBGError();
2170   }
2171   return s;
2172 }
2173 
2174 Status DBImpl::EnableAutoCompaction(
2175     const std::vector<ColumnFamilyHandle*>& column_family_handles) {
2176   Status s;
2177   for (auto cf_ptr : column_family_handles) {
2178     Status status =
2179         this->SetOptions(cf_ptr, {{"disable_auto_compactions", "false"}});
2180     if (!status.ok()) {
2181       s = status;
2182     }
2183   }
2184 
2185   return s;
2186 }
2187 
2188 void DBImpl::DisableManualCompaction() {
2189   InstrumentedMutexLock l(&mutex_);
2190   manual_compaction_paused_.fetch_add(1, std::memory_order_release);
2191   // Wait for any pending manual compactions to finish (typically through
2192   // failing with `Status::Incomplete`) prior to returning. This way we are
2193   // guaranteed no pending manual compaction will commit while manual
2194   // compactions are "disabled".
2195   while (HasPendingManualCompaction()) {
2196     bg_cv_.Wait();
2197   }
2198 }
2199 
2200 void DBImpl::EnableManualCompaction() {
2201   InstrumentedMutexLock l(&mutex_);
2202   assert(manual_compaction_paused_ > 0);
2203   manual_compaction_paused_.fetch_sub(1, std::memory_order_release);
2204 }
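// DisableManualCompaction() / EnableManualCompaction() act as a counter: each
// Disable call must be matched by an Enable call, and while
// manual_compaction_paused_ > 0 in-flight manual compactions are expected to
// bail out (typically with Status::Incomplete).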
2205 
2206 void DBImpl::MaybeScheduleFlushOrCompaction() {
2207   mutex_.AssertHeld();
2208   if (!opened_successfully_) {
2209     // Compaction may introduce a data race with DB open
2210     return;
2211   }
2212   if (bg_work_paused_ > 0) {
2213     // we paused the background work
2214     return;
2215   } else if (error_handler_.IsBGWorkStopped() &&
2216              !error_handler_.IsRecoveryInProgress()) {
2217     // There has been a hard error and this call is not part of the recovery
2218     // sequence. Bail out here so we don't get into an endless loop of
2219     // scheduling BG work which will again call this function
2220     return;
2221   } else if (shutting_down_.load(std::memory_order_acquire)) {
2222     // DB is being deleted; no more background compactions
2223     return;
2224   }
2225   auto bg_job_limits = GetBGJobLimits();
2226   bool is_flush_pool_empty =
2227       env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
2228   while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
2229          bg_flush_scheduled_ < bg_job_limits.max_flushes) {
2230     bg_flush_scheduled_++;
2231     FlushThreadArg* fta = new FlushThreadArg;
2232     fta->db_ = this;
2233     fta->thread_pri_ = Env::Priority::HIGH;
2234     env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::HIGH, this,
2235                    &DBImpl::UnscheduleFlushCallback);
2236     --unscheduled_flushes_;
2237     TEST_SYNC_POINT_CALLBACK(
2238         "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0",
2239         &unscheduled_flushes_);
2240   }
2241 
2242   // special case -- if high-pri (flush) thread pool is empty, then schedule
2243   // flushes in low-pri (compaction) thread pool.
2244   if (is_flush_pool_empty) {
2245     while (unscheduled_flushes_ > 0 &&
2246            bg_flush_scheduled_ + bg_compaction_scheduled_ <
2247                bg_job_limits.max_flushes) {
2248       bg_flush_scheduled_++;
2249       FlushThreadArg* fta = new FlushThreadArg;
2250       fta->db_ = this;
2251       fta->thread_pri_ = Env::Priority::LOW;
2252       env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::LOW, this,
2253                      &DBImpl::UnscheduleFlushCallback);
2254       --unscheduled_flushes_;
2255     }
2256   }
2257 
2258   if (bg_compaction_paused_ > 0) {
2259     // we paused the background compaction
2260     return;
2261   } else if (error_handler_.IsBGWorkStopped()) {
2262     // Compaction is not part of the recovery sequence from a hard error. We
2263     // might get here because recovery might do a flush and install a new
2264     // super version, which will try to schedule pending compactions. Bail
2265     // out here and let the higher level recovery handle compactions
2266     return;
2267   }
2268 
2269   if (HasExclusiveManualCompaction()) {
2270     // only manual compactions are allowed to run. don't schedule automatic
2271     // compactions
2272     TEST_SYNC_POINT("DBImpl::MaybeScheduleFlushOrCompaction:Conflict");
2273     return;
2274   }
2275 
2276   while (bg_compaction_scheduled_ < bg_job_limits.max_compactions &&
2277          unscheduled_compactions_ > 0) {
2278     CompactionArg* ca = new CompactionArg;
2279     ca->db = this;
2280     ca->compaction_pri_ = Env::Priority::LOW;
2281     ca->prepicked_compaction = nullptr;
2282     bg_compaction_scheduled_++;
2283     unscheduled_compactions_--;
2284     env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
2285                    &DBImpl::UnscheduleCompactionCallback);
2286   }
2287 }
2288 
2289 DBImpl::BGJobLimits DBImpl::GetBGJobLimits() const {
2290   mutex_.AssertHeld();
2291   return GetBGJobLimits(mutable_db_options_.max_background_flushes,
2292                         mutable_db_options_.max_background_compactions,
2293                         mutable_db_options_.max_background_jobs,
2294                         write_controller_.NeedSpeedupCompaction());
2295 }
2296 
2297 DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes,
2298                                            int max_background_compactions,
2299                                            int max_background_jobs,
2300                                            bool parallelize_compactions) {
2301   BGJobLimits res;
2302   if (max_background_flushes == -1 && max_background_compactions == -1) {
2303     // for our first stab implementing max_background_jobs, simply allocate a
2304     // quarter of the threads to flushes.
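    // For example, max_background_jobs == 8 yields
    // max_flushes = std::max(1, 8 / 4) = 2 and
    // max_compactions = std::max(1, 8 - 2) = 6.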
2305     res.max_flushes = std::max(1, max_background_jobs / 4);
2306     res.max_compactions = std::max(1, max_background_jobs - res.max_flushes);
2307   } else {
2308     // compatibility code in case users haven't migrated to max_background_jobs,
2309     // which automatically computes flush/compaction limits
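    // For example, max_background_flushes == -1 with
    // max_background_compactions == 4 takes this branch and yields
    // max_flushes = 1 and max_compactions = 4.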
2310     res.max_flushes = std::max(1, max_background_flushes);
2311     res.max_compactions = std::max(1, max_background_compactions);
2312   }
2313   if (!parallelize_compactions) {
2314     // throttle background compactions until we deem necessary
2315     res.max_compactions = 1;
2316   }
2317   return res;
2318 }
2319 
2320 void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) {
2321   assert(!cfd->queued_for_compaction());
2322   cfd->Ref();
2323   compaction_queue_.push_back(cfd);
2324   cfd->set_queued_for_compaction(true);
2325 }
2326 
2327 ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() {
2328   assert(!compaction_queue_.empty());
2329   auto cfd = *compaction_queue_.begin();
2330   compaction_queue_.pop_front();
2331   assert(cfd->queued_for_compaction());
2332   cfd->set_queued_for_compaction(false);
2333   return cfd;
2334 }
2335 
2336 DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() {
2337   assert(!flush_queue_.empty());
2338   FlushRequest flush_req = flush_queue_.front();
2339   flush_queue_.pop_front();
2340   if (!immutable_db_options_.atomic_flush) {
2341     assert(flush_req.size() == 1);
2342   }
2343   for (const auto& elem : flush_req) {
2344     if (!immutable_db_options_.atomic_flush) {
2345       ColumnFamilyData* cfd = elem.first;
2346       assert(cfd);
2347       assert(cfd->queued_for_flush());
2348       cfd->set_queued_for_flush(false);
2349     }
2350   }
2351   // TODO: need to unset flush reason?
2352   return flush_req;
2353 }
2354 
2355 ColumnFamilyData* DBImpl::PickCompactionFromQueue(
2356     std::unique_ptr<TaskLimiterToken>* token, LogBuffer* log_buffer) {
2357   assert(!compaction_queue_.empty());
2358   assert(*token == nullptr);
2359   autovector<ColumnFamilyData*> throttled_candidates;
2360   ColumnFamilyData* cfd = nullptr;
2361   while (!compaction_queue_.empty()) {
2362     auto first_cfd = *compaction_queue_.begin();
2363     compaction_queue_.pop_front();
2364     assert(first_cfd->queued_for_compaction());
2365     if (!RequestCompactionToken(first_cfd, false, token, log_buffer)) {
2366       throttled_candidates.push_back(first_cfd);
2367       continue;
2368     }
2369     cfd = first_cfd;
2370     cfd->set_queued_for_compaction(false);
2371     break;
2372   }
2373   // Add throttled compaction candidates back to queue in the original order.
2374   for (auto iter = throttled_candidates.rbegin();
2375        iter != throttled_candidates.rend(); ++iter) {
2376     compaction_queue_.push_front(*iter);
2377   }
2378   return cfd;
2379 }
2380 
2381 void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req,
2382                                   FlushReason flush_reason) {
2383   mutex_.AssertHeld();
2384   if (flush_req.empty()) {
2385     return;
2386   }
2387   if (!immutable_db_options_.atomic_flush) {
2388     // For the non-atomic flush case, we never schedule multiple column
2389     // families in the same flush request.
2390     assert(flush_req.size() == 1);
2391     ColumnFamilyData* cfd = flush_req[0].first;
2392     assert(cfd);
2393     if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) {
2394       cfd->Ref();
2395       cfd->set_queued_for_flush(true);
2396       cfd->SetFlushReason(flush_reason);
2397       ++unscheduled_flushes_;
2398       flush_queue_.push_back(flush_req);
2399     }
2400   } else {
2401     for (auto& iter : flush_req) {
2402       ColumnFamilyData* cfd = iter.first;
2403       cfd->Ref();
2404       cfd->SetFlushReason(flush_reason);
2405     }
2406     ++unscheduled_flushes_;
2407     flush_queue_.push_back(flush_req);
2408   }
2409 }
2410 
2411 void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
2412   mutex_.AssertHeld();
2413   if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) {
2414     AddToCompactionQueue(cfd);
2415     ++unscheduled_compactions_;
2416   }
2417 }
2418 
2419 void DBImpl::SchedulePendingPurge(std::string fname, std::string dir_to_sync,
2420                                   FileType type, uint64_t number, int job_id) {
2421   mutex_.AssertHeld();
2422   PurgeFileInfo file_info(fname, dir_to_sync, type, number, job_id);
2423   purge_files_.insert({{number, std::move(file_info)}});
2424 }
2425 
2426 void DBImpl::BGWorkFlush(void* arg) {
2427   FlushThreadArg fta = *(reinterpret_cast<FlushThreadArg*>(arg));
2428   delete reinterpret_cast<FlushThreadArg*>(arg);
2429 
2430   IOSTATS_SET_THREAD_POOL_ID(fta.thread_pri_);
2431   TEST_SYNC_POINT("DBImpl::BGWorkFlush");
2432   static_cast_with_check<DBImpl>(fta.db_)->BackgroundCallFlush(fta.thread_pri_);
2433   TEST_SYNC_POINT("DBImpl::BGWorkFlush:done");
2434 }
2435 
2436 void DBImpl::BGWorkCompaction(void* arg) {
2437   CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
2438   delete reinterpret_cast<CompactionArg*>(arg);
2439   IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
2440   TEST_SYNC_POINT("DBImpl::BGWorkCompaction");
2441   auto prepicked_compaction =
2442       static_cast<PrepickedCompaction*>(ca.prepicked_compaction);
2443   static_cast_with_check<DBImpl>(ca.db)->BackgroundCallCompaction(
2444       prepicked_compaction, Env::Priority::LOW);
2445   delete prepicked_compaction;
2446 }
2447 
2448 void DBImpl::BGWorkBottomCompaction(void* arg) {
2449   CompactionArg ca = *(static_cast<CompactionArg*>(arg));
2450   delete static_cast<CompactionArg*>(arg);
2451   IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM);
2452   TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction");
2453   auto* prepicked_compaction = ca.prepicked_compaction;
2454   assert(prepicked_compaction && prepicked_compaction->compaction &&
2455          !prepicked_compaction->manual_compaction_state);
2456   ca.db->BackgroundCallCompaction(prepicked_compaction, Env::Priority::BOTTOM);
2457   delete prepicked_compaction;
2458 }
2459 
2460 void DBImpl::BGWorkPurge(void* db) {
2461   IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
2462   TEST_SYNC_POINT("DBImpl::BGWorkPurge:start");
2463   reinterpret_cast<DBImpl*>(db)->BackgroundCallPurge();
2464   TEST_SYNC_POINT("DBImpl::BGWorkPurge:end");
2465 }
2466 
2467 void DBImpl::UnscheduleCompactionCallback(void* arg) {
2468   CompactionArg* ca_ptr = reinterpret_cast<CompactionArg*>(arg);
2469   Env::Priority compaction_pri = ca_ptr->compaction_pri_;
2470   if (Env::Priority::BOTTOM == compaction_pri) {
2471     // Decrement bg_bottom_compaction_scheduled_ if priority is BOTTOM
2472     ca_ptr->db->bg_bottom_compaction_scheduled_--;
2473   } else if (Env::Priority::LOW == compaction_pri) {
2474     // Decrement bg_compaction_scheduled_ if priority is LOW
2475     ca_ptr->db->bg_compaction_scheduled_--;
2476   }
2477   CompactionArg ca = *(ca_ptr);
2478   delete reinterpret_cast<CompactionArg*>(arg);
2479   if (ca.prepicked_compaction != nullptr) {
2480     if (ca.prepicked_compaction->compaction != nullptr) {
2481       delete ca.prepicked_compaction->compaction;
2482     }
2483     delete ca.prepicked_compaction;
2484   }
2485   TEST_SYNC_POINT("DBImpl::UnscheduleCompactionCallback");
2486 }
2487 
2488 void DBImpl::UnscheduleFlushCallback(void* arg) {
2489   // Decrement bg_flush_scheduled_ in flush callback
2490   reinterpret_cast<FlushThreadArg*>(arg)->db_->bg_flush_scheduled_--;
2491   Env::Priority flush_pri = reinterpret_cast<FlushThreadArg*>(arg)->thread_pri_;
2492   if (Env::Priority::LOW == flush_pri) {
2493     TEST_SYNC_POINT("DBImpl::UnscheduleLowFlushCallback");
2494   } else if (Env::Priority::HIGH == flush_pri) {
2495     TEST_SYNC_POINT("DBImpl::UnscheduleHighFlushCallback");
2496   }
2497   delete reinterpret_cast<FlushThreadArg*>(arg);
2498   TEST_SYNC_POINT("DBImpl::UnscheduleFlushCallback");
2499 }
2500 
2501 Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
2502                                LogBuffer* log_buffer, FlushReason* reason,
2503                                Env::Priority thread_pri) {
2504   mutex_.AssertHeld();
2505 
2506   Status status;
2507   *reason = FlushReason::kOthers;
2508   // If BG work is stopped due to an error, but a recovery is in progress,
2509   // that means this flush is part of the recovery. So allow it to go through
2510   if (!error_handler_.IsBGWorkStopped()) {
2511     if (shutting_down_.load(std::memory_order_acquire)) {
2512       status = Status::ShutdownInProgress();
2513     }
2514   } else if (!error_handler_.IsRecoveryInProgress()) {
2515     status = error_handler_.GetBGError();
2516   }
2517 
2518   if (!status.ok()) {
2519     return status;
2520   }
2521 
2522   autovector<BGFlushArg> bg_flush_args;
2523   std::vector<SuperVersionContext>& superversion_contexts =
2524       job_context->superversion_contexts;
2525   autovector<ColumnFamilyData*> column_families_not_to_flush;
2526   while (!flush_queue_.empty()) {
2527     // This cfd is already referenced
2528     const FlushRequest& flush_req = PopFirstFromFlushQueue();
2529     superversion_contexts.clear();
2530     superversion_contexts.reserve(flush_req.size());
2531 
2532     for (const auto& iter : flush_req) {
2533       ColumnFamilyData* cfd = iter.first;
2534       if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) {
2535         // can't flush this CF, try next one
2536         column_families_not_to_flush.push_back(cfd);
2537         continue;
2538       }
2539       superversion_contexts.emplace_back(SuperVersionContext(true));
2540       bg_flush_args.emplace_back(cfd, iter.second,
2541                                  &(superversion_contexts.back()));
2542     }
2543     if (!bg_flush_args.empty()) {
2544       break;
2545     }
2546   }
2547 
2548   if (!bg_flush_args.empty()) {
2549     auto bg_job_limits = GetBGJobLimits();
2550     for (const auto& arg : bg_flush_args) {
2551       ColumnFamilyData* cfd = arg.cfd_;
2552       ROCKS_LOG_BUFFER(
2553           log_buffer,
2554           "Calling FlushMemTableToOutputFile with column "
2555           "family [%s], flush slots available %d, compaction slots available "
2556           "%d, "
2557           "flush slots scheduled %d, compaction slots scheduled %d",
2558           cfd->GetName().c_str(), bg_job_limits.max_flushes,
2559           bg_job_limits.max_compactions, bg_flush_scheduled_,
2560           bg_compaction_scheduled_);
2561     }
2562     status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
2563                                          job_context, log_buffer, thread_pri);
2564     TEST_SYNC_POINT("DBImpl::BackgroundFlush:BeforeFlush");
2565     // All the CFDs in the FlushReq must have the same flush reason, so just
2566     // grab the first one
2567     *reason = bg_flush_args[0].cfd_->GetFlushReason();
2568     for (auto& arg : bg_flush_args) {
2569       ColumnFamilyData* cfd = arg.cfd_;
2570       if (cfd->UnrefAndTryDelete()) {
2571         arg.cfd_ = nullptr;
2572       }
2573     }
2574   }
2575   for (auto cfd : column_families_not_to_flush) {
2576     cfd->UnrefAndTryDelete();
2577   }
2578   return status;
2579 }
2580 
2581 void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
2582   bool made_progress = false;
2583   JobContext job_context(next_job_id_.fetch_add(1), true);
2584 
2585   TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:start");
2586 
2587   LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
2588                        immutable_db_options_.info_log.get());
2589   TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:Start:1");
2590   TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:Start:2");
2591   {
2592     InstrumentedMutexLock l(&mutex_);
2593     assert(bg_flush_scheduled_);
2594     num_running_flushes_++;
2595 
2596     std::unique_ptr<std::list<uint64_t>::iterator>
2597         pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
2598             CaptureCurrentFileNumberInPendingOutputs()));
2599     FlushReason reason;
2600 
2601     Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer,
2602                                &reason, thread_pri);
2603     if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped() &&
2604         reason != FlushReason::kErrorRecovery) {
2605       // Wait a little bit before retrying background flush in
2606       // case this is an environmental problem and we do not want to
2607       // chew up resources for failed flushes for the duration of
2608       // the problem.
2609       uint64_t error_cnt =
2610           default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
2611       bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
2612       mutex_.Unlock();
2613       ROCKS_LOG_ERROR(immutable_db_options_.info_log,
2614                       "Waiting after background flush error: %s, "
2615                       "Accumulated background error counts: %" PRIu64,
2616                       s.ToString().c_str(), error_cnt);
2617       log_buffer.FlushBufferToLog();
2618       LogFlush(immutable_db_options_.info_log);
2619       immutable_db_options_.clock->SleepForMicroseconds(1000000);
2620       mutex_.Lock();
2621     }
2622 
2623     TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FlushFinish:0");
2624     ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
2625 
2626     // If flush failed, we want to delete all temporary files that we might have
2627     // created. Thus, we force full scan in FindObsoleteFiles()
2628     FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
2629                                         !s.IsColumnFamilyDropped());
2630     // delete unnecessary files if any, this is done outside the mutex
2631     if (job_context.HaveSomethingToClean() ||
2632         job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
2633       mutex_.Unlock();
2634       TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FilesFound");
2635       // Have to flush the info logs before bg_flush_scheduled_--
2636       // because if bg_flush_scheduled_ becomes 0 and the lock is
2637       // released, the DB destructor can kick in and destroy all the
2638       // state of the DB, so info_log might not be available after that
2639       // point. The same applies to accessing any other state the DB owns.
2640       log_buffer.FlushBufferToLog();
2641       if (job_context.HaveSomethingToDelete()) {
2642         PurgeObsoleteFiles(job_context);
2643       }
2644       job_context.Clean();
2645       mutex_.Lock();
2646     }
2647     TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:ContextCleanedUp");
2648 
2649     assert(num_running_flushes_ > 0);
2650     num_running_flushes_--;
2651     bg_flush_scheduled_--;
2652     // See if there's more work to be done
2653     MaybeScheduleFlushOrCompaction();
2654     atomic_flush_install_cv_.SignalAll();
2655     bg_cv_.SignalAll();
2656     // IMPORTANT: there should be no code after calling SignalAll. This call may
2657     // signal the DB destructor that it's OK to proceed with destruction. In
2658     // that case, all DB variables will be deallocated and referencing them
2659     // will cause trouble.
2660   }
2661 }
2662 
2663 void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
2664                                       Env::Priority bg_thread_pri) {
2665   bool made_progress = false;
2666   JobContext job_context(next_job_id_.fetch_add(1), true);
2667   TEST_SYNC_POINT("BackgroundCallCompaction:0");
2668   LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
2669                        immutable_db_options_.info_log.get());
2670   {
2671     InstrumentedMutexLock l(&mutex_);
2672 
2673     // This call will unlock/lock the mutex to wait for current running
2674     // IngestExternalFile() calls to finish.
2675     WaitForIngestFile();
2676 
2677     num_running_compactions_++;
2678 
2679     std::unique_ptr<std::list<uint64_t>::iterator>
2680         pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
2681             CaptureCurrentFileNumberInPendingOutputs()));
2682 
2683     assert((bg_thread_pri == Env::Priority::BOTTOM &&
2684             bg_bottom_compaction_scheduled_) ||
2685            (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
2686     Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
2687                                     prepicked_compaction, bg_thread_pri);
2688     TEST_SYNC_POINT("BackgroundCallCompaction:1");
2689     if (s.IsBusy()) {
2690       bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
2691       mutex_.Unlock();
2692       immutable_db_options_.clock->SleepForMicroseconds(
2693           10000);  // prevent hot loop
2694       mutex_.Lock();
2695     } else if (!s.ok() && !s.IsShutdownInProgress() &&
2696                !s.IsManualCompactionPaused() && !s.IsColumnFamilyDropped()) {
2697       // Wait a little bit before retrying background compaction in
2698       // case this is an environmental problem and we do not want to
2699       // chew up resources for failed compactions for the duration of
2700       // the problem.
2701       uint64_t error_cnt =
2702           default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
2703       bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
2704       mutex_.Unlock();
2705       log_buffer.FlushBufferToLog();
2706       ROCKS_LOG_ERROR(immutable_db_options_.info_log,
2707                       "Waiting after background compaction error: %s, "
2708                       "Accumulated background error counts: %" PRIu64,
2709                       s.ToString().c_str(), error_cnt);
2710       LogFlush(immutable_db_options_.info_log);
2711       immutable_db_options_.clock->SleepForMicroseconds(1000000);
2712       mutex_.Lock();
2713     } else if (s.IsManualCompactionPaused()) {
2714       ManualCompactionState* m = prepicked_compaction->manual_compaction_state;
2715       assert(m);
2716       ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused",
2717                        m->cfd->GetName().c_str(), job_context.job_id);
2718     }
2719 
2720     ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
2721 
2722     // If compaction failed, we want to delete all temporary files that we might
2723     // have created (they might not be all recorded in job_context in case of a
2724     // failure). Thus, we force full scan in FindObsoleteFiles()
2725     FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
2726                                         !s.IsManualCompactionPaused() &&
2727                                         !s.IsColumnFamilyDropped() &&
2728                                         !s.IsBusy());
2729     TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles");
2730 
2731     // delete unnecessary files if any, this is done outside the mutex
2732     if (job_context.HaveSomethingToClean() ||
2733         job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
2734       mutex_.Unlock();
2735       // Have to flush the info logs before bg_compaction_scheduled_--
2736       // because if bg_compaction_scheduled_ becomes 0 and the lock is
2737       // released, the DB destructor can kick in and destroy all the
2738       // state of the DB, so info_log might not be available after that
2739       // point. The same applies to accessing any other state the DB owns.
2740       log_buffer.FlushBufferToLog();
2741       if (job_context.HaveSomethingToDelete()) {
2742         PurgeObsoleteFiles(job_context);
2743         TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles");
2744       }
2745       job_context.Clean();
2746       mutex_.Lock();
2747     }
2748 
2749     assert(num_running_compactions_ > 0);
2750     num_running_compactions_--;
2751     if (bg_thread_pri == Env::Priority::LOW) {
2752       bg_compaction_scheduled_--;
2753     } else {
2754       assert(bg_thread_pri == Env::Priority::BOTTOM);
2755       bg_bottom_compaction_scheduled_--;
2756     }
2757 
2758     versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
2759 
2760     // See if there's more work to be done
2761     MaybeScheduleFlushOrCompaction();
2762 
2763     if (prepicked_compaction != nullptr &&
2764         prepicked_compaction->task_token != nullptr) {
2765       // Releasing task tokens affects the DB state, so must be done before we
2766       // potentially signal the DB close process to proceed below.
2767       prepicked_compaction->task_token->ReleaseOnce();
2768     }
2769 
2770     if (made_progress ||
2771         (bg_compaction_scheduled_ == 0 &&
2772          bg_bottom_compaction_scheduled_ == 0) ||
2773         HasPendingManualCompaction() || unscheduled_compactions_ == 0) {
2774       // signal if
2775       // * made_progress -- need to wakeup DelayWrite
2776       // * bg_{bottom,}_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
2777       // * HasPendingManualCompaction -- need to wakeup RunManualCompaction
2778       // If none of this is true, there is no need to signal since nobody is
2779       // waiting for it
2780       bg_cv_.SignalAll();
2781     }
2782     // IMPORTANT: there should be no code after calling SignalAll. This call may
2783     // signal the DB destructor that it's OK to proceed with destruction. In
2784     // that case, all DB variables will be deallocated and referencing them
2785     // will cause trouble.
2786   }
2787 }
2788 
2789 Status DBImpl::BackgroundCompaction(bool* made_progress,
2790                                     JobContext* job_context,
2791                                     LogBuffer* log_buffer,
2792                                     PrepickedCompaction* prepicked_compaction,
2793                                     Env::Priority thread_pri) {
2794   ManualCompactionState* manual_compaction =
2795       prepicked_compaction == nullptr
2796           ? nullptr
2797           : prepicked_compaction->manual_compaction_state;
2798   *made_progress = false;
2799   mutex_.AssertHeld();
2800   TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start");
2801 
2802   bool is_manual = (manual_compaction != nullptr);
2803   std::unique_ptr<Compaction> c;
2804   if (prepicked_compaction != nullptr &&
2805       prepicked_compaction->compaction != nullptr) {
2806     c.reset(prepicked_compaction->compaction);
2807   }
2808   bool is_prepicked = is_manual || c;
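  // A prepicked compaction arrives here in one of two ways: a manual
  // compaction (manual_compaction_state is set), or a compaction that was
  // already picked and then forwarded to the bottom-priority pool further
  // below, in which case the Compaction object itself was handed over via
  // prepicked_compaction->compaction.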
2809 
2810   // (manual_compaction->in_progress == false);
2811   bool trivial_move_disallowed =
2812       is_manual && manual_compaction->disallow_trivial_move;
2813 
2814   CompactionJobStats compaction_job_stats;
2815   Status status;
2816   if (!error_handler_.IsBGWorkStopped()) {
2817     if (shutting_down_.load(std::memory_order_acquire)) {
2818       status = Status::ShutdownInProgress();
2819     } else if (is_manual &&
2820                manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
2821       status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
2822     }
2823   } else {
2824     status = error_handler_.GetBGError();
2825     // If we get here, it means a hard error happened after this compaction
2826     // was scheduled by MaybeScheduleFlushOrCompaction(), but before it got
2827     // a chance to execute. Since we didn't pop a cfd from the compaction
2828     // queue, increment unscheduled_compactions_
2829     unscheduled_compactions_++;
2830   }
2831 
2832   if (!status.ok()) {
2833     if (is_manual) {
2834       manual_compaction->status = status;
2835       manual_compaction->done = true;
2836       manual_compaction->in_progress = false;
2837       manual_compaction = nullptr;
2838     }
2839     if (c) {
2840       c->ReleaseCompactionFiles(status);
2841       c.reset();
2842     }
2843     return status;
2844   }
2845 
2846   if (is_manual) {
2847     // another thread cannot pick up the same work
2848     manual_compaction->in_progress = true;
2849   }
2850 
2851   std::unique_ptr<TaskLimiterToken> task_token;
2852 
2853   // InternalKey manual_end_storage;
2854   // InternalKey* manual_end = &manual_end_storage;
2855   bool sfm_reserved_compact_space = false;
2856   if (is_manual) {
2857     ManualCompactionState* m = manual_compaction;
2858     assert(m->in_progress);
2859     if (!c) {
2860       m->done = true;
2861       m->manual_end = nullptr;
2862       ROCKS_LOG_BUFFER(
2863           log_buffer,
2864           "[%s] Manual compaction from level-%d from %s .. "
2865           "%s; nothing to do\n",
2866           m->cfd->GetName().c_str(), m->input_level,
2867           (m->begin ? m->begin->DebugString(true).c_str() : "(begin)"),
2868           (m->end ? m->end->DebugString(true).c_str() : "(end)"));
2869     } else {
2870       // First check if we have enough room to do the compaction
2871       bool enough_room = EnoughRoomForCompaction(
2872           m->cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
2873 
2874       if (!enough_room) {
2875         // Then don't do the compaction
2876         c->ReleaseCompactionFiles(status);
2877         c.reset();
2878         // m's vars will get set properly at the end of this function,
2879         // as long as status == CompactionTooLarge
2880         status = Status::CompactionTooLarge();
2881       } else {
2882         ROCKS_LOG_BUFFER(
2883             log_buffer,
2884             "[%s] Manual compaction from level-%d to level-%d from %s .. "
2885             "%s; will stop at %s\n",
2886             m->cfd->GetName().c_str(), m->input_level, c->output_level(),
2887             (m->begin ? m->begin->DebugString(true).c_str() : "(begin)"),
2888             (m->end ? m->end->DebugString(true).c_str() : "(end)"),
2889             ((m->done || m->manual_end == nullptr)
2890                  ? "(end)"
2891                  : m->manual_end->DebugString(true).c_str()));
2892       }
2893     }
2894   } else if (!is_prepicked && !compaction_queue_.empty()) {
2895     if (HasExclusiveManualCompaction()) {
2896       // Can't compact right now, but try again later
2897       TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict");
2898 
2899       // Stay in the compaction queue.
2900       unscheduled_compactions_++;
2901 
2902       return Status::OK();
2903     }
2904 
2905     auto cfd = PickCompactionFromQueue(&task_token, log_buffer);
2906     if (cfd == nullptr) {
2907       // Can't find any executable task from the compaction queue.
2908       // All tasks have been throttled by compaction thread limiter.
2909       ++unscheduled_compactions_;
2910       return Status::Busy();
2911     }
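    // Note: returning Status::Busy() above makes BackgroundCallCompaction()
    // sleep briefly (about 10ms) and retry, instead of bumping the background
    // error count and backing off for a full second.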
2912 
2913     // We unreference here because the following code will take a Ref() on
2914     // this cfd if it is going to use it (Compaction class holds a
2915     // reference).
2916     // This will all happen under a mutex so we don't have to be afraid of
2917     // somebody else deleting it.
2918     if (cfd->UnrefAndTryDelete()) {
2919       // This was the last reference of the column family, so no need to
2920       // compact.
2921       return Status::OK();
2922     }
2923 
2924     // Pick up latest mutable CF Options and use it throughout the
2925     // compaction job
2926     // Compaction makes a copy of the latest MutableCFOptions. It should be used
2927     // throughout the compaction procedure to ensure consistency. It will
2928     // eventually be installed into the SuperVersion.
2929     auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
2930     if (!mutable_cf_options->disable_auto_compactions && !cfd->IsDropped()) {
2931       // NOTE: try to avoid unnecessary copy of MutableCFOptions if
2932       // compaction is not necessary. Need to make sure mutex is held
2933       // until we make a copy in the following code
2934       TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction");
2935       c.reset(cfd->PickCompaction(*mutable_cf_options, mutable_db_options_,
2936                                   log_buffer));
2937       TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction");
2938 
2939       if (c != nullptr) {
2940         bool enough_room = EnoughRoomForCompaction(
2941             cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
2942 
2943         if (!enough_room) {
2944           // Then don't do the compaction
2945           c->ReleaseCompactionFiles(status);
2946           c->column_family_data()
2947               ->current()
2948               ->storage_info()
2949               ->ComputeCompactionScore(*(c->immutable_cf_options()),
2950                                        *(c->mutable_cf_options()));
2951           AddToCompactionQueue(cfd);
2952           ++unscheduled_compactions_;
2953 
2954           c.reset();
2955           // Don't need to sleep here, because BackgroundCallCompaction
2956           // will sleep if !s.ok()
2957           status = Status::CompactionTooLarge();
2958         } else {
2959           // update statistics
2960           RecordInHistogram(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
2961                             c->inputs(0)->size());
2962           // There are three things that can change compaction score:
2963           // 1) When a flush or compaction finishes. This case is covered by
2964           // InstallSuperVersionAndScheduleWork
2965           // 2) When MutableCFOptions changes. This case is also covered by
2966           // InstallSuperVersionAndScheduleWork, because this is when the new
2967           // options take effect.
2968           // 3) When we Pick a new compaction, we "remove" those files being
2969           // compacted from the calculation, which then influences compaction
2970           // score. Here we check if we need the new compaction even without the
2971           // files that are currently being compacted. If we need another
2972           // compaction, we might be able to execute it in parallel, so we add
2973           // it to the queue and schedule a new thread.
2974           if (cfd->NeedsCompaction()) {
2975             // Yes, we need more compactions!
2976             AddToCompactionQueue(cfd);
2977             ++unscheduled_compactions_;
2978             MaybeScheduleFlushOrCompaction();
2979           }
2980         }
2981       }
2982     }
2983   }
2984 
2985   IOStatus io_s;
2986   if (!c) {
2987     // Nothing to do
2988     ROCKS_LOG_BUFFER(log_buffer, "Compaction nothing to do");
2989   } else if (c->deletion_compaction()) {
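    // FIFO "deletion compaction": the oldest level-0 files are simply dropped
    // by logging DeleteFile entries in a VersionEdit; no data is read or
    // rewritten.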
2990     // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete an old
2991     // file if there is a live snapshot pointing to it
2992     TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
2993                              c->column_family_data());
2994     assert(c->num_input_files(1) == 0);
2995     assert(c->level() == 0);
2996     assert(c->column_family_data()->ioptions()->compaction_style ==
2997            kCompactionStyleFIFO);
2998 
2999     compaction_job_stats.num_input_files = c->num_input_files(0);
3000 
3001     NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
3002                             compaction_job_stats, job_context->job_id);
3003 
3004     for (const auto& f : *c->inputs(0)) {
3005       c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
3006     }
3007     status = versions_->LogAndApply(c->column_family_data(),
3008                                     *c->mutable_cf_options(), c->edit(),
3009                                     &mutex_, directories_.GetDbDir());
3010     io_s = versions_->io_status();
3011     InstallSuperVersionAndScheduleWork(c->column_family_data(),
3012                                        &job_context->superversion_contexts[0],
3013                                        *c->mutable_cf_options());
3014     ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n",
3015                      c->column_family_data()->GetName().c_str(),
3016                      c->num_input_files(0));
3017     *made_progress = true;
3018     TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
3019                              c->column_family_data());
3020   } else if (!trivial_move_disallowed && c->IsTrivialMove()) {
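    // Trivial move: the input files do not overlap anything in the output
    // level, so they are "moved" purely by metadata -- a DeleteFile plus an
    // AddFile in the same VersionEdit -- without reading or rewriting any
    // data blocks.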
3021     TEST_SYNC_POINT("DBImpl::BackgroundCompaction:TrivialMove");
3022     TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
3023                              c->column_family_data());
3024     // Instrument for event update
3025     // TODO(yhchiang): add op details for showing trivial-move.
3026     ThreadStatusUtil::SetColumnFamily(
3027         c->column_family_data(), c->column_family_data()->ioptions()->env,
3028         immutable_db_options_.enable_thread_tracking);
3029     ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
3030 
3031     compaction_job_stats.num_input_files = c->num_input_files(0);
3032 
3033     NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
3034                             compaction_job_stats, job_context->job_id);
3035 
3036     // Move files to next level
3037     int32_t moved_files = 0;
3038     int64_t moved_bytes = 0;
3039     for (unsigned int l = 0; l < c->num_input_levels(); l++) {
3040       if (c->level(l) == c->output_level()) {
3041         continue;
3042       }
3043       for (size_t i = 0; i < c->num_input_files(l); i++) {
3044         FileMetaData* f = c->input(l, i);
3045         c->edit()->DeleteFile(c->level(l), f->fd.GetNumber());
3046         c->edit()->AddFile(c->output_level(), f->fd.GetNumber(),
3047                            f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
3048                            f->largest, f->fd.smallest_seqno,
3049                            f->fd.largest_seqno, f->marked_for_compaction,
3050                            f->oldest_blob_file_number, f->oldest_ancester_time,
3051                            f->file_creation_time, f->file_checksum,
3052                            f->file_checksum_func_name);
3053 
3054         ROCKS_LOG_BUFFER(
3055             log_buffer,
3056             "[%s] Moving #%" PRIu64 " to level-%d %" PRIu64 " bytes\n",
3057             c->column_family_data()->GetName().c_str(), f->fd.GetNumber(),
3058             c->output_level(), f->fd.GetFileSize());
3059         ++moved_files;
3060         moved_bytes += f->fd.GetFileSize();
3061       }
3062     }
3063 
3064     status = versions_->LogAndApply(c->column_family_data(),
3065                                     *c->mutable_cf_options(), c->edit(),
3066                                     &mutex_, directories_.GetDbDir());
3067     io_s = versions_->io_status();
3068     // Use latest MutableCFOptions
3069     InstallSuperVersionAndScheduleWork(c->column_family_data(),
3070                                        &job_context->superversion_contexts[0],
3071                                        *c->mutable_cf_options());
3072 
3073     VersionStorageInfo::LevelSummaryStorage tmp;
3074     c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(),
3075                                                              moved_bytes);
3076     {
3077       event_logger_.LogToBuffer(log_buffer)
3078           << "job" << job_context->job_id << "event"
3079           << "trivial_move"
3080           << "destination_level" << c->output_level() << "files" << moved_files
3081           << "total_files_size" << moved_bytes;
3082     }
3083     ROCKS_LOG_BUFFER(
3084         log_buffer,
3085         "[%s] Moved #%d files to level-%d %" PRIu64 " bytes %s: %s\n",
3086         c->column_family_data()->GetName().c_str(), moved_files,
3087         c->output_level(), moved_bytes, status.ToString().c_str(),
3088         c->column_family_data()->current()->storage_info()->LevelSummary(&tmp));
3089     *made_progress = true;
3090 
3091     // Clear the thread-status instrumentation
3092     ThreadStatusUtil::ResetThreadStatus();
3093     TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
3094                              c->column_family_data());
3095   } else if (!is_prepicked && c->output_level() > 0 &&
3096              c->output_level() ==
3097                  c->column_family_data()
3098                      ->current()
3099                      ->storage_info()
3100                      ->MaxOutputLevel(
3101                          immutable_db_options_.allow_ingest_behind) &&
3102              env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
3103     // Forward compactions involving the last level to the bottom pool, if it
3104     // exists, so that compactions unlikely to contribute to write stalls can
3105     // be delayed or deprioritized.
3106     TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool");
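    // Note: the BOTTOM pool has zero threads unless the user reserved some.
    // A minimal caller-side sketch (illustrative only, not part of this file):
    //   options.env->SetBackgroundThreads(1, Env::Priority::BOTTOM);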
3107     CompactionArg* ca = new CompactionArg;
3108     ca->db = this;
3109     ca->compaction_pri_ = Env::Priority::BOTTOM;
3110     ca->prepicked_compaction = new PrepickedCompaction;
3111     ca->prepicked_compaction->compaction = c.release();
3112     ca->prepicked_compaction->manual_compaction_state = nullptr;
3113     // Transfer the requested token so it does not need to be requested again.
3114     ca->prepicked_compaction->task_token = std::move(task_token);
3115     ++bg_bottom_compaction_scheduled_;
3116     env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM,
3117                    this, &DBImpl::UnscheduleCompactionCallback);
3118   } else {
3119     TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
3120                              c->column_family_data());
3121     int output_level __attribute__((__unused__));
3122     output_level = c->output_level();
3123     TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
3124                              &output_level);
3125     std::vector<SequenceNumber> snapshot_seqs;
3126     SequenceNumber earliest_write_conflict_snapshot;
3127     SnapshotChecker* snapshot_checker;
3128     GetSnapshotContext(job_context, &snapshot_seqs,
3129                        &earliest_write_conflict_snapshot, &snapshot_checker);
3130     assert(is_snapshot_supported_ || snapshots_.empty());
3131     CompactionJob compaction_job(
3132         job_context->job_id, c.get(), immutable_db_options_,
3133         mutable_db_options_, file_options_for_compaction_, versions_.get(),
3134         &shutting_down_, preserve_deletes_seqnum_.load(), log_buffer,
3135         directories_.GetDbDir(),
3136         GetDataDir(c->column_family_data(), c->output_path_id()),
3137         GetDataDir(c->column_family_data(), 0), stats_, &mutex_,
3138         &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
3139         snapshot_checker, table_cache_, &event_logger_,
3140         c->mutable_cf_options()->paranoid_file_checks,
3141         c->mutable_cf_options()->report_bg_io_stats, dbname_,
3142         &compaction_job_stats, thread_pri, io_tracer_,
3143         is_manual ? &manual_compaction_paused_ : nullptr, db_id_,
3144         db_session_id_, c->column_family_data()->GetFullHistoryTsLow());
3145     compaction_job.Prepare();
3146 
3147     NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
3148                             compaction_job_stats, job_context->job_id);
3149     mutex_.Unlock();
3150     TEST_SYNC_POINT_CALLBACK(
3151         "DBImpl::BackgroundCompaction:NonTrivial:BeforeRun", nullptr);
3152     // Should we handle the error here?
3153     compaction_job.Run().PermitUncheckedError();
3154     TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
3155     mutex_.Lock();
3156 
3157     status = compaction_job.Install(*c->mutable_cf_options());
3158     io_s = compaction_job.io_status();
3159     if (status.ok()) {
3160       InstallSuperVersionAndScheduleWork(c->column_family_data(),
3161                                          &job_context->superversion_contexts[0],
3162                                          *c->mutable_cf_options());
3163     }
3164     *made_progress = true;
3165     TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
3166                              c->column_family_data());
3167   }
3168 
3169   if (status.ok() && !io_s.ok()) {
3170     status = io_s;
3171   } else {
3172     io_s.PermitUncheckedError();
3173   }
3174 
3175   if (c != nullptr) {
3176     c->ReleaseCompactionFiles(status);
3177     *made_progress = true;
3178 
3179 #ifndef ROCKSDB_LITE
3180     // Need to make sure SstFileManager does its bookkeeping
3181     auto sfm = static_cast<SstFileManagerImpl*>(
3182         immutable_db_options_.sst_file_manager.get());
3183     if (sfm && sfm_reserved_compact_space) {
3184       sfm->OnCompactionCompletion(c.get());
3185     }
3186 #endif  // ROCKSDB_LITE
3187 
3188     NotifyOnCompactionCompleted(c->column_family_data(), c.get(), status,
3189                                 compaction_job_stats, job_context->job_id);
3190   }
3191 
3192   if (status.ok() || status.IsCompactionTooLarge() ||
3193       status.IsManualCompactionPaused()) {
3194     // Done
3195   } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) {
3196     // Ignore compaction errors found during shutting down
3197   } else {
3198     ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
3199                    status.ToString().c_str());
3200     if (!io_s.ok()) {
3201       // Error while writing to MANIFEST.
3202       // In fact, versions_->io_status() can also be the result of renaming the
3203       // CURRENT file. With the current code it's just difficult to tell, so
3204       // just be pessimistic and try to write a new MANIFEST.
3205       // TODO: distinguish between MANIFEST write and CURRENT renaming
3206       auto err_reason = versions_->io_status().ok()
3207                             ? BackgroundErrorReason::kCompaction
3208                             : BackgroundErrorReason::kManifestWrite;
3209       error_handler_.SetBGError(io_s, err_reason);
3210     } else {
3211       error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
3212     }
3213     if (c != nullptr && !is_manual && !error_handler_.IsBGWorkStopped()) {
3214       // Put this cfd back in the compaction queue so we can retry after some
3215       // time
3216       auto cfd = c->column_family_data();
3217       assert(cfd != nullptr);
3218       // Since this compaction failed, we need to recompute the score so it
3219       // takes the original input files into account
3220       c->column_family_data()
3221           ->current()
3222           ->storage_info()
3223           ->ComputeCompactionScore(*(c->immutable_cf_options()),
3224                                    *(c->mutable_cf_options()));
3225       if (!cfd->queued_for_compaction()) {
3226         AddToCompactionQueue(cfd);
3227         ++unscheduled_compactions_;
3228       }
3229     }
3230   }
3231   // this will unref its input_version and column_family_data
3232   c.reset();
3233 
3234   if (is_manual) {
3235     ManualCompactionState* m = manual_compaction;
3236     if (!status.ok()) {
3237       m->status = status;
3238       m->done = true;
3239     }
3240     // For universal compaction:
3241     //   Because universal compaction always happens at level 0, one
3242     //   compaction will pick up all overlapping files. No files will be
3243     //   filtered out due to size limits and left for a successive compaction,
3244     //   so we can safely conclude the current compaction.
3245     //
3246     //   Also note that, if we don't stop here, then the current compaction
3247     //   writes a new file back to level 0, which will be used in successive
3248     //   compaction. Hence the manual compaction will never finish.
3249     //
3250     // Stop the compaction if manual_end is nullptr -- this means that we
3251     // compacted the whole range. manual_end should always be nullptr for
3252     // universal compaction.
3253     if (m->manual_end == nullptr) {
3254       m->done = true;
3255     }
3256     if (!m->done) {
3257       // We only compacted part of the requested range.  Update *m
3258       // to the range that is left to be compacted.
3259       // Universal and FIFO compactions should always compact the whole range
3260       assert(m->cfd->ioptions()->compaction_style !=
3261                  kCompactionStyleUniversal ||
3262              m->cfd->ioptions()->num_levels > 1);
3263       assert(m->cfd->ioptions()->compaction_style != kCompactionStyleFIFO);
3264       m->tmp_storage = *m->manual_end;
3265       m->begin = &m->tmp_storage;
3266       m->incomplete = true;
3267     }
3268     m->in_progress = false;  // not being processed anymore
3269   }
3270   TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Finish");
3271   return status;
3272 }
3273 
3274 bool DBImpl::HasPendingManualCompaction() {
3275   return (!manual_compaction_dequeue_.empty());
3276 }
3277 
3278 void DBImpl::AddManualCompaction(DBImpl::ManualCompactionState* m) {
3279   manual_compaction_dequeue_.push_back(m);
3280 }
3281 
3282 void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) {
3283   // Remove from queue
3284   std::deque<ManualCompactionState*>::iterator it =
3285       manual_compaction_dequeue_.begin();
3286   while (it != manual_compaction_dequeue_.end()) {
3287     if (m == (*it)) {
3288       it = manual_compaction_dequeue_.erase(it);
3289       return;
3290     }
3291     ++it;
3292   }
3293   assert(false);
3294   return;
3295 }
3296 
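// A manual compaction should not run (yet) if external file ingestion is in
// progress, if it is exclusive while any background compaction is scheduled,
// or if an overlapping manual compaction queued ahead of it has not started.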
3297 bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) {
3298   if (num_running_ingest_file_ > 0) {
3299     // We need to wait for other IngestExternalFile() calls to finish
3300     // before running a manual compaction.
3301     return true;
3302   }
3303   if (m->exclusive) {
3304     return (bg_bottom_compaction_scheduled_ > 0 ||
3305             bg_compaction_scheduled_ > 0);
3306   }
3307   std::deque<ManualCompactionState*>::iterator it =
3308       manual_compaction_dequeue_.begin();
3309   bool seen = false;
3310   while (it != manual_compaction_dequeue_.end()) {
3311     if (m == (*it)) {
3312       ++it;
3313       seen = true;
3314       continue;
3315     } else if (MCOverlap(m, (*it)) && (!seen && !(*it)->in_progress)) {
3316       // The other manual compaction *it conflicts with m if it overlaps
3317       // with m, is ahead of m in the queue, and is not yet in progress.
3318       // (seen tracks whether we have already passed m itself in the queue.)
3319       return true;
3320     }
3321     ++it;
3322   }
3323   return false;
3324 }
3325 
3326 bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
3327   // Check the queue for a manual compaction that conflicts with this CF
3328   std::deque<ManualCompactionState*>::iterator it =
3329       manual_compaction_dequeue_.begin();
3330   while (it != manual_compaction_dequeue_.end()) {
3331     if ((*it)->exclusive) {
3332       return true;
3333     }
3334     if ((cfd == (*it)->cfd) && (!((*it)->in_progress || (*it)->done))) {
3335       // A manual compaction on this CF that is still queued (neither in
3336       // progress nor done) conflicts with automatic compaction.
3337       return true;
3338     }
3339     ++it;
3340   }
3341   return false;
3342 }
3343 
3344 bool DBImpl::HasExclusiveManualCompaction() {
3345   // Check whether any queued manual compaction is exclusive
3346   std::deque<ManualCompactionState*>::iterator it =
3347       manual_compaction_dequeue_.begin();
3348   while (it != manual_compaction_dequeue_.end()) {
3349     if ((*it)->exclusive) {
3350       return true;
3351     }
3352     ++it;
3353   }
3354   return false;
3355 }
3356 
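// Note that MCOverlap() only treats exclusive manual compactions as
// conflicting; key ranges are not compared, so two non-exclusive manual
// compactions on the same column family are considered non-overlapping here.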
3357 bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
3358   if ((m->exclusive) || (m1->exclusive)) {
3359     return true;
3360   }
3361   if (m->cfd != m1->cfd) {
3362     return false;
3363   }
3364   return false;
3365 }
3366 
3367 #ifndef ROCKSDB_LITE
3368 void DBImpl::BuildCompactionJobInfo(
3369     const ColumnFamilyData* cfd, Compaction* c, const Status& st,
3370     const CompactionJobStats& compaction_job_stats, const int job_id,
3371     const Version* current, CompactionJobInfo* compaction_job_info) const {
3372   assert(compaction_job_info != nullptr);
3373   compaction_job_info->cf_id = cfd->GetID();
3374   compaction_job_info->cf_name = cfd->GetName();
3375   compaction_job_info->status = st;
3376   compaction_job_info->thread_id = env_->GetThreadID();
3377   compaction_job_info->job_id = job_id;
3378   compaction_job_info->base_input_level = c->start_level();
3379   compaction_job_info->output_level = c->output_level();
3380   compaction_job_info->stats = compaction_job_stats;
3381   compaction_job_info->table_properties = c->GetOutputTableProperties();
3382   compaction_job_info->compaction_reason = c->compaction_reason();
3383   compaction_job_info->compression = c->output_compression();
3384   for (size_t i = 0; i < c->num_input_levels(); ++i) {
3385     for (const auto fmd : *c->inputs(i)) {
3386       const FileDescriptor& desc = fmd->fd;
3387       const uint64_t file_number = desc.GetNumber();
3388       auto fn = TableFileName(c->immutable_cf_options()->cf_paths, file_number,
3389                               desc.GetPathId());
3390       compaction_job_info->input_files.push_back(fn);
3391       compaction_job_info->input_file_infos.push_back(CompactionFileInfo{
3392           static_cast<int>(i), file_number, fmd->oldest_blob_file_number});
3393       if (compaction_job_info->table_properties.count(fn) == 0) {
3394         std::shared_ptr<const TableProperties> tp;
3395         auto s = current->GetTableProperties(&tp, fmd, &fn);
3396         if (s.ok()) {
3397           compaction_job_info->table_properties[fn] = tp;
3398         }
3399       }
3400     }
3401   }
3402   for (const auto& newf : c->edit()->GetNewFiles()) {
3403     const FileMetaData& meta = newf.second;
3404     const FileDescriptor& desc = meta.fd;
3405     const uint64_t file_number = desc.GetNumber();
3406     compaction_job_info->output_files.push_back(TableFileName(
3407         c->immutable_cf_options()->cf_paths, file_number, desc.GetPathId()));
3408     compaction_job_info->output_file_infos.push_back(CompactionFileInfo{
3409         newf.first, file_number, meta.oldest_blob_file_number});
3410   }
3411 }
3412 #endif
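// The CompactionJobInfo assembled above is what user-registered listeners
// receive. A rough caller-side sketch (illustrative only, not part of this
// file):
//
//   class MyListener : public EventListener {
//     void OnCompactionCompleted(DB* /*db*/,
//                                const CompactionJobInfo& info) override {
//       // e.g. inspect info.cf_name, info.compaction_reason, info.stats
//     }
//   };
//   // db_options.listeners.emplace_back(std::make_shared<MyListener>());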
3413 
3414 // SuperVersionContext gets created and destructed outside of the lock --
3415 // we use this conveniently to:
3416 // * malloc one SuperVersion() outside of the lock -- new_superversion
3417 // * delete SuperVersion()s outside of the lock -- superversions_to_free
3418 //
3419 // However, if InstallSuperVersionAndScheduleWork() gets called twice with the
3420 // same sv_context, we can't reuse the SuperVersion() that was malloced
3421 // outside of the lock, because the first call already used it. In that rare
3422 // case, we take a hit and create a new SuperVersion() inside the mutex. We
3423 // do a similar thing for the SuperVersions queued up in
3424 // superversions_to_free.
3425 
3426 void DBImpl::InstallSuperVersionAndScheduleWork(
3427     ColumnFamilyData* cfd, SuperVersionContext* sv_context,
3428     const MutableCFOptions& mutable_cf_options) {
3429   mutex_.AssertHeld();
3430 
3431   // Update max_total_in_memory_state_
3432   size_t old_memtable_size = 0;
3433   auto* old_sv = cfd->GetSuperVersion();
3434   if (old_sv) {
3435     old_memtable_size = old_sv->mutable_cf_options.write_buffer_size *
3436                         old_sv->mutable_cf_options.max_write_buffer_number;
3437   }
3438 
3439   // This branch is unlikely to be taken
3440   if (UNLIKELY(sv_context->new_superversion == nullptr)) {
3441     sv_context->NewSuperVersion();
3442   }
3443   cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options);
3444 
3445   // There may be a small data race here. The snapshot whose release should
3446   // trigger a bottommost compaction may already have been released by this
3447   // point. But assuming snapshots keep being created and released frequently,
3448   // the compaction will be triggered soon anyway.
3449   bottommost_files_mark_threshold_ = kMaxSequenceNumber;
3450   for (auto* my_cfd : *versions_->GetColumnFamilySet()) {
3451     bottommost_files_mark_threshold_ = std::min(
3452         bottommost_files_mark_threshold_,
3453         my_cfd->current()->storage_info()->bottommost_files_mark_threshold());
3454   }
3455 
3456   // Whenever we install a new SuperVersion, we might need to issue new flushes
3457   // or compactions.
3458   SchedulePendingCompaction(cfd);
3459   MaybeScheduleFlushOrCompaction();
3460 
3461   // Update max_total_in_memory_state_
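  // (for example, with write_buffer_size = 64 MB and max_write_buffer_number
  // = 2, this column family contributes 128 MB to the total)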
3462   max_total_in_memory_state_ = max_total_in_memory_state_ - old_memtable_size +
3463                                mutable_cf_options.write_buffer_size *
3464                                    mutable_cf_options.max_write_buffer_number;
3465 }
3466 
3467 // ShouldPurge is called by FindObsoleteFiles when doing a full scan,
3468 // and db mutex (mutex_) should already be held.
3469 // Actually, the current implementation of FindObsoleteFiles with
3470 // full_scan=true can issue I/O requests to obtain the list of files in
3471 // directories, e.g. env_->GetChildren(), while holding the db mutex.
3472 bool DBImpl::ShouldPurge(uint64_t file_number) const {
3473   return files_grabbed_for_purge_.find(file_number) ==
3474              files_grabbed_for_purge_.end() &&
3475          purge_files_.find(file_number) == purge_files_.end();
3476 }
3477 
3478 // MarkAsGrabbedForPurge is called by FindObsoleteFiles, and db mutex
3479 // (mutex_) should already be held.
3480 void DBImpl::MarkAsGrabbedForPurge(uint64_t file_number) {
3481   files_grabbed_for_purge_.insert(file_number);
3482 }
3483 
3484 void DBImpl::SetSnapshotChecker(SnapshotChecker* snapshot_checker) {
3485   InstrumentedMutexLock l(&mutex_);
3486   // snapshot_checker_ should only be set once. If we need to set it multiple
3487   // times, we need to make sure the old one is not deleted while it is still
3488   // being used by a compaction job.
3489   assert(!snapshot_checker_);
3490   snapshot_checker_.reset(snapshot_checker);
3491 }
3492 
3493 void DBImpl::GetSnapshotContext(
3494     JobContext* job_context, std::vector<SequenceNumber>* snapshot_seqs,
3495     SequenceNumber* earliest_write_conflict_snapshot,
3496     SnapshotChecker** snapshot_checker_ptr) {
3497   mutex_.AssertHeld();
3498   assert(job_context != nullptr);
3499   assert(snapshot_seqs != nullptr);
3500   assert(earliest_write_conflict_snapshot != nullptr);
3501   assert(snapshot_checker_ptr != nullptr);
3502 
3503   *snapshot_checker_ptr = snapshot_checker_.get();
3504   if (use_custom_gc_ && *snapshot_checker_ptr == nullptr) {
3505     *snapshot_checker_ptr = DisableGCSnapshotChecker::Instance();
3506   }
3507   if (*snapshot_checker_ptr != nullptr) {
3508     // If snapshot_checker is used, the flush/compaction may contain values
3509     // that are not visible to snapshots taken after the flush/compaction
3510     // job starts. Take a snapshot now so that it will appear in
3511     // snapshot_seqs and force the compaction iterator to consider such
3512     // snapshots.
3513     const Snapshot* job_snapshot =
3514         GetSnapshotImpl(false /*write_conflict_boundary*/, false /*lock*/);
3515     job_context->job_snapshot.reset(new ManagedSnapshot(this, job_snapshot));
3516   }
3517   *snapshot_seqs = snapshots_.GetAll(earliest_write_conflict_snapshot);
3518 }
3519 }  // namespace ROCKSDB_NAMESPACE
3520