1// Copyright The OpenTelemetry Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15// +build windows 16 17package loadscraper 18 19import ( 20 "context" 21 "math" 22 "sync" 23 "time" 24 25 "github.com/shirou/gopsutil/load" 26 "go.uber.org/zap" 27 28 "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/perfcounters" 29) 30 31// Sample processor queue length at a 5s frequency, and calculate exponentially weighted moving averages 32// as per https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation 33 34const ( 35 system = "System" 36 processorQueueLength = "Processor Queue Length" 37) 38 39var ( 40 samplingFrequency = 5 * time.Second 41 42 loadAvgFactor1m = 1 / math.Exp(samplingFrequency.Seconds()/time.Minute.Seconds()) 43 loadAvgFactor5m = 1 / math.Exp(samplingFrequency.Seconds()/(5*time.Minute).Seconds()) 44 loadAvgFactor15m = 1 / math.Exp(samplingFrequency.Seconds()/(15*time.Minute).Seconds()) 45) 46 47var ( 48 scraperCount int 49 startupLock sync.Mutex 50 51 samplerInstance *sampler 52) 53 54type sampler struct { 55 done chan struct{} 56 logger *zap.Logger 57 perfCounterScraper perfcounters.PerfCounterScraper 58 loadAvg1m float64 59 loadAvg5m float64 60 loadAvg15m float64 61 lock sync.RWMutex 62} 63 64func startSampling(_ context.Context, logger *zap.Logger) error { 65 startupLock.Lock() 66 defer startupLock.Unlock() 67 68 // startSampling may be called multiple times if multiple scrapers are 69 // initialized - but we only want to initialize a single load sampler 70 scraperCount++ 71 if scraperCount > 1 { 72 return nil 73 } 74 75 var err error 76 samplerInstance, err = newSampler(logger) 77 if err != nil { 78 return err 79 } 80 81 samplerInstance.startSamplingTicker() 82 return nil 83} 84 85func newSampler(logger *zap.Logger) (*sampler, error) { 86 perfCounterScraper := &perfcounters.PerfLibScraper{} 87 if err := perfCounterScraper.Initialize(system); err != nil { 88 return nil, err 89 } 90 91 sampler := &sampler{ 92 logger: logger, 93 perfCounterScraper: perfCounterScraper, 94 done: make(chan struct{}), 95 } 96 97 return sampler, nil 98} 99 100func (sw *sampler) startSamplingTicker() { 101 go func() { 102 ticker := time.NewTicker(samplingFrequency) 103 defer ticker.Stop() 104 105 for { 106 select { 107 case <-ticker.C: 108 sw.sampleLoad() 109 case <-sw.done: 110 return 111 } 112 } 113 }() 114} 115 116func (sw *sampler) sampleLoad() { 117 counters, err := sw.perfCounterScraper.Scrape() 118 if err != nil { 119 sw.logger.Error("Load Scraper: failed to measure processor queue length", zap.Error(err)) 120 return 121 } 122 123 systemObject, err := counters.GetObject(system) 124 if err != nil { 125 sw.logger.Error("Load Scraper: failed to measure processor queue length", zap.Error(err)) 126 return 127 } 128 129 counterValues, err := systemObject.GetValues(processorQueueLength) 130 if err != nil { 131 sw.logger.Error("Load Scraper: failed to measure processor queue length", zap.Error(err)) 132 return 133 } 134 135 currentLoad := float64(counterValues[0].Values[processorQueueLength]) 136 137 sw.lock.Lock() 138 defer sw.lock.Unlock() 139 sw.loadAvg1m = sw.loadAvg1m*loadAvgFactor1m + currentLoad*(1-loadAvgFactor1m) 140 sw.loadAvg5m = sw.loadAvg5m*loadAvgFactor5m + currentLoad*(1-loadAvgFactor5m) 141 sw.loadAvg15m = sw.loadAvg15m*loadAvgFactor15m + currentLoad*(1-loadAvgFactor15m) 142} 143 144func stopSampling(_ context.Context) error { 145 startupLock.Lock() 146 defer startupLock.Unlock() 147 148 // only stop sampling if all load scrapers have been closed 149 scraperCount-- 150 if scraperCount > 0 { 151 return nil 152 } 153 154 close(samplerInstance.done) 155 return nil 156} 157 158func getSampledLoadAverages() (*load.AvgStat, error) { 159 samplerInstance.lock.RLock() 160 defer samplerInstance.lock.RUnlock() 161 162 avgStat := &load.AvgStat{ 163 Load1: samplerInstance.loadAvg1m, 164 Load5: samplerInstance.loadAvg5m, 165 Load15: samplerInstance.loadAvg15m, 166 } 167 168 return avgStat, nil 169} 170