// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// HashJoinQueryOperatorEnumerator.cs
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

namespace System.Linq.Parallel
{
    /// <summary>
    /// This enumerator implements the hash-join algorithm for one partition.
    ///
    /// On the first call to MoveNext the entire right (inner) source is drained into a
    /// hash lookup keyed by join key (the BUILD phase). That call and every later call then
    /// pull elements from the left (outer) source and look their keys up in that table
    /// (the PROBE phase). Elements whose join key is null never match anything.
    ///
    /// Assumptions:
    ///     This enumerator type won't work properly at all if the analysis engine didn't
    ///     ensure a proper hash-partition. We expect inner and outer elements with equal
    ///     keys are ALWAYS in the same partition. If they aren't (e.g. if the analysis is
    ///     busted) we'll silently drop items on the floor. :(
    ///
    ///
    /// This is the enumerator class for two operators:
    ///     - Join: the single result selector is used, yielding one output per matching
    ///       (left, right) pair.
    ///     - GroupJoin: the group result selector is used, yielding one output per left
    ///       element together with its (possibly empty) group of right matches.
    /// </summary>
    /// <typeparam name="TLeftInput">Element type of the left (outer, probing) source.</typeparam>
    /// <typeparam name="TLeftKey">
    /// Order key type of the left source; the key of the left element is reported as the
    /// key of every output element derived from it.
    /// </typeparam>
    /// <typeparam name="TRightInput">Element type of the right (inner, building) source.</typeparam>
    /// <typeparam name="THashKey">
    /// Join key type; both sources deliver it as the second component of each pair.
    /// </typeparam>
    /// <typeparam name="TOutput">Type produced by the result selector.</typeparam>
    internal class HashJoinQueryOperatorEnumerator<TLeftInput, TLeftKey, TRightInput, THashKey, TOutput>
        : QueryOperatorEnumerator<TOutput, TLeftKey>
    {
        private readonly QueryOperatorEnumerator<Pair<TLeftInput, THashKey>, TLeftKey> _leftSource; // Left (outer) data source. For probing.
        private readonly QueryOperatorEnumerator<Pair<TRightInput, THashKey>, int> _rightSource; // Right (inner) data source. For building.
        private readonly Func<TLeftInput, TRightInput, TOutput> _singleResultSelector; // Single result selector (Join).
        private readonly Func<TLeftInput, IEnumerable<TRightInput>, TOutput> _groupResultSelector; // Group result selector (GroupJoin).
        private readonly IEqualityComparer<THashKey> _keyComparer; // An optional key comparison object.
        private readonly CancellationToken _cancellationToken;

        // Lazily created on the first MoveNext call; null means the BUILD phase has not run yet.
        private Mutables _mutables;

        private sealed class Mutables
        {
            internal TLeftInput _currentLeft; // The left element whose remaining right matches are being yielded.
            internal TLeftKey _currentLeftKey; // The order key of _currentLeft.
            internal HashLookup<THashKey, Pair<TRightInput, ListChunk<TRightInput>>> _rightHashLookup; // The hash lookup built from the right source.
            internal ListChunk<TRightInput> _currentRightMatches; // Chunk of right matches currently being yielded (if any).
            internal int _currentRightMatchesIndex; // Index of the next element to yield within _currentRightMatches.
            internal int _outputLoopCount; // Number of left elements pulled so far; paces cancellation polling.
        }

        //---------------------------------------------------------------------------------------
        // Instantiates a new hash-join enumerator. At least one of the two result selectors
        // must be non-null; which one is set decides between Join and GroupJoin semantics.
        //

        internal HashJoinQueryOperatorEnumerator(
            QueryOperatorEnumerator<Pair<TLeftInput, THashKey>, TLeftKey> leftSource,
            QueryOperatorEnumerator<Pair<TRightInput, THashKey>, int> rightSource,
            Func<TLeftInput, TRightInput, TOutput> singleResultSelector,
            Func<TLeftInput, IEnumerable<TRightInput>, TOutput> groupResultSelector,
            IEqualityComparer<THashKey> keyComparer,
            CancellationToken cancellationToken)
        {
            Debug.Assert(leftSource != null);
            Debug.Assert(rightSource != null);
            Debug.Assert(singleResultSelector != null || groupResultSelector != null);

            _leftSource = leftSource;
            _rightSource = rightSource;
            _singleResultSelector = singleResultSelector;
            _groupResultSelector = groupResultSelector;
            _keyComparer = keyComparer;
            _cancellationToken = cancellationToken;
        }

        //---------------------------------------------------------------------------------------
        // MoveNext implements all the hash-join logic. When it is called first, it executes the
        // entire inner query tree and builds a hash-table lookup (the BUILD phase). Then, for
        // the first call and all subsequent calls, it incrementally performs the PROBE phase:
        // it keeps getting elements from the outer data source, looks each one up in the
        // hash-table, and yields the results.
        //
        // This routine supports both Join and GroupJoin. A GroupJoin yields exactly one result
        // per left element, built from the (possibly empty) list of matching right elements;
        // a Join yields one result per matching (left, right) pair, one at a time.
        //
        // Returns false once the left source is exhausted.
        //

        internal override bool MoveNext(ref TOutput currentElement, ref TLeftKey currentKey)
        {
            Debug.Assert(_singleResultSelector != null || _groupResultSelector != null, "expected a compiled result selector");
            Debug.Assert(_leftSource != null);
            Debug.Assert(_rightSource != null);

            // BUILD phase: If we haven't built the hash-table yet, create that first.
            Mutables mutables = _mutables;
            if (mutables == null)
            {
                mutables = _mutables = new Mutables();
                BuildRightHashLookup(mutables);
            }

            // PROBE phase. First, if the current chunk of right matches is used up, advance to
            // the next chunk in the list (null when there are no more matches for this left).
            ListChunk<TRightInput> currentRightChunk = mutables._currentRightMatches;
            if (currentRightChunk != null && mutables._currentRightMatchesIndex == currentRightChunk.Count)
            {
                mutables._currentRightMatches = currentRightChunk.Next;
                mutables._currentRightMatchesIndex = 0;
            }

            if (mutables._currentRightMatches == null)
            {
                // No pending matches: look up the next left element in the hash-table.
                Pair<TLeftInput, THashKey> leftPair = default(Pair<TLeftInput, THashKey>);
                TLeftKey leftKey = default(TLeftKey);
                while (_leftSource.MoveNext(ref leftPair, ref leftKey))
                {
                    if ((mutables._outputLoopCount++ & CancellationState.POLL_INTERVAL) == 0)
                        CancellationState.ThrowIfCanceled(_cancellationToken);

                    // Find the match in the hash table. matchValue stays default (no list)
                    // when the key is null or has no match.
                    Pair<TRightInput, ListChunk<TRightInput>> matchValue = default(Pair<TRightInput, ListChunk<TRightInput>>);
                    TLeftInput leftElement = leftPair.First;
                    THashKey leftHashKey = leftPair.Second;

                    // Ignore null keys.
                    if (leftHashKey != null)
                    {
                        if (mutables._rightHashLookup.TryGetValue(leftHashKey, ref matchValue))
                        {
                            // We found a new match. For Join, we remember the list in case there
                            // are multiple values under this same key -- the next iterations will
                            // pick them up. For GroupJoin, the list is used just below.
                            if (_singleResultSelector != null)
                            {
                                mutables._currentRightMatches = matchValue.Second;
                                Debug.Assert(mutables._currentRightMatches == null || mutables._currentRightMatches.Count > 0,
                                    "we were expecting that the list would be either null or non-empty");
                                mutables._currentRightMatchesIndex = 0;

                                // Yield the value for the first right match.
                                currentElement = _singleResultSelector(leftElement, matchValue.First);
                                currentKey = leftKey;

                                // If there are more matches, remember the left values for next time.
                                if (matchValue.Second != null)
                                {
                                    mutables._currentLeft = leftElement;
                                    mutables._currentLeftKey = leftKey;
                                }

                                return true;
                            }
                        }
                    }

                    // For GroupJoin, we always yield a result, even when nothing matched.
                    if (_groupResultSelector != null)
                    {
                        // Grab the matches, or use an empty sequence if there are none.
                        IEnumerable<TRightInput> matches = matchValue.Second;
                        if (matches == null)
                        {
                            matches = ParallelEnumerable.Empty<TRightInput>();
                        }

                        // Generate the current value.
                        currentElement = _groupResultSelector(leftElement, matches);
                        currentKey = leftKey;
                        return true;
                    }
                }

                // If we've reached the end of the data source, we're done.
                return false;
            }

            // Join only: produce the next pending right match for the remembered left element
            // and advance our index within the current chunk.
            Debug.Assert(_singleResultSelector != null);
            Debug.Assert(mutables._currentRightMatches != null);
            Debug.Assert(0 <= mutables._currentRightMatchesIndex && mutables._currentRightMatchesIndex < mutables._currentRightMatches.Count);

            currentElement = _singleResultSelector(
                mutables._currentLeft, mutables._currentRightMatches._chunk[mutables._currentRightMatchesIndex]);
            currentKey = mutables._currentLeftKey;

            mutables._currentRightMatchesIndex++;

            return true;
        }

        //---------------------------------------------------------------------------------------
        // BUILD phase: drains the right (inner) source into mutables._rightHashLookup.
        // Elements with a null hash key are skipped.
        //
        // Each key maps to a Pair whose First is the first right element seen for that key.
        // Its Second is a ListChunk that:
        //   - for GroupJoin, holds ALL right elements for the key (including the first), so the
        //     list can later be handed to the group result selector as-is;
        //   - for Join, is null while the key has a single element and otherwise holds every
        //     element after the first.
        //
        // The lookup is stored on mutables before draining starts, as in the original inline code.
        //

        private void BuildRightHashLookup(Mutables mutables)
        {
#if DEBUG
            int hashLookupCount = 0;
            int hashKeyCollisions = 0;
#endif
            mutables._rightHashLookup = new HashLookup<THashKey, Pair<TRightInput, ListChunk<TRightInput>>>(_keyComparer);

            Pair<TRightInput, THashKey> rightPair = default(Pair<TRightInput, THashKey>);
            int rightKeyUnused = default(int);
            int i = 0;
            while (_rightSource.MoveNext(ref rightPair, ref rightKeyUnused))
            {
                if ((i++ & CancellationState.POLL_INTERVAL) == 0)
                    CancellationState.ThrowIfCanceled(_cancellationToken);

                TRightInput rightElement = rightPair.First;
                THashKey rightHashKey = rightPair.Second;

                // We ignore null keys.
                if (rightHashKey == null)
                {
                    continue;
                }

#if DEBUG
                hashLookupCount++;
#endif

                // See if we've already stored an element under the current key. If not, we
                // lazily allocate a pair to hold the elements mapping to the same key.
                const int INITIAL_CHUNK_SIZE = 2;
                Pair<TRightInput, ListChunk<TRightInput>> currentValue = default(Pair<TRightInput, ListChunk<TRightInput>>);
                if (!mutables._rightHashLookup.TryGetValue(rightHashKey, ref currentValue))
                {
                    currentValue = new Pair<TRightInput, ListChunk<TRightInput>>(rightElement, null);

                    if (_groupResultSelector != null)
                    {
                        // For GroupJoin, we also add the first element to the list. This makes
                        // it easier later to yield the list as-is.
                        currentValue.Second = new ListChunk<TRightInput>(INITIAL_CHUNK_SIZE);
                        currentValue.Second.Add(rightElement);
                    }

                    mutables._rightHashLookup.Add(rightHashKey, currentValue);
                }
                else
                {
                    if (currentValue.Second == null)
                    {
                        // Lazily allocate a list to hold all but the 1st value. We need to
                        // re-store the pair because it is a value type.
                        currentValue.Second = new ListChunk<TRightInput>(INITIAL_CHUNK_SIZE);
                        mutables._rightHashLookup[rightHashKey] = currentValue;
                    }

                    currentValue.Second.Add(rightElement);
#if DEBUG
                    hashKeyCollisions++;
#endif
                }
            }

#if DEBUG
            TraceHelpers.TraceInfo("ParallelJoinQueryOperator::MoveNext - built hash table [count = {0}, collisions = {1}]",
                hashLookupCount, hashKeyCollisions);
#endif
        }

        //---------------------------------------------------------------------------------------
        // Disposes both the left and the right source enumerators.
        //

        protected override void Dispose(bool disposing)
        {
            Debug.Assert(_leftSource != null && _rightSource != null);
            _leftSource.Dispose();
            _rightSource.Dispose();
        }
    }
}