1/*
2Copyright 2014 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package flowcontrol
18
19import (
20	"context"
21	"fmt"
22	"sync"
23	"testing"
24	"time"
25)
26
27func TestMultithreadedThrottling(t *testing.T) {
28	// Bucket with 100QPS and no burst
29	r := NewTokenBucketRateLimiter(100, 1)
30
31	// channel to collect 100 tokens
32	taken := make(chan bool, 100)
33
34	// Set up goroutines to hammer the throttler
35	startCh := make(chan bool)
36	endCh := make(chan bool)
37	for i := 0; i < 10; i++ {
38		go func() {
39			// wait for the starting signal
40			<-startCh
41			for {
42				// get a token
43				r.Accept()
44				select {
45				// try to add it to the taken channel
46				case taken <- true:
47					continue
48				// if taken is full, notify and return
49				default:
50					endCh <- true
51					return
52				}
53			}
54		}()
55	}
56
57	// record wall time
58	startTime := time.Now()
59	// take the initial capacity so all tokens are the result of refill
60	r.Accept()
61	// start the thundering herd
62	close(startCh)
63	// wait for the first signal that we collected 100 tokens
64	<-endCh
65	// record wall time
66	endTime := time.Now()
67
68	// tolerate a 1% clock change because these things happen
69	if duration := endTime.Sub(startTime); duration < (time.Second * 99 / 100) {
70		// We shouldn't be able to get 100 tokens out of the bucket in less than 1 second of wall clock time, no matter what
71		t.Errorf("Expected it to take at least 1 second to get 100 tokens, took %v", duration)
72	} else {
73		t.Logf("Took %v to get 100 tokens", duration)
74	}
75}
76
77func TestBasicThrottle(t *testing.T) {
78	r := NewTokenBucketRateLimiter(1, 3)
79	for i := 0; i < 3; i++ {
80		if !r.TryAccept() {
81			t.Error("unexpected false accept")
82		}
83	}
84	if r.TryAccept() {
85		t.Error("unexpected true accept")
86	}
87}
88
89func TestIncrementThrottle(t *testing.T) {
90	r := NewTokenBucketRateLimiter(1, 1)
91	if !r.TryAccept() {
92		t.Error("unexpected false accept")
93	}
94	if r.TryAccept() {
95		t.Error("unexpected true accept")
96	}
97
98	// Allow to refill
99	time.Sleep(2 * time.Second)
100
101	if !r.TryAccept() {
102		t.Error("unexpected false accept")
103	}
104}
105
106func TestThrottle(t *testing.T) {
107	r := NewTokenBucketRateLimiter(10, 5)
108
109	// Should consume 5 tokens immediately, then
110	// the remaining 11 should take at least 1 second (0.1s each)
111	expectedFinish := time.Now().Add(time.Second * 1)
112	for i := 0; i < 16; i++ {
113		r.Accept()
114	}
115	if time.Now().Before(expectedFinish) {
116		t.Error("rate limit was not respected, finished too early")
117	}
118}
119
120func TestAlwaysFake(t *testing.T) {
121	rl := NewFakeAlwaysRateLimiter()
122	if !rl.TryAccept() {
123		t.Error("TryAccept in AlwaysFake should return true.")
124	}
125	// If this will block the test will timeout
126	rl.Accept()
127}
128
129func TestNeverFake(t *testing.T) {
130	rl := NewFakeNeverRateLimiter()
131	if rl.TryAccept() {
132		t.Error("TryAccept in NeverFake should return false.")
133	}
134
135	finished := false
136	wg := sync.WaitGroup{}
137	wg.Add(1)
138	go func() {
139		rl.Accept()
140		finished = true
141		wg.Done()
142	}()
143
144	// Wait some time to make sure it never finished.
145	time.Sleep(time.Second)
146	if finished {
147		t.Error("Accept should block forever in NeverFake.")
148	}
149
150	rl.Stop()
151	wg.Wait()
152	if !finished {
153		t.Error("Stop should make Accept unblock in NeverFake.")
154	}
155}
156
157func TestWait(t *testing.T) {
158	r := NewTokenBucketRateLimiter(0.0001, 1)
159
160	ctx, cancelFn := context.WithTimeout(context.Background(), time.Second)
161	defer cancelFn()
162	if err := r.Wait(ctx); err != nil {
163		t.Errorf("unexpected wait failed, err: %v", err)
164	}
165
166	ctx2, cancelFn2 := context.WithTimeout(context.Background(), time.Second)
167	defer cancelFn2()
168	if err := r.Wait(ctx2); err == nil {
169		t.Errorf("unexpected wait success")
170	} else {
171		t.Log(fmt.Sprintf("wait err: %v", err))
172	}
173}
174
175type fakeClock struct {
176	now time.Time
177}
178
179func newFakeClock() *fakeClock {
180	return &fakeClock{
181		now: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC),
182	}
183}
184
185func (fc *fakeClock) Now() time.Time {
186	return fc.now
187}
188
189func (fc *fakeClock) Sleep(d time.Duration) {
190	fc.now = fc.now.Add(d)
191}
192
193func TestRatePrecisionBug(t *testing.T) {
194	// golang.org/x/time/rate used to have bugs around precision and this
195	// proves that they don't recur (at least in the form we know about).  This
196	// case is specifically designed to trigger the problem after 14 seconds.
197	qps := float32(time.Second) / float32(1031425*time.Microsecond)
198	clock := newFakeClock()
199	tb := NewTokenBucketRateLimiterWithClock(qps, 1, clock)
200
201	for i := 0; i < 60; i++ {
202		if !tb.TryAccept() {
203			t.Fatalf("failed after %d seconds", i*2)
204		}
205		clock.Sleep(2 * time.Second)
206	}
207}
208