1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <algorithm> 20 #include <atomic> 21 #include <chrono> 22 #include <thread> 23 24 #include <folly/ConstexprMath.h> 25 #include <folly/Likely.h> 26 #include <folly/Optional.h> 27 #include <folly/concurrency/CacheLocality.h> 28 29 namespace folly { 30 31 struct TokenBucketPolicyDefault { 32 using align = 33 std::integral_constant<size_t, hardware_destructive_interference_size>; 34 35 template <typename T> 36 using atom = std::atomic<T>; 37 38 using clock = std::chrono::steady_clock; 39 40 using concurrent = std::true_type; 41 }; 42 43 /** 44 * Thread-safe (atomic) token bucket primitive. 45 * 46 * This primitive can be used to implement a token bucket 47 * (http://en.wikipedia.org/wiki/Token_bucket). It handles 48 * the storage of the state in an atomic way, and presents 49 * an interface dealing with tokens, rate, burstSize and time. 50 * 51 * This primitive records the last time it was updated. This allows the 52 * token bucket to add tokens "just in time" when tokens are requested. 53 * 54 * @tparam Policy A policy. 55 */ 56 template <typename Policy = TokenBucketPolicyDefault> 57 class TokenBucketStorage { 58 template <typename T> 59 using Atom = typename Policy::template atom<T>; 60 using Align = typename Policy::align; 61 using Clock = typename Policy::clock; // do we need clock here? 62 using Concurrent = typename Policy::concurrent; 63 64 static_assert(Clock::is_steady, "clock must be steady"); // do we need clock? 65 66 public: 67 /** 68 * Constructor. 69 * 70 * @param zeroTime Initial time at which to consider the token bucket 71 * starting to fill. Defaults to 0, so by default token 72 * buckets are "full" after construction. 73 */ 74 explicit TokenBucketStorage(double zeroTime = 0) noexcept zeroTime_(zeroTime)75 : zeroTime_(zeroTime) {} 76 77 /** 78 * Copy constructor. 79 * 80 * Thread-safe. (Copy constructors of derived classes may not be thread-safe 81 * however.) 82 */ TokenBucketStorage(const TokenBucketStorage & other)83 TokenBucketStorage(const TokenBucketStorage& other) noexcept 84 : zeroTime_(other.zeroTime_.load(std::memory_order_relaxed)) {} 85 86 /** 87 * Copy-assignment operator. 88 * 89 * Warning: not thread safe for the object being assigned to (including 90 * self-assignment). Thread-safe for the other object. 91 */ 92 TokenBucketStorage& operator=(const TokenBucketStorage& other) noexcept { 93 zeroTime_.store(other.zeroTime(), std::memory_order_relaxed); 94 return *this; 95 } 96 97 /** 98 * Re-initialize token bucket. 99 * 100 * Thread-safe. 101 * 102 * @param zeroTime Initial time at which to consider the token bucket 103 * starting to fill. Defaults to 0, so by default token 104 * bucket is reset to "full". 105 */ 106 void reset(double zeroTime = 0) noexcept { 107 zeroTime_.store(zeroTime, std::memory_order_relaxed); 108 } 109 110 /** 111 * Returns the number of tokens currently available. This could be negative 112 * (if in debt); will be a most burstSize. 113 * 114 * 115 * Thread-safe (but returned values may immediately be outdated). 116 */ available(double rate,double burstSize,double nowInSeconds)117 double available( 118 double rate, double burstSize, double nowInSeconds) const noexcept { 119 assert(rate > 0); 120 assert(burstSize > 0); 121 122 double zt = this->zeroTime_.load(std::memory_order_relaxed); 123 return std::min((nowInSeconds - zt) * rate, burstSize); 124 } 125 126 /** 127 * Consume tokens at the given rate/burst/time. 128 * 129 * Consumption is actually done by the callback function: it's given a 130 * reference with the number of available tokens and returns the number 131 * consumed. Typically the return value would be between 0.0 and available, 132 * but there are no restrictions. 133 * 134 * Note: the callback may be called multiple times, so please no side-effects 135 */ 136 template <typename Callback> consume(double rate,double burstSize,double nowInSeconds,const Callback & callback)137 double consume( 138 double rate, 139 double burstSize, 140 double nowInSeconds, 141 const Callback& callback) { 142 assert(rate > 0); 143 assert(burstSize > 0); 144 145 double zeroTimeOld; 146 double zeroTimeNew; 147 double consumed; 148 do { 149 zeroTimeOld = zeroTime(); 150 double tokens = std::min((nowInSeconds - zeroTimeOld) * rate, burstSize); 151 consumed = callback(tokens); 152 double tokensNew = tokens - consumed; 153 if (consumed == 0.0) { 154 return consumed; 155 } 156 157 zeroTimeNew = nowInSeconds - tokensNew / rate; 158 } while (UNLIKELY( 159 !compare_exchange_weak_relaxed(zeroTime_, zeroTimeOld, zeroTimeNew))); 160 161 return consumed; 162 } 163 164 /** 165 * returns the time at which the bucket will have `target` tokens available. 166 * 167 * Caution: it doesn't make sense to ask about target > burstSize 168 * 169 * Eg. 170 * // time debt repaid 171 * bucket.timeWhenBucket(rate, 0); 172 * 173 * // time bucket is full 174 * bucket.timeWhenBucket(rate, burstSize); 175 */ 176 timeWhenBucket(double rate,double target)177 double timeWhenBucket(double rate, double target) { 178 return zeroTime() + target / rate; 179 } 180 181 /** 182 * Return extra tokens back to the bucket. 183 * 184 * Thread-safe. 185 */ returnTokens(double tokensToReturn,double rate)186 void returnTokens(double tokensToReturn, double rate) { 187 assert(rate > 0); 188 189 returnTokensImpl(tokensToReturn, rate); 190 } 191 192 private: 193 /** 194 * Adjust zeroTime based on rate and tokenCount and return the new value of 195 * zeroTime_. Note: Token count can be negative to move the zeroTime_ 196 * into the future. 197 */ returnTokensImpl(double tokenCount,double rate)198 double returnTokensImpl(double tokenCount, double rate) { 199 auto zeroTimeOld = zeroTime_.load(std::memory_order_relaxed); 200 201 double zeroTimeNew; 202 do { 203 zeroTimeNew = zeroTimeOld - tokenCount / rate; 204 205 } while (UNLIKELY( 206 !compare_exchange_weak_relaxed(zeroTime_, zeroTimeOld, zeroTimeNew))); 207 return zeroTimeNew; 208 } 209 compare_exchange_weak_relaxed(Atom<double> & atom,double & expected,double zeroTime)210 static bool compare_exchange_weak_relaxed( 211 212 Atom<double>& atom, double& expected, double zeroTime) { 213 if (Concurrent::value) { 214 return atom.compare_exchange_weak( 215 expected, zeroTime, std::memory_order_relaxed); 216 } else { 217 return atom.store(zeroTime, std::memory_order_relaxed), true; 218 } 219 } 220 zeroTime()221 double zeroTime() const { 222 return this->zeroTime_.load(std::memory_order_relaxed); 223 } 224 225 static constexpr size_t AlignZeroTime = 226 constexpr_max(Align::value, alignof(Atom<double>)); 227 alignas(AlignZeroTime) Atom<double> zeroTime_; 228 }; 229 230 /** 231 * Thread-safe (atomic) token bucket implementation. 232 * 233 * A token bucket (http://en.wikipedia.org/wiki/Token_bucket) models a stream 234 * of events with an average rate and some amount of burstiness. The canonical 235 * example is a packet switched network: the network can accept some number of 236 * bytes per second and the bytes come in finite packets (bursts). A token 237 * bucket stores up to a fixed number of tokens (the burst size). Some number 238 * of tokens are removed when an event occurs. The tokens are replenished at a 239 * fixed rate. Failure to allocate tokens implies resource is unavailable and 240 * caller needs to implement its own retry mechanism. For simple cases where 241 * caller is okay with a FIFO starvation-free scheduling behavior, there are 242 * also APIs to 'borrow' from the future effectively assigning a start time to 243 * the caller when it should proceed with using the resource. It is also 244 * possible to 'return' previously allocated tokens to make them available to 245 * other users. Returns in excess of burstSize are considered expired and 246 * will not be available to later callers. 247 * 248 * This implementation records the last time it was updated. This allows the 249 * token bucket to add tokens "just in time" when tokens are requested. 250 * 251 * The "dynamic" base variant allows the token generation rate and maximum 252 * burst size to change with every token consumption. 253 * 254 * @tparam Policy A policy. 255 */ 256 template <typename Policy = TokenBucketPolicyDefault> 257 class BasicDynamicTokenBucket { 258 template <typename T> 259 using Atom = typename Policy::template atom<T>; 260 using Align = typename Policy::align; 261 using Clock = typename Policy::clock; 262 using Concurrent = typename Policy::concurrent; 263 264 static_assert(Clock::is_steady, "clock must be steady"); 265 266 public: 267 /** 268 * Constructor. 269 * 270 * @param zeroTime Initial time at which to consider the token bucket 271 * starting to fill. Defaults to 0, so by default token 272 * buckets are "full" after construction. 273 */ 274 explicit BasicDynamicTokenBucket(double zeroTime = 0) noexcept bucket_(zeroTime)275 : bucket_(zeroTime) {} 276 277 /** 278 * Copy constructor and copy assignment operator. 279 * 280 * Thread-safe. (Copy constructors of derived classes may not be thread-safe 281 * however.) 282 */ 283 BasicDynamicTokenBucket(const BasicDynamicTokenBucket& other) noexcept = 284 default; 285 BasicDynamicTokenBucket& operator=( 286 const BasicDynamicTokenBucket& other) noexcept = default; 287 288 /** 289 * Re-initialize token bucket. 290 * 291 * Thread-safe. 292 * 293 * @param zeroTime Initial time at which to consider the token bucket 294 * starting to fill. Defaults to 0, so by default token 295 * bucket is reset to "full". 296 */ 297 void reset(double zeroTime = 0) noexcept { bucket_.reset(zeroTime); } 298 299 /** 300 * Returns the current time in seconds since Epoch. 301 */ defaultClockNow()302 static double defaultClockNow() noexcept { 303 auto const now = Clock::now().time_since_epoch(); 304 return std::chrono::duration<double>(now).count(); 305 } 306 307 /** 308 * Attempts to consume some number of tokens. Tokens are first added to the 309 * bucket based on the time elapsed since the last attempt to consume tokens. 310 * Note: Attempts to consume more tokens than the burst size will always 311 * fail. 312 * 313 * Thread-safe. 314 * 315 * @param toConsume The number of tokens to consume. 316 * @param rate Number of tokens to generate per second. 317 * @param burstSize Maximum burst size. Must be greater than 0. 318 * @param nowInSeconds Current time in seconds. Should be monotonically 319 * increasing from the nowInSeconds specified in 320 * this token bucket's constructor. 321 * @return True if the rate limit check passed, false otherwise. 322 */ 323 bool consume( 324 double toConsume, 325 double rate, 326 double burstSize, 327 double nowInSeconds = defaultClockNow()) { 328 assert(rate > 0); 329 assert(burstSize > 0); 330 331 if (bucket_.available(rate, burstSize, nowInSeconds) < 0.0) { 332 return 0; 333 } 334 335 double consumed = bucket_.consume( 336 rate, burstSize, nowInSeconds, [toConsume](double available) { 337 return available < toConsume ? 0.0 : toConsume; 338 }); 339 340 assert(consumed == toConsume || consumed == 0.0); 341 return consumed == toConsume; 342 } 343 344 /** 345 * Similar to consume, but always consumes some number of tokens. If the 346 * bucket contains enough tokens - consumes toConsume tokens. Otherwise the 347 * bucket is drained. 348 * 349 * Thread-safe. 350 * 351 * @param toConsume The number of tokens to consume. 352 * @param rate Number of tokens to generate per second. 353 * @param burstSize Maximum burst size. Must be greater than 0. 354 * @param nowInSeconds Current time in seconds. Should be monotonically 355 * increasing from the nowInSeconds specified in 356 * this token bucket's constructor. 357 * @return number of tokens that were consumed. 358 */ 359 double consumeOrDrain( 360 double toConsume, 361 double rate, 362 double burstSize, 363 double nowInSeconds = defaultClockNow()) { 364 assert(rate > 0); 365 assert(burstSize > 0); 366 367 if (bucket_.available(rate, burstSize, nowInSeconds) <= 0.0) { 368 return 0; 369 } 370 371 double consumed = bucket_.consume( 372 rate, burstSize, nowInSeconds, [toConsume](double available) { 373 return constexpr_min(available, toConsume); 374 }); 375 return consumed; 376 } 377 378 /** 379 * Return extra tokens back to the bucket. 380 * 381 * Thread-safe. 382 */ returnTokens(double tokensToReturn,double rate)383 void returnTokens(double tokensToReturn, double rate) { 384 assert(rate > 0); 385 assert(tokensToReturn > 0); 386 387 bucket_.returnTokens(tokensToReturn, rate); 388 } 389 390 /** 391 * Like consumeOrDrain but the call will always satisfy the asked for count. 392 * It does so by borrowing tokens from the future if the currently available 393 * count isn't sufficient. 394 * 395 * Returns a folly::Optional<double>. The optional wont be set if the request 396 * cannot be satisfied: only case is when it is larger than burstSize. The 397 * value of the optional is a double indicating the time in seconds that the 398 * caller needs to wait at which the reservation becomes valid. The caller 399 * could simply sleep for the returned duration to smooth out the allocation 400 * to match the rate limiter or do some other computation in the meantime. In 401 * any case, any regular consume or consumeOrDrain calls will fail to allocate 402 * any tokens until the future time is reached. 403 * 404 * Note: It is assumed the caller will not ask for a very large count nor use 405 * it immediately (if not waiting inline) as that would break the burst 406 * prevention the limiter is meant to be used for. 407 * 408 * Thread-safe. 409 */ 410 Optional<double> consumeWithBorrowNonBlocking( 411 double toConsume, 412 double rate, 413 double burstSize, 414 double nowInSeconds = defaultClockNow()) { 415 assert(rate > 0); 416 assert(burstSize > 0); 417 418 if (burstSize < toConsume) { 419 return folly::none; 420 } 421 422 while (toConsume > 0) { 423 double consumed = 424 consumeOrDrain(toConsume, rate, burstSize, nowInSeconds); 425 if (consumed > 0) { 426 toConsume -= consumed; 427 } else { 428 bucket_.returnTokens(-toConsume, rate); 429 double debtPaid = bucket_.timeWhenBucket(rate, 0); 430 double napTime = std::max(0.0, debtPaid - nowInSeconds); 431 return napTime; 432 } 433 } 434 return 0; 435 } 436 437 /** 438 * Convenience wrapper around non-blocking borrow to sleep inline until 439 * reservation is valid. 440 */ 441 bool consumeWithBorrowAndWait( 442 double toConsume, 443 double rate, 444 double burstSize, 445 double nowInSeconds = defaultClockNow()) { 446 auto res = 447 consumeWithBorrowNonBlocking(toConsume, rate, burstSize, nowInSeconds); 448 if (res.value_or(0) > 0) { 449 const auto napUSec = static_cast<int64_t>(res.value() * 1000000); 450 std::this_thread::sleep_for(std::chrono::microseconds(napUSec)); 451 } 452 return res.has_value(); 453 } 454 455 /** 456 * Returns the number of tokens currently available. 457 * 458 * Thread-safe (but returned value may immediately be outdated). 459 */ 460 double available( 461 double rate, 462 double burstSize, 463 double nowInSeconds = defaultClockNow()) const noexcept { 464 assert(rate > 0); 465 assert(burstSize > 0); 466 return std::max(0.0, bucket_.available(rate, burstSize, nowInSeconds)); 467 } 468 469 private: 470 TokenBucketStorage<Policy> bucket_; 471 }; 472 473 /** 474 * Specialization of BasicDynamicTokenBucket with a fixed token 475 * generation rate and a fixed maximum burst size. 476 */ 477 template <typename Policy = TokenBucketPolicyDefault> 478 class BasicTokenBucket { 479 private: 480 using Impl = BasicDynamicTokenBucket<Policy>; 481 482 public: 483 /** 484 * Construct a token bucket with a specific maximum rate and burst size. 485 * 486 * @param genRate Number of tokens to generate per second. 487 * @param burstSize Maximum burst size. Must be greater than 0. 488 * @param zeroTime Initial time at which to consider the token bucket 489 * starting to fill. Defaults to 0, so by default token 490 * bucket is "full" after construction. 491 */ 492 BasicTokenBucket( 493 double genRate, double burstSize, double zeroTime = 0) noexcept tokenBucket_(zeroTime)494 : tokenBucket_(zeroTime), rate_(genRate), burstSize_(burstSize) { 495 assert(rate_ > 0); 496 assert(burstSize_ > 0); 497 } 498 499 /** 500 * Copy constructor. 501 * 502 * Warning: not thread safe! 503 */ 504 BasicTokenBucket(const BasicTokenBucket& other) noexcept = default; 505 506 /** 507 * Copy-assignment operator. 508 * 509 * Warning: not thread safe! 510 */ 511 BasicTokenBucket& operator=(const BasicTokenBucket& other) noexcept = default; 512 513 /** 514 * Returns the current time in seconds since Epoch. 515 */ defaultClockNow()516 static double defaultClockNow() noexcept(noexcept(Impl::defaultClockNow())) { 517 return Impl::defaultClockNow(); 518 } 519 520 /** 521 * Change rate and burst size. 522 * 523 * Warning: not thread safe! 524 * 525 * @param genRate Number of tokens to generate per second. 526 * @param burstSize Maximum burst size. Must be greater than 0. 527 * @param nowInSeconds Current time in seconds. Should be monotonically 528 * increasing from the nowInSeconds specified in 529 * this token bucket's constructor. 530 */ 531 void reset( 532 double genRate, 533 double burstSize, 534 double nowInSeconds = defaultClockNow()) noexcept { 535 assert(genRate > 0); 536 assert(burstSize > 0); 537 const double availTokens = available(nowInSeconds); 538 rate_ = genRate; 539 burstSize_ = burstSize; 540 setCapacity(availTokens, nowInSeconds); 541 } 542 543 /** 544 * Change number of tokens in bucket. 545 * 546 * Warning: not thread safe! 547 * 548 * @param tokens Desired number of tokens in bucket after the call. 549 * @param nowInSeconds Current time in seconds. Should be monotonically 550 * increasing from the nowInSeconds specified in 551 * this token bucket's constructor. 552 */ setCapacity(double tokens,double nowInSeconds)553 void setCapacity(double tokens, double nowInSeconds) noexcept { 554 tokenBucket_.reset(nowInSeconds - tokens / rate_); 555 } 556 557 /** 558 * Attempts to consume some number of tokens. Tokens are first added to the 559 * bucket based on the time elapsed since the last attempt to consume tokens. 560 * Note: Attempts to consume more tokens than the burst size will always 561 * fail. 562 * 563 * Thread-safe. 564 * 565 * @param toConsume The number of tokens to consume. 566 * @param nowInSeconds Current time in seconds. Should be monotonically 567 * increasing from the nowInSeconds specified in 568 * this token bucket's constructor. 569 * @return True if the rate limit check passed, false otherwise. 570 */ 571 bool consume(double toConsume, double nowInSeconds = defaultClockNow()) { 572 return tokenBucket_.consume(toConsume, rate_, burstSize_, nowInSeconds); 573 } 574 575 /** 576 * Similar to consume, but always consumes some number of tokens. If the 577 * bucket contains enough tokens - consumes toConsume tokens. Otherwise the 578 * bucket is drained. 579 * 580 * Thread-safe. 581 * 582 * @param toConsume The number of tokens to consume. 583 * @param nowInSeconds Current time in seconds. Should be monotonically 584 * increasing from the nowInSeconds specified in 585 * this token bucket's constructor. 586 * @return number of tokens that were consumed. 587 */ 588 double consumeOrDrain( 589 double toConsume, double nowInSeconds = defaultClockNow()) { 590 return tokenBucket_.consumeOrDrain( 591 toConsume, rate_, burstSize_, nowInSeconds); 592 } 593 594 /** 595 * Returns extra token back to the bucket. Could be negative--it's all good. 596 */ returnTokens(double tokensToReturn)597 void returnTokens(double tokensToReturn) { 598 return tokenBucket_.returnTokens(tokensToReturn, rate_); 599 } 600 601 /** 602 * Reserve tokens and return time to wait for in order for the reservation to 603 * be compatible with the bucket configuration. 604 */ 605 Optional<double> consumeWithBorrowNonBlocking( 606 double toConsume, double nowInSeconds = defaultClockNow()) { 607 return tokenBucket_.consumeWithBorrowNonBlocking( 608 toConsume, rate_, burstSize_, nowInSeconds); 609 } 610 611 /** 612 * Reserve tokens. Blocks if need be until reservation is satisfied. 613 */ 614 bool consumeWithBorrowAndWait( 615 double toConsume, double nowInSeconds = defaultClockNow()) { 616 return tokenBucket_.consumeWithBorrowAndWait( 617 toConsume, rate_, burstSize_, nowInSeconds); 618 } 619 620 /** 621 * Returns the number of tokens currently available. 622 * 623 * Thread-safe (but returned value may immediately be outdated). 624 */ 625 double available(double nowInSeconds = defaultClockNow()) const { 626 return tokenBucket_.available(rate_, burstSize_, nowInSeconds); 627 } 628 629 /** 630 * Returns the number of tokens generated per second. 631 * 632 * Thread-safe (but returned value may immediately be outdated). 633 */ rate()634 double rate() const noexcept { return rate_; } 635 636 /** 637 * Returns the maximum burst size. 638 * 639 * Thread-safe (but returned value may immediately be outdated). 640 */ burst()641 double burst() const noexcept { return burstSize_; } 642 643 private: 644 Impl tokenBucket_; 645 double rate_; 646 double burstSize_; 647 }; 648 649 using TokenBucket = BasicTokenBucket<>; 650 using DynamicTokenBucket = BasicDynamicTokenBucket<>; 651 652 } // namespace folly 653