/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package queueset

import (
	"context"
	"errors"
	"fmt"
	"math"
	"reflect"
	"sort"
	"sync/atomic"
	"testing"
	"time"

	"k8s.io/apimachinery/pkg/util/clock"
	"k8s.io/apiserver/pkg/util/flowcontrol/counter"
	fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
	test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
	testclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock"
	"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
	fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
	"k8s.io/klog/v2"
)

// fairAlloc computes the max-min fair allocation of the given
// capacity to the given demands (the demands slice is not modified).
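// For example, fairAlloc([]float64{3, 7, 9, 1}, 14) yields [3, 5, 5, 1]:
// the demands of 3 and 1 are fully satisfied, and the remaining capacity
// of 10 is split evenly between the two larger demands.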
func fairAlloc(demands []float64, capacity float64) []float64 {
	count := len(demands)
	indices := make([]int, count)
	for i := 0; i < count; i++ {
		indices[i] = i
	}
	sort.Slice(indices, func(i, j int) bool { return demands[indices[i]] < demands[indices[j]] })
	alloc := make([]float64, count)
	var next int
	var prevAlloc float64
	for ; next < count; next++ {
		// `capacity` is how much remains assuming that
		// all unvisited items get `prevAlloc`.
		idx := indices[next]
		demand := demands[idx]
		if demand <= 0 {
			continue
		}
		// `fullCapacityBite` is how much more capacity would be used
		// if this and all following items get as much as this one
		// is demanding.
		fullCapacityBite := float64(count-next) * (demand - prevAlloc)
		if fullCapacityBite > capacity {
			break
		}
		prevAlloc = demand
		alloc[idx] = demand
		capacity -= fullCapacityBite
	}
	for j := next; j < count; j++ {
		alloc[indices[j]] = prevAlloc + capacity/float64(count-next)
	}
	return alloc
}

func TestFairAlloc(t *testing.T) {
	if e, a := []float64{0, 0}, fairAlloc([]float64{0, 0}, 42); !reflect.DeepEqual(e, a) {
		t.Errorf("Expected %#+v, got %#+v", e, a)
	}
	if e, a := []float64{42, 0}, fairAlloc([]float64{47, 0}, 42); !reflect.DeepEqual(e, a) {
		t.Errorf("Expected %#+v, got %#+v", e, a)
	}
	if e, a := []float64{1, 41}, fairAlloc([]float64{1, 47}, 42); !reflect.DeepEqual(e, a) {
		t.Errorf("Expected %#+v, got %#+v", e, a)
	}
	if e, a := []float64{3, 5, 5, 1}, fairAlloc([]float64{3, 7, 9, 1}, 14); !reflect.DeepEqual(e, a) {
		t.Errorf("Expected %#+v, got %#+v", e, a)
	}
	if e, a := []float64{1, 9, 7, 3}, fairAlloc([]float64{1, 9, 7, 3}, 21); !reflect.DeepEqual(e, a) {
		t.Errorf("Expected %#+v, got %#+v", e, a)
	}
}

type uniformClient struct {
	hash     uint64
	nThreads int
	nCalls   int
	// duration for a simulated synchronous call
	execDuration time.Duration
	// duration for simulated "other work".  This can be negative,
	// causing a request to be launched a certain amount of time
	// before the previous one finishes.
	thinkDuration time.Duration
	// When true, only half the specified number of threads
	// run during the first half of the evaluation period.
	split bool
}

// uniformScenario describes a scenario based on the given set of uniform clients.
// Each uniform client specifies a number of threads, each of which alternates between thinking
// and making a synchronous request through the QueueSet.
// The test measures how much concurrency each client got, on average, over
// the initial evalDuration and checks whether they all got about the fair amount.
// Each client needs to be demanding enough to use more than its fair share,
// or else care needs to be taken with the overall timing so that scheduling
// details do not cause any client to actually request a significantly smaller
// share than it theoretically should.
// expectedFair indicates whether the QueueSet is expected to be
// fair in the respective halves of a split scenario;
// in a non-split scenario this is a singleton with one expectation.
// expectAllRequests indicates whether all requests are expected to get dispatched.
type uniformScenario struct {
	name                                     string
	qs                                       fq.QueueSet
	clients                                  []uniformClient
	concurrencyLimit                         int
	evalDuration                             time.Duration
	expectedFair                             []bool
	expectedFairnessMargin                   []float64
	expectAllRequests                        bool
	evalInqueueMetrics, evalExecutingMetrics bool
	rejectReason                             string
	clk                                      *testclock.FakeEventClock
	counter                                  counter.GoRoutineCounter
}

func (us uniformScenario) exercise(t *testing.T) {
	uss := uniformScenarioState{
		t:               t,
		uniformScenario: us,
		startTime:       time.Now(),
		integrators:     make([]fq.Integrator, len(us.clients)),
		executions:      make([]int32, len(us.clients)),
		rejects:         make([]int32, len(us.clients)),
	}
	for _, uc := range us.clients {
		uss.doSplit = uss.doSplit || uc.split
	}
	uss.exercise()
}

type uniformScenarioState struct {
	t *testing.T
	uniformScenario
	startTime                                                    time.Time
	doSplit                                                      bool
	integrators                                                  []fq.Integrator
	failedCount                                                  uint64
	expectedInqueue, expectedExecuting, expectedConcurrencyInUse string
	executions, rejects                                          []int32
}

func (uss *uniformScenarioState) exercise() {
	uss.t.Logf("%s: Start %s, doSplit=%v, clk=%p, grc=%p", uss.startTime.Format(nsTimeFmt), uss.name, uss.doSplit, uss.clk, uss.counter)
	if uss.evalInqueueMetrics || uss.evalExecutingMetrics {
		metrics.Reset()
	}
	for i, uc := range uss.clients {
		uss.integrators[i] = fq.NewIntegrator(uss.clk)
		fsName := fmt.Sprintf("client%d", i)
		uss.expectedInqueue = uss.expectedInqueue + fmt.Sprintf(`				apiserver_flowcontrol_current_inqueue_requests{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n")
		for j := 0; j < uc.nThreads; j++ {
			ust := uniformScenarioThread{
				uss:    uss,
				i:      i,
				j:      j,
				nCalls: uc.nCalls,
				uc:     uc,
				igr:    uss.integrators[i],
				fsName: fsName,
			}
			ust.start()
		}
	}
	if uss.doSplit {
		uss.evalTo(uss.startTime.Add(uss.evalDuration/2), false, uss.expectedFair[0], uss.expectedFairnessMargin[0])
	}
	uss.evalTo(uss.startTime.Add(uss.evalDuration), true, uss.expectedFair[len(uss.expectedFair)-1], uss.expectedFairnessMargin[len(uss.expectedFairnessMargin)-1])
	uss.clk.Run(nil)
	uss.finalReview()
}

type uniformScenarioThread struct {
	uss    *uniformScenarioState
	i, j   int
	nCalls int
	uc     uniformClient
	igr    fq.Integrator
	fsName string
}

func (ust *uniformScenarioThread) start() {
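	// Stagger the threads' first calls by a few nanoseconds of fake time
	// so that they do not all fire at exactly the same instant.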
	initialDelay := time.Duration(11*ust.j + 2*ust.i)
	if ust.uc.split && ust.j >= ust.uc.nThreads/2 {
		initialDelay += ust.uss.evalDuration / 2
		ust.nCalls = ust.nCalls / 2
	}
	ust.uss.clk.EventAfterDuration(ust.genCallK(0), initialDelay)
}

// genCallK generates an EventFunc that forks a goroutine to do call k.
func (ust *uniformScenarioThread) genCallK(k int) func(time.Time) {
	return func(time.Time) {
		// As an EventFunc, this has to return without waiting
		// for time to pass, and so cannot do callK(k) itself.
		ust.uss.counter.Add(1)
		go func() {
			ust.callK(k)
			ust.uss.counter.Add(-1)
		}()
	}
}

func (ust *uniformScenarioThread) callK(k int) {
	if k >= ust.nCalls {
		return
	}
	req, idle := ust.uss.qs.StartRequest(context.Background(), &fcrequest.Width{Seats: 1}, ust.uc.hash, "", ust.fsName, ust.uss.name, []int{ust.i, ust.j, k}, nil)
	ust.uss.t.Logf("%s: %d, %d, %d got req=%p, idle=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, req, idle)
	if req == nil {
		atomic.AddUint64(&ust.uss.failedCount, 1)
		atomic.AddInt32(&ust.uss.rejects[ust.i], 1)
		return
	}
	if idle {
		ust.uss.t.Error("got request but QueueSet reported idle")
	}
	var executed bool
	idle2 := req.Finish(func() {
		executed = true
		execStart := ust.uss.clk.Now()
		ust.uss.t.Logf("%s: %d, %d, %d executing", execStart.Format(nsTimeFmt), ust.i, ust.j, k)
		atomic.AddInt32(&ust.uss.executions[ust.i], 1)
		ust.igr.Add(1)
		ust.uss.clk.EventAfterDuration(ust.genCallK(k+1), ust.uc.execDuration+ust.uc.thinkDuration)
		ClockWait(ust.uss.clk, ust.uss.counter, ust.uc.execDuration)
		ust.igr.Add(-1)
	})
	ust.uss.t.Logf("%s: %d, %d, %d got executed=%v, idle2=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, executed, idle2)
	if !executed {
		atomic.AddUint64(&ust.uss.failedCount, 1)
		atomic.AddInt32(&ust.uss.rejects[ust.i], 1)
	}
}

func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool, margin float64) {
	uss.clk.Run(&lim)
	uss.clk.SetTime(lim)
	if uss.doSplit && !last {
		uss.t.Logf("%s: End of first half", uss.clk.Now().Format(nsTimeFmt))
	} else {
		uss.t.Logf("%s: End", uss.clk.Now().Format(nsTimeFmt))
	}
	demands := make([]float64, len(uss.clients))
	averages := make([]float64, len(uss.clients))
	for i, uc := range uss.clients {
		nThreads := uc.nThreads
		if uc.split && !last {
			nThreads = nThreads / 2
		}
		demands[i] = float64(nThreads) * float64(uc.execDuration) / float64(uc.thinkDuration+uc.execDuration)
		averages[i] = uss.integrators[i].Reset().Average
	}
	fairAverages := fairAlloc(demands, float64(uss.concurrencyLimit))
	for i := range uss.clients {
		var gotFair bool
		if fairAverages[i] > 0 {
			relDiff := (averages[i] - fairAverages[i]) / fairAverages[i]
			gotFair = math.Abs(relDiff) <= margin
		} else {
			gotFair = math.Abs(averages[i]) <= margin
		}

		if gotFair != expectFair {
			uss.t.Errorf("%s client %d last=%v got an Average of %v but the fair average was %v", uss.name, i, last, averages[i], fairAverages[i])
		} else {
			uss.t.Logf("%s client %d last=%v got an Average of %v and the fair average was %v", uss.name, i, last, averages[i], fairAverages[i])
		}
	}
}

func (uss *uniformScenarioState) finalReview() {
	if uss.expectAllRequests && uss.failedCount > 0 {
		uss.t.Errorf("Expected all requests to be successful but got %v failed requests", uss.failedCount)
	} else if !uss.expectAllRequests && uss.failedCount == 0 {
		uss.t.Errorf("Expected failed requests but all requests succeeded")
	}
	if uss.evalInqueueMetrics {
		e := `
				# HELP apiserver_flowcontrol_current_inqueue_requests [ALPHA] Number of requests currently pending in queues of the API Priority and Fairness system
				# TYPE apiserver_flowcontrol_current_inqueue_requests gauge
` + uss.expectedInqueue
		err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_current_inqueue_requests")
		if err != nil {
			uss.t.Error(err)
		} else {
			uss.t.Log("Success with" + e)
		}
	}
	expectedRejects := ""
	for i := range uss.clients {
		fsName := fmt.Sprintf("client%d", i)
		if atomic.AddInt32(&uss.executions[i], 0) > 0 {
			uss.expectedExecuting = uss.expectedExecuting + fmt.Sprintf(`				apiserver_flowcontrol_current_executing_requests{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n")
			uss.expectedConcurrencyInUse = uss.expectedConcurrencyInUse + fmt.Sprintf(`				apiserver_flowcontrol_request_concurrency_in_use{flow_schema=%q,priority_level=%q} 0%s`, fsName, uss.name, "\n")
		}
		if atomic.AddInt32(&uss.rejects[i], 0) > 0 {
			expectedRejects = expectedRejects + fmt.Sprintf(`				apiserver_flowcontrol_rejected_requests_total{flow_schema=%q,priority_level=%q,reason=%q} %d%s`, fsName, uss.name, uss.rejectReason, uss.rejects[i], "\n")
		}
	}
	if uss.evalExecutingMetrics && len(uss.expectedExecuting) > 0 {
		e := `
				# HELP apiserver_flowcontrol_current_executing_requests [ALPHA] Number of requests currently executing in the API Priority and Fairness system
				# TYPE apiserver_flowcontrol_current_executing_requests gauge
` + uss.expectedExecuting
		err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_current_executing_requests")
		if err != nil {
			uss.t.Error(err)
		} else {
			uss.t.Log("Success with" + e)
		}
	}
	if uss.evalExecutingMetrics && len(uss.expectedConcurrencyInUse) > 0 {
		e := `
				# HELP apiserver_flowcontrol_request_concurrency_in_use [ALPHA] Concurrency (number of seats) occupided by the currently executing requests in the API Priority and Fairness system
				# TYPE apiserver_flowcontrol_request_concurrency_in_use gauge
` + uss.expectedConcurrencyInUse
		err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_request_concurrency_in_use")
		if err != nil {
			uss.t.Error(err)
		} else {
			uss.t.Log("Success with" + e)
		}
	}
	if uss.evalExecutingMetrics && len(expectedRejects) > 0 {
		e := `
				# HELP apiserver_flowcontrol_rejected_requests_total [ALPHA] Number of requests rejected by API Priority and Fairness system
				# TYPE apiserver_flowcontrol_rejected_requests_total counter
` + expectedRejects
		err := metrics.GatherAndCompare(e, "apiserver_flowcontrol_rejected_requests_total")
		if err != nil {
			uss.t.Error(err)
		} else {
			uss.t.Log("Success with" + e)
		}
	}
}

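// ClockWait uses the fake clock to block the calling goroutine for the
// given duration, decrementing the goroutine counter while it waits so
// that the fake clock knows this goroutine is idle rather than runnable.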
func ClockWait(clk *testclock.FakeEventClock, counter counter.GoRoutineCounter, duration time.Duration) {
	dunch := make(chan struct{})
	clk.EventAfterDuration(func(time.Time) {
		counter.Add(1)
		close(dunch)
	}, duration)
	counter.Add(-1)
	<-dunch
}

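// init registers klog's flags so that test runs can control log verbosity.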
func init() {
	klog.InitFlags(nil)
}

// TestNoRestraint tests whether the no-restraint factory gives every client what it asks for
func TestNoRestraint(t *testing.T) {
	metrics.Register()
	now := time.Now()
	clk, counter := testclock.NewFakeEventClock(now, 0, nil)
	nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newObserverPair(clk))
	if err != nil {
		t.Fatal(err)
	}
	nr := nrc.Complete(fq.DispatchingConfig{})
	uniformScenario{name: "NoRestraint",
		qs: nr,
		clients: []uniformClient{
			{1001001001, 5, 10, time.Second, time.Second, false},
			{2002002002, 2, 10, time.Second, time.Second / 2, false},
		},
		concurrencyLimit:       10,
		evalDuration:           time.Second * 15,
		expectedFair:           []bool{true},
		expectedFairnessMargin: []float64{0.1},
		expectAllRequests:      true,
		clk:                    clk,
		counter:                counter,
	}.exercise(t)
}

func TestUniformFlowsHandSize1(t *testing.T) {
	metrics.Register()
	now := time.Now()

	clk, counter := testclock.NewFakeEventClock(now, 0, nil)
	qsf := NewQueueSetFactory(clk, counter)
	qCfg := fq.QueuingConfig{
		Name:             "TestUniformFlowsHandSize1",
		DesiredNumQueues: 9,
		QueueLengthLimit: 8,
		HandSize:         1,
		RequestWaitLimit: 10 * time.Minute,
	}
	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
	if err != nil {
		t.Fatal(err)
	}
	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4})

	uniformScenario{name: qCfg.Name,
		qs: qs,
		clients: []uniformClient{
			{1001001001, 8, 20, time.Second, time.Second - 1, false},
			{2002002002, 8, 20, time.Second, time.Second - 1, false},
		},
		concurrencyLimit:       4,
		evalDuration:           time.Second * 50,
		expectedFair:           []bool{true},
		expectedFairnessMargin: []float64{0.1},
		expectAllRequests:      true,
		evalInqueueMetrics:     true,
		evalExecutingMetrics:   true,
		clk:                    clk,
		counter:                counter,
	}.exercise(t)
}

func TestUniformFlowsHandSize3(t *testing.T) {
	metrics.Register()
	now := time.Now()

	clk, counter := testclock.NewFakeEventClock(now, 0, nil)
	qsf := NewQueueSetFactory(clk, counter)
	qCfg := fq.QueuingConfig{
		Name:             "TestUniformFlowsHandSize3",
		DesiredNumQueues: 8,
		QueueLengthLimit: 4,
		HandSize:         3,
		RequestWaitLimit: 10 * time.Minute,
	}
	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
	if err != nil {
		t.Fatal(err)
	}
	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4})
	uniformScenario{name: qCfg.Name,
		qs: qs,
		clients: []uniformClient{
			{1001001001, 8, 30, time.Second, time.Second - 1, false},
			{2002002002, 8, 30, time.Second, time.Second - 1, false},
		},
		concurrencyLimit:       4,
		evalDuration:           time.Second * 60,
		expectedFair:           []bool{true},
		expectedFairnessMargin: []float64{0.1},
		expectAllRequests:      true,
		evalInqueueMetrics:     true,
		evalExecutingMetrics:   true,
		clk:                    clk,
		counter:                counter,
	}.exercise(t)
}

func TestDifferentFlowsExpectEqual(t *testing.T) {
	metrics.Register()
	now := time.Now()

	clk, counter := testclock.NewFakeEventClock(now, 0, nil)
	qsf := NewQueueSetFactory(clk, counter)
	qCfg := fq.QueuingConfig{
		Name:             "DiffFlowsExpectEqual",
		DesiredNumQueues: 9,
		QueueLengthLimit: 8,
		HandSize:         1,
		RequestWaitLimit: 10 * time.Minute,
	}
	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
	if err != nil {
		t.Fatal(err)
	}
	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4})

	uniformScenario{name: qCfg.Name,
		qs: qs,
		clients: []uniformClient{
			{1001001001, 8, 20, time.Second, time.Second, false},
			{2002002002, 7, 30, time.Second, time.Second / 2, false},
		},
		concurrencyLimit:       4,
		evalDuration:           time.Second * 40,
		expectedFair:           []bool{true},
		expectedFairnessMargin: []float64{0.1},
		expectAllRequests:      true,
		evalInqueueMetrics:     true,
		evalExecutingMetrics:   true,
		clk:                    clk,
		counter:                counter,
	}.exercise(t)
}

func TestDifferentFlowsExpectUnequal(t *testing.T) {
	metrics.Register()
	now := time.Now()

	clk, counter := testclock.NewFakeEventClock(now, 0, nil)
	qsf := NewQueueSetFactory(clk, counter)
	qCfg := fq.QueuingConfig{
		Name:             "DiffFlowsExpectUnequal",
		DesiredNumQueues: 9,
		QueueLengthLimit: 6,
		HandSize:         1,
		RequestWaitLimit: 10 * time.Minute,
	}
	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
	if err != nil {
		t.Fatal(err)
	}
	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 3})

	uniformScenario{name: qCfg.Name,
		qs: qs,
		clients: []uniformClient{
			{1001001001, 4, 20, time.Second, time.Second - 1, false},
			{2002002002, 2, 20, time.Second, time.Second - 1, false},
		},
		concurrencyLimit:       3,
		evalDuration:           time.Second * 20,
		expectedFair:           []bool{true},
		expectedFairnessMargin: []float64{0.1},
		expectAllRequests:      true,
		evalInqueueMetrics:     true,
		evalExecutingMetrics:   true,
		clk:                    clk,
		counter:                counter,
	}.exercise(t)
}

func TestWindup(t *testing.T) {
	metrics.Register()
	now := time.Now()

	clk, counter := testclock.NewFakeEventClock(now, 0, nil)
	qsf := NewQueueSetFactory(clk, counter)
	qCfg := fq.QueuingConfig{
		Name:             "TestWindup",
		DesiredNumQueues: 9,
		QueueLengthLimit: 6,
		HandSize:         1,
		RequestWaitLimit: 10 * time.Minute,
	}
	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
	if err != nil {
		t.Fatal(err)
	}
	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 3})

	uniformScenario{name: qCfg.Name, qs: qs,
		clients: []uniformClient{
			{1001001001, 2, 40, time.Second, -1, false},
			{2002002002, 2, 40, time.Second, -1, true},
		},
		concurrencyLimit:       3,
		evalDuration:           time.Second * 40,
		expectedFair:           []bool{true, true},
		expectedFairnessMargin: []float64{0.1, 0.26},
		expectAllRequests:      true,
		evalInqueueMetrics:     true,
		evalExecutingMetrics:   true,
		clk:                    clk,
		counter:                counter,
	}.exercise(t)
}

func TestDifferentFlowsWithoutQueuing(t *testing.T) {
	metrics.Register()
	now := time.Now()

	clk, counter := testclock.NewFakeEventClock(now, 0, nil)
	qsf := NewQueueSetFactory(clk, counter)
	qCfg := fq.QueuingConfig{
		Name:             "TestDifferentFlowsWithoutQueuing",
		DesiredNumQueues: 0,
	}
	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
	if err != nil {
		t.Fatal(err)
	}
	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 4})

	uniformScenario{name: qCfg.Name,
		qs: qs,
		clients: []uniformClient{
			{1001001001, 6, 10, time.Second, 57 * time.Millisecond, false},
			{2002002002, 4, 15, time.Second, 750 * time.Millisecond, false},
		},
		concurrencyLimit:       4,
		evalDuration:           time.Second * 13,
		expectedFair:           []bool{false},
		expectedFairnessMargin: []float64{0.1},
		evalExecutingMetrics:   true,
		rejectReason:           "concurrency-limit",
		clk:                    clk,
		counter:                counter,
	}.exercise(t)
}

func TestTimeout(t *testing.T) {
	metrics.Register()
	now := time.Now()

	clk, counter := testclock.NewFakeEventClock(now, 0, nil)
	qsf := NewQueueSetFactory(clk, counter)
	qCfg := fq.QueuingConfig{
		Name:             "TestTimeout",
		DesiredNumQueues: 128,
		QueueLengthLimit: 128,
		HandSize:         1,
		RequestWaitLimit: 0,
	}
	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
	if err != nil {
		t.Fatal(err)
	}
	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})

	uniformScenario{name: qCfg.Name,
		qs: qs,
		clients: []uniformClient{
			{1001001001, 5, 100, time.Second, time.Second, false},
		},
		concurrencyLimit:       1,
		evalDuration:           time.Second * 10,
		expectedFair:           []bool{true},
		expectedFairnessMargin: []float64{0.1},
		evalInqueueMetrics:     true,
		evalExecutingMetrics:   true,
		rejectReason:           "time-out",
		clk:                    clk,
		counter:                counter,
	}.exercise(t)
}

func TestContextCancel(t *testing.T) {
	metrics.Register()
	metrics.Reset()
	now := time.Now()
	clk, counter := testclock.NewFakeEventClock(now, 0, nil)
	qsf := NewQueueSetFactory(clk, counter)
	qCfg := fq.QueuingConfig{
		Name:             "TestContextCancel",
		DesiredNumQueues: 11,
		QueueLengthLimit: 11,
		HandSize:         1,
		RequestWaitLimit: 15 * time.Second,
	}
	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
	if err != nil {
		t.Fatal(err)
	}
	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})
	counter.Add(1) // account for the goroutine running this test
	ctx1 := context.Background()
	b2i := map[bool]int{false: 0, true: 1}
	var qnc [2][2]int32
	req1, _ := qs.StartRequest(ctx1, &fcrequest.Width{Seats: 1}, 1, "", "fs1", "test", "one", func(inQueue bool) { atomic.AddInt32(&qnc[0][b2i[inQueue]], 1) })
	if req1 == nil {
		t.Error("Request rejected")
		return
	}
	if a := atomic.AddInt32(&qnc[0][0], 0); a != 1 {
		t.Errorf("Got %d calls to queueNoteFn1(false), expected 1", a)
	}
	if a := atomic.AddInt32(&qnc[0][1], 0); a != 1 {
		t.Errorf("Got %d calls to queueNoteFn1(true), expected 1", a)
	}
	var executed1 bool
	idle1 := req1.Finish(func() {
		executed1 = true
		ctx2, cancel2 := context.WithCancel(context.Background())
		tBefore := time.Now()
		go func() {
			time.Sleep(time.Second)
			if a := atomic.AddInt32(&qnc[1][0], 0); a != 0 {
				t.Errorf("Got %d calls to queueNoteFn2(false), expected 0", a)
			}
			if a := atomic.AddInt32(&qnc[1][1], 0); a != 1 {
				t.Errorf("Got %d calls to queueNoteFn2(true), expected 1", a)
			}
			// account for unblocking the goroutine that waits on cancellation
			counter.Add(1)
			cancel2()
		}()
		req2, idle2a := qs.StartRequest(ctx2, &fcrequest.Width{Seats: 1}, 2, "", "fs2", "test", "two", func(inQueue bool) { atomic.AddInt32(&qnc[1][b2i[inQueue]], 1) })
		if idle2a {
			t.Error("2nd StartRequest returned idle")
		}
		if req2 != nil {
			idle2b := req2.Finish(func() {
				t.Error("Executing req2")
			})
			if idle2b {
				t.Error("2nd Finish returned idle")
			}
			if a := atomic.AddInt32(&qnc[1][0], 0); a != 1 {
				t.Errorf("Got %d calls to queueNoteFn2(false), expected 1", a)
			}
		}
		tAfter := time.Now()
		dt := tAfter.Sub(tBefore)
		if dt < time.Second || dt > 2*time.Second {
			t.Errorf("Unexpected: dt=%s", dt)
		}
	})
	if !executed1 {
		t.Errorf("Unexpected: executed1=%v", executed1)
	}
	if !idle1 {
		t.Error("Not idle at the end")
	}
}

func TestTotalRequestsExecutingWithPanic(t *testing.T) {
	metrics.Register()
	metrics.Reset()
	now := time.Now()
	clk, counter := testclock.NewFakeEventClock(now, 0, nil)
	qsf := NewQueueSetFactory(clk, counter)
	qCfg := fq.QueuingConfig{
		Name:             "TestTotalRequestsExecutingWithPanic",
		DesiredNumQueues: 0,
		RequestWaitLimit: 15 * time.Second,
	}
	qsc, err := qsf.BeginConstruction(qCfg, newObserverPair(clk))
	if err != nil {
		t.Fatal(err)
	}
	qs := qsc.Complete(fq.DispatchingConfig{ConcurrencyLimit: 1})
	counter.Add(1) // account for the goroutine running this test

	queue, ok := qs.(*queueSet)
	if !ok {
		t.Fatalf("expected a QueueSet of type: %T but got: %T", &queueSet{}, qs)
	}
	if queue.totRequestsExecuting != 0 {
		t.Fatalf("precondition: expected total requests currently executing of the QueueSet to be 0, but got: %d", queue.totRequestsExecuting)
	}
	if queue.dCfg.ConcurrencyLimit != 1 {
		t.Fatalf("precondition: expected concurrency limit of the QueueSet to be 1, but got: %d", queue.dCfg.ConcurrencyLimit)
	}

	ctx := context.Background()
	req, _ := qs.StartRequest(ctx, &fcrequest.Width{Seats: 1}, 1, "", "fs", "test", "one", func(inQueue bool) {})
	if req == nil {
		t.Fatal("expected a Request object from StartRequest, but got nil")
	}

	panicErrExpected := errors.New("apiserver panic'd")
	var panicErrGot interface{}
	func() {
		defer func() {
			panicErrGot = recover()
		}()

		req.Finish(func() {
			// verify that total requests executing goes up by 1 since the request is executing.
			if queue.totRequestsExecuting != 1 {
				t.Fatalf("expected total requests currently executing of the QueueSet to be 1, but got: %d", queue.totRequestsExecuting)
			}

			panic(panicErrExpected)
		})
	}()

	// verify that the panic was from us (above)
	if panicErrExpected != panicErrGot {
		t.Errorf("expected panic error: %#v, but got: %#v", panicErrExpected, panicErrGot)
	}
	if queue.totRequestsExecuting != 0 {
		t.Errorf("expected total requests currently executing of the QueueSet to be 0, but got: %d", queue.totRequestsExecuting)
	}
}

func TestSelectQueueLocked(t *testing.T) {
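	// G is the estimated service time used to initialize each queueSet under test.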
	var G float64 = 60
	tests := []struct {
		name                    string
		robinIndex              int
		concurrencyLimit        int
		totSeatsInUse           int
		queues                  []*queue
		attempts                int
		beforeSelectQueueLocked func(attempt int, qs *queueSet)
		minQueueIndexExpected   []int
		robinIndexExpected      []int
	}{
		{
			name:             "width=1, seats are available, the queue with the smaller virtual start time wins",
			concurrencyLimit: 1,
			totSeatsInUse:    0,
			robinIndex:       -1,
			queues: []*queue{
				{
					virtualStart: 200,
					requests: newFIFO(
						&request{width: fcrequest.Width{Seats: 1}},
					),
				},
				{
					virtualStart: 100,
					requests: newFIFO(
						&request{width: fcrequest.Width{Seats: 1}},
					),
				},
			},
			attempts:              1,
			minQueueIndexExpected: []int{1},
			robinIndexExpected:    []int{1},
		},
		{
			name:             "width=1, all seats are occupied, no queue is picked",
			concurrencyLimit: 1,
			totSeatsInUse:    1,
			robinIndex:       -1,
			queues: []*queue{
				{
					virtualStart: 200,
					requests: newFIFO(
						&request{width: fcrequest.Width{Seats: 1}},
					),
				},
			},
			attempts:              1,
			minQueueIndexExpected: []int{-1},
			robinIndexExpected:    []int{0},
		},
		{
			name:             "width > 1, seats are available for request with the least finish time, queue is picked",
			concurrencyLimit: 50,
			totSeatsInUse:    25,
			robinIndex:       -1,
			queues: []*queue{
				{
					virtualStart: 200,
					requests: newFIFO(
						&request{width: fcrequest.Width{Seats: 50}},
					),
				},
				{
					virtualStart: 100,
					requests: newFIFO(
						&request{width: fcrequest.Width{Seats: 25}},
					),
				},
			},
			attempts:              1,
			minQueueIndexExpected: []int{1},
			robinIndexExpected:    []int{1},
		},
		{
			name:             "width > 1, seats are not available for request with the least finish time, queue is not picked",
			concurrencyLimit: 50,
			totSeatsInUse:    26,
			robinIndex:       -1,
			queues: []*queue{
				{
					virtualStart: 200,
					requests: newFIFO(
						&request{width: fcrequest.Width{Seats: 10}},
					),
				},
				{
					virtualStart: 100,
					requests: newFIFO(
						&request{width: fcrequest.Width{Seats: 25}},
					),
				},
			},
			attempts:              3,
			minQueueIndexExpected: []int{-1, -1, -1},
			robinIndexExpected:    []int{1, 1, 1},
		},
		{
			name:             "width > 1, seats become available before 3rd attempt, queue is picked",
			concurrencyLimit: 50,
			totSeatsInUse:    26,
			robinIndex:       -1,
			queues: []*queue{
				{
					virtualStart: 200,
					requests: newFIFO(
						&request{width: fcrequest.Width{Seats: 10}},
					),
				},
				{
					virtualStart: 100,
					requests: newFIFO(
						&request{width: fcrequest.Width{Seats: 25}},
					),
				},
			},
			beforeSelectQueueLocked: func(attempt int, qs *queueSet) {
				if attempt == 3 {
					qs.totSeatsInUse = 25
				}
			},
			attempts:              3,
			minQueueIndexExpected: []int{-1, -1, 1},
			robinIndexExpected:    []int{1, 1, 1},
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			qs := &queueSet{
				estimatedServiceTime: G,
				robinIndex:           test.robinIndex,
				totSeatsInUse:        test.totSeatsInUse,
				dCfg: fq.DispatchingConfig{
					ConcurrencyLimit: test.concurrencyLimit,
				},
				queues: test.queues,
			}

			t.Logf("QS: robin index=%d, seats in use=%d limit=%d", qs.robinIndex, qs.totSeatsInUse, qs.dCfg.ConcurrencyLimit)

			for i := 0; i < test.attempts; i++ {
				attempt := i + 1
				if test.beforeSelectQueueLocked != nil {
					test.beforeSelectQueueLocked(attempt, qs)
				}

				var minQueueExpected *queue
				if queueIdx := test.minQueueIndexExpected[i]; queueIdx >= 0 {
					minQueueExpected = test.queues[queueIdx]
				}

				minQueueGot := qs.selectQueueLocked()
				if minQueueExpected != minQueueGot {
					t.Errorf("Expected queue: %#v, but got: %#v", minQueueExpected, minQueueGot)
				}

				robinIndexExpected := test.robinIndexExpected[i]
				if robinIndexExpected != qs.robinIndex {
					t.Errorf("Expected robin index: %d for attempt: %d, but got: %d", robinIndexExpected, attempt, qs.robinIndex)
				}
			}
		})
	}
}

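// newFIFO returns a fifo holding the given requests in order.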
func newFIFO(requests ...*request) fifo {
	l := newRequestFIFO()
	for i := range requests {
		l.Enqueue(requests[i])
	}
	return l
}

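// newObserverPair creates a TimedObserverPair for a priority level labeled "test".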
func newObserverPair(clk clock.PassiveClock) metrics.TimedObserverPair {
	return metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"test"})
}