// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// A helper class that contains parallel versions of various looping constructs. This
// internally uses the task parallel library, but takes care to expose very little
// evidence of this infrastructure being used.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Collections.ObjectModel;
using System.Runtime.ExceptionServices;
using System.Diagnostics.Private;

namespace System.Threading.Tasks
{
    /// <summary>
    /// Stores options that configure the operation of methods on the
    /// <see cref="T:System.Threading.Tasks.Parallel">Parallel</see> class.
    /// </summary>
    /// <remarks>
    /// By default, methods on the Parallel class attempt to utilize all available processors, are non-cancelable, and target
    /// the default TaskScheduler (TaskScheduler.Default). <see cref="ParallelOptions"/> enables
    /// overriding these defaults.
    /// </remarks>
    public class ParallelOptions
    {
        private TaskScheduler _scheduler;
        private int _maxDegreeOfParallelism;
        private CancellationToken _cancellationToken;

        /// <summary>
        /// Initializes a new instance of the <see cref="ParallelOptions"/> class.
        /// </summary>
        /// <remarks>
        /// This constructor initializes the instance with default values. <see cref="MaxDegreeOfParallelism"/>
        /// is initialized to -1, signifying that there is no upper bound set on how much parallelism should
        /// be employed. <see cref="CancellationToken"/> is initialized to a non-cancelable token,
        /// and <see cref="TaskScheduler"/> is initialized to the default scheduler (TaskScheduler.Default).
        /// All of these defaults may be overwritten using the property set accessors on the instance.
        /// </remarks>
        public ParallelOptions()
        {
            _scheduler = TaskScheduler.Default;
            _maxDegreeOfParallelism = -1;
            _cancellationToken = CancellationToken.None;
        }

        /// <summary>
        /// Gets or sets the <see cref="T:System.Threading.Tasks.TaskScheduler">TaskScheduler</see>
        /// associated with this <see cref="ParallelOptions"/> instance. Setting this property to null
        /// indicates that the current scheduler should be used.
        /// </summary>
        public TaskScheduler TaskScheduler
        {
            get { return _scheduler; }
            set { _scheduler = value; }
        }

        // Convenience property used by TPL logic: the configured scheduler, or
        // TaskScheduler.Current when the configured scheduler is null.
        internal TaskScheduler EffectiveTaskScheduler
        {
            get { return _scheduler ?? TaskScheduler.Current; }
        }

        /// <summary>
        /// Gets or sets the maximum degree of parallelism enabled by this ParallelOptions instance.
        /// </summary>
        /// <remarks>
        /// The <see cref="MaxDegreeOfParallelism"/> limits the number of concurrent operations run by <see
        /// cref="T:System.Threading.Tasks.Parallel">Parallel</see> method calls that are passed this
        /// ParallelOptions instance to the set value, if it is positive. If <see
        /// cref="MaxDegreeOfParallelism"/> is -1, then there is no limit placed on the number of concurrently
        /// running operations.
        /// </remarks>
        /// <exception cref="T:System.ArgumentOutOfRangeException">
        /// The exception that is thrown when this <see cref="MaxDegreeOfParallelism"/> is set to 0 or some
        /// value less than -1.
        /// </exception>
        public int MaxDegreeOfParallelism
        {
            get { return _maxDegreeOfParallelism; }
            set
            {
                // Only -1 (unbounded) and strictly positive values are accepted.
                if ((value == 0) || (value < -1))
                {
                    throw new ArgumentOutOfRangeException(nameof(MaxDegreeOfParallelism));
                }

                _maxDegreeOfParallelism = value;
            }
        }

        /// <summary>
        /// Gets or sets the <see cref="T:System.Threading.CancellationToken">CancellationToken</see>
        /// associated with this <see cref="ParallelOptions"/> instance.
        /// </summary>
        /// <remarks>
        /// Providing a <see cref="T:System.Threading.CancellationToken">CancellationToken</see>
        /// to a <see cref="T:System.Threading.Tasks.Parallel">Parallel</see> method enables the operation to be
        /// exited early. Code external to the operation may cancel the token, and if the operation observes the
        /// token being set, it may exit early by throwing an
        /// <see cref="T:System.OperationCanceledException"/>.
        /// </remarks>
        public CancellationToken CancellationToken
        {
            get { return _cancellationToken; }
            set { _cancellationToken = value; }
        }

        // The concurrency limit after taking the effective scheduler into account:
        // MaxDegreeOfParallelism, capped by the scheduler's MaximumConcurrencyLevel whenever
        // the scheduler reports a positive level other than Int32.MaxValue. The result is
        // still -1 when neither the options nor the scheduler impose a limit.
        internal int EffectiveMaxConcurrencyLevel
        {
            get
            {
                int rval = MaxDegreeOfParallelism;
                int schedulerMax = EffectiveTaskScheduler.MaximumConcurrencyLevel;
                if ((schedulerMax > 0) && (schedulerMax != Int32.MaxValue))
                {
                    rval = (rval == -1) ? schedulerMax : Math.Min(schedulerMax, rval);
                }
                return rval;
            }
        }
    } // class ParallelOptions

    /// <summary>
    /// Provides support for parallel loops and regions.
    /// </summary>
    /// <remarks>
    /// The <see cref="T:System.Threading.Tasks.Parallel"/> class provides library-based data parallel replacements
    /// for common operations such as for loops, for each loops, and execution of a set of statements.
/// </remarks>
    public static class Parallel
    {
        // static counter for generating unique Fork/Join Context IDs to be used in ETW events
        internal static int s_forkJoinContextID;

        // We use a stride for loops to amortize the frequency of interlocked operations.
        internal const int DEFAULT_LOOP_STRIDE = 16;

        // Static variable to hold default parallel options
        internal static readonly ParallelOptions s_defaultParallelOptions = new ParallelOptions();

        /// <summary>
        /// Executes each of the provided actions, possibly in parallel.
        /// </summary>
        /// <param name="actions">An array of <see cref="T:System.Action">Actions</see> to execute.</param>
        /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
        /// <paramref name="actions"/> argument is null.</exception>
        /// <exception cref="T:System.ArgumentException">The exception that is thrown when the
        /// <paramref name="actions"/> array contains a null element.</exception>
        /// <exception cref="T:System.AggregateException">The exception that is thrown when any
        /// action in the <paramref name="actions"/> array throws an exception.</exception>
        /// <remarks>
        /// This method can be used to execute a set of operations, potentially in parallel.
        /// No guarantees are made about the order in which the operations execute or whether
        /// they execute in parallel. This method does not return until each of the
        /// provided operations has completed, regardless of whether completion
        /// occurs due to normal or exceptional termination.
        /// </remarks>
        public static void Invoke(params Action[] actions)
        {
            // Delegates to the options-taking overload using the shared default options.
            Invoke(s_defaultParallelOptions, actions);
        }

        /// <summary>
        /// Executes each of the provided actions, possibly in parallel.
/// </summary>
        /// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see>
        /// instance that configures the behavior of this operation.</param>
        /// <param name="actions">An array of <see cref="T:System.Action">Actions</see> to execute.</param>
        /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
        /// <paramref name="actions"/> argument is null.</exception>
        /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
        /// <paramref name="parallelOptions"/> argument is null.</exception>
        /// <exception cref="T:System.ArgumentException">The exception that is thrown when the
        /// <paramref name="actions"/> array contains a null element.</exception>
        /// <exception cref="T:System.OperationCanceledException">The exception that is thrown when
        /// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the
        /// <paramref name="parallelOptions"/> is set.</exception>
        /// <exception cref="T:System.AggregateException">The exception that is thrown when any
        /// action in the <paramref name="actions"/> array throws an exception.</exception>
        /// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the
        /// <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with the
        /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the
        /// <paramref name="parallelOptions"/> has been disposed.</exception>
        /// <remarks>
        /// This method can be used to execute a set of operations, potentially in parallel.
        /// No guarantees are made about the order in which the operations execute or whether
        /// they execute in parallel. This method does not return until each of the
        /// provided operations has completed, regardless of whether completion
        /// occurs due to normal or exceptional termination.
        /// </remarks>
        public static void Invoke(ParallelOptions parallelOptions, params Action[] actions)
        {
            if (actions == null)
            {
                throw new ArgumentNullException(nameof(actions));
            }
            if (parallelOptions == null)
            {
                throw new ArgumentNullException(nameof(parallelOptions));
            }

            // On desktop, we throw an ODE if we're passed a disposed CancellationToken.
            // Here, CancellationToken.ThrowIfSourceDisposed() is not exposed.
            // This is benign, because we'll end up throwing ODE when we register
            // with the token later.

            // Quit early if we're already canceled -- avoid a bunch of work.
            parallelOptions.CancellationToken.ThrowIfCancellationRequested();

            // We must validate that the actions array contains no null elements, and also
            // make a defensive copy of the actions array.
            Action[] actionsCopy = new Action[actions.Length];
            for (int i = 0; i < actionsCopy.Length; i++)
            {
                actionsCopy[i] = actions[i];
                if (actionsCopy[i] == null)
                {
                    throw new ArgumentException(SR.Parallel_Invoke_ActionNull);
                }
            }

            // ETW event for Parallel Invoke Begin
            int forkJoinContextID = 0;
            if (ParallelEtwProvider.Log.IsEnabled())
            {
                forkJoinContextID = Interlocked.Increment(ref s_forkJoinContextID);
                ParallelEtwProvider.Log.ParallelInvokeBegin(TaskScheduler.Current.Id, Task.CurrentId ?? 0,
                                                            forkJoinContextID, ParallelEtwProvider.ForkJoinOperationType.ParallelInvoke,
                                                            actionsCopy.Length);
            }

#if DEBUG
            actions = null; // Ensure we don't accidentally use this below.
#endif

            // In the algorithm below, if the number of actions is greater than this, we automatically
            // use Parallel.For() to handle the actions, rather than the Task-per-Action strategy.
            const int SMALL_ACTIONCOUNT_LIMIT = 10;

            try
            {
                // If we have no work to do, we are done. This check lives inside the try block
                // so that the Begin event logged above is always paired with the End event
                // logged in the finally block.
                if (actionsCopy.Length < 1) return;

                // If we've gotten this far, it's time to process the actions.

                // This is more efficient for a large number of actions, or for enforcing MaxDegreeOfParallelism:
                if ((actionsCopy.Length > SMALL_ACTIONCOUNT_LIMIT) ||
                    (parallelOptions.MaxDegreeOfParallelism != -1 && parallelOptions.MaxDegreeOfParallelism < actionsCopy.Length))
                {
                    // Used to hold any exceptions encountered during action processing
                    ConcurrentQueue<Exception> exceptionQ = null; // will be lazily initialized if necessary

                    // Launch a task replicator to handle the execution of all actions.
                    // This allows us to use as many cores as are available, and no more.
                    // The exception to this rule is that, in the case of a blocked action,
                    // the ThreadPool may inject extra threads, which means extra tasks can run.
                    int actionIndex = 0;

                    try
                    {
                        TaskReplicator.Run(
                            (ref object state, int timeout, out bool replicationDelegateYieldedBeforeCompletion) =>
                            {
                                // In this particular case, we do not participate in cooperative multitasking:
                                replicationDelegateYieldedBeforeCompletion = false;

                                // Each for-task will pull an action at a time from the list
                                int myIndex = Interlocked.Increment(ref actionIndex); // = index to use + 1
                                while (myIndex <= actionsCopy.Length)
                                {
                                    // Catch and store any exceptions. If we don't catch them, the self-replicating
                                    // task will exit, and that may cause other SR-tasks to exit.
                                    // And (absent cancellation) we want all actions to execute.
                                    try
                                    {
                                        actionsCopy[myIndex - 1]();
                                    }
                                    catch (Exception e)
                                    {
                                        LazyInitializer.EnsureInitialized<ConcurrentQueue<Exception>>(ref exceptionQ, () => new ConcurrentQueue<Exception>());
                                        exceptionQ.Enqueue(e);
                                    }

                                    // Check for cancellation. If it is encountered, then exit the delegate.
                                    parallelOptions.CancellationToken.ThrowIfCancellationRequested();

                                    // You're still in the game. Grab your next action index.
                                    myIndex = Interlocked.Increment(ref actionIndex);
                                }
                            },
                            parallelOptions,
                            stopOnFirstFailure: false);
                    }
                    catch (Exception e)
                    {
                        // An ObjectDisposedException propagates as-is; rethrow it before
                        // allocating the exception queue, which it would never use.
                        if (e is ObjectDisposedException)
                        {
                            throw;
                        }

                        LazyInitializer.EnsureInitialized<ConcurrentQueue<Exception>>(ref exceptionQ, () => new ConcurrentQueue<Exception>());

                        // Since we're consuming all action exceptions, there are very few reasons that
                        // we would see an exception here. Two that come to mind:
                        //   (1) An OCE thrown by one or more actions (AggregateException thrown)
                        //   (2) An exception thrown from the TaskReplicator constructor
                        //       (regular exception thrown).
                        // We'll need to cover them both.
                        AggregateException ae = e as AggregateException;
                        if (ae != null)
                        {
                            // Strip off outer container of an AggregateException, because downstream
                            // logic needs OCEs to be at the top level.
                            foreach (Exception exc in ae.InnerExceptions)
                            {
                                exceptionQ.Enqueue(exc);
                            }
                        }
                        else
                        {
                            exceptionQ.Enqueue(e);
                        }
                    }

                    // If we have encountered any exceptions, then throw.
                    if ((exceptionQ != null) && (exceptionQ.Count > 0))
                    {
                        ThrowSingleCancellationExceptionOrOtherException(exceptionQ, parallelOptions.CancellationToken,
                                                                         new AggregateException(exceptionQ));
                    }
                }
                else // This is more efficient for a small number of actions and no DOP support:
                {
                    // Initialize our array of tasks, one per action.
                    Task[] tasks = new Task[actionsCopy.Length];

                    // One more check before we begin...
                    parallelOptions.CancellationToken.ThrowIfCancellationRequested();

                    // Invoke all actions as tasks. Queue N-1 of them, and run 1 synchronously.
                    for (int i = 1; i < tasks.Length; i++)
                    {
                        tasks[i] = Task.Factory.StartNew(actionsCopy[i], parallelOptions.CancellationToken, TaskCreationOptions.None,
                                                         parallelOptions.EffectiveTaskScheduler);
                    }
                    tasks[0] = new Task(actionsCopy[0], parallelOptions.CancellationToken, TaskCreationOptions.None);
                    tasks[0].RunSynchronously(parallelOptions.EffectiveTaskScheduler);

                    // Now wait for the tasks to complete. This will not unblock until all of
                    // them complete, and it will throw an exception if one or more of them also
                    // threw an exception. We let such exceptions go completely unhandled.
                    try
                    {
                        Task.WaitAll(tasks);
                    }
                    catch (AggregateException aggExp)
                    {
                        // see if we can combine it into a single OCE. If not propagate the original exception
                        ThrowSingleCancellationExceptionOrOtherException(aggExp.InnerExceptions, parallelOptions.CancellationToken, aggExp);
                    }
                }
            }
            finally
            {
                // ETW event for Parallel Invoke End
                if (ParallelEtwProvider.Log.IsEnabled())
                {
                    ParallelEtwProvider.Log.ParallelInvokeEnd(TaskScheduler.Current.Id, Task.CurrentId ?? 0, forkJoinContextID);
                }
            }
        }

        /// <summary>
        /// Executes a for loop in which iterations may run in parallel.
/// </summary>
        /// <param name="fromInclusive">The start index, inclusive.</param>
        /// <param name="toExclusive">The end index, exclusive.</param>
        /// <param name="body">The delegate that is invoked once per iteration.</param>
        /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
        /// argument is null.</exception>
        /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
        /// thrown from one of the specified delegates.</exception>
        /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
        /// that contains information on what portion of the loop completed.</returns>
        /// <remarks>
        /// The <paramref name="body"/> delegate is invoked once for each value in the iteration range:
        /// [fromInclusive, toExclusive). It is provided with the iteration count (an Int32) as a parameter.
        /// </remarks>
        public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int> body)
        {
            if (body == null)
            {
                throw new ArgumentNullException(nameof(body));
            }

            // Simple-body overload: only the Action<int> slot of the worker is populated.
            return ForWorker<object>(
                fromInclusive, toExclusive,
                s_defaultParallelOptions,
                body, null, null, null, null);
        }

        /// <summary>
        /// Executes a for loop in which iterations may run in parallel.
/// </summary>
        /// <param name="fromInclusive">The start index, inclusive.</param>
        /// <param name="toExclusive">The end index, exclusive.</param>
        /// <param name="body">The delegate that is invoked once per iteration.</param>
        /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
        /// argument is null.</exception>
        /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
        /// thrown from one of the specified delegates.</exception>
        /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
        /// that contains information on what portion of the loop completed.</returns>
        /// <remarks>
        /// The <paramref name="body"/> delegate is invoked once for each value in the iteration range:
        /// [fromInclusive, toExclusive). It is provided with the iteration count (an Int64) as a parameter.
        /// </remarks>
        public static ParallelLoopResult For(long fromInclusive, long toExclusive, Action<long> body)
        {
            if (body == null)
            {
                throw new ArgumentNullException(nameof(body));
            }

            // Simple-body overload: only the Action<long> slot of the 64-bit worker is populated.
            return ForWorker64<object>(
                fromInclusive, toExclusive, s_defaultParallelOptions,
                body, null, null, null, null);
        }

        /// <summary>
        /// Executes a for loop in which iterations may run in parallel.
/// </summary>
        /// <param name="fromInclusive">The start index, inclusive.</param>
        /// <param name="toExclusive">The end index, exclusive.</param>
        /// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see>
        /// instance that configures the behavior of this operation.</param>
        /// <param name="body">The delegate that is invoked once per iteration.</param>
        /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
        /// argument is null.</exception>
        /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
        /// <paramref name="parallelOptions"/> argument is null.</exception>
        /// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the
        /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/>
        /// argument is set.</exception>
        /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
        /// thrown from one of the specified delegates.</exception>
        /// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the
        /// <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with the
        /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the
        /// <paramref name="parallelOptions"/> has been disposed.</exception>
        /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
        /// that contains information on what portion of the loop completed.</returns>
        /// <remarks>
        /// The <paramref name="body"/> delegate is invoked once for each value in the iteration range:
        /// [fromInclusive, toExclusive). It is provided with the iteration count (an Int32) as a parameter.
        /// </remarks>
        public static ParallelLoopResult For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int> body)
        {
            if (body == null)
            {
                throw new ArgumentNullException(nameof(body));
            }
            if (parallelOptions == null)
            {
                throw new ArgumentNullException(nameof(parallelOptions));
            }

            // Simple-body overload: only the Action<int> slot of the worker is populated.
            return ForWorker<object>(
                fromInclusive, toExclusive, parallelOptions,
                body, null, null, null, null);
        }

        /// <summary>
        /// Executes a for loop in which iterations may run in parallel.
        /// </summary>
        /// <param name="fromInclusive">The start index, inclusive.</param>
        /// <param name="toExclusive">The end index, exclusive.</param>
        /// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see>
        /// instance that configures the behavior of this operation.</param>
        /// <param name="body">The delegate that is invoked once per iteration.</param>
        /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
        /// argument is null.</exception>
        /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
        /// <paramref name="parallelOptions"/> argument is null.</exception>
        /// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the
        /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/>
        /// argument is set.</exception>
        /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
        /// thrown from one of the specified delegates.</exception>
        /// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the
        /// <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with the
        /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the
        /// <paramref name="parallelOptions"/> has been disposed.</exception>
        /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
        /// that contains information on what portion of the loop completed.</returns>
        /// <remarks>
        /// The <paramref name="body"/> delegate is invoked once for each value in the iteration range:
        /// [fromInclusive, toExclusive). It is provided with the iteration count (an Int64) as a parameter.
        /// </remarks>
        public static ParallelLoopResult For(long fromInclusive, long toExclusive, ParallelOptions parallelOptions, Action<long> body)
        {
            if (body == null)
            {
                throw new ArgumentNullException(nameof(body));
            }
            if (parallelOptions == null)
            {
                throw new ArgumentNullException(nameof(parallelOptions));
            }

            // Simple-body overload: only the Action<long> slot of the 64-bit worker is populated.
            return ForWorker64<object>(
                fromInclusive, toExclusive, parallelOptions,
                body, null, null, null, null);
        }

        /// <summary>
        /// Executes a for loop in which iterations may run in parallel.
        /// </summary>
        /// <param name="fromInclusive">The start index, inclusive.</param>
        /// <param name="toExclusive">The end index, exclusive.</param>
        /// <param name="body">The delegate that is invoked once per iteration.</param>
        /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
        /// argument is null.</exception>
        /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
        /// thrown from one of the specified delegates.</exception>
        /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
        /// that contains information on what portion of the loop completed.</returns>
        /// <remarks>
        /// <para>
        /// The <paramref name="body"/> delegate is invoked once for each value in the iteration range:
        /// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int32),
        /// and a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be
        /// used to break out of the loop prematurely.
        /// </para>
        /// <para>
        /// Calling <see cref="System.Threading.Tasks.ParallelLoopState.Break()">ParallelLoopState.Break()</see>
        /// informs the For operation that iterations after the current one need not
        /// execute. However, all iterations before the current one will still need to be executed if they have not already.
        /// Therefore, calling Break is similar to using a break operation within a
        /// conventional for loop in a language like C#, but it is not a perfect substitute: for example, there is no guarantee that iterations
        /// after the current one will definitely not execute.
524 /// </summary> 525 /// <param name="fromInclusive">The start index, inclusive.</param> 526 /// <param name="toExclusive">The end index, exclusive.</param> 527 /// <param name="body">The delegate that is invoked once per iteration.</param> 528 /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> 529 /// argument is null.</exception> 530 /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception 531 /// thrown from one of the specified delegates.</exception> 532 /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure 533 /// that contains information on what portion of the loop completed.</returns> 534 /// <remarks> 535 /// <para> 536 /// The <paramref name="body"/> delegate is invoked once for each value in the iteration range: 537 /// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int32), 538 /// and a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be 539 /// used to break out of the loop prematurely. 540 /// </para> 541 /// <para> 542 /// Calling <see cref="System.Threading.Tasks.ParallelLoopState.Break()">ParallelLoopState.Break()</see> 543 /// informs the For operation that iterations after the current one need not 544 /// execute. However, all iterations before the current one will still need to be executed if they have not already. 545 /// Therefore, calling Break is similar to using a break operation within a 546 /// conventional for loop in a language like C#, but it is not a perfect substitute: for example, there is no guarantee that iterations 547 /// after the current one will definitely not execute. 
548 /// </para> 549 /// <para> 550 /// If executing all iterations before the current one is not necessary, 551 /// <see cref="System.Threading.Tasks.ParallelLoopState.Stop()">ParallelLoopState.Stop()</see> 552 /// should be preferred to using Break. Calling Stop informs the For loop that it may abandon all remaining 553 /// iterations, regardless of whether they're for iterations above or below the current, 554 /// since all required work has already been completed. As with Break, however, there are no guarantees regarding 555 /// which other iterations will not execute. 556 /// </para> 557 /// <para> 558 /// When a loop is ended prematurely, the <see cref="T:ParallelLoopState"/> that's returned will contain 559 /// relevant information about the loop's completion. 560 /// </para> 561 /// </remarks> For(int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body)562 public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int, ParallelLoopState> body) 563 { 564 if (body == null) 565 { 566 throw new ArgumentNullException(nameof(body)); 567 } 568 569 return ForWorker<object>( 570 fromInclusive, toExclusive, s_defaultParallelOptions, 571 null, body, null, null, null); 572 } 573 574 /// <summary> 575 /// Executes a for loop in which iterations may run in parallel. 
/// </summary>
        /// <param name="fromInclusive">The start index, inclusive.</param>
        /// <param name="toExclusive">The end index, exclusive.</param>
        /// <param name="body">The delegate that is invoked once per iteration.</param>
        /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
        /// argument is null.</exception>
        /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
        /// thrown from one of the specified delegates.</exception>
        /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
        /// that contains information on what portion of the loop completed.</returns>
        /// <remarks>
        /// The <paramref name="body"/> delegate is invoked once for each value in the iteration range:
        /// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int64),
        /// and a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be
        /// used to break out of the loop prematurely.
        /// </remarks>
        public static ParallelLoopResult For(long fromInclusive, long toExclusive, Action<long, ParallelLoopState> body)
        {
            if (body == null)
            {
                throw new ArgumentNullException(nameof(body));
            }

            // State-aware overload: the body goes in the Action<long, ParallelLoopState> slot of the 64-bit worker.
            return ForWorker64<object>(
                fromInclusive, toExclusive, s_defaultParallelOptions,
                null, body, null, null, null);
        }

        /// <summary>
        /// Executes a for loop in which iterations may run in parallel.
606 /// </summary> 607 /// <param name="fromInclusive">The start index, inclusive.</param> 608 /// <param name="toExclusive">The end index, exclusive.</param> 609 /// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see> 610 /// instance that configures the behavior of this operation.</param> 611 /// <param name="body">The delegate that is invoked once per iteration.</param> 612 /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> 613 /// argument is null.</exception> 614 /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the 615 /// <paramref name="parallelOptions"/> argument is null.</exception> 616 /// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the 617 /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/> 618 /// argument is set.</exception> 619 /// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception 620 /// thrown from one of the specified delegates.</exception> 621 /// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the 622 /// the <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with the 623 /// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the 624 /// <paramref name="parallelOptions"/> has been disposed.</exception> 625 /// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure 626 /// that contains information on what portion of the loop completed.</returns> 627 /// <remarks> 628 /// The <paramref name="body"/> delegate is invoked once for each value in the iteration range: 629 /// [fromInclusive, toExclusive). 
It is provided with the following parameters: the iteration count (an Int32), 630 /// and a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be 631 /// used to break out of the loop prematurely. 632 /// </remarks> For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int, ParallelLoopState> body)633 public static ParallelLoopResult For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int, ParallelLoopState> body) 634 { 635 if (body == null) 636 { 637 throw new ArgumentNullException(nameof(body)); 638 } 639 if (parallelOptions == null) 640 { 641 throw new ArgumentNullException(nameof(parallelOptions)); 642 } 643 644 return ForWorker<object>( 645 fromInclusive, toExclusive, parallelOptions, 646 null, body, null, null, null); 647 } 648 649 /// <summary> 650 /// Executes a for loop in which iterations may run in parallel. 651 /// </summary> 652 /// <param name="fromInclusive">The start index, inclusive.</param> 653 /// <param name="toExclusive">The end index, exclusive.</param> 654 /// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see> 655 /// instance that configures the behavior of this operation.</param> 656 /// <param name="body">The delegate that is invoked once per iteration.</param> 657 /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> 658 /// argument is null.</exception> 659 /// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the 660 /// <paramref name="parallelOptions"/> argument is null.</exception> 661 /// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the 662 /// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/> 663 /// argument is set.</exception> 664 /// <exception 
/// cref="T:System.AggregateException">The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.</exception>
/// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the
/// <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with the
/// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the
/// <paramref name="parallelOptions"/> has been disposed.</exception>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
/// that contains information on what portion of the loop completed.</returns>
/// <remarks>
/// The <paramref name="body"/> delegate is invoked once for each value in the iteration range:
/// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int64),
/// and a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be
/// used to break out of the loop prematurely.
/// </remarks>
public static ParallelLoopResult For(
    long fromInclusive, long toExclusive,
    ParallelOptions parallelOptions,
    Action<long, ParallelLoopState> body)
{
    // A single guard covers both arguments; body is reported ahead of parallelOptions
    // when both are missing.
    if (body == null || parallelOptions == null)
    {
        throw new ArgumentNullException(body == null ? nameof(body) : nameof(parallelOptions));
    }

    // 64-bit flavour: forward to the long-indexed worker, filling only the
    // state-aware body slot (no thread-local state, hence TLocal = object).
    return ForWorker64<object>(
        fromInclusive,
        toExclusive,
        parallelOptions,
        body: null,
        bodyWithState: body,
        bodyWithLocal: null,
        localInit: null,
        localFinally: null);
}

/// <summary>
/// Executes a for loop in which iterations may run in parallel.
/// </summary>
/// <typeparam name="TLocal">The type of the thread-local data.</typeparam>
/// <param name="fromInclusive">The start index, inclusive.</param>
/// <param name="toExclusive">The end index, exclusive.</param>
/// <param name="localInit">The function delegate that returns the initial state of the local data
/// for each thread.</param>
/// <param name="body">The delegate that is invoked once per iteration.</param>
/// <param name="localFinally">The delegate that performs a final action on the local state of each
/// thread.</param>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="localInit"/> argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="localFinally"/> argument is null.</exception>
/// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.</exception>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
/// that contains information on what portion of the loop completed.</returns>
/// <remarks>
/// <para>
/// The <paramref name="body"/> delegate is invoked once for each value in the iteration range:
/// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int32),
/// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be
/// used to break out of the loop prematurely, and some local state that may be shared amongst iterations
/// that execute on the same thread.
/// </para>
/// <para>
/// The <paramref name="localInit"/> delegate is invoked once for each thread that participates in the loop's
/// execution and returns the initial local state for each of those threads. These initial states are passed to the first
/// <paramref name="body"/> invocations on each thread. Then, every subsequent body invocation returns a possibly
/// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value
/// that is passed to the <paramref name="localFinally"/> delegate. The localFinally delegate is invoked once per thread to perform a final
/// action on each thread's local state.
/// </para>
/// </remarks>
public static ParallelLoopResult For<TLocal>(
    int fromInclusive, int toExclusive,
    Func<TLocal> localInit,
    Func<int, ParallelLoopState, TLocal, TLocal> body,
    Action<TLocal> localFinally)
{
    // Pick the first missing delegate in the documented check order:
    // body, then localInit, then localFinally.
    string missingArgument =
        body == null ? nameof(body) :
        localInit == null ? nameof(localInit) :
        localFinally == null ? nameof(localFinally) :
        null;

    if (missingArgument != null)
    {
        throw new ArgumentNullException(missingArgument);
    }

    // No options were supplied by the caller, so the shared default options are used.
    // Only the thread-local body slot of the worker is filled in.
    return ForWorker(
        fromInclusive,
        toExclusive,
        s_defaultParallelOptions,
        body: null,
        bodyWithState: null,
        bodyWithLocal: body,
        localInit: localInit,
        localFinally: localFinally);
}

/// <summary>
/// Executes a for loop in which iterations may run in parallel. Supports 64-bit indices.
/// </summary>
/// <typeparam name="TLocal">The type of the thread-local data.</typeparam>
/// <param name="fromInclusive">The start index, inclusive.</param>
/// <param name="toExclusive">The end index, exclusive.</param>
/// <param name="localInit">The function delegate that returns the initial state of the local data
/// for each thread.</param>
/// <param name="body">The delegate that is invoked once per iteration.</param>
/// <param name="localFinally">The delegate that performs a final action on the local state of each
/// thread.</param>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="localInit"/> argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="localFinally"/> argument is null.</exception>
/// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.</exception>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
/// that contains information on what portion of the loop completed.</returns>
/// <remarks>
/// <para>
/// The <paramref name="body"/> delegate is invoked once for each value in the iteration range:
/// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int64),
/// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be
/// used to break out of the loop prematurely, and some local state that may be shared amongst iterations
/// that execute on the same thread.
/// </para>
/// <para>
/// The <paramref name="localInit"/> delegate is invoked once for each thread that participates in the loop's
/// execution and returns the initial local state for each of those threads. These initial states are passed to the first
/// <paramref name="body"/> invocations on each thread. Then, every subsequent body invocation returns a possibly
/// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value
/// that is passed to the <paramref name="localFinally"/> delegate. The localFinally delegate is invoked once per thread to perform a final
/// action on each thread's local state.
/// </para>
/// </remarks>
public static ParallelLoopResult For<TLocal>(
    long fromInclusive, long toExclusive,
    Func<TLocal> localInit,
    Func<long, ParallelLoopState, TLocal, TLocal> body,
    Action<TLocal> localFinally)
{
    // Pick the first missing delegate in the documented check order:
    // body, then localInit, then localFinally.
    string missingArgument =
        body == null ? nameof(body) :
        localInit == null ? nameof(localInit) :
        localFinally == null ? nameof(localFinally) :
        null;

    if (missingArgument != null)
    {
        throw new ArgumentNullException(missingArgument);
    }

    // 64-bit flavour using the shared default options; only the thread-local
    // body slot of the worker is filled in.
    return ForWorker64(
        fromInclusive,
        toExclusive,
        s_defaultParallelOptions,
        body: null,
        bodyWithState: null,
        bodyWithLocal: body,
        localInit: localInit,
        localFinally: localFinally);
}

/// <summary>
/// Executes a for loop in which iterations may run in parallel.
/// </summary>
/// <typeparam name="TLocal">The type of the thread-local data.</typeparam>
/// <param name="fromInclusive">The start index, inclusive.</param>
/// <param name="toExclusive">The end index, exclusive.</param>
/// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see>
/// instance that configures the behavior of this operation.</param>
/// <param name="localInit">The function delegate that returns the initial state of the local data
/// for each thread.</param>
/// <param name="body">The delegate that is invoked once per iteration.</param>
/// <param name="localFinally">The delegate that performs a final action on the local state of each
/// thread.</param>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="localInit"/> argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="localFinally"/> argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="parallelOptions"/> argument is null.</exception>
/// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the
/// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/>
/// argument is set.</exception>
/// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.</exception>
/// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the
/// <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with the
/// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the
/// <paramref name="parallelOptions"/> has been disposed.</exception>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
/// that contains information on what portion of the loop completed.</returns>
/// <remarks>
/// <para>
/// The <paramref name="body"/> delegate is invoked once for each value in the iteration range:
/// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int32),
/// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be
/// used to break out of the loop prematurely, and some local state that may be shared amongst iterations
/// that execute on the same thread.
/// </para>
/// <para>
/// The <paramref name="localInit"/> delegate is invoked once for each thread that participates in the loop's
/// execution and returns the initial local state for each of those threads. These initial states are passed to the first
/// <paramref name="body"/> invocations on each thread. Then, every subsequent body invocation returns a possibly
/// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value
/// that is passed to the <paramref name="localFinally"/> delegate. The localFinally delegate is invoked once per thread to perform a final
/// action on each thread's local state.
/// </para>
/// </remarks>
public static ParallelLoopResult For<TLocal>(
    int fromInclusive, int toExclusive,
    ParallelOptions parallelOptions,
    Func<TLocal> localInit,
    Func<int, ParallelLoopState, TLocal, TLocal> body,
    Action<TLocal> localFinally)
{
    // Pick the first missing argument in the documented check order:
    // body, localInit, localFinally, and finally parallelOptions.
    string missingArgument =
        body == null ? nameof(body) :
        localInit == null ? nameof(localInit) :
        localFinally == null ? nameof(localFinally) :
        parallelOptions == null ? nameof(parallelOptions) :
        null;

    if (missingArgument != null)
    {
        throw new ArgumentNullException(missingArgument);
    }

    // Only the thread-local body slot of the shared worker is filled in.
    return ForWorker(
        fromInclusive,
        toExclusive,
        parallelOptions,
        body: null,
        bodyWithState: null,
        bodyWithLocal: body,
        localInit: localInit,
        localFinally: localFinally);
}

/// <summary>
/// Executes a for loop in which iterations may run in parallel. Supports 64-bit indices.
/// </summary>
/// <typeparam name="TLocal">The type of the thread-local data.</typeparam>
/// <param name="fromInclusive">The start index, inclusive.</param>
/// <param name="toExclusive">The end index, exclusive.</param>
/// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see>
/// instance that configures the behavior of this operation.</param>
/// <param name="localInit">The function delegate that returns the initial state of the local data
/// for each thread.</param>
/// <param name="body">The delegate that is invoked once per iteration.</param>
/// <param name="localFinally">The delegate that performs a final action on the local state of each
/// thread.</param>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="localInit"/> argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="localFinally"/> argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="parallelOptions"/> argument is null.</exception>
/// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the
/// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/>
/// argument is set.</exception>
/// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.</exception>
/// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the
/// <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with the
/// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the
/// <paramref name="parallelOptions"/> has been disposed.</exception>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
/// that contains information on what portion of the loop completed.</returns>
/// <remarks>
/// <para>
/// The <paramref name="body"/> delegate is invoked once for each value in the iteration range:
/// [fromInclusive, toExclusive). It is provided with the following parameters: the iteration count (an Int64),
/// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be
/// used to break out of the loop prematurely, and some local state that may be shared amongst iterations
/// that execute on the same thread.
/// </para>
/// <para>
/// The <paramref name="localInit"/> delegate is invoked once for each thread that participates in the loop's
/// execution and returns the initial local state for each of those threads. These initial states are passed to the first
/// <paramref name="body"/> invocations on each thread. Then, every subsequent body invocation returns a possibly
/// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value
/// that is passed to the <paramref name="localFinally"/> delegate. The localFinally delegate is invoked once per thread to perform a final
/// action on each thread's local state.
/// </para>
/// </remarks>
public static ParallelLoopResult For<TLocal>(
    long fromInclusive, long toExclusive, ParallelOptions parallelOptions,
    Func<TLocal> localInit,
    Func<long, ParallelLoopState, TLocal, TLocal> body,
    Action<TLocal> localFinally)
{
    // Argument checks run in this order: body, localInit, localFinally, parallelOptions.
    if (body == null)
    {
        throw new ArgumentNullException(nameof(body));
    }
    if (localInit == null)
    {
        throw new ArgumentNullException(nameof(localInit));
    }
    if (localFinally == null)
    {
        throw new ArgumentNullException(nameof(localFinally));
    }
    if (parallelOptions == null)
    {
        throw new ArgumentNullException(nameof(parallelOptions));
    }

    // Only the thread-local body slot of the 64-bit worker is filled in.
    return ForWorker64(
        fromInclusive, toExclusive, parallelOptions,
        body: null,
        bodyWithState: null,
        bodyWithLocal: body,
        localInit: localInit,
        localFinally: localFinally);
}

/// <summary>
/// Determines whether the point in time produced by <see cref="ComputeTimeoutPoint"/> has been reached.
/// </summary>
/// <param name="timeoutOccursAt">The tick value at which the timeout occurs, as returned by
/// <see cref="ComputeTimeoutPoint"/>. It may have wrapped around and be negative.</param>
/// <returns>true if the current <see cref="Environment.TickCount"/> is at or past
/// <paramref name="timeoutOccursAt"/>; otherwise false.</returns>
private static bool CheckTimeoutReached(Int32 timeoutOccursAt)
{
    // Both Environment.TickCount and timeoutOccursAt are 32-bit tick values that wrap around, so
    // comparing them directly (or by sign) gives the wrong answer when exactly one of the two has
    // wrapped, or when the tick counter crosses zero between computing and checking the timeout.
    // The wrapped difference is non-negative exactly when the current tick is at or past the timeout
    // point, as long as the two values are less than 2^31 milliseconds apart.
    Int32 currentMillis = Environment.TickCount;
    return unchecked(currentMillis - timeoutOccursAt) >= 0;
}

/// <summary>
/// Computes the tick value at which a timeout of the given length, starting now, occurs.
/// </summary>
/// <param name="timeoutLength">The length of the timeout, in the units of <see cref="Environment.TickCount"/>.</param>
/// <returns>The current tick count plus <paramref name="timeoutLength"/>, with overflow wrapping around.</returns>
private static Int32 ComputeTimeoutPoint(Int32 timeoutLength)
{
    // Environment.TickCount is an int that cycles. We intentionally let the point in time at which the
    // timeout occurs overflow; CheckTimeoutReached(..) compares with wraparound arithmetic, so the
    // result still stays "ahead" of Environment.TickCount until the timeout elapses.
    return unchecked(Environment.TickCount + timeoutLength);
}

/// <summary>
/// Performs the major work of the parallel for loop.
/// It assumes that argument validation has already
/// been performed by the caller. This function's whole purpose in life is to enable as much reuse of
/// common implementation details for the various For overloads we offer. Without it, we'd end up
/// with lots of duplicate code. It handles: (1) simple for loops, (2) for loops that depend on
/// ParallelLoopState, and (3) for loops with thread local data.
///
/// </summary>
/// <typeparam name="TLocal">The type of the local data.</typeparam>
/// <param name="fromInclusive">The loop's start index, inclusive.</param>
/// <param name="toExclusive">The loop's end index, exclusive.</param>
/// <param name="parallelOptions">A ParallelOptions instance. Its cancellation token and effective
/// maximum concurrency level are read here; the instance is also passed on to TaskReplicator.Run.</param>
/// <param name="body">The simple loop body.</param>
/// <param name="bodyWithState">The loop body for ParallelLoopState overloads.</param>
/// <param name="bodyWithLocal">The loop body for thread local state overloads.</param>
/// <param name="localInit">A selector function that returns new thread local state.</param>
/// <param name="localFinally">A cleanup function to destroy thread local state. It is only invoked
/// for a worker whose <paramref name="localInit"/> call completed.</param>
/// <remarks>Exactly one of the body arguments must be supplied (i.e. they are exclusive); this is
/// asserted in debug builds, as is the rule that the thread local delegates only accompany
/// <paramref name="bodyWithLocal"/>.</remarks>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult"/> structure. It is marked completed
/// when the range is empty or when the shared loop flags are still "none" at the end; the lowest break
/// iteration is filled in when the "broken" flag is set.</returns>
private static ParallelLoopResult ForWorker<TLocal>(
    int fromInclusive, int toExclusive,
    ParallelOptions parallelOptions,
    Action<int> body,
    Action<int, ParallelLoopState> bodyWithState,
    Func<int, ParallelLoopState, TLocal, TLocal> bodyWithLocal,
    Func<TLocal> localInit, Action<TLocal> localFinally)
{
    Debug.Assert(((body == null ? 0 : 1) + (bodyWithState == null ? 0 : 1) + (bodyWithLocal == null ? 0 : 1)) == 1,
        "expected exactly one body function to be supplied");
    Debug.Assert(bodyWithLocal != null || (localInit == null && localFinally == null),
        "thread local functions should only be supplied for loops w/ thread local bodies");

    // Instantiate our result.  Specifics will be filled in later.
    ParallelLoopResult result = new ParallelLoopResult();

    // We just return immediately if 'to' is smaller (or equal to) 'from'.
    // Note that this happens before the cancellation token is looked at.
    if (toExclusive <= fromInclusive)
    {
        result._completed = true;
        return result;
    }

    // For all loops we need a shared flag even though we don't have a body with state,
    // because the shared flag contains the exceptional bool, which triggers other workers
    // to exit their loops if one worker catches an exception
    ParallelLoopStateFlags32 sharedPStateFlags = new ParallelLoopStateFlags32();

    // Before getting started, do a quick peek to see if we have been canceled already
    parallelOptions.CancellationToken.ThrowIfCancellationRequested();

    // initialize ranges with passed in loop arguments and expected number of workers
    // (an effective concurrency level of -1 means "use the processor count")
    int numExpectedWorkers = (parallelOptions.EffectiveMaxConcurrencyLevel == -1) ?
        PlatformHelper.ProcessorCount :
        parallelOptions.EffectiveMaxConcurrencyLevel;
    RangeManager rangeManager = new RangeManager(fromInclusive, toExclusive, 1, numExpectedWorkers);

    // Keep track of any cancellations. This is assigned by the cancellation callback below and
    // read again after the registration has been disposed.
    OperationCanceledException oce = null;

    // if cancellation is enabled, we need to register a callback to stop the loop when it gets signaled
    CancellationTokenRegistration ctr = (!parallelOptions.CancellationToken.CanBeCanceled)
                    ? default(CancellationTokenRegistration)
                    : parallelOptions.CancellationToken.Register((o) =>
                    {
                        // Record our cancellation before stopping processing
                        oce = new OperationCanceledException(parallelOptions.CancellationToken);
                        // Cause processing to stop
                        sharedPStateFlags.Cancel();
                    }, state: null, useSynchronizationContext: false);

    // ETW event for Parallel For begin
    int forkJoinContextID = 0;
    if (ParallelEtwProvider.Log.IsEnabled())
    {
        forkJoinContextID = Interlocked.Increment(ref s_forkJoinContextID);
        ParallelEtwProvider.Log.ParallelLoopBegin(TaskScheduler.Current.Id, Task.CurrentId ?? 0,
                                                  forkJoinContextID, ParallelEtwProvider.ForkJoinOperationType.ParallelFor,
                                                  fromInclusive, toExclusive);
    }

    try
    {
        try
        {
            TaskReplicator.Run(
                (ref RangeWorker currentWorker, int timeout, out bool replicationDelegateYieldedBeforeCompletion) =>
                {
                    // First thing we do upon entering the task is to register as a new "RangeWorker" with the
                    // shared RangeManager instance. A worker that is already initialized is reused as-is.

                    if (!currentWorker.IsInitialized)
                        currentWorker = rangeManager.RegisterNewWorker();

                    // We will need to reset this to true if we exit due to a timeout:
                    replicationDelegateYieldedBeforeCompletion = false;

                    // We need to call FindNewWork32() on it to see whether there's a chunk available.
                    // These are the local index values to be used in the sequential loop.
                    // Their values filled in by FindNewWork32
                    int nFromInclusiveLocal;
                    int nToExclusiveLocal;

                    if (currentWorker.FindNewWork32(out nFromInclusiveLocal, out nToExclusiveLocal) == false ||
                        sharedPStateFlags.ShouldExitLoop(nFromInclusiveLocal) == true)
                    {
                        return; // no need to run
                    }

                    // ETW event for ParallelFor Worker Fork
                    if (ParallelEtwProvider.Log.IsEnabled())
                    {
                        ParallelEtwProvider.Log.ParallelFork(TaskScheduler.Current.Id, Task.CurrentId ?? 0, forkJoinContextID);
                    }

                    TLocal localValue = default(TLocal);
                    bool bLocalValueInitialized = false; // Tracks whether localInit ran without exceptions, so that we can skip localFinally if it wasn't

                    try
                    {
                        // Create a new state object that references the shared "stopped" and "exceptional" flags
                        // If needed, it will contain a new instance of thread-local state by invoking the selector.
                        // For the simple body no state object is created and this stays null.
                        ParallelLoopState32 state = null;

                        if (bodyWithState != null)
                        {
                            Debug.Assert(sharedPStateFlags != null);
                            state = new ParallelLoopState32(sharedPStateFlags);
                        }
                        else if (bodyWithLocal != null)
                        {
                            Debug.Assert(sharedPStateFlags != null);
                            state = new ParallelLoopState32(sharedPStateFlags);
                            if (localInit != null)
                            {
                                localValue = localInit();
                                bLocalValueInitialized = true;
                            }
                        }

                        // initialize a loop timer which will help us decide whether we should exit early
                        Int32 loopTimeout = ComputeTimeoutPoint(timeout);

                        // Now perform the loop itself. Each pass of the do-loop runs one chunk
                        // [nFromInclusiveLocal, nToExclusiveLocal) sequentially with whichever body was supplied.
                        do
                        {
                            if (body != null)
                            {
                                for (int j = nFromInclusiveLocal;
                                     j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.ParallelLoopStateNone  // fast path check as ShouldExitLoop() doesn't inline
                                                               || !sharedPStateFlags.ShouldExitLoop()); // the no-arg version is used since we have no state
                                     j += 1)
                                {
                                    body(j);
                                }
                            }
                            else if (bodyWithState != null)
                            {
                                for (int j = nFromInclusiveLocal;
                                     j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.ParallelLoopStateNone  // fast path check as ShouldExitLoop() doesn't inline
                                                               || !sharedPStateFlags.ShouldExitLoop(j));
                                     j += 1)
                                {
                                    state.CurrentIteration = j;
                                    bodyWithState(j, state);
                                }
                            }
                            else
                            {
                                // Thread-local body: the value returned by one iteration is fed into the next.
                                for (int j = nFromInclusiveLocal;
                                     j < nToExclusiveLocal && (sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.ParallelLoopStateNone  // fast path check as ShouldExitLoop() doesn't inline
                                                               || !sharedPStateFlags.ShouldExitLoop(j));
                                     j += 1)
                                {
                                    state.CurrentIteration = j;
                                    localValue = bodyWithLocal(j, state, localValue);
                                }
                            }

                            // Cooperative multitasking:
                            // Check if allowed loop time is exceeded, if so save current state and return.
                            // The task replicator will queue up a replacement task. Note that we don't do this on the root task.
                            // NOTE(review): the root-task exemption is not visible in this method; presumably it comes
                            // from the timeout value TaskReplicator passes in - confirm there.
                            if (CheckTimeoutReached(loopTimeout))
                            {
                                replicationDelegateYieldedBeforeCompletion = true;
                                break;
                            }
                            // Exit DO-loop if we can't find new work, or if the loop was stopped:
                        } while (currentWorker.FindNewWork32(out nFromInclusiveLocal, out nToExclusiveLocal) &&
                                  ((sharedPStateFlags.LoopStateFlags == ParallelLoopStateFlags.ParallelLoopStateNone) ||
                                    !sharedPStateFlags.ShouldExitLoop(nFromInclusiveLocal)));
                    }
                    catch (Exception ex)
                    {
                        // if we catch an exception in a worker, we signal the other workers to exit the loop, and we rethrow
                        sharedPStateFlags.SetExceptional();
                        ExceptionDispatchInfo.Throw(ex);
                    }
                    finally
                    {
                        // If a cleanup function was specified and localInit completed, call it with the
                        // last local value. No other cleanup of the local value is performed here.
                        if (localFinally != null && bLocalValueInitialized)
                        {
                            localFinally(localValue);
                        }

                        // ETW event for ParallelFor Worker Join
                        if (ParallelEtwProvider.Log.IsEnabled())
                        {
                            ParallelEtwProvider.Log.ParallelJoin(TaskScheduler.Current.Id, Task.CurrentId ?? 0, forkJoinContextID);
                        }
                    }
                },
                parallelOptions,
                stopOnFirstFailure: true);
        }
        finally
        {
            // Dispose the cancellation token registration before checking for a cancellation exception
            if (parallelOptions.CancellationToken.CanBeCanceled)
                ctr.Dispose();
        }

        // If we got through that with no exceptions, and we were canceled, then
        // throw our cancellation exception
        if (oce != null) throw oce;
    }
    catch (AggregateException aggExp)
    {
        // If we have many cancellation exceptions all caused by the specified user cancel control, then throw only one OCE:
        ThrowSingleCancellationExceptionOrOtherException(aggExp.InnerExceptions, parallelOptions.CancellationToken, aggExp);
    }
    finally
    {
        // Runs on every exit path (normal, canceled, faulted): translate the shared flags into the result.
        int sb_status = sharedPStateFlags.LoopStateFlags;
        result._completed = (sb_status == ParallelLoopStateFlags.ParallelLoopStateNone);
        if ((sb_status & ParallelLoopStateFlags.ParallelLoopStateBroken) != 0)
        {
            result._lowestBreakIteration = sharedPStateFlags.LowestBreakIteration;
        }

        // ETW event for Parallel For End
        if (ParallelEtwProvider.Log.IsEnabled())
        {
            int nTotalIterations = 0;

            // calculate how many iterations we ran in total
            if (sb_status == ParallelLoopStateFlags.ParallelLoopStateNone)
                nTotalIterations = toExclusive - fromInclusive;
            else if ((sb_status & ParallelLoopStateFlags.ParallelLoopStateBroken) != 0)
                nTotalIterations = sharedPStateFlags.LowestBreakIteration - fromInclusive;
            else
                nTotalIterations = -1; //ParallelLoopStateStopped! We can't determine this if we were stopped..

            ParallelEtwProvider.Log.ParallelLoopEnd(TaskScheduler.Current.Id, Task.CurrentId ?? 0, forkJoinContextID, nTotalIterations);
        }
    }

    return result;
}

/// <summary>
/// Performs the major work of the 64-bit parallel for loop.
/// It assumes that argument validation has already
/// been performed by the caller. This function's whole purpose in life is to enable as much reuse of
/// common implementation details for the various For overloads we offer. Without it, we'd end up
/// with lots of duplicate code. It handles: (1) simple for loops, (2) for loops that depend on
/// ParallelState, and (3) for loops with thread local data.
///
/// </summary>
/// <typeparam name="TLocal">The type of the local data.</typeparam>
/// <param name="fromInclusive">The loop's start index, inclusive.</param>
/// <param name="toExclusive">The loop's end index, exclusive.</param>
/// <param name="parallelOptions">A ParallelOptions instance.</param>
/// <param name="body">The simple loop body.</param>
/// <param name="bodyWithState">The loop body for ParallelState overloads.</param>
/// <param name="bodyWithLocal">The loop body for thread local state overloads.</param>
/// <param name="localInit">A selector function that returns new thread local state.</param>
/// <param name="localFinally">A cleanup function to destroy thread local state.</param>
/// <remarks>Only one of the body arguments may be supplied (i.e. they are exclusive).</remarks>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult"/> structure.</returns>
private static ParallelLoopResult ForWorker64<TLocal>(
    long fromInclusive, long toExclusive,
    ParallelOptions parallelOptions,
    Action<long> body,
    Action<long, ParallelLoopState> bodyWithState,
    Func<long, ParallelLoopState, TLocal, TLocal> bodyWithLocal,
    Func<TLocal> localInit, Action<TLocal> localFinally)
{
    Debug.Assert(((body == null ? 0 : 1) + (bodyWithState == null ? 0 : 1) + (bodyWithLocal == null ? 0 : 1)) == 1,
        "expected exactly one body function to be supplied");
    Debug.Assert(bodyWithLocal != null || (localInit == null && localFinally == null),
        "thread local functions should only be supplied for loops w/ thread local bodies");

    // The result starts out blank; the outermost finally block below fills in the details.
    ParallelLoopResult result = new ParallelLoopResult();

    // Nothing to iterate over when 'from' has already reached (or passed) 'to'.
    if (fromInclusive >= toExclusive)
    {
        result._completed = true;
        return result;
    }

    // Every loop flavor gets shared flags, even the ones without a state-taking body:
    // the flags carry the "exceptional" bit that tells the other workers to leave
    // their loops once one worker has caught an exception.
    ParallelLoopStateFlags64 loopFlags = new ParallelLoopStateFlags64();

    // Bail out right away if cancellation was requested before we even started.
    parallelOptions.CancellationToken.ThrowIfCancellationRequested();

    // Size the range manager for the number of workers we expect to participate.
    int numExpectedWorkers;
    if (parallelOptions.EffectiveMaxConcurrencyLevel == -1)
    {
        numExpectedWorkers = PlatformHelper.ProcessorCount;
    }
    else
    {
        numExpectedWorkers = parallelOptions.EffectiveMaxConcurrencyLevel;
    }
    RangeManager rangeManager = new RangeManager(fromInclusive, toExclusive, 1, numExpectedWorkers);

    // Set by the cancellation callback; rethrown after the workers have finished.
    OperationCanceledException cancellation = null;

    // Only a cancelable token needs a callback that stops the loop when it is signaled.
    CancellationTokenRegistration cancelRegistration = default(CancellationTokenRegistration);
    if (parallelOptions.CancellationToken.CanBeCanceled)
    {
        cancelRegistration = parallelOptions.CancellationToken.Register((o) =>
        {
            // Record the cancellation first, then tell the workers to stop.
            cancellation = new OperationCanceledException(parallelOptions.CancellationToken);
            loopFlags.Cancel();
        }, state: null, useSynchronizationContext: false);
    }

    // ETW event for Parallel For begin
    int forkJoinContextID = 0;
    if (ParallelEtwProvider.Log.IsEnabled())
    {
        forkJoinContextID = Interlocked.Increment(ref s_forkJoinContextID);
        ParallelEtwProvider.Log.ParallelLoopBegin(
            TaskScheduler.Current.Id, Task.CurrentId ?? 0,
            forkJoinContextID, ParallelEtwProvider.ForkJoinOperationType.ParallelFor,
            fromInclusive, toExclusive);
    }

    try
    {
        try
        {
            TaskReplicator.Run(
                (ref RangeWorker currentWorker, int timeout, out bool replicationDelegateYieldedBeforeCompletion) =>
                {
                    // A task entering for the first time registers itself as a new
                    // RangeWorker with the shared RangeManager.
                    if (!currentWorker.IsInitialized)
                    {
                        currentWorker = rangeManager.RegisterNewWorker();
                    }

                    // Flipped to true further down only if we leave because of a timeout.
                    replicationDelegateYieldedBeforeCompletion = false;

                    // Bounds of the chunk this worker is currently processing; FindNewWork fills them in.
                    long chunkFrom;
                    long chunkTo;

                    if (!currentWorker.FindNewWork(out chunkFrom, out chunkTo) ||
                        loopFlags.ShouldExitLoop(chunkFrom))
                    {
                        return; // no need to run
                    }

                    // ETW event for ParallelFor Worker Fork
                    if (ParallelEtwProvider.Log.IsEnabled())
                    {
                        ParallelEtwProvider.Log.ParallelFork(TaskScheduler.Current.Id, Task.CurrentId ?? 0, forkJoinContextID);
                    }

                    TLocal localValue = default(TLocal);
                    // True only once localInit has returned normally; localFinally is skipped otherwise.
                    bool localInitSucceeded = false;

                    try
                    {
                        // Both state-taking flavors get a state object wrapping the shared flags.
                        ParallelLoopState64 state = null;
                        if (bodyWithState != null || bodyWithLocal != null)
                        {
                            Debug.Assert(loopFlags != null);
                            state = new ParallelLoopState64(loopFlags);
                        }

                        // Only the thread-local flavor runs the selector (when one was supplied);
                        // otherwise localValue keeps its default.
                        if (bodyWithState == null && bodyWithLocal != null && localInit != null)
                        {
                            localValue = localInit();
                            localInitSucceeded = true;
                        }

                        // Loop timer used below to decide whether this worker should yield early.
                        int loopTimeout = ComputeTimeoutPoint(timeout);

                        // Process chunks until work runs out, the loop is stopped, or the time slice expires.
                        while (true)
                        {
                            if (body != null)
                            {
                                // Flags are compared against None first as a fast path (ShouldExitLoop doesn't inline).
                                // The no-arg ShouldExitLoop is used since this flavor has no state.
                                for (long index = chunkFrom;
                                     index < chunkTo && (loopFlags.LoopStateFlags == ParallelLoopStateFlags.ParallelLoopStateNone
                                         || !loopFlags.ShouldExitLoop());
                                     index++)
                                {
                                    body(index);
                                }
                            }
                            else if (bodyWithState != null)
                            {
                                for (long index = chunkFrom;
                                     index < chunkTo && (loopFlags.LoopStateFlags == ParallelLoopStateFlags.ParallelLoopStateNone
                                         || !loopFlags.ShouldExitLoop(index));
                                     index++)
                                {
                                    state.CurrentIteration = index;
                                    bodyWithState(index, state);
                                }
                            }
                            else
                            {
                                for (long index = chunkFrom;
                                     index < chunkTo && (loopFlags.LoopStateFlags == ParallelLoopStateFlags.ParallelLoopStateNone
                                         || !loopFlags.ShouldExitLoop(index));
                                     index++)
                                {
                                    state.CurrentIteration = index;
                                    localValue = bodyWithLocal(index, state, localValue);
                                }
                            }

                            // Cooperative multitasking:
                            // If the allowed loop time is exceeded, report that we yielded and return.
                            // The task replicator will queue up a replacement task. Note that we don't do this on the root task.
                            if (CheckTimeoutReached(loopTimeout))
                            {
                                replicationDelegateYieldedBeforeCompletion = true;
                                break;
                            }

                            // Leave when there is no more work to claim...
                            if (!currentWorker.FindNewWork(out chunkFrom, out chunkTo))
                            {
                                break;
                            }

                            // ...or when the loop was stopped (fast flag check first, as above).
                            if (loopFlags.LoopStateFlags != ParallelLoopStateFlags.ParallelLoopStateNone &&
                                loopFlags.ShouldExitLoop(chunkFrom))
                            {
                                break;
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        // Signal the other workers to exit their loops, then rethrow.
                        loopFlags.SetExceptional();
                        ExceptionDispatchInfo.Throw(ex);
                    }
                    finally
                    {
                        // Run the cleanup delegate only if one was supplied and localInit completed.
                        if (localFinally != null && localInitSucceeded)
                        {
                            localFinally(localValue);
                        }

                        // ETW event for ParallelFor Worker Join
                        if (ParallelEtwProvider.Log.IsEnabled())
                        {
                            ParallelEtwProvider.Log.ParallelJoin(TaskScheduler.Current.Id, Task.CurrentId ?? 0, forkJoinContextID);
                        }
                    }
                },
                parallelOptions,
                stopOnFirstFailure: true);
        }
        finally
        {
            // Drop the cancellation callback before looking at whether it recorded a cancellation.
            if (parallelOptions.CancellationToken.CanBeCanceled)
            {
                cancelRegistration.Dispose();
            }
        }

        // No exception escaped the workers, but the callback recorded a cancellation: surface it now.
        if (cancellation != null)
        {
            throw cancellation;
        }
    }
    catch (AggregateException aggExp)
    {
        // Many cancellation exceptions all caused by the caller's own token are reported as a single OCE.
        ThrowSingleCancellationExceptionOrOtherException(aggExp.InnerExceptions, parallelOptions.CancellationToken, aggExp);
    }
    finally
    {
        // Take one snapshot of the flags and derive the result from it.
        int finalFlags = loopFlags.LoopStateFlags;
        result._completed = (finalFlags == ParallelLoopStateFlags.ParallelLoopStateNone);
        if ((finalFlags & ParallelLoopStateFlags.ParallelLoopStateBroken) != 0)
        {
            result._lowestBreakIteration = loopFlags.LowestBreakIteration;
        }

        // ETW event for Parallel For End
        if (ParallelEtwProvider.Log.IsEnabled())
        {
            // Work out how many iterations ran in total.
            long nTotalIterations;
            if (finalFlags == ParallelLoopStateFlags.ParallelLoopStateNone)
            {
                nTotalIterations = toExclusive - fromInclusive;
            }
            else if ((finalFlags & ParallelLoopStateFlags.ParallelLoopStateBroken) != 0)
            {
                nTotalIterations = loopFlags.LowestBreakIteration - fromInclusive;
            }
            else
            {
                nTotalIterations = -1; // Any other flag state: the count is reported as unknown.
            }

            ParallelEtwProvider.Log.ParallelLoopEnd(TaskScheduler.Current.Id, Task.CurrentId ?? 0, forkJoinContextID, nTotalIterations);
        }
    }

    return result;
}


/// <summary>
/// Executes a for each operation on an <see cref="T:System.Collections.Generic.IEnumerable{TSource}"/>
/// in which iterations may run in parallel.
/// </summary>
/// <typeparam name="TSource">The type of the data in the source.</typeparam>
/// <param name="source">An enumerable data source.</param>
/// <param name="body">The delegate that is invoked once per iteration.</param>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
/// argument is null.</exception>
/// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.</exception>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
/// that contains information on what portion of the loop completed.</returns>
/// <remarks>
/// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/>
/// enumerable. It is provided with the current element as a parameter.
/// </remarks>
public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource> body)
{
    // Checks run in the order source, body; the first null found is the one reported.
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (body == null) throw new ArgumentNullException(nameof(body));

    // Forward to the shared worker with the default options: body is the first
    // delegate argument and the remaining six worker arguments are null.
    ParallelLoopResult loopResult = ForEachWorker<TSource, object>(
        source, s_defaultParallelOptions,
        body, null, null, null, null, null, null);
    return loopResult;
}

/// <summary>
/// Executes a for each operation on an <see cref="T:System.Collections.Generic.IEnumerable{TSource}"/>
/// in which iterations may run in parallel.
/// </summary>
/// <typeparam name="TSource">The type of the data in the source.</typeparam>
/// <param name="source">An enumerable data source.</param>
/// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see>
/// instance that configures the behavior of this operation.</param>
/// <param name="body">The delegate that is invoked once per iteration.</param>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="parallelOptions"/> argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
/// argument is null.</exception>
/// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the
/// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/>
/// argument is canceled.</exception>
/// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.</exception>
/// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the
/// <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with
/// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the
/// <paramref name="parallelOptions"/> has been disposed.</exception>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
/// that contains information on what portion of the loop completed.</returns>
/// <remarks>
/// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/>
/// enumerable. It is provided with the current element as a parameter.
/// </remarks>
public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Action<TSource> body)
{
    // Checks run in the order source, body, parallelOptions; the first null found is the one reported.
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (body == null) throw new ArgumentNullException(nameof(body));
    if (parallelOptions == null) throw new ArgumentNullException(nameof(parallelOptions));

    // Forward to the shared worker with the caller's options: body is the first
    // delegate argument and the remaining six worker arguments are null.
    ParallelLoopResult loopResult = ForEachWorker<TSource, object>(
        source, parallelOptions,
        body, null, null, null, null, null, null);
    return loopResult;
}

/// <summary>
/// Executes a for each operation on an <see cref="T:System.Collections.Generic.IEnumerable{TSource}"/>
/// in which iterations may run in parallel.
/// </summary>
/// <typeparam name="TSource">The type of the data in the source.</typeparam>
/// <param name="source">An enumerable data source.</param>
/// <param name="body">The delegate that is invoked once per iteration.</param>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
/// argument is null.</exception>
/// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.</exception>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
/// that contains information on what portion of the loop completed.</returns>
/// <remarks>
/// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/>
/// enumerable. It is provided with the following parameters: the current element,
/// and a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be
/// used to break out of the loop prematurely.
/// </remarks>
public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource, ParallelLoopState> body)
{
    // Checks run in the order source, body; the first null found is the one reported.
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (body == null) throw new ArgumentNullException(nameof(body));

    // Forward to the shared worker with the default options: body is the second
    // delegate argument and every other delegate argument is null.
    ParallelLoopResult loopResult = ForEachWorker<TSource, object>(
        source, s_defaultParallelOptions,
        null, body, null, null, null, null, null);
    return loopResult;
}

/// <summary>
/// Executes a for each operation on an <see cref="T:System.Collections.Generic.IEnumerable{TSource}"/>
/// in which iterations may run in parallel.
/// </summary>
/// <typeparam name="TSource">The type of the data in the source.</typeparam>
/// <param name="source">An enumerable data source.</param>
/// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see>
/// instance that configures the behavior of this operation.</param>
/// <param name="body">The delegate that is invoked once per iteration.</param>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="parallelOptions"/> argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
/// argument is null.</exception>
/// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the
/// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/>
/// argument is canceled.</exception>
/// <exception cref="T:System.AggregateException">The exception that is
/// thrown to contain an exception
/// thrown from one of the specified delegates.</exception>
/// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the
/// <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with
/// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the
/// <paramref name="parallelOptions"/> has been disposed.</exception>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
/// that contains information on what portion of the loop completed.</returns>
/// <remarks>
/// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/>
/// enumerable. It is provided with the following parameters: the current element,
/// and a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be
/// used to break out of the loop prematurely.
/// </remarks>
public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Action<TSource, ParallelLoopState> body)
{
    // Checks run in the order source, body, parallelOptions; the first null found is the one reported.
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (body == null) throw new ArgumentNullException(nameof(body));
    if (parallelOptions == null) throw new ArgumentNullException(nameof(parallelOptions));

    // Forward to the shared worker with the caller's options: body is the second
    // delegate argument and every other delegate argument is null.
    ParallelLoopResult loopResult = ForEachWorker<TSource, object>(
        source, parallelOptions,
        null, body, null, null, null, null, null);
    return loopResult;
}

/// <summary>
/// Executes a for each operation on an <see cref="T:System.Collections.Generic.IEnumerable{TSource}"/>
/// in which iterations may run in parallel.
/// </summary>
/// <typeparam name="TSource">The type of the data in the source.</typeparam>
/// <param name="source">An enumerable data source.</param>
/// <param name="body">The delegate that is invoked once per iteration.</param>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
/// argument is null.</exception>
/// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.</exception>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
/// that contains information on what portion of the loop completed.</returns>
/// <remarks>
/// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/>
/// enumerable. It is provided with the following parameters: the current element,
/// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be
/// used to break out of the loop prematurely, and the current element's index (an Int64).
/// </remarks>
public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource, ParallelLoopState, long> body)
{
    // Checks run in the order source, body; the first null found is the one reported.
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (body == null) throw new ArgumentNullException(nameof(body));

    // Forward to the shared worker with the default options: body is the third
    // delegate argument and every other delegate argument is null.
    ParallelLoopResult loopResult = ForEachWorker<TSource, object>(
        source, s_defaultParallelOptions,
        null, null, body, null, null, null, null);
    return loopResult;
}

/// <summary>
/// Executes a for each operation on an <see cref="T:System.Collections.Generic.IEnumerable{TSource}"/>
/// in which iterations may run in parallel.
/// </summary>
/// <typeparam name="TSource">The type of the data in the source.</typeparam>
/// <param name="source">An enumerable data source.</param>
/// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see>
/// instance that configures the behavior of this operation.</param>
/// <param name="body">The delegate that is invoked once per iteration.</param>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="parallelOptions"/> argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
/// argument is null.</exception>
/// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the
/// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/>
/// argument is canceled.</exception>
/// <exception cref="T:System.AggregateException">The exception
/// that is thrown to contain an exception
/// thrown from one of the specified delegates.</exception>
/// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the
/// <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with
/// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the
/// <paramref name="parallelOptions"/> has been disposed.</exception>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
/// that contains information on what portion of the loop completed.</returns>
/// <remarks>
/// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/>
/// enumerable. It is provided with the following parameters: the current element,
/// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be
/// used to break out of the loop prematurely, and the current element's index (an Int64).
/// </remarks>
public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Action<TSource, ParallelLoopState, long> body)
{
    // Checks run in the order source, body, parallelOptions; the first null found is the one reported.
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (body == null) throw new ArgumentNullException(nameof(body));
    if (parallelOptions == null) throw new ArgumentNullException(nameof(parallelOptions));

    // Forward to the shared worker with the caller's options: body is the third
    // delegate argument and every other delegate argument is null.
    ParallelLoopResult loopResult = ForEachWorker<TSource, object>(
        source, parallelOptions,
        null, null, body, null, null, null, null);
    return loopResult;
}

/// <summary>
/// Executes a for each operation on an <see cref="T:System.Collections.Generic.IEnumerable{TSource}"/>
/// in which iterations may run in parallel.
/// </summary>
/// <typeparam name="TSource">The type of the data in the source.</typeparam>
/// <typeparam name="TLocal">The type of the thread-local data.</typeparam>
/// <param name="source">An enumerable data source.</param>
/// <param name="localInit">The function delegate that returns the initial state of the local data
/// for each thread.</param>
/// <param name="body">The delegate that is invoked once per iteration.</param>
/// <param name="localFinally">The delegate that performs a final action on the local state of each
/// thread.</param>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="localInit"/> argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="localFinally"/> argument is null.</exception>
/// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.</exception>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
/// that contains information on what portion of the loop completed.</returns>
/// <remarks>
/// <para>
/// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/>
/// enumerable. It is provided with the following parameters: the current element,
/// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be
/// used to break out of the loop prematurely, and some local state that may be shared amongst iterations
/// that execute on the same thread.
/// </para>
/// <para>
/// The <paramref name="localInit"/> delegate is invoked once for each thread that participates in the loop's
/// execution and returns the initial local state for each of those threads. These initial states are passed to the first
/// <paramref name="body"/> invocations on each thread. Then, every subsequent body invocation returns a possibly
/// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value
/// that is passed to the <paramref name="localFinally"/> delegate. The localFinally delegate is invoked once per thread to perform a final
/// action on each thread's local state.
/// </para>
/// </remarks>
public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source, Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally)
{
    // Checks run in the order source, body, localInit, localFinally; the first null found is the one reported.
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (body == null) throw new ArgumentNullException(nameof(body));
    if (localInit == null) throw new ArgumentNullException(nameof(localInit));
    if (localFinally == null) throw new ArgumentNullException(nameof(localFinally));

    // Forward to the shared worker with the default options: body is the fourth
    // delegate argument, localInit and localFinally are the last two arguments,
    // and every other delegate argument is null.
    ParallelLoopResult loopResult = ForEachWorker<TSource, TLocal>(
        source, s_defaultParallelOptions,
        null, null, null, body, null,
        localInit, localFinally);
    return loopResult;
}

/// <summary>
/// Executes a for each operation on an <see cref="T:System.Collections.Generic.IEnumerable{TSource}"/>
/// in which iterations may run in parallel.
/// </summary>
/// <typeparam name="TSource">The type of the data in the source.</typeparam>
/// <typeparam name="TLocal">The type of the thread-local data.</typeparam>
/// <param name="source">An enumerable data source.</param>
/// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see>
/// instance that configures the behavior of this operation.</param>
/// <param name="localInit">The function delegate that returns the initial state of the local data
/// for each thread.</param>
/// <param name="body">The delegate that is invoked once per iteration.</param>
/// <param name="localFinally">The delegate that performs a final action on the local state of each
/// thread.</param>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="parallelOptions"/> argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="localInit"/> argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="localFinally"/> argument is null.</exception>
/// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the
/// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/>
/// argument is canceled.</exception>
/// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.</exception>
/// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the
/// <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with
/// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the
/// <paramref name="parallelOptions"/> has been disposed.</exception>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
/// that contains information on what portion of the loop completed.</returns>
/// <remarks>
/// <para>
/// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/>
/// enumerable. It is provided with the following parameters: the current element,
/// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be
/// used to break out of the loop prematurely, and some local state that may be shared amongst iterations
/// that execute on the same thread.
/// </para>
/// <para>
/// The <paramref name="localInit"/> delegate is invoked once for each thread that participates in the loop's
/// execution and returns the initial local state for each of those threads. These initial states are passed to the first
/// <paramref name="body"/> invocations on each thread. Then, every subsequent body invocation returns a possibly
/// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value
/// that is passed to the <paramref name="localFinally"/> delegate. The localFinally delegate is invoked once per thread to perform a final
/// action on each thread's local state.
/// </para>
/// </remarks>
public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source,
    ParallelOptions parallelOptions, Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally)
{
    // Checks run in the order source, body, localInit, localFinally, parallelOptions;
    // the first null found is the one reported.
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (body == null) throw new ArgumentNullException(nameof(body));
    if (localInit == null) throw new ArgumentNullException(nameof(localInit));
    if (localFinally == null) throw new ArgumentNullException(nameof(localFinally));
    if (parallelOptions == null) throw new ArgumentNullException(nameof(parallelOptions));

    // Forward to the shared worker with the caller's options: body is the fourth
    // delegate argument, localInit and localFinally are the last two arguments,
    // and every other delegate argument is null.
    ParallelLoopResult loopResult = ForEachWorker<TSource, TLocal>(
        source, parallelOptions,
        null, null, null, body, null,
        localInit, localFinally);
    return loopResult;
}

/// <summary>
/// Executes a for each operation on an <see cref="T:System.Collections.Generic.IEnumerable{TSource}"/>
/// in which iterations may run in parallel.
/// </summary>
/// <typeparam name="TSource">The type of the data in the source.</typeparam>
/// <typeparam name="TLocal">The type of the thread-local data.</typeparam>
/// <param name="source">An enumerable data source.</param>
/// <param name="localInit">The function delegate that returns the initial state of the local data
/// for each thread.</param>
/// <param name="body">The delegate that is invoked once per iteration.</param>
/// <param name="localFinally">The delegate that performs a final action on the local state of each
/// thread.</param>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="localInit"/> argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="localFinally"/> argument is null.</exception>
/// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.</exception>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
/// that contains information on what portion of the loop completed.</returns>
/// <remarks>
/// <para>
/// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/>
/// enumerable. It is provided with the following parameters: the current element,
/// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be
/// used to break out of the loop prematurely, the current element's index (an Int64), and some local
/// state that may be shared amongst iterations that execute on the same thread.
/// </para>
/// <para>
/// The <paramref name="localInit"/> delegate is invoked once for each thread that participates in the loop's
/// execution and returns the initial local state for each of those threads. These initial states are passed
/// to the first <paramref name="body"/> invocations on each thread. Then, every subsequent body invocation
/// returns a possibly modified state value that is passed to the next body invocation. Finally, the last
/// body invocation on each thread returns a state value that is passed to the
/// <paramref name="localFinally"/> delegate. The localFinally delegate is invoked once per thread to perform
/// a final action on each thread's local state.
/// </para>
/// </remarks>
public static ParallelLoopResult ForEach<TSource, TLocal>(
    IEnumerable<TSource> source,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
    Action<TLocal> localFinally)
{
    // Arguments are checked in a fixed order; the first null one found is the one reported.
    if (source == null)
    {
        throw new ArgumentNullException(nameof(source));
    }

    if (body == null)
    {
        throw new ArgumentNullException(nameof(body));
    }

    if (localInit == null)
    {
        throw new ArgumentNullException(nameof(localInit));
    }

    if (localFinally == null)
    {
        throw new ArgumentNullException(nameof(localFinally));
    }

    // No options were supplied by the caller, so the shared default options instance is used.
    // The delegate takes state, index and thread-local data, which is the worker's
    // bodyWithEverything slot; every other body slot is null.
    ParallelLoopResult result = ForEachWorker<TSource, TLocal>(
        source,
        s_defaultParallelOptions,
        body: null,
        bodyWithState: null,
        bodyWithStateAndIndex: null,
        bodyWithStateAndLocal: null,
        bodyWithEverything: body,
        localInit: localInit,
        localFinally: localFinally);

    return result;
}

/// <summary>
/// Executes a for each operation on an <see cref="T:System.Collections.Generic.IEnumerable{TSource}"/>
/// in which iterations may run in parallel.
/// </summary>
/// <typeparam name="TSource">The type of the data in the source.</typeparam>
/// <typeparam name="TLocal">The type of the thread-local data.</typeparam>
/// <param name="source">An enumerable data source.</param>
/// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see>
/// instance that configures the behavior of this operation.</param>
/// <param name="localInit">The function delegate that returns the initial state of the local data
/// for each thread.</param>
/// <param name="body">The delegate that is invoked once per iteration.</param>
/// <param name="localFinally">The delegate that performs a final action on the local state of each
/// thread.</param>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="parallelOptions"/> argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="localInit"/> argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="localFinally"/> argument is null.</exception>
/// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the
/// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/>
/// argument is set.</exception>
/// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.</exception>
/// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when the
/// <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with the
/// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the
/// <paramref name="parallelOptions"/> has been disposed.</exception>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
/// that contains information on what portion of the loop completed.</returns>
/// <remarks>
/// <para>
/// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/>
/// enumerable. It is provided with the following parameters: the current element,
/// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be
/// used to break out of the loop prematurely, the current element's index (an Int64), and some local
/// state that may be shared amongst iterations that execute on the same thread.
/// </para>
/// <para>
/// The <paramref name="localInit"/> delegate is invoked once for each thread that participates in the loop's
/// execution and returns the initial local state for each of those threads. These initial states are passed
/// to the first <paramref name="body"/> invocations on each thread. Then, every subsequent body invocation
/// returns a possibly modified state value that is passed to the next body invocation. Finally, the last
/// body invocation on each thread returns a state value that is passed to the
/// <paramref name="localFinally"/> delegate. The localFinally delegate is invoked once per thread to perform
/// a final action on each thread's local state.
/// </para>
/// </remarks>
public static ParallelLoopResult ForEach<TSource, TLocal>(
    IEnumerable<TSource> source,
    ParallelOptions parallelOptions,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
    Action<TLocal> localFinally)
{
    // Arguments are checked in a fixed order; the first null one found is the one reported.
    if (source == null)
    {
        throw new ArgumentNullException(nameof(source));
    }

    if (body == null)
    {
        throw new ArgumentNullException(nameof(body));
    }

    if (localInit == null)
    {
        throw new ArgumentNullException(nameof(localInit));
    }

    if (localFinally == null)
    {
        throw new ArgumentNullException(nameof(localFinally));
    }

    if (parallelOptions == null)
    {
        throw new ArgumentNullException(nameof(parallelOptions));
    }

    // The delegate takes state, index and thread-local data, which is the worker's
    // bodyWithEverything slot; every other body slot is null.
    ParallelLoopResult result = ForEachWorker<TSource, TLocal>(
        source,
        parallelOptions,
        body: null,
        bodyWithState: null,
        bodyWithStateAndIndex: null,
        bodyWithStateAndLocal: null,
        bodyWithEverything: body,
        localInit: localInit,
        localFinally: localFinally);

    return result;
}


/// <summary>
/// Performs the major work of the parallel foreach loop. It assumes that argument validation has
/// already been performed by the caller. This function's whole purpose in life is to enable as much
/// reuse of common implementation details for the various ForEach overloads we offer. Without it, we'd
/// end up with lots of duplicate code. It handles: (1) simple foreach loops, (2) foreach loops that
/// depend on ParallelLoopState, and (3) foreach loops that access indices, (4) foreach loops with thread
/// local data, and any necessary permutations thereof.
///
/// </summary>
/// <typeparam name="TSource">The type of the source data.</typeparam>
/// <typeparam name="TLocal">The type of the local data.</typeparam>
/// <param name="source">An enumerable data source.</param>
/// <param name="parallelOptions">ParallelOptions instance to use with this ForEach-loop.</param>
/// <param name="body">The simple loop body.</param>
/// <param name="bodyWithState">The loop body for ParallelLoopState overloads.</param>
/// <param name="bodyWithStateAndIndex">The loop body for ParallelLoopState/indexed overloads.</param>
/// <param name="bodyWithStateAndLocal">The loop body for ParallelLoopState/thread local state overloads.</param>
/// <param name="bodyWithEverything">The loop body for ParallelLoopState/indexed/thread local state overloads.</param>
/// <param name="localInit">A selector function that returns new thread local state.</param>
/// <param name="localFinally">A cleanup function to destroy thread local state.</param>
/// <remarks>Only one of the bodyXX arguments may be supplied (i.e. they are exclusive).</remarks>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult"/> structure.</returns>
private static ParallelLoopResult ForEachWorker<TSource, TLocal>(
    IEnumerable<TSource> source,
    ParallelOptions parallelOptions,
    Action<TSource> body,
    Action<TSource, ParallelLoopState> bodyWithState,
    Action<TSource, ParallelLoopState, long> bodyWithStateAndIndex,
    Func<TSource, ParallelLoopState, TLocal, TLocal> bodyWithStateAndLocal,
    Func<TSource, ParallelLoopState, long, TLocal, TLocal> bodyWithEverything,
    Func<TLocal> localInit,
    Action<TLocal> localFinally)
{
    // Exactly one of the five body delegates must be non-null.
    Debug.Assert(
        ((body != null ? 1 : 0) +
         (bodyWithState != null ? 1 : 0) +
         (bodyWithStateAndIndex != null ? 1 : 0) +
         (bodyWithStateAndLocal != null ? 1 : 0) +
         (bodyWithEverything != null ? 1 : 0)) == 1,
        "expected exactly one body function to be supplied");

    // localInit/localFinally are only meaningful together with one of the two thread-local bodies.
    Debug.Assert(
        !(bodyWithStateAndLocal == null && bodyWithEverything == null && (localInit != null || localFinally != null)),
        "thread local functions should only be supplied for loops w/ thread local bodies");

    // Bail out immediately if cancellation was already requested before any work is set up.
    parallelOptions.CancellationToken.ThrowIfCancellationRequested();

    // Arrays get the dedicated array overload, which indexes the array directly.
    TSource[] array = source as TSource[];
    if (array != null)
    {
        return ForEachWorker<TSource, TLocal>(
            array,
            parallelOptions,
            body,
            bodyWithState,
            bodyWithStateAndIndex,
            bodyWithStateAndLocal,
            bodyWithEverything,
            localInit,
            localFinally);
    }

    // Indexable lists get the IList overload, which avoids contention on a single shared enumerator.
    IList<TSource> list = source as IList<TSource>;
    if (list != null)
    {
        return ForEachWorker<TSource, TLocal>(
            list,
            parallelOptions,
            body,
            bodyWithState,
            bodyWithStateAndIndex,
            bodyWithStateAndLocal,
            bodyWithEverything,
            localInit,
            localFinally);
    }

    // Anything else is a plain enumerable: wrap it in a partitioner and hand it to the
    // partitioner-based worker.
    return PartitionerForEachWorker<TSource, TLocal>(
        Partitioner.Create(source),
        parallelOptions,
        body,
        bodyWithState,
        bodyWithStateAndIndex,
        bodyWithStateAndLocal,
        bodyWithEverything,
        localInit,
        localFinally);
}

/// <summary>
/// A fast path for the more general ForEachWorker method above. This uses ldelem instructions to
/// access the individual elements of the array, which will be faster.
/// </summary>
/// <typeparam name="TSource">The type of the source data.</typeparam>
/// <typeparam name="TLocal">The type of the local data.</typeparam>
/// <param name="array">An array data source.</param>
/// <param name="parallelOptions">The options to use for execution.</param>
/// <param name="body">The simple loop body.</param>
/// <param name="bodyWithState">The loop body for ParallelLoopState overloads.</param>
/// <param name="bodyWithStateAndIndex">The loop body for indexed/ParallelLoopState overloads.</param>
/// <param name="bodyWithStateAndLocal">The loop body for local/ParallelLoopState overloads.</param>
/// <param name="bodyWithEverything">The loop body for the most generic overload.</param>
/// <param name="localInit">A selector function that returns new thread local state.</param>
/// <param name="localFinally">A cleanup function to destroy thread local state.</param>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult"/> structure.</returns>
private static ParallelLoopResult ForEachWorker<TSource, TLocal>(
    TSource[] array,
    ParallelOptions parallelOptions,
    Action<TSource> body,
    Action<TSource, ParallelLoopState> bodyWithState,
    Action<TSource, ParallelLoopState, long> bodyWithStateAndIndex,
    Func<TSource, ParallelLoopState, TLocal, TLocal> bodyWithStateAndLocal,
    Func<TSource, ParallelLoopState, long, TLocal, TLocal> bodyWithEverything,
    Func<TLocal> localInit,
    Action<TLocal> localFinally)
{
    Debug.Assert(array != null);
    Debug.Assert(parallelOptions != null, "ForEachWorker(array): parallelOptions is null");

    // Index range handed to ForWorker: the array's lower bound and one past its upper bound.
    int firstIndex = array.GetLowerBound(0);
    int onePastLastIndex = array.GetUpperBound(0) + 1;

    // Each branch wraps the element-based delegate in an index-based lambda that looks the
    // element up in the array, then forwards to ForWorker. The first non-null body wins.
    if (body != null)
    {
        return ForWorker<object>(
            firstIndex, onePastLastIndex, parallelOptions,
            index => body(array[index]),
            null, null, null, null);
    }

    if (bodyWithState != null)
    {
        return ForWorker<object>(
            firstIndex, onePastLastIndex, parallelOptions,
            null,
            (index, loopState) => bodyWithState(array[index], loopState),
            null, null, null);
    }

    if (bodyWithStateAndIndex != null)
    {
        return ForWorker<object>(
            firstIndex, onePastLastIndex, parallelOptions,
            null,
            (index, loopState) => bodyWithStateAndIndex(array[index], loopState, index),
            null, null, null);
    }

    if (bodyWithStateAndLocal != null)
    {
        return ForWorker<TLocal>(
            firstIndex, onePastLastIndex, parallelOptions,
            null, null,
            (index, loopState, localState) => bodyWithStateAndLocal(array[index], loopState, localState),
            localInit, localFinally);
    }

    // None of the other bodies was supplied, so bodyWithEverything is used.
    return ForWorker<TLocal>(
        firstIndex, onePastLastIndex, parallelOptions,
        null, null,
        (index, loopState, localState) => bodyWithEverything(array[index], loopState, index, localState),
        localInit, localFinally);
}

/// <summary>
/// A fast path for the more general ForEachWorker method above. This uses the indexer of
/// <see cref="T:System.Collections.Generic.IList{T}"/> to access the individual elements of the list
/// rather than an enumerator.
/// </summary>
/// <typeparam name="TSource">The type of the source data.</typeparam>
/// <typeparam name="TLocal">The type of the local data.</typeparam>
/// <param name="list">A list data source.</param>
/// <param name="parallelOptions">The options to use for execution.</param>
/// <param name="body">The simple loop body.</param>
/// <param name="bodyWithState">The loop body for ParallelLoopState overloads.</param>
/// <param name="bodyWithStateAndIndex">The loop body for indexed/ParallelLoopState overloads.</param>
/// <param name="bodyWithStateAndLocal">The loop body for local/ParallelLoopState overloads.</param>
/// <param name="bodyWithEverything">The loop body for the most generic overload.</param>
/// <param name="localInit">A selector function that returns new thread local state.</param>
/// <param name="localFinally">A cleanup function to destroy thread local state.</param>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult"/> structure.</returns>
private static ParallelLoopResult ForEachWorker<TSource, TLocal>(
    IList<TSource> list,
    ParallelOptions parallelOptions,
    Action<TSource> body,
    Action<TSource, ParallelLoopState> bodyWithState,
    Action<TSource, ParallelLoopState, long> bodyWithStateAndIndex,
    Func<TSource, ParallelLoopState, TLocal, TLocal> bodyWithStateAndLocal,
    Func<TSource, ParallelLoopState, long, TLocal, TLocal> bodyWithEverything,
    Func<TLocal> localInit,
    Action<TLocal> localFinally)
{
    Debug.Assert(list != null);
    Debug.Assert(parallelOptions != null, "ForEachWorker(list): parallelOptions is null");

    // The loop covers indices 0 up to the list's Count, which is read once here.
    int count = list.Count;

    // Each branch wraps the element-based delegate in an index-based lambda that reads the
    // element through the list's indexer, then forwards to ForWorker. The first non-null body wins.
    if (body != null)
    {
        return ForWorker<object>(
            0, count, parallelOptions,
            index => body(list[index]),
            null, null, null, null);
    }

    if (bodyWithState != null)
    {
        return ForWorker<object>(
            0, count, parallelOptions,
            null,
            (index, loopState) => bodyWithState(list[index], loopState),
            null, null, null);
    }

    if (bodyWithStateAndIndex != null)
    {
        return ForWorker<object>(
            0, count, parallelOptions,
            null,
            (index, loopState) => bodyWithStateAndIndex(list[index], loopState, index),
            null, null, null);
    }

    if (bodyWithStateAndLocal != null)
    {
        return ForWorker<TLocal>(
            0, count, parallelOptions,
            null, null,
            (index, loopState, localState) => bodyWithStateAndLocal(list[index], loopState, localState),
            localInit, localFinally);
    }

    // None of the other bodies was supplied, so bodyWithEverything is used.
    return ForWorker<TLocal>(
        0, count, parallelOptions,
        null, null,
        (index, loopState, localState) => bodyWithEverything(list[index], loopState, index, localState),
        localInit, localFinally);
}



/// <summary>
/// Executes a for each operation on a <see cref="T:System.Collections.Concurrent.Partitioner{TSource}">
/// Partitioner</see> in which iterations may run in parallel.
/// </summary>
/// <typeparam name="TSource">The type of the elements in <paramref name="source"/>.</typeparam>
/// <param name="source">The Partitioner that contains the original data source.</param>
/// <param name="body">The delegate that is invoked once per iteration.</param>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
/// argument is null.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// SupportsDynamicPartitions property in the <paramref name="source"/> Partitioner returns
/// false.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when any
/// methods in the <paramref name="source"/> Partitioner return null.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetPartitions() method in the <paramref name="source"/> Partitioner does not return
/// the correct number of partitions.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetPartitions() method in the <paramref name="source"/> Partitioner returns an IList
/// with at least one null value.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetDynamicPartitions() method in the <paramref name="source"/> Partitioner returns an
/// IEnumerable whose GetEnumerator() method returns null.</exception>
/// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.</exception>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
/// that contains information on what portion of the loop completed.</returns>
/// <remarks>
/// <para>
/// The <see cref="T:System.Collections.Concurrent.Partitioner{TSource}">Partitioner</see> is used to retrieve
/// the elements to be processed, in place of the original data source. If the current element's
/// index is desired, the source must be an
/// <see cref="T:System.Collections.Concurrent.OrderablePartitioner{TSource}">OrderablePartitioner</see>.
/// </para>
/// <para>
/// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/>
/// Partitioner. It is provided with the current element as a parameter.
/// </para>
/// </remarks>
public static ParallelLoopResult ForEach<TSource>(
    Partitioner<TSource> source,
    Action<TSource> body)
{
    // Arguments are checked in a fixed order; the first null one found is the one reported.
    if (source == null)
    {
        throw new ArgumentNullException(nameof(source));
    }

    if (body == null)
    {
        throw new ArgumentNullException(nameof(body));
    }

    // No thread-local state is involved, so TLocal is object and the local delegates are null.
    // The delegate goes in the first of the worker's five body positions; the rest are null.
    ParallelLoopResult result = PartitionerForEachWorker<TSource, object>(
        source,
        s_defaultParallelOptions,
        body,
        null,
        null,
        null,
        null,
        null,
        null);

    return result;
}

/// <summary>
/// Executes a for each operation on a <see cref="T:System.Collections.Concurrent.Partitioner{TSource}">
/// Partitioner</see> in which iterations may run in parallel.
/// </summary>
/// <typeparam name="TSource">The type of the elements in <paramref name="source"/>.</typeparam>
/// <param name="source">The Partitioner that contains the original data source.</param>
/// <param name="body">The delegate that is invoked once per iteration.</param>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
/// argument is null.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// SupportsDynamicPartitions property in the <paramref name="source"/> Partitioner returns
/// false.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when any
/// methods in the <paramref name="source"/> Partitioner return null.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetPartitions() method in the <paramref name="source"/> Partitioner does not return
/// the correct number of partitions.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetPartitions() method in the <paramref name="source"/> Partitioner returns an IList
/// with at least one null value.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetDynamicPartitions() method in the <paramref name="source"/> Partitioner returns an
/// IEnumerable whose GetEnumerator() method returns null.</exception>
/// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.</exception>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
/// that contains information on what portion of the loop completed.</returns>
/// <remarks>
/// <para>
/// The <see cref="T:System.Collections.Concurrent.Partitioner{TSource}">Partitioner</see> is used to retrieve
/// the elements to be processed, in place of the original data source. If the current element's
/// index is desired, the source must be an
/// <see cref="T:System.Collections.Concurrent.OrderablePartitioner{TSource}">OrderablePartitioner</see>.
/// </para>
/// <para>
/// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/>
/// Partitioner. It is provided with the following parameters: the current element,
/// and a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be
/// used to break out of the loop prematurely.
/// </para>
/// </remarks>
public static ParallelLoopResult ForEach<TSource>(
    Partitioner<TSource> source,
    Action<TSource, ParallelLoopState> body)
{
    // Arguments are checked in a fixed order; the first null one found is the one reported.
    if (source == null)
    {
        throw new ArgumentNullException(nameof(source));
    }

    if (body == null)
    {
        throw new ArgumentNullException(nameof(body));
    }

    // No thread-local state is involved, so TLocal is object and the local delegates are null.
    // The delegate goes in the second of the worker's five body positions; the rest are null.
    ParallelLoopResult result = PartitionerForEachWorker<TSource, object>(
        source,
        s_defaultParallelOptions,
        null,
        body,
        null,
        null,
        null,
        null,
        null);

    return result;
}

/// <summary>
/// Executes a for each operation on a <see cref="T:System.Collections.Concurrent.OrderablePartitioner{TSource}">
/// OrderablePartitioner</see> in which iterations may run in parallel.
/// </summary>
/// <typeparam name="TSource">The type of the elements in <paramref name="source"/>.</typeparam>
/// <param name="source">The OrderablePartitioner that contains the original data source.</param>
/// <param name="body">The delegate that is invoked once per iteration.</param>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
/// argument is null.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// SupportsDynamicPartitions property in the <paramref name="source"/> OrderablePartitioner returns
/// false.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// KeysNormalized property in the <paramref name="source"/> OrderablePartitioner returns
/// false.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when any
/// methods in the <paramref name="source"/> OrderablePartitioner return null.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetPartitions() or GetOrderablePartitions() methods in the <paramref name="source"/>
/// OrderablePartitioner do not return the correct number of partitions.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetPartitions() or GetOrderablePartitions() methods in the <paramref name="source"/>
/// OrderablePartitioner return an IList with at least one null value.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetDynamicPartitions() or GetDynamicOrderablePartitions() methods in the <paramref name="source"/>
/// OrderablePartitioner return an IEnumerable whose GetEnumerator() method returns null.</exception>
/// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.</exception>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
/// that contains information on what portion of the loop completed.</returns>
/// <remarks>
/// <para>
/// The <see cref="T:System.Collections.Concurrent.Partitioner{TSource}">Partitioner</see> is used to retrieve
/// the elements to be processed, in place of the original data source. If the current element's
/// index is desired, the source must be an <see cref="T:System.Collections.Concurrent.OrderablePartitioner{TSource}">
/// OrderablePartitioner</see>.
/// </para>
/// <para>
/// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/>
/// Partitioner. It is provided with the following parameters: the current element,
/// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be
/// used to break out of the loop prematurely, and the current element's index (an Int64).
/// </para>
/// </remarks>
public static ParallelLoopResult ForEach<TSource>(
    OrderablePartitioner<TSource> source,
    Action<TSource, ParallelLoopState, long> body)
{
    // Null checks come first, in the order source then body.
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (body == null) throw new ArgumentNullException(nameof(body));

    // A partitioner that does not report normalized keys is rejected before the worker is called.
    bool keysNormalized = source.KeysNormalized;
    if (!keysNormalized)
        throw new InvalidOperationException(SR.Parallel_ForEach_OrderedPartitionerKeysNotNormalized);

    // Default options; the indexed body occupies the fifth argument position, everything else is null.
    return PartitionerForEachWorker<TSource, object>(
        source,
        s_defaultParallelOptions,
        null,
        null,
        body,
        null,
        null,
        null,
        null);
}

/// <summary>
/// Executes a for each operation on a <see cref="T:System.Collections.Concurrent.Partitioner{TSource}">
/// Partitioner</see> in which iterations may run in parallel.
/// </summary>
/// <typeparam name="TSource">The type of the elements in <paramref name="source"/>.</typeparam>
/// <typeparam name="TLocal">The type of the thread-local data.</typeparam>
/// <param name="source">The Partitioner that contains the original data source.</param>
/// <param name="localInit">The function delegate that returns the initial state of the local data
/// for each thread.</param>
/// <param name="body">The delegate that is invoked once per iteration.</param>
/// <param name="localFinally">The delegate that performs a final action on the local state of each
/// thread.</param>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The
/// exception that is thrown when the
/// <paramref name="localInit"/> argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="localFinally"/> argument is null.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// SupportsDynamicPartitions property in the <paramref name="source"/> Partitioner returns
/// false.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when any
/// methods in the <paramref name="source"/> Partitioner return null.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetPartitions() method in the <paramref name="source"/> Partitioner does not return
/// the correct number of partitions.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetPartitions() method in the <paramref name="source"/> Partitioner returns an IList
/// with at least one null value.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetDynamicPartitions() method in the <paramref name="source"/> Partitioner returns an
/// IEnumerable whose GetEnumerator() method returns null.</exception>
/// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.</exception>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
/// that contains information on what portion of the loop completed.</returns>
/// <remarks>
/// <para>
/// The <see cref="T:System.Collections.Concurrent.Partitioner{TSource}">Partitioner</see> is used to retrieve
/// the elements to be processed, in place of the original data source. If the current element's
/// index is desired, the source must be an <see cref="T:System.Collections.Concurrent.OrderablePartitioner{TSource}">
/// OrderablePartitioner</see>.
/// </para>
/// <para>
/// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/>
/// Partitioner. It is provided with the following parameters: the current element,
/// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be
/// used to break out of the loop prematurely, and some local state that may be shared amongst iterations
/// that execute on the same thread.
/// </para>
/// <para>
/// The <paramref name="localInit"/> delegate is invoked once for each thread that participates in the loop's
/// execution and returns the initial local state for each of those threads. These initial states are passed to the first
/// <paramref name="body"/> invocations on each thread. Then, every subsequent body invocation returns a possibly
/// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value
/// that is passed to the <paramref name="localFinally"/> delegate. The localFinally delegate is invoked once per thread to perform a final
/// action on each thread's local state.
/// </para>
/// </remarks>
public static ParallelLoopResult ForEach<TSource, TLocal>(
    Partitioner<TSource> source,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal, TLocal> body,
    Action<TLocal> localFinally)
{
    // Guards run in this exact order: source, body, localInit, localFinally.
    // The first null argument encountered determines the exception that is thrown.
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (body == null) throw new ArgumentNullException(nameof(body));
    if (localInit == null) throw new ArgumentNullException(nameof(localInit));
    if (localFinally == null) throw new ArgumentNullException(nameof(localFinally));

    // Default options; the local-state body sits in the sixth argument position and the
    // init/finally delegates fill the last two positions.
    return PartitionerForEachWorker<TSource, TLocal>(
        source,
        s_defaultParallelOptions,
        null,
        null,
        null,
        body,
        null,
        localInit,
        localFinally);
}

/// <summary>
/// Executes a for each operation on a <see cref="T:System.Collections.Concurrent.OrderablePartitioner{TSource}">
/// OrderablePartitioner</see> in which iterations may run in parallel.
/// </summary>
/// <typeparam name="TSource">The type of the elements in <paramref name="source"/>.</typeparam>
/// <typeparam name="TLocal">The type of the thread-local data.</typeparam>
/// <param name="source">The OrderablePartitioner that contains the original data source.</param>
/// <param name="localInit">The function delegate that returns the initial state of the local data
/// for each thread.</param>
/// <param name="body">The delegate that is invoked once per iteration.</param>
/// <param name="localFinally">The delegate that performs a final action on the local state of each
/// thread.</param>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="localInit"/> argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="localFinally"/> argument is null.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// SupportsDynamicPartitions property in the <paramref name="source"/> OrderablePartitioner returns
/// false.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// KeysNormalized property in the <paramref name="source"/> OrderablePartitioner returns
/// false.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when any
/// methods in the <paramref name="source"/> OrderablePartitioner return null.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetPartitions() or GetOrderablePartitions() methods in the <paramref name="source"/>
/// OrderablePartitioner do not return the correct number of partitions.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetPartitions() or GetOrderablePartitions() methods in the <paramref name="source"/>
/// OrderablePartitioner return an IList with at least one null value.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetDynamicPartitions() or GetDynamicOrderablePartitions() methods in the <paramref name="source"/>
/// OrderablePartitioner return an IEnumerable whose GetEnumerator() method returns null.</exception>
/// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.</exception>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
/// that contains information on what portion of the loop completed.</returns>
/// <remarks>
/// <para>
/// The <see cref="T:System.Collections.Concurrent.Partitioner{TSource}">Partitioner</see> is used to retrieve
/// the elements to be processed, in place of the original data source. If the current element's
/// index is desired, the source must be an <see cref="T:System.Collections.Concurrent.OrderablePartitioner{TSource}">
/// OrderablePartitioner</see>.
/// </para>
/// <para>
/// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/>
/// Partitioner. It is provided with the following parameters: the current element,
/// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be
/// used to break out of the loop prematurely, the current element's index (an Int64), and some local
/// state that may be shared amongst iterations that execute on the same thread.
/// </para>
/// <para>
/// The <paramref name="localInit"/> delegate is invoked once for each thread that participates in the loop's
/// execution and returns the initial local state for each of those threads. These initial states are passed to the first
/// <paramref name="body"/> invocations on each thread. Then, every subsequent body invocation returns a possibly
/// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value
/// that is passed to the <paramref name="localFinally"/> delegate. The localFinally delegate is invoked once per thread to perform a final
/// action on each thread's local state.
/// </para>
/// </remarks>
public static ParallelLoopResult ForEach<TSource, TLocal>(
    OrderablePartitioner<TSource> source,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
    Action<TLocal> localFinally)
{
    // Null guards, evaluated in the order source, body, localInit, localFinally.
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (body == null) throw new ArgumentNullException(nameof(body));
    if (localInit == null) throw new ArgumentNullException(nameof(localInit));
    if (localFinally == null) throw new ArgumentNullException(nameof(localFinally));

    // Only after all arguments are known to be non-null is the key-normalization requirement checked.
    bool keysNormalized = source.KeysNormalized;
    if (!keysNormalized)
        throw new InvalidOperationException(SR.Parallel_ForEach_OrderedPartitionerKeysNotNormalized);

    // Default options; the indexed local-state body goes in the seventh argument position,
    // followed by the init and finally delegates.
    return PartitionerForEachWorker<TSource, TLocal>(
        source,
        s_defaultParallelOptions,
        null,
        null,
        null,
        null,
        body,
        localInit,
        localFinally);
}

/// <summary>
/// Executes a for each operation on a <see cref="T:System.Collections.Concurrent.Partitioner{TSource}">
/// Partitioner</see> in which iterations may run in parallel.
/// </summary>
/// <typeparam name="TSource">The type of the elements in <paramref name="source"/>.</typeparam>
/// <param name="source">The Partitioner that contains the original data source.</param>
/// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see>
/// instance that configures the behavior of this operation.</param>
/// <param name="body">The delegate that is invoked once per iteration.</param>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="parallelOptions"/> argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
/// argument is null.</exception>
/// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the
/// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/>
/// argument is set</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// SupportsDynamicPartitions property in the <paramref name="source"/> Partitioner returns
/// false.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when any
/// methods in the <paramref name="source"/> Partitioner return null.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetPartitions() method in the <paramref name="source"/> Partitioner does not return
/// the correct number of partitions.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetPartitions() method in the <paramref name="source"/> Partitioner returns an IList
/// with at least one null value.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetDynamicPartitions() method in the <paramref name="source"/> Partitioner returns an
/// IEnumerable whose GetEnumerator() method returns null.</exception>
/// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.</exception>
/// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when
/// the <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with
/// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the
/// <paramref name="parallelOptions"/> has been disposed.</exception>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
/// that contains information on what portion of the loop completed.</returns>
/// <remarks>
/// <para>
/// The <see cref="T:System.Collections.Concurrent.Partitioner{TSource}">Partitioner</see> is used to retrieve
/// the elements to be processed, in place of the original data source. If the current element's
/// index is desired, the source must be an <see cref="T:System.Collections.Concurrent.OrderablePartitioner{TSource}">
/// OrderablePartitioner</see>.
/// </para>
/// <para>
/// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/>
/// Partitioner. It is provided with the current element as a parameter.
/// </para>
/// </remarks>
public static ParallelLoopResult ForEach<TSource>(
    Partitioner<TSource> source,
    ParallelOptions parallelOptions,
    Action<TSource> body)
{
    // Guards run in the order source, body, parallelOptions. Note that this is not the
    // parameter order; it is kept as-is because it decides which exception surfaces first.
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (body == null) throw new ArgumentNullException(nameof(body));
    if (parallelOptions == null) throw new ArgumentNullException(nameof(parallelOptions));

    // Caller-supplied options; the element-only body goes in the third argument position,
    // all later positions are null.
    return PartitionerForEachWorker<TSource, object>(
        source,
        parallelOptions,
        body,
        null,
        null,
        null,
        null,
        null,
        null);
}

/// <summary>
/// Executes a for each operation on a <see cref="T:System.Collections.Concurrent.Partitioner{TSource}">
/// Partitioner</see> in which iterations may run in parallel.
/// </summary>
/// <typeparam name="TSource">The type of the elements in <paramref name="source"/>.</typeparam>
/// <param name="source">The Partitioner that contains the original data source.</param>
/// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see>
/// instance that configures the behavior of this operation.</param>
/// <param name="body">The delegate that is invoked once per iteration.</param>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="parallelOptions"/> argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
/// argument is null.</exception>
/// <exception cref="T:System.OperationCanceledException">The exception that is thrown when
/// the
/// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/>
/// argument is set</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// SupportsDynamicPartitions property in the <paramref name="source"/> Partitioner returns
/// false.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when any
/// methods in the <paramref name="source"/> Partitioner return null.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetPartitions() method in the <paramref name="source"/> Partitioner does not return
/// the correct number of partitions.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetPartitions() method in the <paramref name="source"/> Partitioner returns an IList
/// with at least one null value.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetDynamicPartitions() method in the <paramref name="source"/> Partitioner returns an
/// IEnumerable whose GetEnumerator() method returns null.</exception>
/// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.</exception>
/// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when
/// the <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with
/// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the
/// <paramref name="parallelOptions"/> has been disposed.</exception>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
/// that contains information on what portion of the loop completed.</returns>
/// <remarks>
/// <para>
/// The <see cref="T:System.Collections.Concurrent.Partitioner{TSource}">Partitioner</see> is used to retrieve
/// the elements to be processed, in place of the original data source. If the current element's
/// index is desired, the source must be an <see cref="T:System.Collections.Concurrent.OrderablePartitioner{TSource}">
/// OrderablePartitioner</see>.
/// </para>
/// <para>
/// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/>
/// Partitioner. It is provided with the following parameters: the current element,
/// and a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be
/// used to break out of the loop prematurely.
/// </para>
/// </remarks>
public static ParallelLoopResult ForEach<TSource>(
    Partitioner<TSource> source,
    ParallelOptions parallelOptions,
    Action<TSource, ParallelLoopState> body)
{
    // Guard order is source, body, then parallelOptions; the first null found is the one reported.
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (body == null) throw new ArgumentNullException(nameof(body));
    if (parallelOptions == null) throw new ArgumentNullException(nameof(parallelOptions));

    // Caller-supplied options; the state-aware body goes in the fourth argument position,
    // every other optional position is null.
    return PartitionerForEachWorker<TSource, object>(
        source,
        parallelOptions,
        null,
        body,
        null,
        null,
        null,
        null,
        null);
}

/// <summary>
/// Executes a for each operation on a <see cref="T:System.Collections.Concurrent.OrderablePartitioner{TSource}">
/// OrderablePartitioner</see> in which iterations may run in parallel.
/// </summary>
/// <typeparam name="TSource">The type of the elements in <paramref name="source"/>.</typeparam>
/// <param name="source">The OrderablePartitioner that contains the original data source.</param>
/// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see>
/// instance that configures the behavior of this operation.</param>
/// <param name="body">The delegate that is invoked once per iteration.</param>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="parallelOptions"/> argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
/// argument is null.</exception>
/// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the
/// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/>
/// argument is set</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// SupportsDynamicPartitions property in the <paramref name="source"/> OrderablePartitioner returns
/// false.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// KeysNormalized property in the <paramref name="source"/> OrderablePartitioner returns
/// false.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when any
/// methods in the <paramref name="source"/> OrderablePartitioner return null.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetPartitions() or GetOrderablePartitions() methods in the <paramref name="source"/>
/// OrderablePartitioner do not return the correct number of partitions.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetPartitions() or GetOrderablePartitions() methods in the <paramref name="source"/>
/// OrderablePartitioner return an IList with at least one null value.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetDynamicPartitions() or GetDynamicOrderablePartitions() methods in the <paramref name="source"/>
/// OrderablePartitioner return an IEnumerable whose GetEnumerator() method returns null.</exception>
/// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.</exception>
/// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when
/// the <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with
/// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the
/// <paramref name="parallelOptions"/> has been disposed.</exception>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
/// that contains information on what portion of the loop completed.</returns>
/// <remarks>
/// <para>
/// The <see cref="T:System.Collections.Concurrent.Partitioner{TSource}">Partitioner</see> is used to retrieve
/// the elements to be processed, in place of the original data source. If the current element's
/// index is desired, the source must be an <see cref="T:System.Collections.Concurrent.OrderablePartitioner{TSource}">
/// OrderablePartitioner</see>.
/// </para>
/// <para>
/// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/>
/// Partitioner. It is provided with the following parameters: the current element,
/// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be
/// used to break out of the loop prematurely, and the current element's index (an Int64).
/// </para>
/// </remarks>
public static ParallelLoopResult ForEach<TSource>(
    OrderablePartitioner<TSource> source,
    ParallelOptions parallelOptions,
    Action<TSource, ParallelLoopState, long> body)
{
    // Null guards in the order source, body, parallelOptions.
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (body == null) throw new ArgumentNullException(nameof(body));
    if (parallelOptions == null) throw new ArgumentNullException(nameof(parallelOptions));

    // Key normalization is checked last, once every argument is known to be non-null.
    bool keysNormalized = source.KeysNormalized;
    if (!keysNormalized)
        throw new InvalidOperationException(SR.Parallel_ForEach_OrderedPartitionerKeysNotNormalized);

    // Caller-supplied options; the indexed body goes in the fifth argument position,
    // every other optional position is null.
    return PartitionerForEachWorker<TSource, object>(
        source,
        parallelOptions,
        null,
        null,
        body,
        null,
        null,
        null,
        null);
}

/// <summary>
/// Executes a for each operation on a <see cref="T:System.Collections.Concurrent.Partitioner{TSource}">
/// Partitioner</see> in which iterations may run in parallel.
/// </summary>
/// <typeparam name="TSource">The type of the elements in <paramref name="source"/>.</typeparam>
/// <typeparam name="TLocal">The type of the thread-local data.</typeparam>
/// <param name="source">The Partitioner that contains the original data source.</param>
/// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see>
/// instance that configures the behavior of this operation.</param>
/// <param name="localInit">The function delegate that returns the initial state of the local data
/// for each thread.</param>
/// <param name="body">The delegate that is invoked once per iteration.</param>
/// <param name="localFinally">The delegate that performs a final action on the local state of each
/// thread.</param>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="parallelOptions"/> argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="localInit"/> argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="localFinally"/> argument is null.</exception>
/// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the
/// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/>
/// argument is set</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// SupportsDynamicPartitions property in the <paramref name="source"/> Partitioner returns
/// false.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when any
/// methods in the <paramref name="source"/> Partitioner return null.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetPartitions() method in the <paramref name="source"/> Partitioner does not return
/// the correct number of partitions.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetPartitions() method in the <paramref name="source"/> Partitioner returns an IList
/// with at least one null value.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetDynamicPartitions() method in the <paramref name="source"/> Partitioner returns an
/// IEnumerable whose GetEnumerator() method returns null.</exception>
/// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.</exception>
/// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when
/// the <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with
/// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the
/// <paramref name="parallelOptions"/> has been disposed.</exception>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
/// that contains information on what portion of the loop completed.</returns>
/// <remarks>
/// <para>
/// The <see cref="T:System.Collections.Concurrent.Partitioner{TSource}">Partitioner</see> is used to retrieve
/// the elements to be processed, in place of the original data source. If the current element's
/// index is desired, the source must be an <see cref="T:System.Collections.Concurrent.OrderablePartitioner{TSource}">
/// OrderablePartitioner</see>.
/// </para>
/// <para>
/// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/>
/// Partitioner. It is provided with the following parameters: the current element,
/// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be
/// used to break out of the loop prematurely, and some local state that may be shared amongst iterations
/// that execute on the same thread.
/// </para>
/// <para>
/// The <paramref name="localInit"/> delegate is invoked once for each thread that participates in the loop's
/// execution and returns the initial local state for each of those threads. These initial states are passed to the first
/// <paramref name="body"/> invocations on each thread. Then, every subsequent body invocation returns a possibly
/// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value
/// that is passed to the <paramref name="localFinally"/> delegate. The localFinally delegate is invoked once per thread to perform a final
/// action on each thread's local state.
/// </para>
/// </remarks>
public static ParallelLoopResult ForEach<TSource, TLocal>(
    Partitioner<TSource> source,
    ParallelOptions parallelOptions,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal, TLocal> body,
    Action<TLocal> localFinally)
{
    // Guards run in the order source, body, localInit, localFinally, parallelOptions.
    // parallelOptions is deliberately checked last here, matching the existing behavior.
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (body == null) throw new ArgumentNullException(nameof(body));
    if (localInit == null) throw new ArgumentNullException(nameof(localInit));
    if (localFinally == null) throw new ArgumentNullException(nameof(localFinally));
    if (parallelOptions == null) throw new ArgumentNullException(nameof(parallelOptions));

    // Caller-supplied options; the local-state body goes in the sixth argument position and
    // the init/finally delegates fill the last two positions.
    return PartitionerForEachWorker<TSource, TLocal>(
        source,
        parallelOptions,
        null,
        null,
        null,
        body,
        null,
        localInit,
        localFinally);
}

/// <summary>
/// Executes a for each operation on a <see cref="T:System.Collections.Concurrent.OrderablePartitioner{TSource}">
/// OrderablePartitioner</see> in which iterations may run in parallel.
/// </summary>
/// <typeparam name="TSource">The type of the elements in <paramref name="source"/>.</typeparam>
/// <typeparam name="TLocal">The type of the thread-local data.</typeparam>
/// <param name="source">The OrderablePartitioner that contains the original data source.</param>
/// <param name="parallelOptions">A <see cref="T:System.Threading.Tasks.ParallelOptions">ParallelOptions</see>
/// instance that configures the behavior of this operation.</param>
/// <param name="localInit">The function delegate that returns the initial state of the local data
/// for each thread.</param>
/// <param name="body">The delegate that is invoked once per iteration.</param>
/// <param name="localFinally">The delegate that performs a final action on the local state of each
/// thread.</param>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="parallelOptions"/> argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/>
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="localInit"/> argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the
/// <paramref name="localFinally"/> argument is null.</exception>
/// <exception cref="T:System.OperationCanceledException">The exception that is thrown when the
/// <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the <paramref name="parallelOptions"/>
/// argument is set.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// SupportsDynamicPartitions property in the <paramref name="source"/> OrderablePartitioner returns
/// false.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// KeysNormalized property in the <paramref name="source"/> OrderablePartitioner returns
/// false.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when any
/// methods in the <paramref name="source"/> OrderablePartitioner return null.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetPartitions() or GetOrderablePartitions() methods in the <paramref name="source"/>
/// OrderablePartitioner do not return the correct number of partitions.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetPartitions() or GetOrderablePartitions() methods in the <paramref name="source"/>
/// OrderablePartitioner return an IList with at least one null value.</exception>
/// <exception cref="T:System.InvalidOperationException">The exception that is thrown when the
/// GetDynamicPartitions() or GetOrderableDynamicPartitions() methods in the <paramref name="source"/>
/// OrderablePartitioner return an IEnumerable whose GetEnumerator() method returns null.</exception>
/// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.</exception>
/// <exception cref="T:System.ObjectDisposedException">The exception that is thrown when
/// the <see cref="T:System.Threading.CancellationTokenSource">CancellationTokenSource</see> associated with
/// the <see cref="T:System.Threading.CancellationToken">CancellationToken</see> in the
/// <paramref name="parallelOptions"/> has been disposed.</exception>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
/// that contains information on what portion of the loop completed.</returns>
/// <remarks>
/// <para>
/// The <see cref="T:System.Collections.Concurrent.Partitioner{TSource}">Partitioner</see> is used to retrieve
/// the elements to be processed, in place of the original data source. If the current element's
/// index is desired, the source must be an <see cref="T:System.Collections.Concurrent.OrderablePartitioner{TSource}">
/// OrderablePartitioner</see>.
/// </para>
/// <para>
/// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/>
/// Partitioner. It is provided with the following parameters: the current element,
/// a <see cref="System.Threading.Tasks.ParallelLoopState">ParallelLoopState</see> instance that may be
/// used to break out of the loop prematurely, the current element's index (an Int64), and some local
/// state that may be shared amongst iterations that execute on the same thread.
/// </para>
/// <para>
/// The <paramref name="localInit"/> delegate is invoked once for each thread that participates in the loop's
/// execution and returns the initial local state for each of those threads. These initial states are passed to the first
/// <paramref name="body"/> invocations on each thread. Then, every subsequent body invocation returns a possibly
/// modified state value that is passed to the next body invocation. Finally, the last body invocation on each thread returns a state value
/// that is passed to the <paramref name="localFinally"/> delegate. The localFinally delegate is invoked once per thread to perform a final
/// action on each thread's local state.
/// </para>
/// </remarks>
public static ParallelLoopResult ForEach<TSource, TLocal>(
    OrderablePartitioner<TSource> source,
    ParallelOptions parallelOptions,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
    Action<TLocal> localFinally)
{
    // Arguments are validated in a fixed order (source, body, localInit, localFinally, parallelOptions)
    // so that the parameter name reported for a multi-null call does not change.
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (body == null) throw new ArgumentNullException(nameof(body));
    if (localInit == null) throw new ArgumentNullException(nameof(localInit));
    if (localFinally == null) throw new ArgumentNullException(nameof(localFinally));
    if (parallelOptions == null) throw new ArgumentNullException(nameof(parallelOptions));

    // The indexed body overload is rejected unless the partitioner reports normalized keys;
    // this check deliberately comes after all null checks.
    bool keysAreNormalized = source.KeysNormalized;
    if (!keysAreNormalized)
    {
        throw new InvalidOperationException(SR.Parallel_ForEach_OrderedPartitionerKeysNotNormalized);
    }

    // Only the "state + index + thread-local" body slot of the shared worker is populated by this overload.
    return PartitionerForEachWorker<TSource, TLocal>(
        source,
        parallelOptions,
        simpleBody: null,
        bodyWithState: null,
        bodyWithStateAndIndex: null,
        bodyWithStateAndLocal: null,
        bodyWithEverything: body,
        localInit: localInit,
        localFinally: localFinally);
}

// Main worker method for Parallel.ForEach() calls w/ Partitioners.
/// <summary>
/// Shared implementation behind the Partitioner-based ForEach overloads. Runs the single supplied body
/// delegate over the dynamic partitions of <paramref name="source"/> via TaskReplicator.Run.
/// </summary>
/// <typeparam name="TSource">The type of the elements produced by <paramref name="source"/>.</typeparam>
/// <typeparam name="TLocal">The type of the thread-local data (only meaningful for the local-state bodies).</typeparam>
/// <param name="source">The partitioner to enumerate. If it is an OrderablePartitioner, the orderable
/// (key/value) dynamic partitions are used and element indices are available to the body.</param>
/// <param name="parallelOptions">Options supplying the cancellation token and passed on to TaskReplicator.Run.</param>
/// <param name="simpleBody">Body taking only the element, or null.</param>
/// <param name="bodyWithState">Body taking the element and loop state, or null.</param>
/// <param name="bodyWithStateAndIndex">Body taking the element, loop state and index, or null.</param>
/// <param name="bodyWithStateAndLocal">Body taking the element, loop state and local value, or null.</param>
/// <param name="bodyWithEverything">Body taking the element, loop state, index and local value, or null.</param>
/// <param name="localInit">Produces the initial local value for a worker; expected to be null unless a
/// local-state body is supplied.</param>
/// <param name="localFinally">Consumes a worker's final local value; only invoked if localInit completed.</param>
/// <returns>A ParallelLoopResult whose completion flag and lowest break iteration are taken from the shared loop flags.</returns>
/// <exception cref="T:System.InvalidOperationException">The partitioner does not support dynamic partitions,
/// returns a null partition source, or yields a null enumerator.</exception>
/// <exception cref="T:System.OperationCanceledException">The token in <paramref name="parallelOptions"/> was
/// already canceled on entry, or was canceled while the loop was running.</exception>
/// <remarks>
/// Exactly one of the five body delegates is expected to be non-null (checked with Debug.Assert only).
/// </remarks>
private static ParallelLoopResult PartitionerForEachWorker<TSource, TLocal>(
    Partitioner<TSource> source, // Might be OrderablePartitioner
    ParallelOptions parallelOptions,
    Action<TSource> simpleBody,
    Action<TSource, ParallelLoopState> bodyWithState,
    Action<TSource, ParallelLoopState, long> bodyWithStateAndIndex,
    Func<TSource, ParallelLoopState, TLocal, TLocal> bodyWithStateAndLocal,
    Func<TSource, ParallelLoopState, long, TLocal, TLocal> bodyWithEverything,
    Func<TLocal> localInit,
    Action<TLocal> localFinally)
{
    Debug.Assert(((simpleBody == null ? 0 : 1) + (bodyWithState == null ? 0 : 1) +
        (bodyWithStateAndIndex == null ? 0 : 1) + (bodyWithStateAndLocal == null ? 0 : 1) + (bodyWithEverything == null ? 0 : 1)) == 1,
        "PartitionForEach: expected exactly one body function to be supplied");
    Debug.Assert((bodyWithStateAndLocal != null) || (bodyWithEverything != null) || (localInit == null && localFinally == null),
        "PartitionForEach: thread local functions should only be supplied for loops w/ thread local bodies");

    OrderablePartitioner<TSource> orderedSource = source as OrderablePartitioner<TSource>;
    Debug.Assert((orderedSource != null) || (bodyWithStateAndIndex == null && bodyWithEverything == null),
        "PartitionForEach: bodies with indices are only allowable for OrderablePartitioner");

    if (!source.SupportsDynamicPartitions)
    {
        throw new InvalidOperationException(SR.Parallel_ForEach_PartitionerNotDynamic);
    }

    // Before getting started, do a quick peek to see if we have been canceled already
    parallelOptions.CancellationToken.ThrowIfCancellationRequested();

    // ETW event for Parallel For begin
    int forkJoinContextID = 0;
    if (ParallelEtwProvider.Log.IsEnabled())
    {
        forkJoinContextID = Interlocked.Increment(ref s_forkJoinContextID);
        ParallelEtwProvider.Log.ParallelLoopBegin(TaskScheduler.Current.Id, Task.CurrentId ?? 0,
                                                  forkJoinContextID, ParallelEtwProvider.ForkJoinOperationType.ParallelForEach,
                                                  0, 0);
    }

    // For all loops we need a shared flag even though we don't have a body with state,
    // because the shared flag contains the exceptional bool, which triggers other workers
    // to exit their loops if one worker catches an exception
    ParallelLoopStateFlags64 sharedPStateFlags = new ParallelLoopStateFlags64();

    // Instantiate our result.  Specifics will be filled in later.
    ParallelLoopResult result = new ParallelLoopResult();

    // Keep track of any cancellations
    OperationCanceledException oce = null;

    // if cancellation is enabled, we need to register a callback to stop the loop when it gets signaled
    CancellationTokenRegistration ctr = (!parallelOptions.CancellationToken.CanBeCanceled)
                    ? default(CancellationTokenRegistration)
                    : parallelOptions.CancellationToken.Register((o) =>
                    {
                        // Record our cancellation before stopping processing
                        oce = new OperationCanceledException(parallelOptions.CancellationToken);
                        // Cause processing to stop
                        sharedPStateFlags.Cancel();
                    }, state: null, useSynchronizationContext: false);

    // Get our dynamic partitioner -- depends on whether source is castable to OrderablePartitioner
    // Also, do some error checking.
    IEnumerable<TSource> partitionerSource = null;
    IEnumerable<KeyValuePair<long, TSource>> orderablePartitionerSource = null;
    try
    {
        if (orderedSource != null)
        {
            orderablePartitionerSource = orderedSource.GetOrderableDynamicPartitions();
            if (orderablePartitionerSource == null)
            {
                throw new InvalidOperationException(SR.Parallel_ForEach_PartitionerReturnedNull);
            }
        }
        else
        {
            partitionerSource = source.GetDynamicPartitions();
            if (partitionerSource == null)
            {
                throw new InvalidOperationException(SR.Parallel_ForEach_PartitionerReturnedNull);
            }
        }
    }
    catch
    {
        // The loop never starts on this path, so the finally block that normally disposes the
        // registration is never entered. Release it here so the callback does not stay attached
        // to the caller's token, then let the original exception propagate unchanged.
        if (parallelOptions.CancellationToken.CanBeCanceled)
            ctr.Dispose();
        throw;
    }

    try
    {
        try
        {
            TaskReplicator.Run(
                (ref IEnumerator partitionState, int timeout, out bool replicationDelegateYieldedBeforeCompletion) =>
                {
                    // We will need to reset this to true if we exit due to a timeout:
                    replicationDelegateYieldedBeforeCompletion = false;

                    // ETW event for ParallelForEach Worker Fork
                    if (ParallelEtwProvider.Log.IsEnabled())
                    {
                        ParallelEtwProvider.Log.ParallelFork(TaskScheduler.Current.Id, Task.CurrentId ?? 0, forkJoinContextID);
                    }

                    TLocal localValue = default(TLocal);
                    bool bLocalValueInitialized = false; // Tracks whether localInit ran without exceptions, so that we can skip localFinally if it wasn't

                    try
                    {
                        // Create a new state object that references the shared "stopped" and "exceptional" flags.
                        // If needed, it will contain a new instance of thread-local state by invoking the selector.
                        ParallelLoopState64 state = null;

                        if (bodyWithState != null || bodyWithStateAndIndex != null)
                        {
                            state = new ParallelLoopState64(sharedPStateFlags);
                        }
                        else if (bodyWithStateAndLocal != null || bodyWithEverything != null)
                        {
                            state = new ParallelLoopState64(sharedPStateFlags);
                            // If a thread-local selector was supplied, invoke it.  Otherwise, stick with the default.
                            if (localInit != null)
                            {
                                localValue = localInit();
                                bLocalValueInitialized = true;
                            }
                        }

                        // initialize a loop timer which will help us decide whether we should exit early
                        Int32 loopTimeout = ComputeTimeoutPoint(timeout);

                        if (orderedSource != null)  // Use this path for OrderablePartitioner:
                        {
                            // first check if there's saved state from a previous replica that we might be replacing.
                            // the only state to be passed down in such a transition is the enumerator
                            IEnumerator<KeyValuePair<long, TSource>> myPartition = partitionState as IEnumerator<KeyValuePair<long, TSource>>;
                            if (myPartition == null)
                            {
                                myPartition = orderablePartitionerSource.GetEnumerator();
                                partitionState = myPartition;
                            }

                            if (myPartition == null)
                                throw new InvalidOperationException(SR.Parallel_ForEach_NullEnumerator);

                            while (myPartition.MoveNext())
                            {
                                KeyValuePair<long, TSource> kvp = myPartition.Current;
                                long index = kvp.Key;
                                TSource value = kvp.Value;

                                // Update our iteration index
                                if (state != null) state.CurrentIteration = index;

                                if (simpleBody != null)
                                    simpleBody(value);
                                else if (bodyWithState != null)
                                    bodyWithState(value, state);
                                else if (bodyWithStateAndIndex != null)
                                    bodyWithStateAndIndex(value, state, index);
                                else if (bodyWithStateAndLocal != null)
                                    localValue = bodyWithStateAndLocal(value, state, localValue);
                                else
                                    localValue = bodyWithEverything(value, state, index, localValue);

                                if (sharedPStateFlags.ShouldExitLoop(index)) break;

                                // Cooperative multitasking:
                                // Check if allowed loop time is exceeded, if so save current state and return.
                                // The task replicator will queue up a replacement task. Note that we don't do this on the root task.
                                if (CheckTimeoutReached(loopTimeout))
                                {
                                    replicationDelegateYieldedBeforeCompletion = true;
                                    break;
                                }
                            }
                        }
                        else  // Use this path for Partitioner that is not OrderablePartitioner:
                        {
                            // first check if there's saved state from a previous replica that we might be replacing.
                            // the only state to be passed down in such a transition is the enumerator
                            IEnumerator<TSource> myPartition = partitionState as IEnumerator<TSource>;
                            if (myPartition == null)
                            {
                                myPartition = partitionerSource.GetEnumerator();
                                partitionState = myPartition;
                            }

                            if (myPartition == null)
                                throw new InvalidOperationException(SR.Parallel_ForEach_NullEnumerator);

                            // I'm not going to try to maintain this
                            if (state != null)
                                state.CurrentIteration = 0;

                            while (myPartition.MoveNext())
                            {
                                TSource t = myPartition.Current;

                                if (simpleBody != null)
                                    simpleBody(t);
                                else if (bodyWithState != null)
                                    bodyWithState(t, state);
                                else if (bodyWithStateAndLocal != null)
                                    localValue = bodyWithStateAndLocal(t, state, localValue);
                                else
                                    Debug.Fail("PartitionerForEach: illegal body type in Partitioner handler");

                                // Any break, stop or exception causes us to halt
                                // We don't have the global indexing information to discriminate whether or not
                                // we are before or after a break point.
                                if (sharedPStateFlags.LoopStateFlags != ParallelLoopStateFlags.ParallelLoopStateNone)
                                    break;

                                // Cooperative multitasking:
                                // Check if allowed loop time is exceeded, if so save current state and return.
                                // The task replicator will queue up a replacement task. Note that we don't do this on the root task.
                                if (CheckTimeoutReached(loopTimeout))
                                {
                                    replicationDelegateYieldedBeforeCompletion = true;
                                    break;
                                }
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        // Inform other tasks of the exception, then rethrow
                        sharedPStateFlags.SetExceptional();
                        ExceptionDispatchInfo.Throw(ex);
                    }
                    finally
                    {
                        if (localFinally != null && bLocalValueInitialized)
                        {
                            localFinally(localValue);
                        }

                        // When yielding on timeout the enumerator is handed to the replacement replica
                        // through partitionState, so it must only be disposed when this replica is done.
                        if (!replicationDelegateYieldedBeforeCompletion)
                        {
                            IDisposable partitionToDispose = partitionState as IDisposable;
                            if (partitionToDispose != null)
                                partitionToDispose.Dispose();
                        }

                        // ETW event for ParallelFor Worker Join
                        if (ParallelEtwProvider.Log.IsEnabled())
                        {
                            ParallelEtwProvider.Log.ParallelJoin(TaskScheduler.Current.Id, Task.CurrentId ?? 0, forkJoinContextID);
                        }
                    }
                },
                parallelOptions,
                stopOnFirstFailure: true);
        }
        finally
        {
            // Dispose the cancellation token registration before checking for a cancellation exception
            if (parallelOptions.CancellationToken.CanBeCanceled)
                ctr.Dispose();
        }

        // If we got through that with no exceptions, and we were canceled, then
        // throw our cancellation exception
        if (oce != null) throw oce;
    }
    catch (AggregateException aggExp)
    {
        // If we have many cancellation exceptions all caused by the specified user cancel control, then throw only one OCE:
        ThrowSingleCancellationExceptionOrOtherException(aggExp.InnerExceptions, parallelOptions.CancellationToken, aggExp);
    }
    finally
    {
        int sb_status = sharedPStateFlags.LoopStateFlags;
        result._completed = (sb_status == ParallelLoopStateFlags.ParallelLoopStateNone);
        if ((sb_status & ParallelLoopStateFlags.ParallelLoopStateBroken) != 0)
        {
            result._lowestBreakIteration = sharedPStateFlags.LowestBreakIteration;
        }

        //dispose the partitioner source if it implements IDisposable
        IDisposable d = null;
        if (orderablePartitionerSource != null)
        {
            d = orderablePartitionerSource as IDisposable;
        }
        else
        {
            d = partitionerSource as IDisposable;
        }

        if (d != null)
        {
            d.Dispose();
        }

        // ETW event for Parallel For End
        if (ParallelEtwProvider.Log.IsEnabled())
        {
            ParallelEtwProvider.Log.ParallelLoopEnd(TaskScheduler.Current.Id, Task.CurrentId ?? 0, forkJoinContextID, 0);
        }
    }

    return result;
}

/// <summary>
/// If all exceptions in the specified collection are OperationCanceledExceptions with the specified token,
/// then get one such exception (the first one). Otherwise, return null.
/// </summary>
private static OperationCanceledException ReduceToSingleCancellationException(ICollection exceptions,
                                                                              CancellationToken cancelToken)
{
    // If collection is empty - no match:
    if (exceptions == null || exceptions.Count == 0)
        return null;

    // If token is not cancelled, it can not be part of an exception:
    if (!cancelToken.IsCancellationRequested)
        return null;

    // Check all exceptions:
    Exception first = null;
    foreach (Object exObj in exceptions)
    {
        Debug.Assert(exObj is Exception);
        Exception ex = (Exception)exObj;

        if (first == null)
            first = ex;

        // If mismatch found, fail-fast:
        OperationCanceledException ocEx = ex as OperationCanceledException;
        if (ocEx == null || !cancelToken.Equals(ocEx.CancellationToken))
            return null;
    }

    // All exceptions are OCEs with this token, let's just pick the first:
    Debug.Assert(first is OperationCanceledException);
    return (OperationCanceledException)first;
}


/// <summary>
/// IF exceptions are all OperationCanceledExceptions
/// with the specified cancelToken,
/// THEN throw that unique OperationCanceledException (pick any);
/// OTHERWISE throw the specified otherException.
/// </summary>
/// <param name="exceptions">The exceptions to inspect.</param>
/// <param name="cancelToken">The token that every OperationCanceledException must carry for the reduction to apply.</param>
/// <param name="otherException">The exception to throw when the collection cannot be reduced to a single cancellation.</param>
private static void ThrowSingleCancellationExceptionOrOtherException(ICollection exceptions,
                                                                     CancellationToken cancelToken,
                                                                     Exception otherException)
{
    // A null reduction result means the collection is not made up purely of cancellations
    // for this token, so the caller-supplied exception is the one to surface.
    Exception toThrow = ReduceToSingleCancellationException(exceptions, cancelToken);
    if (toThrow == null)
    {
        toThrow = otherException;
    }

    ExceptionDispatchInfo.Throw(toThrow);
}
} // class Parallel
} // namespace