1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
4 
5 using System.Collections.Generic;
6 using System.Diagnostics;
7 using System.Threading.Tasks;
8 
9 namespace System.Threading.Channels
10 {
11     /// <summary>Provides internal helper methods for implementing channels.</summary>
12     internal static class ChannelUtilities
13     {
14         /// <summary>Sentinel object used to indicate being done writing.</summary>
15         internal static readonly Exception s_doneWritingSentinel = new Exception(nameof(s_doneWritingSentinel));
16         /// <summary>A cached task with a Boolean true result.</summary>
17         internal static readonly Task<bool> s_trueTask = Task.FromResult(result: true);
18         /// <summary>A cached task with a Boolean false result.</summary>
19         internal static readonly Task<bool> s_falseTask = Task.FromResult(result: false);
20         /// <summary>A cached task that never completes.</summary>
21         internal static readonly Task s_neverCompletingTask = new TaskCompletionSource<bool>().Task;
22 
23         /// <summary>Completes the specified TaskCompletionSource.</summary>
24         /// <param name="tcs">The source to complete.</param>
25         /// <param name="error">
26         /// The optional exception with which to complete.
27         /// If this is null or the DoneWritingSentinel, the source will be completed successfully.
28         /// If this is an OperationCanceledException, it'll be completed with the exception's token.
29         /// Otherwise, it'll be completed as faulted with the exception.
30         /// </param>
Complete(TaskCompletionSource<VoidResult> tcs, Exception error = null)31         internal static void Complete(TaskCompletionSource<VoidResult> tcs, Exception error = null)
32         {
33             if (error is OperationCanceledException oce)
34             {
35                 tcs.TrySetCanceled(oce.CancellationToken);
36             }
37             else if (error != null && error != s_doneWritingSentinel)
38             {
39                 tcs.TrySetException(error);
40             }
41             else
42             {
43                 tcs.TrySetResult(default);
44             }
45         }
46 
47         /// <summary>Wake up all of the waiters and null out the field.</summary>
48         /// <param name="waiters">The waiters.</param>
49         /// <param name="result">The value with which to complete each waiter.</param>
WakeUpWaiters(ref ReaderInteractor<bool> waiters, bool result)50         internal static void WakeUpWaiters(ref ReaderInteractor<bool> waiters, bool result)
51         {
52             ReaderInteractor<bool> w = waiters;
53             if (w != null)
54             {
55                 w.Success(result);
56                 waiters = null;
57             }
58         }
59 
60         /// <summary>Wake up all of the waiters and null out the field.</summary>
61         /// <param name="waiters">The waiters.</param>
62         /// <param name="result">The success value with which to complete each waiter if <paramref name="error">error</paramref> is null.</param>
63         /// <param name="error">The failure with which to cmplete each waiter, if non-null.</param>
WakeUpWaiters(ref ReaderInteractor<bool> waiters, bool result, Exception error = null)64         internal static void WakeUpWaiters(ref ReaderInteractor<bool> waiters, bool result, Exception error = null)
65         {
66             ReaderInteractor<bool> w = waiters;
67             if (w != null)
68             {
69                 if (error != null)
70                 {
71                     w.Fail(error);
72                 }
73                 else
74                 {
75                     w.Success(result);
76                 }
77                 waiters = null;
78             }
79         }
80 
81         /// <summary>Removes all interactors from the queue, failing each.</summary>
82         /// <param name="interactors">The queue of interactors to complete.</param>
83         /// <param name="error">The error with which to complete each interactor.</param>
84         internal static void FailInteractors<T, TInner>(Dequeue<T> interactors, Exception error) where T : Interactor<TInner>
85         {
86             Debug.Assert(error != null);
87             while (!interactors.IsEmpty)
88             {
89                 interactors.DequeueHead().Fail(error);
90             }
91         }
92 
93         /// <summary>Gets or creates a "waiter" (e.g. WaitForRead/WriteAsync) interactor.</summary>
94         /// <param name="waiter">The field storing the waiter interactor.</param>
95         /// <param name="runContinuationsAsynchronously">true to force continuations to run asynchronously; otherwise, false.</param>
96         /// <param name="cancellationToken">The token to use to cancel the wait.</param>
GetOrCreateWaiter(ref ReaderInteractor<bool> waiter, bool runContinuationsAsynchronously, CancellationToken cancellationToken)97         internal static Task<bool> GetOrCreateWaiter(ref ReaderInteractor<bool> waiter, bool runContinuationsAsynchronously, CancellationToken cancellationToken)
98         {
99             // Get the existing waiters interactor.
100             ReaderInteractor<bool> w = waiter;
101 
102             // If there isn't one, create one.  This explicitly does not include the cancellation token,
103             // as we reuse it for any number of waiters that overlap.
104             if (w == null)
105             {
106                 waiter = w = ReaderInteractor<bool>.Create(runContinuationsAsynchronously);
107             }
108 
109             // If the cancellation token can't be canceled, then just return the waiter task.
110             // If it can, we need to return a task that will complete when the waiter task does but that can also be canceled.
111             // Easiest way to do that is with a cancelable continuation.
112             return cancellationToken.CanBeCanceled ?
113                 w.Task.ContinueWith(t => t.Result, cancellationToken, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default) :
114                 w.Task;
115         }
116 
117         /// <summary>Creates and returns an exception object to indicate that a channel has been closed.</summary>
CreateInvalidCompletionException(Exception inner = null)118         internal static Exception CreateInvalidCompletionException(Exception inner = null) =>
119             inner is OperationCanceledException ? inner :
120             inner != null && inner != s_doneWritingSentinel ? new ChannelClosedException(inner) :
121             new ChannelClosedException();
122     }
123 }
124