// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
//
//
// A pair of schedulers that together support concurrent (reader) / exclusive (writer)
// task scheduling.  The exclusive scheduler alone can be used to simulate a serial
// processing queue, and the concurrent scheduler alone, with a specified
// MaximumConcurrencyLevel, can be used to achieve a MaxDegreeOfParallelism across
// a bunch of tasks, parallel loops, dataflow blocks, etc.
//
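// Illustrative usage (a minimal sketch, not part of this file; ReadSharedState and
// MutateSharedState stand in for arbitrary user code, and the awaiting method is assumed
// to be async):
//
//     var pair = new ConcurrentExclusiveSchedulerPair();
//
//     // "Reader" work: may run concurrently with other concurrent tasks on the pair.
//     Task reader = Task.Factory.StartNew(ReadSharedState,
//         CancellationToken.None, TaskCreationOptions.None, pair.ConcurrentScheduler);
//
//     // "Writer" work: runs exclusively; no other task from this pair runs alongside it.
//     Task writer = Task.Factory.StartNew(MutateSharedState,
//         CancellationToken.None, TaskCreationOptions.None, pair.ExclusiveScheduler);
//
//     pair.Complete();          // optional: signal that no more work will be queued
//     await pair.Completion;    // completes once all queued work has finished
//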
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;

namespace System.Threading.Tasks
{
    /// <summary>
    /// Provides concurrent and exclusive task schedulers that coordinate to execute
    /// tasks while ensuring that concurrent tasks may run concurrently and exclusive tasks never do.
    /// </summary>
    [DebuggerDisplay("Concurrent={ConcurrentTaskCountForDebugger}, Exclusive={ExclusiveTaskCountForDebugger}, Mode={ModeForDebugger}")]
    [DebuggerTypeProxy(typeof(ConcurrentExclusiveSchedulerPair.DebugView))]
    public class ConcurrentExclusiveSchedulerPair
    {
        /// <summary>A processing mode to denote what kinds of tasks are currently being processed on this thread.</summary>
        private readonly ThreadLocal<ProcessingMode> m_threadProcessingMode = new ThreadLocal<ProcessingMode>();
        /// <summary>The scheduler used to queue and execute "concurrent" tasks that may run concurrently with other concurrent tasks.</summary>
        private readonly ConcurrentExclusiveTaskScheduler m_concurrentTaskScheduler;
        /// <summary>The scheduler used to queue and execute "exclusive" tasks that must run exclusively while no other tasks for this pair are running.</summary>
        private readonly ConcurrentExclusiveTaskScheduler m_exclusiveTaskScheduler;
        /// <summary>The underlying task scheduler to which all work should be scheduled.</summary>
        private readonly TaskScheduler m_underlyingTaskScheduler;
        /// <summary>
        /// The maximum number of tasks allowed to run concurrently.  This only applies to concurrent tasks,
        /// since exclusive tasks are inherently limited to 1.
        /// </summary>
        private readonly int m_maxConcurrencyLevel;
        /// <summary>The maximum number of tasks we can process before recycling our runner tasks.</summary>
        private readonly int m_maxItemsPerTask;
        /// <summary>
        /// If positive, it represents the number of concurrently running concurrent tasks.
        /// If negative, it means an exclusive task has been scheduled.
        /// If 0, nothing has been scheduled.
        /// </summary>
        private int m_processingCount;
        /// <summary>Completion state for a task representing the completion of this pair.</summary>
        /// <remarks>Lazily-initialized only if the scheduler pair is shutting down or if the Completion is requested.</remarks>
        private CompletionState m_completionState;

        /// <summary>A constant value used to signal unlimited processing.</summary>
        private const int UNLIMITED_PROCESSING = -1;
        /// <summary>Constant used for m_processingCount to indicate that an exclusive task is being processed.</summary>
        private const int EXCLUSIVE_PROCESSING_SENTINEL = -1;
        /// <summary>Default MaxItemsPerTask to use for processing if none is specified.</summary>
        private const int DEFAULT_MAXITEMSPERTASK = UNLIMITED_PROCESSING;
        /// <summary>Default MaxConcurrencyLevel is the processor count if not otherwise specified.</summary>
        private static Int32 DefaultMaxConcurrencyLevel { get { return Environment.ProcessorCount; } }

        /// <summary>Gets the sync obj used to protect all state on this instance.</summary>
        private readonly Lock ValueLock = new Lock();

        /// <summary>
        /// Initializes the ConcurrentExclusiveSchedulerPair.
        /// </summary>
        public ConcurrentExclusiveSchedulerPair() :
            this(TaskScheduler.Default, DefaultMaxConcurrencyLevel, DEFAULT_MAXITEMSPERTASK)
        { }

        /// <summary>
        /// Initializes the ConcurrentExclusiveSchedulerPair to target the specified scheduler.
        /// </summary>
        /// <param name="taskScheduler">The target scheduler on which this pair should execute.</param>
        public ConcurrentExclusiveSchedulerPair(TaskScheduler taskScheduler) :
            this(taskScheduler, DefaultMaxConcurrencyLevel, DEFAULT_MAXITEMSPERTASK)
        { }

        /// <summary>
        /// Initializes the ConcurrentExclusiveSchedulerPair to target the specified scheduler with a maximum concurrency level.
        /// </summary>
        /// <param name="taskScheduler">The target scheduler on which this pair should execute.</param>
        /// <param name="maxConcurrencyLevel">The maximum number of tasks to run concurrently.</param>
        public ConcurrentExclusiveSchedulerPair(TaskScheduler taskScheduler, int maxConcurrencyLevel) :
            this(taskScheduler, maxConcurrencyLevel, DEFAULT_MAXITEMSPERTASK)
        { }

        /// <summary>
        /// Initializes the ConcurrentExclusiveSchedulerPair to target the specified scheduler with a maximum
        /// concurrency level and a maximum number of scheduled tasks that may be processed as a unit.
        /// </summary>
        /// <param name="taskScheduler">The target scheduler on which this pair should execute.</param>
        /// <param name="maxConcurrencyLevel">The maximum number of tasks to run concurrently.</param>
        /// <param name="maxItemsPerTask">The maximum number of tasks to process for each underlying scheduled task used by the pair.</param>
        public ConcurrentExclusiveSchedulerPair(TaskScheduler taskScheduler, int maxConcurrencyLevel, int maxItemsPerTask)
        {
            // Validate arguments
            if (taskScheduler == null) throw new ArgumentNullException(nameof(taskScheduler));
            if (maxConcurrencyLevel == 0 || maxConcurrencyLevel < -1) throw new ArgumentOutOfRangeException(nameof(maxConcurrencyLevel));
            if (maxItemsPerTask == 0 || maxItemsPerTask < -1) throw new ArgumentOutOfRangeException(nameof(maxItemsPerTask));

            // Store configuration
            m_underlyingTaskScheduler = taskScheduler;
            m_maxConcurrencyLevel = maxConcurrencyLevel;
            m_maxItemsPerTask = maxItemsPerTask;

            // Downgrade to the underlying scheduler's max degree of parallelism if it's lower than the user-supplied level
            int mcl = taskScheduler.MaximumConcurrencyLevel;
            if (mcl > 0 && mcl < m_maxConcurrencyLevel) m_maxConcurrencyLevel = mcl;

            // Treat UNLIMITED_PROCESSING/-1 for both MCL and MIPT as the biggest possible value so that we don't
            // have to special case UNLIMITED_PROCESSING later on in processing.
            if (m_maxConcurrencyLevel == UNLIMITED_PROCESSING) m_maxConcurrencyLevel = Int32.MaxValue;
            if (m_maxItemsPerTask == UNLIMITED_PROCESSING) m_maxItemsPerTask = Int32.MaxValue;

            // Create the concurrent/exclusive schedulers for this pair
            m_exclusiveTaskScheduler = new ConcurrentExclusiveTaskScheduler(this, 1, ProcessingMode.ProcessingExclusiveTask);
            m_concurrentTaskScheduler = new ConcurrentExclusiveTaskScheduler(this, m_maxConcurrencyLevel, ProcessingMode.ProcessingConcurrentTasks);
        }

        /// <summary>Informs the scheduler pair that it should not accept any more tasks.</summary>
        /// <remarks>
        /// Calling <see cref="Complete"/> is optional, and it's only necessary if the <see cref="Completion"/>
        /// will be relied on for notification of all processing being completed.
        /// </remarks>
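        /// <example>
        /// A minimal illustrative sketch (not taken from this file; assumes an async caller):
        /// <code>
        /// var pair = new ConcurrentExclusiveSchedulerPair();
        /// // ... queue work to pair.ConcurrentScheduler and/or pair.ExclusiveScheduler ...
        /// pair.Complete();        // no more tasks may be queued to either scheduler
        /// await pair.Completion;  // completes once all previously queued work has finished
        /// </code>
        /// </example>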
        public void Complete()
        {
            using (LockHolder.Hold(ValueLock))
            {
                if (!CompletionRequested)
                {
                    RequestCompletion();
                    CleanupStateIfCompletingAndQuiesced();
                }
            }
        }

        /// <summary>Gets a <see cref="System.Threading.Tasks.Task"/> that will complete when the scheduler has completed processing.</summary>
        public Task Completion
        {
            // ValueLock not needed, but it's ok if it's held
            get { return EnsureCompletionStateInitialized().Task; }
        }

        /// <summary>Gets the lazily-initialized completion state.</summary>
        private CompletionState EnsureCompletionStateInitialized()
        {
            // ValueLock not needed, but it's ok if it's held
            return LazyInitializer.EnsureInitialized(ref m_completionState, () => new CompletionState());
        }

        /// <summary>Gets whether completion has been requested.</summary>
        private bool CompletionRequested
        {
            // ValueLock not needed, but it's ok if it's held
            get { return m_completionState != null && Volatile.Read(ref m_completionState.m_completionRequested); }
        }

        /// <summary>Sets that completion has been requested.</summary>
        private void RequestCompletion()
        {
            ContractAssertMonitorStatus(ValueLock, held: true);
            EnsureCompletionStateInitialized().m_completionRequested = true;
        }

        /// <summary>
        /// Cleans up state if and only if there's no processing currently happening
        /// and no more to be done later.
        /// </summary>
        private void CleanupStateIfCompletingAndQuiesced()
        {
            ContractAssertMonitorStatus(ValueLock, held: true);
            if (ReadyToComplete) CompleteTaskAsync();
        }

        /// <summary>Gets whether the pair is ready to complete.</summary>
        private bool ReadyToComplete
        {
            get
            {
                ContractAssertMonitorStatus(ValueLock, held: true);

                // We can only complete if completion has been requested and no processing is currently happening.
                if (!CompletionRequested || m_processingCount != 0) return false;

                // Now, only allow shutdown if an exception occurred or if there are no more tasks to process.
                var cs = EnsureCompletionStateInitialized();
                return
                    (cs.m_exceptions != null && cs.m_exceptions.Count > 0) ||
                    (m_concurrentTaskScheduler.m_tasks.IsEmpty && m_exclusiveTaskScheduler.m_tasks.IsEmpty);
            }
        }

        /// <summary>Completes the completion task asynchronously.</summary>
        private void CompleteTaskAsync()
        {
            Debug.Assert(ReadyToComplete, "The block must be ready to complete to be here.");
            ContractAssertMonitorStatus(ValueLock, held: true);

            // Ensure we only try to complete once, then schedule completion
            // in order to escape held locks and the caller's context
            var cs = EnsureCompletionStateInitialized();
            if (!cs.m_completionQueued)
            {
                cs.m_completionQueued = true;
                ThreadPool.QueueUserWorkItem(state =>
                {
                    var localCs = (CompletionState)state; // don't use 'cs', as it'll force a closure
                    Debug.Assert(!localCs.Task.IsCompleted, "Completion should only happen once.");

                    var exceptions = localCs.m_exceptions;
                    bool success = (exceptions != null && exceptions.Count > 0) ?
                        localCs.TrySetException(exceptions) :
                        localCs.TrySetResult(default(VoidTaskResult));
                    Debug.Assert(success, "Expected to complete completion task.");
                }, cs);
            }
        }

        /// <summary>Initiates scheduler shutdown due to a worker task faulting.</summary>
        /// <param name="faultedTask">The faulted worker task that's initiating the shutdown.</param>
        private void FaultWithTask(Task faultedTask)
        {
            Debug.Assert(faultedTask != null && faultedTask.IsFaulted && faultedTask.Exception.InnerExceptions.Count > 0,
                "Needs a task in the faulted state and thus with exceptions.");
            ContractAssertMonitorStatus(ValueLock, held: true);

            // Store the faulted task's exceptions
            var cs = EnsureCompletionStateInitialized();
            if (cs.m_exceptions == null) cs.m_exceptions = new LowLevelListWithIList<Exception>();
            cs.m_exceptions.AddRange(faultedTask.Exception.InnerExceptions);

            // Now that we're doomed, request completion
            RequestCompletion();
        }

        /// <summary>
        /// Gets a TaskScheduler that can be used to schedule tasks to this pair
        /// that may run concurrently with other tasks on this pair.
        /// </summary>
        public TaskScheduler ConcurrentScheduler { get { return m_concurrentTaskScheduler; } }
        /// <summary>
        /// Gets a TaskScheduler that can be used to schedule tasks to this pair
        /// that must run exclusively with regards to other tasks on this pair.
        /// </summary>
        public TaskScheduler ExclusiveScheduler { get { return m_exclusiveTaskScheduler; } }

        /// <summary>Gets the number of tasks waiting to run concurrently.</summary>
        /// <remarks>This does not take the necessary lock, as it's only called from under the debugger.</remarks>
        [SuppressMessage("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode")]
        private int ConcurrentTaskCountForDebugger { get { return m_concurrentTaskScheduler.m_tasks.Count; } }

        /// <summary>Gets the number of tasks waiting to run exclusively.</summary>
        /// <remarks>This does not take the necessary lock, as it's only called from under the debugger.</remarks>
        [SuppressMessage("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode")]
        private int ExclusiveTaskCountForDebugger { get { return m_exclusiveTaskScheduler.m_tasks.Count; } }

        /// <summary>Notifies the pair that new work has arrived to be processed.</summary>
        /// <param name="fairly">Whether tasks should be scheduled fairly with regards to other tasks.</param>
        /// <remarks>Must only be called while holding the lock.</remarks>
        [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
        [SuppressMessage("Microsoft.Performance", "CA1804:RemoveUnusedLocals")]
        [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
        private void ProcessAsyncIfNecessary(bool fairly = false)
        {
            ContractAssertMonitorStatus(ValueLock, held: true);

            // If the current processing count is >= 0, we can potentially launch further processing.
            if (m_processingCount >= 0)
            {
                // We snap whether there are any exclusive tasks or concurrent tasks waiting.
                // (We grab the concurrent count below only once we know we need it.)
                // With processing happening concurrent to this operation, this data may
                // immediately be out of date, but it can only go from non-empty
                // to empty and not the other way around.  As such, this is safe,
                // as worst case is we'll schedule an extra task when we didn't
                // otherwise need to, and we'll just eat its overhead.
                bool exclusiveTasksAreWaiting = !m_exclusiveTaskScheduler.m_tasks.IsEmpty;

                // If there's no processing currently happening but there are waiting exclusive tasks,
                // let's start processing those exclusive tasks.
                Task processingTask = null;
                if (m_processingCount == 0 && exclusiveTasksAreWaiting)
                {
                    // Launch exclusive task processing
                    m_processingCount = EXCLUSIVE_PROCESSING_SENTINEL; // -1
                    try
                    {
                        processingTask = new Task(thisPair => ((ConcurrentExclusiveSchedulerPair)thisPair).ProcessExclusiveTasks(), this,
                            default(CancellationToken), GetCreationOptionsForTask(fairly));
                        processingTask.Start(m_underlyingTaskScheduler);
                        // When we call Start, if the underlying scheduler throws in QueueTask, TPL will fault the task and rethrow
                        // the exception.  To deal with that, we need a reference to the task object, so that we can observe its exception.
                        // Hence, we separate creation and starting, so that we can store a reference to the task before we attempt QueueTask.
                    }
                    catch
                    {
                        m_processingCount = 0;
                        FaultWithTask(processingTask);
                    }
                }
                // If there are no waiting exclusive tasks, there are concurrent tasks, and we haven't reached our maximum
                // concurrency level for processing, let's start processing more concurrent tasks.
                else
                {
                    int concurrentTasksWaitingCount = m_concurrentTaskScheduler.m_tasks.Count;

                    if (concurrentTasksWaitingCount > 0 && !exclusiveTasksAreWaiting && m_processingCount < m_maxConcurrencyLevel)
                    {
                        // Launch concurrent task processing, up to the allowed limit
                        for (int i = 0; i < concurrentTasksWaitingCount && m_processingCount < m_maxConcurrencyLevel; ++i)
                        {
                            ++m_processingCount;
                            try
                            {
                                processingTask = new Task(thisPair => ((ConcurrentExclusiveSchedulerPair)thisPair).ProcessConcurrentTasks(), this,
                                    default(CancellationToken), GetCreationOptionsForTask(fairly));
                                processingTask.Start(m_underlyingTaskScheduler); // See above logic for why we use new + Start rather than StartNew
                            }
                            catch
                            {
                                --m_processingCount;
                                FaultWithTask(processingTask);
                            }
                        }
                    }
                }

                // Check to see if all tasks have completed and if completion has been requested.
                CleanupStateIfCompletingAndQuiesced();
            }
            else Debug.Assert(m_processingCount == EXCLUSIVE_PROCESSING_SENTINEL, "The processing count must be the sentinel if it's not >= 0.");
        }

        /// <summary>
        /// Processes exclusive tasks serially until either there are no more to process
        /// or we've reached our user-specified maximum limit.
        /// </summary>
        private void ProcessExclusiveTasks()
        {
            Debug.Assert(m_processingCount == EXCLUSIVE_PROCESSING_SENTINEL, "Processing exclusive tasks requires being in exclusive mode.");
            Debug.Assert(!m_exclusiveTaskScheduler.m_tasks.IsEmpty, "Processing exclusive tasks requires tasks to be processed.");
            ContractAssertMonitorStatus(ValueLock, held: false);
            try
            {
                // Note that we're processing exclusive tasks on the current thread
                Debug.Assert(m_threadProcessingMode.Value == ProcessingMode.NotCurrentlyProcessing,
                    "This thread should not yet be involved in this pair's processing.");
                m_threadProcessingMode.Value = ProcessingMode.ProcessingExclusiveTask;

                // Process up to the maximum number of items per task allowed
                for (int i = 0; i < m_maxItemsPerTask; i++)
                {
                    // Get the next available exclusive task.  If we can't find one, bail.
                    Task exclusiveTask;
                    if (!m_exclusiveTaskScheduler.m_tasks.TryDequeue(out exclusiveTask)) break;

                    // Execute the task.  If the scheduler was previously faulted,
                    // this task could have been faulted when it was queued; ignore such tasks.
                    if (!exclusiveTask.IsFaulted) m_exclusiveTaskScheduler.ExecuteTask(exclusiveTask);
                }
            }
            finally
            {
                // We're no longer processing exclusive tasks on the current thread
                Debug.Assert(m_threadProcessingMode.Value == ProcessingMode.ProcessingExclusiveTask,
                    "Somehow we ended up escaping exclusive mode.");
                m_threadProcessingMode.Value = ProcessingMode.NotCurrentlyProcessing;

                using (LockHolder.Hold(ValueLock))
                {
                    // When this task was launched, we tracked it by setting m_processingCount to EXCLUSIVE_PROCESSING_SENTINEL.
                    // Now reset it to 0.  Then check to see whether there's more processing to be done.
                    // There might be more concurrent tasks available, for example, if concurrent tasks arrived
                    // after we exited the loop, or if we exited the loop while concurrent tasks were still
                    // available but we hit our maxItemsPerTask limit.
                    Debug.Assert(m_processingCount == EXCLUSIVE_PROCESSING_SENTINEL, "The processing mode should not have deviated from exclusive.");
                    m_processingCount = 0;
                    ProcessAsyncIfNecessary(true);
                }
            }
        }

        /// <summary>
        /// Processes concurrent tasks serially until either there are no more to process,
        /// we've reached our user-specified maximum limit, or exclusive tasks have arrived.
        /// </summary>
        private void ProcessConcurrentTasks()
        {
            Debug.Assert(m_processingCount > 0, "Processing concurrent tasks requires us to be in concurrent mode.");
            ContractAssertMonitorStatus(ValueLock, held: false);
            try
            {
                // Note that we're processing concurrent tasks on the current thread
                Debug.Assert(m_threadProcessingMode.Value == ProcessingMode.NotCurrentlyProcessing,
                    "This thread should not yet be involved in this pair's processing.");
                m_threadProcessingMode.Value = ProcessingMode.ProcessingConcurrentTasks;

                // Process up to the maximum number of items per task allowed
                for (int i = 0; i < m_maxItemsPerTask; i++)
                {
                    // Get the next available concurrent task.  If we can't find one, bail.
                    Task concurrentTask;
                    if (!m_concurrentTaskScheduler.m_tasks.TryDequeue(out concurrentTask)) break;

                    // Execute the task.  If the scheduler was previously faulted,
                    // this task could have been faulted when it was queued; ignore such tasks.
                    if (!concurrentTask.IsFaulted) m_concurrentTaskScheduler.ExecuteTask(concurrentTask);

                    // Now check to see if exclusive tasks have arrived; if any have, they take priority
                    // so we'll bail out here.  Note that we could have checked this condition
                    // in the for loop's condition, but that could lead to extra overhead
                    // in the case where a concurrent task arrives, this task is launched, and then
                    // before entering the loop an exclusive task arrives.  If we didn't execute at
                    // least one task, we would have spent all of the overhead to launch a
                    // task but with none of the benefit.  There's of course also an inherent
                    // race here with regards to exclusive tasks arriving, and we're ok with
                    // executing one more concurrent task than we should before giving priority to exclusive tasks.
                    if (!m_exclusiveTaskScheduler.m_tasks.IsEmpty) break;
                }
            }
            finally
            {
                // We're no longer processing concurrent tasks on the current thread
                Debug.Assert(m_threadProcessingMode.Value == ProcessingMode.ProcessingConcurrentTasks,
                    "Somehow we ended up escaping concurrent mode.");
                m_threadProcessingMode.Value = ProcessingMode.NotCurrentlyProcessing;

                using (LockHolder.Hold(ValueLock))
                {
                    // When this task was launched, we tracked it with a positive processing count;
                    // decrement that count.  Then check to see whether there's more processing to be done.
                    // There might be more concurrent tasks available, for example, if concurrent tasks arrived
                    // after we exited the loop, or if we exited the loop while concurrent tasks were still
                    // available but we hit our maxItemsPerTask limit.
                    Debug.Assert(m_processingCount > 0, "The processing mode should not have deviated from concurrent.");
                    if (m_processingCount > 0) --m_processingCount;
                    ProcessAsyncIfNecessary(true);
                }
            }
        }

        /// <summary>
        /// Holder for lazily-initialized state about the completion of a scheduler pair.
        /// Completion is only triggered either by rare exceptional conditions or by
        /// the user calling Complete, and as such we only lazily initialize this
        /// state in one of those conditions or if the user explicitly asks for
        /// the Completion.
        /// </summary>
        [SuppressMessage("Microsoft.Performance", "CA1812:AvoidUninstantiatedInternalClasses")]
        private sealed class CompletionState : TaskCompletionSource<VoidTaskResult>
        {
            /// <summary>Whether the scheduler has had completion requested.</summary>
            /// <remarks>This variable is not volatile, so to guarantee safe reads, Volatile.Read is used in TryExecuteTaskInline.</remarks>
            internal bool m_completionRequested;
            /// <summary>Whether completion processing has been queued.</summary>
            internal bool m_completionQueued;
            /// <summary>Unrecoverable exceptions incurred while processing.</summary>
            internal LowLevelListWithIList<Exception> m_exceptions;
        }

        /// <summary>
        /// A scheduler shim used to queue tasks to the pair and execute those tasks on request of the pair.
        /// </summary>
        [DebuggerDisplay("Count={CountForDebugger}, MaxConcurrencyLevel={m_maxConcurrencyLevel}, Id={Id}")]
        [DebuggerTypeProxy(typeof(ConcurrentExclusiveTaskScheduler.DebugView))]
        private sealed class ConcurrentExclusiveTaskScheduler : TaskScheduler
        {
            /// <summary>Cached delegate for invoking TryExecuteTaskShim.</summary>
            private static readonly Func<object, bool> s_tryExecuteTaskShim = new Func<object, bool>(TryExecuteTaskShim);
            /// <summary>The parent pair.</summary>
            private readonly ConcurrentExclusiveSchedulerPair m_pair;
            /// <summary>The maximum concurrency level for the scheduler.</summary>
            private readonly int m_maxConcurrencyLevel;
            /// <summary>The processing mode of this scheduler, exclusive or concurrent.</summary>
            private readonly ProcessingMode m_processingMode;
            /// <summary>Gets the queue of tasks for this scheduler.</summary>
            internal readonly IProducerConsumerQueue<Task> m_tasks;

            /// <summary>Initializes the scheduler.</summary>
            /// <param name="pair">The parent pair.</param>
            /// <param name="maxConcurrencyLevel">The maximum degree of concurrency this scheduler may use.</param>
            /// <param name="processingMode">The processing mode of this scheduler.</param>
            internal ConcurrentExclusiveTaskScheduler(ConcurrentExclusiveSchedulerPair pair, int maxConcurrencyLevel, ProcessingMode processingMode)
            {
                Debug.Assert(pair != null, "Scheduler must be associated with a valid pair.");
                Debug.Assert(processingMode == ProcessingMode.ProcessingConcurrentTasks || processingMode == ProcessingMode.ProcessingExclusiveTask,
                    "Scheduler must be for concurrent or exclusive processing.");
                Debug.Assert(
                    (processingMode == ProcessingMode.ProcessingConcurrentTasks && (maxConcurrencyLevel >= 1 || maxConcurrencyLevel == UNLIMITED_PROCESSING)) ||
                    (processingMode == ProcessingMode.ProcessingExclusiveTask && maxConcurrencyLevel == 1),
                    "If we're in concurrent mode, our concurrency level should be positive or unlimited.  If exclusive, it should be 1.");

                m_pair = pair;
                m_maxConcurrencyLevel = maxConcurrencyLevel;
                m_processingMode = processingMode;
                m_tasks = (processingMode == ProcessingMode.ProcessingExclusiveTask) ?
                    (IProducerConsumerQueue<Task>)new SingleProducerSingleConsumerQueue<Task>() :
                    (IProducerConsumerQueue<Task>)new MultiProducerMultiConsumerQueue<Task>();
            }

            /// <summary>Gets the maximum concurrency level this scheduler is able to support.</summary>
            public override int MaximumConcurrencyLevel { get { return m_maxConcurrencyLevel; } }

            /// <summary>Queues a task to the scheduler.</summary>
            /// <param name="task">The task to be queued.</param>
            protected internal override void QueueTask(Task task)
            {
                Debug.Assert(task != null, "Infrastructure should have provided a non-null task.");
                using (LockHolder.Hold(m_pair.ValueLock))
                {
                    // If the scheduler has already had completion requested, no new work is allowed to be scheduled
                    if (m_pair.CompletionRequested) throw new InvalidOperationException(GetType().ToString());

                    // Queue the task, and then let the pair know that more work is now available to be scheduled
                    m_tasks.Enqueue(task);
                    m_pair.ProcessAsyncIfNecessary();
                }
            }

            /// <summary>Executes a task on this scheduler.</summary>
            /// <param name="task">The task to be executed.</param>
            internal void ExecuteTask(Task task)
            {
                Debug.Assert(task != null, "Infrastructure should have provided a non-null task.");
                base.TryExecuteTask(task);
            }

            /// <summary>Tries to execute the task synchronously on this scheduler.</summary>
            /// <param name="task">The task to execute.</param>
            /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued to the scheduler.</param>
            /// <returns>true if the task could be executed; otherwise, false.</returns>
            protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
            {
                Debug.Assert(task != null, "Infrastructure should have provided a non-null task.");

                // If the scheduler has had completion requested, no new work is allowed to be scheduled.
                // A non-locked read on m_completionRequested (in CompletionRequested) is acceptable here because:
                // a) we don't need to be exact... a Complete call could come in later in the function anyway
                // b) this is only a fast path escape hatch.  To actually inline the task,
                //    we need to be inside of an already executing task, and in such a case,
                //    while completion may have been requested, we can't have shutdown yet.
                if (!taskWasPreviouslyQueued && m_pair.CompletionRequested) return false;

                // We know the implementation of the default scheduler and how it will behave.
                // As it's the most common underlying scheduler, we optimize for it.
                bool isDefaultScheduler = m_pair.m_underlyingTaskScheduler == TaskScheduler.Default;

                // If we're targeting the default scheduler and taskWasPreviouslyQueued is true,
                // we know that the default scheduler will only allow it to be inlined
                // if we're on a thread pool thread (but it won't always allow it in that case,
                // since it'll only allow inlining if it can find the task in the local queue).
                // As such, if we're not on a thread pool thread, we know for sure the
                // task won't be inlined, so let's not even try.
                if (isDefaultScheduler && taskWasPreviouslyQueued && !ThreadPool.IsThreadPoolThread)
                {
                    return false;
                }
                else
                {
                    // If a task is already running on this thread, allow inline execution to proceed.
                    // If there's already a task from this scheduler running on the current thread, we know it's safe
                    // to run this task, in effect temporarily taking that task's count allocation.
                    if (m_pair.m_threadProcessingMode.Value == m_processingMode)
                    {
                        // If we're targeting the default scheduler and taskWasPreviouslyQueued is false,
                        // we know the default scheduler will allow it, so we can just execute it here.
                        // Otherwise, delegate to the target scheduler's inlining.
                        return (isDefaultScheduler && !taskWasPreviouslyQueued) ?
                            TryExecuteTask(task) :
                            TryExecuteTaskInlineOnTargetScheduler(task);
                    }
                }

                // We're not in the context of a task already executing on this scheduler.  Bail.
                return false;
            }

            /// <summary>
            /// Implements a reasonable approximation for TryExecuteTaskInline on the underlying scheduler,
            /// which we can't call directly on the underlying scheduler.
            /// </summary>
            /// <param name="task">The task to execute inline if possible.</param>
            /// <returns>true if the task was inlined successfully; otherwise, false.</returns>
            [SuppressMessage("Microsoft.Performance", "CA1804:RemoveUnusedLocals", MessageId = "ignored")]
            private bool TryExecuteTaskInlineOnTargetScheduler(Task task)
            {
                // We'd like to simply call TryExecuteTaskInline here, but we can't.
                // As there's no built-in API for this, a workaround is to create a new task that,
                // when executed, will simply call TryExecuteTask to run the real task, and then
                // we run our new shim task synchronously on the target scheduler.  If all goes well,
                // our synchronous invocation will succeed in running the shim task on the current thread,
                // which will in turn run the real task on the current thread.  If the scheduler
                // doesn't allow that execution, RunSynchronously will block until the underlying scheduler
                // is able to invoke the task, which might account for an additional but unavoidable delay.
                // Once it's done, we can return whether the task executed by returning the
                // shim task's Result, which is in turn the result of TryExecuteTask.
                var t = new Task<bool>(s_tryExecuteTaskShim, Tuple.Create(this, task));
                try
                {
                    t.RunSynchronously(m_pair.m_underlyingTaskScheduler);
                    return t.Result;
                }
                catch
                {
                    Debug.Assert(t.IsFaulted, "Task should be faulted due to the scheduler faulting it and throwing the exception.");
                    var ignored = t.Exception;
                    throw;
                }
            }

            /// <summary>Shim used to invoke this.TryExecuteTask(task).</summary>
            /// <param name="state">A tuple of the ConcurrentExclusiveTaskScheduler and the task to execute.</param>
            /// <returns>true if the task was successfully inlined; otherwise, false.</returns>
            /// <remarks>
            /// This method is separated out not because of performance reasons but so that
            /// the SecuritySafeCritical attribute may be employed.
            /// </remarks>
            private static bool TryExecuteTaskShim(object state)
            {
                var tuple = (Tuple<ConcurrentExclusiveTaskScheduler, Task>)state;
                return tuple.Item1.TryExecuteTask(tuple.Item2);
            }

            /// <summary>Gets for debugging purposes the tasks scheduled to this scheduler.</summary>
            /// <returns>An enumerable of the tasks queued.</returns>
            protected override IEnumerable<Task> GetScheduledTasks() { return m_tasks; }

            /// <summary>Gets the number of tasks queued to this scheduler.</summary>
            [SuppressMessage("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode")]
            private int CountForDebugger { get { return m_tasks.Count; } }

            /// <summary>Provides a debug view for ConcurrentExclusiveTaskScheduler.</summary>
            private sealed class DebugView
            {
                /// <summary>The scheduler being debugged.</summary>
                private readonly ConcurrentExclusiveTaskScheduler m_taskScheduler;

                /// <summary>Initializes the debug view.</summary>
                /// <param name="scheduler">The scheduler being debugged.</param>
                public DebugView(ConcurrentExclusiveTaskScheduler scheduler)
                {
                    Debug.Assert(scheduler != null, "Need a scheduler with which to construct the debug view.");
                    m_taskScheduler = scheduler;
                }

                /// <summary>Gets this pair's maximum allowed concurrency level.</summary>
                public int MaximumConcurrencyLevel { get { return m_taskScheduler.m_maxConcurrencyLevel; } }
                /// <summary>Gets the tasks scheduled to this scheduler.</summary>
                public IEnumerable<Task> ScheduledTasks { get { return m_taskScheduler.m_tasks; } }
                /// <summary>Gets the scheduler pair with which this scheduler is associated.</summary>
                public ConcurrentExclusiveSchedulerPair SchedulerPair { get { return m_taskScheduler.m_pair; } }
            }
        }

        /// <summary>Provides a debug view for ConcurrentExclusiveSchedulerPair.</summary>
        private sealed class DebugView
        {
            /// <summary>The pair being debugged.</summary>
            private readonly ConcurrentExclusiveSchedulerPair m_pair;

            /// <summary>Initializes the debug view.</summary>
            /// <param name="pair">The pair being debugged.</param>
            public DebugView(ConcurrentExclusiveSchedulerPair pair)
            {
                Debug.Assert(pair != null, "Need a pair with which to construct the debug view.");
                m_pair = pair;
            }

            /// <summary>Gets a representation of the execution state of the pair.</summary>
            public ProcessingMode Mode { get { return m_pair.ModeForDebugger; } }
            /// <summary>Gets the number of tasks waiting to run exclusively.</summary>
            public IEnumerable<Task> ScheduledExclusive { get { return m_pair.m_exclusiveTaskScheduler.m_tasks; } }
            /// <summary>Gets the number of tasks waiting to run concurrently.</summary>
            public IEnumerable<Task> ScheduledConcurrent { get { return m_pair.m_concurrentTaskScheduler.m_tasks; } }
            /// <summary>Gets the number of tasks currently being executed.</summary>
            public int CurrentlyExecutingTaskCount
            {
                get { return (m_pair.m_processingCount == EXCLUSIVE_PROCESSING_SENTINEL) ? 1 : m_pair.m_processingCount; }
            }
            /// <summary>Gets the underlying task scheduler that actually executes the tasks.</summary>
            public TaskScheduler TargetScheduler { get { return m_pair.m_underlyingTaskScheduler; } }
        }

        /// <summary>Gets an enumeration for debugging that represents the current state of the scheduler pair.</summary>
        /// <remarks>This is only for debugging.  It does not take the necessary locks to be useful for runtime usage.</remarks>
        private ProcessingMode ModeForDebugger
        {
            get
            {
                // If our completion task is done, so are we.
                if (m_completionState != null && m_completionState.Task.IsCompleted) return ProcessingMode.Completed;

                // Otherwise, summarize our current state.
                var mode = ProcessingMode.NotCurrentlyProcessing;
                if (m_processingCount == EXCLUSIVE_PROCESSING_SENTINEL) mode |= ProcessingMode.ProcessingExclusiveTask;
                if (m_processingCount >= 1) mode |= ProcessingMode.ProcessingConcurrentTasks;
                if (CompletionRequested) mode |= ProcessingMode.Completing;
                return mode;
            }
        }

        /// <summary>Asserts that a given Lock object is either held or not held.</summary>
        /// <param name="syncObj">The Lock object to check.</param>
        /// <param name="held">Whether we want to assert that it's currently held or not held.</param>
        [Conditional("DEBUG")]
        private static void ContractAssertMonitorStatus(Lock syncObj, bool held)
        {
            Debug.Assert(syncObj != null, "The Lock object to check must be provided.");
            Debug.Assert(syncObj.IsAcquired == held, "The locking scheme was not correctly followed.");
        }

        /// <summary>Gets the options to use for tasks.</summary>
        /// <param name="isReplacementReplica">If this task is being created to replace another.</param>
        /// <remarks>
        /// These options should be used for all tasks that have the potential to run user code or
        /// that are repeatedly spawned and thus need a modicum of fair treatment.
        /// </remarks>
        /// <returns>The options to use.</returns>
        internal static TaskCreationOptions GetCreationOptionsForTask(bool isReplacementReplica = false)
        {
            TaskCreationOptions options =
                TaskCreationOptions.DenyChildAttach;

            if (isReplacementReplica) options |= TaskCreationOptions.PreferFairness;
            return options;
        }

        /// <summary>Provides an enumeration that represents the current state of the scheduler pair.</summary>
        [Flags]
        internal enum ProcessingMode : byte
        {
            /// <summary>The scheduler pair is currently dormant, with no work scheduled.</summary>
            NotCurrentlyProcessing = 0x0,
            /// <summary>The scheduler pair has queued processing for exclusive tasks.</summary>
            ProcessingExclusiveTask = 0x1,
            /// <summary>The scheduler pair has queued processing for concurrent tasks.</summary>
            ProcessingConcurrentTasks = 0x2,
            /// <summary>Completion has been requested.</summary>
            Completing = 0x4,
            /// <summary>The scheduler pair is finished processing.</summary>
            Completed = 0x8
        }
    }
}