1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #pragma once
7 
8 #include <stdint.h>
9 
10 #include <atomic>
11 #include <memory>
12 #include "rocksdb/rate_limiter.h"
13 
14 namespace rocksdb {
15 
16 class Env;
17 class WriteControllerToken;
18 
19 // WriteController is controlling write stalls in our write code-path. Write
20 // stalls happen when compaction can't keep up with write rate.
21 // All of the methods here (including WriteControllerToken's destructors) need
22 // to be called while holding DB mutex
23 class WriteController {
24  public:
25   explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u,
26                            int64_t low_pri_rate_bytes_per_sec = 1024 * 1024)
27       : total_stopped_(0),
28         total_delayed_(0),
29         total_compaction_pressure_(0),
30         bytes_left_(0),
31         last_refill_time_(0),
32         low_pri_rate_limiter_(
33             NewGenericRateLimiter(low_pri_rate_bytes_per_sec)) {
34     set_max_delayed_write_rate(_delayed_write_rate);
35   }
36   ~WriteController() = default;
37 
38   // When an actor (column family) requests a stop token, all writes will be
39   // stopped until the stop token is released (deleted)
40   std::unique_ptr<WriteControllerToken> GetStopToken();
41   // When an actor (column family) requests a delay token, total delay for all
42   // writes to the DB will be controlled under the delayed write rate. Every
43   // write needs to call GetDelay() with number of bytes writing to the DB,
44   // which returns number of microseconds to sleep.
45   std::unique_ptr<WriteControllerToken> GetDelayToken(
46       uint64_t delayed_write_rate);
47   // When an actor (column family) requests a moderate token, compaction
48   // threads will be increased
49   std::unique_ptr<WriteControllerToken> GetCompactionPressureToken();
50 
51   // these three metods are querying the state of the WriteController
52   bool IsStopped() const;
NeedsDelay()53   bool NeedsDelay() const { return total_delayed_.load() > 0; }
NeedSpeedupCompaction()54   bool NeedSpeedupCompaction() const {
55     return IsStopped() || NeedsDelay() || total_compaction_pressure_ > 0;
56   }
57   // return how many microseconds the caller needs to sleep after the call
58   // num_bytes: how many number of bytes to put into the DB.
59   // Prerequisite: DB mutex held.
60   uint64_t GetDelay(Env* env, uint64_t num_bytes);
set_delayed_write_rate(uint64_t write_rate)61   void set_delayed_write_rate(uint64_t write_rate) {
62     // avoid divide 0
63     if (write_rate == 0) {
64       write_rate = 1u;
65     } else if (write_rate > max_delayed_write_rate()) {
66       write_rate = max_delayed_write_rate();
67     }
68     delayed_write_rate_ = write_rate;
69   }
70 
set_max_delayed_write_rate(uint64_t write_rate)71   void set_max_delayed_write_rate(uint64_t write_rate) {
72     // avoid divide 0
73     if (write_rate == 0) {
74       write_rate = 1u;
75     }
76     max_delayed_write_rate_ = write_rate;
77     // update delayed_write_rate_ as well
78     delayed_write_rate_ = write_rate;
79   }
80 
delayed_write_rate()81   uint64_t delayed_write_rate() const { return delayed_write_rate_; }
82 
max_delayed_write_rate()83   uint64_t max_delayed_write_rate() const { return max_delayed_write_rate_; }
84 
low_pri_rate_limiter()85   RateLimiter* low_pri_rate_limiter() { return low_pri_rate_limiter_.get(); }
86 
87  private:
88   uint64_t NowMicrosMonotonic(Env* env);
89 
90   friend class WriteControllerToken;
91   friend class StopWriteToken;
92   friend class DelayWriteToken;
93   friend class CompactionPressureToken;
94 
95   std::atomic<int> total_stopped_;
96   std::atomic<int> total_delayed_;
97   std::atomic<int> total_compaction_pressure_;
98   uint64_t bytes_left_;
99   uint64_t last_refill_time_;
100   // write rate set when initialization or by `DBImpl::SetDBOptions`
101   uint64_t max_delayed_write_rate_;
102   // current write rate
103   uint64_t delayed_write_rate_;
104 
105   std::unique_ptr<RateLimiter> low_pri_rate_limiter_;
106 };
107 
108 class WriteControllerToken {
109  public:
WriteControllerToken(WriteController * controller)110   explicit WriteControllerToken(WriteController* controller)
111       : controller_(controller) {}
~WriteControllerToken()112   virtual ~WriteControllerToken() {}
113 
114  protected:
115   WriteController* controller_;
116 
117  private:
118   // no copying allowed
119   WriteControllerToken(const WriteControllerToken&) = delete;
120   void operator=(const WriteControllerToken&) = delete;
121 };
122 
123 class StopWriteToken : public WriteControllerToken {
124  public:
StopWriteToken(WriteController * controller)125   explicit StopWriteToken(WriteController* controller)
126       : WriteControllerToken(controller) {}
127   virtual ~StopWriteToken();
128 };
129 
130 class DelayWriteToken : public WriteControllerToken {
131  public:
DelayWriteToken(WriteController * controller)132   explicit DelayWriteToken(WriteController* controller)
133       : WriteControllerToken(controller) {}
134   virtual ~DelayWriteToken();
135 };
136 
137 class CompactionPressureToken : public WriteControllerToken {
138  public:
CompactionPressureToken(WriteController * controller)139   explicit CompactionPressureToken(WriteController* controller)
140       : WriteControllerToken(controller) {}
141   virtual ~CompactionPressureToken();
142 };
143 
144 }  // namespace rocksdb
145