1 // Licensed to the .NET Foundation under one or more agreements.
2 // The .NET Foundation licenses this file to you under the MIT license.
3 // See the LICENSE file in the project root for more information.
4 
5 using System.Collections.Generic;
6 using System.Diagnostics;
7 using System.Diagnostics.Private;
8 using System.Runtime.InteropServices;
9 using System.Threading;
10 
11 namespace System.Collections.Concurrent
12 {
13     /// <summary>
14     /// Represents a thread-safe first-in, first-out collection of objects.
15     /// </summary>
16     /// <typeparam name="T">Specifies the type of elements in the queue.</typeparam>
17     /// <remarks>
18     /// All public and protected members of <see cref="ConcurrentQueue{T}"/> are thread-safe and may be used
19     /// concurrently from multiple threads.
20     /// </remarks>
21     [DebuggerDisplay("Count = {Count}")]
22     [DebuggerTypeProxy(typeof(IProducerConsumerCollectionDebugView<>))]
23     [Serializable]
24     public class ConcurrentQueue<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>
25     {
26         // This implementation provides an unbounded, multi-producer multi-consumer queue
27         // that supports the standard Enqueue/TryDequeue operations, as well as support for
28         // snapshot enumeration (GetEnumerator, ToArray, CopyTo), peeking, and Count/IsEmpty.
29         // It is composed of a linked list of bounded ring buffers, each of which has a head
30         // and a tail index, isolated from each other to minimize false sharing.  As long as
31         // the number of elements in the queue remains less than the size of the current
32         // buffer (Segment), no additional allocations are required for enqueued items.  When
33         // the number of items exceeds the size of the current segment, the current segment is
34         // "frozen" to prevent further enqueues, and a new segment is linked from it and set
35         // as the new tail segment for subsequent enqueues.  As old segments are consumed by
36         // dequeues, the head reference is updated to point to the segment that dequeuers should
37         // try next.  To support snapshot enumeration, segments also support the notion of
38         // preserving for observation, whereby they avoid overwriting state as part of dequeues.
39         // Any operation that requires a snapshot results in all current segments being
40         // both frozen for enqueues and preserved for observation: any new enqueues will go
41         // to new segments, and dequeuers will consume from the existing segments but without
42         // overwriting the existing data.
43 
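        // Illustrative sketch (comments only, not part of the implementation): a typical
        // multi-producer/multi-consumer use of the public surface described above might be:
        //
        //     var queue = new ConcurrentQueue<int>();
        //     queue.Enqueue(1);                      // producers append at the tail segment
        //     int item;
        //     while (queue.TryDequeue(out item)) { } // consumers drain from the head segment
        //     int[] snapshot = queue.ToArray();      // snapshot ops freeze/preserve the segments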
44         /// <summary>Initial length of the segments used in the queue.</summary>
45         private const int InitialSegmentLength = 32;
46         /// <summary>
47         /// Maximum length of the segments used in the queue.  This is a somewhat arbitrary limit:
48         /// larger means that as long as we don't exceed the size, we avoid allocating more segments,
49         /// but if we do exceed it, then the segment becomes garbage.
50         /// </summary>
51         private const int MaxSegmentLength = 1024 * 1024;
52 
53         /// <summary>
54         /// Lock used to protect cross-segment operations, including any updates to <see cref="_tail"/> or <see cref="_head"/>
55         /// and any operations that need to get a consistent view of them.
56         /// </summary>
57         private object _crossSegmentLock;
58         /// <summary>The current tail segment.</summary>
59         private volatile Segment _tail;
60         /// <summary>The current head segment.</summary>
61         private volatile Segment _head;
62 
63         /// <summary>
64         /// Initializes a new instance of the <see cref="ConcurrentQueue{T}"/> class.
65         /// </summary>
66         public ConcurrentQueue()
67         {
68             _crossSegmentLock = new object();
69             _tail = _head = new Segment(InitialSegmentLength);
70         }
71 
72         /// <summary>
73         /// Initializes the contents of the queue from an existing collection.
74         /// </summary>
75         /// <param name="collection">A collection from which to copy elements.</param>
76         private void InitializeFromCollection(IEnumerable<T> collection)
77         {
78             _crossSegmentLock = new object();
79 
80             // Determine the initial segment size.  We'll use the default,
81             // unless the collection is known to be larger than that, in which
82             // case we round its length up to a power of 2, as all segments must
83             // be a power of 2 in length.
84             int length = InitialSegmentLength;
85             var c = collection as ICollection<T>;
86             if (c != null)
87             {
88                 int count = c.Count;
89                 if (count > length)
90                 {
91                     length = Math.Min(RoundUpToPowerOf2(count), MaxSegmentLength);
92                 }
93             }
94 
95             // Initialize the segment and add all of the data to it.
96             _tail = _head = new Segment(length);
97             foreach (T item in collection)
98             {
99                 Enqueue(item);
100             }
101         }
102 
103         /// <summary>
104         /// Initializes a new instance of the <see cref="ConcurrentQueue{T}"/> class that contains elements copied
105         /// from the specified collection.
106         /// </summary>
107         /// <param name="collection">
108         /// The collection whose elements are copied to the new <see cref="ConcurrentQueue{T}"/>.
109         /// </param>
110         /// <exception cref="System.ArgumentNullException">The <paramref name="collection"/> argument is null.</exception>
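        /// <example>
        /// A minimal sketch of seeding a queue from an existing collection:
        /// <code>
        /// var queue = new ConcurrentQueue&lt;int&gt;(new[] { 1, 2, 3 });
        /// int first;
        /// queue.TryDequeue(out first); // first == 1; the source's order is preserved
        /// </code>
        /// </example>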
111         public ConcurrentQueue(IEnumerable<T> collection)
112         {
113             if (collection == null)
114             {
115                 throw new ArgumentNullException(nameof(collection));
116             }
117 
118             InitializeFromCollection(collection);
119         }
120 
121         /// <summary>
122         /// Copies the elements of the <see cref="ICollection"/> to an <see
123         /// cref="Array"/>, starting at a particular <see cref="Array"/> index.
124         /// </summary>
125         /// <param name="array">
126         /// The one-dimensional <see cref="Array">Array</see> that is the destination of the
127         /// elements copied from the <see cref="ConcurrentQueue{T}"/>. <paramref name="array"/> must have
128         /// zero-based indexing.
129         /// </param>
130         /// <param name="index">The zero-based index in <paramref name="array"/> at which copying begins.</param>
131         /// <exception cref="ArgumentNullException"><paramref name="array"/> is a null reference (Nothing in
132         /// Visual Basic).</exception>
133         /// <exception cref="ArgumentOutOfRangeException"><paramref name="index"/> is less than
134         /// zero.</exception>
135         /// <exception cref="ArgumentException">
136         /// <paramref name="array"/> is multidimensional. -or-
137         /// <paramref name="array"/> does not have zero-based indexing. -or-
138         /// <paramref name="index"/> is equal to or greater than the length of the <paramref name="array"/>
139         /// -or- The number of elements in the source <see cref="ICollection"/> is
140         /// greater than the available space from <paramref name="index"/> to the end of the destination
141         /// <paramref name="array"/>. -or- The type of the source <see
142         /// cref="ICollection"/> cannot be cast automatically to the type of the
143         /// destination <paramref name="array"/>.
144         /// </exception>
145         void ICollection.CopyTo(Array array, int index)
146         {
147             // Special-case when the Array is actually a T[], taking a faster path
148             T[] szArray = array as T[];
149             if (szArray != null)
150             {
151                 CopyTo(szArray, index);
152                 return;
153             }
154 
155             // Validate arguments.
156             if (array == null)
157             {
158                 throw new ArgumentNullException(nameof(array));
159             }
160 
161             // Otherwise, fall back to the slower path that first copies the contents
162             // to an array, and then uses that array's non-generic CopyTo to do the copy.
163             ToArray().CopyTo(array, index);
164         }
165 
166         /// <summary>
167         /// Gets a value indicating whether access to the <see cref="ICollection"/> is
168         /// synchronized with the SyncRoot.
169         /// </summary>
170         /// <value>true if access to the <see cref="ICollection"/> is synchronized
171         /// with the SyncRoot; otherwise, false. For <see cref="ConcurrentQueue{T}"/>, this property always
172         /// returns false.</value>
173         bool ICollection.IsSynchronized => false; // always false, as true implies synchronization via SyncRoot
174 
175         /// <summary>
176         /// Gets an object that can be used to synchronize access to the <see
177         /// cref="ICollection"/>. This property is not supported.
178         /// </summary>
179         /// <exception cref="NotSupportedException">The SyncRoot property is not supported.</exception>
180         object ICollection.SyncRoot { get { throw new NotSupportedException(SR.ConcurrentCollection_SyncRoot_NotSupported); } }
181 
182         /// <summary>Returns an enumerator that iterates through a collection.</summary>
183         /// <returns>An <see cref="IEnumerator"/> that can be used to iterate through the collection.</returns>
184         IEnumerator IEnumerable.GetEnumerator() => ((IEnumerable<T>)this).GetEnumerator();
185 
186         /// <summary>
187         /// Attempts to add an object to the <see cref="Concurrent.IProducerConsumerCollection{T}"/>.
188         /// </summary>
189         /// <param name="item">The object to add to the <see
190         /// cref="Concurrent.IProducerConsumerCollection{T}"/>. The value can be a null
191         /// reference (Nothing in Visual Basic) for reference types.
192         /// </param>
193         /// <returns>true if the object was added successfully; otherwise, false.</returns>
194         /// <remarks>For <see cref="ConcurrentQueue{T}"/>, this operation will always add the object to the
195         /// end of the <see cref="ConcurrentQueue{T}"/>
196         /// and return true.</remarks>
197         bool IProducerConsumerCollection<T>.TryAdd(T item)
198         {
199             Enqueue(item);
200             return true;
201         }
202 
203         /// <summary>
204         /// Attempts to remove and return an object from the <see cref="Concurrent.IProducerConsumerCollection{T}"/>.
205         /// </summary>
206         /// <param name="item">
207         /// When this method returns, if the operation was successful, <paramref name="item"/> contains the
208         /// object removed. If no object was available to be removed, the value is unspecified.
209         /// </param>
210         /// <returns>true if an element was removed and returned successfully; otherwise, false.</returns>
211         /// <remarks>For <see cref="ConcurrentQueue{T}"/>, this operation will attempt to remove the object
212         /// from the beginning of the <see cref="ConcurrentQueue{T}"/>.
213         /// </remarks>
214         bool IProducerConsumerCollection<T>.TryTake(out T item) => TryDequeue(out item);
215 
216         /// <summary>
217         /// Gets a value that indicates whether the <see cref="ConcurrentQueue{T}"/> is empty.
218         /// </summary>
219         /// <value>true if the <see cref="ConcurrentQueue{T}"/> is empty; otherwise, false.</value>
220         /// <remarks>
221         /// For determining whether the collection contains any items, use of this property is recommended
222         /// rather than retrieving the number of items from the <see cref="Count"/> property and comparing it
223         /// to 0.  However, as this collection is intended to be accessed concurrently, it may be the case
224         /// that another thread will modify the collection after <see cref="IsEmpty"/> returns, thus invalidating
225         /// the result.
226         /// </remarks>
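        /// <example>
        /// A minimal sketch of the recommended emptiness check:
        /// <code>
        /// var queue = new ConcurrentQueue&lt;int&gt;();
        /// // Prefer IsEmpty over comparing Count to 0; IsEmpty only needs to peek at the head.
        /// if (queue.IsEmpty)
        /// {
        ///     // The queue had no items at the moment of the check; another thread may
        ///     // have enqueued one by the time this point is reached.
        /// }
        /// </code>
        /// </example>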
227         public bool IsEmpty
228         {
229             get
230             {
231                 // IsEmpty == !TryPeek. We use a "resultUsed:false" peek in order to avoid marking
232                 // segments as preserved for observation, making IsEmpty a cheaper way than either
233                 // TryPeek(out T) or Count == 0 to check whether any elements are in the queue.
234                 T ignoredResult;
235                 return !TryPeek(out ignoredResult, resultUsed: false);
236             }
237         }
238 
239         /// <summary>Copies the elements stored in the <see cref="ConcurrentQueue{T}"/> to a new array.</summary>
240         /// <returns>A new array containing a snapshot of elements copied from the <see cref="ConcurrentQueue{T}"/>.</returns>
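        /// <example>
        /// An illustrative sketch; the returned array is a point-in-time snapshot and is not
        /// affected by later changes to the queue:
        /// <code>
        /// var queue = new ConcurrentQueue&lt;int&gt;();
        /// queue.Enqueue(1);
        /// queue.Enqueue(2);
        /// int[] snapshot = queue.ToArray(); // { 1, 2 }
        /// queue.Enqueue(3);                 // snapshot still has two elements
        /// </code>
        /// </example>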
241         public T[] ToArray()
242         {
243             // Snap the current contents for enumeration.
244             Segment head, tail;
245             int headHead, tailTail;
246             SnapForObservation(out head, out headHead, out tail, out tailTail);
247 
248             // Count the number of items in that snapped set, and use it to allocate an
249             // array of the right size.
250             long count = GetCount(head, headHead, tail, tailTail);
251             T[] arr = new T[count];
252 
253             // Now enumerate the contents, copying each element into the array.
254             using (IEnumerator<T> e = Enumerate(head, headHead, tail, tailTail))
255             {
256                 int i = 0;
257                 while (e.MoveNext())
258                 {
259                     arr[i++] = e.Current;
260                 }
261                 Debug.Assert(count == i);
262             }
263 
264             // And return it.
265             return arr;
266         }
267 
268         /// <summary>
269         /// Gets the number of elements contained in the <see cref="ConcurrentQueue{T}"/>.
270         /// </summary>
271         /// <value>The number of elements contained in the <see cref="ConcurrentQueue{T}"/>.</value>
272         /// <remarks>
273         /// For determining whether the collection contains any items, use of the <see cref="IsEmpty"/>
274         /// property is recommended rather than retrieving the number of items from the <see cref="Count"/>
275         /// property and comparing it to 0.
276         /// </remarks>
277         public int Count
278         {
279             get
280             {
281                 Segment head, tail;
282                 int headHead, headTail, tailHead, tailTail;
283                 var spinner = new SpinWait();
284                 while (true)
285                 {
286                     // Capture the head and tail, as well as the head's head and tail.
287                     head = _head;
288                     tail = _tail;
289                     headHead = Volatile.Read(ref head._headAndTail.Head);
290                     headTail = Volatile.Read(ref head._headAndTail.Tail);
291 
292                     if (head == tail)
293                     {
294                         // There was a single segment in the queue.  If the captured
295                         // values still (or again) represent reality, return the segment's
296                         // count. A single segment should be the most common case once the
297                         // queue's size has stabilized after segments have grown to
298                         // the point where growing is no longer needed.
299                         if (head == _head &&
300                             head == _tail &&
301                             headHead == Volatile.Read(ref head._headAndTail.Head) &&
302                             headTail == Volatile.Read(ref head._headAndTail.Tail))
303                         {
304                             return GetCount(head, headHead, headTail);
305                         }
306                     }
307                     else if (head._nextSegment == tail)
308                     {
309                         // There were two segments in the queue.  Get the positions
310                         // from the tail, and if the captured values still (or again) match
311                         // reality, return the sum of the counts from both segments.
312                         tailHead = Volatile.Read(ref tail._headAndTail.Head);
313                         tailTail = Volatile.Read(ref tail._headAndTail.Tail);
314                         if (head == _head &&
315                             tail == _tail &&
316                             headHead == Volatile.Read(ref head._headAndTail.Head) &&
317                             headTail == Volatile.Read(ref head._headAndTail.Tail) &&
318                             tailHead == Volatile.Read(ref tail._headAndTail.Head) &&
319                             tailTail == Volatile.Read(ref tail._headAndTail.Tail))
320                         {
321                             // We got stable values, so we can just compute the sizes based on those
322                             // values and return the sum of the counts of the segments.
323                             return GetCount(head, headHead, headTail) + GetCount(tail, tailHead, tailTail);
324                         }
325                     }
326                     else
327                     {
328                         // There were more than two segments.  Take the slower path, where we freeze the
329                         // queue and then count the now stable segments.
330                         SnapForObservation(out head, out headHead, out tail, out tailTail);
331                         return unchecked((int)GetCount(head, headHead, tail, tailTail));
332                     }
333 
334                     // We raced with enqueues/dequeues and captured an inconsistent picture of the queue.
335                     // Spin and try again.
336                     spinner.SpinOnce();
337                 }
338             }
339         }
340 
341         /// <summary>Computes the number of items in a segment based on a fixed head and tail in that segment.</summary>
342         private static int GetCount(Segment s, int head, int tail)
343         {
344             if (head != tail && head != tail - s.FreezeOffset)
345             {
346                 head &= s._slotsMask;
347                 tail &= s._slotsMask;
348                 return head < tail ? tail - head : s._slots.Length - head + tail;
349             }
350             return 0;
351         }
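        // Worked example of the arithmetic above: for a segment with _slots.Length == 8
        // (so _slotsMask == 7), head == 6 and tail == 10 mask to 6 and 2 respectively;
        // since the masked head is not less than the masked tail, the region wrapped and the
        // count is 8 - 6 + 2 == 4 (slots 6, 7, 0 and 1).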
352 
353         /// <summary>Gets the number of items in snapped region.</summary>
354         private static long GetCount(Segment head, int headHead, Segment tail, int tailTail)
355         {
356             // All of the segments should have been both frozen for enqueues and preserved for observation.
357             // Validate that here for head and tail; we'll validate it for intermediate segments later.
358             Debug.Assert(head._preservedForObservation);
359             Debug.Assert(head._frozenForEnqueues);
360             Debug.Assert(tail._preservedForObservation);
361             Debug.Assert(tail._frozenForEnqueues);
362 
363             long count = 0;
364 
365             // Head segment.  We've already marked it as frozen for enqueues, so its tail position is fixed,
366             // and we've already marked it as preserved for observation (before we grabbed the head), so we
367             // can safely enumerate from its head to its tail and access its elements.
368             int headTail = (head == tail ? tailTail : Volatile.Read(ref head._headAndTail.Tail)) - head.FreezeOffset;
369             if (headHead < headTail)
370             {
371                 // Mask the head and tail for the head segment
372                 headHead &= head._slotsMask;
373                 headTail &= head._slotsMask;
374 
375                 // Increase the count by the size of the one or two contiguous regions, depending on
376                 // whether the masked tail has wrapped around to be less than the masked head.
377                 count += headHead < headTail ?
378                     headTail - headHead :
379                     head._slots.Length - headHead + headTail;
380             }
381 
382             // We've enumerated the head.  If the tail is different from the head, we need to
383             // enumerate the remaining segments.
384             if (head != tail)
385             {
386                 // Count the contents of each segment between head and tail, not including head and tail.
387                 // Since there were segments before these, for our purposes we consider them to start at
388                 // the 0th element, and since there is at least one segment after each, each was frozen
389                 // by the time we snapped it, so we can iterate until each's frozen tail.
390                 for (Segment s = head._nextSegment; s != tail; s = s._nextSegment)
391                 {
392                     Debug.Assert(s._preservedForObservation);
393                     Debug.Assert(s._frozenForEnqueues);
394                     count += s._headAndTail.Tail - s.FreezeOffset;
395                 }
396 
397                 // Finally, enumerate the tail.  As with the intermediate segments, there were segments
398                 // before this in the snapped region, so we can start counting from the beginning. Unlike
399                 // the intermediate segments, we can't just go until the Tail, as that could still be changing;
400                 // instead we need to go until the tail we snapped for observation.
401                 count += tailTail - tail.FreezeOffset;
402             }
403 
404             // Return the computed count.
405             return count;
406         }
407 
408         /// <summary>
409         /// Copies the <see cref="ConcurrentQueue{T}"/> elements to an existing one-dimensional <see
410         /// cref="Array">Array</see>, starting at the specified array index.
411         /// </summary>
412         /// <param name="array">The one-dimensional <see cref="Array">Array</see> that is the
413         /// destination of the elements copied from the
414         /// <see cref="ConcurrentQueue{T}"/>. The <see cref="Array">Array</see> must have zero-based
415         /// indexing.</param>
416         /// <param name="index">The zero-based index in <paramref name="array"/> at which copying
417         /// begins.</param>
418         /// <exception cref="ArgumentNullException"><paramref name="array"/> is a null reference (Nothing in
419         /// Visual Basic).</exception>
420         /// <exception cref="ArgumentOutOfRangeException"><paramref name="index"/> is less than
421         /// zero.</exception>
422         /// <exception cref="ArgumentException"><paramref name="index"/> is equal to or greater than the
423         /// length of the <paramref name="array"/>
424         /// -or- The number of elements in the source <see cref="ConcurrentQueue{T}"/> is greater than the
425         /// available space from <paramref name="index"/> to the end of the destination <paramref
426         /// name="array"/>.
427         /// </exception>
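        /// <example>
        /// A minimal sketch of copying a snapshot into a caller-provided array:
        /// <code>
        /// var queue = new ConcurrentQueue&lt;string&gt;();
        /// queue.Enqueue("a");
        /// queue.Enqueue("b");
        /// string[] buffer = new string[8];
        /// queue.CopyTo(buffer, 2); // buffer[2] == "a", buffer[3] == "b"
        /// </code>
        /// </example>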
428         public void CopyTo(T[] array, int index)
429         {
430             if (array == null)
431             {
432                 throw new ArgumentNullException(nameof(array));
433             }
434             if (index < 0)
435             {
436                 throw new ArgumentOutOfRangeException(nameof(index), SR.Collection_CopyTo_ArgumentOutOfRangeException);
437             }
438 
439             // Snap for enumeration
440             Segment head, tail;
441             int headHead, tailTail;
442             SnapForObservation(out head, out headHead, out tail, out tailTail);
443 
444             // Get the number of items to be enumerated
445             long count = GetCount(head, headHead, tail, tailTail);
446             if (index > array.Length - count)
447             {
448                 throw new ArgumentException(SR.Collection_CopyTo_TooManyElems);
449             }
450 
451             // Copy the items to the target array
452             int i = index;
453             using (IEnumerator<T> e = Enumerate(head, headHead, tail, tailTail))
454             {
455                 while (e.MoveNext())
456                 {
457                     array[i++] = e.Current;
458                 }
459             }
460             Debug.Assert(count == i - index);
461         }
462 
463         /// <summary>Returns an enumerator that iterates through the <see cref="ConcurrentQueue{T}"/>.</summary>
464         /// <returns>An enumerator for the contents of the <see
465         /// cref="ConcurrentQueue{T}"/>.</returns>
466         /// <remarks>
467         /// The enumeration represents a moment-in-time snapshot of the contents
468         /// of the queue.  It does not reflect any updates to the collection after
469         /// <see cref="GetEnumerator"/> was called.  The enumerator is safe to use
470         /// concurrently with reads from and writes to the queue.
471         /// </remarks>
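        /// <example>
        /// An illustrative sketch: enumeration walks a snapshot, so items enqueued during the
        /// loop are not yielded by it.
        /// <code>
        /// var queue = new ConcurrentQueue&lt;int&gt;();
        /// queue.Enqueue(1);
        /// foreach (int item in queue)
        /// {
        ///     queue.Enqueue(item + 1); // safe, but not observed by this enumeration
        /// }
        /// </code>
        /// </example>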
472         public IEnumerator<T> GetEnumerator()
473         {
474             Segment head, tail;
475             int headHead, tailTail;
476             SnapForObservation(out head, out headHead, out tail, out tailTail);
477             return Enumerate(head, headHead, tail, tailTail);
478         }
479 
480         /// <summary>
481         /// Gets the head and tail information of the current contents of the queue.
482         /// After this call returns, the specified region can be enumerated any number
483         /// of times and will not change.
484         /// </summary>
485         private void SnapForObservation(out Segment head, out int headHead, out Segment tail, out int tailTail)
486         {
487             lock (_crossSegmentLock) // _head and _tail may only change while the lock is held.
488             {
489                 // Snap the head and tail
490                 head = _head;
491                 tail = _tail;
492                 Debug.Assert(head != null);
493                 Debug.Assert(tail != null);
494                 Debug.Assert(tail._nextSegment == null);
495 
496                 // Mark them and all segments in between as preserving, and ensure no additional items
497                 // can be added to the tail.
498                 for (Segment s = head; ; s = s._nextSegment)
499                 {
500                     s._preservedForObservation = true;
501                     if (s == tail) break;
502                     Debug.Assert(s._frozenForEnqueues); // any non-tail should already be marked
503                 }
504                 tail.EnsureFrozenForEnqueues(); // we want to prevent the tailTail from moving
505 
506                 // At this point, any dequeues from any segment won't overwrite the value, and
507                 // none of the existing segments can have new items enqueued.
508 
509                 headHead = Volatile.Read(ref head._headAndTail.Head);
510                 tailTail = Volatile.Read(ref tail._headAndTail.Tail);
511             }
512         }
513 
514         /// <summary>Gets the item stored in the <paramref name="i"/>th entry in <paramref name="segment"/>.</summary>
515         private T GetItemWhenAvailable(Segment segment, int i)
516         {
517             Debug.Assert(segment._preservedForObservation);
518 
519             // Get the expected value for the sequence number
520             int expectedSequenceNumberAndMask = (i + 1) & segment._slotsMask;
521 
522             // If the expected sequence number is not yet written, we're still waiting for
523             // an enqueuer to finish storing it.  Spin until it's there.
524             if ((segment._slots[i].SequenceNumber & segment._slotsMask) != expectedSequenceNumberAndMask)
525             {
526                 var spinner = new SpinWait();
527                 while ((Volatile.Read(ref segment._slots[i].SequenceNumber) & segment._slotsMask) != expectedSequenceNumberAndMask)
528                 {
529                     spinner.SpinOnce();
530                 }
531             }
532 
533             // Return the value from the slot.
534             return segment._slots[i].Item;
535         }
536 
537         private IEnumerator<T> Enumerate(Segment head, int headHead, Segment tail, int tailTail)
538         {
539             Debug.Assert(head._preservedForObservation);
540             Debug.Assert(head._frozenForEnqueues);
541             Debug.Assert(tail._preservedForObservation);
542             Debug.Assert(tail._frozenForEnqueues);
543 
544             // Head segment.  We've already marked it as not accepting any more enqueues,
545             // so its tail position is fixed, and we've already marked it as preserved for
546             // enumeration (before we grabbed its head), so we can safely enumerate from
547             // its head to its tail.
548             int headTail = (head == tail ? tailTail : Volatile.Read(ref head._headAndTail.Tail)) - head.FreezeOffset;
549             if (headHead < headTail)
550             {
551                 headHead &= head._slotsMask;
552                 headTail &= head._slotsMask;
553 
554                 if (headHead < headTail)
555                 {
556                     for (int i = headHead; i < headTail; i++) yield return GetItemWhenAvailable(head, i);
557                 }
558                 else
559                 {
560                     for (int i = headHead; i < head._slots.Length; i++) yield return GetItemWhenAvailable(head, i);
561                     for (int i = 0; i < headTail; i++) yield return GetItemWhenAvailable(head, i);
562                 }
563             }
564 
565             // We've enumerated the head.  If the tail is the same, we're done.
566             if (head != tail)
567             {
568                 // Each segment between head and tail, not including head and tail.  Since there were
569                 // segments before these, for our purposes we consider it to start at the 0th element.
570                 for (Segment s = head._nextSegment; s != tail; s = s._nextSegment)
571                 {
572                     Debug.Assert(s._preservedForObservation, "Intermediate segments should have been preserved as part of the enumeration snapshot");
573                     Debug.Assert(s._frozenForEnqueues, "Intermediate segments should already have been frozen for enqueues");
574 
575                     int sTail = s._headAndTail.Tail - s.FreezeOffset;
576                     for (int i = 0; i < sTail; i++)
577                     {
578                         yield return GetItemWhenAvailable(s, i);
579                     }
580                 }
581 
582                 // Enumerate the tail.  Since there were segments before this, we can just start at
583                 // its beginning, and iterate until the tail we already grabbed.
584                 tailTail -= tail.FreezeOffset;
585                 for (int i = 0; i < tailTail; i++)
586                 {
587                     yield return GetItemWhenAvailable(tail, i);
588                 }
589             }
590         }
591 
592         /// <summary>Round the specified value up to the next power of 2, if it isn't one already.</summary>
593         private static int RoundUpToPowerOf2(int i)
594         {
595             --i;
596             i |= i >> 1;
597             i |= i >> 2;
598             i |= i >> 4;
599             i |= i >> 8;
600             i |= i >> 16;
601             return i + 1;
602         }
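        // Worked example: RoundUpToPowerOf2(33) smears the bits of 32 (100000b) into 111111b == 63
        // and returns 64, while RoundUpToPowerOf2(64) returns 64 unchanged, since the initial
        // decrement keeps exact powers of 2 from being rounded up to the next one.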
603 
604         /// <summary>Adds an object to the end of the <see cref="ConcurrentQueue{T}"/>.</summary>
605         /// <param name="item">
606         /// The object to add to the end of the <see cref="ConcurrentQueue{T}"/>.
607         /// The value can be a null reference (Nothing in Visual Basic) for reference types.
608         /// </param>
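        /// <example>
        /// A minimal sketch of enqueueing from multiple threads (assumes System.Threading.Tasks
        /// is available for <c>Parallel.For</c>):
        /// <code>
        /// var queue = new ConcurrentQueue&lt;int&gt;();
        /// Parallel.For(0, 100, i => queue.Enqueue(i)); // all 100 items are added
        /// // queue.Count == 100
        /// </code>
        /// </example>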
609         public void Enqueue(T item)
610         {
611             // Try to enqueue to the current tail.
612             if (!_tail.TryEnqueue(item))
613             {
614                 // If we're unable to, we need to take a slow path that will
615                 // try to add a new tail segment.
616                 EnqueueSlow(item);
617             }
618         }
619 
620         /// <summary>Adds to the end of the queue, adding a new segment if necessary.</summary>
621         private void EnqueueSlow(T item)
622         {
623             while (true)
624             {
625                 Segment tail = _tail;
626 
627                 // Try to append to the existing tail.
628                 if (tail.TryEnqueue(item))
629                 {
630                     return;
631                 }
632 
633                 // If we were unsuccessful, take the lock so that we can compare and manipulate
634                 // the tail.  Assuming another enqueuer hasn't already added a new segment,
635                 // do so, then loop around to try enqueueing again.
636                 lock (_crossSegmentLock)
637                 {
638                     if (tail == _tail)
639                     {
640                         // Make sure no one else can enqueue to this segment.
641                         tail.EnsureFrozenForEnqueues();
642 
643                         // We determine the new segment's length based on the old length.
644                         // In general, we double the size of the segment, to make it less likely
645                         // that we'll need to grow again.  However, if the tail segment is marked
646                         // as preserved for observation, something caused us to avoid reusing this
647                         // segment, and if that happens a lot and we grow, we'll end up allocating
648                         // lots of wasted space.  As such, in such situations we reset back to the
649                         // initial segment length; if these observations are happening frequently,
650                         // this will help to avoid wasted memory, and if they're not, we'll
651                         // relatively quickly grow again to a larger size.
652                         int nextSize = tail._preservedForObservation ? InitialSegmentLength : Math.Min(tail.Capacity * 2, MaxSegmentLength);
653                         var newTail = new Segment(nextSize);
654 
655                         // Hook up the new tail.
656                         tail._nextSegment = newTail;
657                         _tail = newTail;
658                     }
659                 }
660             }
661         }
662 
663         /// <summary>
664         /// Attempts to remove and return the object at the beginning of the <see
665         /// cref="ConcurrentQueue{T}"/>.
666         /// </summary>
667         /// <param name="result">
668         /// When this method returns, if the operation was successful, <paramref name="result"/> contains the
669         /// object removed. If no object was available to be removed, the value is unspecified.
670         /// </param>
671         /// <returns>
672         /// true if an element was removed and returned from the beginning of the
673         /// <see cref="ConcurrentQueue{T}"/> successfully; otherwise, false.
674         /// </returns>
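        /// <example>
        /// A minimal sketch of draining the queue:
        /// <code>
        /// var queue = new ConcurrentQueue&lt;int&gt;();
        /// queue.Enqueue(42);
        /// int value;
        /// while (queue.TryDequeue(out value))
        /// {
        ///     // value == 42 on the first iteration; the loop exits once the queue is empty.
        /// }
        /// </code>
        /// </example>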
675         public bool TryDequeue(out T result) =>
676             _head.TryDequeue(out result) || // fast-path that operates just on the head segment
677             TryDequeueSlow(out result); // slow path that needs to fix up segments
678 
679         /// <summary>Tries to dequeue an item, removing empty segments as needed.</summary>
680         private bool TryDequeueSlow(out T item)
681         {
682             while (true)
683             {
684                 // Get the current head
685                 Segment head = _head;
686 
687                 // Try to take.  If we're successful, we're done.
688                 if (head.TryDequeue(out item))
689                 {
690                     return true;
691                 }
692 
693                 // Check to see whether this segment is the last. If it is, we can consider
694                 // this to be a moment-in-time empty condition (even though between the TryDequeue
695                 // check and this check, another item could have arrived).
696                 if (head._nextSegment == null)
697                 {
698                     item = default(T);
699                     return false;
700                 }
701 
702                 // At this point we know that head.Next != null, which means
703                 // this segment has been frozen for additional enqueues. But between
704                 // the time that we ran TryDequeue and checked for a next segment,
705                 // another item could have been added.  Try to dequeue one more time
706                 // to confirm that the segment is indeed empty.
707                 Debug.Assert(head._frozenForEnqueues);
708                 if (head.TryDequeue(out item))
709                 {
710                     return true;
711                 }
712 
713                 // This segment is frozen (nothing more can be added) and empty (nothing is in it).
714                 // Update head to point to the next segment in the list, assuming no one's beat us to it.
715                 lock (_crossSegmentLock)
716                 {
717                     if (head == _head)
718                     {
719                         _head = head._nextSegment;
720                     }
721                 }
722             }
723         }
724 
725         /// <summary>
726         /// Attempts to return an object from the beginning of the <see cref="ConcurrentQueue{T}"/>
727         /// without removing it.
728         /// </summary>
729         /// <param name="result">
730         /// When this method returns, <paramref name="result"/> contains an object from
731         /// the beginning of the <see cref="Concurrent.ConcurrentQueue{T}"/> or default(T)
732         /// if the operation failed.
733         /// </param>
734         /// <returns>true if an object was returned successfully; otherwise, false.</returns>
735         /// <remarks>
736         /// For determining whether the collection contains any items, use of the <see cref="IsEmpty"/>
737         /// property is recommended rather than peeking.
738         /// </remarks>
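        /// <example>
        /// A minimal sketch: peeking does not remove the item, so a later dequeue still returns it.
        /// <code>
        /// var queue = new ConcurrentQueue&lt;int&gt;();
        /// queue.Enqueue(7);
        /// int head;
        /// if (queue.TryPeek(out head))    // head == 7, the item is still in the queue
        /// {
        ///     queue.TryDequeue(out head); // now the item has been removed
        /// }
        /// </code>
        /// </example>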
739         public bool TryPeek(out T result) => TryPeek(out result, resultUsed: true);
740 
741         /// <summary>Attempts to retrieve the value for the first element in the queue.</summary>
742         /// <param name="result">The value of the first element, if found.</param>
743         /// <param name="resultUsed">true if the result is needed; otherwise, false if only the true/false outcome is needed.</param>
744         /// <returns>true if an element was found; otherwise, false.</returns>
745         private bool TryPeek(out T result, bool resultUsed)
746         {
747             // Starting with the head segment, look through all of the segments
748             // for the first one we can find that's not empty.
749             Segment s = _head;
750             while (true)
751             {
752                 // Grab the next segment from this one, before we peek.
753                 // This is to be able to see whether the value has changed
754                 // during the peek operation.
755                 Segment next = Volatile.Read(ref s._nextSegment);
756 
757                 // Peek at the segment.  If we find an element, we're done.
758                 if (s.TryPeek(out result, resultUsed))
759                 {
760                     return true;
761                 }
762 
763                 // The current segment was empty at the moment we checked.
764 
765                 if (next != null)
766                 {
767                     // If prior to the peek there was already a next segment, then
768                     // during the peek no additional items could have been enqueued
769                     // to it and we can just move on to check the next segment.
770                     Debug.Assert(next == s._nextSegment);
771                     s = next;
772                 }
773                 else if (Volatile.Read(ref s._nextSegment) == null)
774                 {
775                     // The next segment is null.  Nothing more to peek at.
776                     break;
777                 }
778 
779                 // The next segment was null before we peeked but non-null after.
780                 // That means either when we peeked the first segment had
781                 // already been frozen but the new segment not yet added,
782                 // or that the first segment was empty and between the time
783                 // that we peeked and then checked _nextSegment, so many items
784                 // were enqueued that we filled the first segment and went
785                 // into the next.  Since we need to peek in order, we simply
786                 // loop around again to peek on the same segment.  The next
787                 // time around on this segment we'll then either successfully
788                 // peek or we'll find that next was non-null before peeking,
789                 // and we'll traverse to that segment.
790             }
791 
792             result = default(T);
793             return false;
794         }
795 
796         /// <summary>
797         /// Removes all objects from the <see cref="ConcurrentQueue{T}"/>.
798         /// </summary>
799         public void Clear()
800         {
801             lock (_crossSegmentLock)
802             {
803                 // Simply substitute a new segment for the existing head/tail,
804                 // as is done in the constructor.  Operations currently in flight
805                 // may still read from or write to an existing segment that's
806                 // getting dropped, meaning that in flight operations may not be
807                 // linear with regards to this clear operation.  To help mitigate
808                 // in-flight operations enqueuing onto the tail that's about to
809                 // be dropped, we first freeze it; that'll force enqueuers to take
810                 // this lock to synchronize and see the new tail.
811                 _tail.EnsureFrozenForEnqueues();
812                 _tail = _head = new Segment(InitialSegmentLength);
813             }
814         }
815 
816         /// <summary>
817         /// Provides a multi-producer, multi-consumer thread-safe bounded segment.  When the queue is full,
818         /// enqueues fail and return false.  When the queue is empty, dequeues fail and return false.
819         /// These segments are linked together to form the unbounded <see cref="ConcurrentQueue{T}"/>.
820         /// </summary>
821         [DebuggerDisplay("Capacity = {Capacity}")]
822         private sealed class Segment
823         {
824             // Segment design is inspired by the algorithm outlined at:
825             // http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
826 
827             /// <summary>The array of items in this queue.  Each slot contains the item in that slot and its "sequence number".</summary>
828             internal readonly Slot[] _slots;
829             /// <summary>Mask for quickly accessing a position within the queue's array.</summary>
830             internal readonly int _slotsMask;
831             /// <summary>The head and tail positions, with padding to help avoid false sharing contention.</summary>
832             /// <remarks>Dequeuing happens from the head, enqueuing happens at the tail.</remarks>
833             internal PaddedHeadAndTail _headAndTail; // mutable struct: do not make this readonly
834 
835             /// <summary>Indicates whether the segment has been marked such that dequeues don't overwrite the removed data.</summary>
836             internal bool _preservedForObservation;
837             /// <summary>Indicates whether the segment has been marked such that no additional items may be enqueued.</summary>
838             internal bool _frozenForEnqueues;
839             /// <summary>The segment following this one in the queue, or null if this segment is the last in the queue.</summary>
840             internal Segment _nextSegment;
841 
842             /// <summary>Creates the segment.</summary>
843             /// <param name="boundedLength">
844             /// The maximum number of elements the segment can contain.  Must be a power of 2.
845             /// </param>
846             public Segment(int boundedLength)
847             {
848                 // Validate the length
849                 Debug.Assert(boundedLength >= 2, $"Must be >= 2, got {boundedLength}");
850                 Debug.Assert((boundedLength & (boundedLength - 1)) == 0, $"Must be a power of 2, got {boundedLength}");
851 
852                 // Initialize the slots and the mask.  The mask lets us replace the relatively expensive
853                 // "% _slots.Length" computation with a cheap "& _slotsMask".
854                 _slots = new Slot[boundedLength];
855                 _slotsMask = boundedLength - 1;
856 
857                 // Initialize the sequence number for each slot.  The sequence number provides a ticket that
858                 // allows dequeuers to know whether they can dequeue and enqueuers to know whether they can
859                 // enqueue.  An enqueuer at position N can enqueue when the sequence number is N, and a dequeuer
860                 // for position N can dequeue when the sequence number is N + 1.  When an enqueuer is done writing
861                 // at position N, it sets the sequence number to N + 1 so that a dequeuer will be able to dequeue,
862                 // and when a dequeuer is done dequeueing at position N, it sets the sequence number to N + _slots.Length,
863                 // so that when an enqueuer loops around the slots, it'll find that the sequence number at
864                 // position N is N.  This also means that when an enqueuer finds that at position N the sequence
865                 // number is < N, there is still a value in that slot, i.e. the segment is full, and when a
866                 // dequeuer finds that the value in a slot is < N + 1, there is nothing currently available to
867                 // dequeue. (It is possible for multiple enqueuers to enqueue concurrently, writing into
868                 // subsequent slots, and to have the first enqueuer take longer, so that the slots for 1, 2, 3, etc.
869                 // may have values, but the 0th slot may still be being filled... in that case, TryDequeue will
870                 // return false.)
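                // Worked example for a segment of length 8: slot 3 starts with sequence number 3.
                // An enqueuer that reserves tail position 3 stores its item and sets the number to 4;
                // a dequeuer that reserves head position 3 reads the item and sets it to 3 + 8 == 11,
                // which is exactly the value the enqueuer for position 11 (the next lap over this
                // slot) will be looking for.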
871                 for (int i = 0; i < _slots.Length; i++)
872                 {
873                     _slots[i].SequenceNumber = i;
874                 }
875             }
876 
877             /// <summary>Gets the number of elements this segment can store.</summary>
878             internal int Capacity => _slots.Length;
879 
880             /// <summary>Gets the "freeze offset" for this segment.</summary>
881             internal int FreezeOffset => _slots.Length * 2;
882 
883             /// <summary>
884             /// Ensures that the segment will not accept any subsequent enqueues that aren't already underway.
885             /// </summary>
886             /// <remarks>
887             /// When we mark a segment as being frozen for additional enqueues,
888             /// we set the <see cref="_frozenForEnqueues"/> bool, but that's mostly
889             /// as a small helper to avoid marking it twice.  The real marking comes
890             /// by modifying the Tail for the segment, increasing it by this
891             /// <see cref="FreezeOffset"/>.  This effectively knocks it off the
892             /// sequence expected by future enqueuers, such that any additional enqueuer
893             /// will be unable to enqueue due to it not lining up with the expected
894             /// sequence numbers.  This value is chosen specially so that Tail will grow
895             /// to a value that maps to the same slot but that won't be confused with
896             /// any other enqueue/dequeue sequence number.
897             /// </remarks>
898             internal void EnsureFrozenForEnqueues() // must only be called while queue's segment lock is held
899             {
900                 if (!_frozenForEnqueues) // flag used to ensure we don't increase the Tail more than once if frozen more than once
901                 {
902                     _frozenForEnqueues = true;
903 
904                     // Increase the tail by FreezeOffset, spinning until we're successful in doing so.
905                     var spinner = new SpinWait();
906                     while (true)
907                     {
908                         int tail = Volatile.Read(ref _headAndTail.Tail);
909                         if (Interlocked.CompareExchange(ref _headAndTail.Tail, tail + FreezeOffset, tail) == tail)
910                         {
911                             break;
912                         }
913                         spinner.SpinOnce();
914                     }
915                 }
916             }
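            // Worked example of the freeze: for a segment of length 8 (FreezeOffset == 16) whose Tail
            // is 5, freezing moves Tail to 21.  Masking maps 21 back to slot 5, so the counting logic
            // stays correct, but slot 5's sequence number can never reach 21 (that would require an
            // enqueue/dequeue round trip at position 13, which the jump skipped over), so any later
            // TryEnqueue sees the slot as full and fails.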
917 
918             /// <summary>Tries to dequeue an element from the queue.</summary>
919             public bool TryDequeue(out T item)
920             {
921                 // Loop in case of contention...
922                 var spinner = new SpinWait();
923                 while (true)
924                 {
925                     // Get the head at which to try to dequeue.
926                     int currentHead = Volatile.Read(ref _headAndTail.Head);
927                     int slotsIndex = currentHead & _slotsMask;
928 
929                     // Read the sequence number for the head position.
930                     int sequenceNumber = Volatile.Read(ref _slots[slotsIndex].SequenceNumber);
931 
932                     // We can dequeue from this slot if it's been filled by an enqueuer, which
933                     // would have left the sequence number at pos+1.
934                     int diff = sequenceNumber - (currentHead + 1);
935                     if (diff == 0)
936                     {
937                         // We may be racing with other dequeuers.  Try to reserve the slot by incrementing
938                         // the head.  Once we've done that, no one else will be able to read from this slot,
939                         // and no enqueuer will be able to read from this slot until we've written the new
940                         // sequence number. WARNING: The next few lines are not reliable on a runtime that
941                         // supports thread aborts. If a thread abort were to sneak in after the CompareExchange
942                         // but before the Volatile.Write, enqueuers trying to enqueue into this slot would
943                         // spin indefinitely.  If this implementation is ever used on such a platform, this
944                         // if block should be wrapped in a finally / prepared region.
945                         if (Interlocked.CompareExchange(ref _headAndTail.Head, currentHead + 1, currentHead) == currentHead)
946                         {
947                             // Successfully reserved the slot.  Note that after the above CompareExchange, other threads
948                             // trying to dequeue from this slot will end up spinning until we do the subsequent Write.
949                             item = _slots[slotsIndex].Item;
950                             if (!Volatile.Read(ref _preservedForObservation))
951                             {
952                                 // We're not preserving this segment for observation, so clear the slot and advance its
953                                 // sequence number so it can be reused.  (When preserving, both steps are skipped: the item is
954                                 // kept for enumeration/peeking/ToArray, and the stale sequence number makes an enqueuer see the slot as full and move to a new segment.)
955                                 _slots[slotsIndex].Item = default(T);
956                                 Volatile.Write(ref _slots[slotsIndex].SequenceNumber, currentHead + _slots.Length);
957                             }
958                             return true;
959                         }
960                     }
961                     else if (diff < 0)
962                     {
963                         // The sequence number was less than what we needed, which means this slot doesn't
964                         // yet contain a value we can dequeue, i.e. the segment is empty.  Technically it's
965                         // possible that multiple enqueuers could have written concurrently, with those
966                         // getting later slots actually finishing first, so there could be elements after
967                         // this one that are available, but we need to dequeue in order.  So before declaring
968                         // failure and that the segment is empty, we check the tail to see if we're actually
969                         // empty or if we're just waiting for items in flight or after this one to become available.
970                         bool frozen = _frozenForEnqueues;
971                         int currentTail = Volatile.Read(ref _headAndTail.Tail);
972                         if (currentTail - currentHead <= 0 || (frozen && (currentTail - FreezeOffset - currentHead <= 0)))
973                         {
974                             item = default(T);
975                             return false;
976                         }
977 
978                         // It's possible it could have become frozen after we checked _frozenForEnqueues
979                         // and before reading the tail.  That's ok: in that rare race condition, we just
980                         // loop around again.
981                     }
982 
983                     // Lost a race. Spin a bit, then try again.
984                     spinner.SpinOnce();
985                 }
986             }
987 
988             /// <summary>Tries to peek at an element from the queue, without removing it.</summary>
989             public bool TryPeek(out T result, bool resultUsed)
990             {
991                 if (resultUsed)
992                 {
993                     // In order to ensure we don't get a torn read on the value, we mark the segment
994                     // as preserving for observation.  Additional items can still be enqueued to this
995                     // segment, but no space will be freed during dequeues, such that the segment will
996                     // no longer be reusable.
997                     _preservedForObservation = true;
998                     Interlocked.MemoryBarrier();
999                 }
1000 
1001                 // Loop in case of contention...
1002                 var spinner = new SpinWait();
1003                 while (true)
1004                 {
1005                     // Get the head at which to try to peek.
1006                     int currentHead = Volatile.Read(ref _headAndTail.Head);
1007                     int slotsIndex = currentHead & _slotsMask;
1008 
1009                     // Read the sequence number for the head position.
1010                     int sequenceNumber = Volatile.Read(ref _slots[slotsIndex].SequenceNumber);
1011 
1012                     // We can peek from this slot if it's been filled by an enqueuer, which
1013                     // would have left the sequence number at pos+1.
1014                     int diff = sequenceNumber - (currentHead + 1);
1015                     if (diff == 0)
1016                     {
1017                         result = resultUsed ? _slots[slotsIndex].Item : default(T);
1018                         return true;
1019                     }
1020                     else if (diff < 0)
1021                     {
1022                         // The sequence number was less than what we needed, which means this slot doesn't
1023                         // yet contain a value we can peek, i.e. the segment is empty.  Technically it's
1024                         // possible that multiple enqueuers could have written concurrently, with those
1025                         // getting later slots actually finishing first, so there could be elements after
1026                         // this one that are available, but we need to peek in order.  So before declaring
1027                         // failure and that the segment is empty, we check the tail to see if we're actually
1028                         // empty or if we're just waiting for items in flight or after this one to become available.
1029                         bool frozen = _frozenForEnqueues;
1030                         int currentTail = Volatile.Read(ref _headAndTail.Tail);
1031                         if (currentTail - currentHead <= 0 || (frozen && (currentTail - FreezeOffset - currentHead <= 0)))
1032                         {
1033                             result = default(T);
1034                             return false;
1035                         }
1036 
1037                         // It's possible it could have become frozen after we checked _frozenForEnqueues
1038                         // and before reading the tail.  That's ok: in that rare race condition, we just
1039                         // loop around again.
1040                     }
1041 
1042                     // Lost a race. Spin a bit, then try again.
1043                     spinner.SpinOnce();
1044                 }
1045             }
1046 
1047             /// <summary>
1048             /// Attempts to enqueue the item.  If successful, the item will be stored
1049             /// in the queue and true will be returned; otherwise, the item won't be stored, and false
1050             /// will be returned.
1051             /// </summary>
1052             public bool TryEnqueue(T item)
1053             {
1054                 // Loop in case of contention...
1055                 var spinner = new SpinWait();
1056                 while (true)
1057                 {
1058                     // Get the tail at which to try to enqueue.
1059                     int currentTail = Volatile.Read(ref _headAndTail.Tail);
1060                     int slotsIndex = currentTail & _slotsMask;
1061 
1062                     // Read the sequence number for the tail position.
1063                     int sequenceNumber = Volatile.Read(ref _slots[slotsIndex].SequenceNumber);
1064 
1065                     // The slot is empty and ready for us to enqueue into it if its sequence
1066                     // number matches the slot.
1067                     int diff = sequenceNumber - currentTail;
1068                     if (diff == 0)
1069                     {
1070                         // We may be racing with other enqueuers.  Try to reserve the slot by incrementing
1071                         // the tail.  Once we've done that, no one else will be able to write to this slot,
1072                         // and no dequeuer will be able to read from this slot until we've written the new
1073                         // sequence number. WARNING: The next few lines are not reliable on a runtime that
1074                         // supports thread aborts. If a thread abort were to sneak in after the CompareExchange
1075                         // but before the Volatile.Write, other threads will spin trying to access this slot.
1076                         // If this implementation is ever used on such a platform, this if block should be
1077                         // wrapped in a finally / prepared region.
1078                         if (Interlocked.CompareExchange(ref _headAndTail.Tail, currentTail + 1, currentTail) == currentTail)
1079                         {
1080                             // Successfully reserved the slot.  Note that after the above CompareExchange, other threads
1081                             // trying to enqueue will end up spinning until we do the subsequent Write.
1082                             _slots[slotsIndex].Item = item;
1083                             Volatile.Write(ref _slots[slotsIndex].SequenceNumber, currentTail + 1);
1084                             return true;
1085                         }
1086                     }
1087                     else if (diff < 0)
1088                     {
1089                         // The sequence number was less than what we needed, which means this slot still
1090                         // contains a value, i.e. the segment is full.  Technically it's possible that multiple
1091                         // dequeuers could have read concurrently, with those getting later slots actually
1092                         // finishing first, so there could be spaces after this one that are available, but
1093                         // we need to enqueue in order.
1094                         return false;
1095                     }
1096 
1097                     // Lost a race. Spin a bit, then try again.
1098                     spinner.SpinOnce();
1099                 }
1100             }
1101 
1102             /// <summary>Represents a slot in the queue.</summary>
1103             [StructLayout(LayoutKind.Auto)]
1104             [DebuggerDisplay("Item = {Item}, SequenceNumber = {SequenceNumber}")]
1105             internal struct Slot
1106             {
1107                 /// <summary>The item.</summary>
1108                 public T Item;
1109                 /// <summary>The sequence number for this slot, used to synchronize between enqueuers and dequeuers.</summary>
1110                 public int SequenceNumber;
1111             }
1112         }
1113     }
1114 
1115     /// <summary>Padded head and tail indices, to avoid false sharing between producers and consumers.</summary>
1116     [DebuggerDisplay("Head = {Head}, Tail = {Tail}")]
1117     [StructLayout(LayoutKind.Explicit, Size = 384)] // padding before/between/after fields based on worst case cache line size of 128
1118     internal struct PaddedHeadAndTail
1119     {
1120         [FieldOffset(128)] public int Head;
1121         [FieldOffset(256)] public int Tail;
1122     }
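    // Layout note: with Size = 384 and field offsets of 128 and 256, there are 128 bytes of padding
    // before Head, between Head and Tail, and after Tail, so the two counters are kept at least a
    // full 128-byte cache line apart and padded away from whatever precedes or follows the struct.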
1123 }
1124