1 // ==++== 2 // 3 // Copyright (c) Microsoft Corporation. All rights reserved. 4 // 5 // ==--== 6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ 7 // 8 // WhereQueryOperator.cs 9 // 10 // <OWNER>Microsoft</OWNER> 11 // 12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- 13 14 using System.Collections.Generic; 15 using System.Diagnostics.Contracts; 16 using System.Threading; 17 18 namespace System.Linq.Parallel 19 { 20 /// <summary> 21 /// The operator type for Where statements. This operator filters out elements that 22 /// don't match a filter function (supplied at instantiation time). 23 /// </summary> 24 /// <typeparam name="TInputOutput"></typeparam> 25 internal sealed class WhereQueryOperator<TInputOutput> : UnaryQueryOperator<TInputOutput, TInputOutput> 26 { 27 28 // Predicate function. Used to filter out non-matching elements during execution. 29 private Func<TInputOutput, bool> m_predicate; 30 31 //--------------------------------------------------------------------------------------- 32 // Initializes a new where operator. 33 // 34 // Arguments: 35 // child - the child operator or data source from which to pull data 36 // predicate - a delegate representing the predicate function 37 // 38 // Assumptions: 39 // predicate must be non null. 40 // 41 WhereQueryOperator(IEnumerable<TInputOutput> child, Func<TInputOutput, bool> predicate)42 internal WhereQueryOperator(IEnumerable<TInputOutput> child, Func<TInputOutput, bool> predicate) 43 : base(child) 44 { 45 Contract.Assert(child != null, "child data source cannot be null"); 46 Contract.Assert(predicate != null, "need a filter function"); 47 48 SetOrdinalIndexState( 49 ExchangeUtilities.Worse(Child.OrdinalIndexState, OrdinalIndexState.Increasing)); 50 51 m_predicate = predicate; 52 } 53 WrapPartitionedStream( PartitionedStream<TInputOutput, TKey> inputStream, IPartitionedStreamRecipient<TInputOutput> recipient, bool preferStriping, QuerySettings settings)54 internal override void WrapPartitionedStream<TKey>( 55 PartitionedStream<TInputOutput, TKey> inputStream, IPartitionedStreamRecipient<TInputOutput> recipient, bool preferStriping, QuerySettings settings) 56 { 57 PartitionedStream<TInputOutput, TKey> outputStream = new PartitionedStream<TInputOutput, TKey>( 58 inputStream.PartitionCount, inputStream.KeyComparer, OrdinalIndexState); 59 for (int i = 0; i < inputStream.PartitionCount; i++) 60 { 61 outputStream[i] = new WhereQueryOperatorEnumerator<TKey>(inputStream[i], m_predicate, 62 settings.CancellationState.MergedCancellationToken); 63 } 64 65 recipient.Receive(outputStream); 66 } 67 68 //--------------------------------------------------------------------------------------- 69 // Just opens the current operator, including opening the child and wrapping it with 70 // partitions as needed. 71 // 72 Open(QuerySettings settings, bool preferStriping)73 internal override QueryResults<TInputOutput> Open(QuerySettings settings, bool preferStriping) 74 { 75 // We just open the child operator. 76 QueryResults<TInputOutput> childQueryResults = Child.Open(settings, preferStriping); 77 78 // And then return the query results 79 return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping); 80 } 81 82 //--------------------------------------------------------------------------------------- 83 // Returns an enumerable that represents the query executing sequentially. 84 // 85 AsSequentialQuery(CancellationToken token)86 internal override IEnumerable<TInputOutput> AsSequentialQuery(CancellationToken token) 87 { 88 IEnumerable<TInputOutput> wrappedChild = CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token); 89 return wrappedChild.Where(m_predicate); 90 } 91 92 //--------------------------------------------------------------------------------------- 93 // Whether this operator performs a premature merge that would not be performed in 94 // a similar sequential operation (i.e., in LINQ to Objects). 95 // 96 97 internal override bool LimitsParallelism 98 { 99 get { return false; } 100 } 101 102 //----------------------------------------------------------------------------------- 103 // An enumerator that implements the filtering logic. 104 // 105 106 private class WhereQueryOperatorEnumerator<TKey> : QueryOperatorEnumerator<TInputOutput, TKey> 107 { 108 109 private readonly QueryOperatorEnumerator<TInputOutput, TKey> m_source; // The data source to enumerate. 110 private readonly Func<TInputOutput, bool> m_predicate; // The predicate used for filtering. 111 private CancellationToken m_cancellationToken; 112 private Shared<int> m_outputLoopCount; 113 114 //----------------------------------------------------------------------------------- 115 // Instantiates a new enumerator. 116 // 117 WhereQueryOperatorEnumerator(QueryOperatorEnumerator<TInputOutput, TKey> source, Func<TInputOutput, bool> predicate, CancellationToken cancellationToken)118 internal WhereQueryOperatorEnumerator(QueryOperatorEnumerator<TInputOutput, TKey> source, Func<TInputOutput, bool> predicate, 119 CancellationToken cancellationToken) 120 { 121 Contract.Assert(source != null); 122 Contract.Assert(predicate != null); 123 124 m_source = source; 125 m_predicate = predicate; 126 m_cancellationToken = cancellationToken; 127 } 128 129 //----------------------------------------------------------------------------------- 130 // Moves to the next matching element in the underlying data stream. 131 // 132 MoveNext(ref TInputOutput currentElement, ref TKey currentKey)133 internal override bool MoveNext(ref TInputOutput currentElement, ref TKey currentKey) 134 { 135 Contract.Assert(m_predicate != null, "expected a compiled operator"); 136 137 // Iterate through the input until we reach the end of the sequence or find 138 // an element matching the predicate. 139 140 if (m_outputLoopCount == null) 141 m_outputLoopCount = new Shared<int>(0); 142 143 while (m_source.MoveNext(ref currentElement, ref currentKey)) 144 { 145 if ((m_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0) 146 CancellationState.ThrowIfCanceled(m_cancellationToken); 147 148 if (m_predicate(currentElement)) 149 { 150 return true; 151 } 152 } 153 return false; 154 } 155 Dispose(bool disposing)156 protected override void Dispose(bool disposing) 157 { 158 Contract.Assert(m_source != null); 159 m_source.Dispose(); 160 } 161 } 162 } 163 } 164