1 // Licensed to the .NET Foundation under one or more agreements. 2 // The .NET Foundation licenses this file to you under the MIT license. 3 // See the LICENSE file in the project root for more information. 4 5 using System.Collections.Generic; 6 using System.Threading; 7 using Xunit; 8 9 namespace System.Linq.Parallel.Tests 10 { 11 public static class JoinTests 12 { 13 private const int KeyFactor = 8; 14 JoinUnorderedData(int[] counts)15 public static IEnumerable<object[]> JoinUnorderedData(int[] counts) 16 { 17 foreach (int leftCount in counts) 18 { 19 foreach (int rightCount in counts) 20 { 21 yield return new object[] { leftCount, rightCount }; 22 } 23 } 24 } 25 JoinData(int[] counts)26 public static IEnumerable<object[]> JoinData(int[] counts) 27 { 28 // When dealing with joins, if there aren't multiple matches the ordering of the second operand is immaterial. 29 foreach (object[] parms in Sources.Ranges(counts, i => counts)) 30 { 31 yield return parms; 32 } 33 } 34 JoinMultipleData(int[] counts)35 public static IEnumerable<object[]> JoinMultipleData(int[] counts) 36 { 37 foreach (object[] parms in UnorderedSources.BinaryRanges(counts, counts)) 38 { 39 yield return new object[] { ((Labeled<ParallelQuery<int>>)parms[0]).Order(), parms[1], ((Labeled<ParallelQuery<int>>)parms[2]).Order(), parms[3] }; 40 } 41 } 42 43 // 44 // Join 45 // 46 [Theory] 47 [MemberData(nameof(JoinUnorderedData), new[] { 0, 1, 2, KeyFactor * 2 })] Join_Unordered(int leftCount, int rightCount)48 public static void Join_Unordered(int leftCount, int rightCount) 49 { 50 ParallelQuery<int> leftQuery = UnorderedSources.Default(leftCount); 51 ParallelQuery<int> rightQuery = UnorderedSources.Default(rightCount); 52 IntegerRangeSet seen = new IntegerRangeSet(0, Math.Min(leftCount, (rightCount + (KeyFactor - 1)) / KeyFactor)); 53 foreach (var p in leftQuery.Join(rightQuery, x => x * KeyFactor, y => y, (x, y) => KeyValuePair.Create(x, y))) 54 { 55 Assert.Equal(p.Key * KeyFactor, p.Value); 56 seen.Add(p.Key); 57 } 58 seen.AssertComplete(); 59 } 60 61 [Fact] 62 [OuterLoop] Join_Unordered_Longrunning()63 public static void Join_Unordered_Longrunning() 64 { 65 Join_Unordered(Sources.OuterLoopCount / 64, Sources.OuterLoopCount / 64); 66 } 67 68 [Theory] 69 [MemberData(nameof(JoinData), new[] { 0, 1, 2, KeyFactor * 2 })] Join(Labeled<ParallelQuery<int>> left, int leftCount, int rightCount)70 public static void Join(Labeled<ParallelQuery<int>> left, int leftCount, int rightCount) 71 { 72 ParallelQuery<int> leftQuery = left.Item; 73 ParallelQuery<int> rightQuery = UnorderedSources.Default(rightCount); 74 int seen = 0; 75 foreach (var p in leftQuery.Join(rightQuery, x => x * KeyFactor, y => y, (x, y) => KeyValuePair.Create(x, y))) 76 { 77 Assert.Equal(seen++, p.Key); 78 Assert.Equal(p.Key * KeyFactor, p.Value); 79 } 80 Assert.Equal(Math.Min(leftCount, (rightCount + (KeyFactor - 1)) / KeyFactor), seen); 81 } 82 83 [Theory] 84 [OuterLoop] 85 [MemberData(nameof(JoinData), new[] { 512, 1024 })] Join_Longrunning(Labeled<ParallelQuery<int>> left, int leftCount, int rightCount)86 public static void Join_Longrunning(Labeled<ParallelQuery<int>> left, int leftCount, int rightCount) 87 { 88 Join(left, leftCount, rightCount); 89 } 90 91 [Theory] 92 [MemberData(nameof(JoinUnorderedData), new[] { 0, 1, 2, KeyFactor * 2 })] Join_Unordered_NotPipelined(int leftCount, int rightCount)93 public static void Join_Unordered_NotPipelined(int leftCount, int rightCount) 94 { 95 ParallelQuery<int> leftQuery = UnorderedSources.Default(leftCount); 96 ParallelQuery<int> rightQuery = UnorderedSources.Default(rightCount); 97 IntegerRangeSet seen = new IntegerRangeSet(0, Math.Min(leftCount, (rightCount + (KeyFactor - 1)) / KeyFactor)); 98 Assert.All(leftQuery.Join(rightQuery, x => x * KeyFactor, y => y, (x, y) => KeyValuePair.Create(x, y)).ToList(), 99 p => { Assert.Equal(p.Key * KeyFactor, p.Value); seen.Add(p.Key); }); 100 seen.AssertComplete(); 101 } 102 103 [Fact] 104 [OuterLoop] Join_Unordered_NotPipelined_Longrunning()105 public static void Join_Unordered_NotPipelined_Longrunning() 106 { 107 Join_Unordered_NotPipelined(Sources.OuterLoopCount / 64, Sources.OuterLoopCount / 64); 108 } 109 110 [Theory] 111 [MemberData(nameof(JoinData), new[] { 0, 1, 2, KeyFactor * 2 })] Join_NotPipelined(Labeled<ParallelQuery<int>> left, int leftCount, int rightCount)112 public static void Join_NotPipelined(Labeled<ParallelQuery<int>> left, int leftCount, int rightCount) 113 { 114 ParallelQuery<int> leftQuery = left.Item; 115 ParallelQuery<int> rightQuery = UnorderedSources.Default(rightCount); 116 int seen = 0; 117 Assert.All(leftQuery.Join(rightQuery, x => x * KeyFactor, y => y, (x, y) => KeyValuePair.Create(x, y)).ToList(), 118 p => { Assert.Equal(seen++, p.Key); Assert.Equal(p.Key * KeyFactor, p.Value); }); 119 Assert.Equal(Math.Min(leftCount, (rightCount + (KeyFactor - 1)) / KeyFactor), seen); 120 } 121 122 [Theory] 123 [OuterLoop] 124 [MemberData(nameof(JoinData), new[] { 512, 1024 })] Join_NotPipelined_Longrunning(Labeled<ParallelQuery<int>> left, int leftCount, int rightCount)125 public static void Join_NotPipelined_Longrunning(Labeled<ParallelQuery<int>> left, int leftCount, int rightCount) 126 { 127 Join_NotPipelined(left, leftCount, rightCount); 128 } 129 130 [Theory] 131 [MemberData(nameof(JoinUnorderedData), new[] { 0, 1, 2, KeyFactor * 2 })] Join_Unordered_Multiple(int leftCount, int rightCount)132 public static void Join_Unordered_Multiple(int leftCount, int rightCount) 133 { 134 ParallelQuery<int> leftQuery = UnorderedSources.Default(leftCount); 135 ParallelQuery<int> rightQuery = UnorderedSources.Default(rightCount); 136 IntegerRangeSet seenOuter = new IntegerRangeSet(0, Math.Min(leftCount, (rightCount + (KeyFactor - 1)) / KeyFactor)); 137 IntegerRangeSet seenInner = new IntegerRangeSet(0, Math.Min(leftCount * KeyFactor, rightCount)); 138 Assert.All(leftQuery.Join(rightQuery, x => x, y => y / KeyFactor, (x, y) => KeyValuePair.Create(x, y)), 139 p => 140 { 141 Assert.Equal(p.Key, p.Value / KeyFactor); 142 seenInner.Add(p.Value); 143 if (p.Value % KeyFactor == 0) seenOuter.Add(p.Key); 144 }); 145 seenOuter.AssertComplete(); 146 seenInner.AssertComplete(); 147 } 148 149 [Fact] 150 [OuterLoop] Join_Unordered_Multiple_Longrunning()151 public static void Join_Unordered_Multiple_Longrunning() 152 { 153 Join_Unordered_Multiple(Sources.OuterLoopCount / 64, Sources.OuterLoopCount / 64); 154 } 155 156 [Theory] 157 [ActiveIssue(1155)] 158 [MemberData(nameof(JoinMultipleData), new[] { 0, 1, 2, KeyFactor * 2 })] Join_Multiple(Labeled<ParallelQuery<int>> left, int leftCount, Labeled<ParallelQuery<int>> right, int rightCount)159 public static void Join_Multiple(Labeled<ParallelQuery<int>> left, int leftCount, Labeled<ParallelQuery<int>> right, int rightCount) 160 { 161 ParallelQuery<int> leftQuery = left.Item; 162 ParallelQuery<int> rightQuery = right.Item; 163 int seen = 0; 164 Assert.All(leftQuery.Join(rightQuery, x => x, y => y / KeyFactor, (x, y) => KeyValuePair.Create(x, y)), 165 p => 166 { 167 Assert.Equal(p.Key, p.Value / KeyFactor); 168 Assert.Equal(seen++, p.Value); 169 }); 170 Assert.Equal(Math.Min(leftCount * KeyFactor, rightCount), seen); 171 } 172 173 [Theory] 174 [ActiveIssue(1155)] 175 [OuterLoop] 176 [MemberData(nameof(JoinMultipleData), new[] { 512, 1024 })] Join_Multiple_Longrunning(Labeled<ParallelQuery<int>> left, int leftCount, Labeled<ParallelQuery<int>> right, int rightCount)177 public static void Join_Multiple_Longrunning(Labeled<ParallelQuery<int>> left, int leftCount, Labeled<ParallelQuery<int>> right, int rightCount) 178 { 179 Join_Multiple(left, leftCount, right, rightCount); 180 } 181 182 [Theory] 183 [MemberData(nameof(JoinUnorderedData), new[] { 0, 1, 2, KeyFactor * 2 })] Join_Unordered_CustomComparator(int leftCount, int rightCount)184 public static void Join_Unordered_CustomComparator(int leftCount, int rightCount) 185 { 186 ParallelQuery<int> leftQuery = UnorderedSources.Default(leftCount); 187 ParallelQuery<int> rightQuery = UnorderedSources.Default(rightCount); 188 IntegerRangeCounter seenOuter = new IntegerRangeCounter(0, leftCount); 189 IntegerRangeCounter seenInner = new IntegerRangeCounter(0, rightCount); 190 Assert.All(leftQuery.Join(rightQuery, x => x, y => y, 191 (x, y) => KeyValuePair.Create(x, y), new ModularCongruenceComparer(KeyFactor)), 192 p => 193 { 194 Assert.Equal(p.Key % KeyFactor, p.Value % KeyFactor); 195 seenOuter.Add(p.Key); 196 seenInner.Add(p.Value); 197 }); 198 if (leftCount == 0 || rightCount == 0) 199 { 200 seenOuter.AssertEncountered(0); 201 seenInner.AssertEncountered(0); 202 } 203 else 204 { 205 Func<int, int, int> cartesian = (key, other) => (other + (KeyFactor - 1) - key % KeyFactor) / KeyFactor; 206 Assert.All(seenOuter, kv => Assert.Equal(cartesian(kv.Key, rightCount), kv.Value)); 207 Assert.All(seenInner, kv => Assert.Equal(cartesian(kv.Key, leftCount), kv.Value)); 208 } 209 } 210 211 [Fact] 212 [OuterLoop] Join_Unordered_CustomComparator_Longrunning()213 public static void Join_Unordered_CustomComparator_Longrunning() 214 { 215 Join_Unordered_CustomComparator(Sources.OuterLoopCount / 64, Sources.OuterLoopCount / 64); 216 } 217 218 [Theory] 219 [ActiveIssue(1155)] 220 [MemberData(nameof(JoinMultipleData), new[] { 0, 1, 2, KeyFactor * 2 })] Join_CustomComparator(Labeled<ParallelQuery<int>> left, int leftCount, Labeled<ParallelQuery<int>> right, int rightCount)221 public static void Join_CustomComparator(Labeled<ParallelQuery<int>> left, int leftCount, Labeled<ParallelQuery<int>> right, int rightCount) 222 { 223 ParallelQuery<int> leftQuery = left.Item; 224 ParallelQuery<int> rightQuery = right.Item; 225 int seenOuter = 0; 226 int previousOuter = -1; 227 int seenInner = Math.Max(previousOuter % KeyFactor, rightCount - KeyFactor + previousOuter % KeyFactor); 228 229 Assert.All(leftQuery.Join(rightQuery, x => x, y => y, 230 (x, y) => KeyValuePair.Create(x, y), new ModularCongruenceComparer(KeyFactor)), 231 p => 232 { 233 if (p.Key != previousOuter) 234 { 235 Assert.Equal(seenOuter, p.Key); 236 Assert.Equal(p.Key % 8, p.Value); 237 // If there aren't sufficient elements in the RHS (< 8), the LHS skips an entry at the end of the mod cycle. 238 seenOuter = Math.Min(leftCount, seenOuter + (p.Key % KeyFactor + 1 == rightCount ? KeyFactor - p.Key % KeyFactor : 1)); 239 Assert.Equal(Math.Max(previousOuter % KeyFactor, rightCount - KeyFactor + previousOuter % KeyFactor), seenInner); 240 previousOuter = p.Key; 241 seenInner = (p.Key % KeyFactor) - KeyFactor; 242 } 243 Assert.Equal(p.Key % KeyFactor, p.Value % KeyFactor); 244 Assert.Equal(seenInner += KeyFactor, p.Value); 245 }); 246 Assert.Equal(rightCount == 0 ? 0 : leftCount, seenOuter); 247 Assert.Equal(Math.Max(previousOuter % KeyFactor, rightCount - KeyFactor + previousOuter % KeyFactor), seenInner); 248 } 249 250 [Theory] 251 [ActiveIssue(1155)] 252 [OuterLoop] 253 [MemberData(nameof(JoinMultipleData), new[] { 512, 1024 })] Join_CustomComparator_Longrunning(Labeled<ParallelQuery<int>> left, int leftCount, Labeled<ParallelQuery<int>> right, int rightCount)254 public static void Join_CustomComparator_Longrunning(Labeled<ParallelQuery<int>> left, int leftCount, Labeled<ParallelQuery<int>> right, int rightCount) 255 { 256 Join_CustomComparator(left, leftCount, right, rightCount); 257 } 258 259 [Theory] 260 [MemberData(nameof(JoinData), new[] { 0, 1, 2, KeyFactor * 2 })] Join_InnerJoin_Ordered(Labeled<ParallelQuery<int>> left, int leftCount, int rightCount)261 public static void Join_InnerJoin_Ordered(Labeled<ParallelQuery<int>> left, int leftCount, int rightCount) 262 { 263 ParallelQuery<int> leftQuery = left.Item; 264 ParallelQuery<int> rightQuery = UnorderedSources.Default(rightCount); 265 ParallelQuery<int> middleQuery = ParallelEnumerable.Range(0, leftCount).AsOrdered(); 266 267 int seen = 0; 268 Assert.All(leftQuery.Join(middleQuery.Join(rightQuery, x => x * KeyFactor / 2, y => y, (x, y) => KeyValuePair.Create(x, y)), 269 z => z * 2, p => p.Key, (x, p) => KeyValuePair.Create(x, p)), 270 pOuter => 271 { 272 KeyValuePair<int, int> pInner = pOuter.Value; 273 Assert.Equal(seen++, pOuter.Key); 274 Assert.Equal(pOuter.Key * 2, pInner.Key); 275 Assert.Equal(pOuter.Key * KeyFactor, pInner.Value); 276 }); 277 Assert.Equal(Math.Min((leftCount + 1) / 2, (rightCount + (KeyFactor - 1)) / KeyFactor), seen); 278 } 279 280 [Theory] 281 [OuterLoop] 282 [MemberData(nameof(JoinData), new[] { 512, 1024 })] Join_InnerJoin_Ordered_Longrunning(Labeled<ParallelQuery<int>> left, int leftCount, int rightCount)283 public static void Join_InnerJoin_Ordered_Longrunning(Labeled<ParallelQuery<int>> left, int leftCount, int rightCount) 284 { 285 Join_InnerJoin_Ordered(left, leftCount, rightCount); 286 } 287 288 [Fact] Join_NotSupportedException()289 public static void Join_NotSupportedException() 290 { 291 #pragma warning disable 618 292 Assert.Throws<NotSupportedException>(() => ParallelEnumerable.Range(0, 1).Join(Enumerable.Range(0, 1), i => i, i => i, (i, j) => i)); 293 Assert.Throws<NotSupportedException>(() => ParallelEnumerable.Range(0, 1).Join(Enumerable.Range(0, 1), i => i, i => i, (i, j) => i, null)); 294 #pragma warning restore 618 295 } 296 297 [Fact] 298 // Should not get the same setting from both operands. Join_NoDuplicateSettings()299 public static void Join_NoDuplicateSettings() 300 { 301 CancellationToken t = new CancellationTokenSource().Token; 302 Assert.Throws<InvalidOperationException>(() => ParallelEnumerable.Range(0, 1).WithCancellation(t).Join(ParallelEnumerable.Range(0, 1).WithCancellation(t), x => x, y => y, (l, r) => l)); 303 Assert.Throws<InvalidOperationException>(() => ParallelEnumerable.Range(0, 1).WithDegreeOfParallelism(1).Join(ParallelEnumerable.Range(0, 1).WithDegreeOfParallelism(1), x => x, y => y, (l, r) => l)); 304 Assert.Throws<InvalidOperationException>(() => ParallelEnumerable.Range(0, 1).WithExecutionMode(ParallelExecutionMode.Default).Join(ParallelEnumerable.Range(0, 1).WithExecutionMode(ParallelExecutionMode.Default), x => x, y => y, (l, r) => l)); 305 Assert.Throws<InvalidOperationException>(() => ParallelEnumerable.Range(0, 1).WithMergeOptions(ParallelMergeOptions.Default).Join(ParallelEnumerable.Range(0, 1).WithMergeOptions(ParallelMergeOptions.Default), x => x, y => y, (l, r) => l)); 306 } 307 308 [Fact] Join_ArgumentNullException()309 public static void Join_ArgumentNullException() 310 { 311 AssertExtensions.Throws<ArgumentNullException>("outer", () => ((ParallelQuery<int>)null).Join(ParallelEnumerable.Range(0, 1), i => i, i => i, (i, j) => i)); 312 AssertExtensions.Throws<ArgumentNullException>("inner", () => ParallelEnumerable.Range(0, 1).Join((ParallelQuery<int>)null, i => i, i => i, (i, j) => i)); 313 AssertExtensions.Throws<ArgumentNullException>("outerKeySelector", () => ParallelEnumerable.Range(0, 1).Join(ParallelEnumerable.Range(0, 1), (Func<int, int>)null, i => i, (i, j) => i)); 314 AssertExtensions.Throws<ArgumentNullException>("innerKeySelector", () => ParallelEnumerable.Range(0, 1).Join(ParallelEnumerable.Range(0, 1), i => i, (Func<int, int>)null, (i, j) => i)); 315 AssertExtensions.Throws<ArgumentNullException>("resultSelector", () => ParallelEnumerable.Range(0, 1).Join(ParallelEnumerable.Range(0, 1), i => i, i => i, (Func<int, int, int>)null)); 316 AssertExtensions.Throws<ArgumentNullException>("outer", () => ((ParallelQuery<int>)null).Join(ParallelEnumerable.Range(0, 1), i => i, i => i, (i, j) => i, EqualityComparer<int>.Default)); 317 AssertExtensions.Throws<ArgumentNullException>("inner", () => ParallelEnumerable.Range(0, 1).Join((ParallelQuery<int>)null, i => i, i => i, (i, j) => i, EqualityComparer<int>.Default)); 318 AssertExtensions.Throws<ArgumentNullException>("outerKeySelector", () => ParallelEnumerable.Range(0, 1).Join(ParallelEnumerable.Range(0, 1), (Func<int, int>)null, i => i, (i, j) => i, EqualityComparer<int>.Default)); 319 AssertExtensions.Throws<ArgumentNullException>("innerKeySelector", () => ParallelEnumerable.Range(0, 1).Join(ParallelEnumerable.Range(0, 1), i => i, (Func<int, int>)null, (i, j) => i, EqualityComparer<int>.Default)); 320 AssertExtensions.Throws<ArgumentNullException>("resultSelector", () => ParallelEnumerable.Range(0, 1).Join(ParallelEnumerable.Range(0, 1), i => i, i => i, (Func<int, int, int>)null, EqualityComparer<int>.Default)); 321 } 322 } 323 } 324