// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

namespace System.Threading.Channels
{
    /// <summary>Provides internal helper methods for implementing channels.</summary>
    internal static class ChannelUtilities
    {
        /// <summary>Sentinel object used to indicate being done writing.</summary>
        internal static readonly Exception s_doneWritingSentinel = new Exception(nameof(s_doneWritingSentinel));
        /// <summary>A cached task with a Boolean true result.</summary>
        internal static readonly Task<bool> s_trueTask = Task.FromResult(result: true);
        /// <summary>A cached task with a Boolean false result.</summary>
        internal static readonly Task<bool> s_falseTask = Task.FromResult(result: false);
        /// <summary>A cached task that never completes.</summary>
        internal static readonly Task s_neverCompletingTask = new TaskCompletionSource<bool>().Task;

        /// <summary>Completes the specified TaskCompletionSource.</summary>
        /// <param name="tcs">The source to complete.</param>
        /// <param name="error">
        /// The optional exception with which to complete.
        /// If this is null or the DoneWritingSentinel, the source will be completed successfully.
        /// If this is an OperationCanceledException, it'll be completed with the exception's token.
        /// Otherwise, it'll be completed as faulted with the exception.
        /// </param>
        internal static void Complete(TaskCompletionSource<VoidResult> tcs, Exception error = null)
        {
            // The Try* variants are used, so a source that has already been completed is left as it is.
            if (error is OperationCanceledException oce)
            {
                tcs.TrySetCanceled(oce.CancellationToken);
            }
            else if (error != null && error != s_doneWritingSentinel)
            {
                tcs.TrySetException(error);
            }
            else
            {
                tcs.TrySetResult(default);
            }
        }

        /// <summary>Wake up all of the waiters and null out the field.</summary>
        /// <param name="waiters">The waiters.</param>
        /// <param name="result">The value with which to complete each waiter.</param>
        internal static void WakeUpWaiters(ref ReaderInteractor<bool> waiters, bool result)
        {
            // Naming the 'error' argument selects the three-parameter overload; with a null
            // error it completes the waiters successfully with 'result'.
            WakeUpWaiters(ref waiters, result, error: null);
        }

        /// <summary>Wake up all of the waiters and null out the field.</summary>
        /// <param name="waiters">The waiters.</param>
        /// <param name="result">The success value with which to complete each waiter if <paramref name="error"/> is null.</param>
        /// <param name="error">The failure with which to complete each waiter, if non-null.</param>
        internal static void WakeUpWaiters(ref ReaderInteractor<bool> waiters, bool result, Exception error = null)
        {
            ReaderInteractor<bool> w = waiters;
            if (w == null)
            {
                // Nothing is stored in the field, so there is nothing to complete or clear.
                return;
            }

            if (error != null)
            {
                w.Fail(error);
            }
            else
            {
                w.Success(result);
            }

            waiters = null;
        }

        /// <summary>Removes all interactors from the queue, failing each.</summary>
        /// <param name="interactors">The queue of interactors to complete.</param>
        /// <param name="error">The error with which to complete each interactor.</param>
        internal static void FailInteractors<T, TInner>(Dequeue<T> interactors, Exception error) where T : Interactor<TInner>
        {
            Debug.Assert(error != null);
            while (!interactors.IsEmpty)
            {
                interactors.DequeueHead().Fail(error);
            }
        }

        /// <summary>Gets or creates a "waiter" (e.g. WaitForRead/WriteAsync) interactor.</summary>
        /// <param name="waiter">The field storing the waiter interactor.</param>
        /// <param name="runContinuationsAsynchronously">true to force continuations to run asynchronously; otherwise, false.</param>
        /// <param name="cancellationToken">The token to use to cancel the wait.</param>
        /// <returns>
        /// The waiter's task if <paramref name="cancellationToken"/> cannot be canceled; otherwise,
        /// a continuation of the waiter's task that is associated with <paramref name="cancellationToken"/>.
        /// </returns>
        internal static Task<bool> GetOrCreateWaiter(ref ReaderInteractor<bool> waiter, bool runContinuationsAsynchronously, CancellationToken cancellationToken)
        {
            // Get the existing waiters interactor.
            ReaderInteractor<bool> w = waiter;

            // If there isn't one, create one. This explicitly does not include the cancellation token,
            // as we reuse it for any number of waiters that overlap.
            if (w == null)
            {
                waiter = w = ReaderInteractor<bool>.Create(runContinuationsAsynchronously);
            }

            // If the cancellation token can't be canceled, then just return the waiter task.
            if (!cancellationToken.CanBeCanceled)
            {
                return w.Task;
            }

            // Otherwise we need to return a task that will complete when the waiter task does but that
            // can also be canceled. Easiest way to do that is with a cancelable continuation.
            // GetAwaiter().GetResult() is used rather than Result so that a failed waiter rethrows its
            // original exception instead of one wrapped in an additional AggregateException, keeping
            // the error seen through this path consistent with the one seen through w.Task directly.
            return w.Task.ContinueWith(
                t => t.GetAwaiter().GetResult(),
                cancellationToken,
                TaskContinuationOptions.ExecuteSynchronously,
                TaskScheduler.Default);
        }

        /// <summary>Creates and returns an exception object to indicate that a channel has been closed.</summary>
        /// <param name="inner">
        /// The optional exception associated with the completion.
        /// If this is an OperationCanceledException, it is returned as-is.
        /// If this is null or the DoneWritingSentinel, a ChannelClosedException with no inner exception is returned.
        /// Otherwise, a ChannelClosedException wrapping it is returned.
        /// </param>
        /// <returns>The exception to use.</returns>
        internal static Exception CreateInvalidCompletionException(Exception inner = null)
        {
            if (inner is OperationCanceledException)
            {
                return inner;
            }

            if (inner != null && inner != s_doneWritingSentinel)
            {
                return new ChannelClosedException(inner);
            }

            return new ChannelClosedException();
        }
    }
}