1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
4 
5 using System.Globalization;
6 using Internal.LowLevelLinq;
7 using Internal.Runtime.Augments;
8 
9 namespace System.Threading
10 {
11     internal partial class ClrThreadPool
12     {
13         /// <summary>
        /// The worker thread infrastructure for the CLR thread pool.
15         /// </summary>
16         private static class WorkerThread
17         {
18             /// <summary>
19             /// Semaphore for controlling how many threads are currently working.
20             /// </summary>
21             private static LowLevelLifoSemaphore s_semaphore = new LowLevelLifoSemaphore(0, MaxPossibleThreadCount);
22 
WorkerThreadStart()23             private static void WorkerThreadStart()
24             {
25                 ClrThreadPoolEventSource.Log.WorkerThreadStart(ThreadCounts.VolatileReadCounts(ref ThreadPoolInstance._separated.counts).numExistingThreads);
26                 RuntimeThread currentThread = RuntimeThread.CurrentThread;
27                 while (true)
28                 {
29                     while (WaitForRequest())
30                     {
31                         if (TakeActiveRequest())
32                         {
33                             Volatile.Write(ref ThreadPoolInstance._separated.lastDequeueTime, Environment.TickCount);
34                             if (ThreadPoolWorkQueue.Dispatch())
35                             {
36                                 // If the queue runs out of work for us, we need to update the number of working workers to reflect that we are done working for now
37                                 RemoveWorkingWorker();
38                             }
39                         }
40                         else
41                         {
42                             // If we woke up but couldn't find a request, we need to update the number of working workers to reflect that we are done working for now
43                             RemoveWorkingWorker();
44                         }
45                     }
46 
47                     ThreadPoolInstance._hillClimbingThreadAdjustmentLock.Acquire();
48                     try
49                     {
50                         // At this point, the thread's wait timed out. We are shutting down this thread.
51                         // We are going to decrement the number of exisiting threads to no longer include this one
52                         // and then change the max number of threads in the thread pool to reflect that we don't need as many
53                         // as we had. Finally, we are going to tell hill climbing that we changed the max number of threads.
54                         ThreadCounts counts = ThreadCounts.VolatileReadCounts(ref ThreadPoolInstance._separated.counts);
55                         while (true)
56                         {
57                             if (counts.numExistingThreads == counts.numProcessingWork)
58                             {
59                                 // In this case, enough work came in that this thread should not time out and should go back to work.
60                                 break;
61                             }
62 
63                             ThreadCounts newCounts = counts;
64                             newCounts.numExistingThreads--;
65                             newCounts.numThreadsGoal = Math.Max(ThreadPoolInstance._minThreads, Math.Min(newCounts.numExistingThreads, newCounts.numThreadsGoal));
66                             ThreadCounts oldCounts = ThreadCounts.CompareExchangeCounts(ref ThreadPoolInstance._separated.counts, newCounts, counts);
67                             if (oldCounts == counts)
68                             {
69                                 HillClimbing.ThreadPoolHillClimber.ForceChange(newCounts.numThreadsGoal, HillClimbing.StateOrTransition.ThreadTimedOut);
70                                 ClrThreadPoolEventSource.Log.WorkerThreadStop(newCounts.numExistingThreads);
71                                 return;
72                             }
73                         }
74                     }
75                     finally
76                     {
77                         ThreadPoolInstance._hillClimbingThreadAdjustmentLock.Release();
78                     }
79                 }
80             }
81 
82             /// <summary>
83             /// Waits for a request to work.
84             /// </summary>
85             /// <returns>If this thread was woken up before it timed out.</returns>
WaitForRequest()86             private static bool WaitForRequest()
87             {
88                 ClrThreadPoolEventSource.Log.WorkerThreadWait(ThreadCounts.VolatileReadCounts(ref ThreadPoolInstance._separated.counts).numExistingThreads);
89                 return s_semaphore.Wait(ThreadPoolThreadTimeoutMs);
90             }
91 
92             /// <summary>
93             /// Reduce the number of working workers by one, but maybe add back a worker (possibily this thread) if a thread request comes in while we are marking this thread as not working.
94             /// </summary>
RemoveWorkingWorker()95             private static void RemoveWorkingWorker()
96             {
97                 ThreadCounts currentCounts = ThreadCounts.VolatileReadCounts(ref ThreadPoolInstance._separated.counts);
98                 while (true)
99                 {
100                     ThreadCounts newCounts = currentCounts;
101                     newCounts.numProcessingWork--;
102                     ThreadCounts oldCounts = ThreadCounts.CompareExchangeCounts(ref ThreadPoolInstance._separated.counts, newCounts, currentCounts);
103 
104                     if (oldCounts == currentCounts)
105                     {
106                         break;
107                     }
108                     currentCounts = oldCounts;
109                 }
110 
111                 // It's possible that we decided we had thread requests just before a request came in,
112                 // but reduced the worker count *after* the request came in.  In this case, we might
113                 // miss the notification of a thread request.  So we wake up a thread (maybe this one!)
114                 // if there is work to do.
115                 if (ThreadPoolInstance._numRequestedWorkers > 0)
116                 {
117                     MaybeAddWorkingWorker();
118                 }
119             }
120 
MaybeAddWorkingWorker()121             internal static void MaybeAddWorkingWorker()
122             {
123                 ThreadCounts counts = ThreadCounts.VolatileReadCounts(ref ThreadPoolInstance._separated.counts);
124                 ThreadCounts newCounts;
125                 while (true)
126                 {
127                     newCounts = counts;
128                     newCounts.numProcessingWork = Math.Max(counts.numProcessingWork, Math.Min((short)(counts.numProcessingWork + 1), counts.numThreadsGoal));
129                     newCounts.numExistingThreads = Math.Max(counts.numExistingThreads, newCounts.numProcessingWork);
130 
131                     if (newCounts == counts)
132                     {
133                         return;
134                     }
135 
136                     ThreadCounts oldCounts = ThreadCounts.CompareExchangeCounts(ref ThreadPoolInstance._separated.counts, newCounts, counts);
137 
138                     if (oldCounts == counts)
139                     {
140                         break;
141                     }
142 
143                     counts = oldCounts;
144                 }
145 
146                 int toCreate = newCounts.numExistingThreads - counts.numExistingThreads;
147                 int toRelease = newCounts.numProcessingWork - counts.numProcessingWork;
148 
149                 if (toRelease > 0)
150                 {
151                     s_semaphore.Release(toRelease);
152                 }
153 
154                 while (toCreate > 0)
155                 {
156                     if (TryCreateWorkerThread())
157                     {
158                         toCreate--;
159                     }
160                     else
161                     {
162                         counts = ThreadCounts.VolatileReadCounts(ref ThreadPoolInstance._separated.counts);
163                         while (true)
164                         {
165                             newCounts = counts;
166                             newCounts.numProcessingWork -= (short)toCreate;
167                             newCounts.numExistingThreads -= (short)toCreate;
168 
169                             ThreadCounts oldCounts = ThreadCounts.CompareExchangeCounts(ref ThreadPoolInstance._separated.counts, newCounts, counts);
170                             if(oldCounts == counts)
171                             {
172                                 break;
173                             }
174                             counts = oldCounts;
175                         }
176                         toCreate = 0;
177                     }
178                 }
179             }
180 
181             /// <summary>
182             /// Returns if the current thread should stop processing work on the thread pool.
183             /// A thread should stop processing work on the thread pool when work remains only when
184             /// there are more worker threads in the thread pool than we currently want.
185             /// </summary>
186             /// <returns>Whether or not this thread should stop processing work even if there is still work in the queue.</returns>
ShouldStopProcessingWorkNow()187             internal static bool ShouldStopProcessingWorkNow()
188             {
189                 ThreadCounts counts = ThreadCounts.VolatileReadCounts(ref ThreadPoolInstance._separated.counts);
190                 while (true)
191                 {
192                     if (counts.numExistingThreads <= counts.numThreadsGoal)
193                     {
194                         return false;
195                     }
196 
197                     ThreadCounts newCounts = counts;
198                     newCounts.numProcessingWork--;
199 
200                     ThreadCounts oldCounts = ThreadCounts.CompareExchangeCounts(ref ThreadPoolInstance._separated.counts, newCounts, counts);
201 
202                     if (oldCounts == counts)
203                     {
204                         return true;
205                     }
206                     counts = oldCounts;
207                 }
208             }
209 
TakeActiveRequest()210             private static bool TakeActiveRequest()
211             {
212                 int count = ThreadPoolInstance._numRequestedWorkers;
213                 while (count > 0)
214                 {
215                     int prevCount = Interlocked.CompareExchange(ref ThreadPoolInstance._numRequestedWorkers, count - 1, count);
216                     if (prevCount == count)
217                     {
218                         return true;
219                     }
220                     count = prevCount;
221                 }
222                 return false;
223             }
224 
TryCreateWorkerThread()225             private static bool TryCreateWorkerThread()
226             {
227                 try
228                 {
229                     RuntimeThread workerThread = RuntimeThread.Create(WorkerThreadStart);
230                     workerThread.IsThreadPoolThread = true;
231                     workerThread.IsBackground = true;
232                     workerThread.Start();
233                 }
234                 catch (ThreadStartException)
235                 {
236                     return false;
237                 }
238                 catch (OutOfMemoryException)
239                 {
240                     return false;
241                 }
242                 return true;
243             }
244         }
245     }
246 }
247