1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <thread>
#include <type_traits>

#include <folly/ConstexprMath.h>
#include <folly/Likely.h>
#include <folly/Optional.h>
#include <folly/concurrency/CacheLocality.h>
28 
29 namespace folly {
30 
31 struct TokenBucketPolicyDefault {
32   using align =
33       std::integral_constant<size_t, hardware_destructive_interference_size>;
34 
35   template <typename T>
36   using atom = std::atomic<T>;
37 
38   using clock = std::chrono::steady_clock;
39 
40   using concurrent = std::true_type;
41 };
42 
43 /**
44  * Thread-safe (atomic) token bucket primitive.
45  *
46  * This primitive can be used to implement a token bucket
47  * (http://en.wikipedia.org/wiki/Token_bucket).  It handles
48  * the storage of the state in an atomic way, and presents
49  * an interface dealing with tokens, rate, burstSize and time.
50  *
51  * This primitive records the last time it was updated. This allows the
52  * token bucket to add tokens "just in time" when tokens are requested.
53  *
54  * @tparam Policy A policy.
55  */
56 template <typename Policy = TokenBucketPolicyDefault>
57 class TokenBucketStorage {
58   template <typename T>
59   using Atom = typename Policy::template atom<T>;
60   using Align = typename Policy::align;
61   using Clock = typename Policy::clock; // do we need clock here?
62   using Concurrent = typename Policy::concurrent;
63 
64   static_assert(Clock::is_steady, "clock must be steady"); // do we need clock?
65 
66  public:
67   /**
68    * Constructor.
69    *
70    * @param zeroTime Initial time at which to consider the token bucket
71    *                 starting to fill. Defaults to 0, so by default token
72    *                 buckets are "full" after construction.
73    */
74   explicit TokenBucketStorage(double zeroTime = 0) noexcept
zeroTime_(zeroTime)75       : zeroTime_(zeroTime) {}
76 
77   /**
78    * Copy constructor.
79    *
80    * Thread-safe. (Copy constructors of derived classes may not be thread-safe
81    * however.)
82    */
TokenBucketStorage(const TokenBucketStorage & other)83   TokenBucketStorage(const TokenBucketStorage& other) noexcept
84       : zeroTime_(other.zeroTime_.load(std::memory_order_relaxed)) {}
85 
86   /**
87    * Copy-assignment operator.
88    *
89    * Warning: not thread safe for the object being assigned to (including
90    * self-assignment). Thread-safe for the other object.
91    */
92   TokenBucketStorage& operator=(const TokenBucketStorage& other) noexcept {
93     zeroTime_.store(other.zeroTime(), std::memory_order_relaxed);
94     return *this;
95   }
96 
97   /**
98    * Re-initialize token bucket.
99    *
100    * Thread-safe.
101    *
102    * @param zeroTime Initial time at which to consider the token bucket
103    *                 starting to fill. Defaults to 0, so by default token
104    *                 bucket is reset to "full".
105    */
106   void reset(double zeroTime = 0) noexcept {
107     zeroTime_.store(zeroTime, std::memory_order_relaxed);
108   }
109 
110   /**
111    * Returns the number of tokens currently available.  This could be negative
112    * (if in debt); will be a most burstSize.
113    *
114    *
115    * Thread-safe (but returned values may immediately be outdated).
116    */
available(double rate,double burstSize,double nowInSeconds)117   double available(
118       double rate, double burstSize, double nowInSeconds) const noexcept {
119     assert(rate > 0);
120     assert(burstSize > 0);
121 
122     double zt = this->zeroTime_.load(std::memory_order_relaxed);
123     return std::min((nowInSeconds - zt) * rate, burstSize);
124   }
125 
126   /**
127    * Consume tokens at the given rate/burst/time.
128    *
129    * Consumption is actually done by the callback function: it's given a
130    * reference with the number of available tokens and returns the number
131    * consumed.  Typically the return value would be between 0.0 and available,
132    * but there are no restrictions.
133    *
134    * Note: the callback may be called multiple times, so please no side-effects
135    */
136   template <typename Callback>
consume(double rate,double burstSize,double nowInSeconds,const Callback & callback)137   double consume(
138       double rate,
139       double burstSize,
140       double nowInSeconds,
141       const Callback& callback) {
142     assert(rate > 0);
143     assert(burstSize > 0);
144 
145     double zeroTimeOld;
146     double zeroTimeNew;
147     double consumed;
148     do {
149       zeroTimeOld = zeroTime();
150       double tokens = std::min((nowInSeconds - zeroTimeOld) * rate, burstSize);
151       consumed = callback(tokens);
152       double tokensNew = tokens - consumed;
153       if (consumed == 0.0) {
154         return consumed;
155       }
156 
157       zeroTimeNew = nowInSeconds - tokensNew / rate;
158     } while (UNLIKELY(
159         !compare_exchange_weak_relaxed(zeroTime_, zeroTimeOld, zeroTimeNew)));
160 
161     return consumed;
162   }
163 
164   /**
165    * returns the time at which the bucket will have `target` tokens available.
166    *
167    * Caution: it doesn't make sense to ask about target > burstSize
168    *
169    * Eg.
170    *     // time debt repaid
171    *     bucket.timeWhenBucket(rate, 0);
172    *
173    *     // time bucket is full
174    *     bucket.timeWhenBucket(rate, burstSize);
175    */
176 
timeWhenBucket(double rate,double target)177   double timeWhenBucket(double rate, double target) {
178     return zeroTime() + target / rate;
179   }
180 
181   /**
182    * Return extra tokens back to the bucket.
183    *
184    * Thread-safe.
185    */
returnTokens(double tokensToReturn,double rate)186   void returnTokens(double tokensToReturn, double rate) {
187     assert(rate > 0);
188 
189     returnTokensImpl(tokensToReturn, rate);
190   }
191 
192  private:
193   /**
194    * Adjust zeroTime based on rate and tokenCount and return the new value of
195    * zeroTime_. Note: Token count can be negative to move the zeroTime_
196    * into the future.
197    */
returnTokensImpl(double tokenCount,double rate)198   double returnTokensImpl(double tokenCount, double rate) {
199     auto zeroTimeOld = zeroTime_.load(std::memory_order_relaxed);
200 
201     double zeroTimeNew;
202     do {
203       zeroTimeNew = zeroTimeOld - tokenCount / rate;
204 
205     } while (UNLIKELY(
206         !compare_exchange_weak_relaxed(zeroTime_, zeroTimeOld, zeroTimeNew)));
207     return zeroTimeNew;
208   }
209 
compare_exchange_weak_relaxed(Atom<double> & atom,double & expected,double zeroTime)210   static bool compare_exchange_weak_relaxed(
211 
212       Atom<double>& atom, double& expected, double zeroTime) {
213     if (Concurrent::value) {
214       return atom.compare_exchange_weak(
215           expected, zeroTime, std::memory_order_relaxed);
216     } else {
217       return atom.store(zeroTime, std::memory_order_relaxed), true;
218     }
219   }
220 
zeroTime()221   double zeroTime() const {
222     return this->zeroTime_.load(std::memory_order_relaxed);
223   }
224 
225   static constexpr size_t AlignZeroTime =
226       constexpr_max(Align::value, alignof(Atom<double>));
227   alignas(AlignZeroTime) Atom<double> zeroTime_;
228 };
229 
230 /**
231  * Thread-safe (atomic) token bucket implementation.
232  *
233  * A token bucket (http://en.wikipedia.org/wiki/Token_bucket) models a stream
234  * of events with an average rate and some amount of burstiness. The canonical
235  * example is a packet switched network: the network can accept some number of
236  * bytes per second and the bytes come in finite packets (bursts). A token
237  * bucket stores up to a fixed number of tokens (the burst size). Some number
238  * of tokens are removed when an event occurs. The tokens are replenished at a
239  * fixed rate. Failure to allocate tokens implies resource is unavailable and
240  * caller needs to implement its own retry mechanism. For simple cases where
241  * caller is okay with a FIFO starvation-free scheduling behavior, there are
242  * also APIs to 'borrow' from the future effectively assigning a start time to
243  * the caller when it should proceed with using the resource. It is also
244  * possible to 'return' previously allocated tokens to make them available to
245  * other users. Returns in excess of burstSize are considered expired and
246  * will not be available to later callers.
247  *
248  * This implementation records the last time it was updated. This allows the
249  * token bucket to add tokens "just in time" when tokens are requested.
250  *
251  * The "dynamic" base variant allows the token generation rate and maximum
252  * burst size to change with every token consumption.
253  *
254  * @tparam Policy A policy.
255  */
256 template <typename Policy = TokenBucketPolicyDefault>
257 class BasicDynamicTokenBucket {
258   template <typename T>
259   using Atom = typename Policy::template atom<T>;
260   using Align = typename Policy::align;
261   using Clock = typename Policy::clock;
262   using Concurrent = typename Policy::concurrent;
263 
264   static_assert(Clock::is_steady, "clock must be steady");
265 
266  public:
267   /**
268    * Constructor.
269    *
270    * @param zeroTime Initial time at which to consider the token bucket
271    *                 starting to fill. Defaults to 0, so by default token
272    *                 buckets are "full" after construction.
273    */
274   explicit BasicDynamicTokenBucket(double zeroTime = 0) noexcept
bucket_(zeroTime)275       : bucket_(zeroTime) {}
276 
277   /**
278    * Copy constructor and copy assignment operator.
279    *
280    * Thread-safe. (Copy constructors of derived classes may not be thread-safe
281    * however.)
282    */
283   BasicDynamicTokenBucket(const BasicDynamicTokenBucket& other) noexcept =
284       default;
285   BasicDynamicTokenBucket& operator=(
286       const BasicDynamicTokenBucket& other) noexcept = default;
287 
288   /**
289    * Re-initialize token bucket.
290    *
291    * Thread-safe.
292    *
293    * @param zeroTime Initial time at which to consider the token bucket
294    *                 starting to fill. Defaults to 0, so by default token
295    *                 bucket is reset to "full".
296    */
297   void reset(double zeroTime = 0) noexcept { bucket_.reset(zeroTime); }
298 
299   /**
300    * Returns the current time in seconds since Epoch.
301    */
defaultClockNow()302   static double defaultClockNow() noexcept {
303     auto const now = Clock::now().time_since_epoch();
304     return std::chrono::duration<double>(now).count();
305   }
306 
307   /**
308    * Attempts to consume some number of tokens. Tokens are first added to the
309    * bucket based on the time elapsed since the last attempt to consume tokens.
310    * Note: Attempts to consume more tokens than the burst size will always
311    * fail.
312    *
313    * Thread-safe.
314    *
315    * @param toConsume The number of tokens to consume.
316    * @param rate Number of tokens to generate per second.
317    * @param burstSize Maximum burst size. Must be greater than 0.
318    * @param nowInSeconds Current time in seconds. Should be monotonically
319    *                     increasing from the nowInSeconds specified in
320    *                     this token bucket's constructor.
321    * @return True if the rate limit check passed, false otherwise.
322    */
323   bool consume(
324       double toConsume,
325       double rate,
326       double burstSize,
327       double nowInSeconds = defaultClockNow()) {
328     assert(rate > 0);
329     assert(burstSize > 0);
330 
331     if (bucket_.available(rate, burstSize, nowInSeconds) < 0.0) {
332       return 0;
333     }
334 
335     double consumed = bucket_.consume(
336         rate, burstSize, nowInSeconds, [toConsume](double available) {
337           return available < toConsume ? 0.0 : toConsume;
338         });
339 
340     assert(consumed == toConsume || consumed == 0.0);
341     return consumed == toConsume;
342   }
343 
344   /**
345    * Similar to consume, but always consumes some number of tokens.  If the
346    * bucket contains enough tokens - consumes toConsume tokens.  Otherwise the
347    * bucket is drained.
348    *
349    * Thread-safe.
350    *
351    * @param toConsume The number of tokens to consume.
352    * @param rate Number of tokens to generate per second.
353    * @param burstSize Maximum burst size. Must be greater than 0.
354    * @param nowInSeconds Current time in seconds. Should be monotonically
355    *                     increasing from the nowInSeconds specified in
356    *                     this token bucket's constructor.
357    * @return number of tokens that were consumed.
358    */
359   double consumeOrDrain(
360       double toConsume,
361       double rate,
362       double burstSize,
363       double nowInSeconds = defaultClockNow()) {
364     assert(rate > 0);
365     assert(burstSize > 0);
366 
367     if (bucket_.available(rate, burstSize, nowInSeconds) <= 0.0) {
368       return 0;
369     }
370 
371     double consumed = bucket_.consume(
372         rate, burstSize, nowInSeconds, [toConsume](double available) {
373           return constexpr_min(available, toConsume);
374         });
375     return consumed;
376   }
377 
378   /**
379    * Return extra tokens back to the bucket.
380    *
381    * Thread-safe.
382    */
returnTokens(double tokensToReturn,double rate)383   void returnTokens(double tokensToReturn, double rate) {
384     assert(rate > 0);
385     assert(tokensToReturn > 0);
386 
387     bucket_.returnTokens(tokensToReturn, rate);
388   }
389 
390   /**
391    * Like consumeOrDrain but the call will always satisfy the asked for count.
392    * It does so by borrowing tokens from the future if the currently available
393    * count isn't sufficient.
394    *
395    * Returns a folly::Optional<double>. The optional wont be set if the request
396    * cannot be satisfied: only case is when it is larger than burstSize. The
397    * value of the optional is a double indicating the time in seconds that the
398    * caller needs to wait at which the reservation becomes valid. The caller
399    * could simply sleep for the returned duration to smooth out the allocation
400    * to match the rate limiter or do some other computation in the meantime. In
401    * any case, any regular consume or consumeOrDrain calls will fail to allocate
402    * any tokens until the future time is reached.
403    *
404    * Note: It is assumed the caller will not ask for a very large count nor use
405    * it immediately (if not waiting inline) as that would break the burst
406    * prevention the limiter is meant to be used for.
407    *
408    * Thread-safe.
409    */
410   Optional<double> consumeWithBorrowNonBlocking(
411       double toConsume,
412       double rate,
413       double burstSize,
414       double nowInSeconds = defaultClockNow()) {
415     assert(rate > 0);
416     assert(burstSize > 0);
417 
418     if (burstSize < toConsume) {
419       return folly::none;
420     }
421 
422     while (toConsume > 0) {
423       double consumed =
424           consumeOrDrain(toConsume, rate, burstSize, nowInSeconds);
425       if (consumed > 0) {
426         toConsume -= consumed;
427       } else {
428         bucket_.returnTokens(-toConsume, rate);
429         double debtPaid = bucket_.timeWhenBucket(rate, 0);
430         double napTime = std::max(0.0, debtPaid - nowInSeconds);
431         return napTime;
432       }
433     }
434     return 0;
435   }
436 
437   /**
438    * Convenience wrapper around non-blocking borrow to sleep inline until
439    * reservation is valid.
440    */
441   bool consumeWithBorrowAndWait(
442       double toConsume,
443       double rate,
444       double burstSize,
445       double nowInSeconds = defaultClockNow()) {
446     auto res =
447         consumeWithBorrowNonBlocking(toConsume, rate, burstSize, nowInSeconds);
448     if (res.value_or(0) > 0) {
449       const auto napUSec = static_cast<int64_t>(res.value() * 1000000);
450       std::this_thread::sleep_for(std::chrono::microseconds(napUSec));
451     }
452     return res.has_value();
453   }
454 
455   /**
456    * Returns the number of tokens currently available.
457    *
458    * Thread-safe (but returned value may immediately be outdated).
459    */
460   double available(
461       double rate,
462       double burstSize,
463       double nowInSeconds = defaultClockNow()) const noexcept {
464     assert(rate > 0);
465     assert(burstSize > 0);
466     return std::max(0.0, bucket_.available(rate, burstSize, nowInSeconds));
467   }
468 
469  private:
470   TokenBucketStorage<Policy> bucket_;
471 };
472 
473 /**
474  * Specialization of BasicDynamicTokenBucket with a fixed token
475  * generation rate and a fixed maximum burst size.
476  */
477 template <typename Policy = TokenBucketPolicyDefault>
478 class BasicTokenBucket {
479  private:
480   using Impl = BasicDynamicTokenBucket<Policy>;
481 
482  public:
483   /**
484    * Construct a token bucket with a specific maximum rate and burst size.
485    *
486    * @param genRate Number of tokens to generate per second.
487    * @param burstSize Maximum burst size. Must be greater than 0.
488    * @param zeroTime Initial time at which to consider the token bucket
489    *                 starting to fill. Defaults to 0, so by default token
490    *                 bucket is "full" after construction.
491    */
492   BasicTokenBucket(
493       double genRate, double burstSize, double zeroTime = 0) noexcept
tokenBucket_(zeroTime)494       : tokenBucket_(zeroTime), rate_(genRate), burstSize_(burstSize) {
495     assert(rate_ > 0);
496     assert(burstSize_ > 0);
497   }
498 
499   /**
500    * Copy constructor.
501    *
502    * Warning: not thread safe!
503    */
504   BasicTokenBucket(const BasicTokenBucket& other) noexcept = default;
505 
506   /**
507    * Copy-assignment operator.
508    *
509    * Warning: not thread safe!
510    */
511   BasicTokenBucket& operator=(const BasicTokenBucket& other) noexcept = default;
512 
513   /**
514    * Returns the current time in seconds since Epoch.
515    */
defaultClockNow()516   static double defaultClockNow() noexcept(noexcept(Impl::defaultClockNow())) {
517     return Impl::defaultClockNow();
518   }
519 
520   /**
521    * Change rate and burst size.
522    *
523    * Warning: not thread safe!
524    *
525    * @param genRate Number of tokens to generate per second.
526    * @param burstSize Maximum burst size. Must be greater than 0.
527    * @param nowInSeconds Current time in seconds. Should be monotonically
528    *                     increasing from the nowInSeconds specified in
529    *                     this token bucket's constructor.
530    */
531   void reset(
532       double genRate,
533       double burstSize,
534       double nowInSeconds = defaultClockNow()) noexcept {
535     assert(genRate > 0);
536     assert(burstSize > 0);
537     const double availTokens = available(nowInSeconds);
538     rate_ = genRate;
539     burstSize_ = burstSize;
540     setCapacity(availTokens, nowInSeconds);
541   }
542 
543   /**
544    * Change number of tokens in bucket.
545    *
546    * Warning: not thread safe!
547    *
548    * @param tokens Desired number of tokens in bucket after the call.
549    * @param nowInSeconds Current time in seconds. Should be monotonically
550    *                     increasing from the nowInSeconds specified in
551    *                     this token bucket's constructor.
552    */
setCapacity(double tokens,double nowInSeconds)553   void setCapacity(double tokens, double nowInSeconds) noexcept {
554     tokenBucket_.reset(nowInSeconds - tokens / rate_);
555   }
556 
557   /**
558    * Attempts to consume some number of tokens. Tokens are first added to the
559    * bucket based on the time elapsed since the last attempt to consume tokens.
560    * Note: Attempts to consume more tokens than the burst size will always
561    * fail.
562    *
563    * Thread-safe.
564    *
565    * @param toConsume The number of tokens to consume.
566    * @param nowInSeconds Current time in seconds. Should be monotonically
567    *                     increasing from the nowInSeconds specified in
568    *                     this token bucket's constructor.
569    * @return True if the rate limit check passed, false otherwise.
570    */
571   bool consume(double toConsume, double nowInSeconds = defaultClockNow()) {
572     return tokenBucket_.consume(toConsume, rate_, burstSize_, nowInSeconds);
573   }
574 
575   /**
576    * Similar to consume, but always consumes some number of tokens.  If the
577    * bucket contains enough tokens - consumes toConsume tokens.  Otherwise the
578    * bucket is drained.
579    *
580    * Thread-safe.
581    *
582    * @param toConsume The number of tokens to consume.
583    * @param nowInSeconds Current time in seconds. Should be monotonically
584    *                     increasing from the nowInSeconds specified in
585    *                     this token bucket's constructor.
586    * @return number of tokens that were consumed.
587    */
588   double consumeOrDrain(
589       double toConsume, double nowInSeconds = defaultClockNow()) {
590     return tokenBucket_.consumeOrDrain(
591         toConsume, rate_, burstSize_, nowInSeconds);
592   }
593 
594   /**
595    * Returns extra token back to the bucket.  Could be negative--it's all good.
596    */
returnTokens(double tokensToReturn)597   void returnTokens(double tokensToReturn) {
598     return tokenBucket_.returnTokens(tokensToReturn, rate_);
599   }
600 
601   /**
602    * Reserve tokens and return time to wait for in order for the reservation to
603    * be compatible with the bucket configuration.
604    */
605   Optional<double> consumeWithBorrowNonBlocking(
606       double toConsume, double nowInSeconds = defaultClockNow()) {
607     return tokenBucket_.consumeWithBorrowNonBlocking(
608         toConsume, rate_, burstSize_, nowInSeconds);
609   }
610 
611   /**
612    * Reserve tokens. Blocks if need be until reservation is satisfied.
613    */
614   bool consumeWithBorrowAndWait(
615       double toConsume, double nowInSeconds = defaultClockNow()) {
616     return tokenBucket_.consumeWithBorrowAndWait(
617         toConsume, rate_, burstSize_, nowInSeconds);
618   }
619 
620   /**
621    * Returns the number of tokens currently available.
622    *
623    * Thread-safe (but returned value may immediately be outdated).
624    */
625   double available(double nowInSeconds = defaultClockNow()) const {
626     return tokenBucket_.available(rate_, burstSize_, nowInSeconds);
627   }
628 
629   /**
630    * Returns the number of tokens generated per second.
631    *
632    * Thread-safe (but returned value may immediately be outdated).
633    */
rate()634   double rate() const noexcept { return rate_; }
635 
636   /**
637    * Returns the maximum burst size.
638    *
639    * Thread-safe (but returned value may immediately be outdated).
640    */
burst()641   double burst() const noexcept { return burstSize_; }
642 
643  private:
644   Impl tokenBucket_;
645   double rate_;
646   double burstSize_;
647 };
648 
649 using TokenBucket = BasicTokenBucket<>;
650 using DynamicTokenBucket = BasicDynamicTokenBucket<>;
651 
652 } // namespace folly
653