1 //
2 // CompletionTest.cs
3 //
4 // Author:
5 //       Jérémie "garuma" Laval <jeremie.laval@gmail.com>
6 //       Petr Onderka <gsvick@gmail.com>
7 //
8 // Copyright (c) 2011 Jérémie "garuma" Laval
9 // Copyright (c) 2012 Petr Onderka
10 //
11 // Permission is hereby granted, free of charge, to any person obtaining a copy
12 // of this software and associated documentation files (the "Software"), to deal
13 // in the Software without restriction, including without limitation the rights
14 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 // copies of the Software, and to permit persons to whom the Software is
16 // furnished to do so, subject to the following conditions:
17 //
18 // The above copyright notice and this permission notice shall be included in
19 // all copies or substantial portions of the Software.
20 //
21 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
27 // THE SOFTWARE.
28 
29 using System;
30 using System.Collections.Generic;
31 using System.Threading;
32 using System.Threading.Tasks;
33 using System.Threading.Tasks.Dataflow;
34 using NUnit.Framework;
35 using System.Linq;
36 
37 namespace MonoTests.System.Threading.Tasks.Dataflow {
38 	[TestFixture]
39 	public class CompletionTest {
40 		[Test]
public void WithElementsStillLingering ()
{
	// A buffer that was completed while it still holds an item is expected
	// to keep its Completion task pending until that item is received.
	const int item = 42;
	var buffer = new BufferBlock<int> ();

	bool accepted = buffer.Post (item);
	Assert.IsTrue (accepted);
	buffer.Complete ();

	// the item is still queued, so the completion must stay pending
	bool completedEarly = buffer.Completion.Wait (100);
	Assert.IsFalse (completedEarly);
	Assert.IsFalse (buffer.Completion.IsCompleted);
	Assert.AreEqual (TaskStatus.WaitingForActivation, buffer.Completion.Status);

	// take the lingering item out
	int received = buffer.Receive ();
	Assert.AreEqual (item, received);

	// with the buffer drained the completion has to finish successfully
	bool completed = buffer.Completion.Wait (1000);
	Assert.IsTrue (completed);
	Assert.IsTrue (buffer.Completion.IsCompleted);
	Assert.AreEqual (TaskStatus.RanToCompletion, buffer.Completion.Status);
}
57 
58 		[Test]
public void WithElementsStillLingeringButFaulted ()
{
	// Faulting a buffer that still holds an item is expected to finish
	// the Completion task as Faulted without the item being received.
	var buffer = new BufferBlock<int> ();
	bool accepted = buffer.Post (42);
	Assert.IsTrue (accepted);

	// Fault () is invoked through the IDataflowBlock interface
	IDataflowBlock asDataflowBlock = buffer;
	asDataflowBlock.Fault (new Exception ());

	// waiting on a faulted completion throws instead of returning
	AssertEx.Throws<AggregateException> (() => buffer.Completion.Wait (1000));
	Assert.IsTrue (buffer.Completion.IsCompleted);
	Assert.AreEqual (TaskStatus.Faulted, buffer.Completion.Status);

	// the faulted block must refuse further items
	bool acceptedAfterFault = buffer.Post (43);
	Assert.IsFalse (acceptedAfterFault);
}
70 
71 		[Test]
public void WithElementsStillLingeringButCancelled ()
{
	// Cancelling the token of a buffer that still holds an item is expected
	// to finish the Completion task as Canceled without the item being received.
	var cancellation = new CancellationTokenSource ();
	var options = new DataflowBlockOptions { CancellationToken = cancellation.Token };
	var buffer = new BufferBlock<int> (options);

	bool accepted = buffer.Post (42);
	Assert.IsTrue (accepted);
	cancellation.Cancel ();

	// the wait has to throw, wrapping exactly one TaskCanceledException
	var aggregate = AssertEx.Throws<AggregateException> (
		() => buffer.Completion.Wait (1000));
	Assert.AreEqual (1, aggregate.InnerExceptions.Count);
	Assert.AreEqual (typeof(TaskCanceledException), aggregate.InnerException.GetType ());

	Assert.IsTrue (buffer.Completion.IsCompleted);
	Assert.AreEqual (TaskStatus.Canceled, buffer.Completion.Status);

	// the cancelled block must refuse further items
	bool acceptedAfterCancel = buffer.Post (43);
	Assert.IsFalse (acceptedAfterCancel);
}
89 
static IEnumerable<Tuple<IDataflowBlock, ITargetBlock<T>>>
	GetJoinBlocksWithTargets<T> ()
{
	// Lazily yields (block, first target of that block) pairs for the two-
	// and three-way JoinBlock and BatchedJoinBlock (batch size 2).
	// The type arguments are spelled out so that every tuple is typed by
	// the interfaces rather than by the concrete block classes.
	var join2 = new JoinBlock<T, T> ();
	yield return Tuple.Create<IDataflowBlock, ITargetBlock<T>> (join2, join2.Target1);

	var join3 = new JoinBlock<T, T, T> ();
	yield return Tuple.Create<IDataflowBlock, ITargetBlock<T>> (join3, join3.Target1);

	var batchedJoin2 = new BatchedJoinBlock<T, T> (2);
	yield return Tuple.Create<IDataflowBlock, ITargetBlock<T>> (
		batchedJoin2, batchedJoin2.Target1);

	var batchedJoin3 = new BatchedJoinBlock<T, T, T> (2);
	yield return Tuple.Create<IDataflowBlock, ITargetBlock<T>> (
		batchedJoin3, batchedJoin3.Target1);
}
105 
106 		[Test]
public void JoinTargetCompletitionTest ()
{
	// Part one: completing the target itself.
	foreach (var pair in GetJoinBlocksWithTargets<int> ()) {
		ITargetBlock<int> target = pair.Item2;

		// reading Completion of a join target is expected to throw
		AssertEx.Throws<NotSupportedException> (
			() => { var ignored = target.Completion; });

		Assert.IsTrue (target.Post (1));
		target.Complete ();
		// a completed target must refuse further items
		Assert.IsFalse (target.Post (2));
	}

	// Part two: completing the block that owns the target.
	foreach (var pair in GetJoinBlocksWithTargets<int> ()) {
		IDataflowBlock owner = pair.Item1;
		ITargetBlock<int> target = pair.Item2;

		Assert.IsTrue (target.Post (1));
		owner.Complete ();
		// the target of a completed block must refuse further items
		Assert.IsFalse (target.Post (2));
	}
}
123 
124 		[Test]
public void MultipleFaultsTest ()
{
	// When Fault () is called twice on an idle block, only the first
	// exception is expected to end up in the Completion task.
	IDataflowBlock block = new BufferBlock<int> ();

	block.Fault (new Exception ("1"));
	// second exception should be ignored
	block.Fault (new Exception ("2"));

	// Wait on the completion itself (for up to a second) instead of sleeping
	// a fixed 100 ms and hoping the block has faulted by then; waiting on
	// a faulted task throws, which is what the helper checks for.
	AssertEx.Throws<AggregateException> (() => block.Completion.Wait (1000));

	Assert.IsTrue (block.Completion.IsFaulted);
	var exception = block.Completion.Exception;
	Assert.IsNotNull (exception);
	Assert.AreEqual (1, exception.InnerExceptions.Count);
	Assert.AreEqual ("1", exception.InnerException.Message);
}
141 
142 		[Test]
public void MultipleFaultsWhileExecutingTest ()
{
	// Two faults arrive while an action is still running: the block must
	// not complete until the action returns, and afterwards only the first
	// fault is expected in the Completion task.
	var started = new ManualResetEventSlim ();
	var evt = new ManualResetEventSlim ();

	var actionBlock = new ActionBlock<int> (
		_ =>
		{
			// tell the test that the action is really executing
			started.Set ();
			evt.Wait ();
		});
	IDataflowBlock dataflowBlock = actionBlock;

	actionBlock.Post (1);
	// wait for the start signal rather than sleeping and hoping
	Assert.IsTrue (started.Wait (1000), "the action did not start in time");

	dataflowBlock.Fault (new Exception ("1"));
	// second exception should still be ignored
	dataflowBlock.Fault (new Exception ("2"));

	// This delay is kept on purpose: it backs a negative check by giving
	// a block that wrongly completes early the time to do so.
	Thread.Sleep (100);

	Assert.IsFalse (actionBlock.Completion.IsCompleted);

	// release the action
	evt.Set ();

	// bounded wait for the faulted completion instead of a fixed sleep
	AssertEx.Throws<AggregateException> (() => actionBlock.Completion.Wait (1000));

	Assert.IsTrue (actionBlock.Completion.IsFaulted);
	var exception = actionBlock.Completion.Exception;
	Assert.IsNotNull (exception);
	Assert.AreEqual (1, exception.InnerExceptions.Count);
	Assert.AreEqual ("1", exception.InnerException.Message);
}
171 
172 		[Test]
public void MultipleExceptionsTest ()
{
	// Two actions that throw at the same time are expected to contribute
	// one inner exception each to the Completion task.

	// strictly speaking, the actions are not guaranteed to run in parallel,
	// but there is no way to test this otherwise
	var parallelOptions =
		new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = -1 };

	// two participants: neither action gets to throw before both have started
	var rendezvous = new Barrier (2);

	var failingBlock = new ActionBlock<int> (
		_ =>
		{
			rendezvous.SignalAndWait ();
			throw new Exception ();
		},
		parallelOptions);

	failingBlock.Post (1);
	failingBlock.Post (2);

	var aggregate = AssertEx.Throws<AggregateException> (
		() => failingBlock.Completion.Wait (1000));

	Assert.AreEqual (2, aggregate.InnerExceptions.Count);
}
196 
197 		[Test]
public void ExceptionAndFaultTest ()
{
	// An exception thrown by the action, followed by an explicit fault:
	// the Completion task is expected to carry only the action's exception.
	var block = new ActionBlock<int> (
		_ => { throw new Exception ("action"); },
		new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = -1 });

	block.Post (1);

	// Wait until the action's exception has really faulted the block.
	// A fixed sleep here left the order of "exception" and "fault" to the
	// scheduler, and the assertions below depend on that order.
	AssertEx.Throws<AggregateException> (() => block.Completion.Wait (1000));

	// this fault arrives after the fact and should be ignored
	((IDataflowBlock)block).Fault (new Exception ("fault"));

	var exception =
		AssertEx.Throws<AggregateException> (() => block.Completion.Wait (1000));

	Assert.AreEqual (1, exception.InnerExceptions.Count);
	Assert.AreEqual ("action", exception.InnerException.Message);
}
216 
217 		[Test]
public void FaultAndExceptionTest ()
{
	// Two faults arrive while the action is running and the action then
	// throws as well: the Completion task is expected to carry the first
	// fault followed by the action's exception.
	var started = new ManualResetEventSlim ();
	var evt = new ManualResetEventSlim ();
	var block = new ActionBlock<int> (
		_ =>
		{
			// tell the test that the action is really executing
			started.Set ();
			evt.Wait ();
			throw new Exception ("action");
		},
		new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = -1 });

	block.Post (1);

	// wait for the start signal rather than sleeping and hoping
	Assert.IsTrue (started.Wait (1000), "the action did not start in time");

	((IDataflowBlock)block).Fault (new Exception ("fault1"));
	((IDataflowBlock)block).Fault (new Exception ("fault2"));

	// release the action so that it throws
	evt.Set ();

	// same one-second budget as the other tests in this fixture;
	// the wait returns as soon as the block completes
	var exception =
		AssertEx.Throws<AggregateException> (() => block.Completion.Wait (1000));

	Assert.AreEqual (2, exception.InnerExceptions.Count);
	CollectionAssert.AreEqual (new[] { "fault1", "action" },
		exception.InnerExceptions.Select (e => e.Message).ToArray ());
}
245 
246 		[Test]
public void ExceptionAndCancelTest ()
{
	// An exception thrown by the action, followed by a cancellation:
	// the Completion task is expected to carry only the action's exception.
	var tokenSource = new CancellationTokenSource ();
	var block = new ActionBlock<int> (
		_ => { throw new Exception ("action"); },
		new ExecutionDataflowBlockOptions
		{ MaxDegreeOfParallelism = -1, CancellationToken = tokenSource.Token });

	block.Post (1);

	// Wait until the action's exception has really faulted the block.
	// A fixed sleep here left the order of "exception" and "cancel" to the
	// scheduler, and the assertions below depend on that order.
	AssertEx.Throws<AggregateException> (() => block.Completion.Wait (1000));

	// this cancellation arrives after the fact and should change nothing
	tokenSource.Cancel ();

	var exception =
		AssertEx.Throws<AggregateException> (() => block.Completion.Wait (1000));

	Assert.AreEqual (1, exception.InnerExceptions.Count);
	Assert.AreEqual ("action", exception.InnerException.Message);
}
267 
268 		[Test]
public void CancelAndExceptionTest ()
{
	// The token is cancelled while the action is running and the action
	// then throws: the Completion task is expected to carry only the
	// action's exception.
	var tokenSource = new CancellationTokenSource ();
	var started = new ManualResetEventSlim ();
	var evt = new ManualResetEventSlim ();

	var block = new ActionBlock<int> (
		_ =>
		{
			// tell the test that the action is really executing
			started.Set ();
			evt.Wait ();
			throw new Exception ("action");
		},
		new ExecutionDataflowBlockOptions
		{ MaxDegreeOfParallelism = -1, CancellationToken = tokenSource.Token });

	block.Post (1);

	// Wait for the start signal rather than sleeping and hoping: if the
	// cancellation came first, the action might never run at all.
	Assert.IsTrue (started.Wait (1000), "the action did not start in time");

	tokenSource.Cancel ();

	// release the action so that it throws
	evt.Set ();

	var exception =
		AssertEx.Throws<AggregateException> (() => block.Completion.Wait (1000));

	Assert.AreEqual (1, exception.InnerExceptions.Count);
	Assert.AreEqual ("action", exception.InnerException.Message);
}
297 
298 		[Test]
public void CancelAndFaultTest ()
{
	// A cancellation followed by an explicit fault on an idle block:
	// the Completion task is expected to stay cancelled.
	var tokenSource = new CancellationTokenSource ();
	var block = new BufferBlock<int> (
		new DataflowBlockOptions { CancellationToken = tokenSource.Token });

	tokenSource.Cancel ();

	// Wait until the cancellation has really completed the block.
	// A fixed sleep here left the order of "cancel" and "fault" to the
	// scheduler, and the assertions below depend on that order.
	AssertEx.Throws<AggregateException> (() => block.Completion.Wait (1000));

	// this fault arrives after the fact and should be ignored
	((IDataflowBlock)block).Fault (new Exception ("fault"));

	var exception =
		AssertEx.Throws<AggregateException> (() => block.Completion.Wait (1000));

	Assert.AreEqual (1, exception.InnerExceptions.Count);
	Assert.AreEqual (typeof(TaskCanceledException),
		exception.InnerException.GetType ());
}
318 
319 		[Test]
public void CancelAndFaultWhileExecutingTest ()
{
	// While an action is running the token is cancelled first and the block
	// is faulted second: once the action returns, the Completion task is
	// expected to be cancelled and the fault ignored.
	var tokenSource = new CancellationTokenSource ();
	var started = new ManualResetEventSlim ();
	var evt = new ManualResetEventSlim ();
	var block = new ActionBlock<int> (
		_ =>
		{
			// tell the test that the action is really executing
			started.Set ();
			evt.Wait ();
		},
		new ExecutionDataflowBlockOptions
		{ MaxDegreeOfParallelism = -1, CancellationToken = tokenSource.Token });

	block.Post (1);

	// wait for the start signal rather than sleeping and hoping
	Assert.IsTrue (started.Wait (1000), "the action did not start in time");

	tokenSource.Cancel ();

	// Kept from the original: presumably gives the block time to take note
	// of the cancellation before the fault arrives. Nothing visible in this
	// test can be waited on instead, since the block cannot complete while
	// the action is blocked.
	Thread.Sleep (100);

	((IDataflowBlock)block).Fault (new Exception ("fault"));

	// release the action
	evt.Set ();

	// the bounded wait below already covers the time the block needs to
	// complete, so no extra sleep is required before it
	var exception =
		AssertEx.Throws<AggregateException> (() => block.Completion.Wait (1000));

	Assert.AreEqual (1, exception.InnerExceptions.Count);
	Assert.AreEqual (typeof(TaskCanceledException),
		exception.InnerException.GetType ());
}
350 
351 		[Test]
public void FaultAndCancelWhileExecutingTest ()
{
	// While an action is running the block is faulted first and the token
	// is cancelled second: once the action returns, the Completion task is
	// expected to carry the fault and not to be cancelled.
	var tokenSource = new CancellationTokenSource ();
	var started = new ManualResetEventSlim ();
	var evt = new ManualResetEventSlim ();
	var block = new ActionBlock<int> (
		_ =>
		{
			// tell the test that the action is really executing
			started.Set ();
			evt.Wait ();
		},
		new ExecutionDataflowBlockOptions
		{ MaxDegreeOfParallelism = -1, CancellationToken = tokenSource.Token });

	block.Post (1);

	// wait for the start signal rather than sleeping and hoping
	Assert.IsTrue (started.Wait (1000), "the action did not start in time");

	((IDataflowBlock)block).Fault (new Exception ("fault"));

	// Kept from the original: presumably gives the block time to take note
	// of the fault before the cancellation arrives. Nothing visible in this
	// test can be waited on instead, since the block cannot complete while
	// the action is blocked.
	Thread.Sleep (100);

	tokenSource.Cancel ();

	// release the action
	evt.Set ();

	// the bounded wait below already covers the time the block needs to
	// complete, so no extra sleep is required before it
	var exception =
		AssertEx.Throws<AggregateException> (() => block.Completion.Wait (1000));

	Assert.AreEqual (1, exception.InnerExceptions.Count);
	Assert.AreEqual ("fault", exception.InnerException.Message);
}
381 	}
382 }
383