1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 // 6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved. 7 // Use of this source code is governed by a BSD-style license that can be 8 // found in the LICENSE file. See the AUTHORS file for names of contributors. 9 10 #include "db/compaction/compaction_picker_fifo.h" 11 #ifndef ROCKSDB_LITE 12 13 #include <cinttypes> 14 #include <string> 15 #include <vector> 16 #include "db/column_family.h" 17 #include "logging/log_buffer.h" 18 #include "util/string_util.h" 19 20 namespace ROCKSDB_NAMESPACE { 21 namespace { 22 uint64_t GetTotalFilesSize(const std::vector<FileMetaData*>& files) { 23 uint64_t total_size = 0; 24 for (const auto& f : files) { 25 total_size += f->fd.file_size; 26 } 27 return total_size; 28 } 29 } // anonymous namespace 30 31 bool FIFOCompactionPicker::NeedsCompaction( 32 const VersionStorageInfo* vstorage) const { 33 const int kLevel0 = 0; 34 return vstorage->CompactionScore(kLevel0) >= 1; 35 } 36 37 Compaction* FIFOCompactionPicker::PickTTLCompaction( 38 const std::string& cf_name, const MutableCFOptions& mutable_cf_options, 39 VersionStorageInfo* vstorage, LogBuffer* log_buffer) { 40 assert(mutable_cf_options.ttl > 0); 41 42 const int kLevel0 = 0; 43 const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0); 44 uint64_t total_size = GetTotalFilesSize(level_files); 45 46 int64_t _current_time; 47 auto status = ioptions_.env->GetCurrentTime(&_current_time); 48 if (!status.ok()) { 49 ROCKS_LOG_BUFFER(log_buffer, 50 "[%s] FIFO compaction: Couldn't get current time: %s. " 51 "Not doing compactions based on TTL. ", 52 cf_name.c_str(), status.ToString().c_str()); 53 return nullptr; 54 } 55 const uint64_t current_time = static_cast<uint64_t>(_current_time); 56 57 if (!level0_compactions_in_progress_.empty()) { 58 ROCKS_LOG_BUFFER( 59 log_buffer, 60 "[%s] FIFO compaction: Already executing compaction. No need " 61 "to run parallel compactions since compactions are very fast", 62 cf_name.c_str()); 63 return nullptr; 64 } 65 66 std::vector<CompactionInputFiles> inputs; 67 inputs.emplace_back(); 68 inputs[0].level = 0; 69 70 // avoid underflow 71 if (current_time > mutable_cf_options.ttl) { 72 for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { 73 auto f = *ritr; 74 if (f->fd.table_reader != nullptr && 75 f->fd.table_reader->GetTableProperties() != nullptr) { 76 auto creation_time = 77 f->fd.table_reader->GetTableProperties()->creation_time; 78 if (creation_time == 0 || 79 creation_time >= (current_time - mutable_cf_options.ttl)) { 80 break; 81 } 82 total_size -= f->compensated_file_size; 83 inputs[0].files.push_back(f); 84 } 85 } 86 } 87 88 // Return a nullptr and proceed to size-based FIFO compaction if: 89 // 1. there are no files older than ttl OR 90 // 2. there are a few files older than ttl, but deleting them will not bring 91 // the total size to be less than max_table_files_size threshold. 92 if (inputs[0].files.empty() || 93 total_size > 94 mutable_cf_options.compaction_options_fifo.max_table_files_size) { 95 return nullptr; 96 } 97 98 for (const auto& f : inputs[0].files) { 99 ROCKS_LOG_BUFFER(log_buffer, 100 "[%s] FIFO compaction: picking file %" PRIu64 101 " with creation time %" PRIu64 " for deletion", 102 cf_name.c_str(), f->fd.GetNumber(), 103 f->fd.table_reader->GetTableProperties()->creation_time); 104 } 105 106 Compaction* c = new Compaction( 107 vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0, 108 kNoCompression, ioptions_.compression_opts, /* max_subcompactions */ 0, 109 {}, /* is manual */ false, vstorage->CompactionScore(0), 110 /* is deletion compaction */ true, CompactionReason::kFIFOTtl); 111 return c; 112 } 113 114 Compaction* FIFOCompactionPicker::PickSizeCompaction( 115 const std::string& cf_name, const MutableCFOptions& mutable_cf_options, 116 VersionStorageInfo* vstorage, LogBuffer* log_buffer) { 117 const int kLevel0 = 0; 118 const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0); 119 uint64_t total_size = GetTotalFilesSize(level_files); 120 121 if (total_size <= 122 mutable_cf_options.compaction_options_fifo.max_table_files_size || 123 level_files.size() == 0) { 124 // total size not exceeded 125 if (mutable_cf_options.compaction_options_fifo.allow_compaction && 126 level_files.size() > 0) { 127 CompactionInputFiles comp_inputs; 128 // try to prevent same files from being compacted multiple times, which 129 // could produce large files that may never TTL-expire. Achieve this by 130 // disallowing compactions with files larger than memtable (inflate its 131 // size by 10% to account for uncompressed L0 files that may have size 132 // slightly greater than memtable size limit). 133 size_t max_compact_bytes_per_del_file = 134 static_cast<size_t>(MultiplyCheckOverflow( 135 static_cast<uint64_t>(mutable_cf_options.write_buffer_size), 136 1.1)); 137 if (FindIntraL0Compaction( 138 level_files, 139 mutable_cf_options 140 .level0_file_num_compaction_trigger /* min_files_to_compact */ 141 , 142 max_compact_bytes_per_del_file, 143 mutable_cf_options.max_compaction_bytes, &comp_inputs)) { 144 Compaction* c = new Compaction( 145 vstorage, ioptions_, mutable_cf_options, {comp_inputs}, 0, 146 16 * 1024 * 1024 /* output file size limit */, 147 0 /* max compaction bytes, not applicable */, 148 0 /* output path ID */, mutable_cf_options.compression, 149 ioptions_.compression_opts, 0 /* max_subcompactions */, {}, 150 /* is manual */ false, vstorage->CompactionScore(0), 151 /* is deletion compaction */ false, 152 CompactionReason::kFIFOReduceNumFiles); 153 return c; 154 } 155 } 156 157 ROCKS_LOG_BUFFER( 158 log_buffer, 159 "[%s] FIFO compaction: nothing to do. Total size %" PRIu64 160 ", max size %" PRIu64 "\n", 161 cf_name.c_str(), total_size, 162 mutable_cf_options.compaction_options_fifo.max_table_files_size); 163 return nullptr; 164 } 165 166 if (!level0_compactions_in_progress_.empty()) { 167 ROCKS_LOG_BUFFER( 168 log_buffer, 169 "[%s] FIFO compaction: Already executing compaction. No need " 170 "to run parallel compactions since compactions are very fast", 171 cf_name.c_str()); 172 return nullptr; 173 } 174 175 std::vector<CompactionInputFiles> inputs; 176 inputs.emplace_back(); 177 inputs[0].level = 0; 178 179 for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { 180 auto f = *ritr; 181 total_size -= f->compensated_file_size; 182 inputs[0].files.push_back(f); 183 char tmp_fsize[16]; 184 AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize)); 185 ROCKS_LOG_BUFFER(log_buffer, 186 "[%s] FIFO compaction: picking file %" PRIu64 187 " with size %s for deletion", 188 cf_name.c_str(), f->fd.GetNumber(), tmp_fsize); 189 if (total_size <= 190 mutable_cf_options.compaction_options_fifo.max_table_files_size) { 191 break; 192 } 193 } 194 195 Compaction* c = new Compaction( 196 vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0, 197 kNoCompression, ioptions_.compression_opts, /* max_subcompactions */ 0, 198 {}, /* is manual */ false, vstorage->CompactionScore(0), 199 /* is deletion compaction */ true, CompactionReason::kFIFOMaxSize); 200 return c; 201 } 202 203 Compaction* FIFOCompactionPicker::PickCompaction( 204 const std::string& cf_name, const MutableCFOptions& mutable_cf_options, 205 VersionStorageInfo* vstorage, LogBuffer* log_buffer, 206 SequenceNumber /*earliest_memtable_seqno*/) { 207 assert(vstorage->num_levels() == 1); 208 209 Compaction* c = nullptr; 210 if (mutable_cf_options.ttl > 0) { 211 c = PickTTLCompaction(cf_name, mutable_cf_options, vstorage, log_buffer); 212 } 213 if (c == nullptr) { 214 c = PickSizeCompaction(cf_name, mutable_cf_options, vstorage, log_buffer); 215 } 216 RegisterCompaction(c); 217 return c; 218 } 219 220 Compaction* FIFOCompactionPicker::CompactRange( 221 const std::string& cf_name, const MutableCFOptions& mutable_cf_options, 222 VersionStorageInfo* vstorage, int input_level, int output_level, 223 const CompactRangeOptions& /*compact_range_options*/, 224 const InternalKey* /*begin*/, const InternalKey* /*end*/, 225 InternalKey** compaction_end, bool* /*manual_conflict*/, 226 uint64_t /*max_file_num_to_ignore*/) { 227 #ifdef NDEBUG 228 (void)input_level; 229 (void)output_level; 230 #endif 231 assert(input_level == 0); 232 assert(output_level == 0); 233 *compaction_end = nullptr; 234 LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, ioptions_.info_log); 235 Compaction* c = 236 PickCompaction(cf_name, mutable_cf_options, vstorage, &log_buffer); 237 log_buffer.FlushBufferToLog(); 238 return c; 239 } 240 241 } // namespace ROCKSDB_NAMESPACE 242 #endif // !ROCKSDB_LITE 243