// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// Partitioner1Chunk.cs
//
//
// Contains tests for the Partitioner1Chunk feature introduced in Dev11.
// In this partitioner the chunk size is always 1.
//
// The included scenarios are:
//  1. Partitioner correctness:
//          - Chunk size is one
//          - Parallel.ForEach supports dependencies between iterations
//  2. Enumerators are disposed in Parallel.ForEach usage
//  3. Negative tests.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using Xunit;

namespace System.Threading.Tasks.Tests
{
    public static class Partitioner1Chunk
    {
        /// <summary>
        /// Tests that every call to MoveNext on the enumerator returned by
        /// Get*DynamicPartitions() results in exactly one call to MoveNext on the data source.
        ///
        /// The default chunking algorithm uses 2^n sized chunks, so those values are used as
        /// the data source lengths.
        /// </summary>
        [Fact]
        public static void OneMoveNext()
        {
            int[] lengthsArray = new[] { 1, 8, 16, 32, 64, 1024 };
            bool[] isOrderableArray = new[] { true, false };

            foreach (var length in lengthsArray)
            {
                foreach (var order in isOrderableArray)
                {
                    OneMoveNext(length, order);
                }
            }
        }

        /// <summary>
        /// Runs the one-MoveNext-per-element check for a single configuration.
        /// </summary>
        /// <param name="length">the data source length</param>
        /// <param name="isOrderable">true to enumerate the orderable dynamic partitions, false for the plain dynamic partitions</param>
        private static void OneMoveNext(int length, bool isOrderable)
        {
            Debug.WriteLine("Length: {0} IsOrderable: {1}", length, isOrderable);
            List<int> ds = new List<int>();
            for (int i = 0; i < length; i++)
            {
                ds.Add(i);
            }

            int dataSourceMoveNextCalls = 0;

            // An enumerable that runs a user action on every successful MoveNext of its enumerators.
            // Here the action counts how many times the data source's MoveNext was called; every call
            // of MoveNext on the partition enumerator is expected to trigger exactly one of them.
            // No synchronization is needed because this scenario is single threaded.
            UserActionEnumerable<int> customEnumerable = new UserActionEnumerable<int>(ds);
            customEnumerable.MoveNextAction = (currentElement) =>
            {
                dataSourceMoveNextCalls++;
            };

            var partitioner = Partitioner.Create<int>(customEnumerable, EnumerablePartitionerOptions.NoBuffering);

            // Walk the dynamic partitions enumerator, disposing it when done.
            if (isOrderable)
            {
                using (IEnumerator<KeyValuePair<long, int>> enumerator = partitioner.GetOrderableDynamicPartitions().GetEnumerator())
                {
                    while (enumerator.MoveNext())
                    {
                        Assert.Equal(1, dataSourceMoveNextCalls);
                        // reset the count for the next MoveNext call
                        dataSourceMoveNextCalls = 0;
                    }
                }
            }
            else
            {
                using (IEnumerator<int> enumerator = partitioner.GetDynamicPartitions().GetEnumerator())
                {
                    while (enumerator.MoveNext())
                    {
                        Assert.Equal(1, dataSourceMoveNextCalls);
                        // reset the count for the next MoveNext call
                        dataSourceMoveNextCalls = 0;
                    }
                }
            }
        }

        /// <summary>
        /// Tests that iterations of a Parallel.ForEach loop can depend on each other
        /// when a partitioner with chunk size 1 is used.
        /// </summary>
        [Fact]
        public static void IterationsWithDependency()
        {
            IterationsWithDependency(128, 126);
            IterationsWithDependency(128, 65);
        }

        /// <summary>
        /// Runs a Parallel.ForEach in which the iteration for <paramref name="dependencyIndex"/>
        /// blocks until the iteration for the following index has started.
        /// </summary>
        /// <param name="length">the data source length</param>
        /// <param name="dependencyIndex">the index whose iteration waits for the next index to be reached</param>
        private static void IterationsWithDependency(int length, int dependencyIndex)
        {
            List<int> ds = new List<int>();
            for (int i = 0; i < length; i++)
            {
                ds.Add(i);
            }

            var partitioner = Partitioner.Create<int>(ds, EnumerablePartitionerOptions.NoBuffering);
            ConcurrentQueue<int> savedDS = new ConcurrentQueue<int>();

            // Parallel.ForEach is synchronous, so the event is no longer in use once the loop returns.
            using (ManualResetEvent mre = new ManualResetEvent(false))
            {
                Parallel.ForEach(partitioner, (index) =>
                {
                    if (index == dependencyIndex + 1)
                    {
                        mre.Set();
                    }
                    if (index == dependencyIndex)
                    {
                        // If the chunk size were larger than one, this element and the next one could be
                        // handed to the same thread; that thread would then block here forever because
                        // it could never reach the iteration that sets the event.
                        mre.WaitOne();
                    }
                    savedDS.Enqueue(index);
                });
            }

            // If the Parallel.ForEach returned, the dependency was satisfied;
            // now verify that every element was processed.
            Assert.True(CompareCollections(savedDS, ds));
        }

        /// <summary>
        /// Verifies that the enumerators used while executing Parallel.ForEach over the partitioner are disposed.
        /// </summary>
        [Fact]
        public static void PFEDisposeEnum()
        {
            PFEDisposeEnum(1204);
        }

        /// <summary>
        /// Runs Parallel.ForEach over a tracked enumerable and checks both the disposal of
        /// its enumerators and the set of processed elements.
        /// </summary>
        /// <param name="length">the data source length</param>
        private static void PFEDisposeEnum(int length)
        {
            List<int> ds = new List<int>();
            for (int i = 0; i < length; i++)
            {
                ds.Add(i);
            }

            // An enumerable that keeps track of the enumerators it hands out; no MoveNext action is set here.
            UserActionEnumerable<int> customEnumerable = new UserActionEnumerable<int>(ds);
            ConcurrentQueue<int> savedDS = new ConcurrentQueue<int>();
            var partitioner = Partitioner.Create<int>(customEnumerable, EnumerablePartitionerOptions.NoBuffering);

            Parallel.ForEach(partitioner, (index) =>
            {
                savedDS.Enqueue(index);
            });

            Assert.True(customEnumerable.AreEnumeratorsDisposed());
            Assert.True(CompareCollections(savedDS, ds));
        }

        /// <summary>
        /// Negative test:
        /// MoveNext throws while the partitioner is used in Parallel.ForEach.
        /// An AggregateException is expected and the enumerators must be disposed.
        /// </summary>
        [Fact]
        public static void ExceptionOnMoveNext()
        {
            ExceptionOnMoveNext(128, 65, true);
            ExceptionOnMoveNext(128, 65, false);
        }

        /// <summary>
        /// Runs Parallel.ForEach over a data source whose MoveNext throws at a given index.
        /// </summary>
        /// <param name="length">the data source length</param>
        /// <param name="indexToThrow">the index at which the data source's MoveNext throws</param>
        /// <param name="isOrderable">
        /// true to call the Parallel.ForEach overload whose body receives the element index
        /// (taking an OrderablePartitioner), false to call the overload whose body receives only the element
        /// </param>
        private static void ExceptionOnMoveNext(int length, int indexToThrow, bool isOrderable)
        {
            List<int> ds = new List<int>();
            for (int i = 0; i < length; i++)
            {
                ds.Add(i);
            }

            Exception userEx = new InvalidOperationException("UserException");

            // An enumerable that runs a user action on MoveNext;
            // here the action throws when the specified index is reached.
            UserActionEnumerable<int> customEnumerable = new UserActionEnumerable<int>(ds);
            customEnumerable.MoveNextAction = (currentElement) =>
            {
                if (currentElement == indexToThrow)
                {
                    throw userEx;
                }
            };

            var partitioner = Partitioner.Create<int>(customEnumerable, EnumerablePartitionerOptions.NoBuffering);

            AggregateException exception;
            if (isOrderable)
            {
                // The three-argument body selects the OrderablePartitioner overload.
                exception = Assert.Throws<AggregateException>(() => Parallel.ForEach(partitioner, (item, state, index) => { }));
            }
            else
            {
                exception = Assert.Throws<AggregateException>(() => Parallel.ForEach(partitioner, (index) => { }));
            }

            VerifyAggregateException(exception, userEx);
            Assert.True(customEnumerable.AreEnumeratorsDisposed());
        }

        /// <summary>
        /// Use an incorrect buffering value for the EnumerablePartitionerOptions
        /// </summary>
        [Fact]
        public static void IncorrectBuffering()
        {
            int length = 16;
            int[] ds = new int[length];
            for (int i = 0; i < length; i++)
            {
                ds[i] = i;
            }

            // 0x2 is passed as an out-of-range EnumerablePartitionerOptions value.
            Assert.Throws<ArgumentOutOfRangeException>(() => { var partitioner = Partitioner.Create<int>(ds, (EnumerablePartitionerOptions)0x2); });
        }

        /// <summary>
        /// Use null data source
        /// </summary>
        [Fact]
        public static void NullDataSource()
        {
            Assert.Throws<ArgumentNullException>(() =>
            {
                var partitioner = Partitioner.Create<int>(null, EnumerablePartitionerOptions.NoBuffering);
            });
        }

        #region Helper Methods

        /// <summary>
        /// Compares the two collections, ignoring the order of their elements.
        /// </summary>
        /// <param name="savedDS">concurrent queue used for saving the consumed data</param>
        /// <param name="ds">an IEnumerable data source</param>
        /// <returns>true if both collections contain the same elements; otherwise false</returns>
        private static bool CompareCollections(ConcurrentQueue<int> savedDS, IEnumerable<int> ds)
        {
            // Elements are consumed in parallel, so sort copies of both sides before comparing.
            List<int> dsList = new List<int>(savedDS);
            dsList.Sort();
            List<int> expected = new List<int>(ds);
            expected.Sort();

            if (expected.Count != dsList.Count)
            {
                return false;
            }

            for (int i = 0; i < expected.Count; i++)
            {
                if (dsList[i] != expected[i])
                {
                    return false;
                }
            }
            return true;
        }

        /// <summary>
        /// Returns the elements of the collection, sorted, as a comma separated string.
        /// </summary>
        /// <param name="savedDS">concurrent queue holding the consumed data</param>
        /// <returns>the sorted elements joined with ","</returns>
        private static string Print(ConcurrentQueue<int> savedDS)
        {
            List<int> dsList = new List<int>(savedDS);
            dsList.Sort();
            return string.Join(",", dsList);
        }

        /// <summary>
        /// Verifies that an aggregate exception contains the specific user exception and nothing else.
        /// </summary>
        /// <param name="aggregatEx">the exception thrown by the parallel loop</param>
        /// <param name="userException">the exception instance expected among the inner exceptions</param>
        private static void VerifyAggregateException(AggregateException aggregatEx, Exception userException)
        {
            Assert.Contains(userException, aggregatEx.InnerExceptions);
            Assert.Equal(1, aggregatEx.Flatten().InnerExceptions.Count);
        }

        #endregion
    }

    /// <summary>
    /// An IEnumerable whose enumerators can be configured to execute user code from
    ///  - MoveNext
    /// and which remembers every enumerator it created so their disposal can be verified.
    /// </summary>
    /// <typeparam name="T">The type of the element</typeparam>
    public class UserActionEnumerable<T> : IEnumerable<T>
    {
        protected List<T> _data;

        // Keeps track of every enumerator created, so that a test can verify
        // they were all disposed (for example after an exception in Parallel.ForEach).
        private readonly ConcurrentBag<UserActionEnumerator<T>> _allEnumerators = new ConcurrentBag<UserActionEnumerator<T>>();

        // Passed to each enumerator at creation; the enumerator invokes it from MoveNext.
        private Action<int> _moveNextAction = null;

        /// <summary>
        /// Creates an enumerable over <paramref name="enumerable"/> whose enumerators run
        /// <paramref name="moveNextAction"/> on MoveNext.
        /// </summary>
        /// <param name="enumerable">the underlying data</param>
        /// <param name="moveNextAction">the action to run on MoveNext; receives the enumerator's new position</param>
        public UserActionEnumerable(List<T> enumerable, Action<int> moveNextAction)
        {
            _data = enumerable;
            _moveNextAction = moveNextAction;
        }

        /// <summary>
        /// Creates an enumerable over <paramref name="enumerable"/> with no MoveNext action.
        /// </summary>
        /// <param name="enumerable">the underlying data</param>
        public UserActionEnumerable(List<T> enumerable)
        {
            _data = enumerable;
        }

        /// <summary>
        /// User action for MoveNext. Only enumerators created after the action is set receive it.
        /// </summary>
        public Action<int> MoveNextAction
        {
            set
            {
                _moveNextAction = value;
            }
        }

        System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
        {
            return this.GetEnumerator();
        }

        /// <summary>
        /// Creates a new enumerator over the data and records it for later disposal verification.
        /// </summary>
        /// <returns>the new enumerator</returns>
        public IEnumerator<T> GetEnumerator()
        {
            UserActionEnumerator<T> en = new UserActionEnumerator<T>(_data, _moveNextAction);
            _allEnumerators.Add(en);

            return en;
        }

        /// <summary>
        /// Verifies that all the enumerators created so far are disposed.
        /// </summary>
        /// <returns>true if every enumerator was disposed (or none was created); otherwise false</returns>
        public bool AreEnumeratorsDisposed()
        {
            foreach (UserActionEnumerator<T> en in _allEnumerators)
            {
                if (!en.IsDisposed())
                {
                    return false;
                }
            }
            return true;
        }
    }

    /// <summary>
    /// Enumerator used by the UserActionEnumerable class
    /// </summary>
    /// <typeparam name="T">The type of the element</typeparam>
    public class UserActionEnumerator<T> : IEnumerator<T>
    {
        private readonly List<T> _data;
        private volatile int _positionCurrent = -1;

        // Written by whichever thread disposes the enumerator and read by the verifying thread.
        private volatile bool _disposed;
        private readonly object _lock = new object();

        // Element count captured at construction time.
        private readonly int _length = 0;

        // called in the enumerator's MoveNext
        private readonly Action<int> _moveNextAction = null;

        internal UserActionEnumerator(List<T> data, Action<int> moveNextAction)
        {
            _data = data;
            _disposed = false;
            _length = data.Count;
            _moveNextAction = moveNextAction;
        }

        /// <summary>
        /// Advances the position under the lock (to avoid racing with Current) and then,
        /// if an element is available, invokes the user action with the new position.
        /// </summary>
        /// <returns>true if the enumerator moved to an element; false if it is past the end</returns>
        public bool MoveNext()
        {
            int position;
            bool hasElement;

            lock (_lock)
            {
                // Capture the position inside the lock so the action receives the position
                // produced by this call even if another caller advances the enumerator afterwards.
                position = ++_positionCurrent;
                hasElement = position < _length;
            }

            // The user action runs outside the lock.
            if (hasElement && _moveNextAction != null)
            {
                _moveNextAction(position);
            }

            return hasElement;
        }

        /// <summary>
        /// Gets the element at the current position; read under the lock.
        /// </summary>
        public T Current
        {
            get
            {
                lock (_lock)
                {
                    return _data[_positionCurrent];
                }
            }
        }

        object System.Collections.IEnumerator.Current
        {
            get
            {
                return this.Current;
            }
        }

        /// <summary>
        /// Marks the enumerator as disposed and suppresses finalization.
        /// </summary>
        public void Dispose()
        {
            GC.SuppressFinalize(this);
            _disposed = true;
        }

        /// <summary>
        /// Not supported by this enumerator; always throws.
        /// </summary>
        public void Reset()
        {
            throw new System.NotImplementedException("Reset not implemented");
        }

        /// <summary>
        /// Reports whether Dispose has been called on this enumerator.
        /// </summary>
        /// <returns>true if the enumerator was disposed; otherwise false</returns>
        public bool IsDisposed()
        {
            return _disposed;
        }
    }
}