1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
4 
5 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
6 //
7 // HashJoinQueryOperatorEnumerator.cs
8 //
9 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
10 
11 using System.Collections.Generic;
12 using System.Diagnostics;
13 using System.Threading;
14 
15 namespace System.Linq.Parallel
16 {
/// <summary>
/// Enumerator that implements the hash-join algorithm for one partition: it first builds
/// a hash table from the right (inner) source, then probes that table with each element
/// of the left (outer) source.
///
/// Assumptions:
///     This enumerator type won't work properly at all if the analysis engine didn't
///     ensure a proper hash-partition. We expect inner and outer elements with equal
///     keys are ALWAYS in the same partition. If they aren't (e.g. if the analysis is
///     busted) we'll silently drop items on the floor. :(
///
/// This is the enumerator class for two operators:
///   - Join      (a single-result selector is supplied)
///   - GroupJoin (a group-result selector is supplied)
/// </summary>
/// <typeparam name="TLeftInput">Element type of the left (outer) data source.</typeparam>
/// <typeparam name="TLeftKey">Order-key type of the left source; also the key type of the output.</typeparam>
/// <typeparam name="TRightInput">Element type of the right (inner) data source.</typeparam>
/// <typeparam name="THashKey">Type of the join key that elements are hashed and compared on.</typeparam>
/// <typeparam name="TOutput">Type of the values produced by the result selector.</typeparam>
internal class HashJoinQueryOperatorEnumerator<TLeftInput, TLeftKey, TRightInput, THashKey, TOutput>
    : QueryOperatorEnumerator<TOutput, TLeftKey>
{
    // Initial capacity of the chunk list that holds right elements sharing one key.
    private const int InitialChunkSize = 2;

    private readonly QueryOperatorEnumerator<Pair<TLeftInput, THashKey>, TLeftKey> _leftSource; // Left (outer) data source. For probing.
    private readonly QueryOperatorEnumerator<Pair<TRightInput, THashKey>, int> _rightSource; // Right (inner) data source. For building.
    private readonly Func<TLeftInput, TRightInput, TOutput> _singleResultSelector; // Single result selector.
    private readonly Func<TLeftInput, IEnumerable<TRightInput>, TOutput> _groupResultSelector; // Group result selector.
    private readonly IEqualityComparer<THashKey> _keyComparer; // An optional key comparison object.
    private readonly CancellationToken _cancellationToken; // Polled periodically while building and probing.
    private Mutables _mutables; // Allocated on the first call to MoveNext.

    // Holder for all state that changes while enumerating.
    private sealed class Mutables
    {
        internal TLeftInput _currentLeft; // The current matching left element.
        internal TLeftKey _currentLeftKey; // The current index of the matching left element.
        internal HashLookup<THashKey, Pair<TRightInput, ListChunk<TRightInput>>> _rightHashLookup; // The hash lookup.
        internal ListChunk<TRightInput> _currentRightMatches; // Current right matches (if any).
        internal int _currentRightMatchesIndex; // Current index in the set of right matches.
        internal int _outputLoopCount; // Number of left elements probed so far; drives cancellation polling.
    }

    //---------------------------------------------------------------------------------------
    // Instantiates a new hash-join enumerator.
    //
    // Arguments:
    //     leftSource           - the left (outer) source; probed against the hash table
    //     rightSource          - the right (inner) source; used to build the hash table
    //     singleResultSelector - result selector for Join (may be null for GroupJoin)
    //     groupResultSelector  - result selector for GroupJoin (may be null for Join)
    //     keyComparer          - optional comparer handed to the hash lookup
    //     cancellationToken    - token polled during the build and probe loops
    //
    // At least one of the two selectors must be non-null.
    //

    internal HashJoinQueryOperatorEnumerator(
        QueryOperatorEnumerator<Pair<TLeftInput, THashKey>, TLeftKey> leftSource,
        QueryOperatorEnumerator<Pair<TRightInput, THashKey>, int> rightSource,
        Func<TLeftInput, TRightInput, TOutput> singleResultSelector,
        Func<TLeftInput, IEnumerable<TRightInput>, TOutput> groupResultSelector,
        IEqualityComparer<THashKey> keyComparer,
        CancellationToken cancellationToken)
    {
        Debug.Assert(leftSource != null);
        Debug.Assert(rightSource != null);
        Debug.Assert(singleResultSelector != null || groupResultSelector != null);

        _leftSource = leftSource;
        _rightSource = rightSource;
        _singleResultSelector = singleResultSelector;
        _groupResultSelector = groupResultSelector;
        _keyComparer = keyComparer;
        _cancellationToken = cancellationToken;
    }

    //---------------------------------------------------------------------------------------
    // MoveNext implements all the hash-join logic. When it is called first, it will execute
    // the entire inner query tree, and build a hash-table lookup. This is the Building
    // phase. Then for the first call and all subsequent calls to MoveNext, we will
    // incrementally perform the Probing phase. We'll keep getting elements from the outer
    // data source, looking into the hash-table we built, and enumerating the full results.
    //
    // This routine supports both inner and outer (group) joins. An outer join will yield a
    // (possibly empty) list of matching elements from the inner instead of one-at-a-time,
    // as we do for inner joins.
    //
    // Returns true and fills in currentElement/currentKey when a result was produced;
    // returns false once the left source is exhausted.
    //

    internal override bool MoveNext(ref TOutput currentElement, ref TLeftKey currentKey)
    {
        Debug.Assert(_singleResultSelector != null || _groupResultSelector != null, "expected a compiled result selector");
        Debug.Assert(_leftSource != null);
        Debug.Assert(_rightSource != null);

        // BUILD phase: If we haven't built the hash-table yet, create that first.
        Mutables mutables = _mutables;
        if (mutables == null)
        {
            mutables = _mutables = new Mutables();

            // The lookup is published on the mutables before it is filled, exactly as the
            // state would look if the build were interrupted by cancellation.
            mutables._rightHashLookup = new HashLookup<THashKey, Pair<TRightInput, ListChunk<TRightInput>>>(_keyComparer);
            PopulateRightHashLookup(mutables._rightHashLookup);
        }

        // PROBE phase: So long as the source has a next element, return the match.
        // If the current chunk of matches is used up, step to the next chunk in the chain.
        ListChunk<TRightInput> currentRightChunk = mutables._currentRightMatches;
        if (currentRightChunk != null && mutables._currentRightMatchesIndex == currentRightChunk.Count)
        {
            mutables._currentRightMatches = currentRightChunk.Next;
            mutables._currentRightMatchesIndex = 0;
        }

        if (mutables._currentRightMatches == null)
        {
            // We have to look up the next list of matches in the hash-table.
            Pair<TLeftInput, THashKey> leftPair = default(Pair<TLeftInput, THashKey>);
            TLeftKey leftKey = default(TLeftKey);
            while (_leftSource.MoveNext(ref leftPair, ref leftKey))
            {
                if ((mutables._outputLoopCount++ & CancellationState.POLL_INTERVAL) == 0)
                {
                    CancellationState.ThrowIfCanceled(_cancellationToken);
                }

                // Find the match in the hash table.
                Pair<TRightInput, ListChunk<TRightInput>> matchValue = default(Pair<TRightInput, ListChunk<TRightInput>>);
                TLeftInput leftElement = leftPair.First;
                THashKey leftHashKey = leftPair.Second;

                // Ignore null keys.
                if (leftHashKey != null)
                {
                    if (mutables._rightHashLookup.TryGetValue(leftHashKey, ref matchValue))
                    {
                        // We found a new match. For inner joins, we remember the list in case
                        // there are multiple values under this same key -- the next iteration
                        // will pick them up. For outer joins, we will use the list momentarily.
                        if (_singleResultSelector != null)
                        {
                            mutables._currentRightMatches = matchValue.Second;
                            Debug.Assert(mutables._currentRightMatches == null || mutables._currentRightMatches.Count > 0,
                                            "we were expecting that the list would be either null or non-empty");
                            mutables._currentRightMatchesIndex = 0;

                            // Yield the value.
                            currentElement = _singleResultSelector(leftElement, matchValue.First);
                            currentKey = leftKey;

                            // If there is a list of matches, remember the left values for next time.
                            if (matchValue.Second != null)
                            {
                                mutables._currentLeft = leftElement;
                                mutables._currentLeftKey = leftKey;
                            }

                            return true;
                        }
                    }
                }

                // For outer joins, we always yield a result.
                if (_groupResultSelector != null)
                {
                    // Grab the matches, or create an empty list if there are none.
                    IEnumerable<TRightInput> matches = matchValue.Second;
                    if (matches == null)
                    {
                        matches = ParallelEnumerable.Empty<TRightInput>();
                    }

                    // Generate the current value.
                    currentElement = _groupResultSelector(leftElement, matches);
                    currentKey = leftKey;
                    return true;
                }
            }

            // If we've reached the end of the data source, we're done.
            return false;
        }

        // Produce the next element and increment our index within the matches.
        Debug.Assert(_singleResultSelector != null);
        Debug.Assert(mutables._currentRightMatches != null);
        Debug.Assert(0 <= mutables._currentRightMatchesIndex && mutables._currentRightMatchesIndex < mutables._currentRightMatches.Count);

        currentElement = _singleResultSelector(
            mutables._currentLeft, mutables._currentRightMatches._chunk[mutables._currentRightMatchesIndex]);
        currentKey = mutables._currentLeftKey;

        mutables._currentRightMatchesIndex++;

        return true;
    }

    //---------------------------------------------------------------------------------------
    // BUILD phase helper: drains the right (inner) source and stores every element with a
    // non-null key in the supplied lookup. The first element seen for a key is kept in the
    // pair's First slot; later elements with the same key go into the pair's chunk list.
    // For group joins the first element is also placed in the list.
    //

    private void PopulateRightHashLookup(HashLookup<THashKey, Pair<TRightInput, ListChunk<TRightInput>>> rightHashLookup)
    {
#if DEBUG
        int hashLookupCount = 0;
        int hashKeyCollisions = 0;
#endif

        Pair<TRightInput, THashKey> rightPair = default(Pair<TRightInput, THashKey>);
        int rightKeyUnused = default(int);
        int i = 0;
        while (_rightSource.MoveNext(ref rightPair, ref rightKeyUnused))
        {
            if ((i++ & CancellationState.POLL_INTERVAL) == 0)
            {
                CancellationState.ThrowIfCanceled(_cancellationToken);
            }

            TRightInput rightElement = rightPair.First;
            THashKey rightHashKey = rightPair.Second;

            // We ignore null keys.
            if (rightHashKey == null)
            {
                continue;
            }

#if DEBUG
            hashLookupCount++;
#endif

            // See if we've already stored an element under the current key. If not, we
            // lazily allocate a pair to hold the elements mapping to the same key.
            Pair<TRightInput, ListChunk<TRightInput>> currentValue = default(Pair<TRightInput, ListChunk<TRightInput>>);
            if (!rightHashLookup.TryGetValue(rightHashKey, ref currentValue))
            {
                currentValue = new Pair<TRightInput, ListChunk<TRightInput>>(rightElement, null);

                if (_groupResultSelector != null)
                {
                    // For group joins, we also add the element to the list. This makes
                    // it easier later to yield the list as-is.
                    currentValue.Second = new ListChunk<TRightInput>(InitialChunkSize);
                    currentValue.Second.Add(rightElement);
                }

                rightHashLookup.Add(rightHashKey, currentValue);
            }
            else
            {
                if (currentValue.Second == null)
                {
                    // Lazily allocate a list to hold all but the 1st value. We need to
                    // re-store this element because the pair is a value type.
                    currentValue.Second = new ListChunk<TRightInput>(InitialChunkSize);
                    rightHashLookup[rightHashKey] = currentValue;
                }

                currentValue.Second.Add(rightElement);
#if DEBUG
                hashKeyCollisions++;
#endif
            }
        }

#if DEBUG
        TraceHelpers.TraceInfo("ParallelJoinQueryOperator::MoveNext - built hash table [count = {0}, collisions = {1}]",
            hashLookupCount, hashKeyCollisions);
#endif
    }

    //---------------------------------------------------------------------------------------
    // Disposes both child enumerators. The right source is disposed even if disposing the
    // left source throws, so a failure on one side cannot leak the other.
    //

    protected override void Dispose(bool disposing)
    {
        Debug.Assert(_leftSource != null && _rightSource != null);
        try
        {
            _leftSource.Dispose();
        }
        finally
        {
            _rightSource.Dispose();
        }
    }
}
266 }
267