1// Copyright 2015 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5//go:build go1.7
6// +build go1.7
7
8package rate
9
10import (
11	"context"
12	"math"
13	"runtime"
14	"sync"
15	"sync/atomic"
16	"testing"
17	"time"
18)
19
20func TestLimit(t *testing.T) {
21	if Limit(10) == Inf {
22		t.Errorf("Limit(10) == Inf should be false")
23	}
24}
25
26func closeEnough(a, b Limit) bool {
27	return (math.Abs(float64(a)/float64(b)) - 1.0) < 1e-9
28}
29
30func TestEvery(t *testing.T) {
31	cases := []struct {
32		interval time.Duration
33		lim      Limit
34	}{
35		{0, Inf},
36		{-1, Inf},
37		{1 * time.Nanosecond, Limit(1e9)},
38		{1 * time.Microsecond, Limit(1e6)},
39		{1 * time.Millisecond, Limit(1e3)},
40		{10 * time.Millisecond, Limit(100)},
41		{100 * time.Millisecond, Limit(10)},
42		{1 * time.Second, Limit(1)},
43		{2 * time.Second, Limit(0.5)},
44		{time.Duration(2.5 * float64(time.Second)), Limit(0.4)},
45		{4 * time.Second, Limit(0.25)},
46		{10 * time.Second, Limit(0.1)},
47		{time.Duration(math.MaxInt64), Limit(1e9 / float64(math.MaxInt64))},
48	}
49	for _, tc := range cases {
50		lim := Every(tc.interval)
51		if !closeEnough(lim, tc.lim) {
52			t.Errorf("Every(%v) = %v want %v", tc.interval, lim, tc.lim)
53		}
54	}
55}
56
// d is the time step that the reference instants and the delay helpers in
// this file are expressed in.
const d = 100 * time.Millisecond
60
61var (
62	t0 = time.Now()
63	t1 = t0.Add(time.Duration(1) * d)
64	t2 = t0.Add(time.Duration(2) * d)
65	t3 = t0.Add(time.Duration(3) * d)
66	t4 = t0.Add(time.Duration(4) * d)
67	t5 = t0.Add(time.Duration(5) * d)
68	t9 = t0.Add(time.Duration(9) * d)
69)
70
// allow is one step of an AllowN scenario: the arguments to pass and the
// result that is expected back.
type allow struct {
	t  time.Time // instant passed to AllowN
	n  int       // number of tokens requested
	ok bool      // expected result of AllowN
}
76
77func run(t *testing.T, lim *Limiter, allows []allow) {
78	for i, allow := range allows {
79		ok := lim.AllowN(allow.t, allow.n)
80		if ok != allow.ok {
81			t.Errorf("step %d: lim.AllowN(%v, %v) = %v want %v",
82				i, allow.t, allow.n, ok, allow.ok)
83		}
84	}
85}
86
87func TestLimiterBurst1(t *testing.T) {
88	run(t, NewLimiter(10, 1), []allow{
89		{t0, 1, true},
90		{t0, 1, false},
91		{t0, 1, false},
92		{t1, 1, true},
93		{t1, 1, false},
94		{t1, 1, false},
95		{t2, 2, false}, // burst size is 1, so n=2 always fails
96		{t2, 1, true},
97		{t2, 1, false},
98	})
99}
100
101func TestLimiterBurst3(t *testing.T) {
102	run(t, NewLimiter(10, 3), []allow{
103		{t0, 2, true},
104		{t0, 2, false},
105		{t0, 1, true},
106		{t0, 1, false},
107		{t1, 4, false},
108		{t2, 1, true},
109		{t3, 1, true},
110		{t4, 1, true},
111		{t4, 1, true},
112		{t4, 1, false},
113		{t4, 1, false},
114		{t9, 3, true},
115		{t9, 0, true},
116	})
117}
118
119func TestLimiterJumpBackwards(t *testing.T) {
120	run(t, NewLimiter(10, 3), []allow{
121		{t1, 1, true}, // start at t1
122		{t0, 1, true}, // jump back to t0, two tokens remain
123		{t0, 1, true},
124		{t0, 1, false},
125		{t0, 1, false},
126		{t1, 1, true}, // got a token
127		{t1, 1, false},
128		{t1, 1, false},
129		{t2, 1, true}, // got another token
130		{t2, 1, false},
131		{t2, 1, false},
132	})
133}
134
135// Ensure that tokensFromDuration doesn't produce
136// rounding errors by truncating nanoseconds.
137// See golang.org/issues/34861.
138func TestLimiter_noTruncationErrors(t *testing.T) {
139	if !NewLimiter(0.7692307692307693, 1).Allow() {
140		t.Fatal("expected true")
141	}
142}
143
144func TestSimultaneousRequests(t *testing.T) {
145	const (
146		limit       = 1
147		burst       = 5
148		numRequests = 15
149	)
150	var (
151		wg    sync.WaitGroup
152		numOK = uint32(0)
153	)
154
155	// Very slow replenishing bucket.
156	lim := NewLimiter(limit, burst)
157
158	// Tries to take a token, atomically updates the counter and decreases the wait
159	// group counter.
160	f := func() {
161		defer wg.Done()
162		if ok := lim.Allow(); ok {
163			atomic.AddUint32(&numOK, 1)
164		}
165	}
166
167	wg.Add(numRequests)
168	for i := 0; i < numRequests; i++ {
169		go f()
170	}
171	wg.Wait()
172	if numOK != burst {
173		t.Errorf("numOK = %d, want %d", numOK, burst)
174	}
175}
176
177func TestLongRunningQPS(t *testing.T) {
178	if testing.Short() {
179		t.Skip("skipping in short mode")
180	}
181	if runtime.GOOS == "openbsd" {
182		t.Skip("low resolution time.Sleep invalidates test (golang.org/issue/14183)")
183		return
184	}
185
186	// The test runs for a few seconds executing many requests and then checks
187	// that overall number of requests is reasonable.
188	const (
189		limit = 100
190		burst = 100
191	)
192	var numOK = int32(0)
193
194	lim := NewLimiter(limit, burst)
195
196	var wg sync.WaitGroup
197	f := func() {
198		if ok := lim.Allow(); ok {
199			atomic.AddInt32(&numOK, 1)
200		}
201		wg.Done()
202	}
203
204	start := time.Now()
205	end := start.Add(5 * time.Second)
206	for time.Now().Before(end) {
207		wg.Add(1)
208		go f()
209
210		// This will still offer ~500 requests per second, but won't consume
211		// outrageous amount of CPU.
212		time.Sleep(2 * time.Millisecond)
213	}
214	wg.Wait()
215	elapsed := time.Since(start)
216	ideal := burst + (limit * float64(elapsed) / float64(time.Second))
217
218	// We should never get more requests than allowed.
219	if want := int32(ideal + 1); numOK > want {
220		t.Errorf("numOK = %d, want %d (ideal %f)", numOK, want, ideal)
221	}
222	// We should get very close to the number of requests allowed.
223	if want := int32(0.999 * ideal); numOK < want {
224		t.Errorf("numOK = %d, want %d (ideal %f)", numOK, want, ideal)
225	}
226}
227
// request is one reservation attempt together with its expected outcome.
type request struct {
	t   time.Time // instant at which the reservation is requested
	n   int       // number of tokens requested
	act time.Time // expected time to act
	ok  bool      // expected ok flag of the reservation
}
234
235// dFromDuration converts a duration to a multiple of the global constant d
236func dFromDuration(dur time.Duration) int {
237	// Adding a millisecond to be swallowed by the integer division
238	// because we don't care about small inaccuracies
239	return int((dur + time.Millisecond) / d)
240}
241
242// dSince returns multiples of d since t0
243func dSince(t time.Time) int {
244	return dFromDuration(t.Sub(t0))
245}
246
247func runReserve(t *testing.T, lim *Limiter, req request) *Reservation {
248	return runReserveMax(t, lim, req, InfDuration)
249}
250
251func runReserveMax(t *testing.T, lim *Limiter, req request, maxReserve time.Duration) *Reservation {
252	r := lim.reserveN(req.t, req.n, maxReserve)
253	if r.ok && (dSince(r.timeToAct) != dSince(req.act)) || r.ok != req.ok {
254		t.Errorf("lim.reserveN(t%d, %v, %v) = (t%d, %v) want (t%d, %v)",
255			dSince(req.t), req.n, maxReserve, dSince(r.timeToAct), r.ok, dSince(req.act), req.ok)
256	}
257	return &r
258}
259
260func TestSimpleReserve(t *testing.T) {
261	lim := NewLimiter(10, 2)
262
263	runReserve(t, lim, request{t0, 2, t0, true})
264	runReserve(t, lim, request{t0, 2, t2, true})
265	runReserve(t, lim, request{t3, 2, t4, true})
266}
267
268func TestMix(t *testing.T) {
269	lim := NewLimiter(10, 2)
270
271	runReserve(t, lim, request{t0, 3, t1, false}) // should return false because n > Burst
272	runReserve(t, lim, request{t0, 2, t0, true})
273	run(t, lim, []allow{{t1, 2, false}}) // not enought tokens - don't allow
274	runReserve(t, lim, request{t1, 2, t2, true})
275	run(t, lim, []allow{{t1, 1, false}}) // negative tokens - don't allow
276	run(t, lim, []allow{{t3, 1, true}})
277}
278
279func TestCancelInvalid(t *testing.T) {
280	lim := NewLimiter(10, 2)
281
282	runReserve(t, lim, request{t0, 2, t0, true})
283	r := runReserve(t, lim, request{t0, 3, t3, false})
284	r.CancelAt(t0)                               // should have no effect
285	runReserve(t, lim, request{t0, 2, t2, true}) // did not get extra tokens
286}
287
288func TestCancelLast(t *testing.T) {
289	lim := NewLimiter(10, 2)
290
291	runReserve(t, lim, request{t0, 2, t0, true})
292	r := runReserve(t, lim, request{t0, 2, t2, true})
293	r.CancelAt(t1) // got 2 tokens back
294	runReserve(t, lim, request{t1, 2, t2, true})
295}
296
297func TestCancelTooLate(t *testing.T) {
298	lim := NewLimiter(10, 2)
299
300	runReserve(t, lim, request{t0, 2, t0, true})
301	r := runReserve(t, lim, request{t0, 2, t2, true})
302	r.CancelAt(t3) // too late to cancel - should have no effect
303	runReserve(t, lim, request{t3, 2, t4, true})
304}
305
306func TestCancel0Tokens(t *testing.T) {
307	lim := NewLimiter(10, 2)
308
309	runReserve(t, lim, request{t0, 2, t0, true})
310	r := runReserve(t, lim, request{t0, 1, t1, true})
311	runReserve(t, lim, request{t0, 1, t2, true})
312	r.CancelAt(t0) // got 0 tokens back
313	runReserve(t, lim, request{t0, 1, t3, true})
314}
315
316func TestCancel1Token(t *testing.T) {
317	lim := NewLimiter(10, 2)
318
319	runReserve(t, lim, request{t0, 2, t0, true})
320	r := runReserve(t, lim, request{t0, 2, t2, true})
321	runReserve(t, lim, request{t0, 1, t3, true})
322	r.CancelAt(t2) // got 1 token back
323	runReserve(t, lim, request{t2, 2, t4, true})
324}
325
326func TestCancelMulti(t *testing.T) {
327	lim := NewLimiter(10, 4)
328
329	runReserve(t, lim, request{t0, 4, t0, true})
330	rA := runReserve(t, lim, request{t0, 3, t3, true})
331	runReserve(t, lim, request{t0, 1, t4, true})
332	rC := runReserve(t, lim, request{t0, 1, t5, true})
333	rC.CancelAt(t1) // get 1 token back
334	rA.CancelAt(t1) // get 2 tokens back, as if C was never reserved
335	runReserve(t, lim, request{t1, 3, t5, true})
336}
337
338func TestReserveJumpBack(t *testing.T) {
339	lim := NewLimiter(10, 2)
340
341	runReserve(t, lim, request{t1, 2, t1, true}) // start at t1
342	runReserve(t, lim, request{t0, 1, t1, true}) // should violate Limit,Burst
343	runReserve(t, lim, request{t2, 2, t3, true})
344}
345
346func TestReserveJumpBackCancel(t *testing.T) {
347	lim := NewLimiter(10, 2)
348
349	runReserve(t, lim, request{t1, 2, t1, true}) // start at t1
350	r := runReserve(t, lim, request{t1, 2, t3, true})
351	runReserve(t, lim, request{t1, 1, t4, true})
352	r.CancelAt(t0)                               // cancel at t0, get 1 token back
353	runReserve(t, lim, request{t1, 2, t4, true}) // should violate Limit,Burst
354}
355
356func TestReserveSetLimit(t *testing.T) {
357	lim := NewLimiter(5, 2)
358
359	runReserve(t, lim, request{t0, 2, t0, true})
360	runReserve(t, lim, request{t0, 2, t4, true})
361	lim.SetLimitAt(t2, 10)
362	runReserve(t, lim, request{t2, 1, t4, true}) // violates Limit and Burst
363}
364
365func TestReserveSetBurst(t *testing.T) {
366	lim := NewLimiter(5, 2)
367
368	runReserve(t, lim, request{t0, 2, t0, true})
369	runReserve(t, lim, request{t0, 2, t4, true})
370	lim.SetBurstAt(t3, 4)
371	runReserve(t, lim, request{t0, 4, t9, true}) // violates Limit and Burst
372}
373
374func TestReserveSetLimitCancel(t *testing.T) {
375	lim := NewLimiter(5, 2)
376
377	runReserve(t, lim, request{t0, 2, t0, true})
378	r := runReserve(t, lim, request{t0, 2, t4, true})
379	lim.SetLimitAt(t2, 10)
380	r.CancelAt(t2) // 2 tokens back
381	runReserve(t, lim, request{t2, 2, t3, true})
382}
383
384func TestReserveMax(t *testing.T) {
385	lim := NewLimiter(10, 2)
386	maxT := d
387
388	runReserveMax(t, lim, request{t0, 2, t0, true}, maxT)
389	runReserveMax(t, lim, request{t0, 1, t1, true}, maxT)  // reserve for close future
390	runReserveMax(t, lim, request{t0, 1, t2, false}, maxT) // time to act too far in the future
391}
392
// wait is one WaitN call together with its expected outcome.
type wait struct {
	name   string          // label printed in failure messages
	ctx    context.Context // context passed to WaitN
	n      int             // number of tokens requested
	delay  int             // expected time spent waiting, in multiples of d
	nilErr bool            // whether WaitN is expected to return a nil error
}
400
401func runWait(t *testing.T, lim *Limiter, w wait) {
402	start := time.Now()
403	err := lim.WaitN(w.ctx, w.n)
404	delay := time.Now().Sub(start)
405	if (w.nilErr && err != nil) || (!w.nilErr && err == nil) || w.delay != dFromDuration(delay) {
406		errString := "<nil>"
407		if !w.nilErr {
408			errString = "<non-nil error>"
409		}
410		t.Errorf("lim.WaitN(%v, lim, %v) = %v with delay %v ; want %v with delay %v",
411			w.name, w.n, err, delay, errString, d*time.Duration(w.delay))
412	}
413}
414
415func TestWaitSimple(t *testing.T) {
416	lim := NewLimiter(10, 3)
417
418	ctx, cancel := context.WithCancel(context.Background())
419	cancel()
420	runWait(t, lim, wait{"already-cancelled", ctx, 1, 0, false})
421
422	runWait(t, lim, wait{"exceed-burst-error", context.Background(), 4, 0, false})
423
424	runWait(t, lim, wait{"act-now", context.Background(), 2, 0, true})
425	runWait(t, lim, wait{"act-later", context.Background(), 3, 2, true})
426}
427
428func TestWaitCancel(t *testing.T) {
429	lim := NewLimiter(10, 3)
430
431	ctx, cancel := context.WithCancel(context.Background())
432	runWait(t, lim, wait{"act-now", ctx, 2, 0, true}) // after this lim.tokens = 1
433	go func() {
434		time.Sleep(d)
435		cancel()
436	}()
437	runWait(t, lim, wait{"will-cancel", ctx, 3, 1, false})
438	// should get 3 tokens back, and have lim.tokens = 2
439	t.Logf("tokens:%v last:%v lastEvent:%v", lim.tokens, lim.last, lim.lastEvent)
440	runWait(t, lim, wait{"act-now-after-cancel", context.Background(), 2, 0, true})
441}
442
443func TestWaitTimeout(t *testing.T) {
444	lim := NewLimiter(10, 3)
445
446	ctx, cancel := context.WithTimeout(context.Background(), d)
447	defer cancel()
448	runWait(t, lim, wait{"act-now", ctx, 2, 0, true})
449	runWait(t, lim, wait{"w-timeout-err", ctx, 3, 0, false})
450}
451
452func TestWaitInf(t *testing.T) {
453	lim := NewLimiter(Inf, 0)
454
455	runWait(t, lim, wait{"exceed-burst-no-error", context.Background(), 3, 0, true})
456}
457
458func BenchmarkAllowN(b *testing.B) {
459	lim := NewLimiter(Every(1*time.Second), 1)
460	now := time.Now()
461	b.ReportAllocs()
462	b.ResetTimer()
463	b.RunParallel(func(pb *testing.PB) {
464		for pb.Next() {
465			lim.AllowN(now, 1)
466		}
467	})
468}
469
470func BenchmarkWaitNNoDelay(b *testing.B) {
471	lim := NewLimiter(Limit(b.N), b.N)
472	ctx := context.Background()
473	b.ReportAllocs()
474	b.ResetTimer()
475	for i := 0; i < b.N; i++ {
476		lim.WaitN(ctx, 1)
477	}
478}
479