1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
4 
5 using System.Diagnostics;
6 using System.Globalization;
7 
8 namespace System.Threading
9 {
10     internal partial class ClrThreadPool
11     {
12         /// <summary>
13         /// Hill climbing algorithm used for determining the number of threads needed for the thread pool.
14         /// </summary>
15         private partial class HillClimbing
16         {
            // Lazily created shared instance. The second constructor argument (isThreadSafe: true) makes the
            // Lazy<T> initialization thread-safe, so CreateHillClimber is run at most once.
            private static readonly Lazy<HillClimbing> s_threadPoolHillClimber = new Lazy<HillClimbing>(CreateHillClimber, true);

            /// <summary>
            /// Gets the shared <see cref="HillClimbing"/> instance, creating it on first access.
            /// </summary>
            public static HillClimbing ThreadPoolHillClimber => s_threadPoolHillClimber.Value;

            // Default bounds, in milliseconds, for the randomized sample interval. They are passed as the defaults
            // to the configuration lookup in CreateHillClimber, and the constructor falls back to them when the
            // supplied low bound is greater than the supplied high bound.
            private const int DefaultSampleIntervalMsLow = 10;
            private const int DefaultSampleIntervalMsHigh = 200;
22 
CreateHillClimber()23             private static HillClimbing CreateHillClimber()
24             {
25                 // Default values pulled from CoreCLR
26                 return new HillClimbing(wavePeriod: AppContextConfigHelper.GetInt32Config("HillClimbing_WavePeriod", 4),
27                     maxWaveMagnitude: AppContextConfigHelper.GetInt32Config("HillClimbing_MaxWaveMagnitude", 20),
28                     waveMagnitudeMultiplier: AppContextConfigHelper.GetInt32Config("HillClimbing_WaveMagnitudeMultiplier", 100) / 100.0,
29                     waveHistorySize: AppContextConfigHelper.GetInt32Config("HillClimbing_WaveHistorySize", 8),
30                     targetThroughputRatio: AppContextConfigHelper.GetInt32Config("HillClimbing_Bias", 15) / 100.0,
31                     targetSignalToNoiseRatio: AppContextConfigHelper.GetInt32Config("HillClimbing_TargetSignalToNoiseRatio", 300) / 100.0,
32                     maxChangePerSecond: AppContextConfigHelper.GetInt32Config("HillClimbing_MaxChangePerSecond", 4),
33                     maxChangePerSample: AppContextConfigHelper.GetInt32Config("HillClimbing_MaxChangePerSample", 20),
34                     sampleIntervalMsLow: AppContextConfigHelper.GetInt32Config("HillClimbing_SampleIntervalLow", DefaultSampleIntervalMsLow, allowNegative: false),
35                     sampleIntervalMsHigh: AppContextConfigHelper.GetInt32Config("HillClimbing_SampleIntervalHigh", DefaultSampleIntervalMsHigh, allowNegative: false),
36                     errorSmoothingFactor: AppContextConfigHelper.GetInt32Config("HillClimbing_ErrorSmoothingFactor", 1) / 100.0,
37                     gainExponent: AppContextConfigHelper.GetInt32Config("HillClimbing_GainExponent", 200) / 100.0,
38                     maxSampleError: AppContextConfigHelper.GetInt32Config("HillClimbing_MaxSampleErrorPercent", 15) / 100.0
39                 );
40             }
            // Number of entries in the circular transition log (_log).
            private const int LogCapacity = 200;
42 
            /// <summary>
            /// Identifies the state or transition associated with a thread count change. The value is stored in the
            /// transition log and passed (cast to int) to the event source by LogTransition.
            /// </summary>
            public enum StateOrTransition
            {
                Warmup, // Initial state of each Update call, kept until enough samples are available for analysis
                Initializing, // Passed by Update when the thread count was changed without the hill climber being told
                RandomMove,
                ClimbingMove, // Set by Update when the thread wave component is non-zero
                ChangePoint,
                Stabilizing, // Set by Update when the thread wave component is zero
                Starvation, // Used as a message from the thread pool for a forced transition
                ThreadTimedOut, // Used as a message from the thread pool for a forced transition
            }
54 
            /// <summary>
            /// One record in the circular transition log, filled in by LogTransition.
            /// </summary>
            private struct LogEntry
            {
                public int tickCount; // Environment.TickCount at the time the entry was written
                public StateOrTransition stateOrTransition; // state/transition reported for the change
                public int newControlSetting; // the new thread count
                public int lastHistoryCount; // sample history length, rounded down to a whole multiple of the wave period
                public double lastHistoryMean; // throughput value passed to LogTransition
            }
63 
            //
            // Tuning parameters, fixed at construction time.
            //
            private readonly int _wavePeriod; // period, in samples, of the thread-count square wave and of the frequency that is measured
            private readonly int _samplesToMeasure; // wavePeriod * waveHistorySize; length of the sample history arrays
            private readonly double _targetThroughputRatio; // scales the thread wave that is subtracted from the throughput wave when computing the ratio
            private readonly double _targetSignalToNoiseRatio; // used for the confidence calculation and the wave magnitude
            private readonly double _maxChangePerSecond; // multiplied by the sample duration to get the gain applied to a move
            private readonly double _maxChangePerSample; // upper bound on a single move
            private readonly int _maxThreadWaveMagnitude; // upper bound on the thread wave magnitude
            private readonly int _sampleIntervalMsLow; // inclusive lower bound for the randomized sample interval
            private readonly double _threadMagnitudeMultiplier; // factor applied when computing the thread wave magnitude
            private readonly int _sampleIntervalMsHigh; // inclusive upper bound for the randomized sample interval
            private readonly double _throughputErrorSmoothingFactor; // weight of the newest estimate in the noise moving average
            private readonly double _gainExponent; // exponent of the non-linear gain applied to a move
            private readonly double _maxSampleError; // threshold for (threadCount - 1) / numCompletions above which a sample is accumulated further

            //
            // Mutable algorithm state.
            //
            private double _currentControlSetting; // base thread count to which the square wave is added
            private long _totalSamples; // number of samples recorded in the history so far
            private int _lastThreadCount; // thread count recorded by the last ChangeThreadCount call
            private double _averageThroughputNoise; // moving average of the throughput error estimate
            private double _secondsElapsedSinceLastChange; // reset by ChangeThreadCount
            private double _completionsSinceLastChange; // reset by ChangeThreadCount
            private int _accumulatedCompletionCount; // completions carried over from samples that were not accurate enough
            private double _accumulatedSampleDurationSeconds; // duration carried over from samples that were not accurate enough
            private double[] _samples; // circular history of throughput samples, indexed by _totalSamples % _samplesToMeasure
            private double[] _threadCounts; // circular history of thread counts, parallel to _samples
            private int _currentSampleMs; // current sample interval, re-randomized on every thread count change

            private Random _randomIntervalGenerator = new Random();

            //
            // Circular log of thread count transitions: _logStart is the index of the oldest entry and
            // _logSize the number of valid entries.
            //
            private LogEntry[] _log = new LogEntry[LogCapacity];
            private int _logStart = 0;
            private int _logSize = 0;
95 
HillClimbing(int wavePeriod, int maxWaveMagnitude, double waveMagnitudeMultiplier, int waveHistorySize, double targetThroughputRatio, double targetSignalToNoiseRatio, double maxChangePerSecond, double maxChangePerSample, int sampleIntervalMsLow, int sampleIntervalMsHigh, double errorSmoothingFactor, double gainExponent, double maxSampleError)96             public HillClimbing(int wavePeriod, int maxWaveMagnitude, double waveMagnitudeMultiplier, int waveHistorySize, double targetThroughputRatio,
97                 double targetSignalToNoiseRatio, double maxChangePerSecond, double maxChangePerSample, int sampleIntervalMsLow, int sampleIntervalMsHigh,
98                 double errorSmoothingFactor, double gainExponent, double maxSampleError)
99             {
100                 _wavePeriod = wavePeriod;
101                 _maxThreadWaveMagnitude = maxWaveMagnitude;
102                 _threadMagnitudeMultiplier = waveMagnitudeMultiplier;
103                 _samplesToMeasure = wavePeriod * waveHistorySize;
104                 _targetThroughputRatio = targetThroughputRatio;
105                 _targetSignalToNoiseRatio = targetSignalToNoiseRatio;
106                 _maxChangePerSecond = maxChangePerSecond;
107                 _maxChangePerSample = maxChangePerSample;
108                 if (sampleIntervalMsLow <= sampleIntervalMsHigh)
109                 {
110                     _sampleIntervalMsLow = sampleIntervalMsLow;
111                     _sampleIntervalMsHigh = sampleIntervalMsHigh;
112                 }
113                 else
114                 {
115                     _sampleIntervalMsLow = DefaultSampleIntervalMsLow;
116                     _sampleIntervalMsHigh = DefaultSampleIntervalMsHigh;
117                 }
118                 _throughputErrorSmoothingFactor = errorSmoothingFactor;
119                 _gainExponent = gainExponent;
120                 _maxSampleError = maxSampleError;
121 
122                 _samples = new double[_samplesToMeasure];
123                 _threadCounts = new double[_samplesToMeasure];
124 
125                 _currentSampleMs = _randomIntervalGenerator.Next(_sampleIntervalMsLow, _sampleIntervalMsHigh + 1);
126             }
127 
Update(int currentThreadCount, double sampleDurationSeconds, int numCompletions)128             public (int newThreadCount, int newSampleMs) Update(int currentThreadCount, double sampleDurationSeconds, int numCompletions)
129             {
130 
131                 //
132                 // If someone changed the thread count without telling us, update our records accordingly.
133                 //
134                 if (currentThreadCount != _lastThreadCount)
135                     ForceChange(currentThreadCount, StateOrTransition.Initializing);
136 
137                 //
138                 // Update the cumulative stats for this thread count
139                 //
140                 _secondsElapsedSinceLastChange += sampleDurationSeconds;
141                 _completionsSinceLastChange += numCompletions;
142 
143                 //
144                 // Add in any data we've already collected about this sample
145                 //
146                 sampleDurationSeconds += _accumulatedSampleDurationSeconds;
147                 numCompletions += _accumulatedCompletionCount;
148 
149                 //
150                 // We need to make sure we're collecting reasonably accurate data.  Since we're just counting the end
151                 // of each work item, we are goinng to be missing some data about what really happened during the
152                 // sample interval.  The count produced by each thread includes an initial work item that may have
153                 // started well before the start of the interval, and each thread may have been running some new
154                 // work item for some time before the end of the interval, which did not yet get counted.  So
155                 // our count is going to be off by +/- threadCount workitems.
156                 //
157                 // The exception is that the thread that reported to us last time definitely wasn't running any work
158                 // at that time, and the thread that's reporting now definitely isn't running a work item now.  So
159                 // we really only need to consider threadCount-1 threads.
160                 //
161                 // Thus the percent error in our count is +/- (threadCount-1)/numCompletions.
162                 //
163                 // We cannot rely on the frequency-domain analysis we'll be doing later to filter out this error, because
164                 // of the way it accumulates over time.  If this sample is off by, say, 33% in the negative direction,
165                 // then the next one likely will be too.  The one after that will include the sum of the completions
166                 // we missed in the previous samples, and so will be 33% positive.  So every three samples we'll have
167                 // two "low" samples and one "high" sample.  This will appear as periodic variation right in the frequency
168                 // range we're targeting, which will not be filtered by the frequency-domain translation.
169                 //
170                 if (_totalSamples > 0 && ((currentThreadCount - 1.0) / numCompletions) >= _maxSampleError)
171                 {
172                     // not accurate enough yet.  Let's accumulate the data so far, and tell the ThreadPool
173                     // to collect a little more.
174                     _accumulatedSampleDurationSeconds = sampleDurationSeconds;
175                     _accumulatedCompletionCount = numCompletions;
176                     return (currentThreadCount, 10);
177                 }
178 
179                 //
180                 // We've got enouugh data for our sample; reset our accumulators for next time.
181                 //
182                 _accumulatedSampleDurationSeconds = 0;
183                 _accumulatedCompletionCount = 0;
184 
185                 //
186                 // Add the current thread count and throughput sample to our history
187                 //
188                 double throughput = numCompletions / sampleDurationSeconds;
189 
190                 ClrThreadPoolEventSource.Log.WorkerThreadAdjustmentSample(throughput);
191 
192                 int sampleIndex = (int)(_totalSamples % _samplesToMeasure);
193                 _samples[sampleIndex] = throughput;
194                 _threadCounts[sampleIndex] = currentThreadCount;
195                 _totalSamples++;
196 
197                 //
198                 // Set up defaults for our metrics
199                 //
200                 Complex threadWaveComponent = default(Complex);
201                 Complex throughputWaveComponent = default(Complex);
202                 double throughputErrorEstimate = 0;
203                 Complex ratio = default(Complex);
204                 double confidence = 0;
205 
206                 StateOrTransition state = StateOrTransition.Warmup;
207 
208                 //
209                 // How many samples will we use?  It must be at least the three wave periods we're looking for, and it must also be a whole
210                 // multiple of the primary wave's period; otherwise the frequency we're looking for will fall between two  frequency bands
211                 // in the Fourier analysis, and we won't be able to measure it accurately.
212                 //
213                 int sampleCount = ((int)Math.Min(_totalSamples - 1, _samplesToMeasure)) / _wavePeriod * _wavePeriod;
214 
215                 if (sampleCount > _wavePeriod)
216                 {
217                     //
218                     // Average the throughput and thread count samples, so we can scale the wave magnitudes later.
219                     //
220                     double sampleSum = 0;
221                     double threadSum = 0;
222                     for (int i = 0; i < sampleCount; i++)
223                     {
224                         sampleSum += _samples[(_totalSamples - sampleCount + i) % _samplesToMeasure];
225                         threadSum += _threadCounts[(_totalSamples - sampleCount + i) % _samplesToMeasure];
226                     }
227                     double averageThroughput = sampleSum / sampleCount;
228                     double averageThreadCount = threadSum / sampleCount;
229 
230                     if (averageThroughput > 0 && averageThreadCount > 0)
231                     {
232                         //
233                         // Calculate the periods of the adjacent frequency bands we'll be using to measure noise levels.
234                         // We want the two adjacent Fourier frequency bands.
235                         //
236                         double adjacentPeriod1 = sampleCount / (((double)sampleCount / _wavePeriod) + 1);
237                         double adjacentPeriod2 = sampleCount / (((double)sampleCount / _wavePeriod) - 1);
238 
239                         //
240                         // Get the the three different frequency components of the throughput (scaled by average
241                         // throughput).  Our "error" estimate (the amount of noise that might be present in the
242                         // frequency band we're really interested in) is the average of the adjacent bands.
243                         //
244                         throughputWaveComponent = GetWaveComponent(_samples, sampleCount, _wavePeriod) / averageThroughput;
245                         throughputErrorEstimate = (GetWaveComponent(_samples, sampleCount, adjacentPeriod1) / averageThroughput).Abs();
246                         if (adjacentPeriod2 <= sampleCount)
247                         {
248                             throughputErrorEstimate = Math.Max(throughputErrorEstimate, (GetWaveComponent(_samples, sampleCount, adjacentPeriod2) / averageThroughput).Abs());
249                         }
250 
251                         //
252                         // Do the same for the thread counts, so we have something to compare to.  We don't measure thread count
253                         // noise, because there is none; these are exact measurements.
254                         //
255                         threadWaveComponent = GetWaveComponent(_threadCounts, sampleCount, _wavePeriod) / averageThreadCount;
256 
257                         //
258                         // Update our moving average of the throughput noise.  We'll use this later as feedback to
259                         // determine the new size of the thread wave.
260                         //
261                         if (_averageThroughputNoise == 0)
262                             _averageThroughputNoise = throughputErrorEstimate;
263                         else
264                             _averageThroughputNoise = (_throughputErrorSmoothingFactor * throughputErrorEstimate) + ((1.0 - _throughputErrorSmoothingFactor) * _averageThroughputNoise);
265 
266                         if (threadWaveComponent.Abs() > 0)
267                         {
268                             //
269                             // Adjust the throughput wave so it's centered around the target wave, and then calculate the adjusted throughput/thread ratio.
270                             //
271                             ratio = (throughputWaveComponent - (_targetThroughputRatio * threadWaveComponent)) / threadWaveComponent;
272                             state = StateOrTransition.ClimbingMove;
273                         }
274                         else
275                         {
276                             ratio = new Complex(0, 0);
277                             state = StateOrTransition.Stabilizing;
278                         }
279 
280                         //
281                         // Calculate how confident we are in the ratio.  More noise == less confident.  This has
282                         // the effect of slowing down movements that might be affected by random noise.
283                         //
284                         double noiseForConfidence = Math.Max(_averageThroughputNoise, throughputErrorEstimate);
285                         if (noiseForConfidence > 0)
286                             confidence = (threadWaveComponent.Abs() / noiseForConfidence) / _targetSignalToNoiseRatio;
287                         else
288                             confidence = 1.0; //there is no noise!
289 
290                     }
291                 }
292 
293                 //
294                 // We use just the real part of the complex ratio we just calculated.  If the throughput signal
295                 // is exactly in phase with the thread signal, this will be the same as taking the magnitude of
296                 // the complex move and moving that far up.  If they're 180 degrees out of phase, we'll move
297                 // backward (because this indicates that our changes are having the opposite of the intended effect).
298                 // If they're 90 degrees out of phase, we won't move at all, because we can't tell wether we're
299                 // having a negative or positive effect on throughput.
300                 //
301                 double move = Math.Min(1.0, Math.Max(-1.0, ratio.Real));
302 
303                 //
304                 // Apply our confidence multiplier.
305                 //
306                 move *= Math.Min(1.0, Math.Max(0.0, confidence));
307 
308                 //
309                 // Now apply non-linear gain, such that values around zero are attenuated, while higher values
310                 // are enhanced.  This allows us to move quickly if we're far away from the target, but more slowly
311                 // if we're getting close, giving us rapid ramp-up without wild oscillations around the target.
312                 //
313                 double gain = _maxChangePerSecond * sampleDurationSeconds;
314                 move = Math.Pow(Math.Abs(move), _gainExponent) * (move >= 0.0 ? 1 : -1) * gain;
315                 move = Math.Min(move, _maxChangePerSample);
316 
317                 //
318                 // If the result was positive, and CPU is > 95%, refuse the move.
319                 //
320                 if (move > 0.0 && ThreadPoolInstance._cpuUtilization > CpuUtilizationHigh)
321                     move = 0.0;
322 
323                 //
324                 // Apply the move to our control setting
325                 //
326                 _currentControlSetting += move;
327 
328                 //
329                 // Calculate the new thread wave magnitude, which is based on the moving average we've been keeping of
330                 // the throughput error.  This average starts at zero, so we'll start with a nice safe little wave at first.
331                 //
332                 int newThreadWaveMagnitude = (int)(0.5 + (_currentControlSetting * _averageThroughputNoise * _targetSignalToNoiseRatio * _threadMagnitudeMultiplier * 2.0));
333                 newThreadWaveMagnitude = Math.Min(newThreadWaveMagnitude, _maxThreadWaveMagnitude);
334                 newThreadWaveMagnitude = Math.Max(newThreadWaveMagnitude, 1);
335 
336                 //
337                 // Make sure our control setting is within the ThreadPool's limits
338                 //
339                 int maxThreads = ThreadPoolInstance._maxThreads;
340                 int minThreads = ThreadPoolInstance._minThreads;
341 
342                 _currentControlSetting = Math.Min(maxThreads - newThreadWaveMagnitude, _currentControlSetting);
343                 _currentControlSetting = Math.Max(minThreads, _currentControlSetting);
344 
345                 //
346                 // Calculate the new thread count (control setting + square wave)
347                 //
348                 int newThreadCount = (int)(_currentControlSetting + newThreadWaveMagnitude * ((_totalSamples / (_wavePeriod / 2)) % 2));
349 
350                 //
351                 // Make sure the new thread count doesn't exceed the ThreadPool's limits
352                 //
353                 newThreadCount = Math.Min(maxThreads, newThreadCount);
354                 newThreadCount = Math.Max(minThreads, newThreadCount);
355 
356                 //
357                 // Record these numbers for posterity
358                 //
359 
360                 ClrThreadPoolEventSource.Log.WorkerThreadAdjustmentStats(sampleDurationSeconds, throughput, threadWaveComponent.Real, throughputWaveComponent.Real,
361                     throughputErrorEstimate, _averageThroughputNoise, ratio.Real, confidence, _currentControlSetting, (ushort)newThreadWaveMagnitude);
362 
363 
364                 //
365                 // If all of this caused an actual change in thread count, log that as well.
366                 //
367                 if (newThreadCount != currentThreadCount)
368                     ChangeThreadCount(newThreadCount, state);
369 
370                 //
371                 // Return the new thread count and sample interval.  This is randomized to prevent correlations with other periodic
372                 // changes in throughput.  Among other things, this prevents us from getting confused by Hill Climbing instances
373                 // running in other processes.
374                 //
375                 // If we're at minThreads, and we seem to be hurting performance by going higher, we can't go any lower to fix this.  So
376                 // we'll simply stay at minThreads much longer, and only occasionally try a higher value.
377                 //
378                 int newSampleInterval;
379                 if (ratio.Real < 0.0 && newThreadCount == minThreads)
380                     newSampleInterval = (int)(0.5 + _currentSampleMs * (10.0 * Math.Max(-ratio.Real, 1.0)));
381                 else
382                     newSampleInterval = _currentSampleMs;
383 
384                 return (newThreadCount, newSampleInterval);
385             }
386 
ChangeThreadCount(int newThreadCount, StateOrTransition state)387             private void ChangeThreadCount(int newThreadCount, StateOrTransition state)
388             {
389                 _lastThreadCount = newThreadCount;
390                 _currentSampleMs = _randomIntervalGenerator.Next(_sampleIntervalMsLow, _sampleIntervalMsHigh + 1);
391                 double throughput = _secondsElapsedSinceLastChange > 0 ? _completionsSinceLastChange / _secondsElapsedSinceLastChange : 0;
392                 LogTransition(newThreadCount, throughput, state);
393                 _secondsElapsedSinceLastChange = 0;
394                 _completionsSinceLastChange = 0;
395             }
396 
LogTransition(int newThreadCount, double throughput, StateOrTransition stateOrTransition)397             private void LogTransition(int newThreadCount, double throughput, StateOrTransition stateOrTransition)
398             {
399                 // Use the _log array as a circular array for log entries
400                 int index = (_logStart + _logSize) % LogCapacity;
401 
402                 if(_logSize == LogCapacity)
403                 {
404                     _logStart = (_logStart + 1) % LogCapacity;
405                     _logSize--; // hide this slot while we update it
406                 }
407 
408                 ref LogEntry entry = ref _log[index];
409 
410                 entry.tickCount = Environment.TickCount;
411                 entry.stateOrTransition = stateOrTransition;
412                 entry.newControlSetting = newThreadCount;
413                 entry.lastHistoryCount = ((int)Math.Min(_totalSamples, _samplesToMeasure) / _wavePeriod) * _wavePeriod;
414                 entry.lastHistoryMean = throughput;
415 
416                 _logSize++;
417 
418                 ClrThreadPoolEventSource.Log.WorkerThreadAdjustmentAdjustment(throughput, newThreadCount, (int)stateOrTransition);
419             }
420 
ForceChange(int newThreadCount, StateOrTransition state)421             public void ForceChange(int newThreadCount, StateOrTransition state)
422             {
423                 if(_lastThreadCount != newThreadCount)
424                 {
425                     _currentControlSetting += newThreadCount - _lastThreadCount;
426                     ChangeThreadCount(newThreadCount, state);
427                 }
428             }
429 
GetWaveComponent(double[] samples, int numSamples, double period)430             private Complex GetWaveComponent(double[] samples, int numSamples, double period)
431             {
432                 Debug.Assert(numSamples >= period); // can't measure a wave that doesn't fit
433                 Debug.Assert(period >= 2); // can't measure above the Nyquist frequency
434                 Debug.Assert(numSamples <= samples.Length); // can't measure more samples than we have
435 
436                 //
437                 // Calculate the sinusoid with the given period.
438                 // We're using the Goertzel algorithm for this.  See http://en.wikipedia.org/wiki/Goertzel_algorithm.
439                 //
440 
441                 double w = 2 * Math.PI / period;
442                 double cos = Math.Cos(w);
443                 double coeff = 2 * cos;
444                 double q0 = 0, q1 = 0, q2 = 0;
445                 for(int i = 0; i < numSamples; ++i)
446                 {
447                     q0 = coeff * q1 - q2 + samples[(_totalSamples - numSamples + i) % _samplesToMeasure];
448                     q2 = q1;
449                     q1 = q0;
450                 }
451                 return new Complex(q1 - q2 * cos, q2 * Math.Sin(w)) / numSamples;
452             }
453         }
454     }
455 }
456