1 // ==++== 2 // 3 // Copyright (c) Microsoft Corporation. All rights reserved. 4 // 5 // ==--== 6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ 7 // 8 // ExceptionAggregator.cs 9 // 10 // <OWNER>Microsoft</OWNER> 11 // 12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- 13 14 15 using System; 16 using System.Collections.Generic; 17 using System.Threading; 18 19 namespace System.Linq.Parallel 20 { 21 internal static class ExceptionAggregator 22 { 23 /// <summary> 24 /// WrapEnumerable.ExceptionAggregator wraps the enumerable with another enumerator that will 25 /// catch exceptions, and wrap each with an AggregateException. 26 /// 27 /// If PLINQ decides to execute a query sequentially, we will reuse LINQ-to-objects 28 /// implementations for the different operators. However, we still need to throw 29 /// AggregateException in the cases when parallel execution would have thrown an 30 /// AggregateException. Thus, we introduce a wrapper enumerator that catches exceptions 31 /// and wraps them with an AggregateException. 32 /// </summary> WrapEnumerable(IEnumerable<TElement> source, CancellationState cancellationState)33 internal static IEnumerable<TElement> WrapEnumerable<TElement>(IEnumerable<TElement> source, CancellationState cancellationState) 34 { 35 using (IEnumerator<TElement> enumerator = source.GetEnumerator()) 36 { 37 while (true) 38 { 39 TElement elem = default(TElement); 40 try 41 { 42 if (!enumerator.MoveNext()) 43 { 44 yield break; 45 } 46 elem = enumerator.Current; 47 } 48 catch (ThreadAbortException) 49 { 50 // Do not wrap ThreadAbortExceptions 51 throw; 52 } 53 catch (Exception ex) 54 { 55 ThrowOCEorAggregateException(ex, cancellationState); 56 } 57 yield return elem; 58 } 59 } 60 61 } 62 63 64 /// <summary> 65 /// A variant of WrapEnumerable that accepts a QueryOperatorEnumerator{,} instead of an IEnumerable{}. 66 /// The code duplication is necessary to avoid extra virtual method calls that would otherwise be needed to 67 /// convert the QueryOperatorEnumerator{,} to an IEnumerator{}. 68 /// </summary> WrapQueryEnumerator(QueryOperatorEnumerator<TElement, TIgnoreKey> source, CancellationState cancellationState)69 internal static IEnumerable<TElement> WrapQueryEnumerator<TElement, TIgnoreKey>(QueryOperatorEnumerator<TElement, TIgnoreKey> source, 70 CancellationState cancellationState) 71 { 72 TElement elem = default(TElement); 73 TIgnoreKey ignoreKey = default(TIgnoreKey); 74 75 try 76 { 77 while (true) 78 { 79 try 80 { 81 if (!source.MoveNext(ref elem, ref ignoreKey)) 82 { 83 yield break; 84 } 85 } 86 catch (ThreadAbortException) 87 { 88 // Do not wrap ThreadAbortExceptions 89 throw; 90 } 91 catch (Exception ex) 92 { 93 ThrowOCEorAggregateException(ex, cancellationState); 94 } 95 96 yield return elem; 97 } 98 } 99 finally 100 { 101 source.Dispose(); 102 } 103 } 104 105 /// <summary> 106 /// Accepts an exception, wraps it as if it was crossing the parallel->sequential boundary, and throws the 107 /// wrapped exception. In sequential fallback cases, we use this method to throw exceptions that are consistent 108 /// with exceptions thrown by PLINQ when the query is executed by worker tasks. 109 /// 110 /// The exception will be wrapped into an AggregateException, except for the case when the query is being 111 /// legitimately cancelled, in which case we will propagate the CancellationException with the appropriate 112 /// token. 113 /// </summary> ThrowOCEorAggregateException(Exception ex, CancellationState cancellationState)114 internal static void ThrowOCEorAggregateException(Exception ex, CancellationState cancellationState) 115 { 116 if (ThrowAnOCE(ex, cancellationState)) 117 { 118 CancellationState.ThrowWithStandardMessageIfCanceled( 119 cancellationState.ExternalCancellationToken); 120 } 121 else 122 { 123 throw new AggregateException(ex); 124 } 125 } 126 127 128 /// <summary> 129 /// Wraps a function with a try/catch that morphs all exceptions into AggregateException. 130 /// </summary> 131 /// <typeparam name="T">The input argument type.</typeparam> 132 /// <typeparam name="U">The return value type.</typeparam> 133 /// <param name="f">A function to use internally.</param> 134 /// <param name="cancellationState">The cancellation state to use.</param> 135 /// <returns>A new function containing exception wrapping logic.</returns> WrapFunc(Func<T, U> f, CancellationState cancellationState)136 internal static Func<T, U> WrapFunc<T, U>(Func<T, U> f, CancellationState cancellationState) 137 { 138 return t => 139 { 140 U retval = default(U); 141 try 142 { 143 retval = f(t); 144 } 145 catch (ThreadAbortException) 146 { 147 // Do not wrap ThreadAbortExceptions 148 throw; 149 } 150 catch (Exception ex) 151 { 152 ThrowOCEorAggregateException(ex, cancellationState); 153 } 154 return retval; 155 }; 156 } 157 158 // return: true ==> throw an OCE(externalCT) 159 // false ==> thrown an AggregateException(ex). ThrowAnOCE(Exception ex, CancellationState cancellationState)160 private static bool ThrowAnOCE(Exception ex, CancellationState cancellationState) 161 { 162 // if the query has been canceled and the exception represents this, we want to throw OCE 163 // but otherwise we want to throw an AggregateException to mimic normal Plinq operation 164 // See QueryTaskGroupState.WaitAll for the main plinq exception handling logic. 165 166 // check for co-operative cancellation. 167 OperationCanceledException cancelEx = ex as OperationCanceledException; 168 if (cancelEx != null && 169 cancelEx.CancellationToken == cancellationState.ExternalCancellationToken 170 && cancellationState.ExternalCancellationToken.IsCancellationRequested) 171 { 172 return true; // let the OCE(extCT) be rethrown. 173 } 174 175 // check for external cancellation which triggered the mergedToken. 176 if (cancelEx != null && 177 cancelEx.CancellationToken == cancellationState.MergedCancellationToken 178 && cancellationState.MergedCancellationToken.IsCancellationRequested 179 && cancellationState.ExternalCancellationToken.IsCancellationRequested) 180 { 181 return true; // convert internal cancellation back to OCE(extCT). 182 } 183 184 return false; 185 } 186 } 187 } 188