// ==++==
//
//   Copyright (c) Microsoft Corporation.  All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// GroupJoinQueryOperator.cs
//
// <OWNER>Microsoft</OWNER>
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading;

namespace System.Linq.Parallel
{
    /// <summary>
    /// Group join operator. Each element of the left (outer) query is handed to the
    /// result selector together with the sequence of right (inner) elements that share
    /// its key. An outer element with no matching inner elements still produces a
    /// result -- its group is simply empty -- which is what makes this operator usable
    /// for outer joins. The parallel implementation is a hash join: both inputs are
    /// hash-repartitioned on their keys and matching partitions are joined pairwise.
    /// </summary>
    /// <typeparam name="TLeftInput">Element type of the outer (left) data source.</typeparam>
    /// <typeparam name="TRightInput">Element type of the inner (right) data source.</typeparam>
    /// <typeparam name="TKey">Type of the join key produced by both key selectors.</typeparam>
    /// <typeparam name="TOutput">Type produced by the result selector.</typeparam>
    internal sealed class GroupJoinQueryOperator<TLeftInput, TRightInput, TKey, TOutput> : BinaryQueryOperator<TLeftInput, TRightInput, TOutput>
    {

        private readonly Func<TLeftInput, TKey> m_leftKeySelector; // Extracts the join key from an outer (left) element.
        private readonly Func<TRightInput, TKey> m_rightKeySelector; // Extracts the join key from an inner (right) element.
        private readonly Func<TLeftInput, IEnumerable<TRightInput>, TOutput> m_resultSelector; // Projects an outer element plus its group of inner matches.
        private readonly IEqualityComparer<TKey> m_keyComparer; // Key equality comparer; not asserted non-null, so it may be absent.

        //---------------------------------------------------------------------------------------
        // Constructs a new group join operator over the two child queries.
        //
        // The operator's output is ordered exactly when the left child's output is ordered,
        // and its ordinal index state is set to Shuffled.
        //

        internal GroupJoinQueryOperator(
            ParallelQuery<TLeftInput> left,
            ParallelQuery<TRightInput> right,
            Func<TLeftInput, TKey> leftKeySelector,
            Func<TRightInput, TKey> rightKeySelector,
            Func<TLeftInput, IEnumerable<TRightInput>, TOutput> resultSelector,
            IEqualityComparer<TKey> keyComparer)
            : base(left, right)
        {
            Contract.Assert(left != null && right != null, "child data sources cannot be null");
            Contract.Assert(leftKeySelector != null, "left key selector must not be null");
            Contract.Assert(rightKeySelector != null, "right key selector must not be null");
            Contract.Assert(resultSelector != null, "need a result selector function");

            m_keyComparer = keyComparer;
            m_resultSelector = resultSelector;
            m_rightKeySelector = rightKeySelector;
            m_leftKeySelector = leftKeySelector;

            // Ordering is inherited from the outer (left) input only.
            m_outputOrdered = LeftChild.OutputOrdered;

            SetOrdinalIndex(OrdinalIndexState.Shuffled);
        }

        //---------------------------------------------------------------------------------------
        // Opens this operator: opens the left child, then the right child, and wraps the
        // two result sets in a BinaryQueryOperatorResults. Striping is never requested
        // here -- 'false' is passed to both children and to the results object regardless
        // of the incoming preferStriping value.
        //

        internal override QueryResults<TOutput> Open(QuerySettings settings, bool preferStriping)
        {
            QueryResults<TLeftInput> outerResults = LeftChild.Open(settings, false);
            QueryResults<TRightInput> innerResults = RightChild.Open(settings, false);

            return new BinaryQueryOperatorResults(outerResults, innerResults, this, settings, false);
        }

        //---------------------------------------------------------------------------------------
        // Hash-repartitions the left stream on the left key and forwards to the helper.
        //
        // When the left child is ordered, the ordered repartitioning is used so that the
        // left stream keeps its TLeftKey order keys; otherwise the plain repartitioning is
        // used and the left stream is keyed by int.
        //

        public override void WrapPartitionedStream<TLeftKey, TRightKey>(
            PartitionedStream<TLeftInput, TLeftKey> leftStream, PartitionedStream<TRightInput, TRightKey> rightStream,
            IPartitionedStreamRecipient<TOutput> outputRecipient, bool preferStriping, QuerySettings settings)
        {
            Contract.Assert(rightStream.PartitionCount == leftStream.PartitionCount);

            int partitions = leftStream.PartitionCount;
            CancellationToken mergedToken = settings.CancellationState.MergedCancellationToken;

            if (LeftChild.OutputOrdered)
            {
                PartitionedStream<Pair<TLeftInput, TKey>, TLeftKey> orderedOuter =
                    ExchangeUtilities.HashRepartitionOrdered(leftStream, m_leftKeySelector, m_keyComparer, null, mergedToken);

                WrapPartitionedStreamHelper<TLeftKey, TRightKey>(
                    orderedOuter, rightStream, outputRecipient, partitions, mergedToken);
                return;
            }

            PartitionedStream<Pair<TLeftInput, TKey>, int> unorderedOuter =
                ExchangeUtilities.HashRepartition(leftStream, m_leftKeySelector, m_keyComparer, null, mergedToken);

            WrapPartitionedStreamHelper<int, TRightKey>(
                unorderedOuter, rightStream, outputRecipient, partitions, mergedToken);
        }

        //---------------------------------------------------------------------------------------
        // Helper for WrapPartitionedStream, which picks the left key type and passes it
        // here as the TLeftKey generic argument.
        //
        // Hash-repartitions the right stream on the right key, builds one hash join
        // enumerator per partition, and hands the resulting stream to the recipient.
        //

        private void WrapPartitionedStreamHelper<TLeftKey, TRightKey>(
            PartitionedStream<Pair<TLeftInput, TKey>, TLeftKey> leftHashStream, PartitionedStream<TRightInput, TRightKey> rightPartitionedStream,
            IPartitionedStreamRecipient<TOutput> outputRecipient, int partitionCount, CancellationToken cancellationToken)
        {
            // The inner side always uses the plain (int-keyed) repartitioning.
            PartitionedStream<Pair<TRightInput, TKey>, int> innerHashStream =
                ExchangeUtilities.HashRepartition(
                    rightPartitionedStream, m_rightKeySelector, m_keyComparer, null, cancellationToken);

            // The output keeps the left stream's key type and key comparer.
            PartitionedStream<TOutput, TLeftKey> joinedStream =
                new PartitionedStream<TOutput, TLeftKey>(partitionCount, leftHashStream.KeyComparer, OrdinalIndexState);

            for (int partitionIndex = 0; partitionIndex < partitionCount; partitionIndex++)
            {
                // NOTE(review): the third argument is null and m_resultSelector is passed fourth --
                // presumably this selects the group-join mode of the enumerator; confirm against
                // HashJoinQueryOperatorEnumerator's constructor.
                joinedStream[partitionIndex] =
                    new HashJoinQueryOperatorEnumerator<TLeftInput, TLeftKey, TRightInput, TKey, TOutput>(
                        leftHashStream[partitionIndex],
                        innerHashStream[partitionIndex],
                        null,
                        m_resultSelector,
                        m_keyComparer,
                        cancellationToken);
            }

            outputRecipient.Receive(joinedStream);
        }

        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially: both
        // children's sequential queries are wrapped with the cancellation token and then
        // combined with the sequential GroupJoin.
        //

        internal override IEnumerable<TOutput> AsSequentialQuery(CancellationToken token)
        {
            IEnumerable<TLeftInput> outerSequence = CancellableEnumerable.Wrap(LeftChild.AsSequentialQuery(token), token);
            IEnumerable<TRightInput> innerSequence = CancellableEnumerable.Wrap(RightChild.AsSequentialQuery(token), token);

            return outerSequence.GroupJoin(
                innerSequence,
                m_leftKeySelector,
                m_rightKeySelector,
                m_resultSelector,
                m_keyComparer);
        }

        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge that would not be performed in
        // a similar sequential operation (i.e., in LINQ to Objects). It does not.
        //

        internal override bool LimitsParallelism
        {
            get { return false; }
        }
    }
}