1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 
6 #include "db/write_controller.h"
7 
8 #include <atomic>
9 #include <cassert>
10 #include <ratio>
11 #include "rocksdb/env.h"
12 
13 namespace ROCKSDB_NAMESPACE {
14 
GetStopToken()15 std::unique_ptr<WriteControllerToken> WriteController::GetStopToken() {
16   ++total_stopped_;
17   return std::unique_ptr<WriteControllerToken>(new StopWriteToken(this));
18 }
19 
GetDelayToken(uint64_t write_rate)20 std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken(
21     uint64_t write_rate) {
22   total_delayed_++;
23   // Reset counters.
24   last_refill_time_ = 0;
25   bytes_left_ = 0;
26   set_delayed_write_rate(write_rate);
27   return std::unique_ptr<WriteControllerToken>(new DelayWriteToken(this));
28 }
29 
30 std::unique_ptr<WriteControllerToken>
GetCompactionPressureToken()31 WriteController::GetCompactionPressureToken() {
32   ++total_compaction_pressure_;
33   return std::unique_ptr<WriteControllerToken>(
34       new CompactionPressureToken(this));
35 }
36 
IsStopped() const37 bool WriteController::IsStopped() const {
38   return total_stopped_.load(std::memory_order_relaxed) > 0;
39 }
40 // This is inside DB mutex, so we can't sleep and need to minimize
41 // frequency to get time.
42 // If it turns out to be a performance issue, we can redesign the thread
43 // synchronization model here.
44 // The function trust caller will sleep micros returned.
GetDelay(Env * env,uint64_t num_bytes)45 uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) {
46   if (total_stopped_.load(std::memory_order_relaxed) > 0) {
47     return 0;
48   }
49   if (total_delayed_.load(std::memory_order_relaxed) == 0) {
50     return 0;
51   }
52 
53   const uint64_t kMicrosPerSecond = 1000000;
54   const uint64_t kRefillInterval = 1024U;
55 
56   if (bytes_left_ >= num_bytes) {
57     bytes_left_ -= num_bytes;
58     return 0;
59   }
60   // The frequency to get time inside DB mutex is less than one per refill
61   // interval.
62   auto time_now = NowMicrosMonotonic(env);
63 
64   uint64_t sleep_debt = 0;
65   uint64_t time_since_last_refill = 0;
66   if (last_refill_time_ != 0) {
67     if (last_refill_time_ > time_now) {
68       sleep_debt = last_refill_time_ - time_now;
69     } else {
70       time_since_last_refill = time_now - last_refill_time_;
71       bytes_left_ +=
72           static_cast<uint64_t>(static_cast<double>(time_since_last_refill) /
73                                 kMicrosPerSecond * delayed_write_rate_);
74       if (time_since_last_refill >= kRefillInterval &&
75           bytes_left_ > num_bytes) {
76         // If refill interval already passed and we have enough bytes
77         // return without extra sleeping.
78         last_refill_time_ = time_now;
79         bytes_left_ -= num_bytes;
80         return 0;
81       }
82     }
83   }
84 
85   uint64_t single_refill_amount =
86       delayed_write_rate_ * kRefillInterval / kMicrosPerSecond;
87   if (bytes_left_ + single_refill_amount >= num_bytes) {
88     // Wait until a refill interval
89     // Never trigger expire for less than one refill interval to avoid to get
90     // time.
91     bytes_left_ = bytes_left_ + single_refill_amount - num_bytes;
92     last_refill_time_ = time_now + kRefillInterval;
93     return kRefillInterval + sleep_debt;
94   }
95 
96   // Need to refill more than one interval. Need to sleep longer. Check
97   // whether expiration will hit
98 
99   // Sleep just until `num_bytes` is allowed.
100   uint64_t sleep_amount =
101       static_cast<uint64_t>(num_bytes /
102                             static_cast<long double>(delayed_write_rate_) *
103                             kMicrosPerSecond) +
104       sleep_debt;
105   last_refill_time_ = time_now + sleep_amount;
106   return sleep_amount;
107 }
108 
NowMicrosMonotonic(Env * env)109 uint64_t WriteController::NowMicrosMonotonic(Env* env) {
110   return env->NowNanos() / std::milli::den;
111 }
112 
~StopWriteToken()113 StopWriteToken::~StopWriteToken() {
114   assert(controller_->total_stopped_ >= 1);
115   --controller_->total_stopped_;
116 }
117 
~DelayWriteToken()118 DelayWriteToken::~DelayWriteToken() {
119   controller_->total_delayed_--;
120   assert(controller_->total_delayed_.load() >= 0);
121 }
122 
~CompactionPressureToken()123 CompactionPressureToken::~CompactionPressureToken() {
124   controller_->total_compaction_pressure_--;
125   assert(controller_->total_compaction_pressure_ >= 0);
126 }
127 
128 }  // namespace ROCKSDB_NAMESPACE
129