// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// Barrier.cs
//
// <OWNER>Microsoft</OWNER>
//
// A barrier allows multiple tasks to cooperatively work on some algorithm in parallel.
// A group of tasks cooperate by moving through a series of phases, where each in the group signals it has arrived at
// the barrier in a given phase and implicitly waits for all others to arrive.
// The same barrier can be used for multiple phases.
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Security.Permissions;
using System.Runtime.InteropServices;
using System.Threading;
using System.Runtime.Serialization;
using System.Security;
namespace System.Threading
{
    /// <summary>
    /// The exception that is thrown when the post-phase action of a <see cref="Barrier"/> fails.
    /// </summary>
#if !SILVERLIGHT
    [Serializable]
#endif
    public class BarrierPostPhaseException : Exception
    {
        /// <summary>
        /// Initializes a new instance of the <see cref="BarrierPostPhaseException"/> class
        /// with the default (resource-provided) message and no inner exception.
        /// </summary>
        public BarrierPostPhaseException()
            : this((string)null, (Exception)null)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="BarrierPostPhaseException"/> class with the specified inner exception.
        /// </summary>
        /// <param name="innerException">The exception that is the cause of the current exception.</param>
        public BarrierPostPhaseException(Exception innerException)
            : this((string)null, innerException)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="BarrierPostPhaseException"/> class with a specified error message.
        /// </summary>
        /// <param name="message">A string that describes the exception.</param>
        public BarrierPostPhaseException(string message)
            : this(message, (Exception)null)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="BarrierPostPhaseException"/> class with a specified error message and inner exception.
        /// </summary>
        /// <param name="message">A string that describes the exception. When null, the
        /// <c>SR.BarrierPostPhaseException</c> resource string is used instead.</param>
        /// <param name="innerException">The exception that is the cause of the current exception.</param>
        public BarrierPostPhaseException(string message, Exception innerException)
            : base(message ?? SR.GetString(SR.BarrierPostPhaseException), innerException)
        {
        }

#if !SILVERLIGHT
        /// <summary>
        /// Initializes a new instance of the <see cref="BarrierPostPhaseException"/> class with serialized data.
        /// </summary>
        /// <param name="info">The object that holds the serialized object data.</param>
        /// <param name="context">An object that describes the source or destination of the serialized data.</param>
        [SecurityCritical]
        protected BarrierPostPhaseException(SerializationInfo info, StreamingContext context)
            : base(info, context)
        {
        }
#endif
    }


    /// <summary>
    /// Enables multiple tasks to cooperatively work on an algorithm in parallel through multiple phases.
    /// </summary>
    /// <remarks>
    /// <para>
    /// A group of tasks cooperate by moving through a series of phases, where each in the group signals it
    /// has arrived at the <see cref="Barrier"/> in a given phase and implicitly waits for all others to
    /// arrive. The same <see cref="Barrier"/> can be used for multiple phases.
/// </para>
/// <para>
/// All public and protected members of <see cref="Barrier"/> are thread-safe and may be used
/// concurrently from multiple threads, with the exception of Dispose, which
/// must only be used when all other operations on the <see cref="Barrier"/> have
/// completed.
/// </para>
/// </remarks>
[ComVisible(false)]
#if !FEATURE_NETCORE
#pragma warning disable 0618
[HostProtection(SecurityAction.LinkDemand, Synchronization = true, ExternalThreading = true)]
#pragma warning restore 0618
#endif
[DebuggerDisplay("Participant Count={ParticipantCount},Participants Remaining={ParticipantsRemaining}")]
public class Barrier : IDisposable
{

    // Packed barrier state, updated as a single 32-bit word so it can be swapped atomically:
    //   bits  0-14 : the total participant count (hence the limit of about 32K participants)
    //   bit  15    : unused
    //   bits 16-30 : the count of participants that have arrived in the current phase
    //   bit  31    : the sense flag (bit clear => sense is true, i.e. the current phase is even)
    private volatile int m_currentTotalCount;

    // Bitmask to extract the current (arrived) count
    private const int CURRENT_MASK = 0x7FFF0000;

    // Bitmask to extract the total count
    private const int TOTAL_MASK = 0x00007FFF;

    // Bitmask to extract the sense flag
    private const int SENSE_MASK = unchecked((int)0x80000000);

    // The maximum number of participants the barrier supports = 32767 (2^15 - 1)
    private const int MAX_PARTICIPANTS = TOTAL_MASK;


    // The current barrier phase.
    // Overflow is not a practical concern: the maximum value is 2^63-1, so starting from 0 at a
    // rate of 4 billion increments per second it would take about 64 years to overflow.
private long m_currentPhase;


// Dispose flag
private bool m_disposed;

// Event associated with odd phases
private ManualResetEventSlim m_oddEvent;

// Event associated with even phases
private ManualResetEventSlim m_evenEvent;

// The execution context of the creator thread
private ExecutionContext m_ownerThreadContext;

// The ExecutionContext callback that invokes the post phase action
[SecurityCritical]
private static ContextCallback s_invokePostPhaseAction;

// Post phase action run after each phase
private Action<Barrier> m_postPhaseAction;

// Exception thrown by the post phase action, if any; it is reported to callers wrapped in a BarrierPostPhaseException
private Exception m_exception;

// The ManagedThreadId of the thread currently running the post phase action. It is used to detect that
// SignalAndWait, Dispose or Add/RemoveParticipant is being called from inside the post phase action, which is illegal.
// The value is captured right before the action is invoked and reset back to zero afterwards.
private int m_actionCallerID;

#region Properties

/// <summary>
/// Gets the number of participants in the barrier that haven’t yet signaled
/// in the current phase.
/// </summary>
/// <remarks>
/// This could be 0 during a post-phase action delegate execution or if the
/// ParticipantCount is 0.
/// </remarks>
public int ParticipantsRemaining
{
    get
    {
        // Read the packed state exactly once so both counts come from the same snapshot.
        int snapshot = m_currentTotalCount;
        int arrived = (snapshot & CURRENT_MASK) >> 16;
        int registered = snapshot & TOTAL_MASK;
        return registered - arrived;
    }
}

/// <summary>
/// Gets the total number of participants in the barrier.
/// </summary>
public int ParticipantCount
{
    get
    {
        return m_currentTotalCount & TOTAL_MASK;
    }
}

/// <summary>
/// Gets the number of the barrier's current phase.
/// </summary>
public long CurrentPhaseNumber
{
    // Volatile.Read/Write is used here because it is cheaper than Interlocked.Read on the AMD64 architecture.
    get { return Volatile.Read(ref m_currentPhase); }

    internal set { Volatile.Write(ref m_currentPhase, value); }
}

#endregion

/// <summary>
/// Initializes a new instance of the <see cref="Barrier"/> class.
/// </summary>
/// <param name="participantCount">The number of participating threads.</param>
/// <exception cref="ArgumentOutOfRangeException"> <paramref name="participantCount"/> is less than 0
/// or greater than <see cref="T:System.Int16.MaxValue"/>.</exception>
public Barrier(int participantCount)
    : this(participantCount, null)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="Barrier"/> class.
/// </summary>
/// <param name="participantCount">The number of participating threads.</param>
/// <param name="postPhaseAction">The <see cref="T:System.Action`1"/> to be executed after each
/// phase.</param>
/// <exception cref="T:System.ArgumentOutOfRangeException"> <paramref name="participantCount"/> is less than 0
/// or greater than <see cref="T:System.Int16.MaxValue"/> (32767).</exception>
/// <remarks>
/// The <paramref name="postPhaseAction"/> delegate will be executed after
/// all participants have arrived at the barrier in one phase. The participants
/// will not be released to the next phase until the postPhaseAction delegate
/// has completed execution.
/// </remarks>
public Barrier(int participantCount, Action<Barrier> postPhaseAction)
{
    // The count must be non-negative and must fit in the 15 bits reserved for the total count.
    if (participantCount < 0 || participantCount > MAX_PARTICIPANTS)
    {
        throw new ArgumentOutOfRangeException("participantCount", participantCount, SR.GetString(SR.Barrier_ctor_ArgumentOutOfRange));
    }

    // Arrived count = 0 and sense bit clear, so the packed state is just the total.
    m_currentTotalCount = participantCount;
    m_postPhaseAction = postPhaseAction;

    // Both events are created up front: the odd event starts set, the even event starts unset.
    m_oddEvent = new ManualResetEventSlim(true);
    m_evenEvent = new ManualResetEventSlim(false);

    // Capture the creator's execution context only when there is a post phase action to run under it
    // and execution-context flow has not been suppressed.
    if (postPhaseAction != null && !ExecutionContext.IsFlowSuppressed())
    {
        m_ownerThreadContext = ExecutionContext.Capture();
    }

    m_actionCallerID = 0;
}

/// <summary>
/// Extracts the three variables current, total and sense from a given packed state value.
/// </summary>
/// <param name="currentTotal">The integer variable that contains the other three variables</param>
/// <param name="current">The current (arrived) participant count</param>
/// <param name="total">The total participants count</param>
/// <param name="sense">The sense flag; true when the sense bit is clear</param>
private void GetCurrentTotal(int currentTotal, out int current, out int total, out bool sense)
{
    total = currentTotal & TOTAL_MASK;
    current = (currentTotal & CURRENT_MASK) >> 16;
    sense = (currentTotal & SENSE_MASK) == 0;
}

/// <summary>
/// Write the three variables current.
/// total and the sense to the m_currentTotal
/// </summary>
/// <param name="currentTotal">The old current total to compare</param>
/// <param name="current">The current participant count</param>
/// <param name="total">The total participants count</param>
/// <param name="sense">The sense flag</param>
/// <returns>True if the CAS succeeded, false otherwise</returns>
private bool SetCurrentTotal(int currentTotal, int current, int total, bool sense)
{
    // Re-pack the state: arrived count in the upper half, total in the lower half.
    int packed = (current << 16) | total;

    // A false sense is represented by a set sense bit.
    if (!sense)
    {
        packed |= SENSE_MASK;
    }

    // Publish only if nobody changed the state since the caller read currentTotal.
#pragma warning disable 0420
    int witnessed = Interlocked.CompareExchange(ref m_currentTotalCount, packed, currentTotal);
#pragma warning restore 0420
    return witnessed == currentTotal;
}

/// <summary>
/// Notifies the <see cref="Barrier"/> that there will be an additional participant.
/// </summary>
/// <returns>The phase number of the barrier in which the new participants will first
/// participate.</returns>
/// <exception cref="T:System.InvalidOperationException">
/// Adding a participant would cause the barrier's participant count to 
/// exceed <see cref="T:System.Int16.MaxValue"/>.
/// </exception>
/// <exception cref="T:System.InvalidOperationException">
/// The method was invoked from within a post-phase action.
/// </exception>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been
/// disposed.</exception>
public long AddParticipant()
{
    try
    {
        return AddParticipants(1);
    }
    catch (ArgumentOutOfRangeException overflow)
    {
        // AddParticipants reports overflow as an argument error; for the single-participant overload there is
        // no argument, so it is surfaced as an invalid operation. The original exception is kept as the
        // InnerException so the failure's origin is not lost.
        throw new InvalidOperationException(SR.GetString(SR.Barrier_AddParticipants_Overflow_ArgumentOutOfRange), overflow);
    }
}

/// <summary>
/// Notifies the <see cref="Barrier"/> that there will be additional participants.
/// </summary>
/// <param name="participantCount">The number of additional participants to add to the
/// barrier.</param>
/// <returns>The phase number of the barrier in which the new participants will first
/// participate.</returns>
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="participantCount"/> is less than
/// 1.</exception>
/// <exception cref="T:System.ArgumentOutOfRangeException">Adding <paramref name="participantCount"/> participants would cause the
/// barrier's participant count to exceed <see cref="T:System.Int16.MaxValue"/>.</exception>
/// <exception cref="T:System.InvalidOperationException">
/// The method was invoked from within a post-phase action.
/// </exception>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been
/// disposed.</exception>
public long AddParticipants(int participantCount)
{
    // check dispose
    ThrowIfDisposed();

    if (participantCount < 1)
    {
        throw new ArgumentOutOfRangeException("participantCount", participantCount,
            SR.GetString(SR.Barrier_AddParticipants_NonPositive_ArgumentOutOfRange));
    }

    if (participantCount > MAX_PARTICIPANTS) // overflow regardless of the current total
    {
        throw new ArgumentOutOfRangeException("participantCount",
            SR.GetString(SR.Barrier_AddParticipants_Overflow_ArgumentOutOfRange));
    }

    // Reject calls made from inside the post phase action.
    if (m_actionCallerID != 0 && Thread.CurrentThread.ManagedThreadId == m_actionCallerID)
    {
        throw new InvalidOperationException(SR.GetString(SR.Barrier_InvalidOperation_CalledFromPHA));
    }

    SpinWait spinner = new SpinWait();
    while (true)
    {
        int currentTotal = m_currentTotalCount;
        int total;
        int current;
        bool sense;
        GetCurrentTotal(currentTotal, out current, out total, out sense);

        if (participantCount + total > MAX_PARTICIPANTS) // overflow
        {
            throw new ArgumentOutOfRangeException("participantCount",
                SR.GetString(SR.Barrier_AddParticipants_Overflow_ArgumentOutOfRange));
        }

        if (!SetCurrentTotal(currentTotal, current, total + participantCount, sense))
        {
            // Lost the race against another state update; back off and retry with a fresh snapshot.
            spinner.SpinOnce();
            continue;
        }

        // Work out the first phase the new participants take part in. If the sense does not match the
        // parity of the phase number, the current phase has already finished but the phase count has not
        // been incremented yet, so the participants join the next phase; otherwise they join the current one.
        long currPhase = CurrentPhaseNumber;
        long newPhase = (sense != (currPhase % 2 == 0)) ? currPhase + 1 : currPhase;

        if (newPhase != currPhase)
        {
            // The post phase action is still running. Wait on the opposite event until the phase is
            // completely finished; otherwise a long-running action would leave the event the new
            // participant is going to wait on still set (it has not been reset yet).
            ManualResetEventSlim oppositeEvent = sense ? m_oddEvent : m_evenEvent;
            oppositeEvent.Wait();
        }
        else
        {
            // Covers the race where the phase has finished and m_currentPhase has been updated but the
            // events have not been set/reset yet; without this, the new participant's SignalAndWait could
            // wait on an event that is still set although the other participants have not arrived.
            ManualResetEventSlim currentEvent = sense ? m_evenEvent : m_oddEvent;
            if (currentEvent.IsSet)
            {
                currentEvent.Reset();
            }
        }

        return newPhase;
    }
}

/// <summary>
/// Notifies the <see cref="Barrier"/> that there will be one less participant.
/// </summary>
/// <exception cref="T:System.ArgumentOutOfRangeException">The barrier already has 0
/// participants.</exception>
/// <exception cref="T:System.InvalidOperationException">
/// The method was invoked from within a post-phase action, or removing the participant would leave
/// fewer participants than have already signaled in the current phase.
/// </exception>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been
/// disposed.</exception>
public void RemoveParticipant()
{
    RemoveParticipants(1);
}

/// <summary>
/// Notifies the <see cref="Barrier"/> that there will be fewer participants.
/// </summary>
/// <param name="participantCount">The number of participants to remove from the barrier.</param>
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="participantCount"/> is less than
/// 1, or is greater than the barrier's total participant count.</exception>
/// <exception cref="T:System.InvalidOperationException">Removing <paramref name="participantCount"/>
/// participants would leave fewer participants than have already signaled in the current phase.</exception>
/// <exception cref="T:System.InvalidOperationException">
/// The method was invoked from within a post-phase action.
/// </exception>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been
/// disposed.</exception>
public void RemoveParticipants(int participantCount)
{
    // check dispose
    ThrowIfDisposed();

    // Validate input
    if (participantCount < 1)
    {
        throw new ArgumentOutOfRangeException("participantCount", participantCount,
            SR.GetString(SR.Barrier_RemoveParticipants_NonPositive_ArgumentOutOfRange));
    }

    // Reject calls made from inside the post phase action.
    if (m_actionCallerID != 0 && Thread.CurrentThread.ManagedThreadId == m_actionCallerID)
    {
        throw new InvalidOperationException(SR.GetString(SR.Barrier_InvalidOperation_CalledFromPHA));
    }

    SpinWait spinner = new SpinWait();
    while (true)
    {
        int currentTotal = m_currentTotalCount;
        int total;
        int current;
        bool sense;
        GetCurrentTotal(currentTotal, out current, out total, out sense);

        if (total < participantCount)
        {
            throw new ArgumentOutOfRangeException("participantCount",
                SR.GetString(SR.Barrier_RemoveParticipants_ArgumentOutOfRange));
        }

        int remainingParticipants = total - participantCount;
        if (remainingParticipants < current)
        {
            throw new InvalidOperationException(SR.GetString(SR.Barrier_RemoveParticipants_InvalidOperation));
        }

        if (remainingParticipants > 0 && current == remainingParticipants)
        {
            // Everyone who remains has already arrived, so this removal completes the current phase:
            // zero the arrived count, flip the sense and finish the phase.
            if (SetCurrentTotal(currentTotal, 0, remainingParticipants, !sense))
            {
                FinishPhase(sense);
                break;
            }
        }
        else if (SetCurrentTotal(currentTotal, current, remainingParticipants, sense))
        {
            break;
        }

        // The state changed underneath us; back off and retry with a fresh snapshot.
        spinner.SpinOnce();
    }
}

/// <summary>
/// Signals that a participant has reached the <see cref="Barrier"/> and waits for all other
/// participants to
/// reach the barrier as well.
/// </summary>
/// <exception cref="T:System.InvalidOperationException">
/// The method was invoked from within a post-phase action, the barrier currently has 0 participants,
/// or the barrier is being used by more threads than are registered as participants.
/// </exception>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been
/// disposed.</exception>
public void SignalAndWait()
{
    // No cancellation: forward with an empty token.
    CancellationToken noCancellation = CancellationToken.None;
    SignalAndWait(noCancellation);
}

/// <summary>
/// Signals that a participant has reached the <see cref="Barrier"/> and waits for all other
/// participants to reach the barrier, while observing a <see
/// cref="T:System.Threading.CancellationToken"/>.
/// </summary>
/// <param name="cancellationToken">The <see cref="T:System.Threading.CancellationToken"/> to
/// observe.</param>
/// <exception cref="T:System.InvalidOperationException">
/// The method was invoked from within a post-phase action, the barrier currently has 0 participants,
/// or the barrier is being used by more threads than are registered as participants.
/// </exception>
/// <exception cref="T:System.OperationCanceledException"><paramref name="cancellationToken"/> has been
/// canceled.</exception>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been
/// disposed.</exception>
public void SignalAndWait(CancellationToken cancellationToken)
{
    // An infinite wait is expected to report success; debug builds assert that.
#if DEBUG
    bool completed = SignalAndWait(Timeout.Infinite, cancellationToken);
    Debug.Assert(completed);
#else
    SignalAndWait(Timeout.Infinite, cancellationToken);
#endif
}

/// <summary>
/// Signals that a participant has reached the <see cref="Barrier"/> and waits for all other
/// participants to reach the barrier as well, using a
/// <see cref="T:System.TimeSpan"/> to measure the time interval.
/// </summary>
/// <param name="timeout">A <see cref="T:System.TimeSpan"/> that represents the number of
/// milliseconds to wait, or a <see cref="T:System.TimeSpan"/> that represents -1 milliseconds to
/// wait indefinitely.</param>
/// <returns>true if all other participants reached the barrier; otherwise, false.</returns>
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout"/>is a negative number
/// other than -1 milliseconds, which represents an infinite time-out, or it is greater than
/// <see cref="T:System.Int32.MaxValue"/>.</exception>
/// <exception cref="T:System.InvalidOperationException">
/// The method was invoked from within a post-phase action, the barrier currently has 0 participants,
/// or the barrier is being used by more threads than are registered as participants.
/// </exception>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been
/// disposed.</exception>
public bool SignalAndWait(TimeSpan timeout)
{
    // No cancellation: forward with an empty token.
    CancellationToken noCancellation = CancellationToken.None;
    return SignalAndWait(timeout, noCancellation);
}

/// <summary>
/// Signals that a participant has reached the <see cref="Barrier"/> and waits for all other
/// participants to reach the barrier as well, using a
/// <see cref="T:System.TimeSpan"/> to measure the time interval, while observing a <see
/// cref="T:System.Threading.CancellationToken"/>.
/// </summary>
/// <param name="timeout">A <see cref="T:System.TimeSpan"/> that represents the number of
/// milliseconds to wait, or a <see cref="T:System.TimeSpan"/> that represents -1 milliseconds to
/// wait indefinitely.</param>
/// <param name="cancellationToken">The <see cref="T:System.Threading.CancellationToken"/> to
/// observe.</param>
/// <returns>true if all other participants reached the barrier; otherwise, false.</returns>
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout"/>is a negative number
/// other than -1 milliseconds, which represents an infinite time-out.</exception>
/// <exception cref="T:System.InvalidOperationException">
/// The method was invoked from within a post-phase action, the barrier currently has 0 participants,
/// or the barrier is being used by more threads than are registered as participants.
/// </exception>
/// <exception cref="T:System.OperationCanceledException"><paramref name="cancellationToken"/> has been
/// canceled.</exception>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been
/// disposed.</exception>
public bool SignalAndWait(TimeSpan timeout, CancellationToken cancellationToken)
{
    // Truncate to whole milliseconds; only -1 (infinite) through int.MaxValue is accepted.
    long wholeMilliseconds = (long)timeout.TotalMilliseconds;
    bool outOfRange = wholeMilliseconds < -1 || wholeMilliseconds > int.MaxValue;
    if (outOfRange)
    {
        throw new ArgumentOutOfRangeException("timeout", timeout,
            SR.GetString(SR.Barrier_SignalAndWait_ArgumentOutOfRange));
    }

    // The value is known to fit in an int at this point.
    return SignalAndWait((int)wholeMilliseconds, cancellationToken);
}

/// <summary>
/// Signals that a participant has reached the <see cref="Barrier"/> and waits for all other
/// participants to reach the barrier as well, using a
/// 32-bit signed integer to measure the time interval.
/// </summary>
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
/// cref="Timeout.Infinite"/>(-1) to wait indefinitely.</param>
/// <returns>true if all other participants reached the barrier; otherwise, false.</returns>
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
/// negative number other than -1, which represents an infinite time-out.</exception>
/// <exception cref="T:System.InvalidOperationException">
/// The method was invoked from within a post-phase action, the barrier currently has 0 participants,
/// or the barrier is being used by more threads than are registered as participants.
/// </exception>
/// <exception cref="T:System.ObjectDisposedException">The current instance has already been
/// disposed.</exception>
public bool SignalAndWait(int millisecondsTimeout)
{
    // No cancellation: forward with an empty token.
    CancellationToken noCancellation = CancellationToken.None;
    return SignalAndWait(millisecondsTimeout, noCancellation);
}

/// <summary>
/// Signals that a participant has reached the barrier and waits for all other participants to reach
/// the barrier as well, using a
/// 32-bit signed integer to measure the time interval, while observing a <see
/// cref="T:System.Threading.CancellationToken"/>.
/// </summary>
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or <see
/// cref="Timeout.Infinite"/>(-1) to wait indefinitely.</param>
/// <param name="cancellationToken">The <see cref="T:System.Threading.CancellationToken"/> to
/// observe.</param>
/// <returns>true if all other participants reached the barrier; otherwise, false.</returns>
/// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="millisecondsTimeout"/> is a
/// negative number other than -1, which represents an infinite time-out.</exception>
/// <exception cref="T:System.InvalidOperationException">
/// The method was invoked from within a post-phase action, the barrier currently has 0 participants,
/// or the barrier is being used by more threads than are registered as participants.
631 /// </exception> 632 /// <exception cref="T:System.OperationCanceledException"><paramref name="cancellationToken"/> has been 633 /// canceled.</exception> 634 /// <exception cref="T:System.ObjectDisposedException">The current instance has already been 635 /// disposed.</exception> SignalAndWait(int millisecondsTimeout, CancellationToken cancellationToken)636 public bool SignalAndWait(int millisecondsTimeout, CancellationToken cancellationToken) 637 { 638 ThrowIfDisposed(); 639 cancellationToken.ThrowIfCancellationRequested(); 640 641 if (millisecondsTimeout < -1) 642 { 643 throw new System.ArgumentOutOfRangeException("millisecondsTimeout", millisecondsTimeout, 644 SR.GetString(SR.Barrier_SignalAndWait_ArgumentOutOfRange)); 645 } 646 647 // in case of this is called from the PHA 648 if (m_actionCallerID != 0 && Thread.CurrentThread.ManagedThreadId == m_actionCallerID) 649 { 650 throw new InvalidOperationException(SR.GetString(SR.Barrier_InvalidOperation_CalledFromPHA)); 651 } 652 653 // local variables to extract the basic barrier variable and update them 654 // The are declared here instead of inside the loop body because the will be used outside the loop 655 bool sense; // The sense of the barrier *before* the phase associated with this SignalAndWait call completes 656 int total; 657 int current; 658 int currentTotal; 659 long phase; 660 SpinWait spinner = new SpinWait(); 661 while (true) 662 { 663 currentTotal = m_currentTotalCount; 664 GetCurrentTotal(currentTotal, out current, out total, out sense); 665 phase = CurrentPhaseNumber; 666 // throw if zero participants 667 if (total == 0) 668 { 669 throw new InvalidOperationException(SR.GetString(SR.Barrier_SignalAndWait_InvalidOperation_ZeroTotal)); 670 } 671 // Try to detect if the number of threads for this phase exceeded the total number of participants or not 672 // This can be detected if the current is zero which means all participants for that phase has arrived and the phase number is not changed yet 673 
if (current == 0 && sense != (CurrentPhaseNumber % 2 == 0)) 674 { 675 throw new InvalidOperationException(SR.GetString(SR.Barrier_SignalAndWait_InvalidOperation_ThreadsExceeded)); 676 } 677 //This is the last thread, finish the phase 678 if (current + 1 == total) 679 { 680 if (SetCurrentTotal(currentTotal, 0, total, !sense)) 681 { 682 #if !FEATURE_PAL && !SILVERLIGHT // PAL doesn't support eventing 683 if (CdsSyncEtwBCLProvider.Log.IsEnabled()) 684 { 685 CdsSyncEtwBCLProvider.Log.Barrier_PhaseFinished(sense, CurrentPhaseNumber); 686 } 687 #endif 688 FinishPhase(sense); 689 return true; 690 } 691 } 692 else if (SetCurrentTotal(currentTotal, current + 1, total, sense)) 693 { 694 break; 695 } 696 697 spinner.SpinOnce(); 698 699 } 700 701 // ** Perform the real wait ** 702 // select the correct event to wait on, based on the current sense. 703 ManualResetEventSlim eventToWaitOn = (sense) ? m_evenEvent : m_oddEvent; 704 705 bool waitWasCanceled = false; 706 bool waitResult = false; 707 try 708 { 709 waitResult = DiscontinuousWait(eventToWaitOn, millisecondsTimeout, cancellationToken, phase); 710 } 711 catch (OperationCanceledException ) 712 { 713 waitWasCanceled = true; 714 } 715 catch (ObjectDisposedException)// in case a ---- happen where one of the thread returned from SignalAndWait and the current thread calls Wait on a disposed event 716 { 717 // make sure the current phase for this thread is already finished, otherwise propagate the exception 718 if (phase < CurrentPhaseNumber) 719 waitResult = true; 720 else 721 throw; 722 } 723 724 725 726 if (!waitResult) 727 { 728 //reset the spinLock to prepare it for the next loop 729 spinner.Reset(); 730 731 //If the wait timeout expired and all other thread didn't reach the barrier yet, update the current count back 732 while (true) 733 { 734 bool newSense; 735 currentTotal = m_currentTotalCount; 736 GetCurrentTotal(currentTotal, out current, out total, out newSense); 737 // If the timeout expired and the phase has just 
finished, return true and this is considered as succeeded SignalAndWait 738 //otherwise the timeout expired and the current phase has not been finished yet, return false 739 //The phase is finished if the phase member variable is changed (incremented) or the sense has been changed 740 // we have to use the statements in the comparison below for two cases: 741 // 1- The sense is changed but the last thread didn't update the phase yet 742 // 2- The phase is already incremented but the sense flipped twice due to the termination of the next phase 743 if (phase < CurrentPhaseNumber || sense != newSense) 744 { 745 746 // The current phase has been finished, but we shouldn't return before the events are set/reset otherwise this thread could start 747 // next phase and the appropriate event has not reset yet which could make it return immediately from the next phase SignalAndWait 748 // before waiting other threads 749 WaitCurrentPhase(eventToWaitOn, phase); 750 Debug.Assert(phase < CurrentPhaseNumber); 751 break; 752 } 753 //The phase has not been finished yet, try to update the current count. 754 if (SetCurrentTotal(currentTotal, current - 1, total, sense)) 755 { 756 //if here, then the attempt to backout was successful. 757 //throw (a fresh) oce if cancellation woke the wait 758 //or return false if it was the timeout that woke the wait. 
//
                        if (waitWasCanceled)
                            throw new OperationCanceledException(SR.GetString(SR.Common_OperationCanceled), cancellationToken);
                        else
                            return false;
                    }
                    spinner.SpinOnce();
                }
            }

            if (m_exception != null)
                throw new BarrierPostPhaseException(m_exception);

            return true;

        }

        /// <summary>
        /// Completes the current phase: runs the post-phase action (when one is registered) and then
        /// advances the phase number and flips the phase events. This must be called by the last
        /// thread to arrive at the barrier.
        /// </summary>
        /// <param name="observedSense">The sense of the phase that is being completed.</param>
        /// <exception cref="BarrierPostPhaseException">The post-phase action threw; the original
        /// exception is supplied as the inner exception.</exception>
        [SecuritySafeCritical]
        private void FinishPhase(bool observedSense)
        {
            // Without a post-phase action there is nothing to run: just release the waiters.
            if (m_postPhaseAction == null)
            {
                SetResetEvents(observedSense);
                return;
            }

            try
            {
                // Remember which thread runs the action so that Dispose (and, per the original notes,
                // Add/RemoveParticipant(s)) can detect calls made from inside the action.
                m_actionCallerID = Thread.CurrentThread.ManagedThreadId;

                if (m_ownerThreadContext == null)
                {
                    m_postPhaseAction(this);
                }
                else
                {
                    // Use the stored context for this run and keep a fresh copy for the next phase.
                    var contextForThisRun = m_ownerThreadContext;
                    m_ownerThreadContext = m_ownerThreadContext.CreateCopy();

                    // Create the shared callback delegate on first use.
                    ContextCallback callback = s_invokePostPhaseAction;
                    if (callback == null)
                    {
                        callback = InvokePostPhaseAction;
                        s_invokePostPhaseAction = callback;
                    }

                    ExecutionContext.Run(contextForThisRun, callback, this);
#if !PFX_LEGACY_3_5
                    // The context has served its purpose; the copy made above is disposed either
                    // after the next phase or by Dispose.
                    contextForThisRun.Dispose();
#endif
                }

                // The action succeeded: forget any exception recorded by an earlier phase.
                m_exception = null;
            }
            catch (Exception ex)
            {
                // Record the failure; it is surfaced from the finally block once the events are updated.
                m_exception = ex;
            }
            finally
            {
                // Always clear the caller ID and release the waiters, even when the action failed.
                m_actionCallerID = 0;
                SetResetEvents(observedSense);
                if (m_exception != null)
                {
                    throw new BarrierPostPhaseException(m_exception);
                }
            }
        }

        /// <summary>
        /// Callback that invokes the post-phase action of the barrier passed as state.
        /// </summary>
        /// <param name="obj">The <see cref="Barrier"/> whose post-phase action is to be invoked.</param>
        [SecurityCritical]
        private static void InvokePostPhaseAction(object obj)
        {
            Barrier barrier = (Barrier)obj;
            barrier.m_postPhaseAction(barrier);
        }

        /// <summary>
        /// Advances the phase number, resets the event of the next phase and sets the event of the
        /// phase that has just completed.
        /// </summary>
        /// <param name="observedSense">The sense of the phase that is being completed.</param>
        private void SetResetEvents(bool observedSense)
        {
            // The phase number is a 64-bit value, so a plain write could tear on 32-bit machines.
            // NOTE(review): the CurrentPhaseNumber accessors are outside this view and are presumed
            // to perform the read and write safely - confirm.
            CurrentPhaseNumber = CurrentPhaseNumber + 1;

            // Reset the event of the upcoming phase first, then wake the threads waiting on the
            // phase that has just finished.
            ManualResetEventSlim nextPhaseEvent = observedSense ? m_oddEvent : m_evenEvent;
            nextPhaseEvent.Reset();

            ManualResetEventSlim finishedPhaseEvent = observedSense ? m_evenEvent : m_oddEvent;
            finishedPhaseEvent.Set();
        }

        /// <summary>
        /// Spins until the current phase has finished completely, that is until either the phase event
        /// is set or the phase number has been incremented more than once.
        /// </summary>
        /// <param name="currentPhaseEvent">The current phase event</param>
        /// <param name="observedPhase">The current phase for that thread</param>
        private void WaitCurrentPhase(ManualResetEventSlim currentPhaseEvent, long observedPhase)
        {
            SpinWait spin = new SpinWait();
            while (true)
            {
                // 1- The event is set: the phase is complete.
                if (currentPhaseEvent.IsSet)
                {
                    return;
                }

                // 2- The phase number moved on more than once: the next phase has finished as well,
                //    so the event has been reset again and only the phase number can be relied on.
                if (CurrentPhaseNumber - observedPhase > 1)
                {
                    return;
                }

                spin.SpinOnce();
            }
        }

        /// <summary>
        /// The reason of
/// discontinuous waiting instead of direct waiting on the event is to avoid the race where the sense is
        /// changed twice because the next phase is finished (due to either RemoveParticipant is called or another thread joined
        /// the next phase instead of the current thread) so the current thread will be stuck on the event because it is reset back.
        /// The initial wait slice and the doubling factor are arbitrarily chosen; there were no references for picking them.
        /// </summary>
        /// <param name="currentPhaseEvent">The current phase event</param>
        /// <param name="totalTimeout">wait timeout in milliseconds</param>
        /// <param name="token">cancellation token passed to SignalAndWait</param>
        /// <param name="observedPhase">The current phase number for this thread</param>
        /// <returns>True if the event is set or the phase number changed, false if the timeout expired</returns>
        private bool DiscontinuousWait(ManualResetEventSlim currentPhaseEvent, int totalTimeout, CancellationToken token, long observedPhase)
        {
            const int SliceCeilingMs = 10000; // 10 seconds
            int sliceLimitMs = 100;           // 100 ms
            bool waitForever = totalTimeout == Timeout.Infinite;

            while (observedPhase == CurrentPhaseNumber)
            {
                // Wait for one slice, never longer than what is left of a finite timeout.
                int sliceMs = waitForever ? sliceLimitMs : Math.Min(sliceLimitMs, totalTimeout);

                if (currentPhaseEvent.Wait(sliceMs, token))
                {
                    return true;
                }

                // Charge the slice against a finite timeout and give up once it is used up.
                if (!waitForever)
                {
                    totalTimeout -= sliceMs;
                    if (totalTimeout <= 0)
                    {
                        return false;
                    }
                }

                // Keep doubling the slice until it reaches the ceiling, then hold it there.
                sliceLimitMs = Math.Min(sliceLimitMs << 1, SliceCeilingMs);
            }

            // The loop ended because the phase number changed: spin to make sure the event is set
            // or the next phase is finished before reporting success.
            WaitCurrentPhase(currentPhaseEvent, observedPhase);

            return true;
        }

        /// <summary>
        /// Releases all resources used by the current instance of <see cref="Barrier"/>.
        /// </summary>
        /// <exception cref="T:System.InvalidOperationException">
        /// The method was invoked from within a post-phase action.
        /// </exception>
        /// <remarks>
        /// Unlike most of the members of <see cref="Barrier"/>, Dispose is not thread-safe and may not be
        /// used concurrently with other members of this instance.
        /// </remarks>
        public void Dispose()
        {
            // A non-zero caller ID means a post-phase action is running; the thread running it
            // is not allowed to dispose the barrier.
            if (m_actionCallerID != 0)
            {
                if (Thread.CurrentThread.ManagedThreadId == m_actionCallerID)
                {
                    throw new InvalidOperationException(SR.GetString(SR.Barrier_InvalidOperation_CalledFromPHA));
                }
            }

            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// When overridden in a derived class, releases the unmanaged resources used by the
        /// <see cref="Barrier"/>, and optionally releases the managed resources.
        /// </summary>
        /// <param name="disposing">true to release both managed and unmanaged resources; false to release
        /// only unmanaged resources.</param>
        /// <remarks>
        /// Unlike most of the members of <see cref="Barrier"/>, Dispose is not thread-safe and may not be
        /// used concurrently with other members of this instance.
/// </remarks>
        protected virtual void Dispose(bool disposing)
        {
            // Disposing more than once is a no-op.
            if (m_disposed)
            {
                return;
            }

            if (disposing)
            {
                // Release the managed resources: both phase events and the stored context, if any.
                m_oddEvent.Dispose();
                m_evenEvent.Dispose();
#if !PFX_LEGACY_3_5
                if (m_ownerThreadContext != null)
                {
                    m_ownerThreadContext.Dispose();
                    m_ownerThreadContext = null;
                }
#endif
            }

            m_disposed = true;
        }

        /// <summary>
        /// Throws <see cref="ObjectDisposedException"/> if the barrier has been disposed.
        /// </summary>
        private void ThrowIfDisposed()
        {
            if (!m_disposed)
            {
                return;
            }

            throw new ObjectDisposedException("Barrier", SR.GetString(SR.Barrier_Dispose));
        }
    }
}