1/*
2Copyright 2019 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package fairqueuing
18
19import (
20	"context"
21	"time"
22
23	"k8s.io/apiserver/pkg/util/flowcontrol/debug"
24	"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
25	"k8s.io/apiserver/pkg/util/flowcontrol/request"
26)
27
// QueueSetFactory is used to create QueueSet objects.  Creation, like
// config update, is done in two phases: the first phase consumes the
// QueuingConfig and the second consumes the DispatchingConfig.  They
// are separated so that errors from the first phase can be found
// before committing to a concurrency allotment for the second.
type QueueSetFactory interface {
	// BeginConstruction does the first phase of creating a QueueSet.
	// It consumes the QueuingConfig and returns a QueueSetCompleter
	// whose Complete method performs the second phase.  A non-nil
	// error reports a problem found during this first phase.
	// NOTE(review): the role of the metrics.TimedObserverPair is not
	// described in this file; presumably it receives observations
	// made by the new QueueSet.  Confirm against the implementation.
	BeginConstruction(QueuingConfig, metrics.TimedObserverPair) (QueueSetCompleter, error)
}
37
// QueueSetCompleter finishes the two-step process of creating or
// reconfiguring a QueueSet.  It is the value returned by the first
// step (QueueSetFactory.BeginConstruction or
// QueueSet.BeginConfigChange).
type QueueSetCompleter interface {
	// Complete performs the second step: it returns a QueueSet
	// configured by the given dispatching configuration.
	Complete(DispatchingConfig) QueueSet
}
45
// QueueSet is the abstraction for the queuing and dispatching
// functionality of one non-exempt priority level.  It covers the
// functionality described in the "Assignment to a Queue", "Queuing",
// and "Dispatching" sections of
// https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/1040-priority-and-fairness/README.md
// .  Some day we may have connections between priority levels, but
// today is not that day.
type QueueSet interface {
	// BeginConfigChange starts the two-step process of updating the
	// configuration.  No change is made until Complete is called.  If
	// `C := X.BeginConfigChange(q)` then `C.Complete(d)` returns the
	// same value `X`.  If the QueuingConfig's DesiredNumQueues field
	// is zero then the other queuing-specific config parameters are
	// not changed, so that the queues continue draining as before.
	// In any case, reconfiguration does not discard any queue unless
	// and until it is undesired and empty.
	BeginConfigChange(QueuingConfig) (QueueSetCompleter, error)

	// IsIdle returns a bool indicating whether the QueueSet was idle
	// at the moment of the return.  Idle means the QueueSet has zero
	// requests queued and zero executing.  This bit can change only
	// (1) during a call to StartRequest and (2) during a call to
	// Request.Finish.  In the latter case idleness can only change
	// from false to true.
	IsIdle() bool

	// StartRequest begins the process of handling a request.  If the
	// request gets queued and the number of queues is greater than 1
	// then StartRequest uses the given hashValue as the source of
	// entropy as it shuffle-shards the request into a queue.  The
	// descr1 and descr2 values play no role in the logic but appear
	// in log messages.  This method always returns quickly (without
	// waiting for the request to be dequeued).  If this method
	// returns a nil Request value then the caller should reject the
	// request and the returned bool indicates whether the QueueSet
	// was idle at the moment of the return.  Otherwise idle==false
	// and the client must call the Finish method of the Request
	// exactly once.
	// NOTE(review): width, flowDistinguisher, fsName, and queueNoteFn
	// are not described in this file.  Presumably queueNoteFn is
	// invoked as the request enters and leaves a queue (see
	// QueueNoteFn) --- confirm against the implementation.
	StartRequest(ctx context.Context, width *request.Width, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn QueueNoteFn) (req Request, idle bool)

	// UpdateObservations makes sure any time-based statistics have
	// caught up with the current clock reading.
	UpdateObservations()

	// Dump saves and returns the instant internal state of the
	// queue-set.  Note that the dumping process will stop the
	// queue-set from processing any requests.
	// For debugging only.
	// NOTE(review): includeRequestDetails presumably controls whether
	// per-request details appear in the returned dump --- confirm.
	Dump(includeRequestDetails bool) debug.QueueSetDump
}
96
// QueueNoteFn is called when a request enters and leaves a queue.
// NOTE(review): inQueue is presumably true on entering a queue and
// false on leaving it --- confirm against the caller.
type QueueNoteFn func(inQueue bool)
99
// Request represents the remainder of the handling of one request.
// A non-nil Request is obtained from QueueSet.StartRequest, and the
// client must then call its Finish method exactly once.
type Request interface {
	// Finish determines whether to execute or reject the request and
	// invokes `execute` if the decision is to execute the request.
	// The returned `idle bool` value indicates whether the QueueSet
	// was idle when the value was calculated, but might no longer be
	// accurate by the time the client examines that value.
	Finish(execute func()) (idle bool)
}
109
// QueuingConfig defines the configuration of the queuing aspect of a
// QueueSet.  It is the input to the first phase of creating or
// reconfiguring a QueueSet.
type QueuingConfig struct {
	// Name is used to identify a queue set, allowing for descriptive
	// information about its intended use.
	Name string

	// DesiredNumQueues is the number of queues that the API says
	// should exist now.  This may be zero, in which case
	// QueueLengthLimit, HandSize, and RequestWaitLimit are ignored.
	DesiredNumQueues int

	// QueueLengthLimit is the maximum number of requests that may be
	// waiting in a given queue at a time.
	QueueLengthLimit int

	// HandSize is a parameter of shuffle sharding.  Upon arrival of a
	// request, a queue is chosen by randomly dealing a "hand" of this
	// many queues and then picking one of minimum length.
	HandSize int

	// RequestWaitLimit is the maximum amount of time that a request
	// may wait in a queue.  If, by the end of that time, the request
	// has not been dispatched then it is rejected.
	RequestWaitLimit time.Duration
}
131
// DispatchingConfig defines the configuration of the dispatching
// aspect of a QueueSet.  It is the input to the second phase
// (QueueSetCompleter.Complete) of creating or reconfiguring a
// QueueSet.
type DispatchingConfig struct {
	// ConcurrencyLimit is the maximum number of requests of this
	// QueueSet that may be executing at a time.
	ConcurrencyLimit int
}
137