1// Copyright 2015 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5// Package rate provides a rate limiter. 6package rate 7 8import ( 9 "context" 10 "fmt" 11 "math" 12 "sync" 13 "time" 14) 15 16// Limit defines the maximum frequency of some events. 17// Limit is represented as number of events per second. 18// A zero Limit allows no events. 19type Limit float64 20 21// Inf is the infinite rate limit; it allows all events (even if burst is zero). 22const Inf = Limit(math.MaxFloat64) 23 24// Every converts a minimum time interval between events to a Limit. 25func Every(interval time.Duration) Limit { 26 if interval <= 0 { 27 return Inf 28 } 29 return 1 / Limit(interval.Seconds()) 30} 31 32// A Limiter controls how frequently events are allowed to happen. 33// It implements a "token bucket" of size b, initially full and refilled 34// at rate r tokens per second. 35// Informally, in any large enough time interval, the Limiter limits the 36// rate to r tokens per second, with a maximum burst size of b events. 37// As a special case, if r == Inf (the infinite rate), b is ignored. 38// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets. 39// 40// The zero value is a valid Limiter, but it will reject all events. 41// Use NewLimiter to create non-zero Limiters. 42// 43// Limiter has three main methods, Allow, Reserve, and Wait. 44// Most callers should use Wait. 45// 46// Each of the three methods consumes a single token. 47// They differ in their behavior when no token is available. 48// If no token is available, Allow returns false. 49// If no token is available, Reserve returns a reservation for a future token 50// and the amount of time the caller must wait before using it. 51// If no token is available, Wait blocks until one can be obtained 52// or its associated context.Context is canceled. 53// 54// The methods AllowN, ReserveN, and WaitN consume n tokens. 55type Limiter struct { 56 mu sync.Mutex 57 limit Limit 58 burst int 59 tokens float64 60 // last is the last time the limiter's tokens field was updated 61 last time.Time 62 // lastEvent is the latest time of a rate-limited event (past or future) 63 lastEvent time.Time 64} 65 66// Limit returns the maximum overall event rate. 67func (lim *Limiter) Limit() Limit { 68 lim.mu.Lock() 69 defer lim.mu.Unlock() 70 return lim.limit 71} 72 73// Burst returns the maximum burst size. Burst is the maximum number of tokens 74// that can be consumed in a single call to Allow, Reserve, or Wait, so higher 75// Burst values allow more events to happen at once. 76// A zero Burst allows no events, unless limit == Inf. 77func (lim *Limiter) Burst() int { 78 lim.mu.Lock() 79 defer lim.mu.Unlock() 80 return lim.burst 81} 82 83// NewLimiter returns a new Limiter that allows events up to rate r and permits 84// bursts of at most b tokens. 85func NewLimiter(r Limit, b int) *Limiter { 86 return &Limiter{ 87 limit: r, 88 burst: b, 89 } 90} 91 92// Allow is shorthand for AllowN(time.Now(), 1). 93func (lim *Limiter) Allow() bool { 94 return lim.AllowN(time.Now(), 1) 95} 96 97// AllowN reports whether n events may happen at time now. 98// Use this method if you intend to drop / skip events that exceed the rate limit. 99// Otherwise use Reserve or Wait. 100func (lim *Limiter) AllowN(now time.Time, n int) bool { 101 return lim.reserveN(now, n, 0).ok 102} 103 104// A Reservation holds information about events that are permitted by a Limiter to happen after a delay. 105// A Reservation may be canceled, which may enable the Limiter to permit additional events. 106type Reservation struct { 107 ok bool 108 lim *Limiter 109 tokens int 110 timeToAct time.Time 111 // This is the Limit at reservation time, it can change later. 112 limit Limit 113} 114 115// OK returns whether the limiter can provide the requested number of tokens 116// within the maximum wait time. If OK is false, Delay returns InfDuration, and 117// Cancel does nothing. 118func (r *Reservation) OK() bool { 119 return r.ok 120} 121 122// Delay is shorthand for DelayFrom(time.Now()). 123func (r *Reservation) Delay() time.Duration { 124 return r.DelayFrom(time.Now()) 125} 126 127// InfDuration is the duration returned by Delay when a Reservation is not OK. 128const InfDuration = time.Duration(1<<63 - 1) 129 130// DelayFrom returns the duration for which the reservation holder must wait 131// before taking the reserved action. Zero duration means act immediately. 132// InfDuration means the limiter cannot grant the tokens requested in this 133// Reservation within the maximum wait time. 134func (r *Reservation) DelayFrom(now time.Time) time.Duration { 135 if !r.ok { 136 return InfDuration 137 } 138 delay := r.timeToAct.Sub(now) 139 if delay < 0 { 140 return 0 141 } 142 return delay 143} 144 145// Cancel is shorthand for CancelAt(time.Now()). 146func (r *Reservation) Cancel() { 147 r.CancelAt(time.Now()) 148 return 149} 150 151// CancelAt indicates that the reservation holder will not perform the reserved action 152// and reverses the effects of this Reservation on the rate limit as much as possible, 153// considering that other reservations may have already been made. 154func (r *Reservation) CancelAt(now time.Time) { 155 if !r.ok { 156 return 157 } 158 159 r.lim.mu.Lock() 160 defer r.lim.mu.Unlock() 161 162 if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) { 163 return 164 } 165 166 // calculate tokens to restore 167 // The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved 168 // after r was obtained. These tokens should not be restored. 169 restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) 170 if restoreTokens <= 0 { 171 return 172 } 173 // advance time to now 174 now, _, tokens := r.lim.advance(now) 175 // calculate new number of tokens 176 tokens += restoreTokens 177 if burst := float64(r.lim.burst); tokens > burst { 178 tokens = burst 179 } 180 // update state 181 r.lim.last = now 182 r.lim.tokens = tokens 183 if r.timeToAct == r.lim.lastEvent { 184 prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens))) 185 if !prevEvent.Before(now) { 186 r.lim.lastEvent = prevEvent 187 } 188 } 189 190 return 191} 192 193// Reserve is shorthand for ReserveN(time.Now(), 1). 194func (lim *Limiter) Reserve() *Reservation { 195 return lim.ReserveN(time.Now(), 1) 196} 197 198// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen. 199// The Limiter takes this Reservation into account when allowing future events. 200// The returned Reservation’s OK() method returns false if n exceeds the Limiter's burst size. 201// Usage example: 202// r := lim.ReserveN(time.Now(), 1) 203// if !r.OK() { 204// // Not allowed to act! Did you remember to set lim.burst to be > 0 ? 205// return 206// } 207// time.Sleep(r.Delay()) 208// Act() 209// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events. 210// If you need to respect a deadline or cancel the delay, use Wait instead. 211// To drop or skip events exceeding rate limit, use Allow instead. 212func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation { 213 r := lim.reserveN(now, n, InfDuration) 214 return &r 215} 216 217// Wait is shorthand for WaitN(ctx, 1). 218func (lim *Limiter) Wait(ctx context.Context) (err error) { 219 return lim.WaitN(ctx, 1) 220} 221 222// WaitN blocks until lim permits n events to happen. 223// It returns an error if n exceeds the Limiter's burst size, the Context is 224// canceled, or the expected wait time exceeds the Context's Deadline. 225// The burst limit is ignored if the rate limit is Inf. 226func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) { 227 lim.mu.Lock() 228 burst := lim.burst 229 limit := lim.limit 230 lim.mu.Unlock() 231 232 if n > burst && limit != Inf { 233 return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst) 234 } 235 // Check if ctx is already cancelled 236 select { 237 case <-ctx.Done(): 238 return ctx.Err() 239 default: 240 } 241 // Determine wait limit 242 now := time.Now() 243 waitLimit := InfDuration 244 if deadline, ok := ctx.Deadline(); ok { 245 waitLimit = deadline.Sub(now) 246 } 247 // Reserve 248 r := lim.reserveN(now, n, waitLimit) 249 if !r.ok { 250 return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n) 251 } 252 // Wait if necessary 253 delay := r.DelayFrom(now) 254 if delay == 0 { 255 return nil 256 } 257 t := time.NewTimer(delay) 258 defer t.Stop() 259 select { 260 case <-t.C: 261 // We can proceed. 262 return nil 263 case <-ctx.Done(): 264 // Context was canceled before we could proceed. Cancel the 265 // reservation, which may permit other events to proceed sooner. 266 r.Cancel() 267 return ctx.Err() 268 } 269} 270 271// SetLimit is shorthand for SetLimitAt(time.Now(), newLimit). 272func (lim *Limiter) SetLimit(newLimit Limit) { 273 lim.SetLimitAt(time.Now(), newLimit) 274} 275 276// SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated 277// or underutilized by those which reserved (using Reserve or Wait) but did not yet act 278// before SetLimitAt was called. 279func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit) { 280 lim.mu.Lock() 281 defer lim.mu.Unlock() 282 283 now, _, tokens := lim.advance(now) 284 285 lim.last = now 286 lim.tokens = tokens 287 lim.limit = newLimit 288} 289 290// SetBurst is shorthand for SetBurstAt(time.Now(), newBurst). 291func (lim *Limiter) SetBurst(newBurst int) { 292 lim.SetBurstAt(time.Now(), newBurst) 293} 294 295// SetBurstAt sets a new burst size for the limiter. 296func (lim *Limiter) SetBurstAt(now time.Time, newBurst int) { 297 lim.mu.Lock() 298 defer lim.mu.Unlock() 299 300 now, _, tokens := lim.advance(now) 301 302 lim.last = now 303 lim.tokens = tokens 304 lim.burst = newBurst 305} 306 307// reserveN is a helper method for AllowN, ReserveN, and WaitN. 308// maxFutureReserve specifies the maximum reservation wait duration allowed. 309// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN. 310func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation { 311 lim.mu.Lock() 312 313 if lim.limit == Inf { 314 lim.mu.Unlock() 315 return Reservation{ 316 ok: true, 317 lim: lim, 318 tokens: n, 319 timeToAct: now, 320 } 321 } 322 323 now, last, tokens := lim.advance(now) 324 325 // Calculate the remaining number of tokens resulting from the request. 326 tokens -= float64(n) 327 328 // Calculate the wait duration 329 var waitDuration time.Duration 330 if tokens < 0 { 331 waitDuration = lim.limit.durationFromTokens(-tokens) 332 } 333 334 // Decide result 335 ok := n <= lim.burst && waitDuration <= maxFutureReserve 336 337 // Prepare reservation 338 r := Reservation{ 339 ok: ok, 340 lim: lim, 341 limit: lim.limit, 342 } 343 if ok { 344 r.tokens = n 345 r.timeToAct = now.Add(waitDuration) 346 } 347 348 // Update state 349 if ok { 350 lim.last = now 351 lim.tokens = tokens 352 lim.lastEvent = r.timeToAct 353 } else { 354 lim.last = last 355 } 356 357 lim.mu.Unlock() 358 return r 359} 360 361// advance calculates and returns an updated state for lim resulting from the passage of time. 362// lim is not changed. 363// advance requires that lim.mu is held. 364func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) { 365 last := lim.last 366 if now.Before(last) { 367 last = now 368 } 369 370 // Avoid making delta overflow below when last is very old. 371 maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens) 372 elapsed := now.Sub(last) 373 if elapsed > maxElapsed { 374 elapsed = maxElapsed 375 } 376 377 // Calculate the new number of tokens, due to time that passed. 378 delta := lim.limit.tokensFromDuration(elapsed) 379 tokens := lim.tokens + delta 380 if burst := float64(lim.burst); tokens > burst { 381 tokens = burst 382 } 383 384 return now, last, tokens 385} 386 387// durationFromTokens is a unit conversion function from the number of tokens to the duration 388// of time it takes to accumulate them at a rate of limit tokens per second. 389func (limit Limit) durationFromTokens(tokens float64) time.Duration { 390 seconds := tokens / float64(limit) 391 return time.Nanosecond * time.Duration(1e9*seconds) 392} 393 394// tokensFromDuration is a unit conversion function from a time duration to the number of tokens 395// which could be accumulated during that duration at a rate of limit tokens per second. 396func (limit Limit) tokensFromDuration(d time.Duration) float64 { 397 // Split the integer and fractional parts ourself to minimize rounding errors. 398 // See golang.org/issues/34861. 399 sec := float64(d/time.Second) * float64(limit) 400 nsec := float64(d%time.Second) * float64(limit) 401 return sec + nsec/1e9 402} 403