1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 using System;
4 using System.Threading;
5 
namespace System.Reactive
{
    /// <summary>
    /// Observer decorator that forwards every notification to a wrapped observer
    /// while guarding the call with a three-state flag (idle, busy, done).
    /// A notification is rejected with <see cref="InvalidOperationException"/> when
    /// another notification is still being delivered, or when a terminal
    /// notification (<see cref="OnError"/> or <see cref="OnCompleted"/>) was
    /// already delivered.
    /// </summary>
    /// <typeparam name="T">Type of the elements received by the observer.</typeparam>
    internal class CheckedObserver<T> : IObserver<T>
    {
        // State values stored in _state. A new instance starts out idle (zero).
        private const int Idle = 0;
        private const int Busy = 1;
        private const int Done = 2;

        private readonly IObserver<T> _observer;
        private int _state;

        /// <summary>
        /// Creates a checked wrapper around the given observer.
        /// </summary>
        /// <param name="observer">Observer that receives the forwarded notifications.</param>
        public CheckedObserver(IObserver<T> observer)
        {
            _observer = observer;
        }

        /// <summary>
        /// Forwards an element to the wrapped observer and returns to the idle
        /// state afterwards, whether or not the wrapped observer throws.
        /// </summary>
        /// <param name="value">Element to forward.</param>
        /// <exception cref="InvalidOperationException">
        /// Another notification is in progress, or the observer has terminated.
        /// </exception>
        public void OnNext(T value)
        {
            EnterOrThrow();

            try
            {
                _observer.OnNext(value);
            }
            finally
            {
                // Open the gate again for the next notification.
                Interlocked.Exchange(ref _state, Idle);
            }
        }

        /// <summary>
        /// Forwards an error to the wrapped observer and marks this observer as
        /// done, whether or not the wrapped observer throws.
        /// </summary>
        /// <param name="error">Exception to forward.</param>
        /// <exception cref="InvalidOperationException">
        /// Another notification is in progress, or the observer has terminated.
        /// </exception>
        public void OnError(Exception error)
        {
            EnterOrThrow();

            try
            {
                _observer.OnError(error);
            }
            finally
            {
                // Terminal notification: no further calls are accepted.
                Interlocked.Exchange(ref _state, Done);
            }
        }

        /// <summary>
        /// Forwards completion to the wrapped observer and marks this observer
        /// as done, whether or not the wrapped observer throws.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// Another notification is in progress, or the observer has terminated.
        /// </exception>
        public void OnCompleted()
        {
            EnterOrThrow();

            try
            {
                _observer.OnCompleted();
            }
            finally
            {
                // Terminal notification: no further calls are accepted.
                Interlocked.Exchange(ref _state, Done);
            }
        }

        /// <summary>
        /// Atomically moves the state from idle to busy. When the state was not
        /// idle it is left unchanged and the call is rejected.
        /// </summary>
        private void EnterOrThrow()
        {
            // CompareExchange returns the value observed before the swap attempt.
            var previous = Interlocked.CompareExchange(ref _state, Busy, Idle);

            if (previous == Busy)
            {
                throw new InvalidOperationException(Strings_Core.REENTRANCY_DETECTED);
            }

            if (previous == Done)
            {
                throw new InvalidOperationException(Strings_Core.OBSERVER_TERMINATED);
            }
        }
    }
}
76