1 // Licensed to the .NET Foundation under one or more agreements. 2 // The .NET Foundation licenses this file to you under the MIT license. 3 // See the LICENSE file in the project root for more information. 4 5 using System.Collections.Generic; 6 using System.Diagnostics; 7 using System.Threading.Tasks; 8 9 namespace System.Threading.Channels 10 { 11 /// <summary>Provides a channel with a bounded capacity.</summary> 12 [DebuggerDisplay("Items={ItemsCountForDebugger}, Capacity={_bufferedCapacity}")] 13 [DebuggerTypeProxy(typeof(DebugEnumeratorDebugView<>))] 14 internal sealed class BoundedChannel<T> : Channel<T>, IDebugEnumerable<T> 15 { 16 /// <summary>The mode used when the channel hits its bound.</summary> 17 private readonly BoundedChannelFullMode _mode; 18 /// <summary>Task signaled when the channel has completed.</summary> 19 private readonly TaskCompletionSource<VoidResult> _completion; 20 /// <summary>The maximum capacity of the channel.</summary> 21 private readonly int _bufferedCapacity; 22 /// <summary>Items currently stored in the channel waiting to be read.</summary> 23 private readonly Dequeue<T> _items = new Dequeue<T>(); 24 /// <summary>Writers waiting to write to the channel.</summary> 25 private readonly Dequeue<WriterInteractor<T>> _blockedWriters = new Dequeue<WriterInteractor<T>>(); 26 /// <summary>Task signaled when any WaitToReadAsync waiters should be woken up.</summary> 27 private ReaderInteractor<bool> _waitingReaders; 28 /// <summary>Task signaled when any WaitToWriteAsync waiters should be woken up.</summary> 29 private ReaderInteractor<bool> _waitingWriters; 30 /// <summary>Whether to force continuations to be executed asynchronously from producer writes.</summary> 31 private readonly bool _runContinuationsAsynchronously; 32 /// <summary>Set to non-null once Complete has been called.</summary> 33 private Exception _doneWriting; 34 /// <summary>Gets an object used to synchronize all state on the instance.</summary> 35 private object SyncObj => _items; 36 37 /// <summary>Initializes the <see cref="BoundedChannel{T}"/>.</summary> 38 /// <param name="bufferedCapacity">The positive bounded capacity for the channel.</param> 39 /// <param name="mode">The mode used when writing to a full channel.</param> 40 /// <param name="runContinuationsAsynchronously">Whether to force continuations to be executed asynchronously.</param> BoundedChannel(int bufferedCapacity, BoundedChannelFullMode mode, bool runContinuationsAsynchronously)41 internal BoundedChannel(int bufferedCapacity, BoundedChannelFullMode mode, bool runContinuationsAsynchronously) 42 { 43 Debug.Assert(bufferedCapacity > 0); 44 _bufferedCapacity = bufferedCapacity; 45 _mode = mode; 46 _runContinuationsAsynchronously = runContinuationsAsynchronously; 47 _completion = new TaskCompletionSource<VoidResult>(runContinuationsAsynchronously ? TaskCreationOptions.RunContinuationsAsynchronously : TaskCreationOptions.None); 48 Reader = new BoundedChannelReader(this); 49 Writer = new BoundedChannelWriter(this); 50 } 51 52 private sealed class BoundedChannelReader : ChannelReader<T> 53 { 54 internal readonly BoundedChannel<T> _parent; 55 internal BoundedChannelReader(BoundedChannel<T> parent) => _parent = parent; 56 57 public override Task Completion => _parent._completion.Task; 58 TryRead(out T item)59 public override bool TryRead(out T item) 60 { 61 BoundedChannel<T> parent = _parent; 62 lock (parent.SyncObj) 63 { 64 parent.AssertInvariants(); 65 66 // Get an item if there is one. 67 if (!parent._items.IsEmpty) 68 { 69 item = DequeueItemAndPostProcess(); 70 return true; 71 } 72 } 73 74 item = default; 75 return false; 76 } 77 WaitToReadAsync(CancellationToken cancellationToken)78 public override Task<bool> WaitToReadAsync(CancellationToken cancellationToken) 79 { 80 if (cancellationToken.IsCancellationRequested) 81 { 82 return Task.FromCanceled<bool>(cancellationToken); 83 } 84 85 BoundedChannel<T> parent = _parent; 86 lock (parent.SyncObj) 87 { 88 parent.AssertInvariants(); 89 90 // If there are any items available, a read is possible. 91 if (!parent._items.IsEmpty) 92 { 93 return ChannelUtilities.s_trueTask; 94 } 95 96 // There were no items available, so if we're done writing, a read will never be possible. 97 if (parent._doneWriting != null) 98 { 99 return parent._doneWriting != ChannelUtilities.s_doneWritingSentinel ? 100 Task.FromException<bool>(parent._doneWriting) : 101 ChannelUtilities.s_falseTask; 102 } 103 104 // There were no items available, but there could be in the future, so ensure 105 // there's a blocked reader task and return it. 106 return ChannelUtilities.GetOrCreateWaiter(ref parent._waitingReaders, parent._runContinuationsAsynchronously, cancellationToken); 107 } 108 } 109 110 /// <summary>Dequeues an item, and then fixes up our state around writers and completion.</summary> 111 /// <returns>The dequeued item.</returns> DequeueItemAndPostProcess()112 private T DequeueItemAndPostProcess() 113 { 114 BoundedChannel<T> parent = _parent; 115 Debug.Assert(Monitor.IsEntered(parent.SyncObj)); 116 117 // Dequeue an item. 118 T item = parent._items.DequeueHead(); 119 120 // If we're now empty and we're done writing, complete the channel. 121 if (parent._doneWriting != null && parent._items.IsEmpty) 122 { 123 ChannelUtilities.Complete(parent._completion, parent._doneWriting); 124 } 125 126 // If there are any writers blocked, there's now room for at least one 127 // to be promoted to have its item moved into the items queue. We need 128 // to loop while trying to complete the writer in order to find one that 129 // hasn't yet been canceled (canceled writers transition to canceled but 130 // remain in the physical queue). 131 while (!parent._blockedWriters.IsEmpty) 132 { 133 WriterInteractor<T> w = parent._blockedWriters.DequeueHead(); 134 if (w.Success(default)) 135 { 136 parent._items.EnqueueTail(w.Item); 137 return item; 138 } 139 } 140 141 // There was no blocked writer, so see if there's a WaitToWriteAsync 142 // we should wake up. 143 ChannelUtilities.WakeUpWaiters(ref parent._waitingWriters, result: true); 144 145 // Return the item 146 return item; 147 } 148 } 149 150 private sealed class BoundedChannelWriter : ChannelWriter<T> 151 { 152 internal readonly BoundedChannel<T> _parent; 153 internal BoundedChannelWriter(BoundedChannel<T> parent) => _parent = parent; 154 TryComplete(Exception error)155 public override bool TryComplete(Exception error) 156 { 157 BoundedChannel<T> parent = _parent; 158 bool completeTask; 159 lock (parent.SyncObj) 160 { 161 parent.AssertInvariants(); 162 163 // If we've already marked the channel as completed, bail. 164 if (parent._doneWriting != null) 165 { 166 return false; 167 } 168 169 // Mark that we're done writing. 170 parent._doneWriting = error ?? ChannelUtilities.s_doneWritingSentinel; 171 completeTask = parent._items.IsEmpty; 172 } 173 174 // If there are no items in the queue, complete the channel's task, 175 // as no more data can possibly arrive at this point. We do this outside 176 // of the lock in case we'll be running synchronous completions, and we 177 // do it before completing blocked/waiting readers, so that when they 178 // wake up they'll see the task as being completed. 179 if (completeTask) 180 { 181 ChannelUtilities.Complete(parent._completion, error); 182 } 183 184 // At this point, _blockedWriters and _waitingReaders/Writers will not be mutated: 185 // they're only mutated by readers/writers while holding the lock, and only if _doneWriting is null. 186 // We also know that only one thread (this one) will ever get here, as only that thread 187 // will be the one to transition from _doneWriting false to true. As such, we can 188 // freely manipulate them without any concurrency concerns. 189 ChannelUtilities.FailInteractors<WriterInteractor<T>, VoidResult>(parent._blockedWriters, ChannelUtilities.CreateInvalidCompletionException(error)); 190 ChannelUtilities.WakeUpWaiters(ref parent._waitingReaders, result: false, error: error); 191 ChannelUtilities.WakeUpWaiters(ref parent._waitingWriters, result: false, error: error); 192 193 // Successfully transitioned to completed. 194 return true; 195 } 196 TryWrite(T item)197 public override bool TryWrite(T item) 198 { 199 ReaderInteractor<bool> waitingReaders = null; 200 201 BoundedChannel<T> parent = _parent; 202 lock (parent.SyncObj) 203 { 204 parent.AssertInvariants(); 205 206 // If we're done writing, nothing more to do. 207 if (parent._doneWriting != null) 208 { 209 return false; 210 } 211 212 // Get the number of items in the channel currently. 213 int count = parent._items.Count; 214 215 if (count == 0) 216 { 217 // There are no items in the channel, which means we may have waiting readers. 218 // Store the item. 219 parent._items.EnqueueTail(item); 220 waitingReaders = parent._waitingReaders; 221 if (waitingReaders == null) 222 { 223 // If no one's waiting to be notified about a 0-to-1 transition, we're done. 224 return true; 225 } 226 parent._waitingReaders = null; 227 } 228 else if (count < parent._bufferedCapacity) 229 { 230 // There's room in the channel. Since we're not transitioning from 0-to-1 and 231 // since there's room, we can simply store the item and exit without having to 232 // worry about blocked/waiting readers. 233 parent._items.EnqueueTail(item); 234 return true; 235 } 236 else if (parent._mode == BoundedChannelFullMode.Wait) 237 { 238 // The channel is full and we're in a wait mode. 239 // Simply exit and let the caller know we didn't write the data. 240 return false; 241 } 242 else if (parent._mode == BoundedChannelFullMode.DropWrite) 243 { 244 // The channel is full. Just ignore the item being added 245 // but say we added it. 246 return true; 247 } 248 else 249 { 250 // The channel is full, and we're in a dropping mode. 251 // Drop either the oldest or the newest and write the new item. 252 T droppedItem = parent._mode == BoundedChannelFullMode.DropNewest ? 253 parent._items.DequeueTail() : 254 parent._items.DequeueHead(); 255 parent._items.EnqueueTail(item); 256 return true; 257 } 258 } 259 260 // We stored an item bringing the count up from 0 to 1. Alert 261 // any waiting readers that there may be something for them to consume. 262 // Since we're no longer holding the lock, it's possible we'll end up 263 // waking readers that have since come in. 264 waitingReaders.Success(item: true); 265 return true; 266 } 267 WaitToWriteAsync(CancellationToken cancellationToken)268 public override Task<bool> WaitToWriteAsync(CancellationToken cancellationToken) 269 { 270 if (cancellationToken.IsCancellationRequested) 271 { 272 return Task.FromCanceled<bool>(cancellationToken); 273 } 274 275 BoundedChannel<T> parent = _parent; 276 lock (parent.SyncObj) 277 { 278 parent.AssertInvariants(); 279 280 // If we're done writing, no writes will ever succeed. 281 if (parent._doneWriting != null) 282 { 283 return parent._doneWriting != ChannelUtilities.s_doneWritingSentinel ? 284 Task.FromException<bool>(parent._doneWriting) : 285 ChannelUtilities.s_falseTask; 286 } 287 288 // If there's space to write, a write is possible. 289 // And if the mode involves dropping/ignoring, we can always write, as even if it's 290 // full we'll just drop an element to make room. 291 if (parent._items.Count < parent._bufferedCapacity || parent._mode != BoundedChannelFullMode.Wait) 292 { 293 return ChannelUtilities.s_trueTask; 294 } 295 296 // We're still allowed to write, but there's no space, so ensure a waiter is queued and return it. 297 return ChannelUtilities.GetOrCreateWaiter(ref parent._waitingWriters, runContinuationsAsynchronously: true, cancellationToken); 298 } 299 } 300 WriteAsync(T item, CancellationToken cancellationToken)301 public override Task WriteAsync(T item, CancellationToken cancellationToken) 302 { 303 if (cancellationToken.IsCancellationRequested) 304 { 305 return Task.FromCanceled(cancellationToken); 306 } 307 308 ReaderInteractor<bool> waitingReaders = null; 309 310 BoundedChannel<T> parent = _parent; 311 lock (parent.SyncObj) 312 { 313 parent.AssertInvariants(); 314 315 // If we're done writing, trying to write is an error. 316 if (parent._doneWriting != null) 317 { 318 return Task.FromException(ChannelUtilities.CreateInvalidCompletionException(parent._doneWriting)); 319 } 320 321 // Get the number of items in the channel currently. 322 int count = parent._items.Count; 323 324 if (count == 0) 325 { 326 // There are no items in the channel, which means we may have waiting readers. 327 // Store the item. 328 parent._items.EnqueueTail(item); 329 waitingReaders = parent._waitingReaders; 330 if (waitingReaders == null) 331 { 332 // If no one's waiting to be notified about a 0-to-1 transition, we're done. 333 return ChannelUtilities.s_trueTask; 334 } 335 parent._waitingReaders = null; 336 } 337 else if (count < parent._bufferedCapacity) 338 { 339 // There's room in the channel. Since we're not transitioning from 0-to-1 and 340 // since there's room, we can simply store the item and exit without having to 341 // worry about blocked/waiting readers. 342 parent._items.EnqueueTail(item); 343 return ChannelUtilities.s_trueTask; 344 } 345 else if (parent._mode == BoundedChannelFullMode.Wait) 346 { 347 // The channel is full and we're in a wait mode. 348 // Queue the writer. 349 var writer = WriterInteractor<T>.Create(runContinuationsAsynchronously: true, item, cancellationToken); 350 parent._blockedWriters.EnqueueTail(writer); 351 return writer.Task; 352 } 353 else if (parent._mode == BoundedChannelFullMode.DropWrite) 354 { 355 // The channel is full and we're in ignore mode. 356 // Ignore the item but say we accepted it. 357 return ChannelUtilities.s_trueTask; 358 } 359 else 360 { 361 // The channel is full, and we're in a dropping mode. 362 // Drop either the oldest or the newest and write the new item. 363 T droppedItem = parent._mode == BoundedChannelFullMode.DropNewest ? 364 parent._items.DequeueTail() : 365 parent._items.DequeueHead(); 366 parent._items.EnqueueTail(item); 367 return ChannelUtilities.s_trueTask; 368 } 369 } 370 371 // We stored an item bringing the count up from 0 to 1. Alert 372 // any waiting readers that there may be something for them to consume. 373 // Since we're no longer holding the lock, it's possible we'll end up 374 // waking readers that have since come in. 375 waitingReaders.Success(item: true); 376 return ChannelUtilities.s_trueTask; 377 } 378 } 379 380 [Conditional("DEBUG")] AssertInvariants()381 private void AssertInvariants() 382 { 383 Debug.Assert(SyncObj != null, "The sync obj must not be null."); 384 Debug.Assert(Monitor.IsEntered(SyncObj), "Invariants can only be validated while holding the lock."); 385 386 if (!_items.IsEmpty) 387 { 388 Debug.Assert(_waitingReaders == null, "There are items available, so there shouldn't be any waiting readers."); 389 } 390 if (_items.Count < _bufferedCapacity) 391 { 392 Debug.Assert(_blockedWriters.IsEmpty, "There's space available, so there shouldn't be any blocked writers."); 393 Debug.Assert(_waitingWriters == null, "There's space available, so there shouldn't be any waiting writers."); 394 } 395 if (!_blockedWriters.IsEmpty) 396 { 397 Debug.Assert(_items.Count == _bufferedCapacity, "We should have a full buffer if there's a blocked writer."); 398 } 399 if (_completion.Task.IsCompleted) 400 { 401 Debug.Assert(_doneWriting != null, "We can only complete if we're done writing."); 402 } 403 } 404 405 /// <summary>Gets the number of items in the channel. This should only be used by the debugger.</summary> 406 private int ItemsCountForDebugger => _items.Count; 407 408 /// <summary>Gets an enumerator the debugger can use to show the contents of the channel.</summary> GetEnumerator()409 IEnumerator<T> IDebugEnumerable<T>.GetEnumerator() => _items.GetEnumerator(); 410 } 411 } 412