1 // ==++==
2 //
3 //   Copyright (c) Microsoft Corporation.  All rights reserved.
4 //
5 // ==--==
6 // =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
7 //
8 // Barrier.cs
9 //
10 // <OWNER>Microsoft</OWNER>
11 //
12 // A barrier allows multiple tasks to cooperatively work on some algorithm in parallel.
13 // A group of tasks cooperate by moving through a series of phases, where each in the group signals it has arrived at
14 // the barrier in a given phase and implicitly waits for all others to arrive.
15 // The same barrier can be used for multiple phases.
16 //
17 // =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
18 using System;
19 using System.Collections.Generic;
20 using System.Diagnostics;
21 using System.Security.Permissions;
22 using System.Runtime.InteropServices;
23 using System.Threading;
24 using System.Runtime.Serialization;
25 using System.Security;
26 namespace System.Threading
27 {
28     /// <summary>
29     /// The exception that is thrown when the post-phase action of a <see cref="Barrier"/> fails.
30     /// </summary>
31 #if !SILVERLIGHT
32     [Serializable]
33 #endif
34     public class BarrierPostPhaseException : Exception
35     {
36         /// <summary>
37         /// Initializes a new instance of the <see cref="BarrierPostPhaseException"/> class.
38         /// </summary>
BarrierPostPhaseException()39         public BarrierPostPhaseException():this((string)null)
40         {
41         }
42 
43         /// <summary>
44         /// Initializes a new instance of the <see cref="BarrierPostPhaseException"/> class with the specified inner exception.
45         /// </summary>
46         /// <param name="innerException">The exception that is the cause of the current exception.</param>
BarrierPostPhaseException(Exception innerException)47         public BarrierPostPhaseException(Exception innerException): this(null, innerException)
48         {
49         }
50 
51         /// <summary>
52         /// Initializes a new instance of the <see cref="BarrierPostPhaseException"/> class with a specified error message.
53         /// </summary>
54         /// <param name="message">A string that describes the exception.</param>
BarrierPostPhaseException(string message)55         public BarrierPostPhaseException(string message):this(message, null)
56         {
57         }
58 
59         /// <summary>
60         /// Initializes a new instance of the <see cref="BarrierPostPhaseException"/> class with a specified error message and inner exception.
61         /// </summary>
62         /// <param name="message">A string that describes the exception.</param>
63         /// <param name="innerException">The exception that is the cause of the current exception.</param>
BarrierPostPhaseException(string message, Exception innerException)64         public BarrierPostPhaseException(string message, Exception innerException)
65             : base(message == null ? SR.GetString(SR.BarrierPostPhaseException) : message, innerException)
66         {
67         }
68 
69 #if !SILVERLIGHT
70         /// <summary>
71         /// Initializes a new instance of the <see cref="BarrierPostPhaseException"/> class with serialized data.
72         /// </summary>
73         /// <param name="info">The object that holds the serialized object data.</param>
74         /// <param name="context">An object that describes the source or destination of the serialized data.</param>
75         [SecurityCritical]
BarrierPostPhaseException(SerializationInfo info, StreamingContext context)76         protected BarrierPostPhaseException(SerializationInfo info, StreamingContext context)
77             : base(info, context)
78         {
79         }
80 #endif
81     }
82 
83 
84     /// <summary>
85     /// Enables multiple tasks to cooperatively work on an algorithm in parallel through multiple phases.
86     /// </summary>
87     /// <remarks>
88     /// <para>
89     /// A group of tasks cooperate by moving through a series of phases, where each in the group signals it
90     /// has arrived at the <see cref="Barrier"/> in a given phase and implicitly waits for all others to
91     /// arrive. The same <see cref="Barrier"/> can be used for multiple phases.
92     /// </para>
93     /// <para>
94     /// All public and protected members of <see cref="Barrier"/> are thread-safe and may be used
95     /// concurrently from multiple threads, with the exception of Dispose, which
96     /// must only be used when all other operations on the <see cref="Barrier"/> have
97     /// completed.
98     /// </para>
99     /// </remarks>
100     [ComVisible(false)]
101 #if !FEATURE_NETCORE
102 #pragma warning disable 0618
103     [HostProtection(SecurityAction.LinkDemand, Synchronization = true, ExternalThreading = true)]
104 #pragma warning restore 0618
105 #endif
106     [DebuggerDisplay("Participant Count={ParticipantCount},Participants Remaining={ParticipantsRemaining}")]
107     public class Barrier : IDisposable
108     {
109 
110         //This variable holds the basic barrier variables:
111         // 1- The current particiants count
112         // 2- The total participants count
113         // 3- The sense flag (true if the cuurrent phase is even, false otherwise)
114         // The first 15 bits are for the total count which means the maximum participants for the barrier is about 32K
115         // The 16th bit is dummy
116         // The next 15th bit for the current
117         // And the last highest bit is for the sense
118         volatile int m_currentTotalCount;
119 
120         // Bitmask to extract the current count
121         const int CURRENT_MASK = 0x7FFF0000;
122 
123         // Bitmask to extract the total count
124         const int TOTAL_MASK = 0x00007FFF;
125 
126         // Bitmask to extratc the sense flag
127         const int SENSE_MASK = unchecked((int)0x80000000);
128 
129         // The maximum participants the barrier can operate = 32767 ( 2 power 15 - 1 )
130         const int MAX_PARTICIPANTS = TOTAL_MASK;
131 
132 
133         // The current barrier phase
134         // We don't need to worry about overflow, the max value is 2^63-1; If it starts from 0 at a
135         // rate of 4 billion increments per second, it will takes about 64 years to overflow.
136         long m_currentPhase;
137 
138 
139         // dispose flag
140         bool m_disposed;
141 
142         // Odd phases event
143         ManualResetEventSlim m_oddEvent;
144 
145         // Even phases event
146         ManualResetEventSlim m_evenEvent;
147 
148         // The execution context of the creator thread
149         ExecutionContext m_ownerThreadContext;
150 
151         // The EC callback that invokes the psot phase action
152         [SecurityCritical]
153         private static ContextCallback s_invokePostPhaseAction;
154 
155         // Post phase action after each phase
156         Action<Barrier> m_postPhaseAction;
157 
158         // In case the post phase action throws an exception, wraps it in BarrierPostPhaseException
159         Exception m_exception;
160 
161         // This is the ManagedThreadID of the postPhaseAction caller thread, this is used to determine if the SignalAndWait, Dispose or Add/RemoveParticipant caller thread is
162         // the same thread as the postPhaseAction thread which means this method was called from the postPhaseAction which is illegal.
163         // This value is captured before calling the action and reset back to zero after it.
164         int m_actionCallerID;
165 
166         #region Properties
167 
168         /// <summary>
169         /// Gets the number of participants in the barrier that haven’t yet signaled
170         /// in the current phase.
171         /// </summary>
172         /// <remarks>
173         /// This could be 0 during a post-phase action delegate execution or if the
174         /// ParticipantCount is 0.
175         /// </remarks>
176         public int ParticipantsRemaining
177         {
178             get
179             {
180                 int currentTotal = m_currentTotalCount;
181                 int total = (int)(currentTotal & TOTAL_MASK);
182                 int current = (int)((currentTotal & CURRENT_MASK) >> 16);
183                 return total - current;
184             }
185         }
186 
187         /// <summary>
188         /// Gets the total number of participants in the barrier.
189         /// </summary>
190         public int ParticipantCount
191         {
192             get { return (int)(m_currentTotalCount & TOTAL_MASK); }
193         }
194 
195         /// <summary>
196         /// Gets the number of the barrier's current phase.
197         /// </summary>
198         public long CurrentPhaseNumber
199         {
200             // use the new Volatile.Read/Write method because it is cheaper than Interlocked.Read on AMD64 architecture
201             get { return Volatile.Read(ref m_currentPhase); }
202 
203             internal set { Volatile.Write(ref m_currentPhase, value); }
204         }
205 
206         #endregion
207 
208         /// <summary>
209         /// Initializes a new instance of the <see cref="Barrier"/> class.
210         /// </summary>
211         /// <param name="participantCount">The number of participating threads.</param>
212         /// <exception cref="ArgumentOutOfRangeException"> <paramref name="participantCount"/> is less than 0
213         /// or greater than <see cref="T:System.Int16.MaxValue"/>.</exception>
Barrier(int participantCount)214         public Barrier(int participantCount)
215             : this(participantCount, null)
216         {
217         }
218 
219         /// <summary>
220         /// Initializes a new instance of the <see cref="Barrier"/> class.
221         /// </summary>
222         /// <param name="participantCount">The number of participating threads.</param>
223         /// <param name="postPhaseAction">The <see cref="T:System.Action`1"/> to be executed after each
224         /// phase.</param>
225         /// <exception cref="T:System.ArgumentOutOfRangeException"> <paramref name="participantCount"/> is less than 0
226         /// or greater than <see cref="T:System.Int32.MaxValue"/>.</exception>
227         /// <remarks>
228         /// The <paramref name="postPhaseAction"/> delegate will be executed after
229         /// all participants have arrived at the barrier in one phase.  The participants
230         /// will not be released to the next phase until the postPhaseAction delegate
231         /// has completed execution.
232         /// </remarks>
Barrier(int participantCount, Action<Barrier> postPhaseAction)233         public Barrier(int participantCount, Action<Barrier> postPhaseAction)
234         {
235             // the count must be non negative value
236             if (participantCount < 0 || participantCount > MAX_PARTICIPANTS)
237             {
238                 throw new ArgumentOutOfRangeException("participantCount", participantCount, SR.GetString(SR.Barrier_ctor_ArgumentOutOfRange));
239             }
240             m_currentTotalCount = (int)participantCount;
241             m_postPhaseAction = postPhaseAction;
242 
243             //Lazily initialize the events
244             m_oddEvent = new ManualResetEventSlim(true);
245             m_evenEvent = new ManualResetEventSlim(false);
246 
247             // Capture the context if the post phase action is not null
248             if (postPhaseAction != null && !ExecutionContext.IsFlowSuppressed())
249             {
250                 m_ownerThreadContext = ExecutionContext.Capture();
251             }
252 
253             m_actionCallerID = 0;
254 
255         }
256 
257         /// <summary>
258         /// Extract the three variables current, total and sense from a given big variable
259         /// </summary>
260         /// <param name="currentTotal">The integer variable that contains the other three variables</param>
261         /// <param name="current">The current cparticipant count</param>
262         /// <param name="total">The total participants count</param>
263         /// <param name="sense">The sense flag</param>
GetCurrentTotal(int currentTotal, out int current, out int total, out bool sense)264         private void GetCurrentTotal(int currentTotal, out int current, out int total, out bool sense)
265         {
266             total = (int)(currentTotal & TOTAL_MASK);
267             current = (int)((currentTotal & CURRENT_MASK) >> 16);
268             sense = (currentTotal & SENSE_MASK) == 0 ? true : false;
269         }
270 
271         /// <summary>
272         /// Write the three variables current. total and the sense to the m_currentTotal
273         /// </summary>
274         /// <param name="currentTotal">The old current total to compare</param>
275         /// <param name="current">The current cparticipant count</param>
276         /// <param name="total">The total participants count</param>
277         /// <param name="sense">The sense flag</param>
278         /// <returns>True if the CAS succeeded, false otherwise</returns>
SetCurrentTotal(int currentTotal, int current, int total, bool sense)279         private bool SetCurrentTotal(int currentTotal, int current, int total, bool sense)
280         {
281             int newCurrentTotal = (current <<16) | total;
282 
283             if (!sense)
284             {
285                 newCurrentTotal |= SENSE_MASK;
286             }
287 
288 #pragma warning disable 0420
289             return Interlocked.CompareExchange(ref m_currentTotalCount, newCurrentTotal, currentTotal) == currentTotal;
290 #pragma warning restore 0420
291         }
292 
293         /// <summary>
294         /// Notifies the <see cref="Barrier"/> that there will be an additional participant.
295         /// </summary>
296         /// <returns>The phase number of the barrier in which the new participants will first
297         /// participate.</returns>
298         /// <exception cref="T:System.InvalidOperationException">
299         /// Adding a participant would cause the barrier's participant count to
300         /// exceed <see cref="T:System.Int16.MaxValue"/>.
301         /// </exception>
302         /// <exception cref="T:System.InvalidOperationException">
303         /// The method was invoked from within a post-phase action.
304         /// </exception>
305         /// <exception cref="T:System.ObjectDisposedException">The current instance has already been
306         /// disposed.</exception>
AddParticipant()307         public long AddParticipant()
308         {
309             try
310             {
311                 return AddParticipants(1);
312             }
313             catch (ArgumentOutOfRangeException)
314             {
315                 throw new InvalidOperationException(SR.GetString(SR.Barrier_AddParticipants_Overflow_ArgumentOutOfRange));
316             }
317         }
318 
319         /// <summary>
320         /// Notifies the <see cref="Barrier"/> that there will be additional participants.
321         /// </summary>
322         /// <param name="participantCount">The number of additional participants to add to the
323         /// barrier.</param>
324         /// <returns>The phase number of the barrier in which the new participants will first
325         /// participate.</returns>
326         /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="participantCount"/> is less than
327         /// 0.</exception>
328         /// <exception cref="T:System.ArgumentOutOfRangeException">Adding <paramref name="participantCount"/> participants would cause the
329         /// barrier's participant count to exceed <see cref="T:System.Int16.MaxValue"/>.</exception>
330         /// <exception cref="T:System.InvalidOperationException">
331         /// The method was invoked from within a post-phase action.
332         /// </exception>
333         /// <exception cref="T:System.ObjectDisposedException">The current instance has already been
334         /// disposed.</exception>
AddParticipants(int participantCount)335         public long AddParticipants(int participantCount)
336         {
337             // check dispose
338             ThrowIfDisposed();
339 
340             if (participantCount < 1 )
341             {
342                 throw new ArgumentOutOfRangeException("participantCount", participantCount,
343                     SR.GetString(SR.Barrier_AddParticipants_NonPositive_ArgumentOutOfRange));
344             }
345             else if (participantCount > MAX_PARTICIPANTS) //overflow
346             {
347                 throw new ArgumentOutOfRangeException("participantCount",
348                         SR.GetString(SR.Barrier_AddParticipants_Overflow_ArgumentOutOfRange));
349             }
350 
351             // in case of this is called from the PHA
352             if (m_actionCallerID != 0 && Thread.CurrentThread.ManagedThreadId == m_actionCallerID)
353             {
354                 throw new InvalidOperationException(SR.GetString(SR.Barrier_InvalidOperation_CalledFromPHA));
355             }
356 
357             SpinWait spinner = new SpinWait();
358             long newPhase = 0;
359             while (true)
360             {
361                 int currentTotal = m_currentTotalCount;
362                 int total;
363                 int current;
364                 bool sense;
365                 GetCurrentTotal(currentTotal, out current, out total, out sense);
366                 if (participantCount + total > MAX_PARTICIPANTS) //overflow
367                 {
368                     throw new ArgumentOutOfRangeException("participantCount",
369                         SR.GetString(SR.Barrier_AddParticipants_Overflow_ArgumentOutOfRange));
370                 }
371 
372                 if (SetCurrentTotal(currentTotal, current, total + participantCount, sense))
373                 {
374                     // Calculating the first phase for that participant, if the current phase already finished return the nextphase else return the current phase
375                     // To know that the current phase is  the sense doesn't match the
376                     // phase odd even, so that means it didn't yet change the phase count, so currentPhase +1 is returned, otherwise currentPhase is returned
377                     long currPhase = CurrentPhaseNumber;
378                     newPhase = (sense != (currPhase % 2 == 0)) ? currPhase + 1 : currPhase;
379 
380                     // If this participant is going to join the next phase, which means the postPhaseAction is being running, this participants must wait until this done
381                     // and its event is reset.
382                     // Without that, if the postPhaseAction takes long time, this means the event ehich the current participant is goint to wait on is still set
383                     // (FinishPPhase didn't reset it yet) so it should wait until it reset
384                     if (newPhase != currPhase)
385                     {
386                         // Wait on the opposite event
387                         if (sense)
388                         {
389                             m_oddEvent.Wait();
390                         }
391                         else
392                         {
393                             m_evenEvent.Wait();
394                         }
395                     }
396 
397                     //This else to fix the racing where the current phase has been finished, m_currentPhase has been updated but the events have not been set/reset yet
398                     // otherwise when this participant calls SignalAndWait it will wait on a set event however all other participants have not arrived yet.
399                     else
400                     {
401                         if (sense && m_evenEvent.IsSet)
402                             m_evenEvent.Reset();
403                         else if (!sense && m_oddEvent.IsSet)
404                             m_oddEvent.Reset();
405                     }
406                     break;
407                 }
408                 spinner.SpinOnce();
409             }
410             return newPhase;
411         }
412 
413         /// <summary>
414         /// Notifies the <see cref="Barrier"/> that there will be one less participant.
415         /// </summary>
416         /// <exception cref="T:System.InvalidOperationException">The barrier already has 0
417         /// participants.</exception>
418         /// <exception cref="T:System.InvalidOperationException">
419         /// The method was invoked from within a post-phase action.
420         /// </exception>
421         /// <exception cref="T:System.ObjectDisposedException">The current instance has already been
422         /// disposed.</exception>
RemoveParticipant()423         public void RemoveParticipant()
424         {
425             RemoveParticipants(1);
426         }
427 
428         /// <summary>
429         /// Notifies the <see cref="Barrier"/> that there will be fewer participants.
430         /// </summary>
431         /// <param name="participantCount">The number of additional participants to remove from the barrier.</param>
432         /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="participantCount"/> is less than
433         /// 0.</exception>
434         /// <exception cref="T:System.InvalidOperationException">The barrier already has 0 participants.</exception>
435         /// <exception cref="T:System.InvalidOperationException">
436         /// The method was invoked from within a post-phase action.
437         /// </exception>
438         /// <exception cref="T:System.ObjectDisposedException">The current instance has already been
439         /// disposed.</exception>
RemoveParticipants(int participantCount)440         public void RemoveParticipants(int participantCount)
441         {
442             // check dispose
443             ThrowIfDisposed();
444 
445             // Validate input
446             if (participantCount < 1)
447             {
448                 throw new ArgumentOutOfRangeException("participantCount", participantCount,
449                     SR.GetString(SR.Barrier_RemoveParticipants_NonPositive_ArgumentOutOfRange));
450             }
451 
452             // in case of this is called from the PHA
453             if (m_actionCallerID != 0 && Thread.CurrentThread.ManagedThreadId == m_actionCallerID)
454             {
455                 throw new InvalidOperationException(SR.GetString(SR.Barrier_InvalidOperation_CalledFromPHA));
456             }
457 
458             SpinWait spinner = new SpinWait();
459             while (true)
460             {
461                 int currentTotal = m_currentTotalCount;
462                 int total;
463                 int current;
464                 bool sense;
465                 GetCurrentTotal(currentTotal, out current, out total, out sense);
466 
467                 if (total < participantCount)
468                 {
469                     throw new ArgumentOutOfRangeException("participantCount",
470                         SR.GetString(SR.Barrier_RemoveParticipants_ArgumentOutOfRange));
471                 }
472                 if (total - participantCount < current)
473                 {
474                     throw new InvalidOperationException(SR.GetString(SR.Barrier_RemoveParticipants_InvalidOperation));
475                 }
476                 // If the remaining participats = current participants, then finish the current phase
477                 int remaingParticipants = total - participantCount;
478                 if (remaingParticipants > 0 && current == remaingParticipants )
479                 {
480                     if (SetCurrentTotal(currentTotal, 0, total - participantCount, !sense))
481                     {
482                         FinishPhase(sense);
483                         break;
484                     }
485                 }
486                 else
487                 {
488                     if (SetCurrentTotal(currentTotal, current, total - participantCount, sense))
489                     {
490                         break;
491                     }
492                 }
493                 spinner.SpinOnce();
494             }
495         }
496 
497         /// <summary>
498         /// Signals that a participant has reached the <see cref="Barrier"/> and waits for all other
499         /// participants to reach the barrier as well.
500         /// </summary>
501         /// <exception cref="T:System.InvalidOperationException">
502         /// The method was invoked from within a post-phase action, the barrier currently has 0 participants,
503         /// or the barrier is being used by more threads than are registered as participants.
504         /// </exception>
505         /// <exception cref="T:System.ObjectDisposedException">The current instance has already been
506         /// disposed.</exception>
SignalAndWait()507         public void SignalAndWait()
508         {
509             SignalAndWait(new CancellationToken());
510         }
511 
512         /// <summary>
513         /// Signals that a participant has reached the <see cref="Barrier"/> and waits for all other
514         /// participants to reach the barrier, while observing a <see
515         /// cref="T:System.Threading.CancellationToken"/>.
516         /// </summary>
517         /// <param name="cancellationToken">The <see cref="T:System.Threading.CancellationToken"/> to
518         /// observe.</param>
519         /// <exception cref="T:System.InvalidOperationException">
520         /// The method was invoked from within a post-phase action, the barrier currently has 0 participants,
521         /// or the barrier is being used by more threads than are registered as participants.
522         /// </exception>
523         /// <exception cref="T:System.OperationCanceledException"><paramref name="cancellationToken"/> has been
524         /// canceled.</exception>
525         /// <exception cref="T:System.ObjectDisposedException">The current instance has already been
526         /// disposed.</exception>
SignalAndWait(CancellationToken cancellationToken)527         public void SignalAndWait(CancellationToken cancellationToken)
528         {
529 #if DEBUG
530             bool result =
531 #endif
532             SignalAndWait(Timeout.Infinite, cancellationToken);
533 #if DEBUG
534             Debug.Assert(result);
535 #endif
536         }
537 
538         /// <summary>
539         /// Signals that a participant has reached the <see cref="Barrier"/> and waits for all other
540         /// participants to reach the barrier as well, using a
541         /// <see cref="T:System.TimeSpan"/> to measure the time interval.
542         /// </summary>
543         /// <param name="timeout">A <see cref="T:System.TimeSpan"/> that represents the number of
544         /// milliseconds to wait, or a <see cref="T:System.TimeSpan"/> that represents -1 milliseconds to
545         /// wait indefinitely.</param>
546         /// <returns>true if all other participants reached the barrier; otherwise, false.</returns>
547         /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout"/>is a negative number
548         /// other than -1 milliseconds, which represents an infinite time-out, or it is greater than
549         /// <see cref="T:System.Int32.MaxValue"/>.</exception>
550         /// <exception cref="T:System.InvalidOperationException">
551         /// The method was invoked from within a post-phase action, the barrier currently has 0 participants,
552         /// or the barrier is being used by more threads than are registered as participants.
553         /// </exception>
554         /// <exception cref="T:System.ObjectDisposedException">The current instance has already been
555         /// disposed.</exception>
SignalAndWait(TimeSpan timeout)556         public Boolean SignalAndWait(TimeSpan timeout)
557         {
558             return SignalAndWait(timeout, new CancellationToken());
559         }
560 
561         /// <summary>
562         /// Signals that a participant has reached the <see cref="Barrier"/> and waits for all other
563         /// participants to reach the barrier as well, using a
564         /// <see cref="T:System.TimeSpan"/> to measure the time interval, while observing a <see
565         /// cref="T:System.Threading.CancellationToken"/>.
566         /// </summary>
567         /// <param name="timeout">A <see cref="T:System.TimeSpan"/> that represents the number of
568         /// milliseconds to wait, or a <see cref="T:System.TimeSpan"/> that represents -1 milliseconds to
569         /// wait indefinitely.</param>
570         /// <param name="cancellationToken">The <see cref="T:System.Threading.CancellationToken"/> to
571         /// observe.</param>
572         /// <returns>true if all other participants reached the barrier; otherwise, false.</returns>
573         /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout"/>is a negative number
574         /// other than -1 milliseconds, which represents an infinite time-out.</exception>
575         /// <exception cref="T:System.InvalidOperationException">
576         /// The method was invoked from within a post-phase action, the barrier currently has 0 participants,
577         /// or the barrier is being used by more threads than are registered as participants.
578         /// </exception>
579         /// <exception cref="T:System.OperationCanceledException"><paramref name="cancellationToken"/> has been
580         /// canceled.</exception>
581         /// <exception cref="T:System.ObjectDisposedException">The current instance has already been
582         /// disposed.</exception>
SignalAndWait(TimeSpan timeout, CancellationToken cancellationToken)583         public Boolean SignalAndWait(TimeSpan timeout, CancellationToken cancellationToken)
584         {
585             Int64 totalMilliseconds = (Int64)timeout.TotalMilliseconds;
586             if (totalMilliseconds < -1 || totalMilliseconds > int.MaxValue)
587             {
588                 throw new System.ArgumentOutOfRangeException("timeout", timeout,
589                     SR.GetString(SR.Barrier_SignalAndWait_ArgumentOutOfRange));
590             }
591             return SignalAndWait((int)timeout.TotalMilliseconds, cancellationToken);
592         }
593 
594         /// <summary>
595         /// Signals that a participant has reached the <see cref="Barrier"/> and waits for all other
596         /// participants to reach the barrier as well, using a
597         /// 32-bit signed integer to measure the time interval.
598         /// </summary>
599         /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
600         /// cref="Timeout.Infinite"/>(-1) to wait indefinitely.</param>
601         /// <returns>true if all other participants reached the barrier; otherwise, false.</returns>
602         /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
603         /// negative number other than -1, which represents an infinite time-out.</exception>
604         /// <exception cref="T:System.InvalidOperationException">
605         /// The method was invoked from within a post-phase action, the barrier currently has 0 participants,
606         /// or the barrier is being used by more threads than are registered as participants.
607         /// </exception>
608         /// <exception cref="T:System.ObjectDisposedException">The current instance has already been
609         /// disposed.</exception>
SignalAndWait(int millisecondsTimeout)610         public bool SignalAndWait(int millisecondsTimeout)
611         {
612             return SignalAndWait(millisecondsTimeout, new CancellationToken());
613         }
614 
615         /// <summary>
616         /// Signals that a participant has reached the barrier and waits for all other participants to reach
617         /// the barrier as well, using a
618         /// 32-bit signed integer to measure the time interval, while observing a <see
619         /// cref="T:System.Threading.CancellationToken"/>.
620         /// </summary>
621         /// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
622         /// cref="Timeout.Infinite"/>(-1) to wait indefinitely.</param>
623         /// <param name="cancellationToken">The <see cref="T:System.Threading.CancellationToken"/> to
624         /// observe.</param>
625         /// <returns>true if all other participants reached the barrier; otherwise, false.</returns>
626         /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
627         /// negative number other than -1, which represents an infinite time-out.</exception>
628         /// <exception cref="T:System.InvalidOperationException">
629         /// The method was invoked from within a post-phase action, the barrier currently has 0 participants,
630         /// or the barrier is being used by more threads than are registered as participants.
631         /// </exception>
632         /// <exception cref="T:System.OperationCanceledException"><paramref name="cancellationToken"/> has been
633         /// canceled.</exception>
634         /// <exception cref="T:System.ObjectDisposedException">The current instance has already been
635         /// disposed.</exception>
        public bool SignalAndWait(int millisecondsTimeout, CancellationToken cancellationToken)
        {
            // Overview:
            //   1. Validate state and arguments.
            //   2. Register this arrival in the packed counter (m_currentTotalCount); the last arrival finishes the phase.
            //   3. Otherwise wait on the event that belongs to the observed sense.
            //   4. If the wait timed out or was canceled, try to undo the arrival, unless the phase finished in the meantime.
            ThrowIfDisposed();
            cancellationToken.ThrowIfCancellationRequested();

            if (millisecondsTimeout < -1)
            {
                throw new System.ArgumentOutOfRangeException("millisecondsTimeout", millisecondsTimeout,
                    SR.GetString(SR.Barrier_SignalAndWait_ArgumentOutOfRange));
            }

            // Reject calls made from within the post-phase action (PHA): m_actionCallerID holds the managed thread id
            // of the thread that is currently running the PHA, and is 0 when no PHA is running (see FinishPhase).
            if (m_actionCallerID != 0 && Thread.CurrentThread.ManagedThreadId == m_actionCallerID)
            {
                throw new InvalidOperationException(SR.GetString(SR.Barrier_InvalidOperation_CalledFromPHA));
            }

            // Local variables used to extract the basic barrier state and update it.
            // They are declared here instead of inside the loop body because they are also used after the loop.
            bool sense; // The sense of the barrier *before* the phase associated with this SignalAndWait call completes
            int total;
            int current;
            int currentTotal;
            long phase;
            SpinWait spinner = new SpinWait();
            while (true)
            {
                // Take a snapshot of the packed state and decode it into its parts.
                currentTotal = m_currentTotalCount;
                GetCurrentTotal(currentTotal, out current, out total, out sense);
                phase = CurrentPhaseNumber;
                // throw if zero participants
                if (total == 0)
                {
                    throw new InvalidOperationException(SR.GetString(SR.Barrier_SignalAndWait_InvalidOperation_ZeroTotal));
                }
                // Try to detect whether the number of threads for this phase exceeded the total number of participants.
                // This can be detected if current is zero, which means all participants for that phase have arrived, and the phase number has not changed yet
                if (current == 0 && sense != (CurrentPhaseNumber % 2 == 0))
                {
                    throw new InvalidOperationException(SR.GetString(SR.Barrier_SignalAndWait_InvalidOperation_ThreadsExceeded));
                }
                // This is the last thread, finish the phase
                if (current + 1 == total)
                {
                    // Reset the arrival count to 0 and flip the sense.
                    // NOTE(review): SetCurrentTotal presumably succeeds only if m_currentTotalCount still equals
                    // currentTotal (compare-and-swap) - confirm against its definition. On failure the loop retries.
                    if (SetCurrentTotal(currentTotal, 0, total, !sense))
                    {
#if !FEATURE_PAL && !SILVERLIGHT    // PAL doesn't support  eventing
                        if (CdsSyncEtwBCLProvider.Log.IsEnabled())
                        {
                            CdsSyncEtwBCLProvider.Log.Barrier_PhaseFinished(sense, CurrentPhaseNumber);
                        }
#endif
                        // Runs the post-phase action (if any), advances the phase number and releases the waiters.
                        FinishPhase(sense);
                        return true;
                    }
                }
                // Not the last arrival: record this arrival and leave the loop to wait for the others.
                else if (SetCurrentTotal(currentTotal, current + 1, total, sense))
                {
                    break;
                }

                // The state update failed; back off briefly and retry with a fresh snapshot.
                spinner.SpinOnce();

            }

            // ** Perform the real wait **
            // select the correct event to wait on, based on the current sense.
            ManualResetEventSlim eventToWaitOn = (sense) ? m_evenEvent : m_oddEvent;

            bool waitWasCanceled = false;
            bool waitResult = false;
            try
            {
                waitResult = DiscontinuousWait(eventToWaitOn, millisecondsTimeout, cancellationToken, phase);
            }
            catch (OperationCanceledException )
            {
                // Remember the cancellation; whether it is reported depends on whether the arrival can be backed out below.
                waitWasCanceled = true;
            }
            catch (ObjectDisposedException)// in case a race happens where one of the threads returned from SignalAndWait and the current thread calls Wait on a disposed event
            {
                // make sure the current phase for this thread is already finished, otherwise propagate the exception
                if (phase < CurrentPhaseNumber)
                    waitResult = true;
                else
                    throw;
            }



            if (!waitResult)
            {
                // reset the spinner to prepare it for the next loop
                spinner.Reset();

                // If the wait timeout expired and the other threads didn't all reach the barrier yet, update the current count back
                while (true)
                {
                    bool newSense;
                    currentTotal = m_currentTotalCount;
                    GetCurrentTotal(currentTotal, out current, out total, out newSense);
                    // If the timeout expired and the phase has just finished, return true; this is considered a successful SignalAndWait.
                    // Otherwise the timeout expired and the current phase has not finished yet, so return false.
                    // The phase is finished if the phase member variable has changed (been incremented) or the sense has changed.
                    // Both conditions are needed in the comparison below to cover two cases:
                    // 1- The sense has changed but the last thread didn't update the phase yet
                    // 2- The phase is already incremented but the sense flipped twice due to the termination of the next phase
                    if (phase < CurrentPhaseNumber || sense != newSense)
                    {

                        // The current phase has finished, but we shouldn't return before the events are set/reset; otherwise this thread could start
                        // the next phase while the appropriate event has not been reset yet, which could make it return immediately from the next
                        // phase's SignalAndWait before waiting for the other threads
                        WaitCurrentPhase(eventToWaitOn, phase);
                        Debug.Assert(phase < CurrentPhaseNumber);
                        break;
                    }
                    // The phase has not finished yet, try to decrement the current count to undo this thread's arrival.
                    if (SetCurrentTotal(currentTotal, current - 1, total, sense))
                    {
                        // If here, then the attempt to back out was successful.
                        // Throw (a fresh) OperationCanceledException if cancellation woke the wait,
                        // or return false if it was the timeout that woke the wait.
                        //
                        if (waitWasCanceled)
                            throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
                        else
                            return false;
                    }
                    spinner.SpinOnce();
                }
            }

            // m_exception is set by FinishPhase when the post-phase action threw; report that failure
            // to this participant as well.
            if (m_exception != null)
                throw new BarrierPostPhaseException(m_exception);

            return true;

        }
775 
776         /// <summary>
        /// Finishes the phase by invoking the post-phase action (if any) and setting the event.
        /// This must be called by the last thread to arrive at the barrier.
779         /// </summary>
780         /// <param name="observedSense">The current phase sense</param>
781         [SecuritySafeCritical]
FinishPhase(bool observedSense)782         private void FinishPhase(bool observedSense)
783         {
784             // Execute the PHA in try/finally block to reset the variables back in case of it threw an exception
785             if (m_postPhaseAction != null)
786             {
787                 try
788                 {
789                     // Capture the caller thread ID to check if the Add/RemoveParticipant(s) is called from the PHA
790                     m_actionCallerID = Thread.CurrentThread.ManagedThreadId;
791                     if (m_ownerThreadContext != null)
792                     {
793                         var currentContext = m_ownerThreadContext;
794                         m_ownerThreadContext = m_ownerThreadContext.CreateCopy(); // create a copy for the next run
795 
796                         ContextCallback handler = s_invokePostPhaseAction;
797                         if (handler == null)
798                         {
799                             s_invokePostPhaseAction = handler = InvokePostPhaseAction;
800                         }
801                         ExecutionContext.Run(currentContext, handler, this);
802 #if !PFX_LEGACY_3_5
803                         // Dispose the context directly after using it,
804                         // the copy will either be used and siposed in the next phase or in the Dispose
805                         currentContext.Dispose();
806 #endif
807                     }
808                     else
809                     {
810                         m_postPhaseAction(this);
811                     }
812                     m_exception = null; // reset the exception if it was set previously
813                 }
814                 catch (Exception ex)
815                 {
816                     m_exception = ex;
817                 }
818                 finally
819                 {
820                     m_actionCallerID = 0;
821                     SetResetEvents(observedSense);
822                     if(m_exception != null)
823                         throw new BarrierPostPhaseException(m_exception);
824                 }
825 
826             }
827             else
828             {
829                 SetResetEvents(observedSense);
830             }
831         }
832 
833         /// <summary>
834         /// Helper method to call the post phase action
835         /// </summary>
        /// <param name="obj">The <see cref="Barrier"/> instance whose post-phase action is invoked.</param>
837         [SecurityCritical]
InvokePostPhaseAction(object obj)838         private static void InvokePostPhaseAction(object obj)
839         {
840             var thisBarrier = (Barrier)obj;
841             thisBarrier.m_postPhaseAction(thisBarrier);
842         }
843 
844         /// <summary>
845         /// Sets the current phase event and reset the next phase event
846         /// </summary>
847         /// <param name="observedSense">The current phase sense</param>
SetResetEvents(bool observedSense)848         private void SetResetEvents(bool observedSense)
849         {
850             // Increment the phase count using Volatile class because m_currentPhase is 64 bit long type, that could cause torn write on 32 bit machines
851             CurrentPhaseNumber = CurrentPhaseNumber + 1;
852             if (observedSense)
853             {
854                 m_oddEvent.Reset();
855                 m_evenEvent.Set();
856             }
857             else
858             {
859                 m_evenEvent.Reset();
860                 m_oddEvent.Set();
861             }
862         }
863 
864         /// <summary>
865         /// Wait until the current phase finishes completely by spinning until either the event is set,
866         /// or the phase count is incremented more than one time
867         /// </summary>
868         /// <param name="currentPhaseEvent">The current phase event</param>
869         /// <param name="observedPhase">The current phase for that thread</param>
WaitCurrentPhase(ManualResetEventSlim currentPhaseEvent, long observedPhase)870         private void WaitCurrentPhase(ManualResetEventSlim currentPhaseEvent, long observedPhase)
871         {
872             //spin until either of these two conditions succeeds
873             //1- The event is set
874             //2- the phase count is incremented more than one time, this means the next phase is finished as well,
875             //but the event will be reset again, so we check the phase count instead
876             SpinWait spinner = new SpinWait();
877             while (!currentPhaseEvent.IsSet && CurrentPhaseNumber - observedPhase <= 1)
878             {
879                 spinner.SpinOnce();
880             }
881         }
882 
883         /// <summary>
        /// The reason for discontinuous waiting instead of waiting directly on the event is to avoid the race where the sense is
        /// changed twice because the next phase has finished (either because RemoveParticipant was called or because another thread joined
        /// the next phase instead of the current thread), which would leave the current thread stuck on the event because it was reset again.
        /// The maxWait and the shift numbers were chosen arbitrarily; no reference was used when picking them.
888         /// </summary>
889         /// <param name="currentPhaseEvent">The current phase event</param>
890         /// <param name="totalTimeout">wait timeout in milliseconds</param>
891         /// <param name="token">cancellation token passed to SignalAndWait</param>
892         /// <param name="observedPhase">The current phase number for this thread</param>
893         /// <returns>True if the event is set or the phasenumber changed, false if the timeout expired</returns>
DiscontinuousWait(ManualResetEventSlim currentPhaseEvent, int totalTimeout, CancellationToken token, long observedPhase)894         private bool DiscontinuousWait(ManualResetEventSlim currentPhaseEvent, int totalTimeout, CancellationToken token, long observedPhase)
895         {
896             int maxWait = 100; // 100 ms
897             int waitTimeCeiling = 10000; // 10 seconds
898             while (observedPhase == CurrentPhaseNumber)
899             {
900                 // the next wait time, the min of the maxWait and the totalTimeout
901                 int waitTime = totalTimeout == Timeout.Infinite ? maxWait : Math.Min(maxWait, totalTimeout);
902 
903                 if (currentPhaseEvent.Wait(waitTime, token)) return true;
904 
905                 //update the total wait time
906                 if (totalTimeout != Timeout.Infinite)
907                 {
908                     totalTimeout -= waitTime;
909                     if (totalTimeout <= 0) return false;
910                 }
911 
912                 //if the maxwait exceeded 10 seconds then we will stop increasing the maxWait time and keep it 10 seconds, otherwise keep doubling it
913                 maxWait = maxWait >= waitTimeCeiling ? waitTimeCeiling : Math.Min(maxWait << 1, waitTimeCeiling);
914             }
915 
916             //if we exited the loop because the observed phase doesn't match the current phase, then we have to spin to mske sure
917             //the event is set or the next phase is finished
918             WaitCurrentPhase(currentPhaseEvent, observedPhase);
919 
920             return true;
921         }
922 
923         /// <summary>
924         /// Releases all resources used by the current instance of <see cref="Barrier"/>.
925         /// </summary>
926         /// <exception cref="T:System.InvalidOperationException">
927         /// The method was invoked from within a post-phase action.
928         /// </exception>
929         /// <remarks>
930         /// Unlike most of the members of <see cref="Barrier"/>, Dispose is not thread-safe and may not be
931         /// used concurrently with other members of this instance.
932         /// </remarks>
Dispose()933         public void Dispose()
934         {
935             // in case of this is called from the PHA
936             if (m_actionCallerID != 0 && Thread.CurrentThread.ManagedThreadId == m_actionCallerID)
937             {
938                 throw new InvalidOperationException(SR.GetString(SR.Barrier_InvalidOperation_CalledFromPHA));
939             }
940             Dispose(true);
941             GC.SuppressFinalize(this);
942         }
943 
944         /// <summary>
945         /// When overridden in a derived class, releases the unmanaged resources used by the
946         /// <see cref="Barrier"/>, and optionally releases the managed resources.
947         /// </summary>
948         /// <param name="disposing">true to release both managed and unmanaged resources; false to release
949         /// only unmanaged resources.</param>
950         /// <remarks>
951         /// Unlike most of the members of <see cref="Barrier"/>, Dispose is not thread-safe and may not be
952         /// used concurrently with other members of this instance.
953         /// </remarks>
Dispose(bool disposing)954         protected virtual void Dispose(bool disposing)
955         {
956             if (!m_disposed)
957             {
958                 if (disposing)
959                 {
960                     m_oddEvent.Dispose();
961                     m_evenEvent.Dispose();
962 #if !PFX_LEGACY_3_5
963                     if (m_ownerThreadContext != null)
964                     {
965                         m_ownerThreadContext.Dispose();
966                         m_ownerThreadContext = null;
967                     }
968 #endif
969                 }
970                 m_disposed = true;
971             }
972         }
973 
974         /// <summary>
975         /// Throw ObjectDisposedException if the barrier is disposed
976         /// </summary>
ThrowIfDisposed()977         private void ThrowIfDisposed()
978         {
979             if (m_disposed)
980             {
981                 throw new ObjectDisposedException("Barrier", SR.GetString(SR.Barrier_Dispose));
982             }
983         }
984     }
985 }
986