1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
4 
5 using System.Collections.Generic;
6 using System.Diagnostics;
7 using System.Threading.Tasks;
8 
9 namespace System.Threading.Channels
10 {
11     /// <summary>Provides a channel with a bounded capacity.</summary>
12     [DebuggerDisplay("Items={ItemsCountForDebugger}, Capacity={_bufferedCapacity}")]
13     [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))]
14     internal sealed class BoundedChannel<T> : Channel<T>, IDebugEnumerable<T>
15     {
16         /// <summary>The mode used when the channel hits its bound.</summary>
17         private readonly BoundedChannelFullMode _mode;
18         /// <summary>Task signaled when the channel has completed.</summary>
19         private readonly TaskCompletionSource<VoidResult> _completion;
20         /// <summary>The maximum capacity of the channel.</summary>
21         private readonly int _bufferedCapacity;
22         /// <summary>Items currently stored in the channel waiting to be read.</summary>
23         private readonly Dequeue<T> _items = new Dequeue<T>();
24         /// <summary>Writers waiting to write to the channel.</summary>
25         private readonly Dequeue<WriterInteractor<T>> _blockedWriters = new Dequeue<WriterInteractor<T>>();
26         /// <summary>Task signaled when any WaitToReadAsync waiters should be woken up.</summary>
27         private ReaderInteractor<bool> _waitingReaders;
28         /// <summary>Task signaled when any WaitToWriteAsync waiters should be woken up.</summary>
29         private ReaderInteractor<bool> _waitingWriters;
30         /// <summary>Whether to force continuations to be executed asynchronously from producer writes.</summary>
31         private readonly bool _runContinuationsAsynchronously;
32         /// <summary>Set to non-null once Complete has been called.</summary>
33         private Exception _doneWriting;
34         /// <summary>Gets an object used to synchronize all state on the instance.</summary>
35         private object SyncObj => _items;
36 
37         /// <summary>Initializes the <see cref="BoundedChannel{T}"/>.</summary>
38         /// <param name="bufferedCapacity">The positive bounded capacity for the channel.</param>
39         /// <param name="mode">The mode used when writing to a full channel.</param>
40         /// <param name="runContinuationsAsynchronously">Whether to force continuations to be executed asynchronously.</param>
BoundedChannel(int bufferedCapacity, BoundedChannelFullMode mode, bool runContinuationsAsynchronously)41         internal BoundedChannel(int bufferedCapacity, BoundedChannelFullMode mode, bool runContinuationsAsynchronously)
42         {
43             Debug.Assert(bufferedCapacity > 0);
44             _bufferedCapacity = bufferedCapacity;
45             _mode = mode;
46             _runContinuationsAsynchronously = runContinuationsAsynchronously;
47             _completion = new TaskCompletionSource<VoidResult>(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None);
48             Reader = new BoundedChannelReader(this);
49             Writer = new BoundedChannelWriter(this);
50         }
51 
52         private sealed class BoundedChannelReader : ChannelReader<T>
53         {
54             internal readonly BoundedChannel<T> _parent;
55             internal BoundedChannelReader(BoundedChannel<T> parent) => _parent = parent;
56 
57             public override Task Completion => _parent._completion.Task;
58 
TryRead(out T item)59             public override bool TryRead(out T item)
60             {
61                 BoundedChannel<T> parent = _parent;
62                 lock (parent.SyncObj)
63                 {
64                     parent.AssertInvariants();
65 
66                     // Get an item if there is one.
67                     if (!parent._items.IsEmpty)
68                     {
69                         item = DequeueItemAndPostProcess();
70                         return true;
71                     }
72                 }
73 
74                 item = default;
75                 return false;
76             }
77 
WaitToReadAsync(CancellationToken cancellationToken)78             public override Task<bool> WaitToReadAsync(CancellationToken cancellationToken)
79             {
80                 if (cancellationToken.IsCancellationRequested)
81                 {
82                     return Task.FromCanceled<bool>(cancellationToken);
83                 }
84 
85                 BoundedChannel<T> parent = _parent;
86                 lock (parent.SyncObj)
87                 {
88                     parent.AssertInvariants();
89 
90                     // If there are any items available, a read is possible.
91                     if (!parent._items.IsEmpty)
92                     {
93                         return ChannelUtilities.s_trueTask;
94                     }
95 
96                     // There were no items available, so if we're done writing, a read will never be possible.
97                     if (parent._doneWriting != null)
98                     {
99                         return parent._doneWriting != ChannelUtilities.s_doneWritingSentinel ?
100                             Task.FromException<bool>(parent._doneWriting) :
101                             ChannelUtilities.s_falseTask;
102                     }
103 
104                     // There were no items available, but there could be in the future, so ensure
105                     // there's a blocked reader task and return it.
106                     return ChannelUtilities.GetOrCreateWaiter(ref parent._waitingReaders, parent._runContinuationsAsynchronously, cancellationToken);
107                 }
108             }
109 
110             /// <summary>Dequeues an item, and then fixes up our state around writers and completion.</summary>
111             /// <returns>The dequeued item.</returns>
DequeueItemAndPostProcess()112             private T DequeueItemAndPostProcess()
113             {
114                 BoundedChannel<T> parent = _parent;
115                 Debug.Assert(Monitor.IsEntered(parent.SyncObj));
116 
117                 // Dequeue an item.
118                 T item = parent._items.DequeueHead();
119 
120                 // If we're now empty and we're done writing, complete the channel.
121                 if (parent._doneWriting != null && parent._items.IsEmpty)
122                 {
123                     ChannelUtilities.Complete(parent._completion, parent._doneWriting);
124                 }
125 
126                 // If there are any writers blocked, there's now room for at least one
127                 // to be promoted to have its item moved into the items queue.  We need
128                 // to loop while trying to complete the writer in order to find one that
129                 // hasn't yet been canceled (canceled writers transition to canceled but
130                 // remain in the physical queue).
131                 while (!parent._blockedWriters.IsEmpty)
132                 {
133                     WriterInteractor<T> w = parent._blockedWriters.DequeueHead();
134                     if (w.Success(default))
135                     {
136                         parent._items.EnqueueTail(w.Item);
137                         return item;
138                     }
139                 }
140 
141                 // There was no blocked writer, so see if there's a WaitToWriteAsync
142                 // we should wake up.
143                 ChannelUtilities.WakeUpWaiters(ref parent._waitingWriters, result: true);
144 
145                 // Return the item
146                 return item;
147             }
148         }
149 
150         private sealed class BoundedChannelWriter : ChannelWriter<T>
151         {
152             internal readonly BoundedChannel<T> _parent;
153             internal BoundedChannelWriter(BoundedChannel<T> parent) => _parent = parent;
154 
TryComplete(Exception error)155             public override bool TryComplete(Exception error)
156             {
157                 BoundedChannel<T> parent = _parent;
158                 bool completeTask;
159                 lock (parent.SyncObj)
160                 {
161                     parent.AssertInvariants();
162 
163                     // If we've already marked the channel as completed, bail.
164                     if (parent._doneWriting != null)
165                     {
166                         return false;
167                     }
168 
169                     // Mark that we're done writing.
170                     parent._doneWriting = error ?? ChannelUtilities.s_doneWritingSentinel;
171                     completeTask = parent._items.IsEmpty;
172                 }
173 
174                 // If there are no items in the queue, complete the channel's task,
175                 // as no more data can possibly arrive at this point.  We do this outside
176                 // of the lock in case we'll be running synchronous completions, and we
177                 // do it before completing blocked/waiting readers, so that when they
178                 // wake up they'll see the task as being completed.
179                 if (completeTask)
180                 {
181                     ChannelUtilities.Complete(parent._completion, error);
182                 }
183 
184                 // At this point, _blockedWriters and _waitingReaders/Writers will not be mutated:
185                 // they're only mutated by readers/writers while holding the lock, and only if _doneWriting is null.
186                 // We also know that only one thread (this one) will ever get here, as only that thread
187                 // will be the one to transition from _doneWriting false to true.  As such, we can
188                 // freely manipulate them without any concurrency concerns.
189                 ChannelUtilities.FailInteractors<WriterInteractor<T>, VoidResult>(parent._blockedWriters, ChannelUtilities.CreateInvalidCompletionException(error));
190                 ChannelUtilities.WakeUpWaiters(ref parent._waitingReaders, result: false, error: error);
191                 ChannelUtilities.WakeUpWaiters(ref parent._waitingWriters, result: false, error: error);
192 
193                 // Successfully transitioned to completed.
194                 return true;
195             }
196 
TryWrite(T item)197             public override bool TryWrite(T item)
198             {
199                 ReaderInteractor<bool> waitingReaders = null;
200 
201                 BoundedChannel<T> parent = _parent;
202                 lock (parent.SyncObj)
203                 {
204                     parent.AssertInvariants();
205 
206                     // If we're done writing, nothing more to do.
207                     if (parent._doneWriting != null)
208                     {
209                         return false;
210                     }
211 
212                     // Get the number of items in the channel currently.
213                     int count = parent._items.Count;
214 
215                     if (count == 0)
216                     {
217                         // There are no items in the channel, which means we may have waiting readers.
218                         // Store the item.
219                         parent._items.EnqueueTail(item);
220                         waitingReaders = parent._waitingReaders;
221                         if (waitingReaders == null)
222                         {
223                             // If no one's waiting to be notified about a 0-to-1 transition, we're done.
224                             return true;
225                         }
226                         parent._waitingReaders = null;
227                     }
228                     else if (count < parent._bufferedCapacity)
229                     {
230                         // There's room in the channel.  Since we're not transitioning from 0-to-1 and
231                         // since there's room, we can simply store the item and exit without having to
232                         // worry about blocked/waiting readers.
233                         parent._items.EnqueueTail(item);
234                         return true;
235                     }
236                     else if (parent._mode == BoundedChannelFullMode.Wait)
237                     {
238                         // The channel is full and we're in a wait mode.
239                         // Simply exit and let the caller know we didn't write the data.
240                         return false;
241                     }
242                     else if (parent._mode == BoundedChannelFullMode.DropWrite)
243                     {
244                         // The channel is full.  Just ignore the item being added
245                         // but say we added it.
246                         return true;
247                     }
248                     else
249                     {
250                         // The channel is full, and we're in a dropping mode.
251                         // Drop either the oldest or the newest and write the new item.
252                         T droppedItem = parent._mode == BoundedChannelFullMode.DropNewest ?
253                             parent._items.DequeueTail() :
254                             parent._items.DequeueHead();
255                         parent._items.EnqueueTail(item);
256                         return true;
257                     }
258                 }
259 
260                 // We stored an item bringing the count up from 0 to 1.  Alert
261                 // any waiting readers that there may be something for them to consume.
262                 // Since we're no longer holding the lock, it's possible we'll end up
263                 // waking readers that have since come in.
264                 waitingReaders.Success(item: true);
265                 return true;
266             }
267 
WaitToWriteAsync(CancellationToken cancellationToken)268             public override Task<bool> WaitToWriteAsync(CancellationToken cancellationToken)
269             {
270                 if (cancellationToken.IsCancellationRequested)
271                 {
272                     return Task.FromCanceled<bool>(cancellationToken);
273                 }
274 
275                 BoundedChannel<T> parent = _parent;
276                 lock (parent.SyncObj)
277                 {
278                     parent.AssertInvariants();
279 
280                     // If we're done writing, no writes will ever succeed.
281                     if (parent._doneWriting != null)
282                     {
283                         return parent._doneWriting != ChannelUtilities.s_doneWritingSentinel ?
284                             Task.FromException<bool>(parent._doneWriting) :
285                             ChannelUtilities.s_falseTask;
286                     }
287 
288                     // If there's space to write, a write is possible.
289                     // And if the mode involves dropping/ignoring, we can always write, as even if it's
290                     // full we'll just drop an element to make room.
291                     if (parent._items.Count < parent._bufferedCapacity || parent._mode != BoundedChannelFullMode.Wait)
292                     {
293                         return ChannelUtilities.s_trueTask;
294                     }
295 
296                     // We're still allowed to write, but there's no space, so ensure a waiter is queued and return it.
297                     return ChannelUtilities.GetOrCreateWaiter(ref parent._waitingWriters, runContinuationsAsynchronously: true, cancellationToken);
298                 }
299             }
300 
WriteAsync(T item, CancellationToken cancellationToken)301             public override Task WriteAsync(T item, CancellationToken cancellationToken)
302             {
303                 if (cancellationToken.IsCancellationRequested)
304                 {
305                     return Task.FromCanceled(cancellationToken);
306                 }
307 
308                 ReaderInteractor<bool> waitingReaders = null;
309 
310                 BoundedChannel<T> parent = _parent;
311                 lock (parent.SyncObj)
312                 {
313                     parent.AssertInvariants();
314 
315                     // If we're done writing, trying to write is an error.
316                     if (parent._doneWriting != null)
317                     {
318                         return Task.FromException(ChannelUtilities.CreateInvalidCompletionException(parent._doneWriting));
319                     }
320 
321                     // Get the number of items in the channel currently.
322                     int count = parent._items.Count;
323 
324                     if (count == 0)
325                     {
326                         // There are no items in the channel, which means we may have waiting readers.
327                         // Store the item.
328                         parent._items.EnqueueTail(item);
329                         waitingReaders = parent._waitingReaders;
330                         if (waitingReaders == null)
331                         {
332                             // If no one's waiting to be notified about a 0-to-1 transition, we're done.
333                             return ChannelUtilities.s_trueTask;
334                         }
335                         parent._waitingReaders = null;
336                     }
337                     else if (count < parent._bufferedCapacity)
338                     {
339                         // There's room in the channel.  Since we're not transitioning from 0-to-1 and
340                         // since there's room, we can simply store the item and exit without having to
341                         // worry about blocked/waiting readers.
342                         parent._items.EnqueueTail(item);
343                         return ChannelUtilities.s_trueTask;
344                     }
345                     else if (parent._mode == BoundedChannelFullMode.Wait)
346                     {
347                         // The channel is full and we're in a wait mode.
348                         // Queue the writer.
349                         var writer = WriterInteractor<T>.Create(runContinuationsAsynchronously: true, item, cancellationToken);
350                         parent._blockedWriters.EnqueueTail(writer);
351                         return writer.Task;
352                     }
353                     else if (parent._mode == BoundedChannelFullMode.DropWrite)
354                     {
355                         // The channel is full and we're in ignore mode.
356                         // Ignore the item but say we accepted it.
357                         return ChannelUtilities.s_trueTask;
358                     }
359                     else
360                     {
361                         // The channel is full, and we're in a dropping mode.
362                         // Drop either the oldest or the newest and write the new item.
363                         T droppedItem = parent._mode == BoundedChannelFullMode.DropNewest ?
364                             parent._items.DequeueTail() :
365                             parent._items.DequeueHead();
366                         parent._items.EnqueueTail(item);
367                         return ChannelUtilities.s_trueTask;
368                     }
369                 }
370 
371                 // We stored an item bringing the count up from 0 to 1.  Alert
372                 // any waiting readers that there may be something for them to consume.
373                 // Since we're no longer holding the lock, it's possible we'll end up
374                 // waking readers that have since come in.
375                 waitingReaders.Success(item: true);
376                 return ChannelUtilities.s_trueTask;
377             }
378         }
379 
380         [Conditional("DEBUG")]
AssertInvariants()381         private void AssertInvariants()
382         {
383             Debug.Assert(SyncObj != null, "The sync obj must not be null.");
384             Debug.Assert(Monitor.IsEntered(SyncObj), "Invariants can only be validated while holding the lock.");
385 
386             if (!_items.IsEmpty)
387             {
388                 Debug.Assert(_waitingReaders == null, "There are items available, so there shouldn't be any waiting readers.");
389             }
390             if (_items.Count < _bufferedCapacity)
391             {
392                 Debug.Assert(_blockedWriters.IsEmpty, "There's space available, so there shouldn't be any blocked writers.");
393                 Debug.Assert(_waitingWriters == null, "There's space available, so there shouldn't be any waiting writers.");
394             }
395             if (!_blockedWriters.IsEmpty)
396             {
397                 Debug.Assert(_items.Count == _bufferedCapacity, "We should have a full buffer if there's a blocked writer.");
398             }
399             if (_completion.Task.IsCompleted)
400             {
401                 Debug.Assert(_doneWriting != null, "We can only complete if we're done writing.");
402             }
403         }
404 
405         /// <summary>Gets the number of items in the channel.  This should only be used by the debugger.</summary>
406         private int ItemsCountForDebugger => _items.Count;
407 
408         /// <summary>Gets an enumerator the debugger can use to show the contents of the channel.</summary>
GetEnumerator()409         IEnumerator<T> IDebugEnumerable<T>.GetEnumerator() => _items.GetEnumerator();
410     }
411 }
412