// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

import (
	"sync"
	"time"
)

// RateLimiter is a filter used to check if a message that is worth itemCost units is within the rate limits.
// CheckCredit reports whether an item costing itemCost credits is allowed.
//
// TODO (breaking change) remove this interface in favor of public struct below
//
// Deprecated: use ReconfigurableRateLimiter.
type RateLimiter interface {
	CheckCredit(itemCost float64) bool
}

// ReconfigurableRateLimiter is a rate limiter based on leaky bucket algorithm, formulated in terms of a
// credits balance that is replenished when the CheckCredit() method is called (tick) by the amount proportional
// to the time elapsed since the last tick, up to a maximum of maxBalance. A call to CheckCredit() takes a cost
// of an item we want to pay with the balance. If the balance is at least the cost of the item, the item is
// "purchased" and the balance reduced, indicated by a returned value of true. Otherwise the item is not
// purchased and false is returned.
//
// This can be used to limit a rate of messages emitted by a service by instantiating the Rate Limiter with the
// max number of messages a service is allowed to emit per second, and calling CheckCredit(1.0) for each message
// to determine if the message is within the rate limit.
//
// It can also be used to limit the rate of traffic in bytes, by setting creditsPerSecond to desired throughput
// as bytes/second, and calling CheckCredit() with the actual message size.
//
// TODO (breaking change) rename to RateLimiter once the interface is removed
type ReconfigurableRateLimiter struct {
	lock sync.Mutex // held by CheckCredit and Update while they touch the fields below

	creditsPerSecond float64   // credits added to balance per second of elapsed time
	balance          float64   // credits currently available to spend
	maxBalance       float64   // cap applied to balance whenever it is replenished
	lastTick         time.Time // construction time, or the time of the latest updateBalance call

	timeNow func() time.Time // clock used by updateBalance; NewRateLimiter sets it to time.Now
}

// NewRateLimiter creates a new ReconfigurableRateLimiter that earns creditsPerSecond credits
// per second and holds at most maxBalance credits. The limiter starts with a full balance
// (balance == maxBalance).
func NewRateLimiter(creditsPerSecond, maxBalance float64) *ReconfigurableRateLimiter {
	return &ReconfigurableRateLimiter{
		creditsPerSecond: creditsPerSecond,
		balance:          maxBalance,
		maxBalance:       maxBalance,
		lastTick:         time.Now(),
		timeNow:          time.Now,
	}
}

// CheckCredit tries to reduce the current balance by itemCost provided that the current balance
// is not less than itemCost. It returns true when the item was paid for, and false when even the
// replenished balance cannot cover itemCost (in which case nothing is deducted).
func (rl *ReconfigurableRateLimiter) CheckCredit(itemCost float64) bool {
	rl.lock.Lock()
	defer rl.lock.Unlock()

	// if we have enough credits to pay for current item, then reduce balance and allow;
	// this path does not consult the clock at all
	if rl.balance >= itemCost {
		rl.balance -= itemCost
		return true
	}
	// otherwise check if balance can be increased due to time elapsed, and try again
	rl.updateBalance()
	if rl.balance >= itemCost {
		rl.balance -= itemCost
		return true
	}
	return false
}

// updateBalance recalculates current balance based on time elapsed. Must be called while holding a lock.
func (rl *ReconfigurableRateLimiter) updateBalance() {
	// calculate how much time passed since the last tick, and update current tick
	currentTime := rl.timeNow()
	elapsedTime := currentTime.Sub(rl.lastTick)
	rl.lastTick = currentTime
	// calculate how much credit have we accumulated since the last tick
	rl.balance += elapsedTime.Seconds() * rl.creditsPerSecond
	if rl.balance > rl.maxBalance {
		rl.balance = rl.maxBalance
	}
}

// Update changes the main parameters of the rate limiter in-place, while retaining
// the current accumulated balance (pro-rated to the new maxBalance value). Using this method
// instead of creating a new rate limiter helps to avoid thundering herd when sampling
// strategies are updated.
//
// If the limiter's current maxBalance is zero there is no ratio to pro-rate by, so the
// balance is carried over unchanged.
func (rl *ReconfigurableRateLimiter) Update(creditsPerSecond, maxBalance float64) {
	rl.lock.Lock()
	defer rl.lock.Unlock()

	rl.updateBalance() // get up to date balance
	// Dividing by a zero maxBalance would yield NaN (0 * x / 0); a NaN balance fails every
	// later comparison, so the limiter would reject all items forever.
	if rl.maxBalance != 0 {
		rl.balance = rl.balance * maxBalance / rl.maxBalance
	}
	rl.creditsPerSecond = creditsPerSecond
	rl.maxBalance = maxBalance
}