1 // Licensed to the .NET Foundation under one or more agreements. 2 // The .NET Foundation licenses this file to you under the MIT license. 3 // See the LICENSE file in the project root for more information. 4 5 using System.Collections.Concurrent; 6 using System.Collections.Generic; 7 using System.Threading; 8 using Xunit; 9 10 namespace System.Linq.Parallel.Tests 11 { 12 public static class ExchangeTests 13 { 14 private static readonly ParallelMergeOptions[] Options = new[] { 15 ParallelMergeOptions.AutoBuffered, 16 ParallelMergeOptions.Default, 17 ParallelMergeOptions.FullyBuffered, 18 ParallelMergeOptions.NotBuffered 19 }; 20 21 /// <summary> 22 /// Get a set of ranges, running for each count in `counts`, with 1, 2, and 4 counts for partitions. 23 /// </summary> 24 /// <param name="counts">The sizes of ranges to return.</param> 25 /// <returns>Entries for test data. 26 /// The first element is the Labeled{ParallelQuery{int}} range, 27 /// the second element is the count, and the third is the number of partitions or degrees of parallelism to use.</returns> PartitioningData(int[] counts)28 public static IEnumerable<object[]> PartitioningData(int[] counts) 29 { 30 foreach (object[] results in Sources.Ranges(counts.DefaultIfEmpty(Sources.OuterLoopCount), x => new[] { 1, 2, 4 })) 31 { 32 yield return results; 33 } 34 } 35 36 // For each source, run with each buffering option. 37 /// <summary> 38 /// Get a set of ranges, and running for each count in `counts`, with each possible ParallelMergeOption 39 /// </summary> 40 /// <param name="counts">The sizes of ranges to return.</param> 41 /// <returns>Entries for test data. 42 /// The first element is the Labeled{ParallelQuery{int}} range, 43 /// the second element is the count, and the third is the ParallelMergeOption to use.</returns> MergeData(int[] counts)44 public static IEnumerable<object[]> MergeData(int[] counts) 45 { 46 foreach (object[] results in Sources.Ranges(counts.DefaultIfEmpty(Sources.OuterLoopCount), x => Options)) 47 { 48 yield return results; 49 } 50 } 51 52 /// <summary> 53 ///For each count, return an Enumerable source that fails (throws an exception) on that count, with each buffering option. 54 /// </summary> 55 /// <param name="counts">The sizes of ranges to return.</param> 56 /// <returns>Entries for test data. 57 /// The first element is the Labeled{ParallelQuery{int}} range, 58 /// the second element is the count, and the third is the ParallelMergeOption to use.</returns> ThrowOnCount_AllMergeOptions_MemberData(int[] counts)59 public static IEnumerable<object[]> ThrowOnCount_AllMergeOptions_MemberData(int[] counts) 60 { 61 foreach (int count in counts) 62 { 63 var labeled = Labeled.Label("ThrowOnEnumeration " + count, Enumerables<int>.ThrowOnEnumeration(count).AsParallel().AsOrdered()); 64 foreach (ParallelMergeOptions option in Options) 65 { 66 yield return new object[] { labeled, count, option }; 67 } 68 } 69 } 70 71 /// <summary> 72 /// Return merge option combinations, for testing multiple calls to WithMergeOptions 73 /// </summary> 74 /// <returns>Entries for test data.</returns> AllMergeOptions_Multiple()75 public static IEnumerable<object[]> AllMergeOptions_Multiple() 76 { 77 foreach (ParallelMergeOptions first in Options) 78 { 79 foreach (ParallelMergeOptions second in Options) 80 { 81 yield return new object[] { first, second }; 82 } 83 } 84 } 85 86 // The following tests attempt to test internal behavior, 87 // but there doesn't appear to be a way to reliably (or automatically) observe it. 88 // The basic tests are covered elsewhere, although without WithDegreeOfParallelism 89 // or WithMergeOptions 90 91 [Theory] 92 [MemberData(nameof(PartitioningData), new[] { 0, 1, 2, 16, 1024 })] Partitioning_Default(Labeled<ParallelQuery<int>> labeled, int count, int partitions)93 public static void Partitioning_Default(Labeled<ParallelQuery<int>> labeled, int count, int partitions) 94 { 95 int seen = 0; 96 foreach (int i in labeled.Item.WithDegreeOfParallelism(partitions).Select(i => i)) 97 { 98 Assert.Equal(seen++, i); 99 } 100 } 101 102 [Theory] 103 [OuterLoop] 104 [MemberData(nameof(PartitioningData), new int[] { /* Sources.OuterLoopCount */ })] Partitioning_Default_Longrunning(Labeled<ParallelQuery<int>> labeled, int count, int partitions)105 public static void Partitioning_Default_Longrunning(Labeled<ParallelQuery<int>> labeled, int count, int partitions) 106 { 107 Partitioning_Default(labeled, count, partitions); 108 } 109 110 [Theory] 111 [MemberData(nameof(PartitioningData), new[] { 0, 1, 2, 16, 1024 })] Partitioning_Striped(Labeled<ParallelQuery<int>> labeled, int count, int partitions)112 public static void Partitioning_Striped(Labeled<ParallelQuery<int>> labeled, int count, int partitions) 113 { 114 int seen = 0; 115 foreach (int i in labeled.Item.WithDegreeOfParallelism(partitions).Take(count).Select(i => i)) 116 { 117 Assert.Equal(seen++, i); 118 } 119 } 120 121 [Theory] 122 [OuterLoop] 123 [MemberData(nameof(PartitioningData), new int[] { /* Sources.OuterLoopCount */ })] Partitioning_Striped_Longrunning(Labeled<ParallelQuery<int>> labeled, int count, int partitions)124 public static void Partitioning_Striped_Longrunning(Labeled<ParallelQuery<int>> labeled, int count, int partitions) 125 { 126 Partitioning_Striped(labeled, count, partitions); 127 } 128 129 [Theory] 130 [MemberData(nameof(MergeData), new[] { 0, 1, 2, 16, 1024 })] Merge_Ordered(Labeled<ParallelQuery<int>> labeled, int count, ParallelMergeOptions options)131 public static void Merge_Ordered(Labeled<ParallelQuery<int>> labeled, int count, ParallelMergeOptions options) 132 { 133 int seen = 0; 134 foreach (int i in labeled.Item.WithMergeOptions(options).Select(i => i)) 135 { 136 Assert.Equal(seen++, i); 137 } 138 } 139 140 [Theory] 141 [OuterLoop] 142 [MemberData(nameof(MergeData), new int[] { /* Sources.OuterLoopCount */ })] Merge_Ordered_Longrunning(Labeled<ParallelQuery<int>> labeled, int count, ParallelMergeOptions options)143 public static void Merge_Ordered_Longrunning(Labeled<ParallelQuery<int>> labeled, int count, ParallelMergeOptions options) 144 { 145 Merge_Ordered(labeled, count, options); 146 } 147 148 [Theory] 149 [MemberData(nameof(ThrowOnCount_AllMergeOptions_MemberData), new[] { 4, 8 })] 150 // FailingMergeData has enumerables that throw errors when attempting to perform the nth enumeration. 151 // This test checks whether the query runs in a pipelined or buffered fashion. Merge_Ordered_Pipelining(Labeled<ParallelQuery<int>> labeled, int count, ParallelMergeOptions options)152 public static void Merge_Ordered_Pipelining(Labeled<ParallelQuery<int>> labeled, int count, ParallelMergeOptions options) 153 { 154 Assert.Equal(0, labeled.Item.WithDegreeOfParallelism(count - 1).WithMergeOptions(options).First()); 155 } 156 157 [Theory] 158 [MemberData(nameof(MergeData), new[] { 4, 8 })] 159 // This test checks whether the query runs in a pipelined or buffered fashion. Merge_Ordered_Pipelining_Select(Labeled<ParallelQuery<int>> labeled, int count, ParallelMergeOptions options)160 public static void Merge_Ordered_Pipelining_Select(Labeled<ParallelQuery<int>> labeled, int count, ParallelMergeOptions options) 161 { 162 int countdown = count; 163 Func<int, int> down = i => 164 { 165 if (Interlocked.Decrement(ref countdown) == 0) throw new DeliberateTestException(); 166 return i; 167 }; 168 Assert.Equal(0, labeled.Item.WithDegreeOfParallelism(count - 1).WithMergeOptions(options).Select(down).First()); 169 } 170 171 [Theory] 172 [MemberData(nameof(UnorderedSources.Ranges), new[] { 2 }, MemberType = typeof(UnorderedSources))] Merge_ArgumentException(Labeled<ParallelQuery<int>> labeled, int count)173 public static void Merge_ArgumentException(Labeled<ParallelQuery<int>> labeled, int count) 174 { 175 ParallelQuery<int> query = labeled.Item; 176 177 AssertExtensions.Throws<ArgumentException>(null, () => query.WithMergeOptions((ParallelMergeOptions)4)); 178 } 179 180 [Theory] 181 [MemberData(nameof(AllMergeOptions_Multiple))] WithMergeOptions_Multiple(ParallelMergeOptions first, ParallelMergeOptions second)182 public static void WithMergeOptions_Multiple(ParallelMergeOptions first, ParallelMergeOptions second) 183 { 184 Assert.Throws<InvalidOperationException>(() => ParallelEnumerable.Range(0, 1).WithMergeOptions(first).WithMergeOptions(second)); 185 } 186 187 [Fact] Merge_ArgumentNullException()188 public static void Merge_ArgumentNullException() 189 { 190 AssertExtensions.Throws<ArgumentNullException>("source", () => ((ParallelQuery<int>)null).WithMergeOptions(ParallelMergeOptions.AutoBuffered)); 191 } 192 193 // The plinq chunk partitioner takes an IEnumerator over the source, and disposes the 194 // enumerator when it is finished. If an exception occurs, the calling enumerator disposes 195 // the source enumerator... but then other worker threads may generate ODEs. 196 // This test verifies any such ODEs are not reflected in the output exception. 197 [Theory] 198 [MemberData(nameof(UnorderedSources.BinaryRanges), new[] { 16 }, new[] { 16 }, MemberType = typeof(UnorderedSources))] PlinqChunkPartitioner_DontEnumerateAfterException(Labeled<ParallelQuery<int>> left, int leftCount, Labeled<ParallelQuery<int>> right, int rightCount)199 public static void PlinqChunkPartitioner_DontEnumerateAfterException(Labeled<ParallelQuery<int>> left, int leftCount, 200 Labeled<ParallelQuery<int>> right, int rightCount) 201 { 202 ParallelQuery<int> query = 203 left.Item.WithExecutionMode(ParallelExecutionMode.ForceParallelism) 204 .Select(x => { if (x == 4) throw new DeliberateTestException(); return x; }) 205 .Zip(right.Item, (a, b) => a + b) 206 .AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism); 207 208 AggregateException ae = Assert.Throws<AggregateException>(() => query.ToArray()); 209 Assert.Single(ae.InnerExceptions); 210 Assert.All(ae.InnerExceptions, e => Assert.IsType<DeliberateTestException>(e)); 211 } 212 213 // The stand-alone chunk partitioner takes an IEnumerator over the source, and 214 // disposes the enumerator when it is finished. If an exception occurs, the calling 215 // enumerator disposes the source enumerator... but then other worker threads may generate ODEs. 216 // This test verifies any such ODEs are not reflected in the output exception. 217 [Theory] 218 [MemberData(nameof(UnorderedSources.BinaryRanges), new[] { 16 }, new[] { 16 }, MemberType = typeof(UnorderedSources))] ManualChunkPartitioner_DontEnumerateAfterException( Labeled<ParallelQuery<int>> left, int leftCount, Labeled<ParallelQuery<int>> right, int rightCount)219 public static void ManualChunkPartitioner_DontEnumerateAfterException( 220 Labeled<ParallelQuery<int>> left, int leftCount, 221 Labeled<ParallelQuery<int>> right, int rightCount) 222 { 223 ParallelQuery<int> query = 224 Partitioner.Create(left.Item.WithExecutionMode(ParallelExecutionMode.ForceParallelism) 225 .Select(x => { if (x == 4) throw new DeliberateTestException(); return x; }) 226 .Zip(right.Item, (a, b) => a + b)) 227 .AsParallel(); 228 229 AggregateException ae = Assert.Throws<AggregateException>(() => query.ToArray()); 230 Assert.Single(ae.InnerExceptions); 231 Assert.All(ae.InnerExceptions, e => Assert.IsType<DeliberateTestException>(e)); 232 } 233 } 234 } 235