/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package queueset

import (
	"context"
	"fmt"
	"math"
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/util/clock"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apiserver/pkg/util/flowcontrol/counter"
	"k8s.io/apiserver/pkg/util/flowcontrol/debug"
	fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
	"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise"
	"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
	fqrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
	"k8s.io/apiserver/pkg/util/shufflesharding"
	"k8s.io/klog/v2"
)

// nsTimeFmt is the time layout, with nanosecond resolution, used when
// real time is printed in this file's log messages.
const nsTimeFmt = "2006-01-02 15:04:05.000000000"

// queueSetFactory implements the QueueSetFactory interface.
// queueSetFactory makes QueueSet objects; every queueSet it makes is
// given the factory's clock and goroutine counter.
type queueSetFactory struct {
	counter counter.GoRoutineCounter
	clock   clock.PassiveClock
}

// `*queueSetCompleter` implements QueueSetCompleter.  Exactly one of
// the fields `factory` and `theSet` is non-nil: `factory` is set when
// the completer will construct a new queueSet, `theSet` is set when
// the completer will reconfigure an existing one.
type queueSetCompleter struct {
	factory *queueSetFactory
	obsPair metrics.TimedObserverPair
	theSet  *queueSet
	// qCfg is the queuing configuration that was validated when this
	// completer was made.
	qCfg fq.QueuingConfig
	// dealer is the shuffle sharding dealer derived from qCfg; it is
	// nil when qCfg.DesiredNumQueues is zero.
	dealer *shufflesharding.Dealer
}

// queueSet implements the Fair Queuing for Server Requests technique
// described in this package's doc, and a pointer to one implements
// the QueueSet interface.
// The clock, GoRoutineCounter, and estimated
// service time should not be changed; the fields listed after the
// lock must be accessed only while holding the lock.  The methods of
// this type follow the naming convention that the suffix "Locked"
// means the caller must hold the lock; a method whose name does
// not end in "Locked" either acquires the lock or does not care about
// locking.
type queueSet struct {
	clock                clock.PassiveClock
	counter              counter.GoRoutineCounter
	estimatedServiceTime float64
	obsPair              metrics.TimedObserverPair

	lock sync.Mutex

	// qCfg holds the current queuing configuration.  Its
	// DesiredNumQueues may be less than the current number of queues.
	// If its DesiredNumQueues is zero then its other queuing
	// parameters retain the settings they had when DesiredNumQueues
	// was last non-zero (if ever).
	qCfg fq.QueuingConfig

	// the current dispatching configuration.
	dCfg fq.DispatchingConfig

	// If `qCfg.DesiredNumQueues` is non-zero then dealer is not nil
	// and is good for `qCfg`.
	dealer *shufflesharding.Dealer

	// queues may be longer than the desired number, while the excess
	// queues are still draining.
	queues []*queue

	// virtualTime is the number of virtual seconds since process startup
	virtualTime float64

	// lastRealTime is what `clock.Now()` yielded when `virtualTime` was last updated
	lastRealTime time.Time

	// robinIndex is the index of the last queue dispatched
	robinIndex int

	// totRequestsWaiting is the sum, over all the queues, of the
	// number of requests waiting in that queue
	totRequestsWaiting int

	// totRequestsExecuting is the total number of requests of this
	// queueSet that are currently executing.  That is the same as the
	// sum, over all the queues, of the number of requests executing
	// from that queue.
	totRequestsExecuting int

	// totSeatsInUse is the number of total "seats" in use by all the
	// request(s) that are currently executing in this queueset.
	totSeatsInUse int
}

// NewQueueSetFactory creates a new QueueSetFactory object that hands
// the given clock and goroutine counter to every QueueSet it makes.
func NewQueueSetFactory(c clock.PassiveClock, counter counter.GoRoutineCounter) fq.QueueSetFactory {
	return &queueSetFactory{
		counter: counter,
		clock:   c,
	}
}

// BeginConstruction validates qCfg and, if it is valid, returns a
// completer that will construct a new queueSet when its Complete
// method is called.  An invalid qCfg yields a nil completer and a
// non-nil error.
func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, obsPair metrics.TimedObserverPair) (fq.QueueSetCompleter, error) {
	dealer, err := checkConfig(qCfg)
	if err != nil {
		return nil, err
	}
	return &queueSetCompleter{
		factory: qsf,
		obsPair: obsPair,
		qCfg:    qCfg,
		dealer:  dealer}, nil
}

// checkConfig returns a non-nil Dealer if the config is valid and
// calls for one, and returns a non-nil error if the given config is
// invalid.  A config with zero DesiredNumQueues calls for no Dealer
// and is accepted without further checks.
func checkConfig(qCfg fq.QueuingConfig) (*shufflesharding.Dealer, error) {
	if qCfg.DesiredNumQueues == 0 {
		return nil, nil
	}
	dealer, err := shufflesharding.NewDealer(qCfg.DesiredNumQueues, qCfg.HandSize)
	if err != nil {
		err = fmt.Errorf("the QueueSetConfig implies an invalid shuffle sharding config (DesiredNumQueues is deckSize): %w", err)
	}
	return dealer, err
}

// Complete finishes the construction or reconfiguration begun by the
// call that produced this completer: it creates the queueSet if this
// completer has none yet, applies the queuing configuration captured
// earlier together with the given dispatching configuration, and
// returns the queueSet.
func (qsc *queueSetCompleter) Complete(dCfg fq.DispatchingConfig) fq.QueueSet {
	qs := qsc.theSet
	if qs == nil {
		qs = &queueSet{
			clock:                qsc.factory.clock,
			counter:              qsc.factory.counter,
			estimatedServiceTime: 60,
			obsPair:              qsc.obsPair,
			qCfg:                 qsc.qCfg,
			virtualTime:          0,
			lastRealTime:         qsc.factory.clock.Now(),
		}
	}
	qs.setConfiguration(qsc.qCfg, qsc.dealer, dCfg)
	return qs
}

// createQueues is a helper method for initializing an array of n queues.
// The queues are given consecutive indexes starting at baseIndex and
// each starts with an empty request FIFO.
func createQueues(n, baseIndex int) []*queue {
	fqqueues := make([]*queue, n)
	for i := 0; i < n; i++ {
		fqqueues[i] = &queue{index: baseIndex + i, requests: newRequestFIFO()}
	}
	return fqqueues
}

// BeginConfigChange validates qCfg and, if it is valid, returns a
// completer that will apply it to this queueSet when its Complete
// method is called.  An invalid qCfg yields a nil completer and a
// non-nil error, and leaves this queueSet untouched.
func (qs *queueSet) BeginConfigChange(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) {
	dealer, err := checkConfig(qCfg)
	if err != nil {
		return nil, err
	}
	return &queueSetCompleter{
		theSet: qs,
		qCfg:   qCfg,
		dealer: dealer}, nil
}

// setConfiguration is used to set the configuration for a queueSet.
// Update handling for when fields are updated is handled here as well -
// eg: if DesiredNum is increased, setConfiguration reconciles by
// adding more queues.  It acquires the lock itself, and finishes by
// dispatching as many waiting requests as the new configuration allows.
func (qs *queueSet) setConfiguration(qCfg fq.QueuingConfig, dealer *shufflesharding.Dealer, dCfg fq.DispatchingConfig) {
	qs.lockAndSyncTime()
	defer qs.lock.Unlock()

	if qCfg.DesiredNumQueues > 0 {
		// Adding queues is the only thing that requires immediate action
		// Removing queues is handled by omitting indexes >DesiredNum from
		// chooseQueueIndexLocked
		numQueues := len(qs.queues)
		if qCfg.DesiredNumQueues > numQueues {
			qs.queues = append(qs.queues,
				createQueues(qCfg.DesiredNumQueues-numQueues, len(qs.queues))...)
		}
	} else {
		// With zero queues desired, keep the other queuing parameters
		// at the values they had before this change.
		qCfg.QueueLengthLimit = qs.qCfg.QueueLengthLimit
		qCfg.HandSize = qs.qCfg.HandSize
		qCfg.RequestWaitLimit = qs.qCfg.RequestWaitLimit
	}

	qs.qCfg = qCfg
	qs.dCfg = dCfg
	qs.dealer = dealer
	// The value passed to RequestsWaiting.SetX1 is kept at 1 or more.
	qll := qCfg.QueueLengthLimit
	if qll < 1 {
		qll = 1
	}
	qs.obsPair.RequestsWaiting.SetX1(float64(qll))
	qs.obsPair.RequestsExecuting.SetX1(float64(dCfg.ConcurrencyLimit))

	qs.dispatchAsMuchAsPossibleLocked()
}

// A decision about a request
type requestDecision int

// Values passed through a request's decision
const (
	// decisionExecute means the request has been dispatched.
	decisionExecute requestDecision = iota
	// decisionReject means the request timed out while waiting in its queue.
	decisionReject
	// decisionCancel means the request was removed from its queue
	// because its context became done.
	decisionCancel
)

// StartRequest begins the process of handling a request.
We take the 234// approach of updating the metrics about total requests queued and 235// executing at each point where there is a change in that quantity, 236// because the metrics --- and only the metrics --- track that 237// quantity per FlowSchema. 238func (qs *queueSet) StartRequest(ctx context.Context, width *fqrequest.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) (fq.Request, bool) { 239 qs.lockAndSyncTime() 240 defer qs.lock.Unlock() 241 var req *request 242 243 // ======================================================================== 244 // Step 0: 245 // Apply only concurrency limit, if zero queues desired 246 if qs.qCfg.DesiredNumQueues < 1 { 247 if !qs.canAccommodateSeatsLocked(int(width.Seats)) { 248 klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d seats are asked for, %d seats are in use (%d are executing) and the limit is %d", 249 qs.qCfg.Name, fsName, descr1, descr2, width, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit) 250 metrics.AddReject(ctx, qs.qCfg.Name, fsName, "concurrency-limit") 251 return nil, qs.isIdleLocked() 252 } 253 req = qs.dispatchSansQueueLocked(ctx, width, flowDistinguisher, fsName, descr1, descr2) 254 return req, false 255 } 256 257 // ======================================================================== 258 // Step 1: 259 // 1) Start with shuffle sharding, to pick a queue. 
260 // 2) Reject old requests that have been waiting too long 261 // 3) Reject current request if there is not enough concurrency shares and 262 // we are at max queue length 263 // 4) If not rejected, create a request and enqueue 264 req = qs.timeoutOldRequestsAndRejectOrEnqueueLocked(ctx, width, hashValue, flowDistinguisher, fsName, descr1, descr2, queueNoteFn) 265 // req == nil means that the request was rejected - no remaining 266 // concurrency shares and at max queue length already 267 if req == nil { 268 klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v due to queue full", qs.qCfg.Name, fsName, descr1, descr2) 269 metrics.AddReject(ctx, qs.qCfg.Name, fsName, "queue-full") 270 return nil, qs.isIdleLocked() 271 } 272 273 // ======================================================================== 274 // Step 2: 275 // The next step is to invoke the method that dequeues as much 276 // as possible. 277 // This method runs a loop, as long as there are non-empty 278 // queues and the number currently executing is less than the 279 // assured concurrency value. The body of the loop uses the 280 // fair queuing technique to pick a queue and dispatch a 281 // request from that queue. 282 qs.dispatchAsMuchAsPossibleLocked() 283 284 // ======================================================================== 285 // Step 3: 286 287 // Set up a relay from the context's Done channel to the world 288 // of well-counted goroutines. We Are Told that every 289 // request's context's Done channel gets closed by the time 290 // the request is done being processed. 291 doneCh := ctx.Done() 292 293 // Retrieve the queueset configuration name while we have the lock 294 // and use it in the goroutine below. 
295 configName := qs.qCfg.Name 296 297 if doneCh != nil { 298 qs.preCreateOrUnblockGoroutine() 299 go func() { 300 defer runtime.HandleCrash() 301 qs.goroutineDoneOrBlocked() 302 _ = <-doneCh 303 // Whatever goroutine unblocked the preceding receive MUST 304 // have already either (a) incremented qs.counter or (b) 305 // known that said counter is not actually counting or (c) 306 // known that the count does not need to be accurate. 307 // BTW, the count only needs to be accurate in a test that 308 // uses FakeEventClock::Run(). 309 klog.V(6).Infof("QS(%s): Context of request %q %#+v %#+v is Done", configName, fsName, descr1, descr2) 310 qs.cancelWait(req) 311 qs.goroutineDoneOrBlocked() 312 }() 313 } 314 return req, false 315} 316 317// Seats returns the number of seats this request requires. 318func (req *request) Seats() int { 319 return int(req.width.Seats) 320} 321 322func (req *request) NoteQueued(inQueue bool) { 323 if req.queueNoteFn != nil { 324 req.queueNoteFn(inQueue) 325 } 326} 327 328func (req *request) Finish(execFn func()) bool { 329 exec, idle := req.wait() 330 if !exec { 331 return idle 332 } 333 func() { 334 defer func() { 335 idle = req.qs.finishRequestAndDispatchAsMuchAsPossible(req) 336 }() 337 338 execFn() 339 }() 340 341 return idle 342} 343 344func (req *request) wait() (bool, bool) { 345 qs := req.qs 346 qs.lock.Lock() 347 defer qs.lock.Unlock() 348 if req.waitStarted { 349 // This can not happen, because the client is forbidden to 350 // call Wait twice on the same request 351 panic(fmt.Sprintf("Multiple calls to the Wait method, QueueSet=%s, startTime=%s, descr1=%#+v, descr2=%#+v", req.qs.qCfg.Name, req.startTime, req.descr1, req.descr2)) 352 } 353 req.waitStarted = true 354 355 // ======================================================================== 356 // Step 4: 357 // The final step is to wait on a decision from 358 // somewhere and then act on it. 
359 decisionAny := req.decision.GetLocked() 360 qs.syncTimeLocked() 361 decision, isDecision := decisionAny.(requestDecision) 362 if !isDecision { 363 panic(fmt.Sprintf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.qCfg.Name, decisionAny, decisionAny, req.descr1, req.descr2)) 364 } 365 switch decision { 366 case decisionReject: 367 klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, req.descr1, req.descr2) 368 metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "time-out") 369 return false, qs.isIdleLocked() 370 case decisionCancel: 371 // TODO(aaron-prindle) add metrics for this case 372 klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2) 373 return false, qs.isIdleLocked() 374 case decisionExecute: 375 klog.V(5).Infof("QS(%s): Dispatching request %#+v %#+v from its queue", qs.qCfg.Name, req.descr1, req.descr2) 376 return true, false 377 default: 378 // This can not happen, all possible values are handled above 379 panic(decision) 380 } 381} 382 383func (qs *queueSet) IsIdle() bool { 384 qs.lock.Lock() 385 defer qs.lock.Unlock() 386 return qs.isIdleLocked() 387} 388 389func (qs *queueSet) isIdleLocked() bool { 390 return qs.totRequestsWaiting == 0 && qs.totRequestsExecuting == 0 391} 392 393// lockAndSyncTime acquires the lock and updates the virtual time. 394// Doing them together avoids the mistake of modify some queue state 395// before calling syncTimeLocked. 396func (qs *queueSet) lockAndSyncTime() { 397 qs.lock.Lock() 398 qs.syncTimeLocked() 399} 400 401// syncTimeLocked updates the virtual time based on the assumption 402// that the current state of the queues has been in effect since 403// `qs.lastRealTime`. Thus, it should be invoked after acquiring the 404// lock and before modifying the state of any queue. 
405func (qs *queueSet) syncTimeLocked() { 406 realNow := qs.clock.Now() 407 timeSinceLast := realNow.Sub(qs.lastRealTime).Seconds() 408 qs.lastRealTime = realNow 409 qs.virtualTime += timeSinceLast * qs.getVirtualTimeRatioLocked() 410 metrics.SetCurrentR(qs.qCfg.Name, qs.virtualTime) 411} 412 413// getVirtualTimeRatio calculates the rate at which virtual time has 414// been advancing, according to the logic in `doc.go`. 415func (qs *queueSet) getVirtualTimeRatioLocked() float64 { 416 activeQueues := 0 417 seatsRequested := 0 418 for _, queue := range qs.queues { 419 seatsRequested += (queue.seatsInUse + queue.requests.SeatsSum()) 420 if queue.requests.Length() > 0 || queue.requestsExecuting > 0 { 421 activeQueues++ 422 } 423 } 424 if activeQueues == 0 { 425 return 0 426 } 427 return math.Min(float64(seatsRequested), float64(qs.dCfg.ConcurrencyLimit)) / float64(activeQueues) 428} 429 430// timeoutOldRequestsAndRejectOrEnqueueLocked encapsulates the logic required 431// to validate and enqueue a request for the queueSet/QueueSet: 432// 1) Start with shuffle sharding, to pick a queue. 433// 2) Reject old requests that have been waiting too long 434// 3) Reject current request if there is not enough concurrency shares and 435// we are at max queue length 436// 4) If not rejected, create a request and enqueue 437// returns the enqueud request on a successful enqueue 438// returns nil in the case that there is no available concurrency or 439// the queuelengthlimit has been reached 440func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Context, width *fqrequest.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn fq.QueueNoteFn) *request { 441 // Start with the shuffle sharding, to pick a queue. 
442 queueIdx := qs.chooseQueueIndexLocked(hashValue, descr1, descr2) 443 queue := qs.queues[queueIdx] 444 // The next step is the logic to reject requests that have been waiting too long 445 qs.removeTimedOutRequestsFromQueueLocked(queue, fsName) 446 // NOTE: currently timeout is only checked for each new request. This means that there can be 447 // requests that are in the queue longer than the timeout if there are no new requests 448 // We prefer the simplicity over the promptness, at least for now. 449 450 // Create a request and enqueue 451 req := &request{ 452 qs: qs, 453 fsName: fsName, 454 flowDistinguisher: flowDistinguisher, 455 ctx: ctx, 456 decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter), 457 arrivalTime: qs.clock.Now(), 458 queue: queue, 459 descr1: descr1, 460 descr2: descr2, 461 queueNoteFn: queueNoteFn, 462 width: *width, 463 } 464 if ok := qs.rejectOrEnqueueLocked(req); !ok { 465 return nil 466 } 467 metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, queue.requests.Length()) 468 return req 469} 470 471// chooseQueueIndexLocked uses shuffle sharding to select a queue index 472// using the given hashValue and the shuffle sharding parameters of the queueSet. 473func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 interface{}) int { 474 bestQueueIdx := -1 475 bestQueueSeatsSum := int(math.MaxInt32) 476 // the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`. 477 qs.dealer.Deal(hashValue, func(queueIdx int) { 478 // TODO: Consider taking into account `additional latency` of requests 479 // in addition to their widths. 480 // Ideally, this should be based on projected completion time in the 481 // virtual world of the youngest request in the queue. 
482 thisSeatsSum := qs.queues[queueIdx].requests.SeatsSum() 483 klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of seatsSum %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisSeatsSum) 484 if thisSeatsSum < bestQueueSeatsSum { 485 bestQueueIdx, bestQueueSeatsSum = queueIdx, thisSeatsSum 486 } 487 }) 488 klog.V(6).Infof("QS(%s) at r=%s v=%.9fs: For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, descr1, descr2, bestQueueIdx, bestQueueSeatsSum, qs.queues[bestQueueIdx].requestsExecuting) 489 return bestQueueIdx 490} 491 492// removeTimedOutRequestsFromQueueLocked rejects old requests that have been enqueued 493// past the requestWaitLimit 494func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName string) { 495 timeoutCount := 0 496 now := qs.clock.Now() 497 reqs := queue.requests 498 // reqs are sorted oldest -> newest 499 // can short circuit loop (break) if oldest requests are not timing out 500 // as newer requests also will not have timed out 501 502 // now - requestWaitLimit = waitLimit 503 waitLimit := now.Add(-qs.qCfg.RequestWaitLimit) 504 reqs.Walk(func(req *request) bool { 505 if waitLimit.After(req.arrivalTime) { 506 req.decision.SetLocked(decisionReject) 507 timeoutCount++ 508 metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) 509 req.NoteQueued(false) 510 511 // we need to check if the next request has timed out. 512 return true 513 } 514 515 // since reqs are sorted oldest -> newest, we are done here. 516 return false 517 }) 518 519 // remove timed out requests from queue 520 if timeoutCount > 0 { 521 // The number of requests we have timed out is timeoutCount, 522 // so, let's dequeue the exact number of requests for this queue. 
523 for i := 0; i < timeoutCount; i++ { 524 queue.requests.Dequeue() 525 } 526 // decrement the # of requestsEnqueued 527 qs.totRequestsWaiting -= timeoutCount 528 qs.obsPair.RequestsWaiting.Add(float64(-timeoutCount)) 529 } 530} 531 532// rejectOrEnqueueLocked rejects or enqueues the newly arrived 533// request, which has been assigned to a queue. If up against the 534// queue length limit and the concurrency limit then returns false. 535// Otherwise enqueues and returns true. 536func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool { 537 queue := request.queue 538 curQueueLength := queue.requests.Length() 539 // rejects the newly arrived request if resource criteria not met 540 if qs.totSeatsInUse >= qs.dCfg.ConcurrencyLimit && 541 curQueueLength >= qs.qCfg.QueueLengthLimit { 542 return false 543 } 544 545 qs.enqueueLocked(request) 546 return true 547} 548 549// enqueues a request into its queue. 550func (qs *queueSet) enqueueLocked(request *request) { 551 queue := request.queue 552 now := qs.clock.Now() 553 if queue.requests.Length() == 0 && queue.requestsExecuting == 0 { 554 // the queue’s virtual start time is set to the virtual time. 555 queue.virtualStart = qs.virtualTime 556 if klog.V(6).Enabled() { 557 klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.qCfg.Name, now.Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2) 558 } 559 } 560 queue.Enqueue(request) 561 qs.totRequestsWaiting++ 562 metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1) 563 request.NoteQueued(true) 564 qs.obsPair.RequestsWaiting.Add(1) 565} 566 567// dispatchAsMuchAsPossibleLocked runs a loop, as long as there 568// are non-empty queues and the number currently executing is less than the 569// assured concurrency value. 
The body of the loop uses the fair queuing 570// technique to pick a queue, dequeue the request at the head of that 571// queue, increment the count of the number executing, and send true 572// to the request's channel. 573func (qs *queueSet) dispatchAsMuchAsPossibleLocked() { 574 for qs.totRequestsWaiting != 0 && qs.totSeatsInUse < qs.dCfg.ConcurrencyLimit { 575 ok := qs.dispatchLocked() 576 if !ok { 577 break 578 } 579 } 580} 581 582func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, width *fqrequest.Width, flowDistinguisher, fsName string, descr1, descr2 interface{}) *request { 583 // does not call metrics.SetDispatchMetrics because there is no queuing and thus no interesting virtual world 584 now := qs.clock.Now() 585 req := &request{ 586 qs: qs, 587 fsName: fsName, 588 flowDistinguisher: flowDistinguisher, 589 ctx: ctx, 590 startTime: now, 591 decision: lockingpromise.NewWriteOnce(&qs.lock, qs.counter), 592 arrivalTime: now, 593 descr1: descr1, 594 descr2: descr2, 595 width: *width, 596 } 597 req.decision.SetLocked(decisionExecute) 598 qs.totRequestsExecuting++ 599 qs.totSeatsInUse += req.Seats() 600 metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1) 601 metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, fsName, req.Seats()) 602 qs.obsPair.RequestsExecuting.Add(1) 603 if klog.V(5).Enabled() { 604 klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting) 605 } 606 return req 607} 608 609// dispatchLocked uses the Fair Queuing for Server Requests method to 610// select a queue and dispatch the oldest request in that queue. The 611// return value indicates whether a request was dispatched; this will 612// be false when there are no requests waiting in any queue. 
613func (qs *queueSet) dispatchLocked() bool { 614 queue := qs.selectQueueLocked() 615 if queue == nil { 616 return false 617 } 618 request, ok := queue.Dequeue() 619 if !ok { // This should never happen. But if it does... 620 return false 621 } 622 request.startTime = qs.clock.Now() 623 // At this moment the request leaves its queue and starts 624 // executing. We do not recognize any interim state between 625 // "queued" and "executing". While that means "executing" 626 // includes a little overhead from this package, this is not a 627 // problem because other overhead is also included. 628 qs.totRequestsWaiting-- 629 qs.totRequestsExecuting++ 630 qs.totSeatsInUse += request.Seats() 631 queue.requestsExecuting++ 632 queue.seatsInUse += request.Seats() 633 metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1) 634 request.NoteQueued(false) 635 metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1) 636 metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, request.fsName, request.Seats()) 637 qs.obsPair.RequestsWaiting.Add(-1) 638 qs.obsPair.RequestsExecuting.Add(1) 639 if klog.V(6).Enabled() { 640 klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", 641 qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, 642 queue.index, queue.virtualStart, queue.requests.Length(), queue.requestsExecuting) 643 } 644 // When a request is dequeued for service -> qs.virtualStart += G 645 queue.virtualStart += qs.estimatedServiceTime * float64(request.Seats()) 646 request.decision.SetLocked(decisionExecute) 647 return ok 648} 649 650// cancelWait ensures the request is not waiting. This is only 651// applicable to a request that has been assigned to a queue. 
652func (qs *queueSet) cancelWait(req *request) { 653 qs.lock.Lock() 654 defer qs.lock.Unlock() 655 if req.decision.IsSetLocked() { 656 // The request has already been removed from the queue 657 // and so we consider its wait to be over. 658 return 659 } 660 req.decision.SetLocked(decisionCancel) 661 662 // remove the request from the queue as it has timed out 663 req.removeFromQueueFn() 664 qs.totRequestsWaiting-- 665 metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) 666 req.NoteQueued(false) 667 qs.obsPair.RequestsWaiting.Add(-1) 668} 669 670// canAccommodateSeatsLocked returns true if this queueSet has enough 671// seats available to accommodate a request with the given number of seats, 672// otherwise it returns false. 673func (qs *queueSet) canAccommodateSeatsLocked(seats int) bool { 674 switch { 675 case seats > qs.dCfg.ConcurrencyLimit: 676 // we have picked the queue with the minimum virtual finish time, but 677 // the number of seats this request asks for exceeds the concurrency limit. 678 // TODO: this is a quick fix for now, once we have borrowing in place we will not need it 679 if qs.totRequestsExecuting == 0 { 680 // TODO: apply additional lateny associated with this request, as described in the KEP 681 return true 682 } 683 // wait for all "currently" executing requests in this queueSet 684 // to finish before we can execute this request. 
685 if klog.V(4).Enabled() { 686 klog.Infof("QS(%s): seats (%d) asked for exceeds concurrency limit, waiting for currently executing requests to complete, %d seats are in use (%d are executing) and the limit is %d", 687 qs.qCfg.Name, seats, qs.totSeatsInUse, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit) 688 } 689 return false 690 case qs.totSeatsInUse+seats > qs.dCfg.ConcurrencyLimit: 691 return false 692 } 693 694 return true 695} 696 697// selectQueueLocked examines the queues in round robin order and 698// returns the first one of those for which the virtual finish time of 699// the oldest waiting request is minimal. 700func (qs *queueSet) selectQueueLocked() *queue { 701 minVirtualFinish := math.Inf(1) 702 sMin := math.Inf(1) 703 dsMin := math.Inf(1) 704 sMax := math.Inf(-1) 705 dsMax := math.Inf(-1) 706 var minQueue *queue 707 var minIndex int 708 nq := len(qs.queues) 709 for range qs.queues { 710 qs.robinIndex = (qs.robinIndex + 1) % nq 711 queue := qs.queues[qs.robinIndex] 712 if queue.requests.Length() != 0 { 713 sMin = math.Min(sMin, queue.virtualStart) 714 sMax = math.Max(sMax, queue.virtualStart) 715 estimatedWorkInProgress := qs.estimatedServiceTime * float64(queue.seatsInUse) 716 dsMin = math.Min(dsMin, queue.virtualStart-estimatedWorkInProgress) 717 dsMax = math.Max(dsMax, queue.virtualStart-estimatedWorkInProgress) 718 // the virtual finish time of the oldest request is: 719 // virtual start time + G 720 // we are not taking the width of the request into account when 721 // we calculate the virtual finish time of the request because 722 // it can starve requests with smaller wdith in other queues. 
723 // 724 // so let's draw an example of the starving scenario: 725 // - G=60 (estimated service time in seconds) 726 // - concurrency limit=2 727 // - we have two queues, q1 and q2 728 // - q1 has an infinite supply of requests with width W=1 729 // - q2 has one request waiting in the queue with width W=2 730 // - virtual start time for both q1 and q2 are at t0 731 // - requests complete really fast, S=1ms on q1 732 // in this scenario we will execute roughly 60,000 requests 733 // from q1 before we pick the request from q2. 734 currentVirtualFinish := queue.virtualStart + qs.estimatedServiceTime 735 736 if currentVirtualFinish < minVirtualFinish { 737 minVirtualFinish = currentVirtualFinish 738 minQueue = queue 739 minIndex = qs.robinIndex 740 } 741 } 742 } 743 744 // TODO: add a method to fifo that lets us peek at the oldest request 745 var oldestReqFromMinQueue *request 746 minQueue.requests.Walk(func(r *request) bool { 747 oldestReqFromMinQueue = r 748 return false 749 }) 750 if oldestReqFromMinQueue == nil || !qs.canAccommodateSeatsLocked(oldestReqFromMinQueue.Seats()) { 751 // since we have not picked the queue with the minimum virtual finish 752 // time, we are not going to advance the round robin index here. 753 return nil 754 } 755 756 // we set the round robin indexing to start at the chose queue 757 // for the next round. This way the non-selected queues 758 // win in the case that the virtual finish times are the same 759 qs.robinIndex = minIndex 760 // according to the original FQ formula: 761 // 762 // Si = MAX(R(t), Fi-1) 763 // 764 // the virtual start (excluding the estimated cost) of the chose 765 // queue should always be greater or equal to the global virtual 766 // time. 767 // 768 // hence we're refreshing the per-queue virtual time for the chosen 769 // queue here. 
if the last virtual start time (excluded estimated cost) 770 // falls behind the global virtual time, we update the latest virtual 771 // start by: <latest global virtual time> + <previously estimated cost> 772 previouslyEstimatedServiceTime := float64(minQueue.seatsInUse) * qs.estimatedServiceTime 773 if qs.virtualTime > minQueue.virtualStart-previouslyEstimatedServiceTime { 774 // per-queue virtual time should not fall behind the global 775 minQueue.virtualStart = qs.virtualTime + previouslyEstimatedServiceTime 776 } 777 metrics.SetDispatchMetrics(qs.qCfg.Name, qs.virtualTime, minQueue.virtualStart, sMin, sMax, dsMin, dsMax) 778 return minQueue 779} 780 781// finishRequestAndDispatchAsMuchAsPossible is a convenience method 782// which calls finishRequest for a given request and then dispatches 783// as many requests as possible. This is all of what needs to be done 784// once a request finishes execution or is canceled. This returns a bool 785// indicating whether the QueueSet is now idle. 786func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool { 787 qs.lockAndSyncTime() 788 defer qs.lock.Unlock() 789 790 qs.finishRequestLocked(req) 791 qs.dispatchAsMuchAsPossibleLocked() 792 return qs.isIdleLocked() 793} 794 795// finishRequestLocked is a callback that should be used when a 796// previously dispatched request has completed it's service. 
This 797// callback updates important state in the queueSet 798func (qs *queueSet) finishRequestLocked(r *request) { 799 now := qs.clock.Now() 800 qs.totRequestsExecuting-- 801 qs.totSeatsInUse -= r.Seats() 802 metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1) 803 metrics.AddRequestConcurrencyInUse(qs.qCfg.Name, r.fsName, -r.Seats()) 804 qs.obsPair.RequestsExecuting.Add(-1) 805 806 if r.queue == nil { 807 if klog.V(6).Enabled() { 808 klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting) 809 } 810 return 811 } 812 813 S := now.Sub(r.startTime).Seconds() 814 815 // When a request finishes being served, and the actual service time was S, 816 // the queue’s virtual start time is decremented by (G - S)*width. 817 r.queue.virtualStart -= (qs.estimatedServiceTime - S) * float64(r.Seats()) 818 819 // request has finished, remove from requests executing 820 r.queue.requestsExecuting-- 821 r.queue.seatsInUse -= r.Seats() 822 823 if klog.V(6).Enabled() { 824 klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", 825 qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, 826 r.queue.virtualStart, S, r.queue.requests.Length(), r.queue.requestsExecuting) 827 } 828 829 // If there are more queues than desired and this one has no 830 // requests then remove it 831 if len(qs.queues) > qs.qCfg.DesiredNumQueues && 832 r.queue.requests.Length() == 0 && 833 r.queue.requestsExecuting == 0 { 834 qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.queue.index) 835 836 // decrement here to maintain the invariant that (qs.robinIndex+1) % numQueues 837 // is the index of the next queue after the one last dispatched from 838 if qs.robinIndex >= r.queue.index { 839 qs.robinIndex-- 840 } 
841 } 842} 843 844// removeQueueAndUpdateIndexes uses reslicing to remove an index from a slice 845// and then updates the 'index' field of the queues to be correct 846func removeQueueAndUpdateIndexes(queues []*queue, index int) []*queue { 847 keptQueues := append(queues[:index], queues[index+1:]...) 848 for i := index; i < len(keptQueues); i++ { 849 keptQueues[i].index-- 850 } 851 return keptQueues 852} 853 854// preCreateOrUnblockGoroutine needs to be called before creating a 855// goroutine associated with this queueSet or unblocking a blocked 856// one, to properly update the accounting used in testing. 857func (qs *queueSet) preCreateOrUnblockGoroutine() { 858 qs.counter.Add(1) 859} 860 861// goroutineDoneOrBlocked needs to be called at the end of every 862// goroutine associated with this queueSet or when such a goroutine is 863// about to wait on some other goroutine to do something; this is to 864// properly update the accounting used in testing. 865func (qs *queueSet) goroutineDoneOrBlocked() { 866 qs.counter.Add(-1) 867} 868 869func (qs *queueSet) UpdateObservations() { 870 qs.obsPair.RequestsWaiting.Add(0) 871 qs.obsPair.RequestsExecuting.Add(0) 872} 873 874func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump { 875 qs.lock.Lock() 876 defer qs.lock.Unlock() 877 d := debug.QueueSetDump{ 878 Queues: make([]debug.QueueDump, len(qs.queues)), 879 Waiting: qs.totRequestsWaiting, 880 Executing: qs.totRequestsExecuting, 881 SeatsInUse: qs.totSeatsInUse, 882 } 883 for i, q := range qs.queues { 884 d.Queues[i] = q.dump(includeRequestDetails) 885 } 886 return d 887} 888