1/* 2Copyright 2019 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package flowcontrol 18 19import ( 20 "context" 21 "crypto/sha256" 22 "encoding/binary" 23 "encoding/json" 24 "errors" 25 "fmt" 26 "math" 27 "math/rand" 28 "sort" 29 "sync" 30 "time" 31 32 apiequality "k8s.io/apimachinery/pkg/api/equality" 33 apierrors "k8s.io/apimachinery/pkg/api/errors" 34 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 35 "k8s.io/apimachinery/pkg/labels" 36 apitypes "k8s.io/apimachinery/pkg/types" 37 "k8s.io/apimachinery/pkg/util/clock" 38 utilerrors "k8s.io/apimachinery/pkg/util/errors" 39 utilruntime "k8s.io/apimachinery/pkg/util/runtime" 40 "k8s.io/apimachinery/pkg/util/sets" 41 "k8s.io/apimachinery/pkg/util/wait" 42 fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap" 43 "k8s.io/apiserver/pkg/authentication/user" 44 "k8s.io/apiserver/pkg/endpoints/request" 45 "k8s.io/apiserver/pkg/util/apihelpers" 46 fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" 47 fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format" 48 "k8s.io/apiserver/pkg/util/flowcontrol/metrics" 49 fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" 50 "k8s.io/client-go/tools/cache" 51 "k8s.io/client-go/util/workqueue" 52 "k8s.io/klog/v2" 53 54 flowcontrol "k8s.io/api/flowcontrol/v1beta1" 55 flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta1" 56 flowcontrollister "k8s.io/client-go/listers/flowcontrol/v1beta1" 57) 58 59const timeFmt = "2006-01-02T15:04:05.999" 60 61// 
// This file contains a simple local (to the apiserver) controller
// that digests API Priority and Fairness config objects (FlowSchema
// and PriorityLevelConfiguration) into the data structure that the
// filter uses.  At this first level of development this controller
// takes the simplest possible approach: whenever notified of any
// change to any config object, or when any priority level that is
// undesired becomes completely unused, all the config objects are
// read and processed as a whole.

// StartFunction begins the process of handling a request.  If the
// request gets queued then this function uses the given hashValue as
// the source of entropy as it shuffle-shards the request into a
// queue.  The only inputs are the context and that hash value.  This
// function does not return until the queuing, if any, for this
// request is done.  If `execute` is false then `afterExecution` is
// irrelevant and the request should be rejected.  Otherwise the
// request should be executed and `afterExecution` must be called
// exactly once.
type StartFunction func(ctx context.Context, hashValue uint64) (execute bool, afterExecution func())

// RequestDigest holds the information taken from a request that
// flow control needs in order to classify and handle it.
type RequestDigest struct {
	// RequestInfo is the request's *request.RequestInfo.
	RequestInfo *request.RequestInfo
	// User identifies who is making the request.
	User user.Info
	// Width is the fcrequest.Width associated with the request.
	// NOTE(review): presumably an estimate of the request's cost;
	// confirm against the fcrequest package.
	Width fcrequest.Width
}

// `*configController` maintains eventual consistency with the API
// objects that configure API Priority and Fairness, and provides a
// procedural interface to the configured behavior.  The methods of
// this type and cfgMeal follow the convention that the suffix
// "Locked" means that the caller must hold the configController lock.
type configController struct {
	name             string // varies in tests of fighting controllers
	clock            clock.PassiveClock
	queueSetFactory  fq.QueueSetFactory
	obsPairGenerator metrics.TimedObserverPairGenerator

	// How this controller appears in an ObjectMeta ManagedFieldsEntry.Manager
	asFieldManager string

	// Given a boolean indicating whether a FlowSchema's referenced
	// PriorityLevelConfig exists, return a boolean indicating whether
	// the reference is dangling
	foundToDangling func(bool) bool

	// configQueue holds `(interface{})(0)` when the configuration
	// objects need to be reprocessed.
	configQueue workqueue.RateLimitingInterface

	// Lister and has-synced function taken from the
	// PriorityLevelConfiguration informer.
	plLister         flowcontrollister.PriorityLevelConfigurationLister
	plInformerSynced cache.InformerSynced

	// Lister and has-synced function taken from the FlowSchema
	// informer.
	fsLister         flowcontrollister.FlowSchemaLister
	fsInformerSynced cache.InformerSynced

	// flowcontrolClient is the API client; it is copied from the
	// TestableConfig at construction.
	flowcontrolClient flowcontrolclient.FlowcontrolV1beta1Interface

	// serverConcurrencyLimit is the limit on the server's total
	// number of non-exempt requests being served at once.  This comes
	// from server configuration.
	serverConcurrencyLimit int

	// requestWaitLimit comes from server configuration.
	requestWaitLimit time.Duration

	// This must be locked while accessing flowSchemas or
	// priorityLevelStates.  It is the lock involved in
	// LockingWriteMultiple.
	lock sync.Mutex

	// flowSchemas holds the flow schema objects, sorted by increasing
	// numerical (decreasing logical) matching precedence.  Every
	// FlowSchema in this slice is immutable.
	flowSchemas apihelpers.FlowSchemaSequence

	// priorityLevelStates maps the PriorityLevelConfiguration object
	// name to the state for that level.  Every name referenced from a
	// member of `flowSchemas` has an entry here.
	priorityLevelStates map[string]*priorityLevelState

	// the most recent update attempts, ordered by increasing age.
	// Consumer trims to keep only the last minute's worth of entries.
	// The controller uses this to limit itself to at most six updates
	// to a given FlowSchema in any minute.
	// This may only be accessed from the one and only worker goroutine.
	mostRecentUpdates []updateAttempt

	// WatchTracker is embedded, so its methods are promoted onto the
	// controller; it is initialized with NewWatchTracker() by
	// newTestableController.
	WatchTracker
}

// updateAttempt records one round of FlowSchema status writes: when
// the round happened and which FlowSchemas it tried to update.
type updateAttempt struct {
	timeUpdated  time.Time
	updatedItems sets.String // FlowSchema names
}

// priorityLevelState holds the state specific to a priority level.
type priorityLevelState struct {
	// the API object or prototype prescribing this level.  Nothing
	// reached through this pointer is mutable.
	pl *flowcontrol.PriorityLevelConfiguration

	// qsCompleter holds the QueueSetCompleter derived from `pl`
	// and `queues` if `pl` is not exempt, nil otherwise.
	qsCompleter fq.QueueSetCompleter

	// The QueueSet for this priority level.  This is nil if and only
	// if the priority level is exempt.
	queues fq.QueueSet

	// quiescing==true indicates that this priority level should be
	// removed when its queues have all drained.  May be true only if
	// queues is non-nil.
	quiescing bool

	// number of goroutines between Controller::Match and calling the
	// returned StartFunction
	numPending int

	// Observers tracking number waiting, executing
	obsPair metrics.TimedObserverPair
}

// newTestableController is extra flexible to facilitate testing.  It
// builds a configController from the given TestableConfig, creates
// the rate-limited work queue, digests an empty configuration so that
// the mandatory objects are in place, and registers informer event
// handlers that enqueue the single work item `0` whenever a relevant
// PriorityLevelConfiguration or FlowSchema change is observed.
func newTestableController(config TestableConfig) *configController {
	cfgCtlr := &configController{
		name:                   config.Name,
		clock:                  config.Clock,
		queueSetFactory:        config.QueueSetFactory,
		obsPairGenerator:       config.ObsPairGenerator,
		asFieldManager:         config.AsFieldManager,
		foundToDangling:        config.FoundToDangling,
		serverConcurrencyLimit: config.ServerConcurrencyLimit,
		requestWaitLimit:       config.RequestWaitLimit,
		flowcontrolClient:      config.FlowcontrolClient,
		priorityLevelStates:    make(map[string]*priorityLevelState),
		WatchTracker:           NewWatchTracker(),
	}
	klog.V(2).Infof("NewTestableController %q with serverConcurrencyLimit=%d, requestWaitLimit=%s, name=%s, asFieldManager=%q", cfgCtlr.name, cfgCtlr.serverConcurrencyLimit, cfgCtlr.requestWaitLimit, cfgCtlr.name, cfgCtlr.asFieldManager)
	// Start with longish delay because conflicts will be between
	// different processes, so take some time to go away.
	cfgCtlr.configQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour), "priority_and_fairness_config_queue")
	// ensure the data structure reflects the mandatory config
	cfgCtlr.lockAndDigestConfigObjects(nil, nil)
	fci := config.InformerFactory.Flowcontrol().V1beta1()
	pli := fci.PriorityLevelConfigurations()
	fsi := fci.FlowSchemas()
	cfgCtlr.plLister = pli.Lister()
	cfgCtlr.plInformerSynced = pli.Informer().HasSynced
	cfgCtlr.fsLister = fsi.Lister()
	cfgCtlr.fsInformerSynced = fsi.Informer().HasSynced
	// PriorityLevelConfiguration events: creations and deletions
	// always trigger a reload, updates only when the Spec changed.
	pli.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pl := obj.(*flowcontrol.PriorityLevelConfiguration)
			klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to creation of PLC %s", cfgCtlr.name, pl.Name)
			cfgCtlr.configQueue.Add(0)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			newPL := newObj.(*flowcontrol.PriorityLevelConfiguration)
			oldPL := oldObj.(*flowcontrol.PriorityLevelConfiguration)
			if !apiequality.Semantic.DeepEqual(oldPL.Spec, newPL.Spec) {
				klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to spec update of PLC %s", cfgCtlr.name, newPL.Name)
				cfgCtlr.configQueue.Add(0)
			} else {
				klog.V(7).Infof("No trigger API priority and fairness config reloading in %s due to spec non-change of PLC %s", cfgCtlr.name, newPL.Name)
			}
		},
		DeleteFunc: func(obj interface{}) {
			// The key is used only in the log message below, so the
			// error from computing it is deliberately ignored.
			name, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to deletion of PLC %s", cfgCtlr.name, name)
			cfgCtlr.configQueue.Add(0)

		}})
	// FlowSchema events: creations and deletions always trigger a
	// reload, updates when either the Spec or the Status changed.
	fsi.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			fs := obj.(*flowcontrol.FlowSchema)
			klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to creation of FS %s", cfgCtlr.name, fs.Name)
			cfgCtlr.configQueue.Add(0)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			newFS := newObj.(*flowcontrol.FlowSchema)
			oldFS := oldObj.(*flowcontrol.FlowSchema)
			// Changes to either Spec or Status are relevant.  The
			// concern is that we might, in some future release, want
			// different behavior than is implemented now. One of the
			// hardest questions is how does an operator roll out the
			// new release in a cluster with multiple kube-apiservers
			// --- in a way that works no matter what servers crash
			// and restart when.  If this handler reacts only to
			// changes in Spec then we have a scenario in which the
			// rollout leaves the old Status in place.  The scenario
			// ends with this subsequence: deploy the last new server
			// before deleting the last old server, and in between
			// those two operations the last old server crashes and
			// recovers.  The chosen solution is making this controller
			// insist on maintaining the particular state that it
			// establishes.
			if !(apiequality.Semantic.DeepEqual(oldFS.Spec, newFS.Spec) &&
				apiequality.Semantic.DeepEqual(oldFS.Status, newFS.Status)) {
				klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to spec and/or status update of FS %s", cfgCtlr.name, newFS.Name)
				cfgCtlr.configQueue.Add(0)
			} else {
				klog.V(7).Infof("No trigger of API priority and fairness config reloading in %s due to spec and status non-change of FS %s", cfgCtlr.name, newFS.Name)
			}
		},
		DeleteFunc: func(obj interface{}) {
			// The key is used only in the log message below, so the
			// error from computing it is deliberately ignored.
			name, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to deletion of FS %s", cfgCtlr.name, name)
			cfgCtlr.configQueue.Add(0)

		}})
	return cfgCtlr
}

// MaintainObservations keeps the observers from
// metrics.PriorityLevelConcurrencyObserverPairGenerator from falling
// too far behind.  It calls updateObservations every 10 seconds and
// returns only once stopCh is closed.
func (cfgCtlr *configController) MaintainObservations(stopCh <-chan struct{}) {
	wait.Until(cfgCtlr.updateObservations, 10*time.Second, stopCh)
}

// updateObservations takes the controller lock and asks every
// priority level that has queues to update its observations.
func (cfgCtlr *configController) updateObservations() {
	cfgCtlr.lock.Lock()
	defer cfgCtlr.lock.Unlock()
	for _, plc := range cfgCtlr.priorityLevelStates {
		// priority levels without queues have nothing to update
		if plc.queues != nil {
			plc.queues.UpdateObservations()
		}
	}
}

// Run waits for both informer caches to sync, starts the single
// worker goroutine, and then blocks until stopCh is closed.  It
// returns an error if the initial cache sync is never achieved and
// nil otherwise.  The work queue is shut down when Run returns.
func (cfgCtlr *configController) Run(stopCh <-chan struct{}) error {
	defer utilruntime.HandleCrash()

	// Let the config worker stop when we are done
	defer cfgCtlr.configQueue.ShutDown()

	klog.Info("Starting API Priority and Fairness config controller")
	if ok := cache.WaitForCacheSync(stopCh, cfgCtlr.plInformerSynced, cfgCtlr.fsInformerSynced); !ok {
		return fmt.Errorf("Never achieved initial sync")
	}

	klog.Info("Running API Priority and Fairness config worker")
	go wait.Until(cfgCtlr.runWorker, time.Second, stopCh)

	<-stopCh
	klog.Info("Shutting down API Priority and Fairness config worker")
	return nil
}

// runWorker is the logic of the one and only worker goroutine.  We
// limit the number to one in order to obviate explicit
// synchronization around access to `cfgCtlr.mostRecentUpdates`.
func (cfgCtlr *configController) runWorker() {
	// keep going until the queue reports that it has been shut down
	for cfgCtlr.processNextWorkItem() {
	}
}

// processNextWorkItem works on one entry from the work queue.  It
// returns false when the queue has been shut down and true otherwise.
// Only invoke this in the one and only worker goroutine.
func (cfgCtlr *configController) processNextWorkItem() bool {
	obj, shutdown := cfgCtlr.configQueue.Get()
	if shutdown {
		return false
	}

	// The closure scopes the deferred Done call to this one item.
	func(obj interface{}) {
		defer cfgCtlr.configQueue.Done(obj)
		specificDelay, err := cfgCtlr.syncOne(map[string]string{})
		switch {
		case err != nil:
			// failure: retry with rate-limited backoff
			klog.Error(err)
			cfgCtlr.configQueue.AddRateLimited(obj)
		case specificDelay > 0:
			// some status updates were postponed: retry after the delay
			cfgCtlr.configQueue.AddAfter(obj, specificDelay)
		default:
			// success: clear the item's rate-limiting history
			cfgCtlr.configQueue.Forget(obj)
		}
	}(obj)

	return true
}

// syncOne does one full synchronization.  It reads all the API
// objects that configure API Priority and Fairness and updates the
// local configController accordingly.
348// Only invoke this in the one and only worker goroutine 349func (cfgCtlr *configController) syncOne(flowSchemaRVs map[string]string) (specificDelay time.Duration, err error) { 350 klog.V(5).Infof("%s syncOne at %s", cfgCtlr.name, cfgCtlr.clock.Now().Format(timeFmt)) 351 all := labels.Everything() 352 newPLs, err := cfgCtlr.plLister.List(all) 353 if err != nil { 354 return 0, fmt.Errorf("unable to list PriorityLevelConfiguration objects: %w", err) 355 } 356 newFSs, err := cfgCtlr.fsLister.List(all) 357 if err != nil { 358 return 0, fmt.Errorf("unable to list FlowSchema objects: %w", err) 359 } 360 return cfgCtlr.digestConfigObjects(newPLs, newFSs, flowSchemaRVs) 361} 362 363// cfgMeal is the data involved in the process of digesting the API 364// objects that configure API Priority and Fairness. All the config 365// objects are digested together, because this is the simplest way to 366// cope with the various dependencies between objects. The process of 367// digestion is done in four passes over config objects --- three 368// passes over PriorityLevelConfigurations and one pass over the 369// FlowSchemas --- with the work dvided among the passes according to 370// those dependencies. 371type cfgMeal struct { 372 cfgCtlr *configController 373 374 newPLStates map[string]*priorityLevelState 375 376 // The sum of the concurrency shares of the priority levels in the 377 // new configuration 378 shareSum float64 379 380 // These keep track of which mandatory priority level config 381 // objects have been digested 382 haveExemptPL, haveCatchAllPL bool 383 384 // Buffered FlowSchema status updates to do. Do them when the 385 // lock is not held, to avoid a deadlock due to such a request 386 // provoking a call into this controller while the lock held 387 // waiting on that request to complete. 
388 fsStatusUpdates []fsStatusUpdate 389} 390 391// A buffered set of status updates for FlowSchemas 392type fsStatusUpdate struct { 393 flowSchema *flowcontrol.FlowSchema 394 condition flowcontrol.FlowSchemaCondition 395 oldValue flowcontrol.FlowSchemaCondition 396} 397 398// digestConfigObjects is given all the API objects that configure 399// cfgCtlr and writes its consequent new configState. 400// Only invoke this in the one and only worker goroutine 401func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.PriorityLevelConfiguration, newFSs []*flowcontrol.FlowSchema, flowSchemaRVs map[string]string) (time.Duration, error) { 402 fsStatusUpdates := cfgCtlr.lockAndDigestConfigObjects(newPLs, newFSs) 403 var errs []error 404 currResult := updateAttempt{ 405 timeUpdated: cfgCtlr.clock.Now(), 406 updatedItems: sets.String{}, 407 } 408 var suggestedDelay time.Duration 409 for _, fsu := range fsStatusUpdates { 410 // if we should skip this name, indicate we will need a delay, but continue with other entries 411 if cfgCtlr.shouldDelayUpdate(fsu.flowSchema.Name) { 412 if suggestedDelay == 0 { 413 suggestedDelay = time.Duration(30+rand.Intn(45)) * time.Second 414 } 415 continue 416 } 417 418 // if we are going to issue an update, be sure we track every name we update so we know if we update it too often. 
419 currResult.updatedItems.Insert(fsu.flowSchema.Name) 420 421 enc, err := json.Marshal(fsu.condition) 422 if err != nil { 423 // should never happen because these conditions are created here and well formed 424 panic(fmt.Sprintf("Failed to json.Marshall(%#+v): %s", fsu.condition, err.Error())) 425 } 426 klog.V(4).Infof("%s writing Condition %s to FlowSchema %s, which had ResourceVersion=%s, because its previous value was %s", cfgCtlr.name, string(enc), fsu.flowSchema.Name, fsu.flowSchema.ResourceVersion, fcfmt.Fmt(fsu.oldValue)) 427 fsIfc := cfgCtlr.flowcontrolClient.FlowSchemas() 428 patchBytes := []byte(fmt.Sprintf(`{"status": {"conditions": [ %s ] } }`, string(enc))) 429 patchOptions := metav1.PatchOptions{FieldManager: cfgCtlr.asFieldManager} 430 patchedFlowSchema, err := fsIfc.Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, patchBytes, patchOptions, "status") 431 if err == nil { 432 key, _ := cache.MetaNamespaceKeyFunc(patchedFlowSchema) 433 flowSchemaRVs[key] = patchedFlowSchema.ResourceVersion 434 } else if apierrors.IsNotFound(err) { 435 // This object has been deleted. A notification is coming 436 // and nothing more needs to be done here. 437 klog.V(5).Infof("%s at %s: attempted update of concurrently deleted FlowSchema %s; nothing more needs to be done", cfgCtlr.name, cfgCtlr.clock.Now().Format(timeFmt), fsu.flowSchema.Name) 438 } else { 439 errs = append(errs, fmt.Errorf("failed to set a status.condition for FlowSchema %s: %w", fsu.flowSchema.Name, err)) 440 } 441 } 442 cfgCtlr.addUpdateResult(currResult) 443 444 return suggestedDelay, utilerrors.NewAggregate(errs) 445} 446 447// shouldDelayUpdate checks to see if a flowschema has been updated too often and returns true if a delay is needed. 
448// Only invoke this in the one and only worker goroutine 449func (cfgCtlr *configController) shouldDelayUpdate(flowSchemaName string) bool { 450 numUpdatesInPastMinute := 0 451 oneMinuteAgo := cfgCtlr.clock.Now().Add(-1 * time.Minute) 452 for idx, update := range cfgCtlr.mostRecentUpdates { 453 if oneMinuteAgo.After(update.timeUpdated) { 454 // this and the remaining items are no longer relevant 455 cfgCtlr.mostRecentUpdates = cfgCtlr.mostRecentUpdates[:idx] 456 return false 457 } 458 if update.updatedItems.Has(flowSchemaName) { 459 numUpdatesInPastMinute++ 460 if numUpdatesInPastMinute > 5 { 461 return true 462 } 463 } 464 } 465 return false 466} 467 468// addUpdateResult adds the result. It isn't a ring buffer because 469// this is small and rate limited. 470// Only invoke this in the one and only worker goroutine 471func (cfgCtlr *configController) addUpdateResult(result updateAttempt) { 472 cfgCtlr.mostRecentUpdates = append([]updateAttempt{result}, cfgCtlr.mostRecentUpdates...) 473} 474 475func (cfgCtlr *configController) lockAndDigestConfigObjects(newPLs []*flowcontrol.PriorityLevelConfiguration, newFSs []*flowcontrol.FlowSchema) []fsStatusUpdate { 476 cfgCtlr.lock.Lock() 477 defer cfgCtlr.lock.Unlock() 478 meal := cfgMeal{ 479 cfgCtlr: cfgCtlr, 480 newPLStates: make(map[string]*priorityLevelState), 481 } 482 483 meal.digestNewPLsLocked(newPLs) 484 meal.digestFlowSchemasLocked(newFSs) 485 meal.processOldPLsLocked() 486 487 // Supply missing mandatory PriorityLevelConfiguration objects 488 if !meal.haveExemptPL { 489 meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationExempt, cfgCtlr.requestWaitLimit) 490 } 491 if !meal.haveCatchAllPL { 492 meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationCatchAll, cfgCtlr.requestWaitLimit) 493 } 494 495 meal.finishQueueSetReconfigsLocked() 496 497 // The new config has been constructed 498 cfgCtlr.priorityLevelStates = meal.newPLStates 499 klog.V(5).Infof("Switched to new API Priority and Fairness 
configuration") 500 return meal.fsStatusUpdates 501} 502 503// Digest the new set of PriorityLevelConfiguration objects. 504// Pretend broken ones do not exist. 505func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfiguration) { 506 for _, pl := range newPLs { 507 state := meal.cfgCtlr.priorityLevelStates[pl.Name] 508 if state == nil { 509 state = &priorityLevelState{obsPair: meal.cfgCtlr.obsPairGenerator.Generate(1, 1, []string{pl.Name})} 510 } 511 qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.obsPair) 512 if err != nil { 513 klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err) 514 continue 515 } 516 meal.newPLStates[pl.Name] = state 517 state.pl = pl 518 state.qsCompleter = qsCompleter 519 if state.quiescing { // it was undesired, but no longer 520 klog.V(3).Infof("Priority level %q was undesired and has become desired again", pl.Name) 521 state.quiescing = false 522 } 523 if state.pl.Spec.Limited != nil { 524 meal.shareSum += float64(state.pl.Spec.Limited.AssuredConcurrencyShares) 525 } 526 meal.haveExemptPL = meal.haveExemptPL || pl.Name == flowcontrol.PriorityLevelConfigurationNameExempt 527 meal.haveCatchAllPL = meal.haveCatchAllPL || pl.Name == flowcontrol.PriorityLevelConfigurationNameCatchAll 528 } 529} 530 531// Digest the given FlowSchema objects. Ones that reference a missing 532// or broken priority level are not to be passed on to the filter for 533// use. We do this before holding over old priority levels so that 534// requests stop going to those levels and FlowSchemaStatus values 535// reflect this. This function also adds any missing mandatory 536// FlowSchema objects. The given objects must all have distinct 537// names. 
538func (meal *cfgMeal) digestFlowSchemasLocked(newFSs []*flowcontrol.FlowSchema) { 539 fsSeq := make(apihelpers.FlowSchemaSequence, 0, len(newFSs)) 540 fsMap := make(map[string]*flowcontrol.FlowSchema, len(newFSs)) 541 var haveExemptFS, haveCatchAllFS bool 542 for i, fs := range newFSs { 543 otherFS := fsMap[fs.Name] 544 if otherFS != nil { 545 // This client is forbidden to do this. 546 panic(fmt.Sprintf("Given two FlowSchema objects with the same name: %s and %s", fcfmt.Fmt(otherFS), fcfmt.Fmt(fs))) 547 } 548 fsMap[fs.Name] = fs 549 _, goodPriorityRef := meal.newPLStates[fs.Spec.PriorityLevelConfiguration.Name] 550 551 // Ensure the object's status reflects whether its priority 552 // level reference is broken. 553 // 554 // TODO: consider not even trying if server is not handling 555 // requests yet. 556 meal.presyncFlowSchemaStatus(fs, meal.cfgCtlr.foundToDangling(goodPriorityRef), fs.Spec.PriorityLevelConfiguration.Name) 557 558 if !goodPriorityRef { 559 klog.V(6).Infof("Ignoring FlowSchema %s because of bad priority level reference %q", fs.Name, fs.Spec.PriorityLevelConfiguration.Name) 560 continue 561 } 562 fsSeq = append(fsSeq, newFSs[i]) 563 haveExemptFS = haveExemptFS || fs.Name == flowcontrol.FlowSchemaNameExempt 564 haveCatchAllFS = haveCatchAllFS || fs.Name == flowcontrol.FlowSchemaNameCatchAll 565 } 566 // sort into the order to be used for matching 567 sort.Sort(fsSeq) 568 569 // Supply missing mandatory FlowSchemas, in correct position 570 if !haveExemptFS { 571 fsSeq = append(apihelpers.FlowSchemaSequence{fcboot.MandatoryFlowSchemaExempt}, fsSeq...) 572 } 573 if !haveCatchAllFS { 574 fsSeq = append(fsSeq, fcboot.MandatoryFlowSchemaCatchAll) 575 } 576 577 meal.cfgCtlr.flowSchemas = fsSeq 578 if klog.V(5).Enabled() { 579 for _, fs := range fsSeq { 580 klog.Infof("Using FlowSchema %s", fcfmt.Fmt(fs)) 581 } 582 } 583} 584 585// Consider all the priority levels in the previous configuration. 
586// Keep the ones that are in the new config, supply mandatory 587// behavior, or are still busy; for the rest: drop it if it has no 588// queues, otherwise start the quiescing process if that has not 589// already been started. 590func (meal *cfgMeal) processOldPLsLocked() { 591 for plName, plState := range meal.cfgCtlr.priorityLevelStates { 592 if meal.newPLStates[plName] != nil { 593 // Still desired and already updated 594 continue 595 } 596 if plName == flowcontrol.PriorityLevelConfigurationNameExempt && !meal.haveExemptPL || plName == flowcontrol.PriorityLevelConfigurationNameCatchAll && !meal.haveCatchAllPL { 597 // BTW, we know the Spec has not changed because the 598 // mandatory objects have immutable Specs 599 klog.V(3).Infof("Retaining mandatory priority level %q despite lack of API object", plName) 600 } else { 601 if plState.queues == nil || plState.numPending == 0 && plState.queues.IsIdle() { 602 // Either there are no queues or they are done 603 // draining and no use is coming from another 604 // goroutine 605 klog.V(3).Infof("Removing undesired priority level %q (nilQueues=%v), Type=%v", plName, plState.queues == nil, plState.pl.Spec.Type) 606 continue 607 } 608 if !plState.quiescing { 609 klog.V(3).Infof("Priority level %q became undesired", plName) 610 plState.quiescing = true 611 } 612 } 613 var err error 614 plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.obsPair) 615 if err != nil { 616 // This can not happen because queueSetCompleterForPL already approved this config 617 panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec))) 618 } 619 if plState.pl.Spec.Limited != nil { 620 // We deliberately include the lingering priority levels 621 // here so that their queues get some concurrency and they 622 // continue to drain. 
During this interim a lingering 623 // priority level continues to get a concurrency 624 // allocation determined by all the share values in the 625 // regular way. 626 meal.shareSum += float64(plState.pl.Spec.Limited.AssuredConcurrencyShares) 627 } 628 meal.haveExemptPL = meal.haveExemptPL || plName == flowcontrol.PriorityLevelConfigurationNameExempt 629 meal.haveCatchAllPL = meal.haveCatchAllPL || plName == flowcontrol.PriorityLevelConfigurationNameCatchAll 630 meal.newPLStates[plName] = plState 631 } 632} 633 634// For all the priority levels of the new config, divide up the 635// server's total concurrency limit among them and create/update their 636// QueueSets. 637func (meal *cfgMeal) finishQueueSetReconfigsLocked() { 638 for plName, plState := range meal.newPLStates { 639 if plState.pl.Spec.Limited == nil { 640 klog.V(5).Infof("Using exempt priority level %q: quiescing=%v", plName, plState.quiescing) 641 continue 642 } 643 644 // The use of math.Ceil here means that the results might sum 645 // to a little more than serverConcurrencyLimit but the 646 // difference will be negligible. 
647 concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(plState.pl.Spec.Limited.AssuredConcurrencyShares) / meal.shareSum)) 648 metrics.UpdateSharedConcurrencyLimit(plName, concurrencyLimit) 649 650 if plState.queues == nil { 651 klog.V(5).Infof("Introducing queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, plState.quiescing, plState.pl.Spec.Limited.AssuredConcurrencyShares, meal.shareSum) 652 } else { 653 klog.V(5).Infof("Retaining queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v, numPending=%d (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, plState.quiescing, plState.numPending, plState.pl.Spec.Limited.AssuredConcurrencyShares, meal.shareSum) 654 } 655 plState.queues = plState.qsCompleter.Complete(fq.DispatchingConfig{ConcurrencyLimit: concurrencyLimit}) 656 } 657} 658 659// queueSetCompleterForPL returns an appropriate QueueSetCompleter for the 660// given priority level configuration. Returns nil if that config 661// does not call for limiting. Returns nil and an error if the given 662// object is malformed in a way that is a problem for this package. 663func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, intPair metrics.TimedObserverPair) (fq.QueueSetCompleter, error) { 664 if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) { 665 return nil, errors.New("broken union structure at the top") 666 } 667 if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Name == flowcontrol.PriorityLevelConfigurationNameExempt) { 668 // This package does not attempt to cope with a priority level dynamically switching between exempt and not. 
669 return nil, errors.New("non-alignment between name and type") 670 } 671 if pl.Spec.Limited == nil { 672 return nil, nil 673 } 674 if (pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeReject) != (pl.Spec.Limited.LimitResponse.Queuing == nil) { 675 return nil, errors.New("broken union structure for limit response") 676 } 677 qcAPI := pl.Spec.Limited.LimitResponse.Queuing 678 qcQS := fq.QueuingConfig{Name: pl.Name} 679 if qcAPI != nil { 680 qcQS = fq.QueuingConfig{Name: pl.Name, 681 DesiredNumQueues: int(qcAPI.Queues), 682 QueueLengthLimit: int(qcAPI.QueueLengthLimit), 683 HandSize: int(qcAPI.HandSize), 684 RequestWaitLimit: requestWaitLimit, 685 } 686 } 687 var qsc fq.QueueSetCompleter 688 var err error 689 if queues != nil { 690 qsc, err = queues.BeginConfigChange(qcQS) 691 } else { 692 qsc, err = qsf.BeginConstruction(qcQS, intPair) 693 } 694 if err != nil { 695 err = fmt.Errorf("priority level %q has QueuingConfiguration %#+v, which is invalid: %w", pl.Name, qcAPI, err) 696 } 697 return qsc, err 698} 699 700func (meal *cfgMeal) presyncFlowSchemaStatus(fs *flowcontrol.FlowSchema, isDangling bool, plName string) { 701 danglingCondition := apihelpers.GetFlowSchemaConditionByType(fs, flowcontrol.FlowSchemaConditionDangling) 702 if danglingCondition == nil { 703 danglingCondition = &flowcontrol.FlowSchemaCondition{ 704 Type: flowcontrol.FlowSchemaConditionDangling, 705 } 706 } 707 desiredStatus := flowcontrol.ConditionFalse 708 var desiredReason, desiredMessage string 709 if isDangling { 710 desiredStatus = flowcontrol.ConditionTrue 711 desiredReason = "NotFound" 712 desiredMessage = fmt.Sprintf("This FlowSchema references the PriorityLevelConfiguration object named %q but there is no such object", plName) 713 } else { 714 desiredReason = "Found" 715 desiredMessage = fmt.Sprintf("This FlowSchema references the PriorityLevelConfiguration object named %q and it exists", plName) 716 } 717 if danglingCondition.Status == desiredStatus && 
danglingCondition.Reason == desiredReason && danglingCondition.Message == desiredMessage { 718 return 719 } 720 now := meal.cfgCtlr.clock.Now() 721 meal.fsStatusUpdates = append(meal.fsStatusUpdates, fsStatusUpdate{ 722 flowSchema: fs, 723 condition: flowcontrol.FlowSchemaCondition{ 724 Type: flowcontrol.FlowSchemaConditionDangling, 725 Status: desiredStatus, 726 LastTransitionTime: metav1.NewTime(now), 727 Reason: desiredReason, 728 Message: desiredMessage, 729 }, 730 oldValue: *danglingCondition}) 731} 732 733// imaginePL adds a priority level based on one of the mandatory ones 734// that does not actually exist (right now) as a real API object. 735func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration) { 736 klog.V(3).Infof("No %s PriorityLevelConfiguration found, imagining one", proto.Name) 737 obsPair := meal.cfgCtlr.obsPairGenerator.Generate(1, 1, []string{proto.Name}) 738 qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, obsPair) 739 if err != nil { 740 // This can not happen because proto is one of the mandatory 741 // objects and these are not erroneous 742 panic(err) 743 } 744 meal.newPLStates[proto.Name] = &priorityLevelState{ 745 pl: proto, 746 qsCompleter: qsCompleter, 747 obsPair: obsPair, 748 } 749 if proto.Spec.Limited != nil { 750 meal.shareSum += float64(proto.Spec.Limited.AssuredConcurrencyShares) 751 } 752} 753 754type immediateRequest struct{} 755 756func (immediateRequest) Finish(execute func()) bool { 757 execute() 758 return false 759} 760 761// startRequest classifies and, if appropriate, enqueues the request. 762// Returns a nil Request if and only if the request is to be rejected. 763// The returned bool indicates whether the request is exempt from 764// limitation. The startWaitingTime is when the request started 765// waiting in its queue, or `Time{}` if this did not happen. 
766func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) { 767 klog.V(7).Infof("startRequest(%#+v)", rd) 768 cfgCtlr.lock.Lock() 769 defer cfgCtlr.lock.Unlock() 770 var selectedFlowSchema, catchAllFlowSchema *flowcontrol.FlowSchema 771 for _, fs := range cfgCtlr.flowSchemas { 772 if matchesFlowSchema(rd, fs) { 773 selectedFlowSchema = fs 774 break 775 } 776 if fs.Name == flowcontrol.FlowSchemaNameCatchAll { 777 catchAllFlowSchema = fs 778 } 779 } 780 if selectedFlowSchema == nil { 781 // This should never happen. If the requestDigest's User is a part of 782 // system:authenticated or system:unauthenticated, the catch-all flow 783 // schema should match it. However, if that invariant somehow fails, 784 // fallback to the catch-all flow schema anyway. 785 if catchAllFlowSchema == nil { 786 // This should absolutely never, ever happen! APF guarantees two 787 // undeletable flow schemas at all times: an exempt flow schema and a 788 // catch-all flow schema. 
789 panic(fmt.Sprintf("no fallback catch-all flow schema found for request %#+v and user %#+v", rd.RequestInfo, rd.User)) 790 } 791 selectedFlowSchema = catchAllFlowSchema 792 klog.Warningf("no match found for request %#+v and user %#+v; selecting catchAll=%s as fallback flow schema", rd.RequestInfo, rd.User, fcfmt.Fmt(selectedFlowSchema)) 793 } 794 plName := selectedFlowSchema.Spec.PriorityLevelConfiguration.Name 795 plState := cfgCtlr.priorityLevelStates[plName] 796 if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt { 797 klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName) 798 return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{} 799 } 800 var numQueues int32 801 if plState.pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeQueue { 802 numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues 803 } 804 var flowDistinguisher string 805 var hashValue uint64 806 if numQueues > 1 { 807 flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod) 808 hashValue = hashFlowID(selectedFlowSchema.Name, flowDistinguisher) 809 } 810 startWaitingTime = time.Now() 811 klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues) 812 req, idle := plState.queues.StartRequest(ctx, &rd.Width, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn) 813 if idle { 814 cfgCtlr.maybeReapLocked(plName, plState) 815 } 816 return selectedFlowSchema, plState.pl, false, req, startWaitingTime 817} 818 819// maybeReap will remove the last internal traces of the named 820// priority level if it has no more use. Call this after getting a 821// clue that the given priority level is undesired and idle. 
822func (cfgCtlr *configController) maybeReap(plName string) { 823 cfgCtlr.lock.Lock() 824 defer cfgCtlr.lock.Unlock() 825 plState := cfgCtlr.priorityLevelStates[plName] 826 if plState == nil { 827 klog.V(7).Infof("plName=%s, plState==nil", plName) 828 return 829 } 830 if plState.queues != nil { 831 useless := plState.quiescing && plState.numPending == 0 && plState.queues.IsIdle() 832 klog.V(7).Infof("plState.quiescing=%v, plState.numPending=%d, useless=%v", plState.quiescing, plState.numPending, useless) 833 if !useless { 834 return 835 } 836 } 837 klog.V(3).Infof("Triggered API priority and fairness config reloading because priority level %s is undesired and idle", plName) 838 cfgCtlr.configQueue.Add(0) 839} 840 841// maybeReapLocked requires the cfgCtlr's lock to already be held and 842// will remove the last internal traces of the named priority level if 843// it has no more use. Call this if both (1) plState.queues is 844// non-nil and reported being idle, and (2) cfgCtlr's lock has not 845// been released since then. 
846func (cfgCtlr *configController) maybeReapLocked(plName string, plState *priorityLevelState) { 847 if !(plState.quiescing && plState.numPending == 0) { 848 return 849 } 850 klog.V(3).Infof("Triggered API priority and fairness config reloading because priority level %s is undesired and idle", plName) 851 cfgCtlr.configQueue.Add(0) 852} 853 854// computeFlowDistinguisher extracts the flow distinguisher according to the given method 855func computeFlowDistinguisher(rd RequestDigest, method *flowcontrol.FlowDistinguisherMethod) string { 856 if method == nil { 857 return "" 858 } 859 switch method.Type { 860 case flowcontrol.FlowDistinguisherMethodByUserType: 861 return rd.User.GetName() 862 case flowcontrol.FlowDistinguisherMethodByNamespaceType: 863 return rd.RequestInfo.Namespace 864 default: 865 // this line shall never reach 866 panic("invalid flow-distinguisher method") 867 } 868} 869 870func hashFlowID(fsName, fDistinguisher string) uint64 { 871 hash := sha256.New() 872 var sep = [1]byte{0} 873 hash.Write([]byte(fsName)) 874 hash.Write(sep[:]) 875 hash.Write([]byte(fDistinguisher)) 876 var sum [32]byte 877 hash.Sum(sum[:0]) 878 return binary.LittleEndian.Uint64(sum[:8]) 879} 880