// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// CESchedulerPairTests.cs
// Tests Ported from the TPL test bed
//
// Summary:
// Implements the tests for the new scheduler ConcurrentExclusiveSchedulerPair
//
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Security;

using Xunit;
using System.Diagnostics;

namespace System.Threading.Tasks.Tests
{
    /// <summary>
    /// A test scheduler that forwards every queued task to <see cref="TaskScheduler.Default"/> and, just before
    /// running it, stamps the executing thread with a monotonically increasing id (<see cref="SchedulerID"/>).
    /// Tests use that id to tell which scheduler-level task a piece of work ran under.
    /// </summary>
    public class TrackingTaskScheduler : TaskScheduler
    {
        /// <summary>Creates a scheduler that reports the given maximum concurrency level.</summary>
        /// <param name="maxConLevel">A value of at least 1, or -1 for "infinite".</param>
        /// <exception cref="ArgumentException"><paramref name="maxConLevel"/> is less than 1 and not -1.</exception>
        public TrackingTaskScheduler(int maxConLevel)
        {
            // We need to set the value to 1 so that each time a scheduler is created, its tasks will start with one.
            _counter = 1;
            if (maxConLevel < 1 && maxConLevel != -1 /*infinite*/)
            {
                throw new ArgumentException("Maximum concurrency level should be between 1 and int32.Maxvalue", nameof(maxConLevel));
            }

            _maxConcurrencyLevel = maxConLevel;
        }

        /// <summary>
        /// Queues <paramref name="task"/> by starting a wrapper task on <see cref="TaskScheduler.Default"/> that
        /// records the next id in <see cref="SchedulerID"/> for the current thread and then executes the task.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
        protected override void QueueTask(Task task)
        {
            if (task == null)
            {
                throw new ArgumentNullException(nameof(task), "When requesting to QueueTask, the input task can not be null");
            }

            Task.Factory.StartNew(() =>
            {
                // Lock so that concurrent thread pool threads do not hand out the same id or lose an increment.
                lock (_lockObj)
                {
                    // Store the current value of the counter (this becomes the unique ID for this scheduler's Task).
                    SchedulerID.Value = _counter;
                    _counter++;
                }

                ExecuteTask(task); // Extracted out due to security attribute reason.
            }, CancellationToken.None, TaskCreationOptions.None, TaskScheduler.Default);
        }

        /// <summary>Runs <paramref name="task"/> on the current thread via <see cref="TaskScheduler.TryExecuteTask"/>.</summary>
        private void ExecuteTask(Task task)
        {
            base.TryExecuteTask(task);
        }

        /// <summary>
        /// Inlines a task only if it has not been queued yet; previously queued tasks are never inlined.
        /// </summary>
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            if (taskWasPreviouslyQueued)
            {
                return false;
            }

            return TryExecuteTask(task);
        }

        /// <summary>This scheduler does not track its queued tasks and always returns null.</summary>
        protected override IEnumerable<Task> GetScheduledTasks() { return null; }

        private readonly Object _lockObj = new Object();
        private int _counter = 1; // This is used to keep track of how many scheduler tasks were created
        public ThreadLocal<int> SchedulerID = new ThreadLocal<int>(); // This is the ID of the scheduler.

        /// <summary>The maximum concurrency level for the scheduler.</summary>
        private readonly int _maxConcurrencyLevel;
        public override int MaximumConcurrencyLevel { get { return _maxConcurrencyLevel; } }
    }

    public class CESchedulerPairTests
    {
        #region Test cases

        /// <summary>
        /// Test to ensure that ConcurrentExclusiveSchedulerPair can be created using user defined parameters
        /// and those parameters are respected when tasks are executed
        /// </summary>
        /// <remarks>maxItemsPerTask and which scheduler is used are verified in other testcases</remarks>
        [Theory]
        [InlineData("default")]
        [InlineData("scheduler")]
        [InlineData("maxconcurrent")]
        [InlineData("all")]
        public static void TestCreationOptions(String ctorType)
        {
            ConcurrentExclusiveSchedulerPair schedPair = null;
            // Need to define the default values since these values are passed to the verification methods
            TaskScheduler scheduler = TaskScheduler.Default;
            int maxConcurrentLevel = Environment.ProcessorCount;

            // Normalize once (culture-invariant) so the switch and the later verification agree on the option.
            string normalizedCtorType = ctorType.ToLowerInvariant();

            // Based on input args, use one of the ctor overloads
            switch (normalizedCtorType)
            {
                case "default":
                    schedPair = new ConcurrentExclusiveSchedulerPair();
                    break;
                case "scheduler":
                    schedPair = new ConcurrentExclusiveSchedulerPair(scheduler);
                    break;
                case "maxconcurrent":
                    maxConcurrentLevel = 2;
                    schedPair = new ConcurrentExclusiveSchedulerPair(scheduler, maxConcurrentLevel);
                    break;
                case "all":
                    maxConcurrentLevel = Int32.MaxValue;
                    schedPair = new ConcurrentExclusiveSchedulerPair(scheduler, -1 /*MaxConcurrentLevel*/, -1 /*MaxItemsPerTask*/); // -1 gets converted to Int32.MaxValue
                    break;
                default:
                    throw new NotImplementedException(String.Format("The option specified {0} to create the ConcurrentExclusiveSchedulerPair is invalid", ctorType));
            }

            // Create the factories that use the exclusive scheduler and the concurrent scheduler. We test to ensure
            // that the ConcurrentExclusiveSchedulerPair created are valid by scheduling work on them.
            TaskFactory writers = new TaskFactory(schedPair.ExclusiveScheduler);
            TaskFactory readers = new TaskFactory(schedPair.ConcurrentScheduler);

            List<Task> taskList = new List<Task>(); // Store all tasks created, to enable wait until all of them are finished

            // Schedule some dummy work that should be run with as much parallelism as possible
            for (int i = 0; i < 50; i++)
            {
                // In the current design, when there are no more tasks to execute, the Task used by the concurrent/exclusive
                // scheduler dies. By spinning we simulate some non-trivial work that takes time and causes the scheduler's
                // Task to stay around for additional work.
                taskList.Add(readers.StartNew(() => { var sw = new SpinWait(); while (!sw.NextSpinWillYield) sw.SpinOnce(); }));
            }

            // Schedule work where each item must be run when no other items are running
            for (int i = 0; i < 10; i++)
            {
                taskList.Add(writers.StartNew(() => { var sw = new SpinWait(); while (!sw.NextSpinWillYield) sw.SpinOnce(); }));
            }

            // Wait on the tasks to finish to ensure that the ConcurrentExclusiveSchedulerPair created can schedule and execute tasks without issues
            foreach (var item in taskList)
            {
                item.Wait();
            }

            // Verify that maxconcurrency was respected.
            if (normalizedCtorType == "maxconcurrent")
            {
                Assert.Equal(maxConcurrentLevel, schedPair.ConcurrentScheduler.MaximumConcurrencyLevel);
            }
            Assert.Equal(1, schedPair.ExclusiveScheduler.MaximumConcurrencyLevel);

            // Verify that the schedulers have not completed
            Assert.False(schedPair.Completion.IsCompleted, "The schedulers should not have completed as a completion request was not issued.");

            // Complete the scheduler and make sure it shuts down successfully
            schedPair.Complete();
            schedPair.Completion.Wait();

            // Make sure no additional work may be scheduled
            foreach (var schedPairScheduler in new TaskScheduler[] { schedPair.ConcurrentScheduler, schedPair.ExclusiveScheduler })
            {
                Exception caughtException = null;
                try
                {
                    Task.Factory.StartNew(() => { }, CancellationToken.None, TaskCreationOptions.None, schedPairScheduler);
                }
                catch (Exception exc)
                {
                    caughtException = exc;
                }
                Assert.True(
                    caughtException is TaskSchedulerException && caughtException.InnerException is InvalidOperationException,
                    "Queueing after completion should fail");
            }
        }

        /// <summary>
        /// Test to verify that only up to maxItemsPerTask are executed by a single ConcurrentExclusiveScheduler Task
        /// </summary>
        /// <remarks>In ConcurrentExclusiveSchedulerPair, each task scheduled is run under an internal Task. The basic idea for the test
        /// is that each time the underlying TrackingTaskScheduler runs a queued task, a counter (which acts as the scheduler Task's id) is incremented.
        /// When a task executes, it observes the parent Task id and, if it matches the one in its local cache, it increments its local counter (which tracks
        /// the items executed by a ConcurrentExclusiveScheduler Task). At any given time the Task's local counter can't exceed maxItemsPerTask</remarks>
        [Theory]
        [InlineData(4, 1, true)]
        [InlineData(1, 4, true)]
        [InlineData(4, 1, false)]
        [InlineData(1, 4, false)]
        public static void TestMaxItemsPerTask(int maxConcurrency, int maxItemsPerTask, bool completeBeforeTaskWait)
        {
            // Create a custom TaskScheduler with specified max concurrency (TrackingTaskScheduler is defined at the top of this file)
            TrackingTaskScheduler scheduler = new TrackingTaskScheduler(maxConcurrency);
            // We need to use the custom scheduler to achieve the results. As a by-product, we test to ensure custom schedulers are supported
            ConcurrentExclusiveSchedulerPair schedPair = new ConcurrentExclusiveSchedulerPair(scheduler, maxConcurrency, maxItemsPerTask);
            TaskFactory readers = new TaskFactory(schedPair.ConcurrentScheduler); // get reader and writer schedulers
            TaskFactory writers = new TaskFactory(schedPair.ExclusiveScheduler);

            // These are threadlocals to ensure that no concurrency side effects occur
            ThreadLocal<int> itemsExecutedCount = new ThreadLocal<int>(); // Track the items executed by CEScheduler Task
            ThreadLocal<int> schedulerIDInsideTask = new ThreadLocal<int>(); // Used to store the Scheduler ID observed by a Task Executed by CEScheduler Task

            // Work done by both reader and writer tasks
            Action work = () =>
            {
                // Get the id of the parent Task (which is the task created by the scheduler). Each task run by the scheduler task should
                // see the same SchedulerID value since they are run on the same thread
                int id = scheduler.SchedulerID.Value;
                if (id == schedulerIDInsideTask.Value)
                {
                    // Since ids match, this is one more Task being executed by the CEScheduler Task
                    itemsExecutedCount.Value++;
                    // This does not need to be thread safe since we are looking to ensure that only n number of tasks were executed and not the order
                    // in which they were executed. Also asserting inside the thread is fine since we just want the test to be marked as failure
                    Assert.True(itemsExecutedCount.Value <= maxItemsPerTask, string.Format("itemsExecutedCount={0} cant be greater than maxValue={1}. Parent TaskID={2}",
                        itemsExecutedCount.Value, maxItemsPerTask, id));
                }
                else
                {
                    // Since ids don't match, this is the first Task being executed in the CEScheduler Task
                    schedulerIDInsideTask.Value = id; // cache the scheduler ID seen by the thread, so other tasks running in same thread can see this
                    itemsExecutedCount.Value = 1;
                }
                // Give enough time for a Task to stay around, so that other tasks will be executed by the same CEScheduler Task
                // or else the CESchedulerTask will die and each Task might get executed by a different CEScheduler Task. This does not affect the
                // verifications, but it increases the chance of finding a bug if the maxItemPerTask is not respected
                new ManualResetEvent(false).WaitOne(1);
            };

            List<Task> taskList = new List<Task>();
            int maxConcurrentTasks = maxConcurrency * maxItemsPerTask * 5;
            int maxExclusiveTasks = maxConcurrency * maxItemsPerTask * 2;

            // Schedule Tasks in both concurrent and exclusive mode
            for (int i = 0; i < maxConcurrentTasks; i++)
            {
                taskList.Add(readers.StartNew(work));
            }
            for (int i = 0; i < maxExclusiveTasks; i++)
            {
                taskList.Add(writers.StartNew(work));
            }

            if (completeBeforeTaskWait)
            {
                schedPair.Complete();
                schedPair.Completion.Wait();
                Assert.True(taskList.TrueForAll(t => t.IsCompleted), "All tasks should have completed for scheduler to complete");
            }

            // Finally wait for all of the tasks, to ensure they all executed properly
            Task.WaitAll(taskList.ToArray());

            if (!completeBeforeTaskWait)
            {
                schedPair.Complete();
                schedPair.Completion.Wait();
                Assert.True(taskList.TrueForAll(t => t.IsCompleted), "All tasks should have completed for scheduler to complete");
            }
        }

        /// <summary>
        /// When user specifies a concurrency level above the level allowed by the task scheduler, the concurrency level should be set
        /// to the concurrencylevel specified in the taskscheduler. Also tests that the maxConcurrencyLevel specified was respected
        /// </summary>
        [Fact]
        public static void TestLowerConcurrencyLevel()
        {
            // A custom scheduler with maxConcurrencyLevel of one
            int customSchedulerConcurrency = 1;
            TrackingTaskScheduler scheduler = new TrackingTaskScheduler(customSchedulerConcurrency);
            // Specify a maxConcurrencyLevel > TaskScheduler's maxconcurrencyLevel to ensure the pair takes the min of the two
            ConcurrentExclusiveSchedulerPair schedPair = new ConcurrentExclusiveSchedulerPair(scheduler, Int32.MaxValue);
            Assert.Equal(scheduler.MaximumConcurrencyLevel, schedPair.ConcurrentScheduler.MaximumConcurrencyLevel);

            // Now schedule a reader task that would block and verify that more reader tasks scheduled are not executed
            // (as long as the first task is blocked)
            TaskFactory readers = new TaskFactory(schedPair.ConcurrentScheduler);
            ManualResetEvent blockReaderTaskEvent = new ManualResetEvent(false);
            ManualResetEvent blockMainThreadEvent = new ManualResetEvent(false);

            // Add a reader task that would block
            readers.StartNew(() => { blockMainThreadEvent.Set(); blockReaderTaskEvent.WaitOne(); });
            blockMainThreadEvent.WaitOne(); // wait for the blocked task to start execution

            // Now add more reader tasks
            int maxConcurrentTasks = Environment.ProcessorCount;
            List<Task> taskList = new List<Task>();
            for (int i = 0; i < maxConcurrentTasks; i++)
            {
                taskList.Add(readers.StartNew(() => { })); // schedule some dummy reader tasks
            }

            foreach (Task task in taskList)
            {
                // True while the task has neither begun running nor already finished.
                bool taskNotStarted = (task.Status != TaskStatus.Running) && (task.Status != TaskStatus.RanToCompletion);
                Assert.True(taskNotStarted, string.Format("Additional reader tasks should not start when scheduler concurrency is {0} and a reader task is blocked", customSchedulerConcurrency));
            }

            // Finally unblock the blocked task and wait for all of the tasks, to ensure they all executed properly
            blockReaderTaskEvent.Set();
            Task.WaitAll(taskList.ToArray());
        }

        /// <summary>
        /// Verifies that concurrent tasks do not run while an exclusive task is executing (blocked).
        /// </summary>
        /// <remarks>NOTE(review): <paramref name="useReader"/> is not read by the test body; both theory rows run the same scenario.</remarks>
        [Theory]
        [InlineData(true)]
        [InlineData(false)]
        public static void TestConcurrentBlockage(bool useReader)
        {
            ConcurrentExclusiveSchedulerPair schedPair = new ConcurrentExclusiveSchedulerPair();
            TaskFactory readers = new TaskFactory(schedPair.ConcurrentScheduler);
            TaskFactory writers = new TaskFactory(schedPair.ExclusiveScheduler);
            ManualResetEvent blockExclusiveTaskEvent = new ManualResetEvent(false);
            ManualResetEvent blockMainThreadEvent = new ManualResetEvent(false);
            ManualResetEvent blockMre = new ManualResetEvent(false);

            // Schedule a concurrent task and ensure that it is executed, just for fun
            Task<bool> conTask = readers.StartNew<bool>(() => { new ManualResetEvent(false).WaitOne(10); return true; });
            conTask.Wait();
            Assert.True(conTask.Result, "The concurrenttask when executed successfully should have returned true");

            // Now schedule an exclusive task that is blocked (thereby preventing other concurrent tasks from finishing)
            Task<bool> exclusiveTask = writers.StartNew<bool>(() => { blockMainThreadEvent.Set(); blockExclusiveTaskEvent.WaitOne(); return true; });

            // With exclusive task in execution mode, schedule a number of concurrent tasks and ensure they are not executed
            blockMainThreadEvent.WaitOne();
            List<Task> taskList = new List<Task>();
            for (int i = 0; i < 20; i++)
            {
                taskList.Add(readers.StartNew<bool>(() => { blockMre.WaitOne(10); return true; }));
            }

            foreach (Task task in taskList)
            {
                // True while the task has neither begun running nor already finished.
                bool taskNotStarted = (task.Status != TaskStatus.Running) && (task.Status != TaskStatus.RanToCompletion);
                Assert.True(taskNotStarted, "Concurrent tasks should not be executed when an exclusive task is getting executed");
            }

            blockExclusiveTaskEvent.Set();
            Task.WaitAll(taskList.ToArray());
        }

        /// <summary>
        /// Verifies that the task-creation APIs listed in <see cref="ApiType"/> can target either scheduler of the pair:
        /// every piece of scheduled work must signal the countdown.
        /// </summary>
        [Theory]
        [MemberData(nameof(ApiType))]
        public static void TestIntegration(String apiType, bool useReader)
        {
            Debug.WriteLine(string.Format(" Running apiType:{0} useReader:{1}", apiType, useReader));
            int taskCount = Environment.ProcessorCount; // To get varying number of tasks as a function of cores
            ConcurrentExclusiveSchedulerPair schedPair = new ConcurrentExclusiveSchedulerPair();
            CountdownEvent cde = new CountdownEvent(taskCount); // Used to track how many tasks were executed
            Action work = () => { cde.Signal(); }; // Work done by all APIs
            // Choose the right scheduler to use based on input parameter
            TaskScheduler scheduler = useReader ? schedPair.ConcurrentScheduler : schedPair.ExclusiveScheduler;

            SelectAPI2Target(apiType, taskCount, scheduler, work);
            cde.Wait(); // This will cause the test to block (and timeout) until all tasks are finished
        }

        /// <summary>
        /// Test to ensure that invalid parameters result in exceptions
        /// </summary>
        [Fact]
        public static void TestInvalidParameters()
        {
            Assert.Throws<ArgumentNullException>(() => new ConcurrentExclusiveSchedulerPair(null)); // TargetScheduler is null
            Assert.Throws<ArgumentOutOfRangeException>(() => new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 0)); // maxConcurrencyLevel is invalid
            Assert.Throws<ArgumentOutOfRangeException>(() => new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, -2)); // maxConcurrencyLevel is invalid
            Assert.Throws<ArgumentOutOfRangeException>(() => new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, -1, 0)); // maxItemsPerTask is invalid
            Assert.Throws<ArgumentOutOfRangeException>(() => new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, -1, -2)); // maxItemsPerTask is invalid
        }

        /// <summary>
        /// Test to ensure completion task works successfully
        /// </summary>
        [Fact]
        public static void TestCompletionTask()
        {
            // Completion tasks is valid after initialization
            {
                var cesp = new ConcurrentExclusiveSchedulerPair();
                Assert.True(cesp.Completion != null, "CompletionTask should never be null (after initialization)");
                Assert.True(!cesp.Completion.IsCompleted, "CompletionTask should not have completed");
            }

            // Completion task is valid after complete is called
            {
                var cesp = new ConcurrentExclusiveSchedulerPair();
                cesp.Complete();
                Assert.True(cesp.Completion != null, "CompletionTask should never be null (after complete)");
                cesp.Completion.Wait();
            }

            // Complete method may be called multiple times, and CompletionTask still completes
            {
                var cesp = new ConcurrentExclusiveSchedulerPair();
                for (int i = 0; i < 20; i++) cesp.Complete(); // ensure multiple calls to Complete succeed
                Assert.True(cesp.Completion != null, "CompletionTask should never be null (after multiple completes)");
                cesp.Completion.Wait();
            }

            // Can create a bunch of schedulers, do work on them all, complete them all, and they all complete
            {
                var cesps = new ConcurrentExclusiveSchedulerPair[100];
                for (int i = 0; i < cesps.Length; i++)
                {
                    cesps[i] = new ConcurrentExclusiveSchedulerPair();
                }
                for (int i = 0; i < cesps.Length; i++)
                {
                    Action work = () => new ManualResetEvent(false).WaitOne(2);
                    Task.Factory.StartNew(work, CancellationToken.None, TaskCreationOptions.None, cesps[i].ConcurrentScheduler);
                    Task.Factory.StartNew(work, CancellationToken.None, TaskCreationOptions.None, cesps[i].ExclusiveScheduler);
                }
                for (int i = 0; i < cesps.Length; i++)
                {
                    cesps[i].Complete();
                    cesps[i].Completion.Wait();
                }
            }

            // Validate that CESP does not implement IDisposable
            Assert.Null(new ConcurrentExclusiveSchedulerPair() as IDisposable);
        }

        /// <summary>
        /// Ensure that CESPs can be layered on other CESPs.
        /// </summary>
        [Fact]
        public static void TestSchedulerNesting()
        {
            // Create a hierarchical set of scheduler pairs
            var cespParent = new ConcurrentExclusiveSchedulerPair();

            var cespChild1 = new ConcurrentExclusiveSchedulerPair(cespParent.ConcurrentScheduler);
            var cespChild1Child1 = new ConcurrentExclusiveSchedulerPair(cespChild1.ConcurrentScheduler);
            var cespChild1Child2 = new ConcurrentExclusiveSchedulerPair(cespChild1.ExclusiveScheduler);

            var cespChild2 = new ConcurrentExclusiveSchedulerPair(cespParent.ExclusiveScheduler);
            var cespChild2Child1 = new ConcurrentExclusiveSchedulerPair(cespChild2.ConcurrentScheduler);
            var cespChild2Child2 = new ConcurrentExclusiveSchedulerPair(cespChild2.ExclusiveScheduler);

            // These are ordered such that we will complete the child schedulers before we complete their parents. That way
            // we don't complete a parent that's still in use.
            var cesps = new[] {
                cespChild1Child1,
                cespChild1Child2,
                cespChild1,
                cespChild2Child1,
                cespChild2Child2,
                cespChild2,
                cespParent,
            };

            // Get the schedulers from all of the pairs
            List<TaskScheduler> schedulers = new List<TaskScheduler>();
            foreach (var s in cesps)
            {
                schedulers.Add(s.ConcurrentScheduler);
                schedulers.Add(s.ExclusiveScheduler);
            }

            // Keep track of all created tasks
            var tasks = new List<Task>();

            // Queue lots of work to each scheduler
            foreach (var scheduler in schedulers)
            {
                // Create a function that schedules and inlines recursively queued tasks
                Action<int> recursiveWork = null;
                recursiveWork = depth =>
                {
                    if (depth > 0)
                    {
                        Action work = () =>
                        {
                            var sw = new SpinWait();
                            while (!sw.NextSpinWillYield) sw.SpinOnce();
                            recursiveWork(depth - 1);
                        };

                        TaskFactory factory = new TaskFactory(scheduler);
                        Debug.WriteLine(string.Format("Start tasks in scheduler {0}", scheduler.Id));
                        Task t1 = factory.StartNew(work);
                        Task t2 = factory.StartNew(work);
                        Task t3 = factory.StartNew(work);
                        Task.WaitAll(t1, t2, t3);
                    }
                };

                for (int i = 0; i < 2; i++)
                {
                    tasks.Add(Task.Factory.StartNew(() => recursiveWork(2), CancellationToken.None, TaskCreationOptions.None, scheduler));
                }
            }

            // Wait for all tasks to complete, then complete the schedulers
            Task.WaitAll(tasks.ToArray());
            foreach (var cesp in cesps)
            {
                cesp.Complete();
                cesp.Completion.Wait();
            }
        }

        /// <summary>
        /// Ensure that continuations and parent/children which hop between concurrent and exclusive work correctly.
        /// EH
        /// </summary>
        [Theory]
        [InlineData(true)]
        [InlineData(false)]
        public static void TestConcurrentExclusiveChain(bool syncContinuations)
        {
            var scheduler = new TrackingTaskScheduler(Environment.ProcessorCount);
            var cesp = new ConcurrentExclusiveSchedulerPair(scheduler);

            // continuations
            {
                var starter = new Task(() => { });
                var t = starter;
                for (int i = 0; i < 10; i++)
                {
                    t = t.ContinueWith(delegate { }, CancellationToken.None, syncContinuations ? TaskContinuationOptions.ExecuteSynchronously : TaskContinuationOptions.None, cesp.ConcurrentScheduler);
                    t = t.ContinueWith(delegate { }, CancellationToken.None, syncContinuations ? TaskContinuationOptions.ExecuteSynchronously : TaskContinuationOptions.None, cesp.ExclusiveScheduler);
                }
                starter.Start(cesp.ExclusiveScheduler);
                t.Wait();
            }

            // parent/child
            {
                var errorString = "hello faulty world";
                var root = Task.Factory.StartNew(() =>
                {
                    Task.Factory.StartNew(() =>
                    {
                        Task.Factory.StartNew(() =>
                        {
                            Task.Factory.StartNew(() =>
                            {
                                Task.Factory.StartNew(() =>
                                {
                                    Task.Factory.StartNew(() =>
                                    {
                                        Task.Factory.StartNew(() =>
                                        {
                                            throw new InvalidOperationException(errorString);
                                        }, CancellationToken.None, TaskCreationOptions.AttachedToParent, cesp.ExclusiveScheduler).Wait();
                                    }, CancellationToken.None, TaskCreationOptions.AttachedToParent, cesp.ExclusiveScheduler);
                                }, CancellationToken.None, TaskCreationOptions.AttachedToParent, cesp.ConcurrentScheduler);
                            }, CancellationToken.None, TaskCreationOptions.AttachedToParent, cesp.ExclusiveScheduler);
                        }, CancellationToken.None, TaskCreationOptions.AttachedToParent, cesp.ConcurrentScheduler);
                    }, CancellationToken.None, TaskCreationOptions.AttachedToParent, cesp.ExclusiveScheduler);
                }, CancellationToken.None, TaskCreationOptions.None, cesp.ConcurrentScheduler);

                ((IAsyncResult)root).AsyncWaitHandle.WaitOne();
                Assert.True(root.IsFaulted, "Root should have been faulted by child's error");
                var ae = root.Exception.Flatten();
                Assert.True(ae.InnerException is InvalidOperationException && ae.InnerException.Message == errorString,
                    "Child's exception should have propagated to the root.");
            }
        }
        #endregion

        #region Helper Methods

        /// <summary>
        /// Schedules <paramref name="work"/> <paramref name="taskCount"/> times on <paramref name="scheduler"/> using the
        /// task-creation API named by <paramref name="apiType"/> (case-sensitive).
        /// </summary>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="apiType"/> is not one of the supported API names.</exception>
        public static void SelectAPI2Target(string apiType, int taskCount, TaskScheduler scheduler, Action work)
        {
            switch (apiType)
            {
                case "StartNew":
                    for (int i = 0; i < taskCount; i++) new TaskFactory(scheduler).StartNew(() => { work(); });
                    break;
                case "Start":
                    for (int i = 0; i < taskCount; i++) new Task(() => { work(); }).Start(scheduler);
                    break;
                case "ContinueWith":
                    for (int i = 0; i < taskCount; i++)
                    {
                        new TaskFactory().StartNew(() => { }).ContinueWith((t) => { work(); }, scheduler);
                    }
                    break;
                case "FromAsync":
                    for (int i = 0; i < taskCount; i++)
                    {
                        new TaskFactory(scheduler).FromAsync(Task.Factory.StartNew(() => { }), (iar) => { work(); });
                    }
                    break;
                case "ContinueWhenAll":
                    for (int i = 0; i < taskCount; i++)
                    {
                        new TaskFactory(scheduler).ContinueWhenAll(new Task[] { Task.Factory.StartNew(() => { }) }, (t) => { work(); });
                    }
                    break;
                case "ContinueWhenAny":
                    for (int i = 0; i < taskCount; i++)
                    {
                        new TaskFactory(scheduler).ContinueWhenAny(new Task[] { Task.Factory.StartNew(() => { }) }, (t) => { work(); });
                    }
                    break;
                default:
                    throw new ArgumentOutOfRangeException(nameof(apiType), apiType,
                        String.Format("Api name specified {0} is invalid or is of incorrect case", apiType));
            }
        }

        /// <summary>
        /// Used to provide parameters for the TestIntegration test
        /// </summary>
        public static IEnumerable<object[]> ApiType
        {
            get
            {
                List<Object[]> values = new List<object[]>();
                foreach (String apiType in new String[] {
                    "StartNew", "Start", "ContinueWith", /* FromAsync: Not supported in .NET Native */ "ContinueWhenAll", "ContinueWhenAny" })
                {
                    foreach (bool useReader in new bool[] { true, false })
                    {
                        values.Add(new Object[] { apiType, useReader });
                    }
                }

                return values;
            }
        }

        #endregion
    }
}