1 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2 //  This source code is licensed under both the GPLv2 (found in the
3 //  COPYING file in the root directory) and Apache 2.0 License
4 //  (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 
10 #include "util/rate_limiter.h"
11 
12 #include <chrono>
13 #include <cinttypes>
14 #include <limits>
15 
16 #include "db/db_test_util.h"
17 #include "rocksdb/env.h"
18 #include "test_util/sync_point.h"
19 #include "test_util/testharness.h"
20 #include "util/random.h"
21 
22 namespace ROCKSDB_NAMESPACE {
23 
24 // TODO(yhchiang): the rate will not be accurate when we run test in parallel.
25 class RateLimiterTest : public testing::Test {};
26 
// Builds a limiter whose configured rate is the largest int64 value and checks
// that the resulting single-burst size is still a large positive number
// (presumably guarding against overflow in the refill math, per the test
// name).
TEST_F(RateLimiterTest, OverflowRate) {
  const int64_t kRefillPeriodUs = 1000;
  const int32_t kFairness = 10;

  GenericRateLimiter limiter(port::kMaxInt64 /* rate_bytes_per_sec */,
                             kRefillPeriodUs, kFairness,
                             RateLimiter::Mode::kWritesOnly, Env::Default(),
                             false /* auto_tuned */);

  const auto single_burst_bytes = limiter.GetSingleBurstBytes();
  ASSERT_GT(single_burst_bytes, 1000000000ll);
}
33 
// Smoke test: a limiter is created and immediately destroyed at scope exit
// without ever serving a request. The test passes as long as neither step
// crashes or hangs.
TEST_F(RateLimiterTest, StartStop) {
  std::unique_ptr<RateLimiter> short_lived_limiter{
      NewGenericRateLimiter(100, 100, 10)};
}
37 
// For each limiter mode, issues one 1000-byte read followed by one 1000-byte
// write at high priority and checks which of the two were counted in the
// limiter's high-priority byte total.
TEST_F(RateLimiterTest, Modes) {
  const RateLimiter::Mode kAllModes[] = {RateLimiter::Mode::kWritesOnly,
                                         RateLimiter::Mode::kReadsOnly,
                                         RateLimiter::Mode::kAllIo};
  for (const auto mode : kAllModes) {
    GenericRateLimiter limiter(
        2000 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
        10 /* fairness */, mode, Env::Default(), false /* auto_tuned */);

    // Step 1: a read. Only a writes-only limiter is expected to leave it
    // uncounted.
    limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */,
                    RateLimiter::OpType::kRead);
    const int64_t expected_after_read =
        (mode == RateLimiter::Mode::kWritesOnly) ? 0 : 1000;
    ASSERT_EQ(expected_after_read, limiter.GetTotalBytesThrough(Env::IO_HIGH));

    // Step 2: a write. Only the all-IO limiter has counted both requests;
    // the other two modes have counted exactly one of them.
    limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */,
                    RateLimiter::OpType::kWrite);
    const int64_t expected_after_write =
        (mode == RateLimiter::Mode::kAllIo) ? 2000 : 1000;
    ASSERT_EQ(expected_after_write,
              limiter.GetTotalBytesThrough(Env::IO_HIGH));
  }
}
61 
62 #if !(defined(TRAVIS) && defined(OS_MACOSX))
TEST_F(RateLimiterTest,Rate)63 TEST_F(RateLimiterTest, Rate) {
64   auto* env = Env::Default();
65   struct Arg {
66     Arg(int32_t _target_rate, int _burst)
67         : limiter(NewGenericRateLimiter(_target_rate, 100 * 1000, 10)),
68           request_size(_target_rate / 10),
69           burst(_burst) {}
70     std::unique_ptr<RateLimiter> limiter;
71     int32_t request_size;
72     int burst;
73   };
74 
75   auto writer = [](void* p) {
76     auto* thread_env = Env::Default();
77     auto* arg = static_cast<Arg*>(p);
78     // Test for 2 seconds
79     auto until = thread_env->NowMicros() + 2 * 1000000;
80     Random r((uint32_t)(thread_env->NowNanos() %
81                         std::numeric_limits<uint32_t>::max()));
82     while (thread_env->NowMicros() < until) {
83       for (int i = 0; i < static_cast<int>(r.Skewed(arg->burst) + 1); ++i) {
84         arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1,
85                               Env::IO_HIGH, nullptr /* stats */,
86                               RateLimiter::OpType::kWrite);
87       }
88       arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, Env::IO_LOW,
89                             nullptr /* stats */, RateLimiter::OpType::kWrite);
90     }
91   };
92 
93   for (int i = 1; i <= 16; i *= 2) {
94     int32_t target = i * 1024 * 10;
95     Arg arg(target, i / 4 + 1);
96     int64_t old_total_bytes_through = 0;
97     for (int iter = 1; iter <= 2; ++iter) {
98       // second iteration changes the target dynamically
99       if (iter == 2) {
100         target *= 2;
101         arg.limiter->SetBytesPerSecond(target);
102       }
103       auto start = env->NowMicros();
104       for (int t = 0; t < i; ++t) {
105         env->StartThread(writer, &arg);
106       }
107       env->WaitForJoin();
108 
109       auto elapsed = env->NowMicros() - start;
110       double rate =
111           (arg.limiter->GetTotalBytesThrough() - old_total_bytes_through) *
112           1000000.0 / elapsed;
113       old_total_bytes_through = arg.limiter->GetTotalBytesThrough();
114       fprintf(stderr,
115               "request size [1 - %" PRIi32 "], limit %" PRIi32
116               " KB/sec, actual rate: %lf KB/sec, elapsed %.2lf seconds\n",
117               arg.request_size - 1, target / 1024, rate / 1024,
118               elapsed / 1000000.0);
119 
120       ASSERT_GE(rate / target, 0.80);
121       ASSERT_LE(rate / target, 1.25);
122     }
123   }
124 }
125 #endif
126 
// Starvation test: a request that is already waiting on the limiter must still
// complete when the limit is changed (to 2X or 0.5X) before the next refill.
//
// Sync points order the two threads: the main thread does not change the limit
// until the writer has reached "GenericRateLimiter::Request", and
// "GenericRateLimiter::Refill" does not proceed until the limit change is done.
// The test passes if WaitForJoin() returns, i.e. the writer was not starved.
TEST_F(RateLimiterTest, LimitChangeTest) {
  // starvation test when limit changes to a smaller value
  int64_t refill_period = 1000 * 1000;
  auto* env = Env::Default();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
  // Arguments handed to the writer thread.
  struct Arg {
    Arg(int32_t _request_size, Env::IOPriority _pri,
        std::shared_ptr<RateLimiter> _limiter)
        : request_size(_request_size), pri(_pri), limiter(_limiter) {}
    int32_t request_size;
    Env::IOPriority pri;
    std::shared_ptr<RateLimiter> limiter;
  };

  // Thread body: issue a single write request of `request_size` bytes.
  auto writer = [](void* p) {
    auto* arg = static_cast<Arg*>(p);
    arg->limiter->Request(arg->request_size, arg->pri, nullptr /* stats */,
                          RateLimiter::OpType::kWrite);
  };

  for (uint32_t i = 1; i <= 16; i <<= 1) {
    int32_t target = i * 1024 * 10;
    // refill per second
    for (int iter = 0; iter < 2; iter++) {
      std::shared_ptr<RateLimiter> limiter =
          std::make_shared<GenericRateLimiter>(
              target, refill_period, 10, RateLimiter::Mode::kWritesOnly,
              Env::Default(), false /* auto_tuned */);
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
          {{"GenericRateLimiter::Request",
            "RateLimiterTest::LimitChangeTest:changeLimitStart"},
           {"RateLimiterTest::LimitChangeTest:changeLimitEnd",
            "GenericRateLimiter::Refill"}});
      Arg arg(target, Env::IO_HIGH, limiter);
      // The idea behind is to start a request first, then before it refills,
      // update limit to a different value (2X/0.5X). No starvation should
      // be guaranteed under any situation
      // TODO(lightmark): more test cases are welcome.
      env->StartThread(writer, &arg);
      // iter == 0 -> 2 * target, iter == 1 -> target / 2.
      int32_t new_limit = (target << 1) >> (iter << 1);
      TEST_SYNC_POINT("RateLimiterTest::LimitChangeTest:changeLimitStart");
      arg.limiter->SetBytesPerSecond(new_limit);
      TEST_SYNC_POINT("RateLimiterTest::LimitChangeTest:changeLimitEnd");
      env->WaitForJoin();
      fprintf(stderr,
              "[COMPLETE] request size %" PRIi32 " KB, new limit %" PRIi32
              "KB/sec, refill period %" PRIi64 " ms\n",
              target / 1024, new_limit / 1024, refill_period / 1000);
    }
  }

  // SyncPoint is a process-wide singleton: switch it off and drop the
  // dependencies loaded above so that they cannot influence tests that run
  // after this one in the same process.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({});
}
178 
// Exercises the auto-tuned limiter against a mock clock (SpecialEnv with
// time_elapse_only_sleep_): the limit must go up after a tuning window in
// which the limiter was drained in every refill period, and must go down after
// a window in which it was left idle.
//
// The "bytes_per_sec" locals hold GetSingleBurstBytes(); with the one-second
// refill period used here that is presumably the same quantity as the
// per-second rate.
TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) {
  const std::chrono::seconds kTimePerRefill(1);
  const int kRefillsPerTune = 100;  // needs to match util/rate_limiter.cc

  SpecialEnv special_env(Env::Default());
  special_env.no_slowdown_ = true;
  special_env.time_elapse_only_sleep_ = true;

  auto stats = CreateDBStatistics();
  std::unique_ptr<RateLimiter> rate_limiter(new GenericRateLimiter(
      1000 /* rate_bytes_per_sec */,
      std::chrono::microseconds(kTimePerRefill).count(), 10 /* fairness */,
      RateLimiter::Mode::kWritesOnly, &special_env, true /* auto_tuned */));

  // Use callback to advance time because we need to advance (1) after Request()
  // has determined the bytes are not available; and (2) before Refill()
  // computes the next refill time (ensuring refill time in the future allows
  // the next request to drain the rate limiter).
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "GenericRateLimiter::Refill", [&](void* /*arg*/) {
        special_env.SleepForMicroseconds(static_cast<int>(
            std::chrono::microseconds(kTimePerRefill).count()));
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // verify rate limit increases after a sequence of periods where rate limiter
  // is always drained
  int64_t orig_bytes_per_sec = rate_limiter->GetSingleBurstBytes();
  rate_limiter->Request(orig_bytes_per_sec, Env::IO_HIGH, stats.get(),
                        RateLimiter::OpType::kWrite);
  while (std::chrono::microseconds(special_env.NowMicros()) <=
         kRefillsPerTune * kTimePerRefill) {
    rate_limiter->Request(orig_bytes_per_sec, Env::IO_HIGH, stats.get(),
                          RateLimiter::OpType::kWrite);
  }
  int64_t new_bytes_per_sec = rate_limiter->GetSingleBurstBytes();

  // Tear the sync point down before asserting. The callback captures the
  // stack-local `special_env` by reference while SyncPoint is a process-wide
  // singleton, so it must be unregistered before this function can return,
  // including the early return taken by a failing ASSERT_*.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
      "GenericRateLimiter::Refill");

  ASSERT_GT(new_bytes_per_sec, orig_bytes_per_sec);

  // decreases after a sequence of periods where rate limiter is not drained
  orig_bytes_per_sec = new_bytes_per_sec;
  special_env.SleepForMicroseconds(static_cast<int>(
      kRefillsPerTune * std::chrono::microseconds(kTimePerRefill).count()));
  // make a request so tuner can be triggered
  rate_limiter->Request(1 /* bytes */, Env::IO_HIGH, stats.get(),
                        RateLimiter::OpType::kWrite);
  new_bytes_per_sec = rate_limiter->GetSingleBurstBytes();
  ASSERT_LT(new_bytes_per_sec, orig_bytes_per_sec);
}
229 
230 }  // namespace ROCKSDB_NAMESPACE
231 
main(int argc,char ** argv)232 int main(int argc, char** argv) {
233   ::testing::InitGoogleTest(&argc, argv);
234   return RUN_ALL_TESTS();
235 }
236