//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/column_family.h"

#include <algorithm>
#include <cinttypes>
#include <limits>
#include <sstream>
#include <string>
#include <vector>

#include "db/blob/blob_file_cache.h"
#include "db/compaction/compaction_picker.h"
#include "db/compaction/compaction_picker_fifo.h"
#include "db/compaction/compaction_picker_level.h"
#include "db/compaction/compaction_picker_universal.h"
#include "db/db_impl/db_impl.h"
#include "db/internal_stats.h"
#include "db/job_context.h"
#include "db/range_del_aggregator.h"
#include "db/table_properties_collector.h"
#include "db/version_set.h"
#include "db/write_controller.h"
#include "file/sst_file_manager_impl.h"
#include "logging/logging.h"
#include "monitoring/thread_status_util.h"
#include "options/options_helper.h"
#include "port/port.h"
#include "rocksdb/convenience.h"
#include "rocksdb/table.h"
#include "table/merging_iterator.h"
#include "util/autovector.h"
#include "util/cast_util.h"
#include "util/compression.h"

namespace ROCKSDB_NAMESPACE {

ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
    ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
    : cfd_(column_family_data), db_(db), mutex_(mutex) {
  if (cfd_ != nullptr) {
    cfd_->Ref();
  }
}

ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
  if (cfd_ != nullptr) {
#ifndef ROCKSDB_LITE
    for (auto& listener : cfd_->ioptions()->listeners) {
      listener->OnColumnFamilyHandleDeletionStarted(this);
    }
#endif  // ROCKSDB_LITE
    // Job id == 0 means that this is not our background process, but rather
    // a user thread.
    // Need to hold some shared pointers owned by the initial_cf_options
    // before the final cleanup finishes.
    ColumnFamilyOptions initial_cf_options_copy = cfd_->initial_cf_options();
    JobContext job_context(0);
    mutex_->Lock();
    bool dropped = cfd_->IsDropped();
    if (cfd_->UnrefAndTryDelete()) {
      if (dropped) {
        db_->FindObsoleteFiles(&job_context, false, true);
      }
    }
    mutex_->Unlock();
    if (job_context.HaveSomethingToDelete()) {
      bool defer_purge =
          db_->immutable_db_options().avoid_unnecessary_blocking_io;
      db_->PurgeObsoleteFiles(job_context, defer_purge);
      if (defer_purge) {
        mutex_->Lock();
        db_->SchedulePurge();
        mutex_->Unlock();
      }
    }
    job_context.Clean();
  }
}

uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }

const std::string& ColumnFamilyHandleImpl::GetName() const {
  return cfd()->GetName();
}

Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
#ifndef ROCKSDB_LITE
  // accessing mutable cf-options requires db mutex.
  InstrumentedMutexLock l(mutex_);
  *desc = ColumnFamilyDescriptor(cfd()->GetName(), cfd()->GetLatestCFOptions());
  return Status::OK();
#else
  (void)desc;
  return Status::NotSupported();
#endif  // !ROCKSDB_LITE
}

const Comparator* ColumnFamilyHandleImpl::GetComparator() const {
  return cfd()->user_comparator();
}

void GetIntTblPropCollectorFactory(
    const ImmutableCFOptions& ioptions,
    IntTblPropCollectorFactories* int_tbl_prop_collector_factories) {
  assert(int_tbl_prop_collector_factories);

  auto& collector_factories = ioptions.table_properties_collector_factories;
  for (size_t i = 0; i < ioptions.table_properties_collector_factories.size();
       ++i) {
    assert(collector_factories[i]);
    int_tbl_prop_collector_factories->emplace_back(
        new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
  }
}

Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
  if (!cf_options.compression_per_level.empty()) {
    for (size_t level = 0; level < cf_options.compression_per_level.size();
         ++level) {
      if (!CompressionTypeSupported(cf_options.compression_per_level[level])) {
        return Status::InvalidArgument(
            "Compression type " +
            CompressionTypeToString(cf_options.compression_per_level[level]) +
            " is not linked with the binary.");
      }
    }
  } else {
    if (!CompressionTypeSupported(cf_options.compression)) {
      return Status::InvalidArgument(
          "Compression type " +
          CompressionTypeToString(cf_options.compression) +
          " is not linked with the binary.");
    }
  }
  if (cf_options.compression_opts.zstd_max_train_bytes > 0) {
    if (!ZSTD_TrainDictionarySupported()) {
      return Status::InvalidArgument(
          "zstd dictionary trainer cannot be used because ZSTD 1.1.3+ "
          "is not linked with the binary.");
    }
    if (cf_options.compression_opts.max_dict_bytes == 0) {
      return Status::InvalidArgument(
          "The dictionary size limit (`CompressionOptions::max_dict_bytes`) "
          "should be nonzero if we're using zstd's dictionary generator.");
    }
  }

  if (!CompressionTypeSupported(cf_options.blob_compression_type)) {
    std::ostringstream oss;
    oss << "The specified blob compression type "
        << CompressionTypeToString(cf_options.blob_compression_type)
        << " is not available.";

    return Status::InvalidArgument(oss.str());
  }

  return Status::OK();
}

Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
  if (cf_options.inplace_update_support) {
    return Status::InvalidArgument(
        "In-place memtable updates (inplace_update_support) is not compatible "
        "with concurrent writes (allow_concurrent_memtable_write)");
  }
  if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
    return Status::InvalidArgument(
        "Memtable doesn't support concurrent writes "
        "(allow_concurrent_memtable_write)");
  }
  return Status::OK();
}

Status CheckCFPathsSupported(const DBOptions& db_options,
                             const ColumnFamilyOptions& cf_options) {
  // Multiple cf_paths are supported only in the universal and level
  // compaction styles. This function also covers the case in which cf_paths
  // is not specified, in which case db_paths is used.
  if ((cf_options.compaction_style != kCompactionStyleUniversal) &&
      (cf_options.compaction_style != kCompactionStyleLevel)) {
    if (cf_options.cf_paths.size() > 1) {
      return Status::NotSupported(
          "More than one CF path is only supported in "
          "universal and level compaction styles. ");
    } else if (cf_options.cf_paths.empty() &&
               db_options.db_paths.size() > 1) {
      return Status::NotSupported(
          "More than one DB path is only supported in "
          "universal and level compaction styles. ");
    }
  }
  return Status::OK();
}

namespace {
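// Sentinel defaults: if `ttl` or `periodic_compaction_seconds` still holds
// one of these values, the user did not set the option explicitly, and
// SanitizeOptions() below substitutes a value appropriate for the compaction
// style and table factory.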
const uint64_t kDefaultTtl = 0xfffffffffffffffe;
const uint64_t kDefaultPeriodicCompSecs = 0xfffffffffffffffe;
}  // namespace

ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
                                    const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;
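  // Clamp write_buffer_size to [64 KB, 0xffffffff] on 32-bit builds and to
  // [64 KB, 64 GB] on 64-bit builds; the upper bound is selected at compile
  // time via std::conditional on sizeof(size_t).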
  size_t clamp_max = std::conditional<
      sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
      std::integral_constant<uint64_t, 64ull << 30>>::type::value;
  ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, clamp_max);
  // If the user sets arena_block_size, we trust that value. Otherwise,
  // calculate a proper value from write_buffer_size.
  if (result.arena_block_size <= 0) {
    result.arena_block_size =
        std::min(size_t{1024 * 1024}, result.write_buffer_size / 8);

    // Align up to 4k
    const size_t align = 4 * 1024;
    result.arena_block_size =
        ((result.arena_block_size + align - 1) / align) * align;
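    // Illustrative example: with write_buffer_size = 4 MB this yields
    // min(1 MB, 512 KB) = 512 KB, which is already a multiple of 4 KB, so
    // the round-up leaves it unchanged.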
  }
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
  if (result.min_write_buffer_number_to_merge < 1) {
    result.min_write_buffer_number_to_merge = 1;
  }

  if (result.num_levels < 1) {
    result.num_levels = 1;
  }
  if (result.compaction_style == kCompactionStyleLevel &&
      result.num_levels < 2) {
    result.num_levels = 2;
  }

  if (result.compaction_style == kCompactionStyleUniversal &&
      db_options.allow_ingest_behind && result.num_levels < 3) {
    result.num_levels = 3;
  }

  if (result.max_write_buffer_number < 2) {
    result.max_write_buffer_number = 2;
  }
  // fall back to max_write_buffer_number_to_maintain if
  // max_write_buffer_size_to_maintain is not set
  if (result.max_write_buffer_size_to_maintain < 0) {
    result.max_write_buffer_size_to_maintain =
        result.max_write_buffer_number *
        static_cast<int64_t>(result.write_buffer_size);
  } else if (result.max_write_buffer_size_to_maintain == 0 &&
             result.max_write_buffer_number_to_maintain < 0) {
    result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
  }
  // bloom filter size shouldn't exceed 1/4 of memtable size.
  if (result.memtable_prefix_bloom_size_ratio > 0.25) {
    result.memtable_prefix_bloom_size_ratio = 0.25;
  } else if (result.memtable_prefix_bloom_size_ratio < 0) {
    result.memtable_prefix_bloom_size_ratio = 0;
  }

  if (!result.prefix_extractor) {
    assert(result.memtable_factory);
    Slice name = result.memtable_factory->Name();
    if (name.compare("HashSkipListRepFactory") == 0 ||
        name.compare("HashLinkListRepFactory") == 0) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }

  if (result.compaction_style == kCompactionStyleFIFO) {
    result.num_levels = 1;
    // since we delete level0 files in FIFO compaction when there are too many
    // of them, these options don't really mean anything
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
  }

  if (result.max_bytes_for_level_multiplier <= 0) {
    result.max_bytes_for_level_multiplier = 1;
  }

  if (result.level0_file_num_compaction_trigger == 0) {
    ROCKS_LOG_WARN(db_options.logger,
                   "level0_file_num_compaction_trigger cannot be 0");
    result.level0_file_num_compaction_trigger = 1;
  }

  if (result.level0_stop_writes_trigger <
          result.level0_slowdown_writes_trigger ||
      result.level0_slowdown_writes_trigger <
          result.level0_file_num_compaction_trigger) {
    ROCKS_LOG_WARN(db_options.logger,
                   "This condition must be satisfied: "
                   "level0_stop_writes_trigger(%d) >= "
                   "level0_slowdown_writes_trigger(%d) >= "
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
    if (result.level0_slowdown_writes_trigger <
        result.level0_file_num_compaction_trigger) {
      result.level0_slowdown_writes_trigger =
          result.level0_file_num_compaction_trigger;
    }
    if (result.level0_stop_writes_trigger <
        result.level0_slowdown_writes_trigger) {
      result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
    }
    ROCKS_LOG_WARN(db_options.logger,
                   "Adjusting values to: "
                   "level0_stop_writes_trigger(%d), "
                   "level0_slowdown_writes_trigger(%d), "
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
  }

  if (result.soft_pending_compaction_bytes_limit == 0) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  } else if (result.hard_pending_compaction_bytes_limit > 0 &&
             result.soft_pending_compaction_bytes_limit >
                 result.hard_pending_compaction_bytes_limit) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  }

#ifndef ROCKSDB_LITE
  // When the DB is stopped, it's possible that there are some .trash files
  // that were not deleted yet. When we open the DB we will find these .trash
  // files and schedule them to be deleted (or delete them immediately if an
  // SstFileManager was not used).
  auto sfm =
      static_cast<SstFileManagerImpl*>(db_options.sst_file_manager.get());
  for (size_t i = 0; i < result.cf_paths.size(); i++) {
    DeleteScheduler::CleanupDirectory(db_options.env, sfm,
                                      result.cf_paths[i].path)
        .PermitUncheckedError();
  }
#endif

  if (result.cf_paths.empty()) {
    result.cf_paths = db_options.db_paths;
  }

  if (result.level_compaction_dynamic_level_bytes) {
    if (result.compaction_style != kCompactionStyleLevel) {
      ROCKS_LOG_WARN(db_options.info_log.get(),
                     "level_compaction_dynamic_level_bytes only makes sense "
                     "for level-based compaction");
      result.level_compaction_dynamic_level_bytes = false;
    } else if (result.cf_paths.size() > 1U) {
      // we don't yet know how to make this feature and multiple DB paths
      // work together.
      ROCKS_LOG_WARN(db_options.info_log.get(),
                     "multiple cf_paths/db_paths and "
                     "level_compaction_dynamic_level_bytes "
                     "can't be used together");
      result.level_compaction_dynamic_level_bytes = false;
    }
  }

  if (result.max_compaction_bytes == 0) {
    result.max_compaction_bytes = result.target_file_size_base * 25;
  }

  bool is_block_based_table = (result.table_factory->IsInstanceOf(
      TableFactory::kBlockBasedTableName()));

  const uint64_t kAdjustedTtl = 30 * 24 * 60 * 60;
  if (result.ttl == kDefaultTtl) {
    if (is_block_based_table &&
        result.compaction_style != kCompactionStyleFIFO) {
      result.ttl = kAdjustedTtl;
    } else {
      result.ttl = 0;
    }
  }

  const uint64_t kAdjustedPeriodicCompSecs = 30 * 24 * 60 * 60;

  // Turn on periodic compactions and set them to occur once every 30 days if
  // compaction filters are used and periodic_compaction_seconds is set to the
  // default value.
  if (result.compaction_style != kCompactionStyleFIFO) {
    if ((result.compaction_filter != nullptr ||
         result.compaction_filter_factory != nullptr) &&
        result.periodic_compaction_seconds == kDefaultPeriodicCompSecs &&
        is_block_based_table) {
      result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
    }
  } else {
    // result.compaction_style == kCompactionStyleFIFO
    if (result.ttl == 0) {
      if (is_block_based_table) {
        if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
          result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
        }
        result.ttl = result.periodic_compaction_seconds;
      }
    } else if (result.periodic_compaction_seconds != 0) {
      result.ttl = std::min(result.ttl, result.periodic_compaction_seconds);
    }
  }

  // In universal compaction, TTL compactions work similarly to periodic
  // compactions in most cases. So, if ttl is set, execute the periodic
  // compaction codepath.
  if (result.compaction_style == kCompactionStyleUniversal && result.ttl != 0) {
    if (result.periodic_compaction_seconds != 0) {
      result.periodic_compaction_seconds =
          std::min(result.ttl, result.periodic_compaction_seconds);
    } else {
      result.periodic_compaction_seconds = result.ttl;
    }
  }

  if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
    result.periodic_compaction_seconds = 0;
  }

  return result;
}

int SuperVersion::dummy = 0;
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
void* const SuperVersion::kSVObsolete = nullptr;

SuperVersion::~SuperVersion() {
  for (auto td : to_delete) {
    delete td;
  }
}

SuperVersion* SuperVersion::Ref() {
  refs.fetch_add(1, std::memory_order_relaxed);
  return this;
}

bool SuperVersion::Unref() {
  // fetch_sub returns the previous value of refs
  uint32_t previous_refs = refs.fetch_sub(1);
  assert(previous_refs > 0);
  return previous_refs == 1;
}

void SuperVersion::Cleanup() {
  assert(refs.load(std::memory_order_relaxed) == 0);
  // Since this SuperVersion object is being deleted,
  // decrement reference to the immutable MemtableList
  // this SV object was pointing to.
  imm->Unref(&to_delete);
  MemTable* m = mem->Unref();
  if (m != nullptr) {
    auto* memory_usage = current->cfd()->imm()->current_memory_usage();
    assert(*memory_usage >= m->ApproximateMemoryUsage());
    *memory_usage -= m->ApproximateMemoryUsage();
    to_delete.push_back(m);
  }
  current->Unref();
  cfd->UnrefAndTryDelete();
}

void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
                        MemTableListVersion* new_imm, Version* new_current) {
  cfd = new_cfd;
  mem = new_mem;
  imm = new_imm;
  current = new_current;
  cfd->Ref();
  mem->Ref();
  imm->Ref();
  current->Ref();
  refs.store(1, std::memory_order_relaxed);
}

namespace {
void SuperVersionUnrefHandle(void* ptr) {
  // UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
  // destroyed. When the former happens, the thread shouldn't see kSVInUse.
  // When the latter happens, only super_version_ holds a reference
  // to ColumnFamilyData, so no further queries are possible.
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  bool was_last_ref __attribute__((__unused__));
  was_last_ref = sv->Unref();
  // Thread-local SuperVersions can't outlive ColumnFamilyData::super_version_.
  // This is important because we can't do SuperVersion cleanup here.
  // That would require locking DB mutex, which would deadlock because
  // SuperVersionUnrefHandle is called with locked ThreadLocalPtr mutex.
  assert(!was_last_ref);
}
}  // anonymous namespace

std::vector<std::string> ColumnFamilyData::GetDbPaths() const {
  std::vector<std::string> paths;
  paths.reserve(ioptions_.cf_paths.size());
  for (const DbPath& db_path : ioptions_.cf_paths) {
    paths.emplace_back(db_path.path);
  }
  return paths;
}

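// The dummy column family data (used as the head of ColumnFamilySet's
// circular linked list of CFDs) gets the maximum uint32 value as its id so
// it can never collide with the id of a real column family.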
const uint32_t ColumnFamilyData::kDummyColumnFamilyDataId = port::kMaxUint32;

ColumnFamilyData::ColumnFamilyData(
    uint32_t id, const std::string& name, Version* _dummy_versions,
    Cache* _table_cache, WriteBufferManager* write_buffer_manager,
    const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
    const FileOptions* file_options, ColumnFamilySet* column_family_set,
    BlockCacheTracer* const block_cache_tracer,
    const std::shared_ptr<IOTracer>& io_tracer,
    const std::string& db_session_id)
    : id_(id),
      name_(name),
      dummy_versions_(_dummy_versions),
      current_(nullptr),
      refs_(0),
      initialized_(false),
      dropped_(false),
      internal_comparator_(cf_options.comparator),
      initial_cf_options_(SanitizeOptions(db_options, cf_options)),
      ioptions_(db_options, initial_cf_options_),
      mutable_cf_options_(initial_cf_options_),
      is_delete_range_supported_(
          cf_options.table_factory->IsDeleteRangeSupported()),
      write_buffer_manager_(write_buffer_manager),
      mem_(nullptr),
      imm_(ioptions_.min_write_buffer_number_to_merge,
           ioptions_.max_write_buffer_number_to_maintain,
           ioptions_.max_write_buffer_size_to_maintain),
      super_version_(nullptr),
      super_version_number_(0),
      local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
      next_(nullptr),
      prev_(nullptr),
      log_number_(0),
      flush_reason_(FlushReason::kOthers),
      column_family_set_(column_family_set),
      queued_for_flush_(false),
      queued_for_compaction_(false),
      prev_compaction_needed_bytes_(0),
      allow_2pc_(db_options.allow_2pc),
      last_memtable_id_(0),
      db_paths_registered_(false) {
  if (id_ != kDummyColumnFamilyDataId) {
    // TODO(cc): RegisterDbPaths can be expensive, consider moving it
    // outside of this constructor, which might be called with the db mutex
    // held.
    // TODO(cc): consider using ioptions_.fs; currently some tests rely on
    // EnvWrapper, which is the main reason why we use env here.
    Status s = ioptions_.env->RegisterDbPaths(GetDbPaths());
    if (s.ok()) {
      db_paths_registered_ = true;
    } else {
      ROCKS_LOG_ERROR(
          ioptions_.logger,
          "Failed to register data paths of column family (id: %d, name: %s)",
          id_, name_.c_str());
    }
  }
  Ref();

  // Convert user defined table properties collector factories to internal ones.
  GetIntTblPropCollectorFactory(ioptions_, &int_tbl_prop_collector_factories_);

  // if _dummy_versions is nullptr, then this is a dummy column family.
  if (_dummy_versions != nullptr) {
    internal_stats_.reset(
        new InternalStats(ioptions_.num_levels, ioptions_.clock, this));
    table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache,
                                      block_cache_tracer, io_tracer,
                                      db_session_id));
    blob_file_cache_.reset(
        new BlobFileCache(_table_cache, ioptions(), soptions(), id_,
                          internal_stats_->GetBlobFileReadHist(), io_tracer));

    if (ioptions_.compaction_style == kCompactionStyleLevel) {
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
#ifndef ROCKSDB_LITE
    } else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
      compaction_picker_.reset(
          new UniversalCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
      compaction_picker_.reset(
          new FIFOCompactionPicker(ioptions_, &internal_comparator_));
    } else if (ioptions_.compaction_style == kCompactionStyleNone) {
      compaction_picker_.reset(new NullCompactionPicker(
          ioptions_, &internal_comparator_));
      ROCKS_LOG_WARN(ioptions_.logger,
                     "Column family %s does not use any background compaction. "
                     "Compactions can only be done via CompactFiles\n",
                     GetName().c_str());
#endif  // !ROCKSDB_LITE
    } else {
      ROCKS_LOG_ERROR(ioptions_.logger,
                      "Unable to recognize the specified compaction style %d. "
                      "Column family %s will use kCompactionStyleLevel.\n",
                      ioptions_.compaction_style, GetName().c_str());
      compaction_picker_.reset(
          new LevelCompactionPicker(ioptions_, &internal_comparator_));
    }

    if (column_family_set_->NumberOfColumnFamilies() < 10) {
      ROCKS_LOG_INFO(ioptions_.logger,
                     "--------------- Options for column family [%s]:\n",
                     name.c_str());
      initial_cf_options_.Dump(ioptions_.logger);
    } else {
      ROCKS_LOG_INFO(ioptions_.logger, "\t(skipping printing options)\n");
    }
  }

  RecalculateWriteStallConditions(mutable_cf_options_);
}

// DB mutex held
ColumnFamilyData::~ColumnFamilyData() {
  assert(refs_.load(std::memory_order_relaxed) == 0);
  // remove from linked list
  auto prev = prev_;
  auto next = next_;
  prev->next_ = next;
  next->prev_ = prev;

  if (!dropped_ && column_family_set_ != nullptr) {
    // If it's dropped, it's already removed from column family set
    // If column_family_set_ == nullptr, this is dummy CFD and not in
    // ColumnFamilySet
    column_family_set_->RemoveColumnFamily(this);
  }

  if (current_ != nullptr) {
    current_->Unref();
  }

  // It would be wrong if this ColumnFamilyData were destroyed while still in
  // flush_queue_ or compaction_queue_
  assert(!queued_for_flush_);
  assert(!queued_for_compaction_);
  assert(super_version_ == nullptr);

  if (dummy_versions_ != nullptr) {
    // List must be empty
    assert(dummy_versions_->Next() == dummy_versions_);
    bool deleted __attribute__((__unused__));
    deleted = dummy_versions_->Unref();
    assert(deleted);
  }

  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  autovector<MemTable*> to_delete;
  imm_.current()->Unref(&to_delete);
  for (MemTable* m : to_delete) {
    delete m;
  }

  if (db_paths_registered_) {
    // TODO(cc): consider using ioptions_.fs; currently some tests rely on
    // EnvWrapper, which is the main reason why we use env here.
    Status s = ioptions_.env->UnregisterDbPaths(GetDbPaths());
    if (!s.ok()) {
      ROCKS_LOG_ERROR(
          ioptions_.logger,
          "Failed to unregister data paths of column family (id: %d, name: %s)",
          id_, name_.c_str());
    }
  }
}

bool ColumnFamilyData::UnrefAndTryDelete() {
  int old_refs = refs_.fetch_sub(1);
  assert(old_refs > 0);

  if (old_refs == 1) {
    assert(super_version_ == nullptr);
    delete this;
    return true;
  }

  if (old_refs == 2 && super_version_ != nullptr) {
    // Only the super_version_ holds me
    SuperVersion* sv = super_version_;
    super_version_ = nullptr;

    // Release SuperVersion references kept in ThreadLocalPtr.
    local_sv_.reset();

    if (sv->Unref()) {
      // Note: sv will delete this ColumnFamilyData during Cleanup()
      assert(sv->cfd == this);
      sv->Cleanup();
      delete sv;
      return true;
    }
  }
  return false;
}

void ColumnFamilyData::SetDropped() {
  // can't drop default CF
  assert(id_ != 0);
  dropped_ = true;
  write_controller_token_.reset();

  // remove from column_family_set
  column_family_set_->RemoveColumnFamily(this);
}

ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
  return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
}

uint64_t ColumnFamilyData::OldestLogToKeep() {
  auto current_log = GetLogNumber();

  if (allow_2pc_) {
    auto imm_prep_log = imm()->PrecomputeMinLogContainingPrepSection();
    auto mem_prep_log = mem()->GetMinLogContainingPrepSection();

    if (imm_prep_log > 0 && imm_prep_log < current_log) {
      current_log = imm_prep_log;
    }

    if (mem_prep_log > 0 && mem_prep_log < current_log) {
      current_log = mem_prep_log;
    }
  }

  return current_log;
}

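// Multipliers applied to the delayed write rate: values below 1 slow writes
// down further, values above 1 speed them back up once the delay or stop
// condition eases.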
const double kIncSlowdownRatio = 0.8;
const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
const double kNearStopSlowdownRatio = 0.6;
const double kDelayRecoverSlowdownRatio = 1.4;

namespace {
// If penalize_stop is true, we further reduce slowdown rate.
std::unique_ptr<WriteControllerToken> SetupDelay(
    WriteController* write_controller, uint64_t compaction_needed_bytes,
    uint64_t prev_compaction_need_bytes, bool penalize_stop,
    bool auto_compactions_disabled) {
  const uint64_t kMinWriteRate = 16 * 1024u;  // Minimum write rate 16KB/s.

  uint64_t max_write_rate = write_controller->max_delayed_write_rate();
  uint64_t write_rate = write_controller->delayed_write_rate();

  if (auto_compactions_disabled) {
    // When auto compaction is disabled, always use the value the user gave.
    write_rate = max_write_rate;
  } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
    // If the user gives a rate less than kMinWriteRate, don't adjust it.
    //
    // If already delayed, need to adjust based on previous compaction debt.
    // When two or more column families require delay, we always increase or
    // reduce the write rate based on information for one single column
    // family. It is likely to be OK but we can improve if there is a
    // problem.
    // Ignore the compaction_needed_bytes = 0 case because
    // compaction_needed_bytes is only available in level-based compaction.
    //
    // If the compaction debt stays the same as previously, we also further
    // slow down. It usually means a memtable is full. It's mainly for the
    // case where both flush and compaction are much slower than the rate at
    // which we insert into memtables, so we need to actively slow down
    // before we get a feedback signal from compactions and flushes, to avoid
    // a full stop caused by hitting the max write buffer number.
    //
    // If the DB just fell into the stop condition, we need to further reduce
    // the write rate to avoid the stop condition.
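    //
    // Illustrative example: with delayed_write_rate = 16 MB/s, a growing
    // compaction debt multiplies the rate by kIncSlowdownRatio (0.8) down to
    // 12.8 MB/s, while a (near-)stop condition multiplies it by
    // kNearStopSlowdownRatio (0.6) down to 9.6 MB/s; either result is
    // floored at kMinWriteRate.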
    if (penalize_stop) {
      // Penalize the near stop or stop condition by more aggressive slowdown.
      // This is to provide the long term slowdown increase signal.
      // The penalty is more than the reward of recovering to the normal
      // condition.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kNearStopSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_need_bytes > 0 &&
               prev_compaction_need_bytes <= compaction_needed_bytes) {
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kIncSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_need_bytes > compaction_needed_bytes) {
      // We speed up by a ratio of kDecSlowdownRatio when we have paid off
      // compaction debt. But we'll never speed up to faster than the write
      // rate given by users.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kDecSlowdownRatio);
      if (write_rate > max_write_rate) {
        write_rate = max_write_rate;
      }
    }
  }
  return write_controller->GetDelayToken(write_rate);
}

int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
                                    int level0_slowdown_writes_trigger) {
  // SanitizeOptions() ensures it.
  assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);

  if (level0_file_num_compaction_trigger < 0) {
    return std::numeric_limits<int>::max();
  }

  const int64_t twice_level0_trigger =
      static_cast<int64_t>(level0_file_num_compaction_trigger) * 2;

  const int64_t one_fourth_trigger_slowdown =
      static_cast<int64_t>(level0_file_num_compaction_trigger) +
      ((level0_slowdown_writes_trigger - level0_file_num_compaction_trigger) /
       4);

  assert(twice_level0_trigger >= 0);
  assert(one_fourth_trigger_slowdown >= 0);

  // Use the point 1/4 of the way between the L0 compaction trigger threshold
  // and the slowdown condition, or twice the compaction trigger, whichever
  // is smaller.
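  // Illustrative example: with trigger = 4 and slowdown = 20 this is
  // min(2 * 4, 4 + (20 - 4) / 4) = min(8, 8) = 8 files.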
  int64_t res = std::min(twice_level0_trigger, one_fourth_trigger_slowdown);
  if (res >= port::kMaxInt32) {
    return port::kMaxInt32;
  } else {
    // res fits in int
    return static_cast<int>(res);
  }
}
}  // namespace

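// Maps the current LSM shape (unflushed memtables, L0 file count, estimated
// compaction debt) to a write stall condition and its cause. The checks are
// ordered so that stop conditions take precedence over delay conditions.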
std::pair<WriteStallCondition, ColumnFamilyData::WriteStallCause>
ColumnFamilyData::GetWriteStallConditionAndCause(
    int num_unflushed_memtables, int num_l0_files,
    uint64_t num_compaction_needed_bytes,
    const MutableCFOptions& mutable_cf_options,
    const ImmutableCFOptions& immutable_cf_options) {
  if (num_unflushed_memtables >= mutable_cf_options.max_write_buffer_number) {
    return {WriteStallCondition::kStopped, WriteStallCause::kMemtableLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             num_l0_files >= mutable_cf_options.level0_stop_writes_trigger) {
    return {WriteStallCondition::kStopped, WriteStallCause::kL0FileCountLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
             num_compaction_needed_bytes >=
                 mutable_cf_options.hard_pending_compaction_bytes_limit) {
    return {WriteStallCondition::kStopped,
            WriteStallCause::kPendingCompactionBytes};
  } else if (mutable_cf_options.max_write_buffer_number > 3 &&
             num_unflushed_memtables >=
                 mutable_cf_options.max_write_buffer_number - 1 &&
             num_unflushed_memtables - 1 >=
                 immutable_cf_options.min_write_buffer_number_to_merge) {
    return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
             num_l0_files >=
                 mutable_cf_options.level0_slowdown_writes_trigger) {
    return {WriteStallCondition::kDelayed, WriteStallCause::kL0FileCountLimit};
  } else if (!mutable_cf_options.disable_auto_compactions &&
             mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
             num_compaction_needed_bytes >=
                 mutable_cf_options.soft_pending_compaction_bytes_limit) {
    return {WriteStallCondition::kDelayed,
            WriteStallCause::kPendingCompactionBytes};
  }
  return {WriteStallCondition::kNormal, WriteStallCause::kNone};
}

WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
      const MutableCFOptions& mutable_cf_options) {
  auto write_stall_condition = WriteStallCondition::kNormal;
  if (current_ != nullptr) {
    auto* vstorage = current_->storage_info();
    auto write_controller = column_family_set_->write_controller_;
    uint64_t compaction_needed_bytes =
        vstorage->estimated_compaction_needed_bytes();

    auto write_stall_condition_and_cause = GetWriteStallConditionAndCause(
        imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(),
        vstorage->estimated_compaction_needed_bytes(), mutable_cf_options,
        *ioptions());
    write_stall_condition = write_stall_condition_and_cause.first;
    auto write_stall_cause = write_stall_condition_and_cause.second;

    bool was_stopped = write_controller->IsStopped();
    bool needed_delay = write_controller->NeedsDelay();

    if (write_stall_condition == WriteStallCondition::kStopped &&
        write_stall_cause == WriteStallCause::kMemtableLimit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
      ROCKS_LOG_WARN(
          ioptions_.logger,
          "[%s] Stopping writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d",
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number);
    } else if (write_stall_condition == WriteStallCondition::kStopped &&
               write_stall_cause == WriteStallCause::kL0FileCountLimit) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_STOPS, 1);
      }
      ROCKS_LOG_WARN(ioptions_.logger,
                     "[%s] Stopping writes because we have %d level-0 files",
                     name_.c_str(), vstorage->l0_delay_trigger_count());
    } else if (write_stall_condition == WriteStallCondition::kStopped &&
               write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
      write_controller_token_ = write_controller->GetStopToken();
      internal_stats_->AddCFStats(
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
      ROCKS_LOG_WARN(
          ioptions_.logger,
          "[%s] Stopping writes because of estimated pending compaction "
          "bytes %" PRIu64,
          name_.c_str(), compaction_needed_bytes);
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kMemtableLimit) {
      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1);
      ROCKS_LOG_WARN(
          ioptions_.logger,
          "[%s] Stalling writes because we have %d immutable memtables "
          "(waiting for flush), max_write_buffer_number is set to %d "
          "rate %" PRIu64,
          name_.c_str(), imm()->NumNotFlushed(),
          mutable_cf_options.max_write_buffer_number,
          write_controller->delayed_write_rate());
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kL0FileCountLimit) {
      // L0 is within two files of the stop trigger.
      bool near_stop = vstorage->l0_delay_trigger_count() >=
                       mutable_cf_options.level0_stop_writes_trigger - 2;
      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS,
                                  1);
      if (compaction_picker_->IsLevel0CompactionInProgress()) {
        internal_stats_->AddCFStats(
            InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1);
      }
      ROCKS_LOG_WARN(ioptions_.logger,
                     "[%s] Stalling writes because we have %d level-0 files "
                     "rate %" PRIu64,
                     name_.c_str(), vstorage->l0_delay_trigger_count(),
                     write_controller->delayed_write_rate());
    } else if (write_stall_condition == WriteStallCondition::kDelayed &&
               write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
      // If the distance to the hard limit is less than 1/4 of the gap between
      // the soft and hard bytes limits, we think it is near stop and speed up
      // the slowdown.
      bool near_stop =
          mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
          (compaction_needed_bytes -
           mutable_cf_options.soft_pending_compaction_bytes_limit) >
              3 * (mutable_cf_options.hard_pending_compaction_bytes_limit -
                   mutable_cf_options.soft_pending_compaction_bytes_limit) /
                  4;

      write_controller_token_ =
          SetupDelay(write_controller, compaction_needed_bytes,
                     prev_compaction_needed_bytes_, was_stopped || near_stop,
                     mutable_cf_options.disable_auto_compactions);
      internal_stats_->AddCFStats(
          InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1);
      ROCKS_LOG_WARN(
          ioptions_.logger,
          "[%s] Stalling writes because of estimated pending compaction "
          "bytes %" PRIu64 " rate %" PRIu64,
          name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
          write_controller->delayed_write_rate());
    } else {
      assert(write_stall_condition == WriteStallCondition::kNormal);
      if (vstorage->l0_delay_trigger_count() >=
          GetL0ThresholdSpeedupCompaction(
              mutable_cf_options.level0_file_num_compaction_trigger,
              mutable_cf_options.level0_slowdown_writes_trigger)) {
        write_controller_token_ =
            write_controller->GetCompactionPressureToken();
        ROCKS_LOG_INFO(
            ioptions_.logger,
            "[%s] Increasing compaction threads because we have %d level-0 "
            "files ",
            name_.c_str(), vstorage->l0_delay_trigger_count());
      } else if (vstorage->estimated_compaction_needed_bytes() >=
                 mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
        // Increase compaction threads if bytes needed for compaction exceeds
        // 1/4 of threshold for slowing down.
        // If soft pending compaction byte limit is not set, always speed up
        // compaction.
        write_controller_token_ =
            write_controller->GetCompactionPressureToken();
        if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
          ROCKS_LOG_INFO(
              ioptions_.logger,
              "[%s] Increasing compaction threads because of estimated pending "
              "compaction "
              "bytes %" PRIu64,
              name_.c_str(), vstorage->estimated_compaction_needed_bytes());
        }
      } else {
        write_controller_token_.reset();
      }
      // If the DB recovers from delay conditions, we reward it by increasing
      // the write rate by kDelayRecoverSlowdownRatio, which more than undoes
      // a single slowdown step. This is to balance the long term slowdown
      // increase signal.
      if (needed_delay) {
        uint64_t write_rate = write_controller->delayed_write_rate();
        write_controller->set_delayed_write_rate(static_cast<uint64_t>(
            static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
        // Set the low pri limit to be 1/4 the delayed write rate.
        // Note we don't reset this value even after the delay condition is
        // released. Low-pri rate will continue to apply if there is
        // compaction pressure.
        write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate /
                                                                    4);
      }
    }
    prev_compaction_needed_bytes_ = compaction_needed_bytes;
  }
  return write_stall_condition;
}

const FileOptions* ColumnFamilyData::soptions() const {
  return &(column_family_set_->file_options_);
}

void ColumnFamilyData::SetCurrent(Version* current_version) {
  current_ = current_version;
}

uint64_t ColumnFamilyData::GetNumLiveVersions() const {
  return VersionSet::GetNumLiveVersions(dummy_versions_);
}

uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
  return VersionSet::GetTotalSstFilesSize(dummy_versions_);
}

uint64_t ColumnFamilyData::GetTotalBlobFileSize() const {
  return VersionSet::GetTotalBlobFileSize(dummy_versions_);
}

uint64_t ColumnFamilyData::GetLiveSstFilesSize() const {
  return current_->GetSstFilesSize();
}

MemTable* ColumnFamilyData::ConstructNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
                      write_buffer_manager_, earliest_seq, id_);
}

void ColumnFamilyData::CreateNewMemtable(
    const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
  if (mem_ != nullptr) {
    delete mem_->Unref();
  }
  SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
  mem_->Ref();
}

bool ColumnFamilyData::NeedsCompaction() const {
  return !mutable_cf_options_.disable_auto_compactions &&
         compaction_picker_->NeedsCompaction(current_->storage_info());
}

Compaction* ColumnFamilyData::PickCompaction(
    const MutableCFOptions& mutable_options,
    const MutableDBOptions& mutable_db_options, LogBuffer* log_buffer) {
  SequenceNumber earliest_mem_seqno =
      std::min(mem_->GetEarliestSequenceNumber(),
               imm_.current()->GetEarliestSequenceNumber(false));
  auto* result = compaction_picker_->PickCompaction(
      GetName(), mutable_options, mutable_db_options, current_->storage_info(),
      log_buffer, earliest_mem_seqno);
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
}

bool ColumnFamilyData::RangeOverlapWithCompaction(
    const Slice& smallest_user_key, const Slice& largest_user_key,
    int level) const {
  return compaction_picker_->RangeOverlapWithCompaction(
      smallest_user_key, largest_user_key, level);
}

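// Checks whether any of `ranges` overlaps data in the unflushed memtables,
// including range tombstones: a merged iterator over all memtables is seeked
// to each range's start, and the first key found is compared against the
// range's limit.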
Status ColumnFamilyData::RangesOverlapWithMemtables(
    const autovector<Range>& ranges, SuperVersion* super_version,
    bool allow_data_in_errors, bool* overlap) {
  assert(overlap != nullptr);
  *overlap = false;
  // Create an InternalIterator over all unflushed memtables
  Arena arena;
  ReadOptions read_opts;
  read_opts.total_order_seek = true;
  MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
  merge_iter_builder.AddIterator(
      super_version->mem->NewIterator(read_opts, &arena));
  super_version->imm->AddIterators(read_opts, &merge_iter_builder);
  ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());

  auto read_seq = super_version->current->version_set()->LastSequence();
  ReadRangeDelAggregator range_del_agg(&internal_comparator_, read_seq);
  auto* active_range_del_iter =
      super_version->mem->NewRangeTombstoneIterator(read_opts, read_seq);
  range_del_agg.AddTombstones(
      std::unique_ptr<FragmentedRangeTombstoneIterator>(active_range_del_iter));
  Status status;
  status = super_version->imm->AddRangeTombstoneIterators(
      read_opts, nullptr /* arena */, &range_del_agg);
  // AddRangeTombstoneIterators always returns Status::OK.
  assert(status.ok());

  for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {
    auto* vstorage = super_version->current->storage_info();
    auto* ucmp = vstorage->InternalComparator()->user_comparator();
    InternalKey range_start(ranges[i].start, kMaxSequenceNumber,
                            kValueTypeForSeek);
    memtable_iter->Seek(range_start.Encode());
    status = memtable_iter->status();
    ParsedInternalKey seek_result;

    if (status.ok() && memtable_iter->Valid()) {
      status = ParseInternalKey(memtable_iter->key(), &seek_result,
                                allow_data_in_errors);
    }

    if (status.ok()) {
      if (memtable_iter->Valid() &&
          ucmp->Compare(seek_result.user_key, ranges[i].limit) <= 0) {
        *overlap = true;
      } else if (range_del_agg.IsRangeOverlapped(ranges[i].start,
                                                 ranges[i].limit)) {
        *overlap = true;
      }
    }
  }
  return status;
}

const int ColumnFamilyData::kCompactAllLevels = -1;
const int ColumnFamilyData::kCompactToBaseLevel = -2;

Compaction* ColumnFamilyData::CompactRange(
    const MutableCFOptions& mutable_cf_options,
    const MutableDBOptions& mutable_db_options, int input_level,
    int output_level, const CompactRangeOptions& compact_range_options,
    const InternalKey* begin, const InternalKey* end,
    InternalKey** compaction_end, bool* conflict,
    uint64_t max_file_num_to_ignore) {
  auto* result = compaction_picker_->CompactRange(
      GetName(), mutable_cf_options, mutable_db_options,
      current_->storage_info(), input_level, output_level,
      compact_range_options, begin, end, compaction_end, conflict,
      max_file_num_to_ignore);
  if (result != nullptr) {
    result->SetInputVersion(current_);
  }
  return result;
}

SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(DBImpl* db) {
  SuperVersion* sv = GetThreadLocalSuperVersion(db);
  sv->Ref();
  if (!ReturnThreadLocalSuperVersion(sv)) {
    // This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
    // when the thread-local pointer was populated. So, the Ref() earlier in
    // this function still prevents the returned SuperVersion* from being
    // deleted out from under the caller.
    sv->Unref();
  }
  return sv;
}

SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
  // The SuperVersion is cached in thread local storage to avoid acquiring
  // mutex when SuperVersion does not change since the last use. When a new
  // SuperVersion is installed, the compaction or flush thread cleans up
  // cached SuperVersion in all existing thread local storage. To avoid
  // acquiring mutex for this operation, we use atomic Swap() on the thread
  // local pointer to guarantee exclusive access. If the thread local pointer
  // is being used while a new SuperVersion is installed, the cached
  // SuperVersion can become stale. In that case, the background thread would
  // have swapped in kSVObsolete. We re-check the value when returning the
  // SuperVersion back to thread local, with an atomic compare and swap.
  // The superversion will need to be released if detected to be stale.
1208   void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
1209   // Invariant:
1210   // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
1211   // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
1212   // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
1213   // (if no Scrape happens).
  assert(ptr != SuperVersion::kSVInUse);
  SuperVersion* sv = static_cast<SuperVersion*>(ptr);
  if (sv == SuperVersion::kSVObsolete ||
      sv->version_number != super_version_number_.load()) {
    RecordTick(ioptions_.stats, NUMBER_SUPERVERSION_ACQUIRES);
    SuperVersion* sv_to_delete = nullptr;

    if (sv && sv->Unref()) {
      RecordTick(ioptions_.stats, NUMBER_SUPERVERSION_CLEANUPS);
      db->mutex()->Lock();
      // NOTE: underlying resources held by superversion (sst files) might
      // not be released until the next background job.
      sv->Cleanup();
      if (db->immutable_db_options().avoid_unnecessary_blocking_io) {
        db->AddSuperVersionsToFreeQueue(sv);
        db->SchedulePurge();
      } else {
        sv_to_delete = sv;
      }
    } else {
      db->mutex()->Lock();
    }
    sv = super_version_->Ref();
    db->mutex()->Unlock();

    delete sv_to_delete;
  }
  assert(sv != nullptr);
  return sv;
}

bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
  assert(sv != nullptr);
  // Put the SuperVersion back
  void* expected = SuperVersion::kSVInUse;
  if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
    // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
    // storage has not been altered and no Scrape has happened. The
    // SuperVersion is still current.
    return true;
  } else {
    // A ThreadLocal Scrape happened while this SuperVersion was checked out
    // (after the thread local Swap() at acquisition and before this
    // CompareAndSwap()). This means the SuperVersion it holds is obsolete.
    assert(expected == SuperVersion::kSVObsolete);
  }
  return false;
}
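
// State machine of the thread-local slot implied by the two functions above
// (an illustrative summary, not additional code):
//
//   valid sv ---- Swap(kSVInUse) ----------> kSVInUse     (acquire)
//   kSVInUse ---- CompareAndSwap(sv) ------> valid sv     (return succeeds)
//   kSVInUse ---- Scrape(kSVObsolete) -----> kSVObsolete  (install raced;
//                                            return fails, caller Unref()s)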

void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context, InstrumentedMutex* db_mutex) {
  db_mutex->AssertHeld();
  return InstallSuperVersion(sv_context, mutable_cf_options_);
}

void ColumnFamilyData::InstallSuperVersion(
    SuperVersionContext* sv_context,
    const MutableCFOptions& mutable_cf_options) {
  SuperVersion* new_superversion = sv_context->new_superversion.release();
  new_superversion->mutable_cf_options = mutable_cf_options;
  new_superversion->Init(this, mem_, imm_.current(), current_);
  SuperVersion* old_superversion = super_version_;
  super_version_ = new_superversion;
  ++super_version_number_;
  super_version_->version_number = super_version_number_;
  super_version_->write_stall_condition =
      RecalculateWriteStallConditions(mutable_cf_options);

  if (old_superversion != nullptr) {
    // Reset SuperVersions cached in thread local storage.
    // This should be done before old_superversion->Unref(). That's to ensure
    // that local_sv_ never holds the last reference to SuperVersion, since
    // it has no means to safely do SuperVersion cleanup.
    ResetThreadLocalSuperVersions();

    if (old_superversion->mutable_cf_options.write_buffer_size !=
        mutable_cf_options.write_buffer_size) {
      mem_->UpdateWriteBufferSize(mutable_cf_options.write_buffer_size);
    }
    if (old_superversion->write_stall_condition !=
        new_superversion->write_stall_condition) {
      sv_context->PushWriteStallNotification(
          old_superversion->write_stall_condition,
          new_superversion->write_stall_condition, GetName(), ioptions());
    }
    if (old_superversion->Unref()) {
      old_superversion->Cleanup();
      sv_context->superversions_to_free.push_back(old_superversion);
    }
  }
}
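
// Ordering in the function above matters (illustrative summary):
//   1. Publish new_superversion and bump super_version_number_, so readers
//      that re-check the version number see the change.
//   2. ResetThreadLocalSuperVersions(), which strips cached references out
//      of thread local storage.
//   3. old_superversion->Unref(), which is now safe because thread local
//      storage can no longer hold the last reference.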

void ColumnFamilyData::ResetThreadLocalSuperVersions() {
  autovector<void*> sv_ptrs;
  local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
  for (auto ptr : sv_ptrs) {
    assert(ptr);
    if (ptr == SuperVersion::kSVInUse) {
      continue;
    }
    auto sv = static_cast<SuperVersion*>(ptr);
    bool was_last_ref __attribute__((__unused__));
    was_last_ref = sv->Unref();
    // sv couldn't have been the last reference because
    // ResetThreadLocalSuperVersions() is called before
    // unref'ing super_version_.
    assert(!was_last_ref);
  }
}

Status ColumnFamilyData::ValidateOptions(
    const DBOptions& db_options, const ColumnFamilyOptions& cf_options) {
  Status s;
  s = CheckCompressionSupported(cf_options);
  if (s.ok() && db_options.allow_concurrent_memtable_write) {
    s = CheckConcurrentWritesSupported(cf_options);
  }
  if (s.ok() && db_options.unordered_write &&
      cf_options.max_successive_merges != 0) {
    s = Status::InvalidArgument(
        "max_successive_merges > 0 is incompatible with unordered_write");
  }
  if (s.ok()) {
    s = CheckCFPathsSupported(db_options, cf_options);
  }
  if (!s.ok()) {
    return s;
  }

  if (cf_options.ttl > 0 && cf_options.ttl != kDefaultTtl) {
    if (!cf_options.table_factory->IsInstanceOf(
            TableFactory::kBlockBasedTableName())) {
      return Status::NotSupported(
          "TTL is only supported in Block-Based Table format. ");
    }
  }

  if (cf_options.periodic_compaction_seconds > 0 &&
      cf_options.periodic_compaction_seconds != kDefaultPeriodicCompSecs) {
    if (!cf_options.table_factory->IsInstanceOf(
            TableFactory::kBlockBasedTableName())) {
      return Status::NotSupported(
          "Periodic Compaction is only supported in "
          "Block-Based Table format. ");
    }
  }

  if (cf_options.enable_blob_garbage_collection) {
    if (cf_options.blob_garbage_collection_age_cutoff < 0.0 ||
        cf_options.blob_garbage_collection_age_cutoff > 1.0) {
      return Status::InvalidArgument(
          "The age cutoff for blob garbage collection should be in the range "
          "[0.0, 1.0].");
    }
    if (cf_options.blob_garbage_collection_force_threshold < 0.0 ||
        cf_options.blob_garbage_collection_force_threshold > 1.0) {
      return Status::InvalidArgument(
          "The garbage ratio threshold for forcing blob garbage collection "
          "should be in the range [0.0, 1.0].");
    }
  }

  if (cf_options.compaction_style == kCompactionStyleFIFO &&
      db_options.max_open_files != -1 && cf_options.ttl > 0) {
    return Status::NotSupported(
        "FIFO compaction only supported with max_open_files = -1.");
  }

  return s;
}
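
// For example (an illustrative sketch, assuming the static declaration of
// ValidateOptions() in column_family.h), this combination is rejected by the
// unordered_write check above:
//
//   DBOptions db_opts;
//   db_opts.unordered_write = true;
//   ColumnFamilyOptions cf_opts;
//   cf_opts.max_successive_merges = 8;  // > 0 conflicts with unordered_write
//   Status s = ColumnFamilyData::ValidateOptions(db_opts, cf_opts);
//   assert(s.IsInvalidArgument());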

#ifndef ROCKSDB_LITE
Status ColumnFamilyData::SetOptions(
    const DBOptions& db_opts,
    const std::unordered_map<std::string, std::string>& options_map) {
  ColumnFamilyOptions cf_opts =
      BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
  ConfigOptions config_opts;
  config_opts.mutable_options_only = true;
  Status s = GetColumnFamilyOptionsFromMap(config_opts, cf_opts, options_map,
                                           &cf_opts);
  if (s.ok()) {
    s = ValidateOptions(db_opts, cf_opts);
  }
  if (s.ok()) {
    mutable_cf_options_ = MutableCFOptions(cf_opts);
    mutable_cf_options_.RefreshDerivedOptions(ioptions_);
  }
  return s;
}
#endif  // ROCKSDB_LITE
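
// A typical options_map, as it arrives here from the public DB::SetOptions()
// API (illustrative):
//
//   db->SetOptions(handle, {{"write_buffer_size", "67108864"},
//                           {"disable_auto_compactions", "false"}});
//
// Since config_opts.mutable_options_only is set above, only mutable options
// are accepted; unknown or immutable keys produce an error Status.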

// REQUIRES: DB mutex held
Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) {
  if (initial_cf_options_.compaction_style != kCompactionStyleLevel) {
    return Env::WLTH_NOT_SET;
  }
  if (level == 0) {
    return Env::WLTH_MEDIUM;
  }
  int base_level = current_->storage_info()->base_level();

  // L1: medium, L2: long, ...
  if (level - base_level >= 2) {
    return Env::WLTH_EXTREME;
  } else if (level < base_level) {
    // There is no restriction that prevents the level passed in from being
    // smaller than base_level.
    return Env::WLTH_MEDIUM;
  }
  return static_cast<Env::WriteLifeTimeHint>(
      level - base_level + static_cast<int>(Env::WLTH_MEDIUM));
}
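
// Worked example (illustrative): with base_level == 1, the mapping above is
//   L0  -> WLTH_MEDIUM   (flush output)
//   L1  -> WLTH_MEDIUM   (level - base_level == 0)
//   L2  -> WLTH_LONG     (level - base_level == 1)
//   L3+ -> WLTH_EXTREME  (level - base_level >= 2)
// Deeper levels hold colder data, so they get longer write-life hints.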

Status ColumnFamilyData::AddDirectories(
    std::map<std::string, std::shared_ptr<FSDirectory>>* created_dirs) {
  Status s;
  assert(created_dirs != nullptr);
  assert(data_dirs_.empty());
  for (auto& p : ioptions_.cf_paths) {
    auto existing_dir = created_dirs->find(p.path);

    if (existing_dir == created_dirs->end()) {
      std::unique_ptr<FSDirectory> path_directory;
      s = DBImpl::CreateAndNewDirectory(ioptions_.fs.get(), p.path,
                                        &path_directory);
      if (!s.ok()) {
        return s;
      }
      assert(path_directory != nullptr);
      data_dirs_.emplace_back(path_directory.release());
      (*created_dirs)[p.path] = data_dirs_.back();
    } else {
      data_dirs_.emplace_back(existing_dir->second);
    }
  }
  assert(data_dirs_.size() == ioptions_.cf_paths.size());
  return s;
}
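
// Illustrative: with cf_paths = {{"/db/fast", 0}, {"/db/slow", 0}}, the loop
// above opens one FSDirectory per distinct path (reusing any entry already in
// *created_dirs, e.g. one opened for another column family) and fills
// data_dirs_ in cf_paths order, so GetDataDir(path_id) below can index it.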

FSDirectory* ColumnFamilyData::GetDataDir(size_t path_id) const {
  if (data_dirs_.empty()) {
    return nullptr;
  }

  assert(path_id < data_dirs_.size());
  return data_dirs_[path_id].get();
}

ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
                                 const ImmutableDBOptions* db_options,
                                 const FileOptions& file_options,
                                 Cache* table_cache,
                                 WriteBufferManager* _write_buffer_manager,
                                 WriteController* _write_controller,
                                 BlockCacheTracer* const block_cache_tracer,
                                 const std::shared_ptr<IOTracer>& io_tracer,
                                 const std::string& db_session_id)
    : max_column_family_(0),
      file_options_(file_options),
      dummy_cfd_(new ColumnFamilyData(
          ColumnFamilyData::kDummyColumnFamilyDataId, "", nullptr, nullptr,
          nullptr, ColumnFamilyOptions(), *db_options, &file_options_, nullptr,
          block_cache_tracer, io_tracer, db_session_id)),
      default_cfd_cache_(nullptr),
      db_name_(dbname),
      db_options_(db_options),
      table_cache_(table_cache),
      write_buffer_manager_(_write_buffer_manager),
      write_controller_(_write_controller),
      block_cache_tracer_(block_cache_tracer),
      io_tracer_(io_tracer),
      db_session_id_(db_session_id) {
  // initialize linked list
  dummy_cfd_->prev_ = dummy_cfd_;
  dummy_cfd_->next_ = dummy_cfd_;
}
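
// The set keeps its members in a dummy-headed circular doubly-linked list
// (an illustrative picture of the invariant):
//
//   dummy <-> dummy                   // empty set, as initialized above
//   dummy <-> cfA <-> dummy           // after one CreateColumnFamily()
//   dummy <-> cfA <-> cfB <-> dummy   // new entries link in before dummy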

ColumnFamilySet::~ColumnFamilySet() {
  while (column_family_data_.size() > 0) {
    // cfd destructor will delete itself from column_family_data_
    auto cfd = column_family_data_.begin()->second;
    bool last_ref __attribute__((__unused__));
    last_ref = cfd->UnrefAndTryDelete();
    assert(last_ref);
  }
  bool dummy_last_ref __attribute__((__unused__));
  dummy_last_ref = dummy_cfd_->UnrefAndTryDelete();
  assert(dummy_last_ref);
}

ColumnFamilyData* ColumnFamilySet::GetDefault() const {
  assert(default_cfd_cache_ != nullptr);
  return default_cfd_cache_;
}

ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
  auto cfd_iter = column_family_data_.find(id);
  if (cfd_iter != column_family_data_.end()) {
    return cfd_iter->second;
  } else {
    return nullptr;
  }
}

ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
    const {
  auto cfd_iter = column_families_.find(name);
  if (cfd_iter != column_families_.end()) {
    auto cfd = GetColumnFamily(cfd_iter->second);
    assert(cfd != nullptr);
    return cfd;
  } else {
    return nullptr;
  }
}

uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
  return ++max_column_family_;
}

uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; }

void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
  max_column_family_ = std::max(new_max_column_family, max_column_family_);
}

size_t ColumnFamilySet::NumberOfColumnFamilies() const {
  return column_families_.size();
}

// under a DB mutex AND write thread
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
    const std::string& name, uint32_t id, Version* dummy_versions,
    const ColumnFamilyOptions& options) {
  assert(column_families_.find(name) == column_families_.end());
  ColumnFamilyData* new_cfd = new ColumnFamilyData(
      id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
      *db_options_, &file_options_, this, block_cache_tracer_, io_tracer_,
      db_session_id_);
  column_families_.insert({name, id});
  column_family_data_.insert({id, new_cfd});
  max_column_family_ = std::max(max_column_family_, id);
  // add to linked list
  new_cfd->next_ = dummy_cfd_;
  auto prev = dummy_cfd_->prev_;
  new_cfd->prev_ = prev;
  prev->next_ = new_cfd;
  dummy_cfd_->prev_ = new_cfd;
  if (id == 0) {
    default_cfd_cache_ = new_cfd;
  }
  return new_cfd;
}

// REQUIRES: DB mutex held
void ColumnFamilySet::FreeDeadColumnFamilies() {
  autovector<ColumnFamilyData*> to_delete;
  for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
    if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
      to_delete.push_back(cfd);
    }
  }
  for (auto cfd : to_delete) {
    // this is very rare, so it's not a problem that we do it under a mutex
    delete cfd;
  }
}

// under a DB mutex AND from a write thread
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
  auto cfd_iter = column_family_data_.find(cfd->GetID());
  assert(cfd_iter != column_family_data_.end());
  column_family_data_.erase(cfd_iter);
  column_families_.erase(cfd->GetName());
}

// under a DB mutex OR from a write thread
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
  if (column_family_id == 0) {
    // optimization for common case
    current_ = column_family_set_->GetDefault();
  } else {
    current_ = column_family_set_->GetColumnFamily(column_family_id);
  }
  handle_.SetCFD(current_);
  return current_ != nullptr;
}
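
// An illustrative (hypothetical) call pattern, in the spirit of WriteBatch
// replay, which positions the cursor before using the accessors below:
//
//   if (!cf_mems->Seek(cf_id)) {
//     return Status::InvalidArgument("Invalid column family specified");
//   }
//   MemTable* mem = cf_mems->GetMemTable();
//   uint64_t log = cf_mems->GetLogNumber();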

uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
  assert(current_ != nullptr);
  return current_->GetLogNumber();
}

MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
  assert(current_ != nullptr);
  return current_->mem();
}

ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
  assert(current_ != nullptr);
  return &handle_;
}

uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
  uint32_t column_family_id = 0;
  if (column_family != nullptr) {
    auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
    column_family_id = cfh->GetID();
  }
  return column_family_id;
}

const Comparator* GetColumnFamilyUserComparator(
    ColumnFamilyHandle* column_family) {
  if (column_family != nullptr) {
    return column_family->GetComparator();
  }
  return nullptr;
}

}  // namespace ROCKSDB_NAMESPACE