//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "util/rate_limiter.h"
#include "monitoring/statistics.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "test_util/sync_point.h"
#include "util/aligned_buffer.h"

namespace ROCKSDB_NAMESPACE {

size_t RateLimiter::RequestToken(size_t bytes, size_t alignment,
                                 Env::IOPriority io_priority, Statistics* stats,
                                 RateLimiter::OpType op_type) {
  if (io_priority < Env::IO_TOTAL && IsRateLimited(op_type)) {
    bytes = std::min(bytes, static_cast<size_t>(GetSingleBurstBytes()));

    if (alignment > 0) {
      // Here we may actually require more than the burst and block, but we
      // cannot write less than one page at a time on direct I/O, so we may
      // want to avoid the rate limiter in that case.
      bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes));
    }
    Request(bytes, io_priority, stats, op_type);
  }
  return bytes;
}
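
// Illustrative example (not from the original source): suppose
// GetSingleBurstBytes() returns 1 << 20 (1MB) and alignment is 4096 (a
// direct I/O page). A 10000-byte request is first capped to the burst, then
// truncated to the page boundary: TruncateToPageBoundary(4096, 10000) ==
// 8192, so 8192 bytes are requested. A 3000-byte request truncates to 0,
// and std::max(alignment, 0) bumps it back up to one full page (4096);
// this is how a direct I/O request can exceed the configured burst size.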

// Pending request
struct GenericRateLimiter::Req {
  explicit Req(int64_t _bytes, port::Mutex* _mu)
      : request_bytes(_bytes), bytes(_bytes), cv(_mu), granted(false) {}
  // Remaining bytes to grant; Refill() decrements this when it can only
  // partially satisfy the request.
  int64_t request_bytes;
  // Original request size, charged to total_bytes_through_ once granted.
  int64_t bytes;
  port::CondVar cv;
  bool granted;
};
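
// Note: Req objects live on the requesting thread's stack (see Request()
// below), so queuing raw pointers is safe: Request() does not return until
// the request is granted or the limiter is stopping.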
45 
GenericRateLimiter(int64_t rate_bytes_per_sec,int64_t refill_period_us,int32_t fairness,RateLimiter::Mode mode,Env * env,bool auto_tuned)46 GenericRateLimiter::GenericRateLimiter(int64_t rate_bytes_per_sec,
47                                        int64_t refill_period_us,
48                                        int32_t fairness, RateLimiter::Mode mode,
49                                        Env* env, bool auto_tuned)
50     : RateLimiter(mode),
51       refill_period_us_(refill_period_us),
52       rate_bytes_per_sec_(auto_tuned ? rate_bytes_per_sec / 2
53                                      : rate_bytes_per_sec),
54       refill_bytes_per_period_(
55           CalculateRefillBytesPerPeriod(rate_bytes_per_sec_)),
56       env_(env),
57       stop_(false),
58       exit_cv_(&request_mutex_),
59       requests_to_wait_(0),
60       available_bytes_(0),
61       next_refill_us_(NowMicrosMonotonic(env_)),
62       fairness_(fairness > 100 ? 100 : fairness),
63       rnd_((uint32_t)time(nullptr)),
64       leader_(nullptr),
65       auto_tuned_(auto_tuned),
66       num_drains_(0),
67       prev_num_drains_(0),
68       max_bytes_per_sec_(rate_bytes_per_sec),
69       tuned_time_(NowMicrosMonotonic(env_)) {
70   total_requests_[0] = 0;
71   total_requests_[1] = 0;
72   total_bytes_through_[0] = 0;
73   total_bytes_through_[1] = 0;
74 }

GenericRateLimiter::~GenericRateLimiter() {
  MutexLock g(&request_mutex_);
  stop_ = true;
  requests_to_wait_ = static_cast<int32_t>(queue_[Env::IO_LOW].size() +
                                           queue_[Env::IO_HIGH].size());
  for (auto& r : queue_[Env::IO_HIGH]) {
    r->cv.Signal();
  }
  for (auto& r : queue_[Env::IO_LOW]) {
    r->cv.Signal();
  }
  while (requests_to_wait_ > 0) {
    exit_cv_.Wait();
  }
}

// This API allows the user to dynamically change the rate limiter's bytes
// per second.
void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) {
  assert(bytes_per_second > 0);
  rate_bytes_per_sec_ = bytes_per_second;
  refill_bytes_per_period_.store(
      CalculateRefillBytesPerPeriod(bytes_per_second),
      std::memory_order_relaxed);
}
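
// Example usage (sketch): a limiter shared through DBOptions can be
// re-tuned at runtime without reopening the DB, e.g.
//   options.rate_limiter->SetBytesPerSecond(32 << 20);  // raise to 32MB/s
// The new rate takes effect starting from the next refill period.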
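// Grants `bytes` of quota at priority `pri`, blocking until granted. There
// is no dedicated refill thread: waiters queue per priority, one waiter is
// elected leader and sleeps until the next refill time, then calls Refill()
// on behalf of everyone in the queue.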
void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
                                 Statistics* stats) {
  assert(bytes <= refill_bytes_per_period_.load(std::memory_order_relaxed));
  TEST_SYNC_POINT("GenericRateLimiter::Request");
  TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:1",
                           &rate_bytes_per_sec_);
  MutexLock g(&request_mutex_);

  if (auto_tuned_) {
    static const int kRefillsPerTune = 100;
    std::chrono::microseconds now(NowMicrosMonotonic(env_));
    if (now - tuned_time_ >=
        kRefillsPerTune * std::chrono::microseconds(refill_period_us_)) {
      Tune();
    }
  }

  if (stop_) {
    return;
  }

  ++total_requests_[pri];

  if (available_bytes_ >= bytes) {
    // Refill (run by the elected leader) assigns quota and notifies
    // requests waiting on the queue under the mutex, so if we get here,
    // nobody is waiting.
    available_bytes_ -= bytes;
    total_bytes_through_[pri] += bytes;
    return;
  }

  // The request cannot be satisfied at this moment; enqueue it.
  Req r(bytes, &request_mutex_);
  queue_[pri].push_back(&r);

  do {
    bool timedout = false;
    // Leader election; candidates can be:
    // (1) a new incoming request,
    // (2) a previous leader, whose quota has not been assigned yet due
    //     to lower priority,
    // (3) a previous waiter at the front of the queue, who got notified by
    //     the previous leader.
    if (leader_ == nullptr &&
        ((!queue_[Env::IO_HIGH].empty() &&
            &r == queue_[Env::IO_HIGH].front()) ||
         (!queue_[Env::IO_LOW].empty() &&
            &r == queue_[Env::IO_LOW].front()))) {
      leader_ = &r;
      int64_t delta = next_refill_us_ - NowMicrosMonotonic(env_);
      delta = delta > 0 ? delta : 0;
      if (delta == 0) {
        timedout = true;
      } else {
        int64_t wait_until = env_->NowMicros() + delta;
        RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS);
        ++num_drains_;
        timedout = r.cv.TimedWait(wait_until);
      }
    } else {
      // Not at the front of the queue, or a leader has already been elected.
      r.cv.Wait();
    }

    // request_mutex_ is held from now on
    if (stop_) {
      --requests_to_wait_;
      exit_cv_.Signal();
      return;
    }

    // Make sure the woken-up request is always at the front of its queue.
    assert(r.granted ||
           (!queue_[Env::IO_HIGH].empty() &&
            &r == queue_[Env::IO_HIGH].front()) ||
           (!queue_[Env::IO_LOW].empty() &&
            &r == queue_[Env::IO_LOW].front()));
    assert(leader_ == nullptr ||
           (!queue_[Env::IO_HIGH].empty() &&
            leader_ == queue_[Env::IO_HIGH].front()) ||
           (!queue_[Env::IO_LOW].empty() &&
            leader_ == queue_[Env::IO_LOW].front()));

    if (leader_ == &r) {
      // Woken up from TimedWait()
      if (timedout) {
        // Time to do a refill!
        Refill();

        // Re-elect a new leader regardless. This is to simplify the
        // election handling.
        leader_ = nullptr;

        // Notify the front of the queue if the current leader is going away.
        if (r.granted) {
          // The current leader has already been granted its quota. Notify
          // the front of the waiting queue so it can participate in the
          // next round of election.
          assert((queue_[Env::IO_HIGH].empty() ||
                    &r != queue_[Env::IO_HIGH].front()) &&
                 (queue_[Env::IO_LOW].empty() ||
                    &r != queue_[Env::IO_LOW].front()));
          if (!queue_[Env::IO_HIGH].empty()) {
            queue_[Env::IO_HIGH].front()->cv.Signal();
          } else if (!queue_[Env::IO_LOW].empty()) {
            queue_[Env::IO_LOW].front()->cv.Signal();
          }
          // Done
          break;
        }
      } else {
        // Spontaneous wakeup; need to continue waiting.
        assert(!r.granted);
        leader_ = nullptr;
      }
    } else {
      // Woken up by the previous leader:
      // (1) if the requested quota was granted, we are done;
      // (2) if not, this thread was picked as a new leader candidate
      //     (the previous leader got its quota). It still needs to
      //     participate in leader election, because a new request may have
      //     come in before this thread was woken up, so it may need to
      //     Wait() again.
      assert(!timedout);
    }
  } while (!r.granted);
}

void GenericRateLimiter::Refill() {
  TEST_SYNC_POINT("GenericRateLimiter::Refill");
  next_refill_us_ = NowMicrosMonotonic(env_) + refill_period_us_;
  // Carry over the leftover quota from the last period.
  auto refill_bytes_per_period =
      refill_bytes_per_period_.load(std::memory_order_relaxed);
  if (available_bytes_ < refill_bytes_per_period) {
    available_bytes_ += refill_bytes_per_period;
  }

  int use_low_pri_first = rnd_.OneIn(fairness_) ? 0 : 1;
  for (int q = 0; q < 2; ++q) {
    auto use_pri = (use_low_pri_first == q) ? Env::IO_LOW : Env::IO_HIGH;
    auto* queue = &queue_[use_pri];
    while (!queue->empty()) {
      auto* next_req = queue->front();
      if (available_bytes_ < next_req->request_bytes) {
        // Grant the remaining quota to the front request so that large
        // requests make progress across periods and are not starved.
        next_req->request_bytes -= available_bytes_;
        available_bytes_ = 0;
        break;
      }
      available_bytes_ -= next_req->request_bytes;
      next_req->request_bytes = 0;
      total_bytes_through_[use_pri] += next_req->bytes;
      queue->pop_front();

      next_req->granted = true;
      if (next_req != leader_) {
        // Quota granted; signal the waiting thread.
        next_req->cv.Signal();
      }
    }
  }
}
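
// Note on fairness (illustrative): with the default fairness of 10,
// rnd_.OneIn(fairness_) is true roughly 1/10 of the time, so the
// low-priority queue is drained first in about 10% of refills. This keeps
// a steady stream of high-priority requests from starving low-priority
// ones indefinitely.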

int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod(
    int64_t rate_bytes_per_sec) {
  if (port::kMaxInt64 / rate_bytes_per_sec < refill_period_us_) {
    // Avoid an unexpected result in the overflow case; the returned value
    // is inaccurate but large enough.
    return port::kMaxInt64 / 1000000;
  } else {
    return std::max(kMinRefillBytesPerPeriod,
                    rate_bytes_per_sec * refill_period_us_ / 1000000);
  }
}
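
// Worked example (illustrative): with rate_bytes_per_sec = 10 << 20
// (10MiB/s) and the default refill_period_us = 100 * 1000 (100ms), each
// period refills 10485760 * 100000 / 1000000 = 1048576 bytes (1MiB). The
// overflow branch is taken only when rate_bytes_per_sec * refill_period_us
// would exceed kMaxInt64.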

Status GenericRateLimiter::Tune() {
  const int kLowWatermarkPct = 50;
  const int kHighWatermarkPct = 90;
  const int kAdjustFactorPct = 5;
  // The computed rate limit will be in
  // `[max_bytes_per_sec_ / kAllowedRangeFactor, max_bytes_per_sec_]`.
  const int kAllowedRangeFactor = 20;

  std::chrono::microseconds prev_tuned_time = tuned_time_;
  tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic(env_));

  int64_t elapsed_intervals = (tuned_time_ - prev_tuned_time +
                               std::chrono::microseconds(refill_period_us_) -
                               std::chrono::microseconds(1)) /
                              std::chrono::microseconds(refill_period_us_);
  // We tune every kRefillsPerTune intervals, so the overflow and division-by-
  // zero conditions should never happen.
  assert(num_drains_ - prev_num_drains_ <= port::kMaxInt64 / 100);
  assert(elapsed_intervals > 0);
  int64_t drained_pct =
      (num_drains_ - prev_num_drains_) * 100 / elapsed_intervals;

  int64_t prev_bytes_per_sec = GetBytesPerSecond();
  int64_t new_bytes_per_sec;
  if (drained_pct == 0) {
    new_bytes_per_sec = max_bytes_per_sec_ / kAllowedRangeFactor;
  } else if (drained_pct < kLowWatermarkPct) {
    // Sanitize to prevent overflow.
    int64_t sanitized_prev_bytes_per_sec =
        std::min(prev_bytes_per_sec, port::kMaxInt64 / 100);
    new_bytes_per_sec =
        std::max(max_bytes_per_sec_ / kAllowedRangeFactor,
                 sanitized_prev_bytes_per_sec * 100 / (100 + kAdjustFactorPct));
  } else if (drained_pct > kHighWatermarkPct) {
    // Sanitize to prevent overflow.
    int64_t sanitized_prev_bytes_per_sec = std::min(
        prev_bytes_per_sec, port::kMaxInt64 / (100 + kAdjustFactorPct));
    new_bytes_per_sec =
        std::min(max_bytes_per_sec_,
                 sanitized_prev_bytes_per_sec * (100 + kAdjustFactorPct) / 100);
  } else {
    new_bytes_per_sec = prev_bytes_per_sec;
  }
  if (new_bytes_per_sec != prev_bytes_per_sec) {
    SetBytesPerSecond(new_bytes_per_sec);
  }
  // Rewind num_drains_ to the baseline so that the next tuning interval
  // counts only drains that happen from now on.
  num_drains_ = prev_num_drains_;
  return Status::OK();
}
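
// Tuning example (illustrative): suppose max_bytes_per_sec_ is 100MB/s and
// the current rate is 40MB/s. drained_pct is roughly the percentage of
// elapsed refill intervals in which requests had to wait for quota. Above
// the 90% watermark the limiter is considered saturated and the rate grows
// 5% to 42MB/s, capped at max_bytes_per_sec_; below the 50% watermark it
// shrinks by ~5% (40 * 100 / 105, about 38MB/s), floored at
// max_bytes_per_sec_ / 20 = 5MB/s. drained_pct == 0 snaps straight to that
// floor.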

RateLimiter* NewGenericRateLimiter(
    int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */,
    int32_t fairness /* = 10 */,
    RateLimiter::Mode mode /* = RateLimiter::Mode::kWritesOnly */,
    bool auto_tuned /* = false */) {
  assert(rate_bytes_per_sec > 0);
  assert(refill_period_us > 0);
  assert(fairness > 0);
  return new GenericRateLimiter(rate_bytes_per_sec, refill_period_us, fairness,
                                mode, Env::Default(), auto_tuned);
}
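
// Example usage (sketch; `options` is a hypothetical rocksdb::Options):
//   options.rate_limiter.reset(NewGenericRateLimiter(10 << 20));  // 10MB/s
// The limiter is shared via std::shared_ptr; in the default kWritesOnly
// mode it throttles flush and compaction writes.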

}  // namespace ROCKSDB_NAMESPACE