1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 6 #include "db/write_controller.h" 7 8 #include <atomic> 9 #include <cassert> 10 #include <ratio> 11 #include "rocksdb/env.h" 12 13 namespace ROCKSDB_NAMESPACE { 14 GetStopToken()15std::unique_ptr<WriteControllerToken> WriteController::GetStopToken() { 16 ++total_stopped_; 17 return std::unique_ptr<WriteControllerToken>(new StopWriteToken(this)); 18 } 19 GetDelayToken(uint64_t write_rate)20std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken( 21 uint64_t write_rate) { 22 total_delayed_++; 23 // Reset counters. 24 last_refill_time_ = 0; 25 bytes_left_ = 0; 26 set_delayed_write_rate(write_rate); 27 return std::unique_ptr<WriteControllerToken>(new DelayWriteToken(this)); 28 } 29 30 std::unique_ptr<WriteControllerToken> GetCompactionPressureToken()31WriteController::GetCompactionPressureToken() { 32 ++total_compaction_pressure_; 33 return std::unique_ptr<WriteControllerToken>( 34 new CompactionPressureToken(this)); 35 } 36 IsStopped() const37bool WriteController::IsStopped() const { 38 return total_stopped_.load(std::memory_order_relaxed) > 0; 39 } 40 // This is inside DB mutex, so we can't sleep and need to minimize 41 // frequency to get time. 42 // If it turns out to be a performance issue, we can redesign the thread 43 // synchronization model here. 44 // The function trust caller will sleep micros returned. GetDelay(Env * env,uint64_t num_bytes)45uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) { 46 if (total_stopped_.load(std::memory_order_relaxed) > 0) { 47 return 0; 48 } 49 if (total_delayed_.load(std::memory_order_relaxed) == 0) { 50 return 0; 51 } 52 53 const uint64_t kMicrosPerSecond = 1000000; 54 const uint64_t kRefillInterval = 1024U; 55 56 if (bytes_left_ >= num_bytes) { 57 bytes_left_ -= num_bytes; 58 return 0; 59 } 60 // The frequency to get time inside DB mutex is less than one per refill 61 // interval. 62 auto time_now = NowMicrosMonotonic(env); 63 64 uint64_t sleep_debt = 0; 65 uint64_t time_since_last_refill = 0; 66 if (last_refill_time_ != 0) { 67 if (last_refill_time_ > time_now) { 68 sleep_debt = last_refill_time_ - time_now; 69 } else { 70 time_since_last_refill = time_now - last_refill_time_; 71 bytes_left_ += 72 static_cast<uint64_t>(static_cast<double>(time_since_last_refill) / 73 kMicrosPerSecond * delayed_write_rate_); 74 if (time_since_last_refill >= kRefillInterval && 75 bytes_left_ > num_bytes) { 76 // If refill interval already passed and we have enough bytes 77 // return without extra sleeping. 78 last_refill_time_ = time_now; 79 bytes_left_ -= num_bytes; 80 return 0; 81 } 82 } 83 } 84 85 uint64_t single_refill_amount = 86 delayed_write_rate_ * kRefillInterval / kMicrosPerSecond; 87 if (bytes_left_ + single_refill_amount >= num_bytes) { 88 // Wait until a refill interval 89 // Never trigger expire for less than one refill interval to avoid to get 90 // time. 91 bytes_left_ = bytes_left_ + single_refill_amount - num_bytes; 92 last_refill_time_ = time_now + kRefillInterval; 93 return kRefillInterval + sleep_debt; 94 } 95 96 // Need to refill more than one interval. Need to sleep longer. Check 97 // whether expiration will hit 98 99 // Sleep just until `num_bytes` is allowed. 100 uint64_t sleep_amount = 101 static_cast<uint64_t>(num_bytes / 102 static_cast<long double>(delayed_write_rate_) * 103 kMicrosPerSecond) + 104 sleep_debt; 105 last_refill_time_ = time_now + sleep_amount; 106 return sleep_amount; 107 } 108 NowMicrosMonotonic(Env * env)109uint64_t WriteController::NowMicrosMonotonic(Env* env) { 110 return env->NowNanos() / std::milli::den; 111 } 112 ~StopWriteToken()113StopWriteToken::~StopWriteToken() { 114 assert(controller_->total_stopped_ >= 1); 115 --controller_->total_stopped_; 116 } 117 ~DelayWriteToken()118DelayWriteToken::~DelayWriteToken() { 119 controller_->total_delayed_--; 120 assert(controller_->total_delayed_.load() >= 0); 121 } 122 ~CompactionPressureToken()123CompactionPressureToken::~CompactionPressureToken() { 124 controller_->total_compaction_pressure_--; 125 assert(controller_->total_compaction_pressure_ >= 0); 126 } 127 128 } // namespace ROCKSDB_NAMESPACE 129