// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading; using Xunit; namespace System.Linq.Parallel.Tests { public static class ExchangeTests { private static readonly ParallelMergeOptions[] Options = new[] { ParallelMergeOptions.AutoBuffered, ParallelMergeOptions.Default, ParallelMergeOptions.FullyBuffered, ParallelMergeOptions.NotBuffered }; /// /// Get a set of ranges, running for each count in `counts`, with 1, 2, and 4 counts for partitions. /// /// The sizes of ranges to return. /// Entries for test data. /// The first element is the Labeled{ParallelQuery{int}} range, /// the second element is the count, and the third is the number of partitions or degrees of parallelism to use. public static IEnumerable PartitioningData(int[] counts) { foreach (object[] results in Sources.Ranges(counts.DefaultIfEmpty(Sources.OuterLoopCount), x => new[] { 1, 2, 4 })) { yield return results; } } // For each source, run with each buffering option. /// /// Get a set of ranges, and running for each count in `counts`, with each possible ParallelMergeOption /// /// The sizes of ranges to return. /// Entries for test data. /// The first element is the Labeled{ParallelQuery{int}} range, /// the second element is the count, and the third is the ParallelMergeOption to use. public static IEnumerable MergeData(int[] counts) { foreach (object[] results in Sources.Ranges(counts.DefaultIfEmpty(Sources.OuterLoopCount), x => Options)) { yield return results; } } /// ///For each count, return an Enumerable source that fails (throws an exception) on that count, with each buffering option. /// /// The sizes of ranges to return. /// Entries for test data. /// The first element is the Labeled{ParallelQuery{int}} range, /// the second element is the count, and the third is the ParallelMergeOption to use. public static IEnumerable ThrowOnCount_AllMergeOptions_MemberData(int[] counts) { foreach (int count in counts) { var labeled = Labeled.Label("ThrowOnEnumeration " + count, Enumerables.ThrowOnEnumeration(count).AsParallel().AsOrdered()); foreach (ParallelMergeOptions option in Options) { yield return new object[] { labeled, count, option }; } } } /// /// Return merge option combinations, for testing multiple calls to WithMergeOptions /// /// Entries for test data. public static IEnumerable AllMergeOptions_Multiple() { foreach (ParallelMergeOptions first in Options) { foreach (ParallelMergeOptions second in Options) { yield return new object[] { first, second }; } } } // The following tests attempt to test internal behavior, // but there doesn't appear to be a way to reliably (or automatically) observe it. // The basic tests are covered elsewhere, although without WithDegreeOfParallelism // or WithMergeOptions [Theory] [MemberData(nameof(PartitioningData), new[] { 0, 1, 2, 16, 1024 })] public static void Partitioning_Default(Labeled> labeled, int count, int partitions) { int seen = 0; foreach (int i in labeled.Item.WithDegreeOfParallelism(partitions).Select(i => i)) { Assert.Equal(seen++, i); } } [Theory] [OuterLoop] [MemberData(nameof(PartitioningData), new int[] { /* Sources.OuterLoopCount */ })] public static void Partitioning_Default_Longrunning(Labeled> labeled, int count, int partitions) { Partitioning_Default(labeled, count, partitions); } [Theory] [MemberData(nameof(PartitioningData), new[] { 0, 1, 2, 16, 1024 })] public static void Partitioning_Striped(Labeled> labeled, int count, int partitions) { int seen = 0; foreach (int i in labeled.Item.WithDegreeOfParallelism(partitions).Take(count).Select(i => i)) { Assert.Equal(seen++, i); } } [Theory] [OuterLoop] [MemberData(nameof(PartitioningData), new int[] { /* Sources.OuterLoopCount */ })] public static void Partitioning_Striped_Longrunning(Labeled> labeled, int count, int partitions) { Partitioning_Striped(labeled, count, partitions); } [Theory] [MemberData(nameof(MergeData), new[] { 0, 1, 2, 16, 1024 })] public static void Merge_Ordered(Labeled> labeled, int count, ParallelMergeOptions options) { int seen = 0; foreach (int i in labeled.Item.WithMergeOptions(options).Select(i => i)) { Assert.Equal(seen++, i); } } [Theory] [OuterLoop] [MemberData(nameof(MergeData), new int[] { /* Sources.OuterLoopCount */ })] public static void Merge_Ordered_Longrunning(Labeled> labeled, int count, ParallelMergeOptions options) { Merge_Ordered(labeled, count, options); } [Theory] [MemberData(nameof(ThrowOnCount_AllMergeOptions_MemberData), new[] { 4, 8 })] // FailingMergeData has enumerables that throw errors when attempting to perform the nth enumeration. // This test checks whether the query runs in a pipelined or buffered fashion. public static void Merge_Ordered_Pipelining(Labeled> labeled, int count, ParallelMergeOptions options) { Assert.Equal(0, labeled.Item.WithDegreeOfParallelism(count - 1).WithMergeOptions(options).First()); } [Theory] [MemberData(nameof(MergeData), new[] { 4, 8 })] // This test checks whether the query runs in a pipelined or buffered fashion. public static void Merge_Ordered_Pipelining_Select(Labeled> labeled, int count, ParallelMergeOptions options) { int countdown = count; Func down = i => { if (Interlocked.Decrement(ref countdown) == 0) throw new DeliberateTestException(); return i; }; Assert.Equal(0, labeled.Item.WithDegreeOfParallelism(count - 1).WithMergeOptions(options).Select(down).First()); } [Theory] [MemberData(nameof(UnorderedSources.Ranges), new[] { 2 }, MemberType = typeof(UnorderedSources))] public static void Merge_ArgumentException(Labeled> labeled, int count) { ParallelQuery query = labeled.Item; AssertExtensions.Throws(null, () => query.WithMergeOptions((ParallelMergeOptions)4)); } [Theory] [MemberData(nameof(AllMergeOptions_Multiple))] public static void WithMergeOptions_Multiple(ParallelMergeOptions first, ParallelMergeOptions second) { Assert.Throws(() => ParallelEnumerable.Range(0, 1).WithMergeOptions(first).WithMergeOptions(second)); } [Fact] public static void Merge_ArgumentNullException() { AssertExtensions.Throws("source", () => ((ParallelQuery)null).WithMergeOptions(ParallelMergeOptions.AutoBuffered)); } // The plinq chunk partitioner takes an IEnumerator over the source, and disposes the // enumerator when it is finished. If an exception occurs, the calling enumerator disposes // the source enumerator... but then other worker threads may generate ODEs. // This test verifies any such ODEs are not reflected in the output exception. [Theory] [MemberData(nameof(UnorderedSources.BinaryRanges), new[] { 16 }, new[] { 16 }, MemberType = typeof(UnorderedSources))] public static void PlinqChunkPartitioner_DontEnumerateAfterException(Labeled> left, int leftCount, Labeled> right, int rightCount) { ParallelQuery query = left.Item.WithExecutionMode(ParallelExecutionMode.ForceParallelism) .Select(x => { if (x == 4) throw new DeliberateTestException(); return x; }) .Zip(right.Item, (a, b) => a + b) .AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism); AggregateException ae = Assert.Throws(() => query.ToArray()); Assert.Single(ae.InnerExceptions); Assert.All(ae.InnerExceptions, e => Assert.IsType(e)); } // The stand-alone chunk partitioner takes an IEnumerator over the source, and // disposes the enumerator when it is finished. If an exception occurs, the calling // enumerator disposes the source enumerator... but then other worker threads may generate ODEs. // This test verifies any such ODEs are not reflected in the output exception. [Theory] [MemberData(nameof(UnorderedSources.BinaryRanges), new[] { 16 }, new[] { 16 }, MemberType = typeof(UnorderedSources))] public static void ManualChunkPartitioner_DontEnumerateAfterException( Labeled> left, int leftCount, Labeled> right, int rightCount) { ParallelQuery query = Partitioner.Create(left.Item.WithExecutionMode(ParallelExecutionMode.ForceParallelism) .Select(x => { if (x == 4) throw new DeliberateTestException(); return x; }) .Zip(right.Item, (a, b) => a + b)) .AsParallel(); AggregateException ae = Assert.Throws(() => query.ToArray()); Assert.Single(ae.InnerExceptions); Assert.All(ae.InnerExceptions, e => Assert.IsType(e)); } } }