1/*
2Copyright 2017 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package async
18
19import (
20	"sync"
21	"testing"
22	"time"
23)
24
// receiver records whether the managed function has been invoked and can
// optionally run a one-shot hook from inside that invocation.
type receiver struct {
	// lock guards every field below.
	lock sync.Mutex

	// run is set by F and cleared by reset.
	run bool
	// retryFn, when non-nil, is invoked once by F and then cleared.
	retryFn func()
}
31
32func (r *receiver) F() {
33	r.lock.Lock()
34	defer r.lock.Unlock()
35	r.run = true
36
37	if r.retryFn != nil {
38		r.retryFn()
39		r.retryFn = nil
40	}
41}
42
43func (r *receiver) reset() bool {
44	r.lock.Lock()
45	defer r.lock.Unlock()
46	was := r.run
47	r.run = false
48	return was
49}
50
51func (r *receiver) setRetryFn(retryFn func()) {
52	r.lock.Lock()
53	defer r.lock.Unlock()
54	r.retryFn = retryFn
55}
56
// timerUpdate describes one state change reported by the fake timer.
type timerUpdate struct {
	// active is the timer's state after the change.
	active bool
	// next is the duration the timer was armed for. It is meaningful only
	// when active is true.
	next time.Duration
}
62
63// Fake time.
64type fakeTimer struct {
65	c chan time.Time
66
67	lock    sync.Mutex
68	now     time.Time
69	timeout time.Time
70	active  bool
71
72	updated chan timerUpdate
73}
74
75func newFakeTimer() *fakeTimer {
76	ft := &fakeTimer{
77		now:     time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC),
78		c:       make(chan time.Time),
79		updated: make(chan timerUpdate),
80	}
81	return ft
82}
83
84func (ft *fakeTimer) C() <-chan time.Time {
85	return ft.c
86}
87
88func (ft *fakeTimer) Reset(in time.Duration) bool {
89	ft.lock.Lock()
90	defer ft.lock.Unlock()
91
92	was := ft.active
93	ft.active = true
94	ft.timeout = ft.now.Add(in)
95	ft.updated <- timerUpdate{
96		active: true,
97		next:   in,
98	}
99	return was
100}
101
102func (ft *fakeTimer) Stop() bool {
103	ft.lock.Lock()
104	defer ft.lock.Unlock()
105
106	was := ft.active
107	ft.active = false
108	ft.updated <- timerUpdate{
109		active: false,
110	}
111	return was
112}
113
114func (ft *fakeTimer) Now() time.Time {
115	ft.lock.Lock()
116	defer ft.lock.Unlock()
117
118	return ft.now
119}
120
121func (ft *fakeTimer) Remaining() time.Duration {
122	ft.lock.Lock()
123	defer ft.lock.Unlock()
124
125	return ft.timeout.Sub(ft.now)
126}
127
128func (ft *fakeTimer) Since(t time.Time) time.Duration {
129	ft.lock.Lock()
130	defer ft.lock.Unlock()
131
132	return ft.now.Sub(t)
133}
134
135func (ft *fakeTimer) Sleep(d time.Duration) {
136	// ft.advance grabs ft.lock
137	ft.advance(d)
138}
139
140// advance the current time.
141func (ft *fakeTimer) advance(d time.Duration) {
142	ft.lock.Lock()
143	defer ft.lock.Unlock()
144
145	ft.now = ft.now.Add(d)
146	if ft.active && !ft.now.Before(ft.timeout) {
147		ft.active = false
148		ft.c <- ft.timeout
149	}
150}
151
152// return the calling line number (for printing)
153// test the timer's state
154func checkTimer(name string, t *testing.T, upd timerUpdate, active bool, next time.Duration) {
155	if upd.active != active {
156		t.Fatalf("%s: expected timer active=%v", name, active)
157	}
158	if active && upd.next != next {
159		t.Fatalf("%s: expected timer to be %v, got %v", name, next, upd.next)
160	}
161}
162
163// test and reset the receiver's state
164func checkReceiver(name string, t *testing.T, receiver *receiver, expected bool) {
165	triggered := receiver.reset()
166	if expected && !triggered {
167		t.Fatalf("%s: function should have been called", name)
168	} else if !expected && triggered {
169		t.Fatalf("%s: function should not have been called", name)
170	}
171}
172
// The durations hard-coded in the test cases assume exactly these values.
var (
	minInterval = 1 * time.Second
	maxInterval = 10 * time.Second
)
176
177func waitForReset(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectCall bool, expectNext time.Duration) {
178	upd := <-timer.updated // wait for stop
179	checkReceiver(name, t, obj, expectCall)
180	checkReceiver(name, t, obj, false) // prove post-condition
181	checkTimer(name, t, upd, false, 0)
182	upd = <-timer.updated // wait for reset
183	checkTimer(name, t, upd, true, expectNext)
184}
185
186func waitForRun(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
187	waitForReset(name, t, timer, obj, true, maxInterval)
188}
189
190func waitForRunWithRetry(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
191	// It will first get reset as with a normal run, and then get set again
192	waitForRun(name, t, timer, obj)
193	waitForReset(name, t, timer, obj, false, expectNext)
194}
195
196func waitForDefer(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
197	waitForReset(name, t, timer, obj, false, expectNext)
198}
199
200func waitForNothing(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
201	select {
202	case <-timer.c:
203		t.Fatalf("%s: unexpected timer tick", name)
204	case upd := <-timer.updated:
205		t.Fatalf("%s: unexpected timer update %v", name, upd)
206	default:
207	}
208	checkReceiver(name, t, obj, false)
209}
210
// Test_BoundedFrequencyRunnerNoBurst drives a runner through a scripted
// sequence on the fake timer and checks, step by step, when the managed
// function is called and what the timer is reset to. The runner is built with
// minInterval, maxInterval and 1 as the fifth argument to construct
// (presumably the burst size, given the test name; construct is defined
// elsewhere). "rel" in the comments is fake time elapsed since the most recent
// run.
func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) {
	obj := &receiver{}
	timer := newFakeTimer()
	runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer)
	stop := make(chan struct{})

	var upd timerUpdate

	// Start.
	go runner.Loop(stop)
	upd = <-timer.updated // wait for the initial timer to be set to maxInterval
	checkTimer("init", t, upd, true, maxInterval)
	checkReceiver("init", t, obj, false)

	// Run once, immediately.
	// rel=0ms
	runner.Run()
	waitForRun("first run", t, timer, obj)

	// Run again, before minInterval expires.
	timer.advance(500 * time.Millisecond) // rel=500ms
	runner.Run()
	waitForDefer("too soon after first", t, timer, obj, 500*time.Millisecond)

	// Run again, before minInterval expires.
	timer.advance(499 * time.Millisecond) // rel=999ms
	runner.Run()
	waitForDefer("still too soon after first", t, timer, obj, 1*time.Millisecond)

	// Do the deferred run
	timer.advance(1 * time.Millisecond) // rel=1000ms
	waitForRun("second run", t, timer, obj)

	// Try again immediately
	runner.Run()
	waitForDefer("too soon after second", t, timer, obj, 1*time.Second)

	// Run again, before minInterval expires.
	timer.advance(1 * time.Millisecond) // rel=1ms
	runner.Run()
	waitForDefer("still too soon after second", t, timer, obj, 999*time.Millisecond)

	// Ensure that we don't run again early
	timer.advance(998 * time.Millisecond) // rel=999ms
	waitForNothing("premature", t, timer, obj)

	// Do the deferred run
	timer.advance(1 * time.Millisecond) // rel=1000ms
	waitForRun("third run", t, timer, obj)

	// Let minInterval pass, but there are no runs queued
	timer.advance(1 * time.Second) // rel=1000ms
	waitForNothing("minInterval", t, timer, obj)

	// Let maxInterval pass
	timer.advance(9 * time.Second) // rel=10000ms
	waitForRun("maxInterval", t, timer, obj)

	// Run again, before minInterval expires.
	timer.advance(1 * time.Millisecond) // rel=1ms
	runner.Run()
	waitForDefer("too soon after maxInterval run", t, timer, obj, 999*time.Millisecond)

	// Let minInterval pass
	timer.advance(999 * time.Millisecond) // rel=1000ms
	waitForRun("fourth run", t, timer, obj)

	// Clean up.
	stop <- struct{}{}
}
281
// Test_BoundedFrequencyRunnerBurst drives a runner through a scripted sequence
// on the fake timer and checks, step by step, when the managed function is
// called and what the timer is reset to. The runner is built with minInterval,
// maxInterval and 2 as the fifth argument to construct (presumably the burst
// size, given the test name; construct is defined elsewhere). In the comments,
// "abs" is fake time elapsed since the start of the test and "rel" is fake
// time elapsed since the most recent run.
func Test_BoundedFrequencyRunnerBurst(t *testing.T) {
	obj := &receiver{}
	timer := newFakeTimer()
	runner := construct("test-runner", obj.F, minInterval, maxInterval, 2, timer)
	stop := make(chan struct{})

	var upd timerUpdate

	// Start.
	go runner.Loop(stop)
	upd = <-timer.updated // wait for the initial timer to be set to maxInterval
	checkTimer("init", t, upd, true, maxInterval)
	checkReceiver("init", t, obj, false)

	// Run once, immediately.
	// abs=0ms, rel=0ms
	runner.Run()
	waitForRun("first run", t, timer, obj)

	// Run again, before minInterval expires, with burst.
	timer.advance(1 * time.Millisecond) // abs=1ms, rel=1ms
	runner.Run()
	waitForRun("second run", t, timer, obj)

	// Run again, before minInterval expires.
	timer.advance(498 * time.Millisecond) // abs=499ms, rel=498ms
	runner.Run()
	waitForDefer("too soon after second", t, timer, obj, 502*time.Millisecond)

	// Run again, before minInterval expires.
	timer.advance(1 * time.Millisecond) // abs=500ms, rel=499ms
	runner.Run()
	waitForDefer("too soon after second 2", t, timer, obj, 501*time.Millisecond)

	// Run again, before minInterval expires.
	timer.advance(1 * time.Millisecond) // abs=501ms, rel=500ms
	runner.Run()
	waitForDefer("too soon after second 3", t, timer, obj, 500*time.Millisecond)

	// Advance timer enough to replenish bursts, but not enough to be minInterval
	// after the last run
	timer.advance(499 * time.Millisecond) // abs=1000ms, rel=999ms
	waitForNothing("not minInterval", t, timer, obj)
	runner.Run()
	waitForRun("third run", t, timer, obj)

	// Run again, before minInterval expires.
	timer.advance(1 * time.Millisecond) // abs=1001ms, rel=1ms
	runner.Run()
	waitForDefer("too soon after third", t, timer, obj, 999*time.Millisecond)

	// Run again, before minInterval expires.
	timer.advance(998 * time.Millisecond) // abs=1999ms, rel=999ms
	runner.Run()
	waitForDefer("too soon after third 2", t, timer, obj, 1*time.Millisecond)

	// Advance and do the deferred run
	timer.advance(1 * time.Millisecond) // abs=2000ms, rel=1000ms
	waitForRun("fourth run", t, timer, obj)

	// Run again, once burst has fully replenished.
	timer.advance(2 * time.Second) // abs=4000ms, rel=2000ms
	runner.Run()
	waitForRun("fifth run", t, timer, obj)
	runner.Run()
	waitForRun("sixth run", t, timer, obj)
	runner.Run()
	waitForDefer("too soon after sixth", t, timer, obj, 1*time.Second)

	// Wait until minInterval after the last run
	timer.advance(1 * time.Second) // abs=5000ms, rel=1000ms
	waitForRun("seventh run", t, timer, obj)

	// Wait for maxInterval
	timer.advance(10 * time.Second) // abs=15000ms, rel=10000ms
	waitForRun("maxInterval", t, timer, obj)

	// Clean up.
	stop <- struct{}{}
}
362
// Test_BoundedFrequencyRunnerRetryAfter drives a runner through a scripted
// sequence on the fake timer, exercising runner.RetryAfter called from inside
// the managed function, from the test goroutine, and from a goroutine spawned
// by the managed function. At each step it checks when the managed function is
// called and what the timer is reset to. "rel" in the comments is fake time
// elapsed since the most recent run unless stated otherwise.
func Test_BoundedFrequencyRunnerRetryAfter(t *testing.T) {
	obj := &receiver{}
	timer := newFakeTimer()
	runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer)
	stop := make(chan struct{})

	var upd timerUpdate

	// Start.
	go runner.Loop(stop)
	upd = <-timer.updated // wait for the initial timer to be set to maxInterval
	checkTimer("init", t, upd, true, maxInterval)
	checkReceiver("init", t, obj, false)

	// Run once, immediately, and queue a retry
	// rel=0ms
	obj.setRetryFn(func() { runner.RetryAfter(5 * time.Second) })
	runner.Run()
	waitForRunWithRetry("first run", t, timer, obj, 5*time.Second)

	// Nothing happens...
	timer.advance(time.Second) // rel=1000ms
	waitForNothing("minInterval, nothing queued", t, timer, obj)

	// After retryInterval, function is called
	timer.advance(4 * time.Second) // rel=5000ms
	waitForRun("retry", t, timer, obj)

	// Run again, before minInterval expires.
	timer.advance(499 * time.Millisecond) // rel=499ms
	runner.Run()
	waitForDefer("too soon after retry", t, timer, obj, 501*time.Millisecond)

	// Do the deferred run, queue another retry after it returns
	timer.advance(501 * time.Millisecond) // rel=1000ms
	runner.RetryAfter(5 * time.Second)
	waitForRunWithRetry("second run", t, timer, obj, 5*time.Second)

	// Wait for minInterval to pass
	timer.advance(time.Second) // rel=1000ms
	waitForNothing("minInterval, nothing queued", t, timer, obj)

	// Now do another run
	runner.Run()
	waitForRun("third run", t, timer, obj)

	// Retry was cancelled because we already ran
	timer.advance(4 * time.Second) // rel=4s since third run, 5s since RetryAfter
	waitForNothing("retry cancelled", t, timer, obj)

	// Run, queue a retry from a goroutine
	obj.setRetryFn(func() {
		go func() {
			// This is a real (wall-clock) sleep, not fake-timer time.
			time.Sleep(100 * time.Millisecond)
			runner.RetryAfter(5 * time.Second)
		}()
	})
	runner.Run()
	waitForRunWithRetry("fourth run", t, timer, obj, 5*time.Second)

	// Call Run again before minInterval passes
	timer.advance(100 * time.Millisecond) // rel=100ms
	runner.Run()
	waitForDefer("too soon after fourth run", t, timer, obj, 900*time.Millisecond)

	// Deferred run will run after minInterval passes
	timer.advance(900 * time.Millisecond) // rel=1000ms
	waitForRun("fifth run", t, timer, obj)

	// Retry was cancelled because we already ran
	timer.advance(4 * time.Second) // rel=4s since run, 5s since RetryAfter
	waitForNothing("retry cancelled", t, timer, obj)

	// Rerun happens after maxInterval
	timer.advance(5 * time.Second) // rel=9s since run, 10s since RetryAfter
	waitForNothing("premature", t, timer, obj)
	timer.advance(time.Second) // rel=10s since run
	waitForRun("maxInterval", t, timer, obj)

	// Clean up.
	stop <- struct{}{}
}
445