1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
4 
5 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
6 //
7 // ContainsSearchOperator.cs
8 //
9 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
10 
11 using System.Collections.Generic;
12 using System.Diagnostics;
13 using System.Diagnostics.CodeAnalysis;
14 using System.Threading;
15 
16 namespace System.Linq.Parallel
17 {
18     /// <summary>
19     /// Contains is quite similar to the any/all operator above. Each partition searches a
20     /// subset of elements for a match, and the first one to find a match signals to the rest
21     /// of the partitions to stop searching.
22     /// </summary>
23     /// <typeparam name="TInput"></typeparam>
24     internal sealed class ContainsSearchOperator<TInput> : UnaryQueryOperator<TInput, bool>
25     {
26         private readonly TInput _searchValue; // The value for which we are searching.
27         private readonly IEqualityComparer<TInput> _comparer; // The comparer to use for equality tests.
28 
29         //---------------------------------------------------------------------------------------
30         // Constructs a new instance of the contains search operator.
31         //
32         // Arguments:
33         //     child       - the child tree to enumerate.
34         //     searchValue - value we are searching for.
35         //     comparer    - a comparison routine used to test equality.
36         //
37 
ContainsSearchOperator(IEnumerable<TInput> child, TInput searchValue, IEqualityComparer<TInput> comparer)38         internal ContainsSearchOperator(IEnumerable<TInput> child, TInput searchValue, IEqualityComparer<TInput> comparer)
39             : base(child)
40         {
41             Debug.Assert(child != null, "child data source cannot be null");
42 
43             _searchValue = searchValue;
44 
45             if (comparer == null)
46             {
47                 _comparer = EqualityComparer<TInput>.Default;
48             }
49             else
50             {
51                 _comparer = comparer;
52             }
53         }
54 
55         //---------------------------------------------------------------------------------------
56         // Executes the entire query tree, and aggregates the individual partition results to
57         // form an overall answer to the search operation.
58         //
59 
Aggregate()60         internal bool Aggregate()
61         {
62             // Because the final reduction is typically much cheaper than the intermediate
63             // reductions over the individual partitions, and because each parallel partition
64             // could do a lot of work to produce a single output element, we prefer to turn off
65             // pipelining, and process the final reductions serially.
66             using (IEnumerator<bool> enumerator = GetEnumerator(ParallelMergeOptions.FullyBuffered, true))
67             {
68                 // Any value of true means the element was found. We needn't consult all partitions
69                 while (enumerator.MoveNext())
70                 {
71                     if (enumerator.Current)
72                     {
73                         return true;
74                     }
75                 }
76             }
77 
78             return false;
79         }
80 
81         //---------------------------------------------------------------------------------------
82         // Just opens the current operator, including opening the child and wrapping it with
83         // partitions as needed.
84         //
85 
Open(QuerySettings settings, bool preferStriping)86         internal override QueryResults<bool> Open(QuerySettings settings, bool preferStriping)
87         {
88             QueryResults<TInput> childQueryResults = Child.Open(settings, preferStriping);
89             return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping);
90         }
91 
WrapPartitionedStream( PartitionedStream<TInput, TKey> inputStream, IPartitionedStreamRecipient<bool> recipient, bool preferStriping, QuerySettings settings)92         internal override void WrapPartitionedStream<TKey>(
93             PartitionedStream<TInput, TKey> inputStream, IPartitionedStreamRecipient<bool> recipient, bool preferStriping, QuerySettings settings)
94         {
95             int partitionCount = inputStream.PartitionCount;
96             PartitionedStream<bool, int> outputStream = new PartitionedStream<bool, int>(partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Correct);
97 
98             // Create a shared cancellation variable
99             Shared<bool> resultFoundFlag = new Shared<bool>(false);
100             for (int i = 0; i < partitionCount; i++)
101             {
102                 outputStream[i] = new ContainsSearchOperatorEnumerator<TKey>(inputStream[i], _searchValue, _comparer, i, resultFoundFlag,
103                     settings.CancellationState.MergedCancellationToken);
104             }
105 
106             recipient.Receive(outputStream);
107         }
108 
109         //---------------------------------------------------------------------------------------
110         // Returns an enumerable that represents the query executing sequentially.
111         //
112 
113         [ExcludeFromCodeCoverage]
AsSequentialQuery(CancellationToken token)114         internal override IEnumerable<bool> AsSequentialQuery(CancellationToken token)
115         {
116             Debug.Fail("This method should never be called as it is an ending operator with LimitsParallelism=false.");
117             throw new NotSupportedException();
118         }
119 
120         //---------------------------------------------------------------------------------------
121         // Whether this operator performs a premature merge that would not be performed in
122         // a similar sequential operation (i.e., in LINQ to Objects).
123         //
124 
125         internal override bool LimitsParallelism
126         {
127             get { return false; }
128         }
129 
130         //---------------------------------------------------------------------------------------
131         // This enumerator performs the search over its input data source. It also cancels peer
132         // enumerators when an answer was found, and polls this cancellation flag to stop when
133         // requested.
134         //
135 
136         class ContainsSearchOperatorEnumerator<TKey> : QueryOperatorEnumerator<bool, int>
137         {
138             private readonly QueryOperatorEnumerator<TInput, TKey> _source; // The source data.
139             private readonly TInput _searchValue; // The value for which we are searching.
140             private readonly IEqualityComparer<TInput> _comparer; // The comparer to use for equality tests.
141             private readonly int _partitionIndex; // This partition's unique index.
142             private readonly Shared<bool> _resultFoundFlag; // Whether to cancel the operation.
143             private CancellationToken _cancellationToken;
144 
145             //---------------------------------------------------------------------------------------
146             // Instantiates a new any/all search operator.
147             //
148 
ContainsSearchOperatorEnumerator(QueryOperatorEnumerator<TInput, TKey> source, TInput searchValue, IEqualityComparer<TInput> comparer, int partitionIndex, Shared<bool> resultFoundFlag, CancellationToken cancellationToken)149             internal ContainsSearchOperatorEnumerator(QueryOperatorEnumerator<TInput, TKey> source, TInput searchValue,
150                                                       IEqualityComparer<TInput> comparer, int partitionIndex, Shared<bool> resultFoundFlag,
151                 CancellationToken cancellationToken)
152             {
153                 Debug.Assert(source != null);
154                 Debug.Assert(comparer != null);
155                 Debug.Assert(resultFoundFlag != null);
156 
157                 _source = source;
158                 _searchValue = searchValue;
159                 _comparer = comparer;
160                 _partitionIndex = partitionIndex;
161                 _resultFoundFlag = resultFoundFlag;
162                 _cancellationToken = cancellationToken;
163             }
164 
165             //---------------------------------------------------------------------------------------
166             // This enumerates the entire input source to perform the search. If another peer
167             // partition finds an answer before us, we will voluntarily return (propagating the
168             // peer's result).
169             //
170 
MoveNext(ref bool currentElement, ref int currentKey)171             internal override bool MoveNext(ref bool currentElement, ref int currentKey)
172             {
173                 Debug.Assert(_comparer != null);
174 
175                 // Avoid enumerating if we've already found an answer.
176                 if (_resultFoundFlag.Value)
177                     return false;
178 
179                 // We just scroll through the enumerator and accumulate the result.
180                 TInput element = default(TInput);
181                 TKey keyUnused = default(TKey);
182                 if (_source.MoveNext(ref element, ref keyUnused))
183                 {
184                     currentElement = false;
185                     currentKey = _partitionIndex;
186 
187                     // Continue walking the data so long as we haven't found an item that satisfies
188                     // the condition we are searching for.
189                     int i = 0;
190                     do
191                     {
192                         if ((i++ & CancellationState.POLL_INTERVAL) == 0)
193                             CancellationState.ThrowIfCanceled(_cancellationToken);
194 
195                         if (_resultFoundFlag.Value)
196                         {
197                             // If cancellation occurred, it's because a successful answer was found.
198                             return false;
199                         }
200 
201                         if (_comparer.Equals(element, _searchValue))
202                         {
203                             // We have found an item that satisfies the search. Cancel other
204                             // workers that are concurrently searching, and return.
205                             _resultFoundFlag.Value = true;
206                             currentElement = true;
207                             break;
208                         }
209                     }
210                     while (_source.MoveNext(ref element, ref keyUnused));
211 
212                     return true;
213                 }
214 
215                 return false;
216             }
217 
Dispose(bool disposing)218             protected override void Dispose(bool disposing)
219             {
220                 Debug.Assert(_source != null);
221                 _source.Dispose();
222             }
223         }
224     }
225 }
226