// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Globalization;
using Internal.LowLevelLinq;
using Internal.Runtime.Augments;

namespace System.Threading
{
    internal partial class ClrThreadPool
    {
        /// <summary>
        /// The worker thread infrastructure for the CLR thread pool.
        /// </summary>
        private static class WorkerThread
        {
            /// <summary>
            /// Semaphore for controlling how many threads are currently working.
            /// </summary>
            private static LowLevelLifoSemaphore s_semaphore = new LowLevelLifoSemaphore(0, MaxPossibleThreadCount);

            /// <summary>
            /// Entry point of every worker thread. Repeatedly waits for a request, dispatches work
            /// when a request could be claimed, and retires the thread once a wait times out and
            /// the pool does not need it anymore.
            /// </summary>
            private static void WorkerThreadStart()
            {
                ClrThreadPoolEventSource.Log.WorkerThreadStart(ThreadCounts.VolatileReadCounts(ref ThreadPoolInstance._separated.counts).numExistingThreads);
                // NOTE(review): this local is never read below; presumably the property access is
                // kept for a side effect - confirm before removing it.
                RuntimeThread currentThread = RuntimeThread.CurrentThread;
                while (true)
                {
                    while (WaitForRequest())
                    {
                        if (TakeActiveRequest())
                        {
                            Volatile.Write(ref ThreadPoolInstance._separated.lastDequeueTime, Environment.TickCount);
                            if (ThreadPoolWorkQueue.Dispatch())
                            {
                                // If the queue runs out of work for us, we need to update the number of working workers to reflect that we are done working for now
                                RemoveWorkingWorker();
                            }
                        }
                        else
                        {
                            // If we woke up but couldn't find a request, we need to update the number of working workers to reflect that we are done working for now
                            RemoveWorkingWorker();
                        }
                    }

                    ThreadPoolInstance._hillClimbingThreadAdjustmentLock.Acquire();
                    try
                    {
                        // At this point, the thread's wait timed out. We are shutting down this thread.
                        // We are going to decrement the number of existing threads to no longer include this one
                        // and then change the max number of threads in the thread pool to reflect that we don't need as many
                        // as we had. Finally, we are going to tell hill climbing that we changed the max number of threads.
                        ThreadCounts counts = ThreadCounts.VolatileReadCounts(ref ThreadPoolInstance._separated.counts);
                        while (true)
                        {
                            if (counts.numExistingThreads == counts.numProcessingWork)
                            {
                                // In this case, enough work came in that this thread should not time out and should go back to work.
                                break;
                            }

                            ThreadCounts newCounts = counts;
                            newCounts.numExistingThreads--;
                            newCounts.numThreadsGoal = Math.Max(ThreadPoolInstance._minThreads, Math.Min(newCounts.numExistingThreads, newCounts.numThreadsGoal));
                            ThreadCounts oldCounts = ThreadCounts.CompareExchangeCounts(ref ThreadPoolInstance._separated.counts, newCounts, counts);
                            if (oldCounts == counts)
                            {
                                HillClimbing.ThreadPoolHillClimber.ForceChange(newCounts.numThreadsGoal, HillClimbing.StateOrTransition.ThreadTimedOut);
                                ClrThreadPoolEventSource.Log.WorkerThreadStop(newCounts.numExistingThreads);
                                return;
                            }

                            // The counts changed underneath us. Retry against the value we just observed;
                            // otherwise every retry would compare against the same stale snapshot and the
                            // loop could spin indefinitely while holding the adjustment lock.
                            counts = oldCounts;
                        }
                    }
                    finally
                    {
                        ThreadPoolInstance._hillClimbingThreadAdjustmentLock.Release();
                    }
                }
            }

            /// <summary>
            /// Waits for a request to work.
            /// </summary>
            /// <returns>If this thread was woken up before it timed out.</returns>
            private static bool WaitForRequest()
            {
                ClrThreadPoolEventSource.Log.WorkerThreadWait(ThreadCounts.VolatileReadCounts(ref ThreadPoolInstance._separated.counts).numExistingThreads);
                return s_semaphore.Wait(ThreadPoolThreadTimeoutMs);
            }

            /// <summary>
            /// Reduce the number of working workers by one, but maybe add back a worker (possibly this thread) if a thread request comes in while we are marking this thread as not working.
            /// </summary>
            private static void RemoveWorkingWorker()
            {
                ThreadCounts currentCounts = ThreadCounts.VolatileReadCounts(ref ThreadPoolInstance._separated.counts);
                while (true)
                {
                    ThreadCounts newCounts = currentCounts;
                    newCounts.numProcessingWork--;
                    ThreadCounts oldCounts = ThreadCounts.CompareExchangeCounts(ref ThreadPoolInstance._separated.counts, newCounts, currentCounts);

                    if (oldCounts == currentCounts)
                    {
                        break;
                    }
                    currentCounts = oldCounts;
                }

                // It's possible that we decided we had thread requests just before a request came in,
                // but reduced the worker count *after* the request came in. In this case, we might
                // miss the notification of a thread request. So we wake up a thread (maybe this one!)
                // if there is work to do.
                if (ThreadPoolInstance._numRequestedWorkers > 0)
                {
                    MaybeAddWorkingWorker();
                }
            }

            /// <summary>
            /// Raises the number of processing workers by one when the thread goal allows it,
            /// releasing the semaphore for the added worker and creating a new thread if the
            /// existing-thread count had to grow to cover it. Does nothing when the counts
            /// would not change.
            /// </summary>
            internal static void MaybeAddWorkingWorker()
            {
                ThreadCounts counts = ThreadCounts.VolatileReadCounts(ref ThreadPoolInstance._separated.counts);
                ThreadCounts newCounts;
                while (true)
                {
                    newCounts = counts;
                    // Add at most one processing worker, never exceeding the goal and never going below the current value.
                    newCounts.numProcessingWork = Math.Max(counts.numProcessingWork, Math.Min((short)(counts.numProcessingWork + 1), counts.numThreadsGoal));
                    newCounts.numExistingThreads = Math.Max(counts.numExistingThreads, newCounts.numProcessingWork);

                    if (newCounts == counts)
                    {
                        return;
                    }

                    ThreadCounts oldCounts = ThreadCounts.CompareExchangeCounts(ref ThreadPoolInstance._separated.counts, newCounts, counts);

                    if (oldCounts == counts)
                    {
                        break;
                    }

                    counts = oldCounts;
                }

                int toCreate = newCounts.numExistingThreads - counts.numExistingThreads;
                int toRelease = newCounts.numProcessingWork - counts.numProcessingWork;

                if (toRelease > 0)
                {
                    s_semaphore.Release(toRelease);
                }

                while (toCreate > 0)
                {
                    if (TryCreateWorkerThread())
                    {
                        toCreate--;
                    }
                    else
                    {
                        // Thread creation failed: take the threads we could not create back out of both counts.
                        counts = ThreadCounts.VolatileReadCounts(ref ThreadPoolInstance._separated.counts);
                        while (true)
                        {
                            newCounts = counts;
                            newCounts.numProcessingWork -= (short)toCreate;
                            newCounts.numExistingThreads -= (short)toCreate;

                            ThreadCounts oldCounts = ThreadCounts.CompareExchangeCounts(ref ThreadPoolInstance._separated.counts, newCounts, counts);
                            if (oldCounts == counts)
                            {
                                break;
                            }
                            counts = oldCounts;
                        }
                        toCreate = 0;
                    }
                }
            }

            /// <summary>
            /// Returns if the current thread should stop processing work on the thread pool.
            /// A thread should stop processing work on the thread pool when work remains only when
            /// there are more worker threads in the thread pool than we currently want.
            /// </summary>
            /// <returns>Whether or not this thread should stop processing work even if there is still work in the queue.</returns>
            internal static bool ShouldStopProcessingWorkNow()
            {
                ThreadCounts counts = ThreadCounts.VolatileReadCounts(ref ThreadPoolInstance._separated.counts);
                while (true)
                {
                    if (counts.numExistingThreads <= counts.numThreadsGoal)
                    {
                        return false;
                    }

                    // Too many threads exist: mark this one as no longer processing work before telling it to stop.
                    ThreadCounts newCounts = counts;
                    newCounts.numProcessingWork--;

                    ThreadCounts oldCounts = ThreadCounts.CompareExchangeCounts(ref ThreadPoolInstance._separated.counts, newCounts, counts);

                    if (oldCounts == counts)
                    {
                        return true;
                    }
                    counts = oldCounts;
                }
            }

            /// <summary>
            /// Tries to claim one outstanding worker request by decrementing
            /// <c>_numRequestedWorkers</c> with a compare-exchange loop.
            /// </summary>
            /// <returns><c>true</c> if a request was claimed; <c>false</c> if the request count was observed to be zero or less.</returns>
            private static bool TakeActiveRequest()
            {
                int count = ThreadPoolInstance._numRequestedWorkers;
                while (count > 0)
                {
                    int prevCount = Interlocked.CompareExchange(ref ThreadPoolInstance._numRequestedWorkers, count - 1, count);
                    if (prevCount == count)
                    {
                        return true;
                    }
                    count = prevCount;
                }
                return false;
            }

            /// <summary>
            /// Creates and starts a new background thread-pool thread running <see cref="WorkerThreadStart"/>.
            /// </summary>
            /// <returns>
            /// <c>true</c> if the thread was started; <c>false</c> if a <see cref="ThreadStartException"/>
            /// or <see cref="OutOfMemoryException"/> was thrown while creating or starting it.
            /// </returns>
            private static bool TryCreateWorkerThread()
            {
                try
                {
                    RuntimeThread workerThread = RuntimeThread.Create(WorkerThreadStart);
                    workerThread.IsThreadPoolThread = true;
                    workerThread.IsBackground = true;
                    workerThread.Start();
                }
                catch (ThreadStartException)
                {
                    return false;
                }
                catch (OutOfMemoryException)
                {
                    return false;
                }
                return true;
            }
        }
    }
}