1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 //
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // WhereQueryOperator.cs
9 //
10 // <OWNER>Microsoft</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13 
14 using System.Collections.Generic;
15 using System.Diagnostics.Contracts;
16 using System.Threading;
17 
18 namespace System.Linq.Parallel
19 {
20     /// <summary>
21     /// The operator type for Where statements. This operator filters out elements that
22     /// don't match a filter function (supplied at instantiation time).
23     /// </summary>
24     /// <typeparam name="TInputOutput"></typeparam>
25     internal sealed class WhereQueryOperator<TInputOutput> : UnaryQueryOperator<TInputOutput, TInputOutput>
26     {
27 
28         // Predicate function. Used to filter out non-matching elements during execution.
29         private Func<TInputOutput, bool> m_predicate;
30 
31         //---------------------------------------------------------------------------------------
32         // Initializes a new where operator.
33         //
34         // Arguments:
35         //    child         - the child operator or data source from which to pull data
36         //    predicate     - a delegate representing the predicate function
37         //
38         // Assumptions:
39         //    predicate must be non null.
40         //
41 
WhereQueryOperator(IEnumerable<TInputOutput> child, Func<TInputOutput, bool> predicate)42         internal WhereQueryOperator(IEnumerable<TInputOutput> child, Func<TInputOutput, bool> predicate)
43             : base(child)
44         {
45             Contract.Assert(child != null, "child data source cannot be null");
46             Contract.Assert(predicate != null, "need a filter function");
47 
48             SetOrdinalIndexState(
49                 ExchangeUtilities.Worse(Child.OrdinalIndexState, OrdinalIndexState.Increasing));
50 
51             m_predicate = predicate;
52         }
53 
WrapPartitionedStream( PartitionedStream<TInputOutput, TKey> inputStream, IPartitionedStreamRecipient<TInputOutput> recipient, bool preferStriping, QuerySettings settings)54         internal override void WrapPartitionedStream<TKey>(
55             PartitionedStream<TInputOutput, TKey> inputStream, IPartitionedStreamRecipient<TInputOutput> recipient, bool preferStriping, QuerySettings settings)
56         {
57             PartitionedStream<TInputOutput, TKey> outputStream = new PartitionedStream<TInputOutput, TKey>(
58                 inputStream.PartitionCount, inputStream.KeyComparer, OrdinalIndexState);
59             for (int i = 0; i < inputStream.PartitionCount; i++)
60             {
61                 outputStream[i] = new WhereQueryOperatorEnumerator<TKey>(inputStream[i], m_predicate,
62                     settings.CancellationState.MergedCancellationToken);
63             }
64 
65             recipient.Receive(outputStream);
66         }
67 
68         //---------------------------------------------------------------------------------------
69         // Just opens the current operator, including opening the child and wrapping it with
70         // partitions as needed.
71         //
72 
Open(QuerySettings settings, bool preferStriping)73         internal override QueryResults<TInputOutput> Open(QuerySettings settings, bool preferStriping)
74         {
75             // We just open the child operator.
76             QueryResults<TInputOutput> childQueryResults = Child.Open(settings, preferStriping);
77 
78             // And then return the query results
79             return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
80         }
81 
82         //---------------------------------------------------------------------------------------
83         // Returns an enumerable that represents the query executing sequentially.
84         //
85 
AsSequentialQuery(CancellationToken token)86         internal override IEnumerable<TInputOutput> AsSequentialQuery(CancellationToken token)
87         {
88             IEnumerable<TInputOutput> wrappedChild = CancellableEnumerable.Wrap(Child.AsSequentialQuery(token), token);
89             return wrappedChild.Where(m_predicate);
90         }
91 
92         //---------------------------------------------------------------------------------------
93         // Whether this operator performs a premature merge that would not be performed in
94         // a similar sequential operation (i.e., in LINQ to Objects).
95         //
96 
97         internal override bool LimitsParallelism
98         {
99             get { return false; }
100         }
101 
102         //-----------------------------------------------------------------------------------
103         // An enumerator that implements the filtering logic.
104         //
105 
106         private class WhereQueryOperatorEnumerator<TKey> : QueryOperatorEnumerator<TInputOutput, TKey>
107         {
108 
109             private readonly QueryOperatorEnumerator<TInputOutput, TKey> m_source; // The data source to enumerate.
110             private readonly Func<TInputOutput, bool> m_predicate; // The predicate used for filtering.
111             private CancellationToken m_cancellationToken;
112             private Shared<int> m_outputLoopCount;
113 
114             //-----------------------------------------------------------------------------------
115             // Instantiates a new enumerator.
116             //
117 
WhereQueryOperatorEnumerator(QueryOperatorEnumerator<TInputOutput, TKey> source, Func<TInputOutput, bool> predicate, CancellationToken cancellationToken)118             internal WhereQueryOperatorEnumerator(QueryOperatorEnumerator<TInputOutput, TKey> source, Func<TInputOutput, bool> predicate,
119                 CancellationToken cancellationToken)
120             {
121                 Contract.Assert(source != null);
122                 Contract.Assert(predicate != null);
123 
124                 m_source = source;
125                 m_predicate = predicate;
126                 m_cancellationToken = cancellationToken;
127             }
128 
129             //-----------------------------------------------------------------------------------
130             // Moves to the next matching element in the underlying data stream.
131             //
132 
MoveNext(ref TInputOutput currentElement, ref TKey currentKey)133             internal override bool MoveNext(ref TInputOutput currentElement, ref TKey currentKey)
134             {
135                 Contract.Assert(m_predicate != null, "expected a compiled operator");
136 
137                 // Iterate through the input until we reach the end of the sequence or find
138                 // an element matching the predicate.
139 
140                 if (m_outputLoopCount == null)
141                     m_outputLoopCount = new Shared<int>(0);
142 
143                 while (m_source.MoveNext(ref currentElement, ref currentKey))
144                 {
145                     if ((m_outputLoopCount.Value++ & CancellationState.POLL_INTERVAL) == 0)
146                         CancellationState.ThrowIfCanceled(m_cancellationToken);
147 
148                     if (m_predicate(currentElement))
149                     {
150                         return true;
151                     }
152                 }
153                 return false;
154             }
155 
Dispose(bool disposing)156             protected override void Dispose(bool disposing)
157             {
158                 Contract.Assert(m_source != null);
159                 m_source.Dispose();
160             }
161         }
162     }
163 }
164