1 // Licensed to the .NET Foundation under one or more agreements. 2 // The .NET Foundation licenses this file to you under the MIT license. 3 // See the LICENSE file in the project root for more information. 4 5 using System.Collections.Generic; 6 using System.Threading; 7 using System.Threading.Tasks; 8 using Xunit; 9 10 namespace System.Collections.Concurrent.Tests 11 { 12 public class BlockingCollectionCancellationTests 13 { 14 [Fact] InternalCancellation_CompleteAdding_Negative()15 public static void InternalCancellation_CompleteAdding_Negative() 16 { 17 BlockingCollection<int> coll1 = new BlockingCollection<int>(); 18 19 Task.Run(() => coll1.CompleteAdding()); 20 //call Take.. it should wake up with an OCE. when CompleteAdding() is called. 21 Assert.Throws<InvalidOperationException>(() => coll1.Take()); 22 // "InternalCancellation_WakingUpTake: an IOE should be thrown if CompleteAdding occurs during blocking Take()"); 23 Assert.Throws<InvalidOperationException>(() => coll1.Add(1)); 24 // "InternalCancellation_WakingUpAdd: an InvalidOpEx should be thrown if CompleteAdding occurs during blocking Add()"); 25 Assert.Throws<InvalidOperationException>(() => coll1.TryAdd(1, 1000000)); //an indefinite wait to add.. 1000 seconds. 26 // "InternalCancellation_WakingUpTryAdd: an InvalidOpEx should be thrown if CompleteAdding occurs during blocking Add()"); 27 } 28 29 //This tests that Take/TryTake wake up correctly if CompleteAdding() is called while waiting 30 [Fact] InternalCancellation_WakingUp()31 public static void InternalCancellation_WakingUp() 32 { 33 for (int test = 0; test < 2; test++) 34 { 35 BlockingCollection<int> coll1 = new BlockingCollection<int>(1); 36 coll1.Add(1); //fills the collection. 37 Assert.False(coll1.IsAddingCompleted, 38 "InternalCancellation_WakingUp: At this point CompleteAdding should not have occurred."); 39 40 // This is racy on what we want to test, in that it's possible this queued work could execute 41 // so quickly that CompleteAdding happens before the tested method gets invoked, but the test 42 // should still pass in such cases, we're just testing something other than we'd planned. 43 Task t = Task.Run(() => coll1.CompleteAdding()); 44 45 // Try different methods that should wake up once CompleteAdding has been called 46 int item = coll1.Take(); // remove the existing item in the collection 47 switch (test) 48 { 49 case 0: 50 Assert.Throws<InvalidOperationException>(() => coll1.Take()); 51 break; 52 case 1: 53 Assert.False(coll1.TryTake(out item)); 54 break; 55 } 56 57 t.Wait(); 58 59 Assert.True(coll1.IsAddingCompleted, 60 "InternalCancellation_WakingUp: At this point CompleteAdding should have occurred."); 61 } 62 } 63 64 [Fact] ExternalCancel_Negative()65 public static void ExternalCancel_Negative() 66 { 67 BlockingCollection<int> bc = new BlockingCollection<int>(); //empty collection. 68 69 CancellationTokenSource cs = new CancellationTokenSource(); 70 Task.Run(() => cs.Cancel()); 71 72 int item; 73 EnsureOperationCanceledExceptionThrown( 74 () => bc.Take(cs.Token), cs.Token, 75 "ExternalCancel_Take: The operation should wake up via token cancellation."); 76 EnsureOperationCanceledExceptionThrown( 77 () => bc.TryTake(out item, 100000, cs.Token), cs.Token, 78 "ExternalCancel_TryTake: The operation should wake up via token cancellation."); 79 EnsureOperationCanceledExceptionThrown( 80 () => bc.Add(1, cs.Token), cs.Token, 81 "ExternalCancel_Add: The operation should wake up via token cancellation."); 82 EnsureOperationCanceledExceptionThrown( 83 () => bc.TryAdd(1, 100000, cs.Token), // a long timeout. 84 cs.Token, 85 "ExternalCancel_TryAdd: The operation should wake up via token cancellation."); 86 87 BlockingCollection<int> bc1 = new BlockingCollection<int>(1); 88 BlockingCollection<int> bc2 = new BlockingCollection<int>(1); 89 bc1.Add(1); //fill the bc. 90 bc2.Add(1); //fill the bc. 91 EnsureOperationCanceledExceptionThrown( 92 () => BlockingCollection<int>.AddToAny(new[] { bc1, bc2 }, 1, cs.Token), 93 cs.Token, 94 "ExternalCancel_AddToAny: The operation should wake up via token cancellation."); 95 EnsureOperationCanceledExceptionThrown( 96 () => BlockingCollection<int>.TryAddToAny(new[] { bc1, bc2 }, 1, 10000, cs.Token), 97 cs.Token, 98 "ExternalCancel_AddToAny: The operation should wake up via token cancellation."); 99 100 IEnumerable<int> enumerable = bc.GetConsumingEnumerable(cs.Token); 101 EnsureOperationCanceledExceptionThrown( 102 () => enumerable.GetEnumerator().MoveNext(), 103 cs.Token, "ExternalCancel_GetConsumingEnumerable: The operation should wake up via token cancellation."); 104 } 105 106 [Fact] ExternalCancel_AddToAny()107 public static void ExternalCancel_AddToAny() 108 { 109 for (int test = 0; test < 3; test++) 110 { 111 BlockingCollection<int> bc1 = new BlockingCollection<int>(1); 112 BlockingCollection<int> bc2 = new BlockingCollection<int>(1); 113 bc1.Add(1); //fill the bc. 114 bc2.Add(1); //fill the bc. 115 116 // This may or may not cancel before {Try}AddToAny executes, but either way the test should pass. 117 // A delay could be used to attempt to force the right timing, but not for an inner loop test. 118 CancellationTokenSource cs = new CancellationTokenSource(); 119 Task.Run(() => cs.Cancel()); 120 Assert.Throws<OperationCanceledException>(() => 121 { 122 switch (test) 123 { 124 case 0: 125 BlockingCollection<int>.AddToAny(new[] { bc1, bc2 }, 42, cs.Token); 126 break; 127 case 1: 128 BlockingCollection<int>.TryAddToAny(new[] { bc1, bc2 }, 42, Timeout.Infinite, cs.Token); 129 break; 130 case 2: 131 BlockingCollection<int>.TryAddToAny(new[] { bc1, bc2 }, 42, (int)TimeSpan.FromDays(1).TotalMilliseconds, cs.Token); 132 break; 133 } 134 }); 135 Assert.True(cs.IsCancellationRequested); 136 } 137 } 138 139 [Fact] ExternalCancel_GetConsumingEnumerable()140 public static void ExternalCancel_GetConsumingEnumerable() 141 { 142 BlockingCollection<int> bc = new BlockingCollection<int>(); 143 bc.Add(1); 144 bc.Add(2); 145 146 var cs = new CancellationTokenSource(); 147 int total = 0; 148 Assert.Throws<OperationCanceledException>(() => 149 { 150 foreach (int item in bc.GetConsumingEnumerable(cs.Token)) 151 { 152 total += item; 153 cs.Cancel(); 154 } 155 }); 156 Assert.True(cs.IsCancellationRequested); 157 Assert.Equal(expected: 1, actual: total); 158 } 159 160 #region Helper Methods 161 EnsureOperationCanceledExceptionThrown(Action action, CancellationToken token, string message)162 public static void EnsureOperationCanceledExceptionThrown(Action action, CancellationToken token, string message) 163 { 164 OperationCanceledException operationCanceledEx = 165 Assert.Throws<OperationCanceledException>(action); // "BlockingCollectionCancellationTests: OperationCanceledException not thrown."); 166 Assert.Equal(token, operationCanceledEx.CancellationToken); 167 } 168 #endregion 169 } 170 } 171