1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
4 
5 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
6 //
7 // ExceptionAggregator.cs
8 //
9 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
10 
11 using System;
12 using System.Collections.Generic;
13 using System.Threading;
14 
15 namespace System.Linq.Parallel
16 {
17     internal static class ExceptionAggregator
18     {
19         /// <summary>
20         /// WrapEnumerable.ExceptionAggregator wraps the enumerable with another enumerator that will
21         /// catch exceptions, and wrap each with an AggregateException.
22         ///
23         /// If PLINQ decides to execute a query sequentially, we will reuse LINQ-to-objects
24         /// implementations for the different operators. However, we still need to throw
25         /// AggregateException in the cases when parallel execution would have thrown an
26         /// AggregateException. Thus, we introduce a wrapper enumerator that catches exceptions
27         /// and wraps them with an AggregateException.
28         /// </summary>
WrapEnumerable(IEnumerable<TElement> source, CancellationState cancellationState)29         internal static IEnumerable<TElement> WrapEnumerable<TElement>(IEnumerable<TElement> source, CancellationState cancellationState)
30         {
31             using (IEnumerator<TElement> enumerator = source.GetEnumerator())
32             {
33                 while (true)
34                 {
35                     TElement elem = default(TElement);
36                     try
37                     {
38                         if (!enumerator.MoveNext())
39                         {
40                             yield break;
41                         }
42                         elem = enumerator.Current;
43                     }
44 #if SUPPORT_THREAD_ABORT
45                     catch (ThreadAbortException)
46                     {
47                         // Do not wrap ThreadAbortExceptions
48                         throw;
49                     }
50 #endif
51                     catch (Exception ex)
52                     {
53                         ThrowOCEorAggregateException(ex, cancellationState);
54                     }
55                     yield return elem;
56                 }
57             }
58         }
59 
60 
61         /// <summary>
62         /// A variant of WrapEnumerable that accepts a QueryOperatorEnumerator{,} instead of an IEnumerable{}.
63         /// The code duplication is necessary to avoid extra virtual method calls that would otherwise be needed to
64         /// convert the QueryOperatorEnumerator{,} to an IEnumerator{}.
65         /// </summary>
WrapQueryEnumerator(QueryOperatorEnumerator<TElement, TIgnoreKey> source, CancellationState cancellationState)66         internal static IEnumerable<TElement> WrapQueryEnumerator<TElement, TIgnoreKey>(QueryOperatorEnumerator<TElement, TIgnoreKey> source,
67             CancellationState cancellationState)
68         {
69             TElement elem = default(TElement);
70             TIgnoreKey ignoreKey = default(TIgnoreKey);
71 
72             try
73             {
74                 while (true)
75                 {
76                     try
77                     {
78                         if (!source.MoveNext(ref elem, ref ignoreKey))
79                         {
80                             yield break;
81                         }
82                     }
83 #if SUPPORT_THREAD_ABORT
84                     catch (ThreadAbortException)
85                     {
86                         // Do not wrap ThreadAbortExceptions
87                         throw;
88                     }
89 #endif
90                     catch (Exception ex)
91                     {
92                         ThrowOCEorAggregateException(ex, cancellationState);
93                     }
94 
95                     yield return elem;
96                 }
97             }
98             finally
99             {
100                 source.Dispose();
101             }
102         }
103 
104         /// <summary>
105         /// Accepts an exception, wraps it as if it was crossing the parallel->sequential boundary, and throws the
106         /// wrapped exception. In sequential fallback cases, we use this method to throw exceptions that are consistent
107         /// with exceptions thrown by PLINQ when the query is executed by worker tasks.
108         ///
109         /// The exception will be wrapped into an AggregateException, except for the case when the query is being
110         /// legitimately cancelled, in which case we will propagate the CancellationException with the appropriate
111         /// token.
112         /// </summary>
ThrowOCEorAggregateException(Exception ex, CancellationState cancellationState)113         internal static void ThrowOCEorAggregateException(Exception ex, CancellationState cancellationState)
114         {
115             if (ThrowAnOCE(ex, cancellationState))
116             {
117                 CancellationState.ThrowWithStandardMessageIfCanceled(
118                     cancellationState.ExternalCancellationToken);
119             }
120             else
121             {
122                 throw new AggregateException(ex);
123             }
124         }
125 
126 
127         /// <summary>
128         /// Wraps a function with a try/catch that morphs all exceptions into AggregateException.
129         /// </summary>
130         /// <typeparam name="T">The input argument type.</typeparam>
131         /// <typeparam name="U">The return value type.</typeparam>
132         /// <param name="f">A function to use internally.</param>
133         /// <param name="cancellationState">The cancellation state to use.</param>
134         /// <returns>A new function containing exception wrapping logic.</returns>
WrapFunc(Func<T, U> f, CancellationState cancellationState)135         internal static Func<T, U> WrapFunc<T, U>(Func<T, U> f, CancellationState cancellationState)
136         {
137             return t =>
138                 {
139                     U retval = default(U);
140                     try
141                     {
142                         retval = f(t);
143                     }
144 #if SUPPORT_THREAD_ABORT
145                     catch (ThreadAbortException)
146                     {
147                         // Do not wrap ThreadAbortExceptions
148                         throw;
149                     }
150 #endif
151                     catch (Exception ex)
152                     {
153                         ThrowOCEorAggregateException(ex, cancellationState);
154                     }
155                     return retval;
156                 };
157         }
158 
159         // return: true ==> throw an OCE(externalCT)
160         //         false ==> thrown an AggregateException(ex).
ThrowAnOCE(Exception ex, CancellationState cancellationState)161         private static bool ThrowAnOCE(Exception ex, CancellationState cancellationState)
162         {
163             // if the query has been canceled and the exception represents this, we want to throw OCE
164             // but otherwise we want to throw an AggregateException to mimic normal Plinq operation
165             // See QueryTaskGroupState.WaitAll for the main plinq exception handling logic.
166 
167             // check for co-operative cancellation.
168             OperationCanceledException cancelEx = ex as OperationCanceledException;
169             if (cancelEx != null &&
170                 cancelEx.CancellationToken == cancellationState.ExternalCancellationToken
171                 && cancellationState.ExternalCancellationToken.IsCancellationRequested)
172             {
173                 return true;  // let the OCE(extCT) be rethrown.
174             }
175 
176             // check for external cancellation which triggered the mergedToken.
177             if (cancelEx != null &&
178                 cancelEx.CancellationToken == cancellationState.MergedCancellationToken
179                 && cancellationState.MergedCancellationToken.IsCancellationRequested
180                 && cancellationState.ExternalCancellationToken.IsCancellationRequested)
181             {
182                 return true;  // convert internal cancellation back to OCE(extCT).
183             }
184 
185             return false;
186         }
187     }
188 }
189