1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include "db/write_controller.h"
7 
8 #include <algorithm>
9 #include <atomic>
10 #include <cassert>
11 #include <ratio>
12 
13 #include "rocksdb/system_clock.h"
14 
15 namespace ROCKSDB_NAMESPACE {
16 
GetStopToken()17 std::unique_ptr<WriteControllerToken> WriteController::GetStopToken() {
18   ++total_stopped_;
19   return std::unique_ptr<WriteControllerToken>(new StopWriteToken(this));
20 }
21 
GetDelayToken(uint64_t write_rate)22 std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken(
23     uint64_t write_rate) {
24   if (0 == total_delayed_++) {
25     // Starting delay, so reset counters.
26     next_refill_time_ = 0;
27     credit_in_bytes_ = 0;
28   }
29   // NOTE: for simplicity, any current credit_in_bytes_ or "debt" in
30   // next_refill_time_ will be based on an old rate. This rate will apply
31   // for subsequent additional debts and for the next refill.
32   set_delayed_write_rate(write_rate);
33   return std::unique_ptr<WriteControllerToken>(new DelayWriteToken(this));
34 }
35 
36 std::unique_ptr<WriteControllerToken>
GetCompactionPressureToken()37 WriteController::GetCompactionPressureToken() {
38   ++total_compaction_pressure_;
39   return std::unique_ptr<WriteControllerToken>(
40       new CompactionPressureToken(this));
41 }
42 
IsStopped() const43 bool WriteController::IsStopped() const {
44   return total_stopped_.load(std::memory_order_relaxed) > 0;
45 }
// Returns how many microseconds the caller should sleep before writing
// `num_bytes`, so that writes stay within delayed_write_rate_ bytes/second.
// Returns 0 (no delay) in three cases:
//   - a stop token is outstanding. NOTE(review): presumably the stop itself is
//     enforced by the caller rather than by a delay; confirm.
//   - no delay token is outstanding.
//   - the accumulated credit covers `num_bytes`.
//
// Pacing is credit based:
//   - credit_in_bytes_ holds bytes that may be written without waiting.
//   - next_refill_time_ is the monotonic time (micros) of the next credit
//     top-up; 0 means pacing has not started yet.
//   - When credit is short, the shortfall is charged as "debt" by pushing
//     next_refill_time_ further into the future.
// NOTE(review): assumes delayed_write_rate_ is nonzero (presumably enforced by
// set_delayed_write_rate); confirm, since it is used as a divisor below.
//
// This runs inside the DB mutex, so it cannot sleep itself and should read the
// clock as rarely as possible. If that turns out to be a performance issue, we
// can redesign the thread synchronization model here.
// The function trusts the caller to sleep for the returned number of
// microseconds.
uint64_t WriteController::GetDelay(SystemClock* clock, uint64_t num_bytes) {
  if (total_stopped_.load(std::memory_order_relaxed) > 0) {
    return 0;
  }
  if (total_delayed_.load(std::memory_order_relaxed) == 0) {
    return 0;
  }

  // Fast path: leftover credit covers this write, so there is no need to read
  // the clock.
  if (credit_in_bytes_ >= num_bytes) {
    credit_in_bytes_ -= num_bytes;
    return 0;
  }
  // The intent is to read the clock inside the DB mutex less than once per
  // refill interval; the fast path above avoids it while credit lasts.
  auto time_now = NowMicrosMonotonic(clock);

  const uint64_t kMicrosPerSecond = 1000000;
  // Refill every 1 ms
  const uint64_t kMicrosPerRefill = 1000;

  if (next_refill_time_ == 0) {
    // Start with an initial allotment of bytes for one interval
    next_refill_time_ = time_now;
  }
  if (next_refill_time_ <= time_now) {
    // Refill based on time interval plus any extra elapsed: one
    // kMicrosPerRefill interval plus however far past the scheduled refill
    // time we are. Adding 0.999999 before truncating rounds the byte count up
    // unless the fractional part is tiny.
    uint64_t elapsed = time_now - next_refill_time_ + kMicrosPerRefill;
    credit_in_bytes_ += static_cast<uint64_t>(
        1.0 * elapsed / kMicrosPerSecond * delayed_write_rate_ + 0.999999);
    next_refill_time_ = time_now + kMicrosPerRefill;

    if (credit_in_bytes_ >= num_bytes) {
      // Avoid delay if possible, to reduce DB mutex release & re-acquire.
      credit_in_bytes_ -= num_bytes;
      return 0;
    }
  }

  // We need to delay to avoid exceeding write rate.
  assert(num_bytes > credit_in_bytes_);
  // Convert the shortfall into microseconds at the current rate.
  uint64_t bytes_over_budget = num_bytes - credit_in_bytes_;
  uint64_t needed_delay = static_cast<uint64_t>(
      1.0 * bytes_over_budget / delayed_write_rate_ * kMicrosPerSecond);

  // All remaining credit is consumed. The shortfall is recorded as debt by
  // moving the next refill later.
  credit_in_bytes_ = 0;
  next_refill_time_ += needed_delay;

  // Minimum delay of refill interval, to reduce DB mutex contention.
  // next_refill_time_ > time_now holds here: it was either already in the
  // future or was just set to time_now + kMicrosPerRefill. So the unsigned
  // subtraction does not wrap.
  return std::max(next_refill_time_ - time_now, kMicrosPerRefill);
}
101 
NowMicrosMonotonic(SystemClock * clock)102 uint64_t WriteController::NowMicrosMonotonic(SystemClock* clock) {
103   return clock->NowNanos() / std::milli::den;
104 }
105 
~StopWriteToken()106 StopWriteToken::~StopWriteToken() {
107   assert(controller_->total_stopped_ >= 1);
108   --controller_->total_stopped_;
109 }
110 
~DelayWriteToken()111 DelayWriteToken::~DelayWriteToken() {
112   controller_->total_delayed_--;
113   assert(controller_->total_delayed_.load() >= 0);
114 }
115 
~CompactionPressureToken()116 CompactionPressureToken::~CompactionPressureToken() {
117   controller_->total_compaction_pressure_--;
118   assert(controller_->total_compaction_pressure_ >= 0);
119 }
120 
121 }  // namespace ROCKSDB_NAMESPACE
122