// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Diagnostics;
using System.Globalization;

namespace System.Threading
{
    internal partial class ClrThreadPool
    {
        /// <summary>
        /// Hill climbing algorithm used for determining the number of threads needed for the thread pool.
        /// </summary>
        /// <remarks>
        /// The algorithm superimposes a square wave on the thread count. It then uses frequency-domain
        /// (Goertzel) analysis of the recorded throughput to decide whether adding threads helps or hurts.
        /// </remarks>
        private partial class HillClimbing
        {
            private static readonly Lazy<HillClimbing> s_threadPoolHillClimber = new Lazy<HillClimbing>(CreateHillClimber, true);

            /// <summary>The lazily created, shared hill climber instance.</summary>
            public static HillClimbing ThreadPoolHillClimber => s_threadPoolHillClimber.Value;

            private const int DefaultSampleIntervalMsLow = 10;
            private const int DefaultSampleIntervalMsHigh = 200;

            // Fallbacks used when the configured wave shape cannot be analyzed (see the constructor).
            private const int DefaultWavePeriod = 4;
            private const int DefaultWaveHistorySize = 8;

            /// <summary>
            /// Builds the shared hill climber from the "HillClimbing_*" AppContext settings.
            /// Percentage-valued settings are converted to fractions by dividing by 100.
            /// </summary>
            private static HillClimbing CreateHillClimber()
            {
                // Default values pulled from CoreCLR
                return new HillClimbing(wavePeriod: AppContextConfigHelper.GetInt32Config("HillClimbing_WavePeriod", DefaultWavePeriod),
                    maxWaveMagnitude: AppContextConfigHelper.GetInt32Config("HillClimbing_MaxWaveMagnitude", 20),
                    waveMagnitudeMultiplier: AppContextConfigHelper.GetInt32Config("HillClimbing_WaveMagnitudeMultiplier", 100) / 100.0,
                    waveHistorySize: AppContextConfigHelper.GetInt32Config("HillClimbing_WaveHistorySize", DefaultWaveHistorySize),
                    targetThroughputRatio: AppContextConfigHelper.GetInt32Config("HillClimbing_Bias", 15) / 100.0,
                    targetSignalToNoiseRatio: AppContextConfigHelper.GetInt32Config("HillClimbing_TargetSignalToNoiseRatio", 300) / 100.0,
                    maxChangePerSecond: AppContextConfigHelper.GetInt32Config("HillClimbing_MaxChangePerSecond", 4),
                    maxChangePerSample: AppContextConfigHelper.GetInt32Config("HillClimbing_MaxChangePerSample", 20),
                    sampleIntervalMsLow: AppContextConfigHelper.GetInt32Config("HillClimbing_SampleIntervalLow", DefaultSampleIntervalMsLow, allowNegative: false),
                    sampleIntervalMsHigh: AppContextConfigHelper.GetInt32Config("HillClimbing_SampleIntervalHigh", DefaultSampleIntervalMsHigh, allowNegative: false),
                    errorSmoothingFactor: AppContextConfigHelper.GetInt32Config("HillClimbing_ErrorSmoothingFactor", 1) / 100.0,
                    gainExponent: AppContextConfigHelper.GetInt32Config("HillClimbing_GainExponent", 200) / 100.0,
                    maxSampleError: AppContextConfigHelper.GetInt32Config("HillClimbing_MaxSampleErrorPercent", 15) / 100.0
                );
            }

            // Number of entries kept in the circular transition log (_log).
            private const int LogCapacity = 200;

            public enum StateOrTransition
            {
                Warmup,
                Initializing,
                RandomMove,
                ClimbingMove,
                ChangePoint,
                Stabilizing,
                Starvation, // Used as a message from the thread pool for a forced transition
                ThreadTimedOut, // Used as a message from the thread pool for a forced transition
            }

            /// <summary>One recorded thread-count transition (written by <see cref="LogTransition"/>).</summary>
            private struct LogEntry
            {
                public int tickCount;
                public StateOrTransition stateOrTransition;
                public int newControlSetting;
                public int lastHistoryCount;
                public double lastHistoryMean;
            }

            private readonly int _wavePeriod;
            private readonly int _samplesToMeasure;
            private readonly double _targetThroughputRatio;
            private readonly double _targetSignalToNoiseRatio;
            private readonly double _maxChangePerSecond;
            private readonly double _maxChangePerSample;
            private readonly int _maxThreadWaveMagnitude;
            private readonly int _sampleIntervalMsLow;
            private readonly double _threadMagnitudeMultiplier;
            private readonly int _sampleIntervalMsHigh;
            private readonly double _throughputErrorSmoothingFactor;
            private readonly double _gainExponent;
            private readonly double _maxSampleError;

            private double _currentControlSetting;
            private long _totalSamples;
            private int _lastThreadCount;
            private double _averageThroughputNoise;
            private double _secondsElapsedSinceLastChange;
            private double _completionsSinceLastChange;
            private int _accumulatedCompletionCount;
            private double _accumulatedSampleDurationSeconds;
            private double[] _samples;        // circular history of throughput samples, indexed by _totalSamples % _samplesToMeasure
            private double[] _threadCounts;   // thread count that was active for the matching entry in _samples
            private int _currentSampleMs;

            private Random _randomIntervalGenerator = new Random();

            // Circular log of transitions: _logStart is the oldest entry, _logSize the number of valid entries.
            private LogEntry[] _log = new LogEntry[LogCapacity];
            private int _logStart = 0;
            private int _logSize = 0;

            /// <summary>
            /// Creates a hill climber with explicit tuning parameters.
            /// </summary>
            /// <param name="wavePeriod">Period, in samples, of the square wave applied to the thread count. Values below 2 fall back to <see cref="DefaultWavePeriod"/>.</param>
            /// <param name="maxWaveMagnitude">Upper bound on the wave magnitude, in threads.</param>
            /// <param name="waveMagnitudeMultiplier">Scale factor applied when computing the wave magnitude.</param>
            /// <param name="waveHistorySize">Number of wave periods of history to keep. Values below 1 fall back to <see cref="DefaultWaveHistorySize"/>.</param>
            /// <param name="targetThroughputRatio">Throughput/thread ratio the wave analysis is centered around.</param>
            /// <param name="targetSignalToNoiseRatio">Signal-to-noise ratio at which a move is taken with full confidence.</param>
            /// <param name="maxChangePerSecond">Maximum control-setting change per second of sample time.</param>
            /// <param name="maxChangePerSample">Cap applied to a single move.</param>
            /// <param name="sampleIntervalMsLow">Lower bound of the randomized sample interval.</param>
            /// <param name="sampleIntervalMsHigh">Upper bound (inclusive) of the randomized sample interval.</param>
            /// <param name="errorSmoothingFactor">Weight of the newest error estimate in the moving average of throughput noise.</param>
            /// <param name="gainExponent">Exponent of the non-linear gain applied to each move.</param>
            /// <param name="maxSampleError">Maximum tolerated relative counting error before a sample is accepted.</param>
            public HillClimbing(int wavePeriod, int maxWaveMagnitude, double waveMagnitudeMultiplier, int waveHistorySize, double targetThroughputRatio,
                double targetSignalToNoiseRatio, double maxChangePerSecond, double maxChangePerSample, int sampleIntervalMsLow, int sampleIntervalMsHigh,
                double errorSmoothingFactor, double gainExponent, double maxSampleError)
            {
                // The thread-count square wave flips every _wavePeriod / 2 samples (see Update), and the Goertzel
                // analysis cannot measure a period below 2 samples. A smaller configured value would later throw
                // DivideByZeroException, so fall back to the default (mirroring the sample-interval fallback below).
                if (wavePeriod < 2)
                {
                    wavePeriod = DefaultWavePeriod;
                }

                // The history buffers hold wavePeriod * waveHistorySize samples and are indexed modulo that size,
                // so they must be non-empty.
                if (waveHistorySize < 1)
                {
                    waveHistorySize = DefaultWaveHistorySize;
                }

                _wavePeriod = wavePeriod;
                _maxThreadWaveMagnitude = maxWaveMagnitude;
                _threadMagnitudeMultiplier = waveMagnitudeMultiplier;
                _samplesToMeasure = wavePeriod * waveHistorySize;
                _targetThroughputRatio = targetThroughputRatio;
                _targetSignalToNoiseRatio = targetSignalToNoiseRatio;
                _maxChangePerSecond = maxChangePerSecond;
                _maxChangePerSample = maxChangePerSample;
                if (sampleIntervalMsLow <= sampleIntervalMsHigh)
                {
                    _sampleIntervalMsLow = sampleIntervalMsLow;
                    _sampleIntervalMsHigh = sampleIntervalMsHigh;
                }
                else
                {
                    _sampleIntervalMsLow = DefaultSampleIntervalMsLow;
                    _sampleIntervalMsHigh = DefaultSampleIntervalMsHigh;
                }
                _throughputErrorSmoothingFactor = errorSmoothingFactor;
                _gainExponent = gainExponent;
                _maxSampleError = maxSampleError;

                _samples = new double[_samplesToMeasure];
                _threadCounts = new double[_samplesToMeasure];

                _currentSampleMs = _randomIntervalGenerator.Next(_sampleIntervalMsLow, _sampleIntervalMsHigh + 1);
            }

            /// <summary>
            /// Feeds one throughput sample into the algorithm and computes the next thread count.
            /// </summary>
            /// <param name="currentThreadCount">The thread count that was in effect during the sample.</param>
            /// <param name="sampleDurationSeconds">Length of the sample interval, in seconds.</param>
            /// <param name="numCompletions">Number of work items completed during the sample interval.</param>
            /// <returns>
            /// The thread count to use next, and the sample interval in milliseconds. When the sample is too noisy
            /// to use, the current thread count is returned with a short 10 ms interval so more data is accumulated.
            /// </returns>
            public (int newThreadCount, int newSampleMs) Update(int currentThreadCount, double sampleDurationSeconds, int numCompletions)
            {

                //
                // If someone changed the thread count without telling us, update our records accordingly.
                //
                if (currentThreadCount != _lastThreadCount)
                    ForceChange(currentThreadCount, StateOrTransition.Initializing);

                //
                // Update the cumulative stats for this thread count
                //
                _secondsElapsedSinceLastChange += sampleDurationSeconds;
                _completionsSinceLastChange += numCompletions;

                //
                // Add in any data we've already collected about this sample
                //
                sampleDurationSeconds += _accumulatedSampleDurationSeconds;
                numCompletions += _accumulatedCompletionCount;

                //
                // We need to make sure we're collecting reasonably accurate data. Since we're just counting the end
                // of each work item, we are going to be missing some data about what really happened during the
                // sample interval. The count produced by each thread includes an initial work item that may have
                // started well before the start of the interval, and each thread may have been running some new
                // work item for some time before the end of the interval, which did not yet get counted. So
                // our count is going to be off by +/- threadCount workitems.
                //
                // The exception is that the thread that reported to us last time definitely wasn't running any work
                // at that time, and the thread that's reporting now definitely isn't running a work item now. So
                // we really only need to consider threadCount-1 threads.
                //
                // Thus the percent error in our count is +/- (threadCount-1)/numCompletions.
                //
                // We cannot rely on the frequency-domain analysis we'll be doing later to filter out this error, because
                // of the way it accumulates over time. If this sample is off by, say, 33% in the negative direction,
                // then the next one likely will be too. The one after that will include the sum of the completions
                // we missed in the previous samples, and so will be 33% positive. So every three samples we'll have
                // two "low" samples and one "high" sample. This will appear as periodic variation right in the frequency
                // range we're targeting, which will not be filtered by the frequency-domain translation.
                //
                if (_totalSamples > 0 && ((currentThreadCount - 1.0) / numCompletions) >= _maxSampleError)
                {
                    // not accurate enough yet. Let's accumulate the data so far, and tell the ThreadPool
                    // to collect a little more.
                    _accumulatedSampleDurationSeconds = sampleDurationSeconds;
                    _accumulatedCompletionCount = numCompletions;
                    return (currentThreadCount, 10);
                }

                //
                // We've got enough data for our sample; reset our accumulators for next time.
                //
                _accumulatedSampleDurationSeconds = 0;
                _accumulatedCompletionCount = 0;

                //
                // Add the current thread count and throughput sample to our history
                //
                double throughput = numCompletions / sampleDurationSeconds;

                ClrThreadPoolEventSource.Log.WorkerThreadAdjustmentSample(throughput);

                int sampleIndex = (int)(_totalSamples % _samplesToMeasure);
                _samples[sampleIndex] = throughput;
                _threadCounts[sampleIndex] = currentThreadCount;
                _totalSamples++;

                //
                // Set up defaults for our metrics
                //
                Complex threadWaveComponent = default(Complex);
                Complex throughputWaveComponent = default(Complex);
                double throughputErrorEstimate = 0;
                Complex ratio = default(Complex);
                double confidence = 0;

                StateOrTransition state = StateOrTransition.Warmup;

                //
                // How many samples will we use? It must be at least the three wave periods we're looking for, and it must also be a whole
                // multiple of the primary wave's period; otherwise the frequency we're looking for will fall between two frequency bands
                // in the Fourier analysis, and we won't be able to measure it accurately. The analysis below is skipped until the
                // history spans more than one primary period.
                //
                int sampleCount = ((int)Math.Min(_totalSamples - 1, _samplesToMeasure)) / _wavePeriod * _wavePeriod;

                if (sampleCount > _wavePeriod)
                {
                    //
                    // Average the throughput and thread count samples, so we can scale the wave magnitudes later.
                    //
                    double sampleSum = 0;
                    double threadSum = 0;
                    for (int i = 0; i < sampleCount; i++)
                    {
                        sampleSum += _samples[(_totalSamples - sampleCount + i) % _samplesToMeasure];
                        threadSum += _threadCounts[(_totalSamples - sampleCount + i) % _samplesToMeasure];
                    }
                    double averageThroughput = sampleSum / sampleCount;
                    double averageThreadCount = threadSum / sampleCount;

                    if (averageThroughput > 0 && averageThreadCount > 0)
                    {
                        //
                        // Calculate the periods of the adjacent frequency bands we'll be using to measure noise levels.
                        // We want the two adjacent Fourier frequency bands.
                        //
                        double adjacentPeriod1 = sampleCount / (((double)sampleCount / _wavePeriod) + 1);
                        double adjacentPeriod2 = sampleCount / (((double)sampleCount / _wavePeriod) - 1);

                        //
                        // Get the three different frequency components of the throughput (scaled by average
                        // throughput). Our "error" estimate (the amount of noise that might be present in the
                        // frequency band we're really interested in) is taken from the adjacent bands (the larger
                        // of the two magnitudes).
                        //
                        throughputWaveComponent = GetWaveComponent(_samples, sampleCount, _wavePeriod) / averageThroughput;
                        throughputErrorEstimate = (GetWaveComponent(_samples, sampleCount, adjacentPeriod1) / averageThroughput).Abs();
                        if (adjacentPeriod2 <= sampleCount)
                        {
                            throughputErrorEstimate = Math.Max(throughputErrorEstimate, (GetWaveComponent(_samples, sampleCount, adjacentPeriod2) / averageThroughput).Abs());
                        }

                        //
                        // Do the same for the thread counts, so we have something to compare to. We don't measure thread count
                        // noise, because there is none; these are exact measurements.
                        //
                        threadWaveComponent = GetWaveComponent(_threadCounts, sampleCount, _wavePeriod) / averageThreadCount;

                        //
                        // Update our moving average of the throughput noise. We'll use this later as feedback to
                        // determine the new size of the thread wave.
                        //
                        if (_averageThroughputNoise == 0)
                            _averageThroughputNoise = throughputErrorEstimate;
                        else
                            _averageThroughputNoise = (_throughputErrorSmoothingFactor * throughputErrorEstimate) + ((1.0 - _throughputErrorSmoothingFactor) * _averageThroughputNoise);

                        if (threadWaveComponent.Abs() > 0)
                        {
                            //
                            // Adjust the throughput wave so it's centered around the target wave, and then calculate the adjusted throughput/thread ratio.
                            //
                            ratio = (throughputWaveComponent - (_targetThroughputRatio * threadWaveComponent)) / threadWaveComponent;
                            state = StateOrTransition.ClimbingMove;
                        }
                        else
                        {
                            ratio = new Complex(0, 0);
                            state = StateOrTransition.Stabilizing;
                        }

                        //
                        // Calculate how confident we are in the ratio. More noise == less confident. This has
                        // the effect of slowing down movements that might be affected by random noise.
                        //
                        double noiseForConfidence = Math.Max(_averageThroughputNoise, throughputErrorEstimate);
                        if (noiseForConfidence > 0)
                            confidence = (threadWaveComponent.Abs() / noiseForConfidence) / _targetSignalToNoiseRatio;
                        else
                            confidence = 1.0; //there is no noise!

                    }
                }

                //
                // We use just the real part of the complex ratio we just calculated. If the throughput signal
                // is exactly in phase with the thread signal, this will be the same as taking the magnitude of
                // the complex move and moving that far up. If they're 180 degrees out of phase, we'll move
                // backward (because this indicates that our changes are having the opposite of the intended effect).
                // If they're 90 degrees out of phase, we won't move at all, because we can't tell whether we're
                // having a negative or positive effect on throughput.
                //
                double move = Math.Min(1.0, Math.Max(-1.0, ratio.Real));

                //
                // Apply our confidence multiplier.
                //
                move *= Math.Min(1.0, Math.Max(0.0, confidence));

                //
                // Now apply non-linear gain, such that values around zero are attenuated, while higher values
                // are enhanced. This allows us to move quickly if we're far away from the target, but more slowly
                // if we're getting close, giving us rapid ramp-up without wild oscillations around the target.
                // Note that only upward moves are capped by _maxChangePerSample.
                //
                double gain = _maxChangePerSecond * sampleDurationSeconds;
                move = Math.Pow(Math.Abs(move), _gainExponent) * (move >= 0.0 ? 1 : -1) * gain;
                move = Math.Min(move, _maxChangePerSample);

                //
                // If the result was positive, and CPU utilization is above the CpuUtilizationHigh threshold, refuse the move.
                //
                if (move > 0.0 && ThreadPoolInstance._cpuUtilization > CpuUtilizationHigh)
                    move = 0.0;

                //
                // Apply the move to our control setting
                //
                _currentControlSetting += move;

                //
                // Calculate the new thread wave magnitude, which is based on the moving average we've been keeping of
                // the throughput error. This average starts at zero, so we'll start with a nice safe little wave at first.
                //
                int newThreadWaveMagnitude = (int)(0.5 + (_currentControlSetting * _averageThroughputNoise * _targetSignalToNoiseRatio * _threadMagnitudeMultiplier * 2.0));
                newThreadWaveMagnitude = Math.Min(newThreadWaveMagnitude, _maxThreadWaveMagnitude);
                newThreadWaveMagnitude = Math.Max(newThreadWaveMagnitude, 1);

                //
                // Make sure our control setting is within the ThreadPool's limits
                //
                int maxThreads = ThreadPoolInstance._maxThreads;
                int minThreads = ThreadPoolInstance._minThreads;

                _currentControlSetting = Math.Min(maxThreads - newThreadWaveMagnitude, _currentControlSetting);
                _currentControlSetting = Math.Max(minThreads, _currentControlSetting);

                //
                // Calculate the new thread count (control setting + square wave). The wave is "on" for _wavePeriod / 2
                // samples and "off" for the next _wavePeriod / 2 samples.
                //
                int newThreadCount = (int)(_currentControlSetting + newThreadWaveMagnitude * ((_totalSamples / (_wavePeriod / 2)) % 2));

                //
                // Make sure the new thread count doesn't exceed the ThreadPool's limits
                //
                newThreadCount = Math.Min(maxThreads, newThreadCount);
                newThreadCount = Math.Max(minThreads, newThreadCount);

                //
                // Record these numbers for posterity
                //

                ClrThreadPoolEventSource.Log.WorkerThreadAdjustmentStats(sampleDurationSeconds, throughput, threadWaveComponent.Real, throughputWaveComponent.Real,
                    throughputErrorEstimate, _averageThroughputNoise, ratio.Real, confidence, _currentControlSetting, (ushort)newThreadWaveMagnitude);


                //
                // If all of this caused an actual change in thread count, log that as well.
                //
                if (newThreadCount != currentThreadCount)
                    ChangeThreadCount(newThreadCount, state);

                //
                // Return the new thread count and sample interval. This is randomized to prevent correlations with other periodic
                // changes in throughput. Among other things, this prevents us from getting confused by Hill Climbing instances
                // running in other processes.
                //
                // If we're at minThreads, and we seem to be hurting performance by going higher, we can't go any lower to fix this. So
                // we'll simply stay at minThreads much longer, and only occasionally try a higher value.
                //
                int newSampleInterval;
                if (ratio.Real < 0.0 && newThreadCount == minThreads)
                    newSampleInterval = (int)(0.5 + _currentSampleMs * (10.0 * Math.Max(-ratio.Real, 1.0)));
                else
                    newSampleInterval = _currentSampleMs;

                return (newThreadCount, newSampleInterval);
            }

            /// <summary>
            /// Records that the thread count changed to <paramref name="newThreadCount"/>. It picks a new random sample
            /// interval, logs the transition with the throughput observed since the previous change, and resets the
            /// since-last-change counters.
            /// </summary>
            private void ChangeThreadCount(int newThreadCount, StateOrTransition state)
            {
                _lastThreadCount = newThreadCount;
                _currentSampleMs = _randomIntervalGenerator.Next(_sampleIntervalMsLow, _sampleIntervalMsHigh + 1);
                double throughput = _secondsElapsedSinceLastChange > 0 ? _completionsSinceLastChange / _secondsElapsedSinceLastChange : 0;
                LogTransition(newThreadCount, throughput, state);
                _secondsElapsedSinceLastChange = 0;
                _completionsSinceLastChange = 0;
            }

            /// <summary>
            /// Appends a transition to the circular in-memory log (overwriting the oldest entry once
            /// <see cref="LogCapacity"/> entries exist) and emits the matching event-source event.
            /// </summary>
            private void LogTransition(int newThreadCount, double throughput, StateOrTransition stateOrTransition)
            {
                // Use the _log array as a circular array for log entries
                int index = (_logStart + _logSize) % LogCapacity;

                if (_logSize == LogCapacity)
                {
                    // Full: index == _logStart, so the oldest entry is about to be overwritten; advance the start past it.
                    _logStart = (_logStart + 1) % LogCapacity;
                    _logSize--; // hide this slot while we update it
                }

                ref LogEntry entry = ref _log[index];

                entry.tickCount = Environment.TickCount;
                entry.stateOrTransition = stateOrTransition;
                entry.newControlSetting = newThreadCount;
                entry.lastHistoryCount = ((int)Math.Min(_totalSamples, _samplesToMeasure) / _wavePeriod) * _wavePeriod;
                entry.lastHistoryMean = throughput;

                _logSize++;

                ClrThreadPoolEventSource.Log.WorkerThreadAdjustmentAdjustment(throughput, newThreadCount, (int)stateOrTransition);
            }

            /// <summary>
            /// Applies a thread-count change that did not come from <see cref="Update"/>. The control setting is
            /// shifted by the same delta so the algorithm continues from the new count. Does nothing if the count is unchanged.
            /// </summary>
            public void ForceChange(int newThreadCount, StateOrTransition state)
            {
                if (_lastThreadCount != newThreadCount)
                {
                    _currentControlSetting += newThreadCount - _lastThreadCount;
                    ChangeThreadCount(newThreadCount, state);
                }
            }

            /// <summary>
            /// Measures the component of the most recent <paramref name="numSamples"/> entries of the circular
            /// <paramref name="samples"/> buffer at the given <paramref name="period"/> (in samples), using the
            /// Goertzel algorithm. The result is normalized by the sample count.
            /// </summary>
            private Complex GetWaveComponent(double[] samples, int numSamples, double period)
            {
                Debug.Assert(numSamples >= period); // can't measure a wave that doesn't fit
                Debug.Assert(period >= 2); // can't measure above the Nyquist frequency
                Debug.Assert(numSamples <= samples.Length); // can't measure more samples than we have

                //
                // Calculate the sinusoid with the given period.
                // We're using the Goertzel algorithm for this. See http://en.wikipedia.org/wiki/Goertzel_algorithm.
                //

                double w = 2 * Math.PI / period;
                double cos = Math.Cos(w);
                double coeff = 2 * cos;
                double q0 = 0, q1 = 0, q2 = 0;
                for (int i = 0; i < numSamples; ++i)
                {
                    q0 = coeff * q1 - q2 + samples[(_totalSamples - numSamples + i) % _samplesToMeasure];
                    q2 = q1;
                    q1 = q0;
                }
                return new Complex(q1 - q2 * cos, q2 * Math.Sin(w)) / numSamples;
            }
        }
    }
}