1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
4 
5 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
6 //
7 // ConcatQueryOperator.cs
8 //
9 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
10 
11 using System.Collections.Generic;
12 using System.Diagnostics;
13 using System.Threading;
14 
15 namespace System.Linq.Parallel
16 {
17     /// <summary>
18     /// Concatenates one data source with another.  Order preservation is used to ensure
19     /// the output is actually a concatenation -- i.e. one after the other.  Each output
20     /// element carries a ConcatKey that records whether it came from the first or the
21     /// second data source, together with the element's original ordering key.  The key
22     /// comparer sorts every first-source key ahead of every second-source key, which makes
23     /// it appear to the order preservation infrastructure as though all elements in the
24     /// second came after all elements in the first, which is precisely what we want.
25     /// </summary>
26     /// <typeparam name="TSource">The type of the elements in both data sources and in the output.</typeparam>
27     internal sealed class ConcatQueryOperator<TSource> : BinaryQueryOperator<TSource, TSource, TSource>
28     {
// Assigned once in the constructor. Each flag is true when the corresponding child's
// ordinal index state is worse than Increasing; WrapPartitionedStream / WrapHelper then
// collect that child's results before wiring up the concatenation.
private readonly bool _prematureMergeLeft;
private readonly bool _prematureMergeRight;
31 
32         //---------------------------------------------------------------------------------------
33         // Initializes a new concatenation operator.
34         //
35         // Arguments:
36         //     firstChild, secondChild - the two data sources to concatenate, in that order
37         //
38 
/// <summary>
/// Builds a concatenation operator over two child queries.
/// </summary>
/// <param name="firstChild">The left data source; debug-asserted to be non-null.</param>
/// <param name="secondChild">The right data source; debug-asserted to be non-null.</param>
internal ConcatQueryOperator(ParallelQuery<TSource> firstChild, ParallelQuery<TSource> secondChild)
    : base(firstChild, secondChild)
{
    Debug.Assert(firstChild != null, "first child data source cannot be null");
    Debug.Assert(secondChild != null, "second child data source cannot be null");

    // The output counts as ordered as soon as either child is ordered.
    _outputOrdered = LeftChild.OutputOrdered || RightChild.OutputOrdered;

    OrdinalIndexState leftState = LeftChild.OrdinalIndexState;
    OrdinalIndexState rightState = RightChild.OrdinalIndexState;

    // A child whose index state is worse than Increasing has to be merged up front.
    _prematureMergeLeft = leftState.IsWorseThan(OrdinalIndexState.Increasing);
    _prematureMergeRight = rightState.IsWorseThan(OrdinalIndexState.Increasing);

    // Indexable only when both children are; otherwise never better than Increasing.
    bool bothIndexable =
        leftState == OrdinalIndexState.Indexable && rightState == OrdinalIndexState.Indexable;

    OrdinalIndexState combinedState = bothIndexable
        ? OrdinalIndexState.Indexable
        : ExchangeUtilities.Worse(
            OrdinalIndexState.Increasing,
            ExchangeUtilities.Worse(leftState, rightState));

    SetOrdinalIndex(combinedState);
}
61 
62         //---------------------------------------------------------------------------------------
63         // Just opens the current operator, including opening the child and wrapping it with
64         // partitions as needed.
65         //
66 
/// <summary>
/// Opens the left child, then the right child, and hands both result sets to
/// <c>ConcatQueryOperatorResults.NewResults</c> to build this operator's results.
/// </summary>
/// <param name="settings">Query settings, forwarded unchanged to both children.</param>
/// <param name="preferStriping">Striping preference, forwarded unchanged to both children.</param>
/// <returns>The results object produced by <c>NewResults</c>.</returns>
internal override QueryResults<TSource> Open(QuerySettings settings, bool preferStriping)
{
    QueryResults<TSource> openedLeft = LeftChild.Open(settings, preferStriping);
    QueryResults<TSource> openedRight = RightChild.Open(settings, preferStriping);

    return ConcatQueryOperatorResults.NewResults(
        openedLeft,
        openedRight,
        this,
        settings,
        preferStriping);
}
75 
/// <summary>
/// First stage of wiring up the concatenation: deals with the left stream, then defers
/// to <c>WrapHelper</c> for the right stream.
/// </summary>
/// <remarks>
/// When the left child was flagged for a premature merge in the constructor, its stream
/// is run through <c>ExecuteAndCollectResults</c> and replaced by the int-keyed
/// partitioned stream obtained from the collected results. Otherwise the stream is
/// passed on as-is.
/// </remarks>
public override void WrapPartitionedStream<TLeftKey, TRightKey>(
    PartitionedStream<TSource, TLeftKey> leftStream, PartitionedStream<TSource, TRightKey> rightStream,
    IPartitionedStreamRecipient<TSource> outputRecipient, bool preferStriping, QuerySettings settings)
{
    if (!_prematureMergeLeft)
    {
        // No merge needed: the left stream is expected to be at least Increasing.
        Debug.Assert(!ExchangeUtilities.IsWorseThan(leftStream.OrdinalIndexState, OrdinalIndexState.Increasing));
        WrapHelper<TLeftKey, TRightKey>(leftStream, rightStream, outputRecipient, settings, preferStriping);
        return;
    }

    // Collect the left results, then continue with the int-keyed stream built from them.
    ListQueryResults<TSource> collectedLeft = ExecuteAndCollectResults(
        leftStream, leftStream.PartitionCount, LeftChild.OutputOrdered, preferStriping, settings);
    PartitionedStream<TSource, int> mergedLeftStream = collectedLeft.GetPartitionedStream();

    WrapHelper<int, TRightKey>(mergedLeftStream, rightStream, outputRecipient, settings, preferStriping);
}
94 
/// <summary>
/// Second stage of wiring up the concatenation: deals with the right stream, then
/// defers to <c>WrapHelper2</c> to build the output stream.
/// </summary>
/// <remarks>
/// When the right child was flagged for a premature merge in the constructor, its stream
/// is run through <c>ExecuteAndCollectResults</c> (using the left stream's partition
/// count) and replaced by the int-keyed partitioned stream obtained from the collected
/// results. Otherwise the stream is passed on as-is.
/// </remarks>
private void WrapHelper<TLeftKey, TRightKey>(
    PartitionedStream<TSource, TLeftKey> leftStreamInc, PartitionedStream<TSource, TRightKey> rightStream,
    IPartitionedStreamRecipient<TSource> outputRecipient, QuerySettings settings, bool preferStriping)
{
    if (!_prematureMergeRight)
    {
        // No merge needed: the right stream is expected to be at least Increasing.
        Debug.Assert(!ExchangeUtilities.IsWorseThan(rightStream.OrdinalIndexState, OrdinalIndexState.Increasing));
        WrapHelper2<TLeftKey, TRightKey>(leftStreamInc, rightStream, outputRecipient);
        return;
    }

    // NOTE(review): the ordering flag passed here is LeftChild.OutputOrdered even though the
    // stream being collected is the right one. This may be intentional or a copy-paste of the
    // left-hand case -- confirm whether RightChild.OutputOrdered was meant. Behavior is unchanged.
    ListQueryResults<TSource> collectedRight = ExecuteAndCollectResults(
        rightStream, leftStreamInc.PartitionCount, LeftChild.OutputOrdered, preferStriping, settings);
    PartitionedStream<TSource, int> mergedRightStream = collectedRight.GetPartitionedStream();

    WrapHelper2<TLeftKey, int>(leftStreamInc, mergedRightStream, outputRecipient);
}
113 
/// <summary>
/// Final stage of wiring up the concatenation: pairs partition i of the left stream with
/// partition i of the right stream in a concatenating enumerator, and delivers the
/// resulting stream to <paramref name="outputRecipient"/>.
/// </summary>
/// <remarks>
/// The output stream has as many partitions as the left stream, is keyed by
/// <c>ConcatKey</c>, and carries this operator's ordinal index state.
/// </remarks>
private void WrapHelper2<TLeftKey, TRightKey>(
    PartitionedStream<TSource, TLeftKey> leftStreamInc, PartitionedStream<TSource, TRightKey> rightStreamInc,
    IPartitionedStreamRecipient<TSource> outputRecipient)
{
    int count = leftStreamInc.PartitionCount;

    // The combined comparer orders left keys before right keys.
    IComparer<ConcatKey<TLeftKey, TRightKey>> keyComparer =
        ConcatKey<TLeftKey, TRightKey>.MakeComparer(leftStreamInc.KeyComparer, rightStreamInc.KeyComparer);

    PartitionedStream<TSource, ConcatKey<TLeftKey, TRightKey>> concatenated =
        new PartitionedStream<TSource, ConcatKey<TLeftKey, TRightKey>>(count, keyComparer, OrdinalIndexState);

    for (int partition = 0; partition < count; partition++)
    {
        QueryOperatorEnumerator<TSource, TLeftKey> leftPartition = leftStreamInc[partition];
        QueryOperatorEnumerator<TSource, TRightKey> rightPartition = rightStreamInc[partition];
        concatenated[partition] =
            new ConcatQueryOperatorEnumerator<TLeftKey, TRightKey>(leftPartition, rightPartition);
    }

    outputRecipient.Receive(concatenated);
}
132 
133 
134         //---------------------------------------------------------------------------------------
135         // Returns an enumerable that represents the query executing sequentially.
136         //
137 
/// <summary>
/// Builds the sequential form of this query: the left child's sequential query
/// followed by the right child's, joined with LINQ's <c>Concat</c>.
/// </summary>
/// <param name="token">Cancellation token, forwarded to both children.</param>
internal override IEnumerable<TSource> AsSequentialQuery(CancellationToken token)
{
    IEnumerable<TSource> sequentialLeft = LeftChild.AsSequentialQuery(token);
    IEnumerable<TSource> sequentialRight = RightChild.AsSequentialQuery(token);

    return sequentialLeft.Concat(sequentialRight);
}
142 
143 
144         //---------------------------------------------------------------------------------------
145         // Whether this operator performs a premature merge that would not be performed in
146         // a similar sequential operation (i.e., in LINQ to Objects).
147         //
148 
/// <summary>
/// Always <c>false</c> for this operator.
/// </summary>
internal override bool LimitsParallelism
{
    get
    {
        return false;
    }
}
153 
154         //---------------------------------------------------------------------------------------
155         // The enumerator type responsible for concatenating two data sources.
156         //
157 
/// <summary>
/// Enumerates one partition of the concatenation: every element of the first source,
/// followed by every element of the second source. Each element is tagged with a
/// <c>ConcatKey</c> recording which source it came from and its key in that source.
/// </summary>
private sealed class ConcatQueryOperatorEnumerator<TLeftKey, TRightKey> : QueryOperatorEnumerator<TSource, ConcatKey<TLeftKey, TRightKey>>
{
    private readonly QueryOperatorEnumerator<TSource, TLeftKey> _firstSource; // The first data source to enumerate.
    private readonly QueryOperatorEnumerator<TSource, TRightKey> _secondSource; // The second data source to enumerate.
    private bool _begunSecond; // Whether this partition has begun enumerating the second source yet.

    //---------------------------------------------------------------------------------------
    // Instantiates a new concat enumerator over the two given partition enumerators.
    //

    internal ConcatQueryOperatorEnumerator(
        QueryOperatorEnumerator<TSource, TLeftKey> firstSource,
        QueryOperatorEnumerator<TSource, TRightKey> secondSource)
    {
        Debug.Assert(firstSource != null);
        Debug.Assert(secondSource != null);

        _firstSource = firstSource;
        _secondSource = secondSource;
    }

    //---------------------------------------------------------------------------------------
    // MoveNext advances to the next element in the output.  While the first data source has
    // elements, this consists of just advancing through it and tagging each key as a "left"
    // key.  Once the first source is exhausted, this partition switches (permanently) to the
    // second data source and tags each of its keys as a "right" key.  The ConcatKey comparer
    // is what places left-tagged elements ahead of right-tagged ones.
    //
    // Returns true if an element was produced into currentElement / currentKey, false once
    // both sources are exhausted.
    //

    internal override bool MoveNext(ref TSource currentElement, ref ConcatKey<TLeftKey, TRightKey> currentKey)
    {
        Debug.Assert(_firstSource != null);
        Debug.Assert(_secondSource != null);

        // If we are still enumerating the first source, fetch the next item.
        if (!_begunSecond)
        {
            // If elements remain, just return true and continue enumerating the left.
            TLeftKey leftKey = default(TLeftKey);
            if (_firstSource.MoveNext(ref currentElement, ref leftKey))
            {
                currentKey = ConcatKey<TLeftKey, TRightKey>.MakeLeft(leftKey);
                return true;
            }

            // The first source is exhausted; never call into it again.
            _begunSecond = true;
        }

        // Now either move on to, or continue, enumerating the right data source.
        TRightKey rightKey = default(TRightKey);
        if (_secondSource.MoveNext(ref currentElement, ref rightKey))
        {
            currentKey = ConcatKey<TLeftKey, TRightKey>.MakeRight(rightKey);
            return true;
        }

        return false;
    }

    //---------------------------------------------------------------------------------------
    // Disposes both underlying enumerators.  The second source is disposed in a finally
    // block so that it is still released if disposing the first source throws.
    //

    protected override void Dispose(bool disposing)
    {
        try
        {
            _firstSource.Dispose();
        }
        finally
        {
            _secondSource.Dispose();
        }
    }
}
222 
223 
224         //-----------------------------------------------------------------------------------
225         // Query results for a Concat operator. The results are indexable if the child
226         // results were indexable.
227         //
228 
/// <summary>
/// Indexable query results for a Concat operator. Instances are only created (via
/// <see cref="NewResults"/>) when both child result sets are indexable; element indices
/// below the left child's count map to the left child, the rest map to the right child.
/// </summary>
private sealed class ConcatQueryOperatorResults : BinaryQueryOperatorResults
{
    private readonly int _leftChildCount; // The number of elements in the left child result set
    private readonly int _rightChildCount; // The number of elements in the right child result set

    /// <summary>
    /// Picks the results implementation for a concatenation: an indexable
    /// <c>ConcatQueryOperatorResults</c> when both child results report
    /// <c>IsIndexible</c>, otherwise a plain <c>BinaryQueryOperatorResults</c>.
    /// </summary>
    public static QueryResults<TSource> NewResults(
        QueryResults<TSource> leftChildQueryResults, QueryResults<TSource> rightChildQueryResults,
        ConcatQueryOperator<TSource> op, QuerySettings settings,
        bool preferStriping)
    {
        if (leftChildQueryResults.IsIndexible && rightChildQueryResults.IsIndexible)
        {
            return new ConcatQueryOperatorResults(
                leftChildQueryResults, rightChildQueryResults, op, settings, preferStriping);
        }
        else
        {
            return new BinaryQueryOperatorResults(
                leftChildQueryResults, rightChildQueryResults, op, settings, preferStriping);
        }
    }

    /// <summary>
    /// Captures both children's element counts at construction time. Both child result
    /// sets are debug-asserted to be indexable.
    /// </summary>
    private ConcatQueryOperatorResults(
        QueryResults<TSource> leftChildQueryResults, QueryResults<TSource> rightChildQueryResults,
        ConcatQueryOperator<TSource> concatOp, QuerySettings settings,
        bool preferStriping)
        : base(leftChildQueryResults, rightChildQueryResults, concatOp, settings, preferStriping)
    {
        Debug.Assert(leftChildQueryResults.IsIndexible && rightChildQueryResults.IsIndexible);

        _leftChildCount = leftChildQueryResults.ElementsCount;
        _rightChildCount = rightChildQueryResults.ElementsCount;
    }

    /// <summary>Always <c>true</c> for this results type.</summary>
    internal override bool IsIndexible
    {
        get { return true; }
    }

    /// <summary>The sum of the two child element counts captured in the constructor.</summary>
    internal override int ElementsCount
    {
        get
        {
            Debug.Assert(_leftChildCount >= 0 && _rightChildCount >= 0);
            return _leftChildCount + _rightChildCount;
        }
    }

    /// <summary>
    /// Returns the element at <paramref name="index"/> in the concatenated sequence.
    /// No range validation is performed here; the (possibly shifted) index is passed
    /// straight through to the relevant child result set.
    /// </summary>
    internal override TSource GetElement(int index)
    {
        if (index < _leftChildCount)
        {
            return _leftChildQueryResults.GetElement(index);
        }
        else
        {
            // Past the end of the left results: rebase the index onto the right results.
            return _rightChildQueryResults.GetElement(index - _leftChildCount);
        }
    }
}
289     }
290 
291     //---------------------------------------------------------------------------------------
292     // ConcatKey represents an ordering key for the Concat operator. It knows whether the
293     // element it is associated with is from the left source or the right source, and also
294     // the element's ordering key.
295     //
296 
/// <summary>
/// Ordering key used by the Concat operator. A key records whether its element came from
/// the left or the right source, plus that element's ordering key within its own source.
/// Only the key matching the originating side is meaningful; the other one holds its
/// type's default value.
/// </summary>
/// <typeparam name="TLeftKey">Ordering key type of the left source.</typeparam>
/// <typeparam name="TRightKey">Ordering key type of the right source.</typeparam>
internal struct ConcatKey<TLeftKey, TRightKey>
{
    private readonly TLeftKey _leftKey;
    private readonly TRightKey _rightKey;
    private readonly bool _isLeft;

    private ConcatKey(TLeftKey leftKey, TRightKey rightKey, bool isLeft)
    {
        _leftKey = leftKey;
        _rightKey = rightKey;
        _isLeft = isLeft;
    }

    /// <summary>Creates a key for an element that came from the left source.</summary>
    internal static ConcatKey<TLeftKey, TRightKey> MakeLeft(TLeftKey leftKey)
    {
        return new ConcatKey<TLeftKey, TRightKey>(leftKey, default(TRightKey), isLeft: true);
    }

    /// <summary>Creates a key for an element that came from the right source.</summary>
    internal static ConcatKey<TLeftKey, TRightKey> MakeRight(TRightKey rightKey)
    {
        return new ConcatKey<TLeftKey, TRightKey>(default(TLeftKey), rightKey, isLeft: false);
    }

    /// <summary>
    /// Builds a comparer over concat keys from the comparers of the two underlying key types.
    /// </summary>
    /// <param name="leftComparer">Used when both keys being compared are left keys.</param>
    /// <param name="rightComparer">Used when both keys being compared are right keys.</param>
    internal static IComparer<ConcatKey<TLeftKey, TRightKey>> MakeComparer(
        IComparer<TLeftKey> leftComparer, IComparer<TRightKey> rightComparer)
    {
        return new ConcatKeyComparer(leftComparer, rightComparer);
    }

    //---------------------------------------------------------------------------------------
    // ConcatKeyComparer compares ConcatKeys, so that elements from the left source come
    // before elements from the right source, and elements within each source are ordered
    // according to the corresponding order key.
    //

    private sealed class ConcatKeyComparer : IComparer<ConcatKey<TLeftKey, TRightKey>>
    {
        private readonly IComparer<TLeftKey> _leftComparer;
        private readonly IComparer<TRightKey> _rightComparer;

        internal ConcatKeyComparer(IComparer<TLeftKey> leftComparer, IComparer<TRightKey> rightComparer)
        {
            _leftComparer = leftComparer;
            _rightComparer = rightComparer;
        }

        /// <summary>
        /// Returns -1 / 1 when exactly one key is a left key (the left key sorts first);
        /// otherwise returns whatever the matching side's comparer returns.
        /// </summary>
        public int Compare(ConcatKey<TLeftKey, TRightKey> x, ConcatKey<TLeftKey, TRightKey> y)
        {
            // If one element is from the left source and the other not, the element from the left source
            // comes earlier.
            if (x._isLeft != y._isLeft)
            {
                return x._isLeft ? -1 : 1;
            }

            // Elements are from the same source (left or right). Compare the corresponding keys.
            if (x._isLeft)
            {
                return _leftComparer.Compare(x._leftKey, y._leftKey);
            }
            return _rightComparer.Compare(x._rightKey, y._rightKey);
        }
    }
}
361 }
362