1// Copyright 2015 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5//go:build go1.7
6// +build go1.7
7
8package rate
9
10import (
11	"context"
12	"math"
13	"runtime"
14	"sync"
15	"sync/atomic"
16	"testing"
17	"time"
18)
19
20func TestLimit(t *testing.T) {
21	if Limit(10) == Inf {
22		t.Errorf("Limit(10) == Inf should be false")
23	}
24}
25
26func closeEnough(a, b Limit) bool {
27	return (math.Abs(float64(a)/float64(b)) - 1.0) < 1e-9
28}
29
30func TestEvery(t *testing.T) {
31	cases := []struct {
32		interval time.Duration
33		lim      Limit
34	}{
35		{0, Inf},
36		{-1, Inf},
37		{1 * time.Nanosecond, Limit(1e9)},
38		{1 * time.Microsecond, Limit(1e6)},
39		{1 * time.Millisecond, Limit(1e3)},
40		{10 * time.Millisecond, Limit(100)},
41		{100 * time.Millisecond, Limit(10)},
42		{1 * time.Second, Limit(1)},
43		{2 * time.Second, Limit(0.5)},
44		{time.Duration(2.5 * float64(time.Second)), Limit(0.4)},
45		{4 * time.Second, Limit(0.25)},
46		{10 * time.Second, Limit(0.1)},
47		{time.Duration(math.MaxInt64), Limit(1e9 / float64(math.MaxInt64))},
48	}
49	for _, tc := range cases {
50		lim := Every(tc.interval)
51		if !closeEnough(lim, tc.lim) {
52			t.Errorf("Every(%v) = %v want %v", tc.interval, lim, tc.lim)
53		}
54	}
55}
56
// d is the basic time step used by the tests in this file; the fixed time
// points and expected delays are expressed as multiples of it.
const d = 100 * time.Millisecond
60
61var (
62	t0 = time.Now()
63	t1 = t0.Add(time.Duration(1) * d)
64	t2 = t0.Add(time.Duration(2) * d)
65	t3 = t0.Add(time.Duration(3) * d)
66	t4 = t0.Add(time.Duration(4) * d)
67	t5 = t0.Add(time.Duration(5) * d)
68	t9 = t0.Add(time.Duration(9) * d)
69)
70
// allow describes one step of an AllowN scenario executed by run.
type allow struct {
	t  time.Time // time passed to AllowN
	n  int       // number of tokens requested
	ok bool      // expected return value of AllowN
}
76
77func run(t *testing.T, lim *Limiter, allows []allow) {
78	t.Helper()
79	for i, allow := range allows {
80		ok := lim.AllowN(allow.t, allow.n)
81		if ok != allow.ok {
82			t.Errorf("step %d: lim.AllowN(%v, %v) = %v want %v",
83				i, allow.t, allow.n, ok, allow.ok)
84		}
85	}
86}
87
88func TestLimiterBurst1(t *testing.T) {
89	run(t, NewLimiter(10, 1), []allow{
90		{t0, 1, true},
91		{t0, 1, false},
92		{t0, 1, false},
93		{t1, 1, true},
94		{t1, 1, false},
95		{t1, 1, false},
96		{t2, 2, false}, // burst size is 1, so n=2 always fails
97		{t2, 1, true},
98		{t2, 1, false},
99	})
100}
101
102func TestLimiterBurst3(t *testing.T) {
103	run(t, NewLimiter(10, 3), []allow{
104		{t0, 2, true},
105		{t0, 2, false},
106		{t0, 1, true},
107		{t0, 1, false},
108		{t1, 4, false},
109		{t2, 1, true},
110		{t3, 1, true},
111		{t4, 1, true},
112		{t4, 1, true},
113		{t4, 1, false},
114		{t4, 1, false},
115		{t9, 3, true},
116		{t9, 0, true},
117	})
118}
119
120func TestLimiterJumpBackwards(t *testing.T) {
121	run(t, NewLimiter(10, 3), []allow{
122		{t1, 1, true}, // start at t1
123		{t0, 1, true}, // jump back to t0, two tokens remain
124		{t0, 1, true},
125		{t0, 1, false},
126		{t0, 1, false},
127		{t1, 1, true}, // got a token
128		{t1, 1, false},
129		{t1, 1, false},
130		{t2, 1, true}, // got another token
131		{t2, 1, false},
132		{t2, 1, false},
133	})
134}
135
136// Ensure that tokensFromDuration doesn't produce
137// rounding errors by truncating nanoseconds.
138// See golang.org/issues/34861.
139func TestLimiter_noTruncationErrors(t *testing.T) {
140	if !NewLimiter(0.7692307692307693, 1).Allow() {
141		t.Fatal("expected true")
142	}
143}
144
145func TestSimultaneousRequests(t *testing.T) {
146	const (
147		limit       = 1
148		burst       = 5
149		numRequests = 15
150	)
151	var (
152		wg    sync.WaitGroup
153		numOK = uint32(0)
154	)
155
156	// Very slow replenishing bucket.
157	lim := NewLimiter(limit, burst)
158
159	// Tries to take a token, atomically updates the counter and decreases the wait
160	// group counter.
161	f := func() {
162		defer wg.Done()
163		if ok := lim.Allow(); ok {
164			atomic.AddUint32(&numOK, 1)
165		}
166	}
167
168	wg.Add(numRequests)
169	for i := 0; i < numRequests; i++ {
170		go f()
171	}
172	wg.Wait()
173	if numOK != burst {
174		t.Errorf("numOK = %d, want %d", numOK, burst)
175	}
176}
177
178func TestLongRunningQPS(t *testing.T) {
179	if testing.Short() {
180		t.Skip("skipping in short mode")
181	}
182	if runtime.GOOS == "openbsd" {
183		t.Skip("low resolution time.Sleep invalidates test (golang.org/issue/14183)")
184		return
185	}
186
187	// The test runs for a few seconds executing many requests and then checks
188	// that overall number of requests is reasonable.
189	const (
190		limit = 100
191		burst = 100
192	)
193	var numOK = int32(0)
194
195	lim := NewLimiter(limit, burst)
196
197	var wg sync.WaitGroup
198	f := func() {
199		if ok := lim.Allow(); ok {
200			atomic.AddInt32(&numOK, 1)
201		}
202		wg.Done()
203	}
204
205	start := time.Now()
206	end := start.Add(5 * time.Second)
207	for time.Now().Before(end) {
208		wg.Add(1)
209		go f()
210
211		// This will still offer ~500 requests per second, but won't consume
212		// outrageous amount of CPU.
213		time.Sleep(2 * time.Millisecond)
214	}
215	wg.Wait()
216	elapsed := time.Since(start)
217	ideal := burst + (limit * float64(elapsed) / float64(time.Second))
218
219	// We should never get more requests than allowed.
220	if want := int32(ideal + 1); numOK > want {
221		t.Errorf("numOK = %d, want %d (ideal %f)", numOK, want, ideal)
222	}
223	// We should get very close to the number of requests allowed.
224	if want := int32(0.999 * ideal); numOK < want {
225		t.Errorf("numOK = %d, want %d (ideal %f)", numOK, want, ideal)
226	}
227}
228
// request describes one reserveN call and its expected outcome, as checked
// by runReserveMax.
type request struct {
	t   time.Time // time passed to reserveN
	n   int       // number of tokens requested
	act time.Time // expected timeToAct, compared in whole multiples of d
	ok  bool      // expected ok flag of the returned reservation
}
235
236// dFromDuration converts a duration to a multiple of the global constant d
237func dFromDuration(dur time.Duration) int {
238	// Adding a millisecond to be swallowed by the integer division
239	// because we don't care about small inaccuracies
240	return int((dur + time.Millisecond) / d)
241}
242
243// dSince returns multiples of d since t0
244func dSince(t time.Time) int {
245	return dFromDuration(t.Sub(t0))
246}
247
248func runReserve(t *testing.T, lim *Limiter, req request) *Reservation {
249	t.Helper()
250	return runReserveMax(t, lim, req, InfDuration)
251}
252
253func runReserveMax(t *testing.T, lim *Limiter, req request, maxReserve time.Duration) *Reservation {
254	t.Helper()
255	r := lim.reserveN(req.t, req.n, maxReserve)
256	if r.ok && (dSince(r.timeToAct) != dSince(req.act)) || r.ok != req.ok {
257		t.Errorf("lim.reserveN(t%d, %v, %v) = (t%d, %v) want (t%d, %v)",
258			dSince(req.t), req.n, maxReserve, dSince(r.timeToAct), r.ok, dSince(req.act), req.ok)
259	}
260	return &r
261}
262
263func TestSimpleReserve(t *testing.T) {
264	lim := NewLimiter(10, 2)
265
266	runReserve(t, lim, request{t0, 2, t0, true})
267	runReserve(t, lim, request{t0, 2, t2, true})
268	runReserve(t, lim, request{t3, 2, t4, true})
269}
270
271func TestMix(t *testing.T) {
272	lim := NewLimiter(10, 2)
273
274	runReserve(t, lim, request{t0, 3, t1, false}) // should return false because n > Burst
275	runReserve(t, lim, request{t0, 2, t0, true})
276	run(t, lim, []allow{{t1, 2, false}}) // not enough tokens - don't allow
277	runReserve(t, lim, request{t1, 2, t2, true})
278	run(t, lim, []allow{{t1, 1, false}}) // negative tokens - don't allow
279	run(t, lim, []allow{{t3, 1, true}})
280}
281
282func TestCancelInvalid(t *testing.T) {
283	lim := NewLimiter(10, 2)
284
285	runReserve(t, lim, request{t0, 2, t0, true})
286	r := runReserve(t, lim, request{t0, 3, t3, false})
287	r.CancelAt(t0)                               // should have no effect
288	runReserve(t, lim, request{t0, 2, t2, true}) // did not get extra tokens
289}
290
291func TestCancelLast(t *testing.T) {
292	lim := NewLimiter(10, 2)
293
294	runReserve(t, lim, request{t0, 2, t0, true})
295	r := runReserve(t, lim, request{t0, 2, t2, true})
296	r.CancelAt(t1) // got 2 tokens back
297	runReserve(t, lim, request{t1, 2, t2, true})
298}
299
300func TestCancelTooLate(t *testing.T) {
301	lim := NewLimiter(10, 2)
302
303	runReserve(t, lim, request{t0, 2, t0, true})
304	r := runReserve(t, lim, request{t0, 2, t2, true})
305	r.CancelAt(t3) // too late to cancel - should have no effect
306	runReserve(t, lim, request{t3, 2, t4, true})
307}
308
309func TestCancel0Tokens(t *testing.T) {
310	lim := NewLimiter(10, 2)
311
312	runReserve(t, lim, request{t0, 2, t0, true})
313	r := runReserve(t, lim, request{t0, 1, t1, true})
314	runReserve(t, lim, request{t0, 1, t2, true})
315	r.CancelAt(t0) // got 0 tokens back
316	runReserve(t, lim, request{t0, 1, t3, true})
317}
318
319func TestCancel1Token(t *testing.T) {
320	lim := NewLimiter(10, 2)
321
322	runReserve(t, lim, request{t0, 2, t0, true})
323	r := runReserve(t, lim, request{t0, 2, t2, true})
324	runReserve(t, lim, request{t0, 1, t3, true})
325	r.CancelAt(t2) // got 1 token back
326	runReserve(t, lim, request{t2, 2, t4, true})
327}
328
329func TestCancelMulti(t *testing.T) {
330	lim := NewLimiter(10, 4)
331
332	runReserve(t, lim, request{t0, 4, t0, true})
333	rA := runReserve(t, lim, request{t0, 3, t3, true})
334	runReserve(t, lim, request{t0, 1, t4, true})
335	rC := runReserve(t, lim, request{t0, 1, t5, true})
336	rC.CancelAt(t1) // get 1 token back
337	rA.CancelAt(t1) // get 2 tokens back, as if C was never reserved
338	runReserve(t, lim, request{t1, 3, t5, true})
339}
340
341func TestReserveJumpBack(t *testing.T) {
342	lim := NewLimiter(10, 2)
343
344	runReserve(t, lim, request{t1, 2, t1, true}) // start at t1
345	runReserve(t, lim, request{t0, 1, t1, true}) // should violate Limit,Burst
346	runReserve(t, lim, request{t2, 2, t3, true})
347}
348
349func TestReserveJumpBackCancel(t *testing.T) {
350	lim := NewLimiter(10, 2)
351
352	runReserve(t, lim, request{t1, 2, t1, true}) // start at t1
353	r := runReserve(t, lim, request{t1, 2, t3, true})
354	runReserve(t, lim, request{t1, 1, t4, true})
355	r.CancelAt(t0)                               // cancel at t0, get 1 token back
356	runReserve(t, lim, request{t1, 2, t4, true}) // should violate Limit,Burst
357}
358
359func TestReserveSetLimit(t *testing.T) {
360	lim := NewLimiter(5, 2)
361
362	runReserve(t, lim, request{t0, 2, t0, true})
363	runReserve(t, lim, request{t0, 2, t4, true})
364	lim.SetLimitAt(t2, 10)
365	runReserve(t, lim, request{t2, 1, t4, true}) // violates Limit and Burst
366}
367
368func TestReserveSetBurst(t *testing.T) {
369	lim := NewLimiter(5, 2)
370
371	runReserve(t, lim, request{t0, 2, t0, true})
372	runReserve(t, lim, request{t0, 2, t4, true})
373	lim.SetBurstAt(t3, 4)
374	runReserve(t, lim, request{t0, 4, t9, true}) // violates Limit and Burst
375}
376
377func TestReserveSetLimitCancel(t *testing.T) {
378	lim := NewLimiter(5, 2)
379
380	runReserve(t, lim, request{t0, 2, t0, true})
381	r := runReserve(t, lim, request{t0, 2, t4, true})
382	lim.SetLimitAt(t2, 10)
383	r.CancelAt(t2) // 2 tokens back
384	runReserve(t, lim, request{t2, 2, t3, true})
385}
386
387func TestReserveMax(t *testing.T) {
388	lim := NewLimiter(10, 2)
389	maxT := d
390
391	runReserveMax(t, lim, request{t0, 2, t0, true}, maxT)
392	runReserveMax(t, lim, request{t0, 1, t1, true}, maxT)  // reserve for close future
393	runReserveMax(t, lim, request{t0, 1, t2, false}, maxT) // time to act too far in the future
394}
395
// wait describes one WaitN call and its expected outcome, as checked by
// runWait.
type wait struct {
	name   string          // label printed in failure messages
	ctx    context.Context // context passed to WaitN
	n      int             // number of tokens requested
	delay  int             // in multiples of d
	nilErr bool            // whether WaitN is expected to return a nil error
}
403
404func runWait(t *testing.T, lim *Limiter, w wait) {
405	t.Helper()
406	start := time.Now()
407	err := lim.WaitN(w.ctx, w.n)
408	delay := time.Since(start)
409	if (w.nilErr && err != nil) || (!w.nilErr && err == nil) || w.delay != dFromDuration(delay) {
410		errString := "<nil>"
411		if !w.nilErr {
412			errString = "<non-nil error>"
413		}
414		t.Errorf("lim.WaitN(%v, lim, %v) = %v with delay %v ; want %v with delay %v",
415			w.name, w.n, err, delay, errString, d*time.Duration(w.delay))
416	}
417}
418
419func TestWaitSimple(t *testing.T) {
420	lim := NewLimiter(10, 3)
421
422	ctx, cancel := context.WithCancel(context.Background())
423	cancel()
424	runWait(t, lim, wait{"already-cancelled", ctx, 1, 0, false})
425
426	runWait(t, lim, wait{"exceed-burst-error", context.Background(), 4, 0, false})
427
428	runWait(t, lim, wait{"act-now", context.Background(), 2, 0, true})
429	runWait(t, lim, wait{"act-later", context.Background(), 3, 2, true})
430}
431
432func TestWaitCancel(t *testing.T) {
433	lim := NewLimiter(10, 3)
434
435	ctx, cancel := context.WithCancel(context.Background())
436	runWait(t, lim, wait{"act-now", ctx, 2, 0, true}) // after this lim.tokens = 1
437	go func() {
438		time.Sleep(d)
439		cancel()
440	}()
441	runWait(t, lim, wait{"will-cancel", ctx, 3, 1, false})
442	// should get 3 tokens back, and have lim.tokens = 2
443	t.Logf("tokens:%v last:%v lastEvent:%v", lim.tokens, lim.last, lim.lastEvent)
444	runWait(t, lim, wait{"act-now-after-cancel", context.Background(), 2, 0, true})
445}
446
447func TestWaitTimeout(t *testing.T) {
448	lim := NewLimiter(10, 3)
449
450	ctx, cancel := context.WithTimeout(context.Background(), d)
451	defer cancel()
452	runWait(t, lim, wait{"act-now", ctx, 2, 0, true})
453	runWait(t, lim, wait{"w-timeout-err", ctx, 3, 0, false})
454}
455
456func TestWaitInf(t *testing.T) {
457	lim := NewLimiter(Inf, 0)
458
459	runWait(t, lim, wait{"exceed-burst-no-error", context.Background(), 3, 0, true})
460}
461
462func BenchmarkAllowN(b *testing.B) {
463	lim := NewLimiter(Every(1*time.Second), 1)
464	now := time.Now()
465	b.ReportAllocs()
466	b.ResetTimer()
467	b.RunParallel(func(pb *testing.PB) {
468		for pb.Next() {
469			lim.AllowN(now, 1)
470		}
471	})
472}
473
474func BenchmarkWaitNNoDelay(b *testing.B) {
475	lim := NewLimiter(Limit(b.N), b.N)
476	ctx := context.Background()
477	b.ReportAllocs()
478	b.ResetTimer()
479	for i := 0; i < b.N; i++ {
480		lim.WaitN(ctx, 1)
481	}
482}
483