1/*
2Copyright 2019 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package queueset
18
19import (
20	"context"
21	"fmt"
22	"math"
23	"sync"
24	"time"
25
26	"k8s.io/apimachinery/pkg/util/clock"
27	"k8s.io/apimachinery/pkg/util/runtime"
28	"k8s.io/apiserver/pkg/util/flowcontrol/counter"
29	"k8s.io/apiserver/pkg/util/flowcontrol/debug"
30	fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
31	"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise"
32	"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
33	fqrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
34	"k8s.io/apiserver/pkg/util/shufflesharding"
35	"k8s.io/klog/v2"
36)
37
// nsTimeFmt is the time layout, with nanosecond precision, that this
// file uses when formatting real-time timestamps in log messages.
const nsTimeFmt = "2006-01-02 15:04:05.000000000"
39
// queueSetFactory implements the QueueSetFactory interface; it makes
// QueueSet objects.  Every queueSet made by a factory uses the
// factory's clock and goroutine counter.
type queueSetFactory struct {
	counter counter.GoRoutineCounter
	clock   clock.PassiveClock
}
46
// `*queueSetCompleter` implements QueueSetCompleter.  Exactly one of
// the fields `factory` and `theSet` is non-nil: `factory` is set when
// a new queueSet is being constructed (BeginConstruction) and
// `theSet` is set when an existing queueSet is being reconfigured
// (BeginConfigChange).
type queueSetCompleter struct {
	// factory supplies the clock and counter for a new queueSet.
	factory *queueSetFactory
	// obsPair is only populated by BeginConstruction.
	obsPair metrics.TimedObserverPair
	// theSet is the existing queueSet to reconfigure, if any.
	theSet *queueSet
	// qCfg is the already-validated queuing configuration to apply.
	qCfg fq.QueuingConfig
	// dealer is what checkConfig returned for qCfg; it is nil when
	// qCfg calls for zero queues.
	dealer *shufflesharding.Dealer
}
56
// queueSet implements the Fair Queuing for Server Requests technique
// described in this package's doc, and a pointer to one implements
// the QueueSet interface.  The clock, GoRoutineCounter, and estimated
// service time should not be changed; the fields listed after the
// lock must be accessed only while holding the lock.  The methods of
// this type follow the naming convention that the suffix "Locked"
// means the caller must hold the lock; a method whose name does not
// end in "Locked" either acquires the lock itself or does not care
// about locking.
type queueSet struct {
	clock   clock.PassiveClock
	counter counter.GoRoutineCounter
	// estimatedServiceTime is the assumed duration, in seconds, of a
	// request's execution; it is called G in the comments below.
	estimatedServiceTime float64
	obsPair              metrics.TimedObserverPair

	lock sync.Mutex

	// qCfg holds the current queuing configuration.  Its
	// DesiredNumQueues may be less than the current number of queues.
	// If its DesiredNumQueues is zero then its other queuing
	// parameters retain the settings they had when DesiredNumQueues
	// was last non-zero (if ever).
	qCfg fq.QueuingConfig

	// the current dispatching configuration.
	dCfg fq.DispatchingConfig

	// If `qCfg.DesiredNumQueues` is non-zero then dealer is not nil
	// and is good for `qCfg`.
	dealer *shufflesharding.Dealer

	// queues may be longer than the desired number, while the excess
	// queues are still draining.
	queues []*queue

	// virtualTime is the number of virtual seconds since process startup
	virtualTime float64

	// lastRealTime is what `clock.Now()` yielded when `virtualTime` was last updated
	lastRealTime time.Time

	// robinIndex is the index of the last queue dispatched
	robinIndex int

	// totRequestsWaiting is the sum, over all the queues, of the
	// number of requests waiting in that queue
	totRequestsWaiting int

	// totRequestsExecuting is the total number of requests of this
	// queueSet that are currently executing.  That is the same as the
	// sum, over all the queues, of the number of requests executing
	// from that queue.
	totRequestsExecuting int

	// totSeatsInUse is the number of total "seats" in use by all the
	// request(s) that are currently executing in this queueset.
	totSeatsInUse int
}
115
116// NewQueueSetFactory creates a new QueueSetFactory object
117func NewQueueSetFactory(c clock.PassiveClock, counter counter.GoRoutineCounter) fq.QueueSetFactory {
118	return &queueSetFactory{
119		counter: counter,
120		clock:   c,
121	}
122}
123
124func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, obsPair metrics.TimedObserverPair) (fq.QueueSetCompleter, error) {
125	dealer, err := checkConfig(qCfg)
126	if err != nil {
127		return nil, err
128	}
129	return &queueSetCompleter{
130		factory: qsf,
131		obsPair: obsPair,
132		qCfg:    qCfg,
133		dealer:  dealer}, nil
134}
135
136// checkConfig returns a non-nil Dealer if the config is valid and
137// calls for one, and returns a non-nil error if the given config is
138// invalid.
139func checkConfig(qCfg fq.QueuingConfig) (*shufflesharding.Dealer, error) {
140	if qCfg.DesiredNumQueues == 0 {
141		return nil, nil
142	}
143	dealer, err := shufflesharding.NewDealer(qCfg.DesiredNumQueues, qCfg.HandSize)
144	if err != nil {
145		err = fmt.Errorf("the QueueSetConfig implies an invalid shuffle sharding config (DesiredNumQueues is deckSize): %w", err)
146	}
147	return dealer, err
148}
149
150func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
151	qs := qsc.theSet
152	if qs == nil {
153		qs = &queueSet{
154			clock:                qsc.factory.clock,
155			counter:              qsc.factory.counter,
156			estimatedServiceTime: 60,
157			obsPair:              qsc.obsPair,
158			qCfg:                 qsc.qCfg,
159			virtualTime:          0,
160			lastRealTime:         qsc.factory.clock.Now(),
161		}
162	}
163	qs.setConfiguration(qsc.qCfg, qsc.dealer, dCfg)
164	return qs
165}
166
167// createQueues is a helper method for initializing an array of n queues
168func createQueues(n, baseIndex int) []*queue {
169	fqqueues := make([]*queue, n)
170	for i := 0; i < n; i++ {
171		fqqueues[i] = &queue{index: baseIndex + i, requests: newRequestFIFO()}
172	}
173	return fqqueues
174}
175
176func (qs *queueSet) BeginConfigChange(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) {
177	dealer, err := checkConfig(qCfg)
178	if err != nil {
179		return nil, err
180	}
181	return &queueSetCompleter{
182		theSet: qs,
183		qCfg:   qCfg,
184		dealer: dealer}, nil
185}
186
187// SetConfiguration is used to set the configuration for a queueSet.
188// Update handling for when fields are updated is handled here as well -
189// eg: if DesiredNum is increased, SetConfiguration reconciles by
190// adding more queues.
191func (qs *queueSet) setConfiguration(qCfg fq.QueuingConfig, dealer *shufflesharding.Dealer, dCfg fq.DispatchingConfig) {
192	qs.lockAndSyncTime()
193	defer qs.lock.Unlock()
194
195	if qCfg.DesiredNumQueues > 0 {
196		// Adding queues is the only thing that requires immediate action
197		// Removing queues is handled by omitting indexes >DesiredNum from
198		// chooseQueueIndexLocked
199		numQueues := len(qs.queues)
200		if qCfg.DesiredNumQueues > numQueues {
201			qs.queues = append(qs.queues,
202				createQueues(qCfg.DesiredNumQueues-numQueues, len(qs.queues))...)
203		}
204	} else {
205		qCfg.QueueLengthLimit = qs.qCfg.QueueLengthLimit
206		qCfg.HandSize = qs.qCfg.HandSize
207		qCfg.RequestWaitLimit = qs.qCfg.RequestWaitLimit
208	}
209
210	qs.qCfg = qCfg
211	qs.dCfg = dCfg
212	qs.dealer = dealer
213	qll := qCfg.QueueLengthLimit
214	if qll < 1 {
215		qll = 1
216	}
217	qs.obsPair.RequestsWaiting.SetX1(float64(qll))
218	qs.obsPair.RequestsExecuting.SetX1(float64(dCfg.ConcurrencyLimit))
219
220	qs.dispatchAsMuchAsPossibleLocked()
221}
222
// requestDecision is a decision about a request; it is the kind of
// value that is delivered through a request's `decision` promise.
type requestDecision int

// Values passed through a request's decision
const (
	// decisionExecute is set when the request is dispatched for execution.
	decisionExecute requestDecision = iota
	// decisionReject is set when the request has waited in its queue
	// for longer than the configured RequestWaitLimit.
	decisionReject
	// decisionCancel is set when the request's context is done while
	// the request is still waiting in its queue.
	decisionCancel
)
232
233// StartRequest begins the process of handling a request.  We take the
234// approach of updating the metrics about total requests queued and
235// executing at each point where there is a change in that quantity,
236// because the metrics --- and only the metrics --- track that
237// quantity per FlowSchema.
238func (qs *queueSet) StartRequest(ctx context.Context, width *fqrequest.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) {
239	qs.lockAndSyncTime()
240	defer qs.lock.Unlock()
241	var req *request
242
243	// ========================================================================
244	// Step 0:
245	// Apply only concurrency limit, if zero queues desired
246	if qs.qCfg.DesiredNumQueues < 1 {
247		if !qs.canAccommodateSeatsLocked(int(width.Seats)) {
248			klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d seats are asked for, %d seats are in use (%d are executing) and the limit is %d",
249				qs.qCfg.Name, fsName, descr1, descr2, width, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
250			metrics.AddReject(ctx, qs.qCfg.Name, fsName, "concurrency-limit")
251			return nil, qs.isIdleLocked()
252		}
253		req = qs.dispatchSansQueueLocked(ctx, width, flowDistinguisher, fsName, descr1, descr2)
254		return req, false
255	}
256
257	// ========================================================================
258	// Step 1:
259	// 1) Start with shuffle sharding, to pick a queue.
260	// 2) Reject old requests that have been waiting too long
261	// 3) Reject current request if there is not enough concurrency shares and
262	// we are at max queue length
263	// 4) If not rejected, create a request and enqueue
264	req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, width, hashValue, flowDistinguisher, fsName, descr1, descr2, queueNoteFn)
265	// req == nil means that the request was rejected - no remaining
266	// concurrency shares and at max queue length already
267	if req == nil {
268		klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v due to queue full", qs.qCfg.Name, fsName, descr1, descr2)
269		metrics.AddReject(ctx, qs.qCfg.Name, fsName, "queue-full")
270		return nil, qs.isIdleLocked()
271	}
272
273	// ========================================================================
274	// Step 2:
275	// The next step is to invoke the method that dequeues as much
276	// as possible.
277	// This method runs a loop, as long as there are non-empty
278	// queues and the number currently executing is less than the
279	// assured concurrency value.  The body of the loop uses the
280	// fair queuing technique to pick a queue and dispatch a
281	// request from that queue.
282	qs.dispatchAsMuchAsPossibleLocked()
283
284	// ========================================================================
285	// Step 3:
286
287	// Set up a relay from the context's Done channel to the world
288	// of well-counted goroutines. We Are Told that every
289	// request's context's Done channel gets closed by the time
290	// the request is done being processed.
291	doneCh := ctx.Done()
292
293	// Retrieve the queueset configuration name while we have the lock
294	// and use it in the goroutine below.
295	configName := qs.qCfg.Name
296
297	if doneCh != nil {
298		qs.preCreateOrUnblockGoroutine()
299		go func() {
300			defer runtime.HandleCrash()
301			qs.goroutineDoneOrBlocked()
302			_ = <-doneCh
303			// Whatever goroutine unblocked the preceding receive MUST
304			// have already either (a) incremented qs.counter or (b)
305			// known that said counter is not actually counting or (c)
306			// known that the count does not need to be accurate.
307			// BTW, the count only needs to be accurate in a test that
308			// uses FakeEventClock::Run().
309			klog.V(6).Infof("QS(%s): Context of request %q %#+v %#+v is Done", configName, fsName, descr1, descr2)
310			qs.cancelWait(req)
311			qs.goroutineDoneOrBlocked()
312		}()
313	}
314	return req, false
315}
316
317// Seats returns the number of seats this request requires.
318func (req *request) Seats() int {
319	return int(req.width.Seats)
320}
321
322func (req *request) NoteQueued(inQueue bool) {
323	if req.queueNoteFn != nil {
324		req.queueNoteFn(inQueue)
325	}
326}
327
328func (req *request) Finish(execFn func()) bool {
329	exec, idle := req.wait()
330	if !exec {
331		return idle
332	}
333	func() {
334		defer func() {
335			idle = req.qs.finishRequestAndDispatchAsMuchAsPossible(req)
336		}()
337
338		execFn()
339	}()
340
341	return idle
342}
343
344func (req *request) wait() (bool, bool) {
345	qs := req.qs
346	qs.lock.Lock()
347	defer qs.lock.Unlock()
348	if req.waitStarted {
349		// This can not happen, because the client is forbidden to
350		// call Wait twice on the same request
351		panic(fmt.Sprintf("Multiple calls to the Wait method, QueueSet=%s, startTime=%s, descr1=%#+v, descr2=%#+v", req.qs.qCfg.Name, req.startTime, req.descr1, req.descr2))
352	}
353	req.waitStarted = true
354
355	// ========================================================================
356	// Step 4:
357	// The final step is to wait on a decision from
358	// somewhere and then act on it.
359	decisionAny := req.decision.GetLocked()
360	qs.syncTimeLocked()
361	decision, isDecision := decisionAny.(requestDecision)
362	if !isDecision {
363		panic(fmt.Sprintf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.qCfg.Name, decisionAny, decisionAny, req.descr1, req.descr2))
364	}
365	switch decision {
366	case decisionReject:
367		klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, req.descr1, req.descr2)
368		metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "time-out")
369		return false, qs.isIdleLocked()
370	case decisionCancel:
371		// TODO(aaron-prindle) add metrics for this case
372		klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2)
373		return false, qs.isIdleLocked()
374	case decisionExecute:
375		klog.V(5).Infof("QS(%s): Dispatching request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2)
376		return true, false
377	default:
378		// This can not happen, all possible values are handled above
379		panic(decision)
380	}
381}
382
383func (qs *queueSet) IsIdle() bool {
384	qs.lock.Lock()
385	defer qs.lock.Unlock()
386	return qs.isIdleLocked()
387}
388
389func (qs *queueSet) isIdleLocked() bool {
390	return qs.totRequestsWaiting == 0 && qs.totRequestsExecuting == 0
391}
392
// lockAndSyncTime acquires the lock and updates the virtual time.
// Doing them together avoids the mistake of modifying some queue state
// before calling syncTimeLocked.  The lock is still held on return;
// the caller is responsible for releasing it.
func (qs *queueSet) lockAndSyncTime() {
	qs.lock.Lock()
	qs.syncTimeLocked()
}
400
401// syncTimeLocked updates the virtual time based on the assumption
402// that the current state of the queues has been in effect since
403// `qs.lastRealTime`.  Thus, it should be invoked after acquiring the
404// lock and before modifying the state of any queue.
405func (qs *queueSet) syncTimeLocked() {
406	realNow := qs.clock.Now()
407	timeSinceLast := realNow.Sub(qs.lastRealTime).Seconds()
408	qs.lastRealTime = realNow
409	qs.virtualTime += timeSinceLast * qs.getVirtualTimeRatioLocked()
410	metrics.SetCurrentR(qs.qCfg.Name, qs.virtualTime)
411}
412
413// getVirtualTimeRatio calculates the rate at which virtual time has
414// been advancing, according to the logic in `doc.go`.
415func (qs *queueSet) getVirtualTimeRatioLocked() float64 {
416	activeQueues := 0
417	seatsRequested := 0
418	for _, queue := range qs.queues {
419		seatsRequested += (queue.seatsInUse + queue.requests.SeatsSum())
420		if queue.requests.Length() > 0 || queue.requestsExecuting > 0 {
421			activeQueues++
422		}
423	}
424	if activeQueues == 0 {
425		return 0
426	}
427	return math.Min(float64(seatsRequested), float64(qs.dCfg.ConcurrencyLimit)) / float64(activeQueues)
428}
429
430// timeoutOldRequestsAndRejectOrEnqueueLocked encapsulates the logic required
431// to validate and enqueue a request for the queueSet/QueueSet:
432// 1) Start with shuffle sharding, to pick a queue.
433// 2) Reject old requests that have been waiting too long
434// 3) Reject current request if there is not enough concurrency shares and
435// we are at max queue length
436// 4) If not rejected, create a request and enqueue
437// returns the enqueud request on a successful enqueue
438// returns nil in the case that there is no available concurrency or
439// the queuelengthlimit has been reached
440func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, width *fqrequest.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request {
441	// Start with the shuffle sharding, to pick a queue.
442	queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2)
443	queue := qs.queues[queueIdx]
444	// The next step is the logic to reject requests that have been waiting too long
445	qs.removeTimedOutRequestsFromQueueLocked(queue, fsName)
446	// NOTE: currently timeout is only checked for each new request.  This means that there can be
447	// requests that are in the queue longer than the timeout if there are no new requests
448	// We prefer the simplicity over the promptness, at least for now.
449
450	// Create a request and enqueue
451	req := &request{
452		qs:                qs,
453		fsName:            fsName,
454		flowDistinguisher: flowDistinguisher,
455		ctx:               ctx,
456		decision:          lockingpromise.NewWriteOnce(&qs.lock, qs.counter),
457		arrivalTime:       qs.clock.Now(),
458		queue:             queue,
459		descr1:            descr1,
460		descr2:            descr2,
461		queueNoteFn:       queueNoteFn,
462		width:             *width,
463	}
464	if ok := qs.rejectOrEnqueueLocked(req); !ok {
465		return nil
466	}
467	metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, queue.requests.Length())
468	return req
469}
470
471// chooseQueueIndexLocked uses shuffle sharding to select a queue index
472// using the given hashValue and the shuffle sharding parameters of the queueSet.
473func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 interface{}) int {
474	bestQueueIdx := -1
475	bestQueueSeatsSum := int(math.MaxInt32)
476	// the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`.
477	qs.dealer.Deal(hashValue, func(queueIdx int) {
478		// TODO: Consider taking into account `additional latency` of requests
479		// in addition to their widths.
480		// Ideally, this should be based on projected completion time in the
481		// virtual world of the youngest request in the queue.
482		thisSeatsSum := qs.queues[queueIdx].requests.SeatsSum()
483		klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of seatsSum %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisSeatsSum)
484		if thisSeatsSum < bestQueueSeatsSum {
485			bestQueueIdx, bestQueueSeatsSum = queueIdx, thisSeatsSum
486		}
487	})
488	klog.V(6).Infof("QS(%s) at r=%s v=%.9fs: For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueSeatsSum, qs.queues[bestQueueIdx].requestsExecuting)
489	return bestQueueIdx
490}
491
492// removeTimedOutRequestsFromQueueLocked rejects old requests that have been enqueued
493// past the requestWaitLimit
494func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName string) {
495	timeoutCount := 0
496	now := qs.clock.Now()
497	reqs := queue.requests
498	// reqs are sorted oldest -> newest
499	// can short circuit loop (break) if oldest requests are not timing out
500	// as newer requests also will not have timed out
501
502	// now - requestWaitLimit = waitLimit
503	waitLimit := now.Add(-qs.qCfg.RequestWaitLimit)
504	reqs.Walk(func(req *request) bool {
505		if waitLimit.After(req.arrivalTime) {
506			req.decision.SetLocked(decisionReject)
507			timeoutCount++
508			metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
509			req.NoteQueued(false)
510
511			// we need to check if the next request has timed out.
512			return true
513		}
514
515		// since reqs are sorted oldest -> newest, we are done here.
516		return false
517	})
518
519	// remove timed out requests from queue
520	if timeoutCount > 0 {
521		// The number of requests we have timed out is timeoutCount,
522		// so, let's dequeue the exact number of requests for this queue.
523		for i := 0; i < timeoutCount; i++ {
524			queue.requests.Dequeue()
525		}
526		// decrement the # of requestsEnqueued
527		qs.totRequestsWaiting -= timeoutCount
528		qs.obsPair.RequestsWaiting.Add(float64(-timeoutCount))
529	}
530}
531
532// rejectOrEnqueueLocked rejects or enqueues the newly arrived
533// request, which has been assigned to a queue.  If up against the
534// queue length limit and the concurrency limit then returns false.
535// Otherwise enqueues and returns true.
536func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool {
537	queue := request.queue
538	curQueueLength := queue.requests.Length()
539	// rejects the newly arrived request if resource criteria not met
540	if qs.totSeatsInUse >= qs.dCfg.ConcurrencyLimit &&
541		curQueueLength >= qs.qCfg.QueueLengthLimit {
542		return false
543	}
544
545	qs.enqueueLocked(request)
546	return true
547}
548
549// enqueues a request into its queue.
550func (qs *queueSet) enqueueLocked(request *request) {
551	queue := request.queue
552	now := qs.clock.Now()
553	if queue.requests.Length() == 0 && queue.requestsExecuting == 0 {
554		// the queue’s virtual start time is set to the virtual time.
555		queue.virtualStart = qs.virtualTime
556		if klog.V(6).Enabled() {
557			klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2)
558		}
559	}
560	queue.Enqueue(request)
561	qs.totRequestsWaiting++
562	metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1)
563	request.NoteQueued(true)
564	qs.obsPair.RequestsWaiting.Add(1)
565}
566
567// dispatchAsMuchAsPossibleLocked runs a loop, as long as there
568// are non-empty queues and the number currently executing is less than the
569// assured concurrency value.  The body of the loop uses the fair queuing
570// technique to pick a queue, dequeue the request at the head of that
571// queue, increment the count of the number executing, and send true
572// to the request's channel.
573func (qs *queueSet) dispatchAsMuchAsPossibleLocked() {
574	for qs.totRequestsWaiting != 0 && qs.totSeatsInUse < qs.dCfg.ConcurrencyLimit {
575		ok := qs.dispatchLocked()
576		if !ok {
577			break
578		}
579	}
580}
581
582func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, width *fqrequest.Width, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request {
583	// does not call metrics.SetDispatchMetrics because there is no queuing and thus no interesting virtual world
584	now := qs.clock.Now()
585	req := &request{
586		qs:                qs,
587		fsName:            fsName,
588		flowDistinguisher: flowDistinguisher,
589		ctx:               ctx,
590		startTime:         now,
591		decision:          lockingpromise.NewWriteOnce(&qs.lock, qs.counter),
592		arrivalTime:       now,
593		descr1:            descr1,
594		descr2:            descr2,
595		width:             *width,
596	}
597	req.decision.SetLocked(decisionExecute)
598	qs.totRequestsExecuting++
599	qs.totSeatsInUse += req.Seats()
600	metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1)
601	metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.Seats())
602	qs.obsPair.RequestsExecuting.Add(1)
603	if klog.V(5).Enabled() {
604		klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting)
605	}
606	return req
607}
608
609// dispatchLocked uses the Fair Queuing for Server Requests method to
610// select a queue and dispatch the oldest request in that queue.  The
611// return value indicates whether a request was dispatched; this will
612// be false when there are no requests waiting in any queue.
613func (qs *queueSet) dispatchLocked() bool {
614	queue := qs.selectQueueLocked()
615	if queue == nil {
616		return false
617	}
618	request, ok := queue.Dequeue()
619	if !ok { // This should never happen.  But if it does...
620		return false
621	}
622	request.startTime = qs.clock.Now()
623	// At this moment the request leaves its queue and starts
624	// executing.  We do not recognize any interim state between
625	// "queued" and "executing".  While that means "executing"
626	// includes a little overhead from this package, this is not a
627	// problem because other overhead is also included.
628	qs.totRequestsWaiting--
629	qs.totRequestsExecuting++
630	qs.totSeatsInUse += request.Seats()
631	queue.requestsExecuting++
632	queue.seatsInUse += request.Seats()
633	metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1)
634	request.NoteQueued(false)
635	metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1)
636	metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, request.fsName, request.Seats())
637	qs.obsPair.RequestsWaiting.Add(-1)
638	qs.obsPair.RequestsExecuting.Add(1)
639	if klog.V(6).Enabled() {
640		klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing",
641			qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2,
642			queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting)
643	}
644	// When a request is dequeued for service -> qs.virtualStart += G
645	queue.virtualStart += qs.estimatedServiceTime * float64(request.Seats())
646	request.decision.SetLocked(decisionExecute)
647	return ok
648}
649
650// cancelWait ensures the request is not waiting.  This is only
651// applicable to a request that has been assigned to a queue.
652func (qs *queueSet) cancelWait(req *request) {
653	qs.lock.Lock()
654	defer qs.lock.Unlock()
655	if req.decision.IsSetLocked() {
656		// The request has already been removed from the queue
657		// and so we consider its wait to be over.
658		return
659	}
660	req.decision.SetLocked(decisionCancel)
661
662	// remove the request from the queue as it has timed out
663	req.removeFromQueueFn()
664	qs.totRequestsWaiting--
665	metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
666	req.NoteQueued(false)
667	qs.obsPair.RequestsWaiting.Add(-1)
668}
669
670// canAccommodateSeatsLocked returns true if this queueSet has enough
671// seats available to accommodate a request with the given number of seats,
672// otherwise it returns false.
673func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool {
674	switch {
675	case seats > qs.dCfg.ConcurrencyLimit:
676		// we have picked the queue with the minimum virtual finish time, but
677		// the number of seats this request asks for exceeds the concurrency limit.
678		// TODO: this is a quick fix for now, once we have borrowing in place we will not need it
679		if qs.totRequestsExecuting == 0 {
680			// TODO: apply additional lateny associated with this request, as described in the KEP
681			return true
682		}
683		// wait for all "currently" executing requests in this queueSet
684		// to finish before we can execute this request.
685		if klog.V(4).Enabled() {
686			klog.Infof("QS(%s): seats (%d) asked for exceeds concurrency limit, waiting for currently executing requests to complete, %d seats are in use (%d are executing) and the limit is %d",
687				qs.qCfg.Name, seats, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
688		}
689		return false
690	case qs.totSeatsInUse+seats > qs.dCfg.ConcurrencyLimit:
691		return false
692	}
693
694	return true
695}
696
697// selectQueueLocked examines the queues in round robin order and
698// returns the first one of those for which the virtual finish time of
699// the oldest waiting request is minimal.
700func (qs *queueSet) selectQueueLocked() *queue {
701	minVirtualFinish := math.Inf(1)
702	sMin := math.Inf(1)
703	dsMin := math.Inf(1)
704	sMax := math.Inf(-1)
705	dsMax := math.Inf(-1)
706	var minQueue *queue
707	var minIndex int
708	nq := len(qs.queues)
709	for range qs.queues {
710		qs.robinIndex = (qs.robinIndex + 1) % nq
711		queue := qs.queues[qs.robinIndex]
712		if queue.requests.Length() != 0 {
713			sMin = math.Min(sMin, queue.virtualStart)
714			sMax = math.Max(sMax, queue.virtualStart)
715			estimatedWorkInProgress := qs.estimatedServiceTime * float64(queue.seatsInUse)
716			dsMin = math.Min(dsMin, queue.virtualStart-estimatedWorkInProgress)
717			dsMax = math.Max(dsMax, queue.virtualStart-estimatedWorkInProgress)
718			// the virtual finish time of the oldest request is:
719			//   virtual start time + G
720			// we are not taking the width of the request into account when
721			// we calculate the virtual finish time of the request because
722			// it can starve requests with smaller wdith in other queues.
723			//
724			// so let's draw an example of the starving scenario:
725			//  - G=60 (estimated service time in seconds)
726			//  - concurrency limit=2
727			//  - we have two queues, q1 and q2
728			//  - q1 has an infinite supply of requests with width W=1
729			//  - q2 has one request waiting in the queue with width W=2
730			//  - virtual start time for both q1 and q2 are at t0
731			//  - requests complete really fast, S=1ms on q1
732			// in this scenario we will execute roughly 60,000 requests
733			// from q1 before we pick the request from q2.
734			currentVirtualFinish := queue.virtualStart + qs.estimatedServiceTime
735
736			if currentVirtualFinish < minVirtualFinish {
737				minVirtualFinish = currentVirtualFinish
738				minQueue = queue
739				minIndex = qs.robinIndex
740			}
741		}
742	}
743
744	// TODO: add a method to fifo that lets us peek at the oldest request
745	var oldestReqFromMinQueue *request
746	minQueue.requests.Walk(func(r *request) bool {
747		oldestReqFromMinQueue = r
748		return false
749	})
750	if oldestReqFromMinQueue == nil || !qs.canAccommodateSeatsLocked(oldestReqFromMinQueue.Seats()) {
751		// since we have not picked the queue with the minimum virtual finish
752		// time, we are not going to advance the round robin index here.
753		return nil
754	}
755
756	// we set the round robin indexing to start at the chose queue
757	// for the next round.  This way the non-selected queues
758	// win in the case that the virtual finish times are the same
759	qs.robinIndex = minIndex
760	// according to the original FQ formula:
761	//
762	//   Si = MAX(R(t), Fi-1)
763	//
764	// the virtual start (excluding the estimated cost) of the chose
765	// queue should always be greater or equal to the global virtual
766	// time.
767	//
768	// hence we're refreshing the per-queue virtual time for the chosen
769	// queue here. if the last virtual start time (excluded estimated cost)
770	// falls behind the global virtual time, we update the latest virtual
771	// start by: <latest global virtual time> + <previously estimated cost>
772	previouslyEstimatedServiceTime := float64(minQueue.seatsInUse) * qs.estimatedServiceTime
773	if qs.virtualTime > minQueue.virtualStart-previouslyEstimatedServiceTime {
774		// per-queue virtual time should not fall behind the global
775		minQueue.virtualStart = qs.virtualTime + previouslyEstimatedServiceTime
776	}
777	metrics.SetDispatchMetrics(qs.qCfg.Name, qs.virtualTime, minQueue.virtualStart, sMin, sMax, dsMin, dsMax)
778	return minQueue
779}
780
781// finishRequestAndDispatchAsMuchAsPossible is a convenience method
782// which calls finishRequest for a given request and then dispatches
783// as many requests as possible.  This is all of what needs to be done
784// once a request finishes execution or is canceled.  This returns a bool
785// indicating whether the QueueSet is now idle.
786func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool {
787	qs.lockAndSyncTime()
788	defer qs.lock.Unlock()
789
790	qs.finishRequestLocked(req)
791	qs.dispatchAsMuchAsPossibleLocked()
792	return qs.isIdleLocked()
793}
794
795// finishRequestLocked is a callback that should be used when a
796// previously dispatched request has completed it's service.  This
797// callback updates important state in the queueSet
798func (qs *queueSet) finishRequestLocked(r *request) {
799	now := qs.clock.Now()
800	qs.totRequestsExecuting--
801	qs.totSeatsInUse -= r.Seats()
802	metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1)
803	metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.Seats())
804	qs.obsPair.RequestsExecuting.Add(-1)
805
806	if r.queue == nil {
807		if klog.V(6).Enabled() {
808			klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting)
809		}
810		return
811	}
812
813	S := now.Sub(r.startTime).Seconds()
814
815	// When a request finishes being served, and the actual service time was S,
816	// the queue’s virtual start time is decremented by (G - S)*width.
817	r.queue.virtualStart -= (qs.estimatedServiceTime - S) * float64(r.Seats())
818
819	// request has finished, remove from requests executing
820	r.queue.requestsExecuting--
821	r.queue.seatsInUse -= r.Seats()
822
823	if klog.V(6).Enabled() {
824		klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing",
825			qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index,
826			r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting)
827	}
828
829	// If there are more queues than desired and this one has no
830	// requests then remove it
831	if len(qs.queues) > qs.qCfg.DesiredNumQueues &&
832		r.queue.requests.Length() == 0 &&
833		r.queue.requestsExecuting == 0 {
834		qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.queue.index)
835
836		// decrement here to maintain the invariant that (qs.robinIndex+1) % numQueues
837		// is the index of the next queue after the one last dispatched from
838		if qs.robinIndex >= r.queue.index {
839			qs.robinIndex--
840		}
841	}
842}
843
844// removeQueueAndUpdateIndexes uses reslicing to remove an index from a slice
845// and then updates the 'index' field of the queues to be correct
846func removeQueueAndUpdateIndexes(queues []*queue, index int) []*queue {
847	keptQueues := append(queues[:index], queues[index+1:]...)
848	for i := index; i < len(keptQueues); i++ {
849		keptQueues[i].index--
850	}
851	return keptQueues
852}
853
854// preCreateOrUnblockGoroutine needs to be called before creating a
855// goroutine associated with this queueSet or unblocking a blocked
856// one, to properly update the accounting used in testing.
857func (qs *queueSet) preCreateOrUnblockGoroutine() {
858	qs.counter.Add(1)
859}
860
861// goroutineDoneOrBlocked needs to be called at the end of every
862// goroutine associated with this queueSet or when such a goroutine is
863// about to wait on some other goroutine to do something; this is to
864// properly update the accounting used in testing.
865func (qs *queueSet) goroutineDoneOrBlocked() {
866	qs.counter.Add(-1)
867}
868
869func (qs *queueSet) UpdateObservations() {
870	qs.obsPair.RequestsWaiting.Add(0)
871	qs.obsPair.RequestsExecuting.Add(0)
872}
873
874func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump {
875	qs.lock.Lock()
876	defer qs.lock.Unlock()
877	d := debug.QueueSetDump{
878		Queues:     make([]debug.QueueDump, len(qs.queues)),
879		Waiting:    qs.totRequestsWaiting,
880		Executing:  qs.totRequestsExecuting,
881		SeatsInUse: qs.totSeatsInUse,
882	}
883	for i, q := range qs.queues {
884		d.Queues[i] = q.dump(includeRequestDetails)
885	}
886	return d
887}
888