1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include "db/db_impl/db_impl.h"
10 
11 #include <cinttypes>
12 
13 #include "db/builder.h"
14 #include "db/error_handler.h"
15 #include "db/event_helpers.h"
16 #include "file/sst_file_manager_impl.h"
17 #include "monitoring/iostats_context_imp.h"
18 #include "monitoring/perf_context_imp.h"
19 #include "monitoring/thread_status_updater.h"
20 #include "monitoring/thread_status_util.h"
21 #include "test_util/sync_point.h"
22 #include "util/cast_util.h"
23 #include "util/concurrent_task_limiter_impl.h"
24 
25 namespace ROCKSDB_NAMESPACE {
26 
27 bool DBImpl::EnoughRoomForCompaction(
28     ColumnFamilyData* cfd, const std::vector<CompactionInputFiles>& inputs,
29     bool* sfm_reserved_compact_space, LogBuffer* log_buffer) {
30   // Check if we have enough room to do the compaction
31   bool enough_room = true;
32 #ifndef ROCKSDB_LITE
33   auto sfm = static_cast<SstFileManagerImpl*>(
34       immutable_db_options_.sst_file_manager.get());
35   if (sfm) {
36     // Pass the current bg_error_ to SFM so it can decide what checks to
37     // perform. If this DB instance hasn't seen any error yet, the SFM can be
38     // optimistic and not do disk space checks
39     enough_room =
40         sfm->EnoughRoomForCompaction(cfd, inputs, error_handler_.GetBGError());
41     if (enough_room) {
42       *sfm_reserved_compact_space = true;
43     }
44   }
45 #else
46   (void)cfd;
47   (void)inputs;
48   (void)sfm_reserved_compact_space;
49 #endif  // ROCKSDB_LITE
50   if (!enough_room) {
51     // Just in case tests want to change the value of enough_room
52     TEST_SYNC_POINT_CALLBACK(
53         "DBImpl::BackgroundCompaction():CancelledCompaction", &enough_room);
54     ROCKS_LOG_BUFFER(log_buffer,
55                      "Cancelled compaction because not enough room");
56     RecordTick(stats_, COMPACTION_CANCELLED, 1);
57   }
58   return enough_room;
59 }
60 
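// Editorial note (illustrative sketch, not part of the original source): the
// space check above only takes effect when the DB is opened with an
// SstFileManager that enforces a budget. A typical client-side setup, using
// names from the public headers (values are placeholders):
//
//   #include "rocksdb/sst_file_manager.h"
//   Options options;
//   options.sst_file_manager.reset(NewSstFileManager(Env::Default()));
//   options.sst_file_manager->SetMaxAllowedSpaceUsage(100ULL << 30);  // 100 GB
//   // Optional headroom reserved for in-flight compactions.
//   options.sst_file_manager->SetCompactionBufferSize(2ULL << 30);    // 2 GB
//
// Without an SstFileManager, enough_room stays true and compactions are never
// cancelled for lack of space.
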
61 bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force,
62                                     std::unique_ptr<TaskLimiterToken>* token,
63                                     LogBuffer* log_buffer) {
64   assert(*token == nullptr);
65   auto limiter = static_cast<ConcurrentTaskLimiterImpl*>(
66       cfd->ioptions()->compaction_thread_limiter.get());
67   if (limiter == nullptr) {
68     return true;
69   }
70   *token = limiter->GetToken(force);
71   if (*token != nullptr) {
72     ROCKS_LOG_BUFFER(log_buffer,
73                      "Thread limiter [%s] increase [%s] compaction task, "
74                      "force: %s, tasks after: %d",
75                      limiter->GetName().c_str(), cfd->GetName().c_str(),
76                      force ? "true" : "false", limiter->GetOutstandingTask());
77     return true;
78   }
79   return false;
80 }
81 
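// Editorial note (illustrative sketch): the limiter consulted above is
// attached per column family; a minimal configuration using the public API
// might look like:
//
//   #include "rocksdb/concurrent_task_limiter.h"
//   ColumnFamilyOptions cf_opts;
//   cf_opts.compaction_thread_limiter.reset(
//       NewConcurrentTaskLimiter("shared_limiter", 4 /* max tasks */));
//
// A null limiter (the default) means unlimited concurrent compactions, which
// is why the function above returns true immediately in that case.
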
82 Status DBImpl::SyncClosedLogs(JobContext* job_context) {
83   TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
84   mutex_.AssertHeld();
85   autovector<log::Writer*, 1> logs_to_sync;
86   uint64_t current_log_number = logfile_number_;
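  // Wait for any other thread that is already syncing the oldest closed log
  // to finish, so that this thread never marks a log as getting_synced twice
  // (descriptive comment added editorially).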
87   while (logs_.front().number < current_log_number &&
88          logs_.front().getting_synced) {
89     log_sync_cv_.Wait();
90   }
91   for (auto it = logs_.begin();
92        it != logs_.end() && it->number < current_log_number; ++it) {
93     auto& log = *it;
94     assert(!log.getting_synced);
95     log.getting_synced = true;
96     logs_to_sync.push_back(log.writer);
97   }
98 
99   Status s;
100   if (!logs_to_sync.empty()) {
101     mutex_.Unlock();
102 
103     for (log::Writer* log : logs_to_sync) {
104       ROCKS_LOG_INFO(immutable_db_options_.info_log,
105                      "[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
106                      log->get_log_number());
107       s = log->file()->Sync(immutable_db_options_.use_fsync);
108       if (!s.ok()) {
109         break;
110       }
111 
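      // With WAL recycling enabled, the writer is closed once the log is
      // synced, presumably so the underlying file can be reused for a future
      // WAL (editorial comment; the original source leaves the reason unstated).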
112       if (immutable_db_options_.recycle_log_file_num > 0) {
113         s = log->Close();
114         if (!s.ok()) {
115           break;
116         }
117       }
118     }
119     if (s.ok()) {
120       s = directories_.GetWalDir()->Fsync();
121     }
122 
123     mutex_.Lock();
124 
125     // "number <= current_log_number - 1" is equivalent to
126     // "number < current_log_number".
127     MarkLogsSynced(current_log_number - 1, true, s);
128     if (!s.ok()) {
129       error_handler_.SetBGError(s, BackgroundErrorReason::kFlush);
130       TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
131       return s;
132     }
133   }
134   return s;
135 }
136 
137 Status DBImpl::FlushMemTableToOutputFile(
138     ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
139     bool* made_progress, JobContext* job_context,
140     SuperVersionContext* superversion_context,
141     std::vector<SequenceNumber>& snapshot_seqs,
142     SequenceNumber earliest_write_conflict_snapshot,
143     SnapshotChecker* snapshot_checker, LogBuffer* log_buffer,
144     Env::Priority thread_pri) {
145   mutex_.AssertHeld();
146   assert(cfd->imm()->NumNotFlushed() != 0);
147   assert(cfd->imm()->IsFlushPending());
148 
149   FlushJob flush_job(
150       dbname_, cfd, immutable_db_options_, mutable_cf_options,
151       nullptr /* memtable_id */, file_options_for_compaction_, versions_.get(),
152       &mutex_, &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
153       snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
154       GetDataDir(cfd, 0U),
155       GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
156       &event_logger_, mutable_cf_options.report_bg_io_stats,
157       true /* sync_output_directory */, true /* write_manifest */, thread_pri);
158 
159   FileMetaData file_meta;
160 
161   TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables");
162   flush_job.PickMemTable();
163   TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:AfterPickMemtables");
164 
165 #ifndef ROCKSDB_LITE
166   // may temporarily unlock and lock the mutex.
167   NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id);
168 #endif  // ROCKSDB_LITE
169 
170   Status s;
171   if (logfile_number_ > 0 &&
172       versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1) {
173     // If there is more than one column family, we need to make sure that
174     // all the log files except the most recent one are synced. Otherwise if
175     // the host crashes after flushing and before the WAL is persisted, the
176     // flushed SST may contain data from write batches whose updates to
177     // other column families are missing.
178     // SyncClosedLogs() may unlock and re-lock the db_mutex.
179     s = SyncClosedLogs(job_context);
180   } else {
181     TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Skip");
182   }
183 
184   // Within flush_job.Run, rocksdb may call event listener to notify
185   // file creation and deletion.
186   //
187   // Note that flush_job.Run will unlock and lock the db_mutex,
188   // and EventListener callback will be called when the db_mutex
189   // is unlocked by the current thread.
190   if (s.ok()) {
191     s = flush_job.Run(&logs_with_prep_tracker_, &file_meta);
192   } else {
193     flush_job.Cancel();
194   }
195 
196   if (s.ok()) {
197     InstallSuperVersionAndScheduleWork(cfd, superversion_context,
198                                        mutable_cf_options);
199     if (made_progress) {
200       *made_progress = true;
201     }
202     VersionStorageInfo::LevelSummaryStorage tmp;
203     ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
204                      cfd->GetName().c_str(),
205                      cfd->current()->storage_info()->LevelSummary(&tmp));
206   }
207 
208   if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
209     Status new_bg_error = s;
210     error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
211   }
212   if (s.ok()) {
213 #ifndef ROCKSDB_LITE
214     // may temporarily unlock and lock the mutex.
215     NotifyOnFlushCompleted(cfd, mutable_cf_options,
216                            flush_job.GetCommittedFlushJobsInfo());
217     auto sfm = static_cast<SstFileManagerImpl*>(
218         immutable_db_options_.sst_file_manager.get());
219     if (sfm) {
220       // Notify sst_file_manager that a new file was added
221       std::string file_path = MakeTableFileName(
222           cfd->ioptions()->cf_paths[0].path, file_meta.fd.GetNumber());
223       sfm->OnAddFile(file_path);
224       if (sfm->IsMaxAllowedSpaceReached()) {
225         Status new_bg_error =
226             Status::SpaceLimit("Max allowed space was reached");
227         TEST_SYNC_POINT_CALLBACK(
228             "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
229             &new_bg_error);
230         error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
231       }
232     }
233 #endif  // ROCKSDB_LITE
234   }
235   TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:Finish");
236   return s;
237 }
238 
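// Editorial note: unlike the per-CF path above, which constructs its FlushJob
// with sync_output_directory=true and write_manifest=true, the atomic flush
// path in AtomicFlushMemTablesToOutputFiles() builds its jobs with both flags
// false, then fsyncs the distinct output directories and commits all results
// to the MANIFEST as a single batch.
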
239 Status DBImpl::FlushMemTablesToOutputFiles(
240     const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
241     JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
242   if (immutable_db_options_.atomic_flush) {
243     return AtomicFlushMemTablesToOutputFiles(
244         bg_flush_args, made_progress, job_context, log_buffer, thread_pri);
245   }
246   std::vector<SequenceNumber> snapshot_seqs;
247   SequenceNumber earliest_write_conflict_snapshot;
248   SnapshotChecker* snapshot_checker;
249   GetSnapshotContext(job_context, &snapshot_seqs,
250                      &earliest_write_conflict_snapshot, &snapshot_checker);
251   Status status;
252   for (auto& arg : bg_flush_args) {
253     ColumnFamilyData* cfd = arg.cfd_;
254     MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
255     SuperVersionContext* superversion_context = arg.superversion_context_;
256     Status s = FlushMemTableToOutputFile(
257         cfd, mutable_cf_options, made_progress, job_context,
258         superversion_context, snapshot_seqs, earliest_write_conflict_snapshot,
259         snapshot_checker, log_buffer, thread_pri);
260     if (!s.ok()) {
261       status = s;
262       if (!s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
263         // At this point, DB is not shutting down, nor is cfd dropped.
264         // Something is wrong, thus we break out of the loop.
265         break;
266       }
267     }
268   }
269   return status;
270 }
271 
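// Editorial note (illustrative): the atomic path documented below is only
// taken when the DB was opened with
//
//   DBOptions db_opts;
//   db_opts.atomic_flush = true;
//
// in which case memtables of all selected column families are flushed and
// committed to the MANIFEST as one unit.
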
272 /*
273  * Atomically flushes multiple column families.
274  *
275  * For each column family, all memtables with ID smaller than or equal to the
276  * ID specified in bg_flush_args will be flushed. Only after all column
277  * families finish flush will this function commit to MANIFEST. If any of the
278  * column families are not flushed successfully, this function does not have
279  * any side-effect on the state of the database.
280  */
281 Status DBImpl::AtomicFlushMemTablesToOutputFiles(
282     const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
283     JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
284   mutex_.AssertHeld();
285 
286   autovector<ColumnFamilyData*> cfds;
287   for (const auto& arg : bg_flush_args) {
288     cfds.emplace_back(arg.cfd_);
289   }
290 
291 #ifndef NDEBUG
292   for (const auto cfd : cfds) {
293     assert(cfd->imm()->NumNotFlushed() != 0);
294     assert(cfd->imm()->IsFlushPending());
295   }
296 #endif /* !NDEBUG */
297 
298   std::vector<SequenceNumber> snapshot_seqs;
299   SequenceNumber earliest_write_conflict_snapshot;
300   SnapshotChecker* snapshot_checker;
301   GetSnapshotContext(job_context, &snapshot_seqs,
302                      &earliest_write_conflict_snapshot, &snapshot_checker);
303 
304   autovector<Directory*> distinct_output_dirs;
305   autovector<std::string> distinct_output_dir_paths;
306   std::vector<std::unique_ptr<FlushJob>> jobs;
307   std::vector<MutableCFOptions> all_mutable_cf_options;
308   int num_cfs = static_cast<int>(cfds.size());
309   all_mutable_cf_options.reserve(num_cfs);
310   for (int i = 0; i < num_cfs; ++i) {
311     auto cfd = cfds[i];
312     Directory* data_dir = GetDataDir(cfd, 0U);
313     const std::string& curr_path = cfd->ioptions()->cf_paths[0].path;
314 
315     // Add to distinct output directories if eligible. Use linear search. Since
316     // the number of elements in the vector is not large, performance should be
317     // tolerable.
318     bool found = false;
319     for (const auto& path : distinct_output_dir_paths) {
320       if (path == curr_path) {
321         found = true;
322         break;
323       }
324     }
325     if (!found) {
326       distinct_output_dir_paths.emplace_back(curr_path);
327       distinct_output_dirs.emplace_back(data_dir);
328     }
329 
330     all_mutable_cf_options.emplace_back(*cfd->GetLatestMutableCFOptions());
331     const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back();
332     const uint64_t* max_memtable_id = &(bg_flush_args[i].max_memtable_id_);
333     jobs.emplace_back(new FlushJob(
334         dbname_, cfd, immutable_db_options_, mutable_cf_options,
335         max_memtable_id, file_options_for_compaction_, versions_.get(), &mutex_,
336         &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
337         snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
338         data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
339         stats_, &event_logger_, mutable_cf_options.report_bg_io_stats,
340         false /* sync_output_directory */, false /* write_manifest */,
341         thread_pri));
342     jobs.back()->PickMemTable();
343   }
344 
345   std::vector<FileMetaData> file_meta(num_cfs);
346   Status s;
347   assert(num_cfs == static_cast<int>(jobs.size()));
348 
349 #ifndef ROCKSDB_LITE
350   for (int i = 0; i != num_cfs; ++i) {
351     const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i);
352     // may temporarily unlock and lock the mutex.
353     NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options,
354                        job_context->job_id);
355   }
356 #endif /* !ROCKSDB_LITE */
357 
358   if (logfile_number_ > 0) {
359     // TODO (yanqin) investigate whether we should sync the closed logs for
360     // single column family case.
361     s = SyncClosedLogs(job_context);
362   }
363 
364   // exec_status stores the execution status of flush_jobs as
365   // <bool /* executed */, Status /* status code */>
366   autovector<std::pair<bool, Status>> exec_status;
367   for (int i = 0; i != num_cfs; ++i) {
368     // Initially all jobs are not executed, with status OK.
369     exec_status.emplace_back(false, Status::OK());
370   }
371 
372   if (s.ok()) {
373     // TODO (yanqin): parallelize jobs with threads.
374     for (int i = 1; i != num_cfs; ++i) {
375       exec_status[i].second =
376           jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i]);
377       exec_status[i].first = true;
378     }
379     if (num_cfs > 1) {
380       TEST_SYNC_POINT(
381           "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1");
382       TEST_SYNC_POINT(
383           "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2");
384     }
385     assert(exec_status.size() > 0);
386     assert(!file_meta.empty());
387     exec_status[0].second =
388         jobs[0]->Run(&logs_with_prep_tracker_, &file_meta[0]);
389     exec_status[0].first = true;
390 
391     Status error_status;
392     for (const auto& e : exec_status) {
393       if (!e.second.ok()) {
394         s = e.second;
395         if (!e.second.IsShutdownInProgress() &&
396             !e.second.IsColumnFamilyDropped()) {
397           // If a flush job did not return OK, and the CF is not dropped, and
398           // the DB is not shutting down, then we have to return this result to
399           // caller later.
400           error_status = e.second;
401         }
402       }
403     }
404 
405     s = error_status.ok() ? s : error_status;
406   }
407 
408   if (s.IsColumnFamilyDropped()) {
409     s = Status::OK();
410   }
411 
412   if (s.ok() || s.IsShutdownInProgress()) {
413     // Sync on all distinct output directories.
414     for (auto dir : distinct_output_dirs) {
415       if (dir != nullptr) {
416         Status error_status = dir->Fsync();
417         if (!error_status.ok()) {
418           s = error_status;
419           break;
420         }
421       }
422     }
423   } else {
424     // Need to undo atomic flush if something went wrong, i.e. s is not OK and
425     // it is not because of CF drop.
426     // Have to cancel the flush jobs that have NOT executed because we need to
427     // unref the versions.
428     for (int i = 0; i != num_cfs; ++i) {
429       if (!exec_status[i].first) {
430         jobs[i]->Cancel();
431       }
432     }
433     for (int i = 0; i != num_cfs; ++i) {
434       if (exec_status[i].first && exec_status[i].second.ok()) {
435         auto& mems = jobs[i]->GetMemTables();
436         cfds[i]->imm()->RollbackMemtableFlush(mems,
437                                               file_meta[i].fd.GetNumber());
438       }
439     }
440   }
441 
442   if (s.ok()) {
443     auto wait_to_install_func = [&]() {
444       bool ready = true;
445       for (size_t i = 0; i != cfds.size(); ++i) {
446         const auto& mems = jobs[i]->GetMemTables();
447         if (cfds[i]->IsDropped()) {
448           // If the column family is dropped, then do not wait.
449           continue;
450         } else if (!mems.empty() &&
451                    cfds[i]->imm()->GetEarliestMemTableID() < mems[0]->GetID()) {
452           // If a flush job needs to install the flush result for mems and
453           // mems[0] is not the earliest memtable, it means another thread must
454           // be installing flush results for the same column family, then the
455           // current thread needs to wait.
456           ready = false;
457           break;
458         } else if (mems.empty() && cfds[i]->imm()->GetEarliestMemTableID() <=
459                                        bg_flush_args[i].max_memtable_id_) {
460           // If a flush job does not need to install flush results, then it has
461           // to wait until all memtables up to max_memtable_id_ (inclusive) are
462           // installed.
463           ready = false;
464           break;
465         }
466       }
467       return ready;
468     };
469 
470     bool resuming_from_bg_err = error_handler_.IsDBStopped();
471     while ((!error_handler_.IsDBStopped() ||
472             error_handler_.GetRecoveryError().ok()) &&
473            !wait_to_install_func()) {
474       atomic_flush_install_cv_.Wait();
475     }
476 
477     s = resuming_from_bg_err ? error_handler_.GetRecoveryError()
478                              : error_handler_.GetBGError();
479   }
480 
481   if (s.ok()) {
482     autovector<ColumnFamilyData*> tmp_cfds;
483     autovector<const autovector<MemTable*>*> mems_list;
484     autovector<const MutableCFOptions*> mutable_cf_options_list;
485     autovector<FileMetaData*> tmp_file_meta;
486     for (int i = 0; i != num_cfs; ++i) {
487       const auto& mems = jobs[i]->GetMemTables();
488       if (!cfds[i]->IsDropped() && !mems.empty()) {
489         tmp_cfds.emplace_back(cfds[i]);
490         mems_list.emplace_back(&mems);
491         mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]);
492         tmp_file_meta.emplace_back(&file_meta[i]);
493       }
494     }
495 
496     s = InstallMemtableAtomicFlushResults(
497         nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
498         versions_.get(), &mutex_, tmp_file_meta,
499         &job_context->memtables_to_free, directories_.GetDbDir(), log_buffer);
500   }
501 
502   if (s.ok()) {
503     assert(num_cfs ==
504            static_cast<int>(job_context->superversion_contexts.size()));
505     for (int i = 0; i != num_cfs; ++i) {
506       if (cfds[i]->IsDropped()) {
507         continue;
508       }
509       InstallSuperVersionAndScheduleWork(cfds[i],
510                                          &job_context->superversion_contexts[i],
511                                          all_mutable_cf_options[i]);
512       VersionStorageInfo::LevelSummaryStorage tmp;
513       ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
514                        cfds[i]->GetName().c_str(),
515                        cfds[i]->current()->storage_info()->LevelSummary(&tmp));
516     }
517     if (made_progress) {
518       *made_progress = true;
519     }
520 #ifndef ROCKSDB_LITE
521     auto sfm = static_cast<SstFileManagerImpl*>(
522         immutable_db_options_.sst_file_manager.get());
523     assert(all_mutable_cf_options.size() == static_cast<size_t>(num_cfs));
524     for (int i = 0; i != num_cfs; ++i) {
525       if (cfds[i]->IsDropped()) {
526         continue;
527       }
528       NotifyOnFlushCompleted(cfds[i], all_mutable_cf_options[i],
529                              jobs[i]->GetCommittedFlushJobsInfo());
530       if (sfm) {
531         std::string file_path = MakeTableFileName(
532             cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber());
533         sfm->OnAddFile(file_path);
534         if (sfm->IsMaxAllowedSpaceReached() &&
535             error_handler_.GetBGError().ok()) {
536           Status new_bg_error =
537               Status::SpaceLimit("Max allowed space was reached");
538           error_handler_.SetBGError(new_bg_error,
539                                     BackgroundErrorReason::kFlush);
540         }
541       }
542     }
543 #endif  // ROCKSDB_LITE
544   }
545 
546   if (!s.ok() && !s.IsShutdownInProgress()) {
547     Status new_bg_error = s;
548     error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
549   }
550 
551   return s;
552 }
553 
554 void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
555                                 const MutableCFOptions& mutable_cf_options,
556                                 int job_id) {
557 #ifndef ROCKSDB_LITE
558   if (immutable_db_options_.listeners.size() == 0U) {
559     return;
560   }
561   mutex_.AssertHeld();
562   if (shutting_down_.load(std::memory_order_acquire)) {
563     return;
564   }
565   bool triggered_writes_slowdown =
566       (cfd->current()->storage_info()->NumLevelFiles(0) >=
567        mutable_cf_options.level0_slowdown_writes_trigger);
568   bool triggered_writes_stop =
569       (cfd->current()->storage_info()->NumLevelFiles(0) >=
570        mutable_cf_options.level0_stop_writes_trigger);
571   // release lock while notifying events
572   mutex_.Unlock();
573   {
574     FlushJobInfo info{};
575     info.cf_id = cfd->GetID();
576     info.cf_name = cfd->GetName();
577     // TODO(yhchiang): make db_paths dynamic in case flush does not
578     //                 go to L0 in the future.
579     const uint64_t file_number = file_meta->fd.GetNumber();
580     info.file_path =
581         MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_number);
582     info.file_number = file_number;
583     info.thread_id = env_->GetThreadID();
584     info.job_id = job_id;
585     info.triggered_writes_slowdown = triggered_writes_slowdown;
586     info.triggered_writes_stop = triggered_writes_stop;
587     info.smallest_seqno = file_meta->fd.smallest_seqno;
588     info.largest_seqno = file_meta->fd.largest_seqno;
589     info.flush_reason = cfd->GetFlushReason();
590     for (auto listener : immutable_db_options_.listeners) {
591       listener->OnFlushBegin(this, info);
592     }
593   }
594   mutex_.Lock();
595 // no need to signal bg_cv_ as it will be signaled at the end of the
596 // flush process.
597 #else
598   (void)cfd;
599   (void)file_meta;
600   (void)mutable_cf_options;
601   (void)job_id;
602 #endif  // ROCKSDB_LITE
603 }
604 
605 void DBImpl::NotifyOnFlushCompleted(
606     ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
607     std::list<std::unique_ptr<FlushJobInfo>>* flush_jobs_info) {
608 #ifndef ROCKSDB_LITE
609   assert(flush_jobs_info != nullptr);
610   if (immutable_db_options_.listeners.size() == 0U) {
611     return;
612   }
613   mutex_.AssertHeld();
614   if (shutting_down_.load(std::memory_order_acquire)) {
615     return;
616   }
617   bool triggered_writes_slowdown =
618       (cfd->current()->storage_info()->NumLevelFiles(0) >=
619        mutable_cf_options.level0_slowdown_writes_trigger);
620   bool triggered_writes_stop =
621       (cfd->current()->storage_info()->NumLevelFiles(0) >=
622        mutable_cf_options.level0_stop_writes_trigger);
623   // release lock while notifying events
624   mutex_.Unlock();
625   {
626     for (auto& info : *flush_jobs_info) {
627       info->triggered_writes_slowdown = triggered_writes_slowdown;
628       info->triggered_writes_stop = triggered_writes_stop;
629       for (auto listener : immutable_db_options_.listeners) {
630         listener->OnFlushCompleted(this, *info);
631       }
632     }
633     flush_jobs_info->clear();
634   }
635   mutex_.Lock();
636   // no need to signal bg_cv_ as it will be signaled at the end of the
637   // flush process.
638 #else
639   (void)cfd;
640   (void)mutable_cf_options;
641   (void)flush_jobs_info;
642 #endif  // ROCKSDB_LITE
643 }
644 
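// Editorial note (illustrative sketch): the two notification hooks above fan
// out to user-registered listeners. A minimal listener, assuming the public
// EventListener interface, might look like:
//
//   #include "rocksdb/listener.h"
//   class FlushLogger : public EventListener {
//    public:
//     void OnFlushBegin(DB* /*db*/, const FlushJobInfo& info) override {
//       fprintf(stderr, "flush begin: cf=%s file=%s\n", info.cf_name.c_str(),
//               info.file_path.c_str());
//     }
//     void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
//       fprintf(stderr, "flush done: cf=%s\n", info.cf_name.c_str());
//     }
//   };
//   // Registered via:
//   //   options.listeners.emplace_back(std::make_shared<FlushLogger>());
//
// As the code above shows, both callbacks run with the DB mutex released.
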
645 Status DBImpl::CompactRange(const CompactRangeOptions& options,
646                             ColumnFamilyHandle* column_family,
647                             const Slice* begin, const Slice* end) {
648   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
649   auto cfd = cfh->cfd();
650 
651   if (options.target_path_id >= cfd->ioptions()->cf_paths.size()) {
652     return Status::InvalidArgument("Invalid target path ID");
653   }
654 
655   bool exclusive = options.exclusive_manual_compaction;
656 
657   bool flush_needed = true;
658   if (begin != nullptr && end != nullptr) {
659     // TODO(ajkr): We could also optimize away the flush in certain cases where
660     // one/both sides of the interval are unbounded. But it requires more
661     // changes to RangesOverlapWithMemtables.
662     Range range(*begin, *end);
663     SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
664     cfd->RangesOverlapWithMemtables({range}, super_version, &flush_needed);
665     CleanupSuperVersion(super_version);
666   }
667 
668   Status s;
669   if (flush_needed) {
670     FlushOptions fo;
671     fo.allow_write_stall = options.allow_write_stall;
672     if (immutable_db_options_.atomic_flush) {
673       autovector<ColumnFamilyData*> cfds;
674       mutex_.Lock();
675       SelectColumnFamiliesForAtomicFlush(&cfds);
676       mutex_.Unlock();
677       s = AtomicFlushMemTables(cfds, fo, FlushReason::kManualCompaction,
678                                false /* writes_stopped */);
679     } else {
680       s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction,
681                         false /* writes_stopped*/);
682     }
683     if (!s.ok()) {
684       LogFlush(immutable_db_options_.info_log);
685       return s;
686     }
687   }
688 
689   int max_level_with_files = 0;
690   // max_file_num_to_ignore can be used to filter out newly created SST files,
691   // useful for bottom level compaction in a manual compaction
692   uint64_t max_file_num_to_ignore = port::kMaxUint64;
693   uint64_t next_file_number = port::kMaxUint64;
694   {
695     InstrumentedMutexLock l(&mutex_);
696     Version* base = cfd->current();
697     for (int level = 1; level < base->storage_info()->num_non_empty_levels();
698          level++) {
699       if (base->storage_info()->OverlapInLevel(level, begin, end)) {
700         max_level_with_files = level;
701       }
702     }
703     next_file_number = versions_->current_next_file_number();
704   }
705 
706   int final_output_level = 0;
707 
708   if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
709       cfd->NumberLevels() > 1) {
710     // Always compact all files together.
711     final_output_level = cfd->NumberLevels() - 1;
712     // if the bottommost level is reserved, compact into the level above it
713     if (immutable_db_options_.allow_ingest_behind) {
714       final_output_level--;
715     }
716     s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
717                             final_output_level, options, begin, end, exclusive,
718                             false, max_file_num_to_ignore);
719   } else {
720     for (int level = 0; level <= max_level_with_files; level++) {
721       int output_level;
722       // In case the compaction is universal or if we're compacting the
723       // bottom-most level, the output level will be the same as the input
724       // one. Level 0 can never be the bottommost level (i.e. if all files
725       // are in level 0, we will compact to level 1).
726       if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
727           cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
728         output_level = level;
729       } else if (level == max_level_with_files && level > 0) {
730         if (options.bottommost_level_compaction ==
731             BottommostLevelCompaction::kSkip) {
732           // Skip bottommost level compaction
733           continue;
734         } else if (options.bottommost_level_compaction ==
735                        BottommostLevelCompaction::kIfHaveCompactionFilter &&
736                    cfd->ioptions()->compaction_filter == nullptr &&
737                    cfd->ioptions()->compaction_filter_factory == nullptr) {
738           // Skip bottommost level compaction since we don't have a compaction
739           // filter
740           continue;
741         }
742         output_level = level;
743         // update max_file_num_to_ignore only for bottom level compaction
744         // because data in newly compacted files in middle levels may still need
745         // to be pushed down
746         max_file_num_to_ignore = next_file_number;
747       } else {
748         output_level = level + 1;
749         if (cfd->ioptions()->compaction_style == kCompactionStyleLevel &&
750             cfd->ioptions()->level_compaction_dynamic_level_bytes &&
751             level == 0) {
752           output_level = ColumnFamilyData::kCompactToBaseLevel;
753         }
754       }
755       s = RunManualCompaction(cfd, level, output_level, options, begin, end,
756                               exclusive, false, max_file_num_to_ignore);
757       if (!s.ok()) {
758         break;
759       }
760       if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
761         final_output_level = cfd->NumberLevels() - 1;
762       } else if (output_level > final_output_level) {
763         final_output_level = output_level;
764       }
765       TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1");
766       TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2");
767     }
768   }
769   if (!s.ok()) {
770     LogFlush(immutable_db_options_.info_log);
771     return s;
772   }
773 
774   if (options.change_level) {
775     ROCKS_LOG_INFO(immutable_db_options_.info_log,
776                    "[RefitLevel] waiting for background threads to stop");
777     s = PauseBackgroundWork();
778     if (s.ok()) {
779       s = ReFitLevel(cfd, final_output_level, options.target_level);
780     }
781     ContinueBackgroundWork();
782   }
783   LogFlush(immutable_db_options_.info_log);
784 
785   {
786     InstrumentedMutexLock l(&mutex_);
787     // an automatic compaction that has been scheduled might have been
788     // preempted by the manual compactions. Need to schedule it back.
789     MaybeScheduleFlushOrCompaction();
790   }
791 
792   return s;
793 }
794 
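// Editorial note (illustrative sketch): a typical client call that exercises
// CompactRange() above, compacting the whole key range and then refitting the
// result via ReFitLevel():
//
//   CompactRangeOptions cro;
//   cro.change_level = true;   // enables the ReFitLevel() step
//   cro.target_level = -1;     // negative: let ReFitLevel() pick the level
//   cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
//   Status s = db->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr);
//
// With both bounds null, the memtable-overlap check is skipped and a flush is
// always performed first, as the code above shows.
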
795 Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
796                             ColumnFamilyHandle* column_family,
797                             const std::vector<std::string>& input_file_names,
798                             const int output_level, const int output_path_id,
799                             std::vector<std::string>* const output_file_names,
800                             CompactionJobInfo* compaction_job_info) {
801 #ifdef ROCKSDB_LITE
802   (void)compact_options;
803   (void)column_family;
804   (void)input_file_names;
805   (void)output_level;
806   (void)output_path_id;
807   (void)output_file_names;
808   (void)compaction_job_info;
809   // not supported in lite version
810   return Status::NotSupported("Not supported in ROCKSDB LITE");
811 #else
812   if (column_family == nullptr) {
813     return Status::InvalidArgument("ColumnFamilyHandle must be non-null.");
814   }
815 
816   auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
817   assert(cfd);
818 
819   Status s;
820   JobContext job_context(0, true);
821   LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
822                        immutable_db_options_.info_log.get());
823 
824   // Perform CompactFiles
825   TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile2");
826   {
827     InstrumentedMutexLock l(&mutex_);
828 
829     // This call will unlock/lock the mutex to wait for current running
830     // IngestExternalFile() calls to finish.
831     WaitForIngestFile();
832 
833     // We need to get current after `WaitForIngestFile`, because
834     // `IngestExternalFile` may add files that overlap with `input_file_names`
835     auto* current = cfd->current();
836     current->Ref();
837 
838     s = CompactFilesImpl(compact_options, cfd, current, input_file_names,
839                          output_file_names, output_level, output_path_id,
840                          &job_context, &log_buffer, compaction_job_info);
841 
842     current->Unref();
843   }
844 
845   // Find and delete obsolete files
846   {
847     InstrumentedMutexLock l(&mutex_);
848     // If !s.ok(), this means that Compaction failed. In that case, we want
849     // to delete all obsolete files we might have created and we force
850     // FindObsoleteFiles(). This is because job_context does not
851     // catch all created files if compaction failed.
852     FindObsoleteFiles(&job_context, !s.ok());
853   }  // release the mutex
854 
855   // delete unnecessary files if any, this is done outside the mutex
856   if (job_context.HaveSomethingToClean() ||
857       job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
858     // Have to flush the info logs before bg_compaction_scheduled_--
859     // because if bg_flush_scheduled_ becomes 0 and the lock is
860     // released, the destructor of DB can kick in and destroy all the
861     // states of DB so info_log might not be available after that point.
862     // It also applies to access other states that DB owns.
863     log_buffer.FlushBufferToLog();
864     if (job_context.HaveSomethingToDelete()) {
865       // no mutex is locked here.  No need to Unlock() and Lock() here.
866       PurgeObsoleteFiles(job_context);
867     }
868     job_context.Clean();
869   }
870 
871   return s;
872 #endif  // ROCKSDB_LITE
873 }
874 
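// Editorial note (illustrative sketch): CompactFiles() is normally driven by
// file names taken from the live metadata, e.g. compacting every L0 file of
// the default column family into level 1:
//
//   ColumnFamilyMetaData meta;
//   db->GetColumnFamilyMetaData(&meta);
//   std::vector<std::string> inputs;
//   for (const auto& f : meta.levels[0].files) {
//     inputs.push_back(f.name);
//   }
//   Status s = db->CompactFiles(CompactionOptions(), inputs,
//                               /*output_level=*/1);
//
// The names are validated and mapped back to file numbers in
// CompactFilesImpl() below.
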
875 #ifndef ROCKSDB_LITE
876 Status DBImpl::CompactFilesImpl(
877     const CompactionOptions& compact_options, ColumnFamilyData* cfd,
878     Version* version, const std::vector<std::string>& input_file_names,
879     std::vector<std::string>* const output_file_names, const int output_level,
880     int output_path_id, JobContext* job_context, LogBuffer* log_buffer,
881     CompactionJobInfo* compaction_job_info) {
882   mutex_.AssertHeld();
883 
884   if (shutting_down_.load(std::memory_order_acquire)) {
885     return Status::ShutdownInProgress();
886   }
887   if (manual_compaction_paused_.load(std::memory_order_acquire)) {
888     return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
889   }
890 
891   std::unordered_set<uint64_t> input_set;
892   for (const auto& file_name : input_file_names) {
893     input_set.insert(TableFileNameToNumber(file_name));
894   }
895 
896   ColumnFamilyMetaData cf_meta;
897   // TODO(yhchiang): can directly use version here if none of the
898   // following function calls is pluggable to external developers.
899   version->GetColumnFamilyMetaData(&cf_meta);
900 
901   if (output_path_id < 0) {
902     if (cfd->ioptions()->cf_paths.size() == 1U) {
903       output_path_id = 0;
904     } else {
905       return Status::NotSupported(
906           "Automatic output path selection is not "
907           "yet supported in CompactFiles()");
908     }
909   }
910 
911   Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles(
912       &input_set, cf_meta, output_level);
913   if (!s.ok()) {
914     return s;
915   }
916 
917   std::vector<CompactionInputFiles> input_files;
918   s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers(
919       &input_files, &input_set, version->storage_info(), compact_options);
920   if (!s.ok()) {
921     return s;
922   }
923 
924   for (const auto& inputs : input_files) {
925     if (cfd->compaction_picker()->AreFilesInCompaction(inputs.files)) {
926       return Status::Aborted(
927           "Some of the necessary compaction input "
928           "files are already being compacted");
929     }
930   }
931   bool sfm_reserved_compact_space = false;
932   // First check if we have enough room to do the compaction
933   bool enough_room = EnoughRoomForCompaction(
934       cfd, input_files, &sfm_reserved_compact_space, log_buffer);
935 
936   if (!enough_room) {
937     // m's vars will get set properly at the end of this function,
938     // as long as status == CompactionTooLarge
939     return Status::CompactionTooLarge();
940   }
941 
942   // At this point, CompactFiles will be run.
943   bg_compaction_scheduled_++;
944 
945   std::unique_ptr<Compaction> c;
946   assert(cfd->compaction_picker());
947   c.reset(cfd->compaction_picker()->CompactFiles(
948       compact_options, input_files, output_level, version->storage_info(),
949       *cfd->GetLatestMutableCFOptions(), output_path_id));
950   // we already sanitized the set of input files and checked for conflicts
951   // without releasing the lock, so we're guaranteed a compaction can be formed.
952   assert(c != nullptr);
953 
954   c->SetInputVersion(version);
955   // deletion compaction currently not allowed in CompactFiles.
956   assert(!c->deletion_compaction());
957 
958   std::vector<SequenceNumber> snapshot_seqs;
959   SequenceNumber earliest_write_conflict_snapshot;
960   SnapshotChecker* snapshot_checker;
961   GetSnapshotContext(job_context, &snapshot_seqs,
962                      &earliest_write_conflict_snapshot, &snapshot_checker);
963 
964   std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
965       new std::list<uint64_t>::iterator(
966           CaptureCurrentFileNumberInPendingOutputs()));
967 
968   assert(is_snapshot_supported_ || snapshots_.empty());
969   CompactionJobStats compaction_job_stats;
970   CompactionJob compaction_job(
971       job_context->job_id, c.get(), immutable_db_options_,
972       file_options_for_compaction_, versions_.get(), &shutting_down_,
973       preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
974       GetDataDir(c->column_family_data(), c->output_path_id()), stats_, &mutex_,
975       &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
976       snapshot_checker, table_cache_, &event_logger_,
977       c->mutable_cf_options()->paranoid_file_checks,
978       c->mutable_cf_options()->report_bg_io_stats, dbname_,
979       &compaction_job_stats, Env::Priority::USER, &manual_compaction_paused_);
980 
981   // Creating a compaction influences the compaction score because the score
982   // takes running compactions into account (by skipping files that are already
983   // being compacted). Since we just changed compaction score, we recalculate it
984   // here.
985   version->storage_info()->ComputeCompactionScore(*cfd->ioptions(),
986                                                   *c->mutable_cf_options());
987 
988   compaction_job.Prepare();
989 
990   mutex_.Unlock();
991   TEST_SYNC_POINT("CompactFilesImpl:0");
992   TEST_SYNC_POINT("CompactFilesImpl:1");
993   compaction_job.Run();
994   TEST_SYNC_POINT("CompactFilesImpl:2");
995   TEST_SYNC_POINT("CompactFilesImpl:3");
996   mutex_.Lock();
997 
998   Status status = compaction_job.Install(*c->mutable_cf_options());
999   if (status.ok()) {
1000     InstallSuperVersionAndScheduleWork(c->column_family_data(),
1001                                        &job_context->superversion_contexts[0],
1002                                        *c->mutable_cf_options());
1003   }
1004   c->ReleaseCompactionFiles(s);
1005 #ifndef ROCKSDB_LITE
1006   // Need to make sure SstFileManager does its bookkeeping
1007   auto sfm = static_cast<SstFileManagerImpl*>(
1008       immutable_db_options_.sst_file_manager.get());
1009   if (sfm && sfm_reserved_compact_space) {
1010     sfm->OnCompactionCompletion(c.get());
1011   }
1012 #endif  // ROCKSDB_LITE
1013 
1014   ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
1015 
1016   if (compaction_job_info != nullptr) {
1017     BuildCompactionJobInfo(cfd, c.get(), s, compaction_job_stats,
1018                            job_context->job_id, version, compaction_job_info);
1019   }
1020 
1021   if (status.ok()) {
1022     // Done
1023   } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) {
1024     // Ignore compaction errors found during shutting down
1025   } else if (status.IsManualCompactionPaused()) {
1026     // Don't report stopping manual compaction as error
1027     ROCKS_LOG_INFO(immutable_db_options_.info_log,
1028                    "[%s] [JOB %d] Stopping manual compaction",
1029                    c->column_family_data()->GetName().c_str(),
1030                    job_context->job_id);
1031   } else {
1032     ROCKS_LOG_WARN(immutable_db_options_.info_log,
1033                    "[%s] [JOB %d] Compaction error: %s",
1034                    c->column_family_data()->GetName().c_str(),
1035                    job_context->job_id, status.ToString().c_str());
1036     error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
1037   }
1038 
1039   if (output_file_names != nullptr) {
1040     for (const auto newf : c->edit()->GetNewFiles()) {
1041       (*output_file_names)
1042           .push_back(TableFileName(c->immutable_cf_options()->cf_paths,
1043                                    newf.second.fd.GetNumber(),
1044                                    newf.second.fd.GetPathId()));
1045     }
1046   }
1047 
1048   c.reset();
1049 
1050   bg_compaction_scheduled_--;
1051   if (bg_compaction_scheduled_ == 0) {
1052     bg_cv_.SignalAll();
1053   }
1054   MaybeScheduleFlushOrCompaction();
1055   TEST_SYNC_POINT("CompactFilesImpl:End");
1056 
1057   return status;
1058 }
1059 #endif  // ROCKSDB_LITE
1060 
1061 Status DBImpl::PauseBackgroundWork() {
1062   InstrumentedMutexLock guard_lock(&mutex_);
1063   bg_compaction_paused_++;
1064   while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0 ||
1065          bg_flush_scheduled_ > 0) {
1066     bg_cv_.Wait();
1067   }
1068   bg_work_paused_++;
1069   return Status::OK();
1070 }
1071 
1072 Status DBImpl::ContinueBackgroundWork() {
1073   InstrumentedMutexLock guard_lock(&mutex_);
1074   if (bg_work_paused_ == 0) {
1075     return Status::InvalidArgument();
1076   }
1077   assert(bg_work_paused_ > 0);
1078   assert(bg_compaction_paused_ > 0);
1079   bg_compaction_paused_--;
1080   bg_work_paused_--;
1081   // It's sufficient to check just bg_work_paused_ here since
1082   // bg_work_paused_ is always no greater than bg_compaction_paused_
1083   if (bg_work_paused_ == 0) {
1084     MaybeScheduleFlushOrCompaction();
1085   }
1086   return Status::OK();
1087 }
1088 
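// Editorial note (illustrative): PauseBackgroundWork()/ContinueBackgroundWork()
// are reference counted, so callers pair them:
//
//   Status s = db->PauseBackgroundWork();  // waits for running bg jobs to drain
//   if (s.ok()) {
//     // ... work that must not race with flushes or compactions ...
//     db->ContinueBackgroundWork();
//   }
//
// Nested pauses are allowed; background work is rescheduled only once every
// pause has been matched by a continue.
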
1089 void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
1090                                      const Status& st,
1091                                      const CompactionJobStats& job_stats,
1092                                      int job_id) {
1093 #ifndef ROCKSDB_LITE
1094   if (immutable_db_options_.listeners.empty()) {
1095     return;
1096   }
1097   mutex_.AssertHeld();
1098   if (shutting_down_.load(std::memory_order_acquire)) {
1099     return;
1100   }
1101   if (c->is_manual_compaction() &&
1102       manual_compaction_paused_.load(std::memory_order_acquire)) {
1103     return;
1104   }
1105   Version* current = cfd->current();
1106   current->Ref();
1107   // release lock while notifying events
1108   mutex_.Unlock();
1109   TEST_SYNC_POINT("DBImpl::NotifyOnCompactionBegin::UnlockMutex");
1110   {
1111     CompactionJobInfo info{};
1112     info.cf_name = cfd->GetName();
1113     info.status = st;
1114     info.thread_id = env_->GetThreadID();
1115     info.job_id = job_id;
1116     info.base_input_level = c->start_level();
1117     info.output_level = c->output_level();
1118     info.stats = job_stats;
1119     info.table_properties = c->GetOutputTableProperties();
1120     info.compaction_reason = c->compaction_reason();
1121     info.compression = c->output_compression();
1122     for (size_t i = 0; i < c->num_input_levels(); ++i) {
1123       for (const auto fmd : *c->inputs(i)) {
1124         const FileDescriptor& desc = fmd->fd;
1125         const uint64_t file_number = desc.GetNumber();
1126         auto fn = TableFileName(c->immutable_cf_options()->cf_paths,
1127                                 file_number, desc.GetPathId());
1128         info.input_files.push_back(fn);
1129         info.input_file_infos.push_back(CompactionFileInfo{
1130             static_cast<int>(i), file_number, fmd->oldest_blob_file_number});
1131         if (info.table_properties.count(fn) == 0) {
1132           std::shared_ptr<const TableProperties> tp;
1133           auto s = current->GetTableProperties(&tp, fmd, &fn);
1134           if (s.ok()) {
1135             info.table_properties[fn] = tp;
1136           }
1137         }
1138       }
1139     }
1140     for (const auto newf : c->edit()->GetNewFiles()) {
1141       const FileMetaData& meta = newf.second;
1142       const FileDescriptor& desc = meta.fd;
1143       const uint64_t file_number = desc.GetNumber();
1144       info.output_files.push_back(TableFileName(
1145           c->immutable_cf_options()->cf_paths, file_number, desc.GetPathId()));
1146       info.output_file_infos.push_back(CompactionFileInfo{
1147           newf.first, file_number, meta.oldest_blob_file_number});
1148     }
1149     for (auto listener : immutable_db_options_.listeners) {
1150       listener->OnCompactionBegin(this, info);
1151     }
1152   }
1153   mutex_.Lock();
1154   current->Unref();
1155 #else
1156   (void)cfd;
1157   (void)c;
1158   (void)st;
1159   (void)job_stats;
1160   (void)job_id;
1161 #endif  // ROCKSDB_LITE
1162 }
1163 
1164 void DBImpl::NotifyOnCompactionCompleted(
1165     ColumnFamilyData* cfd, Compaction* c, const Status& st,
1166     const CompactionJobStats& compaction_job_stats, const int job_id) {
1167 #ifndef ROCKSDB_LITE
1168   if (immutable_db_options_.listeners.size() == 0U) {
1169     return;
1170   }
1171   mutex_.AssertHeld();
1172   if (shutting_down_.load(std::memory_order_acquire)) {
1173     return;
1174   }
1175   if (c->is_manual_compaction() &&
1176       manual_compaction_paused_.load(std::memory_order_acquire)) {
1177     return;
1178   }
1179   Version* current = cfd->current();
1180   current->Ref();
1181   // release lock while notifying events
1182   mutex_.Unlock();
1183   TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex");
1184   {
1185     CompactionJobInfo info{};
1186     BuildCompactionJobInfo(cfd, c, st, compaction_job_stats, job_id, current,
1187                            &info);
1188     for (auto listener : immutable_db_options_.listeners) {
1189       listener->OnCompactionCompleted(this, info);
1190     }
1191   }
1192   mutex_.Lock();
1193   current->Unref();
1194   // no need to signal bg_cv_ as it will be signaled at the end of the
1195   // compaction process.
1196 #else
1197   (void)cfd;
1198   (void)c;
1199   (void)st;
1200   (void)compaction_job_stats;
1201   (void)job_id;
1202 #endif  // ROCKSDB_LITE
1203 }
1204 
1205 // REQUIREMENT: block all background work by calling PauseBackgroundWork()
1206 // before calling this function
1207 Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
1208   assert(level < cfd->NumberLevels());
1209   if (target_level >= cfd->NumberLevels()) {
1210     return Status::InvalidArgument("Target level exceeds number of levels");
1211   }
1212 
1213   SuperVersionContext sv_context(/* create_superversion */ true);
1214 
1215   Status status;
1216 
1217   InstrumentedMutexLock guard_lock(&mutex_);
1218 
1219   // only allow one thread refitting
1220   if (refitting_level_) {
1221     ROCKS_LOG_INFO(immutable_db_options_.info_log,
1222                    "[ReFitLevel] another thread is refitting");
1223     return Status::NotSupported("another thread is refitting");
1224   }
1225   refitting_level_ = true;
1226 
1227   const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
1228   // move to a smaller level
1229   int to_level = target_level;
1230   if (target_level < 0) {
1231     to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level);
1232   }
1233 
1234   auto* vstorage = cfd->current()->storage_info();
1235   if (to_level > level) {
1236     if (level == 0) {
1237       return Status::NotSupported(
1238           "Cannot change from level 0 to other levels.");
1239     }
1240     // Check levels are empty for a trivial move
1241     for (int l = level + 1; l <= to_level; l++) {
1242       if (vstorage->NumLevelFiles(l) > 0) {
1243         return Status::NotSupported(
1244             "Levels between source and target are not empty for a move.");
1245       }
1246     }
1247   }
1248   if (to_level != level) {
1249     ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1250                     "[%s] Before refitting:\n%s", cfd->GetName().c_str(),
1251                     cfd->current()->DebugString().data());
1252 
1253     VersionEdit edit;
1254     edit.SetColumnFamily(cfd->GetID());
1255     for (const auto& f : vstorage->LevelFiles(level)) {
1256       edit.DeleteFile(level, f->fd.GetNumber());
1257       edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
1258                    f->fd.GetFileSize(), f->smallest, f->largest,
1259                    f->fd.smallest_seqno, f->fd.largest_seqno,
1260                    f->marked_for_compaction, f->oldest_blob_file_number,
1261                    f->oldest_ancester_time, f->file_creation_time,
1262                    f->file_checksum, f->file_checksum_func_name);
1263     }
1264     ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1265                     "[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
1266                     edit.DebugString().data());
1267 
1268     status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
1269                                     directories_.GetDbDir());
1270     InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options);
1271 
1272     ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n",
1273                     cfd->GetName().c_str(), status.ToString().data());
1274 
1275     if (status.ok()) {
1276       ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
1277                       "[%s] After refitting:\n%s", cfd->GetName().c_str(),
1278                       cfd->current()->DebugString().data());
1279     }
1280   }
1281 
1282   sv_context.Clean();
1283   refitting_level_ = false;
1284 
1285   return status;
1286 }
1287 
1288 int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) {
1289   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
1290   return cfh->cfd()->NumberLevels();
1291 }
1292 
1293 int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* /*column_family*/) {
1294   return 0;
1295 }
1296 
1297 int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
1298   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
1299   InstrumentedMutexLock l(&mutex_);
1300   return cfh->cfd()
1301       ->GetSuperVersion()
1302       ->mutable_cf_options.level0_stop_writes_trigger;
1303 }
1304 
1305 Status DBImpl::Flush(const FlushOptions& flush_options,
1306                      ColumnFamilyHandle* column_family) {
1307   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
1308   ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.",
1309                  cfh->GetName().c_str());
1310   Status s;
1311   if (immutable_db_options_.atomic_flush) {
1312     s = AtomicFlushMemTables({cfh->cfd()}, flush_options,
1313                              FlushReason::kManualFlush);
1314   } else {
1315     s = FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush);
1316   }
1317 
1318   ROCKS_LOG_INFO(immutable_db_options_.info_log,
1319                  "[%s] Manual flush finished, status: %s\n",
1320                  cfh->GetName().c_str(), s.ToString().c_str());
1321   return s;
1322 }
1323 
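// Editorial note (illustrative): a manual flush from client code, waiting for
// the memtable contents to reach an SST file:
//
//   FlushOptions fo;
//   fo.wait = true;  // the default: return only after the flush finishes
//   Status s = db->Flush(fo, cf_handle);
//
// With DBOptions::atomic_flush enabled, the same call is routed through
// AtomicFlushMemTables(), as shown above.
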
1324 Status DBImpl::Flush(const FlushOptions& flush_options,
1325                      const std::vector<ColumnFamilyHandle*>& column_families) {
1326   Status s;
1327   if (!immutable_db_options_.atomic_flush) {
1328     for (auto cfh : column_families) {
1329       s = Flush(flush_options, cfh);
1330       if (!s.ok()) {
1331         break;
1332       }
1333     }
1334   } else {
1335     ROCKS_LOG_INFO(immutable_db_options_.info_log,
1336                    "Manual atomic flush start.\n"
1337                    "=====Column families:=====");
1338     for (auto cfh : column_families) {
1339       auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
1340       ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
1341                      cfhi->GetName().c_str());
1342     }
1343     ROCKS_LOG_INFO(immutable_db_options_.info_log,
1344                    "=====End of column families list=====");
1345     autovector<ColumnFamilyData*> cfds;
1346     std::for_each(column_families.begin(), column_families.end(),
1347                   [&cfds](ColumnFamilyHandle* elem) {
1348                     auto cfh = static_cast<ColumnFamilyHandleImpl*>(elem);
1349                     cfds.emplace_back(cfh->cfd());
1350                   });
1351     s = AtomicFlushMemTables(cfds, flush_options, FlushReason::kManualFlush);
1352     ROCKS_LOG_INFO(immutable_db_options_.info_log,
1353                    "Manual atomic flush finished, status: %s\n"
1354                    "=====Column families:=====",
1355                    s.ToString().c_str());
1356     for (auto cfh : column_families) {
1357       auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
1358       ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
1359                      cfhi->GetName().c_str());
1360     }
1361     ROCKS_LOG_INFO(immutable_db_options_.info_log,
1362                    "=====End of column families list=====");
1363   }
1364   return s;
1365 }
1366 
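// Runs a manual compaction of the given key range from input_level to
// output_level. The call blocks: it registers the request, optionally waits
// for all other scheduled compactions to drain (exclusive mode), then
// repeatedly asks the column family to pick a compaction for the range,
// schedules it on the LOW-priority pool, and waits on bg_cv_ until the whole
// range has been processed.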
1367 Status DBImpl::RunManualCompaction(
1368     ColumnFamilyData* cfd, int input_level, int output_level,
1369     const CompactRangeOptions& compact_range_options, const Slice* begin,
1370     const Slice* end, bool exclusive, bool disallow_trivial_move,
1371     uint64_t max_file_num_to_ignore) {
1372   assert(input_level == ColumnFamilyData::kCompactAllLevels ||
1373          input_level >= 0);
1374 
1375   InternalKey begin_storage, end_storage;
1376   CompactionArg* ca;
1377 
1378   bool scheduled = false;
1379   bool manual_conflict = false;
1380   ManualCompactionState manual;
1381   manual.cfd = cfd;
1382   manual.input_level = input_level;
1383   manual.output_level = output_level;
1384   manual.output_path_id = compact_range_options.target_path_id;
1385   manual.done = false;
1386   manual.in_progress = false;
1387   manual.incomplete = false;
1388   manual.exclusive = exclusive;
1389   manual.disallow_trivial_move = disallow_trivial_move;
1390   // For universal compaction, we enforce every manual compaction to compact
1391   // all files.
1392   if (begin == nullptr ||
1393       cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
1394       cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
1395     manual.begin = nullptr;
1396   } else {
1397     begin_storage.SetMinPossibleForUserKey(*begin);
1398     manual.begin = &begin_storage;
1399   }
1400   if (end == nullptr ||
1401       cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
1402       cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
1403     manual.end = nullptr;
1404   } else {
1405     end_storage.SetMaxPossibleForUserKey(*end);
1406     manual.end = &end_storage;
1407   }
1408 
1409   TEST_SYNC_POINT("DBImpl::RunManualCompaction:0");
1410   TEST_SYNC_POINT("DBImpl::RunManualCompaction:1");
1411   InstrumentedMutexLock l(&mutex_);
1412 
1413   // When a manual compaction arrives, temporarily disable scheduling of
1414   // non-manual compactions and wait until the number of scheduled compaction
1415   // jobs drops to zero. This is needed to ensure that this manual compaction
1416   // can compact any range of keys/files.
1417   //
1418   // HasPendingManualCompaction() is true when at least one thread is inside
1419   // RunManualCompaction(), i.e. during that time no other compaction will
1420   // get scheduled (see MaybeScheduleFlushOrCompaction).
1421   //
1422   // Note that the following loop doesn't stop more than one thread calling
1423   // RunManualCompaction() from getting to the second while loop below.
1424   // However, only one of them will actually schedule compaction, while
1425   // others will wait on a condition variable until it completes.
1426 
1427   AddManualCompaction(&manual);
1428   TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_);
1429   if (exclusive) {
1430     while (bg_bottom_compaction_scheduled_ > 0 ||
1431            bg_compaction_scheduled_ > 0) {
1432       TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled");
1433       ROCKS_LOG_INFO(
1434           immutable_db_options_.info_log,
1435           "[%s] Manual compaction waiting for all other scheduled background "
1436           "compactions to finish",
1437           cfd->GetName().c_str());
1438       bg_cv_.Wait();
1439     }
1440   }
1441 
1442   ROCKS_LOG_INFO(immutable_db_options_.info_log,
1443                  "[%s] Manual compaction starting", cfd->GetName().c_str());
1444 
1445   LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
1446                        immutable_db_options_.info_log.get());
1447   // We don't check bg_error_ here, because if we get the error in compaction,
1448   // the compaction will set manual.status to bg_error_ and set manual.done to
1449   // true.
1450   while (!manual.done) {
1451     assert(HasPendingManualCompaction());
1452     manual_conflict = false;
1453     Compaction* compaction = nullptr;
1454     if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) ||
1455         scheduled ||
1456         (((manual.manual_end = &manual.tmp_storage1) != nullptr) &&
1457          ((compaction = manual.cfd->CompactRange(
1458                *manual.cfd->GetLatestMutableCFOptions(), manual.input_level,
1459                manual.output_level, compact_range_options, manual.begin,
1460                manual.end, &manual.manual_end, &manual_conflict,
1461                max_file_num_to_ignore)) == nullptr &&
1462           manual_conflict))) {
1463       // exclusive manual compactions should not see a conflict during
1464       // CompactRange
1465       assert(!exclusive || !manual_conflict);
1466       // Running either this or some other manual compaction
1467       bg_cv_.Wait();
1468       if (scheduled && manual.incomplete == true) {
1469         assert(!manual.in_progress);
1470         scheduled = false;
1471         manual.incomplete = false;
1472       }
1473     } else if (!scheduled) {
1474       if (compaction == nullptr) {
1475         manual.done = true;
1476         bg_cv_.SignalAll();
1477         continue;
1478       }
1479       ca = new CompactionArg;
1480       ca->db = this;
1481       ca->prepicked_compaction = new PrepickedCompaction;
1482       ca->prepicked_compaction->manual_compaction_state = &manual;
1483       ca->prepicked_compaction->compaction = compaction;
1484       if (!RequestCompactionToken(
1485               cfd, true, &ca->prepicked_compaction->task_token, &log_buffer)) {
1486         // Don't throttle manual compaction, only count outstanding tasks.
1487         assert(false);
1488       }
1489       manual.incomplete = false;
1490       bg_compaction_scheduled_++;
1491       env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
1492                      &DBImpl::UnscheduleCompactionCallback);
1493       scheduled = true;
1494     }
1495   }
1496 
1497   log_buffer.FlushBufferToLog();
1498   assert(!manual.in_progress);
1499   assert(HasPendingManualCompaction());
1500   RemoveManualCompaction(&manual);
1501   bg_cv_.SignalAll();
1502   return manual.status;
1503 }
1504 
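// Builds a FlushRequest that pairs each (non-null) column family with the ID
// of its most recent immutable memtable, i.e. the newest memtable the flush
// must cover.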
1505 void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
1506                                   FlushRequest* req) {
1507   assert(req != nullptr);
1508   req->reserve(cfds.size());
1509   for (const auto cfd : cfds) {
1510     if (nullptr == cfd) {
1511       // cfd may be null, see DBImpl::ScheduleFlushes
1512       continue;
1513     }
1514     uint64_t max_memtable_id = cfd->imm()->GetLatestMemTableID();
1515     req->emplace_back(cfd, max_memtable_id);
1516   }
1517 }
1518 
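// Flushes the memtable of a single column family. Unless allow_write_stall is
// set, it first waits until the flush would not push the CF into a write
// stall. Under the mutex it switches the active memtable (and, when
// persist_stats_to_disk is set, possibly the stats CF as well), schedules the
// flush, and finally waits for completion if flush_options.wait is true.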
1519 Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
1520                              const FlushOptions& flush_options,
1521                              FlushReason flush_reason, bool writes_stopped) {
1522   Status s;
1523   uint64_t flush_memtable_id = 0;
1524   if (!flush_options.allow_write_stall) {
1525     bool flush_needed = true;
1526     s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
1527     TEST_SYNC_POINT("DBImpl::FlushMemTable:StallWaitDone");
1528     if (!s.ok() || !flush_needed) {
1529       return s;
1530     }
1531   }
1532   FlushRequest flush_req;
1533   {
1534     WriteContext context;
1535     InstrumentedMutexLock guard_lock(&mutex_);
1536 
1537     WriteThread::Writer w;
1538     WriteThread::Writer nonmem_w;
1539     if (!writes_stopped) {
1540       write_thread_.EnterUnbatched(&w, &mutex_);
1541       if (two_write_queues_) {
1542         nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
1543       }
1544     }
1545     WaitForPendingWrites();
1546 
1547     if (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load()) {
1548       s = SwitchMemtable(cfd, &context);
1549     }
1550     if (s.ok()) {
1551       if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
1552           !cached_recoverable_state_empty_.load()) {
1553         flush_memtable_id = cfd->imm()->GetLatestMemTableID();
1554         flush_req.emplace_back(cfd, flush_memtable_id);
1555       }
1556       if (immutable_db_options_.persist_stats_to_disk) {
1557         ColumnFamilyData* cfd_stats =
1558             versions_->GetColumnFamilySet()->GetColumnFamily(
1559                 kPersistentStatsColumnFamilyName);
1560         if (cfd_stats != nullptr && cfd_stats != cfd &&
1561             !cfd_stats->mem()->IsEmpty()) {
1562           // only force flush stats CF when it will be the only CF lagging
1563           // behind after the current flush
1564           bool stats_cf_flush_needed = true;
1565           for (auto* loop_cfd : *versions_->GetColumnFamilySet()) {
1566             if (loop_cfd == cfd_stats || loop_cfd == cfd) {
1567               continue;
1568             }
1569             if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) {
1570               stats_cf_flush_needed = false;
1571             }
1572           }
1573           if (stats_cf_flush_needed) {
1574             ROCKS_LOG_INFO(immutable_db_options_.info_log,
1575                            "Force flushing stats CF with manual flush of %s "
1576                            "to avoid holding old logs",
1577                            cfd->GetName().c_str());
1578             s = SwitchMemtable(cfd_stats, &context);
1579             flush_memtable_id = cfd_stats->imm()->GetLatestMemTableID();
1580             flush_req.emplace_back(cfd_stats, flush_memtable_id);
1581           }
1582         }
1583       }
1584     }
1585 
1586     if (s.ok() && !flush_req.empty()) {
1587       for (auto& elem : flush_req) {
1588         ColumnFamilyData* loop_cfd = elem.first;
1589         loop_cfd->imm()->FlushRequested();
1590       }
1591       // If the caller wants to wait for this flush to complete, it indicates
1592       // that the caller expects the ColumnFamilyData not to be freed by
1593       // other threads which may drop the column family concurrently.
1594       // Therefore, we increase the cfd's ref count.
1595       if (flush_options.wait) {
1596         for (auto& elem : flush_req) {
1597           ColumnFamilyData* loop_cfd = elem.first;
1598           loop_cfd->Ref();
1599         }
1600       }
1601       SchedulePendingFlush(flush_req, flush_reason);
1602       MaybeScheduleFlushOrCompaction();
1603     }
1604 
1605     if (!writes_stopped) {
1606       write_thread_.ExitUnbatched(&w);
1607       if (two_write_queues_) {
1608         nonmem_write_thread_.ExitUnbatched(&nonmem_w);
1609       }
1610     }
1611   }
1612   TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleFlush");
1613   TEST_SYNC_POINT("DBImpl::FlushMemTable:BeforeWaitForBgFlush");
1614   if (s.ok() && flush_options.wait) {
1615     autovector<ColumnFamilyData*> cfds;
1616     autovector<const uint64_t*> flush_memtable_ids;
1617     for (auto& iter : flush_req) {
1618       cfds.push_back(iter.first);
1619       flush_memtable_ids.push_back(&(iter.second));
1620     }
1621     s = WaitForFlushMemTables(cfds, flush_memtable_ids,
1622                               (flush_reason == FlushReason::kErrorRecovery));
1623     InstrumentedMutexLock lock_guard(&mutex_);
1624     for (auto* tmp_cfd : cfds) {
1625       tmp_cfd->UnrefAndTryDelete();
1626     }
1627   }
1628   TEST_SYNC_POINT("DBImpl::FlushMemTable:FlushMemTableFinished");
1629   return s;
1630 }
1631 
1632 // Flush all elements in 'column_family_datas'
1633 // and atomically record the result to the MANIFEST.
1634 Status DBImpl::AtomicFlushMemTables(
1635     const autovector<ColumnFamilyData*>& column_family_datas,
1636     const FlushOptions& flush_options, FlushReason flush_reason,
1637     bool writes_stopped) {
1638   Status s;
1639   if (!flush_options.allow_write_stall) {
1640     int num_cfs_to_flush = 0;
1641     for (auto cfd : column_family_datas) {
1642       bool flush_needed = true;
1643       s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
1644       if (!s.ok()) {
1645         return s;
1646       } else if (flush_needed) {
1647         ++num_cfs_to_flush;
1648       }
1649     }
1650     if (0 == num_cfs_to_flush) {
1651       return s;
1652     }
1653   }
1654   FlushRequest flush_req;
1655   autovector<ColumnFamilyData*> cfds;
1656   {
1657     WriteContext context;
1658     InstrumentedMutexLock guard_lock(&mutex_);
1659 
1660     WriteThread::Writer w;
1661     WriteThread::Writer nonmem_w;
1662     if (!writes_stopped) {
1663       write_thread_.EnterUnbatched(&w, &mutex_);
1664       if (two_write_queues_) {
1665         nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
1666       }
1667     }
1668     WaitForPendingWrites();
1669 
1670     for (auto cfd : column_family_datas) {
1671       if (cfd->IsDropped()) {
1672         continue;
1673       }
1674       if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
1675           !cached_recoverable_state_empty_.load()) {
1676         cfds.emplace_back(cfd);
1677       }
1678     }
1679     for (auto cfd : cfds) {
1680       if (cfd->mem()->IsEmpty() && cached_recoverable_state_empty_.load()) {
1681         continue;
1682       }
1683       cfd->Ref();
1684       s = SwitchMemtable(cfd, &context);
1685       cfd->UnrefAndTryDelete();
1686       if (!s.ok()) {
1687         break;
1688       }
1689     }
1690     if (s.ok()) {
1691       AssignAtomicFlushSeq(cfds);
1692       for (auto cfd : cfds) {
1693         cfd->imm()->FlushRequested();
1694       }
1695       // If the caller wants to wait for this flush to complete, it indicates
1696       // that the caller expects the ColumnFamilyData not to be freed by
1697       // other threads which may drop the column family concurrently.
1698       // Therefore, we increase the cfd's ref count.
1699       if (flush_options.wait) {
1700         for (auto cfd : cfds) {
1701           cfd->Ref();
1702         }
1703       }
1704       GenerateFlushRequest(cfds, &flush_req);
1705       SchedulePendingFlush(flush_req, flush_reason);
1706       MaybeScheduleFlushOrCompaction();
1707     }
1708 
1709     if (!writes_stopped) {
1710       write_thread_.ExitUnbatched(&w);
1711       if (two_write_queues_) {
1712         nonmem_write_thread_.ExitUnbatched(&nonmem_w);
1713       }
1714     }
1715   }
1716   TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush");
1717   TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush");
1718   if (s.ok() && flush_options.wait) {
1719     autovector<const uint64_t*> flush_memtable_ids;
1720     for (auto& iter : flush_req) {
1721       flush_memtable_ids.push_back(&(iter.second));
1722     }
1723     s = WaitForFlushMemTables(cfds, flush_memtable_ids,
1724                               (flush_reason == FlushReason::kErrorRecovery));
1725     InstrumentedMutexLock lock_guard(&mutex_);
1726     for (auto* cfd : cfds) {
1727       cfd->UnrefAndTryDelete();
1728     }
1729   }
1730   return s;
1731 }
1732 
1733 // Calling FlushMemTable(), whether from DB::Flush() or from Backup Engine, can
1734 // cause write stall, for example if one memtable is being flushed already.
1735 // This method tries to avoid a write stall (similar to CompactRange() behavior):
1736 // it emulates how the SuperVersion / LSM would change if the flush happened, checks
1737 // that against various constraints, and delays the flush if it would cause a stall.
1738 // The caller should check status and flush_needed to see whether the flush already happened.
1739 Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
1740                                                  bool* flush_needed) {
1741   {
1742     *flush_needed = true;
1743     InstrumentedMutexLock l(&mutex_);
1744     uint64_t orig_active_memtable_id = cfd->mem()->GetID();
1745     WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
1746     do {
1747       if (write_stall_condition != WriteStallCondition::kNormal) {
1748         // Same error handling as user writes: Don't wait if there's a
1749         // background error, even if it's a soft error. We might wait here
1750         // indefinitely as the pending flushes/compactions may never finish
1751         // successfully, resulting in the stall condition lasting indefinitely
1752         if (error_handler_.IsBGWorkStopped()) {
1753           return error_handler_.GetBGError();
1754         }
1755 
1756         TEST_SYNC_POINT("DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait");
1757         ROCKS_LOG_INFO(immutable_db_options_.info_log,
1758                        "[%s] WaitUntilFlushWouldNotStallWrites"
1759                        " waiting on stall conditions to clear",
1760                        cfd->GetName().c_str());
1761         bg_cv_.Wait();
1762       }
1763       if (cfd->IsDropped()) {
1764         return Status::ColumnFamilyDropped();
1765       }
1766       if (shutting_down_.load(std::memory_order_acquire)) {
1767         return Status::ShutdownInProgress();
1768       }
1769 
1770       uint64_t earliest_memtable_id =
1771           std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
1772       if (earliest_memtable_id > orig_active_memtable_id) {
1773         // We waited so long that the memtable we were originally waiting on was
1774         // flushed.
1775         *flush_needed = false;
1776         return Status::OK();
1777       }
1778 
1779       const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
1780       const auto* vstorage = cfd->current()->storage_info();
1781 
1782       // Skip stalling check if we're below auto-flush and auto-compaction
1783       // triggers. If it stalled in these conditions, that'd mean the stall
1784       // triggers are so low that stalling is needed for any background work. In
1785       // that case we shouldn't wait since background work won't be scheduled.
1786       if (cfd->imm()->NumNotFlushed() <
1787               cfd->ioptions()->min_write_buffer_number_to_merge &&
1788           vstorage->l0_delay_trigger_count() <
1789               mutable_cf_options.level0_file_num_compaction_trigger) {
1790         break;
1791       }
1792 
1793       // check whether one extra immutable memtable or an extra L0 file would
1794       // cause write stalling mode to be entered. It could still enter stall
1795       // mode due to pending compaction bytes, but that's less common
1796       write_stall_condition =
1797           ColumnFamilyData::GetWriteStallConditionAndCause(
1798               cfd->imm()->NumNotFlushed() + 1,
1799               vstorage->l0_delay_trigger_count() + 1,
1800               vstorage->estimated_compaction_needed_bytes(), mutable_cf_options)
1801               .first;
1802     } while (write_stall_condition != WriteStallCondition::kNormal);
1803   }
1804   return Status::OK();
1805 }
1806 
1807 // Wait for memtables to be flushed for multiple column families.
1808 // let N = cfds.size()
1809 // for i in [0, N),
1810 //  1) if flush_memtable_ids[i] is not null, then the memtables with lower IDs
1811 //     have to be flushed for THIS column family;
1812 //  2) if flush_memtable_ids[i] is null, then all memtables in THIS column
1813 //     family have to be flushed.
1814 // Finish waiting when ALL column families finish flushing memtables.
1815 // resuming_from_bg_err indicates whether the caller is trying to resume from
1816 // background error or in normal processing.
1817 Status DBImpl::WaitForFlushMemTables(
1818     const autovector<ColumnFamilyData*>& cfds,
1819     const autovector<const uint64_t*>& flush_memtable_ids,
1820     bool resuming_from_bg_err) {
1821   int num = static_cast<int>(cfds.size());
1822   // Wait until the flushes complete
1823   InstrumentedMutexLock l(&mutex_);
1824   // If the caller is trying to resume from bg error, then
1825   // error_handler_.IsDBStopped() is true.
1826   while (resuming_from_bg_err || !error_handler_.IsDBStopped()) {
1827     if (shutting_down_.load(std::memory_order_acquire)) {
1828       return Status::ShutdownInProgress();
1829     }
1830     // If an error has occurred during resumption, then no need to wait.
1831     if (!error_handler_.GetRecoveryError().ok()) {
1832       break;
1833     }
1834     // Number of column families that have been dropped.
1835     int num_dropped = 0;
1836     // Number of column families that have finished flush.
1837     int num_finished = 0;
1838     for (int i = 0; i < num; ++i) {
1839       if (cfds[i]->IsDropped()) {
1840         ++num_dropped;
1841       } else if (cfds[i]->imm()->NumNotFlushed() == 0 ||
1842                  (flush_memtable_ids[i] != nullptr &&
1843                   cfds[i]->imm()->GetEarliestMemTableID() >
1844                       *flush_memtable_ids[i])) {
1845         ++num_finished;
1846       }
1847     }
1848     if (1 == num_dropped && 1 == num) {
1849       return Status::InvalidArgument("Cannot flush a dropped CF");
1850     }
1851     // Column families involved in this flush request have either been dropped
1852     // or finished their flush, so it's time to stop waiting.
1853     if (num_dropped + num_finished == num) {
1854       break;
1855     }
1856     bg_cv_.Wait();
1857   }
1858   Status s;
1859   // If not resuming from bg error, and an error has caused the DB to stop,
1860   // then report the bg error to caller.
1861   if (!resuming_from_bg_err && error_handler_.IsDBStopped()) {
1862     s = error_handler_.GetBGError();
1863   }
1864   return s;
1865 }
1866 
1867 Status DBImpl::EnableAutoCompaction(
1868     const std::vector<ColumnFamilyHandle*>& column_family_handles) {
1869   Status s;
1870   for (auto cf_ptr : column_family_handles) {
1871     Status status =
1872         this->SetOptions(cf_ptr, {{"disable_auto_compactions", "false"}});
1873     if (!status.ok()) {
1874       s = status;
1875     }
1876   }
1877 
1878   return s;
1879 }
1880 
1881 void DBImpl::DisableManualCompaction() {
1882   manual_compaction_paused_.store(true, std::memory_order_release);
1883 }
1884 
1885 void DBImpl::EnableManualCompaction() {
1886   manual_compaction_paused_.store(false, std::memory_order_release);
1887 }
1888 
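// Schedules as many pending flushes and compactions as the background job
// limits allow. Flushes normally go to the HIGH-priority pool; if that pool
// has no threads, they are scheduled on the LOW-priority (compaction) pool
// instead. Must be called with mutex_ held.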
1889 void DBImpl::MaybeScheduleFlushOrCompaction() {
1890   mutex_.AssertHeld();
1891   if (!opened_successfully_) {
1892     // Compaction may introduce data race to DB open
1893     return;
1894   }
1895   if (bg_work_paused_ > 0) {
1896     // we paused the background work
1897     return;
1898   } else if (error_handler_.IsBGWorkStopped() &&
1899              !error_handler_.IsRecoveryInProgress()) {
1900     // There has been a hard error and this call is not part of the recovery
1901     // sequence. Bail out here so we don't get into an endless loop of
1902     // scheduling BG work which will again call this function
1903     return;
1904   } else if (shutting_down_.load(std::memory_order_acquire)) {
1905     // DB is being deleted; no more background compactions
1906     return;
1907   }
1908   auto bg_job_limits = GetBGJobLimits();
1909   bool is_flush_pool_empty =
1910       env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
1911   while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
1912          bg_flush_scheduled_ < bg_job_limits.max_flushes) {
1913     bg_flush_scheduled_++;
1914     FlushThreadArg* fta = new FlushThreadArg;
1915     fta->db_ = this;
1916     fta->thread_pri_ = Env::Priority::HIGH;
1917     env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::HIGH, this,
1918                    &DBImpl::UnscheduleFlushCallback);
1919     --unscheduled_flushes_;
1920     TEST_SYNC_POINT_CALLBACK(
1921         "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0",
1922         &unscheduled_flushes_);
1923   }
1924 
1925   // special case -- if high-pri (flush) thread pool is empty, then schedule
1926   // flushes in low-pri (compaction) thread pool.
1927   if (is_flush_pool_empty) {
1928     while (unscheduled_flushes_ > 0 &&
1929            bg_flush_scheduled_ + bg_compaction_scheduled_ <
1930                bg_job_limits.max_flushes) {
1931       bg_flush_scheduled_++;
1932       FlushThreadArg* fta = new FlushThreadArg;
1933       fta->db_ = this;
1934       fta->thread_pri_ = Env::Priority::LOW;
1935       env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::LOW, this,
1936                      &DBImpl::UnscheduleFlushCallback);
1937       --unscheduled_flushes_;
1938     }
1939   }
1940 
1941   if (bg_compaction_paused_ > 0) {
1942     // we paused the background compaction
1943     return;
1944   } else if (error_handler_.IsBGWorkStopped()) {
1945     // Compaction is not part of the recovery sequence from a hard error. We
1946     // might get here because recovery might do a flush and install a new
1947     // super version, which will try to schedule pending compactions. Bail
1948     // out here and let the higher level recovery handle compactions
1949     return;
1950   }
1951 
1952   if (HasExclusiveManualCompaction()) {
1953     // only manual compactions are allowed to run. don't schedule automatic
1954     // compactions
1955     TEST_SYNC_POINT("DBImpl::MaybeScheduleFlushOrCompaction:Conflict");
1956     return;
1957   }
1958 
1959   while (bg_compaction_scheduled_ < bg_job_limits.max_compactions &&
1960          unscheduled_compactions_ > 0) {
1961     CompactionArg* ca = new CompactionArg;
1962     ca->db = this;
1963     ca->prepicked_compaction = nullptr;
1964     bg_compaction_scheduled_++;
1965     unscheduled_compactions_--;
1966     env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
1967                    &DBImpl::UnscheduleCompactionCallback);
1968   }
1969 }
1970 
1971 DBImpl::BGJobLimits DBImpl::GetBGJobLimits() const {
1972   mutex_.AssertHeld();
1973   return GetBGJobLimits(immutable_db_options_.max_background_flushes,
1974                         mutable_db_options_.max_background_compactions,
1975                         mutable_db_options_.max_background_jobs,
1976                         write_controller_.NeedSpeedupCompaction());
1977 }
1978 
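// Computes the flush/compaction limits from the configured options. For
// example, with max_background_jobs == 8 and both max_background_flushes and
// max_background_compactions set to -1, the logic below yields
// max_flushes == 2 and max_compactions == 6 (capped to 1 compaction when
// parallelize_compactions is false).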
1979 DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes,
1980                                            int max_background_compactions,
1981                                            int max_background_jobs,
1982                                            bool parallelize_compactions) {
1983   BGJobLimits res;
1984   if (max_background_flushes == -1 && max_background_compactions == -1) {
1985     // for our first stab implementing max_background_jobs, simply allocate a
1986     // quarter of the threads to flushes.
1987     res.max_flushes = std::max(1, max_background_jobs / 4);
1988     res.max_compactions = std::max(1, max_background_jobs - res.max_flushes);
1989   } else {
1990     // compatibility code in case users haven't migrated to max_background_jobs,
1991     // which automatically computes flush/compaction limits
1992     res.max_flushes = std::max(1, max_background_flushes);
1993     res.max_compactions = std::max(1, max_background_compactions);
1994   }
1995   if (!parallelize_compactions) {
1996     // throttle background compactions until we deem it necessary
1997     res.max_compactions = 1;
1998   }
1999   return res;
2000 }
2001 
2002 void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) {
2003   assert(!cfd->queued_for_compaction());
2004   cfd->Ref();
2005   compaction_queue_.push_back(cfd);
2006   cfd->set_queued_for_compaction(true);
2007 }
2008 
2009 ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() {
2010   assert(!compaction_queue_.empty());
2011   auto cfd = *compaction_queue_.begin();
2012   compaction_queue_.pop_front();
2013   assert(cfd->queued_for_compaction());
2014   cfd->set_queued_for_compaction(false);
2015   return cfd;
2016 }
2017 
2018 DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() {
2019   assert(!flush_queue_.empty());
2020   FlushRequest flush_req = flush_queue_.front();
2021   flush_queue_.pop_front();
2022   // TODO: need to unset flush reason?
2023   return flush_req;
2024 }
2025 
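// Pops column families off the compaction queue until one of them can acquire
// a token from its compaction thread limiter. Candidates that were throttled
// are pushed back to the front of the queue in their original order. Returns
// nullptr if every queued candidate is currently throttled.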
2026 ColumnFamilyData* DBImpl::PickCompactionFromQueue(
2027     std::unique_ptr<TaskLimiterToken>* token, LogBuffer* log_buffer) {
2028   assert(!compaction_queue_.empty());
2029   assert(*token == nullptr);
2030   autovector<ColumnFamilyData*> throttled_candidates;
2031   ColumnFamilyData* cfd = nullptr;
2032   while (!compaction_queue_.empty()) {
2033     auto first_cfd = *compaction_queue_.begin();
2034     compaction_queue_.pop_front();
2035     assert(first_cfd->queued_for_compaction());
2036     if (!RequestCompactionToken(first_cfd, false, token, log_buffer)) {
2037       throttled_candidates.push_back(first_cfd);
2038       continue;
2039     }
2040     cfd = first_cfd;
2041     cfd->set_queued_for_compaction(false);
2042     break;
2043   }
2044   // Add throttled compaction candidates back to queue in the original order.
2045   for (auto iter = throttled_candidates.rbegin();
2046        iter != throttled_candidates.rend(); ++iter) {
2047     compaction_queue_.push_front(*iter);
2048   }
2049   return cfd;
2050 }
2051 
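// Enqueues a flush request: takes a reference on every column family in the
// request, records the flush reason, and appends the request to flush_queue_
// as a single unit. The caller is still responsible for calling
// MaybeScheduleFlushOrCompaction() to actually schedule background work.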
2052 void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req,
2053                                   FlushReason flush_reason) {
2054   if (flush_req.empty()) {
2055     return;
2056   }
2057   for (auto& iter : flush_req) {
2058     ColumnFamilyData* cfd = iter.first;
2059     cfd->Ref();
2060     cfd->SetFlushReason(flush_reason);
2061   }
2062   ++unscheduled_flushes_;
2063   flush_queue_.push_back(flush_req);
2064 }
2065 
2066 void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
2067   if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) {
2068     AddToCompactionQueue(cfd);
2069     ++unscheduled_compactions_;
2070   }
2071 }
2072 
2073 void DBImpl::SchedulePendingPurge(std::string fname, std::string dir_to_sync,
2074                                   FileType type, uint64_t number, int job_id) {
2075   mutex_.AssertHeld();
2076   PurgeFileInfo file_info(fname, dir_to_sync, type, number, job_id);
2077   purge_files_.insert({{number, std::move(file_info)}});
2078 }
2079 
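// The BGWork* functions below are the raw entry points handed to
// Env::Schedule(). Each one copies its heap-allocated argument, deletes it,
// tags the IO stats with the thread pool it runs in, and then dispatches into
// the corresponding BackgroundCall* member function.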
2080 void DBImpl::BGWorkFlush(void* arg) {
2081   FlushThreadArg fta = *(reinterpret_cast<FlushThreadArg*>(arg));
2082   delete reinterpret_cast<FlushThreadArg*>(arg);
2083 
2084   IOSTATS_SET_THREAD_POOL_ID(fta.thread_pri_);
2085   TEST_SYNC_POINT("DBImpl::BGWorkFlush");
2086   static_cast_with_check<DBImpl, DB>(fta.db_)->BackgroundCallFlush(
2087       fta.thread_pri_);
2088   TEST_SYNC_POINT("DBImpl::BGWorkFlush:done");
2089 }
2090 
2091 void DBImpl::BGWorkCompaction(void* arg) {
2092   CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
2093   delete reinterpret_cast<CompactionArg*>(arg);
2094   IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
2095   TEST_SYNC_POINT("DBImpl::BGWorkCompaction");
2096   auto prepicked_compaction =
2097       static_cast<PrepickedCompaction*>(ca.prepicked_compaction);
2098   static_cast_with_check<DBImpl, DB>(ca.db)->BackgroundCallCompaction(
2099       prepicked_compaction, Env::Priority::LOW);
2100   delete prepicked_compaction;
2101 }
2102 
2103 void DBImpl::BGWorkBottomCompaction(void* arg) {
2104   CompactionArg ca = *(static_cast<CompactionArg*>(arg));
2105   delete static_cast<CompactionArg*>(arg);
2106   IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM);
2107   TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction");
2108   auto* prepicked_compaction = ca.prepicked_compaction;
2109   assert(prepicked_compaction && prepicked_compaction->compaction &&
2110          !prepicked_compaction->manual_compaction_state);
2111   ca.db->BackgroundCallCompaction(prepicked_compaction, Env::Priority::BOTTOM);
2112   delete prepicked_compaction;
2113 }
2114 
2115 void DBImpl::BGWorkPurge(void* db) {
2116   IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
2117   TEST_SYNC_POINT("DBImpl::BGWorkPurge:start");
2118   reinterpret_cast<DBImpl*>(db)->BackgroundCallPurge();
2119   TEST_SYNC_POINT("DBImpl::BGWorkPurge:end");
2120 }
2121 
2122 void DBImpl::UnscheduleCompactionCallback(void* arg) {
2123   CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
2124   delete reinterpret_cast<CompactionArg*>(arg);
2125   if (ca.prepicked_compaction != nullptr) {
2126     if (ca.prepicked_compaction->compaction != nullptr) {
2127       delete ca.prepicked_compaction->compaction;
2128     }
2129     delete ca.prepicked_compaction;
2130   }
2131   TEST_SYNC_POINT("DBImpl::UnscheduleCompactionCallback");
2132 }
2133 
2134 void DBImpl::UnscheduleFlushCallback(void* arg) {
2135   delete reinterpret_cast<FlushThreadArg*>(arg);
2136   TEST_SYNC_POINT("DBImpl::UnscheduleFlushCallback");
2137 }
2138 
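// Picks work from flush_queue_: requests are popped until one contains at
// least one column family that still has an immutable memtable pending flush;
// column families that no longer need flushing are unreferenced. The selected
// ones are flushed together via FlushMemTablesToOutputFiles().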
2139 Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
2140                                LogBuffer* log_buffer, FlushReason* reason,
2141                                Env::Priority thread_pri) {
2142   mutex_.AssertHeld();
2143 
2144   Status status;
2145   *reason = FlushReason::kOthers;
2146   // If BG work is stopped due to an error, but a recovery is in progress,
2147   // that means this flush is part of the recovery. So allow it to go through
2148   if (!error_handler_.IsBGWorkStopped()) {
2149     if (shutting_down_.load(std::memory_order_acquire)) {
2150       status = Status::ShutdownInProgress();
2151     }
2152   } else if (!error_handler_.IsRecoveryInProgress()) {
2153     status = error_handler_.GetBGError();
2154   }
2155 
2156   if (!status.ok()) {
2157     return status;
2158   }
2159 
2160   autovector<BGFlushArg> bg_flush_args;
2161   std::vector<SuperVersionContext>& superversion_contexts =
2162       job_context->superversion_contexts;
2163   autovector<ColumnFamilyData*> column_families_not_to_flush;
2164   while (!flush_queue_.empty()) {
2165     // This cfd is already referenced
2166     const FlushRequest& flush_req = PopFirstFromFlushQueue();
2167     superversion_contexts.clear();
2168     superversion_contexts.reserve(flush_req.size());
2169 
2170     for (const auto& iter : flush_req) {
2171       ColumnFamilyData* cfd = iter.first;
2172       if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) {
2173         // can't flush this CF, try next one
2174         column_families_not_to_flush.push_back(cfd);
2175         continue;
2176       }
2177       superversion_contexts.emplace_back(SuperVersionContext(true));
2178       bg_flush_args.emplace_back(cfd, iter.second,
2179                                  &(superversion_contexts.back()));
2180     }
2181     if (!bg_flush_args.empty()) {
2182       break;
2183     }
2184   }
2185 
2186   if (!bg_flush_args.empty()) {
2187     auto bg_job_limits = GetBGJobLimits();
2188     for (const auto& arg : bg_flush_args) {
2189       ColumnFamilyData* cfd = arg.cfd_;
2190       ROCKS_LOG_BUFFER(
2191           log_buffer,
2192           "Calling FlushMemTableToOutputFile with column "
2193           "family [%s], flush slots available %d, compaction slots available "
2194           "%d, "
2195           "flush slots scheduled %d, compaction slots scheduled %d",
2196           cfd->GetName().c_str(), bg_job_limits.max_flushes,
2197           bg_job_limits.max_compactions, bg_flush_scheduled_,
2198           bg_compaction_scheduled_);
2199     }
2200     status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
2201                                          job_context, log_buffer, thread_pri);
2202     TEST_SYNC_POINT("DBImpl::BackgroundFlush:BeforeFlush");
2203     // All the CFDs in the FlushReq must have the same flush reason, so just
2204     // grab the first one
2205     *reason = bg_flush_args[0].cfd_->GetFlushReason();
2206     for (auto& arg : bg_flush_args) {
2207       ColumnFamilyData* cfd = arg.cfd_;
2208       if (cfd->UnrefAndTryDelete()) {
2209         arg.cfd_ = nullptr;
2210       }
2211     }
2212   }
2213   for (auto cfd : column_families_not_to_flush) {
2214     cfd->UnrefAndTryDelete();
2215   }
2216   return status;
2217 }
2218 
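// Top-level body of a background flush job. It runs BackgroundFlush() under
// the mutex, backs off for a second after unexpected errors, cleans up
// obsolete files, and finally signals waiters, which may include the DB
// destructor; nothing may run after the final SignalAll().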
2219 void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
2220   bool made_progress = false;
2221   JobContext job_context(next_job_id_.fetch_add(1), true);
2222 
2223   TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:start");
2224 
2225   LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
2226                        immutable_db_options_.info_log.get());
2227   {
2228     InstrumentedMutexLock l(&mutex_);
2229     assert(bg_flush_scheduled_);
2230     num_running_flushes_++;
2231 
2232     std::unique_ptr<std::list<uint64_t>::iterator>
2233         pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
2234             CaptureCurrentFileNumberInPendingOutputs()));
2235     FlushReason reason;
2236 
2237     Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer,
2238                                &reason, thread_pri);
2239     if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped() &&
2240         reason != FlushReason::kErrorRecovery) {
2241       // Wait a little bit before retrying background flush in
2242       // case this is an environmental problem and we do not want to
2243       // chew up resources for failed flushes for the duration of
2244       // the problem.
2245       uint64_t error_cnt =
2246           default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
2247       bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
2248       mutex_.Unlock();
2249       ROCKS_LOG_ERROR(immutable_db_options_.info_log,
2250                       "Waiting after background flush error: %s"
2251                       "Accumulated background error counts: %" PRIu64,
2252                       s.ToString().c_str(), error_cnt);
2253       log_buffer.FlushBufferToLog();
2254       LogFlush(immutable_db_options_.info_log);
2255       env_->SleepForMicroseconds(1000000);
2256       mutex_.Lock();
2257     }
2258 
2259     TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FlushFinish:0");
2260     ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
2261 
2262     // If flush failed, we want to delete all temporary files that we might have
2263     // created. Thus, we force full scan in FindObsoleteFiles()
2264     FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
2265                                         !s.IsColumnFamilyDropped());
2266     // delete unnecessary files if any, this is done outside the mutex
2267     if (job_context.HaveSomethingToClean() ||
2268         job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
2269       mutex_.Unlock();
2270       TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FilesFound");
2271       // Have to flush the info logs before bg_flush_scheduled_--
2272       // because if bg_flush_scheduled_ becomes 0 and the lock is
2273       // released, the destructor of DB can kick in and destroy all the
2274       // state of DB so info_log might not be available after that point.
2275       // The same applies to accessing any other state that DB owns.
2276       log_buffer.FlushBufferToLog();
2277       if (job_context.HaveSomethingToDelete()) {
2278         PurgeObsoleteFiles(job_context);
2279       }
2280       job_context.Clean();
2281       mutex_.Lock();
2282     }
2283     TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:ContextCleanedUp");
2284 
2285     assert(num_running_flushes_ > 0);
2286     num_running_flushes_--;
2287     bg_flush_scheduled_--;
2288     // See if there's more work to be done
2289     MaybeScheduleFlushOrCompaction();
2290     atomic_flush_install_cv_.SignalAll();
2291     bg_cv_.SignalAll();
2292     // IMPORTANT: there should be no code after calling SignalAll. This call may
2293     // signal the DB destructor that it's OK to proceed with destruction. In
2294     // that case, all DB variables will be deallocated and referencing them
2295     // will cause trouble.
2296   }
2297 }
2298 
2299 void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
2300                                       Env::Priority bg_thread_pri) {
2301   bool made_progress = false;
2302   JobContext job_context(next_job_id_.fetch_add(1), true);
2303   TEST_SYNC_POINT("BackgroundCallCompaction:0");
2304   LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
2305                        immutable_db_options_.info_log.get());
2306   {
2307     InstrumentedMutexLock l(&mutex_);
2308 
2309     // This call will unlock/lock the mutex to wait for current running
2310     // IngestExternalFile() calls to finish.
2311     WaitForIngestFile();
2312 
2313     num_running_compactions_++;
2314 
2315     std::unique_ptr<std::list<uint64_t>::iterator>
2316         pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
2317             CaptureCurrentFileNumberInPendingOutputs()));
2318 
2319     assert((bg_thread_pri == Env::Priority::BOTTOM &&
2320             bg_bottom_compaction_scheduled_) ||
2321            (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
2322     Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
2323                                     prepicked_compaction, bg_thread_pri);
2324     TEST_SYNC_POINT("BackgroundCallCompaction:1");
2325     if (s.IsBusy()) {
2326       bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
2327       mutex_.Unlock();
2328       env_->SleepForMicroseconds(10000);  // prevent hot loop
2329       mutex_.Lock();
2330     } else if (!s.ok() && !s.IsShutdownInProgress() &&
2331                !s.IsManualCompactionPaused() && !s.IsColumnFamilyDropped()) {
2332       // Wait a little bit before retrying background compaction in
2333       // case this is an environmental problem and we do not want to
2334       // chew up resources for failed compactions for the duration of
2335       // the problem.
2336       uint64_t error_cnt =
2337           default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
2338       bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
2339       mutex_.Unlock();
2340       log_buffer.FlushBufferToLog();
2341       ROCKS_LOG_ERROR(immutable_db_options_.info_log,
2342                       "Waiting after background compaction error: %s, "
2343                       "Accumulated background error counts: %" PRIu64,
2344                       s.ToString().c_str(), error_cnt);
2345       LogFlush(immutable_db_options_.info_log);
2346       env_->SleepForMicroseconds(1000000);
2347       mutex_.Lock();
2348     } else if (s.IsManualCompactionPaused()) {
2349       ManualCompactionState* m = prepicked_compaction->manual_compaction_state;
2350       assert(m);
2351       ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused",
2352                        m->cfd->GetName().c_str(), job_context.job_id);
2353     }
2354 
2355     ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
2356 
2357     // If compaction failed, we want to delete all temporary files that we might
2358     // have created (they might not be all recorded in job_context in case of a
2359     // failure). Thus, we force full scan in FindObsoleteFiles()
2360     FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
2361                                         !s.IsManualCompactionPaused() &&
2362                                         !s.IsColumnFamilyDropped());
2363     TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles");
2364 
2365     // delete unnecessary files if any, this is done outside the mutex
2366     if (job_context.HaveSomethingToClean() ||
2367         job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
2368       mutex_.Unlock();
2369       // Have to flush the info logs before bg_compaction_scheduled_--
2370       // because if bg_compaction_scheduled_ becomes 0 and the lock is
2371       // released, the destructor of DB can kick in and destroy all the
2372       // state of DB so info_log might not be available after that point.
2373       // The same applies to accessing any other state that DB owns.
2374       log_buffer.FlushBufferToLog();
2375       if (job_context.HaveSomethingToDelete()) {
2376         PurgeObsoleteFiles(job_context);
2377         TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles");
2378       }
2379       job_context.Clean();
2380       mutex_.Lock();
2381     }
2382 
2383     assert(num_running_compactions_ > 0);
2384     num_running_compactions_--;
2385     if (bg_thread_pri == Env::Priority::LOW) {
2386       bg_compaction_scheduled_--;
2387     } else {
2388       assert(bg_thread_pri == Env::Priority::BOTTOM);
2389       bg_bottom_compaction_scheduled_--;
2390     }
2391 
2392     versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
2393 
2394     // See if there's more work to be done
2395     MaybeScheduleFlushOrCompaction();
2396     if (made_progress ||
2397         (bg_compaction_scheduled_ == 0 &&
2398          bg_bottom_compaction_scheduled_ == 0) ||
2399         HasPendingManualCompaction() || unscheduled_compactions_ == 0) {
2400       // signal if
2401       // * made_progress -- need to wakeup DelayWrite
2402       // * bg_{bottom,}_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
2403       // * HasPendingManualCompaction -- need to wakeup RunManualCompaction
2404       // If none of this is true, there is no need to signal since nobody is
2405       // waiting for it
2406       bg_cv_.SignalAll();
2407     }
2408     // IMPORTANT: there should be no code after calling SignalAll. This call may
2409     // signal the DB destructor that it's OK to proceed with destruction. In
2410     // that case, all DB variables will be deallocated and referencing them
2411     // will cause trouble.
2412   }
2413 }
2414 
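// Executes (or picks and then executes) one compaction. There are three
// sources of work: a prepicked manual compaction, a compaction handed over
// prepicked from another thread, or a compaction picked here from the
// compaction queue. FIFO deletion compactions and trivial moves only apply a
// version edit to the MANIFEST instead of running a full compaction job.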
2415 Status DBImpl::BackgroundCompaction(bool* made_progress,
2416                                     JobContext* job_context,
2417                                     LogBuffer* log_buffer,
2418                                     PrepickedCompaction* prepicked_compaction,
2419                                     Env::Priority thread_pri) {
2420   ManualCompactionState* manual_compaction =
2421       prepicked_compaction == nullptr
2422           ? nullptr
2423           : prepicked_compaction->manual_compaction_state;
2424   *made_progress = false;
2425   mutex_.AssertHeld();
2426   TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start");
2427 
2428   bool is_manual = (manual_compaction != nullptr);
2429   std::unique_ptr<Compaction> c;
2430   if (prepicked_compaction != nullptr &&
2431       prepicked_compaction->compaction != nullptr) {
2432     c.reset(prepicked_compaction->compaction);
2433   }
2434   bool is_prepicked = is_manual || c;
2435 
2436   // (manual_compaction->in_progress == false);
2437   bool trivial_move_disallowed =
2438       is_manual && manual_compaction->disallow_trivial_move;
2439 
2440   CompactionJobStats compaction_job_stats;
2441   Status status;
2442   if (!error_handler_.IsBGWorkStopped()) {
2443     if (shutting_down_.load(std::memory_order_acquire)) {
2444       status = Status::ShutdownInProgress();
2445     } else if (is_manual &&
2446                manual_compaction_paused_.load(std::memory_order_acquire)) {
2447       status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
2448     }
2449   } else {
2450     status = error_handler_.GetBGError();
2451     // If we get here, it means a hard error happened after this compaction
2452     // was scheduled by MaybeScheduleFlushOrCompaction(), but before it got
2453     // a chance to execute. Since we didn't pop a cfd from the compaction
2454     // queue, increment unscheduled_compactions_
2455     unscheduled_compactions_++;
2456   }
2457 
2458   if (!status.ok()) {
2459     if (is_manual) {
2460       manual_compaction->status = status;
2461       manual_compaction->done = true;
2462       manual_compaction->in_progress = false;
2463       manual_compaction = nullptr;
2464     }
2465     if (c) {
2466       c->ReleaseCompactionFiles(status);
2467       c.reset();
2468     }
2469     return status;
2470   }
2471 
2472   if (is_manual) {
2473     // another thread cannot pick up the same work
2474     manual_compaction->in_progress = true;
2475   }
2476 
2477   std::unique_ptr<TaskLimiterToken> task_token;
2478 
2479   // InternalKey manual_end_storage;
2480   // InternalKey* manual_end = &manual_end_storage;
2481   bool sfm_reserved_compact_space = false;
2482   if (is_manual) {
2483     ManualCompactionState* m = manual_compaction;
2484     assert(m->in_progress);
2485     if (!c) {
2486       m->done = true;
2487       m->manual_end = nullptr;
2488       ROCKS_LOG_BUFFER(log_buffer,
2489                        "[%s] Manual compaction from level-%d from %s .. "
2490                        "%s; nothing to do\n",
2491                        m->cfd->GetName().c_str(), m->input_level,
2492                        (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
2493                        (m->end ? m->end->DebugString().c_str() : "(end)"));
2494     } else {
2495       // First check if we have enough room to do the compaction
2496       bool enough_room = EnoughRoomForCompaction(
2497           m->cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
2498 
2499       if (!enough_room) {
2500         // Then don't do the compaction
2501         c->ReleaseCompactionFiles(status);
2502         c.reset();
2503         // m's vars will get set properly at the end of this function,
2504         // as long as status == CompactionTooLarge
2505         status = Status::CompactionTooLarge();
2506       } else {
2507         ROCKS_LOG_BUFFER(
2508             log_buffer,
2509             "[%s] Manual compaction from level-%d to level-%d from %s .. "
2510             "%s; will stop at %s\n",
2511             m->cfd->GetName().c_str(), m->input_level, c->output_level(),
2512             (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
2513             (m->end ? m->end->DebugString().c_str() : "(end)"),
2514             ((m->done || m->manual_end == nullptr)
2515                  ? "(end)"
2516                  : m->manual_end->DebugString().c_str()));
2517       }
2518     }
2519   } else if (!is_prepicked && !compaction_queue_.empty()) {
2520     if (HasExclusiveManualCompaction()) {
2521       // Can't compact right now, but try again later
2522       TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict");
2523 
2524       // Stay in the compaction queue.
2525       unscheduled_compactions_++;
2526 
2527       return Status::OK();
2528     }
2529 
2530     auto cfd = PickCompactionFromQueue(&task_token, log_buffer);
2531     if (cfd == nullptr) {
2532       // Can't find any executable task from the compaction queue.
2533       // All tasks have been throttled by compaction thread limiter.
2534       ++unscheduled_compactions_;
2535       return Status::Busy();
2536     }
2537 
2538     // We unreference here because the following code will take a Ref() on
2539     // this cfd if it is going to use it (Compaction class holds a
2540     // reference).
2541     // This will all happen under a mutex so we don't have to be afraid of
2542     // somebody else deleting it.
2543     if (cfd->UnrefAndTryDelete()) {
2544       // This was the last reference of the column family, so no need to
2545       // compact.
2546       return Status::OK();
2547     }
2548 
2549     // Pick up latest mutable CF Options and use it throughout the
2550     // compaction job
2551     // Compaction makes a copy of the latest MutableCFOptions. It should be used
2552     // throughout the compaction procedure to ensure consistency. It will
2553     // eventually be installed into SuperVersion
2554     auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
2555     if (!mutable_cf_options->disable_auto_compactions && !cfd->IsDropped()) {
2556       // NOTE: try to avoid unnecessary copy of MutableCFOptions if
2557       // compaction is not necessary. Need to make sure mutex is held
2558       // until we make a copy in the following code
2559       TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction");
2560       c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
2561       TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction");
2562 
2563       if (c != nullptr) {
2564         bool enough_room = EnoughRoomForCompaction(
2565             cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
2566 
2567         if (!enough_room) {
2568           // Then don't do the compaction
2569           c->ReleaseCompactionFiles(status);
          c->column_family_data()
              ->current()
              ->storage_info()
              ->ComputeCompactionScore(*(c->immutable_cf_options()),
                                       *(c->mutable_cf_options()));
          AddToCompactionQueue(cfd);
          ++unscheduled_compactions_;

          c.reset();
          // Don't need to sleep here, because BackgroundCallCompaction
          // will sleep if !s.ok()
          status = Status::CompactionTooLarge();
        } else {
          // update statistics
          RecordInHistogram(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
                            c->inputs(0)->size());
          // There are three things that can change the compaction score:
          // 1) When a flush or compaction finishes. This case is covered by
          // InstallSuperVersionAndScheduleWork.
          // 2) When MutableCFOptions changes. This case is also covered by
          // InstallSuperVersionAndScheduleWork, because this is when the new
          // options take effect.
          // 3) When we pick a new compaction, we "remove" the files being
          // compacted from the calculation, which then influences the
          // compaction score. Here we check whether we need a new compaction
          // even without the files that are currently being compacted. If we
          // need another compaction, we might be able to execute it in
          // parallel, so we add it to the queue and schedule a new thread.
          if (cfd->NeedsCompaction()) {
            // Yes, we need more compactions!
            AddToCompactionQueue(cfd);
            ++unscheduled_compactions_;
            MaybeScheduleFlushOrCompaction();
          }
        }
      }
    }
  }

  if (!c) {
    // Nothing to do
    ROCKS_LOG_BUFFER(log_buffer, "Compaction nothing to do");
  } else if (c->deletion_compaction()) {
    // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete an
    // old file if there is a live snapshot pointing to it
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
                             c->column_family_data());
    assert(c->num_input_files(1) == 0);
    assert(c->level() == 0);
    assert(c->column_family_data()->ioptions()->compaction_style ==
           kCompactionStyleFIFO);

    compaction_job_stats.num_input_files = c->num_input_files(0);

    NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
                            compaction_job_stats, job_context->job_id);

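    // A FIFO deletion compaction has no output files: record a deletion for
    // every input file in the version edit and apply it to the MANIFEST.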
    for (const auto& f : *c->inputs(0)) {
      c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
    }
    status = versions_->LogAndApply(c->column_family_data(),
                                    *c->mutable_cf_options(), c->edit(),
                                    &mutex_, directories_.GetDbDir());
    InstallSuperVersionAndScheduleWork(c->column_family_data(),
                                       &job_context->superversion_contexts[0],
                                       *c->mutable_cf_options());
    ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n",
                     c->column_family_data()->GetName().c_str(),
                     c->num_input_files(0));
    *made_progress = true;
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
                             c->column_family_data());
  } else if (!trivial_move_disallowed && c->IsTrivialMove()) {
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:TrivialMove");
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
                             c->column_family_data());
    // Instrument for event update
    // TODO(yhchiang): add op details for showing trivial-move.
    ThreadStatusUtil::SetColumnFamily(
        c->column_family_data(), c->column_family_data()->ioptions()->env,
        immutable_db_options_.enable_thread_tracking);
    ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);

    compaction_job_stats.num_input_files = c->num_input_files(0);

    NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
                            compaction_job_stats, job_context->job_id);

    // Move files to next level
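    // A trivial move rewrites no data: each input file is deleted from its
    // source level and re-added at the output level in the same VersionEdit,
    // so only the MANIFEST is updated.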
    int32_t moved_files = 0;
    int64_t moved_bytes = 0;
    for (unsigned int l = 0; l < c->num_input_levels(); l++) {
      if (c->level(l) == c->output_level()) {
        continue;
      }
      for (size_t i = 0; i < c->num_input_files(l); i++) {
        FileMetaData* f = c->input(l, i);
        c->edit()->DeleteFile(c->level(l), f->fd.GetNumber());
        c->edit()->AddFile(c->output_level(), f->fd.GetNumber(),
                           f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
                           f->largest, f->fd.smallest_seqno,
                           f->fd.largest_seqno, f->marked_for_compaction,
                           f->oldest_blob_file_number, f->oldest_ancester_time,
                           f->file_creation_time, f->file_checksum,
                           f->file_checksum_func_name);

        ROCKS_LOG_BUFFER(
            log_buffer,
            "[%s] Moving #%" PRIu64 " to level-%d %" PRIu64 " bytes\n",
            c->column_family_data()->GetName().c_str(), f->fd.GetNumber(),
            c->output_level(), f->fd.GetFileSize());
        ++moved_files;
        moved_bytes += f->fd.GetFileSize();
      }
    }

    status = versions_->LogAndApply(c->column_family_data(),
                                    *c->mutable_cf_options(), c->edit(),
                                    &mutex_, directories_.GetDbDir());
    // Use latest MutableCFOptions
    InstallSuperVersionAndScheduleWork(c->column_family_data(),
                                       &job_context->superversion_contexts[0],
                                       *c->mutable_cf_options());

    VersionStorageInfo::LevelSummaryStorage tmp;
    c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(),
                                                             moved_bytes);
    {
      event_logger_.LogToBuffer(log_buffer)
          << "job" << job_context->job_id << "event"
          << "trivial_move"
          << "destination_level" << c->output_level() << "files" << moved_files
          << "total_files_size" << moved_bytes;
    }
    ROCKS_LOG_BUFFER(
        log_buffer,
        "[%s] Moved #%d files to level-%d %" PRIu64 " bytes %s: %s\n",
        c->column_family_data()->GetName().c_str(), moved_files,
        c->output_level(), moved_bytes, status.ToString().c_str(),
        c->column_family_data()->current()->storage_info()->LevelSummary(&tmp));
    *made_progress = true;

    // Clear instrumentation
    ThreadStatusUtil::ResetThreadStatus();
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
                             c->column_family_data());
  } else if (!is_prepicked && c->output_level() > 0 &&
             c->output_level() ==
                 c->column_family_data()
                     ->current()
                     ->storage_info()
                     ->MaxOutputLevel(
                         immutable_db_options_.allow_ingest_behind) &&
             env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
    // Forward compactions involving the last level to the bottom pool if it
    // exists, so that compactions unlikely to contribute to write stalls can
    // be delayed or deprioritized.
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool");
    CompactionArg* ca = new CompactionArg;
    ca->db = this;
    ca->prepicked_compaction = new PrepickedCompaction;
    ca->prepicked_compaction->compaction = c.release();
    ca->prepicked_compaction->manual_compaction_state = nullptr;
    // Transfer the requested token so the bottom-priority job doesn't need to
    // request it again.
    ca->prepicked_compaction->task_token = std::move(task_token);
    ++bg_bottom_compaction_scheduled_;
    env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM,
                   this, &DBImpl::UnscheduleCompactionCallback);
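    // The released compaction will re-enter BackgroundCompaction() from the
    // bottom-priority thread pool as a prepicked compaction (via
    // BGWorkBottomCompaction).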
  } else {
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
                             c->column_family_data());
    int output_level __attribute__((__unused__));
    output_level = c->output_level();
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
                             &output_level);
    std::vector<SequenceNumber> snapshot_seqs;
    SequenceNumber earliest_write_conflict_snapshot;
    SnapshotChecker* snapshot_checker;
    GetSnapshotContext(job_context, &snapshot_seqs,
                       &earliest_write_conflict_snapshot, &snapshot_checker);
    assert(is_snapshot_supported_ || snapshots_.empty());
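    // Build the compaction job. Manual compactions pass
    // manual_compaction_paused_ so an in-flight job can observe a pause
    // request; automatic compactions pass nullptr.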
    CompactionJob compaction_job(
        job_context->job_id, c.get(), immutable_db_options_,
        file_options_for_compaction_, versions_.get(), &shutting_down_,
        preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
        GetDataDir(c->column_family_data(), c->output_path_id()), stats_,
        &mutex_, &error_handler_, snapshot_seqs,
        earliest_write_conflict_snapshot, snapshot_checker, table_cache_,
        &event_logger_, c->mutable_cf_options()->paranoid_file_checks,
        c->mutable_cf_options()->report_bg_io_stats, dbname_,
        &compaction_job_stats, thread_pri,
        is_manual ? &manual_compaction_paused_ : nullptr);
    compaction_job.Prepare();

    NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
                            compaction_job_stats, job_context->job_id);

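    // Run the compaction without holding the DB mutex; it is re-acquired
    // before the results are installed below.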
    mutex_.Unlock();
    TEST_SYNC_POINT_CALLBACK(
        "DBImpl::BackgroundCompaction:NonTrivial:BeforeRun", nullptr);
    compaction_job.Run();
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
    mutex_.Lock();

    status = compaction_job.Install(*c->mutable_cf_options());
    if (status.ok()) {
      InstallSuperVersionAndScheduleWork(c->column_family_data(),
                                         &job_context->superversion_contexts[0],
                                         *c->mutable_cf_options());
    }
    *made_progress = true;
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
                             c->column_family_data());
  }
  if (c != nullptr) {
    c->ReleaseCompactionFiles(status);
    *made_progress = true;

#ifndef ROCKSDB_LITE
    // Need to make sure SstFileManager does its bookkeeping
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    if (sfm && sfm_reserved_compact_space) {
      sfm->OnCompactionCompletion(c.get());
    }
#endif  // ROCKSDB_LITE

    NotifyOnCompactionCompleted(c->column_family_data(), c.get(), status,
                                compaction_job_stats, job_context->job_id);
  }

  if (status.ok() || status.IsCompactionTooLarge() ||
      status.IsManualCompactionPaused()) {
    // Done
  } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) {
    // Ignore compaction errors found during shutdown
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
                   status.ToString().c_str());
    error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
    if (c != nullptr && !is_manual && !error_handler_.IsBGWorkStopped()) {
      // Put this cfd back in the compaction queue so we can retry after some
      // time
      auto cfd = c->column_family_data();
      assert(cfd != nullptr);
      // Since this compaction failed, we need to recompute the score so it
      // takes the original input files into account
      c->column_family_data()
          ->current()
          ->storage_info()
          ->ComputeCompactionScore(*(c->immutable_cf_options()),
                                   *(c->mutable_cf_options()));
      if (!cfd->queued_for_compaction()) {
        AddToCompactionQueue(cfd);
        ++unscheduled_compactions_;
      }
    }
  }
  // This will unref its input_version and column_family_data.
  c.reset();

  if (is_manual) {
    ManualCompactionState* m = manual_compaction;
    if (!status.ok()) {
      m->status = status;
      m->done = true;
    }
    // For universal compaction:
    //   Because universal compaction always happens at level 0, one
    //   compaction will pick up all overlapping files. No files will be
    //   filtered out due to the size limit and left for a successive
    //   compaction, so we can safely conclude the current compaction.
    //
    //   Also note that, if we don't stop here, the current compaction writes
    //   a new file back to level 0, which will be used in a successive
    //   compaction. Hence the manual compaction will never finish.
    //
    // Stop the compaction if manual_end points to nullptr -- this means
    // that we compacted the whole range. manual_end should always point
    // to nullptr in the case of universal compaction.
    if (m->manual_end == nullptr) {
      m->done = true;
    }
    if (!m->done) {
      // We only compacted part of the requested range.  Update *m
      // to the range that is left to be compacted.
      // Universal and FIFO compactions should always compact the whole range.
      assert(m->cfd->ioptions()->compaction_style !=
                 kCompactionStyleUniversal ||
             m->cfd->ioptions()->num_levels > 1);
      assert(m->cfd->ioptions()->compaction_style != kCompactionStyleFIFO);
      m->tmp_storage = *m->manual_end;
      m->begin = &m->tmp_storage;
      m->incomplete = true;
    }
    m->in_progress = false;  // not being processed anymore
  }
  TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Finish");
  return status;
}

bool DBImpl::HasPendingManualCompaction() {
  return (!manual_compaction_dequeue_.empty());
}

void DBImpl::AddManualCompaction(DBImpl::ManualCompactionState* m) {
  manual_compaction_dequeue_.push_back(m);
}

void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) {
  // Remove from queue
  std::deque<ManualCompactionState*>::iterator it =
      manual_compaction_dequeue_.begin();
  while (it != manual_compaction_dequeue_.end()) {
    if (m == (*it)) {
      it = manual_compaction_dequeue_.erase(it);
      return;
    }
    ++it;
  }
  assert(false);
  return;
}

bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) {
  if (num_running_ingest_file_ > 0) {
    // We need to wait for other IngestExternalFile() calls to finish
    // before running a manual compaction.
    return true;
  }
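  // An exclusive manual compaction cannot run while any other compaction
  // (regular or bottom-priority) is scheduled.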
  if (m->exclusive) {
    return (bg_bottom_compaction_scheduled_ > 0 ||
            bg_compaction_scheduled_ > 0);
  }
  std::deque<ManualCompactionState*>::iterator it =
      manual_compaction_dequeue_.begin();
  bool seen = false;
  while (it != manual_compaction_dequeue_.end()) {
    if (m == (*it)) {
      ++it;
      seen = true;
      continue;
    } else if (MCOverlap(m, (*it)) && (!seen && !(*it)->in_progress)) {
      // The other manual compaction *it conflicts with m if it overlaps
      // with m, is ahead of m in the queue, and is not yet in progress.
      return true;
    }
    ++it;
  }
  return false;
}

bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
  // Scan the pending manual compactions.
  std::deque<ManualCompactionState*>::iterator it =
      manual_compaction_dequeue_.begin();
  while (it != manual_compaction_dequeue_.end()) {
    if ((*it)->exclusive) {
      return true;
    }
    if ((cfd == (*it)->cfd) && (!((*it)->in_progress || (*it)->done))) {
      // Allow automatic compaction if the manual compaction is already in
      // progress (or done).
      return true;
    }
    ++it;
  }
  return false;
}

bool DBImpl::HasExclusiveManualCompaction() {
  // Scan the pending manual compactions.
  std::deque<ManualCompactionState*>::iterator it =
      manual_compaction_dequeue_.begin();
  while (it != manual_compaction_dequeue_.end()) {
    if ((*it)->exclusive) {
      return true;
    }
    ++it;
  }
  return false;
}

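// Two manual compactions are considered to overlap (and therefore conflict)
// if either one is exclusive or both target the same column family.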
MCOverlap(ManualCompactionState * m,ManualCompactionState * m1)2954 bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
2955   if ((m->exclusive) || (m1->exclusive)) {
2956     return true;
2957   }
2958   if (m->cfd != m1->cfd) {
2959     return false;
2960   }
2961   return true;
2962 }
2963 
2964 #ifndef ROCKSDB_LITE
BuildCompactionJobInfo(const ColumnFamilyData * cfd,Compaction * c,const Status & st,const CompactionJobStats & compaction_job_stats,const int job_id,const Version * current,CompactionJobInfo * compaction_job_info) const2965 void DBImpl::BuildCompactionJobInfo(
2966     const ColumnFamilyData* cfd, Compaction* c, const Status& st,
2967     const CompactionJobStats& compaction_job_stats, const int job_id,
2968     const Version* current, CompactionJobInfo* compaction_job_info) const {
2969   assert(compaction_job_info != nullptr);
2970   compaction_job_info->cf_id = cfd->GetID();
2971   compaction_job_info->cf_name = cfd->GetName();
2972   compaction_job_info->status = st;
2973   compaction_job_info->thread_id = env_->GetThreadID();
2974   compaction_job_info->job_id = job_id;
2975   compaction_job_info->base_input_level = c->start_level();
2976   compaction_job_info->output_level = c->output_level();
2977   compaction_job_info->stats = compaction_job_stats;
2978   compaction_job_info->table_properties = c->GetOutputTableProperties();
2979   compaction_job_info->compaction_reason = c->compaction_reason();
2980   compaction_job_info->compression = c->output_compression();
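  // Collect per-level info for every input file, filling in table properties
  // that were not already gathered from the compaction outputs.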
  for (size_t i = 0; i < c->num_input_levels(); ++i) {
    for (const auto fmd : *c->inputs(i)) {
      const FileDescriptor& desc = fmd->fd;
      const uint64_t file_number = desc.GetNumber();
      auto fn = TableFileName(c->immutable_cf_options()->cf_paths, file_number,
                              desc.GetPathId());
      compaction_job_info->input_files.push_back(fn);
      compaction_job_info->input_file_infos.push_back(CompactionFileInfo{
          static_cast<int>(i), file_number, fmd->oldest_blob_file_number});
      if (compaction_job_info->table_properties.count(fn) == 0) {
        std::shared_ptr<const TableProperties> tp;
        auto s = current->GetTableProperties(&tp, fmd, &fn);
        if (s.ok()) {
          compaction_job_info->table_properties[fn] = tp;
        }
      }
    }
  }
  for (const auto& newf : c->edit()->GetNewFiles()) {
    const FileMetaData& meta = newf.second;
    const FileDescriptor& desc = meta.fd;
    const uint64_t file_number = desc.GetNumber();
    compaction_job_info->output_files.push_back(TableFileName(
        c->immutable_cf_options()->cf_paths, file_number, desc.GetPathId()));
    compaction_job_info->output_file_infos.push_back(CompactionFileInfo{
        newf.first, file_number, meta.oldest_blob_file_number});
  }
}
#endif

// SuperVersionContext gets created and destructed outside of the lock --
// we use this conveniently to:
// * malloc one SuperVersion() outside of the lock -- new_superversion
// * delete SuperVersion()s outside of the lock -- superversions_to_free
//
// However, if InstallSuperVersionAndScheduleWork() gets called twice with the
// same sv_context, we can't reuse the SuperVersion() that got malloced,
// because the first call already used it. In that rare case, we take a hit
// and create a new SuperVersion() inside the mutex. We do a similar thing
// for superversions_to_free.

void DBImpl::InstallSuperVersionAndScheduleWork(
    ColumnFamilyData* cfd, SuperVersionContext* sv_context,
    const MutableCFOptions& mutable_cf_options) {
  mutex_.AssertHeld();

  // Update max_total_in_memory_state_
  size_t old_memtable_size = 0;
  auto* old_sv = cfd->GetSuperVersion();
  if (old_sv) {
    old_memtable_size = old_sv->mutable_cf_options.write_buffer_size *
                        old_sv->mutable_cf_options.max_write_buffer_number;
  }

  // This branch is rarely taken.
  if (UNLIKELY(sv_context->new_superversion == nullptr)) {
    sv_context->NewSuperVersion();
  }
  cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options);

  // There may be a small data race here. The snapshot triggering bottommost
  // compaction may already have been released by this point. But assuming
  // newer snapshots are created and released frequently, the compaction will
  // be triggered soon anyway.
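  // Recompute the DB-wide bottommost_files_mark_threshold_ as the minimum of
  // the per-column-family thresholds.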
  bottommost_files_mark_threshold_ = kMaxSequenceNumber;
  for (auto* my_cfd : *versions_->GetColumnFamilySet()) {
    bottommost_files_mark_threshold_ = std::min(
        bottommost_files_mark_threshold_,
        my_cfd->current()->storage_info()->bottommost_files_mark_threshold());
  }

  // Whenever we install a new SuperVersion, we might need to issue new
  // flushes or compactions.
  SchedulePendingCompaction(cfd);
  MaybeScheduleFlushOrCompaction();

  // Update max_total_in_memory_state_
  max_total_in_memory_state_ = max_total_in_memory_state_ - old_memtable_size +
                               mutable_cf_options.write_buffer_size *
                                   mutable_cf_options.max_write_buffer_number;
}

// ShouldPurge is called by FindObsoleteFiles when doing a full scan, and the
// db mutex (mutex_) should already be held. Note that the current
// implementation of FindObsoleteFiles with full_scan=true can issue I/O
// requests to obtain the list of files in directories (e.g.
// env_->GetChildren()) while holding the db mutex.
bool DBImpl::ShouldPurge(uint64_t file_number) const {
  return files_grabbed_for_purge_.find(file_number) ==
             files_grabbed_for_purge_.end() &&
         purge_files_.find(file_number) == purge_files_.end();
}

// MarkAsGrabbedForPurge is called by FindObsoleteFiles, and the db mutex
// (mutex_) should already be held.
void DBImpl::MarkAsGrabbedForPurge(uint64_t file_number) {
  files_grabbed_for_purge_.insert(file_number);
}

void DBImpl::SetSnapshotChecker(SnapshotChecker* snapshot_checker) {
  InstrumentedMutexLock l(&mutex_);
  // snapshot_checker_ should only be set once. If we need to set it multiple
  // times, we need to make sure the old one is not deleted while it is still
  // in use by a compaction job.
  assert(!snapshot_checker_);
  snapshot_checker_.reset(snapshot_checker);
}

void DBImpl::GetSnapshotContext(
    JobContext* job_context, std::vector<SequenceNumber>* snapshot_seqs,
    SequenceNumber* earliest_write_conflict_snapshot,
    SnapshotChecker** snapshot_checker_ptr) {
  mutex_.AssertHeld();
  assert(job_context != nullptr);
  assert(snapshot_seqs != nullptr);
  assert(earliest_write_conflict_snapshot != nullptr);
  assert(snapshot_checker_ptr != nullptr);

  *snapshot_checker_ptr = snapshot_checker_.get();
  if (use_custom_gc_ && *snapshot_checker_ptr == nullptr) {
    *snapshot_checker_ptr = DisableGCSnapshotChecker::Instance();
  }
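  // DisableGCSnapshotChecker treats every value as not yet visible to any
  // snapshot, which prevents values from being garbage collected.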
  if (*snapshot_checker_ptr != nullptr) {
    // If a snapshot_checker is used, the flush/compaction may contain values
    // not visible to a snapshot taken after the flush/compaction job starts.
    // Take a snapshot here; it will appear in snapshot_seqs and force the
    // compaction iterator to consider such snapshots.
    const Snapshot* job_snapshot =
        GetSnapshotImpl(false /*write_conflict_boundary*/, false /*lock*/);
    job_context->job_snapshot.reset(new ManagedSnapshot(this, job_snapshot));
  }
  *snapshot_seqs = snapshots_.GetAll(earliest_write_conflict_snapshot);
}
}  // namespace ROCKSDB_NAMESPACE