// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using Xunit;
namespace System.Linq.Parallel.Tests
{
public static class ExchangeTests
{
// The merge options exercised by the data sources and tests below.
// Data-producing members yield one test case per entry, in this order.
private static readonly ParallelMergeOptions[] Options =
{
    ParallelMergeOptions.AutoBuffered,
    ParallelMergeOptions.Default,
    ParallelMergeOptions.FullyBuffered,
    ParallelMergeOptions.NotBuffered
};
/// <summary>
/// Get a set of ranges, one per count in <paramref name="counts"/>, each paired with
/// 1, 2, and 4 as the number of partitions.
/// </summary>
/// <param name="counts">
/// The sizes of ranges to return. When empty, Sources.OuterLoopCount is used as the only size.
/// </param>
/// <returns>Entries for test data.
/// The first element is the Labeled{ParallelQuery{int}} range,
/// the second element is the count, and the third is the number of partitions or degrees of parallelism to use.</returns>
public static IEnumerable<object[]> PartitioningData(int[] counts)
{
    foreach (object[] results in Sources.Ranges(counts.DefaultIfEmpty(Sources.OuterLoopCount), x => new[] { 1, 2, 4 }))
    {
        yield return results;
    }
}
/// <summary>
/// Get a set of ranges, one per count in <paramref name="counts"/>, each paired with
/// every ParallelMergeOptions value in Options (that is, each source is run with each buffering option).
/// </summary>
/// <param name="counts">
/// The sizes of ranges to return. When empty, Sources.OuterLoopCount is used as the only size.
/// </param>
/// <returns>Entries for test data.
/// The first element is the Labeled{ParallelQuery{int}} range,
/// the second element is the count, and the third is the ParallelMergeOption to use.</returns>
public static IEnumerable<object[]> MergeData(int[] counts)
{
    foreach (object[] results in Sources.Ranges(counts.DefaultIfEmpty(Sources.OuterLoopCount), x => Options))
    {
        yield return results;
    }
}
/// <summary>
/// For each count, return an Enumerable source that fails (throws an exception) on that count, with each buffering option.
/// </summary>
/// <param name="counts">The counts at which the generated sources should fail.</param>
/// <returns>Entries for test data.
/// The first element is the Labeled{ParallelQuery{int}} range,
/// the second element is the count, and the third is the ParallelMergeOption to use.</returns>
public static IEnumerable<object[]> ThrowOnCount_AllMergeOptions_MemberData(int[] counts)
{
    foreach (int count in counts)
    {
        // One labeled, ordered query per count; the same instance is reused for every merge option.
        var labeled = Labeled.Label("ThrowOnEnumeration " + count, Enumerables<int>.ThrowOnEnumeration(count).AsParallel().AsOrdered());
        foreach (ParallelMergeOptions option in Options)
        {
            yield return new object[] { labeled, count, option };
        }
    }
}
/// <summary>
/// Return merge option combinations, for testing multiple calls to WithMergeOptions.
/// Every ordered pair drawn from Options is produced, including pairs of the same value.
/// </summary>
/// <returns>Entries for test data.
/// The first element is the option for the first call, the second is the option for the second call.</returns>
public static IEnumerable<object[]> AllMergeOptions_Multiple()
{
    foreach (ParallelMergeOptions first in Options)
    {
        foreach (ParallelMergeOptions second in Options)
        {
            yield return new object[] { first, second };
        }
    }
}
// The following tests attempt to test internal behavior,
// but there doesn't appear to be a way to reliably (or automatically) observe it.
// The basic tests are covered elsewhere, although without WithDegreeOfParallelism
// or WithMergeOptions
// Enumerates an ordered range with an explicit degree of parallelism and checks that
// the elements come back as 0, 1, 2, ... and that exactly `count` of them were produced.
[Theory]
[MemberData(nameof(PartitioningData), new[] { 0, 1, 2, 16, 1024 })]
public static void Partitioning_Default(Labeled<ParallelQuery<int>> labeled, int count, int partitions)
{
    int seen = 0;
    foreach (int i in labeled.Item.WithDegreeOfParallelism(partitions).Select(i => i))
    {
        Assert.Equal(seen++, i);
    }

    // Without this check a query that silently dropped trailing elements would still pass.
    Assert.Equal(count, seen);
}
// Outer-loop variant of Partitioning_Default. The empty counts array makes
// PartitioningData fall back to Sources.OuterLoopCount.
[Theory]
[OuterLoop]
[MemberData(nameof(PartitioningData), new int[] { /* Sources.OuterLoopCount */ })]
public static void Partitioning_Default_Longrunning(Labeled<ParallelQuery<int>> labeled, int count, int partitions)
{
    Partitioning_Default(labeled, count, partitions);
}
// Same as Partitioning_Default, but inserts Take(count) ahead of the Select.
// NOTE(review): the name suggests Take is what selects striped partitioning internally;
// that is not observable from this test - confirm against the PLINQ implementation.
[Theory]
[MemberData(nameof(PartitioningData), new[] { 0, 1, 2, 16, 1024 })]
public static void Partitioning_Striped(Labeled<ParallelQuery<int>> labeled, int count, int partitions)
{
    int seen = 0;
    foreach (int i in labeled.Item.WithDegreeOfParallelism(partitions).Take(count).Select(i => i))
    {
        Assert.Equal(seen++, i);
    }

    // Without this check a query that silently dropped trailing elements would still pass.
    Assert.Equal(count, seen);
}
// Outer-loop variant of Partitioning_Striped. The empty counts array makes
// PartitioningData fall back to Sources.OuterLoopCount.
[Theory]
[OuterLoop]
[MemberData(nameof(PartitioningData), new int[] { /* Sources.OuterLoopCount */ })]
public static void Partitioning_Striped_Longrunning(Labeled<ParallelQuery<int>> labeled, int count, int partitions)
{
    Partitioning_Striped(labeled, count, partitions);
}
// Enumerates an ordered range under each merge option and checks that the elements
// come back as 0, 1, 2, ... and that exactly `count` of them were produced.
[Theory]
[MemberData(nameof(MergeData), new[] { 0, 1, 2, 16, 1024 })]
public static void Merge_Ordered(Labeled<ParallelQuery<int>> labeled, int count, ParallelMergeOptions options)
{
    int seen = 0;
    foreach (int i in labeled.Item.WithMergeOptions(options).Select(i => i))
    {
        Assert.Equal(seen++, i);
    }

    // Without this check a query that silently dropped trailing elements would still pass.
    Assert.Equal(count, seen);
}
// Outer-loop variant of Merge_Ordered. The empty counts array makes
// MergeData fall back to Sources.OuterLoopCount.
[Theory]
[OuterLoop]
[MemberData(nameof(MergeData), new int[] { /* Sources.OuterLoopCount */ })]
public static void Merge_Ordered_Longrunning(Labeled<ParallelQuery<int>> labeled, int count, ParallelMergeOptions options)
{
    Merge_Ordered(labeled, count, options);
}
// ThrowOnCount_AllMergeOptions_MemberData supplies sources that throw when the
// count-th element is enumerated.
// This test checks whether the query runs in a pipelined or buffered fashion:
// First() must be able to return element 0 without the failure surfacing.
[Theory]
[MemberData(nameof(ThrowOnCount_AllMergeOptions_MemberData), new[] { 4, 8 })]
public static void Merge_Ordered_Pipelining(Labeled<ParallelQuery<int>> labeled, int count, ParallelMergeOptions options)
{
    Assert.Equal(0, labeled.Item.WithDegreeOfParallelism(count - 1).WithMergeOptions(options).First());
}
// This test checks whether the query runs in a pipelined or buffered fashion.
// The Select delegate throws on its count-th invocation (counted across all worker threads);
// First() must still be able to return element 0.
[Theory]
[MemberData(nameof(MergeData), new[] { 4, 8 })]
public static void Merge_Ordered_Pipelining_Select(Labeled<ParallelQuery<int>> labeled, int count, ParallelMergeOptions options)
{
    int countdown = count;
    Func<int, int> down = i =>
    {
        // Interlocked because the delegate may run concurrently on several workers.
        if (Interlocked.Decrement(ref countdown) == 0)
        {
            throw new DeliberateTestException();
        }

        return i;
    };

    Assert.Equal(0, labeled.Item.WithDegreeOfParallelism(count - 1).WithMergeOptions(options).Select(down).First());
}
// WithMergeOptions must reject a value that is not a defined ParallelMergeOptions member
// (4 is cast directly from an integer here).
// NOTE(review): the null first argument is presumably the expected parameter name - confirm
// against AssertExtensions.Throws.
[Theory]
[MemberData(nameof(UnorderedSources.Ranges), new[] { 2 }, MemberType = typeof(UnorderedSources))]
public static void Merge_ArgumentException(Labeled<ParallelQuery<int>> labeled, int count)
{
    ParallelQuery<int> query = labeled.Item;
    AssertExtensions.Throws<ArgumentException>(null, () => query.WithMergeOptions((ParallelMergeOptions)4));
}
// Calling WithMergeOptions twice on the same query is invalid, whatever the
// combination of options (including the same option twice).
[Theory]
[MemberData(nameof(AllMergeOptions_Multiple))]
public static void WithMergeOptions_Multiple(ParallelMergeOptions first, ParallelMergeOptions second)
{
    Assert.Throws<InvalidOperationException>(() => ParallelEnumerable.Range(0, 1).WithMergeOptions(first).WithMergeOptions(second));
}
// WithMergeOptions must reject a null source, reporting "source" as the offending parameter.
[Fact]
public static void Merge_ArgumentNullException()
{
    AssertExtensions.Throws<ArgumentNullException>("source", () => ((ParallelQuery<int>)null).WithMergeOptions(ParallelMergeOptions.AutoBuffered));
}
// The plinq chunk partitioner takes an IEnumerator over the source, and disposes the
// enumerator when it is finished. If an exception occurs, the calling enumerator disposes
// the source enumerator... but then other worker threads may generate
// ObjectDisposedExceptions (ODEs).
// This test verifies any such ODEs are not reflected in the output exception: the
// AggregateException must contain exactly one inner exception, the deliberate one.
[Theory]
[MemberData(nameof(UnorderedSources.BinaryRanges), new[] { 16 }, new[] { 16 }, MemberType = typeof(UnorderedSources))]
public static void PlinqChunkPartitioner_DontEnumerateAfterException(
    Labeled<ParallelQuery<int>> left, int leftCount,
    Labeled<ParallelQuery<int>> right, int rightCount)
{
    ParallelQuery<int> query =
        left.Item.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
            .Select(x => { if (x == 4) throw new DeliberateTestException(); return x; })
            .Zip(right.Item, (a, b) => a + b)
            .AsParallel().WithExecutionMode(ParallelExecutionMode.ForceParallelism);

    AggregateException ae = Assert.Throws<AggregateException>(() => query.ToArray());
    Assert.Single(ae.InnerExceptions);
    Assert.All(ae.InnerExceptions, e => Assert.IsType<DeliberateTestException>(e));
}
// The stand-alone chunk partitioner takes an IEnumerator over the source, and
// disposes the enumerator when it is finished. If an exception occurs, the calling
// enumerator disposes the source enumerator... but then other worker threads may generate
// ObjectDisposedExceptions (ODEs).
// This test verifies any such ODEs are not reflected in the output exception: the
// AggregateException must contain exactly one inner exception, the deliberate one.
[Theory]
[MemberData(nameof(UnorderedSources.BinaryRanges), new[] { 16 }, new[] { 16 }, MemberType = typeof(UnorderedSources))]
public static void ManualChunkPartitioner_DontEnumerateAfterException(
    Labeled<ParallelQuery<int>> left, int leftCount,
    Labeled<ParallelQuery<int>> right, int rightCount)
{
    // The failing query is wrapped in an explicit Partitioner before being handed back to PLINQ.
    ParallelQuery<int> query =
        Partitioner.Create(left.Item.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
            .Select(x => { if (x == 4) throw new DeliberateTestException(); return x; })
            .Zip(right.Item, (a, b) => a + b))
        .AsParallel();

    AggregateException ae = Assert.Throws<AggregateException>(() => query.ToArray());
    Assert.Single(ae.InnerExceptions);
    Assert.All(ae.InnerExceptions, e => Assert.IsType<DeliberateTestException>(e));
}
}
}