1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
4 
5 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
6 //
7 // IntersectQueryOperator.cs
8 //
9 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
10 
11 using System.Collections.Generic;
12 using System.Diagnostics;
13 using System.Threading;
14 
namespace System.Linq.Parallel
{
    /// <summary>
    /// Operator that yields the intersection of two data sources.
    /// </summary>
    /// <typeparam name="TInputOutput">The element type of both input sources and of the output.</typeparam>
    internal sealed class IntersectQueryOperator<TInputOutput> :
        BinaryQueryOperator<TInputOutput, TInputOutput, TInputOutput>
    {
        private readonly IEqualityComparer<TInputOutput> _comparer; // An equality comparer.

        //---------------------------------------------------------------------------------------
        // Constructs a new intersection operator over two child queries. The operator's output
        // is ordered exactly when the left child's output is ordered, and its ordinal index
        // state is set to Shuffled.
        //
        // Arguments:
        //     left     - the left child query (asserted non-null in debug builds)
        //     right    - the right child query (asserted non-null in debug builds)
        //     comparer - equality comparer; stored as-is and handed to the repartitioning
        //                helpers and to the per-partition enumerators
        //

        internal IntersectQueryOperator(ParallelQuery<TInputOutput> left, ParallelQuery<TInputOutput> right, IEqualityComparer<TInputOutput> comparer)
            : base(left, right)
        {
            Debug.Assert(left != null && right != null, "child data sources cannot be null");

            _comparer = comparer;
            _outputOrdered = LeftChild.OutputOrdered;

            SetOrdinalIndex(OrdinalIndexState.Shuffled);
        }

        //---------------------------------------------------------------------------------------
        // Opens both child operators (left first, then right) and wraps their results in a
        // BinaryQueryOperatorResults for this operator.
        //
        // Arguments:
        //     settings       - the query settings, forwarded to both children
        //     preferStriping - deliberately not forwarded (see the comment in the body)
        //

        internal override QueryResults<TInputOutput> Open(
            QuerySettings settings, bool preferStriping)
        {
            // We just open our child operators, left and then right.  Do not propagate the preferStriping value, but
            // instead explicitly set it to false. Regardless of whether the parent prefers striping or range
            // partitioning, the output will be hash-partitioned.
            QueryResults<TInputOutput> leftChildResults = LeftChild.Open(settings, false);
            QueryResults<TInputOutput> rightChildResults = RightChild.Open(settings, false);

            return new BinaryQueryOperatorResults(leftChildResults, rightChildResults, this, settings, false);
        }

        //---------------------------------------------------------------------------------------
        // Hash-repartitions the left stream and delegates to WrapPartitionedStreamHelper, which
        // repartitions the right stream and builds the output stream. When the output is ordered
        // the left stream keeps its TLeftKey order keys (HashRepartitionOrdered); otherwise the
        // left stream is repartitioned with int keys (HashRepartition).
        //
        // Arguments:
        //     leftPartitionedStream  - the partitioned left input
        //     rightPartitionedStream - the partitioned right input (same partition count as the
        //                              left, asserted in debug builds)
        //     outputRecipient        - receives the resulting output stream
        //     preferStriping         - not used by this operator
        //     settings               - supplies the merged cancellation token
        //

        public override void WrapPartitionedStream<TLeftKey, TRightKey>(
            PartitionedStream<TInputOutput, TLeftKey> leftPartitionedStream, PartitionedStream<TInputOutput, TRightKey> rightPartitionedStream,
            IPartitionedStreamRecipient<TInputOutput> outputRecipient, bool preferStriping, QuerySettings settings)
        {
            Debug.Assert(leftPartitionedStream.PartitionCount == rightPartitionedStream.PartitionCount);

            // Both branches use the same token for every call; read it once.
            CancellationToken cancellationToken = settings.CancellationState.MergedCancellationToken;

            if (OutputOrdered)
            {
                WrapPartitionedStreamHelper<TLeftKey, TRightKey>(
                    ExchangeUtilities.HashRepartitionOrdered<TInputOutput, NoKeyMemoizationRequired, TLeftKey>(
                        leftPartitionedStream, null, null, _comparer, cancellationToken),
                    rightPartitionedStream, outputRecipient, cancellationToken);
            }
            else
            {
                WrapPartitionedStreamHelper<int, TRightKey>(
                    ExchangeUtilities.HashRepartition<TInputOutput, NoKeyMemoizationRequired, TLeftKey>(
                        leftPartitionedStream, null, null, _comparer, cancellationToken),
                    rightPartitionedStream, outputRecipient, cancellationToken);
            }
        }

        //---------------------------------------------------------------------------------------
        // This is a helper method. WrapPartitionedStream decides what type TLeftKey is going
        // to be, and then calls this method with that key as a generic parameter.
        //
        // The right stream is hash-repartitioned here (always with int keys). One enumerator is
        // then created per partition, pairing partition i of the left hash stream with
        // partition i of the right hash stream, and the resulting stream is handed to the
        // output recipient.
        //

        private void WrapPartitionedStreamHelper<TLeftKey, TRightKey>(
            PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftHashStream, PartitionedStream<TInputOutput, TRightKey> rightPartitionedStream,
            IPartitionedStreamRecipient<TInputOutput> outputRecipient, CancellationToken cancellationToken)
        {
            int partitionCount = leftHashStream.PartitionCount;

            PartitionedStream<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightHashStream =
                ExchangeUtilities.HashRepartition<TInputOutput, NoKeyMemoizationRequired, TRightKey>(
                    rightPartitionedStream, null, null, _comparer, cancellationToken);

            PartitionedStream<TInputOutput, TLeftKey> outputStream =
                new PartitionedStream<TInputOutput, TLeftKey>(partitionCount, leftHashStream.KeyComparer, OrdinalIndexState.Shuffled);
            for (int i = 0; i < partitionCount; i++)
            {
                if (OutputOrdered)
                {
                    outputStream[i] = new OrderedIntersectQueryOperatorEnumerator<TLeftKey>(
                        leftHashStream[i], rightHashStream[i], _comparer, leftHashStream.KeyComparer, cancellationToken);
                }
                else
                {
                    // The unordered enumerator always produces int keys. WrapPartitionedStream
                    // instantiates TLeftKey as int on this path, so the cast through object
                    // only satisfies the compiler.
                    outputStream[i] = (QueryOperatorEnumerator<TInputOutput, TLeftKey>)(object)
                            new IntersectQueryOperatorEnumerator<TLeftKey>(leftHashStream[i], rightHashStream[i], _comparer, cancellationToken);
                }
            }

            outputRecipient.Receive(outputStream);
        }

        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge that would not be performed in
        // a similar sequential operation (i.e., in LINQ to Objects).
        //

        internal override bool LimitsParallelism
        {
            get { return false; }
        }

        //---------------------------------------------------------------------------------------
        // This enumerator performs the intersection operation incrementally. On the first call
        // to MoveNext it builds a set from the entire right source. It then walks the left
        // source and yields each left element that it manages to remove from that set, so a
        // given set entry can be yielded at most once.
        //

        private sealed class IntersectQueryOperatorEnumerator<TLeftKey> : QueryOperatorEnumerator<TInputOutput, int>
        {
            private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> _leftSource; // Left data source.
            private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> _rightSource; // Right data source.
            private readonly IEqualityComparer<TInputOutput> _comparer; // Comparer to use for equality/hash-coding.
            private Set<TInputOutput> _hashLookup; // The hash lookup, used to produce the intersection. Built lazily.
            private readonly CancellationToken _cancellationToken;
            private Shared<int> _outputLoopCount; // Cancellation polling counter for the left loop; persists across MoveNext calls.

            //---------------------------------------------------------------------------------------
            // Instantiates a new intersection enumerator over one left and one right partition.
            //

            internal IntersectQueryOperatorEnumerator(
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftSource,
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightSource,
                IEqualityComparer<TInputOutput> comparer, CancellationToken cancellationToken)
            {
                Debug.Assert(leftSource != null);
                Debug.Assert(rightSource != null);

                _leftSource = leftSource;
                _rightSource = rightSource;
                _comparer = comparer;
                _cancellationToken = cancellationToken;
            }

            //---------------------------------------------------------------------------------------
            // Walks the two data sources, right (to build the set) and then left (to probe it),
            // to produce the intersection. Returns true and sets currentElement when a match is
            // found; returns false once the left source is exhausted.
            //

            internal override bool MoveNext(ref TInputOutput currentElement, ref int currentKey)
            {
                Debug.Assert(_leftSource != null);
                Debug.Assert(_rightSource != null);

                // Build the set out of the right data source, if we haven't already.

                if (_hashLookup == null)
                {
                    _outputLoopCount = new Shared<int>(0);
                    _hashLookup = new Set<TInputOutput>(_comparer);

                    Pair<TInputOutput, NoKeyMemoizationRequired> rightElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
                    int rightKeyUnused = default(int);

                    int i = 0;
                    while (_rightSource.MoveNext(ref rightElement, ref rightKeyUnused))
                    {
                        // Poll for cancellation whenever the masked counter is zero.
                        if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                        {
                            CancellationState.ThrowIfCanceled(_cancellationToken);
                        }

                        _hashLookup.Add(rightElement.First);
                    }
                }

                // Now iterate over the left data source, looking for matches.
                Pair<TInputOutput, NoKeyMemoizationRequired> leftElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
                TLeftKey keyUnused = default(TLeftKey);

                while (_leftSource.MoveNext(ref leftElement, ref keyUnused))
                {
                    if ((_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0)
                    {
                        CancellationState.ThrowIfCanceled(_cancellationToken);
                    }

                    // If the element could be removed from our set, yield it to the caller.
                    // Removing it (rather than merely looking it up) is what prevents the
                    // same element from being yielded again later.
                    if (_hashLookup.Remove(leftElement.First))
                    {
                        currentElement = leftElement.First;
#if DEBUG
                        // The key is meaningless for unordered output; use a recognizable
                        // sentinel in debug builds.
                        currentKey = unchecked((int)0xdeadbeef);
#endif
                        return true;
                    }
                }

                return false;
            }

            //---------------------------------------------------------------------------------------
            // Disposes both underlying partition enumerators.
            //

            protected override void Dispose(bool disposing)
            {
                Debug.Assert(_leftSource != null && _rightSource != null);
                _leftSource.Dispose();
                _rightSource.Dispose();
            }
        }

        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        //

        internal override IEnumerable<TInputOutput> AsSequentialQuery(CancellationToken token)
        {
            IEnumerable<TInputOutput> wrappedLeftChild = CancellableEnumerable.Wrap(LeftChild.AsSequentialQuery(token), token);
            IEnumerable<TInputOutput> wrappedRightChild = CancellableEnumerable.Wrap(RightChild.AsSequentialQuery(token), token);
            return wrappedLeftChild.Intersect(wrappedRightChild, _comparer);
        }

        //---------------------------------------------------------------------------------------
        // Order-preserving variant of the intersection enumerator. On the first call to MoveNext
        // it builds a dictionary from the entire left source, keeping for each distinct element
        // the occurrence with the smallest order key. It then walks the right source and yields
        // the stored left element (with its order key) for each right element found in the
        // dictionary, removing the entry so it is yielded only once.
        //

        private sealed class OrderedIntersectQueryOperatorEnumerator<TLeftKey> : QueryOperatorEnumerator<TInputOutput, TLeftKey>
        {
            private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> _leftSource; // Left data source.
            private readonly QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> _rightSource; // Right data source.
            private readonly IEqualityComparer<Wrapper<TInputOutput>> _comparer; // Comparer to use for equality/hash-coding.
            private readonly IComparer<TLeftKey> _leftKeyComparer; // Comparer to use to determine ordering of order keys.
            private Dictionary<Wrapper<TInputOutput>, Pair<TInputOutput, TLeftKey>> _hashLookup; // The hash lookup, used to produce the intersection. Built lazily.
            private readonly CancellationToken _cancellationToken;

            //---------------------------------------------------------------------------------------
            // Instantiates a new ordered intersection enumerator over one left and one right
            // partition. The element comparer is wrapped so it can compare Wrapper<TInputOutput>
            // dictionary keys.
            //

            internal OrderedIntersectQueryOperatorEnumerator(
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, TLeftKey> leftSource,
                QueryOperatorEnumerator<Pair<TInputOutput, NoKeyMemoizationRequired>, int> rightSource,
                IEqualityComparer<TInputOutput> comparer, IComparer<TLeftKey> leftKeyComparer,
                CancellationToken cancellationToken)
            {
                Debug.Assert(leftSource != null);
                Debug.Assert(rightSource != null);

                _leftSource = leftSource;
                _rightSource = rightSource;
                _comparer = new WrapperEqualityComparer<TInputOutput>(comparer);
                _leftKeyComparer = leftKeyComparer;
                _cancellationToken = cancellationToken;
            }

            //---------------------------------------------------------------------------------------
            // Walks the two data sources, left (to build the lookup) and then right (to probe it),
            // to produce the intersection. Returns true and sets currentElement/currentKey when a
            // match is found; returns false once the right source is exhausted.
            //

            internal override bool MoveNext(ref TInputOutput currentElement, ref TLeftKey currentKey)
            {
                Debug.Assert(_leftSource != null);
                Debug.Assert(_rightSource != null);

                // Cancellation polling counter, shared by both loops below. It is a local, so it
                // restarts at zero on every call; once the lookup has been built, the first right
                // element examined in each call therefore always polls for cancellation.
                int i = 0;

                // Build the lookup out of the left data source, if we haven't already.
                if (_hashLookup == null)
                {
                    _hashLookup = new Dictionary<Wrapper<TInputOutput>, Pair<TInputOutput, TLeftKey>>(_comparer);

                    Pair<TInputOutput, NoKeyMemoizationRequired> leftElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
                    TLeftKey leftKey = default(TLeftKey);
                    while (_leftSource.MoveNext(ref leftElement, ref leftKey))
                    {
                        if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                        {
                            CancellationState.ThrowIfCanceled(_cancellationToken);
                        }

                        // For each element, we track the smallest order key for that element that we saw so far
                        Pair<TInputOutput, TLeftKey> oldEntry;
                        Wrapper<TInputOutput> wrappedLeftElem = new Wrapper<TInputOutput>(leftElement.First);

                        // If this is the first occurrence of this element, or the order key is lower than all keys we saw previously,
                        // update the order key for this element.
                        if (!_hashLookup.TryGetValue(wrappedLeftElem, out oldEntry) || _leftKeyComparer.Compare(leftKey, oldEntry.Second) < 0)
                        {
                            // For each "elem" value, we store the smallest key, and the element value that had that key.
                            // Note that even though two element values are "equal" according to the EqualityComparer,
                            // we still cannot choose arbitrarily which of the two to yield.
                            _hashLookup[wrappedLeftElem] = new Pair<TInputOutput, TLeftKey>(leftElement.First, leftKey);
                        }
                    }
                }

                // Now iterate over the right data source, looking for matches.
                Pair<TInputOutput, NoKeyMemoizationRequired> rightElement = default(Pair<TInputOutput, NoKeyMemoizationRequired>);
                int rightKeyUnused = default(int);
                while (_rightSource.MoveNext(ref rightElement, ref rightKeyUnused))
                {
                    if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                    {
                        CancellationState.ThrowIfCanceled(_cancellationToken);
                    }

                    // If the right element is present in the lookup, yield the stored left
                    // element together with its order key, and remove the entry so that it
                    // is never yielded again.

                    Pair<TInputOutput, TLeftKey> entry;
                    Wrapper<TInputOutput> wrappedRightElem = new Wrapper<TInputOutput>(rightElement.First);

                    if (_hashLookup.TryGetValue(wrappedRightElem, out entry))
                    {
                        currentElement = entry.First;
                        currentKey = entry.Second;

                        _hashLookup.Remove(new Wrapper<TInputOutput>(entry.First));
                        return true;
                    }
                }

                return false;
            }

            //---------------------------------------------------------------------------------------
            // Disposes both underlying partition enumerators.
            //

            protected override void Dispose(bool disposing)
            {
                Debug.Assert(_leftSource != null && _rightSource != null);
                _leftSource.Dispose();
                _rightSource.Dispose();
            }
        }
    }
}
331