1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/flush_job.h"
11
12 #include <cinttypes>
13
14 #include <algorithm>
15 #include <vector>
16
17 #include "db/builder.h"
18 #include "db/db_iter.h"
19 #include "db/dbformat.h"
20 #include "db/event_helpers.h"
21 #include "db/log_reader.h"
22 #include "db/log_writer.h"
23 #include "db/memtable.h"
24 #include "db/memtable_list.h"
25 #include "db/merge_context.h"
26 #include "db/range_tombstone_fragmenter.h"
27 #include "db/version_set.h"
28 #include "file/file_util.h"
29 #include "file/filename.h"
30 #include "logging/event_logger.h"
31 #include "logging/log_buffer.h"
32 #include "logging/logging.h"
33 #include "monitoring/iostats_context_imp.h"
34 #include "monitoring/perf_context_imp.h"
35 #include "monitoring/thread_status_util.h"
36 #include "port/port.h"
37 #include "rocksdb/db.h"
38 #include "rocksdb/env.h"
39 #include "rocksdb/statistics.h"
40 #include "rocksdb/status.h"
41 #include "rocksdb/table.h"
42 #include "table/block_based/block.h"
43 #include "table/block_based/block_based_table_factory.h"
44 #include "table/merging_iterator.h"
45 #include "table/table_builder.h"
46 #include "table/two_level_iterator.h"
47 #include "test_util/sync_point.h"
48 #include "util/coding.h"
49 #include "util/mutexlock.h"
50 #include "util/stop_watch.h"
51
52 namespace ROCKSDB_NAMESPACE {
53
GetFlushReasonString(FlushReason flush_reason)54 const char* GetFlushReasonString (FlushReason flush_reason) {
55 switch (flush_reason) {
56 case FlushReason::kOthers:
57 return "Other Reasons";
58 case FlushReason::kGetLiveFiles:
59 return "Get Live Files";
60 case FlushReason::kShutDown:
61 return "Shut down";
62 case FlushReason::kExternalFileIngestion:
63 return "External File Ingestion";
64 case FlushReason::kManualCompaction:
65 return "Manual Compaction";
66 case FlushReason::kWriteBufferManager:
67 return "Write Buffer Manager";
68 case FlushReason::kWriteBufferFull:
69 return "Write Buffer Full";
70 case FlushReason::kTest:
71 return "Test";
72 case FlushReason::kDeleteFiles:
73 return "Delete Files";
74 case FlushReason::kAutoCompaction:
75 return "Auto Compaction";
76 case FlushReason::kManualFlush:
77 return "Manual Flush";
78 case FlushReason::kErrorRecovery:
79 return "Error Recovery";
80 default:
81 return "Invalid";
82 }
83 }
84
FlushJob(const std::string & dbname,ColumnFamilyData * cfd,const ImmutableDBOptions & db_options,const MutableCFOptions & mutable_cf_options,const uint64_t * max_memtable_id,const FileOptions & file_options,VersionSet * versions,InstrumentedMutex * db_mutex,std::atomic<bool> * shutting_down,std::vector<SequenceNumber> existing_snapshots,SequenceNumber earliest_write_conflict_snapshot,SnapshotChecker * snapshot_checker,JobContext * job_context,LogBuffer * log_buffer,Directory * db_directory,Directory * output_file_directory,CompressionType output_compression,Statistics * stats,EventLogger * event_logger,bool measure_io_stats,const bool sync_output_directory,const bool write_manifest,Env::Priority thread_pri)85 FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
86 const ImmutableDBOptions& db_options,
87 const MutableCFOptions& mutable_cf_options,
88 const uint64_t* max_memtable_id,
89 const FileOptions& file_options, VersionSet* versions,
90 InstrumentedMutex* db_mutex,
91 std::atomic<bool>* shutting_down,
92 std::vector<SequenceNumber> existing_snapshots,
93 SequenceNumber earliest_write_conflict_snapshot,
94 SnapshotChecker* snapshot_checker, JobContext* job_context,
95 LogBuffer* log_buffer, Directory* db_directory,
96 Directory* output_file_directory,
97 CompressionType output_compression, Statistics* stats,
98 EventLogger* event_logger, bool measure_io_stats,
99 const bool sync_output_directory, const bool write_manifest,
100 Env::Priority thread_pri)
101 : dbname_(dbname),
102 cfd_(cfd),
103 db_options_(db_options),
104 mutable_cf_options_(mutable_cf_options),
105 max_memtable_id_(max_memtable_id),
106 file_options_(file_options),
107 versions_(versions),
108 db_mutex_(db_mutex),
109 shutting_down_(shutting_down),
110 existing_snapshots_(std::move(existing_snapshots)),
111 earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
112 snapshot_checker_(snapshot_checker),
113 job_context_(job_context),
114 log_buffer_(log_buffer),
115 db_directory_(db_directory),
116 output_file_directory_(output_file_directory),
117 output_compression_(output_compression),
118 stats_(stats),
119 event_logger_(event_logger),
120 measure_io_stats_(measure_io_stats),
121 sync_output_directory_(sync_output_directory),
122 write_manifest_(write_manifest),
123 edit_(nullptr),
124 base_(nullptr),
125 pick_memtable_called(false),
126 thread_pri_(thread_pri) {
127 // Update the thread status to indicate flush.
128 ReportStartedFlush();
129 TEST_SYNC_POINT("FlushJob::FlushJob()");
130 }
131
~FlushJob()132 FlushJob::~FlushJob() {
133 ThreadStatusUtil::ResetThreadStatus();
134 }
135
ReportStartedFlush()136 void FlushJob::ReportStartedFlush() {
137 ThreadStatusUtil::SetColumnFamily(cfd_, cfd_->ioptions()->env,
138 db_options_.enable_thread_tracking);
139 ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
140 ThreadStatusUtil::SetThreadOperationProperty(
141 ThreadStatus::COMPACTION_JOB_ID,
142 job_context_->job_id);
143 IOSTATS_RESET(bytes_written);
144 }
145
ReportFlushInputSize(const autovector<MemTable * > & mems)146 void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
147 uint64_t input_size = 0;
148 for (auto* mem : mems) {
149 input_size += mem->ApproximateMemoryUsage();
150 }
151 ThreadStatusUtil::IncreaseThreadOperationProperty(
152 ThreadStatus::FLUSH_BYTES_MEMTABLES,
153 input_size);
154 }
155
RecordFlushIOStats()156 void FlushJob::RecordFlushIOStats() {
157 RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written));
158 ThreadStatusUtil::IncreaseThreadOperationProperty(
159 ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
160 IOSTATS_RESET(bytes_written);
161 }
162
PickMemTable()163 void FlushJob::PickMemTable() {
164 db_mutex_->AssertHeld();
165 assert(!pick_memtable_called);
166 pick_memtable_called = true;
167 // Save the contents of the earliest memtable as a new Table
168 cfd_->imm()->PickMemtablesToFlush(max_memtable_id_, &mems_);
169 if (mems_.empty()) {
170 return;
171 }
172
173 ReportFlushInputSize(mems_);
174
175 // entries mems are (implicitly) sorted in ascending order by their created
176 // time. We will use the first memtable's `edit` to keep the meta info for
177 // this flush.
178 MemTable* m = mems_[0];
179 edit_ = m->GetEdits();
180 edit_->SetPrevLogNumber(0);
181 // SetLogNumber(log_num) indicates logs with number smaller than log_num
182 // will no longer be picked up for recovery.
183 edit_->SetLogNumber(mems_.back()->GetNextLogNumber());
184 edit_->SetColumnFamily(cfd_->GetID());
185
186 // path 0 for level 0 file.
187 meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
188
189 base_ = cfd_->current();
190 base_->Ref(); // it is likely that we do not need this reference
191 }
192
Run(LogsWithPrepTracker * prep_tracker,FileMetaData * file_meta)193 Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
194 FileMetaData* file_meta) {
195 TEST_SYNC_POINT("FlushJob::Start");
196 db_mutex_->AssertHeld();
197 assert(pick_memtable_called);
198 AutoThreadOperationStageUpdater stage_run(
199 ThreadStatus::STAGE_FLUSH_RUN);
200 if (mems_.empty()) {
201 ROCKS_LOG_BUFFER(log_buffer_, "[%s] Nothing in memtable to flush",
202 cfd_->GetName().c_str());
203 return Status::OK();
204 }
205
206 // I/O measurement variables
207 PerfLevel prev_perf_level = PerfLevel::kEnableTime;
208 uint64_t prev_write_nanos = 0;
209 uint64_t prev_fsync_nanos = 0;
210 uint64_t prev_range_sync_nanos = 0;
211 uint64_t prev_prepare_write_nanos = 0;
212 uint64_t prev_cpu_write_nanos = 0;
213 uint64_t prev_cpu_read_nanos = 0;
214 if (measure_io_stats_) {
215 prev_perf_level = GetPerfLevel();
216 SetPerfLevel(PerfLevel::kEnableTime);
217 prev_write_nanos = IOSTATS(write_nanos);
218 prev_fsync_nanos = IOSTATS(fsync_nanos);
219 prev_range_sync_nanos = IOSTATS(range_sync_nanos);
220 prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
221 prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
222 prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
223 }
224
225 // This will release and re-acquire the mutex.
226 Status s = WriteLevel0Table();
227
228 if (s.ok() && cfd_->IsDropped()) {
229 s = Status::ColumnFamilyDropped("Column family dropped during compaction");
230 }
231 if ((s.ok() || s.IsColumnFamilyDropped()) &&
232 shutting_down_->load(std::memory_order_acquire)) {
233 s = Status::ShutdownInProgress("Database shutdown");
234 }
235
236 if (!s.ok()) {
237 cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
238 } else if (write_manifest_) {
239 TEST_SYNC_POINT("FlushJob::InstallResults");
240 // Replace immutable memtable with the generated Table
241 s = cfd_->imm()->TryInstallMemtableFlushResults(
242 cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
243 meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
244 log_buffer_, &committed_flush_jobs_info_);
245 }
246
247 if (s.ok() && file_meta != nullptr) {
248 *file_meta = meta_;
249 }
250 RecordFlushIOStats();
251
252 // When measure_io_stats_ is true, the default 512 bytes is not enough.
253 auto stream = event_logger_->LogToBuffer(log_buffer_, 1024);
254 stream << "job" << job_context_->job_id << "event"
255 << "flush_finished";
256 stream << "output_compression"
257 << CompressionTypeToString(output_compression_);
258 stream << "lsm_state";
259 stream.StartArray();
260 auto vstorage = cfd_->current()->storage_info();
261 for (int level = 0; level < vstorage->num_levels(); ++level) {
262 stream << vstorage->NumLevelFiles(level);
263 }
264 stream.EndArray();
265 stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed();
266
267 if (measure_io_stats_) {
268 if (prev_perf_level != PerfLevel::kEnableTime) {
269 SetPerfLevel(prev_perf_level);
270 }
271 stream << "file_write_nanos" << (IOSTATS(write_nanos) - prev_write_nanos);
272 stream << "file_range_sync_nanos"
273 << (IOSTATS(range_sync_nanos) - prev_range_sync_nanos);
274 stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - prev_fsync_nanos);
275 stream << "file_prepare_write_nanos"
276 << (IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos);
277 stream << "file_cpu_write_nanos"
278 << (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos);
279 stream << "file_cpu_read_nanos"
280 << (IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos);
281 }
282
283 return s;
284 }
285
Cancel()286 void FlushJob::Cancel() {
287 db_mutex_->AssertHeld();
288 assert(base_ != nullptr);
289 base_->Unref();
290 }
291
WriteLevel0Table()292 Status FlushJob::WriteLevel0Table() {
293 AutoThreadOperationStageUpdater stage_updater(
294 ThreadStatus::STAGE_FLUSH_WRITE_L0);
295 db_mutex_->AssertHeld();
296 const uint64_t start_micros = db_options_.env->NowMicros();
297 const uint64_t start_cpu_micros = db_options_.env->NowCPUNanos() / 1000;
298 Status s;
299 {
300 auto write_hint = cfd_->CalculateSSTWriteHint(0);
301 db_mutex_->Unlock();
302 if (log_buffer_) {
303 log_buffer_->FlushBufferToLog();
304 }
305 // memtables and range_del_iters store internal iterators over each data
306 // memtable and its associated range deletion memtable, respectively, at
307 // corresponding indexes.
308 std::vector<InternalIterator*> memtables;
309 std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
310 range_del_iters;
311 ReadOptions ro;
312 ro.total_order_seek = true;
313 Arena arena;
314 uint64_t total_num_entries = 0, total_num_deletes = 0;
315 uint64_t total_data_size = 0;
316 size_t total_memory_usage = 0;
317 for (MemTable* m : mems_) {
318 ROCKS_LOG_INFO(
319 db_options_.info_log,
320 "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
321 cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
322 memtables.push_back(m->NewIterator(ro, &arena));
323 auto* range_del_iter =
324 m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
325 if (range_del_iter != nullptr) {
326 range_del_iters.emplace_back(range_del_iter);
327 }
328 total_num_entries += m->num_entries();
329 total_num_deletes += m->num_deletes();
330 total_data_size += m->get_data_size();
331 total_memory_usage += m->ApproximateMemoryUsage();
332 }
333
334 event_logger_->Log() << "job" << job_context_->job_id << "event"
335 << "flush_started"
336 << "num_memtables" << mems_.size() << "num_entries"
337 << total_num_entries << "num_deletes"
338 << total_num_deletes << "total_data_size"
339 << total_data_size << "memory_usage"
340 << total_memory_usage << "flush_reason"
341 << GetFlushReasonString(cfd_->GetFlushReason());
342
343 {
344 ScopedArenaIterator iter(
345 NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
346 static_cast<int>(memtables.size()), &arena));
347 ROCKS_LOG_INFO(db_options_.info_log,
348 "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
349 cfd_->GetName().c_str(), job_context_->job_id,
350 meta_.fd.GetNumber());
351
352 TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
353 &output_compression_);
354 int64_t _current_time = 0;
355 auto status = db_options_.env->GetCurrentTime(&_current_time);
356 // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
357 if (!status.ok()) {
358 ROCKS_LOG_WARN(
359 db_options_.info_log,
360 "Failed to get current time to populate creation_time property. "
361 "Status: %s",
362 status.ToString().c_str());
363 }
364 const uint64_t current_time = static_cast<uint64_t>(_current_time);
365
366 uint64_t oldest_key_time =
367 mems_.front()->ApproximateOldestKeyTime();
368
369 // It's not clear whether oldest_key_time is always available. In case
370 // it is not available, use current_time.
371 meta_.oldest_ancester_time = std::min(current_time, oldest_key_time);
372 meta_.file_creation_time = current_time;
373
374 s = BuildTable(
375 dbname_, db_options_.env, db_options_.fs.get(), *cfd_->ioptions(),
376 mutable_cf_options_, file_options_, cfd_->table_cache(), iter.get(),
377 std::move(range_del_iters), &meta_, cfd_->internal_comparator(),
378 cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
379 cfd_->GetName(), existing_snapshots_,
380 earliest_write_conflict_snapshot_, snapshot_checker_,
381 output_compression_, mutable_cf_options_.sample_for_compression,
382 cfd_->ioptions()->compression_opts,
383 mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
384 TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
385 Env::IO_HIGH, &table_properties_, 0 /* level */,
386 meta_.oldest_ancester_time, oldest_key_time, write_hint,
387 current_time);
388 LogFlush(db_options_.info_log);
389 }
390 ROCKS_LOG_INFO(db_options_.info_log,
391 "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
392 " bytes %s"
393 "%s",
394 cfd_->GetName().c_str(), job_context_->job_id,
395 meta_.fd.GetNumber(), meta_.fd.GetFileSize(),
396 s.ToString().c_str(),
397 meta_.marked_for_compaction ? " (needs compaction)" : "");
398
399 if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) {
400 s = output_file_directory_->Fsync();
401 }
402 TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_);
403 db_mutex_->Lock();
404 }
405 base_->Unref();
406
407 // Note that if file_size is zero, the file has been deleted and
408 // should not be added to the manifest.
409 if (s.ok() && meta_.fd.GetFileSize() > 0) {
410 // if we have more than 1 background thread, then we cannot
411 // insert files directly into higher levels because some other
412 // threads could be concurrently producing compacted files for
413 // that key range.
414 // Add file to L0
415 edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
416 meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
417 meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
418 meta_.marked_for_compaction, meta_.oldest_blob_file_number,
419 meta_.oldest_ancester_time, meta_.file_creation_time,
420 meta_.file_checksum, meta_.file_checksum_func_name);
421 }
422 #ifndef ROCKSDB_LITE
423 // Piggyback FlushJobInfo on the first first flushed memtable.
424 mems_[0]->SetFlushJobInfo(GetFlushJobInfo());
425 #endif // !ROCKSDB_LITE
426
427 // Note that here we treat flush as level 0 compaction in internal stats
428 InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
429 stats.micros = db_options_.env->NowMicros() - start_micros;
430 stats.cpu_micros = db_options_.env->NowCPUNanos() / 1000 - start_cpu_micros;
431 stats.bytes_written = meta_.fd.GetFileSize();
432 RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros);
433 cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats);
434 cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
435 meta_.fd.GetFileSize());
436 RecordFlushIOStats();
437 return s;
438 }
439
440 #ifndef ROCKSDB_LITE
GetFlushJobInfo() const441 std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
442 db_mutex_->AssertHeld();
443 std::unique_ptr<FlushJobInfo> info(new FlushJobInfo{});
444 info->cf_id = cfd_->GetID();
445 info->cf_name = cfd_->GetName();
446
447 const uint64_t file_number = meta_.fd.GetNumber();
448 info->file_path =
449 MakeTableFileName(cfd_->ioptions()->cf_paths[0].path, file_number);
450 info->file_number = file_number;
451 info->oldest_blob_file_number = meta_.oldest_blob_file_number;
452 info->thread_id = db_options_.env->GetThreadID();
453 info->job_id = job_context_->job_id;
454 info->smallest_seqno = meta_.fd.smallest_seqno;
455 info->largest_seqno = meta_.fd.largest_seqno;
456 info->table_properties = table_properties_;
457 info->flush_reason = cfd_->GetFlushReason();
458 return info;
459 }
460 #endif // !ROCKSDB_LITE
461
462 } // namespace ROCKSDB_NAMESPACE
463