// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

using System;
using System.Threading;

namespace System.Reactive
{
    /// <summary>
    /// Wraps an observer and guards every notification with a small state machine
    /// (idle / busy / done) that is updated atomically. A notification that arrives
    /// while another one is still being delivered, or after a terminal notification
    /// has been delivered, is rejected with an <see cref="InvalidOperationException"/>.
    /// </summary>
    /// <typeparam name="T">Type of the values received by the wrapped observer.</typeparam>
    internal class CheckedObserver<T> : IObserver<T>
    {
        // No notification is currently being delivered and none has terminated the observer.
        private const int IDLE = 0;

        // A notification is currently being forwarded to the wrapped observer.
        private const int BUSY = 1;

        // OnError or OnCompleted has been forwarded; nothing else is accepted.
        private const int DONE = 2;

        private readonly IObserver<T> _observer;
        private int _state;

        /// <summary>
        /// Creates a checked wrapper around the given observer.
        /// </summary>
        /// <param name="observer">Observer that receives the forwarded notifications.</param>
        public CheckedObserver(IObserver<T> observer)
        {
            _observer = observer;
        }

        /// <summary>
        /// Forwards a value to the wrapped observer and returns to the idle state afterwards,
        /// even when the wrapped observer throws.
        /// </summary>
        /// <param name="value">Value to forward.</param>
        /// <exception cref="InvalidOperationException">
        /// Another notification is in progress, or the observer has already terminated.
        /// </exception>
        public void OnNext(T value)
        {
            // Enter() sits outside the try block on purpose: when it throws, the state
            // belongs to whoever currently holds it and must not be touched here.
            Enter();

            try
            {
                _observer.OnNext(value);
            }
            finally
            {
                Leave(IDLE);
            }
        }

        /// <summary>
        /// Forwards an error to the wrapped observer and marks this observer as done afterwards,
        /// even when the wrapped observer throws.
        /// </summary>
        /// <param name="error">Exception to forward.</param>
        /// <exception cref="InvalidOperationException">
        /// Another notification is in progress, or the observer has already terminated.
        /// </exception>
        public void OnError(Exception error)
        {
            Enter();

            try
            {
                _observer.OnError(error);
            }
            finally
            {
                Leave(DONE);
            }
        }

        /// <summary>
        /// Forwards completion to the wrapped observer and marks this observer as done afterwards,
        /// even when the wrapped observer throws.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// Another notification is in progress, or the observer has already terminated.
        /// </exception>
        public void OnCompleted()
        {
            Enter();

            try
            {
                _observer.OnCompleted();
            }
            finally
            {
                Leave(DONE);
            }
        }

        /// <summary>
        /// Atomically moves the state from idle to busy, or throws when the state was not idle.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// The state was busy (a notification is still in progress) or done (a terminal
        /// notification was already delivered).
        /// </exception>
        private void Enter()
        {
            // CompareExchange returns the value seen before the attempted swap; the swap
            // itself only happens when that value was IDLE.
            var previous = Interlocked.CompareExchange(ref _state, BUSY, IDLE);

            if (previous == BUSY)
            {
                throw new InvalidOperationException(Strings_Core.REENTRANCY_DETECTED);
            }

            if (previous == DONE)
            {
                throw new InvalidOperationException(Strings_Core.OBSERVER_TERMINATED);
            }
        }

        /// <summary>
        /// Atomically publishes the state that follows a delivered notification.
        /// </summary>
        /// <param name="next">State to store: idle after OnNext, done after a terminal notification.</param>
        private void Leave(int next)
        {
            Interlocked.Exchange(ref _state, next);
        }
    }
}