1/* 2Copyright 2019 The Kubernetes Authors. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package fairqueuing 18 19import ( 20 "math" 21 "sync" 22 "time" 23 24 "k8s.io/apimachinery/pkg/util/clock" 25 "k8s.io/apiserver/pkg/util/flowcontrol/metrics" 26) 27 28// Integrator computes the moments of some variable X over time as 29// read from a particular clock. The integrals start when the 30// Integrator is created, and ends at the latest operation on the 31// Integrator. As a `metrics.TimedObserver` this fixes X1=1 and 32// ignores attempts to change X1. 33type Integrator interface { 34 metrics.TimedObserver 35 36 GetResults() IntegratorResults 37 38 // Return the results of integrating to now, and reset integration to start now 39 Reset() IntegratorResults 40} 41 42// IntegratorResults holds statistical abstracts of the integration 43type IntegratorResults struct { 44 Duration float64 //seconds 45 Average float64 //time-weighted 46 Deviation float64 //standard deviation: sqrt(avg((value-avg)^2)) 47 Min, Max float64 48} 49 50// Equal tests for semantic equality. 51// This considers all NaN values to be equal to each other. 
52func (x *IntegratorResults) Equal(y *IntegratorResults) bool { 53 return x == y || x != nil && y != nil && x.Duration == y.Duration && x.Min == y.Min && x.Max == y.Max && (x.Average == y.Average || math.IsNaN(x.Average) && math.IsNaN(y.Average)) && (x.Deviation == y.Deviation || math.IsNaN(x.Deviation) && math.IsNaN(y.Deviation)) 54} 55 56type integrator struct { 57 clock clock.PassiveClock 58 sync.Mutex 59 lastTime time.Time 60 x float64 61 moments Moments 62 min, max float64 63} 64 65// NewIntegrator makes one that uses the given clock 66func NewIntegrator(clock clock.PassiveClock) Integrator { 67 return &integrator{ 68 clock: clock, 69 lastTime: clock.Now(), 70 } 71} 72 73func (igr *integrator) SetX1(x1 float64) { 74} 75 76func (igr *integrator) Set(x float64) { 77 igr.Lock() 78 igr.setLocked(x) 79 igr.Unlock() 80} 81 82func (igr *integrator) setLocked(x float64) { 83 igr.updateLocked() 84 igr.x = x 85 if x < igr.min { 86 igr.min = x 87 } 88 if x > igr.max { 89 igr.max = x 90 } 91} 92 93func (igr *integrator) Add(deltaX float64) { 94 igr.Lock() 95 igr.setLocked(igr.x + deltaX) 96 igr.Unlock() 97} 98 99func (igr *integrator) updateLocked() { 100 now := igr.clock.Now() 101 dt := now.Sub(igr.lastTime).Seconds() 102 igr.lastTime = now 103 igr.moments = igr.moments.Add(ConstantMoments(dt, igr.x)) 104} 105 106func (igr *integrator) GetResults() IntegratorResults { 107 igr.Lock() 108 defer igr.Unlock() 109 return igr.getResultsLocked() 110} 111 112func (igr *integrator) Reset() IntegratorResults { 113 igr.Lock() 114 defer igr.Unlock() 115 results := igr.getResultsLocked() 116 igr.moments = Moments{} 117 igr.min = igr.x 118 igr.max = igr.x 119 return results 120} 121 122func (igr *integrator) getResultsLocked() (results IntegratorResults) { 123 igr.updateLocked() 124 results.Min, results.Max = igr.min, igr.max 125 results.Duration = igr.moments.ElapsedSeconds 126 results.Average, results.Deviation = igr.moments.AvgAndStdDev() 127 return 128} 129 130// Moments are the 
// integrals of the 0, 1, and 2 powers of some
// variable X over some range of time.
type Moments struct {
	ElapsedSeconds float64 // integral of dt
	IntegralX      float64 // integral of x dt
	IntegralXX     float64 // integral of x*x dt
}

// ConstantMoments is for a constant X held for dt seconds.
func ConstantMoments(dt, x float64) Moments {
	return Moments{
		ElapsedSeconds: dt,
		IntegralX:      x * dt,
		IntegralXX:     x * x * dt,
	}
}

// Add combines over two ranges of time
func (m Moments) Add(other Moments) Moments {
	return Moments{
		ElapsedSeconds: m.ElapsedSeconds + other.ElapsedSeconds,
		IntegralX:      m.IntegralX + other.IntegralX,
		IntegralXX:     m.IntegralXX + other.IntegralXX,
	}
}

// Sub finds the difference between a range of time and a subrange
func (m Moments) Sub(other Moments) Moments {
	return Moments{
		ElapsedSeconds: m.ElapsedSeconds - other.ElapsedSeconds,
		IntegralX:      m.IntegralX - other.IntegralX,
		IntegralXX:     m.IntegralXX - other.IntegralXX,
	}
}

// AvgAndStdDev returns the average and standard deviation.
//
// Both results are NaN when ElapsedSeconds is not positive.  The
// deviation alone is NaN when the moments are inconsistent enough
// that the computed variance is negative by more than rounding error.
func (m Moments) AvgAndStdDev() (float64, float64) {
	if m.ElapsedSeconds <= 0 {
		return math.NaN(), math.NaN()
	}
	// roundingTolerance is the largest negative variance, relative to
	// the mean square, that is attributed to floating-point
	// cancellation rather than to inconsistent inputs.
	const roundingTolerance = 1e-12

	avg := m.IntegralX / m.ElapsedSeconds
	// standard deviation is sqrt( average( (x - xbar)^2 ) )
	// = sqrt( Integral( x^2 + xbar^2 -2*x*xbar dt ) / Duration )
	// = sqrt( ( Integral( x^2 dt ) + Duration * xbar^2 - 2*xbar*Integral(x dt) ) / Duration)
	// = sqrt( Integral(x^2 dt)/Duration - xbar^2 )
	meanSquare := m.IntegralXX / m.ElapsedSeconds
	variance := meanSquare - avg*avg
	// The subtraction above cancels almost completely for a (nearly)
	// constant X, so rounding can push a true variance of zero
	// slightly below zero.  Report that as zero rather than NaN.
	if variance < 0 && -variance <= roundingTolerance*meanSquare {
		variance = 0
	}
	if variance >= 0 {
		return avg, math.Sqrt(variance)
	}
	return avg, math.NaN()
}