1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
4 
5 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
6 //
7 // SpoolingTaskBase.cs
8 //
9 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
10 
11 using System.Collections.Generic;
12 using System.Threading;
13 
14 namespace System.Linq.Parallel
15 {
16     /// <summary>
17     /// A spooling task handles marshaling data from a producer to a consumer. It simply
18     /// takes data from a producer and hands it off to a consumer. This class is the base
19     /// class from which other concrete spooling tasks derive, encapsulating some common
20     /// logic (such as capturing exceptions).
21     /// </summary>
22     internal abstract class SpoolingTaskBase : QueryTask
23     {
24         //-----------------------------------------------------------------------------------
25         // Constructs a new spooling task.
26         //
27         // Arguments:
28         //     taskIndex   - the unique index of this task
29         //
30 
SpoolingTaskBase(int taskIndex, QueryTaskGroupState groupState)31         protected SpoolingTaskBase(int taskIndex, QueryTaskGroupState groupState) :
32             base(taskIndex, groupState)
33         {
34         }
35 
36         //-----------------------------------------------------------------------------------
37         // The implementation of the Work API just enumerates the producer's data, and
38         // enqueues it into the consumer channel. Well, really, it just defers to extension
39         // points (below) implemented by subclasses to do these things.
40         //
41 
Work()42         protected override void Work()
43         {
44             try
45             {
46                 // Defer to the base class for the actual work.
47                 SpoolingWork();
48             }
49             catch (Exception ex)
50             {
51                 OperationCanceledException oce = ex as OperationCanceledException;
52                 if (oce != null &&
53                     oce.CancellationToken == _groupState.CancellationState.MergedCancellationToken
54                     && _groupState.CancellationState.MergedCancellationToken.IsCancellationRequested)
55                 {
56                     //an expected internal cancellation has occurred.  suppress this exception.
57                 }
58                 else
59                 {
60                     // TPL will catch and store the exception on the task object. We'll then later
61                     // turn around and wait on it, having the effect of propagating it. In the meantime,
62                     // we want to cooperative cancel all workers.
63                     _groupState.CancellationState.InternalCancellationTokenSource.Cancel();
64 
65                     // And then repropagate to let TPL catch it.
66                     throw;
67                 }
68             }
69             finally
70             {
71                 SpoolingFinally(); //dispose resources etc.
72             }
73         }
74 
75         //-----------------------------------------------------------------------------------
76         // This method is responsible for enumerating results and enqueueing them to
77         // the output channel(s) as appropriate.  Each base class implements its own.
78         //
79 
SpoolingWork()80         protected abstract void SpoolingWork();
81 
82         //-----------------------------------------------------------------------------------
83         // If the subclass needs to do something in the finally block of the main work loop,
84         // it should override this and do it.  Purely optional.
85         //
86 
SpoolingFinally()87         protected virtual void SpoolingFinally()
88         {
89             // (Intentionally left blank.)
90         }
91     }
92 }
93