1// Copyright 2015 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5// +build go1.7
6
7package rate
8
9import (
10	"context"
11	"math"
12	"runtime"
13	"sync"
14	"sync/atomic"
15	"testing"
16	"time"
17)
18
19func TestLimit(t *testing.T) {
20	if Limit(10) == Inf {
21		t.Errorf("Limit(10) == Inf should be false")
22	}
23}
24
25func closeEnough(a, b Limit) bool {
26	return (math.Abs(float64(a)/float64(b)) - 1.0) < 1e-9
27}
28
29func TestEvery(t *testing.T) {
30	cases := []struct {
31		interval time.Duration
32		lim      Limit
33	}{
34		{0, Inf},
35		{-1, Inf},
36		{1 * time.Nanosecond, Limit(1e9)},
37		{1 * time.Microsecond, Limit(1e6)},
38		{1 * time.Millisecond, Limit(1e3)},
39		{10 * time.Millisecond, Limit(100)},
40		{100 * time.Millisecond, Limit(10)},
41		{1 * time.Second, Limit(1)},
42		{2 * time.Second, Limit(0.5)},
43		{time.Duration(2.5 * float64(time.Second)), Limit(0.4)},
44		{4 * time.Second, Limit(0.25)},
45		{10 * time.Second, Limit(0.1)},
46		{time.Duration(math.MaxInt64), Limit(1e9 / float64(math.MaxInt64))},
47	}
48	for _, tc := range cases {
49		lim := Every(tc.interval)
50		if !closeEnough(lim, tc.lim) {
51			t.Errorf("Every(%v) = %v want %v", tc.interval, lim, tc.lim)
52		}
53	}
54}
55
56const (
57	d = 100 * time.Millisecond
58)
59
60var (
61	t0 = time.Now()
62	t1 = t0.Add(time.Duration(1) * d)
63	t2 = t0.Add(time.Duration(2) * d)
64	t3 = t0.Add(time.Duration(3) * d)
65	t4 = t0.Add(time.Duration(4) * d)
66	t5 = t0.Add(time.Duration(5) * d)
67	t9 = t0.Add(time.Duration(9) * d)
68)
69
70type allow struct {
71	t  time.Time
72	n  int
73	ok bool
74}
75
76func run(t *testing.T, lim *Limiter, allows []allow) {
77	for i, allow := range allows {
78		ok := lim.AllowN(allow.t, allow.n)
79		if ok != allow.ok {
80			t.Errorf("step %d: lim.AllowN(%v, %v) = %v want %v",
81				i, allow.t, allow.n, ok, allow.ok)
82		}
83	}
84}
85
86func TestLimiterBurst1(t *testing.T) {
87	run(t, NewLimiter(10, 1), []allow{
88		{t0, 1, true},
89		{t0, 1, false},
90		{t0, 1, false},
91		{t1, 1, true},
92		{t1, 1, false},
93		{t1, 1, false},
94		{t2, 2, false}, // burst size is 1, so n=2 always fails
95		{t2, 1, true},
96		{t2, 1, false},
97	})
98}
99
100func TestLimiterBurst3(t *testing.T) {
101	run(t, NewLimiter(10, 3), []allow{
102		{t0, 2, true},
103		{t0, 2, false},
104		{t0, 1, true},
105		{t0, 1, false},
106		{t1, 4, false},
107		{t2, 1, true},
108		{t3, 1, true},
109		{t4, 1, true},
110		{t4, 1, true},
111		{t4, 1, false},
112		{t4, 1, false},
113		{t9, 3, true},
114		{t9, 0, true},
115	})
116}
117
118func TestLimiterJumpBackwards(t *testing.T) {
119	run(t, NewLimiter(10, 3), []allow{
120		{t1, 1, true}, // start at t1
121		{t0, 1, true}, // jump back to t0, two tokens remain
122		{t0, 1, true},
123		{t0, 1, false},
124		{t0, 1, false},
125		{t1, 1, true}, // got a token
126		{t1, 1, false},
127		{t1, 1, false},
128		{t2, 1, true}, // got another token
129		{t2, 1, false},
130		{t2, 1, false},
131	})
132}
133
134func TestSimultaneousRequests(t *testing.T) {
135	const (
136		limit       = 1
137		burst       = 5
138		numRequests = 15
139	)
140	var (
141		wg    sync.WaitGroup
142		numOK = uint32(0)
143	)
144
145	// Very slow replenishing bucket.
146	lim := NewLimiter(limit, burst)
147
148	// Tries to take a token, atomically updates the counter and decreases the wait
149	// group counter.
150	f := func() {
151		defer wg.Done()
152		if ok := lim.Allow(); ok {
153			atomic.AddUint32(&numOK, 1)
154		}
155	}
156
157	wg.Add(numRequests)
158	for i := 0; i < numRequests; i++ {
159		go f()
160	}
161	wg.Wait()
162	if numOK != burst {
163		t.Errorf("numOK = %d, want %d", numOK, burst)
164	}
165}
166
167func TestLongRunningQPS(t *testing.T) {
168	if testing.Short() {
169		t.Skip("skipping in short mode")
170	}
171	if runtime.GOOS == "openbsd" {
172		t.Skip("low resolution time.Sleep invalidates test (golang.org/issue/14183)")
173		return
174	}
175
176	// The test runs for a few seconds executing many requests and then checks
177	// that overall number of requests is reasonable.
178	const (
179		limit = 100
180		burst = 100
181	)
182	var numOK = int32(0)
183
184	lim := NewLimiter(limit, burst)
185
186	var wg sync.WaitGroup
187	f := func() {
188		if ok := lim.Allow(); ok {
189			atomic.AddInt32(&numOK, 1)
190		}
191		wg.Done()
192	}
193
194	start := time.Now()
195	end := start.Add(5 * time.Second)
196	for time.Now().Before(end) {
197		wg.Add(1)
198		go f()
199
200		// This will still offer ~500 requests per second, but won't consume
201		// outrageous amount of CPU.
202		time.Sleep(2 * time.Millisecond)
203	}
204	wg.Wait()
205	elapsed := time.Since(start)
206	ideal := burst + (limit * float64(elapsed) / float64(time.Second))
207
208	// We should never get more requests than allowed.
209	if want := int32(ideal + 1); numOK > want {
210		t.Errorf("numOK = %d, want %d (ideal %f)", numOK, want, ideal)
211	}
212	// We should get very close to the number of requests allowed.
213	if want := int32(0.999 * ideal); numOK < want {
214		t.Errorf("numOK = %d, want %d (ideal %f)", numOK, want, ideal)
215	}
216}
217
218type request struct {
219	t   time.Time
220	n   int
221	act time.Time
222	ok  bool
223}
224
225// dFromDuration converts a duration to a multiple of the global constant d
226func dFromDuration(dur time.Duration) int {
227	// Adding a millisecond to be swallowed by the integer division
228	// because we don't care about small inaccuracies
229	return int((dur + time.Millisecond) / d)
230}
231
232// dSince returns multiples of d since t0
233func dSince(t time.Time) int {
234	return dFromDuration(t.Sub(t0))
235}
236
237func runReserve(t *testing.T, lim *Limiter, req request) *Reservation {
238	return runReserveMax(t, lim, req, InfDuration)
239}
240
241func runReserveMax(t *testing.T, lim *Limiter, req request, maxReserve time.Duration) *Reservation {
242	r := lim.reserveN(req.t, req.n, maxReserve)
243	if r.ok && (dSince(r.timeToAct) != dSince(req.act)) || r.ok != req.ok {
244		t.Errorf("lim.reserveN(t%d, %v, %v) = (t%d, %v) want (t%d, %v)",
245			dSince(req.t), req.n, maxReserve, dSince(r.timeToAct), r.ok, dSince(req.act), req.ok)
246	}
247	return &r
248}
249
250func TestSimpleReserve(t *testing.T) {
251	lim := NewLimiter(10, 2)
252
253	runReserve(t, lim, request{t0, 2, t0, true})
254	runReserve(t, lim, request{t0, 2, t2, true})
255	runReserve(t, lim, request{t3, 2, t4, true})
256}
257
258func TestMix(t *testing.T) {
259	lim := NewLimiter(10, 2)
260
261	runReserve(t, lim, request{t0, 3, t1, false}) // should return false because n > Burst
262	runReserve(t, lim, request{t0, 2, t0, true})
263	run(t, lim, []allow{{t1, 2, false}}) // not enought tokens - don't allow
264	runReserve(t, lim, request{t1, 2, t2, true})
265	run(t, lim, []allow{{t1, 1, false}}) // negative tokens - don't allow
266	run(t, lim, []allow{{t3, 1, true}})
267}
268
269func TestCancelInvalid(t *testing.T) {
270	lim := NewLimiter(10, 2)
271
272	runReserve(t, lim, request{t0, 2, t0, true})
273	r := runReserve(t, lim, request{t0, 3, t3, false})
274	r.CancelAt(t0)                               // should have no effect
275	runReserve(t, lim, request{t0, 2, t2, true}) // did not get extra tokens
276}
277
278func TestCancelLast(t *testing.T) {
279	lim := NewLimiter(10, 2)
280
281	runReserve(t, lim, request{t0, 2, t0, true})
282	r := runReserve(t, lim, request{t0, 2, t2, true})
283	r.CancelAt(t1) // got 2 tokens back
284	runReserve(t, lim, request{t1, 2, t2, true})
285}
286
287func TestCancelTooLate(t *testing.T) {
288	lim := NewLimiter(10, 2)
289
290	runReserve(t, lim, request{t0, 2, t0, true})
291	r := runReserve(t, lim, request{t0, 2, t2, true})
292	r.CancelAt(t3) // too late to cancel - should have no effect
293	runReserve(t, lim, request{t3, 2, t4, true})
294}
295
296func TestCancel0Tokens(t *testing.T) {
297	lim := NewLimiter(10, 2)
298
299	runReserve(t, lim, request{t0, 2, t0, true})
300	r := runReserve(t, lim, request{t0, 1, t1, true})
301	runReserve(t, lim, request{t0, 1, t2, true})
302	r.CancelAt(t0) // got 0 tokens back
303	runReserve(t, lim, request{t0, 1, t3, true})
304}
305
306func TestCancel1Token(t *testing.T) {
307	lim := NewLimiter(10, 2)
308
309	runReserve(t, lim, request{t0, 2, t0, true})
310	r := runReserve(t, lim, request{t0, 2, t2, true})
311	runReserve(t, lim, request{t0, 1, t3, true})
312	r.CancelAt(t2) // got 1 token back
313	runReserve(t, lim, request{t2, 2, t4, true})
314}
315
316func TestCancelMulti(t *testing.T) {
317	lim := NewLimiter(10, 4)
318
319	runReserve(t, lim, request{t0, 4, t0, true})
320	rA := runReserve(t, lim, request{t0, 3, t3, true})
321	runReserve(t, lim, request{t0, 1, t4, true})
322	rC := runReserve(t, lim, request{t0, 1, t5, true})
323	rC.CancelAt(t1) // get 1 token back
324	rA.CancelAt(t1) // get 2 tokens back, as if C was never reserved
325	runReserve(t, lim, request{t1, 3, t5, true})
326}
327
328func TestReserveJumpBack(t *testing.T) {
329	lim := NewLimiter(10, 2)
330
331	runReserve(t, lim, request{t1, 2, t1, true}) // start at t1
332	runReserve(t, lim, request{t0, 1, t1, true}) // should violate Limit,Burst
333	runReserve(t, lim, request{t2, 2, t3, true})
334}
335
336func TestReserveJumpBackCancel(t *testing.T) {
337	lim := NewLimiter(10, 2)
338
339	runReserve(t, lim, request{t1, 2, t1, true}) // start at t1
340	r := runReserve(t, lim, request{t1, 2, t3, true})
341	runReserve(t, lim, request{t1, 1, t4, true})
342	r.CancelAt(t0)                               // cancel at t0, get 1 token back
343	runReserve(t, lim, request{t1, 2, t4, true}) // should violate Limit,Burst
344}
345
346func TestReserveSetLimit(t *testing.T) {
347	lim := NewLimiter(5, 2)
348
349	runReserve(t, lim, request{t0, 2, t0, true})
350	runReserve(t, lim, request{t0, 2, t4, true})
351	lim.SetLimitAt(t2, 10)
352	runReserve(t, lim, request{t2, 1, t4, true}) // violates Limit and Burst
353}
354
355func TestReserveSetBurst(t *testing.T) {
356	lim := NewLimiter(5, 2)
357
358	runReserve(t, lim, request{t0, 2, t0, true})
359	runReserve(t, lim, request{t0, 2, t4, true})
360	lim.SetBurstAt(t3, 4)
361	runReserve(t, lim, request{t0, 4, t9, true}) // violates Limit and Burst
362}
363
364func TestReserveSetLimitCancel(t *testing.T) {
365	lim := NewLimiter(5, 2)
366
367	runReserve(t, lim, request{t0, 2, t0, true})
368	r := runReserve(t, lim, request{t0, 2, t4, true})
369	lim.SetLimitAt(t2, 10)
370	r.CancelAt(t2) // 2 tokens back
371	runReserve(t, lim, request{t2, 2, t3, true})
372}
373
374func TestReserveMax(t *testing.T) {
375	lim := NewLimiter(10, 2)
376	maxT := d
377
378	runReserveMax(t, lim, request{t0, 2, t0, true}, maxT)
379	runReserveMax(t, lim, request{t0, 1, t1, true}, maxT)  // reserve for close future
380	runReserveMax(t, lim, request{t0, 1, t2, false}, maxT) // time to act too far in the future
381}
382
383type wait struct {
384	name   string
385	ctx    context.Context
386	n      int
387	delay  int // in multiples of d
388	nilErr bool
389}
390
391func runWait(t *testing.T, lim *Limiter, w wait) {
392	start := time.Now()
393	err := lim.WaitN(w.ctx, w.n)
394	delay := time.Now().Sub(start)
395	if (w.nilErr && err != nil) || (!w.nilErr && err == nil) || w.delay != dFromDuration(delay) {
396		errString := "<nil>"
397		if !w.nilErr {
398			errString = "<non-nil error>"
399		}
400		t.Errorf("lim.WaitN(%v, lim, %v) = %v with delay %v ; want %v with delay %v",
401			w.name, w.n, err, delay, errString, d*time.Duration(w.delay))
402	}
403}
404
405func TestWaitSimple(t *testing.T) {
406	lim := NewLimiter(10, 3)
407
408	ctx, cancel := context.WithCancel(context.Background())
409	cancel()
410	runWait(t, lim, wait{"already-cancelled", ctx, 1, 0, false})
411
412	runWait(t, lim, wait{"exceed-burst-error", context.Background(), 4, 0, false})
413
414	runWait(t, lim, wait{"act-now", context.Background(), 2, 0, true})
415	runWait(t, lim, wait{"act-later", context.Background(), 3, 2, true})
416}
417
418func TestWaitCancel(t *testing.T) {
419	lim := NewLimiter(10, 3)
420
421	ctx, cancel := context.WithCancel(context.Background())
422	runWait(t, lim, wait{"act-now", ctx, 2, 0, true}) // after this lim.tokens = 1
423	go func() {
424		time.Sleep(d)
425		cancel()
426	}()
427	runWait(t, lim, wait{"will-cancel", ctx, 3, 1, false})
428	// should get 3 tokens back, and have lim.tokens = 2
429	t.Logf("tokens:%v last:%v lastEvent:%v", lim.tokens, lim.last, lim.lastEvent)
430	runWait(t, lim, wait{"act-now-after-cancel", context.Background(), 2, 0, true})
431}
432
433func TestWaitTimeout(t *testing.T) {
434	lim := NewLimiter(10, 3)
435
436	ctx, cancel := context.WithTimeout(context.Background(), d)
437	defer cancel()
438	runWait(t, lim, wait{"act-now", ctx, 2, 0, true})
439	runWait(t, lim, wait{"w-timeout-err", ctx, 3, 0, false})
440}
441
442func TestWaitInf(t *testing.T) {
443	lim := NewLimiter(Inf, 0)
444
445	runWait(t, lim, wait{"exceed-burst-no-error", context.Background(), 3, 0, true})
446}
447
448func BenchmarkAllowN(b *testing.B) {
449	lim := NewLimiter(Every(1*time.Second), 1)
450	now := time.Now()
451	b.ReportAllocs()
452	b.ResetTimer()
453	b.RunParallel(func(pb *testing.PB) {
454		for pb.Next() {
455			lim.AllowN(now, 1)
456		}
457	})
458}
459
460func BenchmarkWaitNNoDelay(b *testing.B) {
461	lim := NewLimiter(Limit(b.N), b.N)
462	ctx := context.Background()
463	b.ReportAllocs()
464	b.ResetTimer()
465	for i := 0; i < b.N; i++ {
466		lim.WaitN(ctx, 1)
467	}
468}
469