1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 //
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // ExceptionAggregator.cs
9 //
10 // <OWNER>Microsoft</OWNER>
11 //
12 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
13 
14 
15 using System;
16 using System.Collections.Generic;
17 using System.Threading;
18 
19 namespace System.Linq.Parallel
20 {
21     internal static class ExceptionAggregator
22     {
23         /// <summary>
24         /// WrapEnumerable.ExceptionAggregator wraps the enumerable with another enumerator that will
25         /// catch exceptions, and wrap each with an AggregateException.
26         ///
27         /// If PLINQ decides to execute a query sequentially, we will reuse LINQ-to-objects
28         /// implementations for the different operators. However, we still need to throw
29         /// AggregateException in the cases when parallel execution would have thrown an
30         /// AggregateException. Thus, we introduce a wrapper enumerator that catches exceptions
31         /// and wraps them with an AggregateException.
32         /// </summary>
WrapEnumerable(IEnumerable<TElement> source, CancellationState cancellationState)33         internal static IEnumerable<TElement> WrapEnumerable<TElement>(IEnumerable<TElement> source, CancellationState cancellationState)
34         {
35             using (IEnumerator<TElement> enumerator = source.GetEnumerator())
36             {
37                 while (true)
38                 {
39                     TElement elem = default(TElement);
40                     try
41                     {
42                         if (!enumerator.MoveNext())
43                         {
44                             yield break;
45                         }
46                         elem = enumerator.Current;
47                     }
48                     catch (ThreadAbortException)
49                     {
50                         // Do not wrap ThreadAbortExceptions
51                         throw;
52                     }
53                     catch (Exception ex)
54                     {
55                         ThrowOCEorAggregateException(ex, cancellationState);
56                     }
57                     yield return elem;
58                 }
59             }
60 
61         }
62 
63 
64         /// <summary>
65         /// A variant of WrapEnumerable that accepts a QueryOperatorEnumerator{,} instead of an IEnumerable{}.
66         /// The code duplication is necessary to avoid extra virtual method calls that would otherwise be needed to
67         /// convert the QueryOperatorEnumerator{,} to an IEnumerator{}.
68         /// </summary>
WrapQueryEnumerator(QueryOperatorEnumerator<TElement, TIgnoreKey> source, CancellationState cancellationState)69         internal static IEnumerable<TElement> WrapQueryEnumerator<TElement, TIgnoreKey>(QueryOperatorEnumerator<TElement, TIgnoreKey> source,
70             CancellationState cancellationState)
71         {
72             TElement elem = default(TElement);
73             TIgnoreKey ignoreKey = default(TIgnoreKey);
74 
75             try
76             {
77                 while (true)
78                 {
79                     try
80                     {
81                         if (!source.MoveNext(ref elem, ref ignoreKey))
82                         {
83                             yield break;
84                         }
85                     }
86                     catch (ThreadAbortException)
87                     {
88                         // Do not wrap ThreadAbortExceptions
89                         throw;
90                     }
91                     catch (Exception ex)
92                     {
93                         ThrowOCEorAggregateException(ex, cancellationState);
94                     }
95 
96                     yield return elem;
97                 }
98             }
99             finally
100             {
101                 source.Dispose();
102             }
103         }
104 
105         /// <summary>
106         /// Accepts an exception, wraps it as if it was crossing the parallel->sequential boundary, and throws the
107         /// wrapped exception. In sequential fallback cases, we use this method to throw exceptions that are consistent
108         /// with exceptions thrown by PLINQ when the query is executed by worker tasks.
109         ///
110         /// The exception will be wrapped into an AggregateException, except for the case when the query is being
111         /// legitimately cancelled, in which case we will propagate the CancellationException with the appropriate
112         /// token.
113         /// </summary>
ThrowOCEorAggregateException(Exception ex, CancellationState cancellationState)114         internal static void ThrowOCEorAggregateException(Exception ex, CancellationState cancellationState)
115         {
116             if (ThrowAnOCE(ex, cancellationState))
117             {
118                 CancellationState.ThrowWithStandardMessageIfCanceled(
119                     cancellationState.ExternalCancellationToken);
120             }
121             else
122             {
123                 throw new AggregateException(ex);
124             }
125         }
126 
127 
128         /// <summary>
129         /// Wraps a function with a try/catch that morphs all exceptions into AggregateException.
130         /// </summary>
131         /// <typeparam name="T">The input argument type.</typeparam>
132         /// <typeparam name="U">The return value type.</typeparam>
133         /// <param name="f">A function to use internally.</param>
134         /// <param name="cancellationState">The cancellation state to use.</param>
135         /// <returns>A new function containing exception wrapping logic.</returns>
WrapFunc(Func<T, U> f, CancellationState cancellationState)136         internal static Func<T, U> WrapFunc<T, U>(Func<T, U> f, CancellationState cancellationState)
137         {
138             return t =>
139                 {
140                     U retval = default(U);
141                     try
142                     {
143                          retval = f(t);
144                     }
145                     catch (ThreadAbortException)
146                     {
147                         // Do not wrap ThreadAbortExceptions
148                         throw;
149                     }
150                     catch (Exception ex)
151                     {
152                         ThrowOCEorAggregateException(ex, cancellationState);
153                     }
154                     return retval;
155                 };
156         }
157 
158         // return: true ==> throw an OCE(externalCT)
159         //         false ==> thrown an AggregateException(ex).
ThrowAnOCE(Exception ex, CancellationState cancellationState)160         private static bool ThrowAnOCE(Exception ex, CancellationState cancellationState)
161         {
162             // if the query has been canceled and the exception represents this, we want to throw OCE
163             // but otherwise we want to throw an AggregateException to mimic normal Plinq operation
164             // See QueryTaskGroupState.WaitAll for the main plinq exception handling logic.
165 
166             // check for co-operative cancellation.
167             OperationCanceledException cancelEx = ex as OperationCanceledException;
168             if (cancelEx != null &&
169                 cancelEx.CancellationToken == cancellationState.ExternalCancellationToken
170                 && cancellationState.ExternalCancellationToken.IsCancellationRequested)
171             {
172                 return true;  // let the OCE(extCT) be rethrown.
173             }
174 
175             // check for external cancellation which triggered the mergedToken.
176             if (cancelEx != null &&
177                 cancelEx.CancellationToken == cancellationState.MergedCancellationToken
178                 && cancellationState.MergedCancellationToken.IsCancellationRequested
179                 && cancellationState.ExternalCancellationToken.IsCancellationRequested)
180             {
181                 return true;  // convert internal cancellation back to OCE(extCT).
182             }
183 
184             return false;
185         }
186     }
187 }
188