1 // Licensed to the .NET Foundation under one or more agreements. 2 // The .NET Foundation licenses this file to you under the MIT license. 3 // See the LICENSE file in the project root for more information. 4 5 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ 6 // 7 // ContainsSearchOperator.cs 8 // 9 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- 10 11 using System.Collections.Generic; 12 using System.Diagnostics; 13 using System.Diagnostics.CodeAnalysis; 14 using System.Threading; 15 16 namespace System.Linq.Parallel 17 { 18 /// <summary> 19 /// Contains is quite similar to the any/all operator above. Each partition searches a 20 /// subset of elements for a match, and the first one to find a match signals to the rest 21 /// of the partitions to stop searching. 22 /// </summary> 23 /// <typeparam name="TInput"></typeparam> 24 internal sealed class ContainsSearchOperator<TInput> : UnaryQueryOperator<TInput, bool> 25 { 26 private readonly TInput _searchValue; // The value for which we are searching. 27 private readonly IEqualityComparer<TInput> _comparer; // The comparer to use for equality tests. 28 29 //--------------------------------------------------------------------------------------- 30 // Constructs a new instance of the contains search operator. 31 // 32 // Arguments: 33 // child - the child tree to enumerate. 34 // searchValue - value we are searching for. 35 // comparer - a comparison routine used to test equality. 36 // 37 ContainsSearchOperator(IEnumerable<TInput> child, TInput searchValue, IEqualityComparer<TInput> comparer)38 internal ContainsSearchOperator(IEnumerable<TInput> child, TInput searchValue, IEqualityComparer<TInput> comparer) 39 : base(child) 40 { 41 Debug.Assert(child != null, "child data source cannot be null"); 42 43 _searchValue = searchValue; 44 45 if (comparer == null) 46 { 47 _comparer = EqualityComparer<TInput>.Default; 48 } 49 else 50 { 51 _comparer = comparer; 52 } 53 } 54 55 //--------------------------------------------------------------------------------------- 56 // Executes the entire query tree, and aggregates the individual partition results to 57 // form an overall answer to the search operation. 58 // 59 Aggregate()60 internal bool Aggregate() 61 { 62 // Because the final reduction is typically much cheaper than the intermediate 63 // reductions over the individual partitions, and because each parallel partition 64 // could do a lot of work to produce a single output element, we prefer to turn off 65 // pipelining, and process the final reductions serially. 66 using (IEnumerator<bool> enumerator = GetEnumerator(ParallelMergeOptions.FullyBuffered, true)) 67 { 68 // Any value of true means the element was found. We needn't consult all partitions 69 while (enumerator.MoveNext()) 70 { 71 if (enumerator.Current) 72 { 73 return true; 74 } 75 } 76 } 77 78 return false; 79 } 80 81 //--------------------------------------------------------------------------------------- 82 // Just opens the current operator, including opening the child and wrapping it with 83 // partitions as needed. 84 // 85 Open(QuerySettings settings, bool preferStriping)86 internal override QueryResults<bool> Open(QuerySettings settings, bool preferStriping) 87 { 88 QueryResults<TInput> childQueryResults = Child.Open(settings, preferStriping); 89 return new UnaryQueryOperatorResults(childQueryResults, this, settings, preferStriping); 90 } 91 WrapPartitionedStream( PartitionedStream<TInput, TKey> inputStream, IPartitionedStreamRecipient<bool> recipient, bool preferStriping, QuerySettings settings)92 internal override void WrapPartitionedStream<TKey>( 93 PartitionedStream<TInput, TKey> inputStream, IPartitionedStreamRecipient<bool> recipient, bool preferStriping, QuerySettings settings) 94 { 95 int partitionCount = inputStream.PartitionCount; 96 PartitionedStream<bool, int> outputStream = new PartitionedStream<bool, int>(partitionCount, Util.GetDefaultComparer<int>(), OrdinalIndexState.Correct); 97 98 // Create a shared cancellation variable 99 Shared<bool> resultFoundFlag = new Shared<bool>(false); 100 for (int i = 0; i < partitionCount; i++) 101 { 102 outputStream[i] = new ContainsSearchOperatorEnumerator<TKey>(inputStream[i], _searchValue, _comparer, i, resultFoundFlag, 103 settings.CancellationState.MergedCancellationToken); 104 } 105 106 recipient.Receive(outputStream); 107 } 108 109 //--------------------------------------------------------------------------------------- 110 // Returns an enumerable that represents the query executing sequentially. 111 // 112 113 [ExcludeFromCodeCoverage] AsSequentialQuery(CancellationToken token)114 internal override IEnumerable<bool> AsSequentialQuery(CancellationToken token) 115 { 116 Debug.Fail("This method should never be called as it is an ending operator with LimitsParallelism=false."); 117 throw new NotSupportedException(); 118 } 119 120 //--------------------------------------------------------------------------------------- 121 // Whether this operator performs a premature merge that would not be performed in 122 // a similar sequential operation (i.e., in LINQ to Objects). 123 // 124 125 internal override bool LimitsParallelism 126 { 127 get { return false; } 128 } 129 130 //--------------------------------------------------------------------------------------- 131 // This enumerator performs the search over its input data source. It also cancels peer 132 // enumerators when an answer was found, and polls this cancellation flag to stop when 133 // requested. 134 // 135 136 class ContainsSearchOperatorEnumerator<TKey> : QueryOperatorEnumerator<bool, int> 137 { 138 private readonly QueryOperatorEnumerator<TInput, TKey> _source; // The source data. 139 private readonly TInput _searchValue; // The value for which we are searching. 140 private readonly IEqualityComparer<TInput> _comparer; // The comparer to use for equality tests. 141 private readonly int _partitionIndex; // This partition's unique index. 142 private readonly Shared<bool> _resultFoundFlag; // Whether to cancel the operation. 143 private CancellationToken _cancellationToken; 144 145 //--------------------------------------------------------------------------------------- 146 // Instantiates a new any/all search operator. 147 // 148 ContainsSearchOperatorEnumerator(QueryOperatorEnumerator<TInput, TKey> source, TInput searchValue, IEqualityComparer<TInput> comparer, int partitionIndex, Shared<bool> resultFoundFlag, CancellationToken cancellationToken)149 internal ContainsSearchOperatorEnumerator(QueryOperatorEnumerator<TInput, TKey> source, TInput searchValue, 150 IEqualityComparer<TInput> comparer, int partitionIndex, Shared<bool> resultFoundFlag, 151 CancellationToken cancellationToken) 152 { 153 Debug.Assert(source != null); 154 Debug.Assert(comparer != null); 155 Debug.Assert(resultFoundFlag != null); 156 157 _source = source; 158 _searchValue = searchValue; 159 _comparer = comparer; 160 _partitionIndex = partitionIndex; 161 _resultFoundFlag = resultFoundFlag; 162 _cancellationToken = cancellationToken; 163 } 164 165 //--------------------------------------------------------------------------------------- 166 // This enumerates the entire input source to perform the search. If another peer 167 // partition finds an answer before us, we will voluntarily return (propagating the 168 // peer's result). 169 // 170 MoveNext(ref bool currentElement, ref int currentKey)171 internal override bool MoveNext(ref bool currentElement, ref int currentKey) 172 { 173 Debug.Assert(_comparer != null); 174 175 // Avoid enumerating if we've already found an answer. 176 if (_resultFoundFlag.Value) 177 return false; 178 179 // We just scroll through the enumerator and accumulate the result. 180 TInput element = default(TInput); 181 TKey keyUnused = default(TKey); 182 if (_source.MoveNext(ref element, ref keyUnused)) 183 { 184 currentElement = false; 185 currentKey = _partitionIndex; 186 187 // Continue walking the data so long as we haven't found an item that satisfies 188 // the condition we are searching for. 189 int i = 0; 190 do 191 { 192 if ((i++ & CancellationState.POLL_INTERVAL) == 0) 193 CancellationState.ThrowIfCanceled(_cancellationToken); 194 195 if (_resultFoundFlag.Value) 196 { 197 // If cancellation occurred, it's because a successful answer was found. 198 return false; 199 } 200 201 if (_comparer.Equals(element, _searchValue)) 202 { 203 // We have found an item that satisfies the search. Cancel other 204 // workers that are concurrently searching, and return. 205 _resultFoundFlag.Value = true; 206 currentElement = true; 207 break; 208 } 209 } 210 while (_source.MoveNext(ref element, ref keyUnused)); 211 212 return true; 213 } 214 215 return false; 216 } 217 Dispose(bool disposing)218 protected override void Dispose(bool disposing) 219 { 220 Debug.Assert(_source != null); 221 _source.Dispose(); 222 } 223 } 224 } 225 } 226