1 // 2 // BroadcastBlockTest.cs 3 // 4 // Author: 5 // Jérémie "garuma" Laval <jeremie.laval@gmail.com> 6 // Petr Onderka <gsvick@gmail.com> 7 // 8 // Copyright (c) 2011 Jérémie "garuma" Laval 9 // Copyright (c) 2012 Petr Onderka 10 // 11 // Permission is hereby granted, free of charge, to any person obtaining a copy 12 // of this software and associated documentation files (the "Software"), to deal 13 // in the Software without restriction, including without limitation the rights 14 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 15 // copies of the Software, and to permit persons to whom the Software is 16 // furnished to do so, subject to the following conditions: 17 // 18 // The above copyright notice and this permission notice shall be included in 19 // all copies or substantial portions of the Software. 20 // 21 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 22 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 23 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 24 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 25 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 26 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 27 // THE SOFTWARE. 28 29 using System; 30 using System.Collections.Generic; 31 using System.Threading; 32 using System.Threading.Tasks.Dataflow; 33 using NUnit.Framework; 34 35 namespace MonoTests.System.Threading.Tasks.Dataflow { 36 [TestFixture] 37 public class BroadcastBlockTest { 38 [Test] BasicUsageTest()39 public void BasicUsageTest () 40 { 41 bool act1 = false, act2 = false; 42 var evt = new CountdownEvent (2); 43 44 var broadcast = new BroadcastBlock<int> (null); 45 var action1 = new ActionBlock<int> (i => 46 { 47 act1 = i == 42; 48 evt.Signal (); 49 }); 50 var action2 = new ActionBlock<int> (i => 51 { 52 act2 = i == 42; 53 evt.Signal (); 54 }); 55 56 broadcast.LinkTo (action1); 57 broadcast.LinkTo (action2); 58 59 Assert.IsTrue (broadcast.Post (42)); 60 61 Assert.IsTrue (evt.Wait (1000)); 62 63 Assert.IsTrue (act1); 64 Assert.IsTrue (act2); 65 } 66 67 [Test] LinkAfterPostTest()68 public void LinkAfterPostTest () 69 { 70 bool act = false; 71 var evt = new ManualResetEventSlim (); 72 73 var broadcast = new BroadcastBlock<int> (null); 74 var action = new ActionBlock<int> (i => 75 { 76 act = i == 42; 77 evt.Set (); 78 }); 79 80 Assert.IsTrue (broadcast.Post (42)); 81 82 broadcast.LinkTo (action); 83 84 Assert.IsTrue (evt.Wait (1000)); 85 86 Assert.IsTrue (act); 87 } 88 89 [Test] PostponedTest()90 public void PostponedTest () 91 { 92 var broadcast = new BroadcastBlock<int> (null); 93 var target = new BufferBlock<int> ( 94 new DataflowBlockOptions { BoundedCapacity = 1 }); 95 broadcast.LinkTo (target); 96 97 Assert.IsTrue (target.Post (1)); 98 99 Assert.IsTrue (broadcast.Post (2)); 100 101 Assert.AreEqual (1, target.Receive (TimeSpan.FromMilliseconds (0))); 102 Assert.AreEqual (2, target.Receive (TimeSpan.FromMilliseconds (1000))); 103 } 104 105 [Test] ConsumeChangedTest()106 public void ConsumeChangedTest () 107 { 108 var scheduler = new TestScheduler (); 109 var broadcast = new BroadcastBlock<int> (null, 110 new DataflowBlockOptions { TaskScheduler = scheduler }); 111 var target = new TestTargetBlock<int> { Postpone = true }; 112 113 broadcast.LinkTo (target); 114 115 Assert.IsFalse (target.HasPostponed); 116 117 Assert.IsTrue (broadcast.Post (1)); 118 119 scheduler.ExecuteAll (); 120 121 Assert.IsTrue (target.HasPostponed); 122 123 Assert.IsTrue (broadcast.Post (2)); 124 125 scheduler.ExecuteAll (); 126 127 int value; 128 Assert.IsTrue (target.RetryPostponed (out value)); 129 Assert.AreEqual (2, value); 130 } 131 132 [Test] ReserveConsumeChangedTest()133 public void ReserveConsumeChangedTest () 134 { 135 var scheduler = new TestScheduler (); 136 var broadcast = new BroadcastBlock<int> (null, 137 new DataflowBlockOptions { TaskScheduler = scheduler }); 138 var target = new TestTargetBlock<int> { Postpone = true }; 139 140 broadcast.LinkTo (target); 141 142 Assert.IsFalse (target.HasPostponed); 143 144 Assert.IsTrue (broadcast.Post (1)); 145 146 scheduler.ExecuteAll (); 147 148 Assert.IsTrue (target.HasPostponed); 149 150 Assert.IsTrue (target.ReservePostponed ()); 151 152 Assert.IsTrue (broadcast.Post (2)); 153 154 scheduler.ExecuteAll (); 155 156 int value; 157 Assert.IsTrue (target.RetryPostponed (out value)); 158 Assert.AreEqual (1, value); 159 } 160 161 [Test] ReserveChangedTest()162 public void ReserveChangedTest () 163 { 164 var scheduler = new TestScheduler (); 165 var broadcast = new BroadcastBlock<int> (null, 166 new DataflowBlockOptions { TaskScheduler = scheduler }); 167 var target = new TestTargetBlock<int> { Postpone = true }; 168 169 broadcast.LinkTo (target); 170 171 Assert.IsFalse (target.HasPostponed); 172 173 Assert.IsTrue (broadcast.Post (1)); 174 175 scheduler.ExecuteAll (); 176 177 Assert.IsTrue (target.HasPostponed); 178 179 Assert.IsTrue(broadcast.Post(2)); 180 181 scheduler.ExecuteAll (); 182 183 Assert.IsTrue (target.ReservePostponed ()); 184 185 int value; 186 Assert.IsTrue (target.RetryPostponed (out value)); 187 Assert.AreEqual (2, value); 188 } 189 190 [Test] QueuedMessagesTest()191 public void QueuedMessagesTest () 192 { 193 var scheduler = new TestScheduler (); 194 var broadcast = new BroadcastBlock<int> (null, 195 new DataflowBlockOptions { TaskScheduler = scheduler }); 196 var target = new BufferBlock<int> (); 197 broadcast.LinkTo (target); 198 199 Assert.IsTrue (broadcast.Post (1)); 200 Assert.IsTrue (broadcast.Post (2)); 201 202 AssertEx.Throws<TimeoutException> ( 203 () => target.Receive (TimeSpan.FromMilliseconds (1000))); 204 205 scheduler.ExecuteAll (); 206 207 int item; 208 Assert.IsTrue (target.TryReceive (out item)); 209 Assert.AreEqual (1, item); 210 Assert.IsTrue (target.TryReceive (out item)); 211 Assert.AreEqual (2, item); 212 } 213 214 [Test] BoundedQueuedTest()215 public void BoundedQueuedTest () 216 { 217 var scheduler = new TestScheduler (); 218 var broadcast = new BroadcastBlock<int> ( 219 null, 220 new DataflowBlockOptions { TaskScheduler = scheduler, BoundedCapacity = 1 }); 221 222 Assert.IsTrue (broadcast.Post (1)); 223 Assert.IsFalse (broadcast.Post (2)); 224 } 225 226 [Test] BoundedPostponedTest()227 public void BoundedPostponedTest () 228 { 229 var scheduler = new TestScheduler (); 230 var broadcast = new BroadcastBlock<int> ( 231 null, 232 new DataflowBlockOptions { TaskScheduler = scheduler, BoundedCapacity = 1 }); 233 ITargetBlock<int> target = broadcast; 234 var source = new TestSourceBlock<int> (); 235 236 Assert.IsTrue (broadcast.Post (1)); 237 var header = new DataflowMessageHeader (1); 238 source.AddMessage (header, 2); 239 Assert.AreEqual (DataflowMessageStatus.Postponed, 240 target.OfferMessage (header, 2, source, false)); 241 Assert.IsFalse (source.WasConsumed (header)); 242 243 scheduler.ExecuteAll (); 244 245 Assert.IsTrue (source.WasConsumed (header)); 246 } 247 248 [Test] CloningTest()249 public void CloningTest () 250 { 251 object act1 = null, act2 = null; 252 var evt = new CountdownEvent (2); 253 254 object source = new object (); 255 var broadcast = new BroadcastBlock<object> (o => new object ()); 256 var action1 = new ActionBlock<object> (i => 257 { 258 act1 = i; 259 evt.Signal (); 260 }); 261 var action2 = new ActionBlock<object> (i => 262 { 263 act2 = i; 264 evt.Signal (); 265 }); 266 267 broadcast.LinkTo (action1); 268 broadcast.LinkTo (action2); 269 270 Assert.IsTrue (broadcast.Post (source)); 271 272 Assert.IsTrue (evt.Wait (1000)); 273 274 Assert.IsNotNull (act1); 275 Assert.IsNotNull (act2); 276 277 Assert.IsFalse (source.Equals (act1)); 278 Assert.IsFalse (source.Equals (act2)); 279 Assert.IsFalse (act2.Equals (act1)); 280 } 281 282 [Test] TryReceiveTest()283 public void TryReceiveTest() 284 { 285 var scheduler = new TestScheduler(); 286 var block = new BroadcastBlock<int>(i => i * 10, new DataflowBlockOptions { TaskScheduler = scheduler }); 287 288 int item; 289 Assert.IsFalse(block.TryReceive(null, out item)); 290 291 Assert.IsTrue(block.Post(1)); 292 Assert.IsTrue(block.Post(2)); 293 294 scheduler.ExecuteAll(); 295 296 Assert.IsTrue(block.TryReceive(null, out item)); 297 Assert.AreEqual(20, item); 298 // predicate is tested on original value, but returned is cloned 299 Assert.IsTrue(block.TryReceive(i => i < 10, out item)); 300 Assert.AreEqual(20, item); 301 } 302 303 [Test] TryReceiveAllTest()304 public void TryReceiveAllTest() 305 { 306 var scheduler = new TestScheduler(); 307 var block = new BroadcastBlock<int>(null, new DataflowBlockOptions { TaskScheduler = scheduler }); 308 IReceivableSourceBlock<int> source = block; 309 310 Assert.IsTrue(block.Post(1)); 311 Assert.IsTrue(block.Post(2)); 312 313 scheduler.ExecuteAll(); 314 315 IList<int> items; 316 Assert.IsTrue(source.TryReceiveAll(out items)); 317 318 CollectionAssert.AreEqual(new[] { 2 }, items); 319 } 320 321 [Test] DontOfferTwiceTest()322 public void DontOfferTwiceTest() 323 { 324 var scheduler = new TestScheduler (); 325 var block = new BroadcastBlock<int> (null, 326 new DataflowBlockOptions { TaskScheduler = scheduler }); 327 var target = 328 new TestTargetBlock<int> { Postpone = true }; 329 block.LinkTo (target); 330 331 Assert.IsFalse (target.HasPostponed); 332 333 Assert.IsTrue (block.Post (1)); 334 335 scheduler.ExecuteAll(); 336 337 Assert.IsTrue (target.HasPostponed); 338 339 target.Postpone = false; 340 341 int value; 342 Assert.IsTrue(target.RetryPostponed(out value)); 343 Assert.AreEqual(1, value); 344 345 block.LinkTo(new BufferBlock<int>()); 346 347 scheduler.ExecuteAll(); 348 349 Assert.AreEqual(default(int), target.DirectlyAccepted); 350 } 351 } 352 }