// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// ExceptionAggregator.cs
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

using System;
using System.Collections.Generic;
using System.Threading;

namespace System.Linq.Parallel
{
    /// <summary>
    /// Helpers used when a PLINQ query runs sequentially, so that failures surface the same way
    /// they would from parallel execution: wrapped in an AggregateException, or reported as an
    /// OperationCanceledException when the query was cancelled through the external token.
    /// </summary>
    internal static class ExceptionAggregator
    {
        /// <summary>
        /// ExceptionAggregator.WrapEnumerable returns a lazily evaluated sequence that pulls
        /// elements from <paramref name="source"/> and routes any exception raised while advancing
        /// or reading the underlying enumerator through <see cref="ThrowOCEorAggregateException"/>.
        ///
        /// When PLINQ falls back to sequential execution it reuses the LINQ-to-objects operators,
        /// yet callers must still observe an AggregateException wherever parallel execution
        /// would have produced one. This wrapper provides that translation.
        /// </summary>
        /// <typeparam name="TElement">The element type of the sequence.</typeparam>
        /// <param name="source">The sequence to wrap.</param>
        /// <param name="cancellationState">The cancellation state consulted when translating exceptions.</param>
        /// <returns>A sequence yielding the same elements as <paramref name="source"/>.</returns>
        internal static IEnumerable<TElement> WrapEnumerable<TElement>(IEnumerable<TElement> source, CancellationState cancellationState)
        {
            using (IEnumerator<TElement> inner = source.GetEnumerator())
            {
                for (; ; )
                {
                    // 'yield return' may not appear inside a try block that has a catch clause,
                    // so the outcome of the guarded step is carried out through these locals.
                    TElement current = default(TElement);
                    bool exhausted = false;

                    try
                    {
                        if (inner.MoveNext())
                        {
                            current = inner.Current;
                        }
                        else
                        {
                            exhausted = true;
                        }
                    }
#if SUPPORT_THREAD_ABORT
                    catch (ThreadAbortException)
                    {
                        // Thread aborts are passed through unwrapped.
                        throw;
                    }
#endif
                    catch (Exception ex)
                    {
                        ThrowOCEorAggregateException(ex, cancellationState);
                    }

                    if (exhausted)
                    {
                        yield break;
                    }

                    yield return current;
                }
            }
        }


        /// <summary>
        /// Counterpart of WrapEnumerable that consumes a QueryOperatorEnumerator{,} rather than an
        /// IEnumerable{}. The logic is intentionally duplicated: adapting the
        /// QueryOperatorEnumerator{,} to an IEnumerator{} would cost extra virtual method calls.
        /// The source enumerator is disposed when iteration finishes or is abandoned.
        /// </summary>
        /// <typeparam name="TElement">The element type produced by the enumerator.</typeparam>
        /// <typeparam name="TIgnoreKey">The key type of the enumerator; keys are read but discarded.</typeparam>
        /// <param name="source">The enumerator to drain.</param>
        /// <param name="cancellationState">The cancellation state consulted when translating exceptions.</param>
        /// <returns>A sequence of the elements produced by <paramref name="source"/>.</returns>
        internal static IEnumerable<TElement> WrapQueryEnumerator<TElement, TIgnoreKey>(QueryOperatorEnumerator<TElement, TIgnoreKey> source,
            CancellationState cancellationState)
        {
            // These are handed to MoveNext by reference and deliberately live across iterations.
            TElement current = default(TElement);
            TIgnoreKey unusedKey = default(TIgnoreKey);

            try
            {
                for (; ; )
                {
                    bool exhausted = false;

                    try
                    {
                        if (!source.MoveNext(ref current, ref unusedKey))
                        {
                            exhausted = true;
                        }
                    }
#if SUPPORT_THREAD_ABORT
                    catch (ThreadAbortException)
                    {
                        // Thread aborts are passed through unwrapped.
                        throw;
                    }
#endif
                    catch (Exception ex)
                    {
                        ThrowOCEorAggregateException(ex, cancellationState);
                    }

                    if (exhausted)
                    {
                        yield break;
                    }

                    yield return current;
                }
            }
            finally
            {
                source.Dispose();
            }
        }

        /// <summary>
        /// Takes an exception and rethrows it in the form it would have had after crossing the
        /// parallel->sequential boundary, so that sequential fallback paths fail consistently with
        /// queries executed by worker tasks.
        ///
        /// Unless the exception represents a legitimate cancellation of the query (as decided by
        /// ThrowAnOCE), it is wrapped in a new AggregateException. For a legitimate cancellation the
        /// work is delegated to CancellationState.ThrowWithStandardMessageIfCanceled with the
        /// external cancellation token.
        /// </summary>
        /// <param name="ex">The exception to translate.</param>
        /// <param name="cancellationState">The cancellation state of the running query.</param>
        internal static void ThrowOCEorAggregateException(Exception ex, CancellationState cancellationState)
        {
            if (!ThrowAnOCE(ex, cancellationState))
            {
                throw new AggregateException(ex);
            }

            CancellationState.ThrowWithStandardMessageIfCanceled(
                cancellationState.ExternalCancellationToken);
        }


        /// <summary>
        /// Produces a function that invokes <paramref name="f"/> and routes any exception it raises
        /// through <see cref="ThrowOCEorAggregateException"/>.
        /// </summary>
        /// <typeparam name="T">The input argument type.</typeparam>
        /// <typeparam name="U">The return value type.</typeparam>
        /// <param name="f">The function to invoke.</param>
        /// <param name="cancellationState">The cancellation state to use.</param>
        /// <returns>A new function containing the exception wrapping logic.</returns>
        internal static Func<T, U> WrapFunc<T, U>(Func<T, U> f, CancellationState cancellationState)
        {
            return arg =>
            {
                try
                {
                    return f(arg);
                }
#if SUPPORT_THREAD_ABORT
                catch (ThreadAbortException)
                {
                    // Thread aborts are passed through unwrapped.
                    throw;
                }
#endif
                catch (Exception ex)
                {
                    ThrowOCEorAggregateException(ex, cancellationState);
                }

                // Only reached if the handler above returns instead of throwing.
                return default(U);
            };
        }

        // Decides how an exception caught during sequential execution should be reported.
        //   returns true  ==> the caller should throw an OCE(externalCT)
        //   returns false ==> the caller should throw an AggregateException(ex).
        private static bool ThrowAnOCE(Exception ex, CancellationState cancellationState)
        {
            // An OCE is only reported when the query really has been cancelled and the exception
            // reflects that; everything else becomes an AggregateException to mimic normal PLINQ
            // operation. See QueryTaskGroupState.WaitAll for the main PLINQ exception handling logic.
            OperationCanceledException oce = ex as OperationCanceledException;
            if (oce == null)
            {
                return false;
            }

            // Co-operative cancellation: the exception carries the external token itself.
            bool raisedForExternalToken =
                oce.CancellationToken == cancellationState.ExternalCancellationToken
                && cancellationState.ExternalCancellationToken.IsCancellationRequested;
            if (raisedForExternalToken)
            {
                return true; // let the OCE(extCT) be rethrown.
            }

            // External cancellation that triggered the merged token: convert the internal
            // cancellation back to OCE(extCT).
            return oce.CancellationToken == cancellationState.MergedCancellationToken
                && cancellationState.MergedCancellationToken.IsCancellationRequested
                && cancellationState.ExternalCancellationToken.IsCancellationRequested;
        }
    }
}