1 // ==++== 2 // 3 // Copyright (c) Microsoft Corporation. All rights reserved. 4 // 5 // ==--== 6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ 7 // 8 // BlockingCollection.cs 9 // 10 // <OWNER>Microsoft</OWNER> 11 // 12 // A class that implements the bounding and blocking functionality while abstracting away 13 // the underlying storage mechanism. This file also contains BlockingCollection's 14 // associated debugger view type. 15 // 16 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- 17 #pragma warning disable 0420 18 using System; 19 using System.Collections.Generic; 20 using System.Collections; 21 using System.Diagnostics; 22 using System.Globalization; 23 using System.Security.Permissions; 24 using System.Runtime.InteropServices; 25 using System.Threading; 26 27 namespace System.Collections.Concurrent 28 { 29 /// <summary> 30 /// Provides blocking and bounding capabilities for thread-safe collections that 31 /// implement <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/>. 32 /// </summary> 33 /// <remarks> 34 /// <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/> represents a collection 35 /// that allows for thread-safe adding and removing of data. 36 /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> is used as a wrapper 37 /// for an <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/> instance, allowing 38 /// removal attempts from the collection to block until data is available to be removed. Similarly, 39 /// a <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> can be created to enforce 40 /// an upper-bound on the number of data elements allowed in the 41 /// <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/>; addition attempts to the 42 /// collection may then block until space is available to store the added items. 
/// In this manner,
    /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> is similar to a traditional
    /// blocking queue data structure, except that the underlying data storage mechanism is abstracted
    /// away as an <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/>.
    /// </remarks>
    /// <typeparam name="T">Specifies the type of elements in the collection.</typeparam>
    [ComVisible(false)]
#if !FEATURE_NETCORE
#pragma warning disable 0618
    [HostProtection(SecurityAction.LinkDemand, Synchronization = true, ExternalThreading = true)]
#pragma warning restore 0618
#endif
    [DebuggerTypeProxy(typeof(SystemThreadingCollections_BlockingCollectionDebugView<>))]
    [DebuggerDisplay("Count = {Count}, Type = {m_collection}")]
    public class BlockingCollection<T> : IEnumerable<T>, ICollection, IDisposable, IReadOnlyCollection<T>
    {
        // The wrapped store that items are actually added to.
        private IProducerConsumerCollection<T> m_collection;

        // Upper bound on the number of items, or NON_BOUNDED when no bound was supplied.
        private int m_boundedCapacity;
        private const int NON_BOUNDED = -1;

        // Counts the free slots of a bounded collection; null when the collection is unbounded.
        private SemaphoreSlim m_freeNodes;

        // Counts the stored items; released once per successful add and read by Count.
        private SemaphoreSlim m_occupiedNodes;

        // Reset to false by Initialize. NOTE(review): presumably read by CheckDisposed - confirm.
        private bool m_isDisposed;

        // Created by Initialize. NOTE(review): presumably used to wake blocked consumers - its use
        // is outside this part of the file, confirm.
        private CancellationTokenSource m_ConsumersCancellationTokenSource;

        // Its token is linked into the blocking wait for a free slot performed by the add path.
        private CancellationTokenSource m_ProducersCancellationTokenSource;

        // Low 31 bits: number of adders currently inside the add path.
        // Top bit (COMPLETE_ADDING_ON_MASK): set once adding has been marked complete.
        private volatile int m_currentAdders;
        private const int COMPLETE_ADDING_ON_MASK = unchecked((int)0x80000000);

        #region Properties
        /// <summary>Gets the bounded capacity of this <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</summary>
        /// <value>The bounded capacity of this collection, or -1 if no bound was supplied.</value>
        /// <exception cref="T:System.ObjectDisposedException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        public int BoundedCapacity
        {
            get
            {
                CheckDisposed();
                // Unbounded instances store NON_BOUNDED (-1) here, not int.MaxValue.
                return m_boundedCapacity;
            }
        }

        /// <summary>Gets whether this <see
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked as complete for adding.</summary>
        /// <value>Whether this collection has been marked as complete for adding.</value>
        /// <exception cref="T:System.ObjectDisposedException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        public bool IsAddingCompleted
        {
            get
            {
                CheckDisposed();

                // True only when the completion bit is set AND no adder is still in flight,
                // i.e. the counter equals the mask exactly.
                int adders = m_currentAdders;
                return adders == COMPLETE_ADDING_ON_MASK;
            }
        }

        /// <summary>Gets whether this <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>
        /// has been marked as complete for adding and holds no more items.</summary>
        /// <value>true when adding is complete and the collection is empty; otherwise, false.</value>
        /// <exception cref="T:System.ObjectDisposedException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        public bool IsCompleted
        {
            get
            {
                CheckDisposed();

                if (!IsAddingCompleted)
                {
                    return false;
                }

                return m_occupiedNodes.CurrentCount == 0;
            }
        }

        /// <summary>Gets the number of items held by the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.</summary>
        /// <value>The current count of the occupied-slots semaphore.</value>
        /// <exception cref="T:System.ObjectDisposedException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        public int Count
        {
            get
            {
                CheckDisposed();

                int occupied = m_occupiedNodes.CurrentCount;
                return occupied;
            }
        }

        /// <summary>Gets a value indicating whether access to the <see cref="T:System.Collections.ICollection"/>
        /// is synchronized. Always false.</summary>
        /// <exception cref="T:System.ObjectDisposedException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        bool ICollection.IsSynchronized
        {
            get
            {
                CheckDisposed();
                return false;
            }
        }

        /// <summary>
        /// Not supported: <see cref="T:System.Collections.ICollection"/> synchronization through a
        /// sync root is unavailable on this type.
        /// </summary>
        /// <exception cref="T:System.NotSupportedException">Always thrown; the SyncRoot property is not supported.</exception>
        object ICollection.SyncRoot
        {
            get
            {
                string message = SR.GetString(SR.ConcurrentCollection_SyncRoot_NotSupported);
                throw new NotSupportedException(message);
            }
        }
        #endregion


        /// <summary>Initializes a new, unbounded instance of the
        /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> class.
        /// </summary>
        /// <remarks>
        /// The underlying collection is a new <see cref="System.Collections.Concurrent.ConcurrentQueue{T}">ConcurrentQueue&lt;T&gt;</see>.
        /// </remarks>
        public BlockingCollection()
            : this(new ConcurrentQueue<T>())
        {
        }

        /// <summary>Initializes a new instance of the <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>
        /// class with the specified upper-bound.
        /// </summary>
        /// <param name="boundedCapacity">The bounded size of the collection.</param>
        /// <exception cref="T:System.ArgumentOutOfRangeException">The <paramref name="boundedCapacity"/> is
        /// not a positive value.</exception>
        /// <remarks>
        /// The default underlying collection is a <see cref="System.Collections.Concurrent.ConcurrentQueue{T}">ConcurrentQueue&lt;T&gt;</see>.
/// </remarks>
        public BlockingCollection(int boundedCapacity)
            : this(new ConcurrentQueue<T>(), boundedCapacity)
        {
        }

        /// <summary>Initializes a bounded <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>
        /// that stores its items in the supplied
        /// <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/>.</summary>
        /// <param name="collection">The collection to use as the underlying data store.</param>
        /// <param name="boundedCapacity">The bounded size of the collection.</param>
        /// <exception cref="T:System.ArgumentNullException">The <paramref name="collection"/> argument is
        /// null.</exception>
        /// <exception cref="T:System.ArgumentOutOfRangeException">The <paramref name="boundedCapacity"/> is not a positive value.</exception>
        /// <exception cref="System.ArgumentException">The supplied <paramref name="collection"/> already holds more
        /// items than <paramref name="boundedCapacity"/> permits.</exception>
        public BlockingCollection(IProducerConsumerCollection<T> collection, int boundedCapacity)
        {
            // The capacity is validated before the collection, so an invalid capacity wins
            // when both arguments are bad.
            if (boundedCapacity <= 0)
            {
                throw new ArgumentOutOfRangeException(
                    "boundedCapacity",
                    boundedCapacity,
                    SR.GetString(SR.BlockingCollection_ctor_BoundedCapacityRange));
            }

            if (null == collection)
            {
                throw new ArgumentNullException("collection");
            }

            // Read the count once so the check and the semaphore seeding agree.
            int initialCount = collection.Count;
            if (boundedCapacity < initialCount)
            {
                throw new ArgumentException(SR.GetString(SR.BlockingCollection_ctor_CountMoreThanCapacity));
            }

            Initialize(collection, boundedCapacity, initialCount);
        }

        /// <summary>Initializes a new instance of the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>
        /// class without an upper-bound and using the
/// provided
        /// <see cref="T:System.Collections.Concurrent.IProducerConsumerCollection{T}"/> as its underlying data store.</summary>
        /// <param name="collection">The collection to use as the underlying data store.</param>
        /// <exception cref="T:System.ArgumentNullException">The <paramref name="collection"/> argument is
        /// null.</exception>
        public BlockingCollection(IProducerConsumerCollection<T> collection)
        {
            if (null == collection)
            {
                throw new ArgumentNullException("collection");
            }

            int existingItems = collection.Count;
            Initialize(collection, NON_BOUNDED, existingItems);
        }

        /// <summary>Sets up the fields shared by every constructor.</summary>
        /// <param name="collection">The collection to use as the underlying data store.</param>
        /// <param name="boundedCapacity">The bounded size of the collection, or NON_BOUNDED.</param>
        /// <param name="collectionCount">The number of items currently in the underlying collection.</param>
        private void Initialize(IProducerConsumerCollection<T> collection, int boundedCapacity, int collectionCount)
        {
            Debug.Assert(boundedCapacity > 0 || boundedCapacity == NON_BOUNDED);

            m_collection = collection;
            m_boundedCapacity = boundedCapacity;
            m_isDisposed = false;
            m_ConsumersCancellationTokenSource = new CancellationTokenSource();
            m_ProducersCancellationTokenSource = new CancellationTokenSource();

            if (boundedCapacity != NON_BOUNDED)
            {
                Debug.Assert(boundedCapacity > 0);

                // Free slots = capacity minus what the store already holds.
                m_freeNodes = new SemaphoreSlim(boundedCapacity - collectionCount);
            }
            else
            {
                // Unbounded: there is no free-slot accounting at all.
                m_freeNodes = null;
            }

            // Pre-existing items are immediately available to consumers.
            m_occupiedNodes = new SemaphoreSlim(collectionCount);
        }


        /// <summary>
        /// Adds the item to the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
/// </summary>
        /// <param name="item">The item to be added to the collection. The value can be a null reference.</param>
        /// <exception cref="T:System.InvalidOperationException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
        /// as complete with regards to additions.</exception>
        /// <exception cref="T:System.ObjectDisposedException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
        /// <remarks>
        /// When this <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> was created with a
        /// bounded capacity, the call may block until space is available to store the provided item.
        /// </remarks>
        public void Add(T item)
        {
            // Infinite timeout and no external cancellation; the result is only inspected in debug builds.
#if DEBUG
            bool added = TryAddWithNoTimeValidation(item, Timeout.Infinite, CancellationToken.None);
            Debug.Assert(added, "TryAdd() was expected to return true.");
#else
            TryAddWithNoTimeValidation(item, Timeout.Infinite, CancellationToken.None);
#endif
        }

        /// <summary>
        /// Adds the item to the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
        /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
        /// canceled.
        /// </summary>
        /// <param name="item">The item to be added to the collection.
/// The value can be a null reference.</param>
        /// <param name="cancellationToken">A cancellation token to observe.</param>
        /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
        /// <exception cref="T:System.InvalidOperationException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
        /// as complete with regards to additions.</exception>
        /// <exception cref="T:System.ObjectDisposedException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
        /// <remarks>
        /// When this <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> was created with a
        /// bounded capacity, the call may block until space is available to store the provided item.
        /// </remarks>
        public void Add(T item, CancellationToken cancellationToken)
        {
            // Infinite timeout; the result is only inspected in debug builds.
#if DEBUG
            bool added = TryAddWithNoTimeValidation(item, Timeout.Infinite, cancellationToken);
            Debug.Assert(added, "TryAdd() was expected to return true.");
#else
            TryAddWithNoTimeValidation(item, Timeout.Infinite, cancellationToken);
#endif
        }

        /// <summary>
        /// Attempts to add the specified item to the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
/// </summary>
        /// <param name="item">The item to be added to the collection.</param>
        /// <returns>true if the <paramref name="item"/> could be added; otherwise, false.</returns>
        /// <exception cref="T:System.InvalidOperationException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
        /// as complete with regards to additions.</exception>
        /// <exception cref="T:System.ObjectDisposedException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
        public bool TryAdd(T item)
        {
            // A zero timeout means the call never blocks waiting for free capacity.
            const int noWait = 0;
            return TryAddWithNoTimeValidation(item, noWait, CancellationToken.None);
        }

        /// <summary>
        /// Attempts to add the specified item to the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
        /// </summary>
        /// <param name="item">The item to be added to the collection.</param>
        /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds
        /// to wait, or a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely.
/// </param>
        /// <returns>true if the <paramref name="item"/> could be added to the collection within
        /// the allotted time; otherwise, false.</returns>
        /// <exception cref="T:System.InvalidOperationException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
        /// as complete with regards to additions.</exception>
        /// <exception cref="T:System.ObjectDisposedException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout"/> is a negative number
        /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than
        /// <see cref="System.Int32.MaxValue"/>.</exception>
        /// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
        public bool TryAdd(T item, TimeSpan timeout)
        {
            ValidateTimeout(timeout);

            // Converted only after validation has run.
            int millisecondsTimeout = (int)timeout.TotalMilliseconds;
            return TryAddWithNoTimeValidation(item, millisecondsTimeout, CancellationToken.None);
        }

        /// <summary>
        /// Attempts to add the specified item to the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
/// </summary>
        /// <param name="item">The item to be added to the collection.</param>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
        /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
        /// <returns>true if the <paramref name="item"/> could be added to the collection within
        /// the allotted time; otherwise, false.</returns>
        /// <exception cref="T:System.InvalidOperationException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
        /// as complete with regards to additions.</exception>
        /// <exception cref="T:System.ObjectDisposedException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
        /// negative number other than -1, which represents an infinite time-out.</exception>
        /// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
        public bool TryAdd(T item, int millisecondsTimeout)
        {
            ValidateMillisecondsTimeout(millisecondsTimeout);

            // No external cancellation for this overload.
            bool added = TryAddWithNoTimeValidation(item, millisecondsTimeout, CancellationToken.None);
            return added;
        }

        /// <summary>
        /// Attempts to add the specified item to the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
        /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
        /// canceled.
/// </summary>
        /// <param name="item">The item to be added to the collection.</param>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
        /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
        /// <param name="cancellationToken">A cancellation token to observe.</param>
        /// <returns>true if the <paramref name="item"/> could be added to the collection within
        /// the allotted time; otherwise, false.</returns>
        /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
        /// <exception cref="T:System.InvalidOperationException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been marked
        /// as complete with regards to additions.</exception>
        /// <exception cref="T:System.ObjectDisposedException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
        /// negative number other than -1, which represents an infinite time-out.</exception>
        /// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
        public bool TryAdd(T item, int millisecondsTimeout, CancellationToken cancellationToken)
        {
            ValidateMillisecondsTimeout(millisecondsTimeout);

            bool added = TryAddWithNoTimeValidation(item, millisecondsTimeout, cancellationToken);
            return added;
        }

        /// <summary>Adds an item into the underlying data store using its IProducerConsumerCollection&lt;T&gt;.Add
        /// method. If a bounded capacity was specified and the collection was full,
        /// this method will wait for, at most, the timeout period trying to add the item.
/// If the timeout period was exhausted before successfully adding the item this method will
        /// return false.</summary>
        /// <param name="item">The item to be added to the collection.</param>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait for the collection to accept the item,
        /// or Timeout.Infinite to wait indefinitely.</param>
        /// <param name="cancellationToken">A cancellation token to observe.</param>
        /// <returns>False if the collection remained full till the timeout period was exhausted. True otherwise.</returns>
        /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
        /// <exception cref="System.InvalidOperationException">The collection has already been marked
        /// as complete with regards to additions.</exception>
        /// <exception cref="System.ObjectDisposedException">If the collection has been disposed.</exception>
        /// <exception cref="T:System.InvalidOperationException">The underlying collection didn't accept the item.</exception>
        private bool TryAddWithNoTimeValidation(T item, int millisecondsTimeout, CancellationToken cancellationToken)
        {
            CheckDisposed();

            if (cancellationToken.IsCancellationRequested)
            {
                throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
            }

            if (IsAddingCompleted)
            {
                throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Completed));
            }

            // Unbounded collections have no free-slot semaphore, so the "wait" trivially succeeds.
            bool waitForSemaphoreWasSuccessful = true;

            if (m_freeNodes != null)
            {
                //If the m_freeNodes semaphore threw OperationCanceledException then this means that CompleteAdding()
                //was called concurrently with Adding which is not supported by BlockingCollection.
                CancellationTokenSource linkedTokenSource = null;
                try
                {
                    // Fast path: try to grab a slot without blocking and without allocating a linked token source.
                    waitForSemaphoreWasSuccessful = m_freeNodes.Wait(0);
                    if (waitForSemaphoreWasSuccessful == false && millisecondsTimeout != 0)
                    {
                        // Slow path: block, observing both the caller's token and the internal producers token.
                        linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
                            cancellationToken, m_ProducersCancellationTokenSource.Token);
                        waitForSemaphoreWasSuccessful = m_freeNodes.Wait(millisecondsTimeout, linkedTokenSource.Token);
                    }
                }
                catch (OperationCanceledException)
                {
                    //if cancellation was via external token, throw an OCE
                    if (cancellationToken.IsCancellationRequested)
                    {
                        throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
                    }

                    //if cancellation was via internal token, this indicates invalid use, hence InvalidOpEx.
                    throw new InvalidOperationException
                        (SR.GetString(SR.BlockingCollection_Add_ConcurrentCompleteAdd));
                }
                finally
                {
                    if (linkedTokenSource != null)
                    {
                        linkedTokenSource.Dispose();
                    }
                }
            }

            if (waitForSemaphoreWasSuccessful)
            {
                // Update the adders count if the complete adding was not requested, otherwise
                // spin until all adders finish, then throw IOE.
                // The idea behind spinning until all adders finish is to avoid returning to the caller with IOE
                // while some adders have not finished yet.
                SpinWait spinner = new SpinWait();
                while (true)
                {
                    int observedAdders = m_currentAdders;
                    if ((observedAdders & COMPLETE_ADDING_ON_MASK) != 0)
                    {
                        spinner.Reset();
                        // CompleteAdding is requested, spin then throw
                        while (m_currentAdders != COMPLETE_ADDING_ON_MASK)
                        {
                            spinner.SpinOnce();
                        }
                        throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Completed));
                    }

                    // Register this thread as an in-flight adder; retry if another thread changed the counter.
                    if (Interlocked.CompareExchange(ref m_currentAdders, observedAdders + 1, observedAdders) == observedAdders)
                    {
                        Debug.Assert((observedAdders + 1) <= (~COMPLETE_ADDING_ON_MASK), "The number of concurrent adders thread excceeded the maximum limit.");
                        break;
                    }
                    spinner.SpinOnce();
                }

                // The outer try/finally avoids repeating the decrement-adders code three times; it must run when:
                // 1- m_collection.TryAdd threw an exception
                // 2- m_collection.TryAdd succeeded
                // 3- m_collection.TryAdd returned false
                try
                {
                    //TryAdd is guaranteed to find a place to add the element. Its return value depends
                    //on the semantics of the underlying store. Some underlying stores will not add an already
                    //existing item and thus TryAdd returns false indicating that the size of the underlying
                    //store did not increase.
                    bool addingSucceeded = false;
                    try
                    {
                        //The token may have been canceled before the collection had space available, so we need a check after the wait has completed.
                        //This fixes bug #702328, case 2 of 2.
                        cancellationToken.ThrowIfCancellationRequested();
                        addingSucceeded = m_collection.TryAdd(item);
                    }
                    catch
                    {
                        //TryAdd did not result in increasing the size of the underlying store and hence we need
                        //to increment back the count of the m_freeNodes semaphore.
                        if (m_freeNodes != null)
                        {
                            m_freeNodes.Release();
                        }
                        throw;
                    }

                    if (addingSucceeded)
                    {
                        //After adding an element to the underlying storage, signal to the consumers
                        //waiting on m_occupiedNodes that there is a new item added ready to be consumed.
                        m_occupiedNodes.Release();
                    }
                    else
                    {
                        // The store rejected the item, so its size did not grow: hand the reserved slot back.
                        // Without this a bounded collection permanently loses one unit of capacity per
                        // rejected item, and later adders can block forever on slots that are really free.
                        if (m_freeNodes != null)
                        {
                            m_freeNodes.Release();
                        }
                        throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_Add_Failed));
                    }
                }
                finally
                {
                    // decrement the adders count
                    Debug.Assert((m_currentAdders & ~COMPLETE_ADDING_ON_MASK) > 0);
                    Interlocked.Decrement(ref m_currentAdders);
                }
            }

            return waitForSemaphoreWasSuccessful;
        }

        /// <summary>Takes an item from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.</summary>
        /// <returns>The item removed from the collection.</returns>
        /// <exception cref="T:System.InvalidOperationException">No item could be taken because the <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> is empty and has been marked
        /// as complete with regards to additions, or the underlying collection was modified
        /// outside of this <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
        /// <exception cref="T:System.ObjectDisposedException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <remarks>A call to <see cref="Take()"/> may block until an item is available to be removed.</remarks>
        public T Take()
        {
            T item;

            // Infinite wait with no external cancellation; a false result is surfaced as InvalidOperationException.
            if (!TryTake(out item, Timeout.Infinite, CancellationToken.None))
            {
                throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_CantTakeWhenDone));
            }

            return item;
        }

        /// <summary>Takes an item from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.</summary>
        /// <returns>The item removed from the collection.</returns>
        /// <exception cref="T:System.OperationCanceledException">If the <see cref="CancellationToken"/> is
        /// canceled or the <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> is empty and has been marked
        /// as
/// complete with regards to additions.</exception>
        /// <exception cref="T:System.ObjectDisposedException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <exception cref="T:System.InvalidOperationException">The underlying collection was modified
        /// outside of this <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
        /// <remarks>A call to <see cref="Take(CancellationToken)"/> may block until an item is available to be removed.</remarks>
        public T Take(CancellationToken cancellationToken)
        {
            T item;

            // Infinite wait; a false result is surfaced as InvalidOperationException.
            bool taken = TryTake(out item, Timeout.Infinite, cancellationToken);
            if (taken)
            {
                return item;
            }

            throw new InvalidOperationException(SR.GetString(SR.BlockingCollection_CantTakeWhenDone));
        }

        /// <summary>
        /// Attempts to remove an item from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>
        /// without waiting.
        /// </summary>
        /// <param name="item">The item removed from the collection.</param>
        /// <returns>true if an item could be removed; otherwise, false.</returns>
        /// <exception cref="T:System.ObjectDisposedException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <exception cref="T:System.InvalidOperationException">The underlying collection was modified
        /// outside of this <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
        public bool TryTake(out T item)
        {
            // Zero timeout, no external cancellation.
            const int noWait = 0;
            return TryTake(out item, noWait, CancellationToken.None);
        }

        /// <summary>
        /// Attempts to remove an item from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
/// </summary>
        /// <param name="item">The item removed from the collection.</param>
        /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds
        /// to wait, or a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely.
        /// </param>
        /// <returns>true if an item could be removed from the collection within
        /// the allotted time; otherwise, false.</returns>
        /// <exception cref="T:System.ObjectDisposedException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout"/> is a negative number
        /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than
        /// <see cref="System.Int32.MaxValue"/>.</exception>
        /// <exception cref="T:System.InvalidOperationException">The underlying collection was modified
        /// outside of this <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
        public bool TryTake(out T item, TimeSpan timeout)
        {
            ValidateTimeout(timeout);

            // Converted only after validation has run; no external token and no pre-built linked source.
            int millisecondsTimeout = (int)timeout.TotalMilliseconds;
            return TryTakeWithNoTimeValidation(out item, millisecondsTimeout, CancellationToken.None, null);
        }

        /// <summary>
        /// Attempts to remove an item from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
/// </summary>
        /// <param name="item">The item removed from the collection.</param>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
        /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
        /// <returns>true if an item could be removed from the collection within
        /// the allotted time; otherwise, false.</returns>
        /// <exception cref="T:System.ObjectDisposedException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
        /// negative number other than -1, which represents an infinite time-out.</exception>
        /// <exception cref="T:System.InvalidOperationException">The underlying collection was modified
        /// outside of this <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
        public bool TryTake(out T item, int millisecondsTimeout)
        {
            ValidateMillisecondsTimeout(millisecondsTimeout);

            // No external token and no pre-built linked source for this overload.
            bool taken = TryTakeWithNoTimeValidation(out item, millisecondsTimeout, CancellationToken.None, null);
            return taken;
        }

        /// <summary>
        /// Attempts to remove an item from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/>.
        /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
        /// canceled.
/// </summary>
/// <param name="item">The item removed from the collection.</param>
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
/// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
/// <param name="cancellationToken">A cancellation token to observe.</param>
/// <returns>true if an item could be removed from the collection within
/// the allotted time; otherwise, false.</returns>
/// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
/// <exception cref="T:System.ObjectDisposedException">The <see
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
/// negative number other than -1, which represents an infinite time-out.</exception>
/// <exception cref="T:System.InvalidOperationException">The underlying collection was modified
/// outside of this <see
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
public bool TryTake(out T item, int millisecondsTimeout, CancellationToken cancellationToken)
{
    ValidateMillisecondsTimeout(millisecondsTimeout);

    // Passing null for the combined source lets the callee build (and dispose) its own linked token source.
    CancellationTokenSource noCombinedSource = null;
    return TryTakeWithNoTimeValidation(out item, millisecondsTimeout, cancellationToken, noCombinedSource);
}

/// <summary>Takes an item from the underlying data store using its IProducerConsumerCollection{T}.TryTake
/// method. If the collection was empty, this method will wait for, at most, the timeout period (if AddingIsCompleted is false)
/// trying to remove an item. If the timeout period was exhausted before successfully removing an item
/// this method will return false.
/// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
/// canceled.
/// </summary>
/// <param name="item">The item removed from the collection.</param>
/// <param name="millisecondsTimeout">The number of milliseconds to wait for the collection to have an item available
/// for removal, or Timeout.Infinite to wait indefinitely.</param>
/// <param name="cancellationToken">A cancellation token to observe.</param>
/// <param name="combinedTokenSource">A combined cancellation token source if one was already created; it is only
/// created by GetConsumingEnumerable, to avoid creating the linked token multiple times. When non-null it is
/// owned by the caller and is not disposed here.</param>
/// <returns>False if the collection remained empty till the timeout period was exhausted. True otherwise.</returns>
/// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
/// <exception cref="System.ObjectDisposedException">If the collection has been disposed.</exception>
private bool TryTakeWithNoTimeValidation(out T item, int millisecondsTimeout, CancellationToken cancellationToken, CancellationTokenSource combinedTokenSource)
{
    CheckDisposed();
    item = default(T);

    if (cancellationToken.IsCancellationRequested)
    {
        throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
    }

    // A completed collection will never yield another item, so there is nothing to wait for.
    if (IsCompleted)
    {
        return false;
    }

    bool itemAvailable = false;

    // Start from the caller-supplied combined source (non-null only when it came from GetConsumingEnumerable).
    CancellationTokenSource linkedSource = combinedTokenSource;
    bool ownsLinkedSource = (combinedTokenSource == null);
    try
    {
        // Non-blocking probe first; the linked token source is only needed if we really have to wait.
        itemAvailable = m_occupiedNodes.Wait(0);
        if (!itemAvailable && millisecondsTimeout != 0)
        {
            if (ownsLinkedSource)
            {
                // Link the caller's token with the consumers' token so either one can end the wait.
                linkedSource = CancellationTokenSource.CreateLinkedTokenSource(
                    cancellationToken, m_ConsumersCancellationTokenSource.Token);
            }
            itemAvailable = m_occupiedNodes.Wait(millisecondsTimeout, linkedSource.Token);
        }
    }
    catch (OperationCanceledException)
    {
        // The wait was canceled. If the caller's token is the cause, surface it against that token;
        // otherwise the collection became completed while we were waiting on the semaphore.
        if (cancellationToken.IsCancellationRequested)
        {
            throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
        }

        return false;
    }
    finally
    {
        // Dispose only a source created in this call; a caller-supplied one is the caller's to dispose.
        if (ownsLinkedSource && linkedSource != null)
        {
            linkedSource.Dispose();
        }
    }

    if (!itemAvailable)
    {
        return false;
    }

    bool removed = false;
    bool faulted = true;
    try
    {
        // The token may have been canceled before an item arrived, so we need a check after the wait has completed.
        // This fixes bug #702328, case 1 of 2.
        cancellationToken.ThrowIfCancellationRequested();

        removed = m_collection.TryTake(out item);
        faulted = false;
        if (!removed)
        {
            // The semaphore said an item was there but the store is empty: it was modified outside BlockingCollection.
            throw new InvalidOperationException
                (SR.GetString(SR.BlockingCollection_Take_CollectionModified));
        }
    }
    finally
    {
        // faulted implies !removed, but the reverse is not true: TryTake returning false
        // leaves both flags false, and in that case neither semaphore is released.
        if (removed)
        {
            if (m_freeNodes != null)
            {
                Debug.Assert(m_boundedCapacity != NON_BOUNDED);
                m_freeNodes.Release();
            }
        }
        else if (faulted)
        {
            // Nothing was taken (cancellation or an exception from TryTake): give the occupied count back.
            m_occupiedNodes.Release();
        }

        // Last remover will detect that it has actually removed the last item from the
        // collection and that CompleteAdding() was called previously. Thus, it will cancel the semaphores
        // so that any thread waiting on them wakes up. Note several threads may call CancelWaitingConsumers
        // but this is not a problem.
        if (IsCompleted)
        {
            CancelWaitingConsumers();
        }
    }

    return true;
}



/// <summary>
/// Adds the specified item to any one of the specified
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
/// </summary>
/// <param name="collections">The array of collections.</param>
/// <param name="item">The item to be added to one of the collections.</param>
/// <returns>The index of the collection in the <paramref name="collections"/> array to which the item was added.</returns>
/// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
/// null.</exception>
/// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
/// a 0-length array or contains a null element, or at least one of collections has been
/// marked as complete for adding.</exception>
/// <exception cref="T:System.ObjectDisposedException">At least one of the <see
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
/// <exception cref="T:System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
/// 62 for STA and 63 for MTA.</exception>
/// <remarks>
/// If a bounded capacity was specified when all of the
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances were initialized,
/// a call to AddToAny may block until space is available in one of the collections
/// to store the provided item.
/// </remarks>
public static int AddToAny(BlockingCollection<T>[] collections, T item)
{
    // Timeout.Infinite: wait without a time limit; CancellationToken.None: no caller cancellation.
    int addedToIndex = TryAddToAny(collections, item, Timeout.Infinite, CancellationToken.None);

    // Debug.Assert is conditional on DEBUG, so this check only exists in debug builds.
    Debug.Assert((addedToIndex >= 0 && addedToIndex < collections.Length)
        , "TryAddToAny() was expected to return an index within the bounds of the collections array.");
    return addedToIndex;
}

/// <summary>
/// Adds the specified item to any one of the specified
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
/// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
/// canceled.
/// </summary>
/// <param name="collections">The array of collections.</param>
/// <param name="item">The item to be added to one of the collections.</param>
/// <param name="cancellationToken">A cancellation token to observe.</param>
/// <returns>The index of the collection in the <paramref name="collections"/> array to which the item was added.</returns>
/// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
/// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
/// null.</exception>
/// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
/// a 0-length array or contains a null element, or at least one of collections has been
/// marked as complete for adding.</exception>
/// <exception cref="T:System.ObjectDisposedException">At least one of the <see
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
/// <exception cref="T:System.InvalidOperationException">At least one
/// underlying collection didn't accept the item.</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
/// 62 for STA and 63 for MTA.</exception>
/// <remarks>
/// If a bounded capacity was specified when all of the
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances were initialized,
/// a call to AddToAny may block until space is available in one of the collections
/// to store the provided item.
/// </remarks>
public static int AddToAny(BlockingCollection<T>[] collections, T item, CancellationToken cancellationToken)
{
    // Timeout.Infinite: wait without a time limit; the caller's token is the only way to abandon the wait early.
    int addedToIndex = TryAddToAny(collections, item, Timeout.Infinite, cancellationToken);

    // Debug.Assert is conditional on DEBUG, so this check only exists in debug builds.
    Debug.Assert((addedToIndex >= 0 && addedToIndex < collections.Length)
        , "TryAddToAny() was expected to return an index within the bounds of the collections array.");
    return addedToIndex;
}

/// <summary>
/// Attempts to add the specified item to any one of the specified
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
/// </summary>
/// <param name="collections">The array of collections.</param>
/// <param name="item">The item to be added to one of the collections.</param>
/// <returns>The index of the collection in the <paramref name="collections"/>
/// array to which the item was added, or -1 if the item could not be added.</returns>
/// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
/// null.</exception>
/// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
/// a 0-length array or contains a null element, or at least one of collections has been
/// marked as complete for adding.</exception>
/// <exception cref="T:System.ObjectDisposedException">At least one of the <see
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
/// <exception cref="T:System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
/// 62 for STA and 63 for MTA.</exception>
public static int TryAddToAny(BlockingCollection<T>[] collections, T item)
{
    // A timeout of 0 milliseconds and no caller cancellation token.
    int addedToIndex = TryAddToAny(collections, item, 0, CancellationToken.None);
    return addedToIndex;
}

/// <summary>
/// Attempts to add the specified item to any one of the specified
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
/// </summary>
/// <param name="collections">The array of collections.</param>
/// <param name="item">The item to be added to one of the collections.</param>
/// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds
/// to wait, or a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely.
/// </param>
/// <returns>The index of the collection in the <paramref name="collections"/>
/// array to which the item was added, or -1 if the item could not be added.</returns>
/// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
/// null.</exception>
/// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
/// a 0-length array or contains a null element, or at least one of collections has been
/// marked as complete for adding.</exception>
/// <exception cref="T:System.ObjectDisposedException">At least one of the <see
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout"/> is a negative number
/// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than
/// <see cref="System.Int32.MaxValue"/>.</exception>
/// <exception cref="T:System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
/// 62 for STA and 63 for MTA.</exception>
public static int TryAddToAny(BlockingCollection<T>[] collections, T item, TimeSpan timeout)
{
    // Validate first: the cast below narrows TotalMilliseconds (a double) to an int.
    ValidateTimeout(timeout);

    int millisecondsTimeout = (int)timeout.TotalMilliseconds;
    return TryAddToAnyCore(collections, item, millisecondsTimeout, CancellationToken.None);
}

/// <summary>
/// Attempts to add the specified item to any one of the specified
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
/// </summary>
/// <param name="collections">The array of collections.</param>
/// <param name="item">The item to be added to one of the collections.</param>
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
/// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
/// <returns>The index of the collection in the <paramref name="collections"/>
/// array to which the item was added, or -1 if the item could not be added.</returns>
/// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
/// null.</exception>
/// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
/// a 0-length array or contains a null element, or at least one of collections has been
/// marked as complete for adding.</exception>
/// <exception cref="T:System.ObjectDisposedException">At least one of the <see
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
/// negative number other than -1, which represents an infinite time-out.</exception>
/// <exception cref="T:System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
/// 62 for STA and 63 for MTA.</exception>
public static int TryAddToAny(BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
{
    ValidateMillisecondsTimeout(millisecondsTimeout);

    // This overload observes no caller cancellation token.
    CancellationToken noCancellation = CancellationToken.None;
    return TryAddToAnyCore(collections, item, millisecondsTimeout, noCancellation);
}

/// <summary>
/// Attempts to add the specified item to any one of the specified
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
/// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
/// canceled.
/// </summary>
/// <param name="collections">The array of collections.</param>
/// <param name="item">The item to be added to one of the collections.</param>
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
/// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
/// <returns>The index of the collection in the <paramref name="collections"/>
/// array to which the item was added, or -1 if the item could not be added.</returns>
/// <param name="cancellationToken">A cancellation token to observe.</param>
/// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
/// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
/// null.</exception>
/// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
/// a 0-length array or contains a null element, or at least one of collections has been
/// marked as complete for adding.</exception>
/// <exception cref="T:System.ObjectDisposedException">At least one of the <see
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
/// negative number other than -1, which represents an infinite time-out.</exception>
/// <exception
/// cref="T:System.InvalidOperationException">At least one underlying collection didn't accept the item.</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
/// 62 for STA and 63 for MTA.</exception>
public static int TryAddToAny(BlockingCollection<T>[] collections, T item, int millisecondsTimeout, CancellationToken cancellationToken)
{
    // Only the timeout is validated here; the collections array is validated inside TryAddToAnyCore.
    ValidateMillisecondsTimeout(millisecondsTimeout);

    int addedToIndex = TryAddToAnyCore(collections, item, millisecondsTimeout, cancellationToken);
    return addedToIndex;
}

/// <summary>Adds an item to any one of the specified collections.
/// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
/// canceled.
/// </summary>
/// <param name="collections">The collections into which the item can be added.</param>
/// <param name="item">The item to be added.</param>
/// <param name="millisecondsTimeout">The number of milliseconds to wait for a collection to accept the
/// operation, or -1 to wait indefinitely.</param>
/// <param name="externalCancellationToken">A cancellation token to observe.</param>
/// <returns>The index into collections for the collection which accepted the
/// adding of the item; -1 if the item could not be added.</returns>
/// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
/// <exception cref="System.ArgumentNullException">If the collections argument is null.</exception>
/// <exception cref="System.ArgumentException">If the collections argument is a 0-length array or contains a
/// null element.
/// Also, if at least one of the collections has been marked complete for adds.</exception>
/// <exception cref="System.ObjectDisposedException">If at least one of the collections has been disposed.</exception>
private static int TryAddToAnyCore(BlockingCollection<T>[] collections, T item, int millisecondsTimeout, CancellationToken externalCancellationToken)
{
    ValidateCollectionsArray(collections, true);
    const int OPERATION_FAILED = -1;

    bool waitForever = (millisecondsTimeout == Timeout.Infinite);

    // Working copy of the wait time; it is refreshed after every failed attempt.
    int remainingTimeout = millisecondsTimeout;

    // The starting tick count is only needed when there is a finite wait to measure.
    uint startTime = waitForever ? 0u : (uint)Environment.TickCount;

    // Fast path for adding if there is at least one unbounded collection
    int index = TryAddToAnyFast(collections, item);
    if (index > -1)
    {
        return index;
    }

    // Get wait handles and the tokens for all collections,
    // and construct a single combined token from all the tokens,
    // add the combined token handle to the handles list
    // call WaitAny for all handles
    // After WaitAny returns check if the token is cancelled and that caused the WaitAny to return or not
    // If the combined token is cancelled, this mean either the external token is cancelled then throw OCE
    // or one of the collections is AddingCompleted then throw AE
    //
    // The fast path found no collection with a null m_freeNodes, so GetHandles contributes one handle per
    // collection and a handle index can be used as a collection index below
    // (assumes m_freeNodes is not reassigned concurrently -- TODO confirm).
    CancellationToken[] collatedCancellationTokens;
    List<WaitHandle> handles = GetHandles(collections, externalCancellationToken, true, out collatedCancellationTokens);

    //Loop until one of these conditions is met:
    // 1- The operation is succeeded
    // 2- The timeout expired for try* versions
    // 3- The external token is cancelled, throw
    // 4- There is at least one collection marked as adding completed then throw
    while (waitForever || remainingTimeout >= 0)
    {
        using (CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(collatedCancellationTokens))
        {
            // The combined token's handle goes last, so it never shifts the collection handles' indices.
            handles.Add(linkedTokenSource.Token.WaitHandle);

            //Wait for any collection to become available.
            index = WaitHandle.WaitAny(handles.ToArray(), remainingTimeout, false);

            // Drop the combined token's handle again; a fresh one is added on the next iteration.
            handles.RemoveAt(handles.Count - 1);

            if (linkedTokenSource.IsCancellationRequested)
            {
                if (externalCancellationToken.IsCancellationRequested) //case#3
                {
                    throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), externalCancellationToken);
                }

                //case#4
                throw new ArgumentException(SR.GetString(SR.BlockingCollection_CantAddAnyWhenCompleted), "collections");
            }
        }

        Debug.Assert((index == WaitHandle.WaitTimeout) || (index >= 0 && index < handles.Count));

        if (index == WaitHandle.WaitTimeout) //case#2
        {
            return OPERATION_FAILED;
        }

        //If the timeout period was not exhausted and the appropriate operation succeeded.
        if (collections[index].TryAdd(item)) //case#1
        {
            return index;
        }

        // The add did not succeed; recompute how much of the original wait is left before retrying.
        if (!waitForever)
        {
            remainingTimeout = UpdateTimeOut(startTime, millisecondsTimeout);
        }
    }

    // case #2
    return OPERATION_FAILED;
}

/// <summary>
/// Fast path for TryAddToAny: finds the first non-bounded collection (one whose m_freeNodes is null)
/// and adds the item to it.
/// </summary>
/// <param name="collections">The collections list</param>
/// <param name="item">The item to be added</param>
/// <returns>The index of the collection the item was added to, -1 if no non-bounded collection was found</returns>
private static int TryAddToAnyFast(BlockingCollection<T>[] collections, T item)
{
    for (int i = 0; i < collections.Length; i++)
    {
        BlockingCollection<T> candidate = collections[i];
        if (candidate.m_freeNodes != null)
        {
            // Bounded collection: leave it to the wait-handle path in the caller.
            continue;
        }

        // The result is only inspected in debug builds (Debug.Assert is conditional on DEBUG).
        bool added = candidate.TryAdd(item);
        Debug.Assert(added);
        return i;
    }

    return -1;
}

/// <summary>
/// Local static method, used by the add/take "any" operations to get the wait handles for the collections,
/// excluding the completed collections when the operation is a take.
/// </summary>
/// <param name="collections">The blocking collections</param>
/// <param name="externalCancellationToken">The original CancellationToken</param>
/// <param name="isAddOperation">True if Add or TryAdd, false if Take or TryTake</param>
/// <param name="cancellationTokens">Complete list of cancellationTokens to observe; the external token is always first</param>
/// <returns>The collections wait handles</returns>
private static List<WaitHandle> GetHandles(BlockingCollection<T>[] collections, CancellationToken externalCancellationToken, bool isAddOperation, out CancellationToken[] cancellationTokens)
{
    Debug.Assert(collections != null);

    int capacity = collections.Length + 1; // + 1 for the external token (and for the handle the caller appends)
    List<WaitHandle> waitHandles = new List<WaitHandle>(capacity);
    List<CancellationToken> tokens = new List<CancellationToken>(capacity);
    tokens.Add(externalCancellationToken);

    //Read the appropriate WaitHandle based on the operation mode.
    foreach (BlockingCollection<T> collection in collections)
    {
        if (isAddOperation)
        {
            // Only collections that have a free-nodes semaphore contribute a handle for adds.
            if (collection.m_freeNodes != null)
            {
                waitHandles.Add(collection.m_freeNodes.AvailableWaitHandle);
                tokens.Add(collection.m_ProducersCancellationTokenSource.Token);
            }
        }
        else if (!collection.IsCompleted) //exclude Completed collections if it is take operation
        {
            waitHandles.Add(collection.m_occupiedNodes.AvailableWaitHandle);
            tokens.Add(collection.m_ConsumersCancellationTokenSource.Token);
        }
    }

    cancellationTokens = tokens.ToArray();
    return waitHandles;
}

/// <summary>
/// Helper function to measure and update the wait time
/// </summary>
/// <param name="startTime"> The first time (in milliseconds) observed when the wait started</param>
/// <param name="originalWaitMillisecondsTimeout">The original wait timeout in milliseconds</param>
/// <returns>The new wait time in milliseconds, 0 if the time expired</returns>
private static int UpdateTimeOut(uint startTime, int originalWaitMillisecondsTimeout)
{
    if (originalWaitMillisecondsTimeout == 0)
    {
        return 0;
    }

    // The function must be called in case the time out is not infinite
    Debug.Assert(originalWaitMillisecondsTimeout != Timeout.Infinite);

    // Unsigned subtraction of tick counts.
    uint elapsedMilliseconds = (uint)Environment.TickCount - startTime;

    // elapsedMilliseconds is a uint; anything above int.MaxValue cannot be cast safely and is treated as expired.
    if (elapsedMilliseconds > int.MaxValue)
    {
        return 0;
    }

    // Subtract the elapsed time from the original wait time, never going below zero.
    int remaining = originalWaitMillisecondsTimeout - (int)elapsedMilliseconds;
    return remaining <= 0 ? 0 : remaining;
}

/// <summary>
/// Takes an item from any one of the specified
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
/// </summary>
/// <param name="collections">The array of collections.</param>
/// <param name="item">The item removed from one of the collections.</param>
/// <returns>The index of the collection in the <paramref name="collections"/> array from which
/// the item was removed, or -1 if an item could not be removed.</returns>
/// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
/// null.</exception>
/// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
/// a 0-length array or contains a null element.</exception>
/// <exception cref="T:System.ObjectDisposedException">At least one of the <see
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
/// <exception cref="T:System.InvalidOperationException">At least one of the underlying collections was modified
/// outside of its <see
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
/// 62 for STA and 63 for MTA.</exception>
/// <remarks>A call to TakeFromAny may
/// block until an item is available to be removed.</remarks>
public static int TakeFromAny(BlockingCollection<T>[] collections, out T item)
{
    // Delegate to the cancellable overload with a token that is never canceled.
    int takenFromIndex = TakeFromAny(collections, out item, CancellationToken.None);
    return takenFromIndex;
}

/// <summary>
/// Takes an item from any one of the specified
/// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
/// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
/// canceled.
/// </summary>
/// <param name="collections">The array of collections.</param>
/// <param name="item">The item removed from one of the collections.</param>
/// <param name="cancellationToken">A cancellation token to observe.</param>
/// <returns>The index of the collection in the <paramref name="collections"/> array from which
/// the item was removed, or -1 if an item could not be removed.</returns>
/// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
/// null.</exception>
/// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
/// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
/// a 0-length array or contains a null element.</exception>
/// <exception cref="T:System.ObjectDisposedException">At least one of the <see
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
/// <exception cref="T:System.InvalidOperationException">At least one of the underlying collections was modified
/// outside of its <see
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than
/// the maximum size of
        /// 62 for STA and 63 for MTA.</exception>
        /// <remarks>A call to TakeFromAny may block until an item is available to be removed.</remarks>
        public static int TakeFromAny(BlockingCollection<T>[] collections, out T item, CancellationToken cancellationToken)
        {
            // Take semantics: no time limit (Timeout.Infinite) and isTakeOperation == true, which makes
            // the core routine throw rather than report failure when nothing can ever be taken.
            int takenFromIndex = TryTakeFromAnyCore(collections, out item, Timeout.Infinite, true, cancellationToken);

            // With an infinite wait the core is expected to either throw or hand back a valid array index.
            Debug.Assert(
                (takenFromIndex >= 0 && takenFromIndex < collections.Length),
                "TryTakeFromAny() was expected to return an index within the bounds of the collections array.");

            return takenFromIndex;
        }

        /// <summary>
        /// Attempts to remove an item from any one of the specified
        /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
        /// </summary>
        /// <param name="collections">The array of collections.</param>
        /// <param name="item">The item removed from one of the collections.</param>
        /// <returns>The index of the collection in the <paramref name="collections"/> array from which
        /// the item was removed, or -1 if an item could not be removed.</returns>
        /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
        /// null.</exception>
        /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
        /// a 0-length array or contains a null element.</exception>
        /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
        /// <exception cref="T:System.InvalidOperationException">At least one of the underlying collections was modified
        /// outside of its <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
        /// 62 for STA and 63 for MTA.</exception>
        /// <remarks>A call to TryTakeFromAny may block until an item is available to be removed.</remarks>
        public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T item)
        {
            // This overload passes a zero-millisecond timeout to the int-timeout overload.
            const int noWaitMilliseconds = 0;
            return TryTakeFromAny(collections, out item, noWaitMilliseconds);
        }

        /// <summary>
        /// Attempts to remove an item from any one of the specified
        /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
        /// </summary>
        /// <param name="collections">The array of collections.</param>
        /// <param name="item">The item removed from one of the collections.</param>
        /// <param name="timeout">A <see cref="System.TimeSpan"/> that represents the number of milliseconds
        /// to wait, or a <see cref="System.TimeSpan"/> that represents -1 milliseconds to wait indefinitely.
/// </param>
        /// <returns>The index of the collection in the <paramref name="collections"/> array from which
        /// the item was removed, or -1 if an item could not be removed.</returns>
        /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
        /// null.</exception>
        /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
        /// a 0-length array or contains a null element.</exception>
        /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
        /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout"/> is a negative number
        /// other than -1 milliseconds, which represents an infinite time-out -or- timeout is greater than
        /// <see cref="System.Int32.MaxValue"/>.</exception>
        /// <exception cref="T:System.InvalidOperationException">At least one of the underlying collections was modified
        /// outside of its <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
        /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
        /// 62 for STA and 63 for MTA.</exception>
        /// <remarks>A call to TryTakeFromAny may block until an item is available to be removed.</remarks>
        public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T item, TimeSpan timeout)
        {
            // Reject out-of-range time spans first; only then is the narrowing cast below safe.
            ValidateTimeout(timeout);

            int millisecondsTimeout = (int)timeout.TotalMilliseconds;

            // TryTake semantics (isTakeOperation == false) and no external cancellation.
            return TryTakeFromAnyCore(collections, out item, millisecondsTimeout, false, CancellationToken.None);
        }

        /// <summary>
        /// Attempts to remove an item from any one of the specified
        /// <see
/// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
        /// </summary>
        /// <param name="collections">The array of collections.</param>
        /// <param name="item">The item removed from one of the collections.</param>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
        /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
        /// <returns>The index of the collection in the <paramref name="collections"/> array from which
        /// the item was removed, or -1 if an item could not be removed.</returns>
        /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
        /// null.</exception>
        /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
        /// a 0-length array or contains a null element.</exception>
        /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
        /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
        /// negative number other than -1, which represents an infinite time-out.</exception>
        /// <exception cref="T:System.InvalidOperationException">At least one of the underlying collections was modified
        /// outside of its <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
        /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
        /// 62 for STA and 63 for MTA.</exception>
        /// <remarks>A call to TryTakeFromAny may block until an item is available to be removed.</remarks>
        public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
        {
            ValidateMillisecondsTimeout(millisecondsTimeout);

            // TryTake semantics (isTakeOperation == false); this overload observes no external token.
            CancellationToken noCancellation = CancellationToken.None;
            return TryTakeFromAnyCore(collections, out item, millisecondsTimeout, false, noCancellation);
        }

        /// <summary>
        /// Attempts to remove an item from any one of the specified
        /// <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances.
        /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
        /// canceled.
        /// </summary>
        /// <param name="collections">The array of collections.</param>
        /// <param name="item">The item removed from one of the collections.</param>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
        /// cref="System.Threading.Timeout.Infinite"/> (-1) to wait indefinitely.</param>
        /// <param name="cancellationToken">A cancellation token to observe.</param>
        /// <returns>The index of the collection in the <paramref name="collections"/> array from which
        /// the item was removed, or -1 if an item could not be removed.</returns>
        /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
        /// <exception cref="T:System.ArgumentNullException">The <paramref name="collections"/> argument is
        /// null.</exception>
        /// <exception cref="T:System.ArgumentException">The <paramref name="collections"/> argument is
        /// a 0-length array or contains a null element.</exception>
        /// <exception cref="T:System.ObjectDisposedException">At least one of the <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances has been disposed.</exception>
        /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
        /// negative number other than -1, which represents an infinite time-out.</exception>
        /// <exception
/// cref="T:System.InvalidOperationException">At least one of the underlying collections was modified
        /// outside of its <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.</exception>
        /// <exception cref="T:System.ArgumentOutOfRangeException">The count of <paramref name="collections"/> is greater than the maximum size of
        /// 62 for STA and 63 for MTA.</exception>
        /// <remarks>A call to TryTakeFromAny may block until an item is available to be removed.</remarks>
        public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T item, int millisecondsTimeout, CancellationToken cancellationToken)
        {
            ValidateMillisecondsTimeout(millisecondsTimeout);

            // TryTake semantics (isTakeOperation == false); the caller's token is observed by the core routine.
            int takenFromIndex = TryTakeFromAnyCore(collections, out item, millisecondsTimeout, false, cancellationToken);
            return takenFromIndex;
        }

        /// <summary>Takes an item from anyone of the specified collections.
        /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
        /// canceled.
/// </summary>
        /// <param name="collections">The collections from which the item can be removed.</param>
        /// <param name="item">The item removed and returned to the caller.</param>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait for a collection to accept the
        /// operation, or -1 to wait indefinitely.</param>
        /// <param name="isTakeOperation">True if Take, false if TryTake.</param>
        /// <param name="externalCancellationToken">A cancellation token to observe.</param>
        /// <returns>The index into collections for the collection which accepted the
        /// removal of the item; -1 if the item could not be removed.</returns>
        /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
        /// <exception cref="System.ArgumentNullException">If the collections argument is null.</exception>
        /// <exception cref="System.ArgumentException">If the collections argument is a 0-length array or contains a
        /// null element. Also, for a Take operation, if all of the collections are marked as completed.</exception>
        /// <exception cref="System.ObjectDisposedException">If at least one of the collections has been disposed.</exception>
        private static int TryTakeFromAnyCore(BlockingCollection<T>[] collections, out T item, int millisecondsTimeout, bool isTakeOperation, CancellationToken externalCancellationToken)
        {
            // Validate as a take operation (isAddOperation == false).
            ValidateCollectionsArray(collections, false);

            // Fast path: a single non-waiting pass over the collections in array order.
            for (int index = 0; index < collections.Length; index++)
            {
                BlockingCollection<T> candidate = collections[index];

                // Skip collections that are completed.
                if (candidate.IsCompleted)
                {
                    continue;
                }

                // The semaphore count is only a hint that an element may be present;
                // TryTake makes the final decision.
                if (candidate.m_occupiedNodes.CurrentCount <= 0)
                {
                    continue;
                }

                if (candidate.TryTake(out item))
                {
                    return index;
                }
            }

            // Nothing was immediately available: fall back to the waiting (slow) path.
            return TryTakeFromAnyCoreSlow(collections, out item, millisecondsTimeout, isTakeOperation, externalCancellationToken);
        }


        /// <summary>Takes an item from anyone of the specified collections.
        /// A <see cref="System.OperationCanceledException"/> is thrown if the <see cref="CancellationToken"/> is
        /// canceled.
/// </summary>
        /// <param name="collections">The collections copy from which the item can be removed.</param>
        /// <param name="item">The item removed and returned to the caller.</param>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait for a collection to accept the
        /// operation, or -1 to wait indefinitely.</param>
        /// <param name="isTakeOperation">True if Take, false if TryTake.</param>
        /// <param name="externalCancellationToken">A cancellation token to observe.</param>
        /// <returns>The index into collections for the collection which accepted the
        /// removal of the item; -1 if the item could not be removed.</returns>
        /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
        /// <exception cref="System.ArgumentNullException">If the collections argument is null.</exception>
        /// <exception cref="System.ArgumentException">If the collections argument is a 0-length array or contains a
        /// null element. Also, for a Take operation, if all of the collections are marked as completed.</exception>
        /// <exception cref="System.ObjectDisposedException">If at least one of the collections has been disposed.</exception>
        private static int TryTakeFromAnyCoreSlow(BlockingCollection<T>[] collections, out T item, int millisecondsTimeout, bool isTakeOperation, CancellationToken externalCancellationToken)
        {
            const int OPERATION_FAILED = -1;

            bool waitForever = millisecondsTimeout == Timeout.Infinite;

            // Working copy of the timeout; it shrinks on every retry while millisecondsTimeout stays untouched.
            int remainingTimeout = millisecondsTimeout;

            // The start tick is only needed when a finite timeout has to be recomputed.
            uint startTime = waitForever ? 0u : (uint)Environment.TickCount;

            // Loop until one of these conditions is met:
            // 1- The operation succeeded
            // 2- The timeout expired (Try* versions)
            // 3- The external token is canceled: throw
            // 4- The operation is TryTake and all collections are marked as completed: report failure
            // 5- The operation is Take and all collections are marked as completed: throw
            while (waitForever || remainingTimeout >= 0)
            {
                // Collect the wait handles and tokens of the collections, combine all tokens into one
                // linked token whose handle joins the handle list, and WaitAny on the whole list.
                // When WaitAny returns because the linked token fired, either the external token was
                // canceled (throw OCE) or a collection completed (exclude it and retry).
                CancellationToken[] collatedCancellationTokens;
                List<WaitHandle> waitHandles = GetHandles(collections, externalCancellationToken, false, out collatedCancellationTokens);

                if (waitHandles.Count == 0)
                {
                    if (isTakeOperation) //case#5
                    {
                        throw new ArgumentException(SR.GetString(SR.BlockingCollection_CantTakeAnyWhenAllDone), "collections");
                    }

                    break; //case#4
                }

                // Wait for any collection to become available.
                using (CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(collatedCancellationTokens))
                {
                    // The combined token's handle goes last in the list.
                    waitHandles.Add(linkedTokenSource.Token.WaitHandle);
                    int signaledIndex = WaitHandle.WaitAny(waitHandles.ToArray(), remainingTimeout, false);

                    if (linkedTokenSource.IsCancellationRequested && externalCancellationToken.IsCancellationRequested) //case#3
                    {
                        throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), externalCancellationToken);
                    }
                    else if (!linkedTokenSource.IsCancellationRequested) // neither internal nor external cancellation requested
                    {
                        Debug.Assert((signaledIndex == WaitHandle.WaitTimeout) || (signaledIndex >= 0 && signaledIndex < waitHandles.Count));

                        if (signaledIndex == WaitHandle.WaitTimeout) //case#2
                        {
                            break;
                        }

                        // Fewer handles than collections means the handle position no longer matches the
                        // collection position; map the signaled handle back to its collection.
                        // (-1 accounts for the combined token handle.)
                        if (collections.Length != waitHandles.Count - 1)
                        {
                            WaitHandle signaledHandle = waitHandles[signaledIndex];
                            for (int i = 0; i < collections.Length; i++)
                            {
                                if (collections[i].m_occupiedNodes.AvailableWaitHandle == signaledHandle)
                                {
                                    signaledIndex = i;
                                    break;
                                }
                            }
                        }

                        if (collections[signaledIndex].TryTake(out item)) //case#1
                        {
                            return signaledIndex;
                        }
                    }
                }

                // Recompute how much of a finite timeout is left before the next attempt.
                if (!waitForever)
                {
                    remainingTimeout = UpdateTimeOut(startTime, millisecondsTimeout);
                }
            }

            item = default(T); //case#2
            return OPERATION_FAILED;
        }

        /// <summary>
        /// Marks the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instances
        /// as not accepting any more additions.
/// </summary>
        /// <remarks>
        /// After a collection has been marked as complete for adding, adding to the collection is not permitted
        /// and attempts to remove from the collection will not wait when the collection is empty.
        /// </remarks>
        /// <exception cref="T:System.ObjectDisposedException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        public void CompleteAdding()
        {
            CheckDisposed();

            if (IsAddingCompleted)
            {
                return;
            }

            SpinWait backoff = new SpinWait();
            for (; ; )
            {
                int snapshot = m_currentAdders;
                bool flagAlreadySet = (snapshot & COMPLETE_ADDING_ON_MASK) != 0;

                if (!flagAlreadySet)
                {
                    // Try to publish the completion flag on top of the adder count we observed.
                    int flagged = snapshot | COMPLETE_ADDING_ON_MASK;
                    if (Interlocked.CompareExchange(ref m_currentAdders, flagged, snapshot) != snapshot)
                    {
                        // The counter changed underneath us; back off and retry with a fresh snapshot.
                        backoff.SpinOnce();
                        continue;
                    }
                }

                // The flag is set, either by this call or by a concurrent CompleteAdding.
                // Spin until the counter holds nothing but the flag.
                backoff.Reset();
                while (m_currentAdders != COMPLETE_ADDING_ON_MASK)
                {
                    backoff.SpinOnce();
                }

                // Only the call that actually set the flag cancels the waiters.
                if (!flagAlreadySet)
                {
                    if (Count == 0)
                    {
                        CancelWaitingConsumers();
                    }

                    // We should always wake waiting producers, and have them throw exceptions as
                    // Add&CompleteAdding should not be used concurrently.
                    CancelWaitingProducers();
                }

                return;
            }
        }

        /// <summary>Requests cancellation on the consumers' cancellation token source.</summary>
        private void CancelWaitingConsumers()
        {
            CancellationTokenSource consumersSource = m_ConsumersCancellationTokenSource;
            consumersSource.Cancel();
        }

        /// <summary>Requests cancellation on the producers' cancellation token source.</summary>
        private void CancelWaitingProducers()
        {
            CancellationTokenSource producersSource = m_ProducersCancellationTokenSource;
            producersSource.Cancel();
        }


        /// <summary>
        /// Releases resources used by the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.
/// </summary>
        public void Dispose()
        {
            // Explicit disposal: run the cleanup, then tell the GC no finalization is needed.
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases resources used by the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.
        /// </summary>
        /// <param name="disposing">Whether being disposed explicitly (true) or due to a finalizer (false).</param>
        protected virtual void Dispose(bool disposing)
        {
            // Repeated calls are no-ops.
            if (m_isDisposed)
            {
                return;
            }

            // m_freeNodes is null-checked because it is not always present.
            if (m_freeNodes != null)
            {
                m_freeNodes.Dispose();
            }

            m_occupiedNodes.Dispose();

            m_isDisposed = true;
        }

        /// <summary>Copies the items from the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance into a new array.</summary>
        /// <returns>An array containing copies of the elements of the collection.</returns>
        /// <exception cref="T:System.ObjectDisposedException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <remarks>
        /// The copied elements are not removed from the collection.
        /// </remarks>
        public T[] ToArray()
        {
            CheckDisposed();

            // Delegate to the underlying collection's ToArray.
            T[] snapshot = m_collection.ToArray();
            return snapshot;
        }

        /// <summary>Copies all of the items in the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance
        /// to a compatible one-dimensional array, starting at the specified index of the target array.
        /// </summary>
        /// <param name="array">The one-dimensional array that is the destination of the elements copied from
        /// the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.
/// The array must have zero-based indexing.</param>
        /// <param name="index">The zero-based index in <paramref name="array"/> at which copying begins.</param>
        /// <exception cref="T:System.ArgumentNullException">The <paramref name="array"/> argument is
        /// null.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="index"/> argument is less than zero.</exception>
        /// <exception cref="System.ArgumentException">The <paramref name="index"/> argument is equal to or greater
        /// than the length of the <paramref name="array"/>.</exception>
        /// <exception cref="T:System.ObjectDisposedException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        public void CopyTo(T[] array, int index)
        {
            // Route through the explicit ICollection implementation so both entry points
            // share one copy routine and one set of exception translations.
            ICollection nonGenericView = this;
            nonGenericView.CopyTo(array, index);
        }

        /// <summary>Copies all of the items in the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance
        /// to a compatible one-dimensional array, starting at the specified index of the target array.
        /// </summary>
        /// <param name="array">The one-dimensional array that is the destination of the elements copied from
        /// the <see cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> instance.
/// The array must have zero-based indexing.</param>
        /// <param name="index">The zero-based index in <paramref name="array"/> at which copying begins.</param>
        /// <exception cref="T:System.ArgumentNullException">The <paramref name="array"/> argument is
        /// null.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException">The <paramref name="index"/> argument is less than zero.</exception>
        /// <exception cref="System.ArgumentException">The <paramref name="index"/> argument is equal to or greater
        /// than the length of the <paramref name="array"/>, the array is multidimensional, or the type parameter for the collection
        /// cannot be cast automatically to the type of the destination array.</exception>
        /// <exception cref="T:System.ObjectDisposedException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        void ICollection.CopyTo(Array array, int index)
        {
            CheckDisposed();

            // m_collection.CopyTo() is deliberately not used: copying a snapshot through Array.Copy
            // lets every array-related failure be re-thrown below with this type's own messages.
            T[] snapshot = m_collection.ToArray();
            int elementCount = snapshot.Length;

            try
            {
                Array.Copy(snapshot, 0, array, index, elementCount);
            }
            // The two ArgumentException subclasses must be caught before ArgumentException itself.
            catch (ArgumentNullException)
            {
                throw new ArgumentNullException("array");
            }
            catch (ArgumentOutOfRangeException)
            {
                throw new ArgumentOutOfRangeException("index", index, SR.GetString(SR.BlockingCollection_CopyTo_NonNegative));
            }
            catch (ArgumentException)
            {
                throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_TooManyElems), "index");
            }
            catch (RankException)
            {
                throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_MultiDim), "array");
            }
            // Both type-mismatch exceptions are reported with the same incorrect-type message.
            catch (InvalidCastException)
            {
                throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_IncorrectType), "array");
            }
            catch (ArrayTypeMismatchException)
            {
                throw new ArgumentException(SR.GetString(SR.BlockingCollection_CopyTo_IncorrectType), "array");
            }
        }

        /// <summary>Provides a consuming <see cref="T:System.Collections.Generic.IEnumerable{T}"/> for items in the collection.</summary>
        /// <returns>An <see cref="T:System.Collections.Generic.IEnumerable{T}"/> that removes and returns items from the collection.</returns>
        /// <exception cref="T:System.ObjectDisposedException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        public IEnumerable<T> GetConsumingEnumerable()
        {
            // No caller token: forward to the token-aware overload with CancellationToken.None.
            CancellationToken noCancellation = CancellationToken.None;
            return GetConsumingEnumerable(noCancellation);
        }

        /// <summary>Provides a consuming <see cref="T:System.Collections.Generic.IEnumerable{T}"/> for items in the collection.
        /// Calling MoveNext on the returned enumerable will block if there is no data available, or will
        /// throw an <see cref="System.OperationCanceledException"/> if the <see cref="CancellationToken"/> is canceled.
/// </summary>
        /// <param name="cancellationToken">A cancellation token to observe.</param>
        /// <returns>An <see cref="T:System.Collections.Generic.IEnumerable{T}"/> that removes and returns items from the collection.</returns>
        /// <exception cref="T:System.ObjectDisposedException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        /// <exception cref="OperationCanceledException">If the <see cref="CancellationToken"/> is canceled.</exception>
        public IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken)
        {
            // One linked source (caller token + consumers token) is created per enumeration and reused
            // for every take; the using block disposes it when the enumeration ends or is abandoned.
            using (CancellationTokenSource linkedTokenSource =
                CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, m_ConsumersCancellationTokenSource.Token))
            {
                while (!IsCompleted)
                {
                    T nextItem;
                    bool taken = TryTakeWithNoTimeValidation(out nextItem, Timeout.Infinite, cancellationToken, linkedTokenSource);
                    if (taken)
                    {
                        yield return nextItem;
                    }
                }
            }
        }

        /// <summary>Provides an <see cref="T:System.Collections.Generic.IEnumerator{T}"/> for items in the collection.</summary>
        /// <returns>An <see cref="T:System.Collections.Generic.IEnumerator{T}"/> for the items in the collection.</returns>
        /// <exception cref="T:System.ObjectDisposedException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        IEnumerator<T> IEnumerable<T>.GetEnumerator()
        {
            CheckDisposed();

            // Returns the underlying collection's own enumerator.
            IEnumerator<T> underlyingEnumerator = m_collection.GetEnumerator();
            return underlyingEnumerator;
        }

        /// <summary>Provides an <see cref="T:System.Collections.IEnumerator"/> for items in the collection.</summary>
        /// <returns>An <see cref="T:System.Collections.IEnumerator"/> for the items in the
/// collection.</returns>
        /// <exception cref="T:System.ObjectDisposedException">The <see
        /// cref="T:System.Collections.Concurrent.BlockingCollection{T}"/> has been disposed.</exception>
        IEnumerator IEnumerable.GetEnumerator()
        {
            // Reuse the generic enumerator; IEnumerator<T> is also an IEnumerator.
            return ((IEnumerable<T>)this).GetEnumerator();
        }

        /// <summary>Centralizes the logic for validating the BlockingCollections array passed to TryAddToAny()
        /// and TryTakeFromAny().</summary>
        /// <param name="collections">The collections to/from which an item should be added/removed.</param>
        /// <param name="isAddOperation">True when validating for an add operation, in which case a collection
        /// already marked complete for adding is rejected; false when validating for a take operation.</param>
        /// <exception cref="System.ArgumentNullException">If the collections argument is null.</exception>
        /// <exception cref="System.ArgumentException">If the collections argument is a 0-length array or contains a
        /// null element. Also, for add operations, if at least one of the collections has been marked complete for adds.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException">If the collections argument holds more than 63 elements
        /// (more than 62 when called on an STA thread).</exception>
        /// <exception cref="System.ObjectDisposedException">If at least one of the collections has been disposed.</exception>
        private static void ValidateCollectionsArray(BlockingCollection<T>[] collections, bool isAddOperation)
        {
            if (collections == null)
            {
                throw new ArgumentNullException("collections");
            }

            if (collections.Length < 1)
            {
                throw new ArgumentException(
                    SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_ZeroSize), "collections");
            }

            // The number of WaitHandles must be <= 64 for MTA, and <= 63 for STA, and we reserve one for
            // the CancellationToken. The apartment state is queried once and reused.
            int maxCollections = IsSTAThread ? 62 : 63;
            if (collections.Length > maxCollections)
            {
                throw new ArgumentOutOfRangeException(
                    "collections", SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_LargeSize));
            }

            // Per-element checks, in order: null, disposed, then (add operations only) complete for adding.
            for (int i = 0; i < collections.Length; ++i)
            {
                BlockingCollection<T> current = collections[i];

                if (current == null)
                {
                    throw new ArgumentException(
                        SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_NullElems), "collections");
                }

                if (current.m_isDisposed)
                {
                    throw new ObjectDisposedException(
                        "collections", SR.GetString(SR.BlockingCollection_ValidateCollectionsArray_DispElems));
                }

                if (isAddOperation && current.IsAddingCompleted)
                {
                    throw new ArgumentException(
                        SR.GetString(SR.BlockingCollection_CantAddAnyWhenCompleted), "collections");
                }
            }
        }

        /// <summary>Gets whether the current thread's apartment state is STA; always false when
        /// SILVERLIGHT is defined.</summary>
        private static bool IsSTAThread
        {
            get
            {
#if !SILVERLIGHT
                return Thread.CurrentThread.GetApartmentState() == ApartmentState.STA;
#else
                return false;
#endif
            }
        }

        // ---------
        // Private Helpers.
        /// <summary>Centralizes the logic of validating the timeout input argument.</summary>
        /// <param name="timeout">The TimeSpan to wait for to successfully complete an operation on the collection.</param>
        /// <exception cref="System.ArgumentOutOfRangeException">If the number of milliseconds represented by the timeout
        /// TimeSpan is less than 0 or is larger than Int32.MaxValue and not Timeout.Infinite</exception>
        private static void ValidateTimeout(TimeSpan timeout)
        {
            // The fractional part is truncated by the cast before the range check.
            long totalMilliseconds = (long)timeout.TotalMilliseconds;

            bool outOfRange = totalMilliseconds < 0 || totalMilliseconds > Int32.MaxValue;
            if (outOfRange && totalMilliseconds != Timeout.Infinite)
            {
                throw new ArgumentOutOfRangeException("timeout", timeout,
                    String.Format(CultureInfo.InvariantCulture, SR.GetString(SR.BlockingCollection_TimeoutInvalid), Int32.MaxValue));
            }
        }

        /// <summary>Centralizes the logic of validating the millisecondsTimeout input argument.</summary>
        /// <param name="millisecondsTimeout">The number of milliseconds to wait for to
/// successfully complete an
        /// operation on the collection.</param>
        /// <exception cref="System.ArgumentOutOfRangeException">If the number of milliseconds is less than 0 and not
        /// equal to Timeout.Infinite.</exception>
        private static void ValidateMillisecondsTimeout(int millisecondsTimeout)
        {
            // Any non-negative value is valid, and so is the infinite sentinel.
            bool isInfinite = millisecondsTimeout == Timeout.Infinite;
            if (millisecondsTimeout >= 0 || isInfinite)
            {
                return;
            }

            throw new ArgumentOutOfRangeException("millisecondsTimeout", millisecondsTimeout,
                String.Format(CultureInfo.InvariantCulture, SR.GetString(SR.BlockingCollection_TimeoutInvalid), Int32.MaxValue));
        }

        /// <summary>Throws a System.ObjectDisposedException if the collection was disposed</summary>
        /// <exception cref="System.ObjectDisposedException">If the collection has been disposed.</exception>
        private void CheckDisposed()
        {
            if (!m_isDisposed)
            {
                return;
            }

            throw new ObjectDisposedException("BlockingCollection", SR.GetString(SR.BlockingCollection_Disposed));
        }

    }



    /// <summary>A debugger view of the blocking collection that makes it simple to browse the
    /// collection's contents at a point in time.</summary>
    /// <typeparam name="T">The type of element that the BlockingCollection will hold.</typeparam>
    internal sealed class SystemThreadingCollections_BlockingCollectionDebugView<T>
    {
        // The collection being viewed; assigned once in the constructor.
        private readonly BlockingCollection<T> m_blockingCollection;

        /// <summary>Constructs a new debugger view object for the provided blocking collection object.</summary>
        /// <param name="collection">A blocking collection to browse in the debugger.</param>
        /// <exception cref="System.ArgumentNullException">If <paramref name="collection"/> is null.</exception>
        public SystemThreadingCollections_BlockingCollectionDebugView(BlockingCollection<T> collection)
        {
            if (collection == null)
            {
                throw new ArgumentNullException("collection");
            }

            m_blockingCollection = collection;
        }

        /// <summary>Returns a snapshot of the underlying collection's elements.</summary>
        [DebuggerBrowsable(DebuggerBrowsableState.RootHidden)]
        public T[] Items
        {
            get
            {
                T[] snapshot = m_blockingCollection.ToArray();
                return snapshot;
            }
        }

    }
}
#pragma warning restore 0420