1 // Licensed to the .NET Foundation under one or more agreements. 2 // The .NET Foundation licenses this file to you under the MIT license. 3 // See the LICENSE file in the project root for more information. 4 5 using System.Collections.Generic; 6 using System.Diagnostics; 7 using System.Diagnostics.Private; 8 using System.Runtime.InteropServices; 9 using System.Threading; 10 11 namespace System.Collections.Concurrent 12 { 13 /// <summary> 14 /// Represents a thread-safe first-in, first-out collection of objects. 15 /// </summary> 16 /// <typeparam name="T">Specifies the type of elements in the queue.</typeparam> 17 /// <remarks> 18 /// All public and protected members of <see cref="ConcurrentQueue{T}"/> are thread-safe and may be used 19 /// concurrently from multiple threads. 20 /// </remarks> 21 [DebuggerDisplay("Count = {Count}")] 22 [DebuggerTypeProxy(typeof(IProducerConsumerCollectionDebugView<>))] 23 [Serializable] 24 public class ConcurrentQueue<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T> 25 { 26 // This implementation provides an unbounded, multi-producer multi-consumer queue 27 // that supports the standard Enqueue/TryDequeue operations, as well as support for 28 // snapshot enumeration (GetEnumerator, ToArray, CopyTo), peeking, and Count/IsEmpty. 29 // It is composed of a linked list of bounded ring buffers, each of which has a head 30 // and a tail index, isolated from each other to minimize false sharing. As long as 31 // the number of elements in the queue remains less than the size of the current 32 // buffer (Segment), no additional allocations are required for enqueued items. When 33 // the number of items exceeds the size of the current segment, the current segment is 34 // "frozen" to prevent further enqueues, and a new segment is linked from it and set 35 // as the new tail segment for subsequent enqueues. 
// As old segments are consumed by
// dequeues, the head reference is updated to point to the segment that dequeuers should
// try next.  To support snapshot enumeration, segments also support the notion of
// preserving for observation, whereby they avoid overwriting state as part of dequeues.
// Any operation that requires a snapshot results in all current segments being
// both frozen for enqueues and preserved for observation: any new enqueues will go
// to new segments, and dequeuers will consume from the existing segments but without
// overwriting the existing data.

/// <summary>Length of the first segment allocated for a queue.</summary>
private const int InitialSegmentLength = 32;

/// <summary>
/// Upper bound on the length of any segment.  The value is a somewhat arbitrary trade-off:
/// a larger limit avoids allocating additional segments while the queue stays below it,
/// but once the limit is exceeded the oversized segment becomes garbage.
/// </summary>
private const int MaxSegmentLength = 1024 * 1024;

/// <summary>
/// Lock guarding cross-segment operations: every update to <see cref="_tail"/> or <see cref="_head"/>,
/// and every operation that needs a consistent view of the two.
/// </summary>
private object _crossSegmentLock;

/// <summary>The segment currently receiving enqueues.</summary>
private volatile Segment _tail;

/// <summary>The segment currently serving dequeues.</summary>
private volatile Segment _head;

/// <summary>
/// Initializes a new instance of the <see cref="ConcurrentQueue{T}"/> class.
/// </summary>
public ConcurrentQueue()
{
    _crossSegmentLock = new object();

    // An empty queue is a single segment acting as both head and tail.
    Segment initialSegment = new Segment(InitialSegmentLength);
    _head = initialSegment;
    _tail = initialSegment;
}

/// <summary>
/// Initializes the contents of the queue from an existing collection.
/// </summary>
/// <param name="collection">A collection from which to copy elements.</param>
private void InitializeFromCollection(IEnumerable<T> collection)
{
    _crossSegmentLock = new object();

    // Determine the initial segment size.  We'll use the default, unless the collection
    // is known to be larger than that, in which case we round its count up to a power
    // of 2 (all segments must be a power of 2 in length), capped at MaxSegmentLength.
    int length = InitialSegmentLength;
    var c = collection as ICollection<T>;
    if (c != null)
    {
        int count = c.Count;
        if (count > length)
        {
            // Apply the cap *before* rounding.  RoundUpToPowerOf2 overflows for counts
            // above 2^30 and produces a negative value, which Math.Min would then select
            // as the segment length.  Because MaxSegmentLength is itself a power of 2,
            // every count below it rounds to at most MaxSegmentLength, so the result is
            // unchanged for all inputs that previously worked.
            length = count >= MaxSegmentLength ? MaxSegmentLength : RoundUpToPowerOf2(count);
        }
    }

    // Initialize the segment and add all of the data to it.  Enqueue links additional
    // segments as needed if the collection does not fit in the first one.
    _tail = _head = new Segment(length);
    foreach (T item in collection)
    {
        Enqueue(item);
    }
}

/// <summary>
/// Initializes a new instance of the <see cref="ConcurrentQueue{T}"/> class that contains elements copied
/// from the specified collection.
/// </summary>
/// <param name="collection">
/// The collection whose elements are copied to the new <see cref="ConcurrentQueue{T}"/>.
/// </param>
/// <exception cref="System.ArgumentNullException">The <paramref name="collection"/> argument is null.</exception>
public ConcurrentQueue(IEnumerable<T> collection)
{
    if (collection == null)
    {
        throw new ArgumentNullException(nameof(collection));
    }

    InitializeFromCollection(collection);
}

/// <summary>
/// Copies the elements of the <see cref="ICollection"/> to an <see
/// cref="Array"/>, starting at a particular <see cref="Array"/> index.
/// </summary>
/// <param name="array">
/// The one-dimensional <see cref="Array">Array</see> that is the destination of the
/// elements copied from the <see cref="ConcurrentQueue{T}"/>.
/// <paramref name="array"/> must have
/// zero-based indexing.
/// </param>
/// <param name="index">The zero-based index in <paramref name="array"/> at which copying begins.</param>
/// <exception cref="ArgumentNullException"><paramref name="array"/> is a null reference (Nothing in
/// Visual Basic).</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="index"/> is less than
/// zero.</exception>
/// <exception cref="ArgumentException">
/// <paramref name="array"/> is multidimensional. -or-
/// <paramref name="array"/> does not have zero-based indexing. -or-
/// <paramref name="index"/> is equal to or greater than the length of the <paramref name="array"/>
/// -or- The number of elements in the source <see cref="ICollection"/> is
/// greater than the available space from <paramref name="index"/> to the end of the destination
/// <paramref name="array"/>. -or- The type of the source <see
/// cref="ICollection"/> cannot be cast automatically to the type of the
/// destination <paramref name="array"/>.
/// </exception>
void ICollection.CopyTo(Array array, int index)
{
    // A null destination can never be a T[], so rejecting it first is equivalent
    // to checking after the cast.
    if (array == null)
    {
        throw new ArgumentNullException(nameof(array));
    }

    T[] typedArray = array as T[];
    if (typedArray == null)
    {
        // Slow path: the destination is not a T[].  Snapshot the queue into a
        // temporary array and let its non-generic CopyTo perform the copy.
        ToArray().CopyTo(array, index);
    }
    else
    {
        // Fast path: copy straight into the strongly typed destination.
        CopyTo(typedArray, index);
    }
}

/// <summary>
/// Gets a value indicating whether access to the <see cref="ICollection"/> is
/// synchronized with the SyncRoot.
/// </summary>
/// <value>
/// true if access to the <see cref="ICollection"/> is synchronized with the SyncRoot; otherwise, false.
/// For <see cref="ConcurrentQueue{T}"/>, this property always returns false.
/// </value>
bool ICollection.IsSynchronized
{
    // Always false: true would imply synchronization via SyncRoot.
    get { return false; }
}

/// <summary>
/// Gets an object that can be used to synchronize access to the <see
/// cref="ICollection"/>. This property is not supported.
/// </summary>
/// <exception cref="NotSupportedException">The SyncRoot property is not supported.</exception>
object ICollection.SyncRoot
{
    get
    {
        throw new NotSupportedException(SR.ConcurrentCollection_SyncRoot_NotSupported);
    }
}

/// <summary>Returns an enumerator that iterates through a collection.</summary>
/// <returns>An <see cref="IEnumerator"/> that can be used to iterate through the collection.</returns>
IEnumerator IEnumerable.GetEnumerator()
{
    // Dispatch through the generic interface rather than calling the public method directly.
    IEnumerable<T> typedSelf = this;
    return typedSelf.GetEnumerator();
}

/// <summary>
/// Attempts to add an object to the <see cref="Concurrent.IProducerConsumerCollection{T}"/>.
/// </summary>
/// <param name="item">The object to add to the <see
/// cref="Concurrent.IProducerConsumerCollection{T}"/>. The value can be a null
/// reference (Nothing in Visual Basic) for reference types.
/// </param>
/// <returns>true if the object was added successfully; otherwise, false.</returns>
/// <remarks>For <see cref="ConcurrentQueue{T}"/>, this operation always appends the object to the
/// end of the queue and returns true.</remarks>
bool IProducerConsumerCollection<T>.TryAdd(T item)
{
    Enqueue(item);
    return true;
}

/// <summary>
/// Attempts to remove and return an object from the <see cref="Concurrent.IProducerConsumerCollection{T}"/>.
/// </summary>
/// <param name="item">
/// When this method returns, if the operation was successful, <paramref name="item"/> contains the
/// object removed. If no object was available to be removed, the value is unspecified.
/// </param>
/// <returns>true if an element was removed and returned successfully; otherwise, false.</returns>
/// <remarks>For <see cref="ConcurrentQueue{T}"/>, this operation will attempt to remove the object
/// from the beginning of the <see cref="ConcurrentQueue{T}"/>.
/// </remarks>
bool IProducerConsumerCollection<T>.TryTake(out T item)
{
    return TryDequeue(out item);
}

/// <summary>
/// Gets a value that indicates whether the <see cref="ConcurrentQueue{T}"/> is empty.
/// </summary>
/// <value>true if the <see cref="ConcurrentQueue{T}"/> is empty; otherwise, false.</value>
/// <remarks>
/// For determining whether the collection contains any items, use of this property is recommended
/// rather than retrieving the number of items from the <see cref="Count"/> property and comparing it
/// to 0.  However, as this collection is intended to be accessed concurrently, another thread may
/// modify the collection after <see cref="IsEmpty"/> returns, thus invalidating the result.
/// </remarks>
public bool IsEmpty
{
    get
    {
        // The queue is empty exactly when a peek fails.  Passing resultUsed: false tells
        // the peek that the value itself is not needed, which avoids marking segments as
        // preserved for observation and makes this cheaper than TryPeek(out T) or Count == 0.
        T unused;
        bool foundItem = TryPeek(out unused, resultUsed: false);
        return !foundItem;
    }
}

/// <summary>Copies the elements stored in the <see cref="ConcurrentQueue{T}"/> to a new array.</summary>
/// <returns>A new array containing a snapshot of elements copied from the <see cref="ConcurrentQueue{T}"/>.</returns>
public T[] ToArray()
{
    // Freeze a view of the current contents.
    Segment snapHead, snapTail;
    int snapHeadHead, snapTailTail;
    SnapForObservation(out snapHead, out snapHeadHead, out snapTail, out snapTailTail);

    // Size the result from the number of items in the snapped region.
    long count = GetCount(snapHead, snapHeadHead, snapTail, snapTailTail);
    T[] result = new T[count];

    // Walk the snapped region and copy each element across.
    using (IEnumerator<T> items = Enumerate(snapHead, snapHeadHead, snapTail, snapTailTail))
    {
        int written = 0;
        while (items.MoveNext())
        {
            result[written] = items.Current;
            written++;
        }
        Debug.Assert(count == written);
    }

    return result;
}

/// <summary>
/// Gets the number of elements contained in the <see cref="ConcurrentQueue{T}"/>.
/// </summary>
/// <value>The number of elements contained in the <see cref="ConcurrentQueue{T}"/>.</value>
/// <remarks>
/// For determining whether the collection contains any items, use of the <see cref="IsEmpty"/>
/// property is recommended rather than retrieving the number of items from the <see cref="Count"/>
/// property and comparing it to 0.
/// </remarks>
public int Count
{
    get
    {
        var spinner = new SpinWait();
        while (true)
        {
            // Capture the head and tail segments, then the head segment's positions.
            Segment head = _head;
            Segment tail = _tail;
            int headHead = Volatile.Read(ref head._headAndTail.Head);
            int headTail = Volatile.Read(ref head._headAndTail.Tail);

            if (head != tail && head._nextSegment != tail)
            {
                // More than two segments.  Take the slower path: freeze the queue and
                // count the now-stable segments.
                Segment snapHead, snapTail;
                int snapHeadHead, snapTailTail;
                SnapForObservation(out snapHead, out snapHeadHead, out snapTail, out snapTailTail);
                return unchecked((int)GetCount(snapHead, snapHeadHead, snapTail, snapTailTail));
            }

            if (head == tail)
            {
                // A single segment: the expected common case once the queue's size has
                // stabilized.  If the captured values still (or again) match reality,
                // the segment's own count is the answer.
                bool singleSegmentStable =
                    head == _head &&
                    head == _tail &&
                    headHead == Volatile.Read(ref head._headAndTail.Head) &&
                    headTail == Volatile.Read(ref head._headAndTail.Tail);
                if (singleSegmentStable)
                {
                    return GetCount(head, headHead, headTail);
                }
            }
            else
            {
                // Exactly two segments.  Capture the tail's positions as well, and if
                // everything still (or again) matches, return the sum of both counts.
                int tailHead = Volatile.Read(ref tail._headAndTail.Head);
                int tailTail = Volatile.Read(ref tail._headAndTail.Tail);
                bool twoSegmentsStable =
                    head == _head &&
                    tail == _tail &&
                    headHead == Volatile.Read(ref head._headAndTail.Head) &&
                    headTail == Volatile.Read(ref head._headAndTail.Tail) &&
                    tailHead == Volatile.Read(ref tail._headAndTail.Head) &&
                    tailTail == Volatile.Read(ref tail._headAndTail.Tail);
                if (twoSegmentsStable)
                {
                    return GetCount(head, headHead, headTail) + GetCount(tail, tailHead, tailTail);
                }
            }

            // We raced with enqueues/dequeues and captured an inconsistent picture.
            // Spin and try again.
            spinner.SpinOnce();
        }
    }
}

/// <summary>Computes the number of items in a segment based on a fixed head and tail in that segment.</summary>
private static int GetCount(Segment s, int head, int tail)
{
    // Equal positions, or positions that differ only by the segment's freeze offset,
    // are reported as zero items.
    if (head == tail || head == tail - s.FreezeOffset)
    {
        return 0;
    }

    int first = head & s._slotsMask;
    int end = tail & s._slotsMask;
    if (first < end)
    {
        return end - first;
    }

    // The tail has wrapped around to be at or before the head.
    return s._slots.Length - first + end;
}

/// <summary>Gets the number of items in snapped region.</summary>
private static long GetCount(Segment head, int headHead, Segment tail, int tailTail)
{
    // Head and tail should have been both frozen for enqueues and preserved for
    // observation; intermediate segments are validated as they are visited.
    Debug.Assert(head._preservedForObservation);
    Debug.Assert(head._frozenForEnqueues);
    Debug.Assert(tail._preservedForObservation);
    Debug.Assert(tail._frozenForEnqueues);

    long total = 0;

    // Head segment.  It is frozen for enqueues, so its tail position is fixed; when it is
    // also the tail segment, use the snapped tail position rather than re-reading it.
    int rawHeadTail = head == tail ? tailTail : Volatile.Read(ref head._headAndTail.Tail);
    int headTail = rawHeadTail - head.FreezeOffset;
    if (headHead < headTail)
    {
        // Reduce both positions to slot indices.
        int first = headHead & head._slotsMask;
        int end = headTail & head._slotsMask;

        // One contiguous region, or two if the tail has wrapped to be before the head.
        int inHead;
        if (first < end)
        {
            inHead = end - first;
        }
        else
        {
            inHead = head._slots.Length - first + end;
        }
        total += inHead;
    }

    if (head != tail)
    {
        // Segments strictly between head and tail.  Each had a predecessor, so it is
        // counted from its 0th element, and each has a successor, so it was already
        // frozen and is counted up to its frozen tail.
        Segment s = head._nextSegment;
        while (s != tail)
        {
            Debug.Assert(s._preservedForObservation);
            Debug.Assert(s._frozenForEnqueues);
            total += s._headAndTail.Tail - s.FreezeOffset;
            s = s._nextSegment;
        }

        // Tail segment.  It is also counted from the beginning, but only up to the tail
        // position snapped for observation, since its live Tail could still be changing.
        total += tailTail - tail.FreezeOffset;
    }

    return total;
}

/// <summary>
/// Copies the <see cref="ConcurrentQueue{T}"/> elements to an existing one-dimensional <see
/// cref="Array">Array</see>, starting at the specified array index.
/// </summary>
/// <param name="array">The one-dimensional <see cref="Array">Array</see> that is the
/// destination of the elements copied from the
/// <see cref="ConcurrentQueue{T}"/>. The <see cref="Array">Array</see> must have zero-based
/// indexing.</param>
/// <param name="index">The zero-based index in <paramref name="array"/> at which copying
/// begins.</param>
/// <exception cref="ArgumentNullException"><paramref name="array"/> is a null reference (Nothing in
/// Visual Basic).</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="index"/> is less than
/// zero.</exception>
/// <exception cref="ArgumentException"><paramref name="index"/> is equal to or greater than the
/// length of the <paramref name="array"/>
/// -or- The number of elements in the source <see cref="ConcurrentQueue{T}"/> is greater than the
/// available space from <paramref name="index"/> to the end of the destination <paramref
/// name="array"/>.
/// </exception>
public void CopyTo(T[] array, int index)
{
    if (array == null)
    {
        throw new ArgumentNullException(nameof(array));
    }
    if (index < 0)
    {
        throw new ArgumentOutOfRangeException(nameof(index), SR.Collection_CopyTo_ArgumentOutOfRangeException);
    }

    // Freeze a view of the current contents.
    Segment snapHead, snapTail;
    int snapHeadHead, snapTailTail;
    SnapForObservation(out snapHead, out snapHeadHead, out snapTail, out snapTailTail);

    // Make sure the snapped items fit between index and the end of the destination.
    long count = GetCount(snapHead, snapHeadHead, snapTail, snapTailTail);
    long available = array.Length - count;
    if (index > available)
    {
        throw new ArgumentException(SR.Collection_CopyTo_TooManyElems);
    }

    // Copy the snapped items into the destination.
    int next = index;
    using (IEnumerator<T> items = Enumerate(snapHead, snapHeadHead, snapTail, snapTailTail))
    {
        while (items.MoveNext())
        {
            array[next] = items.Current;
            next++;
        }
    }
    Debug.Assert(count == next - index);
}

/// <summary>Returns an enumerator that iterates through the <see cref="ConcurrentQueue{T}"/>.</summary>
/// <returns>An enumerator for the contents of the <see
/// cref="ConcurrentQueue{T}"/>.</returns>
/// <remarks>
/// The enumeration represents a moment-in-time snapshot of the contents
/// of the queue.  It does not reflect any updates to the collection after
/// <see cref="GetEnumerator"/> was called.  The enumerator is safe to use
/// concurrently with reads from and writes to the queue.
/// </remarks>
public IEnumerator<T> GetEnumerator()
{
    // The snapshot is taken here, eagerly; only the walk over it is deferred.
    Segment snapHead, snapTail;
    int snapHeadHead, snapTailTail;
    SnapForObservation(out snapHead, out snapHeadHead, out snapTail, out snapTailTail);
    return Enumerate(snapHead, snapHeadHead, snapTail, snapTailTail);
}

/// <summary>
/// Gets the head and tail information of the current contents of the queue.
/// After this call returns, the specified region can be enumerated any number
/// of times and will not change.
/// </summary>
private void SnapForObservation(out Segment head, out int headHead, out Segment tail, out int tailTail)
{
    lock (_crossSegmentLock) // _head and _tail may only change while the lock is held.
    {
        // Capture the current ends of the queue.
        head = _head;
        tail = _tail;
        Debug.Assert(head != null);
        Debug.Assert(tail != null);
        Debug.Assert(tail._nextSegment == null);

        // Mark head, tail and everything in between as preserved for observation.
        Segment current = head;
        current._preservedForObservation = true;
        while (current != tail)
        {
            Debug.Assert(current._frozenForEnqueues); // any non-tail should already be marked
            current = current._nextSegment;
            current._preservedForObservation = true;
        }

        // Stop further enqueues into the tail so that tailTail cannot move.
        tail.EnsureFrozenForEnqueues();

        // From here on, dequeues from these segments won't overwrite values and none
        // of the segments can accept new items, so the positions can be read.
        headHead = Volatile.Read(ref head._headAndTail.Head);
        tailTail = Volatile.Read(ref tail._headAndTail.Tail);
    }
}

/// <summary>Gets the item stored in the <paramref name="i"/>th entry in <paramref name="segment"/>.</summary>
private T GetItemWhenAvailable(Segment segment, int i)
{
    Debug.Assert(segment._preservedForObservation);

    // The masked sequence number the slot carries once its item has been stored.
    int mask = segment._slotsMask;
    int expectedMasked = (i + 1) & mask;

    // If the slot doesn't carry that sequence number yet, an enqueuer is still in the
    // middle of storing the item.  Spin until it shows up.
    if ((segment._slots[i].SequenceNumber & mask) != expectedMasked)
    {
        var spinner = new SpinWait();
        while ((Volatile.Read(ref segment._slots[i].SequenceNumber) & mask) != expectedMasked)
        {
            spinner.SpinOnce();
        }
    }

    return segment._slots[i].Item;
}

/// <summary>Lazily yields every item in a region previously captured by <see cref="SnapForObservation"/>.</summary>
private IEnumerator<T> Enumerate(Segment head, int headHead, Segment tail, int tailTail)
{
    Debug.Assert(head._preservedForObservation);
    Debug.Assert(head._frozenForEnqueues);
    Debug.Assert(tail._preservedForObservation);
    Debug.Assert(tail._frozenForEnqueues);

    // Head segment.  It no longer accepts enqueues, so its tail position is fixed, and it
    // was preserved before its head was grabbed, so head..tail can be walked safely.
    int rawHeadTail = head == tail ? tailTail : Volatile.Read(ref head._headAndTail.Tail);
    int headTail = rawHeadTail - head.FreezeOffset;
    if (headHead < headTail)
    {
        int first = headHead & head._slotsMask;
        int end = headTail & head._slotsMask;

        if (first >= end)
        {
            // The region wraps: emit from first to the end of the array, then
            // continue from slot 0 below.
            for (int i = first; i < head._slots.Length; i++)
            {
                yield return GetItemWhenAvailable(head, i);
            }
            first = 0;
        }

        for (int i = first; i < end; i++)
        {
            yield return GetItemWhenAvailable(head, i);
        }
    }

    // If head and tail are the same segment, everything has been enumerated.
    if (head != tail)
    {
        // Segments strictly between head and tail.  Each had a predecessor, so it is
        // treated as starting at its 0th element.
        Segment s = head._nextSegment;
        while (s != tail)
        {
            Debug.Assert(s._preservedForObservation, "Would have had to been preserved as a segment part of enumeration");
            Debug.Assert(s._frozenForEnqueues, "Would have had to be frozen for enqueues as it's intermediate");

            int segmentEnd = s._headAndTail.Tail - s.FreezeOffset;
            for (int i = 0; i < segmentEnd; i++)
            {
                yield return GetItemWhenAvailable(s, i);
            }

            s = s._nextSegment;
        }

        // Tail segment: start at the beginning and stop at the snapped tail position.
        tailTail -= tail.FreezeOffset;
        for (int i = 0; i < tailTail; i++)
        {
            yield return GetItemWhenAvailable(tail, i);
        }
    }
}

/// <summary>Round the specified value up to the next power of 2, if it isn't one already.</summary>
private static int RoundUpToPowerOf2(int i)
{
    // Smear the highest set bit of (i - 1) into every lower bit, then add one.
    int value = i - 1;
    for (int shift = 1; shift <= 16; shift <<= 1)
    {
        value |= value >> shift;
    }
    return value + 1;
}

/// <summary>Adds an object to the end of the <see cref="ConcurrentQueue{T}"/>.</summary>
/// <param name="item">
/// The object to add to the end of the <see cref="ConcurrentQueue{T}"/>.
/// The value can be a null reference (Nothing in Visual Basic) for reference types.
/// </param>
public void Enqueue(T item)
{
    // Fast path: the current tail segment accepts the item.
    if (_tail.TryEnqueue(item))
    {
        return;
    }

    // Slow path: a new tail segment may have to be added.
    EnqueueSlow(item);
}

/// <summary>Adds to the end of the queue, adding a new segment if necessary.</summary>
private void EnqueueSlow(T item)
{
    // Each pass re-reads the tail and tries to append to it; the loop ends on success.
    for (Segment tail = _tail; !tail.TryEnqueue(item); tail = _tail)
    {
        // The append failed.  Take the lock so the tail can be compared and replaced;
        // unless another enqueuer already added a segment, add one and try again.
        lock (_crossSegmentLock)
        {
            if (tail == _tail)
            {
                // Make sure no one else can enqueue to this segment.
                tail.EnsureFrozenForEnqueues();

                // Normally the next segment is twice the old one (up to the maximum) so
                // that growing again is less likely.  If the tail was preserved for
                // observation, something kept us from reusing it; should that happen
                // often while growing, lots of space would be wasted, so fall back to
                // the initial length.  If observations are rare, the size grows back
                // relatively quickly.
                int nextSize;
                if (tail._preservedForObservation)
                {
                    nextSize = InitialSegmentLength;
                }
                else
                {
                    nextSize = Math.Min(tail.Capacity * 2, MaxSegmentLength);
                }

                // Link the new segment after the old tail, then publish it as the tail.
                Segment newTail = new Segment(nextSize);
                tail._nextSegment = newTail;
                _tail = newTail;
            }
        }
    }
}

/// <summary>
/// Attempts to remove and return the object at the beginning of the <see
/// cref="ConcurrentQueue{T}"/>.
/// </summary>
/// <param name="result">
/// When this method returns, if the operation was successful, <paramref name="result"/> contains the
/// object removed. If no object was available to be removed, the value is unspecified.
/// </param>
/// <returns>
/// true if an element was removed and returned from the beginning of the
/// <see cref="ConcurrentQueue{T}"/> successfully; otherwise, false.
/// </returns>
public bool TryDequeue(out T result)
{
    // Fast path: operate on the head segment only.
    if (_head.TryDequeue(out result))
    {
        return true;
    }

    // Slow path: segments may need to be fixed up.
    return TryDequeueSlow(out result);
}

/// <summary>Tries to dequeue an item, removing empty segments as needed.</summary>
private bool TryDequeueSlow(out T item)
{
    for (;;)
    {
        Segment head = _head;

        // First attempt against the current head segment.
        if (head.TryDequeue(out item))
        {
            return true;
        }

        // If this is the last segment, treat the queue as empty at this moment in time
        // (even though an item could have arrived between the attempt above and this check).
        if (head._nextSegment == null)
        {
            item = default(T);
            return false;
        }

        // A next segment exists, so this one has been frozen for further enqueues.
        // An item could still have been added between the first attempt and the check
        // for a next segment, so try once more to confirm the segment is really empty.
        Debug.Assert(head._frozenForEnqueues);
        if (head.TryDequeue(out item))
        {
            return true;
        }

        // The segment is frozen and empty.  Advance the head to the next segment,
        // unless another thread has already done so, then go around again.
        lock (_crossSegmentLock)
        {
            if (head == _head)
            {
                _head = head._nextSegment;
            }
        }
    }
}

/// <summary>
/// Attempts to return an object from the beginning of the <see cref="ConcurrentQueue{T}"/>
/// without removing it.
/// </summary>
/// <param name="result">
/// When this method returns, <paramref name="result"/> contains an object from
/// the beginning of the <see cref="Concurrent.ConcurrentQueue{T}"/> or default(T)
/// if the operation failed.
/// </param>
/// <returns>true if an object was returned successfully; otherwise, false.</returns>
/// <remarks>
/// For determining whether the collection contains any items, use of the <see cref="IsEmpty"/>
/// property is recommended rather than peeking.
/// </remarks>
public bool TryPeek(out T result)
{
    return TryPeek(out result, resultUsed: true);
}

/// <summary>Attempts to retrieve the value for the first element in the queue.</summary>
/// <param name="result">The value of the first element, if found.</param>
/// <param name="resultUsed">true if the result is needed; otherwise false if only the true/false outcome is needed.</param>
/// <returns>true if an element was found; otherwise, false.</returns>
private bool TryPeek(out T result, bool resultUsed)
{
    // Walk the segments from the head, looking for the first non-empty one.
    Segment s = _head;
    while (true)
    {
        // Read the next-segment link *before* peeking, so that a change made while
        // the peek was in progress can be detected afterwards.
        Segment next = Volatile.Read(ref s._nextSegment);

        if (s.TryPeek(out result, resultUsed))
        {
            return true;
        }

        // The current segment was empty at the moment we checked.

        if (next == null)
        {
            if (Volatile.Read(ref s._nextSegment) == null)
            {
                // Still no next segment: there is nothing more to peek at.
                result = default(T);
                return false;
            }

            // The link was null before the peek but is non-null now.  Either the segment
            // had already been frozen when we peeked and its successor had not yet been
            // linked, or the segment was empty and, between the peek and this check, so
            // many items were enqueued that they filled it and spilled into a new one.
            // Items must be peeked in order, so look at this same segment again: the
            // next pass either finds an item or sees a non-null link and moves on.
            continue;
        }

        // A next segment already existed before the peek, so nothing could have been
        // enqueued into this segment during it; move on to the next segment.
        Debug.Assert(next == s._nextSegment);
        s = next;
    }
}

/// <summary>
/// Removes all objects from the <see cref="ConcurrentQueue{T}"/>.
/// </summary>
public void Clear()
{
    lock (_crossSegmentLock)
    {
        // Replace the existing head/tail with one fresh segment, as the constructor does.
        // Operations already in flight may still read from or write to a segment that is
        // being dropped, so they may not be linear with regards to this clear.  To help
        // mitigate in-flight enqueues landing on the tail that's about to be dropped,
        // freeze it first; that forces enqueuers to take this lock and see the new tail.
        _tail.EnsureFrozenForEnqueues();

        Segment fresh = new Segment(InitialSegmentLength);
        _head = fresh;
        _tail = fresh;
    }
}

/// <summary>
/// Provides a multi-producer, multi-consumer thread-safe bounded segment.  When the queue is full,
/// enqueues fail and return false.  When the queue is empty, dequeues fail and return null.
/// These segments are linked together to form the unbounded <see cref="ConcurrentQueue{T}"/>.
        /// </summary>
        [DebuggerDisplay("Capacity = {Capacity}")]
        private sealed class Segment
        {
            // Segment design is inspired by the algorithm outlined at:
            // http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue

            /// <summary>The array of items in this queue.  Each slot contains the item in that slot and its "sequence number".</summary>
            internal readonly Slot[] _slots;
            /// <summary>Mask for quickly accessing a position within the queue's array.</summary>
            internal readonly int _slotsMask;
            /// <summary>The head and tail positions, with padding to help avoid false sharing contention.</summary>
            /// <remarks>Dequeuing happens from the head, enqueuing happens at the tail.</remarks>
            internal PaddedHeadAndTail _headAndTail; // mutable struct: do not make this readonly

            /// <summary>Indicates whether the segment has been marked such that dequeues don't overwrite the removed data.</summary>
            internal bool _preservedForObservation;
            /// <summary>Indicates whether the segment has been marked such that no additional items may be enqueued.</summary>
            internal bool _frozenForEnqueues;
            /// <summary>The segment following this one in the queue, or null if this segment is the last in the queue.</summary>
            internal Segment _nextSegment;

            /// <summary>Creates the segment.</summary>
            /// <param name="boundedLength">
            /// The maximum number of elements the segment can contain.  Must be a power of 2.
            /// </param>
            public Segment(int boundedLength)
            {
                // Validate the length
                Debug.Assert(boundedLength >= 2, $"Must be >= 2, got {boundedLength}");
                Debug.Assert((boundedLength & (boundedLength - 1)) == 0, $"Must be a power of 2, got {boundedLength}");

                // Initialize the slots and the mask.  The mask is used as a way of quickly doing "% _slots.Length",
                // instead letting us do "& _slotsMask".
                _slots = new Slot[boundedLength];
                _slotsMask = boundedLength - 1;

                // Initialize the sequence number for each slot.  The sequence number provides a ticket that
                // allows dequeuers to know whether they can dequeue and enqueuers to know whether they can
                // enqueue.  An enqueuer at position N can enqueue when the sequence number is N, and a dequeuer
                // for position N can dequeue when the sequence number is N + 1.  When an enqueuer is done writing
                // at position N, it sets the sequence number to N + 1 so that a dequeuer will be able to dequeue,
                // and when a dequeuer is done dequeueing at position N, it sets the sequence number to N + _slots.Length,
                // so that when an enqueuer loops around the slots, it'll find that the sequence number at
                // position N is N.  This also means that when an enqueuer finds that at position N the sequence
                // number is < N, there is still a value in that slot, i.e. the segment is full, and when a
                // dequeuer finds that the value in a slot is < N + 1, there is nothing currently available to
                // dequeue. (It is possible for multiple enqueuers to enqueue concurrently, writing into
                // subsequent slots, and to have the first enqueuer take longer, so that the slots for 1, 2, 3, etc.
                // may have values, but the 0th slot may still be being filled... in that case, TryDequeue will
                // return false.)
                for (int i = 0; i < _slots.Length; i++)
                {
                    _slots[i].SequenceNumber = i;
                }
            }

            /// <summary>Gets the number of elements this segment can store.</summary>
            internal int Capacity => _slots.Length;

            /// <summary>Gets the "freeze offset" for this segment.</summary>
            internal int FreezeOffset => _slots.Length * 2;

            /// <summary>
            /// Ensures that the segment will not accept any subsequent enqueues that aren't already underway.
            /// </summary>
            /// <remarks>
            /// When we mark a segment as being frozen for additional enqueues,
            /// we set the <see cref="_frozenForEnqueues"/> bool, but that's mostly
            /// as a small helper to avoid marking it twice.  The real marking comes
            /// by modifying the Tail for the segment, increasing it by this
            /// <see cref="FreezeOffset"/>.  This effectively knocks it off the
            /// sequence expected by future enqueuers, such that any additional enqueuer
            /// will be unable to enqueue due to it not lining up with the expected
            /// sequence numbers.  This value is chosen specially so that Tail will grow
            /// to a value that maps to the same slot but that won't be confused with
            /// any other enqueue/dequeue sequence number.
            /// </remarks>
            internal void EnsureFrozenForEnqueues() // must only be called while queue's segment lock is held
            {
                if (!_frozenForEnqueues) // flag used to ensure we don't increase the Tail more than once if frozen more than once
                {
                    _frozenForEnqueues = true;

                    // Atomically increase the tail by FreezeOffset.  Interlocked.Add performs the
                    // read-modify-write as a single atomic operation, so no compare-exchange
                    // retry loop is needed even while enqueuers are concurrently advancing Tail.
                    Interlocked.Add(ref _headAndTail.Tail, FreezeOffset);
                }
            }

            /// <summary>Tries to dequeue an element from the queue.</summary>
            /// <param name="item">Receives the dequeued element, or default(T) if the segment was empty.</param>
            /// <returns>true if an element was dequeued; false if the segment was observed to be empty.</returns>
            public bool TryDequeue(out T item)
            {
                // Loop in case of contention...
                var spinner = new SpinWait();
                while (true)
                {
                    // Get the head at which to try to dequeue.
                    int currentHead = Volatile.Read(ref _headAndTail.Head);
                    int slotsIndex = currentHead & _slotsMask;

                    // Read the sequence number for the head position.
                    int sequenceNumber = Volatile.Read(ref _slots[slotsIndex].SequenceNumber);

                    // We can dequeue from this slot if it's been filled by an enqueuer, which
                    // would have left the sequence number at pos+1.
                    int diff = sequenceNumber - (currentHead + 1);
                    if (diff == 0)
                    {
                        // We may be racing with other dequeuers.  Try to reserve the slot by incrementing
                        // the head.  Once we've done that, no one else will be able to read from this slot,
                        // and no enqueuer will be able to write to this slot until we've written the new
                        // sequence number. WARNING: The next few lines are not reliable on a runtime that
                        // supports thread aborts. If a thread abort were to sneak in after the CompareExchange
                        // but before the Volatile.Write, the slot's sequence number would never be advanced
                        // and the slot could never be reused.  If this implementation is ever used on such a
                        // platform, this if block should be wrapped in a finally / prepared region.
                        if (Interlocked.CompareExchange(ref _headAndTail.Head, currentHead + 1, currentHead) == currentHead)
                        {
                            // Successfully reserved the slot; we are now the only reader of it.
                            item = _slots[slotsIndex].Item;
                            if (!Volatile.Read(ref _preservedForObservation))
                            {
                                // Not preserving: release the reference and publish the slot as free for
                                // the enqueuer that will wrap around to it.  (When preserving, both the item
                                // and the sequence number are deliberately left untouched: the data is still
                                // needed for enumerations, peeking, ToArray, etc., and the stale sequence
                                // number makes an enqueuer see the slot as full and move to a new segment.)
                                _slots[slotsIndex].Item = default(T);
                                Volatile.Write(ref _slots[slotsIndex].SequenceNumber, currentHead + _slots.Length);
                            }
                            return true;
                        }
                    }
                    else if (diff < 0)
                    {
                        // The sequence number was less than what we needed, which means this slot doesn't
                        // yet contain a value we can dequeue, i.e. the segment is empty.  Technically it's
                        // possible that multiple enqueuers could have written concurrently, with those
                        // getting later slots actually finishing first, so there could be elements after
                        // this one that are available, but we need to dequeue in order.  So before declaring
                        // failure and that the segment is empty, we check the tail to see if we're actually
                        // empty or if we're just waiting for items in flight or after this one to become available.
                        bool frozen = _frozenForEnqueues;
                        int currentTail = Volatile.Read(ref _headAndTail.Tail);
                        if (currentTail - currentHead <= 0 || (frozen && (currentTail - FreezeOffset - currentHead <= 0)))
                        {
                            item = default(T);
                            return false;
                        }

                        // It's possible it could have become frozen after we checked _frozenForEnqueues
                        // and before reading the tail.  That's ok: in that rare race condition, we just
                        // loop around again.
                    }

                    // Lost a race. Spin a bit, then try again.
                    spinner.SpinOnce();
                }
            }

            /// <summary>Tries to peek at an element from the queue, without removing it.</summary>
            /// <param name="result">Receives the element at the head, or default(T) if none was found or the value was not requested.</param>
            /// <param name="resultUsed">true if the element's value is needed; false if only the true/false outcome matters.</param>
            /// <returns>true if an element was available at the head; otherwise, false.</returns>
            public bool TryPeek(out T result, bool resultUsed)
            {
                if (resultUsed)
                {
                    // In order to ensure we don't get a torn read on the value, we mark the segment
                    // as preserving for observation.  Additional items can still be enqueued to this
                    // segment, but no space will be freed during dequeues, such that the segment will
                    // no longer be reusable.
                    _preservedForObservation = true;
                    Interlocked.MemoryBarrier();
                }

                // Loop in case of contention...
                var spinner = new SpinWait();
                while (true)
                {
                    // Get the head at which to try to peek.
                    int currentHead = Volatile.Read(ref _headAndTail.Head);
                    int slotsIndex = currentHead & _slotsMask;

                    // Read the sequence number for the head position.
                    int sequenceNumber = Volatile.Read(ref _slots[slotsIndex].SequenceNumber);

                    // We can peek from this slot if it's been filled by an enqueuer, which
                    // would have left the sequence number at pos+1.
                    int diff = sequenceNumber - (currentHead + 1);
                    if (diff == 0)
                    {
                        result = resultUsed ? _slots[slotsIndex].Item : default(T);
                        return true;
                    }
                    else if (diff < 0)
                    {
                        // The sequence number was less than what we needed, which means this slot doesn't
                        // yet contain a value we can peek, i.e. the segment is empty.  Technically it's
                        // possible that multiple enqueuers could have written concurrently, with those
                        // getting later slots actually finishing first, so there could be elements after
                        // this one that are available, but we need to peek in order.  So before declaring
                        // failure and that the segment is empty, we check the tail to see if we're actually
                        // empty or if we're just waiting for items in flight or after this one to become available.
                        bool frozen = _frozenForEnqueues;
                        int currentTail = Volatile.Read(ref _headAndTail.Tail);
                        if (currentTail - currentHead <= 0 || (frozen && (currentTail - FreezeOffset - currentHead <= 0)))
                        {
                            result = default(T);
                            return false;
                        }

                        // It's possible it could have become frozen after we checked _frozenForEnqueues
                        // and before reading the tail.  That's ok: in that rare race condition, we just
                        // loop around again.
                    }

                    // Lost a race. Spin a bit, then try again.
                    spinner.SpinOnce();
                }
            }

            /// <summary>
            /// Attempts to enqueue the item.  If successful, the item will be stored
            /// in the queue and true will be returned; otherwise, the item won't be stored, and false
            /// will be returned.
            /// </summary>
            /// <param name="item">The item to store in the segment.</param>
            /// <returns>true if the item was stored; false if the slot at the tail was not available.</returns>
            public bool TryEnqueue(T item)
            {
                // Loop in case of contention...
                var spinner = new SpinWait();
                while (true)
                {
                    // Get the tail at which to try to enqueue.
                    int currentTail = Volatile.Read(ref _headAndTail.Tail);
                    int slotsIndex = currentTail & _slotsMask;

                    // Read the sequence number for the tail position.
                    int sequenceNumber = Volatile.Read(ref _slots[slotsIndex].SequenceNumber);

                    // The slot is empty and ready for us to enqueue into it if its sequence
                    // number matches the slot.
                    int diff = sequenceNumber - currentTail;
                    if (diff == 0)
                    {
                        // We may be racing with other enqueuers.  Try to reserve the slot by incrementing
                        // the tail.  Once we've done that, no one else will be able to write to this slot,
                        // and no dequeuer will be able to read from this slot until we've written the new
                        // sequence number. WARNING: The next few lines are not reliable on a runtime that
                        // supports thread aborts. If a thread abort were to sneak in after the CompareExchange
                        // but before the Volatile.Write, other threads will spin trying to access this slot.
                        // If this implementation is ever used on such a platform, this if block should be
                        // wrapped in a finally / prepared region.
                        if (Interlocked.CompareExchange(ref _headAndTail.Tail, currentTail + 1, currentTail) == currentTail)
                        {
                            // Successfully reserved the slot.  Dequeuers and peekers that reach this position
                            // keep spinning until the Write below publishes the new sequence number.
                            _slots[slotsIndex].Item = item;
                            Volatile.Write(ref _slots[slotsIndex].SequenceNumber, currentTail + 1);
                            return true;
                        }
                    }
                    else if (diff < 0)
                    {
                        // The sequence number was less than what we needed, which means this slot still
                        // contains a value, i.e. the segment is full.  Technically it's possible that multiple
                        // dequeuers could have read concurrently, with those getting later slots actually
                        // finishing first, so there could be spaces after this one that are available, but
                        // we need to enqueue in order.
                        return false;
                    }

                    // Lost a race. Spin a bit, then try again.
                    spinner.SpinOnce();
                }
            }

            /// <summary>Represents a slot in the queue.</summary>
            [StructLayout(LayoutKind.Auto)]
            [DebuggerDisplay("Item = {Item}, SequenceNumber = {SequenceNumber}")]
            internal struct Slot
            {
                /// <summary>The item.</summary>
                public T Item;
                /// <summary>The sequence number for this slot, used to synchronize between enqueuers and dequeuers.</summary>
                public int SequenceNumber;
            }
        }
    }

    /// <summary>Padded head and tail indices, to avoid false sharing between producers and consumers.</summary>
    [DebuggerDisplay("Head = {Head}, Tail = {Tail}")]
    [StructLayout(LayoutKind.Explicit, Size = 384)] // padding before/between/after fields based on worst case cache line size of 128
    internal struct PaddedHeadAndTail
    {
        [FieldOffset(128)] public int Head;
        [FieldOffset(256)] public int Tail;
    }
}