1 // Licensed to the .NET Foundation under one or more agreements. 2 // The .NET Foundation licenses this file to you under the MIT license. 3 // See the LICENSE file in the project root for more information. 4 5 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ 6 // 7 // 8 9 // 10 // A pair of schedulers that together support concurrent (reader) / exclusive (writer) 11 // task scheduling. Using just the exclusive scheduler can be used to simulate a serial 12 // processing queue, and using just the concurrent scheduler with a specified 13 // MaximumConcurrencyLevel can be used to achieve a MaxDegreeOfParallelism across 14 // a bunch of tasks, parallel loops, dataflow blocks, etc. 15 // 16 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- 17 18 using System.Collections.Generic; 19 using System.Diagnostics; 20 using System.Diagnostics.CodeAnalysis; 21 22 namespace System.Threading.Tasks 23 { 24 /// <summary> 25 /// Provides concurrent and exclusive task schedulers that coordinate to execute 26 /// tasks while ensuring that concurrent tasks may run concurrently and exclusive tasks never do. 
/// </summary>
    [DebuggerDisplay("Concurrent={ConcurrentTaskCountForDebugger}, Exclusive={ExclusiveTaskCountForDebugger}, Mode={ModeForDebugger}")]
    [DebuggerTypeProxy(typeof(ConcurrentExclusiveSchedulerPair.DebugView))]
    public class ConcurrentExclusiveSchedulerPair
    {
        /// <summary>Per-thread record of which kind of task, if any, this pair is currently processing on that thread.</summary>
        private readonly ThreadLocal<ProcessingMode> m_threadProcessingMode = new ThreadLocal<ProcessingMode>();
        /// <summary>Scheduler that queues and executes "concurrent" tasks, which may overlap with other concurrent tasks.</summary>
        private readonly ConcurrentExclusiveTaskScheduler m_concurrentTaskScheduler;
        /// <summary>Scheduler that queues and executes "exclusive" tasks, which must run while no other task of this pair is running.</summary>
        private readonly ConcurrentExclusiveTaskScheduler m_exclusiveTaskScheduler;
        /// <summary>The scheduler onto which the pair ultimately schedules all of its work.</summary>
        private readonly TaskScheduler m_underlyingTaskScheduler;
        /// <summary>
        /// Upper bound on the number of tasks allowed to run at the same time. Only concurrent tasks are
        /// affected, because exclusive tasks are inherently limited to 1.
        /// </summary>
        private readonly int m_maxConcurrencyLevel;
        /// <summary>Upper bound on the number of tasks one runner task processes before the runner is recycled.</summary>
        private readonly int m_maxItemsPerTask;
        /// <summary>
        /// If positive, it represents the number of concurrently running concurrent tasks.
        /// If negative, it means an exclusive task has been scheduled.
        /// If 0, nothing has been scheduled.
/// </summary>
        private int m_processingCount;
        /// <summary>State backing the task that represents completion of this pair.</summary>
        /// <remarks>Created lazily: only once the pair starts shutting down or the Completion task is requested.</remarks>
        private CompletionState m_completionState;

        /// <summary>Sentinel meaning "no limit" for the concurrency level and the items-per-task setting.</summary>
        private const int UNLIMITED_PROCESSING = -1;
        /// <summary>Value stored in m_processingCount while an exclusive task is being processed.</summary>
        private const int EXCLUSIVE_PROCESSING_SENTINEL = -1;
        /// <summary>MaxItemsPerTask applied when the caller does not specify one.</summary>
        private const int DEFAULT_MAXITEMSPERTASK = UNLIMITED_PROCESSING;
        /// <summary>MaxConcurrencyLevel applied when the caller does not specify one: the processor count.</summary>
        private static int DefaultMaxConcurrencyLevel => Environment.ProcessorCount;

        /// <summary>The lock used to protect all state on this instance.</summary>
        private readonly Lock ValueLock = new Lock();

        /// <summary>
        /// Initializes the ConcurrentExclusiveSchedulerPair with the default scheduler,
        /// the default concurrency level and the default items-per-task setting.
        /// </summary>
        public ConcurrentExclusiveSchedulerPair()
            : this(TaskScheduler.Default, DefaultMaxConcurrencyLevel, DEFAULT_MAXITEMSPERTASK)
        {
        }

        /// <summary>
        /// Initializes the ConcurrentExclusiveSchedulerPair to target the specified scheduler,
        /// using the default concurrency level and the default items-per-task setting.
        /// </summary>
        /// <param name="taskScheduler">The target scheduler on which this pair should execute.</param>
        public ConcurrentExclusiveSchedulerPair(TaskScheduler taskScheduler)
            : this(taskScheduler, DefaultMaxConcurrencyLevel, DEFAULT_MAXITEMSPERTASK)
        {
        }

        /// <summary>
        /// Initializes the ConcurrentExclusiveSchedulerPair to target the specified scheduler with a maximum concurrency level.
/// </summary>
        /// <param name="taskScheduler">The target scheduler on which this pair should execute.</param>
        /// <param name="maxConcurrencyLevel">The maximum number of tasks to run concurrently.</param>
        public ConcurrentExclusiveSchedulerPair(TaskScheduler taskScheduler, int maxConcurrencyLevel) :
            this(taskScheduler, maxConcurrencyLevel, DEFAULT_MAXITEMSPERTASK)
        { }

        /// <summary>
        /// Initializes the ConcurrentExclusiveSchedulerPair to target the specified scheduler with a maximum
        /// concurrency level and a maximum number of scheduled tasks that may be processed as a unit.
        /// </summary>
        /// <param name="taskScheduler">The target scheduler on which this pair should execute.</param>
        /// <param name="maxConcurrencyLevel">The maximum number of tasks to run concurrently; -1 means unlimited.</param>
        /// <param name="maxItemsPerTask">
        /// The maximum number of tasks to process for each underlying scheduled task used by the pair; -1 means unlimited.
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="taskScheduler"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="maxConcurrencyLevel"/> or <paramref name="maxItemsPerTask"/> is 0 or less than -1.
        /// </exception>
        public ConcurrentExclusiveSchedulerPair(TaskScheduler taskScheduler, int maxConcurrencyLevel, int maxItemsPerTask)
        {
            // Validate arguments
            if (taskScheduler == null) throw new ArgumentNullException(nameof(taskScheduler));
            if (maxConcurrencyLevel == 0 || maxConcurrencyLevel < -1) throw new ArgumentOutOfRangeException(nameof(maxConcurrencyLevel));
            if (maxItemsPerTask == 0 || maxItemsPerTask < -1) throw new ArgumentOutOfRangeException(nameof(maxItemsPerTask));

            // Store configuration. UNLIMITED_PROCESSING (-1) is expanded to the largest possible value
            // right away so that no later code has to special-case the sentinel.
            m_underlyingTaskScheduler = taskScheduler;
            m_maxConcurrencyLevel = (maxConcurrencyLevel == UNLIMITED_PROCESSING) ? int.MaxValue : maxConcurrencyLevel;
            m_maxItemsPerTask = (maxItemsPerTask == UNLIMITED_PROCESSING) ? int.MaxValue : maxItemsPerTask;

            // Downgrade to the underlying scheduler's max degree of parallelism if it's lower than the
            // requested level. This must run after the sentinel has been expanded: comparing against a
            // raw -1 can never succeed, which would leave an "unlimited" request uncapped.
            int mcl = taskScheduler.MaximumConcurrencyLevel;
            if (mcl > 0 && mcl < m_maxConcurrencyLevel) m_maxConcurrencyLevel = mcl;

            // Create the concurrent/exclusive schedulers for this pair
            m_exclusiveTaskScheduler = new ConcurrentExclusiveTaskScheduler(this, 1, ProcessingMode.ProcessingExclusiveTask);
            m_concurrentTaskScheduler = new ConcurrentExclusiveTaskScheduler(this, m_maxConcurrencyLevel, ProcessingMode.ProcessingConcurrentTasks);
        }

        /// <summary>Informs the scheduler pair that it should not accept any more tasks.</summary>
        /// <remarks>
        /// Calling <see cref="Complete"/> is optional, and it's only necessary if the <see cref="Completion"/>
        /// will be relied on for notification of all processing being completed.
/// </remarks>
        public void Complete()
        {
            using (LockHolder.Hold(ValueLock))
            {
                // A second call is a no-op: completion has already been requested.
                if (CompletionRequested)
                {
                    return;
                }

                RequestCompletion();
                CleanupStateIfCompletingAndQuiesced();
            }
        }

        /// <summary>Gets a <see cref="System.Threading.Tasks.Task"/> that will complete when the scheduler has completed processing.</summary>
        /// <remarks>ValueLock is not needed to read this, but holding it is harmless.</remarks>
        public Task Completion => EnsureCompletionStateInitialized().Task;

        /// <summary>Returns the completion state, creating it on first use.</summary>
        /// <remarks>ValueLock is not needed here, but holding it is harmless.</remarks>
        private CompletionState EnsureCompletionStateInitialized()
        {
            return LazyInitializer.EnsureInitialized(ref m_completionState, () => new CompletionState());
        }

        /// <summary>Gets whether completion has been requested.</summary>
        /// <remarks>ValueLock is not needed here, but holding it is harmless.</remarks>
        private bool CompletionRequested
        {
            get
            {
                // No completion state yet means nobody has asked for completion.
                if (m_completionState == null)
                {
                    return false;
                }

                return Volatile.Read(ref m_completionState.m_completionRequested);
            }
        }

        /// <summary>Records that completion has been requested. The caller must hold ValueLock.</summary>
        private void RequestCompletion()
        {
            ContractAssertMonitorStatus(ValueLock, held: true);

            CompletionState state = EnsureCompletionStateInitialized();
            state.m_completionRequested = true;
        }

        /// <summary>
        /// Cleans up state if and only if there's no processing currently happening
        /// and no more to be done later.
/// </summary>
        private void CleanupStateIfCompletingAndQuiesced()
        {
            ContractAssertMonitorStatus(ValueLock, held: true);

            if (ReadyToComplete)
            {
                CompleteTaskAsync();
            }
        }

        /// <summary>Gets whether the pair is ready to complete. The caller must hold ValueLock.</summary>
        private bool ReadyToComplete
        {
            get
            {
                ContractAssertMonitorStatus(ValueLock, held: true);

                // Completion must have been requested, and nothing may be processing right now.
                if (!CompletionRequested) return false;
                if (m_processingCount != 0) return false;

                // From here on, shut down either because an exception was recorded
                // or because both queues have been drained.
                CompletionState state = EnsureCompletionStateInitialized();
                bool hasRecordedExceptions = state.m_exceptions != null && state.m_exceptions.Count > 0;
                if (hasRecordedExceptions) return true;

                return m_concurrentTaskScheduler.m_tasks.IsEmpty && m_exclusiveTaskScheduler.m_tasks.IsEmpty;
            }
        }

        /// <summary>Queues the work item that completes the completion task. The caller must hold ValueLock.</summary>
        private void CompleteTaskAsync()
        {
            Debug.Assert(ReadyToComplete, "The block must be ready to complete to be here.");
            ContractAssertMonitorStatus(ValueLock, held: true);

            // Only the first caller queues the completion work item.
            CompletionState state = EnsureCompletionStateInitialized();
            if (state.m_completionQueued)
            {
                return;
            }
            state.m_completionQueued = true;

            // The task is completed from a thread-pool work item rather than inline, in order
            // to escape held locks and the caller's context.
            ThreadPool.QueueUserWorkItem(boxedState =>
            {
                // Work only from the state argument; touching the outer local would force a closure.
                var queuedState = (CompletionState)boxedState;
                Debug.Assert(!queuedState.Task.IsCompleted, "Completion should only happen once.");

                var recorded = queuedState.m_exceptions;
                bool success;
                if (recorded != null && recorded.Count > 0)
                {
                    success = queuedState.TrySetException(recorded);
                }
                else
                {
                    success = queuedState.TrySetResult(default(VoidTaskResult));
                }
                Debug.Assert(success, "Expected to complete completion task.");
            }, state);
        }

        /// <summary>Initiates scheduler shutdown because a worker task faulted. The caller must hold ValueLock.</summary>
        /// <param name="faultedTask">The faulted worker task that's initiating the shutdown.</param>
        private void FaultWithTask(Task faultedTask)
        {
            Debug.Assert(faultedTask != null && faultedTask.IsFaulted && faultedTask.Exception.InnerExceptions.Count > 0,
                "Needs a task in the faulted state and thus with exceptions.");
            ContractAssertMonitorStatus(ValueLock, held: true);

            // Append the faulted task's exceptions, creating the list on first fault.
            CompletionState state = EnsureCompletionStateInitialized();
            if (state.m_exceptions == null)
            {
                state.m_exceptions = new LowLevelListWithIList<Exception>();
            }
            state.m_exceptions.AddRange(faultedTask.Exception.InnerExceptions);

            // A fault is terminal for the pair, so request completion.
            RequestCompletion();
        }

        /// <summary>
        /// Gets a TaskScheduler that can be used to schedule tasks to this pair
        /// that may run concurrently with other tasks on this pair.
        /// </summary>
        public TaskScheduler ConcurrentScheduler => m_concurrentTaskScheduler;
        /// <summary>
        /// Gets a TaskScheduler that can be used to schedule tasks to this pair
        /// that must run exclusively with regards to other tasks on this pair.
/// </summary>
        public TaskScheduler ExclusiveScheduler => m_exclusiveTaskScheduler;

        /// <summary>Gets the number of tasks waiting to run concurrently.</summary>
        /// <remarks>Takes no lock, because it is only meant to be read from the debugger.</remarks>
        [SuppressMessage("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode")]
        private int ConcurrentTaskCountForDebugger => m_concurrentTaskScheduler.m_tasks.Count;

        /// <summary>Gets the number of tasks waiting to run exclusively.</summary>
        /// <remarks>Takes no lock, because it is only meant to be read from the debugger.</remarks>
        [SuppressMessage("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode")]
        private int ExclusiveTaskCountForDebugger => m_exclusiveTaskScheduler.m_tasks.Count;

        /// <summary>Notifies the pair that new work has arrived to be processed.</summary>
        /// <param name="fairly">Whether tasks should be scheduled fairly with regards to other tasks.</param>
        /// <remarks>Must only be called while holding the lock.</remarks>
        [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
        [SuppressMessage("Microsoft.Performance", "CA1804:RemoveUnusedLocals")]
        [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
        private void ProcessAsyncIfNecessary(bool fairly = false)
        {
            ContractAssertMonitorStatus(ValueLock, held: true);

            // A negative count can only be the exclusive sentinel: an exclusive runner is active,
            // so no further processing may be launched right now.
            if (m_processingCount < 0)
            {
                Debug.Assert(m_processingCount == EXCLUSIVE_PROCESSING_SENTINEL, "The processing count must be the sentinel if it's not >= 0.");
                return;
            }

            // Snapshot whether exclusive work is waiting. Processing may drain the queue concurrently,
            // but it can only go from non-empty to empty, so the worst case is launching one runner
            // that finds nothing to do.
            bool hasExclusiveWork = !m_exclusiveTaskScheduler.m_tasks.IsEmpty;

            // Creation and Start are kept separate so that a reference to the runner exists before
            // QueueTask is attempted: if the underlying scheduler throws from QueueTask, TPL faults the
            // task and rethrows, and the reference is needed to observe that exception.
            Task runner = null;

            if (m_processingCount == 0 && hasExclusiveWork)
            {
                // Nothing is running and exclusive tasks are waiting: switch to exclusive mode.
                m_processingCount = EXCLUSIVE_PROCESSING_SENTINEL; // -1
                try
                {
                    runner = new Task(
                        pair => ((ConcurrentExclusiveSchedulerPair)pair).ProcessExclusiveTasks(),
                        this,
                        default(CancellationToken),
                        GetCreationOptionsForTask(fairly));
                    runner.Start(m_underlyingTaskScheduler);
                }
                catch
                {
                    // Undo the mode switch and fault the pair with the runner's exception.
                    m_processingCount = 0;
                    FaultWithTask(runner);
                }
            }
            else
            {
                int queuedConcurrent = m_concurrentTaskScheduler.m_tasks.Count;

                // Concurrent runners are launched only while no exclusive work is waiting. The loop
                // condition also enforces "there is queued work" and "the concurrency limit is not reached".
                if (!hasExclusiveWork)
                {
                    for (int launched = 0; launched < queuedConcurrent && m_processingCount < m_maxConcurrencyLevel; launched++)
                    {
                        m_processingCount++;
                        try
                        {
                            runner = new Task(
                                pair => ((ConcurrentExclusiveSchedulerPair)pair).ProcessConcurrentTasks(),
                                this,
                                default(CancellationToken),
                                GetCreationOptionsForTask(fairly));
                            runner.Start(m_underlyingTaskScheduler);
                        }
                        catch
                        {
                            // Give back the slot taken for this runner and fault the pair.
                            m_processingCount--;
                            FaultWithTask(runner);
                        }
                    }
                }
            }

            // Complete the pair if completion was requested and everything has quiesced.
            CleanupStateIfCompletingAndQuiesced();
        }

        /// <summary>
        /// Processes exclusive tasks serially until either there are no more to process
        /// or we've reached our user-specified maximum limit.
/// </summary>
        private void ProcessExclusiveTasks()
        {
            Debug.Assert(m_processingCount == EXCLUSIVE_PROCESSING_SENTINEL, "Processing exclusive tasks requires being in exclusive mode.");
            Debug.Assert(!m_exclusiveTaskScheduler.m_tasks.IsEmpty, "Processing exclusive tasks requires tasks to be processed.");
            ContractAssertMonitorStatus(ValueLock, held: false);
            try
            {
                // Mark the current thread as running exclusive work for this pair.
                Debug.Assert(m_threadProcessingMode.Value == ProcessingMode.NotCurrentlyProcessing,
                    "This thread should not yet be involved in this pair's processing.");
                m_threadProcessingMode.Value = ProcessingMode.ProcessingExclusiveTask;

                // Drain the exclusive queue, stopping at the per-runner item limit.
                int handled = 0;
                while (handled < m_maxItemsPerTask)
                {
                    Task next;
                    if (!m_exclusiveTaskScheduler.m_tasks.TryDequeue(out next))
                    {
                        break; // queue is empty
                    }

                    // If the scheduler was previously faulted, a task could have been faulted
                    // when it was queued; such tasks are skipped rather than executed.
                    if (!next.IsFaulted)
                    {
                        m_exclusiveTaskScheduler.ExecuteTask(next);
                    }

                    handled++;
                }
            }
            finally
            {
                // The current thread is done with exclusive work.
                Debug.Assert(m_threadProcessingMode.Value == ProcessingMode.ProcessingExclusiveTask,
                    "Somehow we ended up escaping exclusive mode.");
                m_threadProcessingMode.Value = ProcessingMode.NotCurrentlyProcessing;

                using (LockHolder.Hold(ValueLock))
                {
                    // This runner was tracked by setting m_processingCount to EXCLUSIVE_PROCESSING_SENTINEL;
                    // reset it to 0, then check whether more processing has to be launched. Work can still
                    // be pending if it arrived after the loop exited, or if the loop stopped at the
                    // maxItemsPerTask limit.
                    Debug.Assert(m_processingCount == EXCLUSIVE_PROCESSING_SENTINEL, "The processing mode should not have deviated from exclusive.");
                    m_processingCount = 0;
                    ProcessAsyncIfNecessary(true);
                }
            }
        }

        /// <summary>
        /// Processes concurrent tasks serially until either there are no more to process,
        /// we've reached our user-specified maximum limit, or exclusive tasks have arrived.
        /// </summary>
        private void ProcessConcurrentTasks()
        {
            Debug.Assert(m_processingCount > 0, "Processing concurrent tasks requires us to be in concurrent mode.");
            ContractAssertMonitorStatus(ValueLock, held: false);
            try
            {
                // Mark the current thread as running concurrent work for this pair.
                Debug.Assert(m_threadProcessingMode.Value == ProcessingMode.NotCurrentlyProcessing,
                    "This thread should not yet be involved in this pair's processing.");
                m_threadProcessingMode.Value = ProcessingMode.ProcessingConcurrentTasks;

                // Drain the concurrent queue, stopping at the per-runner item limit.
                int handled = 0;
                while (handled < m_maxItemsPerTask)
                {
                    Task next;
                    if (!m_concurrentTaskScheduler.m_tasks.TryDequeue(out next))
                    {
                        break; // queue is empty
                    }

                    // If the scheduler was previously faulted, a task could have been faulted
                    // when it was queued; such tasks are skipped rather than executed.
                    if (!next.IsFaulted)
                    {
                        m_concurrentTaskScheduler.ExecuteTask(next);
                    }

                    // Exclusive tasks take priority, so stop as soon as any are waiting. The check is
                    // made after executing an item rather than in the loop condition, so that a launched
                    // runner always processes at least one task and its launch overhead is not wasted.
                    // There is an inherent race with exclusive tasks arriving; running one more
                    // concurrent task than strictly necessary is accepted.
                    if (!m_exclusiveTaskScheduler.m_tasks.IsEmpty)
                    {
                        break;
                    }

                    handled++;
                }
            }
            finally
            {
                // The current thread is done with concurrent work.
                Debug.Assert(m_threadProcessingMode.Value == ProcessingMode.ProcessingConcurrentTasks,
                    "Somehow we ended up escaping concurrent mode.");
                m_threadProcessingMode.Value = ProcessingMode.NotCurrentlyProcessing;

                using (LockHolder.Hold(ValueLock))
                {
                    // This runner was tracked with a positive processing count; give its slot back,
                    // then check whether more processing has to be launched. Work can still be pending
                    // if it arrived after the loop exited, or if the loop stopped at the
                    // maxItemsPerTask limit.
                    Debug.Assert(m_processingCount > 0, "The procesing mode should not have deviated from concurrent.");
                    if (m_processingCount > 0)
                    {
                        m_processingCount--;
                    }
                    ProcessAsyncIfNecessary(true);
                }
            }
        }

        /// <summary>
        /// Holder for lazily-initialized state about the completion of a scheduler pair.
/// Completion is only triggered either by rare exceptional conditions or by
        /// the user calling Complete, and as such we only lazily initialize this
        /// state in one of those conditions or if the user explicitly asks for
        /// the Completion.
        /// </summary>
        [SuppressMessage("Microsoft.Performance", "CA1812:AvoidUninstantiatedInternalClasses")]
        private sealed class CompletionState : TaskCompletionSource<VoidTaskResult>
        {
            /// <summary>Set once completion of the scheduler pair has been requested.</summary>
            /// <remarks>
            /// The field is not declared volatile; the pair's CompletionRequested property
            /// reads it through Volatile.Read.
            /// </remarks>
            internal bool m_completionRequested;
            /// <summary>Set once the work item that completes the task has been queued.</summary>
            internal bool m_completionQueued;
            /// <summary>Unrecoverable exceptions incurred while processing; null until the first one is recorded.</summary>
            internal LowLevelListWithIList<Exception> m_exceptions;
        }

        /// <summary>
        /// A scheduler shim used to queue tasks to the pair and execute those tasks on request of the pair.
469 /// </summary> 470 [DebuggerDisplay("Count={CountForDebugger}, MaxConcurrencyLevel={m_maxConcurrencyLevel}, Id={Id}")] 471 [DebuggerTypeProxy(typeof(ConcurrentExclusiveTaskScheduler.DebugView))] 472 private sealed class ConcurrentExclusiveTaskScheduler : TaskScheduler 473 { 474 /// <summary>Cached delegate for invoking TryExecuteTaskShim.</summary> 475 private static readonly Func<object, bool> s_tryExecuteTaskShim = new Func<object, bool>(TryExecuteTaskShim); 476 /// <summary>The parent pair.</summary> 477 private readonly ConcurrentExclusiveSchedulerPair m_pair; 478 /// <summary>The maximum concurrency level for the scheduler.</summary> 479 private readonly int m_maxConcurrencyLevel; 480 /// <summary>The processing mode of this scheduler, exclusive or concurrent.</summary> 481 private readonly ProcessingMode m_processingMode; 482 /// <summary>Gets the queue of tasks for this scheduler.</summary> 483 internal readonly IProducerConsumerQueue<Task> m_tasks; 484 485 /// <summary>Initializes the scheduler.</summary> 486 /// <param name="pair">The parent pair.</param> 487 /// <param name="maxConcurrencyLevel">The maximum degree of concurrency this scheduler may use.</param> 488 /// <param name="processingMode">The processing mode of this scheduler.</param> ConcurrentExclusiveTaskScheduler(ConcurrentExclusiveSchedulerPair pair, int maxConcurrencyLevel, ProcessingMode processingMode)489 internal ConcurrentExclusiveTaskScheduler(ConcurrentExclusiveSchedulerPair pair, int maxConcurrencyLevel, ProcessingMode processingMode) 490 { 491 Debug.Assert(pair != null, "Scheduler must be associated with a valid pair."); 492 Debug.Assert(processingMode == ProcessingMode.ProcessingConcurrentTasks || processingMode == ProcessingMode.ProcessingExclusiveTask, 493 "Scheduler must be for concurrent or exclusive processing."); 494 Debug.Assert( 495 (processingMode == ProcessingMode.ProcessingConcurrentTasks && (maxConcurrencyLevel >= 1 || maxConcurrencyLevel == UNLIMITED_PROCESSING)) || 
496 (processingMode == ProcessingMode.ProcessingExclusiveTask && maxConcurrencyLevel == 1), 497 "If we're in concurrent mode, our concurrency level should be positive or unlimited. If exclusive, it should be 1."); 498 499 m_pair = pair; 500 m_maxConcurrencyLevel = maxConcurrencyLevel; 501 m_processingMode = processingMode; 502 m_tasks = (processingMode == ProcessingMode.ProcessingExclusiveTask) ? 503 (IProducerConsumerQueue<Task>)new SingleProducerSingleConsumerQueue<Task>() : 504 (IProducerConsumerQueue<Task>)new MultiProducerMultiConsumerQueue<Task>(); 505 } 506 507 /// <summary>Gets the maximum concurrency level this scheduler is able to support.</summary> 508 public override int MaximumConcurrencyLevel { get { return m_maxConcurrencyLevel; } } 509 510 /// <summary>Queues a task to the scheduler.</summary> 511 /// <param name="task">The task to be queued.</param> QueueTask(Task task)512 protected internal override void QueueTask(Task task) 513 { 514 Debug.Assert(task != null, "Infrastructure should have provided a non-null task."); 515 using (LockHolder.Hold(m_pair.ValueLock)) 516 { 517 // If the scheduler has already had completion requested, no new work is allowed to be scheduled 518 if (m_pair.CompletionRequested) throw new InvalidOperationException(GetType().ToString()); 519 520 // Queue the task, and then let the pair know that more work is now available to be scheduled 521 m_tasks.Enqueue(task); 522 m_pair.ProcessAsyncIfNecessary(); 523 } 524 } 525 526 /// <summary>Executes a task on this scheduler.</summary> 527 /// <param name="task">The task to be executed.</param> ExecuteTask(Task task)528 internal void ExecuteTask(Task task) 529 { 530 Debug.Assert(task != null, "Infrastructure should have provided a non-null task."); 531 base.TryExecuteTask(task); 532 } 533 534 /// <summary>Tries to execute the task synchronously on this scheduler.</summary> 535 /// <param name="task">The task to execute.</param> 536 /// <param name="taskWasPreviouslyQueued">Whether 
the task was previously queued to the scheduler.</param> 537 /// <returns>true if the task could be executed; otherwise, false.</returns> TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)538 protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) 539 { 540 Debug.Assert(task != null, "Infrastructure should have provided a non-null task."); 541 542 // If the scheduler has had completion requested, no new work is allowed to be scheduled. 543 // A non-locked read on m_completionRequested (in CompletionRequested) is acceptable here because: 544 // a) we don't need to be exact... a Complete call could come in later in the function anyway 545 // b) this is only a fast path escape hatch. To actually inline the task, 546 // we need to be inside of an already executing task, and in such a case, 547 // while completion may have been requested, we can't have shutdown yet. 548 if (!taskWasPreviouslyQueued && m_pair.CompletionRequested) return false; 549 550 // We know the implementation of the default scheduler and how it will behave. 551 // As it's the most common underlying scheduler, we optimize for it. 552 bool isDefaultScheduler = m_pair.m_underlyingTaskScheduler == TaskScheduler.Default; 553 554 // If we're targeting the default scheduler and taskWasPreviouslyQueued is true, 555 // we know that the default scheduler will only allow it to be inlined 556 // if we're on a thread pool thread (but it won't always allow it in that case, 557 // since it'll only allow inlining if it can find the task in the local queue). 558 // As such, if we're not on a thread pool thread, we know for sure the 559 // task won't be inlined, so let's not even try. 560 if (isDefaultScheduler && taskWasPreviouslyQueued && !ThreadPool.IsThreadPoolThread) 561 { 562 return false; 563 } 564 else 565 { 566 // If a task is already running on this thread, allow inline execution to proceed. 
567 // If there's already a task from this scheduler running on the current thread, we know it's safe 568 // to run this task, in effect temporarily taking that task's count allocation. 569 if (m_pair.m_threadProcessingMode.Value == m_processingMode) 570 { 571 // If we're targeting the default scheduler and taskWasPreviouslyQueued is false, 572 // we know the default scheduler will allow it, so we can just execute it here. 573 // Otherwise, delegate to the target scheduler's inlining. 574 return (isDefaultScheduler && !taskWasPreviouslyQueued) ? 575 TryExecuteTask(task) : 576 TryExecuteTaskInlineOnTargetScheduler(task); 577 } 578 } 579 580 // We're not in the context of a task already executing on this scheduler. Bail. 581 return false; 582 } 583 584 /// <summary> 585 /// Implements a reasonable approximation for TryExecuteTaskInline on the underlying scheduler, 586 /// which we can't call directly on the underlying scheduler. 587 /// </summary> 588 /// <param name="task">The task to execute inline if possible.</param> 589 /// <returns>true if the task was inlined successfully; otherwise, false.</returns> 590 [SuppressMessage("Microsoft.Performance", "CA1804:RemoveUnusedLocals", MessageId = "ignored")] TryExecuteTaskInlineOnTargetScheduler(Task task)591 private bool TryExecuteTaskInlineOnTargetScheduler(Task task) 592 { 593 // We'd like to simply call TryExecuteTaskInline here, but we can't. 594 // As there's no built-in API for this, a workaround is to create a new task that, 595 // when executed, will simply call TryExecuteTask to run the real task, and then 596 // we run our new shim task synchronously on the target scheduler. If all goes well, 597 // our synchronous invocation will succeed in running the shim task on the current thread, 598 // which will in turn run the real task on the current thread. 
If the scheduler 599 // doesn't allow that execution, RunSynchronously will block until the underlying scheduler 600 // is able to invoke the task, which might account for an additional but unavoidable delay. 601 // Once it's done, we can return whether the task executed by returning the 602 // shim task's Result, which is in turn the result of TryExecuteTask. 603 var t = new Task<bool>(s_tryExecuteTaskShim, Tuple.Create(this, task)); 604 try 605 { 606 t.RunSynchronously(m_pair.m_underlyingTaskScheduler); 607 return t.Result; 608 } 609 catch 610 { 611 Debug.Assert(t.IsFaulted, "Task should be faulted due to the scheduler faulting it and throwing the exception."); 612 var ignored = t.Exception; 613 throw; 614 } 615 } 616 617 /// <summary>Shim used to invoke this.TryExecuteTask(task).</summary> 618 /// <param name="state">A tuple of the ConcurrentExclusiveTaskScheduler and the task to execute.</param> 619 /// <returns>true if the task was successfully inlined; otherwise, false.</returns> 620 /// <remarks> 621 /// This method is separated out not because of performance reasons but so that 622 /// the SecuritySafeCritical attribute may be employed. 
/// </remarks>
            private static bool TryExecuteTaskShim(object state)
            {
                // Unpack the (scheduler, task) pair that was supplied as the shim's state object.
                var schedulerAndTask = (Tuple<ConcurrentExclusiveTaskScheduler, Task>)state;
                ConcurrentExclusiveTaskScheduler scheduler = schedulerAndTask.Item1;
                Task taskToRun = schedulerAndTask.Item2;
                return scheduler.TryExecuteTask(taskToRun);
            }

            /// <summary>Returns, for debugging purposes, the tasks currently queued to this scheduler.</summary>
            /// <returns>An enumerable of the queued tasks.</returns>
            protected override IEnumerable<Task> GetScheduledTasks()
            {
                return m_tasks;
            }

            /// <summary>Gets how many tasks are queued to this scheduler.</summary>
            [SuppressMessage("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode")]
            private int CountForDebugger
            {
                get
                {
                    return m_tasks.Count;
                }
            }

            /// <summary>Debugger proxy exposing the state of a ConcurrentExclusiveTaskScheduler.</summary>
            private sealed class DebugView
            {
                /// <summary>The scheduler whose state is surfaced by this view.</summary>
                private readonly ConcurrentExclusiveTaskScheduler m_taskScheduler;

                /// <summary>Creates the debug view over the given scheduler.</summary>
                /// <param name="scheduler">The scheduler being debugged.</param>
                public DebugView(ConcurrentExclusiveTaskScheduler scheduler)
                {
                    Debug.Assert(scheduler != null, "Need a scheduler with which to construct the debug view.");
                    m_taskScheduler = scheduler;
                }

                /// <summary>Gets the maximum concurrency level recorded on the scheduler.</summary>
                public int MaximumConcurrencyLevel
                {
                    get { return m_taskScheduler.m_maxConcurrencyLevel; }
                }

                /// <summary>Gets the tasks queued to the scheduler.</summary>
                public IEnumerable<Task> ScheduledTasks
                {
                    get { return m_taskScheduler.m_tasks; }
                }

                /// <summary>Gets the scheduler pair the scheduler belongs to.</summary>
                public ConcurrentExclusiveSchedulerPair SchedulerPair
                {
                    get { return m_taskScheduler.m_pair; }
                }
            }
        }

        /// <summary>Provides a debug view for
/// ConcurrentExclusiveSchedulerPair.</summary>
        private sealed class DebugView
        {
            /// <summary>The pair whose state is surfaced by this view.</summary>
            private readonly ConcurrentExclusiveSchedulerPair m_pair;

            /// <summary>Creates the debug view over the given pair.</summary>
            /// <param name="pair">The pair being debugged.</param>
            public DebugView(ConcurrentExclusiveSchedulerPair pair)
            {
                Debug.Assert(pair != null, "Need a pair with which to construct the debug view.");
                m_pair = pair;
            }

            /// <summary>Gets a representation of the execution state of the pair.</summary>
            public ProcessingMode Mode
            {
                get { return m_pair.ModeForDebugger; }
            }

            /// <summary>Gets the tasks waiting to run exclusively.</summary>
            public IEnumerable<Task> ScheduledExclusive
            {
                get { return m_pair.m_exclusiveTaskScheduler.m_tasks; }
            }

            /// <summary>Gets the tasks waiting to run concurrently.</summary>
            public IEnumerable<Task> ScheduledConcurrent
            {
                get { return m_pair.m_concurrentTaskScheduler.m_tasks; }
            }

            /// <summary>
            /// Gets the number of tasks currently being executed: 1 while the processing count holds
            /// the exclusive sentinel, otherwise the processing count itself.
            /// </summary>
            public int CurrentlyExecutingTaskCount
            {
                get
                {
                    if (m_pair.m_processingCount == EXCLUSIVE_PROCESSING_SENTINEL)
                    {
                        return 1;
                    }

                    return m_pair.m_processingCount;
                }
            }

            /// <summary>Gets the underlying task scheduler that actually executes the tasks.</summary>
            public TaskScheduler TargetScheduler
            {
                get { return m_pair.m_underlyingTaskScheduler; }
            }
        }

        /// <summary>Summarizes, for the debugger, the current state of the scheduler pair as a set of flags.</summary>
        /// <remarks>This is only for debugging. It does not take the necessary locks to be useful for runtime usage.</remarks>
        private ProcessingMode ModeForDebugger
        {
            get
            {
                // A finished completion task means the pair as a whole is finished.
                if (m_completionState != null && m_completionState.Task.IsCompleted)
                {
                    return ProcessingMode.Completed;
                }

                // Otherwise accumulate one flag per condition that currently holds.
                ProcessingMode summary = ProcessingMode.NotCurrentlyProcessing;

                if (m_processingCount == EXCLUSIVE_PROCESSING_SENTINEL)
                {
                    summary |= ProcessingMode.ProcessingExclusiveTask;
                }

                if (m_processingCount >= 1)
                {
                    summary |= ProcessingMode.ProcessingConcurrentTasks;
                }

                if (CompletionRequested)
                {
                    summary |= ProcessingMode.Completing;
                }

                return summary;
            }
        }

        /// <summary>Debug-only check that a Lock object is in the expected held / not-held state.</summary>
        /// <param name="syncObj">The Lock object to check.</param>
        /// <param name="held">true to assert the lock is currently acquired; false to assert it is not.</param>
        [Conditional("DEBUG")]
        private static void ContractAssertMonitorStatus(Lock syncObj, bool held)
        {
            Debug.Assert(syncObj != null, "The Lock object to check must be provided.");

            bool isCurrentlyHeld = syncObj.IsAcquired;
            Debug.Assert(isCurrentlyHeld == held, "The locking scheme was not correctly followed.");
        }

        /// <summary>Gets the options to use for tasks.</summary>
        /// <param name="isReplacementReplica">If this task is being created to replace another.</param>
        /// <remarks>
        /// These options should be used for all tasks that have the potential to run user code or
        /// that are repeatedly spawned and thus need a modicum of fair treatment.
/// </remarks>
        /// <returns>The options to use.</returns>
        internal static TaskCreationOptions GetCreationOptionsForTask(bool isReplacementReplica = false)
        {
            // Every task gets DenyChildAttach; a replacement replica additionally asks for fairness.
            const TaskCreationOptions baseOptions = TaskCreationOptions.DenyChildAttach;

            return isReplacementReplica
                ? baseOptions | TaskCreationOptions.PreferFairness
                : baseOptions;
        }

        /// <summary>Flags describing the current state of the scheduler pair.</summary>
        [Flags]
        internal enum ProcessingMode : byte
        {
            /// <summary>The scheduler pair is currently dormant, with no work scheduled.</summary>
            NotCurrentlyProcessing = 0,

            /// <summary>The scheduler pair has queued processing for exclusive tasks.</summary>
            ProcessingExclusiveTask = 1 << 0,

            /// <summary>The scheduler pair has queued processing for concurrent tasks.</summary>
            ProcessingConcurrentTasks = 1 << 1,

            /// <summary>Completion has been requested.</summary>
            Completing = 1 << 2,

            /// <summary>The scheduler pair is finished processing.</summary>
            Completed = 1 << 3
        }
    }
}