// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "rocksdb/write_buffer_manager.h"
#include <mutex>
#include "util/coding.h"

namespace rocksdb {
#ifndef ROCKSDB_LITE
namespace {
const size_t kSizeDummyEntry = 256 * 1024;
// The key will be longer than keys for blocks in SST files so they won't
// conflict.
const size_t kCacheKeyPrefix = kMaxVarint64Length * 4 + 1;
}  // namespace

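// CacheRep charges the memory used by memtables against the block cache by
// inserting fixed-size dummy entries (kSizeDummyEntry bytes each), so that
// memtable memory and block cache memory come out of a single budget.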
struct WriteBufferManager::CacheRep {
  std::shared_ptr<Cache> cache_;
  std::mutex cache_mutex_;
  std::atomic<size_t> cache_allocated_size_;
  // The non-prefix part will be updated according to the ID to use.
  char cache_key_[kCacheKeyPrefix + kMaxVarint64Length];
  uint64_t next_cache_key_id_ = 0;
  std::vector<Cache::Handle*> dummy_handles_;

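  // The key prefix is seeded with the address of this CacheRep, so keys from
  // different write buffer managers cannot collide with each other.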
  explicit CacheRep(std::shared_ptr<Cache> cache)
      : cache_(cache), cache_allocated_size_(0) {
    memset(cache_key_, 0, kCacheKeyPrefix);
    size_t pointer_size = sizeof(const void*);
    assert(pointer_size <= kCacheKeyPrefix);
    memcpy(cache_key_, static_cast<const void*>(this), pointer_size);
  }

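  // Produce a unique key for the next dummy entry by appending an
  // incrementing id, encoded as a varint, after the fixed prefix.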
  Slice GetNextCacheKey() {
    memset(cache_key_ + kCacheKeyPrefix, 0, kMaxVarint64Length);
    char* end =
        EncodeVarint64(cache_key_ + kCacheKeyPrefix, next_cache_key_id_++);
    return Slice(cache_key_, static_cast<size_t>(end - cache_key_));
  }
};
#else
struct WriteBufferManager::CacheRep {};
#endif  // ROCKSDB_LITE

WriteBufferManager::WriteBufferManager(size_t _buffer_size,
                                       std::shared_ptr<Cache> cache)
    : buffer_size_(_buffer_size),
      mutable_limit_(buffer_size_ * 7 / 8),
      memory_used_(0),
      memory_active_(0),
      cache_rep_(nullptr) {
#ifndef ROCKSDB_LITE
  if (cache) {
    // Construct the cache key using the pointer to this.
    cache_rep_.reset(new CacheRep(cache));
  }
#else
  (void)cache;
#endif  // ROCKSDB_LITE
}

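// Illustrative sketch (not part of this file): a single WriteBufferManager is
// typically shared by all DBs whose memtable memory should be budgeted
// together, e.g.
//   auto cache = NewLRUCache(1ull << 30);
//   auto wbm = std::make_shared<WriteBufferManager>(512 << 20, cache);
//   db_options.write_buffer_manager = wbm;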
WriteBufferManager::~WriteBufferManager() {
#ifndef ROCKSDB_LITE
  if (cache_rep_) {
    // Release the dummy entries with force_erase=true so that their charge is
    // returned to the block cache.
    for (auto* handle : cache_rep_->dummy_handles_) {
      cache_rep_->cache_->Release(handle, true);
    }
  }
#endif  // ROCKSDB_LITE
}

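// Charge `mem` bytes of memtable memory and, if total usage now exceeds what
// the existing dummy entries cover, insert more dummy entries into the block
// cache until it is covered.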
// Should only be called from write thread
void WriteBufferManager::ReserveMemWithCache(size_t mem) {
#ifndef ROCKSDB_LITE
  assert(cache_rep_ != nullptr);
  // Use a mutex to protect various data structures. Can be optimized to a
  // lock-free solution if it turns out to be a performance bottleneck.
  std::lock_guard<std::mutex> lock(cache_rep_->cache_mutex_);

  size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) + mem;
  memory_used_.store(new_mem_used, std::memory_order_relaxed);
  while (new_mem_used > cache_rep_->cache_allocated_size_) {
    // Expand size by at least 256KB: add a dummy record to the cache.
    Cache::Handle* handle;
    cache_rep_->cache_->Insert(cache_rep_->GetNextCacheKey(), nullptr,
                               kSizeDummyEntry, nullptr, &handle);
    cache_rep_->dummy_handles_.push_back(handle);
    cache_rep_->cache_allocated_size_ += kSizeDummyEntry;
  }
#else
  (void)mem;
#endif  // ROCKSDB_LITE
}

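// Release `mem` bytes of memtable memory and, if usage has fallen well below
// what the dummy entries cover, give one dummy entry back to the block cache.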
void WriteBufferManager::FreeMemWithCache(size_t mem) {
#ifndef ROCKSDB_LITE
  assert(cache_rep_ != nullptr);
  // Use a mutex to protect various data structures. Can be optimized to a
  // lock-free solution if it turns out to be a performance bottleneck.
  std::lock_guard<std::mutex> lock(cache_rep_->cache_mutex_);
  size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) - mem;
  memory_used_.store(new_mem_used, std::memory_order_relaxed);
  // Gradually shrink the memory charged to the block cache if actual usage is
  // less than 3/4 of what we have reserved from it. We do this because:
  // 1. we don't pay the cost of block cache inserts immediately after a
  //    memtable is freed, as block cache inserts are expensive;
  // 2. if a temporary increase in memtable size goes away, we still make sure
  //    the memory charged to the block cache shrinks over time.
  // In this way, the charged memory shrinks only slowly even when there is
  // plenty of margin.
  if (new_mem_used < cache_rep_->cache_allocated_size_ / 4 * 3 &&
      cache_rep_->cache_allocated_size_ - kSizeDummyEntry > new_mem_used) {
    assert(!cache_rep_->dummy_handles_.empty());
    cache_rep_->cache_->Release(cache_rep_->dummy_handles_.back(), true);
    cache_rep_->dummy_handles_.pop_back();
    cache_rep_->cache_allocated_size_ -= kSizeDummyEntry;
  }
#else
  (void)mem;
#endif  // ROCKSDB_LITE
}
}  // namespace rocksdb