// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <cinttypes>

#include "db/builder.h"
#include "db/db_impl/db_impl.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "file/sst_file_manager_impl.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_updater.h"
#include "monitoring/thread_status_util.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/concurrent_task_limiter_impl.h"

namespace ROCKSDB_NAMESPACE {

bool DBImpl::EnoughRoomForCompaction(
    ColumnFamilyData* cfd, const std::vector<CompactionInputFiles>& inputs,
    bool* sfm_reserved_compact_space, LogBuffer* log_buffer) {
  // Check if we have enough room to do the compaction
  bool enough_room = true;
#ifndef ROCKSDB_LITE
  auto sfm = static_cast<SstFileManagerImpl*>(
      immutable_db_options_.sst_file_manager.get());
  if (sfm) {
    // Pass the current bg_error_ to SFM so it can decide what checks to
    // perform. If this DB instance hasn't seen any error yet, the SFM can be
    // optimistic and not do disk space checks
    Status bg_error = error_handler_.GetBGError();
    enough_room = sfm->EnoughRoomForCompaction(cfd, inputs, bg_error);
    bg_error.PermitUncheckedError();  // bg_error is just a copy of the Status
                                      // from the error_handler_
    if (enough_room) {
      *sfm_reserved_compact_space = true;
    }
  }
#else
  (void)cfd;
  (void)inputs;
  (void)sfm_reserved_compact_space;
#endif  // ROCKSDB_LITE
  if (!enough_room) {
    // Just in case tests want to change the value of enough_room
    TEST_SYNC_POINT_CALLBACK(
        "DBImpl::BackgroundCompaction():CancelledCompaction", &enough_room);
    ROCKS_LOG_BUFFER(log_buffer,
                     "Cancelled compaction because not enough room");
    RecordTick(stats_, COMPACTION_CANCELLED, 1);
  }
  return enough_room;
}
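
// Example (informal): when the function above returns true and has set
// *sfm_reserved_compact_space, the space reserved inside the SstFileManager
// is released after the compaction finishes; see the
// sfm->OnCompactionCompletion(c.get()) call in CompactFilesImpl() below.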

bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force,
                                    std::unique_ptr<TaskLimiterToken>* token,
                                    LogBuffer* log_buffer) {
  assert(*token == nullptr);
  auto limiter = static_cast<ConcurrentTaskLimiterImpl*>(
      cfd->ioptions()->compaction_thread_limiter.get());
  if (limiter == nullptr) {
    return true;
  }
  *token = limiter->GetToken(force);
  if (*token != nullptr) {
    ROCKS_LOG_BUFFER(log_buffer,
                     "Thread limiter [%s] increase [%s] compaction task, "
                     "force: %s, tasks after: %d",
                     limiter->GetName().c_str(), cfd->GetName().c_str(),
                     force ? "true" : "false", limiter->GetOutstandingTask());
    return true;
  }
  return false;
}

IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
  TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
  mutex_.AssertHeld();
  autovector<log::Writer*, 1> logs_to_sync;
  uint64_t current_log_number = logfile_number_;
  while (logs_.front().number < current_log_number &&
         logs_.front().getting_synced) {
    log_sync_cv_.Wait();
  }
  for (auto it = logs_.begin();
       it != logs_.end() && it->number < current_log_number; ++it) {
    auto& log = *it;
    assert(!log.getting_synced);
    log.getting_synced = true;
    logs_to_sync.push_back(log.writer);
  }

  IOStatus io_s;
  if (!logs_to_sync.empty()) {
    mutex_.Unlock();

    for (log::Writer* log : logs_to_sync) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
                     log->get_log_number());
      io_s = log->file()->Sync(immutable_db_options_.use_fsync);
      if (!io_s.ok()) {
        break;
      }

      if (immutable_db_options_.recycle_log_file_num > 0) {
        io_s = log->Close();
        if (!io_s.ok()) {
          break;
        }
      }
    }
    if (io_s.ok()) {
      io_s = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
    }

    mutex_.Lock();

    // "number <= current_log_number - 1" is equivalent to
    // "number < current_log_number".
    if (io_s.ok()) {
      io_s = status_to_io_status(MarkLogsSynced(current_log_number - 1, true));
    } else {
      MarkLogsNotSynced(current_log_number - 1);
    }
    if (!io_s.ok()) {
      TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
      return io_s;
    }
  }
  TEST_SYNC_POINT("DBImpl::SyncClosedLogs:end");
  return io_s;
}

Status DBImpl::FlushMemTableToOutputFile(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    bool* made_progress, JobContext* job_context,
    SuperVersionContext* superversion_context,
    std::vector<SequenceNumber>& snapshot_seqs,
    SequenceNumber earliest_write_conflict_snapshot,
    SnapshotChecker* snapshot_checker, LogBuffer* log_buffer,
    Env::Priority thread_pri) {
  mutex_.AssertHeld();
  assert(cfd);
  assert(cfd->imm()->NumNotFlushed() != 0);
  assert(cfd->imm()->IsFlushPending());

  FlushJob flush_job(
      dbname_, cfd, immutable_db_options_, mutable_cf_options,
      port::kMaxUint64 /* memtable_id */, file_options_for_compaction_,
      versions_.get(), &mutex_, &shutting_down_, snapshot_seqs,
      earliest_write_conflict_snapshot, snapshot_checker, job_context,
      log_buffer, directories_.GetDbDir(), GetDataDir(cfd, 0U),
      GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
      &event_logger_, mutable_cf_options.report_bg_io_stats,
      true /* sync_output_directory */, true /* write_manifest */, thread_pri,
      io_tracer_, db_id_, db_session_id_, cfd->GetFullHistoryTsLow(),
      &blob_callback_);
  FileMetaData file_meta;

#ifndef ROCKSDB_LITE
  // may temporarily unlock and lock the mutex.
  NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id);
#endif  // ROCKSDB_LITE

  Status s;
  bool need_cancel = false;
  IOStatus log_io_s = IOStatus::OK();
  if (logfile_number_ > 0 &&
      versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1) {
    // If there is more than one column family, we need to make sure that
    // all the log files except the most recent one are synced. Otherwise if
    // the host crashes after flushing and before the WAL is persistent, the
    // flushed SST may contain data from write batches whose updates to
    // other column families are missing.
    // SyncClosedLogs() may unlock and re-lock the db_mutex.
    log_io_s = SyncClosedLogs(job_context);
    if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() &&
        !log_io_s.IsColumnFamilyDropped()) {
      error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlush);
    }
  } else {
    TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Skip");
  }
  s = log_io_s;

  // If the log sync failed, we do not need to pick a memtable. Otherwise,
  // num_flush_not_started_ needs to be rolled back.
  TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables");
  if (s.ok()) {
    flush_job.PickMemTable();
    need_cancel = true;
  }
  TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:AfterPickMemtables");

  // Within flush_job.Run, rocksdb may call event listener to notify
  // file creation and deletion.
  //
  // Note that flush_job.Run will unlock and lock the db_mutex,
  // and EventListener callback will be called when the db_mutex
  // is unlocked by the current thread.
  if (s.ok()) {
    s = flush_job.Run(&logs_with_prep_tracker_, &file_meta);
    need_cancel = false;
  }

  if (!s.ok() && need_cancel) {
    flush_job.Cancel();
  }
  IOStatus io_s = IOStatus::OK();
  io_s = flush_job.io_status();
  if (s.ok()) {
    s = io_s;
  }

  if (s.ok()) {
    InstallSuperVersionAndScheduleWork(cfd, superversion_context,
                                       mutable_cf_options);
    if (made_progress) {
      *made_progress = true;
    }

    const std::string& column_family_name = cfd->GetName();

    Version* const current = cfd->current();
    assert(current);

    const VersionStorageInfo* const storage_info = current->storage_info();
    assert(storage_info);

    VersionStorageInfo::LevelSummaryStorage tmp;
    ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
                     column_family_name.c_str(),
                     storage_info->LevelSummary(&tmp));

    const auto& blob_files = storage_info->GetBlobFiles();
    if (!blob_files.empty()) {
      ROCKS_LOG_BUFFER(log_buffer,
                       "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64
                       "\n",
                       column_family_name.c_str(), blob_files.begin()->first,
                       blob_files.rbegin()->first);
    }
  }

  if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
    if (!io_s.ok() && !io_s.IsShutdownInProgress() &&
        !io_s.IsColumnFamilyDropped()) {
      assert(log_io_s.ok());
      // Error while writing to MANIFEST.
      // In fact, versions_->io_status() can also be the result of renaming
      // the CURRENT file. With the current code, it's just difficult to tell.
      // So just be pessimistic and try to write to a new MANIFEST.
      // TODO: distinguish between MANIFEST write and CURRENT renaming
      if (!versions_->io_status().ok()) {
        // If the WAL sync is successful (either the WAL size is 0 or there is
        // no IO error), all MANIFEST write errors will be mapped to a soft
        // error.
        // TODO: kManifestWriteNoWAL and kFlushNoWAL are misleading. Refactor is
        // needed.
        error_handler_.SetBGError(io_s,
                                  BackgroundErrorReason::kManifestWriteNoWAL);
      } else {
        // If the WAL sync is successful (either the WAL size is 0 or there is
        // no IO error), all the other SST file write errors will be set as
        // kFlushNoWAL.
        error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL);
      }
    } else {
      if (log_io_s.ok()) {
        Status new_bg_error = s;
        error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
      }
    }
  } else {
    // If we got here, then we decided not to care about the io_s status
    // (either from never needing it or from ignoring the flush job status).
    io_s.PermitUncheckedError();
  }
  if (s.ok()) {
#ifndef ROCKSDB_LITE
    // may temporarily unlock and lock the mutex.
    NotifyOnFlushCompleted(cfd, mutable_cf_options,
                           flush_job.GetCommittedFlushJobsInfo());
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    if (sfm) {
      // Notify sst_file_manager that a new file was added
      std::string file_path = MakeTableFileName(
          cfd->ioptions()->cf_paths[0].path, file_meta.fd.GetNumber());
      // TODO (PR7798). We should only add the file to the FileManager if it
      // exists. Otherwise, some tests may fail. Ignore the error in the
      // interim.
      sfm->OnAddFile(file_path).PermitUncheckedError();
      if (sfm->IsMaxAllowedSpaceReached()) {
        Status new_bg_error =
            Status::SpaceLimit("Max allowed space was reached");
        TEST_SYNC_POINT_CALLBACK(
            "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
            &new_bg_error);
        error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
      }
    }
#endif  // ROCKSDB_LITE
  }
  TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:Finish");
  return s;
}

Status DBImpl::FlushMemTablesToOutputFiles(
    const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
    JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
  if (immutable_db_options_.atomic_flush) {
    return AtomicFlushMemTablesToOutputFiles(
        bg_flush_args, made_progress, job_context, log_buffer, thread_pri);
  }
  assert(bg_flush_args.size() == 1);
  std::vector<SequenceNumber> snapshot_seqs;
  SequenceNumber earliest_write_conflict_snapshot;
  SnapshotChecker* snapshot_checker;
  GetSnapshotContext(job_context, &snapshot_seqs,
                     &earliest_write_conflict_snapshot, &snapshot_checker);
  const auto& bg_flush_arg = bg_flush_args[0];
  ColumnFamilyData* cfd = bg_flush_arg.cfd_;
  MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
  SuperVersionContext* superversion_context =
      bg_flush_arg.superversion_context_;
  Status s = FlushMemTableToOutputFile(
      cfd, mutable_cf_options, made_progress, job_context,
      superversion_context, snapshot_seqs, earliest_write_conflict_snapshot,
      snapshot_checker, log_buffer, thread_pri);
  return s;
}

/*
 * Atomically flushes multiple column families.
 *
 * For each column family, all memtables with an ID smaller than or equal to
 * the ID specified in bg_flush_args will be flushed. Only after all column
 * families finish flushing will this function commit to the MANIFEST. If any
 * of the column families are not flushed successfully, this function does not
 * have any side effect on the state of the database.
 */
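//
// Example (informal): with two column families {cf0, cf1} and bg_flush_args
// specifying max memtable IDs {3, 5}, memtables with IDs <= 3 in cf0 and
// IDs <= 5 in cf1 are flushed, and their results are committed to the
// MANIFEST together or not at all.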
Status DBImpl::AtomicFlushMemTablesToOutputFiles(
    const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
    JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri) {
  mutex_.AssertHeld();

  autovector<ColumnFamilyData*> cfds;
  for (const auto& arg : bg_flush_args) {
    cfds.emplace_back(arg.cfd_);
  }

#ifndef NDEBUG
  for (const auto cfd : cfds) {
    assert(cfd->imm()->NumNotFlushed() != 0);
    assert(cfd->imm()->IsFlushPending());
  }
#endif /* !NDEBUG */

  std::vector<SequenceNumber> snapshot_seqs;
  SequenceNumber earliest_write_conflict_snapshot;
  SnapshotChecker* snapshot_checker;
  GetSnapshotContext(job_context, &snapshot_seqs,
                     &earliest_write_conflict_snapshot, &snapshot_checker);

  autovector<FSDirectory*> distinct_output_dirs;
  autovector<std::string> distinct_output_dir_paths;
  std::vector<std::unique_ptr<FlushJob>> jobs;
  std::vector<MutableCFOptions> all_mutable_cf_options;
  int num_cfs = static_cast<int>(cfds.size());
  all_mutable_cf_options.reserve(num_cfs);
  for (int i = 0; i < num_cfs; ++i) {
    auto cfd = cfds[i];
    FSDirectory* data_dir = GetDataDir(cfd, 0U);
    const std::string& curr_path = cfd->ioptions()->cf_paths[0].path;

    // Add to distinct output directories if eligible. Use linear search. Since
    // the number of elements in the vector is not large, performance should be
    // tolerable.
    bool found = false;
    for (const auto& path : distinct_output_dir_paths) {
      if (path == curr_path) {
        found = true;
        break;
      }
    }
    if (!found) {
      distinct_output_dir_paths.emplace_back(curr_path);
      distinct_output_dirs.emplace_back(data_dir);
    }

    all_mutable_cf_options.emplace_back(*cfd->GetLatestMutableCFOptions());
    const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back();
    uint64_t max_memtable_id = bg_flush_args[i].max_memtable_id_;
    jobs.emplace_back(new FlushJob(
        dbname_, cfd, immutable_db_options_, mutable_cf_options,
        max_memtable_id, file_options_for_compaction_, versions_.get(),
        &mutex_, &shutting_down_, snapshot_seqs,
        earliest_write_conflict_snapshot, snapshot_checker, job_context,
        log_buffer, directories_.GetDbDir(), data_dir,
        GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
        &event_logger_, mutable_cf_options.report_bg_io_stats,
        false /* sync_output_directory */, false /* write_manifest */,
        thread_pri, io_tracer_, db_id_, db_session_id_,
        cfd->GetFullHistoryTsLow()));
  }

  std::vector<FileMetaData> file_meta(num_cfs);
  Status s;
  IOStatus log_io_s = IOStatus::OK();
  assert(num_cfs == static_cast<int>(jobs.size()));

#ifndef ROCKSDB_LITE
  for (int i = 0; i != num_cfs; ++i) {
    const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i);
    // may temporarily unlock and lock the mutex.
    NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options,
                       job_context->job_id);
  }
#endif /* !ROCKSDB_LITE */

  if (logfile_number_ > 0) {
    // TODO (yanqin) investigate whether we should sync the closed logs for
    // single column family case.
    log_io_s = SyncClosedLogs(job_context);
    if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() &&
        !log_io_s.IsColumnFamilyDropped()) {
      if (total_log_size_ > 0) {
        error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlush);
      } else {
        // If the WAL is empty, we use a different error reason
        error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlushNoWAL);
      }
    }
  }
  s = log_io_s;

  // exec_status stores the execution status of flush_jobs as
  // <bool /* executed */, Status /* status code */>
  autovector<std::pair<bool, Status>> exec_status;
  autovector<IOStatus> io_status;
  std::vector<bool> pick_status;
  for (int i = 0; i != num_cfs; ++i) {
    // Initially all jobs are not executed, with status OK.
    exec_status.emplace_back(false, Status::OK());
    io_status.emplace_back(IOStatus::OK());
    pick_status.push_back(false);
  }

  if (s.ok()) {
    for (int i = 0; i != num_cfs; ++i) {
      jobs[i]->PickMemTable();
      pick_status[i] = true;
    }
  }

  if (s.ok()) {
    // TODO (yanqin): parallelize jobs with threads.
    for (int i = 1; i != num_cfs; ++i) {
      exec_status[i].second =
          jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i]);
      exec_status[i].first = true;
      io_status[i] = jobs[i]->io_status();
    }
    if (num_cfs > 1) {
      TEST_SYNC_POINT(
          "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1");
      TEST_SYNC_POINT(
          "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2");
    }
    assert(exec_status.size() > 0);
    assert(!file_meta.empty());
    exec_status[0].second =
        jobs[0]->Run(&logs_with_prep_tracker_, &file_meta[0]);
    exec_status[0].first = true;
    io_status[0] = jobs[0]->io_status();

    Status error_status;
    for (const auto& e : exec_status) {
      if (!e.second.ok()) {
        s = e.second;
        if (!e.second.IsShutdownInProgress() &&
            !e.second.IsColumnFamilyDropped()) {
          // If a flush job did not return OK, and the CF is not dropped, and
          // the DB is not shutting down, then we have to return this result
          // to the caller later.
          error_status = e.second;
        }
      }
    }

    s = error_status.ok() ? s : error_status;
  }

  IOStatus io_s = IOStatus::OK();
  if (io_s.ok()) {
    IOStatus io_error = IOStatus::OK();
    for (int i = 0; i != static_cast<int>(io_status.size()); i++) {
      if (!io_status[i].ok() && !io_status[i].IsShutdownInProgress() &&
          !io_status[i].IsColumnFamilyDropped()) {
        io_error = io_status[i];
      }
    }
    io_s = io_error;
    if (s.ok() && !io_s.ok()) {
      s = io_s;
    }
  }

  if (s.IsColumnFamilyDropped()) {
    s = Status::OK();
  }

  if (s.ok() || s.IsShutdownInProgress()) {
    // Sync on all distinct output directories.
    for (auto dir : distinct_output_dirs) {
      if (dir != nullptr) {
        Status error_status = dir->Fsync(IOOptions(), nullptr);
        if (!error_status.ok()) {
          s = error_status;
          break;
        }
      }
    }
  } else {
    // Need to undo atomic flush if something went wrong, i.e. s is not OK and
    // it is not because of CF drop.
    // Have to cancel the flush jobs that have NOT executed because we need to
    // unref the versions.
    for (int i = 0; i != num_cfs; ++i) {
      if (pick_status[i] && !exec_status[i].first) {
        jobs[i]->Cancel();
      }
    }
    for (int i = 0; i != num_cfs; ++i) {
      if (exec_status[i].second.ok() && exec_status[i].first) {
        auto& mems = jobs[i]->GetMemTables();
        cfds[i]->imm()->RollbackMemtableFlush(mems,
                                              file_meta[i].fd.GetNumber());
      }
    }
  }

  if (s.ok()) {
    auto wait_to_install_func = [&]() {
      bool ready = true;
      for (size_t i = 0; i != cfds.size(); ++i) {
        const auto& mems = jobs[i]->GetMemTables();
        if (cfds[i]->IsDropped()) {
          // If the column family is dropped, then do not wait.
          continue;
        } else if (!mems.empty() &&
                   cfds[i]->imm()->GetEarliestMemTableID() < mems[0]->GetID()) {
          // If a flush job needs to install the flush result for mems and
          // mems[0] is not the earliest memtable, it means another thread must
          // be installing flush results for the same column family, so the
          // current thread needs to wait.
          ready = false;
          break;
        } else if (mems.empty() && cfds[i]->imm()->GetEarliestMemTableID() <=
                                       bg_flush_args[i].max_memtable_id_) {
          // If a flush job does not need to install flush results, then it has
          // to wait until all memtables up to max_memtable_id_ (inclusive) are
          // installed.
          ready = false;
          break;
        }
      }
      return ready;
    };

    bool resuming_from_bg_err = error_handler_.IsDBStopped();
    while ((!error_handler_.IsDBStopped() ||
            error_handler_.GetRecoveryError().ok()) &&
           !wait_to_install_func()) {
      atomic_flush_install_cv_.Wait();
    }

    s = resuming_from_bg_err ? error_handler_.GetRecoveryError()
                             : error_handler_.GetBGError();
  }

  if (s.ok()) {
    autovector<ColumnFamilyData*> tmp_cfds;
    autovector<const autovector<MemTable*>*> mems_list;
    autovector<const MutableCFOptions*> mutable_cf_options_list;
    autovector<FileMetaData*> tmp_file_meta;
    for (int i = 0; i != num_cfs; ++i) {
      const auto& mems = jobs[i]->GetMemTables();
      if (!cfds[i]->IsDropped() && !mems.empty()) {
        tmp_cfds.emplace_back(cfds[i]);
        mems_list.emplace_back(&mems);
        mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]);
        tmp_file_meta.emplace_back(&file_meta[i]);
      }
    }

    s = InstallMemtableAtomicFlushResults(
        nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
        versions_.get(), &logs_with_prep_tracker_, &mutex_, tmp_file_meta,
        &job_context->memtables_to_free, directories_.GetDbDir(), log_buffer);
  }

  if (s.ok()) {
    assert(num_cfs ==
           static_cast<int>(job_context->superversion_contexts.size()));
    for (int i = 0; i != num_cfs; ++i) {
      assert(cfds[i]);

      if (cfds[i]->IsDropped()) {
        continue;
      }
      InstallSuperVersionAndScheduleWork(
          cfds[i], &job_context->superversion_contexts[i],
          all_mutable_cf_options[i]);

      const std::string& column_family_name = cfds[i]->GetName();

      Version* const current = cfds[i]->current();
      assert(current);

      const VersionStorageInfo* const storage_info = current->storage_info();
      assert(storage_info);

      VersionStorageInfo::LevelSummaryStorage tmp;
      ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
                       column_family_name.c_str(),
                       storage_info->LevelSummary(&tmp));

      const auto& blob_files = storage_info->GetBlobFiles();
      if (!blob_files.empty()) {
        ROCKS_LOG_BUFFER(log_buffer,
                         "[%s] Blob file summary: head=%" PRIu64
                         ", tail=%" PRIu64 "\n",
                         column_family_name.c_str(), blob_files.begin()->first,
                         blob_files.rbegin()->first);
      }
    }
    if (made_progress) {
      *made_progress = true;
    }
#ifndef ROCKSDB_LITE
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    assert(all_mutable_cf_options.size() == static_cast<size_t>(num_cfs));
    for (int i = 0; s.ok() && i != num_cfs; ++i) {
      if (cfds[i]->IsDropped()) {
        continue;
      }
      NotifyOnFlushCompleted(cfds[i], all_mutable_cf_options[i],
                             jobs[i]->GetCommittedFlushJobsInfo());
      if (sfm) {
        std::string file_path = MakeTableFileName(
            cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber());
        // TODO (PR7798). We should only add the file to the FileManager if it
        // exists. Otherwise, some tests may fail. Ignore the error in the
        // interim.
        sfm->OnAddFile(file_path).PermitUncheckedError();
        if (sfm->IsMaxAllowedSpaceReached() &&
            error_handler_.GetBGError().ok()) {
          Status new_bg_error =
              Status::SpaceLimit("Max allowed space was reached");
          error_handler_.SetBGError(new_bg_error,
                                    BackgroundErrorReason::kFlush);
        }
      }
    }
#endif  // ROCKSDB_LITE
  }

  // Need to undo atomic flush if something went wrong, i.e. s is not OK and
  // it is not because of CF drop.
  if (!s.ok() && !s.IsColumnFamilyDropped()) {
    if (!io_s.ok() && !io_s.IsColumnFamilyDropped()) {
      assert(log_io_s.ok());
      // Error while writing to MANIFEST.
      // In fact, versions_->io_status() can also be the result of renaming
      // the CURRENT file. With the current code, it's just difficult to tell.
      // So just be pessimistic and try to write to a new MANIFEST.
      // TODO: distinguish between MANIFEST write and CURRENT renaming
      if (!versions_->io_status().ok()) {
        // If the WAL sync is successful (either the WAL size is 0 or there is
        // no IO error), all MANIFEST write errors will be mapped to a soft
        // error.
        // TODO: kManifestWriteNoWAL and kFlushNoWAL are misleading. Refactor
        // is needed.
        error_handler_.SetBGError(io_s,
                                  BackgroundErrorReason::kManifestWriteNoWAL);
      } else {
        // If the WAL sync is successful (either the WAL size is 0 or there is
        // no IO error), all the other SST file write errors will be set as
        // kFlushNoWAL.
        error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL);
      }
    } else {
      if (log_io_s.ok()) {
        Status new_bg_error = s;
        error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
      }
    }
  }

  return s;
}

void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
                                const MutableCFOptions& mutable_cf_options,
                                int job_id) {
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  bool triggered_writes_slowdown =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_slowdown_writes_trigger);
  bool triggered_writes_stop =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_stop_writes_trigger);
  // release lock while notifying events
  mutex_.Unlock();
  {
    FlushJobInfo info{};
    info.cf_id = cfd->GetID();
    info.cf_name = cfd->GetName();
    // TODO(yhchiang): make db_paths dynamic in case flush does not
    // go to L0 in the future.
    const uint64_t file_number = file_meta->fd.GetNumber();
    info.file_path =
        MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_number);
    info.file_number = file_number;
    info.thread_id = env_->GetThreadID();
    info.job_id = job_id;
    info.triggered_writes_slowdown = triggered_writes_slowdown;
    info.triggered_writes_stop = triggered_writes_stop;
    info.smallest_seqno = file_meta->fd.smallest_seqno;
    info.largest_seqno = file_meta->fd.largest_seqno;
    info.flush_reason = cfd->GetFlushReason();
    for (auto listener : immutable_db_options_.listeners) {
      listener->OnFlushBegin(this, info);
    }
  }
  mutex_.Lock();
  // no need to signal bg_cv_ as it will be signaled at the end of the
  // flush process.
#else
  (void)cfd;
  (void)file_meta;
  (void)mutable_cf_options;
  (void)job_id;
#endif  // ROCKSDB_LITE
}

void DBImpl::NotifyOnFlushCompleted(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    std::list<std::unique_ptr<FlushJobInfo>>* flush_jobs_info) {
#ifndef ROCKSDB_LITE
  assert(flush_jobs_info != nullptr);
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  bool triggered_writes_slowdown =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_slowdown_writes_trigger);
  bool triggered_writes_stop =
      (cfd->current()->storage_info()->NumLevelFiles(0) >=
       mutable_cf_options.level0_stop_writes_trigger);
  // release lock while notifying events
  mutex_.Unlock();
  {
    for (auto& info : *flush_jobs_info) {
      info->triggered_writes_slowdown = triggered_writes_slowdown;
      info->triggered_writes_stop = triggered_writes_stop;
      for (auto listener : immutable_db_options_.listeners) {
        listener->OnFlushCompleted(this, *info);
      }
    }
    flush_jobs_info->clear();
  }
  mutex_.Lock();
  // no need to signal bg_cv_ as it will be signaled at the end of the
  // flush process.
#else
  (void)cfd;
  (void)mutable_cf_options;
  (void)flush_jobs_info;
#endif  // ROCKSDB_LITE
}

Status DBImpl::CompactRange(const CompactRangeOptions& options,
                            ColumnFamilyHandle* column_family,
                            const Slice* begin_without_ts,
                            const Slice* end_without_ts) {
  if (manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
    return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  }

  const Comparator* const ucmp = column_family->GetComparator();
  assert(ucmp);
  size_t ts_sz = ucmp->timestamp_size();
  if (ts_sz == 0) {
    return CompactRangeInternal(options, column_family, begin_without_ts,
                                end_without_ts);
  }

  std::string begin_str;
  std::string end_str;

  // CompactRange compacts all keys in [begin, end] inclusively. Append the
  // maximum timestamp to include all `begin` keys, and the minimum timestamp
  // to include all `end` keys.
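  // For example, with an 8-byte timestamp the bound key "k" becomes
  // "k" + 0xFF..FF for `begin` and "k" + 0x00..00 for `end` (a sketch; the
  // exact encodings are defined by AppendKeyWithMaxTimestamp() and
  // AppendKeyWithMinTimestamp()), so every timestamped version of both bound
  // keys falls inside the compacted range.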
  if (begin_without_ts != nullptr) {
    AppendKeyWithMaxTimestamp(&begin_str, *begin_without_ts, ts_sz);
  }
  if (end_without_ts != nullptr) {
    AppendKeyWithMinTimestamp(&end_str, *end_without_ts, ts_sz);
  }
  Slice begin(begin_str);
  Slice end(end_str);

  Slice* begin_with_ts = begin_without_ts ? &begin : nullptr;
  Slice* end_with_ts = end_without_ts ? &end : nullptr;

  return CompactRangeInternal(options, column_family, begin_with_ts,
                              end_with_ts);
}
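
// Example (sketch of a typical caller, not logic from this file): a
// full-range manual compaction that also refits the result to the lowest
// level able to hold the data:
//   CompactRangeOptions cro;
//   cro.change_level = true;
//   cro.target_level = -1;  // let ReFitLevel() pick the destination level
//   Status s = db->CompactRange(cro, db->DefaultColumnFamily(), nullptr,
//                               nullptr);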

Status DBImpl::IncreaseFullHistoryTsLow(ColumnFamilyData* cfd,
                                        std::string ts_low) {
  VersionEdit edit;
  edit.SetColumnFamily(cfd->GetID());
  edit.SetFullHistoryTsLow(ts_low);

  InstrumentedMutexLock l(&mutex_);
  std::string current_ts_low = cfd->GetFullHistoryTsLow();
  const Comparator* ucmp = cfd->user_comparator();
  if (!current_ts_low.empty() &&
      ucmp->CompareTimestamp(ts_low, current_ts_low) < 0) {
    return Status::InvalidArgument(
        "Cannot decrease full_history_timestamp_low");
  }

  return versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit,
                                &mutex_);
}

Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
                                    ColumnFamilyHandle* column_family,
                                    const Slice* begin, const Slice* end) {
  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  auto cfd = cfh->cfd();

  if (options.target_path_id >= cfd->ioptions()->cf_paths.size()) {
    return Status::InvalidArgument("Invalid target path ID");
  }

  bool flush_needed = true;

  // Update full_history_ts_low if it's set
  if (options.full_history_ts_low != nullptr &&
      !options.full_history_ts_low->empty()) {
    std::string ts_low = options.full_history_ts_low->ToString();
    if (begin != nullptr || end != nullptr) {
      return Status::InvalidArgument(
          "Cannot specify compaction range with full_history_ts_low");
    }
    Status s = IncreaseFullHistoryTsLow(cfd, ts_low);
    if (!s.ok()) {
      LogFlush(immutable_db_options_.info_log);
      return s;
    }
  }

  Status s;
  if (begin != nullptr && end != nullptr) {
    // TODO(ajkr): We could also optimize away the flush in certain cases where
    // one/both sides of the interval are unbounded. But it requires more
    // changes to RangesOverlapWithMemtables.
    Range range(*begin, *end);
    SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
    s = cfd->RangesOverlapWithMemtables(
        {range}, super_version, immutable_db_options_.allow_data_in_errors,
        &flush_needed);
    CleanupSuperVersion(super_version);
  }

  if (s.ok() && flush_needed) {
    FlushOptions fo;
    fo.allow_write_stall = options.allow_write_stall;
    if (immutable_db_options_.atomic_flush) {
      autovector<ColumnFamilyData*> cfds;
      mutex_.Lock();
      SelectColumnFamiliesForAtomicFlush(&cfds);
      mutex_.Unlock();
      s = AtomicFlushMemTables(cfds, fo, FlushReason::kManualCompaction,
                               false /* writes_stopped */);
    } else {
      s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction,
                        false /* writes_stopped*/);
    }
    if (!s.ok()) {
      LogFlush(immutable_db_options_.info_log);
      return s;
    }
  }

  constexpr int kInvalidLevel = -1;
  int final_output_level = kInvalidLevel;
  bool exclusive = options.exclusive_manual_compaction;
  if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal &&
      cfd->NumberLevels() > 1) {
    // Always compact all files together.
    final_output_level = cfd->NumberLevels() - 1;
    // if the bottommost level is reserved
    if (immutable_db_options_.allow_ingest_behind) {
      final_output_level--;
    }
    s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
                            final_output_level, options, begin, end, exclusive,
                            false, port::kMaxUint64);
  } else {
    int first_overlapped_level = kInvalidLevel;
    int max_overlapped_level = kInvalidLevel;
    {
      SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
      Version* current_version = super_version->current;
      ReadOptions ro;
      ro.total_order_seek = true;
      bool overlap;
      for (int level = 0;
           level < current_version->storage_info()->num_non_empty_levels();
           level++) {
        overlap = true;
        if (begin != nullptr && end != nullptr) {
          Status status = current_version->OverlapWithLevelIterator(
              ro, file_options_, *begin, *end, level, &overlap);
          if (!status.ok()) {
            overlap = current_version->storage_info()->OverlapInLevel(
                level, begin, end);
          }
        } else {
          overlap = current_version->storage_info()->OverlapInLevel(level,
                                                                    begin, end);
        }
        if (overlap) {
          if (first_overlapped_level == kInvalidLevel) {
            first_overlapped_level = level;
          }
          max_overlapped_level = level;
        }
      }
      CleanupSuperVersion(super_version);
    }
    if (s.ok() && first_overlapped_level != kInvalidLevel) {
      // max_file_num_to_ignore can be used to filter out newly created SST
      // files, useful for bottom level compaction in a manual compaction
      uint64_t max_file_num_to_ignore = port::kMaxUint64;
      uint64_t next_file_number = versions_->current_next_file_number();
      final_output_level = max_overlapped_level;
      int output_level;
      for (int level = first_overlapped_level; level <= max_overlapped_level;
           level++) {
        bool disallow_trivial_move = false;
        // in case the compaction is universal or if we're compacting the
        // bottommost level, the output level will be the same as the input
        // one. level 0 can never be the bottommost level (i.e. if all files
        // are in level 0, we will compact to level 1)
        if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
            cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
          output_level = level;
        } else if (level == max_overlapped_level && level > 0) {
          if (options.bottommost_level_compaction ==
              BottommostLevelCompaction::kSkip) {
            // Skip bottommost level compaction
            continue;
          } else if (options.bottommost_level_compaction ==
                         BottommostLevelCompaction::kIfHaveCompactionFilter &&
                     cfd->ioptions()->compaction_filter == nullptr &&
                     cfd->ioptions()->compaction_filter_factory == nullptr) {
            // Skip bottommost level compaction since we don't have a
            // compaction filter
            continue;
          }
          output_level = level;
          // update max_file_num_to_ignore only for bottom level compaction
          // because data in newly compacted files in middle levels may still
          // need to be pushed down
          max_file_num_to_ignore = next_file_number;
        } else {
          output_level = level + 1;
          if (cfd->ioptions()->compaction_style == kCompactionStyleLevel &&
              cfd->ioptions()->level_compaction_dynamic_level_bytes &&
              level == 0) {
            output_level = ColumnFamilyData::kCompactToBaseLevel;
          }
          // if it's a bottommost level compaction and `kForce*` compaction is
          // set, disallow trivial move
          if (level == max_overlapped_level &&
              (options.bottommost_level_compaction ==
                   BottommostLevelCompaction::kForce ||
               options.bottommost_level_compaction ==
                   BottommostLevelCompaction::kForceOptimized)) {
            disallow_trivial_move = true;
          }
        }
        s = RunManualCompaction(cfd, level, output_level, options, begin, end,
                                exclusive, disallow_trivial_move,
                                max_file_num_to_ignore);
        if (!s.ok()) {
          break;
        }
        if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
          final_output_level = cfd->NumberLevels() - 1;
        } else if (output_level > final_output_level) {
          final_output_level = output_level;
        }
        TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1");
        TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2");
      }
    }
  }
  if (!s.ok() || final_output_level == kInvalidLevel) {
    LogFlush(immutable_db_options_.info_log);
    return s;
  }

  if (options.change_level) {
    TEST_SYNC_POINT("DBImpl::CompactRange:BeforeRefit:1");
    TEST_SYNC_POINT("DBImpl::CompactRange:BeforeRefit:2");

    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[RefitLevel] waiting for background threads to stop");
    DisableManualCompaction();
    s = PauseBackgroundWork();
    if (s.ok()) {
      TEST_SYNC_POINT("DBImpl::CompactRange:PreRefitLevel");
      s = ReFitLevel(cfd, final_output_level, options.target_level);
      TEST_SYNC_POINT("DBImpl::CompactRange:PostRefitLevel");
      // ContinueBackgroundWork always returns Status::OK().
      Status temp_s = ContinueBackgroundWork();
      assert(temp_s.ok());
    }
    EnableManualCompaction();
  }
  LogFlush(immutable_db_options_.info_log);

  {
    InstrumentedMutexLock l(&mutex_);
    // an automatic compaction that has been scheduled might have been
    // preempted by the manual compactions. Need to schedule it back.
    MaybeScheduleFlushOrCompaction();
  }

  return s;
}

Status DBImpl::CompactFiles(const CompactionOptions& compact_options,
                            ColumnFamilyHandle* column_family,
                            const std::vector<std::string>& input_file_names,
                            const int output_level, const int output_path_id,
                            std::vector<std::string>* const output_file_names,
                            CompactionJobInfo* compaction_job_info) {
#ifdef ROCKSDB_LITE
  (void)compact_options;
  (void)column_family;
  (void)input_file_names;
  (void)output_level;
  (void)output_path_id;
  (void)output_file_names;
  (void)compaction_job_info;
  // not supported in lite version
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  if (column_family == nullptr) {
    return Status::InvalidArgument("ColumnFamilyHandle must be non-null.");
  }

  auto cfd =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
  assert(cfd);

  Status s;
  JobContext job_context(next_job_id_.fetch_add(1), true);
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());

  // Perform CompactFiles
  TEST_SYNC_POINT("TestCompactFiles::IngestExternalFile2");
  {
    InstrumentedMutexLock l(&mutex_);

    // This call will unlock/lock the mutex to wait for current running
    // IngestExternalFile() calls to finish.
    WaitForIngestFile();

    // We need to get current after `WaitForIngestFile`, because
    // `IngestExternalFile` may add files that overlap with `input_file_names`
    auto* current = cfd->current();
    current->Ref();

    s = CompactFilesImpl(compact_options, cfd, current, input_file_names,
                         output_file_names, output_level, output_path_id,
                         &job_context, &log_buffer, compaction_job_info);

    current->Unref();
  }

  // Find and delete obsolete files
  {
    InstrumentedMutexLock l(&mutex_);
    // If !s.ok(), this means that Compaction failed. In that case, we want
    // to delete all obsolete files we might have created and we force
    // FindObsoleteFiles(). This is because job_context does not
    // catch all created files if compaction failed.
    FindObsoleteFiles(&job_context, !s.ok());
  }  // release the mutex

  // delete unnecessary files if any, this is done outside the mutex
  if (job_context.HaveSomethingToClean() ||
      job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
    // Have to flush the info logs before bg_compaction_scheduled_--
    // because if bg_flush_scheduled_ becomes 0 and the lock is
    // released, the destructor of DB can kick in and destroy all the
    // states of DB so info_log might not be available after that point.
    // It also applies to access other states that DB owns.
    log_buffer.FlushBufferToLog();
    if (job_context.HaveSomethingToDelete()) {
      // no mutex is locked here. No need to Unlock() and Lock() here.
      PurgeObsoleteFiles(job_context);
    }
    job_context.Clean();
  }

  return s;
#endif  // ROCKSDB_LITE
}
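
// Example (sketch of a typical external caller; `cfh` and `input_names` are
// hypothetical): gather live SST file names via GetColumnFamilyMetaData()
// and merge them into level 1:
//   CompactionOptions copts;
//   std::vector<std::string> outputs;
//   Status s = db->CompactFiles(copts, cfh, input_names, /*output_level=*/1,
//                               /*output_path_id=*/-1, &outputs);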

#ifndef ROCKSDB_LITE
Status DBImpl::CompactFilesImpl(
    const CompactionOptions& compact_options, ColumnFamilyData* cfd,
    Version* version, const std::vector<std::string>& input_file_names,
    std::vector<std::string>* const output_file_names, const int output_level,
    int output_path_id, JobContext* job_context, LogBuffer* log_buffer,
    CompactionJobInfo* compaction_job_info) {
  mutex_.AssertHeld();

  if (shutting_down_.load(std::memory_order_acquire)) {
    return Status::ShutdownInProgress();
  }
  if (manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
    return Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  }

  std::unordered_set<uint64_t> input_set;
  for (const auto& file_name : input_file_names) {
    input_set.insert(TableFileNameToNumber(file_name));
  }

  ColumnFamilyMetaData cf_meta;
  // TODO(yhchiang): can directly use version here if none of the
  // following functions call is pluggable to external developers.
  version->GetColumnFamilyMetaData(&cf_meta);

  if (output_path_id < 0) {
    if (cfd->ioptions()->cf_paths.size() == 1U) {
      output_path_id = 0;
    } else {
      return Status::NotSupported(
          "Automatic output path selection is not "
          "yet supported in CompactFiles()");
    }
  }

  Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles(
      &input_set, cf_meta, output_level);
  if (!s.ok()) {
    return s;
  }

  std::vector<CompactionInputFiles> input_files;
  s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers(
      &input_files, &input_set, version->storage_info(), compact_options);
  if (!s.ok()) {
    return s;
  }

  for (const auto& inputs : input_files) {
    if (cfd->compaction_picker()->AreFilesInCompaction(inputs.files)) {
      return Status::Aborted(
          "Some of the necessary compaction input "
          "files are already being compacted");
    }
  }
  bool sfm_reserved_compact_space = false;
  // First check if we have enough room to do the compaction
  bool enough_room = EnoughRoomForCompaction(
      cfd, input_files, &sfm_reserved_compact_space, log_buffer);

  if (!enough_room) {
    // m's vars will get set properly at the end of this function,
    // as long as status == CompactionTooLarge
    return Status::CompactionTooLarge();
  }

  // At this point, CompactFiles will be run.
  bg_compaction_scheduled_++;

  std::unique_ptr<Compaction> c;
  assert(cfd->compaction_picker());
  c.reset(cfd->compaction_picker()->CompactFiles(
      compact_options, input_files, output_level, version->storage_info(),
      *cfd->GetLatestMutableCFOptions(), mutable_db_options_, output_path_id));
  // we already sanitized the set of input files and checked for conflicts
  // without releasing the lock, so we're guaranteed a compaction can be formed.
  assert(c != nullptr);

  c->SetInputVersion(version);
  // deletion compaction currently not allowed in CompactFiles.
  assert(!c->deletion_compaction());

  std::vector<SequenceNumber> snapshot_seqs;
  SequenceNumber earliest_write_conflict_snapshot;
  SnapshotChecker* snapshot_checker;
  GetSnapshotContext(job_context, &snapshot_seqs,
                     &earliest_write_conflict_snapshot, &snapshot_checker);

  std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
      new std::list<uint64_t>::iterator(
          CaptureCurrentFileNumberInPendingOutputs()));

  assert(is_snapshot_supported_ || snapshots_.empty());
  CompactionJobStats compaction_job_stats;
  CompactionJob compaction_job(
      job_context->job_id, c.get(), immutable_db_options_, mutable_db_options_,
      file_options_for_compaction_, versions_.get(), &shutting_down_,
      preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
      GetDataDir(c->column_family_data(), c->output_path_id()),
      GetDataDir(c->column_family_data(), 0), stats_, &mutex_, &error_handler_,
      snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
      table_cache_, &event_logger_,
      c->mutable_cf_options()->paranoid_file_checks,
      c->mutable_cf_options()->report_bg_io_stats, dbname_,
      &compaction_job_stats, Env::Priority::USER, io_tracer_,
      &manual_compaction_paused_, db_id_, db_session_id_,
      c->column_family_data()->GetFullHistoryTsLow());

  // Creating a compaction influences the compaction score because the score
  // takes running compactions into account (by skipping files that are already
  // being compacted). Since we just changed compaction score, we recalculate it
  // here.
  version->storage_info()->ComputeCompactionScore(*cfd->ioptions(),
                                                  *c->mutable_cf_options());

  compaction_job.Prepare();

  mutex_.Unlock();
  TEST_SYNC_POINT("CompactFilesImpl:0");
  TEST_SYNC_POINT("CompactFilesImpl:1");
  // Ignore the status here, as it will be checked in the Install down below...
  compaction_job.Run().PermitUncheckedError();
  TEST_SYNC_POINT("CompactFilesImpl:2");
  TEST_SYNC_POINT("CompactFilesImpl:3");
  mutex_.Lock();

  Status status = compaction_job.Install(*c->mutable_cf_options());
  if (status.ok()) {
    assert(compaction_job.io_status().ok());
    InstallSuperVersionAndScheduleWork(c->column_family_data(),
                                       &job_context->superversion_contexts[0],
                                       *c->mutable_cf_options());
  }
  // status above captures any error during compaction_job.Install, so it's ok
  // not to check compaction_job.io_status() explicitly if we're not calling
  // SetBGError
  compaction_job.io_status().PermitUncheckedError();
  c->ReleaseCompactionFiles(s);
#ifndef ROCKSDB_LITE
  // Need to make sure SstFileManager does its bookkeeping
  auto sfm = static_cast<SstFileManagerImpl*>(
      immutable_db_options_.sst_file_manager.get());
  if (sfm && sfm_reserved_compact_space) {
    sfm->OnCompactionCompletion(c.get());
  }
#endif  // ROCKSDB_LITE

  ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

  if (compaction_job_info != nullptr) {
    BuildCompactionJobInfo(cfd, c.get(), s, compaction_job_stats,
                           job_context->job_id, version, compaction_job_info);
  }

  if (status.ok()) {
    // Done
  } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) {
    // Ignore compaction errors found during shutting down
  } else if (status.IsManualCompactionPaused()) {
    // Don't report stopping manual compaction as error
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[%s] [JOB %d] Stopping manual compaction",
                   c->column_family_data()->GetName().c_str(),
                   job_context->job_id);
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "[%s] [JOB %d] Compaction error: %s",
                   c->column_family_data()->GetName().c_str(),
                   job_context->job_id, status.ToString().c_str());
    IOStatus io_s = compaction_job.io_status();
    if (!io_s.ok()) {
      error_handler_.SetBGError(io_s, BackgroundErrorReason::kCompaction);
    } else {
      error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
    }
  }

  if (output_file_names != nullptr) {
    for (const auto& newf : c->edit()->GetNewFiles()) {
      (*output_file_names)
          .push_back(TableFileName(c->immutable_cf_options()->cf_paths,
                                   newf.second.fd.GetNumber(),
                                   newf.second.fd.GetPathId()));
    }
  }

  c.reset();

  bg_compaction_scheduled_--;
  if (bg_compaction_scheduled_ == 0) {
    bg_cv_.SignalAll();
  }
  MaybeScheduleFlushOrCompaction();
  TEST_SYNC_POINT("CompactFilesImpl:End");

  return status;
}
#endif  // ROCKSDB_LITE

Status DBImpl::PauseBackgroundWork() {
  InstrumentedMutexLock guard_lock(&mutex_);
  bg_compaction_paused_++;
  while (bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0 ||
         bg_flush_scheduled_ > 0) {
    bg_cv_.Wait();
  }
  bg_work_paused_++;
  return Status::OK();
}

Status DBImpl::ContinueBackgroundWork() {
  InstrumentedMutexLock guard_lock(&mutex_);
  if (bg_work_paused_ == 0) {
    return Status::InvalidArgument();
  }
  assert(bg_work_paused_ > 0);
  assert(bg_compaction_paused_ > 0);
  bg_compaction_paused_--;
  bg_work_paused_--;
  // It's sufficient to check just bg_work_paused_ here since
  // bg_work_paused_ is always no greater than bg_compaction_paused_
  if (bg_work_paused_ == 0) {
    MaybeScheduleFlushOrCompaction();
  }
  return Status::OK();
}
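
// Example (informal): the two calls above are meant to be paired, as
// CompactRangeInternal() does around ReFitLevel():
//   s = PauseBackgroundWork();     // waits until no bg flush/compaction runs
//   ... work that requires quiesced background threads ...
//   s = ContinueBackgroundWork();  // reschedules any pending work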

void DBImpl::NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
                                     const Status& st,
                                     const CompactionJobStats& job_stats,
                                     int job_id) {
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.empty()) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  if (c->is_manual_compaction() &&
      manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
    return;
  }
  Version* current = cfd->current();
  current->Ref();
  // release lock while notifying events
  mutex_.Unlock();
  TEST_SYNC_POINT("DBImpl::NotifyOnCompactionBegin::UnlockMutex");
  {
    CompactionJobInfo info{};
    BuildCompactionJobInfo(cfd, c, st, job_stats, job_id, current, &info);
    for (auto listener : immutable_db_options_.listeners) {
      listener->OnCompactionBegin(this, info);
    }
    info.status.PermitUncheckedError();
  }
  mutex_.Lock();
  current->Unref();
#else
  (void)cfd;
  (void)c;
  (void)st;
  (void)job_stats;
  (void)job_id;
#endif  // ROCKSDB_LITE
}

void DBImpl::NotifyOnCompactionCompleted(
    ColumnFamilyData* cfd, Compaction* c, const Status& st,
    const CompactionJobStats& compaction_job_stats, const int job_id) {
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  if (c->is_manual_compaction() &&
      manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
    return;
  }
  Version* current = cfd->current();
  current->Ref();
  // release lock while notifying events
  mutex_.Unlock();
  TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex");
  {
    CompactionJobInfo info{};
    BuildCompactionJobInfo(cfd, c, st, compaction_job_stats, job_id, current,
                           &info);
    for (auto listener : immutable_db_options_.listeners) {
      listener->OnCompactionCompleted(this, info);
    }
  }
  mutex_.Lock();
  current->Unref();
  // no need to signal bg_cv_ as it will be signaled at the end of the
  // compaction process.
#else
  (void)cfd;
  (void)c;
  (void)st;
  (void)compaction_job_stats;
  (void)job_id;
#endif  // ROCKSDB_LITE
}

// REQUIREMENT: block all background work by calling PauseBackgroundWork()
// before calling this function
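//
// Example (informal): for a column family whose files all sit in level 6,
// ReFitLevel(cfd, 6, 1) rewrites the version so the same files are recorded
// in level 1 (levels between source and target must be empty); with
// target_level < 0 the destination is chosen by
// FindMinimumEmptyLevelFitting().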
Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
  assert(level < cfd->NumberLevels());
  if (target_level >= cfd->NumberLevels()) {
    return Status::InvalidArgument("Target level exceeds number of levels");
  }

  SuperVersionContext sv_context(/* create_superversion */ true);

  InstrumentedMutexLock guard_lock(&mutex_);

  // only allow one thread refitting
  if (refitting_level_) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[ReFitLevel] another thread is refitting");
    return Status::NotSupported("another thread is refitting");
  }
  refitting_level_ = true;

  const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
  // move to a smaller level
  int to_level = target_level;
  if (target_level < 0) {
    to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level);
  }

  auto* vstorage = cfd->current()->storage_info();
  if (to_level != level) {
    if (to_level > level) {
      if (level == 0) {
        refitting_level_ = false;
        return Status::NotSupported(
            "Cannot change from level 0 to other levels.");
      }
      // Check levels are empty for a trivial move
      for (int l = level + 1; l <= to_level; l++) {
        if (vstorage->NumLevelFiles(l) > 0) {
          refitting_level_ = false;
          return Status::NotSupported(
              "Levels between source and target are not empty for a move.");
        }
      }
    } else {
      // to_level < level
      // Check levels are empty for a trivial move
      for (int l = to_level; l < level; l++) {
        if (vstorage->NumLevelFiles(l) > 0) {
          refitting_level_ = false;
          return Status::NotSupported(
              "Levels between source and target are not empty for a move.");
        }
      }
    }
    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                    "[%s] Before refitting:\n%s", cfd->GetName().c_str(),
                    cfd->current()->DebugString().data());

    VersionEdit edit;
    edit.SetColumnFamily(cfd->GetID());
    for (const auto& f : vstorage->LevelFiles(level)) {
      edit.DeleteFile(level, f->fd.GetNumber());
      edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
                   f->fd.GetFileSize(), f->smallest, f->largest,
                   f->fd.smallest_seqno, f->fd.largest_seqno,
                   f->marked_for_compaction, f->oldest_blob_file_number,
                   f->oldest_ancester_time, f->file_creation_time,
                   f->file_checksum, f->file_checksum_func_name);
    }
    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                    "[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
                    edit.DebugString().data());

    Status status = versions_->LogAndApply(cfd, mutable_cf_options, &edit,
                                           &mutex_, directories_.GetDbDir());

    InstallSuperVersionAndScheduleWork(cfd, &sv_context, mutable_cf_options);

    ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n",
                    cfd->GetName().c_str(), status.ToString().data());

    if (status.ok()) {
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                      "[%s] After refitting:\n%s", cfd->GetName().c_str(),
                      cfd->current()->DebugString().data());
    }
    sv_context.Clean();
    refitting_level_ = false;

    return status;
  }

  refitting_level_ = false;
  return Status::OK();
}
1554
1555 int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) {
1556 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
1557 return cfh->cfd()->NumberLevels();
1558 }
1559
1560 int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* /*column_family*/) {
1561 return 0;
1562 }
1563
1564 int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
1565 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
1566 InstrumentedMutexLock l(&mutex_);
1567 return cfh->cfd()
1568 ->GetSuperVersion()
1569 ->mutable_cf_options.level0_stop_writes_trigger;
1570 }
1571
1572 Status DBImpl::Flush(const FlushOptions& flush_options,
1573 ColumnFamilyHandle* column_family) {
1574 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
1575 ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.",
1576 cfh->GetName().c_str());
1577 Status s;
1578 if (immutable_db_options_.atomic_flush) {
1579 s = AtomicFlushMemTables({cfh->cfd()}, flush_options,
1580 FlushReason::kManualFlush);
1581 } else {
1582 s = FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush);
1583 }
1584
1585 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1586 "[%s] Manual flush finished, status: %s\n",
1587 cfh->GetName().c_str(), s.ToString().c_str());
1588 return s;
1589 }
1590
1591 Status DBImpl::Flush(const FlushOptions& flush_options,
1592 const std::vector<ColumnFamilyHandle*>& column_families) {
1593 Status s;
1594 if (!immutable_db_options_.atomic_flush) {
1595 for (auto cfh : column_families) {
1596 s = Flush(flush_options, cfh);
1597 if (!s.ok()) {
1598 break;
1599 }
1600 }
1601 } else {
1602 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1603 "Manual atomic flush start.\n"
1604 "=====Column families:=====");
1605 for (auto cfh : column_families) {
1606 auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
1607 ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
1608 cfhi->GetName().c_str());
1609 }
1610 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1611 "=====End of column families list=====");
1612 autovector<ColumnFamilyData*> cfds;
1613 std::for_each(column_families.begin(), column_families.end(),
1614 [&cfds](ColumnFamilyHandle* elem) {
1615 auto cfh = static_cast<ColumnFamilyHandleImpl*>(elem);
1616 cfds.emplace_back(cfh->cfd());
1617 });
1618 s = AtomicFlushMemTables(cfds, flush_options, FlushReason::kManualFlush);
1619 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1620 "Manual atomic flush finished, status: %s\n"
1621 "=====Column families:=====",
1622 s.ToString().c_str());
1623 for (auto cfh : column_families) {
1624 auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
1625 ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
1626 cfhi->GetName().c_str());
1627 }
1628 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1629 "=====End of column families list=====");
1630 }
1631 return s;
1632 }
1633
1634 Status DBImpl::RunManualCompaction(
1635 ColumnFamilyData* cfd, int input_level, int output_level,
1636 const CompactRangeOptions& compact_range_options, const Slice* begin,
1637 const Slice* end, bool exclusive, bool disallow_trivial_move,
1638 uint64_t max_file_num_to_ignore) {
1639 assert(input_level == ColumnFamilyData::kCompactAllLevels ||
1640 input_level >= 0);
1641
1642 InternalKey begin_storage, end_storage;
1643 CompactionArg* ca;
1644
1645 bool scheduled = false;
1646 bool manual_conflict = false;
1647 ManualCompactionState manual;
1648 manual.cfd = cfd;
1649 manual.input_level = input_level;
1650 manual.output_level = output_level;
1651 manual.output_path_id = compact_range_options.target_path_id;
1652 manual.done = false;
1653 manual.in_progress = false;
1654 manual.incomplete = false;
1655 manual.exclusive = exclusive;
1656 manual.disallow_trivial_move = disallow_trivial_move;
1657 // For universal compaction, we enforce every manual compaction to compact
1658 // all files.
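// For example (illustrative): on a universal-compaction column family, a
// CompactRange(begin, end) request behaves like CompactRange(nullptr,
// nullptr); both bounds are cleared below.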
1659 if (begin == nullptr ||
1660 cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
1661 cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
1662 manual.begin = nullptr;
1663 } else {
1664 begin_storage.SetMinPossibleForUserKey(*begin);
1665 manual.begin = &begin_storage;
1666 }
1667 if (end == nullptr ||
1668 cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
1669 cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
1670 manual.end = nullptr;
1671 } else {
1672 end_storage.SetMaxPossibleForUserKey(*end);
1673 manual.end = &end_storage;
1674 }
1675
1676 TEST_SYNC_POINT("DBImpl::RunManualCompaction:0");
1677 TEST_SYNC_POINT("DBImpl::RunManualCompaction:1");
1678 InstrumentedMutexLock l(&mutex_);
1679
1680 // When a manual compaction arrives, temporarily disable scheduling of
1681 // non-manual compactions and wait until the number of scheduled compaction
1682 // jobs drops to zero. This is needed to ensure that this manual compaction
1683 // can compact any range of keys/files.
1684 //
1685 // HasPendingManualCompaction() is true when at least one thread is inside
1686 // RunManualCompaction(), i.e. during that time no other compaction will
1687 // get scheduled (see MaybeScheduleFlushOrCompaction).
1688 //
1689   // Note that the following loop doesn't stop more than one thread calling
1690 // RunManualCompaction() from getting to the second while loop below.
1691 // However, only one of them will actually schedule compaction, while
1692 // others will wait on a condition variable until it completes.
1693
1694 AddManualCompaction(&manual);
1695 TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_);
1696 if (exclusive) {
1697 while (bg_bottom_compaction_scheduled_ > 0 ||
1698 bg_compaction_scheduled_ > 0) {
1699 TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled");
1700 ROCKS_LOG_INFO(
1701 immutable_db_options_.info_log,
1702 "[%s] Manual compaction waiting for all other scheduled background "
1703 "compactions to finish",
1704 cfd->GetName().c_str());
1705 bg_cv_.Wait();
1706 }
1707 }
1708
1709 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1710 "[%s] Manual compaction starting", cfd->GetName().c_str());
1711
1712 LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
1713 immutable_db_options_.info_log.get());
1714 // We don't check bg_error_ here, because if we get the error in compaction,
1715 // the compaction will set manual.status to bg_error_ and set manual.done to
1716 // true.
1717 while (!manual.done) {
1718 assert(HasPendingManualCompaction());
1719 manual_conflict = false;
1720 Compaction* compaction = nullptr;
1721 if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) ||
1722 scheduled ||
1723 (((manual.manual_end = &manual.tmp_storage1) != nullptr) &&
1724 ((compaction = manual.cfd->CompactRange(
1725 *manual.cfd->GetLatestMutableCFOptions(), mutable_db_options_,
1726 manual.input_level, manual.output_level, compact_range_options,
1727 manual.begin, manual.end, &manual.manual_end, &manual_conflict,
1728 max_file_num_to_ignore)) == nullptr &&
1729 manual_conflict))) {
1730 // exclusive manual compactions should not see a conflict during
1731 // CompactRange
1732 assert(!exclusive || !manual_conflict);
1733 // Running either this or some other manual compaction
1734 bg_cv_.Wait();
1735 if (scheduled && manual.incomplete == true) {
1736 assert(!manual.in_progress);
1737 scheduled = false;
1738 manual.incomplete = false;
1739 }
1740 } else if (!scheduled) {
1741 if (compaction == nullptr) {
1742 manual.done = true;
1743 bg_cv_.SignalAll();
1744 continue;
1745 }
1746 ca = new CompactionArg;
1747 ca->db = this;
1748 ca->compaction_pri_ = Env::Priority::LOW;
1749 ca->prepicked_compaction = new PrepickedCompaction;
1750 ca->prepicked_compaction->manual_compaction_state = &manual;
1751 ca->prepicked_compaction->compaction = compaction;
1752 if (!RequestCompactionToken(
1753 cfd, true, &ca->prepicked_compaction->task_token, &log_buffer)) {
1754 // Don't throttle manual compaction, only count outstanding tasks.
1755 assert(false);
1756 }
1757 manual.incomplete = false;
1758 bg_compaction_scheduled_++;
1759 Env::Priority thread_pool_pri = Env::Priority::LOW;
1760 if (compaction->bottommost_level() &&
1761 env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
1762 thread_pool_pri = Env::Priority::BOTTOM;
1763 }
1764 env_->Schedule(&DBImpl::BGWorkCompaction, ca, thread_pool_pri, this,
1765 &DBImpl::UnscheduleCompactionCallback);
1766 scheduled = true;
1767 }
1768 }
1769
1770 log_buffer.FlushBufferToLog();
1771 assert(!manual.in_progress);
1772 assert(HasPendingManualCompaction());
1773 RemoveManualCompaction(&manual);
1774 bg_cv_.SignalAll();
1775 return manual.status;
1776 }
1777
1778 void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
1779 FlushRequest* req) {
1780 assert(req != nullptr);
1781 req->reserve(cfds.size());
1782 for (const auto cfd : cfds) {
1783 if (nullptr == cfd) {
1784 // cfd may be null, see DBImpl::ScheduleFlushes
1785 continue;
1786 }
1787 uint64_t max_memtable_id = cfd->imm()->GetLatestMemTableID();
1788 req->emplace_back(cfd, max_memtable_id);
1789 }
1790 }
1791
1792 Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
1793 const FlushOptions& flush_options,
1794 FlushReason flush_reason, bool writes_stopped) {
1795 // This method should not be called if atomic_flush is true.
1796 assert(!immutable_db_options_.atomic_flush);
1797 Status s;
1798 if (!flush_options.allow_write_stall) {
1799 bool flush_needed = true;
1800 s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
1801 TEST_SYNC_POINT("DBImpl::FlushMemTable:StallWaitDone");
1802 if (!s.ok() || !flush_needed) {
1803 return s;
1804 }
1805 }
1806
1807 autovector<FlushRequest> flush_reqs;
1808 autovector<uint64_t> memtable_ids_to_wait;
1809 {
1810 WriteContext context;
1811 InstrumentedMutexLock guard_lock(&mutex_);
1812
1813 WriteThread::Writer w;
1814 WriteThread::Writer nonmem_w;
1815 if (!writes_stopped) {
1816 write_thread_.EnterUnbatched(&w, &mutex_);
1817 if (two_write_queues_) {
1818 nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
1819 }
1820 }
1821 WaitForPendingWrites();
1822
1823 if (flush_reason != FlushReason::kErrorRecoveryRetryFlush &&
1824 (!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load())) {
1825 // Note that, when flush reason is kErrorRecoveryRetryFlush, during the
1826 // auto retry resume, we want to avoid creating new small memtables.
1827 // Therefore, SwitchMemtable will not be called. Also, since ResumeImpl
1828 // will iterate through all the CFs and call FlushMemtable during auto
1829       // retry resume, it is possible that in some CFs
1830       // cfd->imm()->NumNotFlushed() == 0. In that case, no flush request will
1831       // be created or scheduled, and Status::OK() will be returned.
1832 s = SwitchMemtable(cfd, &context);
1833 }
1834 const uint64_t flush_memtable_id = port::kMaxUint64;
1835 if (s.ok()) {
1836 if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
1837 !cached_recoverable_state_empty_.load()) {
1838 FlushRequest req{{cfd, flush_memtable_id}};
1839 flush_reqs.emplace_back(std::move(req));
1840 memtable_ids_to_wait.emplace_back(cfd->imm()->GetLatestMemTableID());
1841 }
1842 if (immutable_db_options_.persist_stats_to_disk &&
1843 flush_reason != FlushReason::kErrorRecoveryRetryFlush) {
1844 ColumnFamilyData* cfd_stats =
1845 versions_->GetColumnFamilySet()->GetColumnFamily(
1846 kPersistentStatsColumnFamilyName);
1847 if (cfd_stats != nullptr && cfd_stats != cfd &&
1848 !cfd_stats->mem()->IsEmpty()) {
1849 // only force flush stats CF when it will be the only CF lagging
1850 // behind after the current flush
1851 bool stats_cf_flush_needed = true;
1852 for (auto* loop_cfd : *versions_->GetColumnFamilySet()) {
1853 if (loop_cfd == cfd_stats || loop_cfd == cfd) {
1854 continue;
1855 }
1856 if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) {
1857 stats_cf_flush_needed = false;
1858 }
1859 }
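// Illustrative example: with column families {default, stats, foo}, a
// manual flush of `default` force-flushes the stats CF only if `foo`'s log
// number already exceeds the stats CF's, i.e. the stats CF would otherwise
// be the only CF still pinning the old WAL after this flush.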
1860 if (stats_cf_flush_needed) {
1861 ROCKS_LOG_INFO(immutable_db_options_.info_log,
1862 "Force flushing stats CF with manual flush of %s "
1863 "to avoid holding old logs",
1864 cfd->GetName().c_str());
1865 s = SwitchMemtable(cfd_stats, &context);
1866 FlushRequest req{{cfd_stats, flush_memtable_id}};
1867 flush_reqs.emplace_back(std::move(req));
1868           memtable_ids_to_wait.emplace_back(
1869               cfd_stats->imm()->GetLatestMemTableID());
1870 }
1871 }
1872 }
1873 }
1874
1875 if (s.ok() && !flush_reqs.empty()) {
1876 for (const auto& req : flush_reqs) {
1877 assert(req.size() == 1);
1878 ColumnFamilyData* loop_cfd = req[0].first;
1879 loop_cfd->imm()->FlushRequested();
1880 }
1881 // If the caller wants to wait for this flush to complete, it indicates
1882       // that the caller expects the ColumnFamilyData not to be freed by
1883 // other threads which may drop the column family concurrently.
1884 // Therefore, we increase the cfd's ref count.
1885 if (flush_options.wait) {
1886 for (const auto& req : flush_reqs) {
1887 assert(req.size() == 1);
1888 ColumnFamilyData* loop_cfd = req[0].first;
1889 loop_cfd->Ref();
1890 }
1891 }
1892 for (const auto& req : flush_reqs) {
1893 SchedulePendingFlush(req, flush_reason);
1894 }
1895 MaybeScheduleFlushOrCompaction();
1896 }
1897
1898 if (!writes_stopped) {
1899 write_thread_.ExitUnbatched(&w);
1900 if (two_write_queues_) {
1901 nonmem_write_thread_.ExitUnbatched(&nonmem_w);
1902 }
1903 }
1904 }
1905 TEST_SYNC_POINT("DBImpl::FlushMemTable:AfterScheduleFlush");
1906 TEST_SYNC_POINT("DBImpl::FlushMemTable:BeforeWaitForBgFlush");
1907 if (s.ok() && flush_options.wait) {
1908 autovector<ColumnFamilyData*> cfds;
1909 autovector<const uint64_t*> flush_memtable_ids;
1910 assert(flush_reqs.size() == memtable_ids_to_wait.size());
1911 for (size_t i = 0; i < flush_reqs.size(); ++i) {
1912 assert(flush_reqs[i].size() == 1);
1913 cfds.push_back(flush_reqs[i][0].first);
1914 flush_memtable_ids.push_back(&(memtable_ids_to_wait[i]));
1915 }
1916 s = WaitForFlushMemTables(
1917 cfds, flush_memtable_ids,
1918 (flush_reason == FlushReason::kErrorRecovery ||
1919 flush_reason == FlushReason::kErrorRecoveryRetryFlush));
1920 InstrumentedMutexLock lock_guard(&mutex_);
1921 for (auto* tmp_cfd : cfds) {
1922 tmp_cfd->UnrefAndTryDelete();
1923 }
1924 }
1925 TEST_SYNC_POINT("DBImpl::FlushMemTable:FlushMemTableFinished");
1926 return s;
1927 }
1928
1929 // Flush all elements in 'column_family_datas'
1930 // and atomically record the result to the MANIFEST.
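// A minimal sketch of the caller side (illustrative): DBImpl::Flush() with
// atomic_flush enabled gathers the cfds and calls
//
//   s = AtomicFlushMemTables(cfds, flush_options, FlushReason::kManualFlush);
//
// exactly as the manual flush paths above do.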
1931 Status DBImpl::AtomicFlushMemTables(
1932 const autovector<ColumnFamilyData*>& column_family_datas,
1933 const FlushOptions& flush_options, FlushReason flush_reason,
1934 bool writes_stopped) {
1935 Status s;
1936 if (!flush_options.allow_write_stall) {
1937 int num_cfs_to_flush = 0;
1938 for (auto cfd : column_family_datas) {
1939 bool flush_needed = true;
1940 s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
1941 if (!s.ok()) {
1942 return s;
1943 } else if (flush_needed) {
1944 ++num_cfs_to_flush;
1945 }
1946 }
1947 if (0 == num_cfs_to_flush) {
1948 return s;
1949 }
1950 }
1951 FlushRequest flush_req;
1952 autovector<ColumnFamilyData*> cfds;
1953 {
1954 WriteContext context;
1955 InstrumentedMutexLock guard_lock(&mutex_);
1956
1957 WriteThread::Writer w;
1958 WriteThread::Writer nonmem_w;
1959 if (!writes_stopped) {
1960 write_thread_.EnterUnbatched(&w, &mutex_);
1961 if (two_write_queues_) {
1962 nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
1963 }
1964 }
1965 WaitForPendingWrites();
1966
1967 for (auto cfd : column_family_datas) {
1968 if (cfd->IsDropped()) {
1969 continue;
1970 }
1971 if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
1972 !cached_recoverable_state_empty_.load()) {
1973 cfds.emplace_back(cfd);
1974 }
1975 }
1976 for (auto cfd : cfds) {
1977 if ((cfd->mem()->IsEmpty() && cached_recoverable_state_empty_.load()) ||
1978 flush_reason == FlushReason::kErrorRecoveryRetryFlush) {
1979 continue;
1980 }
1981 cfd->Ref();
1982 s = SwitchMemtable(cfd, &context);
1983 cfd->UnrefAndTryDelete();
1984 if (!s.ok()) {
1985 break;
1986 }
1987 }
1988 if (s.ok()) {
1989 AssignAtomicFlushSeq(cfds);
1990 for (auto cfd : cfds) {
1991 cfd->imm()->FlushRequested();
1992 }
1993 // If the caller wants to wait for this flush to complete, it indicates
1994       // that the caller expects the ColumnFamilyData not to be freed by
1995 // other threads which may drop the column family concurrently.
1996 // Therefore, we increase the cfd's ref count.
1997 if (flush_options.wait) {
1998 for (auto cfd : cfds) {
1999 cfd->Ref();
2000 }
2001 }
2002 GenerateFlushRequest(cfds, &flush_req);
2003 SchedulePendingFlush(flush_req, flush_reason);
2004 MaybeScheduleFlushOrCompaction();
2005 }
2006
2007 if (!writes_stopped) {
2008 write_thread_.ExitUnbatched(&w);
2009 if (two_write_queues_) {
2010 nonmem_write_thread_.ExitUnbatched(&nonmem_w);
2011 }
2012 }
2013 }
2014 TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:AfterScheduleFlush");
2015 TEST_SYNC_POINT("DBImpl::AtomicFlushMemTables:BeforeWaitForBgFlush");
2016 if (s.ok() && flush_options.wait) {
2017 autovector<const uint64_t*> flush_memtable_ids;
2018 for (auto& iter : flush_req) {
2019 flush_memtable_ids.push_back(&(iter.second));
2020 }
2021 s = WaitForFlushMemTables(
2022 cfds, flush_memtable_ids,
2023 (flush_reason == FlushReason::kErrorRecovery ||
2024 flush_reason == FlushReason::kErrorRecoveryRetryFlush));
2025 InstrumentedMutexLock lock_guard(&mutex_);
2026 for (auto* cfd : cfds) {
2027 cfd->UnrefAndTryDelete();
2028 }
2029 }
2030 return s;
2031 }
2032
2033 // Calling FlushMemTable(), whether from DB::Flush() or from Backup Engine,
2034 // can cause a write stall, for example if one memtable is already being
2035 // flushed. This method tries to avoid a write stall (similar to
2036 // CompactRange() behavior): it emulates how the SuperVersion / LSM would
2037 // change if the flush happened, checks that against various constraints, and
2038 // delays the flush if it would stall writes. The caller should check status
// and flush_needed to see if a flush already happened.
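// Illustrative example (hypothetical values, not defaults): with
// min_write_buffer_number_to_merge == 2 and
// level0_file_num_compaction_trigger == 4, a CF holding one immutable
// memtable and three L0 files is below both triggers, so the loop below
// breaks out immediately and the flush proceeds without waiting.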
2039 Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
2040 bool* flush_needed) {
2041 {
2042 *flush_needed = true;
2043 InstrumentedMutexLock l(&mutex_);
2044 uint64_t orig_active_memtable_id = cfd->mem()->GetID();
2045 WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
2046 do {
2047 if (write_stall_condition != WriteStallCondition::kNormal) {
2048 // Same error handling as user writes: Don't wait if there's a
2049 // background error, even if it's a soft error. We might wait here
2050 // indefinitely as the pending flushes/compactions may never finish
2051 // successfully, resulting in the stall condition lasting indefinitely
2052 if (error_handler_.IsBGWorkStopped()) {
2053 return error_handler_.GetBGError();
2054 }
2055
2056 TEST_SYNC_POINT("DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait");
2057 ROCKS_LOG_INFO(immutable_db_options_.info_log,
2058 "[%s] WaitUntilFlushWouldNotStallWrites"
2059 " waiting on stall conditions to clear",
2060 cfd->GetName().c_str());
2061 bg_cv_.Wait();
2062 }
2063 if (cfd->IsDropped()) {
2064 return Status::ColumnFamilyDropped();
2065 }
2066 if (shutting_down_.load(std::memory_order_acquire)) {
2067 return Status::ShutdownInProgress();
2068 }
2069
2070 uint64_t earliest_memtable_id =
2071 std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
2072 if (earliest_memtable_id > orig_active_memtable_id) {
2073 // We waited so long that the memtable we were originally waiting on was
2074 // flushed.
2075 *flush_needed = false;
2076 return Status::OK();
2077 }
2078
2079 const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
2080 const auto* vstorage = cfd->current()->storage_info();
2081
2082 // Skip stalling check if we're below auto-flush and auto-compaction
2083 // triggers. If it stalled in these conditions, that'd mean the stall
2084 // triggers are so low that stalling is needed for any background work. In
2085 // that case we shouldn't wait since background work won't be scheduled.
2086 if (cfd->imm()->NumNotFlushed() <
2087 cfd->ioptions()->min_write_buffer_number_to_merge &&
2088 vstorage->l0_delay_trigger_count() <
2089 mutable_cf_options.level0_file_num_compaction_trigger) {
2090 break;
2091 }
2092
2093 // check whether one extra immutable memtable or an extra L0 file would
2094 // cause write stalling mode to be entered. It could still enter stall
2095 // mode due to pending compaction bytes, but that's less common
2096 write_stall_condition = ColumnFamilyData::GetWriteStallConditionAndCause(
2097 cfd->imm()->NumNotFlushed() + 1,
2098 vstorage->l0_delay_trigger_count() + 1,
2099 vstorage->estimated_compaction_needed_bytes(),
2100 mutable_cf_options, *cfd->ioptions())
2101 .first;
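      // For example (illustrative): if l0_delay_trigger_count() + 1 reaches
      // level0_slowdown_writes_trigger, the condition computed above is
      // kDelayed, so the loop runs another iteration and waits on bg_cv_
      // before re-checking.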
2102 } while (write_stall_condition != WriteStallCondition::kNormal);
2103 }
2104 return Status::OK();
2105 }
2106
2107 // Wait for memtables to be flushed for multiple column families.
2108 // let N = cfds.size()
2109 // for i in [0, N),
2110 // 1) if flush_memtable_ids[i] is not null, then the memtables with lower IDs
2111 // have to be flushed for THIS column family;
2112 // 2) if flush_memtable_ids[i] is null, then all memtables in THIS column
2113 // family have to be flushed.
2114 // Finish waiting when ALL column families finish flushing memtables.
2115 // resuming_from_bg_err indicates whether the caller is trying to resume from
2116 // background error or in normal processing.
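// Illustrative example (hypothetical IDs): with cfds = {cf0, cf1} and
// flush_memtable_ids = {&id5, nullptr} where id5 == 5, waiting ends once
// cf0's immutable memtables with ID <= 5 are flushed (its earliest remaining
// ID exceeds 5) and cf1 has no unflushed immutable memtables, or once the
// involved column families are dropped.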
2117 Status DBImpl::WaitForFlushMemTables(
2118 const autovector<ColumnFamilyData*>& cfds,
2119 const autovector<const uint64_t*>& flush_memtable_ids,
2120 bool resuming_from_bg_err) {
2121 int num = static_cast<int>(cfds.size());
2122   // Wait until the flushes complete
2123 InstrumentedMutexLock l(&mutex_);
2124 // If the caller is trying to resume from bg error, then
2125 // error_handler_.IsDBStopped() is true.
2126 while (resuming_from_bg_err || !error_handler_.IsDBStopped()) {
2127 if (shutting_down_.load(std::memory_order_acquire)) {
2128 return Status::ShutdownInProgress();
2129 }
2130 // If an error has occurred during resumption, then no need to wait.
2131 if (!error_handler_.GetRecoveryError().ok()) {
2132 break;
2133 }
2134     // If BG work is stopped, there is a BG error and either 1) it is a soft
2135     // error that requires no BG work, or 2) we are not in auto_recovery_.
2136 if (!resuming_from_bg_err && error_handler_.IsBGWorkStopped() &&
2137 error_handler_.GetBGError().severity() < Status::Severity::kHardError) {
2138 return error_handler_.GetBGError();
2139 }
2140
2141 // Number of column families that have been dropped.
2142 int num_dropped = 0;
2143 // Number of column families that have finished flush.
2144 int num_finished = 0;
2145 for (int i = 0; i < num; ++i) {
2146 if (cfds[i]->IsDropped()) {
2147 ++num_dropped;
2148 } else if (cfds[i]->imm()->NumNotFlushed() == 0 ||
2149 (flush_memtable_ids[i] != nullptr &&
2150 cfds[i]->imm()->GetEarliestMemTableID() >
2151 *flush_memtable_ids[i])) {
2152 ++num_finished;
2153 }
2154 }
2155 if (1 == num_dropped && 1 == num) {
2156 return Status::ColumnFamilyDropped();
2157 }
2158     // Column families involved in this flush request have either been dropped
2159     // or finished flushing. It's time to stop waiting.
2160 if (num_dropped + num_finished == num) {
2161 break;
2162 }
2163 bg_cv_.Wait();
2164 }
2165 Status s;
2166 // If not resuming from bg error, and an error has caused the DB to stop,
2167 // then report the bg error to caller.
2168 if (!resuming_from_bg_err && error_handler_.IsDBStopped()) {
2169 s = error_handler_.GetBGError();
2170 }
2171 return s;
2172 }
2173
2174 Status DBImpl::EnableAutoCompaction(
2175 const std::vector<ColumnFamilyHandle*>& column_family_handles) {
2176 Status s;
2177 for (auto cf_ptr : column_family_handles) {
2178 Status status =
2179 this->SetOptions(cf_ptr, {{"disable_auto_compactions", "false"}});
2180 if (!status.ok()) {
2181 s = status;
2182 }
2183 }
2184
2185 return s;
2186 }
2187
2188 void DBImpl::DisableManualCompaction() {
2189 InstrumentedMutexLock l(&mutex_);
2190 manual_compaction_paused_.fetch_add(1, std::memory_order_release);
2191 // Wait for any pending manual compactions to finish (typically through
2192 // failing with `Status::Incomplete`) prior to returning. This way we are
2193 // guaranteed no pending manual compaction will commit while manual
2194 // compactions are "disabled".
2195 while (HasPendingManualCompaction()) {
2196 bg_cv_.Wait();
2197 }
2198 }
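// Typical pairing of DisableManualCompaction() / EnableManualCompaction()
// (illustrative sketch):
//
//   db->DisableManualCompaction();
//   // ... exclusive work; pending manual compactions fail with
//   // Status::Incomplete ...
//   db->EnableManualCompaction();
//
// Since manual_compaction_paused_ is a counter, these calls may nest.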
2199
2200 void DBImpl::EnableManualCompaction() {
2201 InstrumentedMutexLock l(&mutex_);
2202 assert(manual_compaction_paused_ > 0);
2203 manual_compaction_paused_.fetch_sub(1, std::memory_order_release);
2204 }
2205
2206 void DBImpl::MaybeScheduleFlushOrCompaction() {
2207 mutex_.AssertHeld();
2208 if (!opened_successfully_) {
2209     // Compaction may introduce a data race with DB open
2210 return;
2211 }
2212 if (bg_work_paused_ > 0) {
2213 // we paused the background work
2214 return;
2215 } else if (error_handler_.IsBGWorkStopped() &&
2216 !error_handler_.IsRecoveryInProgress()) {
2217 // There has been a hard error and this call is not part of the recovery
2218 // sequence. Bail out here so we don't get into an endless loop of
2219 // scheduling BG work which will again call this function
2220 return;
2221 } else if (shutting_down_.load(std::memory_order_acquire)) {
2222 // DB is being deleted; no more background compactions
2223 return;
2224 }
2225 auto bg_job_limits = GetBGJobLimits();
2226 bool is_flush_pool_empty =
2227 env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
2228 while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
2229 bg_flush_scheduled_ < bg_job_limits.max_flushes) {
2230 bg_flush_scheduled_++;
2231 FlushThreadArg* fta = new FlushThreadArg;
2232 fta->db_ = this;
2233 fta->thread_pri_ = Env::Priority::HIGH;
2234 env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::HIGH, this,
2235 &DBImpl::UnscheduleFlushCallback);
2236 --unscheduled_flushes_;
2237 TEST_SYNC_POINT_CALLBACK(
2238 "DBImpl::MaybeScheduleFlushOrCompaction:AfterSchedule:0",
2239 &unscheduled_flushes_);
2240 }
2241
2242 // special case -- if high-pri (flush) thread pool is empty, then schedule
2243 // flushes in low-pri (compaction) thread pool.
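  // (For instance, an Env configured with zero HIGH-priority threads, e.g.
  // via env->SetBackgroundThreads(0, Env::Priority::HIGH), lands here and
  // flushes share the LOW pool with compactions.)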
2244 if (is_flush_pool_empty) {
2245 while (unscheduled_flushes_ > 0 &&
2246 bg_flush_scheduled_ + bg_compaction_scheduled_ <
2247 bg_job_limits.max_flushes) {
2248 bg_flush_scheduled_++;
2249 FlushThreadArg* fta = new FlushThreadArg;
2250 fta->db_ = this;
2251 fta->thread_pri_ = Env::Priority::LOW;
2252 env_->Schedule(&DBImpl::BGWorkFlush, fta, Env::Priority::LOW, this,
2253 &DBImpl::UnscheduleFlushCallback);
2254 --unscheduled_flushes_;
2255 }
2256 }
2257
2258 if (bg_compaction_paused_ > 0) {
2259 // we paused the background compaction
2260 return;
2261 } else if (error_handler_.IsBGWorkStopped()) {
2262 // Compaction is not part of the recovery sequence from a hard error. We
2263 // might get here because recovery might do a flush and install a new
2264 // super version, which will try to schedule pending compactions. Bail
2265 // out here and let the higher level recovery handle compactions
2266 return;
2267 }
2268
2269 if (HasExclusiveManualCompaction()) {
2270 // only manual compactions are allowed to run. don't schedule automatic
2271 // compactions
2272 TEST_SYNC_POINT("DBImpl::MaybeScheduleFlushOrCompaction:Conflict");
2273 return;
2274 }
2275
2276 while (bg_compaction_scheduled_ < bg_job_limits.max_compactions &&
2277 unscheduled_compactions_ > 0) {
2278 CompactionArg* ca = new CompactionArg;
2279 ca->db = this;
2280 ca->compaction_pri_ = Env::Priority::LOW;
2281 ca->prepicked_compaction = nullptr;
2282 bg_compaction_scheduled_++;
2283 unscheduled_compactions_--;
2284 env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
2285 &DBImpl::UnscheduleCompactionCallback);
2286 }
2287 }
2288
2289 DBImpl::BGJobLimits DBImpl::GetBGJobLimits() const {
2290 mutex_.AssertHeld();
2291 return GetBGJobLimits(mutable_db_options_.max_background_flushes,
2292 mutable_db_options_.max_background_compactions,
2293 mutable_db_options_.max_background_jobs,
2294 write_controller_.NeedSpeedupCompaction());
2295 }
2296
2297 DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes,
2298 int max_background_compactions,
2299 int max_background_jobs,
2300 bool parallelize_compactions) {
2301 BGJobLimits res;
2302 if (max_background_flushes == -1 && max_background_compactions == -1) {
2303     // for our first stab at implementing max_background_jobs, simply allocate a
2304 // quarter of the threads to flushes.
2305 res.max_flushes = std::max(1, max_background_jobs / 4);
2306 res.max_compactions = std::max(1, max_background_jobs - res.max_flushes);
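    // For example: max_background_jobs == 8 yields max_flushes == 2 and
    // max_compactions == 6, while max_background_jobs == 1 yields 1 and 1
    // (both are clamped to at least one job).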
2307 } else {
2308 // compatibility code in case users haven't migrated to max_background_jobs,
2309 // which automatically computes flush/compaction limits
2310 res.max_flushes = std::max(1, max_background_flushes);
2311 res.max_compactions = std::max(1, max_background_compactions);
2312 }
2313 if (!parallelize_compactions) {
2314     // throttle background compactions until we deem it necessary
2315 res.max_compactions = 1;
2316 }
2317 return res;
2318 }
2319
2320 void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) {
2321 assert(!cfd->queued_for_compaction());
2322 cfd->Ref();
2323 compaction_queue_.push_back(cfd);
2324 cfd->set_queued_for_compaction(true);
2325 }
2326
2327 ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() {
2328 assert(!compaction_queue_.empty());
2329 auto cfd = *compaction_queue_.begin();
2330 compaction_queue_.pop_front();
2331 assert(cfd->queued_for_compaction());
2332 cfd->set_queued_for_compaction(false);
2333 return cfd;
2334 }
2335
2336 DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() {
2337 assert(!flush_queue_.empty());
2338 FlushRequest flush_req = flush_queue_.front();
2339 flush_queue_.pop_front();
2340 if (!immutable_db_options_.atomic_flush) {
2341 assert(flush_req.size() == 1);
2342 }
2343 for (const auto& elem : flush_req) {
2344 if (!immutable_db_options_.atomic_flush) {
2345 ColumnFamilyData* cfd = elem.first;
2346 assert(cfd);
2347 assert(cfd->queued_for_flush());
2348 cfd->set_queued_for_flush(false);
2349 }
2350 }
2351 // TODO: need to unset flush reason?
2352 return flush_req;
2353 }
2354
2355 ColumnFamilyData* DBImpl::PickCompactionFromQueue(
2356 std::unique_ptr<TaskLimiterToken>* token, LogBuffer* log_buffer) {
2357 assert(!compaction_queue_.empty());
2358 assert(*token == nullptr);
2359 autovector<ColumnFamilyData*> throttled_candidates;
2360 ColumnFamilyData* cfd = nullptr;
2361 while (!compaction_queue_.empty()) {
2362 auto first_cfd = *compaction_queue_.begin();
2363 compaction_queue_.pop_front();
2364 assert(first_cfd->queued_for_compaction());
2365 if (!RequestCompactionToken(first_cfd, false, token, log_buffer)) {
2366 throttled_candidates.push_back(first_cfd);
2367 continue;
2368 }
2369 cfd = first_cfd;
2370 cfd->set_queued_for_compaction(false);
2371 break;
2372 }
2373 // Add throttled compaction candidates back to queue in the original order.
2374 for (auto iter = throttled_candidates.rbegin();
2375 iter != throttled_candidates.rend(); ++iter) {
2376 compaction_queue_.push_front(*iter);
2377 }
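  // Example (illustrative): starting from queue {A, B, C}, if A is throttled
  // and B acquires a token, the loop above leaves {C}; pushing A back onto
  // the front restores {A, C}, keeping A's original position ahead of C.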
2378 return cfd;
2379 }
2380
2381 void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req,
2382 FlushReason flush_reason) {
2383 mutex_.AssertHeld();
2384 if (flush_req.empty()) {
2385 return;
2386 }
2387 if (!immutable_db_options_.atomic_flush) {
2388 // For the non-atomic flush case, we never schedule multiple column
2389 // families in the same flush request.
2390 assert(flush_req.size() == 1);
2391 ColumnFamilyData* cfd = flush_req[0].first;
2392 assert(cfd);
2393 if (!cfd->queued_for_flush() && cfd->imm()->IsFlushPending()) {
2394 cfd->Ref();
2395 cfd->set_queued_for_flush(true);
2396 cfd->SetFlushReason(flush_reason);
2397 ++unscheduled_flushes_;
2398 flush_queue_.push_back(flush_req);
2399 }
2400 } else {
2401 for (auto& iter : flush_req) {
2402 ColumnFamilyData* cfd = iter.first;
2403 cfd->Ref();
2404 cfd->SetFlushReason(flush_reason);
2405 }
2406 ++unscheduled_flushes_;
2407 flush_queue_.push_back(flush_req);
2408 }
2409 }
2410
2411 void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
2412 mutex_.AssertHeld();
2413 if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) {
2414 AddToCompactionQueue(cfd);
2415 ++unscheduled_compactions_;
2416 }
2417 }
2418
2419 void DBImpl::SchedulePendingPurge(std::string fname, std::string dir_to_sync,
2420 FileType type, uint64_t number, int job_id) {
2421 mutex_.AssertHeld();
2422 PurgeFileInfo file_info(fname, dir_to_sync, type, number, job_id);
2423 purge_files_.insert({{number, std::move(file_info)}});
2424 }
2425
2426 void DBImpl::BGWorkFlush(void* arg) {
2427 FlushThreadArg fta = *(reinterpret_cast<FlushThreadArg*>(arg));
2428 delete reinterpret_cast<FlushThreadArg*>(arg);
2429
2430 IOSTATS_SET_THREAD_POOL_ID(fta.thread_pri_);
2431 TEST_SYNC_POINT("DBImpl::BGWorkFlush");
2432 static_cast_with_check<DBImpl>(fta.db_)->BackgroundCallFlush(fta.thread_pri_);
2433 TEST_SYNC_POINT("DBImpl::BGWorkFlush:done");
2434 }
2435
2436 void DBImpl::BGWorkCompaction(void* arg) {
2437 CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
2438 delete reinterpret_cast<CompactionArg*>(arg);
2439 IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
2440 TEST_SYNC_POINT("DBImpl::BGWorkCompaction");
2441 auto prepicked_compaction =
2442 static_cast<PrepickedCompaction*>(ca.prepicked_compaction);
2443 static_cast_with_check<DBImpl>(ca.db)->BackgroundCallCompaction(
2444 prepicked_compaction, Env::Priority::LOW);
2445 delete prepicked_compaction;
2446 }
2447
2448 void DBImpl::BGWorkBottomCompaction(void* arg) {
2449 CompactionArg ca = *(static_cast<CompactionArg*>(arg));
2450 delete static_cast<CompactionArg*>(arg);
2451 IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM);
2452 TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction");
2453 auto* prepicked_compaction = ca.prepicked_compaction;
2454 assert(prepicked_compaction && prepicked_compaction->compaction &&
2455 !prepicked_compaction->manual_compaction_state);
2456 ca.db->BackgroundCallCompaction(prepicked_compaction, Env::Priority::BOTTOM);
2457 delete prepicked_compaction;
2458 }
2459
2460 void DBImpl::BGWorkPurge(void* db) {
2461 IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
2462 TEST_SYNC_POINT("DBImpl::BGWorkPurge:start");
2463 reinterpret_cast<DBImpl*>(db)->BackgroundCallPurge();
2464 TEST_SYNC_POINT("DBImpl::BGWorkPurge:end");
2465 }
2466
2467 void DBImpl::UnscheduleCompactionCallback(void* arg) {
2468 CompactionArg* ca_ptr = reinterpret_cast<CompactionArg*>(arg);
2469 Env::Priority compaction_pri = ca_ptr->compaction_pri_;
2470 if (Env::Priority::BOTTOM == compaction_pri) {
2471 // Decrement bg_bottom_compaction_scheduled_ if priority is BOTTOM
2472 ca_ptr->db->bg_bottom_compaction_scheduled_--;
2473 } else if (Env::Priority::LOW == compaction_pri) {
2474 // Decrement bg_compaction_scheduled_ if priority is LOW
2475 ca_ptr->db->bg_compaction_scheduled_--;
2476 }
2477 CompactionArg ca = *(ca_ptr);
2478 delete reinterpret_cast<CompactionArg*>(arg);
2479 if (ca.prepicked_compaction != nullptr) {
2480 if (ca.prepicked_compaction->compaction != nullptr) {
2481 delete ca.prepicked_compaction->compaction;
2482 }
2483 delete ca.prepicked_compaction;
2484 }
2485 TEST_SYNC_POINT("DBImpl::UnscheduleCompactionCallback");
2486 }
2487
2488 void DBImpl::UnscheduleFlushCallback(void* arg) {
2489 // Decrement bg_flush_scheduled_ in flush callback
2490 reinterpret_cast<FlushThreadArg*>(arg)->db_->bg_flush_scheduled_--;
2491 Env::Priority flush_pri = reinterpret_cast<FlushThreadArg*>(arg)->thread_pri_;
2492 if (Env::Priority::LOW == flush_pri) {
2493 TEST_SYNC_POINT("DBImpl::UnscheduleLowFlushCallback");
2494 } else if (Env::Priority::HIGH == flush_pri) {
2495 TEST_SYNC_POINT("DBImpl::UnscheduleHighFlushCallback");
2496 }
2497 delete reinterpret_cast<FlushThreadArg*>(arg);
2498 TEST_SYNC_POINT("DBImpl::UnscheduleFlushCallback");
2499 }
2500
2501 Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
2502 LogBuffer* log_buffer, FlushReason* reason,
2503 Env::Priority thread_pri) {
2504 mutex_.AssertHeld();
2505
2506 Status status;
2507 *reason = FlushReason::kOthers;
2508 // If BG work is stopped due to an error, but a recovery is in progress,
2509 // that means this flush is part of the recovery. So allow it to go through
2510 if (!error_handler_.IsBGWorkStopped()) {
2511 if (shutting_down_.load(std::memory_order_acquire)) {
2512 status = Status::ShutdownInProgress();
2513 }
2514 } else if (!error_handler_.IsRecoveryInProgress()) {
2515 status = error_handler_.GetBGError();
2516 }
2517
2518 if (!status.ok()) {
2519 return status;
2520 }
2521
2522 autovector<BGFlushArg> bg_flush_args;
2523 std::vector<SuperVersionContext>& superversion_contexts =
2524 job_context->superversion_contexts;
2525 autovector<ColumnFamilyData*> column_families_not_to_flush;
2526 while (!flush_queue_.empty()) {
2527 // This cfd is already referenced
2528 const FlushRequest& flush_req = PopFirstFromFlushQueue();
2529 superversion_contexts.clear();
2530 superversion_contexts.reserve(flush_req.size());
2531
2532 for (const auto& iter : flush_req) {
2533 ColumnFamilyData* cfd = iter.first;
2534 if (cfd->IsDropped() || !cfd->imm()->IsFlushPending()) {
2535 // can't flush this CF, try next one
2536 column_families_not_to_flush.push_back(cfd);
2537 continue;
2538 }
2539 superversion_contexts.emplace_back(SuperVersionContext(true));
2540 bg_flush_args.emplace_back(cfd, iter.second,
2541 &(superversion_contexts.back()));
2542 }
2543 if (!bg_flush_args.empty()) {
2544 break;
2545 }
2546 }
2547
2548 if (!bg_flush_args.empty()) {
2549 auto bg_job_limits = GetBGJobLimits();
2550 for (const auto& arg : bg_flush_args) {
2551 ColumnFamilyData* cfd = arg.cfd_;
2552 ROCKS_LOG_BUFFER(
2553 log_buffer,
2554 "Calling FlushMemTableToOutputFile with column "
2555 "family [%s], flush slots available %d, compaction slots available "
2556 "%d, "
2557 "flush slots scheduled %d, compaction slots scheduled %d",
2558 cfd->GetName().c_str(), bg_job_limits.max_flushes,
2559 bg_job_limits.max_compactions, bg_flush_scheduled_,
2560 bg_compaction_scheduled_);
2561 }
2562 status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
2563 job_context, log_buffer, thread_pri);
2564 TEST_SYNC_POINT("DBImpl::BackgroundFlush:BeforeFlush");
2565 // All the CFDs in the FlushReq must have the same flush reason, so just
2566 // grab the first one
2567 *reason = bg_flush_args[0].cfd_->GetFlushReason();
2568 for (auto& arg : bg_flush_args) {
2569 ColumnFamilyData* cfd = arg.cfd_;
2570 if (cfd->UnrefAndTryDelete()) {
2571 arg.cfd_ = nullptr;
2572 }
2573 }
2574 }
2575 for (auto cfd : column_families_not_to_flush) {
2576 cfd->UnrefAndTryDelete();
2577 }
2578 return status;
2579 }
2580
2581 void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
2582 bool made_progress = false;
2583 JobContext job_context(next_job_id_.fetch_add(1), true);
2584
2585 TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:start");
2586
2587 LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
2588 immutable_db_options_.info_log.get());
2589 TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:Start:1");
2590 TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:Start:2");
2591 {
2592 InstrumentedMutexLock l(&mutex_);
2593 assert(bg_flush_scheduled_);
2594 num_running_flushes_++;
2595
2596 std::unique_ptr<std::list<uint64_t>::iterator>
2597 pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
2598 CaptureCurrentFileNumberInPendingOutputs()));
2599 FlushReason reason;
2600
2601 Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer,
2602 &reason, thread_pri);
2603 if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped() &&
2604 reason != FlushReason::kErrorRecovery) {
2605 // Wait a little bit before retrying background flush in
2606 // case this is an environmental problem and we do not want to
2607 // chew up resources for failed flushes for the duration of
2608 // the problem.
2609 uint64_t error_cnt =
2610 default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
2611 bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
2612 mutex_.Unlock();
2613 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
2614 "Waiting after background flush error: %s"
2615 "Accumulated background error counts: %" PRIu64,
2616 s.ToString().c_str(), error_cnt);
2617 log_buffer.FlushBufferToLog();
2618 LogFlush(immutable_db_options_.info_log);
2619 immutable_db_options_.clock->SleepForMicroseconds(1000000);
2620 mutex_.Lock();
2621 }
2622
2623 TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FlushFinish:0");
2624 ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
2625
2626 // If flush failed, we want to delete all temporary files that we might have
2627 // created. Thus, we force full scan in FindObsoleteFiles()
2628 FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
2629 !s.IsColumnFamilyDropped());
2630 // delete unnecessary files if any, this is done outside the mutex
2631 if (job_context.HaveSomethingToClean() ||
2632 job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
2633 mutex_.Unlock();
2634 TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FilesFound");
2635 // Have to flush the info logs before bg_flush_scheduled_--
2636 // because if bg_flush_scheduled_ becomes 0 and the lock is
2637       // released, the destructor of DB can kick in and destroy all the
2638       // state of DB, so info_log might not be available after that point.
2639       // The same applies to accessing any other state that DB owns.
2640 log_buffer.FlushBufferToLog();
2641 if (job_context.HaveSomethingToDelete()) {
2642 PurgeObsoleteFiles(job_context);
2643 }
2644 job_context.Clean();
2645 mutex_.Lock();
2646 }
2647 TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:ContextCleanedUp");
2648
2649 assert(num_running_flushes_ > 0);
2650 num_running_flushes_--;
2651 bg_flush_scheduled_--;
2652 // See if there's more work to be done
2653 MaybeScheduleFlushOrCompaction();
2654 atomic_flush_install_cv_.SignalAll();
2655 bg_cv_.SignalAll();
2656 // IMPORTANT: there should be no code after calling SignalAll. This call may
2657 // signal the DB destructor that it's OK to proceed with destruction. In
2658     // that case, all DB variables will be deallocated and referencing them
2659 // will cause trouble.
2660 }
2661 }
2662
2663 void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
2664 Env::Priority bg_thread_pri) {
2665 bool made_progress = false;
2666 JobContext job_context(next_job_id_.fetch_add(1), true);
2667 TEST_SYNC_POINT("BackgroundCallCompaction:0");
2668 LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
2669 immutable_db_options_.info_log.get());
2670 {
2671 InstrumentedMutexLock l(&mutex_);
2672
2673 // This call will unlock/lock the mutex to wait for current running
2674 // IngestExternalFile() calls to finish.
2675 WaitForIngestFile();
2676
2677 num_running_compactions_++;
2678
2679 std::unique_ptr<std::list<uint64_t>::iterator>
2680 pending_outputs_inserted_elem(new std::list<uint64_t>::iterator(
2681 CaptureCurrentFileNumberInPendingOutputs()));
2682
2683 assert((bg_thread_pri == Env::Priority::BOTTOM &&
2684 bg_bottom_compaction_scheduled_) ||
2685 (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
2686 Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer,
2687 prepicked_compaction, bg_thread_pri);
2688 TEST_SYNC_POINT("BackgroundCallCompaction:1");
2689 if (s.IsBusy()) {
2690 bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
2691 mutex_.Unlock();
2692 immutable_db_options_.clock->SleepForMicroseconds(
2693 10000); // prevent hot loop
2694 mutex_.Lock();
2695 } else if (!s.ok() && !s.IsShutdownInProgress() &&
2696 !s.IsManualCompactionPaused() && !s.IsColumnFamilyDropped()) {
2697 // Wait a little bit before retrying background compaction in
2698 // case this is an environmental problem and we do not want to
2699 // chew up resources for failed compactions for the duration of
2700 // the problem.
2701 uint64_t error_cnt =
2702 default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
2703 bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
2704 mutex_.Unlock();
2705 log_buffer.FlushBufferToLog();
2706 ROCKS_LOG_ERROR(immutable_db_options_.info_log,
2707 "Waiting after background compaction error: %s, "
2708 "Accumulated background error counts: %" PRIu64,
2709 s.ToString().c_str(), error_cnt);
2710 LogFlush(immutable_db_options_.info_log);
2711 immutable_db_options_.clock->SleepForMicroseconds(1000000);
2712 mutex_.Lock();
2713 } else if (s.IsManualCompactionPaused()) {
2714 ManualCompactionState* m = prepicked_compaction->manual_compaction_state;
2715 assert(m);
2716 ROCKS_LOG_BUFFER(&log_buffer, "[%s] [JOB %d] Manual compaction paused",
2717 m->cfd->GetName().c_str(), job_context.job_id);
2718 }
2719
2720 ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
2721
2722 // If compaction failed, we want to delete all temporary files that we might
2723 // have created (they might not be all recorded in job_context in case of a
2724 // failure). Thus, we force full scan in FindObsoleteFiles()
2725 FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress() &&
2726 !s.IsManualCompactionPaused() &&
2727 !s.IsColumnFamilyDropped() &&
2728 !s.IsBusy());
2729 TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles");
2730
2731 // delete unnecessary files if any, this is done outside the mutex
2732 if (job_context.HaveSomethingToClean() ||
2733 job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
2734 mutex_.Unlock();
2735 // Have to flush the info logs before bg_compaction_scheduled_--
2736       // because if bg_compaction_scheduled_ becomes 0 and the lock is
2737       // released, the destructor of DB can kick in and destroy all the
2738       // state of DB, so info_log might not be available after that point.
2739       // The same applies to accessing any other state that DB owns.
2740 log_buffer.FlushBufferToLog();
2741 if (job_context.HaveSomethingToDelete()) {
2742 PurgeObsoleteFiles(job_context);
2743 TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles");
2744 }
2745 job_context.Clean();
2746 mutex_.Lock();
2747 }
2748
2749 assert(num_running_compactions_ > 0);
2750 num_running_compactions_--;
2751 if (bg_thread_pri == Env::Priority::LOW) {
2752 bg_compaction_scheduled_--;
2753 } else {
2754 assert(bg_thread_pri == Env::Priority::BOTTOM);
2755 bg_bottom_compaction_scheduled_--;
2756 }
2757
2758 versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
2759
2760 // See if there's more work to be done
2761 MaybeScheduleFlushOrCompaction();
2762
2763 if (prepicked_compaction != nullptr &&
2764 prepicked_compaction->task_token != nullptr) {
2765 // Releasing task tokens affects the DB state, so must be done before we
2766 // potentially signal the DB close process to proceed below.
2767 prepicked_compaction->task_token->ReleaseOnce();
2768 }
2769
2770 if (made_progress ||
2771 (bg_compaction_scheduled_ == 0 &&
2772 bg_bottom_compaction_scheduled_ == 0) ||
2773 HasPendingManualCompaction() || unscheduled_compactions_ == 0) {
2774 // signal if
2775 // * made_progress -- need to wakeup DelayWrite
2776 // * bg_{bottom,}_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
2777 // * HasPendingManualCompaction -- need to wakeup RunManualCompaction
2778 // If none of this is true, there is no need to signal since nobody is
2779 // waiting for it
2780 bg_cv_.SignalAll();
2781 }
2782 // IMPORTANT: there should be no code after calling SignalAll. This call may
2783 // signal the DB destructor that it's OK to proceed with destruction. In
2784     // that case, all DB variables will be deallocated and referencing them
2785 // will cause trouble.
2786 }
2787 }
2788
2789 Status DBImpl::BackgroundCompaction(bool* made_progress,
2790 JobContext* job_context,
2791 LogBuffer* log_buffer,
2792 PrepickedCompaction* prepicked_compaction,
2793 Env::Priority thread_pri) {
2794 ManualCompactionState* manual_compaction =
2795 prepicked_compaction == nullptr
2796 ? nullptr
2797 : prepicked_compaction->manual_compaction_state;
2798 *made_progress = false;
2799 mutex_.AssertHeld();
2800 TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start");
2801
2802 bool is_manual = (manual_compaction != nullptr);
2803 std::unique_ptr<Compaction> c;
2804 if (prepicked_compaction != nullptr &&
2805 prepicked_compaction->compaction != nullptr) {
2806 c.reset(prepicked_compaction->compaction);
2807 }
2808 bool is_prepicked = is_manual || c;
2809
2810 // (manual_compaction->in_progress == false);
2811 bool trivial_move_disallowed =
2812 is_manual && manual_compaction->disallow_trivial_move;
2813
2814 CompactionJobStats compaction_job_stats;
2815 Status status;
2816 if (!error_handler_.IsBGWorkStopped()) {
2817 if (shutting_down_.load(std::memory_order_acquire)) {
2818 status = Status::ShutdownInProgress();
2819 } else if (is_manual &&
2820 manual_compaction_paused_.load(std::memory_order_acquire) > 0) {
2821 status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
2822 }
2823 } else {
2824 status = error_handler_.GetBGError();
2825 // If we get here, it means a hard error happened after this compaction
2826 // was scheduled by MaybeScheduleFlushOrCompaction(), but before it got
2827 // a chance to execute. Since we didn't pop a cfd from the compaction
2828 // queue, increment unscheduled_compactions_
2829 unscheduled_compactions_++;
2830 }
2831
2832 if (!status.ok()) {
2833 if (is_manual) {
2834 manual_compaction->status = status;
2835 manual_compaction->done = true;
2836 manual_compaction->in_progress = false;
2837 manual_compaction = nullptr;
2838 }
2839 if (c) {
2840 c->ReleaseCompactionFiles(status);
2841 c.reset();
2842 }
2843 return status;
2844 }
2845
2846 if (is_manual) {
2847 // another thread cannot pick up the same work
2848 manual_compaction->in_progress = true;
2849 }
2850
2851 std::unique_ptr<TaskLimiterToken> task_token;
2852
2853 // InternalKey manual_end_storage;
2854 // InternalKey* manual_end = &manual_end_storage;
2855 bool sfm_reserved_compact_space = false;
2856 if (is_manual) {
2857 ManualCompactionState* m = manual_compaction;
2858 assert(m->in_progress);
2859 if (!c) {
2860 m->done = true;
2861 m->manual_end = nullptr;
2862 ROCKS_LOG_BUFFER(
2863 log_buffer,
2864 "[%s] Manual compaction from level-%d from %s .. "
2865 "%s; nothing to do\n",
2866 m->cfd->GetName().c_str(), m->input_level,
2867 (m->begin ? m->begin->DebugString(true).c_str() : "(begin)"),
2868 (m->end ? m->end->DebugString(true).c_str() : "(end)"));
2869 } else {
2870 // First check if we have enough room to do the compaction
2871 bool enough_room = EnoughRoomForCompaction(
2872 m->cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
2873
2874 if (!enough_room) {
2875 // Then don't do the compaction
2876 c->ReleaseCompactionFiles(status);
2877 c.reset();
2878 // m's vars will get set properly at the end of this function,
2879 // as long as status == CompactionTooLarge
2880 status = Status::CompactionTooLarge();
2881 } else {
2882 ROCKS_LOG_BUFFER(
2883 log_buffer,
2884 "[%s] Manual compaction from level-%d to level-%d from %s .. "
2885 "%s; will stop at %s\n",
2886 m->cfd->GetName().c_str(), m->input_level, c->output_level(),
2887 (m->begin ? m->begin->DebugString(true).c_str() : "(begin)"),
2888 (m->end ? m->end->DebugString(true).c_str() : "(end)"),
2889 ((m->done || m->manual_end == nullptr)
2890 ? "(end)"
2891 : m->manual_end->DebugString(true).c_str()));
2892 }
2893 }
  } else if (!is_prepicked && !compaction_queue_.empty()) {
    if (HasExclusiveManualCompaction()) {
      // Can't compact right now, but try again later
      TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict");

      // Stay in the compaction queue.
      unscheduled_compactions_++;

      return Status::OK();
    }

    auto cfd = PickCompactionFromQueue(&task_token, log_buffer);
    if (cfd == nullptr) {
      // Can't find any executable task from the compaction queue.
      // All tasks have been throttled by compaction thread limiter.
      ++unscheduled_compactions_;
      return Status::Busy();
    }

    // We unreference here because the following code will take a Ref() on
    // this cfd if it is going to use it (Compaction class holds a
    // reference).
    // This will all happen under a mutex so we don't have to be afraid of
    // somebody else deleting it.
    if (cfd->UnrefAndTryDelete()) {
      // This was the last reference of the column family, so no need to
      // compact.
      return Status::OK();
    }

    // Pick up the latest MutableCFOptions and use them throughout the
    // compaction job. Compaction makes a copy, which guarantees a consistent
    // view of the options for the whole procedure; the copy is eventually
    // installed into the SuperVersion.
    auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
    if (!mutable_cf_options->disable_auto_compactions && !cfd->IsDropped()) {
      // NOTE: try to avoid unnecessary copy of MutableCFOptions if
      // compaction is not necessary. Need to make sure mutex is held
      // until we make a copy in the following code
      TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction");
      c.reset(cfd->PickCompaction(*mutable_cf_options, mutable_db_options_,
                                  log_buffer));
      TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction");

      if (c != nullptr) {
        bool enough_room = EnoughRoomForCompaction(
            cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);

        if (!enough_room) {
          // Then don't do the compaction
          c->ReleaseCompactionFiles(status);
          c->column_family_data()
              ->current()
              ->storage_info()
              ->ComputeCompactionScore(*(c->immutable_cf_options()),
                                       *(c->mutable_cf_options()));
          AddToCompactionQueue(cfd);
          ++unscheduled_compactions_;

          c.reset();
          // Don't need to sleep here, because BackgroundCallCompaction
          // will sleep if !s.ok()
          status = Status::CompactionTooLarge();
        } else {
          // update statistics
          RecordInHistogram(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
                            c->inputs(0)->size());
          // There are three things that can change compaction score:
          // 1) When flush or compaction finish. This case is covered by
          // InstallSuperVersionAndScheduleWork
          // 2) When MutableCFOptions changes. This case is also covered by
          // InstallSuperVersionAndScheduleWork, because this is when the new
          // options take effect.
          // 3) When we Pick a new compaction, we "remove" those files being
          // compacted from the calculation, which then influences compaction
          // score. Here we check if we need the new compaction even without
          // the files that are currently being compacted. If we need another
          // compaction, we might be able to execute it in parallel, so we add
          // it to the queue and schedule a new thread.
          if (cfd->NeedsCompaction()) {
            // Yes, we need more compactions!
            AddToCompactionQueue(cfd);
            ++unscheduled_compactions_;
            MaybeScheduleFlushOrCompaction();
          }
        }
      }
    }
  }

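  // At this point c is either a freshly picked compaction, a prepicked one
  // handed in by the caller, or nullptr (nothing to do, or dropped above for
  // lack of disk space).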
  IOStatus io_s;
  if (!c) {
    // Nothing to do
    ROCKS_LOG_BUFFER(log_buffer, "Compaction nothing to do");
  } else if (c->deletion_compaction()) {
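    // FIFO deletion "compaction": drop the oldest level-0 files by recording
    // deletions in a VersionEdit; no data is read or rewritten.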
    // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old
    // file if there is alive snapshot pointing to it
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
                             c->column_family_data());
    assert(c->num_input_files(1) == 0);
    assert(c->level() == 0);
    assert(c->column_family_data()->ioptions()->compaction_style ==
           kCompactionStyleFIFO);

    compaction_job_stats.num_input_files = c->num_input_files(0);

    NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
                            compaction_job_stats, job_context->job_id);

    for (const auto& f : *c->inputs(0)) {
      c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
    }
    status = versions_->LogAndApply(c->column_family_data(),
                                    *c->mutable_cf_options(), c->edit(),
                                    &mutex_, directories_.GetDbDir());
    io_s = versions_->io_status();
    InstallSuperVersionAndScheduleWork(c->column_family_data(),
                                       &job_context->superversion_contexts[0],
                                       *c->mutable_cf_options());
    ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n",
                     c->column_family_data()->GetName().c_str(),
                     c->num_input_files(0));
    *made_progress = true;
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
                             c->column_family_data());
  } else if (!trivial_move_disallowed && c->IsTrivialMove()) {
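    // Trivial move: every input file can be assigned to the output level
    // as-is, so the "compaction" is a metadata-only re-link of the files in
    // a VersionEdit; no bytes are read or rewritten.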
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:TrivialMove");
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
                             c->column_family_data());
    // Instrument for event update
    // TODO(yhchiang): add op details for showing trivial-move.
    ThreadStatusUtil::SetColumnFamily(
        c->column_family_data(), c->column_family_data()->ioptions()->env,
        immutable_db_options_.enable_thread_tracking);
    ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);

    compaction_job_stats.num_input_files = c->num_input_files(0);

    NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
                            compaction_job_stats, job_context->job_id);

    // Move files to next level
    int32_t moved_files = 0;
    int64_t moved_bytes = 0;
    for (unsigned int l = 0; l < c->num_input_levels(); l++) {
      if (c->level(l) == c->output_level()) {
        continue;
      }
      for (size_t i = 0; i < c->num_input_files(l); i++) {
        FileMetaData* f = c->input(l, i);
        c->edit()->DeleteFile(c->level(l), f->fd.GetNumber());
        c->edit()->AddFile(c->output_level(), f->fd.GetNumber(),
                           f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
                           f->largest, f->fd.smallest_seqno,
                           f->fd.largest_seqno, f->marked_for_compaction,
                           f->oldest_blob_file_number, f->oldest_ancester_time,
                           f->file_creation_time, f->file_checksum,
                           f->file_checksum_func_name);

        ROCKS_LOG_BUFFER(
            log_buffer,
            "[%s] Moving #%" PRIu64 " to level-%d %" PRIu64 " bytes\n",
            c->column_family_data()->GetName().c_str(), f->fd.GetNumber(),
            c->output_level(), f->fd.GetFileSize());
        ++moved_files;
        moved_bytes += f->fd.GetFileSize();
      }
    }

    status = versions_->LogAndApply(c->column_family_data(),
                                    *c->mutable_cf_options(), c->edit(),
                                    &mutex_, directories_.GetDbDir());
    io_s = versions_->io_status();
    // Use latest MutableCFOptions
    InstallSuperVersionAndScheduleWork(c->column_family_data(),
                                       &job_context->superversion_contexts[0],
                                       *c->mutable_cf_options());

    VersionStorageInfo::LevelSummaryStorage tmp;
    c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(),
                                                             moved_bytes);
    {
      event_logger_.LogToBuffer(log_buffer)
          << "job" << job_context->job_id << "event"
          << "trivial_move"
          << "destination_level" << c->output_level() << "files" << moved_files
          << "total_files_size" << moved_bytes;
    }
    ROCKS_LOG_BUFFER(
        log_buffer,
        "[%s] Moved #%d files to level-%d %" PRIu64 " bytes %s: %s\n",
        c->column_family_data()->GetName().c_str(), moved_files,
        c->output_level(), moved_bytes, status.ToString().c_str(),
        c->column_family_data()->current()->storage_info()->LevelSummary(&tmp));
    *made_progress = true;

    // Clear Instrument
    ThreadStatusUtil::ResetThreadStatus();
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
                             c->column_family_data());
  } else if (!is_prepicked && c->output_level() > 0 &&
             c->output_level() ==
                 c->column_family_data()
                     ->current()
                     ->storage_info()
                     ->MaxOutputLevel(
                         immutable_db_options_.allow_ingest_behind) &&
             env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
    // Forward compactions involving the last level to the bottom pool if it
    // exists, such that compactions unlikely to contribute to write stalls
    // can be delayed or deprioritized.
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool");
    CompactionArg* ca = new CompactionArg;
    ca->db = this;
    ca->compaction_pri_ = Env::Priority::BOTTOM;
    ca->prepicked_compaction = new PrepickedCompaction;
    ca->prepicked_compaction->compaction = c.release();
    ca->prepicked_compaction->manual_compaction_state = nullptr;
    // Transfer requested token, so it doesn't need to do it again.
    ca->prepicked_compaction->task_token = std::move(task_token);
    ++bg_bottom_compaction_scheduled_;
    env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM,
                   this, &DBImpl::UnscheduleCompactionCallback);
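    // Ownership of ca (including the released Compaction and the task token)
    // passes to the scheduled bottom-priority job; the unschedule callback
    // cleans it up if the task is dropped before running.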
  } else {
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
                             c->column_family_data());
    int output_level __attribute__((__unused__));
    output_level = c->output_level();
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
                             &output_level);
    std::vector<SequenceNumber> snapshot_seqs;
    SequenceNumber earliest_write_conflict_snapshot;
    SnapshotChecker* snapshot_checker;
    GetSnapshotContext(job_context, &snapshot_seqs,
                       &earliest_write_conflict_snapshot, &snapshot_checker);
    assert(is_snapshot_supported_ || snapshots_.empty());
    CompactionJob compaction_job(
        job_context->job_id, c.get(), immutable_db_options_,
        mutable_db_options_, file_options_for_compaction_, versions_.get(),
        &shutting_down_, preserve_deletes_seqnum_.load(), log_buffer,
        directories_.GetDbDir(),
        GetDataDir(c->column_family_data(), c->output_path_id()),
        GetDataDir(c->column_family_data(), 0), stats_, &mutex_,
        &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
        snapshot_checker, table_cache_, &event_logger_,
        c->mutable_cf_options()->paranoid_file_checks,
        c->mutable_cf_options()->report_bg_io_stats, dbname_,
        &compaction_job_stats, thread_pri, io_tracer_,
        is_manual ? &manual_compaction_paused_ : nullptr, db_id_,
        db_session_id_, c->column_family_data()->GetFullHistoryTsLow());
    compaction_job.Prepare();
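    // Prepare() computes subcompaction boundaries so that, where possible,
    // the job can be split across multiple threads.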

    NotifyOnCompactionBegin(c->column_family_data(), c.get(), status,
                            compaction_job_stats, job_context->job_id);
    mutex_.Unlock();
    TEST_SYNC_POINT_CALLBACK(
        "DBImpl::BackgroundCompaction:NonTrivial:BeforeRun", nullptr);
    // TODO: should we handle the error here?
    compaction_job.Run().PermitUncheckedError();
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
    mutex_.Lock();

    status = compaction_job.Install(*c->mutable_cf_options());
    io_s = compaction_job.io_status();
    if (status.ok()) {
      InstallSuperVersionAndScheduleWork(c->column_family_data(),
                                         &job_context->superversion_contexts[0],
                                         *c->mutable_cf_options());
    }
    *made_progress = true;
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
                             c->column_family_data());
  }

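  // If the compaction succeeded but IO failed, surface the IO error;
  // otherwise io_s is deliberately marked as checked and ignored.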
  if (status.ok() && !io_s.ok()) {
    status = io_s;
  } else {
    io_s.PermitUncheckedError();
  }

  if (c != nullptr) {
    c->ReleaseCompactionFiles(status);
    *made_progress = true;

#ifndef ROCKSDB_LITE
    // Need to make sure SstFileManager does its bookkeeping
    auto sfm = static_cast<SstFileManagerImpl*>(
        immutable_db_options_.sst_file_manager.get());
    if (sfm && sfm_reserved_compact_space) {
      sfm->OnCompactionCompletion(c.get());
    }
#endif  // ROCKSDB_LITE

    NotifyOnCompactionCompleted(c->column_family_data(), c.get(), status,
                                compaction_job_stats, job_context->job_id);
  }

  if (status.ok() || status.IsCompactionTooLarge() ||
      status.IsManualCompactionPaused()) {
    // Done
  } else if (status.IsColumnFamilyDropped() || status.IsShutdownInProgress()) {
    // Ignore compaction errors found during shutdown
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
                   status.ToString().c_str());
    if (!io_s.ok()) {
      // Error while writing to MANIFEST.
      // In fact, versions_->io_status() can also be the result of renaming
      // the CURRENT file. With the current code it is difficult to tell, so
      // be pessimistic and try writing to a new MANIFEST.
      // TODO: distinguish between MANIFEST write and CURRENT renaming
      auto err_reason = versions_->io_status().ok()
                            ? BackgroundErrorReason::kCompaction
                            : BackgroundErrorReason::kManifestWrite;
      error_handler_.SetBGError(io_s, err_reason);
    } else {
      error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
    }
    if (c != nullptr && !is_manual && !error_handler_.IsBGWorkStopped()) {
      // Put this cfd back in the compaction queue so we can retry after some
      // time
      auto cfd = c->column_family_data();
      assert(cfd != nullptr);
      // Since this compaction failed, we need to recompute the score so it
      // takes the original input files into account
      c->column_family_data()
          ->current()
          ->storage_info()
          ->ComputeCompactionScore(*(c->immutable_cf_options()),
                                   *(c->mutable_cf_options()));
      if (!cfd->queued_for_compaction()) {
        AddToCompactionQueue(cfd);
        ++unscheduled_compactions_;
      }
    }
  }
  // this will unref its input_version and column_family_data
  c.reset();

  if (is_manual) {
    ManualCompactionState* m = manual_compaction;
    if (!status.ok()) {
      m->status = status;
      m->done = true;
    }
    // For universal compaction:
    // Universal compaction always happens at level 0, so one compaction
    // picks up all overlapping files. No files are filtered out due to a
    // size limit and left for a successive compaction, so we can safely
    // conclude the current compaction here.
    //
    // Also note that, if we didn't stop here, the current compaction would
    // write a new file back to level 0, which would be used in a successive
    // compaction; the manual compaction would then never finish.
    //
    // Stop the compaction if manual_end points to nullptr -- this means
    // that we compacted the whole range. manual_end should always point
    // to nullptr in case of universal compaction
    if (m->manual_end == nullptr) {
      m->done = true;
    }
    if (!m->done) {
      // We only compacted part of the requested range. Update *m
      // to the range that is left to be compacted.
      // Universal and FIFO compactions should always compact the whole range
      assert(m->cfd->ioptions()->compaction_style !=
                 kCompactionStyleUniversal ||
             m->cfd->ioptions()->num_levels > 1);
      assert(m->cfd->ioptions()->compaction_style != kCompactionStyleFIFO);
      m->tmp_storage = *m->manual_end;
      m->begin = &m->tmp_storage;
      m->incomplete = true;
    }
    m->in_progress = false;  // not being processed anymore
  }
  TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Finish");
  return status;
}

bool DBImpl::HasPendingManualCompaction() {
  return (!manual_compaction_dequeue_.empty());
}

void DBImpl::AddManualCompaction(DBImpl::ManualCompactionState* m) {
  manual_compaction_dequeue_.push_back(m);
}

void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) {
  // Remove from queue
  std::deque<ManualCompactionState*>::iterator it =
      manual_compaction_dequeue_.begin();
  while (it != manual_compaction_dequeue_.end()) {
    if (m == (*it)) {
      it = manual_compaction_dequeue_.erase(it);
      return;
    }
    ++it;
  }
  assert(false);
  return;
}

bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) {
  if (num_running_ingest_file_ > 0) {
    // We need to wait for other IngestExternalFile() calls to finish
    // before running a manual compaction.
    return true;
  }
  if (m->exclusive) {
    return (bg_bottom_compaction_scheduled_ > 0 ||
            bg_compaction_scheduled_ > 0);
  }
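  // Non-exclusive: walk the queue front to back. m must wait if any
  // conflicting request queued ahead of it has not started yet, which keeps
  // FIFO ordering among overlapping manual compactions.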
  std::deque<ManualCompactionState*>::iterator it =
      manual_compaction_dequeue_.begin();
  bool seen = false;
  while (it != manual_compaction_dequeue_.end()) {
    if (m == (*it)) {
      ++it;
      seen = true;
      continue;
    } else if (MCOverlap(m, (*it)) && (!seen && !(*it)->in_progress)) {
      // The other manual compaction *it conflicts with m if it overlaps m,
      // is ahead of m in the queue, and is not yet in progress.
      return true;
    }
    ++it;
  }
  return false;
}

bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
  // Scan the queue for a manual compaction that should block automatic
  // compaction on this column family.
  std::deque<ManualCompactionState*>::iterator it =
      manual_compaction_dequeue_.begin();
  while (it != manual_compaction_dequeue_.end()) {
    if ((*it)->exclusive) {
      return true;
    }
    if ((cfd == (*it)->cfd) && (!((*it)->in_progress || (*it)->done))) {
      // A queued manual compaction on this column family that has not
      // started yet blocks automatic compaction; one that is already in
      // progress (or done) does not.
      return true;
    }
    ++it;
  }
  return false;
}

bool DBImpl::HasExclusiveManualCompaction() {
  // Scan the queue for an exclusive manual compaction
  std::deque<ManualCompactionState*>::iterator it =
      manual_compaction_dequeue_.begin();
  while (it != manual_compaction_dequeue_.end()) {
    if ((*it)->exclusive) {
      return true;
    }
    ++it;
  }
  return false;
}

bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
  if ((m->exclusive) || (m1->exclusive)) {
    return true;
  }
  if (m->cfd != m1->cfd) {
    return false;
  }
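  // Note: key ranges are not compared here, so two non-exclusive manual
  // compactions on the same column family are treated as non-overlapping.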
  return false;
}


#ifndef ROCKSDB_LITE
void DBImpl::BuildCompactionJobInfo(
    const ColumnFamilyData* cfd, Compaction* c, const Status& st,
    const CompactionJobStats& compaction_job_stats, const int job_id,
    const Version* current, CompactionJobInfo* compaction_job_info) const {
  assert(compaction_job_info != nullptr);
  compaction_job_info->cf_id = cfd->GetID();
  compaction_job_info->cf_name = cfd->GetName();
  compaction_job_info->status = st;
  compaction_job_info->thread_id = env_->GetThreadID();
  compaction_job_info->job_id = job_id;
  compaction_job_info->base_input_level = c->start_level();
  compaction_job_info->output_level = c->output_level();
  compaction_job_info->stats = compaction_job_stats;
  compaction_job_info->table_properties = c->GetOutputTableProperties();
  compaction_job_info->compaction_reason = c->compaction_reason();
  compaction_job_info->compression = c->output_compression();
  for (size_t i = 0; i < c->num_input_levels(); ++i) {
    for (const auto fmd : *c->inputs(i)) {
      const FileDescriptor& desc = fmd->fd;
      const uint64_t file_number = desc.GetNumber();
      auto fn = TableFileName(c->immutable_cf_options()->cf_paths, file_number,
                              desc.GetPathId());
      compaction_job_info->input_files.push_back(fn);
      compaction_job_info->input_file_infos.push_back(CompactionFileInfo{
          static_cast<int>(i), file_number, fmd->oldest_blob_file_number});
      if (compaction_job_info->table_properties.count(fn) == 0) {
        std::shared_ptr<const TableProperties> tp;
        auto s = current->GetTableProperties(&tp, fmd, &fn);
        if (s.ok()) {
          compaction_job_info->table_properties[fn] = tp;
        }
      }
    }
  }
  for (const auto& newf : c->edit()->GetNewFiles()) {
    const FileMetaData& meta = newf.second;
    const FileDescriptor& desc = meta.fd;
    const uint64_t file_number = desc.GetNumber();
    compaction_job_info->output_files.push_back(TableFileName(
        c->immutable_cf_options()->cf_paths, file_number, desc.GetPathId()));
    compaction_job_info->output_file_infos.push_back(CompactionFileInfo{
        newf.first, file_number, meta.oldest_blob_file_number});
  }
}
#endif

// SuperVersionContext gets created and destructed outside of the lock --
// we use this conveniently to:
// * malloc one SuperVersion() outside of the lock -- new_superversion
// * delete SuperVersion()s outside of the lock -- superversions_to_free
//
// However, if InstallSuperVersionAndScheduleWork() gets called twice with
// the same sv_context, we can't reuse the SuperVersion() that was malloced,
// because the first call already used it. In that rare case, we take a hit
// and create a new SuperVersion() inside the mutex. We do the same for
// superversions_to_free.

void DBImpl::InstallSuperVersionAndScheduleWork(
    ColumnFamilyData* cfd, SuperVersionContext* sv_context,
    const MutableCFOptions& mutable_cf_options) {
  mutex_.AssertHeld();

  // Update max_total_in_memory_state_
  size_t old_memtable_size = 0;
  auto* old_sv = cfd->GetSuperVersion();
  if (old_sv) {
    old_memtable_size = old_sv->mutable_cf_options.write_buffer_size *
                        old_sv->mutable_cf_options.max_write_buffer_number;
  }

  // This branch is rarely taken.
  if (UNLIKELY(sv_context->new_superversion == nullptr)) {
    sv_context->NewSuperVersion();
  }
  cfd->InstallSuperVersion(sv_context, &mutex_, mutable_cf_options);

  // There may be a small data race here. The snapshot whose release should
  // trigger bottommost compaction may already have been released by this
  // point. But assuming newer snapshots are created and released frequently,
  // the compaction will be triggered soon anyway.
  bottommost_files_mark_threshold_ = kMaxSequenceNumber;
  for (auto* my_cfd : *versions_->GetColumnFamilySet()) {
    bottommost_files_mark_threshold_ = std::min(
        bottommost_files_mark_threshold_,
        my_cfd->current()->storage_info()->bottommost_files_mark_threshold());
  }

  // Whenever we install a new SuperVersion, we might need to issue new
  // flushes or compactions.
  SchedulePendingCompaction(cfd);
  MaybeScheduleFlushOrCompaction();

  // Update max_total_in_memory_state_
  max_total_in_memory_state_ = max_total_in_memory_state_ - old_memtable_size +
                               mutable_cf_options.write_buffer_size *
                                   mutable_cf_options.max_write_buffer_number;
}


// ShouldPurge is called by FindObsoleteFiles when doing a full scan,
// and the db mutex (mutex_) should already be held.
// Note that the current implementation of FindObsoleteFiles with
// full_scan=true can issue I/O requests to obtain the list of files in
// directories, e.g. env_->GetChildren(), while holding the db mutex.
bool DBImpl::ShouldPurge(uint64_t file_number) const {
  return files_grabbed_for_purge_.find(file_number) ==
             files_grabbed_for_purge_.end() &&
         purge_files_.find(file_number) == purge_files_.end();
}

// MarkAsGrabbedForPurge is called by FindObsoleteFiles, and db mutex
// (mutex_) should already be held.
void DBImpl::MarkAsGrabbedForPurge(uint64_t file_number) {
  files_grabbed_for_purge_.insert(file_number);
}

void DBImpl::SetSnapshotChecker(SnapshotChecker* snapshot_checker) {
  InstrumentedMutexLock l(&mutex_);
  // snapshot_checker_ should only be set once. If we ever need to set it
  // multiple times, we must make sure the old one is not deleted while it
  // is still in use by a compaction job.
  assert(!snapshot_checker_);
  snapshot_checker_.reset(snapshot_checker);
}

void DBImpl::GetSnapshotContext(
    JobContext* job_context, std::vector<SequenceNumber>* snapshot_seqs,
    SequenceNumber* earliest_write_conflict_snapshot,
    SnapshotChecker** snapshot_checker_ptr) {
  mutex_.AssertHeld();
  assert(job_context != nullptr);
  assert(snapshot_seqs != nullptr);
  assert(earliest_write_conflict_snapshot != nullptr);
  assert(snapshot_checker_ptr != nullptr);

  *snapshot_checker_ptr = snapshot_checker_.get();
  if (use_custom_gc_ && *snapshot_checker_ptr == nullptr) {
    *snapshot_checker_ptr = DisableGCSnapshotChecker::Instance();
  }
  if (*snapshot_checker_ptr != nullptr) {
    // If a snapshot_checker is used, the flush/compaction may contain
    // values not visible to snapshots taken after the job starts. Take a
    // snapshot here so that it appears in snapshot_seqs and forces the
    // compaction iterator to consider such snapshots.
    const Snapshot* job_snapshot =
        GetSnapshotImpl(false /*write_conflict_boundary*/, false /*lock*/);
    job_context->job_snapshot.reset(new ManagedSnapshot(this, job_snapshot));
  }
  *snapshot_seqs = snapshots_.GetAll(earliest_write_conflict_snapshot);
}
}  // namespace ROCKSDB_NAMESPACE