1// Copyright The OpenTelemetry Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//       http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// +build windows
16
17package loadscraper
18
19import (
20	"context"
21	"math"
22	"sync"
23	"time"
24
25	"github.com/shirou/gopsutil/load"
26	"go.uber.org/zap"
27
28	"go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/perfcounters"
29)
30
31// Sample processor queue length at a 5s frequency, and calculate exponentially weighted moving averages
32// as per https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
33
// system is the name of the performance counter object that the sampler
// initializes its scraper with and looks up on every sample.
const system = "System"

// processorQueueLength is the name of the counter read from the system
// object; its value is used as the instantaneous load.
const processorQueueLength = "Processor Queue Length"
38
// samplingFrequency is the interval between two consecutive load samples.
var samplingFrequency = 5 * time.Second

// Decay factors of the 1, 5 and 15 minute exponentially weighted moving
// averages. Each factor is 1/e^(samplingFrequency/window): it is the weight
// kept by the previous average on every sample, while the new sample is
// weighted by one minus the factor.
var (
	loadAvgFactor1m  = 1 / math.Exp(samplingFrequency.Seconds()/(1*time.Minute).Seconds())
	loadAvgFactor5m  = 1 / math.Exp(samplingFrequency.Seconds()/(5*time.Minute).Seconds())
	loadAvgFactor15m = 1 / math.Exp(samplingFrequency.Seconds()/(15*time.Minute).Seconds())
)
46
47var (
48	scraperCount int
49	startupLock  sync.Mutex
50
51	samplerInstance *sampler
52)
53
54type sampler struct {
55	done               chan struct{}
56	logger             *zap.Logger
57	perfCounterScraper perfcounters.PerfCounterScraper
58	loadAvg1m          float64
59	loadAvg5m          float64
60	loadAvg15m         float64
61	lock               sync.RWMutex
62}
63
64func startSampling(_ context.Context, logger *zap.Logger) error {
65	startupLock.Lock()
66	defer startupLock.Unlock()
67
68	// startSampling may be called multiple times if multiple scrapers are
69	// initialized - but we only want to initialize a single load sampler
70	scraperCount++
71	if scraperCount > 1 {
72		return nil
73	}
74
75	var err error
76	samplerInstance, err = newSampler(logger)
77	if err != nil {
78		return err
79	}
80
81	samplerInstance.startSamplingTicker()
82	return nil
83}
84
85func newSampler(logger *zap.Logger) (*sampler, error) {
86	perfCounterScraper := &perfcounters.PerfLibScraper{}
87	if err := perfCounterScraper.Initialize(system); err != nil {
88		return nil, err
89	}
90
91	sampler := &sampler{
92		logger:             logger,
93		perfCounterScraper: perfCounterScraper,
94		done:               make(chan struct{}),
95	}
96
97	return sampler, nil
98}
99
100func (sw *sampler) startSamplingTicker() {
101	go func() {
102		ticker := time.NewTicker(samplingFrequency)
103		defer ticker.Stop()
104
105		for {
106			select {
107			case <-ticker.C:
108				sw.sampleLoad()
109			case <-sw.done:
110				return
111			}
112		}
113	}()
114}
115
116func (sw *sampler) sampleLoad() {
117	counters, err := sw.perfCounterScraper.Scrape()
118	if err != nil {
119		sw.logger.Error("Load Scraper: failed to measure processor queue length", zap.Error(err))
120		return
121	}
122
123	systemObject, err := counters.GetObject(system)
124	if err != nil {
125		sw.logger.Error("Load Scraper: failed to measure processor queue length", zap.Error(err))
126		return
127	}
128
129	counterValues, err := systemObject.GetValues(processorQueueLength)
130	if err != nil {
131		sw.logger.Error("Load Scraper: failed to measure processor queue length", zap.Error(err))
132		return
133	}
134
135	currentLoad := float64(counterValues[0].Values[processorQueueLength])
136
137	sw.lock.Lock()
138	defer sw.lock.Unlock()
139	sw.loadAvg1m = sw.loadAvg1m*loadAvgFactor1m + currentLoad*(1-loadAvgFactor1m)
140	sw.loadAvg5m = sw.loadAvg5m*loadAvgFactor5m + currentLoad*(1-loadAvgFactor5m)
141	sw.loadAvg15m = sw.loadAvg15m*loadAvgFactor15m + currentLoad*(1-loadAvgFactor15m)
142}
143
144func stopSampling(_ context.Context) error {
145	startupLock.Lock()
146	defer startupLock.Unlock()
147
148	// only stop sampling if all load scrapers have been closed
149	scraperCount--
150	if scraperCount > 0 {
151		return nil
152	}
153
154	close(samplerInstance.done)
155	return nil
156}
157
158func getSampledLoadAverages() (*load.AvgStat, error) {
159	samplerInstance.lock.RLock()
160	defer samplerInstance.lock.RUnlock()
161
162	avgStat := &load.AvgStat{
163		Load1:  samplerInstance.loadAvg1m,
164		Load5:  samplerInstance.loadAvg5m,
165		Load15: samplerInstance.loadAvg15m,
166	}
167
168	return avgStat, nil
169}
170