1/*
2Copyright 2019 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package fairqueuing
18
19import (
20	"math"
21	"sync"
22	"time"
23
24	"k8s.io/apimachinery/pkg/util/clock"
25	"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
26)
27
28// Integrator computes the moments of some variable X over time as
29// read from a particular clock.  The integrals start when the
30// Integrator is created, and ends at the latest operation on the
31// Integrator.  As a `metrics.TimedObserver` this fixes X1=1 and
32// ignores attempts to change X1.
33type Integrator interface {
34	metrics.TimedObserver
35
36	GetResults() IntegratorResults
37
38	// Return the results of integrating to now, and reset integration to start now
39	Reset() IntegratorResults
40}
41
42// IntegratorResults holds statistical abstracts of the integration
43type IntegratorResults struct {
44	Duration  float64 //seconds
45	Average   float64 //time-weighted
46	Deviation float64 //standard deviation: sqrt(avg((value-avg)^2))
47	Min, Max  float64
48}
49
50// Equal tests for semantic equality.
51// This considers all NaN values to be equal to each other.
52func (x *IntegratorResults) Equal(y *IntegratorResults) bool {
53	return x == y || x != nil && y != nil && x.Duration == y.Duration && x.Min == y.Min && x.Max == y.Max && (x.Average == y.Average || math.IsNaN(x.Average) && math.IsNaN(y.Average)) && (x.Deviation == y.Deviation || math.IsNaN(x.Deviation) && math.IsNaN(y.Deviation))
54}
55
56type integrator struct {
57	clock clock.PassiveClock
58	sync.Mutex
59	lastTime time.Time
60	x        float64
61	moments  Moments
62	min, max float64
63}
64
65// NewIntegrator makes one that uses the given clock
66func NewIntegrator(clock clock.PassiveClock) Integrator {
67	return &integrator{
68		clock:    clock,
69		lastTime: clock.Now(),
70	}
71}
72
73func (igr *integrator) SetX1(x1 float64) {
74}
75
76func (igr *integrator) Set(x float64) {
77	igr.Lock()
78	igr.setLocked(x)
79	igr.Unlock()
80}
81
82func (igr *integrator) setLocked(x float64) {
83	igr.updateLocked()
84	igr.x = x
85	if x < igr.min {
86		igr.min = x
87	}
88	if x > igr.max {
89		igr.max = x
90	}
91}
92
93func (igr *integrator) Add(deltaX float64) {
94	igr.Lock()
95	igr.setLocked(igr.x + deltaX)
96	igr.Unlock()
97}
98
99func (igr *integrator) updateLocked() {
100	now := igr.clock.Now()
101	dt := now.Sub(igr.lastTime).Seconds()
102	igr.lastTime = now
103	igr.moments = igr.moments.Add(ConstantMoments(dt, igr.x))
104}
105
106func (igr *integrator) GetResults() IntegratorResults {
107	igr.Lock()
108	defer igr.Unlock()
109	return igr.getResultsLocked()
110}
111
112func (igr *integrator) Reset() IntegratorResults {
113	igr.Lock()
114	defer igr.Unlock()
115	results := igr.getResultsLocked()
116	igr.moments = Moments{}
117	igr.min = igr.x
118	igr.max = igr.x
119	return results
120}
121
122func (igr *integrator) getResultsLocked() (results IntegratorResults) {
123	igr.updateLocked()
124	results.Min, results.Max = igr.min, igr.max
125	results.Duration = igr.moments.ElapsedSeconds
126	results.Average, results.Deviation = igr.moments.AvgAndStdDev()
127	return
128}
129
130// Moments are the integrals of the 0, 1, and 2 powers of some
131// variable X over some range of time.
132type Moments struct {
133	ElapsedSeconds float64 // integral of dt
134	IntegralX      float64 // integral of x dt
135	IntegralXX     float64 // integral of x*x dt
136}
137
138// ConstantMoments is for a constant X
139func ConstantMoments(dt, x float64) Moments {
140	return Moments{
141		ElapsedSeconds: dt,
142		IntegralX:      x * dt,
143		IntegralXX:     x * x * dt,
144	}
145}
146
147// Add combines over two ranges of time
148func (igr Moments) Add(ogr Moments) Moments {
149	return Moments{
150		ElapsedSeconds: igr.ElapsedSeconds + ogr.ElapsedSeconds,
151		IntegralX:      igr.IntegralX + ogr.IntegralX,
152		IntegralXX:     igr.IntegralXX + ogr.IntegralXX,
153	}
154}
155
156// Sub finds the difference between a range of time and a subrange
157func (igr Moments) Sub(ogr Moments) Moments {
158	return Moments{
159		ElapsedSeconds: igr.ElapsedSeconds - ogr.ElapsedSeconds,
160		IntegralX:      igr.IntegralX - ogr.IntegralX,
161		IntegralXX:     igr.IntegralXX - ogr.IntegralXX,
162	}
163}
164
165// AvgAndStdDev returns the average and standard devation
166func (igr Moments) AvgAndStdDev() (float64, float64) {
167	if igr.ElapsedSeconds <= 0 {
168		return math.NaN(), math.NaN()
169	}
170	avg := igr.IntegralX / igr.ElapsedSeconds
171	// standard deviation is sqrt( average( (x - xbar)^2 ) )
172	// = sqrt( Integral( x^2 + xbar^2 -2*x*xbar dt ) / Duration )
173	// = sqrt( ( Integral( x^2 dt ) + Duration * xbar^2 - 2*xbar*Integral(x dt) ) / Duration)
174	// = sqrt( Integral(x^2 dt)/Duration - xbar^2 )
175	variance := igr.IntegralXX/igr.ElapsedSeconds - avg*avg
176	if variance >= 0 {
177		return avg, math.Sqrt(variance)
178	}
179	return avg, math.NaN()
180}
181