// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/flush_job.h"

#include <cinttypes>

#include <algorithm>
#include <vector>

#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/event_helpers.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/version_set.h"
#include "file/file_util.h"
#include "file/filename.h"
#include "logging/event_logger.h"
#include "logging/log_buffer.h"
#include "logging/logging.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_util.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/merging_iterator.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"

namespace ROCKSDB_NAMESPACE {

const char* GetFlushReasonString(FlushReason flush_reason) {
  switch (flush_reason) {
    case FlushReason::kOthers:
      return "Other Reasons";
    case FlushReason::kGetLiveFiles:
      return "Get Live Files";
    case FlushReason::kShutDown:
      return "Shut down";
    case FlushReason::kExternalFileIngestion:
      return "External File Ingestion";
    case FlushReason::kManualCompaction:
      return "Manual Compaction";
    case FlushReason::kWriteBufferManager:
      return "Write Buffer Manager";
    case FlushReason::kWriteBufferFull:
      return "Write Buffer Full";
    case FlushReason::kTest:
      return "Test";
    case FlushReason::kDeleteFiles:
      return "Delete Files";
    case FlushReason::kAutoCompaction:
      return "Auto Compaction";
    case FlushReason::kManualFlush:
      return "Manual Flush";
    case FlushReason::kErrorRecovery:
      return "Error Recovery";
    case FlushReason::kWalFull:
      return "WAL Full";
    default:
      return "Invalid";
  }
}

FlushJob::FlushJob(
    const std::string& dbname, ColumnFamilyData* cfd,
    const ImmutableDBOptions& db_options,
    const MutableCFOptions& mutable_cf_options, uint64_t max_memtable_id,
    const FileOptions& file_options, VersionSet* versions,
    InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down,
    std::vector<SequenceNumber> existing_snapshots,
    SequenceNumber earliest_write_conflict_snapshot,
    SnapshotChecker* snapshot_checker, JobContext* job_context,
    LogBuffer* log_buffer, FSDirectory* db_directory,
    FSDirectory* output_file_directory, CompressionType output_compression,
    Statistics* stats, EventLogger* event_logger, bool measure_io_stats,
    const bool sync_output_directory, const bool write_manifest,
    Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
    const std::string& db_id, const std::string& db_session_id,
    std::string full_history_ts_low, BlobFileCompletionCallback* blob_callback)
    : dbname_(dbname),
      db_id_(db_id),
      db_session_id_(db_session_id),
      cfd_(cfd),
      db_options_(db_options),
      mutable_cf_options_(mutable_cf_options),
      max_memtable_id_(max_memtable_id),
      file_options_(file_options),
      versions_(versions),
      db_mutex_(db_mutex),
      shutting_down_(shutting_down),
      existing_snapshots_(std::move(existing_snapshots)),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      snapshot_checker_(snapshot_checker),
      job_context_(job_context),
      log_buffer_(log_buffer),
      db_directory_(db_directory),
      output_file_directory_(output_file_directory),
      output_compression_(output_compression),
      stats_(stats),
      event_logger_(event_logger),
      measure_io_stats_(measure_io_stats),
      sync_output_directory_(sync_output_directory),
      write_manifest_(write_manifest),
      edit_(nullptr),
      base_(nullptr),
      pick_memtable_called(false),
      thread_pri_(thread_pri),
      io_tracer_(io_tracer),
      clock_(db_options_.clock),
      full_history_ts_low_(std::move(full_history_ts_low)),
      blob_callback_(blob_callback) {
  // Update the thread status to indicate flush.
  ReportStartedFlush();
  TEST_SYNC_POINT("FlushJob::FlushJob()");
}

FlushJob::~FlushJob() {
  io_status_.PermitUncheckedError();
  ThreadStatusUtil::ResetThreadStatus();
}

void FlushJob::ReportStartedFlush() {
  ThreadStatusUtil::SetColumnFamily(cfd_, cfd_->ioptions()->env,
                                    db_options_.enable_thread_tracking);
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_JOB_ID,
      job_context_->job_id);
  IOSTATS_RESET(bytes_written);
}

void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
  uint64_t input_size = 0;
  for (auto* mem : mems) {
    input_size += mem->ApproximateMemoryUsage();
  }
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::FLUSH_BYTES_MEMTABLES,
      input_size);
}

void FlushJob::RecordFlushIOStats() {
  RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written));
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
  IOSTATS_RESET(bytes_written);
}
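
// PickMemTable selects which immutable memtables (up to max_memtable_id_)
// will be flushed and seeds the VersionEdit with the WAL-number cutoff used
// for recovery. It must be called exactly once, with the DB mutex held,
// before Run().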
void FlushJob::PickMemTable() {
  db_mutex_->AssertHeld();
  assert(!pick_memtable_called);
  pick_memtable_called = true;
  // Save the contents of the earliest memtable as a new Table
  cfd_->imm()->PickMemtablesToFlush(max_memtable_id_, &mems_);
  if (mems_.empty()) {
    return;
  }

  ReportFlushInputSize(mems_);

  // The memtables in mems_ are (implicitly) sorted in ascending order by
  // their creation time. We will use the first memtable's `edit` to keep
  // the meta info for this flush.
  MemTable* m = mems_[0];
  edit_ = m->GetEdits();
  edit_->SetPrevLogNumber(0);
  // SetLogNumber(log_num) indicates logs with number smaller than log_num
  // will no longer be picked up for recovery.
  edit_->SetLogNumber(mems_.back()->GetNextLogNumber());
  edit_->SetColumnFamily(cfd_->GetID());

  // path 0 for level 0 file.
  meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);

  base_ = cfd_->current();
  base_->Ref(); // it is likely that we do not need this reference
}

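// Run() drives the flush end-to-end (a descriptive sketch of the control
// flow below): if the experimental mempurge is enabled and MemPurgeDecider()
// approves, first try to compact the picked memtables into a single fresh
// memtable in memory; otherwise, or if the mempurge aborts/fails, write a
// level-0 table via WriteLevel0Table(). On success the results are installed
// in the MANIFEST (when write_manifest_ is set) and a "flush_finished" event
// is logged; on failure the memtable flush is rolled back.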
Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
                     bool* switched_to_mempurge) {
  TEST_SYNC_POINT("FlushJob::Start");
  db_mutex_->AssertHeld();
  assert(pick_memtable_called);
  AutoThreadOperationStageUpdater stage_run(
      ThreadStatus::STAGE_FLUSH_RUN);
  if (mems_.empty()) {
    ROCKS_LOG_BUFFER(log_buffer_, "[%s] Nothing in memtable to flush",
                     cfd_->GetName().c_str());
    return Status::OK();
  }

  // I/O measurement variables
  PerfLevel prev_perf_level = PerfLevel::kEnableTime;
  uint64_t prev_write_nanos = 0;
  uint64_t prev_fsync_nanos = 0;
  uint64_t prev_range_sync_nanos = 0;
  uint64_t prev_prepare_write_nanos = 0;
  uint64_t prev_cpu_write_nanos = 0;
  uint64_t prev_cpu_read_nanos = 0;
  if (measure_io_stats_) {
    prev_perf_level = GetPerfLevel();
    SetPerfLevel(PerfLevel::kEnableTime);
    prev_write_nanos = IOSTATS(write_nanos);
    prev_fsync_nanos = IOSTATS(fsync_nanos);
    prev_range_sync_nanos = IOSTATS(range_sync_nanos);
    prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
    prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
    prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
  }
  Status mempurge_s = Status::NotFound("No MemPurge.");
  if ((db_options_.experimental_mempurge_threshold > 0.0) &&
      (cfd_->GetFlushReason() == FlushReason::kWriteBufferFull) &&
      (!mems_.empty()) && MemPurgeDecider()) {
    mempurge_s = MemPurge();
    if (!mempurge_s.ok()) {
      // Mempurge is typically aborted when the output
      // bytes cannot be contained onto a single output memtable.
      if (mempurge_s.IsAborted()) {
        ROCKS_LOG_INFO(db_options_.info_log, "Mempurge process aborted: %s\n",
                       mempurge_s.ToString().c_str());
      } else {
        // However the mempurge process can also fail for
        // other reasons (eg: new_mem->Add() fails).
        ROCKS_LOG_WARN(db_options_.info_log, "Mempurge process failed: %s\n",
                       mempurge_s.ToString().c_str());
      }
    } else {
      if (switched_to_mempurge) {
        *switched_to_mempurge = true;
      } else {
        // The mempurge process was successful, but no switch_to_mempurge
        // pointer provided so no way to propagate the state of flush job.
        ROCKS_LOG_WARN(db_options_.info_log,
                       "Mempurge process succeeded "
                       "but no 'switched_to_mempurge' ptr provided.\n");
      }
    }
  }
  Status s;
  if (mempurge_s.ok()) {
    base_->Unref();
    s = Status::OK();
  } else {
    // This will release and re-acquire the mutex.
    s = WriteLevel0Table();
  }

  if (s.ok() && cfd_->IsDropped()) {
    s = Status::ColumnFamilyDropped("Column family dropped during flush");
  }
  if ((s.ok() || s.IsColumnFamilyDropped()) &&
      shutting_down_->load(std::memory_order_acquire)) {
    s = Status::ShutdownInProgress("Database shutdown");
  }

  if (!s.ok()) {
    cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
  } else if (write_manifest_) {
    TEST_SYNC_POINT("FlushJob::InstallResults");
    // Replace immutable memtable with the generated Table
    IOStatus tmp_io_s;
    s = cfd_->imm()->TryInstallMemtableFlushResults(
        cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
        meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
        log_buffer_, &committed_flush_jobs_info_, &tmp_io_s,
        !(mempurge_s.ok()) /* write_edit: true if no mempurge happened (or if
                              aborted), but 'false' if mempurge successful: no
                              new min log number or new level 0 file path to
                              write to manifest. */);
    if (!tmp_io_s.ok()) {
      io_status_ = tmp_io_s;
    }
  }

  if (s.ok() && file_meta != nullptr) {
    *file_meta = meta_;
  }
  RecordFlushIOStats();

  // When measure_io_stats_ is true, the default 512 bytes is not enough.
  auto stream = event_logger_->LogToBuffer(log_buffer_, 1024);
  stream << "job" << job_context_->job_id << "event"
         << "flush_finished";
  stream << "output_compression"
         << CompressionTypeToString(output_compression_);
  stream << "lsm_state";
  stream.StartArray();
  auto vstorage = cfd_->current()->storage_info();
  for (int level = 0; level < vstorage->num_levels(); ++level) {
    stream << vstorage->NumLevelFiles(level);
  }
  stream.EndArray();

  const auto& blob_files = vstorage->GetBlobFiles();
  if (!blob_files.empty()) {
    stream << "blob_file_head" << blob_files.begin()->first;
    stream << "blob_file_tail" << blob_files.rbegin()->first;
  }

  stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed();

  if (measure_io_stats_) {
    if (prev_perf_level != PerfLevel::kEnableTime) {
      SetPerfLevel(prev_perf_level);
    }
    stream << "file_write_nanos" << (IOSTATS(write_nanos) - prev_write_nanos);
    stream << "file_range_sync_nanos"
           << (IOSTATS(range_sync_nanos) - prev_range_sync_nanos);
    stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - prev_fsync_nanos);
    stream << "file_prepare_write_nanos"
           << (IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos);
    stream << "file_cpu_write_nanos"
           << (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos);
    stream << "file_cpu_read_nanos"
           << (IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos);
  }

  return s;
}

void FlushJob::Cancel() {
  db_mutex_->AssertHeld();
  assert(base_ != nullptr);
  base_->Unref();
}

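// MemPurge (experimental "memtable garbage collection"): instead of writing
// the picked memtables out as a level-0 file, merge them through a
// CompactionIterator into a single fresh memtable, carrying over live keys
// and range tombstones. If the purged output would not fit in one memtable
// (roughly write_buffer_size), the purge returns Status::Aborted() and the
// caller falls back to a regular flush.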
Status FlushJob::MemPurge() {
  Status s;
  db_mutex_->AssertHeld();
  db_mutex_->Unlock();
  assert(!mems_.empty());

  // Measure purging time.
  const uint64_t start_micros = clock_->NowMicros();
  const uint64_t start_cpu_micros = clock_->CPUNanos() / 1000;

  MemTable* new_mem = nullptr;
  // For performance/log investigation purposes:
  // look at how much useful payload we harvest in the new_mem.
  // This value is then printed to the DB log.
  double new_mem_capacity = 0.0;

  // Create two iterators, one for the memtable data (contains
  // info from puts + deletes), and one for the memtable
  // Range Tombstones (from DeleteRanges).
  ReadOptions ro;
  ro.total_order_seek = true;
  Arena arena;
  std::vector<InternalIterator*> memtables;
  std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
      range_del_iters;
  for (MemTable* m : mems_) {
    memtables.push_back(m->NewIterator(ro, &arena));
    auto* range_del_iter = m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
    if (range_del_iter != nullptr) {
      range_del_iters.emplace_back(range_del_iter);
    }
  }

  assert(!memtables.empty());
  SequenceNumber first_seqno = kMaxSequenceNumber;
  SequenceNumber earliest_seqno = kMaxSequenceNumber;
  // Pick first and earliest seqno as min of all first_seqno
  // and earliest_seqno of the mempurged memtables.
  for (const auto& mem : mems_) {
    first_seqno = mem->GetFirstSequenceNumber() < first_seqno
                      ? mem->GetFirstSequenceNumber()
                      : first_seqno;
    earliest_seqno = mem->GetEarliestSequenceNumber() < earliest_seqno
                         ? mem->GetEarliestSequenceNumber()
                         : earliest_seqno;
  }

  ScopedArenaIterator iter(
      NewMergingIterator(&(cfd_->internal_comparator()), memtables.data(),
                         static_cast<int>(memtables.size()), &arena));

  auto* ioptions = cfd_->ioptions();

  // Place iterator at the First (meaning most recent) key node.
  iter->SeekToFirst();

  std::unique_ptr<CompactionRangeDelAggregator> range_del_agg(
      new CompactionRangeDelAggregator(&(cfd_->internal_comparator()),
                                       existing_snapshots_));
  for (auto& rd_iter : range_del_iters) {
    range_del_agg->AddTombstones(std::move(rd_iter));
  }

  // If there is valid data in the memtable,
  // or at least range tombstones, copy over the info
  // to the new memtable.
  if (iter->Valid() || !range_del_agg->IsEmpty()) {
    // MaxSize is the size of a memtable.
    size_t maxSize = mutable_cf_options_.write_buffer_size;
    std::unique_ptr<CompactionFilter> compaction_filter;
    if (ioptions->compaction_filter_factory != nullptr &&
        ioptions->compaction_filter_factory->ShouldFilterTableFileCreation(
            TableFileCreationReason::kFlush)) {
      CompactionFilter::Context ctx;
      ctx.is_full_compaction = false;
      ctx.is_manual_compaction = false;
      ctx.column_family_id = cfd_->GetID();
      ctx.reason = TableFileCreationReason::kFlush;
      compaction_filter =
          ioptions->compaction_filter_factory->CreateCompactionFilter(ctx);
      if (compaction_filter != nullptr &&
          !compaction_filter->IgnoreSnapshots()) {
        s = Status::NotSupported(
            "CompactionFilter::IgnoreSnapshots() = false is not supported "
            "anymore.");
        // Reacquire the DB mutex before this early return: it was released
        // at the top of MemPurge() and the caller expects it to be held.
        db_mutex_->Lock();
        return s;
      }
    }

    new_mem = new MemTable((cfd_->internal_comparator()), *(cfd_->ioptions()),
                           mutable_cf_options_, cfd_->write_buffer_mgr(),
                           earliest_seqno, cfd_->GetID());
    assert(new_mem != nullptr);

    Env* env = db_options_.env;
    assert(env);
    MergeHelper merge(
        env, (cfd_->internal_comparator()).user_comparator(),
        (ioptions->merge_operator).get(), compaction_filter.get(),
        ioptions->logger, true /* internal key corruption is not ok */,
        existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
        snapshot_checker_);
    CompactionIterator c_iter(
        iter.get(), (cfd_->internal_comparator()).user_comparator(), &merge,
        kMaxSequenceNumber, &existing_snapshots_,
        earliest_write_conflict_snapshot_, snapshot_checker_, env,
        ShouldReportDetailedTime(env, ioptions->stats),
        true /* internal key corruption is not ok */, range_del_agg.get(),
        nullptr, ioptions->allow_data_in_errors,
        /*compaction=*/nullptr, compaction_filter.get(),
        /*shutting_down=*/nullptr,
        /*preserve_deletes_seqnum=*/0, /*manual_compaction_paused=*/nullptr,
        /*manual_compaction_canceled=*/nullptr, ioptions->info_log,
        &(cfd_->GetFullHistoryTsLow()));

    // Set earliest sequence number in the new memtable
    // to be equal to the earliest sequence number of the
    // memtable being flushed (See later if there is a need
    // to update this number!).
    new_mem->SetEarliestSequenceNumber(earliest_seqno);
    // Likewise for first seq number.
    new_mem->SetFirstSequenceNumber(first_seqno);
    SequenceNumber new_first_seqno = kMaxSequenceNumber;

    c_iter.SeekToFirst();

    // Key transfer
    for (; c_iter.Valid(); c_iter.Next()) {
      const ParsedInternalKey ikey = c_iter.ikey();
      const Slice value = c_iter.value();
      new_first_seqno =
          ikey.sequence < new_first_seqno ? ikey.sequence : new_first_seqno;

      // TODO: should we update "OldestKeyTime"? Timestamps appear to
      // still be an "experimental" feature.
      s = new_mem->Add(
          ikey.sequence, ikey.type, ikey.user_key, value,
          nullptr,   // KV protection info set as nullptr since it
                     // should only be useful for the first add to
                     // the original memtable.
          false,     // : allow concurrent_memtable_writes_
                     //   Not seen as necessary for now.
          nullptr,   // get_post_process_info(m) must be nullptr
                     // when concurrent_memtable_writes is switched off.
          nullptr);  // hint, only used when concurrent_memtable_writes_
                     // is switched on.
      if (!s.ok()) {
        break;
      }

      // If new_mem has size greater than maxSize,
      // then rollback to regular flush operation,
      // and destroy new_mem.
      if (new_mem->ApproximateMemoryUsage() > maxSize) {
        s = Status::Aborted("Mempurge filled more than one memtable.");
        new_mem_capacity = 1.0;
        break;
      }
    }

    // Check status and propagate
    // potential error status from c_iter
    if (!s.ok()) {
      c_iter.status().PermitUncheckedError();
    } else if (!c_iter.status().ok()) {
      s = c_iter.status();
    }

    // Range tombstone transfer.
    if (s.ok()) {
      auto range_del_it = range_del_agg->NewIterator();
      for (range_del_it->SeekToFirst(); range_del_it->Valid();
           range_del_it->Next()) {
        auto tombstone = range_del_it->Tombstone();
        new_first_seqno =
            tombstone.seq_ < new_first_seqno ? tombstone.seq_ : new_first_seqno;
        s = new_mem->Add(
            tombstone.seq_,        // Sequence number
            kTypeRangeDeletion,    // KV type
            tombstone.start_key_,  // Key is start key.
            tombstone.end_key_,    // Value is end key.
            nullptr,   // KV protection info set as nullptr since it
                       // should only be useful for the first add to
                       // the original memtable.
            false,     // : allow concurrent_memtable_writes_
                       //   Not seen as necessary for now.
            nullptr,   // get_post_process_info(m) must be nullptr
                       // when concurrent_memtable_writes is switched off.
            nullptr);  // hint, only used when concurrent_memtable_writes_
                       // is switched on.

        if (!s.ok()) {
          break;
        }

        // If new_mem has size greater than maxSize,
        // then rollback to regular flush operation,
        // and destroy new_mem.
        if (new_mem->ApproximateMemoryUsage() > maxSize) {
          s = Status::Aborted(Slice("Mempurge filled more than one memtable."));
          new_mem_capacity = 1.0;
          break;
        }
      }
    }

    // If everything happened smoothly and new_mem contains valid data,
    // decide if it is flushed to storage or kept in the imm()
    // memtable list (memory).
    if (s.ok() && (new_first_seqno != kMaxSequenceNumber)) {
      // Rectify the first sequence number, which (unlike the earliest seq
      // number) needs to be present in the new memtable.
      new_mem->SetFirstSequenceNumber(new_first_seqno);

      // The new_mem is added to the list of immutable memtables
      // only if it filled at less than 100% capacity and isn't flagged
      // as in need of being flushed.
      if (new_mem->ApproximateMemoryUsage() < maxSize &&
          !(new_mem->ShouldFlushNow())) {
        db_mutex_->Lock();
        uint64_t new_mem_id = mems_[0]->GetID();

        new_mem->SetID(new_mem_id);

        // This addition will not trigger another flush, because
        // we do not call SchedulePendingFlush().
        cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free);
        new_mem->Ref();
#ifndef ROCKSDB_LITE
        // Piggyback FlushJobInfo on the first flushed memtable.
        db_mutex_->AssertHeld();
        meta_.fd.file_size = 0;
        mems_[0]->SetFlushJobInfo(GetFlushJobInfo());
#endif  // !ROCKSDB_LITE
        db_mutex_->Unlock();
      } else {
        s = Status::Aborted(Slice("Mempurge filled more than one memtable."));
        new_mem_capacity = 1.0;
        if (new_mem) {
          job_context_->memtables_to_free.push_back(new_mem);
        }
      }
    } else {
      // In this case, the newly allocated new_mem is empty.
      assert(new_mem != nullptr);
      job_context_->memtables_to_free.push_back(new_mem);
    }
  }

  // Reacquire the mutex for WriteLevel0 function.
  db_mutex_->Lock();

  // If mempurge successful, don't write input tables to level0,
  // but write any full output table to level0.
  if (s.ok()) {
    TEST_SYNC_POINT("DBImpl::FlushJob:MemPurgeSuccessful");
  } else {
    TEST_SYNC_POINT("DBImpl::FlushJob:MemPurgeUnsuccessful");
  }
  const uint64_t micros = clock_->NowMicros() - start_micros;
  const uint64_t cpu_micros = clock_->CPUNanos() / 1000 - start_cpu_micros;
  ROCKS_LOG_INFO(db_options_.info_log,
                 "[%s] [JOB %d] Mempurge lasted %" PRIu64
                 " microseconds, and %" PRIu64
                 " cpu "
                 "microseconds. Status is %s ok. Perc capacity: %f\n",
                 cfd_->GetName().c_str(), job_context_->job_id, micros,
                 cpu_micros, s.ok() ? "" : "not", new_mem_capacity);

  return s;
}

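// MemPurgeDecider estimates, by uniform random sampling of memtable entries,
// what fraction of the flushed bytes would still be "useful" (live) payload
// after purging. It approves a mempurge when the estimated useful payload
// would occupy less than `threshold` write buffers.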
bool FlushJob::MemPurgeDecider() {
  double threshold = db_options_.experimental_mempurge_threshold;
  // Never trigger mempurge if threshold is not a strictly positive value.
  if (!(threshold > 0.0)) {
    return false;
  }
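  // A threshold above the number of picked memtables short-circuits to
  // mempurge: estimated_useful_payload (computed below) is roughly bounded
  // by mems_.size() * write_buffer_size, so the final comparison could
  // hardly fail anyway.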
  if (threshold > (1.0 * mems_.size())) {
    return true;
  }
  // Payload and useful_payload (in bytes).
  // The useful payload ratio of a given MemTable
  // is estimated to be useful_payload/payload.
  uint64_t payload = 0, useful_payload = 0, entry_size = 0;

  // Local variables used repetitively inside the for-loop
  // when iterating over the sampled entries.
  Slice key_slice, value_slice;
  ParsedInternalKey res;
  SnapshotImpl min_snapshot;
  std::string vget;
  Status mget_s, parse_s;
  MergeContext merge_context;
  SequenceNumber max_covering_tombstone_seq = 0, sqno = 0,
                 min_seqno_snapshot = 0;
  bool get_res, can_be_useful_payload, not_in_next_mems;

  // If estimated_useful_payload is > threshold,
  // then flush to storage, else MemPurge.
  double estimated_useful_payload = 0.0;
  // Cochran formula for determining sample size.
  // 95% confidence interval, 7% precision.
  // n0 = (1.96*1.96)*0.25/(0.07*0.07) = 196.0
  double n0 = 196.0;
  ReadOptions ro;
  ro.total_order_seek = true;

  // Iterate over each memtable of the set.
  for (auto mem_iter = std::begin(mems_); mem_iter != std::end(mems_);
       mem_iter++) {
    MemTable* mt = *mem_iter;

    // Sample entries from the memtable.
    uint64_t nentries = mt->num_entries();
    // Corrected Cochran formula for small populations
    // (converges to n0 for large populations).
    uint64_t target_sample_size =
        static_cast<uint64_t>(ceil(n0 / (1.0 + (n0 / nentries))));
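    // Worked example (illustrative numbers): with nentries = 1000 this gives
    // ceil(196 / (1 + 196/1000)) = ceil(163.88...) = 164 samples; as nentries
    // grows, the sample size approaches n0 = 196.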
    std::unordered_set<const char*> sentries = {};
    // Populate sample entries set.
    mt->UniqueRandomSample(target_sample_size, &sentries);

    // Estimate the garbage ratio by comparing if
    // each sample corresponds to a valid entry.
    for (const char* ss : sentries) {
      key_slice = GetLengthPrefixedSlice(ss);
      parse_s = ParseInternalKey(key_slice, &res, true /*log_err_key*/);
      if (!parse_s.ok()) {
        ROCKS_LOG_WARN(db_options_.info_log,
                       "Memtable Decider: ParseInternalKey did not parse "
                       "key_slice %s successfully.",
                       key_slice.data());
      }

      // Size of the entry is "key size (+ value size if KV entry)"
      entry_size = key_slice.size();
      if (res.type == kTypeValue) {
        value_slice =
            GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
        entry_size += value_slice.size();
      }

      // Count entry bytes as payload.
      payload += entry_size;

      LookupKey lkey(res.user_key, kMaxSequenceNumber);

      // Paranoia: zero out these values just in case.
      max_covering_tombstone_seq = 0;
      sqno = 0;

      // Pick the oldest existing snapshot that is more recent
      // than the sequence number of the sampled entry.
      min_seqno_snapshot = kMaxSequenceNumber;
      for (SequenceNumber seq_num : existing_snapshots_) {
        if (seq_num > res.sequence && seq_num < min_seqno_snapshot) {
          min_seqno_snapshot = seq_num;
        }
      }
      min_snapshot.number_ = min_seqno_snapshot;
      ro.snapshot =
          min_seqno_snapshot < kMaxSequenceNumber ? &min_snapshot : nullptr;
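      // Example (illustrative numbers): with existing_snapshots_ = {5, 9, 12}
      // and res.sequence = 7, min_seqno_snapshot becomes 9, the oldest
      // snapshot that can still observe this entry; if no snapshot is newer
      // than the entry, ro.snapshot stays nullptr and the lookup below reads
      // the latest state.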

      // Estimate if the sample entry is valid or not.
      get_res = mt->Get(lkey, &vget, nullptr, &mget_s, &merge_context,
                        &max_covering_tombstone_seq, &sqno, ro);
      if (!get_res) {
        ROCKS_LOG_WARN(
            db_options_.info_log,
            "Memtable Get returned false when Get(sampled entry). "
            "Yet each sample entry should exist somewhere in the memtable, "
            "unrelated to whether it has been deleted or not.");
      }

      // TODO(bjlemaire): evaluate typeMerge.
      // This is where the sampled entry is estimated to be
      // garbage or not. Note that this is a garbage *estimation*
      // because we do not include certain items such as
      // CompactionFilters triggered at flush, or if the same delete
      // has been inserted twice or more in the memtable.

      // Evaluate if the entry can be useful payload
      // Situation #1: entry is a KV entry, was found in the memtable mt
      // and the sequence numbers match.
      can_be_useful_payload = (res.type == kTypeValue) && get_res &&
                              mget_s.ok() && (sqno == res.sequence);

      // Situation #2: entry is a delete entry, was found in the memtable mt
      // (because get_res == true) and no valid KV entry is found.
      // (note: duplicate delete entries are also taken into
      //       account here, because the sequence number 'sqno'
      //       in memtable->Get(&sqno) operation is set to be equal
      //       to the most recent delete entry as well).
      can_be_useful_payload |=
          ((res.type == kTypeDeletion) || (res.type == kTypeSingleDeletion)) &&
          mget_s.IsNotFound() && get_res && (sqno == res.sequence);

      // If there is a chance that the entry is useful payload
      // Verify that the entry does not appear in the following memtables
      // (memtables with greater memtable ID/larger sequence numbers).
      if (can_be_useful_payload) {
        not_in_next_mems = true;
        for (auto next_mem_iter = mem_iter + 1;
             next_mem_iter != std::end(mems_); next_mem_iter++) {
          if ((*next_mem_iter)
                  ->Get(lkey, &vget, nullptr, &mget_s, &merge_context,
                        &max_covering_tombstone_seq, &sqno, ro)) {
            not_in_next_mems = false;
            break;
          }
        }
        if (not_in_next_mems) {
          useful_payload += entry_size;
        }
      }
    }
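    // Note: payload and useful_payload accumulate across all memtables in
    // mems_, so the ratio below reflects every sample taken so far rather
    // than this memtable alone.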
    if (payload > 0) {
      // We use the estimated useful payload ratio to
      // evaluate how many of the memtable bytes are useful bytes.
      estimated_useful_payload +=
          (mt->ApproximateMemoryUsage()) * (useful_payload * 1.0 / payload);

      ROCKS_LOG_INFO(
          db_options_.info_log,
          "Mempurge sampling - found garbage ratio from sampling: %f.\n",
          (payload - useful_payload) * 1.0 / payload);
    } else {
      ROCKS_LOG_WARN(db_options_.info_log,
                     "Mempurge sampling: null payload measured, and collected "
                     "sample size is %zu.\n",
                     sentries.size());
    }
  }
  // We convert the total number of useful payload bytes
  // into the proportion of memtable necessary to store all these bytes.
  // We compare this proportion with the threshold value.
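  // Worked example (illustrative numbers): with write_buffer_size = 64 MB,
  // estimated_useful_payload = 20 MB, and threshold = 1.0, the ratio is
  // 20/64 ~= 0.31 < 1.0, so the decider returns true and a mempurge is
  // attempted.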
  return ((estimated_useful_payload / mutable_cf_options_.write_buffer_size) <
          threshold);
}

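// WriteLevel0Table builds the level-0 SST for the picked memtables: it
// merges their iterators, runs BuildTable() (which applies compaction
// filters, snapshots, and range tombstones, and may add blob files),
// optionally fsyncs the output directory, and records the new file in
// edit_ for later installation in the MANIFEST.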
Status FlushJob::WriteLevel0Table() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_FLUSH_WRITE_L0);
  db_mutex_->AssertHeld();
  const uint64_t start_micros = clock_->NowMicros();
  const uint64_t start_cpu_micros = clock_->CPUNanos() / 1000;
  Status s;

  std::vector<BlobFileAddition> blob_file_additions;

  {
    auto write_hint = cfd_->CalculateSSTWriteHint(0);
    db_mutex_->Unlock();
    if (log_buffer_) {
      log_buffer_->FlushBufferToLog();
    }
    // memtables and range_del_iters store internal iterators over each data
    // memtable and its associated range deletion memtable, respectively, at
    // corresponding indexes.
    std::vector<InternalIterator*> memtables;
    std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
        range_del_iters;
    ReadOptions ro;
    ro.total_order_seek = true;
    Arena arena;
    uint64_t total_num_entries = 0, total_num_deletes = 0;
    uint64_t total_data_size = 0;
    size_t total_memory_usage = 0;
    for (MemTable* m : mems_) {
      ROCKS_LOG_INFO(
          db_options_.info_log,
          "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
          cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
      memtables.push_back(m->NewIterator(ro, &arena));
      auto* range_del_iter =
          m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
      if (range_del_iter != nullptr) {
        range_del_iters.emplace_back(range_del_iter);
      }
      total_num_entries += m->num_entries();
      total_num_deletes += m->num_deletes();
      total_data_size += m->get_data_size();
      total_memory_usage += m->ApproximateMemoryUsage();
    }

    event_logger_->Log() << "job" << job_context_->job_id << "event"
                         << "flush_started"
                         << "num_memtables" << mems_.size() << "num_entries"
                         << total_num_entries << "num_deletes"
                         << total_num_deletes << "total_data_size"
                         << total_data_size << "memory_usage"
                         << total_memory_usage << "flush_reason"
                         << GetFlushReasonString(cfd_->GetFlushReason());

    {
      ScopedArenaIterator iter(
          NewMergingIterator(&cfd_->internal_comparator(), memtables.data(),
                             static_cast<int>(memtables.size()), &arena));
      ROCKS_LOG_INFO(db_options_.info_log,
                     "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
                     cfd_->GetName().c_str(), job_context_->job_id,
                     meta_.fd.GetNumber());

      TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
                               &output_compression_);
      int64_t _current_time = 0;
      auto status = clock_->GetCurrentTime(&_current_time);
      // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
      if (!status.ok()) {
        ROCKS_LOG_WARN(
            db_options_.info_log,
            "Failed to get current time to populate creation_time property. "
            "Status: %s",
            status.ToString().c_str());
      }
      const uint64_t current_time = static_cast<uint64_t>(_current_time);

      uint64_t oldest_key_time =
          mems_.front()->ApproximateOldestKeyTime();

      // It's not clear whether oldest_key_time is always available. In case
      // it is not available, use current_time.
      uint64_t oldest_ancester_time = std::min(current_time, oldest_key_time);

      TEST_SYNC_POINT_CALLBACK(
          "FlushJob::WriteLevel0Table:oldest_ancester_time",
          &oldest_ancester_time);
      meta_.oldest_ancester_time = oldest_ancester_time;

      meta_.file_creation_time = current_time;

      uint64_t creation_time = (cfd_->ioptions()->compaction_style ==
                                CompactionStyle::kCompactionStyleFIFO)
                                   ? current_time
                                   : meta_.oldest_ancester_time;

      uint64_t num_input_entries = 0;
      uint64_t memtable_payload_bytes = 0;
      uint64_t memtable_garbage_bytes = 0;
      IOStatus io_s;
      const std::string* const full_history_ts_low =
          (full_history_ts_low_.empty()) ? nullptr : &full_history_ts_low_;
      TableBuilderOptions tboptions(
          *cfd_->ioptions(), mutable_cf_options_, cfd_->internal_comparator(),
          cfd_->int_tbl_prop_collector_factories(), output_compression_,
          mutable_cf_options_.compression_opts, cfd_->GetID(), cfd_->GetName(),
          0 /* level */, false /* is_bottommost */,
          TableFileCreationReason::kFlush, creation_time, oldest_key_time,
          current_time, db_id_, db_session_id_, 0 /* target_file_size */,
          meta_.fd.GetNumber());
      s = BuildTable(
          dbname_, versions_, db_options_, tboptions, file_options_,
          cfd_->table_cache(), iter.get(), std::move(range_del_iters), &meta_,
          &blob_file_additions, existing_snapshots_,
          earliest_write_conflict_snapshot_, snapshot_checker_,
          mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
          &io_s, io_tracer_, BlobFileCreationReason::kFlush, event_logger_,
          job_context_->job_id, Env::IO_HIGH, &table_properties_, write_hint,
          full_history_ts_low, blob_callback_, &num_input_entries,
          &memtable_payload_bytes, &memtable_garbage_bytes);
      if (!io_s.ok()) {
        io_status_ = io_s;
      }
      if (num_input_entries != total_num_entries && s.ok()) {
        std::string msg = "Expected " + ToString(total_num_entries) +
                          " entries in memtables, but read " +
                          ToString(num_input_entries);
        ROCKS_LOG_WARN(db_options_.info_log, "[%s] [JOB %d] Level-0 flush %s",
                       cfd_->GetName().c_str(), job_context_->job_id,
                       msg.c_str());
        if (db_options_.flush_verify_memtable_count) {
          s = Status::Corruption(msg);
        }
      }
      if (tboptions.reason == TableFileCreationReason::kFlush) {
        TEST_SYNC_POINT("DBImpl::FlushJob:Flush");
        RecordTick(stats_, MEMTABLE_PAYLOAD_BYTES_AT_FLUSH,
                   memtable_payload_bytes);
        RecordTick(stats_, MEMTABLE_GARBAGE_BYTES_AT_FLUSH,
                   memtable_garbage_bytes);
      }
      LogFlush(db_options_.info_log);
    }
    ROCKS_LOG_INFO(db_options_.info_log,
                   "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
                   " bytes %s"
                   "%s",
                   cfd_->GetName().c_str(), job_context_->job_id,
                   meta_.fd.GetNumber(), meta_.fd.GetFileSize(),
                   s.ToString().c_str(),
                   meta_.marked_for_compaction ? " (needs compaction)" : "");

    if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) {
      s = output_file_directory_->Fsync(IOOptions(), nullptr);
    }
    TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_);
    db_mutex_->Lock();
  }
  base_->Unref();

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  const bool has_output = meta_.fd.GetFileSize() > 0;

  if (s.ok() && has_output) {
    TEST_SYNC_POINT("DBImpl::FlushJob:SSTFileCreated");
    // if we have more than 1 background thread, then we cannot
    // insert files directly into higher levels because some other
    // threads could be concurrently producing compacted files for
    // that key range.
    // Add file to L0
    edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
                   meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
                   meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
                   meta_.marked_for_compaction, meta_.oldest_blob_file_number,
                   meta_.oldest_ancester_time, meta_.file_creation_time,
                   meta_.file_checksum, meta_.file_checksum_func_name);

    edit_->SetBlobFileAdditions(std::move(blob_file_additions));
  }
#ifndef ROCKSDB_LITE
  // Piggyback FlushJobInfo on the first flushed memtable.
  mems_[0]->SetFlushJobInfo(GetFlushJobInfo());
#endif  // !ROCKSDB_LITE

  // Note that here we treat flush as level 0 compaction in internal stats
  InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
  const uint64_t micros = clock_->NowMicros() - start_micros;
  const uint64_t cpu_micros = clock_->CPUNanos() / 1000 - start_cpu_micros;
  stats.micros = micros;
  stats.cpu_micros = cpu_micros;

  ROCKS_LOG_INFO(db_options_.info_log,
                 "[%s] [JOB %d] Flush lasted %" PRIu64
                 " microseconds, and %" PRIu64 " cpu microseconds.\n",
                 cfd_->GetName().c_str(), job_context_->job_id, micros,
                 cpu_micros);

  if (has_output) {
    stats.bytes_written = meta_.fd.GetFileSize();
    stats.num_output_files = 1;
  }

  const auto& blobs = edit_->GetBlobFileAdditions();
  for (const auto& blob : blobs) {
    stats.bytes_written_blob += blob.GetTotalBlobBytes();
  }

  stats.num_output_files_blob = static_cast<int>(blobs.size());

  RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros);
  cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats);
  cfd_->internal_stats()->AddCFStats(
      InternalStats::BYTES_FLUSHED,
      stats.bytes_written + stats.bytes_written_blob);
  RecordFlushIOStats();

  return s;
}

#ifndef ROCKSDB_LITE
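// GetFlushJobInfo snapshots the flush output metadata (file number and path,
// seqno range, table properties, blob file additions) into a FlushJobInfo
// for event listeners; the DB mutex must be held.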
std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
  db_mutex_->AssertHeld();
  std::unique_ptr<FlushJobInfo> info(new FlushJobInfo{});
  info->cf_id = cfd_->GetID();
  info->cf_name = cfd_->GetName();

  const uint64_t file_number = meta_.fd.GetNumber();
  info->file_path =
      MakeTableFileName(cfd_->ioptions()->cf_paths[0].path, file_number);
  info->file_number = file_number;
  info->oldest_blob_file_number = meta_.oldest_blob_file_number;
  info->thread_id = db_options_.env->GetThreadID();
  info->job_id = job_context_->job_id;
  info->smallest_seqno = meta_.fd.smallest_seqno;
  info->largest_seqno = meta_.fd.largest_seqno;
  info->table_properties = table_properties_;
  info->flush_reason = cfd_->GetFlushReason();
  info->blob_compression_type = mutable_cf_options_.blob_compression_type;

  // Update BlobFilesInfo.
  for (const auto& blob_file : edit_->GetBlobFileAdditions()) {
    BlobFileAdditionInfo blob_file_addition_info(
        BlobFileName(cfd_->ioptions()->cf_paths.front().path,
                     blob_file.GetBlobFileNumber()) /*blob_file_path*/,
        blob_file.GetBlobFileNumber(), blob_file.GetTotalBlobCount(),
        blob_file.GetTotalBlobBytes());
    info->blob_file_addition_infos.emplace_back(
        std::move(blob_file_addition_info));
  }
  return info;
}

#endif  // !ROCKSDB_LITE

}  // namespace ROCKSDB_NAMESPACE