1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/column_family.h"
11
12 #include <algorithm>
13 #include <cinttypes>
14 #include <limits>
15 #include <string>
16 #include <vector>
17
18 #include "db/compaction/compaction_picker.h"
19 #include "db/compaction/compaction_picker_fifo.h"
20 #include "db/compaction/compaction_picker_level.h"
21 #include "db/compaction/compaction_picker_universal.h"
22 #include "db/db_impl/db_impl.h"
23 #include "db/internal_stats.h"
24 #include "db/job_context.h"
25 #include "db/range_del_aggregator.h"
26 #include "db/table_properties_collector.h"
27 #include "db/version_set.h"
28 #include "db/write_controller.h"
29 #include "file/sst_file_manager_impl.h"
30 #include "memtable/hash_skiplist_rep.h"
31 #include "monitoring/thread_status_util.h"
32 #include "options/options_helper.h"
33 #include "port/port.h"
34 #include "table/block_based/block_based_table_factory.h"
35 #include "table/merging_iterator.h"
36 #include "util/autovector.h"
37 #include "util/compression.h"
38
39 namespace ROCKSDB_NAMESPACE {
40
ColumnFamilyHandleImpl(ColumnFamilyData * column_family_data,DBImpl * db,InstrumentedMutex * mutex)41 ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
42 ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
43 : cfd_(column_family_data), db_(db), mutex_(mutex) {
44 if (cfd_ != nullptr) {
45 cfd_->Ref();
46 }
47 }
48
// Releases this handle's reference on the wrapped column family.
// Listeners are notified first (non-LITE builds). The reference is then
// dropped under the DB mutex. If that drop deleted the ColumnFamilyData and
// the column family had been dropped, obsolete files are looked up and
// purged. The purge is deferred to a scheduled job when
// avoid_unnecessary_blocking_io is set.
ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ != nullptr) {
#ifndef ROCKSDB_LITE
    // Notify listeners before any teardown happens.
    for (auto& listener : cfd_->ioptions()->listeners) {
      listener->OnColumnFamilyHandleDeletionStarted(this);
    }
#endif  // ROCKSDB_LITE
    // Job id == 0 means that this is not our background process, but rather
    // user thread
    // Need to hold some shared pointers owned by the initial_cf_options
    // before final cleaning up finishes.
    ColumnFamilyOptions initial_cf_options_copy = cfd_->initial_cf_options();
    JobContext job_context(0);
    mutex_->Lock();
    // Read the dropped flag first: UnrefAndTryDelete() may delete cfd_.
    bool dropped = cfd_->IsDropped();
    if (cfd_->UnrefAndTryDelete()) {
      if (dropped) {
        db_->FindObsoleteFiles(&job_context, false, true);
      }
    }
    mutex_->Unlock();
    // File deletion happens outside the DB mutex.
    if (job_context.HaveSomethingToDelete()) {
      bool defer_purge =
          db_->immutable_db_options().avoid_unnecessary_blocking_io;
      db_->PurgeObsoleteFiles(job_context, defer_purge);
      if (defer_purge) {
        // SchedulePurge() is called with the DB mutex held.
        mutex_->Lock();
        db_->SchedulePurge();
        mutex_->Unlock();
      }
    }
    job_context.Clean();
  }
}
83
GetID() const84 uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }
85
GetName() const86 const std::string& ColumnFamilyHandleImpl::GetName() const {
87 return cfd()->GetName();
88 }
89
GetDescriptor(ColumnFamilyDescriptor * desc)90 Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
91 #ifndef ROCKSDB_LITE
92 // accessing mutable cf-options requires db mutex.
93 InstrumentedMutexLock l(mutex_);
94 *desc = ColumnFamilyDescriptor(cfd()->GetName(), cfd()->GetLatestCFOptions());
95 return Status::OK();
96 #else
97 (void)desc;
98 return Status::NotSupported();
99 #endif // !ROCKSDB_LITE
100 }
101
GetComparator() const102 const Comparator* ColumnFamilyHandleImpl::GetComparator() const {
103 return cfd()->user_comparator();
104 }
105
GetIntTblPropCollectorFactory(const ImmutableCFOptions & ioptions,std::vector<std::unique_ptr<IntTblPropCollectorFactory>> * int_tbl_prop_collector_factories)106 void GetIntTblPropCollectorFactory(
107 const ImmutableCFOptions& ioptions,
108 std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
109 int_tbl_prop_collector_factories) {
110 auto& collector_factories = ioptions.table_properties_collector_factories;
111 for (size_t i = 0; i < ioptions.table_properties_collector_factories.size();
112 ++i) {
113 assert(collector_factories[i]);
114 int_tbl_prop_collector_factories->emplace_back(
115 new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
116 }
117 }
118
CheckCompressionSupported(const ColumnFamilyOptions & cf_options)119 Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
120 if (!cf_options.compression_per_level.empty()) {
121 for (size_t level = 0; level < cf_options.compression_per_level.size();
122 ++level) {
123 if (!CompressionTypeSupported(cf_options.compression_per_level[level])) {
124 return Status::InvalidArgument(
125 "Compression type " +
126 CompressionTypeToString(cf_options.compression_per_level[level]) +
127 " is not linked with the binary.");
128 }
129 }
130 } else {
131 if (!CompressionTypeSupported(cf_options.compression)) {
132 return Status::InvalidArgument(
133 "Compression type " +
134 CompressionTypeToString(cf_options.compression) +
135 " is not linked with the binary.");
136 }
137 }
138 if (cf_options.compression_opts.zstd_max_train_bytes > 0) {
139 if (!ZSTD_TrainDictionarySupported()) {
140 return Status::InvalidArgument(
141 "zstd dictionary trainer cannot be used because ZSTD 1.1.3+ "
142 "is not linked with the binary.");
143 }
144 if (cf_options.compression_opts.max_dict_bytes == 0) {
145 return Status::InvalidArgument(
146 "The dictionary size limit (`CompressionOptions::max_dict_bytes`) "
147 "should be nonzero if we're using zstd's dictionary generator.");
148 }
149 }
150 return Status::OK();
151 }
152
CheckConcurrentWritesSupported(const ColumnFamilyOptions & cf_options)153 Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
154 if (cf_options.inplace_update_support) {
155 return Status::InvalidArgument(
156 "In-place memtable updates (inplace_update_support) is not compatible "
157 "with concurrent writes (allow_concurrent_memtable_write)");
158 }
159 if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
160 return Status::InvalidArgument(
161 "Memtable doesn't concurrent writes (allow_concurrent_memtable_write)");
162 }
163 return Status::OK();
164 }
165
CheckCFPathsSupported(const DBOptions & db_options,const ColumnFamilyOptions & cf_options)166 Status CheckCFPathsSupported(const DBOptions& db_options,
167 const ColumnFamilyOptions& cf_options) {
168 // More than one cf_paths are supported only in universal
169 // and level compaction styles. This function also checks the case
170 // in which cf_paths is not specified, which results in db_paths
171 // being used.
172 if ((cf_options.compaction_style != kCompactionStyleUniversal) &&
173 (cf_options.compaction_style != kCompactionStyleLevel)) {
174 if (cf_options.cf_paths.size() > 1) {
175 return Status::NotSupported(
176 "More than one CF paths are only supported in "
177 "universal and level compaction styles. ");
178 } else if (cf_options.cf_paths.empty() &&
179 db_options.db_paths.size() > 1) {
180 return Status::NotSupported(
181 "More than one DB paths are only supported in "
182 "universal and level compaction styles. ");
183 }
184 }
185 return Status::OK();
186 }
187
namespace {
// Sentinel values of ColumnFamilyOptions::ttl and
// ColumnFamilyOptions::periodic_compaction_seconds.
// SanitizeOptions() treats these exact values as "not explicitly set" and
// replaces them with concrete settings. Presumably they match the defaults
// declared in the options header; verify there before changing them.
const uint64_t kDefaultTtl = 0xfffffffffffffffe;
const uint64_t kDefaultPeriodicCompSecs = 0xfffffffffffffffe;
}  // namespace
192
// Returns a copy of `src` in which out-of-range or mutually inconsistent
// column family options have been replaced by usable values.
//
// Some adjustments depend on DB-wide settings from `db_options`:
// allow_ingest_behind, db_paths, info_log, env and sst_file_manager.
//
// In non-LITE builds this function also has a side effect: it cleans up
// leftover .trash files under every configured cf_path.
ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
                                    const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;
  // write_buffer_size is clamped to [64KB, clamp_max]. clamp_max is
  // 0xffffffff when size_t is 32 bits wide and 64GB otherwise.
  size_t clamp_max = std::conditional<
      sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
      std::integral_constant<uint64_t, 64ull << 30>>::type::value;
  ClipToRange(&result.write_buffer_size, static_cast<size_t>(64) << 10,
              clamp_max);
  // if user sets arena_block_size, we trust user to use this value. Otherwise,
  // calculate a proper value from write_buffer_size;
  if (result.arena_block_size <= 0) {
    result.arena_block_size = result.write_buffer_size / 8;

    // Align up to 4k
    const size_t align = 4 * 1024;
    result.arena_block_size =
        ((result.arena_block_size + align - 1) / align) * align;
  }
  // This uses max_write_buffer_number before it is raised to at least 2
  // below. The lower bound of 1 covers small or non-positive values.
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
  if (result.min_write_buffer_number_to_merge < 1) {
    result.min_write_buffer_number_to_merge = 1;
  }

  if (result.num_levels < 1) {
    result.num_levels = 1;
  }
  if (result.compaction_style == kCompactionStyleLevel &&
      result.num_levels < 2) {
    result.num_levels = 2;
  }

  if (result.compaction_style == kCompactionStyleUniversal &&
      db_options.allow_ingest_behind && result.num_levels < 3) {
    result.num_levels = 3;
  }

  if (result.max_write_buffer_number < 2) {
    result.max_write_buffer_number = 2;
  }
  // fall back max_write_buffer_number_to_maintain if
  // max_write_buffer_size_to_maintain is not set
  if (result.max_write_buffer_size_to_maintain < 0) {
    result.max_write_buffer_size_to_maintain =
        result.max_write_buffer_number *
        static_cast<int64_t>(result.write_buffer_size);
  } else if (result.max_write_buffer_size_to_maintain == 0 &&
             result.max_write_buffer_number_to_maintain < 0) {
    result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
  }
  // bloom filter size shouldn't exceed 1/4 of memtable size.
  if (result.memtable_prefix_bloom_size_ratio > 0.25) {
    result.memtable_prefix_bloom_size_ratio = 0.25;
  } else if (result.memtable_prefix_bloom_size_ratio < 0) {
    result.memtable_prefix_bloom_size_ratio = 0;
  }

  // Without a prefix extractor, the hash-based memtable factories are
  // replaced by a skip list (presumably because they need a prefix).
  if (!result.prefix_extractor) {
    assert(result.memtable_factory);
    Slice name = result.memtable_factory->Name();
    if (name.compare("HashSkipListRepFactory") == 0 ||
        name.compare("HashLinkListRepFactory") == 0) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }

  if (result.compaction_style == kCompactionStyleFIFO) {
    result.num_levels = 1;
    // since we delete level0 files in FIFO compaction when there are too many
    // of them, these options don't really mean anything
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
  }

  if (result.max_bytes_for_level_multiplier <= 0) {
    result.max_bytes_for_level_multiplier = 1;
  }

  if (result.level0_file_num_compaction_trigger == 0) {
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "level0_file_num_compaction_trigger cannot be 0");
    result.level0_file_num_compaction_trigger = 1;
  }

  // Enforce stop >= slowdown >= compaction trigger. Values are raised, never
  // lowered, to satisfy the ordering.
  if (result.level0_stop_writes_trigger <
          result.level0_slowdown_writes_trigger ||
      result.level0_slowdown_writes_trigger <
          result.level0_file_num_compaction_trigger) {
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "This condition must be satisfied: "
                   "level0_stop_writes_trigger(%d) >= "
                   "level0_slowdown_writes_trigger(%d) >= "
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
    if (result.level0_slowdown_writes_trigger <
        result.level0_file_num_compaction_trigger) {
      result.level0_slowdown_writes_trigger =
          result.level0_file_num_compaction_trigger;
    }
    if (result.level0_stop_writes_trigger <
        result.level0_slowdown_writes_trigger) {
      result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
    }
    // The three values were previously concatenated with no separator
    // ("...(%d)level0_slowdown..."), which made the log line hard to read.
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "Adjust the value to "
                   "level0_stop_writes_trigger(%d), "
                   "level0_slowdown_writes_trigger(%d), "
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
  }

  // The soft pending-compaction limit defaults to the hard limit. It is also
  // capped by a positive hard limit.
  if (result.soft_pending_compaction_bytes_limit == 0) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  } else if (result.hard_pending_compaction_bytes_limit > 0 &&
             result.soft_pending_compaction_bytes_limit >
                 result.hard_pending_compaction_bytes_limit) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  }

#ifndef ROCKSDB_LITE
  // When the DB is stopped, it's possible that there are some .trash files that
  // were not deleted yet, when we open the DB we will find these .trash files
  // and schedule them to be deleted (or delete immediately if SstFileManager
  // was not used)
  auto sfm =
      static_cast<SstFileManagerImpl*>(db_options.sst_file_manager.get());
  for (size_t i = 0; i < result.cf_paths.size(); i++) {
    DeleteScheduler::CleanupDirectory(db_options.env, sfm,
                                      result.cf_paths[i].path);
  }
#endif

  // With no column-family-specific paths, the DB-wide paths are used.
  if (result.cf_paths.empty()) {
    result.cf_paths = db_options.db_paths;
  }

  if (result.level_compaction_dynamic_level_bytes) {
    if (result.compaction_style != kCompactionStyleLevel ||
        result.cf_paths.size() > 1U) {
      // 1. level_compaction_dynamic_level_bytes only makes sense for
      //    level-based compaction.
      // 2. we don't yet know how to make both of this feature and multiple
      //    DB path work.
      result.level_compaction_dynamic_level_bytes = false;
    }
  }

  if (result.max_compaction_bytes == 0) {
    result.max_compaction_bytes = result.target_file_size_base * 25;
  }

  bool is_block_based_table =
      (result.table_factory->Name() == BlockBasedTableFactory().Name());

  // A ttl left at its sentinel default becomes 30 days for block-based tables
  // in non-FIFO styles, and 0 (disabled) otherwise.
  const uint64_t kAdjustedTtl = 30 * 24 * 60 * 60;
  if (result.ttl == kDefaultTtl) {
    if (is_block_based_table &&
        result.compaction_style != kCompactionStyleFIFO) {
      result.ttl = kAdjustedTtl;
    } else {
      result.ttl = 0;
    }
  }

  const uint64_t kAdjustedPeriodicCompSecs = 30 * 24 * 60 * 60;

  // Turn on periodic compactions and set them to occur once every 30 days if
  // compaction filters are used and periodic_compaction_seconds is set to the
  // default value.
  if (result.compaction_style != kCompactionStyleFIFO) {
    if ((result.compaction_filter != nullptr ||
         result.compaction_filter_factory != nullptr) &&
        result.periodic_compaction_seconds == kDefaultPeriodicCompSecs &&
        is_block_based_table) {
      result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
    }
  } else {
    // result.compaction_style == kCompactionStyleFIFO
    //
    // The ttl is derived from periodic_compaction_seconds:
    //   - If ttl is 0 and the table is block-based, ttl takes
    //     periodic_compaction_seconds, which defaults to 30 days here.
    //   - Otherwise, a non-zero periodic_compaction_seconds caps ttl.
    if (result.ttl == 0) {
      if (is_block_based_table) {
        if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
          result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
        }
        result.ttl = result.periodic_compaction_seconds;
      }
    } else if (result.periodic_compaction_seconds != 0) {
      result.ttl = std::min(result.ttl, result.periodic_compaction_seconds);
    }
  }

  // TTL compactions would work similar to Periodic Compactions in Universal in
  // most of the cases. So, if ttl is set, execute the periodic compaction
  // codepath.
  if (result.compaction_style == kCompactionStyleUniversal && result.ttl != 0) {
    if (result.periodic_compaction_seconds != 0) {
      result.periodic_compaction_seconds =
          std::min(result.ttl, result.periodic_compaction_seconds);
    } else {
      result.periodic_compaction_seconds = result.ttl;
    }
  }

  // Any periodic_compaction_seconds still at the sentinel means "disabled".
  if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
    result.periodic_compaction_seconds = 0;
  }

  return result;
}
405
// Marker values, presumably stored in the per-thread SuperVersion slots
// (local_sv_) in place of real pointers.
// kSVInUse is the address of the otherwise unused `dummy` object, so it can
// never equal the address of an actual SuperVersion.
// kSVObsolete is nullptr.
int SuperVersion::dummy = 0;
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
void* const SuperVersion::kSVObsolete = nullptr;
409
~SuperVersion()410 SuperVersion::~SuperVersion() {
411 for (auto td : to_delete) {
412 delete td;
413 }
414 }
415
Ref()416 SuperVersion* SuperVersion::Ref() {
417 refs.fetch_add(1, std::memory_order_relaxed);
418 return this;
419 }
420
Unref()421 bool SuperVersion::Unref() {
422 // fetch_sub returns the previous value of ref
423 uint32_t previous_refs = refs.fetch_sub(1);
424 assert(previous_refs > 0);
425 return previous_refs == 1;
426 }
427
// Releases every reference this SuperVersion holds. It must only be called
// once this SuperVersion's own reference count has reached zero.
//
// Memtables being retired are appended to to_delete rather than freed here;
// ~SuperVersion() deletes them. If this SuperVersion held the last reference
// to `cfd`, cfd is deleted here.
void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  // Presumably appends memtables that lost their last reference.
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
    // The mutable memtable is being retired. Subtract its footprint from the
    // memory-usage counter kept by the column family's immutable list.
    auto* memory_usage = current->cfd()->imm()->current_memory_usage();
    assert(*memory_usage >= m->ApproximateMemoryUsage());
    *memory_usage -= m->ApproximateMemoryUsage();
    to_delete.push_back(m);
  }
  current->Unref();
  if (cfd->Unref()) {
    delete cfd;
  }
}
443
Init(ColumnFamilyData * new_cfd,MemTable * new_mem,MemTableListVersion * new_imm,Version * new_current)444 void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
445 MemTableListVersion* new_imm, Version* new_current) {
446 cfd = new_cfd;
447 mem = new_mem;
448 imm = new_imm;
449 current = new_current;
450 cfd->Ref();
451 mem->Ref();
452 imm->Ref();
453 current->Ref();
454 refs.store(1, std::memory_order_relaxed);
455 }
456
457 namespace {
SuperVersionUnrefHandle(void * ptr)458 void SuperVersionUnrefHandle(void* ptr) {
459 // UnrefHandle is called when a thread exists or a ThreadLocalPtr gets
460 // destroyed. When former happens, the thread shouldn't see kSVInUse.
461 // When latter happens, we are in ~ColumnFamilyData(), no get should happen as
462 // well.
463 SuperVersion* sv = static_cast<SuperVersion*>(ptr);
464 bool was_last_ref __attribute__((__unused__));
465 was_last_ref = sv->Unref();
466 // Thread-local SuperVersions can't outlive ColumnFamilyData::super_version_.
467 // This is important because we can't do SuperVersion cleanup here.
468 // That would require locking DB mutex, which would deadlock because
469 // SuperVersionUnrefHandle is called with locked ThreadLocalPtr mutex.
470 assert(!was_last_ref);
471 }
472 } // anonymous namespace
473
// Builds the in-memory state of one column family.
//
// `cf_options` is passed through SanitizeOptions() and the result is stored
// as initial_cf_options_. The object starts with one reference, taken in the
// body.
//
// A null `_dummy_versions` marks a dummy column family. For one, no internal
// stats, table cache or compaction picker is created, and the options are not
// dumped to the info log.
ColumnFamilyData::ColumnFamilyData(
    uint32_t id, const std::string& name, Version* _dummy_versions,
    Cache* _table_cache, WriteBufferManager* write_buffer_manager,
    const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
    const FileOptions& file_options, ColumnFamilySet* column_family_set,
    BlockCacheTracer* const block_cache_tracer)
    : id_(id),
      name_(name),
      dummy_versions_(_dummy_versions),
      current_(nullptr),
      refs_(0),
      initialized_(false),
      dropped_(false),
      internal_comparator_(cf_options.comparator),
      // ioptions_, mutable_cf_options_ and imm_ read the sanitized options.
      // Members are initialized in declaration order, so this relies on
      // initial_cf_options_ (and ioptions_) being declared before them.
      initial_cf_options_(SanitizeOptions(db_options, cf_options)),
      ioptions_(db_options, initial_cf_options_),
      mutable_cf_options_(initial_cf_options_),
      is_delete_range_supported_(
          cf_options.table_factory->IsDeleteRangeSupported()),
      write_buffer_manager_(write_buffer_manager),
      mem_(nullptr),
      imm_(ioptions_.min_write_buffer_number_to_merge,
           ioptions_.max_write_buffer_number_to_maintain,
           ioptions_.max_write_buffer_size_to_maintain),
      super_version_(nullptr),
      super_version_number_(0),
      // Per-thread SuperVersion slots. SuperVersionUnrefHandle drops a
      // thread's cached reference when the slot is torn down.
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      flush_reason_(FlushReason::kOthers),
      column_family_set_(column_family_set),
      queued_for_flush_(false),
      queued_for_compaction_(false),
      prev_compaction_needed_bytes_(0),
      allow_2pc_(db_options.allow_2pc),
      last_memtable_id_(0) {
  // Take the initial reference (refs_ was initialized to 0 above).
  Ref();

  // Convert user defined table properties collector factories to internal ones.
  GetIntTblPropCollectorFactory(ioptions_, &int_tbl_prop_collector_factories_);

  // if _dummy_versions is nullptr, then this is a dummy column family.
  if (_dummy_versions != nullptr) {
    internal_stats_.reset(
        new InternalStats(ioptions_.num_levels, db_options.env, this));
    table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache,
                                      block_cache_tracer));
    // Pick the compaction picker that matches the compaction style. In LITE
    // builds only level compaction is available; any other style falls
    // through to the error branch below.
    if (ioptions_.compaction_style == kCompactionStyleLevel) {
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
#ifndef ROCKSDB_LITE
    } else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
      compaction_picker_.reset(
          new UniversalCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
      compaction_picker_.reset(
          new FIFOCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleNone) {
      compaction_picker_.reset(new NullCompactionPicker(
          ioptions_, &internal_comparator_));
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "Column family %s does not use any background compaction. "
                     "Compactions can only be done via CompactFiles\n",
                     GetName().c_str());
#endif  // !ROCKSDB_LITE
    } else {
      ROCKS_LOG_ERROR(ioptions_.info_log,
                      "Unable to recognize the specified compaction style %d. "
                      "Column family %s will use kCompactionStyleLevel.\n",
                      ioptions_.compaction_style, GetName().c_str());
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
    }

    // Options are dumped only while fewer than 10 column families exist
    // (presumably to bound info-log volume).
    if (column_family_set_->NumberOfColumnFamilies() < 10) {
      ROCKS_LOG_INFO(ioptions_.info_log,
                     "--------------- Options for column family [%s]:\n",
                     name.c_str());
      initial_cf_options_.Dump(ioptions_.info_log);
    } else {
      ROCKS_LOG_INFO(ioptions_.info_log, "\t(skipping printing options)\n");
    }
  }

  // current_ is still nullptr at this point.
  RecalculateWriteStallConditions(mutable_cf_options_);
}
561
// DB mutex held
//
// Tears down a column family whose reference count has reached zero. It:
//   - unlinks the column family from the prev_/next_ doubly linked list;
//   - removes it from the ColumnFamilySet if it is still registered;
//   - releases the current Version, the dummy version list head, the mutable
//     memtable and the immutable memtable list.
ColumnFamilyData::~ColumnFamilyData() {
  assert(refs_.load(std::memory_order_relaxed) == 0);
  // remove from linked list
  auto prev = prev_;
  auto next = next_;
  prev->next_ = next;
  next->prev_ = prev;

  if (!dropped_ && column_family_set_ != nullptr) {
    // If it's dropped, it's already removed from column family set
    // If column_family_set_ == nullptr, this is dummy CFD and not in
    // ColumnFamilySet
    column_family_set_->RemoveColumnFamily(this);
  }

  if (current_ != nullptr) {
    current_->Unref();
  }

  // It would be wrong if this ColumnFamilyData is in flush_queue_ or
  // compaction_queue_ and we destroyed it
  assert(!queued_for_flush_);
  assert(!queued_for_compaction_);
  // The SuperVersion must already have been detached and released.
  assert(super_version_ == nullptr);

  if (dummy_versions_ != nullptr) {
    // List must be empty
    assert(dummy_versions_->TEST_Next() == dummy_versions_);
    bool deleted __attribute__((__unused__));
    deleted = dummy_versions_->Unref();
    assert(deleted);
  }

  if (mem_ != nullptr) {
    // mem_->Unref() yields either a memtable to free or nullptr (as in
    // SuperVersion::Cleanup()). Deleting nullptr is a no-op.
    delete mem_->Unref();
  }
  // Release the immutable list version and free the memtables it hands back.
  autovector<MemTable*> to_delete;
  imm_.current()->Unref(&to_delete);
  for (MemTable* m : to_delete) {
    delete m;
  }
}
605
// Drops one reference. Returns true if `this` was deleted as a result; the
// caller must not touch the object afterwards in that case.
//
// Two situations lead to deletion:
//   1. This was the last reference: the object is deleted directly.
//   2. The only remaining reference is the one held by super_version_. The
//      SuperVersion is detached from this object and the thread-local copies
//      are released. If that leaves the SuperVersion with no references, its
//      Cleanup() drops the final reference on this object.
//
// In case 2 the DB mutex (sv->db_mutex) must be held on entry. It is
// unlocked while local_sv_ is reset and re-locked before returning.
bool ColumnFamilyData::UnrefAndTryDelete() {
  int old_refs = refs_.fetch_sub(1);
  assert(old_refs > 0);

  if (old_refs == 1) {
    assert(super_version_ == nullptr);
    delete this;
    return true;
  }

  if (old_refs == 2 && super_version_ != nullptr) {
    // Only the super_version_ holds me
    SuperVersion* sv = super_version_;
    super_version_ = nullptr;
    // Release SuperVersion reference kept in ThreadLocalPtr.
    // This must be done outside of mutex_ since unref handler can lock mutex.
    sv->db_mutex->Unlock();
    local_sv_.reset();
    sv->db_mutex->Lock();

    if (sv->Unref()) {
      // May delete this ColumnFamilyData after calling Cleanup()
      sv->Cleanup();
      delete sv;
      // NOTE(review): true is returned even though Cleanup() deletes this
      // object only if its cfd->Unref() drops the last reference. This relies
      // on no new reference being taken while the DB mutex was released
      // above. Confirm that callers only Ref() under the DB mutex.
      return true;
    }
  }
  return false;
}
635
// Marks this column family as dropped and removes it from its ColumnFamilySet.
// Any write-controller token it holds is released first. The default column
// family (id 0) must never be dropped.
void ColumnFamilyData::SetDropped() {
  // can't drop default CF
  assert(id_ != 0);
  dropped_ = true;
  // Release any write-controller token (e.g. a stop token obtained in
  // RecalculateWriteStallConditions) held on behalf of this column family.
  write_controller_token_.reset();

  // remove from column_family_set
  column_family_set_->RemoveColumnFamily(this);
}
645
GetLatestCFOptions() const646 ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
647 return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
648 }
649
OldestLogToKeep()650 uint64_t ColumnFamilyData::OldestLogToKeep() {
651 auto current_log = GetLogNumber();
652
653 if (allow_2pc_) {
654 autovector<MemTable*> empty_list;
655 auto imm_prep_log =
656 imm()->PrecomputeMinLogContainingPrepSection(empty_list);
657 auto mem_prep_log = mem()->GetMinLogContainingPrepSection();
658
659 if (imm_prep_log > 0 && imm_prep_log < current_log) {
660 current_log = imm_prep_log;
661 }
662
663 if (mem_prep_log > 0 && mem_prep_log < current_log) {
664 current_log = mem_prep_log;
665 }
666 }
667
668 return current_log;
669 }
670
// Multipliers for the delayed write rate.
//   kIncSlowdownRatio: slows writes further while compaction debt is not
//     shrinking (used by SetupDelay()).
//   kDecSlowdownRatio: its reciprocal; speeds writes back up once debt is
//     being paid down.
//   kNearStopSlowdownRatio: the harsher penalty applied near or at a stop
//     condition.
//   kDelayRecoverSlowdownRatio: presumably the speed-up factor used when
//     leaving a delay condition. TODO confirm at its use site.
const double kIncSlowdownRatio = 0.8;
const double kDecSlowdownRatio = 1.0 / kIncSlowdownRatio;
const double kNearStopSlowdownRatio = 0.6;
const double kDelayRecoverSlowdownRatio = 1.4;
675
676 namespace {
677 // If penalize_stop is true, we further reduce slowdown rate.
SetupDelay(WriteController * write_controller,uint64_t compaction_needed_bytes,uint64_t prev_compaction_need_bytes,bool penalize_stop,bool auto_comapctions_disabled)678 std::unique_ptr<WriteControllerToken> SetupDelay(
679 WriteController* write_controller, uint64_t compaction_needed_bytes,
680 uint64_t prev_compaction_need_bytes, bool penalize_stop,
681 bool auto_comapctions_disabled) {
682 const uint64_t kMinWriteRate = 16 * 1024u; // Minimum write rate 16KB/s.
683
684 uint64_t max_write_rate = write_controller->max_delayed_write_rate();
685 uint64_t write_rate = write_controller->delayed_write_rate();
686
687 if (auto_comapctions_disabled) {
688 // When auto compaction is disabled, always use the value user gave.
689 write_rate = max_write_rate;
690 } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
691 // If user gives rate less than kMinWriteRate, don't adjust it.
692 //
693 // If already delayed, need to adjust based on previous compaction debt.
694 // When there are two or more column families require delay, we always
695 // increase or reduce write rate based on information for one single
696 // column family. It is likely to be OK but we can improve if there is a
697 // problem.
698 // Ignore compaction_needed_bytes = 0 case because compaction_needed_bytes
699 // is only available in level-based compaction
700 //
701 // If the compaction debt stays the same as previously, we also further slow
702 // down. It usually means a mem table is full. It's mainly for the case
703 // where both of flush and compaction are much slower than the speed we
704 // insert to mem tables, so we need to actively slow down before we get
705 // feedback signal from compaction and flushes to avoid the full stop
706 // because of hitting the max write buffer number.
707 //
708 // If DB just falled into the stop condition, we need to further reduce
709 // the write rate to avoid the stop condition.
710 if (penalize_stop) {
711 // Penalize the near stop or stop condition by more aggressive slowdown.
712 // This is to provide the long term slowdown increase signal.
713 // The penalty is more than the reward of recovering to the normal
714 // condition.
715 write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
716 kNearStopSlowdownRatio);
717 if (write_rate < kMinWriteRate) {
718 write_rate = kMinWriteRate;
719 }
720 } else if (prev_compaction_need_bytes > 0 &&
721 prev_compaction_need_bytes <= compaction_needed_bytes) {
722 write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
723 kIncSlowdownRatio);
724 if (write_rate < kMinWriteRate) {
725 write_rate = kMinWriteRate;
726 }
727 } else if (prev_compaction_need_bytes > compaction_needed_bytes) {
728 // We are speeding up by ratio of kSlowdownRatio when we have paid
729 // compaction debt. But we'll never speed up to faster than the write rate
730 // given by users.
731 write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
732 kDecSlowdownRatio);
733 if (write_rate > max_write_rate) {
734 write_rate = max_write_rate;
735 }
736 }
737 }
738 return write_controller->GetDelayToken(write_rate);
739 }
740
GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,int level0_slowdown_writes_trigger)741 int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
742 int level0_slowdown_writes_trigger) {
743 // SanitizeOptions() ensures it.
744 assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);
745
746 if (level0_file_num_compaction_trigger < 0) {
747 return std::numeric_limits<int>::max();
748 }
749
750 const int64_t twice_level0_trigger =
751 static_cast<int64_t>(level0_file_num_compaction_trigger) * 2;
752
753 const int64_t one_fourth_trigger_slowdown =
754 static_cast<int64_t>(level0_file_num_compaction_trigger) +
755 ((level0_slowdown_writes_trigger - level0_file_num_compaction_trigger) /
756 4);
757
758 assert(twice_level0_trigger >= 0);
759 assert(one_fourth_trigger_slowdown >= 0);
760
761 // 1/4 of the way between L0 compaction trigger threshold and slowdown
762 // condition.
763 // Or twice as compaction trigger, if it is smaller.
764 int64_t res = std::min(twice_level0_trigger, one_fourth_trigger_slowdown);
765 if (res >= port::kMaxInt32) {
766 return port::kMaxInt32;
767 } else {
768 // res fits in int
769 return static_cast<int>(res);
770 }
771 }
772 } // namespace
773
774 std::pair<WriteStallCondition, ColumnFamilyData::WriteStallCause>
GetWriteStallConditionAndCause(int num_unflushed_memtables,int num_l0_files,uint64_t num_compaction_needed_bytes,const MutableCFOptions & mutable_cf_options)775 ColumnFamilyData::GetWriteStallConditionAndCause(
776 int num_unflushed_memtables, int num_l0_files,
777 uint64_t num_compaction_needed_bytes,
778 const MutableCFOptions& mutable_cf_options) {
779 if (num_unflushed_memtables >= mutable_cf_options.max_write_buffer_number) {
780 return {WriteStallCondition::kStopped, WriteStallCause::kMemtableLimit};
781 } else if (!mutable_cf_options.disable_auto_compactions &&
782 num_l0_files >= mutable_cf_options.level0_stop_writes_trigger) {
783 return {WriteStallCondition::kStopped, WriteStallCause::kL0FileCountLimit};
784 } else if (!mutable_cf_options.disable_auto_compactions &&
785 mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
786 num_compaction_needed_bytes >=
787 mutable_cf_options.hard_pending_compaction_bytes_limit) {
788 return {WriteStallCondition::kStopped,
789 WriteStallCause::kPendingCompactionBytes};
790 } else if (mutable_cf_options.max_write_buffer_number > 3 &&
791 num_unflushed_memtables >=
792 mutable_cf_options.max_write_buffer_number - 1) {
793 return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
794 } else if (!mutable_cf_options.disable_auto_compactions &&
795 mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
796 num_l0_files >=
797 mutable_cf_options.level0_slowdown_writes_trigger) {
798 return {WriteStallCondition::kDelayed, WriteStallCause::kL0FileCountLimit};
799 } else if (!mutable_cf_options.disable_auto_compactions &&
800 mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
801 num_compaction_needed_bytes >=
802 mutable_cf_options.soft_pending_compaction_bytes_limit) {
803 return {WriteStallCondition::kDelayed,
804 WriteStallCause::kPendingCompactionBytes};
805 }
806 return {WriteStallCondition::kNormal, WriteStallCause::kNone};
807 }
808
RecalculateWriteStallConditions(const MutableCFOptions & mutable_cf_options)809 WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
810 const MutableCFOptions& mutable_cf_options) {
811 auto write_stall_condition = WriteStallCondition::kNormal;
812 if (current_ != nullptr) {
813 auto* vstorage = current_->storage_info();
814 auto write_controller = column_family_set_->write_controller_;
815 uint64_t compaction_needed_bytes =
816 vstorage->estimated_compaction_needed_bytes();
817
818 auto write_stall_condition_and_cause = GetWriteStallConditionAndCause(
819 imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(),
820 vstorage->estimated_compaction_needed_bytes(), mutable_cf_options);
821 write_stall_condition = write_stall_condition_and_cause.first;
822 auto write_stall_cause = write_stall_condition_and_cause.second;
823
824 bool was_stopped = write_controller->IsStopped();
825 bool needed_delay = write_controller->NeedsDelay();
826
827 if (write_stall_condition == WriteStallCondition::kStopped &&
828 write_stall_cause == WriteStallCause::kMemtableLimit) {
829 write_controller_token_ = write_controller->GetStopToken();
830 internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
831 ROCKS_LOG_WARN(
832 ioptions_.info_log,
833 "[%s] Stopping writes because we have %d immutable memtables "
834 "(waiting for flush), max_write_buffer_number is set to %d",
835 name_.c_str(), imm()->NumNotFlushed(),
836 mutable_cf_options.max_write_buffer_number);
837 } else if (write_stall_condition == WriteStallCondition::kStopped &&
838 write_stall_cause == WriteStallCause::kL0FileCountLimit) {
839 write_controller_token_ = write_controller->GetStopToken();
840 internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
841 if (compaction_picker_->IsLevel0CompactionInProgress()) {
842 internal_stats_->AddCFStats(
843 InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_STOPS, 1);
844 }
845 ROCKS_LOG_WARN(ioptions_.info_log,
846 "[%s] Stopping writes because we have %d level-0 files",
847 name_.c_str(), vstorage->l0_delay_trigger_count());
848 } else if (write_stall_condition == WriteStallCondition::kStopped &&
849 write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
850 write_controller_token_ = write_controller->GetStopToken();
851 internal_stats_->AddCFStats(
852 InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
853 ROCKS_LOG_WARN(
854 ioptions_.info_log,
855 "[%s] Stopping writes because of estimated pending compaction "
856 "bytes %" PRIu64,
857 name_.c_str(), compaction_needed_bytes);
858 } else if (write_stall_condition == WriteStallCondition::kDelayed &&
859 write_stall_cause == WriteStallCause::kMemtableLimit) {
860 write_controller_token_ =
861 SetupDelay(write_controller, compaction_needed_bytes,
862 prev_compaction_needed_bytes_, was_stopped,
863 mutable_cf_options.disable_auto_compactions);
864 internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1);
865 ROCKS_LOG_WARN(
866 ioptions_.info_log,
867 "[%s] Stalling writes because we have %d immutable memtables "
868 "(waiting for flush), max_write_buffer_number is set to %d "
869 "rate %" PRIu64,
870 name_.c_str(), imm()->NumNotFlushed(),
871 mutable_cf_options.max_write_buffer_number,
872 write_controller->delayed_write_rate());
873 } else if (write_stall_condition == WriteStallCondition::kDelayed &&
874 write_stall_cause == WriteStallCause::kL0FileCountLimit) {
875 // L0 is the last two files from stopping.
876 bool near_stop = vstorage->l0_delay_trigger_count() >=
877 mutable_cf_options.level0_stop_writes_trigger - 2;
878 write_controller_token_ =
879 SetupDelay(write_controller, compaction_needed_bytes,
880 prev_compaction_needed_bytes_, was_stopped || near_stop,
881 mutable_cf_options.disable_auto_compactions);
882 internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS,
883 1);
884 if (compaction_picker_->IsLevel0CompactionInProgress()) {
885 internal_stats_->AddCFStats(
886 InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1);
887 }
888 ROCKS_LOG_WARN(ioptions_.info_log,
889 "[%s] Stalling writes because we have %d level-0 files "
890 "rate %" PRIu64,
891 name_.c_str(), vstorage->l0_delay_trigger_count(),
892 write_controller->delayed_write_rate());
893 } else if (write_stall_condition == WriteStallCondition::kDelayed &&
894 write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
895 // If the distance to hard limit is less than 1/4 of the gap between soft
896 // and
897 // hard bytes limit, we think it is near stop and speed up the slowdown.
898 bool near_stop =
899 mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
900 (compaction_needed_bytes -
901 mutable_cf_options.soft_pending_compaction_bytes_limit) >
902 3 * (mutable_cf_options.hard_pending_compaction_bytes_limit -
903 mutable_cf_options.soft_pending_compaction_bytes_limit) /
904 4;
905
906 write_controller_token_ =
907 SetupDelay(write_controller, compaction_needed_bytes,
908 prev_compaction_needed_bytes_, was_stopped || near_stop,
909 mutable_cf_options.disable_auto_compactions);
910 internal_stats_->AddCFStats(
911 InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1);
912 ROCKS_LOG_WARN(
913 ioptions_.info_log,
914 "[%s] Stalling writes because of estimated pending compaction "
915 "bytes %" PRIu64 " rate %" PRIu64,
916 name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
917 write_controller->delayed_write_rate());
918 } else {
919 assert(write_stall_condition == WriteStallCondition::kNormal);
920 if (vstorage->l0_delay_trigger_count() >=
921 GetL0ThresholdSpeedupCompaction(
922 mutable_cf_options.level0_file_num_compaction_trigger,
923 mutable_cf_options.level0_slowdown_writes_trigger)) {
924 write_controller_token_ =
925 write_controller->GetCompactionPressureToken();
926 ROCKS_LOG_INFO(
927 ioptions_.info_log,
928 "[%s] Increasing compaction threads because we have %d level-0 "
929 "files ",
930 name_.c_str(), vstorage->l0_delay_trigger_count());
931 } else if (vstorage->estimated_compaction_needed_bytes() >=
932 mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
933 // Increase compaction threads if bytes needed for compaction exceeds
934 // 1/4 of threshold for slowing down.
935 // If soft pending compaction byte limit is not set, always speed up
936 // compaction.
937 write_controller_token_ =
938 write_controller->GetCompactionPressureToken();
939 if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
940 ROCKS_LOG_INFO(
941 ioptions_.info_log,
942 "[%s] Increasing compaction threads because of estimated pending "
943 "compaction "
944 "bytes %" PRIu64,
945 name_.c_str(), vstorage->estimated_compaction_needed_bytes());
946 }
947 } else {
948 write_controller_token_.reset();
949 }
950 // If the DB recovers from delay conditions, we reward with reducing
951 // double the slowdown ratio. This is to balance the long term slowdown
952 // increase signal.
953 if (needed_delay) {
954 uint64_t write_rate = write_controller->delayed_write_rate();
955 write_controller->set_delayed_write_rate(static_cast<uint64_t>(
956 static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
957 // Set the low pri limit to be 1/4 the delayed write rate.
958 // Note we don't reset this value even after delay condition is relased.
959 // Low-pri rate will continue to apply if there is a compaction
960 // pressure.
961 write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate /
962 4);
963 }
964 }
965 prev_compaction_needed_bytes_ = compaction_needed_bytes;
966 }
967 return write_stall_condition;
968 }
969
soptions() const970 const FileOptions* ColumnFamilyData::soptions() const {
971 return &(column_family_set_->file_options_);
972 }
973
SetCurrent(Version * current_version)974 void ColumnFamilyData::SetCurrent(Version* current_version) {
975 current_ = current_version;
976 }
977
GetNumLiveVersions() const978 uint64_t ColumnFamilyData::GetNumLiveVersions() const {
979 return VersionSet::GetNumLiveVersions(dummy_versions_);
980 }
981
GetTotalSstFilesSize() const982 uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
983 return VersionSet::GetTotalSstFilesSize(dummy_versions_);
984 }
985
GetLiveSstFilesSize() const986 uint64_t ColumnFamilyData::GetLiveSstFilesSize() const {
987 return current_->GetSstFilesSize();
988 }
989
ConstructNewMemtable(const MutableCFOptions & mutable_cf_options,SequenceNumber earliest_seq)990 MemTable* ColumnFamilyData::ConstructNewMemtable(
991 const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
992 return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
993 write_buffer_manager_, earliest_seq, id_);
994 }
995
CreateNewMemtable(const MutableCFOptions & mutable_cf_options,SequenceNumber earliest_seq)996 void ColumnFamilyData::CreateNewMemtable(
997 const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
998 if (mem_ != nullptr) {
999 delete mem_->Unref();
1000 }
1001 SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
1002 mem_->Ref();
1003 }
1004
NeedsCompaction() const1005 bool ColumnFamilyData::NeedsCompaction() const {
1006 return compaction_picker_->NeedsCompaction(current_->storage_info());
1007 }
1008
PickCompaction(const MutableCFOptions & mutable_options,LogBuffer * log_buffer)1009 Compaction* ColumnFamilyData::PickCompaction(
1010 const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
1011 SequenceNumber earliest_mem_seqno =
1012 std::min(mem_->GetEarliestSequenceNumber(),
1013 imm_.current()->GetEarliestSequenceNumber(false));
1014 auto* result = compaction_picker_->PickCompaction(
1015 GetName(), mutable_options, current_->storage_info(), log_buffer,
1016 earliest_mem_seqno);
1017 if (result != nullptr) {
1018 result->SetInputVersion(current_);
1019 }
1020 return result;
1021 }
1022
RangeOverlapWithCompaction(const Slice & smallest_user_key,const Slice & largest_user_key,int level) const1023 bool ColumnFamilyData::RangeOverlapWithCompaction(
1024 const Slice& smallest_user_key, const Slice& largest_user_key,
1025 int level) const {
1026 return compaction_picker_->RangeOverlapWithCompaction(
1027 smallest_user_key, largest_user_key, level);
1028 }
1029
RangesOverlapWithMemtables(const autovector<Range> & ranges,SuperVersion * super_version,bool * overlap)1030 Status ColumnFamilyData::RangesOverlapWithMemtables(
1031 const autovector<Range>& ranges, SuperVersion* super_version,
1032 bool* overlap) {
1033 assert(overlap != nullptr);
1034 *overlap = false;
1035 // Create an InternalIterator over all unflushed memtables
1036 Arena arena;
1037 ReadOptions read_opts;
1038 read_opts.total_order_seek = true;
1039 MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
1040 merge_iter_builder.AddIterator(
1041 super_version->mem->NewIterator(read_opts, &arena));
1042 super_version->imm->AddIterators(read_opts, &merge_iter_builder);
1043 ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());
1044
1045 auto read_seq = super_version->current->version_set()->LastSequence();
1046 ReadRangeDelAggregator range_del_agg(&internal_comparator_, read_seq);
1047 auto* active_range_del_iter =
1048 super_version->mem->NewRangeTombstoneIterator(read_opts, read_seq);
1049 range_del_agg.AddTombstones(
1050 std::unique_ptr<FragmentedRangeTombstoneIterator>(active_range_del_iter));
1051 super_version->imm->AddRangeTombstoneIterators(read_opts, nullptr /* arena */,
1052 &range_del_agg);
1053
1054 Status status;
1055 for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {
1056 auto* vstorage = super_version->current->storage_info();
1057 auto* ucmp = vstorage->InternalComparator()->user_comparator();
1058 InternalKey range_start(ranges[i].start, kMaxSequenceNumber,
1059 kValueTypeForSeek);
1060 memtable_iter->Seek(range_start.Encode());
1061 status = memtable_iter->status();
1062 ParsedInternalKey seek_result;
1063 if (status.ok()) {
1064 if (memtable_iter->Valid() &&
1065 !ParseInternalKey(memtable_iter->key(), &seek_result)) {
1066 status = Status::Corruption("DB have corrupted keys");
1067 }
1068 }
1069 if (status.ok()) {
1070 if (memtable_iter->Valid() &&
1071 ucmp->Compare(seek_result.user_key, ranges[i].limit) <= 0) {
1072 *overlap = true;
1073 } else if (range_del_agg.IsRangeOverlapped(ranges[i].start,
1074 ranges[i].limit)) {
1075 *overlap = true;
1076 }
1077 }
1078 }
1079 return status;
1080 }
1081
1082 const int ColumnFamilyData::kCompactAllLevels = -1;
1083 const int ColumnFamilyData::kCompactToBaseLevel = -2;
1084
CompactRange(const MutableCFOptions & mutable_cf_options,int input_level,int output_level,const CompactRangeOptions & compact_range_options,const InternalKey * begin,const InternalKey * end,InternalKey ** compaction_end,bool * conflict,uint64_t max_file_num_to_ignore)1085 Compaction* ColumnFamilyData::CompactRange(
1086 const MutableCFOptions& mutable_cf_options, int input_level,
1087 int output_level, const CompactRangeOptions& compact_range_options,
1088 const InternalKey* begin, const InternalKey* end,
1089 InternalKey** compaction_end, bool* conflict,
1090 uint64_t max_file_num_to_ignore) {
1091 auto* result = compaction_picker_->CompactRange(
1092 GetName(), mutable_cf_options, current_->storage_info(), input_level,
1093 output_level, compact_range_options, begin, end, compaction_end, conflict,
1094 max_file_num_to_ignore);
1095 if (result != nullptr) {
1096 result->SetInputVersion(current_);
1097 }
1098 return result;
1099 }
1100
GetReferencedSuperVersion(DBImpl * db)1101 SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(DBImpl* db) {
1102 SuperVersion* sv = GetThreadLocalSuperVersion(db);
1103 sv->Ref();
1104 if (!ReturnThreadLocalSuperVersion(sv)) {
1105 // This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
1106 // when the thread-local pointer was populated. So, the Ref() earlier in
1107 // this function still prevents the returned SuperVersion* from being
1108 // deleted out from under the caller.
1109 sv->Unref();
1110 }
1111 return sv;
1112 }
1113
// Returns a referenced SuperVersion for the calling thread, preferring the
// one cached in thread-local storage (local_sv_) so that the DB mutex is
// only taken when the cached value is obsolete or stale.
//
// After this call the thread-local slot holds kSVInUse (unless a concurrent
// Scrape replaces it with kSVObsolete). The pointer is expected to be handed
// back with ReturnThreadLocalSuperVersion().
SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
  // The SuperVersion is cached in thread local storage to avoid acquiring
  // mutex when SuperVersion does not change since the last use. When a new
  // SuperVersion is installed, the compaction or flush thread cleans up
  // cached SuperVersion in all existing thread local storage. To avoid
  // acquiring mutex for this operation, we use atomic Swap() on the thread
  // local pointer to guarantee exclusive access. If the thread local pointer
  // is being used while a new SuperVersion is installed, the cached
  // SuperVersion can become stale. In that case, the background thread would
  // have swapped in kSVObsolete. We re-check the value at when returning
  // SuperVersion back to thread local, with an atomic compare and swap.
  // The superversion will need to be released if detected to be stale.
  void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
  // Invariant:
  // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
  // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
  // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
  // (if no Scrape happens).
  assert(ptr != SuperVersion::kSVInUse);
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  // Slow path: the cached entry was scraped (kSVObsolete) or belongs to an
  // older SuperVersion number, so refresh it from super_version_ under the
  // DB mutex.
  if (sv == SuperVersion::kSVObsolete ||
      sv->version_number != super_version_number_.load()) {
    RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
    SuperVersion* sv_to_delete = nullptr;

    // Drop the reference the thread-local cache held on the stale entry.
    // NOTE(review): the null check implies kSVObsolete may be nullptr —
    // confirm in the SuperVersion definition.
    if (sv && sv->Unref()) {
      // That was the last reference: clean it up while holding the mutex.
      RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
      db->mutex()->Lock();
      // NOTE: underlying resources held by superversion (sst files) might
      // not be released until the next background job.
      sv->Cleanup();
      if (db->immutable_db_options().avoid_unnecessary_blocking_io) {
        // Hand the object to the DB's free queue and schedule a purge
        // instead of deleting it on this thread.
        db->AddSuperVersionsToFreeQueue(sv);
        db->SchedulePurge();
      } else {
        sv_to_delete = sv;
      }
    } else {
      db->mutex()->Lock();
    }
    // Take a fresh reference on the installed SuperVersion; it becomes the
    // thread's cached value once returned via ReturnThreadLocalSuperVersion().
    sv = super_version_->Ref();
    db->mutex()->Unlock();

    // Deleted only after the DB mutex has been released.
    delete sv_to_delete;
  }
  assert(sv != nullptr);
  return sv;
}
1162
ReturnThreadLocalSuperVersion(SuperVersion * sv)1163 bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
1164 assert(sv != nullptr);
1165 // Put the SuperVersion back
1166 void* expected = SuperVersion::kSVInUse;
1167 if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
1168 // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
1169 // storage has not been altered and no Scrape has happened. The
1170 // SuperVersion is still current.
1171 return true;
1172 } else {
1173 // ThreadLocal scrape happened in the process of this GetImpl call (after
1174 // thread local Swap() at the beginning and before CompareAndSwap()).
1175 // This means the SuperVersion it holds is obsolete.
1176 assert(expected == SuperVersion::kSVObsolete);
1177 }
1178 return false;
1179 }
1180
InstallSuperVersion(SuperVersionContext * sv_context,InstrumentedMutex * db_mutex)1181 void ColumnFamilyData::InstallSuperVersion(
1182 SuperVersionContext* sv_context, InstrumentedMutex* db_mutex) {
1183 db_mutex->AssertHeld();
1184 return InstallSuperVersion(sv_context, db_mutex, mutable_cf_options_);
1185 }
1186
// Builds and publishes a new SuperVersion for this column family from the
// current mem_, imm_ and current_, using `mutable_cf_options`.
//
// Takes ownership of sv_context->new_superversion, increments
// super_version_number_ and recomputes the write-stall condition. If a
// previous SuperVersion existed, this also:
//   - invalidates every thread-local cached SuperVersion;
//   - pushes a changed write_buffer_size to the active memtable;
//   - queues a write-stall notification on sv_context if the condition
//     changed;
//   - if that dropped the last reference to the old SuperVersion, cleans it
//     up and queues it in sv_context->superversions_to_free instead of
//     deleting it here.
// NOTE(review): the 2-arg overload asserts db_mutex is held; this overload
// does not check, but presumably has the same requirement.
void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex,
    const MutableCFOptions& mutable_cf_options) {
  SuperVersion* new_superversion = sv_context->new_superversion.release();
  new_superversion->db_mutex = db_mutex;
  new_superversion->mutable_cf_options = mutable_cf_options;
  new_superversion->Init(this, mem_, imm_.current(), current_);
  SuperVersion* old_superversion = super_version_;
  super_version_ = new_superversion;
  ++super_version_number_;
  super_version_->version_number = super_version_number_;
  super_version_->write_stall_condition =
      RecalculateWriteStallConditions(mutable_cf_options);

  if (old_superversion != nullptr) {
    // Reset SuperVersions cached in thread local storage.
    // This should be done before old_superversion->Unref(). That's to ensure
    // that local_sv_ never holds the last reference to SuperVersion, since
    // it has no means to safely do SuperVersion cleanup.
    ResetThreadLocalSuperVersions();

    // Propagate a changed write_buffer_size to the active memtable.
    if (old_superversion->mutable_cf_options.write_buffer_size !=
        mutable_cf_options.write_buffer_size) {
      mem_->UpdateWriteBufferSize(mutable_cf_options.write_buffer_size);
    }
    if (old_superversion->write_stall_condition !=
        new_superversion->write_stall_condition) {
      sv_context->PushWriteStallNotification(
          old_superversion->write_stall_condition,
          new_superversion->write_stall_condition, GetName(), ioptions());
    }
    // Release this column family's reference to the old SuperVersion; if it
    // was the last one, clean up now but defer freeing to sv_context.
    if (old_superversion->Unref()) {
      old_superversion->Cleanup();
      sv_context->superversions_to_free.push_back(old_superversion);
    }
  }
}
1224
ResetThreadLocalSuperVersions()1225 void ColumnFamilyData::ResetThreadLocalSuperVersions() {
1226 autovector<void*> sv_ptrs;
1227 local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
1228 for (auto ptr : sv_ptrs) {
1229 assert(ptr);
1230 if (ptr == SuperVersion::kSVInUse) {
1231 continue;
1232 }
1233 auto sv = static_cast<SuperVersion*>(ptr);
1234 bool was_last_ref __attribute__((__unused__));
1235 was_last_ref = sv->Unref();
1236 // sv couldn't have been the last reference because
1237 // ResetThreadLocalSuperVersions() is called before
1238 // unref'ing super_version_.
1239 assert(!was_last_ref);
1240 }
1241 }
1242
ValidateOptions(const DBOptions & db_options,const ColumnFamilyOptions & cf_options)1243 Status ColumnFamilyData::ValidateOptions(
1244 const DBOptions& db_options, const ColumnFamilyOptions& cf_options) {
1245 Status s;
1246 s = CheckCompressionSupported(cf_options);
1247 if (s.ok() && db_options.allow_concurrent_memtable_write) {
1248 s = CheckConcurrentWritesSupported(cf_options);
1249 }
1250 if (s.ok() && db_options.unordered_write &&
1251 cf_options.max_successive_merges != 0) {
1252 s = Status::InvalidArgument(
1253 "max_successive_merges > 0 is incompatible with unordered_write");
1254 }
1255 if (s.ok()) {
1256 s = CheckCFPathsSupported(db_options, cf_options);
1257 }
1258 if (!s.ok()) {
1259 return s;
1260 }
1261
1262 if (cf_options.ttl > 0 && cf_options.ttl != kDefaultTtl) {
1263 if (cf_options.table_factory->Name() != BlockBasedTableFactory().Name()) {
1264 return Status::NotSupported(
1265 "TTL is only supported in Block-Based Table format. ");
1266 }
1267 }
1268
1269 if (cf_options.periodic_compaction_seconds > 0 &&
1270 cf_options.periodic_compaction_seconds != kDefaultPeriodicCompSecs) {
1271 if (cf_options.table_factory->Name() != BlockBasedTableFactory().Name()) {
1272 return Status::NotSupported(
1273 "Periodic Compaction is only supported in "
1274 "Block-Based Table format. ");
1275 }
1276 }
1277 return s;
1278 }
1279
1280 #ifndef ROCKSDB_LITE
SetOptions(const DBOptions & db_options,const std::unordered_map<std::string,std::string> & options_map)1281 Status ColumnFamilyData::SetOptions(
1282 const DBOptions& db_options,
1283 const std::unordered_map<std::string, std::string>& options_map) {
1284 MutableCFOptions new_mutable_cf_options;
1285 Status s =
1286 GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
1287 ioptions_.info_log, &new_mutable_cf_options);
1288 if (s.ok()) {
1289 ColumnFamilyOptions cf_options =
1290 BuildColumnFamilyOptions(initial_cf_options_, new_mutable_cf_options);
1291 s = ValidateOptions(db_options, cf_options);
1292 }
1293 if (s.ok()) {
1294 mutable_cf_options_ = new_mutable_cf_options;
1295 mutable_cf_options_.RefreshDerivedOptions(ioptions_);
1296 }
1297 return s;
1298 }
1299 #endif // ROCKSDB_LITE
1300
1301 // REQUIRES: DB mutex held
CalculateSSTWriteHint(int level)1302 Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) {
1303 if (initial_cf_options_.compaction_style != kCompactionStyleLevel) {
1304 return Env::WLTH_NOT_SET;
1305 }
1306 if (level == 0) {
1307 return Env::WLTH_MEDIUM;
1308 }
1309 int base_level = current_->storage_info()->base_level();
1310
1311 // L1: medium, L2: long, ...
1312 if (level - base_level >= 2) {
1313 return Env::WLTH_EXTREME;
1314 } else if (level < base_level) {
1315 // There is no restriction which prevents level passed in to be smaller
1316 // than base_level.
1317 return Env::WLTH_MEDIUM;
1318 }
1319 return static_cast<Env::WriteLifeTimeHint>(level - base_level +
1320 static_cast<int>(Env::WLTH_MEDIUM));
1321 }
1322
AddDirectories(std::map<std::string,std::shared_ptr<Directory>> * created_dirs)1323 Status ColumnFamilyData::AddDirectories(
1324 std::map<std::string, std::shared_ptr<Directory>>* created_dirs) {
1325 Status s;
1326 assert(created_dirs != nullptr);
1327 assert(data_dirs_.empty());
1328 for (auto& p : ioptions_.cf_paths) {
1329 auto existing_dir = created_dirs->find(p.path);
1330
1331 if (existing_dir == created_dirs->end()) {
1332 std::unique_ptr<Directory> path_directory;
1333 s = DBImpl::CreateAndNewDirectory(ioptions_.env, p.path, &path_directory);
1334 if (!s.ok()) {
1335 return s;
1336 }
1337 assert(path_directory != nullptr);
1338 data_dirs_.emplace_back(path_directory.release());
1339 (*created_dirs)[p.path] = data_dirs_.back();
1340 } else {
1341 data_dirs_.emplace_back(existing_dir->second);
1342 }
1343 }
1344 assert(data_dirs_.size() == ioptions_.cf_paths.size());
1345 return s;
1346 }
1347
GetDataDir(size_t path_id) const1348 Directory* ColumnFamilyData::GetDataDir(size_t path_id) const {
1349 if (data_dirs_.empty()) {
1350 return nullptr;
1351 }
1352
1353 assert(path_id < data_dirs_.size());
1354 return data_dirs_[path_id].get();
1355 }
1356
ColumnFamilySet(const std::string & dbname,const ImmutableDBOptions * db_options,const FileOptions & file_options,Cache * table_cache,WriteBufferManager * write_buffer_manager,WriteController * write_controller,BlockCacheTracer * const block_cache_tracer)1357 ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
1358 const ImmutableDBOptions* db_options,
1359 const FileOptions& file_options,
1360 Cache* table_cache,
1361 WriteBufferManager* write_buffer_manager,
1362 WriteController* write_controller,
1363 BlockCacheTracer* const block_cache_tracer)
1364 : max_column_family_(0),
1365 dummy_cfd_(new ColumnFamilyData(
1366 0, "", nullptr, nullptr, nullptr, ColumnFamilyOptions(), *db_options,
1367 file_options, nullptr, block_cache_tracer)),
1368 default_cfd_cache_(nullptr),
1369 db_name_(dbname),
1370 db_options_(db_options),
1371 file_options_(file_options),
1372 table_cache_(table_cache),
1373 write_buffer_manager_(write_buffer_manager),
1374 write_controller_(write_controller),
1375 block_cache_tracer_(block_cache_tracer) {
1376 // initialize linked list
1377 dummy_cfd_->prev_ = dummy_cfd_;
1378 dummy_cfd_->next_ = dummy_cfd_;
1379 }
1380
~ColumnFamilySet()1381 ColumnFamilySet::~ColumnFamilySet() {
1382 while (column_family_data_.size() > 0) {
1383 // cfd destructor will delete itself from column_family_data_
1384 auto cfd = column_family_data_.begin()->second;
1385 bool last_ref __attribute__((__unused__));
1386 last_ref = cfd->UnrefAndTryDelete();
1387 assert(last_ref);
1388 }
1389 bool dummy_last_ref __attribute__((__unused__));
1390 dummy_last_ref = dummy_cfd_->UnrefAndTryDelete();
1391 assert(dummy_last_ref);
1392 }
1393
GetDefault() const1394 ColumnFamilyData* ColumnFamilySet::GetDefault() const {
1395 assert(default_cfd_cache_ != nullptr);
1396 return default_cfd_cache_;
1397 }
1398
GetColumnFamily(uint32_t id) const1399 ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
1400 auto cfd_iter = column_family_data_.find(id);
1401 if (cfd_iter != column_family_data_.end()) {
1402 return cfd_iter->second;
1403 } else {
1404 return nullptr;
1405 }
1406 }
1407
GetColumnFamily(const std::string & name) const1408 ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
1409 const {
1410 auto cfd_iter = column_families_.find(name);
1411 if (cfd_iter != column_families_.end()) {
1412 auto cfd = GetColumnFamily(cfd_iter->second);
1413 assert(cfd != nullptr);
1414 return cfd;
1415 } else {
1416 return nullptr;
1417 }
1418 }
1419
GetNextColumnFamilyID()1420 uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
1421 return ++max_column_family_;
1422 }
1423
GetMaxColumnFamily()1424 uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; }
1425
UpdateMaxColumnFamily(uint32_t new_max_column_family)1426 void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
1427 max_column_family_ = std::max(new_max_column_family, max_column_family_);
1428 }
1429
NumberOfColumnFamilies() const1430 size_t ColumnFamilySet::NumberOfColumnFamilies() const {
1431 return column_families_.size();
1432 }
1433
1434 // under a DB mutex AND write thread
CreateColumnFamily(const std::string & name,uint32_t id,Version * dummy_versions,const ColumnFamilyOptions & options)1435 ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
1436 const std::string& name, uint32_t id, Version* dummy_versions,
1437 const ColumnFamilyOptions& options) {
1438 assert(column_families_.find(name) == column_families_.end());
1439 ColumnFamilyData* new_cfd = new ColumnFamilyData(
1440 id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
1441 *db_options_, file_options_, this, block_cache_tracer_);
1442 column_families_.insert({name, id});
1443 column_family_data_.insert({id, new_cfd});
1444 max_column_family_ = std::max(max_column_family_, id);
1445 // add to linked list
1446 new_cfd->next_ = dummy_cfd_;
1447 auto prev = dummy_cfd_->prev_;
1448 new_cfd->prev_ = prev;
1449 prev->next_ = new_cfd;
1450 dummy_cfd_->prev_ = new_cfd;
1451 if (id == 0) {
1452 default_cfd_cache_ = new_cfd;
1453 }
1454 return new_cfd;
1455 }
1456
1457 // REQUIRES: DB mutex held
FreeDeadColumnFamilies()1458 void ColumnFamilySet::FreeDeadColumnFamilies() {
1459 autovector<ColumnFamilyData*> to_delete;
1460 for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
1461 if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
1462 to_delete.push_back(cfd);
1463 }
1464 }
1465 for (auto cfd : to_delete) {
1466 // this is very rare, so it's not a problem that we do it under a mutex
1467 delete cfd;
1468 }
1469 }
1470
1471 // under a DB mutex AND from a write thread
RemoveColumnFamily(ColumnFamilyData * cfd)1472 void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
1473 auto cfd_iter = column_family_data_.find(cfd->GetID());
1474 assert(cfd_iter != column_family_data_.end());
1475 column_family_data_.erase(cfd_iter);
1476 column_families_.erase(cfd->GetName());
1477 }
1478
1479 // under a DB mutex OR from a write thread
Seek(uint32_t column_family_id)1480 bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
1481 if (column_family_id == 0) {
1482 // optimization for common case
1483 current_ = column_family_set_->GetDefault();
1484 } else {
1485 current_ = column_family_set_->GetColumnFamily(column_family_id);
1486 }
1487 handle_.SetCFD(current_);
1488 return current_ != nullptr;
1489 }
1490
GetLogNumber() const1491 uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
1492 assert(current_ != nullptr);
1493 return current_->GetLogNumber();
1494 }
1495
GetMemTable() const1496 MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
1497 assert(current_ != nullptr);
1498 return current_->mem();
1499 }
1500
GetColumnFamilyHandle()1501 ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
1502 assert(current_ != nullptr);
1503 return &handle_;
1504 }
1505
GetColumnFamilyID(ColumnFamilyHandle * column_family)1506 uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
1507 uint32_t column_family_id = 0;
1508 if (column_family != nullptr) {
1509 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
1510 column_family_id = cfh->GetID();
1511 }
1512 return column_family_id;
1513 }
1514
GetColumnFamilyUserComparator(ColumnFamilyHandle * column_family)1515 const Comparator* GetColumnFamilyUserComparator(
1516 ColumnFamilyHandle* column_family) {
1517 if (column_family != nullptr) {
1518 return column_family->GetComparator();
1519 }
1520 return nullptr;
1521 }
1522
1523 } // namespace ROCKSDB_NAMESPACE
1524