17package flowcontrol
19import (
20	"context"
21	"crypto/sha256"
22	"encoding/binary"
23	"encoding/json"
24	"errors"
25	"fmt"
26	"math"
27	"math/rand"
28	"sort"
29	"sync"
30	"time"
32	apiequality "k8s.io/apimachinery/pkg/api/equality"
33	apierrors "k8s.io/apimachinery/pkg/api/errors"
34	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35	"k8s.io/apimachinery/pkg/labels"
36	apitypes "k8s.io/apimachinery/pkg/types"
37	"k8s.io/apimachinery/pkg/util/clock"
38	utilerrors "k8s.io/apimachinery/pkg/util/errors"
39	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
40	"k8s.io/apimachinery/pkg/util/sets"
41	"k8s.io/apimachinery/pkg/util/wait"
42	fcboot "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
43	"k8s.io/apiserver/pkg/authentication/user"
44	"k8s.io/apiserver/pkg/endpoints/request"
45	"k8s.io/apiserver/pkg/util/apihelpers"
46	fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
47	fcfmt "k8s.io/apiserver/pkg/util/flowcontrol/format"
48	"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
49	fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
50	"k8s.io/client-go/tools/cache"
51	"k8s.io/client-go/util/workqueue"
52	"k8s.io/klog/v2"
54	flowcontrol "k8s.io/api/flowcontrol/v1beta1"
55	flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta1"
56	flowcontrollister "k8s.io/client-go/listers/flowcontrol/v1beta1"
// timeFmt is the layout this controller uses when putting timestamps
// into its log messages: date and time down to the second, followed by
// up to three fractional digits (trailing zeros are dropped).
const (
	timeFmt = "2006-01-02T15:04:05.999"
)
61// This file contains a simple local (to the apiserver) controller
62// that digests API Priority and Fairness config objects (FlowSchema
63// and PriorityLevelConfiguration) into the data structure that the
64// filter uses.  At this first level of development this controller
65// takes the simplest possible approach: whenever notified of any
66// change to any config object, or when any priority level that is
67// undesired becomes completely unused, all the config objects are
68// read and processed as a whole.
// StartFunction begins the process of handling a request.  If the
// request gets queued then this function uses the given hashValue as
// the source of entropy as it shuffle-shards the request into a
// queue.  This method does not return until the queuing, if any, for
// this request is done.  If `execute` is false then `afterExecution`
// is irrelevant and the request should be rejected.  Otherwise the
// request should be executed and `afterExecution` must be called
// exactly once.
type StartFunction func(ctx context.Context, hashValue uint64) (execute bool, afterExecution func())
// RequestDigest holds necessary info from request for flow-control
type RequestDigest struct {
	// RequestInfo describes the request, as produced by the
	// k8s.io/apiserver/pkg/endpoints/request package.
	RequestInfo *request.RequestInfo
	// User is the authenticated identity making the request.
	User user.Info
	// Width is the request's width from the flowcontrol request
	// package.  NOTE(review): presumably the amount of concurrency the
	// request occupies; confirm against fcrequest.Width.
	Width fcrequest.Width
}
// `*configController` maintains eventual consistency with the API
// objects that configure API Priority and Fairness, and provides a
// procedural interface to the configured behavior.  The methods of
// this type and cfgMeal follow the convention that the suffix
// "Locked" means that the caller must hold the configController lock.
type configController struct {
	// Identity (used in log messages), time source, and the factories
	// used to build each priority level's QueueSet and metrics observers.
	name             string // varies in tests of fighting controllers
	clock            clock.PassiveClock
	queueSetFactory  fq.QueueSetFactory
	obsPairGenerator metrics.TimedObserverPairGenerator

	// How this controller appears in an ObjectMeta ManagedFieldsEntry.Manager
	asFieldManager string

	// Given a boolean indicating whether a FlowSchema's referenced
	// PriorityLevelConfig exists, return a boolean indicating whether
	// the reference is dangling
	foundToDangling func(bool) bool

	// configQueue holds `(interface{})(0)` when the configuration
	// objects need to be reprocessed.
	configQueue workqueue.RateLimitingInterface

	// Lister and sync check for the PriorityLevelConfiguration informer.
	plLister         flowcontrollister.PriorityLevelConfigurationLister
	plInformerSynced cache.InformerSynced

	// Lister and sync check for the FlowSchema informer.
	fsLister         flowcontrollister.FlowSchemaLister
	fsInformerSynced cache.InformerSynced

	// flowcontrolClient is used to patch the status of FlowSchema objects.
	flowcontrolClient flowcontrolclient.FlowcontrolV1beta1Interface

	// serverConcurrencyLimit is the limit on the server's total
	// number of non-exempt requests being served at once.  This comes
	// from server configuration.
	serverConcurrencyLimit int

	// requestWaitLimit comes from server configuration.
	requestWaitLimit time.Duration

	// This must be locked while accessing flowSchemas or
	// priorityLevelStates.  It is the lock involved in
	// LockingWriteMultiple.
	lock sync.Mutex

	// flowSchemas holds the flow schema objects, sorted by increasing
	// numerical (decreasing logical) matching precedence.  Every
	// FlowSchema in this slice is immutable.
	flowSchemas apihelpers.FlowSchemaSequence

	// priorityLevelStates maps the PriorityLevelConfiguration object
	// name to the state for that level.  Every name referenced from a
	// member of `flowSchemas` has an entry here.
	priorityLevelStates map[string]*priorityLevelState

	// the most recent update attempts, ordered by increasing age.
	// Consumer trims to keep only the last minute's worth of entries.
	// The controller uses this to limit itself to at most six updates
	// to a given FlowSchema in any minute.
	// This may only be accessed from the one and only worker goroutine.
	mostRecentUpdates []updateAttempt

	// WatchTracker implements the necessary WatchTracker interface.
	WatchTracker
}
// updateAttempt records which FlowSchemas had a status update attempted
// during one pass of digestConfigObjects, and the time that pass began.
// A name is recorded whether or not the attempted patch succeeded.
type updateAttempt struct {
	timeUpdated  time.Time
	updatedItems sets.String // FlowSchema names
}
// priorityLevelState holds the state specific to a priority level.
type priorityLevelState struct {
	// the API object or prototype prescribing this level.  Nothing
	// reached through this pointer is mutable.
	pl *flowcontrol.PriorityLevelConfiguration

	// qsCompleter holds the QueueSetCompleter derived from `pl` and
	// `queues` if `pl` is not exempt, nil otherwise.
	qsCompleter fq.QueueSetCompleter

	// The QueueSet for this priority level.  This is nil if and only
	// if the priority level is exempt.
	queues fq.QueueSet

	// quiescing==true indicates that this priority level should be
	// removed when its queues have all drained.  May be true only if
	// queues is non-nil.
	quiescing bool

	// number of goroutines between Controller::Match and calling the
	// returned StartFunction
	numPending int

	// Observers tracking number waiting, executing
	obsPair metrics.TimedObserverPair
}
185// NewTestableController is extra flexible to facilitate testing
186func newTestableController(config TestableConfig) *configController {
187	cfgCtlr := &configController{
188		name:                   config.Name,
189		clock:                  config.Clock,
190		queueSetFactory:        config.QueueSetFactory,
191		obsPairGenerator:       config.ObsPairGenerator,
192		asFieldManager:         config.AsFieldManager,
193		foundToDangling:        config.FoundToDangling,
194		serverConcurrencyLimit: config.ServerConcurrencyLimit,
195		requestWaitLimit:       config.RequestWaitLimit,
196		flowcontrolClient:      config.FlowcontrolClient,
197		priorityLevelStates:    make(map[string]*priorityLevelState),
198		WatchTracker:           NewWatchTracker(),
199	}
200	klog.V(2).Infof("NewTestableController %q with serverConcurrencyLimit=%d, requestWaitLimit=%s, name=%s, asFieldManager=%q", cfgCtlr.name, cfgCtlr.serverConcurrencyLimit, cfgCtlr.requestWaitLimit, cfgCtlr.name, cfgCtlr.asFieldManager)
201	// Start with longish delay because conflicts will be between
202	// different processes, so take some time to go away.
203	cfgCtlr.configQueue = workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 8*time.Hour), "priority_and_fairness_config_queue")
204	// ensure the data structure reflects the mandatory config
205	cfgCtlr.lockAndDigestConfigObjects(nil, nil)
206	fci := config.InformerFactory.Flowcontrol().V1beta1()
207	pli := fci.PriorityLevelConfigurations()
208	fsi := fci.FlowSchemas()
209	cfgCtlr.plLister = pli.Lister()
210	cfgCtlr.plInformerSynced = pli.Informer().HasSynced
211	cfgCtlr.fsLister = fsi.Lister()
212	cfgCtlr.fsInformerSynced = fsi.Informer().HasSynced
213	pli.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
214		AddFunc: func(obj interface{}) {
215			pl := obj.(*flowcontrol.PriorityLevelConfiguration)
216			klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to creation of PLC %s", cfgCtlr.name, pl.Name)
217			cfgCtlr.configQueue.Add(0)
218		},
219		UpdateFunc: func(oldObj, newObj interface{}) {
220			newPL := newObj.(*flowcontrol.PriorityLevelConfiguration)
221			oldPL := oldObj.(*flowcontrol.PriorityLevelConfiguration)
222			if !apiequality.Semantic.DeepEqual(oldPL.Spec, newPL.Spec) {
223				klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to spec update of PLC %s", cfgCtlr.name, newPL.Name)
224				cfgCtlr.configQueue.Add(0)
225			} else {
226				klog.V(7).Infof("No trigger API priority and fairness config reloading in %s due to spec non-change of PLC %s", cfgCtlr.name, newPL.Name)
227			}
228		},
229		DeleteFunc: func(obj interface{}) {
230			name, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
231			klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to deletion of PLC %s", cfgCtlr.name, name)
232			cfgCtlr.configQueue.Add(0)
234		}})
235	fsi.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
236		AddFunc: func(obj interface{}) {
237			fs := obj.(*flowcontrol.FlowSchema)
238			klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to creation of FS %s", cfgCtlr.name, fs.Name)
239			cfgCtlr.configQueue.Add(0)
240		},
241		UpdateFunc: func(oldObj, newObj interface{}) {
242			newFS := newObj.(*flowcontrol.FlowSchema)
243			oldFS := oldObj.(*flowcontrol.FlowSchema)
244			// Changes to either Spec or Status are relevant.  The
245			// concern is that we might, in some future release, want
246			// different behavior than is implemented now. One of the
247			// hardest questions is how does an operator roll out the
248			// new release in a cluster with multiple kube-apiservers
249			// --- in a way that works no matter what servers crash
250			// and restart when. If this handler reacts only to
251			// changes in Spec then we have a scenario in which the
252			// rollout leaves the old Status in place. The scenario
253			// ends with this subsequence: deploy the last new server
254			// before deleting the last old server, and in between
255			// those two operations the last old server crashes and
256			// recovers. The chosen solution is making this controller
257			// insist on maintaining the particular state that it
258			// establishes.
259			if !(apiequality.Semantic.DeepEqual(oldFS.Spec, newFS.Spec) &&
260				apiequality.Semantic.DeepEqual(oldFS.Status, newFS.Status)) {
261				klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to spec and/or status update of FS %s", cfgCtlr.name, newFS.Name)
262				cfgCtlr.configQueue.Add(0)
263			} else {
264				klog.V(7).Infof("No trigger of API priority and fairness config reloading in %s due to spec and status non-change of FS %s", cfgCtlr.name, newFS.Name)
265			}
266		},
267		DeleteFunc: func(obj interface{}) {
268			name, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
269			klog.V(7).Infof("Triggered API priority and fairness config reloading in %s due to deletion of FS %s", cfgCtlr.name, name)
270			cfgCtlr.configQueue.Add(0)
272		}})
273	return cfgCtlr
276// MaintainObservations keeps the observers from
277// metrics.PriorityLevelConcurrencyObserverPairGenerator from falling
278// too far behind
279func (cfgCtlr *configController) MaintainObservations(stopCh <-chan struct{}) {
280	wait.Until(cfgCtlr.updateObservations, 10*time.Second, stopCh)
283func (cfgCtlr *configController) updateObservations() {
284	cfgCtlr.lock.Lock()
285	defer cfgCtlr.lock.Unlock()
286	for _, plc := range cfgCtlr.priorityLevelStates {
287		if plc.queues != nil {
288			plc.queues.UpdateObservations()
289		}
290	}
293func (cfgCtlr *configController) Run(stopCh <-chan struct{}) error {
294	defer utilruntime.HandleCrash()
296	// Let the config worker stop when we are done
297	defer cfgCtlr.configQueue.ShutDown()
299	klog.Info("Starting API Priority and Fairness config controller")
300	if ok := cache.WaitForCacheSync(stopCh, cfgCtlr.plInformerSynced, cfgCtlr.fsInformerSynced); !ok {
301		return fmt.Errorf("Never achieved initial sync")
302	}
304	klog.Info("Running API Priority and Fairness config worker")
305	go wait.Until(cfgCtlr.runWorker, time.Second, stopCh)
307	<-stopCh
308	klog.Info("Shutting down API Priority and Fairness config worker")
309	return nil
312// runWorker is the logic of the one and only worker goroutine.  We
313// limit the number to one in order to obviate explicit
314// synchronization around access to `cfgCtlr.mostRecentUpdates`.
315func (cfgCtlr *configController) runWorker() {
316	for cfgCtlr.processNextWorkItem() {
317	}
320// processNextWorkItem works on one entry from the work queue.
321// Only invoke this in the one and only worker goroutine.
322func (cfgCtlr *configController) processNextWorkItem() bool {
323	obj, shutdown := cfgCtlr.configQueue.Get()
324	if shutdown {
325		return false
326	}
328	func(obj interface{}) {
329		defer cfgCtlr.configQueue.Done(obj)
330		specificDelay, err := cfgCtlr.syncOne(map[string]string{})
331		switch {
332		case err != nil:
333			klog.Error(err)
334			cfgCtlr.configQueue.AddRateLimited(obj)
335		case specificDelay > 0:
336			cfgCtlr.configQueue.AddAfter(obj, specificDelay)
337		default:
338			cfgCtlr.configQueue.Forget(obj)
339		}
340	}(obj)
342	return true
345// syncOne does one full synchronization.  It reads all the API
346// objects that configure API Priority and Fairness and updates the
347// local configController accordingly.
348// Only invoke this in the one and only worker goroutine
349func (cfgCtlr *configController) syncOne(flowSchemaRVs map[string]string) (specificDelay time.Duration, err error) {
350	klog.V(5).Infof("%s syncOne at %s", cfgCtlr.name, cfgCtlr.clock.Now().Format(timeFmt))
351	all := labels.Everything()
352	newPLs, err := cfgCtlr.plLister.List(all)
353	if err != nil {
354		return 0, fmt.Errorf("unable to list PriorityLevelConfiguration objects: %w", err)
355	}
356	newFSs, err := cfgCtlr.fsLister.List(all)
357	if err != nil {
358		return 0, fmt.Errorf("unable to list FlowSchema objects: %w", err)
359	}
360	return cfgCtlr.digestConfigObjects(newPLs, newFSs, flowSchemaRVs)
// cfgMeal is the data involved in the process of digesting the API
// objects that configure API Priority and Fairness.  All the config
// objects are digested together, because this is the simplest way to
// cope with the various dependencies between objects.  The process of
// digestion is done in four passes over config objects --- three
// passes over PriorityLevelConfigurations and one pass over the
// FlowSchemas --- with the work divided among the passes according to
// those dependencies.
type cfgMeal struct {
	// cfgCtlr is the controller whose configuration is being digested.
	cfgCtlr *configController

	// newPLStates maps priority level name to that level's state in the
	// new configuration being built.
	newPLStates map[string]*priorityLevelState

	// The sum of the concurrency shares of the priority levels in the
	// new configuration
	shareSum float64

	// These keep track of which mandatory priority level config
	// objects have been digested
	haveExemptPL, haveCatchAllPL bool

	// Buffered FlowSchema status updates to do.  Do them when the
	// lock is not held, to avoid a deadlock due to such a request
	// provoking a call into this controller while the lock is held
	// waiting on that request to complete.
	fsStatusUpdates []fsStatusUpdate
}
// fsStatusUpdate is one buffered status update for a FlowSchema.
type fsStatusUpdate struct {
	// flowSchema is the object as it was seen when the update was decided.
	flowSchema *flowcontrol.FlowSchema
	// condition is the Dangling condition to write.
	condition flowcontrol.FlowSchemaCondition
	// oldValue is the Dangling condition the object had before.  When it
	// had none, this holds a condition with only Type set.  It is used
	// for logging.
	oldValue flowcontrol.FlowSchemaCondition
}
398// digestConfigObjects is given all the API objects that configure
399// cfgCtlr and writes its consequent new configState.
400// Only invoke this in the one and only worker goroutine
401func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.PriorityLevelConfiguration, newFSs []*flowcontrol.FlowSchema, flowSchemaRVs map[string]string) (time.Duration, error) {
402	fsStatusUpdates := cfgCtlr.lockAndDigestConfigObjects(newPLs, newFSs)
403	var errs []error
404	currResult := updateAttempt{
405		timeUpdated:  cfgCtlr.clock.Now(),
406		updatedItems: sets.String{},
407	}
408	var suggestedDelay time.Duration
409	for _, fsu := range fsStatusUpdates {
410		// if we should skip this name, indicate we will need a delay, but continue with other entries
411		if cfgCtlr.shouldDelayUpdate(fsu.flowSchema.Name) {
412			if suggestedDelay == 0 {
413				suggestedDelay = time.Duration(30+rand.Intn(45)) * time.Second
414			}
415			continue
416		}
418		// if we are going to issue an update, be sure we track every name we update so we know if we update it too often.
419		currResult.updatedItems.Insert(fsu.flowSchema.Name)
421		enc, err := json.Marshal(fsu.condition)
422		if err != nil {
423			// should never happen because these conditions are created here and well formed
424			panic(fmt.Sprintf("Failed to json.Marshall(%#+v): %s", fsu.condition, err.Error()))
425		}
426		klog.V(4).Infof("%s writing Condition %s to FlowSchema %s, which had ResourceVersion=%s, because its previous value was %s", cfgCtlr.name, string(enc), fsu.flowSchema.Name, fsu.flowSchema.ResourceVersion, fcfmt.Fmt(fsu.oldValue))
427		fsIfc := cfgCtlr.flowcontrolClient.FlowSchemas()
428		patchBytes := []byte(fmt.Sprintf(`{"status": {"conditions": [ %s ] } }`, string(enc)))
429		patchOptions := metav1.PatchOptions{FieldManager: cfgCtlr.asFieldManager}
430		patchedFlowSchema, err := fsIfc.Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, patchBytes, patchOptions, "status")
431		if err == nil {
432			key, _ := cache.MetaNamespaceKeyFunc(patchedFlowSchema)
433			flowSchemaRVs[key] = patchedFlowSchema.ResourceVersion
434		} else if apierrors.IsNotFound(err) {
435			// This object has been deleted.  A notification is coming
436			// and nothing more needs to be done here.
437			klog.V(5).Infof("%s at %s: attempted update of concurrently deleted FlowSchema %s; nothing more needs to be done", cfgCtlr.name, cfgCtlr.clock.Now().Format(timeFmt), fsu.flowSchema.Name)
438		} else {
439			errs = append(errs, fmt.Errorf("failed to set a status.condition for FlowSchema %s: %w", fsu.flowSchema.Name, err))
440		}
441	}
442	cfgCtlr.addUpdateResult(currResult)
444	return suggestedDelay, utilerrors.NewAggregate(errs)
447// shouldDelayUpdate checks to see if a flowschema has been updated too often and returns true if a delay is needed.
448// Only invoke this in the one and only worker goroutine
449func (cfgCtlr *configController) shouldDelayUpdate(flowSchemaName string) bool {
450	numUpdatesInPastMinute := 0
451	oneMinuteAgo := cfgCtlr.clock.Now().Add(-1 * time.Minute)
452	for idx, update := range cfgCtlr.mostRecentUpdates {
453		if oneMinuteAgo.After(update.timeUpdated) {
454			// this and the remaining items are no longer relevant
455			cfgCtlr.mostRecentUpdates = cfgCtlr.mostRecentUpdates[:idx]
456			return false
457		}
458		if update.updatedItems.Has(flowSchemaName) {
459			numUpdatesInPastMinute++
460			if numUpdatesInPastMinute > 5 {
461				return true
462			}
463		}
464	}
465	return false
468// addUpdateResult adds the result. It isn't a ring buffer because
469// this is small and rate limited.
470// Only invoke this in the one and only worker goroutine
471func (cfgCtlr *configController) addUpdateResult(result updateAttempt) {
472	cfgCtlr.mostRecentUpdates = append([]updateAttempt{result}, cfgCtlr.mostRecentUpdates...)
475func (cfgCtlr *configController) lockAndDigestConfigObjects(newPLs []*flowcontrol.PriorityLevelConfiguration, newFSs []*flowcontrol.FlowSchema) []fsStatusUpdate {
476	cfgCtlr.lock.Lock()
477	defer cfgCtlr.lock.Unlock()
478	meal := cfgMeal{
479		cfgCtlr:     cfgCtlr,
480		newPLStates: make(map[string]*priorityLevelState),
481	}
483	meal.digestNewPLsLocked(newPLs)
484	meal.digestFlowSchemasLocked(newFSs)
485	meal.processOldPLsLocked()
487	// Supply missing mandatory PriorityLevelConfiguration objects
488	if !meal.haveExemptPL {
489		meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationExempt, cfgCtlr.requestWaitLimit)
490	}
491	if !meal.haveCatchAllPL {
492		meal.imaginePL(fcboot.MandatoryPriorityLevelConfigurationCatchAll, cfgCtlr.requestWaitLimit)
493	}
495	meal.finishQueueSetReconfigsLocked()
497	// The new config has been constructed
498	cfgCtlr.priorityLevelStates = meal.newPLStates
499	klog.V(5).Infof("Switched to new API Priority and Fairness configuration")
500	return meal.fsStatusUpdates
503// Digest the new set of PriorityLevelConfiguration objects.
504// Pretend broken ones do not exist.
505func (meal *cfgMeal) digestNewPLsLocked(newPLs []*flowcontrol.PriorityLevelConfiguration) {
506	for _, pl := range newPLs {
507		state := meal.cfgCtlr.priorityLevelStates[pl.Name]
508		if state == nil {
509			state = &priorityLevelState{obsPair: meal.cfgCtlr.obsPairGenerator.Generate(1, 1, []string{pl.Name})}
510		}
511		qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, state.queues, pl, meal.cfgCtlr.requestWaitLimit, state.obsPair)
512		if err != nil {
513			klog.Warningf("Ignoring PriorityLevelConfiguration object %s because its spec (%s) is broken: %s", pl.Name, fcfmt.Fmt(pl.Spec), err)
514			continue
515		}
516		meal.newPLStates[pl.Name] = state
517		state.pl = pl
518		state.qsCompleter = qsCompleter
519		if state.quiescing { // it was undesired, but no longer
520			klog.V(3).Infof("Priority level %q was undesired and has become desired again", pl.Name)
521			state.quiescing = false
522		}
523		if state.pl.Spec.Limited != nil {
524			meal.shareSum += float64(state.pl.Spec.Limited.AssuredConcurrencyShares)
525		}
526		meal.haveExemptPL = meal.haveExemptPL || pl.Name == flowcontrol.PriorityLevelConfigurationNameExempt
527		meal.haveCatchAllPL = meal.haveCatchAllPL || pl.Name == flowcontrol.PriorityLevelConfigurationNameCatchAll
528	}
531// Digest the given FlowSchema objects.  Ones that reference a missing
532// or broken priority level are not to be passed on to the filter for
533// use.  We do this before holding over old priority levels so that
534// requests stop going to those levels and FlowSchemaStatus values
535// reflect this.  This function also adds any missing mandatory
536// FlowSchema objects.  The given objects must all have distinct
537// names.
538func (meal *cfgMeal) digestFlowSchemasLocked(newFSs []*flowcontrol.FlowSchema) {
539	fsSeq := make(apihelpers.FlowSchemaSequence, 0, len(newFSs))
540	fsMap := make(map[string]*flowcontrol.FlowSchema, len(newFSs))
541	var haveExemptFS, haveCatchAllFS bool
542	for i, fs := range newFSs {
543		otherFS := fsMap[fs.Name]
544		if otherFS != nil {
545			// This client is forbidden to do this.
546			panic(fmt.Sprintf("Given two FlowSchema objects with the same name: %s and %s", fcfmt.Fmt(otherFS), fcfmt.Fmt(fs)))
547		}
548		fsMap[fs.Name] = fs
549		_, goodPriorityRef := meal.newPLStates[fs.Spec.PriorityLevelConfiguration.Name]
551		// Ensure the object's status reflects whether its priority
552		// level reference is broken.
553		//
554		// TODO: consider not even trying if server is not handling
555		// requests yet.
556		meal.presyncFlowSchemaStatus(fs, meal.cfgCtlr.foundToDangling(goodPriorityRef), fs.Spec.PriorityLevelConfiguration.Name)
558		if !goodPriorityRef {
559			klog.V(6).Infof("Ignoring FlowSchema %s because of bad priority level reference %q", fs.Name, fs.Spec.PriorityLevelConfiguration.Name)
560			continue
561		}
562		fsSeq = append(fsSeq, newFSs[i])
563		haveExemptFS = haveExemptFS || fs.Name == flowcontrol.FlowSchemaNameExempt
564		haveCatchAllFS = haveCatchAllFS || fs.Name == flowcontrol.FlowSchemaNameCatchAll
565	}
566	// sort into the order to be used for matching
567	sort.Sort(fsSeq)
569	// Supply missing mandatory FlowSchemas, in correct position
570	if !haveExemptFS {
571		fsSeq = append(apihelpers.FlowSchemaSequence{fcboot.MandatoryFlowSchemaExempt}, fsSeq...)
572	}
573	if !haveCatchAllFS {
574		fsSeq = append(fsSeq, fcboot.MandatoryFlowSchemaCatchAll)
575	}
577	meal.cfgCtlr.flowSchemas = fsSeq
578	if klog.V(5).Enabled() {
579		for _, fs := range fsSeq {
580			klog.Infof("Using FlowSchema %s", fcfmt.Fmt(fs))
581		}
582	}
585// Consider all the priority levels in the previous configuration.
586// Keep the ones that are in the new config, supply mandatory
587// behavior, or are still busy; for the rest: drop it if it has no
588// queues, otherwise start the quiescing process if that has not
589// already been started.
590func (meal *cfgMeal) processOldPLsLocked() {
591	for plName, plState := range meal.cfgCtlr.priorityLevelStates {
592		if meal.newPLStates[plName] != nil {
593			// Still desired and already updated
594			continue
595		}
596		if plName == flowcontrol.PriorityLevelConfigurationNameExempt && !meal.haveExemptPL || plName == flowcontrol.PriorityLevelConfigurationNameCatchAll && !meal.haveCatchAllPL {
597			// BTW, we know the Spec has not changed because the
598			// mandatory objects have immutable Specs
599			klog.V(3).Infof("Retaining mandatory priority level %q despite lack of API object", plName)
600		} else {
601			if plState.queues == nil || plState.numPending == 0 && plState.queues.IsIdle() {
602				// Either there are no queues or they are done
603				// draining and no use is coming from another
604				// goroutine
605				klog.V(3).Infof("Removing undesired priority level %q (nilQueues=%v), Type=%v", plName, plState.queues == nil, plState.pl.Spec.Type)
606				continue
607			}
608			if !plState.quiescing {
609				klog.V(3).Infof("Priority level %q became undesired", plName)
610				plState.quiescing = true
611			}
612		}
613		var err error
614		plState.qsCompleter, err = queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, plState.queues, plState.pl, meal.cfgCtlr.requestWaitLimit, plState.obsPair)
615		if err != nil {
616			// This can not happen because queueSetCompleterForPL already approved this config
617			panic(fmt.Sprintf("%s from name=%q spec=%s", err, plName, fcfmt.Fmt(plState.pl.Spec)))
618		}
619		if plState.pl.Spec.Limited != nil {
620			// We deliberately include the lingering priority levels
621			// here so that their queues get some concurrency and they
622			// continue to drain.  During this interim a lingering
623			// priority level continues to get a concurrency
624			// allocation determined by all the share values in the
625			// regular way.
626			meal.shareSum += float64(plState.pl.Spec.Limited.AssuredConcurrencyShares)
627		}
628		meal.haveExemptPL = meal.haveExemptPL || plName == flowcontrol.PriorityLevelConfigurationNameExempt
629		meal.haveCatchAllPL = meal.haveCatchAllPL || plName == flowcontrol.PriorityLevelConfigurationNameCatchAll
630		meal.newPLStates[plName] = plState
631	}
634// For all the priority levels of the new config, divide up the
635// server's total concurrency limit among them and create/update their
636// QueueSets.
637func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
638	for plName, plState := range meal.newPLStates {
639		if plState.pl.Spec.Limited == nil {
640			klog.V(5).Infof("Using exempt priority level %q: quiescing=%v", plName, plState.quiescing)
641			continue
642		}
644		// The use of math.Ceil here means that the results might sum
645		// to a little more than serverConcurrencyLimit but the
646		// difference will be negligible.
647		concurrencyLimit := int(math.Ceil(float64(meal.cfgCtlr.serverConcurrencyLimit) * float64(plState.pl.Spec.Limited.AssuredConcurrencyShares) / meal.shareSum))
648		metrics.UpdateSharedConcurrencyLimit(plName, concurrencyLimit)
650		if plState.queues == nil {
651			klog.V(5).Infof("Introducing queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, plState.quiescing, plState.pl.Spec.Limited.AssuredConcurrencyShares, meal.shareSum)
652		} else {
653			klog.V(5).Infof("Retaining queues for priority level %q: config=%s, concurrencyLimit=%d, quiescing=%v, numPending=%d (shares=%v, shareSum=%v)", plName, fcfmt.Fmt(plState.pl.Spec), concurrencyLimit, plState.quiescing, plState.numPending, plState.pl.Spec.Limited.AssuredConcurrencyShares, meal.shareSum)
654		}
655		plState.queues = plState.qsCompleter.Complete(fq.DispatchingConfig{ConcurrencyLimit: concurrencyLimit})
656	}
659// queueSetCompleterForPL returns an appropriate QueueSetCompleter for the
660// given priority level configuration.  Returns nil if that config
661// does not call for limiting.  Returns nil and an error if the given
662// object is malformed in a way that is a problem for this package.
663func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, intPair metrics.TimedObserverPair) (fq.QueueSetCompleter, error) {
664	if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) {
665		return nil, errors.New("broken union structure at the top")
666	}
667	if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Name == flowcontrol.PriorityLevelConfigurationNameExempt) {
668		// This package does not attempt to cope with a priority level dynamically switching between exempt and not.
669		return nil, errors.New("non-alignment between name and type")
670	}
671	if pl.Spec.Limited == nil {
672		return nil, nil
673	}
674	if (pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeReject) != (pl.Spec.Limited.LimitResponse.Queuing == nil) {
675		return nil, errors.New("broken union structure for limit response")
676	}
677	qcAPI := pl.Spec.Limited.LimitResponse.Queuing
678	qcQS := fq.QueuingConfig{Name: pl.Name}
679	if qcAPI != nil {
680		qcQS = fq.QueuingConfig{Name: pl.Name,
681			DesiredNumQueues: int(qcAPI.Queues),
682			QueueLengthLimit: int(qcAPI.QueueLengthLimit),
683			HandSize:         int(qcAPI.HandSize),
684			RequestWaitLimit: requestWaitLimit,
685		}
686	}
687	var qsc fq.QueueSetCompleter
688	var err error
689	if queues != nil {
690		qsc, err = queues.BeginConfigChange(qcQS)
691	} else {
692		qsc, err = qsf.BeginConstruction(qcQS, intPair)
693	}
694	if err != nil {
695		err = fmt.Errorf("priority level %q has QueuingConfiguration %#+v, which is invalid: %w", pl.Name, qcAPI, err)
696	}
697	return qsc, err
700func (meal *cfgMeal) presyncFlowSchemaStatus(fs *flowcontrol.FlowSchema, isDangling bool, plName string) {
701	danglingCondition := apihelpers.GetFlowSchemaConditionByType(fs, flowcontrol.FlowSchemaConditionDangling)
702	if danglingCondition == nil {
703		danglingCondition = &flowcontrol.FlowSchemaCondition{
704			Type: flowcontrol.FlowSchemaConditionDangling,
705		}
706	}
707	desiredStatus := flowcontrol.ConditionFalse
708	var desiredReason, desiredMessage string
709	if isDangling {
710		desiredStatus = flowcontrol.ConditionTrue
711		desiredReason = "NotFound"
712		desiredMessage = fmt.Sprintf("This FlowSchema references the PriorityLevelConfiguration object named %q but there is no such object", plName)
713	} else {
714		desiredReason = "Found"
715		desiredMessage = fmt.Sprintf("This FlowSchema references the PriorityLevelConfiguration object named %q and it exists", plName)
716	}
717	if danglingCondition.Status == desiredStatus && danglingCondition.Reason == desiredReason && danglingCondition.Message == desiredMessage {
718		return
719	}
720	now := meal.cfgCtlr.clock.Now()
721	meal.fsStatusUpdates = append(meal.fsStatusUpdates, fsStatusUpdate{
722		flowSchema: fs,
723		condition: flowcontrol.FlowSchemaCondition{
724			Type:               flowcontrol.FlowSchemaConditionDangling,
725			Status:             desiredStatus,
726			LastTransitionTime: metav1.NewTime(now),
727			Reason:             desiredReason,
728			Message:            desiredMessage,
729		},
730		oldValue: *danglingCondition})
733// imaginePL adds a priority level based on one of the mandatory ones
734// that does not actually exist (right now) as a real API object.
735func (meal *cfgMeal) imaginePL(proto *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration) {
736	klog.V(3).Infof("No %s PriorityLevelConfiguration found, imagining one", proto.Name)
737	obsPair := meal.cfgCtlr.obsPairGenerator.Generate(1, 1, []string{proto.Name})
738	qsCompleter, err := queueSetCompleterForPL(meal.cfgCtlr.queueSetFactory, nil, proto, requestWaitLimit, obsPair)
739	if err != nil {
740		// This can not happen because proto is one of the mandatory
741		// objects and these are not erroneous
742		panic(err)
743	}
744	meal.newPLStates[proto.Name] = &priorityLevelState{
745		pl:          proto,
746		qsCompleter: qsCompleter,
747		obsPair:     obsPair,
748	}
749	if proto.Spec.Limited != nil {
750		meal.shareSum += float64(proto.Spec.Limited.AssuredConcurrencyShares)
751	}
// immediateRequest is the fq.Request that startRequest hands out for
// requests at an exempt priority level.  Such requests are never queued.
type immediateRequest struct{}

// Finish runs execute synchronously, exactly once.  It always reports
// idle == false, because no queue set is involved that could have become
// idle.
func (immediateRequest) Finish(execute func()) (idle bool) {
	execute()
	return idle
}
761// startRequest classifies and, if appropriate, enqueues the request.
762// Returns a nil Request if and only if the request is to be rejected.
763// The returned bool indicates whether the request is exempt from
764// limitation.  The startWaitingTime is when the request started
765// waiting in its queue, or `Time{}` if this did not happen.
766func (cfgCtlr *configController) startRequest(ctx context.Context, rd RequestDigest, queueNoteFn fq.QueueNoteFn) (fs *flowcontrol.FlowSchema, pl *flowcontrol.PriorityLevelConfiguration, isExempt bool, req fq.Request, startWaitingTime time.Time) {
767	klog.V(7).Infof("startRequest(%#+v)", rd)
768	cfgCtlr.lock.Lock()
769	defer cfgCtlr.lock.Unlock()
770	var selectedFlowSchema, catchAllFlowSchema *flowcontrol.FlowSchema
771	for _, fs := range cfgCtlr.flowSchemas {
772		if matchesFlowSchema(rd, fs) {
773			selectedFlowSchema = fs
774			break
775		}
776		if fs.Name == flowcontrol.FlowSchemaNameCatchAll {
777			catchAllFlowSchema = fs
778		}
779	}
780	if selectedFlowSchema == nil {
781		// This should never happen. If the requestDigest's User is a part of
782		// system:authenticated or system:unauthenticated, the catch-all flow
783		// schema should match it. However, if that invariant somehow fails,
784		// fallback to the catch-all flow schema anyway.
785		if catchAllFlowSchema == nil {
786			// This should absolutely never, ever happen! APF guarantees two
787			// undeletable flow schemas at all times: an exempt flow schema and a
788			// catch-all flow schema.
789			panic(fmt.Sprintf("no fallback catch-all flow schema found for request %#+v and user %#+v", rd.RequestInfo, rd.User))
790		}
791		selectedFlowSchema = catchAllFlowSchema
792		klog.Warningf("no match found for request %#+v and user %#+v; selecting catchAll=%s as fallback flow schema", rd.RequestInfo, rd.User, fcfmt.Fmt(selectedFlowSchema))
793	}
794	plName := selectedFlowSchema.Spec.PriorityLevelConfiguration.Name
795	plState := cfgCtlr.priorityLevelStates[plName]
796	if plState.pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt {
797		klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, immediate", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName)
798		return selectedFlowSchema, plState.pl, true, immediateRequest{}, time.Time{}
799	}
800	var numQueues int32
801	if plState.pl.Spec.Limited.LimitResponse.Type == flowcontrol.LimitResponseTypeQueue {
802		numQueues = plState.pl.Spec.Limited.LimitResponse.Queuing.Queues
803	}
804	var flowDistinguisher string
805	var hashValue uint64
806	if numQueues > 1 {
807		flowDistinguisher = computeFlowDistinguisher(rd, selectedFlowSchema.Spec.DistinguisherMethod)
808		hashValue = hashFlowID(selectedFlowSchema.Name, flowDistinguisher)
809	}
810	startWaitingTime = time.Now()
811	klog.V(7).Infof("startRequest(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, numQueues=%d", rd, selectedFlowSchema.Name, selectedFlowSchema.Spec.DistinguisherMethod, plName, numQueues)
812	req, idle := plState.queues.StartRequest(ctx, &rd.Width, hashValue, flowDistinguisher, selectedFlowSchema.Name, rd.RequestInfo, rd.User, queueNoteFn)
813	if idle {
814		cfgCtlr.maybeReapLocked(plName, plState)
815	}
816	return selectedFlowSchema, plState.pl, false, req, startWaitingTime
819// maybeReap will remove the last internal traces of the named
820// priority level if it has no more use.  Call this after getting a
821// clue that the given priority level is undesired and idle.
822func (cfgCtlr *configController) maybeReap(plName string) {
823	cfgCtlr.lock.Lock()
824	defer cfgCtlr.lock.Unlock()
825	plState := cfgCtlr.priorityLevelStates[plName]
826	if plState == nil {
827		klog.V(7).Infof("plName=%s, plState==nil", plName)
828		return
829	}
830	if plState.queues != nil {
831		useless := plState.quiescing && plState.numPending == 0 && plState.queues.IsIdle()
832		klog.V(7).Infof("plState.quiescing=%v, plState.numPending=%d, useless=%v", plState.quiescing, plState.numPending, useless)
833		if !useless {
834			return
835		}
836	}
837	klog.V(3).Infof("Triggered API priority and fairness config reloading because priority level %s is undesired and idle", plName)
838	cfgCtlr.configQueue.Add(0)
841// maybeReapLocked requires the cfgCtlr's lock to already be held and
842// will remove the last internal traces of the named priority level if
843// it has no more use.  Call this if both (1) plState.queues is
844// non-nil and reported being idle, and (2) cfgCtlr's lock has not
845// been released since then.
846func (cfgCtlr *configController) maybeReapLocked(plName string, plState *priorityLevelState) {
847	if !(plState.quiescing && plState.numPending == 0) {
848		return
849	}
850	klog.V(3).Infof("Triggered API priority and fairness config reloading because priority level %s is undesired and idle", plName)
851	cfgCtlr.configQueue.Add(0)
854// computeFlowDistinguisher extracts the flow distinguisher according to the given method
855func computeFlowDistinguisher(rd RequestDigest, method *flowcontrol.FlowDistinguisherMethod) string {
856	if method == nil {
857		return ""
858	}
859	switch method.Type {
860	case flowcontrol.FlowDistinguisherMethodByUserType:
861		return rd.User.GetName()
862	case flowcontrol.FlowDistinguisherMethodByNamespaceType:
863		return rd.RequestInfo.Namespace
864	default:
865		// this line shall never reach
866		panic("invalid flow-distinguisher method")
867	}
// hashFlowID derives the 64-bit hash value that startRequest passes to the
// queue set for a flow.  The value is the first 8 bytes, read
// little-endian, of the SHA-256 digest of fsName, a single zero byte, and
// fDistinguisher.  The zero-byte separator gives pairs such as ("a", "bc")
// and ("ab", "c") distinct hash inputs.
func hashFlowID(fsName, fDistinguisher string) uint64 {
	input := make([]byte, 0, len(fsName)+1+len(fDistinguisher))
	input = append(input, fsName...)
	input = append(input, 0)
	input = append(input, fDistinguisher...)
	digest := sha256.Sum256(input)
	return binary.LittleEndian.Uint64(digest[:8])
}