//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "rocksdb/write_buffer_manager.h"
#include <mutex>
#include "util/coding.h"

namespace rocksdb {
#ifndef ROCKSDB_LITE
namespace {
const size_t kSizeDummyEntry = 256 * 1024;
// The key will be longer than keys for blocks in SST files so they won't
// conflict.
const size_t kCacheKeyPrefix = kMaxVarint64Length * 4 + 1;
}  // namespace

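// CacheRep holds the bookkeeping needed to charge memtable memory against a
// block cache: it inserts dummy entries of kSizeDummyEntry bytes and keeps
// their handles so they can be released later.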
struct WriteBufferManager::CacheRep {
  std::shared_ptr<Cache> cache_;
  std::mutex cache_mutex_;
  std::atomic<size_t> cache_allocated_size_;
  // The non-prefix part will be updated according to the ID to use.
  char cache_key_[kCacheKeyPrefix + kMaxVarint64Length];
  uint64_t next_cache_key_id_ = 0;
  std::vector<Cache::Handle*> dummy_handles_;

  explicit CacheRep(std::shared_ptr<Cache> cache)
      : cache_(cache), cache_allocated_size_(0) {
    memset(cache_key_, 0, kCacheKeyPrefix);
    size_t pointer_size = sizeof(const void*);
    assert(pointer_size <= kCacheKeyPrefix);
    memcpy(cache_key_, static_cast<const void*>(this), pointer_size);
  }

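  // Build the key for the next dummy entry: the fixed prefix derived from
  // this CacheRep's address, followed by a varint-encoded, increasing ID.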
  Slice GetNextCacheKey() {
    memset(cache_key_ + kCacheKeyPrefix, 0, kMaxVarint64Length);
    char* end =
        EncodeVarint64(cache_key_ + kCacheKeyPrefix, next_cache_key_id_++);
    return Slice(cache_key_, static_cast<size_t>(end - cache_key_));
  }
};
#else
struct WriteBufferManager::CacheRep {};
#endif  // ROCKSDB_LITE

WriteBufferManager::WriteBufferManager(size_t _buffer_size,
                                       std::shared_ptr<Cache> cache)
    : buffer_size_(_buffer_size),
      mutable_limit_(buffer_size_ * 7 / 8),
      memory_used_(0),
      memory_active_(0),
      cache_rep_(nullptr) {
#ifndef ROCKSDB_LITE
  if (cache) {
    // Construct the cache key using the pointer to this.
    cache_rep_.reset(new CacheRep(cache));
  }
#else
  (void)cache;
#endif  // ROCKSDB_LITE
}
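
// Illustrative construction (a sketch, not part of this translation unit):
// a WriteBufferManager that charges memtable memory against a block cache
// might be set up as
//   std::shared_ptr<Cache> cache = NewLRUCache(1 << 30);
//   auto wbm = std::make_shared<WriteBufferManager>(512 << 20, cache);
// and handed to the DB through DBOptions::write_buffer_manager.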

WriteBufferManager::~WriteBufferManager() {
#ifndef ROCKSDB_LITE
  if (cache_rep_) {
    for (auto* handle : cache_rep_->dummy_handles_) {
      cache_rep_->cache_->Release(handle, true);
    }
  }
#endif  // ROCKSDB_LITE
}

// Should only be called from the write thread.
void WriteBufferManager::ReserveMemWithCache(size_t mem) {
#ifndef ROCKSDB_LITE
  assert(cache_rep_ != nullptr);
  // Use a mutex to protect various data structures. Can be optimized to a
  // lock-free solution if it turns out to be a performance bottleneck.
  std::lock_guard<std::mutex> lock(cache_rep_->cache_mutex_);

  size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) + mem;
  memory_used_.store(new_mem_used, std::memory_order_relaxed);
  while (new_mem_used > cache_rep_->cache_allocated_size_) {
    // Expand the charged size by at least 256KB by adding a dummy record to
    // the cache.
    Cache::Handle* handle;
    cache_rep_->cache_->Insert(cache_rep_->GetNextCacheKey(), nullptr,
                               kSizeDummyEntry, nullptr, &handle);
    cache_rep_->dummy_handles_.push_back(handle);
    cache_rep_->cache_allocated_size_ += kSizeDummyEntry;
  }
#else
  (void)mem;
#endif  // ROCKSDB_LITE
}

void WriteBufferManager::FreeMemWithCache(size_t mem) {
#ifndef ROCKSDB_LITE
  assert(cache_rep_ != nullptr);
  // Use a mutex to protect various data structures. Can be optimized to a
  // lock-free solution if it turns out to be a performance bottleneck.
  std::lock_guard<std::mutex> lock(cache_rep_->cache_mutex_);
  size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) - mem;
  memory_used_.store(new_mem_used, std::memory_order_relaxed);
  // Gradually shrink the memory charged to the block cache if actual usage
  // falls below 3/4 of what we have reserved from the block cache.
  // We do this because:
  // 1. we don't pay the cost of a block cache insert immediately when a
  //    memtable is freed, as block cache inserts are expensive;
  // 2. eventually, if usage drops back after a temporary memtable size
  //    increase, we still shrink the memory charged to the block cache
  //    over time.
  // In this way, we shrink the charged memory only slowly, even when there
  // is plenty of margin.
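  // Release at most one dummy entry per call, and only if the remaining
  // charged size would still cover the current memory usage.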
  if (new_mem_used < cache_rep_->cache_allocated_size_ / 4 * 3 &&
      cache_rep_->cache_allocated_size_ - kSizeDummyEntry > new_mem_used) {
    assert(!cache_rep_->dummy_handles_.empty());
    cache_rep_->cache_->Release(cache_rep_->dummy_handles_.back(), true);
    cache_rep_->dummy_handles_.pop_back();
    cache_rep_->cache_allocated_size_ -= kSizeDummyEntry;
  }
#else
  (void)mem;
#endif  // ROCKSDB_LITE
}
}  // namespace rocksdb