//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/flush_job.h"

#include <cinttypes>

#include <algorithm>
#include <vector>

#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/event_helpers.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/version_set.h"
#include "file/file_util.h"
#include "file/filename.h"
#include "logging/event_logger.h"
#include "logging/log_buffer.h"
#include "logging/logging.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_util.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/merging_iterator.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"

namespace ROCKSDB_NAMESPACE {

const char* GetFlushReasonString(FlushReason flush_reason) {
  switch (flush_reason) {
    case FlushReason::kOthers:
      return "Other Reasons";
    case FlushReason::kGetLiveFiles:
      return "Get Live Files";
    case FlushReason::kShutDown:
      return "Shut down";
    case FlushReason::kExternalFileIngestion:
      return "External File Ingestion";
    case FlushReason::kManualCompaction:
      return "Manual Compaction";
    case FlushReason::kWriteBufferManager:
      return "Write Buffer Manager";
    case FlushReason::kWriteBufferFull:
      return "Write Buffer Full";
    case FlushReason::kTest:
      return "Test";
    case FlushReason::kDeleteFiles:
      return "Delete Files";
    case FlushReason::kAutoCompaction:
      return "Auto Compaction";
    case FlushReason::kManualFlush:
      return "Manual Flush";
    case FlushReason::kErrorRecovery:
      return "Error Recovery";
    case FlushReason::kWalFull:
      return "WAL Full";
    default:
      return "Invalid";
  }
}

FlushJob::FlushJob(
    const std::string& dbname, ColumnFamilyData* cfd,
    const ImmutableDBOptions& db_options,
    const MutableCFOptions& mutable_cf_options, uint64_t max_memtable_id,
    const FileOptions& file_options, VersionSet* versions,
    InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down,
    std::vector<SequenceNumber> existing_snapshots,
    SequenceNumber earliest_write_conflict_snapshot,
    SnapshotChecker* snapshot_checker, JobContext* job_context,
    LogBuffer* log_buffer, FSDirectory* db_directory,
    FSDirectory* output_file_directory, CompressionType output_compression,
    Statistics* stats, EventLogger* event_logger, bool measure_io_stats,
    const bool sync_output_directory, const bool write_manifest,
    Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
    const std::string& db_id, const std::string& db_session_id,
    std::string full_history_ts_low, BlobFileCompletionCallback* blob_callback)
    : dbname_(dbname),
      db_id_(db_id),
      db_session_id_(db_session_id),
      cfd_(cfd),
      db_options_(db_options),
      mutable_cf_options_(mutable_cf_options),
      max_memtable_id_(max_memtable_id),
      file_options_(file_options),
      versions_(versions),
      db_mutex_(db_mutex),
      shutting_down_(shutting_down),
      existing_snapshots_(std::move(existing_snapshots)),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      snapshot_checker_(snapshot_checker),
      job_context_(job_context),
      log_buffer_(log_buffer),
      db_directory_(db_directory),
      output_file_directory_(output_file_directory),
      output_compression_(output_compression),
      stats_(stats),
      event_logger_(event_logger),
      measure_io_stats_(measure_io_stats),
      sync_output_directory_(sync_output_directory),
      write_manifest_(write_manifest),
      edit_(nullptr),
      base_(nullptr),
      pick_memtable_called(false),
      thread_pri_(thread_pri),
      io_tracer_(io_tracer),
      clock_(db_options_.clock),
      full_history_ts_low_(std::move(full_history_ts_low)),
      blob_callback_(blob_callback) {
  // Update the thread status to indicate flush.
  ReportStartedFlush();
  TEST_SYNC_POINT("FlushJob::FlushJob()");
}

FlushJob::~FlushJob() {
  io_status_.PermitUncheckedError();
  ThreadStatusUtil::ResetThreadStatus();
}

void FlushJob::ReportStartedFlush() {
  ThreadStatusUtil::SetColumnFamily(cfd_, cfd_->ioptions()->env,
                                    db_options_.enable_thread_tracking);
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_JOB_ID,
      job_context_->job_id);
  IOSTATS_RESET(bytes_written);
}

void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
  uint64_t input_size = 0;
  for (auto* mem : mems) {
    input_size += mem->ApproximateMemoryUsage();
  }
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::FLUSH_BYTES_MEMTABLES,
      input_size);
}

void FlushJob::RecordFlushIOStats() {
  RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written));
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
  IOSTATS_RESET(bytes_written);
}

void FlushJob::PickMemTable() {
  db_mutex_->AssertHeld();
  assert(!pick_memtable_called);
  pick_memtable_called = true;
  // Save the contents of the earliest memtable as a new Table
  cfd_->imm()->PickMemtablesToFlush(max_memtable_id_, &mems_);
  if (mems_.empty()) {
    return;
  }

  ReportFlushInputSize(mems_);

  // The memtables in mems_ are (implicitly) sorted in ascending order by
  // their creation time. We use the first memtable's `edit` to keep the
  // meta info for this flush.
  MemTable* m = mems_[0];
  edit_ = m->GetEdits();
  edit_->SetPrevLogNumber(0);
  // SetLogNumber(log_num) indicates that logs with numbers smaller than
  // log_num will no longer be picked up for recovery.
  edit_->SetLogNumber(mems_.back()->GetNextLogNumber());
  edit_->SetColumnFamily(cfd_->GetID());

  // path 0 for level 0 file.
  meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);

  base_ = cfd_->current();
  base_->Ref();  // it is likely that we do not need this reference
}

Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
                     bool* switched_to_mempurge) {
  TEST_SYNC_POINT("FlushJob::Start");
  db_mutex_->AssertHeld();
  assert(pick_memtable_called);
  AutoThreadOperationStageUpdater stage_run(
      ThreadStatus::STAGE_FLUSH_RUN);
  if (mems_.empty()) {
    ROCKS_LOG_BUFFER(log_buffer_, "[%s] Nothing in memtable to flush",
                     cfd_->GetName().c_str());
    return Status::OK();
  }

  // I/O measurement variables
  PerfLevel prev_perf_level = PerfLevel::kEnableTime;
  uint64_t prev_write_nanos = 0;
  uint64_t prev_fsync_nanos = 0;
  uint64_t prev_range_sync_nanos = 0;
  uint64_t prev_prepare_write_nanos = 0;
  uint64_t prev_cpu_write_nanos = 0;
  uint64_t prev_cpu_read_nanos = 0;
  if (measure_io_stats_) {
    prev_perf_level = GetPerfLevel();
    SetPerfLevel(PerfLevel::kEnableTime);
    prev_write_nanos = IOSTATS(write_nanos);
    prev_fsync_nanos = IOSTATS(fsync_nanos);
    prev_range_sync_nanos = IOSTATS(range_sync_nanos);
    prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
    prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
    prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
  }
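  // Attempt an in-memory purge ("mempurge") only when the experimental
  // mempurge threshold is positive, the flush was triggered by a full write
  // buffer, at least one memtable was picked, and the sampling-based decider
  // estimates that enough of the memtable payload is garbage.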
  Status mempurge_s = Status::NotFound("No MemPurge.");
  if ((db_options_.experimental_mempurge_threshold > 0.0) &&
      (cfd_->GetFlushReason() == FlushReason::kWriteBufferFull) &&
      (!mems_.empty()) && MemPurgeDecider()) {
    mempurge_s = MemPurge();
    if (!mempurge_s.ok()) {
      // Mempurge is typically aborted when the output
      // bytes cannot be contained in a single output memtable.
      if (mempurge_s.IsAborted()) {
        ROCKS_LOG_INFO(db_options_.info_log, "Mempurge process aborted: %s\n",
                       mempurge_s.ToString().c_str());
      } else {
        // However, the mempurge process can also fail for
        // other reasons (e.g., new_mem->Add() fails).
        ROCKS_LOG_WARN(db_options_.info_log, "Mempurge process failed: %s\n",
                       mempurge_s.ToString().c_str());
      }
    } else {
      if (switched_to_mempurge) {
        *switched_to_mempurge = true;
      } else {
        // The mempurge process was successful, but no 'switched_to_mempurge'
        // pointer was provided, so there is no way to propagate this state
        // out of the flush job.
        ROCKS_LOG_WARN(db_options_.info_log,
                       "Mempurge process succeeded "
                       "but no 'switched_to_mempurge' ptr provided.\n");
      }
    }
  }
  Status s;
  if (mempurge_s.ok()) {
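    // Mempurge succeeded: no level-0 file is written here, so just release
    // the extra version reference taken in PickMemTable().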
    base_->Unref();
    s = Status::OK();
  } else {
    // This will release and re-acquire the mutex.
    s = WriteLevel0Table();
  }

  if (s.ok() && cfd_->IsDropped()) {
    s = Status::ColumnFamilyDropped("Column family dropped during compaction");
  }
  if ((s.ok() || s.IsColumnFamilyDropped()) &&
      shutting_down_->load(std::memory_order_acquire)) {
    s = Status::ShutdownInProgress("Database shutdown");
  }

  if (!s.ok()) {
    cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
  } else if (write_manifest_) {
    TEST_SYNC_POINT("FlushJob::InstallResults");
    // Replace immutable memtable with the generated Table
    IOStatus tmp_io_s;
    s = cfd_->imm()->TryInstallMemtableFlushResults(
        cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
        meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
        log_buffer_, &committed_flush_jobs_info_, &tmp_io_s,
        !(mempurge_s.ok()) /* write_edit: true if no mempurge happened (or if
                              it was aborted), but false if mempurge was
                              successful: no new min log number or new level 0
                              file path to write to the manifest. */);
    if (!tmp_io_s.ok()) {
      io_status_ = tmp_io_s;
    }
  }

  if (s.ok() && file_meta != nullptr) {
    *file_meta = meta_;
  }
  RecordFlushIOStats();

  // When measure_io_stats_ is true, the default 512 bytes is not enough.
  auto stream = event_logger_->LogToBuffer(log_buffer_, 1024);
  stream << "job" << job_context_->job_id << "event"
         << "flush_finished";
  stream << "output_compression"
         << CompressionTypeToString(output_compression_);
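  // Log the number of files at each LSM level after this flush.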
  stream << "lsm_state";
  stream.StartArray();
  auto vstorage = cfd_->current()->storage_info();
  for (int level = 0; level < vstorage->num_levels(); ++level) {
    stream << vstorage->NumLevelFiles(level);
  }
  stream.EndArray();

  const auto& blob_files = vstorage->GetBlobFiles();
  if (!blob_files.empty()) {
    stream << "blob_file_head" << blob_files.begin()->first;
    stream << "blob_file_tail" << blob_files.rbegin()->first;
  }

  stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed();

  if (measure_io_stats_) {
    if (prev_perf_level != PerfLevel::kEnableTime) {
      SetPerfLevel(prev_perf_level);
    }
    stream << "file_write_nanos" << (IOSTATS(write_nanos) - prev_write_nanos);
    stream << "file_range_sync_nanos"
           << (IOSTATS(range_sync_nanos) - prev_range_sync_nanos);
    stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - prev_fsync_nanos);
    stream << "file_prepare_write_nanos"
           << (IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos);
    stream << "file_cpu_write_nanos"
           << (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos);
    stream << "file_cpu_read_nanos"
           << (IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos);
  }

  return s;
}

void FlushJob::Cancel() {
  db_mutex_->AssertHeld();
  assert(base_ != nullptr);
  base_->Unref();
}

Status FlushJob::MemPurge() {
  Status s;
  db_mutex_->AssertHeld();
  db_mutex_->Unlock();
  assert(!mems_.empty());

  // Measure purging time.
  const uint64_t start_micros = clock_->NowMicros();
  const uint64_t start_cpu_micros = clock_->CPUNanos() / 1000;

  MemTable* new_mem = nullptr;
  // For performance/log investigation purposes:
  // look at how much useful payload we harvest in the new_mem.
  // This value is then printed to the DB log.
  double new_mem_capacity = 0.0;

  // Create two iterators, one for the memtable data (contains
  // info from puts + deletes), and one for the memtable
  // Range Tombstones (from DeleteRanges).
  ReadOptions ro;
  ro.total_order_seek = true;
  Arena arena;
  std::vector<InternalIterator*> memtables;
  std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
      range_del_iters;
  for (MemTable* m : mems_) {
    memtables.push_back(m->NewIterator(ro, &arena));
    auto* range_del_iter = m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
    if (range_del_iter != nullptr) {
      range_del_iters.emplace_back(range_del_iter);
    }
  }

  assert(!memtables.empty());
  SequenceNumber first_seqno = kMaxSequenceNumber;
  SequenceNumber earliest_seqno = kMaxSequenceNumber;
  // Pick first and earliest seqno as min of all first_seqno
  // and earliest_seqno of the mempurged memtables.
  for (const auto& mem : mems_) {
    first_seqno = mem->GetFirstSequenceNumber() < first_seqno
                      ? mem->GetFirstSequenceNumber()
                      : first_seqno;
    earliest_seqno = mem->GetEarliestSequenceNumber() < earliest_seqno
                         ? mem->GetEarliestSequenceNumber()
                         : earliest_seqno;
  }

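  // Merge all picked memtables into a single sorted view of their internal
  // keys.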
  ScopedArenaIterator iter(
      NewMergingIterator(&(cfd_->internal_comparator()), memtables.data(),
                         static_cast<int>(memtables.size()), &arena));

  auto* ioptions = cfd_->ioptions();

  // Place the iterator at the first (i.e., most recent) key node.
  iter->SeekToFirst();

  std::unique_ptr<CompactionRangeDelAggregator> range_del_agg(
      new CompactionRangeDelAggregator(&(cfd_->internal_comparator()),
                                       existing_snapshots_));
  for (auto& rd_iter : range_del_iters) {
    range_del_agg->AddTombstones(std::move(rd_iter));
  }

  // If there is valid data in the memtable,
  // or at least range tombstones, copy over the info
  // to the new memtable.
  if (iter->Valid() || !range_del_agg->IsEmpty()) {
    // maxSize is the size of a memtable.
    size_t maxSize = mutable_cf_options_.write_buffer_size;
    std::unique_ptr<CompactionFilter> compaction_filter;
    if (ioptions->compaction_filter_factory != nullptr &&
        ioptions->compaction_filter_factory->ShouldFilterTableFileCreation(
            TableFileCreationReason::kFlush)) {
      CompactionFilter::Context ctx;
      ctx.is_full_compaction = false;
      ctx.is_manual_compaction = false;
      ctx.column_family_id = cfd_->GetID();
      ctx.reason = TableFileCreationReason::kFlush;
      compaction_filter =
          ioptions->compaction_filter_factory->CreateCompactionFilter(ctx);
      if (compaction_filter != nullptr &&
          !compaction_filter->IgnoreSnapshots()) {
        s = Status::NotSupported(
            "CompactionFilter::IgnoreSnapshots() = false is not supported "
            "anymore.");
        return s;
      }
    }

    new_mem = new MemTable((cfd_->internal_comparator()), *(cfd_->ioptions()),
                           mutable_cf_options_, cfd_->write_buffer_mgr(),
                           earliest_seqno, cfd_->GetID());
    assert(new_mem != nullptr);

    Env* env = db_options_.env;
    assert(env);
    MergeHelper merge(
        env, (cfd_->internal_comparator()).user_comparator(),
        (ioptions->merge_operator).get(), compaction_filter.get(),
        ioptions->logger, true /* internal key corruption is not ok */,
        existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
        snapshot_checker_);
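    // The compaction iterator walks the merged memtable data, applying merge
    // operands, the compaction filter (if any), and snapshot visibility rules
    // to drop obsolete entries.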
    CompactionIterator c_iter(
        iter.get(), (cfd_->internal_comparator()).user_comparator(), &merge,
        kMaxSequenceNumber, &existing_snapshots_,
        earliest_write_conflict_snapshot_, snapshot_checker_, env,
        ShouldReportDetailedTime(env, ioptions->stats),
        true /* internal key corruption is not ok */, range_del_agg.get(),
        nullptr, ioptions->allow_data_in_errors,
        /*compaction=*/nullptr, compaction_filter.get(),
        /*shutting_down=*/nullptr,
        /*preserve_deletes_seqnum=*/0, /*manual_compaction_paused=*/nullptr,
        /*manual_compaction_canceled=*/nullptr, ioptions->info_log,
        &(cfd_->GetFullHistoryTsLow()));

    // Set earliest sequence number in the new memtable
    // to be equal to the earliest sequence number of the
    // memtable being flushed (See later if there is a need
    // to update this number!).
    new_mem->SetEarliestSequenceNumber(earliest_seqno);
    // Likewise for first seq number.
    new_mem->SetFirstSequenceNumber(first_seqno);
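    // Track the smallest sequence number actually copied into new_mem; it is
    // used further below to rectify the new memtable's first sequence number.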
    SequenceNumber new_first_seqno = kMaxSequenceNumber;

    c_iter.SeekToFirst();

    // Key transfer
    for (; c_iter.Valid(); c_iter.Next()) {
      const ParsedInternalKey ikey = c_iter.ikey();
      const Slice value = c_iter.value();
      new_first_seqno =
          ikey.sequence < new_first_seqno ? ikey.sequence : new_first_seqno;

      // Should we update "OldestKeyTime"? The timestamp still appears
      // to be an "experimental" feature.
      s = new_mem->Add(
          ikey.sequence, ikey.type, ikey.user_key, value,
          nullptr,   // KV protection info set as nullptr since it
                     // should only be useful for the first add to
                     // the original memtable.
          false,     // allow concurrent_memtable_writes_
                     // (not seen as necessary for now).
          nullptr,   // get_post_process_info(m) must be nullptr
                     // when concurrent_memtable_writes is switched off.
          nullptr);  // hint, only used when concurrent_memtable_writes_
                     // is switched on.
      if (!s.ok()) {
        break;
      }

      // If new_mem has a size greater than maxSize,
      // then roll back to the regular flush operation
      // and destroy new_mem.
      if (new_mem->ApproximateMemoryUsage() > maxSize) {
        s = Status::Aborted("Mempurge filled more than one memtable.");
        new_mem_capacity = 1.0;
        break;
      }
    }

    // Check status and propagate
    // potential error status from c_iter
    if (!s.ok()) {
      c_iter.status().PermitUncheckedError();
    } else if (!c_iter.status().ok()) {
      s = c_iter.status();
    }

    // Range tombstone transfer.
    if (s.ok()) {
      auto range_del_it = range_del_agg->NewIterator();
      for (range_del_it->SeekToFirst(); range_del_it->Valid();
           range_del_it->Next()) {
        auto tombstone = range_del_it->Tombstone();
        new_first_seqno =
            tombstone.seq_ < new_first_seqno ? tombstone.seq_ : new_first_seqno;
        s = new_mem->Add(
            tombstone.seq_,        // Sequence number
            kTypeRangeDeletion,    // KV type
            tombstone.start_key_,  // Key is start key.
            tombstone.end_key_,    // Value is end key.
            nullptr,               // KV protection info set as nullptr since it
                                   // should only be useful for the first add to
                                   // the original memtable.
            false,                 // allow concurrent_memtable_writes_
                                   // (not seen as necessary for now).
            nullptr,               // get_post_process_info(m) must be nullptr
                                   // when concurrent_memtable_writes is
                                   // switched off.
            nullptr);  // hint, only used when concurrent_memtable_writes_
                       // is switched on.

        if (!s.ok()) {
          break;
        }

        // If new_mem has a size greater than maxSize,
        // then roll back to the regular flush operation
        // and destroy new_mem.
        if (new_mem->ApproximateMemoryUsage() > maxSize) {
          s = Status::Aborted(Slice("Mempurge filled more than one memtable."));
          new_mem_capacity = 1.0;
          break;
        }
      }
    }


    // If everything happened smoothly and new_mem contains valid data,
    // decide whether it is flushed to storage or kept in the imm()
    // memtable list (in memory).
    if (s.ok() && (new_first_seqno != kMaxSequenceNumber)) {
      // Rectify the first sequence number, which (unlike the earliest seq
      // number) needs to be present in the new memtable.
      new_mem->SetFirstSequenceNumber(new_first_seqno);

      // The new_mem is added to the list of immutable memtables
      // only if it is filled to less than 100% capacity and isn't flagged
      // as needing to be flushed.
      if (new_mem->ApproximateMemoryUsage() < maxSize &&
          !(new_mem->ShouldFlushNow())) {
        db_mutex_->Lock();
        uint64_t new_mem_id = mems_[0]->GetID();

        new_mem->SetID(new_mem_id);

        // This addition will not trigger another flush, because
        // we do not call SchedulePendingFlush().
        cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free);
        new_mem->Ref();
#ifndef ROCKSDB_LITE
        // Piggyback FlushJobInfo on the first flushed memtable.
        db_mutex_->AssertHeld();
        meta_.fd.file_size = 0;
        mems_[0]->SetFlushJobInfo(GetFlushJobInfo());
#endif  // !ROCKSDB_LITE
        db_mutex_->Unlock();
      } else {
        s = Status::Aborted(Slice("Mempurge filled more than one memtable."));
        new_mem_capacity = 1.0;
        if (new_mem) {
          job_context_->memtables_to_free.push_back(new_mem);
        }
      }
    } else {
      // In this case, the newly allocated new_mem is empty.
      assert(new_mem != nullptr);
      job_context_->memtables_to_free.push_back(new_mem);
    }
  }

  // Reacquire the mutex for WriteLevel0 function.
  db_mutex_->Lock();

  // If mempurge successful, don't write input tables to level0,
  // but write any full output table to level0.
  if (s.ok()) {
    TEST_SYNC_POINT("DBImpl::FlushJob:MemPurgeSuccessful");
  } else {
    TEST_SYNC_POINT("DBImpl::FlushJob:MemPurgeUnsuccessful");
  }
  const uint64_t micros = clock_->NowMicros() - start_micros;
  const uint64_t cpu_micros = clock_->CPUNanos() / 1000 - start_cpu_micros;
  ROCKS_LOG_INFO(db_options_.info_log,
                 "[%s] [JOB %d] Mempurge lasted %" PRIu64
                 " microseconds, and %" PRIu64
                 " cpu "
                 "microseconds. Status is %s ok. Perc capacity: %f\n",
                 cfd_->GetName().c_str(), job_context_->job_id, micros,
                 cpu_micros, s.ok() ? "" : "not", new_mem_capacity);

  return s;
}

bool FlushJob::MemPurgeDecider() {
  double threshold = db_options_.experimental_mempurge_threshold;
  // Never trigger mempurge if threshold is not a strictly positive value.
  if (!(threshold > 0.0)) {
    return false;
  }
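  // If the threshold exceeds the number of memtables picked for this flush,
  // skip sampling and always recommend a mempurge.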
  if (threshold > (1.0 * mems_.size())) {
    return true;
  }
  // Payload and useful_payload (in bytes).
  // The useful payload ratio of a given MemTable
  // is estimated to be useful_payload/payload.
  uint64_t payload = 0, useful_payload = 0, entry_size = 0;

  // Local variables used repetitively inside the for-loop
  // when iterating over the sampled entries.
  Slice key_slice, value_slice;
  ParsedInternalKey res;
  SnapshotImpl min_snapshot;
  std::string vget;
  Status mget_s, parse_s;
  MergeContext merge_context;
  SequenceNumber max_covering_tombstone_seq = 0, sqno = 0,
                 min_seqno_snapshot = 0;
  bool get_res, can_be_useful_payload, not_in_next_mems;

  // If the estimated fraction of useful payload exceeds the threshold,
  // then flush to storage, else MemPurge.
  double estimated_useful_payload = 0.0;
  // Cochran formula for determining sample size.
  // 95% confidence interval, 7% precision.
  //    n0 = (1.96*1.96)*0.25/(0.07*0.07) = 196.0
  double n0 = 196.0;
  ReadOptions ro;
  ro.total_order_seek = true;

  // Iterate over each memtable of the set.
  for (auto mem_iter = std::begin(mems_); mem_iter != std::end(mems_);
       mem_iter++) {
    MemTable* mt = *mem_iter;

    // Sample entries from the memtable.
    uint64_t nentries = mt->num_entries();
    // Corrected Cochran formula for small populations
    // (converges to n0 for large populations).
    uint64_t target_sample_size =
        static_cast<uint64_t>(ceil(n0 / (1.0 + (n0 / nentries))));
    std::unordered_set<const char*> sentries = {};
    // Populate sample entries set.
    mt->UniqueRandomSample(target_sample_size, &sentries);

    // Estimate the garbage ratio by checking whether
    // each sample corresponds to a valid entry.
    for (const char* ss : sentries) {
      key_slice = GetLengthPrefixedSlice(ss);
      parse_s = ParseInternalKey(key_slice, &res, true /*log_err_key*/);
      if (!parse_s.ok()) {
        ROCKS_LOG_WARN(db_options_.info_log,
                       "Memtable Decider: ParseInternalKey did not parse "
                       "key_slice %s successfully.",
                       key_slice.data());
      }

      // Size of the entry is "key size (+ value size if KV entry)".
      entry_size = key_slice.size();
      if (res.type == kTypeValue) {
        value_slice =
            GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
        entry_size += value_slice.size();
      }

      // Count entry bytes as payload.
      payload += entry_size;

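      // Look the sampled user key up at kMaxSequenceNumber so the most
      // recent entry for that key in this memtable is the one retrieved.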
      LookupKey lkey(res.user_key, kMaxSequenceNumber);

      // Paranoia: zero out these values just in case.
      max_covering_tombstone_seq = 0;
      sqno = 0;

      // Pick the oldest existing snapshot that is more recent
      // than the sequence number of the sampled entry.
      min_seqno_snapshot = kMaxSequenceNumber;
      for (SequenceNumber seq_num : existing_snapshots_) {
        if (seq_num > res.sequence && seq_num < min_seqno_snapshot) {
          min_seqno_snapshot = seq_num;
        }
      }
      min_snapshot.number_ = min_seqno_snapshot;
      ro.snapshot =
          min_seqno_snapshot < kMaxSequenceNumber ? &min_snapshot : nullptr;

      // Estimate if the sample entry is valid or not.
      get_res = mt->Get(lkey, &vget, nullptr, &mget_s, &merge_context,
                        &max_covering_tombstone_seq, &sqno, ro);
      if (!get_res) {
        ROCKS_LOG_WARN(
            db_options_.info_log,
            "Memtable Get returned false when Get(sampled entry). "
            "Yet each sample entry should exist somewhere in the memtable, "
            "unrelated to whether it has been deleted or not.");
      }

      // TODO(bjlemaire): evaluate typeMerge.
      // This is where the sampled entry is estimated to be
      // garbage or not. Note that this is a garbage *estimation*
      // because we do not account for certain items, such as
      // CompactionFilters triggered at flush, or the same delete
      // having been inserted twice or more into the memtable.

      // Evaluate if the entry can be useful payload.
      // Situation #1: entry is a KV entry, was found in the memtable mt,
      //               and the sequence numbers match.
      can_be_useful_payload = (res.type == kTypeValue) && get_res &&
                              mget_s.ok() && (sqno == res.sequence);

      // Situation #2: entry is a delete entry, was found in the memtable mt
      //               (because get_res==true) and no valid KV entry is found.
      //               (note: duplicate delete entries are also taken into
      //               account here, because the sequence number 'sqno'
      //               in the memtable->Get(&sqno) operation is set to be equal
      //               to the most recent delete entry as well).
      can_be_useful_payload |=
          ((res.type == kTypeDeletion) || (res.type == kTypeSingleDeletion)) &&
          mget_s.IsNotFound() && get_res && (sqno == res.sequence);

      // If there is a chance that the entry is useful payload,
      // verify that the entry does not appear in the following memtables
      // (memtables with greater memtable ID/larger sequence numbers).
      if (can_be_useful_payload) {
        not_in_next_mems = true;
        for (auto next_mem_iter = mem_iter + 1;
             next_mem_iter != std::end(mems_); next_mem_iter++) {
          if ((*next_mem_iter)
                  ->Get(lkey, &vget, nullptr, &mget_s, &merge_context,
                        &max_covering_tombstone_seq, &sqno, ro)) {
            not_in_next_mems = false;
            break;
          }
        }
        if (not_in_next_mems) {
          useful_payload += entry_size;
        }
      }
    }
    if (payload > 0) {
      // We use the estimated useful payload ratio to
      // evaluate how many of the memtable bytes are useful bytes.
      estimated_useful_payload +=
          (mt->ApproximateMemoryUsage()) * (useful_payload * 1.0 / payload);

      ROCKS_LOG_INFO(
          db_options_.info_log,
          "Mempurge sampling - found garbage ratio from sampling: %f.\n",
          (payload - useful_payload) * 1.0 / payload);
    } else {
      ROCKS_LOG_WARN(db_options_.info_log,
                     "Mempurge sampling: null payload measured, and collected "
                     "sample size is %zu\n.",
                     sentries.size());
    }
  }
  // We convert the total number of useful payload bytes
  // into the proportion of a memtable necessary to store all these bytes.
  // We compare this proportion with the threshold value.
  return ((estimated_useful_payload / mutable_cf_options_.write_buffer_size) <
          threshold);
}

Status FlushJob::WriteLevel0Table() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_FLUSH_WRITE_L0);
  db_mutex_->AssertHeld();
  const uint64_t start_micros = clock_->NowMicros();
  const uint64_t start_cpu_micros = clock_->CPUNanos() / 1000;
  Status s;

  std::vector<BlobFileAddition> blob_file_additions;

  {
    auto write_hint = cfd_->CalculateSSTWriteHint(0);
    db_mutex_->Unlock();
    if (log_buffer_) {
      log_buffer_->FlushBufferToLog();
    }
    // memtables and range_del_iters store internal iterators over each data
    // memtable and its associated range deletion memtable, respectively, at
    // corresponding indexes.
    std::vector<InternalIterator*> memtables;
    std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
        range_del_iters;
    ReadOptions ro;
    ro.total_order_seek = true;
    Arena arena;
    uint64_t total_num_entries = 0, total_num_deletes = 0;
    uint64_t total_data_size = 0;
    size_t total_memory_usage = 0;
    for (MemTable* m : mems_) {
      ROCKS_LOG_INFO(
          db_options_.info_log,
          "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
          cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
      memtables.push_back(m->NewIterator(ro, &arena));
      auto* range_del_iter =
          m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
      if (range_del_iter != nullptr) {
        range_del_iters.emplace_back(range_del_iter);
      }
      total_num_entries += m->num_entries();
      total_num_deletes += m->num_deletes();
      total_data_size += m->get_data_size();
      total_memory_usage += m->ApproximateMemoryUsage();
    }

    event_logger_->Log() << "job" << job_context_->job_id << "event"
                         << "flush_started"
                         << "num_memtables" << mems_.size() << "num_entries"
                         << total_num_entries << "num_deletes"
                         << total_num_deletes << "total_data_size"
                         << total_data_size << "memory_usage"
                         << total_memory_usage << "flush_reason"
                         << GetFlushReasonString(cfd_->GetFlushReason());

    {
      ScopedArenaIterator iter(
          NewMergingIterator(&cfd_->internal_comparator(), memtables.data(),
                             static_cast<int>(memtables.size()), &arena));
      ROCKS_LOG_INFO(db_options_.info_log,
                     "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
                     cfd_->GetName().c_str(), job_context_->job_id,
                     meta_.fd.GetNumber());

      TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
                               &output_compression_);
      int64_t _current_time = 0;
      auto status = clock_->GetCurrentTime(&_current_time);
      // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
      if (!status.ok()) {
        ROCKS_LOG_WARN(
            db_options_.info_log,
            "Failed to get current time to populate creation_time property. "
            "Status: %s",
            status.ToString().c_str());
      }
      const uint64_t current_time = static_cast<uint64_t>(_current_time);

      uint64_t oldest_key_time =
          mems_.front()->ApproximateOldestKeyTime();

      // It's not clear whether oldest_key_time is always available. In case
      // it is not available, use current_time.
      uint64_t oldest_ancester_time = std::min(current_time, oldest_key_time);

      TEST_SYNC_POINT_CALLBACK(
          "FlushJob::WriteLevel0Table:oldest_ancester_time",
          &oldest_ancester_time);
      meta_.oldest_ancester_time = oldest_ancester_time;

      meta_.file_creation_time = current_time;

      uint64_t creation_time = (cfd_->ioptions()->compaction_style ==
                                CompactionStyle::kCompactionStyleFIFO)
                                   ? current_time
                                   : meta_.oldest_ancester_time;

      uint64_t num_input_entries = 0;
      uint64_t memtable_payload_bytes = 0;
      uint64_t memtable_garbage_bytes = 0;
      IOStatus io_s;
      const std::string* const full_history_ts_low =
          (full_history_ts_low_.empty()) ? nullptr : &full_history_ts_low_;
      TableBuilderOptions tboptions(
          *cfd_->ioptions(), mutable_cf_options_, cfd_->internal_comparator(),
          cfd_->int_tbl_prop_collector_factories(), output_compression_,
          mutable_cf_options_.compression_opts, cfd_->GetID(), cfd_->GetName(),
          0 /* level */, false /* is_bottommost */,
          TableFileCreationReason::kFlush, creation_time, oldest_key_time,
          current_time, db_id_, db_session_id_, 0 /* target_file_size */,
          meta_.fd.GetNumber());
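      // BuildTable scans the merged memtable iterator (plus range tombstones)
      // and writes the new level-0 SST file and any blob files, filling in
      // meta_, the table properties, and the detailed I/O status.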
      s = BuildTable(
          dbname_, versions_, db_options_, tboptions, file_options_,
          cfd_->table_cache(), iter.get(), std::move(range_del_iters), &meta_,
          &blob_file_additions, existing_snapshots_,
          earliest_write_conflict_snapshot_, snapshot_checker_,
          mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
          &io_s, io_tracer_, BlobFileCreationReason::kFlush, event_logger_,
          job_context_->job_id, Env::IO_HIGH, &table_properties_, write_hint,
          full_history_ts_low, blob_callback_, &num_input_entries,
          &memtable_payload_bytes, &memtable_garbage_bytes);
      if (!io_s.ok()) {
        io_status_ = io_s;
      }
      if (num_input_entries != total_num_entries && s.ok()) {
        std::string msg = "Expected " + ToString(total_num_entries) +
                          " entries in memtables, but read " +
                          ToString(num_input_entries);
        ROCKS_LOG_WARN(db_options_.info_log, "[%s] [JOB %d] Level-0 flush %s",
                       cfd_->GetName().c_str(), job_context_->job_id,
                       msg.c_str());
        if (db_options_.flush_verify_memtable_count) {
          s = Status::Corruption(msg);
        }
      }
      if (tboptions.reason == TableFileCreationReason::kFlush) {
        TEST_SYNC_POINT("DBImpl::FlushJob:Flush");
        RecordTick(stats_, MEMTABLE_PAYLOAD_BYTES_AT_FLUSH,
                   memtable_payload_bytes);
        RecordTick(stats_, MEMTABLE_GARBAGE_BYTES_AT_FLUSH,
                   memtable_garbage_bytes);
      }
      LogFlush(db_options_.info_log);
    }
    ROCKS_LOG_INFO(db_options_.info_log,
                   "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
                   " bytes %s"
                   "%s",
                   cfd_->GetName().c_str(), job_context_->job_id,
                   meta_.fd.GetNumber(), meta_.fd.GetFileSize(),
                   s.ToString().c_str(),
                   meta_.marked_for_compaction ? " (needs compaction)" : "");

    if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) {
      s = output_file_directory_->Fsync(IOOptions(), nullptr);
    }
    TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_);
    db_mutex_->Lock();
  }
  base_->Unref();

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  const bool has_output = meta_.fd.GetFileSize() > 0;

  if (s.ok() && has_output) {
    TEST_SYNC_POINT("DBImpl::FlushJob:SSTFileCreated");
    // if we have more than 1 background thread, then we cannot
    // insert files directly into higher levels because some other
    // threads could be concurrently producing compacted files for
    // that key range.
    // Add file to L0
    edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
                   meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
                   meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
                   meta_.marked_for_compaction, meta_.oldest_blob_file_number,
                   meta_.oldest_ancester_time, meta_.file_creation_time,
                   meta_.file_checksum, meta_.file_checksum_func_name);

    edit_->SetBlobFileAdditions(std::move(blob_file_additions));
  }
#ifndef ROCKSDB_LITE
  // Piggyback FlushJobInfo on the first flushed memtable.
  mems_[0]->SetFlushJobInfo(GetFlushJobInfo());
#endif  // !ROCKSDB_LITE

  // Note that here we treat flush as a level-0 compaction in internal stats.
  InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
  const uint64_t micros = clock_->NowMicros() - start_micros;
  const uint64_t cpu_micros = clock_->CPUNanos() / 1000 - start_cpu_micros;
  stats.micros = micros;
  stats.cpu_micros = cpu_micros;

  ROCKS_LOG_INFO(db_options_.info_log,
                 "[%s] [JOB %d] Flush lasted %" PRIu64
                 " microseconds, and %" PRIu64 " cpu microseconds.\n",
                 cfd_->GetName().c_str(), job_context_->job_id, micros,
                 cpu_micros);

  if (has_output) {
    stats.bytes_written = meta_.fd.GetFileSize();
    stats.num_output_files = 1;
  }

  const auto& blobs = edit_->GetBlobFileAdditions();
  for (const auto& blob : blobs) {
    stats.bytes_written_blob += blob.GetTotalBlobBytes();
  }

  stats.num_output_files_blob = static_cast<int>(blobs.size());

  RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros);
  cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats);
  cfd_->internal_stats()->AddCFStats(
      InternalStats::BYTES_FLUSHED,
      stats.bytes_written + stats.bytes_written_blob);
  RecordFlushIOStats();

  return s;
}

#ifndef ROCKSDB_LITE
std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
  db_mutex_->AssertHeld();
  std::unique_ptr<FlushJobInfo> info(new FlushJobInfo{});
  info->cf_id = cfd_->GetID();
  info->cf_name = cfd_->GetName();

  const uint64_t file_number = meta_.fd.GetNumber();
  info->file_path =
      MakeTableFileName(cfd_->ioptions()->cf_paths[0].path, file_number);
  info->file_number = file_number;
  info->oldest_blob_file_number = meta_.oldest_blob_file_number;
  info->thread_id = db_options_.env->GetThreadID();
  info->job_id = job_context_->job_id;
  info->smallest_seqno = meta_.fd.smallest_seqno;
  info->largest_seqno = meta_.fd.largest_seqno;
  info->table_properties = table_properties_;
  info->flush_reason = cfd_->GetFlushReason();
  info->blob_compression_type = mutable_cf_options_.blob_compression_type;

  // Update BlobFilesInfo.
  for (const auto& blob_file : edit_->GetBlobFileAdditions()) {
    BlobFileAdditionInfo blob_file_addition_info(
        BlobFileName(cfd_->ioptions()->cf_paths.front().path,
                     blob_file.GetBlobFileNumber()) /*blob_file_path*/,
        blob_file.GetBlobFileNumber(), blob_file.GetTotalBlobCount(),
        blob_file.GetTotalBlobBytes());
    info->blob_file_addition_infos.emplace_back(
        std::move(blob_file_addition_info));
  }
  return info;
}

#endif  // !ROCKSDB_LITE

}  // namespace ROCKSDB_NAMESPACE