1 //
2 // BroadcastBlockTest.cs
3 //
4 // Author:
5 //       Jérémie "garuma" Laval <jeremie.laval@gmail.com>
6 //       Petr Onderka <gsvick@gmail.com>
7 //
8 // Copyright (c) 2011 Jérémie "garuma" Laval
9 // Copyright (c) 2012 Petr Onderka
10 //
11 // Permission is hereby granted, free of charge, to any person obtaining a copy
12 // of this software and associated documentation files (the "Software"), to deal
13 // in the Software without restriction, including without limitation the rights
14 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 // copies of the Software, and to permit persons to whom the Software is
16 // furnished to do so, subject to the following conditions:
17 //
18 // The above copyright notice and this permission notice shall be included in
19 // all copies or substantial portions of the Software.
20 //
21 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
27 // THE SOFTWARE.
28 
29 using System;
30 using System.Collections.Generic;
31 using System.Threading;
32 using System.Threading.Tasks.Dataflow;
33 using NUnit.Framework;
34 
35 namespace MonoTests.System.Threading.Tasks.Dataflow {
36 	[TestFixture]
37 	public class BroadcastBlockTest {
38 		[Test]
BasicUsageTest()39 		public void BasicUsageTest ()
40 		{
41 			bool act1 = false, act2 = false;
42 			var evt = new CountdownEvent (2);
43 
44 			var broadcast = new BroadcastBlock<int> (null);
45 			var action1 = new ActionBlock<int> (i =>
46 			{
47 				act1 = i == 42;
48 				evt.Signal ();
49 			});
50 			var action2 = new ActionBlock<int> (i =>
51 			{
52 				act2 = i == 42;
53 				evt.Signal ();
54 			});
55 
56 			broadcast.LinkTo (action1);
57 			broadcast.LinkTo (action2);
58 
59 			Assert.IsTrue (broadcast.Post (42));
60 
61 			Assert.IsTrue (evt.Wait (1000));
62 
63 			Assert.IsTrue (act1);
64 			Assert.IsTrue (act2);
65 		}
66 
67 		[Test]
LinkAfterPostTest()68 		public void LinkAfterPostTest ()
69 		{
70 			bool act = false;
71 			var evt = new ManualResetEventSlim ();
72 
73 			var broadcast = new BroadcastBlock<int> (null);
74 			var action = new ActionBlock<int> (i =>
75 			{
76 				act = i == 42;
77 				evt.Set ();
78 			});
79 
80 			Assert.IsTrue (broadcast.Post (42));
81 
82 			broadcast.LinkTo (action);
83 
84 			Assert.IsTrue (evt.Wait (1000));
85 
86 			Assert.IsTrue (act);
87 		}
88 
89 		[Test]
PostponedTest()90 		public void PostponedTest ()
91 		{
92 			var broadcast = new BroadcastBlock<int> (null);
93 			var target = new BufferBlock<int> (
94 				new DataflowBlockOptions { BoundedCapacity = 1 });
95 			broadcast.LinkTo (target);
96 
97 			Assert.IsTrue (target.Post (1));
98 
99 			Assert.IsTrue (broadcast.Post (2));
100 
101 			Assert.AreEqual (1, target.Receive (TimeSpan.FromMilliseconds (0)));
102 			Assert.AreEqual (2, target.Receive (TimeSpan.FromMilliseconds (1000)));
103 		}
104 
105 		[Test]
ConsumeChangedTest()106 		public void ConsumeChangedTest ()
107 		{
108 			var scheduler = new TestScheduler ();
109 			var broadcast = new BroadcastBlock<int> (null,
110 				new DataflowBlockOptions { TaskScheduler = scheduler });
111 			var target = new TestTargetBlock<int> { Postpone = true };
112 
113 			broadcast.LinkTo (target);
114 
115 			Assert.IsFalse (target.HasPostponed);
116 
117 			Assert.IsTrue (broadcast.Post (1));
118 
119 			scheduler.ExecuteAll ();
120 
121 			Assert.IsTrue (target.HasPostponed);
122 
123 			Assert.IsTrue (broadcast.Post (2));
124 
125 			scheduler.ExecuteAll ();
126 
127 			int value;
128 			Assert.IsTrue (target.RetryPostponed (out value));
129 			Assert.AreEqual (2, value);
130 		}
131 
132 		[Test]
ReserveConsumeChangedTest()133 		public void ReserveConsumeChangedTest ()
134 		{
135 			var scheduler = new TestScheduler ();
136 			var broadcast = new BroadcastBlock<int> (null,
137 				new DataflowBlockOptions { TaskScheduler = scheduler });
138 			var target = new TestTargetBlock<int> { Postpone = true };
139 
140 			broadcast.LinkTo (target);
141 
142 			Assert.IsFalse (target.HasPostponed);
143 
144 			Assert.IsTrue (broadcast.Post (1));
145 
146 			scheduler.ExecuteAll ();
147 
148 			Assert.IsTrue (target.HasPostponed);
149 
150 			Assert.IsTrue (target.ReservePostponed ());
151 
152 			Assert.IsTrue (broadcast.Post (2));
153 
154 			scheduler.ExecuteAll ();
155 
156 			int value;
157 			Assert.IsTrue (target.RetryPostponed (out value));
158 			Assert.AreEqual (1, value);
159 		}
160 
161 		[Test]
ReserveChangedTest()162 		public void ReserveChangedTest ()
163 		{
164 			var scheduler = new TestScheduler ();
165 			var broadcast = new BroadcastBlock<int> (null,
166 				new DataflowBlockOptions { TaskScheduler = scheduler });
167 			var target = new TestTargetBlock<int> { Postpone = true };
168 
169 			broadcast.LinkTo (target);
170 
171 			Assert.IsFalse (target.HasPostponed);
172 
173 			Assert.IsTrue (broadcast.Post (1));
174 
175 			scheduler.ExecuteAll ();
176 
177 			Assert.IsTrue (target.HasPostponed);
178 
179 			Assert.IsTrue(broadcast.Post(2));
180 
181 			scheduler.ExecuteAll ();
182 
183 			Assert.IsTrue (target.ReservePostponed ());
184 
185 			int value;
186 			Assert.IsTrue (target.RetryPostponed (out value));
187 			Assert.AreEqual (2, value);
188 		}
189 
190 		[Test]
QueuedMessagesTest()191 		public void QueuedMessagesTest ()
192 		{
193 			var scheduler = new TestScheduler ();
194 			var broadcast = new BroadcastBlock<int> (null,
195 				new DataflowBlockOptions { TaskScheduler = scheduler });
196 			var target = new BufferBlock<int> ();
197 			broadcast.LinkTo (target);
198 
199 			Assert.IsTrue (broadcast.Post (1));
200 			Assert.IsTrue (broadcast.Post (2));
201 
202 			AssertEx.Throws<TimeoutException> (
203 				() => target.Receive (TimeSpan.FromMilliseconds (1000)));
204 
205 			scheduler.ExecuteAll ();
206 
207 			int item;
208 			Assert.IsTrue (target.TryReceive (out item));
209 			Assert.AreEqual (1, item);
210 			Assert.IsTrue (target.TryReceive (out item));
211 			Assert.AreEqual (2, item);
212 		}
213 
214 		[Test]
BoundedQueuedTest()215 		public void BoundedQueuedTest ()
216 		{
217 			var scheduler = new TestScheduler ();
218 			var broadcast = new BroadcastBlock<int> (
219 				null,
220 				new DataflowBlockOptions { TaskScheduler = scheduler, BoundedCapacity = 1 });
221 
222 			Assert.IsTrue (broadcast.Post (1));
223 			Assert.IsFalse (broadcast.Post (2));
224 		}
225 
226 		[Test]
BoundedPostponedTest()227 		public void BoundedPostponedTest ()
228 		{
229 			var scheduler = new TestScheduler ();
230 			var broadcast = new BroadcastBlock<int> (
231 				null,
232 				new DataflowBlockOptions { TaskScheduler = scheduler, BoundedCapacity = 1 });
233 			ITargetBlock<int> target = broadcast;
234 			var source = new TestSourceBlock<int> ();
235 
236 			Assert.IsTrue (broadcast.Post (1));
237 			var header = new DataflowMessageHeader (1);
238 			source.AddMessage (header, 2);
239 			Assert.AreEqual (DataflowMessageStatus.Postponed,
240 				target.OfferMessage (header, 2, source, false));
241 			Assert.IsFalse (source.WasConsumed (header));
242 
243 			scheduler.ExecuteAll ();
244 
245 			Assert.IsTrue (source.WasConsumed (header));
246 		}
247 
248 		[Test]
CloningTest()249 		public void CloningTest ()
250 		{
251 			object act1 = null, act2 = null;
252 			var evt = new CountdownEvent (2);
253 
254 			object source = new object ();
255 			var broadcast = new BroadcastBlock<object> (o => new object ());
256 			var action1 = new ActionBlock<object> (i =>
257 			{
258 				act1 = i;
259 				evt.Signal ();
260 			});
261 			var action2 = new ActionBlock<object> (i =>
262 			{
263 				act2 = i;
264 				evt.Signal ();
265 			});
266 
267 			broadcast.LinkTo (action1);
268 			broadcast.LinkTo (action2);
269 
270 			Assert.IsTrue (broadcast.Post (source));
271 
272 			Assert.IsTrue (evt.Wait (1000));
273 
274 			Assert.IsNotNull (act1);
275 			Assert.IsNotNull (act2);
276 
277 			Assert.IsFalse (source.Equals (act1));
278 			Assert.IsFalse (source.Equals (act2));
279 			Assert.IsFalse (act2.Equals (act1));
280 		}
281 
282 		[Test]
TryReceiveTest()283 		public void TryReceiveTest()
284 		{
285 			var scheduler = new TestScheduler();
286 			var block = new BroadcastBlock<int>(i => i * 10, new DataflowBlockOptions { TaskScheduler = scheduler });
287 
288 			int item;
289 			Assert.IsFalse(block.TryReceive(null, out item));
290 
291 			Assert.IsTrue(block.Post(1));
292 			Assert.IsTrue(block.Post(2));
293 
294 			scheduler.ExecuteAll();
295 
296 			Assert.IsTrue(block.TryReceive(null, out item));
297 			Assert.AreEqual(20, item);
298 			// predicate is tested on original value, but returned is cloned
299 			Assert.IsTrue(block.TryReceive(i => i < 10, out item));
300 			Assert.AreEqual(20, item);
301 		}
302 
303 		[Test]
TryReceiveAllTest()304 		public void TryReceiveAllTest()
305 		{
306 			var scheduler = new TestScheduler();
307 			var block = new BroadcastBlock<int>(null, new DataflowBlockOptions { TaskScheduler = scheduler });
308 			IReceivableSourceBlock<int> source = block;
309 
310 			Assert.IsTrue(block.Post(1));
311 			Assert.IsTrue(block.Post(2));
312 
313 			scheduler.ExecuteAll();
314 
315 			IList<int> items;
316 			Assert.IsTrue(source.TryReceiveAll(out items));
317 
318 			CollectionAssert.AreEqual(new[] { 2 }, items);
319 		}
320 
321 		[Test]
DontOfferTwiceTest()322 		public void DontOfferTwiceTest()
323 		{
324 			var scheduler = new TestScheduler ();
325 			var block = new BroadcastBlock<int> (null,
326 				new DataflowBlockOptions { TaskScheduler = scheduler });
327 			var target =
328 				new TestTargetBlock<int> { Postpone = true };
329 			block.LinkTo (target);
330 
331 			Assert.IsFalse (target.HasPostponed);
332 
333 			Assert.IsTrue (block.Post (1));
334 
335 			scheduler.ExecuteAll();
336 
337 			Assert.IsTrue (target.HasPostponed);
338 
339 			target.Postpone = false;
340 
341 			int value;
342 			Assert.IsTrue(target.RetryPostponed(out value));
343 			Assert.AreEqual(1, value);
344 
345 			block.LinkTo(new BufferBlock<int>());
346 
347 			scheduler.ExecuteAll();
348 
349 			Assert.AreEqual(default(int), target.DirectlyAccepted);
350 		}
351 	}
352 }