//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/column_family.h"

#include <algorithm>
#include <cinttypes>
#include <limits>
#include <string>
#include <vector>

#include "db/compaction/compaction_picker.h"
#include "db/compaction/compaction_picker_fifo.h"
#include "db/compaction/compaction_picker_level.h"
#include "db/compaction/compaction_picker_universal.h"
#include "db/db_impl/db_impl.h"
#include "db/internal_stats.h"
#include "db/job_context.h"
#include "db/range_del_aggregator.h"
#include "db/table_properties_collector.h"
#include "db/version_set.h"
#include "db/write_controller.h"
#include "file/sst_file_manager_impl.h"
#include "memtable/hash_skiplist_rep.h"
#include "monitoring/thread_status_util.h"
#include "options/options_helper.h"
#include "port/port.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/merging_iterator.h"
#include "util/autovector.h"
#include "util/compression.h"

namespace ROCKSDB_NAMESPACE {

ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
    ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
    : cfd_(column_family_data), db_(db), mutex_(mutex) {
  if (cfd_ != nullptr) {
    cfd_->Ref();
  }
}

ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ != nullptr) {
#ifndef ROCKSDB_LITE
    for (auto& listener : cfd_->ioptions()->listeners) {
      listener->OnColumnFamilyHandleDeletionStarted(this);
    }
#endif  // ROCKSDB_LITE
    // Job id == 0 means that this is not our background process, but rather
    // a user thread
    // Need to hold some shared pointers owned by the initial_cf_options
    // before the final cleanup finishes.
    ColumnFamilyOptions initial_cf_options_copy = cfd_->initial_cf_options();
    JobContext job_context(0);
    mutex_->Lock();
    bool dropped = cfd_->IsDropped();
    if (cfd_->UnrefAndTryDelete()) {
      if (dropped) {
        db_->FindObsoleteFiles(&job_context, false, true);
      }
    }
    mutex_->Unlock();
    if (job_context.HaveSomethingToDelete()) {
      bool defer_purge =
          db_->immutable_db_options().avoid_unnecessary_blocking_io;
      db_->PurgeObsoleteFiles(job_context, defer_purge);
      if (defer_purge) {
        mutex_->Lock();
        db_->SchedulePurge();
        mutex_->Unlock();
      }
    }
    job_context.Clean();
  }
}

uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }

const std::string& ColumnFamilyHandleImpl::GetName() const {
  return cfd()->GetName();
}

Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
#ifndef ROCKSDB_LITE
  // accessing mutable cf-options requires db mutex.
  InstrumentedMutexLock l(mutex_);
  *desc = ColumnFamilyDescriptor(cfd()->GetName(), cfd()->GetLatestCFOptions());
  return Status::OK();
#else
  (void)desc;
  return Status::NotSupported();
#endif  // !ROCKSDB_LITE
}

const Comparator* ColumnFamilyHandleImpl::GetComparator() const {
  return cfd()->user_comparator();
}

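// Wraps each user-defined TablePropertiesCollectorFactory in a
// UserKeyTablePropertiesCollectorFactory so that the collectors operate on
// user keys instead of internal keys during table building.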
void GetIntTblPropCollectorFactory(
    const ImmutableCFOptions& ioptions,
    std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
        int_tbl_prop_collector_factories) {
  auto& collector_factories = ioptions.table_properties_collector_factories;
  for (size_t i = 0; i < ioptions.table_properties_collector_factories.size();
       ++i) {
    assert(collector_factories[i]);
    int_tbl_prop_collector_factories->emplace_back(
        new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
  }
}

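// Verifies that every compression type referenced by the column family
// options is compiled into this binary, and that zstd dictionary training,
// when requested, is both available and consistently configured.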
Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
  if (!cf_options.compression_per_level.empty()) {
    for (size_t level = 0; level < cf_options.compression_per_level.size();
         ++level) {
      if (!CompressionTypeSupported(cf_options.compression_per_level[level])) {
        return Status::InvalidArgument(
            "Compression type " +
            CompressionTypeToString(cf_options.compression_per_level[level]) +
            " is not linked with the binary.");
      }
    }
  } else {
    if (!CompressionTypeSupported(cf_options.compression)) {
      return Status::InvalidArgument(
          "Compression type " +
          CompressionTypeToString(cf_options.compression) +
          " is not linked with the binary.");
    }
  }
  if (cf_options.compression_opts.zstd_max_train_bytes > 0) {
    if (!ZSTD_TrainDictionarySupported()) {
      return Status::InvalidArgument(
          "zstd dictionary trainer cannot be used because ZSTD 1.1.3+ "
          "is not linked with the binary.");
    }
    if (cf_options.compression_opts.max_dict_bytes == 0) {
      return Status::InvalidArgument(
          "The dictionary size limit (`CompressionOptions::max_dict_bytes`) "
          "should be nonzero if we're using zstd's dictionary generator.");
    }
  }
  return Status::OK();
}

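// Called when allow_concurrent_memtable_write is enabled; rejects column
// family options that cannot be combined with concurrent memtable inserts.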
Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
  if (cf_options.inplace_update_support) {
    return Status::InvalidArgument(
        "In-place memtable updates (inplace_update_support) are not "
        "compatible with concurrent writes (allow_concurrent_memtable_write)");
  }
  if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
    return Status::InvalidArgument(
        "Memtable doesn't support concurrent writes "
        "(allow_concurrent_memtable_write)");
  }
  return Status::OK();
}

Status CheckCFPathsSupported(const DBOptions& db_options,
                             const ColumnFamilyOptions& cf_options) {
  // Multiple cf_paths are supported only in universal
  // and level compaction styles. This function also checks the case
  // in which cf_paths is not specified, which results in db_paths
  // being used.
  if ((cf_options.compaction_style != kCompactionStyleUniversal) &&
      (cf_options.compaction_style != kCompactionStyleLevel)) {
    if (cf_options.cf_paths.size() > 1) {
      return Status::NotSupported(
          "More than one CF path is only supported in "
          "universal and level compaction styles. ");
    } else if (cf_options.cf_paths.empty() &&
               db_options.db_paths.size() > 1) {
      return Status::NotSupported(
          "More than one DB path is only supported in "
          "universal and level compaction styles. ");
    }
  }
  return Status::OK();
}

namespace {
const uint64_t kDefaultTtl = 0xfffffffffffffffe;
const uint64_t kDefaultPeriodicCompSecs = 0xfffffffffffffffe;
}  // namespace

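// Clamps and adjusts the user-supplied ColumnFamilyOptions into a mutually
// consistent set of values (write buffer sizes, level0 triggers, TTL and
// periodic compaction defaults, etc.).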
ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
                                    const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;
  size_t clamp_max = std::conditional<
      sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
      std::integral_constant<uint64_t, 64ull << 30>>::type::value;
  ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, clamp_max);
  // if user sets arena_block_size, we trust user to use this value. Otherwise,
  // calculate a proper value from write_buffer_size.
  if (result.arena_block_size <= 0) {
    result.arena_block_size = result.write_buffer_size / 8;

    // Align up to 4k
    const size_t align = 4 * 1024;
    result.arena_block_size =
        ((result.arena_block_size + align - 1) / align) * align;
  }
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
  if (result.min_write_buffer_number_to_merge < 1) {
    result.min_write_buffer_number_to_merge = 1;
  }

  if (result.num_levels < 1) {
    result.num_levels = 1;
  }
  if (result.compaction_style == kCompactionStyleLevel &&
      result.num_levels < 2) {
    result.num_levels = 2;
  }

  if (result.compaction_style == kCompactionStyleUniversal &&
      db_options.allow_ingest_behind && result.num_levels < 3) {
    result.num_levels = 3;
  }

  if (result.max_write_buffer_number < 2) {
    result.max_write_buffer_number = 2;
  }
  // fall back to max_write_buffer_number_to_maintain if
  // max_write_buffer_size_to_maintain is not set
  if (result.max_write_buffer_size_to_maintain < 0) {
    result.max_write_buffer_size_to_maintain =
        result.max_write_buffer_number *
        static_cast<int64_t>(result.write_buffer_size);
  } else if (result.max_write_buffer_size_to_maintain == 0 &&
             result.max_write_buffer_number_to_maintain < 0) {
    result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
  }
  // bloom filter size shouldn't exceed 1/4 of memtable size.
  if (result.memtable_prefix_bloom_size_ratio > 0.25) {
    result.memtable_prefix_bloom_size_ratio = 0.25;
  } else if (result.memtable_prefix_bloom_size_ratio < 0) {
    result.memtable_prefix_bloom_size_ratio = 0;
  }

  if (!result.prefix_extractor) {
    assert(result.memtable_factory);
    Slice name = result.memtable_factory->Name();
    if (name.compare("HashSkipListRepFactory") == 0 ||
        name.compare("HashLinkListRepFactory") == 0) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }

  if (result.compaction_style == kCompactionStyleFIFO) {
    result.num_levels = 1;
    // since we delete level0 files in FIFO compaction when there are too many
    // of them, these options don't really mean anything
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
  }

  if (result.max_bytes_for_level_multiplier <= 0) {
    result.max_bytes_for_level_multiplier = 1;
  }

  if (result.level0_file_num_compaction_trigger == 0) {
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "level0_file_num_compaction_trigger cannot be 0");
    result.level0_file_num_compaction_trigger = 1;
  }

  if (result.level0_stop_writes_trigger <
          result.level0_slowdown_writes_trigger ||
      result.level0_slowdown_writes_trigger <
          result.level0_file_num_compaction_trigger) {
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "This condition must be satisfied: "
                   "level0_stop_writes_trigger(%d) >= "
                   "level0_slowdown_writes_trigger(%d) >= "
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
    if (result.level0_slowdown_writes_trigger <
        result.level0_file_num_compaction_trigger) {
      result.level0_slowdown_writes_trigger =
          result.level0_file_num_compaction_trigger;
    }
    if (result.level0_stop_writes_trigger <
        result.level0_slowdown_writes_trigger) {
      result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
    }
    ROCKS_LOG_WARN(db_options.info_log.get(),
                   "Adjust the value to "
                   "level0_stop_writes_trigger(%d) "
                   "level0_slowdown_writes_trigger(%d) "
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
  }

  if (result.soft_pending_compaction_bytes_limit == 0) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  } else if (result.hard_pending_compaction_bytes_limit > 0 &&
             result.soft_pending_compaction_bytes_limit >
                 result.hard_pending_compaction_bytes_limit) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  }

#ifndef ROCKSDB_LITE
  // When the DB is stopped, it's possible that there are some .trash files
  // that were not deleted yet. When we open the DB we will find these .trash
  // files and schedule them to be deleted (or delete them immediately if an
  // SstFileManager was not used).
  auto sfm =
      static_cast<SstFileManagerImpl*>(db_options.sst_file_manager.get());
  for (size_t i = 0; i < result.cf_paths.size(); i++) {
    DeleteScheduler::CleanupDirectory(db_options.env, sfm,
                                      result.cf_paths[i].path);
  }
#endif

  if (result.cf_paths.empty()) {
    result.cf_paths = db_options.db_paths;
  }

  if (result.level_compaction_dynamic_level_bytes) {
    if (result.compaction_style != kCompactionStyleLevel ||
        result.cf_paths.size() > 1U) {
      // 1. level_compaction_dynamic_level_bytes only makes sense for
      //    level-based compaction.
      // 2. we don't yet know how to make this feature and multiple
      //    DB paths work together.
      result.level_compaction_dynamic_level_bytes = false;
    }
  }

  if (result.max_compaction_bytes == 0) {
    result.max_compaction_bytes = result.target_file_size_base * 25;
  }

  bool is_block_based_table =
      (result.table_factory->Name() == BlockBasedTableFactory().Name());

  const uint64_t kAdjustedTtl = 30 * 24 * 60 * 60;
  if (result.ttl == kDefaultTtl) {
    if (is_block_based_table &&
        result.compaction_style != kCompactionStyleFIFO) {
      result.ttl = kAdjustedTtl;
    } else {
      result.ttl = 0;
    }
  }

  const uint64_t kAdjustedPeriodicCompSecs = 30 * 24 * 60 * 60;

  // Turn on periodic compactions and set them to occur once every 30 days if
  // compaction filters are used and periodic_compaction_seconds is set to the
  // default value.
  if (result.compaction_style != kCompactionStyleFIFO) {
    if ((result.compaction_filter != nullptr ||
         result.compaction_filter_factory != nullptr) &&
        result.periodic_compaction_seconds == kDefaultPeriodicCompSecs &&
        is_block_based_table) {
      result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
    }
  } else {
    // result.compaction_style == kCompactionStyleFIFO
    if (result.ttl == 0) {
      if (is_block_based_table) {
        if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
          result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
        }
        result.ttl = result.periodic_compaction_seconds;
      }
    } else if (result.periodic_compaction_seconds != 0) {
      result.ttl = std::min(result.ttl, result.periodic_compaction_seconds);
    }
  }

  // TTL compactions work similarly to periodic compactions in universal
  // compaction in most cases. So, if ttl is set, execute the periodic
  // compaction codepath.
  if (result.compaction_style == kCompactionStyleUniversal && result.ttl != 0) {
    if (result.periodic_compaction_seconds != 0) {
      result.periodic_compaction_seconds =
          std::min(result.ttl, result.periodic_compaction_seconds);
    } else {
      result.periodic_compaction_seconds = result.ttl;
    }
  }

  if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
    result.periodic_compaction_seconds = 0;
  }

  return result;
}

int SuperVersion::dummy = 0;
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
void* const SuperVersion::kSVObsolete = nullptr;

SuperVersion::~SuperVersion() {
  for (auto td : to_delete) {
    delete td;
  }
}

SuperVersion* SuperVersion::Ref() {
  refs.fetch_add(1, std::memory_order_relaxed);
  return this;
}

bool SuperVersion::Unref() {
  // fetch_sub returns the previous value of ref
  uint32_t previous_refs = refs.fetch_sub(1);
  assert(previous_refs > 0);
  return previous_refs == 1;
}

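// Drops the references acquired in Init(). Memtables whose reference count
// reaches zero are collected into to_delete; freeing them is left to the
// destructor (see ~SuperVersion()).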
void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
    auto* memory_usage = current->cfd()->imm()->current_memory_usage();
    assert(*memory_usage >= m->ApproximateMemoryUsage());
    *memory_usage -= m->ApproximateMemoryUsage();
    to_delete.push_back(m);
  }
  current->Unref();
  if (cfd->Unref()) {
    delete cfd;
  }
}

void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
                        MemTableListVersion* new_imm, Version* new_current) {
  cfd = new_cfd;
  mem = new_mem;
  imm = new_imm;
  current = new_current;
  cfd->Ref();
  mem->Ref();
  imm->Ref();
  current->Ref();
  refs.store(1, std::memory_order_relaxed);
}

namespace {
void SuperVersionUnrefHandle(void* ptr) {
  // UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
  // destroyed. When the former happens, the thread shouldn't see kSVInUse.
  // When the latter happens, we are in ~ColumnFamilyData(), so no get should
  // happen either.
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  bool was_last_ref __attribute__((__unused__));
  was_last_ref = sv->Unref();
  // Thread-local SuperVersions can't outlive ColumnFamilyData::super_version_.
  // This is important because we can't do SuperVersion cleanup here.
  // That would require locking the DB mutex, which would deadlock because
  // SuperVersionUnrefHandle is called with the ThreadLocalPtr mutex locked.
  assert(!was_last_ref);
}
}  // anonymous namespace

ColumnFamilyData::ColumnFamilyData(
    uint32_t id, const std::string& name, Version* _dummy_versions,
    Cache* _table_cache, WriteBufferManager* write_buffer_manager,
    const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
    const FileOptions& file_options, ColumnFamilySet* column_family_set,
    BlockCacheTracer* const block_cache_tracer)
    : id_(id),
      name_(name),
      dummy_versions_(_dummy_versions),
      current_(nullptr),
      refs_(0),
      initialized_(false),
      dropped_(false),
      internal_comparator_(cf_options.comparator),
      initial_cf_options_(SanitizeOptions(db_options, cf_options)),
      ioptions_(db_options, initial_cf_options_),
      mutable_cf_options_(initial_cf_options_),
      is_delete_range_supported_(
          cf_options.table_factory->IsDeleteRangeSupported()),
      write_buffer_manager_(write_buffer_manager),
      mem_(nullptr),
      imm_(ioptions_.min_write_buffer_number_to_merge,
           ioptions_.max_write_buffer_number_to_maintain,
           ioptions_.max_write_buffer_size_to_maintain),
      super_version_(nullptr),
      super_version_number_(0),
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      flush_reason_(FlushReason::kOthers),
      column_family_set_(column_family_set),
      queued_for_flush_(false),
      queued_for_compaction_(false),
      prev_compaction_needed_bytes_(0),
      allow_2pc_(db_options.allow_2pc),
      last_memtable_id_(0) {
  Ref();

  // Convert user defined table properties collector factories to internal ones.
  GetIntTblPropCollectorFactory(ioptions_, &int_tbl_prop_collector_factories_);

  // if _dummy_versions is nullptr, then this is a dummy column family.
  if (_dummy_versions != nullptr) {
    internal_stats_.reset(
        new InternalStats(ioptions_.num_levels, db_options.env, this));
    table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache,
                                      block_cache_tracer));
    if (ioptions_.compaction_style == kCompactionStyleLevel) {
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
#ifndef ROCKSDB_LITE
    } else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
      compaction_picker_.reset(
          new UniversalCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
      compaction_picker_.reset(
          new FIFOCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleNone) {
      compaction_picker_.reset(new NullCompactionPicker(
          ioptions_, &internal_comparator_));
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "Column family %s does not use any background compaction. "
                     "Compactions can only be done via CompactFiles\n",
                     GetName().c_str());
#endif  // !ROCKSDB_LITE
    } else {
      ROCKS_LOG_ERROR(ioptions_.info_log,
                      "Unable to recognize the specified compaction style %d. "
                      "Column family %s will use kCompactionStyleLevel.\n",
                      ioptions_.compaction_style, GetName().c_str());
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
    }

    if (column_family_set_->NumberOfColumnFamilies() < 10) {
      ROCKS_LOG_INFO(ioptions_.info_log,
                     "--------------- Options for column family [%s]:\n",
                     name.c_str());
      initial_cf_options_.Dump(ioptions_.info_log);
    } else {
      ROCKS_LOG_INFO(ioptions_.info_log, "\t(skipping printing options)\n");
    }
  }

  RecalculateWriteStallConditions(mutable_cf_options_);
}

// DB mutex held
ColumnFamilyData::~ColumnFamilyData() {
  assert(refs_.load(std::memory_order_relaxed) == 0);
  // remove from linked list
  auto prev = prev_;
  auto next = next_;
  prev->next_ = next;
  next->prev_ = prev;

  if (!dropped_ && column_family_set_ != nullptr) {
    // If it's dropped, it's already removed from column family set
    // If column_family_set_ == nullptr, this is a dummy CFD and not in
    // ColumnFamilySet
    column_family_set_->RemoveColumnFamily(this);
  }

  if (current_ != nullptr) {
    current_->Unref();
  }

  // It would be wrong if this ColumnFamilyData were still in flush_queue_ or
  // compaction_queue_ when we destroy it
  assert(!queued_for_flush_);
  assert(!queued_for_compaction_);
  assert(super_version_ == nullptr);

  if (dummy_versions_ != nullptr) {
    // List must be empty
    assert(dummy_versions_->TEST_Next() == dummy_versions_);
    bool deleted __attribute__((__unused__));
    deleted = dummy_versions_->Unref();
    assert(deleted);
  }

  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  autovector<MemTable*> to_delete;
  imm_.current()->Unref(&to_delete);
  for (MemTable* m : to_delete) {
    delete m;
  }
}

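// Drops one reference. Deletes this ColumnFamilyData when the last reference
// is released; if only super_version_ still holds a reference, the
// thread-local and super version references are released as well, which may
// also delete this object. Returns true iff the ColumnFamilyData was deleted.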
bool ColumnFamilyData::UnrefAndTryDelete() {
  int old_refs = refs_.fetch_sub(1);
  assert(old_refs > 0);

  if (old_refs == 1) {
    assert(super_version_ == nullptr);
    delete this;
    return true;
  }

  if (old_refs == 2 && super_version_ != nullptr) {
    // Only the super_version_ holds me
    SuperVersion* sv = super_version_;
    super_version_ = nullptr;
    // Release SuperVersion reference kept in ThreadLocalPtr.
    // This must be done outside of mutex_ since unref handler can lock mutex.
    sv->db_mutex->Unlock();
    local_sv_.reset();
    sv->db_mutex->Lock();

    if (sv->Unref()) {
      // May delete this ColumnFamilyData after calling Cleanup()
      sv->Cleanup();
      delete sv;
      return true;
    }
  }
  return false;
}

void ColumnFamilyData::SetDropped() {
  // can't drop default CF
  assert(id_ != 0);
  dropped_ = true;
  write_controller_token_.reset();

  // remove from column_family_set
  column_family_set_->RemoveColumnFamily(this);
}

ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
  return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
}

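// Returns the earliest WAL number that must be kept alive for this column
// family. With two-phase commit enabled, WALs that still contain unflushed
// prepared sections are also taken into account.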
uint64_t ColumnFamilyData::OldestLogToKeep() {
  auto current_log = GetLogNumber();

  if (allow_2pc_) {
    autovector<MemTable*> empty_list;
    auto imm_prep_log =
        imm()->PrecomputeMinLogContainingPrepSection(empty_list);
    auto mem_prep_log = mem()->GetMinLogContainingPrepSection();

    if (imm_prep_log > 0 && imm_prep_log < current_log) {
      current_log = imm_prep_log;
    }

    if (mem_prep_log > 0 && mem_prep_log < current_log) {
      current_log = mem_prep_log;
    }
  }

  return current_log;
}

const double kIncSlowdownRatio = 0.8;
const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
const double kNearStopSlowdownRatio = 0.6;
const double kDelayRecoverSlowdownRatio = 1.4;

namespace {
// If penalize_stop is true, we further reduce the slowdown rate.
std::unique_ptr<WriteControllerToken> SetupDelay(
    WriteController* write_controller, uint64_t compaction_needed_bytes,
    uint64_t prev_compaction_need_bytes, bool penalize_stop,
    bool auto_compactions_disabled) {
  const uint64_t kMinWriteRate = 16 * 1024u;  // Minimum write rate 16KB/s.

  uint64_t max_write_rate = write_controller->max_delayed_write_rate();
  uint64_t write_rate = write_controller->delayed_write_rate();

  if (auto_compactions_disabled) {
    // When auto compaction is disabled, always use the value the user gave.
    write_rate = max_write_rate;
  } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
    // If the user gives a rate less than kMinWriteRate, don't adjust it.
    //
    // If already delayed, need to adjust based on previous compaction debt.
    // When two or more column families require delay, we always increase or
    // reduce the write rate based on information for one single column
    // family. It is likely to be OK but we can improve if there is a problem.
    // Ignore the compaction_needed_bytes = 0 case because
    // compaction_needed_bytes is only available in level-based compaction.
    //
    // If the compaction debt stays the same as previously, we also further
    // slow down. It usually means a memtable is full. It's mainly for the
    // case where both flush and compaction are much slower than the speed we
    // insert to memtables, so we need to actively slow down before we get
    // feedback signal from compaction and flushes to avoid the full stop
    // because of hitting the max write buffer number.
    //
    // If the DB just fell into the stop condition, we need to further reduce
    // the write rate to avoid the stop condition.
    if (penalize_stop) {
      // Penalize the near stop or stop condition by a more aggressive
      // slowdown. This is to provide the long term slowdown increase signal.
      // The penalty is more than the reward of recovering to the normal
      // condition.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kNearStopSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_need_bytes > 0 &&
               prev_compaction_need_bytes <= compaction_needed_bytes) {
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kIncSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_need_bytes > compaction_needed_bytes) {
      // We are speeding up by a ratio of kDecSlowdownRatio when we have paid
      // compaction debt. But we'll never speed up to faster than the write
      // rate given by users.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kDecSlowdownRatio);
      if (write_rate > max_write_rate) {
        write_rate = max_write_rate;
      }
    }
  }
  return write_controller->GetDelayToken(write_rate);
}

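// Returns the L0 file count at which compaction should be sped up via
// compaction pressure: the smaller of twice the compaction trigger and a
// quarter of the way from the compaction trigger to the slowdown trigger.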
int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
                                    int level0_slowdown_writes_trigger) {
  // SanitizeOptions() ensures it.
  assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);

  if (level0_file_num_compaction_trigger < 0) {
    return std::numeric_limits<int>::max();
  }

  const int64_t twice_level0_trigger =
      static_cast<int64_t>(level0_file_num_compaction_trigger) * 2;

  const int64_t one_fourth_trigger_slowdown =
      static_cast<int64_t>(level0_file_num_compaction_trigger) +
      ((level0_slowdown_writes_trigger - level0_file_num_compaction_trigger) /
       4);

  assert(twice_level0_trigger >= 0);
  assert(one_fourth_trigger_slowdown >= 0);

  // 1/4 of the way between L0 compaction trigger threshold and slowdown
  // condition.
  // Or twice the compaction trigger, if that is smaller.
  int64_t res = std::min(twice_level0_trigger, one_fourth_trigger_slowdown);
  if (res >= port::kMaxInt32) {
    return port::kMaxInt32;
  } else {
    // res fits in int
    return static_cast<int>(res);
  }
}
}  // namespace

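// Maps the current number of unflushed memtables, L0 files and estimated
// pending compaction bytes to a write stall condition (stop, delay or
// normal) and its cause, checking stop conditions before delay conditions.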
std::pair<WriteStallCondition, ColumnFamilyData::WriteStallCause>
ColumnFamilyData::GetWriteStallConditionAndCause(
    int num_unflushed_memtables, int num_l0_files,
    uint64_t num_compaction_needed_bytes,
    const MutableCFOptions& mutable_cf_options) {
  if (num_unflushed_memtables >= mutable_cf_options.max_write_buffer_number) {
    return {WriteStallCondition::kStopped, WriteStallCause::kMemtableLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             num_l0_files >= mutable_cf_options.level0_stop_writes_trigger) {
    return {WriteStallCondition::kStopped, WriteStallCause::kL0FileCountLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
             num_compaction_needed_bytes >=
                 mutable_cf_options.hard_pending_compaction_bytes_limit) {
    return {WriteStallCondition::kStopped,
            WriteStallCause::kPendingCompactionBytes};
  } else if (mutable_cf_options.max_write_buffer_number > 3 &&
             num_unflushed_memtables >=
                 mutable_cf_options.max_write_buffer_number - 1) {
    return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
             num_l0_files >=
                 mutable_cf_options.level0_slowdown_writes_trigger) {
    return {WriteStallCondition::kDelayed, WriteStallCause::kL0FileCountLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
             num_compaction_needed_bytes >=
                 mutable_cf_options.soft_pending_compaction_bytes_limit) {
    return {WriteStallCondition::kDelayed,
            WriteStallCause::kPendingCompactionBytes};
  }
  return {WriteStallCondition::kNormal, WriteStallCause::kNone};
}

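// Re-evaluates the write stall state for this column family and installs the
// matching write controller token (stop, delay or compaction pressure).
// Typically invoked with the DB mutex held, e.g. from InstallSuperVersion().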
WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
    const MutableCFOptions& mutable_cf_options) {
  auto write_stall_condition = WriteStallCondition::kNormal;
  if (current_ != nullptr) {
    auto* vstorage = current_->storage_info();
    auto write_controller = column_family_set_->write_controller_;
    uint64_t compaction_needed_bytes =
        vstorage->estimated_compaction_needed_bytes();

    auto write_stall_condition_and_cause = GetWriteStallConditionAndCause(
        imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(),
        vstorage->estimated_compaction_needed_bytes(), mutable_cf_options);
    write_stall_condition = write_stall_condition_and_cause.first;
    auto write_stall_cause = write_stall_condition_and_cause.second;

    bool was_stopped = write_controller->IsStopped();
    bool needed_delay = write_controller->NeedsDelay();

    if (write_stall_condition == WriteStallCondition::kStopped &&
        write_stall_cause == WriteStallCause::kMemtableLimit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stopping writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d",
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number);
    } else if (write_stall_condition == WriteStallCondition::kStopped &&
               write_stall_cause == WriteStallCause::kL0FileCountLimit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_STOPS, 1);
      }
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "[%s] Stopping writes because we have %d level-0 files",
                     name_.c_str(), vstorage->l0_delay_trigger_count());
    } else if (write_stall_condition == WriteStallCondition::kStopped &&
               write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stopping writes because of estimated pending compaction "
          "bytes %" PRIu64,
          name_.c_str(), compaction_needed_bytes);
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kMemtableLimit) {
      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stalling writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d "
          "rate %" PRIu64,
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number,
          write_controller->delayed_write_rate());
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kL0FileCountLimit) {
      // L0 is within two files of the stop trigger.
      bool near_stop = vstorage->l0_delay_trigger_count() >=
                       mutable_cf_options.level0_stop_writes_trigger - 2;
      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS,
                                  1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1);
      }
      ROCKS_LOG_WARN(ioptions_.info_log,
                     "[%s] Stalling writes because we have %d level-0 files "
                     "rate %" PRIu64,
                     name_.c_str(), vstorage->l0_delay_trigger_count(),
                     write_controller->delayed_write_rate());
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
      // If the distance to the hard limit is less than 1/4 of the gap between
      // the soft and hard bytes limits, we think it is near stop and speed up
      // the slowdown.
      bool near_stop =
          mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
          (compaction_needed_bytes -
           mutable_cf_options.soft_pending_compaction_bytes_limit) >
              3 * (mutable_cf_options.hard_pending_compaction_bytes_limit -
                   mutable_cf_options.soft_pending_compaction_bytes_limit) /
                  4;

      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1);
      ROCKS_LOG_WARN(
          ioptions_.info_log,
          "[%s] Stalling writes because of estimated pending compaction "
          "bytes %" PRIu64 " rate %" PRIu64,
          name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
          write_controller->delayed_write_rate());
    } else {
      assert(write_stall_condition == WriteStallCondition::kNormal);
      if (vstorage->l0_delay_trigger_count() >=
          GetL0ThresholdSpeedupCompaction(
              mutable_cf_options.level0_file_num_compaction_trigger,
              mutable_cf_options.level0_slowdown_writes_trigger)) {
        write_controller_token_ =
            write_controller->GetCompactionPressureToken();
        ROCKS_LOG_INFO(
            ioptions_.info_log,
            "[%s] Increasing compaction threads because we have %d level-0 "
            "files ",
            name_.c_str(), vstorage->l0_delay_trigger_count());
      } else if (vstorage->estimated_compaction_needed_bytes() >=
                 mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
        // Increase compaction threads if bytes needed for compaction exceeds
        // 1/4 of the threshold for slowing down.
        // If the soft pending compaction byte limit is not set, always speed
        // up compaction.
        write_controller_token_ =
            write_controller->GetCompactionPressureToken();
        if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
          ROCKS_LOG_INFO(
              ioptions_.info_log,
              "[%s] Increasing compaction threads because of estimated pending "
              "compaction "
              "bytes %" PRIu64,
              name_.c_str(), vstorage->estimated_compaction_needed_bytes());
        }
      } else {
        write_controller_token_.reset();
      }
      // If the DB recovers from delay conditions, we reward it with a write
      // rate increase of double the slowdown ratio. This is to balance the
      // long term slowdown increase signal.
      if (needed_delay) {
        uint64_t write_rate = write_controller->delayed_write_rate();
        write_controller->set_delayed_write_rate(static_cast<uint64_t>(
            static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
        // Set the low-pri limit to be 1/4 of the delayed write rate.
        // Note we don't reset this value even after the delay condition is
        // released. The low-pri rate will continue to apply if there is
        // compaction pressure.
        write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate /
                                                                    4);
      }
    }
    prev_compaction_needed_bytes_ = compaction_needed_bytes;
  }
  return write_stall_condition;
}

const FileOptions* ColumnFamilyData::soptions() const {
  return &(column_family_set_->file_options_);
}

void ColumnFamilyData::SetCurrent(Version* current_version) {
  current_ = current_version;
}

uint64_t ColumnFamilyData::GetNumLiveVersions() const {
  return VersionSet::GetNumLiveVersions(dummy_versions_);
}

uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
  return VersionSet::GetTotalSstFilesSize(dummy_versions_);
}

uint64_t ColumnFamilyData::GetLiveSstFilesSize() const {
  return current_->GetSstFilesSize();
}

MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
                      write_buffer_manager_, earliest_seq, id_);
}

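// Replaces the active memtable with a freshly constructed one. The previous
// memtable, if any, is deleted here only when nothing else still references
// it (mem_->Unref() returns nullptr otherwise, and deleting nullptr is a
// no-op).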
void ColumnFamilyData::CreateNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
  mem_->Ref();
}

bool ColumnFamilyData::NeedsCompaction() const {
  return compaction_picker_->NeedsCompaction(current_->storage_info());
}

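// Asks the compaction picker for the next automatic compaction for this
// column family. The earliest sequence number still present in the mutable
// and immutable memtables is forwarded to the picker.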
Compaction* ColumnFamilyData::PickCompaction(
    const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
  SequenceNumber earliest_mem_seqno =
      std::min(mem_->GetEarliestSequenceNumber(),
               imm_.current()->GetEarliestSequenceNumber(false));
  auto* result = compaction_picker_->PickCompaction(
      GetName(), mutable_options, current_->storage_info(), log_buffer,
      earliest_mem_seqno);
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
}

bool ColumnFamilyData::RangeOverlapWithCompaction(
    const Slice& smallest_user_key, const Slice& largest_user_key,
    int level) const {
  return compaction_picker_->RangeOverlapWithCompaction(
      smallest_user_key, largest_user_key, level);
}

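// Checks whether any of the given user-key ranges overlaps data in the
// mutable or immutable memtables, including range tombstones. Sets *overlap
// accordingly; returns a non-OK status only if the memtables could not be
// read.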
Status ColumnFamilyData::RangesOverlapWithMemtables(
    const autovector<Range>& ranges, SuperVersion* super_version,
    bool* overlap) {
  assert(overlap != nullptr);
  *overlap = false;
  // Create an InternalIterator over all unflushed memtables
  Arena arena;
  ReadOptions read_opts;
  read_opts.total_order_seek = true;
  MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
  merge_iter_builder.AddIterator(
      super_version->mem->NewIterator(read_opts, &arena));
  super_version->imm->AddIterators(read_opts, &merge_iter_builder);
  ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());

  auto read_seq = super_version->current->version_set()->LastSequence();
  ReadRangeDelAggregator range_del_agg(&internal_comparator_, read_seq);
  auto* active_range_del_iter =
      super_version->mem->NewRangeTombstoneIterator(read_opts, read_seq);
  range_del_agg.AddTombstones(
      std::unique_ptr<FragmentedRangeTombstoneIterator>(active_range_del_iter));
  super_version->imm->AddRangeTombstoneIterators(read_opts, nullptr /* arena */,
                                                 &range_del_agg);

  Status status;
  for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {
    auto* vstorage = super_version->current->storage_info();
    auto* ucmp = vstorage->InternalComparator()->user_comparator();
    InternalKey range_start(ranges[i].start, kMaxSequenceNumber,
                            kValueTypeForSeek);
    memtable_iter->Seek(range_start.Encode());
    status = memtable_iter->status();
    ParsedInternalKey seek_result;
    if (status.ok()) {
      if (memtable_iter->Valid() &&
          !ParseInternalKey(memtable_iter->key(), &seek_result)) {
        status = Status::Corruption("DB has corrupted keys");
      }
    }
    if (status.ok()) {
      if (memtable_iter->Valid() &&
          ucmp->Compare(seek_result.user_key, ranges[i].limit) <= 0) {
        *overlap = true;
      } else if (range_del_agg.IsRangeOverlapped(ranges[i].start,
                                                 ranges[i].limit)) {
        *overlap = true;
      }
    }
  }
  return status;
}

const int ColumnFamilyData::kCompactAllLevels = -1;
const int ColumnFamilyData::kCompactToBaseLevel = -2;

Compaction* ColumnFamilyData::CompactRange(
    const MutableCFOptions& mutable_cf_options, int input_level,
    int output_level, const CompactRangeOptions& compact_range_options,
    const InternalKey* begin, const InternalKey* end,
    InternalKey** compaction_end, bool* conflict,
    uint64_t max_file_num_to_ignore) {
  auto* result = compaction_picker_->CompactRange(
      GetName(), mutable_cf_options, current_->storage_info(), input_level,
      output_level, compact_range_options, begin, end, compaction_end, conflict,
      max_file_num_to_ignore);
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
}

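// Returns a SuperVersion with an extra reference held on behalf of the
// caller. The thread-locally cached SuperVersion is used when it is still
// current; otherwise the latest one is acquired under the DB mutex.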
GetReferencedSuperVersion(DBImpl * db)1101 SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(DBImpl* db) {
1102   SuperVersion* sv = GetThreadLocalSuperVersion(db);
1103   sv->Ref();
1104   if (!ReturnThreadLocalSuperVersion(sv)) {
1105     // This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
1106     // when the thread-local pointer was populated. So, the Ref() earlier in
1107     // this function still prevents the returned SuperVersion* from being
1108     // deleted out from under the caller.
1109     sv->Unref();
1110   }
1111   return sv;
1112 }
1113 
GetThreadLocalSuperVersion(DBImpl * db)1114 SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
1115   // The SuperVersion is cached in thread local storage to avoid acquiring
1116   // mutex when SuperVersion does not change since the last use. When a new
1117   // SuperVersion is installed, the compaction or flush thread cleans up
1118   // cached SuperVersion in all existing thread local storage. To avoid
1119   // acquiring mutex for this operation, we use atomic Swap() on the thread
1120   // local pointer to guarantee exclusive access. If the thread local pointer
1121   // is being used while a new SuperVersion is installed, the cached
1122   // SuperVersion can become stale. In that case, the background thread would
1123   // have swapped in kSVObsolete. We re-check the value at when returning
1124   // SuperVersion back to thread local, with an atomic compare and swap.
1125   // The superversion will need to be released if detected to be stale.
1126   void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
1127   // Invariant:
1128   // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
1129   // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
1130   // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
1131   // (if no Scrape happens).
1132   assert(ptr != SuperVersion::kSVInUse);
1133   SuperVersion* sv = static_cast<SuperVersion*>(ptr);
1134   if (sv == SuperVersion::kSVObsolete ||
1135       sv->version_number != super_version_number_.load()) {
1136     RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
1137     SuperVersion* sv_to_delete = nullptr;
1138 
1139     if (sv && sv->Unref()) {
1140       RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
1141       db->mutex()->Lock();
1142       // NOTE: underlying resources held by superversion (sst files) might
1143       // not be released until the next background job.
1144       sv->Cleanup();
1145       if (db->immutable_db_options().avoid_unnecessary_blocking_io) {
1146         db->AddSuperVersionsToFreeQueue(sv);
1147         db->SchedulePurge();
1148       } else {
1149         sv_to_delete = sv;
1150       }
1151     } else {
1152       db->mutex()->Lock();
1153     }
1154     sv = super_version_->Ref();
1155     db->mutex()->Unlock();
1156 
1157     delete sv_to_delete;
1158   }
1159   assert(sv != nullptr);
1160   return sv;
1161 }
1162 
ReturnThreadLocalSuperVersion(SuperVersion * sv)1163 bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
1164   assert(sv != nullptr);
1165   // Put the SuperVersion back
1166   void* expected = SuperVersion::kSVInUse;
1167   if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
1168     // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
1169     // storage has not been altered and no Scrape has happened. The
1170     // SuperVersion is still current.
1171     return true;
1172   } else {
1173     // ThreadLocal scrape happened in the process of this GetImpl call (after
1174     // thread local Swap() at the beginning and before CompareAndSwap()).
1175     // This means the SuperVersion it holds is obsolete.
1176     assert(expected == SuperVersion::kSVObsolete);
1177   }
1178   return false;
1179 }
1180 
InstallSuperVersion(SuperVersionContext * sv_context,InstrumentedMutex * db_mutex)1181 void ColumnFamilyData::InstallSuperVersion(
1182     SuperVersionContext* sv_context, InstrumentedMutex* db_mutex) {
1183   db_mutex->AssertHeld();
1184   return InstallSuperVersion(sv_context, db_mutex, mutable_cf_options_);
1185 }
1186 
void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex,
    const MutableCFOptions& mutable_cf_options) {
  SuperVersion* new_superversion = sv_context->new_superversion.release();
  new_superversion->db_mutex = db_mutex;
  new_superversion->mutable_cf_options = mutable_cf_options;
  new_superversion->Init(this, mem_, imm_.current(), current_);
  SuperVersion* old_superversion = super_version_;
  super_version_ = new_superversion;
  ++super_version_number_;
  super_version_->version_number = super_version_number_;
  super_version_->write_stall_condition =
      RecalculateWriteStallConditions(mutable_cf_options);

  if (old_superversion != nullptr) {
    // Reset SuperVersions cached in thread local storage.
    // This should be done before old_superversion->Unref(). That's to ensure
    // that local_sv_ never holds the last reference to SuperVersion, since
    // it has no means to safely do SuperVersion cleanup.
    ResetThreadLocalSuperVersions();

    if (old_superversion->mutable_cf_options.write_buffer_size !=
        mutable_cf_options.write_buffer_size) {
      mem_->UpdateWriteBufferSize(mutable_cf_options.write_buffer_size);
    }
    if (old_superversion->write_stall_condition !=
        new_superversion->write_stall_condition) {
      sv_context->PushWriteStallNotification(
          old_superversion->write_stall_condition,
          new_superversion->write_stall_condition, GetName(), ioptions());
    }
    if (old_superversion->Unref()) {
      old_superversion->Cleanup();
      sv_context->superversions_to_free.push_back(old_superversion);
    }
  }
}

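// Scrape() swaps every thread's cached slot to kSVObsolete and returns the
// previous contents; each cached reference is dropped here, except for slots
// that read kSVInUse, whose owning thread is still using its SuperVersion and
// will notice the obsolescence when it tries to return it.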
void ColumnFamilyData::ResetThreadLocalSuperVersions() {
  autovector<void*> sv_ptrs;
  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
  for (auto ptr : sv_ptrs) {
    assert(ptr);
    if (ptr == SuperVersion::kSVInUse) {
      continue;
    }
    auto sv = static_cast<SuperVersion*>(ptr);
    bool was_last_ref __attribute__((__unused__));
    was_last_ref = sv->Unref();
    // sv couldn't have been the last reference because
    // ResetThreadLocalSuperVersions() is called before
    // unref'ing super_version_.
    assert(!was_last_ref);
  }
}

Status ColumnFamilyData::ValidateOptions(
    const DBOptions& db_options, const ColumnFamilyOptions& cf_options) {
  Status s;
  s = CheckCompressionSupported(cf_options);
  if (s.ok() && db_options.allow_concurrent_memtable_write) {
    s = CheckConcurrentWritesSupported(cf_options);
  }
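  // max_successive_merges makes the write path read the memtable to fold
  // pending merge operands, which presumably relies on the ordering guarantees
  // that unordered_write gives up; the combination is therefore rejected.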
  if (s.ok() && db_options.unordered_write &&
      cf_options.max_successive_merges != 0) {
    s = Status::InvalidArgument(
        "max_successive_merges > 0 is incompatible with unordered_write");
  }
  if (s.ok()) {
    s = CheckCFPathsSupported(db_options, cf_options);
  }
  if (!s.ok()) {
    return s;
  }

  if (cf_options.ttl > 0 && cf_options.ttl != kDefaultTtl) {
    if (cf_options.table_factory->Name() != BlockBasedTableFactory().Name()) {
      return Status::NotSupported(
          "TTL is only supported in Block-Based Table format. ");
    }
  }

  if (cf_options.periodic_compaction_seconds > 0 &&
      cf_options.periodic_compaction_seconds != kDefaultPeriodicCompSecs) {
    if (cf_options.table_factory->Name() != BlockBasedTableFactory().Name()) {
      return Status::NotSupported(
          "Periodic Compaction is only supported in "
          "Block-Based Table format. ");
    }
  }
  return s;
}

#ifndef ROCKSDB_LITE
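// Typical entry point is DB::SetOptions(); a sketch of a call that would end
// up here (the option names are examples of mutable CF options):
//   db->SetOptions(cf_handle, {{"write_buffer_size", "131072"},
//                              {"level0_file_num_compaction_trigger", "8"}});
// Options that are not mutable are expected to fail in
// GetMutableOptionsFromStrings() below.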
Status ColumnFamilyData::SetOptions(
    const DBOptions& db_options,
    const std::unordered_map<std::string, std::string>& options_map) {
  MutableCFOptions new_mutable_cf_options;
  Status s =
      GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
                                   ioptions_.info_log, &new_mutable_cf_options);
  if (s.ok()) {
    ColumnFamilyOptions cf_options =
        BuildColumnFamilyOptions(initial_cf_options_, new_mutable_cf_options);
    s = ValidateOptions(db_options, cf_options);
  }
  if (s.ok()) {
    mutable_cf_options_ = new_mutable_cf_options;
    mutable_cf_options_.RefreshDerivedOptions(ioptions_);
  }
  return s;
}
#endif  // ROCKSDB_LITE

// REQUIRES: DB mutex held
Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) {
  if (initial_cf_options_.compaction_style != kCompactionStyleLevel) {
    return Env::WLTH_NOT_SET;
  }
  if (level == 0) {
    return Env::WLTH_MEDIUM;
  }
  int base_level = current_->storage_info()->base_level();

  // L1: medium, L2: long, ...
  if (level - base_level >= 2) {
    return Env::WLTH_EXTREME;
  } else if (level < base_level) {
    // Nothing prevents the level passed in from being smaller than
    // base_level.
    return Env::WLTH_MEDIUM;
  }
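  // Remaining cases: level == base_level maps to WLTH_MEDIUM and
  // level == base_level + 1 maps to WLTH_LONG, since the hint enum values
  // increase from WLTH_MEDIUM to WLTH_LONG.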
  return static_cast<Env::WriteLifeTimeHint>(
      level - base_level + static_cast<int>(Env::WLTH_MEDIUM));
}

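// Opens (or reuses) a Directory handle for every entry in cf_paths. The
// created_dirs map lets the caller share Directory objects across column
// families that point at the same path, so each path is opened only once.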
Status ColumnFamilyData::AddDirectories(
    std::map<std::string, std::shared_ptr<Directory>>* created_dirs) {
  Status s;
  assert(created_dirs != nullptr);
  assert(data_dirs_.empty());
  for (auto& p : ioptions_.cf_paths) {
    auto existing_dir = created_dirs->find(p.path);

    if (existing_dir == created_dirs->end()) {
      std::unique_ptr<Directory> path_directory;
      s = DBImpl::CreateAndNewDirectory(ioptions_.env, p.path, &path_directory);
      if (!s.ok()) {
        return s;
      }
      assert(path_directory != nullptr);
      data_dirs_.emplace_back(path_directory.release());
      (*created_dirs)[p.path] = data_dirs_.back();
    } else {
      data_dirs_.emplace_back(existing_dir->second);
    }
  }
  assert(data_dirs_.size() == ioptions_.cf_paths.size());
  return s;
}

Directory* ColumnFamilyData::GetDataDir(size_t path_id) const {
  if (data_dirs_.empty()) {
    return nullptr;
  }

  assert(path_id < data_dirs_.size());
  return data_dirs_[path_id].get();
}

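// dummy_cfd_ acts as the sentinel of a circular, doubly-linked list of all
// column families: iteration starts at dummy_cfd_->next_ and stops when the
// sentinel is reached again (see FreeDeadColumnFamilies()).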
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const ImmutableDBOptions* db_options,
                                 const FileOptions& file_options,
                                 Cache* table_cache,
                                 WriteBufferManager* write_buffer_manager,
                                 WriteController* write_controller,
                                 BlockCacheTracer* const block_cache_tracer)
    : max_column_family_(0),
      dummy_cfd_(new ColumnFamilyData(
          0, "", nullptr, nullptr, nullptr, ColumnFamilyOptions(), *db_options,
          file_options, nullptr, block_cache_tracer)),
      default_cfd_cache_(nullptr),
      db_name_(dbname),
      db_options_(db_options),
      file_options_(file_options),
      table_cache_(table_cache),
      write_buffer_manager_(write_buffer_manager),
      write_controller_(write_controller),
      block_cache_tracer_(block_cache_tracer) {
  // initialize linked list
  dummy_cfd_->prev_ = dummy_cfd_;
  dummy_cfd_->next_ = dummy_cfd_;
}

ColumnFamilySet::~ColumnFamilySet() {
  while (column_family_data_.size() > 0) {
    // cfd destructor will delete itself from column_family_data_
    auto cfd = column_family_data_.begin()->second;
    bool last_ref __attribute__((__unused__));
    last_ref = cfd->UnrefAndTryDelete();
    assert(last_ref);
  }
  bool dummy_last_ref __attribute__((__unused__));
  dummy_last_ref = dummy_cfd_->UnrefAndTryDelete();
  assert(dummy_last_ref);
}

ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  assert(default_cfd_cache_ != nullptr);
  return default_cfd_cache_;
}

ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  auto cfd_iter = column_family_data_.find(id);
  if (cfd_iter != column_family_data_.end()) {
    return cfd_iter->second;
  } else {
    return nullptr;
  }
}

ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  auto cfd_iter = column_families_.find(name);
  if (cfd_iter != column_families_.end()) {
    auto cfd = GetColumnFamily(cfd_iter->second);
    assert(cfd != nullptr);
    return cfd;
  } else {
    return nullptr;
  }
}

uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  return ++max_column_family_;
}

uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; }

void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
  max_column_family_ = std::max(new_max_column_family, max_column_family_);
}

size_t ColumnFamilySet::NumberOfColumnFamilies() const {
  return column_families_.size();
}

// under a DB mutex AND write thread
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
  ColumnFamilyData* new_cfd = new ColumnFamilyData(
      id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
      *db_options_, file_options_, this, block_cache_tracer_);
  column_families_.insert({name, id});
  column_family_data_.insert({id, new_cfd});
  max_column_family_ = std::max(max_column_family_, id);
  // add to linked list
  new_cfd->next_ = dummy_cfd_;
  auto prev = dummy_cfd_->prev_;
  new_cfd->prev_ = prev;
  prev->next_ = new_cfd;
  dummy_cfd_->prev_ = new_cfd;
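  // new_cfd now sits immediately before the sentinel, i.e. at the tail of the
  // circular list, so iteration order matches creation order.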
  if (id == 0) {
    default_cfd_cache_ = new_cfd;
  }
  return new_cfd;
}

// REQUIRES: DB mutex held
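// Sweeps the circular list for column families whose reference count has
// reached zero and deletes them; candidates are collected first so that
// deletion (which unlinks the cfd) does not invalidate the iteration.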
void ColumnFamilySet::FreeDeadColumnFamilies() {
  autovector<ColumnFamilyData*> to_delete;
  for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
    if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
      to_delete.push_back(cfd);
    }
  }
  for (auto cfd : to_delete) {
    // this is very rare, so it's not a problem that we do it under a mutex
    delete cfd;
  }
}

// under a DB mutex AND from a write thread
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
  auto cfd_iter = column_family_data_.find(cfd->GetID());
  assert(cfd_iter != column_family_data_.end());
  column_family_data_.erase(cfd_iter);
  column_families_.erase(cfd->GetName());
}

// under a DB mutex OR from a write thread
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
  if (column_family_id == 0) {
    // optimization for common case
    current_ = column_family_set_->GetDefault();
  } else {
    current_ = column_family_set_->GetColumnFamily(column_family_id);
  }
  handle_.SetCFD(current_);
  return current_ != nullptr;
}

uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_ != nullptr);
  return current_->GetLogNumber();
}

MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  assert(current_ != nullptr);
  return current_->mem();
}

ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  assert(current_ != nullptr);
  return &handle_;
}

uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
  uint32_t column_family_id = 0;
  if (column_family != nullptr) {
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
    column_family_id = cfh->GetID();
  }
  return column_family_id;
}

const Comparator* GetColumnFamilyUserComparator(
    ColumnFamilyHandle* column_family) {
  if (column_family != nullptr) {
    return column_family->GetComparator();
  }
  return nullptr;
}

}  // namespace ROCKSDB_NAMESPACE