1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 2 // This source code is licensed under both the GPLv2 (found in the 3 // COPYING file in the root directory) and Apache 2.0 License 4 // (found in the LICENSE.Apache file in the root directory). 5 6 #pragma once 7 8 #include <stdint.h> 9 10 #include <atomic> 11 #include <memory> 12 #include "rocksdb/rate_limiter.h" 13 14 namespace rocksdb { 15 16 class Env; 17 class WriteControllerToken; 18 19 // WriteController is controlling write stalls in our write code-path. Write 20 // stalls happen when compaction can't keep up with write rate. 21 // All of the methods here (including WriteControllerToken's destructors) need 22 // to be called while holding DB mutex 23 class WriteController { 24 public: 25 explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u, 26 int64_t low_pri_rate_bytes_per_sec = 1024 * 1024) 27 : total_stopped_(0), 28 total_delayed_(0), 29 total_compaction_pressure_(0), 30 bytes_left_(0), 31 last_refill_time_(0), 32 low_pri_rate_limiter_( 33 NewGenericRateLimiter(low_pri_rate_bytes_per_sec)) { 34 set_max_delayed_write_rate(_delayed_write_rate); 35 } 36 ~WriteController() = default; 37 38 // When an actor (column family) requests a stop token, all writes will be 39 // stopped until the stop token is released (deleted) 40 std::unique_ptr<WriteControllerToken> GetStopToken(); 41 // When an actor (column family) requests a delay token, total delay for all 42 // writes to the DB will be controlled under the delayed write rate. Every 43 // write needs to call GetDelay() with number of bytes writing to the DB, 44 // which returns number of microseconds to sleep. 45 std::unique_ptr<WriteControllerToken> GetDelayToken( 46 uint64_t delayed_write_rate); 47 // When an actor (column family) requests a moderate token, compaction 48 // threads will be increased 49 std::unique_ptr<WriteControllerToken> GetCompactionPressureToken(); 50 51 // these three metods are querying the state of the WriteController 52 bool IsStopped() const; NeedsDelay()53 bool NeedsDelay() const { return total_delayed_.load() > 0; } NeedSpeedupCompaction()54 bool NeedSpeedupCompaction() const { 55 return IsStopped() || NeedsDelay() || total_compaction_pressure_ > 0; 56 } 57 // return how many microseconds the caller needs to sleep after the call 58 // num_bytes: how many number of bytes to put into the DB. 59 // Prerequisite: DB mutex held. 60 uint64_t GetDelay(Env* env, uint64_t num_bytes); set_delayed_write_rate(uint64_t write_rate)61 void set_delayed_write_rate(uint64_t write_rate) { 62 // avoid divide 0 63 if (write_rate == 0) { 64 write_rate = 1u; 65 } else if (write_rate > max_delayed_write_rate()) { 66 write_rate = max_delayed_write_rate(); 67 } 68 delayed_write_rate_ = write_rate; 69 } 70 set_max_delayed_write_rate(uint64_t write_rate)71 void set_max_delayed_write_rate(uint64_t write_rate) { 72 // avoid divide 0 73 if (write_rate == 0) { 74 write_rate = 1u; 75 } 76 max_delayed_write_rate_ = write_rate; 77 // update delayed_write_rate_ as well 78 delayed_write_rate_ = write_rate; 79 } 80 delayed_write_rate()81 uint64_t delayed_write_rate() const { return delayed_write_rate_; } 82 max_delayed_write_rate()83 uint64_t max_delayed_write_rate() const { return max_delayed_write_rate_; } 84 low_pri_rate_limiter()85 RateLimiter* low_pri_rate_limiter() { return low_pri_rate_limiter_.get(); } 86 87 private: 88 uint64_t NowMicrosMonotonic(Env* env); 89 90 friend class WriteControllerToken; 91 friend class StopWriteToken; 92 friend class DelayWriteToken; 93 friend class CompactionPressureToken; 94 95 std::atomic<int> total_stopped_; 96 std::atomic<int> total_delayed_; 97 std::atomic<int> total_compaction_pressure_; 98 uint64_t bytes_left_; 99 uint64_t last_refill_time_; 100 // write rate set when initialization or by `DBImpl::SetDBOptions` 101 uint64_t max_delayed_write_rate_; 102 // current write rate 103 uint64_t delayed_write_rate_; 104 105 std::unique_ptr<RateLimiter> low_pri_rate_limiter_; 106 }; 107 108 class WriteControllerToken { 109 public: WriteControllerToken(WriteController * controller)110 explicit WriteControllerToken(WriteController* controller) 111 : controller_(controller) {} ~WriteControllerToken()112 virtual ~WriteControllerToken() {} 113 114 protected: 115 WriteController* controller_; 116 117 private: 118 // no copying allowed 119 WriteControllerToken(const WriteControllerToken&) = delete; 120 void operator=(const WriteControllerToken&) = delete; 121 }; 122 123 class StopWriteToken : public WriteControllerToken { 124 public: StopWriteToken(WriteController * controller)125 explicit StopWriteToken(WriteController* controller) 126 : WriteControllerToken(controller) {} 127 virtual ~StopWriteToken(); 128 }; 129 130 class DelayWriteToken : public WriteControllerToken { 131 public: DelayWriteToken(WriteController * controller)132 explicit DelayWriteToken(WriteController* controller) 133 : WriteControllerToken(controller) {} 134 virtual ~DelayWriteToken(); 135 }; 136 137 class CompactionPressureToken : public WriteControllerToken { 138 public: CompactionPressureToken(WriteController * controller)139 explicit CompactionPressureToken(WriteController* controller) 140 : WriteControllerToken(controller) {} 141 virtual ~CompactionPressureToken(); 142 }; 143 144 } // namespace rocksdb 145