//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/flush_job.h"

#include <cinttypes>

#include <algorithm>
#include <vector>

#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/event_helpers.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/version_set.h"
#include "file/file_util.h"
#include "file/filename.h"
#include "logging/event_logger.h"
#include "logging/log_buffer.h"
#include "logging/logging.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_util.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/merging_iterator.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"

namespace rocksdb {

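// Maps a FlushReason value to a human-readable string for logging and for
// the "flush_started" event emitted below.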
const char* GetFlushReasonString(FlushReason flush_reason) {
  switch (flush_reason) {
    case FlushReason::kOthers:
      return "Other Reasons";
    case FlushReason::kGetLiveFiles:
      return "Get Live Files";
    case FlushReason::kShutDown:
      return "Shut down";
    case FlushReason::kExternalFileIngestion:
      return "External File Ingestion";
    case FlushReason::kManualCompaction:
      return "Manual Compaction";
    case FlushReason::kWriteBufferManager:
      return "Write Buffer Manager";
    case FlushReason::kWriteBufferFull:
      return "Write Buffer Full";
    case FlushReason::kTest:
      return "Test";
    case FlushReason::kDeleteFiles:
      return "Delete Files";
    case FlushReason::kAutoCompaction:
      return "Auto Compaction";
    case FlushReason::kManualFlush:
      return "Manual Flush";
    case FlushReason::kErrorRecovery:
      return "Error Recovery";
    default:
      return "Invalid";
  }
}

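// A FlushJob converts one or more immutable memtables of a single column
// family into a level-0 SST file. The expected call sequence (a sketch,
// inferred from the assertions in this file; the real call sites live in
// DBImpl) is:
//
//   FlushJob job(...);        // only reports thread status
//   job.PickMemTable();       // under the DB mutex; picks memtables,
//                             // refs the current version
//   Status s = job.Run(...);  // writes the L0 file, installs the result
//   // or, if the job is abandoned after PickMemTable():
//   job.Cancel();             // releases the version reference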
FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
                   const ImmutableDBOptions& db_options,
                   const MutableCFOptions& mutable_cf_options,
                   const uint64_t* max_memtable_id,
                   const FileOptions& file_options, VersionSet* versions,
                   InstrumentedMutex* db_mutex,
                   std::atomic<bool>* shutting_down,
                   std::vector<SequenceNumber> existing_snapshots,
                   SequenceNumber earliest_write_conflict_snapshot,
                   SnapshotChecker* snapshot_checker, JobContext* job_context,
                   LogBuffer* log_buffer, Directory* db_directory,
                   Directory* output_file_directory,
                   CompressionType output_compression, Statistics* stats,
                   EventLogger* event_logger, bool measure_io_stats,
                   const bool sync_output_directory, const bool write_manifest,
                   Env::Priority thread_pri)
    : dbname_(dbname),
      cfd_(cfd),
      db_options_(db_options),
      mutable_cf_options_(mutable_cf_options),
      max_memtable_id_(max_memtable_id),
      file_options_(file_options),
      versions_(versions),
      db_mutex_(db_mutex),
      shutting_down_(shutting_down),
      existing_snapshots_(std::move(existing_snapshots)),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      snapshot_checker_(snapshot_checker),
      job_context_(job_context),
      log_buffer_(log_buffer),
      db_directory_(db_directory),
      output_file_directory_(output_file_directory),
      output_compression_(output_compression),
      stats_(stats),
      event_logger_(event_logger),
      measure_io_stats_(measure_io_stats),
      sync_output_directory_(sync_output_directory),
      write_manifest_(write_manifest),
      edit_(nullptr),
      base_(nullptr),
      pick_memtable_called(false),
      thread_pri_(thread_pri) {
  // Update the thread status to indicate flush.
  ReportStartedFlush();
  TEST_SYNC_POINT("FlushJob::FlushJob()");
}

FlushJob::~FlushJob() {
  ThreadStatusUtil::ResetThreadStatus();
}

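// Associates the current thread with the column family being flushed and
// marks its operation as OP_FLUSH so the work is visible in thread status
// reports.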
void FlushJob::ReportStartedFlush() {
  ThreadStatusUtil::SetColumnFamily(cfd_, cfd_->ioptions()->env,
                                    db_options_.enable_thread_tracking);
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_JOB_ID,
      job_context_->job_id);
  IOSTATS_RESET(bytes_written);
}

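// Records the approximate memory footprint of the memtables picked for this
// flush as the job's input size in the thread status.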
void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
  uint64_t input_size = 0;
  for (auto* mem : mems) {
    input_size += mem->ApproximateMemoryUsage();
  }
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::FLUSH_BYTES_MEMTABLES,
      input_size);
}

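// Folds the bytes written by this thread since the last reset into the flush
// statistics and thread status, then resets the per-thread counter so later
// calls only account for new writes.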
void FlushJob::RecordFlushIOStats() {
  RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written));
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
  IOSTATS_RESET(bytes_written);
}

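// Picks the immutable memtables to flush (up to *max_memtable_id_), prepares
// the VersionEdit that will describe the result, and takes a reference on
// the current version. Must be called exactly once, with the DB mutex held,
// before Run().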
void FlushJob::PickMemTable() {
  db_mutex_->AssertHeld();
  assert(!pick_memtable_called);
  pick_memtable_called = true;
  // Save the contents of the earliest memtable as a new Table
  cfd_->imm()->PickMemtablesToFlush(max_memtable_id_, &mems_);
  if (mems_.empty()) {
    return;
  }

  ReportFlushInputSize(mems_);

  // The memtables in mems_ are (implicitly) sorted in ascending order by
  // their creation time. We will use the first memtable's `edit` to keep the
  // meta info for this flush.
  MemTable* m = mems_[0];
  edit_ = m->GetEdits();
  edit_->SetPrevLogNumber(0);
  // SetLogNumber(log_num) indicates logs with number smaller than log_num
  // will no longer be picked up for recovery.
  edit_->SetLogNumber(mems_.back()->GetNextLogNumber());
  edit_->SetColumnFamily(cfd_->GetID());

  // path 0 for level 0 file.
  meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);

  base_ = cfd_->current();
  base_->Ref();  // it is likely that we do not need this reference
}

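// Runs the flush: writes the L0 table (releasing and re-acquiring the DB
// mutex around the I/O), installs the result in the manifest when
// write_manifest_ is set, and emits a "flush_finished" event. Requires
// PickMemTable() to have been called and the DB mutex to be held.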
Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
                     FileMetaData* file_meta) {
  TEST_SYNC_POINT("FlushJob::Start");
  db_mutex_->AssertHeld();
  assert(pick_memtable_called);
  AutoThreadOperationStageUpdater stage_run(
      ThreadStatus::STAGE_FLUSH_RUN);
  if (mems_.empty()) {
    ROCKS_LOG_BUFFER(log_buffer_, "[%s] Nothing in memtable to flush",
                     cfd_->GetName().c_str());
    return Status::OK();
  }

  // I/O measurement variables
  PerfLevel prev_perf_level = PerfLevel::kEnableTime;
  uint64_t prev_write_nanos = 0;
  uint64_t prev_fsync_nanos = 0;
  uint64_t prev_range_sync_nanos = 0;
  uint64_t prev_prepare_write_nanos = 0;
  uint64_t prev_cpu_write_nanos = 0;
  uint64_t prev_cpu_read_nanos = 0;
  if (measure_io_stats_) {
    prev_perf_level = GetPerfLevel();
    SetPerfLevel(PerfLevel::kEnableTime);
    prev_write_nanos = IOSTATS(write_nanos);
    prev_fsync_nanos = IOSTATS(fsync_nanos);
    prev_range_sync_nanos = IOSTATS(range_sync_nanos);
    prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
    prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
    prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
  }

  // This will release and re-acquire the mutex.
  Status s = WriteLevel0Table();

  if (s.ok() && cfd_->IsDropped()) {
    s = Status::ColumnFamilyDropped("Column family dropped during flush");
  }
  if ((s.ok() || s.IsColumnFamilyDropped()) &&
      shutting_down_->load(std::memory_order_acquire)) {
    s = Status::ShutdownInProgress("Database shutdown");
  }

  if (!s.ok()) {
    cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
  } else if (write_manifest_) {
    TEST_SYNC_POINT("FlushJob::InstallResults");
    // Replace immutable memtable with the generated Table
    s = cfd_->imm()->TryInstallMemtableFlushResults(
        cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
        meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
        log_buffer_, &committed_flush_jobs_info_);
  }

  if (s.ok() && file_meta != nullptr) {
    *file_meta = meta_;
  }
  RecordFlushIOStats();

  // When measure_io_stats_ is true, the default 512 bytes is not enough.
  auto stream = event_logger_->LogToBuffer(log_buffer_, 1024);
  stream << "job" << job_context_->job_id << "event"
         << "flush_finished";
  stream << "output_compression"
         << CompressionTypeToString(output_compression_);
  stream << "lsm_state";
  stream.StartArray();
  auto vstorage = cfd_->current()->storage_info();
  for (int level = 0; level < vstorage->num_levels(); ++level) {
    stream << vstorage->NumLevelFiles(level);
  }
  stream.EndArray();
  stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed();

  if (measure_io_stats_) {
    if (prev_perf_level != PerfLevel::kEnableTime) {
      SetPerfLevel(prev_perf_level);
    }
    stream << "file_write_nanos" << (IOSTATS(write_nanos) - prev_write_nanos);
    stream << "file_range_sync_nanos"
           << (IOSTATS(range_sync_nanos) - prev_range_sync_nanos);
    stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - prev_fsync_nanos);
    stream << "file_prepare_write_nanos"
           << (IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos);
    stream << "file_cpu_write_nanos"
           << (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos);
    stream << "file_cpu_read_nanos"
           << (IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos);
  }

  return s;
}

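// Releases the version reference taken in PickMemTable() when the flush
// result will not be installed. Must be called with the DB mutex held.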
void FlushJob::Cancel() {
  db_mutex_->AssertHeld();
  assert(base_ != nullptr);
  base_->Unref();
}

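// Builds the new L0 SST file from the picked memtables. The DB mutex is
// released while the file is written and re-acquired before returning; on
// success the new file is recorded in edit_ and flush statistics are updated.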
Status FlushJob::WriteLevel0Table() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_FLUSH_WRITE_L0);
  db_mutex_->AssertHeld();
  const uint64_t start_micros = db_options_.env->NowMicros();
  const uint64_t start_cpu_micros = db_options_.env->NowCPUNanos() / 1000;
  Status s;
  {
    auto write_hint = cfd_->CalculateSSTWriteHint(0);
    db_mutex_->Unlock();
    if (log_buffer_) {
      log_buffer_->FlushBufferToLog();
    }
    // memtables and range_del_iters store internal iterators over each data
    // memtable and its associated range deletion memtable, respectively, at
    // corresponding indexes.
    std::vector<InternalIterator*> memtables;
    std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
        range_del_iters;
    ReadOptions ro;
    ro.total_order_seek = true;
    Arena arena;
    uint64_t total_num_entries = 0, total_num_deletes = 0;
    uint64_t total_data_size = 0;
    size_t total_memory_usage = 0;
    for (MemTable* m : mems_) {
      ROCKS_LOG_INFO(
          db_options_.info_log,
          "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
          cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
      memtables.push_back(m->NewIterator(ro, &arena));
      auto* range_del_iter =
          m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
      if (range_del_iter != nullptr) {
        range_del_iters.emplace_back(range_del_iter);
      }
      total_num_entries += m->num_entries();
      total_num_deletes += m->num_deletes();
      total_data_size += m->get_data_size();
      total_memory_usage += m->ApproximateMemoryUsage();
    }

    event_logger_->Log() << "job" << job_context_->job_id << "event"
                         << "flush_started"
                         << "num_memtables" << mems_.size() << "num_entries"
                         << total_num_entries << "num_deletes"
                         << total_num_deletes << "total_data_size"
                         << total_data_size << "memory_usage"
                         << total_memory_usage << "flush_reason"
                         << GetFlushReasonString(cfd_->GetFlushReason());

    {
      ScopedArenaIterator iter(
          NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
                             static_cast<int>(memtables.size()), &arena));
      ROCKS_LOG_INFO(db_options_.info_log,
                     "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
                     cfd_->GetName().c_str(), job_context_->job_id,
                     meta_.fd.GetNumber());

      TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
                               &output_compression_);
      int64_t _current_time = 0;
      auto status = db_options_.env->GetCurrentTime(&_current_time);
      // It is safe to proceed even if GetCurrentTime fails, so just log the
      // failure and continue.
      if (!status.ok()) {
        ROCKS_LOG_WARN(
            db_options_.info_log,
            "Failed to get current time to populate creation_time property. "
            "Status: %s",
            status.ToString().c_str());
      }
      const uint64_t current_time = static_cast<uint64_t>(_current_time);

      uint64_t oldest_key_time =
          mems_.front()->ApproximateOldestKeyTime();

      // It's not clear whether oldest_key_time is always available. In case
      // it is not available, use current_time.
      meta_.oldest_ancester_time = std::min(current_time, oldest_key_time);
      meta_.file_creation_time = current_time;

      s = BuildTable(
          dbname_, db_options_.env, db_options_.fs.get(), *cfd_->ioptions(),
          mutable_cf_options_, file_options_, cfd_->table_cache(), iter.get(),
          std::move(range_del_iters), &meta_, cfd_->internal_comparator(),
          cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
          cfd_->GetName(), existing_snapshots_,
          earliest_write_conflict_snapshot_, snapshot_checker_,
          output_compression_, mutable_cf_options_.sample_for_compression,
          cfd_->ioptions()->compression_opts,
          mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
          TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
          Env::IO_HIGH, &table_properties_, 0 /* level */, current_time,
          oldest_key_time, write_hint, current_time);
      LogFlush(db_options_.info_log);
    }
    ROCKS_LOG_INFO(db_options_.info_log,
                   "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
                   " bytes %s"
                   "%s",
                   cfd_->GetName().c_str(), job_context_->job_id,
                   meta_.fd.GetNumber(), meta_.fd.GetFileSize(),
                   s.ToString().c_str(),
                   meta_.marked_for_compaction ? " (needs compaction)" : "");

    if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) {
      s = output_file_directory_->Fsync();
    }
    TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_);
    db_mutex_->Lock();
  }
  base_->Unref();

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  if (s.ok() && meta_.fd.GetFileSize() > 0) {
    // if we have more than 1 background thread, then we cannot
    // insert files directly into higher levels because some other
    // threads could be concurrently producing compacted files for
    // that key range.
    // Add file to L0
    edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
                   meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
                   meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
                   meta_.marked_for_compaction, meta_.oldest_blob_file_number,
                   meta_.oldest_ancester_time, meta_.file_creation_time);
  }
#ifndef ROCKSDB_LITE
  // Piggyback FlushJobInfo on the first flushed memtable.
  mems_[0]->SetFlushJobInfo(GetFlushJobInfo());
#endif  // !ROCKSDB_LITE

  // Note that here we treat flush as level 0 compaction in internal stats
  InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
  stats.micros = db_options_.env->NowMicros() - start_micros;
  stats.cpu_micros = db_options_.env->NowCPUNanos() / 1000 - start_cpu_micros;
  stats.bytes_written = meta_.fd.GetFileSize();
  RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros);
  cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats);
  cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
                                     meta_.fd.GetFileSize());
  RecordFlushIOStats();
  return s;
}

#ifndef ROCKSDB_LITE
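// Packages the metadata of the newly written L0 file (path, sequence number
// range, table properties, flush reason) into a FlushJobInfo for event
// listeners.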
std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
  db_mutex_->AssertHeld();
  std::unique_ptr<FlushJobInfo> info(new FlushJobInfo{});
  info->cf_id = cfd_->GetID();
  info->cf_name = cfd_->GetName();

  const uint64_t file_number = meta_.fd.GetNumber();
  info->file_path =
      MakeTableFileName(cfd_->ioptions()->cf_paths[0].path, file_number);
  info->file_number = file_number;
  info->oldest_blob_file_number = meta_.oldest_blob_file_number;
  info->thread_id = db_options_.env->GetThreadID();
  info->job_id = job_context_->job_id;
  info->smallest_seqno = meta_.fd.smallest_seqno;
  info->largest_seqno = meta_.fd.largest_seqno;
  info->table_properties = table_properties_;
  info->flush_reason = cfd_->GetFlushReason();
  return info;
}
#endif  // !ROCKSDB_LITE

}  // namespace rocksdb