1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "db/compaction/compaction_picker_fifo.h"
11 #ifndef ROCKSDB_LITE
12 
13 #include <cinttypes>
14 #include <string>
15 #include <vector>
16 #include "db/column_family.h"
17 #include "logging/log_buffer.h"
18 #include "util/string_util.h"
19 
20 namespace ROCKSDB_NAMESPACE {
21 namespace {
22 uint64_t GetTotalFilesSize(const std::vector<FileMetaData*>& files) {
23   uint64_t total_size = 0;
24   for (const auto& f : files) {
25     total_size += f->fd.file_size;
26   }
27   return total_size;
28 }
29 }  // anonymous namespace
30 
31 bool FIFOCompactionPicker::NeedsCompaction(
32     const VersionStorageInfo* vstorage) const {
33   const int kLevel0 = 0;
34   return vstorage->CompactionScore(kLevel0) >= 1;
35 }
36 
37 Compaction* FIFOCompactionPicker::PickTTLCompaction(
38     const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
39     VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
40   assert(mutable_cf_options.ttl > 0);
41 
42   const int kLevel0 = 0;
43   const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0);
44   uint64_t total_size = GetTotalFilesSize(level_files);
45 
46   int64_t _current_time;
47   auto status = ioptions_.env->GetCurrentTime(&_current_time);
48   if (!status.ok()) {
49     ROCKS_LOG_BUFFER(log_buffer,
50                      "[%s] FIFO compaction: Couldn't get current time: %s. "
51                      "Not doing compactions based on TTL. ",
52                      cf_name.c_str(), status.ToString().c_str());
53     return nullptr;
54   }
55   const uint64_t current_time = static_cast<uint64_t>(_current_time);
56 
57   if (!level0_compactions_in_progress_.empty()) {
58     ROCKS_LOG_BUFFER(
59         log_buffer,
60         "[%s] FIFO compaction: Already executing compaction. No need "
61         "to run parallel compactions since compactions are very fast",
62         cf_name.c_str());
63     return nullptr;
64   }
65 
66   std::vector<CompactionInputFiles> inputs;
67   inputs.emplace_back();
68   inputs[0].level = 0;
69 
70   // avoid underflow
71   if (current_time > mutable_cf_options.ttl) {
72     for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) {
73       auto f = *ritr;
74       if (f->fd.table_reader != nullptr &&
75           f->fd.table_reader->GetTableProperties() != nullptr) {
76         auto creation_time =
77             f->fd.table_reader->GetTableProperties()->creation_time;
78         if (creation_time == 0 ||
79             creation_time >= (current_time - mutable_cf_options.ttl)) {
80           break;
81         }
82         total_size -= f->compensated_file_size;
83         inputs[0].files.push_back(f);
84       }
85     }
86   }
87 
88   // Return a nullptr and proceed to size-based FIFO compaction if:
89   // 1. there are no files older than ttl OR
90   // 2. there are a few files older than ttl, but deleting them will not bring
91   //    the total size to be less than max_table_files_size threshold.
92   if (inputs[0].files.empty() ||
93       total_size >
94           mutable_cf_options.compaction_options_fifo.max_table_files_size) {
95     return nullptr;
96   }
97 
98   for (const auto& f : inputs[0].files) {
99     ROCKS_LOG_BUFFER(log_buffer,
100                      "[%s] FIFO compaction: picking file %" PRIu64
101                      " with creation time %" PRIu64 " for deletion",
102                      cf_name.c_str(), f->fd.GetNumber(),
103                      f->fd.table_reader->GetTableProperties()->creation_time);
104   }
105 
106   Compaction* c = new Compaction(
107       vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0,
108       kNoCompression, ioptions_.compression_opts, /* max_subcompactions */ 0,
109       {}, /* is manual */ false, vstorage->CompactionScore(0),
110       /* is deletion compaction */ true, CompactionReason::kFIFOTtl);
111   return c;
112 }
113 
114 Compaction* FIFOCompactionPicker::PickSizeCompaction(
115     const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
116     VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
117   const int kLevel0 = 0;
118   const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0);
119   uint64_t total_size = GetTotalFilesSize(level_files);
120 
121   if (total_size <=
122           mutable_cf_options.compaction_options_fifo.max_table_files_size ||
123       level_files.size() == 0) {
124     // total size not exceeded
125     if (mutable_cf_options.compaction_options_fifo.allow_compaction &&
126         level_files.size() > 0) {
127       CompactionInputFiles comp_inputs;
128       // try to prevent same files from being compacted multiple times, which
129       // could produce large files that may never TTL-expire. Achieve this by
130       // disallowing compactions with files larger than memtable (inflate its
131       // size by 10% to account for uncompressed L0 files that may have size
132       // slightly greater than memtable size limit).
133       size_t max_compact_bytes_per_del_file =
134           static_cast<size_t>(MultiplyCheckOverflow(
135               static_cast<uint64_t>(mutable_cf_options.write_buffer_size),
136               1.1));
137       if (FindIntraL0Compaction(
138               level_files,
139               mutable_cf_options
140                   .level0_file_num_compaction_trigger /* min_files_to_compact */
141               ,
142               max_compact_bytes_per_del_file,
143               mutable_cf_options.max_compaction_bytes, &comp_inputs)) {
144         Compaction* c = new Compaction(
145             vstorage, ioptions_, mutable_cf_options, {comp_inputs}, 0,
146             16 * 1024 * 1024 /* output file size limit */,
147             0 /* max compaction bytes, not applicable */,
148             0 /* output path ID */, mutable_cf_options.compression,
149             ioptions_.compression_opts, 0 /* max_subcompactions */, {},
150             /* is manual */ false, vstorage->CompactionScore(0),
151             /* is deletion compaction */ false,
152             CompactionReason::kFIFOReduceNumFiles);
153         return c;
154       }
155     }
156 
157     ROCKS_LOG_BUFFER(
158         log_buffer,
159         "[%s] FIFO compaction: nothing to do. Total size %" PRIu64
160         ", max size %" PRIu64 "\n",
161         cf_name.c_str(), total_size,
162         mutable_cf_options.compaction_options_fifo.max_table_files_size);
163     return nullptr;
164   }
165 
166   if (!level0_compactions_in_progress_.empty()) {
167     ROCKS_LOG_BUFFER(
168         log_buffer,
169         "[%s] FIFO compaction: Already executing compaction. No need "
170         "to run parallel compactions since compactions are very fast",
171         cf_name.c_str());
172     return nullptr;
173   }
174 
175   std::vector<CompactionInputFiles> inputs;
176   inputs.emplace_back();
177   inputs[0].level = 0;
178 
179   for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) {
180     auto f = *ritr;
181     total_size -= f->compensated_file_size;
182     inputs[0].files.push_back(f);
183     char tmp_fsize[16];
184     AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize));
185     ROCKS_LOG_BUFFER(log_buffer,
186                      "[%s] FIFO compaction: picking file %" PRIu64
187                      " with size %s for deletion",
188                      cf_name.c_str(), f->fd.GetNumber(), tmp_fsize);
189     if (total_size <=
190         mutable_cf_options.compaction_options_fifo.max_table_files_size) {
191       break;
192     }
193   }
194 
195   Compaction* c = new Compaction(
196       vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0,
197       kNoCompression, ioptions_.compression_opts, /* max_subcompactions */ 0,
198       {}, /* is manual */ false, vstorage->CompactionScore(0),
199       /* is deletion compaction */ true, CompactionReason::kFIFOMaxSize);
200   return c;
201 }
202 
203 Compaction* FIFOCompactionPicker::PickCompaction(
204     const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
205     VersionStorageInfo* vstorage, LogBuffer* log_buffer,
206     SequenceNumber /*earliest_memtable_seqno*/) {
207   assert(vstorage->num_levels() == 1);
208 
209   Compaction* c = nullptr;
210   if (mutable_cf_options.ttl > 0) {
211     c = PickTTLCompaction(cf_name, mutable_cf_options, vstorage, log_buffer);
212   }
213   if (c == nullptr) {
214     c = PickSizeCompaction(cf_name, mutable_cf_options, vstorage, log_buffer);
215   }
216   RegisterCompaction(c);
217   return c;
218 }
219 
220 Compaction* FIFOCompactionPicker::CompactRange(
221     const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
222     VersionStorageInfo* vstorage, int input_level, int output_level,
223     const CompactRangeOptions& /*compact_range_options*/,
224     const InternalKey* /*begin*/, const InternalKey* /*end*/,
225     InternalKey** compaction_end, bool* /*manual_conflict*/,
226     uint64_t /*max_file_num_to_ignore*/) {
227 #ifdef NDEBUG
228   (void)input_level;
229   (void)output_level;
230 #endif
231   assert(input_level == 0);
232   assert(output_level == 0);
233   *compaction_end = nullptr;
234   LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, ioptions_.info_log);
235   Compaction* c =
236       PickCompaction(cf_name, mutable_cf_options, vstorage, &log_buffer);
237   log_buffer.FlushBufferToLog();
238   return c;
239 }
240 
241 }  // namespace ROCKSDB_NAMESPACE
242 #endif  // !ROCKSDB_LITE
243