//
// CompletionTest.cs
//
// Author:
//       Jérémie "garuma" Laval <jeremie.laval@gmail.com>
//       Petr Onderka <gsvick@gmail.com>
//
// Copyright (c) 2011 Jérémie "garuma" Laval
// Copyright (c) 2012 Petr Onderka
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using NUnit.Framework;
using System.Linq;

namespace MonoTests.System.Threading.Tasks.Dataflow {
	// Tests for the Completion task of dataflow blocks: when it completes,
	// and which status and exceptions it reports after combinations of
	// Complete, Fault, cancellation and exceptions thrown by block actions.
	[TestFixture]
	public class CompletionTest {
		// Upper bound, in milliseconds, for waiting on a completion task
		// that is expected to reach a final state.
		const int CompletionTimeout = 1000;

		// A completed BufferBlock that still holds an item must not finish
		// its Completion task until that item has been received.
		[Test]
		public void WithElementsStillLingering ()
		{
			var block = new BufferBlock<int> ();
			Assert.IsTrue (block.Post (42));
			block.Complete ();

			Assert.IsFalse (block.Completion.Wait (100));
			Assert.IsFalse (block.Completion.IsCompleted);
			Assert.AreEqual (TaskStatus.WaitingForActivation, block.Completion.Status);

			Assert.AreEqual (42, block.Receive ());

			Assert.IsTrue (block.Completion.Wait (CompletionTimeout));
			Assert.IsTrue (block.Completion.IsCompleted);
			Assert.AreEqual (TaskStatus.RanToCompletion, block.Completion.Status);
		}

		// Faulting a BufferBlock that still holds an item must fault
		// Completion without the item being received, and later posts
		// must be refused.
		[Test]
		public void WithElementsStillLingeringButFaulted ()
		{
			var block = new BufferBlock<int> ();
			Assert.IsTrue (block.Post (42));
			((IDataflowBlock)block).Fault (new Exception ());

			AssertEx.Throws<AggregateException> (
				() => block.Completion.Wait (CompletionTimeout));
			Assert.IsTrue (block.Completion.IsCompleted);
			Assert.AreEqual (TaskStatus.Faulted, block.Completion.Status);
			Assert.IsFalse (block.Post (43));
		}

		// Cancelling the token of a BufferBlock that still holds an item
		// must cancel Completion (surfacing a single TaskCanceledException
		// from Wait), and later posts must be refused.
		[Test]
		public void WithElementsStillLingeringButCancelled ()
		{
			var tokenSource = new CancellationTokenSource ();
			var block = new BufferBlock<int> (
				new DataflowBlockOptions { CancellationToken = tokenSource.Token });
			Assert.IsTrue (block.Post (42));
			tokenSource.Cancel ();

			var ae = AssertEx.Throws<AggregateException> (
				() => block.Completion.Wait (CompletionTimeout));
			Assert.AreEqual (1, ae.InnerExceptions.Count);
			Assert.AreEqual (typeof(TaskCanceledException), ae.InnerException.GetType ());

			Assert.IsTrue (block.Completion.IsCompleted);
			Assert.AreEqual (TaskStatus.Canceled, block.Completion.Status);
			Assert.IsFalse (block.Post (43));
		}

		// Yields one fresh (block, first target) pair for each of
		// JoinBlock<T,T>, JoinBlock<T,T,T>, BatchedJoinBlock<T,T> and
		// BatchedJoinBlock<T,T,T>; the batched blocks use a batch size of 2.
		static IEnumerable<Tuple<IDataflowBlock, ITargetBlock<T>>>
			GetJoinBlocksWithTargets<T> ()
		{
			// Pins the tuple element types to the interfaces so every
			// yielded pair has the same static type.
			Func<IDataflowBlock, ITargetBlock<T>, Tuple<IDataflowBlock, ITargetBlock<T>>>
				createTuple = Tuple.Create;

			var joinBlock = new JoinBlock<T, T> ();
			yield return createTuple (joinBlock, joinBlock.Target1);
			var joinBlock3 = new JoinBlock<T, T, T> ();
			yield return createTuple (joinBlock3, joinBlock3.Target1);
			var batchedJoinBlock = new BatchedJoinBlock<T, T> (2);
			yield return createTuple (batchedJoinBlock, batchedJoinBlock.Target1);
			var batchedJoinBlock3 = new BatchedJoinBlock<T, T, T> (2);
			yield return createTuple (batchedJoinBlock3, batchedJoinBlock3.Target1);
		}

		// Targets of join blocks must not expose Completion, and must stop
		// accepting items once either the target itself or its owning
		// block has been completed.
		[Test]
		public void JoinTargetCompletitionTest ()
		{
			foreach (var tuple in GetJoinBlocksWithTargets<int> ()) {
				// The local is needed only because a bare property access
				// is not a valid statement.
				AssertEx.Throws<NotSupportedException> (
					() => { var x = tuple.Item2.Completion; });
				Assert.IsTrue (tuple.Item2.Post (1));
				tuple.Item2.Complete ();
				Assert.IsFalse (tuple.Item2.Post (2));
			}

			foreach (var tuple in GetJoinBlocksWithTargets<int> ()) {
				Assert.IsTrue (tuple.Item2.Post (1));
				tuple.Item1.Complete ();
				Assert.IsFalse (tuple.Item2.Post (2));
			}
		}

		// Faulting an idle block twice must report only the first exception.
		[Test]
		public void MultipleFaultsTest ()
		{
			IDataflowBlock block = new BufferBlock<int> ();

			block.Fault (new Exception ("1"));
			// second exception should be ignored
			block.Fault (new Exception ("2"));

			// Bounded wait for the fault to surface instead of a fixed sleep,
			// so the test neither races nor waits longer than necessary.
			AssertEx.Throws<AggregateException> (
				() => block.Completion.Wait (CompletionTimeout));

			Assert.IsTrue (block.Completion.IsFaulted);
			var exception = block.Completion.Exception;
			Assert.IsNotNull (exception);
			Assert.AreEqual (1, exception.InnerExceptions.Count);
			Assert.AreEqual ("1", exception.InnerException.Message);
		}

		// Faulting a block twice while its action is blocked must keep
		// Completion pending until the action returns, and must then report
		// only the first exception.
		[Test]
		public void MultipleFaultsWhileExecutingTest ()
		{
			var evt = new ManualResetEventSlim ();

			var actionBlock = new ActionBlock<int> (_ => evt.Wait ());
			IDataflowBlock dataflowBlock = actionBlock;

			actionBlock.Post (1);
			// presumably gives the action time to start and block on evt
			Thread.Sleep (100);

			dataflowBlock.Fault (new Exception ("1"));
			// second exception should still be ignored
			dataflowBlock.Fault (new Exception ("2"));

			// Give the block a chance to (incorrectly) complete early.
			Thread.Sleep (100);

			Assert.IsFalse (actionBlock.Completion.IsCompleted);

			evt.Set ();

			// Bounded wait for the fault to surface instead of a fixed sleep.
			AssertEx.Throws<AggregateException> (
				() => actionBlock.Completion.Wait (CompletionTimeout));

			Assert.IsTrue (actionBlock.Completion.IsFaulted);
			var exception = actionBlock.Completion.Exception;
			Assert.IsNotNull (exception);
			Assert.AreEqual (1, exception.InnerExceptions.Count);
			Assert.AreEqual ("1", exception.InnerException.Message);
		}

		// Two actions that throw while running concurrently must both have
		// their exceptions reported by Completion.
		[Test]
		public void MultipleExceptionsTest ()
		{
			// use barrier to make sure both threads have time to start
			var barrier = new Barrier (2);

			var block = new ActionBlock<int> (
				_ =>
				{
					barrier.SignalAndWait ();
					throw new Exception ();
				},
				// strictly speaking, the actions are not guaranteed to run in parallel,
				// but there is no way to test this otherwise
				new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = -1 });

			block.Post (1);
			block.Post (2);

			var exception = AssertEx.Throws<AggregateException> (
				() => block.Completion.Wait (CompletionTimeout));

			Assert.AreEqual (2, exception.InnerExceptions.Count);
		}

		// An exception thrown by the action followed by an explicit Fault
		// must report only the action's exception.
		[Test]
		public void ExceptionAndFaultTest ()
		{
			var block = new ActionBlock<int> (
				_ => { throw new Exception ("action"); },
				new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = -1 });

			block.Post (1);

			// presumably gives the action time to run and throw before the Fault
			Thread.Sleep (100);

			((IDataflowBlock)block).Fault (new Exception ("fault"));

			var exception = AssertEx.Throws<AggregateException> (
				() => block.Completion.Wait (CompletionTimeout));

			Assert.AreEqual (1, exception.InnerExceptions.Count);
			Assert.AreEqual ("action", exception.InnerException.Message);
		}

		// Two explicit Faults issued while the action is blocked, followed by
		// the action throwing, must report the first fault and the action's
		// exception, in that order.
		[Test]
		public void FaultAndExceptionTest ()
		{
			var evt = new ManualResetEventSlim ();
			var block = new ActionBlock<int> (
				_ =>
				{
					evt.Wait ();
					throw new Exception ("action");
				},
				new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = -1 });

			block.Post (1);

			// presumably gives the action time to start and block on evt
			Thread.Sleep (100);

			((IDataflowBlock)block).Fault (new Exception ("fault1"));
			((IDataflowBlock)block).Fault (new Exception ("fault2"));

			evt.Set ();

			// Uses the same timeout as the other tests; a shorter one only
			// makes the test more likely to fail on a loaded machine.
			var exception = AssertEx.Throws<AggregateException> (
				() => block.Completion.Wait (CompletionTimeout));

			Assert.AreEqual (2, exception.InnerExceptions.Count);
			CollectionAssert.AreEqual (new[] { "fault1", "action" },
				exception.InnerExceptions.Select (e => e.Message).ToArray ());
		}

		// An exception thrown by the action followed by cancellation must
		// report only the action's exception.
		[Test]
		public void ExceptionAndCancelTest ()
		{
			var tokenSource = new CancellationTokenSource ();
			var block = new ActionBlock<int> (
				_ => { throw new Exception ("action"); },
				new ExecutionDataflowBlockOptions
				{ MaxDegreeOfParallelism = -1, CancellationToken = tokenSource.Token });

			block.Post (1);

			// presumably gives the action time to run and throw before the cancel
			Thread.Sleep (100);

			tokenSource.Cancel ();

			var exception = AssertEx.Throws<AggregateException> (
				() => block.Completion.Wait (CompletionTimeout));

			Assert.AreEqual (1, exception.InnerExceptions.Count);
			Assert.AreEqual ("action", exception.InnerException.Message);
		}

		// Cancellation requested while the action is blocked, followed by the
		// action throwing, must report only the action's exception.
		[Test]
		public void CancelAndExceptionTest ()
		{
			var tokenSource = new CancellationTokenSource ();
			var evt = new ManualResetEventSlim ();

			var block = new ActionBlock<int> (
				_ =>
				{
					evt.Wait ();
					throw new Exception ("action");
				},
				new ExecutionDataflowBlockOptions
				{ MaxDegreeOfParallelism = -1, CancellationToken = tokenSource.Token });

			block.Post (1);

			// presumably gives the action time to start and block on evt
			Thread.Sleep (100);

			tokenSource.Cancel ();

			evt.Set ();

			var exception = AssertEx.Throws<AggregateException> (
				() => block.Completion.Wait (CompletionTimeout));

			Assert.AreEqual (1, exception.InnerExceptions.Count);
			Assert.AreEqual ("action", exception.InnerException.Message);
		}

		// Cancelling an idle block and then faulting it must report only
		// the cancellation.
		[Test]
		public void CancelAndFaultTest ()
		{
			var tokenSource = new CancellationTokenSource ();
			var block = new BufferBlock<int> (
				new DataflowBlockOptions { CancellationToken = tokenSource.Token });

			tokenSource.Cancel ();

			// presumably lets the cancellation take effect before the Fault
			Thread.Sleep (100);

			((IDataflowBlock)block).Fault (new Exception ("fault"));

			var exception = AssertEx.Throws<AggregateException> (
				() => block.Completion.Wait (CompletionTimeout));

			Assert.AreEqual (1, exception.InnerExceptions.Count);
			Assert.AreEqual (typeof(TaskCanceledException),
				exception.InnerException.GetType ());
		}

		// Cancellation followed by a Fault, both issued while the action is
		// blocked, must report only the cancellation.
		[Test]
		public void CancelAndFaultWhileExecutingTest ()
		{
			var tokenSource = new CancellationTokenSource ();
			var evt = new ManualResetEventSlim ();
			var block = new ActionBlock<int> (
				_ => evt.Wait (),
				new ExecutionDataflowBlockOptions
				{ MaxDegreeOfParallelism = -1, CancellationToken = tokenSource.Token });

			block.Post (1);

			// presumably gives the action time to start and block on evt
			Thread.Sleep (100);

			tokenSource.Cancel ();

			// presumably lets the cancellation take effect before the Fault
			Thread.Sleep (100);

			((IDataflowBlock)block).Fault (new Exception ("fault"));

			evt.Set ();

			// No sleep here: the bounded Wait below already blocks until
			// the completion task reaches its final state.
			var exception = AssertEx.Throws<AggregateException> (
				() => block.Completion.Wait (CompletionTimeout));

			Assert.AreEqual (1, exception.InnerExceptions.Count);
			Assert.AreEqual (typeof(TaskCanceledException),
				exception.InnerException.GetType ());
		}

		// A Fault followed by cancellation, both issued while the action is
		// blocked, must report only the fault.
		[Test]
		public void FaultAndCancelWhileExecutingTest ()
		{
			var tokenSource = new CancellationTokenSource ();
			var evt = new ManualResetEventSlim ();
			var block = new ActionBlock<int> (
				_ => evt.Wait (),
				new ExecutionDataflowBlockOptions
				{ MaxDegreeOfParallelism = -1, CancellationToken = tokenSource.Token });

			block.Post (1);

			// presumably gives the action time to start and block on evt
			Thread.Sleep (100);

			((IDataflowBlock)block).Fault (new Exception ("fault"));

			// presumably lets the fault take effect before the cancel
			Thread.Sleep (100);

			tokenSource.Cancel ();

			evt.Set ();

			// No sleep here: the bounded Wait below already blocks until
			// the completion task reaches its final state.
			var exception = AssertEx.Throws<AggregateException> (
				() => block.Completion.Wait (CompletionTimeout));

			Assert.AreEqual (1, exception.InnerExceptions.Count);
			Assert.AreEqual ("fault", exception.InnerException.Message);
		}
	}
}