1/*
2Copyright 2019 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package flowcontrol
18
19import (
20	"context"
21	"crypto/sha256"
22	"encoding/binary"
23	"encoding/json"
24	"errors"
25	"fmt"
26	"math"
27	"math/rand"
28	"sort"
29	"sync"
30	"time"
31
32	apiequality "k8s.io/apimachinery/pkg/api/equality"
33	apierrors "k8s.io/apimachinery/pkg/api/errors"
34	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35	"k8s.io/apimachinery/pkg/labels"
36	apitypes "k8s.io/apimachinery/pkg/types"
37	"k8s.io/apimachinery/pkg/util/clock"
38	utilerrors "k8s.io/apimachinery/pkg/util/errors"
39	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
40	"k8s.io/apimachinery/pkg/util/sets"
41	"k8s.io/apimachinery/pkg/util/wait"
42	fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
43	"k8s.io/apiserver/pkg/authentication/user"
44	"k8s.io/apiserver/pkg/endpoints/request"
45	"k8s.io/apiserver/pkg/util/apihelpers"
46	fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
47	fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
48	"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
49	fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
50	"k8s.io/client-go/tools/cache"
51	"k8s.io/client-go/util/workqueue"
52	"k8s.io/klog/v2"
53
54	flowcontrol "k8s.io/api/flowcontrol/v1beta1"
55	flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta1"
56	flowcontrollister "k8s.io/client-go/listers/flowcontrol/v1beta1"
57)
58
// timeFmt is the layout this file uses when formatting timestamps for
// log messages: date and time in ISO-8601 style, with fractional
// seconds to at most millisecond precision and no zone.
const timeFmt = "2006-01-02T15:04:05.999"
60
61// This file contains a simple local (to the apiserver) controller
62// that digests API Priority and Fairness config objects (FlowSchema
63// and PriorityLevelConfiguration) into the data structure that the
64// filter uses.  At this first level of development this controller
65// takes the simplest possible approach: whenever notified of any
66// change to any config object, or when any priority level that is
67// undesired becomes completely unused, all the config objects are
68// read and processed as a whole.
69
// StartFunction begins the process of handling a request.  If the
// request gets queued then this function uses the given hashValue as
// the source of entropy as it shuffle-shards the request into a
// queue.  This method does not return until the queuing, if any, for
// this request is done.  If `execute` is false then `afterExecution`
// is irrelevant and the request should be rejected.  Otherwise the
// request should be executed and `afterExecution` must be called
// exactly once.
// NOTE(review): ctx presumably bounds how long the request may wait in
// a queue — confirm against the implementation.
type StartFunction func(ctx context.Context, hashValue uint64) (execute bool, afterExecution func())
80
// RequestDigest holds the information about a request that flow
// control needs in order to classify and admit it: the parsed request
// info, the requesting user, and the request's width (as defined by
// the fcrequest package).
type RequestDigest struct {
	RequestInfo *request.RequestInfo
	User        user.Info
	Width       fcrequest.Width
}
87
// `*configController` maintains eventual consistency with the API
// objects that configure API Priority and Fairness, and provides a
// procedural interface to the configured behavior.  The methods of
// this type and cfgMeal follow the convention that the suffix
// "Locked" means that the caller must hold the configController lock.
type configController struct {
	// name identifies this controller in log messages; clock supplies
	// the current time; queueSetFactory constructs the QueueSets of
	// newly introduced non-exempt priority levels; obsPairGenerator
	// creates the metric observer pair given to each priority level.
	name             string // varies in tests of fighting controllers
	clock            clock.PassiveClock
	queueSetFactory  fq.QueueSetFactory
	obsPairGenerator metrics.TimedObserverPairGenerator

	// How this controller appears in an ObjectMeta ManagedFieldsEntry.Manager
	asFieldManager string

	// Given a boolean indicating whether a FlowSchema's referenced
	// PriorityLevelConfig exists, return a boolean indicating whether
	// the reference is dangling
	foundToDangling func(bool) bool

	// configQueue holds `(interface{})(0)` when the configuration
	// objects need to be reprocessed.
	configQueue workqueue.RateLimitingInterface

	// Lister and sync check for the PriorityLevelConfiguration
	// informer; Run waits for the informer to sync before starting the
	// worker.
	plLister         flowcontrollister.PriorityLevelConfigurationLister
	plInformerSynced cache.InformerSynced

	// Lister and sync check for the FlowSchema informer; Run waits for
	// the informer to sync before starting the worker.
	fsLister         flowcontrollister.FlowSchemaLister
	fsInformerSynced cache.InformerSynced

	// flowcontrolClient is used to patch the status of FlowSchemas.
	flowcontrolClient flowcontrolclient.FlowcontrolV1beta1Interface

	// serverConcurrencyLimit is the limit on the server's total
	// number of non-exempt requests being served at once.  This comes
	// from server configuration.
	serverConcurrencyLimit int

	// requestWaitLimit comes from server configuration.
	requestWaitLimit time.Duration

	// This must be locked while accessing flowSchemas or
	// priorityLevelStates.  It is the lock involved in
	// LockingWriteMultiple.
	lock sync.Mutex

	// flowSchemas holds the flow schema objects, sorted by increasing
	// numerical (decreasing logical) matching precedence.  Every
	// FlowSchema in this slice is immutable.
	flowSchemas apihelpers.FlowSchemaSequence

	// priorityLevelStates maps the PriorityLevelConfiguration object
	// name to the state for that level.  Every name referenced from a
	// member of `flowSchemas` has an entry here.
	priorityLevelStates map[string]*priorityLevelState

	// mostRecentUpdates records recent rounds of FlowSchema status
	// updates, ordered by increasing age (newest first).  Entries more
	// than a minute old are irrelevant and are trimmed away.  The
	// controller uses this to limit itself to at most six updates to a
	// given FlowSchema in any minute.
	// This may only be accessed from the one and only worker goroutine.
	mostRecentUpdates []updateAttempt

	// The embedded WatchTracker's methods are promoted to
	// configController; newTestableController initializes it with
	// NewWatchTracker().
	WatchTracker
}
152
// updateAttempt records one round of FlowSchema status updates: when
// the round happened and the names of the FlowSchemas it attempted to
// patch.
type updateAttempt struct {
	timeUpdated  time.Time
	updatedItems sets.String // FlowSchema names
}
157
// priorityLevelState holds the state specific to a priority level.
// A state object is carried over from one configuration to the next
// for as long as its priority level is retained.
type priorityLevelState struct {
	// the API object or prototype prescribing this level.  Nothing
	// reached through this pointer is mutable.
	pl *flowcontrol.PriorityLevelConfiguration

	// qsCompleter holds the QueueSetCompleter derived from `pl` and
	// `queues` if the level is not exempt, nil otherwise.
	// finishQueueSetReconfigsLocked completes it to produce the new
	// value of `queues`.
	qsCompleter fq.QueueSetCompleter

	// The QueueSet for this priority level.  This is nil if and only
	// if the priority level is exempt.
	queues fq.QueueSet

	// quiescing==true indicates that this priority level should be
	// removed when its queues have all drained.  May be true only if
	// queues is non-nil.
	quiescing bool

	// number of goroutines between Controller::Match and calling the
	// returned StartFunction.  An undesired level is not removed while
	// this is non-zero.
	numPending int

	// Observers tracking number waiting, executing.  Created when the
	// level first appears and reused across reconfigurations.
	obsPair metrics.TimedObserverPair
}
184
185// NewTestableController is extra flexible to facilitate testing
186func newTestableController(config TestableConfig) *configController {
187	cfgCtlr := &configController{
188		name:                   config.Name,
189		clock:                  config.Clock,
190		queueSetFactory:        config.QueueSetFactory,
191		obsPairGenerator:       config.ObsPairGenerator,
192		asFieldManager:         config.AsFieldManager,
193		foundToDangling:        config.FoundToDangling,
194		serverConcurrencyLimit: config.ServerConcurrencyLimit,
195		requestWaitLimit:       config.RequestWaitLimit,
196		flowcontrolClient:      config.FlowcontrolClient,
197		priorityLevelStates:    make(map[string]*priorityLevelState),
198		WatchTracker:           NewWatchTracker(),
199	}
200	klog.V(2).Infof("NewTestableController %q with serverConcurrencyLimit=%d, requestWaitLimit=%s, name=%s, asFieldManager=%q", cfgCtlr.name, cfgCtlr.serverConcurrencyLimit, cfgCtlr.requestWaitLimit, cfgCtlr.name, cfgCtlr.asFieldManager)
201	// Start with longish delay because conflicts will be between
202	// different processes, so take some time to go away.
203	cfgCtlr.configQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour), "priority_and_fairness_config_queue")
204	// ensure the data structure reflects the mandatory config
205	cfgCtlr.lockAndDigestConfigObjects(nil, nil)
206	fci := config.InformerFactory.Flowcontrol().V1beta1()
207	pli := fci.PriorityLevelConfigurations()
208	fsi := fci.FlowSchemas()
209	cfgCtlr.plLister = pli.Lister()
210	cfgCtlr.plInformerSynced = pli.Informer().HasSynced
211	cfgCtlr.fsLister = fsi.Lister()
212	cfgCtlr.fsInformerSynced = fsi.Informer().HasSynced
213	pli.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
214		AddFunc: func(obj interface{}) {
215			pl := obj.(*flowcontrol.PriorityLevelConfiguration)
216			klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to creation of PLC %s", cfgCtlr.name, pl.Name)
217			cfgCtlr.configQueue.Add(0)
218		},
219		UpdateFunc: func(oldObj, newObj interface{}) {
220			newPL := newObj.(*flowcontrol.PriorityLevelConfiguration)
221			oldPL := oldObj.(*flowcontrol.PriorityLevelConfiguration)
222			if !apiequality.Semantic.DeepEqual(oldPL.Spec, newPL.Spec) {
223				klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to spec update of PLC %s", cfgCtlr.name, newPL.Name)
224				cfgCtlr.configQueue.Add(0)
225			} else {
226				klog.V(7).Infof("No trigger API priority and fairness config reloading in %s due to spec non-change of PLC %s", cfgCtlr.name, newPL.Name)
227			}
228		},
229		DeleteFunc: func(obj interface{}) {
230			name, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
231			klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to deletion of PLC %s", cfgCtlr.name, name)
232			cfgCtlr.configQueue.Add(0)
233
234		}})
235	fsi.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
236		AddFunc: func(obj interface{}) {
237			fs := obj.(*flowcontrol.FlowSchema)
238			klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to creation of FS %s", cfgCtlr.name, fs.Name)
239			cfgCtlr.configQueue.Add(0)
240		},
241		UpdateFunc: func(oldObj, newObj interface{}) {
242			newFS := newObj.(*flowcontrol.FlowSchema)
243			oldFS := oldObj.(*flowcontrol.FlowSchema)
244			// Changes to either Spec or Status are relevant.  The
245			// concern is that we might, in some future release, want
246			// different behavior than is implemented now. One of the
247			// hardest questions is how does an operator roll out the
248			// new release in a cluster with multiple kube-apiservers
249			// --- in a way that works no matter what servers crash
250			// and restart when. If this handler reacts only to
251			// changes in Spec then we have a scenario in which the
252			// rollout leaves the old Status in place. The scenario
253			// ends with this subsequence: deploy the last new server
254			// before deleting the last old server, and in between
255			// those two operations the last old server crashes and
256			// recovers. The chosen solution is making this controller
257			// insist on maintaining the particular state that it
258			// establishes.
259			if !(apiequality.Semantic.DeepEqual(oldFS.Spec, newFS.Spec) &&
260				apiequality.Semantic.DeepEqual(oldFS.Status, newFS.Status)) {
261				klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to spec and/or status update of FS %s", cfgCtlr.name, newFS.Name)
262				cfgCtlr.configQueue.Add(0)
263			} else {
264				klog.V(7).Infof("No trigger of API priority and fairness config reloading in %s due to spec and status non-change of FS %s", cfgCtlr.name, newFS.Name)
265			}
266		},
267		DeleteFunc: func(obj interface{}) {
268			name, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
269			klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to deletion of FS %s", cfgCtlr.name, name)
270			cfgCtlr.configQueue.Add(0)
271
272		}})
273	return cfgCtlr
274}
275
276// MaintainObservations keeps the observers from
277// metrics.PriorityLevelConcurrencyObserverPairGenerator from falling
278// too far behind
279func (cfgCtlr *configController) MaintainObservations(stopCh <-chan struct{}) {
280	wait.Until(cfgCtlr.updateObservations, 10*time.Second, stopCh)
281}
282
283func (cfgCtlr *configController) updateObservations() {
284	cfgCtlr.lock.Lock()
285	defer cfgCtlr.lock.Unlock()
286	for _, plc := range cfgCtlr.priorityLevelStates {
287		if plc.queues != nil {
288			plc.queues.UpdateObservations()
289		}
290	}
291}
292
293func (cfgCtlr *configController) Run(stopCh <-chan struct{}) error {
294	defer utilruntime.HandleCrash()
295
296	// Let the config worker stop when we are done
297	defer cfgCtlr.configQueue.ShutDown()
298
299	klog.Info("Starting API Priority and Fairness config controller")
300	if ok := cache.WaitForCacheSync(stopCh, cfgCtlr.plInformerSynced, cfgCtlr.fsInformerSynced); !ok {
301		return fmt.Errorf("Never achieved initial sync")
302	}
303
304	klog.Info("Running API Priority and Fairness config worker")
305	go wait.Until(cfgCtlr.runWorker, time.Second, stopCh)
306
307	<-stopCh
308	klog.Info("Shutting down API Priority and Fairness config worker")
309	return nil
310}
311
312// runWorker is the logic of the one and only worker goroutine.  We
313// limit the number to one in order to obviate explicit
314// synchronization around access to `cfgCtlr.mostRecentUpdates`.
315func (cfgCtlr *configController) runWorker() {
316	for cfgCtlr.processNextWorkItem() {
317	}
318}
319
320// processNextWorkItem works on one entry from the work queue.
321// Only invoke this in the one and only worker goroutine.
322func (cfgCtlr *configController) processNextWorkItem() bool {
323	obj, shutdown := cfgCtlr.configQueue.Get()
324	if shutdown {
325		return false
326	}
327
328	func(obj interface{}) {
329		defer cfgCtlr.configQueue.Done(obj)
330		specificDelay, err := cfgCtlr.syncOne(map[string]string{})
331		switch {
332		case err != nil:
333			klog.Error(err)
334			cfgCtlr.configQueue.AddRateLimited(obj)
335		case specificDelay > 0:
336			cfgCtlr.configQueue.AddAfter(obj, specificDelay)
337		default:
338			cfgCtlr.configQueue.Forget(obj)
339		}
340	}(obj)
341
342	return true
343}
344
345// syncOne does one full synchronization.  It reads all the API
346// objects that configure API Priority and Fairness and updates the
347// local configController accordingly.
348// Only invoke this in the one and only worker goroutine
349func (cfgCtlr *configController) syncOne(flowSchemaRVs map[string]string) (specificDelay time.Duration, err error) {
350	klog.V(5).Infof("%s syncOne at %s", cfgCtlr.name, cfgCtlr.clock.Now().Format(timeFmt))
351	all := labels.Everything()
352	newPLs, err := cfgCtlr.plLister.List(all)
353	if err != nil {
354		return 0, fmt.Errorf("unable to list PriorityLevelConfiguration objects: %w", err)
355	}
356	newFSs, err := cfgCtlr.fsLister.List(all)
357	if err != nil {
358		return 0, fmt.Errorf("unable to list FlowSchema objects: %w", err)
359	}
360	return cfgCtlr.digestConfigObjects(newPLs, newFSs, flowSchemaRVs)
361}
362
// cfgMeal is the data involved in the process of digesting the API
// objects that configure API Priority and Fairness.  All the config
// objects are digested together, because this is the simplest way to
// cope with the various dependencies between objects.  The process of
// digestion is done in four passes over config objects --- three
// passes over PriorityLevelConfigurations and one pass over the
// FlowSchemas --- with the work divided among the passes according to
// those dependencies.
type cfgMeal struct {
	cfgCtlr *configController

	// newPLStates maps priority level name to the state that level
	// will have in the new configuration.
	newPLStates map[string]*priorityLevelState

	// The sum of the concurrency shares of the priority levels in the
	// new configuration
	shareSum float64

	// These keep track of which mandatory priority level config
	// objects have been digested
	haveExemptPL, haveCatchAllPL bool

	// Buffered FlowSchema status updates to do.  Do them when the
	// lock is not held, to avoid a deadlock due to such a request
	// provoking a call into this controller while the lock is held
	// waiting on that request to complete.
	fsStatusUpdates []fsStatusUpdate
}
390
// fsStatusUpdate is one buffered status update for a FlowSchema: the
// condition to write into the object's status, and the value that
// condition had before (which is only used in log messages).
type fsStatusUpdate struct {
	flowSchema *flowcontrol.FlowSchema
	condition  flowcontrol.FlowSchemaCondition
	oldValue   flowcontrol.FlowSchemaCondition
}
397
398// digestConfigObjects is given all the API objects that configure
399// cfgCtlr and writes its consequent new configState.
400// Only invoke this in the one and only worker goroutine
401func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.PriorityLevelConfiguration, newFSs []*flowcontrol.FlowSchema, flowSchemaRVs map[string]string) (time.Duration, error) {
402	fsStatusUpdates := cfgCtlr.lockAndDigestConfigObjects(newPLs, newFSs)
403	var errs []error
404	currResult := updateAttempt{
405		timeUpdated:  cfgCtlr.clock.Now(),
406		updatedItems: sets.String{},
407	}
408	var suggestedDelay time.Duration
409	for _, fsu := range fsStatusUpdates {
410		// if we should skip this name, indicate we will need a delay, but continue with other entries
411		if cfgCtlr.shouldDelayUpdate(fsu.flowSchema.Name) {
412			if suggestedDelay == 0 {
413				suggestedDelay = time.Duration(30+rand.Intn(45)) * time.Second
414			}
415			continue
416		}
417
418		// if we are going to issue an update, be sure we track every name we update so we know if we update it too often.
419		currResult.updatedItems.Insert(fsu.flowSchema.Name)
420
421		enc, err := json.Marshal(fsu.condition)
422		if err != nil {
423			// should never happen because these conditions are created here and well formed
424			panic(fmt.Sprintf("Failed to json.Marshall(%#+v): %s", fsu.condition, err.Error()))
425		}
426		klog.V(4).Infof("%s writing Condition %s to FlowSchema %s, which had ResourceVersion=%s, because its previous value was %s", cfgCtlr.name, string(enc), fsu.flowSchema.Name, fsu.flowSchema.ResourceVersion, fcfmt.Fmt(fsu.oldValue))
427		fsIfc := cfgCtlr.flowcontrolClient.FlowSchemas()
428		patchBytes := []byte(fmt.Sprintf(`{"status": {"conditions": [ %s ] } }`, string(enc)))
429		patchOptions := metav1.PatchOptions{FieldManager: cfgCtlr.asFieldManager}
430		patchedFlowSchema, err := fsIfc.Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, patchBytes, patchOptions, "status")
431		if err == nil {
432			key, _ := cache.MetaNamespaceKeyFunc(patchedFlowSchema)
433			flowSchemaRVs[key] = patchedFlowSchema.ResourceVersion
434		} else if apierrors.IsNotFound(err) {
435			// This object has been deleted.  A notification is coming
436			// and nothing more needs to be done here.
437			klog.V(5).Infof("%s at %s: attempted update of concurrently deleted FlowSchema %s; nothing more needs to be done", cfgCtlr.name, cfgCtlr.clock.Now().Format(timeFmt), fsu.flowSchema.Name)
438		} else {
439			errs = append(errs, fmt.Errorf("failed to set a status.condition for FlowSchema %s: %w", fsu.flowSchema.Name, err))
440		}
441	}
442	cfgCtlr.addUpdateResult(currResult)
443
444	return suggestedDelay, utilerrors.NewAggregate(errs)
445}
446
447// shouldDelayUpdate checks to see if a flowschema has been updated too often and returns true if a delay is needed.
448// Only invoke this in the one and only worker goroutine
449func (cfgCtlr *configController) shouldDelayUpdate(flowSchemaName string) bool {
450	numUpdatesInPastMinute := 0
451	oneMinuteAgo := cfgCtlr.clock.Now().Add(-1 * time.Minute)
452	for idx, update := range cfgCtlr.mostRecentUpdates {
453		if oneMinuteAgo.After(update.timeUpdated) {
454			// this and the remaining items are no longer relevant
455			cfgCtlr.mostRecentUpdates = cfgCtlr.mostRecentUpdates[:idx]
456			return false
457		}
458		if update.updatedItems.Has(flowSchemaName) {
459			numUpdatesInPastMinute++
460			if numUpdatesInPastMinute > 5 {
461				return true
462			}
463		}
464	}
465	return false
466}
467
468// addUpdateResult adds the result. It isn't a ring buffer because
469// this is small and rate limited.
470// Only invoke this in the one and only worker goroutine
471func (cfgCtlr *configController) addUpdateResult(result updateAttempt) {
472	cfgCtlr.mostRecentUpdates = append([]updateAttempt{result}, cfgCtlr.mostRecentUpdates...)
473}
474
// lockAndDigestConfigObjects holds the configController lock while it
// replaces the controller's FlowSchemas and priority level states with
// ones digested from the given API objects.
//   - Mandatory priority levels that are still missing after the old
//     levels are processed are imagined from the bootstrap prototypes.
//   - Previously-known levels are handled by processOldPLsLocked.
//
// Returns the buffered FlowSchema status updates.  These are meant to
// be performed after the lock is released (see cfgMeal.fsStatusUpdates).
func (cfgCtlr *configController) lockAndDigestConfigObjects(newPLs []*flowcontrol.PriorityLevelConfiguration, newFSs []*flowcontrol.FlowSchema) []fsStatusUpdate {
	cfgCtlr.lock.Lock()
	defer cfgCtlr.lock.Unlock()
	meal := cfgMeal{
		cfgCtlr:     cfgCtlr,
		newPLStates: make(map[string]*priorityLevelState),
	}

	// The order of these passes matters: FlowSchemas are checked
	// against only the levels digested from API objects, before old
	// levels are held over or mandatory ones imagined.
	meal.digestNewPLsLocked(newPLs)
	meal.digestFlowSchemasLocked(newFSs)
	meal.processOldPLsLocked()

	// Supply missing mandatory PriorityLevelConfiguration objects
	if !meal.haveExemptPL {
		meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationExempt, cfgCtlr.requestWaitLimit)
	}
	if !meal.haveCatchAllPL {
		meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationCatchAll, cfgCtlr.requestWaitLimit)
	}

	// Must come last: it needs the final shareSum over all levels.
	meal.finishQueueSetReconfigsLocked()

	// The new config has been constructed
	cfgCtlr.priorityLevelStates = meal.newPLStates
	klog.V(5).Infof("Switched to new API Priority and Fairness configuration")
	return meal.fsStatusUpdates
}
502
503// Digest the new set of PriorityLevelConfiguration objects.
504// Pretend broken ones do not exist.
505func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfiguration) {
506	for _, pl := range newPLs {
507		state := meal.cfgCtlr.priorityLevelStates[pl.Name]
508		if state == nil {
509			state = &priorityLevelState{obsPair: meal.cfgCtlr.obsPairGenerator.Generate(1, 1, []string{pl.Name})}
510		}
511		qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.obsPair)
512		if err != nil {
513			klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err)
514			continue
515		}
516		meal.newPLStates[pl.Name] = state
517		state.pl = pl
518		state.qsCompleter = qsCompleter
519		if state.quiescing { // it was undesired, but no longer
520			klog.V(3).Infof("Priority level %q was undesired and has become desired again", pl.Name)
521			state.quiescing = false
522		}
523		if state.pl.Spec.Limited != nil {
524			meal.shareSum += float64(state.pl.Spec.Limited.AssuredConcurrencyShares)
525		}
526		meal.haveExemptPL = meal.haveExemptPL || pl.Name == flowcontrol.PriorityLevelConfigurationNameExempt
527		meal.haveCatchAllPL = meal.haveCatchAllPL || pl.Name == flowcontrol.PriorityLevelConfigurationNameCatchAll
528	}
529}
530
531// Digest the given FlowSchema objects.  Ones that reference a missing
532// or broken priority level are not to be passed on to the filter for
533// use.  We do this before holding over old priority levels so that
534// requests stop going to those levels and FlowSchemaStatus values
535// reflect this.  This function also adds any missing mandatory
536// FlowSchema objects.  The given objects must all have distinct
537// names.
538func (meal *cfgMeal) digestFlowSchemasLocked(newFSs []*flowcontrol.FlowSchema) {
539	fsSeq := make(apihelpers.FlowSchemaSequence, 0, len(newFSs))
540	fsMap := make(map[string]*flowcontrol.FlowSchema, len(newFSs))
541	var haveExemptFS, haveCatchAllFS bool
542	for i, fs := range newFSs {
543		otherFS := fsMap[fs.Name]
544		if otherFS != nil {
545			// This client is forbidden to do this.
546			panic(fmt.Sprintf("Given two FlowSchema objects with the same name: %s and %s", fcfmt.Fmt(otherFS), fcfmt.Fmt(fs)))
547		}
548		fsMap[fs.Name] = fs
549		_, goodPriorityRef := meal.newPLStates[fs.Spec.PriorityLevelConfiguration.Name]
550
551		// Ensure the object's status reflects whether its priority
552		// level reference is broken.
553		//
554		// TODO: consider not even trying if server is not handling
555		// requests yet.
556		meal.presyncFlowSchemaStatus(fs, meal.cfgCtlr.foundToDangling(goodPriorityRef), fs.Spec.PriorityLevelConfiguration.Name)
557
558		if !goodPriorityRef {
559			klog.V(6).Infof("Ignoring FlowSchema %s because of bad priority level reference %q", fs.Name, fs.Spec.PriorityLevelConfiguration.Name)
560			continue
561		}
562		fsSeq = append(fsSeq, newFSs[i])
563		haveExemptFS = haveExemptFS || fs.Name == flowcontrol.FlowSchemaNameExempt
564		haveCatchAllFS = haveCatchAllFS || fs.Name == flowcontrol.FlowSchemaNameCatchAll
565	}
566	// sort into the order to be used for matching
567	sort.Sort(fsSeq)
568
569	// Supply missing mandatory FlowSchemas, in correct position
570	if !haveExemptFS {
571		fsSeq = append(apihelpers.FlowSchemaSequence{fcboot.MandatoryFlowSchemaExempt}, fsSeq...)
572	}
573	if !haveCatchAllFS {
574		fsSeq = append(fsSeq, fcboot.MandatoryFlowSchemaCatchAll)
575	}
576
577	meal.cfgCtlr.flowSchemas = fsSeq
578	if klog.V(5).Enabled() {
579		for _, fs := range fsSeq {
580			klog.Infof("Using FlowSchema %s", fcfmt.Fmt(fs))
581		}
582	}
583}
584
// Consider all the priority levels in the previous configuration.
// Keep the ones that are in the new config, supply mandatory
// behavior, or are still busy.  For the rest: drop it if it has no
// queues, or if its queues are idle and nothing is pending; otherwise
// start the quiescing process if that has not already been started.
//
// Each level kept by this pass gets a fresh QueueSetCompleter, adds its
// shares to meal.shareSum, and is entered into meal.newPLStates.
func (meal *cfgMeal) processOldPLsLocked() {
	for plName, plState := range meal.cfgCtlr.priorityLevelStates {
		if meal.newPLStates[plName] != nil {
			// Still desired and already updated
			continue
		}
		// && binds tighter than ||: this tests whether plName is a
		// mandatory level that no API object supplied this time.
		if plName == flowcontrol.PriorityLevelConfigurationNameExempt && !meal.haveExemptPL || plName == flowcontrol.PriorityLevelConfigurationNameCatchAll && !meal.haveCatchAllPL {
			// BTW, we know the Spec has not changed because the
			// mandatory objects have immutable Specs
			klog.V(3).Infof("Retaining mandatory priority level %q despite lack of API object", plName)
		} else {
			if plState.queues == nil || plState.numPending == 0 && plState.queues.IsIdle() {
				// Either there are no queues or they are done
				// draining and no use is coming from another
				// goroutine
				klog.V(3).Infof("Removing undesired priority level %q (nilQueues=%v), Type=%v", plName, plState.queues == nil, plState.pl.Spec.Type)
				continue
			}
			if !plState.quiescing {
				klog.V(3).Infof("Priority level %q became undesired", plName)
				plState.quiescing = true
			}
		}
		var err error
		plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.obsPair)
		if err != nil {
			// This can not happen because queueSetCompleterForPL already approved this config
			panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec)))
		}
		if plState.pl.Spec.Limited != nil {
			// We deliberately include the lingering priority levels
			// here so that their queues get some concurrency and they
			// continue to drain.  During this interim a lingering
			// priority level continues to get a concurrency
			// allocation determined by all the share values in the
			// regular way.
			meal.shareSum += float64(plState.pl.Spec.Limited.AssuredConcurrencyShares)
		}
		meal.haveExemptPL = meal.haveExemptPL || plName == flowcontrol.PriorityLevelConfigurationNameExempt
		meal.haveCatchAllPL = meal.haveCatchAllPL || plName == flowcontrol.PriorityLevelConfigurationNameCatchAll
		meal.newPLStates[plName] = plState
	}
}
633
634// For all the priority levels of the new config, divide up the
635// server's total concurrency limit among them and create/update their
636// QueueSets.
637func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
638	for plName, plState := range meal.newPLStates {
639		if plState.pl.Spec.Limited == nil {
640			klog.V(5).Infof("Using exempt priority level %q: quiescing=%v", plName, plState.quiescing)
641			continue
642		}
643
644		// The use of math.Ceil here means that the results might sum
645		// to a little more than serverConcurrencyLimit but the
646		// difference will be negligible.
647		concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(plState.pl.Spec.Limited.AssuredConcurrencyShares) / meal.shareSum))
648		metrics.UpdateSharedConcurrencyLimit(plName, concurrencyLimit)
649
650		if plState.queues == nil {
651			klog.V(5).Infof("Introducing queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, plState.quiescing, plState.pl.Spec.Limited.AssuredConcurrencyShares, meal.shareSum)
652		} else {
653			klog.V(5).Infof("Retaining queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v, numPending=%d (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, plState.quiescing, plState.numPending, plState.pl.Spec.Limited.AssuredConcurrencyShares, meal.shareSum)
654		}
655		plState.queues = plState.qsCompleter.Complete(fq.DispatchingConfig{ConcurrencyLimit: concurrencyLimit})
656	}
657}
658
659// queueSetCompleterForPL returns an appropriate QueueSetCompleter for the
660// given priority level configuration.  Returns nil if that config
661// does not call for limiting.  Returns nil and an error if the given
662// object is malformed in a way that is a problem for this package.
663func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, intPair metrics.TimedObserverPair) (fq.QueueSetCompleter, error) {
664	if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) {
665		return nil, errors.New("broken union structure at the top")
666	}
667	if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Name == flowcontrol.PriorityLevelConfigurationNameExempt) {
668		// This package does not attempt to cope with a priority level dynamically switching between exempt and not.
669		return nil, errors.New("non-alignment between name and type")
670	}
671	if pl.Spec.Limited == nil {
672		return nil, nil
673	}
674	if (pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeReject) != (pl.Spec.Limited.LimitResponse.Queuing == nil) {
675		return nil, errors.New("broken union structure for limit response")
676	}
677	qcAPI := pl.Spec.Limited.LimitResponse.Queuing
678	qcQS := fq.QueuingConfig{Name: pl.Name}
679	if qcAPI != nil {
680		qcQS = fq.QueuingConfig{Name: pl.Name,
681			DesiredNumQueues: int(qcAPI.Queues),
682			QueueLengthLimit: int(qcAPI.QueueLengthLimit),
683			HandSize:         int(qcAPI.HandSize),
684			RequestWaitLimit: requestWaitLimit,
685		}
686	}
687	var qsc fq.QueueSetCompleter
688	var err error
689	if queues != nil {
690		qsc, err = queues.BeginConfigChange(qcQS)
691	} else {
692		qsc, err = qsf.BeginConstruction(qcQS, intPair)
693	}
694	if err != nil {
695		err = fmt.Errorf("priority level %q has QueuingConfiguration %#+v, which is invalid: %w", pl.Name, qcAPI, err)
696	}
697	return qsc, err
698}
699
700func (meal *cfgMeal) presyncFlowSchemaStatus(fs *flowcontrol.FlowSchema, isDangling bool, plName string) {
701	danglingCondition := apihelpers.GetFlowSchemaConditionByType(fs, flowcontrol.FlowSchemaConditionDangling)
702	if danglingCondition == nil {
703		danglingCondition = &flowcontrol.FlowSchemaCondition{
704			Type: flowcontrol.FlowSchemaConditionDangling,
705		}
706	}
707	desiredStatus := flowcontrol.ConditionFalse
708	var desiredReason, desiredMessage string
709	if isDangling {
710		desiredStatus = flowcontrol.ConditionTrue
711		desiredReason = "NotFound"
712		desiredMessage = fmt.Sprintf("This FlowSchema references the PriorityLevelConfiguration object named %q but there is no such object", plName)
713	} else {
714		desiredReason = "Found"
715		desiredMessage = fmt.Sprintf("This FlowSchema references the PriorityLevelConfiguration object named %q and it exists", plName)
716	}
717	if danglingCondition.Status == desiredStatus && danglingCondition.Reason == desiredReason && danglingCondition.Message == desiredMessage {
718		return
719	}
720	now := meal.cfgCtlr.clock.Now()
721	meal.fsStatusUpdates = append(meal.fsStatusUpdates, fsStatusUpdate{
722		flowSchema: fs,
723		condition: flowcontrol.FlowSchemaCondition{
724			Type:               flowcontrol.FlowSchemaConditionDangling,
725			Status:             desiredStatus,
726			LastTransitionTime: metav1.NewTime(now),
727			Reason:             desiredReason,
728			Message:            desiredMessage,
729		},
730		oldValue: *danglingCondition})
731}
732
733// imaginePL adds a priority level based on one of the mandatory ones
734// that does not actually exist (right now) as a real API object.
735func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration) {
736	klog.V(3).Infof("No %s PriorityLevelConfiguration found, imagining one", proto.Name)
737	obsPair := meal.cfgCtlr.obsPairGenerator.Generate(1, 1, []string{proto.Name})
738	qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, obsPair)
739	if err != nil {
740		// This can not happen because proto is one of the mandatory
741		// objects and these are not erroneous
742		panic(err)
743	}
744	meal.newPLStates[proto.Name] = &priorityLevelState{
745		pl:          proto,
746		qsCompleter: qsCompleter,
747		obsPair:     obsPair,
748	}
749	if proto.Spec.Limited != nil {
750		meal.shareSum += float64(proto.Spec.Limited.AssuredConcurrencyShares)
751	}
752}
753
// immediateRequest is the fq.Request that startRequest returns for
// requests in an exempt priority level. No queuing happens for such
// requests, so finishing one simply means running the work.
type immediateRequest struct{}

// Finish calls execute synchronously, exactly once, and always reports
// false.
func (r immediateRequest) Finish(execute func()) bool {
	execute()
	// Always false: this request never went through a QueueSet.
	return false
}
760
761// startRequest classifies and, if appropriate, enqueues the request.
762// Returns a nil Request if and only if the request is to be rejected.
763// The returned bool indicates whether the request is exempt from
764// limitation.  The startWaitingTime is when the request started
765// waiting in its queue, or `Time{}` if this did not happen.
766func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) {
767	klog.V(7).Infof("startRequest(%#+v)", rd)
768	cfgCtlr.lock.Lock()
769	defer cfgCtlr.lock.Unlock()
770	var selectedFlowSchema, catchAllFlowSchema *flowcontrol.FlowSchema
771	for _, fs := range cfgCtlr.flowSchemas {
772		if matchesFlowSchema(rd, fs) {
773			selectedFlowSchema = fs
774			break
775		}
776		if fs.Name == flowcontrol.FlowSchemaNameCatchAll {
777			catchAllFlowSchema = fs
778		}
779	}
780	if selectedFlowSchema == nil {
781		// This should never happen. If the requestDigest's User is a part of
782		// system:authenticated or system:unauthenticated, the catch-all flow
783		// schema should match it. However, if that invariant somehow fails,
784		// fallback to the catch-all flow schema anyway.
785		if catchAllFlowSchema == nil {
786			// This should absolutely never, ever happen! APF guarantees two
787			// undeletable flow schemas at all times: an exempt flow schema and a
788			// catch-all flow schema.
789			panic(fmt.Sprintf("no fallback catch-all flow schema found for request %#+v and user %#+v", rd.RequestInfo, rd.User))
790		}
791		selectedFlowSchema = catchAllFlowSchema
792		klog.Warningf("no match found for request %#+v and user %#+v; selecting catchAll=%s as fallback flow schema", rd.RequestInfo, rd.User, fcfmt.Fmt(selectedFlowSchema))
793	}
794	plName := selectedFlowSchema.Spec.PriorityLevelConfiguration.Name
795	plState := cfgCtlr.priorityLevelStates[plName]
796	if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt {
797		klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName)
798		return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{}
799	}
800	var numQueues int32
801	if plState.pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeQueue {
802		numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues
803	}
804	var flowDistinguisher string
805	var hashValue uint64
806	if numQueues > 1 {
807		flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod)
808		hashValue = hashFlowID(selectedFlowSchema.Name, flowDistinguisher)
809	}
810	startWaitingTime = time.Now()
811	klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues)
812	req, idle := plState.queues.StartRequest(ctx, &rd.Width, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
813	if idle {
814		cfgCtlr.maybeReapLocked(plName, plState)
815	}
816	return selectedFlowSchema, plState.pl, false, req, startWaitingTime
817}
818
819// maybeReap will remove the last internal traces of the named
820// priority level if it has no more use.  Call this after getting a
821// clue that the given priority level is undesired and idle.
822func (cfgCtlr *configController) maybeReap(plName string) {
823	cfgCtlr.lock.Lock()
824	defer cfgCtlr.lock.Unlock()
825	plState := cfgCtlr.priorityLevelStates[plName]
826	if plState == nil {
827		klog.V(7).Infof("plName=%s, plState==nil", plName)
828		return
829	}
830	if plState.queues != nil {
831		useless := plState.quiescing && plState.numPending == 0 && plState.queues.IsIdle()
832		klog.V(7).Infof("plState.quiescing=%v, plState.numPending=%d, useless=%v", plState.quiescing, plState.numPending, useless)
833		if !useless {
834			return
835		}
836	}
837	klog.V(3).Infof("Triggered API priority and fairness config reloading because priority level %s is undesired and idle", plName)
838	cfgCtlr.configQueue.Add(0)
839}
840
841// maybeReapLocked requires the cfgCtlr's lock to already be held and
842// will remove the last internal traces of the named priority level if
843// it has no more use.  Call this if both (1) plState.queues is
844// non-nil and reported being idle, and (2) cfgCtlr's lock has not
845// been released since then.
846func (cfgCtlr *configController) maybeReapLocked(plName string, plState *priorityLevelState) {
847	if !(plState.quiescing && plState.numPending == 0) {
848		return
849	}
850	klog.V(3).Infof("Triggered API priority and fairness config reloading because priority level %s is undesired and idle", plName)
851	cfgCtlr.configQueue.Add(0)
852}
853
854// computeFlowDistinguisher extracts the flow distinguisher according to the given method
855func computeFlowDistinguisher(rd RequestDigest, method *flowcontrol.FlowDistinguisherMethod) string {
856	if method == nil {
857		return ""
858	}
859	switch method.Type {
860	case flowcontrol.FlowDistinguisherMethodByUserType:
861		return rd.User.GetName()
862	case flowcontrol.FlowDistinguisherMethodByNamespaceType:
863		return rd.RequestInfo.Namespace
864	default:
865		// this line shall never reach
866		panic("invalid flow-distinguisher method")
867	}
868}
869
// hashFlowID maps a (FlowSchema name, flow distinguisher) pair to a
// uint64. It computes SHA-256 over fsName, a single zero byte, and
// fDistinguisher, then reads the first 8 bytes of the digest as a
// little-endian integer. The zero-byte separator keeps pairs such as
// ("ab", "c") and ("a", "bc") from feeding identical bytes to the hash.
func hashFlowID(fsName, fDistinguisher string) uint64 {
	h := sha256.New()
	// hash.Hash.Write never returns an error, so the results are ignored.
	h.Write([]byte(fsName))
	h.Write([]byte{0})
	h.Write([]byte(fDistinguisher))
	var digest [sha256.Size]byte
	return binary.LittleEndian.Uint64(h.Sum(digest[:0]))
}
880