1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
4 
5 using System.Collections.Generic;
6 using System.Threading;
7 using Xunit;
8 
9 namespace System.Linq.Parallel.Tests
10 {
11     public static class JoinTests
12     {
13         private const int KeyFactor = 8;
14 
JoinUnorderedData(int[] counts)15         public static IEnumerable<object[]> JoinUnorderedData(int[] counts)
16         {
17             foreach (int leftCount in counts)
18             {
19                 foreach (int rightCount in counts)
20                 {
21                     yield return new object[] { leftCount, rightCount };
22                 }
23             }
24         }
25 
JoinData(int[] counts)26         public static IEnumerable<object[]> JoinData(int[] counts)
27         {
28             // When dealing with joins, if there aren't multiple matches the ordering of the second operand is immaterial.
29             foreach (object[] parms in Sources.Ranges(counts, i => counts))
30             {
31                 yield return parms;
32             }
33         }
34 
JoinMultipleData(int[] counts)35         public static IEnumerable<object[]> JoinMultipleData(int[] counts)
36         {
37             foreach (object[] parms in UnorderedSources.BinaryRanges(counts, counts))
38             {
39                 yield return new object[] { ((Labeled<ParallelQuery<int>>)parms[0]).Order(), parms[1], ((Labeled<ParallelQuery<int>>)parms[2]).Order(), parms[3] };
40             }
41         }
42 
43         //
44         // Join
45         //
46         [Theory]
47         [MemberData(nameof(JoinUnorderedData), new[] { 0, 1, 2, KeyFactor * 2 })]
Join_Unordered(int leftCount, int rightCount)48         public static void Join_Unordered(int leftCount, int rightCount)
49         {
50             ParallelQuery<int> leftQuery = UnorderedSources.Default(leftCount);
51             ParallelQuery<int> rightQuery = UnorderedSources.Default(rightCount);
52             IntegerRangeSet seen = new IntegerRangeSet(0, Math.Min(leftCount, (rightCount + (KeyFactor - 1)) / KeyFactor));
53             foreach (var p in leftQuery.Join(rightQuery, x => x * KeyFactor, y => y, (x, y) => KeyValuePair.Create(x, y)))
54             {
55                 Assert.Equal(p.Key * KeyFactor, p.Value);
56                 seen.Add(p.Key);
57             }
58             seen.AssertComplete();
59         }
60 
61         [Fact]
62         [OuterLoop]
Join_Unordered_Longrunning()63         public static void Join_Unordered_Longrunning()
64         {
65             Join_Unordered(Sources.OuterLoopCount / 64, Sources.OuterLoopCount / 64);
66         }
67 
68         [Theory]
69         [MemberData(nameof(JoinData), new[] { 0, 1, 2, KeyFactor * 2 })]
Join(Labeled<ParallelQuery<int>> left, int leftCount, int rightCount)70         public static void Join(Labeled<ParallelQuery<int>> left, int leftCount, int rightCount)
71         {
72             ParallelQuery<int> leftQuery = left.Item;
73             ParallelQuery<int> rightQuery = UnorderedSources.Default(rightCount);
74             int seen = 0;
75             foreach (var p in leftQuery.Join(rightQuery, x => x * KeyFactor, y => y, (x, y) => KeyValuePair.Create(x, y)))
76             {
77                 Assert.Equal(seen++, p.Key);
78                 Assert.Equal(p.Key * KeyFactor, p.Value);
79             }
80             Assert.Equal(Math.Min(leftCount, (rightCount + (KeyFactor - 1)) / KeyFactor), seen);
81         }
82 
83         [Theory]
84         [OuterLoop]
85         [MemberData(nameof(JoinData), new[] { 512, 1024 })]
Join_Longrunning(Labeled<ParallelQuery<int>> left, int leftCount, int rightCount)86         public static void Join_Longrunning(Labeled<ParallelQuery<int>> left, int leftCount, int rightCount)
87         {
88             Join(left, leftCount, rightCount);
89         }
90 
91         [Theory]
92         [MemberData(nameof(JoinUnorderedData), new[] { 0, 1, 2, KeyFactor * 2 })]
Join_Unordered_NotPipelined(int leftCount, int rightCount)93         public static void Join_Unordered_NotPipelined(int leftCount, int rightCount)
94         {
95             ParallelQuery<int> leftQuery = UnorderedSources.Default(leftCount);
96             ParallelQuery<int> rightQuery = UnorderedSources.Default(rightCount);
97             IntegerRangeSet seen = new IntegerRangeSet(0, Math.Min(leftCount, (rightCount + (KeyFactor - 1)) / KeyFactor));
98             Assert.All(leftQuery.Join(rightQuery, x => x * KeyFactor, y => y, (x, y) => KeyValuePair.Create(x, y)).ToList(),
99                 p => { Assert.Equal(p.Key * KeyFactor, p.Value); seen.Add(p.Key); });
100             seen.AssertComplete();
101         }
102 
103         [Fact]
104         [OuterLoop]
Join_Unordered_NotPipelined_Longrunning()105         public static void Join_Unordered_NotPipelined_Longrunning()
106         {
107             Join_Unordered_NotPipelined(Sources.OuterLoopCount / 64, Sources.OuterLoopCount / 64);
108         }
109 
110         [Theory]
111         [MemberData(nameof(JoinData), new[] { 0, 1, 2, KeyFactor * 2 })]
Join_NotPipelined(Labeled<ParallelQuery<int>> left, int leftCount, int rightCount)112         public static void Join_NotPipelined(Labeled<ParallelQuery<int>> left, int leftCount, int rightCount)
113         {
114             ParallelQuery<int> leftQuery = left.Item;
115             ParallelQuery<int> rightQuery = UnorderedSources.Default(rightCount);
116             int seen = 0;
117             Assert.All(leftQuery.Join(rightQuery, x => x * KeyFactor, y => y, (x, y) => KeyValuePair.Create(x, y)).ToList(),
118                 p => { Assert.Equal(seen++, p.Key); Assert.Equal(p.Key * KeyFactor, p.Value); });
119             Assert.Equal(Math.Min(leftCount, (rightCount + (KeyFactor - 1)) / KeyFactor), seen);
120         }
121 
122         [Theory]
123         [OuterLoop]
124         [MemberData(nameof(JoinData), new[] { 512, 1024 })]
Join_NotPipelined_Longrunning(Labeled<ParallelQuery<int>> left, int leftCount, int rightCount)125         public static void Join_NotPipelined_Longrunning(Labeled<ParallelQuery<int>> left, int leftCount, int rightCount)
126         {
127             Join_NotPipelined(left, leftCount, rightCount);
128         }
129 
130         [Theory]
131         [MemberData(nameof(JoinUnorderedData), new[] { 0, 1, 2, KeyFactor * 2 })]
Join_Unordered_Multiple(int leftCount, int rightCount)132         public static void Join_Unordered_Multiple(int leftCount, int rightCount)
133         {
134             ParallelQuery<int> leftQuery = UnorderedSources.Default(leftCount);
135             ParallelQuery<int> rightQuery = UnorderedSources.Default(rightCount);
136             IntegerRangeSet seenOuter = new IntegerRangeSet(0, Math.Min(leftCount, (rightCount + (KeyFactor - 1)) / KeyFactor));
137             IntegerRangeSet seenInner = new IntegerRangeSet(0, Math.Min(leftCount * KeyFactor, rightCount));
138             Assert.All(leftQuery.Join(rightQuery, x => x, y => y / KeyFactor, (x, y) => KeyValuePair.Create(x, y)),
139                 p =>
140                 {
141                     Assert.Equal(p.Key, p.Value / KeyFactor);
142                     seenInner.Add(p.Value);
143                     if (p.Value % KeyFactor == 0) seenOuter.Add(p.Key);
144                 });
145             seenOuter.AssertComplete();
146             seenInner.AssertComplete();
147         }
148 
149         [Fact]
150         [OuterLoop]
Join_Unordered_Multiple_Longrunning()151         public static void Join_Unordered_Multiple_Longrunning()
152         {
153             Join_Unordered_Multiple(Sources.OuterLoopCount / 64, Sources.OuterLoopCount / 64);
154         }
155 
156         [Theory]
157         [ActiveIssue(1155)]
158         [MemberData(nameof(JoinMultipleData), new[] { 0, 1, 2, KeyFactor * 2 })]
Join_Multiple(Labeled<ParallelQuery<int>> left, int leftCount, Labeled<ParallelQuery<int>> right, int rightCount)159         public static void Join_Multiple(Labeled<ParallelQuery<int>> left, int leftCount, Labeled<ParallelQuery<int>> right, int rightCount)
160         {
161             ParallelQuery<int> leftQuery = left.Item;
162             ParallelQuery<int> rightQuery = right.Item;
163             int seen = 0;
164             Assert.All(leftQuery.Join(rightQuery, x => x, y => y / KeyFactor, (x, y) => KeyValuePair.Create(x, y)),
165                 p =>
166                 {
167                     Assert.Equal(p.Key, p.Value / KeyFactor);
168                     Assert.Equal(seen++, p.Value);
169                 });
170             Assert.Equal(Math.Min(leftCount * KeyFactor, rightCount), seen);
171         }
172 
173         [Theory]
174         [ActiveIssue(1155)]
175         [OuterLoop]
176         [MemberData(nameof(JoinMultipleData), new[] { 512, 1024 })]
Join_Multiple_Longrunning(Labeled<ParallelQuery<int>> left, int leftCount, Labeled<ParallelQuery<int>> right, int rightCount)177         public static void Join_Multiple_Longrunning(Labeled<ParallelQuery<int>> left, int leftCount, Labeled<ParallelQuery<int>> right, int rightCount)
178         {
179             Join_Multiple(left, leftCount, right, rightCount);
180         }
181 
182         [Theory]
183         [MemberData(nameof(JoinUnorderedData), new[] { 0, 1, 2, KeyFactor * 2 })]
Join_Unordered_CustomComparator(int leftCount, int rightCount)184         public static void Join_Unordered_CustomComparator(int leftCount, int rightCount)
185         {
186             ParallelQuery<int> leftQuery = UnorderedSources.Default(leftCount);
187             ParallelQuery<int> rightQuery = UnorderedSources.Default(rightCount);
188             IntegerRangeCounter seenOuter = new IntegerRangeCounter(0, leftCount);
189             IntegerRangeCounter seenInner = new IntegerRangeCounter(0, rightCount);
190             Assert.All(leftQuery.Join(rightQuery, x => x, y => y,
191                 (x, y) => KeyValuePair.Create(x, y), new ModularCongruenceComparer(KeyFactor)),
192                 p =>
193                 {
194                     Assert.Equal(p.Key % KeyFactor, p.Value % KeyFactor);
195                     seenOuter.Add(p.Key);
196                     seenInner.Add(p.Value);
197                 });
198             if (leftCount == 0 || rightCount == 0)
199             {
200                 seenOuter.AssertEncountered(0);
201                 seenInner.AssertEncountered(0);
202             }
203             else
204             {
205                 Func<int, int, int> cartesian = (key, other) => (other + (KeyFactor - 1) - key % KeyFactor) / KeyFactor;
206                 Assert.All(seenOuter, kv => Assert.Equal(cartesian(kv.Key, rightCount), kv.Value));
207                 Assert.All(seenInner, kv => Assert.Equal(cartesian(kv.Key, leftCount), kv.Value));
208             }
209         }
210 
211         [Fact]
212         [OuterLoop]
Join_Unordered_CustomComparator_Longrunning()213         public static void Join_Unordered_CustomComparator_Longrunning()
214         {
215             Join_Unordered_CustomComparator(Sources.OuterLoopCount / 64, Sources.OuterLoopCount / 64);
216         }
217 
218         [Theory]
219         [ActiveIssue(1155)]
220         [MemberData(nameof(JoinMultipleData), new[] { 0, 1, 2, KeyFactor * 2 })]
Join_CustomComparator(Labeled<ParallelQuery<int>> left, int leftCount, Labeled<ParallelQuery<int>> right, int rightCount)221         public static void Join_CustomComparator(Labeled<ParallelQuery<int>> left, int leftCount, Labeled<ParallelQuery<int>> right, int rightCount)
222         {
223             ParallelQuery<int> leftQuery = left.Item;
224             ParallelQuery<int> rightQuery = right.Item;
225             int seenOuter = 0;
226             int previousOuter = -1;
227             int seenInner = Math.Max(previousOuter % KeyFactor, rightCount - KeyFactor + previousOuter % KeyFactor);
228 
229             Assert.All(leftQuery.Join(rightQuery, x => x, y => y,
230                 (x, y) => KeyValuePair.Create(x, y), new ModularCongruenceComparer(KeyFactor)),
231                 p =>
232                 {
233                     if (p.Key != previousOuter)
234                     {
235                         Assert.Equal(seenOuter, p.Key);
236                         Assert.Equal(p.Key % 8, p.Value);
237                         // If there aren't sufficient elements in the RHS (< 8), the LHS skips an entry at the end of the mod cycle.
238                         seenOuter = Math.Min(leftCount, seenOuter + (p.Key % KeyFactor + 1 == rightCount ? KeyFactor - p.Key % KeyFactor : 1));
239                         Assert.Equal(Math.Max(previousOuter % KeyFactor, rightCount - KeyFactor + previousOuter % KeyFactor), seenInner);
240                         previousOuter = p.Key;
241                         seenInner = (p.Key % KeyFactor) - KeyFactor;
242                     }
243                     Assert.Equal(p.Key % KeyFactor, p.Value % KeyFactor);
244                     Assert.Equal(seenInner += KeyFactor, p.Value);
245                 });
246             Assert.Equal(rightCount == 0 ? 0 : leftCount, seenOuter);
247             Assert.Equal(Math.Max(previousOuter % KeyFactor, rightCount - KeyFactor + previousOuter % KeyFactor), seenInner);
248         }
249 
250         [Theory]
251         [ActiveIssue(1155)]
252         [OuterLoop]
253         [MemberData(nameof(JoinMultipleData), new[] { 512, 1024 })]
Join_CustomComparator_Longrunning(Labeled<ParallelQuery<int>> left, int leftCount, Labeled<ParallelQuery<int>> right, int rightCount)254         public static void Join_CustomComparator_Longrunning(Labeled<ParallelQuery<int>> left, int leftCount, Labeled<ParallelQuery<int>> right, int rightCount)
255         {
256             Join_CustomComparator(left, leftCount, right, rightCount);
257         }
258 
259         [Theory]
260         [MemberData(nameof(JoinData), new[] { 0, 1, 2, KeyFactor * 2 })]
Join_InnerJoin_Ordered(Labeled<ParallelQuery<int>> left, int leftCount, int rightCount)261         public static void Join_InnerJoin_Ordered(Labeled<ParallelQuery<int>> left, int leftCount, int rightCount)
262         {
263             ParallelQuery<int> leftQuery = left.Item;
264             ParallelQuery<int> rightQuery = UnorderedSources.Default(rightCount);
265             ParallelQuery<int> middleQuery = ParallelEnumerable.Range(0, leftCount).AsOrdered();
266 
267             int seen = 0;
268             Assert.All(leftQuery.Join(middleQuery.Join(rightQuery, x => x * KeyFactor / 2, y => y, (x, y) => KeyValuePair.Create(x, y)),
269                 z => z * 2, p => p.Key, (x, p) => KeyValuePair.Create(x, p)),
270                 pOuter =>
271                 {
272                     KeyValuePair<int, int> pInner = pOuter.Value;
273                     Assert.Equal(seen++, pOuter.Key);
274                     Assert.Equal(pOuter.Key * 2, pInner.Key);
275                     Assert.Equal(pOuter.Key * KeyFactor, pInner.Value);
276                 });
277             Assert.Equal(Math.Min((leftCount + 1) / 2, (rightCount + (KeyFactor - 1)) / KeyFactor), seen);
278         }
279 
280         [Theory]
281         [OuterLoop]
282         [MemberData(nameof(JoinData), new[] { 512, 1024 })]
Join_InnerJoin_Ordered_Longrunning(Labeled<ParallelQuery<int>> left, int leftCount, int rightCount)283         public static void Join_InnerJoin_Ordered_Longrunning(Labeled<ParallelQuery<int>> left, int leftCount, int rightCount)
284         {
285             Join_InnerJoin_Ordered(left, leftCount, rightCount);
286         }
287 
288         [Fact]
Join_NotSupportedException()289         public static void Join_NotSupportedException()
290         {
291 #pragma warning disable 618
292             Assert.Throws<NotSupportedException>(() => ParallelEnumerable.Range(0, 1).Join(Enumerable.Range(0, 1), i => i, i => i, (i, j) => i));
293             Assert.Throws<NotSupportedException>(() => ParallelEnumerable.Range(0, 1).Join(Enumerable.Range(0, 1), i => i, i => i, (i, j) => i, null));
294 #pragma warning restore 618
295         }
296 
297         [Fact]
298         // Should not get the same setting from both operands.
Join_NoDuplicateSettings()299         public static void Join_NoDuplicateSettings()
300         {
301             CancellationToken t = new CancellationTokenSource().Token;
302             Assert.Throws<InvalidOperationException>(() => ParallelEnumerable.Range(0, 1).WithCancellation(t).Join(ParallelEnumerable.Range(0, 1).WithCancellation(t), x => x, y => y, (l, r) => l));
303             Assert.Throws<InvalidOperationException>(() => ParallelEnumerable.Range(0, 1).WithDegreeOfParallelism(1).Join(ParallelEnumerable.Range(0, 1).WithDegreeOfParallelism(1), x => x, y => y, (l, r) => l));
304             Assert.Throws<InvalidOperationException>(() => ParallelEnumerable.Range(0, 1).WithExecutionMode(ParallelExecutionMode.Default).Join(ParallelEnumerable.Range(0, 1).WithExecutionMode(ParallelExecutionMode.Default), x => x, y => y, (l, r) => l));
305             Assert.Throws<InvalidOperationException>(() => ParallelEnumerable.Range(0, 1).WithMergeOptions(ParallelMergeOptions.Default).Join(ParallelEnumerable.Range(0, 1).WithMergeOptions(ParallelMergeOptions.Default), x => x, y => y, (l, r) => l));
306         }
307 
308         [Fact]
Join_ArgumentNullException()309         public static void Join_ArgumentNullException()
310         {
311             AssertExtensions.Throws<ArgumentNullException>("outer", () => ((ParallelQuery<int>)null).Join(ParallelEnumerable.Range(0, 1), i => i, i => i, (i, j) => i));
312             AssertExtensions.Throws<ArgumentNullException>("inner", () => ParallelEnumerable.Range(0, 1).Join((ParallelQuery<int>)null, i => i, i => i, (i, j) => i));
313             AssertExtensions.Throws<ArgumentNullException>("outerKeySelector", () => ParallelEnumerable.Range(0, 1).Join(ParallelEnumerable.Range(0, 1), (Func<int, int>)null, i => i, (i, j) => i));
314             AssertExtensions.Throws<ArgumentNullException>("innerKeySelector", () => ParallelEnumerable.Range(0, 1).Join(ParallelEnumerable.Range(0, 1), i => i, (Func<int, int>)null, (i, j) => i));
315             AssertExtensions.Throws<ArgumentNullException>("resultSelector", () => ParallelEnumerable.Range(0, 1).Join(ParallelEnumerable.Range(0, 1), i => i, i => i, (Func<int, int, int>)null));
316             AssertExtensions.Throws<ArgumentNullException>("outer", () => ((ParallelQuery<int>)null).Join(ParallelEnumerable.Range(0, 1), i => i, i => i, (i, j) => i, EqualityComparer<int>.Default));
317             AssertExtensions.Throws<ArgumentNullException>("inner", () => ParallelEnumerable.Range(0, 1).Join((ParallelQuery<int>)null, i => i, i => i, (i, j) => i, EqualityComparer<int>.Default));
318             AssertExtensions.Throws<ArgumentNullException>("outerKeySelector", () => ParallelEnumerable.Range(0, 1).Join(ParallelEnumerable.Range(0, 1), (Func<int, int>)null, i => i, (i, j) => i, EqualityComparer<int>.Default));
319             AssertExtensions.Throws<ArgumentNullException>("innerKeySelector", () => ParallelEnumerable.Range(0, 1).Join(ParallelEnumerable.Range(0, 1), i => i, (Func<int, int>)null, (i, j) => i, EqualityComparer<int>.Default));
320             AssertExtensions.Throws<ArgumentNullException>("resultSelector", () => ParallelEnumerable.Range(0, 1).Join(ParallelEnumerable.Range(0, 1), i => i, i => i, (Func<int, int, int>)null, EqualityComparer<int>.Default));
321         }
322     }
323 }
324