1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 using System;
4 using System.Threading;
5 
6 namespace System.Reactive
7 {
8     //
9     // See AutoDetachObserver.cs for more information on the safeguarding requirement and
10     // its implementation aspects.
11     //
12 
13     /// <summary>
14     /// This class fuses logic from ObserverBase, AnonymousObserver, and SafeObserver into one class. When an observer
15     /// needs to be safeguarded, an instance of this type can be created by SafeObserver.Create when it detects its
16     /// input is an AnonymousObserver, which is commonly used by end users when using the Subscribe extension methods
17     /// that accept delegates for the On* handlers. By doing the fusion, we make the call stack depth shorter which
18     /// helps debugging and some performance.
19     /// </summary>
20     class AnonymousSafeObserver<T> : IObserver<T>
21     {
22         private readonly Action<T> _onNext;
23         private readonly Action<Exception> _onError;
24         private readonly Action _onCompleted;
25         private readonly IDisposable _disposable;
26 
27         private int isStopped;
28 
AnonymousSafeObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted, IDisposable disposable)29         public AnonymousSafeObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted, IDisposable disposable)
30         {
31             _onNext = onNext;
32             _onError = onError;
33             _onCompleted = onCompleted;
34             _disposable = disposable;
35         }
36 
OnNext(T value)37         public void OnNext(T value)
38         {
39             if (isStopped == 0)
40             {
41                 var __noError = false;
42                 try
43                 {
44                     _onNext(value);
45                     __noError = true;
46                 }
47                 finally
48                 {
49                     if (!__noError)
50                         _disposable.Dispose();
51                 }
52             }
53         }
54 
OnError(Exception error)55         public void OnError(Exception error)
56         {
57             if (Interlocked.Exchange(ref isStopped, 1) == 0)
58             {
59                 try
60                 {
61                     _onError(error);
62                 }
63                 finally
64                 {
65                     _disposable.Dispose();
66                 }
67             }
68         }
69 
OnCompleted()70         public void OnCompleted()
71         {
72             if (Interlocked.Exchange(ref isStopped, 1) == 0)
73             {
74                 try
75                 {
76                     _onCompleted();
77                 }
78                 finally
79                 {
80                     _disposable.Dispose();
81                 }
82             }
83         }
84     }
85 }
86