1/* 2Copyright 2019 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package queueset 18 19import ( 20 "context" 21 "errors" 22 "fmt" 23 "math" 24 "reflect" 25 "sort" 26 "sync/atomic" 27 "testing" 28 "time" 29 30 "k8s.io/apimachinery/pkg/util/clock" 31 "k8s.io/apiserver/pkg/util/flowcontrol/counter" 32 fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" 33 test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing" 34 testclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock" 35 "k8s.io/apiserver/pkg/util/flowcontrol/metrics" 36 fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" 37 "k8s.io/klog/v2" 38) 39 40// fairAlloc computes the max-min fair allocation of the given 41// capacity to the given demands (which slice is not side-effected). 42func fairAlloc(demands []float64, capacity float64) []float64 { 43 count := len(demands) 44 indices := make([]int, count) 45 for i := 0; i < count; i++ { 46 indices[i] = i 47 } 48 sort.Slice(indices, func(i, j int) bool { return demands[indices[i]] < demands[indices[j]] }) 49 alloc := make([]float64, count) 50 var next int 51 var prevAlloc float64 52 for ; next < count; next++ { 53 // `capacity` is how much remains assuming that 54 // all unvisited items get `prevAlloc`. 55 idx := indices[next] 56 demand := demands[idx] 57 if demand <= 0 { 58 continue 59 } 60 // `fullCapacityBite` is how much more capacity would be used 61 // if this and all following items get as much as this one 62 // is demanding. 63 fullCapacityBite := float64(count-next) * (demand - prevAlloc) 64 if fullCapacityBite > capacity { 65 break 66 } 67 prevAlloc = demand 68 alloc[idx] = demand 69 capacity -= fullCapacityBite 70 } 71 for j := next; j < count; j++ { 72 alloc[indices[j]] = prevAlloc + capacity/float64(count-next) 73 } 74 return alloc 75} 76 77func TestFairAlloc(t *testing.T) { 78 if e, a := []float64{0, 0}, fairAlloc([]float64{0, 0}, 42); !reflect.DeepEqual(e, a) { 79 t.Errorf("Expected %#+v, got #%+v", e, a) 80 } 81 if e, a := []float64{42, 0}, fairAlloc([]float64{47, 0}, 42); !reflect.DeepEqual(e, a) { 82 t.Errorf("Expected %#+v, got #%+v", e, a) 83 } 84 if e, a := []float64{1, 41}, fairAlloc([]float64{1, 47}, 42); !reflect.DeepEqual(e, a) { 85 t.Errorf("Expected %#+v, got #%+v", e, a) 86 } 87 if e, a := []float64{3, 5, 5, 1}, fairAlloc([]float64{3, 7, 9, 1}, 14); !reflect.DeepEqual(e, a) { 88 t.Errorf("Expected %#+v, got #%+v", e, a) 89 } 90 if e, a := []float64{1, 9, 7, 3}, fairAlloc([]float64{1, 9, 7, 3}, 21); !reflect.DeepEqual(e, a) { 91 t.Errorf("Expected %#+v, got #%+v", e, a) 92 } 93} 94 95type uniformClient struct { 96 hash uint64 97 nThreads int 98 nCalls int 99 // duration for a simulated synchronous call 100 execDuration time.Duration 101 // duration for simulated "other work". This can be negative, 102 // causing a request to be launched a certain amount of time 103 // before the previous one finishes. 104 thinkDuration time.Duration 105 // When true indicates that only half the specified number of 106 // threads should run during the first half of the evaluation 107 // period 108 split bool 109} 110 111// uniformScenario describes a scenario based on the given set of uniform clients. 112// Each uniform client specifies a number of threads, each of which alternates between thinking 113// and making a synchronous request through the QueueSet. 114// The test measures how much concurrency each client got, on average, over 115// the initial evalDuration and tests to see whether they all got about the fair amount. 116// Each client needs to be demanding enough to use more than its fair share, 117// or overall care needs to be taken about timing so that scheduling details 118// do not cause any client to actually request a significantly smaller share 119// than it theoretically should. 120// expectFair indicate whether the QueueSet is expected to be 121// fair in the respective halves of a split scenario; 122// in a non-split scenario this is a singleton with one expectation. 123// expectAllRequests indicates whether all requests are expected to get dispatched. 124type uniformScenario struct { 125 name string 126 qs fq.QueueSet 127 clients []uniformClient 128 concurrencyLimit int 129 evalDuration time.Duration 130 expectedFair []bool 131 expectedFairnessMargin []float64 132 expectAllRequests bool 133 evalInqueueMetrics, evalExecutingMetrics bool 134 rejectReason string 135 clk *testclock.FakeEventClock 136 counter counter.GoRoutineCounter 137} 138 139func (us uniformScenario) exercise(t *testing.T) { 140 uss := uniformScenarioState{ 141 t: t, 142 uniformScenario: us, 143 startTime: time.Now(), 144 integrators: make([]fq.Integrator, len(us.clients)), 145 executions: make([]int32, len(us.clients)), 146 rejects: make([]int32, len(us.clients)), 147 } 148 for _, uc := range us.clients { 149 uss.doSplit = uss.doSplit || uc.split 150 } 151 uss.exercise() 152} 153 154type uniformScenarioState struct { 155 t *testing.T 156 uniformScenario 157 startTime time.Time 158 doSplit bool 159 integrators []fq.Integrator 160 failedCount uint64 161 expectedInqueue, expectedExecuting, expectedConcurrencyInUse string 162 executions, rejects []int32 163} 164 165func (uss *uniformScenarioState) exercise() { 166 uss.t.Logf("%s: Start %s, doSplit=%v, clk=%p, grc=%p", uss.startTime.Format(nsTimeFmt), uss.name, uss.doSplit, uss.clk, uss.counter) 167 if uss.evalInqueueMetrics || uss.evalExecutingMetrics { 168 metrics.Reset() 169 } 170 for i, uc := range uss.clients { 171 uss.integrators[i] = fq.NewIntegrator(uss.clk) 172 fsName := fmt.Sprintf("client%d", i) 173 uss.expectedInqueue = uss.expectedInqueue + fmt.Sprintf(` apiserver_flowcontrol_current_inqueue_requests{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n") 174 for j := 0; j < uc.nThreads; j++ { 175 ust := uniformScenarioThread{ 176 uss: uss, 177 i: i, 178 j: j, 179 nCalls: uc.nCalls, 180 uc: uc, 181 igr: uss.integrators[i], 182 fsName: fsName, 183 } 184 ust.start() 185 } 186 } 187 if uss.doSplit { 188 uss.evalTo(uss.startTime.Add(uss.evalDuration/2), false, uss.expectedFair[0], uss.expectedFairnessMargin[0]) 189 } 190 uss.evalTo(uss.startTime.Add(uss.evalDuration), true, uss.expectedFair[len(uss.expectedFair)-1], uss.expectedFairnessMargin[len(uss.expectedFairnessMargin)-1]) 191 uss.clk.Run(nil) 192 uss.finalReview() 193} 194 195type uniformScenarioThread struct { 196 uss *uniformScenarioState 197 i, j int 198 nCalls int 199 uc uniformClient 200 igr fq.Integrator 201 fsName string 202} 203 204func (ust *uniformScenarioThread) start() { 205 initialDelay := time.Duration(11*ust.j + 2*ust.i) 206 if ust.uc.split && ust.j >= ust.uc.nThreads/2 { 207 initialDelay += ust.uss.evalDuration / 2 208 ust.nCalls = ust.nCalls / 2 209 } 210 ust.uss.clk.EventAfterDuration(ust.genCallK(0), initialDelay) 211} 212 213// generates an EventFunc that forks a goroutine to do call k 214func (ust *uniformScenarioThread) genCallK(k int) func(time.Time) { 215 return func(time.Time) { 216 // As an EventFunc, this has to return without waiting 217 // for time to pass, and so cannot do callK(k) itself. 218 ust.uss.counter.Add(1) 219 go func() { 220 ust.callK(k) 221 ust.uss.counter.Add(-1) 222 }() 223 } 224} 225 226func (ust *uniformScenarioThread) callK(k int) { 227 if k >= ust.nCalls { 228 return 229 } 230 req, idle := ust.uss.qs.StartRequest(context.Background(), &fcrequest.Width{Seats: 1}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil) 231 ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle) 232 if req == nil { 233 atomic.AddUint64(&ust.uss.failedCount, 1) 234 atomic.AddInt32(&ust.uss.rejects[ust.i], 1) 235 return 236 } 237 if idle { 238 ust.uss.t.Error("got request but QueueSet reported idle") 239 } 240 var executed bool 241 idle2 := req.Finish(func() { 242 executed = true 243 execStart := ust.uss.clk.Now() 244 ust.uss.t.Logf("%s: %d, %d, %d executing", execStart.Format(nsTimeFmt), ust.i, ust.j, k) 245 atomic.AddInt32(&ust.uss.executions[ust.i], 1) 246 ust.igr.Add(1) 247 ust.uss.clk.EventAfterDuration(ust.genCallK(k+1), ust.uc.execDuration+ust.uc.thinkDuration) 248 ClockWait(ust.uss.clk, ust.uss.counter, ust.uc.execDuration) 249 ust.igr.Add(-1) 250 }) 251 ust.uss.t.Logf("%s: %d, %d, %d got executed=%v, idle2=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, executed, idle2) 252 if !executed { 253 atomic.AddUint64(&ust.uss.failedCount, 1) 254 atomic.AddInt32(&ust.uss.rejects[ust.i], 1) 255 } 256} 257 258func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool, margin float64) { 259 uss.clk.Run(&lim) 260 uss.clk.SetTime(lim) 261 if uss.doSplit && !last { 262 uss.t.Logf("%s: End of first half", uss.clk.Now().Format(nsTimeFmt)) 263 } else { 264 uss.t.Logf("%s: End", uss.clk.Now().Format(nsTimeFmt)) 265 } 266 demands := make([]float64, len(uss.clients)) 267 averages := make([]float64, len(uss.clients)) 268 for i, uc := range uss.clients { 269 nThreads := uc.nThreads 270 if uc.split && !last { 271 nThreads = nThreads / 2 272 } 273 demands[i] = float64(nThreads) * float64(uc.execDuration) / float64(uc.thinkDuration+uc.execDuration) 274 averages[i] = uss.integrators[i].Reset().Average 275 } 276 fairAverages := fairAlloc(demands, float64(uss.concurrencyLimit)) 277 for i := range uss.clients { 278 var gotFair bool 279 if fairAverages[i] > 0 { 280 relDiff := (averages[i] - fairAverages[i]) / fairAverages[i] 281 gotFair = math.Abs(relDiff) <= margin 282 } else { 283 gotFair = math.Abs(averages[i]) <= margin 284 } 285 286 if gotFair != expectFair { 287 uss.t.Errorf("%s client %d last=%v got an Average of %v but the fair average was %v", uss.name, i, last, averages[i], fairAverages[i]) 288 } else { 289 uss.t.Logf("%s client %d last=%v got an Average of %v and the fair average was %v", uss.name, i, last, averages[i], fairAverages[i]) 290 } 291 } 292} 293 294func (uss *uniformScenarioState) finalReview() { 295 if uss.expectAllRequests && uss.failedCount > 0 { 296 uss.t.Errorf("Expected all requests to be successful but got %v failed requests", uss.failedCount) 297 } else if !uss.expectAllRequests && uss.failedCount == 0 { 298 uss.t.Errorf("Expected failed requests but all requests succeeded") 299 } 300 if uss.evalInqueueMetrics { 301 e := ` 302 # HELP apiserver_flowcontrol_current_inqueue_requests [ALPHA] Number of requests currently pending in queues of the API Priority and Fairness system 303 # TYPE apiserver_flowcontrol_current_inqueue_requests gauge 304` + uss.expectedInqueue 305 err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_current_inqueue_requests") 306 if err != nil { 307 uss.t.Error(err) 308 } else { 309 uss.t.Log("Success with" + e) 310 } 311 } 312 expectedRejects := "" 313 for i := range uss.clients { 314 fsName := fmt.Sprintf("client%d", i) 315 if atomic.AddInt32(&uss.executions[i], 0) > 0 { 316 uss.expectedExecuting = uss.expectedExecuting + fmt.Sprintf(` apiserver_flowcontrol_current_executing_requests{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n") 317 uss.expectedConcurrencyInUse = uss.expectedConcurrencyInUse + fmt.Sprintf(` apiserver_flowcontrol_request_concurrency_in_use{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n") 318 } 319 if atomic.AddInt32(&uss.rejects[i], 0) > 0 { 320 expectedRejects = expectedRejects + fmt.Sprintf(` apiserver_flowcontrol_rejected_requests_total{flow_schema=%q,priority_level=%q,reason=%q} %d%s`, fsName, uss.name, uss.rejectReason, uss.rejects[i], "\n") 321 } 322 } 323 if uss.evalExecutingMetrics && len(uss.expectedExecuting) > 0 { 324 e := ` 325 # HELP apiserver_flowcontrol_current_executing_requests [ALPHA] Number of requests currently executing in the API Priority and Fairness system 326 # TYPE apiserver_flowcontrol_current_executing_requests gauge 327` + uss.expectedExecuting 328 err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_current_executing_requests") 329 if err != nil { 330 uss.t.Error(err) 331 } else { 332 uss.t.Log("Success with" + e) 333 } 334 } 335 if uss.evalExecutingMetrics && len(uss.expectedConcurrencyInUse) > 0 { 336 e := ` 337 # HELP apiserver_flowcontrol_request_concurrency_in_use [ALPHA] Concurrency (number of seats) occupided by the currently executing requests in the API Priority and Fairness system 338 # TYPE apiserver_flowcontrol_request_concurrency_in_use gauge 339` + uss.expectedConcurrencyInUse 340 err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_request_concurrency_in_use") 341 if err != nil { 342 uss.t.Error(err) 343 } else { 344 uss.t.Log("Success with" + e) 345 } 346 } 347 if uss.evalExecutingMetrics && len(expectedRejects) > 0 { 348 e := ` 349 # HELP apiserver_flowcontrol_rejected_requests_total [ALPHA] Number of requests rejected by API Priority and Fairness system 350 # TYPE apiserver_flowcontrol_rejected_requests_total counter 351` + expectedRejects 352 err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_rejected_requests_total") 353 if err != nil { 354 uss.t.Error(err) 355 } else { 356 uss.t.Log("Success with" + e) 357 } 358 } 359} 360 361func ClockWait(clk *testclock.FakeEventClock, counter counter.GoRoutineCounter, duration time.Duration) { 362 dunch := make(chan struct{}) 363 clk.EventAfterDuration(func(time.Time) { 364 counter.Add(1) 365 close(dunch) 366 }, duration) 367 counter.Add(-1) 368 <-dunch 369} 370 371func init() { 372 klog.InitFlags(nil) 373} 374 375// TestNoRestraint tests whether the no-restraint factory gives every client what it asks for 376func TestNoRestraint(t *testing.T) { 377 metrics.Register() 378 now := time.Now() 379 clk, counter := testclock.NewFakeEventClock(now, 0, nil) 380 nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newObserverPair(clk)) 381 if err != nil { 382 t.Fatal(err) 383 } 384 nr := nrc.Complete(fq.DispatchingConfig{}) 385 uniformScenario{name: "NoRestraint", 386 qs: nr, 387 clients: []uniformClient{ 388 {1001001001, 5, 10, time.Second, time.Second, false}, 389 {2002002002, 2, 10, time.Second, time.Second / 2, false}, 390 }, 391 concurrencyLimit: 10, 392 evalDuration: time.Second * 15, 393 expectedFair: []bool{true}, 394 expectedFairnessMargin: []float64{0.1}, 395 expectAllRequests: true, 396 clk: clk, 397 counter: counter, 398 }.exercise(t) 399} 400 401func TestUniformFlowsHandSize1(t *testing.T) { 402 metrics.Register() 403 now := time.Now() 404 405 clk, counter := testclock.NewFakeEventClock(now, 0, nil) 406 qsf := NewQueueSetFactory(clk, counter) 407 qCfg := fq.QueuingConfig{ 408 Name: "TestUniformFlowsHandSize1", 409 DesiredNumQueues: 9, 410 QueueLengthLimit: 8, 411 HandSize: 1, 412 RequestWaitLimit: 10 * time.Minute, 413 } 414 qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) 415 if err != nil { 416 t.Fatal(err) 417 } 418 qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4}) 419 420 uniformScenario{name: qCfg.Name, 421 qs: qs, 422 clients: []uniformClient{ 423 {1001001001, 8, 20, time.Second, time.Second - 1, false}, 424 {2002002002, 8, 20, time.Second, time.Second - 1, false}, 425 }, 426 concurrencyLimit: 4, 427 evalDuration: time.Second * 50, 428 expectedFair: []bool{true}, 429 expectedFairnessMargin: []float64{0.1}, 430 expectAllRequests: true, 431 evalInqueueMetrics: true, 432 evalExecutingMetrics: true, 433 clk: clk, 434 counter: counter, 435 }.exercise(t) 436} 437 438func TestUniformFlowsHandSize3(t *testing.T) { 439 metrics.Register() 440 now := time.Now() 441 442 clk, counter := testclock.NewFakeEventClock(now, 0, nil) 443 qsf := NewQueueSetFactory(clk, counter) 444 qCfg := fq.QueuingConfig{ 445 Name: "TestUniformFlowsHandSize3", 446 DesiredNumQueues: 8, 447 QueueLengthLimit: 4, 448 HandSize: 3, 449 RequestWaitLimit: 10 * time.Minute, 450 } 451 qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) 452 if err != nil { 453 t.Fatal(err) 454 } 455 qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4}) 456 uniformScenario{name: qCfg.Name, 457 qs: qs, 458 clients: []uniformClient{ 459 {1001001001, 8, 30, time.Second, time.Second - 1, false}, 460 {2002002002, 8, 30, time.Second, time.Second - 1, false}, 461 }, 462 concurrencyLimit: 4, 463 evalDuration: time.Second * 60, 464 expectedFair: []bool{true}, 465 expectedFairnessMargin: []float64{0.1}, 466 expectAllRequests: true, 467 evalInqueueMetrics: true, 468 evalExecutingMetrics: true, 469 clk: clk, 470 counter: counter, 471 }.exercise(t) 472} 473 474func TestDifferentFlowsExpectEqual(t *testing.T) { 475 metrics.Register() 476 now := time.Now() 477 478 clk, counter := testclock.NewFakeEventClock(now, 0, nil) 479 qsf := NewQueueSetFactory(clk, counter) 480 qCfg := fq.QueuingConfig{ 481 Name: "DiffFlowsExpectEqual", 482 DesiredNumQueues: 9, 483 QueueLengthLimit: 8, 484 HandSize: 1, 485 RequestWaitLimit: 10 * time.Minute, 486 } 487 qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) 488 if err != nil { 489 t.Fatal(err) 490 } 491 qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4}) 492 493 uniformScenario{name: qCfg.Name, 494 qs: qs, 495 clients: []uniformClient{ 496 {1001001001, 8, 20, time.Second, time.Second, false}, 497 {2002002002, 7, 30, time.Second, time.Second / 2, false}, 498 }, 499 concurrencyLimit: 4, 500 evalDuration: time.Second * 40, 501 expectedFair: []bool{true}, 502 expectedFairnessMargin: []float64{0.1}, 503 expectAllRequests: true, 504 evalInqueueMetrics: true, 505 evalExecutingMetrics: true, 506 clk: clk, 507 counter: counter, 508 }.exercise(t) 509} 510 511func TestDifferentFlowsExpectUnequal(t *testing.T) { 512 metrics.Register() 513 now := time.Now() 514 515 clk, counter := testclock.NewFakeEventClock(now, 0, nil) 516 qsf := NewQueueSetFactory(clk, counter) 517 qCfg := fq.QueuingConfig{ 518 Name: "DiffFlowsExpectUnequal", 519 DesiredNumQueues: 9, 520 QueueLengthLimit: 6, 521 HandSize: 1, 522 RequestWaitLimit: 10 * time.Minute, 523 } 524 qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) 525 if err != nil { 526 t.Fatal(err) 527 } 528 qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 3}) 529 530 uniformScenario{name: qCfg.Name, 531 qs: qs, 532 clients: []uniformClient{ 533 {1001001001, 4, 20, time.Second, time.Second - 1, false}, 534 {2002002002, 2, 20, time.Second, time.Second - 1, false}, 535 }, 536 concurrencyLimit: 3, 537 evalDuration: time.Second * 20, 538 expectedFair: []bool{true}, 539 expectedFairnessMargin: []float64{0.1}, 540 expectAllRequests: true, 541 evalInqueueMetrics: true, 542 evalExecutingMetrics: true, 543 clk: clk, 544 counter: counter, 545 }.exercise(t) 546} 547 548func TestWindup(t *testing.T) { 549 metrics.Register() 550 now := time.Now() 551 552 clk, counter := testclock.NewFakeEventClock(now, 0, nil) 553 qsf := NewQueueSetFactory(clk, counter) 554 qCfg := fq.QueuingConfig{ 555 Name: "TestWindup", 556 DesiredNumQueues: 9, 557 QueueLengthLimit: 6, 558 HandSize: 1, 559 RequestWaitLimit: 10 * time.Minute, 560 } 561 qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) 562 if err != nil { 563 t.Fatal(err) 564 } 565 qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 3}) 566 567 uniformScenario{name: qCfg.Name, qs: qs, 568 clients: []uniformClient{ 569 {1001001001, 2, 40, time.Second, -1, false}, 570 {2002002002, 2, 40, time.Second, -1, true}, 571 }, 572 concurrencyLimit: 3, 573 evalDuration: time.Second * 40, 574 expectedFair: []bool{true, true}, 575 expectedFairnessMargin: []float64{0.1, 0.26}, 576 expectAllRequests: true, 577 evalInqueueMetrics: true, 578 evalExecutingMetrics: true, 579 clk: clk, 580 counter: counter, 581 }.exercise(t) 582} 583 584func TestDifferentFlowsWithoutQueuing(t *testing.T) { 585 metrics.Register() 586 now := time.Now() 587 588 clk, counter := testclock.NewFakeEventClock(now, 0, nil) 589 qsf := NewQueueSetFactory(clk, counter) 590 qCfg := fq.QueuingConfig{ 591 Name: "TestDifferentFlowsWithoutQueuing", 592 DesiredNumQueues: 0, 593 } 594 qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) 595 if err != nil { 596 t.Fatal(err) 597 } 598 qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4}) 599 600 uniformScenario{name: qCfg.Name, 601 qs: qs, 602 clients: []uniformClient{ 603 {1001001001, 6, 10, time.Second, 57 * time.Millisecond, false}, 604 {2002002002, 4, 15, time.Second, 750 * time.Millisecond, false}, 605 }, 606 concurrencyLimit: 4, 607 evalDuration: time.Second * 13, 608 expectedFair: []bool{false}, 609 expectedFairnessMargin: []float64{0.1}, 610 evalExecutingMetrics: true, 611 rejectReason: "concurrency-limit", 612 clk: clk, 613 counter: counter, 614 }.exercise(t) 615} 616 617func TestTimeout(t *testing.T) { 618 metrics.Register() 619 now := time.Now() 620 621 clk, counter := testclock.NewFakeEventClock(now, 0, nil) 622 qsf := NewQueueSetFactory(clk, counter) 623 qCfg := fq.QueuingConfig{ 624 Name: "TestTimeout", 625 DesiredNumQueues: 128, 626 QueueLengthLimit: 128, 627 HandSize: 1, 628 RequestWaitLimit: 0, 629 } 630 qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) 631 if err != nil { 632 t.Fatal(err) 633 } 634 qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1}) 635 636 uniformScenario{name: qCfg.Name, 637 qs: qs, 638 clients: []uniformClient{ 639 {1001001001, 5, 100, time.Second, time.Second, false}, 640 }, 641 concurrencyLimit: 1, 642 evalDuration: time.Second * 10, 643 expectedFair: []bool{true}, 644 expectedFairnessMargin: []float64{0.1}, 645 evalInqueueMetrics: true, 646 evalExecutingMetrics: true, 647 rejectReason: "time-out", 648 clk: clk, 649 counter: counter, 650 }.exercise(t) 651} 652 653func TestContextCancel(t *testing.T) { 654 metrics.Register() 655 metrics.Reset() 656 now := time.Now() 657 clk, counter := testclock.NewFakeEventClock(now, 0, nil) 658 qsf := NewQueueSetFactory(clk, counter) 659 qCfg := fq.QueuingConfig{ 660 Name: "TestContextCancel", 661 DesiredNumQueues: 11, 662 QueueLengthLimit: 11, 663 HandSize: 1, 664 RequestWaitLimit: 15 * time.Second, 665 } 666 qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) 667 if err != nil { 668 t.Fatal(err) 669 } 670 qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1}) 671 counter.Add(1) // account for the goroutine running this test 672 ctx1 := context.Background() 673 b2i := map[bool]int{false: 0, true: 1} 674 var qnc [2][2]int32 675 req1, _ := qs.StartRequest(ctx1, &fcrequest.Width{Seats: 1}, 1, "", "fs1", "test", "one", func(inQueue bool) { atomic.AddInt32(&qnc[0][b2i[inQueue]], 1) }) 676 if req1 == nil { 677 t.Error("Request rejected") 678 return 679 } 680 if a := atomic.AddInt32(&qnc[0][0], 0); a != 1 { 681 t.Errorf("Got %d calls to queueNoteFn1(false), expected 1", a) 682 } 683 if a := atomic.AddInt32(&qnc[0][1], 0); a != 1 { 684 t.Errorf("Got %d calls to queueNoteFn1(true), expected 1", a) 685 } 686 var executed1 bool 687 idle1 := req1.Finish(func() { 688 executed1 = true 689 ctx2, cancel2 := context.WithCancel(context.Background()) 690 tBefore := time.Now() 691 go func() { 692 time.Sleep(time.Second) 693 if a := atomic.AddInt32(&qnc[1][0], 0); a != 0 { 694 t.Errorf("Got %d calls to queueNoteFn2(false), expected 0", a) 695 } 696 if a := atomic.AddInt32(&qnc[1][1], 0); a != 1 { 697 t.Errorf("Got %d calls to queueNoteFn2(true), expected 1", a) 698 } 699 // account for unblocking the goroutine that waits on cancelation 700 counter.Add(1) 701 cancel2() 702 }() 703 req2, idle2a := qs.StartRequest(ctx2, &fcrequest.Width{Seats: 1}, 2, "", "fs2", "test", "two", func(inQueue bool) { atomic.AddInt32(&qnc[1][b2i[inQueue]], 1) }) 704 if idle2a { 705 t.Error("2nd StartRequest returned idle") 706 } 707 if req2 != nil { 708 idle2b := req2.Finish(func() { 709 t.Error("Executing req2") 710 }) 711 if idle2b { 712 t.Error("2nd Finish returned idle") 713 } 714 if a := atomic.AddInt32(&qnc[1][0], 0); a != 1 { 715 t.Errorf("Got %d calls to queueNoteFn2(false), expected 1", a) 716 } 717 } 718 tAfter := time.Now() 719 dt := tAfter.Sub(tBefore) 720 if dt < time.Second || dt > 2*time.Second { 721 t.Errorf("Unexpected: dt=%d", dt) 722 } 723 }) 724 if !executed1 { 725 t.Errorf("Unexpected: executed1=%v", executed1) 726 } 727 if !idle1 { 728 t.Error("Not idle at the end") 729 } 730} 731 732func TestTotalRequestsExecutingWithPanic(t *testing.T) { 733 metrics.Register() 734 metrics.Reset() 735 now := time.Now() 736 clk, counter := testclock.NewFakeEventClock(now, 0, nil) 737 qsf := NewQueueSetFactory(clk, counter) 738 qCfg := fq.QueuingConfig{ 739 Name: "TestTotalRequestsExecutingWithPanic", 740 DesiredNumQueues: 0, 741 RequestWaitLimit: 15 * time.Second, 742 } 743 qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk)) 744 if err != nil { 745 t.Fatal(err) 746 } 747 qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1}) 748 counter.Add(1) // account for the goroutine running this test 749 750 queue, ok := qs.(*queueSet) 751 if !ok { 752 t.Fatalf("expected a QueueSet of type: %T but got: %T", &queueSet{}, qs) 753 } 754 if queue.totRequestsExecuting != 0 { 755 t.Fatalf("precondition: expected total requests currently executing of the QueueSet to be 0, but got: %d", queue.totRequestsExecuting) 756 } 757 if queue.dCfg.ConcurrencyLimit != 1 { 758 t.Fatalf("precondition: expected concurrency limit of the QueueSet to be 1, but got: %d", queue.dCfg.ConcurrencyLimit) 759 } 760 761 ctx := context.Background() 762 req, _ := qs.StartRequest(ctx, &fcrequest.Width{Seats: 1}, 1, "", "fs", "test", "one", func(inQueue bool) {}) 763 if req == nil { 764 t.Fatal("expected a Request object from StartRequest, but got nil") 765 } 766 767 panicErrExpected := errors.New("apiserver panic'd") 768 var panicErrGot interface{} 769 func() { 770 defer func() { 771 panicErrGot = recover() 772 }() 773 774 req.Finish(func() { 775 // verify that total requests executing goes up by 1 since the request is executing. 776 if queue.totRequestsExecuting != 1 { 777 t.Fatalf("expected total requests currently executing of the QueueSet to be 1, but got: %d", queue.totRequestsExecuting) 778 } 779 780 panic(panicErrExpected) 781 }) 782 }() 783 784 // verify that the panic was from us (above) 785 if panicErrExpected != panicErrGot { 786 t.Errorf("expected panic error: %#v, but got: %#v", panicErrExpected, panicErrGot) 787 } 788 if queue.totRequestsExecuting != 0 { 789 t.Errorf("expected total requests currently executing of the QueueSet to be 0, but got: %d", queue.totRequestsExecuting) 790 } 791} 792 793func TestSelectQueueLocked(t *testing.T) { 794 var G float64 = 60 795 tests := []struct { 796 name string 797 robinIndex int 798 concurrencyLimit int 799 totSeatsInUse int 800 queues []*queue 801 attempts int 802 beforeSelectQueueLocked func(attempt int, qs *queueSet) 803 minQueueIndexExpected []int 804 robinIndexExpected []int 805 }{ 806 { 807 name: "width=1, seats are available, the queue with less virtual start time wins", 808 concurrencyLimit: 1, 809 totSeatsInUse: 0, 810 robinIndex: -1, 811 queues: []*queue{ 812 { 813 virtualStart: 200, 814 requests: newFIFO( 815 &request{width: fcrequest.Width{Seats: 1}}, 816 ), 817 }, 818 { 819 virtualStart: 100, 820 requests: newFIFO( 821 &request{width: fcrequest.Width{Seats: 1}}, 822 ), 823 }, 824 }, 825 attempts: 1, 826 minQueueIndexExpected: []int{1}, 827 robinIndexExpected: []int{1}, 828 }, 829 { 830 name: "width=1, all seats are occupied, no queue is picked", 831 concurrencyLimit: 1, 832 totSeatsInUse: 1, 833 robinIndex: -1, 834 queues: []*queue{ 835 { 836 virtualStart: 200, 837 requests: newFIFO( 838 &request{width: fcrequest.Width{Seats: 1}}, 839 ), 840 }, 841 }, 842 attempts: 1, 843 minQueueIndexExpected: []int{-1}, 844 robinIndexExpected: []int{0}, 845 }, 846 { 847 name: "width > 1, seats are available for request with the least finish time, queue is picked", 848 concurrencyLimit: 50, 849 totSeatsInUse: 25, 850 robinIndex: -1, 851 queues: []*queue{ 852 { 853 virtualStart: 200, 854 requests: newFIFO( 855 &request{width: fcrequest.Width{Seats: 50}}, 856 ), 857 }, 858 { 859 virtualStart: 100, 860 requests: newFIFO( 861 &request{width: fcrequest.Width{Seats: 25}}, 862 ), 863 }, 864 }, 865 attempts: 1, 866 minQueueIndexExpected: []int{1}, 867 robinIndexExpected: []int{1}, 868 }, 869 { 870 name: "width > 1, seats are not available for request with the least finish time, queue is not picked", 871 concurrencyLimit: 50, 872 totSeatsInUse: 26, 873 robinIndex: -1, 874 queues: []*queue{ 875 { 876 virtualStart: 200, 877 requests: newFIFO( 878 &request{width: fcrequest.Width{Seats: 10}}, 879 ), 880 }, 881 { 882 virtualStart: 100, 883 requests: newFIFO( 884 &request{width: fcrequest.Width{Seats: 25}}, 885 ), 886 }, 887 }, 888 attempts: 3, 889 minQueueIndexExpected: []int{-1, -1, -1}, 890 robinIndexExpected: []int{1, 1, 1}, 891 }, 892 { 893 name: "width > 1, seats become available before 3rd attempt, queue is picked", 894 concurrencyLimit: 50, 895 totSeatsInUse: 26, 896 robinIndex: -1, 897 queues: []*queue{ 898 { 899 virtualStart: 200, 900 requests: newFIFO( 901 &request{width: fcrequest.Width{Seats: 10}}, 902 ), 903 }, 904 { 905 virtualStart: 100, 906 requests: newFIFO( 907 &request{width: fcrequest.Width{Seats: 25}}, 908 ), 909 }, 910 }, 911 beforeSelectQueueLocked: func(attempt int, qs *queueSet) { 912 if attempt == 3 { 913 qs.totSeatsInUse = 25 914 } 915 }, 916 attempts: 3, 917 minQueueIndexExpected: []int{-1, -1, 1}, 918 robinIndexExpected: []int{1, 1, 1}, 919 }, 920 } 921 922 for _, test := range tests { 923 t.Run(test.name, func(t *testing.T) { 924 qs := &queueSet{ 925 estimatedServiceTime: G, 926 robinIndex: test.robinIndex, 927 totSeatsInUse: test.totSeatsInUse, 928 dCfg: fq.DispatchingConfig{ 929 ConcurrencyLimit: test.concurrencyLimit, 930 }, 931 queues: test.queues, 932 } 933 934 t.Logf("QS: robin index=%d, seats in use=%d limit=%d", qs.robinIndex, qs.totSeatsInUse, qs.dCfg.ConcurrencyLimit) 935 936 for i := 0; i < test.attempts; i++ { 937 attempt := i + 1 938 if test.beforeSelectQueueLocked != nil { 939 test.beforeSelectQueueLocked(attempt, qs) 940 } 941 942 var minQueueExpected *queue 943 if queueIdx := test.minQueueIndexExpected[i]; queueIdx >= 0 { 944 minQueueExpected = test.queues[queueIdx] 945 } 946 947 minQueueGot := qs.selectQueueLocked() 948 if minQueueExpected != minQueueGot { 949 t.Errorf("Expected queue: %#v, but got: %#v", minQueueExpected, minQueueGot) 950 } 951 952 robinIndexExpected := test.robinIndexExpected[i] 953 if robinIndexExpected != qs.robinIndex { 954 t.Errorf("Expected robin index: %d for attempt: %d, but got: %d", robinIndexExpected, attempt, qs.robinIndex) 955 } 956 } 957 }) 958 } 959} 960 961func newFIFO(requests ...*request) fifo { 962 l := newRequestFIFO() 963 for i := range requests { 964 l.Enqueue(requests[i]) 965 } 966 return l 967} 968 969func newObserverPair(clk clock.PassiveClock) metrics.TimedObserverPair { 970 return metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"test"}) 971} 972