1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
2 
3 using System;
4 using System.ComponentModel;
5 using System.Reactive.Disposables;
6 using System.Threading;
7 
8 namespace System.Reactive.Concurrency
9 {
10     /// <summary>
11     /// Provides basic synchronization and scheduling services for observable sequences.
12     /// </summary>
13     [EditorBrowsable(EditorBrowsableState.Advanced)]
14     public static class Synchronization
15     {
16         #region SubscribeOn
17 
18         /// <summary>
19         /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified scheduler.
20         /// </summary>
21         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
22         /// <param name="source">Source sequence.</param>
23         /// <param name="scheduler">Scheduler to perform subscription and unsubscription actions on.</param>
24         /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.</returns>
25         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
26         /// <remarks>
27         /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified scheduler.
28         /// In order to invoke observer callbacks on the specified scheduler, e.g. to offload callback processing to a dedicated thread, use <see cref="Synchronization.ObserveOn{TSource}(IObservable{TSource}, IScheduler)"/>.
29         /// </remarks>
SubscribeOn(IObservable<TSource> source, IScheduler scheduler)30         public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, IScheduler scheduler)
31         {
32             if (source == null)
33                 throw new ArgumentNullException("source");
34             if (scheduler == null)
35                 throw new ArgumentNullException("scheduler");
36 
37             return new AnonymousObservable<TSource>(observer =>
38             {
39                 var m = new SingleAssignmentDisposable();
40                 var d = new SerialDisposable();
41                 d.Disposable = m;
42 
43                 m.Disposable = scheduler.Schedule(() =>
44                 {
45                     d.Disposable = new ScheduledDisposable(scheduler, source.SubscribeSafe(observer));
46                 });
47 
48                 return d;
49             });
50         }
51 
52 #if !NO_SYNCCTX
53         /// <summary>
54         /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified synchronization context.
55         /// </summary>
56         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
57         /// <param name="source">Source sequence.</param>
58         /// <param name="context">Synchronization context to perform subscription and unsubscription actions on.</param>
59         /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified synchronization context.</returns>
60         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception>
61         /// <remarks>
62         /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified synchronization context.
63         /// In order to invoke observer callbacks on the specified synchronization context, e.g. to post callbacks to a UI thread represented by the synchronization context, use <see cref="Synchronization.ObserveOn{TSource}(IObservable{TSource}, SynchronizationContext)"/>.
64         /// </remarks>
SubscribeOn(IObservable<TSource> source, SynchronizationContext context)65         public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
66         {
67             if (source == null)
68                 throw new ArgumentNullException("source");
69             if (context == null)
70                 throw new ArgumentNullException("context");
71 
72             return new AnonymousObservable<TSource>(observer =>
73             {
74                 var subscription = new SingleAssignmentDisposable();
75                 context.PostWithStartComplete(() =>
76                 {
77                     if (!subscription.IsDisposed)
78                         subscription.Disposable = new ContextDisposable(context, source.SubscribeSafe(observer));
79                 });
80                 return subscription;
81             });
82         }
83 #endif
84 
85         #endregion
86 
87         #region ObserveOn
88 
89         /// <summary>
90         /// Wraps the source sequence in order to run its observer callbacks on the specified scheduler.
91         /// </summary>
92         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
93         /// <param name="source">Source sequence.</param>
94         /// <param name="scheduler">Scheduler to notify observers on.</param>
95         /// <returns>The source sequence whose observations happen on the specified scheduler.</returns>
96         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
ObserveOn(IObservable<TSource> source, IScheduler scheduler)97         public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, IScheduler scheduler)
98         {
99             if (source == null)
100                 throw new ArgumentNullException("source");
101             if (scheduler == null)
102                 throw new ArgumentNullException("scheduler");
103 
104 #if !NO_PERF
105             return new ObserveOn<TSource>(source, scheduler);
106 #else
107             return new AnonymousObservable<TSource>(observer => source.Subscribe(new ObserveOnObserver<TSource>(scheduler, observer, null)));
108 #endif
109         }
110 
111 #if !NO_SYNCCTX
112         /// <summary>
113         /// Wraps the source sequence in order to run its observer callbacks on the specified synchronization context.
114         /// </summary>
115         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
116         /// <param name="source">Source sequence.</param>
117         /// <param name="context">Synchronization context to notify observers on.</param>
118         /// <returns>The source sequence whose observations happen on the specified synchronization context.</returns>
119         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception>
ObserveOn(IObservable<TSource> source, SynchronizationContext context)120         public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
121         {
122             if (source == null)
123                 throw new ArgumentNullException("source");
124             if (context == null)
125                 throw new ArgumentNullException("context");
126 
127 #if !NO_PERF
128             return new ObserveOn<TSource>(source, context);
129 #else
130             return new AnonymousObservable<TSource>(observer =>
131             {
132                 context.OperationStarted();
133 
134                 return source.Subscribe(
135                     x => context.Post(_ =>
136                     {
137                         observer.OnNext(x);
138                     }, null),
139                     exception => context.Post(_ =>
140                     {
141                         observer.OnError(exception);
142                     }, null),
143                     () => context.Post(_ =>
144                     {
145                         observer.OnCompleted();
146                     }, null)
147                 ).Finally(() =>
148                 {
149                     context.OperationCompleted();
150                 });
151             });
152 #endif
153         }
154 #endif
155 
156         #endregion
157 
158         #region Synchronize
159 
160         /// <summary>
161         /// Wraps the source sequence in order to ensure observer callbacks are properly serialized.
162         /// </summary>
163         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
164         /// <param name="source">Source sequence.</param>
165         /// <returns>The source sequence whose outgoing calls to observers are synchronized.</returns>
166         /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
Synchronize(IObservable<TSource> source)167         public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source)
168         {
169             if (source == null)
170                 throw new ArgumentNullException("source");
171 
172 #if !NO_PERF
173             return new Synchronize<TSource>(source);
174 #else
175             return new AnonymousObservable<TSource>(observer =>
176             {
177                 var gate = new object();
178                 return source.Subscribe(Observer.Synchronize(observer, gate));
179             });
180 #endif
181         }
182 
183         /// <summary>
184         /// Wraps the source sequence in order to ensure observer callbacks are synchronized using the specified gate object.
185         /// </summary>
186         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
187         /// <param name="source">Source sequence.</param>
188         /// <param name="gate">Gate object to synchronize each observer call on.</param>
189         /// <returns>The source sequence whose outgoing calls to observers are synchronized on the given gate object.</returns>
190         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="gate"/> is null.</exception>
Synchronize(IObservable<TSource> source, object gate)191         public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source, object gate)
192         {
193             if (source == null)
194                 throw new ArgumentNullException("source");
195             if (gate == null)
196                 throw new ArgumentNullException("gate");
197 
198 #if !NO_PERF
199             return new Synchronize<TSource>(source, gate);
200 #else
201             return new AnonymousObservable<TSource>(observer =>
202             {
203                 return source.Subscribe(Observer.Synchronize(observer, gate));
204             });
205 #endif
206         }
207 
208         #endregion
209     }
210 }
211