1 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. 2 3 using System; 4 using System.ComponentModel; 5 using System.Reactive.Disposables; 6 using System.Threading; 7 8 namespace System.Reactive.Concurrency 9 { 10 /// <summary> 11 /// Provides basic synchronization and scheduling services for observable sequences. 12 /// </summary> 13 [EditorBrowsable(EditorBrowsableState.Advanced)] 14 public static class Synchronization 15 { 16 #region SubscribeOn 17 18 /// <summary> 19 /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified scheduler. 20 /// </summary> 21 /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> 22 /// <param name="source">Source sequence.</param> 23 /// <param name="scheduler">Scheduler to perform subscription and unsubscription actions on.</param> 24 /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.</returns> 25 /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception> 26 /// <remarks> 27 /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified scheduler. 28 /// In order to invoke observer callbacks on the specified scheduler, e.g. to offload callback processing to a dedicated thread, use <see cref="Synchronization.ObserveOn{TSource}(IObservable{TSource}, IScheduler)"/>. 29 /// </remarks> SubscribeOn(IObservable<TSource> source, IScheduler scheduler)30 public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, IScheduler scheduler) 31 { 32 if (source == null) 33 throw new ArgumentNullException("source"); 34 if (scheduler == null) 35 throw new ArgumentNullException("scheduler"); 36 37 return new AnonymousObservable<TSource>(observer => 38 { 39 var m = new SingleAssignmentDisposable(); 40 var d = new SerialDisposable(); 41 d.Disposable = m; 42 43 m.Disposable = scheduler.Schedule(() => 44 { 45 d.Disposable = new ScheduledDisposable(scheduler, source.SubscribeSafe(observer)); 46 }); 47 48 return d; 49 }); 50 } 51 52 #if !NO_SYNCCTX 53 /// <summary> 54 /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified synchronization context. 55 /// </summary> 56 /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> 57 /// <param name="source">Source sequence.</param> 58 /// <param name="context">Synchronization context to perform subscription and unsubscription actions on.</param> 59 /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified synchronization context.</returns> 60 /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception> 61 /// <remarks> 62 /// Only the side-effects of subscribing to the source sequence and disposing subscriptions to the source sequence are run on the specified synchronization context. 63 /// In order to invoke observer callbacks on the specified synchronization context, e.g. to post callbacks to a UI thread represented by the synchronization context, use <see cref="Synchronization.ObserveOn{TSource}(IObservable{TSource}, SynchronizationContext)"/>. 64 /// </remarks> SubscribeOn(IObservable<TSource> source, SynchronizationContext context)65 public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, SynchronizationContext context) 66 { 67 if (source == null) 68 throw new ArgumentNullException("source"); 69 if (context == null) 70 throw new ArgumentNullException("context"); 71 72 return new AnonymousObservable<TSource>(observer => 73 { 74 var subscription = new SingleAssignmentDisposable(); 75 context.PostWithStartComplete(() => 76 { 77 if (!subscription.IsDisposed) 78 subscription.Disposable = new ContextDisposable(context, source.SubscribeSafe(observer)); 79 }); 80 return subscription; 81 }); 82 } 83 #endif 84 85 #endregion 86 87 #region ObserveOn 88 89 /// <summary> 90 /// Wraps the source sequence in order to run its observer callbacks on the specified scheduler. 91 /// </summary> 92 /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> 93 /// <param name="source">Source sequence.</param> 94 /// <param name="scheduler">Scheduler to notify observers on.</param> 95 /// <returns>The source sequence whose observations happen on the specified scheduler.</returns> 96 /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception> ObserveOn(IObservable<TSource> source, IScheduler scheduler)97 public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, IScheduler scheduler) 98 { 99 if (source == null) 100 throw new ArgumentNullException("source"); 101 if (scheduler == null) 102 throw new ArgumentNullException("scheduler"); 103 104 #if !NO_PERF 105 return new ObserveOn<TSource>(source, scheduler); 106 #else 107 return new AnonymousObservable<TSource>(observer => source.Subscribe(new ObserveOnObserver<TSource>(scheduler, observer, null))); 108 #endif 109 } 110 111 #if !NO_SYNCCTX 112 /// <summary> 113 /// Wraps the source sequence in order to run its observer callbacks on the specified synchronization context. 114 /// </summary> 115 /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> 116 /// <param name="source">Source sequence.</param> 117 /// <param name="context">Synchronization context to notify observers on.</param> 118 /// <returns>The source sequence whose observations happen on the specified synchronization context.</returns> 119 /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception> ObserveOn(IObservable<TSource> source, SynchronizationContext context)120 public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, SynchronizationContext context) 121 { 122 if (source == null) 123 throw new ArgumentNullException("source"); 124 if (context == null) 125 throw new ArgumentNullException("context"); 126 127 #if !NO_PERF 128 return new ObserveOn<TSource>(source, context); 129 #else 130 return new AnonymousObservable<TSource>(observer => 131 { 132 context.OperationStarted(); 133 134 return source.Subscribe( 135 x => context.Post(_ => 136 { 137 observer.OnNext(x); 138 }, null), 139 exception => context.Post(_ => 140 { 141 observer.OnError(exception); 142 }, null), 143 () => context.Post(_ => 144 { 145 observer.OnCompleted(); 146 }, null) 147 ).Finally(() => 148 { 149 context.OperationCompleted(); 150 }); 151 }); 152 #endif 153 } 154 #endif 155 156 #endregion 157 158 #region Synchronize 159 160 /// <summary> 161 /// Wraps the source sequence in order to ensure observer callbacks are properly serialized. 162 /// </summary> 163 /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> 164 /// <param name="source">Source sequence.</param> 165 /// <returns>The source sequence whose outgoing calls to observers are synchronized.</returns> 166 /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception> Synchronize(IObservable<TSource> source)167 public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source) 168 { 169 if (source == null) 170 throw new ArgumentNullException("source"); 171 172 #if !NO_PERF 173 return new Synchronize<TSource>(source); 174 #else 175 return new AnonymousObservable<TSource>(observer => 176 { 177 var gate = new object(); 178 return source.Subscribe(Observer.Synchronize(observer, gate)); 179 }); 180 #endif 181 } 182 183 /// <summary> 184 /// Wraps the source sequence in order to ensure observer callbacks are synchronized using the specified gate object. 185 /// </summary> 186 /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam> 187 /// <param name="source">Source sequence.</param> 188 /// <param name="gate">Gate object to synchronize each observer call on.</param> 189 /// <returns>The source sequence whose outgoing calls to observers are synchronized on the given gate object.</returns> 190 /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="gate"/> is null.</exception> Synchronize(IObservable<TSource> source, object gate)191 public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source, object gate) 192 { 193 if (source == null) 194 throw new ArgumentNullException("source"); 195 if (gate == null) 196 throw new ArgumentNullException("gate"); 197 198 #if !NO_PERF 199 return new Synchronize<TSource>(source, gate); 200 #else 201 return new AnonymousObservable<TSource>(observer => 202 { 203 return source.Subscribe(Observer.Synchronize(observer, gate)); 204 }); 205 #endif 206 } 207 208 #endregion 209 } 210 } 211