1// Copyright 2015 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5// Package rate provides a rate limiter. 6package rate 7 8import ( 9 "fmt" 10 "math" 11 "sync" 12 "time" 13) 14 15// Limit defines the maximum frequency of some events. 16// Limit is represented as number of events per second. 17// A zero Limit allows no events. 18type Limit float64 19 20// Inf is the infinite rate limit; it allows all events (even if burst is zero). 21const Inf = Limit(math.MaxFloat64) 22 23// Every converts a minimum time interval between events to a Limit. 24func Every(interval time.Duration) Limit { 25 if interval <= 0 { 26 return Inf 27 } 28 return 1 / Limit(interval.Seconds()) 29} 30 31// A Limiter controls how frequently events are allowed to happen. 32// It implements a "token bucket" of size b, initially full and refilled 33// at rate r tokens per second. 34// Informally, in any large enough time interval, the Limiter limits the 35// rate to r tokens per second, with a maximum burst size of b events. 36// As a special case, if r == Inf (the infinite rate), b is ignored. 37// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets. 38// 39// The zero value is a valid Limiter, but it will reject all events. 40// Use NewLimiter to create non-zero Limiters. 41// 42// Limiter has three main methods, Allow, Reserve, and Wait. 43// Most callers should use Wait. 44// 45// Each of the three methods consumes a single token. 46// They differ in their behavior when no token is available. 47// If no token is available, Allow returns false. 48// If no token is available, Reserve returns a reservation for a future token 49// and the amount of time the caller must wait before using it. 50// If no token is available, Wait blocks until one can be obtained 51// or its associated context.Context is canceled. 52// 53// The methods AllowN, ReserveN, and WaitN consume n tokens. 54type Limiter struct { 55 limit Limit 56 burst int 57 58 mu sync.Mutex 59 tokens float64 60 // last is the last time the limiter's tokens field was updated 61 last time.Time 62 // lastEvent is the latest time of a rate-limited event (past or future) 63 lastEvent time.Time 64} 65 66// Limit returns the maximum overall event rate. 67func (lim *Limiter) Limit() Limit { 68 lim.mu.Lock() 69 defer lim.mu.Unlock() 70 return lim.limit 71} 72 73// Burst returns the maximum burst size. Burst is the maximum number of tokens 74// that can be consumed in a single call to Allow, Reserve, or Wait, so higher 75// Burst values allow more events to happen at once. 76// A zero Burst allows no events, unless limit == Inf. 77func (lim *Limiter) Burst() int { 78 return lim.burst 79} 80 81// NewLimiter returns a new Limiter that allows events up to rate r and permits 82// bursts of at most b tokens. 83func NewLimiter(r Limit, b int) *Limiter { 84 return &Limiter{ 85 limit: r, 86 burst: b, 87 } 88} 89 90// Allow is shorthand for AllowN(time.Now(), 1). 91func (lim *Limiter) Allow() bool { 92 return lim.AllowN(time.Now(), 1) 93} 94 95// AllowN reports whether n events may happen at time now. 96// Use this method if you intend to drop / skip events that exceed the rate limit. 97// Otherwise use Reserve or Wait. 98func (lim *Limiter) AllowN(now time.Time, n int) bool { 99 return lim.reserveN(now, n, 0).ok 100} 101 102// A Reservation holds information about events that are permitted by a Limiter to happen after a delay. 103// A Reservation may be canceled, which may enable the Limiter to permit additional events. 104type Reservation struct { 105 ok bool 106 lim *Limiter 107 tokens int 108 timeToAct time.Time 109 // This is the Limit at reservation time, it can change later. 110 limit Limit 111} 112 113// OK returns whether the limiter can provide the requested number of tokens 114// within the maximum wait time. If OK is false, Delay returns InfDuration, and 115// Cancel does nothing. 116func (r *Reservation) OK() bool { 117 return r.ok 118} 119 120// Delay is shorthand for DelayFrom(time.Now()). 121func (r *Reservation) Delay() time.Duration { 122 return r.DelayFrom(time.Now()) 123} 124 125// InfDuration is the duration returned by Delay when a Reservation is not OK. 126const InfDuration = time.Duration(1<<63 - 1) 127 128// DelayFrom returns the duration for which the reservation holder must wait 129// before taking the reserved action. Zero duration means act immediately. 130// InfDuration means the limiter cannot grant the tokens requested in this 131// Reservation within the maximum wait time. 132func (r *Reservation) DelayFrom(now time.Time) time.Duration { 133 if !r.ok { 134 return InfDuration 135 } 136 delay := r.timeToAct.Sub(now) 137 if delay < 0 { 138 return 0 139 } 140 return delay 141} 142 143// Cancel is shorthand for CancelAt(time.Now()). 144func (r *Reservation) Cancel() { 145 r.CancelAt(time.Now()) 146 return 147} 148 149// CancelAt indicates that the reservation holder will not perform the reserved action 150// and reverses the effects of this Reservation on the rate limit as much as possible, 151// considering that other reservations may have already been made. 152func (r *Reservation) CancelAt(now time.Time) { 153 if !r.ok { 154 return 155 } 156 157 r.lim.mu.Lock() 158 defer r.lim.mu.Unlock() 159 160 if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) { 161 return 162 } 163 164 // calculate tokens to restore 165 // The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved 166 // after r was obtained. These tokens should not be restored. 167 restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) 168 if restoreTokens <= 0 { 169 return 170 } 171 // advance time to now 172 now, _, tokens := r.lim.advance(now) 173 // calculate new number of tokens 174 tokens += restoreTokens 175 if burst := float64(r.lim.burst); tokens > burst { 176 tokens = burst 177 } 178 // update state 179 r.lim.last = now 180 r.lim.tokens = tokens 181 if r.timeToAct == r.lim.lastEvent { 182 prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens))) 183 if !prevEvent.Before(now) { 184 r.lim.lastEvent = prevEvent 185 } 186 } 187 188 return 189} 190 191// Reserve is shorthand for ReserveN(time.Now(), 1). 192func (lim *Limiter) Reserve() *Reservation { 193 return lim.ReserveN(time.Now(), 1) 194} 195 196// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen. 197// The Limiter takes this Reservation into account when allowing future events. 198// ReserveN returns false if n exceeds the Limiter's burst size. 199// Usage example: 200// r := lim.ReserveN(time.Now(), 1) 201// if !r.OK() { 202// // Not allowed to act! Did you remember to set lim.burst to be > 0 ? 203// return 204// } 205// time.Sleep(r.Delay()) 206// Act() 207// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events. 208// If you need to respect a deadline or cancel the delay, use Wait instead. 209// To drop or skip events exceeding rate limit, use Allow instead. 210func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation { 211 r := lim.reserveN(now, n, InfDuration) 212 return &r 213} 214 215// contextContext is a temporary(?) copy of the context.Context type 216// to support both Go 1.6 using golang.org/x/net/context and Go 1.7+ 217// with the built-in context package. If people ever stop using Go 1.6 218// we can remove this. 219type contextContext interface { 220 Deadline() (deadline time.Time, ok bool) 221 Done() <-chan struct{} 222 Err() error 223 Value(key interface{}) interface{} 224} 225 226// Wait is shorthand for WaitN(ctx, 1). 227func (lim *Limiter) wait(ctx contextContext) (err error) { 228 return lim.WaitN(ctx, 1) 229} 230 231// WaitN blocks until lim permits n events to happen. 232// It returns an error if n exceeds the Limiter's burst size, the Context is 233// canceled, or the expected wait time exceeds the Context's Deadline. 234// The burst limit is ignored if the rate limit is Inf. 235func (lim *Limiter) waitN(ctx contextContext, n int) (err error) { 236 if n > lim.burst && lim.limit != Inf { 237 return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst) 238 } 239 // Check if ctx is already cancelled 240 select { 241 case <-ctx.Done(): 242 return ctx.Err() 243 default: 244 } 245 // Determine wait limit 246 now := time.Now() 247 waitLimit := InfDuration 248 if deadline, ok := ctx.Deadline(); ok { 249 waitLimit = deadline.Sub(now) 250 } 251 // Reserve 252 r := lim.reserveN(now, n, waitLimit) 253 if !r.ok { 254 return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n) 255 } 256 // Wait if necessary 257 delay := r.DelayFrom(now) 258 if delay == 0 { 259 return nil 260 } 261 t := time.NewTimer(delay) 262 defer t.Stop() 263 select { 264 case <-t.C: 265 // We can proceed. 266 return nil 267 case <-ctx.Done(): 268 // Context was canceled before we could proceed. Cancel the 269 // reservation, which may permit other events to proceed sooner. 270 r.Cancel() 271 return ctx.Err() 272 } 273} 274 275// SetLimit is shorthand for SetLimitAt(time.Now(), newLimit). 276func (lim *Limiter) SetLimit(newLimit Limit) { 277 lim.SetLimitAt(time.Now(), newLimit) 278} 279 280// SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated 281// or underutilized by those which reserved (using Reserve or Wait) but did not yet act 282// before SetLimitAt was called. 283func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit) { 284 lim.mu.Lock() 285 defer lim.mu.Unlock() 286 287 now, _, tokens := lim.advance(now) 288 289 lim.last = now 290 lim.tokens = tokens 291 lim.limit = newLimit 292} 293 294// reserveN is a helper method for AllowN, ReserveN, and WaitN. 295// maxFutureReserve specifies the maximum reservation wait duration allowed. 296// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN. 297func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation { 298 lim.mu.Lock() 299 300 if lim.limit == Inf { 301 lim.mu.Unlock() 302 return Reservation{ 303 ok: true, 304 lim: lim, 305 tokens: n, 306 timeToAct: now, 307 } 308 } 309 310 now, last, tokens := lim.advance(now) 311 312 // Calculate the remaining number of tokens resulting from the request. 313 tokens -= float64(n) 314 315 // Calculate the wait duration 316 var waitDuration time.Duration 317 if tokens < 0 { 318 waitDuration = lim.limit.durationFromTokens(-tokens) 319 } 320 321 // Decide result 322 ok := n <= lim.burst && waitDuration <= maxFutureReserve 323 324 // Prepare reservation 325 r := Reservation{ 326 ok: ok, 327 lim: lim, 328 limit: lim.limit, 329 } 330 if ok { 331 r.tokens = n 332 r.timeToAct = now.Add(waitDuration) 333 } 334 335 // Update state 336 if ok { 337 lim.last = now 338 lim.tokens = tokens 339 lim.lastEvent = r.timeToAct 340 } else { 341 lim.last = last 342 } 343 344 lim.mu.Unlock() 345 return r 346} 347 348// advance calculates and returns an updated state for lim resulting from the passage of time. 349// lim is not changed. 350func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) { 351 last := lim.last 352 if now.Before(last) { 353 last = now 354 } 355 356 // Avoid making delta overflow below when last is very old. 357 maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens) 358 elapsed := now.Sub(last) 359 if elapsed > maxElapsed { 360 elapsed = maxElapsed 361 } 362 363 // Calculate the new number of tokens, due to time that passed. 364 delta := lim.limit.tokensFromDuration(elapsed) 365 tokens := lim.tokens + delta 366 if burst := float64(lim.burst); tokens > burst { 367 tokens = burst 368 } 369 370 return now, last, tokens 371} 372 373// durationFromTokens is a unit conversion function from the number of tokens to the duration 374// of time it takes to accumulate them at a rate of limit tokens per second. 375func (limit Limit) durationFromTokens(tokens float64) time.Duration { 376 seconds := tokens / float64(limit) 377 return time.Nanosecond * time.Duration(1e9*seconds) 378} 379 380// tokensFromDuration is a unit conversion function from a time duration to the number of tokens 381// which could be accumulated during that duration at a rate of limit tokens per second. 382func (limit Limit) tokensFromDuration(d time.Duration) float64 { 383 return d.Seconds() * float64(limit) 384} 385