1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
4 
5 using System.Collections.Generic;
6 using System.Threading;
7 using System.Threading.Tasks;
8 using Xunit;
9 
10 namespace System.Collections.Concurrent.Tests
11 {
12     public class BlockingCollectionCancellationTests
13     {
14         [Fact]
InternalCancellation_CompleteAdding_Negative()15         public static void InternalCancellation_CompleteAdding_Negative()
16         {
17             BlockingCollection<int> coll1 = new BlockingCollection<int>();
18 
19             Task.Run(() => coll1.CompleteAdding());
20             //call Take.. it should wake up with an OCE. when CompleteAdding() is called.
21             Assert.Throws<InvalidOperationException>(() => coll1.Take());
22             // "InternalCancellation_WakingUpTake:  an IOE should be thrown if CompleteAdding occurs during blocking Take()");
23             Assert.Throws<InvalidOperationException>(() => coll1.Add(1));
24             // "InternalCancellation_WakingUpAdd:  an InvalidOpEx should be thrown if CompleteAdding occurs during blocking Add()");
25             Assert.Throws<InvalidOperationException>(() => coll1.TryAdd(1, 1000000));  //an indefinite wait to add.. 1000 seconds.
26             // "InternalCancellation_WakingUpTryAdd:  an InvalidOpEx should be thrown if CompleteAdding occurs during blocking Add()");
27         }
28 
29         //This tests that Take/TryTake wake up correctly if CompleteAdding() is called while waiting
30         [Fact]
InternalCancellation_WakingUp()31         public static void InternalCancellation_WakingUp()
32         {
33             for (int test = 0; test < 2; test++)
34             {
35                 BlockingCollection<int> coll1 = new BlockingCollection<int>(1);
36                 coll1.Add(1); //fills the collection.
37                 Assert.False(coll1.IsAddingCompleted,
38                    "InternalCancellation_WakingUp:  At this point CompleteAdding should not have occurred.");
39 
40                 // This is racy on what we want to test, in that it's possible this queued work could execute
41                 // so quickly that CompleteAdding happens before the tested method gets invoked, but the test
42                 // should still pass in such cases, we're just testing something other than we'd planned.
43                 Task t = Task.Run(() => coll1.CompleteAdding());
44 
45                 // Try different methods that should wake up once CompleteAdding has been called
46                 int item = coll1.Take(); // remove the existing item in the collection
47                 switch (test)
48                 {
49                     case 0:
50                         Assert.Throws<InvalidOperationException>(() => coll1.Take());
51                         break;
52                     case 1:
53                         Assert.False(coll1.TryTake(out item));
54                         break;
55                 }
56 
57                 t.Wait();
58 
59                 Assert.True(coll1.IsAddingCompleted,
60                    "InternalCancellation_WakingUp:  At this point CompleteAdding should have occurred.");
61             }
62         }
63 
64         [Fact]
ExternalCancel_Negative()65         public static void ExternalCancel_Negative()
66         {
67             BlockingCollection<int> bc = new BlockingCollection<int>(); //empty collection.
68 
69             CancellationTokenSource cs = new CancellationTokenSource();
70             Task.Run(() => cs.Cancel());
71 
72             int item;
73             EnsureOperationCanceledExceptionThrown(
74                 () => bc.Take(cs.Token), cs.Token,
75                 "ExternalCancel_Take:  The operation should wake up via token cancellation.");
76             EnsureOperationCanceledExceptionThrown(
77                () => bc.TryTake(out item, 100000, cs.Token), cs.Token,
78                 "ExternalCancel_TryTake:  The operation should wake up via token cancellation.");
79             EnsureOperationCanceledExceptionThrown(
80                 () => bc.Add(1, cs.Token), cs.Token,
81                 "ExternalCancel_Add:  The operation should wake up via token cancellation.");
82             EnsureOperationCanceledExceptionThrown(
83                 () => bc.TryAdd(1, 100000, cs.Token), // a long timeout.
84                 cs.Token,
85                 "ExternalCancel_TryAdd:  The operation should wake up via token cancellation.");
86 
87             BlockingCollection<int> bc1 = new BlockingCollection<int>(1);
88             BlockingCollection<int> bc2 = new BlockingCollection<int>(1);
89             bc1.Add(1); //fill the bc.
90             bc2.Add(1); //fill the bc.
91             EnsureOperationCanceledExceptionThrown(
92                 () => BlockingCollection<int>.AddToAny(new[] { bc1, bc2 }, 1, cs.Token),
93                 cs.Token,
94                 "ExternalCancel_AddToAny:  The operation should wake up via token cancellation.");
95             EnsureOperationCanceledExceptionThrown(
96                () => BlockingCollection<int>.TryAddToAny(new[] { bc1, bc2 }, 1, 10000, cs.Token),
97                cs.Token,
98                "ExternalCancel_AddToAny:  The operation should wake up via token cancellation.");
99 
100             IEnumerable<int> enumerable = bc.GetConsumingEnumerable(cs.Token);
101             EnsureOperationCanceledExceptionThrown(
102                () => enumerable.GetEnumerator().MoveNext(),
103                cs.Token, "ExternalCancel_GetConsumingEnumerable:  The operation should wake up via token cancellation.");
104         }
105 
106         [Fact]
ExternalCancel_AddToAny()107         public static void ExternalCancel_AddToAny()
108         {
109             for (int test = 0; test < 3; test++)
110             {
111                 BlockingCollection<int> bc1 = new BlockingCollection<int>(1);
112                 BlockingCollection<int> bc2 = new BlockingCollection<int>(1);
113                 bc1.Add(1); //fill the bc.
114                 bc2.Add(1); //fill the bc.
115 
116                 // This may or may not cancel before {Try}AddToAny executes, but either way the test should pass.
117                 // A delay could be used to attempt to force the right timing, but not for an inner loop test.
118                 CancellationTokenSource cs = new CancellationTokenSource();
119                 Task.Run(() => cs.Cancel());
120                 Assert.Throws<OperationCanceledException>(() =>
121                 {
122                     switch (test)
123                     {
124                         case 0:
125                             BlockingCollection<int>.AddToAny(new[] { bc1, bc2 }, 42, cs.Token);
126                             break;
127                         case 1:
128                             BlockingCollection<int>.TryAddToAny(new[] { bc1, bc2 }, 42, Timeout.Infinite, cs.Token);
129                             break;
130                         case 2:
131                             BlockingCollection<int>.TryAddToAny(new[] { bc1, bc2 }, 42, (int)TimeSpan.FromDays(1).TotalMilliseconds, cs.Token);
132                             break;
133                     }
134                 });
135                 Assert.True(cs.IsCancellationRequested);
136             }
137         }
138 
139         [Fact]
ExternalCancel_GetConsumingEnumerable()140         public static void ExternalCancel_GetConsumingEnumerable()
141         {
142             BlockingCollection<int> bc = new BlockingCollection<int>();
143             bc.Add(1);
144             bc.Add(2);
145 
146             var cs = new CancellationTokenSource();
147             int total = 0;
148             Assert.Throws<OperationCanceledException>(() =>
149             {
150                 foreach (int item in bc.GetConsumingEnumerable(cs.Token))
151                 {
152                     total += item;
153                     cs.Cancel();
154                 }
155             });
156             Assert.True(cs.IsCancellationRequested);
157             Assert.Equal(expected: 1, actual: total);
158         }
159 
160         #region Helper Methods
161 
EnsureOperationCanceledExceptionThrown(Action action, CancellationToken token, string message)162         public static void EnsureOperationCanceledExceptionThrown(Action action, CancellationToken token, string message)
163         {
164             OperationCanceledException operationCanceledEx =
165                 Assert.Throws<OperationCanceledException>(action); // "BlockingCollectionCancellationTests: OperationCanceledException not thrown.");
166             Assert.Equal(token, operationCanceledEx.CancellationToken);
167         }
168         #endregion
169     }
170 }
171