1/* 2Copyright 2017 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package async 18 19import ( 20 "sync" 21 "testing" 22 "time" 23) 24 25// Track calls to the managed function. 26type receiver struct { 27 lock sync.Mutex 28 run bool 29 retryFn func() 30} 31 32func (r *receiver) F() { 33 r.lock.Lock() 34 defer r.lock.Unlock() 35 r.run = true 36 37 if r.retryFn != nil { 38 r.retryFn() 39 r.retryFn = nil 40 } 41} 42 43func (r *receiver) reset() bool { 44 r.lock.Lock() 45 defer r.lock.Unlock() 46 was := r.run 47 r.run = false 48 return was 49} 50 51func (r *receiver) setRetryFn(retryFn func()) { 52 r.lock.Lock() 53 defer r.lock.Unlock() 54 r.retryFn = retryFn 55} 56 57// A single change event in the fake timer. 58type timerUpdate struct { 59 active bool 60 next time.Duration // iff active == true 61} 62 63// Fake time. 64type fakeTimer struct { 65 c chan time.Time 66 67 lock sync.Mutex 68 now time.Time 69 timeout time.Time 70 active bool 71 72 updated chan timerUpdate 73} 74 75func newFakeTimer() *fakeTimer { 76 ft := &fakeTimer{ 77 now: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), 78 c: make(chan time.Time), 79 updated: make(chan timerUpdate), 80 } 81 return ft 82} 83 84func (ft *fakeTimer) C() <-chan time.Time { 85 return ft.c 86} 87 88func (ft *fakeTimer) Reset(in time.Duration) bool { 89 ft.lock.Lock() 90 defer ft.lock.Unlock() 91 92 was := ft.active 93 ft.active = true 94 ft.timeout = ft.now.Add(in) 95 ft.updated <- timerUpdate{ 96 active: true, 97 next: in, 98 } 99 return was 100} 101 102func (ft *fakeTimer) Stop() bool { 103 ft.lock.Lock() 104 defer ft.lock.Unlock() 105 106 was := ft.active 107 ft.active = false 108 ft.updated <- timerUpdate{ 109 active: false, 110 } 111 return was 112} 113 114func (ft *fakeTimer) Now() time.Time { 115 ft.lock.Lock() 116 defer ft.lock.Unlock() 117 118 return ft.now 119} 120 121func (ft *fakeTimer) Remaining() time.Duration { 122 ft.lock.Lock() 123 defer ft.lock.Unlock() 124 125 return ft.timeout.Sub(ft.now) 126} 127 128func (ft *fakeTimer) Since(t time.Time) time.Duration { 129 ft.lock.Lock() 130 defer ft.lock.Unlock() 131 132 return ft.now.Sub(t) 133} 134 135func (ft *fakeTimer) Sleep(d time.Duration) { 136 // ft.advance grabs ft.lock 137 ft.advance(d) 138} 139 140// advance the current time. 141func (ft *fakeTimer) advance(d time.Duration) { 142 ft.lock.Lock() 143 defer ft.lock.Unlock() 144 145 ft.now = ft.now.Add(d) 146 if ft.active && !ft.now.Before(ft.timeout) { 147 ft.active = false 148 ft.c <- ft.timeout 149 } 150} 151 152// return the calling line number (for printing) 153// test the timer's state 154func checkTimer(name string, t *testing.T, upd timerUpdate, active bool, next time.Duration) { 155 if upd.active != active { 156 t.Fatalf("%s: expected timer active=%v", name, active) 157 } 158 if active && upd.next != next { 159 t.Fatalf("%s: expected timer to be %v, got %v", name, next, upd.next) 160 } 161} 162 163// test and reset the receiver's state 164func checkReceiver(name string, t *testing.T, receiver *receiver, expected bool) { 165 triggered := receiver.reset() 166 if expected && !triggered { 167 t.Fatalf("%s: function should have been called", name) 168 } else if !expected && triggered { 169 t.Fatalf("%s: function should not have been called", name) 170 } 171} 172 173// Durations embedded in test cases depend on these. 174var minInterval = 1 * time.Second 175var maxInterval = 10 * time.Second 176 177func waitForReset(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectCall bool, expectNext time.Duration) { 178 upd := <-timer.updated // wait for stop 179 checkReceiver(name, t, obj, expectCall) 180 checkReceiver(name, t, obj, false) // prove post-condition 181 checkTimer(name, t, upd, false, 0) 182 upd = <-timer.updated // wait for reset 183 checkTimer(name, t, upd, true, expectNext) 184} 185 186func waitForRun(name string, t *testing.T, timer *fakeTimer, obj *receiver) { 187 waitForReset(name, t, timer, obj, true, maxInterval) 188} 189 190func waitForRunWithRetry(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) { 191 // It will first get reset as with a normal run, and then get set again 192 waitForRun(name, t, timer, obj) 193 waitForReset(name, t, timer, obj, false, expectNext) 194} 195 196func waitForDefer(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) { 197 waitForReset(name, t, timer, obj, false, expectNext) 198} 199 200func waitForNothing(name string, t *testing.T, timer *fakeTimer, obj *receiver) { 201 select { 202 case <-timer.c: 203 t.Fatalf("%s: unexpected timer tick", name) 204 case upd := <-timer.updated: 205 t.Fatalf("%s: unexpected timer update %v", name, upd) 206 default: 207 } 208 checkReceiver(name, t, obj, false) 209} 210 211func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) { 212 obj := &receiver{} 213 timer := newFakeTimer() 214 runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer) 215 stop := make(chan struct{}) 216 217 var upd timerUpdate 218 219 // Start. 220 go runner.Loop(stop) 221 upd = <-timer.updated // wait for initial time to be set to max 222 checkTimer("init", t, upd, true, maxInterval) 223 checkReceiver("init", t, obj, false) 224 225 // Run once, immediately. 226 // rel=0ms 227 runner.Run() 228 waitForRun("first run", t, timer, obj) 229 230 // Run again, before minInterval expires. 231 timer.advance(500 * time.Millisecond) // rel=500ms 232 runner.Run() 233 waitForDefer("too soon after first", t, timer, obj, 500*time.Millisecond) 234 235 // Run again, before minInterval expires. 236 timer.advance(499 * time.Millisecond) // rel=999ms 237 runner.Run() 238 waitForDefer("still too soon after first", t, timer, obj, 1*time.Millisecond) 239 240 // Do the deferred run 241 timer.advance(1 * time.Millisecond) // rel=1000ms 242 waitForRun("second run", t, timer, obj) 243 244 // Try again immediately 245 runner.Run() 246 waitForDefer("too soon after second", t, timer, obj, 1*time.Second) 247 248 // Run again, before minInterval expires. 249 timer.advance(1 * time.Millisecond) // rel=1ms 250 runner.Run() 251 waitForDefer("still too soon after second", t, timer, obj, 999*time.Millisecond) 252 253 // Ensure that we don't run again early 254 timer.advance(998 * time.Millisecond) // rel=999ms 255 waitForNothing("premature", t, timer, obj) 256 257 // Do the deferred run 258 timer.advance(1 * time.Millisecond) // rel=1000ms 259 waitForRun("third run", t, timer, obj) 260 261 // Let minInterval pass, but there are no runs queued 262 timer.advance(1 * time.Second) // rel=1000ms 263 waitForNothing("minInterval", t, timer, obj) 264 265 // Let maxInterval pass 266 timer.advance(9 * time.Second) // rel=10000ms 267 waitForRun("maxInterval", t, timer, obj) 268 269 // Run again, before minInterval expires. 270 timer.advance(1 * time.Millisecond) // rel=1ms 271 runner.Run() 272 waitForDefer("too soon after maxInterval run", t, timer, obj, 999*time.Millisecond) 273 274 // Let minInterval pass 275 timer.advance(999 * time.Millisecond) // rel=1000ms 276 waitForRun("fourth run", t, timer, obj) 277 278 // Clean up. 279 stop <- struct{}{} 280} 281 282func Test_BoundedFrequencyRunnerBurst(t *testing.T) { 283 obj := &receiver{} 284 timer := newFakeTimer() 285 runner := construct("test-runner", obj.F, minInterval, maxInterval, 2, timer) 286 stop := make(chan struct{}) 287 288 var upd timerUpdate 289 290 // Start. 291 go runner.Loop(stop) 292 upd = <-timer.updated // wait for initial time to be set to max 293 checkTimer("init", t, upd, true, maxInterval) 294 checkReceiver("init", t, obj, false) 295 296 // Run once, immediately. 297 // abs=0ms, rel=0ms 298 runner.Run() 299 waitForRun("first run", t, timer, obj) 300 301 // Run again, before minInterval expires, with burst. 302 timer.advance(1 * time.Millisecond) // abs=1ms, rel=1ms 303 runner.Run() 304 waitForRun("second run", t, timer, obj) 305 306 // Run again, before minInterval expires. 307 timer.advance(498 * time.Millisecond) // abs=499ms, rel=498ms 308 runner.Run() 309 waitForDefer("too soon after second", t, timer, obj, 502*time.Millisecond) 310 311 // Run again, before minInterval expires. 312 timer.advance(1 * time.Millisecond) // abs=500ms, rel=499ms 313 runner.Run() 314 waitForDefer("too soon after second 2", t, timer, obj, 501*time.Millisecond) 315 316 // Run again, before minInterval expires. 317 timer.advance(1 * time.Millisecond) // abs=501ms, rel=500ms 318 runner.Run() 319 waitForDefer("too soon after second 3", t, timer, obj, 500*time.Millisecond) 320 321 // Advance timer enough to replenish bursts, but not enough to be minInterval 322 // after the last run 323 timer.advance(499 * time.Millisecond) // abs=1000ms, rel=999ms 324 waitForNothing("not minInterval", t, timer, obj) 325 runner.Run() 326 waitForRun("third run", t, timer, obj) 327 328 // Run again, before minInterval expires. 329 timer.advance(1 * time.Millisecond) // abs=1001ms, rel=1ms 330 runner.Run() 331 waitForDefer("too soon after third", t, timer, obj, 999*time.Millisecond) 332 333 // Run again, before minInterval expires. 334 timer.advance(998 * time.Millisecond) // abs=1999ms, rel=999ms 335 runner.Run() 336 waitForDefer("too soon after third 2", t, timer, obj, 1*time.Millisecond) 337 338 // Advance and do the deferred run 339 timer.advance(1 * time.Millisecond) // abs=2000ms, rel=1000ms 340 waitForRun("fourth run", t, timer, obj) 341 342 // Run again, once burst has fully replenished. 343 timer.advance(2 * time.Second) // abs=4000ms, rel=2000ms 344 runner.Run() 345 waitForRun("fifth run", t, timer, obj) 346 runner.Run() 347 waitForRun("sixth run", t, timer, obj) 348 runner.Run() 349 waitForDefer("too soon after sixth", t, timer, obj, 1*time.Second) 350 351 // Wait until minInterval after the last run 352 timer.advance(1 * time.Second) // abs=5000ms, rel=1000ms 353 waitForRun("seventh run", t, timer, obj) 354 355 // Wait for maxInterval 356 timer.advance(10 * time.Second) // abs=15000ms, rel=10000ms 357 waitForRun("maxInterval", t, timer, obj) 358 359 // Clean up. 360 stop <- struct{}{} 361} 362 363func Test_BoundedFrequencyRunnerRetryAfter(t *testing.T) { 364 obj := &receiver{} 365 timer := newFakeTimer() 366 runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer) 367 stop := make(chan struct{}) 368 369 var upd timerUpdate 370 371 // Start. 372 go runner.Loop(stop) 373 upd = <-timer.updated // wait for initial time to be set to max 374 checkTimer("init", t, upd, true, maxInterval) 375 checkReceiver("init", t, obj, false) 376 377 // Run once, immediately, and queue a retry 378 // rel=0ms 379 obj.setRetryFn(func() { runner.RetryAfter(5 * time.Second) }) 380 runner.Run() 381 waitForRunWithRetry("first run", t, timer, obj, 5*time.Second) 382 383 // Nothing happens... 384 timer.advance(time.Second) // rel=1000ms 385 waitForNothing("minInterval, nothing queued", t, timer, obj) 386 387 // After retryInterval, function is called 388 timer.advance(4 * time.Second) // rel=5000ms 389 waitForRun("retry", t, timer, obj) 390 391 // Run again, before minInterval expires. 392 timer.advance(499 * time.Millisecond) // rel=499ms 393 runner.Run() 394 waitForDefer("too soon after retry", t, timer, obj, 501*time.Millisecond) 395 396 // Do the deferred run, queue another retry after it returns 397 timer.advance(501 * time.Millisecond) // rel=1000ms 398 runner.RetryAfter(5 * time.Second) 399 waitForRunWithRetry("second run", t, timer, obj, 5*time.Second) 400 401 // Wait for minInterval to pass 402 timer.advance(time.Second) // rel=1000ms 403 waitForNothing("minInterval, nothing queued", t, timer, obj) 404 405 // Now do another run 406 runner.Run() 407 waitForRun("third run", t, timer, obj) 408 409 // Retry was cancelled because we already ran 410 timer.advance(4 * time.Second) 411 waitForNothing("retry cancelled", t, timer, obj) 412 413 // Run, queue a retry from a goroutine 414 obj.setRetryFn(func() { 415 go func() { 416 time.Sleep(100 * time.Millisecond) 417 runner.RetryAfter(5 * time.Second) 418 }() 419 }) 420 runner.Run() 421 waitForRunWithRetry("fourth run", t, timer, obj, 5*time.Second) 422 423 // Call Run again before minInterval passes 424 timer.advance(100 * time.Millisecond) // rel=100ms 425 runner.Run() 426 waitForDefer("too soon after fourth run", t, timer, obj, 900*time.Millisecond) 427 428 // Deferred run will run after minInterval passes 429 timer.advance(900 * time.Millisecond) // rel=1000ms 430 waitForRun("fifth run", t, timer, obj) 431 432 // Retry was cancelled because we already ran 433 timer.advance(4 * time.Second) // rel=4s since run, 5s since RetryAfter 434 waitForNothing("retry cancelled", t, timer, obj) 435 436 // Rerun happens after maxInterval 437 timer.advance(5 * time.Second) // rel=9s since run, 10s since RetryAfter 438 waitForNothing("premature", t, timer, obj) 439 timer.advance(time.Second) // rel=10s since run 440 waitForRun("maxInterval", t, timer, obj) 441 442 // Clean up. 443 stop <- struct{}{} 444} 445