1/* 2Copyright 2014 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package flowcontrol 18 19import ( 20 "sync" 21 "time" 22 23 "golang.org/x/time/rate" 24) 25 26type RateLimiter interface { 27 // TryAccept returns true if a token is taken immediately. Otherwise, 28 // it returns false. 29 TryAccept() bool 30 // Accept returns once a token becomes available. 31 Accept() 32 // Stop stops the rate limiter, subsequent calls to CanAccept will return false 33 Stop() 34 // QPS returns QPS of this rate limiter 35 QPS() float32 36} 37 38type tokenBucketRateLimiter struct { 39 limiter *rate.Limiter 40 clock Clock 41 qps float32 42} 43 44// NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach. 45// The rate limiter allows bursts of up to 'burst' to exceed the QPS, while still maintaining a 46// smoothed qps rate of 'qps'. 47// The bucket is initially filled with 'burst' tokens, and refills at a rate of 'qps'. 48// The maximum number of tokens in the bucket is capped at 'burst'. 49func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter { 50 limiter := rate.NewLimiter(rate.Limit(qps), burst) 51 return newTokenBucketRateLimiter(limiter, realClock{}, qps) 52} 53 54// An injectable, mockable clock interface. 55type Clock interface { 56 Now() time.Time 57 Sleep(time.Duration) 58} 59 60type realClock struct{} 61 62func (realClock) Now() time.Time { 63 return time.Now() 64} 65func (realClock) Sleep(d time.Duration) { 66 time.Sleep(d) 67} 68 69// NewTokenBucketRateLimiterWithClock is identical to NewTokenBucketRateLimiter 70// but allows an injectable clock, for testing. 71func NewTokenBucketRateLimiterWithClock(qps float32, burst int, c Clock) RateLimiter { 72 limiter := rate.NewLimiter(rate.Limit(qps), burst) 73 return newTokenBucketRateLimiter(limiter, c, qps) 74} 75 76func newTokenBucketRateLimiter(limiter *rate.Limiter, c Clock, qps float32) RateLimiter { 77 return &tokenBucketRateLimiter{ 78 limiter: limiter, 79 clock: c, 80 qps: qps, 81 } 82} 83 84func (t *tokenBucketRateLimiter) TryAccept() bool { 85 return t.limiter.AllowN(t.clock.Now(), 1) 86} 87 88// Accept will block until a token becomes available 89func (t *tokenBucketRateLimiter) Accept() { 90 now := t.clock.Now() 91 t.clock.Sleep(t.limiter.ReserveN(now, 1).DelayFrom(now)) 92} 93 94func (t *tokenBucketRateLimiter) Stop() { 95} 96 97func (t *tokenBucketRateLimiter) QPS() float32 { 98 return t.qps 99} 100 101type fakeAlwaysRateLimiter struct{} 102 103func NewFakeAlwaysRateLimiter() RateLimiter { 104 return &fakeAlwaysRateLimiter{} 105} 106 107func (t *fakeAlwaysRateLimiter) TryAccept() bool { 108 return true 109} 110 111func (t *fakeAlwaysRateLimiter) Stop() {} 112 113func (t *fakeAlwaysRateLimiter) Accept() {} 114 115func (t *fakeAlwaysRateLimiter) QPS() float32 { 116 return 1 117} 118 119type fakeNeverRateLimiter struct { 120 wg sync.WaitGroup 121} 122 123func NewFakeNeverRateLimiter() RateLimiter { 124 rl := fakeNeverRateLimiter{} 125 rl.wg.Add(1) 126 return &rl 127} 128 129func (t *fakeNeverRateLimiter) TryAccept() bool { 130 return false 131} 132 133func (t *fakeNeverRateLimiter) Stop() { 134 t.wg.Done() 135} 136 137func (t *fakeNeverRateLimiter) Accept() { 138 t.wg.Wait() 139} 140 141func (t *fakeNeverRateLimiter) QPS() float32 { 142 return 1 143} 144