// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// IntersectQueryOperator.cs
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

namespace System.Linq.Parallel
{
    /// <summary>
    /// Operator that yields the intersection of two data sources.
    /// </summary>
    /// <typeparam name="TInputOutput">The element type of both inputs and of the output.</typeparam>
    internal sealed class IntersectQueryOperator<TInputOutput> :
        BinaryQueryOperator<TInputOutput, TInputOutput, TInputOutput>
    {
        private readonly IEqualityComparer<TInputOutput> _comparer; // An equality comparer.

        //---------------------------------------------------------------------------------------
        // Constructs a new intersection operator.
        //
        // Arguments:
        //     left     - the left data source; its ordering decides whether output is ordered
        //     right    - the right data source
        //     comparer - equality comparer handed to the hash repartitioning and lookups
        //

        internal IntersectQueryOperator(ParallelQuery<TInputOutput> left, ParallelQuery<TInputOutput> right, IEqualityComparer<TInputOutput> comparer)
            : base(left, right)
        {
            Debug.Assert(left != null && right != null, "child data sources cannot be null");

            _comparer = comparer;

            // The output is ordered exactly when the left child is ordered.
            _outputOrdered = LeftChild.OutputOrdered;

            SetOrdinalIndex(OrdinalIndexState.Shuffled);
        }

        //---------------------------------------------------------------------------------------
        // Opens both children (left, then right) and wraps their results in a binary
        // operator results object.
        //

        internal override QueryResults<TInputOutput> Open(
            QuerySettings settings, bool preferStriping)
        {
            // We just open our child operators, left and then right.  Do not propagate the preferStriping value, but
            // instead explicitly set it to false. Regardless of whether the parent prefers striping or range
            // partitioning, the output will be hash-partitioned.
            QueryResults<TInputOutput> leftChildResults = LeftChild.Open(settings, false);
            QueryResults<TInputOutput> rightChildResults = RightChild.Open(settings, false);

            return new BinaryQueryOperatorResults(leftChildResults, rightChildResults, this, settings, false);
        }

        //---------------------------------------------------------------------------------------
        // Hash-repartitions the left stream and forwards to WrapPartitionedStreamHelper.
        // When the output is ordered the left order keys (TLeftKey) are preserved; otherwise
        // the helper is invoked with int as the left key type.
        //

        public override void WrapPartitionedStream<TLeftKey, TRightKey>(
            PartitionedStream<TInputOutput, TLeftKey> leftPartitionedStream, PartitionedStream<TInputOutput, TRightKey> rightPartitionedStream,
            IPartitionedStreamRecipient<TInputOutput> outputRecipient, bool preferStriping, QuerySettings settings)
        {
            Debug.Assert(leftPartitionedStream.PartitionCount == rightPartitionedStream.PartitionCount);

            if (OutputOrdered)
            {
                WrapPartitionedStreamHelper<TLeftKey, TRightKey>(
                    ExchangeUtilities.HashRepartitionOrdered<TInputOutput, NoKeyMemoizationRequired, TLeftKey>(
                        leftPartitionedStream, null, null, _comparer, settings.CancellationState.MergedCancellationToken),
                    rightPartitionedStream, outputRecipient, settings.CancellationState.MergedCancellationToken);
            }
            else
            {
                WrapPartitionedStreamHelper<int, TRightKey>(
                    ExchangeUtilities.HashRepartition<TInputOutput, NoKeyMemoizationRequired, TLeftKey>(
                        leftPartitionedStream, null, null, _comparer, settings.CancellationState.MergedCancellationToken),
                    rightPartitionedStream, outputRecipient, settings.CancellationState.MergedCancellationToken);
            }
        }

        //---------------------------------------------------------------------------------------
        // This is a helper method. WrapPartitionedStream decides what type TLeftKey is going
        // to be, and then calls this method with that key as a generic parameter.
        //
        // The right stream is hash-repartitioned here, one intersect enumerator is created
        // per partition, and the resulting stream is handed to the output recipient.
        //

        private void WrapPartitionedStreamHelper<TLeftKey, TRightKey>(
            PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftHashStream, PartitionedStream<TInputOutput, TRightKey> rightPartitionedStream,
            IPartitionedStreamRecipient<TInputOutput> outputRecipient, CancellationToken cancellationToken)
        {
            int partitionCount = leftHashStream.PartitionCount;

            PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightHashStream =
                ExchangeUtilities.HashRepartition<TInputOutput, NoKeyMemoizationRequired, TRightKey>(
                    rightPartitionedStream, null, null, _comparer, cancellationToken);

            PartitionedStream<TInputOutput, TLeftKey> outputStream =
                new PartitionedStream<TInputOutput, TLeftKey>(partitionCount, leftHashStream.KeyComparer, OrdinalIndexState.Shuffled);
            for (int i = 0; i < partitionCount; i++)
            {
                if (OutputOrdered)
                {
                    outputStream[i] = new OrderedIntersectQueryOperatorEnumerator<TLeftKey>(
                        leftHashStream[i], rightHashStream[i], _comparer, leftHashStream.KeyComparer, cancellationToken);
                }
                else
                {
                    // The unordered enumerator always produces int keys, so the cast goes through
                    // object. WrapPartitionedStream calls this helper with TLeftKey == int on the
                    // unordered path, which is what makes the cast valid.
                    outputStream[i] = (QueryOperatorEnumerator<TInputOutput, TLeftKey>)(object)
                        new IntersectQueryOperatorEnumerator<TLeftKey>(leftHashStream[i], rightHashStream[i], _comparer, cancellationToken);
                }
            }

            outputRecipient.Receive(outputStream);
        }

        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge that would not be performed in
        // a similar sequential operation (i.e., in LINQ to Objects).
        //

        internal override bool LimitsParallelism
        {
            get { return false; }
        }

        //---------------------------------------------------------------------------------------
        // This enumerator performs the intersection operation incrementally. It does this by
        // maintaining a history -- in the form of a set -- of all data already seen. It then
        // only returns elements that are seen twice (returning each one only once).
        //

        private sealed class IntersectQueryOperatorEnumerator<TLeftKey> : QueryOperatorEnumerator<TInputOutput, int>
        {
            private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> _leftSource; // Left data source.
            private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> _rightSource; // Right data source.
            private readonly IEqualityComparer<TInputOutput> _comparer; // Comparer to use for equality/hash-coding.
            private Set<TInputOutput> _hashLookup; // The hash lookup, used to produce the intersection. Built on the first MoveNext.
            private readonly CancellationToken _cancellationToken;
            private Shared<int> _outputLoopCount; // Left-side iteration count, kept across MoveNext calls for cancellation polling.

            //---------------------------------------------------------------------------------------
            // Instantiates a new intersection operator.
            //

            internal IntersectQueryOperatorEnumerator(
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftSource,
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightSource,
                IEqualityComparer<TInputOutput> comparer, CancellationToken cancellationToken)
            {
                Debug.Assert(leftSource != null);
                Debug.Assert(rightSource != null);

                _leftSource = leftSource;
                _rightSource = rightSource;
                _comparer = comparer;
                _cancellationToken = cancellationToken;
            }

            //---------------------------------------------------------------------------------------
            // Walks the two data sources, right and then left, to produce the intersection.
            // The right source is drained into a set on the first call; each call then advances
            // the left source until it finds an element that can be removed from that set.
            //

            internal override bool MoveNext(ref TInputOutput currentElement, ref int currentKey)
            {
                Debug.Assert(_leftSource != null);
                Debug.Assert(_rightSource != null);

                // Build the set out of the right data source, if we haven't already.

                if (_hashLookup == null)
                {
                    _outputLoopCount = new Shared<int>(0);
                    _hashLookup = new Set<TInputOutput>(_comparer);

                    Pair<TInputOutput, NoKeyMemoizationRequired> rightElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
                    int rightKeyUnused = default(int);

                    int i = 0;
                    while (_rightSource.MoveNext(ref rightElement, ref rightKeyUnused))
                    {
                        if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                            CancellationState.ThrowIfCanceled(_cancellationToken);

                        _hashLookup.Add(rightElement.First);
                    }
                }

                // Now iterate over the left data source, looking for matches.
                Pair<TInputOutput, NoKeyMemoizationRequired> leftElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
                TLeftKey keyUnused = default(TLeftKey);

                while (_leftSource.MoveNext(ref leftElement, ref keyUnused))
                {
                    if ((_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0)
                        CancellationState.ThrowIfCanceled(_cancellationToken);

                    // If we found the element in our set, and if we haven't returned it yet,
                    // we can yield it to the caller. Removing it from the set is what ensures
                    // that it is returned once and never again.
                    if (_hashLookup.Remove(leftElement.First))
                    {
                        currentElement = leftElement.First;
#if DEBUG
                        // The key is meaningless for unordered output; poison it in debug builds.
                        currentKey = unchecked((int)0xdeadbeef);
#endif
                        return true;
                    }
                }

                return false;
            }

            //---------------------------------------------------------------------------------------
            // Disposes both underlying source enumerators.
            //

            protected override void Dispose(bool disposing)
            {
                Debug.Assert(_leftSource != null && _rightSource != null);
                _leftSource.Dispose();
                _rightSource.Dispose();
            }
        }

        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        //

        internal override IEnumerable<TInputOutput> AsSequentialQuery(CancellationToken token)
        {
            IEnumerable<TInputOutput> wrappedLeftChild = CancellableEnumerable.Wrap(LeftChild.AsSequentialQuery(token), token);
            IEnumerable<TInputOutput> wrappedRightChild = CancellableEnumerable.Wrap(RightChild.AsSequentialQuery(token), token);
            return wrappedLeftChild.Intersect(wrappedRightChild, _comparer);
        }

        //---------------------------------------------------------------------------------------
        // Order-preserving variant of the intersection enumerator. It first records, for every
        // distinct left element, the smallest order key seen for it, and then walks the right
        // source, yielding the recorded left element and key for each match.
        //

        private sealed class OrderedIntersectQueryOperatorEnumerator<TLeftKey> : QueryOperatorEnumerator<TInputOutput, TLeftKey>
        {
            private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> _leftSource; // Left data source.
            private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> _rightSource; // Right data source.
            private readonly IEqualityComparer<Wrapper<TInputOutput>> _comparer; // Comparer to use for equality/hash-coding.
            private readonly IComparer<TLeftKey> _leftKeyComparer; // Comparer to use to determine ordering of order keys.
            private Dictionary<Wrapper<TInputOutput>, Pair<TInputOutput, TLeftKey>> _hashLookup; // The hash lookup, used to produce the intersection. Built on the first MoveNext.
            private readonly CancellationToken _cancellationToken;

            //---------------------------------------------------------------------------------------
            // Instantiates a new intersection operator.
            //

            internal OrderedIntersectQueryOperatorEnumerator(
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftSource,
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightSource,
                IEqualityComparer<TInputOutput> comparer, IComparer<TLeftKey> leftKeyComparer,
                CancellationToken cancellationToken)
            {
                Debug.Assert(leftSource != null);
                Debug.Assert(rightSource != null);

                _leftSource = leftSource;
                _rightSource = rightSource;
                _comparer = new WrapperEqualityComparer<TInputOutput>(comparer);
                _leftKeyComparer = leftKeyComparer;
                _cancellationToken = cancellationToken;
            }

            //---------------------------------------------------------------------------------------
            // Walks the two data sources, left and then right, to produce the intersection.
            //

            internal override bool MoveNext(ref TInputOutput currentElement, ref TLeftKey currentKey)
            {
                Debug.Assert(_leftSource != null);
                Debug.Assert(_rightSource != null);

                // Build the set out of the left data source, if we haven't already.
                int i = 0;
                if (_hashLookup == null)
                {
                    _hashLookup = new Dictionary<Wrapper<TInputOutput>, Pair<TInputOutput, TLeftKey>>(_comparer);

                    Pair<TInputOutput, NoKeyMemoizationRequired> leftElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
                    TLeftKey leftKey = default(TLeftKey);
                    while (_leftSource.MoveNext(ref leftElement, ref leftKey))
                    {
                        if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                            CancellationState.ThrowIfCanceled(_cancellationToken);

                        // For each element, we track the smallest order key for that element that we saw so far
                        Pair<TInputOutput, TLeftKey> oldEntry;
                        Wrapper<TInputOutput> wrappedLeftElem = new Wrapper<TInputOutput>(leftElement.First);

                        // If this is the first occurrence of this element, or the order key is lower than all keys we saw previously,
                        // update the order key for this element.
                        if (!_hashLookup.TryGetValue(wrappedLeftElem, out oldEntry) || _leftKeyComparer.Compare(leftKey, oldEntry.Second) < 0)
                        {
                            // For each "elem" value, we store the smallest key, and the element value that had that key.
                            // Note that even though two element values are "equal" according to the EqualityComparer,
                            // we still cannot choose arbitrarily which of the two to yield.
                            _hashLookup[wrappedLeftElem] = new Pair<TInputOutput, TLeftKey>(leftElement.First, leftKey);
                        }
                    }
                }

                // Now iterate over the right data source, looking for matches.
                Pair<TInputOutput, NoKeyMemoizationRequired> rightElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
                int rightKeyUnused = default(int);
                while (_rightSource.MoveNext(ref rightElement, ref rightKeyUnused))
                {
                    if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                        CancellationState.ThrowIfCanceled(_cancellationToken);

                    // If we found the element in our lookup, we can yield the recorded left
                    // element and its order key to the caller. The entry is then removed so
                    // that the element is returned once and never again.

                    Pair<TInputOutput, TLeftKey> entry;
                    Wrapper<TInputOutput> wrappedRightElem = new Wrapper<TInputOutput>(rightElement.First);

                    if (_hashLookup.TryGetValue(wrappedRightElem, out entry))
                    {
                        currentElement = entry.First;
                        currentKey = entry.Second;

                        _hashLookup.Remove(new Wrapper<TInputOutput>(entry.First));
                        return true;
                    }
                }

                return false;
            }

            //---------------------------------------------------------------------------------------
            // Disposes both underlying source enumerators.
            //

            protected override void Dispose(bool disposing)
            {
                Debug.Assert(_leftSource != null && _rightSource != null);
                _leftSource.Dispose();
                _rightSource.Dispose();
            }
        }
    }
}