1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/column_family.h"
11
12 #include <algorithm>
13 #include <cinttypes>
14 #include <limits>
15 #include <sstream>
16 #include <string>
17 #include <vector>
18
19 #include "db/blob/blob_file_cache.h"
20 #include "db/compaction/compaction_picker.h"
21 #include "db/compaction/compaction_picker_fifo.h"
22 #include "db/compaction/compaction_picker_level.h"
23 #include "db/compaction/compaction_picker_universal.h"
24 #include "db/db_impl/db_impl.h"
25 #include "db/internal_stats.h"
26 #include "db/job_context.h"
27 #include "db/range_del_aggregator.h"
28 #include "db/table_properties_collector.h"
29 #include "db/version_set.h"
30 #include "db/write_controller.h"
31 #include "file/sst_file_manager_impl.h"
32 #include "logging/logging.h"
33 #include "monitoring/thread_status_util.h"
34 #include "options/options_helper.h"
35 #include "port/port.h"
36 #include "rocksdb/convenience.h"
37 #include "rocksdb/table.h"
38 #include "table/merging_iterator.h"
39 #include "util/autovector.h"
40 #include "util/cast_util.h"
41 #include "util/compression.h"
42
43 namespace ROCKSDB_NAMESPACE {
44
ColumnFamilyHandleImpl(ColumnFamilyData * column_family_data,DBImpl * db,InstrumentedMutex * mutex)45 ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
46 ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
47 : cfd_(column_family_data), db_(db), mutex_(mutex) {
48 if (cfd_ != nullptr) {
49 cfd_->Ref();
50 }
51 }
52
~ColumnFamilyHandleImpl()53 ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
54 if (cfd_ != nullptr) {
55 #ifndef ROCKSDB_LITE
56 for (auto& listener : cfd_->ioptions()->listeners) {
57 listener->OnColumnFamilyHandleDeletionStarted(this);
58 }
59 #endif // ROCKSDB_LITE
60 // Job id == 0 means that this is not our background process, but rather
61 // user thread
62 // Need to hold some shared pointers owned by the initial_cf_options
63 // before final cleaning up finishes.
64 ColumnFamilyOptions initial_cf_options_copy = cfd_->initial_cf_options();
65 JobContext job_context(0);
66 mutex_->Lock();
67 bool dropped = cfd_->IsDropped();
68 if (cfd_->UnrefAndTryDelete()) {
69 if (dropped) {
70 db_->FindObsoleteFiles(&job_context, false, true);
71 }
72 }
73 mutex_->Unlock();
74 if (job_context.HaveSomethingToDelete()) {
75 bool defer_purge =
76 db_->immutable_db_options().avoid_unnecessary_blocking_io;
77 db_->PurgeObsoleteFiles(job_context, defer_purge);
78 if (defer_purge) {
79 mutex_->Lock();
80 db_->SchedulePurge();
81 mutex_->Unlock();
82 }
83 }
84 job_context.Clean();
85 }
86 }
87
GetID() const88 uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }
89
GetName() const90 const std::string& ColumnFamilyHandleImpl::GetName() const {
91 return cfd()->GetName();
92 }
93
GetDescriptor(ColumnFamilyDescriptor * desc)94 Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
95 #ifndef ROCKSDB_LITE
96 // accessing mutable cf-options requires db mutex.
97 InstrumentedMutexLock l(mutex_);
98 *desc = ColumnFamilyDescriptor(cfd()->GetName(), cfd()->GetLatestCFOptions());
99 return Status::OK();
100 #else
101 (void)desc;
102 return Status::NotSupported();
103 #endif // !ROCKSDB_LITE
104 }
105
GetComparator() const106 const Comparator* ColumnFamilyHandleImpl::GetComparator() const {
107 return cfd()->user_comparator();
108 }
109
GetIntTblPropCollectorFactory(const ImmutableCFOptions & ioptions,IntTblPropCollectorFactories * int_tbl_prop_collector_factories)110 void GetIntTblPropCollectorFactory(
111 const ImmutableCFOptions& ioptions,
112 IntTblPropCollectorFactories* int_tbl_prop_collector_factories) {
113 assert(int_tbl_prop_collector_factories);
114
115 auto& collector_factories = ioptions.table_properties_collector_factories;
116 for (size_t i = 0; i < ioptions.table_properties_collector_factories.size();
117 ++i) {
118 assert(collector_factories[i]);
119 int_tbl_prop_collector_factories->emplace_back(
120 new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
121 }
122 }
123
CheckCompressionSupported(const ColumnFamilyOptions & cf_options)124 Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
125 if (!cf_options.compression_per_level.empty()) {
126 for (size_t level = 0; level < cf_options.compression_per_level.size();
127 ++level) {
128 if (!CompressionTypeSupported(cf_options.compression_per_level[level])) {
129 return Status::InvalidArgument(
130 "Compression type " +
131 CompressionTypeToString(cf_options.compression_per_level[level]) +
132 " is not linked with the binary.");
133 }
134 }
135 } else {
136 if (!CompressionTypeSupported(cf_options.compression)) {
137 return Status::InvalidArgument(
138 "Compression type " +
139 CompressionTypeToString(cf_options.compression) +
140 " is not linked with the binary.");
141 }
142 }
143 if (cf_options.compression_opts.zstd_max_train_bytes > 0) {
144 if (!ZSTD_TrainDictionarySupported()) {
145 return Status::InvalidArgument(
146 "zstd dictionary trainer cannot be used because ZSTD 1.1.3+ "
147 "is not linked with the binary.");
148 }
149 if (cf_options.compression_opts.max_dict_bytes == 0) {
150 return Status::InvalidArgument(
151 "The dictionary size limit (`CompressionOptions::max_dict_bytes`) "
152 "should be nonzero if we're using zstd's dictionary generator.");
153 }
154 }
155
156 if (!CompressionTypeSupported(cf_options.blob_compression_type)) {
157 std::ostringstream oss;
158 oss << "The specified blob compression type "
159 << CompressionTypeToString(cf_options.blob_compression_type)
160 << " is not available.";
161
162 return Status::InvalidArgument(oss.str());
163 }
164
165 return Status::OK();
166 }
167
CheckConcurrentWritesSupported(const ColumnFamilyOptions & cf_options)168 Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
169 if (cf_options.inplace_update_support) {
170 return Status::InvalidArgument(
171 "In-place memtable updates (inplace_update_support) is not compatible "
172 "with concurrent writes (allow_concurrent_memtable_write)");
173 }
174 if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
175 return Status::InvalidArgument(
176 "Memtable doesn't concurrent writes (allow_concurrent_memtable_write)");
177 }
178 return Status::OK();
179 }
180
CheckCFPathsSupported(const DBOptions & db_options,const ColumnFamilyOptions & cf_options)181 Status CheckCFPathsSupported(const DBOptions& db_options,
182 const ColumnFamilyOptions& cf_options) {
183 // More than one cf_paths are supported only in universal
184 // and level compaction styles. This function also checks the case
185 // in which cf_paths is not specified, which results in db_paths
186 // being used.
187 if ((cf_options.compaction_style != kCompactionStyleUniversal) &&
188 (cf_options.compaction_style != kCompactionStyleLevel)) {
189 if (cf_options.cf_paths.size() > 1) {
190 return Status::NotSupported(
191 "More than one CF paths are only supported in "
192 "universal and level compaction styles. ");
193 } else if (cf_options.cf_paths.empty() &&
194 db_options.db_paths.size() > 1) {
195 return Status::NotSupported(
196 "More than one DB paths are only supported in "
197 "universal and level compaction styles. ");
198 }
199 }
200 return Status::OK();
201 }
202
namespace {
// Sentinel values (UINT64_MAX - 1). SanitizeOptions() compares `ttl` and
// `periodic_compaction_seconds` against them and substitutes a concrete value
// when they match.
constexpr uint64_t kDefaultTtl = std::numeric_limits<uint64_t>::max() - 1;
constexpr uint64_t kDefaultPeriodicCompSecs =
    std::numeric_limits<uint64_t>::max() - 1;
}  // namespace
207
// Returns a copy of `src` in which out-of-range or inconsistent settings have
// been adjusted to usable values. `db_options` supplies the logger, the DB
// paths used when no CF paths are given, and the DB-wide flags consulted
// below. `src` itself is not modified.
//
// Adjustments made (in order):
//  * write_buffer_size is clipped to [64KB, 64GB] (upper bound 0xffffffff when
//    size_t is 4 bytes); arena_block_size is derived from it when unset.
//  * memtable counts, num_levels, bloom ratio, level multiplier and the
//    level0 / pending-compaction-bytes triggers are forced into a consistent
//    ordering.
//  * hash-based memtable factories are replaced by SkipListFactory when there
//    is no prefix extractor.
//  * FIFO compaction forces a single level and disables the L0 write triggers.
//  * (non-LITE) leftover .trash files in every cf_path are cleaned up.
//  * cf_paths falls back to db_paths; level_compaction_dynamic_level_bytes is
//    switched off where it cannot be honoured.
//  * the sentinel defaults of ttl / periodic_compaction_seconds are resolved.
ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
                                    const ColumnFamilyOptions& src) {
  ColumnFamilyOptions result = src;
  size_t clamp_max = std::conditional<
      sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
      std::integral_constant<uint64_t, 64ull << 30>>::type::value;
  ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, clamp_max);
  // if user sets arena_block_size, we trust user to use this value. Otherwise,
  // calculate a proper value from write_buffer_size;
  if (result.arena_block_size <= 0) {
    result.arena_block_size =
        std::min(size_t{1024 * 1024}, result.write_buffer_size / 8);

    // Align up to 4k
    const size_t align = 4 * 1024;
    result.arena_block_size =
        ((result.arena_block_size + align - 1) / align) * align;
  }
  result.min_write_buffer_number_to_merge =
      std::min(result.min_write_buffer_number_to_merge,
               result.max_write_buffer_number - 1);
  if (result.min_write_buffer_number_to_merge < 1) {
    result.min_write_buffer_number_to_merge = 1;
  }

  if (result.num_levels < 1) {
    result.num_levels = 1;
  }
  if (result.compaction_style == kCompactionStyleLevel &&
      result.num_levels < 2) {
    result.num_levels = 2;
  }

  if (result.compaction_style == kCompactionStyleUniversal &&
      db_options.allow_ingest_behind && result.num_levels < 3) {
    result.num_levels = 3;
  }

  if (result.max_write_buffer_number < 2) {
    result.max_write_buffer_number = 2;
  }
  // fall back max_write_buffer_number_to_maintain if
  // max_write_buffer_size_to_maintain is not set
  if (result.max_write_buffer_size_to_maintain < 0) {
    result.max_write_buffer_size_to_maintain =
        result.max_write_buffer_number *
        static_cast<int64_t>(result.write_buffer_size);
  } else if (result.max_write_buffer_size_to_maintain == 0 &&
             result.max_write_buffer_number_to_maintain < 0) {
    result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
  }
  // bloom filter size shouldn't exceed 1/4 of memtable size.
  if (result.memtable_prefix_bloom_size_ratio > 0.25) {
    result.memtable_prefix_bloom_size_ratio = 0.25;
  } else if (result.memtable_prefix_bloom_size_ratio < 0) {
    result.memtable_prefix_bloom_size_ratio = 0;
  }

  // Without a prefix extractor the hash-based memtable factories are swapped
  // for the skip-list factory.
  if (!result.prefix_extractor) {
    assert(result.memtable_factory);
    Slice name = result.memtable_factory->Name();
    if (name.compare("HashSkipListRepFactory") == 0 ||
        name.compare("HashLinkListRepFactory") == 0) {
      result.memtable_factory = std::make_shared<SkipListFactory>();
    }
  }

  if (result.compaction_style == kCompactionStyleFIFO) {
    result.num_levels = 1;
    // since we delete level0 files in FIFO compaction when there are too many
    // of them, these options don't really mean anything
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
    result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
  }

  if (result.max_bytes_for_level_multiplier <= 0) {
    result.max_bytes_for_level_multiplier = 1;
  }

  if (result.level0_file_num_compaction_trigger == 0) {
    ROCKS_LOG_WARN(db_options.logger,
                   "level0_file_num_compaction_trigger cannot be 0");
    result.level0_file_num_compaction_trigger = 1;
  }

  // Enforce stop >= slowdown >= compaction trigger, raising the larger
  // thresholds as needed and logging both the violation and the result.
  if (result.level0_stop_writes_trigger <
          result.level0_slowdown_writes_trigger ||
      result.level0_slowdown_writes_trigger <
          result.level0_file_num_compaction_trigger) {
    ROCKS_LOG_WARN(db_options.logger,
                   "This condition must be satisfied: "
                   "level0_stop_writes_trigger(%d) >= "
                   "level0_slowdown_writes_trigger(%d) >= "
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
    if (result.level0_slowdown_writes_trigger <
        result.level0_file_num_compaction_trigger) {
      result.level0_slowdown_writes_trigger =
          result.level0_file_num_compaction_trigger;
    }
    if (result.level0_stop_writes_trigger <
        result.level0_slowdown_writes_trigger) {
      result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
    }
    ROCKS_LOG_WARN(db_options.logger,
                   "Adjust the value to "
                   "level0_stop_writes_trigger(%d) "
                   "level0_slowdown_writes_trigger(%d) "
                   "level0_file_num_compaction_trigger(%d)",
                   result.level0_stop_writes_trigger,
                   result.level0_slowdown_writes_trigger,
                   result.level0_file_num_compaction_trigger);
  }

  // The soft limit defaults to the hard limit and never exceeds a positive
  // hard limit.
  if (result.soft_pending_compaction_bytes_limit == 0) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  } else if (result.hard_pending_compaction_bytes_limit > 0 &&
             result.soft_pending_compaction_bytes_limit >
                 result.hard_pending_compaction_bytes_limit) {
    result.soft_pending_compaction_bytes_limit =
        result.hard_pending_compaction_bytes_limit;
  }

#ifndef ROCKSDB_LITE
  // When the DB is stopped, it's possible that there are some .trash files that
  // were not deleted yet, when we open the DB we will find these .trash files
  // and schedule them to be deleted (or delete immediately if SstFileManager
  // was not used)
  auto sfm = static_cast<SstFileManagerImpl*>(db_options.sst_file_manager.get());
  for (size_t i = 0; i < result.cf_paths.size(); i++) {
    DeleteScheduler::CleanupDirectory(db_options.env, sfm,
                                      result.cf_paths[i].path)
        .PermitUncheckedError();
  }
#endif

  if (result.cf_paths.empty()) {
    result.cf_paths = db_options.db_paths;
  }

  if (result.level_compaction_dynamic_level_bytes) {
    if (result.compaction_style != kCompactionStyleLevel) {
      ROCKS_LOG_WARN(db_options.info_log.get(),
                     "level_compaction_dynamic_level_bytes only makes sense "
                     "for level-based compaction");
      result.level_compaction_dynamic_level_bytes = false;
    } else if (result.cf_paths.size() > 1U) {
      // we don't yet know how to make both of this feature and multiple
      // DB path work.
      ROCKS_LOG_WARN(db_options.info_log.get(),
                     "multiple cf_paths/db_paths and "
                     "level_compaction_dynamic_level_bytes "
                     "can't be used together");
      result.level_compaction_dynamic_level_bytes = false;
    }
  }

  if (result.max_compaction_bytes == 0) {
    result.max_compaction_bytes = result.target_file_size_base * 25;
  }

  bool is_block_based_table = (result.table_factory->IsInstanceOf(
      TableFactory::kBlockBasedTableName()));

  // 30 days, in seconds.
  const uint64_t kAdjustedTtl = 30 * 24 * 60 * 60;
  if (result.ttl == kDefaultTtl) {
    if (is_block_based_table &&
        result.compaction_style != kCompactionStyleFIFO) {
      result.ttl = kAdjustedTtl;
    } else {
      result.ttl = 0;
    }
  }

  // 30 days, in seconds.
  const uint64_t kAdjustedPeriodicCompSecs = 30 * 24 * 60 * 60;

  // Turn on periodic compactions and set them to occur once every 30 days if
  // compaction filters are used and periodic_compaction_seconds is set to the
  // default value.
  if (result.compaction_style != kCompactionStyleFIFO) {
    if ((result.compaction_filter != nullptr ||
         result.compaction_filter_factory != nullptr) &&
        result.periodic_compaction_seconds == kDefaultPeriodicCompSecs &&
        is_block_based_table) {
      result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
    }
  } else {
    // result.compaction_style == kCompactionStyleFIFO
    if (result.ttl == 0) {
      if (is_block_based_table) {
        if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
          result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs;
        }
        result.ttl = result.periodic_compaction_seconds;
      }
    } else if (result.periodic_compaction_seconds != 0) {
      result.ttl = std::min(result.ttl, result.periodic_compaction_seconds);
    }
  }

  // TTL compactions would work similar to Periodic Compactions in Universal in
  // most of the cases. So, if ttl is set, execute the periodic compaction
  // codepath.
  if (result.compaction_style == kCompactionStyleUniversal && result.ttl != 0) {
    if (result.periodic_compaction_seconds != 0) {
      result.periodic_compaction_seconds =
          std::min(result.ttl, result.periodic_compaction_seconds);
    } else {
      result.periodic_compaction_seconds = result.ttl;
    }
  }

  // A sentinel that survived all of the above means "disabled".
  if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) {
    result.periodic_compaction_seconds = 0;
  }

  return result;
}
429
// Backing object whose address is used as the kSVInUse marker value.
int SuperVersion::dummy = 0;
// Marker pointer equal to the address of `dummy`, so it is non-null.
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
// Marker pointer equal to nullptr.
void* const SuperVersion::kSVObsolete = nullptr;
433
~SuperVersion()434 SuperVersion::~SuperVersion() {
435 for (auto td : to_delete) {
436 delete td;
437 }
438 }
439
// Adds one reference to this SuperVersion and returns `this` so the call can
// be chained.
SuperVersion* SuperVersion::Ref() {
  // The counter is bumped with relaxed ordering.
  refs.fetch_add(1, std::memory_order_relaxed);
  return this;
}
444
Unref()445 bool SuperVersion::Unref() {
446 // fetch_sub returns the previous value of ref
447 uint32_t previous_refs = refs.fetch_sub(1);
448 assert(previous_refs > 0);
449 return previous_refs == 1;
450 }
451
Cleanup()452 void SuperVersion::Cleanup() {
453 assert(refs.load(std::memory_order_relaxed) == 0);
454 // Since this SuperVersion object is being deleted,
455 // decrement reference to the immutable MemtableList
456 // this SV object was pointing to.
457 imm->Unref(&to_delete);
458 MemTable* m = mem->Unref();
459 if (m != nullptr) {
460 auto* memory_usage = current->cfd()->imm()->current_memory_usage();
461 assert(*memory_usage >= m->ApproximateMemoryUsage());
462 *memory_usage -= m->ApproximateMemoryUsage();
463 to_delete.push_back(m);
464 }
465 current->Unref();
466 cfd->UnrefAndTryDelete();
467 }
468
Init(ColumnFamilyData * new_cfd,MemTable * new_mem,MemTableListVersion * new_imm,Version * new_current)469 void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem,
470 MemTableListVersion* new_imm, Version* new_current) {
471 cfd = new_cfd;
472 mem = new_mem;
473 imm = new_imm;
474 current = new_current;
475 cfd->Ref();
476 mem->Ref();
477 imm->Ref();
478 current->Ref();
479 refs.store(1, std::memory_order_relaxed);
480 }
481
482 namespace {
SuperVersionUnrefHandle(void * ptr)483 void SuperVersionUnrefHandle(void* ptr) {
484 // UnrefHandle is called when a thread exits or a ThreadLocalPtr gets
485 // destroyed. When the former happens, the thread shouldn't see kSVInUse.
486 // When the latter happens, only super_version_ holds a reference
487 // to ColumnFamilyData, so no further queries are possible.
488 SuperVersion* sv = static_cast<SuperVersion*>(ptr);
489 bool was_last_ref __attribute__((__unused__));
490 was_last_ref = sv->Unref();
491 // Thread-local SuperVersions can't outlive ColumnFamilyData::super_version_.
492 // This is important because we can't do SuperVersion cleanup here.
493 // That would require locking DB mutex, which would deadlock because
494 // SuperVersionUnrefHandle is called with locked ThreadLocalPtr mutex.
495 assert(!was_last_ref);
496 }
497 } // anonymous namespace
498
GetDbPaths() const499 std::vector<std::string> ColumnFamilyData::GetDbPaths() const {
500 std::vector<std::string> paths;
501 paths.reserve(ioptions_.cf_paths.size());
502 for (const DbPath& db_path : ioptions_.cf_paths) {
503 paths.emplace_back(db_path.path);
504 }
505 return paths;
506 }
507
// Id marking a dummy ColumnFamilyData. The constructor skips data-path
// registration when id_ equals this value.
const uint32_t ColumnFamilyData::kDummyColumnFamilyDataId = port::kMaxUint32;
509
ColumnFamilyData(uint32_t id,const std::string & name,Version * _dummy_versions,Cache * _table_cache,WriteBufferManager * write_buffer_manager,const ColumnFamilyOptions & cf_options,const ImmutableDBOptions & db_options,const FileOptions * file_options,ColumnFamilySet * column_family_set,BlockCacheTracer * const block_cache_tracer,const std::shared_ptr<IOTracer> & io_tracer,const std::string & db_session_id)510 ColumnFamilyData::ColumnFamilyData(
511 uint32_t id, const std::string& name, Version* _dummy_versions,
512 Cache* _table_cache, WriteBufferManager* write_buffer_manager,
513 const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
514 const FileOptions* file_options, ColumnFamilySet* column_family_set,
515 BlockCacheTracer* const block_cache_tracer,
516 const std::shared_ptr<IOTracer>& io_tracer,
517 const std::string& db_session_id)
518 : id_(id),
519 name_(name),
520 dummy_versions_(_dummy_versions),
521 current_(nullptr),
522 refs_(0),
523 initialized_(false),
524 dropped_(false),
525 internal_comparator_(cf_options.comparator),
526 initial_cf_options_(SanitizeOptions(db_options, cf_options)),
527 ioptions_(db_options, initial_cf_options_),
528 mutable_cf_options_(initial_cf_options_),
529 is_delete_range_supported_(
530 cf_options.table_factory->IsDeleteRangeSupported()),
531 write_buffer_manager_(write_buffer_manager),
532 mem_(nullptr),
533 imm_(ioptions_.min_write_buffer_number_to_merge,
534 ioptions_.max_write_buffer_number_to_maintain,
535 ioptions_.max_write_buffer_size_to_maintain),
536 super_version_(nullptr),
537 super_version_number_(0),
538 local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
539 next_(nullptr),
540 prev_(nullptr),
541 log_number_(0),
542 flush_reason_(FlushReason::kOthers),
543 column_family_set_(column_family_set),
544 queued_for_flush_(false),
545 queued_for_compaction_(false),
546 prev_compaction_needed_bytes_(0),
547 allow_2pc_(db_options.allow_2pc),
548 last_memtable_id_(0),
549 db_paths_registered_(false) {
550 if (id_ != kDummyColumnFamilyDataId) {
551 // TODO(cc): RegisterDbPaths can be expensive, considering moving it
552 // outside of this constructor which might be called with db mutex held.
553 // TODO(cc): considering using ioptions_.fs, currently some tests rely on
554 // EnvWrapper, that's the main reason why we use env here.
555 Status s = ioptions_.env->RegisterDbPaths(GetDbPaths());
556 if (s.ok()) {
557 db_paths_registered_ = true;
558 } else {
559 ROCKS_LOG_ERROR(
560 ioptions_.logger,
561 "Failed to register data paths of column family (id: %d, name: %s)",
562 id_, name_.c_str());
563 }
564 }
565 Ref();
566
567 // Convert user defined table properties collector factories to internal ones.
568 GetIntTblPropCollectorFactory(ioptions_, &int_tbl_prop_collector_factories_);
569
570 // if _dummy_versions is nullptr, then this is a dummy column family.
571 if (_dummy_versions != nullptr) {
572 internal_stats_.reset(
573 new InternalStats(ioptions_.num_levels, ioptions_.clock, this));
574 table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache,
575 block_cache_tracer, io_tracer,
576 db_session_id));
577 blob_file_cache_.reset(
578 new BlobFileCache(_table_cache, ioptions(), soptions(), id_,
579 internal_stats_->GetBlobFileReadHist(), io_tracer));
580
581 if (ioptions_.compaction_style == kCompactionStyleLevel) {
582 compaction_picker_.reset(
583 new LevelCompactionPicker(ioptions_, &internal_comparator_));
584 #ifndef ROCKSDB_LITE
585 } else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
586 compaction_picker_.reset(
587 new UniversalCompactionPicker(ioptions_, &internal_comparator_));
588 } else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
589 compaction_picker_.reset(
590 new FIFOCompactionPicker(ioptions_, &internal_comparator_));
591 } else if (ioptions_.compaction_style == kCompactionStyleNone) {
592 compaction_picker_.reset(new NullCompactionPicker(
593 ioptions_, &internal_comparator_));
594 ROCKS_LOG_WARN(ioptions_.logger,
595 "Column family %s does not use any background compaction. "
596 "Compactions can only be done via CompactFiles\n",
597 GetName().c_str());
598 #endif // !ROCKSDB_LITE
599 } else {
600 ROCKS_LOG_ERROR(ioptions_.logger,
601 "Unable to recognize the specified compaction style %d. "
602 "Column family %s will use kCompactionStyleLevel.\n",
603 ioptions_.compaction_style, GetName().c_str());
604 compaction_picker_.reset(
605 new LevelCompactionPicker(ioptions_, &internal_comparator_));
606 }
607
608 if (column_family_set_->NumberOfColumnFamilies() < 10) {
609 ROCKS_LOG_INFO(ioptions_.logger,
610 "--------------- Options for column family [%s]:\n",
611 name.c_str());
612 initial_cf_options_.Dump(ioptions_.logger);
613 } else {
614 ROCKS_LOG_INFO(ioptions_.logger, "\t(skipping printing options)\n");
615 }
616 }
617
618 RecalculateWriteStallConditions(mutable_cf_options_);
619 }
620
621 // DB mutex held
~ColumnFamilyData()622 ColumnFamilyData::~ColumnFamilyData() {
623 assert(refs_.load(std::memory_order_relaxed) == 0);
624 // remove from linked list
625 auto prev = prev_;
626 auto next = next_;
627 prev->next_ = next;
628 next->prev_ = prev;
629
630 if (!dropped_ && column_family_set_ != nullptr) {
631 // If it's dropped, it's already removed from column family set
632 // If column_family_set_ == nullptr, this is dummy CFD and not in
633 // ColumnFamilySet
634 column_family_set_->RemoveColumnFamily(this);
635 }
636
637 if (current_ != nullptr) {
638 current_->Unref();
639 }
640
641 // It would be wrong if this ColumnFamilyData is in flush_queue_ or
642 // compaction_queue_ and we destroyed it
643 assert(!queued_for_flush_);
644 assert(!queued_for_compaction_);
645 assert(super_version_ == nullptr);
646
647 if (dummy_versions_ != nullptr) {
648 // List must be empty
649 assert(dummy_versions_->Next() == dummy_versions_);
650 bool deleted __attribute__((__unused__));
651 deleted = dummy_versions_->Unref();
652 assert(deleted);
653 }
654
655 if (mem_ != nullptr) {
656 delete mem_->Unref();
657 }
658 autovector<MemTable*> to_delete;
659 imm_.current()->Unref(&to_delete);
660 for (MemTable* m : to_delete) {
661 delete m;
662 }
663
664 if (db_paths_registered_) {
665 // TODO(cc): considering using ioptions_.fs, currently some tests rely on
666 // EnvWrapper, that's the main reason why we use env here.
667 Status s = ioptions_.env->UnregisterDbPaths(GetDbPaths());
668 if (!s.ok()) {
669 ROCKS_LOG_ERROR(
670 ioptions_.logger,
671 "Failed to unregister data paths of column family (id: %d, name: %s)",
672 id_, name_.c_str());
673 }
674 }
675 }
676
UnrefAndTryDelete()677 bool ColumnFamilyData::UnrefAndTryDelete() {
678 int old_refs = refs_.fetch_sub(1);
679 assert(old_refs > 0);
680
681 if (old_refs == 1) {
682 assert(super_version_ == nullptr);
683 delete this;
684 return true;
685 }
686
687 if (old_refs == 2 && super_version_ != nullptr) {
688 // Only the super_version_ holds me
689 SuperVersion* sv = super_version_;
690 super_version_ = nullptr;
691
692 // Release SuperVersion references kept in ThreadLocalPtr.
693 local_sv_.reset();
694
695 if (sv->Unref()) {
696 // Note: sv will delete this ColumnFamilyData during Cleanup()
697 assert(sv->cfd == this);
698 sv->Cleanup();
699 delete sv;
700 return true;
701 }
702 }
703 return false;
704 }
705
// Marks this column family as dropped, releases its write controller token
// and removes it from the owning ColumnFamilySet.
void ColumnFamilyData::SetDropped() {
  // can't drop default CF
  assert(id_ != 0);
  dropped_ = true;
  write_controller_token_.reset();

  // remove from column_family_set
  column_family_set_->RemoveColumnFamily(this);
}
715
GetLatestCFOptions() const716 ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
717 return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
718 }
719
OldestLogToKeep()720 uint64_t ColumnFamilyData::OldestLogToKeep() {
721 auto current_log = GetLogNumber();
722
723 if (allow_2pc_) {
724 auto imm_prep_log = imm()->PrecomputeMinLogContainingPrepSection();
725 auto mem_prep_log = mem()->GetMinLogContainingPrepSection();
726
727 if (imm_prep_log > 0 && imm_prep_log < current_log) {
728 current_log = imm_prep_log;
729 }
730
731 if (mem_prep_log > 0 && mem_prep_log < current_log) {
732 current_log = mem_prep_log;
733 }
734 }
735
736 return current_log;
737 }
738
// Factors applied to the delayed write rate by SetupDelay().
// Multiplier used when compaction debt has not shrunk (slows writes down).
const double kIncSlowdownRatio = 0.8;
// Inverse of the above; used when compaction debt has shrunk (speeds writes
// up again).
const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
// Stronger slowdown multiplier used when SetupDelay() is asked to penalize a
// stop / near-stop condition.
const double kNearStopSlowdownRatio = 0.6;
// NOTE(review): not referenced in the visible part of this file; presumably
// applied when recovering from a delay - confirm against its users.
const double kDelayRecoverSlowdownRatio = 1.4;
743
744 namespace {
745 // If penalize_stop is true, we further reduce slowdown rate.
SetupDelay(WriteController * write_controller,uint64_t compaction_needed_bytes,uint64_t prev_compaction_need_bytes,bool penalize_stop,bool auto_comapctions_disabled)746 std::unique_ptr<WriteControllerToken> SetupDelay(
747 WriteController* write_controller, uint64_t compaction_needed_bytes,
748 uint64_t prev_compaction_need_bytes, bool penalize_stop,
749 bool auto_comapctions_disabled) {
750 const uint64_t kMinWriteRate = 16 * 1024u; // Minimum write rate 16KB/s.
751
752 uint64_t max_write_rate = write_controller->max_delayed_write_rate();
753 uint64_t write_rate = write_controller->delayed_write_rate();
754
755 if (auto_comapctions_disabled) {
756 // When auto compaction is disabled, always use the value user gave.
757 write_rate = max_write_rate;
758 } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
759 // If user gives rate less than kMinWriteRate, don't adjust it.
760 //
761 // If already delayed, need to adjust based on previous compaction debt.
762 // When there are two or more column families require delay, we always
763 // increase or reduce write rate based on information for one single
764 // column family. It is likely to be OK but we can improve if there is a
765 // problem.
766 // Ignore compaction_needed_bytes = 0 case because compaction_needed_bytes
767 // is only available in level-based compaction
768 //
769 // If the compaction debt stays the same as previously, we also further slow
770 // down. It usually means a mem table is full. It's mainly for the case
771 // where both of flush and compaction are much slower than the speed we
772 // insert to mem tables, so we need to actively slow down before we get
773 // feedback signal from compaction and flushes to avoid the full stop
774 // because of hitting the max write buffer number.
775 //
776 // If DB just falled into the stop condition, we need to further reduce
777 // the write rate to avoid the stop condition.
778 if (penalize_stop) {
779 // Penalize the near stop or stop condition by more aggressive slowdown.
780 // This is to provide the long term slowdown increase signal.
781 // The penalty is more than the reward of recovering to the normal
782 // condition.
783 write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
784 kNearStopSlowdownRatio);
785 if (write_rate < kMinWriteRate) {
786 write_rate = kMinWriteRate;
787 }
788 } else if (prev_compaction_need_bytes > 0 &&
789 prev_compaction_need_bytes <= compaction_needed_bytes) {
790 write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
791 kIncSlowdownRatio);
792 if (write_rate < kMinWriteRate) {
793 write_rate = kMinWriteRate;
794 }
795 } else if (prev_compaction_need_bytes > compaction_needed_bytes) {
796 // We are speeding up by ratio of kSlowdownRatio when we have paid
797 // compaction debt. But we'll never speed up to faster than the write rate
798 // given by users.
799 write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
800 kDecSlowdownRatio);
801 if (write_rate > max_write_rate) {
802 write_rate = max_write_rate;
803 }
804 }
805 }
806 return write_controller->GetDelayToken(write_rate);
807 }
808
// Returns the L0 file count at which compaction should be sped up.
//
// The result is the smaller of
//   * twice `level0_file_num_compaction_trigger`, and
//   * the point one quarter of the way from the compaction trigger to
//     `level0_slowdown_writes_trigger`,
// saturated at the largest `int`. A negative compaction trigger yields the
// largest `int`.
//
// Precondition: level0_file_num_compaction_trigger <=
// level0_slowdown_writes_trigger (SanitizeOptions() ensures it).
int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
                                    int level0_slowdown_writes_trigger) {
  // SanitizeOptions() ensures it.
  assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);

  // Single sentinel for "no threshold", matching the return type.
  constexpr int kMaxInt = std::numeric_limits<int>::max();

  if (level0_file_num_compaction_trigger < 0) {
    return kMaxInt;
  }

  // All arithmetic is done in 64 bits so no intermediate can overflow `int`.
  const int64_t trigger = level0_file_num_compaction_trigger;
  const int64_t slowdown = level0_slowdown_writes_trigger;

  const int64_t twice_level0_trigger = trigger * 2;
  const int64_t one_fourth_trigger_slowdown =
      trigger + ((slowdown - trigger) / 4);

  assert(twice_level0_trigger >= 0);
  assert(one_fourth_trigger_slowdown >= 0);

  // 1/4 of the way between L0 compaction trigger threshold and slowdown
  // condition.
  // Or twice as compaction trigger, if it is smaller.
  const int64_t res =
      std::min(twice_level0_trigger, one_fourth_trigger_slowdown);
  if (res >= kMaxInt) {
    return kMaxInt;
  }
  // res fits in int
  return static_cast<int>(res);
}
840 } // namespace
841
842 std::pair<WriteStallCondition, ColumnFamilyData::WriteStallCause>
GetWriteStallConditionAndCause(int num_unflushed_memtables,int num_l0_files,uint64_t num_compaction_needed_bytes,const MutableCFOptions & mutable_cf_options,const ImmutableCFOptions & immutable_cf_options)843 ColumnFamilyData::GetWriteStallConditionAndCause(
844 int num_unflushed_memtables, int num_l0_files,
845 uint64_t num_compaction_needed_bytes,
846 const MutableCFOptions& mutable_cf_options,
847 const ImmutableCFOptions& immutable_cf_options) {
848 if (num_unflushed_memtables >= mutable_cf_options.max_write_buffer_number) {
849 return {WriteStallCondition::kStopped, WriteStallCause::kMemtableLimit};
850 } else if (!mutable_cf_options.disable_auto_compactions &&
851 num_l0_files >= mutable_cf_options.level0_stop_writes_trigger) {
852 return {WriteStallCondition::kStopped, WriteStallCause::kL0FileCountLimit};
853 } else if (!mutable_cf_options.disable_auto_compactions &&
854 mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
855 num_compaction_needed_bytes >=
856 mutable_cf_options.hard_pending_compaction_bytes_limit) {
857 return {WriteStallCondition::kStopped,
858 WriteStallCause::kPendingCompactionBytes};
859 } else if (mutable_cf_options.max_write_buffer_number > 3 &&
860 num_unflushed_memtables >=
861 mutable_cf_options.max_write_buffer_number - 1 &&
862 num_unflushed_memtables - 1 >=
863 immutable_cf_options.min_write_buffer_number_to_merge) {
864 return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
865 } else if (!mutable_cf_options.disable_auto_compactions &&
866 mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
867 num_l0_files >=
868 mutable_cf_options.level0_slowdown_writes_trigger) {
869 return {WriteStallCondition::kDelayed, WriteStallCause::kL0FileCountLimit};
870 } else if (!mutable_cf_options.disable_auto_compactions &&
871 mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
872 num_compaction_needed_bytes >=
873 mutable_cf_options.soft_pending_compaction_bytes_limit) {
874 return {WriteStallCondition::kDelayed,
875 WriteStallCause::kPendingCompactionBytes};
876 }
877 return {WriteStallCondition::kNormal, WriteStallCause::kNone};
878 }
879
RecalculateWriteStallConditions(const MutableCFOptions & mutable_cf_options)880 WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
881 const MutableCFOptions& mutable_cf_options) {
882 auto write_stall_condition = WriteStallCondition::kNormal;
883 if (current_ != nullptr) {
884 auto* vstorage = current_->storage_info();
885 auto write_controller = column_family_set_->write_controller_;
886 uint64_t compaction_needed_bytes =
887 vstorage->estimated_compaction_needed_bytes();
888
889 auto write_stall_condition_and_cause = GetWriteStallConditionAndCause(
890 imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(),
891 vstorage->estimated_compaction_needed_bytes(), mutable_cf_options,
892 *ioptions());
893 write_stall_condition = write_stall_condition_and_cause.first;
894 auto write_stall_cause = write_stall_condition_and_cause.second;
895
896 bool was_stopped = write_controller->IsStopped();
897 bool needed_delay = write_controller->NeedsDelay();
898
899 if (write_stall_condition == WriteStallCondition::kStopped &&
900 write_stall_cause == WriteStallCause::kMemtableLimit) {
901 write_controller_token_ = write_controller->GetStopToken();
902 internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
903 ROCKS_LOG_WARN(
904 ioptions_.logger,
905 "[%s] Stopping writes because we have %d immutable memtables "
906 "(waiting for flush), max_write_buffer_number is set to %d",
907 name_.c_str(), imm()->NumNotFlushed(),
908 mutable_cf_options.max_write_buffer_number);
909 } else if (write_stall_condition == WriteStallCondition::kStopped &&
910 write_stall_cause == WriteStallCause::kL0FileCountLimit) {
911 write_controller_token_ = write_controller->GetStopToken();
912 internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
913 if (compaction_picker_->IsLevel0CompactionInProgress()) {
914 internal_stats_->AddCFStats(
915 InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_STOPS, 1);
916 }
917 ROCKS_LOG_WARN(ioptions_.logger,
918 "[%s] Stopping writes because we have %d level-0 files",
919 name_.c_str(), vstorage->l0_delay_trigger_count());
920 } else if (write_stall_condition == WriteStallCondition::kStopped &&
921 write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
922 write_controller_token_ = write_controller->GetStopToken();
923 internal_stats_->AddCFStats(
924 InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
925 ROCKS_LOG_WARN(
926 ioptions_.logger,
927 "[%s] Stopping writes because of estimated pending compaction "
928 "bytes %" PRIu64,
929 name_.c_str(), compaction_needed_bytes);
930 } else if (write_stall_condition == WriteStallCondition::kDelayed &&
931 write_stall_cause == WriteStallCause::kMemtableLimit) {
932 write_controller_token_ =
933 SetupDelay(write_controller, compaction_needed_bytes,
934 prev_compaction_needed_bytes_, was_stopped,
935 mutable_cf_options.disable_auto_compactions);
936 internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1);
937 ROCKS_LOG_WARN(
938 ioptions_.logger,
939 "[%s] Stalling writes because we have %d immutable memtables "
940 "(waiting for flush), max_write_buffer_number is set to %d "
941 "rate %" PRIu64,
942 name_.c_str(), imm()->NumNotFlushed(),
943 mutable_cf_options.max_write_buffer_number,
944 write_controller->delayed_write_rate());
945 } else if (write_stall_condition == WriteStallCondition::kDelayed &&
946 write_stall_cause == WriteStallCause::kL0FileCountLimit) {
947 // L0 is the last two files from stopping.
948 bool near_stop = vstorage->l0_delay_trigger_count() >=
949 mutable_cf_options.level0_stop_writes_trigger - 2;
950 write_controller_token_ =
951 SetupDelay(write_controller, compaction_needed_bytes,
952 prev_compaction_needed_bytes_, was_stopped || near_stop,
953 mutable_cf_options.disable_auto_compactions);
954 internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS,
955 1);
956 if (compaction_picker_->IsLevel0CompactionInProgress()) {
957 internal_stats_->AddCFStats(
958 InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1);
959 }
960 ROCKS_LOG_WARN(ioptions_.logger,
961 "[%s] Stalling writes because we have %d level-0 files "
962 "rate %" PRIu64,
963 name_.c_str(), vstorage->l0_delay_trigger_count(),
964 write_controller->delayed_write_rate());
965 } else if (write_stall_condition == WriteStallCondition::kDelayed &&
966 write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
967 // If the distance to hard limit is less than 1/4 of the gap between soft
968 // and
969 // hard bytes limit, we think it is near stop and speed up the slowdown.
970 bool near_stop =
971 mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
972 (compaction_needed_bytes -
973 mutable_cf_options.soft_pending_compaction_bytes_limit) >
974 3 * (mutable_cf_options.hard_pending_compaction_bytes_limit -
975 mutable_cf_options.soft_pending_compaction_bytes_limit) /
976 4;
977
978 write_controller_token_ =
979 SetupDelay(write_controller, compaction_needed_bytes,
980 prev_compaction_needed_bytes_, was_stopped || near_stop,
981 mutable_cf_options.disable_auto_compactions);
982 internal_stats_->AddCFStats(
983 InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1);
984 ROCKS_LOG_WARN(
985 ioptions_.logger,
986 "[%s] Stalling writes because of estimated pending compaction "
987 "bytes %" PRIu64 " rate %" PRIu64,
988 name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
989 write_controller->delayed_write_rate());
990 } else {
991 assert(write_stall_condition == WriteStallCondition::kNormal);
992 if (vstorage->l0_delay_trigger_count() >=
993 GetL0ThresholdSpeedupCompaction(
994 mutable_cf_options.level0_file_num_compaction_trigger,
995 mutable_cf_options.level0_slowdown_writes_trigger)) {
996 write_controller_token_ =
997 write_controller->GetCompactionPressureToken();
998 ROCKS_LOG_INFO(
999 ioptions_.logger,
1000 "[%s] Increasing compaction threads because we have %d level-0 "
1001 "files ",
1002 name_.c_str(), vstorage->l0_delay_trigger_count());
1003 } else if (vstorage->estimated_compaction_needed_bytes() >=
1004 mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
1005 // Increase compaction threads if bytes needed for compaction exceeds
1006 // 1/4 of threshold for slowing down.
1007 // If soft pending compaction byte limit is not set, always speed up
1008 // compaction.
1009 write_controller_token_ =
1010 write_controller->GetCompactionPressureToken();
1011 if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
1012 ROCKS_LOG_INFO(
1013 ioptions_.logger,
1014 "[%s] Increasing compaction threads because of estimated pending "
1015 "compaction "
1016 "bytes %" PRIu64,
1017 name_.c_str(), vstorage->estimated_compaction_needed_bytes());
1018 }
1019 } else {
1020 write_controller_token_.reset();
1021 }
1022 // If the DB recovers from delay conditions, we reward with reducing
1023 // double the slowdown ratio. This is to balance the long term slowdown
1024 // increase signal.
1025 if (needed_delay) {
1026 uint64_t write_rate = write_controller->delayed_write_rate();
1027 write_controller->set_delayed_write_rate(static_cast<uint64_t>(
1028 static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
1029 // Set the low pri limit to be 1/4 the delayed write rate.
1030 // Note we don't reset this value even after delay condition is relased.
1031 // Low-pri rate will continue to apply if there is a compaction
1032 // pressure.
1033 write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate /
1034 4);
1035 }
1036 }
1037 prev_compaction_needed_bytes_ = compaction_needed_bytes;
1038 }
1039 return write_stall_condition;
1040 }
1041
soptions() const1042 const FileOptions* ColumnFamilyData::soptions() const {
1043 return &(column_family_set_->file_options_);
1044 }
1045
SetCurrent(Version * current_version)1046 void ColumnFamilyData::SetCurrent(Version* current_version) {
1047 current_ = current_version;
1048 }
1049
GetNumLiveVersions() const1050 uint64_t ColumnFamilyData::GetNumLiveVersions() const {
1051 return VersionSet::GetNumLiveVersions(dummy_versions_);
1052 }
1053
GetTotalSstFilesSize() const1054 uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
1055 return VersionSet::GetTotalSstFilesSize(dummy_versions_);
1056 }
1057
GetTotalBlobFileSize() const1058 uint64_t ColumnFamilyData::GetTotalBlobFileSize() const {
1059 return VersionSet::GetTotalBlobFileSize(dummy_versions_);
1060 }
1061
GetLiveSstFilesSize() const1062 uint64_t ColumnFamilyData::GetLiveSstFilesSize() const {
1063 return current_->GetSstFilesSize();
1064 }
1065
ConstructNewMemtable(const MutableCFOptions & mutable_cf_options,SequenceNumber earliest_seq)1066 MemTable* ColumnFamilyData::ConstructNewMemtable(
1067 const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
1068 return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
1069 write_buffer_manager_, earliest_seq, id_);
1070 }
1071
CreateNewMemtable(const MutableCFOptions & mutable_cf_options,SequenceNumber earliest_seq)1072 void ColumnFamilyData::CreateNewMemtable(
1073 const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
1074 if (mem_ != nullptr) {
1075 delete mem_->Unref();
1076 }
1077 SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
1078 mem_->Ref();
1079 }
1080
NeedsCompaction() const1081 bool ColumnFamilyData::NeedsCompaction() const {
1082 return !mutable_cf_options_.disable_auto_compactions &&
1083 compaction_picker_->NeedsCompaction(current_->storage_info());
1084 }
1085
PickCompaction(const MutableCFOptions & mutable_options,const MutableDBOptions & mutable_db_options,LogBuffer * log_buffer)1086 Compaction* ColumnFamilyData::PickCompaction(
1087 const MutableCFOptions& mutable_options,
1088 const MutableDBOptions& mutable_db_options, LogBuffer* log_buffer) {
1089 SequenceNumber earliest_mem_seqno =
1090 std::min(mem_->GetEarliestSequenceNumber(),
1091 imm_.current()->GetEarliestSequenceNumber(false));
1092 auto* result = compaction_picker_->PickCompaction(
1093 GetName(), mutable_options, mutable_db_options, current_->storage_info(),
1094 log_buffer, earliest_mem_seqno);
1095 if (result != nullptr) {
1096 result->SetInputVersion(current_);
1097 }
1098 return result;
1099 }
1100
RangeOverlapWithCompaction(const Slice & smallest_user_key,const Slice & largest_user_key,int level) const1101 bool ColumnFamilyData::RangeOverlapWithCompaction(
1102 const Slice& smallest_user_key, const Slice& largest_user_key,
1103 int level) const {
1104 return compaction_picker_->RangeOverlapWithCompaction(
1105 smallest_user_key, largest_user_key, level);
1106 }
1107
RangesOverlapWithMemtables(const autovector<Range> & ranges,SuperVersion * super_version,bool allow_data_in_errors,bool * overlap)1108 Status ColumnFamilyData::RangesOverlapWithMemtables(
1109 const autovector<Range>& ranges, SuperVersion* super_version,
1110 bool allow_data_in_errors, bool* overlap) {
1111 assert(overlap != nullptr);
1112 *overlap = false;
1113 // Create an InternalIterator over all unflushed memtables
1114 Arena arena;
1115 ReadOptions read_opts;
1116 read_opts.total_order_seek = true;
1117 MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena);
1118 merge_iter_builder.AddIterator(
1119 super_version->mem->NewIterator(read_opts, &arena));
1120 super_version->imm->AddIterators(read_opts, &merge_iter_builder);
1121 ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());
1122
1123 auto read_seq = super_version->current->version_set()->LastSequence();
1124 ReadRangeDelAggregator range_del_agg(&internal_comparator_, read_seq);
1125 auto* active_range_del_iter =
1126 super_version->mem->NewRangeTombstoneIterator(read_opts, read_seq);
1127 range_del_agg.AddTombstones(
1128 std::unique_ptr<FragmentedRangeTombstoneIterator>(active_range_del_iter));
1129 Status status;
1130 status = super_version->imm->AddRangeTombstoneIterators(
1131 read_opts, nullptr /* arena */, &range_del_agg);
1132 // AddRangeTombstoneIterators always return Status::OK.
1133 assert(status.ok());
1134
1135 for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) {
1136 auto* vstorage = super_version->current->storage_info();
1137 auto* ucmp = vstorage->InternalComparator()->user_comparator();
1138 InternalKey range_start(ranges[i].start, kMaxSequenceNumber,
1139 kValueTypeForSeek);
1140 memtable_iter->Seek(range_start.Encode());
1141 status = memtable_iter->status();
1142 ParsedInternalKey seek_result;
1143
1144 if (status.ok() && memtable_iter->Valid()) {
1145 status = ParseInternalKey(memtable_iter->key(), &seek_result,
1146 allow_data_in_errors);
1147 }
1148
1149 if (status.ok()) {
1150 if (memtable_iter->Valid() &&
1151 ucmp->Compare(seek_result.user_key, ranges[i].limit) <= 0) {
1152 *overlap = true;
1153 } else if (range_del_agg.IsRangeOverlapped(ranges[i].start,
1154 ranges[i].limit)) {
1155 *overlap = true;
1156 }
1157 }
1158 }
1159 return status;
1160 }
1161
1162 const int ColumnFamilyData::kCompactAllLevels = -1;
1163 const int ColumnFamilyData::kCompactToBaseLevel = -2;
1164
CompactRange(const MutableCFOptions & mutable_cf_options,const MutableDBOptions & mutable_db_options,int input_level,int output_level,const CompactRangeOptions & compact_range_options,const InternalKey * begin,const InternalKey * end,InternalKey ** compaction_end,bool * conflict,uint64_t max_file_num_to_ignore)1165 Compaction* ColumnFamilyData::CompactRange(
1166 const MutableCFOptions& mutable_cf_options,
1167 const MutableDBOptions& mutable_db_options, int input_level,
1168 int output_level, const CompactRangeOptions& compact_range_options,
1169 const InternalKey* begin, const InternalKey* end,
1170 InternalKey** compaction_end, bool* conflict,
1171 uint64_t max_file_num_to_ignore) {
1172 auto* result = compaction_picker_->CompactRange(
1173 GetName(), mutable_cf_options, mutable_db_options,
1174 current_->storage_info(), input_level, output_level,
1175 compact_range_options, begin, end, compaction_end, conflict,
1176 max_file_num_to_ignore);
1177 if (result != nullptr) {
1178 result->SetInputVersion(current_);
1179 }
1180 return result;
1181 }
1182
GetReferencedSuperVersion(DBImpl * db)1183 SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(DBImpl* db) {
1184 SuperVersion* sv = GetThreadLocalSuperVersion(db);
1185 sv->Ref();
1186 if (!ReturnThreadLocalSuperVersion(sv)) {
1187 // This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
1188 // when the thread-local pointer was populated. So, the Ref() earlier in
1189 // this function still prevents the returned SuperVersion* from being
1190 // deleted out from under the caller.
1191 sv->Unref();
1192 }
1193 return sv;
1194 }
1195
GetThreadLocalSuperVersion(DBImpl * db)1196 SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
1197 // The SuperVersion is cached in thread local storage to avoid acquiring
1198 // mutex when SuperVersion does not change since the last use. When a new
1199 // SuperVersion is installed, the compaction or flush thread cleans up
1200 // cached SuperVersion in all existing thread local storage. To avoid
1201 // acquiring mutex for this operation, we use atomic Swap() on the thread
1202 // local pointer to guarantee exclusive access. If the thread local pointer
1203 // is being used while a new SuperVersion is installed, the cached
1204 // SuperVersion can become stale. In that case, the background thread would
1205 // have swapped in kSVObsolete. We re-check the value at when returning
1206 // SuperVersion back to thread local, with an atomic compare and swap.
1207 // The superversion will need to be released if detected to be stale.
1208 void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
1209 // Invariant:
1210 // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
1211 // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
1212 // should only keep kSVInUse before ReturnThreadLocalSuperVersion call
1213 // (if no Scrape happens).
1214 assert(ptr != SuperVersion::kSVInUse);
1215 SuperVersion* sv = static_cast<SuperVersion*>(ptr);
1216 if (sv == SuperVersion::kSVObsolete ||
1217 sv->version_number != super_version_number_.load()) {
1218 RecordTick(ioptions_.stats, NUMBER_SUPERVERSION_ACQUIRES);
1219 SuperVersion* sv_to_delete = nullptr;
1220
1221 if (sv && sv->Unref()) {
1222 RecordTick(ioptions_.stats, NUMBER_SUPERVERSION_CLEANUPS);
1223 db->mutex()->Lock();
1224 // NOTE: underlying resources held by superversion (sst files) might
1225 // not be released until the next background job.
1226 sv->Cleanup();
1227 if (db->immutable_db_options().avoid_unnecessary_blocking_io) {
1228 db->AddSuperVersionsToFreeQueue(sv);
1229 db->SchedulePurge();
1230 } else {
1231 sv_to_delete = sv;
1232 }
1233 } else {
1234 db->mutex()->Lock();
1235 }
1236 sv = super_version_->Ref();
1237 db->mutex()->Unlock();
1238
1239 delete sv_to_delete;
1240 }
1241 assert(sv != nullptr);
1242 return sv;
1243 }
1244
ReturnThreadLocalSuperVersion(SuperVersion * sv)1245 bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
1246 assert(sv != nullptr);
1247 // Put the SuperVersion back
1248 void* expected = SuperVersion::kSVInUse;
1249 if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
1250 // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
1251 // storage has not been altered and no Scrape has happened. The
1252 // SuperVersion is still current.
1253 return true;
1254 } else {
1255 // ThreadLocal scrape happened in the process of this GetImpl call (after
1256 // thread local Swap() at the beginning and before CompareAndSwap()).
1257 // This means the SuperVersion it holds is obsolete.
1258 assert(expected == SuperVersion::kSVObsolete);
1259 }
1260 return false;
1261 }
1262
InstallSuperVersion(SuperVersionContext * sv_context,InstrumentedMutex * db_mutex)1263 void ColumnFamilyData::InstallSuperVersion(
1264 SuperVersionContext* sv_context, InstrumentedMutex* db_mutex) {
1265 db_mutex->AssertHeld();
1266 return InstallSuperVersion(sv_context, mutable_cf_options_);
1267 }
1268
InstallSuperVersion(SuperVersionContext * sv_context,const MutableCFOptions & mutable_cf_options)1269 void ColumnFamilyData::InstallSuperVersion(
1270 SuperVersionContext* sv_context,
1271 const MutableCFOptions& mutable_cf_options) {
1272 SuperVersion* new_superversion = sv_context->new_superversion.release();
1273 new_superversion->mutable_cf_options = mutable_cf_options;
1274 new_superversion->Init(this, mem_, imm_.current(), current_);
1275 SuperVersion* old_superversion = super_version_;
1276 super_version_ = new_superversion;
1277 ++super_version_number_;
1278 super_version_->version_number = super_version_number_;
1279 super_version_->write_stall_condition =
1280 RecalculateWriteStallConditions(mutable_cf_options);
1281
1282 if (old_superversion != nullptr) {
1283 // Reset SuperVersions cached in thread local storage.
1284 // This should be done before old_superversion->Unref(). That's to ensure
1285 // that local_sv_ never holds the last reference to SuperVersion, since
1286 // it has no means to safely do SuperVersion cleanup.
1287 ResetThreadLocalSuperVersions();
1288
1289 if (old_superversion->mutable_cf_options.write_buffer_size !=
1290 mutable_cf_options.write_buffer_size) {
1291 mem_->UpdateWriteBufferSize(mutable_cf_options.write_buffer_size);
1292 }
1293 if (old_superversion->write_stall_condition !=
1294 new_superversion->write_stall_condition) {
1295 sv_context->PushWriteStallNotification(
1296 old_superversion->write_stall_condition,
1297 new_superversion->write_stall_condition, GetName(), ioptions());
1298 }
1299 if (old_superversion->Unref()) {
1300 old_superversion->Cleanup();
1301 sv_context->superversions_to_free.push_back(old_superversion);
1302 }
1303 }
1304 }
1305
ResetThreadLocalSuperVersions()1306 void ColumnFamilyData::ResetThreadLocalSuperVersions() {
1307 autovector<void*> sv_ptrs;
1308 local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
1309 for (auto ptr : sv_ptrs) {
1310 assert(ptr);
1311 if (ptr == SuperVersion::kSVInUse) {
1312 continue;
1313 }
1314 auto sv = static_cast<SuperVersion*>(ptr);
1315 bool was_last_ref __attribute__((__unused__));
1316 was_last_ref = sv->Unref();
1317 // sv couldn't have been the last reference because
1318 // ResetThreadLocalSuperVersions() is called before
1319 // unref'ing super_version_.
1320 assert(!was_last_ref);
1321 }
1322 }
1323
ValidateOptions(const DBOptions & db_options,const ColumnFamilyOptions & cf_options)1324 Status ColumnFamilyData::ValidateOptions(
1325 const DBOptions& db_options, const ColumnFamilyOptions& cf_options) {
1326 Status s;
1327 s = CheckCompressionSupported(cf_options);
1328 if (s.ok() && db_options.allow_concurrent_memtable_write) {
1329 s = CheckConcurrentWritesSupported(cf_options);
1330 }
1331 if (s.ok() && db_options.unordered_write &&
1332 cf_options.max_successive_merges != 0) {
1333 s = Status::InvalidArgument(
1334 "max_successive_merges > 0 is incompatible with unordered_write");
1335 }
1336 if (s.ok()) {
1337 s = CheckCFPathsSupported(db_options, cf_options);
1338 }
1339 if (!s.ok()) {
1340 return s;
1341 }
1342
1343 if (cf_options.ttl > 0 && cf_options.ttl != kDefaultTtl) {
1344 if (!cf_options.table_factory->IsInstanceOf(
1345 TableFactory::kBlockBasedTableName())) {
1346 return Status::NotSupported(
1347 "TTL is only supported in Block-Based Table format. ");
1348 }
1349 }
1350
1351 if (cf_options.periodic_compaction_seconds > 0 &&
1352 cf_options.periodic_compaction_seconds != kDefaultPeriodicCompSecs) {
1353 if (!cf_options.table_factory->IsInstanceOf(
1354 TableFactory::kBlockBasedTableName())) {
1355 return Status::NotSupported(
1356 "Periodic Compaction is only supported in "
1357 "Block-Based Table format. ");
1358 }
1359 }
1360
1361 if (cf_options.enable_blob_garbage_collection) {
1362 if (cf_options.blob_garbage_collection_age_cutoff < 0.0 ||
1363 cf_options.blob_garbage_collection_age_cutoff > 1.0) {
1364 return Status::InvalidArgument(
1365 "The age cutoff for blob garbage collection should be in the range "
1366 "[0.0, 1.0].");
1367 }
1368 if (cf_options.blob_garbage_collection_force_threshold < 0.0 ||
1369 cf_options.blob_garbage_collection_force_threshold > 1.0) {
1370 return Status::InvalidArgument(
1371 "The garbage ratio threshold for forcing blob garbage collection "
1372 "should be in the range [0.0, 1.0].");
1373 }
1374 }
1375
1376 if (cf_options.compaction_style == kCompactionStyleFIFO &&
1377 db_options.max_open_files != -1 && cf_options.ttl > 0) {
1378 return Status::NotSupported(
1379 "FIFO compaction only supported with max_open_files = -1.");
1380 }
1381
1382 return s;
1383 }
1384
1385 #ifndef ROCKSDB_LITE
SetOptions(const DBOptions & db_opts,const std::unordered_map<std::string,std::string> & options_map)1386 Status ColumnFamilyData::SetOptions(
1387 const DBOptions& db_opts,
1388 const std::unordered_map<std::string, std::string>& options_map) {
1389 ColumnFamilyOptions cf_opts =
1390 BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
1391 ConfigOptions config_opts;
1392 config_opts.mutable_options_only = true;
1393 Status s = GetColumnFamilyOptionsFromMap(config_opts, cf_opts, options_map,
1394 &cf_opts);
1395 if (s.ok()) {
1396 s = ValidateOptions(db_opts, cf_opts);
1397 }
1398 if (s.ok()) {
1399 mutable_cf_options_ = MutableCFOptions(cf_opts);
1400 mutable_cf_options_.RefreshDerivedOptions(ioptions_);
1401 }
1402 return s;
1403 }
1404 #endif // ROCKSDB_LITE
1405
1406 // REQUIRES: DB mutex held
CalculateSSTWriteHint(int level)1407 Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) {
1408 if (initial_cf_options_.compaction_style != kCompactionStyleLevel) {
1409 return Env::WLTH_NOT_SET;
1410 }
1411 if (level == 0) {
1412 return Env::WLTH_MEDIUM;
1413 }
1414 int base_level = current_->storage_info()->base_level();
1415
1416 // L1: medium, L2: long, ...
1417 if (level - base_level >= 2) {
1418 return Env::WLTH_EXTREME;
1419 } else if (level < base_level) {
1420 // There is no restriction which prevents level passed in to be smaller
1421 // than base_level.
1422 return Env::WLTH_MEDIUM;
1423 }
1424 return static_cast<Env::WriteLifeTimeHint>(level - base_level +
1425 static_cast<int>(Env::WLTH_MEDIUM));
1426 }
1427
AddDirectories(std::map<std::string,std::shared_ptr<FSDirectory>> * created_dirs)1428 Status ColumnFamilyData::AddDirectories(
1429 std::map<std::string, std::shared_ptr<FSDirectory>>* created_dirs) {
1430 Status s;
1431 assert(created_dirs != nullptr);
1432 assert(data_dirs_.empty());
1433 for (auto& p : ioptions_.cf_paths) {
1434 auto existing_dir = created_dirs->find(p.path);
1435
1436 if (existing_dir == created_dirs->end()) {
1437 std::unique_ptr<FSDirectory> path_directory;
1438 s = DBImpl::CreateAndNewDirectory(ioptions_.fs.get(), p.path,
1439 &path_directory);
1440 if (!s.ok()) {
1441 return s;
1442 }
1443 assert(path_directory != nullptr);
1444 data_dirs_.emplace_back(path_directory.release());
1445 (*created_dirs)[p.path] = data_dirs_.back();
1446 } else {
1447 data_dirs_.emplace_back(existing_dir->second);
1448 }
1449 }
1450 assert(data_dirs_.size() == ioptions_.cf_paths.size());
1451 return s;
1452 }
1453
GetDataDir(size_t path_id) const1454 FSDirectory* ColumnFamilyData::GetDataDir(size_t path_id) const {
1455 if (data_dirs_.empty()) {
1456 return nullptr;
1457 }
1458
1459 assert(path_id < data_dirs_.size());
1460 return data_dirs_[path_id].get();
1461 }
1462
ColumnFamilySet(const std::string & dbname,const ImmutableDBOptions * db_options,const FileOptions & file_options,Cache * table_cache,WriteBufferManager * _write_buffer_manager,WriteController * _write_controller,BlockCacheTracer * const block_cache_tracer,const std::shared_ptr<IOTracer> & io_tracer,const std::string & db_session_id)1463 ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
1464 const ImmutableDBOptions* db_options,
1465 const FileOptions& file_options,
1466 Cache* table_cache,
1467 WriteBufferManager* _write_buffer_manager,
1468 WriteController* _write_controller,
1469 BlockCacheTracer* const block_cache_tracer,
1470 const std::shared_ptr<IOTracer>& io_tracer,
1471 const std::string& db_session_id)
1472 : max_column_family_(0),
1473 file_options_(file_options),
1474 dummy_cfd_(new ColumnFamilyData(
1475 ColumnFamilyData::kDummyColumnFamilyDataId, "", nullptr, nullptr,
1476 nullptr, ColumnFamilyOptions(), *db_options, &file_options_, nullptr,
1477 block_cache_tracer, io_tracer, db_session_id)),
1478 default_cfd_cache_(nullptr),
1479 db_name_(dbname),
1480 db_options_(db_options),
1481 table_cache_(table_cache),
1482 write_buffer_manager_(_write_buffer_manager),
1483 write_controller_(_write_controller),
1484 block_cache_tracer_(block_cache_tracer),
1485 io_tracer_(io_tracer),
1486 db_session_id_(db_session_id) {
1487 // initialize linked list
1488 dummy_cfd_->prev_ = dummy_cfd_;
1489 dummy_cfd_->next_ = dummy_cfd_;
1490 }
1491
~ColumnFamilySet()1492 ColumnFamilySet::~ColumnFamilySet() {
1493 while (column_family_data_.size() > 0) {
1494 // cfd destructor will delete itself from column_family_data_
1495 auto cfd = column_family_data_.begin()->second;
1496 bool last_ref __attribute__((__unused__));
1497 last_ref = cfd->UnrefAndTryDelete();
1498 assert(last_ref);
1499 }
1500 bool dummy_last_ref __attribute__((__unused__));
1501 dummy_last_ref = dummy_cfd_->UnrefAndTryDelete();
1502 assert(dummy_last_ref);
1503 }
1504
GetDefault() const1505 ColumnFamilyData* ColumnFamilySet::GetDefault() const {
1506 assert(default_cfd_cache_ != nullptr);
1507 return default_cfd_cache_;
1508 }
1509
GetColumnFamily(uint32_t id) const1510 ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
1511 auto cfd_iter = column_family_data_.find(id);
1512 if (cfd_iter != column_family_data_.end()) {
1513 return cfd_iter->second;
1514 } else {
1515 return nullptr;
1516 }
1517 }
1518
GetColumnFamily(const std::string & name) const1519 ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
1520 const {
1521 auto cfd_iter = column_families_.find(name);
1522 if (cfd_iter != column_families_.end()) {
1523 auto cfd = GetColumnFamily(cfd_iter->second);
1524 assert(cfd != nullptr);
1525 return cfd;
1526 } else {
1527 return nullptr;
1528 }
1529 }
1530
GetNextColumnFamilyID()1531 uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
1532 return ++max_column_family_;
1533 }
1534
GetMaxColumnFamily()1535 uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; }
1536
UpdateMaxColumnFamily(uint32_t new_max_column_family)1537 void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
1538 max_column_family_ = std::max(new_max_column_family, max_column_family_);
1539 }
1540
NumberOfColumnFamilies() const1541 size_t ColumnFamilySet::NumberOfColumnFamilies() const {
1542 return column_families_.size();
1543 }
1544
1545 // under a DB mutex AND write thread
CreateColumnFamily(const std::string & name,uint32_t id,Version * dummy_versions,const ColumnFamilyOptions & options)1546 ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
1547 const std::string& name, uint32_t id, Version* dummy_versions,
1548 const ColumnFamilyOptions& options) {
1549 assert(column_families_.find(name) == column_families_.end());
1550 ColumnFamilyData* new_cfd = new ColumnFamilyData(
1551 id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
1552 *db_options_, &file_options_, this, block_cache_tracer_, io_tracer_,
1553 db_session_id_);
1554 column_families_.insert({name, id});
1555 column_family_data_.insert({id, new_cfd});
1556 max_column_family_ = std::max(max_column_family_, id);
1557 // add to linked list
1558 new_cfd->next_ = dummy_cfd_;
1559 auto prev = dummy_cfd_->prev_;
1560 new_cfd->prev_ = prev;
1561 prev->next_ = new_cfd;
1562 dummy_cfd_->prev_ = new_cfd;
1563 if (id == 0) {
1564 default_cfd_cache_ = new_cfd;
1565 }
1566 return new_cfd;
1567 }
1568
1569 // REQUIRES: DB mutex held
FreeDeadColumnFamilies()1570 void ColumnFamilySet::FreeDeadColumnFamilies() {
1571 autovector<ColumnFamilyData*> to_delete;
1572 for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
1573 if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
1574 to_delete.push_back(cfd);
1575 }
1576 }
1577 for (auto cfd : to_delete) {
1578 // this is very rare, so it's not a problem that we do it under a mutex
1579 delete cfd;
1580 }
1581 }
1582
1583 // under a DB mutex AND from a write thread
RemoveColumnFamily(ColumnFamilyData * cfd)1584 void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
1585 auto cfd_iter = column_family_data_.find(cfd->GetID());
1586 assert(cfd_iter != column_family_data_.end());
1587 column_family_data_.erase(cfd_iter);
1588 column_families_.erase(cfd->GetName());
1589 }
1590
1591 // under a DB mutex OR from a write thread
Seek(uint32_t column_family_id)1592 bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
1593 if (column_family_id == 0) {
1594 // optimization for common case
1595 current_ = column_family_set_->GetDefault();
1596 } else {
1597 current_ = column_family_set_->GetColumnFamily(column_family_id);
1598 }
1599 handle_.SetCFD(current_);
1600 return current_ != nullptr;
1601 }
1602
GetLogNumber() const1603 uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
1604 assert(current_ != nullptr);
1605 return current_->GetLogNumber();
1606 }
1607
GetMemTable() const1608 MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
1609 assert(current_ != nullptr);
1610 return current_->mem();
1611 }
1612
GetColumnFamilyHandle()1613 ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
1614 assert(current_ != nullptr);
1615 return &handle_;
1616 }
1617
GetColumnFamilyID(ColumnFamilyHandle * column_family)1618 uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
1619 uint32_t column_family_id = 0;
1620 if (column_family != nullptr) {
1621 auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
1622 column_family_id = cfh->GetID();
1623 }
1624 return column_family_id;
1625 }
1626
GetColumnFamilyUserComparator(ColumnFamilyHandle * column_family)1627 const Comparator* GetColumnFamilyUserComparator(
1628 ColumnFamilyHandle* column_family) {
1629 if (column_family != nullptr) {
1630 return column_family->GetComparator();
1631 }
1632 return nullptr;
1633 }
1634
1635 } // namespace ROCKSDB_NAMESPACE
1636