1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
4 
5 using System.Collections.Concurrent;
6 using System.Collections.Generic;
7 using System.Threading;
8 using Xunit;
9 
10 namespace System.Linq.Parallel.Tests
11 {
12     public static class ExchangeTests
13     {
14         private static readonly ParallelMergeOptions[] Options = new[] {
15             ParallelMergeOptions.AutoBuffered,
16             ParallelMergeOptions.Default,
17             ParallelMergeOptions.FullyBuffered,
18             ParallelMergeOptions.NotBuffered
19         };
20 
21         /// <summary>
22         /// Get a set of ranges, running for each count in `counts`, with 1, 2, and 4 counts for partitions.
23         /// </summary>
24         /// <param name="counts">The sizes of ranges to return.</param>
25         /// <returns>Entries for test data.
26         /// The first element is the Labeled{ParallelQuery{int}} range,
27         /// the second element is the count, and the third is the number of partitions or degrees of parallelism to use.</returns>
PartitioningData(int[] counts)28         public static IEnumerable<object[]> PartitioningData(int[] counts)
29         {
30             foreach (object[] results in Sources.Ranges(counts.DefaultIfEmpty(Sources.OuterLoopCount), x => new[] { 1, 2, 4 }))
31             {
32                 yield return results;
33             }
34         }
35 
36         // For each source, run with each buffering option.
37         /// <summary>
38         /// Get a set of ranges, and running for each count in `counts`, with each possible ParallelMergeOption
39         /// </summary>
40         /// <param name="counts">The sizes of ranges to return.</param>
41         /// <returns>Entries for test data.
42         /// The first element is the Labeled{ParallelQuery{int}} range,
43         /// the second element is the count, and the third is the ParallelMergeOption to use.</returns>
MergeData(int[] counts)44         public static IEnumerable<object[]> MergeData(int[] counts)
45         {
46             foreach (object[] results in Sources.Ranges(counts.DefaultIfEmpty(Sources.OuterLoopCount), x => Options))
47             {
48                 yield return results;
49             }
50         }
51 
52         /// <summary>
53         ///For each count, return an Enumerable source that fails (throws an exception) on that count, with each buffering option.
54         /// </summary>
55         /// <param name="counts">The sizes of ranges to return.</param>
56         /// <returns>Entries for test data.
57         /// The first element is the Labeled{ParallelQuery{int}} range,
58         /// the second element is the count, and the third is the ParallelMergeOption to use.</returns>
ThrowOnCount_AllMergeOptions_MemberData(int[] counts)59         public static IEnumerable<object[]> ThrowOnCount_AllMergeOptions_MemberData(int[] counts)
60         {
61             foreach (int count in counts)
62             {
63                 var labeled = Labeled.Label("ThrowOnEnumeration " + count, Enumerables<int>.ThrowOnEnumeration(count).AsParallel().AsOrdered());
64                 foreach (ParallelMergeOptions option in Options)
65                 {
66                     yield return new object[] { labeled, count, option };
67                 }
68             }
69         }
70 
71         /// <summary>
72         /// Return merge option combinations, for testing multiple calls to WithMergeOptions
73         /// </summary>
74         /// <returns>Entries for test data.</returns>
AllMergeOptions_Multiple()75         public static IEnumerable<object[]> AllMergeOptions_Multiple()
76         {
77             foreach (ParallelMergeOptions first in Options)
78             {
79                 foreach (ParallelMergeOptions second in Options)
80                 {
81                     yield return new object[] { first, second };
82                 }
83             }
84         }
85 
86         // The following tests attempt to test internal behavior,
87         // but there doesn't appear to be a way to reliably (or automatically) observe it.
88         // The basic tests are covered elsewhere, although without WithDegreeOfParallelism
89         // or WithMergeOptions
90 
91         [Theory]
92         [MemberData(nameof(PartitioningData), new[] { 0, 1, 2, 16, 1024 })]
Partitioning_Default(Labeled<ParallelQuery<int>> labeled, int count, int partitions)93         public static void Partitioning_Default(Labeled<ParallelQuery<int>> labeled, int count, int partitions)
94         {
95             int seen = 0;
96             foreach (int i in labeled.Item.WithDegreeOfParallelism(partitions).Select(i => i))
97             {
98                 Assert.Equal(seen++, i);
99             }
100         }
101 
102         [Theory]
103         [OuterLoop]
104         [MemberData(nameof(PartitioningData), new int[] { /* Sources.OuterLoopCount */ })]
Partitioning_Default_Longrunning(Labeled<ParallelQuery<int>> labeled, int count, int partitions)105         public static void Partitioning_Default_Longrunning(Labeled<ParallelQuery<int>> labeled, int count, int partitions)
106         {
107             Partitioning_Default(labeled, count, partitions);
108         }
109 
110         [Theory]
111         [MemberData(nameof(PartitioningData), new[] { 0, 1, 2, 16, 1024 })]
Partitioning_Striped(Labeled<ParallelQuery<int>> labeled, int count, int partitions)112         public static void Partitioning_Striped(Labeled<ParallelQuery<int>> labeled, int count, int partitions)
113         {
114             int seen = 0;
115             foreach (int i in labeled.Item.WithDegreeOfParallelism(partitions).Take(count).Select(i => i))
116             {
117                 Assert.Equal(seen++, i);
118             }
119         }
120 
121         [Theory]
122         [OuterLoop]
123         [MemberData(nameof(PartitioningData), new int[] { /* Sources.OuterLoopCount */ })]
Partitioning_Striped_Longrunning(Labeled<ParallelQuery<int>> labeled, int count, int partitions)124         public static void Partitioning_Striped_Longrunning(Labeled<ParallelQuery<int>> labeled, int count, int partitions)
125         {
126             Partitioning_Striped(labeled, count, partitions);
127         }
128 
129         [Theory]
130         [MemberData(nameof(MergeData), new[] { 0, 1, 2, 16, 1024 })]
Merge_Ordered(Labeled<ParallelQuery<int>> labeled, int count, ParallelMergeOptions options)131         public static void Merge_Ordered(Labeled<ParallelQuery<int>> labeled, int count, ParallelMergeOptions options)
132         {
133             int seen = 0;
134             foreach (int i in labeled.Item.WithMergeOptions(options).Select(i => i))
135             {
136                 Assert.Equal(seen++, i);
137             }
138         }
139 
140         [Theory]
141         [OuterLoop]
142         [MemberData(nameof(MergeData), new int[] { /* Sources.OuterLoopCount */ })]
Merge_Ordered_Longrunning(Labeled<ParallelQuery<int>> labeled, int count, ParallelMergeOptions options)143         public static void Merge_Ordered_Longrunning(Labeled<ParallelQuery<int>> labeled, int count, ParallelMergeOptions options)
144         {
145             Merge_Ordered(labeled, count, options);
146         }
147 
148         [Theory]
149         [MemberData(nameof(ThrowOnCount_AllMergeOptions_MemberData), new[] { 4, 8 })]
150         // FailingMergeData has enumerables that throw errors when attempting to perform the nth enumeration.
151         // This test checks whether the query runs in a pipelined or buffered fashion.
Merge_Ordered_Pipelining(Labeled<ParallelQuery<int>> labeled, int count, ParallelMergeOptions options)152         public static void Merge_Ordered_Pipelining(Labeled<ParallelQuery<int>> labeled, int count, ParallelMergeOptions options)
153         {
154             Assert.Equal(0, labeled.Item.WithDegreeOfParallelism(count - 1).WithMergeOptions(options).First());
155         }
156 
157         [Theory]
158         [MemberData(nameof(MergeData), new[] { 4, 8 })]
159         // This test checks whether the query runs in a pipelined or buffered fashion.
Merge_Ordered_Pipelining_Select(Labeled<ParallelQuery<int>> labeled, int count, ParallelMergeOptions options)160         public static void Merge_Ordered_Pipelining_Select(Labeled<ParallelQuery<int>> labeled, int count, ParallelMergeOptions options)
161         {
162             int countdown = count;
163             Func<int, int> down = i =>
164             {
165                 if (Interlocked.Decrement(ref countdown) == 0) throw new DeliberateTestException();
166                 return i;
167             };
168             Assert.Equal(0, labeled.Item.WithDegreeOfParallelism(count - 1).WithMergeOptions(options).Select(down).First());
169         }
170 
171         [Theory]
172         [MemberData(nameof(UnorderedSources.Ranges), new[] { 2 }, MemberType = typeof(UnorderedSources))]
Merge_ArgumentException(Labeled<ParallelQuery<int>> labeled, int count)173         public static void Merge_ArgumentException(Labeled<ParallelQuery<int>> labeled, int count)
174         {
175             ParallelQuery<int> query = labeled.Item;
176 
177             AssertExtensions.Throws<ArgumentException>(null, () => query.WithMergeOptions((ParallelMergeOptions)4));
178         }
179 
180         [Theory]
181         [MemberData(nameof(AllMergeOptions_Multiple))]
WithMergeOptions_Multiple(ParallelMergeOptions first, ParallelMergeOptions second)182         public static void WithMergeOptions_Multiple(ParallelMergeOptions first, ParallelMergeOptions second)
183         {
184             Assert.Throws<InvalidOperationException>(() => ParallelEnumerable.Range(0, 1).WithMergeOptions(first).WithMergeOptions(second));
185         }
186 
187         [Fact]
Merge_ArgumentNullException()188         public static void Merge_ArgumentNullException()
189         {
190             AssertExtensions.Throws<ArgumentNullException>("source", () => ((ParallelQuery<int>)null).WithMergeOptions(ParallelMergeOptions.AutoBuffered));
191         }
192 
193         // The plinq chunk partitioner takes an IEnumerator over the source, and disposes the
194         // enumerator when it is finished. If an exception occurs, the calling enumerator disposes
195         // the source enumerator... but then other worker threads may generate ODEs.
196         // This test verifies any such ODEs are not reflected in the output exception.
197         [Theory]
198         [MemberData(nameof(UnorderedSources.BinaryRanges), new[] { 16 }, new[] { 16 }, MemberType = typeof(UnorderedSources))]
PlinqChunkPartitioner_DontEnumerateAfterException(Labeled<ParallelQuery<int>> left, int leftCount, Labeled<ParallelQuery<int>> right, int rightCount)199         public static void PlinqChunkPartitioner_DontEnumerateAfterException(Labeled<ParallelQuery<int>> left, int leftCount,
200             Labeled<ParallelQuery<int>> right, int rightCount)
201         {
202             ParallelQuery<int> query =
203                 left.Item.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
204                     .Select(x => { if (x == 4) throw new DeliberateTestException(); return x; })
205                     .Zip(right.Item, (a, b) => a + b)
206                 .AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism);
207 
208             AggregateException ae = Assert.Throws<AggregateException>(() => query.ToArray());
209             Assert.Single(ae.InnerExceptions);
210             Assert.All(ae.InnerExceptions, e => Assert.IsType<DeliberateTestException>(e));
211         }
212 
213         // The stand-alone chunk partitioner takes an IEnumerator over the source, and
214         // disposes the enumerator when it is finished.  If an exception occurs, the calling
215         // enumerator disposes the source enumerator... but then other worker threads may generate ODEs.
216         // This test verifies any such ODEs are not reflected in the output exception.
217         [Theory]
218         [MemberData(nameof(UnorderedSources.BinaryRanges), new[] { 16 }, new[] { 16 }, MemberType = typeof(UnorderedSources))]
ManualChunkPartitioner_DontEnumerateAfterException( Labeled<ParallelQuery<int>> left, int leftCount, Labeled<ParallelQuery<int>> right, int rightCount)219         public static void ManualChunkPartitioner_DontEnumerateAfterException(
220             Labeled<ParallelQuery<int>> left, int leftCount,
221             Labeled<ParallelQuery<int>> right, int rightCount)
222         {
223             ParallelQuery<int> query =
224                 Partitioner.Create(left.Item.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
225                     .Select(x => { if (x == 4) throw new DeliberateTestException(); return x; })
226                     .Zip(right.Item, (a, b) => a + b))
227                 .AsParallel();
228 
229             AggregateException ae = Assert.Throws<AggregateException>(() => query.ToArray());
230             Assert.Single(ae.InnerExceptions);
231             Assert.All(ae.InnerExceptions, e => Assert.IsType<DeliberateTestException>(e));
232         }
233     }
234 }
235