// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// ConcatQueryOperator.cs
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

namespace System.Linq.Parallel
{
    /// <summary>
    /// Concatenates one data source with another. Order preservation is used to ensure
    /// the output is actually a concatenation -- i.e. one after the other. Every output
    /// element is tagged with a ConcatKey that records whether the element came from the
    /// left or the right source, together with the element's original ordering key. The
    /// ConcatKey comparer sorts all left keys before all right keys, which makes it appear
    /// to the order preservation infrastructure as though all elements in the second
    /// source came after all elements in the first, which is precisely what we want.
    /// </summary>
    /// <typeparam name="TSource">The type of the elements in both data sources.</typeparam>
    internal sealed class ConcatQueryOperator<TSource> : BinaryQueryOperator<TSource, TSource, TSource>
    {
        private readonly bool _prematureMergeLeft; // Whether to prematurely merge the left data source
        private readonly bool _prematureMergeRight; // Whether to prematurely merge the right data source

        //---------------------------------------------------------------------------------------
        // Initializes a new concatenation operator.
        //
        // Arguments:
        //     firstChild   - the data source whose elements come first in the output
        //     secondChild  - the data source whose elements come after those of firstChild
        //
        internal ConcatQueryOperator(ParallelQuery<TSource> firstChild, ParallelQuery<TSource> secondChild)
            : base(firstChild, secondChild)
        {
            Debug.Assert(firstChild != null, "first child data source cannot be null");
            Debug.Assert(secondChild != null, "second child data source cannot be null");

            // The output is ordered as soon as either input is ordered.
            _outputOrdered = LeftChild.OutputOrdered || RightChild.OutputOrdered;

            // A child whose ordinal index state is worse than Increasing is merged up front
            // (see WrapPartitionedStream / WrapHelper) before it is concatenated.
            _prematureMergeLeft = LeftChild.OrdinalIndexState.IsWorseThan(OrdinalIndexState.Increasing);
            _prematureMergeRight = RightChild.OrdinalIndexState.IsWorseThan(OrdinalIndexState.Increasing);

            if ((LeftChild.OrdinalIndexState == OrdinalIndexState.Indexable)
                && (RightChild.OrdinalIndexState == OrdinalIndexState.Indexable))
            {
                // Both inputs are indexable, so the concatenation is reported as indexable too.
                SetOrdinalIndex(OrdinalIndexState.Indexable);
            }
            else
            {
                // Otherwise report the worst of: Increasing, the left state, and the right state.
                SetOrdinalIndex(
                    ExchangeUtilities.Worse(OrdinalIndexState.Increasing,
                        ExchangeUtilities.Worse(LeftChild.OrdinalIndexState, RightChild.OrdinalIndexState)));
            }
        }

        //---------------------------------------------------------------------------------------
        // Just opens the current operator, including opening the children and wrapping their
        // results in a results object for this operator.
        //
        internal override QueryResults<TSource> Open(QuerySettings settings, bool preferStriping)
        {
            // We just open the children operators.
            QueryResults<TSource> leftChildResults = LeftChild.Open(settings, preferStriping);
            QueryResults<TSource> rightChildResults = RightChild.Open(settings, preferStriping);

            return ConcatQueryOperatorResults.NewResults(leftChildResults, rightChildResults, this, settings, preferStriping);
        }

        //---------------------------------------------------------------------------------------
        // Wraps the two opened child streams into a single concatenated stream and hands it to
        // the recipient. The left stream is prematurely merged first when the constructor
        // decided that is required; the right stream is handled by WrapHelper.
        //
        public override void WrapPartitionedStream<TLeftKey, TRightKey>(
            PartitionedStream<TSource, TLeftKey> leftStream, PartitionedStream<TSource, TRightKey> rightStream,
            IPartitionedStreamRecipient<TSource> outputRecipient, bool preferStriping, QuerySettings settings)
        {
            // Prematurely merge the left results, if necessary
            if (_prematureMergeLeft)
            {
                ListQueryResults<TSource> leftStreamResults =
                    ExecuteAndCollectResults(leftStream, leftStream.PartitionCount, LeftChild.OutputOrdered, preferStriping, settings);
                PartitionedStream<TSource, int> leftStreamInc = leftStreamResults.GetPartitionedStream();
                WrapHelper<int, TRightKey>(leftStreamInc, rightStream, outputRecipient, settings, preferStriping);
            }
            else
            {
                Debug.Assert(!ExchangeUtilities.IsWorseThan(leftStream.OrdinalIndexState, OrdinalIndexState.Increasing));
                WrapHelper<TLeftKey, TRightKey>(leftStream, rightStream, outputRecipient, settings, preferStriping);
            }
        }

        //---------------------------------------------------------------------------------------
        // Second stage of WrapPartitionedStream: the left stream is ready to be concatenated at
        // this point, and the right stream is prematurely merged here if required.
        //
        private void WrapHelper<TLeftKey, TRightKey>(
            PartitionedStream<TSource, TLeftKey> leftStreamInc, PartitionedStream<TSource, TRightKey> rightStream,
            IPartitionedStreamRecipient<TSource> outputRecipient, QuerySettings settings, bool preferStriping)
        {
            // Prematurely merge the right results, if necessary
            if (_prematureMergeRight)
            {
                // The left partition count is used so that both sides end up with the same number
                // of partitions (WrapHelper2 pairs them up by index). The ordering flag must come
                // from the right child, because it is the right stream that is being merged here.
                ListQueryResults<TSource> rightStreamResults =
                    ExecuteAndCollectResults(rightStream, leftStreamInc.PartitionCount, RightChild.OutputOrdered, preferStriping, settings);
                PartitionedStream<TSource, int> rightStreamInc = rightStreamResults.GetPartitionedStream();
                WrapHelper2<TLeftKey, int>(leftStreamInc, rightStreamInc, outputRecipient);
            }
            else
            {
                Debug.Assert(!ExchangeUtilities.IsWorseThan(rightStream.OrdinalIndexState, OrdinalIndexState.Increasing));
                WrapHelper2<TLeftKey, TRightKey>(leftStreamInc, rightStream, outputRecipient);
            }
        }

        //---------------------------------------------------------------------------------------
        // Final stage of WrapPartitionedStream: builds the output stream, in which partition i
        // enumerates left partition i followed by right partition i, and passes it on to the
        // recipient.
        //
        private void WrapHelper2<TLeftKey, TRightKey>(
            PartitionedStream<TSource, TLeftKey> leftStreamInc, PartitionedStream<TSource, TRightKey> rightStreamInc,
            IPartitionedStreamRecipient<TSource> outputRecipient)
        {
            int partitionCount = leftStreamInc.PartitionCount;

            // Generate the shared data.
            IComparer<ConcatKey<TLeftKey, TRightKey>> comparer = ConcatKey<TLeftKey, TRightKey>.MakeComparer(
                leftStreamInc.KeyComparer, rightStreamInc.KeyComparer);
            var outputStream = new PartitionedStream<TSource, ConcatKey<TLeftKey, TRightKey>>(partitionCount, comparer, OrdinalIndexState);

            for (int i = 0; i < partitionCount; i++)
            {
                outputStream[i] = new ConcatQueryOperatorEnumerator<TLeftKey, TRightKey>(leftStreamInc[i], rightStreamInc[i]);
            }

            outputRecipient.Receive(outputStream);
        }


        //---------------------------------------------------------------------------------------
        // Returns an enumerable that represents the query executing sequentially.
        //
        internal override IEnumerable<TSource> AsSequentialQuery(CancellationToken token)
        {
            return LeftChild.AsSequentialQuery(token).Concat(RightChild.AsSequentialQuery(token));
        }


        //---------------------------------------------------------------------------------------
        // Whether this operator performs a premature merge that would not be performed in
        // a similar sequential operation (i.e., in LINQ to Objects).
        //

        internal override bool LimitsParallelism
        {
            get { return false; }
        }

        //---------------------------------------------------------------------------------------
        // The enumerator type responsible for concatenating two data sources.
        //

        private sealed class ConcatQueryOperatorEnumerator<TLeftKey, TRightKey> : QueryOperatorEnumerator<TSource, ConcatKey<TLeftKey, TRightKey>>
        {
            private readonly QueryOperatorEnumerator<TSource, TLeftKey> _firstSource; // The first data source to enumerate.
            private readonly QueryOperatorEnumerator<TSource, TRightKey> _secondSource; // The second data source to enumerate.
            private bool _begunSecond; // Whether this partition has begun enumerating the second source yet.

            //---------------------------------------------------------------------------------------
            // Instantiates a new concatenation enumerator over one partition of each source.
            //
            internal ConcatQueryOperatorEnumerator(
                QueryOperatorEnumerator<TSource, TLeftKey> firstSource,
                QueryOperatorEnumerator<TSource, TRightKey> secondSource)
            {
                Debug.Assert(firstSource != null);
                Debug.Assert(secondSource != null);

                _firstSource = firstSource;
                _secondSource = secondSource;
            }

            //---------------------------------------------------------------------------------------
            // MoveNext advances to the next element in the output. While the first data source has
            // elements, this consists of just advancing through it and tagging each element with a
            // "left" ConcatKey. Once the first source is exhausted, the enumerator switches over to
            // the second data source for good and tags its elements with a "right" ConcatKey.
            // Returns false once both sources are exhausted.
            //
            internal override bool MoveNext(ref TSource currentElement, ref ConcatKey<TLeftKey, TRightKey> currentKey)
            {
                Debug.Assert(_firstSource != null);
                Debug.Assert(_secondSource != null);

                // If we are still enumerating the first source, fetch the next item.
                if (!_begunSecond)
                {
                    // If elements remain, just return true and continue enumerating the left.
                    TLeftKey leftKey = default(TLeftKey);
                    if (_firstSource.MoveNext(ref currentElement, ref leftKey))
                    {
                        currentKey = ConcatKey<TLeftKey, TRightKey>.MakeLeft(leftKey);
                        return true;
                    }

                    // The left source is exhausted; never touch it again.
                    _begunSecond = true;
                }

                // Now either move on to, or continue, enumerating the right data source.
                TRightKey rightKey = default(TRightKey);
                if (_secondSource.MoveNext(ref currentElement, ref rightKey))
                {
                    currentKey = ConcatKey<TLeftKey, TRightKey>.MakeRight(rightKey);
                    return true;
                }

                return false;
            }

            //---------------------------------------------------------------------------------------
            // Disposes both underlying enumerators. This happens regardless of the value of
            // the disposing flag.
            //
            protected override void Dispose(bool disposing)
            {
                _firstSource.Dispose();
                _secondSource.Dispose();
            }
        }


        //-----------------------------------------------------------------------------------
        // Query results for a Concat operator. The results are indexable if the child
        // results were indexable.
        //

        private sealed class ConcatQueryOperatorResults : BinaryQueryOperatorResults
        {
            private readonly int _leftChildCount; // The number of elements in the left child result set
            private readonly int _rightChildCount; // The number of elements in the right child result set

            //-----------------------------------------------------------------------------------
            // Creates the results object for a Concat operator: an indexable
            // ConcatQueryOperatorResults when both child results are indexable, and a plain
            // BinaryQueryOperatorResults otherwise.
            //
            public static QueryResults<TSource> NewResults(
                QueryResults<TSource> leftChildQueryResults, QueryResults<TSource> rightChildQueryResults,
                ConcatQueryOperator<TSource> op, QuerySettings settings,
                bool preferStriping)
            {
                if (leftChildQueryResults.IsIndexible && rightChildQueryResults.IsIndexible)
                {
                    return new ConcatQueryOperatorResults(
                        leftChildQueryResults, rightChildQueryResults, op, settings, preferStriping);
                }
                else
                {
                    return new BinaryQueryOperatorResults(
                        leftChildQueryResults, rightChildQueryResults, op, settings, preferStriping);
                }
            }

            //-----------------------------------------------------------------------------------
            // Only reachable through NewResults, which guarantees both children are indexable.
            // The element counts of both children are captured once, at construction time.
            //
            private ConcatQueryOperatorResults(
                QueryResults<TSource> leftChildQueryResults, QueryResults<TSource> rightChildQueryResults,
                ConcatQueryOperator<TSource> concatOp, QuerySettings settings,
                bool preferStriping)
                : base(leftChildQueryResults, rightChildQueryResults, concatOp, settings, preferStriping)
            {
                Debug.Assert(leftChildQueryResults.IsIndexible && rightChildQueryResults.IsIndexible);

                _leftChildCount = leftChildQueryResults.ElementsCount;
                _rightChildCount = rightChildQueryResults.ElementsCount;
            }

            internal override bool IsIndexible
            {
                get { return true; }
            }

            // The total number of elements: the left count plus the right count.
            internal override int ElementsCount
            {
                get
                {
                    Debug.Assert(_leftChildCount >= 0 && _rightChildCount >= 0);
                    return _leftChildCount + _rightChildCount;
                }
            }

            //-----------------------------------------------------------------------------------
            // Returns the element at the given index of the concatenation. Indices below the
            // left count map straight into the left results; all other indices are shifted down
            // by the left count and looked up in the right results.
            //
            internal override TSource GetElement(int index)
            {
                if (index < _leftChildCount)
                {
                    return _leftChildQueryResults.GetElement(index);
                }
                else
                {
                    return _rightChildQueryResults.GetElement(index - _leftChildCount);
                }
            }
        }
    }

    //---------------------------------------------------------------------------------------
    // ConcatKey represents an ordering key for the Concat operator. It knows whether the
    // element it is associated with is from the left source or the right source, and also
    // the element's ordering key.
//

    /// <summary>
    /// Ordering key attached to every element produced by the Concat operator. Holds a
    /// left key and a right key plus a flag saying which of the two is the meaningful one;
    /// the other is left at its default value.
    /// </summary>
    /// <typeparam name="TLeftKey">The ordering key type of the left source.</typeparam>
    /// <typeparam name="TRightKey">The ordering key type of the right source.</typeparam>
    internal struct ConcatKey<TLeftKey, TRightKey>
    {
        private readonly TLeftKey _leftKey;   // Meaningful only when _isLeft is true.
        private readonly TRightKey _rightKey; // Meaningful only when _isLeft is false.
        private readonly bool _isLeft;        // Whether the element came from the left source.

        // Private: instances are created through MakeLeft and MakeRight only.
        private ConcatKey(TLeftKey leftKey, TRightKey rightKey, bool isLeft)
        {
            _leftKey = leftKey;
            _rightKey = rightKey;
            _isLeft = isLeft;
        }

        // Creates the key for an element of the left source; the right key is defaulted.
        internal static ConcatKey<TLeftKey, TRightKey> MakeLeft(TLeftKey leftKey)
        {
            return new ConcatKey<TLeftKey, TRightKey>(leftKey, default(TRightKey), isLeft: true);
        }

        // Creates the key for an element of the right source; the left key is defaulted.
        internal static ConcatKey<TLeftKey, TRightKey> MakeRight(TRightKey rightKey)
        {
            return new ConcatKey<TLeftKey, TRightKey>(default(TLeftKey), rightKey, isLeft: false);
        }

        // Creates a comparer for ConcatKeys on top of the comparers of the two underlying key types.
        internal static IComparer<ConcatKey<TLeftKey, TRightKey>> MakeComparer(
            IComparer<TLeftKey> leftComparer, IComparer<TRightKey> rightComparer)
        {
            return new ConcatKeyComparer(leftComparer, rightComparer);
        }

        //---------------------------------------------------------------------------------------
        // ConcatKeyComparer compares ConcatKeys, so that elements from the left source come
        // before elements from the right source, and elements within each source are ordered
        // according to the corresponding order key.
        //

        private sealed class ConcatKeyComparer : IComparer<ConcatKey<TLeftKey, TRightKey>>
        {
            private readonly IComparer<TLeftKey> _leftComparer;   // Orders two keys from the left source.
            private readonly IComparer<TRightKey> _rightComparer; // Orders two keys from the right source.

            internal ConcatKeyComparer(IComparer<TLeftKey> leftComparer, IComparer<TRightKey> rightComparer)
            {
                _leftComparer = leftComparer;
                _rightComparer = rightComparer;
            }

            // Returns -1 / 1 when the keys come from different sources (left first); otherwise
            // returns whatever the comparer for that source returns for the underlying keys.
            public int Compare(ConcatKey<TLeftKey, TRightKey> x, ConcatKey<TLeftKey, TRightKey> y)
            {
                // If one element is from the left source and the other not, the element from the left source
                // comes earlier.
                if (x._isLeft != y._isLeft)
                {
                    return x._isLeft ? -1 : 1;
                }

                // Elements are from the same source (left or right). Compare the corresponding keys.
                if (x._isLeft)
                {
                    return _leftComparer.Compare(x._leftKey, y._leftKey);
                }

                return _rightComparer.Compare(x._rightKey, y._rightKey);
            }
        }
    }
}