1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
4 
5 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
6 //
7 // Partitioner1Chunk.cs
8 //
9 //
// Contains tests for the Partitioner1Chunk feature introduced in Dev11.
// With this partitioner the chunk size is always 1.
//
// The included scenarios are:
//  1. Partitioner correctness:
//          - The chunk size is one
//          - Parallel.ForEach supports dependencies between iterations
//  2. Enumerators are disposed when used by Parallel.ForEach
//  3. Negative tests.
19 //
20 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
21 
22 using System.Collections.Concurrent;
23 using System.Collections.Generic;
24 using System.Diagnostics;
25 using Xunit;
26 
27 namespace System.Threading.Tasks.Tests
28 {
29     public static class Partitioner1Chunk
30     {
31         /// <summary>
32         /// Test the fact that every call of the get*DynamicPartitions.GetEnumerator().MoveNext
33         /// results in only one call of the datasource.GetEnumerator().MoveNext
34         ///
35         /// the default chunking algorithm use 2^n chunks. Use these values as the test input data.
36         /// </summary>
37         /// <param name="length">the data source length</param>
38         /// <param name="isOrderable">if OrderablePartitions are used or not</param>
39         [Fact]
OneMoveNext()40         public static void OneMoveNext()
41         {
42             int[] lengthsArray = new[] { 1, 8, 16, 32, 64, 1024 };
43             bool[] isOrderableArray = new[] { true, false };
44 
45             foreach (var length in lengthsArray)
46             {
47                 foreach (var order in isOrderableArray)
48                     OneMoveNext(length, order);
49             }
50         }
OneMoveNext(int length, bool isOrderable)51         private static void OneMoveNext(int length, bool isOrderable)
52         {
53             Debug.WriteLine("Length: {0} IsOrderable: {1}", length, isOrderable);
54             List<int> ds = new List<int>();
55             for (int i = 0; i < length; i++)
56                 ds.Add(i);
57             int dataSourceMoveNextCalls = 0;
58 
59             //this is an enumerable that will execute user actions on move next, current and dispose
60             //in this case we will set it to wait on MoveNext for the even indexes
61             UserActionEnumerable<int> customEnumerable = new UserActionEnumerable<int>(ds);
62             Action<int> moveNextUserAction = (currentElement) =>
63             {
64                 //keep track how many times the move next of the data source was called
65                 //it is expected as
66                 //every call of MoveNext on partitioner>GetDynamicPartions.GetEnumerator
67                 //to result in only one call of datasource Move Next
68                 //there is not need to guard for concurrency issues because this scenario is single threaded
69                 dataSourceMoveNextCalls++;
70             };
71 
72             customEnumerable.MoveNextAction = moveNextUserAction;
73 
74             var partitioner = Partitioner.Create<int>(customEnumerable, EnumerablePartitionerOptions.NoBuffering);
75             //get the dynamic partitions - enumerator
76             if (isOrderable)
77             {
78                 IEnumerator<KeyValuePair<long, int>> enumerator = partitioner.GetOrderableDynamicPartitions().GetEnumerator();
79                 while (enumerator.MoveNext())
80                 {
81                     Assert.Equal(dataSourceMoveNextCalls, 1);
82                     //reset the count - for the next moveNext call
83                     dataSourceMoveNextCalls = 0;
84                 }
85             }
86             else
87             {
88                 IEnumerator<int> enumerator = partitioner.GetDynamicPartitions().GetEnumerator();
89 
90                 while (enumerator.MoveNext())
91                 {
92                     Assert.Equal(dataSourceMoveNextCalls, 1);
93                     //reset the count - for the next moveNext call
94                     dataSourceMoveNextCalls = 0;
95                 }
96             }
97         }
98 
99         /// <summary>
100         /// Test that in a parallel Foreach loop can be dependencies between iterations if a partitioner of chunk size 1 is used
101         /// </summary>
102         /// <param name="length"></param>
103         [Fact]
IterationsWithDependency()104         public static void IterationsWithDependency()
105         {
106             IterationsWithDependency(128, 126);
107             IterationsWithDependency(128, 65);
108         }
IterationsWithDependency(int length, int dependencyIndex)109         private static void IterationsWithDependency(int length, int dependencyIndex)
110         {
111             List<int> ds = new List<int>();
112             for (int i = 0; i < length; i++)
113                 ds.Add(i);
114             var partitioner = Partitioner.Create<int>(ds, EnumerablePartitionerOptions.NoBuffering);
115             ManualResetEvent mre = new ManualResetEvent(false);
116             ConcurrentQueue<int> savedDS = new ConcurrentQueue<int>();
117 
118             Parallel.ForEach(partitioner, (index) =>
119                 {
120                     if (index == dependencyIndex + 1)
121                     {
122                         mre.Set();
123                     }
124                     if (index == dependencyIndex)
125                     {
126                         //if the chunk size will not be one,
127                         //this iteration and the next one will not be processed by the same thread
128                         //waiting here will lead to a deadlock
129                         mre.WaitOne();
130                     }
131                     savedDS.Enqueue(index);
132                 });
133             //if the PForEach ends this means pass
134             //verify the collection
135             Assert.True(CompareCollections(savedDS, ds));
136         }
137 
138         /// <summary>
139         /// Verify that the enumerators used while executing the ParalleForEach over the partitioner are disposed
140         /// </summary>
141 
142         [Fact]
PFEDisposeEnum()143         public static void PFEDisposeEnum()
144         {
145             PFEDisposeEnum(1204);
146         }
PFEDisposeEnum(int length)147         private static void PFEDisposeEnum(int length)
148         {
149             List<int> ds = new List<int>();
150             for (int i = 0; i < length; i++)
151                 ds.Add(i);
152             //this is an enumerable that will execute user actions on move next, current and dispose
153             //in this case we will set it to wait on MoveNext for the even indexes
154             UserActionEnumerable<int> customEnumerable = new UserActionEnumerable<int>(ds);
155             ConcurrentQueue<int> savedDS = new ConcurrentQueue<int>();
156             var partitioner = Partitioner.Create<int>(customEnumerable, EnumerablePartitionerOptions.NoBuffering);
157             Parallel.ForEach(partitioner, (index) =>
158                 {
159                     savedDS.Enqueue(index);
160                 });
161             Assert.True(customEnumerable.AreEnumeratorsDisposed());
162             Assert.True(CompareCollections(savedDS, ds));
163         }
164 
165         /// <summary>
166         /// Negative test:
167         /// Move Next throws
168         /// Partitioner is used in ParallelForEach
169         /// Exception is expected and the enumerators are disposed
170         /// </summary>
171         [Fact]
ExceptionOnMoveNext()172         public static void ExceptionOnMoveNext()
173         {
174             ExceptionOnMoveNext(128, 65, true);
175             ExceptionOnMoveNext(128, 65, false);
176         }
ExceptionOnMoveNext(int length, int indexToThrow, bool isOrderable)177         private static void ExceptionOnMoveNext(int length, int indexToThrow, bool isOrderable)
178         {
179             List<int> ds = new List<int>();
180             for (int i = 0; i < length; i++)
181                 ds.Add(i);
182 
183             Exception userEx = new InvalidOperationException("UserException");
184             //this is an enumerable that will execute user actions on move next, current and dispose
185             //in this case we will set it to throw on MoveNext for specified index
186             UserActionEnumerable<int> customEnumerable = new UserActionEnumerable<int>(ds);
187             Action<int> moveNextUserAction = (currentElement) =>
188                                                             {
189                                                                 if (currentElement == indexToThrow)
190                                                                 {
191                                                                     throw userEx;
192                                                                 };
193                                                             };
194 
195 
196             customEnumerable.MoveNextAction = moveNextUserAction;
197             var partitioner = Partitioner.Create<int>(customEnumerable, EnumerablePartitionerOptions.NoBuffering);
198             var exception = Assert.Throws<AggregateException>(() => Parallel.ForEach(partitioner, (index) => { }));
199             VerifyAggregateException(exception, userEx);
200             Assert.True(customEnumerable.AreEnumeratorsDisposed());
201         }
202 
203         /// <summary>
204         /// Use an incorrect buffering value for the EnumerablePartitionerOptions
205         /// </summary>
206         [Fact]
IncorrectBuffering()207         public static void IncorrectBuffering()
208         {
209             int length = 16;
210             int[] ds = new int[length];
211             for (int i = 0; i < 16; i++)
212                 ds[i] = i;
213             Assert.Throws<ArgumentOutOfRangeException>(() => { var partitioner = Partitioner.Create<int>(ds, (EnumerablePartitionerOptions)0x2); });
214         }
215 
216         /// <summary>
217         /// Use null data source
218         /// </summary>
219         [Fact]
NullDataSource()220         public static void NullDataSource()
221         {
222             Assert.Throws<ArgumentNullException>(() =>
223             {
224                 var partitioner = Partitioner.Create<int>(null, EnumerablePartitionerOptions.NoBuffering);
225             });
226         }
227 
228         #region Helper Methods
229 
230         /// <summary>
231         /// Compare the two collections
232         /// </summary>
233         /// <param name="savedDS">concurrent queue used for saving the consumed data</param>
234         /// <param name="ds">an IEnumerable data source</param>
235         /// <returns></returns>
CompareCollections(ConcurrentQueue<int> savedDS, IEnumerable<int> ds)236         private static bool CompareCollections(ConcurrentQueue<int> savedDS, IEnumerable<int> ds)
237         {
238             List<int> dsList = new List<int>(savedDS);
239             dsList.Sort();
240             List<int> expected = new List<int>(ds);
241             expected.Sort();
242 
243             if (expected.Count != dsList.Count)
244                 return false;
245 
246             for (int i = 0; i < expected.Count; i++)
247             {
248                 int actual = dsList[i];
249                 int exp = expected[i];
250                 if (!actual.Equals(exp))
251                     return false;
252             }
253             return true;
254         }
255 
256         /// <summary>
257         /// return the elements from the collection in order; as a string
258         /// </summary>
259         /// <param name="savedDS"></param>
260         /// <returns></returns>
Print(ConcurrentQueue<int> savedDS)261         private static string Print(ConcurrentQueue<int> savedDS)
262         {
263             List<int> dsList = new List<int>(savedDS);
264             dsList.Sort();
265             return string.Join(",", dsList);
266         }
267 
268         /// <summary>
269         /// Verifies if an aggregate exception contains a specific user exception
270         /// </summary>
271         /// <param name="aggregatEx"></param>
272         /// <param name="userException"></param>
VerifyAggregateException(AggregateException aggregatEx, Exception userException)273         private static void VerifyAggregateException(AggregateException aggregatEx, Exception userException)
274         {
275             Assert.True(aggregatEx.InnerExceptions.Contains(userException));
276             Assert.Equal(aggregatEx.Flatten().InnerExceptions.Count, 1);
277         }
278 
279         #endregion
280     }
281 
282     /// <summary>
283     /// an IEnumerable whose enumerator can be configured to execute user code from.
284     /// - MoveNext
285     /// </summary>
286     /// <typeparam name="T"></typeparam>
287     public class UserActionEnumerable<T> : IEnumerable<T>
288     {
289         protected List<T> _data;
290 
291         //keeps track of how many enumerators are created
292         //in case of an exception in parallel foreach
293         //the enumerators should be disposed
294         private ConcurrentBag<UserActionEnumerator<T>> _allEnumerators = new ConcurrentBag<UserActionEnumerator<T>>();
295 
296         //called in the beginning of enumerator Move Next
297         private Action<int> _moveNextAction = null;
298 
UserActionEnumerable(List<T> enumerable, Action<int> moveNextAction)299         public UserActionEnumerable(List<T> enumerable, Action<int> moveNextAction)
300         {
301             _data = enumerable;
302             _moveNextAction = moveNextAction;
303         }
304 
UserActionEnumerable(List<T> enumerable)305         public UserActionEnumerable(List<T> enumerable)
306         {
307             _data = enumerable;
308         }
309 
310         /// <summary>
311         /// User action for MoveNext
312         /// </summary>
313         public Action<int> MoveNextAction
314         {
315             set
316             {
317                 _moveNextAction = value;
318             }
319         }
320 
321 
System.Collections.IEnumerable.GetEnumerator()322         System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
323         {
324             return (System.Collections.IEnumerator)this.GetEnumerator(); ;
325         }
326 
GetEnumerator()327         public IEnumerator<T> GetEnumerator()
328         {
329             UserActionEnumerator<T> en = new UserActionEnumerator<T>(_data, _moveNextAction);
330             _allEnumerators.Add(en);
331 
332             return en;
333         }
334 
335 
336         /// <summary>
337         /// verifies if all the enumerators are disposed
338         /// </summary>
339         /// <returns></returns>
AreEnumeratorsDisposed()340         public bool AreEnumeratorsDisposed()
341         {
342             foreach (UserActionEnumerator<T> en in _allEnumerators)
343             {
344                 if (!en.IsDisposed())
345                 {
346                     return false;
347                 }
348             }
349             return true;
350         }
351     }
352 
353     /// <summary>
354     /// Enumerator used by the UserActionEnumerable class
355     /// </summary>
356     /// <typeparam name="T">The type of the element</typeparam>
357     public class UserActionEnumerator<T> : IEnumerator<T>
358     {
359         private List<T> _data;
360         private volatile int _positionCurrent = -1;
361         private bool _disposed;
362         private object _lock = new object();
363         private int _length = 0;
364 
365         //called in enumerator's MoveNext
366         private Action<int> _moveNextAction = null;
367 
UserActionEnumerator(List<T> data, Action<int> moveNextAction)368         internal UserActionEnumerator(List<T> data, Action<int> moveNextAction)
369         {
370             _data = data;
371             _disposed = false;
372             _length = data.Count;
373             _moveNextAction = moveNextAction;
374         }
375 
376         /// <summary>
377         /// MoveNext -
378         /// the move next is performed under lock in order to avoid race condition with the Current
379         /// </summary>
380         /// <returns></returns>
MoveNext()381         public bool MoveNext()
382         {
383             bool result = false;
384 
385             lock (_lock)
386             {
387                 _positionCurrent++;
388                 result = _positionCurrent < _length;
389             }
390             if (_moveNextAction != null && result) _moveNextAction(_positionCurrent);
391 
392             return result;
393         }
394 
395         /// <summary>
396         /// current under lock
397         /// </summary>
398         public T Current
399         {
400             get
401             {
402                 lock (_lock)
403                 {
404                     return _data[_positionCurrent];
405                 }
406             }
407         }
408 
409         Object System.Collections.IEnumerator.Current
410         {
411             get
412             {
413                 return this.Current;
414             }
415         }
416 
417         /// <summary>
418         /// Dispose the underlying Enumerator, and suppresses finalization
419         /// so that we will not throw.
420         /// </summary>
Dispose()421         public void Dispose()
422         {
423             GC.SuppressFinalize(this);
424             _disposed = true;
425         }
426 
Reset()427         public void Reset()
428         {
429             throw new System.NotImplementedException("Reset not implemented");
430         }
431 
IsDisposed()432         public bool IsDisposed()
433         {
434             return _disposed;
435         }
436     }
437 }
438